知識庫 / Reactive RSS 訂閱

Spring 中的服務器發送事件

Reactive,Spring Web
HongKong
8
01:38 PM · Dec 06 ,2025

1. 概述

在本教程中,我們將學習如何使用 Spring 實現基於 Server-Sent-Events 的 API。

簡單來説,Server-Sent-Events,簡稱 SSE,是一種 HTTP 標準,它允許 Web 應用程序處理單向事件流,並在服務器發出數據時接收更新。

Spring 4.2 版本已經支持它,但從 Spring 5 開始,我們現在有更具表現力和便捷的方式來處理它。

2. 使用 Spring 6 Webflux 實現 SSE

為了實現這一點,我們可以利用諸如 Flux 類(由 Reactor 庫提供的)之類的實現,或者潛在的 ServerSentEvent 實體,從而獲得對事件元數據的控制。

2.1. 使用 Flux 流式事件

Flux 是一個對流式事件的反應式表示 – 它根據指定的請求或響應媒體類型進行處理。

為了創建 SSE 流式端點,我們需要遵循 W3C 規範,並將其 MIME 類型設置為 text/event-stream

@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
    return Flux.interval(Duration.ofSeconds(1))
      .map(sequence -> "Flux - " + LocalTime.now().toString());
}

interval 方法創建一個 Flux,該 Flux 會逐步地發出 值。然後我們將這些值映射到我們想要的結果。

讓我們啓動我們的應用程序並嘗試通過瀏覽端點進行測試。

我們將看到瀏覽器如何響應服務器每秒鐘推送的事件。有關 FluxReactor Core 的更多信息,請查看這篇帖子。

2.2. 使用 ServerSentEvent 元素

我們將把我們的輸出 String 包裝成 ServerSentEvent 對象,並探討這樣做的好處:

@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
    return Flux.interval(Duration.ofSeconds(1))
      .map(sequence -> ServerSentEvent.<String> builder()
        .id(String.valueOf(sequence))
          .event("periodic-event")
          .data("SSE - " + LocalTime.now().toString())
          .build());
}

正如我們所能理解的,使用 ServerSentEvent 實體具有幾個好處:

  1. 我們可以處理事件元數據,這在實際場景中都是必需的
  2. 我們可以忽略 “text/event-stream” 媒體類型聲明

在這種情況下,我們指定了 idevent name 和最重要的,事件的實際 data

此外,我們還可以添加 comments 屬性和 retry 值,這將指定在嘗試發送事件時使用的重連時間。

2.3. 使用 WebClient 消費 Server-Sent Events

現在,讓我們使用 WebClient 消費我們的事件流:

public void consumeServerSentEvent() {
    WebClient client = WebClient.create("http://localhost:8080/sse-server");
    ParameterizedTypeReference<ServerSentEvent<String>> type
     = new ParameterizedTypeReference<ServerSentEvent<String>>() {};

    Flux<ServerSentEvent<String>> eventStream = client.get()
      .uri("/stream-sse")
      .retrieve()
      .bodyToFlux(type);

    eventStream.subscribe(
      content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
        LocalTime.now(), content.event(), content.id(), content.data()),
      error -> logger.error("Error receiving SSE: {}", error),
      () -> logger.info("Completed!!!"));
}

subscribe方法允許我們指示在成功接收事件、發生錯誤或流式傳輸完成時應採取的行動。

在我們的示例中,我們使用了retrieve方法,這是一種簡單直接的方式來獲取響應體。

此方法會在收到4xx或5xx響應時自動拋出WebClientResponseException,除非我們處理這些場景並添加onStatus語句。

另一方面,我們也可以使用exchange方法,它提供對ClientResponse的訪問,並且不會在失敗響應上發出錯誤信號。

需要注意的是,如果不需要事件元數據,我們可以繞過ServerSentEvent包裝器。

3. Spring MVC 中的 SSE 流式傳輸

正如我們所説,SSE(Server-Sent Events)規範自 Spring 4.2 版本起就得到了支持,當時引入了 <em >SseEmitter</em> 類。

簡單來説,我們將定義一個 ExecutorService,即 SseEmitter 將執行其工作,推送數據的線程,並返回該 emitter 實例,以這種方式保持連接的打開狀態:

@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
    SseEmitter emitter = new SseEmitter();
    ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
    sseMvcExecutor.execute(() -> {
        try {
            for (int i = 0; true; i++) {
                SseEventBuilder event = SseEmitter.event()
                  .data("SSE MVC - " + LocalTime.now().toString())
                  .id(String.valueOf(i))
                  .name("sse event - mvc");
                emitter.send(event);
                Thread.sleep(1000);
            }
        } catch (Exception ex) {
            emitter.completeWithError(ex);
        }
    });
    return emitter;
}

請務必根據您的用例場景選擇合適的ExecutorService

我們可以通過閲讀這篇有趣的教程,瞭解更多關於 SSE 在 Spring MVC 中的信息,並查看其他示例。

4. 理解服務器推送事件

現在我們知道如何實現 SSE 端點,接下來讓我們深入理解一些底層概念。

SSE 是一種由大多數瀏覽器採用的規範,允許在任何時候單向流式傳輸事件。

“事件”只是 UTF-8 編碼的文本數據流,遵循規範定義的格式。

該格式由一系列鍵值對元素組成(id、retry、data 和 event,表示名稱),這些元素之間用換行符分隔。

同時,支持註釋。

規範對數據報文格式沒有任何限制;我們可以使用簡單的 String 或更復雜的 JSON 或 XML 結構。

我們必須考慮的一點是,SSE 流式傳輸和 WebSockets 之間的區別。

雖然 WebSockets 提供了服務器與客户端之間全雙工(雙向)的通信,而 SSE 採用單向通信。

此外,WebSockets 不是 HTTP 協議,與 SSE 相比,它不提供錯誤處理標準。

5. 結論

總而言之,本文介紹了 SSE 串流的主要概念,這無疑是一個強大的資源,將幫助我們構建下一代系統。

我們現在對使用該協議時底層發生的事情有了深刻的理解。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.