計時器對于故障恢復(fù)、基于速率的流量控制、調(diào)度算法、控制網(wǎng)絡(luò)中的數(shù)據(jù)包生命周期至關(guān)重要。
而一般計時器的實現(xiàn)維護(hù)成本比較高,比如JDK自帶的 Timer、DelayQueue對于任務(wù)的進(jìn)出其時間復(fù)雜度為O(logN)。
對于要求高性能且需要保證高頻繁大量操作任務(wù)的優(yōu)先級框架,比如Kafka、Netty等框架,重排序的時間復(fù)雜度O(logN)是不能滿足其要求的。而基于一種時間輪的算法可以實現(xiàn)將這種重排序的時間復(fù)雜度降為O(1)。
2. 什么是時間輪算法?算法來自于生活,我們?nèi)粘?磿r間使用手表,一個表盤就可以無限的去循環(huán)每一天,通過同樣的一個表盤不同的指針來指向不同維度的時間(時分秒),日常中如果我們由大量任務(wù)需要進(jìn)行提醒,可以進(jìn)行備忘與時鐘里的時間進(jìn)行指定按時提醒。
同樣的時間輪算法數(shù)據(jù)結(jié)構(gòu)其實抽象于手表時鐘,時間輪是用環(huán)形數(shù)組抽象表盤,數(shù)組里面的每一個元素就是一個bucket(刻度之間的間隔,也可以指代時間的精度)。bucket內(nèi)部用雙向鏈表存這待執(zhí)行的任務(wù),此時添加和刪除的鏈表操作時間復(fù)雜度都是o(1)。
2.1 單層時間輪從圖中可以看到此時指針指向的是第一個bucket,一共有八個bucket0~7,假設(shè)bucket的時間單位為 1 秒,現(xiàn)在要加入一個延時 6秒的任務(wù),計算方式就是 6 % 8 = 6,即放在下標(biāo)為6 的那個bucket中,具體的操作只要直接添加到bucket雙向鏈表的tail尾部就行了。
2.2 多層時間輪比如當(dāng)我們加一個延遲9s后執(zhí)行的任務(wù),此時超出表盤的范圍時,如何解決呢?同樣借鑒于表盤中循環(huán)以及多個指針代表不同時間維度的思想,時間輪算法有兩種解決方案。
2.2.1 增加輪次的概念延遲9s任務(wù)存放的bucket 下標(biāo) = 9%8 = 1,輪數(shù)記為 9/8 = 1。
意思就是當(dāng)循環(huán)1輪后,指針指向下表為1的bucket就會觸發(fā)這個任務(wù)。Netty 中的 HashedWheelTimer 使用的就是這種方式。
這種概念,就和我們手表里的時分秒不同指針代表不同維度時間概念一樣了,只不過這里是分層的設(shè)計。
實現(xiàn)的方式如同手表里的指針轉(zhuǎn)動,當(dāng)秒針走一圈,分針走一格,分針走一圈,時針走一格。
這里三層的時間輪,一共是38=24格bucket, 最多可以延遲88*8=512秒。
多層時間輪,任務(wù)存放的位置還會隨著時間進(jìn)行降層變動,比如一個延遲65秒的任務(wù),剛才是放在第三層,時間過了1s后,此時只需要64s就會執(zhí)行,那么這個任務(wù)就會被降層到第二層,隨著時間的不斷的進(jìn)行,這個任務(wù)最終會降層到第一層等待執(zhí)行。
為什么要進(jìn)行降層操作呢? 這是為了保證時間精度的一致性,Kakfa內(nèi)部用的就是多層次時間輪算法。
2.3 小結(jié)時間輪是一種實現(xiàn)延遲功能(定時器)的高效調(diào)度模型算法。其設(shè)計思想類似于手表時鐘的設(shè)計,主要的數(shù)據(jù)結(jié)構(gòu)為數(shù)組+鏈表,多層時間輪有兩種實現(xiàn)方案,一種是輪次時間輪,一種是多層時間輪方案。
3. 實現(xiàn)案例 3.1 Kafka中的時間輪Kafka的時間輪(TimingWheel)是一個存儲定時任務(wù)的環(huán)形隊列,底層采用數(shù)組實現(xiàn),數(shù)組中的每個元素可以存放一個定時任務(wù)鏈表(TimerTaskList),或者稱之為任務(wù)槽。TimerTaskList是一個環(huán)形的雙向鏈表,鏈表中的每一項表示的均是定時任務(wù)(TimerTaskEntry),其中封裝了真正的定時任務(wù)(TimerTask)。
時間輪由多個時間格組成, 每個時間格代表當(dāng)前時間輪的基本時間跨度(tickMs) 。時間輪的時間格個數(shù)是固定的,可用wheelSize來表示,那么整個時間輪的總體時間跨度(interval)可以通過公式tickMs × wheelSize計算得出。時間輪還有一個表盤指針(currentTime),用來表示時間輪當(dāng)前所處的時間,currentTime是tickMs的整數(shù)倍。currentTime可以將整個時間輪劃分為到期部分和未到期部分,currentTime當(dāng)前指向的時間格也屬于到期部分,表示剛好到期,需要處理此時間格所對應(yīng)的TimerTaskList中的所有任務(wù)。
3.1.1 任務(wù)的添加// TimerTaskEntry的就是包裝了任務(wù),并且記錄任務(wù)的執(zhí)行時間 = 延時+當(dāng)前時間
def add(timerTaskEntry: TimerTaskEntry): Boolean = {val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) { // Cancelled
false
} else if (expiration< currentTime + tickMs) {// 如果到期
// Already expired
false
} else if (expiration< currentTime + interval) {// 如果還在本層
// Put in its own bucket
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt) // 計算bucket
bucket.add(timerTaskEntry) // 添加到bucket中的雙向鏈表中
// Set the bucket expiration time
if (bucket.setExpiration(virtualId * tickMs)) {// 更新bucket過期時間
// The bucket needs to be enqueued because it was an expired bucket
// We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
// and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
// will pass in the same value and hence return false, thus the bucket with the same expiration will not
// be enqueued multiple times.
queue.offer(bucket) // 將bucket加入delayQueue
}
true
} else { // Out of the interval. Put it into the parent timer
if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTaskEntry)
}
}
從上面的 add 方法我們知道每次對比都是根據(jù)expiration< currentTime + interval 來進(jìn)行對比的,那currentTime 如何進(jìn)行推進(jìn)的呢?
3.1.2 時間輪的推進(jìn)Netty 中是通過固定的時間間隔掃描,時候未到就等待來進(jìn)行時間輪的推動。
而 Kafka 就利用了空間換時間的思想,通過 DelayQueue,來保存每個槽,通過每個槽的過期時間排序。這樣擁有最早需要執(zhí)行任務(wù)的槽會有優(yōu)先獲取。如果時候未到,那么 delayQueue.poll 就會阻塞著,這樣就不會有空推進(jìn)的情況發(fā)送。
我們先看下SystemTimer構(gòu)造器如下:
@threadsafe
class SystemTimer(executorName: String,
tickMs: Long = 1,
wheelSize: Int = 20,
startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {// timeout timer
private[this] val taskExecutor = Executors.newFixedThreadPool(1,
(runnable: Runnable) =>KafkaThread.nonDaemon("executor-" + executorName, runnable))
private[this] val delayQueue = new DelayQueue[TimerTaskList]()
private[this] val taskCounter = new AtomicInteger(0)
private[this] val timingWheel = new TimingWheel(
tickMs = tickMs,
wheelSize = wheelSize,
startMs = startMs,
taskCounter = taskCounter,
delayQueue
)
其中SystemTimer.advanceClock即為推進(jìn)的方法
Kafka 用了多層次時間輪來實現(xiàn),并且是按需創(chuàng)建時間輪,采用任務(wù)的絕對時間來判斷延期,并且對于每個bucket槽(槽內(nèi)存放的也是任務(wù)的雙向鏈表)都會維護(hù)一個過期時間,利用 DelayQueue 來對每個槽的過期時間排序,來進(jìn)行時間的推進(jìn),防止空推進(jìn)的存在。
每次推進(jìn)都會更新 currentTime 為當(dāng)前時間戳,當(dāng)然做了點微調(diào)使得 currentTime 是 tickMs 的整數(shù)倍。并且每次推進(jìn)都會把能降級的任務(wù)重新插入降級。
可以看到這里的 DelayQueue 的元素是每個槽,而不是任務(wù),因此數(shù)量就少很多了,這應(yīng)該是權(quán)衡了對于槽操作的延時隊列的時間復(fù)雜度與空推進(jìn)的影響。
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧
分享名稱:計時器TimingWheel時間輪算法-創(chuàng)新互聯(lián)
本文地址:http://aaarwkj.com/article0/ideio.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供標(biāo)簽優(yōu)化、全網(wǎng)營銷推廣、網(wǎng)站建設(shè)、網(wǎng)站排名、網(wǎng)站收錄、做網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容