知識庫 / Spring / Spring Boot RSS 訂閱

Spring Boot 和 Dapr 靈活的 Pub/Sub 消息傳遞

Spring Boot
HongKong
4
10:46 AM · Dec 06 ,2025

1. 簡介

本文將介紹 Dapr 是什麼,它如何與 Spring Boot 集成,以及如何創建一個發佈/訂閲系統,而無需與特定的消息代理耦合。 我們將通過一個網約車場景進行演示,其中用户請求行程,而司機則訂閲這些行程請求。 最終,我們將實現無需 Dapr CLI 或外部基礎設施即可運行的測試。

2. 使用 Dapr 實現無廠商依賴的基礎設施

分佈式系統經常伴隨着常見的但複雜的挑戰。我們通常使用混合的供應商特定庫、基礎設施工具和手動集成工作來解決這些問題。

Dapr(分佈式應用程序運行時)提供了一組 API 和 構建模塊,以解決這些挑戰,抽象掉基礎設施,使我們能夠專注於業務邏輯。 這些原則適用於其他問題,例如通過服務調用 API 調用其他服務、通過狀態管理 API 持久化狀態或通過密鑰 API 檢索密鑰。

這種解耦使得應用程序更容易測試、在不同環境中更具可移植性,並且對基礎設施的更改更具彈性。 在本文中,我們將重點介紹 pub/sub API,以實際中説明這些優勢。

2.1. Spring 消息與 Dapr 橋接

Spring Boot 具有強烈的集成模型,尤其是在消息傳遞方面。 許多開發人員已經熟悉 Spring 抽象,例如 <em >KafkaTemplate</em><em >RabbitTemplate</em> 以及它們的監聽器對應項。 雖然這些簡化了消息代理集成,但它們仍然緊密耦合到特定的技術。

