知識庫 / REST RSS 訂閱

使用 CompletableFuture 構建多個 REST 調用

REST
HongKong
5
03:35 AM · Dec 06 ,2025

1. 引言

在創建軟件能力時,從不同來源檢索數據並將其聚合到響應中是一項日常活動。在微服務中,這些來源通常是外部 REST API。

在本教程中,我們將使用 Java 的 <em >CompletableFuture</em> 以高效的方式從多個外部 REST API 中並行檢索數據。

2. 使用並行調用 REST API 的原因

設想一個場景,我們需要更新對象中的多個字段,每個字段的值都來自外部 REST 調用。一種替代方案是依次調用每個 API 來更新每個字段。

然而,等待一個 REST 調用完成後再啓動另一個調用會增加服務的響應時間。例如,如果我們調用兩個每個花費 5 秒的 API,總時間至少為 10 秒,因為第二個調用需要等待第一個完成。

相反,我們可以並行調用所有 API,這樣總時間將等於最慢的 REST 調用時間。例如,一個調用花費 7 秒,另一個花費 5 秒。在這種情況下,我們將等待 7 秒,因為我們已並行處理所有內容,必須等待所有結果完成。

因此,並行調用是一種極佳的替代方案,可以減少服務的響應時間,使其更具可擴展性並改善用户體驗。

3. 使用 CompletableFuture 進行並行處理

Java 中的 CompletableFuture 類是一個方便的工具,用於組合和運行不同的並行任務,並處理單個任務的錯誤。

在後面的章節中,我們將使用它來組合和運行輸入列表中每個對象的三條 REST 調用。

3.1. 創建演示應用程序

首先,讓我們定義我們的目標POJO用於更新:

public class Purchase {
    String orderDescription;
    String paymentDescription;
    String buyerName;
    String orderId;
    String paymentId;
    String userId;

    // all-arg constructor, getters and setters
}

Purchase 類包含三個應更新的字段,每個字段通過不同的 REST 調用,並根據 ID 進行查詢。

首先,我們創建一個類,定義一個 RestTemplate Bean 以及用於 REST 調用域 URL:

@Component
public class PurchaseRestCallsAsyncExecutor {
    RestTemplate restTemplate;
    static final String BASE_URL = "https://internal-api.com";

    // all-arg constructor
}

現在,我們來定義 /訂單 API 調用:

public String getOrderDescription(String orderId) {
    ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/orders/%s", BASE_URL, orderId),
        String.class);

    return result.getBody();
}

然後,讓我們定義支付 API 調用:

public String getPaymentDescription(String paymentId) {
    ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/payments/%s", BASE_URL, paymentId),
        String.class);

    return result.getBody();
}
<p>最後,我們定義了 <code>/users</code> API 調用:</p>
public String getUserName(String userId) {
    ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/users/%s", BASE_URL, userId),
        String.class);

    return result.getBody();
}

這三種方法都使用 getForEntity() 方法來發起 REST 調用,並將結果封裝在 ResponseEntity 對象中。

然後,我們調用 getBody() 方法來獲取 REST 調用中的響應體。

3.2. 使用 CompletableFuture 併發執行多個 REST 調用

現在,讓我們創建一個方法,該方法構建並運行三個 CompletableFuture

public void updatePurchase(Purchase purchase) {
    CompletableFuture.allOf(
      CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
        .thenAccept(purchase::setOrderDescription),
      CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
        .thenAccept(purchase::setPaymentDescription),
      CompletableFuture.supplyAsync(() -> getUserName(purchase.getUserId()))
        .thenAccept(purchase::setBuyerName)
    ).join();
}

我們使用了 <em allOf()</em> 方法來構建我們 <em CompletableFuture</em> 的步驟。每個參數都是以 REST 調用及其結果為內容的另一個 <em CompletableFuture</em> 並行任務。

為了構建每個並行任務,我們首先使用 <em supplyAsync()</em> 方法提供一個 <em Supplier</em>,從中檢索我們的數據。然後,我們使用 <em thenAccept()</em> 來消費 <em supplyAsync()</em> 的結果並將其設置到 <em Purchase</em> 類中的相應字段中。

<em allOf()</em> 的結束時,我們已經構建了這些任務。沒有采取任何操作。

