1. 概述
本教程將討論在 Kafka 中實施重試的重要性。我們將探索在 Spring Boot 上實施各種選項,並學習如何最大化 Kafka 消費者可靠性和彈性。
如果您正在首次配置 Spring 上運行的 Kafka,並且希望瞭解更多信息,可以從 Spring 和 Kafka 的入門文章開始。
2. 項目設置
讓我們創建一個新的 Spring Boot 項目,並添加 《spring-kafka》 依賴項:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.2</version>
</dependency>現在我們來創建一個對象:
public class Greeting {
private String msg;
private String name;
// standard constructors, getters and setters
}3. Kafka 消費者
一個 Kafka 消費者 是一個從 Kafka 集羣讀取數據的客户端應用程序。它訂閲一個或多個主題,並消費發佈的消息。 生產者 將消息發送到主題,主題是一個記錄存儲和發佈時的類別名稱。主題被劃分為多個分區,以允許它們水平擴展。每個分區是一個不可變的記錄序列。
消費者 可以通過指定偏移量讀取特定分區的消息,偏移量是記錄在分區內的位置。 確認 (ack) 是消費者發送給 Kafka 代理的消息,用於指示已成功處理記錄。 確認將會在發送確認後更新消費者偏移量。
這確保了消息被消費,並且不會再次被髮送到當前的監聽器。
3.1. 確認模式 (Ack 模式)
確認模式決定了消息代理 (broker) 更新消費者的偏移量 (offset) 的時機。
有三種確認模式:
- 自動提交: 消費者在接收到消息後,立即向消息代理髮送確認。
- 後處理: 消費者只有在成功處理消息後,才會向消息代理髮送確認。
- 手動: 消費者在收到特定指令之前,不會向消息代理髮送確認。
確認模式決定了消費者如何處理從 Kafka 集羣讀取的消息。
讓我們創建一個新的 Bean,使用 ConcurrentKafkaListenerContainerFactory:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
// Other configurations
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
factory.afterPropertiesSet();
return factory;
}可用的 ACK 模式有多種,我們可以配置它們:
- AckMode.RECORD: 在此後處理模式中,消費者為它處理的消息發送一個確認。
- AckMode.BATCH: 在此手動模式中,消費者為一批消息發送確認,而不是為每個消息發送確認。
- AckMode.COUNT: 在此手動模式中,消費者在已處理特定數量的消息後發送確認。
- AckMode.MANUAL: 在此手動模式中,消費者不為它處理的消息發送確認。
- AckMode.TIME: 在此手動模式中,消費者在經過一段時間後發送確認。
為了在 Kafka 中實現消息處理的重試邏輯,我們需要選擇一個 AckMode。
這個 AckMode 應該允許消費者告知 Broker 哪些特定消息已成功處理。 這樣,Broker 就可以將未確認的消息重新發送給另一個消費者。
這可能是 RECORD 或 MANUAL 模式,在阻塞重試的情況下。
4. 阻斷重試
阻斷重試允許消費者在初始嘗試由於臨時錯誤而失敗時,再次嘗試消費消息。 消費者會在等待一定時間,即“重試退避期”後,再次嘗試消費消息。
此外,消費者可以使用固定延遲或指數退避策略自定義重試退避期。 還可以設置最大重試次數,在放棄嘗試並標記消息為失敗之前。
4.1. 錯誤處理程序
讓我們定義兩個屬性添加到 Kafka 配置類中:
@Value(value = "${kafka.backoff.interval}")
private Long interval;
@Value(value = "${kafka.backoff.max_failure}")
private Long maxAttempts;為了處理消費過程中拋出的所有異常,我們將定義一個新的錯誤處理程序:
@Bean
public DefaultErrorHandler errorHandler() {
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
// logic to execute when all the retry attemps are exhausted
}, fixedBackOff);
return errorHandler;
}FixedBackOff 類接受兩個參數:
interval:在毫秒級別等待重試的時間maxAttempts:在所有重試嘗試耗盡之前,操作重試的最大次數
在這種策略中,消費者在重試消息消費之前等待固定的時間。
DefaultErrorHandler 被初始化為一個 lambda 函數,該函數代表在所有重試嘗試耗盡時執行的邏輯。
該 lambda 函數接受兩個參數:
consumerRecord:表示導致錯誤的 Kafka 記錄exception:表示拋出的異常
4.2. 容器工廠
讓我們為錯誤處理程序添加一個容器工廠 Bean:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
// Other configurations
factory.setCommonErrorHandler(errorHandler());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
factory.afterPropertiesSet();
return factory;
}如果存在重試策略,我們將設置 ack 模式為 AckMode.RECORD,以確保消費者在發生錯誤時會重新傳遞消息。
我們不應該將 ack 模式設置為 AckMode.BATCH 或 AckMode.TIME,因為消費者會一次性確認多個消息。這是因為如果消息處理過程中發生錯誤,消費者不會將批次或時間窗口中的所有消息重新傳遞給自己。
因此,重試策略將無法正確處理錯誤。
4.3. 可重試異常和不可重試異常
我們可以指定哪些異常是可重試的,哪些是不可重試的。
讓我們修改 ErrorHandler:
@Bean
public DefaultErrorHandler errorHandler() {
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
// logic to execute when all the retry attemps are exhausted
}, fixedBackOff);
errorHandler.addRetryableExceptions(SocketTimeoutException.class);
errorHandler.addNotRetryableExceptions(NullPointerException.class);
return errorHandler;
}在這裏,我們指定了應觸發重試策略的異常類型。
當發生 SocketTimeoutException 時,會被認為是可重試的,而當發生 NullPointerException 時,則會被認為是不可重試的。
如果我們未設置任何可重試的異常類型,則將使用默認的可重試異常集:
4.4. 優點與缺點
在基於重試的機制中,當消息處理失敗時,消費者會阻塞,直到重試機制完成所有重試次數,或達到最大重試次數。
使用基於重試的機制存在一些優點和缺點。
基於重試的機制可以提高消息處理管道的可靠性,通過允許消費者在發生錯誤時重試消費消息,從而確保消息即使在發生瞬態錯誤的情況下也能成功處理。
基於重試的機制可以簡化消息處理邏輯的實現,通過抽象掉重試機制,使消費者能夠專注於消息處理,並將重試機制負責處理可能發生的任何錯誤。
最後,如果消費者需要等待重試機制完成所有重試次數,基於重試的機制可能會在消息處理管道中引入延遲,從而影響系統的整體性能。此外,基於重試的機制也可能導致消費者消耗更多的資源,例如 CPU 和內存,因為消費者在等待重試機制完成所有重試次數。這可能會影響系統的整體可擴展性。
5. 非阻塞重試
非阻塞重試允許消費者在不阻塞消息監聽器方法執行的情況下,異步地重試消息的消費。
5.1. @RetryableTopic
讓我們為 <em>KafkaListener</em></p> 添加@RetryableTopic
Let’s add the @RetryableTopic annotation to the KafkaListener:
@Component
@KafkaListener(id = "multiGroup", topics = "greeting")
public class MultiTypeKafkaListener {
@KafkaHandler
@RetryableTopic(
backoff = @Backoff(value = 3000L),
attempts = "5",
autoCreateTopics = "false",
include = SocketTimeoutException.class, exclude = NullPointerException.class)
public void handleGreeting(Greeting greeting) {
System.out.println("Greeting received: " + greeting);
}
}我們通過修改多個屬性來定製重試行為,例如:
- backoff:此屬性配置了因失敗消息而產生的重試延遲,有助於管理瞬時錯誤。例如,backoff = @Backoff(value = 3000L) 確立了每次重試嘗試之前的固定延遲 3 秒。 此外,我們還可以配置指數退避策略,其中延遲隨着每次重試而增加(例如,value = 1000L, multiplier = 2.0 – 從 1 秒開始,每次都翻倍,直到最大延遲)。
- attempts:此屬性指定消息應重試的最大次數,在放棄之前。
- autoCreateTopics:此屬性指定是否應自動創建重試主題和 DLT(死信主題),如果它們尚不存在。
- include:此屬性指定應觸發重試的異常。
- exclude:此屬性指定不應觸發重試的異常。
當消息無法成功交付到其預期主題時,它將被自動發送到重試主題進行重試。
如果消息在達到最大嘗試次數後仍無法交付,則它將被髮送到 DLT 進行進一步處理。
5.2. 優點與缺點
實施非阻塞重試具有以下幾個優點:
- 性能提升:非阻塞重試允許在不阻塞調用線程的情況下重試失敗的消息,從而可以提高應用程序的整體性能。
- 可靠性增強:非阻塞重試可以幫助應用程序從故障中恢復並繼續處理消息,即使某些消息無法成功交付。
然而,在實施非阻塞重試時,也需要考慮一些潛在的缺點:
- 增加複雜性:非阻塞重試會增加應用程序的複雜性,因為我們需要處理重試邏輯和DLT。
- 消息重複交付的風險:如果消息在重試後成功交付,則如果原始交付和重試都成功,該消息可能會被多次交付。我們需要考慮這種風險,並在必要時實施措施以防止消息重複交付。
- 消息順序:重試消息通過異步方式發送到重試主題,並且可能比未重試的消息晚於原始主題交付。
6. 結論
本文分析瞭如何在 Kafka 主題上實現重試邏輯,包括阻塞式和非阻塞式方法。