知識庫 / Spring / Spring Cloud RSS 訂閱

使用 Kafka、Apache Avro 和 Confluent Schema Registry 構建 Spring Cloud Stream 指南

Spring Cloud
HongKong
6
01:20 PM · Dec 06 ,2025

1. 簡介

Apache Kafka 是一個消息平台。 通過它,我們可以以大規模的方式在不同的應用程序之間交換數據。

Spring Cloud Stream 是構建消息驅動應用程序的框架。它能夠簡化 Kafka 集成到我們的服務中的過程。

通常情況下,Kafka 與 Avro 消息格式一起使用,並由一個模式註冊器支持。在本教程中,我們將使用 Confluent Schema Registry。 我們將嘗試使用 Spring 實現的與 Confluent Schema Registry 集成,以及 Confluent 提供的原生庫。

2. Confluent Schema Registry

Kafka 將所有數據表示為字節,因此通常 使用外部模式並根據該模式進行序列化和反序列化為字節。而不是為每個消息提供該模式的副本,這會造成昂貴的開銷,因此通常將模式存儲在註冊表中,併為每個消息提供僅 ID。

Confluent Schema Registry 提供了一種便捷的方式來存儲、檢索和管理模式。它暴露了多個有用的 RESTful API

模式按主題存儲,默認情況下,註冊表在上傳新模式到主題之前會進行兼容性檢查。

每個生產者都會知道它正在生產的模式,每個消費者應該能夠以任何格式消費數據,或者應該有一個它偏好的特定模式進行讀取。 生產者會諮詢註冊表以確定用於發送消息的正確 ID。消費者使用註冊表來獲取發送者的模式。

當消費者既知道發送者的模式,也知道其自己的所需消息格式時,Avro 庫可以將其數據轉換為消費者的所需格式。

3. Apache Avro

Apache Avro 是一種數據序列化系統。

它使用 JSON 結構來定義模式,從而實現字節與結構化數據之間的序列化。

Avro 的優勢在於它支持在一種模式版本中編寫的消息,可以轉換為與兼容的替代模式定義的格式。

Avro 工具集還可以生成類來表示這些模式的數據結構,從而輕鬆地進行入和出序列化,例如到和從 POJO。

4. 項目設置

要使用 schema 註冊器與 Spring Cloud Stream 配合,我們需要以下 Maven 依賴項:Spring Cloud Kafka Binderschema 註冊器

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-schema</artifactId>
</dependency>

對於 confluent 的序列化器,我們需要:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>4.0.0</version>
</dependency>

confluent 的序列化器位於他們的倉庫中:

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

此外,我們還將使用一個 Maven 插件 來生成 Avro 類:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.2</version>
            <executions>
                <execution>
                    <id>schemas</id>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                        <goal>protocol</goal>
                        <goal>idl-protocol</goal>
                    </goals>
                    <configuration>                        
                        <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

為了測試,我們可以使用現有的 Kafka 和 Schema Registry 部署,或者使用容器化的 Confluent 和 Kafka (容器化 Confluent 和 Kafka)。

5. Spring Cloud Stream

現在我們已經完成了項目的搭建,接下來我們將使用 Spring Cloud Stream 創建一個生產者,它將在一個主題上發佈員工信息。

然後,我們將創建一個消費者,它將從該主題讀取事件並將其寫入日誌語句中。

5.1. 模式

首先,我們定義一個員工信息的模式。我們可以將其命名為 employee-schema.avsc

我們可以將模式文件保存在 src/main/resources 中。

{
    "type": "record",
    "name": "Employee",
    "namespace": "com.baeldung.schema",
    "fields": [
    {
        "name": "id",
        "type": "int"
    },
    {
        "name": "firstName",
        "type": "string"
    },
    {
        "name": "lastName",
        "type": "string"
    }]
}

創建上述模式後,我們需要構建項目。然後,Apache Avro代碼生成器將在 com.baeldung.schema 包下創建一個名為 Employee 的 POJO。

5.2. 生產者

Spring Cloud Stream 提供 處理器 接口。 這使我們能夠使用輸出和輸入通道。

讓我們使用它來創建一個生產者,該生產者將 Employee 對象發送到 employee-details Kafka 主題:

@Autowired
private Processor processor;

