知識庫 / Spring RSS 訂閱

指數退避與 Spring AMQP

Spring
HongKong
4
01:02 PM · Dec 06 ,2025

1. 簡介

默認情況下,Spring AMQP 會將失敗的消息重新排隊進行再次消費。 這種行為可能導致無限消費循環,從而造成系統不穩定並浪費資源。

雖然使用死信隊列處理失敗消息是一種標準做法, 但我們可能希望嘗試重試消息消費並恢復系統到正常狀態。

在本教程中,我們將介紹兩種實現名為 指數退避 的重試策略的實現方式。

2. 先決條件

在本文教程中,我們將使用 RabbitMQ,一個流行的 AMQP 實現。 因此,我們可能會參考這篇 Spring AMQP 文章,以獲取有關如何配置和使用 RabbitMQ 與 Spring 的進一步説明。

為了簡化操作,我們還將使用 Docker 鏡像來運行我們的 RabbitMQ 實例,儘管 任何監聽 5672 端口的 RabbitMQ 實例都有效

讓我們啓動 RabbitMQ Docker 容器:

docker run -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management

為了實現我們的示例,我們需要添加對spring-boot-starter-amqp的依賴。最新版本可在Maven Central上找到:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.2.4.RELEASE</version>
    </dependency>
</dependencies>

3. 阻塞式方法

我們的第一種方法將使用 Spring Retry fixtures。 我們將創建一個簡單的隊列和一個消費者,配置為在失敗消息的重試之間等待一段時間。

首先,讓我們創建我們的隊列:

@Bean
public Queue blockingQueue() {
    return QueueBuilder.nonDurable("blocking-queue").build();
}

第二,我們將在 RetryOperationsInterceptor 中配置回退策略,並將其注入到自定義的 RabbitListenerContainerFactory 中。

@Bean
public RetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder.stateless()
      .backOffOptions(1000, 3.0, 10000)
      .maxAttempts(5)
      .recoverer(observableRecoverer())
      .build();
}

@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory(
  ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);

    Advice[] adviceChain = { retryInterceptor };
    factory.setAdviceChain(adviceChain);

    return factory;
}

如上所示,我們配置了一個初始間隔為1000ms,倍數為3.0,最多等待時間為10000ms。此外,在五次嘗試後,消息將被丟棄。

現在,我們添加我們的消費者,並通過拋出異常強制生成失敗消息:

@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory")
public void consumeBlocking(String payload) throws Exception {
    logger.info("Processing message from blocking-queue: {}", payload);

    throw new Exception("exception occured!");
}

最後,讓我們創建一個測試並向我們的隊列發送兩條消息:

@Test
public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception {
    int nb = 2;

    CountDownLatch latch = new CountDownLatch(nb);
    observableRecoverer.setObserver(() -&gt; latch.countDown());

    for (int i = 1; i &lt;= nb; i++) {
        rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i);
    }

    latch.await();
}

請注意,CountdownLatch僅作為測試用例使用。

讓我們運行測試並檢查我們的日誌輸出:

2020-02-18 21:17:55.638  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:56.641  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:59.644  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:08.654  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.657  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.875  ERROR : java.lang.Exception: exception occured!
2020-02-18 21:18:18.858  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:19.860  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:22.863  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:31.867  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.871  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.875 ERROR : java.lang.Exception: exception occured!

可以觀察到,此日誌正確地顯示了每次重試之間的指數級等待時間。雖然我們的退避策略有效,但我們的消費者在重試耗盡後仍然被阻塞。 簡單的改進是,通過設置 concurrency 屬性,使我們的消費者能夠併發執行:

@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory", concurrency = "2")

然而,一條已退出的 消息仍然阻止了消費者實例。因此,應用程序可能會出現延遲問題。

在下一部分,我們將展示一種非阻塞的方式來實現類似的策略。

4. 以非阻塞方式

一種替代方案涉及多個重試隊列與消息過期結合。事實上,當消息過期時,它會進入死信隊列。換句話説,如果死信隊列消費者將消息重新發送回其原始隊列,我們實際上是在執行一個重試循環

結果是,使用的重試隊列數量等於將發生的重試次數

首先,讓我們為我們的重試隊列創建死信隊列:

@Bean
public Queue retryWaitEndedQueue() {
    return QueueBuilder.nonDurable("retry-wait-ended-queue").build();
}

讓我們為重試死信隊列添加一個消費者。 這個消費者的唯一職責是將消息重新發送回其原始隊列

