博客 / 詳情

返回

TDMQ RocketMQ 版訂閲關係一致性原理與實踐

導語

騰訊雲 TDMQ RocketMQ 版是基於 Apache RocketMQ 打造的滿足金融級高可靠的在線業務消息隊列產品,憑藉其高可用、高可靠等特性,被廣泛應用於金融、電商,社交等高併發場景,獲得了各行各業用户的廣泛認可。在實際使用中, 訂閲關係不一致是開發者經常容易遇到的一個問題,可能會導致消息消費異常、消息丟失等現象。

本文將深入解析訂閲關係一致性的核心要點,從定義與約束機制,到底層實現原理與優化實踐,再結合真實案例分享 TDMQ RocketMQ 版針對訂閲關係不一致問題的解決方案,幫助開發者快速定位問題根源,構建穩定可靠的消息系統。

訂閲關係定義

訂閲關係是 RocketMQ 系統中消費者獲取消息、處理消息的規則和狀態配置,訂閲關係由消費者組動態註冊到服務端,並在後續的消息傳輸中按照訂閲關係定義的過濾規則進行消息匹配和消費進度維護。

通過配置訂閲關係,可控制如下消費行為:

  • 消息過濾規則:用於控制消費者在消費消息時,選擇主題內的哪些消息進行消費,設置消費過濾規則可以高效地過濾消費者需要的消息集合,靈活根據不同的業務場景設置不同的消息接收範圍。
  • 消費狀態:RocketMQ 服務端默認提供訂閲關係持久化的能力,即消費者分組在服務端註冊訂閲關係後,當消費者離線並再次上線後,可以獲取離線前的消費進度並繼續消費。

在 RocketMQ 的領域模型中,訂閲關係的位置和流程如下:

  1. 消息由生產者初始化併發送到 RocketMQ 服務端。
  2. 消息按照到達 RocketMQ 服務端的順序存儲到主題的指定隊列中。
  3. 消費者按照指定的訂閲關係從 RocketMQ 服務端中獲取消息並消費。

訂閲關係一致性約束

訂閲關係一致性要求同一消費者組內的所有消費者實例所訂閲的主題必須和過濾規則完全一致。這裏涉及三個約束條件,具體來看:

  • 消費組必須一致

對於大多數分佈式應用來説,一個消費組下通常會掛載多個 Consumer 實例,訂閲關係一致性的約束範圍就是同一個消費組下的所有消費者。

  • 訂閲的主題必須一致

同一個消費組下的所有消費者訂閲的主題必須一致,例如:Consumer1 訂閲 TopicA 和 TopicB,Consumer2 也必須訂閲 TopicA 和 TopicB,不能只訂閲 TopicA、只訂閲 TopicB 或訂閲 TopicA 和 TopicC。

  • 過濾規則必須一致

同一個消費組下的所有消費者過濾規則必須一致,包括 Tag 的數量和 Tag 的順序,例如:Consumer1 訂閲 TopicB 且 Tag 為 Tag1||Tag2,Consumer2 訂閲 TopicB 的 Tag 也必須是 Tag1||Tag2,不能只訂閲 Tag1、只訂閲 Tag2 或者訂閲 Tag2||Tag1。

訂閲關係一致的示例

下圖展示了常見的兩種正確的訂閲關係,分別對應兩種情況:

正確示例一:單 Topic 單 Tag 訂閲

如圖中 Group 1 的 Consumer1 和 Consumer2 都訂閲 TopicA 中所有消息。

正確示例二:單 Topic 多 Tag 訂閲

如圖中 Group 2 的 Consumer1 和 Consumer2 都訂閲 TopicA 中 Tag 為 Tag1 或 Tag2 的消息,且順序都是 Tag1||Tag2。

訂閲關係不一致的示例

下圖展示了三種典型錯誤的訂閲關係,分別對應三種情況:

錯誤示例一:訂閲 Topic 不同

如圖中 Group 1 的 Consumer1 和 Consumer2 分別訂閲了不同的 Topic。

