知識庫 / Spring / Spring Boot RSS 訂閱

Spring Boot 與 Kafka Streams

Spring Boot
HongKong
4
12:20 PM · Dec 06 ,2025

1. 引言

本文將介紹如何使用 Spring Boot 設置 Kafka Streams。Kafka Streams 是一個基於 Apache Kafka 的客户端庫。 它允許以聲明式的方式處理無限流的事件。

現實生活中的流式數據示例包括傳感器數據、股票市場事件流和系統日誌。對於本教程,我們將構建一個簡單的詞計數流式應用程序。首先,我們來概述 Kafka Streams,然後設置示例及其在 Spring Boot 中的測試。

2. 概述

Kafka Streams 提供了一種 Kafka 主題和關係數據庫表的二元關係。它允許我們執行諸如連接、分組、聚合和過濾一個或多個流式事件等操作。

Kafka Streams 中一個重要的概念是處理器拓撲(Processor Topology)。處理器拓撲是 Kafka 流處理操作在一個或多個事件流上的藍圖。 基本上,處理器拓撲可以被認為是有一個有向無環圖。 在這個圖中,節點被分為源節點、處理器節點和匯出節點,而邊代表事件流的流動。

拓撲的源頭接收來自 Kafka 的流式數據,將其向下傳遞到處理器節點,其中執行自定義操作,並通過匯出節點流向一個新的 Kafka 主題。 此外,在核心處理之外,使用檢查點定期保存流的狀態,以實現故障容錯和彈性。

3. 依賴項

我們將首先添加 <a href="https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka"><em>spring-kafka</em></a><a href="https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams"><em>kafka-streams</em></a> 依賴項到我們的 POM 文件中:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId
    <artifactId>kafka-streams</artifactId>
    <version>3.6.1</version>
</dependency> 

4. 示例

我們的示例應用程序從輸入 Kafka 主題中讀取流式事件。一旦讀取了記錄,它就會對其進行處理,以分割文本並計算單個單詞的數量。隨後,它會將更新後的單詞計數發送到 Kafka 輸出主題。除了輸出主題之外,我們還將創建一個簡單的 REST 服務,以通過 HTTP 端點暴露此計數。

總而言之,輸出主題將持續更新,其中包含從輸入事件中提取的單詞及其更新後的計數。

4.1. 配置

首先,讓我們在 Java 配置類中定義 Kafka 流處理配置:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "streams-app");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        return new KafkaStreamsConfiguration(props);
    }

    // other config
}

在這裏,我們使用了@EnableKafkaStreams註解來自動配置所需的組件。這種自動配置需要一個名為DEFAULT_STREAMS_CONFIG_BEAN_NAMEKafkaStreamsConfiguration Bean。因此,Spring Boot 使用此配置並創建 KafkaStreams 客户端來管理我們的應用程序生命週期

在我們的示例中,我們提供了應用程序 ID、bootstrap 服務器連接詳細信息和 SerDes(序列化/反序列化器)用於我們的配置。

4.2. 拓撲結構

現在我們已經完成了配置,接下來我們將構建應用程序的拓撲結構,用於統計輸入消息中的單詞數量:

@Component
public class WordCountProcessor {

    private static final Serde<String> STRING_SERDE = Serdes.String();

    @Autowired
    void buildPipeline(StreamsBuilder streamsBuilder) {
        KStream<String, String> messageStream = streamsBuilder
          .stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));

        KTable<String, Long> wordCounts = messageStream
          .mapValues((ValueMapper<String, String>) String::toLowerCase)
          .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
          .groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE))
          .count();

        wordCounts.toStream().to("output-topic");
    }
}

在這裏,我們定義了一個配置方法並使用 @Autowired 註解對其進行標註。Spring 會處理此註解並從容器中注入與 StreamsBuilder 參數匹配的 Bean。 另一種方法是,在配置類中創建 Bean 以生成拓撲結構。

StreamsBuilder 為我們提供了訪問所有 Kafka Streams API 的權限,並且它就像一個常規的 Kafka Streams 應用程序。 在我們的示例中,我們使用這個 高層 DSL 定義了應用程序的轉換

  • 使用指定的鍵和值 SerDes 創建一個 KStream
  • 通過轉換、分割、分組和計數數據來創建 KTable
  • 將結果材料化為輸出流。

本質上,Spring Boot 提供了一個對 Streams API 的薄封裝,同時管理我們的 KStream 實例的生命週期。 它創建並配置了拓撲結構所需的組件,並執行我們的 Streams 應用程序。 重要的是,這使我們能夠專注於核心業務邏輯,而 Spring 則管理生命週期。

4.3. REST 服務

在定義了我們的流水線(通過聲明式步驟)後,我們現在將創建 REST 控制器。它提供端點,用於將消息發佈到輸入主題,以及獲取指定單詞的計數。重要的是,應用程序從 Kafka Streams 狀態存儲中檢索數據,而不是從輸出主題中檢索