@RabbitListener(queues = "retry-wait-ended-queue", containerFactory = "defaultContainerFactory")
public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception{
    MessageProperties props = message.getMessageProperties();

    rabbitTemplate().convertAndSend(props.getHeader("x-original-exchange"), 
      props.getHeader("x-original-routing-key"), message);
}

第二,我們創建一個包裝對象來管理我們的重試隊列。這個對象將包含指數退避的配置:

public class RetryQueues {
    private Queue[] queues;
    private long initialInterval;
    private double factor;
    private long maxWait;

    // constructor, getters and setters

第三,我們定義三個重試隊列:

@Bean
public Queue retryQueue1() {
    return QueueBuilder.nonDurable("retry-queue-1")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public Queue retryQueue2() {
    return QueueBuilder.nonDurable("retry-queue-2")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public Queue retryQueue3() {
    return QueueBuilder.nonDurable("retry-queue-3")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public RetryQueues retryQueues() {
    return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3());
}

然後,我們需要一個攔截器來處理消息的消費:

public class RetryQueuesInterceptor implements MethodInterceptor {

    // fields and constructor

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        return tryConsume(invocation, this::ack, (messageAndChannel, e) -> {
            try {
                int retryCount = tryGetRetryCountOrFail(messageAndChannel, e);
                sendToNextRetryQueue(messageAndChannel, retryCount);
            } catch (Throwable t) {
                // ...
                throw new RuntimeException(t);
            }
        });
    }

當消費者成功返回時,我們僅會確認消息。

但是,如果消費者拋出異常且仍有重試次數,則將消息發送到下一個重試隊列:

private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception {
    String retryQueueName = retryQueues.getQueueName(retryCount);

    rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> {
        MessageProperties props = m.getMessageProperties();
        props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount)));
        props.setHeader("x-retried-count", String.valueOf(retryCount + 1));
        props.setHeader("x-original-exchange", props.getReceivedExchange());
        props.setHeader("x-original-routing-key", props.getReceivedRoutingKey());

        return m;
    });

    mac.channel.basicReject(mac.message.getMessageProperties()
      .getDeliveryTag(), false);
}

再次,讓我們在自定義的 RabbitListenerContainerFactory 中配置攔截器。

@Bean
public SimpleRabbitListenerContainerFactory retryQueuesContainerFactory(
  ConnectionFactory connectionFactory, RetryQueuesInterceptor retryInterceptor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);

    Advice[] adviceChain = { retryInterceptor };
    factory.setAdviceChain(adviceChain);

    return factory;
}

最後,我們定義了主隊列和一個模擬失敗消息的消費者:

@Bean
public Queue nonBlockingQueue() {
    return QueueBuilder.nonDurable("non-blocking-queue")
      .build();
}

@RabbitListener(queues = "non-blocking-queue", containerFactory = "retryQueuesContainerFactory", 
  ackMode = "MANUAL")
public void consumeNonBlocking(String payload) throws Exception {
    logger.info("Processing message from non-blocking-queue: {}", payload);

    throw new Exception("Error occured!");
}

讓我們創建一個測試併發送兩條消息:

@Test
public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception {
    int nb = 2;

    CountDownLatch latch = new CountDownLatch(nb);
    retryQueues.setObserver(() -> latch.countDown());

    for (int i = 1; i <= nb; i++) {
        rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i);
    }

    latch.await();
}

然後,讓我們啓動測試並檢查日誌:

2020-02-19 10:31:40.640  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:40.656  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:41.620  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:41.623  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:44.415  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:44.420  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:52.751  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:52.774 ERROR : java.lang.Exception: Error occured!
2020-02-19 10:31:52.829  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:52.841 ERROR : java.lang.Exception: Error occured!

再次觀察到,每次重試之間都有指數級的等待時間。然而,與其等待所有嘗試都完成,消息的處理是併發進行的。

雖然這種設置相當靈活,並且有助於緩解延遲問題,但存在一個常見陷阱。事實上,RabbitMQ 僅在消息到達隊列頭部時才移除過期的消息。因此,如果消息具有較長的過期時間,它將阻塞隊列中的所有其他消息。因此,應確保回覆隊列僅包含具有相同過期值的消息

4. 結論

如上所示,基於事件的系統可以通過採用指數退避策略來提高容錯性。雖然實施此類解決方案可能並不複雜,但重要的是要認識到,某些解決方案可能適合小型系統,但在高吞吐量生態系統中卻可能導致延遲問題。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.