欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

KafkaNetwork層解析,還是有人把它說清楚了

我們知道kafka是基于TCP連接的。其并沒有像很多中間件使用netty作為TCP服務器。而是自己基于Java NIO寫了一套。

多端合一響應式網(wǎng)站:PC+平板+手機,同一后臺修改數(shù)據(jù)多端同步更新提交您的需求,獲取網(wǎng)站建設與營銷策劃方案報價,我們會在1小時內(nèi)與您聯(lián)系!

幾個重要類

先看下Kafka Client的網(wǎng)絡層架構(gòu)。

Kafka Network層解析,還是有人把它說清楚了

本文主要分析的是Network層。

Network層有兩個重要的類:SelectorKafkaChannel

這兩個類和Java NIO層的java.nio.channels.SelectorChannel有點類似。

Selector幾個關(guān)鍵字段如下

//?jdk?nio中的Selector
java.nio.channels.Selector?nioSelector;
//?記錄當前Selector的所有連接信息
Map<String,?KafkaChannel>?channels;
//?已發(fā)送完成的請求
List<Send>?completedSends;
//?已收到的請求
List<NetworkReceive>?completedReceives;
//?還沒有完全收到的請求,對上層不可見
Map<KafkaChannel,?Deque<NetworkReceive>>?stagedReceives;
//?作為client端,調(diào)用connect連接遠端時返回true的連接
Set<SelectionKey>?immediatelyConnectedKeys;
//?已經(jīng)完成的連接
List<String>?connected;
//?一次讀取的最大大小
int?maxReceiveSize;

從網(wǎng)絡層來看kafka是分為client端(producer和consumer,broker作為從時也是client)和server端(broker)的。本文將分析client端是如何建立連接,以及收發(fā)數(shù)據(jù)的。server也是依靠SelectorKafkaChannel進行網(wǎng)絡傳輸。在Network層兩端的區(qū)別并不大。

建立連接

kafka的client端啟動時會調(diào)用Selector#connect(下文中如無特殊注明,均指org.apache.kafka.common.network.Selector)方法建立連接。

public?void?connect(String?id,?InetSocketAddress?address,?int?sendBufferSize,?int?receiveBufferSize)?throws?IOException?{
????if?(this.channels.containsKey(id))
????????throw?new?IllegalStateException("There?is?already?a?connection?for?id?"?+?id);
????//?創(chuàng)建一個SocketChannel
????SocketChannel?socketChannel?=?SocketChannel.open();
????//?設置為非阻塞模式
????socketChannel.configureBlocking(false);
????//?創(chuàng)建socket并設置相關(guān)屬性
????Socket?socket?=?socketChannel.socket();
????socket.setKeepAlive(true);
????if?(sendBufferSize?!=?Selectable.USE_DEFAULT_BUFFER_SIZE)
????????socket.setSendBufferSize(sendBufferSize);
????if?(receiveBufferSize?!=?Selectable.USE_DEFAULT_BUFFER_SIZE)
????????socket.setReceiveBufferSize(receiveBufferSize);
????socket.setTcpNoDelay(true);
????boolean?connected;
????try?{
????????//?調(diào)用SocketChannel的connect方法,該方法會向遠端發(fā)起tcp建連請求
????????//?因為是非阻塞的,所以該方法返回時,連接不一定已經(jīng)建立好(即完成3次握手)。連接如果已經(jīng)建立好則返回true,否則返回false。一般來說server和client在一臺機器上,該方法可能返回true。
????????connected?=?socketChannel.connect(address);
????}?catch?(UnresolvedAddressException?e)?{
????????socketChannel.close();
????????throw?new?IOException("Can't?resolve?address:?"?+?address,?e);
????}?catch?(IOException?e)?{
????????socketChannel.close();
????????throw?e;
????}
????//?對CONNECT事件進行注冊
????SelectionKey?key?=?socketChannel.register(nioSelector,?SelectionKey.OP_CONNECT);
????KafkaChannel?channel;
????try?{
????????//?構(gòu)造一個KafkaChannel
????????channel?=?channelBuilder.buildChannel(id,?key,?maxReceiveSize);
????}?catch?(Exception?e)?{
??????...
????}
????//?將kafkachannel綁定到SelectionKey上
????key.attach(channel);
????//?放入到map中,id是遠端服務器的名稱
????this.channels.put(id,?channel);
????//?connectct為true代表該連接不會再觸發(fā)CONNECT事件,所以這里要單獨處理
????if?(connected)?{
????????//?OP_CONNECT?won't?trigger?for?immediately?connected?channels
????????log.debug("Immediately?connected?to?node?{}",?channel.id());
????????//?加入到一個單獨的集合中
????????immediatelyConnectedKeys.add(key);
????????//?取消對該連接的CONNECT事件的監(jiān)聽
????????key.interestOps(0);
????}
}

