小程序对接文心一言:基于 Socket 实现流式消息实时推送

2026-01-30 43 浏览 0 评论

在小程序开发中,对接大模型的流式接口时, wx.request 对数据流(chunk)的处理能力有限,无法高效接收分段返回的内容。本文将详细讲解如何基于 Egg.js 框架,通过 Socket 通信实现文心一言(ERNIE)流式聊天接口的对接,解决小程序端流式消息接收的痛点问题。

一、需求背景与技术选型

1.1 核心痛点

文心一言的流式接口会将回答内容分段返回(chunk),而小程序的 wx.request 没有原生的流式处理能力,无法实时接收每一段返回的内容,只能等待接口完全返回后才能获取完整数据,导致用户体验差(等待时间长、无实时反馈)。

1.2 解决方案

  • 后端:使用 Egg.js 作为服务端框架,通过 request 库调用文心一言流式接口,解析分段返回的 chunk 数据;
  • 通信方式:采用 Socket 通信,后端解析到每一段 chunk 后,立即通过 Socket 推送给小程序前端;
  • 缓存优化:对文心一言的 access_token 进行缓存,避免重复请求获取 token,提升接口性能。

二、代码模块拆解与详细说明

2.1 服务层(Service):获取并缓存文心一言 Access Token

服务层主要负责获取文心一言接口的 access_token ,并通过缓存减少重复请求,是对接文心一言的基础。

// service/yiyan.js
'use strict';

const Service = require('egg').Service;
const request = require('request');

module.exports = class extends Service {
  // 核心方法:获取文心一言 access_token
  async getToken() {
    // 1. 定义缓存 key,用于标识 token 缓存
    const key = 'yiyanToken';
    // 2. 先从缓存中获取 token,避免重复请求
    let yiyanToken = await this.ctx.service.memcache.get(key);

    // 3. 缓存中无 token 时,重新请求获取
    if (typeof yiyanToken === 'undefined') {
      // 文心一言控制台获取的客户端 ID 和密钥(需替换为实际值)
      const client_id = 'client_id_code';
      const client_secret = 'client_secret_code';
      // token 请求地址
      const url = `https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=${client_id}&client_secret=${client_secret}`;

      // 4. 同步请求获取 token 响应
      const res = await this.requestSync({ method: 'POST', url });
      // 5. 解析响应内容,提取 access_token
      const match = res.match(/"access_token":"(.+?)",/);
      if (match) yiyanToken = match[1];

      // 6. 将 token 存入缓存,有效期 5 天(token 实际有效期为 30 天,此处留余量)
      await this.ctx.service.memcache.set(key, yiyanToken, 5 * 24 * 60 * 60);
    }
    // 7. 返回 token
    return yiyanToken;
  }

  // 辅助方法:封装同步请求(适配 request 异步特性)
  requestSync(params) {
    return new Promise(r => {
      request(params).on('data', data => {
        r(data.toString());
      });
    });
  }
};

关键说明

  • getToken 方法是核心:先查缓存,缓存失效时再调用百度 OAuth 接口获取 token;
  • requestSync 方法封装了 request 库的异步请求,转为 Promise 风格,适配 Egg.js 的异步编程模式;
  • 缓存有效期设置为 5 天,既避免频繁请求 token,又能防止 token 过期(百度 token 有效期为 30 天)。

2.2 控制器层(Controller):处理聊天请求与流式数据推送

控制器层是核心业务逻辑层,负责接收小程序前端的聊天请求、调用文心一言流式接口、解析 chunk 数据,并通过 Socket 推送给前端。

2.2.1 前置准备:参数处理与数据清洗

// controller/yiyan.js
/* eslint-disable prefer-const */
'use strict';

const Controller = require('egg').Controller;
const request = require('request');

