1. 概述
Axon 框架幫助我們構建事件驅動的微服務系統。在《Axon 框架指南》中,我們通過一個簡單的 Axon Spring Boot 應用程序,瞭解了 Axon。該應用程序可以創建和更新訂單,並且可以確認和發貨這些訂單。
在《Axon 框架中查詢調度》中,我們向 OrderQueryService 添加了一些額外的查詢。
查詢通常用於 UI 界面,這些界面通常調用 REST 端點。
在本教程中,我們將 創建所有查詢的 REST 端點。我們還將從這些端點中進行集成測試。
2. 使用查詢在 REST 端點
我們可以通過向一個帶有 <em @RestController</em> 註解的類添加函數來添加 REST 端點。我們將使用名為 <em OrderRestEndpoint</em> 的類來實現這一點。 之前我們直接在控制器中使用 <em QueryGateway</em>。 我們將替換注入的 <em QueryGateway</em> 用於 <em OrderQueryService</em>,該服務我們在 Axon Framework 中實現為分發查詢。 這樣,控制器函數的唯一關注點就是將行為綁定到 REST 路徑。
所有端點都列在項目中的 <em order-api.http</em> 文件中。 藉助該文件,我們可以使用 IntelliJ 作為我們的 IDE 調用端點。
2.1. 點對點查詢
點對點查詢僅有一個響應,因此易於實現:
@GetMapping("/all-orders")
public CompletableFuture<List<OrderResponse>> findAllOrders() {
return orderQueryService.findAllOrders();
}Spring 等待 CompletableFuture 解決後,會返回 JSON 格式的負載。我們可以通過調用 localhost:8080/all-orders 來測試它,以獲取所有訂單在一個數組中。
在乾淨的設置中,如果首先使用向 http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768 和 http://localhost:8080/ship-order 發送 POST 請求添加兩個訂單,當我們調用 http://localhost:8080/all-orders 時,我們應該看到以下內容:
[
{
"orderId": "72d67527-a27c-416e-a904-396ebf222344",
"products": {
"Deluxe Chair": 1
},
"orderStatus": "SHIPPED"
},
{
"orderId": "666a1661-474d-4046-8b12-8b5896312768",
"products": {},
"orderStatus": "CREATED"
}
]2.2. 實時查詢
實時查詢將返回一個事件流,並最終關閉。我們可以等待該流關閉後,在完成時發送響應。但是,直接流式傳輸更有效。我們通過利用 Server-Send 事件來實現這一點:
@GetMapping(path = "/all-orders-streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderResponse> allOrdersStreaming() {
return orderQueryService.allOrdersStreaming();
}通過添加媒體類型,Spring 瞭解到我們希望以服務器端事件的形式接收響應。這意味着每個訂單都會單獨發送。如果客户端支持服務器端事件,localhost:8080/all-orders-streaming 將會逐個返回所有訂單。
與點對點查詢保持數據庫中的相同項目,將產生如下結果:
data:{"orderId":"72d67527-a27c-416e-a904-396ebf222344","products":{"Deluxe Chair":1},"orderStatus":"SHIPPED"}
data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{},"orderStatus":"CREATED"}這分別代表兩個獨立的服務器發送事件。
2.3. 散搜查詢。
將 Axon 查詢返回的響應組合的邏輯已經存在於 OrderQueryService 中。這使得 實現與點對點查詢非常相似,因為只有一個響應。例如,使用散搜查詢添加端點:
@GetMapping("/total-shipped/{product-id}")
public Integer totalShipped(@PathVariable("product-id") String productId) {
return orderQueryService.totalShipped(productId);
}調用 http://localhost:8080/total-shipped/Deluxe Chair 將返回總共已運送的椅子數量,包括來自 LegacyQueryHandler 的 234 個。如果 ship-order 調用中的一個仍然存在於數據庫中,它應該返回 235。
2.4. 訂閲查詢
與流式查詢不同,訂閲查詢可能永遠不會結束。因此,等待訂閲查詢完成並非一個選項。 我們將再次利用服務器端事件來添加端點:
@GetMapping(path = "/order-updates/{order-id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<OrderResponse> orderUpdates(@PathVariable("order-id") String orderId) {
return orderQueryService.orderUpdates(orderId);
}調用 http://localhost:8080/order-updates/666a1661-474d-4046-8b12-8b5896312768 將會提供關於該產品的更新流。 通過向 http://localhost:8080/order/666a1661-474d-4046-8b12-8b5896312768/product/a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3dd 發送 POST 請求,我們觸發了更新。 該更新以 Server-Send 事件的形式發送。
我們將看到初始狀態和更新後的狀態。 連接保持開放以接收進一步的更新。
data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{},"orderStatus":"CREATED"}
data:{"orderId":"666a1661-474d-4046-8b12-8b5896312768","products":{"a6aa01eb-4e38-4dfb-b53b-b5b82961fbf3":1},"orderStatus":"CREATED"}正如我們所見,此次更新包含了我們添加的產品。
3. 集成測試
對於集成測試,我們將使用 WebClient。
對於這些測試,我們將使用@SpringBootTest,首先通過其他 REST 端點更改狀態。這些其他 REST 端點會觸發一個或多個命令來創建事件。為了創建訂單,我們將使用在《Axon 框架指南》中添加的端點。我們使用 @DirtiesContext 註解,因此一個測試中創建的事件不會影響另一個測試。
我們不運行集成測試期間的 Axon Server,而是將 axon.axonserver.enabled=false 設置在 application.properties 中,位於我們的 src/test/resources 目錄中。 這樣,我們就可以使用非分佈式網關,這些網關運行速度更快,並且不需要 Axon Server。 這些網關處理三種不同類型的消息。
我們可以創建輔助方法以使我們的測試更具可讀性。 這些輔助函數提供正確的類型並設置 HTTP 標頭(如果需要)。 例如:
private void verifyVoidPost(WebClient client, String uri) {
StepVerifier.create(retrieveResponse(client.post()
.uri(uri)))
.verifyComplete();
}這對於調用具有空返回類型的 post 端點非常有用。它將使用 retrieveResponse() 輔助函數進行調用並驗證其完成情況。 類似的需求經常出現,只需要幾行代碼。 通過將它們放在輔助函數中,我們可以使測試更具可讀性和可維護性。
3.1. 測試點對點查詢
為了測試 /all-orders REST 端點,我們首先創建一個訂單,然後驗證是否可以檢索到創建的訂單。要做到這一點,我們首先需要創建一個 <em >WebClient</em>。WebClient 是一個反應式實例,可用於執行 HTTP 調用。在創建訂單後,我們檢索所有訂單並驗證結果:
WebClient client = WebClient.builder()
.clientConnector(httpConnector())
.build();
createRandomNewOrder(client);
StepVerifier.create(retrieveListResponse(client.get()
.uri("http://localhost:" + port + "/all-orders")))
.expectNextMatches(list -> 1 == list.size() && list.get(0)
.getOrderStatus() == OrderStatusResponse.CREATED)
.verifyComplete();由於其反應式特性,我們可以使用 StepVerifier 從 reactor-test 來驗證響應。
我們期望列表中只有一個 Order,即我們剛剛創建的那個。 此外,我們期望該 Order 具有 CREATED 訂單狀態。
3.2. 實時查詢測試
實時查詢可能會返回多個訂單。 我們還需要測試流是否完成。 為了進行測試,我們將創建三個新的隨機訂單,然後測試實時查詢的響應:
WebClient client = WebClient.builder()
.clientConnector(httpConnector())
.build();
for (int i = 0; i < 3; i++) {
createRandomNewOrder(client);
}
StepVerifier.create(retrieveStreamingResponse(client.get()
.uri("http://localhost:" + port + "/all-orders-streaming")))
.expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
.expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
.expectNextMatches(o -> o.getOrderStatus() == OrderStatusResponse.CREATED)
.verifyComplete();在流程結束時調用 verifyComplete() 以確保流程已關閉。需要注意的是,在某種情況下,可以實現流式查詢,使其不完成。在這種情況下,它確實完成,並且需要進行驗證。
3.3. 測試散佈-收集查詢
為了測試散佈-收集查詢,我們需要確保來自多個處理器的結果得到合併。我們使用一個端點發送一把椅子。然後我們檢索所有已發送的椅子。 由於 LegacyQueryHandler 返回椅子數量為 234,因此結果應為 235。
WebClient client = WebClient.builder()
.clientConnector(httpConnector())
.build();
verifyVoidPost(client, "http://localhost:" + port + "/ship-order");
StepVerifier.create(retrieveIntegerResponse(client.get()
.uri("http://localhost:" + port + "/total-shipped/Deluxe Chair")))
.assertNext(r -> assertEquals(235, r))
.verifyComplete();retrieveIntegerResponse()輔助函數返回響應體中的整數。
3.4. 訂閲查詢測試
訂閲查詢將一直保持活動狀態,只要我們沒有關閉連接。我們希望測試初始結果和更新。因此,我們使用 ScheduledExecutorService,以便在測試中在多個線程中使用。該服務允許在一個 Thread 中更新訂單,同時在另一個線程中驗證返回的訂單。為了使其更易於閲讀,我們使用不同的方法來執行更新:
private void addIncrementDecrementConfirmAndShipProduct(String orderId, String productId) {
WebClient client = WebClient.builder()
.clientConnector(httpConnector())
.build();
String base = "http://localhost:" + port + "/order/" + orderId;
verifyVoidPost(client, base + "/product/" + productId);
verifyVoidPost(client, base + "/product/" + productId + "/increment");
// and some more
}該方法創建並使用自己的 Web 客户端,以避免干擾用於驗證響應的客户端。
實際的測試將從執行器調用該方法,並驗證更新。
//Create two webclients, creating the id's for the test, and create an order.
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(() -> addIncrementDecrementConfirmAndShipProduct(orderId, productId), 1L, TimeUnit.SECONDS);
try {
StepVerifier.create(retrieveStreamingResponse(receiverClient.get()
.uri("http://localhost:" + port + "/order-updates/" + orderId)))
.assertNext(p -> assertTrue(p.getProducts().isEmpty()))
//Some more assertions.
.assertNext(p -> assertEquals(OrderStatusResponse.SHIPPED, p.getOrderStatus()))
.thenCancel()
.verify();
} finally {
executor.shutdown();
}我們應該注意到,在更新發生之前,我們等待一秒鐘以確保我們不會錯過第一次更新。我們使用一個隨機的 UUID 來生成 productId,該值用於更新和驗證結果。每次更改都應該觸發更新。
根據更新後預期狀態,我們添加一個斷言。我們需要調用 thenCancel() 以結束測試,因為訂閲如果沒有它會一直保持打開狀態。使用 finally 塊以確保我們始終關閉執行器。
4. 結論
在本文中,我們學習瞭如何為查詢添加 REST 端點。這些端點可用於構建 UI。
我們還學習瞭如何使用 <em >WebClient</em> 測試這些端點。
對於此主題的任何其他問題,請查看 Discuss AxonIQ。