欢迎来到Bokey的空间🌼

加载中...

💓 0🔥 6

🤖关于我的WebSocket封装

我的WebSocket封装📦方便后来我的项目快速开始WebSocket

🕘 2026-04-01

WebSocket封装

在实际项目中,WebSocket 如果只是简单使用,很快就会遇到这些问题:

连接如何管理?
如何区分不同业务?
如何做权限认证?
如何避免连接假死?
前端如何优雅处理重连?

这篇文章分享一套我在项目中使用的 WebSocket 完整封装方案(前后端)。

整体架构设计

在开始代码之前,我们先明确核心架构:

复制代码
客户端
   ↓
wsRouter(消息分发)
   ↓
handlers(业务处理)
   ↓
wsService(消息发送)
   ↓
wsManager(连接管理)

和我们常用的http封装类似,可以类比为:

HTTP WebSocket
路由 router wsRouter
controller handler
service wsService
session/user wsManager

后段封装

后端封装我一般主要在src/ws中进行,下面的文件都是在src/ws中的文件(除了个别写了文文件夹路径的)

入口文件

入口文件主要做三件事:

  • 挂载 /ws 路由
  • 建立连接生命周期(连接 / 消息 / 断开)
  • 启动心跳机制

index.ts:

TypeScript 复制代码
import expressWs from "express-ws";
import type { Express } from "express";
import { wsManager } from "./wsManager";
import { handleMessage } from "./wsRouter";
import { startHeartbeat } from "./heartbeat";

/**
 * 初始化 WebSocket 模块
 * - 将 WebSocket 挂载到 Express
 * - 建立连接 / 消息 / 断开的完整链路
 * - 启动心跳检测
 */
export function initWebSocket(app: Express, server: any) {
  const { app: wsApp } = expressWs(app, server);
  // 定义 WebSocket 连接地址
  wsApp.ws("/ws", (ws: any, _req: any) => {
    // 新连接加入管理器
    const client = wsManager.add(ws);
    // auth 超时,防止客户端连上但一直不 auth
    const timer = setTimeout(() => {
      if (!client.userId) { // 强制 auth 机制,防止客户端连接后不认证,占用资源
        ws.close();
      }
    }, 10000);
    // 监听客户端消息
    ws.on("message", (msg: any) => {
      handleMessage(client, msg.toString()); // 路由处理客户端消息
    });
    // 连接关闭时清理
    ws.on("close", () => {
      clearTimeout(timer)
      wsManager.remove(ws);
    });
  });
  startHeartbeat(); // 启动心跳机制
}

消息入口

所有客户端消息都会先统一进入这里,再分发到handler,主要做三件事:

  • 解析 JSON
  • 校验是否已认证
  • 根据 type 分发 handler

wsRouter.ts:

TypeScript 复制代码
// 本文件是:客户端 → 服务端 的入口
import type { WSClient, WSMessage, WsHandler } from "@/interface";
import * as baseHandler from './handlers/base.handlers'

/**
 * WebSocket 消息路由表
 */
const handlers: Record<string, WsHandler> = {
    ...baseHandler
};

/**
 * WebSocket 消息入口
 * 所有客户端消息都从这里进入
 */
export function handleMessage(client: WSClient, raw: string) {
  try {
    const msg: WSMessage = JSON.parse(raw);
    // 所有连接必须先使用 auth 来建立
    if (msg.type !== 'auth' && !client.userId) {
      return
    }
    handlers[msg.type]?.(client, msg.data);
  } catch (e) {
    console.error("[WS] message error", e);
  }
}

业务(响应)处理

在这里处理所有客户端的请求响应,写在src/ws/handlers文件夹中,编写handler,来处理不同类型的webSocket请求。这里先封装基本的处理base.handlers.ts,其他的可供日后自行拓展。

src/ws/handlers/base.handlers.ts:

TypeScript 复制代码
import { wsManager } from "@/ws/wsManager";
import type { WsHandler } from "@/interface";
import { jwt,ws as wsUtils } from "@/utils";

/**
 * 客户端首次连接后发送 auth
 * 用于绑定 userId
 */
