知識庫 / Spring RSS 訂閱

Spring AMQP 錯誤處理

Spring
HongKong
10
01:06 PM · Dec 06 ,2025

1. 引言

異步消息傳遞是一種日益流行的鬆散耦合分佈式通信方式,適用於構建事件驅動架構。幸運的是,Spring Framework 提供 Spring AMQP 項目,使我們能夠構建基於 AMQP 的消息傳遞解決方案。

另一方面,在這樣的環境中處理錯誤可能是一項非易事。因此,在本教程中,我們將涵蓋處理錯誤的各種策略。

2. 環境搭建

為了本教程,我們將使用 RabbitMQ,它實現了 AMQP 標準。 此外,Spring AMQP 提供了 spring-rabbit 模塊,這使得集成變得非常簡單。

讓我們以獨立服務器的方式運行 RabbitMQ。 我們將在 Docker 容器中運行它,通過執行以下命令:

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management

有關詳細的配置和項目依賴關係設置,請參閲我們的 Spring AMQP 文章。

3. 故障場景

通常,基於消息的系統由於其分佈式特性,與其他單體或打包式應用程序相比,可能出現的錯誤類型更多。

我們可以指出一些異常類型:

  • 網絡-I/O相關 – 網絡連接和I/O操作的常規故障
  • 協議-基礎設施相關 – 通常代表消息基礎設施配置錯誤
  • 消息代理相關 – 警告客户端與AMQP消息代理之間配置不當的情況。例如,達到定義的限制或閾值、身份驗證或無效策略配置
  • 應用程序-消息相關 – 通常指示違反某些業務或應用程序規則的異常

當然,此故障列表並非詳盡無遺,但包含最常見類型的錯誤。

我們應該注意的是,Spring AMQP 會自動處理連接相關和低級問題,例如通過應用重試或重排隊列策略。 此外,大多數故障和故障都轉換為 AmqpException 或其子類。

在下一部分,我們將主要關注應用程序特定和高級錯誤,然後涵蓋全局錯誤處理策略。

4. 項目設置

現在,讓我們定義一個簡單的隊列和交換配置,以便開始:

public static final String QUEUE_MESSAGES = "baeldung-messages-queue";
public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange";

@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .build();
}
 
@Bean
DirectExchange messagesExchange() {
    return new DirectExchange(EXCHANGE_MESSAGES);
}
 
@Bean
Binding bindingMessages() {
    return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}

接下來,我們創建一個簡單的生產者:

public void sendMessage() {
    rabbitTemplate
      .convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES,
        SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:" + messageNumber++);
}
<p>最後,一個拋出異常的消費者:</p>
@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
public void receiveMessage(Message message) throws BusinessException {
    throw new BusinessException();
}

默認情況下,所有失敗的消息都會立即重新排隊到目標隊列的頭部,循環進行。

讓我們通過執行以下 Maven 命令來運行我們的示例應用程序:

mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.errorhandling.ErrorHandlingApp

現在我們應該看到類似的結果輸出:

WARN 22260 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
Caused by: com.baeldung.springamqp.errorhandling.errorhandler.BusinessException: null

因此,默認情況下,我們將會看到大量的此類消息出現在輸出中。

要更改此行為,我們有以下兩個選項:

  • default-requeue-rejected 選項設置為 false,在監聽器端配置為 spring.rabbitmq.listener.simple.default-requeue-rejected=false
  • 拋出 AmqpRejectAndDontRequeueException – 這對於將來可能不再有意義的消息非常有用,可以將其丟棄。

現在,讓我們探索如何以更智能的方式處理失敗的消息。

5. 死信隊列

死信隊列 (DLQ) 是一種用於存儲未送達或失敗消息的隊列。 DLQ 允許我們處理無效或錯誤消息,監控故障模式,並在系統發生異常時進行恢復。

更重要的是,這有助於防止隊列在不斷處理錯誤消息的情況下導致無限循環,從而降低系統性能。

總而言之,主要有兩項概念:死信交換 (DLX) 和死信隊列本身。事實上,direct、topicfanout。">DLX 是一種常見的交換類型,我們可以將其定義為:directtopicfanout

務必理解的是,生產者不瞭解任何隊列。它只瞭解交換,所有生產的消息都根據交換配置和路由鍵進行路由。

現在讓我們看看如何通過使用死信隊列來處理異常情況。

5.1. 基本配置

為了配置DLQ,我們需要在定義隊列時指定額外的參數:

@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .withArgument("x-dead-letter-exchange", "")
      .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
      .build();
}
 
@Bean
Queue deadLetterQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}

在上述示例中,我們使用了兩個額外的參數:x-dead-letter-exchangex-dead-letter-routing-key對於 x-dead-letter-exchange 選項,將其為空字符串的值指示消息代理使用默認交換機

