知識庫 / Spring / Spring Boot RSS 訂閱

如何捕獲 Spring-Kafka 反序列化錯誤?

Spring Boot
HongKong
4
11:19 AM · Dec 06 ,2025

1. 概述

本文將介紹 Spring-Kafka 中的 <em >RecordDeserializationException</em>。之後,我們將創建一個自定義錯誤處理程序來捕獲該異常,並跳過無效消息,從而允許消費者繼續處理下一個事件。

本文依賴於 Spring Boot 的 Kafka 模塊,它提供了方便的工具,用於與消息代理進行交互。要更深入地瞭解 Kafka 的內部原理,我們可以回顧一下平台的根本概念。

2. 創建 Kafka 監聽器

對於本文中的代碼示例,我們將使用一個監聽“baeldung.articles.published”主題並處理傳入消息的小應用程序。為了展示自定義錯誤處理,我們的應用程序應在遇到反序列化異常後繼續消費消息。   

Spring-Kafka 的版本將由父級 Spring Boot pom 自動解決。因此,我們只需要添加模塊依賴項:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

本模塊使我們能夠使用 @KafkaListener 註解,該註解是對 Kafka Consumer API 的抽象。 讓我們利用該註解來創建 ArticlesPublishedListener 組件。 此外,我們還將引入另一個組件 EmailService,它將對每條傳入的消息執行操作:

@Component
class ArticlesPublishedListener {
    private final EmailService emailService;

    // constructor

    @KafkaListener(topics = "baeldung.articles.published")
    public void onArticlePublished(ArticlePublishedEvent event) {
        emailService.sendNewsletter(event.article());
    }
}

record ArticlePublishedEvent(String article) {
}

對於消費者配置,我們將重點定義對我們示例至關重要的屬性。在開發生產應用程序時,我們可以根據自身需求調整這些屬性,或者將其外部化到單獨的配置文件中:

@Bean
ConsumerFactory<String, ArticlePublishedEvent> consumerFactory(
  @Value("${spring.kafka.bootstrap-servers}") String bootstrapServers
) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung-app-1");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(),
      new JsonDeserializer<>(ArticlePublishedEvent.class)
    );
}

@Bean
KafkaListenerContainerFactory<String, ArticlePublishedEvent> kafkaListenerContainerFactory(
  ConsumerFactory<String, ArticlePublishedEvent> consumerFactory
) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent>();
    factory.setConsumerFactory(consumerFactory);
    return factory;
}

3. 設置測試環境

要設置我們的測試環境,我們可以利用 Kafka Testcontainer,它將無縫地啓動一個 Kafka Docker 容器用於測試:

@Testcontainers
@SpringBootTest(classes = Application.class)
class DeserializationExceptionLiveTest {

    @Container
    private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @DynamicPropertySource
    static void setProps(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
    }

    // ...
}

同時,我們需要一個 KafkaProducer 和一個 EmailService 來驗證監聽器的功能。這些組件將向我們的監聽器發送消息並驗證其準確處理。為了簡化測試並避免使用 Mock,我們將在內存中持久化所有傳入的文章,稍後使用 getter 訪問它們:

@Service
class EmailService { 
    private final List<String> articles = new ArrayList<>();
   
    // logger, getter

    public void sendNewsletter(String article) {
        log.info("Sending newsletter for article: " + article);
        articles.add(article);
    }
}

因此,我們只需將 EmailService 注入到我們的測試類中。接下來,讓我們創建 testKafkaProducer

@Autowired
EmailService emailService;

static KafkaProducer<String, String> testKafkaProducer;

@BeforeAll
static void beforeAll() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    testKafkaProducer = new KafkaProducer<>(props);
}

通過這種配置,我們已經可以測試正常流程。讓我們發佈兩篇文章,使用有效的 JSON,並驗證我們的應用程序是否成功調用了 emailService 接口:

@Test
void whenPublishingValidArticleEvent_thenProcessWithoutErrors() {
    publishArticle("{ \"article\": \"Kotlin for Java Developers\" }");
    publishArticle("{ \"article\": \"The S.O.L.I.D. Principles\" }");

    await().untilAsserted(() -> 
      assertThat(emailService.getArticles())
        .containsExactlyInAnyOrder(
          "Kotlin for Java Developers", 
          "The S.O.L.I.D. Principles"
        ));
}

4. 導致 RecordDeserializationException 異常

Kafka 在配置的解序列化器無法正確解析消息的鍵或值時,會拋出 RecordDeserializationException 異常。 要重現此錯誤,我們只需發佈包含無效 JSON 消息的主題即可。:

@Test
void whenPublishingInvalidArticleEvent_thenCatchExceptionAndContinueProcessing() {
    publishArticle("{ \"article\": \"Introduction to Kafka\" }");
    publishArticle(" !! Invalid JSON !! ");
    publishArticle("{ \"article\": \"Kafka Streams Tutorial\" }");

    await().untilAsserted(() -> 
      assertThat(emailService.getArticles())
        .containsExactlyInAnyOrder(
          "Kotlin for Java Developers",
          "The S.O.L.I.D. Principles"
        ));
}

如果運行此測試並檢查控制枱,我們將會觀察到重複出現的錯誤被記錄下來:

ERROR 7716 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

