Egg.js 正式环境多进程下 Socket.io 收不到消息问题解决方案
在 Egg.js 项目中使用 Socket.io 实现实时消息推送时,常会遇到一个 环境差异化 问题: 本地开发环境(npm run dev)消息收发正常,但部署到正式环境(npm run start)后,频繁出现收不到消息的情况 。这一问题的核心根源在于 Egg.js 正式环境的多进程模型与全局变量的隔离特性,本文将详细拆解问题成因,完整呈现从 问题排查 到 方案落地 的全过程,并补充核心技术原理与实操注意事项,帮助开发者彻底解决该问题。
一、问题现象与初始实现方案
1. 问题现象
本地开发时,Socket.io 连接稳定,消息推送、接收正常;部署到正式环境后,用户虽能建立连接,但经常出现 消息丢失 - 部分用户收不到推送消息,或消息推送延迟严重,且问题无固定规律,难以复现和排查。
2. 初始实现方案(存在缺陷)
最初参考 Egg.js 官方文档,通过 全局变量存储连接句柄 的方式实现消息推送,核心逻辑如下: (1)在 Socket 连接中间件(app/io/middleware/connection.js)中,验证用户权限后,将当前连接的 socket 实例存入全局变量 global.userList ; (2)用户离线时,从 global.userList 中删除对应的 socket 实例; (3)在其他 Controller 或 Service 中,直接从 global.userList 中获取目标用户的 socket 实例,调用 emit 方法推送消息。 核心代码实现(连接中间件):
// app/io/middleware/connection.js
module.exports = app => {
return async (ctx, next) => {
// 获取当前连接的 socket 实例
const socket = ctx.socket;
// 1. 用户权限验证(基于 JWT)
try {
// 从查询参数中获取 token 并验证
const auth = ctx.jwt.verify(ctx.query.token, app.config.jwt.secret);
// 根据验证后的用户 ID 获取用户信息,挂载到 ctx 上
ctx.auth = await ctx.service.user.getUserBy('id', auth.id);
} catch (err) {
// 权限验证失败,可直接断开连接
socket.disconnect(true);
return;
}
// 2. 给 socket 实例附加用户 ID,用于后续身份匹配
socket.user_id = ctx.auth.id;
// 3. 将 socket 实例存入全局变量,供其他模块调用
if (!global.userList) {
// 初始化全局连接列表(避免首次调用时为 undefined)
global.userList = [];
}
global.userList.push(socket);
// 执行后续中间件
await next();
// 4. 用户离线:从全局列表中移除对应的 socket 实例
if (global.userList && global.userList.length > 0) {
for (let i = 0; i < global.userList.length; i++) {
if (global.userList[i].user_id === ctx.auth.id) {
global.userList.splice(i, 1);
break;
}
}
}
};
};二、问题根源:Egg.js 多进程模型与全局变量隔离
上述方案在本地开发环境正常,却在正式环境失效,核心原因是 Egg.js 正式环境的多进程模型导致全局变量隔离 ,具体原理如下:
1. Egg.js 环境差异:开发环境 vs 正式环境
Egg.js 提供两种核心启动模式,对应不同的进程模型:
- 开发环境(npm run dev):为简化调试,默认采用 单进程模式 (即使配置了多进程,也会自动优化为单进程运行),全局变量
global在整个应用中唯一,因此global.userList能正常存储所有 socket 连接; - 正式环境(npm run start):为充分利用服务器 CPU 资源,默认采用 Cluster 多进程模式 - 主进程(Master)会根据 CPU 核心数 fork 出多个工作进程(Worker),每个 Worker 进程都是独立的 Node.js 实例,拥有自己的
global变量,进程间的global完全隔离,互不共享。
2. 多进程下的核心矛盾
在正式环境的多进程模型中,每个 Worker 进程都会独立处理一部分 Socket 连接:
- 用户 A 的 socket 连接被分配到 Worker 1 进程,存入 Worker 1 的
global.userList; - 用户 B 的 socket 连接被分配到 Worker 2 进程,存入 Worker 2 的
global.userList; - 当需要给用户 A 推送消息时,若推送逻辑运行在 Worker 2 进程,Worker 2 的
global.userList中没有用户 A 的 socket 实例,就会导致消息推送失败。
这就是正式环境 收不到消息 的本质 - 全局变量的进程隔离性,导致部分 socket 连接无法被推送逻辑找到 。
三、解决方案选型:拒绝集中式存储,基于进程间通信实现全局匹配
排查到问题根源后,首先考虑官方文档的推荐方案:使用 egg-redis 实现 clients/rooms 等信息的全局共享。但该方案需要额外引入 Redis 服务,增加了系统复杂度( 服务越少,Bug 越少 )。结合项目已使用 memcache 作为缓存工具的现状,进一步分析发现: 连接句柄(socket 实例)是进程内的内存对象,无法被序列化存储到 memcache、数据库等外部存储中 ,因此 集中式存储连接句柄 的思路不可行。 最终确定可行方案: 利用 Egg.js 的进程间通信(IPC)机制,结合每个 Worker 进程的本地 socket 列表,实现全局 socket 连接的匹配与消息推送 。核心思路是:
- 放弃全局变量存储,直接使用 Socket.io 内置的连接管理对象
this.app.io.sockets.sockets- 该对象在每个 Worker 进程中存储当前进程处理的所有 socket 连接; - 在连接建立时,给每个 socket 实例附加
user_id标识,用于后续匹配; - 需要推送消息时,通过 Egg.js 的
messenger(进程间通信工具)向所有 Worker 进程发送消息匹配请求; - 每个 Worker 进程收到请求后,在本地的
this.app.io.sockets.sockets中匹配目标user_id的 socket 实例,找到后执行推送逻辑。
四、完整实现步骤
1. 优化连接中间件:移除全局变量,附加 user_id 标识
保留用户权限验证逻辑,删除全局变量 global.userList 的相关操作,仅给 socket 实例附加 user_id (后续用于匹配)。优化后的代码:
// app/io/middleware/connection.js
module.exports = app => {
return async (ctx, next) => {
const socket = ctx.socket;
// 权限验证(失败则断开连接)
try {
const auth = ctx.jwt.verify(ctx.query.token, app.config.jwt.secret);
ctx.auth = await ctx.service.user.getUserBy('id', auth.id);
} catch (err) {
socket.disconnect(true);
return;
}
// 关键:给 socket 附加 user_id,用于后续进程内匹配
socket.user_id = ctx.auth.id;
await next();
// 用户离线时无需额外操作(Socket.io 会自动管理连接状态)
};
};2. 利用进程间通信:发送消息到所有 Worker 进程
当需要推送消息时(如在 Controller 或 Service 中),通过 this.app.messenger.sendToApp() 方法向所有 Worker 进程发送消息。该方法是 Egg.js 封装的进程间通信 API,能确保消息被所有 Worker 进程接收。 示例(在某个 Service 中触发推送):
// app/service/message.js
const Service = require('egg').Service;
module.exports = class extends Service {
// 向指定用户推送消息
async pushToUser(user_id, message) {
// 通过 messenger 向所有 Worker 进程发送推送请求
this.app.messenger.sendToApp('socketSend', {
user_id: user_id, // 目标用户 ID
type: 'privateMessage', // 消息类型(自定义,用于客户端区分)
message: message // 消息内容
});
}
};3. 监听进程间消息:在每个 Worker 进程中执行推送
在 Egg.js 应用启动完成后(app.js 的 serverDidReady 生命周期),监听 messenger 传来的 socketSend 消息,收到消息后调用专门的 Service 执行本地 socket 匹配与推送。 app.js 配置:
// app.js
class AppBootHook {
constructor(app) {
this.app = app;
}
// 应用启动完成后执行(所有 Worker 进程已启动)
async serverDidReady() {
const ctx = this.app.createAnonymousContext(); // 创建匿名上下文(无用户信息,用于调用 Service)
// 监听 messenger 传来的 socketSend 消息
this.app.messenger.on('socketSend', data => {
// 调用 Service 执行推送逻辑
ctx.service.socketHandler.socketSend(data.user_id, data.type, data.message);
});
}
}
module.exports = AppBootHook;4. 实现推送 Service:匹配本地 socket 并发送消息
编写专门的 Service(socketHandler.js),在每个 Worker 进程中遍历本地的 this.app.io.sockets.sockets 集合,根据 user_id 匹配目标 socket 实例,找到后调用 emit 方法推送消息。
/* eslint-disable array-bracket-spacing */
'use strict';
const Service = require('egg').Service;
module.exports = class extends Service {
/**
* 进程内 socket 匹配与消息推送
* @param {number} user_id - 目标用户 ID
* @param {string} type - 消息类型(客户端需对应监听该类型)
* @param {object} message - 消息内容(可序列化对象)
*/
async socketSend(user_id, type, message) {
const { app } = this;
// this.app.io.sockets.sockets 是当前进程的所有 socket 连接集合(键为 socket.id,值为 socket 实例)
const sockets = app.io.sockets.sockets;
// 遍历所有本地 socket 连接,匹配目标 user_id
for (const socketId in sockets) {
const socket = sockets[socketId];
// 注意:user_id 可能为字符串/数字,需统一类型后匹配
if (Number(socket.user_id) === Number(user_id)) {
// 推送消息到目标客户端
socket.emit(type, message);
break; // 一个用户可能只有一个连接,匹配到后直接退出循环
}
}
}
};五、补充优化:在线用户统计
上述方案解决了消息推送问题,但无法直观查看在线用户数量。可基于 进程间通信 + 缓存计数 实现全局在线用户统计,核心逻辑如下:
- 用户连接时:当前 Worker 进程通过
messenger向主进程发送 在线计数 +1 请求,主进程调用 memcache 递增在线用户数; - 用户离线时:当前 Worker 进程通过
messenger向主进程发送 在线计数 -1 请求,主进程调用 memcache 递减在线用户数; - 查看在线数时:直接从 memcache 中获取计数即可。
示例(连接中间件中添加计数逻辑):
// app/io/middleware/connection.js(补充计数逻辑)
module.exports = app => {
return async (ctx, next) => {
const socket = ctx.socket;
// 权限验证(同上,省略重复代码)
try {
const auth = ctx.jwt.verify(ctx.query.token, app.config.jwt.secret);
ctx.auth = await ctx.service.user.getUserBy('id', auth.id);
} catch (err) {
socket.disconnect(true);
return;
}
socket.user_id = ctx.auth.id;
// 1. 用户连接:发送 +1 请求到主进程
app.messenger.sendToApp('onlineCountChange', { action: 'increase' });
await next();
// 2. 用户离线:发送 -1 请求到主进程(next() 之后执行,代表连接已断开)
app.messenger.sendToApp('onlineCountChange', { action: 'decrease' });
};
};app.js 中补充计数监听逻辑:
// app.js(补充在线计数监听)
async serverDidReady() {
const ctx = this.app.createAnonymousContext();
// 初始化在线用户数(若 memcache 中无值则设为 0)
const currentCount = await ctx.service.memcache.get('onlineUserCount') || 0;
await ctx.service.memcache.set('onlineUserCount', currentCount);
// 监听在线计数变更消息
this.app.messenger.on('onlineCountChange', async data => {
let count = await ctx.service.memcache.get('onlineUserCount');
count = Number(count) || 0;
// 根据 action 增减计数(确保计数不小于 0)
if (data.action === 'increase') {
count += 1;
} else if (data.action === 'decrease' && count > 0) {
count -= 1;
}
await ctx.service.memcache.set('onlineUserCount', count);
});
// 原有 socketSend 监听逻辑(同上,省略)
this.app.messenger.on('socketSend', data => {
ctx.service.socketHandler.socketSend(data.user_id, data.type, data.message);
});
}六、核心注意事项
- 进程间通信的消息格式 :通过
messenger发送的消息必须是 可序列化对象 (如 JSON 格式),不能包含函数、socket 实例等不可序列化的内容,否则会导致消息发送失败; - user_id 类型统一 :不同地方存储的
user_id可能存在 字符串/数字 类型差异(如数据库中是数字,socket 附加的是字符串),匹配时需统一类型(如使用Number()转换),避免匹配失败; - Sticky 模式的配合 :若已开启 Egg.js 的 Sticky 模式(解决 Socket.io 400 错误的方案),该方案仍完全适用 - Sticky 模式确保同一用户的连接始终分配到同一 Worker 进程,而本方案解决的是 跨 Worker 进程的消息推送 问题;
- 连接状态的可靠性 :Socket.io 会自动处理连接断开(如网络异常),但仍建议在客户端添加 重连逻辑 ,并在服务端通过
socket.on('disconnect')事件增强离线检测,确保计数准确性; - 高并发场景优化 :若项目并发量极高(万级以上连接),遍历
this.app.io.sockets.sockets可能存在性能损耗,可在服务端维护一个 本地 user_id -> socket.id 的映射对象(替代遍历),进一步提升匹配效率。
七、总结
Egg.js 正式环境多进程下 Socket.io 收不到消息的核心原因,是全局变量的进程隔离性导致连接句柄无法跨进程共享。本文提出的 进程间通信 + 本地连接匹配 方案,无需引入额外服务(如 Redis),充分利用 Egg.js 内置的 messenger 工具和 Socket.io 的连接管理对象,就能实现跨进程的消息推送。 核心步骤可概括为:
- 连接时给 socket 附加 user_id;
- 推送消息时通过 messenger 通知所有 Worker;
- 每个 Worker 遍历本地 socket 集合匹配推送。
该方案兼顾了 轻量性 和 可靠性 ,适用于大多数中小型实时应用;若需支撑更高并发,可在此基础上优化连接匹配逻辑,或引入 Redis 实现更精细化的连接管理。
发布评论
评论列表 0






