小程序对接文心一言:基于 Socket 实现流式消息实时推送
在小程序开发中,对接大模型的流式接口时, wx.request 对数据流(chunk)的处理能力有限,无法高效接收分段返回的内容。本文将详细讲解如何基于 Egg.js 框架,通过 Socket 通信实现文心一言(ERNIE)流式聊天接口的对接,解决小程序端流式消息接收的痛点问题。

一、需求背景与技术选型
1.1 核心痛点
文心一言的流式接口会将回答内容分段返回(chunk),而小程序的 wx.request 没有原生的流式处理能力,无法实时接收每一段返回的内容,只能等待接口完全返回后才能获取完整数据,导致用户体验差(等待时间长、无实时反馈)。
1.2 解决方案
- 后端:使用 Egg.js 作为服务端框架,通过
request库调用文心一言流式接口,解析分段返回的 chunk 数据; - 通信方式:采用 Socket 通信,后端解析到每一段 chunk 后,立即通过 Socket 推送给小程序前端;
- 缓存优化:对文心一言的
access_token进行缓存,避免重复请求获取 token,提升接口性能。
二、代码模块拆解与详细说明
2.1 服务层(Service):获取并缓存文心一言 Access Token
服务层主要负责获取文心一言接口的 access_token ,并通过缓存减少重复请求,是对接文心一言的基础。
// service/yiyan.js
'use strict';
const Service = require('egg').Service;
const request = require('request');
module.exports = class extends Service {
// 核心方法:获取文心一言 access_token
async getToken() {
// 1. 定义缓存 key,用于标识 token 缓存
const key = 'yiyanToken';
// 2. 先从缓存中获取 token,避免重复请求
let yiyanToken = await this.ctx.service.memcache.get(key);
// 3. 缓存中无 token 时,重新请求获取
if (typeof yiyanToken === 'undefined') {
// 文心一言控制台获取的客户端 ID 和密钥(需替换为实际值)
const client_id = 'client_id_code';
const client_secret = 'client_secret_code';
// token 请求地址
const url = `https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=${client_id}&client_secret=${client_secret}`;
// 4. 同步请求获取 token 响应
const res = await this.requestSync({ method: 'POST', url });
// 5. 解析响应内容,提取 access_token
const match = res.match(/"access_token":"(.+?)",/);
if (match) yiyanToken = match[1];
// 6. 将 token 存入缓存,有效期 5 天(token 实际有效期为 30 天,此处留余量)
await this.ctx.service.memcache.set(key, yiyanToken, 5 * 24 * 60 * 60);
}
// 7. 返回 token
return yiyanToken;
}
// 辅助方法:封装同步请求(适配 request 异步特性)
requestSync(params) {
return new Promise(r => {
request(params).on('data', data => {
r(data.toString());
});
});
}
};关键说明 :
getToken方法是核心:先查缓存,缓存失效时再调用百度 OAuth 接口获取 token;requestSync方法封装了request库的异步请求,转为 Promise 风格,适配 Egg.js 的异步编程模式;- 缓存有效期设置为 5 天,既避免频繁请求 token,又能防止 token 过期(百度 token 有效期为 30 天)。
2.2 控制器层(Controller):处理聊天请求与流式数据推送
控制器层是核心业务逻辑层,负责接收小程序前端的聊天请求、调用文心一言流式接口、解析 chunk 数据,并通过 Socket 推送给前端。
2.2.1 前置准备:参数处理与数据清洗
// controller/yiyan.js
/* eslint-disable prefer-const */
'use strict';
const Controller = require('egg').Controller;
const request = require('request');
module.exports = class extends Controller {
async index() {
// 1. 获取文心一言 access_token(调用上文的 service 方法)
const access_token = await this.ctx.service.yiyan.getToken();
// 2. 获取前端传入的请求体(包含聊天消息和 socketId)
const body = this.ctx.request.body;
// 解析消息内容(前端传入的 message 为 JSON 字符串,需转为对象)
body.message = JSON.parse(body.message);
// 3. 过滤空内容/无效回复(如“会话已中止”“终止生成”)
for (let i = body.message.length - 1; i >= 0; i--) {
if (
body.message[i] &&
body.message[i].role === 'assistant' && // 仅过滤 AI 回复的无效内容
(body.message[i].content === '' ||
body.message[i].content === '<p>会话已中止</p>' ||
body.message[i].content === '终止生成')
) {
body.message.splice(i, 1); // 删除无效 AI 回复
body.message.splice(i - 1, 1); // 连带删除对应的用户提问
}
}
// 4. 去重连续相同角色的消息(避免连续用户/AI 消息)
let role = '';
for (let i = body.message.length - 1; i >= 0; i--) {
if (role === body.message[i].role) {
body.message.splice(i, 1);
}
role = body.message[i].role;
}
// 5. 限制会话最大消息数(避免上下文过长)
let max_chat_number = Number(await this.ctx.service.option.getOption('max_chat_number')) || 20;
if (max_chat_number % 2 === 0) max_chat_number += 1; // 保证奇数(用户+AI 成对)
if (body.message.length > max_chat_number) {
// 超出限制时,删除最早的消息,保留最新的 max_chat_number 条
body.message.splice(0, body.message.length - max_chat_number);
}关键说明 :
- 数据清洗是必要步骤:过滤无效内容可避免文心一言接口处理无意义的上下文;
- 限制会话消息数:防止上下文过长导致接口调用失败,同时优化接口响应速度;
- 去重连续相同角色消息:符合文心一言接口的消息格式要求(用户和 AI 消息需交替)。
2.2.2 核心逻辑:调用文心一言流式接口并解析 chunk
// 6. 定义文心一言不同模型的接口地址
const yiyanModuleUrl = {
'ERNIE-Bot': 'https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions',
'ERNIE-4.0': 'https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions_pro',
'ERNIE-Bot-turbo': 'https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant',
};
// 选择使用的模型(可根据需求切换)
const yiyanModule = 'ERNIE-Bot';
// 拼接完整的接口地址(包含 access_token)
const url = `${yiyanModuleUrl[yiyanModule]}?access_token=${access_token}`;
// 用于标记已推送的 chunk 索引,避免重复推送
let sendIndex = 0;
// 存储分段接收的数据流
let chunk = '';
// 7. 调用文心一言流式接口
request
.post(url, {
json: {
messages: body.message, // 清洗后的聊天上下文
stream: true, // 开启流式返回
temperature: 0.5, // 回答随机性(0-1,值越小越稳定)
enable_citation: true, // 开启引用标注
},
})
// 监听响应事件:处理流式返回的 chunk 数据
.on('response', response => {
response.on('data', async data => {
// 拼接分段数据
chunk += data.toString();
let chunk2 = chunk.trim();
// 过滤不完整的 chunk(以 }} 结尾才是完整的分段)
if (!/\}\}$/.test(chunk2)) return;
// 拆分多个 chunk 数据(按 data: 分隔)
const chunkArray = chunk2.split('data:');
// 遍历 chunk 数组,推送未发送的分段
for (let i = 0; i < chunkArray.length; i++) {
if (chunkArray.length - 1 < sendIndex) continue;
if (chunkArray[sendIndex]) {
// 通过 Socket 推送当前 chunk 到前端(指定 socketId)
this.app.messenger.sendToApp('socketSend', {
socket_id: this.ctx.auth.socket_id_wxsp, // 前端传入的 socketId
message: {
code: 0,
type: 'chat', // 消息类型:聊天内容
message: '',
data: chunkArray[sendIndex].trim(), // 当前分段的聊天内容
},
});
}
sendIndex++; // 标记已推送的索引
}
});
})关键说明 :
stream: true是开启流式返回的核心参数,文心一言接口会分段返回回答内容;- 数据拼接与校验:通过
chunk += data.toString()拼接分段数据,通过/\}\}$/正则校验完整的 chunk,避免处理不完整数据; - Socket 推送:通过
this.app.messenger.sendToApp将每一段 chunk 推送给对应 socketId 的前端,实现“边生成边返回”的效果。
2.2.3 收尾逻辑:会话记录与异常处理
// 8. 监听流式接口结束事件:保存完整会话记录
.on('end', async () => {
const result = chunk.split('data:');
let rtText = '';
// 拼接所有 chunk,得到完整的 AI 回复
for (let i = 0; i < result.length; i++) {
if (!result[i]) continue;
let line = result[i].trim();
// 过滤非 JSON 格式的内容
if (!this.ctx.helper.isJSON(line)) continue;
line = JSON.parse(line);
if (!line.result) continue;
rtText += line.result;
}
// 保存会话记录到数据库
if (rtText) {
await this.ctx.service.yiyanChat.insert({
user_id: this.ctx.auth.id, // 当前用户 ID
message: JSON.stringify(body.message), // 聊天上下文
response: rtText.trim(), // 完整的 AI 回复
chunk, // 原始 chunk 数据(便于排查问题)
});
}
})
// 9. 监听接口错误事件:保存错误记录
.on('error', async err => {
await this.ctx.service.yiyanChat.insert({
user_id: this.ctx.auth.id,
message: JSON.stringify(body.message),
response: '',
error_info: typeof err === 'object' ? err.message : JSON.stringify(err), // 错误信息
chunk,
});
});
// 10. 返回前端基础响应(流式内容通过 Socket 推送,此处仅告知请求已受理)
this.ctx.body = {
success: true,
data: '',
message: '',
code: 0,
};
}
};关键说明 :
- 流式接口结束后,拼接所有 chunk 得到完整回复,并存入数据库,便于后续查看会话记录;
- 错误监听:捕获接口调用异常,保存错误信息,便于问题排查;
- 控制器最终返回的
ctx.body仅作为“请求受理”的响应,实际聊天内容通过 Socket 实时推送。
2.3 Socket 通信补充说明
代码中通过 this.app.messenger.sendToApp('socketSend', {...}) 推送消息,需配合 Egg.js 的 Socket.IO 配置使用(前端小程序需连接对应的 Socket 服务):
- 小程序端连接 Socket 后,会收到后端返回的
socketId; - 小程序发送聊天请求时,需携带该
socketId; - 后端通过
socketId精准推送当前用户的聊天 chunk 数据; - 小程序端监听 Socket 消息,接收每一段 chunk 并实时渲染到页面。
三、小程序前端适配要点
- 连接 Socket:使用
wx.connectSocket连接后端 Socket 服务,监听onMessage事件接收 chunk 数据; - 消息拼接:收到每一段
chat类型的消息后,解析data字段,拼接成完整的回答内容; - 异常处理:监听 Socket 断开事件,必要时重新连接;
- 传参规范:调用
/v1/yiyan接口时,需将message(聊天上下文)转为 JSON 字符串,同时携带socketId。
总结
- 核心思路:通过 Egg.js 后端中转文心一言流式接口,利用 Socket 解决小程序
wx.request无法处理流式数据的问题; - 关键优化:对
access_token缓存、聊天上下文清洗(过滤无效内容、限制长度),提升接口稳定性和性能; - 核心逻辑:解析文心一言返回的 chunk 数据,通过 Socket 实时推送给小程序前端,实现“边生成边显示”的聊天体验。
该方案既解决了小程序流式数据处理的痛点,又通过数据清洗、缓存等手段保证了接口的稳定性,可直接适配到各类小程序 AI 聊天场景中。
发布评论
发布评论前请先 登录。
评论列表 0

暂无评论



