import { randomUUID } from 'node:crypto';
import { createServer } from 'node:net';
import { join } from 'node:path';
import { BrowserWindow } from 'electron';
import { windowManager } from '@electron/service/window-service';
import logManager from '@electron/service/logger';
import configManager from '@electron/service/config-service';
import { updateTrayStatus } from '@electron/service/tray';
import { getUserDataDir } from '@electron/utils/paths';
import {
  loadOrCreateDeviceIdentity,
  type DeviceIdentity,
} from '@electron/utils/device-identity';
import { CONFIG_KEYS } from '@runtime/lib/constants';
import { normalizeAgentSessionKey } from '@runtime/lib/models';
import type { ContentBlock, RawMessage } from '@runtime/shared/chat-model';
import type { GatewayEvent, GatewayRpcParams, RuntimeRefreshTopic } from './types';
import * as providerHandlers from './handlers/provider';
import * as skillHandlers from './handlers/skills';
import { OpenClawProcessOwner } from './openclaw-process-owner';
import { launchGatewayProcess } from './process-launcher';
import {
  clearPendingGatewayRequests,
  rejectPendingGatewayRequest,
  resolvePendingGatewayRequest,
  type PendingGatewayRequest,
} from './request-store';
import { connectGatewaySocket, waitForGatewayReady } from './ws-client';

/** Payload describing which runtime topics changed and why; forwarded to the renderer. */
type RuntimeChangeBroadcast = {
  topics: RuntimeRefreshTopic[];
  reason?: string;
  warnings?: string[];
  channelType?: string;
  accountId?: string;
};

/** Connection state reported to the tray and to the renderer. */
type GatewayStatus = 'connected' | 'disconnected' | 'reconnecting';

/** Shape of a `type: 'res'` frame received on the gateway socket. */
type GatewayResponseFrame = {
  type?: string;
  id?: string;
  ok?: boolean;
  payload?: unknown;
  error?: unknown;
};

/** Shape of a `type: 'event'` frame received on the gateway socket. */
type GatewayEventFrame = {
  type?: string;
  event?: string;
  payload?: unknown;
};

/** Snapshot returned by `GatewayManager.getStatus()` and embedded in `checkHealth()`. */
type GatewayStatusSnapshot = {
  status: GatewayStatus;
  initialized: boolean;
  mode: 'openclaw';
  port: number | null;
  pid: number | null;
  lastError?: string;
  runtime: ReturnType<OpenClawProcessOwner['getStatus']>;
};

/** RPC timeout applied when the caller does not specify one. */
const DEFAULT_RPC_TIMEOUT_MS = 30_000;

/** How long `terminateChild` waits for an `exit` event before giving up. */
const CHILD_EXIT_GRACE_MS = 1500;

/** Type guard: true for any non-null value whose `typeof` is `'object'` (arrays included). */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

/** Returns `error.message` for `Error` instances and `String(error)` for anything else. */
function toErrorMessage(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  return String(error);
}

/**
 * Converts a timestamp-like value to a number.
 *
 * Finite numbers are returned unchanged; strings are run through `Date.parse`.
 *
 * @returns The numeric timestamp, or `undefined` when the value cannot be interpreted.
 */
function normalizeTimestamp(value: unknown): number | undefined {
  if (typeof value === 'number' && Number.isFinite(value)) {
    return value;
  }
  if (typeof value === 'string') {
    const parsed = Date.parse(value);
    if (Number.isFinite(parsed)) {
      return parsed;
    }
  }
  return undefined;
}

/**
 * Maps an arbitrary role value onto `RawMessage['role']`.
 *
 * The input is trimmed and lower-cased. Recognised roles pass through unchanged,
 * `'tool'` becomes `'toolresult'`, and anything else (including non-strings)
 * falls back to `'assistant'`.
 */
