1. 引言
本文將介紹如何使用 Spring Boot 設置 Kafka Streams。Kafka Streams 是一個基於 Apache Kafka 的客户端庫。 它允許以聲明式的方式處理無限流的事件。
現實生活中的流式數據示例包括傳感器數據、股票市場事件流和系統日誌。對於本教程,我們將構建一個簡單的詞計數流式應用程序。首先,我們來概述 Kafka Streams,然後設置示例及其在 Spring Boot 中的測試。
2. 概述
Kafka Streams 提供了一種 Kafka 主題和關係數據庫表的二元關係。它允許我們執行諸如連接、分組、聚合和過濾一個或多個流式事件等操作。
Kafka Streams 中一個重要的概念是處理器拓撲(Processor Topology)。處理器拓撲是 Kafka 流處理操作在一個或多個事件流上的藍圖。 基本上,處理器拓撲可以被認為是有一個有向無環圖。 在這個圖中,節點被分為源節點、處理器節點和匯出節點,而邊代表事件流的流動。
拓撲的源頭接收來自 Kafka 的流式數據,將其向下傳遞到處理器節點,其中執行自定義操作,並通過匯出節點流向一個新的 Kafka 主題。 此外,在核心處理之外,使用檢查點定期保存流的狀態,以實現故障容錯和彈性。
3. 依賴項
我們將首先添加 <a href="https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka"><em>spring-kafka</em></a> 和 <a href="https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams"><em>kafka-streams</em></a> 依賴項到我們的 POM 文件中:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId
<artifactId>kafka-streams</artifactId>
<version>3.6.1</version>
</dependency>
4. 示例
我們的示例應用程序從輸入 Kafka 主題中讀取流式事件。一旦讀取了記錄,它就會對其進行處理,以分割文本並計算單個單詞的數量。隨後,它會將更新後的單詞計數發送到 Kafka 輸出主題。除了輸出主題之外,我們還將創建一個簡單的 REST 服務,以通過 HTTP 端點暴露此計數。
總而言之,輸出主題將持續更新,其中包含從輸入事件中提取的單詞及其更新後的計數。
4.1. 配置
首先,讓我們在 Java 配置類中定義 Kafka 流處理配置:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration kStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(APPLICATION_ID_CONFIG, "streams-app");
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return new KafkaStreamsConfiguration(props);
}
// other config
}在這裏,我們使用了@EnableKafkaStreams註解來自動配置所需的組件。這種自動配置需要一個名為DEFAULT_STREAMS_CONFIG_BEAN_NAME的KafkaStreamsConfiguration Bean。因此,Spring Boot 使用此配置並創建 KafkaStreams 客户端來管理我們的應用程序生命週期。
在我們的示例中,我們提供了應用程序 ID、bootstrap 服務器連接詳細信息和 SerDes(序列化/反序列化器)用於我們的配置。
4.2. 拓撲結構
現在我們已經完成了配置,接下來我們將構建應用程序的拓撲結構,用於統計輸入消息中的單詞數量:
@Component
public class WordCountProcessor {
private static final Serde<String> STRING_SERDE = Serdes.String();
@Autowired
void buildPipeline(StreamsBuilder streamsBuilder) {
KStream<String, String> messageStream = streamsBuilder
.stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));
KTable<String, Long> wordCounts = messageStream
.mapValues((ValueMapper<String, String>) String::toLowerCase)
.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
.groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE))
.count();
wordCounts.toStream().to("output-topic");
}
}在這裏,我們定義了一個配置方法並使用 @Autowired 註解對其進行標註。Spring 會處理此註解並從容器中注入與 StreamsBuilder 參數匹配的 Bean。 另一種方法是,在配置類中創建 Bean 以生成拓撲結構。
StreamsBuilder 為我們提供了訪問所有 Kafka Streams API 的權限,並且它就像一個常規的 Kafka Streams 應用程序。 在我們的示例中,我們使用這個 高層 DSL 定義了應用程序的轉換:
- 使用指定的鍵和值 SerDes 創建一個 KStream。
- 通過轉換、分割、分組和計數數據來創建 KTable。
- 將結果材料化為輸出流。
本質上,Spring Boot 提供了一個對 Streams API 的薄封裝,同時管理我們的 KStream 實例的生命週期。 它創建並配置了拓撲結構所需的組件,並執行我們的 Streams 應用程序。 重要的是,這使我們能夠專注於核心業務邏輯,而 Spring 則管理生命週期。
4.3. REST 服務
在定義了我們的流水線(通過聲明式步驟)後,我們現在將創建 REST 控制器。它提供端點,用於將消息發佈到輸入主題,以及獲取指定單詞的計數。重要的是,應用程序從 Kafka Streams 狀態存儲中檢索數據,而不是從輸出主題中檢索。
首先,讓我們修改早期的 KTable,並將聚合計數作為本地狀態存儲進行持久化。然後,該 KTable 可以由 REST 控制器查詢:
KTable<String, Long> wordCounts = textStream
.mapValues((ValueMapper<String, String>) String::toLowerCase)
.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
.groupBy((key, value) -> value, Grouped.with(STRING_SERDE, STRING_SERDE))
.count(Materialized.as("counts"));之後,我們可以更新控制器以從這個 counts 狀態存儲中檢索計數值:
@GetMapping("/count/{word}")
public Long getWordCount(@PathVariable String word) {
KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams.store(
StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore())
);
return counts.get(word);
}在這裏,<em>factoryBean</em> 是一個實例,它被連接到控制器。這提供了由該工廠 Bean 管理的 <em>KafkaStreams</em> 實例。因此,我們可以從中獲取我們之前創建的 <em>counts</em> 鍵/值狀態存儲,表示為 <em>KTable</em>。 在此時,我們可以利用它來獲取請求的單詞從本地狀態存儲中的當前計數。
5. 測試
測試是開發和驗證我們的應用程序拓撲結構的關鍵組成部分。Spring Kafka 測試庫和 Testcontainers 都能提供極佳的支持,用於在不同層面測試我們的應用程序。
5.1. 單元測試
首先,我們使用 TopologyTestDriver 為我們的拓撲構建一個單元測試。這是測試 Kafka Streams 應用程序的主要測試工具:
@Test
void givenInputMessages_whenProcessed_thenWordCountIsProduced() {
StreamsBuilder streamsBuilder = new StreamsBuilder();
wordCountProcessor.buildPipeline(streamsBuilder);
Topology topology = streamsBuilder.build();
try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
TestInputTopic<String, String> inputTopic = topologyTestDriver
.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
TestOutputTopic<String, Long> outputTopic = topologyTestDriver
.createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());
inputTopic.pipeInput("key", "hello world");
inputTopic.pipeInput("key2", "hello");
assertThat(outputTopic.readKeyValuesToList())
.containsExactly(
KeyValue.pair("hello", 1L),
KeyValue.pair("world", 1L),
KeyValue.pair("hello", 2L)
);
}
}在此,首先我們需要的是 拓撲結構 (Topology),它封裝了我們正在測試的業務邏輯,來源於 WordCountProcessor。 我們可以使用它與 TopologyTestDriver 共同創建測試中的輸入和輸出主題。 關鍵在於,這 消除了運行消息代理 (broker) 的需求,並能驗證管道 (pipeline) 的行為。 換句話説,它使我們能夠快速驗證管道行為,而無需使用真實的 Kafka 代理。
5.2. 端到端測試
最後,我們使用 Testcontainers 框架對我們的應用程序進行端到端測試。 這使用了一個正在運行的 Kafka 代理,並啓動了我們的應用程序以進行完整的測試。
@Testcontainers
@SpringBootTest(classes = KafkaStreamsApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
class KafkaStreamsApplicationLiveTest {
@Container
private static final KafkaContainer KAFKA = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
private final BlockingQueue<String> output = new LinkedBlockingQueue<>();
// other test setup
@Test
void givenInputMessages_whenPostToEndpoint_thenWordCountsReceivedOnOutput() throws Exception {
postMessage("test message");
startOutputTopicConsumer();
// assert correct counts on output topic
assertThat(output.poll(2, MINUTES)).isEqualTo("test:1");
assertThat(output.poll(2, MINUTES)).isEqualTo("message:1");
// assert correct count from REST service
assertThat(getCountFromRestServiceFor("test")).isEqualTo(1);
assertThat(getCountFromRestServiceFor("message")).isEqualTo(1);
}
}在此,我們已向我們的 REST 控制器發送了一條 POST 請求,該控制器的作用是將其消息發送到 Kafka 輸入主題。作為設置的一部分,我們還啓動了一個 Kafka 消費者。該消費者異步監聽輸出 Kafka 主題,並使用接收到的詞頻統計更新 BlockingQueue。
在測試執行期間,應用程序應處理輸入消息。隨後,我們可以使用 REST 服務驗證來自主題以及狀態存儲的預期輸出。
6. 結論
在本教程中,我們學習瞭如何使用 Kafka Streams 和 Spring Boot 創建一個簡單的基於事件驅動的應用,以處理消息。
在對核心流處理概念進行簡要概述後,我們探討了 Streams 拓撲的配置和創建。隨後,我們瞭解瞭如何將其與 Spring Boot 提供的 REST 功能集成。最後,我們介紹了有效測試和驗證拓撲和應用程序行為的一些方法。