Files
NianToB/electron/gateway/manager.ts
Haze 6bacbd964d perf(gateway): shorten gateway.ready fallback timeout from 30s to 5s
The fallback exists as a safety net for the server-side gateway.ready
event. In practice OpenClaw's plugin bootstrap can push the real event
well past 30s (observed: handshake completes, then 30s tick by, then the
fallback fires with no event having arrived). That long tail kept the
stale gating code blocking UI state for the full 30s.

Step 1 moved sessions.list off the gatewayReady gate, so this value now
only matters as a belt-and-braces signal for any future consumer. 5s is
long enough to preserve "event wins when it actually fires on a healthy
boot" while avoiding a multi-second stall whenever the server is slow.

Updated gateway-ready-fallback.test.ts to advance timers around the new
boundary.
2026-04-24 17:04:10 +08:00

1216 lines
44 KiB
TypeScript

/**
* Gateway Process Manager
* Manages the OpenClaw Gateway process lifecycle
*/
import { app } from 'electron';
import path from 'path';
import { EventEmitter } from 'events';
import WebSocket from 'ws';
import { PORTS } from '../utils/config';
import { JsonRpcNotification, isNotification, isResponse } from './protocol';
import { logger } from '../utils/logger';
import { captureTelemetryEvent, trackMetric } from '../utils/telemetry';
import {
loadOrCreateDeviceIdentity,
type DeviceIdentity,
} from '../utils/device-identity';
import {
DEFAULT_RECONNECT_CONFIG,
type ReconnectConfig,
type GatewayLifecycleState,
getReconnectScheduleDecision,
getReconnectSkipReason,
} from './process-policy';
import {
clearPendingGatewayRequests,
rejectPendingGatewayRequest,
resolvePendingGatewayRequest,
type PendingGatewayRequest,
} from './request-store';
import { dispatchJsonRpcNotification, dispatchProtocolEvent } from './event-dispatch';
import { GatewayStateController } from './state';
import { prepareGatewayLaunchContext } from './config-sync';
import { connectGatewaySocket, waitForGatewayReady } from './ws-client';
import {
findExistingGatewayProcess,
runOpenClawDoctorRepair,
terminateOwnedGatewayProcess,
unloadLaunchctlGatewayService,
waitForPortFree,
warmupManagedPythonReadiness,
} from './supervisor';
import { GatewayConnectionMonitor } from './connection-monitor';
import { GatewayLifecycleController, LifecycleSupersededError } from './lifecycle-controller';
import { launchGatewayProcess } from './process-launcher';
import { GatewayRestartController } from './restart-controller';
import { GatewayRestartGovernor } from './restart-governor';
import {
DEFAULT_GATEWAY_RELOAD_POLICY,
loadGatewayReloadPolicy,
type GatewayReloadPolicy,
} from './reload-policy';
import { classifyGatewayStderrMessage, recordGatewayStartupStderrLine } from './startup-stderr';
import { runGatewayStartupSequence } from './startup-orchestrator';
export interface GatewayStatus {
state: GatewayLifecycleState;
port: number;
pid?: number;
uptime?: number;
error?: string;
connectedAt?: number;
version?: string;
reconnectAttempts?: number;
/** True once the gateway's internal subsystems (skills, plugins) are ready for RPC calls. */
gatewayReady?: boolean;
}
export type GatewayHealthState = 'healthy' | 'degraded' | 'unresponsive';
export interface GatewayHealthSummary {
state: GatewayHealthState;
reasons: string[];
consecutiveHeartbeatMisses: number;
lastAliveAt?: number;
lastRpcSuccessAt?: number;
lastRpcFailureAt?: number;
lastRpcFailureMethod?: string;
lastChannelsStatusOkAt?: number;
lastChannelsStatusFailureAt?: number;
}
export interface GatewayDiagnosticsSnapshot {
lastAliveAt?: number;
lastRpcSuccessAt?: number;
lastRpcFailureAt?: number;
lastRpcFailureMethod?: string;
lastHeartbeatTimeoutAt?: number;
consecutiveHeartbeatMisses: number;
lastSocketCloseAt?: number;
lastSocketCloseCode?: number;
consecutiveRpcFailures: number;
}
function isTransportRpcFailure(error: unknown): boolean {
const message = error instanceof Error ? error.message : String(error);
return message.includes('RPC timeout:')
|| message.includes('Gateway not connected')
|| message.includes('Gateway stopped')
|| message.includes('Failed to send RPC request:');
}
/**
* Gateway Manager Events
*/
export interface GatewayManagerEvents {
status: (status: GatewayStatus) => void;
message: (message: unknown) => void;
notification: (notification: JsonRpcNotification) => void;
exit: (code: number | null) => void;
error: (error: Error) => void;
'channel:status': (data: { channelId: string; status: string }) => void;
'chat:message': (data: { message: unknown }) => void;
}
/**
* Gateway Manager
* Handles starting, stopping, and communicating with the OpenClaw Gateway
*/
export class GatewayManager extends EventEmitter {
private process: Electron.UtilityProcess | null = null;
private processExitCode: number | null = null; // set by exit event, replaces exitCode/signalCode
private ownsProcess = false;
private ws: WebSocket | null = null;
private status: GatewayStatus = { state: 'stopped', port: PORTS.OPENCLAW_GATEWAY };
private readonly stateController: GatewayStateController;
private reconnectTimer: NodeJS.Timeout | null = null;
private reconnectAttempts = 0;
private reconnectConfig: ReconnectConfig;
private shouldReconnect = true;
private startLock = false;
private lastSpawnSummary: string | null = null;
private recentStartupStderrLines: string[] = [];
private pendingRequests: Map<string, PendingGatewayRequest> = new Map();
private deviceIdentity: DeviceIdentity | null = null;
private restartInFlight: Promise<void> | null = null;
private readonly connectionMonitor = new GatewayConnectionMonitor();
private readonly lifecycleController = new GatewayLifecycleController();
private readonly restartController = new GatewayRestartController();
private readonly restartGovernor = new GatewayRestartGovernor();
private reloadDebounceTimer: NodeJS.Timeout | null = null;
private reloadPolicy: GatewayReloadPolicy = { ...DEFAULT_GATEWAY_RELOAD_POLICY };
private reloadPolicyLoadedAt = 0;
private reloadPolicyRefreshPromise: Promise<void> | null = null;
private externalShutdownSupported: boolean | null = null;
private reconnectAttemptsTotal = 0;
private reconnectSuccessTotal = 0;
private static readonly RELOAD_POLICY_REFRESH_MS = 15_000;
private static readonly HEARTBEAT_INTERVAL_MS = 30_000;
private static readonly HEARTBEAT_TIMEOUT_MS = 12_000;
private static readonly HEARTBEAT_MAX_MISSES = 3;
// Windows-specific heartbeat parameters — more lenient to reduce log noise
// from false positives caused by Windows Defender scans, system updates,
// and synchronous event-loop blocking in the gateway.
private static readonly HEARTBEAT_INTERVAL_MS_WIN = 60_000;
private static readonly HEARTBEAT_TIMEOUT_MS_WIN = 25_000;
private static readonly HEARTBEAT_MAX_MISSES_WIN = 5;
public static readonly RESTART_COOLDOWN_MS = 5_000;
// Fallback for the server-side gateway.ready event: if the event doesn't
// arrive within this window after the WS handshake completes, we assume the
// gateway is effectively ready so downstream consumers don't block forever.
// Kept short (5s) because handshake completion already implies a working
// RPC channel — this is only a safety net, not the primary signal.
private static readonly GATEWAY_READY_FALLBACK_MS = 5_000;
private lastRestartAt = 0;
/** Set by scheduleReconnect() before calling start() to signal auto-reconnect. */
private isAutoReconnectStart = false;
private gatewayReadyFallbackTimer: NodeJS.Timeout | null = null;
private diagnostics: GatewayDiagnosticsSnapshot = {
consecutiveHeartbeatMisses: 0,
consecutiveRpcFailures: 0,
};
constructor(config?: Partial<ReconnectConfig>) {
super();
this.stateController = new GatewayStateController({
emitStatus: (status) => {
this.status = status;
this.emit('status', status);
},
onTransition: (previousState, nextState) => {
if (nextState === 'running') {
this.restartGovernor.onRunning();
}
this.restartController.flushDeferredRestart(
`status:${previousState}->${nextState}`,
{
state: this.status.state,
startLock: this.startLock,
shouldReconnect: this.shouldReconnect,
},
() => {
void this.restart().catch((error) => {
logger.warn('Deferred Gateway restart failed:', error);
});
},
);
},
});
this.reconnectConfig = { ...DEFAULT_RECONNECT_CONFIG, ...config };
// Device identity is loaded lazily in start() — not in the constructor —
// so that async file I/O and key generation don't block module loading.
this.on('gateway:ready', () => {
this.clearGatewayReadyFallback();
if (this.status.state === 'running' && !this.status.gatewayReady) {
logger.info('Gateway subsystems ready (event received)');
this.setStatus({ gatewayReady: true });
}
});
}
private async initDeviceIdentity(): Promise<void> {
if (this.deviceIdentity) return; // already loaded
try {
const identityPath = path.join(app.getPath('userData'), 'clawx-device-identity.json');
this.deviceIdentity = await loadOrCreateDeviceIdentity(identityPath);
logger.debug(`Device identity loaded (deviceId=${this.deviceIdentity.deviceId})`);
} catch (err) {
logger.warn('Failed to load device identity, scopes will be limited:', err);
}
}
private sanitizeSpawnArgs(args: string[]): string[] {
const sanitized = [...args];
const tokenIdx = sanitized.indexOf('--token');
if (tokenIdx !== -1 && tokenIdx + 1 < sanitized.length) {
sanitized[tokenIdx + 1] = '[redacted]';
}
return sanitized;
}
private isUnsupportedShutdownError(error: unknown): boolean {
const message = error instanceof Error ? error.message : String(error);
return /unknown method:\s*shutdown/i.test(message);
}
/**
* Get current Gateway status
*/
getStatus(): GatewayStatus {
return this.stateController.getStatus();
}
getDiagnostics(): GatewayDiagnosticsSnapshot {
return { ...this.diagnostics };
}
/**
* Check if Gateway is connected and ready
*/
isConnected(): boolean {
return this.stateController.isConnected(this.ws?.readyState === WebSocket.OPEN);
}
/**
* Start Gateway process
*/
async start(): Promise<void> {
if (this.startLock) {
logger.debug('Gateway start ignored because a start flow is already in progress');
return;
}
if (this.status.state === 'running') {
logger.debug('Gateway already running, skipping start');
return;
}
this.startLock = true;
const startEpoch = this.lifecycleController.bump('start');
logger.info(`Gateway start requested (port=${this.status.port})`);
this.lastSpawnSummary = null;
this.shouldReconnect = true;
await this.refreshReloadPolicy(true);
// Lazily load device identity (async file I/O + key generation).
// Must happen before connect() which uses the identity for the handshake.
await this.initDeviceIdentity();
// Manual start should override and cancel any pending reconnect timer.
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
logger.debug('Cleared pending reconnect timer because start was requested manually');
}
// Only reset reconnectAttempts on manual start, not on auto-reconnect.
// Auto-reconnect calls start() via scheduleReconnect(); those should
// accumulate attempts so the maxAttempts cap works correctly.
if (!this.isAutoReconnectStart) {
this.reconnectAttempts = 0;
}
this.isAutoReconnectStart = false; // consume the flag
this.setStatus({ state: 'starting', reconnectAttempts: this.reconnectAttempts, gatewayReady: false });
// Check if Python environment is ready (self-healing) asynchronously.
// Fire-and-forget: only needs to run once, not on every retry.
warmupManagedPythonReadiness();
const t0 = Date.now();
let tSpawned = 0;
let tReady = 0;
try {
await runGatewayStartupSequence({
port: this.status.port,
ownedPid: this.process?.pid,
shouldWaitForPortFree: process.platform === 'win32',
hasOwnedProcess: () => this.process?.pid != null && this.ownsProcess,
resetStartupStderrLines: () => {
this.recentStartupStderrLines = [];
},
getStartupStderrLines: () => this.recentStartupStderrLines,
assertLifecycle: (phase) => {
this.lifecycleController.assert(startEpoch, phase);
},
findExistingGateway: async (port) => {
// Always read the current process pid dynamically so that retries
// don't treat a just-spawned gateway as an orphan. The ownedPid
// snapshot captured at start() entry is stale after startProcess()
// replaces this.process — leading to the just-started pid being
// immediately killed as a false orphan on the next retry iteration.
return await findExistingGatewayProcess({ port, ownedPid: this.process?.pid });
},
connect: async (port, externalToken) => {
await this.connect(port, externalToken);
},
onConnectedToExistingGateway: () => {
// If the existing gateway is actually our own spawned UtilityProcess
// (e.g. after a self-restart code=1012), keep ownership so that
// stop() can still terminate the process during a restart() cycle.
const isOwnProcess = this.process?.pid != null && this.ownsProcess;
if (!isOwnProcess) {
this.ownsProcess = false;
this.setStatus({ pid: undefined });
}
// Treat a successful reconnect to the owned process as a restart
// completion (e.g. after a Gateway code-1012 in-process restart).
// This updates lastRestartCompletedAt so that flushDeferredRestart
// drops any deferred restart requested before this reconnect,
// avoiding a redundant kill+respawn cycle.
if (isOwnProcess) {
this.restartController.recordRestartCompleted();
}
this.startHealthCheck();
},
waitForPortFree: async (port) => {
await waitForPortFree(port);
},
startProcess: async () => {
await this.startProcess();
tSpawned = Date.now();
},
waitForReady: async (port) => {
await waitForGatewayReady({
port,
getProcessExitCode: () => this.processExitCode,
});
tReady = Date.now();
},
onConnectedToManagedGateway: () => {
this.startHealthCheck();
const tConnected = Date.now();
logger.info('[metric] gateway.startup', {
configSyncMs: tSpawned ? tSpawned - t0 : undefined,
spawnToReadyMs: tReady && tSpawned ? tReady - tSpawned : undefined,
readyToConnectMs: tReady ? tConnected - tReady : undefined,
totalMs: tConnected - t0,
});
},
runDoctorRepair: async () => await runOpenClawDoctorRepair(),
onDoctorRepairSuccess: () => {
this.setStatus({ state: 'starting', error: undefined, reconnectAttempts: 0 });
},
delay: async (ms) => {
await new Promise((resolve) => setTimeout(resolve, ms));
},
});
} catch (error) {
if (error instanceof LifecycleSupersededError) {
logger.debug(error.message);
return;
}
logger.error(
`Gateway start failed (port=${this.status.port}, reconnectAttempts=${this.reconnectAttempts}, spawn=${this.lastSpawnSummary ?? 'n/a'})`,
error
);
this.setStatus({ state: 'error', error: String(error) });
throw error;
} finally {
this.startLock = false;
this.restartController.flushDeferredRestart(
'start:finally',
{
state: this.status.state,
startLock: this.startLock,
shouldReconnect: this.shouldReconnect,
},
() => {
void this.restart().catch((error) => {
logger.warn('Deferred Gateway restart failed:', error);
});
},
);
}
}
/**
* Stop Gateway process
*/
async stop(): Promise<void> {
logger.info('Gateway stop requested');
this.lifecycleController.bump('stop');
// Disable auto-reconnect
this.shouldReconnect = false;
// Clear all timers
this.clearAllTimers();
// If this manager is attached to an external gateway process, ask it to shut down
// over protocol before closing the socket.
if (!this.ownsProcess && this.ws?.readyState === WebSocket.OPEN && this.externalShutdownSupported !== false) {
try {
await this.rpc('shutdown', undefined, 5000);
this.externalShutdownSupported = true;
} catch (error) {
if (this.isUnsupportedShutdownError(error)) {
this.externalShutdownSupported = false;
logger.info('External Gateway does not support "shutdown"; skipping shutdown RPC for future stops');
} else {
logger.warn('Failed to request shutdown for externally managed Gateway:', error);
}
}
}
// Close WebSocket — use terminate() to force-close the TCP connection
// immediately without waiting for the WebSocket close handshake.
// ws.close() sends a close frame and waits for the server to respond;
// if the gateway process is being killed concurrently, the handshake
// never completes and the connection stays ESTABLISHED indefinitely,
// accumulating leaked connections on every restart cycle.
if (this.ws) {
try { this.ws.terminate(); } catch { /* ignore */ }
this.ws = null;
}
// Kill process
if (this.process && this.ownsProcess) {
const child = this.process;
await terminateOwnedGatewayProcess(child);
if (this.process === child) {
this.process = null;
}
}
this.ownsProcess = false;
clearPendingGatewayRequests(this.pendingRequests, new Error('Gateway stopped'));
this.restartController.resetDeferredRestart();
this.isAutoReconnectStart = false;
this.diagnostics.consecutiveHeartbeatMisses = 0;
this.setStatus({ state: 'stopped', error: undefined, pid: undefined, connectedAt: undefined, uptime: undefined, gatewayReady: undefined });
}
/**
* Best-effort emergency cleanup for app-quit timeout paths.
* Only terminates a process this manager still owns.
*/
async forceTerminateOwnedProcessForQuit(): Promise<boolean> {
if (!this.process || !this.ownsProcess) {
return false;
}
const child = this.process;
await terminateOwnedGatewayProcess(child);
if (this.process === child) {
this.process = null;
}
this.ownsProcess = false;
this.setStatus({ pid: undefined });
return true;
}
/**
* Restart Gateway process
*/
async restart(): Promise<void> {
if (this.restartController.isRestartDeferred({
state: this.status.state,
startLock: this.startLock,
})) {
this.restartController.markDeferredRestart('restart', {
state: this.status.state,
startLock: this.startLock,
});
return;
}
if (this.restartInFlight) {
logger.debug('Gateway restart already in progress, joining existing request');
await this.restartInFlight;
return;
}
const decision = this.restartGovernor.decide();
if (!decision.allow) {
const observability = this.restartGovernor.getObservability();
logger.warn(
`[gateway-restart-governor] restart suppressed reason=${decision.reason} retryAfterMs=${decision.retryAfterMs} ` +
`suppressed=${observability.suppressed_total} executed=${observability.executed_total} circuitOpenUntil=${observability.circuit_open_until}`,
);
const props = {
reason: decision.reason,
retry_after_ms: decision.retryAfterMs,
gateway_restart_suppressed_total: observability.suppressed_total,
gateway_restart_executed_total: observability.executed_total,
gateway_restart_circuit_open_until: observability.circuit_open_until,
};
trackMetric('gateway.restart.suppressed', props);
captureTelemetryEvent('gateway_restart_suppressed', props);
return;
}
const pidBefore = this.status.pid;
logger.info(`[gateway-refresh] mode=restart requested pidBefore=${pidBefore ?? 'n/a'}`);
this.restartInFlight = (async () => {
await this.stop();
try {
await this.start();
} catch (err) {
// stop() set shouldReconnect=false. Restore it so the gateway
// can self-heal via scheduleReconnect() instead of dying permanently.
logger.warn('Gateway restart: start() failed after stop(), enabling auto-reconnect recovery', err);
this.shouldReconnect = true;
this.scheduleReconnect();
throw err;
}
})();
try {
await this.restartInFlight;
this.restartGovernor.recordExecuted();
this.restartController.recordRestartCompleted();
const observability = this.restartGovernor.getObservability();
const props = {
gateway_restart_executed_total: observability.executed_total,
gateway_restart_suppressed_total: observability.suppressed_total,
gateway_restart_circuit_open_until: observability.circuit_open_until,
};
trackMetric('gateway.restart.executed', props);
captureTelemetryEvent('gateway_restart_executed', props);
logger.info(
`[gateway-refresh] mode=restart result=applied pidBefore=${pidBefore ?? 'n/a'} pidAfter=${this.status.pid ?? 'n/a'} ` +
`suppressed=${observability.suppressed_total} executed=${observability.executed_total} circuitOpenUntil=${observability.circuit_open_until}`,
);
} finally {
this.restartInFlight = null;
this.restartController.flushDeferredRestart(
'restart:finally',
{
state: this.status.state,
startLock: this.startLock,
shouldReconnect: this.shouldReconnect,
},
() => {
void this.restart().catch((error) => {
logger.warn('Deferred Gateway restart failed:', error);
});
},
);
}
}
/**
* Debounced restart — coalesces multiple rapid restart requests into a
* single restart after `delayMs` of inactivity. This prevents the
* cascading stop/start cycles that occur when provider:save,
* provider:setDefault and channel:saveConfig all fire within seconds
* of each other during setup.
*/
debouncedRestart(delayMs = 2000): void {
this.restartController.debouncedRestart(delayMs, () => {
void this.restart().catch((err) => {
logger.warn('Debounced Gateway restart failed:', err);
});
});
}
/**
* Ask the Gateway process to reload config in-place when possible.
* Falls back to restart on unsupported platforms or signaling failures.
*/
async reload(): Promise<void> {
await this.refreshReloadPolicy();
if (this.reloadPolicy.mode === 'off' || this.reloadPolicy.mode === 'restart') {
logger.info(
`[gateway-refresh] mode=reload result=policy_forced_restart policy=${this.reloadPolicy.mode}`,
);
await this.restart();
return;
}
if (this.restartController.isRestartDeferred({
state: this.status.state,
startLock: this.startLock,
})) {
this.restartController.markDeferredRestart('reload', {
state: this.status.state,
startLock: this.startLock,
});
return;
}
const pidBefore = this.process?.pid;
logger.info(`[gateway-refresh] mode=reload requested pid=${pidBefore ?? 'n/a'} state=${this.status.state}`);
if (!this.process?.pid || this.status.state !== 'running') {
logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=not_running');
logger.warn('Gateway reload requested while not running; falling back to restart');
await this.restart();
return;
}
const connectedForMs = this.status.connectedAt
? Date.now() - this.status.connectedAt
: Number.POSITIVE_INFINITY;
// Avoid signaling a process that just came up; it will already read latest config.
if (connectedForMs < 8000) {
logger.info(
`[gateway-refresh] mode=reload result=skipped_recent_connect connectedForMs=${connectedForMs} pid=${this.process.pid}`,
);
logger.info(`Gateway connected ${connectedForMs}ms ago, skipping reload signal`);
return;
}
if (process.platform === 'win32') {
// Windows does not support SIGUSR1 for in-process reload.
// Fall back to a full restart. The connectedForMs < 8000 guard above
// already skips unnecessary restarts for recently-started processes.
logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=windows');
await this.restart();
return;
}
try {
process.kill(this.process.pid, 'SIGUSR1');
logger.info(`Sent SIGUSR1 to Gateway for config reload (pid=${this.process.pid})`);
// Some gateway builds do not handle SIGUSR1 as an in-process reload.
// If process state doesn't recover quickly, fall back to restart.
await new Promise((resolve) => setTimeout(resolve, 1500));
if (this.status.state !== 'running' || !this.process?.pid) {
logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=post_signal_unhealthy');
logger.warn('Gateway did not stay running after reload signal, falling back to restart');
await this.restart();
} else {
const pidAfter = this.process.pid;
logger.info(
`[gateway-refresh] mode=reload result=applied_in_place pidBefore=${pidBefore} pidAfter=${pidAfter}`,
);
}
} catch (error) {
logger.warn('[gateway-refresh] mode=reload result=fallback_restart cause=signal_error');
logger.warn('Gateway reload signal failed, falling back to restart:', error);
await this.restart();
}
}
/**
* Debounced reload — coalesces multiple rapid config-change events into one
* in-process reload when possible.
*/
debouncedReload(delayMs?: number): void {
void this.refreshReloadPolicy();
const effectiveDelay = delayMs ?? this.reloadPolicy.debounceMs;
if (this.reloadPolicy.mode === 'off' || this.reloadPolicy.mode === 'restart') {
logger.debug(
`Gateway reload policy=${this.reloadPolicy.mode}; routing debouncedReload to debouncedRestart (${effectiveDelay}ms)`,
);
this.debouncedRestart(effectiveDelay);
return;
}
if (this.reloadDebounceTimer) {
clearTimeout(this.reloadDebounceTimer);
}
logger.debug(`Gateway reload debounced (will fire in ${effectiveDelay}ms)`);
this.reloadDebounceTimer = setTimeout(() => {
this.reloadDebounceTimer = null;
void this.reload().catch((err) => {
logger.warn('Debounced Gateway reload failed:', err);
});
}, effectiveDelay);
}
private async refreshReloadPolicy(force = false): Promise<void> {
const now = Date.now();
if (!force && now - this.reloadPolicyLoadedAt < GatewayManager.RELOAD_POLICY_REFRESH_MS) {
return;
}
if (this.reloadPolicyRefreshPromise) {
await this.reloadPolicyRefreshPromise;
return;
}
this.reloadPolicyRefreshPromise = (async () => {
const nextPolicy = await loadGatewayReloadPolicy();
this.reloadPolicy = nextPolicy;
this.reloadPolicyLoadedAt = Date.now();
})();
try {
await this.reloadPolicyRefreshPromise;
} finally {
this.reloadPolicyRefreshPromise = null;
}
}
/**
* Clear all active timers
*/
private clearAllTimers(): void {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
this.connectionMonitor.clear();
this.restartController.clearDebounceTimer();
if (this.reloadDebounceTimer) {
clearTimeout(this.reloadDebounceTimer);
this.reloadDebounceTimer = null;
}
this.clearGatewayReadyFallback();
}
private clearGatewayReadyFallback(): void {
if (this.gatewayReadyFallbackTimer) {
clearTimeout(this.gatewayReadyFallbackTimer);
this.gatewayReadyFallbackTimer = null;
}
}
private scheduleGatewayReadyFallback(): void {
this.clearGatewayReadyFallback();
this.gatewayReadyFallbackTimer = setTimeout(() => {
this.gatewayReadyFallbackTimer = null;
if (this.status.state === 'running' && !this.status.gatewayReady) {
logger.info('Gateway ready fallback triggered (no gateway.ready event within timeout)');
this.setStatus({ gatewayReady: true });
}
}, GatewayManager.GATEWAY_READY_FALLBACK_MS);
}
/**
* Make an RPC call to the Gateway
* Uses OpenClaw protocol format: { type: "req", id: "...", method: "...", params: {...} }
*/
async rpc<T>(method: string, params?: unknown, timeoutMs = 30000): Promise<T> {
return await new Promise<T>((resolve, reject) => {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
reject(new Error('Gateway not connected'));
return;
}
const id = crypto.randomUUID();
// Set timeout for request
const timeout = setTimeout(() => {
rejectPendingGatewayRequest(this.pendingRequests, id, new Error(`RPC timeout: ${method}`));
}, timeoutMs);
// Store pending request
this.pendingRequests.set(id, {
resolve: resolve as (value: unknown) => void,
reject,
timeout,
});
// Send request using OpenClaw protocol format
const request = {
type: 'req',
id,
method,
params,
};
try {
this.ws.send(JSON.stringify(request));
} catch (error) {
rejectPendingGatewayRequest(this.pendingRequests, id, new Error(`Failed to send RPC request: ${error}`));
}
}).then((result) => {
this.recordRpcSuccess();
return result;
}).catch((error) => {
if (isTransportRpcFailure(error)) {
this.recordRpcFailure(method);
}
throw error;
});
}
/**
* Start health check monitoring
*/
private startHealthCheck(): void {
this.connectionMonitor.startHealthCheck({
shouldCheck: () => this.status.state === 'running',
checkHealth: () => this.checkHealth(),
onUnhealthy: (errorMessage) => {
this.emit('error', new Error(errorMessage));
},
onError: () => {
// The monitor already logged the error; nothing else to do here.
},
});
}
/**
* Check Gateway health via WebSocket ping
* OpenClaw Gateway doesn't have an HTTP /health endpoint
*/
async checkHealth(): Promise<{ ok: boolean; error?: string; uptime?: number }> {
try {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
const uptime = this.status.connectedAt
? Math.floor((Date.now() - this.status.connectedAt) / 1000)
: undefined;
return { ok: true, uptime };
}
return { ok: false, error: 'WebSocket not connected' };
} catch (error) {
return { ok: false, error: String(error) };
}
}
private recordGatewayAlive(): void {
this.diagnostics.lastAliveAt = Date.now();
this.diagnostics.consecutiveHeartbeatMisses = 0;
}
private recordRpcSuccess(): void {
this.diagnostics.lastRpcSuccessAt = Date.now();
this.diagnostics.consecutiveRpcFailures = 0;
}
private recordRpcFailure(method: string): void {
this.diagnostics.lastRpcFailureAt = Date.now();
this.diagnostics.lastRpcFailureMethod = method;
this.diagnostics.consecutiveRpcFailures += 1;
}
private recordHeartbeatTimeout(consecutiveMisses: number): void {
this.diagnostics.lastHeartbeatTimeoutAt = Date.now();
this.diagnostics.consecutiveHeartbeatMisses = consecutiveMisses;
}
private recordSocketClose(code: number): void {
this.diagnostics.lastSocketCloseAt = Date.now();
this.diagnostics.lastSocketCloseCode = code;
}
/**
* Start Gateway process
* Uses OpenClaw npm package from node_modules (dev) or resources (production)
*/
private async startProcess(): Promise<void> {
const launchContext = await prepareGatewayLaunchContext(this.status.port);
await unloadLaunchctlGatewayService();
this.processExitCode = null;
// Per-process dedup map for stderr lines — resets on each new spawn.
const stderrDedup = new Map<string, number>();
const { child, lastSpawnSummary } = await launchGatewayProcess({
port: this.status.port,
launchContext,
sanitizeSpawnArgs: (args) => this.sanitizeSpawnArgs(args),
getCurrentState: () => this.status.state,
getShouldReconnect: () => this.shouldReconnect,
onStderrLine: (line) => {
recordGatewayStartupStderrLine(this.recentStartupStderrLines, line);
const classified = classifyGatewayStderrMessage(line);
if (classified.level === 'drop') return;
// Dedup: suppress identical stderr lines after the first occurrence.
const count = (stderrDedup.get(classified.normalized) ?? 0) + 1;
stderrDedup.set(classified.normalized, count);
if (count > 1) {
// Log a summary every 50 duplicates to stay visible without flooding.
if (count % 50 === 0) {
logger.debug(`[Gateway stderr] (suppressed ${count} repeats) ${classified.normalized}`);
}
return;
}
if (classified.level === 'debug') {
logger.debug(`[Gateway stderr] ${classified.normalized}`);
return;
}
logger.warn(`[Gateway stderr] ${classified.normalized}`);
},
onSpawn: (pid) => {
this.setStatus({ pid });
},
onExit: (exitedChild, code) => {
this.processExitCode = code;
this.ownsProcess = false;
this.connectionMonitor.clear();
if (this.process === exitedChild) {
this.process = null;
}
this.emit('exit', code);
if (this.status.state === 'running') {
this.setStatus({ state: 'stopped' });
}
// Always attempt reconnect from process exit. scheduleReconnect()
// internally checks shouldReconnect and reconnect-timer guards, so
// calling it unconditionally is safe — intentional stop() calls set
// shouldReconnect=false which makes scheduleReconnect() no-op.
//
// On Windows, the WS close handler intentionally skips reconnect
// (to avoid racing with this exit handler). However, WS close
// fires *before* process exit and sets state='stopped', which
// previously caused this handler to also skip reconnect — leaving
// the gateway permanently dead with no recovery path.
this.scheduleReconnect();
},
onError: () => {
this.ownsProcess = false;
if (this.process === child) {
this.process = null;
}
},
});
this.process = child;
this.ownsProcess = true;
logger.debug(`Gateway manager now owns process pid=${child.pid ?? 'unknown'}`);
this.lastSpawnSummary = lastSpawnSummary;
}
/**
* Connect WebSocket to Gateway
*/
private async connect(port: number, _externalToken?: string): Promise<void> {
this.ws = await connectGatewaySocket({
port,
deviceIdentity: this.deviceIdentity,
platform: process.platform,
pendingRequests: this.pendingRequests,
getToken: async () => await import('../utils/store').then(({ getSetting }) => getSetting('gatewayToken')),
onHandshakeComplete: (ws) => {
this.ws = ws;
ws.on('pong', () => {
this.connectionMonitor.markAlive('pong');
this.recordGatewayAlive();
});
this.recordGatewayAlive();
this.setStatus({
state: 'running',
port,
connectedAt: Date.now(),
});
this.startPing();
this.scheduleGatewayReadyFallback();
},
onMessage: (message) => {
this.handleMessage(message);
},
onCloseAfterHandshake: (closeCode) => {
this.connectionMonitor.clear();
this.recordSocketClose(closeCode);
this.diagnostics.consecutiveHeartbeatMisses = 0;
if (this.status.state === 'running') {
this.setStatus({ state: 'stopped' });
// On Windows, skip reconnect from WS close. The Gateway is a local
// child process; actual crashes are already caught by the process exit
// handler (`onExit`) which calls scheduleReconnect(). Triggering
// reconnect from WS close as well races with the exit handler and can
// cause double start() attempts or port conflicts during TCP TIME_WAIT.
//
// Exception: code=1012 means the Gateway is performing an in-process
// restart (e.g. config reload). The UtilityProcess stays alive, so
// `onExit` will never fire — we MUST reconnect from the WS close path.
if (process.platform !== 'win32' || closeCode === 1012) {
this.scheduleReconnect();
}
}
},
});
}
/**
* Handle incoming WebSocket message
*/
private handleMessage(message: unknown): void {
this.connectionMonitor.markAlive('message');
this.recordGatewayAlive();
if (typeof message !== 'object' || message === null) {
logger.debug('Received non-object Gateway message');
return;
}
const msg = message as Record<string, unknown>;
// Handle OpenClaw protocol response format: { type: "res", id: "...", ok: true/false, ... }
if (msg.type === 'res' && typeof msg.id === 'string') {
if (msg.ok === false || msg.error) {
const errorObj = msg.error as { message?: string; code?: number } | undefined;
const errorMsg = errorObj?.message || JSON.stringify(msg.error) || 'Unknown error';
if (rejectPendingGatewayRequest(this.pendingRequests, msg.id, new Error(errorMsg))) {
return;
}
} else if (resolvePendingGatewayRequest(this.pendingRequests, msg.id, msg.payload ?? msg)) {
return;
}
}
// Handle OpenClaw protocol event format: { type: "event", event: "...", payload: {...} }
if (msg.type === 'event' && typeof msg.event === 'string') {
dispatchProtocolEvent(this, msg.event, msg.payload);
return;
}
// Fallback: Check if this is a JSON-RPC 2.0 response (legacy support)
if (isResponse(message) && message.id && this.pendingRequests.has(String(message.id))) {
if (message.error) {
const errorMsg = typeof message.error === 'object'
? (message.error as { message?: string }).message || JSON.stringify(message.error)
: String(message.error);
rejectPendingGatewayRequest(this.pendingRequests, String(message.id), new Error(errorMsg));
} else {
resolvePendingGatewayRequest(this.pendingRequests, String(message.id), message.result);
}
return;
}
// Check if this is a JSON-RPC notification (server-initiated event)
if (isNotification(message)) {
dispatchJsonRpcNotification(this, message);
return;
}
this.emit('message', message);
}
/**
* Start ping interval to keep connection alive
*/
private startPing(): void {
const isWindows = process.platform === 'win32';
this.connectionMonitor.startPing({
intervalMs: isWindows
? GatewayManager.HEARTBEAT_INTERVAL_MS_WIN
: GatewayManager.HEARTBEAT_INTERVAL_MS,
timeoutMs: isWindows
? GatewayManager.HEARTBEAT_TIMEOUT_MS_WIN
: GatewayManager.HEARTBEAT_TIMEOUT_MS,
maxConsecutiveMisses: isWindows
? GatewayManager.HEARTBEAT_MAX_MISSES_WIN
: GatewayManager.HEARTBEAT_MAX_MISSES,
sendPing: () => {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.ping();
}
},
onHeartbeatTimeout: ({ consecutiveMisses, timeoutMs }) => {
this.recordHeartbeatTimeout(consecutiveMisses);
const pid = this.process?.pid ?? 'unknown';
const isWindows = process.platform === 'win32';
const shouldAttemptRecovery = !isWindows && this.shouldReconnect && this.status.state === 'running';
logger.warn(
`Gateway heartbeat: ${consecutiveMisses} consecutive pong misses ` +
`(timeout=${timeoutMs}ms, pid=${pid}, state=${this.status.state}, autoReconnect=${this.shouldReconnect}).`,
);
if (!shouldAttemptRecovery) {
const reason = isWindows
? 'platform=win32'
: 'lifecycle is not in auto-recoverable running state';
logger.warn(`Gateway heartbeat recovery skipped (${reason})`);
return;
}
logger.warn('Gateway heartbeat recovery: restarting unresponsive gateway process');
void this.restart().catch((error) => {
logger.warn('Gateway heartbeat recovery failed:', error);
});
},
});
}
/**
* Schedule reconnection attempt with exponential backoff
*/
private scheduleReconnect(): void {
const decision = getReconnectScheduleDecision({
shouldReconnect: this.shouldReconnect,
hasReconnectTimer: this.reconnectTimer !== null,
reconnectAttempts: this.reconnectAttempts,
maxAttempts: this.reconnectConfig.maxAttempts,
baseDelay: this.reconnectConfig.baseDelay,
maxDelay: this.reconnectConfig.maxDelay,
});
if (decision.action === 'skip') {
logger.debug(`Gateway reconnect skipped (${decision.reason})`);
return;
}
if (decision.action === 'already-scheduled') {
return;
}
if (decision.action === 'fail') {
logger.error(`Gateway reconnect failed: max attempts reached (${decision.maxAttempts})`);
this.setStatus({
state: 'error',
error: 'Failed to reconnect after maximum attempts',
reconnectAttempts: this.reconnectAttempts
});
return;
}
const cooldownRemaining = Math.max(0, GatewayManager.RESTART_COOLDOWN_MS - (Date.now() - this.lastRestartAt));
const { delay, nextAttempt, maxAttempts } = decision;
const effectiveDelay = Math.max(delay, cooldownRemaining);
this.reconnectAttempts = nextAttempt;
logger.warn(`Scheduling Gateway reconnect attempt ${nextAttempt}/${maxAttempts} in ${effectiveDelay}ms`);
this.setStatus({
state: 'reconnecting',
reconnectAttempts: this.reconnectAttempts
});
const scheduledEpoch = this.lifecycleController.getCurrentEpoch();
this.reconnectTimer = setTimeout(async () => {
this.reconnectTimer = null;
const skipReason = getReconnectSkipReason({
scheduledEpoch,
currentEpoch: this.lifecycleController.getCurrentEpoch(),
shouldReconnect: this.shouldReconnect,
});
if (skipReason) {
logger.debug(`Skipping reconnect attempt: ${skipReason}`);
return;
}
const attemptNo = this.reconnectAttempts;
this.reconnectAttemptsTotal += 1;
try {
// Use the guarded start() flow so reconnect attempts cannot bypass
// lifecycle locking and accidentally start duplicate Gateway processes.
this.isAutoReconnectStart = true;
await this.start();
this.reconnectSuccessTotal += 1;
this.emitReconnectMetric('success', {
attemptNo,
maxAttempts,
delayMs: effectiveDelay,
});
this.reconnectAttempts = 0;
} catch (error) {
logger.error('Gateway reconnection attempt failed:', error);
this.emitReconnectMetric('failure', {
attemptNo,
maxAttempts,
delayMs: effectiveDelay,
error: error instanceof Error ? error.message : String(error),
});
this.scheduleReconnect();
}
}, effectiveDelay);
}
private emitReconnectMetric(
outcome: 'success' | 'failure',
payload: {
attemptNo: number;
maxAttempts: number;
delayMs: number;
error?: string;
},
): void {
const successRate = this.reconnectAttemptsTotal > 0
? this.reconnectSuccessTotal / this.reconnectAttemptsTotal
: 0;
const properties = {
outcome,
attemptNo: payload.attemptNo,
maxAttempts: payload.maxAttempts,
delayMs: payload.delayMs,
gateway_reconnect_success_count: this.reconnectSuccessTotal,
gateway_reconnect_attempt_count: this.reconnectAttemptsTotal,
gateway_reconnect_success_rate: Number(successRate.toFixed(4)),
...(payload.error ? { error: payload.error } : {}),
};
trackMetric('gateway.reconnect', properties);
// Keep local metrics only; do not upload reconnect details to PostHog.
}
/**
* Update status and emit event
*/
private setStatus(update: Partial<GatewayStatus>): void {
this.stateController.setStatus(update);
}
}