function normalizeMessageRole(value: unknown): RawMessage['role'] {
  const normalized = typeof value === 'string' ? value.trim().toLowerCase() : '';
  switch (normalized) {
    case 'user':
    case 'assistant':
    case 'system':
    case 'tool_result':
    case 'toolresult':
      return normalized as RawMessage['role'];
    case 'tool':
      return 'toolresult';
    default:
      return 'assistant';
  }
}

/** Wraps non-empty text in a single text content block; empty text yields an empty array. */
function toTextContentBlocks(text: string): ContentBlock[] {
  return text ? [{ type: 'text', text }] : [];
}

/**
 * Builds a `RawMessage` from a loosely typed gateway message object.
 *
 * Content resolution order: a string `content` (optionally wrapped into content
 * blocks), an array `content` (used as-is), then a string `text` field. When
 * none applies the content is the empty string. All other fields of `value`
 * are copied through; `role`, `content` and `timestamp` are overwritten with
 * their normalised forms.
 *
 * @param value - Candidate message object.
 * @param options - `preferContentBlocks` wraps string content into content blocks.
 * @returns The normalised message, or `null` when `value` is not an object.
 */
function normalizeGatewayRawMessage(
  value: unknown,
  options?: { preferContentBlocks?: boolean },
): RawMessage | null {
  if (!isRecord(value)) {
    return null;
  }

  const rawContent = value.content;
  let content: RawMessage['content'] = '';
  if (typeof rawContent === 'string') {
    content = options?.preferContentBlocks ? toTextContentBlocks(rawContent) : rawContent;
  } else if (Array.isArray(rawContent)) {
    content = rawContent as ContentBlock[];
  } else if (typeof value.text === 'string') {
    content = toTextContentBlocks(value.text);
  }

  return {
    ...(value as Partial<RawMessage>),
    role: normalizeMessageRole(value.role),
    content,
    timestamp: normalizeTimestamp(value.timestamp),
  };
}

/**
 * Flattens message content to plain text.
 *
 * Strings are returned unchanged. For arrays, each object block contributes the
 * first string found among its `text`, `thinking` and `content` fields; empty
 * contributions are dropped and the rest are joined with newlines. Any other
 * input yields the empty string.
 */
function extractTextFromMessageContent(content: RawMessage['content'] | unknown): string {
  if (typeof content === 'string') {
    return content;
  }
  if (!Array.isArray(content)) {
    return '';
  }

  return content
    .map((block) => {
      if (!isRecord(block)) {
        return '';
      }
      if (typeof block.text === 'string') {
        return block.text;
      }
      if (typeof block.thinking === 'string') {
        return block.thinking;
      }
      if (typeof block.content === 'string') {
        return block.content;
      }
      return '';
    })
    .filter(Boolean)
    .join('\n');
}

/** Plain-text view of a message's content. */
function extractTextFromRawMessage(message: RawMessage): string {
  return extractTextFromMessageContent(message.content);
}

/**
 * Extracts text from a chat event payload, preferring `payload.delta`, then
 * `payload.text`, then the text of `payload.message`. Returns `''` when none
 * is available.
 */
function extractTextFromGatewayPayload(payload: Record<string, unknown>): string {
  if (typeof payload.delta === 'string') {
    return payload.delta;
  }
  if (typeof payload.text === 'string') {
    return payload.text;
  }

  const normalizedMessage = normalizeGatewayRawMessage(payload.message);
  if (normalizedMessage) {
    return extractTextFromRawMessage(normalizedMessage);
  }
  return '';
}

/**
 * Converts an error value from a response frame into an `Error`.
 *
 * Uses a non-blank string error, or a non-blank string `message` field of an
 * object error; otherwise falls back to `fallback`.
 */
function buildGatewayRpcError(error: unknown, fallback: string): Error {
  if (typeof error === 'string' && error.trim()) {
    return new Error(error);
  }
  if (isRecord(error) && typeof error.message === 'string' && error.message.trim()) {
    return new Error(error.message);
  }
  return new Error(fallback);
}

