1. 概述
通常,我們的 Web 服務需要使用其他 Web 服務才能完成任務。在保持低響應時間的同時,處理用户請求可能會變得困難。一個緩慢的外部服務會增加我們的響應時間,並導致系統請求堆積,從而消耗更多資源。這時,採用非阻塞方法就非常有幫助。
在本教程中,我們將從 Play Framework 應用程序中向一個服務發送多個異步請求。通過利用 Java 的非阻塞 HTTP 功能,我們可以在不影響自身主邏輯的情況下,平滑地查詢外部資源。
在我們的示例中,我們將探索 Play WebService 庫。
2. Play WebService (WS) 庫
WS 庫是一個功能強大的庫,它使用 Java 的 Action 提供異步 HTTP 調用。
通過使用該庫,我們的代碼可以發送這些請求,並在不阻塞的情況下繼續執行。為了處理請求的結果,我們提供了一個消費函數,即 Consumer 接口的實現。
這種模式與 JavaScript 中回調、Promises 和 async/await 模式的實現有一些相似之處。
讓我們構建一個簡單的 Consumer,用於記錄一些響應數據:
ws.url(url)
.thenAccept(r ->
log.debug("Thread#" + Thread.currentThread().getId()
+ " Request complete: Response code = " + r.getStatus()
+ " | Response: " + r.getBody()
+ " | Current Time:" + System.currentTimeMillis()))我們的消費者僅在此示例中進行登錄。儘管如此,消費者可以執行我們需要的任何操作,例如將結果存儲在數據庫中。
如果我們深入研究該庫的實現,可以觀察到 WS 封裝並配置了 Java 的 AsyncHttpClient,它是標準 JDK 的一部分,並且不依賴於 Play。
3. 準備一個示例項目
為了試驗該框架,我們創建一個單元測試來發起請求。我們將創建一個骨架 Web 應用程序來響應這些請求,並使用 WS 框架進行 HTTP 請求。
3.1. 骨架 Web 應用程序
首先,我們使用 sbt new 命令創建初始項目:
sbt new playframework/play-java-seed.g8在新的文件夾中,我們隨後編輯 build.sbt文件,並添加 WS 庫依賴項:
libraryDependencies += javaWs現在我們可以使用 sbt run 命令啓動服務器:
$ sbt run
...
--- (Running the application, auto-reloading is enabled) ---
[info] p.c.s.AkkaHttpServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000一旦應用程序啓動,我們可以通過瀏覽 http://localhost:9000 來檢查一切是否正常,這將打開 Play 的歡迎頁面。
3.2. 測試環境
為了測試我們的應用程序,我們將使用單元測試類 HomeControllerTest。
首先,我們需要擴展 WithServer,它將提供服務器生命週期:
public class HomeControllerTest extends WithServer {
多虧了其父類,此類現在啓動我們的骨架 Web 服務器在測試模式下,並在隨機端口上,然後再運行測試。 WithServer 類在測試完成時也會停止應用程序。
接下來,我們需要提供一個應用程序來運行。
我們可以使用 Guice 的 GuiceApplicationBuilder 創建它:
@Override
protected Application provideApplication() {
return new GuiceApplicationBuilder().build();
}
最後,我們設置了用於測試的服務器 URL,使用測試服務器提供的端口號:
@Override
@Before
public void setup() {
OptionalInt optHttpsPort = testServer.getRunningHttpsPort();
if (optHttpsPort.isPresent()) {
port = optHttpsPort.getAsInt();
url = "https://localhost:" + port;
} else {
port = testServer.getRunningHttpPort()
.getAsInt();
url = "http://localhost:" + port;
}
}現在我們準備好編寫測試。全面的測試框架使我們能夠專注於編寫測試請求。
4. 準備 WSRequest
讓我們看看如何發起基本類型的請求,例如 GET 或 POST,以及用於文件上傳的多部分請求。
4.1. 初始化 WSRequest 對象
首先,我們需要獲取一個 WSClient 實例,以便配置和初始化我們的請求。
在實際應用中,我們可以通過依賴注入獲取一個配置了默認設置的客户端:
@Autowired
WSClient ws;在我們的測試類中,我們使用 `WSTestClient,該組件可從 Play 測試框架 中獲取。
WSClient ws = play.test.WSTestClient.newClient(port);一旦我們獲得客户,就可以通過調用 url 方法來初始化一個 WSRequest 對象:
ws.url(url)URL 方法足以讓我們發起請求。但是,我們可以通過添加自定義設置進一步定製它:
ws.url(url)
.addHeader("key", "value")
.addQueryParameter("num", "" + num);如我們所見,添加標題和查詢參數非常簡單。
在我們完全配置請求後,我們可以調用該方法來啓動它。
4.2. 通用 GET 請求
要觸發 GET 請求,只需在我們的 WSRequest 對象上調用 get 方法:
ws.url(url)
...
.get();由於這是一個非阻塞代碼,它會啓動請求並繼續在函數下一行的執行。
**返回的由 get 產生的對象是一個 CompletionStage 實例,它是 CompletableFuture API 的一部分。
一旦 HTTP 調用完成,這個階段會執行幾條指令。它將響應包裝在一個 WSResponse 對象中。
通常,這個結果會被傳遞給執行鏈的下一階段。在這個例子中,我們沒有提供任何消費函數,因此結果丟失。
因此,這個請求是“一鍵啓動,一鍵放棄”類型。
4.3. 提交表單
提交表單與之前的get示例並沒有本質區別。
為了觸發請求,我們只需調用post方法:
ws.url(url)
...
.setContentType("application/x-www-form-urlencoded")
.post("key1=value1&key2=value2");在本場景中,我們需要將一個體內容作為參數傳遞。 這可以是簡單的字符串,例如文件,JSON 或 XML 文檔,BodyWritable 或 Source。
4.4. 提交多部分/表單數據
多部分表單需要我們同時發送輸入字段和附加文件或流中的數據。
為了在框架中實現這一點,我們使用 post 方法,並使用 Source。
在 Source 內部,我們可以封裝表單所需的各種數據類型:
Source<ByteString, ?> file = FileIO.fromPath(Paths.get("hello.txt"));
FilePart<Source<ByteString, ?>> file =
new FilePart<>("fileParam", "myfile.txt", "text/plain", file);
DataPart data = new DataPart("key", "value");
ws.url(url)
...
.post(Source.from(Arrays.asList(file, data)));雖然這種方法增加了更多的配置,但它與其它類型的請求仍然非常相似。
5. 處理異步響應
在此之前,我們僅觸發了“點並忘”類型的請求,我們的代碼不會對響應數據進行任何處理。
現在,讓我們探索兩種處理異步響應的技術。
我們可以選擇阻塞主線程,等待一個 CompletableFuture,或者異步消費,使用 Consumer。
5.1. 通過阻塞使用 CompletableFuture
即使在使用異步框架中,我們也可以選擇阻塞代碼的執行並等待響應。
通過使用 CompletableFuture API,只需在代碼中進行少量修改即可實現此場景:
WSResponse response = ws.url(url)
.get()
.toCompletableFuture()
.get();這可能很有用,例如,可以提供強大的數據一致性,而我們無法通過其他方式實現。
5.2. 異步處理響應
為了異步處理響應,而無需阻塞,我們提供一個 Consumer 或 Function,該對象將在響應可用時由異步框架執行。
例如,讓我們為我們之前的示例添加一個 Consumer 以記錄響應:
ws.url(url)
.addHeader("key", "value")
.addQueryParameter("num", "" + 1)
.get()
.thenAccept(r ->
log.debug("Thread#" + Thread.currentThread().getId()
+ " Request complete: Response code = " + r.getStatus()
+ " | Response: " + r.getBody()
+ " | Current Time:" + System.currentTimeMillis()));我們隨後在日誌中查看了響應:
[debug] c.HomeControllerTest - Thread#30 Request complete: Response code = 200 | Response: {
"Result" : "ok",
"Params" : {
"num" : [ "1" ]
},
"Headers" : {
"accept" : [ "*/*" ],
"host" : [ "localhost:19001" ],
"key" : [ "value" ],
"user-agent" : [ "AHC/2.1" ]
}
} | Current Time:1579303109613值得注意的是,我們使用了 thenAccept,因為它需要一個 Consumer 函數,因為我們不需要在日誌後返回任何內容。
當我們需要當前階段返回一些內容,以便在下一階段使用時,則需要使用 thenApply,它接受一個 Function。
這些用法遵循標準 Java 函數接口的約定。
5.3. 大型響應體
我們目前實現的代碼對於小型響應和大多數用例來説是一個不錯的解決方案。但是,如果需要處理數百兆甚至更大容量的數據,我們需要採用更有效的策略。
需要注意的是:<em style="font-style: italic;">get</em> 和 <em style="font-style: italic;">post</em> 等請求方法會將整個響應加載到內存中。
為了避免可能發生的 <em style="font-style: italic;">OutOfMemoryError</em> 錯誤,我們可以使用 Akka Streams 在不將響應加載到內存中的情況下進行處理。
例如,我們可以將其內容寫入文件:
ws.url(url)
.stream()
.thenAccept(
response -> {
try {
OutputStream outputStream = Files.newOutputStream(path);
Sink<ByteString, CompletionStage<Done>> outputWriter =
Sink.foreach(bytes -> outputStream.write(bytes.toArray()));
response.getBodyAsSource().runWith(outputWriter, materializer);
} catch (IOException e) {
log.error("An error happened while opening the output stream", e);
}
});stream 方法返回一個 CompletionStage,其中 WSResponse 具有 getBodyAsStream 方法,該方法提供一個 Source<ByteString, ?>。
我們可以通過使用 Akka 的 Sink 來告訴代碼如何處理這種類型的體,在我們的示例中,它將簡單地將通過 OutputStream 傳遞的任何數據寫入。
5.4. 超時設置
當構建請求時,我們可以設置特定的超時時間,如果我們在規定時間內沒有收到完整的響應,則請求會被中斷。
此功能在查詢速度特別慢的服務時尤其有用,可以避免大量連接被阻塞,等待響應。
我們可以使用調優參數為所有請求設置全局超時時間。要為特定請求設置超時時間,可以使用 setRequestTimeout 方法。
ws.url(url)
.setRequestTimeout(Duration.of(1, SECONDS));雖然還有一種情況需要處理:我們可能已經接收到了所有數據,但我們的消費者(Consumer)處理這些數據可能會非常緩慢。這可能是由於大量的數據計算、數據庫調用等造成的。
在低吞吐量系統中,我們可以簡單地讓代碼運行直到它完成。但是,我們可能希望中止長時間運行的活動。
為了實現這一點,我們需要用未來(futures)處理我們的代碼。
讓我們在代碼中模擬一個非常耗時的過程:
ws.url(url)
.get()
.thenApply(
result -> {
try {
Thread.sleep(10000L);
return Results.ok();
} catch (InterruptedException e) {
return Results.status(SERVICE_UNAVAILABLE);
}
});這將返回一個 OK 響應,但我們不想等待這麼長時間。
相反,使用 timeout 包裝器,我們指示代碼最多等待 1 秒。
CompletionStage<Result> f = futures.timeout(
ws.url(url)
.get()
.thenApply(result -> {
try {
Thread.sleep(10000L);
return Results.ok();
} catch (InterruptedException e) {
return Results.status(SERVICE_UNAVAILABLE);
}
}), 1L, TimeUnit.SECONDS);
現在,無論結果如何,我們的未來都會返回結果:如果消費者在規定時間內完成,則返回計算結果;否則,返回因期貨超時引起的異常。
5.5. 處理異常
在之前的示例中,我們創建了一個函數,該函數要麼返回結果,要麼拋出異常。因此,現在我們需要處理這兩個場景。
我們可以使用 handleAsync 方法來處理成功和失敗的場景。
假設我們想要在獲取到結果時返回結果,或者在發生錯誤時記錄錯誤並返回異常以便進一步處理:
CompletionStage<Object> res = f.handleAsync((result, e) -> {
if (e != null) {
log.error("Exception thrown", e);
return e.getCause();
} else {
return result;
}
});
代碼現在應該返回一個包含 TimeoutException 異常的 CompletionStage。
我們可以通過簡單地在返回的異常對象類上調用 assertEquals 來驗證它。
Class<?> clazz = res.toCompletableFuture().get().getClass();
assertEquals(TimeoutException.class, clazz);當運行測試時,還會記錄我們接收到的異常:
[error] c.HomeControllerTest - Exception thrown
java.util.concurrent.TimeoutException: Timeout after 1 second
...6. 請求過濾器
有時,我們需要在請求觸發之前運行一些邏輯。
我們可以初始化後操縱 WSRequest 對象,但更優雅的技術是設置 WSRequestFilter。
過濾器可以在初始化期間、在調用觸發方法之前設置,並附加到請求邏輯中。
我們可以通過實現 WSRequestFilter 接口來定義自己的過濾器,或者添加現成的過濾器。
常見場景是在執行請求之前記錄請求的內容。
在這種情況下,我們只需要設置 AhcCurlRequestLogger:
ws.url(url)
...
.setRequestFilter(new AhcCurlRequestLogger())
...
.get();結果日誌具有curl 類似格式:
[info] p.l.w.a.AhcCurlRequestLogger - curl \
--verbose \
--request GET \
--header 'key: value' \
'http://localhost:19001'我們可以通過修改我們的 logback.xml 配置來設置所需的日誌級別。
7. 響應緩存
WSClient 也支持響應緩存。
此功能在相同的請求被多次觸發時特別有用,並且我們不需要每次都獲取最新數據。
它也有助於在調用服務暫時不可用時提供幫助。
7.1. 添加緩存依賴
為了配置緩存,我們需要首先在我們的 build.sbt 中添加依賴:
libraryDependencies += ehcache此配置將 Ehcache 用作我們的緩存層。
如果我們不想使用 Ehcache,我們可以使用任何其他 JSR-107 緩存實現。
7.2. 強制緩存啓發式
默認情況下,Play WS 不會緩存 HTTP 響應,除非服務器沒有返回任何緩存配置。
為了規避此限制,我們可以通過在 application.conf 中添加設置來強制啓用啓發式緩存:
play.ws.cache.heuristics.enabled=true這將配置系統決定何時緩存 HTTP 響應,無論遠程服務是否聲明可以緩存。
8. 額外調整
向外部服務發送請求可能需要一些客户端配置。 我們可能需要處理重定向、慢速服務器或根據用户代理標頭進行過濾。
為了解決這個問題,我們可以調整我們的 WS 客户端,使用在 application.conf 中的屬性:
play.ws.followRedirects=false
play.ws.useragent=MyPlayApplication
play.ws.compressionEnabled=true
# time to wait for the connection to be established
play.ws.timeout.connection=30
# time to wait for data after the connection is open
play.ws.timeout.idle=30
# max time available to complete the request
play.ws.timeout.request=300還可以配置底層的 AsyncHttpClient 直接。
完整的可用屬性列表可以在 AhcConfig 的源代碼中找到。
9. 結論
在本文中,我們探討了 Play WS 庫及其主要功能。我們配置了我們的項目,學習瞭如何發送常見請求以及處理它們的響應,無論是同步還是異步。
我們處理了大型數據下載,並瞭解瞭如何中斷長時間運行的活動。
最後,我們研究了緩存以提高性能,以及如何調整客户端。