欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

什么是go-stash組件

本篇內(nèi)容主要講解“什么是go-stash組件”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“什么是go-stash組件”吧!

創(chuàng)新互聯(lián)公司:于2013年開始為各行業(yè)開拓出企業(yè)自己的“網(wǎng)站建設(shè)”服務(wù),為上1000家公司企業(yè)提供了專業(yè)的成都網(wǎng)站制作、做網(wǎng)站、網(wǎng)頁設(shè)計和網(wǎng)站推廣服務(wù), 按需求定制網(wǎng)站由設(shè)計師親自精心設(shè)計,設(shè)計的效果完全按照客戶的要求,并適當(dāng)?shù)奶岢龊侠淼慕ㄗh,擁有的視覺效果,策劃師分析客戶的同行競爭對手,根據(jù)客戶的實際情況給出合理的網(wǎng)站構(gòu)架,制作客戶同行業(yè)具有領(lǐng)先地位的。

前言

今天來介紹 go-zero 生態(tài)的另一個組件 go-stash。這是一個 logstash 的 Go 語言替代版,我們用 go-stash 相比原先的 logstash 節(jié)省了2/3的服務(wù)器資源。如果你在用 logstash,不妨試試,也可以看看基于 go-zero 實現(xiàn)這樣的工具是多么的容易,這個工具作者僅用了兩天時間。

整體架構(gòu)

先從它的配置中,我們來看看設(shè)計架構(gòu)。

Clusters:
  - Input:
      Kafka:
        # Kafka 配置 --> 聯(lián)動 go-queue
    Filters:
    	# filter action
      - Action: drop            
      - Action: remove_field
      - Action: transfer      
    Output:
      ElasticSearch:
        # es 配置 {host, index}

看配置名:kafka 是數(shù)據(jù)輸出端,es 是數(shù)據(jù)輸入端,filter 抽象了數(shù)據(jù)處理過程。

對,整個 go-stash 就是如 config 配置中顯示的,所見即所得。

什么是go-stash組件

啟動

stash.go 的啟動流程大致分為幾個部分。因為可以配置多個 cluster,那從一個 cluster 分析:

  1. 建立與 es 的連接【傳入 es 配置】

  2. 構(gòu)建 filter processorses 前置處理器,做數(shù)據(jù)過濾以及處理,可以設(shè)置多個】

  3. 完善對 es 中 索引配置,啟動 handle ,同時將 filter 加入handle【處理輸入輸出】

  4. 連接下游的 kafka,將上面創(chuàng)建的 handle 傳入,完成 kafkaes 之間的數(shù)據(jù)消費和數(shù)據(jù)寫入

MessageHandler

在上面架構(gòu)圖中,中間的 filter 只是從 config 中看到,其實更詳細(xì)是 MessageHandler 的一部分,做數(shù)據(jù)過濾和轉(zhuǎn)換,下面來說說這塊。

> 以下代碼:https://github.com/tal-tech/go-stash/tree/master/stash/handler/handler.go

type MessageHandler struct {
	writer  *es.Writer
	indexer *es.Index
	filters []filter.FilterFunc
}

這個就對應(yīng)上面說的,filter 只是其中一部分,在結(jié)構(gòu)上 MessageHandler 是對接下游 es ,但是沒有看到對 kafka 的操作。

別急,從接口設(shè)計上 MessageHandler 實現(xiàn)了 go-queueConsumeHandler 接口。

這里,上下游就串聯(lián)了:

  1. MessageHandler 接管了 es 的操作,負(fù)責(zé)數(shù)據(jù)處理到數(shù)據(jù)寫入

  2. 對上實現(xiàn)了 kafkaConsume 操作。這樣在消費過程中執(zhí)行 handler 的操作,從而寫入 es

實際上,Consume() 也是這么處理的:

func (mh *MessageHandler) Consume(_, val string) error {
	var m map[string]interface{}
  // 反序列化從 kafka 中的消息
	if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {
		return err
	}
	// es 寫入index配置
	index := mh.indexer.GetIndex(m)
  // filter 鏈?zhǔn)教幚怼疽驗闆]有泛型,整個處理都是 `map進map出`】
	for _, proc := range mh.filters {
		if m = proc(m); m == nil {
			return nil
		}
	}
	bs, err := jsoniter.Marshal(m)
	if err != nil {
		return err
	}
	// es 寫入
	return mh.writer.Write(index, string(bs))
}

