知識庫 / Spring RSS 訂閱

Spring Kafka:在同一主題上配置多個監聽器

Spring
HongKong
4
11:41 AM · Dec 06 ,2025

1. 概述

本文將通過一個實際示例,學習如何為同一個 Kafka 主題配置多個監聽器。

如果您是第一次在 Spring 中配置 Kafka,那麼從我們的 Apache Kafka 與 Spring 介紹開始是一個不錯的選擇。

2. 項目設置

讓我們構建一個圖書消費者服務,該服務監聽圖書館內新進圖書,並將其用於各種目的,例如全文內容搜索、價格索引或用户通知。

首先,我們創建一個 Spring Boot 服務,並使用 spring-kafka 依賴項:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

此外,我們還需要定義監聽器將消費的 BookEvent

public class BookEvent {

    private String title;
    private String description;
    private Double price;

    //  standard constructors, getters and setters
}

3. 生產消息

Kafka Producers 是生態系統中的關鍵組成部分,因為 Producers 將消息寫入 Kafka 集羣。考慮到這一點,首先我們需要定義一個 Producers,該 Producers 將消息寫入稍後由消費者應用程序消費的主題。

按照我們的示例,讓我們編寫一個簡單的 Kafka Producers 函數,將新的 BookEvent 對象寫入“books”主題。

private static final String TOPIC = "books";

@Autowired
private KafkaTemplate<String, BookEvent> bookEventKafkaTemplate;

public void sentBookEvent(BookEvent book){
    bookEventKafkaTemplate.send(TOPIC, UUID.randomUUID().toString(), book);
}

4. 從同一個 Kafka 主題中消費數據

Kafka 消費者 是訂閲 Kafka 集羣中一個或多個主題的客户端應用程序。稍後我們將探討如何在同一個主題上設置多個監聽器。

4.1. 消費者配置

首先,要配置消費者,我們需要定義監聽器所需的 `ConcurrentKafkaListenerContainerFactory</i/> Bean。

現在,讓我們定義用於消費 `BookEvent</em/> 對象的容器工廠:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, BookEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, BookEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    public ConsumerFactory<String, BookEvent> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        
        // required consumer factory properties

        return new DefaultKafkaConsumerFactory<>(props);
    }
}

接下來,我們將探討收聽傳入消息的不同策略。

4.2. 使用相同消費者組的多個監聽器

一種將多個監聽器添加到相同消費者組的策略是增加該消費者組內的併發級別。因此,我們可以簡單地在 @KafkaListener 註解中指定此設置。

為了理解其工作原理,讓我們為我們的庫定義一個通知監聽器:

@KafkaListener(topics = "books", groupId = "book-notification-consumer", concurrency = "2")
public void bookNotificationConsumer(BookEvent event) {
    logger.info("Books event received for notification => {}", event);
}

接下來,我們將查看發佈三條消息後的控制枱輸出。此外,讓我們理解為什麼消息只會被消費一次:

Books event received for notification => BookEvent(title=book 1, description=description 1, price=1.0)
Books event received for notification => BookEvent(title=book 2, description=description 2, price=2.0)
Books event received for notification => BookEvent(title=book 3, description=description 3, price=3.0)

這主要是因為,在內部,對於每個併發級別,Kafka都會實例化一個新的監聽器,位於同一個消費者組內

此外,同一個消費者組內所有監聽器實例的作用域是相互分發消息,以加快任務完成速度並提高吞吐量。

4.3. 使用不同消費組的多個監聽器

如果需要多次消費相同的消息併為每個監聽器應用不同的處理邏輯,則必須配置 @KafkaListener 以具有不同的組 ID。 這樣做將 Kafka 為每個監聽器創建專用消費組,並將所有已發佈的消息推送到每個監聽器上

為了觀察這種策略的實際效果,讓我們定義一個用於全文本搜索索引的監聽器和一個用於價格索引的監聽器。 兩個監聽器都將監聽相同的“books”主題:

@KafkaListener(topics = "books", groupId = "books-content-search")
public void bookContentSearchConsumer(BookEvent event) {
    logger.info("Books event received for full-text search indexing => {}", event);
}

@KafkaListener(topics = "books", groupId = "books-price-index")
public void bookPriceIndexerConsumer(BookEvent event) {
    logger.info("Books event received for price indexing => {}", event);
}

現在,讓我們運行上面的代碼並分析輸出結果:

Books event received for price indexing => BookEvent(title=book 1, description=description 1, price=1.0)
Books event received for full-text search indexing => BookEvent(title=book 1, description=description 1, price=1.0)
Books event received for full-text search indexing => BookEvent(title=book 2, description=description 2, price=2.0)
Books event received for price indexing => BookEvent(title=book 2, description=description 2, price=2.0)
Books event received for full-text search indexing => BookEvent(title=book 3, description=description 3, price=3.0)
Books event received for price indexing => BookEvent(title=book 3, description=description 3, price=3.0)

如我們所見,所有監聽器都會接收到每個 BookEvent ,並且可以為所有傳入的消息應用獨立的處理邏輯。

5. 如何選擇不同的監聽器策略

正如我們已經學到的,可以通過配置 concurrency 屬性(值為大於 1 的值)或定義多個 @KafkaListener 方法來設置多個監聽器,這些方法監聽同一個 Kafka 主題並具有不同的消費者 ID。

選擇哪種策略取決於我們想要實現的目標。只要我們解決性能問題,通過更快地處理消息來提高吞吐量,那麼正確的策略就是增加同一消費者組內的監聽器數量。

但是,為了多次處理同一個消息以滿足不同的要求,我們應該定義具有不同消費者組的專用監聽器,這些監聽器監聽同一個主題。

作為一般原則,我們應該為每個需要滿足的要求使用一個消費者組,如果需要使該監聽器更快,則可以在同一消費者組內增加監聽器數量。

6. 結論

在本文中,我們學習瞭如何使用 Spring Kafka 庫為同一個主題配置多個監聽器,並通過一個圖書庫的實際示例進行了説明。我們首先從 Producer 和 Consumer 的配置開始,並繼續探討了為同一個主題添加多個監聽器的不同方法。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.