知識庫 / Spring RSS 訂閱

Kafka消費者中實現重試機制

Spring
HongKong
2
11:43 AM · Dec 06 ,2025

1. 概述

本教程將討論在 Kafka 中實施重試的重要性。我們將探索在 Spring Boot 上實施各種選項,並學習如何最大化 Kafka 消費者可靠性和彈性。

如果您正在首次配置 Spring 上運行的 Kafka,並且希望瞭解更多信息,可以從 Spring 和 Kafka 的入門文章開始。

2. 項目設置

讓我們創建一個新的 Spring Boot 項目,並添加 《spring-kafka》 依賴項:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>

現在我們來創建一個對象:

public class Greeting {

    private String msg;
    private String name;

    // standard constructors, getters and setters
}

3. Kafka 消費者

一個 Kafka 消費者 是一個從 Kafka 集羣讀取數據的客户端應用程序。它訂閲一個或多個主題,並消費發佈的消息。 生產者 將消息發送到主題,主題是一個記錄存儲和發佈時的類別名稱。主題被劃分為多個分區,以允許它們水平擴展。每個分區是一個不可變的記錄序列。

消費者 可以通過指定偏移量讀取特定分區的消息,偏移量是記錄在分區內的位置。 確認 (ack) 是消費者發送給 Kafka 代理的消息,用於指示已成功處理記錄。 確認將會在發送確認後更新消費者偏移量。

這確保了消息被消費,並且不會再次被髮送到當前的監聽器。

3.1. 確認模式 (Ack 模式)

確認模式決定了消息代理 (broker) 更新消費者的偏移量 (offset) 的時機。

有三種確認模式:

  1. 自動提交: 消費者在接收到消息後,立即向消息代理髮送確認。
  2. 後處理: 消費者只有在成功處理消息後,才會向消息代理髮送確認。
  3. 手動: 消費者在收到特定指令之前,不會向消息代理髮送確認。

確認模式決定了消費者如何處理從 Kafka 集羣讀取的消息。

讓我們創建一個新的 Bean,使用 ConcurrentKafkaListenerContainerFactory:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // Other configurations
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    factory.afterPropertiesSet();
    return factory;
}

可用的 ACK 模式有多種,我們可以配置它們:

  1. AckMode.RECORD: 在此後處理模式中,消費者為它處理的消息發送一個確認。
  2. AckMode.BATCH: 在此手動模式中,消費者為一批消息發送確認,而不是為每個消息發送確認。
  3. AckMode.COUNT: 在此手動模式中,消費者在已處理特定數量的消息後發送確認。
  4. AckMode.MANUAL: 在此手動模式中,消費者不為它處理的消息發送確認。
  5. AckMode.TIME: 在此手動模式中,消費者在經過一段時間後發送確認。

為了在 Kafka 中實現消息處理的重試邏輯,我們需要選擇一個 AckMode

這個 AckMode 應該允許消費者告知 Broker 哪些特定消息已成功處理。 這樣,Broker 就可以將未確認的消息重新發送給另一個消費者。

這可能是 RECORDMANUAL 模式,在阻塞重試的情況下。

4. 阻斷重試

阻斷重試允許消費者在初始嘗試由於臨時錯誤而失敗時,再次嘗試消費消息。 消費者會在等待一定時間,即“重試退避期”後,再次嘗試消費消息。

此外,消費者可以使用固定延遲或指數退避策略自定義重試退避期。 還可以設置最大重試次數,在放棄嘗試並標記消息為失敗之前。

4.1. 錯誤處理程序

讓我們定義兩個屬性添加到 Kafka 配置類中:

@Value(value = "${kafka.backoff.interval}")
private Long interval;

@Value(value = "${kafka.backoff.max_failure}")
private Long maxAttempts;

為了處理消費過程中拋出的所有異常,我們將定義一個新的錯誤處理程序:

@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
        // logic to execute when all the retry attemps are exhausted
    }, fixedBackOff);
    return errorHandler;
}

FixedBackOff 類接受兩個參數:

  • interval:在毫秒級別等待重試的時間
  • maxAttempts:在所有重試嘗試耗盡之前,操作重試的最大次數

在這種策略中,消費者在重試消息消費之前等待固定的時間。

DefaultErrorHandler 被初始化為一個 lambda 函數,該函數代表在所有重試嘗試耗盡時執行的邏輯。

該 lambda 函數接受兩個參數:

  • consumerRecord:表示導致錯誤的 Kafka 記錄
  • exception:表示拋出的異常

4.2. 容器工廠

讓我們為錯誤處理程序添加一個容器工廠 Bean:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // Other configurations
    factory.setCommonErrorHandler(errorHandler());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    factory.afterPropertiesSet();
    return factory;
}

如果存在重試策略,我們將設置 ack 模式為 AckMode.RECORD,以確保消費者在發生錯誤時會重新傳遞消息。

