欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

怎樣簡談Kafka中的NIO網(wǎng)絡(luò)通信模型

這篇文章將為大家詳細(xì)講解有關(guān)怎樣簡談Kafka中的NIO網(wǎng)絡(luò)通信模型,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

在潼南等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供做網(wǎng)站、網(wǎng)站設(shè)計 網(wǎng)站設(shè)計制作定制制作,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),成都營銷網(wǎng)站建設(shè),外貿(mào)網(wǎng)站建設(shè),潼南網(wǎng)站建設(shè)費(fèi)用合理。

摘要:很多人喜歡把RocketMQ與Kafka做對比,其實(shí)這兩款消息隊列的網(wǎng)絡(luò)通信層還是比較相似的,小編就為大家簡要地介紹下Kafka的NIO網(wǎng)絡(luò)通信模型

下面主要通過對Kafka源碼的分析來簡述其Reactor的多線程網(wǎng)絡(luò)通信模型和總體框架結(jié)構(gòu),同時簡要介紹Kafka網(wǎng)絡(luò)通信層的設(shè)計與具體實(shí)現(xiàn)。

一、Kafka網(wǎng)絡(luò)通信模型的整體框架概述

Kafka的網(wǎng)絡(luò)通信模型是基于NIO的Reactor多線程模型來設(shè)計的。這里先引用Kafka源碼中注釋的一段話:

An NIO socket server. The threading model is 1 Acceptor thread that handles new connections. Acceptor has N Processor threads that each have their own selector and read requests from sockets. M Handler threads that handle requests and produce responses back to the processor threads for writing.

相信大家看了上面的這段引文注釋后,大致可以了解到Kafka的網(wǎng)絡(luò)通信層模型,主要采用了 1(1個Acceptor線程)+N(N個Processor線程)+M(M個業(yè)務(wù)處理線程) 。下面的表格簡要的列舉了下(這里先簡單的看下后面還會詳細(xì)說明):

線程數(shù)線程名線程具體說明1kafka-socket-acceptor_%xAcceptor線程,負(fù)責(zé)監(jiān)聽Client端發(fā)起的請求Nkafka-network-thread_%dProcessor線程,負(fù)責(zé)對Socket進(jìn)行讀寫Mkafka-request-handler-_%dWorker線程,處理具體的業(yè)務(wù)邏輯并生成Response返回

Kafka網(wǎng)絡(luò)通信層的完整框架圖如下圖所示:

怎樣簡談Kafka中的NIO網(wǎng)絡(luò)通信模型

Kafka消息隊列的通信層模型—1+N+M模型.png

剛開始看到上面的這個框架圖可能會有一些不太理解,并不要緊,這里可以先對Kafka的網(wǎng)絡(luò)通信層框架結(jié)構(gòu)有一個大致了解。本文后面會結(jié)合Kafka的部分重要源碼來詳細(xì)闡述上面的過程。這里可以簡單總結(jié)一下其網(wǎng)絡(luò)通信模型中的幾個重要概念:

(1), Acceptor :1個接收線程,負(fù)責(zé)監(jiān)聽新的連接請求,同時注冊O(shè)P_ACCEPT 事件,將新的連接按照 "round robin" 方式交給對應(yīng)的 Processor 線程處理;

(2), Processor :N個處理器線程,其中每個 Processor 都有自己的 selector,它會向 Acceptor 分配的 SocketChannel 注冊相應(yīng)的 OP_READ 事件,N 的大小由 “num.networker.threads” 決定;

(3), KafkaRequestHandler :M個請求處理線程,包含在線程池—KafkaRequestHandlerPool內(nèi)部,從RequestChannel的全局請求隊列—requestQueue中獲取請求數(shù)據(jù)并交給KafkaApis處理,M的大小由 “num.io.threads” 決定;

(4), RequestChannel :其為Kafka服務(wù)端的請求通道,該數(shù)據(jù)結(jié)構(gòu)中包含了一個全局的請求隊列 requestQueue和多個與Processor處理器相對應(yīng)的響應(yīng)隊列responseQueue,提供給Processor與請求處理線程KafkaRequestHandler和KafkaApis交換數(shù)據(jù)的地方。

