博客 / 詳情

返回

通俗易懂:AI大模型基於SSE的實時流式響應技術原理和實踐示例

本文引用了後台技術匯一枚少年郎“大模型應用之:SSE流式響應”的內容,下文有修訂和重新排版。

1、引言
文章介紹了SSE(Server-Sent Events)技術在大模型流式響應中的應用,包括其發展歷程、ChatGPT流式輸出原理、SSE技術特點及與WebSocket的對比,並提供了兩種流式響應落地方案。
圖片
* 相關閲讀:《全民AI時代,大模型客户端和服務端的實時通信到底用什麼協議?》、
《大模型時代多模型AI網關的架構設計與實現》。

技術交流:

  • 移動端IM開發入門文章:《新手入門一篇就夠:從零開發移動端IM》
  • 開源IM框架源碼:https://github.com/JackJiang2011/MobileIMSDK(備用地址點此)
    (本文已同步發佈於:http://www.52im.net/thread-4856-1-1.html)

2、技術背景
當使用ChatGPT時,模型的回覆不是一次性生成整個回答的,而是逐字逐句地生成。

這是因為語言模型需要在每個時間步驟預測下一個最合適的單詞或字符。如果等待整個回覆生成後再輸出到網頁,會導致用户長時間等待,極大降低用户體驗。相反,逐字蹦出回覆可以實現更快的交互響應。

ChatGPT可以在輸入消息後迅速開始生成回答的開頭,並根據上下文逐漸細化回答。這種漸進式的呈現方式可以提供更流暢的對話體驗,同時讓用户知道模型正在工作,避免感覺像卡住了或沒有響應。

此外,逐字蹦出的回覆還有助於用户跟蹤模型的思考過程,看到它逐步構建回答的方式。這種可見的生成過程有助於用户理解模型是如何形成回答的,提高對話的透明度和可解釋性。

3、SSE的技術演進歷程從實驗性到標準化:
圖片
1)前身與痛點(2006年前) 早期Web依賴HTTP的請求-響應模式,實時性需求(如股票行情、IM聊天消息)只能通過輪詢或長輪詢實現,導致高延遲和資源浪費。Comet技術雖嘗試長連接方案,但實現複雜且兼容性差。
2)誕生與早期實踐(2006-2008) 2006年,Opera 9瀏覽器首次引入SSE作為實驗性技術,通過DOM事件實現服務器向客户端的單向推送。這一設計基於HTTP協議,避免了WebSocket的雙向通信複雜性,初步驗證了技術可行性。
3)標準化進程(2008-2014):
a. 2008年:SSE被正式納入HTML5草案,成為HTML5規範的一部分,定義了text/event-stream的MIME類型和事件流格式;
b. 2014年:隨HTML5成為W3C推薦標準,SSE獲得主流瀏覽器支持(除IE外),確立了其在Web實時通信中的地位。
4)生態爆發期(2022年後) 隨着ChatGPT等大模型應用興起,SSE因流式輸出特性成為大模型交互的首選協議,支持逐詞返回的“打字機效果”,推動技術進一步普及。

4、ChatGPT的流式輸出技術原理
我們看一下ChatGPT的completion API的completion API。

演示案例:

curl -i -X POST -H 'Content-Type: application/json' -H 'Authorization: Bearer sk-' [url=https://api.openai.com/v1/chat/completions]https://api.openai.com/v1/chat/completions[/url] -d '{"model":"gpt-3.5-turbo","messages":[{"role": "user", "content": "3+5=?"}],"temperature":0.8,"stream":true}'

結果如下:

HTTP/2 200

date: Fri, 08 Sep 2023 03:39:50 GMT

content-type: text/event-stream

access-control-allow-origin: *

cache-control: no-cache, must-revalidate

openai-organization: metaverse-cloud-pte-ltd-orfbgw

openai-processing-ms: 5

openai-version: 2020-10-01

strict-transport-security: max-age=15724800; includeSubDomains

x-ratelimit-limit-requests: 3500

x-ratelimit-limit-tokens: 90000

x-ratelimit-remaining-requests: 3499

x-ratelimit-remaining-tokens: 89980

x-ratelimit-reset-requests: 17ms

x-ratelimit-reset-tokens: 12ms

x-request-id: 96ff4efafed25a52fbedb6e5c7a3ab09

cf-cache-status: DYNAMIC

server: cloudflare

cf-ray: 80342aa96ae00974-HKG

alt-svc: h3=":443"; ma=86400

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":"3"},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" +"},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" "},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":"5"},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" ="},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" "},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":"8"},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]

