1. 概述
在本文中,我們將創建一個快速示例,使用最新的 Spring 5 WebSockets API 以及 Spring WebFlux 提供的反應式特性。
WebSocket 是一種成熟的協議,它允許客户端和服務器之間進行全雙工通信,通常用於 Web 應用程序,其中客户端和服務器需要以高頻率和低延遲進行事件交換。
Spring Framework 5 現代化了 WebSockets 支持,為該通信通道添加了反應式能力。
有關 Spring WebFlux 的更多信息,請參閲 此處。
2. Maven 依賴項
我們將使用 spring-boot-starter 依賴項來引入 spring-boot-starter-webflux。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>3.3.0</version>
</dependency>3. WebSocket 配置在 Spring 中
我們的配置相當簡單:我們將注入 WebSocketHandler 以處理我們的 Spring WebSocket 應用程序中的套接字會話。
@Autowired
private WebSocketHandler webSocketHandler;
此外,我們還將創建一個使用 HandlerMapping 註解的方法,該方法負責將請求映射到 handler 對象:
@Bean
public HandlerMapping webSocketHandlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/event-emitter", webSocketHandler);
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
handlerMapping.setOrder(1);
handlerMapping.setUrlMap(map);
return handlerMapping;
}我們可以連接的URL是 ws://localhost:<port>/event-emitter.
4. WebSocket 消息處理在 Spring 中
我們的 ReactiveWebSocketHandler 類將負責在服務器端管理 WebSocket 會話。
它實現了 WebSocketHandler 接口,因此我們可以覆蓋 handle 方法,該方法將用於將消息發送到 WebSocket 客户端:
@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {
// private fields ...
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
return webSocketSession.send(intervalFlux
.map(webSocketSession::textMessage))
.and(webSocketSession.receive()
.map(WebSocketMessage::getPayloadAsText)
.log());
}
}5. 創建一個簡單的反應式 WebSocket 客户端
現在,我們將創建一個 Spring 反應式 WebSocket 客户端,它可以連接並與我們的 WebSocket 服務器交換信息。
5.1. Maven 依賴
首先,Maven 依賴。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>在這裏,我們使用了之前相同的 spring-boot-starter-webflux 來搭建我們的反應式 WebSocket 服務器應用程序。
5.2. WebSocket 客户端
現在,讓我們創建一個 ReactiveClientWebSocket 類,該類負責啓動與服務器的通信:
public class ReactiveJavaClientWebSocket {
public static void main(String[] args) throws InterruptedException {
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(
URI.create("ws://localhost:8080/event-emitter"),
session -> session.send(
Mono.just(session.textMessage("event-spring-reactive-client-websocket")))
.thenMany(session.receive()
.map(WebSocketMessage::getPayloadAsText)
.log())
.then())
.block(Duration.ofSeconds(10L));
}
}在上述代碼中,我們可以看到我們使用了 ReactorNettyWebSocketClient,它是用於與 Reactor Netty 配合使用的 WebSocketClient 實現。
此外,客户端通過 URL ws://localhost:8080/event-emitter 連接到 WebSocket 服務器,並在連接到服務器後立即建立會話。
我們還可以看到,我們向服務器發送了一條消息 (“event-spring-reactive-client-websocket”) 以及連接請求。
更重要的是,調用了 send 方法,該方法期望一個類型為 Publisher<T>, 在本例中,我們的 Publisher<T> 是 Mono<T>,且 T 是一個簡單的字符串 “event-me-from-reactive-java-client-websocket“。
此外,調用了 thenMany(…) 方法,該方法期望一個類型為 Flux 的流,該流是 String 類型。 receive() 方法獲取傳入的消息流,然後將其轉換為字符串。
最後,調用了 block() 方法,強制客户端在給定的時間(在本例中為 10 秒)後斷開與服務器的連接。
5.3. 啓動客户端
要運行它,請確保 Reactive WebSocket 服務器已啓動並運行。然後,啓動 <em >ReactiveJavaClientWebSocket</em> 類,我們可以在 <em >sysout</em> 日誌中看到發出的事件:
[reactor-http-nio-4] INFO reactor.Flux.Map.1 -
onNext({"eventId":"6042b94f-fd02-47a1-911d-dacf97f12ba6",
"eventDt":"2018-01-11T23:29:26.900"})我們還可以從我們的 Reactive WebSocket 服務器的日誌中看到客户端在連接嘗試期間發送的消息:
[reactor-http-nio-2] reactor.Flux.Map.1:
onNext(event-me-from-reactive-java-client)此外,當客户端完成請求後,我們還可以看到斷開連接的消息(在本例中,在10秒後):
[reactor-http-nio-2] reactor.Flux.Map.1: onComplete()6. 創建瀏覽器 WebSocket 客户端
讓我們創建一個簡單的 HTML/Javascript 客户端,使用 WebSocket 與我們的反應式 WebSocket 服務器應用程序進行通信。
<div class="events"></div>
<script>
var clientWebSocket = new WebSocket("ws://localhost:8080/event-emitter");
clientWebSocket.onopen = function() {
console.log("clientWebSocket.onopen", clientWebSocket);
console.log("clientWebSocket.readyState", "websocketstatus");
clientWebSocket.send("event-me-from-browser");
}
clientWebSocket.onclose = function(error) {
console.log("clientWebSocket.onclose", clientWebSocket, error);
events("Closing connection");
}
clientWebSocket.onerror = function(error) {
console.log("clientWebSocket.onerror", clientWebSocket, error);
events("An error occured");
}
clientWebSocket.onmessage = function(error) {
console.log("clientWebSocket.onmessage", clientWebSocket, error);
events(error.data);
}
function events(responseEvent) {
document.querySelector(".events").innerHTML += responseEvent + "<br>";
}
</script>
在 WebSocket 服務器運行的情況下,通過在瀏覽器(例如 Chrome、Internet Explorer、Mozilla Firefox 等)中打開此 HTML 文件,我們應該能在屏幕上看到事件被打印出來,每個事件之間間隔 1 秒,這與我們的 WebSocket 服務器中定義的間隔時間一致。
{"eventId":"c25975de-6775-4b0b-b974-b396847878e6","eventDt":"2018-01-11T23:56:09.780"}
{"eventId":"ac74170b-1f71-49d3-8737-b3f9a8a352f9","eventDt":"2018-01-11T23:56:09.781"}
{"eventId":"40d8f305-f252-4c14-86d7-ed134d3e10c6","eventDt":"2018-01-11T23:56:09.782"}7. 結論
在此,我們展示了使用 Spring 5 框架在服務器和客户端之間創建 WebSocket 通信的示例,並實現了 Spring Webflux 提供的全新響應式特性。