異步 HTTP 編程與 Play 框架

REST,Web Services
Remote
0
08:07 AM · Dec 01 ,2025

1. 概述

通常,我們的 Web 服務需要使用其他 Web 服務才能完成任務。在保持低響應時間的同時,處理用户請求可能會變得困難。一個慢速的外部服務會增加我們的響應時間,並導致我們的系統堆積請求,從而消耗更多的資源。這時,採用非阻塞方法就非常有幫助。

在本教程中,我們將向一個 Play Framework 應用程序發送多個異步請求,以調用服務。通過利用 Java 的非阻塞 HTTP 功能,我們將能夠平滑地查詢外部資源,而不會影響我們自己的主邏輯。

在我們的示例中,我們將探索 Play WebService Library

2. Play WebService (WS) 庫

WS 庫是一個功能強大的庫,它使用 Java 的 Action 提供異步 HTTP 調用。

使用該庫,我們的代碼可以發送這些請求,並且不會阻塞。為了處理請求的結果,我們提供一個消費函數,即 Consumer 接口的實現。

這種模式與 JavaScript 中回調、Promisesasync/await 模式的實現有些相似。

讓我們構建一個簡單的 Consumer,用於記錄一些響應數據:

ws.url(url)
  .thenAccept(r -> 
    log.debug("Thread#" + Thread.currentThread().getId() 
      + " 請求完成:響應代碼 = " + r.getStatus() 
      + " | 響應: " + r.getBody() 
      + " | 當前時間:" + System.currentTimeMillis()))

我們的 Consumer 只是在示例中進行記錄。但是,消費者可以執行我們需要的任何操作,例如將結果存儲在數據庫中。

如果我們深入研究該庫的實現,可以觀察到 WS 封裝並配置了 Java 的 AsyncHttpClient,該組件是 JDK 標準的一部分,並且不依賴 Play。

3. 準備一個示例項目

為了實驗框架,我們創建一個單元測試來啓動請求。我們將創建一個骨架 Web 應用程序來回答這些請求,並使用 WS 框架進行 HTTP 請求。

3.1. 骨架 Web 應用程序

首先,我們使用 sbt new 命令創建初始項目:

sbt new playframework/play-java-seed.g8

在新的文件夾中,我們然後編輯 build.sbt 文件並添加 WS 庫依賴項:

libraryDependencies += javaWs

現在,我們可以使用 sbt run 命令啓動服務器:

$ sbt run
...
--- (啓動應用程序,自動刷新已啓用) ---

[info] p.c.s.AkkaHttpServer - 監聽 HTTP 在 /0:0:0:0:0:0:0:0:9000

一旦應用程序啓動,我們可以通過瀏覽 http://localhost:9000 來檢查一切是否正常,這將打開 Play 的歡迎頁面。

3.2. 測試環境

為了測試我們的應用程序,我們將使用單元測試類 HomeControllerTest

首先,我們需要擴展 WithServer,它將提供服務器生命週期:

public class HomeControllerTest extends WithServer {
}

得益於它的父類,此類現在啓動我們的骨架 Web 服務器在測試模式下,並在隨機端口上,在運行測試之前。 WithServer 類在測試完成時還會停止應用程序。

接下來,我們需要提供一個應用程序來運行。

我們可以使用 GuiceGuiceApplicationBuilder 創建它:

@Override
protected Application provideApplication() {
    return new GuiceApplicationBuilder().build();
}

最後,我們使用測試中的端口號設置服務器 URL:

@Override
@Before
public void setup() {
    OptionalInt optHttpsPort = testServer.getRunningHttpsPort();
    if (optHttpsPort.isPresent()) {
        port = optHttpsPort.getAsInt();
        url = "https://localhost:" + port;
    } else {
        port = testServer.getRunningHttpPort()
          .getAsInt();
        url = "http://localhost:" + port;
    }
}

現在,我們準備好編寫測試。 完整的測試框架允許我們專注於編寫測試請求的代碼。

4. 準備 WSRequest

讓我們看看如何發起基本類型的請求,例如 GET 或 POST,以及用於文件上傳的多部分請求。

4.1. 初始化 WSRequest 對象

首先,我們需要獲取一個 WSClient 實例來配置和初始化我們的請求。

在實際應用中,我們可以通過依賴注入獲取配置好的客户端:

@Autowired
WSClient ws;

雖然在測試類中,我們使用 WSTestClient, 可從 Play Test framework 中獲取:

WSClient ws = play.test.WSTestClient.newClient(port);

獲取客户端後,我們可以通過調用 url 方法初始化一個 WSRequest 對象:

ws.url(url)

url 方法已經足夠讓我們發起請求。但是,我們可以通過添加自定義設置來進一步自定義它:

ws.url(url)
  .addHeader("key", "value")
  .addQueryParameter("num", "" + num);

正如我們所見,添加 header 和 query 參數非常簡單。

配置完成後,我們可以調用方法發起請求。

4.2. 通用 GET 請求

要觸發 GET 請求,我們只需調用 get 方法即可:

