1. 引言
在分佈式系統中使用外部 Web 依賴項並保持低延遲是一個關鍵任務。
在本教程中,我們將使用 OpenFeign 和 CompletableFuture 並行化多個 HTTP 請求,處理錯誤,並設置網絡和線程超時時間。
2. 設置演示應用程序
為了説明並行請求的使用方法,我們將創建一個功能,允許客户在網站上購買商品。首先,服務會向系統發送一個請求,以獲取根據客户所在國家/地區獲取可用的支付方式。 其次,它會向客户生成一份關於購買的報告,該報告不包含支付方式的信息。
因此,讓我們首先添加依賴項,以便與 spring-cloud-starter-openfeign 配合使用:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>3. 創建外部依賴客户端
現在,讓我們使用帶有 @FeignClient 註解,創建兩個客户端,它們分別指向 localhost:8083:
@FeignClient(name = "paymentMethodClient", url = "http://localhost:8083")
public interface PaymentMethodClient {
@RequestMapping(method = RequestMethod.GET, value = "/payment_methods")
String getAvailablePaymentMethods(@RequestParam(name = "site_id") String siteId);
}我們的第一個客户端名為 paymentMethodClient。它通過調用 GET /payment_methods 來獲取可用的支付方法,並使用 site_id 請求參數表示客户的國家/地區。
現在,讓我們看看我們的第二個客户端:
@FeignClient(name = "reportClient", url = "http://localhost:8083")
public interface ReportClient {
@RequestMapping(method = RequestMethod.POST, value = "/reports")
void sendReport(@RequestBody String reportRequest);
}我們將其命名為 reportClient,它通過調用 POST /reports 接口生成採購報告。
4. 創建並行請求執行器
依次調用這兩個客户端即可滿足演示應用程序的要求。在這種情況下,該API的總響應時間至少等於兩個請求的響應時間之和。
值得注意的是,報告中不包含支付方法信息,因此這兩個請求是獨立的。因此,我們可以並行執行任務,從而將我們的API的總響應時間降低到與最慢請求的響應時間大致相同。
在下一部分,我們將看到如何創建HTTP調用並行執行器以及處理外部錯誤。
4.1. 創建並行執行器
因此,讓我們使用 CompletableFuture 創建一個並行化兩個請求的服務:
@Service
public class PurchaseService {
private final PaymentMethodClient paymentMethodClient;
private final ReportClient reportClient;
// all-arg constructor
public String executePurchase(String siteId) throws ExecutionException, InterruptedException {
CompletableFuture<String> paymentMethodsFuture = CompletableFuture.supplyAsync(() ->
paymentMethodClient.getAvailablePaymentMethods(siteId));
CompletableFuture.runAsync(() -> reportClient.sendReport("Purchase Order Report"));
return String.format("Purchase executed with payment method %s", paymentMethodsFuture.get());
}
}executePurchase() 方法首先向獲取可用支付方式發出一個並行任務,使用supplyAsync()。然後,我們提交另一個並行任務來生成報告,使用runAsync()。最後,我們使用get()檢索支付方法結果,並返回完整的結果。
supplyAsync() 和 runAsync() 這兩個任務的選擇是因為這兩種方法具有不同的特性。supplyAsync() 方法返回 GET 調用結果。另一方面,runAsync() 不返回任何內容,因此更適合生成報告。
另一個差異在於,runAsync() 在我們調用代碼時會立即啓動一個新的線程,而無需線程池進行任務調度。相比之下,supplyAsync() 任務可能會根據線程池中是否有其他任務進行調度或延遲。
為了驗證我們的代碼,讓我們使用 WireMock 進行集成測試:
@BeforeEach
public void startWireMockServer() {
wireMockServer = new WireMockServer(8083);
configureFor("localhost", 8083);
wireMockServer.start();
stubFor(post(urlEqualTo("/reports"))
.willReturn(aResponse().withStatus(HttpStatus.OK.value())));
}
@AfterEach
public void stopWireMockServer() {
wireMockServer.stop();
}
@Test
void givenRestCalls_whenBothReturnsOk_thenReturnCorrectResult() throws ExecutionException, InterruptedException {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withStatus(HttpStatus.OK.value()).withBody("credit_card")));
String result = purchaseService.executePurchase("BR");
assertNotNull(result);
assertEquals("Purchase executed with payment method credit_card", result);
}在上述測試中,我們首先配置了一個 WireMockServer,使其在 localhost:8083 上啓動,並在完成時使用 @BeforeEach 和 @AfterEach 註解關閉。
然後,在測試場景方法中,我們使用了兩個樁,它們在調用兩個Feign客户端時返回一個 200 HTTP狀態碼。最後,我們使用 assertEquals() 驗證了並行執行器的正確結果。
4.2. 使用 exceptionally() 處理外部 API 錯誤
如果 GET 請求 /payment_methods 失敗並返回 404 HTTP 狀態碼,表明該國家/地區沒有可用的支付方式,那麼 採取一些措施是有幫助的,例如返回默認值。
要處理 CompletableFuture 中的錯誤,請將以下 exceptionally() 代碼塊添加到我們的 paymentMethodsFuture 中:
CompletableFuture <String> paymentMethodsFuture = CompletableFuture.supplyAsync(() -> paymentMethodClient.getAvailablePaymentMethods(siteId))
.exceptionally(ex -> {
if (ex.getCause() instanceof FeignException &&
((FeignException) ex.getCause()).status() == 404) {
return "cash";
});現在,如果返回了 404 錯誤,則返回默認支付方式,即 現金。
@Test
void givenRestCalls_whenPurchaseReturns404_thenReturnDefault() throws ExecutionException, InterruptedException {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withStatus(HttpStatus.NOT_FOUND.value())));
String result = purchaseService.executePurchase("BR");
assertNotNull(result);
assertEquals("Purchase executed with payment method cash", result);
}5. 為並行任務和網絡請求添加超時
當調用外部依賴時,我們無法確定請求執行所需的時間。因此,如果請求執行時間過長,在適當的時候應該放棄該請求。考慮到這一點,我們可以添加兩種類型:一個 FeignClient 和一個 CompletableFuture 超時。
5.1. 為 Feign 客户端添加網絡超時
這種超時機制適用於單次請求在網絡層面的數據傳輸。因此,它會在單個請求的網絡層面上中斷與外部依賴的連接。
我們可以使用 Spring Boot 自動配置來為 FeignClient 配置超時設置:
feign.client.config.paymentMethodClient.readTimeout: 200
feign.client.config.paymentMethodClient.connectTimeout: 100在上述 <em style="font-style: italic;">application.properties</em> 文件中,我們設置了 <em style="font-style: italic;">PaymentMethodClient</em> 的讀取和連接超時時間。數值以毫秒為單位。
連接超時時間指示 feign 客户端在閾值超過後中斷 TCP 手動握針嘗試。同樣,讀取超時會在連接建立成功但無法從套接字讀取數據時中斷請求。
然後,我們可以在我們的並行執行器中 <em style="font-style: italic;">exceptionally()</em> 塊中處理這種錯誤。
if (ex.getCause() instanceof RetryableException) {
// handle TCP timeout
throw new RuntimeException("TCP call network timeout!");
}為了驗證正確行為,我們可以添加另一個測試場景:
@Test
void givenRestCalls_whenPurchaseRequestWebTimeout_thenReturnDefault() {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withFixedDelay(250)));
Throwable error = assertThrows(ExecutionException.class, () -> purchaseService.executePurchase("BR"));
assertEquals("java.lang.RuntimeException: REST call network timeout!", error.getMessage());
}在這裏,我們使用了 方法,延遲時間為 毫秒,以模擬 TCP 超時。
5.2. 添加線程超時
另一方面,線程超時會停止整個 CompletableFuture 的內容,而不僅僅是單個請求嘗試。例如,對於Feign客户端的重試,原始請求的時間和重試嘗試也會在評估超時閾值時被計算。
要配置線程超時,我們可以稍微修改我們的支付方法 CompletableFuture:
CompletableFuture<String> paymentMethodsFuture = CompletableFuture.supplyAsync(() -> paymentMethodClient.getAvailablePaymentMethods(siteId))
.orTimeout(400, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
// exception handlers
});然後,我們可以在 exceptionally()</em/> 塊中處理威脅超時錯誤:
if (ex instanceof TimeoutException) {
// handle thread timeout
throw new RuntimeException("Thread timeout!", ex);
}因此,我們可以驗證它是否正常工作:
@Test
void givenRestCalls_whenPurchaseCompletableFutureTimeout_thenThrowNewException() {
stubFor(get(urlEqualTo("/payment_methods?site_id=BR"))
.willReturn(aResponse().withFixedDelay(450)));
Throwable error = assertThrows(ExecutionException.class, () -> purchaseService.executePurchase("BR"));
assertEquals("java.lang.RuntimeException: Thread timeout!", error.getMessage());
}我們已增加更長的延遲到/payments_method/,使其通過網絡超時閾值測試,但在線程超時測試中失敗。
6. 結論
在本文中,我們學習瞭如何使用 <em >CompletableFuture</em> 和 <em >FeignClient</em> 並行執行兩個外部依賴請求的方法。
我們還看到了如何為程序在時間閾值後中斷添加網絡和線程超時。
最後,我們使用 <em >CompletableFuture.exceptionally()</em> 優雅地處理了 <em >404</em> API 和超時錯誤。