本文已被收錄至「RocketMQ 中文社區」技術專欄,該平台提供更多系統性學習資料和答疑💪
引言
本文主要介紹在使用 RocketMQ 時為什麼需要重試與兜底機制,生產者與消費者觸發重試的條件和具體行為,如何在 RocketMQ 中合理使用重試機制,幫助構建彈性,高可用系統的最佳實踐。
RocketMQ 的重試機制包括三部分,分別是生產者重試,服務端內部數據複製遇到非預期問題時重試,消費者消費重試。本文中僅討論生產者重試和消費者消費重試兩種面向用户側的實現。
生產者發送重試
RocketMQ 的生產者在發送消息到服務端時,可能會因為網絡問題,服務異常等原因導致調用失敗,這時候應該怎麼辦?如何儘可能的保證消息不丟失呢?
1. 生產者重試次數
RocketMQ 在客户端中內置了請求重試邏輯,支持在初始化時配置消息發送最大重試次數(默認為 2 次),失敗時會按照設置的重試次數重新發送。直到消息發送成功,或者達到最大重試次數時結束,並在最後一次失敗後返回調用錯誤的響應。對於同步發送和異步發送,均支持消息發送重試。
- 同步發送:調用線程會一直阻塞,直到某次重試成功或最終重試失敗(返回錯誤碼或拋出異常)。
- 異步發送:調用線程不會阻塞,但調用結果會通過回調的形式,以異常事件或者成功事件返回。
2. 生產者重試間隔
在介紹生產者重試前,我們先來了解下流控的概念,流控一般是指服務端壓力過大,容量不足時服務端會限制客户端收發消息的行為,是服務端自我保護的一種設計。RocketMQ 會根據當前是否觸發了流控而採用不同的重試策略:
非流控錯誤場景:其他觸發條件觸發重試後,均會立即進行重試,無等待間隔。
流控錯誤場景:系統會按照預設的指數退避策略進行延遲重試。
- **為什麼要引入退避和隨機抖動? **
如果故障是由過載流控引起的,重試會增加服務端負載,導致情況進一步惡化,因此客户端在遇到流控時會在兩次嘗試之間等待一段時間。每次嘗試後的等待時間都呈指數級延長。指數回退可能導致很長的回退時間,因為指數函數增長很快。指數退避算法通過以下參數控制重試行為,更多信息,請參見 connection-backoff.md。
INITIAL_BACKOFF:第一次失敗重試前後需等待多久,默認值:1 秒; MULTIPLIER :指數退避因子,即退避倍率,默認值:1.6; JITTER :隨機抖動因子,默認值:0.2; MAX_BACKOFF :等待間隔時間上限,默認值:120 秒; MIN_CONNECT_TIMEOUT :最短重試間隔,默認值:20 秒。
ConnectWithBackoff()
current_backoff = INITIAL_BACKOFF
current_deadline = now() + INITIAL_BACKOFF
while (TryConnect(Max(current_deadline, now() + MIN_CONNECT_TIMEOUT))!= SUCCESS)
SleepUntil(current_deadline)
current_backoff = Min(current_backoff * MULTIPLIER, MAX_BACKOFF)
current_deadline = now() + current_backoff + UniformRandom(-JITTER * current_backoff, JITTER * current_backoff)
特別説明:對於事務消息,只會進行透明重試(transparent retries),網絡超時或異常等場景不會進行重試。
3. 重試帶來的副作用
不停的重試看起來很美好,但也是有副作用的,主要包括兩方面:消息重複,服務端壓力增大
- 遠程調用的不確定性,因請求超時觸發消息發送重試流程,此時客户端無法感知服務端的處理結果;客户端進行的消息發送重試可能會導致消費方重複消費,應該按照用户ID、業務主鍵等信息冪等處理消息。
- 較多的重試次數也會增大服務端的處理壓力。
4. 用户的最佳實踐是什麼
1)合理設置發送超時時間,發送的最大次數
發送的最大次數在初始化客户端時配置在 ClientConfiguration;對於某些實時調用類場景,可能會導致消息發送請求鏈路被阻塞導致業務請求整體耗時高或耗時;需要合理評估每次調用請求的超時時間以及最大重試次數,避免影響全鏈路的耗時。
2)如何保證發送消息不丟失
由於分佈式環境的複雜性,例如網絡不可達時 RocketMQ 客户端發送請求重試機制並不能保證消息發送一定成功。業務方需要捕獲異常,並做好冗餘保護處理,常見的解決方案有兩種:
1. 向調用方返回業務處理失敗;
2. 嘗試將失敗的消息存儲到數據庫,然後由後台線程定時重試,保證業務邏輯的最終一致性。
3)關注流控異常導致無法重試
觸發流控的根本原因是系統容量不足,如果因為突發原因觸發消息流控,且客户端內置的重試流程執行失敗,則建議執行服務端擴容,將請求調用臨時替換到其他系統進行應急處理。
4)早期版本客户端如何使用故障延遲機制進行發送重試?
對於 RocketMQ 4.x 和 3.x 以下客户端開啓故障延遲機制可以用:
producer.setSendLatencyFaultEnable(true)
配置重試次數使用:
producer.setRetryTimesWhenSendFailed()
producer.setRetryTimesWhenSendAsyncFailed()
消費者消費重試
消息中間件做異步解耦時的一個典型問題是如果下游服務處理消息事件失敗,那應該怎麼做呢?
RocketMQ 的消息確認機制以及消費重試策略可以幫助分析如下問題:
- 如何保證業務完整處理消息?
消費重試策略可以在設計實現消費者邏輯時保證每條消息處理的完整性,避免部分消息消費異常導致業務狀態不一致。
- 業務應用異常時處理中的消息狀態如何恢復?
當系統出現異常(宕機故障)等場景時,處理中的消息狀態如何恢復,消費重試具體行為是什麼。
1. 什麼是消費重試?
- 什麼時候認為消費失敗? 消費者在接收到消息後將調用用户的消費函數執行業務邏輯。如果客户端返回消費失敗 ReconsumeLater,拋出非預期異常,或消息處理超時(包括在 PushConsumer 中排隊超時),只要服務端服務端一定時間內沒收到響應,將認為消費失敗。
- 消費重試是什麼? 消費者在消費某條消息失敗後,服務端會根據重試策略重新向客户端投遞該消息。超過一次定數後若還未消費成功,則該消息將不再繼續重試,直接被髮送到死信隊列中;
- 重試過程狀態機:消息在重試流程中的狀態和變化邏輯;
- 重試間隔:上一次消費失敗或超時後,下次重新嘗試消費的間隔時間;
- 最大重試次數:消息可被重試消費的最大次數。
2. 消息重試的場景
需要注意重試是應對異常情況,給予程序再次消費失敗消息的機會,不應該被用作常態化的鏈路。
推薦使用場景:
- 業務處理失敗,失敗原因跟當前的消息內容相關,預期一段時間後可執行成功;
- 是一個小概率事件,對於大批的消息只有很少量的失敗,後面的消息大概率會消費成功,是非常態化的。
正例:消費邏輯是扣減庫存,極少量商品因為樂觀鎖版本衝突導致扣減失敗,重試一般立刻成功。
錯誤使用場景:
- 消費處理邏輯中使用消費失敗來做條件判斷的結果分流,是不合理的。
反例:訂單在數據庫中狀態已經是已取消,此時如果收到發貨的消息,處理時不應返回消費失敗,而應該返回成功並標記不用發貨。
- 消費處理中使用消費失敗來做處理速率限流,是不合理的。 限流的目的是將超出流量的消息暫時堆積在隊列中達到削峯的作用,而不是讓消息進入重試鏈路。 這種做法會讓消息反覆在服務端和客户端之間傳遞,增大了系統的開銷,主要包括以下方面:
- RocketMQ 內部重試涉及寫放大,每一次重試將生成新的重試消息,大量重試將帶來嚴重的 IO 壓力;
- 重試有複雜的退避邏輯,內部實現為梯度定時器,該定時器本身不具備高吞吐的特性,大量重試將導致重試消息無法及時出隊。重試的間隔將不穩定,將導致大量重試消息延後消費,即削峯的週期被大幅度延長。
3. 不要以重試替代限流
上述誤用的場景實際上是組合了限流和重試能力來進行削峯,RocketMQ 推薦的削峯最佳手段為組合限流和堆積,業務以保護自身為前提,需要對消費流量進行限流,並利用 RocketMQ 提供的堆積能力將超出業務當前處理的消息滯後消費,以達到削峯的目的。下圖中超過處理能力的消息都應該被堆積在服務端,而不是通過消費失敗進行重試。
如果不想依賴額外的產品/組件來完成該功能,也可以利用一些本地工具類,比如 Guava 的 RateLimiter 來完成單機限流。如下所示,聲明一個 50 QPS 的 RateLimiter,在消費前以阻塞的方式 acquire 一個令牌,獲取到即處理消息,未獲取到阻塞。
RateLimiter rateLimiter = RateLimiter.create(50);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 設置訂閲組名稱
.setConsumerGroup(consumerGroup)
// 設置訂閲的過濾器
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
// 阻塞直到獲得一個令牌,也可以配置一個超時時間
rateLimiter.acquire();
LOGGER.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();
4. PushConsumer 消費重試策略
PushConsumer 消費消息時,消息的幾個主要狀態如下:
- Ready:已就緒狀態。消息在消息隊列RocketMQ版服務端已就緒,可以被消費者消費;
- Inflight:處理中狀態。消息被消費者客户端獲取,處於消費中還未返回消費結果的狀態;
- Commit:提交狀態。消費成功的狀態,消費者返回成功響應即可結束消息的狀態機;
- DLQ:死信狀態 消費邏輯的最終兜底機制,若消息一直處理失敗並不斷進行重試,直到超過最大重試次數還未成功,此時消息不會再重試。 該消息會被投遞至死信隊列。您可以通過消費死信隊列的消息進行業務恢復。
- 最大重試次數
PushConsumer 的最大重試次數由創建時決定。
例如,最大重試次數為 3 次,則該消息最多可被投遞 4 次,1 次為原始消息,3 次為重試投遞次數。
- 重試間隔時間
- 無序消息(非順序消息):重試間隔為階梯時間,具體時間如下:
説明:若重試次數超過 16 次,後面每次重試間隔都為 2 小時。
- 順序消息:重試間隔為固定時間,默認為 3 秒。
5. SimpleConsumer 消費重試策略
和 PushConsumer 消費重試策略不同,SimpleConsumer 消費者的重試間隔是預分配的,每次獲取消息消費者會在調用 API 時設置一個不可見時間參數 InvisibleDuration,即消息的最大處理時長。若消息消費失敗觸發重試,不需要設置下一次重試的時間間隔,直接複用不可見時間參數的取值。
由於不可見時間為預分配的,可能和實際業務中的消息處理時間差別較大,可以通過 API 接口修改不可見時間。
例如,預設消息處理耗時最多 20 ms,但實際業務中 20 ms內消息處理不完,可以修改消息不可見時間,延長消息處理時間,避免消息觸發重試機制。
修改消息不可見時間需要滿足以下條件:
- 消息處理未超時
- 消息處理未提交消費狀態
如下圖所示,消息不可見時間修改後立即生效,即從調用 API 時刻開始,重新計算消息不可見時間。
- 最大重試次數
與 PushConsumer 相同。
- 消息重試間隔
消息重試間隔 = 不可見時間 - 消息實際處理時長
例如:消息不可見時間為 30 ms,實際消息處理用了 10 ms 就返回失敗響應,則距下次消息重試還需要 20 ms,此時的消息重試間隔即為 20 ms;若直到 30 ms 消息還未處理完成且未返回結果,則消息超時,立即重試,此時重試間隔即為 0 ms。
SimpleConsumer 的消費重試間隔通過消息的不可見時間控制。
//消費示例:使用SimpleConsumer消費普通消息,主動獲取消息處理並提交。
ClientServiceProvider provider1 = ClientServiceProvider.loadService();
String topic1 = "Your Topic";
FilterExpression filterExpression1 = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider1.newSimpleConsumerBuilder()
//設置消費者分組。
.setConsumerGroup("Your ConsumerGroup")
//設置接入點。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
//設置預綁定的訂閲關係。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
List<MessageView> messageViewList = null;
try {
//SimpleConsumer需要主動獲取消息,並處理。
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消費處理完成後,需要主動調用ACK提交消費結果。
//沒有ack會被認為消費失敗
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系統流控等原因造成拉取失敗,需要重新發起獲取消息請求。
e.printStackTrace();
}
- 修改消息的不可見時間
案例:某產品使用消息隊列來發送解耦“視頻渲染”的業務邏輯,發送方發送任務編號,消費方收到編號後處理任務。由於消費方的業務邏輯耗時較長,消費者重新消費到同一個任務時,該任務未完成,只能返回消費失敗。在這種全新的 API 下,用户可以調用可以通過修改不可見時間給消息續期,實現對單條消息狀態的精確控制。
simpleConsumer.changeInvisibleDuration(); simpleConsumer.changeInvisibleDurationAsync();
6. 功能約束與最佳實踐
- 設置消費的最大超時時間和次數
儘快明確的向服務端返回成功或失敗,不要以超時(有時是異常拋出)代替消費失敗。
- 不要用重試機制來進行業務限流
錯誤示例:如果當前消費速度過高觸發限流,則返回消費失敗,等待下次重新消費。
正確示例:如果當前消費速度過高觸發限流,則延遲獲取消息,稍後再消費。
- 發送重試和消費重試會導致相同的消息重複消費,消費方應該有一個良好的冪等設計
正確示例:某系統中消費的邏輯是為某個用户發送短信,該短信已經發送成功了,當消費者應用重複收到該消息,此時應該返回消費成功。
總結
本文主要介紹重試的基本概念,生產者消費者收發消息時觸發重試的條件和具體行為,以及 RocketMQ 收發容錯的最佳實踐。
重試策略幫助我們從隨機的、短暫的瞬態故障中恢復,是在容忍錯誤時,提高可用性的一種強大機制。但請謹記 “重試是對於分佈式系統來説自私的”,因為客户端認為其請求很重要,並要求服務端花費更多資源來處理,盲目的重試設計不可取,合理的使用重試可以幫助我們構建更加彈性且可靠的系統。