export const auth: WsHandler<{ token: string }> = (client, data) => {
  try {
    const payload = jwt.verifyToken(data.token);
    client.userId = payload.id;
    client.isAlive = true;
    // ✅ 明确告诉前端:认证成功
    client.ws.send(
      JSON.stringify({
        type: "auth_ok",
      }),
    );
  } catch (err) {
    // ❌ 明确告诉前端:认证失败
    client.ws.send(
      JSON.stringify({
        type: "auth_error",
        data: {
          code: "INVALID_TOKEN",
        },
      }),
    );
    // token 无效,给前端一点时间接收消息再断开
    setTimeout(() => {
      client.ws.close();
    }, 50);
  }
};

/**
 * 订阅房间
 */
export const join: WsHandler<{ room: string }> = (client, data) => {
  if(!wsUtils.ensureAuthed(client)) return
  wsManager.joinRoom(client, data.room);
};

/**
 * 取消订阅房间
 */
export const leave: WsHandler<{ room: string }> = (client, data) => {
  if(!wsUtils.ensureAuthed(client)) return
  wsManager.leaveRoom(client, data.room);
};

/**
 * 客户端响应心跳
 * 证明连接仍然存活
 */
export const pong: WsHandler = (client) => {
  client.isAlive = true;
};

消息出口

这里服务端主动推送消息的统一出口,封装三种基本推送方式:

  • sendToUser(userId)
  • sendToRoom(room)
  • broadcast()
    👉 好处:

避免业务层直接操作 ws
统一 JSON 格式
可扩展(日志 / 限流)

wsService.ts:

TypeScript 复制代码
import { wsManager } from "./wsManager";

/**
 * 向客户端发送消息的基础方法
 * 统一序列化格式,避免业务层重复写 JSON.stringify
 */
function send(ws: any, payload: any) {
  ws.send(JSON.stringify(payload));
}

/**
 * WebSocket 对外服务接口
 */
export const wsService = {
  /**
   * 向指定用户推送消息
   */
  sendToUser(userId: string, payload: any) {
    wsManager.getByUser(userId).forEach((c) => send(c.ws, payload));
  },
  /**
   * 向某个房间内的所有客户端推送
   */
  sendToRoom(room: string, payload: any) {
    console.log("sendToRoom", wsManager.getRoom(room), payload);
    wsManager.getRoom(room).forEach((c) => send(c.ws, payload));
  },
  /**
   * 广播给所有在线客户端
   */
  broadcast(payload: any) {
    wsManager.all().forEach((c) => send(c.ws, payload));
  },
};

连接管理

这是整个封装的核心之一,只做一件事:管理所有 WebSocket 连接


提供能力:

  • 获取所有连接
  • 按 userId 查询
  • 房间管理(join / leave)
  • 多端登录支持

👉 注意:

wsManager 不处理业务,只负责“连接状态”

wsManager.ts:

TypeScript 复制代码
// 本文件是:服务端 → 客户端 的出口
import type { WebSocket } from "express-ws";
import { WSClient } from "@/interface";

/**
 * WebSocket 连接管理器
 * - 维护所有在线 WebSocket 客户端
 * - 提供按用户 / 按房间的查询能力
 * - 不关心具体业务,只管理“连接状态”
 */
class WSManager {
  // 所有在线客户端的Set
  private clients = new Set<WSClient>();

  /**
   * 添加新的管理客户端
   * @param ws ws客户端
   */
  add(ws: WebSocket) {
    const client: WSClient = {
      ws,
      rooms: new Set(),
      isAlive: true,
      userId: undefined,
    };
    this.clients.add(client);
    return client;
  }

  /**
   * 移除某管理客户端
   * @param ws ws客户端
   */
  remove(ws: WebSocket) {
    for (const client of this.clients) {
      if (client.ws === ws) {
        this.clients.delete(client);
        break;
      }
    }
  }

  /**
   * 得到所有管理客户端,注意:返回的是数组副本,避免外部直接修改 Set
   */
  all() {
    return [...this.clients];
  }

  /**
   * 按用户 ID 获取客户端
   * @param userId 用户id
   * @returns 支持一个用户多端在线(多连接)
   */
  getByUser(userId: string) {
    return this.all().filter((c) => c.userId === userId);
  }