/**
 * Asks the OS for a free TCP port on 127.0.0.1 by listening on port 0 and then
 * closing the probe server.
 *
 * @returns The port number that was assigned to the probe server.
 * @throws When listening or closing fails, or no port number could be read.
 */
async function findAvailablePort(): Promise<number> {
  return await new Promise<number>((resolve, reject) => {
    const server = createServer();
    server.once('error', reject);
    server.listen(0, '127.0.0.1', () => {
      const address = server.address();
      const port = typeof address === 'object' && address ? address.port : 0;
      server.close((error) => {
        if (error) {
          reject(error);
          return;
        }
        if (!port) {
          reject(new Error('Failed to allocate an available Gateway port'));
          return;
        }
        resolve(port);
      });
    });
  });
}

/**
 * Owns the OpenClaw Gateway child process and the WebSocket connected to it,
 * exposes an RPC facade over that socket, and relays gateway events to the
 * main window.
 */
class GatewayManager {
  private initialized = false;
  private initPromise: Promise<void> | null = null;
  private startPromise: Promise<void> | null = null;
  private stopPromise: Promise<void> | null = null;
  private status: GatewayStatus = 'disconnected';
  private readonly mode = 'openclaw' as const;
  private readonly processOwner = new OpenClawProcessOwner();
  /** In-flight RPCs keyed by request id. */
  private readonly pendingRequests = new Map<string, PendingGatewayRequest>();
  /** Last text seen per run id, used to derive incremental chat deltas. */
  private readonly deltaSnapshots = new Map<string, string>();
  private gatewayToken = randomUUID();
  private socket: WebSocket | null = null;
  private child: Electron.UtilityProcess | null = null;
  private port: number | null = null;
  private exitCode: number | null = null;
  private lastError?: string;
  /** True while an intentional stop is in progress; suppresses "unexpected" handling. */
  private stopping = false;
  private deviceIdentity: DeviceIdentity | null = null;

  /** Records the new status and pushes it to the tray and to the renderer. */
  private setStatus(status: GatewayStatus): void {
    this.status = status;
    updateTrayStatus(status);
    this.broadcast({ type: 'gateway:status', status });
  }

  /**
   * Loads (or creates) the device identity once and caches it.
   * Failures are logged as warnings and leave `deviceIdentity` as `null`.
   */
  private async initDeviceIdentity(): Promise<void> {
    if (this.deviceIdentity) {
      return;
    }

    try {
      const identityPath = join(getUserDataDir(), 'openclaw-device-identity.json');
      this.deviceIdentity = await loadOrCreateDeviceIdentity(identityPath);
      logManager.info('OpenClaw Gateway device identity loaded', {
        deviceId: this.deviceIdentity.deviceId,
      });
    } catch (error) {
      logManager.warn('Failed to load OpenClaw device identity; scopes may be limited:', error);
    }
  }

  /**
   * Kills `child` and resolves once it emits `exit`, once the grace period
   * elapses, or immediately if `kill()` throws. Never rejects.
   */
  private async terminateChild(child: Electron.UtilityProcess): Promise<void> {
    await new Promise<void>((resolve) => {
      let settled = false;
      let fallbackTimer: ReturnType<typeof setTimeout> | null = null;

      const finish = (): void => {
        if (settled) return;
        settled = true;
        // Do not leave the fallback timer pending once we have settled.
        if (fallbackTimer !== null) {
          clearTimeout(fallbackTimer);
        }
        resolve();
      };

      child.once('exit', () => {
        finish();
      });
      fallbackTimer = setTimeout(() => {
        finish();
      }, CHILD_EXIT_GRACE_MS);

      try {
        child.kill();
      } catch {
        finish();
      }
    });
  }

