欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

為什么Spark的Broadcast要用單例模式

這篇文章給大家介紹為什么Spark 的Broadcast要用單例模式,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

網(wǎng)站建設(shè)哪家好,找成都創(chuàng)新互聯(lián)公司!專注于網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、小程序開發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了城區(qū)免費(fèi)建站歡迎大家使用!

很多用Spark Streaming  的朋友應(yīng)該使用過(guò)broadcast,大多數(shù)情況下廣播變量都是以單例模式聲明的有沒(méi)有粉絲想過(guò)為什么?浪尖在這里幫大家分析一下,有以下幾個(gè)原因:

  1. 廣播變量大多數(shù)情況下是不會(huì)變更的,使用單例模式可以減少spark streaming每次job生成執(zhí)行,重復(fù)生成廣播變量帶來(lái)的開銷。

  2. 單例模式也要做同步。這個(gè)對(duì)于很多新手來(lái)說(shuō)可以不用考慮同步問(wèn)題,原因很簡(jiǎn)單因?yàn)樾率植粫?huì)調(diào)整spark  程序task的調(diào)度模式,而默認(rèn)采用FIFO的調(diào)度模式,基本不會(huì)產(chǎn)生并發(fā)問(wèn)題。1).假如你配置了Fair調(diào)度模式,同時(shí)修改了Spark  Streaming運(yùn)行的并行執(zhí)行的job數(shù),默認(rèn)為1,那么就要加上同步代碼了。2).還有一個(gè)原因,在多輸出流的情況下共享broadcast,同時(shí)配置了Fair調(diào)度模式,也會(huì)產(chǎn)生并發(fā)問(wèn)題。

  3. 注意。有些時(shí)候比如廣播配置文件,規(guī)則等需要變更broadcast,在使用fair的時(shí)候可以在foreachrdd里面使用局部變量作為廣播,避免相互干擾。

先看例子,后面逐步揭曉內(nèi)部機(jī)制。

1.例子

下面是一個(gè)雙重檢查式的broadcast變量的聲明方式。

object WordBlacklist {    @volatile private var instance: Broadcast[Seq[String]] = null    def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {     if (instance == null) {       synchronized {         if (instance == null) {           val wordBlacklist = Seq("a", "b", "c")           instance = sc.broadcast(wordBlacklist)         }       }     }     instance   } }

廣播變量的使用方法如下:

val lines = ssc.socketTextStream(ip, port)     val words = lines.flatMap(_.split(" "))     val wordCounts = words.map((_, 1)).reduceByKey(_ + _)     wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>       // Get or register the blacklist Broadcast       val blacklist = WordBlacklist.getInstance(rdd.sparkContext)       // Get or register the droppedWordsCounter Accumulator       val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)       // Use blacklist to drop words and use droppedWordsCounter to count them       val counts = rdd.filter { case (word, count) =>         if (blacklist.value.contains(word)) {           droppedWordsCounter.add(count)           false         } else {           true         }       }.collect().mkString("[", ", ", "]")       val output = s"Counts at time $time $counts"       println(output)       println(s"Dropped ${droppedWordsCounter.value} word(s) totally")       println(s"Appending to ${outputFile.getAbsolutePath}")       Files.append(output + "\n", outputFile, Charset.defaultCharset())     }

2.概念補(bǔ)充

為什么Spark 的Broadcast要用單例模式

首先,一個(gè)基本概念就是Spark應(yīng)用程序從開始提交到task執(zhí)行分了很多層。

  1. 應(yīng)用調(diào)度器。主要是資源管理器,比如standalone,yarn等負(fù)責(zé)Spark整個(gè)應(yīng)用的調(diào)度和集群資源的管理。

  2. job調(diào)度器。spark  的算子分為主要兩大類,transform和action,其中每一個(gè)action都會(huì)產(chǎn)生一個(gè)job。這個(gè)job需要在executor提供的資源池里調(diào)度執(zhí)行,當(dāng)然并不少直接調(diào)度執(zhí)行job。

  3. stage劃分及調(diào)度。job具體會(huì)劃分為若干stage,這個(gè)就有一個(gè)基本的概念就是寬依賴和窄依賴,寬依賴就會(huì)劃分stage。stage也需要調(diào)度執(zhí)行,從后往前劃分,從前往后調(diào)度執(zhí)行。

  4. task切割及調(diào)度。stage往下繼續(xù)細(xì)化就是會(huì)根據(jù)不太的并行度劃分出task集合,這個(gè)就是在executor上調(diào)度執(zhí)行的基本單元,目前的調(diào)度默認(rèn)是一個(gè)task一個(gè)cpu。

  5. Spark Streaming 的job生成是周期性的。當(dāng)前job的執(zhí)行時(shí)間超過(guò)生成周期就會(huì)產(chǎn)生job  累加。累加一定數(shù)目的job后有可能會(huì)導(dǎo)致應(yīng)用程序失敗。這個(gè)主要原因是由于FIFO的調(diào)度模式和Spark Streaming的默認(rèn)單線程的job執(zhí)行機(jī)制

3.Spark Streaming job生成

這個(gè)源碼主要入口是StreamingContext#JobScheduler#JobGenerator對(duì)象,內(nèi)部有個(gè)RecurringTimer,主要負(fù)責(zé)按照批處理時(shí)間周期產(chǎn)生GenrateJobs事件,當(dāng)然在存在windows的情況下,該周期有可能不會(huì)生成job,要取決于滑動(dòng)間隔,有興趣自己去揭秘,浪尖星球里分享的視頻教程里講到了。具體代碼塊如下

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

