WebSocket 是一種在單個 TCP 連接上進行全雙工通信的協議,允許服務器和客户端之間進行實時雙向通信。
基本使用
1. 創建 WebSocket 連接
// Endpoint URLs used by the two example connections below.
const PLAIN_ENDPOINT = 'ws://localhost:8080';
const SECURE_ENDPOINT = 'wss://example.com/socket';

// Open a WebSocket connection.
const socket = new WebSocket(PLAIN_ENDPOINT);

// Alternatively, open a secure (wss://) connection.
const secureSocket = new WebSocket(SECURE_ENDPOINT);
2. WebSocket 事件
// Runs once the connection has been established; sends a greeting to the server.
socket.onopen = () => {
  console.log('連接已建立');
  socket.send('Hello Server!');
};

// Runs for every message received from the server.
socket.onmessage = ({ data }) => {
  console.log('收到消息:', data);
  // Process the received data here.
};

// Runs when an error occurs on the connection.
socket.onerror = (err) => {
  console.error('WebSocket 錯誤:', err);
};

// Runs when the connection is closed; logs the close code and reason.
socket.onclose = ({ code, reason }) => {
  console.log('連接關閉', code, reason);
  // A reconnect attempt could be started here.
};
完整示例
客户端示例
<!DOCTYPE html>
<html>
<head>
  <!-- Declare the encoding explicitly: the page contains non-ASCII text. -->
  <meta charset="UTF-8">
  <title>WebSocket 示例</title>
</head>
<body>
  <div>
    <input type="text" id="messageInput" placeholder="輸入消息">
    <button onclick="sendMessage()">發送</button>
  </div>
  <div id="messages"></div>
  <script>
    // Open the WebSocket connection.
    const socket = new WebSocket('ws://localhost:8080');
    const messagesDiv = document.getElementById('messages');

    // Connection established.
    socket.onopen = function () {
      addMessage('系統', '連接成功!');
    };

    // Incoming message: JSON payloads are rendered as sender/message,
    // anything that fails to parse is shown verbatim as a system message.
    socket.onmessage = function (event) {
      try {
        const data = JSON.parse(event.data);
        // Messages without a `sender` field (e.g. the server's welcome
        // message) are attributed to the system instead of "undefined".
        addMessage(data.sender ?? '系統', data.message);
      } catch (e) {
        addMessage('系統', event.data);
      }
    };

    // Error handling.
    socket.onerror = function (error) {
      addMessage('系統', '連接錯誤');
    };

    // Connection closed.
    socket.onclose = function () {
      addMessage('系統', '連接已關閉');
    };

    /**
     * Sends the trimmed content of the input box as a JSON message and
     * clears the input. Does nothing for empty input; reports a system
     * message instead of throwing when the socket is not open.
     */
    function sendMessage() {
      const input = document.getElementById('messageInput');
      const message = input.value.trim();
      if (!message) {
        return;
      }
      // send() throws while the socket is still CONNECTING, so guard on state.
      if (socket.readyState !== WebSocket.OPEN) {
        addMessage('系統', '尚未連接,無法發送消息');
        return;
      }
      socket.send(JSON.stringify({
        type: 'message',
        content: message,
        timestamp: new Date().toISOString()
      }));
      input.value = '';
    }

    /**
     * Appends one "sender: text" line to the message list and scrolls to it.
     * Both values come from the network, so they are inserted as text nodes
     * (never as HTML) to prevent script/markup injection.
     *
     * @param {string} sender - Display name of the sender.
     * @param {string} text - Message body.
     */
    function addMessage(sender, text) {
      const msgElement = document.createElement('div');
      const senderLabel = document.createElement('strong');
      senderLabel.textContent = `${sender}:`;
      msgElement.append(senderLabel, ` ${text}`);
      messagesDiv.appendChild(msgElement);
      messagesDiv.scrollTop = messagesDiv.scrollHeight;
    }

    // Close the connection when the page is being unloaded.
    window.addEventListener('beforeunload', function () {
      if (socket.readyState === WebSocket.OPEN) {
        socket.close(1000, '用户離開頁面');
      }
    });
  </script>
