feat
This commit is contained in:
31
electron/gateway/lifecycle-controller.ts
Normal file
31
electron/gateway/lifecycle-controller.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import { logger } from '../utils/logger';
|
||||
import { isLifecycleSuperseded, nextLifecycleEpoch } from './process-policy';
|
||||
|
||||
export class LifecycleSupersededError extends Error {
|
||||
constructor(message: string) {
|
||||
super(message);
|
||||
this.name = 'LifecycleSupersededError';
|
||||
}
|
||||
}
|
||||
|
||||
export class GatewayLifecycleController {
|
||||
private epoch = 0;
|
||||
|
||||
getCurrentEpoch(): number {
|
||||
return this.epoch;
|
||||
}
|
||||
|
||||
bump(reason: string): number {
|
||||
this.epoch = nextLifecycleEpoch(this.epoch);
|
||||
logger.debug(`Gateway lifecycle epoch advanced to ${this.epoch} (${reason})`);
|
||||
return this.epoch;
|
||||
}
|
||||
|
||||
assert(expectedEpoch: number, phase: string): void {
|
||||
if (isLifecycleSuperseded(expectedEpoch, this.epoch)) {
|
||||
throw new LifecycleSupersededError(
|
||||
`Gateway ${phase} superseded (expectedEpoch=${expectedEpoch}, currentEpoch=${this.epoch})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,30 +2,23 @@
|
||||
* Gateway Process Manager
|
||||
* Manages the OpenClaw Gateway process lifecycle
|
||||
*/
|
||||
import { app, utilityProcess } from 'electron';
|
||||
import { app } from 'electron';
|
||||
import path from 'path';
|
||||
import { EventEmitter } from 'events';
|
||||
import { existsSync, writeFileSync } from 'fs';
|
||||
import WebSocket from 'ws';
|
||||
import { PORTS } from '../utils/config';
|
||||
import {
|
||||
appendNodeRequireToNodeOptions,
|
||||
} from '../utils/paths';
|
||||
import { JsonRpcNotification, isNotification, isResponse } from './protocol';
|
||||
import { logger } from '../utils/logger';
|
||||
import {
|
||||
loadOrCreateDeviceIdentity,
|
||||
type DeviceIdentity,
|
||||
} from '../utils/device-identity';
|
||||
import { shouldAttemptConfigAutoRepair } from './startup-recovery';
|
||||
import {
|
||||
DEFAULT_RECONNECT_CONFIG,
|
||||
type ReconnectConfig,
|
||||
type GatewayLifecycleState,
|
||||
getReconnectScheduleDecision,
|
||||
getReconnectSkipReason,
|
||||
isLifecycleSuperseded,
|
||||
nextLifecycleEpoch,
|
||||
} from './process-policy';
|
||||
import {
|
||||
clearPendingGatewayRequests,
|
||||
@@ -36,10 +29,9 @@ import {
|
||||
import { dispatchJsonRpcNotification, dispatchProtocolEvent } from './event-dispatch';
|
||||
import { GatewayStateController } from './state';
|
||||
import { prepareGatewayLaunchContext } from './config-sync';
|
||||
import { connectGatewaySocket, probeGatewayReady } from './ws-client';
|
||||
import { connectGatewaySocket, waitForGatewayReady } from './ws-client';
|
||||
import {
|
||||
findExistingGatewayProcess,
|
||||
isTransientGatewayStartError,
|
||||
runOpenClawDoctorRepair,
|
||||
terminateOwnedGatewayProcess,
|
||||
unloadLaunchctlGatewayService,
|
||||
@@ -47,12 +39,12 @@ import {
|
||||
warmupManagedPythonReadiness,
|
||||
} from './supervisor';
|
||||
import { GatewayConnectionMonitor } from './connection-monitor';
|
||||
import { GatewayLifecycleController, LifecycleSupersededError } from './lifecycle-controller';
|
||||
import { launchGatewayProcess } from './process-launcher';
|
||||
import { GatewayRestartController } from './restart-controller';
|
||||
import { classifyGatewayStderrMessage, recordGatewayStartupStderrLine } from './startup-stderr';
|
||||
import { runGatewayStartupSequence } from './startup-orchestrator';
|
||||
|
||||
/**
|
||||
* Gateway connection status
|
||||
*/
|
||||
export interface GatewayStatus {
|
||||
state: GatewayLifecycleState;
|
||||
port: number;
|
||||
@@ -77,117 +69,6 @@ export interface GatewayManagerEvents {
|
||||
'chat:message': (data: { message: unknown }) => void;
|
||||
}
|
||||
|
||||
// getNodeExecutablePath() removed: utilityProcess.fork() handles process isolation
|
||||
// natively on all platforms (no dock icon on macOS, no console on Windows).
|
||||
|
||||
/**
|
||||
* Ensure the gateway fetch-preload script exists in userData and return
|
||||
* its absolute path. The script patches globalThis.fetch to inject
|
||||
* ClawX app-attribution headers (HTTP-Referer, X-Title) for OpenRouter
|
||||
* API requests, overriding the OpenClaw runner's hardcoded defaults.
|
||||
*
|
||||
* Inlined here so it works in dev, packaged, and asar modes without
|
||||
* extra build config. Loaded by the Gateway child process via
|
||||
* NODE_OPTIONS --require.
|
||||
*/
|
||||
const GATEWAY_FETCH_PRELOAD_SOURCE = `'use strict';
|
||||
(function () {
|
||||
var _f = globalThis.fetch;
|
||||
if (typeof _f !== 'function') return;
|
||||
if (globalThis.__clawxFetchPatched) return;
|
||||
globalThis.__clawxFetchPatched = true;
|
||||
|
||||
globalThis.fetch = function clawxFetch(input, init) {
|
||||
var url =
|
||||
typeof input === 'string' ? input
|
||||
: input && typeof input === 'object' && typeof input.url === 'string'
|
||||
? input.url : '';
|
||||
|
||||
if (url.indexOf('openrouter.ai') !== -1) {
|
||||
init = init ? Object.assign({}, init) : {};
|
||||
var prev = init.headers;
|
||||
var flat = {};
|
||||
if (prev && typeof prev.forEach === 'function') {
|
||||
prev.forEach(function (v, k) { flat[k] = v; });
|
||||
} else if (prev && typeof prev === 'object') {
|
||||
Object.assign(flat, prev);
|
||||
}
|
||||
delete flat['http-referer'];
|
||||
delete flat['HTTP-Referer'];
|
||||
delete flat['x-title'];
|
||||
delete flat['X-Title'];
|
||||
flat['HTTP-Referer'] = 'https://claw-x.com';
|
||||
flat['X-Title'] = 'ClawX';
|
||||
init.headers = flat;
|
||||
}
|
||||
return _f.call(globalThis, input, init);
|
||||
};
|
||||
|
||||
// Global monkey-patch for child_process to enforce windowsHide: true on Windows.
|
||||
// This prevents OpenClaw's tools (e.g. Terminal, Python) from flashing black
|
||||
// command boxes during AI conversations, without triggering AVs.
|
||||
//
|
||||
// Node child_process signatures vary:
|
||||
// spawn(cmd[, args][, options])
|
||||
// exec(cmd[, options][, callback])
|
||||
// execFile(file[, args][, options][, callback])
|
||||
// *Sync variants omit the callback
|
||||
//
|
||||
// Strategy: scan arguments for the first plain-object (the options param).
|
||||
// If found, set windowsHide on it. If absent, insert a new options object
|
||||
// before any trailing callback so the signature stays valid.
|
||||
if (process.platform === 'win32') {
|
||||
try {
|
||||
var cp = require('child_process');
|
||||
if (!cp.__clawxPatched) {
|
||||
cp.__clawxPatched = true;
|
||||
['spawn', 'exec', 'execFile', 'fork', 'spawnSync', 'execSync', 'execFileSync'].forEach(function(method) {
|
||||
var original = cp[method];
|
||||
if (typeof original !== 'function') return;
|
||||
cp[method] = function() {
|
||||
var args = Array.prototype.slice.call(arguments);
|
||||
var optIdx = -1;
|
||||
for (var i = 1; i < args.length; i++) {
|
||||
var a = args[i];
|
||||
if (a && typeof a === 'object' && !Array.isArray(a)) {
|
||||
optIdx = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (optIdx >= 0) {
|
||||
args[optIdx].windowsHide = true;
|
||||
} else {
|
||||
var opts = { windowsHide: true };
|
||||
if (typeof args[args.length - 1] === 'function') {
|
||||
args.splice(args.length - 1, 0, opts);
|
||||
} else {
|
||||
args.push(opts);
|
||||
}
|
||||
}
|
||||
return original.apply(this, args);
|
||||
};
|
||||
});
|
||||
}
|
||||
} catch (e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
})();
|
||||
`;
|
||||
|
||||
function ensureGatewayFetchPreload(): string {
|
||||
const dest = path.join(app.getPath('userData'), 'gateway-fetch-preload.cjs');
|
||||
try { writeFileSync(dest, GATEWAY_FETCH_PRELOAD_SOURCE, 'utf-8'); } catch { /* best-effort */ }
|
||||
return dest;
|
||||
}
|
||||
|
||||
class LifecycleSupersededError extends Error {
|
||||
constructor(message: string) {
|
||||
super(message);
|
||||
this.name = 'LifecycleSupersededError';
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gateway Manager
|
||||
* Handles starting, stopping, and communicating with the OpenClaw Gateway
|
||||
@@ -208,9 +89,9 @@ export class GatewayManager extends EventEmitter {
|
||||
private recentStartupStderrLines: string[] = [];
|
||||
private pendingRequests: Map<string, PendingGatewayRequest> = new Map();
|
||||
private deviceIdentity: DeviceIdentity | null = null;
|
||||
private lifecycleEpoch = 0;
|
||||
private restartInFlight: Promise<void> | null = null;
|
||||
private readonly connectionMonitor = new GatewayConnectionMonitor();
|
||||
private readonly lifecycleController = new GatewayLifecycleController();
|
||||
private readonly restartController = new GatewayRestartController();
|
||||
|
||||
constructor(config?: Partial<ReconnectConfig>) {
|
||||
@@ -261,20 +142,6 @@ export class GatewayManager extends EventEmitter {
|
||||
return sanitized;
|
||||
}
|
||||
|
||||
private bumpLifecycleEpoch(reason: string): number {
|
||||
this.lifecycleEpoch = nextLifecycleEpoch(this.lifecycleEpoch);
|
||||
logger.debug(`Gateway lifecycle epoch advanced to ${this.lifecycleEpoch} (${reason})`);
|
||||
return this.lifecycleEpoch;
|
||||
}
|
||||
|
||||
private assertLifecycleEpoch(expectedEpoch: number, phase: string): void {
|
||||
if (isLifecycleSuperseded(expectedEpoch, this.lifecycleEpoch)) {
|
||||
throw new LifecycleSupersededError(
|
||||
`Gateway ${phase} superseded (expectedEpoch=${expectedEpoch}, currentEpoch=${this.lifecycleEpoch})`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current Gateway status
|
||||
*/
|
||||
@@ -304,7 +171,7 @@ export class GatewayManager extends EventEmitter {
|
||||
}
|
||||
|
||||
this.startLock = true;
|
||||
const startEpoch = this.bumpLifecycleEpoch('start');
|
||||
const startEpoch = this.lifecycleController.bump('start');
|
||||
logger.info(`Gateway start requested (port=${this.status.port})`);
|
||||
this.lastSpawnSummary = null;
|
||||
this.shouldReconnect = true;
|
||||
@@ -322,96 +189,58 @@ export class GatewayManager extends EventEmitter {
|
||||
|
||||
this.reconnectAttempts = 0;
|
||||
this.setStatus({ state: 'starting', reconnectAttempts: 0 });
|
||||
let configRepairAttempted = false;
|
||||
|
||||
// Check if Python environment is ready (self-healing) asynchronously.
|
||||
// Fire-and-forget: only needs to run once, not on every retry.
|
||||
warmupManagedPythonReadiness();
|
||||
|
||||
try {
|
||||
let startAttempts = 0;
|
||||
const MAX_START_ATTEMPTS = 3;
|
||||
|
||||
while (true) {
|
||||
startAttempts++;
|
||||
this.assertLifecycleEpoch(startEpoch, 'start');
|
||||
this.recentStartupStderrLines = [];
|
||||
try {
|
||||
// Check if Gateway is already running
|
||||
logger.debug('Checking for existing Gateway...');
|
||||
const existing = await findExistingGatewayProcess({
|
||||
port: this.status.port,
|
||||
ownedPid: this.process?.pid,
|
||||
});
|
||||
this.assertLifecycleEpoch(startEpoch, 'start/find-existing');
|
||||
if (existing) {
|
||||
logger.debug(`Found existing Gateway on port ${existing.port}`);
|
||||
await this.connect(existing.port, existing.externalToken);
|
||||
this.assertLifecycleEpoch(startEpoch, 'start/connect-existing');
|
||||
this.ownsProcess = false;
|
||||
this.setStatus({ pid: undefined });
|
||||
this.startHealthCheck();
|
||||
return;
|
||||
}
|
||||
|
||||
logger.debug('No existing Gateway found, starting new process...');
|
||||
|
||||
// On Windows, TCP TIME_WAIT can hold the port for up to 2 minutes
|
||||
// after the previous Gateway process exits, preventing the new one
|
||||
// from binding. Wait for the port to be free before proceeding.
|
||||
if (process.platform === 'win32') {
|
||||
await waitForPortFree(this.status.port);
|
||||
this.assertLifecycleEpoch(startEpoch, 'start/wait-port');
|
||||
}
|
||||
|
||||
// Start new Gateway process
|
||||
await runGatewayStartupSequence({
|
||||
port: this.status.port,
|
||||
ownedPid: this.process?.pid,
|
||||
shouldWaitForPortFree: process.platform === 'win32',
|
||||
resetStartupStderrLines: () => {
|
||||
this.recentStartupStderrLines = [];
|
||||
},
|
||||
getStartupStderrLines: () => this.recentStartupStderrLines,
|
||||
assertLifecycle: (phase) => {
|
||||
this.lifecycleController.assert(startEpoch, phase);
|
||||
},
|
||||
findExistingGateway: async (port, ownedPid) => {
|
||||
return await findExistingGatewayProcess({ port, ownedPid });
|
||||
},
|
||||
connect: async (port, externalToken) => {
|
||||
await this.connect(port, externalToken);
|
||||
},
|
||||
onConnectedToExistingGateway: () => {
|
||||
this.ownsProcess = false;
|
||||
this.setStatus({ pid: undefined });
|
||||
this.startHealthCheck();
|
||||
},
|
||||
waitForPortFree: async (port) => {
|
||||
await waitForPortFree(port);
|
||||
},
|
||||
startProcess: async () => {
|
||||
await this.startProcess();
|
||||
this.assertLifecycleEpoch(startEpoch, 'start/start-process');
|
||||
|
||||
// Wait for Gateway to be ready
|
||||
await this.waitForReady();
|
||||
this.assertLifecycleEpoch(startEpoch, 'start/wait-ready');
|
||||
|
||||
// Connect WebSocket
|
||||
await this.connect(this.status.port);
|
||||
this.assertLifecycleEpoch(startEpoch, 'start/connect');
|
||||
|
||||
// Start health monitoring
|
||||
},
|
||||
waitForReady: async (port) => {
|
||||
await waitForGatewayReady({
|
||||
port,
|
||||
getProcessExitCode: () => this.processExitCode,
|
||||
});
|
||||
},
|
||||
onConnectedToManagedGateway: () => {
|
||||
this.startHealthCheck();
|
||||
logger.debug('Gateway started successfully');
|
||||
return;
|
||||
} catch (error) {
|
||||
if (error instanceof LifecycleSupersededError) {
|
||||
throw error;
|
||||
}
|
||||
if (shouldAttemptConfigAutoRepair(error, this.recentStartupStderrLines, configRepairAttempted)) {
|
||||
configRepairAttempted = true;
|
||||
logger.warn(
|
||||
'Detected invalid OpenClaw config during Gateway startup; running doctor repair before retry'
|
||||
);
|
||||
const repaired = await runOpenClawDoctorRepair();
|
||||
if (repaired) {
|
||||
logger.info('OpenClaw doctor repair completed; retrying Gateway startup');
|
||||
this.setStatus({ state: 'starting', error: undefined, reconnectAttempts: 0 });
|
||||
continue;
|
||||
}
|
||||
logger.error('OpenClaw doctor repair failed; not retrying Gateway startup');
|
||||
}
|
||||
|
||||
// Retry on transient connect errors
|
||||
const errMsg = String(error);
|
||||
const isTransientError = isTransientGatewayStartError(error);
|
||||
|
||||
if (startAttempts < MAX_START_ATTEMPTS && isTransientError) {
|
||||
logger.warn(`Transient start error: ${errMsg}. Retrying... (${startAttempts}/${MAX_START_ATTEMPTS})`);
|
||||
await new Promise((r) => setTimeout(r, 1000));
|
||||
continue;
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
},
|
||||
runDoctorRepair: async () => await runOpenClawDoctorRepair(),
|
||||
onDoctorRepairSuccess: () => {
|
||||
this.setStatus({ state: 'starting', error: undefined, reconnectAttempts: 0 });
|
||||
},
|
||||
delay: async (ms) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, ms));
|
||||
},
|
||||
});
|
||||
} catch (error) {
|
||||
if (error instanceof LifecycleSupersededError) {
|
||||
logger.debug(error.message);
|
||||
@@ -446,7 +275,7 @@ export class GatewayManager extends EventEmitter {
|
||||
*/
|
||||
async stop(): Promise<void> {
|
||||
logger.info('Gateway stop requested');
|
||||
this.bumpLifecycleEpoch('stop');
|
||||
this.lifecycleController.bump('stop');
|
||||
// Disable auto-reconnect
|
||||
this.shouldReconnect = false;
|
||||
|
||||
@@ -640,74 +469,33 @@ export class GatewayManager extends EventEmitter {
|
||||
* Uses OpenClaw npm package from node_modules (dev) or resources (production)
|
||||
*/
|
||||
private async startProcess(): Promise<void> {
|
||||
// Ensure no system-managed gateway service will compete with our process.
|
||||
await unloadLaunchctlGatewayService();
|
||||
const launchContext = await prepareGatewayLaunchContext(this.status.port);
|
||||
const {
|
||||
openclawDir,
|
||||
entryScript,
|
||||
gatewayArgs,
|
||||
forkEnv,
|
||||
mode,
|
||||
binPathExists,
|
||||
loadedProviderKeyCount,
|
||||
proxySummary,
|
||||
channelStartupSummary,
|
||||
} = launchContext;
|
||||
await unloadLaunchctlGatewayService();
|
||||
this.processExitCode = null;
|
||||
|
||||
logger.info(
|
||||
`Starting Gateway process (mode=${mode}, port=${this.status.port}, entry="${entryScript}", args="${this.sanitizeSpawnArgs(gatewayArgs).join(' ')}", cwd="${openclawDir}", bundledBin=${binPathExists ? 'yes' : 'no'}, providerKeys=${loadedProviderKeyCount}, channels=${channelStartupSummary}, proxy=${proxySummary})`
|
||||
);
|
||||
this.lastSpawnSummary = `mode=${mode}, entry="${entryScript}", args="${this.sanitizeSpawnArgs(gatewayArgs).join(' ')}", cwd="${openclawDir}"`;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
// Reset exit tracking for this new process instance.
|
||||
this.processExitCode = null;
|
||||
const runtimeEnv = { ...forkEnv };
|
||||
|
||||
// Inject fetch preload so OpenRouter requests carry ClawX headers.
|
||||
// The preload patches globalThis.fetch before any module loads.
|
||||
// NODE_OPTIONS --require is blocked by Electron in packaged apps, so skip
|
||||
// this injection when packaged to avoid the "NODE_OPTIONs not supported"
|
||||
// errors being printed to the gateway's stderr on every startup.
|
||||
if (!app.isPackaged) {
|
||||
try {
|
||||
const preloadPath = ensureGatewayFetchPreload();
|
||||
if (existsSync(preloadPath)) {
|
||||
runtimeEnv['NODE_OPTIONS'] = appendNodeRequireToNodeOptions(
|
||||
runtimeEnv['NODE_OPTIONS'],
|
||||
preloadPath,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warn('Failed to set up OpenRouter headers preload:', err);
|
||||
const { child, lastSpawnSummary } = await launchGatewayProcess({
|
||||
port: this.status.port,
|
||||
launchContext,
|
||||
sanitizeSpawnArgs: (args) => this.sanitizeSpawnArgs(args),
|
||||
getCurrentState: () => this.status.state,
|
||||
getShouldReconnect: () => this.shouldReconnect,
|
||||
onStderrLine: (line) => {
|
||||
recordGatewayStartupStderrLine(this.recentStartupStderrLines, line);
|
||||
const classified = classifyGatewayStderrMessage(line);
|
||||
if (classified.level === 'drop') return;
|
||||
if (classified.level === 'debug') {
|
||||
logger.debug(`[Gateway stderr] ${classified.normalized}`);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// utilityProcess.fork() runs the .mjs entry directly without spawning a
|
||||
// shell or visible console window. Works identically in dev and packaged.
|
||||
this.process = utilityProcess.fork(entryScript, gatewayArgs, {
|
||||
cwd: openclawDir,
|
||||
stdio: 'pipe',
|
||||
env: runtimeEnv as NodeJS.ProcessEnv,
|
||||
serviceName: 'OpenClaw Gateway',
|
||||
});
|
||||
const child = this.process;
|
||||
this.ownsProcess = true;
|
||||
|
||||
child.on('error', (error) => {
|
||||
this.ownsProcess = false;
|
||||
logger.error('Gateway process spawn error:', error);
|
||||
reject(error);
|
||||
});
|
||||
|
||||
child.on('exit', (code: number) => {
|
||||
logger.warn(`[Gateway stderr] ${classified.normalized}`);
|
||||
},
|
||||
onSpawn: (pid) => {
|
||||
this.setStatus({ pid });
|
||||
},
|
||||
onExit: (exitedChild, code) => {
|
||||
this.processExitCode = code;
|
||||
const expectedExit = !this.shouldReconnect || this.status.state === 'stopped';
|
||||
const level = expectedExit ? logger.info : logger.warn;
|
||||
level(`Gateway process exited (code=${code}, expected=${expectedExit ? 'yes' : 'no'})`);
|
||||
this.ownsProcess = false;
|
||||
if (this.process === child) {
|
||||
if (this.process === exitedChild) {
|
||||
this.process = null;
|
||||
}
|
||||
this.emit('exit', code);
|
||||
@@ -716,71 +504,18 @@ export class GatewayManager extends EventEmitter {
|
||||
this.setStatus({ state: 'stopped' });
|
||||
this.scheduleReconnect();
|
||||
}
|
||||
});
|
||||
|
||||
// UtilityProcess doesn't emit 'close'; stdout/stderr end naturally on exit.
|
||||
|
||||
// Log stderr
|
||||
child.stderr?.on('data', (data) => {
|
||||
const raw = data.toString();
|
||||
for (const line of raw.split(/\r?\n/)) {
|
||||
recordGatewayStartupStderrLine(this.recentStartupStderrLines, line);
|
||||
const classified = classifyGatewayStderrMessage(line);
|
||||
if (classified.level === 'drop') continue;
|
||||
if (classified.level === 'debug') {
|
||||
logger.debug(`[Gateway stderr] ${classified.normalized}`);
|
||||
continue;
|
||||
}
|
||||
logger.warn(`[Gateway stderr] ${classified.normalized}`);
|
||||
},
|
||||
onError: () => {
|
||||
this.ownsProcess = false;
|
||||
if (this.process === child) {
|
||||
this.process = null;
|
||||
}
|
||||
});
|
||||
|
||||
// PID is only available after the child process has fully spawned.
|
||||
// utilityProcess.fork() is asynchronous — child.pid is undefined if read
|
||||
// synchronously right after fork(). Use the 'spawned' event instead.
|
||||
child.on('spawn', () => {
|
||||
logger.info(`Gateway process started (pid=${child.pid})`);
|
||||
this.setStatus({ pid: child.pid });
|
||||
});
|
||||
|
||||
resolve();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for Gateway to be ready by checking if it can issue connect challenges.
|
||||
*/
|
||||
private async waitForReady(retries = 2400, interval = 200): Promise<void> {
|
||||
const child = this.process;
|
||||
for (let i = 0; i < retries; i++) {
|
||||
// Early exit if the gateway process has already exited.
|
||||
// UtilityProcess has no synchronous exitCode/signalCode — use our tracked flag.
|
||||
if (child && this.processExitCode !== null) {
|
||||
const code = this.processExitCode;
|
||||
logger.error(`Gateway process exited before ready (code=${code})`);
|
||||
throw new Error(`Gateway process exited before becoming ready (code=${code})`);
|
||||
}
|
||||
|
||||
try {
|
||||
const ready = await probeGatewayReady(this.status.port, 1500);
|
||||
|
||||
if (ready) {
|
||||
logger.debug(`Gateway ready after ${i + 1} attempt(s)`);
|
||||
return;
|
||||
}
|
||||
} catch {
|
||||
// Gateway not ready yet
|
||||
}
|
||||
|
||||
if (i > 0 && i % 10 === 0) {
|
||||
logger.debug(`Still waiting for Gateway... (attempt ${i + 1}/${retries})`);
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, interval));
|
||||
}
|
||||
|
||||
logger.error(`Gateway failed to become ready after ${retries} attempts on port ${this.status.port}`);
|
||||
throw new Error(`Gateway failed to start after ${retries} retries (port ${this.status.port})`);
|
||||
this.process = child;
|
||||
this.ownsProcess = true;
|
||||
this.lastSpawnSummary = lastSpawnSummary;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -917,13 +652,13 @@ export class GatewayManager extends EventEmitter {
|
||||
state: 'reconnecting',
|
||||
reconnectAttempts: this.reconnectAttempts
|
||||
});
|
||||
const scheduledEpoch = this.lifecycleEpoch;
|
||||
const scheduledEpoch = this.lifecycleController.getCurrentEpoch();
|
||||
|
||||
this.reconnectTimer = setTimeout(async () => {
|
||||
this.reconnectTimer = null;
|
||||
const skipReason = getReconnectSkipReason({
|
||||
scheduledEpoch,
|
||||
currentEpoch: this.lifecycleEpoch,
|
||||
currentEpoch: this.lifecycleController.getCurrentEpoch(),
|
||||
shouldReconnect: this.shouldReconnect,
|
||||
});
|
||||
if (skipReason) {
|
||||
|
||||
180
electron/gateway/process-launcher.ts
Normal file
180
electron/gateway/process-launcher.ts
Normal file
@@ -0,0 +1,180 @@
|
||||
import { app, utilityProcess } from 'electron';
|
||||
import { existsSync, writeFileSync } from 'fs';
|
||||
import path from 'path';
|
||||
import type { GatewayLaunchContext } from './config-sync';
|
||||
import type { GatewayLifecycleState } from './process-policy';
|
||||
import { logger } from '../utils/logger';
|
||||
import { appendNodeRequireToNodeOptions } from '../utils/paths';
|
||||
|
||||
const GATEWAY_FETCH_PRELOAD_SOURCE = `'use strict';
|
||||
(function () {
|
||||
var _f = globalThis.fetch;
|
||||
if (typeof _f !== 'function') return;
|
||||
if (globalThis.__clawxFetchPatched) return;
|
||||
globalThis.__clawxFetchPatched = true;
|
||||
|
||||
globalThis.fetch = function clawxFetch(input, init) {
|
||||
var url =
|
||||
typeof input === 'string' ? input
|
||||
: input && typeof input === 'object' && typeof input.url === 'string'
|
||||
? input.url : '';
|
||||
|
||||
if (url.indexOf('openrouter.ai') !== -1) {
|
||||
init = init ? Object.assign({}, init) : {};
|
||||
var prev = init.headers;
|
||||
var flat = {};
|
||||
if (prev && typeof prev.forEach === 'function') {
|
||||
prev.forEach(function (v, k) { flat[k] = v; });
|
||||
} else if (prev && typeof prev === 'object') {
|
||||
Object.assign(flat, prev);
|
||||
}
|
||||
delete flat['http-referer'];
|
||||
delete flat['HTTP-Referer'];
|
||||
delete flat['x-title'];
|
||||
delete flat['X-Title'];
|
||||
flat['HTTP-Referer'] = 'https://claw-x.com';
|
||||
flat['X-Title'] = 'ClawX';
|
||||
init.headers = flat;
|
||||
}
|
||||
return _f.call(globalThis, input, init);
|
||||
};
|
||||
|
||||
if (process.platform === 'win32') {
|
||||
try {
|
||||
var cp = require('child_process');
|
||||
if (!cp.__clawxPatched) {
|
||||
cp.__clawxPatched = true;
|
||||
['spawn', 'exec', 'execFile', 'fork', 'spawnSync', 'execSync', 'execFileSync'].forEach(function(method) {
|
||||
var original = cp[method];
|
||||
if (typeof original !== 'function') return;
|
||||
cp[method] = function() {
|
||||
var args = Array.prototype.slice.call(arguments);
|
||||
var optIdx = -1;
|
||||
for (var i = 1; i < args.length; i++) {
|
||||
var a = args[i];
|
||||
if (a && typeof a === 'object' && !Array.isArray(a)) {
|
||||
optIdx = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (optIdx >= 0) {
|
||||
args[optIdx].windowsHide = true;
|
||||
} else {
|
||||
var opts = { windowsHide: true };
|
||||
if (typeof args[args.length - 1] === 'function') {
|
||||
args.splice(args.length - 1, 0, opts);
|
||||
} else {
|
||||
args.push(opts);
|
||||
}
|
||||
}
|
||||
return original.apply(this, args);
|
||||
};
|
||||
});
|
||||
}
|
||||
} catch (e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
})();
|
||||
`;
|
||||
|
||||
function ensureGatewayFetchPreload(): string {
|
||||
const dest = path.join(app.getPath('userData'), 'gateway-fetch-preload.cjs');
|
||||
try {
|
||||
writeFileSync(dest, GATEWAY_FETCH_PRELOAD_SOURCE, 'utf-8');
|
||||
} catch {
|
||||
// best-effort
|
||||
}
|
||||
return dest;
|
||||
}
|
||||
|
||||
export async function launchGatewayProcess(options: {
|
||||
port: number;
|
||||
launchContext: GatewayLaunchContext;
|
||||
sanitizeSpawnArgs: (args: string[]) => string[];
|
||||
getCurrentState: () => GatewayLifecycleState;
|
||||
getShouldReconnect: () => boolean;
|
||||
onStderrLine: (line: string) => void;
|
||||
onSpawn: (pid: number | undefined) => void;
|
||||
onExit: (child: Electron.UtilityProcess, code: number | null) => void;
|
||||
onError: (error: Error) => void;
|
||||
}): Promise<{ child: Electron.UtilityProcess; lastSpawnSummary: string }> {
|
||||
const {
|
||||
openclawDir,
|
||||
entryScript,
|
||||
gatewayArgs,
|
||||
forkEnv,
|
||||
mode,
|
||||
binPathExists,
|
||||
loadedProviderKeyCount,
|
||||
proxySummary,
|
||||
channelStartupSummary,
|
||||
} = options.launchContext;
|
||||
|
||||
logger.info(
|
||||
`Starting Gateway process (mode=${mode}, port=${options.port}, entry="${entryScript}", args="${options.sanitizeSpawnArgs(gatewayArgs).join(' ')}", cwd="${openclawDir}", bundledBin=${binPathExists ? 'yes' : 'no'}, providerKeys=${loadedProviderKeyCount}, channels=${channelStartupSummary}, proxy=${proxySummary})`,
|
||||
);
|
||||
const lastSpawnSummary = `mode=${mode}, entry="${entryScript}", args="${options.sanitizeSpawnArgs(gatewayArgs).join(' ')}", cwd="${openclawDir}"`;
|
||||
|
||||
const runtimeEnv = { ...forkEnv };
|
||||
if (!app.isPackaged) {
|
||||
try {
|
||||
const preloadPath = ensureGatewayFetchPreload();
|
||||
if (existsSync(preloadPath)) {
|
||||
runtimeEnv.NODE_OPTIONS = appendNodeRequireToNodeOptions(
|
||||
runtimeEnv.NODE_OPTIONS,
|
||||
preloadPath,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warn('Failed to set up OpenRouter headers preload:', err);
|
||||
}
|
||||
}
|
||||
|
||||
return await new Promise<{ child: Electron.UtilityProcess; lastSpawnSummary: string }>((resolve, reject) => {
|
||||
const child = utilityProcess.fork(entryScript, gatewayArgs, {
|
||||
cwd: openclawDir,
|
||||
stdio: 'pipe',
|
||||
env: runtimeEnv as NodeJS.ProcessEnv,
|
||||
serviceName: 'OpenClaw Gateway',
|
||||
});
|
||||
|
||||
let settled = false;
|
||||
const resolveOnce = () => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
resolve({ child, lastSpawnSummary });
|
||||
};
|
||||
const rejectOnce = (error: Error) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
reject(error);
|
||||
};
|
||||
|
||||
child.on('error', (error) => {
|
||||
logger.error('Gateway process spawn error:', error);
|
||||
options.onError(error);
|
||||
rejectOnce(error);
|
||||
});
|
||||
|
||||
child.on('exit', (code: number) => {
|
||||
const expectedExit = !options.getShouldReconnect() || options.getCurrentState() === 'stopped';
|
||||
const level = expectedExit ? logger.info : logger.warn;
|
||||
level(`Gateway process exited (code=${code}, expected=${expectedExit ? 'yes' : 'no'})`);
|
||||
options.onExit(child, code);
|
||||
});
|
||||
|
||||
child.stderr?.on('data', (data) => {
|
||||
const raw = data.toString();
|
||||
for (const line of raw.split(/\r?\n/)) {
|
||||
options.onStderrLine(line);
|
||||
}
|
||||
});
|
||||
|
||||
child.on('spawn', () => {
|
||||
logger.info(`Gateway process started (pid=${child.pid})`);
|
||||
options.onSpawn(child.pid);
|
||||
resolveOnce();
|
||||
});
|
||||
});
|
||||
}
|
||||
106
electron/gateway/startup-orchestrator.ts
Normal file
106
electron/gateway/startup-orchestrator.ts
Normal file
@@ -0,0 +1,106 @@
|
||||
import { logger } from '../utils/logger';
|
||||
import { LifecycleSupersededError } from './lifecycle-controller';
|
||||
import { getGatewayStartupRecoveryAction } from './startup-recovery';
|
||||
|
||||
export interface ExistingGatewayInfo {
|
||||
port: number;
|
||||
externalToken?: string;
|
||||
}
|
||||
|
||||
type StartupHooks = {
|
||||
port: number;
|
||||
ownedPid?: number;
|
||||
shouldWaitForPortFree: boolean;
|
||||
maxStartAttempts?: number;
|
||||
resetStartupStderrLines: () => void;
|
||||
getStartupStderrLines: () => string[];
|
||||
assertLifecycle: (phase: string) => void;
|
||||
findExistingGateway: (port: number, ownedPid?: number) => Promise<ExistingGatewayInfo | null>;
|
||||
connect: (port: number, externalToken?: string) => Promise<void>;
|
||||
onConnectedToExistingGateway: () => void;
|
||||
waitForPortFree: (port: number) => Promise<void>;
|
||||
startProcess: () => Promise<void>;
|
||||
waitForReady: (port: number) => Promise<void>;
|
||||
onConnectedToManagedGateway: () => void;
|
||||
runDoctorRepair: () => Promise<boolean>;
|
||||
onDoctorRepairSuccess: () => void;
|
||||
delay: (ms: number) => Promise<void>;
|
||||
};
|
||||
|
||||
export async function runGatewayStartupSequence(hooks: StartupHooks): Promise<void> {
|
||||
let configRepairAttempted = false;
|
||||
let startAttempts = 0;
|
||||
const maxStartAttempts = hooks.maxStartAttempts ?? 3;
|
||||
|
||||
while (true) {
|
||||
startAttempts++;
|
||||
hooks.assertLifecycle('start');
|
||||
hooks.resetStartupStderrLines();
|
||||
|
||||
try {
|
||||
logger.debug('Checking for existing Gateway...');
|
||||
const existing = await hooks.findExistingGateway(hooks.port, hooks.ownedPid);
|
||||
hooks.assertLifecycle('start/find-existing');
|
||||
if (existing) {
|
||||
logger.debug(`Found existing Gateway on port ${existing.port}`);
|
||||
await hooks.connect(existing.port, existing.externalToken);
|
||||
hooks.assertLifecycle('start/connect-existing');
|
||||
hooks.onConnectedToExistingGateway();
|
||||
return;
|
||||
}
|
||||
|
||||
logger.debug('No existing Gateway found, starting new process...');
|
||||
|
||||
if (hooks.shouldWaitForPortFree) {
|
||||
await hooks.waitForPortFree(hooks.port);
|
||||
hooks.assertLifecycle('start/wait-port');
|
||||
}
|
||||
|
||||
await hooks.startProcess();
|
||||
hooks.assertLifecycle('start/start-process');
|
||||
|
||||
await hooks.waitForReady(hooks.port);
|
||||
hooks.assertLifecycle('start/wait-ready');
|
||||
|
||||
await hooks.connect(hooks.port);
|
||||
hooks.assertLifecycle('start/connect');
|
||||
|
||||
hooks.onConnectedToManagedGateway();
|
||||
return;
|
||||
} catch (error) {
|
||||
if (error instanceof LifecycleSupersededError) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
const recoveryAction = getGatewayStartupRecoveryAction({
|
||||
startupError: error,
|
||||
startupStderrLines: hooks.getStartupStderrLines(),
|
||||
configRepairAttempted,
|
||||
attempt: startAttempts,
|
||||
maxAttempts: maxStartAttempts,
|
||||
});
|
||||
|
||||
if (recoveryAction === 'repair') {
|
||||
configRepairAttempted = true;
|
||||
logger.warn(
|
||||
'Detected invalid OpenClaw config during Gateway startup; running doctor repair before retry',
|
||||
);
|
||||
const repaired = await hooks.runDoctorRepair();
|
||||
if (repaired) {
|
||||
logger.info('OpenClaw doctor repair completed; retrying Gateway startup');
|
||||
hooks.onDoctorRepairSuccess();
|
||||
continue;
|
||||
}
|
||||
logger.error('OpenClaw doctor repair failed; not retrying Gateway startup');
|
||||
}
|
||||
|
||||
if (recoveryAction === 'retry') {
|
||||
logger.warn(`Transient start error: ${String(error)}. Retrying... (${startAttempts}/${maxStartAttempts})`);
|
||||
await hooks.delay(1000);
|
||||
continue;
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -20,17 +20,6 @@ export function warmupManagedPythonReadiness(): void {
|
||||
});
|
||||
}
|
||||
|
||||
export function isTransientGatewayStartError(error: unknown): boolean {
|
||||
const errMsg = String(error);
|
||||
return (
|
||||
errMsg.includes('WebSocket closed before handshake') ||
|
||||
errMsg.includes('ECONNREFUSED') ||
|
||||
errMsg.includes('Gateway process exited before becoming ready') ||
|
||||
errMsg.includes('Timed out waiting for connect.challenge') ||
|
||||
errMsg.includes('Connect handshake timeout')
|
||||
);
|
||||
}
|
||||
|
||||
export async function terminateOwnedGatewayProcess(child: Electron.UtilityProcess): Promise<void> {
|
||||
let exited = false;
|
||||
|
||||
|
||||
@@ -59,6 +59,43 @@ export async function probeGatewayReady(
|
||||
});
|
||||
}
|
||||
|
||||
export async function waitForGatewayReady(options: {
|
||||
port: number;
|
||||
getProcessExitCode: () => number | null;
|
||||
retries?: number;
|
||||
intervalMs?: number;
|
||||
}): Promise<void> {
|
||||
const retries = options.retries ?? 2400;
|
||||
const intervalMs = options.intervalMs ?? 200;
|
||||
|
||||
for (let i = 0; i < retries; i++) {
|
||||
const exitCode = options.getProcessExitCode();
|
||||
if (exitCode !== null) {
|
||||
logger.error(`Gateway process exited before ready (code=${exitCode})`);
|
||||
throw new Error(`Gateway process exited before becoming ready (code=${exitCode})`);
|
||||
}
|
||||
|
||||
try {
|
||||
const ready = await probeGatewayReady(options.port, 1500);
|
||||
if (ready) {
|
||||
logger.debug(`Gateway ready after ${i + 1} attempt(s)`);
|
||||
return;
|
||||
}
|
||||
} catch {
|
||||
// Gateway not ready yet.
|
||||
}
|
||||
|
||||
if (i > 0 && i % 10 === 0) {
|
||||
logger.debug(`Still waiting for Gateway... (attempt ${i + 1}/${retries})`);
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, intervalMs));
|
||||
}
|
||||
|
||||
logger.error(`Gateway failed to become ready after ${retries} attempts on port ${options.port}`);
|
||||
throw new Error(`Gateway failed to start after ${retries} retries (port ${options.port})`);
|
||||
}
|
||||
|
||||
export function buildGatewayConnectFrame(options: {
|
||||
challengeNonce: string;
|
||||
token: string;
|
||||
|
||||
Reference in New Issue
Block a user