1. 概述
在標準的 REST 響應中,服務器會等待收到整個 payload 之後再將其返回給客户端。然而,大型語言模型 (LLM) 通常以 token 為單位生成輸出,產生完整響應需要花費大量時間。
這導致了等待完整響應的延遲,尤其是在輸出涉及大量 token 的情況下。流式響應通過分塊發送數據來解決這個問題。
在本教程中,我們將探索如何使用 Spring AI 的 ChatClient 返回流式聊天響應,而不是一次性發送整個響應。
2. Maven 依賴
讓我們首先將 Spring AI OpenAI 依賴 添加到我們的 <em >pom.xml</em >> 中:
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-starter-model-openai</artifactId>
<version>1.0.2</version>
</dependency>我們需要一個 Web 容器來演示聊天響應流式傳輸。我們可以選擇以下依賴項:spring-boot-starter-web 或 spring-boot-starter-webflux。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>3. 常用組件
在探索不同的流式傳輸方法之前,讓我們為後續章節創建一個常用組件。<em>ChatRequest</em> 類包含我們 API 調用中的報負載。
public class ChatRequest {
@NotNull
private String prompt;
// constructor, getter and setter
}對於以下章節,我們將向我們的端點發送以下聊天請求。這樣做是故意為的,目的是讓聊天模型產生一個冗長的響應,以便我們展示流式傳輸。
{
"prompt": "Tell me a story about a girl loves a boy, around 250 words"
}現在,我們已經準備就緒,可以繼續探索不同的流式傳輸方法。
4. 實時流式傳輸
為了提供更逼真的體驗,我們不想在等待完整響應之前將其返回給客户端。我們可以將響應實時流式傳輸給客户端。Spring AI 默認情況下,會逐字逐句地將聊天響應流式傳輸。
讓我們創建一個 <em >ChatService</em > 以啓用從 <em >ChatClient</em > 中流式傳輸聊天響應。關鍵在於我們調用 <em >stream()</em > 並將響應作為 <em >Flux<String></em >> 返回。
@Component
public class ChatService {
private final ChatClient chatClient;
public ChatService(ChatModel chatModel) {
this.chatClient = ChatClient.builder(chatModel)
.build();
}
public Flux<String> chat(String prompt) {
return chatClient.prompt()
.user(userMessage -> userMessage.text(prompt))
.stream()
.content();
}
}啓用聊天響應流式傳輸有2個條件。首先,REST控制器必須返回一個 Flux<String> 對象。其次,響應內容類型必須設置為 text/event-stream:。
@RestController
@Validated
public class ChatController {
private final ChatService chatService;
public ChatController(ChatService chatService) {
this.chatService = chatService;
}
@PostMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> chat(@RequestBody @Valid ChatRequest request) {
return chatService.chat(request.getPrompt());
}
}
現在,一切都已準備就緒。我們可以啓動 Spring Boot 應用程序,並使用 Postman 向 REST 端點發送聊天請求。
執行後,我們可以看到 Postman 中顯示的響應體是行優先,其中每一行都是服務器端事件。
從響應中,我們可以看到 Spring AI 以逐字流式傳輸響應。 這允許客户端立即開始消耗結果,無需等待完整響應。 這種方式提供了極低的延遲,讓用户感覺就像實時輸入。
5. 逐塊流式傳輸
儘管逐詞流式傳輸具有很高的響應速度,但它也可能顯著增加開銷。
我們可以通過將詞語彙集在一起形成更大的塊,並返回該塊,而不是單個詞語來降低開銷。 這樣可以使流式傳輸更高效,同時保留漸進式流式傳輸的體驗。
我們可以修改我們的 chat() 方法,並調用 transform 方法於 Flux
@Component
public class ChatService {
private final ChatClient chatClient;
public ChatService(ChatModel chatModel) {
this.chatClient = ChatClient.builder(chatModel)
.build();
}
public Flux<String> chat(String prompt) {
return chatClient.prompt()
.user(userMessage -> userMessage.text(prompt))
.stream()
.content()
.transform(flux -> toChunk(flux, 100));
}
private Flux<String> toChunk(Flux<String> tokenFlux, int chunkSize) {
return Flux.create(sink -> {
StringBuilder buffer = new StringBuilder();
tokenFlux.subscribe(
token -> {
buffer.append(token);
if (buffer.length() >= chunkSize) {
sink.next(buffer.toString());
buffer.setLength(0);
}
},
sink::error,
() -> {
if (buffer.length() > 0) {
sink.next(buffer.toString());
}
sink.complete();
}
);
});
}
}基本上,我們收集每個從 Flux 返回的單詞,並將其追加到 StringBuilder 中。當緩衝區大小達到至少 100 個字符的最小值時,我們將緩衝區作為塊發送給客户端。在流結束時,我們還會將剩餘的緩衝區作為最終塊發送。
現在,如果我們向修改後的 ChatService 發送聊天請求,我們將會看到服務器端事件的內容至少為 100 個字符,除非是最後一個塊:
6. 實時流式 JSON
為了以結構化格式流式傳輸聊天響應,我們可以使用換行符分隔的 JSON (NDJSON)。 NDJSON 是一種流式格式,其中每一行包含一個 JSON 對象,並且對象之間用換行符分隔。
要實現這一點,我們可以指示聊天模型通過添加系統提示,並附帶一個示例 JSON,以確保聊天模型完全理解所需的格式,並避免混淆:
@Component
public class ChatService {
private final ChatClient chatClient;
public ChatService(ChatModel chatModel) {
this.chatClient = ChatClient.builder(chatModel)
.build();
}
public Flux<String> chat(String prompt) {
return chatClient.prompt()
.system(systemMessage -> systemMessage.text(
"""
Respond in NDJSON format.
Each JSON object should contains around 100 characters.
Sample json object format: {"part":0,"text":"Once in a small town..."}
"""))
.user(userMessage -> userMessage.text(prompt))
.stream()
.content()
.transform(this::toJsonChunk);
}
private Flux<String> toJsonChunk(Flux<String> tokenFlux) {
return Flux.create(sink -> {
StringBuilder buffer = new StringBuilder();
tokenFlux.subscribe(
token -> {
buffer.append(token);
int idx;
if ((idx = buffer.indexOf("\n")) >= 0) {
String line = buffer.substring(0, idx);
sink.next(line);
buffer.delete(0, idx + 1);
}
},
sink::error,
() -> {
if (buffer.length() > 0) {
sink.next(buffer.toString());
}
sink.complete();
}
);
});
}
}方法 toJsonChunk() 與上一節中的 toChunk() 類似。關鍵區別在於刷新策略。 它不是在緩衝區達到最小大小時刷新數據,而是當令牌中找到換行符時一次性刷新緩衝區內容。
讓我們再次發起一個聊天請求以查看結果:
我們可以看到每一行都是一個 JSON 對象,其格式遵循系統提示。JSON 廣泛受不同編程語言支持,這使得客户端在事件到達時輕鬆解析和消費。
7. 非流式傳輸 (Non-Streaming)
我們已經探討了不同類型的響應流式傳輸方法。現在,讓我們來了解一下傳統的非流式傳輸方法。
當使用 spring-boot-starter-web Maven 依賴關係返回同步聊天響應時,我們只需調用 ChatClient 的 call() 方法:
ChatClient chatClient = ...;
chatClient.prompt()
.user(userMessage -> userMessage.text(prompt))
.call()
.content()然而,如果我們使用 spring-boot-starter-webflux 依賴項執行相同的操作,將會得到以下異常:
org.springframework.web.client.ResourceAccessException: I/O error on POST request for "https://api.openai.com/v1/chat/completions": block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3這主要是因為 WebFlux 是非阻塞的,不允許使用阻塞操作,例如 call()。
為了在 WebFlux 中實現相同的非流式響應,我們需要在 ChatClient 中調用 stream(),並將收集到的流組合成一個單一的響應:
@Component
public class ChatService {
private final ChatClient chatClient;
public ChatService(ChatModel chatMode) {
this.chatClient = ChatClient.builder(chatModel)
.build();
}
public Flux<String> chat(String prompt) {
return chatClient.prompt()
.user(userMessage -> userMessage.text(prompt))
.stream()
.content();
}
}在控制器中,我們需要將 Flux<String> 轉換為 Mono<String>,通過收集單詞並將其連接來實現:
@RestController
@Validated
public class ChatController {
private final ChatService chatService;
public ChatController(ChatService chatService) {
this.chatService = chatService;
}
@PostMapping(value = "/chat")
public Mono<String> chat(@RequestBody @Valid ChatRequest request) {
return chatService.chat(request.getPrompt())
.collectList()
.map(list -> String.join("", list));
}
}採用這種方法,我們可以使用 WebFlux 的非阻塞模型返回非流式響應。
8. 結論
在本文中,我們探討了使用 Spring AI 的 ChatClient 實時流式傳輸聊天響應的不同方法。
這包括按單詞流式傳輸、按塊流式傳輸和按 JSON 流式傳輸。 通過這些技術,我們能夠顯著降低將聊天響應返回給客户端的延遲,並提升用户體驗。