博客 / 詳情

返回

TDMQ RocketMQ 版秒級定時消息原理解析

導語

隨着分佈式系統架構的普及,消息隊列已成為支撐大規模、高併發在線業務的核心組件之一。騰訊雲消息隊列 RocketMQ 版作為一款高性能、高可靠的消息中間件,通過提供穩定、低延遲的消息服務,幫助企業輕鬆應對業務洪峯、實現系統解耦。

最初的消息隊列只支持簡單的在線消息收發,但隨着業務場景的豐富,越來越多的需求涌現,例如訂單超時處理、輕量級延時任務調度、定時通知推送等場景,這些場景都要求消息能夠延遲消費。為此,消息隊列技術不斷演進,最初 Apache RocketMQ 實現了多級別延時消息,但是無法滿足更靈活的延時場景,因此演進為文件版定時消息時間輪,到騰訊雲消息隊列 RocketMQ 版也推出 RocksDB 版本定時消息多級時間輪,最終實現了高吞吐場景下定時消息場景的精準調度。

本文將帶您深入探索 TDMQ RocketMQ 版秒級定時消息的實現原理:

  • 首先從典型業務場景切入,看看定時消息在分佈式定時調度、電商等場景的應用;
  • 接着回顧定時消息的技術演進歷程,瞭解定時消息如何從基礎延時功能發展為高精度調度系統;
  • 最後,深入核心架構設計,解析定時消息技術原理,並介紹騰訊雲基於 RocksDB 版本定時消息多級時間輪的優化創新,揭秘秒級定時投遞的底層邏輯。

概念與應用場景

定時消息,顧名思義就是當用户將消息封裝定時屬性後發送到 MQ,MQ 會在指定時間後將消息暴露給消費者消費,而在未達到指定時間期間消息對消費者是不可見的。這種特性在分佈式系統中具有廣泛的應用價值,以下是兩個典型場景:

1、大量分佈式定時任務調度

image.png

在分佈式定時調度場景下,需要實現各類精度的定時任務,例如每天5點執行文件清理、每隔2分鐘觸發一次消息推送等需求。傳統的基於數據庫的定時調度方案實現複雜,且在高併發場景下性能較差。基於 Apache RocketMQ 的定時消息可以封裝靈活的定時消息,同時使用方法比較簡單。

2、電商場景的訂單超時取消/延遲支付訂單

image.png

以電商交易場景為例,用户下單後若未及時支付,系統通常需要在一定時間後自動關閉訂單。使用 Apache RocketMQ 定時消息可以實現超時任務的檢查觸發,一方面可以定時去關閉訂單,另一方面支持定時消息的撤回,如果訂單已經支付,可以發送一條刪除屬性的消息進行 Abort。

基於定時消息的超時任務處理具備如下優勢:

  • 精度高、開發門檻低:基於消息通知方式不存在定時階梯間隔。可以輕鬆實現任意精度事件觸發,無需業務去重。
  • 高性能可擴展:傳統的數據庫掃描方式較為複雜,需要頻繁調用接口掃描,容易產生性能瓶頸。 Apache RocketMQ 的定時消息具有高併發和水平擴展的能力。

技術演進路線

術語説明

  1. 消息存儲文件 CommitLog**: 存儲原始消息的文件。
  2. 消息索引文件 ConsumeQueue: 存儲消息索引數據,定位到 CommitLog。
  3. Real Topic:用户指定消息投遞到的 Topic。
  4. 延時主題(Delay Topic)/定時主題(Timer Topic) :所有的延時/定時消息都不會直接發到 Real Topic,而是先發到定時/延時 Topic。

多級別延時消息

最開始,Apache RocketMQ 複用 RetryTopic 的多級重試投遞邏輯,採用延時隊列來實現延時消息。但是由於每一條隊列對應一個延遲時間,導致隊列數量成為瓶頸,因此最終支持多級別延時消息。以下是目前所支持的18個延時級別分別對應的延時時間。

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

技術實現原理

多級別延時消息底層基於一個延時 Topic,多條 Queue,每一條 Queue 對應一個延時級別。

image.png

