1. 概述
Apache Pulsar 是一個分佈式發佈-訂閲消息系統。雖然 Apache Pulsar 提供的功能與 Apache Kafka 類似,但 Pulsar 旨在克服 Kafka 在高延遲、低吞吐量、擴展和異地複製方面的侷限性等問題。當處理大量需要實時處理的數據時,Apache Pulsar 是一個不錯的選擇。
在本教程中,我們將學習如何將 Apache Pulsar 集成到我們的 Spring Boot 應用程序中。我們將利用 Pulsar Spring Boot Starter 提供的 PulsarTemplate 和 PulsarListener。我們還將根據我們的要求修改它們的默認配置。
2. Maven 依賴
首先,按照《Apache Pulsar 簡介》中描述,運行一個獨立的 Apache Pulsar 服務器。
接下來,將 spring-pulsar-spring-boot-starter 庫添加到我們的項目中:
// 這是一個示例代碼塊,僅用於演示代碼塊的翻譯
// 翻譯的註釋內容
// 這段代碼展示瞭如何使用 spring-pulsar 庫
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-spring-boot-starter</artifactId>
<version>0.2.0</version>
</dependency>3. PulsarClient
為了與 Pulsar 服務器進行交互,我們需要配置一個 PulsarClient。 默認情況下,Spring 會自動配置一個 PulsarClient,該客户端連接到 Pulsar 服務器,地址為 localhost:6650。
spring:
pulsar:
client:
service-url: pulsar://localhost:6650我們可以更改此配置以在不同的地址上建立連接。
要連接到安全的服務器,我們可以使用 pulsar+ssl 代替 pulsar。
此外,我們還可以通過向 application.yml 添加 spring.pulsar.client.* 屬性,配置連接超時、身份驗證和內存限制等屬性。
4. 定義自定義對象 Schema
我們將使用一個簡單的 User 類用於我們的應用程序:
public class User {
private String email;
private String firstName;
// standard constructors, getters and setters
}Spring-Pulsar 會自動檢測基本數據類型並生成相應的 Schema。但是,如果需要使用自定義 JSON 對象,則必須為 PulsarClient 配置其 Schema 信息:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.baeldung.springpulsar.User
schema-info:
schema-type: JSON此處,message-type 屬性接受消息類的完全限定名,schema-type 提供用於使用的模式類型的相關信息。對於複雜的對象,schema-type 屬性可以接受 AVRO 或 JSON 值。
儘管使用屬性文件指定模式是首選方法,但我們也可以通過 Bean 提供該模式:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.JSON(User.class));
}
}此配置應同時添加到生產者和監聽器應用程序中。
5. 發佈者
要將消息發佈到 Pulsar 主題上,我們將使用 PulsarTemplate。 PulsarTemplate 實現 PulsarOperations 接口,並提供以同步和異步形式發佈記錄的方法。 同步 send 方法會阻塞調用以提供同步操作能力,而異步 sendAsync 方法則提供非阻塞異步操作。
在本教程中,我們將使用同步操作來發布記錄。
5.1. 發佈消息
Spring Boot 會自動配置一個可直接使用的 PulsarTemplate,用於將記錄發佈到指定的 Topic。
讓我們創建一個 Producer,用於將 String 消息發佈到隊列:
@Component
public class PulsarProducer {
@Autowired
private PulsarTemplate<String> stringTemplate;
private static final String STRING_TOPIC = "string-topic";
public void sendStringMessageToPulsarTopic(String str) throws PulsarClientException {
stringTemplate.send(STRING_TOPIC, str);
}
}現在,讓我們嘗試將 User 對象發送到一個新的隊列:
@Autowired
private PulsarTemplate<User> template;
private static final String USER_TOPIC = "user-topic";
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.send(USER_TOPIC, user);
}在上述代碼片段中,我們使用了 PulsarTemplate 類來將 User 類的對象發送到名為 user-topic 的 Apache Pulsar 主題。
5.2. 生產者端自定義
PulsarTemplate 接受 TypedMessageBuilderCustomizer 以配置發送消息,並接受 ProducerBuilderCustomizer 以自定義生產者的屬性。
我們可以使用 TypedMessageBuilderCustomizer 配置消息延遲、在特定時間發送、禁用複製以及提供其他屬性:
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.newMessage(user)
.withMessageCustomizer(mc -> {
mc.deliverAfter(10L, TimeUnit.SECONDS);
})
.send();
}ProducerBuilderCustomizer 可用於添加訪問模式、自定義消息路由以及攔截器,並啓用或禁用分塊和批量處理。
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.newMessage(user)
.withProducerCustomizer(pc -> {
pc.accessMode(ProducerAccessMode.Shared);
})
.send();
}6. 消費者
在將消息發佈到我們的主題後,我們現在將建立對同一主題的監聽器。為了啓用對主題的監聽,我們需要使用 <em >@PulsarListener</em> 註解裝飾監聽器方法。
Spring Boot 會配置所有必要的組件供監聽器方法使用。
我們還需要使用 <em >@EnablePulsar</em> 才能使用 <em >PulsarListener</em>。
6.1. 接收消息
我們將首先為之前創建的 string-topic 創建一個監聽器方法:
@Service
public class PulsarConsumer {
private static final String STRING_TOPIC = "string-topic";
@PulsarListener(
subscriptionName = "string-topic-subscription",
topics = STRING_TOPIC,
subscriptionType = SubscriptionType.Shared
)
public void stringTopicListener(String str) {
LOGGER.info("Received String message: {}", str);
}
}在這裏,在 PulsarListener 註解中,我們配置了該方法將監聽的主題為 topicName,並在 subscriptionName 屬性中指定了訂閲名稱。
現在,讓我們為 user-topic 創建一個監聽器方法,該方法用於 User 類:
private static final String USER_TOPIC = "user-topic";
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
schemaType = SchemaType.JSON
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}除了在先前 Listener 方法中提供的屬性之外,我們還添加了一個 schemaType 屬性,其值與生產者中的值相同。
我們還將向主類添加 @EnablePulsar 註解:
@EnablePulsar
@SpringBootApplication
public class SpringPulsarApplication {
public static void main(String[] args) {
SpringApplication.run(SpringPulsarApplication.class, args);
}
}6.2. 客户端自定義
除了訂閲名稱和模式類型之外,<em >PulsarListener</em> 還可以用於配置諸如自動啓動、批量處理和確認模式等屬性:
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON,
ackMode = AckMode.RECORD,
properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}在這裏,我們已將確認模式設置為 記錄,並已將確認超時設置為 60 秒。
7. 使用死信主題
如果消息確認超時或服務器收到 <em nack</em>>,Pulsar 會嘗試重發消息一定次數。在這些重試耗盡後,這些未送達的消息可以發送到稱為“死信隊列”(DLQ)的隊列中。
此選項僅適用於<em Shared</em>>訂閲類型。要為我們的<em user-topic </em>隊列配置DLQ,我們首先將創建一個<em DeadLetterPolicy</em> Bean,該 Bean 將定義重發應嘗試的次數以及用於作為DLQ使用的隊列的名稱:
private static final String USER_DEAD_LETTER_TOPIC = "user-dead-letter-topic";@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder()
.maxRedeliverCount(10)
.deadLetterTopic(USER_DEAD_LETTER_TOPIC)
.build();
}現在,我們將此策略添加到我們之前創建的 PulsarListener 中:
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON,
deadLetterPolicy = "deadLetterPolicy",
properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}在這裏,我們配置了 userTopicListener 使用我們之前創建的 deadLetterPolicy,並設置了確認時間為 60 秒。
我們可以創建一個單獨的 Listener 來處理 DQL 中的消息:
@PulsarListener(
subscriptionName = "dead-letter-topic-subscription",
topics = USER_DEAD_LETTER_TOPIC,
subscriptionType = SubscriptionType.Shared
)
public void userDlqTopicListener(User user) {
LOGGER.info("Received user object in user-DLQ with email: {}", user.getEmail());
}8. 結論
在本教程中,我們學習瞭如何使用 Apache Pulsar 與我們的 Spring Boot 應用程序以及一些更改默認配置的方法。