第二個參數的重要性與為簡單消息設置路由鍵相差無幾。此選項會更改消息的初始路由鍵,以便 DLX 進一步路由該消息。

5.2. 失敗消息路由

當消息無法成功交付時,它會被路由到死信交換(Dead Letter Exchange)。但正如我們之前所提到的,DLX 只是一個普通的交換機。因此,如果失敗消息路由鍵與交換機不匹配,則不會被髮送到 DLQ。

Exchange: (AMQP default)
Routing Key: baeldung-messages-queue.dlq

因此,如果我們在示例中省略了 x-dead-letter-routing-key 參數,則失敗的消息將被無限重試循環阻塞。

此外,消息的原始元數據可在 x-death 標頭中找到:

x-death:
  count: 1
  exchange: baeldung-messages-exchange
  queue: baeldung-messages-queue 
  reason: rejected
  routing-keys: baeldung-messages-queue 
  time: 1571232954

上述信息可在 RabbitMQ 管理控制枱中查看,通常在本地運行在 15672 端口。

除了此配置之外,如果使用 Spring Cloud Stream,我們還可以通過利用配置屬性 republishToDlqautoBindDlq 來簡化配置過程。

5.3. 死信交換 (Dead Letter Exchange)

在上一節中,我們瞭解到當消息路由到死信交換時,路由鍵會被修改。但是這種行為並不總是理想的。我們可以通過自行配置 DLX 並使用 fanout 類型來定義它來改變這種行為:

public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
 
@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
      .build();
}
 
@Bean
FanoutExchange deadLetterExchange() {
    return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
}
 
@Bean
Queue deadLetterQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}
 
@Bean
Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}

我們現在定義了一個自定義的 fanout 類型的交換,因此消息將被髮送到所有定義的限流隊列。 此外,我們設置了 x-dead-letter-exchange 參數的值為我們的DLX的名稱。 同時,我們移除了 x-dead-letter-routing-key 參數。

現在,如果運行我們的示例,失敗的消息應被髮送到DLQ,但不會改變初始路由鍵:

Exchange: baeldung-messages-queue.dlx
Routing Key: baeldung-messages-queue

5.4. 處理死信隊列消息

當然,我們將它們移動到死信隊列的原因是為了在稍後的時間重新處理它們。

讓我們定義一個監聽器來監聽死信隊列:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessages(Message message) {
    log.info("Received failed message: {}", message.toString());
}

如果現在運行我們的代碼示例,我們應該看到日誌輸出:

WARN 11752 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
INFO 11752 --- [ntContainer#1-1] c.b.s.e.consumer.SimpleDLQAmqpContainer  : 
  Received failed message:

我們收到一條失敗的消息,接下來應該怎麼做? 這取決於具體的系統要求、異常的類型或消息的類型。

例如,我們可以簡單地將消息重新排隊到原始目的地:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRequeue(Message failedMessage) {
    log.info("Received failed message, requeueing: {}", failedMessage.toString());
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

但是這種異常處理邏輯與默認重試策略並不大不相同:

INFO 23476 --- [ntContainer#0-1] c.b.s.e.c.RoutingDLQAmqpContainer        :
  Received message: 
WARN 23476 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
INFO 23476 --- [ntContainer#1-1] c.b.s.e.c.RoutingDLQAmqpContainer        : 
  Received failed message, requeueing:

一個常見的策略可能需要對消息進行 n 次重試處理,然後拒絕該消息。我們通過利用消息頭來實現該策略。

public void processFailedMessagesRetryHeaders(Message failedMessage) {
    Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
      .getHeaders().get(HEADER_X_RETRIES_COUNT);
    if (retriesCnt == null) retriesCnt = 1;
    if (retriesCnt > MAX_RETRIES_COUNT) {
        log.info("Discarding message");
        return;
    }
    log.info("Retrying message for the {} time", retriesCnt);
    failedMessage.getMessageProperties()
      .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

首先,我們獲取 x-retries-count 請求頭的值,然後將該值與最大允許值進行比較。隨後,當計數器達到嘗試次數限制時,消息將被丟棄:

WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Retrying message for the 1 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Retrying message for the 2 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Discarding message

我們應該補充説明,我們還可以利用 x-message-ttl 頭部來設置一個時間,在此之後消息將被丟棄。這可能會對防止隊列無限增長有所幫助。

5.5. 停車區隊列

另一方面,如果無法直接丟棄消息,例如在銀行領域的交易,則情況不同。有時,消息可能需要人工處理,或者我們僅僅需要記錄超過 n 次失敗的消息。

對於此類情況,存在 停車區隊列的概念。我們可以將所有從DLQ(超過允許次數失敗的消息)轉發到停車區隊列進行進一步處理。

現在讓我們來實現這個想法:

public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot";
 
@Bean
FanoutExchange parkingLotExchange() {
    return new FanoutExchange(EXCHANGE_PARKING_LOT);
}
 
@Bean
Queue parkingLotQueue() {
    return QueueBuilder.durable(QUEUE_PARKING_LOT).build();
}
 
@Bean
Binding parkingLotBinding() {
    return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange());
}

第二,我們來重構監聽器邏輯,使其向停車區隊列發送消息:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryWithParkingLot(Message failedMessage) {
    Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
      .getHeaders().get(HEADER_X_RETRIES_COUNT);
    if (retriesCnt == null) retriesCnt = 1;
    if (retriesCnt > MAX_RETRIES_COUNT) {
        log.info("Sending message to the parking lot queue");
        rabbitTemplate.send(EXCHANGE_PARKING_LOT, 
          failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
        return;
    }
    log.info("Retrying message for the {} time", retriesCnt);
    failedMessage.getMessageProperties()
      .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

最終,我們還需要處理到達停車區隊列的消息:

@RabbitListener(queues = QUEUE_PARKING_LOT)
public void processParkingLotQueue(Message failedMessage) {
    log.info("Received message in parking lot queue");
    // Save to DB or send a notification.
}

現在我們可以將失敗的消息保存到數據庫,或者發送電子郵件通知。

為了測試這個邏輯,我們運行應用程序。

WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Retrying message for the 1 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Retrying message for the 2 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Sending message to the parking lot queue
INFO 14768 --- [ntContainer#2-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Received message in parking lot queue

正如我們從輸出結果中看到,經過多次失敗嘗試後,消息被髮送到停車場隊列。

6. 自定義錯誤處理

在上一節中,我們已經瞭解瞭如何使用專用隊列和交換機來處理失敗。 然而,在某些情況下,我們可能需要捕獲所有錯誤,例如用於記錄或將其持久化到數據庫。

6.1. 全局 ErrorHandler

此前,我們使用了默認的 SimpleRabbitListenerContainerFactory,該工廠默認使用 ConditionalRejectingErrorHandler。該處理程序捕獲不同的異常並將其轉換為 AmqpException 層次結構中的一個異常。

需要注意的是,如果需要處理連接錯誤,則需要實現 ApplicationListener 接口。

簡單來説,ConditionalRejectingErrorHandler 決定是否拒絕特定消息。當導致異常的消息被拒絕時,它不會被重新排隊。

讓我們定義一個自定義 ErrorHandler,它將僅重新排隊 BusinessException 異常:

public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable t) {
        if (!(t.getCause() instanceof BusinessException)) {
            throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
        }
    }
}

此外,由於我們是在監聽器方法中拋出異常,因此該異常會被封裝在一個 ListenerExecutionFailedException 中。因此,我們需要調用 getCause 方法來獲取原始異常。

6.2. <em >FatalExceptionStrategy</em >

在底層,此處理程序使用 `FatalExceptionStrategy> 來檢查是否應該將異常視為致命異常。如果是,則失敗的消息將被拒絕。

以下異常被視為致命異常:

  • `MessageConversionException`
  • `MessageConversionException`
  • `MethodArgumentNotValidException`
  • `MethodArgumentTypeMismatchException`
  • `NoSuchMethodException`
  • `ClassCastException`

與其實現 <em >ErrorHandler</em >> 接口,我們只需提供我們的FatalExceptionStrategy> 即可。

public class CustomFatalExceptionStrategy 
      extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
    @Override
    public boolean isFatal(Throwable t) {
        return !(t.getCause() instanceof BusinessException);
    }
}

最後,我們需要將自定義策略傳遞到 ConditionalRejectingErrorHandler 構造函數中:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
  ConnectionFactory connectionFactory,
  SimpleRabbitListenerContainerFactoryConfigurer configurer) {
      SimpleRabbitListenerContainerFactory factory = 
        new SimpleRabbitListenerContainerFactory();
      configurer.configure(factory, connectionFactory);
      factory.setErrorHandler(errorHandler());
      return factory;
}
 
@Bean
public ErrorHandler errorHandler() {
    return new ConditionalRejectingErrorHandler(customExceptionStrategy());
}
 
@Bean
FatalExceptionStrategy customExceptionStrategy() {
    return new CustomFatalExceptionStrategy();
}

7. 結論

在本教程中,我們討論了在使用 Spring AMQP 以及 RabbitMQ 時的不同錯誤處理方法。

每個系統都需要一套特定的錯誤處理策略。我們涵蓋了事件驅動架構中最常見的錯誤處理方法。此外,我們還了解到,我們可以將多種策略結合起來,構建一個更全面、更健壯的解決方案。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.