博客 / 詳情

返回

TDMQ RocketMQ 版事務消息原理解析

引言

在分佈式架構系統中,確保跨服務應用的數據一致性始終是系統設計的核心挑戰之一。TDMQ RocketMQ 版作為一款基於 Apache RocketMQ 構建的企業級消息中間件,憑藉其高可用性和高可靠性特點,通過提供完善的事務消息機制,為這一難題提供了專業的解決方案。本文將結合核心源碼,深入解析 RocketMQ 事務消息的實現原理,希望能幫助開發者去構建更健壯的分佈式事務系統。

事務消息的概念與應用場景

事務消息是 RocketMQ 提供的一種高級特性消息,通過將二階段提交的動作和本地事務綁定,來保障分佈式場景下消息生產和本地事務的最終一致性,相比普通消息,主要是擴展了二次確認和本地事務狀態回查補償的機制。

在電商平台中,積分兑換商品這一常見功能就涉及到分佈式事務,用户發起兑換後,可能涉及創建兑換訂單、扣除用户積分、通知發貨服務、扣減庫存等一系列動作,此時就要保證訂單服務和多個下游業務執行結果的最終一致性,如果積分扣除成功但訂單創建失敗,會導致用户積分被扣但未獲得商品,如果訂單創建成功但積分扣除失敗,會導致用户獲得商品但未扣除積分。

我們可以採用 TDMQ RocketMQ 版事務消息來實現這一功能,具體分為以下三個階段:

1、階段一:發送事務消息(準備核銷積分)

用户提交訂單並選擇使用積分兑換後,訂單服務向 RocketMQ 服務端對應的業務 Topic 發送一條事務消息,內容包含 “用户 user-001 發起訂單 Order-001 並使用 1000 積分兑換商品 A”。此時,該消息對下游的積分服務和庫存服務等均不可見,避免在訂單服務事務完成前,積分服務提前扣減積分,確保積分不會被誤扣。

2、階段二:執行本地事務(創建訂單)

事務消息發送成功後,訂單服務繼續執行本地事務,創建訂單並預佔積分,若本地事務成功,則提交二次確認 Commit 到 RocketMQ 服務端,消息被繼續投遞到下游,反之,提交 Rollback,事務結束,積分狀態保持不變。

3、階段三:下游服務消費(扣減積分並更新庫存)

積分服務和庫存服務預先訂閲上面的 Topic,接收到消息後,積分服務扣減積分,庫存服務更新庫存。若消費過程中因網絡異常、服務不可用等問題導致失敗,RocketMQ 將自動觸發重試機制,若多次重試仍未成功,消息將轉入死信隊列,後續由人工介入核對,通過補償流程保障積分和庫存數據的最終一致性。

通過以上三個階段,RocketMQ 事務消息機制在積分核銷場景中,保障了訂單本地事務和消息發送的同時成功/失敗,成功實現了分佈式事務的最終一致性。類似的,在金融交易、企業多系統數據同步等場景中,RocketMQ 事務消息都能憑藉其可靠的機制,保障跨服務操作的最終一致性。那麼,RocketMQ 事務消息究竟是如何在底層實現這些複雜操作,確保最終一致性的呢?接下來,我們深入探究其背後的原理。

事務消息實現原理詳解

術語説明

在分析 RocketMQ 事務消息的實現原理之前,有必要先了解一下這些概念和術語:

1、半消息 half message:

生產者發送事務消息到 RocketMQ 服務端後,消息會被持久化並標記為“暫不可投遞”的狀態,直到本地事務執行完成並確認後,消息才會決定是否對消費者可見,此狀態下的消息,稱為半消息(Half Message)。

2、二階段提交

實現事務最終一致性的關鍵機制,一階段為發送 Half Message,二階段為生產者執行本地事務,並根據執行結果向 RocketMQ 服務端提交 Commit(允許投遞)或 Rollback(丟棄消息)的確認結果,以此來決定 Half Message 的去留。

3、OP 消息:

用於給消息狀態打標記,沒有對應 OP 消息的 Half Message,就説明二階段確認狀態未知,需要 RocketMQ 服務端進行本地事務狀態主動回查,OP 消息的內容為對應的 Half Message 的存儲的 Offset。

4、相關 Topic:

  • Real Topic:業務真實 Topic,生產者發消息時指定的 Topic 值。
  • Half Topic:系統 Topic,Topic名稱為 RMQ_SYS_TRANS_HALF_TOPIC,用於存儲 Half Message。
  • OP Topic:系統 Topic,Topic名稱為 MQ_SYS_TRANS_OP_HALF_TOPIC,用於存儲 OP 消息, Half Message 二次狀態確認後,不管是 Commit 還是 Rollback,都會寫入一條對應的 OP 消息到這個 Topic。