數(shù)據(jù)流

說完了數(shù)據(jù)處理,以及上下游的連接點。但是數(shù)據(jù)要從 kafka -> es ,數(shù)據(jù)流出這個動作從 kafka 角度看,應(yīng)該是由開發(fā)者主動 pull data from kafka。

那么數(shù)據(jù)流是怎么動起來?我們回到主程序 https://github.com/tal-tech/go-stash/blob/master/stash/stash.go

其實 啟動 整個流程中,其實就是一個組合模式:

func main() {
	// 解析命令行參數(shù),啟動優(yōu)雅退出
	...
  // service 組合模式
	group := service.NewServiceGroup()
	defer group.Stop()

	for _, processor := range c.Clusters {
		// 連接es
    ...
		// filter processors 構(gòu)建
    ...
    // 準(zhǔn)備es的寫入操作 {寫入的index, 寫入器writer}
		handle := handler.NewHandler(writer, indexer)
		handle.AddFilters(filters...)
		handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
    // 按照配置啟動kafka,并將消費操作傳入,同時加入組合器
		for _, k := range toKqConf(processor.Input.Kafka) {
			group.Add(kq.MustNewQueue(k, handle))
		}
	}
	// 啟動這個組合器
	group.Start()
}

整個數(shù)據(jù)流,就和這個 group 組合器有關(guān)了。

group.Start()
	|- group.doStart()
		|- [service.Start() for service in group.services]

那么說明加入 groupservice 都是實現(xiàn) Start()。也就是說 kafka 端的啟動邏輯在 Start()

func (q *kafkaQueue) Start() {
	q.startConsumers()
	q.startProducers()

	q.producerRoutines.Wait()
	close(q.channel)
	q.consumerRoutines.Wait()
}
  1. 啟動 kafka 消費程序

  2. 啟動 kafka 消費拉取端【可能會被名字迷惑,實際上是從 kafka 拉取消息到 q.channel

  3. 消費程序終止,收尾工作

而我們傳入 kafka 中的 handler,上文說過其實是 Consume,而這個方法就是在 q.startConsumers() 中執(zhí)行的:

q.startConsumers()
	|- [q.consumeOne(key, value) for msg in q.channel]
		|- q.handler.Consume(key, value)

這樣整個數(shù)據(jù)流就徹底串起來了:

什么是go-stash組件

到此,相信大家對“什么是go-stash組件”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

新聞標(biāo)題:什么是go-stash組件
標(biāo)題鏈接:http://aaarwkj.com/article8/pegjip.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站、網(wǎng)站設(shè)計定制開發(fā)、品牌網(wǎng)站制作、面包屑導(dǎo)航、品牌網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作
亚洲欧美国产日韩另类| 亚洲精品国产中文字幕| 在线播放国内自拍情侣酒店| 一区二区亚洲欧美精品| 国产精品久久久天美传媒| 日韩高清在线不卡视频| 久久国产福利一区二区| 中文字幕五月婷婷免费| 国产精品一区二区久久蜜桃麻豆| 少妇高潮喷水下面的水| 日韩精品大全一区二区| 日韩精品第一区第二区| 97色伦综合在线欧美| 国产福利在线观看午夜| 蜜桃av噜噜一区二区三| 色吊最新在线视频免费观看| 国产三级国产精品国产国在线观看| 日韩av亚洲在线观看| 欧美日韩综合精品无人区| 亚洲欧美日韩成人在线| 国产三级视频在线2022| 宅男视频在线观看视频| 国产亚洲精品第一最新| 成人午夜欧美熟妇小视频| 日本亚洲欧美男人的天堂| 欧美人妻精品一区二区| 一区二区三区在线观看美女视频 | 精品欧美一区二区在线| 日韩免费毛片在线观看| 日本高清免费观看一区| 亚洲一区二区三区日韩欧美| 日韩av一区二区人妻| 992免费影院 在线观看| 加勒比久久精品网址系列| 亚洲国产欧美日韩激情在线| 韩国av网址在线观看| av资源天堂第一区第二区第三区| 美女张开腿让男人插进去| 久久精品资源综合网| 日韩精品一区高清视频| 国产v精品欧美精品v日韩|