欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

SparkStreaming反壓機(jī)制探秘-創(chuàng)新互聯(lián)

1.反壓機(jī)制原理

Spark Streaming中的反壓機(jī)制是Spark 1.5.0推出的新特性,可以根據(jù)處理效率動態(tài)調(diào)整攝入速率。

在余干等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站設(shè)計(jì)、成都做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作按需定制開發(fā),公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站設(shè)計(jì),營銷型網(wǎng)站,外貿(mào)網(wǎng)站建設(shè),余干網(wǎng)站建設(shè)費(fèi)用合理。

當(dāng)批處理時間(Batch Processing Time)大于批次間隔(Batch Interval,即 BatchDuration)時,說明處理數(shù)據(jù)的速度小于數(shù)據(jù)攝入的速度,持續(xù)時間過長或源頭數(shù)據(jù)暴增,容易造成數(shù)據(jù)在內(nèi)存中堆積,最終導(dǎo)致Executor OOM或任務(wù)奔潰。

在這種情況下,若是基于Kafka Receiver的數(shù)據(jù)源,可以通過設(shè)置spark.streaming.receiver.maxRate來控制大輸入速率;若是基于Direct的數(shù)據(jù)源(如Kafka Direct Stream),則可以通過設(shè)置spark.streaming.kafka.maxRatePerPartition來控制大輸入速率。當(dāng)然,在事先經(jīng)過壓測,且流量高峰不會超過預(yù)期的情況下,設(shè)置這些參數(shù)一般沒什么問題。但大值,不代表是最優(yōu)值,最好還能根據(jù)每個批次處理情況來動態(tài)預(yù)估下個批次最優(yōu)速率。在Spark 1.5.0以上,就可通過背壓機(jī)制來實(shí)現(xiàn)。開啟反壓機(jī)制,即設(shè)置spark.streaming.backpressure.enabled為true,Spark Streaming會自動根據(jù)處理能力來調(diào)整輸入速率,從而在流量高峰時仍能保證大的吞吐和性能。

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo

    for {
      // 處理結(jié)束時間
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      // 處理時間,即`processingEndTime` - `processingStartTime`
     workDelay <- batchCompleted.batchInfo.processingDelay
      // 在調(diào)度隊(duì)列中的等待時間,即`processingStartTime` - `submissionTime`
     waitDelay <- batchCompleted.batchInfo.schedulingDelay
      // 當(dāng)前批次處理的記錄數(shù)
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }

可以看到,接著又調(diào)用的是computeAndPublish方法,如下:

private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      // 根據(jù)處理時間、調(diào)度時間、當(dāng)前Batch記錄數(shù),預(yù)估新速率
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
      // 設(shè)置新速率
        rateLimit.set(s.toLong)
      // 發(fā)布新速率
        publish(getLatestRate())
      }
    }

更深一層,具體調(diào)用的是rateEstimator.compute方法來預(yù)估新速率,如下:

def compute(
      time: Long,
      elements: Long,
      processingDelay: Long,
      schedulingDelay: Long): Option[Double]

2.反壓機(jī)制相關(guān)參數(shù)

  • spark.streaming.backpressure.enabled
    默認(rèn)值false,是否啟用反壓機(jī)制。

  • spark.streaming.backpressure.initialRate
    默認(rèn)值無,初始大接收速率。只適用于Receiver Stream,不適用于Direct Stream。類型為整數(shù),默認(rèn)直接讀取所有,在1開啟的情況下,限制第一次批處理應(yīng)該消費(fèi)的數(shù)據(jù),因?yàn)槌绦蚶鋯雨?duì)列里面有大量積壓,防止第一次全部讀取,造成系統(tǒng)阻塞

  • spark.streaming.kafka.maxRatePerPartition
    類型為整數(shù),默認(rèn)直接讀取所有,限制每秒每個消費(fèi)線程讀取每個kafka分區(qū)大的數(shù)據(jù)量

  • spark.streaming.stopGracefullyOnShutdown
    優(yōu)雅關(guān)閉,確保在kill任務(wù)時,能夠處理完最后一批數(shù)據(jù),再關(guān)閉程序,不會發(fā)生強(qiáng)制kill導(dǎo)致數(shù)據(jù)處理中斷,沒處理完的數(shù)據(jù)丟失
