diff --git a/electron/gateway/manager.ts b/electron/gateway/manager.ts index a13fe78..7e8575f 100644 --- a/electron/gateway/manager.ts +++ b/electron/gateway/manager.ts @@ -11,7 +11,6 @@ import { PORTS } from '../utils/config'; import { appendNodeRequireToNodeOptions, } from '../utils/paths'; -import { getSetting } from '../utils/store'; import { JsonRpcNotification, isNotification, isResponse } from './protocol'; import { logger } from '../utils/logger'; import { @@ -37,7 +36,7 @@ import { import { dispatchJsonRpcNotification, dispatchProtocolEvent } from './event-dispatch'; import { GatewayStateController } from './state'; import { prepareGatewayLaunchContext } from './config-sync'; -import { buildGatewayConnectFrame, probeGatewayReady } from './ws-client'; +import { connectGatewaySocket, probeGatewayReady } from './ws-client'; import { findExistingGatewayProcess, isTransientGatewayStartError, @@ -788,172 +787,30 @@ export class GatewayManager extends EventEmitter { * Connect WebSocket to Gateway */ private async connect(port: number, _externalToken?: string): Promise { - logger.debug(`Connecting Gateway WebSocket (ws://localhost:${port}/ws)`); - - return new Promise((resolve, reject) => { - // WebSocket URL (token will be sent in connect handshake, not URL) - const wsUrl = `ws://localhost:${port}/ws`; - - this.ws = new WebSocket(wsUrl); - let handshakeComplete = false; - let connectId: string | null = null; - let handshakeTimeout: NodeJS.Timeout | null = null; - let settled = false; - - let challengeTimer: NodeJS.Timeout | null = null; - - const cleanupHandshakeRequest = () => { - if (challengeTimer) { - clearTimeout(challengeTimer); - challengeTimer = null; - } - if (handshakeTimeout) { - clearTimeout(handshakeTimeout); - handshakeTimeout = null; - } - if (connectId && this.pendingRequests.has(connectId)) { - const request = this.pendingRequests.get(connectId); - if (request) { - clearTimeout(request.timeout); - } - this.pendingRequests.delete(connectId); - } - }; - - const resolveOnce = () => { - if (settled) return; - settled = true; - cleanupHandshakeRequest(); - resolve(); - }; - - const rejectOnce = (error: unknown) => { - if (settled) return; - settled = true; - cleanupHandshakeRequest(); - const err = error instanceof Error ? error : new Error(String(error)); - reject(err); - }; - - // Sends the connect frame using the server-issued challenge nonce. - const sendConnectHandshake = async (challengeNonce: string) => { - logger.debug('Sending connect handshake with challenge nonce'); - - const currentToken = await getSetting('gatewayToken'); - const connectPayload = buildGatewayConnectFrame({ - challengeNonce, - token: currentToken, - deviceIdentity: this.deviceIdentity, - platform: process.platform, + this.ws = await connectGatewaySocket({ + port, + deviceIdentity: this.deviceIdentity, + platform: process.platform, + pendingRequests: this.pendingRequests, + getToken: async () => await import('../utils/store').then(({ getSetting }) => getSetting('gatewayToken')), + onHandshakeComplete: (ws) => { + this.ws = ws; + this.setStatus({ + state: 'running', + port, + connectedAt: Date.now(), }); - connectId = connectPayload.connectId; - - this.ws?.send(JSON.stringify(connectPayload.frame)); - - const requestTimeout = setTimeout(() => { - if (!handshakeComplete) { - logger.error('Gateway connect handshake timed out'); - this.ws?.close(); - rejectOnce(new Error('Connect handshake timeout')); - } - }, 10000); - handshakeTimeout = requestTimeout; - - this.pendingRequests.set(connectId, { - resolve: (_result) => { - handshakeComplete = true; - logger.debug('Gateway connect handshake completed'); - this.setStatus({ - state: 'running', - port, - connectedAt: Date.now(), - }); - this.startPing(); - resolveOnce(); - }, - reject: (error) => { - logger.error('Gateway connect handshake failed:', error); - rejectOnce(error); - }, - timeout: requestTimeout, - }); - }; - - // Timeout for receiving the initial connect.challenge from the server. - // Without this, if the server never sends the challenge (e.g. orphaned - // process from a different version), the connect() promise hangs forever. - challengeTimer = setTimeout(() => { - if (!challengeReceived && !settled) { - logger.error('Gateway connect.challenge not received within timeout'); - this.ws?.close(); - rejectOnce(new Error('Timed out waiting for connect.challenge from Gateway')); - } - }, 10000); - - this.ws.on('open', () => { - logger.debug('Gateway WebSocket opened, waiting for connect.challenge...'); - }); - - let challengeReceived = false; - - this.ws.on('message', (data) => { - try { - const message = JSON.parse(data.toString()); - - // Intercept the connect.challenge event before the general handler - if ( - !challengeReceived && - typeof message === 'object' && message !== null && - message.type === 'event' && message.event === 'connect.challenge' - ) { - challengeReceived = true; - if (challengeTimer) { - clearTimeout(challengeTimer); - challengeTimer = null; - } - const nonce = message.payload?.nonce as string | undefined; - if (!nonce) { - rejectOnce(new Error('Gateway connect.challenge missing nonce')); - return; - } - logger.debug('Received connect.challenge, sending handshake'); - sendConnectHandshake(nonce); - return; - } - - this.handleMessage(message); - } catch (error) { - logger.debug('Failed to parse Gateway WebSocket message:', error); - } - }); - - this.ws.on('close', (code, reason) => { - const reasonStr = reason?.toString() || 'unknown'; - logger.warn(`Gateway WebSocket closed (code=${code}, reason=${reasonStr}, handshake=${handshakeComplete ? 'ok' : 'pending'})`); - if (!handshakeComplete) { - // If the socket closes before the handshake completes, it usually means the server is still starting or restarting. - // Rejecting this promise will cause the caller (startProcess/reconnect logic) to retry cleanly. - rejectOnce(new Error(`WebSocket closed before handshake: ${reasonStr}`)); - return; - } - cleanupHandshakeRequest(); + this.startPing(); + }, + onMessage: (message) => { + this.handleMessage(message); + }, + onCloseAfterHandshake: () => { if (this.status.state === 'running') { this.setStatus({ state: 'stopped' }); this.scheduleReconnect(); } - }); - - this.ws.on('error', (error) => { - // Suppress noisy ECONNREFUSED/WebSocket handshake errors that happen during expected Gateway restarts. - if (error.message?.includes('closed before handshake') || (error as NodeJS.ErrnoException).code === 'ECONNREFUSED') { - logger.debug(`Gateway WebSocket connection error (transient): ${error.message}`); - } else { - logger.error('Gateway WebSocket error:', error); - } - if (!handshakeComplete) { - rejectOnce(error); - } - }); + }, }); } diff --git a/electron/gateway/startup-recovery.ts b/electron/gateway/startup-recovery.ts index c630a51..a35685e 100644 --- a/electron/gateway/startup-recovery.ts +++ b/electron/gateway/startup-recovery.ts @@ -12,6 +12,14 @@ const INVALID_CONFIG_PATTERNS: RegExp[] = [ /\brun:\s*openclaw doctor --fix\b/i, ]; +const TRANSIENT_START_ERROR_PATTERNS: RegExp[] = [ + /WebSocket closed before handshake/i, + /ECONNREFUSED/i, + /Gateway process exited before becoming ready/i, + /Timed out waiting for connect\.challenge/i, + /Connect handshake timeout/i, +]; + function normalizeLogLine(value: string): string { return value.trim(); } @@ -58,3 +66,34 @@ export function shouldAttemptConfigAutoRepair( return hasInvalidConfigFailureSignal(startupError, startupStderrLines); } +export function isTransientGatewayStartError(error: unknown): boolean { + const errorText = error instanceof Error + ? `${error.name}: ${error.message}` + : String(error ?? ''); + return TRANSIENT_START_ERROR_PATTERNS.some((pattern) => pattern.test(errorText)); +} + +export type GatewayStartupRecoveryAction = 'repair' | 'retry' | 'fail'; + +export function getGatewayStartupRecoveryAction(options: { + startupError: unknown; + startupStderrLines: string[]; + configRepairAttempted: boolean; + attempt: number; + maxAttempts: number; +}): GatewayStartupRecoveryAction { + if (shouldAttemptConfigAutoRepair( + options.startupError, + options.startupStderrLines, + options.configRepairAttempted, + )) { + return 'repair'; + } + + if (options.attempt < options.maxAttempts && isTransientGatewayStartError(options.startupError)) { + return 'retry'; + } + + return 'fail'; +} + diff --git a/electron/gateway/ws-client.ts b/electron/gateway/ws-client.ts index 2addefe..a7ff275 100644 --- a/electron/gateway/ws-client.ts +++ b/electron/gateway/ws-client.ts @@ -1,10 +1,12 @@ import WebSocket from 'ws'; import type { DeviceIdentity } from '../utils/device-identity'; +import type { PendingGatewayRequest } from './request-store'; import { buildDeviceAuthPayload, publicKeyRawBase64UrlFromPem, signDevicePayload, } from '../utils/device-identity'; +import { logger } from '../utils/logger'; export async function probeGatewayReady( port: number, @@ -120,3 +122,160 @@ export function buildGatewayConnectFrame(options: { }, }; } + +export async function connectGatewaySocket(options: { + port: number; + deviceIdentity: DeviceIdentity | null; + platform: string; + pendingRequests: Map; + getToken: () => Promise; + onHandshakeComplete: (ws: WebSocket) => void; + onMessage: (message: unknown) => void; + onCloseAfterHandshake: () => void; +}): Promise { + logger.debug(`Connecting Gateway WebSocket (ws://localhost:${options.port}/ws)`); + + return await new Promise((resolve, reject) => { + const wsUrl = `ws://localhost:${options.port}/ws`; + const ws = new WebSocket(wsUrl); + let handshakeComplete = false; + let connectId: string | null = null; + let handshakeTimeout: NodeJS.Timeout | null = null; + let challengeTimer: NodeJS.Timeout | null = null; + let challengeReceived = false; + let settled = false; + + const cleanupHandshakeRequest = () => { + if (challengeTimer) { + clearTimeout(challengeTimer); + challengeTimer = null; + } + if (handshakeTimeout) { + clearTimeout(handshakeTimeout); + handshakeTimeout = null; + } + if (connectId && options.pendingRequests.has(connectId)) { + const request = options.pendingRequests.get(connectId); + if (request) { + clearTimeout(request.timeout); + } + options.pendingRequests.delete(connectId); + } + }; + + const resolveOnce = () => { + if (settled) return; + settled = true; + cleanupHandshakeRequest(); + resolve(ws); + }; + + const rejectOnce = (error: unknown) => { + if (settled) return; + settled = true; + cleanupHandshakeRequest(); + reject(error instanceof Error ? error : new Error(String(error))); + }; + + const sendConnectHandshake = async (challengeNonce: string) => { + logger.debug('Sending connect handshake with challenge nonce'); + + const currentToken = await options.getToken(); + const connectPayload = buildGatewayConnectFrame({ + challengeNonce, + token: currentToken, + deviceIdentity: options.deviceIdentity, + platform: options.platform, + }); + connectId = connectPayload.connectId; + + ws.send(JSON.stringify(connectPayload.frame)); + + const requestTimeout = setTimeout(() => { + if (!handshakeComplete) { + logger.error('Gateway connect handshake timed out'); + ws.close(); + rejectOnce(new Error('Connect handshake timeout')); + } + }, 10000); + handshakeTimeout = requestTimeout; + + options.pendingRequests.set(connectId, { + resolve: () => { + handshakeComplete = true; + logger.debug('Gateway connect handshake completed'); + options.onHandshakeComplete(ws); + resolveOnce(); + }, + reject: (error) => { + logger.error('Gateway connect handshake failed:', error); + rejectOnce(error); + }, + timeout: requestTimeout, + }); + }; + + challengeTimer = setTimeout(() => { + if (!challengeReceived && !settled) { + logger.error('Gateway connect.challenge not received within timeout'); + ws.close(); + rejectOnce(new Error('Timed out waiting for connect.challenge from Gateway')); + } + }, 10000); + + ws.on('open', () => { + logger.debug('Gateway WebSocket opened, waiting for connect.challenge...'); + }); + + ws.on('message', (data) => { + try { + const message = JSON.parse(data.toString()); + if ( + !challengeReceived && + typeof message === 'object' && message !== null && + message.type === 'event' && message.event === 'connect.challenge' + ) { + challengeReceived = true; + if (challengeTimer) { + clearTimeout(challengeTimer); + challengeTimer = null; + } + const nonce = message.payload?.nonce as string | undefined; + if (!nonce) { + rejectOnce(new Error('Gateway connect.challenge missing nonce')); + return; + } + logger.debug('Received connect.challenge, sending handshake'); + void sendConnectHandshake(nonce); + return; + } + + options.onMessage(message); + } catch (error) { + logger.debug('Failed to parse Gateway WebSocket message:', error); + } + }); + + ws.on('close', (code, reason) => { + const reasonStr = reason?.toString() || 'unknown'; + logger.warn(`Gateway WebSocket closed (code=${code}, reason=${reasonStr}, handshake=${handshakeComplete ? 'ok' : 'pending'})`); + if (!handshakeComplete) { + rejectOnce(new Error(`WebSocket closed before handshake: ${reasonStr}`)); + return; + } + cleanupHandshakeRequest(); + options.onCloseAfterHandshake(); + }); + + ws.on('error', (error) => { + if (error.message?.includes('closed before handshake') || (error as NodeJS.ErrnoException).code === 'ECONNREFUSED') { + logger.debug(`Gateway WebSocket connection error (transient): ${error.message}`); + } else { + logger.error('Gateway WebSocket error:', error); + } + if (!handshakeComplete) { + rejectOnce(error); + } + }); + }); +}