module.exports = class extends Controller {
  async index() {
    // 1. 获取文心一言 access_token(调用上文的 service 方法)
    const access_token = await this.ctx.service.yiyan.getToken();

    // 2. 获取前端传入的请求体(包含聊天消息和 socketId)
    const body = this.ctx.request.body;
    // 解析消息内容(前端传入的 message 为 JSON 字符串,需转为对象)
    body.message = JSON.parse(body.message);

    // 3. 过滤空内容/无效回复(如“会话已中止”“终止生成”)
    for (let i = body.message.length - 1; i >= 0; i--) {
      if (
        body.message[i] &&
        body.message[i].role === 'assistant' && // 仅过滤 AI 回复的无效内容
        (body.message[i].content === '' ||
          body.message[i].content === '<p>会话已中止</p>' ||
          body.message[i].content === '终止生成')
      ) {
        body.message.splice(i, 1); // 删除无效 AI 回复
        body.message.splice(i - 1, 1); // 连带删除对应的用户提问
      }
    }

    // 4. 去重连续相同角色的消息(避免连续用户/AI 消息)
    let role = '';
    for (let i = body.message.length - 1; i >= 0; i--) {
      if (role === body.message[i].role) {
        body.message.splice(i, 1);
      }
      role = body.message[i].role;
    }

    // 5. 限制会话最大消息数(避免上下文过长)
    let max_chat_number = Number(await this.ctx.service.option.getOption('max_chat_number')) || 20;
    if (max_chat_number % 2 === 0) max_chat_number += 1; // 保证奇数(用户+AI 成对)
    if (body.message.length > max_chat_number) {
      // 超出限制时,删除最早的消息,保留最新的 max_chat_number 条
      body.message.splice(0, body.message.length - max_chat_number);
    }

关键说明

  • 数据清洗是必要步骤:过滤无效内容可避免文心一言接口处理无意义的上下文;
  • 限制会话消息数:防止上下文过长导致接口调用失败,同时优化接口响应速度;
  • 去重连续相同角色消息:符合文心一言接口的消息格式要求(用户和 AI 消息需交替)。

2.2.2 核心逻辑:调用文心一言流式接口并解析 chunk

    // 6. 定义文心一言不同模型的接口地址
    const yiyanModuleUrl = {
      'ERNIE-Bot': 'https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions',
      'ERNIE-4.0': 'https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions_pro',
      'ERNIE-Bot-turbo': 'https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant',
    };
    // 选择使用的模型(可根据需求切换)
    const yiyanModule = 'ERNIE-Bot';
    // 拼接完整的接口地址(包含 access_token)
    const url = `${yiyanModuleUrl[yiyanModule]}?access_token=${access_token}`;
    // 用于标记已推送的 chunk 索引,避免重复推送
    let sendIndex = 0;
    // 存储分段接收的数据流
    let chunk = '';

    // 7. 调用文心一言流式接口
    request
      .post(url, {
        json: {
          messages: body.message, // 清洗后的聊天上下文
          stream: true, // 开启流式返回
          temperature: 0.5, // 回答随机性(0-1,值越小越稳定)
          enable_citation: true, // 开启引用标注
        },
      })
      // 监听响应事件:处理流式返回的 chunk 数据
      .on('response', response => {
        response.on('data', async data => {
          // 拼接分段数据
          chunk += data.toString();
          let chunk2 = chunk.trim();

          // 过滤不完整的 chunk(以 }} 结尾才是完整的分段)
          if (!/\}\}$/.test(chunk2)) return;
          // 拆分多个 chunk 数据(按 data: 分隔)
          const chunkArray = chunk2.split('data:');

          // 遍历 chunk 数组,推送未发送的分段
          for (let i = 0; i < chunkArray.length; i++) {
            if (chunkArray.length - 1 < sendIndex) continue;
            if (chunkArray[sendIndex]) {
              // 通过 Socket 推送当前 chunk 到前端(指定 socketId)
              this.app.messenger.sendToApp('socketSend', {
                socket_id: this.ctx.auth.socket_id_wxsp, // 前端传入的 socketId
                message: {
                  code: 0,
                  type: 'chat', // 消息类型:聊天内容
                  message: '',
                  data: chunkArray[sendIndex].trim(), // 当前分段的聊天内容
                },
              });
            }
            sendIndex++; // 标记已推送的索引
          }
        });
      })

