博客 / 詳情

返回

WebSocket 的使用

WebSocket 是一種在單個 TCP 連接上進行全雙工通信的協議,允許服務器和客户端之間進行實時雙向通信。

基本使用

1. 創建 WebSocket 連接

// 創建 WebSocket 連接
const socket = new WebSocket('ws://localhost:8080');

// 或者使用安全連接
const secureSocket = new WebSocket('wss://example.com/socket');

2. WebSocket 事件

// 連接建立時觸發
socket.onopen = function(event) {
    console.log('連接已建立');
    socket.send('Hello Server!');
};

// 接收到消息時觸發
socket.onmessage = function(event) {
    console.log('收到消息:', event.data);
    // 處理接收到的數據
};

// 發生錯誤時觸發
socket.onerror = function(error) {
    console.error('WebSocket 錯誤:', error);
};

// 連接關閉時觸發
socket.onclose = function(event) {
    console.log('連接關閉', event.code, event.reason);
    // 可以在這裏嘗試重連
};

完整示例

客户端示例

<!DOCTYPE html>
<html>
<head>
    <title>WebSocket 示例</title>
</head>
<body>
    <div>
        <input type="text" id="messageInput" placeholder="輸入消息">
        <button onclick="sendMessage()">發送</button>
    </div>
    <div id="messages"></div>

    <script>
        // 創建 WebSocket 連接
        const socket = new WebSocket('ws://localhost:8080');
        const messagesDiv = document.getElementById('messages');
        
        // 連接建立
        socket.onopen = function() {
            addMessage('系統', '連接成功!');
        };
        
        // 接收消息
        socket.onmessage = function(event) {
            try {
                const data = JSON.parse(event.data);
                addMessage(data.sender, data.message);
            } catch (e) {
                addMessage('系統', event.data);
            }
        };
        
        // 錯誤處理
        socket.onerror = function(error) {
            addMessage('系統', '連接錯誤');
        };
        
        // 連接關閉
        socket.onclose = function() {
            addMessage('系統', '連接已關閉');
        };
        
        // 發送消息
        function sendMessage() {
            const input = document.getElementById('messageInput');
            const message = input.value.trim();
            
            if (message) {
                socket.send(JSON.stringify({
                    type: 'message',
                    content: message,
                    timestamp: new Date().toISOString()
                }));
                input.value = '';
            }
        }
        
        // 顯示消息
        function addMessage(sender, text) {
            const msgElement = document.createElement('div');
            msgElement.innerHTML = `<strong>${sender}:</strong> ${text}`;
            messagesDiv.appendChild(msgElement);
            messagesDiv.scrollTop = messagesDiv.scrollHeight;
        }
        
        // 關閉連接(頁面卸載時)
        window.addEventListener('beforeunload', function() {
            if (socket.readyState === WebSocket.OPEN) {
                socket.close(1000, '用户離開頁面');
            }
        });
    </script>
</body>
</html>

Node.js 服務器端示例

// 使用 ws 庫
const WebSocket = require('ws');

// 創建 WebSocket 服務器
const wss = new WebSocket.Server({ port: 8080 });

console.log('WebSocket 服務器啓動在 ws://localhost:8080');

// 連接處理
wss.on('connection', function connection(ws) {
    console.log('新客户端連接');
    
    // 發送歡迎消息
    ws.send(JSON.stringify({
        type: 'system',
        message: '歡迎連接到服務器!'
    }));
    
    // 接收客户端消息
    ws.on('message', function incoming(message) {
        console.log('收到消息:', message);
        
        try {
            const data = JSON.parse(message);
            
            // 廣播消息給所有客户端
            wss.clients.forEach(function each(client) {
                if (client !== ws && client.readyState === WebSocket.OPEN) {
                    client.send(JSON.stringify({
                        type: 'message',
                        sender: '用户',
                        message: data.content,
                        timestamp: new Date().toISOString()
                    }));
                }
            });
        } catch (error) {
            console.error('消息解析錯誤:', error);
        }
    });
    
    // 連接關閉
    ws.on('close', function() {
        console.log('客户端斷開連接');
    });
    
    // 錯誤處理
    ws.on('error', function(error) {
        console.error('WebSocket 錯誤:', error);
    });
});

WebSocket 狀態

// 檢查連接狀態
switch(socket.readyState) {
    case WebSocket.CONNECTING:  // 0 - 連接中
        console.log('連接中...');
        break;
    case WebSocket.OPEN:        // 1 - 已連接
        console.log('已連接');
        break;
    case WebSocket.CLOSING:     // 2 - 關閉中
        console.log('正在關閉...');
        break;
    case WebSocket.CLOSED:      // 3 - 已關閉
        console.log('已關閉');
        break;
}