  /**
   * Tears down the socket and child process and cancels all pending RPCs.
   *
   * @param reason - Included in the socket-close warning and in the
   *   cancellation error given to pending requests.
   */
  private async disposeTransport(reason: string): Promise<void> {
    const socket = this.socket;
    this.socket = null;
    if (socket) {
      try {
        socket.close();
      } catch (error) {
        logManager.warn(`Failed to close OpenClaw Gateway socket during ${reason}:`, error);
      }
    }

    const child = this.child;
    // Detach first so the `exit` listener sees a stale child and returns early.
    this.child = null;
    this.port = null;
    if (child && this.exitCode === null) {
      this.exitCode = -1;
    } else if (!child) {
      this.exitCode = null;
    }

    this.deltaSnapshots.clear();
    clearPendingGatewayRequests(
      this.pendingRequests,
      new Error(`Gateway request cancelled: ${reason}`),
    );

    if (child) {
      await this.terminateChild(child);
    }
  }

  /**
   * Attaches `exit` and `error` listeners to the child process.
   *
   * An exit of the current child records the exit code; unless a stop is in
   * progress it is also treated as a failure: pending RPCs are rejected and
   * the status becomes `'disconnected'`.
   */
  private bindProcessLifecycle(child: Electron.UtilityProcess): void {
    child.on('exit', (code) => {
      // Ignore exits from a child that has already been replaced or disposed.
      if (this.child !== child) {
        return;
      }

      this.exitCode = code ?? -1;
      this.child = null;
      this.port = null;

      if (this.stopping) {
        return;
      }

      this.lastError = `OpenClaw Gateway exited unexpectedly (code=${code ?? 'unknown'})`;
      this.socket = null;
      this.deltaSnapshots.clear();
      clearPendingGatewayRequests(
        this.pendingRequests,
        new Error(this.lastError),
      );
      this.setStatus('disconnected');
      logManager.warn(this.lastError);
    });

    child.on('error', (error) => {
      this.lastError = toErrorMessage(error);
      logManager.error('OpenClaw Gateway process error:', error);
    });
  }

  /**
   * Handles a socket close reported after the handshake. Ignored for stale
   * sockets and during an intentional stop; otherwise pending RPCs are
   * rejected and the status becomes `'disconnected'`.
   */
  private handleGatewaySocketClosed(socket: WebSocket, code: number): void {
    if (this.socket !== socket) {
      return;
    }
    if (this.stopping) {
      return;
    }

    this.socket = null;
    this.deltaSnapshots.clear();
    this.lastError = `OpenClaw Gateway socket closed (code=${code})`;
    clearPendingGatewayRequests(
      this.pendingRequests,
      new Error(this.lastError),
    );
    this.setStatus('disconnected');
    logManager.warn(this.lastError);
  }

  /**
   * Dispatches one incoming frame: `res` frames settle the matching pending
   * request, `event` frames go to `handleGatewayEvent`; anything else is ignored.
   */
  private handleGatewayFrame(frame: unknown): void {
    if (!isRecord(frame)) {
      return;
    }

    if (frame.type === 'res' && typeof frame.id === 'string') {
      const requestId = frame.id;
      const response = frame as GatewayResponseFrame;
      if (response.ok === false) {
        rejectPendingGatewayRequest(
          this.pendingRequests,
          requestId,
          buildGatewayRpcError(response.error, `Gateway RPC failed: ${requestId}`),
        );
        return;
      }

      resolvePendingGatewayRequest(
        this.pendingRequests,
        requestId,
        response.payload,
      );
      return;
    }

    if (frame.type === 'event' && typeof frame.event === 'string') {
      this.handleGatewayEvent(frame as GatewayEventFrame);
    }
  }

  /** Routes gateway events by name; unknown events are ignored. */
  private handleGatewayEvent(event: GatewayEventFrame): void {
    switch (event.event) {
      case 'chat':
        if (isRecord(event.payload)) {
          this.handleChatEvent(event.payload);
        }
        break;
      case 'gateway.ready':
        logManager.info('OpenClaw Gateway reported ready');
        break;
      default:
        break;
    }
  }