  /**
   * 将客户端加入某个房间
   * 房间是纯逻辑概念(字符串)
   */
  joinRoom(client: WSClient, room: string) {
    client.rooms.add(room);
  }

  /**
   * 客户端离开房间
   * @param client 客户端
   * @param room 房间
   */
  leaveRoom(client: WSClient, room: string) {
    client.rooms.delete(room);
  }

  /**
   * 获取订阅了某个房间的所有客户端
   * @param room 房间
   */
  getRoom(room: string) {
    return this.all().filter((c) => c.rooms.has(room));
  }
}

export const wsManager = new WSManager();

心跳机制

这里处理对客户端连接的心跳处理,主要:

  • 检测客户端是否在线
  • 清理“假死连接”

heartbeat.ts:

TypeScript 复制代码
import { wsManager } from './wsManager'

/**
 * 启动 WebSocket 心跳检测
 * - 定期检测客户端是否仍然存活
 * - 清理“已断线但未触发 close 事件”的连接
 */
export function startHeartbeat(interval = 30000) {
  setInterval(() => {
    // 遍历当前所有已连接的客户端,发送ping消息
    wsManager.all().forEach(client => {
      if (!client.isAlive) { // 若上个周期的ping消息没有收到回应,关闭连接
        (client.ws as any).terminate() // 强制关闭连接
        return
      }
      client.isAlive = false // 假设客户端已“失活”,等待下一次 pong 来证明存活
      client.ws.send(JSON.stringify({ type: 'ping' })) // 向客户端发送 ping 消息
    })
  }, interval)
}

前端封装

前端封装我也主要在src/ws中进行,下面的文件夹都是在src/ws中的文件。我没有直接用原生 WebSocket,而是封装成一个:SocketClient 类(类似 SDK)

index.js:

JavaScript 复制代码
import { SocketClient } from "./socket";
export { WS_STATUS, AUTH_STATUS } from "./socket";
import Cookies from "js-cookie"; // 获取Cookie

// ws的连接url,默认和baseUrl同个后端的/ws路由
const baseUrl = import.meta.env.VITE_BASE_URL;
const wsUrl =
  (baseUrl.startsWith("https")
    ? baseUrl.replace(/^https/, "wss")
    : baseUrl.replace(/^http/, "ws")) + "/ws";

/**
 * 全局唯一 WebSocket 实例
 */
export const ws = new SocketClient(wsUrl, {
  tokenGetter: () => Cookies.get("token"),
});

socket.js:

JavaScript 复制代码
/**
 * 经典 WebSocket Client 封装
 * - 生命周期管理
 * - 自动重连(指数退避)
 * - auth / ping-pong
 * - 内部状态管理(不依赖任何框架)
 * - 事件订阅
 */

export const WS_STATUS = {
  OFFLINE: "offline",
  CONNECTING: "connecting",
  ONLINE: "online",
};

export const AUTH_STATUS = {
  PENDING: "pending",
  AUTHED: "authed",
  FAILED: "failed",
};

export class SocketClient {
  constructor(url, options = {}) {
    this.url = url;
    this.tokenGetter = options.tokenGetter;
    this.ws = null;
    /** 内部状态(完全内聚) */
    this.state = {
      status: WS_STATUS.OFFLINE,
      reconnectCount: 0,
      auth: AUTH_STATUS.PENDING,
    };
    /** 消息事件 */
    this.handlers = new Map();
    /** 状态监听(给 UI / 调试 / 日志用) */
    this.stateListeners = new Set();
    this.reconnectTimer = null;
    this.authTimer = null;
    this.shouldReconnect = true; // 是否允许重新连接

    // 内部处理后端给出的auth事件,判断当前权限鉴别状态
    this.on("auth_ok", () => {
      clearTimeout(this.authTimer);
      this._setState({ auth: AUTH_STATUS.AUTHED });
    });
    this.on("auth_error", (err) => {
      this._onAuthFailed(err);
    });
  }

  /* --- 状态系统 --- */
  getState() {
    return { ...this.state };
  }

  onStateChange(fn) {
    this.stateListeners.add(fn);
    // 立即同步一次当前状态
    fn(this.getState());
    return () => this.stateListeners.delete(fn);
  }

  _setState(patch) {
    Object.assign(this.state, patch);
    this.stateListeners.forEach((fn) => fn(this.getState()));
  }

