本文不贅述具體概念,通過具體案例效果,學習sse (Server-Sent Events)的具體實現,以react框架為例
SSE具體應用場景
SSE(Server-Sent Events,服務器推送事件)是一種基於 HTTP 的單向實時通信協議,核心特點是服務器主動向客户端推送數據,客户端僅被動接收,無需頻繁輪詢,且天然支持斷線重連、事件標識等特性。其應用場景主要集中在 “服務器需主動向客户端推送實時數據,且客户端無需向服務器發送高頻請求” 的場景
具體場景
- 服務器監控到系統異常如負載過高、數據庫連接失敗等,通過SSE將告警信息推送給運維人員的管理後台界面(物聯網設備故障也可以通過SSE進行告警信息的推送)
- 審批類:業務系統中審批流程通過或者駁回時,向申請人推送審批結果通知
- 消息推送類:用户收到新私信、點贊、評論、關注請求時,服務器通過 SSE 將通知推送給對應用户的客户端,實現實時提醒
- 訂單狀態變更(如商家接單、快遞攬收、派送中)時,通過 SSE 推送狀態通知;商品降價、補貨時,向訂閲該商品的用户推送提醒
- 等等...
總而言之,就是及時推送消息,無需用户手動刷新獲取最新數據
效果圖
- 本文給到的效果示例
- 對應線上演示效果地址:https://ashuai.site/reactExamples/sse
- github倉庫:https://github.com/shuirongshuifu/react-examples
需求場景邏輯流程
- 有一段文章,當用户點擊開始接收按鈕的時候
- 前端需要使用
new EventSource去向後端的sse接口建立鏈接 -
鏈接建立以後,後端就會讀取逐字逐段地去掃描文章文本,然後開始發送message消息事件
- 假設我們的文章是
const article = 亮答曰:"自董卓以來,豪傑並起..." - 單純的message消息事件有些籠統,我們可以把消息事件自定義細分成為消息開始start事件、消息傳輸chunk事件、消息發送完畢end事件
- 這樣控制能夠更好地讓前端區分,做對應的UI操作
- 假設我們的文章是
-
每個消息中,會帶着一段文章文本字符串,交給前端去處理
- 實際上,如果是單純的文本回復需求,前端直接拿到
chunk事件中的文本字符串即可使用 - 但是,如果回覆的內容中除了純文本之外,還要回復鏈接、圖片、甚至代碼等類型的話
- 剛剛提到的
chunk事件就不夠用了,所以就可以新增一些事件類型 - 比如
chunk事件裏面存放純文本 imgChunk事件裏面存放圖片linkChunk事件裏面存放鏈接等- 大家可以根據自己的業務需求,自定義很多的事件類型
- 無論是什麼事件類型,前端都可以通過
eventSource監聽到
- 實際上,如果是單純的文本回復需求,前端直接拿到
- 前端可以監聽到對應事件後,不斷地把信息渲染到UI視圖上(實現光標跟隨的打字機效果)
前端監聽對應自定義事件類型比如:
// 創建SSE連接
const eventSource = new EventSource('https://ashuai.site/fastify-api/sse/article/66')
// 監聽後端自定義的start事件
eventSource.addEventListener('start', (event) => {
console.log('SSE開始:', event)
})
// 監聽後端自定義的chunk事件
eventSource.addEventListener('chunk', (event) => {
console.log('接收到的數據:', event)
})
// 監聽後端自定義的end事件
eventSource.addEventListener('end', (event) => {
console.log('SSE結束:', event)
})
// 監聽後端自定義的error事件
eventSource.addEventListener('error', (event) => {
console.error('SSE錯誤:', event)
})
因為原生事件有些籠統,不便於前端更精細化控制
const eventSource = new EventSource('https://ashuai.site/fastify-api/sse/article/66')
eventSource.onmessage = (event) => { }
eventSource.onerror = (event) => { }
這個接口https://ashuai.site/fastify-api/sse/article/66大家可以直接使用,可以直接將其複製粘貼到地址欄並回車,或者使用curl命令
後端代碼
後端使用fastify框架,搭配fastify-sse-v2這個sse的包
Controller層
- Controller負責接收用户發送的http的Request請求
- 並處理請求參數與數據校驗
- 校驗通過後,會調用Service層進行業務邏輯處理,
- 處理Service層返回結果並構造響應Response異常處理
const createSseService = require('../services/sseService')
const ResponseUtils = require('../utils/response')
const { validateId } = require('../utils/validation')
// 根據ID獲取對應文章(SSE分段輸出)
const getArticleById = async (request, reply) => {
try {
const { id } = request.params
// 參數校驗
const idValidation = validateId(id)
if (!idValidation.isValid) {
return ResponseUtils.sendValidationError(reply, 'ID參數無效', idValidation.errors)
}
const sseService = createSseService()
// 設置自定義響應頭
reply.header('hello', 'world')
// 定義處理每個數據塊的回調函數
const handleChunk = async (chunk, isLast) => {
reply.sse({
event: chunk.type,
data: JSON.stringify(chunk.data)
})
}
// 調用service層的流式傳輸方法
const result = await sseService.streamArticleWithSSE(
idValidation.data,
handleChunk,
{ chunkSize: 10, delayMs: 500 }
)
if (result === null) {
return ResponseUtils.sendNotFound(reply, '文章不存在')
}
} catch (error) {
request.log.error(error)
// SSE錯誤處理
reply.sse({
event: 'error',
data: JSON.stringify({
code: 500,
message: '查詢失敗',
error: error.message
})
})
}
}
Service層
- Service層主要是處理核心業務邏輯
- 協調數據訪問,就是處理數據
- 比如調用DAO層(或Repository層) 來完成mysql數據查詢
- 並進行數據處理與轉換等
- 所以sse數據的核心加工處理在這一層
假設我們的文章是這個
const article = `亮答曰:"自董卓以來,豪傑並起,跨州連郡者不可勝數。曹操比於袁紹,則名微而眾寡。然操遂能克紹,以弱為強者,非惟天時,抑亦人謀也。
今操已擁百萬之眾,挾天子而令諸侯,此誠不可與爭鋒。孫權據有江東,已歷三世,國險而民附,賢能為之用,此可以為援而不可圖也。
荊州北據漢、沔,利盡南海,東連吳會,西通巴、蜀,此用武之國,而其主不能守,此殆天所以資將軍,將軍豈有意乎?
益州險塞,沃野千里,天府之土,高祖因之以成帝業。劉璋闇弱,張魯在北,民殷國富而不知存恤,智能之士思得明君。
將軍既帝室之胄,信義著於四海,總攬英雄,思賢如渴,若跨有荊、益,保其巖阻,西和諸戎,南撫夷越,外結好孫權,內修政理;
天下有變,則命一上將將荊州之軍以向宛、洛,將軍身率益州之眾出於秦川,百姓孰敢不簞食壺漿以迎將軍者乎?誠如是,則霸業可成,漢室可興矣。"`
對應Service層代碼
const createSseService = () => {
// 根據前端參數ID獲取對應文章(這裏模擬從數據庫撈取數據)
const getArticleById = (id) => {
return { id, article, // 直接使用上述文章 }
}
/**
* SSE流式傳輸文章
* @param {number} id - 文章ID
* @param {Function} onChunk - 處理每個數據塊的回調函數 (chunk, isLast) => void
* @param {Object} options - 配置選項
* @param {number} options.chunkSize - 每個分段的字符數,默認10
* @param {number} options.delayMs - 每個分段之間的延遲時間(毫秒),默認500
* @returns {Promise<boolean|null>} 成功返回true,文章不存在返回null
*/
const streamArticleWithSSE = async (id, onChunk, options = {}) => {
const { chunkSize = 10, delayMs = 500 } = options
const articleData = getArticleById(id)
const { article } = articleData
// 發送開始事件
await onChunk({
type: 'start',
data: {
id: articleData.id,
totalLength: article.length
}
}, false)
// 等待延遲後開始發送內容
await new Promise(resolve => setTimeout(resolve, delayMs))
// 分段發送文章內容
let currentIndex = 0
const totalChunks = Math.ceil(article.length / chunkSize)
while (currentIndex < article.length) {
const chunk = article.slice(currentIndex, currentIndex + chunkSize)
await onChunk({
type: 'chunk',
data: {
index: Math.floor(currentIndex / chunkSize),
content: chunk,
progress: Math.round(((currentIndex + chunkSize) / article.length) * 100)
}
}, false)
currentIndex += chunkSize
// 如果還有下一段,等待延遲
if (currentIndex < article.length) {
await new Promise(resolve => setTimeout(resolve, delayMs))
}
}
// 發送結束事件
await onChunk({
type: 'end',
data: {
totalChunks,
message: '文章發送完成'
}
}, true)
return true
}
// 返回所有方法
return { getArticleById, streamArticleWithSSE }
}
module.exports = createSseService
這樣的話,我們就有了一個sse的接口了
Nginx設置對應響應頭
注意,sse需要設置特殊的響應頭,這裏我們使用nginx代理,可以配置如下
# SSE專用配置,接口調用
location /fastify-api/sse/ {
proxy_pass http://localhost:33333/fastify/sse/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-Forwarded-Host $server_name;
# SSE特殊配置
proxy_buffering off; #避免阻塞流式輸出
proxy_cache off; #防止緩存干擾實時數據
proxy_set_header Connection ""; #維持長連接
# SSE超時
proxy_connect_timeout 30s;
proxy_send_timeout 30s;
proxy_read_timeout 86400s; # 24小時,適合SSE
}
前端
光標跟隨實現
- 如果只是展示普通的文本,那麼可以創建一個元素
- 將這個元素擺放在文字的最後
- 通過動畫的方式,模擬出來光標閃爍的效果
- 如下
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Document</title>
<style>
.cursor {
display: inline-block;
width: 2px;
height: 1.2em;
background-color: #1677ff;
margin-left: 2px;
/* 光標元素的底部與父元素的文本底部對齊 */
vertical-align: text-bottom;
animation: blink 1s infinite;
}
@keyframes blink {
0%,
50% {
opacity: 1;
}
51%,
100% {
opacity: 0;
}
}
</style>
</head>
<body>
<p>你好,這個世界 <span class="cursor"></span> </p>
</body>
</html>
效果圖
- 如果要展示覆雜的帶層級的結構,比如代碼、鏈接等,就要通過js去動態控制
- 找到最後一個文本接口、追加一個問題,並獲取文字的位置,再設置光標到文字為止
可以參考這個視頻的實現方案:https://www.douyin.com/video/7553576140888116516
使用Promise.resolve()的鏈式調用,實現隊列方案
案例效果中,因為是不斷地完成打字機渲染數據的效果,所以我們要確保上一次渲染完成後,再執行下一次的打字機渲染,這裏我們採用Promise.resolve()的鏈式調用,實現隊列方案(保證順序)
案例分析,假設我們要發送五個請求(通過id傳參的形式區分),下方的寫法,無法保證輸入的結果,對應的是參數 1 2 3 4 5的結果
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script src="https://cdn.bootcdn.net/ajax/libs/axios/1.3.0/axios.js"></script>
</head>
<body>
<script>
const base = 'https://ashuai.work/api/xyj?id='
const ids = ['1', '2', '3', '4', '5']
ids.forEach(async (id) => {
const res = await axios.get(`${base}${id}`)
console.log('res', res.data.data)
})
</script>
</body>
</html>
但是,如果我們使用使用Promise的鏈式調用,就能夠實現通過.then()實現入隊操作,而Promise會自動執.then的代碼做到自動出隊的效果,如下
<script>
const base = 'https://ashuai.work/api/xyj?id='
const ids = ['1', '2', '3', '4', '5']
// 初始化一個"空的已完成Promise"作為起點
let promiseChain = Promise.resolve()
// 用forEach遍歷
ids.forEach(id => {
// 將新任務追加到Promise鏈的末尾(變量重新指向新創建的Promise)
promiseChain = promiseChain.then(async () => {
const res = await axios.get(`${base}${id}`)
console.log('res', res.data.data)
return res.data.data
})
})
</script>
或者使用for of也行,也能保證順序
const base = 'https://ashuai.work/api/xyj?id='
const ids = ['1', '2', '3', '4', '5']
async function fetchSequentially() {
for (const id of ids) {
const res = await axios.get(`${base}${id}`)
console.log('res', res.data.data)
}
console.log('全部完成')
}
// 調用函數
fetchSequentially()
完整前端代碼
前端控制流程
用户點擊"開始接收"
↓
startSSE() - 重置狀態 + 建立連接
├─ articleText='', progress=0
├─ isStreaming=true (顯示光標█)
└─ new EventSource(url)
↓
━━━━ 服務器: start 事件 ━━━━
↓
console.log('SSE開始')
↓
━━━━ 服務器: chunk 事件 (多次) ━━━━
↓
chunk1 到達 → Promise鏈
└─ typeWriter("文本1")
└─ 逐字顯示 (50ms/字) ⏰
└─ 完成後 setProgress(20)
↓
chunk2 到達 → Promise鏈 (等待chunk1完成)
└─ typeWriter("文本2")
└─ 逐字顯示 (50ms/字) ⏰
└─ 完成後 setProgress(40)
↓
chunk3 到達 → Promise鏈 (等待chunk2完成)
└─ typeWriter("文本3")
└─ 逐字顯示 (50ms/字) ⏰
└─ 完成後 setProgress(100)
↓
━━━━ 服務器: end 事件 ━━━━
↓
Promise鏈 (等待所有typeWriter完成)
└─ setIsStreaming(false) (隱藏光標)
└─ eventSource.close()
└─ 完成 ✓
注意,這裏是使用Promise鏈式調用保證順序的
【Promise 鏈保證順序】
服務器快速發送:
chunk1 → chunk2 → chunk3 → end
前端執行順序:
typeWriter(chunk1) ✓
↓ 等待完成
typeWriter(chunk2) ✓
↓ 等待完成
typeWriter(chunk3) ✓
↓ 等待完成
隱藏光標 ✓
核心: promiseChainRef 確保串行執行,不會亂序
React代碼
import React, { useState, useEffect, useRef } from 'react'
import { Button, Progress } from 'antd'
import './Sse.css'
export default function Sse() {
const [articleText, setArticleText] = useState('') // 文章內容
const currentTextRef = useRef('') // 存儲打字機效果中累積的文本內容
const [progress, setProgress] = useState(0) // sse返回的進度
const [isStreaming, setIsStreaming] = useState(false) // 是否正在接收
const eventSourceRef = useRef(null) // sse鏈接實例
const typeWriterTimerRef = useRef(null) // 定時器的id,用於清理定時器
const promiseChainRef = useRef(Promise.resolve()) // Promise鏈
// 清理函數
useEffect(() => {
return () => {
// 組件卸載清理關閉sse連接和定時器
if (eventSourceRef.current) {
eventSourceRef.current.close()
}
if (typeWriterTimerRef.current) {
clearInterval(typeWriterTimerRef.current)
}
}
}, [])
// 打字機效果返回Promise,確保順序執行
const typeWriter = (text) => {
return new Promise((resolve) => {
let index = 0
const interval = setInterval(() => {
// 組件被卸載了,停止打字機效果
if (!typeWriterTimerRef.current) {
clearInterval(interval)
return
}
// 定時器循環賦值刷新UI,直到賦值完成,再執行resolve
if (index < text.length) {
currentTextRef.current += text[index]
setArticleText(currentTextRef.current)
index++
} else {
clearInterval(interval)
typeWriterTimerRef.current = null
resolve()
}
}, 50)
typeWriterTimerRef.current = interval
})
}
// 開始SSE連接
const startSSE = () => {
if (eventSourceRef.current) {
eventSourceRef.current.close()
}
// 重置狀態
currentTextRef.current = ''
setArticleText('')
setProgress(0)
setIsStreaming(true)
promiseChainRef.current = Promise.resolve() // 重置Promise鏈
// 創建SSE連接
const eventSource = new EventSource('https://ashuai.site/fastify-api/sse/article/66')
eventSourceRef.current = eventSource
// 監聽後端自定義的start事件
eventSource.addEventListener('start', (event) => {
const data = JSON.parse(event.data)
console.log('SSE開始:', data)
})
// 監聽後端自定義的chunk事件
eventSource.addEventListener('chunk', (event) => {
const data = JSON.parse(event.data)
console.log('接收到的數據:', data)
// 鏈式調用:等前一個完成後再執行,這樣能夠確保前一句話完成,再執行下一句話
promiseChainRef.current = promiseChainRef.current
.then(() => typeWriter(data.content))
.then(() => setProgress(data.progress))
})
// 監聽後端自定義的end事件
eventSource.addEventListener('end', (event) => {
const data = JSON.parse(event.data)
console.log('SSE結束:', data)
// 等待所有Promise鏈完成後再設置isStreaming為false
// 防止see結束了,文字還沒有打完,光標消失了
promiseChainRef.current = promiseChainRef.current.then(() => {
setIsStreaming(false)
})
// 關閉連接
eventSource.close()
eventSourceRef.current = null
})
// 監聽後端自定義的error事件
eventSource.addEventListener('error', (event) => {
console.error('SSE錯誤:', event)
setIsStreaming(false)
if (eventSourceRef.current) {
eventSourceRef.current.close()
eventSourceRef.current = null
}
// 也可以加上定時器重連機制
})
}
// 取消SSE連接
const cancelSSE = () => {
if (eventSourceRef.current) {
eventSourceRef.current.close()
eventSourceRef.current = null
}
if (typeWriterTimerRef.current) {
clearInterval(typeWriterTimerRef.current)
typeWriterTimerRef.current = null
}
setIsStreaming(false)
promiseChainRef.current = Promise.resolve()
}
return (
<div style={{ width: '720px' }}>
<div className="article-container">
<div className="article-text">
{articleText}
{/* 當正在接收時,使用span元素模擬鼠標光標 */}
{isStreaming && <span className="cursor"></span>}
</div>
</div>
<Progress percent={progress} />
<Button
type="primary"
onClick={startSSE}
disabled={isStreaming}
>
{isStreaming ? '正在接收...' : '開始接收'}
</Button>
<Button onClick={cancelSSE} disabled={!isStreaming}>{isStreaming ? '取消鏈接' : '未曾鏈接'}</Button>
</div>
)
}
Css代碼
.article-container {
background: #f7fafc;
border-radius: 8px;
padding: 20px;
border-left: 4px solid #1677ff;
min-height: 240px;
}
.article-text {
font-size: 16px;
line-height: 1.8;
color: #2d3748;
white-space: pre-wrap;
word-wrap: break-word;
}
.cursor {
display: inline-block;
width: 2px;
height: 1.2em;
background-color: #1677ff;
margin-left: 2px;
/* 光標元素的底部與父元素的文本底部對齊 */
vertical-align: text-bottom;
animation: blink 1s infinite;
}
@keyframes blink {
0%,
50% {
opacity: 1;
}
51%,
100% {
opacity: 0;
}
}
原生new EventSource的不足之處
- 原生
new EventSource主要有以下不足之處 - 首先無法自定義請求頭,比如我想要在接口的請求頭中添加
'Authorization': 'myToken'是不好控制的,畢竟可能業務中接口要有鑑權才行 - 然後,僅支持get方法,post不能用,參數只能通過 URL 傳遞,比如上述筆者提供的接口,就是通過get請求的params拿到的參數,即
new EventSource('https://ashuai.site/fastify-api/sse/article/66')這裏的參數比如是66或者67等 - 當然 我們也可以把token拼接到params中去,比如
new EventSource('https://ashuai.site/fastify-api/sse/article/66/user_token') - 或者也可以使用cookie去帶着token
- 但是這畢竟不優雅,前人已經提供好了一些包解決方案供我們使用
- 比如event-source-polyfill和fetch-event-source
event-source-polyfill
示例代碼
import { EventSourcePolyfill } from 'event-source-polyfill';
// 初始化配置項多多
const es = new EventSourcePolyfill('/api/sse', {
method: 'POST', // 支持POST
headers: {
'Authorization': 'user_token',
'Content-Type': 'application/json'
},
body: JSON.stringify({ id: 66 }) // POST請求體
});
// 監聽服務器發送的消息
es.onmessage = (event) => { console.log('收到數據:', event.data) };
// 監聽連接打開
es.onopen = () => { console.log('連接已建立') };
// 監聽錯誤
es.onerror = (error) => { console.error('連接錯誤:', error) };
或者使用fetch-event-source
fetch-event-source
示例代碼
import { fetchEventSource } from '@microsoft/fetch-event-source';
fetchEventSource('/api/sse', {
method: 'POST',
headers: {
'Authorization': 'user_token',
'Content-Type': 'application/json'
},
body: JSON.stringify({ id: 66 }),
onopen: (response) => {
if (!response.ok) {
console.error('連接失敗:', response.status);
} else {
console.log('連接已建立');
}
},
onmessage: (event) => {
console.log('收到數據:', event.data);
},
onclose: () => {
console.log('連接關閉');
},
onerror: (error) => {
console.error('連接錯誤:', error);
// 可以返回true表示重試連接
return true;
}
});
| 特性 | event-source-polyfill |
fetch-event-source |
|---|---|---|
| 底層依賴 | 模擬原生 EventSource |
基於現代 Fetch API |
| 兼容性 | 支持舊瀏覽器(如 IE9+) | 依賴 Fetch,需兼容現代瀏覽器 |
| 靈活性 | 有限(接近原生 API) | 更高(支持 Fetch 所有特性) |
| 重試與控制 | 簡單重試邏輯 | 自定義重試策略、取消機制 |
筆者推薦fetch-event-source這個包,筆者在實際項目中,使用的也是這個,沒有用原生EventSource寫法
F12的Network中請求類型的區別
注意,如果是使用原生的EventSource,那麼F12的Network中請求type類型是eventssource
如果是使用fetch方式的話,那麼請求type類型則是fetch,我們打開ChatGpt發現其類型正是fetch
但是,無論哪種,後端的響應頭,都要返回Content-Type text/event-stream; charset=utf-8來告訴瀏覽器,這是一個事件流,這樣的話才能有如下圖的EventStream
筆者的返回EventStream
ChatGpt的返回EventStream
大家可以打開豆包、通義千文,大家會發現這兩家也是用的fetch技術,並沒有直接使用EventSource,不過deepseek特殊一些用的是xhr方式回覆消息,但同樣也是Content-Type text/event-stream; charset=utf-8
除此之外,還可以使用Fetch ReadableStream讀取Chunked流實現see功能如下
使用Fetch ReadableStream讀取Chunked分塊流實現see功能
核心:
rawReply.writeHead(200, {
'Transfer-Encoding': 'chunked', // 👈 關鍵!告訴瀏覽器用分塊方式接收
'Content-Type': 'text/plain; charset=utf-8',
...
})
瀏覽器收到這個頭後會
- 知道數據會分多次發送過來
- 自動解析每個chunk前面的十六進制長度
- 把解析後的純數據交給JavaScript
- 持續讀取直到收到
0\r\n\r\n(結束標記)
這種方式稍微複雜一些,應對特殊的需求能用上 一般來説,fetch-event-source 夠用了
Fetch + ReadableStream
- 是更底層、更靈活的流式數據接收方式
- 可以接收任意格式的流式數據(JSON、純文本、二進制等)
- 需要自己處理數據解析、緩衝區等
- 可以實現 SSE 的效果,甚至更強大
效果圖
線上示例地址:https://ashuai.site/reactExamples/sse2
Controller層
// 根據ID獲取對應文章2(chunked傳輸)
const getArticleById2 = async (request, reply) => {
try {
const { id } = request.params
// 參數校驗
const idValidation = validateId(id)
if (!idValidation.isValid) {
return ResponseUtils.sendValidationError(reply, 'ID參數無效', idValidation.errors)
}
const sseService = createSseService()
// 獲取原始響應對象
const rawReply = reply.raw
// 設置響應頭
rawReply.writeHead(200, {
'Content-Type': 'text/plain; charset=utf-8',
'Transfer-Encoding': 'chunked',
'Cache-Control': 'no-cache',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization',
'Connection': 'keep-alive'
})
let isConnectionClosed = false
// 監聽連接關閉事件
rawReply.on('close', () => {
isConnectionClosed = true
})
rawReply.on('finish', () => {
isConnectionClosed = true
})
// 定義寫入chunk的輔助函數
const writeChunk = (data) => {
if (isConnectionClosed) return
const dataStr = JSON.stringify(data)
const bufferChunk = Buffer.from(dataStr, 'utf8')
rawReply.write(`${bufferChunk.length.toString(16)}\r\n`)
rawReply.write(dataStr)
rawReply.write('\r\n')
}
// 定義處理每個數據塊的回調函數
const handleChunk = async (chunk, isLast) => {
if (isConnectionClosed) return
writeChunk(chunk)
if (isLast) {
rawReply.write('0\r\n\r\n')
rawReply.end()
}
}
// 調用service層的流式傳輸方法
const result = await sseService.streamArticleWithChunked(
idValidation.data,
handleChunk,
{ chunkSize: 10, delayMs: 500 }
)
if (result === null) {
return ResponseUtils.sendNotFound(reply, '文章不存在')
}
} catch (error) {
request.log.error(error)
// chunked傳輸錯誤處理
const errorData = JSON.stringify({
code: 500,
message: '查詢失敗',
error: error.message,
type: 'error'
})
const errorChunk = Buffer.from(errorData, 'utf8')
reply.raw.write(`${errorChunk.length.toString(16)}\r\n`)
reply.raw.write(errorData)
reply.raw.write('\r\n0\r\n\r\n')
reply.raw.end()
}
}
Service層
數據
const article2 = `臣本布衣,躬耕於南陽,苟全性命於亂世,不求聞達於諸侯。
先帝不以臣卑鄙,猥自枉屈,三顧臣於草廬之中,諮臣以當世之事,由是感激,遂許先帝以驅馳。
後值傾覆,受任於敗軍之際,奉命於危難之間,爾來二十有一年矣。
先帝知臣謹慎,故臨崩寄臣以大事也。受命以來,夙夜憂嘆,恐託付不效,以傷先帝之明;
故五月渡瀘,深入不毛。今南方已定,兵甲已足,當獎率三軍,北定中原,庶竭駑鈍,攘除奸兇,興復漢室,還於舊都。
此臣所以報先帝而忠陛下之職分也。至於斟酌損益,進盡忠言,則攸之、禕、允之任也。
願陛下託臣以討賊興復之效,不效,則治臣之罪,以告先帝之靈。若無興德之言,則責攸之、禕、允等之慢,以彰其咎;
陛下亦宜自謀,以諮諏善道,察納雅言,深追先帝遺詔。臣不勝受恩感激。今當遠離,臨表涕零,不知所言。`
業務邏輯處理
const createSseService = () => {
// 根據ID獲取對應文章2(用於chunked傳輸)
const getArticleById2 = (id) => {
return {
id,
article: article2,
}
}
/**
* Chunked傳輸文章
* @param {number} id - 文章ID
* @param {Function} onChunk - 處理每個數據塊的回調函數 (chunk, isLast) => void
* @param {Object} options - 配置選項
* @param {number} options.chunkSize - 每個分段的字符數,默認10
* @param {number} options.delayMs - 每個分段之間的延遲時間(毫秒),默認500
* @returns {Promise<boolean|null>} 成功返回true,文章不存在返回null
*/
const streamArticleWithChunked = async (id, onChunk, options = {}) => {
const { chunkSize = 10, delayMs = 500 } = options
const articleData = getArticleById2(id)
const { article } = articleData
// 發送開始標記
await onChunk({
id: articleData.id,
totalLength: article.length,
type: 'chunked'
}, false)
// 等待初始延遲
await new Promise(resolve => setTimeout(resolve, 100))
// 分段發送文章內容
let currentIndex = 0
const totalChunks = Math.ceil(article.length / chunkSize)
while (currentIndex < article.length) {
const chunk = article.slice(currentIndex, currentIndex + chunkSize)
await onChunk({
index: Math.floor(currentIndex / chunkSize),
content: chunk,
progress: Math.round(((currentIndex + chunkSize) / article.length) * 100),
type: 'chunk'
}, false)
currentIndex += chunkSize
// 如果還有下一段,等待延遲
if (currentIndex < article.length) {
await new Promise(resolve => setTimeout(resolve, delayMs))
}
}
// 發送結束標記
await onChunk({
totalChunks,
message: '文章發送完成',
type: 'end'
}, true)
return true
}
// 返回所有方法
return {
getArticleById2,
streamArticleWithChunked
}
}
module.exports = createSseService
前端代碼
import React, { useState, useRef, useEffect } from 'react'
import { Button, Progress } from 'antd'
import './Sse2.css'
export default function Sse2() {
const [articleText, setArticleText] = useState('') // 顯示在界面上的文章內容
const [progress, setProgress] = useState(0) // 當前加載進度(0-100)
const [isStreaming, setIsStreaming] = useState(false) // 標記是否正在接收數據流
const currentTextRef = useRef('') // 打字機效果中實時累積的文本內容(緩存)
const typeWriterTimerRef = useRef(null) // 打字機定時器的 ID,用於在需要時清理定時器
const promiseChainRef = useRef(Promise.resolve()) // Promise 鏈,確保多個打字機效果按順序執行,不會亂序
const abortControllerRef = useRef(null) // 用於中斷 fetch 請求的控制器
useEffect(() => {
return () => {
if (abortControllerRef.current) {
abortControllerRef.current.abort()
}
if (typeWriterTimerRef.current) {
clearInterval(typeWriterTimerRef.current)
}
}
}, [])
const typeWriter = (text) => {
return new Promise((resolve) => {
let index = 0 // 當前要顯示的字符索引
const interval = setInterval(() => {
// 安全檢查:如果組件已卸載(定時器被清除),停止執行
if (!typeWriterTimerRef.current) {
clearInterval(interval)
return
}
// 還有字符沒顯示完,繼續逐個添加
if (index < text.length) {
currentTextRef.current += text[index] // 累積文本到 ref 中
setArticleText(currentTextRef.current) // 更新界面顯示
index++ // 移動到下一個字符
} else {
// 所有字符都顯示完了,清理定時器並通知 Promise 完成
clearInterval(interval)
typeWriterTimerRef.current = null
resolve() // 告訴外部:這段文本顯示完了,可以顯示下一段了
}
}, 50) // 每 50ms 顯示一個字符
typeWriterTimerRef.current = interval // 保存定時器 ID,方便後續清理
})
}
/**
* 處理從服務器接收到的分塊數據
* @param {Object} data - 服務器返回的數據對象
*
* 數據類型説明:
* - type: 'chunk' -> 文章內容片段,需要用打字機效果顯示
* - type: 'end' -> 傳輸結束標記
* - type: 'error' -> 錯誤信息
*/
const handleChunkedData = async (data) => {
// 情況1:接收到一段文章內容
if (data.type === 'chunk') {
promiseChainRef.current = promiseChainRef.current
.then(() => typeWriter(data.content)) // 先等上一段顯示完
.then(() => setProgress(data.progress)) // 再更新進度條
return
}
// 情況2:接收到結束標記
if (data.type === 'end') {
// 等待所有文本都顯示完成後,再標記傳輸結束
// 這樣不會出現文本還沒顯示完,按鈕就變成可點擊的情況
promiseChainRef.current = promiseChainRef.current.then(() => {
setIsStreaming(false)
})
return
}
// 情況3:服務器返回了錯誤
if (data.type === 'error') {
throw new Error(data.message || '服務器返回錯誤')
}
}
/**
* 使用 fetch + ReadableStream 接收服務器的流式數據
*
* 流式傳輸的優勢:
* 1. 數據邊接收邊顯示,不用等所有數據到達後才顯示
* 2. 類似於 ChatGPT 的效果,用户體驗更好
* 3. 可以處理大量數據,不會一次性佔用太多內存
*/
const startChunkedStream = async () => {
// 如果之前有未完成的請求,先取消掉
if (abortControllerRef.current) {
abortControllerRef.current.abort()
}
// 創建新的取消控制器,用於取消這次請求
abortControllerRef.current = new AbortController()
// 重置所有狀態,準備接收新數據
currentTextRef.current = '' // 清空緩存的文本
setArticleText('') // 清空界面顯示
setProgress(0) // 進度歸零
setIsStreaming(true) // 標記為正在接收
promiseChainRef.current = Promise.resolve() // 重置 Promise 鏈
try {
// 向服務器發起請求
const response = await fetch('https://ashuai.site/fastify-api/sse/article2/1', {
signal: abortControllerRef.current.signal // 傳入取消控制器,可隨時中斷請求
})
// 檢查請求是否成功
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`)
}
// 獲取響應體的讀取器(用於讀取流式數據)
const reader = response.body.getReader()
// 創建文本解碼器(將二進制數據轉換為文本)
const decoder = new TextDecoder()
// 緩衝區:用於存儲不完整的數據
let buffer = ''
// 循環讀取數據流
while (true) {
// 讀取一塊數據
const { done, value } = await reader.read()
// 如果數據讀取完畢,退出循環
if (done) {
break
}
// 將二進制數據解碼成文本
const chunk = decoder.decode(value, { stream: true })
buffer += chunk // 累積到緩衝區
// 按行分割數據(服務器每次發送一個完整的 JSON 對象,以換行符分隔)
const lines = buffer.split('\n')
// 處理每一行數據(除了最後一行,因為最後一行可能不完整)
for (let i = 0; i < lines.length - 1; i++) {
const line = lines[i].trim()
// 忽略空行和 '0'('0' 是 chunked 編碼的結束標記)
if (line && line !== '0') {
try {
// 將 JSON 字符串解析為對象
const data = JSON.parse(line)
// 處理這條數據(顯示文本、更新進度等)
await handleChunkedData(data)
} catch (e) {
// 如果解析失敗,忽略這條數據,繼續處理下一條
}
}
}
// 將不完整的行保留在緩衝區中,等待下次接收數據時拼接
const lastNewlineIndex = buffer.lastIndexOf('\n')
if (lastNewlineIndex !== -1) {
buffer = buffer.substring(lastNewlineIndex + 1)
}
}
} catch (error) {
// 處理錯誤
if (error.name === 'AbortError') {
console.log('傳輸已取消')
} else {
console.error('Chunked傳輸錯誤:', error)
}
setIsStreaming(false)
} finally {
// 無論成功或失敗,都清空取消控制器
abortControllerRef.current = null
}
}
/**
* 取消正在進行的流式傳輸
* 需要清理三個東西:
* 1. 網絡請求
* 2. 打字機定時器
* 3. Promise 鏈
*/
const cancelStream = () => {
// 1. 取消網絡請求
if (abortControllerRef.current) {
abortControllerRef.current.abort()
abortControllerRef.current = null
}
// 2. 停止打字機效果
if (typeWriterTimerRef.current) {
clearInterval(typeWriterTimerRef.current)
typeWriterTimerRef.current = null
}
// 3. 更新狀態
setIsStreaming(false)
// 4. 重置 Promise 鏈
promiseChainRef.current = Promise.resolve()
}
return (
<div style={{ width: '720px' }}>
{/* 文章顯示區域 */}
<div className="article-container">
<div className="article-text">
{articleText}
{/* 當正在接收時,顯示一個閃爍的光標,模擬打字效果 */}
{isStreaming && <span className="cursor"></span>}
</div>
</div>
<Progress percent={progress} />
<Button
type="primary"
onClick={startChunkedStream}
disabled={isStreaming} // 正在接收時禁用,避免重複點擊
>
{isStreaming ? '正在接收...' : '開始接收'}
</Button>
<Button onClick={cancelStream} disabled={!isStreaming}>
{isStreaming ? '取消傳輸' : '未在傳輸'}
</Button>
</div>
)
}
A good memory is not as reliable as a written record. Write it down...