/**
 * Simplified WebSocket manager (web version).
 * Responsible only for the WebSocket connection and message transport;
 * it contains no typewriter logic.
 */
import { IdUtils, CallbackUtils, MessageUtils, TimerUtils } from './index'

/* =======================
 * Type definitions
 * ======================= */

/** Hooks the manager invokes on connection life-cycle events and inbound messages. */
export interface WebSocketCallbacks {
  /** Called after the socket has opened, with the `open` event. */
  onConnect?: (event?: Event) => void
  /** Called when a socket created by the manager closes, with the `close` event. */
  onDisconnect?: (event?: CloseEvent) => void
  /** Called with a send/construct exception, or with `{ type: 'WEBSOCKET_ERROR', error }` for socket errors. */
  onError?: (error: any) => void
  /** Called with each inbound payload (JSON-parsed when the frame was a string). */
  onMessage?: (message: any) => void
  /** Supplies the `conversationId` field of heartbeat messages. */
  getConversationId?: () => string
  /** Supplies the `agentId` field of heartbeat messages. */
  getAgentId?: () => string
}

/**
 * Constructor options for {@link WebSocketManager}.
 *
 * The manager reads `wsUrl`, `protocols`, `reconnectInterval`,
 * `maxReconnectAttempts`, `heartbeatInterval` and the callbacks. The remaining
 * fields are accepted but not read by the class in this file.
 */
export interface WebSocketManagerOptions extends WebSocketCallbacks {
  wsUrl?: string
  protocols?: string[]
  /** Base reconnect delay in milliseconds (default 3000). */
  reconnectInterval?: number
  /** Maximum number of automatic reconnect attempts (default 5). */
  maxReconnectAttempts?: number
  /** Heartbeat period in milliseconds (default 30000). */
  heartbeatInterval?: number
  messageId?: string
  baseDelay?: number
  retries?: number
  maxDelay?: number
  tryReconnect?: boolean
}

/** An outbound message; `retryCount` and `timestamp` are stamped by `sendMessage`. */
export interface QueuedMessage {
  [key: string]: any
  retryCount?: number
  timestamp?: number
}

/** Counters maintained by the manager and exposed through `getStats()`. */
export interface WebSocketStats {
  messagesReceived: number
  messagesSent: number
  messagesDropped: number
  reconnectCount: number
  connectionStartTime: number | null
  lastMessageTime: number | null
}

/* =======================
 * WebSocketManager
 * ======================= */

/**
 * Wraps a browser `WebSocket` with connection tracking, a heartbeat, capped
 * exponential-backoff reconnection and a queue for messages sent while offline.
 */
export class WebSocketManager {
  private wsUrl = ''
  private protocols: string[] = []
  private reconnectInterval = 3000
  private maxReconnectAttempts = 5
  private heartbeatInterval = 30000

  private ws: WebSocket | null = null
  private reconnectAttempts = 0
  private isConnecting = false
  private connectionState = false
  private heartbeatTimer: number | null = null
  private reconnectTimer: number | null = null
  /** Promise of the connection attempt currently in flight, if any. */
  private pendingConnect: Promise<void> | null = null

  private callbacks: Required<WebSocketCallbacks>
  private messageQueue: QueuedMessage[] = []
  private stats: WebSocketStats = {
    messagesReceived: 0,
    messagesSent: 0,
    messagesDropped: 0,
    reconnectCount: 0,
    connectionStartTime: null,
    lastMessageTime: null,
  }

  /**
   * @param options Connection settings and callbacks; every field is optional.
   *   Missing callbacks default to no-ops and missing id getters return `''`.
   */
  constructor(options: WebSocketManagerOptions = {}) {
    this.wsUrl = options.wsUrl ?? ''
    this.protocols = options.protocols ?? []
    this.reconnectInterval = options.reconnectInterval ?? 3000
    this.maxReconnectAttempts = options.maxReconnectAttempts ?? 5
    this.heartbeatInterval = options.heartbeatInterval ?? 30000

    this.callbacks = {
      onConnect: options.onConnect ?? (() => { }),
      onDisconnect: options.onDisconnect ?? (() => { }),
      onError: options.onError ?? (() => { }),
      onMessage: options.onMessage ?? (() => { }),
      getConversationId: options.getConversationId ?? (() => ''),
      getAgentId: options.getAgentId ?? (() => ''),
    }
  }

  /* =======================
   * Internal helpers
   * ======================= */

  /** Invokes the named callback through `CallbackUtils.safeCall`. */
  private safeCall(name: keyof WebSocketCallbacks, ...args: any[]): void {
    CallbackUtils.safeCall(this.callbacks, name as string, ...args)
  }

  /**
   * Starts a connection attempt whose result nobody awaits.
   * The rejection is swallowed here so it cannot surface as an unhandled
   * promise rejection; failures of the live socket are reported via `onError`.
   */
  private connectInBackground(): void {
    this.connect().catch(() => {
      // Intentionally empty: see method comment.
    })
  }