這里的流程和標準的NIO流程差不多,需要單獨說下的是socketChannel#connect方法返回true的場景,該方法的注釋中有提到

*?<p>?If?this?channel?is?in?non-blocking?mode?then?an?invocation?of?this
*?method?initiates?a?non-blocking?connection?operation.??If?the?connection
*?is?established?immediately,?as?can?happen?with?a?local?connection,?then
*?this?method?returns?<tt>true</tt>.??Otherwise?this?method?returns
*?<tt>false</tt>?and?the?connection?operation?must?later?be?completed?by
*?invoking?the?{@link?#finishConnect?finishConnect}?method.

也就是說在非阻塞模式下,對于local connection,連接可能在馬上就建立好了,那該方法會返回true,對于這種情況,不會再觸發(fā)之后的connect事件。因此kafka用一個單獨的集合immediatelyConnectedKeys將這些特殊的連接記錄下來。在接下來的步驟會進行特殊處理。

之后會調(diào)用poll方法對網(wǎng)絡事件監(jiān)聽:

public?void?poll(long?timeout)?throws?IOException?{
...
//?select方法是對java.nio.channels.Selector#select的一個簡單封裝
int?readyKeys?=?select(timeout);
...
//?如果有就緒的事件或者immediatelyConnectedKeys非空
if?(readyKeys?>?0?||?!immediatelyConnectedKeys.isEmpty())?{
????//?對已就緒的事件進行處理,第2個參數(shù)為false
????pollSelectionKeys(this.nioSelector.selectedKeys(),?false,?endSelect);
????//?對immediatelyConnectedKeys進行處理。第2個參數(shù)為true
????pollSelectionKeys(immediatelyConnectedKeys,?true,?endSelect);
}

addToCompletedReceives();

...
}

private?void?pollSelectionKeys(Iterable<SelectionKey>?selectionKeys,
???????????????????????????boolean?isImmediatelyConnected,
???????????????????????????long?currentTimeNanos)?{
Iterator<SelectionKey>?iterator?=?selectionKeys.iterator();
//?遍歷集合
while?(iterator.hasNext())?{
????SelectionKey?key?=?iterator.next();
????//?移除當前元素,要不然下次poll又會處理一遍
????iterator.remove();
????//?得到connect時創(chuàng)建的KafkaChannel
????KafkaChannel?channel?=?channel(key);
???...

????try?{
????????//?如果當前處理的是immediatelyConnectedKeys集合的元素或處理的是CONNECT事件
????????if?(isImmediatelyConnected?||?key.isConnectable())?{
????????????//?finishconnect中會增加READ事件的監(jiān)聽
????????????if?(channel.finishConnect())?{
????????????????this.connected.add(channel.id());
????????????????this.sensors.connectionCreated.record();
????????????????...
????????????}?else
????????????????continue;
????????}

????????//?對于ssl的連接還有些額外的步驟
????????if?(channel.isConnected()?&&?!channel.ready())
????????????channel.prepare();

????????//?如果是READ事件
????????if?(channel.ready()?&&?key.isReadable()?&&?!hasStagedReceive(channel))?{
????????????NetworkReceive?networkReceive;
????????????while?((networkReceive?=?channel.read())?!=?null)
????????????????addToStagedReceives(channel,?networkReceive);
????????}

????????//?如果是WRITE事件
????????if?(channel.ready()?&&?key.isWritable())?{
????????????Send?send?=?channel.write();
????????????if?(send?!=?null)?{
????????????????this.completedSends.add(send);
????????????????this.sensors.recordBytesSent(channel.id(),?send.size());
????????????}
????????}

????????//?如果連接失效
????????if?(!key.isValid())
????????????close(channel,?true);

????}?catch?(Exception?e)?{
????????String?desc?=?channel.socketDescription();
????????if?(e?instanceof?IOException)
????????????log.debug("Connection?with?{}?disconnected",?desc,?e);
????????else
????????????log.warn("Unexpected?error?from?{};?closing?connection",?desc,?e);
????????close(channel,?true);
????}?finally?{
????????maybeRecordTimePerConnection(channel,?channelStartTimeNanos);
????}
}
}

因為immediatelyConnectedKeys中的連接不會觸發(fā)CONNNECT事件,所以在poll時會單獨對immediatelyConnectedKeys的channel調(diào)用finishConnect方法。在明文傳輸模式下該方法會調(diào)用到PlaintextTransportLayer#finishConnect,其實現(xiàn)如下:

public?boolean?finishConnect()?throws?IOException?{
????//?返回true代表已經(jīng)連接好了
????boolean?connected?=?socketChannel.finishConnect();
????if?(connected)
????????//?取消監(jiān)聽CONNECt事件,增加READ事件的監(jiān)聽
????????key.interestOps(key.interestOps()?&?~SelectionKey.OP_CONNECT?|?SelectionKey.OP_READ);
????return?connected;
}

關(guān)于immediatelyConnectedKeys更詳細的內(nèi)容可以看看這里。

發(fā)送數(shù)據(jù)

kafka發(fā)送數(shù)據(jù)分為兩個步驟:

1.調(diào)用Selector#send將要發(fā)送的數(shù)據(jù)保存在對應的KafkaChannel中,該方法并沒有進行真正的網(wǎng)絡IO。

//?Selector#send
public?void?send(Send?send)?{
????String?connectionId?=?send.destination();
????//?如果所在的連接正在關(guān)閉中,則加入到失敗集合failedSends中
????if?(closingChannels.containsKey(connectionId))
????????this.failedSends.add(connectionId);
????else?{
????????KafkaChannel?channel?=?channelOrFail(connectionId,?false);
????????try?{
????????????channel.setSend(send);
????????}?catch?(CancelledKeyException?e)?{
????????????this.failedSends.add(connectionId);
????????????close(channel,?false);
????????}
????}
}

//KafkaChannel#setSend
public?void?setSend(Send?send)?{
????//?如果還有數(shù)據(jù)沒有發(fā)送出去則報錯
????if?(this.send?!=?null)
????????throw?new?IllegalStateException("Attempt?to?begin?a?send?operation?with?prior?send?operation?still?in?progress.");
????//?保存下來
????this.send?=?send;
????//?添加對WRITE事件的監(jiān)聽
????this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
  1. 調(diào)用Selector#poll,在第一步中已經(jīng)對該channel注冊了WRITE事件的監(jiān)聽,所以在當channel可寫時,會調(diào)用到pollSelectionKeys將數(shù)據(jù)真正的發(fā)送出去。

private?void?pollSelectionKeys(Iterable<SelectionKey>?selectionKeys,
???????????????????????????boolean?isImmediatelyConnected,
???????????????????????????long?currentTimeNanos)?{
Iterator<SelectionKey>?iterator?=?selectionKeys.iterator();
//?遍歷集合
while?(iterator.hasNext())?{
????SelectionKey?key?=?iterator.next();
????//?移除當前元素,要不然下次poll又會處理一遍
????iterator.remove();
????//?得到connect時創(chuàng)建的KafkaChannel
????KafkaChannel?channel?=?channel(key);
???...

????try?{
????????...
?

????????//?如果是WRITE事件
????????if?(channel.ready()?&&?key.isWritable())?{
????????????//?真正的網(wǎng)絡寫
????????????Send?send?=?channel.write();
????????????//?一個Send對象可能會被拆成幾次發(fā)送,write非空代表一個send發(fā)送完成
????????????if?(send?!=?null)?{
????????????????//?completedSends代表已發(fā)送完成的集合
????????????????this.completedSends.add(send);
????????????????this.sensors.recordBytesSent(channel.id(),?send.size());
????????????}
????????}
		...
????}?catch?(Exception?e)?{
?????...
????}?finally?{
????????maybeRecordTimePerConnection(channel,?channelStartTimeNanos);
????}
}
}

當可寫時,會調(diào)用KafkaChannel#write方法,該方法中會進行真正的網(wǎng)絡IO:

public?Send?write()?throws?IOException?{
????Send?result?=?null;
????if?(send?!=?null?&&?send(send))?{
????????result?=?send;
????????send?=?null;
????}
????return?result;
}
private?boolean?send(Send?send)?throws?IOException?{
????//?最終調(diào)用SocketChannel#write進行真正的寫
????send.writeTo(transportLayer);
????if?(send.completed())
????????//?如果寫完了,則移除對WRITE事件的監(jiān)聽
????????transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

????return?send.completed();
}

接收數(shù)據(jù)

如果遠端有發(fā)送數(shù)據(jù)過來,那調(diào)用poll方法時,會對接收到的數(shù)據(jù)進行處理。

public?void?poll(long?timeout)?throws?IOException?{
...
//?select方法是對java.nio.channels.Selector#select的一個簡單封裝
int?readyKeys?=?select(timeout);
...
//?如果有就緒的事件或者immediatelyConnectedKeys非空
if?(readyKeys?>?0?||?!immediatelyConnectedKeys.isEmpty())?{
????//?對已就緒的事件進行處理,第2個參數(shù)為false
????pollSelectionKeys(this.nioSelector.selectedKeys(),?false,?endSelect);
????//?對immediatelyConnectedKeys進行處理。第2個參數(shù)為true
????pollSelectionKeys(immediatelyConnectedKeys,?true,?endSelect);
}

addToCompletedReceives();

...
}

private?void?pollSelectionKeys(Iterable<SelectionKey>?selectionKeys,
???????????????????????????boolean?isImmediatelyConnected,
???????????????????????????long?currentTimeNanos)?{
Iterator<SelectionKey>?iterator?=?selectionKeys.iterator();
//?遍歷集合
while?(iterator.hasNext())?{
????SelectionKey?key?=?iterator.next();
????//?移除當前元素,要不然下次poll又會處理一遍
????iterator.remove();
????//?得到connect時創(chuàng)建的KafkaChannel
????KafkaChannel?channel?=?channel(key);
???...

????try?{
????????...
?

????????//?如果是READ事件
????????if?(channel.ready()?&&?key.isReadable()?&&?!hasStagedReceive(channel))?{
????????????NetworkReceive?networkReceive;
????????????//?read方法會從網(wǎng)絡中讀取數(shù)據(jù),但可能一次只能讀取一個req的部分數(shù)據(jù)。只有讀到一個完整的req的情況下,該方法才返回非null
????????????while?((networkReceive?=?channel.read())?!=?null)
????????????????//?將讀到的請求存在stagedReceives中
????????????????addToStagedReceives(channel,?networkReceive);
????????}
		...
????}?catch?(Exception?e)?{
?????...
????}?finally?{
????????maybeRecordTimePerConnection(channel,?channelStartTimeNanos);
????}
}
}

private?void?addToStagedReceives(KafkaChannel?channel,?NetworkReceive?receive)?{
????if?(!stagedReceives.containsKey(channel))
????????stagedReceives.put(channel,?new?ArrayDeque<NetworkReceive>());

????Deque<NetworkReceive>?deque?=?stagedReceives.get(channel);
????deque.add(receive);
}

在之后的addToCompletedReceives方法中會對該集合進行處理。

private?void?addToCompletedReceives()?{
????if?(!this.stagedReceives.isEmpty())?{
????????Iterator<Map.Entry<KafkaChannel,?Deque<NetworkReceive>>>?iter?=?this.stagedReceives.entrySet().iterator();
????????while?(iter.hasNext())?{
????????????Map.Entry<KafkaChannel,?Deque<NetworkReceive>>?entry?=?iter.next();
????????????KafkaChannel?channel?=?entry.getKey();
????????????//?對于client端來說該isMute返回為false,server端則依靠該方法保證消息的順序
????????????if?(!channel.isMute())?{
????????????????Deque<NetworkReceive>?deque?=?entry.getValue();
????????????????addToCompletedReceives(channel,?deque);
????????????????if?(deque.isEmpty())
????????????????????iter.remove();
????????????}
????????}
????}
}
private?void?addToCompletedReceives(KafkaChannel?channel,?Deque<NetworkReceive>?stagedDeque)?{
????//?將每個channel的第一個NetworkReceive加入到completedReceives
????NetworkReceive?networkReceive?=?stagedDeque.poll();
????this.completedReceives.add(networkReceive);
????this.sensors.recordBytesReceived(channel.id(),?networkReceive.payload().limit());
}

讀出數(shù)據(jù)后,會先放到stagedReceives集合中,然后在addToCompletedReceives方法中對于每個channel都會從stagedReceives取出一個NetworkReceive(如果有的話),放入到completedReceives中。

這樣做的原因有兩點:

  1. 對于SSL的連接來說,其數(shù)據(jù)內(nèi)容是加密的,所以不能精準的確定本次需要讀取的數(shù)據(jù)大小,只能盡可能的多讀,這樣會導致可能會比請求的數(shù)據(jù)讀的要多。那如果該channel之后沒有數(shù)據(jù)可以讀,會導致多讀的數(shù)據(jù)將不會被處理。

  2. kafka需要確保一個channel上request被處理的順序是其發(fā)送的順序。因此對于每個channel而言,每次poll上層最多只能看見一個請求,當該請求處理完成之后,再處理其他的請求。在sever端,每次poll后都會將該channel給mute掉,即不再從該channel上讀取數(shù)據(jù)。當處理完成之后,才將該channelunmute,即之后可以從該socket上讀取數(shù)據(jù)。而client端則是通過InFlightRequests#canSendMore控制。

代碼中關(guān)于這段邏輯的注釋如下:

/*?In?the?"Plaintext"?setting,?we?are?using?socketChannel?to?read?&?write?to?the?network.?But?for?the?"SSL"?setting,
*?we?encrypt?the?data?before?we?use?socketChannel?to?write?data?to?the?network,?and?decrypt?before?we?return?the?responses.
*?This?requires?additional?buffers?to?be?maintained?as?we?are?reading?from?network,?since?the?data?on?the?wire?is?encrypted
*?we?won't?be?able?to?read?exact?no.of?bytes?as?kafka?protocol?requires.?We?read?as?many?bytes?as?we?can,?up?to?SSLEngine's
*?application?buffer?size.?This?means?we?might?be?reading?additional?bytes?than?the?requested?size.
*?If?there?is?no?further?data?to?read?from?socketChannel?selector?won't?invoke?that?channel?and?we've?have?additional?bytes
*?in?the?buffer.?To?overcome?this?issue?we?added?"stagedReceives"?map?which?contains?per-channel?deque.?When?we?are
*?reading?a?channel?we?read?as?many?responses?as?we?can?and?store?them?into?"stagedReceives"?and?pop?one?response?during
*?the?poll?to?add?the?completedReceives.?If?there?are?any?active?channels?in?the?"stagedReceives"?we?set?"timeout"?to?0
*?and?pop?response?and?add?to?the?completedReceives.

*?Atmost?one?entry?is?added?to?"completedReceives"?for?a?channel?in?each?poll.?This?is?necessary?to?guarantee?that
?????*?requests?from?a?channel?are?processed?on?the?broker?in?the?order?they?are?sent.?Since?outstanding?requests?added
?????*?by?SocketServer?to?the?request?queue?may?be?processed?by?different?request?handler?threads,?requests?on?each
?????*?channel?must?be?processed?one-at-a-time?to?guarantee?ordering.
*/

End

本文分析了kafka network層的實現(xiàn),在閱讀kafka源碼時,如果不把network層搞清楚會比較迷,比如req/resp的順序保障機制、真正進行網(wǎng)絡IO的不是send方法等等。

分享標題:KafkaNetwork層解析,還是有人把它說清楚了
轉(zhuǎn)載來源:http://aaarwkj.com/article8/godoop.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供全網(wǎng)營銷推廣、建站公司外貿(mào)建站、服務器托管、關(guān)鍵詞優(yōu)化

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站建設
亚洲av成人在线不卡| 欧美国产大片一区视频| 91国语对白在线观看| 午夜视频在线播放一区二区三区| 亚洲熟妇亚洲熟妇亚洲熟妇| 日本熟女肥臀一区二区| 国产av一区二区三区久久| 亚洲国产成在人网站天堂 | 岛国高清乱码中文字幕| 国语自产精品视频在线不卡| 日韩欧美一区二区三级| 免费人妻aⅴ中文字幕| 日本欧美三级高潮受不了| 国产成人亚洲欧美激情| 老司机看片午夜久久福利| 亚洲精品一区二区99| 国产精品成人一区二区艾草| 夜夜春久久天堂亚洲精品 | 国产一级无码免费视频| 日本一区二区手机在线| 日韩欧美亚洲另类激情一区| 天天天干夜夜添狠操美女| 国产激情视频一区二区三区| 老汉av免费在线观看| 蜜臀人妻四季av一区二区不卡 | 人妻大乳一区二区三区| 青青草av一区二区三区| 青青草原这里只有精品| 美女后入式在线观看| 日本不卡一二区不久精品免费| 日韩在线视频精品一区| 亚洲码与欧洲码一二三| 夫妻性生活视频全过程| 亚洲熟妇av一区二区三区| 精品爆白浆一区二区三区| 99热这里66只有精品| 日日夜夜久久国产精品| av中文字幕一区二区三区| 国精品午夜福利视频不卡| 91精品国产成人在线| 国产亚洲美女在线视频视频|