知識庫 / Spring RSS 訂閱

消費延遲的 Kafka 消息處理

Spring
HongKong
4
11:12 AM · Dec 06 ,2025

1. 概述

Apache Kafka 是一個事件流平台,用於大規模地收集、處理、存儲和集成數據。有時,我們可能需要延遲處理 Kafka 中的消息。例如,一個客户訂單處理系統旨在在 X 秒的延遲後處理訂單,以容納在此時間範圍內進行的取消操作。

在本文中,我們將探討使用 Spring Kafka 延遲處理 Kafka 消息的消費者處理方式。雖然 Kafka 本身不提供內置的延遲消息消費支持,但我們將探討一種實現替代方案。

2. 應用上下文

Kafka 提供了多種在錯誤發生時重試的方法。我們將使用此重試機制來延遲消費者處理消息的時間。因此,理解 Kafka 的重試機制非常重要。

考慮一個訂單處理應用程序,其中客户可以在 UI 上下訂單。用户可以在 10 秒內取消誤下單。這些訂單會發送到 Kafka 主題 web.orders,我們的應用程序在此處理這些訂單。

一個外部服務暴露了最新的訂單狀態(CREATED, ORDER_CONFIRMED, ORDER_PROCESSED, DELETED)。我們的應用程序需要接收消息,等待 10 秒,並與外部服務進行檢查,以確認訂單狀態是否為 CONFIRMED,即在 10 秒內未被用户取消。

為了測試,來自 web.orders.internal 的內部訂單不應延遲。

讓我們添加一個簡單的 Order 模型,其中 orderGeneratedDateTime 由生產者填充,以及在延遲期間由消費者填充的 orderProcessedTime

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {

    private UUID orderId;

    private LocalDateTime orderGeneratedDateTime;

    private LocalDateTime orderProcessedTime;

    private List<String> address;

    private double price;
}

3. Kafka 監聽器與外部服務

接下來,我們將添加一個用於消費主題的監聽器和一個暴露訂單狀態的服務。

讓我們添加一個 <a href="https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html">KafkaListener</a>,它讀取並處理來自 web.ordersweb.internal.orders 主題的消息:

@RetryableTopic(attempts = "1", include = KafkaBackoffException.class, dltStrategy = DltStrategy.NO_DLT)
@KafkaListener(topics = { "web.orders", "web.internal.orders" }, groupId = "orders")
public void handleOrders(String order) throws JsonProcessingException {
    Order orderDetails = objectMapper.readValue(order, Order.class);
    OrderService.Status orderStatus = orderService.findStatusById(orderDetails.getOrderId());
    if (orderStatus.equals(OrderService.Status.ORDER_CONFIRMED)) {
        orderService.processOrder(orderDetails);
    }
}

為了確保監聽器允許重試,必須包含 KafkaBackoffException。為了簡化起見,我們假設外部 OrderService 始終返回訂單狀態為 CONFIRMED。 此外,processOrder() 方法將訂單處理時間設置為當前時間,並將訂單保存到 HashMap 中:

@Service
public class OrderService {

    HashMap<UUID, Order> orders = new HashMap<>();

    public Status findStatusById(UUID orderId) {
        return Status.ORDER_CONFIRMED;
    }

    public void processOrder(Order order) {
        order.setOrderProcessedTime(LocalDateTime.now());
        orders.put(order.getOrderId(), order);
    }
}

4. 自定義延遲消息監聽器

Spring-Kafka 提供 KafkaBackoffAwareMessageListenerAdapter,它擴展了 AbstractAdaptableMessageListener 並實現了 AcknowledgingConsumerAwareMessageListenerdueTimestamp header and either backs off the message by invoking KafkaConsumerBackoffManager or retries the processing.">此適配器檢查 dueTimestamp 背調時間頭,並根據需要通過調用 KafkaConsumerBackoffManager 進行背調或重試處理。

現在,讓我們實現與 KafkaBackoffAwareMessageListenerAdapter 類似的 DelayedMessageListenerAdapter,該適配器應提供按主題配置延遲的靈活性,以及默認延遲為 0 秒:

public class DelayedMessageListenerAdapter<K, V> extends AbstractDelegatingMessageListenerAdapter<MessageListener<K, V>> 
  implements AcknowledgingConsumerAwareMessageListener<K, V> {

    // Field declaration and constructor

    public void setDelayForTopic(String topic, Duration delay) {
        Objects.requireNonNull(topic, "Topic cannot be null");
        Objects.requireNonNull(delay, "Delay cannot be null");
        this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
        this.delaysPerTopic.put(topic, delay);
    }

    public void setDefaultDelay(Duration delay) {
        Objects.requireNonNull(delay, "Delay cannot be null");
        this.logger.debug(() -> String.format("Setting delay %s for listener id %s", delay, this.listenerId));
        this.defaultDelay = delay;
    }

    @Override
    public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) throws KafkaBackoffException {
        this.kafkaConsumerBackoffManager.backOffIfNecessary(createContext(consumerRecord,
          consumerRecord.timestamp() + delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay)
          .toMillis(), consumer));
        invokeDelegateOnMessage(consumerRecord, acknowledgment, consumer);
    }

    private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord<K, V> data, long nextExecutionTimestamp, Consumer<?, ?> consumer) {
        return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp, 
          this.listenerId, 
          new TopicPartition(data.topic(), data.partition()), consumer);
    }
}

對於每一條傳入的消息,此適配器首先接收記錄並檢查主題中設置的延遲。該延遲將在配置中設置,如果未設置,則使用默認延遲。

