博客 / 詳情

返回

Apache Pulsar 技術系列 - 大規模延遲消息解析

導語

Apache Pulsar 是一個多租户、高性能的服務間消息傳輸解決方案,支持多租户、低延時、讀寫分離、跨地域複製(GEO replication)、快速擴容、靈活容錯等特性。在很多場景下,用户需要使用到延遲消息,本文是 Pulsar 技術系列中的一篇,主要介紹 Pulsar 3.x 大規模延遲消息投遞的實現。

背景

之前有文章介紹過延遲消息的使用場景、使用方式以及實現原理,同時也提出了當時版本的侷限性,完全基於內存構建延遲消息索引,導致無法支持大規模延遲消息場景。Pulsar 在 3.x 版本支持了基於磁盤的延遲消息索引方案,使得內存不再是延遲消息規模的瓶頸。

大規模延遲消息方案

Pulsar 消費流程

在講解延遲消息之前,先簡單描述一下 Pulsar 服務端消費處理流程。Broker 為每個訂閲維護單獨的 Dispatcher 對象,Dispatcher 負責管理整個訂閲的消費。大致流程如下:

  1. Dispatcher 啓動後初始化 Markdelete(已 Ack 最小位置)和 ReadPosition(當前讀位置)(起始時 ReadPosition 為 MarkDelete 的下一個位置)。
  2. 如果 RedeliveryTracker 中有數據,優先推送 RedeliveryTracker 中數據, 否則從 Bookie 中讀取數據。
  3. 普通數據: 推送數據給合適的 Consumer,Consumer 未就緒時,拉取到的數據把索引信息存儲到 RedeliveryTracker 中。
  4. 延遲消息: 如果時間到了,直接推送給用户,否則添加到 DelayMesageTracker 裏。
  5. DelayMesageTracker 會定時把到期數據倒入 RedeliveryTracker。

可以簡單理解為,Dispatcher 持續往後讀取數據,已過期延遲消息就和普通消息一樣推送給客户端。其餘延遲消息會被添加到 DelayedDeliveryTracker,數據到期後再推送給客户端。

DelayedDeliveryTracker

延遲消息是由 DelayedDeliveryTracker 管理,從上圖中可以看到,DelayedDeliveryTracker 主要功能為添加消息以及讀取已經到期的消息,代碼片段如下:

public interface DelayedDeliveryTracker extends AutoCloseable {
    # 添加消息
    boolean addMessage(long ledgerId, long entryId, long deliveryAt);
    # 獲取到期消息
    NavigableSet<PositionImpl> getScheduledMessages(int maxMessages);
    # 其餘為輔助方法
    boolean hasMessageAvailable();
    long getNumberOfDelayedMessages();
    long getBufferMemoryUsage();
    boolean shouldPauseAllDeliveries();
    void resetTickTime(long tickTime);
    CompletableFuture<Void> clear();
    void close();}

可以看到,DeliveryTracker 只添加 Position 信息,Data 信息是不需要保存的。目前具體的實現 Tracker 有兩個,InMemoryDelayedDeliveryTracker 和 BucketDelayedDeliveryTracker,前者為完全基於內存的老版本實現,後者為基於 BK 存儲支持超大規模延遲消息的實現,下文主要分析 BucketDelayedDeliveryTracker 的實現原理。

BucketDelayedDeliveryTracker

如何存儲

Broker 設計為無狀態服務,所以基於磁盤的 DelayedDeliveryTracker 是 Bookie 來負責。BucketDelayedDeliveryTracker 為了降低磁盤的寫入次數(寫入 Bookie),會積累到一定量延遲消息索引後再觸發寫入。可以看到 Bucket 生命週期和 Ledgers 是很類似的,只有最後一個 Bucket 支持寫入(LastMutableBucket),前面的 Bucket 只支持讀取(ImmutableBuckets),讀取完成後會刪除。由於 Ledger 是遞增的,所以可以看到每個 Bucket 中 Ledger 也是遞增的,每個 Bucket 負責存儲一定範圍的 Ledger 延遲消息。

數據加入 DeliveryTracker 具體流程如下:

  1. 判斷是否存在,如果存在了,直接返回(Bucket 內的 Metadata 有 Bitmap 快速標識哪些 Entry 是已存在的)。
  2. 如果需要加入的消息 LedgerId 不包含於 LastMutableBucket 中的話,説明之前的數據沒有存儲到(不應該出現)。直接放到 Share 池子裏面。
  3. 其餘正常情況會直接存儲在 LastMutableBucket 中,LastMutableBucket 中的延遲消息累計到一定數量後會生成新的 ImmutableBucket 並刷入磁盤。
  4. 刷盤成功後,會清空 LastMutableBucket 數據重新接收新的延遲消息寫入。