  /**
   * Translates a gateway chat event into a renderer broadcast.
   *
   * Events missing `sessionKey`, `runId` or `state` are dropped. For `delta`
   * the extracted text is compared with the text stored for the run: when it
   * extends the stored text only the new suffix is broadcast, otherwise the
   * whole text is. `final`, `error` and `aborted` discard the stored text.
   */
  private handleChatEvent(payload: Record<string, unknown>): void {
    const sessionKey = typeof payload.sessionKey === 'string'
      ? normalizeAgentSessionKey(payload.sessionKey)
      : '';
    const runId = typeof payload.runId === 'string' ? payload.runId : '';
    const state = typeof payload.state === 'string' ? payload.state : '';
    if (!sessionKey || !runId || !state) {
      return;
    }

    switch (state) {
      case 'delta': {
        const nextSnapshot = extractTextFromGatewayPayload(payload);
        if (!nextSnapshot) {
          return;
        }

        const previousSnapshot = this.deltaSnapshots.get(runId) ?? '';
        const delta = nextSnapshot.startsWith(previousSnapshot)
          ? nextSnapshot.slice(previousSnapshot.length)
          : nextSnapshot;
        this.deltaSnapshots.set(runId, nextSnapshot);

        if (delta) {
          this.broadcast({
            type: 'chat:delta',
            sessionKey,
            runId,
            delta,
          });
        }
        break;
      }
      case 'final': {
        const snapshotText = this.deltaSnapshots.get(runId) ?? '';
        this.deltaSnapshots.delete(runId);

        // Prefer the message carried by the event; otherwise synthesise one
        // from the accumulated delta text, if there is any.
        const message = normalizeGatewayRawMessage(payload.message, {
          preferContentBlocks: true,
        }) ?? (
          snapshotText
            ? {
              role: 'assistant',
              content: toTextContentBlocks(snapshotText),
              timestamp: Date.now(),
            }
            : null
        );
        if (!message) {
          return;
        }

        this.broadcast({
          type: 'chat:final',
          sessionKey,
          runId,
          message,
        });
        break;
      }
      case 'error': {
        this.deltaSnapshots.delete(runId);
        this.broadcast({
          type: 'chat:error',
          sessionKey,
          runId,
          error: typeof payload.errorMessage === 'string'
            ? payload.errorMessage
            : 'Gateway chat error',
        });
        break;
      }
      case 'aborted': {
        this.deltaSnapshots.delete(runId);
        this.broadcast({
          type: 'chat:aborted',
          sessionKey,
          runId,
        });
        break;
      }
      default:
        break;
    }
  }

  /**
   * Sends one request frame over the gateway socket and waits for its response.
   *
   * Ensures the gateway is started first.
   *
   * @param method - Gateway method name; also used as the request-id prefix.
   * @param params - Parameters serialised into the request frame.
   * @param options - `timeoutMs` overrides the 30 s default; `null` disables
   *   the timeout entirely.
   * @returns The response payload.
   * @throws When the socket is not open, sending fails, the gateway reports
   *   failure, or the request times out.
   */
  private async rpcGateway(
    method: string,
    params: Record<string, unknown>,
    options?: { timeoutMs?: number | null },
  ): Promise<unknown> {
    await this.start();

    const socket = this.socket;
    if (!socket || socket.readyState !== WebSocket.OPEN) {
      throw new Error('OpenClaw Gateway socket is not connected');
    }

    const requestId = `${method}-${randomUUID()}`;
    // Only `undefined` selects the default. Using `??` here would also replace
    // an explicit `null` and make the "no timeout" branch below unreachable.
    const requestedTimeout = options?.timeoutMs;
    const timeoutMs = requestedTimeout === undefined ? DEFAULT_RPC_TIMEOUT_MS : requestedTimeout;

    return await new Promise<unknown>((resolve, reject) => {
      const timeout = timeoutMs === null
        ? null
        : setTimeout(() => {
          rejectPendingGatewayRequest(
            this.pendingRequests,
            requestId,
            new Error(`Gateway RPC timed out: ${method}`),
          );
        }, timeoutMs);

      this.pendingRequests.set(requestId, {
        resolve,
        reject,
        timeout,
      });

      try {
        socket.send(JSON.stringify({
          type: 'req',
          id: requestId,
          method,
          params,
        }));
      } catch (error) {
        rejectPendingGatewayRequest(
          this.pendingRequests,
          requestId,
          new Error(`Failed to send Gateway RPC ${method}: ${toErrorMessage(error)}`),
        );
      }
    });
  }

