知識庫 / Spring / Spring Boot RSS 訂閱

測試 Kafka 和 Spring Boot

Spring Boot,Testing
HongKong
8
12:43 PM · Dec 06 ,2025

1. 概述

Apache Kafka 是一種強大的、分佈式、容錯的流處理系統。 在之前的教程中,我們學習瞭如何使用 Spring 和 Kafka。

在本教程中,我們將在此基礎上繼續學習,並學習如何編寫可靠、自包含的集成測試,這些測試不依賴於外部 Kafka 服務器運行。

首先,我們將從瞭解如何使用和配置嵌入式 Kafka 實例開始。

然後我們將看到如何利用流行的框架 Testcontainers從我們的測試中受益。

2. 依賴項

當然,我們需要將標準的 《spring-kafka》依賴項添加到我們的 pom.xml 中:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>

接下來,我們需要兩個額外的依賴項專門用於我們的測試。

首先,我們將添加 spring-kafka-test 軟件包:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>3.1.1</version>
    <scope>test</scope>
</dependency>

最後,我們將添加 Testcontainers Kafka 依賴項,它也在 Maven Central 上可用:Maven Central

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>

現在我們已經配置了所有必要的依賴項,就可以使用 Kafka 創建一個簡單的 Spring Boot 應用程序。

3. 一個簡單的 Kafka Producer-Consumer 應用

在本文教程中,我們將重點測試一個簡單的 Kafka Producer-Consumer Spring Boot 應用。

讓我們首先定義應用程序的入口點:

@SpringBootApplication
public class KafkaProducerConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerConsumerApplication.class, args);
    }
}

如我們所見,這是一個標準的 Spring Boot 應用。

3.1. Producer 設置

接下來,讓我們考慮一個 Producer Bean,用於將消息發送到指定的 Kafka 主題:

@Component
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}

我們定義的 KafkaProducer Bean 只是一個包裝器,圍繞着 KafkaTemplate 類。 該類提供高層、線程安全的操作,例如將數據發送到提供的 topic,這正是我們在 send 方法中做的事情。

3.2. 消費者設置

同樣,我們現在將定義一個簡單的消費者 Bean,該 Bean 將監聽 Kafka 主題並接收消息:

@Component
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        payload = consumerRecord.toString();
        latch.countDown();
    }

    public void resetLatch() {
        latch = new CountDownLatch(1);
    }

    // other getters
}

我們的簡單消費者使用 @KafkaListener 註解在 receive 方法上,監聽指定主題的消息。稍後我們將看到如何配置 test.topic

此外,receive 方法將消息內容存儲在我們的 Bean 中,並遞減 latch 變量計數。 該變量是一個簡單的線程安全計數器字段,我們將稍後從我們的測試中使用它,以確保我們成功接收到消息。

現在我們已經實現了使用 Spring Boot 的簡單 Kafka 應用程序,讓我們看看如何編寫集成測試。

4. 關於測試的説明

在編寫乾淨的集成測試時,我們通常不應依賴我們無法控制或可能突然停止工作的外部服務。這可能會對我們的測試結果產生不利影響。

同樣,如果我們的測試依賴於外部服務,例如正在運行的 Kafka 代理,我們很可能無法以我們想要的方式從測試中設置、控制和清理它。

4.1. 應用程序屬性

我們將使用一套輕量級的應用程序配置屬性用於我們的測試。

我們將定義這些屬性在我們的 <em src/test/resources/application.yml</em> 文件中:

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: baeldung
test:
  topic: embedded-test-topic

這是在與嵌入式 Kafka 實例或本地代理一起工作時所需的最基本屬性集。

其中大部分屬性含義自明,但我們應該特別強調的屬性是消費者屬性 auto-offset-reset: earliest

此屬性確保我們的消費者組能夠接收到發送的消息,因為容器可能在消息發送完成後才啓動。

此外,我們還配置了一個主題屬性,值為 embedded-test-topic,這是我們在測試中使用的主題。

5. 使用嵌入式 Kafka 進行測試

本節將介紹如何使用內存中的 Kafka 實例來運行我們的測試。 這也被稱為嵌入式 Kafka。

依賴項 <em >spring-kafka-test </em> 之前添加的,包含一些有用的工具,可以幫助我們測試應用程序。 其中最重要的是,它包含 EmbeddedKafkaBroker 類。

考慮到這一點,讓我們編寫第一個集成測試:

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() 
      throws Exception {
        String data = "Sending with our own simple KafkaProducer";
        
        producer.send(topic, data);
        
        boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
        assertTrue(messageConsumed);
        assertThat(consumer.getPayload(), containsString(data));
    }
}