當 LastMutableBucket 累計到一定量的延遲消息後(默認 5w,會存儲完最後一個 Ledger 的全部延遲消息再切換),會觸發刷盤, 具體步驟如下:

  1. LastMutableBucket 內存中的延遲消息,按照時間(5min)、條數(5000)維度分成多個 Segment(簡單理解第一個 Segment 是最新5分鐘的消息,第二個是最近5~10分鐘消息)。
  2. 生成好每個 Segement 的元數據(包含每個 Segement 中消息的最大和最小時間,以及用來快速判斷某個消息是否是延遲消息的 Bitmap)。
  3. 創建一個新的 Ledger,並把元數據存儲在對應 Cursor Property 中。格式為 <#pulsar.internal.delayed.bucket-${startLedgerId}-${endLedgerId},${bucketLedgerId>。
  4. 將元數據寫入 Entry0 位置,其餘 Segment 分別作為一個 Entry 順序寫入,並關閉 Ledger。

可以看到,整個 Bucket 通過幾次 Append 寫入就能把全部的延遲信息落盤,已寫入的數據不能修改,只能刪除。

如何讀取

BucketDelayedDeliveryTracker 在 Addmsg 時已經把延遲消息寫入到磁盤,內存中只會存儲部分的延遲消息。在 BucketDelayedDeliveryTracker 中內存中的延遲消息都存儲在 SharedBucketPriorityQueue(小堆實現的優先級隊列)中。所有的延遲消息都通過 SharedBucketPriorityQueue 來獲取。

讀取相對來説比較簡單, 大致流程如下:

  1. 每次拉取延遲消息時會先把 LastMutableBucket 中到期的延遲消息轉移到 SharedBucketPriorityQueue 中。
  2. 在每個 ImmutableBucket 刷磁盤時,已經把第一個Segment 加載到 SharedBucketPriorityQueue 中。
  3. 當 ImmutableBucket 中的當前 Segement 中的最後一條消息被獲取後,會觸發從 Bookie 中加載下一個 Segement 存入 SharedBucketPriorityQueue。

何時刪除

ImmutableBucket 被使用完後,在以下幾個時機會被刪除。

  1. Bucket 中的最後一個 Segement 的最後一條數據被讀取後(實現上為加載下一個 Segment 發現未空時)。
  2. 訂閲重新加載時(分區 Leadership 發生變化)時,如果 Bucket 中的延遲消息都已到期。
  3. Bucket 觸發 Merger 後(可控制內存中的 Bucket 個數進而控制內存消耗),會刪除原先的 Bucket。用户設置了 MaxNumBuckets,已存在 Bucket 個數大於這個配置值時,挑選若干個 Bucket 合併成一個 Bucket,並刪除掉原有的 Bucket。
  4. 用户調用 

    org.apache.pulsar.client.admin.Topics#skipAllMessages。

  5. ImmutableBucket 刪除時會先清空 ZK 中的元數據,再刪除對應的 Ledger。

Bucket 數據丟失

如果延遲消息在 LastMutableBucket 中還沒有刷盤到 Bookie,此時發生故障,LastMutableBucket 中的數據將會丟失。但是這並不會有什麼影響,在重新啓動後,依然會從 MackDelete 位置往後讀取消息,重建 Bucket。這也就是為什麼 Bucket 中的數據只需要被讀走(不需要客户端 Ack)就可以被刪除。

重建 Bucket

重建 Bucket 其餘與第一次構建 Bucket 是一致的,都是往後讀取消息,未過期的延遲消息重新加入 Bucket 中,已過期和延遲消息會被當做普通消息直接推送給客户端。重建 Bucket 過程資源消耗可控,也不會阻塞消費,暫時不必擔心重建 Bucket 對消費造成額外壓力。

總結

Pulsar 3.x 版本大規模延遲消息方案整體比較簡單,採用先分桶,再分段的策略,只在內存中保存最近的延遲消息,延遲消息規模將不再受到內存的限制。新版本實際壓測下來與設計基本一致,內存佔用達到穩定後將不會上漲,目前驗證單節點數十億延遲消息穩定運行。

未來規劃

當前的實現版本 DelayedDeliveryTracker 是基於訂閲維度,如果 Topic 下有很多訂閲,佔用的內存和磁盤存儲會隨着訂閲數量等比例放大。目前這裏有一定的優化空間,可以 Topic 下的多訂閲共享 DeliveryTracker 存儲,類似每個訂閲有單獨的 ReadPosition 即可。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.