/** * 简化的WebSocket管理器 * 专门负责WebSocket连接和消息传输,不包含打字机逻辑 */ import { IdUtils, CallbackUtils, MessageUtils, TimerUtils } from "./index.js"; export class WebSocketManager { constructor(options = {}) { // 基础配置 console.log("WebSocketManager构造函数接收到的options:", options); this.wsUrl = options.wsUrl || ""; console.log("WebSocketManager设置的wsUrl:", this.wsUrl); this.protocols = options.protocols || []; this.reconnectInterval = options.reconnectInterval || 3000; this.maxReconnectAttempts = options.maxReconnectAttempts || 5; this.heartbeatInterval = options.heartbeatInterval || 30000; // WebSocket 实例 this.ws = null; this.reconnectAttempts = 0; this.isConnecting = false; this.connectionState = false; // 计时器 this.heartbeatTimer = null; this.reconnectTimer = null; // 回调函数 this.callbacks = { onConnect: options.onConnect || (() => {}), onDisconnect: options.onDisconnect || (() => {}), onError: options.onError || (() => {}), onMessage: options.onMessage || (() => {}), getConversationId: options.getConversationId || (() => ""), getAgentId: options.getAgentId || (() => ""), }; // 消息队列 this.messageQueue = []; this.isProcessingQueue = false; // 性能统计 this.stats = { messagesReceived: 0, messagesSent: 0, messagesDropped: 0, reconnectCount: 0, connectionStartTime: null, lastMessageTime: null, }; } /** * 安全调用回调函数 */ _safeCallCallback(callbackName, ...args) { CallbackUtils.safeCall(this.callbacks, callbackName, ...args); } /** * 初始化连接 */ async init(wsUrl) { if (wsUrl) { this.wsUrl = wsUrl; } if (!this.wsUrl) { throw new Error("WebSocket URL is required"); } await this.connect(); } /** * 建立WebSocket连接 */ async connect() { if (this.isConnecting || this.connectionState) { console.log("WebSocket已在连接中或已连接,跳过连接"); return; } console.log("开始建立WebSocket连接:", this.wsUrl); this.isConnecting = true; try { // 在小程序环境中使用 uni.connectSocket if (typeof uni !== "undefined" && uni.connectSocket) { try { this.ws = uni.connectSocket({ url: this.wsUrl, protocols: this.protocols, success: () => { console.log("uni.connectSocket调用成功"); }, fail: (error) => { console.error("uni.connectSocket调用失败:", error); this.isConnecting = false; this.connectionState = false; this._safeCallCallback("onError", { type: "CONNECTION_ERROR", message: "uni.connectSocket调用失败", originalError: error, }); this.scheduleReconnect(); return; }, }); // 确保ws对象存在且有相应方法 if (this.ws && typeof this.ws.onOpen === "function") { this.ws.onOpen(this.handleOpen.bind(this)); this.ws.onMessage(this.handleMessage.bind(this)); this.ws.onClose(this.handleClose.bind(this)); this.ws.onError(this.handleError.bind(this)); } else { console.error("uni.connectSocket返回的对象无效"); this.isConnecting = false; this.connectionState = false; this._safeCallCallback("onError", { type: "CONNECTION_ERROR", message: "uni.connectSocket返回的SocketTask对象无效", originalError: null, }); this.scheduleReconnect(); return; } } catch (error) { console.error("创建uni.connectSocket时发生异常:", error); this.isConnecting = false; this.connectionState = false; this._safeCallCallback("onError", { type: "CONNECTION_ERROR", message: "创建WebSocket连接时发生异常", originalError: error, }); this.scheduleReconnect(); return; } } else { // 在其他环境中使用标准 WebSocket console.log("使用标准WebSocket创建连接"); this.ws = new WebSocket(this.wsUrl, this.protocols); this.ws.onopen = this.handleOpen.bind(this); this.ws.onmessage = this.handleMessage.bind(this); this.ws.onclose = this.handleClose.bind(this); this.ws.onerror = this.handleError.bind(this); } console.log("WebSocket实例创建成功,等待连接..."); } catch (error) { console.error("WebSocket连接创建失败:", error); this.isConnecting = false; this.connectionState = false; this._safeCallCallback("onError", { type: "CONNECTION_CREATE_ERROR", message: "WebSocket连接创建失败", originalError: error, }); this.scheduleReconnect(); } } /** * 连接成功处理 */ handleOpen(event) { console.log("WebSocket连接已建立"); this.isConnecting = false; this.connectionState = true; this.reconnectAttempts = 0; this.stats.connectionStartTime = Date.now(); this.startHeartbeat(); // 安全调用 onConnect 回调 this._safeCallCallback("onConnect", event); // 处理队列中的消息 this._processMessageQueue(); } /** * 处理消息队列 */ _processMessageQueue() { while (this.messageQueue.length > 0) { const queuedMessage = this.messageQueue.shift(); // 检查重试次数 if (queuedMessage.retryCount >= 3) { console.warn("消息重试次数超限,丢弃消息:", queuedMessage); this.stats.messagesDropped++; continue; } queuedMessage.retryCount = (queuedMessage.retryCount || 0) + 1; this.sendMessage(queuedMessage); } } /** * 消息接收处理 */ handleMessage(event) { try { // 在小程序环境中,消息数据可能在 event.data 中 const messageData = event.data || event; // 处理心跳响应和其他非JSON消息 - 在JSON解析前检查 if (typeof messageData === "string") { // 处理心跳响应 if (MessageUtils.isPongMessage(messageData)) { console.log("收到心跳响应:", messageData); return; } // 尝试解析JSON,如果失败则忽略该消息 const data = MessageUtils.safeParseJSON(messageData); if (data) { this.stats.messagesReceived++; this.stats.lastMessageTime = Date.now(); console.log("收到WebSocket消息:", data); // 心跳响应 - 兼容对象格式 if (MessageUtils.isPongMessage(data)) { return; } // 直接传递消息给回调处理 this._safeCallCallback("onMessage", data); } else { console.warn("收到非JSON格式消息,已忽略:", messageData); return; } } else { // 处理对象格式的消息 const data = messageData; this.stats.messagesReceived++; this.stats.lastMessageTime = Date.now(); console.log("收到WebSocket消息:", data); // 心跳响应 - 兼容对象格式 if (MessageUtils.isPongMessage(data)) { return; } // 直接传递消息给回调处理 this._safeCallCallback("onMessage", data); } } catch (error) { console.error("消息处理错误:", error, "原始消息:", event); // 不抛出错误,避免影响其他消息处理 } } /** * 连接关闭处理 */ handleClose(event) { console.log("WebSocket连接已关闭:", event.code, event.reason); this.connectionState = false; this.isConnecting = false; this.stopHeartbeat(); // 安全调用 onDisconnect 回调 this._safeCallCallback("onDisconnect", event); // 非正常关闭时尝试重连 if ( event.code !== 1000 && this.reconnectAttempts < this.maxReconnectAttempts ) { this.scheduleReconnect(); } } /** * 错误处理 */ handleError(error) { const errorDetails = { error, wsUrl: this.wsUrl, readyState: this.ws ? this.ws.readyState : "WebSocket实例不存在", isConnecting: this.isConnecting, connectionState: this.connectionState, reconnectAttempts: this.reconnectAttempts, timestamp: new Date().toISOString(), }; console.error("WebSocket错误详情:", errorDetails); // 重置连接状态 this.connectionState = false; this.isConnecting = false; // 构造结构化错误信息 const structuredError = { type: "WEBSOCKET_ERROR", message: error?.message || "未知WebSocket错误", originalError: error, details: errorDetails, }; // 安全调用 onError 回调 this._safeCallCallback("onError", structuredError); // 如果不是正在重连中,尝试重连 if ( !this.reconnectTimer && this.reconnectAttempts < this.maxReconnectAttempts ) { console.log("错误后尝试重连"); this.scheduleReconnect(); } } /** * 发送消息 */ sendMessage(message) { const messageData = { ...message, timestamp: Date.now(), retryCount: 0, }; if (this.connectionState) { try { const messageStr = JSON.stringify(messageData); // 在小程序环境中使用不同的发送方法 if (typeof uni !== "undefined" && this.ws && this.ws.send) { // uni-app 小程序环境 this.ws.send({ data: messageStr, success: () => { this.stats.messagesSent++; console.log("消息发送成功:", messageData); }, fail: (error) => { console.error("发送消息失败:", error); this._safeCallCallback("onError", error); // 发送失败时加入重试队列 this.messageQueue.push(messageData); }, }); } else if (this.ws && this.ws.send) { // 标准 WebSocket 环境 this.ws.send(messageStr); this.stats.messagesSent++; console.log("消息发送成功:", messageData); } return true; } catch (error) { console.error("发送消息失败:", error); this._safeCallCallback("onError", error); // 发送失败时加入重试队列 this.messageQueue.push(messageData); return false; } } else { // 连接未建立时加入队列 this.messageQueue.push(messageData); this.connect(); // 尝试连接 return false; } } /** * 开始心跳 */ startHeartbeat() { this.stopHeartbeat(); this.heartbeatTimer = setInterval(() => { if (this.connectionState && this.ws) { try { // 使用标准消息格式发送心跳 (messageType=3) const heartbeatMessage = { conversationId: this.callbacks.getConversationId ? this.callbacks.getConversationId() : "", agentId: this.callbacks.getAgentId ? this.callbacks.getAgentId() : "", messageType: 3, // 心跳检测 messageContent: "heartbeat", messageId: IdUtils.generateMessageId(), }; this.sendMessage(heartbeatMessage); console.log("心跳消息已发送:", heartbeatMessage); } catch (error) { console.error("发送心跳失败:", error); this.handleError(error); } } }, this.heartbeatInterval); } /** * 停止心跳 */ stopHeartbeat() { TimerUtils.clearTimer(this.heartbeatTimer, "interval"); this.heartbeatTimer = null; } /** * 安排重连 */ scheduleReconnect() { // 清理之前的重连定时器 if (this.reconnectTimer) { TimerUtils.clearTimer(this.reconnectTimer); this.reconnectTimer = null; } if (this.reconnectAttempts >= this.maxReconnectAttempts) { console.error( `达到最大重连次数(${this.maxReconnectAttempts}),停止重连` ); this.connectionState = false; this.isConnecting = false; // 使用更温和的错误处理,避免抛出异常 const errorInfo = { type: "CONNECTION_FAILED", message: `连接失败,已达到最大重连次数(${this.maxReconnectAttempts})`, attempts: this.reconnectAttempts, maxAttempts: this.maxReconnectAttempts, }; console.warn("WebSocket连接最终失败:", errorInfo); this._safeCallCallback("onError", errorInfo); return; } this.reconnectAttempts++; this.stats.reconnectCount++; // 使用指数退避策略 const backoffDelay = Math.min( this.reconnectInterval * Math.pow(1.5, this.reconnectAttempts - 1), 30000 // 最大延迟30秒 ); console.log( `${backoffDelay}ms后进行第${this.reconnectAttempts}/${this.maxReconnectAttempts}次重连` ); this.reconnectTimer = setTimeout(() => { this.reconnectTimer = null; this.connect().catch((error) => { console.error("重连过程中发生错误:", error); // 继续尝试重连 this.scheduleReconnect(); }); }, backoffDelay); } /** * 重置连接状态 */ resetConnectionState() { this.connectionState = false; this.isConnecting = false; this.reconnectAttempts = 0; // 清理定时器 this.stopHeartbeat(); if (this.reconnectTimer) { TimerUtils.clearTimer(this.reconnectTimer); this.reconnectTimer = null; } console.log("连接状态已重置"); } /** * 检查是否已连接 */ isConnected() { return ( this.connectionState && this.ws && (this.ws.readyState === 1 || (typeof WebSocket !== "undefined" && this.ws.readyState === WebSocket.OPEN)) ); } /** * 手动重连 */ async reconnect() { console.log("手动触发重连"); // 先关闭现有连接 this.close(); // 重置连接状态 this.resetConnectionState(); // 重新连接 try { await this.connect(); return true; } catch (error) { console.error("手动重连失败:", error); return false; } } /** * 获取连接状态 */ getConnectionState() { return { isConnected: this.isConnected(), readyState: this.ws ? this.ws.readyState : -1, reconnectAttempts: this.reconnectAttempts, maxReconnectAttempts: this.maxReconnectAttempts, isConnecting: this.isConnecting, hasReconnectTimer: !!this.reconnectTimer, }; } /** * 获取统计信息 */ getStats() { const now = Date.now(); const connectionTime = this.stats.connectionStartTime ? now - this.stats.connectionStartTime : 0; return { ...this.stats, connectionTime, queueLength: this.messageQueue.length, isConnected: this.connectionState, }; } /** * 重置统计信息 */ resetStats() { this.stats = { messagesReceived: 0, messagesSent: 0, messagesDropped: 0, reconnectCount: 0, connectionStartTime: Date.now(), lastMessageTime: null, }; console.log("统计信息已重置"); } /** * 关闭连接 */ close() { this.stopHeartbeat(); // 清理重连定时器 TimerUtils.clearTimer(this.reconnectTimer); this.reconnectTimer = null; if (this.ws) { // 在小程序环境中使用不同的关闭方法 if (typeof uni !== "undefined" && this.ws.close) { // uni-app 小程序环境 this.ws.close({ code: 1000, reason: "正常关闭", success: () => { console.log("WebSocket连接已关闭"); }, fail: (error) => { console.error("关闭WebSocket连接失败:", error); }, }); } else if (this.ws.close) { // 标准 WebSocket 环境 this.ws.close(1000, "正常关闭"); } this.ws = null; } this.connectionState = false; this.isConnecting = false; this.messageQueue = []; } /** * 销毁实例 */ destroy() { // 输出最终统计信息 console.log("WebSocketManager销毁前统计:", this.getStats()); this.close(); this.callbacks = {}; console.log("WebSocketManager实例已销毁"); } } export default WebSocketManager;