// 延時消息服務啓動
public void start() {
        if (started.compareAndSet(false, true)) {      
            this.load();     
            // 創建與延時級別一樣數量的線程      
            this.deliverExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));     
            // 初始化線程池等操作...      
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {             
                 // 為每個延時等級都創建定時任務       
                 if (timeDelay != null) { 
                     if (this.enableAsyncDeliver) {                        
                         this.handleExecutorService.schedule(new HandlePutResultTask(level),                 
                             FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);                    
                 }                    
                     this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level,                         offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);       
                 }            
             }      
             // 位點以及數據持久化定時任務...        、
         } 
     }
  • 當延時消息進入時會判斷延時等級屬性,放進相應的隊列尾部。
  • 通過線程池對每個隊列進行輪詢掃描,判斷頭部消息是否達到延時時間,如果達到則將消息投遞到 Real Topic,否則繼續輪詢。

注意:多級別延時消息受到消息存儲時間限制;


// 執行循環掃描
public void executeOnTimeUp() {
        ConsumeQueueInterface cq;
        // 獲取cq,如果cq為null則新建定時任務並校準下次掃描開始位點
        ....
        // 設置本次掃描開始位點
        long nextOffset = this.offset;
        try {
             while (bufferCQ.hasNext() && isStarted()) {
                    CqUnit cqUnit = bufferCQ.next();
                    long offsetPy = cqUnit.getPos();
                    int sizePy = cqUnit.getSize();
                    long tagsCode = cqUnit.getTagsCode();
                    // 判斷是否達到延時時間
                    long now = System.currentTimeMillis();
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    // 計算下次拉取offset
                    long currOffset = cqUnit.getQueueOffset();
                    nextOffset = currOffset + cqUnit.getBatchNum();
                    long countdown = deliverTimestamp - now;
                    if (countdown > 0) {
                        // 未到延時時間,則創建下次定時任務並return
                        this.scheduleNextTimerTask(currOffset, DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, currOffset);
                        return;
                    }
 
                    MessageExt msgExt = ScheduleMessageService.this.brokerController.
                        getMessageStore().lookMessageByOffset(offsetPy, sizePy);
                    // 獲取消息內容並重新投遞
                    if (ScheduleMessageService.this.enableAsyncDeliver) {
                        this.asyncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
                    } else {
                        this.syncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
                    }
                    //如果投遞失敗,立即創建下次任務,並設置下次任務開始掃描位點為currOffset,防止消息丟失
                 }
            } catch (Exception e) {
                ......
            } finally {
                bufferCQ.release();
            }
            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
        }

使用示例


// 在Producer端設置消息為延時消息
Message msg = new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
 
// 設置延遲level為5,對應延遲1分鐘
msg.setDelayTimeLevel(5);
producer.send(msg);

存在哪些痛點

  • 系統僅支持預定義的有限延遲級別,限制了延遲配置的靈活性與適應性。
  • 最大延遲時間存在硬性上限,無法滿足超長週期延遲任務的需求。
  • 延遲時間的精度控制不足,難以實現細粒度的延遲調度。

由於多級別延時受限於隊列數量,因此我們需要額外探索一種支持高精度、超長延時,自定義延時的實現,這時便出現了超長秒級定時消息。

超長秒級定時消息

超長秒級定時消息支持用户設置任意時間(社區版默認最長7天), Apache RocketMQ 社區引入文件版時間輪來實現,同時支持定時消息的撤回,在設計上通過消息的重新投遞使超長延時消息不受消息存儲時間限制。

技術實現原理

設計上的考慮:

  • 定時消息實現不侵入原本存儲邏輯,防止互相影響,通過將定時消息寫入定時消息主題,對該主題的索引文件掃描從而拿到原始消息;
  • 實現任意時間定時的要點在於知道在某一時刻需要投遞哪些消息,因此需要額外設計存儲格式,同時儘可能複用 Commitlog 消息文件存儲,通過引入定時消息索引文件,原始消息存儲在 Commitlog,同時為了能掃描出此刻的所有定時消息並且兼顧消息寫入性能,採用鏈表結構進行索引單元的鏈接,定時消息索引文件寫入直接 Append-only Log(順序寫入),保證了消息寫入的性能。
  • 為了定位第一條定時消息索引,引出時間輪結構,需要作為中間層去精準訪問定時消息索引文件。

最終社區為定時消息(在 rip-43)引入兩個存儲文件:Timelog + Timewheel。

