
欢迎来到Bokey的空间🌼
加载中...
🤖关于我的WebSocket封装
我的WebSocket封装📦方便后来我的项目快速开始WebSocket

WebSocket封装
在实际项目中,WebSocket 如果只是简单使用,很快就会遇到这些问题:
连接如何管理?
如何区分不同业务?
如何做权限认证?
如何避免连接假死?
前端如何优雅处理重连?
这篇文章分享一套我在项目中使用的 WebSocket 完整封装方案(前后端)。
整体架构设计
在开始代码之前,我们先明确核心架构:
客户端
↓
wsRouter(消息分发)
↓
handlers(业务处理)
↓
wsService(消息发送)
↓
wsManager(连接管理)
和我们常用的http封装类似,可以类比为:
| HTTP | WebSocket |
|---|---|
| 路由 router | wsRouter |
| controller | handler |
| service | wsService |
| session/user | wsManager |
后段封装
后端封装我一般主要在src/ws中进行,下面的文件都是在src/ws中的文件(除了个别写了文文件夹路径的)
入口文件
入口文件主要做三件事:
- 挂载 /ws 路由
- 建立连接生命周期(连接 / 消息 / 断开)
- 启动心跳机制
index.ts:
TypeScript
import expressWs from "express-ws";
import type { Express } from "express";
import { wsManager } from "./wsManager";
import { handleMessage } from "./wsRouter";
import { startHeartbeat } from "./heartbeat";
/**
* 初始化 WebSocket 模块
* - 将 WebSocket 挂载到 Express
* - 建立连接 / 消息 / 断开的完整链路
* - 启动心跳检测
*/
export function initWebSocket(app: Express, server: any) {
const { app: wsApp } = expressWs(app, server);
// 定义 WebSocket 连接地址
wsApp.ws("/ws", (ws: any, _req: any) => {
// 新连接加入管理器
const client = wsManager.add(ws);
// auth 超时,防止客户端连上但一直不 auth
const timer = setTimeout(() => {
if (!client.userId) { // 强制 auth 机制,防止客户端连接后不认证,占用资源
ws.close();
}
}, 10000);
// 监听客户端消息
ws.on("message", (msg: any) => {
handleMessage(client, msg.toString()); // 路由处理客户端消息
});
// 连接关闭时清理
ws.on("close", () => {
clearTimeout(timer)
wsManager.remove(ws);
});
});
startHeartbeat(); // 启动心跳机制
}
消息入口
所有客户端消息都会先统一进入这里,再分发到handler,主要做三件事:
- 解析 JSON
- 校验是否已认证
- 根据 type 分发 handler
wsRouter.ts:
TypeScript
// 本文件是:客户端 → 服务端 的入口
import type { WSClient, WSMessage, WsHandler } from "@/interface";
import * as baseHandler from './handlers/base.handlers'
/**
* WebSocket 消息路由表
*/
const handlers: Record<string, WsHandler> = {
...baseHandler
};
/**
* WebSocket 消息入口
* 所有客户端消息都从这里进入
*/
export function handleMessage(client: WSClient, raw: string) {
try {
const msg: WSMessage = JSON.parse(raw);
// 所有连接必须先使用 auth 来建立
if (msg.type !== 'auth' && !client.userId) {
return
}
handlers[msg.type]?.(client, msg.data);
} catch (e) {
console.error("[WS] message error", e);
}
}
业务(响应)处理
在这里处理所有客户端的请求响应,写在src/ws/handlers文件夹中,编写handler,来处理不同类型的webSocket请求。这里先封装基本的处理base.handlers.ts,其他的可供日后自行拓展。
src/ws/handlers/base.handlers.ts:
TypeScript
import { wsManager } from "@/ws/wsManager";
import type { WsHandler } from "@/interface";
import { jwt,ws as wsUtils } from "@/utils";
/**
* 客户端首次连接后发送 auth
* 用于绑定 userId
*/
export const auth: WsHandler<{ token: string }> = (client, data) => {
try {
const payload = jwt.verifyToken(data.token);
client.userId = payload.id;
client.isAlive = true;
// ✅ 明确告诉前端:认证成功
client.ws.send(
JSON.stringify({
type: "auth_ok",
}),
);
} catch (err) {
// ❌ 明确告诉前端:认证失败
client.ws.send(
JSON.stringify({
type: "auth_error",
data: {
code: "INVALID_TOKEN",
},
}),
);
// token 无效,给前端一点时间接收消息再断开
setTimeout(() => {
client.ws.close();
}, 50);
}
};
/**
* 订阅房间
*/
export const join: WsHandler<{ room: string }> = (client, data) => {
if(!wsUtils.ensureAuthed(client)) return
wsManager.joinRoom(client, data.room);
};
/**
* 取消订阅房间
*/
export const leave: WsHandler<{ room: string }> = (client, data) => {
if(!wsUtils.ensureAuthed(client)) return
wsManager.leaveRoom(client, data.room);
};
/**
* 客户端响应心跳
* 证明连接仍然存活
*/
export const pong: WsHandler = (client) => {
client.isAlive = true;
};
消息出口
这里服务端主动推送消息的统一出口,封装三种基本推送方式:
- sendToUser(userId)
- sendToRoom(room)
- broadcast()
👉 好处:
避免业务层直接操作 ws
统一 JSON 格式
可扩展(日志 / 限流)
wsService.ts:
TypeScript
import { wsManager } from "./wsManager";
/**
* 向客户端发送消息的基础方法
* 统一序列化格式,避免业务层重复写 JSON.stringify
*/
function send(ws: any, payload: any) {
ws.send(JSON.stringify(payload));
}
/**
* WebSocket 对外服务接口
*/
export const wsService = {
/**
* 向指定用户推送消息
*/
sendToUser(userId: string, payload: any) {
wsManager.getByUser(userId).forEach((c) => send(c.ws, payload));
},
/**
* 向某个房间内的所有客户端推送
*/
sendToRoom(room: string, payload: any) {
console.log("sendToRoom", wsManager.getRoom(room), payload);
wsManager.getRoom(room).forEach((c) => send(c.ws, payload));
},
/**
* 广播给所有在线客户端
*/
broadcast(payload: any) {
wsManager.all().forEach((c) => send(c.ws, payload));
},
};
连接管理
这是整个封装的核心之一,只做一件事:管理所有 WebSocket 连接
提供能力:
- 获取所有连接
- 按 userId 查询
- 房间管理(join / leave)
- 多端登录支持
👉 注意:
wsManager 不处理业务,只负责“连接状态”
wsManager.ts:
TypeScript
// 本文件是:服务端 → 客户端 的出口
import type { WebSocket } from "express-ws";
import { WSClient } from "@/interface";
/**
* WebSocket 连接管理器
* - 维护所有在线 WebSocket 客户端
* - 提供按用户 / 按房间的查询能力
* - 不关心具体业务,只管理“连接状态”
*/
class WSManager {
// 所有在线客户端的Set
private clients = new Set<WSClient>();
/**
* 添加新的管理客户端
* @param ws ws客户端
*/
add(ws: WebSocket) {
const client: WSClient = {
ws,
rooms: new Set(),
isAlive: true,
userId: undefined,
};
this.clients.add(client);
return client;
}
/**
* 移除某管理客户端
* @param ws ws客户端
*/
remove(ws: WebSocket) {
for (const client of this.clients) {
if (client.ws === ws) {
this.clients.delete(client);
break;
}
}
}
/**
* 得到所有管理客户端,注意:返回的是数组副本,避免外部直接修改 Set
*/
all() {
return [...this.clients];
}
/**
* 按用户 ID 获取客户端
* @param userId 用户id
* @returns 支持一个用户多端在线(多连接)
*/
getByUser(userId: string) {
return this.all().filter((c) => c.userId === userId);
}
/**
* 将客户端加入某个房间
* 房间是纯逻辑概念(字符串)
*/
joinRoom(client: WSClient, room: string) {
client.rooms.add(room);
}
/**
* 客户端离开房间
* @param client 客户端
* @param room 房间
*/
leaveRoom(client: WSClient, room: string) {
client.rooms.delete(room);
}
/**
* 获取订阅了某个房间的所有客户端
* @param room 房间
*/
getRoom(room: string) {
return this.all().filter((c) => c.rooms.has(room));
}
}
export const wsManager = new WSManager();
心跳机制
这里处理对客户端连接的心跳处理,主要:
- 检测客户端是否在线
- 清理“假死连接”
heartbeat.ts:
TypeScript
import { wsManager } from './wsManager'
/**
* 启动 WebSocket 心跳检测
* - 定期检测客户端是否仍然存活
* - 清理“已断线但未触发 close 事件”的连接
*/
export function startHeartbeat(interval = 30000) {
setInterval(() => {
// 遍历当前所有已连接的客户端,发送ping消息
wsManager.all().forEach(client => {
if (!client.isAlive) { // 若上个周期的ping消息没有收到回应,关闭连接
(client.ws as any).terminate() // 强制关闭连接
return
}
client.isAlive = false // 假设客户端已“失活”,等待下一次 pong 来证明存活
client.ws.send(JSON.stringify({ type: 'ping' })) // 向客户端发送 ping 消息
})
}, interval)
}
前端封装
前端封装我也主要在src/ws中进行,下面的文件夹都是在src/ws中的文件。我没有直接用原生 WebSocket,而是封装成一个:SocketClient 类(类似 SDK)
index.js:
JavaScript
import { SocketClient } from "./socket";
export { WS_STATUS, AUTH_STATUS } from "./socket";
import Cookies from "js-cookie"; // 获取Cookie
// ws的连接url,默认和baseUrl同个后端的/ws路由
const baseUrl = import.meta.env.VITE_BASE_URL;
const wsUrl =
(baseUrl.startsWith("https")
? baseUrl.replace(/^https/, "wss")
: baseUrl.replace(/^http/, "ws")) + "/ws";
/**
* 全局唯一 WebSocket 实例
*/
export const ws = new SocketClient(wsUrl, {
tokenGetter: () => Cookies.get("token"),
});
socket.js:
JavaScript
/**
* 经典 WebSocket Client 封装
* - 生命周期管理
* - 自动重连(指数退避)
* - auth / ping-pong
* - 内部状态管理(不依赖任何框架)
* - 事件订阅
*/
export const WS_STATUS = {
OFFLINE: "offline",
CONNECTING: "connecting",
ONLINE: "online",
};
export const AUTH_STATUS = {
PENDING: "pending",
AUTHED: "authed",
FAILED: "failed",
};
export class SocketClient {
constructor(url, options = {}) {
this.url = url;
this.tokenGetter = options.tokenGetter;
this.ws = null;
/** 内部状态(完全内聚) */
this.state = {
status: WS_STATUS.OFFLINE,
reconnectCount: 0,
auth: AUTH_STATUS.PENDING,
};
/** 消息事件 */
this.handlers = new Map();
/** 状态监听(给 UI / 调试 / 日志用) */
this.stateListeners = new Set();
this.reconnectTimer = null;
this.authTimer = null;
this.shouldReconnect = true; // 是否允许重新连接
// 内部处理后端给出的auth事件,判断当前权限鉴别状态
this.on("auth_ok", () => {
clearTimeout(this.authTimer);
this._setState({ auth: AUTH_STATUS.AUTHED });
});
this.on("auth_error", (err) => {
this._onAuthFailed(err);
});
}
/* --- 状态系统 --- */
getState() {
return { ...this.state };
}
onStateChange(fn) {
this.stateListeners.add(fn);
// 立即同步一次当前状态
fn(this.getState());
return () => this.stateListeners.delete(fn);
}
_setState(patch) {
Object.assign(this.state, patch);
this.stateListeners.forEach((fn) => fn(this.getState()));
}
/* --- 连接管理 --- */
connect() {
if (this.ws?.readyState === WebSocket.OPEN) return;
this._setState({
status: WS_STATUS.CONNECTING,
auth: AUTH_STATUS.PENDING,
reconnectCount: 0,
});
this.shouldReconnect = true; // 重置连接
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
this._setState({
status: WS_STATUS.ONLINE,
});
this._auth();
};
this.ws.onclose = () => {
this._setState({ status: WS_STATUS.OFFLINE });
if (this.shouldReconnect && this.state.auth !== AUTH_STATUS.FAILED) {
this._reconnect();
}
};
this.ws.onerror = () => {
this.ws.close();
};
// 统一事件 - 事件全部走emit
this.ws.onmessage = (ev) => {
try {
const msg = JSON.parse(ev.data);
this._emit(msg.type, msg.data);
} catch (e) {
console.error("[WS] invalid message", ev.data);
}
};
}
close() {
this.shouldReconnect = false;
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
clearTimeout(this.authTimer);
this.ws?.close();
this._setState({ status: WS_STATUS.OFFLINE });
}
/* --- 重连 --- */
// 系统重连
_reconnect() {
if (this.reconnectTimer) return;
const delay = Math.min(3000 * (this.state.reconnectCount + 1), 15000);
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
this._setState({
status: WS_STATUS.CONNECTING,
reconnectCount: this.state.reconnectCount + 1,
});
this.connect();
}, delay);
}
// 用户重连:立刻执行
reconnect() {
this.shouldReconnect = true;
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
this._setState({
status: WS_STATUS.CONNECTING,
reconnectCount: 0, // 主动行为,重置次数
auth: AUTH_STATUS.PENDING,
});
this.ws?.close(); // 确保干净
this.connect();
}
/* --- 认证 --- */
_auth() {
const token = this.tokenGetter?.();
if (!token) return;
this.send("auth", { token });
// auth 超时兜底(防服务端未响应)
this.authTimer = setTimeout(() => {
if (this.state.auth === AUTH_STATUS.PENDING) {
console.warn("[WS] auth timeout");
this._onAuthFailed({
code: "AUTH_TIMEOUT",
});
}
}, 5000);
}
_onAuthFailed(err) {
clearTimeout(this.authTimer);
this._setState({ auth: AUTH_STATUS.FAILED });
this.shouldReconnect = false;
this._emit("auth_failed", err); // 对外通知
this.ws?.close();
}
/* --- 消息系统 --- */
send(type, data) {
if (type !== "auth" && this.state.auth !== AUTH_STATUS.AUTHED) {
console.warn("[WS] not authed, drop message:", type);
return;
}
if (this.ws?.readyState !== WebSocket.OPEN) {
console.warn("[WS] not connected, drop message:", type);
return;
}
this.ws.send(JSON.stringify({ type, data }));
}
on(type, handler) {
if (!this.handlers.has(type)) {
this.handlers.set(type, new Set());
}
this.handlers.get(type).add(handler);
return () => this.off(type, handler);
}
off(type, handler) {
this.handlers.get(type)?.delete(handler);
}
_emit(type, data) {
if (type === "ping") {
this.send("pong");
return;
}
this.handlers.get(type)?.forEach((fn) => {
fn(data)
});
}
/* --- 房间 --- */
join(room) {
this.send("join", { room });
}
leave(room) {
this.send("leave", { room });
}
}
发布于
2026-04-01
更新于
2026-04-11
类目
作者
Bokey
版权协议