事務消息處理流程

瞭解完基本概念,結合上面的業務場景,我們來看 RocketMQ 事務消息的實現流程:

步驟説明:

  1. 生產者發送事務消息到 RocketMQ 服務端。
  2. 服務端存儲這條消息後返回發送成功的響應,此時消息對下游消費者不可見,處於Half Message 狀態。
  3. 生產者收到半消息成功的響應後,繼續往下執行本地事務(如更新業務數據庫)。
  4. 根據本地事務的執行結果,生產者會向 RocketMQ 服務端提交最終狀態,也就是二次確認。
  5. 確認結果為 Commit 時,服務端會將事務消息繼續向下投遞給消費者,確認結果為 Rollback 時,服務端將會丟棄該消息,不再向下投遞。
  6. 確認結果是 Unknown 或一直沒有收到確認結果時,一定時間後,將會觸發事務狀態主動回查。
  7. 當生產者未提交最終狀態或者二次確認的結果為 Unknown 時,RocketMQ 服務端將會主動發起事務結果查詢請求到生產者服務。
  8. 生產者收到請求後提交二次確認結果,邏輯再次回到第5步,此時如果生產者服務暫時不可用,則 RocketMQ 服務端會在指定時間間隔後,繼續主動發起回查請求,直到超過最大回查次數後,回滾消息。

如此,不管本地事務是否執行成功,都能實現事務狀態的最終一致性。以上步驟,可用時序圖直觀體現為:

半消息的具體實現

瞭解了事務消息基本的實現流程後,你可能會有疑問,半消息為什麼對消費者不可見?二次確認 Commit 或者 Rollback 後,服務端如何投遞或者刪除半消息?前面提到,Half Message 在服務端做了持久化,但在消費端卻不可見,實現這一效果的方式,就是 Topic 替換:首先將事務消息的 Real Topic 和隊列信息作為屬性暫存起來,以便後續二階段提交結果為 Commit 時,能正確地投遞到下游消費者,然後將消息的 Topic 改為系統 Topic RMQ_SYS_TRANS_HALF_TOPIC,隊列 ID 改為0,用户的消費者正常不會訂閲這個系統 Topic,自然也就不能看到 Half Message。

Half Message 被成功投遞到上面的系統 Topic 後,開始執行本地事務,如果生產者提交的本地事務二次確認結果為 Commit,則在消息屬性中獲取消息的 Real Topic、隊列等信息,設置 Topic = Real Topic後,再投遞下游,最後刪除 Half Message(邏輯刪除),如果二次確認結果為 Rollback,則只需要邏輯刪除對應的 Half Message 即可。這裏邏輯刪除的實現,就是前面提到的 OP Topic,OP 隊列中的消息,記錄了 Half Message 對應的二次確認狀態,根據這個狀態,RocketMQ 服務端會進行第二個核心機制:事務狀態主動回查。

事務回查的具體實現

Half Message 寫入後,可能會因為種種原因,導致 RocketMQ 服務端一直收不到二次確認結果,比如網絡異常、生產者服務暫時不可用、本地事務死鎖導致執行時間超長等,此時,就需要 RocketMQ 服務端主動去詢問生產者服務本地事務是否執行成功,以決定 Half Message 的最終去留。

RocketMQ 服務端會啓動事務檢查定時任務,默認每60秒執行一次,最大回查15次,可通過 TransactionCheckInterval 和 TransactionCheckMax 這兩項配置按業務實際情況進行定製化調整。回查時,會對比 Half 隊列和 OP 隊列的偏移量,若發現 Half 消息未在 OP 隊列中有對應的記錄且 Half Message 的留存時間超過了事務超時時間(前面分析過,Half Message 是否被二次確認過,是根據 OP 隊列來判斷的),則觸發主動回查動作,向生產者服務發起事務狀態檢查請求,如此,就解決了部分事務消息狀態懸而未決的問題,實現了本地事務和消息發送之間的最終一致性。

事務消息核心源碼解析

分析完具體的實現原理,接下來我們對照 Half Message 發送、二次確認提交、事務主動回查這三個關鍵部分的源碼實現,來具體看看以上理論在代碼中的體現:

發送事務消息

首先看看事務消息的發送的具體實現,核心代碼為org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl 下的sendMessageInTransaction 方法:

public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter localTransactionExecuter, Object arg) throws MQClientException {
        // 檢查事務監聽器對象是否為空,後續本地事務的執行和回查都依靠它
        TransactionListener transactionListener = this.getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", (Throwable)null);
        } else {
            if (msg.getDelayTimeLevel() != 0) {
                // 延遲消息屬性在這裏不生效 
                MessageAccessor.clearProperty(msg, "DELAY");
            }
            // 消息內容必要性檢查
            Validators.checkMessage(msg, this.defaultMQProducer);
            SendResult sendResult = null;
            // 添加事務消息相關屬性
            MessageAccessor.putProperty(msg, "TRAN_MSG", "true");
            MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());

            try {
                // 開始發送消息 
                sendResult = this.send(msg);
            } catch (Exception var11) {
                Exception e = var11;
                throw new MQClientException("send message Exception", e);
            }
            
            LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable localException = null;
            // 如果消息發送成功,則繼續處理本地事務
            switch (sendResult.getSendStatus()) {
                case SEND_OK:
                    try {
                        if (sendResult.getTransactionId() != null) {
                            msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                        }

                        String transactionId = msg.getProperty("UNIQ_KEY");
                        if (null != transactionId && !"".equals(transactionId)) {
                            msg.setTransactionId(transactionId);
                        }

                        if (null != localTransactionExecuter) {
                            // 開始執行本地事務,並得到一個本地事務狀態的結果
                            localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                        } else if (transactionListener != null) {
                            this.log.debug("Used new transaction API");
                            localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                        }

                        if (null == localTransactionState) {
                            localTransactionState = LocalTransactionState.UNKNOW;
                        }

                        if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                            this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
                            this.log.info(msg.toString());
                        }
                    } catch (Throwable var10) {
                        Throwable e = var10;
                        this.log.info("executeLocalTransactionBranch exception", e);
                        this.log.info(msg.toString());
                        localException = e;
                    }
                    break;
                case FLUSH_DISK_TIMEOUT:
                case FLUSH_SLAVE_TIMEOUT:
                case SLAVE_NOT_AVAILABLE:
                    // 消息發送都未成功,則本地事務狀態直接為rollback
                    localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            }

            try {
                // 根據本地事務狀態結果,進行下一步,決定half message是繼續投遞還是刪除
                this.endTransaction(msg, sendResult, localTransactionState, localException);
            } catch (Exception var9) {
                Exception e = var9;
                this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
            }

            TransactionSendResult transactionSendResult = new TransactionSendResult();
            transactionSendResult.setSendStatus(sendResult.getSendStatus());
            transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
            transactionSendResult.setMsgId(sendResult.getMsgId());
            transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
            transactionSendResult.setTransactionId(sendResult.getTransactionId());
            transactionSendResult.setLocalTransactionState(localTransactionState);
            // 返回消息發送的結果
            return transactionSendResult;
        }
    }

對照以上源碼,可以看到,生產者方法發送消息後,首先會對事務監聽器做非空校驗,因為後面本地事務的執行以及事務狀態的主動回查,都需要依賴它來完成,接下來的主要邏輯有四點:

  1. 給原始消息加一個 TRAN_MSG=true 的屬性,這是後面判定一條消息為事務消息的條件。
  2. 同步發送 Half Message,若發送失敗,則不再執行本地事務,保證了“同失敗”的事務一致性。
  3. 若發送成功,則開始執行我們設置的本地事務,並根據執行結果修改本地事務狀態值。
  4. 根據事務狀態值,來 endTransaction 做收尾工作,這裏包含了下面我們要説的事務回查和 Half Message 的刪除。

二次確認提交

Half Message 發送成功後,開始執行本地事務,並根據執行結果,提交二次確認結果Commit/Rollback 到 RocketMQ 服務端。

/**
 * @param msg: 原始消息對象
 * @param sendResult: 消息發送結果(包含事務ID、消息ID等)
 * @param localTransactionState: 本地事務執行狀態(COMMIT/ROLLBACK/UNKNOWN)
 * @param localException: 本地事務執行異常
 */
