1. 概述
Apache Kafka 是一個分佈式且容錯性強的流式處理系統。
在本教程中,我們將涵蓋 Spring 對 Kafka 的支持以及它在 Kafka Java 客户端 API 上的抽象層次。
Spring Kafka 將標準的 Spring 模板編程模型與 KafkaTemplate 和通過 @KafkaListener 註解實現的基於消息的 POJO 結合起來。
2. Spring 中 Apache Kafka 的監聽器容器是什麼?
Spring 框架通過依賴注入(DI)實現控制反轉(IoC)原則。對象直接定義其依賴項,而 IoC 容器在創建 Bean 時注入它們。Bean 是由 Spring IoC 容器實例化、組裝和管理的對象。換句話説,容器是一個負責實例化、配置和組裝 Bean 的應用程序上下文。
在 Apache Kafka 的上下文中,監聽器容器是指包含 Kafka 消息消費者的容器。Spring for Apache Kafka 使用容器工廠來創建消息監聽器容器。我們使用 將 Bean 方法標記為消息監聽器,用於監聽器容器。因此,容器工廠為帶有 的 Bean 方法創建監聽器容器。Spring for Apache Kafka 框架提供接口和類來管理監聽器容器的實例化:
- org.springframework.kafka.listener.MessageListenerContainer – 用於實例化 Kafka 消息監聽器容器的抽象
- org.springframework.kafka.listener.KafkaMessageListenerContainer – 用於創建單線程消息監聽器容器的實現類
- org.springframework.kafka.listener.ConcurrentMessageListenerContainer – 用於根據併發性創建 하나 이상의 KafkaMessageListenerContainers 的實現類
- org.springframework.kafka.config.KafkaListenerContainerFactory – MessageListenerContainers 的抽象工廠
- org.springframework.kafka.config.ConcurrentKafkaListenerFactory – 用於創建 ConcurrentMessageListenerContainer 的實現類
3. 安裝與配置
要下載和安裝 Kafka,請參考官方指南 這裏。
還需要在我們的 pom.xml 中 添加 spring-kafka 依賴:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.3.1</version>
</dependency>並配置 spring-boot-maven-plugin,如下所示:
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>3.3.2</version>
<configuration>
<mainClass>com.baeldung.spring.kafka.KafkaApplication</mainClass>
</configuration>
</plugin>我們可以通過 這裏 找到此 Artifact 的最新版本。
我們的示例應用程序將是一個 Spring Boot 應用程序。
我們假設服務器已使用默認配置啓動,並且我們未更改任何服務器端口。
4. 配置主題
此前,我們使用命令行工具創建 Kafka “主題”:
$ bin/kafka-topics.sh --create
--zookeeper localhost:2181
--replication-factor 1 --partitions 1
--topic mytopic但隨着 AdminClient 在 Kafka 的引入,我們現在可以編程方式創建主題。
我們需要添加 KafkaAdmin Spring Bean,它將自動為所有 NewTopic 類型 Bean 添加主題。
@Configuration
public class KafkaTopicConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return new NewTopic("baeldung", 1, (short) 1);
}
}5. 生產消息
要創建消息,首先需要配置一個 ProducerFactory。 這設置了創建 Kafka Producer 實例的策略。
然後,我們需要一個 KafkaTemplate,它封裝了一個 Producer 實例,並提供方便的方法來將消息發送到 Kafka 主題。
Producer 實例是線程安全的。 因此,在應用程序上下文中使用單個實例將提供更高的性能。 結果,KafkaTemplate 實例也是線程安全的,並且推薦使用單個實例。
5.1. 生產者配置
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}5.2. 發佈消息
我們可以使用 KafkaTemplate 類發送消息:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String msg) {
kafkaTemplate.send(topicName, msg);
}發送 API 返回一個 CompletableFuture 對象。如果想要阻塞發送線程並獲取已發送消息的結果,我們可以調用 CompletableFuture 對象的 get API。線程將等待結果,但會降低生產者的性能。
Kafka 是一個快速的流處理平台。因此,最好異步處理結果,以便後續消息不等待先前消息的結果。
我們可以通過回調來實現:
public void sendMessage(String message) {
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
future.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Sent message=[" + message +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
} else {
System.out.println("Unable to send message=[" +
message + "] due to : " + ex.getMessage());
}
});
}6. 消費消息
6.1. 消費者配置
為了消費消息,我們需要配置一個 <a href="http://docs.spring.io/autorepo/docs/spring-kafka-dist/1.1.3.RELEASE/api/org/springframework/kafka/core/ConsumerFactory.html">ConsumerFactory</a> 和一個 <a href="http://docs.spring.io/autorepo/docs/spring-kafka-dist/1.1.3.RELEASE/api/org/springframework/kafka/config/KafkaListenerContainerFactory.html">KafkaListenerContainerFactory</a>。一旦這些 Bean 在 Spring Bean 工廠中可用,基於 POJO 的消費者可以使用 <a href="http://docs.spring.io/autorepo/docs/spring-kafka-dist/1.1.3.RELEASE/api/org/springframework/kafka/annotation/KafkaListener.html">@KafkaListener</a> 註解進行配置。
為了啓用 spring-managed Bean 上 <em>@KafkaListener</em> 註解的檢測,需要在配置類上添加 <em>@EnableKafka</em> 註解。
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
groupId);
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}6.2. 消費消息
讓我們使用 @KafkaListener 標註配置一個基於POJO的監聽器,也稱為消費者:
@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group foo: " + message);
}我們可以為每個主題實現多個監聽器,每個監聽器具有不同的組 ID。 此外,一個消費者可以監聽來自各種主題的消息:
@KafkaListener(topics = "topic1, topic2", groupId = "foo")Spring 還支持通過 @Header 註解在監聽器中檢索一個或多個消息頭:
@KafkaListener(topics = "topicName")
public void listenWithHeaders(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message"
+ "from partition: " + partition);
}6.3. 從特定分區消費消息
請注意,我們創建了僅包含一個分區的 topic baeldung。
對於包含多個分區的 topic,@KafkaListener 可以顯式地訂閲 topic 的特定分區,並指定初始偏移量:
@KafkaListener(
topicPartitions = @TopicPartition(topic = "topicName",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "3", initialOffset = "0")}),
containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message"
+ "from partition: " + partition);
}由於該監聽器中 initialOffset 已設置為 0,因此每次初始化監聽器時,所有先前消費的來自分區 0 和 3 的消息都將被重新消費。
如果我們不需要設置偏移量,則可以使用 @TopicPartition 註解的 partitions 屬性,僅設置分區而不設置偏移量:
@KafkaListener(topicPartitions
= @TopicPartition(topic = "topicName", partitions = { "0", "1" }))6.4. 為監聽器添加消息過濾器
我們可以通過添加自定義過濾器來配置監聽器,使其消費特定消息內容。 這可以通過將 RecordFilterStrategy 設置到 KafkaListenerContainerFactory 中來實現:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(
record -> record.value().contains("World"));
return factory;
}我們可以配置一個監聽器來使用這個容器工廠:
@KafkaListener(
topics = "topicName",
containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
System.out.println("Received Message in filtered listener: " + message);
}此監聽器會丟棄所有匹配過濾器的消息。
7. 自定義消息轉換器
目前,我們只討論了字符串作為消息的發送和接收。但是,我們也可以發送和接收自定義的 Java 對象。 這需要配置 ProducerFactory 中的適當序列化器,以及 ConsumerFactory 中的反序列器。
讓我們來看一個簡單的 Bean 類,我們將它作為消息發送:
public class Greeting {
private String msg;
private String name;
// standard getters, setters and constructor
}7.1. 生成自定義消息
在示例中,我們將使用 JsonSerializer。
讓我們來看一下 ProducerFactory 和 KafkaTemplate 的代碼:
@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
// ...
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}我們可以使用這個新的 KafkaTemplate 來發送 Greeting 消息:
kafkaTemplate.send(topicName, new Greeting("Hello", "World"));7.2. 消費自定義消息
同樣,讓我們修改 ConsumerFactory 和 KafkaListenerContainerFactory 以正確地反序列化 Greeting 消息:
@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
// ...
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(Greeting.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting>
greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
}Spring Kafka 的 JSON 序列化器和反序列器使用 Jackson 庫,Jackson 庫本身也是 Spring Kafka 項目的選型 Maven 依賴。
因此,我們將其添加到我們的 pom.xml 中:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.18.2</version>
</dependency>建議不要使用 Jackson 的最新版本,而是應使用添加到 spring-kafka 的 pom.xml 中的版本。
最後,我們需要編寫一個監聽器來消費 Greeting 消息:
@KafkaListener(
topics = "topicName",
containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
// process greeting message
}8. 多方法監聽器
現在我們將看看如何配置應用程序以向相同的訂閲主題發送不同類型的對象,然後消費它們。
首先,我們將添加一個新的類,Farewell:
public class Farewell {
private String message;
private Integer remainingMinutes;
// standard getters, setters and constructor
}我們需要進行一些額外的配置,才能將 Greeting 和 Farewell 對象都發送到同一個主題。
8.1. 在生產者中設置映射類型
在生產者中,我們需要配置 JSON 類型映射:
configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");這樣一來,庫將使用相應的類名填充類型頭信息。
結果,ProducerFactory 和 KafkaTemplate 如下所示:
@Bean
public ProducerFactory<String, Object> multiTypeProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(JsonSerializer.TYPE_MAPPINGS,
"greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
return new KafkaTemplate<>(multiTypeProducerFactory());
}我們可以使用這個 KafkaTemplate 將 Greeting, Farewell, 或任何 Object 發送到主題:
multiTypeKafkaTemplate.send(multiTypeTopicName, new Greeting("Greetings", "World!"));
multiTypeKafkaTemplate.send(multiTypeTopicName, new Farewell("Farewell", 25));
multiTypeKafkaTemplate.send(multiTypeTopicName, "Simple string message");8.2. 使用自定義 MessageConverter 在消費者中
為了能夠反序列化傳入的消息,我們需要為 Consumer 提供一個自定義的 MessageConverter。
在幕後,MessageConverter 依賴於 Jackson2JavaTypeMapper。 默認情況下,映射器會推斷接收對象的類型;相反,我們需要明確地告訴它使用類型頭來確定反序列化的目標類:
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);我們還需要提供反向映射信息。 查找 “greeting” 在類型頭中標識一個 Greeting 對象,而 “farewell” 則對應一個 Farewell 對象:
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("greeting", Greeting.class);
mappings.put("farewell", Farewell.class);
typeMapper.setIdClassMapping(mappings);最後,我們需要配置映射器信任的包。 確保它包含目標類的位置:
typeMapper.addTrustedPackages("com.baeldung.spring.kafka");因此,以下是 MessageConverter 的最終定義:
@Bean
public RecordMessageConverter multiTypeConverter() {
StringJsonMessageConverter converter = new StringJsonMessageConverter();
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
typeMapper.addTrustedPackages("com.baeldung.spring.kafka");
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("greeting", Greeting.class);
mappings.put("farewell", Farewell.class);
typeMapper.setIdClassMapping(mappings);
converter.setTypeMapper(typeMapper);
return converter;
}現在我們需要配置我們的 ConcurrentKafkaListenerContainerFactory 使用 MessageConverter 並且使用一個基本的 ConsumerFactory。
@Bean
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
HashMap<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(multiTypeConsumerFactory());
factory.setRecordMessageConverter(multiTypeConverter());
return factory;
}8.3. 使用 <em @KafkaHandler 在監聽器中
最後,在我們的 <em @KafkaHandler 中,我們將創建一個處理程序方法來檢索所有可能的對象。每個處理程序都需要使用 <em @KafkaHandler 註解。
請注意,我們還可以為無法綁定到 Greeting 或 Farewell 類中的對象而定義一個默認處理程序:
@Component
@KafkaListener(id = "multiGroup", topics = "multitype")
public class MultiTypeKafkaListener {
@KafkaHandler
public void handleGreeting(Greeting greeting) {
System.out.println("Greeting received: " + greeting);
}
@KafkaHandler
public void handleF(Farewell farewell) {
System.out.println("Farewell received: " + farewell);
}
@KafkaHandler(isDefault = true)
public void unknown(Object object) {
System.out.println("Unkown type received: " + object);
}
}9. 結論
本文介紹了 Spring 支持 Apache Kafka 的基本知識。我們簡要地探討了用於發送和接收消息的類。
在運行代碼之前,請確保 Kafka 服務器已啓動,並且主題已手動創建。