image.png

  • TimerWheel 是時間輪的文件,表示投遞時間,它保存了2天(默認,同時保證超長定時消息不受消息存儲時間限制)內的所有時間窗。每個槽位表示一個對應的投遞時間窗,並且可以調整槽位對應的時間窗長度來控制定時的精確度。採用時間輪的好處是它可以複用,在2天之後無需新建時間輪文件,而是隻要將當前的時間輪直接覆蓋即可。
/**
 * Represents a slot of timing wheel. Format:
 * ┌────────────┬───────────┬───────────┬───────────┬───────────┐
 * │delayed time│ first pos │ last pos  │    num    │   magic   │
 * ├────────────┼───────────┼───────────┼───────────┼───────────┤
 * │   8bytes   │   8bytes  │  8bytes   │   4bytes  │   4bytes  │
 * └────────────┴───────────┴───────────┴───────────┴───────────┘
 */
  • TimerLog 是定時消息索引文件,保存定時消息的索引(在消息文件中存儲的位置),內部通過反向鏈表進行鏈接,它的寫入為 Append-only Log,保證了消息寫入的性能。
public final static int UNIT_SIZE = 4  //size
            + 8 //prev pos
            + 4 //magic value
            + 8 //curr write time, for trace
            + 4 //delayed time, for check
            + 8 //offsetPy
            + 4 //sizePy
            + 4 //hash code of real topic
            + 8; //reserved value, just in case of

TimerWheel 中的每個槽位都可以保存一個指向 TimerLog 中某個元素的索引,TimerLog 中的元素又保存它前一個元素的索引。TimerLog 呈鏈表結構,存儲着 TimerWheel 對應槽位時間窗所要投遞的所有定時消息。

image.png

從圖中可以看出,共有五個 Service 分別處理定時消息的放置和存儲。工作流如下:

  1.  針對放置定時消息的 Service,每50ms從消息文件讀取指定主題(TIMER_TOPIC)的定時消息。

a.  TimerEnqueueGetService 從消息文件讀取得到定時主題的消息,並先將其放入 EnqueuePutQueue。

public boolean enqueue(int queueId) {
        // 判斷服務是否正常執行,獲取cq文件準備遍歷
        ConsumeQueueInterface cq = this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId);
        // 校準位點...
        long offset = currQueueOffset;
        ReferredIterator<CqUnit> iterator = null;
        try {
            iterator = cq.iterateFrom(offset);
            if (null == iterator) {
                return false;
            }
            // 開始遍歷索引文件並從CommitLog讀取出消息體
            int i = 0;
            while (iterator.hasNext()) {
                i++;
                try {
                    CqUnit cqUnit = iterator.next();
                    // 計算消息讀取位點與消息大小
                    MessageExt msgExt = getMessageByCommitOffset(offsetPy, sizePy);
                    if (null == msgExt) {
                        perfCounterTicks.getCounter("enqueue_get_miss");
                    } else {
                        // 讀取消息的相關信息進行timerRequest構造並投遞到enqueuePutQueue
                        TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy,
                            delayedTime, System.currentTimeMillis(), MAGIC_DEFAULT, msgExt);
                        while (!enqueuePutQueue.offer(timerRequest, 3, TimeUnit.SECONDS)) {
                            if (!isRunningEnqueue()) {
                                return false;
                            }
                        }
                     }
                     // 指標計算等邏輯...                    }
                } catch (Exception e) {...} finally {...}
                // if broker role changes, ignore last enqueue
                if (!isRunningEnqueue()) {
                    return false;
                }
                currQueueOffset = offset + i;
            }
            currQueueOffset = offset + i;
            return i > 0;
        } catch (Exception e) {...} finally {
            if (iterator != null) {
                iterator.release();
            }
        }
        return false;
    }

