知識庫 / Spring / Spring Boot RSS 訂閱

動態管理 Spring Boot 中的 Kafka 監聽器

Spring Boot
HongKong
4
11:17 AM · Dec 06 ,2025

1. 概述

在當今的事件驅動架構中,有效地管理數據流至關重要。 Apache Kafka 是一個流行的選擇,但將其集成到我們的應用程序中仍然存在挑戰,儘管有諸如 Spring Kafka 這樣的輔助框架。 最大的挑戰之一是實現適當的動態監聽器管理,這為我們應用程序不斷變化的工作負載和維護提供了靈活性和控制。

在本教程中,我們將學習如何在 Spring Boot 應用程序中動態啓動和停止 Kafka 監聽器

2. 先決條件

首先,我們需要將 《spring-kafka》 依賴項導入到我們的項目中:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>

3. 配置 Kafka 消費者

生產者是向 Kafka 主題發佈(寫入)事件的應用程序。

在本文教程中,我們將使用單元測試來模擬生產者向 Kafka 主題發送事件。消費者,負責訂閲主題並處理事件流,在我們的應用程序中由一個監聽器表示。該監聽器配置為處理來自 Kafka 的傳入消息。

讓我們通過 <em >KafkaConsumerConfig</em> 類配置我們的 Kafka 消費者,該類包含 Kafka 代理的地址、消費者組 ID 和用於鍵值對的解序列化器:

@Bean
public DefaultKafkaConsumerFactory<String, UserEvent> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.baeldung.spring.kafka.start.stop.consumer");
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(UserEvent.class));
}

4. 配置 Kafka 監聽器

在 Spring Kafka 中,通過為方法註解 ,創建一個監聽器,該監聽器能夠從指定的主題中消費消息。為了定義它,讓我們聲明一個 類:

@KafkaListener(id = Constants.LISTENER_ID, topics = Constants.MULTI_PARTITION_TOPIC, groupId = "test-group",
  containerFactory = "kafkaListenerContainerFactory", autoStartup = "false")
public void processUserEvent(UserEvent userEvent) {
    logger.info("Received UserEvent: " + userEvent.getUserEventId());
    userEventStore.addUserEvent(userEvent);
}

監聽器等待來自主題 multi_partition_topic的消息,並使用 processUserEvent()方法進行處理。 我們將 groupId設置為 test-group,確保消費者成為更廣泛組的一部分,從而實現多實例分佈式處理。

我們使用 id屬性為每個監聽器分配一個唯一的標識符。 在本例中,分配的監聽器 ID 為 listener-id-1

autoStartup 屬性允許我們控制監聽器在應用程序初始化時是否啓動。 在我們的示例中,我們將它設置為 false,這意味着監聽器不會在應用程序啓動時自動啓動。 此配置為我們提供了手動啓動監聽器的靈活性。

這種手動啓動可以由各種事件觸發,例如新用户註冊、應用程序內的特定條件(如達到一定數據量閾值)或管理界面手動啓動監聽器等行政操作。 例如,如果在線零售應用程序檢測到閃購期間的流量激增,它可以自動啓動額外的監聽器來處理增加的負載,從而優化性能。

UserEventStore 用作監聽器接收到的事件的臨時存儲:

@Component
public class UserEventStore {

    private final List<UserEvent> userEvents = new ArrayList<>();

    public void addUserEvent(UserEvent userEvent) {
        userEvents.add(userEvent);
    }

    public List<UserEvent> getUserEvents() {
        return userEvents;
    }

    public void clearUserEvents() {
        this.userEvents.clear();
    }
}

5. 動態控制監聽器

讓我們創建一個 KafkaListenerControlService,它使用 KafkaListenerEndpointRegistry 動態啓動和停止 Kafka 監聽器:

@Service
public class KafkaListenerControlService {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    public void startListener(String listenerId) {
        MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
        if (listenerContainer != null && !listenerContainer.isRunning()) {
            listenerContainer.start();
        }
    }

    public void stopListener(String listenerId) {
        MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId);
        if (listenerContainer != null && listenerContainer.isRunning()) {
            listenerContainer.stop();
        }
    }
}