高級特性

1. 心跳檢測

// 心跳檢測
let heartbeatInterval;

socket.onopen = function() {
    console.log('連接建立');
    
    // 開始心跳
    heartbeatInterval = setInterval(() => {
        if (socket.readyState === WebSocket.OPEN) {
            socket.send(JSON.stringify({ type: 'ping' }));
        }
    }, 30000);
};

socket.onclose = function() {
    // 清除心跳
    clearInterval(heartbeatInterval);
};

2. 重連機制

class WebSocketClient {
    constructor(url) {
        this.url = url;
        this.socket = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.reconnectDelay = 1000;
    }
    
    connect() {
        this.socket = new WebSocket(this.url);
        
        this.socket.onopen = () => {
            console.log('連接成功');
            this.reconnectAttempts = 0;
        };
        
        this.socket.onclose = (event) => {
            console.log('連接斷開,嘗試重連...');
            this.reconnect();
        };
        
        this.socket.onerror = (error) => {
            console.error('連接錯誤:', error);
        };
    }
    
    reconnect() {
        if (this.reconnectAttempts < this.maxReconnectAttempts) {
            this.reconnectAttempts++;
            const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
            
            setTimeout(() => {
                console.log(`第 ${this.reconnectAttempts} 次重連`);
                this.connect();
            }, delay);
        } else {
            console.error('重連次數已達上限');
        }
    }
    
    send(data) {
        if (this.socket.readyState === WebSocket.OPEN) {
            this.socket.send(data);
        }
    }
}

3. 二進制數據傳輸

// 發送二進制數據
socket.onopen = function() {
    // 發送 ArrayBuffer
    const buffer = new ArrayBuffer(4);
    const view = new Uint8Array(buffer);
    view[0] = 1;
    view[1] = 2;
    view[2] = 3;
    view[3] = 4;
    
    socket.send(buffer);
    
    // 發送 Blob
    const blob = new Blob(['Hello'], { type: 'text/plain' });
    socket.send(blob);
};

// 接收二進制數據
socket.binaryType = 'arraybuffer'; // 或 'blob'

socket.onmessage = function(event) {
    if (event.data instanceof ArrayBuffer) {
        // 處理 ArrayBuffer
        const view = new Uint8Array(event.data);
        console.log('收到二進制數據:', view);
    } else {
        // 處理文本數據
        console.log('收到文本數據:', event.data);
    }
};

Spring Boot 中使用 WebSocket

添加依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

基礎配置類

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // 消息代理前綴
        config.enableSimpleBroker("/topic", "/queue");
        // 應用目的地前綴
        config.setApplicationDestinationPrefixes("/app");
        // 用户目的地前綴(一對一消息)
        config.setUserDestinationPrefix("/user");
    }
    
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 註冊 WebSocket 端點
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("*")
                .withSockJS();  // 支持 SockJS 降級
        
        // 也可以添加多個端點
        registry.addEndpoint("/ws-native")
                .setAllowedOriginPatterns("*");
    }
    
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        // 配置傳輸限制
        registration.setMessageSizeLimit(128 * 1024); // 消息大小限制 128KB
        registration.setSendTimeLimit(20 * 1000);    // 發送超時 20秒
        registration.setSendBufferSizeLimit(512 * 1024); // 發送緩衝區限制 512KB
    }
}

控制器示例

@Controller
public class WebSocketController {
    
    // 注入消息模板
    @Autowired
    private SimpMessagingTemplate messagingTemplate;
    
    /**
     * 處理客户端發送的消息
     * 目的地:/app/chat
     */
    @MessageMapping("/chat")
    @SendTo("/topic/messages")
    public ChatMessage handleMessage(ChatMessage message) {
        message.setTimestamp(new Date());
        System.out.println("收到消息: " + message.getContent());
        return message;
    }
    
    /**
     * 發送廣播消息
     */
    @GetMapping("/broadcast")
    public void broadcast(String content) {
        ChatMessage message = new ChatMessage();
        message.setContent(content);
        message.setSender("系統");
        message.setTimestamp(new Date());
        
        // 發送到 /topic/messages
        messagingTemplate.convertAndSend("/topic/messages", message);
    }
    