</body>
</html>
Node.js 服務器端示例
// Uses the third-party `ws` library.
const WebSocket = require('ws');

// Create the WebSocket server.
const wss = new WebSocket.Server({ port: 8080 });
console.log('WebSocket 服務器啓動在 ws://localhost:8080');

// Per-connection setup.
wss.on('connection', function connection(ws) {
  console.log('新客户端連接');

  // Send a welcome message to the newly connected client.
  ws.send(JSON.stringify({ type: 'system', message: '歡迎連接到服務器!' }));

  // Relay each message from this client to every other open client.
  ws.on('message', function incoming(message) {
    // Newer `ws` versions deliver a Buffer; normalise to a string once so the
    // log is readable and parsing does not depend on implicit coercion.
    const text = message.toString();
    console.log('收到消息:', text);
    try {
      const data = JSON.parse(text);
      // The outgoing payload is identical for every recipient, so it is
      // serialised once rather than once per client.
      const payload = JSON.stringify({
        type: 'message',
        sender: '用户',
        message: data.content,
        timestamp: new Date().toISOString()
      });
      for (const client of wss.clients) {
        if (client !== ws && client.readyState === WebSocket.OPEN) {
          client.send(payload);
        }
      }
    } catch (error) {
      console.error('消息解析錯誤:', error);
    }
  });

  // Connection closed.
  ws.on('close', function () {
    console.log('客户端斷開連接');
  });

  // Connection error.
  ws.on('error', function (error) {
    console.error('WebSocket 錯誤:', error);
  });
});
WebSocket 狀態
// Log a human-readable label for the socket's current connection state.
const stateLabels = {
  [WebSocket.CONNECTING]: '連接中...', // 0 - connecting
  [WebSocket.OPEN]: '已連接', // 1 - open
  [WebSocket.CLOSING]: '正在關閉...', // 2 - closing
  [WebSocket.CLOSED]: '已關閉', // 3 - closed
};

const stateLabel = stateLabels[socket.readyState];
if (stateLabel !== undefined) {
  console.log(stateLabel);
}
高級特性
1. 心跳檢測
// Heartbeat: while the connection is open, send a ping message periodically.
const HEARTBEAT_PERIOD_MS = 30000;
let heartbeatInterval;

// Sends one ping, but only if the socket is currently open.
const sendPing = () => {
  if (socket.readyState !== WebSocket.OPEN) {
    return;
  }
  socket.send(JSON.stringify({ type: 'ping' }));
};

socket.onopen = () => {
  console.log('連接建立');
  // Start the heartbeat timer.
  heartbeatInterval = setInterval(sendPing, HEARTBEAT_PERIOD_MS);
};

socket.onclose = () => {
  // Stop the heartbeat timer.
  clearInterval(heartbeatInterval);
};
2. 重連機制
/**
 * WebSocket wrapper that reconnects automatically with exponential backoff.
 *
 * After an unexpected close it retries up to `maxReconnectAttempts` times,
 * waiting `reconnectDelay * 2^attempt` milliseconds before each attempt.
 * The attempt counter is reset whenever a connection opens successfully.
 */
class WebSocketClient {
  /**
   * @param {string} url - WebSocket endpoint to connect to.
   */
  constructor(url) {
    this.url = url;
    this.socket = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.reconnectDelay = 1000;
    // Handle of the pending reconnect timer, or null when none is scheduled.
    this.reconnectTimer = null;
    // Set by close() so that the resulting close event does not trigger a reconnect.
    this.closedByUser = false;
  }

  /** Opens a new connection to `url` and installs the event handlers. */
  connect() {
    this.closedByUser = false;
    this.socket = new WebSocket(this.url);

    this.socket.onopen = () => {
      console.log('連接成功');
      this.reconnectAttempts = 0;
    };

    this.socket.onclose = (event) => {
      // An intentional close() must not start the reconnect loop.
      if (this.closedByUser) {
        return;
      }
      console.log('連接斷開,嘗試重連...');
      this.reconnect();
    };

    this.socket.onerror = (error) => {
      console.error('連接錯誤:', error);
    };
  }

