1. 簡介
本教程將介紹如何使用 Temporal 工作流引擎,以及 Spring Boot 集成庫,該庫是 Temporal 的 Java SDK 的一部分。
2. 快速回顧
Temporal 是一款強大的工作流引擎,重點在於通過確定性執行提供強大的容錯能力。
我們已經在“入門教程”中介紹了 Temporal 的基本概念,在這裏我們將重點介紹其主要功能:
- 基於中心化服務,協調工作流在分佈式工作者實例上執行
- 工作流:定義完成給定業務用例的步驟序列,以及更新和/或查詢其狀態的方法
- 活動:執行與特定步驟相對應的操作
工作流和活動只是開發者使用支持的語言和相應的 SDK 編寫的代碼。
3. Temporal 的 Spring Boot 集成概述
Temporal 的 Spring Boot 集成模塊補充了基礎 Java SDK,為使用 Spring Framework 的項目提供了一些實用功能:
- 使用標準依賴注入機制自動註冊工作流和活動
- 聲明式配置工作流隊列
- 提供一個自動配置的 WorkflowClient 實例,作為應用程序中的普通 Bean 使用
- 只需更改幾個屬性即可輕鬆從本地開發服務器切換到內存測試服務器或生產服務器。
在實踐中,使用此模塊只需向項目的 pom 文件添加一個額外的依賴項。
<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-spring-boot-starter</artifactId>
<version>1.32.0</version>
</dependency>
最新版本的該依賴項可在 Maven Central 上獲取:https://mvnrepository.com/artifact/io.temporal/temporal-spring-boot-starter。
4. 訂單處理示例
在本教程中,我們將使用 Temporal 的 Spring Boot 集成模塊創建一個簡單的訂單處理工作流程。以下是我們將實現的流程的圖形表示,使用 BPMN 符號:
該圖展示了我們必須協調的活動和事件的序列,以完成整個訂單處理工作流實例。
當然,這是一個非常簡化的流程,但它包含可能存在於真實場景中的功能:
- 並行執行:創建發貨和請求付款
- 外部事件處理:付款被接受/拒絕、發貨進度事件
- 處理超時:取貨超時、送達超時
- 流程級別故障處理:處理退貨事件、退款付款
5. 應用結構
該 Order 應用遵循其他 Spring Boot 應用中常見的結構。
在應用的高層,我們有 OrderApplication 類,它僅包含應用的入口點。在其下方,我們有用於不同 Artifact 的子包:
- workflow:Workflow 接口和實現
- activities:Activities 接口和實現
- services:用於執行與活動相關任務的服務
- domain:在整個應用中使用的值對象
- adapter.rest:控制器,用於暴露一個簡單的 REST API,允許客户端提交訂單、向實時實例發送事件以及查詢有關它們的有關信息
- config:Spring 配置類
最後,但並非最不重要,我們需要定義一些屬性,以便自動配置機制可以啓動引擎的環境。這是一個示例 application.yaml 文件,它定義了運行我們示例所需的最小屬性集:
spring:
temporal:
connection:
target: local
workers-auto-discovery:
packages:
- "com.baeldung.temporal.workflows.sboot.order"
所有與Temporal相關的屬性都應使用 spring.temporal 前綴。
在本例中,我們定義的目標連接為 local,它是指向本地運行開發服務器的別名,該服務器以標準端口 7233 接收請求。
workers-auto-discovery 屬性期望一個包名列表,這些包名將遞歸地掃描包含 Temporal 相關注解的類和/或接口。
6. 工作流接口
我們將首先創建 <em >OrderWorkflow</em> 接口,並添加 <em >@WorkflowInterface</em> 註解,以代表我們的訂單業務流程。我們定義了一個 <em >processOrder()</em> 方法,並在其中應用 <em >@WorkflowMethod</em> 註解,以便 Temporal 知道它是工作流的入口點。該方法接受一個 <em >OrderSpec</em> 記錄,該記錄包含有關購買的商品、賬單、運輸和客户信息。
@WorkflowInterface
public interface OrderWorkflow {
@WorkflowMethod
void processOrder(OrderSpec spec);
// ... other methods omitted
}接下來,我們添加了帶有@SignalMethod-註解的方法,這些方法對應於工作流程中期望的消息:
@WorkflowInterface
public interface OrderWorkflow {
// ... other methods omitted
@SignalMethod
void paymentAuthorized(String transactionId, String authorizationId);
@SignalMethod
void paymentDeclined(String transactionId, String cause);
@SignalMethod
void packagePickup(Instant pickupTime);
@SignalMethod
void packageDelivered(Instant pickupTime);
@SignalMethod
void packageReturned(Instant pickupTime);
}請注意,我們使用 @SignalMethod 而不是 @UpdateMethod,因為在我們的情況下,客户端不需要等待這些調用被處理。 這也被稱為“一鍵釋放”模式,與 @UpdateMethod 調用不同,後者會阻止客户端直到方法完成。
為了完成工作流接口,我們將添加 @QueryMethod 方法,這將允許我們觀察實例的內部狀態:
@WorkflowInterface
public interface OrderWorkflow {
// ... other methods omitted
@QueryMethod
Order getOrder();
@QueryMethod
Shipping getShipping();
@QueryMethod
PaymentAuthorization getPayment();
@QueryMethod
RefundRequest getRefund();
}
7. 活動接口
活動接口作為一種外觀(facade),提供方法,這些方法對應於工作流實例在其生命週期中執行的活動。這意味着,通常情況下,我們將有方法與BPMN圖中的活動盒對應,映射關係較為直接。
@ActivityInterface
public interface OrderActivities {
void reserveOrderItems(Order order);
void cancelReservedItems(Order order);
void returnOrderItems(Order order);
void dispatchOrderItems(Order order);
PaymentAuthorization createPaymentRequest(Order order, BillingInfo billingInfo);
RefundRequest createRefundRequest(PaymentAuthorization payment);
Shipping createShipping(Order order);
Shipping updateShipping(Shipping shipping, ShippingStatus status);
}
8. 工作流程實施
移動到實施階段,我們可以看到業務流程有兩個需要我們注意的功能:
- 並行執行:在預訂訂單項後,流程會分支成兩個流,一個創建支付授權請求,另一個創建發貨
- 阻塞在互斥事件上:超時或(互斥)的消息
讓我們來看一下如何實現這些功能,首先從並行執行開始:
@Service
@WorkflowImpl(taskQueues = "ORDERS")
public class OrderWorkflowImpl implements OrderWorkflow {
// ... fields and constructor omitted
@Override
public void processOrder(OrderSpec spec) {
// ... order initialization omitted
activities.reserveOrderItems(spec.order());
// Create a payment request
Async.function(() -> payment = activities.createPaymentRequest(spec.order(), spec.billingInfo()));
// Create a shipping request
shipping = activities.createShipping(spec.order());
// ... workflow logic omitted
}
// ... other methods omitted
}關鍵在於使用 Async.function() 在另一個線程中運行活動。 由於我們只有兩個分支,因此我們在後台執行支付請求,同時繼續在主線程中執行活動。
創建送貨訂單後,必須等待支付。這裏,我們將使用 Workflow.await(),它接受一個返回布爾值的 lambda 表達式:
Workflow.await(() -> payment != null && payment.status() != PaymentStatus.PENDING);
在這種情況下,我們首先必須檢查我們是否已經有支付以及它的狀態。這樣做是出於對 null 檢查,因為 payment 類實例變量將異步設置,因此我們不能假設它已經具有值。
另一個關鍵方面是 lambda 表達式,它們絕對不能有副作用。 這是 Temporal 的要求,與確定性工作流執行原則密切相關,這是正確錯誤恢復的基礎。
9. 信號和查詢方法實現
與主流程方法相比,這些方法相當簡單。以下是處理 paymentAuthorized 消息的信號處理器的實現:
@Override
public void paymentAuthorized(String transactionId, String authorizationId) {
Workflow.await(() -> payment != null);
payment = new PaymentAuthorization(
payment.info(),
PaymentStatus.APPROVED,
payment.orderId(),
transactionId,
authorizationId,
null
);
}
請注意使用 Workflow.await() 以確保工作流已創建初始支付請求。我們需要這個檢查,因為客户端可能會在主工作流方法開始之前調用 signal、update 和 query 方法。
必須意識到,與 signal 和 update 方法不同,query 方法不能使用 Workflow.await() – 這將導致運行時異常! 如果客户端請求不可用的信息,返回值必須能夠清楚地表明這種情況:
@Override
public PaymentAuthorization getPayment() {
return payment;
}
在這裏,該方法如果被在工作流程填充值之前調用,則簡單地返回 null。
10. 活動實施
<em>OrderActivitiesImpl</em> 類實現了 <em>OrderActivities</em> 接口。這裏是真正的核心邏輯所在,因為它的方法負責與工作流程的支持服務進行交互。
例如,讓我們來看 <em>reserveInventoryItems</em> 方法:
@Service
@ActivityImpl(taskQueues = "ORDERS")
public class OrderActivitiesImpl implements OrderActivities {
// ... fields and constructors omitted
@Override
public void reserveOrderItems(Order order) {
for (OrderItem item : order.items()) {
inventoryService.reserveInventory(item.sku(), item.quantity());
}
}
// ... other activities omitted
}該活動會遍歷訂單項並請求inventoryService從可用庫存中預留它們。
11. REST API
該API的主要目的是保護客户端免受 Temporal 特定的知識影響。雖然並非嚴格要求,但為了避免過度依賴供應商,通常採用這種機制。
如果需要,該API可以作為完全獨立的模塊存在。為了簡化,我們將其添加到同一項目中。這是該API的@PostMapping方法,用於創建新的工作流實例:
@RestController
@RequestMapping("/order")
public class OrderApi {
// ...fields and constructor omitted
@PostMapping
public ResponseEntity<OrderCreationResponse> createOrder(@RequestBody OrderSpec orderSpec) {
var execution = orderService.createOrderWorkflow(orderSpec);
var location = UriComponentsBuilder.fromUriString("/order/{orderExecutionId}").build(execution);
return ResponseEntity.created(location).body(new OrderCreationResponse(execution));
}
// ... other endpoint methods omitted
}OrderService 是WorkflowClient(Temporal 提供的一個 Bean)的薄層,它提取了一些樣板代碼:
@Service
public class OrderService {
private final WorkflowClient workflowClient;
public OrderService(WorkflowClient workflowClient) {
this.workflowClient = workflowClient;
}
public OrderWorkflow getWorkflow(String orderExecutionId) {
return workflowClient.newWorkflowStub(OrderWorkflow.class, orderExecutionId);
}
public String createOrderWorkflow(OrderSpec orderSpec) {
var uuid = UUID.randomUUID();
var wf = workflowClient.newWorkflowStub(
OrderWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue("ORDERS")
.setWorkflowId(uuid.toString()).build());
var execution = WorkflowClient.start(wf::processOrder, orderSpec);
return execution.getWorkflowId();
}
}12. 測試
工作流應用需要測試,以模擬與調用服務及其相關故障模式之間的複雜交互。 在這種情況下,儘管單元測試仍然對驗證系統特定部分有價值,但通常會補充集成測試。
一種組織這些測試的方式是創建“暢通路徑”及其相關替代路徑的場景。 重點應在於覆蓋給定工作流實例所能採取的所有可能路徑,這意味着在實踐中,我們應力求實現主要工作流方法的 100% 代碼覆蓋率。
這個示例 展示了“暢通路徑”測試用例的實現。 值得注意的點:
- 測試使用內存中的 Temporal 測試服務器。 要啓用它,我們已將 spring.temporal.test-server.enabled 屬性設置為 true
- 所有測試都由對 REST API 的調用驅動,由 @SpringBootTest 註解創建
- 為了模擬支付網關的響應,我們需要內部對工作流的支付請求數據。 請注意,由於此信息可能在工作流邏輯創建它時才可用,因此我們以循環方式查詢此信息。
當使用嵌入式測試服務器時,一個非常有用的功能是跳過時間的選項。 這對於測試具有花費數小時甚至數天的活動的工作流至關重要。
要使用此功能,我們使用 sleep() 方法,該方法可在 TestWorkflowEnvironment 服務中提供,該服務可用於 Spring 測試中的注入。 這樣,我們就可以使用此方法來模擬 Order 工作流中的提貨超時:
@Test
public void whenPickupTimeout_thenItemsReturnToStock() {
// ... order creation steps (omitted)
// Fast-forward 1 day to force a the delivery timeout
testEnv.sleep(Duration.ofDays(1));
// Wait until the workflow completes
testEnv.getWorkflowClient().newUntypedWorkflowStub(orderExecutionId).getResult(Void.class);
// ... Check for order cancelled and itens returned to stock (omitted)
}當測試代碼調用 sleep() 時,引擎會將其內部時鐘推進指定的時長,從而確保任何待完成的 await 調用都能相應地完成。
13. 結論
在本文中,我們介紹瞭如何使用 Temporal 工作流引擎與 Spring Boot 結合使用。以訂單履行為例,我們展示瞭如何設置項目並利用自動配置支持以實現簡化設置。
我們還更詳細地介紹了實現真實世界工作流所需的技巧,包括並行執行、消息和超時。
如往常一樣,所有代碼均可在 GitHub 上獲取。