public void endTransaction(
    final Message msg,
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    // 優先使用offsetMsgId解析消息ID,獲取偏移量
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    // 事務ID
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    // 構建二次確認的請求頭
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    requestHeader.setBname(sendResult.getMessageQueue().getBrokerName());
    // 不同的本地事務執行結果,設置不同的請求頭
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    // 執行事務結束鈎子,設置一些必要的上下文信息
    doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // 發送請求,請求Command為END_TRANSACTION
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

從上面源碼可以看出:本地事務執行結束後,事務收尾的工作內容主要包括:

  1. 解析消息 ID,獲取 Offset。
  2. 根據本地事務執行結果,構建不同的請求頭。
  3. 發送請求到 RocketMQ 服務端。

二次確認消息發送到 RocketMQ 服務端後,由核心類 EndTransactionProcessor 的 processRequest 方法來處理 Commit 或者 Rollback 消息,考慮到篇幅問題,這裏只分析邏輯刪除 Half Message 的部分:

// 二次確認結果是commit
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // 從RMQ_SYS_TRANS_HALF_TOPIC(Half消息隊列)查詢待提交的消息,驗證消息的有效性
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        // 二次確認,判讀事務ID一直想、消息存儲時間有效性等
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 核心邏輯msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
            // 即從屬性中恢復消息一開始的真實topic
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            // 清理掉事務消息的標誌性屬性TRAN_MSG == true,下面要將消息做為普通消息向下投遞
            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
            // 投遞消息到real topic
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // 刪除half message,本質是向op topic寫入一條消息,標識這個half message已經確認過了
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
    // 二次確認結果為rollback
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        // 同上,必要性檢查
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // 既然是需要回滾,那就只需要刪除對應的half message,同樣是向op topic 寫入一條消息
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}
 // Delete prepare message when this message has been committed or rolled back.
 @Override
public boolean deletePrepareMessage(MessageExt msgExt) {
    // 向op topic 寫入一條消息,消息內容是給對應的half message打了一個remove標記
    if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
        log.debug("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
        return true;
    } else {
        log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
        return false;
    }
}

可以看到,不管生產者提交的二次確認結果是 Commit 還是 Rollback,都會執行 deletePrepareMessage 方法,向 OP 隊列寫入消息,標識這條 Half Message 已經被處理過了,而並不是把這條消息物理刪除掉。和 Rollback 不同的是,Commit 時,需要先獲取並設置消息的 Real Topic 和 Real QueueId(這個在第一步發送 Half Message 時已經記錄在了消息屬性中),然後向下投遞,此時,消息對下游消費者可見。

事務狀態回查

對於始終沒有收到二次確認的消息,RocketMQ 服務端會主動發起事務回查,對於事務超時未確認的核心邏輯在org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl 的Check 方法,Check 方法被前面提到的定時任務每隔60秒調用一次,回查時,首先從 Half Topic 下的所有隊列開始:

  String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
// 獲取存儲half message的topic下的所有隊列
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
    // 沒有half message則直接退出
    log.warn("The queue of topic is empty :" + topic);
    return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);

然後遍歷每個隊列,和 op topic 下的數據做對比:

// 獲取對應的Op隊列(RMQ_SYS_TRANS_OP_HALF_TOPIC)
MessageQueue opQueue = getOpQueue(messageQueue);
// Half隊列消費位點
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
// Op隊列消費位點
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);

如果 OP Topic 中有 Half Message 的相關記錄,就不再回查,否則,判讀消息是否需要被跳過回查(消息超過最大檢查次數、超過了消息最大保留時間、剛寫入的消息),並且對於超出最大檢查次數的消息,丟棄操作其實是將消息轉移到 TRANS_CHECK_MAX_TIME_TOPIC 這個系統 Topic。

// Half消息已處理過了(存在對應的Op記錄)
if (removeMap.containsKey(i)) {
log.debug("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else {
// Half消息未被二次確認,根據偏移量查詢對應的消息
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
// 消息還存在,判斷是否需要丟棄或者跳過,丟棄是滿足檢查次數超出了最大檢查次數,跳過則是滿足消息留存時長超過了最大保留時間
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
    // 執行丟棄邏輯,發往TRANS_CHECK_MAXTIME_TOPIC這個系統Topic
    listener.resolveDiscardMsg(msgExt);
    newOffset = i + 1;
    i++;
    continue;
}
// 消息是剛寫入不久的,先跳過,一會兒再檢查
if (msgExt.getStoreTimestamp() >= startTime) {
    log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
        new Date(msgExt.getStoreTimestamp()));
    break;
}

直到判斷出 Half Message 沒有對應的 OP 記錄,並且消息留存時長超過了事務超時時間,開始組裝發送回查請求到生產者端。

// 獲取Op消息,查看消息二次確認的標識
List<MessageExt> opMsg = pullResult.getMsgFoundList();
// 沒有對應的Op消息,且留存時常超過了事務超時時間,觸發回查邏輯
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
        || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
        || (valueOfCurrentMinusBorn <= -1);