**java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer**
   at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:151) ~[spring-kafka-2.8.11.jar:2.8.11]
   at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1815) ~[spring-kafka-2.8.11.jar:2.8.11]
   ...
**Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition baeldung.articles.published-0 at offset 1. If needed, please seek past the record to continue consumption.**
   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1448) ~[kafka-clients-3.1.2.jar:na]
   ...
**Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data** [[32, 33, 33, 32, 73, 110, 118, 97, 108, 105, 100, 32, 74, 83, 79, 78, 32, 33, 33, 32]] from topic [baeldung.articles.published]
   at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:588) ~[spring-kafka-2.8.11.jar:2.8.11]
   ...
**Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('!' (code 33))**: expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
   **at [Source: (byte[])" !! Invalid JSON !! "; line: 1, column: 3]**
   at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391) ~[jackson-core-2.13.5.jar:2.13.5]
   ...

然後,測試最終會超時並失敗。如果檢查斷言錯誤,我們會注意到只有第一個消息被成功處理:

org.awaitility.core.ConditionTimeoutException: Assertion condition 
Expecting actual:
  ["Introduction to Kafka"]
to contain exactly in any order:
  ["Introduction to Kafka", "Kafka Streams Tutorial"]
but could not find the following elements:
  ["Kafka Streams Tutorial"]
 within 5 seconds.

正如預期的那樣,對第二個消息的解序列化失敗。因此,監聽器繼續嘗試消費同一條消息,導致錯誤重複發生。

5. 創建錯誤處理程序

如果我們仔細分析故障日誌,我們會注意到以下兩點建議:

  • 考慮配置一個 ‘ErrorHandlingDeserializer‘;
  • 如果需要,請繼續消費記錄,超越記錄本身;

換句話説,我們可以創建一個自定義錯誤處理程序,該程序將處理反序列化異常並增加消費者的偏移量。這將允許我們跳過無效消息並繼續消費。

5.1. 實現 CommonErrorHandler 接口

為了實現 CommonErrorHandler 接口,我們需要覆蓋兩個公共方法,這些方法沒有默認實現:

  • handleOne() – 用於處理單個失敗的記錄;
  • handleOtherException() – 當拋出異常,但不是針對特定記錄時調用;

我們可以使用類似的方法來處理這兩個情況。首先,我們捕獲異常並記錄錯誤消息:

class KafkaErrorHandler implements CommonErrorHandler {

    @Override
    public void handleOne(Exception exception, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, MessageListenerContainer container) {
        handle(exception, consumer);
    }

    @Override
    public void handleOtherException(Exception exception, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
        handle(exception, consumer);
    }

    private void handle(Exception exception, Consumer<?, ?> consumer) {
        log.error("Exception thrown", exception);
        // ...
    }
}

5.2. Kafka Consumer 的 <em seek()</em><em commitSync()</em>

我們可以使用 Consumer 接口中的 `` 方法手動更改特定主題內某個分區的當前偏移量。 簡單來説,我們可以根據偏移量來重新處理或跳過消息,以滿足需要。

在我們的情況下,如果異常是 RecordDeserializationException 的實例,我們將調用 `` 方法,傳入主題分區和下一個偏移量:

void handle(Exception exception, Consumer<?, ?> consumer) {
    log.error("Exception thrown", exception);
    if (exception instanceof RecordDeserializationException ex) {
        consumer.seek(ex.topicPartition(), ex.offset() + 1L);
        consumer.commitSync();
    } else {
        log.error("Exception not handled", exception);
    }
}

如我們所見,我們需要從 Consumer 接口中調用 commitSync() 方法。這將提交偏移量並確保 Kafka 代理確認新的位置並將其持久化。 這一步驟至關重要,因為它更新了消費者組提交的偏移量,指示已成功處理到調整後的位置的消息。

5.3. 更新配置

最後,我們需要將自定義錯誤處理程序添加到我們的消費者配置中。我們首先將其聲明為 @Bean

@Bean
CommonErrorHandler commonErrorHandler() {
    return new KafkaErrorHandler();
}

之後,我們將使用它的專用設置器向 ConcurrentKafkaListenerContainerFactory 添加新的 Bean:

@Bean
ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent> kafkaListenerContainerFactory(
  ConsumerFactory<String, ArticlePublishedEvent> consumerFactory,
  CommonErrorHandler commonErrorHandler
) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, ArticlePublishedEvent>();
    factory.setConsumerFactory(consumerFactory);
    factory.setCommonErrorHandler(commonErrorHandler);
    return factory;
}

當然,這是翻譯後的內容:

這就完成了!現在我們可以重新運行測試,並期望監聽器跳過無效消息並繼續消費消息。

6. 結論

在本文中,我們討論了 Spring Kafka 中的 <em >RecordDeserializationException</em>,並發現如果不正確處理,可能會阻塞消費者組針對特定分區的消費。

隨後,我們深入研究了 Kafka 的 <em >CommonErrorHandler</em> 接口,並將其實現到我們的監聽器中,從而使監聽器能夠處理反序列化失敗的情況,同時繼續處理消息。我們利用了消費者的 API 方法,特別是 <em >seek()</em><em >commitSync()</em>,通過調整消費者的偏移量來規避無效消息。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.