(5), NetworkClient :其底層是對 Java NIO 進(jìn)行相應(yīng)的封裝,位于Kafka的網(wǎng)絡(luò)接口層。Kafka消息生產(chǎn)者對象—KafkaProducer的send方法主要調(diào)用NetworkClient完成消息發(fā)送;

(6), SocketServer :其是一個NIO的服務(wù),它同時啟動一個Acceptor接收線程和多個Processor處理器線程。提供了一種典型的Reactor多線程模式,將接收客戶端請求和處理請求相分離;

(7), KafkaServer :代表了一個Kafka Broker的實(shí)例;其startup方法為實(shí)例啟動的入口;

(8), KafkaApis :Kafka的業(yè)務(wù)邏輯處理Api,負(fù)責(zé)處理不同類型的請求;比如 “發(fā)送消息”、 “獲取消息偏移量—offset” 和 “處理心跳請求” 等;

二、Kafka網(wǎng)絡(luò)通信層的設(shè)計與具體實(shí)現(xiàn)

這一節(jié)將結(jié)合Kafka網(wǎng)絡(luò)通信層的源碼來分析其設(shè)計與實(shí)現(xiàn),這里主要詳細(xì)介紹網(wǎng)絡(luò)通信層的幾個重要元素—SocketServer、Acceptor、Processor、RequestChannel和KafkaRequestHandler。本文分析的源碼部分均基于Kafka的0.11.0版本。

1、SocketServer

