背景
在我做過的一個對接海康威視監控實現直播流的項目中,需要處理 WebSocket 的長連接,以便於和服務器保持實時通信。WebSocket 的一個挑戰是連接的穩定性,特別是在網絡波動或斷開時,如何確保能自動重連並保持通信流暢。為了簡化這一過程,我決定封裝一個 WebSocket 類,使得開發者無需每次都重複編寫連接、心跳、重連等邏輯。
這個 CustomSocket 類支持以下功能:
- 心跳機制:定時發送心跳包,確保連接不會因長時間無數據傳輸而被關閉。
- 斷線重連:連接斷開後,自動嘗試重連,最大重連次數可配置。
- 手動連接:支持手動發起連接,適用於取消自動重連並重新啓動連接。
- 消息回調:支持自定義處理接收到的消息,並且能夠過濾心跳消息。
- 連接狀態感知:可以實時監聽 WebSocket 的連接狀態,便於進行調試和優化。
- 調試日誌:在 debug 模式下,可以輸出詳細的連接狀態和操作日誌,幫助調試和排查問題。
代碼實現
僅此我項目夠用了,便不封裝太多避免太複雜繁瑣,諸君有需要的可拿去更改。
以下是 CustomSocket 類的完整實現:
/**
* WebSocket 封裝類
* 功能:自動重連、心跳機制、狀態管理、消息回調
*/
const SOCKET_STATUS = {
CONNECTING: "connecting", // 正在連接
CONNECTED: "connected", // 已連接
DISCONNECTED: "disconnected", // 已斷開
RECONNECTING: "reconnecting", // 正在重連
CLOSED: "closed", // 已手動關閉
};
export class CustomSocket {
/**
* 構造函數
* @param {string} url - WebSocket 服務器地址
* @param {Object} options - 配置項
// ===== 配置項 =====
* @param {number} [options.heartbeatInterval=10000] - 心跳發送間隔(毫秒)
* @param {number} [options.reconnectDelay=5000] - 重連延遲時間(毫秒)
* @param {string} [options.pingMessage='ping'] - 發送心跳消息內容
* @param {string} [options.pongMessage='pong'] - 響應心跳消息內容
* @param {Function} [options.onMessage] - 接收到非心跳消息時的回調
* @param {Function} [options.onOpen] - 連接成功時的回調
* @param {Function} [options.onError] - 連接錯誤時的回調
* @param {Function} [options.onClose] - 連接關閉時的回調
* @param {Function} [options.onReconnect] - 嘗試重連時的回調
* @param {Function} [options.onStatusChange] - 任意狀態變更時的回調(如 connected, disconnected, reconnecting, closed, connecting)
* @param {Function} [options.onConnecting] - 啓動連接前的回調(包括首次連接 & 重連)
* @param {boolean} [options.debug=false] - 是否打印調試日誌
* @param {number} [options.maxReconnectAttempts=Infinity] - 最大重連次數,默認無限
*/
constructor(url, options = {}) {
this.url = url; // WebSocket 地址
this.socket = null; // WebSocket 實例
// 默認配置項
this.heartbeatInterval = options.heartbeatInterval || 10000; // 心跳間隔時間,默認 10 秒
this.reconnectDelay = options.reconnectDelay || 5000; // 重連等待時間,默認 5 秒
this.pingMessage = options.pingMessage || "ping"; // 發送心跳消息內容
this.pongMessage = options.pongMessage || "pong"; // 響應心跳消息內容
this.debug = options.debug || false; // 是否啓用調試日誌
this.maxReconnectAttempts = options.maxReconnectAttempts || Infinity; // 最大重連次數
// ===== 回調函數(支持注入)=====
const noop = () => {};
this.onMessage = options.onMessage || noop; // 消息接收回調
this.onOpen = options.onOpen || noop; // 連接開啓回調
this.onError = options.onError || noop; // 連接錯誤回調
this.onClose = options.onClose || noop; // 連接關閉回調
this.onReconnect = options.onReconnect || noop; // 重連時回調
this.onStatusChange = options.onStatusChange || noop; // 狀態變更統一回調
this.onConnecting = options.onConnecting || noop; // 連接開始回調
// 狀態字段
this.isConnected = false; // 當前是否連接成功
this.isReconnecting = false; // 當前是否正在重連
this.manualClose = false; // 當前是手動關閉
this.reconnectAttempts = 0; // 當前重連嘗試次數
this.lastPongTime = null; // 上一次收到 pong 的時間戳
// 定時器句柄
this.heartbeatTimer = null; // 心跳定時器
this.reconnectTimer = null; // 重連定時器
// 初始化連接
this.init();
}
/**
* 獲取當前 WebSocket 連接狀態(供外部狀態感知使用)
* @returns {string} - 當前狀態
*/
getStatus() {
if (this.isConnected) return SOCKET_STATUS.CONNECTED;
if (this.isReconnecting) return SOCKET_STATUS.RECONNECTING;
if (this.socket && this.socket.readyState === WebSocket.CONNECTING)
return SOCKET_STATUS.CONNECTING;
return SOCKET_STATUS.DISCONNECTED;
}
/**
* 初始化 WebSocket 連接並設置事件監聽
*/
init() {
this.manualClose = false; // 每次新建連接都重置
this.onStatusChange(SOCKET_STATUS.CONNECTING); // 新增:狀態回調
this.onConnecting(); // 頁面可用來顯示 loading
this.socket = new WebSocket(this.url);
// 連接成功
this.socket.onopen = () => {
this.isConnected = true;
this.isReconnecting = false;
this.reconnectAttempts = 0;
this.debug && console.log("WebSocket opened.");
this.onStatusChange("connected");
this.onOpen();
this.startHeartbeat();
};
// 接收消息
this.socket.onmessage = (msg) => {
// 處理 pong 響應(心跳回復)
if (msg.data === this.pongMessage) {
this.lastPongTime = Date.now(); // 記錄收到心跳回應時間
return; // 不傳給業務邏輯
}
this.onMessage(msg); // 非心跳業務消息,轉發給用户
};
// 連接出錯
this.socket.onerror = (err) => {
this.debug && console.error("WebSocket error:", err);
this.onError(err);
};
// 連接關閉
this.socket.onclose = () => {
this.debug && console.warn("WebSocket closed.");
this.isConnected = false;
this.stopHeartbeat();
this.onClose();
this.onStatusChange("disconnected");
if (!this.manualClose) {
this.debug &&
console.warn(
"WebSocket closed unexpectedly, attempting to reconnect..."
);
this.reconnect();
} else {
this.debug && console.log("WebSocket closed manually, no reconnect.");
}
};
}
/**
* 手動連接(用於取消自動重連並立即發起連接)
*/
manualConnect() {
this.clearReconnectTimer(); // 清除已有的重連定時器
this.isReconnecting = false; // 重置重連標記
this.reconnectAttempts = 0; // 重置嘗試次數
this.manualClose = false; // 標記為非手動關閉
this.init(); // 發起連接
}
/**
* 啓動心跳檢測,定時發送心跳消息
*/
startHeartbeat() {
this.stopHeartbeat(); // 防止重複定時器
this.heartbeatTimer = setInterval(() => {
if (this.socket.readyState === WebSocket.OPEN) {
this.socket.send(this.pingMessage);
this.debug && console.log("Heartbeat sent:", this.pingMessage);
}
}, this.heartbeatInterval);
}
/**
* 停止心跳檢測
*/
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
/**
* 重連邏輯(含延時與重試限制)
*/
reconnect() {
// 已在重連中則不重複觸發
if (this.reconnectTimer || this.isReconnecting) return;
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
this.debug &&
console.warn("Max reconnect attempts reached, stopping reconnect.");
this.clearReconnectTimer();
return;
}
this.isReconnecting = true;
this.reconnectAttempts++;
this.onStatusChange(SOCKET_STATUS.RECONNECTING);
this.onReconnect(); // 調用重連回調
// 延遲執行重連
this.reconnectTimer = setTimeout(() => {
this.clearReconnectTimer();
this.socket = null; // 銷燬舊連接引用
this.debug &&
console.log(`Reconnecting... (Attempt ${this.reconnectAttempts})`);
this.init();
}, this.reconnectDelay);
}
/**
* 清除重連定時器
*/
clearReconnectTimer() {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
}
/**
* 發送消息,如果是對象會自動序列化為 JSON 字符串
* @param {string|Object} data - 要發送的數據
*/
send(data) {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
if (typeof data === "object") {
data = JSON.stringify(data); // 如果是對象,自動序列化為 JSON 字符串
}
this.socket.send(data);
} else {
this.debug && console.warn("WebSocket 未連接,發送失敗");
}
}
/**
* 主動關閉連接(將不再自動重連)
*/
close() {
this.manualClose = true; // 設置手動關閉標記
this.stopHeartbeat();
this.clearReconnectTimer();
if (this.socket) {
this.socket.close();
this.socket = null;
}
this.isConnected = false;
this.isReconnecting = false;
this.onStatusChange("closed");
}
}
使用示例(vue2)
import { CustomSocket } from './CustomSocket'; // 假設你的類文件名為 CustomSocket.js
initSocket() {
this.loading = true;
const host = `${this...}/host/${this.userInfo.user_id}`;
this.socketInstance = new CustomSocket(host, {
heartbeatInterval: 9000, // 每 9 秒發送一次心跳
maxReconnectAttempts: 3, // 最多重連 3 次
// 收到警報消息時的處理函數
onMessage: this.handleMsg,
// 開始連接前,展示 loading 狀態
onConnecting: () => {
this.loading = true;
this.getState = "";
},
// 成功建立連接
onOpen: () => {
console.log("Alert WebSocket connected");
Notification.closeAll(); // 清除舊的警報提示
this.loading = false;
this.getState = "success";
},
// 斷線重連提示
onReconnect: () => {
Notification.error({
title: "提示",
message: "待處理警報連接斷開,正在嘗試重連",
duration: 10000,
});
},
// 連接異常
onError: (err) => {
console.error("Alert WebSocket error", err);
this.loading = false;
this.getState = "error";
},
// 連接關閉
onClose: () => {
console.warn("Alert WebSocket closed");
this.loading = false;
this.getState = "error";
},
});
}
記得關閉頁面時銷燬
beforeDestroy() {
if (this.socketInstance) {
this.socketInstance.close();
}
}