讓我們來逐步瞭解我們的測試的關鍵部分。

首先,我們通過使用兩個標準的 Spring 註解來裝飾我們的測試類:

  • @SpringBootTest 註解將確保我們的測試啓動 Spring 應用上下文。
  • 我們還使用 @DirtiesContext 註解,它將確保在不同的測試之間上下文被清理和重置。

接下來是關鍵部分——我們使用 @EmbeddedKafka 註解將 EmbeddedKafkaBroker 實例注入到我們的測試中。

此外,還有一些可用的屬性可以用來配置嵌入式 Kafka 節點:

  • partitions – 這是每個主題中使用的分區數量。為了保持簡單,我們只想在測試中使用一個。
  • brokerProperties – 用於 Kafka 節點的附加屬性。同樣,我們保持簡單,並指定一個純文本監聽器和一個端口號。

接下來,我們自動注入 consumerproducer 類,並配置一個主題,使用 application.properties 中的值。

作為拼圖的最後一塊,我們簡單地將一條消息發送到我們的測試主題,並驗證消息已收到,並且包含我們的測試主題的名稱。

當我們運行測試時,在詳細的 Spring 輸出中,我們會看到:

...
12:45:35.099 [main] INFO  c.b.kafka.embedded.KafkaProducer -
  sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'
...
12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
  INFO  c.b.kafka.embedded.KafkaConsumer - received payload=
  'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,
  CreateTime = 1605267935099, serialized key size = -1, 
  serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),
  key = null, value = Sending with our own simple KafkaProducer key)'

這已確認我們的測試正在正常運行。太棒了!我們現在可以使用內存中的 Kafka 代理編寫自包含、獨立的集成測試方法。

6. 使用 TestContainers 測試 Kafka

有時,實際外部服務與專門為測試目的而提供的嵌入式內存實例之間可能會出現細微差異。儘管這種情況不太可能發生,但測試端口可能已被佔用,從而導致失敗。

考慮到這一點,在本節中,我們將採用與之前章節類似的變化方法,使用 Testcontainers 框架進行測試。我們將學習如何實例化和管理一個託管在 Docker 容器內的外部 Apache Kafka 代理,該代理將用於集成測試。

讓我們定義一個與之前章節中看到的非常相似的集成測試:

@RunWith(SpringRunner.class)
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {

    @ClassRule
    public static KafkaContainer kafka = 
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenKafkaDockerContainer_whenSendingWithSimpleProducer_thenMessageReceived() 
      throws Exception {
        String data = "Sending with our own simple KafkaProducer";
        
        producer.send(topic, data);
     
        boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
        
        assertTrue(messageConsumed);
        assertThat(consumer.getPayload(), containsString(data));
    }
}

讓我們來查看一下差異。我們聲明瞭 kafka 字段,這是一個標準的 JUnit @ClassRule此字段是 KafkaContainer 類的實例,它將準備和管理我們運行 Kafka 的容器的生命週期。

為了避免端口衝突,Testcontainers 在我們的 Docker 容器啓動時動態分配端口號。

因此,我們使用類 KafkaTestContainersConfiguration 提供自定義的消費者和生產者工廠配置:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
    // more standard configuration
    return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    // more standard configuration
    return new DefaultKafkaProducerFactory<>(configProps);
}

我們隨後通過在測試的開頭使用 @Import 註解引用此配置。

之所以這樣做是因為我們需要將服務器地址注入到我們的應用程序中,正如之前提到的,該地址是動態生成的。

我們通過調用 getBootstrapServers() 方法來實現這一點,該方法將返回引導服務器的位置。

bootstrap.servers = [PLAINTEXT://localhost:32789]

現在當我們運行測試時,我們應該看到 Testcontainers 會執行以下操作:

  • 檢查本地 Docker 設置
  • 如果需要,拉取 confluentinc/cp-kafka:5.4.3 鏡像
  • 啓動一個新的容器並等待其準備就緒
  • 最後,在測試完成後關閉並刪除該容器

再次強調,這已通過檢查測試輸出得到證實:

13:33:10.396 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Creating container for image: confluentinc/cp-kafka:5.4.3
13:33:10.454 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
13:33:10.785 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

咔嚓!使用 Kafka Docker 容器的運行集成測試。

7. 結論

在本文中,我們學習了使用 Spring Boot 測試 Kafka 應用程序的幾種方法。

在第一種方法中,我們瞭解瞭如何配置和使用本地內存 Kafka 代理。

然後,我們瞭解瞭如何使用 Testcontainers 設置一個外部 Kafka 代理,該代理在 Docker 容器內運行,以便從測試中運行。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.