小結:服務器返回的響應頭為 Content-Type: text/event-stream,數據塊以 data: 開頭,以 \n\n 分隔,最後以 [DONE] 標記結束。

5、SSE技術原理簡述
SSE (Server-Sent Events) 技術是一種用於實現服務器主動推送數據給客户端的通信協議。相比傳統的請求-響應模式,SSE 提供了一種持久連接,允許服務器隨時向客户端發送事件和數據,實現了實時性的消息傳遞。SSE 的工作原理非常簡單直觀。客户端通過與服務器建立一條持久化的 HTTP 連接,然後服務器使用該連接將數據以事件流(event stream)的形式發送給客户端。這些事件流由多個事件(event)組成,每個事件包含一個標識符、類型和數據字段。客户端通過監聽事件流來獲取最新的數據,並在接收到事件後進行處理。SSE 數據交互示意圖:
圖片
與 WebSocket 技術相比,SSE 使用的是基於 HTTP 的長輪詢機制,而不需要建立全雙工的網絡連接。這使得 SSE 更容易在現有的基礎設施上部署,無需特殊的代理或中間件支持。另外,SSE 能夠與現有的 Web 技術(如 AJAX 和 RESTful API)很好地集成,同時也更適合傳輸較少頻繁更新的數據。
SSE 的優點:
1)實時性:SSE 允許服務器主動將數據推送給客户端,實現實時更新和通知;
2)簡單易用:SSE 基於標準的 HTTP 協議,無需額外的庫或協議轉換;
3)可靠性:SSE 使用 HTTP 連接,兼容性好,並能通過處理連接斷開和錯誤情況來確保數據傳輸的可靠性;
4)輕量級:與 WebSocket 相比,SSE 不需要建立全雙工連接,減少了通信的開銷和服務器負載。
SSE 的弊端:
1)單向通信: SSE 是單向通信的,只能由服務器向客户端發送數據,無法實現雙向通信;
2)兼容性: SSE 不被一些老舊的瀏覽器支持,而且在某些情況下可能受到瀏覽器連接數限制;
3)無法跨域: SSE 受同源策略的限制,無法直接在跨域情況下使用,需要使用 CORS 等方法解決跨域問題。
綜上所述:SSE 技術提供了一種簡單、實時的服務器推送數據給客户端的方法,適用於需要實現實時更新和通知的應用場景。它在 Web 開發中具有廣泛的應用,可用於構建聊天應用、實時監控系統等,併為開發人員帶來便利和靈活性。但在需要雙向通信、跨域支持或更復雜的實時應用中,WebSocket 技術可能更為適用。(更多SSE技術資料請閲讀《SSE技術詳解:一種全新的HTML5服務器推送事件技術》)

6、與WebSocket對比
WebSocket 是 HTML5 引入的 全雙工通信協議,允許客户端和服務器之間保持持久連接,實現低延遲的雙向通信(詳情可閲讀《WebSocket從入門到精通,半小時就夠!》)。
WebSocket 特點:
1)全雙工通信:客户端和服務器都可以主動發送數據;
2)低延遲:連接建立後,數據交換無需額外的 HTTP 頭部,提高通信效率;
3)支持二進制數據:可以發送文本(JSON)和二進制數據(Blob、ArrayBuffer);
4)需要握手:使用 HTTP 進行 Upgrade: websocket 協商,建立 WebSocket 連接。

