引言
在分佈式架構系統中,確保跨服務應用的數據一致性始終是系統設計的核心挑戰之一。TDMQ RocketMQ 版作為一款基於 Apache RocketMQ 構建的企業級消息中間件,憑藉其高可用性和高可靠性特點,通過提供完善的事務消息機制,為這一難題提供了專業的解決方案。本文將結合核心源碼,深入解析 RocketMQ 事務消息的實現原理,希望能幫助開發者去構建更健壯的分佈式事務系統。
事務消息的概念與應用場景
事務消息是 RocketMQ 提供的一種高級特性消息,通過將二階段提交的動作和本地事務綁定,來保障分佈式場景下消息生產和本地事務的最終一致性,相比普通消息,主要是擴展了二次確認和本地事務狀態回查補償的機制。
在電商平台中,積分兑換商品這一常見功能就涉及到分佈式事務,用户發起兑換後,可能涉及創建兑換訂單、扣除用户積分、通知發貨服務、扣減庫存等一系列動作,此時就要保證訂單服務和多個下游業務執行結果的最終一致性,如果積分扣除成功但訂單創建失敗,會導致用户積分被扣但未獲得商品,如果訂單創建成功但積分扣除失敗,會導致用户獲得商品但未扣除積分。
我們可以採用 TDMQ RocketMQ 版事務消息來實現這一功能,具體分為以下三個階段:
1、階段一:發送事務消息(準備核銷積分)
用户提交訂單並選擇使用積分兑換後,訂單服務向 RocketMQ 服務端對應的業務 Topic 發送一條事務消息,內容包含 “用户 user-001 發起訂單 Order-001 並使用 1000 積分兑換商品 A”。此時,該消息對下游的積分服務和庫存服務等均不可見,避免在訂單服務事務完成前,積分服務提前扣減積分,確保積分不會被誤扣。
2、階段二:執行本地事務(創建訂單)
事務消息發送成功後,訂單服務繼續執行本地事務,創建訂單並預佔積分,若本地事務成功,則提交二次確認 Commit 到 RocketMQ 服務端,消息被繼續投遞到下游,反之,提交 Rollback,事務結束,積分狀態保持不變。
3、階段三:下游服務消費(扣減積分並更新庫存)
積分服務和庫存服務預先訂閲上面的 Topic,接收到消息後,積分服務扣減積分,庫存服務更新庫存。若消費過程中因網絡異常、服務不可用等問題導致失敗,RocketMQ 將自動觸發重試機制,若多次重試仍未成功,消息將轉入死信隊列,後續由人工介入核對,通過補償流程保障積分和庫存數據的最終一致性。
通過以上三個階段,RocketMQ 事務消息機制在積分核銷場景中,保障了訂單本地事務和消息發送的同時成功/失敗,成功實現了分佈式事務的最終一致性。類似的,在金融交易、企業多系統數據同步等場景中,RocketMQ 事務消息都能憑藉其可靠的機制,保障跨服務操作的最終一致性。那麼,RocketMQ 事務消息究竟是如何在底層實現這些複雜操作,確保最終一致性的呢?接下來,我們深入探究其背後的原理。
事務消息實現原理詳解
術語説明
在分析 RocketMQ 事務消息的實現原理之前,有必要先了解一下這些概念和術語:
1、半消息 half message:
生產者發送事務消息到 RocketMQ 服務端後,消息會被持久化並標記為“暫不可投遞”的狀態,直到本地事務執行完成並確認後,消息才會決定是否對消費者可見,此狀態下的消息,稱為半消息(Half Message)。
2、二階段提交
實現事務最終一致性的關鍵機制,一階段為發送 Half Message,二階段為生產者執行本地事務,並根據執行結果向 RocketMQ 服務端提交 Commit(允許投遞)或 Rollback(丟棄消息)的確認結果,以此來決定 Half Message 的去留。
3、OP 消息:
用於給消息狀態打標記,沒有對應 OP 消息的 Half Message,就説明二階段確認狀態未知,需要 RocketMQ 服務端進行本地事務狀態主動回查,OP 消息的內容為對應的 Half Message 的存儲的 Offset。
4、相關 Topic:
- Real Topic:業務真實 Topic,生產者發消息時指定的 Topic 值。
- Half Topic:系統 Topic,Topic名稱為 RMQ_SYS_TRANS_HALF_TOPIC,用於存儲 Half Message。
- OP Topic:系統 Topic,Topic名稱為 MQ_SYS_TRANS_OP_HALF_TOPIC,用於存儲 OP 消息, Half Message 二次狀態確認後,不管是 Commit 還是 Rollback,都會寫入一條對應的 OP 消息到這個 Topic。
事務消息處理流程
瞭解完基本概念,結合上面的業務場景,我們來看 RocketMQ 事務消息的實現流程:
步驟説明:
- 生產者發送事務消息到 RocketMQ 服務端。
- 服務端存儲這條消息後返回發送成功的響應,此時消息對下游消費者不可見,處於Half Message 狀態。
- 生產者收到半消息成功的響應後,繼續往下執行本地事務(如更新業務數據庫)。
- 根據本地事務的執行結果,生產者會向 RocketMQ 服務端提交最終狀態,也就是二次確認。
- 確認結果為 Commit 時,服務端會將事務消息繼續向下投遞給消費者,確認結果為 Rollback 時,服務端將會丟棄該消息,不再向下投遞。
- 確認結果是 Unknown 或一直沒有收到確認結果時,一定時間後,將會觸發事務狀態主動回查。
- 當生產者未提交最終狀態或者二次確認的結果為 Unknown 時,RocketMQ 服務端將會主動發起事務結果查詢請求到生產者服務。
- 生產者收到請求後提交二次確認結果,邏輯再次回到第5步,此時如果生產者服務暫時不可用,則 RocketMQ 服務端會在指定時間間隔後,繼續主動發起回查請求,直到超過最大回查次數後,回滾消息。
如此,不管本地事務是否執行成功,都能實現事務狀態的最終一致性。以上步驟,可用時序圖直觀體現為:
半消息的具體實現
瞭解了事務消息基本的實現流程後,你可能會有疑問,半消息為什麼對消費者不可見?二次確認 Commit 或者 Rollback 後,服務端如何投遞或者刪除半消息?前面提到,Half Message 在服務端做了持久化,但在消費端卻不可見,實現這一效果的方式,就是 Topic 替換:首先將事務消息的 Real Topic 和隊列信息作為屬性暫存起來,以便後續二階段提交結果為 Commit 時,能正確地投遞到下游消費者,然後將消息的 Topic 改為系統 Topic RMQ_SYS_TRANS_HALF_TOPIC,隊列 ID 改為0,用户的消費者正常不會訂閲這個系統 Topic,自然也就不能看到 Half Message。
Half Message 被成功投遞到上面的系統 Topic 後,開始執行本地事務,如果生產者提交的本地事務二次確認結果為 Commit,則在消息屬性中獲取消息的 Real Topic、隊列等信息,設置 Topic = Real Topic後,再投遞下游,最後刪除 Half Message(邏輯刪除),如果二次確認結果為 Rollback,則只需要邏輯刪除對應的 Half Message 即可。這裏邏輯刪除的實現,就是前面提到的 OP Topic,OP 隊列中的消息,記錄了 Half Message 對應的二次確認狀態,根據這個狀態,RocketMQ 服務端會進行第二個核心機制:事務狀態主動回查。
事務回查的具體實現
Half Message 寫入後,可能會因為種種原因,導致 RocketMQ 服務端一直收不到二次確認結果,比如網絡異常、生產者服務暫時不可用、本地事務死鎖導致執行時間超長等,此時,就需要 RocketMQ 服務端主動去詢問生產者服務本地事務是否執行成功,以決定 Half Message 的最終去留。
RocketMQ 服務端會啓動事務檢查定時任務,默認每60秒執行一次,最大回查15次,可通過 TransactionCheckInterval 和 TransactionCheckMax 這兩項配置按業務實際情況進行定製化調整。回查時,會對比 Half 隊列和 OP 隊列的偏移量,若發現 Half 消息未在 OP 隊列中有對應的記錄且 Half Message 的留存時間超過了事務超時時間(前面分析過,Half Message 是否被二次確認過,是根據 OP 隊列來判斷的),則觸發主動回查動作,向生產者服務發起事務狀態檢查請求,如此,就解決了部分事務消息狀態懸而未決的問題,實現了本地事務和消息發送之間的最終一致性。
事務消息核心源碼解析
分析完具體的實現原理,接下來我們對照 Half Message 發送、二次確認提交、事務主動回查這三個關鍵部分的源碼實現,來具體看看以上理論在代碼中的體現:
發送事務消息
首先看看事務消息的發送的具體實現,核心代碼為org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl 下的sendMessageInTransaction 方法:
public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter localTransactionExecuter, Object arg) throws MQClientException {
// 檢查事務監聽器對象是否為空,後續本地事務的執行和回查都依靠它
TransactionListener transactionListener = this.getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", (Throwable)null);
} else {
if (msg.getDelayTimeLevel() != 0) {
// 延遲消息屬性在這裏不生效
MessageAccessor.clearProperty(msg, "DELAY");
}
// 消息內容必要性檢查
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// 添加事務消息相關屬性
MessageAccessor.putProperty(msg, "TRAN_MSG", "true");
MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());
try {
// 開始發送消息
sendResult = this.send(msg);
} catch (Exception var11) {
Exception e = var11;
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
// 如果消息發送成功,則繼續處理本地事務
switch (sendResult.getSendStatus()) {
case SEND_OK:
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty("UNIQ_KEY");
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
// 開始執行本地事務,並得到一個本地事務狀態的結果
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
this.log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
this.log.info(msg.toString());
}
} catch (Throwable var10) {
Throwable e = var10;
this.log.info("executeLocalTransactionBranch exception", e);
this.log.info(msg.toString());
localException = e;
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
// 消息發送都未成功,則本地事務狀態直接為rollback
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
}
try {
// 根據本地事務狀態結果,進行下一步,決定half message是繼續投遞還是刪除
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception var9) {
Exception e = var9;
this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
// 返回消息發送的結果
return transactionSendResult;
}
}
對照以上源碼,可以看到,生產者方法發送消息後,首先會對事務監聽器做非空校驗,因為後面本地事務的執行以及事務狀態的主動回查,都需要依賴它來完成,接下來的主要邏輯有四點:
- 給原始消息加一個 TRAN_MSG=true 的屬性,這是後面判定一條消息為事務消息的條件。
- 同步發送 Half Message,若發送失敗,則不再執行本地事務,保證了“同失敗”的事務一致性。
- 若發送成功,則開始執行我們設置的本地事務,並根據執行結果修改本地事務狀態值。
- 根據事務狀態值,來 endTransaction 做收尾工作,這裏包含了下面我們要説的事務回查和 Half Message 的刪除。
二次確認提交
Half Message 發送成功後,開始執行本地事務,並根據執行結果,提交二次確認結果Commit/Rollback 到 RocketMQ 服務端。
/**
* @param msg: 原始消息對象
* @param sendResult: 消息發送結果(包含事務ID、消息ID等)
* @param localTransactionState: 本地事務執行狀態(COMMIT/ROLLBACK/UNKNOWN)
* @param localException: 本地事務執行異常
*/
public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
// 優先使用offsetMsgId解析消息ID,獲取偏移量
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
// 事務ID
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
// 構建二次確認的請求頭
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
requestHeader.setBname(sendResult.getMessageQueue().getBrokerName());
// 不同的本地事務執行結果,設置不同的請求頭
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
// 執行事務結束鈎子,設置一些必要的上下文信息
doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
// 發送請求,請求Command為END_TRANSACTION
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
從上面源碼可以看出:本地事務執行結束後,事務收尾的工作內容主要包括:
- 解析消息 ID,獲取 Offset。
- 根據本地事務執行結果,構建不同的請求頭。
- 發送請求到 RocketMQ 服務端。
二次確認消息發送到 RocketMQ 服務端後,由核心類 EndTransactionProcessor 的 processRequest 方法來處理 Commit 或者 Rollback 消息,考慮到篇幅問題,這裏只分析邏輯刪除 Half Message 的部分:
// 二次確認結果是commit
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// 從RMQ_SYS_TRANS_HALF_TOPIC(Half消息隊列)查詢待提交的消息,驗證消息的有效性
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// 二次確認,判讀事務ID一直想、消息存儲時間有效性等
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 核心邏輯msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
// 即從屬性中恢復消息一開始的真實topic
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
// 清理掉事務消息的標誌性屬性TRAN_MSG == true,下面要將消息做為普通消息向下投遞
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 投遞消息到real topic
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
// 刪除half message,本質是向op topic寫入一條消息,標識這個half message已經確認過了
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
// 二次確認結果為rollback
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// 同上,必要性檢查
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 既然是需要回滾,那就只需要刪除對應的half message,同樣是向op topic 寫入一條消息
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
// Delete prepare message when this message has been committed or rolled back.
@Override
public boolean deletePrepareMessage(MessageExt msgExt) {
// 向op topic 寫入一條消息,消息內容是給對應的half message打了一個remove標記
if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
log.debug("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
return true;
} else {
log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
return false;
}
}
可以看到,不管生產者提交的二次確認結果是 Commit 還是 Rollback,都會執行 deletePrepareMessage 方法,向 OP 隊列寫入消息,標識這條 Half Message 已經被處理過了,而並不是把這條消息物理刪除掉。和 Rollback 不同的是,Commit 時,需要先獲取並設置消息的 Real Topic 和 Real QueueId(這個在第一步發送 Half Message 時已經記錄在了消息屬性中),然後向下投遞,此時,消息對下游消費者可見。
事務狀態回查
對於始終沒有收到二次確認的消息,RocketMQ 服務端會主動發起事務回查,對於事務超時未確認的核心邏輯在org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl 的Check 方法,Check 方法被前面提到的定時任務每隔60秒調用一次,回查時,首先從 Half Topic 下的所有隊列開始:
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
// 獲取存儲half message的topic下的所有隊列
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
// 沒有half message則直接退出
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
然後遍歷每個隊列,和 op topic 下的數據做對比:
// 獲取對應的Op隊列(RMQ_SYS_TRANS_OP_HALF_TOPIC)
MessageQueue opQueue = getOpQueue(messageQueue);
// Half隊列消費位點
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
// Op隊列消費位點
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
如果 OP Topic 中有 Half Message 的相關記錄,就不再回查,否則,判讀消息是否需要被跳過回查(消息超過最大檢查次數、超過了消息最大保留時間、剛寫入的消息),並且對於超出最大檢查次數的消息,丟棄操作其實是將消息轉移到 TRANS_CHECK_MAX_TIME_TOPIC 這個系統 Topic。
// Half消息已處理過了(存在對應的Op記錄)
if (removeMap.containsKey(i)) {
log.debug("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else {
// Half消息未被二次確認,根據偏移量查詢對應的消息
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
// 消息還存在,判斷是否需要丟棄或者跳過,丟棄是滿足檢查次數超出了最大檢查次數,跳過則是滿足消息留存時長超過了最大保留時間
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
// 執行丟棄邏輯,發往TRANS_CHECK_MAXTIME_TOPIC這個系統Topic
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
// 消息是剛寫入不久的,先跳過,一會兒再檢查
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}
直到判斷出 Half Message 沒有對應的 OP 記錄,並且消息留存時長超過了事務超時時間,開始組裝發送回查請求到生產者端。
// 獲取Op消息,查看消息二次確認的標識
List<MessageExt> opMsg = pullResult.getMsgFoundList();
// 沒有對應的Op消息,且留存時常超過了事務超時時間,觸發回查邏輯
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
// 重新保存half message,避免寫放大
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
// 調用sendCheckMessage方法,裏面在組裝和發送回查請求
listener.resolveHalfMsg(msgExt);
}
事務消息實踐指南
使用示例
這裏以 TDMQ 版 RocketMQ 5.x 版本集羣為例,演示事務消息的使用方式和效果。
1、首先登錄騰訊雲控制枱,新建一個消息類型為事務消息的 Topic。
2、以 Java 語言為例,引入 5.x 對應版本的依賴。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.6</version>
</dependency>
3、啓動生產者。
public class ProducerTransactionMessageDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerTransactionMessageDemo.class);
private static boolean executeLocalTransaction() {
// 模擬本地事務(如數據庫插入操作),這裏假設執行成功
return true;
}
private static boolean checkTransactionStatus(String orderId) {
// 模擬查詢本地事務執行結果,如查詢訂單ID是否已入庫,查到則return true
return true;
}
public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 在控制枱權限管理頁面獲取ak和sk
String accessKey = "your-ak";
String secretKey = "your-sk";
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
// 在控制枱獲取並填寫騰訊雲提供的接入地址
String endpoints = "https://your-endpoints";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.enableSsl(false)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String topic = "tran_topic";
TransactionChecker checker = messageView -> {
log.info("Receive transactional result check request, message={}", messageView);
// 服務端主動回查本地事務狀態
String orderId = messageView.getProperties().get("orderId");
boolean isSuccess = checkTransactionStatus(orderId);
return isSuccess ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
};
// 創建生產着並設置回查的checker對象
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topic)
.setTransactionChecker(checker)
.build();
// 開啓事務
final Transaction transaction = producer.beginTransaction();
byte[] body = "This is a transaction message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "tagA";
final Message message = provider.newMessageBuilder()
.setTopic(topic)
.setTag(tag)
.setKeys("your-key-565ef26f5727")
//一般事務消息都會設置一個本地事務關聯的唯一ID,用來做本地事務回查的校驗
.addProperty("orderId", "0001")
.setBody(body)
.build();
// 發送半消息
try {
final SendReceipt sendReceipt = producer.send(message, transaction);
log.info("Send transaction message successfully, messageId={}", sendReceipt.getMessageId());
} catch (Throwable t) {
log.error("Failed to send message", t);
return;
}
// 執行本地事務
boolean localTxSuccess = executeLocalTransaction();
if (localTxSuccess) {
// 本地事務執行成功,二次確認為Commit
transaction.commit();
} else {
// 本地事務執行失敗,二次確認為Rollback
transaction.rollback();
}
// producer.close();
}
}
4、運行代碼後,在控制枱的消息查詢頁面,可以看到已經有一條投遞完成等待消費的消息。
5、啓動消費者,訂閲這個 Topic,成功消費消息後,在騰訊雲控制枱查看消息軌跡:
6、修改代碼,假設本地事務執行失敗,使處於 Half Message 狀態的事務消息回滾。
private static boolean executeLocalTransaction() {
// 本地事務執行失敗
return false;
}
private static boolean checkTransactionStatus(String orderId) {
// 回查結果自然也是rollback,返回false
return false;
}
7、此時,可以發現消息發送成功了,但在控制枱的消息消息查詢頁面是不可見的,啓動消費者也不能消費到這條消息。
注意事項
使用事務消息過程中,需注意以下幾點:
- Topic 類型必須為事務 TRANSACTION,否則生產消息會報錯,關鍵錯誤信息:current message type not match with topic accept message types。
- 事務消息不支持延遲,若設置了延遲屬性,在發送消息前會被清除延遲屬性。
- 如果本地事務執行較慢,此時服務端進行事務回查時,應返回 Unknown,且如果確認本地事務執行耗時會很長,應修改第一次事務回查的時間,以避免產生大量結果未知的事務。
總結
本文從理論與源碼雙視角剖析了 TDMQ RocketMQ 版事務消息的三大核心流程——半消息的發送存儲、二階段提交及事務狀態回查的實現機制。在實際生產中,建議開發者通過冪等設計規避重複消費,合理設置事務超時時間,並關注 Topic 類型限制等約束條件,以充分發揮事務消息在分佈式場景中的價值。