錯誤示例二:訂閲 Topic 相同但 Tag 不同

如圖中 Group 2 的 Consumer1 訂閲 TopicA 的 Tag1,Consumer2 訂閲 TopicA 的 Tag2。

錯誤示例三:訂閲 Topic 和 Tag 都相同但 Tag 順序不同

如圖中 Group 3 的 Consumer1 訂閲 TopicA 的 Tag1||Tag2,Consumer2 訂閲 TopicA 的 Tag2||Tag1,這裏雖然訂閲 Tag 都相同但順序不同,也不符合訂閲一致性約束。

訂閲關係不一致的影響

如果訂閲關係不一致,可能導致消息消費邏輯混亂,消息被重複消費或遺漏。

如下示例,這裏我們啓動兩個 Consumer,它們都屬於消費組 Group1,都訂閲了主題 TopicA,但是 Consumer1 訂閲的是 Tag1 的消息,Consumer2 訂閲的是 Tag2 的消息。

String topic = "TopicA";
String consumerGroup = "Group1";
FilterExpression filterExpressionTag1 = new FilterExpression("Tag1", FilterExpressionType.TAG);
PushConsumer consumer1 = provider.newPushConsumerBuilder()
    .setConsumerGroup(consumerGroup)
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpressionTag1))
    .build();
FilterExpression filterExpressionTag2 = new FilterExpression("Tag2", FilterExpressionType.TAG);
PushConsumer consumer2 = provider.newPushConsumerBuilder()
    .setConsumerGroup(consumerGroup)
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpressionTag2))
    .build();

這種情況下兩個客户端分別會有什麼表現呢?

  • Consumer1 消費者無法消費 Tag 值為 Tag1 的消息,因為 Consumer1 消費者在拉取消息時,服務端該消費組的訂閲信息中 Tag 值為 Tag2,經過服務端過濾後,Consumer1 消費者拉取到的消息的 Tag 值都是 Tag2 , 但消費者在收到消息後也會進行過濾,這部分消息也都被過濾掉了。
    Consumer2 消費者只能消費部分 Tag 值為 Tag2 的消息,因為只有部分隊列分配給了 Consumer2。

但是在服務端同一個消費組內的各個消費者客户端的訂閲信息會相互被覆蓋,所以這種消費狀態非常混亂,上面示例中 Consumer1 和 Consumer2 的消費情況可能也會發生切換。

訂閲關係一致性原理

通過上邊的示例我們可以看到,訂閲關係不一致時,客户端消費邏輯是不確定的,那麼這個現象是如何形成的呢?讓我們通過源碼來一探究竟。

注:以下涉及源碼均出自 Apache RocketMQ 社區 release-5.2.0 分支。

客户端上報訂閲關係

消費者啓動後,會定時向所有 Broker 發送心跳包,攜帶訂閲關係信息。

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

public synchronized void start() throws MQClientException {
        this.mQClientFactory.checkClientInBroker();
        if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) {
            this.mQClientFactory.rebalanceImmediately();
        }
    }

org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData

public class HeartbeatData extends RemotingSerializable {
    // 消費者客户端ID
    private String clientID;
    private Set<ConsumerData> consumerDataSet = new HashSet<>();
}
public class ConsumerData {
    // 消費組ID
    private String groupName;
    // 訂閲關係
    private Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
}
public class SubscriptionData implements Comparable<SubscriptionData> {
    // 訂閲主題
    private String topic;
    // 過濾的 Tag 列表
    private Set<String> tagsSet = new HashSet<>();
    // 上報的時間戳(版本號)
    private long subVersion = System.currentTimeMillis();
    // 過濾類型
    private String expressionType = ExpressionType.TAG;
}

可以看到心跳包中包含了當前消費者客户端的 ID 和客户端消費信息,其中消費信息包含該消費者屬於哪個消費組,還有該消費者的訂閲關係列表,每個訂閲關係中表明瞭它訂閲的是哪個主題、哪種訂閲類型、用來過濾消息的 Tag 列表和當前的時間戳。

服務端處理訂閲關係

