trait TimerTask extends Runnable {
  // 通常是request.timeout.ms參數值
  // timestamp in millisecond
  val delayMs: Long
  // 每個TimerTask實例關聯一個TimerTaskEntry
  // 就是説每個定時任務需要知道它在哪個Bucket鏈表下的哪個鏈表元素上
  private[this] var timerTaskEntry: TimerTaskEntry = null
  // 取消定時任務,原理就是將關聯的timerTaskEntry置空
  def cancel(): Unit = {
    synchronized {
      if (timerTaskEntry != null) timerTaskEntry.remove()
      timerTaskEntry = null
    }
  }

  private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = {
    synchronized {
      // if this timerTask is already held by an existing timer task entry,
      // we will remove such an entry first.
      if (timerTaskEntry != null && timerTaskEntry != entry)
        timerTaskEntry.remove()

      timerTaskEntry = entry
    }
  }

  private[timer] def getTimerTaskEntry(): TimerTaskEntry = {
    timerTaskEntry
  }

}


private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {
  // 綁定的Bucket鏈表實例
  @volatile
  var list: TimerTaskList = null
  var next: TimerTaskEntry = null
  var prev: TimerTaskEntry = null

  // if this timerTask is already held by an existing timer task entry,
  // setTimerTaskEntry will remove it.
  if (timerTask != null) timerTask.setTimerTaskEntry(this)

  def cancelled: Boolean = {
    timerTask.getTimerTaskEntry != this
  }

  def remove(): Unit = {
    var currentList = list
    // If remove is called when another thread is moving the entry from a task entry list to another,
    // this may fail to remove the entry due to the change of value of list. Thus, we retry until the list becomes null.
    // In a rare case, this thread sees null and exits the loop, but the other thread insert the entry to another list later.
    while (currentList != null) {
      // 在移除的代碼裏面包含了邏輯  timerTaskEntry.list = null 如果移除成功 list就為空了
      currentList.remove(this)
      currentList = list
    }
  }

  override def compare(that: TimerTaskEntry): Int = {
    java.lang.Long.compare(expirationMs, that.expirationMs)
  }
}

這段代碼是 Apache Kafka 中 分層時間輪(Hierarchical Timing Wheels) 調度系統的核心數據結構,用於高效管理大量延遲任務。我將從設計思想、數據結構關係、併發安全機制和整體架構四個方面詳細解析。


一、整體架構:為什麼需要這種設計?

Kafka 有大量延遲操作(如 DelayedProduceDelayedFetch),傳統定時器(TimerScheduledExecutorService)在高併發下性能不佳:

  • 時間複雜度 O(log n)
  • 大量短時任務創建銷燬開銷大
  • 不適合百萬級任務管理

分層時間輪 通過空間換時間,實現 O(1) 插入/刪除,適合 Kafka 的高吞吐場景。


二、數據結構關係:雙向綁定設計

1:1

1:n

雙向鏈表

雙向鏈表

TimerTask

TimerTaskEntry

TimerTaskList

TimerTaskEntry

TimerTaskEntry

1. TimerTask:任務抽象

trait TimerTask extends Runnable {
  val delayMs: Long  // 延遲時長
  private[this] var timerTaskEntry: TimerTaskEntry = null  // 反向引用
}
  • 職責:定義要執行的業務邏輯(run()
  • 反向引用timerTaskEntry 指向自己在時間輪中的節點
  • 取消機制:通過 timerTaskEntry.remove() 從時間輪中移除

2. TimerTaskEntry:時間輪中的節點

class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) {
  @volatile var list: TimerTaskList = null  // 所屬鏈表
  var next: TimerTaskEntry = null           // 雙向鏈表指針
  var prev: TimerTaskEntry = null
}
  • 職責:在時間輪中作為鏈表節點存在
  • 絕對時間expirationMs 而非 delayMs(便於排序)
  • 鏈表管理:維護在 TimerTaskList 中的位置

三、雙向綁定機制:防止狀態不一致

綁定邏輯(構造時)

// TimerTaskEntry 構造器中
if (timerTask != null) timerTask.setTimerTaskEntry(this)

綁定/解綁保護

private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = {
  synchronized {
    if (timerTaskEntry != null && timerTaskEntry != entry)
      timerTaskEntry.remove()  // 舊節點移除
    timerTaskEntry = entry
  }
}

設計亮點:保證 TimerTaskTimerTaskEntry 一對一關係,防止任務被多個節點引用。

取消檢測機制

def cancelled: Boolean = {
  timerTask.getTimerTaskEntry != this  // 通過反向引用一致性判斷
}

巧妙之處:當任務被取消時,timerTaskEntry 被置為 null,其他線程可通過此方法檢測到取消狀態。


四、併發安全:remove() 的精妙設計

核心問題

在分層時間輪中,當低層桶過期時,任務會被批量遷移到高層桶。此時可能出現:

  • 線程 A 正在從 list1 移除 entry
  • 線程 B 正在將 entrylist1 移到 list2
  • 線程 C 正在調用 entry.cancel()

解決方案

def remove(): Unit = {
  var currentList = list
  while (currentList != null) {
    currentList.remove(this)  // 從當前鏈表移除
    currentList = list        // 重新讀取,可能已被其他線程修改
  }
}

volatile 語義:保證 list 的可見性
循環重試:直到 list == null(表示已完全脱離所有鏈表)


五、排序與比較:支持優先隊列

extends Ordered[TimerTaskEntry] {
  override def compare(that: TimerTaskEntry): Int = {
    java.lang.Long.compare(expirationMs, that.expirationMs)
  }
}
  • 用途:在時間輪過期檢查時,可按到期時間排序處理
  • 性能:避免額外排序開銷

六、使用場景示例

DelayedOperation 為例:

class DelayedFetch(...) extends DelayedOperation(delayMs) {
  override def run(): Unit = {
    if (forceComplete()) onExpiration()
  }
}

完整流程

  1. 創建 DelayedFetch → 實現 TimerTask
  2. 調用 timer.add(delayedFetch)
  3. 創建 TimerTaskEntry(delayedFetch, currentTime + delayMs)
  4. 插入到對應時間輪的 TimerTaskList
  5. 時間輪線程到期時取出並執行 entry.timerTask.run()

七、設計哲學總結

設計要素

説明

解耦

任務邏輯(TimerTask)與調度結構(TimerTaskEntry)分離

一致性

雙向綁定 + 同步保護,防止狀態不一致

併發安全

volatile + 循環重試,處理併發移動場景

性能

O(1) 插入/刪除,適合高頻操作

健壯性

取消檢測 + 防重複執行


八、類比理解

想象一個醫院預約系統

  • TimerTask
  • TimerTaskEntry
  • TimerTaskList
  • cancel()
  • 時間輪推進 = 到時間了,護士叫號執行診療

remove() 的循環機制就像:

“患者可能從 A 區被轉移到 B 區,工作人員要確保把他的號牌從所有區域都移除乾淨。”


這套設計是 Kafka 高性能延遲任務調度的基石,通過精妙的數據結構和併發控制,實現了對百萬級延遲任務的高效管理。