import { EventEmitter } from 'node:events'; import { createServer } from 'node:net'; import { join } from 'node:path'; import { BrowserWindow } from 'electron'; import { windowManager } from '@electron/service/window-service'; import logManager from '@electron/service/logger'; import configManager from '@electron/service/config-service'; import { updateTrayStatus } from '@electron/service/tray'; import { getUserDataDir } from '@electron/utils/paths'; import { captureTelemetryEvent, trackMetric } from '@electron/utils/telemetry'; import WebSocket from 'ws'; import { loadOrCreateDeviceIdentity, type DeviceIdentity, } from '@electron/utils/device-identity'; import { CONFIG_KEYS } from '@runtime/lib/constants'; import { normalizeAgentSessionKey } from '@runtime/lib/models'; import type { ContentBlock, RawMessage } from '@runtime/shared/chat-model'; import type { GatewayEvent, GatewayRpcParams, RuntimeRefreshTopic } from './types'; import { createInitialGatewayDiagnostics, type GatewayDiagnosticsSnapshot, } from './diagnostics'; import { dispatchJsonRpcNotification, dispatchProtocolEvent } from './event-dispatch'; import { OpenClawProcessOwner } from './openclaw-process-owner'; import { launchGatewayProcess } from './process-launcher'; import { prepareGatewayLaunchContext } from './config-sync'; import { DEFAULT_RECONNECT_CONFIG, type ReconnectConfig, type GatewayLifecycleState, getReconnectScheduleDecision, getReconnectSkipReason, } from './process-policy'; import { isNotification, isResponse, type JsonRpcNotification } from './protocol'; import { clearPendingGatewayRequests, rejectPendingGatewayRequest, resolvePendingGatewayRequest, type PendingGatewayRequest, } from './request-store'; import { classifyGatewayStderrMessage, recordGatewayStartupStderrLine } from './startup-stderr'; import { runGatewayStartupSequence } from './startup-orchestrator'; import { findExistingGatewayProcess, runOpenClawDoctorRepair, terminateOwnedGatewayProcess, unloadLaunchctlGatewayService, waitForPortFree, warmupManagedPythonReadiness, } from './supervisor'; import type { GatewayProcessHandle } from './process-handle'; import { GatewayConnectionMonitor } from './connection-monitor'; import { GatewayLifecycleController, LifecycleSupersededError } from './lifecycle-controller'; import { GatewayRestartController } from './restart-controller'; import { GatewayRestartGovernor } from './restart-governor'; import { DEFAULT_GATEWAY_RELOAD_POLICY, loadGatewayReloadPolicy, type GatewayReloadPolicy, } from './reload-policy'; import { GatewayStateController, type GatewayRuntimeStatus } from './state'; import { connectGatewaySocket, waitForGatewayReady } from './ws-client'; import { dispatchGatewayRpcMethod } from './rpc-dispatch'; import { createRandomId } from './random-id'; type RuntimeChangeBroadcast = { topics: RuntimeRefreshTopic[]; reason?: string; warnings?: string[]; channelType?: string; accountId?: string; }; type GatewayStatus = 'connected' | 'disconnected' | 'reconnecting'; type GatewayResponseFrame = { type?: string; id?: string; ok?: boolean; payload?: unknown; error?: unknown; }; type GatewayEventFrame = { type?: string; event?: string; payload?: unknown; }; type GatewayNotificationEvent = | JsonRpcNotification | { method: string; params?: unknown; }; type GatewayToolStatusNotification = { status: 'running' | 'completed' | 'error'; payload?: unknown; }; export interface GatewayManagerEvents { status: (status: GatewayRuntimeStatus) => void; message: (message: unknown) => void; notification: (notification: GatewayNotificationEvent) => void; exit: (code: number | null) => void; error: (error: Error) => void; 'channel:status': (data: { channelId: string; status: string }) => void; 'chat:message': (data: { message: unknown }) => void; 'gateway:ready': (payload: unknown) => void; 'tool:status': (payload: GatewayToolStatusNotification) => void; } function isRecord(value: unknown): value is Record { return typeof value === 'object' && value !== null; } function toErrorMessage(error: unknown): string { if (error instanceof Error) { return error.message; } return String(error); } function isTransportRpcFailure(error: unknown): boolean { const message = error instanceof Error ? error.message : String(error); return message.includes('Gateway RPC timed out:') || message.includes('OpenClaw Gateway socket is not connected') || message.includes('Gateway request cancelled:') || message.includes('Failed to send Gateway RPC'); } function normalizeTimestamp(value: unknown): number | undefined { if (typeof value === 'number' && Number.isFinite(value)) { return value; } if (typeof value === 'string') { const parsed = Date.parse(value); if (Number.isFinite(parsed)) { return parsed; } } return undefined; } function normalizeMessageRole(value: unknown): RawMessage['role'] { const normalized = typeof value === 'string' ? value.trim().toLowerCase() : ''; switch (normalized) { case 'user': case 'assistant': case 'system': case 'tool_result': case 'toolresult': return normalized === 'tool_result' ? 'tool_result' : (normalized as RawMessage['role']); case 'tool': return 'toolresult'; default: return 'assistant'; } } function toTextContentBlocks(text: string): ContentBlock[] { return text ? [{ type: 'text', text }] : []; } function normalizeGatewayRawMessage( value: unknown, options?: { preferContentBlocks?: boolean }, ): RawMessage | null { if (!isRecord(value)) { return null; } const rawContent = value.content; let content: RawMessage['content'] = ''; if (typeof rawContent === 'string') { content = options?.preferContentBlocks ? toTextContentBlocks(rawContent) : rawContent; } else if (Array.isArray(rawContent)) { content = rawContent as ContentBlock[]; } else if (typeof value.text === 'string') { content = toTextContentBlocks(value.text); } return { ...(value as Partial), role: normalizeMessageRole(value.role), content, timestamp: normalizeTimestamp(value.timestamp), }; } function extractTextFromMessageContent(content: RawMessage['content'] | unknown): string { if (typeof content === 'string') { return content; } if (!Array.isArray(content)) { return ''; } return content .map((block) => { if (!isRecord(block)) { return ''; } if (typeof block.text === 'string') { return block.text; } if (typeof block.thinking === 'string') { return block.thinking; } if (typeof block.content === 'string') { return block.content; } return ''; }) .filter(Boolean) .join('\n'); } function extractTextFromRawMessage(message: RawMessage): string { return extractTextFromMessageContent(message.content); } function extractTextFromGatewayPayload(payload: Record): string { if (typeof payload.delta === 'string') { return payload.delta; } if (typeof payload.text === 'string') { return payload.text; } const normalizedMessage = normalizeGatewayRawMessage(payload.message); if (normalizedMessage) { return extractTextFromRawMessage(normalizedMessage); } return ''; } function getRecordStringField(record: Record, ...keys: string[]): string | undefined { for (const key of keys) { const value = record[key]; if (typeof value === 'string' && value.trim()) { return value; } } return undefined; } function getRecordNumberField(record: Record, ...keys: string[]): number | undefined { for (const key of keys) { const value = record[key]; if (typeof value === 'number' && Number.isFinite(value)) { return value; } } return undefined; } function normalizeGatewayToolStatus(value: unknown): 'running' | 'completed' | 'error' { if (typeof value !== 'string') { return 'completed'; } switch (value.trim().toLowerCase()) { case 'running': case 'completed': case 'error': return value.trim().toLowerCase() as 'running' | 'completed' | 'error'; case 'failed': return 'error'; default: return 'completed'; } } function normalizeToolStatusEvent(value: GatewayToolStatusNotification): GatewayEvent | null { const payload = isRecord(value.payload) ? value.payload : {}; const sessionKey = getRecordStringField(payload, 'sessionKey', 'session_key'); const runId = getRecordStringField(payload, 'runId', 'run_id'); const toolName = getRecordStringField(payload, 'toolName', 'tool_name', 'name'); if (!sessionKey || !runId || !toolName) { return null; } const toolCallId = getRecordStringField(payload, 'toolCallId', 'tool_call_id', 'id'); const summary = getRecordStringField(payload, 'summary', 'message'); const durationMs = getRecordNumberField(payload, 'durationMs', 'duration_ms'); const errorMessage = getRecordStringField(payload, 'error'); const status = errorMessage ? 'error' : normalizeGatewayToolStatus(payload.status ?? value.status); return { type: 'tool:status', sessionKey: normalizeAgentSessionKey(sessionKey), runId, toolCallId, toolName, status, updatedAt: normalizeTimestamp(payload.timestamp) ?? Date.now(), summary: errorMessage ?? summary, durationMs, input: payload.input ?? payload.arguments ?? payload.args, result: payload.result, }; } function buildGatewayRpcError(error: unknown, fallback: string): Error { if (typeof error === 'string' && error.trim()) { return new Error(error); } if (isRecord(error) && typeof error.message === 'string' && error.message.trim()) { return new Error(error.message); } return new Error(fallback); } async function findAvailablePort(): Promise { return await new Promise((resolve, reject) => { const server = createServer(); server.once('error', reject); server.listen(0, '127.0.0.1', () => { const address = server.address(); const port = typeof address === 'object' && address ? address.port : 0; server.close((error) => { if (error) { reject(error); return; } if (!port) { reject(new Error('Failed to allocate an available Gateway port')); return; } resolve(port); }); }); }); } export class GatewayManager extends EventEmitter { private initialized = false; private initPromise: Promise | null = null; private startPromise: Promise | null = null; private stopPromise: Promise | null = null; private status: GatewayStatus = 'disconnected'; private lifecycleState: GatewayLifecycleState = 'stopped'; private gatewayStatusDetails: GatewayRuntimeStatus = { state: 'stopped', port: null }; private readonly mode = 'openclaw' as const; private readonly processOwner = new OpenClawProcessOwner(); private readonly pendingRequests = new Map(); private readonly deltaSnapshots = new Map(); private gatewayToken = createRandomId(); private socket: WebSocket | null = null; private child: GatewayProcessHandle | null = null; private port: number | null = null; private exitCode: number | null = null; private lastError?: string; private stopping = false; private deviceIdentity: DeviceIdentity | null = null; private ownsProcess = false; private externalShutdownSupported: boolean | null = null; private reconnectTimer: NodeJS.Timeout | null = null; private reconnectAttempts = 0; private reconnectAttemptsTotal = 0; private reconnectSuccessTotal = 0; private reconnectConfig: ReconnectConfig; private shouldReconnect = true; private startLock = false; private restartInFlight: Promise | null = null; private readonly connectionMonitor = new GatewayConnectionMonitor(); private readonly lifecycleController = new GatewayLifecycleController(); private readonly restartController = new GatewayRestartController(); private readonly restartGovernor = new GatewayRestartGovernor(); private reloadDebounceTimer: NodeJS.Timeout | null = null; private reloadPolicy: GatewayReloadPolicy = { ...DEFAULT_GATEWAY_RELOAD_POLICY }; private reloadPolicyLoadedAt = 0; private reloadPolicyRefreshPromise: Promise | null = null; private isAutoReconnectStart = false; private pendingRuntimeChange: RuntimeChangeBroadcast | null = null; private lastSpawnSummary: string | null = null; private recentStartupStderrLines: string[] = []; private readonly stateController: GatewayStateController; private gatewayReadyFallbackTimer: NodeJS.Timeout | null = null; private diagnostics: GatewayDiagnosticsSnapshot = createInitialGatewayDiagnostics(); private static readonly RELOAD_POLICY_REFRESH_MS = 15_000; private static readonly HEARTBEAT_INTERVAL_MS = 30_000; private static readonly HEARTBEAT_TIMEOUT_MS = 12_000; private static readonly HEARTBEAT_MAX_MISSES = 3; private static readonly HEARTBEAT_INTERVAL_MS_WIN = 60_000; private static readonly HEARTBEAT_TIMEOUT_MS_WIN = 25_000; private static readonly HEARTBEAT_MAX_MISSES_WIN = 5; private static readonly GATEWAY_READY_FALLBACK_MS = 30_000; override on(eventName: 'status', listener: GatewayManagerEvents['status']): this; override on(eventName: 'message', listener: GatewayManagerEvents['message']): this; override on(eventName: 'notification', listener: GatewayManagerEvents['notification']): this; override on(eventName: 'exit', listener: GatewayManagerEvents['exit']): this; override on(eventName: 'error', listener: GatewayManagerEvents['error']): this; override on(eventName: 'channel:status', listener: GatewayManagerEvents['channel:status']): this; override on(eventName: 'chat:message', listener: GatewayManagerEvents['chat:message']): this; override on(eventName: 'gateway:ready', listener: GatewayManagerEvents['gateway:ready']): this; override on(eventName: 'tool:status', listener: GatewayManagerEvents['tool:status']): this; override on(eventName: string | symbol, listener: (...args: any[]) => void): this; override on(eventName: string | symbol, listener: (...args: any[]) => void): this { return super.on(eventName, listener); } override once(eventName: 'status', listener: GatewayManagerEvents['status']): this; override once(eventName: 'message', listener: GatewayManagerEvents['message']): this; override once(eventName: 'notification', listener: GatewayManagerEvents['notification']): this; override once(eventName: 'exit', listener: GatewayManagerEvents['exit']): this; override once(eventName: 'error', listener: GatewayManagerEvents['error']): this; override once(eventName: 'channel:status', listener: GatewayManagerEvents['channel:status']): this; override once(eventName: 'chat:message', listener: GatewayManagerEvents['chat:message']): this; override once(eventName: 'gateway:ready', listener: GatewayManagerEvents['gateway:ready']): this; override once(eventName: 'tool:status', listener: GatewayManagerEvents['tool:status']): this; override once(eventName: string | symbol, listener: (...args: any[]) => void): this; override once(eventName: string | symbol, listener: (...args: any[]) => void): this { return super.once(eventName, listener); } override off(eventName: 'status', listener: GatewayManagerEvents['status']): this; override off(eventName: 'message', listener: GatewayManagerEvents['message']): this; override off(eventName: 'notification', listener: GatewayManagerEvents['notification']): this; override off(eventName: 'exit', listener: GatewayManagerEvents['exit']): this; override off(eventName: 'error', listener: GatewayManagerEvents['error']): this; override off(eventName: 'channel:status', listener: GatewayManagerEvents['channel:status']): this; override off(eventName: 'chat:message', listener: GatewayManagerEvents['chat:message']): this; override off(eventName: 'gateway:ready', listener: GatewayManagerEvents['gateway:ready']): this; override off(eventName: 'tool:status', listener: GatewayManagerEvents['tool:status']): this; override off(eventName: string | symbol, listener: (...args: any[]) => void): this; override off(eventName: string | symbol, listener: (...args: any[]) => void): this { return super.off(eventName, listener); } constructor(config?: Partial) { super(); this.reconnectConfig = { ...DEFAULT_RECONNECT_CONFIG, ...config }; this.stateController = new GatewayStateController({ emitStatus: (status) => { this.gatewayStatusDetails = status; this.lifecycleState = status.state; this.status = this.toGatewayStatus(status.state); this.emit('status', { ...status }); updateTrayStatus(this.status); this.broadcast({ type: 'gateway:status', status: this.status }); }, onTransition: (previousState, nextState) => { if (nextState === 'running') { this.restartGovernor.onRunning(); } this.restartController.flushDeferredRestart( `status:${previousState}->${nextState}`, { state: this.lifecycleState, startLock: this.startLock, shouldReconnect: this.shouldReconnect, }, () => { void this.restart().catch((error) => { logManager.warn('Deferred Gateway restart failed:', error); }); }, ); }, }); this.on('gateway:ready', () => { this.clearGatewayReadyFallback(); if (this.lifecycleState === 'running' && !this.gatewayStatusDetails.gatewayReady) { logManager.info('Gateway subsystems ready (event received)'); this.setGatewayState({ gatewayReady: true }); } }); this.on('tool:status', (payload) => { const event = normalizeToolStatusEvent(payload); if (event) { this.broadcast(event); } }); this.on('error', (error) => { logManager.debug('GatewayManager emitted error event', error); }); } private sanitizeSpawnArgs(args: string[]): string[] { const sanitized = [...args]; const tokenIndex = sanitized.indexOf('--token'); if (tokenIndex >= 0 && tokenIndex + 1 < sanitized.length) { sanitized[tokenIndex + 1] = ''; } return sanitized; } private isUnsupportedShutdownError(error: unknown): boolean { const message = error instanceof Error ? error.message : String(error); return /unknown method:\s*shutdown/i.test(message); } private toGatewayStatus(state: GatewayLifecycleState): GatewayStatus { switch (state) { case 'running': return 'connected'; case 'starting': case 'reconnecting': return 'reconnecting'; case 'stopped': case 'error': default: return 'disconnected'; } } private setGatewayState(update: Partial): void { this.stateController.setStatus(update); } private queueRuntimeChange(change?: RuntimeChangeBroadcast): void { if (!change) { return; } if (!this.pendingRuntimeChange) { this.pendingRuntimeChange = { topics: [...change.topics], reason: change.reason, warnings: change.warnings ? [...change.warnings] : undefined, channelType: change.channelType, accountId: change.accountId, }; return; } this.pendingRuntimeChange = { topics: Array.from(new Set([...this.pendingRuntimeChange.topics, ...change.topics])), reason: change.reason ?? this.pendingRuntimeChange.reason, warnings: Array.from(new Set([ ...(this.pendingRuntimeChange.warnings ?? []), ...(change.warnings ?? []), ])), channelType: change.channelType ?? this.pendingRuntimeChange.channelType, accountId: change.accountId ?? this.pendingRuntimeChange.accountId, }; } private takeQueuedRuntimeChange(): RuntimeChangeBroadcast | undefined { if (!this.pendingRuntimeChange) { return undefined; } const change = this.pendingRuntimeChange; this.pendingRuntimeChange = null; return change; } private flushQueuedRuntimeChangeFailure(prefix: string, error: unknown): void { const runtimeChange = this.takeQueuedRuntimeChange(); if (!runtimeChange) { return; } const warning = `${prefix}: ${toErrorMessage(error)}`; this.notifyRuntimeChanged({ ...runtimeChange, warnings: [...(runtimeChange.warnings ?? []), warning], }); } private async initDeviceIdentity(): Promise { if (this.deviceIdentity) { return; } try { const identityPath = join(getUserDataDir(), 'openclaw-device-identity.json'); this.deviceIdentity = await loadOrCreateDeviceIdentity(identityPath); logManager.info('OpenClaw Gateway device identity loaded', { deviceId: this.deviceIdentity.deviceId, }); } catch (error) { logManager.warn('Failed to load OpenClaw device identity; scopes may be limited:', error); } } private async terminateChild(child: GatewayProcessHandle): Promise { await terminateOwnedGatewayProcess(child); } private async disposeTransport(reason: string): Promise { const socket = this.socket; this.socket = null; this.clearGatewayReadyFallback(); if (socket) { try { socket.terminate(); } catch (error) { logManager.warn(`Failed to close OpenClaw Gateway socket during ${reason}:`, error); } } const child = this.child; this.child = null; this.port = null; this.ownsProcess = false; if (child && this.exitCode === null) { this.exitCode = -1; } else if (!child) { this.exitCode = null; } this.deltaSnapshots.clear(); clearPendingGatewayRequests( this.pendingRequests, new Error(`Gateway request cancelled: ${reason}`), ); this.setGatewayState({ port: this.port, pid: undefined, connectedAt: undefined, uptime: undefined, gatewayReady: undefined, }); if (child) { await this.terminateChild(child); } } private handleGatewayProcessExit(child: GatewayProcessHandle, code: number | null): void { if (this.child !== child) { return; } this.exitCode = code ?? -1; this.child = null; this.ownsProcess = false; this.connectionMonitor.clear(); this.clearGatewayReadyFallback(); this.emit('exit', code); if (this.stopping) { return; } this.lastError = `OpenClaw Gateway exited unexpectedly (code=${code ?? 'unknown'})`; this.socket = null; this.deltaSnapshots.clear(); clearPendingGatewayRequests( this.pendingRequests, new Error(this.lastError), ); this.setGatewayState({ state: 'stopped', port: this.port, pid: undefined, error: this.lastError, connectedAt: undefined, uptime: undefined, gatewayReady: undefined, reconnectAttempts: this.reconnectAttempts, }); logManager.warn(this.lastError); this.scheduleReconnect(); } private handleGatewayProcessError(error: Error): void { this.lastError = toErrorMessage(error); logManager.error('OpenClaw Gateway process error:', error); this.emit('error', error); } private async spawnGatewayProcess(): Promise { if (this.port === null) { throw new Error('OpenClaw Gateway port not allocated'); } const launchContext = await prepareGatewayLaunchContext({ port: this.port, token: this.gatewayToken, }); await unloadLaunchctlGatewayService(); const stderrDedup = new Map(); const { child, lastSpawnSummary } = await launchGatewayProcess({ port: this.port, launchContext, sanitizeSpawnArgs: (args) => this.sanitizeSpawnArgs(args), onStdoutLine: (line) => { logManager.debug(`[OpenClaw stdout] ${line}`); }, onStderrLine: (line) => { recordGatewayStartupStderrLine(this.recentStartupStderrLines, line); const classified = classifyGatewayStderrMessage(line); if (classified.level === 'drop') { return; } const count = (stderrDedup.get(classified.normalized) ?? 0) + 1; stderrDedup.set(classified.normalized, count); if (count > 1) { if (count % 50 === 0) { logManager.debug(`[Gateway stderr] (suppressed ${count} repeats) ${classified.normalized}`); } return; } if (classified.level === 'debug') { logManager.debug(`[Gateway stderr] ${classified.normalized}`); return; } logManager.warn(`[Gateway stderr] ${classified.normalized}`); }, onExit: (exitedChild, code) => { this.handleGatewayProcessExit(exitedChild, code); }, onError: (error) => { this.handleGatewayProcessError(error); }, }); this.child = child; this.ownsProcess = true; this.lastSpawnSummary = lastSpawnSummary; this.setGatewayState({ port: this.port, pid: child.pid ?? undefined, error: undefined, reconnectAttempts: this.reconnectAttempts, }); } private async connectToGateway(port: number, externalToken?: string): Promise { const token = externalToken ?? this.gatewayToken; const socket = await connectGatewaySocket({ port, token, deviceIdentity: this.deviceIdentity, platform: process.platform, onMessage: (message) => this.handleGatewayFrame(message), onCloseAfterHandshake: (socket, code) => this.handleGatewaySocketClosed(socket, code), }); this.socket = socket; socket.on('pong', () => { this.connectionMonitor.markAlive('pong'); this.recordGatewayAlive(); }); this.recordGatewayAlive(); this.connectionMonitor.markAlive('message'); this.setGatewayState({ state: 'running', port, pid: this.child?.pid ?? this.gatewayStatusDetails.pid, error: undefined, connectedAt: Date.now(), reconnectAttempts: this.reconnectAttempts, gatewayReady: false, }); this.startPing(); this.startHealthCheck(); this.scheduleGatewayReadyFallback(); } private handleGatewaySocketClosed(socket: WebSocket, code: number): void { if (this.socket !== socket) { return; } if (this.stopping) { return; } this.socket = null; this.connectionMonitor.clear(); this.clearGatewayReadyFallback(); this.recordSocketClose(code); this.diagnostics.consecutiveHeartbeatMisses = 0; this.deltaSnapshots.clear(); this.lastError = `OpenClaw Gateway socket closed (code=${code})`; clearPendingGatewayRequests( this.pendingRequests, new Error(this.lastError), ); const wasRunning = this.lifecycleState === 'running'; this.setGatewayState({ state: 'stopped', port: this.port, pid: this.child?.pid ?? undefined, error: this.lastError, connectedAt: undefined, uptime: undefined, gatewayReady: undefined, reconnectAttempts: this.reconnectAttempts, }); logManager.warn(this.lastError); if (wasRunning && (process.platform !== 'win32' || code === 1012)) { this.scheduleReconnect(); } } private handleGatewayFrame(frame: unknown): void { this.connectionMonitor.markAlive('message'); this.recordGatewayAlive(); if (!isRecord(frame)) { return; } if (frame.type === 'res' && typeof frame.id === 'string') { const response = frame as GatewayResponseFrame; if (response.ok === false) { rejectPendingGatewayRequest( this.pendingRequests, response.id!, buildGatewayRpcError(response.error, `Gateway RPC failed: ${response.id}`), ); return; } resolvePendingGatewayRequest( this.pendingRequests, response.id!, response.payload, ); return; } if (frame.type === 'event' && typeof frame.event === 'string') { const eventFrame = frame as GatewayEventFrame; if (eventFrame.event === 'chat' && isRecord(eventFrame.payload)) { this.handleChatEvent(eventFrame.payload); } dispatchProtocolEvent(this, eventFrame.event, eventFrame.payload); return; } if (isResponse(frame) && frame.id && this.pendingRequests.has(String(frame.id))) { if (frame.error) { rejectPendingGatewayRequest( this.pendingRequests, String(frame.id), buildGatewayRpcError(frame.error, `Gateway RPC failed: ${String(frame.id)}`), ); } else { resolvePendingGatewayRequest(this.pendingRequests, String(frame.id), frame.result); } return; } if (isNotification(frame)) { dispatchJsonRpcNotification(this, frame as JsonRpcNotification); return; } this.emit('message', frame); } private handleChatEvent(payload: Record): void { const sessionKey = typeof payload.sessionKey === 'string' ? normalizeAgentSessionKey(payload.sessionKey) : ''; const runId = typeof payload.runId === 'string' ? payload.runId : ''; const state = typeof payload.state === 'string' ? payload.state : ''; if (!sessionKey || !runId || !state) { return; } switch (state) { case 'delta': { const nextSnapshot = extractTextFromGatewayPayload(payload); if (!nextSnapshot) { return; } const previousSnapshot = this.deltaSnapshots.get(runId) ?? ''; const delta = nextSnapshot.startsWith(previousSnapshot) ? nextSnapshot.slice(previousSnapshot.length) : nextSnapshot; this.deltaSnapshots.set(runId, nextSnapshot); if (delta) { this.broadcast({ type: 'chat:delta', sessionKey, runId, delta, }); } break; } case 'final': { const snapshotText = this.deltaSnapshots.get(runId) ?? ''; this.deltaSnapshots.delete(runId); const message = normalizeGatewayRawMessage(payload.message, { preferContentBlocks: true, }) ?? ( snapshotText ? { role: 'assistant', content: toTextContentBlocks(snapshotText), timestamp: Date.now(), } : null ); if (!message) { return; } this.broadcast({ type: 'chat:final', sessionKey, runId, message, }); break; } case 'error': { this.deltaSnapshots.delete(runId); this.broadcast({ type: 'chat:error', sessionKey, runId, error: typeof payload.errorMessage === 'string' ? payload.errorMessage : 'Gateway chat error', }); break; } case 'aborted': { this.deltaSnapshots.delete(runId); this.broadcast({ type: 'chat:aborted', sessionKey, runId, }); break; } default: break; } } private async sendGatewayRequest( method: string, params?: Record, options?: { timeoutMs?: number | null; skipStart?: boolean; }, ): Promise { if (!options?.skipStart) { await this.start(); } const socket = this.socket; if (!socket || socket.readyState !== WebSocket.OPEN) { throw new Error('OpenClaw Gateway socket is not connected'); } const requestId = `${method}-${randomUUID()}`; const timeoutMs = options?.timeoutMs ?? 30_000; return await new Promise((resolve, reject) => { const timeout = timeoutMs === null ? null : setTimeout(() => { rejectPendingGatewayRequest( this.pendingRequests, requestId, new Error(`Gateway RPC timed out: ${method}`), ); }, timeoutMs); this.pendingRequests.set(requestId, { resolve, reject, timeout, }); try { socket.send(JSON.stringify({ type: 'req', id: requestId, method, params, })); } catch (error) { rejectPendingGatewayRequest( this.pendingRequests, requestId, new Error(`Failed to send Gateway RPC ${method}: ${toErrorMessage(error)}`), ); } }).then((result) => { this.recordRpcSuccess(); return result; }).catch((error) => { if (isTransportRpcFailure(error)) { this.recordRpcFailure(method); } throw error; }); } private async rpcGateway( method: string, params: Record, options?: { timeoutMs?: number | null }, ): Promise { return await this.sendGatewayRequest(method, params, options); } private async requestGatewayShutdown(timeoutMs = 5_000): Promise { await this.sendGatewayRequest( 'shutdown', undefined, { timeoutMs, skipStart: true, }, ); } private startHealthCheck(): void { this.connectionMonitor.startHealthCheck({ shouldCheck: () => this.lifecycleState === 'running', checkHealth: () => this.checkHealth(), onUnhealthy: (errorMessage) => { logManager.warn(`Gateway health check failed: ${errorMessage}`); this.emit('error', new Error(errorMessage)); }, onError: () => { // The monitor already logged the error. }, }); } private startPing(): void { const isWindows = process.platform === 'win32'; this.connectionMonitor.startPing({ intervalMs: isWindows ? GatewayManager.HEARTBEAT_INTERVAL_MS_WIN : GatewayManager.HEARTBEAT_INTERVAL_MS, timeoutMs: isWindows ? GatewayManager.HEARTBEAT_TIMEOUT_MS_WIN : GatewayManager.HEARTBEAT_TIMEOUT_MS, maxConsecutiveMisses: isWindows ? GatewayManager.HEARTBEAT_MAX_MISSES_WIN : GatewayManager.HEARTBEAT_MAX_MISSES, sendPing: () => { if (this.socket?.readyState === WebSocket.OPEN) { this.socket.ping(); } }, onHeartbeatTimeout: ({ consecutiveMisses, timeoutMs }) => { this.recordHeartbeatTimeout(consecutiveMisses); logManager.warn( `Gateway heartbeat: ${consecutiveMisses} consecutive pong misses (timeout=${timeoutMs}ms, state=${this.lifecycleState}, autoReconnect=${this.shouldReconnect}).`, ); if (!this.shouldReconnect || this.lifecycleState !== 'running') { return; } if (process.platform === 'win32') { logManager.warn('Gateway heartbeat recovery skipped on Windows; waiting for process exit or socket close'); return; } void this.restart().catch((error) => { logManager.warn('Gateway heartbeat recovery failed:', error); }); }, }); } private async refreshReloadPolicy(force = false): Promise { const now = Date.now(); if (!force && now - this.reloadPolicyLoadedAt < GatewayManager.RELOAD_POLICY_REFRESH_MS) { return; } if (this.reloadPolicyRefreshPromise) { await this.reloadPolicyRefreshPromise; return; } this.reloadPolicyRefreshPromise = (async () => { const nextPolicy = await loadGatewayReloadPolicy(); this.reloadPolicy = nextPolicy; this.reloadPolicyLoadedAt = Date.now(); })(); try { await this.reloadPolicyRefreshPromise; } finally { this.reloadPolicyRefreshPromise = null; } } private clearAllTimers(): void { if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } this.connectionMonitor.clear(); this.restartController.clearDebounceTimer(); if (this.reloadDebounceTimer) { clearTimeout(this.reloadDebounceTimer); this.reloadDebounceTimer = null; } this.clearGatewayReadyFallback(); } private clearGatewayReadyFallback(): void { if (this.gatewayReadyFallbackTimer) { clearTimeout(this.gatewayReadyFallbackTimer); this.gatewayReadyFallbackTimer = null; } } private scheduleGatewayReadyFallback(): void { this.clearGatewayReadyFallback(); this.gatewayReadyFallbackTimer = setTimeout(() => { this.gatewayReadyFallbackTimer = null; if (this.lifecycleState === 'running' && !this.gatewayStatusDetails.gatewayReady) { logManager.info('Gateway ready fallback triggered (no gateway.ready event within timeout)'); this.setGatewayState({ gatewayReady: true }); } }, GatewayManager.GATEWAY_READY_FALLBACK_MS); } getDiagnostics(): GatewayDiagnosticsSnapshot { return { ...this.diagnostics }; } private recordGatewayAlive(): void { this.diagnostics.lastAliveAt = Date.now(); this.diagnostics.consecutiveHeartbeatMisses = 0; } private recordRpcSuccess(): void { this.diagnostics.lastRpcSuccessAt = Date.now(); this.diagnostics.consecutiveRpcFailures = 0; } private recordRpcFailure(method: string): void { this.diagnostics.lastRpcFailureAt = Date.now(); this.diagnostics.lastRpcFailureMethod = method; this.diagnostics.consecutiveRpcFailures += 1; } private recordHeartbeatTimeout(consecutiveMisses: number): void { this.diagnostics.lastHeartbeatTimeoutAt = Date.now(); this.diagnostics.consecutiveHeartbeatMisses = consecutiveMisses; } private recordSocketClose(code: number): void { this.diagnostics.lastSocketCloseAt = Date.now(); this.diagnostics.lastSocketCloseCode = code; } private emitReconnectMetric( outcome: 'success' | 'failure', payload: { attemptNo: number; maxAttempts: number; delayMs: number; error?: string; }, ): void { const successRate = this.reconnectAttemptsTotal > 0 ? this.reconnectSuccessTotal / this.reconnectAttemptsTotal : 0; trackMetric('gateway.reconnect', { outcome, attemptNo: payload.attemptNo, maxAttempts: payload.maxAttempts, delayMs: payload.delayMs, gateway_reconnect_success_count: this.reconnectSuccessTotal, gateway_reconnect_attempt_count: this.reconnectAttemptsTotal, gateway_reconnect_success_rate: Number(successRate.toFixed(4)), ...(payload.error ? { error: payload.error } : {}), }); } private scheduleReconnect(): void { const decision = getReconnectScheduleDecision({ shouldReconnect: this.shouldReconnect, hasReconnectTimer: this.reconnectTimer !== null, reconnectAttempts: this.reconnectAttempts, maxAttempts: this.reconnectConfig.maxAttempts, baseDelay: this.reconnectConfig.baseDelay, maxDelay: this.reconnectConfig.maxDelay, }); if (decision.action === 'skip') { return; } if (decision.action === 'already-scheduled') { return; } if (decision.action === 'fail') { this.lastError = 'Failed to reconnect after maximum attempts'; this.setGatewayState({ state: 'error', port: this.port, pid: this.child?.pid ?? this.gatewayStatusDetails.pid, error: this.lastError, reconnectAttempts: this.reconnectAttempts, gatewayReady: undefined, }); return; } const { delay, nextAttempt, maxAttempts } = decision; this.reconnectAttempts = nextAttempt; this.setGatewayState({ state: 'reconnecting', port: this.port, pid: this.child?.pid ?? this.gatewayStatusDetails.pid, error: this.lastError, reconnectAttempts: nextAttempt, gatewayReady: false, }); const scheduledEpoch = this.lifecycleController.getCurrentEpoch(); this.reconnectTimer = setTimeout(async () => { this.reconnectTimer = null; const skipReason = getReconnectSkipReason({ scheduledEpoch, currentEpoch: this.lifecycleController.getCurrentEpoch(), shouldReconnect: this.shouldReconnect, }); if (skipReason) { return; } const attemptNo = this.reconnectAttempts; this.reconnectAttemptsTotal += 1; try { this.isAutoReconnectStart = true; await this.start(); this.reconnectSuccessTotal += 1; this.emitReconnectMetric('success', { attemptNo, maxAttempts, delayMs: delay, }); this.reconnectAttempts = 0; } catch (error) { logManager.error( `Gateway reconnection attempt ${nextAttempt}/${maxAttempts} failed:`, error, ); this.emitReconnectMetric('failure', { attemptNo, maxAttempts, delayMs: delay, error: toErrorMessage(error), }); this.scheduleReconnect(); } }, delay); } debouncedRestart(delayMs = 2000, options?: RuntimeChangeBroadcast): void { this.queueRuntimeChange(options); this.restartController.debouncedRestart(delayMs, () => { void this.restart().catch((error) => { logManager.warn('Debounced Gateway restart failed:', error); this.flushQueuedRuntimeChangeFailure('Gateway restart failed', error); }); }); } async reload(options?: RuntimeChangeBroadcast): Promise { this.queueRuntimeChange(options); await this.refreshReloadPolicy(); if (this.reloadPolicy.mode === 'off' || this.reloadPolicy.mode === 'restart') { await this.restart(); return; } if (this.restartController.isRestartDeferred({ state: this.lifecycleState, startLock: this.startLock, })) { this.restartController.markDeferredRestart('reload', { state: this.lifecycleState, startLock: this.startLock, }); return; } if (!this.child?.pid || this.lifecycleState !== 'running') { await this.restart(); return; } if (process.platform === 'win32') { await this.restart(); return; } try { process.kill(this.child.pid, 'SIGUSR1'); await new Promise((resolve) => setTimeout(resolve, 1500)); if (this.lifecycleState !== 'running' || !this.child?.pid) { await this.restart(); return; } const runtimeChange = this.takeQueuedRuntimeChange(); if (runtimeChange) { this.notifyRuntimeChanged(runtimeChange); } } catch (error) { logManager.warn('Gateway reload signal failed, falling back to restart:', error); await this.restart(); } } debouncedReload(delayMs?: number, options?: RuntimeChangeBroadcast): void { this.queueRuntimeChange(options); void this.refreshReloadPolicy(); const effectiveDelay = delayMs ?? this.reloadPolicy.debounceMs; if (this.reloadPolicy.mode === 'off' || this.reloadPolicy.mode === 'restart') { this.debouncedRestart(effectiveDelay); return; } if (this.reloadDebounceTimer) { clearTimeout(this.reloadDebounceTimer); } this.reloadDebounceTimer = setTimeout(() => { this.reloadDebounceTimer = null; void this.reload().catch((error) => { logManager.warn('Debounced Gateway reload failed:', error); this.flushQueuedRuntimeChangeFailure('Gateway reload failed', error); }); }, effectiveDelay); } async init(): Promise { if (this.initialized) { return; } if (this.initPromise) { return await this.initPromise; } this.initPromise = (async () => { this.initialized = true; logManager.info('GatewayManager initialized in OpenClaw mode'); const autoStart = Boolean(configManager.get(CONFIG_KEYS.GATEWAY_AUTO_START)); if (!autoStart) { this.setGatewayState({ state: 'stopped', port: this.port, pid: this.child?.pid ?? this.gatewayStatusDetails.pid, gatewayReady: undefined, reconnectAttempts: this.reconnectAttempts, }); return; } try { await this.start(); } catch (error) { this.lastError = toErrorMessage(error); this.setGatewayState({ state: 'stopped', port: this.port, pid: this.child?.pid ?? this.gatewayStatusDetails.pid, error: this.lastError, gatewayReady: undefined, reconnectAttempts: this.reconnectAttempts, }); logManager.error('Failed to auto-start OpenClaw Gateway:', error); } })(); try { await this.initPromise; } finally { this.initPromise = null; } } async start(): Promise { if (this.startLock) { if (this.startPromise) { return await this.startPromise; } return; } if (this.lifecycleState === 'running' && this.socket?.readyState === WebSocket.OPEN) { return; } if (this.startPromise) { return await this.startPromise; } this.initialized = true; this.startPromise = (async () => { if (this.stopPromise) { await this.stopPromise; } this.startLock = true; const startEpoch = this.lifecycleController.bump('start'); this.stopping = false; this.lastError = undefined; this.shouldReconnect = true; await this.refreshReloadPolicy(true); try { await this.initDeviceIdentity(); await this.processOwner.prepare(); const runtimeStatus = this.processOwner.getStatus(); if (!runtimeStatus.entryExists) { throw new Error(runtimeStatus.lastError || `OpenClaw entry not found at ${runtimeStatus.runtimePaths.entryPath}`); } if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } const wasAutoReconnectStart = this.isAutoReconnectStart; if (!wasAutoReconnectStart) { this.reconnectAttempts = 0; } this.isAutoReconnectStart = false; const reusingManagedProcess = this.child?.pid != null && this.ownsProcess && this.port != null && this.socket === null; const shouldDisposeTransport = !reusingManagedProcess && (this.socket !== null || this.child !== null); if (shouldDisposeTransport) { await this.disposeTransport('starting OpenClaw Gateway'); } if (this.port === null) { this.port = await findAvailablePort(); } this.exitCode = null; if (!reusingManagedProcess) { this.gatewayToken = randomUUID(); } this.setGatewayState({ state: wasAutoReconnectStart ? 'reconnecting' : 'starting', port: this.port, pid: this.child?.pid ?? this.gatewayStatusDetails.pid, error: undefined, connectedAt: undefined, uptime: undefined, reconnectAttempts: this.reconnectAttempts, gatewayReady: false, }); warmupManagedPythonReadiness(); await runGatewayStartupSequence({ port: this.port, shouldWaitForPortFree: process.platform === 'win32', hasOwnedProcess: () => this.child?.pid != null && this.ownsProcess, assertLifecycle: (phase) => { this.lifecycleController.assert(startEpoch, phase); }, resetStartupStderrLines: () => { this.recentStartupStderrLines = []; }, getStartupStderrLines: () => this.recentStartupStderrLines, findExistingGateway: async (port) => { return await findExistingGatewayProcess({ port, ownedPid: this.child?.pid ?? null }); }, connect: async (port, externalToken) => { await this.connectToGateway(port, externalToken); }, onConnectedToExistingGateway: () => { const isOwnProcess = this.child?.pid != null && this.ownsProcess; if (!isOwnProcess) { this.ownsProcess = false; this.externalShutdownSupported = null; this.setGatewayState({ pid: undefined }); } if (isOwnProcess) { this.restartController.recordRestartCompleted(); } }, waitForPortFree: async (port) => { await waitForPortFree(port); }, startProcess: async () => { await this.spawnGatewayProcess(); }, waitForReady: async (port) => { await waitForGatewayReady({ port, getProcessExitCode: () => this.exitCode, }); }, onConnectedToManagedGateway: () => { logManager.info('OpenClaw Gateway startup sequence completed', { port: this.port, pid: this.child?.pid, spawn: this.lastSpawnSummary, }); }, runDoctorRepair: async () => await runOpenClawDoctorRepair(), onDoctorRepairSuccess: () => { this.lastError = undefined; this.setGatewayState({ state: 'starting', port: this.port, pid: this.child?.pid ?? this.gatewayStatusDetails.pid, error: undefined, reconnectAttempts: this.reconnectAttempts, gatewayReady: false, }); }, delay: async (ms) => { await new Promise((resolve) => setTimeout(resolve, ms)); }, }); this.lastError = undefined; this.reconnectAttempts = 0; this.setGatewayState({ state: 'running', port: this.port, pid: this.child?.pid ?? this.gatewayStatusDetails.pid, error: undefined, reconnectAttempts: 0, }); logManager.info('OpenClaw Gateway connected', { port: this.port, pid: this.child?.pid, spawn: this.lastSpawnSummary, }); } catch (error) { if (error instanceof LifecycleSupersededError) { logManager.debug(error.message); return; } this.lastError = toErrorMessage(error); await this.disposeTransport('failed OpenClaw Gateway start'); this.setGatewayState({ state: 'error', port: this.port, pid: undefined, error: this.lastError, connectedAt: undefined, uptime: undefined, reconnectAttempts: this.reconnectAttempts, gatewayReady: undefined, }); throw error; } finally { this.startLock = false; this.restartController.flushDeferredRestart( 'start:finally', { state: this.lifecycleState, startLock: this.startLock, shouldReconnect: this.shouldReconnect, }, () => { void this.restart().catch((error) => { logManager.warn('Deferred Gateway restart failed:', error); }); }, ); } })(); try { await this.startPromise; } finally { this.startPromise = null; } } async stop(): Promise { if (this.stopPromise) { return await this.stopPromise; } this.stopPromise = (async () => { this.lifecycleController.bump('stop'); this.shouldReconnect = false; this.stopping = true; this.clearAllTimers(); if (!this.ownsProcess && this.socket?.readyState === WebSocket.OPEN && this.externalShutdownSupported !== false) { try { await this.requestGatewayShutdown(); this.externalShutdownSupported = true; } catch (error) { if (this.isUnsupportedShutdownError(error)) { this.externalShutdownSupported = false; logManager.info('External Gateway does not support "shutdown"; skipping shutdown RPC for future stops'); } else { logManager.warn('Failed to request shutdown for externally managed Gateway:', error); } } } await this.disposeTransport('stopping OpenClaw Gateway'); this.restartController.resetDeferredRestart(); this.isAutoReconnectStart = false; this.reconnectAttempts = 0; this.lastError = undefined; this.diagnostics.consecutiveHeartbeatMisses = 0; this.setGatewayState({ state: 'stopped', port: this.port, pid: undefined, error: undefined, connectedAt: undefined, uptime: undefined, reconnectAttempts: 0, gatewayReady: undefined, }); this.stopping = false; })(); try { await this.stopPromise; } finally { this.stopPromise = null; } } async forceTerminateOwnedProcessForQuit(): Promise { if (!this.child || !this.ownsProcess) { return false; } if (this.socket) { try { this.socket.terminate(); } catch { // ignore quit-time socket termination errors } this.socket = null; } this.clearGatewayReadyFallback(); this.connectionMonitor.clear(); clearPendingGatewayRequests( this.pendingRequests, new Error('Gateway terminated during app quit'), ); const child = this.child; await this.terminateChild(child); if (this.child === child) { this.child = null; } this.ownsProcess = false; this.setGatewayState({ pid: undefined, connectedAt: undefined, uptime: undefined, gatewayReady: undefined, }); return true; } async restart(options?: RuntimeChangeBroadcast): Promise { this.queueRuntimeChange(options); if (this.restartController.isRestartDeferred({ state: this.lifecycleState, startLock: this.startLock, })) { this.restartController.markDeferredRestart('restart', { state: this.lifecycleState, startLock: this.startLock, }); return; } if (this.restartInFlight) { await this.restartInFlight; return; } const decision = this.restartGovernor.decide(); if (!decision.allow) { const observability = this.restartGovernor.getObservability(); logManager.warn( `[gateway-restart-governor] restart suppressed reason=${decision.reason} retryAfterMs=${decision.retryAfterMs} suppressed=${observability.suppressed_total} executed=${observability.executed_total} circuitOpenUntil=${observability.circuit_open_until}`, ); const props = { reason: decision.reason, retry_after_ms: decision.retryAfterMs, gateway_restart_suppressed_total: observability.suppressed_total, gateway_restart_executed_total: observability.executed_total, gateway_restart_circuit_open_until: observability.circuit_open_until, }; trackMetric('gateway.restart.suppressed', props); captureTelemetryEvent('gateway_restart_suppressed', props); return; } const pidBefore = this.gatewayStatusDetails.pid ?? this.child?.pid; this.restartInFlight = (async () => { this.setGatewayState({ state: 'reconnecting', port: this.port, pid: this.child?.pid ?? this.gatewayStatusDetails.pid, error: this.lastError, reconnectAttempts: this.reconnectAttempts, gatewayReady: false, }); await this.stop(); try { await this.start(); } catch (error) { logManager.warn('Gateway restart: start() failed after stop(), enabling auto-reconnect recovery', error); this.shouldReconnect = true; this.scheduleReconnect(); throw error; } })(); try { await this.restartInFlight; this.restartGovernor.recordExecuted(); this.restartController.recordRestartCompleted(); const observability = this.restartGovernor.getObservability(); const props = { gateway_restart_executed_total: observability.executed_total, gateway_restart_suppressed_total: observability.suppressed_total, gateway_restart_circuit_open_until: observability.circuit_open_until, pid_before: pidBefore ?? 'n/a', pid_after: this.gatewayStatusDetails.pid ?? this.child?.pid ?? 'n/a', }; trackMetric('gateway.restart.executed', props); captureTelemetryEvent('gateway_restart_executed', props); const runtimeChange = this.takeQueuedRuntimeChange(); if (runtimeChange) { this.notifyRuntimeChanged(runtimeChange); } } finally { this.restartInFlight = null; this.restartController.flushDeferredRestart( 'restart:finally', { state: this.lifecycleState, startLock: this.startLock, shouldReconnect: this.shouldReconnect, }, () => { void this.restart().catch((error) => { logManager.warn('Deferred Gateway restart failed:', error); }); }, ); } } getStatus(): { status: GatewayStatus; initialized: boolean; mode: 'openclaw'; port: number | null; pid: number | null; lastError?: string; lifecycleState: GatewayLifecycleState; gatewayReady?: boolean; connectedAt?: number; uptime?: number; reconnectAttempts?: number; diagnostics: GatewayDiagnosticsSnapshot; runtime: ReturnType; } { const runtimeStatus = this.stateController.getStatus(); const connectedAt = runtimeStatus.connectedAt; const uptime = runtimeStatus.state === 'running' && connectedAt ? Date.now() - connectedAt : undefined; return { status: this.status, initialized: this.initialized, mode: this.mode, port: runtimeStatus.port ?? this.port, pid: runtimeStatus.pid ?? this.child?.pid ?? null, lastError: this.lastError, lifecycleState: runtimeStatus.state, gatewayReady: runtimeStatus.gatewayReady, connectedAt, uptime, reconnectAttempts: runtimeStatus.reconnectAttempts, diagnostics: this.getDiagnostics(), runtime: this.processOwner.getStatus(), }; } async checkHealth(): Promise<{ ok: boolean; status: GatewayStatus; initialized: boolean; mode: 'openclaw'; port: number | null; pid: number | null; lastError?: string; lifecycleState: GatewayLifecycleState; gatewayReady?: boolean; connectedAt?: number; uptime?: number; reconnectAttempts?: number; diagnostics: GatewayDiagnosticsSnapshot; runtime: ReturnType; }> { const status = this.getStatus(); return { ok: status.initialized && status.status === 'connected' && this.socket?.readyState === WebSocket.OPEN, ...status, }; } async rpc(method: string, params: any): Promise { if (!this.initialized) { await this.init(); } const localDispatch = dispatchGatewayRpcMethod( method, params, (event) => this.broadcast(event), ); if (localDispatch.handled) { return localDispatch.result; } switch (method) { default: throw new Error(`Unknown gateway RPC method: ${method}`); } } broadcast(event: GatewayEvent): void { const mainWindow = BrowserWindow.getAllWindows().find( (win) => windowManager.getName(win) === 'main', ); if (mainWindow && !mainWindow.isDestroyed()) { mainWindow.webContents.send('gateway:event', event); } } notifyRuntimeChanged(options: RuntimeChangeBroadcast): void { const topics = Array.from(new Set(options.topics.filter(Boolean))); if (topics.length === 0) { return; } this.broadcast({ type: 'runtime:changed', topics, reason: options.reason, warnings: options.warnings && options.warnings.length > 0 ? options.warnings : undefined, channelType: options.channelType, accountId: options.accountId, syncedAt: new Date().toISOString(), }); } reloadProviders(options?: RuntimeChangeBroadcast): void { const runtimeChange: RuntimeChangeBroadcast = { topics: options?.topics ?? ['providers', 'models'], reason: options?.reason ?? 'providers:reload', warnings: options?.warnings, channelType: options?.channelType, accountId: options?.accountId, }; if (this.initialized && (this.status === 'connected' || this.status === 'reconnecting')) { this.debouncedReload(undefined, runtimeChange); return; } this.notifyRuntimeChanged(runtimeChange); } } export const gatewayManager = new GatewayManager();