一、簡介
創(chuàng)新互聯(lián)公司專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于成都網(wǎng)站制作、做網(wǎng)站、仙居網(wǎng)絡(luò)推廣、小程序開發(fā)、仙居網(wǎng)絡(luò)營銷、仙居企業(yè)策劃、仙居品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運(yùn)營等,從售前售中售后,我們都將竭誠為您服務(wù),您的肯定,是我們最大的嘉獎(jiǎng);創(chuàng)新互聯(lián)公司為所有大學(xué)生創(chuàng)業(yè)者提供仙居建站搭建服務(wù),24小時(shí)服務(wù)熱線:13518219792,官方網(wǎng)址:aaarwkj.com
1、消息傳輸流程
Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似于JMS的特性,但是在設(shè)計(jì)實(shí)現(xiàn)上完全不同,此外它并不是JMS規(guī)范的實(shí)現(xiàn)。kafka對(duì)消息保存時(shí)根據(jù)Topic進(jìn)行歸類,發(fā)送消息者成為Producer,消息接受者成為Consumer,此外kafka集群有多個(gè)kafka實(shí)例組成,每個(gè)實(shí)例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴于zookeeper來保證系統(tǒng)可用性集群保存一些meta信息。
Producer即生產(chǎn)者,向Kafka集群發(fā)送消息,在發(fā)送消息之前,會(huì)對(duì)消息進(jìn)行分類,即Topic,上圖展示了兩個(gè)producer發(fā)送了分類為topic1的消息,另外一個(gè)發(fā)送了topic2的消息。
Topic即主題,通過對(duì)消息指定主題可以將消息分類,消費(fèi)者可以只關(guān)注自己需要的Topic中的消息
Consumer即消費(fèi)者,消費(fèi)者通過與kafka集群建立長連接的方式,不斷地從集群中拉取消息,然后可以對(duì)這些消息進(jìn)行處理。
2、Topics/logs
一個(gè)Topic可以認(rèn)為是一類消息,每個(gè)topic將被分成多個(gè)partition(區(qū)),每個(gè)partition在存儲(chǔ)層面是append log文件。任何發(fā)布到此partition的消息都會(huì)被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個(gè)long型數(shù)字,它是唯一標(biāo)記一條消息。它唯一的標(biāo)記一條消息。kafka并沒有提供其他額外的索引機(jī)制來存儲(chǔ)offset,因?yàn)樵趉afka中幾乎不允許對(duì)消息進(jìn)行“隨機(jī)讀寫”。
談到kafka的存儲(chǔ),就不得不提到分區(qū),即partitions,創(chuàng)建一個(gè)topic時(shí),同時(shí)可以指定分區(qū)數(shù)目,分區(qū)數(shù)越多,其吞吐量也越大,但是需要的資源也越多,同時(shí)也會(huì)導(dǎo)致更高的不可用性,kafka在接收到生產(chǎn)者發(fā)送的消息之后,會(huì)根據(jù)均衡策略將消息存儲(chǔ)到不同的分區(qū)中。
kafka服務(wù)器消息存儲(chǔ)策略如圖
kafka和JMS(Java Message Service)實(shí)現(xiàn)(activeMQ)不同的是:即使消息被消費(fèi),消息仍然不會(huì)被立即刪除.日志文件將會(huì)根據(jù)broker中的配置要求,保留一定的時(shí)間之后刪除;比如log文件保留2天,那么兩天后,文件會(huì)被清除,無論其中的消息是否被消費(fèi).kafka通過這種簡單的手段,來釋放磁盤空間,以及減少消息消費(fèi)之后對(duì)文件內(nèi)容改動(dòng)的磁盤IO開支.
對(duì)于consumer而言,它需要保存消費(fèi)消息的offset,對(duì)于offset的保存和使用,有consumer來控制;當(dāng)consumer正常消費(fèi)消息時(shí),offset將會(huì)"線性"的向前驅(qū)動(dòng),即消息將依次順序被消費(fèi).事實(shí)上consumer可以使用任意順序消費(fèi)消息,它只需要將offset重置為任意值..(offset將會(huì)保存在zookeeper中,參見下文)
kafka集群幾乎不需要維護(hù)任何consumer和producer狀態(tài)信息,這些信息有zookeeper保存;因此producer和consumer的客戶端實(shí)現(xiàn)非常輕量級(jí),它們可以隨意離開,而不會(huì)對(duì)集群造成額外的影響.
partitions的設(shè)計(jì)目的有多個(gè).最根本原因是kafka基于文件存儲(chǔ).通過分區(qū),可以將日志內(nèi)容分散到多個(gè)server上,來避免文件尺寸達(dá)到單機(jī)磁盤的上限,每個(gè)partiton都會(huì)被當(dāng)前server(kafka實(shí)例)保存;可以將一個(gè)topic切分多任意多個(gè)partitions,來消息保存/消費(fèi)的效率.此外越多的partitions意味著可以容納更多的consumer,有效提升并發(fā)消費(fèi)的能力.(具體原理參見下文).
3、Distribution(分布)
一個(gè)Topic的多個(gè)partitions,被分布在kafka集群中的多個(gè)server上;每個(gè)server(kafka實(shí)例)負(fù)責(zé)partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個(gè)數(shù)(replicas),每個(gè)partition將會(huì)被備份到多臺(tái)機(jī)器上,以提高可用性.
基于replicated方案,那么就意味著需要對(duì)多個(gè)備份進(jìn)行調(diào)度;每個(gè)partition都有一個(gè)server為"leader";leader負(fù)責(zé)所有的讀寫操作,如果leader失效,那么將會(huì)有其他follower來接管(成為新的leader);follower只是單調(diào)的和leader跟進(jìn),同步消息即可..由此可見作為leader的server承載了全部的請(qǐng)求壓力,因此從集群的整體考慮,有多少個(gè)partitions就意味著有多少個(gè)"leader",kafka會(huì)將"leader"均衡的分散在每個(gè)實(shí)例上,來確保整體的性能穩(wěn)定.
Producers
Producer將消息發(fā)布到指定的Topic中,同時(shí)Producer也能決定將此消息歸屬于哪個(gè)partition;比如基于"round-robin"方式或者通過其他的一些算法等.
Consumers
本質(zhì)上kafka只支持Topic.每個(gè)consumer屬于一個(gè)consumer group;反過來說,每個(gè)group中可以有多個(gè)consumer.發(fā)送到Topic的消息,只會(huì)被訂閱此Topic的每個(gè)group中的一個(gè)consumer消費(fèi).
如果所有的consumer都具有相同的group,這種情況和queue模式很像;消息將會(huì)在consumers之間負(fù)載均衡.
如果所有的consumer都具有不同的group,那這就是"發(fā)布-訂閱";消息將會(huì)廣播給所有的消費(fèi)者.
在kafka中,一個(gè)partition中的消息只會(huì)被group中的一個(gè)consumer消費(fèi);每個(gè)group中consumer消息消費(fèi)互相獨(dú)立;我們可以認(rèn)為一個(gè)group是一個(gè)"訂閱"者,一個(gè)Topic中的每個(gè)partions,只會(huì)被一個(gè)"訂閱者"中的一個(gè)consumer消費(fèi),不過一個(gè)consumer可以消費(fèi)多個(gè)partitions中的消息.kafka只能保證一個(gè)partition中的消息被某個(gè)consumer消費(fèi)時(shí),消息是順序的.事實(shí)上,從Topic角度來說,消息仍不是有序的.
kafka的設(shè)計(jì)原理決定,對(duì)于一個(gè)topic,同一個(gè)group中不能有多于partitions個(gè)數(shù)的consumer同時(shí)消費(fèi),否則將意味著某些consumer將無法得到消息.
Guarantees
1) 發(fā)送到partitions中的消息將會(huì)按照它接收的順序追加到日志中
2) 對(duì)于消費(fèi)者而言,它們消費(fèi)消息的順序和日志中消息順序一致.
3) 如果Topic的"replicationfactor"為N,那么允許N-1個(gè)kafka實(shí)例失效。
與生產(chǎn)者的交互
生產(chǎn)者在向kafka集群發(fā)送消息的時(shí)候,可以通過指定分區(qū)來發(fā)送到指定的分區(qū)中,也可以通過指定均衡策略來將消息發(fā)送到不同的分區(qū)中,如果不指定,就會(huì)采用默認(rèn)的隨機(jī)均衡策略,將消息隨機(jī)的存儲(chǔ)到不同的分區(qū)中
與消費(fèi)者的交互
在消費(fèi)者消費(fèi)消息時(shí),kafka使用offset來記錄當(dāng)前消費(fèi)的位置,在kafka的設(shè)計(jì)中,可以有多個(gè)不同的group來同時(shí)消費(fèi)同一個(gè)topic下的消息,如圖,我們有兩個(gè)不同的group同時(shí)消費(fèi),他們的的消費(fèi)的記錄位置offset各不項(xiàng)目,不互相干擾。
對(duì)于一個(gè)group而言,消費(fèi)者的數(shù)量不應(yīng)該多余分區(qū)的數(shù)量,因?yàn)樵谝粋€(gè)group中,每個(gè)分區(qū)至多只能綁定到一個(gè)消費(fèi)者上,即一個(gè)消費(fèi)者可以消費(fèi)多個(gè)分區(qū),一個(gè)分區(qū)只能給一個(gè)消費(fèi)者消費(fèi)
因此,若一個(gè)group中的消費(fèi)者數(shù)量大于分區(qū)數(shù)量的話,多余的消費(fèi)者將不會(huì)收到任何消息。
二、使用場(chǎng)景
1、Messaging
對(duì)于一些常規(guī)的消息系統(tǒng),kafka是個(gè)不錯(cuò)的選擇;partitons/replication和容錯(cuò),可以使kafka具有良好的擴(kuò)展性和性能優(yōu)勢(shì).不過到目前為止,我們應(yīng)該很清楚認(rèn)識(shí)到,kafka并沒有提供JMS中的"事務(wù)性""消息傳輸擔(dān)保(消息確認(rèn)機(jī)制)""消息分組"等企業(yè)級(jí)特性;kafka只能使用作為"常規(guī)"的消息系統(tǒng),在一定程度上,尚未確保消息的發(fā)送與接收絕對(duì)可靠(比如,消息重發(fā),消息發(fā)送丟失等)
2、Websit activity tracking
kafka可以作為"網(wǎng)站活性跟蹤"的最佳工具;可以將網(wǎng)頁/用戶操作等信息發(fā)送到kafka中.并實(shí)時(shí)監(jiān)控,或者離線統(tǒng)計(jì)分析等
3、Log Aggregation
kafka的特性決定它非常適合作為"日志收集中心";application可以將操作日志"批量""異步"的發(fā)送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/壓縮消息等,這對(duì)producer端而言,幾乎感覺不到性能的開支.此時(shí)consumer端可以使hadoop等其他系統(tǒng)化的存儲(chǔ)和分析系統(tǒng).
三、設(shè)計(jì)原理
kafka的設(shè)計(jì)初衷是希望作為一個(gè)統(tǒng)一的信息收集平臺(tái),能夠?qū)崟r(shí)的收集反饋信息,并需要能夠支撐較大的數(shù)據(jù)量,且具備良好的容錯(cuò)能力。
1、持久性
2、性能
3、生產(chǎn)者
4、消費(fèi)者
5、消息傳送機(jī)制
6、復(fù)制備份
7、日志
8、分配
四、主要配置
1、Broker配置
2、Consumer主要配置
3、Producer主要配置
五、kafka集群搭建步驟
1、系統(tǒng)環(huán)境
主機(jī)名 | 系統(tǒng) | zookeeper版本 | IP |
master | CentOS7.4 | 3.4.12 | 192.168.56.129 |
slave1 | CentOS7.4 | 3.4.12 | 192.168.56.130 |
slave2 | CentOS7.4 | 3.4.12 | 192.168.56.131 |
2、暫時(shí)關(guān)閉防火墻和selinux
3、軟件下載
下載地址:http://kafka.apache.org/downloads.html
備注:下載最新的二進(jìn)制tgz包
4、搭建zookeeper集群
備注:小伙伴可以參考上一篇文章即可
5、kafka集群
5.1、根據(jù)上面的zookeeper集群服務(wù)器,把kafka上傳到/home下
5.2、解壓
[root@master home]# tar -zxvf kafka_2.12-2.0.0.tgz
[root@master home]# mv kafka_2.12-2.0.0 kafka01
5.3、配置文件
[root@master home]# cd /home/kafka01/config/
備注:server.properties文件里的broker.id,log.dirs,zookeeper.connect必須根據(jù)實(shí)際情況進(jìn)行修改,其他項(xiàng)根據(jù)需要自行斟酌,master配置如下:
broker.id=1
port=9091
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/var/log/kafka/kafka-logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
num.replica.fetchers=2
log.cleanup.interval.mins=10
zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181
zookeeper.connection.timeout.ms=1000000
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false
5.4、啟動(dòng)服務(wù)(master)----前提是三個(gè)節(jié)點(diǎn)的zookeeper已啟動(dòng)
[root@master kafka01]# ./bin/kafka-server-start.sh config/server.properties &
補(bǔ)充:
問題:&可以使程序在后臺(tái)運(yùn)行,但一旦斷開ssh終端,后臺(tái)Java程序也會(huì)終止。
解決辦法:使用shell腳本啟動(dòng)
[root@master kafka01]# cat start.sh
#!/bin/bash
cd /home/kafka01/
./bin/kafka-server-start.sh config/server.properties &
exit
授權(quán),運(yùn)行即可
[root@master kafka01]#chmod +x start.sh
5.5、配置slave1和slave2
slave1配置如下:
broker.id=2
port=9092
log.dirs=/var/log/kafka
zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181
啟動(dòng)即可
slave2配置如下:
broker.id=3
port=9093
log.dirs=/var/log/kafka
zookeeper.connect=192.168.56.129:2181,192.168.56.130:2181,192.168.56.131:2181
啟動(dòng)即可
6、測(cè)試
Kafka通過topic對(duì)同一類的數(shù)據(jù)進(jìn)行管理,同一類的數(shù)據(jù)使用同一個(gè)topic可以在處理數(shù)據(jù)時(shí)更加的便捷
6.1、創(chuàng)建一個(gè)Topic
[root@master kafka01]# bin/kafka-topics.sh --create --zookeeper 192.168.56.129:2181 --replication-factor 1 --partitions 1 --topic test
查看
[root@master kafka01]# bin/kafka-topics.sh --list --zookeeper 192.168.56.129:2181
6.2、創(chuàng)建一個(gè)消息消費(fèi)者
[root@master kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.129:9091 --topic test --from-beginning
消費(fèi)者創(chuàng)建完成之后,因?yàn)檫€沒有發(fā)送任何數(shù)據(jù),因此這里在執(zhí)行后沒有打印出任何數(shù)據(jù)
不過別著急,不要關(guān)閉這個(gè)終端,打開一個(gè)新的終端,接下來我們創(chuàng)建第一個(gè)消息生產(chǎn)者
6.3、創(chuàng)建一個(gè)消息生產(chǎn)者
在kafka解壓目錄打開一個(gè)新的終端,輸入
[root@master kafka01]# bin/kafka-console-producer.sh --broker-list 192.168.56.129:9091 --topic test
在發(fā)送完消息之后,可以回到我們的消息消費(fèi)者終端中,可以看到,終端中已經(jīng)打印出了我們剛才發(fā)送的消息
zookeeper查看topic
到此即可,共同進(jìn)步之路?。。。?!
新聞名稱:kafka分布式集群
轉(zhuǎn)載來于:http://aaarwkj.com/article24/gpjece.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站維護(hù)、建站公司、網(wǎng)站導(dǎo)航、微信公眾號(hào)、網(wǎng)站制作、全網(wǎng)營銷推廣
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)