與其僅僅作為另一種 API,<a href="https://central.sonatype.com/artifact/io.dapr/dapr-sdk-springboot?smo=true" target="_blank" rel="noopener">dapr-spring-boot-starter</a> 項目提供無縫集成。</strong> 它使用熟悉的名字的接口,例如DaprMessagingTemplate 和 `@Topic。 這些使您能夠輕鬆地利用 Dapr 的分佈式消息傳遞功能,而無需瞭解底層基礎設施的細節。

更具體地説,通過包含 Dapr Spring Boot starter,我們無需包含任何特定的消息代理依賴項。 這 允許我們切換消息代理,而無需修改代碼。 組件級別的配置特定功能也是可以實現的,而無需修改應用程序代碼。 例如,我們可以包含一個 <a href="https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-apache-kafka/">Kafka 專用設置</a> 以利用消費者組等原生功能。

2.2. 在不鎖定基礎設施的前提下擁有基礎設施靈活性

Dapr 將應用程序代碼與基礎設施解耦。例如,無論我們使用 Kafka、RabbitMQ、Redis Streams 還是 Azure Service Bus 作為底層,我們的 Spring Boot 應用程序通過 HTTP 或 gRPC 與 Dapr sidecar 交互,而 Dapr 則負責與實際的代理處理集成。

最重要的是,我們可以無需完整基礎設施即可本地測試,正如我們將使用 Testcontainers 所見。 <a href="https://central.sonatype.com/artifact/io.dapr.spring/dapr-spring-boot-starter-test">dapr-spring-boot-starter-test</a> 模塊在測試生命週期中啓動 Dapr sidecar,從而消除了學習 Dapr CLI 或 Kubernetes 的需求。

3. 設置 Spring Boot 項目

我們將模擬一個網約車應用,以演示 Dapr 如何與 Spring Boot 集成。用户將向我們的 API 端點發送乘車請求,該端點會將消息發佈給訂閲了該消息的司機。司機可以選擇接受乘車請求。

讓我們首先添加所需的依賴項。我們需要 spring-boot-starter-web 用於我們的 REST 端點,以及 dapr-spring-boot-starter 用於 Spring Boot 集成:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>io.dapr.spring</groupId>
    <artifactId>dapr-spring-boot-starter</artifactId>
    <version>1.16.0</version>
</dependency>

為了測試,我們還將添加 dapr-spring-boot-starter-test 以支持 Testcontainers,以及 RabbitMQ 容器作為我們的消息代理:

<dependency>
    <groupId>io.dapr.spring</groupId>
    <artifactId>dapr-spring-boot-starter-test</artifactId>
    <version>1.16.0</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId> 
    <artifactId>rabbitmq</artifactId>
    <version>1.21.3</version>
    <scope>test</scope>
</dependency>

3.1. 創建模型

這個POJO代表一個行程請求:

public class RideRequest {
    private String passengerId;
    private String location;
    private String destination;

    // default getters and setters
}

無需對消息傳遞進行特殊標註。

4. 使用 DaprMessagingTemplate 實現發佈者

DaprMessagingTemplate 類似於 Spring 提供的其他消息模板,但不需要將特定的消息代理作為依賴項。 讓我們首先在 application.properties 中定義我們的消息組件名稱:

dapr.pubsub.name=ride-hailing

然後,我們將使用 DaprPubSubProperties 類來引用該屬性,並將我們的 RideRequest 作為消息類型。 這完成了發送消息所需的配置。

@Configuration
@EnableConfigurationProperties({ DaprPubSubProperties.class })
public class DaprMessagingConfig {

    @Bean
    public DaprMessagingTemplate<RideRequest> messagingTemplate(
      DaprClient client, DaprPubSubProperties config) {
        return new DaprMessagingTemplate<>(client, config.getName(), false);
    }
}

4.1. 接收消息與端點

接下來,我們將創建一個控制器來接收行程請求並使用 Dapr 模板將其轉發到“ride-requests”主題。我們可以將控制器映射到任何我們想要的路徑:

@RestController
@RequestMapping("/passenger")
public class PassengerRestController {

    @Autowired
    private DaprMessagingTemplate<RideRequest> messaging;

    @PostMapping("/request-ride")
    public String requestRide(@RequestBody RideRequest request) {
        messaging.send("ride-requests", request);
        return "waiting for drivers";
    }
}

請注意,我們的消息主體無需任何轉換或配置,因為 Dapr 將自動處理它。

5. 創建和配置訂閲者

在我們的示例中,司機充當訂閲者,接收請求的乘車邀請並決定是否接受。我們將使用 Dapr 的 <em >@Topic</em> 註解將傳入的消息綁定到控制器方法中來實現這一功能。

5.1. 使用 @Topic

當使用 @Topic註解時,必須同時包含組件和主題的名稱。 Dapr sidecar(由測試容器自動處理)會在轉發消息到消息代理時調用該端點。

@RestController
@RequestMapping("driver")
public class DriverRestController {

    // ...

    @PostMapping("ride-request")
    @Topic(pubsubName = "ride-hailing", name = "ride-requests")
    public void onRideRequest(@RequestBody CloudEvent<RideRequest> event) {
        // ...
    }
}

請注意,負載被包裹在一個 CloudEvent 對象中,Dapr 會自動創建這個對象。這對於高級場景,例如基於 CloudEvent 的元數據進行路由或過濾非常有用,但對於基本的 pub/sub 功能來説並非必需。

5.2. 配置訂閲者行為

我們的訂閲者代表一名司機,他可以接受或拒絕行程。為了説明,我們將使用簡單的模式邏輯來確定行程是否可接受。請將以下內容添加到我們的 application.properties 文件中,以便在啓動應用程序時輕鬆更改其值:

driver.acceptance.criteria=East Side

接下來,我們將此值注入到我們的控制器中的一個變量中,同時還包含變量來跟蹤已接受/拒絕的驅動器數量:

int drivesAccepted;
int drivesRejected;

@Value("${driver.acceptance.criteria}")
String criteria;

public int getDrivesAccepted() {
    return drivesAccepted;
}

public int getDrivesRejected() {
    return drivesRejected;
}

我們將使用這些內容來編寫測試,以檢查我們的控制器行為。

5.3. 處理 CloudEvent

最後,我們將從 CloudEvent 中檢索我們的負載,並決定驅動是否可接受:

@Topic(pubsubName = "ride-hailing", name = "ride-requests")
public void onRideRequest(@RequestBody CloudEvent<RideRequest> event) {
    RideRequest request = event.getData();

    if (request.getDestination().contains(criteria)) {
        drivesAccepted++;
    } else {
        drivesRejected++;
        throw new UnsupportedOperationException("drive rejected");
    }
}

由於我們無法直接拒絕消息,因此我們拋出異常以觸發消息的重新排隊。

對於 RabbitMQ,這需要配置 requeueInFailure,我們在創建測試容器時將設置該配置。

6. 使用 Testcontainers 測試發佈器

為了驗證我們的發佈器是否正確發送消息,我們將使用 Testcontainers 編寫集成測試。這允許我們啓動 Dapr sidecar 和 RabbitMQ 實例,而無需依賴外部工具或 Dapr CLI。

6.1. 設置測試配置

對於我們的測試屬性,除了驗收標準外,我們還將包含消息組件名稱和 Dapr 容器的專用服務器端口。

此外,我們需要選擇一個固定的端口,以便我們的組件能夠在同一網絡中相互定位:

driver.acceptance.criteria=East Side
dapr.pubsub.name=ride-hailing
server.port=60601

我們將從設置服務器端口號和指定組件間網絡共享開始進行配置。我們還將包含 DaprPubSubProperties 以稍後獲取 RabbitMQ 組件的名稱:

@TestConfiguration(proxyBeanMethods = false)
@EnableConfigurationProperties({ DaprPubSubProperties.class })
public class DaprTestContainersConfig {

    @Value("${server.port}")
    private int serverPort;

    @Bean
    public Network daprNetwork() {
        return Network.newNetwork();
    }

    // ...
}

6.2. 配置容器

讓我們創建一個 RabbitMQ 容器,使其暴露默認端口 5672:

@Bean
public RabbitMQContainer rabbitMQContainer(Network daprNetwork) {
    return new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.7.25-management-alpine"))
      .withExposedPorts(5672)
      .withNetworkAliases("rabbitmq")
      .withNetwork(daprNetwork);
}

最後,我們將添加一個 Dapr 容器來完成所有內容,並使用 @ServiceConnection 註解來簡化配置:

@Bean
@ServiceConnection
public DaprContainer daprContainer(
  Network daprNetwork, RabbitMQContainer rabbitMQ, DaprPubSubProperties pubSub) {
    Map<String, String> rabbitMqConfig = new HashMap<>();
    rabbitMqConfig.put("connectionString", "amqp://guest:guest@rabbitmq:5672");
    rabbitMqConfig.put("user", "guest");
    rabbitMqConfig.put("password", "guest");
    rabbitMqConfig.put("requeueInFailure", "true");

    return new DaprContainer("daprio/daprd:1.14.4")
      .withAppName("dapr-pubsub")
      .withNetwork(daprNetwork)
      .withComponent(new Component(pubSub.getName(), "pubsub.rabbitmq", "v1", rabbitMqConfig))
      .withAppPort(serverPort)
      .withAppChannelAddress("host.testcontainers.internal")
      .dependsOn(rabbitMQ);
}

除了樣板代碼之外,關鍵配置包括:

  • requeueInFailure:由於我們無法直接對 NACK 消息進行否定確認,因此啓用此選項。當有多個訂閲實例時,它允許其他客户端接收其他客户端拒絕的消息。
  • withComponent(…”pubsub.rabbitmq”):我們使用 RabbitMQ 實現,因此在這裏指定它。Dapr 支持 許多消息代理,包括雲提供商管理的雲 PubSub 服務,如 Google PubSubAmazon SQS/SNSAzure Event Hub
  • withAppChannelAddress:我們包含此選項以啓用容器對 主機訪問到容器。如果沒有它,測試可能會在等待 Dapr 響應時掛起。

我們還可以使用日誌配置啓動 Dapr 容器,從而更輕鬆地進行調試。為此,我們設置withDaprLogLevelwithLogConsumer 選項:

.withDaprLogLevel(DaprLogLevel.INFO) 
.withLogConsumer(outputFrame -> logger.info(outputFrame.getUtf8String()))

6.3. 創建測試應用

現在,我們準備在測試包中創建我們的測試應用:

@SpringBootApplication
public class DaprPublisherTestApp {

    public static void main(String[] args) {
        SpringApplication.from(DaprPublisherApp::main)
          .with(DaprTestContainersConfig.class)
          .run(args);
    }
}

我們將引用我們的主應用程序類,以避免重複配置,例如 DaprMessagingConfig 類。 此外,我們還需要將 DriverRestController 類複製到測試文件夾,用於集成測試。

6.4. 創建集成測試

我們需要引用我們的測試應用、配置以及 <em >DaprAutoConfiguration</em> 類。然後,注入我們的控制器來檢查控制變量,以及 Dapr 容器,以便在我們的應用準備接收消息時獲知狀態:

@SpringBootTest(
  classes = { 
    DaprPublisherTestApp.class, 
    DaprTestContainersConfig.class, 
    DaprAutoConfiguration.class }, 
  webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class DaprPublisherIntegrationTest {

    @Autowired
    DriverRestController controller;

    @Autowired
    DaprContainer daprContainer;

    @Value("${server.port}")
    int serverPort;

    @Value("${driver.acceptance.criteria}")
    String criteria;

    // ...
}

為了驗證我們的容器是否已正確啓動,我們可以等待“應用程序已訂閲以下主題”的消息。 這有助於確保我們的測試僅在容器準備好接受消息時開始。 我們還將定義 API 的基本 URI,以便使用 RestAssured 進行調用:

@BeforeEach
void setUp() {
    RestAssured.baseURI = "http://localhost:" + serverPort;
    org.testcontainers.Testcontainers.exposeHostPorts(serverPort);

    Wait.forLogMessage(".*app is subscribed to the following topics.*", 1)
      .waitUntilReady(daprContainer);
}

我們的首次測試涉及發佈一個符合駕駛員接受標準的要求的驅動請求,並檢查接受的驅動數量。當這個數字增加時,我們可以斷言訂閲者已處理消息:

@Test
void whenDriveAcceptable_thenDrivesAcceptedIncrease() {
    int drivesAccepted = controller.getDrivesAccepted();

    given()
      .contentType(ContentType.JSON)
      .body("""
        {
          "passengerId": "1",
          "location": "Point A",
          "destination": "%s Point B"
        }
      """.formatted(criteria))
    .when()
      .post("/passenger/request-ride")
    .then()
      .statusCode(200);

    await()
      .atMost(Duration.ofSeconds(5))
      .until(controller::getDrivesAccepted, equalTo(drivesAccepted + 1));
}

相反,我們的第二個測試涉及發佈一個驅動程序應拒絕的驅動請求:

@Test
void whenDriveUnacceptable_thenDrivesRejectedIncrease() {
    int drivesRejected = controller.getDrivesRejected();

    given().contentType(ContentType.JSON)
      .body("""
        {
          "passengerId": "2",
          "location": "Point B",
          "destination": "West Side A"
        }
      """)
    .when()
      .post("/passenger/request-ride")
    .then()
      .statusCode(200);

    await()
      .atMost(Duration.ofSeconds(5))
      .until(controller::getDrivesRejected, greaterThan(drivesRejected));
}

本次測試旨在驗證被拒絕的驅動數量是否增加。 此外,由於錯誤時消息會被重新排隊,因此我們驗證變量是否大於其初始值,因為我們無法確定該變量已被處理了多少次。

7. 使用 Testcontainers 測試訂閲者

現在,讓我們測試訂閲者的行為。我們將創建一個與發佈者相似的設置,重點驗證訂閲者如何處理傳入的消息。

7.1. 環境搭建

為了開始,我們將包含類似的測試屬性,僅更改服務器端口:

driver.acceptance.criteria=East Side
dapr.pubsub.name=ride-hailing
server.port=60602

我們將複製 DaprMessagingConfig 類到我們的測試包,以便在集成測試中使用它。 我們還需要將 DaprTestContainersConfig 複製到我們的測試文件夾,因為我們需要相同的容器。

7.2. 創建集成測試

與我們之前的集成測試類似,我們需要在 <em @Setup</em> 中連接容器、控制器、服務器端口、驅動程序接受標準,並等待容器準備就緒。 我們還需要包含 Dapr 消息模板,以便向我們的訂閲者發送消息:

@SpringBootTest(
  classes = { 
    DaprSubscriberTestApp.class, 
    DaprTestContainersConfig.class, 
    DaprMessagingConfig.class, 
    DaprAutoConfiguration.class }, 
  webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class DaprSubscriberIntegrationTest {

    @Autowired
    DaprMessagingTemplate<RideRequest> messaging;

    @Autowired
    DriverRestController controller;

    @Autowired
    DaprContainer daprContainer;

    @Value("${server.port}")
    int serverPort;

    @Value("${driver.acceptance.criteria}")
    String criteria;

    // test setup...
}

7.3. 實現測試場景

我們的第一個測試涉及發送一個可接受的驅動器並檢查我們的控制器是否正確接收到了它:

@Test
void whenDriveAcceptable_thenDrivesAcceptedIncrease() {
    int drivesAccepted = controller.getDrivesAccepted();

    RideRequest ride = new RideRequest(
      "1", "Point A", String.format("%s Point B", criteria));
    messaging.send("ride-requests", ride);

    await().atMost(Duration.ofSeconds(5))
      .until(controller::getDrivesAccepted, equalTo(drivesAccepted + 1));
}

我們的第二個測試包括髮送一個不可接受的驅動器,並檢查我們的控制器是否正確地拒絕它。

@Test
void whenDriveUnacceptable_thenDrivesRejectedIncrease() {
    int drivesRejected = controller.getDrivesRejected();

    RideRequest request = new RideRequest("2", "Point B", "West Side Point A");
    messaging.send("ride-requests", request);

    await().atMost(Duration.ofSeconds(5))
      .until(controller::getDrivesRejected, greaterThan(drivesRejected));
}

通過實施訂閲者測試,我們已驗證 Dapr 正確地將消息從消息中間件路由到我們的 Spring Boot 應用程序,並且訂閲者的行為符合預期。

8. 結論

在本文中,我們使用 Spring Boot 和 Dapr 構建了一個鬆散耦合的 pub/sub 消息系統。通過利用 Dapr 對消息代理的抽象以及與 Spring Boot 的集成,我們簡化了消息邏輯,而無需綁定到特定的基礎設施。我們還展示瞭如何使用 Testcontainers 本地運行和測試整個設置,從而在開發過程中實現了快速的反饋循環。

如往常一樣,源代碼可在 GitHub 上獲取

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.