Files
YGChatCS/utils/WebSocketManager.js

622 lines
20 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

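/**
* Simplified WebSocket manager.
* Responsible only for the WebSocket connection and message transport;
* it does not contain any typewriter-effect logic.
*/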
import { IdUtils, CallbackUtils, MessageUtils, TimerUtils } from "./index.js";
/**
 * Manages a single WebSocket connection: connect, automatic reconnect with
 * back-off, heartbeat, an outbound retry queue and basic statistics.
 *
 * Works with either `uni.connectSocket` (when the `uni` global provides it)
 * or the standard `WebSocket` constructor.
 *
 * Recognised options (all optional):
 * - wsUrl {string}                 server URL (can also be passed to `init`)
 * - protocols {string[]}           sub-protocols
 * - reconnectInterval {number}     base reconnect delay in ms (default 3000)
 * - maxReconnectAttempts {number}  default 5; `0` disables auto-reconnect
 * - heartbeatInterval {number}     ms between heartbeats (default 30000)
 * - onConnect / onDisconnect / onError / onMessage  event callbacks
 * - getConversationId / getAgentId  providers used to fill heartbeat messages
 */
export class WebSocketManager {
  /**
   * @param {object} [options] see the class description for recognised keys
   */
  constructor(options = {}) {
    // Basic configuration
    console.log("WebSocketManager构造函数接收到的options:", options);
    this.wsUrl = options.wsUrl || "";
    console.log("WebSocketManager设置的wsUrl:", this.wsUrl);
    this.protocols = options.protocols || [];
    this.reconnectInterval = options.reconnectInterval || 3000;
    // `??` so that an explicit 0 ("never auto-reconnect") is honoured.
    this.maxReconnectAttempts = options.maxReconnectAttempts ?? 5;
    // `||` on purpose: a 0 ms heartbeat interval would fire continuously.
    this.heartbeatInterval = options.heartbeatInterval || 30000;

    // WebSocket instance and connection flags
    this.ws = null;
    this.reconnectAttempts = 0;
    this.isConnecting = false;
    this.connectionState = false;
    // True when `this.ws` was created by uni.connectSocket, so send/close use
    // the same API flavour that connect() picked.
    this._usesUniSocketTask = false;

    // Timers
    this.heartbeatTimer = null;
    this.reconnectTimer = null;

    // Callbacks (no-op defaults so they can always be invoked)
    this.callbacks = {
      onConnect: options.onConnect || (() => {}),
      onDisconnect: options.onDisconnect || (() => {}),
      onError: options.onError || (() => {}),
      onMessage: options.onMessage || (() => {}),
      getConversationId: options.getConversationId || (() => ""),
      getAgentId: options.getAgentId || (() => ""),
    };

    // Outbound messages waiting for a (re)connection or a retry
    this.messageQueue = [];
    this.isProcessingQueue = false;

    // Statistics
    this.stats = {
      messagesReceived: 0,
      messagesSent: 0,
      messagesDropped: 0,
      reconnectCount: 0,
      connectionStartTime: null,
      lastMessageTime: null,
    };
  }

  /**
   * Invoke one of the registered callbacks through CallbackUtils.safeCall.
   * @param {string} callbackName key in `this.callbacks`
   * @param {...*} args arguments forwarded to the callback
   */
  _safeCallCallback(callbackName, ...args) {
    CallbackUtils.safeCall(this.callbacks, callbackName, ...args);
  }

  /**
   * Common bookkeeping for a failed connection attempt: reset the flags,
   * report the error through `onError` and schedule a reconnect.
   * @param {string} type error type reported to the callback
   * @param {string} message human-readable message reported to the callback
   * @param {*} originalError underlying error, or null
   */
  _failConnection(type, message, originalError) {
    this.isConnecting = false;
    this.connectionState = false;
    this._safeCallCallback("onError", { type, message, originalError });
    this.scheduleReconnect();
  }

  /**
   * Set the URL (when given) and open the connection.
   * @param {string} [wsUrl] overrides the URL passed to the constructor
   * @throws {Error} when no URL is configured
   */
  async init(wsUrl) {
    if (wsUrl) {
      this.wsUrl = wsUrl;
    }
    if (!this.wsUrl) {
      throw new Error("WebSocket URL is required");
    }
    await this.connect();
  }

  /**
   * Create the socket and register the event handlers. Does nothing when a
   * connection is already open or being opened. Failures are reported via
   * `onError` and trigger `scheduleReconnect`; they are not thrown.
   */
  async connect() {
    if (this.isConnecting || this.connectionState) {
      console.log("WebSocket已在连接中或已连接跳过连接");
      return;
    }
    console.log("开始建立WebSocket连接:", this.wsUrl);
    this.isConnecting = true;
    try {
      if (typeof uni !== "undefined" && uni.connectSocket) {
        // uni-app environment: use uni.connectSocket
        try {
          this.ws = uni.connectSocket({
            url: this.wsUrl,
            protocols: this.protocols,
            success: () => {
              console.log("uni.connectSocket调用成功");
            },
            fail: (error) => {
              console.error("uni.connectSocket调用失败:", error);
              this._failConnection(
                "CONNECTION_ERROR",
                "uni.connectSocket调用失败",
                error
              );
            },
          });
          this._usesUniSocketTask = true;
          // Make sure a usable SocketTask came back before wiring handlers.
          if (!this.ws || typeof this.ws.onOpen !== "function") {
            console.error("uni.connectSocket返回的对象无效");
            this._failConnection(
              "CONNECTION_ERROR",
              "uni.connectSocket返回的SocketTask对象无效",
              null
            );
            return;
          }
          this.ws.onOpen(this.handleOpen.bind(this));
          this.ws.onMessage(this.handleMessage.bind(this));
          this.ws.onClose(this.handleClose.bind(this));
          this.ws.onError(this.handleError.bind(this));
        } catch (error) {
          console.error("创建uni.connectSocket时发生异常:", error);
          this._failConnection(
            "CONNECTION_ERROR",
            "创建WebSocket连接时发生异常",
            error
          );
          return;
        }
      } else {
        // Any other environment: standard WebSocket
        console.log("使用标准WebSocket创建连接");
        this.ws = new WebSocket(this.wsUrl, this.protocols);
        this._usesUniSocketTask = false;
        this.ws.onopen = this.handleOpen.bind(this);
        this.ws.onmessage = this.handleMessage.bind(this);
        this.ws.onclose = this.handleClose.bind(this);
        this.ws.onerror = this.handleError.bind(this);
      }
      console.log("WebSocket实例创建成功等待连接...");
    } catch (error) {
      console.error("WebSocket连接创建失败:", error);
      this._failConnection(
        "CONNECTION_CREATE_ERROR",
        "WebSocket连接创建失败",
        error
      );
    }
  }

  /**
   * Socket "open" handler: mark the connection as established, reset the
   * reconnect counter, start the heartbeat, notify `onConnect` and flush the
   * queued messages.
   * @param {*} event open event supplied by the socket implementation
   */
  handleOpen(event) {
    console.log("WebSocket连接已建立");
    this.isConnecting = false;
    this.connectionState = true;
    this.reconnectAttempts = 0;
    this.stats.connectionStartTime = Date.now();
    this.startHeartbeat();
    this._safeCallCallback("onConnect", event);
    this._processMessageQueue();
  }

  /**
   * Re-send every message that is queued right now. A message that has
   * already been retried 3 times is dropped and counted in
   * `stats.messagesDropped`.
   */
  _processMessageQueue() {
    // Work on a snapshot: sendMessage() may push a failed message back onto
    // the queue, and draining the live array would then never terminate.
    const pending = this.messageQueue.splice(0);
    for (const queuedMessage of pending) {
      const retryCount = queuedMessage.retryCount || 0;
      if (retryCount >= 3) {
        console.warn("消息重试次数超限,丢弃消息:", queuedMessage);
        this.stats.messagesDropped++;
        continue;
      }
      this.sendMessage({ ...queuedMessage, retryCount: retryCount + 1 });
    }
  }

  /**
   * Socket "message" handler. Pong messages are swallowed, string payloads
   * are parsed as JSON (unparseable ones are ignored), everything else is
   * counted in the stats and forwarded to `onMessage`.
   * @param {*} event message event, or the payload itself
   */
  handleMessage(event) {
    try {
      // The payload may be in event.data, or the event may be the payload.
      const messageData = event.data || event;
      let data = messageData;
      if (typeof messageData === "string") {
        // Check for a textual heartbeat reply before attempting to parse.
        if (MessageUtils.isPongMessage(messageData)) {
          console.log("收到心跳响应:", messageData);
          return;
        }
        data = MessageUtils.safeParseJSON(messageData);
        if (!data) {
          console.warn("收到非JSON格式消息已忽略:", messageData);
          return;
        }
      }
      this.stats.messagesReceived++;
      this.stats.lastMessageTime = Date.now();
      console.log("收到WebSocket消息:", data);
      // Heartbeat reply in object form
      if (MessageUtils.isPongMessage(data)) {
        return;
      }
      this._safeCallCallback("onMessage", data);
    } catch (error) {
      // Log only: one bad message must not break handling of later ones.
      console.error("消息处理错误:", error, "原始消息:", event);
    }
  }

  /**
   * Socket "close" handler: clear the connection flags, stop the heartbeat,
   * notify `onDisconnect`, and schedule a reconnect after an abnormal close.
   * @param {{code: number, reason: string}} event close event
   */
  handleClose(event) {
    console.log("WebSocket连接已关闭:", event.code, event.reason);
    this.connectionState = false;
    this.isConnecting = false;
    this.stopHeartbeat();
    this._safeCallCallback("onDisconnect", event);
    // Code 1000 is a normal close. Skip scheduling when a reconnect is already
    // pending (e.g. set up by handleError for the same failure); scheduling
    // again would count one failure as two attempts.
    if (
      event.code !== 1000 &&
      !this.reconnectTimer &&
      this.reconnectAttempts < this.maxReconnectAttempts
    ) {
      this.scheduleReconnect();
    }
  }

  /**
   * Socket "error" handler: log diagnostic details, clear the connection
   * flags, report a structured error through `onError` and schedule a
   * reconnect unless one is already pending or the attempt limit is reached.
   * @param {*} error error supplied by the socket implementation
   */
  handleError(error) {
    const errorDetails = {
      error,
      wsUrl: this.wsUrl,
      readyState: this.ws ? this.ws.readyState : "WebSocket实例不存在",
      isConnecting: this.isConnecting,
      connectionState: this.connectionState,
      reconnectAttempts: this.reconnectAttempts,
      timestamp: new Date().toISOString(),
    };
    console.error("WebSocket错误详情:", errorDetails);
    this.connectionState = false;
    this.isConnecting = false;
    const structuredError = {
      type: "WEBSOCKET_ERROR",
      message: error?.message || "未知WebSocket错误",
      originalError: error,
      details: errorDetails,
    };
    this._safeCallCallback("onError", structuredError);
    if (
      !this.reconnectTimer &&
      this.reconnectAttempts < this.maxReconnectAttempts
    ) {
      console.log("错误后尝试重连");
      this.scheduleReconnect();
    }
  }

  /**
   * Send a message as JSON, stamping it with `timestamp` and `retryCount`.
   * When the connection is down, or the send fails, the message is queued
   * for a later retry.
   * @param {object} message payload to send
   * @returns {boolean} true when the message was handed to the socket,
   *   false when it was queued instead
   */
  sendMessage(message) {
    const messageData = {
      ...message,
      timestamp: Date.now(),
      // Keep an existing count so the queue's retry limit actually applies.
      retryCount: message?.retryCount ?? 0,
    };
    if (!this.connectionState) {
      // Not connected: queue the message and try to connect.
      this.messageQueue.push(messageData);
      this.connect();
      return false;
    }
    try {
      if (!this.ws || typeof this.ws.send !== "function") {
        // Flags say "connected" but there is no usable socket: treat it as a
        // failed send instead of silently reporting success.
        throw new Error("WebSocket instance is not available");
      }
      const messageStr = JSON.stringify(messageData);
      if (this._usesUniSocketTask) {
        // SocketTask.send takes an options object with callbacks.
        this.ws.send({
          data: messageStr,
          success: () => {
            this.stats.messagesSent++;
          },
          fail: (error) => {
            console.error("发送消息失败:", error);
            this._safeCallCallback("onError", error);
            this.messageQueue.push(messageData);
          },
        });
      } else {
        // Standard WebSocket.send takes the data directly.
        this.ws.send(messageStr);
        this.stats.messagesSent++;
      }
      return true;
    } catch (error) {
      console.error("发送消息失败:", error);
      this._safeCallCallback("onError", error);
      this.messageQueue.push(messageData);
      return false;
    }
  }

  /**
   * (Re)start the heartbeat interval. While connected, every tick sends a
   * message with messageType 3 and messageContent "heartbeat".
   */
  startHeartbeat() {
    this.stopHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      if (!this.connectionState || !this.ws) {
        return;
      }
      try {
        const heartbeatMessage = {
          // The providers may be missing after destroy() clears callbacks.
          conversationId: this.callbacks.getConversationId
            ? this.callbacks.getConversationId()
            : "",
          agentId: this.callbacks.getAgentId
            ? this.callbacks.getAgentId()
            : "",
          messageType: 3, // heartbeat
          messageContent: "heartbeat",
          messageId: IdUtils.generateMessageId(),
        };
        this.sendMessage(heartbeatMessage);
        console.log("心跳消息已发送:", heartbeatMessage);
      } catch (error) {
        console.error("发送心跳失败:", error);
        this.handleError(error);
      }
    }, this.heartbeatInterval);
  }

  /**
   * Stop the heartbeat interval, if any.
   */
  stopHeartbeat() {
    TimerUtils.clearTimer(this.heartbeatTimer, "interval");
    this.heartbeatTimer = null;
  }

  /**
   * Schedule the next reconnect attempt. The delay is
   * `reconnectInterval * 1.5^(attempt - 1)`, capped at 30 seconds. Once
   * `maxReconnectAttempts` is reached, a CONNECTION_FAILED error is reported
   * through `onError` and nothing is scheduled.
   */
  scheduleReconnect() {
    // Cancel any previously scheduled attempt.
    if (this.reconnectTimer) {
      TimerUtils.clearTimer(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error(
        `达到最大重连次数(${this.maxReconnectAttempts}),停止重连`
      );
      this.connectionState = false;
      this.isConnecting = false;
      // Reported through the callback rather than thrown.
      const errorInfo = {
        type: "CONNECTION_FAILED",
        message: `连接失败,已达到最大重连次数(${this.maxReconnectAttempts})`,
        attempts: this.reconnectAttempts,
        maxAttempts: this.maxReconnectAttempts,
      };
      console.warn("WebSocket连接最终失败:", errorInfo);
      this._safeCallCallback("onError", errorInfo);
      return;
    }
    this.reconnectAttempts++;
    this.stats.reconnectCount++;
    const backoffDelay = Math.min(
      this.reconnectInterval * Math.pow(1.5, this.reconnectAttempts - 1),
      30000 // upper bound for the delay: 30 seconds
    );
    console.log(
      `${backoffDelay}ms后进行第${this.reconnectAttempts}/${this.maxReconnectAttempts}次重连`
    );
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect().catch((error) => {
        console.error("重连过程中发生错误:", error);
        // Keep trying.
        this.scheduleReconnect();
      });
    }, backoffDelay);
  }

  /**
   * Clear the connection flags, the reconnect counter and both timers.
   */
  resetConnectionState() {
    this.connectionState = false;
    this.isConnecting = false;
    this.reconnectAttempts = 0;
    this.stopHeartbeat();
    if (this.reconnectTimer) {
      TimerUtils.clearTimer(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    console.log("连接状态已重置");
  }

  /**
   * @returns {boolean} true when the manager considers itself connected and
   *   the socket reports the OPEN ready state
   */
  isConnected() {
    if (!this.connectionState || !this.ws) {
      return false;
    }
    const { readyState } = this.ws;
    return (
      readyState === 1 ||
      (typeof WebSocket !== "undefined" && readyState === WebSocket.OPEN)
    );
  }

  /**
   * Close the current connection, reset the state and connect again.
   * Note that close() also empties the message queue.
   * @returns {Promise<boolean>} false when connect() rejected, true otherwise
   */
  async reconnect() {
    console.log("手动触发重连");
    this.close();
    this.resetConnectionState();
    try {
      await this.connect();
      return true;
    } catch (error) {
      console.error("手动重连失败:", error);
      return false;
    }
  }

  /**
   * @returns {object} snapshot of the connection flags; `readyState` is -1
   *   when there is no socket
   */
  getConnectionState() {
    return {
      isConnected: this.isConnected(),
      readyState: this.ws ? this.ws.readyState : -1,
      reconnectAttempts: this.reconnectAttempts,
      maxReconnectAttempts: this.maxReconnectAttempts,
      isConnecting: this.isConnecting,
      hasReconnectTimer: !!this.reconnectTimer,
    };
  }

  /**
   * @returns {object} the counters plus `connectionTime` (ms since
   *   `connectionStartTime`, 0 when unset), `queueLength` and `isConnected`
   */
  getStats() {
    const startedAt = this.stats.connectionStartTime;
    const connectionTime = startedAt ? Date.now() - startedAt : 0;
    return {
      ...this.stats,
      connectionTime,
      queueLength: this.messageQueue.length,
      isConnected: this.connectionState,
    };
  }

  /**
   * Zero all counters and restart the connection-time clock from now.
   */
  resetStats() {
    this.stats = {
      messagesReceived: 0,
      messagesSent: 0,
      messagesDropped: 0,
      reconnectCount: 0,
      connectionStartTime: Date.now(),
      lastMessageTime: null,
    };
    console.log("统计信息已重置");
  }

  /**
   * Close the socket with code 1000, cancel both timers, clear the
   * connection flags and discard all queued messages.
   */
  close() {
    this.stopHeartbeat();
    TimerUtils.clearTimer(this.reconnectTimer);
    this.reconnectTimer = null;
    if (this.ws) {
      if (this._usesUniSocketTask && this.ws.close) {
        // SocketTask.close takes an options object with callbacks.
        this.ws.close({
          code: 1000,
          reason: "正常关闭",
          success: () => {
            console.log("WebSocket连接已关闭");
          },
          fail: (error) => {
            console.error("关闭WebSocket连接失败:", error);
          },
        });
      } else if (this.ws.close) {
        // Standard WebSocket.close(code, reason)
        this.ws.close(1000, "正常关闭");
      }
      this.ws = null;
    }
    this.connectionState = false;
    this.isConnecting = false;
    this.messageQueue = [];
  }

  /**
   * Log the final statistics, close the connection and drop all callbacks.
   */
  destroy() {
    console.log("WebSocketManager销毁前统计:", this.getStats());
    this.close();
    this.callbacks = {};
    console.log("WebSocketManager实例已销毁");
  }
}
// Default export in addition to the named `WebSocketManager` export above,
// so both `import WebSocketManager from ...` and `import { WebSocketManager } from ...` work.
export default WebSocketManager;