feat: 消息与socket的优化调整

This commit is contained in:
2026-01-08 16:26:10 +08:00
parent fb15b8aec1
commit af615f666c
2 changed files with 268 additions and 95 deletions

View File

@@ -151,6 +151,19 @@ let webSocketManager = null;
/// 使用统一的连接状态判断函数,避免状态不同步
const isWsConnected = () => !!(webSocketManager && typeof webSocketManager.isConnected === "function" && webSocketManager.isConnected());
// pendingMap: messageId -> msgIndex
const pendingMap = new Map();
// pendingTimeouts: messageId -> timeoutId
const pendingTimeouts = new Map();
// 超时时间ms
const MESSAGE_TIMEOUT = 30000;
// 防止并发初始化 websocket
let isInitializing = false;
let pendingInitPromise = null;
// sleep helper
const sleep = (ms) => new Promise((res) => setTimeout(res, ms));
// 当前会话的消息ID用于保持发送和终止的messageId一致
let currentSessionMessageId = null;
@@ -355,9 +368,21 @@ const getMainPageData = async () => {
/// =============对话↓================
// 初始化WebSocket
const initWebSocket = async () => {
// 防止并发初始化
if (isInitializing) {
return pendingInitPromise;
}
isInitializing = true;
pendingInitPromise = (async () => {
// 清理旧实例
if (webSocketManager) {
try {
webSocketManager.destroy();
} catch (e) {
console.warn("destroy old webSocketManager failed:", e);
}
webSocketManager = null;
}
// 使用配置的WebSocket服务器地址
@@ -411,29 +436,68 @@ const initWebSocket = async () => {
} catch (error) {
console.error("WebSocket连接失败:", error);
return false;
} finally {
isInitializing = false;
pendingInitPromise = null;
}
})();
return pendingInitPromise;
};
// 处理WebSocket消息
const handleWebSocketMessage = (data) => {
const aiMsgIndex = chatMsgList.value.length - 1;
if (!chatMsgList.value[aiMsgIndex] || aiMsgIndex < 0) {
console.error("处理WebSocket消息时找不到对应的AI消息项");
// 验证关键字段(若服务端传回 conversationId/agentId则校验是否属于当前会话
if (data.conversationId && data.conversationId !== conversationId.value) {
console.warn("收到不属于当前会话的消息,忽略", data.conversationId);
return;
}
if (data.agentId && data.agentId !== agentId.value) {
console.warn("收到不属于当前 agent 的消息,忽略", data.agentId);
return;
}
// 确保消息内容是字符串类型
if (data.content && typeof data.content !== "string") {
try {
data.content = JSON.stringify(data.content);
} catch (e) {
data.content = String(data.content);
}
}
// 直接拼接内容到AI消息
// 优先使用 messageId 进行匹配
const msgId = data.messageId || data.id || data.msgId;
let aiMsgIndex = -1;
if (msgId && pendingMap.has(msgId)) {
aiMsgIndex = pendingMap.get(msgId);
} else if (!msgId && currentSessionMessageId && pendingMap.has(currentSessionMessageId)) {
// 服务端未返回 messageId 的场景:优先使用当前会话的 messageId 映射
aiMsgIndex = pendingMap.get(currentSessionMessageId);
} else {
// 向后搜索最近的 AI 消息
for (let i = chatMsgList.value.length - 1; i >= 0; i--) {
if (chatMsgList.value[i] && chatMsgList.value[i].msgType === MessageRole.AI) {
aiMsgIndex = i;
break;
}
}
if (aiMsgIndex === -1) {
console.error("处理WebSocket消息时找不到对应的AI消息项");
return;
}
}
// 直接拼接内容到对应 AI 消息
if (data.content) {
if (chatMsgList.value[aiMsgIndex].isLoading) {
chatMsgList.value[aiMsgIndex].msg = "";
}
chatMsgList.value[aiMsgIndex].msg += data.content;
// 首次收到内容:替换“加载中”文案并取消 loading 状态(恢复原始渲染逻辑)
chatMsgList.value[aiMsgIndex].msg = data.content;
chatMsgList.value[aiMsgIndex].isLoading = false;
} else {
// 后续流式内容追加
chatMsgList.value[aiMsgIndex].msg += data.content;
}
nextTick(() => scrollToBottom());
}
@@ -458,8 +522,20 @@ const handleWebSocketMessage = (data) => {
chatMsgList.value[aiMsgIndex].question = data.question;
}
// 清理 pendingMap / timeout
const ownedMessageId = chatMsgList.value[aiMsgIndex].messageId || msgId;
if (ownedMessageId) {
if (pendingTimeouts.has(ownedMessageId)) {
clearTimeout(pendingTimeouts.get(ownedMessageId));
pendingTimeouts.delete(ownedMessageId);
}
pendingMap.delete(ownedMessageId);
}
// 重置会话状态
isSessionActive.value = false;
// 清理当前会话的 messageId避免保留陈旧 id
resetMessageState();
}
};
@@ -545,31 +621,76 @@ const sendMessage = async (message, isInstruct = false) => {
console.log("发送的新消息:", JSON.stringify(newMsg));
};
// 通用WebSocket消息发送函数
const sendWebSocketMessage = (messageType, messageContent) => {
// 通用WebSocket消息发送函数 -> 返回 Promise<boolean>
const sendWebSocketMessage = async (messageType, messageContent, options = {}) => {
const args = {
conversationId: conversationId.value,
agentId: agentId.value,
messageType: String(messageType), // 消息类型 0-对话 1-指令 2-中断停止 3-心跳检测
messageContent: messageContent,
messageId: currentSessionMessageId,
messageId: options.messageId || currentSessionMessageId,
};
const maxRetries = typeof options.retries === 'number' ? options.retries : 3;
const baseDelay = typeof options.baseDelay === 'number' ? options.baseDelay : 300; // ms
const maxDelay = typeof options.maxDelay === 'number' ? options.maxDelay : 5000; // ms
for (let attempt = 0; attempt <= maxRetries; attempt++) {
// 确保连接
if (!isWsConnected()) {
if (options.tryReconnect) {
try {
// 直接调用webSocketManager的sendMessage方法利用其内部的消息队列机制
// 即使当前连接断开,消息也会被加入队列,等待连接恢复后发送
const result = webSocketManager.sendMessage(args);
console.log(`WebSocket消息已发送 [类型:${messageType}]:`, args);
return result;
} catch (error) {
console.error("发送WebSocket消息失败:", error);
await initWebSocket();
} catch (e) {
console.error('reconnect failed in sendWebSocketMessage:', e);
}
}
}
if (!isWsConnected()) {
if (!options.silent) console.warn('WebSocket 未连接,无法发送消息', args);
// 如果还有重试机会,进行等待后重试
if (attempt < maxRetries) {
const delay = Math.min(maxDelay, baseDelay * Math.pow(2, attempt));
await sleep(delay);
continue;
}
isSessionActive.value = false;
return false;
}
try {
const raw = webSocketManager.sendMessage(args);
// 兼容可能返回同步布尔或 Promise 的实现
const result = await Promise.resolve(raw);
if (result) {
console.log(`WebSocket消息已发送 [类型:${messageType}]:`, args);
return true;
}
// 若返回 false准备重试
console.warn('webSocketManager.sendMessage 返回 false准备重试', { attempt, args });
} catch (error) {
console.error('发送WebSocket消息异常:', error, args);
}
// 失败且还有重试机会,等待指数退避
if (attempt < maxRetries) {
const delay = Math.min(maxDelay, baseDelay * Math.pow(2, attempt));
await sleep(delay + Math.floor(Math.random() * 100));
continue;
}
// 最后一次失败
isSessionActive.value = false;
return false;
}
// 不可达,但为了类型安全
isSessionActive.value = false;
return false;
};
// 发送获取AI聊天消息
const sendChat = (message, isInstruct = false) => {
const sendChat = async (message, isInstruct = false) => {
// 检查WebSocket管理器是否存在如果不存在尝试重新初始化
if (!webSocketManager) {
console.error("WebSocket管理器不存在尝试重新初始化...");
@@ -596,9 +717,10 @@ const sendChat = (message, isInstruct = false) => {
const messageType = isInstruct ? 1 : 0;
const messageContent = isInstruct ? commonType : message;
// 生成 messageId 并保存到当前会话变量stopRequest 可能使用)
currentSessionMessageId = IdUtils.generateMessageId();
// 插入AI消息
// 插入AI消息,并在 pendingMap 中记录
const aiMsg = {
msgId: `msg_${chatMsgList.value.length}`,
msgType: MessageRole.AI,
@@ -608,49 +730,88 @@ const sendChat = (message, isInstruct = false) => {
type: MessageType.TEXT,
url: "",
},
messageId: currentSessionMessageId,
};
chatMsgList.value.push(aiMsg);
// 添加AI消息后滚动到底部
setTimeoutScrollToBottom();
const aiMsgIndex = chatMsgList.value.length - 1;
// 发送消息
const success = sendWebSocketMessage(messageType, messageContent);
// 记录 pendingMap
pendingMap.set(currentSessionMessageId, aiMsgIndex);
// 启动超时回退
const timeoutId = setTimeout(() => {
const idx = pendingMap.get(currentSessionMessageId);
if (idx != null && chatMsgList.value[idx] && chatMsgList.value[idx].isLoading) {
chatMsgList.value[idx].msg = "请求超时,请重试";
chatMsgList.value[idx].isLoading = false;
pendingMap.delete(currentSessionMessageId);
pendingTimeouts.delete(currentSessionMessageId);
isSessionActive.value = false;
setTimeoutScrollToBottom();
}
}, MESSAGE_TIMEOUT);
pendingTimeouts.set(currentSessionMessageId, timeoutId);
// 发送消息,支持重连尝试
const success = await sendWebSocketMessage(messageType, messageContent, { messageId: currentSessionMessageId, tryReconnect: true });
if (!success) {
chatMsgList.value[aiMsgIndex].msg = "发送消息失败,请重试";
chatMsgList.value[aiMsgIndex].isLoading = false;
const idx = pendingMap.get(currentSessionMessageId);
if (idx != null && chatMsgList.value[idx]) {
chatMsgList.value[idx].msg = "发送消息失败,请重试";
chatMsgList.value[idx].isLoading = false;
}
// 清理 pending
if (pendingTimeouts.has(currentSessionMessageId)) {
clearTimeout(pendingTimeouts.get(currentSessionMessageId));
pendingTimeouts.delete(currentSessionMessageId);
}
pendingMap.delete(currentSessionMessageId);
isSessionActive.value = false;
}
// 重置消息状态,为新消息做准备
resetMessageState();
};
// 停止请求函数
const stopRequest = () => {
const stopRequest = async () => {
console.log("停止请求");
// 发送中断消息给服务器 (messageType=2)
sendWebSocketMessage(2, "stop_request");
// 发送中断消息给服务器 (messageType=2),带上当前 messageId
try {
await sendWebSocketMessage(2, "stop_request", { messageId: currentSessionMessageId, silent: true });
} catch (e) {
console.warn("stopRequest send failed:", e);
}
// 直接将AI消息状态设为停止
const aiMsgIndex = chatMsgList.value.length - 1;
if (
chatMsgList.value[aiMsgIndex] &&
chatMsgList.value[aiMsgIndex].msgType === MessageRole.AI
) {
// 直接将AI消息状态设为停止(优先使用 pendingMap 映射)
let aiMsgIndex = -1;
if (currentSessionMessageId && pendingMap.has(currentSessionMessageId)) {
aiMsgIndex = pendingMap.get(currentSessionMessageId);
} else {
aiMsgIndex = chatMsgList.value.length - 1;
}
if (chatMsgList.value[aiMsgIndex] &&
chatMsgList.value[aiMsgIndex].msgType === MessageRole.AI) {
chatMsgList.value[aiMsgIndex].isLoading = false;
if (
chatMsgList.value[aiMsgIndex].msg &&
if (chatMsgList.value[aiMsgIndex].msg &&
chatMsgList.value[aiMsgIndex].msg.trim() &&
!chatMsgList.value[aiMsgIndex].msg.startsWith("加载中")
) {
!chatMsgList.value[aiMsgIndex].msg.startsWith("加载中")) {
// 保留已显示内容
} else {
chatMsgList.value[aiMsgIndex].msg = "请求已停止";
}
}
// 清理 pending
if (currentSessionMessageId) {
if (pendingTimeouts.has(currentSessionMessageId)) {
clearTimeout(pendingTimeouts.get(currentSessionMessageId));
pendingTimeouts.delete(currentSessionMessageId);
}
pendingMap.delete(currentSessionMessageId);
}
// 重置会话状态
isSessionActive.value = false;
setTimeoutScrollToBottom();
@@ -678,6 +839,17 @@ const resetConfig = () => {
resetMessageState();
isSessionActive.value = false;
// 清理 pendingMap / pendingTimeouts
try {
for (const t of pendingTimeouts.values()) {
clearTimeout(t);
}
} catch (e) {
// ignore
}
pendingTimeouts.clear();
pendingMap.clear();
// 清理定时器
if (holdKeyboardTimer.value) {
clearTimeout(holdKeyboardTimer.value);

View File

@@ -28,8 +28,9 @@ export class WebSocketManager {
// 回调函数
this.callbacks = {
onConnect: options.onConnect || (() => {}),
onDisconnect: options.onDisconnect || (() => {}),
// 支持两套回调命名onConnect/onDisconnect 与 onOpen/onClose兼容调用方
onConnect: options.onConnect || options.onOpen || (() => { }),
onDisconnect: options.onDisconnect || options.onClose || (() => { }),
onError: options.onError || (() => { }),
onMessage: options.onMessage || (() => { }),
getConversationId: options.getConversationId || (() => ""),
@@ -327,7 +328,7 @@ export class WebSocketManager {
const messageData = {
...message,
timestamp: Date.now(),
retryCount: 0,
retryCount: typeof message.retryCount === 'number' ? message.retryCount : 0,
};
if (this.connectionState) {
@@ -392,7 +393,7 @@ export class WebSocketManager {
agentId: this.callbacks.getAgentId
? this.callbacks.getAgentId()
: "",
messageType: 3, // 心跳检测
messageType: '3', // 心跳检测
messageContent: "heartbeat",
messageId: IdUtils.generateMessageId(),
};