欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

Spark中分區(qū)器的作用是什么

本篇文章為大家展示了Spark中分區(qū)器的作用是什么,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來(lái)自于我們對(duì)這個(gè)行業(yè)的熱愛(ài)。我們立志把好的技術(shù)通過(guò)有效、簡(jiǎn)單的方式提供給客戶,將通過(guò)不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長(zhǎng)期合作伙伴,公司提供的服務(wù)項(xiàng)目有:域名注冊(cè)、虛擬空間、營(yíng)銷軟件、網(wǎng)站建設(shè)、白云鄂網(wǎng)站維護(hù)、網(wǎng)站推廣。

在Spark中給自己挖了一個(gè)數(shù)據(jù)傾斜的坑。為了解決這個(gè)問(wèn)題,順便研究了下Spark分區(qū)器的原理,趁著周末加班總結(jié)一下~

先說(shuō)說(shuō)數(shù)據(jù)傾斜

數(shù)據(jù)傾斜是指Spark中的RDD在計(jì)算的時(shí)候,每個(gè)RDD內(nèi)部的分區(qū)包含的數(shù)據(jù)不平均。比如一共有5個(gè)分區(qū),其中一個(gè)占有了90%的數(shù)據(jù),這就導(dǎo)致本來(lái)5個(gè)分區(qū)可以5個(gè)人一起并行干活,結(jié)果四個(gè)人不怎么干活,工作全都?jí)旱揭粋€(gè)人身上了。遇到這種問(wèn)題,網(wǎng)上有很多的解決辦法。

但是如果是底層數(shù)據(jù)的問(wèn)題,無(wú)論怎么優(yōu)化,還是無(wú)法解決數(shù)據(jù)傾斜的。

比如你想要對(duì)某個(gè)rdd做groupby,然后做join操作,如果分組的key就是分布不均勻的,那么真樣都是無(wú)法優(yōu)化的。因?yàn)橐坏┻@個(gè)key被切分,就無(wú)法完整的做join了,如果不對(duì)這個(gè)key切分,必然會(huì)造成對(duì)應(yīng)的分區(qū)數(shù)據(jù)傾斜。

不過(guò),了解數(shù)據(jù)為什么會(huì)傾斜還是很重要的,繼續(xù)往下看吧!

分區(qū)的作用

在PairRDD即(key,value)這種格式的rdd中,很多操作都是基于key的,因此為了獨(dú)立分割任務(wù),會(huì)按照key對(duì)數(shù)據(jù)進(jìn)行重組。比如groupbykey

Spark中分區(qū)器的作用是什么

重組肯定是需要一個(gè)規(guī)則的,最常見(jiàn)的就是基于Hash,Spark還提供了一種稍微復(fù)雜點(diǎn)的基于抽樣的Range分區(qū)方法。

下面我們先看看分區(qū)器在Spark計(jì)算流程中是怎么使用的:

Paritioner的使用

就拿groupbykey來(lái)說(shuō):

def groupByKey(): JavaPairRDD[K, JIterable[V]] =     fromRDD(groupByResultToJava(rdd.groupByKey()))

它會(huì)調(diào)用PairRDDFunction的groupByKey()方法

def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {     groupByKey(defaultPartitioner(self))   }

在這個(gè)方法里面創(chuàng)建了默認(rèn)的分區(qū)器。默認(rèn)的分區(qū)器是這樣定義的:

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {     val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse     for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {       return r.partitioner.get     }     if (rdd.context.conf.contains("spark.default.parallelism")) {       new HashPartitioner(rdd.context.defaultParallelism)     } else {       new HashPartitioner(bySize.head.partitions.size)     }   }

首先獲取當(dāng)前分區(qū)的分區(qū)個(gè)數(shù),如果沒(méi)有設(shè)置spark.default.parallelism參數(shù),則創(chuàng)建一個(gè)跟之前分區(qū)個(gè)數(shù)一樣的Hash分區(qū)器。

當(dāng)然,用戶也可以自定義分區(qū)器,或者使用其他提供的分區(qū)器。API里面也是支持的:

// 傳入分區(qū)器對(duì)象 def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =     fromRDD(groupByResultToJava(rdd.groupByKey(partitioner))) // 傳入分區(qū)的個(gè)數(shù) def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =     fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))