我們不應該將 ack 模式設置為 AckMode.BATCHAckMode.TIME,因為消費者會一次性確認多個消息。這是因為如果消息處理過程中發生錯誤,消費者不會將批次或時間窗口中的所有消息重新傳遞給自己。

因此,重試策略將無法正確處理錯誤。

4.3. 可重試異常和不可重試異常

我們可以指定哪些異常是可重試的,哪些是不可重試的。

讓我們修改 ErrorHandler

@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
        // logic to execute when all the retry attemps are exhausted
    }, fixedBackOff);
    errorHandler.addRetryableExceptions(SocketTimeoutException.class);
    errorHandler.addNotRetryableExceptions(NullPointerException.class);
    return errorHandler;
}

在這裏,我們指定了應觸發重試策略的異常類型。

當發生 SocketTimeoutException 時,會被認為是可重試的,而當發生 NullPointerException 時,則會被認為是不可重試的。

如果我們未設置任何可重試的異常類型,則將使用默認的可重試異常集:

4.4. 優點與缺點

在基於重試的機制中,當消息處理失敗時,消費者會阻塞,直到重試機制完成所有重試次數,或達到最大重試次數。

使用基於重試的機制存在一些優點和缺點。

基於重試的機制可以提高消息處理管道的可靠性,通過允許消費者在發生錯誤時重試消費消息,從而確保消息即使在發生瞬態錯誤的情況下也能成功處理。

基於重試的機制可以簡化消息處理邏輯的實現,通過抽象掉重試機制,使消費者能夠專注於消息處理,並將重試機制負責處理可能發生的任何錯誤。

最後,如果消費者需要等待重試機制完成所有重試次數,基於重試的機制可能會在消息處理管道中引入延遲,從而影響系統的整體性能。此外,基於重試的機制也可能導致消費者消耗更多的資源,例如 CPU 和內存,因為消費者在等待重試機制完成所有重試次數。這可能會影響系統的整體可擴展性。

5. 非阻塞重試

非阻塞重試允許消費者在不阻塞消息監聽器方法執行的情況下,異步地重試消息的消費。

5.1. @RetryableTopic

讓我們為 <em>KafkaListener</em></p> 添加@RetryableTopic

註解。

Let’s add the @RetryableTopic annotation to the KafkaListener:

@Component
@KafkaListener(id = "multiGroup", topics = "greeting")
public class MultiTypeKafkaListener {

    @KafkaHandler
    @RetryableTopic(
      backoff = @Backoff(value = 3000L), 
      attempts = "5", 
      autoCreateTopics = "false",
      include = SocketTimeoutException.class, exclude = NullPointerException.class)
    public void handleGreeting(Greeting greeting) {
        System.out.println("Greeting received: " + greeting);
    }
}

我們通過修改多個屬性來定製重試行為,例如:

  • backoff:此屬性配置了因失敗消息而產生的重試延遲,有助於管理瞬時錯誤。例如,backoff = @Backoff(value = 3000L) 確立了每次重試嘗試之前的固定延遲 3 秒。 此外,我們還可以配置指數退避策略,其中延遲隨着每次重試而增加(例如,value = 1000L, multiplier = 2.0 – 從 1 秒開始,每次都翻倍,直到最大延遲)。
  • attempts:此屬性指定消息應重試的最大次數,在放棄之前。
  • autoCreateTopics:此屬性指定是否應自動創建重試主題和 DLT(死信主題),如果它們尚不存在。
  • include:此屬性指定應觸發重試的異常。
  • exclude:此屬性指定不應觸發重試的異常。

當消息無法成功交付到其預期主題時,它將被自動發送到重試主題進行重試。

如果消息在達到最大嘗試次數後仍無法交付,則它將被髮送到 DLT 進行進一步處理。

5.2. 優點與缺點

實施非阻塞重試具有以下幾個優點:

  • 性能提升:非阻塞重試允許在不阻塞調用線程的情況下重試失敗的消息,從而可以提高應用程序的整體性能。
  • 可靠性增強:非阻塞重試可以幫助應用程序從故障中恢復並繼續處理消息,即使某些消息無法成功交付。

然而,在實施非阻塞重試時,也需要考慮一些潛在的缺點:

  • 增加複雜性:非阻塞重試會增加應用程序的複雜性,因為我們需要處理重試邏輯和DLT。
  • 消息重複交付的風險:如果消息在重試後成功交付,則如果原始交付和重試都成功,該消息可能會被多次交付。我們需要考慮這種風險,並在必要時實施措施以防止消息重複交付。
  • 消息順序:重試消息通過異步方式發送到重試主題,並且可能比未重試的消息晚於原始主題交付。

6. 結論

本文分析瞭如何在 Kafka 主題上實現重試邏輯,包括阻塞式和非阻塞式方法。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.