知識庫 / Spring / Spring Boot RSS 訂閱

使用 ReplyingKafkaTemplate 與 Apache Kafka 同步通信

Spring Boot
HongKong
5
10:47 AM · Dec 06 ,2025

1. 概述

Apache Kafka 已成為構建事件驅動架構的最受歡迎和廣泛使用的消息傳遞系統之一。其中一個微服務將消息發佈到主題,而另一個微服務異步地消費和處理該消息。

然而,在發佈者微服務需要立即響應以繼續進行進一步處理的情況下,存在一些場景。雖然 Kafka 本質上是為異步通信而設計的,但可以通過使用單獨的主題來支持通過 請求-響應 通信。

在本教程中,我們將探討如何使用 Apache Kafka 在 Spring Boot 應用程序中實現同步請求-響應通信。

2. 設置項目

為了演示我們的系統,我們將模擬一個通知分發系統。我們將創建一個單體 Spring Boot 應用程序,它將同時充當生產者和消費者。

2.1. 依賴項

讓我們首先將 Spring Kafka 依賴項 添加到我們項目的 pom.xml 文件中:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.3.4</version>
</dependency>

這個依賴項為我們提供了必要的類,用於與配置好的 Kafka 實例建立連接並與之交互。

2.2. 定義請求-響應消息

接下來,讓我們定義兩個記錄來表示我們的請求和響應消息:

record NotificationDispatchRequest(String emailId, String content) {
}

public record NotificationDispatchResponse(UUID notificationId) {
}

在此,NotificationDispatchRequest 記錄包含通知的 emailIdcontent,而 NotificationDispatchResponse 記錄包含一個在處理請求後生成的唯一 notificationId

2.3. 定義 Kafka 主題和配置屬性

現在,讓我們定義請求和回覆 Kafka 主題。 此外,我們還將為從消費者組件接收回復設置一個超時持續時間。

我們將這些屬性存儲在項目的 <em >application.yaml</em> 文件中,並使用 <em >@ConfigurationProperties</em> 將值映射到 Java 記錄,以便我們的配置和服務層可以引用該記錄:

@Validated
@ConfigurationProperties(prefix = "com.baeldung.kafka.synchronous")
record SynchronousKafkaProperties(
    @NotBlank
    String requestTopic,

    @NotBlank
    String replyTopic,

    @NotNull @DurationMin(seconds = 10) @DurationMax(minutes = 2)
    Duration replyTimeout
) {
}

我們還添加了驗證註釋,以確保所有必需的屬性已正確配置。如果定義的驗證失敗,Spring ApplicationContext 將無法啓動。 這使我們能夠遵循“快速失敗”原則

以下是我們的 application.yaml 文件的片段,它定義了將自動映射到我們的 SynchronousKafkaProperties 記錄所需的屬性:

com:
  baeldung:
    kafka:
      synchronous:
        request-topic: notification-dispatch-request
        reply-topic: notification-dispatch-response
        reply-timeout: 30s

在這裏,我們配置了請求和回覆的 Kafka 主題名稱,以及一個回覆超時時間為三十秒。

除了我們自定義的屬性之外,我們還將添加一些核心 Kafka 配置屬性到我們的 application.yaml 文件中:

spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: synchronous-kafka-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: com.baeldung.kafka.synchronous
    properties:
      allow:
        auto:
          create:
            topics: true

首先,為了允許我們的應用程序連接到配置好的 Kafka 實例,我們使用環境變量配置其 bootstrap 服務器 URL。

接下來,我們配置了消費者和生產者的鍵值序列化和反序列化屬性。 此外,對於我們的 消費者,我們配置了 group-id,並信任包含我們請求-響應記錄的包,用於 JSON 反序列化。

配置以上屬性後,Spring Kafka 會自動為我們創建 ConsumerFactory 和 ProducerFactory 類型的 Bean

最後,我們啓用了自動創建主題的功能,因此 Kafka 會在主題不存在時自動創建它們。 請注意,我們僅將此屬性用於演示,不應在生產應用程序中執行此操作。

2.4. 定義 Kafka 配置 Bean

有了配置屬性已就位,現在我們來定義必要的 Kafka 配置 Bean:

@Bean
KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer(
    ConsumerFactory<String, NotificationDispatchResponse> consumerFactory
) {
    String replyTopic = synchronousKafkaProperties.replyTopic();
    ContainerProperties containerProperties = new ContainerProperties(replyTopic);
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}

首先,我們注入 ConsumerFactory 實例並使用它,結合配置好的 replyTopic,創建 KafkaMessageListenerContainer Bean。 該 Bean 負責創建容器,用於從我們的 reply topic 中輪詢消息

接下來,我們將定義用於在服務層中執行同步通信的核心 Bean:

@Bean
ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate(
    ProducerFactory<String, NotificationDispatchRequest> producerFactory,
    KafkaMessageListenerContainer<String, NotificationDispatchResponse> kafkaMessageListenerContainer
) {
    Duration replyTimeout = synchronousKafkaProperties.replyTimeout();
    var replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
    replyingKafkaTemplate.setDefaultReplyTimeout(replyTimeout);
    return replyingKafkaTemplate;
}

使用 <em >ProducerFactory</em > 和先前定義的 <em >KafkaMessageListenerContainer</em > Bean,我們創建了一個 <em >ReplyingKafkaTemplate</em > Bean。 此外,利用自動注入的 <em >synchronousKafkaProperties</em >>,我們配置了在 <em >application.yaml</em > 文件中定義的回覆超時時間,這將決定我們的服務在等待響應之前超時的時間長度。

