1. 介紹
在 Apache Kafka 中消息傳輸過程中,客户端和服務器會就使用通用的句法格式達成一致。Apache Kafka 默認提供轉換器(如 <em >String</em > 和 <em >Long</em >>),但也支持自定義序列化器,以適應特定用例。在本教程中,我們將學習如何實現它們。
2. Apache Kafka 中的序列化器
序列化是將對象轉換為字節的過程。反序列化是相反的過程——將字節流轉換為對象。 簡單來説,它 將內容轉換為可讀和可解釋的信息。
正如我們所提到的,Apache Kafka 提供了用於多種基本類型的默認序列化器,並且允許我們實現自定義序列化器:
圖表展示了通過網絡向 Kafka 主題發送消息的過程。 在這個過程中,自定義序列化器在生產者將消息發送到主題之前,將對象轉換為字節。 同樣,它還展示了反序列器如何將字節轉換為對象,以便消費者可以正確地處理它。
2.1. 自定義序列化器
Apache Kafka 提供了一些基本類型的預構建序列化器和反序列化器:
但是,它還提供了實現自定義 (de)序列化器的能力。為了序列化我們自己的對象,我們將實現 Serializer 接口。 同樣,為了創建一個自定義的反序列化器,我們將實現 Deserializer 接口。
這兩個接口提供了三個方法供重寫:
configure: 用於實現配置細節serialize/deserialize: 這些方法包含我們自定義序列化和反序列化的實際實現。close: 使用此方法關閉 Kafka 會話。
3. 在 Apache Kafka 中實現自定義序列化器
Apache Kafka 提供自定義序列化器的能力。 它可以實現特定的轉換器,不僅適用於消息值,還適用於鍵。
3.1. 依賴項
為了實現這些示例,我們將簡單地將 Kafka Consumer API 依賴項添加到我們的 pom.xml 中:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
3.2. 自定義序列化器
首先,我們將使用 Lombok 來指定通過 Kafka 發送的自定義對象:
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDto {
private String message;
private String version;
}接下來,我們將實現 Kafka 提供的用於生產者發送消息的 Serializer 接口:
public class CustomSerializer implements Serializer<MessageDto> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, MessageDto data) {
try {
if (data == null){
System.out.println("Null received at serializing");
return null;
}
System.out.println("Serializing...");
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error when serializing MessageDto to byte[]");
}
}
@Override
public void close() {
}
}
我們將覆蓋接口中的serialize方法。因此,在我們的實現中,我們將使用Jackson ObjectMapper對自定義對象進行轉換。然後,我們將返回字節流,以便將消息正確地發送到網絡。
3.3. 自定義反序列器
正如上面所述,我們將為消費者實現 <a href="https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html"><em >Deserializer</em></a > 接口:
@Slf4j
public class CustomDeserializer implements Deserializer<MessageDto> {
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public MessageDto deserialize(String topic, byte[] data) {
try {
if (data == null){
System.out.println("Null received at deserializing");
return null;
}
System.out.println("Deserializing...");
return objectMapper.readValue(new String(data, "UTF-8"), MessageDto.class);
} catch (Exception e) {
throw new SerializationException("Error when deserializing byte[] to MessageDto");
}
}
@Override
public void close() {
}
}
如前所述,我們將覆蓋接口中的 deserialize方法。因此,我們將使用相同的 Jackson ObjectMapper將字節流轉換為自定義對象。
3.4. 消費示例消息
讓我們來看一個使用自定義序列化器和反序列器發送和接收示例消息的實際示例。
首先,我們將創建並配置 Kafka Producer:
private static KafkaProducer<String, MessageDto> createKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomSerializer");
return new KafkaProducer(props);
}我們將配置值序列化器屬性為我們的自定義類,並將鍵序列化器配置為默認的 StringSerializer。
其次,我們將創建 Kafka Consumer:
private static KafkaConsumer<String, MessageDto> createKafkaConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomDeserializer");
return new KafkaConsumer<>(props);
}除了使用我們自定義類的鍵值解序列化器之外,必須包含組 ID。除此之外,我們將 auto offset reset 配置設置為 earliest,以確保生產者在消費者開始消費之前已發送所有消息。
在創建生產者和消費者客户端後,現在發送一個示例消息:
MessageDto msgProd = MessageDto.builder().message("test").version("1.0").build();
KafkaProducer<String, MessageDto> producer = createKafkaProducer();
producer.send(new ProducerRecord<String, MessageDto>(TOPIC, "1", msgProd));
System.out.println("Message sent " + msgProd);
producer.close();我們也可以通過訂閲主題來接收與消費者相關的消息:
AtomicReference<MessageDto> msgCons = new AtomicReference<>();
KafkaConsumer<String, MessageDto> consumer = createKafkaConsumer();
consumer.subscribe(Arrays.asList(TOPIC));
ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
msgCons.set(record.value());
System.out.println("Message received " + record.value());
});
consumer.close();結果在控制枱中是:
Serializing...
Message sent MessageDto(message=test, version=1.0)
Deserializing...
Message received MessageDto(message=test, version=1.0)4. 結論
在本教程中,我們展示了生產者如何使用 Apache Kafka 中的序列化器通過網絡發送消息。同樣,我們也展示了消費者如何使用反序列器來解釋接收到的消息。
此外,我們學習了可用的默認序列化器,以及最重要的是,實現自定義序列化器和反序列器的能力。