1. 概述
本文將介紹如何處理“未知魔術字節”錯誤和其他在通過 Spring Kafka 消費 Avro 消息時出現的反序列化問題。我們將探索 ErrorHandlingDeserializer 並瞭解它如何處理“毒藥藥丸”消息。
最後,我們將配置 DefaultErrorHandler 以及 DeadLetterPublishingRecoverer,以便將問題記錄路由到 DLQ 主題,確保消費者繼續處理,而不會卡住。
2. 毒藥信封與魔法字節
有時,我們接收到的消息由於格式問題或意外內容而無法處理,這些消息被稱為“毒藥信封”消息。與其反覆嘗試處理它們,不如以優雅的方式處理這些消息。
在 Kafka 中,“毒藥信封”消息通常發生在消費者期望接收 Avro 編碼的數據,但卻接收到了不同的內容。例如,使用 StringSerializer 的生產者可能會將純文本消息發送到期望接收 Avro 編碼數據的 topic,從而導致消費者端 AvroDeserializer 失敗:
由此,我們得到帶有 “Unknown magic byte” 消息的解複本錯誤。 “魔法字節” 是 Avro 編碼消息開頭的標記,有助於解複本正確識別和處理它。 如果消息沒有使用 Avro 序列化器進行序列化,並且它沒有以該字節開頭,則解複本會拋出錯誤,指示格式不匹配。
3. 模擬問題
為了重現此問題,我們將使用一個簡單的 Spring Boot 應用程序,該應用程序從 Avro 格式的消息中消費 Kafka 主題中的消息。 我們的應用程序將使用 spring-kafka、avro 和 kafka-avro-deserialzier 依賴項:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.9.1</version>
</dependency>此外,我們的服務使用一個 @KafkaListener來監聽來自 “baeldung.article.published”主題的所有消息。為了演示目的,我們將存儲所有傳入消息的文章名稱,並將其保存在內存中的一個 List 中:
@Component
class AvroMagicByteApp {
// logger
List<String> blog = new ArrayList<>();
@KafkaListener(topics = "baeldung.article.published")
public void listen(Article article) {
LOG.info("a new article was published: {}", article);
blog.add(article.getTitle());
}
}接下來,我們將添加我們的 Kafka 專屬應用程序屬性。由於我們使用了 Spring Boot 內置的 Testcontainers 支持,因此可以省略 bootstrap-servers 屬性,因為它將被自動注入。我們還將將 schema.registry.url 設置為 “mock://test”,因為在測試期間我們不會使用真實的 Schema Registry:
spring:
kafka:
# bootstrap-servers <-- it'll be injected in test by Spring and Testcontainers
consumer:
group-id: test-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: mock://test
specific.avro.reader: true這就是全部,現在我們可以使用 Testcontainers 啓動一個帶有 Kafka 代理的 Docker 容器並測試我們簡單應用程序的正常流程。
但是,如果我們向我們的測試主題發佈一個“毒藥消息”,我們將會遇到 異常。為了生產不符合規範的消息,我們將利用一個使用 StringSerializer 的 KafkaTemplate 實例,並向主題中發佈一個佔位符 String:
@SpringBootTest
class AvroMagicByteLiveTest {
@Container
@ServiceConnection
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("apache/kafka:4.0.0"));
@Test
void whenSendingMalformedMessage_thenSendToDLQ() throws Exception {
stringKafkaTemplate()
.send("baeldung.article.published", "not a valid avro message!")
.get();
Thread.sleep(10_000L);
// manually verify that the poison-pill message is handled correctly
}
private static KafkaTemplate<Object, Object> stringKafkaTemplate() { /* ... */ }
}此外,我們還臨時添加了一個 <em Thread.sleep() ,以便我們觀察應用程序日誌。正如預期的那樣,我們的服務未能反序列化消息,並且遇到了 <em “Unknown magic byte!” 錯誤:
ERROR o.s.k.l.KafkaMessageListenerContainer - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; __please consider configuring an 'ErrorHandlingDeserializer'__ in the value and/or key deserializer
at org.springframework.kafka...DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:192)
[...]
Caused by: org.apache.kafka...RecordDeserializationException:
Error deserializing VALUE for partition baeldung.article.published-0 at offset 1.
__If needed, please seek past the record to continue consumption.__
at org.apache.kafka.clients...CompletedFetch.newRecordDeserializationException(CompletedFetch.java:346)
[...]
Caused by: org.apache.kafka...errors.SerializationException: __Unknown magic byte!__
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:649)
[...]此外,我們還會反覆遇到這個問題,因為我們沒有正確處理它,也沒有承認這條消息。 簡而言之,消費者會卡在偏移量上,持續嘗試處理格式錯誤的條消息。
4. 錯誤處理反序列器
幸運的是,錯誤日誌非常詳細,甚至還提供了可能的修復建議:
This error handler cannot process 'SerializationException's directly;
please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer.在 Spring Kafka 中,ErrorHandlingDeserializer 是一個包裝器,用於捕獲反序列化錯誤並允許我們的應用程序以優雅的方式處理它們,從而防止消費者崩潰。 它通過將實際的反序列化委託給另一個反序列化器,例如 JsonDeserializer 或 KafkaAvroDeserializer,並在該過程中捕獲任何拋出的異常。
要配置它,我們將 value-deserializer 屬性更新為 ErrorHandlingDeserializer。 此外,我們將在 spring.kafka.consumer.spring.deserializer.value.delegate.class 中指定原始反序列器:
spring.kafka:
consumer:
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
使用這種配置, “未知魔術字節!” 異常只會在日誌中出現一次。 此時,應用程序優雅地處理了被毒藥藥丸的消息,並且在不再次嘗試反序列化它的情況下繼續運行。
5. 將消息發佈到 DLQ
我們已經配置了 ErrorHandlingDeserializer 用於消息有效負載,並正確處理了毒藥藥丸場景。但是,如果僅僅捕獲異常並繼續,就很難檢查或恢復這些無效消息。為了解決這個問題,我們應該考慮將它們發送到 DLQ 主題。
死信隊列 (DLQ) 是用於存儲在經過一個或多個重試後仍無法成功處理的消息的特殊主題。 讓我們在我們的應用程序中啓用此行為:
@Configuration
class DlqConfig {
@Bean
DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer dlqPublishingRecoverer) {
return new DefaultErrorHandler(dlqPublishingRecoverer);
}
@Bean
DeadLetterPublishingRecoverer dlqPublishingRecoverer(KafkaTemplate<byte[], byte[]> bytesKafkaTemplate) {
return new DeadLetterPublishingRecoverer(bytesKafkaTemplate);
}
@Bean("bytesKafkaTemplate")
KafkaTemplate<?, ?> bytesTemplate(ProducerFactory<?, ?> kafkaProducerFactory) {
return new KafkaTemplate<>(kafkaProducerFactory,
Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()));
}
}如我們所見,我們定義了一個 DefaultErrorHandler Bean,它確定了哪些可重試的異常。在本例中,序列化異常被認為是不可重試的,因此它們將被直接發送到 DLQ。 在創建錯誤處理程序時,我們將通過構造函數注入一個 DeadLetterPublishingRecoverer 實例。
另一方面,dlqPublishingRecoverer 使用 KafkaTemplate 和 ByteArraySerializer 將失敗的消息轉發到 DLQ 主題,因為毒藥消息的確切格式未知。 此外,它還負責解析 DLQ 主題名稱; 默認情況下,它會將 “-dlt” 添加到原始主題名稱中:
@Test
void whenSendingMalformedMessage_thenSendToDLQ() throws Exception {
stringKafkaTemplate()
.send("baeldung.article.published", "not a valid avro message!")
.get();
var dlqRecord = listenForOneMessage("baeldung.article.published-dlt", ofSeconds(5L));
assertThat(dlqRecord.value())
.isEqualTo("not a valid avro message!");
}
private static ConsumerRecord<?, ?> listenForOneMessage(String topic, Duration timeout) {
return KafkaTestUtils.getOneRecord(
kafka.getBootstrapServers(), "test-group-id", topic, 0, false, true, timeout);
}如我們所見,配置 ErrorHandlingDeserializer 使得我們能夠優雅地處理格式錯誤的報文。隨後,定製的 DefaultErrorHandler 和 DeadLetterPublishingRecoverer Bean 允許我們將這些故障報文推送到 DLQ 主題。
6. 結論
在本教程中,我們介紹瞭如何解決“未知魔術字節”錯誤以及處理帶有 Spring Kafka 的 Avro 消息時可能出現的其他反序列化問題。我們探討了 ErrorHandlingDeserializer 如何幫助消費者避免因問題消息而阻塞。
最後,我們回顧了死信隊列的概念,並配置了 Spring Kafka bean 以將“毒藥藥丸”消息路由到專用 DLQ 主題,從而確保平穩且不間斷的進程。