今天就跟大家聊聊有關(guān)如何解析client-go中workqueue,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對(duì)這個(gè)行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡(jiǎn)單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長(zhǎng)期合作伙伴,公司提供的服務(wù)項(xiàng)目有:域名申請(qǐng)、虛擬主機(jī)、營(yíng)銷軟件、網(wǎng)站建設(shè)、皇姑網(wǎng)站維護(hù)、網(wǎng)站推廣。
下面主要講述下client-go中workqueue, 看一下client-go的一個(gè)整體數(shù)據(jù)走向.如下圖:
而workqueue主要是在listener這里引用,listener使用chan獲取到數(shù)據(jù)之后將數(shù)據(jù)放入到工作隊(duì)列進(jìn)行處理。主要是由于chan過于簡(jiǎn)單,已經(jīng)無法滿足K8S的場(chǎng)景,所以衍生出了workqueue,
特性
有序
去重
并發(fā)
延遲處理
限速
當(dāng)前有三種workqueue
基本隊(duì)列
延遲隊(duì)列
限速隊(duì)列
其中延遲隊(duì)列是基于基本隊(duì)列實(shí)現(xiàn)的,而限流隊(duì)列基于延遲隊(duì)列實(shí)現(xiàn)
基本隊(duì)列
看一下基本隊(duì)列的接口
// client-go源碼路徑util/workqueue/queue.go type Interface interface { //新增元素 可以是任意對(duì)象 Add(item interface{}) //獲取當(dāng)前隊(duì)列的長(zhǎng)度 Len() int // 阻塞獲取頭部元素(先入先出) 返回元素以及隊(duì)列是否關(guān)閉 Get() (item interface{}, shutdown bool) // 顯示標(biāo)記完成元素的處理 Done(item interface{}) //關(guān)閉隊(duì)列 ShutDown() //隊(duì)列是否處于關(guān)閉狀態(tài) ShuttingDown() bool }
看一下基本隊(duì)列的數(shù)據(jù)結(jié)構(gòu),只看三個(gè)重點(diǎn)處理的,其他的沒有展示出來
type Type struct { //含有所有元素的元素的隊(duì)列 保證有序 queue []t //所有需要處理的元素 set是基于map以value為空struct實(shí)現(xiàn)的結(jié)構(gòu),保證去重 dirty set //當(dāng)前正在處理中的元素 processing set ... } type empty struct{} type t interface{} type set map[t]empty
基本隊(duì)列的hello world也很簡(jiǎn)單
wq := workqueue.New() wq.Add("hello") v, _ := wq.Get()
基本隊(duì)列Add
func (q *Type) Add(item interface{}) { q.cond.L.Lock() defer q.cond.L.Unlock() //如果當(dāng)前處于關(guān)閉狀態(tài),則不再新增元素 if q.shuttingDown { return } //如果元素已經(jīng)在等待處理中,則不再新增 if q.dirty.has(item) { return } //添加到metrics q.metrics.add(item) //加入等待處理中 q.dirty.insert(item) //如果目前正在處理該元素 就不將元素添加到隊(duì)列 if q.processing.has(item) { return } q.queue = append(q.queue, item) q.cond.Signal() }
基本隊(duì)列Get
func (q *Type) Get() (item interface{}, shutdown bool) { q.cond.L.Lock() defer q.cond.L.Unlock() //如果當(dāng)前沒有元素并且不處于關(guān)閉狀態(tài),則阻塞 for len(q.queue) == 0 && !q.shuttingDown { q.cond.Wait() } ... item, q.queue = q.queue[0], q.queue[1:] q.metrics.get(item) //把元素添加到正在處理隊(duì)列中 q.processing.insert(item) //把隊(duì)列從等待處理隊(duì)列中刪除 q.dirty.delete(item) return item, false }
基本隊(duì)列實(shí)例化
func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type { t := &Type{ clock: c, dirty: set{}, processing: set{}, cond: sync.NewCond(&sync.Mutex{}), metrics: metrics, unfinishedWorkUpdatePeriod: updatePeriod, } //啟動(dòng)一個(gè)協(xié)程 定時(shí)更新metrics go t.updateUnfinishedWorkLoop() return t } func (q *Type) updateUnfinishedWorkLoop() { t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod) defer t.Stop() for range t.C() { if !func() bool { q.cond.L.Lock() defer q.cond.L.Unlock() if !q.shuttingDown { q.metrics.updateUnfinishedWork() return true } return false }() { return } } }
延遲隊(duì)列
延遲隊(duì)列的實(shí)現(xiàn)思路主要是使用優(yōu)先隊(duì)列存放需要延遲添加的元素,每次判斷最小延遲的元素書否已經(jīng)達(dá)到了加入隊(duì)列的要求(延遲的時(shí)間到了),如果是則判斷下一個(gè)元素,直到?jīng)]有元素或者元素還需要延遲為止。
看一下延遲隊(duì)列的數(shù)據(jù)結(jié)構(gòu)
type delayingType struct { Interface ... //放置延遲添加的元素 waitingForAddCh chan *waitFor ... }
主要是使用chan來保存延遲添加的元素,而具體實(shí)現(xiàn)是通過一個(gè)實(shí)現(xiàn)了一個(gè)AddAfter方法,看一下具體的內(nèi)容
//延遲隊(duì)列的接口 type DelayingInterface interface { Interface // AddAfter adds an item to the workqueue after the indicated duration has passed AddAfter(item interface{}, duration time.Duration) } func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { ... //如果延遲實(shí)現(xiàn)小于等于0 直接添加到隊(duì)列 if duration <= 0 { q.Add(item) return } select { case <-q.stopCh: //添加到chan,下面會(huì)講一下這個(gè)chan的處理 case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: } }
延遲元素的處理
func (q *delayingType) waitingLoop() { defer utilruntime.HandleCrash() never := make(<-chan time.Time) var nextReadyAtTimer clock.Timer waitingForQueue := &waitForPriorityQueue{} //這里是初始化一個(gè)優(yōu)先隊(duì)列 具體實(shí)現(xiàn)有興趣的同學(xué)可以研究下 heap.Init(waitingForQueue) waitingEntryByData := map[t]*waitFor{} for { if q.Interface.ShuttingDown() { return } now := q.clock.Now() // Add ready entries for waitingForQueue.Len() > 0 { entry := waitingForQueue.Peek().(*waitFor) //看一下第一個(gè)元素是否已經(jīng)到達(dá)延遲的時(shí)間了 if entry.readyAt.After(now) { break } //時(shí)間到了,將元素添加到工作的隊(duì)列,并且從延遲的元素中移除 entry = heap.Pop(waitingForQueue).(*waitFor) q.Add(entry.data) delete(waitingEntryByData, entry.data) } // Set up a wait for the first item's readyAt (if one exists) nextReadyAt := never if waitingForQueue.Len() > 0 { if nextReadyAtTimer != nil { nextReadyAtTimer.Stop() } //如果還有需要延遲的元素,計(jì)算第一個(gè)元素的延遲時(shí)間(最小延遲的元素) entry := waitingForQueue.Peek().(*waitFor) nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) nextReadyAt = nextReadyAtTimer.C() } select { case <-q.stopCh: return case <-q.heartbeat.C(): //定時(shí)檢查下是否有元素達(dá)到延遲的時(shí)間 case <-nextReadyAt: //這里是上面計(jì)算出來的時(shí)間,時(shí)間到了,處理到達(dá)延遲時(shí)間的元素 case waitEntry := <-q.waitingForAddCh: //檢查是否需要延遲,如果需要延遲就加入到延遲等待 if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry) } else { //如果不需要延遲就直接添加到隊(duì)列 q.Add(waitEntry.data) } drained := false for !drained { select { case waitEntry := <-q.waitingForAddCh:
上面waitingLoop 是在實(shí)例化延遲隊(duì)列的時(shí)候調(diào)用的,看一下實(shí)例化時(shí)候的邏輯
func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface { //實(shí)例化一個(gè)數(shù)據(jù)結(jié)構(gòu) ret := &delayingType{ Interface: NewNamed(name), clock: clock, heartbeat: clock.NewTicker(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan *waitFor, 1000), metrics: newRetryMetrics(name), } //放到一個(gè)協(xié)程中處理延遲元素 go ret.waitingLoop() return ret }
限速隊(duì)列
當(dāng)前限速隊(duì)列支持4中限速模式
令牌桶算法限速
排隊(duì)指數(shù)限速
計(jì)數(shù)器模式
混合模式(多種限速算法同時(shí)使用)
限速隊(duì)列的底層實(shí)際上還是通過延遲隊(duì)列來進(jìn)行限速,通過計(jì)算出元素的限速時(shí)間作為延遲時(shí)間
來看一下限速接口
type RateLimiter interface { // When(item interface{}) time.Duration // Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing // or for success, we'll stop tracking it Forget(item interface{}) // NumRequeues returns back how many failures the item has had NumRequeues(item interface{}) int }
看一下限速隊(duì)列的數(shù)據(jù)結(jié)構(gòu)
// RateLimitingInterface is an interface that rate limits items being added to the queue. type RateLimitingInterface interface { DelayingInterface //實(shí)際上底層還是調(diào)用的延遲隊(duì)列,通過計(jì)算出元素的延遲時(shí)間 進(jìn)行限速 AddRateLimited(item interface{}) // Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you // still have to call `Done` on the queue. Forget(item interface{}) // NumRequeues returns back how many times the item was requeued NumRequeues(item interface{}) int } func (q *rateLimitingType) AddRateLimited(item interface{}) { //通過when方法計(jì)算延遲加入隊(duì)列的時(shí)間 q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item)) }
令牌桶算法
client-go中的令牌桶限速是通過 golang.org/x/time/rat包來實(shí)現(xiàn)的
可以通過 flowcontrol.NewTokenBucketRateLimiter(qps float32, burst int) 來使用令牌桶限速算法,其中第一個(gè)參數(shù)qps表示每秒補(bǔ)充多少token,burst表示總token上限為多少。
排隊(duì)指數(shù)算法
排隊(duì)指數(shù)可以通過 workqueue.NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) 來使用。
這個(gè)算法有兩個(gè)參數(shù):
baseDelay 基礎(chǔ)限速時(shí)間
maxDelay 最大限速時(shí)間
舉個(gè)例子來理解一下這個(gè)算法,例如快速插入5個(gè)相同元素,baseDelay設(shè)置為1秒,maxDelay設(shè)置為10秒,都在同一個(gè)限速期內(nèi)。第一個(gè)元素會(huì)在1秒后加入到隊(duì)列,第二個(gè)元素會(huì)在2秒后加入到隊(duì)列,第三個(gè)元素會(huì)在4秒后加入到隊(duì)列,第四個(gè)元素會(huì)在8秒后加入到隊(duì)列,第五個(gè)元素會(huì)在10秒后加入到隊(duì)列(指數(shù)計(jì)算的結(jié)果為16,但是最大值設(shè)置了10秒)。
來看一下源碼的計(jì)算
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock() defer r.failuresLock.Unlock() //第一次為0 exp := r.failures[item] //累加1 r.failures[item] = r.failures[item] + 1 //通過當(dāng)前計(jì)數(shù)和baseDelay計(jì)算指數(shù)結(jié)果 baseDelay*(2的exp次方) backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp)) if backoff > math.MaxInt64 { return r.maxDelay } calculated := time.Duration(backoff) if calculated > r.maxDelay { return r.maxDelay } return calculated }
計(jì)數(shù)器模式
計(jì)數(shù)器模式可以通過 workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int)來使用,有三個(gè)參數(shù)
fastDelay 快限速時(shí)間
slowDelay 慢限速時(shí)間
maxFastAttempts 快限速元素個(gè)數(shù)
原理是這樣的,假設(shè)fastDelay設(shè)置為1秒,slowDelay設(shè)置為10秒,maxFastAttempts設(shè)置為3,同樣在一個(gè)限速周期內(nèi)快速插入5個(gè)相同的元素。前三個(gè)元素都是以1秒的限速時(shí)間加入到隊(duì)列,添加第四個(gè)元素時(shí)開始使用slowDelay限速時(shí)間,也就是10秒后加入到隊(duì)列,后面的元素都將以10秒的限速時(shí)間加入到隊(duì)列,直到限速周期結(jié)束。
來看一下源碼
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock() defer r.failuresLock.Unlock() //添加一次就計(jì)數(shù)一次 r.failures[item] = r.failures[item] + 1 //計(jì)數(shù)小于maxFastAttempts都以fastDelay為限速時(shí)間,否則以slowDelay為限速時(shí)間 if r.failures[item] <= r.maxFastAttempts { return r.fastDelay } return r.slowDelay }
混合模式
最后一種是混合模式,可以組合使用不同的限速算法實(shí)例化限速隊(duì)列
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter { return &MaxOfRateLimiter{limiters: limiters} }
在k8s-client-go的源碼中可以看到,大量的接口組合運(yùn)用,將各種功能拆分成各個(gè)細(xì)小的庫,是一種非常值得學(xué)習(xí)的代碼風(fēng)格以及思路。
看完上述內(nèi)容,你們對(duì)如何解析client-go中workqueue有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。
當(dāng)前標(biāo)題:如何解析client-go中workqueue
標(biāo)題URL:http://aaarwkj.com/article12/igocdc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站制作、外貿(mào)建站、靜態(tài)網(wǎng)站、網(wǎng)站營(yíng)銷、面包屑導(dǎo)航、外貿(mào)網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)