1. 引言
在創建軟件能力時,從不同來源檢索數據並將其聚合到響應中是一項日常活動。在微服務中,這些來源通常是外部 REST API。
在本教程中,我們將使用 Java 的 <em >CompletableFuture</em> 以高效的方式從多個外部 REST API 中並行檢索數據。
2. 使用並行調用 REST API 的原因
設想一個場景,我們需要更新對象中的多個字段,每個字段的值都來自外部 REST 調用。一種替代方案是依次調用每個 API 來更新每個字段。
然而,等待一個 REST 調用完成後再啓動另一個調用會增加服務的響應時間。例如,如果我們調用兩個每個花費 5 秒的 API,總時間至少為 10 秒,因為第二個調用需要等待第一個完成。
相反,我們可以並行調用所有 API,這樣總時間將等於最慢的 REST 調用時間。例如,一個調用花費 7 秒,另一個花費 5 秒。在這種情況下,我們將等待 7 秒,因為我們已並行處理所有內容,必須等待所有結果完成。
因此,並行調用是一種極佳的替代方案,可以減少服務的響應時間,使其更具可擴展性並改善用户體驗。
3. 使用 CompletableFuture 進行並行處理
Java 中的 CompletableFuture 類是一個方便的工具,用於組合和運行不同的並行任務,並處理單個任務的錯誤。
在後面的章節中,我們將使用它來組合和運行輸入列表中每個對象的三條 REST 調用。
3.1. 創建演示應用程序
首先,讓我們定義我們的目標POJO用於更新:
public class Purchase {
String orderDescription;
String paymentDescription;
String buyerName;
String orderId;
String paymentId;
String userId;
// all-arg constructor, getters and setters
}該 Purchase 類包含三個應更新的字段,每個字段通過不同的 REST 調用,並根據 ID 進行查詢。
首先,我們創建一個類,定義一個 RestTemplate Bean 以及用於 REST 調用域 URL:
@Component
public class PurchaseRestCallsAsyncExecutor {
RestTemplate restTemplate;
static final String BASE_URL = "https://internal-api.com";
// all-arg constructor
}現在,我們來定義 /訂單 API 調用:
public String getOrderDescription(String orderId) {
ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/orders/%s", BASE_URL, orderId),
String.class);
return result.getBody();
}然後,讓我們定義支付 API 調用:
public String getPaymentDescription(String paymentId) {
ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/payments/%s", BASE_URL, paymentId),
String.class);
return result.getBody();
}<p>最後,我們定義了 <code>/users</code> API 調用:</p>
public String getUserName(String userId) {
ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/users/%s", BASE_URL, userId),
String.class);
return result.getBody();
}這三種方法都使用 getForEntity() 方法來發起 REST 調用,並將結果封裝在 ResponseEntity 對象中。
然後,我們調用 getBody() 方法來獲取 REST 調用中的響應體。
3.2. 使用 CompletableFuture 併發執行多個 REST 調用
現在,讓我們創建一個方法,該方法構建並運行三個 CompletableFuture:
public void updatePurchase(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
.thenAccept(purchase::setOrderDescription),
CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
.thenAccept(purchase::setPaymentDescription),
CompletableFuture.supplyAsync(() -> getUserName(purchase.getUserId()))
.thenAccept(purchase::setBuyerName)
).join();
}我們使用了 <em allOf()</em> 方法來構建我們 <em CompletableFuture</em> 的步驟。每個參數都是以 REST 調用及其結果為內容的另一個 <em CompletableFuture</em> 並行任務。
為了構建每個並行任務,我們首先使用 <em supplyAsync()</em> 方法提供一個 <em Supplier</em>,從中檢索我們的數據。然後,我們使用 <em thenAccept()</em> 來消費 <em supplyAsync()</em> 的結果並將其設置到 <em Purchase</em> 類中的相應字段中。
在 <em allOf()</em> 的結束時,我們已經構建了這些任務。沒有采取任何操作。
最後,我們調用 <em join()</em> 以並行運行所有任務並收集其結果。<strong class="note">由於<em join()是一個線程阻塞操作,因此我們只在最後一次調用,而不是在每個任務步驟中調用它。這樣做是為了優化應用程序性能,減少線程阻塞。</strong>
由於我們沒有將自定義 <em ExecutorService</em> 提供給 <em supplyAsync()</em> 方法,因此所有任務都在相同的執行器中運行。Java 默認使用 <em ForkJoinPool.commonPool()</em>。
通常,為了更好地控制線程池參數,最好為 <em supplyAsync()</em> 指定自定義 <em ExecutorService</em>。
3.3. 為列表中的每個元素執行多個 REST 調用
為了將我們的 updatePurchase() 方法應用於一個集合,我們只需在一個 forEach() 循環中調用它:
public void updatePurchases(List<Purchase> purchases) {
purchases.forEach(this::updatePurchase);
}我們的 updatePurchases() 方法接收一個 Purchase 對象的列表,並對每個元素應用先前創建的 updatePurchase() 方法。
每次調用 updatePurchases() 方法都會運行三個並行任務,正如我們在 CompletableFuture 中所定義的那樣。因此,每個購買項目都有自己的 CompletableFuture 對象,用於運行這三個並行 REST 調用。
4. 處理錯誤
在分佈式系統中,服務不可用或網絡故障是很常見的現象。這些故障可能發生在外部 REST API 中,作為我們這些 API 的客户端而無法察覺。例如,如果應用程序已停止運行,發送到網絡的請求永遠不會完成。
4.1. 優雅地處理錯誤,使用 handle() 方法
在 REST 調用執行過程中,可能會發生異常。例如,如果 API 服務不可用或我們輸入了無效參數,我們將會收到錯誤。
因此,我們可以使用 handle() 方法分別處理每個 REST 調用異常:參考。
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)方法參數是一個BiFunction,包含前一個任務的結果和異常作為參數。
為了説明,我們來將handle()步驟添加到我們的一系列CompletableFuture中的步驟中:
public void updatePurchaseHandlingExceptions(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
.thenAccept(purchase::setPaymentDescription)
.handle((result, exception) -> {
if (exception != null) {
// handle exception
return null;
}
return result;
})
).join();
}在示例中,handle() 從 setPaymentDescription() 調用中獲得 Void 類型,由 thenAccept() 觸發。
然後,它存儲在 exception 中,任何在 thenAccept() 動作中拋出的錯誤都會被存儲。因此,我們使用它來檢查是否有錯誤,並在 if 語句中正確處理該錯誤。
最後,handle() 如果沒有拋出異常,則返回傳遞給它作為參數的值;否則,它返回 null。
4.2. 處理 REST 調用超時
當使用 CompletableFuture 時,我們可以指定與我們在 REST 調用中定義的類似的任務超時時間。因此,如果任務在指定時間內未完成,Java 將使用 TimeoutException 完成任務執行。
要做到這一點,讓我們修改一個 CompletableFuture 的任務以處理超時:
public void updatePurchaseHandlingExceptions(Purchase purchase) {
CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
.thenAccept(purchase::setOrderDescription)
.orTimeout(5, TimeUnit.SECONDS)
.handle((result, exception) -> {
if (exception instanceof TimeoutException) {
// handle exception
return null;
}
return result;
})
).join();
}我們已將 orTimeout() 行添加到我們的 CompletableFuture 構建器中,以便在任務未在 5 秒內完成時立即停止任務執行。
我們還已在 handle() 方法中添加了一個 if 語句,以單獨處理 TimeoutException。
添加對 CompletableFuture 的超時機制,可確保任務始終完成。這對於避免線程無限期地掛起,等待操作結果(如果結果可能永遠不會完成)至關重要。因此,它減少了長時間處於 RUNNING 狀態的線程數量,並提高應用程序的健康狀況。
5. 結論
當處理分佈式系統時,一個常見的任務是向不同的API發出REST調用,以構建適當的響應。
在本文中,我們看到了如何使用CompletableFuture 構建一個集合中每個對象的並行REST調用任務。
我們還看到了如何優雅地使用handle() 方法處理超時和一般異常。