知識庫 / Reactive RSS 訂閱

使用 Spring Reactive WebClient 將 Flux 轉換為單個 InputStream

Reactive,Spring Web
HongKong
10
11:58 AM · Dec 06 ,2025

1. 概述

本教程將深入探討 Java 響應式編程,以解決如何將 Flux<DataBuffer> 讀取到一個單一 InputStream 的有趣問題。

2. 請求設置

為了解決將 Flux 讀取到一個單一 InputStream 的問題,我們首先將使用 Spring reactive WebClient 發送一個 GET 請求。此外,我們還可以利用 gorest.co.in 提供的公共 API 端點,用於此類測試場景:

String REQUEST_ENDPOINT = "https://gorest.co.in/public/v2/users";

接下來,我們定義 getWebClient() 方法,用於獲取 WebClient 類的新實例:

static WebClient getWebClient() {
    WebClient.Builder webClientBuilder = WebClient.builder();
    return webClientBuilder.build();
}

此時,我們將向 /public/v2/users 端點發出一個 GET 請求。但是,我們必須將響應體作為 Flux<DataBuffer> 對象獲取。因此,我們繼續到下一部分關於 BodyExtractors 以實現這一點。

3. BodyExtractorsDataBufferUtils

我們可以使用 BodyExtractors 類中提供的 toDataBuffers() 方法,將響應體提取為 Flux<DataBuffer>

讓我們創建一個 body 實例,類型為 Flux<DataBuffer>

Flux<DataBuffer> body = client
  .get(
  .uri(REQUEST_ENDPOINT)
    .exchangeToFlux( clientResponse -> {
        return clientResponse.body(BodyExtractors.toDataBuffers());
    });

接下來,由於我們需要將這些 流收集到一個單一的 中,使用 是一個不錯的策略。

此外,我們打算將數據寫入 ,並最終從 中讀取。 讓我們看看如何創建這兩個連接的流:

PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(1024*10);
inputStream.connect(outputStream);

我們必須注意的是,默認大小為 1024 字節。然而,我們預計從 Flux<DataBuffer> 收集到的結果可能超過默認值。因此,我們需要明確指定更大的值作為大小,在本例中是 1024*10

最後,我們使用 DataBufferUtils 類中提供的 write() 實用方法,將 body 作為發佈者寫入到 outputStream 中:

DataBufferUtils.write(body, outputStream).subscribe();

我們必須注意到,在聲明時,我們已經將 inputStream 連接到 outputStream。因此,我們可以從 inputStream 中讀取數據。接下來,讓我們進入下一部分,以查看其實際運行情況。

4. 從 PipedInputStream 讀取內容

首先,我們定義一個輔助方法 readContent(),用於將一個 InputStream 讀取為一個 String 對象:

String readContent(InputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    byte[] tmp = new byte[stream.available()];
    int byteCount = stream.read(tmp, 0, tmp.length);
    contentStringBuffer.append(new String(tmp));
    return String.valueOf(contentStringBuffer);
}

接下來,由於使用 常規做法PipedInputStream在不同的線程中讀取,所以我們創建一個readContentFromPipedInputStream()方法,該方法內部會啓動一個新的線程來將內容從PipedInputStream讀取到一個String對象,通過調用readContent()方法實現:

String readContentFromPipedInputStream(PipedInputStream stream) throws IOException {
    StringBuffer contentStringBuffer = new StringBuffer();
    try {
        Thread pipeReader = new Thread(() -> {
            try {
                contentStringBuffer.append(readContent(stream));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        pipeReader.start();
        pipeReader.join();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        stream.close();
    }

    return String.valueOf(contentStringBuffer);
}

目前,我們的代碼已準備好用於模擬。讓我們看看它的運行效果:

WebClient webClient = getWebClient();
InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT);
Thread.sleep(3000);
String content = readContentFromPipedInputStream((PipedInputStream) inputStream);
logger.info("response content: \n{}", content.replace("}","}\n"));

由於我們正在處理一個異步系統,因此在從流中讀取之前,我們會在讀取前延遲 3 秒,以便查看完整的響應。此外,在日誌記錄時,我們插入換行符以將長輸出分成多行。

最後,讓我們驗證代碼執行產生的輸出:

20:45:04.120 [main] INFO com.baeldung.databuffer.DataBufferToInputStream - response content: 
[{"id":2642,"name":"Bhupen Trivedi","email":"[email protected]","gender":"male","status":"active"}
,{"id":2637,"name":"Preity Patel","email":"[email protected]","gender":"female","status":"inactive"}
,{"id":2633,"name":"Brijesh Shah","email":"[email protected]","gender":"male","status":"inactive"}
...
,{"id":2623,"name":"Mohini Mishra","email":"[email protected]","gender":"female","status":"inactive"}
]

這就完成了!看起來我們已經搞定了所有內容。

5. 結論

在本文中,我們利用了管道流的概念以及BodyExtractorsDataBufferUtils 類中提供的實用方法,將 Flux<DataBuffer> 讀取到一個單一的 InputStream 中。

發佈 評論

Some HTML is okay.