知識庫 / Spring / Spring Boot RSS 訂閱

使用 Apache Pulsar 和 Spring Boot 入門指南

Spring Boot
HongKong
11
11:30 AM · Dec 06 ,2025

1. 概述

Apache Pulsar 是一個分佈式發佈-訂閲消息系統。雖然 Apache Pulsar 提供的功能與 Apache Kafka 類似,但 Pulsar 旨在克服 Kafka 在高延遲、低吞吐量、擴展和異地複製方面的侷限性等問題。當處理大量需要實時處理的數據時,Apache Pulsar 是一個不錯的選擇。

在本教程中,我們將學習如何將 Apache Pulsar 集成到我們的 Spring Boot 應用程序中。我們將利用 Pulsar Spring Boot Starter 提供的 PulsarTemplatePulsarListener。我們還將根據我們的要求修改它們的默認配置。

2. Maven 依賴

首先,按照《Apache Pulsar 簡介》中描述,運行一個獨立的 Apache Pulsar 服務器。

接下來,將 spring-pulsar-spring-boot-starter 庫添加到我們的項目中:

// 這是一個示例代碼塊,僅用於演示代碼塊的翻譯
// 翻譯的註釋內容
// 這段代碼展示瞭如何使用 spring-pulsar 庫
<dependency>
    <groupId>org.springframework.pulsar</groupId>
    <artifactId>spring-pulsar-spring-boot-starter</artifactId>
    <version>0.2.0</version>
</dependency>

3. PulsarClient

為了與 Pulsar 服務器進行交互,我們需要配置一個 PulsarClient。 默認情況下,Spring 會自動配置一個 PulsarClient,該客户端連接到 Pulsar 服務器,地址為 localhost:6650

spring:
  pulsar:
    client:
      service-url: pulsar://localhost:6650

我們可以更改此配置以在不同的地址上建立連接。

要連接到安全的服務器,我們可以使用 pulsar+ssl 代替 pulsar

此外,我們還可以通過向 application.yml 添加 spring.pulsar.client.* 屬性,配置連接超時、身份驗證和內存限制等屬性。

4. 定義自定義對象 Schema

我們將使用一個簡單的 User 類用於我們的應用程序:

public class User {

    private String email;
    private String firstName;

    // standard constructors, getters and setters
}

Spring-Pulsar 會自動檢測基本數據類型並生成相應的 Schema。但是,如果需要使用自定義 JSON 對象,則必須為 PulsarClient 配置其 Schema 信息

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.baeldung.springpulsar.User
          schema-info:
            schema-type: JSON

此處,message-type 屬性接受消息類的完全限定名,schema-type 提供用於使用的模式類型的相關信息。對於複雜的對象,schema-type 屬性可以接受 AVROJSON 值。

儘管使用屬性文件指定模式是首選方法,但我們也可以通過 Bean 提供該模式:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
    return (schemaResolver) -> {
        schemaResolver.addCustomSchemaMapping(User.class, Schema.JSON(User.class));
    }
}

此配置應同時添加到生產者和監聽器應用程序中。

5. 發佈者

要將消息發佈到 Pulsar 主題上,我們將使用 PulsarTemplatePulsarTemplate 實現 PulsarOperations 接口,並提供以同步和異步形式發佈記錄的方法。 同步 send 方法會阻塞調用以提供同步操作能力,而異步 sendAsync 方法則提供非阻塞異步操作。

在本教程中,我們將使用同步操作來發布記錄。

5.1. 發佈消息

Spring Boot 會自動配置一個可直接使用的 PulsarTemplate,用於將記錄發佈到指定的 Topic。

讓我們創建一個 Producer,用於將 String 消息發佈到隊列:

@Component
public class PulsarProducer {

    @Autowired
    private PulsarTemplate<String> stringTemplate;

    private static final String STRING_TOPIC = "string-topic";

    public void sendStringMessageToPulsarTopic(String str) throws PulsarClientException {
        stringTemplate.send(STRING_TOPIC, str);
    }
}

現在,讓我們嘗試將 User 對象發送到一個新的隊列:

@Autowired
private PulsarTemplate<User> template;

private static final String USER_TOPIC = "user-topic";

public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
    template.send(USER_TOPIC, user);
}

在上述代碼片段中,我們使用了 PulsarTemplate 類來將 User 類的對象發送到名為 user-topic 的 Apache Pulsar 主題。

5.2. 生產者端自定義

PulsarTemplate 接受 TypedMessageBuilderCustomizer 以配置發送消息,並接受 ProducerBuilderCustomizer 以自定義生產者的屬性。

我們可以使用 TypedMessageBuilderCustomizer 配置消息延遲、在特定時間發送、禁用複製以及提供其他屬性:

public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
    template.newMessage(user)
      .withMessageCustomizer(mc -> {
        mc.deliverAfter(10L, TimeUnit.SECONDS);
      })
      .send();
}

ProducerBuilderCustomizer 可用於添加訪問模式、自定義消息路由以及攔截器,並啓用或禁用分塊和批量處理。

public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
    template.newMessage(user)
      .withProducerCustomizer(pc -> {
        pc.accessMode(ProducerAccessMode.Shared);
      })
      .send();
}

6. 消費者

在將消息發佈到我們的主題後,我們現在將建立對同一主題的監聽器。為了啓用對主題的監聽,我們需要使用 <em >@PulsarListener</em> 註解裝飾監聽器方法。

Spring Boot 會配置所有必要的組件供監聽器方法使用。

我們還需要使用 <em >@EnablePulsar</em> 才能使用 <em >PulsarListener</em>

6.1. 接收消息

我們將首先為之前創建的 string-topic 創建一個監聽器方法:

@Service
public class PulsarConsumer {

    private static final String STRING_TOPIC = "string-topic";

    @PulsarListener(
      subscriptionName = "string-topic-subscription",
      topics = STRING_TOPIC,
      subscriptionType = SubscriptionType.Shared
    )
    public void stringTopicListener(String str) {
        LOGGER.info("Received String message: {}", str);
    }
}

在這裏,在 PulsarListener 註解中,我們配置了該方法將監聽的主題為 topicName,並在 subscriptionName 屬性中指定了訂閲名稱。

現在,讓我們為 user-topic 創建一個監聽器方法,該方法用於 User 類:

private static final String USER_TOPIC = "user-topic";

@PulsarListener(
    subscriptionName = "user-topic-subscription",
    topics = USER_TOPIC,
    schemaType = SchemaType.JSON
)
public void userTopicListener(User user) {
    LOGGER.info("Received user object with email: {}", user.getEmail());
}

除了在先前 Listener 方法中提供的屬性之外,我們還添加了一個 schemaType 屬性,其值與生產者中的值相同。

我們還將向主類添加 @EnablePulsar 註解:

@EnablePulsar
@SpringBootApplication
public class SpringPulsarApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringPulsarApplication.class, args);
    }
}

6.2. 客户端自定義

除了訂閲名稱和模式類型之外,<em >PulsarListener</em> 還可以用於配置諸如自動啓動、批量處理和確認模式等屬性:

@PulsarListener(
  subscriptionName = "user-topic-subscription",
  topics = USER_TOPIC,
  subscriptionType = SubscriptionType.Shared,
  schemaType = SchemaType.JSON,
  ackMode = AckMode.RECORD,
  properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
    LOGGER.info("Received user object with email: {}", user.getEmail());
}

在這裏,我們已將確認模式設置為 記錄,並已將確認超時設置為 60 秒。

7. 使用死信主題

如果消息確認超時或服務器收到 <em nack</em>>,Pulsar 會嘗試重發消息一定次數。在這些重試耗盡後,這些未送達的消息可以發送到稱為“死信隊列”(DLQ)的隊列中。

此選項僅適用於<em Shared</em>>訂閲類型。要為我們的<em user-topic&nbsp;</em>隊列配置DLQ,我們首先將創建一個<em DeadLetterPolicy</em> Bean,該 Bean 將定義重發應嘗試的次數以及用於作為DLQ使用的隊列的名稱:

private static final String USER_DEAD_LETTER_TOPIC = "user-dead-letter-topic";
@Bean
DeadLetterPolicy deadLetterPolicy() {
    return DeadLetterPolicy.builder()
      .maxRedeliverCount(10)
      .deadLetterTopic(USER_DEAD_LETTER_TOPIC)
      .build();
}

現在,我們將此策略添加到我們之前創建的 PulsarListener 中:

@PulsarListener(
  subscriptionName = "user-topic-subscription",
  topics = USER_TOPIC,
  subscriptionType = SubscriptionType.Shared,
  schemaType = SchemaType.JSON,
  deadLetterPolicy = "deadLetterPolicy",
  properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
    LOGGER.info("Received user object with email: {}", user.getEmail());
}

在這裏,我們配置了 userTopicListener 使用我們之前創建的 deadLetterPolicy,並設置了確認時間為 60 秒。

我們可以創建一個單獨的 Listener 來處理 DQL 中的消息:

@PulsarListener(
  subscriptionName = "dead-letter-topic-subscription",
  topics = USER_DEAD_LETTER_TOPIC,
  subscriptionType = SubscriptionType.Shared
)
public void userDlqTopicListener(User user) {
    LOGGER.info("Received user object in user-DLQ with email: {}", user.getEmail());
}

8. 結論

在本教程中,我們學習瞭如何使用 Apache Pulsar 與我們的 Spring Boot 應用程序以及一些更改默認配置的方法。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.