  /**
   * Marks the manager initialised and, when `CONFIG_KEYS.GATEWAY_AUTO_START`
   * is truthy, attempts to start the gateway. Concurrent calls share one
   * in-flight promise. Auto-start failures are logged and stored in
   * `lastError` rather than thrown.
   */
  async init(): Promise<void> {
    if (this.initialized) {
      return;
    }
    if (this.initPromise) {
      return await this.initPromise;
    }

    this.initPromise = (async () => {
      this.initialized = true;
      logManager.info('GatewayManager initialized in OpenClaw mode');

      const autoStart = Boolean(configManager.get(CONFIG_KEYS.GATEWAY_AUTO_START));
      if (!autoStart) {
        this.setStatus('disconnected');
        return;
      }

      try {
        await this.start();
      } catch (error) {
        this.lastError = toErrorMessage(error);
        this.setStatus('disconnected');
        logManager.error('Failed to auto-start OpenClaw Gateway:', error);
      }
    })();

    try {
      await this.initPromise;
    } finally {
      this.initPromise = null;
    }
  }

  /**
   * Launches the gateway process and connects the socket.
   *
   * Returns immediately when already connected with an open socket, and joins
   * an in-flight start if one exists. Waits for any in-flight stop first.
   * On failure the transport is disposed, the status becomes `'disconnected'`,
   * and the error is rethrown.
   */
  async start(): Promise<void> {
    if (this.status === 'connected' && this.socket?.readyState === WebSocket.OPEN) {
      return;
    }
    if (this.startPromise) {
      return await this.startPromise;
    }

    this.initialized = true;
    this.startPromise = (async () => {
      if (this.stopPromise) {
        await this.stopPromise;
      }

      this.stopping = false;
      this.lastError = undefined;
      this.setStatus('reconnecting');

      try {
        await this.initDeviceIdentity();
        await this.processOwner.prepare();

        const runtimeStatus = this.processOwner.getStatus();
        if (!runtimeStatus.entryExists) {
          throw new Error(
            runtimeStatus.lastError
              || `OpenClaw entry not found at ${runtimeStatus.runtimePaths.entryPath}`,
          );
        }

        // Drop any previous socket/child before launching a fresh process.
        await this.disposeTransport('starting OpenClaw Gateway');

        this.port = await findAvailablePort();
        this.exitCode = null;
        this.gatewayToken = randomUUID();

        const child = await launchGatewayProcess({
          port: this.port,
          token: this.gatewayToken,
          openclawDir: runtimeStatus.runtimePaths.resolvedDir,
          entryScript: runtimeStatus.runtimePaths.entryPath,
        });
        this.child = child;
        this.bindProcessLifecycle(child);

        await waitForGatewayReady({
          port: this.port,
          getProcessExitCode: () => this.exitCode,
        });

        this.socket = await connectGatewaySocket({
          port: this.port,
          token: this.gatewayToken,
          deviceIdentity: this.deviceIdentity,
          platform: process.platform,
          onMessage: (message) => this.handleGatewayFrame(message),
          onCloseAfterHandshake: (socket, code) => this.handleGatewaySocketClosed(socket, code),
        });

        this.lastError = undefined;
        this.setStatus('connected');
        logManager.info('OpenClaw Gateway connected', {
          port: this.port,
          pid: this.child?.pid,
        });
      } catch (error) {
        this.lastError = toErrorMessage(error);
        await this.disposeTransport('failed OpenClaw Gateway start');
        this.setStatus('disconnected');
        throw error;
      }
    })();

    try {
      await this.startPromise;
    } finally {
      this.startPromise = null;
    }
  }