適用場景:
1)在線聊天應用(如 IM);
2)實時遊戲(如在線對戰);
3)股票行情推送;
4)直播彈幕。WebSocket數據交互示意圖:
圖片
WebSocket 與 SSE 對比總結:
圖片

7、流式響應落地示例
1)使用框架接受流式響應:LanghChain的stream接口。
async def _async_stream_with_custom_tokenizer(self, request: Request,

                                                   langchain,

                                                   prompt:str="",

                                                   history_messages: List[Message] = None):

"""

異步非阻塞版,區別 同步阻塞版(_generate_event_stream_with_custom_tokenizer)

"""

total_stream_content = ""

async for stream_content in langchain.astream({}):

    if await request.is_disconnected():

        logger.warning(f"[generate_event_stream] "

                       f", [trace_id] = {trace_id_context.get()}"

                       f", gateway connection abort..")

        break

    if isinstance(stream_content, str):

        content = stream_content

        total_stream_content += content

    elif isinstance(stream_content, AIMessageChunk):

        content = stream_content.content

        total_stream_content += content

    else:

        logger.error(f"[generate_event_stream] "

                     f", [trace_id] = {trace_id_context.get()}"

                     f", unexpected stream_content type: {type(stream_content)}")

        break

    # print(f"[custom_tokenizer] langchain stream response: {stream_content}")

    # 提取token統計信息

    usage = None

    if (stream_content.response_metadata is not None

            and (stream_content.response_metadata.get('finish_reason') == 'stop'

                 or stream_content.response_metadata.get('done_reason') == 'stop')):

        # hunyuan/azure_openai

        # logger.debug(f"=====> finish stream response, signal = {stream_content.response_metadata.get('finish_reason')}")

        # logger.debug(f"=====> finish stream response, signal = {stream_content.response_metadata.get('done_reason')}")

        if usage is None:

            token_usage = TokenTracker(self.model).track_full_token_usage(

                input_text=prompt,

                output_text=total_stream_content,

                context=history_messages

            )

            usage = self._get_token_usage(self.model, token_usage)

    resp = GenerateResponse(code=AiErrorCode.SUCCESS.value["code"],

                            message=AiErrorCode.SUCCESS.value["message"],

                            resp=content,

                            token_usage=usage)

    yield resp.to_string()

2)自行拆包整合算法,處理流式響應:使用基礎的python庫完成網絡數據讀取。

需要注意的是,緩衝區管理:

1)cache_raw_data:存儲未處理的二進制數據塊,避免因網絡分片導致的數據截斷;
2)buffer:暫存已解碼但未完整解析的文本數據(如SSE的 data: 前綴或JSON片段)。

async def _handle_stream_response(self,

                        resp,

                        prompt: str = None,

                        history_messages: List[Message] = None,

                        model:str=None):

# 全量數據包響應 & 單個數據包響應

total_stream_content = ""

usage = None

buffer = ""

cache_raw_data = b''

cache_raw_data_enable = False

# 分塊讀取