b.  另一個線程 TimerEnqueuePutService 將其執行 Timerlog-unit 構建邏輯並放入 TimerLog,更新時間輪(Timewheel)的存儲內容。


    protected void putMessageToTimerWheel(TimerRequest req) {
            try {
                // 指標計算...+如果消息已經到期直接投遞到dequeuePutQueue等待投遞到realTopic                
                if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) {
                    req.setEnqueueTime(Long.MAX_VALUE);
                    dequeuePutQueue.put(req);
                } else {
                    boolean doEnqueueRes = doEnqueue(
                        req.getOffsetPy(), req.getSizePy(), req.getDelayTime(), req.getMsg());
                    req.idempotentRelease(doEnqueueRes || storeConfig.isTimerSkipUnknownError());
                }
             } catch (Throwable t) {...}
    }
    public boolean doEnqueue(long offsetPy, int sizePy, long delayedTime, MessageExt messageExt) {
        // 調整deleayTime等操作...
        String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
        Slot slot = timerWheel.getSlot(delayedTime);
        // 構造timeLog unit
        ByteBuffer tmpBuffer = timerLogBuffer;
        tmpBuffer.clear();
        tmpBuffer.putInt(TimerLog.UNIT_SIZE); //size
        //...
        long ret = timerLog.append(tmpBuffer.array(), 0, TimerLog.UNIT_SIZE);
        if (-1 != ret) {
            // 如果timelog更新成功則更新timewheel
            timerWheel.putSlot(delayedTime, slot.firstPos == -1 ? ret : slot.firstPos, ret,
                isDelete ? slot.num - 1 : slot.num + 1, slot.magic);
        }
        return -1 != ret;
    }
  1.  針對取出定時消息的 Service,每50ms讀取下一秒的 Slot。有三個線程將讀取到的消息重新放回 CommitLog。

a.  首先,TimerDequeueGetService 每50ms讀一次下一秒的 Slot,從 TimerLog 中得到指定的數據,並放進 dequeueGetQueue。

public int dequeue() throws Exception {
        // 判斷消息是否正常出隊
        Slot slot = timerWheel.getSlot(currReadTimeMs);
        try {
            long currOffsetPy = slot.lastPos;
            Set<String> deleteUniqKeys = new ConcurrentSkipListSet<>();
            LinkedList<TimerRequest> normalMsgStack = new LinkedList<>();
            LinkedList<TimerRequest> deleteMsgStack = new LinkedList<>();
            LinkedList<SelectMappedBufferResult> sbrs = new LinkedList<>();
            SelectMappedBufferResult timeSbr = null;
            //read the timer log one by one, 開始遍歷當前時間的所有消息
            while (currOffsetPy != -1) {
                // 讀取出timelog對應的unit
                if (null == timeSbr || timeSbr.getStartOffset() > currOffsetPy) {
                    timeSbr = timerLog.getWholeBuffer(currOffsetPy);
                    if (null != timeSbr) {
                        sbrs.add(timeSbr);
                    }
                }
                if (null == timeSbr) {
                    break;
                }
                long prevPos = -1;
                try {
                    // 讀取buffer獲取相關數據並構造timerRequest
                    TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy,
                        delayedTime, enqueueTime, magic);
                    timerRequest.setDeleteList(deleteUniqKeys);
                    // 判斷消息是否不用滾動並是刪除類型,是就加進deleteMsgStack表示刪除類型消息
                    if (needDelete(magic) && !needRoll(magic)) {
                        deleteMsgStack.add(timerRequest);
                    } else {
                        normalMsgStack.addFirst(timerRequest);
                    }
                } catch (Exception e) {...} finally {//計算下次讀取timelog位點,以及指標計算}
            }
            // 下面先投遞delete類型msg,確保可以精準刪除對應消息
            CountDownLatch deleteLatch = new CountDownLatch(deleteMsgStack.size());
            //read the delete msg: the msg used to mark another msg is deleted
            for (List<TimerRequest> deleteList : splitIntoLists(deleteMsgStack)) {
                for (TimerRequest tr : deleteList) {
                    tr.setLatch(deleteLatch);
                }
                dequeueGetQueue.put(deleteList);
            }
            //do we need to use loop with tryAcquire
            checkDequeueLatch(deleteLatch, currReadTimeMs);
 
            CountDownLatch normalLatch = new CountDownLatch(normalMsgStack.size());
            //read the normal msg
            for (List<TimerRequest> normalList : splitIntoLists(normalMsgStack)) {
                for (TimerRequest tr : normalList) {
                    tr.setLatch(normalLatch);
                }
                dequeueGetQueue.put(normalList);
            }
            checkDequeueLatch(normalLatch, currReadTimeMs);
            } catch (Throwable t) {...}
        return 1;
    }

b.  而後 TimerDequeueGetMessageService 從 dequeueGetQueue 中取出數據並根據索引信息,從消息文件中查出對應的 msgs,並將其放入待寫入消息文件的隊列中(dequeuePutQueue)。

c.  最後 TimerDequeuePutMessageService 將這個 Putqueue 中的消息取出,若已到期則修改 Topic,放回 Commitlog(投遞到真正的 Topic),否則繼續按指定主題(TIMER_TOPIC)寫回 CommitLog 滾動(避免消息過期)。