  /* =======================
   * Connection management
   * ======================= */

  /**
   * Optionally overrides the URL, then connects.
   *
   * @param wsUrl URL to use instead of the one given to the constructor.
   * @throws Error when no URL is configured; also rejects when the connection attempt fails.
   */
  async init(wsUrl?: string): Promise<void> {
    if (wsUrl) this.wsUrl = wsUrl
    if (!this.wsUrl) throw new Error('WebSocket URL is required')
    await this.connect()
  }

  /**
   * Opens the socket and resolves once it is actually open.
   *
   * Returns immediately when already connected. When an attempt is already in
   * flight, the caller joins that attempt instead of starting a second socket.
   * Rejects with the `error` event (or the constructor exception) on failure.
   */
  async connect(): Promise<void> {
    console.log('[WebSocket] connect() called, isConnecting:', this.isConnecting, 'connectionState:', this.connectionState)

    if (this.isConnecting || this.connectionState) {
      console.log('[WebSocket] Already connecting or connected, returning early')
      // Join the in-flight attempt so awaiting callers really wait for the socket to open.
      if (this.isConnecting && this.pendingConnect) return this.pendingConnect
      return
    }

    this.isConnecting = true
    console.log('[WebSocket] Starting connection...')

    const attempt = new Promise<void>((resolve, reject) => {
      try {
        console.log('[WebSocket] About to create new WebSocket with URL:', this.wsUrl)
        const socket = new WebSocket(this.wsUrl, this.protocols)
        this.ws = socket
        console.log('[WebSocket] WebSocket object created, readyState:', socket.readyState)

        // Wrap handleOpen so the promise resolves on a real, successful connection.
        socket.onopen = (event: Event) => {
          console.log('[WebSocket] onopen event fired')
          this.handleOpen(event)
          resolve()
        }

        socket.onmessage = (event: MessageEvent) => {
          // Ignore frames from a socket that has been replaced or closed via close().
          if (this.ws !== socket) return
          this.handleMessage(event)
        }

        socket.onclose = (event: CloseEvent) => {
          console.log('[WebSocket] onclose event fired, code:', event.code, 'reason:', event.reason)
          if (this.ws !== socket) {
            // Stale socket (closed via close() or superseded): report the
            // disconnect but leave current state alone and never reconnect.
            this.safeCall('onDisconnect', event)
            return
          }
          this.handleClose(event)
        }

        socket.onerror = (error: Event) => {
          console.log('[WebSocket] onerror event fired', error)
          // A stale socket must not flip state or schedule a reconnect; its
          // pending connect() promise is still settled below.
          if (this.ws === socket) this.handleError(error)
          reject(error)
        }
      } catch (error) {
        this.isConnecting = false
        this.safeCall('onError', error)
        this.scheduleReconnect()
        reject(error)
      }
    })

    this.pendingConnect = attempt
    const forget = (): void => {
      if (this.pendingConnect === attempt) this.pendingConnect = null
    }
    // Both branches are handled, so this derived promise never rejects.
    void attempt.then(forget, forget)

    return attempt
  }

  /** Marks the connection live, resets the attempt counter, starts the heartbeat and flushes the queue. */
  private handleOpen = (event: Event): void => {
    this.isConnecting = false
    this.connectionState = true
    this.reconnectAttempts = 0
    this.stats.connectionStartTime = Date.now()

    this.startHeartbeat()
    this.safeCall('onConnect', event)
    this.processQueue()
  }

  /**
   * Forwards an inbound frame to `onMessage`.
   * Frames that `MessageUtils.isPongMessage` accepts are skipped, string
   * frames go through `MessageUtils.safeParseJSON`, and falsy results are
   * discarded without touching the statistics.
   */
  private handleMessage = (event: MessageEvent): void => {
    const raw = event.data
    if (MessageUtils.isPongMessage(raw)) return

    const data = typeof raw === 'string' ? MessageUtils.safeParseJSON(raw) : raw
    if (!data) return

    this.stats.messagesReceived++
    this.stats.lastMessageTime = Date.now()
    this.safeCall('onMessage', data)
  }

  /** Resets state, notifies `onDisconnect` and schedules a reconnect unless the close code is 1000. */
  private handleClose = (event: CloseEvent): void => {
    this.connectionState = false
    this.isConnecting = false
    this.stopHeartbeat()

    this.safeCall('onDisconnect', event)

    if (event.code !== 1000 && this.reconnectAttempts < this.maxReconnectAttempts) {
      this.scheduleReconnect()
    }
  }

  /** Resets state, reports the error wrapped as `WEBSOCKET_ERROR` and schedules a reconnect. */
  private handleError = (error: Event): void => {
    this.connectionState = false
    this.isConnecting = false

    this.safeCall('onError', {
      type: 'WEBSOCKET_ERROR',
      error,
    })

    // scheduleReconnect() is a no-op while a reconnect timer is already pending.
    this.scheduleReconnect()
  }

