1. 概述
本教程將深入探討 Java 響應式編程,以解決如何將 Flux<DataBuffer> 讀取到一個單一 InputStream 的有趣問題。
2. 請求設置
為了解決將 Flux
String REQUEST_ENDPOINT = "https://gorest.co.in/public/v2/users";
接下來,我們定義 getWebClient() 方法,用於獲取 WebClient 類的新實例:
static WebClient getWebClient() {
WebClient.Builder webClientBuilder = WebClient.builder();
return webClientBuilder.build();
}此時,我們將向 /public/v2/users 端點發出一個 GET 請求。但是,我們必須將響應體作為 Flux<DataBuffer> 對象獲取。因此,我們繼續到下一部分關於 BodyExtractors 以實現這一點。
3. BodyExtractors 和 DataBufferUtils
我們可以使用 BodyExtractors 類中提供的 toDataBuffers() 方法,將響應體提取為 Flux<DataBuffer>。
讓我們創建一個 body 實例,類型為 Flux<DataBuffer>:
Flux<DataBuffer> body = client
.get(
.uri(REQUEST_ENDPOINT)
.exchangeToFlux( clientResponse -> {
return clientResponse.body(BodyExtractors.toDataBuffers());
});接下來,由於我們需要將這些 流收集到一個單一的 中,使用 和 是一個不錯的策略。
此外,我們打算將數據寫入 ,並最終從 中讀取。 讓我們看看如何創建這兩個連接的流:
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(1024*10);
inputStream.connect(outputStream);我們必須注意的是,默認大小為 1024 字節。然而,我們預計從 Flux<DataBuffer> 收集到的結果可能超過默認值。因此,我們需要明確指定更大的值作為大小,在本例中是 1024*10。
最後,我們使用 DataBufferUtils 類中提供的 write() 實用方法,將 body 作為發佈者寫入到 outputStream 中:
DataBufferUtils.write(body, outputStream).subscribe();我們必須注意到,在聲明時,我們已經將 inputStream 連接到 outputStream。因此,我們可以從 inputStream 中讀取數據。接下來,讓我們進入下一部分,以查看其實際運行情況。
4. 從 PipedInputStream 讀取內容
首先,我們定義一個輔助方法 readContent(),用於將一個 InputStream 讀取為一個 String 對象:
String readContent(InputStream stream) throws IOException {
StringBuffer contentStringBuffer = new StringBuffer();
byte[] tmp = new byte[stream.available()];
int byteCount = stream.read(tmp, 0, tmp.length);
contentStringBuffer.append(new String(tmp));
return String.valueOf(contentStringBuffer);
}
String readContentFromPipedInputStream(PipedInputStream stream) throws IOException {
StringBuffer contentStringBuffer = new StringBuffer();
try {
Thread pipeReader = new Thread(() -> {
try {
contentStringBuffer.append(readContent(stream));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
pipeReader.start();
pipeReader.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
stream.close();
}
return String.valueOf(contentStringBuffer);
}目前,我們的代碼已準備好用於模擬。讓我們看看它的運行效果:
WebClient webClient = getWebClient();
InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT);
Thread.sleep(3000);
String content = readContentFromPipedInputStream((PipedInputStream) inputStream);
logger.info("response content: \n{}", content.replace("}","}\n"));由於我們正在處理一個異步系統,因此在從流中讀取之前,我們會在讀取前延遲 3 秒,以便查看完整的響應。此外,在日誌記錄時,我們插入換行符以將長輸出分成多行。
最後,讓我們驗證代碼執行產生的輸出:
20:45:04.120 [main] INFO com.baeldung.databuffer.DataBufferToInputStream - response content:
[{"id":2642,"name":"Bhupen Trivedi","email":"[email protected]","gender":"male","status":"active"}
,{"id":2637,"name":"Preity Patel","email":"[email protected]","gender":"female","status":"inactive"}
,{"id":2633,"name":"Brijesh Shah","email":"[email protected]","gender":"male","status":"inactive"}
...
,{"id":2623,"name":"Mohini Mishra","email":"[email protected]","gender":"female","status":"inactive"}
]
這就完成了!看起來我們已經搞定了所有內容。
5. 結論
在本文中,我們利用了管道流的概念以及BodyExtractors 和 DataBufferUtils 類中提供的實用方法,將 Flux<DataBuffer> 讀取到一個單一的 InputStream 中。