知識庫 / Spring RSS 訂閱

使用和測試 Axon 應用 via REST 接口

REST,Spring
HongKong
9
03:37 AM · Dec 06 ,2025

1. 概述

Axon 框架幫助我們構建事件驅動的微服務系統。在《Axon 框架指南》中,我們通過一個簡單的 Axon Spring Boot 應用程序,瞭解了 Axon。該應用程序可以創建和更新訂單,並且可以確認和發貨這些訂單。

在《Axon 框架中查詢調度》中,我們向 OrderQueryService 添加了一些額外的查詢。

查詢通常用於 UI 界面,這些界面通常調用 REST 端點。

在本教程中,我們將 創建所有查詢的 REST 端點。我們還將從這些端點中進行集成測試。

2. 使用查詢在 REST 端點

我們可以通過向一個帶有 <em @RestController</em> 註解的類添加函數來添加 REST 端點。我們將使用名為 <em OrderRestEndpoint</em> 的類來實現這一點。 之前我們直接在控制器中使用 <em QueryGateway</em>。 我們將替換注入的 <em QueryGateway</em> 用於 <em OrderQueryService</em>,該服務我們在 Axon Framework 中實現為分發查詢。 這樣,控制器函數的唯一關注點就是將行為綁定到 REST 路徑。

所有端點都列在項目中的 <em order-api.http</em> 文件中。 藉助該文件,我們可以使用 IntelliJ 作為我們的 IDE 調用端點。

2.1. 點對點查詢

點對點查詢僅有一個響應,因此易於實現:

@GetMapping("/all-orders")
public CompletableFuture<List<OrderResponse>> findAllOrders() {
    return orderQueryService.findAllOrders();
}

Spring 等待 CompletableFuture 解決後,會返回 JSON 格式的負載。我們可以通過調用 localhost:8080/all-orders 來測試它,以獲取所有訂單在一個數組中。

在乾淨的設置中,如果首先使用向 http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768http://localhost:8080/ship-order 發送 POST 請求添加兩個訂單,當我們調用 http://localhost:8080/all-orders 時,我們應該看到以下內容:

[
  {
    "orderId": "72d67527-a27c-416e-a904-396ebf222344",
    "products": {
      "Deluxe Chair": 1
    },
    "orderStatus": "SHIPPED"
  },
  {
    "orderId": "666a1661-474d-4046-8b12-8b5896312768",
    "products": {},
    "orderStatus": "CREATED"
  }
]

2.2. 實時查詢

實時查詢將返回一個事件流,並最終關閉。我們可以等待該流關閉後,在完成時發送響應。但是,直接流式傳輸更有效。我們通過利用 Server-Send 事件來實現這一點:

@GetMapping(path = "/all-orders-streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderResponse> allOrdersStreaming() {
    return orderQueryService.allOrdersStreaming();
}

通過添加媒體類型,Spring 瞭解到我們希望以服務器端事件的形式接收響應。這意味着每個訂單都會單獨發送。如果客户端支持服務器端事件,localhost:8080/all-orders-streaming 將會逐個返回所有訂單。

與點對點查詢保持數據庫中的相同項目,將產生如下結果:

data:{"orderId":"72d67527-a27c-416e-a904-396ebf222344","products":{"Deluxe Chair":1},"orderStatus":"SHIPPED"}

data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{},"orderStatus":"CREATED"}

這分別代表兩個獨立的服務器發送事件。

2.3. 散搜查詢。

將 Axon 查詢返回的響應組合的邏輯已經存在於 OrderQueryService 中。這使得 實現與點對點查詢非常相似,因為只有一個響應。例如,使用散搜查詢添加端點:

@GetMapping("/total-shipped/{product-id}")
public Integer totalShipped(@PathVariable("product-id") String productId) {
    return orderQueryService.totalShipped(productId);
}

調用 http://localhost:8080/total-shipped/Deluxe Chair 將返回總共已運送的椅子數量,包括來自 LegacyQueryHandler 的 234 個。如果 ship-order 調用中的一個仍然存在於數據庫中,它應該返回 235。

2.4. 訂閲查詢

與流式查詢不同,訂閲查詢可能永遠不會結束。因此,等待訂閲查詢完成並非一個選項。 我們將再次利用服務器端事件來添加端點:

@GetMapping(path = "/order-updates/{order-id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderResponse> orderUpdates(@PathVariable("order-id") String orderId) {
    return orderQueryService.orderUpdates(orderId);
}

調用 http://localhost:8080/order-updates/666a1661-474d-4046-8b12-8b5896312768 將會提供關於該產品的更新流。 通過向 http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768/product/a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3dd 發送 POST 請求,我們觸發了更新。 該更新以 Server-Send 事件的形式發送。

我們將看到初始狀態和更新後的狀態。 連接保持開放以接收進一步的更新。

data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{},"orderStatus":"CREATED"}

data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{"a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3":1},"orderStatus":"CREATED"}

正如我們所見,此次更新包含了我們添加的產品。

3. 集成測試

對於集成測試,我們將使用 WebClient。

對於這些測試,我們將使用@SpringBootTest,首先通過其他 REST 端點更改狀態。這些其他 REST 端點會觸發一個或多個命令來創建事件。為了創建訂單,我們將使用在《Axon 框架指南》中添加的端點。我們使用 @DirtiesContext 註解,因此一個測試中創建的事件不會影響另一個測試。

我們不運行集成測試期間的 Axon Server,而是將 axon.axonserver.enabled=false 設置在 application.properties 中,位於我們的 src/test/resources 目錄中。 這樣,我們就可以使用非分佈式網關,這些網關運行速度更快,並且不需要 Axon Server。 這些網關處理三種不同類型的消息。

