本篇內(nèi)容主要講解“什么是go-stash組件”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“什么是go-stash組件”吧!
創(chuàng)新互聯(lián)公司:于2013年開始為各行業(yè)開拓出企業(yè)自己的“網(wǎng)站建設(shè)”服務(wù),為上1000家公司企業(yè)提供了專業(yè)的成都網(wǎng)站制作、做網(wǎng)站、網(wǎng)頁設(shè)計和網(wǎng)站推廣服務(wù), 按需求定制網(wǎng)站由設(shè)計師親自精心設(shè)計,設(shè)計的效果完全按照客戶的要求,并適當(dāng)?shù)奶岢龊侠淼慕ㄗh,擁有的視覺效果,策劃師分析客戶的同行競爭對手,根據(jù)客戶的實際情況給出合理的網(wǎng)站構(gòu)架,制作客戶同行業(yè)具有領(lǐng)先地位的。
今天來介紹 go-zero
生態(tài)的另一個組件 go-stash
。這是一個 logstash
的 Go 語言替代版,我們用 go-stash
相比原先的 logstash
節(jié)省了2/3的服務(wù)器資源。如果你在用 logstash
,不妨試試,也可以看看基于 go-zero
實現(xiàn)這樣的工具是多么的容易,這個工具作者僅用了兩天時間。
先從它的配置中,我們來看看設(shè)計架構(gòu)。
Clusters: - Input: Kafka: # Kafka 配置 --> 聯(lián)動 go-queue Filters: # filter action - Action: drop - Action: remove_field - Action: transfer Output: ElasticSearch: # es 配置 {host, index}
看配置名:kafka
是數(shù)據(jù)輸出端,es
是數(shù)據(jù)輸入端,filter
抽象了數(shù)據(jù)處理過程。
對,整個 go-stash
就是如 config 配置中顯示的,所見即所得。
從 stash.go
的啟動流程大致分為幾個部分。因為可以配置多個 cluster
,那從一個 cluster
分析:
建立與 es
的連接【傳入 es
配置】
構(gòu)建 filter processors
【es
前置處理器,做數(shù)據(jù)過濾以及處理,可以設(shè)置多個】
完善對 es
中 索引配置,啟動 handle
,同時將 filter
加入handle【處理輸入輸出】
連接下游的 kafka
,將上面創(chuàng)建的 handle
傳入,完成 kafka
和 es
之間的數(shù)據(jù)消費和數(shù)據(jù)寫入
在上面架構(gòu)圖中,中間的 filter
只是從 config 中看到,其實更詳細(xì)是 MessageHandler
的一部分,做數(shù)據(jù)過濾和轉(zhuǎn)換,下面來說說這塊。
> 以下代碼:https://github.com/tal-tech/go-stash/tree/master/stash/handler/handler.go
type MessageHandler struct { writer *es.Writer indexer *es.Index filters []filter.FilterFunc }
這個就對應(yīng)上面說的,filter
只是其中一部分,在結(jié)構(gòu)上 MessageHandler
是對接下游 es
,但是沒有看到對 kafka
的操作。
別急,從接口設(shè)計上 MessageHandler
實現(xiàn)了 go-queue
中 ConsumeHandler
接口。
這里,上下游就串聯(lián)了:
MessageHandler
接管了 es
的操作,負(fù)責(zé)數(shù)據(jù)處理到數(shù)據(jù)寫入
對上實現(xiàn)了 kafka
的 Consume
操作。這樣在消費過程中執(zhí)行 handler
的操作,從而寫入 es
實際上,Consume()
也是這么處理的:
func (mh *MessageHandler) Consume(_, val string) error { var m map[string]interface{} // 反序列化從 kafka 中的消息 if err := jsoniter.Unmarshal([]byte(val), &m); err != nil { return err } // es 寫入index配置 index := mh.indexer.GetIndex(m) // filter 鏈?zhǔn)教幚怼疽驗闆]有泛型,整個處理都是 `map進map出`】 for _, proc := range mh.filters { if m = proc(m); m == nil { return nil } } bs, err := jsoniter.Marshal(m) if err != nil { return err } // es 寫入 return mh.writer.Write(index, string(bs)) }
說完了數(shù)據(jù)處理,以及上下游的連接點。但是數(shù)據(jù)要從 kafka -> es
,數(shù)據(jù)流出這個動作從 kafka
角度看,應(yīng)該是由開發(fā)者主動 pull data from kafka
。
那么數(shù)據(jù)流是怎么動起來?我們回到主程序 https://github.com/tal-tech/go-stash/blob/master/stash/stash.go
其實 啟動 整個流程中,其實就是一個組合模式:
func main() { // 解析命令行參數(shù),啟動優(yōu)雅退出 ... // service 組合模式 group := service.NewServiceGroup() defer group.Stop() for _, processor := range c.Clusters { // 連接es ... // filter processors 構(gòu)建 ... // 準(zhǔn)備es的寫入操作 {寫入的index, 寫入器writer} handle := handler.NewHandler(writer, indexer) handle.AddFilters(filters...) handle.AddFilters(filter.AddUriFieldFilter("url", "uri")) // 按照配置啟動kafka,并將消費操作傳入,同時加入組合器 for _, k := range toKqConf(processor.Input.Kafka) { group.Add(kq.MustNewQueue(k, handle)) } } // 啟動這個組合器 group.Start() }
整個數(shù)據(jù)流,就和這個 group
組合器有關(guān)了。
group.Start() |- group.doStart() |- [service.Start() for service in group.services]
那么說明加入 group
的 service
都是實現(xiàn) Start()
。也就是說 kafka
端的啟動邏輯在 Start()
:
func (q *kafkaQueue) Start() { q.startConsumers() q.startProducers() q.producerRoutines.Wait() close(q.channel) q.consumerRoutines.Wait() }
啟動 kafka
消費程序
啟動 kafka
消費拉取端【可能會被名字迷惑,實際上是從 kafka
拉取消息到 q.channel
】
消費程序終止,收尾工作
而我們傳入 kafka
中的 handler
,上文說過其實是 Consume
,而這個方法就是在 q.startConsumers()
中執(zhí)行的:
q.startConsumers() |- [q.consumeOne(key, value) for msg in q.channel] |- q.handler.Consume(key, value)
這樣整個數(shù)據(jù)流就徹底串起來了:
到此,相信大家對“什么是go-stash組件”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
新聞標(biāo)題:什么是go-stash組件
標(biāo)題鏈接:http://aaarwkj.com/article8/pegjip.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站、網(wǎng)站設(shè)計、定制開發(fā)、品牌網(wǎng)站制作、面包屑導(dǎo)航、品牌網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)