1. 簡介
Apache Kafka 是一個消息平台。 通過它,我們可以以大規模的方式在不同的應用程序之間交換數據。
Spring Cloud Stream 是構建消息驅動應用程序的框架。它能夠簡化 Kafka 集成到我們的服務中的過程。
通常情況下,Kafka 與 Avro 消息格式一起使用,並由一個模式註冊器支持。在本教程中,我們將使用 Confluent Schema Registry。 我們將嘗試使用 Spring 實現的與 Confluent Schema Registry 集成,以及 Confluent 提供的原生庫。
2. Confluent Schema Registry
Kafka 將所有數據表示為字節,因此通常 使用外部模式並根據該模式進行序列化和反序列化為字節。而不是為每個消息提供該模式的副本,這會造成昂貴的開銷,因此通常將模式存儲在註冊表中,併為每個消息提供僅 ID。
Confluent Schema Registry 提供了一種便捷的方式來存儲、檢索和管理模式。它暴露了多個有用的 RESTful API。
模式按主題存儲,默認情況下,註冊表在上傳新模式到主題之前會進行兼容性檢查。
每個生產者都會知道它正在生產的模式,每個消費者應該能夠以任何格式消費數據,或者應該有一個它偏好的特定模式進行讀取。 生產者會諮詢註冊表以確定用於發送消息的正確 ID。消費者使用註冊表來獲取發送者的模式。
當消費者既知道發送者的模式,也知道其自己的所需消息格式時,Avro 庫可以將其數據轉換為消費者的所需格式。
3. Apache Avro
Apache Avro 是一種數據序列化系統。
它使用 JSON 結構來定義模式,從而實現字節與結構化數據之間的序列化。
Avro 的優勢在於它支持在一種模式版本中編寫的消息,可以轉換為與兼容的替代模式定義的格式。
Avro 工具集還可以生成類來表示這些模式的數據結構,從而輕鬆地進行入和出序列化,例如到和從 POJO。
4. 項目設置
要使用 schema 註冊器與 Spring Cloud Stream 配合,我們需要以下 Maven 依賴項:Spring Cloud Kafka Binder 和 schema 註冊器。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
</dependency>對於 confluent 的序列化器,我們需要:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>4.0.0</version>
</dependency>confluent 的序列化器位於他們的倉庫中:
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>此外,我們還將使用一個 Maven 插件 來生成 Avro 類:
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.2</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>為了測試,我們可以使用現有的 Kafka 和 Schema Registry 部署,或者使用容器化的 Confluent 和 Kafka (容器化 Confluent 和 Kafka)。
5. Spring Cloud Stream
現在我們已經完成了項目的搭建,接下來我們將使用 Spring Cloud Stream 創建一個生產者,它將在一個主題上發佈員工信息。
然後,我們將創建一個消費者,它將從該主題讀取事件並將其寫入日誌語句中。
5.1. 模式
首先,我們定義一個員工信息的模式。我們可以將其命名為 employee-schema.avsc。
我們可以將模式文件保存在 src/main/resources 中。
{
"type": "record",
"name": "Employee",
"namespace": "com.baeldung.schema",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "firstName",
"type": "string"
},
{
"name": "lastName",
"type": "string"
}]
}創建上述模式後,我們需要構建項目。然後,Apache Avro代碼生成器將在 com.baeldung.schema 包下創建一個名為 Employee 的 POJO。
5.2. 生產者
Spring Cloud Stream 提供 處理器 接口。 這使我們能夠使用輸出和輸入通道。
讓我們使用它來創建一個生產者,該生產者將 Employee 對象發送到 employee-details Kafka 主題:
@Autowired
private Processor processor;
public void produceEmployeeDetails(int empId, String firstName, String lastName) {
// creating employee details
Employee employee = new Employee();
employee.setId(empId);
employee.setFirstName(firstName);
employee.setLastName(lastName);
Message<Employee> message = MessageBuilder.withPayload(employee)
.build();
processor.output()
.send(message);
}5.2. 消費者
現在,讓我們編寫我們的消費者:
@StreamListener(Processor.INPUT)
public void consumeEmployeeDetails(Employee employeeDetails) {
logger.info("Let's process employee details: {}", employeeDetails);
}這個消費者將讀取在 employee-details 主題上發佈的消息。為了查看它做了什麼,我們將它的輸出定向到日誌中。
5.3. Kafka 綁定
目前我們僅對 input 和 output 渠道(通道)以及我們的 Processor 對象進行操作。這些通道需要配置正確的 Kafka 目標。
請使用 application.yml 文件來提供 Kafka 綁定:
spring:
cloud:
stream:
bindings:
input:
destination: employee-details
content-type: application/*+avro
output:
destination: employee-details
content-type: application/*+avro我們應指出的是,在此處,destination 指的是 Kafka 主題。由於在此處它被認為是輸入源,因此可能令人感到困惑,因為它被稱為 destination。但請注意,這在消費者和生產者中是一個保持一致的術語。
5.4. 啓動點
現在我們已經有了生產者和消費者,接下來我們將暴露一個 API,用於接收用户輸入並將其傳遞給生產者:
@Autowired
private AvroProducer avroProducer;
@PostMapping("/employees/{id}/{firstName}/{lastName}")
public String producerAvroMessage(@PathVariable int id, @PathVariable String firstName,
@PathVariable String lastName) {
avroProducer.produceEmployeeDetails(id, firstName, lastName);
return "Sent employee details to consumer";
}5.5. 啓用 Confluent Schema Registry 和綁定
為了使我們的應用程序應用 Kafka 和 Schema Registry 綁定,我們需要在其中一個配置類上添加 <em @EnableBinding</em> 和 <em @EnableSchemaRegistryClient</em> 註解。
@SpringBootApplication
@EnableBinding(Processor.class)
// The @EnableSchemaRegistryClient annotation needs to be uncommented to use the Spring native method.
// @EnableSchemaRegistryClient
public class AvroKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(AvroKafkaApplication.class, args);
}
}我們應該提供一個 ConfluentSchemaRegistryClient Bean:
@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}")
private String endPoint;
@Bean
public SchemaRegistryClient schemaRegistryClient() {
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endPoint);
return client;
}endPoint 是 Confluent Schema Registry 的 URL。
5.6. 測試我們的服務
讓我們通過 POST 請求測試該服務:
curl -X POST localhost:8080/employees/1001/Harry/Potter日誌表明此操作已成功。
2019-06-11 18:45:45.343 INFO 17036 --- [container-0-C-1] com.baeldung.consumer.AvroConsumer : Let's process employee details: {"id": 1001, "firstName": "Harry", "lastName": "Potter"}5.7. 處理過程中發生了什麼?
讓我們來了解一下在我們的示例應用程序中究竟發生了什麼:
- 生產者使用 Employee 對象構建 Kafka 消息。
- 生產者將員工模式註冊到模式註冊表中獲取模式版本 ID,這要麼創建一個新的 ID,要麼重用該確切模式的現有 ID。
- Avro 使用模式序列化了 Employee 對象。
- Spring Cloud 將模式 ID 放入消息頭中。
- 消息已發佈到主題上。
- 當消息到達消費者時,它從頭中讀取模式 ID。
- 消費者使用模式 ID 從註冊表中獲取 Employee 模式。
- 消費者找到了可以表示該對象的本地類,並將消息反序列化到其中。
6. 使用原生 Kafka 庫進行序列化/反序列化
Spring Boot 提供了一些內置的消息轉換器。默認情況下,Spring Boot 使用 Content-Type 標頭來選擇合適的消息轉換器。
在我們的示例中,Content-Type 是 application/*+avro, 因此它使用了 AvroSchemaMessageConverter 來讀取和寫入 Avro 格式。但是,Confluent 建議使用 KafkaAvroSerializer 和 KafkaAvroDeserializer 進行消息轉換。
雖然 Spring 自己的格式在某些情況下表現良好,但它在分區方面存在一些缺點,並且與 Confluent 標準不兼容,這可能會影響 Kafka 實例上的一些非 Spring 服務。
讓我們更新我們的 application.yml 以使用 Confluent 轉換器:
spring:
cloud:
stream:
default:
producer:
useNativeEncoding: true
consumer:
useNativeEncoding: true
bindings:
input:
destination: employee-details
content-type: application/*+avro
output:
destination: employee-details
content-type: application/*+avro
kafka:
binder:
producer-properties:
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: http://localhost:8081
consumer-properties:
key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081
specific.avro.reader: true
我們已啓用 useNativeEncoding。它強制 Spring Cloud Stream 將序列化委託給提供的類。
我們還應該瞭解如何在 Spring Cloud 中使用 kafka.binder.producer-properties 和 kafka.binder.consumer-properties 來提供原生設置屬性。
7. 消費者組和分區
消費者組是屬於同一應用程序的消費者集合。同一消費者組的消費者共享相同的組名。
請更新 application.yml 以添加消費者組名:
spring:
cloud:
stream:
// ...
bindings:
input:
destination: employee-details
content-type: application/*+avro
group: group-1
// ...所有消費者會均勻地將主題分區分配給它們。不同分區的消息將並行處理。
**在消費者組中,同時讀取消息的最大消費者數量等於分區數量。** 因此,我們可以配置分區和消費者的數量以獲得所需的並行性。通常,我們應該比所有服務的副本上的消費者的總數還要多分區。
7.1. 分區鍵 (Partition Key)
當處理我們的消息時,消息的處理順序可能很重要。當我們的消息並行處理時,很難控制處理的順序。
Kafka 提供了一個規則,即 在給定的分區內,消息始終按照它們到達的順序進行處理。因此,當某些消息需要按照正確的順序處理時,我們確保它們都落入同一個分區。
我們可以發送消息到主題時提供分區鍵。 具有相同分區鍵的消息將始終進入同一個分區。如果未指定分區鍵,消息將採用輪詢方式進行分區。
讓我們通過一個例子來理解這一點。 假設我們接收到多個員工的消息,並且我們希望按照正確的順序處理所有員工的消息。部門名稱和員工 ID 可以唯一標識一個員工。
因此,讓我們使用員工 ID 和部門名稱定義分區鍵:
{
"type": "record",
"name": "EmployeeKey",
"namespace": "com.baeldung.schema",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "departmentName",
"type": "string"
}]
}在構建項目後,EmployeeKey POJO 將在 com.baeldung.schema 包下生成。
讓我們更新生產者,使用 EmployeeKey 作為分區鍵:
public void produceEmployeeDetails(int empId, String firstName, String lastName) {
// creating employee details
Employee employee = new Employee();
employee.setId(empId);
// ...
// creating partition key for kafka topic
EmployeeKey employeeKey = new EmployeeKey();
employeeKey.setId(empId);
employeeKey.setDepartmentName("IT");
Message<Employee> message = MessageBuilder.withPayload(employee)
.setHeader(KafkaHeaders.MESSAGE_KEY, employeeKey)
.build();
processor.output()
.send(message);
}在這裏,我們把分區鍵放在消息頭中。
現在,相同的分區將接收具有相同員工 ID 和部門名稱的消息。
7.2. 消費者併發
Spring Cloud Stream 允許我們通過在 application.yml 中設置,為消費者配置併發數:
spring:
cloud:
stream:
// ...
bindings:
input:
destination: employee-details
content-type: application/*+avro
group: group-1
concurrency: 3現在,我們的消費者將併發地從該主題讀取三個消息。換句話説,Spring 將啓動三個不同的線程,獨立地進行消費。
8. 結論
在本文中,我們整合了使用 Apache Kafka、Avro 模式和 Confluent Schema Registry 的生產者和消費者。
我們通過一個應用程序完成了這個整合,但生產者和消費者可以部署在不同的應用程序中,並且可以擁有各自的模式版本,這些版本通過註冊表保持同步。
我們研究了 Spring 中 Avro 和 Schema Registry 客户端的實現,並探討了如何使用 Confluent 標準實現進行序列化和反序列化,以實現互操作性。
最後,我們研究瞭如何分區我們的主題,並確保我們擁有正確的消息鍵,從而實現消息的 safe 並行處理。