for stream_response in resp.iter_content(chunk_size=100):

    # 解碼響應(可能因分塊邊界截斷UTF-8字符)

    origin_content = ""

    try:

        if cache_raw_data_enable:

            cache_raw_data += stream_response

            # 嘗試UTF-8解碼

            origin_content = cache_raw_data.decode('utf-8')

            # 每次成功解碼後自動清理緩存

            cache_raw_data = b''

        else :

            # 嘗試UTF-8解碼

            origin_content = stream_response.decode('utf-8')

        cache_raw_data_enable = False

    except UnicodeDecodeError:

        logger.error(f"extract_content, data chunk decode error, trace_id = {trace_id_context.get()}, origin data = {stream_response}")

        # 方案1:容錯處理(有亂碼字符輸出,影響用户體驗)

        # origin_content = stream_response.decode('utf-8', errors='replace')

        # 方案2:解碼失敗,緩存數據,緩存數據包待處理

        logger.debug(f"extract_content, cache_raw_data_enable= {cache_raw_data_enable}, cache_raw_data = {cache_raw_data}")

        cache_raw_data += stream_response

        cache_raw_data_enable = True

        # 跳過後續處理,等待下一塊數據

        continue

    logger.debug(f"extract_content, trace_id = {trace_id_context.get()}, origin data = {origin_content}")

    buffer += origin_content

    while True:

        # SSE協議:定位兩個連續換行符,標識事件結束

        idx = buffer.find('\n\n')

        if idx == -1:

            break

        event_data = buffer[:idx]

        # 移除已處理數據

        buffer = buffer[idx + 2:]

        # 處理事件數據中的每一行

        for line in event_data.split('\n'):

            line = line.strip()

            if not line.startswith('data:'):

                continue

            # 移除"data:" or "data: "(這裏的data:,後面可能跟1個或0個空格,eg,deepseek是沒有空格,而azureopenai又有空格,這裏做兼容)

            data_str = line

            if line.startswith('data: '):

                data_str = line[6:]

            elif line.startswith('data:'):

                data_str = line[5:]

            if data_str == '[DONE]':

                # 2.1 自定義token計數器

                token_usage = TokenTracker(model_name=model).track_full_token_usage(

                    input_text=prompt,

                    output_text=total_stream_content,

                    context=history_messages

                )

                usage = super()._get_token_usage(model=model, usage=token_usage)

                # 2.1 拼接最終結果

                res = GenerateResponse(code=AiErrorCode.SUCCESS.value["code"],

                                       message=AiErrorCode.SUCCESS.value["message"],

                                       resp=None,

                                       token_usage=usage)

                logger.debug(f"finish stream, trace_id = {trace_id_context.get()}, token data = {usage}")

                yield res.to_string()

            else:

                try:

                    # 解析JSON數據

                    data = json.loads(data_str)

                    # 提取delta中的content

                    if 'choices' in data:

                        for choice in data['choices']:

                            delta = choice.get('delta', {})

                            content = delta.get('content')

                            if content is not None:

                                total_stream_content += content

                                # 3.8 拼接最終結果

                                res2 = GenerateResponse(code=AiErrorCode.SUCCESS.value["code"],

                                                        message=AiErrorCode.SUCCESS.value["message"],

                                                        resp=content,

                                                        token_usage=usage)

                                logger.debug(f"解析一個數據包數據完成, trace_id = {trace_id_context.get()}, origin data = {content}")

                                yield res2.to_string()

                except json.JSONDecodeError:

                    pass # 忽略無效JSON數據

8、本文小結
圖片
實際很多大模型接入的商用場景,並非採用標準化的api-key/base-url的配置化方法,因為出於數據安全因素,大模型服務商並不採用雲服務接入方法。如果要進行合作對接,進行類似的API接入和手動的數據拆包是大概率的事情。

9、參考資料
[1] Web端即時通訊技術盤點:短輪詢、Comet、Websocket、SSE

[2] SSE技術詳解:一種全新的HTML5服務器推送事件技術

[3] 使用WebSocket和SSE技術實現Web端消息推送

[4] 詳解Web端通信方式的演進:從Ajax、JSONP 到 SSE、Websocket

[5] 使用WebSocket和SSE技術實現Web端消息推送

[6] 一文讀懂前端技術演進:盤點Web前端20年的技術變遷史

[7] WebSocket從入門到精通,半小時就夠!

[8] 網頁端IM通信技術快速入門:短輪詢、長輪詢、SSE、WebSocket

[9] 搞懂現代Web端即時通訊技術一文就夠:WebSocket、socket.io、SSE

[10] 大模型時代多模型AI網關的架構設計與實現

[11] 全民AI時代,大模型客户端和服務端的實時通信到底用什麼協議?

[12] Web端實時通信技術SSE在攜程機票業務中的實踐應用

(本文已同步發佈於:http://www.52im.net/thread-4856-1-1.html)

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.