  /**
   * Disposes the transport and sets the status to `'disconnected'`.
   * Concurrent calls share one in-flight promise.
   */
  async stop(): Promise<void> {
    if (this.stopPromise) {
      return await this.stopPromise;
    }

    this.stopPromise = (async () => {
      this.stopping = true;
      try {
        await this.disposeTransport('stopping OpenClaw Gateway');
        this.setStatus('disconnected');
      } finally {
        // Always clear the flag; otherwise a failed stop would leave the
        // exit/close handlers treating every later event as intentional.
        this.stopping = false;
      }
    })();

    try {
      await this.stopPromise;
    } finally {
      this.stopPromise = null;
    }
  }

  /**
   * Stops and then starts the gateway.
   *
   * @param options - When provided, a runtime-changed notification is
   *   broadcast after a successful restart.
   */
  async restart(options?: RuntimeChangeBroadcast): Promise<void> {
    this.setStatus('reconnecting');
    await this.stop();
    await this.start();
    if (options) {
      this.notifyRuntimeChanged(options);
    }
  }

  /** Returns a snapshot of the manager's current state. */
  getStatus(): GatewayStatusSnapshot {
    return {
      status: this.status,
      initialized: this.initialized,
      mode: this.mode,
      port: this.port,
      pid: this.child?.pid ?? null,
      lastError: this.lastError,
      runtime: this.processOwner.getStatus(),
    };
  }

  /**
   * Returns the status snapshot plus `ok`, which is true only when the manager
   * is initialised, reports `'connected'`, and the socket is open.
   */
  async checkHealth(): Promise<{ ok: boolean } & GatewayStatusSnapshot> {
    const status = this.getStatus();
    return {
      ok: status.initialized
        && status.status === 'connected'
        && this.socket?.readyState === WebSocket.OPEN,
      ...status,
    };
  }

  /**
   * Public RPC entry point. Initialises the manager on first use, then either
   * forwards the call to the gateway (`chat.*`, `session.*`) or serves it from
   * local handlers (`provider.*`, `skills.*`).
   *
   * @param method - RPC method name.
   * @param params - Method-specific parameters.
   * @returns A method-specific result.
   * @throws For unknown methods, gateway failures, or a `chat.send` response
   *   without a `runId`.
   */
  // The result shape varies per method, so the return type is intentionally loose.
  async rpc(method: string, params: any): Promise<any> {
    if (!this.initialized) {
      await this.init();
    }

    switch (method) {
      case 'chat.send': {
        const request = params as GatewayRpcParams['chat.send'];
        const sessionKey = normalizeAgentSessionKey(request.sessionKey);
        const messageText = extractTextFromRawMessage(request.message);
        const response = await this.rpcGateway('chat.send', {
          sessionKey,
          message: messageText,
          deliver: false,
          idempotencyKey: request.message.id || randomUUID(),
        }, { timeoutMs: 30_000 });

        const runId = (
          isRecord(response)
          && typeof response.runId === 'string'
          && response.runId.trim()
        )
          ? response.runId
          : '';
        if (!runId) {
          throw new Error('OpenClaw Gateway chat.send did not return a runId');
        }
        return { runId };
      }
      case 'chat.history': {
        const request = params as GatewayRpcParams['chat.history'];
        const response = await this.rpcGateway('chat.history', {
          sessionKey: normalizeAgentSessionKey(request.sessionKey),
          limit: request.limit ?? 50,
        }, { timeoutMs: 15_000 });

        if (!isRecord(response) || !Array.isArray(response.messages)) {
          return [];
        }
        return response.messages
          .map((message) => normalizeGatewayRawMessage(message))
          .filter((message): message is RawMessage => message !== null);
      }
      case 'chat.abort': {
        const request = params as GatewayRpcParams['chat.abort'];
        await this.rpcGateway('chat.abort', {
          sessionKey: normalizeAgentSessionKey(request.sessionKey),
        }, { timeoutMs: 10_000 });
        return;
      }
      case 'session.list': {
        const response = await this.rpcGateway('sessions.list', {}, { timeoutMs: 10_000 });
        if (!isRecord(response) || !Array.isArray(response.sessions)) {
          return [];
        }
        return response.sessions
          .map((session) => (
            isRecord(session) && typeof session.key === 'string' ? session.key : null
          ))
          .filter((sessionKey): sessionKey is string => Boolean(sessionKey));
      }
      case 'session.delete': {
        const request = params as GatewayRpcParams['session.delete'];
        await this.rpcGateway('sessions.delete', {
          key: normalizeAgentSessionKey(request.sessionKey),
          deleteTranscript: true,
        }, { timeoutMs: 15_000 });
        return { success: true };
      }
      case 'provider.list':
        return providerHandlers.handleProviderList();
      case 'provider.getDefault':
        return providerHandlers.handleProviderGetDefault();
      case 'skills.status':
        return skillHandlers.handleSkillsStatus();
      case 'skills.update':
        return skillHandlers.handleSkillsUpdate(params);
      default:
        throw new Error(`Unknown gateway RPC method: ${method}`);
    }
  }