使用示例

Message message = new Message(TOPIC, ("Hello" + i).getBytes(StandardCharsets.UTF_8));
// 延遲 10s 後投遞
message.setDelayTimeSec(10);
// 延遲 10000ms 後投遞,投遞到服務端後計算定時時間,即投遞到服務端的時間+delayTime
message.setDelayTimeMs(10_000L);
// 定時投遞,定時時間為當前時間 + 10000ms
message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L);
// 發送消息
SendResult result = producer.send(message);

騰訊雲定時消息的技術優化

技術實現原理

至此我們已經支持了秒級精度,超長時間定時消息,看似已經是完美的實現,那這文件版時間輪定時消息還有什麼問題嗎?

RocketMQ 5.0 社區開發的基於文件版,利用反向鏈表索引的方案,大大降低了存儲成本,但是反向鏈表的掃描效率不高,SSD 盤下基本1000 TPS 就會成為瓶頸,造成調度誤差增大。

騰訊雲選取 RocksDB 支持定時消息多級時間輪,利用 KV 結構可以快速範圍掃描某一時刻的定時消息,保證更精準的定時調度。

image.png

我們為小時/分鐘/秒鐘都設置為一個 Wheel 管理,類似鐘錶,通過秒針轉動一圈驅動分針轉動,分針轉動一圈驅動時針轉動;當定時超過一天時,仍放進小時級別的時間輪,後續會重新投遞此消息,避免消息過期;

壓測實驗結果

壓測場景

  • 定時業務場景: 發送一億條消息,延遲時間隨機30秒~10分鐘,一個下游實時消費。
  • 普通對比場景: 髮型普通消息一億條消息,一個下游實時消費,對比資源利用率。
  • Broker節點規格: 8C16G 1T SSD 雲盤。
  • 延遲誤差定義: 定時消息指定的預期觸發時間和實際觸發消息的差值,預期1s以內,超過1s時,對下游表現為消費延遲增大。

壓測結論

  1.  定時消息在寫入 14000 TPS 時,發送耗時平穩,延遲誤差可以控制在1s內。
  2.  對比普通消息,定時消息對資源的消耗約為普通消息的一倍,符合設計預期。

壓測詳細數據

延遲時間隨機30秒~10分鐘,延遲誤差比較低,P999 穩定在1s以內。

生產線程數 生產TPS 發送平均耗時 (ms) 延遲誤差P50 (ms) 延遲誤差P90 (ms) 延遲誤差P99 (ms) 延遲誤差P999 (ms) 機器負載
16 14000 1.2 301.0 690.0 904.0 979.0 cpu30%

普通消息對比

作為對比,普通消息14000 TPS,發送耗時穩定,並且 CPU 利用率是定時消息的一半,符合設計預期,發送耗時和消費耗時也符合預期。

生產線程數 生產TPS 發送平均耗時 (ms) 端到端平均耗時 (ms) 機器負載
16 14000 1.1 2 cpu15%

方案優勢總結

  • 高精度調度:在較高吞吐下,依然保持1s的精準調度。
  • 無限延時支持:理論上支持無限的延時時長。
  • 精細化監控:提供更精準的指標統計,便於問題排查。
  • 低成本高可靠:嵌入式 KV 不需要外掛其他存儲,降低運維複雜度與成本。

消息查詢優化

社區版對於未觸發的定時消息,根據 MessageID 無法查詢,騰訊雲版優化後支持根據 MessageID 查詢未觸發的定時消息。

使用示例

發送消息:延時一分鐘

SendResult [sendStatus=SEND_OK, msgId=155BEAE2BB5218B4AAC2318E41AF0000, offsetMsgId=0B8D67E5000022CF0000000001C66D54, messageQueue=MessageQueue [topic=test-timer, brokerName=vbroker-rmq-16x4gww4j5-0, queueId=1], queueOffset=1794, recallHandle=null]

根據 MessageID 查詢並查看消息軌跡:

image.png

image.png

總結

騰訊雲 TDMQ RocketMQ 版定時消息完全兼容開源,極其容易上手使用。

該方案在保證高性能的同時,兼顧了存儲效率與調度準確性,是騰訊雲在消息中間件領域的又一技術創新,歡迎大家來使用騰訊雲 TDMQ RocketMQ 版!

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.