首先,讓我們修改早期的 KTable,並將聚合計數作為本地狀態存儲進行持久化。然後,該 KTable 可以由 REST 控制器查詢:

KTable<String, Long> wordCounts = textStream
  .mapValues((ValueMapper<String, String>) String::toLowerCase)
  .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
  .groupBy((key, value) -> value, Grouped.with(STRING_SERDE, STRING_SERDE))
  .count(Materialized.as("counts"));

之後,我們可以更新控制器以從這個 counts 狀態存儲中檢索計數值:

@GetMapping("/count/{word}")
public Long getWordCount(@PathVariable String word) {
    KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams.store(
      StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore())
    );
    return counts.get(word);
}

在這裏,<em>factoryBean</em> 是一個實例,它被連接到控制器。這提供了由該工廠 Bean 管理的 <em>KafkaStreams</em> 實例。因此,我們可以從中獲取我們之前創建的 <em>counts</em> 鍵/值狀態存儲,表示為 <em>KTable</em>。 在此時,我們可以利用它來獲取請求的單詞從本地狀態存儲中的當前計數。

5. 測試

測試是開發和驗證我們的應用程序拓撲結構的關鍵組成部分。Spring Kafka 測試庫和 Testcontainers 都能提供極佳的支持,用於在不同層面測試我們的應用程序。

5.1. 單元測試

首先,我們使用 TopologyTestDriver 為我們的拓撲構建一個單元測試。這是測試 Kafka Streams 應用程序的主要測試工具:

@Test
void givenInputMessages_whenProcessed_thenWordCountIsProduced() {
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    wordCountProcessor.buildPipeline(streamsBuilder);
    Topology topology = streamsBuilder.build();

    try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
        TestInputTopic<String, String> inputTopic = topologyTestDriver
          .createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
        
        TestOutputTopic<String, Long> outputTopic = topologyTestDriver
          .createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());

        inputTopic.pipeInput("key", "hello world");
        inputTopic.pipeInput("key2", "hello");

        assertThat(outputTopic.readKeyValuesToList())
          .containsExactly(
            KeyValue.pair("hello", 1L),
            KeyValue.pair("world", 1L),
            KeyValue.pair("hello", 2L)
          );
    }
}

在此,首先我們需要的是 拓撲結構 (Topology),它封裝了我們正在測試的業務邏輯,來源於 WordCountProcessor。 我們可以使用它與 TopologyTestDriver 共同創建測試中的輸入和輸出主題。 關鍵在於,這 消除了運行消息代理 (broker) 的需求,並能驗證管道 (pipeline) 的行為。 換句話説,它使我們能夠快速驗證管道行為,而無需使用真實的 Kafka 代理。

5.2. 端到端測試

最後,我們使用 Testcontainers 框架對我們的應用程序進行端到端測試。 這使用了一個正在運行的 Kafka 代理,並啓動了我們的應用程序以進行完整的測試。

@Testcontainers
@SpringBootTest(classes = KafkaStreamsApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
class KafkaStreamsApplicationLiveTest {

    @Container
    private static final KafkaContainer KAFKA = new KafkaContainer(
      DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    private final BlockingQueue<String> output = new LinkedBlockingQueue<>();

    // other test setup

    @Test
    void givenInputMessages_whenPostToEndpoint_thenWordCountsReceivedOnOutput() throws Exception {
        postMessage("test message");

        startOutputTopicConsumer();

        // assert correct counts on output topic
        assertThat(output.poll(2, MINUTES)).isEqualTo("test:1");
        assertThat(output.poll(2, MINUTES)).isEqualTo("message:1");

        // assert correct count from REST service
        assertThat(getCountFromRestServiceFor("test")).isEqualTo(1);
        assertThat(getCountFromRestServiceFor("message")).isEqualTo(1);
    }
}

在此,我們已向我們的 REST 控制器發送了一條 POST 請求,該控制器的作用是將其消息發送到 Kafka 輸入主題。作為設置的一部分,我們還啓動了一個 Kafka 消費者。該消費者異步監聽輸出 Kafka 主題,並使用接收到的詞頻統計更新 BlockingQueue

在測試執行期間,應用程序應處理輸入消息。隨後,我們可以使用 REST 服務驗證來自主題以及狀態存儲的預期輸出。

6. 結論

在本教程中,我們學習瞭如何使用 Kafka Streams 和 Spring Boot 創建一個簡單的基於事件驅動的應用,以處理消息。

在對核心流處理概念進行簡要概述後,我們探討了 Streams 拓撲的配置和創建。隨後,我們瞭解瞭如何將其與 Spring Boot 提供的 REST 功能集成。最後,我們介紹了有效測試和驗證拓撲和應用程序行為的一些方法。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.