HashPatitioner

Hash分區(qū)器,是最簡(jiǎn)單也是默認(rèn)提供的分區(qū)器,了解它的分區(qū)規(guī)則,對(duì)我們處理數(shù)據(jù)傾斜或者設(shè)計(jì)分組的key時(shí),還是很有幫助的。

class HashPartitioner(partitions: Int) extends Partitioner {   require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")    def numPartitions: Int = partitions    // 通過(guò)key計(jì)算其HashCode,并根據(jù)分區(qū)數(shù)取模。如果結(jié)果小于0,直接加上分區(qū)數(shù)。   def getPartition(key: Any): Int = key match {     case null => 0     case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)   }    // 對(duì)比兩個(gè)分區(qū)器是否相同,直接對(duì)比其分區(qū)個(gè)數(shù)就行   override def equals(other: Any): Boolean = other match {     case h: HashPartitioner =>       h.numPartitions == numPartitions     case _ =>       false   }    override def hashCode: Int = numPartitions }

這里最重要的是這個(gè)Utils.nonNegativeMod(key.hashCode, numPartitions),它決定了數(shù)據(jù)進(jìn)入到哪個(gè)分區(qū)。

def nonNegativeMod(x: Int, mod: Int): Int = {     val rawMod = x % mod     rawMod + (if (rawMod < 0) mod else 0)   }

說(shuō)白了,就是基于這個(gè)key獲取它的hashCode,然后對(duì)分區(qū)個(gè)數(shù)取模。由于HashCode可能為負(fù),這里直接判斷下,如果小于0,再加上分區(qū)個(gè)數(shù)即可。

因此,基于hash的分區(qū),只要保證你的key是分散的,那么最終數(shù)據(jù)就不會(huì)出現(xiàn)數(shù)據(jù)傾斜的情況。

RangePartitioner

這個(gè)分區(qū)器,適合想要把數(shù)據(jù)打散的場(chǎng)景,但是如果相同的key重復(fù)量很大,依然會(huì)出現(xiàn)數(shù)據(jù)傾斜的情況。

每個(gè)分區(qū)器,最核心的方法,就是getPartition

def getPartition(key: Any): Int = {     val k = key.asInstanceOf[K]     var partition = 0     if (rangeBounds.length <= 128) {       // If we have less than 128 partitions naive search       while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {         partition += 1       }     } else {       // Determine which binary search method to use only once.       partition = binarySearch(rangeBounds, k)       // binarySearch either returns the match location or -[insertion point]-1       if (partition < 0) {         partition = -partition-1       }       if (partition > rangeBounds.length) {         partition = rangeBounds.length       }     }     if (ascending) {       partition     } else {       rangeBounds.length - partition     }   }

在range分區(qū)中,會(huì)存儲(chǔ)一個(gè)邊界的數(shù)組,比如[1,100,200,300,400],然后對(duì)比傳進(jìn)來(lái)的key,返回對(duì)應(yīng)的分區(qū)id。

那么這個(gè)邊界是怎么確定的呢?

這就是Range分區(qū)最核心的算法了,大概描述下,就是遍歷每個(gè)paritiion,對(duì)里面的數(shù)據(jù)進(jìn)行抽樣,把抽樣的數(shù)據(jù)進(jìn)行排序,并按照對(duì)應(yīng)的權(quán)重確定邊界。

有幾個(gè)比較重要的地方:

1 抽樣

2 確定邊界

關(guān)于抽樣,有一個(gè)很常見(jiàn)的算法題,即在不知道數(shù)據(jù)規(guī)模的情況下,如何以等概率的方式,隨機(jī)選擇一個(gè)值。

最笨的辦法,就是遍歷一次數(shù)據(jù),知道數(shù)據(jù)的規(guī)模,然后隨機(jī)一個(gè)數(shù),取其對(duì)應(yīng)的值。其實(shí)這樣相當(dāng)于遍歷了兩次(第二次的取值根據(jù)不同的存儲(chǔ)介質(zhì),可能不同)。

在Spark中,是使用水塘抽樣這種算法。即首先取***個(gè)值,然后依次往后遍歷;第二個(gè)值有二分之一的幾率替換選出來(lái)的值;第三個(gè)值有三分之一的幾率替換選出來(lái)的值;&hellip;;直到遍歷到***一個(gè)值。這樣,通過(guò)依次遍歷就取出來(lái)隨機(jī)的數(shù)值了。

算法參考源碼:

private var rangeBounds: Array[K] = {     if (partitions <= 1) {       Array.empty     } else {       // This is the sample size we need to have roughly balanced output partitions, capped at 1M.       // ***采樣數(shù)量不能超過(guò)1M。比如,如果分區(qū)是5,采樣數(shù)為100       val sampleSize = math.min(20.0 * partitions, 1e6)       // Assume the input partitions are roughly balanced and over-sample a little bit.       // 每個(gè)分區(qū)的采樣數(shù)為平均值的三倍,避免數(shù)據(jù)傾斜造成的數(shù)據(jù)量過(guò)少       val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt        // 真正的采樣算法(參數(shù)1:rdd的key數(shù)組, 采樣個(gè)數(shù))       val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)       if (numItems == 0L) {         Array.empty       } else {         // If a partition contains much more than the average number of items, we re-sample from it         // to ensure that enough items are collected from that partition.         // 如果有的分區(qū)包含的數(shù)量遠(yuǎn)超過(guò)平均值,那么需要對(duì)它重新采樣。每個(gè)分區(qū)的采樣數(shù)/采樣返回的總的記錄數(shù)         val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)         //保存有效的采樣數(shù)         val candidates = ArrayBuffer.empty[(K, Float)]         //保存數(shù)據(jù)傾斜導(dǎo)致的采樣數(shù)過(guò)多的信息         val imbalancedPartitions = mutable.Set.empty[Int]          sketched.foreach { case (idx, n, sample) =>           if (fraction * n > sampleSizePerPartition) {             imbalancedPartitions += idx           } else {             // The weight is 1 over the sampling probability.             val weight = (n.toDouble / sample.size).toFloat             for (key <- sample) {               candidates += ((key, weight))             }           }         }         if (imbalancedPartitions.nonEmpty) {           // Re-sample imbalanced partitions with the desired sampling probability.           val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)           val seed = byteswap32(-rdd.id - 1)           //基于RDD獲取采樣數(shù)據(jù)           val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()           val weight = (1.0 / fraction).toFloat           candidates ++= reSampled.map(x => (x, weight))         }         RangePartitioner.determineBounds(candidates, partitions)       }     }   }      def sketch[K : ClassTag](       rdd: RDD[K],       sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {     val shift = rdd.id     // val classTagK = classTag[K] // to avoid serializing the entire partitioner object     val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>       val seed = byteswap32(idx ^ (shift << 16))       val (sample, n) = SamplingUtils.reservoirSampleAndCount(         iter, sampleSizePerPartition, seed)       //包裝成三元組,(索引號(hào),分區(qū)的內(nèi)容個(gè)數(shù),抽樣的內(nèi)容)       Iterator((idx, n, sample))     }.collect()     val numItems = sketched.map(_._2).sum     //返回(數(shù)據(jù)條數(shù),(索引號(hào),分區(qū)的內(nèi)容個(gè)數(shù),抽樣的內(nèi)容))     (numItems, sketched)   }

真正的抽樣算法在SamplingUtils中,由于在Spark中是需要一次性取多個(gè)值的,因此直接去前n個(gè)數(shù)值,然后依次概率替換即可:

def reservoirSampleAndCount[T: ClassTag](       input: Iterator[T],       k: Int,       seed: Long = Random.nextLong())     : (Array[T], Long) = {     //創(chuàng)建臨時(shí)數(shù)組     val reservoir = new Array[T](k)     // Put the first k elements in the reservoir.     // 取出前k個(gè)數(shù),并把對(duì)應(yīng)的rdd中的數(shù)據(jù)放入對(duì)應(yīng)的序號(hào)的數(shù)組中     var i = 0     while (i < k && input.hasNext) {       val item = input.next()       reservoir(i) = item       i += 1     }      // If we have consumed all the elements, return them. Otherwise do the replacement.     // 如果全部的元素,比要抽取的采樣數(shù)少,那么直接返回     if (i < k) {       // If input size < k, trim the array to return only an array of input size.       val trimReservoir = new Array[T](i)       System.arraycopy(reservoir, 0, trimReservoir, 0, i)       (trimReservoir, i)      // 否則開(kāi)始抽樣替換     } else {       // If input size > k, continue the sampling process.       // 從剛才的序號(hào)開(kāi)始,繼續(xù)遍歷       var l = i.toLong       // 隨機(jī)數(shù)       val rand = new XORShiftRandom(seed)       while (input.hasNext) {         val item = input.next()         // 隨機(jī)一個(gè)數(shù)與當(dāng)前的l相乘,如果小于采樣數(shù)k,就替換。(越到后面,替換的概率越小...)         val replacementIndex = (rand.nextDouble() * l).toLong         if (replacementIndex < k) {           reservoir(replacementIndex.toInt) = item         }         l += 1       }       (reservoir, l)     }   }

確定邊界

最后就可以通過(guò)獲取的樣本數(shù)據(jù),確定邊界了。

def determineBounds[K : Ordering : ClassTag](       candidates: ArrayBuffer[(K, Float)],       partitions: Int): Array[K] = {     val ordering = implicitly[Ordering[K]]     // 數(shù)據(jù)格式為(key,權(quán)重)     val ordered = candidates.sortBy(_._1)     val numCandidates = ordered.size     val sumWeights = ordered.map(_._2.toDouble).sum     val step = sumWeights / partitions     var cumWeight = 0.0     var target = step     val bounds = ArrayBuffer.empty[K]     var i = 0     var j = 0     var previousBound = Option.empty[K]     while ((i < numCandidates) && (j < partitions - 1)) {       val (key, weight) = ordered(i)       cumWeight += weight       if (cumWeight >= target) {         // Skip duplicate values.         if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {           bounds += key           target += step           j += 1           previousBound = Some(key)         }       }       i += 1     }     bounds.toArray   }

直接看代碼,還是有些晦澀難懂,我們舉個(gè)例子,一步一步解釋下:

Spark中分區(qū)器的作用是什么

按照上面的算法流程,大致可以理解:

抽樣-->確定邊界(排序)

首先對(duì)spark有一定了解的都應(yīng)該知道,在spark中每個(gè)RDD可以理解為一組分區(qū),這些分區(qū)對(duì)應(yīng)了內(nèi)存塊block,他們才是數(shù)據(jù)最終的載體。那么一個(gè)RDD由不同的分區(qū)組成,這樣在處理一些map,filter等算子的時(shí)候,就可以直接以分區(qū)為單位并行計(jì)算了。直到遇到shuffle的時(shí)候才需要和其他的RDD配合。

在上面的圖中,如果我們不特殊設(shè)置的話,一個(gè)RDD由3個(gè)分區(qū)組成,那么在對(duì)它進(jìn)行g(shù)roupbykey的時(shí)候,就會(huì)按照3進(jìn)行分區(qū)。

按照上面的算法流程,如果分區(qū)數(shù)為3,那么采樣的大小為:

val sampleSize = math.min(20.0 * partitions, 1e6)

即采樣數(shù)為60,每個(gè)分區(qū)取60個(gè)數(shù)。但是考慮到數(shù)據(jù)傾斜的情況,有的分區(qū)可能數(shù)據(jù)很多,因此在實(shí)際的采樣時(shí),會(huì)按照3倍大小采樣:

val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt

也就是說(shuō),最多會(huì)取60個(gè)樣本數(shù)據(jù)。

然后就是遍歷每個(gè)分區(qū),取對(duì)應(yīng)的樣本數(shù)。

val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>       val seed = byteswap32(idx ^ (shift << 16))       val (sample, n) = SamplingUtils.reservoirSampleAndCount(         iter, sampleSizePerPartition, seed)       //包裝成三元組,(索引號(hào),分區(qū)的內(nèi)容個(gè)數(shù),抽樣的內(nèi)容)       Iterator((idx, n, sample))     }.collect()

然后檢查,是否有分區(qū)的樣本數(shù)過(guò)多,如果多于平均值,則繼續(xù)采樣,這時(shí)直接用sample 就可以了

sketched.foreach { case (idx, n, sample) =>           if (fraction * n > sampleSizePerPartition) {             imbalancedPartitions += idx           } else {             // The weight is 1 over the sampling probability.             val weight = (n.toDouble / sample.size).toFloat             for (key <- sample) {               candidates += ((key, weight))             }           }         }         if (imbalancedPartitions.nonEmpty) {           // Re-sample imbalanced partitions with the desired sampling probability.           val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)           val seed = byteswap32(-rdd.id - 1)           //基于RDD獲取采樣數(shù)據(jù)           val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()           val weight = (1.0 / fraction).toFloat           candidates ++= reSampled.map(x => (x, weight))         }

取出樣本后,就到了確定邊界的時(shí)候了。

注意每個(gè)key都會(huì)有一個(gè)權(quán)重,這個(gè)權(quán)重是 【分區(qū)的數(shù)據(jù)總數(shù)/樣本數(shù)】

RangePartitioner.determineBounds(candidates, partitions)

首先排序val ordered = candidates.sortBy(_._1),然后確定一個(gè)權(quán)重的步長(zhǎng)

val sumWeights = ordered.map(_._2.toDouble).sum val step = sumWeights / partitions

基于該步長(zhǎng),確定邊界,***就形成了幾個(gè)范圍數(shù)據(jù)。

然后分區(qū)器形成二叉樹(shù),遍歷該數(shù)確定每個(gè)key對(duì)應(yīng)的分區(qū)id

partition = binarySearch(rangeBounds, k)

實(shí)踐 &mdash;&mdash; 自定義分區(qū)器

自定義分區(qū)器,也是很簡(jiǎn)單的,只需要實(shí)現(xiàn)對(duì)應(yīng)的兩個(gè)方法就行:

public class MyPartioner extends Partitioner {     @Override     public int numPartitions() {         return 1000;     }      @Override     public int getPartition(Object key) {         String k = (String) key;         int code = k.hashCode() % 1000;         System.out.println(k+":"+code);         return  code < 0?code+1000:code;     }      @Override     public boolean equals(Object obj) {         if(obj instanceof MyPartioner){             if(this.numPartitions()==((MyPartioner) obj).numPartitions()){                 return true;             }             return false;         }         return super.equals(obj);     } }

使用的時(shí)候,可以直接new一個(gè)對(duì)象即可。

pairRdd.groupbykey(new MyPartitioner())

上述內(nèi)容就是Spark中分區(qū)器的作用是什么,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

標(biāo)題名稱:Spark中分區(qū)器的作用是什么
文章位置:http://aaarwkj.com/article32/igegsc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供商城網(wǎng)站關(guān)鍵詞優(yōu)化、手機(jī)網(wǎng)站建設(shè)搜索引擎優(yōu)化、外貿(mào)建站、軟件開(kāi)發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作
亚洲成人久久久久久久| 亚洲另类欧美日韩中文字幕 | 麻豆蜜桃精品视频在线观看| 国产精品中文字幕有码| 成人在线一区二区三区观看| 乱熟av一区二区三区| 欧美日韩在线视频第三区| 给我搜亚洲免费播放黄色大片| 亚洲激情中文字幕av网| 九九九热精品视频在线观看| av在线中文字幕乱码| 中文字幕成人乱码亚洲| 97碰碰视频在线观看| 91激情黑丝在线观看| 欧美日韩综合在线第一页| 午夜毛片免费在线播放| 国产av综合一区二区三区最新 | 91麻豆精品国产91久5久久| 国语对白精品视频在线| 亚洲国产欧美精品综合在线| 国产精品极品网站91青青| 国产白丝诱惑在线视频| 国产精品岛国片在线观看| 粗暴蹂躏中文一区二区三区| 国产av白浆一区二区色爽黄| 九九免费在线视频观看| 日韩欧美一区二区麻豆| 朋友的尤物人妻中文字幕| 视频免费观看网站不卡| 亚洲欧美综合一区二区三区| 内射嫩国产欧美国产日韩欧美| 亚洲精品视频久久免费| av日韩在线一区二区三区| 精品亚洲午夜久久久久| 色久悠悠婷婷综合在线亚洲| 亚洲最大午夜福利视频| 国产做a爰片久久91| 蜜臀av中文字幕亚洲| 尤物视频在线观看官网| 国产精品一区二区三区久久| 偷窥偷拍原味一区二区三区|