知識庫 / Spring / Spring AI RSS 訂閱

Spring AI ChatClient 流式響應

Spring AI
HongKong
5
10:39 AM · Dec 06 ,2025

1. 概述

在標準的 REST 響應中,服務器會等待收到整個 payload 之後再將其返回給客户端。然而,大型語言模型 (LLM) 通常以 token 為單位生成輸出,產生完整響應需要花費大量時間。

這導致了等待完整響應的延遲,尤其是在輸出涉及大量 token 的情況下。流式響應通過分塊發送數據來解決這個問題。

在本教程中,我們將探索如何使用 Spring AI 的 ChatClient 返回流式聊天響應,而不是一次性發送整個響應。

2. Maven 依賴

讓我們首先將 Spring AI OpenAI 依賴 添加到我們的 <em >pom.xml</em >> 中:

<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-starter-model-openai</artifactId>
    <version>1.0.2</version>
</dependency>

我們需要一個 Web 容器來演示聊天響應流式傳輸。我們可以選擇以下依賴項:spring-boot-starter-webspring-boot-starter-webflux

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

3. 常用組件

在探索不同的流式傳輸方法之前,讓我們為後續章節創建一個常用組件。<em>ChatRequest</em> 類包含我們 API 調用中的報負載。

public class ChatRequest {
    @NotNull
    private String prompt;

    // constructor, getter and setter
}

對於以下章節,我們將向我們的端點發送以下聊天請求。這樣做是故意為的,目的是讓聊天模型產生一個冗長的響應,以便我們展示流式傳輸。

{
    "prompt": "Tell me a story about a girl loves a boy, around 250 words"
}

現在,我們已經準備就緒,可以繼續探索不同的流式傳輸方法。

4. 實時流式傳輸

為了提供更逼真的體驗,我們不想在等待完整響應之前將其返回給客户端。我們可以將響應實時流式傳輸給客户端。Spring AI 默認情況下,會逐字逐句地將聊天響應流式傳輸。

讓我們創建一個 <em >ChatService</em > 以啓用從 <em >ChatClient</em > 中流式傳輸聊天響應。關鍵在於我們調用 <em >stream()</em > 並將響應作為 <em >Flux&lt;String&gt;</em >> 返回。

@Component
public class ChatService {
    private final ChatClient chatClient;

    public ChatService(ChatModel chatModel) {
        this.chatClient = ChatClient.builder(chatModel)
          .build();
    }

    public Flux<String> chat(String prompt) {
        return chatClient.prompt()
          .user(userMessage -> userMessage.text(prompt))
          .stream()
          .content();
    }
}

啓用聊天響應流式傳輸有2個條件。首先,REST控制器必須返回一個 Flux<String> 對象。其次,響應內容類型必須設置為 text/event-stream:

@RestController
@Validated
public class ChatController {
    private final ChatService chatService;

    public ChatController(ChatService chatService) {
        this.chatService = chatService;
    }

    @PostMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> chat(@RequestBody @Valid ChatRequest request) {
        return chatService.chat(request.getPrompt());
    }
}

現在,一切都已準備就緒。我們可以啓動 Spring Boot 應用程序,並使用 Postman 向 REST 端點發送聊天請求。

執行後,我們可以看到 Postman 中顯示的響應體是行優先,其中每一行都是服務器端事件。

從響應中,我們可以看到 Spring AI 以逐字流式傳輸響應。 這允許客户端立即開始消耗結果,無需等待完整響應。 這種方式提供了極低的延遲,讓用户感覺就像實時輸入。

5. 逐塊流式傳輸

儘管逐詞流式傳輸具有很高的響應速度,但它也可能顯著增加開銷。

我們可以通過將詞語彙集在一起形成更大的塊,並返回該塊,而不是單個詞語來降低開銷。 這樣可以使流式傳輸更高效,同時保留漸進式流式傳輸的體驗。

我們可以修改我們的 chat() 方法,並調用 transform 方法於 Flux 上,直到塊大小達到 100:

@Component
public class ChatService {
    private final ChatClient chatClient;

    public ChatService(ChatModel chatModel) {
        this.chatClient = ChatClient.builder(chatModel)
          .build();
    }

    public Flux<String> chat(String prompt) {
        return chatClient.prompt()
          .user(userMessage -> userMessage.text(prompt))
          .stream()
          .content()
          .transform(flux -> toChunk(flux, 100));
    }

    private Flux<String> toChunk(Flux<String> tokenFlux, int chunkSize) {
        return Flux.create(sink -> {
            StringBuilder buffer = new StringBuilder();
            tokenFlux.subscribe(
              token -> {
                  buffer.append(token);
                  if (buffer.length() >= chunkSize) {
                      sink.next(buffer.toString());
                      buffer.setLength(0);
                  }
              },
              sink::error,
              () -> {
                  if (buffer.length() > 0) {
                      sink.next(buffer.toString());
                  }
                  sink.complete();
              }
            );
        });
    }
}