    /**
     * 發送點對點消息
     */
    @GetMapping("/sendToUser")
    public void sendToUser(String userId, String content) {
        ChatMessage message = new ChatMessage();
        message.setContent(content);
        message.setSender("管理員");
        message.setTimestamp(new Date());
        
        // 發送給指定用户:/user/{userId}/queue/messages
        messagingTemplate.convertAndSendToUser(
            userId, 
            "/queue/messages", 
            message
        );
    }
}

// 消息實體類
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ChatMessage {
    private String sender;
    private String content;
    private Date timestamp;
}

連接攔截器

@Component
public class WebSocketInterceptor extends ChannelInterceptorAdapter {
    
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = 
            MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            // 連接建立時處理
            String token = accessor.getFirstNativeHeader("token");
            // 驗證 token...
            System.out.println("用户連接: " + accessor.getSessionId());
        } else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
            // 連接斷開時處理
            System.out.println("用户斷開: " + accessor.getSessionId());
        }
        
        return message;
    }
}

原生 Java WebSocket(JSR 356)

註解方式

@ServerEndpoint("/chat/{userId}")
@Component
public class ChatEndpoint {
    
    // 存儲所有連接
    private static final Map<String, Session> sessions = new ConcurrentHashMap<>();
    
    // 存儲用户ID和session的映射
    private static final Map<String, String> userSessionMap = new ConcurrentHashMap<>();
    
