知識庫 / Reactive RSS 訂閱

反應式WebSockets與Spring

Reactive,Spring Web
HongKong
9
02:07 PM · Dec 06 ,2025

1. 概述

在本文中,我們將創建一個快速示例,使用最新的 Spring 5 WebSockets API 以及 Spring WebFlux 提供的反應式特性。

WebSocket 是一種成熟的協議,它允許客户端和服務器之間進行全雙工通信,通常用於 Web 應用程序,其中客户端和服務器需要以高頻率和低延遲進行事件交換。

Spring Framework 5 現代化了 WebSockets 支持,為該通信通道添加了反應式能力。

有關 Spring WebFlux 的更多信息,請參閲 此處

2. Maven 依賴項

我們將使用 spring-boot-starter 依賴項來引入 spring-boot-starter-webflux

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>3.3.0</version>
</dependency>

3. WebSocket 配置在 Spring 中

我們的配置相當簡單:我們將注入 WebSocketHandler 以處理我們的 Spring WebSocket 應用程序中的套接字會話。

@Autowired
private WebSocketHandler webSocketHandler;

此外,我們還將創建一個使用 HandlerMapping 註解的方法,該方法負責將請求映射到 handler 對象:

@Bean
public HandlerMapping webSocketHandlerMapping() {
    Map<String, WebSocketHandler> map = new HashMap<>();
    map.put("/event-emitter", webSocketHandler);

    SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
    handlerMapping.setOrder(1);
    handlerMapping.setUrlMap(map);
    return handlerMapping;
}

我們可以連接的URL是 ws://localhost:<port>/event-emitter.

4. WebSocket 消息處理在 Spring 中

我們的 ReactiveWebSocketHandler 類將負責在服務器端管理 WebSocket 會話。

它實現了 WebSocketHandler 接口,因此我們可以覆蓋 handle 方法,該方法將用於將消息發送到 WebSocket 客户端:

@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {
    
    // private fields ...

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        return webSocketSession.send(intervalFlux
          .map(webSocketSession::textMessage))
          .and(webSocketSession.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .log());
    }
}

5. 創建一個簡單的反應式 WebSocket 客户端

現在,我們將創建一個 Spring 反應式 WebSocket 客户端,它可以連接並與我們的 WebSocket 服務器交換信息。

5.1. Maven 依賴

首先,Maven 依賴。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

在這裏,我們使用了之前相同的 spring-boot-starter-webflux 來搭建我們的反應式 WebSocket 服務器應用程序。

5.2. WebSocket 客户端

現在,讓我們創建一個 ReactiveClientWebSocket 類,該類負責啓動與服務器的通信:

public class ReactiveJavaClientWebSocket {
 
    public static void main(String[] args) throws InterruptedException {
 
        WebSocketClient client = new ReactorNettyWebSocketClient();
        client.execute(
          URI.create("ws://localhost:8080/event-emitter"), 
          session -> session.send(
            Mono.just(session.textMessage("event-spring-reactive-client-websocket")))
            .thenMany(session.receive()
              .map(WebSocketMessage::getPayloadAsText)
              .log())
            .then())
            .block(Duration.ofSeconds(10L));
    }
}

在上述代碼中,我們可以看到我們使用了 ReactorNettyWebSocketClient,它是用於與 Reactor Netty 配合使用的 WebSocketClient 實現。

此外,客户端通過 URL ws://localhost:8080/event-emitter 連接到 WebSocket 服務器,並在連接到服務器後立即建立會話。

我們還可以看到,我們向服務器發送了一條消息 (“event-spring-reactive-client-websocket”) 以及連接請求。

更重要的是,調用了 send 方法,該方法期望一個類型為 Publisher<T>, 在本例中,我們的 Publisher<T>Mono<T>,且 T 是一個簡單的字符串 “event-me-from-reactive-java-client-websocket“。

此外,調用了 thenMany(…) 方法,該方法期望一個類型為 Flux 的流,該流是 String 類型。 receive() 方法獲取傳入的消息流,然後將其轉換為字符串。

最後,調用了 block() 方法,強制客户端在給定的時間(在本例中為 10 秒)後斷開與服務器的連接。

5.3. 啓動客户端

要運行它,請確保 Reactive WebSocket 服務器已啓動並運行。然後,啓動 <em >ReactiveJavaClientWebSocket</em> 類,我們可以在 <em >sysout</em> 日誌中看到發出的事件:

[reactor-http-nio-4] INFO reactor.Flux.Map.1 - 
onNext({"eventId":"6042b94f-fd02-47a1-911d-dacf97f12ba6",
"eventDt":"2018-01-11T23:29:26.900"})

我們還可以從我們的 Reactive WebSocket 服務器的日誌中看到客户端在連接嘗試期間發送的消息:

[reactor-http-nio-2] reactor.Flux.Map.1: 
onNext(event-me-from-reactive-java-client)

此外,當客户端完成請求後,我們還可以看到斷開連接的消息(在本例中,在10秒後):

[reactor-http-nio-2] reactor.Flux.Map.1: onComplete()

6. 創建瀏覽器 WebSocket 客户端

讓我們創建一個簡單的 HTML/Javascript 客户端,使用 WebSocket 與我們的反應式 WebSocket 服務器應用程序進行通信。

<div class="events"></div>
<script>
    var clientWebSocket = new WebSocket("ws://localhost:8080/event-emitter");
    clientWebSocket.onopen = function() {
        console.log("clientWebSocket.onopen", clientWebSocket);
        console.log("clientWebSocket.readyState", "websocketstatus");
        clientWebSocket.send("event-me-from-browser");
    }
    clientWebSocket.onclose = function(error) {
        console.log("clientWebSocket.onclose", clientWebSocket, error);
        events("Closing connection");
    }
    clientWebSocket.onerror = function(error) {
        console.log("clientWebSocket.onerror", clientWebSocket, error);
        events("An error occured");
    }
    clientWebSocket.onmessage = function(error) {
        console.log("clientWebSocket.onmessage", clientWebSocket, error);
        events(error.data);
    }
    function events(responseEvent) {
        document.querySelector(".events").innerHTML += responseEvent + "<br>";
    }
</script>

在 WebSocket 服務器運行的情況下,通過在瀏覽器(例如 Chrome、Internet Explorer、Mozilla Firefox 等)中打開此 HTML 文件,我們應該能在屏幕上看到事件被打印出來,每個事件之間間隔 1 秒,這與我們的 WebSocket 服務器中定義的間隔時間一致。

{"eventId":"c25975de-6775-4b0b-b974-b396847878e6","eventDt":"2018-01-11T23:56:09.780"}
{"eventId":"ac74170b-1f71-49d3-8737-b3f9a8a352f9","eventDt":"2018-01-11T23:56:09.781"}
{"eventId":"40d8f305-f252-4c14-86d7-ed134d3e10c6","eventDt":"2018-01-11T23:56:09.782"}

7. 結論

在此,我們展示了使用 Spring 5 框架在服務器和客户端之間創建 WebSocket 通信的示例,並實現了 Spring Webflux 提供的全新響應式特性。

user avatar
0 位用戶收藏了這個故事!
收藏

發佈 評論

Some HTML is okay.