欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

第43課:Spark1.6RPC內(nèi)幕解密:運行機制、源碼詳解、Netty與Akka等

  Spark 是分布式計算框架,多臺機器之間必然存在著通信。Spark在早期版本采用Akka實現(xiàn)?,F(xiàn)在在Akka的上層抽象出了一個RpcEnv。RpcEnv負責管理機器之間的通信。

成都創(chuàng)新互聯(lián)從2013年開始,是專業(yè)互聯(lián)網(wǎng)技術服務公司,擁有項目成都網(wǎng)站制作、做網(wǎng)站網(wǎng)站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元濮陽做網(wǎng)站,已為上家服務,為濮陽各地企業(yè)和個人服務,聯(lián)系電話:18982081108

RpcEnv包含了如下三大核心:

  • RpcEndpoint 消息循環(huán)體,負責接收并處理消息。Spark中的Master、Worker都是RpcEndpoint 。

  • RpcEndpointRef :RpcEndpoint的引用,如果需要和RpcEndpoint通信,就必須獲取它的RpcEndpointRef,通過RpcEndpointRef發(fā)送消息。

  • Dispatcher:消息調(diào)度器,負責RPC消息路由到適當?shù)腞pcEndpoint。

RpcEnv被創(chuàng)建以后,RpcEndpoint可以注冊到RpcEnv中,被注冊的RpcEndpoint會生成一個相應的RpcEndpointRef來引用它。如果你需要向RpcEndpoint發(fā)送消息,必須到RpcEnv中通過RpcEndpoint的名稱來獲取對應的RpcEndpointRef,然后通過RpcEndpointRef向RpcEndpoint發(fā)送消息。

RpcEnv負責管理RpcEndpoint的整個生命周期

  • 注冊RpcEndpoint,使用name或者uri

  • 路由發(fā)送給RpcEndpoint的消息。

  • 停止RpcEndpoint

注:一個RpcEndpoint只能注冊給一個RpcEnv

RpcAddress:RpcEnv的邏輯地址,使用主機名和端口表示。

RpcEndpointAddress:注冊到RpcEnv上的RpcEndpoint的地址,由RpcAddress和name構成。

由此可見RpcEnv和RpcEndpoint是在相同的機器上(相同的JVM中)。而要想給遠端機器發(fā)送消息,是獲取遠端機器的RpcEndpointRef,而并不是遠端的RpcEndpoint注冊到本地的RpcEnv中。

在Spark1.6版本中,默認使用的是netty

private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
  val rpcEnvNames = Map(
    "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
    "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
  val rpcEnvName = conf.get("spark.rpc", "netty")
  val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
  Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
}

RpcEndpoint是一個消息循環(huán)體,它的生命周期:

構造(Constructor)->啟動(onStart)->消息接收(receive&receiveAndReply)->停止(onStop)

receive():不斷的運行,處理客戶端發(fā)送過來的消息。

receiveAndReply():處理消息,并且回應對方。

我們看一下Master的代碼:

def main(argStrings: Array[String]) {
  SignalLogger.register(log)
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  //指定的主機名必須是start-master.sh腳本運行的本地機器名稱
  val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
  rpcEnv.awaitTermination()
}

/**
 * Start the Master and return a three tuple of:
 *   (1) The Master RpcEnv
 *   (2) The web UI bound port
 *   (3) The REST server bound port, if any
 */
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  //創(chuàng)建Rpc環(huán)境,主機名和端口就是Standalone集群的訪問地址。SYSTEM_NAME=sparkMaster
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  // 將Master實例注冊到RpcEnv中
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}

在main方法中創(chuàng)建了RpcEnv,并且實例化Master實例,然后注冊到RpcEnv中。

RpcEndpoint其實是注冊到Dispatcher中的,在netty中的代碼實現(xiàn)如下:

override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}

注:NettyRpcEnv.scala的第135行

而Dispatcher中使用如下數(shù)據(jù)結構來存儲RpcEndpoint和RpcEndpointRef

private val endpoints = new ConcurrentHashMap[String, EndpointData]
private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

EndpointData為一個case class:

private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint)
}

在Master中使用數(shù)據(jù)結構WorkerInfo保存著每個Worker的信息,其中就包括每個Worker的RpcEndpointRef

第43課:Spark 1.6 RPC內(nèi)幕解密:運行機制、源碼詳解、Netty與Akka等

備注:

1、DT大數(shù)據(jù)夢工廠微信公眾號DT_Spark 
2、IMF晚8點大數(shù)據(jù)實戰(zhàn)YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains

網(wǎng)頁題目:第43課:Spark1.6RPC內(nèi)幕解密:運行機制、源碼詳解、Netty與Akka等
本文來源:http://aaarwkj.com/article36/gpjjsg.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供定制網(wǎng)站、網(wǎng)頁設計公司、微信小程序網(wǎng)站內(nèi)鏈、商城網(wǎng)站網(wǎng)站改版

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站建設
国产超码片内射在线| 成年人午夜在线观看网址| 国产亚洲日本一区二区三区| 日韩 在线一区二区| 中国亚洲视频一区二区| 亚洲成av人在线播放| 久热精品视频在线观看| 国产第一页第二页在线| 国产精品综合久久久久久| 九九热精品只有这里有| 国产精品人妻在线av| 久章草在线免费视频播放| 中文字幕国产成人在线视频| 蜜臀av网站在线播放| 尤物视频最新在线观看| 字幕日本欧美一区二区| 日本人妻丰满熟妇久久| 国产产品在线免费看91| 国产精品伦一区二区三级| 91一区二区亚洲嫩草| 老熟妇奂伦一区二区三区| 亚洲自偷精品视频自拍| 人妻熟女一区二区视频| 国产精品国产三级国产av丨 | 十八女毛片一区二区三区| 国产av不卡二区三区| 国产又大又长又粗又硬又猛| 免费毛片一区二区三区四区| 欧美人妻不卡一区二区久久| 免费观看国产裸体视频| 国产精品一区二区久久毛片| 一区二区三区在线观看日韩| 88国产精品久久久久久| 精彩国产av一区二区三区| 人妻上司无奈中文字幕| 天天操夜夜操夜夜操精品| av一级免费在线观看| 日本高清免费黄色录像| 国产精品av国产精华液| 91国产精品视频在线| 亚洲一区二区三区不卡视频|