  /* =======================
   * Sending messages
   * ======================= */

  /**
   * Sends a message as JSON, stamping `timestamp` and defaulting `retryCount` to 0.
   *
   * When the socket is not open the message is queued and a connection attempt
   * is started; when `send` throws, the message is queued and `onError` fires.
   *
   * @returns `true` if the message was handed to the socket, `false` if it was queued.
   */
  sendMessage(message: QueuedMessage): boolean {
    const data = {
      ...message,
      timestamp: Date.now(),
      retryCount: message.retryCount ?? 0,
    }

    const socket = this.ws
    if (!socket || !this.isConnected()) {
      this.messageQueue.push(data)
      this.connectInBackground()
      return false
    }

    try {
      socket.send(JSON.stringify(data))
      this.stats.messagesSent++
      return true
    } catch (error) {
      this.messageQueue.push(data)
      this.safeCall('onError', error)
      return false
    }
  }

  /**
   * Flushes queued messages once, in order.
   *
   * Works on a snapshot so that messages re-queued by a failing `sendMessage`
   * are not retried again in the same pass. A message already retried 3 times
   * is dropped and counted. If the link goes down mid-flush, the unsent
   * remainder is put back without consuming a retry.
   */
  private processQueue(): void {
    const pending = this.messageQueue
    this.messageQueue = []

    for (let i = 0; i < pending.length; i++) {
      if (!this.isConnected()) {
        // Anything sendMessage re-queued during this pass precedes these in send order.
        this.messageQueue.push(...pending.slice(i))
        return
      }

      const msg = pending[i]
      const retries = msg.retryCount ?? 0
      if (retries >= 3) {
        this.stats.messagesDropped++
        continue
      }

      msg.retryCount = retries + 1
      this.sendMessage(msg)
    }
  }

  /* =======================
   * Heartbeat & reconnect
   * ======================= */

  /** (Re)starts the interval that sends a heartbeat message while connected. */
  private startHeartbeat(): void {
    this.stopHeartbeat()
    this.heartbeatTimer = window.setInterval(() => {
      // Skip rather than queue: a heartbeat is only meaningful on a live socket.
      if (!this.isConnected()) return

      this.sendMessage({
        messageType: '3',
        messageContent: 'heartbeat',
        messageId: IdUtils.generateMessageId(),
        conversationId: this.callbacks.getConversationId(),
        agentId: this.callbacks.getAgentId(),
      })
    }, this.heartbeatInterval)
  }

  /** Stops the heartbeat interval; the timer field takes whatever `TimerUtils.clearTimer` returns. */
  private stopHeartbeat(): void {
    this.heartbeatTimer = TimerUtils.clearTimer(this.heartbeatTimer, 'interval')
  }

  /**
   * Schedules one reconnect attempt after `reconnectInterval * 1.5^(attempt - 1)`
   * milliseconds, capped at 30000.
   *
   * Does nothing when the attempt budget is spent or a reconnect is already
   * pending. A failing socket fires both `error` and `close`, so without the
   * second check one failure would arm two timers and count two attempts.
   */
  private scheduleReconnect(): void {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) return
    if (this.reconnectTimer !== null) return

    this.reconnectAttempts++
    this.stats.reconnectCount++

    const delay = Math.min(
      this.reconnectInterval * Math.pow(1.5, this.reconnectAttempts - 1),
      30000
    )

    this.reconnectTimer = window.setTimeout(() => {
      this.reconnectTimer = null
      this.connectInBackground()
    }, delay)
  }

  /* =======================
   * Public API
   * ======================= */

  /** `true` only when the manager believes it is connected and the socket is in the OPEN state. */
  isConnected(): boolean {
    return (
      this.connectionState &&
      !!this.ws &&
      this.ws.readyState === WebSocket.OPEN
    )
  }

  /** Returns a copy of the counters plus the current queue length and connection status. */
  getStats(): WebSocketStats & { queueLength: number; isConnected: boolean } {
    return {
      ...this.stats,
      queueLength: this.messageQueue.length,
      isConnected: this.isConnected(),
    }
  }

  /**
   * Shuts the connection down for good: stops the heartbeat, cancels any
   * pending reconnect, closes the socket with code 1000, clears the queue and
   * resets the reconnect budget so a later `init()` starts fresh.
   */
  close(): void {
    this.stopHeartbeat()
    TimerUtils.clearTimer(this.reconnectTimer)
    this.reconnectTimer = null

    // Detach before closing so the socket's trailing error/close events are
    // recognised as stale and cannot trigger a reconnect.
    const socket = this.ws
    this.ws = null
    socket?.close(1000)

    this.connectionState = false
    this.isConnecting = false
    this.reconnectAttempts = 0
    this.messageQueue = []
  }

  /** Alias for {@link WebSocketManager.close}. */
  destroy(): void {
    this.close()
  }
}

export default WebSocketManager