    /**
     * 連接建立時調用
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        System.out.println("連接建立: " + session.getId() + ", 用户: " + userId);
        
        // 保存連接
        sessions.put(session.getId(), session);
        userSessionMap.put(userId, session.getId());
        
        // 通知其他用户有新用户上線
        broadcast("系統", "用户 " + userId + " 上線了");
    }
    
    /**
     * 收到消息時調用
     */
    @OnMessage
    public void onMessage(String message, Session session, 
                         @PathParam("userId") String userId) {
        System.out.println("收到消息: " + message + " from: " + userId);
        
        try {
            // 解析消息
            JSONObject json = new JSONObject(message);
            String content = json.getString("content");
            String toUserId = json.optString("to", null);
            
            if (toUserId != null) {
                // 私聊消息
                sendToUser(userId, toUserId, content);
            } else {
                // 羣發消息
                broadcast(userId, content);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 連接關閉時調用
     */
    @OnClose
    public void onClose(Session session, @PathParam("userId") String userId) {
        System.out.println("連接關閉: " + session.getId());
        
        // 移除連接
        sessions.remove(session.getId());
        userSessionMap.remove(userId);
        
        // 通知其他用户
        broadcast("系統", "用户 " + userId + " 下線了");
    }
    
    /**
     * 發生錯誤時調用
     */
    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("連接錯誤: " + session.getId());
        error.printStackTrace();
    }
    
    /**
     * 廣播消息給所有用户
     */
    private void broadcast(String sender, String content) {
        JSONObject message = new JSONObject();
        message.put("sender", sender);
        message.put("content", content);
        message.put("timestamp", System.currentTimeMillis());
        message.put("type", "broadcast");
        
        // 發送給所有連接的客户端
        for (Session session : sessions.values()) {
            if (session.isOpen()) {
                try {
                    session.getAsyncRemote().sendText(message.toString());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    /**
     * 發送私聊消息
     */
    private void sendToUser(String fromUserId, String toUserId, String content) {
        String toSessionId = userSessionMap.get(toUserId);
        if (toSessionId != null) {
            Session toSession = sessions.get(toSessionId);
            if (toSession != null && toSession.isOpen()) {
                try {
                    JSONObject message = new JSONObject();
                    message.put("sender", fromUserId);
                    message.put("content", content);
                    message.put("timestamp", System.currentTimeMillis());
                    message.put("type", "private");
                    
                    toSession.getAsyncRemote().sendText(message.toString());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

編程方式(繼承 Endpoint 類)

@ServerEndpoint("/game")
public class GameEndpoint extends Endpoint {
    
    private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<>());
    
    @Override
    public void onOpen(Session session, EndpointConfig config) {
        System.out.println("新連接: " + session.getId());
        sessions.add(session);
        
        // 添加消息處理器
        session.addMessageHandler(new MessageHandler.Whole<String>() {
            @Override
            public void onMessage(String message) {
                System.out.println("收到: " + message);
                // 處理遊戲邏輯
                handleGameMessage(session, message);
            }
        });
        
        // 發送歡迎消息
        try {
            JSONObject welcome = new JSONObject();
            welcome.put("type", "welcome");
            welcome.put("message", "歡迎加入遊戲!");
            welcome.put("sessionId", session.getId());
            session.getBasicRemote().sendText(welcome.toString());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public void onClose(Session session, CloseReason closeReason) {
        System.out.println("連接關閉: " + session.getId());
        sessions.remove(session);
        
        // 通知其他玩家
        broadcastPlayerLeft(session.getId());
    }
    
    @Override
    public void onError(Session session, Throwable thr) {
        System.err.println("連接錯誤: " + session.getId());
        thr.printStackTrace();
    }
    
    private void handleGameMessage(Session session, String message) {
        try {
            JSONObject json = new JSONObject(message);
            String type = json.getString("type");
            
            switch (type) {
                case "move":
                    // 處理移動
                    handlePlayerMove(session, json);
                    break;
                case "chat":
                    // 處理聊天
                    handleChatMessage(session, json);
                    break;
                default:
                    System.out.println("未知消息類型: " + type);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void handlePlayerMove(Session session, JSONObject moveData) {
        // 處理玩家移動邏輯
        // 廣播給所有玩家
        broadcastGameUpdate(moveData);
    }
    
    private void handleChatMessage(Session session, JSONObject chatData) {
        // 廣播聊天消息
        JSONObject broadcastMsg = new JSONObject();
        broadcastMsg.put("type", "chat");
        broadcastMsg.put("sender", session.getId());
        broadcastMsg.put("message", chatData.getString("message"));
        broadcastMsg.put("timestamp", System.currentTimeMillis());
        
        broadcast(broadcastMsg.toString());
    }
    
    private void broadcast(String message) {
        synchronized (sessions) {
            for (Session s : sessions) {
                if (s.isOpen()) {
                    try {
                        s.getBasicRemote().sendText(message);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

 配置文件

application.yml 配置

spring:
  websocket:
    # WebSocket 配置
    enabled: true
    
server:
  # 服務器配置
  port: 8080
  servlet:
    context-path: /api
  
# 自定義配置
websocket:
  max-sessions: 1000
  heartbeat-interval: 30000
  max-message-size: 128KB

心跳檢測和連接管理

@Component
public class WebSocketHeartbeat {
    
    @Autowired
    private SimpMessagingTemplate messagingTemplate;
    
    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
    @PostConstruct
    public void init() {
        // 每30秒發送一次心跳
        scheduler.scheduleAtFixedRate(() -> {
            try {
                messagingTemplate.convertAndSend("/topic/heartbeat", 
                    Map.of("timestamp", System.currentTimeMillis(), 
                           "type", "heartbeat"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 0, 30, TimeUnit.SECONDS);
    }
    
    @PreDestroy
    public void destroy() {
        scheduler.shutdown();
    }
}

消息編碼器/解碼器

// 自定義消息編解碼器
@Component
public class ChatMessageConverter implements MessageConverter {
    
    @Override
    public Message<?> toMessage(Object payload, MessageHeaders headers) {
        if (payload instanceof ChatMessage) {
            ChatMessage msg = (ChatMessage) payload;
            byte[] bytes = serializeMessage(msg);
            return MessageBuilder.createMessage(bytes, headers);
        }
        return null;
    }
    
    @Override
    public Object fromMessage(Message<?> message, Class<?> targetClass) {
        if (targetClass == ChatMessage.class) {
            byte[] bytes = (byte[]) message.getPayload();
            return deserializeMessage(bytes);
        }
        return null;
    }
    
    private byte[] serializeMessage(ChatMessage message) {
        try {
            return new ObjectMapper().writeValueAsBytes(message);
        } catch (Exception e) {
            throw new RuntimeException("序列化失敗", e);
        }
    }
    
    private ChatMessage deserializeMessage(byte[] bytes) {
        try {
            return new ObjectMapper().readValue(bytes, ChatMessage.class);
        } catch (Exception e) {
            throw new RuntimeException("反序列化失敗", e);
        }
    }
}

集羣支持

@Configuration
@EnableRedisRepositories
public class RedisConfig {
    
    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
        return template;
    }
}

// Redis 廣播消息
@Component
public class RedisMessagePublisher {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    public void publish(String channel, Object message) {
        redisTemplate.convertAndSend(channel, message);
    }
}

@Component
public class RedisMessageSubscriber implements MessageListener {
    
    @Autowired
    private SimpMessagingTemplate messagingTemplate;
    
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 處理從 Redis 收到的消息
        // 轉發給 WebSocket 客户端
        String channel = new String(pattern);
        String msg = new String(message.getBody());
        
        messagingTemplate.convertAndSend("/topic/" + channel, msg);
    }
}

Spring Boot 的 STOMP 實現更加完整和易於使用,而原生 WebSocket 則更加靈活。

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.