  /* --- 连接管理 --- */
  connect() {
    if (this.ws?.readyState === WebSocket.OPEN) return;
    this._setState({
      status: WS_STATUS.CONNECTING,
      auth: AUTH_STATUS.PENDING,
      reconnectCount: 0,
    });
    this.shouldReconnect = true; // 重置连接
    this.ws = new WebSocket(this.url);
    this.ws.onopen = () => {
      this._setState({
        status: WS_STATUS.ONLINE,
      });
      this._auth();
    };
    this.ws.onclose = () => {
      this._setState({ status: WS_STATUS.OFFLINE });
      if (this.shouldReconnect && this.state.auth !== AUTH_STATUS.FAILED) {
        this._reconnect();
      }
    };
    this.ws.onerror = () => {
      this.ws.close();
    };
    // 统一事件 - 事件全部走emit
    this.ws.onmessage = (ev) => {
      try {
        const msg = JSON.parse(ev.data);
        this._emit(msg.type, msg.data);
      } catch (e) {
        console.error("[WS] invalid message", ev.data);
      }
    };
  }

  close() {
    this.shouldReconnect = false;
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;
    clearTimeout(this.authTimer);
    this.ws?.close();
    this._setState({ status: WS_STATUS.OFFLINE });
  }

  /* --- 重连 --- */
  // 系统重连
  _reconnect() {
    if (this.reconnectTimer) return;
    const delay = Math.min(3000 * (this.state.reconnectCount + 1), 15000);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this._setState({
        status: WS_STATUS.CONNECTING,
        reconnectCount: this.state.reconnectCount + 1,
      });
      this.connect();
    }, delay);
  }
  // 用户重连:立刻执行
  reconnect() {
    this.shouldReconnect = true;
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;
    this._setState({
      status: WS_STATUS.CONNECTING,
      reconnectCount: 0, // 主动行为,重置次数
      auth: AUTH_STATUS.PENDING,
    });
    this.ws?.close(); // 确保干净
    this.connect();
  }

  /* --- 认证 --- */
  _auth() {
    const token = this.tokenGetter?.();
    if (!token) return;
    this.send("auth", { token });
    // auth 超时兜底(防服务端未响应)
    this.authTimer = setTimeout(() => {
      if (this.state.auth === AUTH_STATUS.PENDING) {
        console.warn("[WS] auth timeout");
        this._onAuthFailed({
          code: "AUTH_TIMEOUT",
        });
      }
    }, 5000);
  }
  _onAuthFailed(err) {
    clearTimeout(this.authTimer);
    this._setState({ auth: AUTH_STATUS.FAILED });
    this.shouldReconnect = false;
    this._emit("auth_failed", err); // 对外通知
    this.ws?.close();
  }

  /* --- 消息系统 --- */
  send(type, data) {
    if (type !== "auth" && this.state.auth !== AUTH_STATUS.AUTHED) {
      console.warn("[WS] not authed, drop message:", type);
      return;
    }
    if (this.ws?.readyState !== WebSocket.OPEN) {
      console.warn("[WS] not connected, drop message:", type);
      return;
    }
    this.ws.send(JSON.stringify({ type, data }));
  }

  on(type, handler) {
    if (!this.handlers.has(type)) {
      this.handlers.set(type, new Set());
    }
    this.handlers.get(type).add(handler);
    return () => this.off(type, handler);
  }

  off(type, handler) {
    this.handlers.get(type)?.delete(handler);
  }

  _emit(type, data) {
    if (type === "ping") {
      this.send("pong");
      return;
    }
    this.handlers.get(type)?.forEach((fn) => {
      fn(data)
    });
  }

  /* --- 房间 --- */
  join(room) {
    this.send("join", { room });
  }

  leave(room) {
    this.send("leave", { room });
  }
}
🤖关于我的WebSocket封装
  • 发布于

    2026-04-01

  • 更新于

    2026-04-11

  • 类目

  • 作者

    Bokey

  • 版权协议

cc

Developed & Design by Bokey
已经发电运行了 0 天,我会继续努力
Copyright © 2024-2029 Bokey's Space
CC BY-NC-SA 4.0
粤ICP备2025398830号-1