  /**
   * Schedules the next reconnect attempt, or logs an error once the
   * attempt limit has been reached.
   */
  reconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('重連次數已達上限');
      return;
    }
    this.reconnectAttempts++;
    const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      console.log(`第 ${this.reconnectAttempts} 次重連`);
      this.connect();
    }, delay);
  }

  /**
   * Sends data if the connection is open.
   *
   * @param {string|ArrayBuffer|Blob|ArrayBufferView} data - Payload to send.
   * @returns {boolean} true if the data was handed to the socket, false if
   *   there is no socket yet (connect() not called) or it is not open.
   */
  send(data) {
    // Check for a missing socket first: before connect() `socket` is null.
    if (this.socket === null || this.socket.readyState !== WebSocket.OPEN) {
      return false;
    }
    this.socket.send(data);
    return true;
  }

  /**
   * Closes the connection intentionally and cancels any pending reconnect.
   *
   * @param {number} [code=1000] - WebSocket close code.
   * @param {string} [reason] - Optional close reason.
   */
  close(code = 1000, reason) {
    this.closedByUser = true;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.socket !== null) {
      this.socket.close(code, reason);
    }
  }
}
3. 二進制數據傳輸
// Sending binary data once the connection is open.
socket.onopen = () => {
  // Send an ArrayBuffer holding the four bytes 1, 2, 3, 4.
  const buffer = Uint8Array.of(1, 2, 3, 4).buffer;
  socket.send(buffer);

  // Send a Blob.
  const blob = new Blob(['Hello'], { type: 'text/plain' });
  socket.send(blob);
};

// Receiving binary data.
socket.binaryType = 'arraybuffer'; // or 'blob'

