trait TimerTask extends Runnable {
// 通常是request.timeout.ms參數值
// timestamp in millisecond
val delayMs: Long
// 每個TimerTask實例關聯一個TimerTaskEntry
// 就是説每個定時任務需要知道它在哪個Bucket鏈表下的哪個鏈表元素上
private[this] var timerTaskEntry: TimerTaskEntry = null
// 取消定時任務,原理就是將關聯的timerTaskEntry置空
def cancel(): Unit = {
synchronized {
if (timerTaskEntry != null) timerTaskEntry.remove()
timerTaskEntry = null
}
}
private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = {
synchronized {
// if this timerTask is already held by an existing timer task entry,
// we will remove such an entry first.
if (timerTaskEntry != null && timerTaskEntry != entry)
timerTaskEntry.remove()
timerTaskEntry = entry
}
}
private[timer] def getTimerTaskEntry(): TimerTaskEntry = {
timerTaskEntry
}
}
private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {
// 綁定的Bucket鏈表實例
@volatile
var list: TimerTaskList = null
var next: TimerTaskEntry = null
var prev: TimerTaskEntry = null
// if this timerTask is already held by an existing timer task entry,
// setTimerTaskEntry will remove it.
if (timerTask != null) timerTask.setTimerTaskEntry(this)
def cancelled: Boolean = {
timerTask.getTimerTaskEntry != this
}
def remove(): Unit = {
var currentList = list
// If remove is called when another thread is moving the entry from a task entry list to another,
// this may fail to remove the entry due to the change of value of list. Thus, we retry until the list becomes null.
// In a rare case, this thread sees null and exits the loop, but the other thread insert the entry to another list later.
while (currentList != null) {
// 在移除的代碼裏面包含了邏輯 timerTaskEntry.list = null 如果移除成功 list就為空了
currentList.remove(this)
currentList = list
}
}
override def compare(that: TimerTaskEntry): Int = {
java.lang.Long.compare(expirationMs, that.expirationMs)
}
}
這段代碼是 Apache Kafka 中 分層時間輪(Hierarchical Timing Wheels) 調度系統的核心數據結構,用於高效管理大量延遲任務。我將從設計思想、數據結構關係、併發安全機制和整體架構四個方面詳細解析。
一、整體架構:為什麼需要這種設計?
Kafka 有大量延遲操作(如 DelayedProduce、DelayedFetch),傳統定時器(Timer、ScheduledExecutorService)在高併發下性能不佳:
- 時間複雜度 O(log n)
- 大量短時任務創建銷燬開銷大
- 不適合百萬級任務管理
分層時間輪 通過空間換時間,實現 O(1) 插入/刪除,適合 Kafka 的高吞吐場景。
二、數據結構關係:雙向綁定設計
1:1
1:n
雙向鏈表
雙向鏈表
TimerTask
TimerTaskEntry
TimerTaskList
TimerTaskEntry
TimerTaskEntry
1. TimerTask:任務抽象
trait TimerTask extends Runnable {
val delayMs: Long // 延遲時長
private[this] var timerTaskEntry: TimerTaskEntry = null // 反向引用
}
- 職責:定義要執行的業務邏輯(
run()) - 反向引用:
timerTaskEntry指向自己在時間輪中的節點 - 取消機制:通過
timerTaskEntry.remove()從時間輪中移除
2. TimerTaskEntry:時間輪中的節點
class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) {
@volatile var list: TimerTaskList = null // 所屬鏈表
var next: TimerTaskEntry = null // 雙向鏈表指針
var prev: TimerTaskEntry = null
}
- 職責:在時間輪中作為鏈表節點存在
- 絕對時間:
expirationMs而非delayMs(便於排序) - 鏈表管理:維護在
TimerTaskList中的位置
三、雙向綁定機制:防止狀態不一致
綁定邏輯(構造時)
// TimerTaskEntry 構造器中
if (timerTask != null) timerTask.setTimerTaskEntry(this)
綁定/解綁保護
private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = {
synchronized {
if (timerTaskEntry != null && timerTaskEntry != entry)
timerTaskEntry.remove() // 舊節點移除
timerTaskEntry = entry
}
}
設計亮點:保證
TimerTask與TimerTaskEntry一對一關係,防止任務被多個節點引用。
取消檢測機制
def cancelled: Boolean = {
timerTask.getTimerTaskEntry != this // 通過反向引用一致性判斷
}
巧妙之處:當任務被取消時,
timerTaskEntry被置為null,其他線程可通過此方法檢測到取消狀態。
四、併發安全:remove() 的精妙設計
核心問題
在分層時間輪中,當低層桶過期時,任務會被批量遷移到高層桶。此時可能出現:
- 線程 A 正在從
list1移除entry - 線程 B 正在將
entry從list1移到list2 - 線程 C 正在調用
entry.cancel()
解決方案
def remove(): Unit = {
var currentList = list
while (currentList != null) {
currentList.remove(this) // 從當前鏈表移除
currentList = list // 重新讀取,可能已被其他線程修改
}
}
volatile 語義:保證
list的可見性
循環重試:直到list == null(表示已完全脱離所有鏈表)
五、排序與比較:支持優先隊列
extends Ordered[TimerTaskEntry] {
override def compare(that: TimerTaskEntry): Int = {
java.lang.Long.compare(expirationMs, that.expirationMs)
}
}
- 用途:在時間輪過期檢查時,可按到期時間排序處理
- 性能:避免額外排序開銷
六、使用場景示例
以 DelayedOperation 為例:
class DelayedFetch(...) extends DelayedOperation(delayMs) {
override def run(): Unit = {
if (forceComplete()) onExpiration()
}
}
完整流程:
- 創建
DelayedFetch→ 實現TimerTask - 調用
timer.add(delayedFetch) - 創建
TimerTaskEntry(delayedFetch, currentTime + delayMs) - 插入到對應時間輪的
TimerTaskList中 - 時間輪線程到期時取出並執行
entry.timerTask.run()
七、設計哲學總結
|
設計要素
|
説明
|
|
解耦 |
任務邏輯( |
|
一致性 |
雙向綁定 + 同步保護,防止狀態不一致
|
|
併發安全 |
volatile + 循環重試,處理併發移動場景
|
|
性能 |
O(1) 插入/刪除,適合高頻操作
|
|
健壯性 |
取消檢測 + 防重複執行
|
八、類比理解
想象一個醫院預約系統:
TimerTaskTimerTaskEntryTimerTaskListcancel()- 時間輪推進 = 到時間了,護士叫號執行診療
而 remove() 的循環機制就像:
“患者可能從 A 區被轉移到 B 區,工作人員要確保把他的號牌從所有區域都移除乾淨。”
這套設計是 Kafka 高性能延遲任務調度的基石,通過精妙的數據結構和併發控制,實現了對百萬級延遲任務的高效管理。