  /**
   * Sends `event` on the `gateway:event` channel to the window named `'main'`,
   * if that window exists and has not been destroyed.
   */
  broadcast(event: GatewayEvent): void {
    const mainWindow = BrowserWindow.getAllWindows().find(
      (win) => windowManager.getName(win) === 'main',
    );
    if (mainWindow && !mainWindow.isDestroyed()) {
      mainWindow.webContents.send('gateway:event', event);
    }
  }

  /**
   * Broadcasts a `runtime:changed` event with de-duplicated, truthy topics.
   * Does nothing when no topics remain; empty `warnings` are sent as `undefined`.
   */
  notifyRuntimeChanged(options: RuntimeChangeBroadcast): void {
    const topics = Array.from(new Set(options.topics.filter(Boolean)));
    if (topics.length === 0) {
      return;
    }

    this.broadcast({
      type: 'runtime:changed',
      topics,
      reason: options.reason,
      warnings: options.warnings && options.warnings.length > 0 ? options.warnings : undefined,
      channelType: options.channelType,
      accountId: options.accountId,
      syncedAt: new Date().toISOString(),
    });
  }

  /**
   * Reacts to a provider configuration change.
   *
   * When the manager is initialised and connected or reconnecting, the gateway
   * is restarted in the background and the change is announced afterwards (with
   * an extra warning if the restart fails). Otherwise the change is announced
   * immediately. Topics default to `['providers', 'models']` and the reason to
   * `'providers:reload'`.
   */
  reloadProviders(options?: RuntimeChangeBroadcast): void {
    const runtimeChange: RuntimeChangeBroadcast = {
      topics: options?.topics ?? ['providers', 'models'],
      reason: options?.reason ?? 'providers:reload',
      warnings: options?.warnings,
      channelType: options?.channelType,
      accountId: options?.accountId,
    };

    if (this.initialized && (this.status === 'connected' || this.status === 'reconnecting')) {
      void this.restart(runtimeChange).catch((error) => {
        const warning = `Gateway restart after provider reload failed: ${toErrorMessage(error)}`;
        logManager.error(warning, error);
        this.notifyRuntimeChanged({
          ...runtimeChange,
          warnings: [...(runtimeChange.warnings ?? []), warning],
        });
      });
      return;
    }

    this.notifyRuntimeChanged(runtimeChange);
  }
}

export const gatewayManager = new GatewayManager();