我們可以創建輔助方法以使我們的測試更具可讀性。 這些輔助函數提供正確的類型並設置 HTTP 標頭(如果需要)。 例如:

private void verifyVoidPost(WebClient client, String uri) {
    StepVerifier.create(retrieveResponse(client.post()
      .uri(uri)))
      .verifyComplete();
}

這對於調用具有空返回類型的 post 端點非常有用。它將使用 retrieveResponse() 輔助函數進行調用並驗證其完成情況。 類似的需求經常出現,只需要幾行代碼。 通過將它們放在輔助函數中,我們可以使測試更具可讀性和可維護性。

3.1. 測試點對點查詢

為了測試 /all-orders REST 端點,我們首先創建一個訂單,然後驗證是否可以檢索到創建的訂單。要做到這一點,我們首先需要創建一個 <em >WebClient</em>。WebClient 是一個反應式實例,可用於執行 HTTP 調用。在創建訂單後,我們檢索所有訂單並驗證結果:

WebClient client = WebClient.builder()
  .clientConnector(httpConnector())
  .build();
createRandomNewOrder(client);
StepVerifier.create(retrieveListResponse(client.get()
    .uri("http://localhost:" + port + "/all-orders")))
  .expectNextMatches(list -> 1 == list.size() && list.get(0)
    .getOrderStatus() == OrderStatusResponse.CREATED)
  .verifyComplete();

由於其反應式特性,我們可以使用 StepVerifierreactor-test 來驗證響應。

我們期望列表中只有一個 Order,即我們剛剛創建的那個。 此外,我們期望該 Order 具有 CREATED 訂單狀態。

3.2. 實時查詢測試

實時查詢可能會返回多個訂單。 我們還需要測試流是否完成。 為了進行測試,我們將創建三個新的隨機訂單,然後測試實時查詢的響應:

WebClient client = WebClient.builder()
  .clientConnector(httpConnector())
  .build();
for (int i = 0; i < 3; i++) {
    createRandomNewOrder(client);
}
StepVerifier.create(retrieveStreamingResponse(client.get()
    .uri("http://localhost:" + port + "/all-orders-streaming")))
  .expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
  .expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
  .expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
  .verifyComplete();

在流程結束時調用 verifyComplete() 以確保流程已關閉。需要注意的是,在某種情況下,可以實現流式查詢,使其不完成。在這種情況下,它確實完成,並且需要進行驗證。

3.3. 測試散佈-收集查詢

為了測試散佈-收集查詢,我們需要確保來自多個處理器的結果得到合併。我們使用一個端點發送一把椅子。然後我們檢索所有已發送的椅子。 由於 LegacyQueryHandler 返回椅子數量為 234,因此結果應為 235

WebClient client = WebClient.builder()
  .clientConnector(httpConnector())
  .build();
verifyVoidPost(client, "http://localhost:" + port + "/ship-order");
StepVerifier.create(retrieveIntegerResponse(client.get()
    .uri("http://localhost:" + port + "/total-shipped/Deluxe Chair")))
  .assertNext(r -> assertEquals(235, r))
  .verifyComplete();

retrieveIntegerResponse()輔助函數返回響應體中的整數。

3.4. 訂閲查詢測試

訂閲查詢將一直保持活動狀態,只要我們沒有關閉連接。我們希望測試初始結果和更新。因此,我們使用 ScheduledExecutorService,以便在測試中在多個線程中使用。該服務允許在一個 Thread 中更新訂單,同時在另一個線程中驗證返回的訂單。為了使其更易於閲讀,我們使用不同的方法來執行更新:

private void addIncrementDecrementConfirmAndShipProduct(String orderId, String productId) {
    WebClient client = WebClient.builder()
      .clientConnector(httpConnector())
      .build();
    String base = "http://localhost:" + port + "/order/" + orderId;
    verifyVoidPost(client, base + "/product/" + productId);
    verifyVoidPost(client, base + "/product/" + productId + "/increment");
    // and some more
}

該方法創建並使用自己的 Web 客户端,以避免干擾用於驗證響應的客户端。

實際的測試將從執行器調用該方法,並驗證更新。

//Create two webclients, creating the id's for the test, and create an order.
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(() -> addIncrementDecrementConfirmAndShipProduct(orderId, productId), 1L, TimeUnit.SECONDS);
try {
    StepVerifier.create(retrieveStreamingResponse(receiverClient.get()
      .uri("http://localhost:" + port + "/order-updates/" + orderId)))
      .assertNext(p -> assertTrue(p.getProducts().isEmpty()))
      //Some more assertions.
      .assertNext(p -> assertEquals(OrderStatusResponse.SHIPPED, p.getOrderStatus()))
      .thenCancel()
      .verify();
} finally {
    executor.shutdown();
}

我們應該注意到,在更新發生之前,我們等待一秒鐘以確保我們不會錯過第一次更新。我們使用一個隨機的 UUID 來生成 productId,該值用於更新和驗證結果。每次更改都應該觸發更新。

根據更新後預期狀態,我們添加一個斷言。我們需要調用 thenCancel() 以結束測試,因為訂閲如果沒有它會一直保持打開狀態。使用 finally 塊以確保我們始終關閉執行器。

4. 結論

在本文中,我們學習瞭如何為查詢添加 REST 端點。這些端點可用於構建 UI。

我們還學習瞭如何使用 <em >WebClient</em> 測試這些端點。

對於此主題的任何其他問題,請查看 Discuss AxonIQ

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.