Egg.js 正式环境多进程下 Socket.io 收不到消息问题解决方案

2026-01-23 21 浏览 0 评论

在 Egg.js 项目中使用 Socket.io 实现实时消息推送时,常会遇到一个 环境差异化 问题: 本地开发环境(npm run dev)消息收发正常,但部署到正式环境(npm run start)后,频繁出现收不到消息的情况 。这一问题的核心根源在于 Egg.js 正式环境的多进程模型与全局变量的隔离特性,本文将详细拆解问题成因,完整呈现从 问题排查 到 方案落地 的全过程,并补充核心技术原理与实操注意事项,帮助开发者彻底解决该问题。

一、问题现象与初始实现方案

1. 问题现象

本地开发时,Socket.io 连接稳定,消息推送、接收正常;部署到正式环境后,用户虽能建立连接,但经常出现 消息丢失 - 部分用户收不到推送消息,或消息推送延迟严重,且问题无固定规律,难以复现和排查。

2. 初始实现方案(存在缺陷)

最初参考 Egg.js 官方文档,通过 全局变量存储连接句柄 的方式实现消息推送,核心逻辑如下: (1)在 Socket 连接中间件(app/io/middleware/connection.js)中,验证用户权限后,将当前连接的 socket 实例存入全局变量 global.userList ; (2)用户离线时,从 global.userList 中删除对应的 socket 实例; (3)在其他 Controller 或 Service 中,直接从 global.userList 中获取目标用户的 socket 实例,调用 emit 方法推送消息。 核心代码实现(连接中间件):


// app/io/middleware/connection.js
module.exports = app => {
  return async (ctx, next) => {
    // 获取当前连接的 socket 实例
    const socket = ctx.socket;

    // 1. 用户权限验证(基于 JWT)
    try {
      // 从查询参数中获取 token 并验证
      const auth = ctx.jwt.verify(ctx.query.token, app.config.jwt.secret);
      // 根据验证后的用户 ID 获取用户信息,挂载到 ctx 上
      ctx.auth = await ctx.service.user.getUserBy('id', auth.id);
    } catch (err) {
      // 权限验证失败,可直接断开连接
      socket.disconnect(true);
      return;
    }

    // 2. 给 socket 实例附加用户 ID,用于后续身份匹配
    socket.user_id = ctx.auth.id;

    // 3. 将 socket 实例存入全局变量,供其他模块调用
    if (!global.userList) {
      // 初始化全局连接列表(避免首次调用时为 undefined)
      global.userList = [];
    }
    global.userList.push(socket);

    // 执行后续中间件
    await next();

    // 4. 用户离线:从全局列表中移除对应的 socket 实例
    if (global.userList && global.userList.length > 0) {
      for (let i = 0; i < global.userList.length; i++) {
        if (global.userList[i].user_id === ctx.auth.id) {
          global.userList.splice(i, 1);
          break;
        }
      }
    }
  };
};

二、问题根源:Egg.js 多进程模型与全局变量隔离

上述方案在本地开发环境正常,却在正式环境失效,核心原因是 Egg.js 正式环境的多进程模型导致全局变量隔离 ,具体原理如下:

1. Egg.js 环境差异:开发环境 vs 正式环境

Egg.js 提供两种核心启动模式,对应不同的进程模型:

  • 开发环境(npm run dev):为简化调试,默认采用 单进程模式 (即使配置了多进程,也会自动优化为单进程运行),全局变量 global 在整个应用中唯一,因此 global.userList 能正常存储所有 socket 连接;
  • 正式环境(npm run start):为充分利用服务器 CPU 资源,默认采用 Cluster 多进程模式 - 主进程(Master)会根据 CPU 核心数 fork 出多个工作进程(Worker),每个 Worker 进程都是独立的 Node.js 实例,拥有自己的 global 变量,进程间的 global 完全隔离,互不共享。

2. 多进程下的核心矛盾

在正式环境的多进程模型中,每个 Worker 进程都会独立处理一部分 Socket 连接:

  • 用户 A 的 socket 连接被分配到 Worker 1 进程,存入 Worker 1 的 global.userList
  • 用户 B 的 socket 连接被分配到 Worker 2 进程,存入 Worker 2 的 global.userList
  • 当需要给用户 A 推送消息时,若推送逻辑运行在 Worker 2 进程,Worker 2 的 global.userList 中没有用户 A 的 socket 实例,就会导致消息推送失败。

这就是正式环境 收不到消息 的本质 - 全局变量的进程隔离性,导致部分 socket 连接无法被推送逻辑找到

三、解决方案选型:拒绝集中式存储,基于进程间通信实现全局匹配

