使用 CompletableFuture 處理多個 REST 調用

REST
Remote
0
03:32 AM · Dec 01 ,2025

1. 引言

在創建軟件能力時,從不同來源檢索數據並將其聚合到響應中是一個日常活動。在微服務中,這些來源通常是外部 REST API。

在本教程中,我們將使用 Java 的 CompletableFuture 以高效的方式從多個外部 REST API 中並行檢索數據。

2. 使用並行調用 REST API 的原因

設想一個場景,我們需要更新對象中的多個字段,每個字段的值來自外部 REST 調用。一種替代方案是依次調用每個 API 來更新每個字段。

但是,等待一個 REST 調用完成後再啓動另一個調用會增加服務的響應時間。例如,如果我們調用兩個每個持續 5 秒的 API,總時間至少為 10 秒,因為第二個調用需要等待第一個完成。

相反,我們可以並行調用所有 API,這樣總時間將等於最慢的 REST 調用時間。例如,一個調用持續 7 秒,另一個持續 5 秒。在這種情況下,我們將等待 7 秒,因為我們已並行處理所有內容,並且必須等待所有結果完成。

因此,並行調用是一種極好的替代方案,可以減少服務的響應時間,使其更具可擴展性並改善用户體驗。

3. 使用 CompletableFuture 進行並行化

Java 中的 CompletableFuture 類是一個方便的工具,用於組合和運行不同的並行任務,並處理單個任務的錯誤。

在以下部分,我們將使用它來組合和運行每個輸入列表中的三個 REST 調用。

3.1. 創建演示應用程序

首先,我們定義目標 POJO 的更新:

public class Purchase {
    String orderDescription;
    String paymentDescription;
    String buyerName;
    String orderId;
    String paymentId;
    String userId;

    // all-arg constructor, getters and setters
}

Purchase 類有三個字段需要更新,每個字段通過一個 ID 查詢一個 REST 調用。

首先,我們創建一個定義 RestTemplate 豆和 REST 調用用於域名的類:

@Component
public class PurchaseRestCallsAsyncExecutor {
    RestTemplate restTemplate;
    static final String BASE_URL = "https://internal-api.com";

    // all-arg constructor
}

現在,我們定義 /orders API 調用:

public String getOrderDescription(String orderId) {
    ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/orders/%s", BASE_URL, orderId),
        String.class);

    return result.getBody();
}

然後,我們定義 /payments API 調用:

public String getPaymentDescription(String paymentId) {
    ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/payments/%s", BASE_URL, paymentId),
        String.class);

    return result.getBody();
}

最後,我們定義 /users API 調用:

public String getUserName(String userId) {
    ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/users/%s", BASE_URL, userId),
        String.class);

    return result.getBody();
}

所有三個方法都使用 getForEntity() 方法來執行 REST 調用,並將結果包裝在 ResponseEntity 對象中。

然後,我們調用 getBody() 來從 REST 調用中獲取響應正文。

3.2. 使用 CompletableFuture 執行多個 REST 調用

現在,我們創建一個用於構建和運行三個 CompletableFuture 的方法:

public void updatePurchase(Purchase purchase) {
    CompletableFuture.allOf(
      CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
        .thenAccept(purchase::setOrderDescription),
      CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
        .thenAccept(purchase::setPaymentDescription),
      CompletableFuture.supplyAsync(() -> getUserName(purchase.getUserId()))
        .thenAccept(purchase::setBuyerName)
    ).join();
}

我們使用了 allOf() 方法來構建我們的 CompletableFuture 的步驟。每個參數是另一個 CompletableFuture 的形式,它構建了另一個 CompletableFuture,它使用 REST 調用和其結果。

為了構建每個並行任務,我們首先使用了 supplyAsync() 方法來提供 Supplier,從中我們檢索數據。然後,我們使用 thenAccept() 來消費 supplyAsync() 從中檢索的結果,並將其設置為 Purchase 類中的相應字段。

allOf() 的末尾,我們剛剛構建了任務,沒有采取任何行動。