我們直接看其實(shí)現(xiàn)代碼塊:

eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {       override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)        override protected def onError(e: Throwable): Unit = {         jobScheduler.reportError("Error in job generator", e)       }     }     eventLoop.start()

event處理函數(shù)是processEvent方法

/** Processes all events */   private def processEvent(event: JobGeneratorEvent) {     logDebug("Got event " + event)     event match {       case GenerateJobs(time) => generateJobs(time)       case ClearMetadata(time) => clearMetadata(time)       case DoCheckpoint(time, clearCheckpointDataLater) =>         doCheckpoint(time, clearCheckpointDataLater)       case ClearCheckpointData(time) => clearCheckpointData(time)     }   }

在接受到GenerateJob事件的時(shí)候,會(huì)執(zhí)行g(shù)enerateJobs代碼,就是在該代碼內(nèi)部產(chǎn)生和調(diào)度job的。

/** Generate jobs and perform checkpointing for the given `time`.  */   private def generateJobs(time: Time) {     // Checkpoint all RDDs marked for checkpointing to ensure their lineages are     // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).     ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")     Try {       jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch       graph.generateJobs(time) // generate jobs using allocated block     } match {       case Success(jobs) =>         val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)         jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))       case Failure(e) =>         jobScheduler.reportError("Error generating jobs for time " + time, e)         PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)     }     eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))   }

可以看到代碼里首先會(huì)執(zhí)行job生成代碼

graph.generateJobs(time)  具體代碼塊兒  def generateJobs(time: Time): Seq[Job] = {     logDebug("Generating jobs for time " + time)     val jobs = this.synchronized {       outputStreams.flatMap { outputStream =>         val jobOption = outputStream.generateJob(time)         jobOption.foreach(_.setCallSite(outputStream.creationSite))         jobOption       }     }     logDebug("Generated " + jobs.length + " jobs for time " + time)     jobs   }

每個(gè)輸出流都會(huì)生成一個(gè)job,輸出流就類似于foreachrdd,print這些。其實(shí)內(nèi)部都是ForEachDStream。所以生成的是一個(gè)job集合。

然后就會(huì)將job集合提交到線程池里去執(zhí)行,這些都是在driver端完成的哦。

jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))  具體h函數(shù)內(nèi)容 def submitJobSet(jobSet: JobSet) {     if (jobSet.jobs.isEmpty) {       logInfo("No jobs added for time " + jobSet.time)     } else {       listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))       jobSets.put(jobSet.time, jobSet)       jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))       logInfo("Added jobs for time " + jobSet.time)     }   }

其實(shí)就是遍歷生成的job集合,然后提交到線程池jobExecutor內(nèi)部執(zhí)行。這個(gè)也是在driver端的哦。

jobExecutor就是一個(gè)固定線程數(shù)的線程池,默認(rèn)是1個(gè)線程。

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)   private val jobExecutor =     ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

需要的話可以配置spark.streaming.concurrentJobs來(lái)同時(shí)提交執(zhí)行多個(gè)job。

那么這種情況下,job就可以并行執(zhí)行了嗎?

顯然不是的!

還要修改一下調(diào)度模式為Fair

簡(jiǎn)單的均分的話只需要

conf.set("spark.scheduler.mode", "FAIR")

然后,同時(shí)運(yùn)行的job就會(huì)均分所有executor提供的資源。

這就是整個(gè)job生成的整個(gè)過(guò)程了哦。

因?yàn)镾park Streaming的任務(wù)存在Fair模式下并發(fā)的情況,所以需要在使用單例模式生成broadcast的時(shí)候要注意聲明同步。

關(guān)于為什么Spark 的Broadcast要用單例模式就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。

網(wǎng)站欄目:為什么Spark的Broadcast要用單例模式
網(wǎng)站路徑:http://aaarwkj.com/article0/gjijio.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供云服務(wù)器品牌網(wǎng)站建設(shè)、品牌網(wǎng)站設(shè)計(jì)、移動(dòng)網(wǎng)站建設(shè)域名注冊(cè)、網(wǎng)站內(nèi)鏈

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都做網(wǎng)站
日韩不卡免费在线视频| 青青草成年人免费视频| 三级av电影中文字幕| 大片天天看菲色亚洲黄色| 男女在线视频网站免费| 一区二区三区在线观看日韩| 欧美日韩国产免费,日日骚| 视频在线免费观看97| 91在线视频欧美国产| 日韩av高清在线播放| 熟女俱乐部五十路六十路| 亚洲国产成人欧美日韩另类| 国模在线视频一区二区| 精品啪啪高潮一区二区| 亚洲国产熟女导航网站视频| 97资源视频在线播放| 色婷婷av一区二区三区张| 亚洲天堂国产成人精品| 国产视频一区二区三区网| 亚洲国产精品中文字幕一区久久| 国产一区二区主播不卡| 传媒视频在线免费观看| 日韩一区二区三区不卡| 日本欧美高清一区二区| 国内精品偷拍一区二区三区| 小草少妇视频免费看视频| 亚洲一区在线观看激情| 亚洲av欧美日韩国产| 亚洲最大午夜福利视频| 激情偷拍一区二区三区视频| 国语对白视频在线观看| a一级成人插少妇的逼| 国产尹人99大香蕉| 国产精品传媒免费在线观看| 精品国产一区二区三区卡| 国产又粗又长在线视频| 国产传媒免费在线播放| 日本人妻内射一区二区| 美女福利视频一区二区| 色呦呦一区二区三区视频| 九九精品在线观看视频|