排查到问题根源后,首先考虑官方文档的推荐方案:使用 egg-redis 实现 clients/rooms 等信息的全局共享。但该方案需要额外引入 Redis 服务,增加了系统复杂度( 服务越少,Bug 越少 )。结合项目已使用 memcache 作为缓存工具的现状,进一步分析发现: 连接句柄(socket 实例)是进程内的内存对象,无法被序列化存储到 memcache、数据库等外部存储中 ,因此 集中式存储连接句柄 的思路不可行。 最终确定可行方案: 利用 Egg.js 的进程间通信(IPC)机制,结合每个 Worker 进程的本地 socket 列表,实现全局 socket 连接的匹配与消息推送 。核心思路是:

  1. 放弃全局变量存储,直接使用 Socket.io 内置的连接管理对象 this.app.io.sockets.sockets - 该对象在每个 Worker 进程中存储当前进程处理的所有 socket 连接;
  2. 在连接建立时,给每个 socket 实例附加 user_id 标识,用于后续匹配;
  3. 需要推送消息时,通过 Egg.js 的 messenger (进程间通信工具)向所有 Worker 进程发送消息匹配请求;
  4. 每个 Worker 进程收到请求后,在本地的 this.app.io.sockets.sockets 中匹配目标 user_id 的 socket 实例,找到后执行推送逻辑。

四、完整实现步骤

1. 优化连接中间件:移除全局变量,附加 user_id 标识

保留用户权限验证逻辑,删除全局变量 global.userList 的相关操作,仅给 socket 实例附加 user_id (后续用于匹配)。优化后的代码:


// app/io/middleware/connection.js
module.exports = app => {
  return async (ctx, next) => {
    const socket = ctx.socket;

    // 权限验证(失败则断开连接)
    try {
      const auth = ctx.jwt.verify(ctx.query.token, app.config.jwt.secret);
      ctx.auth = await ctx.service.user.getUserBy('id', auth.id);
    } catch (err) {
      socket.disconnect(true);
      return;
    }

    // 关键:给 socket 附加 user_id,用于后续进程内匹配
    socket.user_id = ctx.auth.id;

    await next();

    // 用户离线时无需额外操作(Socket.io 会自动管理连接状态)
  };
};

2. 利用进程间通信:发送消息到所有 Worker 进程

当需要推送消息时(如在 Controller 或 Service 中),通过 this.app.messenger.sendToApp() 方法向所有 Worker 进程发送消息。该方法是 Egg.js 封装的进程间通信 API,能确保消息被所有 Worker 进程接收。 示例(在某个 Service 中触发推送):


// app/service/message.js
const Service = require('egg').Service;
module.exports = class extends Service {
  // 向指定用户推送消息
  async pushToUser(user_id, message) {
    // 通过 messenger 向所有 Worker 进程发送推送请求
    this.app.messenger.sendToApp('socketSend', {
      user_id: user_id,       // 目标用户 ID
      type: 'privateMessage', // 消息类型(自定义,用于客户端区分)
      message: message        // 消息内容
    });
  }
};

3. 监听进程间消息:在每个 Worker 进程中执行推送

在 Egg.js 应用启动完成后(app.js 的 serverDidReady 生命周期),监听 messenger 传来的 socketSend 消息,收到消息后调用专门的 Service 执行本地 socket 匹配与推送。 app.js 配置:


// app.js
class AppBootHook {
  constructor(app) {
    this.app = app;
  }

  // 应用启动完成后执行(所有 Worker 进程已启动)
  async serverDidReady() {
    const ctx = this.app.createAnonymousContext(); // 创建匿名上下文(无用户信息,用于调用 Service)
    
    // 监听 messenger 传来的 socketSend 消息
    this.app.messenger.on('socketSend', data => {
      // 调用 Service 执行推送逻辑
      ctx.service.socketHandler.socketSend(data.user_id, data.type, data.message);
    });
  }
}

module.exports = AppBootHook;

4. 实现推送 Service:匹配本地 socket 并发送消息

编写专门的 Service(socketHandler.js),在每个 Worker 进程中遍历本地的 this.app.io.sockets.sockets 集合,根据 user_id 匹配目标 socket 实例,找到后调用 emit 方法推送消息。


/* eslint-disable array-bracket-spacing */
'use strict';
const Service = require('egg').Service;

module.exports = class extends Service {
  /**
   * 进程内 socket 匹配与消息推送
   * @param {number} user_id - 目标用户 ID
   * @param {string} type - 消息类型(客户端需对应监听该类型)
   * @param {object} message - 消息内容(可序列化对象)
   */
  async socketSend(user_id, type, message) {
    const { app } = this;
    // this.app.io.sockets.sockets 是当前进程的所有 socket 连接集合(键为 socket.id,值为 socket 实例)
    const sockets = app.io.sockets.sockets;

    // 遍历所有本地 socket 连接,匹配目标 user_id
    for (const socketId in sockets) {
      const socket = sockets[socketId];
      // 注意:user_id 可能为字符串/数字,需统一类型后匹配
      if (Number(socket.user_id) === Number(user_id)) {
        // 推送消息到目标客户端
        socket.emit(type, message);
        break; // 一个用户可能只有一个连接,匹配到后直接退出循环
      }
    }
  }
};