KafkaListenerControlService 可以精確地根據其分配的 ID 管理單個監聽器實例。startListener()stopListener() 方法都使用 listenerId 作為參數,允許我們根據需要從主題中啓動和停止消息消費

KafkaListenerEndpointRegistry 作為 Spring 應用上下文內所有 Kafka 監聽器端點的中心存儲庫。它監控這些監聽器容器,從而允許我們對它們的狀態進行編程控制,無論是在啓動、停止還是暫停之間。這種功能對於需要實時調整消息處理活動的應用至關重要,而無需重新啓動整個應用程序。

6. 驗證動態監聽器控制

接下來,讓我們重點關注在我們的 Spring Boot 應用程序中, Kafka 監聽器動態啓動和停止功能的測試。首先,啓動監聽器:

kafkaListenerControlService.startListener(Constants.LISTENER_ID);

我們隨後驗證監聽器是否被激活,通過發送並處理一個測試事件來完成:

UserEvent startUserEventTest = new UserEvent(UUID.randomUUID().toString()); 
producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, startUserEventTest)); 
await().untilAsserted(() -> assertEquals(1, this.userEventStore.getUserEvents().size())); 
this.userEventStore.clearUserEvents();

現在監聽器已激活,我們將發送一批包含十條消息進行處理。發送四條消息後,我們將停止監聽器,然後將剩餘的消息發送到 Kafka 主題:

for (long count = 1; count <= 10; count++) {
    UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
    Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Constants.MULTI_PARTITION_TOPIC, userEvent));
    RecordMetadata metadata = future.get();
    if (count == 4) {
        await().untilAsserted(() -> assertEquals(4, this.userEventStore.getUserEvents().size()));
        this.kafkaListenerControlService.stopListener(Constants.LISTENER_ID);
        this.userEventStore.clearUserEvents();
    }
    logger.info("User Event ID: " + userEvent.getUserEventId() + ", Partition : " + metadata.partition());
}

在啓動監聽器之前,我們首先會驗證事件存儲中是否存在任何消息:

assertEquals(0, this.userEventStore.getUserEvents().size());
kafkaListenerControlService.startListener(Constants.LISTENER_ID);
await().untilAsserted(() -> assertEquals(6, this.userEventStore.getUserEvents().size()));
kafkaListenerControlService.stopListener(Constants.LISTENER_ID);

當監聽器重新啓動後,它會處理我們在監聽器停止後發送到 Kafka 主題的剩餘六條消息。該測試展示了 Spring Boot 應用程序動態管理 Kafka 監聽器的能力。

7. 用例

動態監聽器管理在需要高適應性場景中表現出色。例如,在高峯負載期間,我們可以動態啓動額外的監聽器以提高吞吐量並縮短處理時間。相反,在維護或低流量期間,我們可以停止監聽器以節約資源。這種靈活性也有助於在功能標誌的控制下部署新功能,從而能夠在不影響整體系統的情況下實現無縫的實時調整。

讓我們考慮一個電子商務平台推出新推薦引擎的場景,該引擎旨在通過根據瀏覽歷史和購買模式向用户推薦產品來增強用户體驗。為了在全面發佈之前驗證該功能的有效性,我們決定使用功能標誌對其進行部署。

激活該功能標誌將啓動 Kafka 監聽器。隨着終端用户與平台交互,由 Kafka 監聽器驅動的推薦引擎將處理傳入的用户活動數據流,以生成個性化的產品推薦。

當我們停用該功能標誌時,我們將停止 Kafka 監聽器,平台將返回到其現有的推薦引擎。這確保了無論新引擎處於測試階段,用户體驗都將無縫,從而保證了用户體驗的連續性。

在功能處於活動狀態期間,我們積極收集數據,監控性能指標並對推薦引擎進行調整。我們重複此功能測試過程,直到我們達到所需的結果。

通過這種迭代過程,動態監聽器管理證明了其作為寶貴工具的價值。它使新功能能夠無縫引入。

8. 結論

在本文中,我們探討了 Spring Boot 與 Kafka 的集成,重點關注動態管理 Kafka 監聽器。 這種能力對於管理波動性工作負載和進行常規維護至關重要。 此外,它還允許進行特徵開關、根據流量模式擴展服務,以及管理具有特定觸發器的事件驅動工作流程。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.