socket.onmessage = ({ data }) => {
  if (!(data instanceof ArrayBuffer)) {
    // Text frame.
    console.log('收到文本數據:', data);
    return;
  }
  // Binary frame: view the ArrayBuffer as bytes.
  const bytes = new Uint8Array(data);
  console.log('收到二進制數據:', bytes);
};
Spring Boot 中使用 WebSocket
添加依賴
<!-- Spring Boot WebSocket starter. No <version> is declared on this entry. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
基礎配置類
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void configureMessageBroker(MessageBrokerRegistry config) { // 消息代理前綴 config.enableSimpleBroker("/topic", "/queue"); // 應用目的地前綴 config.setApplicationDestinationPrefixes("/app"); // 用户目的地前綴(一對一消息) config.setUserDestinationPrefix("/user"); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 註冊 WebSocket 端點 registry.addEndpoint("/ws") .setAllowedOriginPatterns("*") .withSockJS(); // 支持 SockJS 降級 // 也可以添加多個端點 registry.addEndpoint("/ws-native") .setAllowedOriginPatterns("*"); } @Override public void configureWebSocketTransport(WebSocketTransportRegistration registration) { // 配置傳輸限制 registration.setMessageSizeLimit(128 * 1024); // 消息大小限制 128KB registration.setSendTimeLimit(20 * 1000); // 發送超時 20秒 registration.setSendBufferSizeLimit(512 * 1024); // 發送緩衝區限制 512KB } }
控制器示例
@Controller public class WebSocketController { // 注入消息模板 @Autowired private SimpMessagingTemplate messagingTemplate; /** * 處理客户端發送的消息 * 目的地:/app/chat */ @MessageMapping("/chat") @SendTo("/topic/messages") public ChatMessage handleMessage(ChatMessage message) { message.setTimestamp(new Date()); System.out.println("收到消息: " + message.getContent()); return message; } /** * 發送廣播消息 */ @GetMapping("/broadcast") public void broadcast(String content) { ChatMessage message = new ChatMessage(); message.setContent(content); message.setSender("系統"); message.setTimestamp(new Date()); // 發送到 /topic/messages messagingTemplate.convertAndSend("/topic/messages", message); } /** * 發送點對點消息 */ @GetMapping("/sendToUser") public void sendToUser(String userId, String content) { ChatMessage message = new ChatMessage(); message.setContent(content); message.setSender("管理員"); message.setTimestamp(new Date()); // 發送給指定用户:/user/{userId}/queue/messages messagingTemplate.convertAndSendToUser( userId, "/queue/messages", message ); } } // 消息實體類 @Data @AllArgsConstructor @NoArgsConstructor public class ChatMessage { private String sender; private String content; private Date timestamp; }
連接攔截器
@Component public class WebSocketInterceptor extends ChannelInterceptorAdapter { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (StompCommand.CONNECT.equals(accessor.getCommand())) { // 連接建立時處理 String token = accessor.getFirstNativeHeader("token"); // 驗證 token... System.out.println("用户連接: " + accessor.getSessionId()); } else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) { // 連接斷開時處理 System.out.println("用户斷開: " + accessor.getSessionId()); } return message; } }
原生 Java WebSocket(JSR 356)
註解方式
/**
 * Annotated WebSocket endpoint for a simple chat at /chat/{userId}.
 *
 * Keeps two static maps shared by all endpoint instances: session id to
 * session, and user id to session id. Incoming JSON messages are either
 * delivered to one user (when a "to" field is present) or broadcast to every
 * open session.
 */
@ServerEndpoint("/chat/{userId}")
@Component
public class ChatEndpoint {
    // All open connections, keyed by session id.
    private static final Map<String, Session> sessions = new ConcurrentHashMap<>();
    // Maps a user id to the id of that user's session.
    private static final Map<String, String> userSessionMap = new ConcurrentHashMap<>();

    /**
     * Called when a connection is opened: registers the session and announces
     * the user to every open session (including the new one, which has already
     * been registered at that point).
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        System.out.println("連接建立: " + session.getId() + ", 用户: " + userId);
        sessions.put(session.getId(), session);
        userSessionMap.put(userId, session.getId());
        broadcast("系統", "用户 " + userId + " 上線了");
    }

    /**
     * Called for each text message. The message is parsed as JSON; "content"
     * is required, "to" is optional and selects a private recipient.
     * Any failure is printed and otherwise ignored.
     */
    @OnMessage
    public void onMessage(String message, Session session,
                          @PathParam("userId") String userId) {
        System.out.println("收到消息: " + message + " from: " + userId);
        try {
            JSONObject json = new JSONObject(message);
            String content = json.getString("content");
            String toUserId = json.optString("to", null);
            if (toUserId != null) {
                // Private message.
                sendToUser(userId, toUserId, content);
            } else {
                // Message to everyone.
                broadcast(userId, content);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Called when a connection is closed: unregisters the session and, if this
     * session was still the user's current one, announces that the user left.
     */
    @OnClose
    public void onClose(Session session, @PathParam("userId") String userId) {
        System.out.println("連接關閉: " + session.getId());
        sessions.remove(session.getId());
        // Remove the user mapping only if it still points at THIS session.
        // If the same user has already reconnected, the mapping belongs to the
        // newer session and must survive the old session's close event.
        boolean wasCurrentSession = userSessionMap.remove(userId, session.getId());
        if (wasCurrentSession) {
            broadcast("系統", "用户 " + userId + " 下線了");
        }
    }

    /** Called when an error occurs on a connection; logs it. */
    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("連接錯誤: " + session.getId());
        error.printStackTrace();
    }

    /** Builds the JSON text shared by broadcast and private messages. */
    private static String buildMessage(String sender, String content, String type) {
        JSONObject message = new JSONObject();
        message.put("sender", sender);
        message.put("content", content);
        message.put("timestamp", System.currentTimeMillis());
        message.put("type", type);
        return message.toString();
    }

    /** Sends a "broadcast" message to every open session. */
    private void broadcast(String sender, String content) {
        // Serialise once; the text is the same for every recipient.
        String text = buildMessage(sender, content, "broadcast");
        for (Session session : sessions.values()) {
            if (session.isOpen()) {
                try {
                    session.getAsyncRemote().sendText(text);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Sends a "private" message to one user. Silently does nothing when the
     * target user has no registered, open session.
     */
    private void sendToUser(String fromUserId, String toUserId, String content) {
        String toSessionId = userSessionMap.get(toUserId);
        if (toSessionId == null) {
            return;
        }
        Session toSession = sessions.get(toSessionId);
        if (toSession == null || !toSession.isOpen()) {
            return;
        }
        try {
            toSession.getAsyncRemote().sendText(buildMessage(fromUserId, content, "private"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
編程方式(繼承 Endpoint 類)
/**
 * Programmatic WebSocket endpoint for a simple game at /game.
 *
 * Tracks all open sessions in a synchronized set, dispatches incoming JSON
 * messages by their "type" field, and broadcasts updates to every open session.
 *
 * NOTE(review): this class both carries @ServerEndpoint and extends Endpoint.
 * Programmatic endpoints are normally deployed through a ServerEndpointConfig
 * rather than the annotation — confirm how the container deploys this class.
 */
@ServerEndpoint("/game")
public class GameEndpoint extends Endpoint {
    private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<>());

    /**
     * Registers the session, installs a text message handler, and sends a
     * welcome message containing the session id.
     */
    @Override
    public void onOpen(Session session, EndpointConfig config) {
        System.out.println("新連接: " + session.getId());
        sessions.add(session);

        // Handle whole text messages from this session.
        session.addMessageHandler(new MessageHandler.Whole<String>() {
            @Override
            public void onMessage(String message) {
                System.out.println("收到: " + message);
                handleGameMessage(session, message);
            }
        });

        // Send the welcome message.
        try {
            JSONObject welcome = new JSONObject();
            welcome.put("type", "welcome");
            welcome.put("message", "歡迎加入遊戲!");
            welcome.put("sessionId", session.getId());
            session.getBasicRemote().sendText(welcome.toString());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Unregisters the session and tells the remaining players that it left. */
    @Override
    public void onClose(Session session, CloseReason closeReason) {
        System.out.println("連接關閉: " + session.getId());
        sessions.remove(session);
        broadcastPlayerLeft(session.getId());
    }

    /** Logs an error that occurred on a session. */
    @Override
    public void onError(Session session, Throwable thr) {
        System.err.println("連接錯誤: " + session.getId());
        thr.printStackTrace();
    }

    /**
     * Parses a message as JSON and dispatches on its "type" field
     * ("move" or "chat"). Unknown types are logged; failures are printed.
     */
    private void handleGameMessage(Session session, String message) {
        try {
            JSONObject json = new JSONObject(message);
            String type = json.getString("type");
            switch (type) {
                case "move":
                    handlePlayerMove(session, json);
                    break;
                case "chat":
                    handleChatMessage(session, json);
                    break;
                default:
                    System.out.println("未知消息類型: " + type);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Handles a player move; currently it only rebroadcasts the move data. */
    private void handlePlayerMove(Session session, JSONObject moveData) {
        // Game-specific move handling goes here.
        broadcastGameUpdate(moveData);
    }

    /** Rebroadcasts a chat message with the sender's session id and a timestamp. */
    private void handleChatMessage(Session session, JSONObject chatData) {
        JSONObject broadcastMsg = new JSONObject();
        broadcastMsg.put("type", "chat");
        broadcastMsg.put("sender", session.getId());
        broadcastMsg.put("message", chatData.getString("message"));
        broadcastMsg.put("timestamp", System.currentTimeMillis());
        broadcast(broadcastMsg.toString());
    }

    /**
     * Broadcasts a game update to all players.
     *
     * This method was called but never defined in the original class. It
     * forwards the move data unchanged; adjust the payload once the real game
     * protocol is decided.
     */
    private void broadcastGameUpdate(JSONObject update) {
        broadcast(update.toString());
    }

    /**
     * Tells all remaining players that a session has left.
     *
     * This method was called but never defined in the original class. The
     * "playerLeft" message shape is a placeholder — align it with whatever the
     * client expects.
     */
    private void broadcastPlayerLeft(String sessionId) {
        JSONObject left = new JSONObject();
        left.put("type", "playerLeft");
        left.put("sessionId", sessionId);
        left.put("timestamp", System.currentTimeMillis());
        broadcast(left.toString());
    }

    /** Sends a text message to every open session; send failures are printed. */
    private void broadcast(String message) {
        // Iterating a synchronizedSet requires holding its lock.
        synchronized (sessions) {
            for (Session s : sessions) {
                if (s.isOpen()) {
                    try {
                        s.getBasicRemote().sendText(message);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
配置文件
application.yml 配置
# Nesting restored: without indentation every key was top-level and
# `websocket` appeared twice, which is not valid for a YAML mapping.
spring:
  websocket:
    # WebSocket settings
    # NOTE(review): `spring.websocket.enabled` does not look like a standard
    # Spring Boot property — confirm it is read by application code.
    enabled: true
server:
  # Server settings
  port: 8080
  servlet:
    context-path: /api
# Custom (application-defined) settings
websocket:
  max-sessions: 1000
  heartbeat-interval: 30000
  max-message-size: 128KB
心跳檢測和連接管理
@Component public class WebSocketHeartbeat { @Autowired private SimpMessagingTemplate messagingTemplate; private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); @PostConstruct public void init() { // 每30秒發送一次心跳 scheduler.scheduleAtFixedRate(() -> { try { messagingTemplate.convertAndSend("/topic/heartbeat", Map.of("timestamp", System.currentTimeMillis(), "type", "heartbeat")); } catch (Exception e) { e.printStackTrace(); } }, 0, 30, TimeUnit.SECONDS); } @PreDestroy public void destroy() { scheduler.shutdown(); } }
消息編碼器/解碼器
// 自定義消息編解碼器 @Component public class ChatMessageConverter implements MessageConverter { @Override public Message<?> toMessage(Object payload, MessageHeaders headers) { if (payload instanceof ChatMessage) { ChatMessage msg = (ChatMessage) payload; byte[] bytes = serializeMessage(msg); return MessageBuilder.createMessage(bytes, headers); } return null; } @Override public Object fromMessage(Message<?> message, Class<?> targetClass) { if (targetClass == ChatMessage.class) { byte[] bytes = (byte[]) message.getPayload(); return deserializeMessage(bytes); } return null; } private byte[] serializeMessage(ChatMessage message) { try { return new ObjectMapper().writeValueAsBytes(message); } catch (Exception e) { throw new RuntimeException("序列化失敗", e); } } private ChatMessage deserializeMessage(byte[] bytes) { try { return new ObjectMapper().readValue(bytes, ChatMessage.class); } catch (Exception e) { throw new RuntimeException("反序列化失敗", e); } } }
集羣支持
@Configuration @EnableRedisRepositories public class RedisConfig { @Bean public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); return container; } @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(factory); template.setKeySerializer(new StringRedisSerializer()); template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class)); return template; } } // Redis 廣播消息 @Component public class RedisMessagePublisher { @Autowired private RedisTemplate<String, Object> redisTemplate; public void publish(String channel, Object message) { redisTemplate.convertAndSend(channel, message); } } @Component public class RedisMessageSubscriber implements MessageListener { @Autowired private SimpMessagingTemplate messagingTemplate; @Override public void onMessage(Message message, byte[] pattern) { // 處理從 Redis 收到的消息 // 轉發給 WebSocket 客户端 String channel = new String(pattern); String msg = new String(message.getBody()); messagingTemplate.convertAndSend("/topic/" + channel, msg); } }
Spring Boot 的 STOMP 實現更加完整和易於使用,而原生 WebSocket 則更加靈活。