這篇文章主要講解了“怎么理解kafka分區(qū)、生產(chǎn)和消費(fèi)”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“怎么理解kafka分區(qū)、生產(chǎn)和消費(fèi)”吧!
周村網(wǎng)站建設(shè)公司成都創(chuàng)新互聯(lián)公司,周村網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為周村上千多家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站制作要多少錢,請(qǐng)找那個(gè)售后服務(wù)好的周村做網(wǎng)站的公司定做!
分區(qū)規(guī)則指的是將每個(gè)Topic劃分成多個(gè)分區(qū)(Partition),每個(gè)分區(qū)是一組有序的消息日志,生產(chǎn)者生產(chǎn)的每條消息只會(huì)被發(fā)送到其中一個(gè)分區(qū)。
分區(qū) (Partition) 都是一個(gè)有序的、不可變的數(shù)據(jù)序列,消息數(shù)據(jù)被不斷的添加到序列的尾部。分區(qū)中的每一條消息數(shù)據(jù)都被賦予了一個(gè)連續(xù)的數(shù)字ID,即偏移量 (offset) ,用于唯一標(biāo)識(shí)分區(qū)中的每條消息數(shù)據(jù)。
分區(qū)(Partition)的作用就是提供負(fù)載均衡的能力,單個(gè)topic的不同分區(qū)可存儲(chǔ)在相同或不同節(jié)點(diǎn)機(jī)上,為實(shí)現(xiàn)系統(tǒng)的高伸縮性(Scalability),不同的分區(qū)被放置到不同節(jié)點(diǎn)的機(jī)器上,各節(jié)點(diǎn)機(jī)獨(dú)立地執(zhí)行各自分區(qū)的讀寫任務(wù),如果性能不足,可通過添加新的節(jié)點(diǎn)機(jī)器來增加整體系統(tǒng)的吞吐量。
Kafka分區(qū)下數(shù)據(jù)使用消息日志(Log)方式保存數(shù)據(jù),具體方式是在磁盤上創(chuàng)建只能追加寫(Append-only)消息的物理文件。因?yàn)橹荒茏芳訉懭?,因此避免了緩慢的隨機(jī)I/O操作,改為性能較好的順序I/O寫操作。Kafka日志文件分為多個(gè)日志段(Log Segment),消息被追加寫到當(dāng)前最新的日志段中,當(dāng)寫滿一個(gè)日志段后Kafka會(huì)自動(dòng)切分出一個(gè)新的日志段,并將舊的日志段封存。
Kafka將消息數(shù)據(jù)根據(jù)Partition進(jìn)行存儲(chǔ),Partition分為若干Segment,每個(gè)Segment的大小相等。Segment由index file、log file、timeindex file等組成,后綴為".index"和".log",分別表示為Segment索引文件、數(shù)據(jù)文件,每一個(gè)Segment存儲(chǔ)著多條信息。
分區(qū)策略是決定生產(chǎn)者將消息發(fā)送到哪個(gè)分區(qū)的算法。Kafka提供默認(rèn)的分區(qū)策略,同時(shí)支持自定義分區(qū)策略。
Kafka 默認(rèn)分區(qū)策略同時(shí)實(shí)現(xiàn)了兩種策略:如果指定Key,那么默認(rèn)實(shí)現(xiàn)按消息鍵保序策略;如果沒有指定Key,則使用輪詢策略
輪詢策略(Round-robin),即順序分配策略。如果一個(gè)Topic有3個(gè)分區(qū),則第1條消息被發(fā)送到分區(qū)0,第2條被發(fā)送到分區(qū)1,第3條被發(fā)送到分區(qū)2,以此類推。當(dāng)生產(chǎn)第4條消息時(shí)又會(huì)重新輪詢將其分配到分區(qū)0。
輪詢策略是Kafka Java生產(chǎn)者API默認(rèn)提供的分區(qū)策略。如果未指定partitioner.class參數(shù),那么生產(chǎn)者程序會(huì)按照輪詢的方式在Topic的所有分區(qū)間均勻地存儲(chǔ)消息。輪詢策略有非常優(yōu)秀的負(fù)載均衡表現(xiàn),能保證消息最大限度地被平均分配到所有分區(qū)上。
隨機(jī)策略(Randomness)是將消息隨機(jī)地放置到任意一個(gè)分區(qū)上。如果要實(shí)現(xiàn)隨機(jī)策略版的partition方法,Java版如下:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); return ThreadLocalRandom.current().nextInt(partitions.size());
先計(jì)算出Topic的總分區(qū)數(shù),然后隨機(jī)地返回一個(gè)小于分區(qū)數(shù)的正整數(shù)。隨機(jī)策略本質(zhì)上是力求將數(shù)據(jù)均勻地分散到各個(gè)分區(qū),但實(shí)際表現(xiàn)要遜于輪詢策略,如果追求數(shù)據(jù)的均勻分布,推薦使用輪詢策略。
Kafka允許為每條消息定義消息鍵,簡(jiǎn)稱為Key。Key可以是一個(gè)有著明確業(yè)務(wù)含義的字符串,如客戶代碼、部門編號(hào)或是業(yè)務(wù)ID等,也可以用來表征消息元數(shù)據(jù)。一旦消息被定義了Key,就可以保證同一個(gè)Key的所有消息都進(jìn)入到相同的分區(qū)中。
實(shí)現(xiàn)分區(qū)策略的partition方法只需要兩行代碼即可:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); return Math.abs(key.hashCode()) % partitions.size();
基于地理位置的分區(qū)策略通常只針對(duì)大規(guī)模的Kafka集群,特別是跨城市、跨國家甚至跨大洲的集群。假設(shè)天貓計(jì)劃為每個(gè)新注冊(cè)用戶提供一份注冊(cè)禮品,比如歐美的用戶注冊(cè)天貓時(shí)可以免費(fèi)得到一臺(tái)iphone SE手機(jī),而中國的新注冊(cè)用戶可以得到一臺(tái)華為P40 Pro。為了實(shí)現(xiàn)相應(yīng)的注冊(cè)業(yè)務(wù)邏輯,只需要?jiǎng)?chuàng)建一個(gè)雙分區(qū)的主題,然后再創(chuàng)建兩個(gè)消費(fèi)者程序分別處理歐美和中國用戶的注冊(cè)用戶邏輯即可,同時(shí)必須把不同地理位置的用戶注冊(cè)的消息發(fā)送到不同機(jī)房中,因?yàn)樘幚碜?cè)消息的消費(fèi)者程序只可能在某一個(gè)機(jī)房中啟動(dòng)著?;诘乩砦恢玫姆謪^(qū)策略可以根據(jù)Broker的IP地址實(shí)現(xiàn)定制化的分區(qū)策略。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); return partitions.stream().filter(p -> isChina(p.leader().host())).map(PartitionInfo::partition).findAny().get();
可以從所有分區(qū)中找出Leader副本在中國的所有分區(qū),然后隨機(jī)挑選一個(gè)進(jìn)行消息發(fā)送。
如果要自定義分區(qū)策略,需要顯式地配置生產(chǎn)者端的參數(shù)partitioner.class。編寫生產(chǎn)者程序時(shí),可以編寫一個(gè)具體的類實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner
接口(partition()和close()),通常只需要實(shí)現(xiàn)最重要的partition方法。
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
topic、key、keyBytes、value和valueBytes都屬于消息數(shù)據(jù),cluster則是集群信息(比如當(dāng)前Kafka集群共有多少主題、多少Broker等)。設(shè)置partitioner.class參數(shù)為自己實(shí)現(xiàn)類的Full Qualified Name,生產(chǎn)者程序就會(huì)按照自定義分區(qū)策略的代碼邏輯對(duì)消息進(jìn)行分區(qū)。
無論消息是否被消費(fèi),kafka都會(huì)保留所有消息,同時(shí)定期檢查舊的日志段是否能夠被刪除,從而回收磁盤空間,刪除策略有兩種:
基于時(shí)間:log.retention.hours=168
基于大?。簂og.retention.bytes=1073741824
需要注意的是,因?yàn)镵afka讀取特定消息的時(shí)間復(fù)雜度為O(1),即與文件大小無關(guān),所以這里刪除過期文件與提高Kafka 性能無關(guān)。
Kafka 2.1.0版本前,支持GZIP、Snappy、LZ4三種壓縮算法。2.1.0版本開始正式支持Zstandard算法(簡(jiǎn)寫為zstd ,F(xiàn)acebook開源的一個(gè)壓縮算法),該算法能夠提供超高的壓縮比(compression ratio)。壓縮算法可以使用壓縮比和壓縮/解壓縮吞吐量?jī)蓚€(gè)指標(biāo)進(jìn)行衡量。不同壓縮算法的性能比較如下:
生產(chǎn)環(huán)境中,GZIP、Snappy、LZ4、zstd性能表現(xiàn)各有千秋,在吞吐量方面:LZ4 > Snappy > zstd > GZIP;在壓縮比方面,zstd > LZ4 > GZIP > Snappy。
如果要啟用Producer端的壓縮,Producer程序運(yùn)行機(jī)器上的CPU資源必須充足。除了CPU資源充足,如果生產(chǎn)環(huán)境中帶寬資源有限,也建議Producer端開啟壓縮。通常,帶寬比CPU和內(nèi)存要昂貴的多,因此千兆網(wǎng)絡(luò)中Kafka集群帶寬資源耗盡很容易出現(xiàn)。如果客戶端機(jī)器CPU資源富余,建議Producer端開啟zstd壓縮,可以極大地節(jié)省網(wǎng)絡(luò)資源消耗。對(duì)于解壓縮,需要避免非正常的解壓縮,如消息格式轉(zhuǎn)換的解壓縮操作、Broker與Producer解壓縮算法不一致。
Producer發(fā)送壓縮消息到Broker后,Broker會(huì)原封不動(dòng)保存。當(dāng)Consumer程序請(qǐng)求消息時(shí),Broker 會(huì)原樣發(fā)出,當(dāng)消息到達(dá)Consumer端后,Consumer自行解壓縮消息。Kafka會(huì)將使用的壓縮算法封裝進(jìn)消息集合中,當(dāng)Consumer讀取到消息集合時(shí),會(huì)知道消息使用的壓縮算法。除了在Consumer端解壓縮,Broker端也會(huì)進(jìn)行解壓縮,每個(gè)壓縮過的消息集合在Broker端寫入時(shí)都要發(fā)生解壓縮操作,對(duì)消息執(zhí)行各種驗(yàn)證。解壓縮對(duì)Broker端性能是有一定影響的。
如果將Topic設(shè)置成單分區(qū),該Topic的所有的消息都只在一個(gè)分區(qū)內(nèi)讀寫,保證全局的順序性,但將喪失Kafka多分區(qū)帶來的高吞吐量和負(fù)載均衡的性能優(yōu)勢(shì)。
多分區(qū)消息保序的方法是按消息鍵保序策略,根據(jù)業(yè)務(wù)提取出需要保序的消息的邏輯主體,并建立消息標(biāo)志位ID,,對(duì)標(biāo)志位設(shè)定專門的分區(qū)策略,保證同一標(biāo)志位的所有消息都發(fā)送到同一分區(qū),既可以保證分區(qū)內(nèi)的消息順序,也可以享受到多分區(qū)帶來的搞吞吐量。
說明:消息重試只是簡(jiǎn)單將消息重新發(fā)送到原來的分區(qū),不會(huì)重新選擇分區(qū)。
kafka只能保證分區(qū)內(nèi)有序,無法保證分區(qū)間有序,所以消費(fèi)時(shí),數(shù)據(jù)是相對(duì)有序的。
在通過API方式發(fā)布消息時(shí),生產(chǎn)者是以Record為消息進(jìn)行發(fā)布的。Record中包含key與value,value才是消息本身,而key用于路由消息所要存放Partition。消息要寫入到哪個(gè)Partition并不是隨機(jī)的,而是由路由策略決定。
指定Partition,直接寫入指定Partition。
沒有指定Partition但指定了key,則通過對(duì)key的hash值與Partition數(shù)量取模,結(jié)果就是要選出的Partition索引。
Partition和key都未指定,則使用輪詢算法選出一個(gè)Partition。
增加分區(qū)時(shí),Partition內(nèi)的消息不會(huì)重新進(jìn)行分配,隨著數(shù)據(jù)繼續(xù)寫入,新分區(qū)才會(huì)參與再平衡。
Producer先通過分區(qū)策略確定數(shù)據(jù)錄入的partition,再從Zookeeper中找到Partition的Leader
Producer將消息發(fā)送給分區(qū)的Leader。
Leader將消息接入本地的Log,并通知ISR(In-sync Replicas,副本同步列表)的Followers。
ISR中的Followers從Leader中pull消息,寫入本地Log后向Leader發(fā)送ACK(消息發(fā)送確認(rèn)機(jī)制)。
Leader收到所有ISR中的Followers的ACK后,增加HW(high watermark,最后commit 的offset)并向Producer發(fā)送ACK,表示消息寫入成功。
必須使用producer.send(msg, callback)接口發(fā)送消息。
Producer端設(shè)置acks參數(shù)值為all。acks參數(shù)值為all表示ISR中所有Broker副本都接收到消息,消息才算已提交。
設(shè)置Producer端retries參數(shù)值為一個(gè)較大值,表示Producer自動(dòng)重試次數(shù)。當(dāng)出現(xiàn)網(wǎng)絡(luò)瞬時(shí)抖動(dòng)時(shí),消息發(fā)送可能會(huì)失敗,此時(shí)Producer能夠自動(dòng)重試消息發(fā)送,避免消息丟失。
設(shè)置Broker端unclean.leader.election.enable = false,unclean.leader.election.enable參數(shù)用于控制有資格競(jìng)選分區(qū)Leader的Broker。如果一個(gè)Broker落后原Leader太多,那么成為新Leader必然會(huì)造成消息丟失。因此,要將unclean.leader.election.enable參數(shù)設(shè)置成false。
設(shè)置Broker端參數(shù)replication.factor >= 3,將消息保存多份副本。
設(shè)置Broker參數(shù)min.insync.replicas > 1,保證ISR中Broker副本的最少個(gè)數(shù),在acks=-1時(shí)才生效。設(shè)置成大于1可以提升消息持久性,生產(chǎn)環(huán)境中不能使用默認(rèn)值 1。
必須確保replication.factor > min.insync.replicas,如果兩者相等,那么只要有一個(gè)副本掛機(jī),整個(gè)分區(qū)無法正常工作。推薦設(shè)置成replication.factor = min.insync.replicas + 1。
確保消息消費(fèi)完成再提交。設(shè)置Consumer端參數(shù)enable.auto.commit為false,并采用手動(dòng)提交位移的方式。
Producer端攔截器實(shí)現(xiàn)類都要繼承org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口有兩個(gè)核心的方法:
onSend:在消息發(fā)送前被調(diào)用。
onAcknowledgement:在消息成功提交或發(fā)送失敗后被調(diào)用。onAcknowledgement 調(diào)用要早于發(fā)送回調(diào)通知callback的調(diào)用。onAcknowledgement與onSend 方法不是在同一個(gè)線程中被調(diào)用,因此如果兩個(gè)方法中使用了某個(gè)共享可變對(duì)象,要保證線程安全。
假設(shè)第一個(gè)攔截器的完整類路徑是com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二個(gè)攔截器是com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,Producer指定攔截器的Java代碼示例如下:
Properties props = new Properties(); List<String> interceptors = new ArrayList<>(); interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器1 interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器2 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
Consumer向Broker提交連接請(qǐng)求,連接的Broker會(huì)向其發(fā)送Broker Controller的通信URL,即配置文件中的listeners地址;
當(dāng)Consumer指定了要消費(fèi)的Topic后,會(huì)向Broker Controller發(fā)送消費(fèi)請(qǐng)求;
Broker Controller會(huì)為Consumer分配一個(gè)或幾個(gè)Partition Leader,并將Partition的當(dāng)前offset發(fā)送給Consumer;
Consumer會(huì)按照Broker Controller分配的Partition對(duì)其中的消息進(jìn)行消費(fèi);
當(dāng)Consumer消費(fèi)完消息后,Consumer會(huì)向Broker發(fā)送一個(gè)消息已經(jīng)被消費(fèi)反饋,即消息的offset;
在Broker接收到Consumer的offset后,會(huì)更新相應(yīng)的__consumer_offset中;
Consumer攔截器的實(shí)現(xiàn)類要實(shí)現(xiàn)org.apache.kafka.clients.consumer.ConsumerInterceptor接口,ConsumerInterceptor有兩個(gè)核心方法。
onConsume:在消息返回給Consumer程序前調(diào)用。在開始正式處理消息前,攔截器會(huì)先做一些處理,再返回給Consumer。
onCommit:Consumer在提交位移后調(diào)用,可以進(jìn)行一些打日志操作等。
同一個(gè)Consumer重復(fù)消費(fèi)
當(dāng)Consumer由于消費(fèi)能力低而引發(fā)了消費(fèi)超時(shí),則可能會(huì)形成重復(fù)消費(fèi)。
在某數(shù)據(jù)剛好消費(fèi)完畢,但正準(zhǔn)備提交offset時(shí),消費(fèi)時(shí)間超時(shí),則Broker認(rèn)為消息未消費(fèi)成功,產(chǎn)生重復(fù)消費(fèi)問題。
其解決方案:延長(zhǎng)offset提交時(shí)間。
不同的Consumer重復(fù)消費(fèi)
當(dāng)Consumer消費(fèi)了消息,但還沒有提交offset時(shí)宕機(jī),則已經(jīng)被消費(fèi)過的消息會(huì)被重復(fù)消費(fèi)。
感謝各位的閱讀,以上就是“怎么理解kafka分區(qū)、生產(chǎn)和消費(fèi)”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)怎么理解kafka分區(qū)、生產(chǎn)和消費(fèi)這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
分享文章:怎么理解kafka分區(qū)、生產(chǎn)和消費(fèi)
網(wǎng)頁路徑:http://aaarwkj.com/article10/igoodo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司、標(biāo)簽優(yōu)化、電子商務(wù)、網(wǎng)站營(yíng)銷、域名注冊(cè)、品牌網(wǎng)站制作
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)