1. 概述
在本教程中,我們將學習如何使用 Spring 實現基於 Server-Sent-Events 的 API。
簡單來説,Server-Sent-Events,簡稱 SSE,是一種 HTTP 標準,它允許 Web 應用程序處理單向事件流,並在服務器發出數據時接收更新。
Spring 4.2 版本已經支持它,但從 Spring 5 開始,我們現在有更具表現力和便捷的方式來處理它。
2. 使用 Spring 6 Webflux 實現 SSE
為了實現這一點,我們可以利用諸如 Flux 類(由 Reactor 庫提供的)之類的實現,或者潛在的 ServerSentEvent 實體,從而獲得對事件元數據的控制。
2.1. 使用 Flux 流式事件
Flux 是一個對流式事件的反應式表示 – 它根據指定的請求或響應媒體類型進行處理。
為了創建 SSE 流式端點,我們需要遵循 W3C 規範,並將其 MIME 類型設置為 text/event-stream。
@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Flux - " + LocalTime.now().toString());
}interval 方法創建一個 Flux,該 Flux 會逐步地發出 長 值。然後我們將這些值映射到我們想要的結果。
讓我們啓動我們的應用程序並嘗試通過瀏覽端點進行測試。
我們將看到瀏覽器如何響應服務器每秒鐘推送的事件。有關 Flux 和 Reactor Core 的更多信息,請查看這篇帖子。
2.2. 使用 ServerSentEvent 元素
我們將把我們的輸出 String 包裝成 ServerSentEvent 對象,並探討這樣做的好處:
@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String> builder()
.id(String.valueOf(sequence))
.event("periodic-event")
.data("SSE - " + LocalTime.now().toString())
.build());
}正如我們所能理解的,使用 ServerSentEvent 實體具有幾個好處:
- 我們可以處理事件元數據,這在實際場景中都是必需的
- 我們可以忽略 “text/event-stream” 媒體類型聲明
在這種情況下,我們指定了 id、event name 和最重要的,事件的實際 data。
此外,我們還可以添加 comments 屬性和 retry 值,這將指定在嘗試發送事件時使用的重連時間。
2.3. 使用 WebClient 消費 Server-Sent Events
現在,讓我們使用 WebClient 消費我們的事件流:
public void consumeServerSentEvent() {
WebClient client = WebClient.create("http://localhost:8080/sse-server");
ParameterizedTypeReference<ServerSentEvent<String>> type
= new ParameterizedTypeReference<ServerSentEvent<String>>() {};
Flux<ServerSentEvent<String>> eventStream = client.get()
.uri("/stream-sse")
.retrieve()
.bodyToFlux(type);
eventStream.subscribe(
content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
LocalTime.now(), content.event(), content.id(), content.data()),
error -> logger.error("Error receiving SSE: {}", error),
() -> logger.info("Completed!!!"));
}subscribe方法允許我們指示在成功接收事件、發生錯誤或流式傳輸完成時應採取的行動。
在我們的示例中,我們使用了retrieve方法,這是一種簡單直接的方式來獲取響應體。
此方法會在收到4xx或5xx響應時自動拋出WebClientResponseException,除非我們處理這些場景並添加onStatus語句。
另一方面,我們也可以使用exchange方法,它提供對ClientResponse的訪問,並且不會在失敗響應上發出錯誤信號。
需要注意的是,如果不需要事件元數據,我們可以繞過ServerSentEvent包裝器。
3. Spring MVC 中的 SSE 流式傳輸
正如我們所説,SSE(Server-Sent Events)規範自 Spring 4.2 版本起就得到了支持,當時引入了 <em >SseEmitter</em> 類。
簡單來説,我們將定義一個 ExecutorService,即 SseEmitter 將執行其工作,推送數據的線程,並返回該 emitter 實例,以這種方式保持連接的打開狀態:
@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
SseEmitter emitter = new SseEmitter();
ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
sseMvcExecutor.execute(() -> {
try {
for (int i = 0; true; i++) {
SseEventBuilder event = SseEmitter.event()
.data("SSE MVC - " + LocalTime.now().toString())
.id(String.valueOf(i))
.name("sse event - mvc");
emitter.send(event);
Thread.sleep(1000);
}
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}請務必根據您的用例場景選擇合適的ExecutorService。
我們可以通過閲讀這篇有趣的教程,瞭解更多關於 SSE 在 Spring MVC 中的信息,並查看其他示例。
4. 理解服務器推送事件
現在我們知道如何實現 SSE 端點,接下來讓我們深入理解一些底層概念。
SSE 是一種由大多數瀏覽器採用的規範,允許在任何時候單向流式傳輸事件。
“事件”只是 UTF-8 編碼的文本數據流,遵循規範定義的格式。
該格式由一系列鍵值對元素組成(id、retry、data 和 event,表示名稱),這些元素之間用換行符分隔。
同時,支持註釋。
規範對數據報文格式沒有任何限制;我們可以使用簡單的 String 或更復雜的 JSON 或 XML 結構。
我們必須考慮的一點是,SSE 流式傳輸和 WebSockets 之間的區別。
雖然 WebSockets 提供了服務器與客户端之間全雙工(雙向)的通信,而 SSE 採用單向通信。
此外,WebSockets 不是 HTTP 協議,與 SSE 相比,它不提供錯誤處理標準。
5. 結論
總而言之,本文介紹了 SSE 串流的主要概念,這無疑是一個強大的資源,將幫助我們構建下一代系統。
我們現在對使用該協議時底層發生的事情有了深刻的理解。