From 40b58ebdd53f5041f0e5a9d4fb26063e2da1a1b7 Mon Sep 17 00:00:00 2001 From: zoujing Date: Fri, 8 Aug 2025 11:07:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AF=B9=E8=AF=9D=E8=81=8A=E5=88=B0?= =?UTF-8?q?=E7=9A=84=E8=AF=B7=E6=B1=82=E4=BC=98=E5=8C=96=E4=B8=8E=E8=A7=A3?= =?UTF-8?q?=E6=9E=90sse=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- request/api/AgentChatStream.js | 365 ++++++++++++++++++--------------- 1 file changed, 199 insertions(+), 166 deletions(-) diff --git a/request/api/AgentChatStream.js b/request/api/AgentChatStream.js index 5f5abbf..1e408f9 100644 --- a/request/api/AgentChatStream.js +++ b/request/api/AgentChatStream.js @@ -12,168 +12,187 @@ const API = '/agent/assistant/chat'; let requestTask = null; let isAborted = false; // 添加终止状态标志 let currentPromiseReject = null; // 保存当前Promise的reject函数 -let requestId = 0; // 请求ID,用于区分不同的请求 +let currentMessageId = null; // 当前请求的messageId +/** + * 终止的请求 + */ const stopAbortTask = () => { - console.log('🛑 开始强制终止请求...'); - isAborted = true; // 立即设置终止标志 - - // 立即拒绝当前Promise(最强制的终止) - if (currentPromiseReject) { - console.log('🛑 立即拒绝Promise'); - currentPromiseReject(new Error('请求已被用户终止')); - currentPromiseReject = null; - } + console.log("🛑 开始强制终止请求... "); + isAborted = true; // 立即设置终止标志 - if (requestTask) { - // 先取消所有监听器(关键:必须在abort之前) - try { - if (requestTask.offChunkReceived) { - requestTask.offChunkReceived(); - console.log('🛑 已取消 ChunkReceived 监听'); - } - } catch (e) { - console.log('🛑 取消 ChunkReceived 监听失败:', e); - } - - try { - if (requestTask.offHeadersReceived) { - requestTask.offHeadersReceived(); - console.log('🛑 已取消 HeadersReceived 监听'); - } - } catch (e) { - console.log('🛑 取消 HeadersReceived 监听失败:', e); - } + // 立即拒绝当前Promise(最强制的终止) + if (currentPromiseReject) { + currentPromiseReject(new Error('请求已被用户终止')); + currentPromiseReject = null; + } - // 然后终止网络请求 - try { - if (requestTask.abort) { - requestTask.abort(); - console.log('🛑 已终止网络请求'); - } - } catch (e) { - console.log('🛑 终止网络请求失败:', e); - } + if (requestTask) { + // 先取消所有监听器 + try { + if (requestTask.offChunkReceived) { + requestTask.offChunkReceived(); + } + } catch (e) { + console.log('🛑 取消 ChunkReceived 监听失败:', e); + } - requestTask = null; - } - - // 递增请求ID,使旧请求的数据无效 - requestId++; - console.log('🛑 请求强制终止完成,新请求ID:', requestId); + try { + if (requestTask.offHeadersReceived) { + requestTask.offHeadersReceived(); + } + } catch (e) { + console.log('🛑 取消 HeadersReceived 监听失败:', e); + } + + // 然后终止网络请求 + try { + if (requestTask.abort) { + requestTask.abort(); + } + } catch (e) { + console.log('🛑 终止网络请求失败:', e); + } + + requestTask = null; + } + + currentMessageId = null; + console.log('🛑 请求强制终止完成'); } const agentChatStream = (params, onChunk) => { - const promise = new Promise((resolve, reject) => { - const token = uni.getStorageSync('token'); - let hasError = false; - isAborted = false; // 重置终止状态 - - // 保存当前Promise的reject函数,用于强制终止 - currentPromiseReject = reject; - - // 为当前请求分配ID - const currentRequestId = ++requestId; - console.log("🚀 发送请求内容: ", params, "请求ID:", currentRequestId) - // #ifdef MP-WEIXIN - requestTask = uni.request({ - url: BASE_URL + API, // 替换为你的接口地址 - method: 'POST', - data: params, - enableChunked: true, - header: { - Accept: 'text/event-stream', - 'Content-Type': 'application/json', - Authorization: `Bearer ${token}`, // 如需token可加 - }, - responseType: 'arraybuffer', - success(res) { - if (!isAborted && requestId === currentRequestId) { - console.log("✅ 请求成功,ID:", currentRequestId); - resolve(res.data); - } else { - console.log("❌ 请求已过期或终止,忽略success回调,当前ID:", requestId, "请求ID:", currentRequestId); - } - }, - fail(err) { - if (!isAborted && requestId === currentRequestId) { - console.log("❌ 请求失败,ID:", currentRequestId, "错误:", JSON.stringify(err)); - reject(err); - } else { - console.log("❌ 请求已过期或终止,忽略fail回调,当前ID:", requestId, "请求ID:", currentRequestId); - } - }, - complete(res) { - if (!isAborted && requestId === currentRequestId && res.statusCode !== 200) { - console.log("❌ 请求完成但状态错误,ID:", currentRequestId, "状态:", res.statusCode); - if (onChunk) { - onChunk({ error: true, message: '服务器错误', detail: res }); - } - reject(res); - } else if (requestId !== currentRequestId) { - console.log("❌ 请求已过期或终止,忽略complete回调,当前ID:", requestId, "请求ID:", currentRequestId); - } - } - }); + const promise = new Promise((resolve, reject) => { + const token = uni.getStorageSync('token'); + let hasError = false; + isAborted = false; // 重置终止状态 - requestTask.onHeadersReceived(res => { - // 检查请求是否已终止或过期 - if (isAborted || requestId !== currentRequestId) { - console.log('🚫 Headers已终止或过期,忽略,当前ID:', requestId, '请求ID:', currentRequestId); - return; - } + // 保存当前Promise的reject函数,用于强制终止 + currentPromiseReject = reject; - console.log('📡 onHeadersReceived,ID:', currentRequestId, res); - const status = res.statusCode || (res.header && res.header.statusCode); - if (status && status !== 200) { - hasError = true; - if (onChunk && !isAborted && requestId === currentRequestId) { - onChunk({ error: true, message: `服务器错误(${status})`, detail: res }); - } - if (requestTask && requestTask.abort) { - requestTask.abort(); - } - } - }); + // 设置当前请求的messageId,用于区分不同请求 + const messageId = params.messageId; + currentMessageId = messageId; // 记录当前请求的messageId + console.log("🚀 发送请求内容: ", params) - requestTask.onChunkReceived(res => { - // 第一道防线:立即检查请求ID和终止状态 - if (isAborted || hasError || requestTask === null || requestId !== currentRequestId) { - console.log('🚫 数据块已终止或过期,忽略 - 第一道检查,当前ID:', requestId, '请求ID:', currentRequestId); - return; - } - - console.log("📦 onChunkReceived,ID:", currentRequestId, res) - const base64 = uni.arrayBufferToBase64(res.data); - let data = ''; - try { - data = decodeURIComponent(escape(weAtob(base64))); - } catch (e) { - // 某些平台可能不支持 atob,可以直接用 base64 - data = base64; - } - - // 第二道防线:解析前再次检查 - if (isAborted || hasError || requestTask === null || requestId !== currentRequestId) { - console.log('🚫 数据块已终止或过期,忽略 - 第二道检查,当前ID:', requestId, '请求ID:', currentRequestId); - return; - } - - const messages = parseSSEChunk(data); - messages.forEach(msg => { - // 第三道防线:每个消息处理前都检查 - if (onChunk && !isAborted && !hasError && requestTask !== null && requestId === currentRequestId) { - console.log(`parseSSEChunk ${currentRequestId}:`, msg) - onChunk(msg); - } else { - console.log('🚫 消息已终止或过期,忽略处理,当前ID:', requestId, '请求ID:', currentRequestId); - } - }); - }); - // #endif + // #ifdef MP-WEIXIN + requestTask = uni.request({ + url: BASE_URL + API, // 替换为你的接口地址 + method: 'POST', + data: params, + enableChunked: true, + header: { + Accept: 'text/event-stream', + 'Content-Type': 'application/json', + Authorization: `Bearer ${token}`, // 如需token可加 + }, + responseType: 'arraybuffer', + success(res) { + if (!isAborted && currentMessageId === messageId) { + console.log("✅ 请求成功"); + resolve(res.data); + } else if (currentMessageId !== messageId) { + console.log("❌ 请求已过期(messageId不匹配),忽略success回调"); + } else { + console.log("❌ 请求已终止,忽略success回调"); + } + }, + fail(err) { + if (!isAborted && currentMessageId === messageId) { + console.log("❌ 请求失败,错误:", JSON.stringify(err)); + reject(err); + } else if (currentMessageId !== messageId) { + console.log("❌ 请求已过期(messageId不匹配),忽略fail回调"); + } else { + console.log("❌ 请求已终止,忽略fail回调"); + } + }, + complete(res) { + if (!isAborted && currentMessageId === messageId && res.statusCode !== 200) { + console.log("❌ 请求完成但状态错误,状态:", res.statusCode); + if (onChunk) { + onChunk({ error: true, message: '服务器错误', detail: res }); + } + reject(res); + } else if (currentMessageId !== messageId) { + console.log("❌ 请求已过期(messageId不匹配),忽略complete回调"); + } else if (isAborted) { + console.log("❌ 请求已终止,忽略complete回调"); + } + } }); - return promise + requestTask.onHeadersReceived(res => { + // 检查请求是否已终止或过期(messageId不匹配) + if (isAborted || currentMessageId !== messageId) { + if (currentMessageId !== messageId) { + console.log('🚫 Headers已过期(messageId不匹配),忽略'); + } else { + console.log('🚫 Headers已终止,忽略'); + } + return; + } + + console.log('📡 onHeadersReceived,res:', res); + const status = res.statusCode || (res.header && res.header.statusCode); + if (status && status !== 200) { + hasError = true; + if (onChunk && !isAborted && currentMessageId === messageId) { + onChunk({ error: true, message: `服务器错误(${status})`, detail: res }); + } + if (requestTask && requestTask.abort) { + requestTask.abort(); + } + } + }); + + requestTask.onChunkReceived(res => { + // 第一道防线:立即检查终止状态和messageId(防止处理过期请求) + if (isAborted || hasError || requestTask === null || currentMessageId !== messageId) { + if (currentMessageId !== messageId) { + console.log('🚫 数据块已过期(messageId不匹配),忽略 - 第一道检查'); + } else { + console.log('🚫 数据块已终止或无效,忽略 - 第一道检查'); + } + return; + } + + const base64 = uni.arrayBufferToBase64(res.data); + let data = ''; + try { + data = decodeURIComponent(escape(weAtob(base64))); + } catch (e) { + // 某些平台可能不支持 atob,可以直接用 base64 + data = base64; + } + console.log("📦 onChunkReceived,res:", data) + + // 第二道防线:解析前再次检查 + if (isAborted || hasError || requestTask === null || currentMessageId !== messageId) { + if (currentMessageId !== messageId) { + console.log('🚫 数据块已过期(messageId不匹配),忽略 - 第二道检查'); + } else { + console.log('🚫 数据块已终止或无效,忽略 - 第二道检查'); + } + return; + } + + const messages = parseSSEChunk(data); + messages.forEach(msg => { + // 第三道防线:每个消息处理前都检查(确保只处理当前请求的消息) + if (onChunk && !isAborted && !hasError && requestTask !== null && currentMessageId === messageId) { + onChunk(msg); + } else if (currentMessageId !== messageId) { + console.log('🚫 消息已过期(messageId不匹配),忽略处理'); + } else { + console.log('🚫 消息已终止或无效,忽略处理'); + } + }); + }); + // #endif + }); + + return promise } // window.atob兼容性处理 @@ -226,23 +245,37 @@ const weAtob = (string) => { // 解析SSE分段数据 -function parseSSEChunk(raw) { - // 拆分为多段 - const lines = raw.split('\n\n'); - const results = []; - lines.forEach(line => { - // 只处理包含 data: 的行 - const dataMatch = line.match(/data:(\{.*\})/); - if (dataMatch && dataMatch[1]) { - try { - const obj = JSON.parse(dataMatch[1]); - results.push(obj); - } catch (e) { - // 解析失败忽略 - } - } - }); - return results; +const parseSSEChunk = (raw) => { + const results = []; + + // 按一个或多个连续换行分段,表示每一个事件块 + const chunks = raw.split(/\n\n+/); + + for (const chunk of chunks) { + const lines = chunk.split(/\r?\n/); + let dataLines = []; + + for (const line of lines) { + if (line.startsWith('data:')) { + // 提取data:后面的内容并去除首尾空格 + dataLines.push(line.slice(5).trim()); + } + } + + if (dataLines.length > 0) { + // 拼接多行数据 + const fullData = dataLines.join('\n'); + try { + const obj = JSON.parse(fullData); + results.push(obj); + } catch (e) { + console.warn('⚠️ SSE数据解析失败:', e, '原始数据:', fullData); + // 解析失败忽略 + } + } + } + + return results; } export { agentChatStream, stopAbortTask }