SocketServer是接收客戶端Socket請求連接、處理請求并返回處理結(jié)果的核心類,Acceptor及Processor的初始化、處理邏輯都是在這里實(shí)現(xiàn)的。在KafkaServer實(shí)例啟動時會調(diào)用其startup的初始化方法,會初始化1個 Acceptor和N個Processor線程(每個EndPoint都會初始化,一般來說一個Server只會設(shè)置一個端口),其實(shí)現(xiàn)如下:

 
def startup() { this.synchronized { connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) val sendBufferSize = config.socketSendBufferBytes val recvBufferSize = config.socketReceiveBufferBytes val brokerId = config.brokerId var processorBeginIndex = 0 // 一個broker一般只設(shè)置一個端口 config.listeners.foreach { endpoint => val listenerName = endpoint.listenerName val securityProtocol = endpoint.securityProtocol val processorEndIndex = processorBeginIndex + numProcessorThreads //N 個 processor for (i <- processorBeginIndex until processorEndIndex) processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool) //1個 Acceptor val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas) acceptors.put(endpoint, acceptor) KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start() acceptor.awaitStartup() processorBeginIndex = processorEndIndex } }

2、Acceptor

Acceptor是一個繼承自抽象類AbstractServerThread的線程類。Acceptor的主要任務(wù)是監(jiān)聽并且接收客戶端的請求,同時建立數(shù)據(jù)傳輸通道—SocketChannel,然后以輪詢的方式交給一個后端的Processor線程處理(具體的方式是添加socketChannel至并發(fā)隊列并喚醒Processor線程處理)。

在該線程類中主要可以關(guān)注以下兩個重要的變量:

(1), nioSelector :通過NSelector.open()方法創(chuàng)建的變量,封裝了JAVA NIO Selector的相關(guān)操作;

(2), serverChannel :用于監(jiān)聽端口的服務(wù)端Socket套接字對象;

下面來看下Acceptor主要的run方法的源碼:

 
def run() { //首先注冊O(shè)P_ACCEPT事件 serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) startupComplete() try { var currentProcessor = 0 //以輪詢方式查詢并等待關(guān)注的事件發(fā)生 while (isRunning) { try { val ready = nioSelector.select(500) if (ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() while (iter.hasNext && isRunning) { try { val key = iter.next iter.remove() if (key.isAcceptable) //如果事件發(fā)生則調(diào)用accept方法對OP_ACCEPT事件處理 accept(key, processors(currentProcessor)) else throw new IllegalStateException("Unrecognized key state for acceptor thread.") //輪詢算法 // round robin to the next processor thread currentProcessor = (currentProcessor + 1) % processors.length } catch { case e: Throwable => error("Error while accepting connection", e) } } } } //代碼省略 } def accept(key: SelectionKey, processor: Processor) { val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] val socketChannel = serverSocketChannel.accept() try { connectionQuotas.inc(socketChannel.socket().getInetAddress) socketChannel.configureBlocking(false) socketChannel.socket().setTcpNoDelay(true) socketChannel.socket().setKeepAlive(true) if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socketChannel.socket().setSendBufferSize(sendBufferSize) processor.accept(socketChannel) } catch { //省略部分代碼 } } def accept(socketChannel: SocketChannel) { newConnections.add(socketChannel) wakeup() }

在上面源碼中可以看到,Acceptor線程啟動后,首先會向用于監(jiān)聽端口的服務(wù)端套接字對象—ServerSocketChannel上注冊O(shè)P_ACCEPT 事件。然后以輪詢的方式等待所關(guān)注的事件發(fā)生。如果該事件發(fā)生,則調(diào)用accept()方法對OP_ACCEPT事件進(jìn)行處理。這里,Processor是通過 round robin 方法選擇的,這樣可以保證后面多個Processor線程的負(fù)載基本均勻。

Acceptor的accept()方法的作用主要如下:

(1)通過SelectionKey取得與之對應(yīng)的serverSocketChannel實(shí)例,并調(diào)用它的accept()方法與客戶端建立連接;

(2)調(diào)用connectionQuotas.inc()方法增加連接統(tǒng)計計數(shù);并同時設(shè)置第(1)步中創(chuàng)建返回的socketChannel屬性(如sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking等)

(3)將socketChannel交給processor.accept()方法進(jìn)行處理。這里主要是將socketChannel加入Processor處理器的并發(fā)隊列newConnections隊列中,然后喚醒Processor線程從隊列中獲取socketChannel并處理。其中,newConnections會被Acceptor線程和Processor線程并發(fā)訪問操作,所以newConnections是ConcurrentLinkedQueue隊列(一個基于鏈接節(jié)點(diǎn)的無界線程安全隊列)

3、Processor

Processor同Acceptor一樣,也是一個線程類,繼承了抽象類AbstractServerThread。其主要是從客戶端的請求中讀取數(shù)據(jù)和將KafkaRequestHandler處理完響應(yīng)結(jié)果返回給客戶端。在該線程類中主要關(guān)注以下幾個重要的變量:

(1), newConnections :在上面的 Acceptor 一節(jié)中已經(jīng)提到過,它是一種ConcurrentLinkedQueue[SocketChannel]類型的隊列,用于保存新連接交由Processor處理的socketChannel;

(2), inflightResponses :是一個Map[String, RequestChannel.Response]類型的集合,用于記錄尚未發(fā)送的響應(yīng);

(3), selector :是一個類型為KSelector變量,用于管理網(wǎng)絡(luò)連接;

下面先給出Processor處理器線程run方法執(zhí)行的流程圖:

怎樣簡談Kafka中的NIO網(wǎng)絡(luò)通信模型

Kafk_Processor線程的處理流程圖.png

從上面的流程圖中能夠可以看出Processor處理器線程在其主流程中主要完成了這樣子幾步操作:

(1), 處理newConnections隊列中的socketChannel 。遍歷取出隊列中的每個socketChannel并將其在selector上注冊O(shè)P_READ事件;

(2), 處理RequestChannel中與當(dāng)前Processor對應(yīng)響應(yīng)隊列中的Response 。在這一步中會根據(jù)responseAction的類型(NoOpAction/SendAction/CloseConnectionAction)進(jìn)行判斷,若為“NoOpAction”,表示該連接對應(yīng)的請求無需響應(yīng);若為“SendAction”,表示該Response需要發(fā)送給客戶端,則會通過“selector.send”注冊O(shè)P_WRITE事件,并且將該Response從responseQueue響應(yīng)隊列中移至inflightResponses集合中;“CloseConnectionAction”,表示該連接是要關(guān)閉的;

(3), 調(diào)用selector.poll()方法進(jìn)行處理 。該方法底層即為調(diào)用nioSelector.select()方法進(jìn)行處理。

(4), 處理已接受完成的數(shù)據(jù)包隊列—completedReceives 。在processCompletedReceives方法中調(diào)用“requestChannel.sendRequest”方法將請求Request添加至requestChannel的全局請求隊列—requestQueue中,等待KafkaRequestHandler來處理。同時,調(diào)用“selector.mute”方法取消與該請求對應(yīng)的連接通道上的OP_READ事件;

(5), 處理已發(fā)送完的隊列—completedSends 。當(dāng)已經(jīng)完成將response發(fā)送給客戶端,則將其從inflightResponses移除,同時通過調(diào)用“selector.unmute”方法為對應(yīng)的連接通道重新注冊O(shè)P_READ事件;

(6), 處理斷開連接的隊列 。將該response從inflightResponses集合中移除,同時將connectionQuotas統(tǒng)計計數(shù)減1;

4、RequestChannel

在Kafka的網(wǎng)絡(luò)通信層中,RequestChannel為Processor處理器線程與KafkaRequestHandler線程之間的數(shù)據(jù)交換提供了一個數(shù)據(jù)緩沖區(qū),是通信過程中Request和Response緩存的地方。因此,其作用就是在通信中起到了一個數(shù)據(jù)緩沖隊列的作用。Processor線程將讀取到的請求添加至RequestChannel的全局請求隊列—requestQueue中;KafkaRequestHandler線程從請求隊列中獲取并處理,處理完以后將Response添加至RequestChannel的響應(yīng)隊列—responseQueue中,并通過responseListeners喚醒對應(yīng)的Processor線程,最后Processor線程從響應(yīng)隊列中取出后發(fā)送至客戶端。

5、KafkaRequestHandler

KafkaRequestHandler也是一種線程類,在KafkaServer實(shí)例啟動時候會實(shí)例化一個線程池—KafkaRequestHandlerPool對象(包含了若干個KafkaRequestHandler線程),這些線程以守護(hù)線程的方式在后臺運(yùn)行。在KafkaRequestHandler的run方法中會循環(huán)地從RequestChannel中阻塞式讀取request,讀取后再交由KafkaApis來具體處理。

6、KafkaApis

KafkaApis是用于處理對通信網(wǎng)絡(luò)傳輸過來的業(yè)務(wù)消息請求的中心轉(zhuǎn)發(fā)組件。該組件反映出Kafka Broker Server可以提供哪些服務(wù)。

關(guān)于怎樣簡談Kafka中的NIO網(wǎng)絡(luò)通信模型就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

新聞名稱:怎樣簡談Kafka中的NIO網(wǎng)絡(luò)通信模型
網(wǎng)頁地址:http://aaarwkj.com/article2/gojjoc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供移動網(wǎng)站建設(shè)、小程序開發(fā)、做網(wǎng)站、品牌網(wǎng)站設(shè)計、網(wǎng)站內(nèi)鏈、ChatGPT

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站建設(shè)網(wǎng)站維護(hù)公司
国产视频一区二区三区网| 亚洲伦理一区二区三区中文| 毛片成人18毛片免费看| 免费看av网站一区二区| 国产精品国产三级国产不产一地| 日本久久在线观看视频| 亚州精品乱码久久电影| 中文字幕乱码熟女人妻视频| 日本精品在线不卡视频| 97视频观看免费观看| 国产午夜福利一区在线| 亚洲av男人天堂一区| 超碰av之男人的天堂| 亚洲成人日韩欧美在线| 熟妇人妻精品一区二区三区颏| 成年视频免费观看视频| 97国产免费全部免费观看| 激情内射日本一区二区三区| 久久精品视频就在久久| 欧美色高清视频在线播放| av毛片天堂在线观看| 女人的天堂啪啪啪av| 婷婷网色偷偷亚洲男人| 欧美av精品一区二区三区| 未满18禁止入内在线观看| 绯色av一区二区三区蜜臀| 成人性生交视频免费看| 日韩精品一区二区三区欲色av| 欧美一级黄色免费电影| 国产一区二区激情在线| 亚洲三级伦理在线视频| 成人黄色免费在线网站| 国欧美一区二区三区| 日韩精品一区二区视频在线| 99久久精品人妻少妇一| 一级黄片国产精品久久| 国产原创av剧情愿望成真| 免费观看国产裸体视频| 国产三级全黄在线播放| 人妻巨乳一区二区三区| 亚洲精品乱码在线播放|