最後,我們調用 <em join()</em> 以並行運行所有任務並收集其結果。<strong class="note">由於<em join()是一個線程阻塞操作,因此我們只在最後一次調用,而不是在每個任務步驟中調用它。這樣做是為了優化應用程序性能,減少線程阻塞。</strong>

由於我們沒有將自定義 <em ExecutorService</em> 提供給 <em supplyAsync()</em> 方法,因此所有任務都在相同的執行器中運行。Java 默認使用 <em ForkJoinPool.commonPool()</em>

通常,為了更好地控制線程池參數,最好為 <em supplyAsync()</em> 指定自定義 <em ExecutorService</em>

3.3. 為列表中的每個元素執行多個 REST 調用

為了將我們的 updatePurchase() 方法應用於一個集合,我們只需在一個 forEach() 循環中調用它:

public void updatePurchases(List<Purchase> purchases) {
    purchases.forEach(this::updatePurchase);
}

我們的 updatePurchases() 方法接收一個 Purchase 對象的列表,並對每個元素應用先前創建的 updatePurchase() 方法。

每次調用 updatePurchases() 方法都會運行三個並行任務,正如我們在 CompletableFuture 中所定義的那樣。因此,每個購買項目都有自己的 CompletableFuture 對象,用於運行這三個並行 REST 調用。

4. 處理錯誤

在分佈式系統中,服務不可用或網絡故障是很常見的現象。這些故障可能發生在外部 REST API 中,作為我們這些 API 的客户端而無法察覺。例如,如果應用程序已停止運行,發送到網絡的請求永遠不會完成。

4.1. 優雅地處理錯誤,使用 handle() 方法

在 REST 調用執行過程中,可能會發生異常。例如,如果 API 服務不可用或我們輸入了無效參數,我們將會收到錯誤。

因此,我們可以使用 handle() 方法分別處理每個 REST 調用異常:參考

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

方法參數是一個BiFunction,包含前一個任務的結果和異常作為參數。

為了説明,我們來將handle()步驟添加到我們的一系列CompletableFuture中的步驟中:

public void updatePurchaseHandlingExceptions(Purchase purchase) {
    CompletableFuture.allOf(
        CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
          .thenAccept(purchase::setPaymentDescription)
          .handle((result, exception) -> {
              if (exception != null) {
                  // handle exception
                  return null;
              }
              return result;
          })
    ).join();
}

在示例中,handle() 從 setPaymentDescription() 調用中獲得 Void 類型,由 thenAccept() 觸發。

然後,它存儲在 exception 中,任何在 thenAccept() 動作中拋出的錯誤都會被存儲。因此,我們使用它來檢查是否有錯誤,並在 if 語句中正確處理該錯誤。

最後,handle() 如果沒有拋出異常,則返回傳遞給它作為參數的值;否則,它返回 null

4.2. 處理 REST 調用超時

當使用 CompletableFuture 時,我們可以指定與我們在 REST 調用中定義的類似的任務超時時間。因此,如果任務在指定時間內未完成,Java 將使用 TimeoutException 完成任務執行。

要做到這一點,讓我們修改一個 CompletableFuture 的任務以處理超時:

public void updatePurchaseHandlingExceptions(Purchase purchase) {
    CompletableFuture.allOf(
        CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
          .thenAccept(purchase::setOrderDescription)
          .orTimeout(5, TimeUnit.SECONDS)
          .handle((result, exception) -> {
              if (exception instanceof TimeoutException) {
                  // handle exception
                  return null;
              }
              return result;
          })
    ).join();
}

我們已將 orTimeout() 行添加到我們的 CompletableFuture 構建器中,以便在任務未在 5 秒內完成時立即停止任務執行。

我們還已在 handle() 方法中添加了一個 if 語句,以單獨處理 TimeoutException

添加對 CompletableFuture 的超時機制,可確保任務始終完成。這對於避免線程無限期地掛起,等待操作結果(如果結果可能永遠不會完成)至關重要。因此,它減少了長時間處於 RUNNING 狀態的線程數量,並提高應用程序的健康狀況。

5. 結論

當處理分佈式系統時,一個常見的任務是向不同的API發出REST調用,以構建適當的響應。

在本文中,我們看到了如何使用CompletableFuture 構建一個集合中每個對象的並行REST調用任務。

我們還看到了如何優雅地使用handle() 方法處理超時和一般異常。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.