public void produceEmployeeDetails(int empId, String firstName, String lastName) {

    // creating employee details
    Employee employee = new Employee();
    employee.setId(empId);
    employee.setFirstName(firstName);
    employee.setLastName(lastName);

    Message<Employee> message = MessageBuilder.withPayload(employee)
                .build();

    processor.output()
        .send(message);
}

5.2. 消費者

現在,讓我們編寫我們的消費者:

@StreamListener(Processor.INPUT)
public void consumeEmployeeDetails(Employee employeeDetails) {
    logger.info("Let's process employee details: {}", employeeDetails);
}

這個消費者將讀取在 employee-details 主題上發佈的消息。為了查看它做了什麼,我們將它的輸出定向到日誌中。

5.3. Kafka 綁定

目前我們僅對 inputoutput 渠道(通道)以及我們的 Processor 對象進行操作。這些通道需要配置正確的 Kafka 目標。

請使用 application.yml 文件來提供 Kafka 綁定:

spring:
  cloud:
    stream: 
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
        output:
          destination: employee-details
          content-type: application/*+avro

我們應指出的是,在此處,destination 指的是 Kafka 主題。由於在此處它被認為是輸入源,因此可能令人感到困惑,因為它被稱為 destination。但請注意,這在消費者和生產者中是一個保持一致的術語。

5.4. 啓動點

現在我們已經有了生產者和消費者,接下來我們將暴露一個 API,用於接收用户輸入並將其傳遞給生產者:

@Autowired
private AvroProducer avroProducer;

@PostMapping("/employees/{id}/{firstName}/{lastName}")
public String producerAvroMessage(@PathVariable int id, @PathVariable String firstName, 
  @PathVariable String lastName) {
    avroProducer.produceEmployeeDetails(id, firstName, lastName);
    return "Sent employee details to consumer";
}

5.5. 啓用 Confluent Schema Registry 和綁定

為了使我們的應用程序應用 Kafka 和 Schema Registry 綁定,我們需要在其中一個配置類上添加 <em @EnableBinding</em><em @EnableSchemaRegistryClient</em> 註解。

@SpringBootApplication
@EnableBinding(Processor.class)
// The @EnableSchemaRegistryClient annotation needs to be uncommented to use the Spring native method.
// @EnableSchemaRegistryClient
public class AvroKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(AvroKafkaApplication.class, args);
    }

}

我們應該提供一個 ConfluentSchemaRegistryClient Bean:

@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}")
private String endPoint;

@Bean
public SchemaRegistryClient schemaRegistryClient() {
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endPoint);
    return client;
}

endPoint 是 Confluent Schema Registry 的 URL。

5.6. 測試我們的服務

讓我們通過 POST 請求測試該服務:

curl -X POST localhost:8080/employees/1001/Harry/Potter

日誌表明此操作已成功。

2019-06-11 18:45:45.343  INFO 17036 --- [container-0-C-1] com.baeldung.consumer.AvroConsumer       : Let's process employee details: {"id": 1001, "firstName": "Harry", "lastName": "Potter"}

5.7. 處理過程中發生了什麼?

讓我們來了解一下在我們的示例應用程序中究竟發生了什麼:

  1. 生產者使用 Employee 對象構建 Kafka 消息。
  2. 生產者將員工模式註冊到模式註冊表中獲取模式版本 ID,這要麼創建一個新的 ID,要麼重用該確切模式的現有 ID。
  3. Avro 使用模式序列化了 Employee 對象。
  4. Spring Cloud 將模式 ID 放入消息頭中。
  5. 消息已發佈到主題上。
  6. 當消息到達消費者時,它從頭中讀取模式 ID。
  7. 消費者使用模式 ID 從註冊表中獲取 Employee 模式。
  8. 消費者找到了可以表示該對象的本地類,並將消息反序列化到其中。

6. 使用原生 Kafka 庫進行序列化/反序列化

Spring Boot 提供了一些內置的消息轉換器。默認情況下,Spring Boot 使用 Content-Type 標頭來選擇合適的消息轉換器。

在我們的示例中,Content-Typeapplication/*+avro, 因此它使用了 AvroSchemaMessageConverter 來讀取和寫入 Avro 格式。但是,Confluent 建議使用 KafkaAvroSerializer KafkaAvroDeserializer 進行消息轉換。

雖然 Spring 自己的格式在某些情況下表現良好,但它在分區方面存在一些缺點,並且與 Confluent 標準不兼容,這可能會影響 Kafka 實例上的一些非 Spring 服務。

讓我們更新我們的 application.yml 以使用 Confluent 轉換器:

spring:
  cloud:
    stream:
      default: 
        producer: 
          useNativeEncoding: true
        consumer:  
          useNativeEncoding: true     
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
        output:
          destination: employee-details
          content-type: application/*+avro
      kafka:
         binder:        
           producer-properties:
             key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             schema.registry.url: http://localhost:8081 
           consumer-properties:
             key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             schema.registry.url: http://localhost:8081
             specific.avro.reader: true

我們已啓用 useNativeEncoding。它強制 Spring Cloud Stream 將序列化委託給提供的類。

我們還應該瞭解如何在 Spring Cloud 中使用 kafka.binder.producer-propertieskafka.binder.consumer-properties 來提供原生設置屬性。

7. 消費者組和分區

消費者組是屬於同一應用程序的消費者集合。同一消費者組的消費者共享相同的組名。

請更新 application.yml 以添加消費者組名:

spring:
  cloud:
    stream:
      // ...     
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
      // ...

所有消費者會均勻地將主題分區分配給它們。不同分區的消息將並行處理。

**在消費者組中,同時讀取消息的最大消費者數量等於分區數量。** 因此,我們可以配置分區和消費者的數量以獲得所需的並行性。通常,我們應該比所有服務的副本上的消費者的總數還要多分區。

7.1. 分區鍵 (Partition Key)

當處理我們的消息時,消息的處理順序可能很重要。當我們的消息並行處理時,很難控制處理的順序。

Kafka 提供了一個規則,即 在給定的分區內,消息始終按照它們到達的順序進行處理。因此,當某些消息需要按照正確的順序處理時,我們確保它們都落入同一個分區。

我們可以發送消息到主題時提供分區鍵。 具有相同分區鍵的消息將始終進入同一個分區。如果未指定分區鍵,消息將採用輪詢方式進行分區。

讓我們通過一個例子來理解這一點。 假設我們接收到多個員工的消息,並且我們希望按照正確的順序處理所有員工的消息。部門名稱和員工 ID 可以唯一標識一個員工。

因此,讓我們使用員工 ID 和部門名稱定義分區鍵:

{
    "type": "record",
    "name": "EmployeeKey",
    "namespace": "com.baeldung.schema",
    "fields": [
     {
        "name": "id",
        "type": "int"
    },
    {
        "name": "departmentName",
        "type": "string"
    }]
}

在構建項目後,EmployeeKey POJO 將在 com.baeldung.schema 包下生成。

讓我們更新生產者,使用 EmployeeKey 作為分區鍵:

public void produceEmployeeDetails(int empId, String firstName, String lastName) {

    // creating employee details
    Employee employee = new Employee();
    employee.setId(empId);
    // ...

    // creating partition key for kafka topic
    EmployeeKey employeeKey = new EmployeeKey();
    employeeKey.setId(empId);
    employeeKey.setDepartmentName("IT");

    Message<Employee> message = MessageBuilder.withPayload(employee)
        .setHeader(KafkaHeaders.MESSAGE_KEY, employeeKey)
        .build();

    processor.output()
        .send(message);
}

在這裏,我們把分區鍵放在消息頭中。

現在,相同的分區將接收具有相同員工 ID 和部門名稱的消息。

7.2. 消費者併發

Spring Cloud Stream 允許我們通過在 application.yml 中設置,為消費者配置併發數:

spring:
  cloud:
    stream:
      // ... 
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
          concurrency: 3

現在,我們的消費者將併發地從該主題讀取三個消息。換句話説,Spring 將啓動三個不同的線程,獨立地進行消費。

8. 結論

在本文中,我們整合了使用 Apache KafkaAvro 模式Confluent Schema Registry 的生產者和消費者。

我們通過一個應用程序完成了這個整合,但生產者和消費者可以部署在不同的應用程序中,並且可以擁有各自的模式版本,這些版本通過註冊表保持同步。

我們研究了 Spring 中 Avro 和 Schema Registry 客户端的實現,並探討了如何使用 Confluent 標準實現進行序列化和反序列化,以實現互操作性。

最後,我們研究瞭如何分區我們的主題,並確保我們擁有正確的消息鍵,從而實現消息的 safe 並行處理。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.