最後,我們調用 join() 在末尾運行所有任務並行並收集其結果。 由於 join() 是一個線程阻塞操作,因此我們只在末尾調用它,而不是在每個任務步驟中調用它。 這是為了優化應用程序的性能,通過減少線程阻塞次數。

由於我們沒有向 supplyAsync() 方法提供自定義 ExecutorService,因此所有任務都在相同的執行器中運行。 默認情況下,Java 使用 ForkJoinPool.commonPool()

通常,為了更好地控制線程池參數,最好指定自定義 ExecutorServicesupplyAsync()

3.3. 執行每個列表中的多個 REST 調用

為了將 updatePurchase() 方法應用於集合,我們可以簡單地在 forEach() 循環中調用它:

public void updatePurchases(List<Purchase> purchases) {
    purchases.forEach(this::updatePurchase);
}

我們的 updatePurchases() 方法接收一個 Purchase 的列表,並將先前創建的 updatePurchase() 方法應用於每個元素。

每次調用 updatePurchases() 時,都會運行三個並行任務,如 CompletableFuture 中定義的那樣。 因此,每個購買都有自己的 CompletableFuture 對象,用於在並行 REST 調用中運行。

4. Handling Errors

在分佈式系統中,服務不可用或網絡故障是很常見的。這些故障可能發生在外部 REST API 中,而作為這些 API 的客户端,我們可能無法感知到。例如,如果應用程序已停止,發送到網絡的請求永遠不會完成。

4.1. 使用 優雅地處理錯誤

在 REST 調用執行過程中可能會發生異常。例如,如果 API 服務已停止或我們輸入了無效參數,我們將會收到錯誤。

因此,我們可以使用 方法分別處理每個 REST 調用異常:

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

該方法參數是一個 ,包含上一個任務的結果和異常作為參數。

為了説明,讓我們將 步驟添加到我們 的一個步驟中:

public void updatePurchaseHandlingExceptions(Purchase purchase) {
    CompletableFuture.allOf(
        CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
          .thenAccept(purchase::setPaymentDescription)
          .handle((result, exception) -> {
              if (exception != null) {
                  // handle exception
                  return null;
              }
              return result;
          })
    ).join();
}

在上述示例中, 中獲得一個 類型,該類型由 通過調用 調用的。

然後,它將任何在 操作中拋出的錯誤存儲在 中。因此,我們使用它來檢查是否存在錯誤並正確地在 語句中處理它。

最後, 如果沒有拋出異常,則返回作為參數傳遞的值;否則,返回

4.2. 處理 REST 調用超時

當我們使用 時,我們可以指定與我們在 REST 調用中定義的類似的任務超時。因此,如果任務在指定時間內未完成,Java 將使用 結束任務執行。

要做到這一點,讓我們修改我們 的一個任務,以處理超時:

public void updatePurchaseHandlingExceptions(Purchase purchase) {
    CompletableFuture.allOf(
        CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
          .thenAccept(purchase::setOrderDescription)
          .orTimeout(5, TimeUnit.SECONDS)
          .handle((result, exception) -> {
              if (exception instanceof TimeoutException) {
                  // handle exception
                  return null;
              }
              return result;
          })
    ).join();
}

我們已經在 構造器中添加了 行,如果任務在 秒內未完成,則強制停止任務執行。

我們還在 方法中添加了一個 語句,以單獨處理

添加超時可確保任務始終完成。這對於避免線程無限期地等待操作的結果非常重要,而該操作可能永遠不會完成。 這減少了長時間處於 狀態中的線程數量,並提高了應用程序的健康狀況。

5. 結論

在處理分佈式系統時,一個常見的任務是向不同的API發出REST調用,以構建適當的響應。

在本文中,我們看到了如何使用CompletableFuture 構建一個集合中每個對象的並行REST調用任務。

我們還看到了如何優雅地處理超時和一般異常,使用handle()方法。

發佈 評論

Some HTML is okay.