五、补充优化:在线用户统计

上述方案解决了消息推送问题,但无法直观查看在线用户数量。可基于 进程间通信 + 缓存计数 实现全局在线用户统计,核心逻辑如下:

  1. 用户连接时:当前 Worker 进程通过 messenger 向主进程发送 在线计数 +1 请求,主进程调用 memcache 递增在线用户数;
  2. 用户离线时:当前 Worker 进程通过 messenger 向主进程发送 在线计数 -1 请求,主进程调用 memcache 递减在线用户数;
  3. 查看在线数时:直接从 memcache 中获取计数即可。

示例(连接中间件中添加计数逻辑):


// app/io/middleware/connection.js(补充计数逻辑)
module.exports = app => {
  return async (ctx, next) => {
    const socket = ctx.socket;

    // 权限验证(同上,省略重复代码)
    try {
      const auth = ctx.jwt.verify(ctx.query.token, app.config.jwt.secret);
      ctx.auth = await ctx.service.user.getUserBy('id', auth.id);
    } catch (err) {
      socket.disconnect(true);
      return;
    }

    socket.user_id = ctx.auth.id;

    // 1. 用户连接:发送 +1 请求到主进程
    app.messenger.sendToApp('onlineCountChange', { action: 'increase' });

    await next();

    // 2. 用户离线:发送 -1 请求到主进程(next() 之后执行,代表连接已断开)
    app.messenger.sendToApp('onlineCountChange', { action: 'decrease' });
  };
};

app.js 中补充计数监听逻辑:


// app.js(补充在线计数监听)
async serverDidReady() {
  const ctx = this.app.createAnonymousContext();
  
  // 初始化在线用户数(若 memcache 中无值则设为 0)
  const currentCount = await ctx.service.memcache.get('onlineUserCount') || 0;
  await ctx.service.memcache.set('onlineUserCount', currentCount);

  // 监听在线计数变更消息
  this.app.messenger.on('onlineCountChange', async data => {
    let count = await ctx.service.memcache.get('onlineUserCount');
    count = Number(count) || 0;
    // 根据 action 增减计数(确保计数不小于 0)
    if (data.action === 'increase') {
      count += 1;
    } else if (data.action === 'decrease' && count > 0) {
      count -= 1;
    }
    await ctx.service.memcache.set('onlineUserCount', count);
  });

  // 原有 socketSend 监听逻辑(同上,省略)
  this.app.messenger.on('socketSend', data => {
    ctx.service.socketHandler.socketSend(data.user_id, data.type, data.message);
  });
}

六、核心注意事项

  1. 进程间通信的消息格式 :通过 messenger 发送的消息必须是 可序列化对象 (如 JSON 格式),不能包含函数、socket 实例等不可序列化的内容,否则会导致消息发送失败;
  2. user_id 类型统一 :不同地方存储的 user_id 可能存在 字符串/数字 类型差异(如数据库中是数字,socket 附加的是字符串),匹配时需统一类型(如使用 Number() 转换),避免匹配失败;
  3. Sticky 模式的配合 :若已开启 Egg.js 的 Sticky 模式(解决 Socket.io 400 错误的方案),该方案仍完全适用 - Sticky 模式确保同一用户的连接始终分配到同一 Worker 进程,而本方案解决的是 跨 Worker 进程的消息推送 问题;
  4. 连接状态的可靠性 :Socket.io 会自动处理连接断开(如网络异常),但仍建议在客户端添加 重连逻辑 ,并在服务端通过 socket.on('disconnect') 事件增强离线检测,确保计数准确性;
  5. 高并发场景优化 :若项目并发量极高(万级以上连接),遍历 this.app.io.sockets.sockets 可能存在性能损耗,可在服务端维护一个 本地 user_id -> socket.id 的映射对象(替代遍历),进一步提升匹配效率。

七、总结

Egg.js 正式环境多进程下 Socket.io 收不到消息的核心原因,是全局变量的进程隔离性导致连接句柄无法跨进程共享。本文提出的 进程间通信 + 本地连接匹配 方案,无需引入额外服务(如 Redis),充分利用 Egg.js 内置的 messenger 工具和 Socket.io 的连接管理对象,就能实现跨进程的消息推送。 核心步骤可概括为:

  1. 连接时给 socket 附加 user_id;
  2. 推送消息时通过 messenger 通知所有 Worker;
  3. 每个 Worker 遍历本地 socket 集合匹配推送。

该方案兼顾了 轻量性 和 可靠性 ,适用于大多数中小型实时应用;若需支撑更高并发,可在此基础上优化连接匹配逻辑,或引入 Redis 实现更精细化的连接管理。


发布评论

发布评论前请先 登录

评论列表 0

暂无评论