1. 簡介
默認情況下,Spring AMQP 會將失敗的消息重新排隊進行再次消費。 這種行為可能導致無限消費循環,從而造成系統不穩定並浪費資源。
雖然使用死信隊列處理失敗消息是一種標準做法, 但我們可能希望嘗試重試消息消費並恢復系統到正常狀態。
在本教程中,我們將介紹兩種實現名為 指數退避 的重試策略的實現方式。
2. 先決條件
在本文教程中,我們將使用 RabbitMQ,一個流行的 AMQP 實現。 因此,我們可能會參考這篇 Spring AMQP 文章,以獲取有關如何配置和使用 RabbitMQ 與 Spring 的進一步説明。
為了簡化操作,我們還將使用 Docker 鏡像來運行我們的 RabbitMQ 實例,儘管 任何監聽 5672 端口的 RabbitMQ 實例都有效。
讓我們啓動 RabbitMQ Docker 容器:
docker run -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management為了實現我們的示例,我們需要添加對spring-boot-starter-amqp的依賴。最新版本可在Maven Central上找到:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.4.RELEASE</version>
</dependency>
</dependencies>3. 阻塞式方法
我們的第一種方法將使用 Spring Retry fixtures。 我們將創建一個簡單的隊列和一個消費者,配置為在失敗消息的重試之間等待一段時間。
首先,讓我們創建我們的隊列:
@Bean
public Queue blockingQueue() {
return QueueBuilder.nonDurable("blocking-queue").build();
}第二,我們將在 RetryOperationsInterceptor 中配置回退策略,並將其注入到自定義的 RabbitListenerContainerFactory 中。
@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.backOffOptions(1000, 3.0, 10000)
.maxAttempts(5)
.recoverer(observableRecoverer())
.build();
}
@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory(
ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
Advice[] adviceChain = { retryInterceptor };
factory.setAdviceChain(adviceChain);
return factory;
}如上所示,我們配置了一個初始間隔為1000ms,倍數為3.0,最多等待時間為10000ms。此外,在五次嘗試後,消息將被丟棄。
現在,我們添加我們的消費者,並通過拋出異常強制生成失敗消息:
@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory")
public void consumeBlocking(String payload) throws Exception {
logger.info("Processing message from blocking-queue: {}", payload);
throw new Exception("exception occured!");
}最後,讓我們創建一個測試並向我們的隊列發送兩條消息:
@Test
public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception {
int nb = 2;
CountDownLatch latch = new CountDownLatch(nb);
observableRecoverer.setObserver(() -> latch.countDown());
for (int i = 1; i <= nb; i++) {
rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i);
}
latch.await();
}請注意,CountdownLatch僅作為測試用例使用。
讓我們運行測試並檢查我們的日誌輸出:
2020-02-18 21:17:55.638 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:56.641 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:59.644 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:08.654 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.657 INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.875 ERROR : java.lang.Exception: exception occured!
2020-02-18 21:18:18.858 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:19.860 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:22.863 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:31.867 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.871 INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.875 ERROR : java.lang.Exception: exception occured!可以觀察到,此日誌正確地顯示了每次重試之間的指數級等待時間。雖然我們的退避策略有效,但我們的消費者在重試耗盡後仍然被阻塞。 簡單的改進是,通過設置 concurrency 屬性,使我們的消費者能夠併發執行:
@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory", concurrency = "2")然而,一條已退出的 消息仍然阻止了消費者實例。因此,應用程序可能會出現延遲問題。
在下一部分,我們將展示一種非阻塞的方式來實現類似的策略。
4. 以非阻塞方式
一種替代方案涉及多個重試隊列與消息過期結合。事實上,當消息過期時,它會進入死信隊列。換句話説,如果死信隊列消費者將消息重新發送回其原始隊列,我們實際上是在執行一個重試循環。
結果是,使用的重試隊列數量等於將發生的重試次數。
首先,讓我們為我們的重試隊列創建死信隊列:
@Bean
public Queue retryWaitEndedQueue() {
return QueueBuilder.nonDurable("retry-wait-ended-queue").build();
}讓我們為重試死信隊列添加一個消費者。 這個消費者的唯一職責是將消息重新發送回其原始隊列:
@RabbitListener(queues = "retry-wait-ended-queue", containerFactory = "defaultContainerFactory")
public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception{
MessageProperties props = message.getMessageProperties();
rabbitTemplate().convertAndSend(props.getHeader("x-original-exchange"),
props.getHeader("x-original-routing-key"), message);
}第二,我們創建一個包裝對象來管理我們的重試隊列。這個對象將包含指數退避的配置:
public class RetryQueues {
private Queue[] queues;
private long initialInterval;
private double factor;
private long maxWait;
// constructor, getters and setters第三,我們定義三個重試隊列:
@Bean
public Queue retryQueue1() {
return QueueBuilder.nonDurable("retry-queue-1")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
@Bean
public Queue retryQueue2() {
return QueueBuilder.nonDurable("retry-queue-2")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
@Bean
public Queue retryQueue3() {
return QueueBuilder.nonDurable("retry-queue-3")
.deadLetterExchange("")
.deadLetterRoutingKey("retry-wait-ended-queue")
.build();
}
@Bean
public RetryQueues retryQueues() {
return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3());
}然後,我們需要一個攔截器來處理消息的消費:
public class RetryQueuesInterceptor implements MethodInterceptor {
// fields and constructor
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
return tryConsume(invocation, this::ack, (messageAndChannel, e) -> {
try {
int retryCount = tryGetRetryCountOrFail(messageAndChannel, e);
sendToNextRetryQueue(messageAndChannel, retryCount);
} catch (Throwable t) {
// ...
throw new RuntimeException(t);
}
});
}當消費者成功返回時,我們僅會確認消息。
但是,如果消費者拋出異常且仍有重試次數,則將消息發送到下一個重試隊列:
private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception {
String retryQueueName = retryQueues.getQueueName(retryCount);
rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> {
MessageProperties props = m.getMessageProperties();
props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount)));
props.setHeader("x-retried-count", String.valueOf(retryCount + 1));
props.setHeader("x-original-exchange", props.getReceivedExchange());
props.setHeader("x-original-routing-key", props.getReceivedRoutingKey());
return m;
});
mac.channel.basicReject(mac.message.getMessageProperties()
.getDeliveryTag(), false);
}再次,讓我們在自定義的 RabbitListenerContainerFactory 中配置攔截器。
@Bean
public SimpleRabbitListenerContainerFactory retryQueuesContainerFactory(
ConnectionFactory connectionFactory, RetryQueuesInterceptor retryInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
Advice[] adviceChain = { retryInterceptor };
factory.setAdviceChain(adviceChain);
return factory;
}最後,我們定義了主隊列和一個模擬失敗消息的消費者:
@Bean
public Queue nonBlockingQueue() {
return QueueBuilder.nonDurable("non-blocking-queue")
.build();
}
@RabbitListener(queues = "non-blocking-queue", containerFactory = "retryQueuesContainerFactory",
ackMode = "MANUAL")
public void consumeNonBlocking(String payload) throws Exception {
logger.info("Processing message from non-blocking-queue: {}", payload);
throw new Exception("Error occured!");
}讓我們創建一個測試併發送兩條消息:
@Test
public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception {
int nb = 2;
CountDownLatch latch = new CountDownLatch(nb);
retryQueues.setObserver(() -> latch.countDown());
for (int i = 1; i <= nb; i++) {
rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i);
}
latch.await();
}然後,讓我們啓動測試並檢查日誌:
2020-02-19 10:31:40.640 INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:40.656 INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:41.620 INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:41.623 INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:44.415 INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:44.420 INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:52.751 INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:52.774 ERROR : java.lang.Exception: Error occured!
2020-02-19 10:31:52.829 INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:52.841 ERROR : java.lang.Exception: Error occured!再次觀察到,每次重試之間都有指數級的等待時間。然而,與其等待所有嘗試都完成,消息的處理是併發進行的。
雖然這種設置相當靈活,並且有助於緩解延遲問題,但存在一個常見陷阱。事實上,RabbitMQ 僅在消息到達隊列頭部時才移除過期的消息。因此,如果消息具有較長的過期時間,它將阻塞隊列中的所有其他消息。因此,應確保回覆隊列僅包含具有相同過期值的消息。
4. 結論
如上所示,基於事件的系統可以通過採用指數退避策略來提高容錯性。雖然實施此類解決方案可能並不複雜,但重要的是要認識到,某些解決方案可能適合小型系統,但在高吞吐量生態系統中卻可能導致延遲問題。