Broker 在接收到心跳後,會更新本地的訂閲關係表。

org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor#registerConsumer

public boolean registerConsumer(...) {
        // 獲取或創建消費組信息
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null == consumerGroupInfo) {
            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
            consumerGroupInfo = prev != null ? prev : tmp;
        }
        // 更新訂閲關係
        consumerGroupInfo.updateSubscription(subList);
    }
public boolean updateSubscription(final Set<SubscriptionData> subList) {
        boolean updated = false;
        for (SubscriptionData sub : subList) {
            // 根據 Topic 查對應的訂閲關係
            SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
            // 訂閲關係不存在,更新本訂閲關係
            if (old == null) {
                this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
            // 如果已存在則用新的覆蓋舊的
            } else if (sub.getSubVersion() > old.getSubVersion()) {
                this.subscriptionTable.put(sub.getTopic(), sub);
            }
        }
        return updated;
    }

這裏可以看到 Broker 在更新訂閲關係時,同一個消費組下訂閲的同一個主題的訂閲關係是直接使用最新上報的關係,那麼不同客户端上報的訂閲關係不一致時服務端報錯的訂閲關係就會一直被相互覆蓋,只會以最新上報的訂閲關係為準。

根據訂閲關係過濾消息

而消費者客户端在拉取消息時,Broker 會使用已保存的訂閲關係來進行過濾。

org.apache.rocketmq.store.DefaultMessageStore#getMessage

@Overridepublic
boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    if (null == tagsCode || null == subscriptionData) {    
        return true;
    }
    if (subscriptionData.isClassFilterMode()) {    
        return true;
    }
    // 匹配邏輯
    return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)||     subscriptionData.getCodeSet().contains(tagsCode.intValue());
}

那麼當 Broker 上的訂閲關係一致在變化時,過濾消息的結果就可能是不符合預期的。詳細實現原理參考:《Apache RocketMQ 消息過濾的實現原理與騰訊雲的使用實踐》。

騰訊雲優化實踐

訂閲關係不一致會直接導致消息消費異常,需快速定位並修復。TDMQ RocketMQ 版控制枱提供可視化檢測能力,無需人工逐個排查日誌或配置,通過控制枱 3 步即可完成問題的發現、定位、修復,降低運維複雜度。

1、一鍵檢測

自動對比消費組內所有客户端的訂閲配置,高亮顯示出不一致的訂閲關係。

2、精準定位

點擊不一致詳情,直接關聯到具體客户端實例,可快速判斷出非預期的訂閲關係所在的實例。

3、閉環驗證

修訂後實時同步訂閲關係一致性狀態,確保消費組訂閲關係符合預期。

常見問題

哪些典型場景會出現訂閲關係不一致?

  • 環境未完全隔離,非生產環境和生產環境使用了相同的 Group 訂閲不同的 Topic。
  • 業務代碼修改了訂閲關係,在灰度和版本發佈過程中,新舊版本消費者共存。

總結

訂閲關係一致性是 RocketMQ 消息系統消費行為正確的核心保障。本文通過訂閲關係定義、一致性約束機制、原理分析與騰訊雲優化實踐,系統闡述了訂閲關係不一致的潛在風險與解決方案:

  1. 核心約束:同一消費組內的所有消費需嚴格遵循主題一致、過濾規則一致(含 Tag 順序) 的原則,任一環節的差異均可能導致漏消費消息。
  2. 騰訊雲能力:依託控制枱的一鍵檢測、精準定位、閉環驗證功能,開發者可快速識別異常實例並修復,將傳統人工排查耗時從小時級縮短至分鐘級。
  3. 最佳實踐:建議通過消費組隔離、動態配置強校驗、多 Topic 場景分層治理等策略,從源頭規避訂閲關係不一致風險。

通過本文的介紹與案例分析,相信讀者對 TDMQ RocketMQ 版的訂閲關係一致性機制有了更深入的理解,並能夠在實際項目中靈活應用這一機制,保障消息 100% 按預期路由,避免因配置偏差導致業務消息漏消費。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.