小編給大家分享一下Spark的廣播變量和累加器怎么用,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
江寧ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場(chǎng)景,ssl證書(shū)未來(lái)市場(chǎng)廣闊!成為成都創(chuàng)新互聯(lián)公司的ssl證書(shū)銷(xiāo)售渠道,可以享受市場(chǎng)價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:18982081108(備注:SSL證書(shū)合作)期待與您的合作!
一、廣播變量和累加器
通常情況下,當(dāng)向Spark操作(如map,reduce)傳遞一個(gè)函數(shù)時(shí),它會(huì)在一個(gè)遠(yuǎn)程集群節(jié)點(diǎn)上執(zhí)行,它會(huì)使用函數(shù)中所有變量的副本。這些變量被復(fù)制到所有的機(jī)器上,遠(yuǎn)程機(jī)器上并沒(méi)有被更新的變量會(huì)向驅(qū)動(dòng)程序回傳。在任務(wù)之間使用通用的,支持讀寫(xiě)的共享變量是低效的。盡管如此,Spark提供了兩種有限類(lèi)型的共享變量,廣播變量和累加器。
1.1 廣播變量:
廣播變量允許程序員將一個(gè)只讀的變量緩存在每臺(tái)機(jī)器上,而不用在任務(wù)之間傳遞變量。廣播變量可被用于有效地給每個(gè)節(jié)點(diǎn)一個(gè)大輸入數(shù)據(jù)集的副本。Spark還嘗試使用高效地廣播算法來(lái)分發(fā)變量,進(jìn)而減少通信的開(kāi)銷(xiāo)。
Spark的動(dòng)作通過(guò)一系列的步驟執(zhí)行,這些步驟由分布式的shuffle操作分開(kāi)。Spark自動(dòng)地廣播每個(gè)步驟每個(gè)任務(wù)需要的通用數(shù)據(jù)。這些廣播數(shù)據(jù)被序列化地緩存,在運(yùn)行任務(wù)之前被反序列化出來(lái)。這意味著當(dāng)我們需要在多個(gè)階段的任務(wù)之間使用相同的數(shù)據(jù),或者以反序列化形式緩存數(shù)據(jù)是十分重要的時(shí)候,顯式地創(chuàng)建廣播變量才有用。
通過(guò)在一個(gè)變量v上調(diào)用SparkContext.broadcast(v)可以創(chuàng)建廣播變量。廣播變量是圍繞著v的封裝,可以通過(guò)value方法訪問(wèn)這個(gè)變量。舉例如下:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)
在創(chuàng)建了廣播變量之后,在集群上的所有函數(shù)中應(yīng)該使用它來(lái)替代使用v.這樣v就不會(huì)不止一次地在節(jié)點(diǎn)之間傳輸了。另外,為了確保所有的節(jié)點(diǎn)獲得相同的變量,對(duì)象v在被廣播之后就不應(yīng)該再修改。
1.2 累加器:
累加器是僅僅被相關(guān)操作累加的變量,因此可以在并行中被有效地支持。它可以被用來(lái)實(shí)現(xiàn)計(jì)數(shù)器和總和。Spark原生地只支持?jǐn)?shù)字類(lèi)型的累加器,編程者可以添加新類(lèi)型的支持。如果創(chuàng)建累加器時(shí)指定了名字,可以在Spark的UI界面看到。這有利于理解每個(gè)執(zhí)行階段的進(jìn)程。(對(duì)于python還不支持)
累加器通過(guò)對(duì)一個(gè)初始化了的變量v調(diào)用SparkContext.accumulator(v)來(lái)創(chuàng)建。在集群上運(yùn)行的任務(wù)可以通過(guò)add或者”+=”方法在累加器上進(jìn)行累加操作。但是,它們不能讀取它的值。只有驅(qū)動(dòng)程序能夠讀取它的值,通過(guò)累加器的value方法。
下面的代碼展示了如何把一個(gè)數(shù)組中的所有元素累加到累加器上:
scala> val accum = sc.accumulator(0, "My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Int = 10
盡管上面的例子使用了內(nèi)置支持的累加器類(lèi)型Int,但是開(kāi)發(fā)人員也可以通過(guò)繼承AccumulatorParam類(lèi)來(lái)創(chuàng)建它們自己的累加器類(lèi)型。AccumulatorParam接口有兩個(gè)方法:
zero方法為你的類(lèi)型提供一個(gè)0值。
addInPlace方法將兩個(gè)值相加。
假設(shè)我們有一個(gè)代表數(shù)學(xué)vector的Vector類(lèi)。我們可以向下面這樣實(shí)現(xiàn):
object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = { v1 += v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
在Scala里,Spark提供更通用的累加接口來(lái)累加數(shù)據(jù),盡管結(jié)果的類(lèi)型和累加的數(shù)據(jù)類(lèi)型可能不一致(例如,通過(guò)收集在一起的元素來(lái)創(chuàng)建一個(gè)列表)。同時(shí),SparkContext..accumulableCollection方法來(lái)累加通用的Scala的集合類(lèi)型。
累加器僅僅在動(dòng)作操作內(nèi)部被更新,Spark保證每個(gè)任務(wù)在累加器上的更新操作只被執(zhí)行一次,也就是說(shuō),重啟任務(wù)也不會(huì)更新。在轉(zhuǎn)換操作中,用戶(hù)必須意識(shí)到每個(gè)任務(wù)對(duì)累加器的更新操作可能被不只一次執(zhí)行,如果重新執(zhí)行了任務(wù)和作業(yè)的階段。
累加器并沒(méi)有改變Spark的惰性求值模型。如果它們被RDD上的操作更新,它們的值只有當(dāng)RDD因?yàn)閯?dòng)作操作被計(jì)算時(shí)才被更新。因此,當(dāng)執(zhí)行一個(gè)惰性的轉(zhuǎn)換操作,比如map時(shí),不能保證對(duì)累加器值的更新被實(shí)際執(zhí)行了。下面的代碼片段演示了此特性:
val accum = sc.accumulator(0) data.map { x => accum += x; f(x) } //在這里,accum的值仍然是0,因?yàn)闆](méi)有動(dòng)作操作引起map被實(shí)際的計(jì)算.
二.Java和Scala版本的實(shí)戰(zhàn)演示
2.1 Java版本:
/** * 實(shí)例:利用廣播進(jìn)行黑名單過(guò)濾! * 檢查新的數(shù)據(jù) 根據(jù)是否在廣播變量-黑名單內(nèi),從而實(shí)現(xiàn)過(guò)濾數(shù)據(jù)。 */ public class BroadcastAccumulator { /** * 創(chuàng)建一個(gè)List的廣播變量 * */ private static volatile Broadcast<List<String>> broadcastList = null; /** * 計(jì)數(shù)器! */ private static volatile Accumulator<Integer> accumulator = null; public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[2]"). setAppName("WordCountOnlineBroadcast"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5)); /** * 注意:分發(fā)廣播需要一個(gè)action操作觸發(fā)。 * 注意:廣播的是Arrays的asList 而非對(duì)象的引用。廣播Array數(shù)組的對(duì)象引用會(huì)出錯(cuò)。 * 使用broadcast廣播黑名單到每個(gè)Executor中! */ broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive")); /** * 累加器作為全局計(jì)數(shù)器!用于統(tǒng)計(jì)在線過(guò)濾了多少個(gè)黑名單! * 在這里實(shí)例化。 */ accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter"); JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999); /** * 這里省去flatmap因?yàn)槊麊问且粋€(gè)個(gè)的! */ JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) { return v1 + v2; } }); /** * Funtion里面 前幾個(gè)參數(shù)是 入?yún)ⅰ? * 后面的出參。 * 體現(xiàn)在call方法里面! * */ wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() { @Override public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception { rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() { @Override public Boolean call(Tuple2<String, Integer> wordPair) throws Exception { if (broadcastList.value().contains(wordPair._1)) { /** * accumulator不僅僅用來(lái)計(jì)數(shù)。 * 可以同時(shí)寫(xiě)進(jìn)數(shù)據(jù)庫(kù)或者緩存中。 */ accumulator.add(wordPair._2); return false; }else { return true; } }; /** * 廣播和計(jì)數(shù)器的執(zhí)行,需要進(jìn)行一個(gè)action操作! */ }).collect(); System.out.println("廣播器里面的值"+broadcastList.value()); System.out.println("計(jì)時(shí)器里面的值"+accumulator.value()); return null; } }); jsc.start(); jsc.awaitTermination(); jsc.close(); } }
2.2 Scala版本
package com.Streaming import java.util import org.apache.spark.streaming.{Duration, StreamingContext} import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf} import org.apache.spark.broadcast.Broadcast /** * Created by lxh on 2016/6/30. */ object BroadcastAccumulatorStreaming { /** * 聲明一個(gè)廣播和累加器! */ private var broadcastList:Broadcast[List[String]] = _ private var accumulator:Accumulator[Int] = _ def main(args: Array[String]) { val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest") val sc = new SparkContext(sparkConf) /** * duration是ms */ val ssc = new StreamingContext(sc,Duration(2000)) // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark")) broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark")) accumulator= ssc.sparkContext.accumulator(0,"broadcasttest") /** * 獲取數(shù)據(jù)! */ val lines = ssc.socketTextStream("localhost",9999) /** * 1.flatmap把行分割成詞。 * 2.map把詞變成tuple(word,1) * 3.reducebykey累加value * (4.sortBykey排名) * 4.進(jìn)行過(guò)濾。 value是否在累加器中。 * 5.打印顯示。 */ val words = lines.flatMap(line => line.split(" ")) val wordpair = words.map(word => (word,1)) wordpair.filter(record => {broadcastList.value.contains(record._1)}) val pair = wordpair.reduceByKey(_+_) /** * 這個(gè)pair 是PairDStream<String, Integer> * 查看這個(gè)id是否在黑名單中,如果是的話,累加器就+1 */ /* pair.foreachRDD(rdd => { rdd.filter(record => { if (broadcastList.value.contains(record._1)) { accumulator.add(1) return true } else { return false } }) })*/ val filtedpair = pair.filter(record => { if (broadcastList.value.contains(record._1)) { accumulator.add(record._2) true } else { false } }).print println("累加器的值"+accumulator.value) // pair.filter(record => {broadcastList.value.contains(record._1)}) /* val keypair = pair.map(pair => (pair._2,pair._1))*/ /** * 如果DStream自己沒(méi)有某個(gè)算子操作。就通過(guò)轉(zhuǎn)化transform! */ /* keypair.transform(rdd => { rdd.sortByKey(false)//TODO })*/ pair.print() ssc.start() ssc.awaitTermination() } }
以上是“Spark的廣播變量和累加器怎么用”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
網(wǎng)站欄目:Spark的廣播變量和累加器怎么用
鏈接分享:http://aaarwkj.com/article10/iggcgo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站制作、、網(wǎng)站排名、網(wǎng)站制作、網(wǎng)站營(yíng)銷(xiāo)、標(biāo)簽優(yōu)化
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)