注意: 只有 3 激活的時候,每次消費(fèi)的大數(shù)據(jù)量,就是設(shè)置的數(shù)據(jù)量,如果不足這個數(shù),就有多少讀多少,如果超過這個數(shù)字,就讀取這個數(shù)字的設(shè)置的值
只有 1+3 激活的時候,每次消費(fèi)讀取的數(shù)量大會等于3設(shè)置的值,最小是spark根據(jù)系統(tǒng)負(fù)載自動推斷的值,消費(fèi)的數(shù)據(jù)量會在這兩個范圍之內(nèi)變化根據(jù)系統(tǒng)情況,但第一次啟動會有多少讀多少數(shù)據(jù)。此后按 1+3 設(shè)置規(guī)則運(yùn)行
1+2+3 同時激活的時候,跟上一個消費(fèi)情況基本一樣,但第一次消費(fèi)會得到限制,因?yàn)槲覀冊O(shè)置第一次消費(fèi)的頻率了。
  • spark.streaming.backpressure.rateEstimator
    默認(rèn)值pid,速率控制器,Spark 默認(rèn)只支持此控制器,可自定義。

  • spark.streaming.backpressure.pid.proportional
    默認(rèn)值1.0,只能為非負(fù)值。當(dāng)前速率與最后一批速率之間的差值對總控制信號貢獻(xiàn)的權(quán)重。用默認(rèn)值即可。

  • spark.streaming.backpressure.pid.integral
    默認(rèn)值0.2,只能為非負(fù)值。比例誤差累積對總控制信號貢獻(xiàn)的權(quán)重。用默認(rèn)值即可。

  • spark.streaming.backpressure.pid.derived
    默認(rèn)值0.0,只能為非負(fù)值。比例誤差變化對總控制信號貢獻(xiàn)的權(quán)重。用默認(rèn)值即可。

  • spark.streaming.backpressure.pid.minRate

    默認(rèn)值100,只能為正數(shù),最小速率。

3.反壓機(jī)制的使用

//啟用反壓機(jī)制
conf.set("spark.streaming.backpressure.enabled","true")
//最小攝入條數(shù)控制
conf.set("spark.streaming.backpressure.pid.minRate","1")
//大攝入條數(shù)控制
conf.set("spark.streaming.kafka.maxRatePerPartition","12")
//初始大接收速率控制
conf.set("spark.streaming.backpressure.initialRate","10")    

要保證反壓機(jī)制真正起作用前Spark 應(yīng)用程序不會崩潰,需要控制每個批次大攝入速率。以Direct Stream為例,如Kafka Direct Stream,則可以通過spark.streaming.kafka.maxRatePerPartition參數(shù)來控制。此參數(shù)代表了 每秒每個分區(qū)大攝入的數(shù)據(jù)條數(shù)。假設(shè)BatchDuration為10秒,spark.streaming.kafka.maxRatePerPartition為12條,kafka topic 分區(qū)數(shù)為3個,則一個批(Batch)大讀取的數(shù)據(jù)條數(shù)為360條(31210=360)。同時,需要注意,該參數(shù)也代表了整個應(yīng)用生命周期中的大速率,即使是背壓調(diào)整的大值也不會超過該參數(shù)。

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。

當(dāng)前名稱:SparkStreaming反壓機(jī)制探秘-創(chuàng)新互聯(lián)
文章位置:http://aaarwkj.com/article4/jdsoe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信小程序、云服務(wù)器營銷型網(wǎng)站建設(shè)、網(wǎng)站營銷、網(wǎng)站維護(hù)、微信公眾號

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站建設(shè)
高清白嫩学生自拍视频| 精品欧美一区二区三区在线| 日韩午夜电影一区二区三区| 亚洲精品欧美综合二区| 一本久久精品午夜福利| 日本一区二区日本一区| 精品熟女少妇av免费观看| 亚洲免费精品一区二区三区四区| 熟妇一区二区三区av| 国产粉嫩一区二区三区在线观看| 麻豆精品国产一区二区91| 扒开少妇毛茸茸的大荫萍蒂| 特黄特色的日本大片| 一区三区精品久久久精品| 尤物视频在线观看一下| 亚洲精品成人免费电影| 国产老熟女高潮精品视频网站免费| 久久超碰一区二区三区| 欧美日韩精品一区二区三| 日韩精品视频在线不卡播放| 精品爆白浆一区二区三区| 亚洲精品中文字幕日本乱码 | 综合久久精品亚洲天堂| 日韩视频免费看一区二区| 精品人妻少妇一区二区三区| 国产色综合一区二区| 丰满人妻一区二区三区免费| 色哟哟精品丝袜一区二区| 亚洲高清有码在线观看| 国产激情av网站在线观看 | 久久精品国产久精国产爱| 亚洲免费麻豆一区二区三区| 国产日韩精品一区二区在线| 亚洲国产传媒在线观看| 中文字幕在线看精品乱码| 亚洲精品91在线中文字幕| 免费中文字幕av电影| 日本一道本不卡一区二区| 在线国产一区二区不卡| 亚洲av日韩av一区| 日韩精品中文字幕有码|