ws.url(url)
  ...
  .get();

由於這是一個非阻塞代碼,它會啓動請求,然後繼續在函數下一行執行。

get 方法返回一個 CompletionStage 實例,它屬於 CompletableFuture API。

HTTP 調用完成後,這個 stage 會執行幾條指令。它會將響應包裝在一個 WSResponse 對象中。

通常,這個結果會被傳遞到執行鏈的下一階段。在這個例子中,我們沒有提供任何消費函數,所以結果丟失了。

因此,這個請求的類型是“fire-and-forget”。

4.3. 提交表單

提交表單與 get 示例非常相似。

要觸發請求,我們只需調用 post 方法:

ws.url(url)
  ...
  .setContentType("application/x-www-form-urlencoded")
  .post("key1=value1&key2=value2");

post 方法需要傳遞一個 body 作為參數。 它可以是一個簡單的字符串,例如一個文件、一個 JSON 或 XML 文檔、一個 BodyWritable 或一個 Source

4.4. 提交多部分/表單數據

多部分表單需要我們發送輸入字段和附加文件或流的數據。

為了在框架中實現這一點,我們使用 post 方法,並傳入一個 Source

Source<ByteString, ?> file = FileIO.fromPath(Paths.get("hello.txt"));
FilePart<Source<ByteString, ?>> file = 
  new FilePart<>("fileParam", "myfile.txt", "text/plain", file);
DataPart data = new DataPart("key", "value");

ws.url(url)
...
  .post(Source.from(Arrays.asList(file, data)));

雖然這種方法增加了更多的配置,但它仍然與其它類型的請求非常相似。

5. Process the Async Response

Up to this point, we have only triggered fire-and-forget requests, where our code doesn’t do anything with the response data.

Let’s now explore two techniques for processing an asynchronous response.

We can either block the main thread, waiting for a CompletableFuture, or consume asynchronously with a Consumer.

5.1. Process Response by Blocking With CompletableFuture

Even when using an asynchronous framework, we may choose to block our code’s execution and wait for the response.

Using the CompletableFuture API, we just need a few changes in our code to implement this scenario:

WSResponse response = ws.url(url)
  .get()
  .toCompletableFuture()
  .get();

This could be useful, for example, to provide a strong data consistency that we cannot achieve in other ways.

5.2. Process Response Asynchronously

To process an asynchronous response without blocking, we provide a Consumer or Function that is run by the asynchronous framework when the response is available.

For example, let’s add a Consumer to our previous example to log the response:

ws.url(url)
  .addHeader("key", "value")
  .addQueryParameter("num", "" + 1)
  .get()
  .thenAccept(r -> 
    log.debug("Thread#" + Thread.currentThread().getId() 
      + " Request complete: Response code = " + r.getStatus() 
      + " | Response: " + r.getBody() 
      + " | Current Time:" + System.currentTimeMillis()));

We then see the response in the logs:

[debug] c.HomeControllerTest - Thread#30 Request complete: Response code = 200 | Response: {
  "Result" : "ok",
  "Params" : {
    "num" : [ "1" ]
  },
  "Headers" : {
    "accept" : [ "*/*" ],
    "host" : [ "localhost:19001" ],
    "key" : [ "value" ],
    "user-agent" : [ "AHC/2.1" ]
  }
} | Current Time:1579303109613

It’s worth noting that we used thenAccept, which requires a Consumer function since we don’t need to return anything after logging.

When we want the current stage to return something, so that we can use it in the next stage, we need thenApply instead, which takes a Function.

These use the conventions of the standard Java Functional Interfaces.

5.3. Large Response Body

The code we’ve implemented so far is a good solution for small responses and most use cases. However, if we need to process a few hundreds of megabytes of data, we’ll need a better strategy.

We should note: Request methods like get and post load the entire response in memory.

To avoid a possible OutOfMemoryError, we can use Akka Streams to process the response without letting it fill our memory.

For example, we can write its body in a file:

ws.url(url)
  .stream()
  .thenAccept(
    response -> {
        try {
            OutputStream outputStream = Files.newOutputStream(path);
            Sink<ByteString, CompletionStage<Done>> outputWriter =
              Sink.foreach(bytes -> outputStream.write(bytes.toArray()));
            response.getBodyAsSource().runWith(outputWriter, materializer);
        } catch (IOException e) {
            log.error("An error happened while opening the output stream", e);
        }
    });

The stream method returns a CompletionStage where the WSResponse has a getBodyAsStream method that provides a Source<ByteString, ?>.

We can tell the code how to process this type of body by using Akka’s Sink, which in our example will simply write any data passing through in the OutputStream.

5.4. Timeouts

When building a request, we can also set a specific timeout, so the request is interrupted if we don’t receive the complete response in time.

This is a particularly useful feature when we see that a service we’re querying is particularly slow and could cause a pile-up of open connections stuck waiting for the response.

We can set a global timeout for all our requests using tuning parameters. For a request-specific timeout, we can add to a request using setRequestTimeout:

ws.url(url)
  .setRequestTimeout(Duration.of(1, SECONDS));

There’s still one case to handle, though: We may have received all the data, but our Consumer may be very slow processing it. This might happen if there is lots of data crunching, database calls, etc.

In low throughput systems, we can simply let the code run until it completes. However, we may wish to abort long-running activities.

To achieve that, we have to wrap our code with some futures handling.

Let’s simulate a very long process in our code:

ws.url(url)
  .get()
  .thenApply(
    result -> { 
        try { 
            Thread.sleep(10000L); 
            return Results.ok(); 
        } catch (InterruptedException e) { 
            return Results.status(SERVICE_UNAVAILABLE); 
        } 
    });

This will return an OK response after 10 seconds, but we don’t want to wait that long.

Instead, with the timeout wrapper, we instruct our code to wait for no more than 1 second:

CompletionStage<Result> f = futures.timeout(
  ws.url(url)
    .get()
    .thenApply(result -> { 
        try { 
            Thread.sleep(10000L); 
            return Results.ok(); 
        } catch (InterruptedException e) { 
            return Results.status(SERVICE_UNAVAILABLE); 
        } 
    }), 1L, TimeUnit.SECONDS);

Now our future will return a result either way: the computation result if the Consumer finished in time, or the exception due to the futures timeout.

5.5. Handling Exceptions

In the previous example, we created a function that either returns a result or fails with an exception. So, now we need to handle both scenarios.

We can handle both success and failure scenarios with the handleAsync method.

Let’s say that we want to return the result, if we’ve got it, or log the error and return the exception for further handling:

CompletionStage<Object> res = f.handleAsync((result, e) -> {
    if (e != null) {
        log.error("Exception thrown", e);
        return e.getCause();
    } else {
        return result;
    }
});

The code should now return a CompletionStage containing the TimeoutException thrown.

We can verify it by simply calling an assertEquals on the class of the exception object returned:

[error] c.HomeControllerTest - Exception thrown
java.util.concurrent.TimeoutException: Timeout after 1 second
...

When running the test, it will also log the exception we received:

6. 請求過濾器

有時,我們需要在請求觸發之前運行一些邏輯。

我們可以初始化後操作 WSRequest 對象,但更優雅的技術是設置 WSRequestFilter

過濾器可以在初始化期間設置,在調用觸發方法之前設置,並附加到請求邏輯中。

我們可以通過實現 WSRequestFilter 接口來定義自己的過濾器,或者添加現成的過濾器。

常見場景是在執行請求之前記錄請求的內容。

在這種情況下,我們只需要設置 AhcCurlRequestLogger

ws.url(url)
  ...
  .setRequestFilter(new AhcCurlRequestLogger())
  ...
  .get();

生成的日誌具有 curl 類似格式:

[info] p.l.w.a.AhcCurlRequestLogger - curl \
  --verbose \
  --request GET \
  --header 'key: value' \
  'http://localhost:19001'

我們可以通過更改我們的 logback.xml 配置來設置所需的日誌級別。

7. 響應緩存

WSClient 也支持響應緩存。

此功能在相同的請求被觸發多次,並且我們不需要每次都獲取最新數據時特別有用。

它也有助於服務暫時不可用時。

7.1. 添加緩存依賴

要配置緩存,我們需要首先在我們的 build.sbt 中添加依賴項:

libraryDependencies += ehcache

這將配置 Ehcache 作為我們的緩存層。

如果我們不想使用 Ehcache,我們可以使用任何其他 JSR-107 緩存實現。

7.2. 強制緩存啓發式

默認情況下,Play WS 不會在服務器沒有返回任何緩存配置時緩存 HTTP 響應。

為了規避此問題,我們可以通過在我們的 application.conf 中添加設置來強制啓發式緩存:

play.ws.cache.heuristics.enabled=true

這將配置系統決定何時緩存 HTTP 響應,無論遠程服務提供的緩存配置如何。

8. 額外調整

向外部服務發送請求可能需要一些客户端配置。我們可能需要處理重定向、慢速服務器或根據用户代理頭進行過濾。

為了解決這個問題,我們可以調整我們的 WS 客户端,使用我們在 application.conf 中的屬性:

play.ws.followRedirects=false
play.ws.useragent=MyPlayApplication
play.ws.compressionEnabled=true
# 等待連接建立的時間
play.ws.timeout.connection=30
# 連接打開後等待數據的時間
play.ws.timeout.idle=30
# 完成請求的最大可用時間
play.ws.timeout.request=300

還可以直接配置底層的 AsyncHttpClient

完整的可用屬性列表可以在 AhcConfig 的源代碼中查看。

9. 結論

在本文中,我們探討了 Play WS 庫及其主要功能。我們配置了我們的項目,學習瞭如何發送常見請求以及處理它們的響應,包括同步和異步方式。

我們處理了大型數據下載,並瞭解瞭如何中斷長時間運行的活動。

最後,我們研究了緩存以提高性能,以及如何調整客户端。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.