关键说明

  • stream: true 是开启流式返回的核心参数,文心一言接口会分段返回回答内容;
  • 数据拼接与校验:通过 chunk += data.toString() 拼接分段数据,通过 /\}\}$/ 正则校验完整的 chunk,避免处理不完整数据;
  • Socket 推送:通过 this.app.messenger.sendToApp 将每一段 chunk 推送给对应 socketId 的前端,实现“边生成边返回”的效果。

2.2.3 收尾逻辑:会话记录与异常处理

      // 8. 监听流式接口结束事件:保存完整会话记录
      .on('end', async () => {
        const result = chunk.split('data:');
        let rtText = '';
        // 拼接所有 chunk,得到完整的 AI 回复
        for (let i = 0; i < result.length; i++) {
          if (!result[i]) continue;
          let line = result[i].trim();
          // 过滤非 JSON 格式的内容
          if (!this.ctx.helper.isJSON(line)) continue;
          line = JSON.parse(line);
          if (!line.result) continue;
          rtText += line.result;
        }

        // 保存会话记录到数据库
        if (rtText) {
          await this.ctx.service.yiyanChat.insert({
            user_id: this.ctx.auth.id, // 当前用户 ID
            message: JSON.stringify(body.message), // 聊天上下文
            response: rtText.trim(), // 完整的 AI 回复
            chunk, // 原始 chunk 数据(便于排查问题)
          });
        }
      })
      // 9. 监听接口错误事件:保存错误记录
      .on('error', async err => {
        await this.ctx.service.yiyanChat.insert({
          user_id: this.ctx.auth.id,
          message: JSON.stringify(body.message),
          response: '',
          error_info: typeof err === 'object' ? err.message : JSON.stringify(err), // 错误信息
          chunk,
        });
      });

    // 10. 返回前端基础响应(流式内容通过 Socket 推送,此处仅告知请求已受理)
    this.ctx.body = {
      success: true,
      data: '',
      message: '',
      code: 0,
    };
  }
};

关键说明

  • 流式接口结束后,拼接所有 chunk 得到完整回复,并存入数据库,便于后续查看会话记录;
  • 错误监听:捕获接口调用异常,保存错误信息,便于问题排查;
  • 控制器最终返回的 ctx.body 仅作为“请求受理”的响应,实际聊天内容通过 Socket 实时推送。

2.3 Socket 通信补充说明

代码中通过 this.app.messenger.sendToApp('socketSend', {...}) 推送消息,需配合 Egg.js 的 Socket.IO 配置使用(前端小程序需连接对应的 Socket 服务):

  1. 小程序端连接 Socket 后,会收到后端返回的 socketId
  2. 小程序发送聊天请求时,需携带该 socketId
  3. 后端通过 socketId 精准推送当前用户的聊天 chunk 数据;
  4. 小程序端监听 Socket 消息,接收每一段 chunk 并实时渲染到页面。

三、小程序前端适配要点

  1. 连接 Socket:使用 wx.connectSocket 连接后端 Socket 服务,监听 onMessage 事件接收 chunk 数据;
  2. 消息拼接:收到每一段 chat 类型的消息后,解析 data 字段,拼接成完整的回答内容;
  3. 异常处理:监听 Socket 断开事件,必要时重新连接;
  4. 传参规范:调用 /v1/yiyan 接口时,需将 message (聊天上下文)转为 JSON 字符串,同时携带 socketId

总结

  1. 核心思路:通过 Egg.js 后端中转文心一言流式接口,利用 Socket 解决小程序 wx.request 无法处理流式数据的问题;
  2. 关键优化:对 access_token 缓存、聊天上下文清洗(过滤无效内容、限制长度),提升接口稳定性和性能;
  3. 核心逻辑:解析文心一言返回的 chunk 数据,通过 Socket 实时推送给小程序前端,实现“边生成边显示”的聊天体验。

该方案既解决了小程序流式数据处理的痛点,又通过数据清洗、缓存等手段保证了接口的稳定性,可直接适配到各类小程序 AI 聊天场景中。


发布评论

发布评论前请先 登录

评论列表 0

暂无评论