基本上,我們收集每個從 Flux 返回的單詞,並將其追加到 StringBuilder 中。當緩衝區大小達到至少 100 個字符的最小值時,我們將緩衝區作為塊發送給客户端。在流結束時,我們還會將剩餘的緩衝區作為最終塊發送。

現在,如果我們向修改後的 ChatService 發送聊天請求,我們將會看到服務器端事件的內容至少為 100 個字符,除非是最後一個塊:

6. 實時流式 JSON

為了以結構化格式流式傳輸聊天響應,我們可以使用換行符分隔的 JSON (NDJSON)。 NDJSON 是一種流式格式,其中每一行包含一個 JSON 對象,並且對象之間用換行符分隔。

要實現這一點,我們可以指示聊天模型通過添加系統提示,並附帶一個示例 JSON,以確保聊天模型完全理解所需的格式,並避免混淆:

@Component
public class ChatService {
    private final ChatClient chatClient;

    public ChatService(ChatModel chatModel) {
        this.chatClient = ChatClient.builder(chatModel)
          .build();
    }

    public Flux<String> chat(String prompt) {
        return chatClient.prompt()
          .system(systemMessage -> systemMessage.text(
            """
              Respond in NDJSON format.
              Each JSON object should contains around 100 characters.
              Sample json object format: {"part":0,"text":"Once in a small town..."}
            """))
          .user(userMessage -> userMessage.text(prompt))
          .stream()
          .content()
          .transform(this::toJsonChunk);
    }

    private Flux<String> toJsonChunk(Flux<String> tokenFlux) {
        return Flux.create(sink -> {
            StringBuilder buffer = new StringBuilder();
            tokenFlux.subscribe(
              token -> {
                  buffer.append(token);
                  int idx;
                  if ((idx = buffer.indexOf("\n")) >= 0) {
                      String line = buffer.substring(0, idx);
                      sink.next(line);
                      buffer.delete(0, idx + 1);
                  }
              },
              sink::error,
              () -> {
                  if (buffer.length() > 0) {
                      sink.next(buffer.toString());
                  }
                  sink.complete();
              }
            );
        });
    }
}

方法 toJsonChunk() 與上一節中的 toChunk() 類似。關鍵區別在於刷新策略。 它不是在緩衝區達到最小大小時刷新數據,而是當令牌中找到換行符時一次性刷新緩衝區內容。

讓我們再次發起一個聊天請求以查看結果:

我們可以看到每一行都是一個 JSON 對象,其格式遵循系統提示。JSON 廣泛受不同編程語言支持,這使得客户端在事件到達時輕鬆解析和消費。

7. 非流式傳輸 (Non-Streaming)

我們已經探討了不同類型的響應流式傳輸方法。現在,讓我們來了解一下傳統的非流式傳輸方法。

當使用 spring-boot-starter-web Maven 依賴關係返回同步聊天響應時,我們只需調用 ChatClientcall() 方法:

ChatClient chatClient = ...;
chatClient.prompt()
  .user(userMessage -> userMessage.text(prompt))
  .call()
  .content()

然而,如果我們使用 spring-boot-starter-webflux 依賴項執行相同的操作,將會得到以下異常:

org.springframework.web.client.ResourceAccessException: I/O error on POST request for "https://api.openai.com/v1/chat/completions": block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3

這主要是因為 WebFlux 是非阻塞的,不允許使用阻塞操作,例如 call()

為了在 WebFlux 中實現相同的非流式響應,我們需要在 ChatClient 中調用 stream(),並將收集到的流組合成一個單一的響應:

@Component
public class ChatService {
    private final ChatClient chatClient;

    public ChatService(ChatModel chatMode) {
        this.chatClient = ChatClient.builder(chatModel)
          .build();
    }

    public Flux<String> chat(String prompt) {
        return chatClient.prompt()
          .user(userMessage -> userMessage.text(prompt))
          .stream()
          .content();
    }
}

在控制器中,我們需要將 Flux<String> 轉換為 Mono<String>,通過收集單詞並將其連接來實現:

@RestController
@Validated
public class ChatController {
    private final ChatService chatService;

    public ChatController(ChatService chatService) {
        this.chatService = chatService;
    }

    @PostMapping(value = "/chat")
    public Mono<String> chat(@RequestBody @Valid ChatRequest request) {
        return chatService.chat(request.getPrompt())
          .collectList()
          .map(list -> String.join("", list));
    }
}

採用這種方法,我們可以使用 WebFlux 的非阻塞模型返回非流式響應。

8. 結論

在本文中,我們探討了使用 Spring AI 的 ChatClient 實時流式傳輸聊天響應的不同方法。

這包括按單詞流式傳輸、按塊流式傳輸和按 JSON 流式傳輸。 通過這些技術,我們能夠顯著降低將聊天響應返回給客户端的延遲,並提升用户體驗。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.