這個 <em >ReplyingKafkaTemplate</em > Bean 管理了請求和回覆主題之間的交互,從而使 Kafka 上的同步通信成為可能。

最後,讓我們定義 Bean 以啓用我們的監聽器組件向回覆主題發送響應:

@Bean
KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate(ProducerFactory<String, NotificationDispatchResponse> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, NotificationDispatchRequest>> kafkaListenerContainerFactory(
    ConsumerFactory<String, NotificationDispatchRequest> consumerFactory,
    KafkaTemplate<String, NotificationDispatchResponse> kafkaTemplate
) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, NotificationDispatchRequest>();
    factory.setConsumerFactory(consumerFactory);
    factory.setReplyTemplate(kafkaTemplate);
    return factory;
}

首先,我們使用 ProducerFactory bean 創建一個標準 KafkaTemplate bean。

然後,我們使用它與 ConsumerFactory bean 一起定義 KafkaListenerContainerFactory bean。 該 bean 允許我們的監聽器組件從請求主題消費消息,並在完成所需處理後將消息發送回回復主題

3. 使用 Kafka 實現同步通信

配置就緒後,我們將實現兩個配置好的 Kafka 主題之間的同步請求-響應通信。

3.1. 使用 ReplyingKafkaTemplate 發送和接收消息

首先,讓我們創建一個 NotificationDispatchService 類,該類使用我們之前定義的 ReplyingKafkaTemplate bean,向配置好的請求主題發送消息:

@Service
@EnableConfigurationProperties(SynchronousKafkaProperties.class)
class NotificationDispatchService {

    private final SynchronousKafkaProperties synchronousKafkaProperties;
    private final ReplyingKafkaTemplate<String, NotificationDispatchRequest, NotificationDispatchResponse> replyingKafkaTemplate;

    // standard constructor

    NotificationDispatchResponse dispatch(NotificationDispatchRequest notificationDispatchRequest) {
        String requestTopic = synchronousKafkaProperties.requestTopic();
        ProducerRecord<String, NotificationDispatchRequest> producerRecord = new ProducerRecord<>(requestTopic, notificationDispatchRequest);

        var requestReplyFuture = replyingKafkaTemplate.sendAndReceive(producerRecord);
        return requestReplyFuture.get().value();
    }
}

在這裏,在我們的 dispatch() 方法中,我們使用自動注入的 synchronousKafkaProperties 實例來提取來自我們 application.yaml 文件的 requestTopic。然後,我們使用它,以及方法參數中傳遞的 notificationDispatchRequest 來創建一個 ProducerRecord 實例。

接下來,我們將創建的 ProducerRecord 實例傳遞給 sendAndReceive() 方法,以將消息發佈到請求主題。該方法返回一個 RequestReplyFuture 對象,我們使用它來等待響應,然後返回其值。

在幕後,當我們調用 sendAndReceive() 方法時,ReplyingKafkaTemplate 類會生成一個唯一的關聯 ID,這是一個隨機 UUID,並將它附加到發往消息的標頭中。此外,它還會添加一個包含響應主題名稱的標頭,其中它期望收到響應。請記住,我們已經在 KafkaMessageListenerContainer bean 中配置了響應主題。

ReplyingKafkaTemplate bean 使用生成的關聯 ID 作為鍵,將 RequestReplyFuture 對象存儲在線程安全的 ConcurrentHashMap 中。 這使得它能夠在多線程環境中工作,並支持併發請求

3.2. 定義 Kafka 消息監聽器

接下來,為了完成我們的實現,我們創建一個監聽器組件,該組件監聽配置好的請求主題中的消息,並將響應發送回回復主題:

@Component
class NotificationDispatchListener {

    @SendTo
    @KafkaListener(topics = "${com.baeldung.kafka.synchronous.request-topic}")
    NotificationDispatchResponse listen(NotificationDispatchRequest notificationDispatchRequest) {
        // ... processing logic
        UUID notificationId = UUID.randomUUID();
        return new NotificationDispatchResponse(notificationId);
    }
}

我們使用 @KafkaListener 標註來監聽我們在 application.yaml 文件中配置的請求主題。

在我們的 listen() 方法中,我們簡單地返回一個 NotificationDispatchResponse 記錄,其中包含一個唯一的 notificationId

重要的是,我們使用 @SendTo 標註我們的方法,這會指示 Spring Kafka 從消息頭中提取相關 ID 和回覆主題名稱。 它使用它們自動將方法返回值發送到提取的回覆主題,並將相同的相關 ID 添加到消息頭中

這使得我們在 NotificationDispatchService 類中的 ReplyingKafkaTemplate Bean 可以使用它最初生成的關聯 ID 檢索正確的 RequestReplyFuture 對象。

4. 結論

在本文中,我們探討了如何使用 Apache Kafka 在 Spring Boot 應用程序的兩個組件之間實現同步通信。

我們完成了必要的配置並模擬了一個通知分發系統。

通過使用 ReplyingKafkaTemplate,我們可以將 Apache Kafka 的異步特性轉換為同步的請求-響應模式。由於這種方法有些不常見,因此在生產環境中實施之前,務必仔細評估它是否與項目的架構相符。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.