現有的 KafkaConsumerBackoffManager#backOffIfNecessary 方法檢查上下文記錄的時間戳與當前時間戳之間的差值。如果差值為正數,表明沒有消費的必要,則分區會暫停並拋出 KafkaBackoffException。否則,它會將記錄發送到 KafkaListener 方法進行消費。

5. 監聽器配置

ConcurrentKafkaListenerContainerFactory 是 Spring Kafka 的默認實現,負責構建 KafkaListener 容器。 它允許我們配置併發的 KafkaListener 實例數量。 每個容器可以被視為一個邏輯線程池,其中每個線程負責從一個或多個 Kafka 主題中監聽消息。

DelayedMessageListenerAdapter 需要使用自定義的 ConcurrentKafkaListenerContainerFactory 配置監聽器。 我們可以為特定的主題(例如 web.orders)設置延遲,並且還可以為任何其他主題設置默認延遲為 0

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory, 
  ListenerContainerRegistry registry, TaskScheduler scheduler) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    KafkaConsumerBackoffManager backOffManager = createBackOffManager(registry, scheduler);
    factory.getContainerProperties()
      .setAckMode(ContainerProperties.AckMode.RECORD);
    factory.setContainerCustomizer(container -> {
        DelayedMessageListenerAdapter<Object, Object> delayedAdapter = wrapWithDelayedMessageListenerAdapter(backOffManager, container);
        delayedAdapter.setDelayForTopic("web.orders", Duration.ofSeconds(10));
        delayedAdapter.setDefaultDelay(Duration.ZERO);
        container.setupMessageListener(delayedAdapter);
    });
    return factory;
}

@SuppressWarnings("unchecked")
private DelayedMessageListenerAdapter<Object, Object> wrapWithDelayedMessageListenerAdapter(KafkaConsumerBackoffManager backOffManager, 
  ConcurrentMessageListenerContainer<Object, Object> container) {
    return new DelayedMessageListenerAdapter<>((MessageListener<Object, Object>) container.getContainerProperties()
      .getMessageListener(), backOffManager, container.getListenerId());
}

private ContainerPartitionPausingBackOffManager createBackOffManager(ListenerContainerRegistry registry, TaskScheduler scheduler) {
    return new ContainerPartitionPausingBackOffManager(registry, 
      new ContainerPausingBackOffHandler(new ListenerContainerPauseService(registry, scheduler)));
}

特別地,將確認模式設置為 RECORD 級別至關重要,以確保消費者在處理過程中發生錯誤時重新傳遞消息。

最後,我們需要定義一個 TaskScheduler Bean,以便在延遲期間恢復暫停的分區,並且該調度器需要注入到 BackOffManager 中,該管理器將由 DelayedMessageListenerAdapter 使用:

@Bean
public TaskScheduler taskScheduler() {
    return new ThreadPoolTaskScheduler();
}

6. 測試

確保所有在 web.orders 主題下的訂單在進行測試之前,先延遲 10 秒鐘進行處理:

@Test
void givenKafkaBrokerExists_whenCreateOrderIsReceived_thenMessageShouldBeDelayed() throws Exception {
    // Given
    var orderId = UUID.randomUUID();
    Order order = Order.builder()
      .orderId(orderId)
      .price(1.0)
      .orderGeneratedDateTime(LocalDateTime.now())
      .address(List.of("41 Felix Avenue, Luton"))
      .build();

    String orderString = objectMapper.writeValueAsString(order);
    ProducerRecord<String, String> record = new ProducerRecord<>("web.orders", orderString);
    
    // When
    testKafkaProducer.send(record)
      .get();
    await().atMost(Duration.ofSeconds(1800))
      .until(() -> {
          // then
          Map<UUID, Order> orders = orderService.getOrders();
          return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
              .getOrderGeneratedDateTime(), orders.get(orderId)
              .getOrderProcessedTime())
            .getSeconds() >= 10;
      });
}

接下來,我們將測試任何訂單會遵循默認延遲 0 秒,並跟蹤到 web.internal.orders

@Test
void givenKafkaBrokerExists_whenCreateOrderIsReceivedForOtherTopics_thenMessageShouldNotBeDelayed() throws Exception {
    // Given
    var orderId = UUID.randomUUID();
    Order order = Order.builder()
      .orderId(orderId)
      .price(1.0)
      .orderGeneratedDateTime(LocalDateTime.now())
      .address(List.of("41 Felix Avenue, Luton"))
      .build();

    String orderString = objectMapper.writeValueAsString(order);
    ProducerRecord<String, String> record = new ProducerRecord<>("web.internal.orders", orderString);
    
    // When
    testKafkaProducer.send(record)
      .get();
    await().atMost(Duration.ofSeconds(1800))
      .until(() -> {
          // Then
          Map<UUID, Order> orders = orderService.getOrders();
          System.out.println("Time...." + Duration.between(orders.get(orderId)
              .getOrderGeneratedDateTime(), orders.get(orderId)
              .getOrderProcessedTime())
            .getSeconds());
          return orders != null && orders.get(orderId) != null && Duration.between(orders.get(orderId)
              .getOrderGeneratedDateTime(), orders.get(orderId)
              .getOrderProcessedTime())
            .getSeconds() <= 1;
      });
}

7. 結論

在本教程中,我們探討了如何通過固定間隔來延遲 Kafka 消費者處理消息的方式。

我們可以修改實現,通過將嵌入式消息持續時間作為消息的一部分,動態地設置處理延遲。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.