1. 概述
本教程將討論使用 Spring Kafka 庫的 <em @KafkaListener</em>> 註解,以批處理方式處理 Kafka 消息。<strong>Kafka 經紀人 (broker) 是一箇中間件,它幫助從源系統持久化消息。目標系統配置為定期輪詢 Kafka 主題/隊列,然後從中讀取消息。</strong>
此方法可以防止目標系統或服務出現故障時導致消息丟失。當目標服務恢復時,它們將繼續接受未處理的消息。因此,這種架構有助於提高消息的持久性,從而提高系統的容錯性。
2. 為什麼批量處理消息?
通常情況下,多個源頭或事件生產者會同時將消息發送到同一個 Kafka 隊列或主題。 這樣會導致大量消息在這些隊列或主題中積壓。 如果目標服務或消費者接收到這些大量消息,在單個會話中處理,可能會導致效率低下。
這可能會產生連鎖反應,導致瓶頸效應。 最終,這會影響所有依賴於這些消息的下游流程。 因此,消費者或消息監聽器應該限制它們在任何給定時間可以處理的消息數量。
為了以批處理模式運行,我們必須根據主題上發佈的數據量和應用程序的容量,配置合適的批處理大小。 此外,消費者應用程序應設計為批量處理消息,以滿足 SLA。
此外,如果沒有進行批處理,消費者必須定期輪詢 Kafka 主題以獲取單個消息。 這種方法會給計算資源帶來壓力。 因此,批量處理比每次輪詢單個消息更有效。
然而,
- 消息量較小
- 在對時間敏感的應用程序中,需要立即處理
- 計算和內存資源有限
- 嚴格的消息順序至關重要
3. 使用 @KafkaListener 註解進行批量處理
為了理解批量處理,我們首先將定義一個用例。然後我們首先使用基本消息處理方法來實現它,然後再使用批量處理方法。 這樣我們可以更好地理解處理消息的批量處理的重要性。
3.1. 用例描述
假設許多關鍵的IT基礎設施設備,如服務器和網絡設備,運行在公司的機房中。 多個監控工具跟蹤這些設備的 KPI(關鍵績效指標)。 由於運維團隊希望進行主動監控,他們期望獲得實時可操作的分析。 因此,存在嚴格的 SLA(服務級別協議)來將 KPI 傳輸到目標分析應用程序。
運維團隊配置監控工具將 KPI 以定期間隔發送到 Kafka 主題。 一個消費者應用程序讀取主題中的消息,然後將其推送到數據湖。 一個應用程序從數據湖讀取數據並生成實時分析。
讓我們實現一個配置了和沒有配置批量處理的消費者。 我們將分析這兩種實現的差異和結果。
3.2. 前提條件
在開始實施批量處理之前,務必理解 Spring Kafka 庫至關重要。 幸運的是,我們已經在文章《Spring 中 Apache Kafka 簡介》中討論過這個主題,這為我們提供了必要的動力。
為了學習目的,我們需要一個 Kafka 實例。 因此,為了快速上手,我們將使用嵌入式 Kafka。
最後,我們需要一個程序,用於在 Kafka 代理上創建一個事件隊列並定期向其中發佈示例消息。 基本上,我們將使用 Junit5 來理解這個概念。
3.3. 基本監聽器
讓我們從一個基本監聽器開始,該監聽器會逐條從 Kafka 代理讀取消息。我們將會在 KafkaKpiConsumerWithNoBatchConfig 配置類中定義 ConcurrentKafkaListenerContainerFactory bean。
public class KafkaKpiConsumerWithNoBatchConfig {
@Bean(name = "kafkaKpiListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaKpiBasicListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
}kafkaKpiBasicListenerContainerFactory() 方法返回 kafkaKpiListenerContainerFactory Bean。該 Bean 用於配置一個基本監聽器,它可以一次處理一條消息。
@Component
public class KpiConsumer {
private CountDownLatch latch = new CountDownLatch(1);
private ConsumerRecord<String, String> message;
@Autowired
private DataLakeService dataLakeService;
@KafkaListener(
id = "kpi-listener",
topics = "kpi_topic",
containerFactory = "kafkaKpiListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) throws InterruptedException {
this.message = record;
latch.await();
List<String> messages = new ArrayList<>();
messages.add(record.value());
dataLakeService.save(messages);
//reset the latch
latch = new CountDownLatch(1);
}
//General getter methods
}我們已將 @KafkaListener 註解應用於 listen() 方法。該註解有助於設置監聽器主題和監聽器容器工廠 Bean。在 KpiConsumer 類中的 CountDownLatch 對象用於控制 Junit5 測試中的消息處理。我們將使用它來理解整個概念。
CountDownLatch 的 await() 方法會暫停監聽器線程,當測試方法調用 countDown() 方法時,線程會恢復。如果沒有這個機制,理解和跟蹤消息將變得困難。最後,下游的 DataLakeService#save() 方法接收一條消息進行處理。
現在,讓我們來查看用於跟蹤 KpiListener 類處理的消息的方法:
@RepeatedTest(10)
void givenKafka_whenMessage1OnTopic_thenListenerConsumesMessages(RepetitionInfo repetitionInfo) {
String testNo = String.valueOf(repetitionInfo.getCurrentRepetition());
assertThat(kpiConsumer.getMessage().value()).isEqualTo("Test KPI Message-".concat(testNo));
kpiConsumer.getLatch().countDown();
}當監控工具將 KPI 消息發佈到 kpi_topic Kafka 主題時,監聽器接收到的消息會按照其到達順序排列。
每次方法執行時,它會跟蹤在 KpiListener#listen() 方法中接收到的消息。確認消息順序後,它會釋放鎖,監聽器完成處理。
3.4. 能夠進行批量處理的監聽器
現在,讓我們探討 Kafka 中的批量處理支持。我們首先將在 Spring 配置類中定義 ConcurrentKafkaListenerContainerFactory 豆:
@Bean(name="kafkaKpiListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaKpiBatchListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory();
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "20");
consumerFactory.updateConfigs(configProps);
factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setPollTimeout(3000);
factory.setBatchListener(true);
return factory;
}該方法與上一節中定義的 kafkaKpiBasicListenerContainerFactory() 方法類似。
我們通過調用 ConsumerFactory#setBatchListener() 方法啓用了批量處理。此外,我們使用 ConsumerConfig.MAX_POLL_RECORDS_CONFIG 屬性設置了每次輪詢的最大消息數量, ConsumerFactory#setConcurrency() 幫助設置併發消費者線程數,從而同時處理消息。 您可以在官方 Spring Kafka 站點上參考其他 配置。
此外,還有諸如 ConsumerConfig.DEFAULT_FETCH_MAX_BYTES 和 ConsumerConfig.DEFAULT_FETCH_MIN_BYTES 等配置屬性,可以幫助限制消息大小。
現在,讓我們來看看消費者:
@Component
public class KpiBatchConsumer {
private CountDownLatch latch = new CountDownLatch(1);
@Autowired
private DataLakeService dataLakeService;
private List<String> receivedMessages = new ArrayList<>();
@KafkaListener(
id = "kpi-batch-listener",
topics = "kpi_batch_topic",
batch = "true",
containerFactory = "kafkaKpiListenerContainerFactory")
public void listen(ConsumerRecords<String, String> records) throws InterruptedException {
records.forEach(record -> receivedMessages.add(record.value()));
latch.await();
dataLakeService.save(receivedMessages);
latch = new CountDownLatch(1);
}
// Standard getter methods
}KpiBatchConsumer 與先前定義的KpiConsumer 類類似,區別在於@KafkaListener 註解包含額外的batch 屬性。listen() 方法的參數類型為 ConsumerRecords,而不是 ConsumerRecord。我們可以遍歷 ConsumerRecords 對象以獲取批處理中的所有 ConsumerRecord 元素。
監聽器還可以處理同一順序接收到的批次消息。但是,跨 Kafka 主題分區中的批次消息保持順序非常複雜。
這裏 ConsumerRecord 代表發佈到 Kafka 主題的消息。最終,我們調用 DataLakeService#save() 方法處理更多消息。最後,CountDownLatch 類在前面所見的作用相同。
假設 100 條 KPI 消息被推送到 kpi_batch_topic Kafka 主題中。現在我們可以檢查監聽器在行動的情況:
@RepeatedTest(5)
void givenKafka_whenMessagesOnTopic_thenListenerConsumesMessages() {
int messageSize = kpiBatchConsumer.getReceivedMessages().size();
assertThat(messageSize % 20).isEqualTo(0);
kpiBatchConsumer.getLatch().countDown();
}與基本的監聽器不同,該監聽器在消息逐個接收的情況下,現在通過 <em KpiBatchConsumer#listen() 方法接收一個包含 <em 20 個 KPI 消息的批次。
4. 結論
本文討論了基本 Kafka 監聽器與啓用了批量處理的監聽器之間的區別。批量處理能夠同時處理多個消息,從而提高應用程序性能。然而,適當的批量處理量和消息大小限制對於控制應用程序的性能至關重要,因此必須經過仔細和嚴格的基準測試流程進行優化。