if (isNeedCheck) {
    // 重新保存half message,避免寫放大
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    // 調用sendCheckMessage方法,裏面在組裝和發送回查請求
    listener.resolveHalfMsg(msgExt);
} 

事務消息實踐指南

使用示例

這裏以 TDMQ 版 RocketMQ 5.x 版本集羣為例,演示事務消息的使用方式和效果。
1、首先登錄騰訊雲控制枱,新建一個消息類型為事務消息的 Topic。

2、以 Java 語言為例,引入 5.x 對應版本的依賴。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.6</version>
</dependency>

3、啓動生產者。

public class ProducerTransactionMessageDemo {

    private static final Logger log = LoggerFactory.getLogger(ProducerTransactionMessageDemo.class);

    private static boolean executeLocalTransaction() {
        // 模擬本地事務(如數據庫插入操作),這裏假設執行成功
        return true;
    }

    private static boolean checkTransactionStatus(String orderId) {
        // 模擬查詢本地事務執行結果,如查詢訂單ID是否已入庫,查到則return true
        return true;
    }


    public static void main(String[] args) throws ClientException {

        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 在控制枱權限管理頁面獲取ak和sk
        String accessKey = "your-ak";
        String secretKey = "your-sk";
        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);

        // 在控制枱獲取並填寫騰訊雲提供的接入地址
        String endpoints = "https://your-endpoints";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .enableSsl(false)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();

        String topic = "tran_topic";
        TransactionChecker checker = messageView -> {
            log.info("Receive transactional result check request, message={}", messageView);
            // 服務端主動回查本地事務狀態
            String orderId = messageView.getProperties().get("orderId");
            boolean isSuccess = checkTransactionStatus(orderId);
            return isSuccess ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
        };

        // 創建生產着並設置回查的checker對象
        Producer producer = provider.newProducerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setTopics(topic)
            .setTransactionChecker(checker)
            .build();

        // 開啓事務
        final Transaction transaction = producer.beginTransaction();

        byte[] body = "This is a transaction message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "tagA";
        final Message message = provider.newMessageBuilder()
            .setTopic(topic)
            .setTag(tag)
            .setKeys("your-key-565ef26f5727")
            //一般事務消息都會設置一個本地事務關聯的唯一ID,用來做本地事務回查的校驗
            .addProperty("orderId", "0001")
            .setBody(body)
            .build();

        // 發送半消息
        try {
            final SendReceipt sendReceipt = producer.send(message, transaction);
            log.info("Send transaction message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
            return;
        }

        // 執行本地事務
        boolean localTxSuccess = executeLocalTransaction();
        if (localTxSuccess) {
            // 本地事務執行成功,二次確認為Commit
            transaction.commit();
        } else {
            // 本地事務執行失敗,二次確認為Rollback
            transaction.rollback();
        }
        // producer.close();
    }
}

4、運行代碼後,在控制枱的消息查詢頁面,可以看到已經有一條投遞完成等待消費的消息。

5、啓動消費者,訂閲這個 Topic,成功消費消息後,在騰訊雲控制枱查看消息軌跡:

6、修改代碼,假設本地事務執行失敗,使處於 Half Message 狀態的事務消息回滾。

private static boolean executeLocalTransaction() {
    // 本地事務執行失敗
    return false;
}

private static boolean checkTransactionStatus(String orderId) {
    // 回查結果自然也是rollback,返回false
    return false;
}

7、此時,可以發現消息發送成功了,但在控制枱的消息消息查詢頁面是不可見的,啓動消費者也不能消費到這條消息。

注意事項

使用事務消息過程中,需注意以下幾點:

  1. Topic 類型必須為事務 TRANSACTION,否則生產消息會報錯,關鍵錯誤信息:current message type not match with topic accept message types。
  2. 事務消息不支持延遲,若設置了延遲屬性,在發送消息前會被清除延遲屬性。
  3. 如果本地事務執行較慢,此時服務端進行事務回查時,應返回 Unknown,且如果確認本地事務執行耗時會很長,應修改第一次事務回查的時間,以避免產生大量結果未知的事務。

總結

本文從理論與源碼雙視角剖析了 TDMQ RocketMQ 版事務消息的三大核心流程——半消息的發送存儲、二階段提交及事務狀態回查的實現機制。在實際生產中,建議開發者通過冪等設計規避重複消費,合理設置事務超時時間,並關注 Topic 類型限制等約束條件,以充分發揮事務消息在分佈式場景中的價值。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.