// Viewer metadata: 1847 lines, 57 KiB, TypeScript
import { randomUUID } from 'node:crypto';
|
|
import { EventEmitter } from 'node:events';
|
|
import { createServer } from 'node:net';
|
|
import { join } from 'node:path';
|
|
import { BrowserWindow } from 'electron';
|
|
import { windowManager } from '@electron/service/window-service';
|
|
import logManager from '@electron/service/logger';
|
|
import configManager from '@electron/service/config-service';
|
|
import { updateTrayStatus } from '@electron/service/tray';
|
|
import { getUserDataDir } from '@electron/utils/paths';
|
|
import { captureTelemetryEvent, trackMetric } from '@electron/utils/telemetry';
|
|
import WebSocket from 'ws';
|
|
import {
|
|
loadOrCreateDeviceIdentity,
|
|
type DeviceIdentity,
|
|
} from '@electron/utils/device-identity';
|
|
import { CONFIG_KEYS } from '@runtime/lib/constants';
|
|
import { normalizeAgentSessionKey } from '@runtime/lib/models';
|
|
import type { ContentBlock, RawMessage } from '@runtime/shared/chat-model';
|
|
import type { GatewayEvent, GatewayRpcParams, RuntimeRefreshTopic } from './types';
|
|
import {
|
|
createInitialGatewayDiagnostics,
|
|
type GatewayDiagnosticsSnapshot,
|
|
} from './diagnostics';
|
|
import { dispatchJsonRpcNotification, dispatchProtocolEvent } from './event-dispatch';
|
|
import { OpenClawProcessOwner } from './openclaw-process-owner';
|
|
import { launchGatewayProcess } from './process-launcher';
|
|
import { prepareGatewayLaunchContext } from './config-sync';
|
|
import {
|
|
DEFAULT_RECONNECT_CONFIG,
|
|
type ReconnectConfig,
|
|
type GatewayLifecycleState,
|
|
getReconnectScheduleDecision,
|
|
getReconnectSkipReason,
|
|
} from './process-policy';
|
|
import { isNotification, isResponse, type JsonRpcNotification } from './protocol';
|
|
import {
|
|
clearPendingGatewayRequests,
|
|
rejectPendingGatewayRequest,
|
|
resolvePendingGatewayRequest,
|
|
type PendingGatewayRequest,
|
|
} from './request-store';
|
|
import { classifyGatewayStderrMessage, recordGatewayStartupStderrLine } from './startup-stderr';
|
|
import { runGatewayStartupSequence } from './startup-orchestrator';
|
|
import {
|
|
findExistingGatewayProcess,
|
|
runOpenClawDoctorRepair,
|
|
terminateOwnedGatewayProcess,
|
|
unloadLaunchctlGatewayService,
|
|
waitForPortFree,
|
|
warmupManagedPythonReadiness,
|
|
} from './supervisor';
|
|
import type { GatewayProcessHandle } from './process-handle';
|
|
import { GatewayConnectionMonitor } from './connection-monitor';
|
|
import { GatewayLifecycleController, LifecycleSupersededError } from './lifecycle-controller';
|
|
import { GatewayRestartController } from './restart-controller';
|
|
import { GatewayRestartGovernor } from './restart-governor';
|
|
import {
|
|
DEFAULT_GATEWAY_RELOAD_POLICY,
|
|
loadGatewayReloadPolicy,
|
|
type GatewayReloadPolicy,
|
|
} from './reload-policy';
|
|
import { GatewayStateController, type GatewayRuntimeStatus } from './state';
|
|
import { connectGatewaySocket, waitForGatewayReady } from './ws-client';
|
|
import { dispatchGatewayRpcMethod } from './rpc-dispatch';
|
|
|
|
/**
 * A runtime-change notification queued by the manager: the refresh topics
 * that changed plus optional context (reason, accumulated warnings, and the
 * channel/account the change relates to).
 */
type RuntimeChangeBroadcast = {
  topics: RuntimeRefreshTopic[];
  reason?: string;
  warnings?: string[];
  channelType?: string;
  accountId?: string;
};
|
|
|
|
/** Coarse connection status derived from the lifecycle state; passed to `updateTrayStatus` and broadcast as `gateway:status`. */
type GatewayStatus = 'connected' | 'disconnected' | 'reconnecting';
|
|
|
|
/**
 * Loose view of a `type: 'res'` frame received from the Gateway socket.
 * Every field is optional because frames are narrowed at runtime, not validated.
 */
type GatewayResponseFrame = {
  type?: string;
  id?: string;
  ok?: boolean;
  payload?: unknown;
  error?: unknown;
};
|
|
|
|
/** Loose view of a `type: 'event'` frame received from the Gateway socket. */
type GatewayEventFrame = {
  type?: string;
  event?: string;
  payload?: unknown;
};
|
|
|
|
/**
 * Payload of the manager's `notification` event: either a JSON-RPC
 * notification or a minimal `{ method, params }` object.
 */
type GatewayNotificationEvent =
  | JsonRpcNotification
  | {
      method: string;
      params?: unknown;
    };
|
|
|
|
/**
 * Listener signatures for the events `GatewayManager` exposes through its
 * typed `on` / `once` / `off` overloads.
 */
export interface GatewayManagerEvents {
  status: (status: GatewayRuntimeStatus) => void;
  message: (message: unknown) => void;
  notification: (notification: GatewayNotificationEvent) => void;
  exit: (code: number | null) => void;
  error: (error: Error) => void;
  'channel:status': (data: { channelId: string; status: string }) => void;
  'chat:message': (data: { message: unknown }) => void;
  'gateway:ready': (payload: unknown) => void;
}
|
|
|
|
/**
 * Type guard for "any non-null object".
 *
 * Note that arrays also satisfy this guard; callers that must exclude arrays
 * need an additional `Array.isArray` check.
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}
|
|
|
|
/**
 * Converts an arbitrary thrown/rejected value into a human-readable message.
 *
 * - `Error` instances yield their `message`.
 * - Plain objects carrying a non-blank string `message` (for example an RPC
 *   error payload) yield that message instead of "[object Object]".
 * - Everything else is stringified with `String(...)`.
 */
function toErrorMessage(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }

  if (typeof error === 'object' && error !== null) {
    const message = (error as { message?: unknown }).message;
    if (typeof message === 'string' && message.trim()) {
      return message;
    }
  }

  return String(error);
}
|
|
|
|
/**
 * Reports whether an RPC failure was caused by the transport (timeout,
 * disconnected socket, cancelled request, failed send) rather than by the
 * Gateway answering with an error.
 *
 * Detection is by substring match against the messages this file produces,
 * so those message prefixes must stay in sync with this list.
 */
function isTransportRpcFailure(error: unknown): boolean {
  const transportFailureMarkers = [
    'Gateway RPC timed out:',
    'OpenClaw Gateway socket is not connected',
    'Gateway request cancelled:',
    'Failed to send Gateway RPC',
  ];
  const message = error instanceof Error ? error.message : String(error);
  return transportFailureMarkers.some((marker) => message.includes(marker));
}
|
|
|
|
/**
 * Normalizes a loosely-typed timestamp to a finite number.
 *
 * @param value A finite number (returned as-is) or a string accepted by `Date.parse`.
 * @returns The numeric timestamp, or `undefined` when the value is neither.
 */
function normalizeTimestamp(value: unknown): number | undefined {
  if (typeof value === 'number' && Number.isFinite(value)) {
    return value;
  }

  if (typeof value === 'string') {
    const parsed = Date.parse(value);
    // Date.parse yields NaN for unparseable input.
    if (Number.isFinite(parsed)) {
      return parsed;
    }
  }

  return undefined;
}
|
|
|
|
/**
 * Maps a loosely-typed role value onto a `RawMessage` role.
 *
 * The input is trimmed and lower-cased. Known roles pass through unchanged,
 * `tool` is mapped to `toolresult`, and anything else (including non-string
 * input) falls back to `assistant`.
 */
function normalizeMessageRole(value: unknown): RawMessage['role'] {
  const normalized = typeof value === 'string' ? value.trim().toLowerCase() : '';
  switch (normalized) {
    case 'user':
    case 'assistant':
    case 'system':
    case 'tool_result':
    case 'toolresult':
      return normalized as RawMessage['role'];
    case 'tool':
      return 'toolresult';
    default:
      return 'assistant';
  }
}
|
|
|
|
/**
 * Wraps text in a single `text` content block.
 *
 * @returns A one-element array, or an empty array when `text` is empty.
 */
function toTextContentBlocks(text: string): ContentBlock[] {
  return text ? [{ type: 'text', text }] : [];
}
|
|
|
|
/**
 * Coerces an untyped Gateway message object into a `RawMessage`.
 *
 * Content resolution order:
 * 1. string `content` – kept as a string, or wrapped in text blocks when
 *    `options.preferContentBlocks` is set;
 * 2. array `content` – passed through as content blocks (elements are not validated);
 * 3. string `text` – wrapped in text blocks;
 * 4. otherwise an empty string.
 *
 * All other properties of the input are carried over; `role` and `timestamp`
 * are always replaced by their normalized values.
 *
 * @returns The normalized message, or `null` when `value` is not an object.
 */
function normalizeGatewayRawMessage(
  value: unknown,
  options?: { preferContentBlocks?: boolean },
): RawMessage | null {
  if (!isRecord(value)) {
    return null;
  }

  const rawContent = value.content;
  let content: RawMessage['content'] = '';

  if (typeof rawContent === 'string') {
    content = options?.preferContentBlocks ? toTextContentBlocks(rawContent) : rawContent;
  } else if (Array.isArray(rawContent)) {
    content = rawContent as ContentBlock[];
  } else if (typeof value.text === 'string') {
    content = toTextContentBlocks(value.text);
  }

  return {
    ...(value as Partial<RawMessage>),
    role: normalizeMessageRole(value.role),
    content,
    timestamp: normalizeTimestamp(value.timestamp),
  };
}
|
|
|
|
/**
 * Flattens message content into plain text.
 *
 * A string is returned unchanged. For an array, each object block contributes
 * the first string found among its `text`, `thinking` and `content`
 * properties; empty contributions are dropped and the rest are joined with
 * newlines. Any other input yields an empty string.
 */
function extractTextFromMessageContent(content: RawMessage['content'] | unknown): string {
  if (typeof content === 'string') {
    return content;
  }

  if (!Array.isArray(content)) {
    return '';
  }

  return content
    .map((block) => {
      if (!isRecord(block)) {
        return '';
      }

      if (typeof block.text === 'string') {
        return block.text;
      }

      if (typeof block.thinking === 'string') {
        return block.thinking;
      }

      if (typeof block.content === 'string') {
        return block.content;
      }

      return '';
    })
    .filter(Boolean)
    .join('\n');
}
|
|
|
|
/** Returns the plain text of a message by flattening its `content`. */
function extractTextFromRawMessage(message: RawMessage): string {
  return extractTextFromMessageContent(message.content);
}
|
|
|
|
/**
 * Extracts text from a chat event payload.
 *
 * Checks, in order: a string `delta`, a string `text`, then the flattened
 * text of `payload.message`. Returns an empty string when none is present.
 */
function extractTextFromGatewayPayload(payload: Record<string, unknown>): string {
  if (typeof payload.delta === 'string') {
    return payload.delta;
  }

  if (typeof payload.text === 'string') {
    return payload.text;
  }

  const normalizedMessage = normalizeGatewayRawMessage(payload.message);
  if (normalizedMessage) {
    return extractTextFromRawMessage(normalizedMessage);
  }

  return '';
}
|
|
|
|
/**
 * Builds an `Error` from an RPC error payload.
 *
 * @param error A non-blank string, or an object with a non-blank string `message`.
 * @param fallback Message used when `error` carries no usable text.
 */
function buildGatewayRpcError(error: unknown, fallback: string): Error {
  if (typeof error === 'string' && error.trim()) {
    return new Error(error);
  }

  if (isRecord(error) && typeof error.message === 'string' && error.message.trim()) {
    return new Error(error.message);
  }

  return new Error(fallback);
}
|
|
|
|
/**
 * Finds a free TCP port on the loopback interface by binding a throwaway
 * server to port 0, reading the assigned port, and closing the server again.
 *
 * The port is only known to be free at the moment of the probe; another
 * process may claim it before the caller binds it.
 *
 * @throws When the probe server fails to listen or close, or reports no port.
 */
async function findAvailablePort(): Promise<number> {
  return await new Promise<number>((resolve, reject) => {
    const server = createServer();

    server.once('error', reject);
    server.listen(0, '127.0.0.1', () => {
      const address = server.address();
      // `address()` may be a string (pipe) or null; only the object form has a port.
      const port = typeof address === 'object' && address ? address.port : 0;
      server.close((error) => {
        if (error) {
          reject(error);
          return;
        }

        if (!port) {
          reject(new Error('Failed to allocate an available Gateway port'));
          return;
        }

        resolve(port);
      });
    });
  });
}
|
|
|
|
export class GatewayManager extends EventEmitter {
|
|
// --- Lifecycle bookkeeping -------------------------------------------------
private initialized = false;
private initPromise: Promise<void> | null = null;
private startPromise: Promise<void> | null = null;
private stopPromise: Promise<void> | null = null;

// Mirrors of the most recent status; all three are written together by the
// state controller's `emitStatus` callback installed in the constructor.
private status: GatewayStatus = 'disconnected';
private lifecycleState: GatewayLifecycleState = 'stopped';
private gatewayStatusDetails: GatewayRuntimeStatus = { state: 'stopped', port: null };

private readonly mode = 'openclaw' as const;
private readonly processOwner = new OpenClawProcessOwner();

// In-flight RPCs keyed by request id, and the latest streamed chat text per run id.
private readonly pendingRequests = new Map<string, PendingGatewayRequest>();
private readonly deltaSnapshots = new Map<string, string>();

// --- Transport / process state ----------------------------------------------
// Token handed to the launch context and used for the socket handshake unless
// an external token is supplied to `connectToGateway`.
private gatewayToken = randomUUID();
private socket: WebSocket | null = null;
private child: GatewayProcessHandle | null = null;
private port: number | null = null;
private exitCode: number | null = null;
private lastError?: string;
private stopping = false;
private deviceIdentity: DeviceIdentity | null = null;
private ownsProcess = false;
private externalShutdownSupported: boolean | null = null;

// --- Reconnect / restart state ----------------------------------------------
private reconnectTimer: NodeJS.Timeout | null = null;
private reconnectAttempts = 0;
private reconnectAttemptsTotal = 0;
private reconnectSuccessTotal = 0;
private reconnectConfig: ReconnectConfig;
private shouldReconnect = true;
private startLock = false;
private restartInFlight: Promise<void> | null = null;
private readonly connectionMonitor = new GatewayConnectionMonitor();
private readonly lifecycleController = new GatewayLifecycleController();
private readonly restartController = new GatewayRestartController();
private readonly restartGovernor = new GatewayRestartGovernor();

// --- Reload policy cache (see `refreshReloadPolicy`) ------------------------
private reloadDebounceTimer: NodeJS.Timeout | null = null;
private reloadPolicy: GatewayReloadPolicy = { ...DEFAULT_GATEWAY_RELOAD_POLICY };
private reloadPolicyLoadedAt = 0;
private reloadPolicyRefreshPromise: Promise<void> | null = null;

private isAutoReconnectStart = false;
// Runtime change accumulated by `queueRuntimeChange` until it is taken.
private pendingRuntimeChange: RuntimeChangeBroadcast | null = null;
private lastSpawnSummary: string | null = null;
private recentStartupStderrLines: string[] = [];
private readonly stateController: GatewayStateController;
// Timer that forces `gatewayReady` when no `gateway:ready` event arrives in time.
private gatewayReadyFallbackTimer: NodeJS.Timeout | null = null;
private diagnostics: GatewayDiagnosticsSnapshot = createInitialGatewayDiagnostics();

// Minimum age of the cached reload policy before a non-forced refresh reloads it.
private static readonly RELOAD_POLICY_REFRESH_MS = 15_000;
// Heartbeat tuning; `startPing` selects the `_WIN` variants when
// `process.platform === 'win32'`.
private static readonly HEARTBEAT_INTERVAL_MS = 30_000;
private static readonly HEARTBEAT_TIMEOUT_MS = 12_000;
private static readonly HEARTBEAT_MAX_MISSES = 3;
private static readonly HEARTBEAT_INTERVAL_MS_WIN = 60_000;
private static readonly HEARTBEAT_TIMEOUT_MS_WIN = 25_000;
private static readonly HEARTBEAT_MAX_MISSES_WIN = 5;
// Delay before `scheduleGatewayReadyFallback` marks the gateway ready on its own.
private static readonly GATEWAY_READY_FALLBACK_MS = 30_000;
|
|
|
|
/**
 * Registers a listener. The overloads type the listener for each event in
 * `GatewayManagerEvents`; any other event name falls back to the untyped
 * `EventEmitter` signature.
 */
override on(eventName: 'status', listener: GatewayManagerEvents['status']): this;
override on(eventName: 'message', listener: GatewayManagerEvents['message']): this;
override on(eventName: 'notification', listener: GatewayManagerEvents['notification']): this;
override on(eventName: 'exit', listener: GatewayManagerEvents['exit']): this;
override on(eventName: 'error', listener: GatewayManagerEvents['error']): this;
override on(eventName: 'channel:status', listener: GatewayManagerEvents['channel:status']): this;
override on(eventName: 'chat:message', listener: GatewayManagerEvents['chat:message']): this;
override on(eventName: 'gateway:ready', listener: GatewayManagerEvents['gateway:ready']): this;
override on(eventName: string | symbol, listener: (...args: any[]) => void): this;
override on(eventName: string | symbol, listener: (...args: any[]) => void): this {
  return super.on(eventName, listener);
}
|
|
|
|
/**
 * Registers a one-shot listener. The overloads type the listener for each
 * event in `GatewayManagerEvents`; any other event name falls back to the
 * untyped `EventEmitter` signature.
 */
override once(eventName: 'status', listener: GatewayManagerEvents['status']): this;
override once(eventName: 'message', listener: GatewayManagerEvents['message']): this;
override once(eventName: 'notification', listener: GatewayManagerEvents['notification']): this;
override once(eventName: 'exit', listener: GatewayManagerEvents['exit']): this;
override once(eventName: 'error', listener: GatewayManagerEvents['error']): this;
override once(eventName: 'channel:status', listener: GatewayManagerEvents['channel:status']): this;
override once(eventName: 'chat:message', listener: GatewayManagerEvents['chat:message']): this;
override once(eventName: 'gateway:ready', listener: GatewayManagerEvents['gateway:ready']): this;
override once(eventName: string | symbol, listener: (...args: any[]) => void): this;
override once(eventName: string | symbol, listener: (...args: any[]) => void): this {
  return super.once(eventName, listener);
}
|
|
|
|
/**
 * Removes a listener. The overloads type the listener for each event in
 * `GatewayManagerEvents`; any other event name falls back to the untyped
 * `EventEmitter` signature.
 */
override off(eventName: 'status', listener: GatewayManagerEvents['status']): this;
override off(eventName: 'message', listener: GatewayManagerEvents['message']): this;
override off(eventName: 'notification', listener: GatewayManagerEvents['notification']): this;
override off(eventName: 'exit', listener: GatewayManagerEvents['exit']): this;
override off(eventName: 'error', listener: GatewayManagerEvents['error']): this;
override off(eventName: 'channel:status', listener: GatewayManagerEvents['channel:status']): this;
override off(eventName: 'chat:message', listener: GatewayManagerEvents['chat:message']): this;
override off(eventName: 'gateway:ready', listener: GatewayManagerEvents['gateway:ready']): this;
override off(eventName: string | symbol, listener: (...args: any[]) => void): this;
override off(eventName: string | symbol, listener: (...args: any[]) => void): this {
  return super.off(eventName, listener);
}
|
|
|
|
/**
 * Creates the manager and wires up its state controller and internal listeners.
 *
 * @param config Overrides merged on top of `DEFAULT_RECONNECT_CONFIG`.
 */
constructor(config?: Partial<ReconnectConfig>) {
  super();
  this.reconnectConfig = { ...DEFAULT_RECONNECT_CONFIG, ...config };
  this.stateController = new GatewayStateController({
    // Mirror every emitted status into local fields, then fan it out to
    // listeners, the tray, and the broadcast channel.
    emitStatus: (status) => {
      this.gatewayStatusDetails = status;
      this.lifecycleState = status.state;
      this.status = this.toGatewayStatus(status.state);
      this.emit('status', { ...status });
      updateTrayStatus(this.status);
      this.broadcast({ type: 'gateway:status', status: this.status });
    },
    onTransition: (previousState, nextState) => {
      if (nextState === 'running') {
        this.restartGovernor.onRunning();
      }
      // Give the restart controller a chance to run a restart it deferred earlier.
      this.restartController.flushDeferredRestart(
        `status:${previousState}->${nextState}`,
        {
          state: this.lifecycleState,
          startLock: this.startLock,
          shouldReconnect: this.shouldReconnect,
        },
        () => {
          void this.restart().catch((error) => {
            logManager.warn('Deferred Gateway restart failed:', error);
          });
        },
      );
    },
  });

  // A real `gateway:ready` event supersedes the timer-based fallback.
  this.on('gateway:ready', () => {
    this.clearGatewayReadyFallback();
    if (this.lifecycleState === 'running' && !this.gatewayStatusDetails.gatewayReady) {
      logManager.info('Gateway subsystems ready (event received)');
      this.setGatewayState({ gatewayReady: true });
    }
  });

  // EventEmitter throws on an `error` event that has no listener; this
  // default listener keeps internal `emit('error', ...)` calls from throwing.
  this.on('error', (error) => {
    logManager.debug('GatewayManager emitted error event', error);
  });
}
|
|
|
|
/**
 * Returns a copy of the spawn arguments that is safe to log.
 *
 * Every `--token <value>` pair has its value replaced by `<redacted>`, and
 * the `--token=<value>` form is redacted as well. The input array is not
 * modified. A trailing `--token` with no value is left as-is.
 */
private sanitizeSpawnArgs(args: string[]): string[] {
  let redactNext = false;
  return args.map((arg) => {
    if (redactNext) {
      redactNext = false;
      return '<redacted>';
    }

    if (arg === '--token') {
      redactNext = true;
      return arg;
    }

    if (arg.startsWith('--token=')) {
      return '--token=<redacted>';
    }

    return arg;
  });
}
|
|
|
|
/**
 * Reports whether an RPC error indicates that the Gateway does not
 * implement the `shutdown` method (matched on the error message text).
 */
private isUnsupportedShutdownError(error: unknown): boolean {
  return /unknown method:\s*shutdown/i.test(toErrorMessage(error));
}
|
|
|
|
/**
 * Collapses the lifecycle state into the coarse three-value status:
 * `running` → `connected`; `starting` / `reconnecting` → `reconnecting`;
 * everything else → `disconnected`.
 */
private toGatewayStatus(state: GatewayLifecycleState): GatewayStatus {
  switch (state) {
    case 'running':
      return 'connected';
    case 'starting':
    case 'reconnecting':
      return 'reconnecting';
    case 'stopped':
    case 'error':
    default:
      return 'disconnected';
  }
}
|
|
|
|
/** Applies a partial status update by forwarding it to the state controller. */
private setGatewayState(update: Partial<GatewayRuntimeStatus>): void {
  this.stateController.setStatus(update);
}
|
|
|
|
/**
 * Queues a runtime change for later delivery, merging it into any change
 * that is already pending.
 *
 * When merging, topics and warnings are unioned (duplicates removed) and the
 * scalar fields of the newer change win when they are set. `warnings` stays
 * `undefined` when neither side carries any, matching the shape produced for
 * a first queued change.
 */
private queueRuntimeChange(change?: RuntimeChangeBroadcast): void {
  if (!change) {
    return;
  }

  const pending = this.pendingRuntimeChange;
  if (!pending) {
    // Copy the arrays so later mutation by the caller cannot affect the queue.
    this.pendingRuntimeChange = {
      topics: [...change.topics],
      reason: change.reason,
      warnings: change.warnings ? [...change.warnings] : undefined,
      channelType: change.channelType,
      accountId: change.accountId,
    };
    return;
  }

  const mergedWarnings = Array.from(new Set([
    ...(pending.warnings ?? []),
    ...(change.warnings ?? []),
  ]));

  this.pendingRuntimeChange = {
    topics: Array.from(new Set([...pending.topics, ...change.topics])),
    reason: change.reason ?? pending.reason,
    warnings: mergedWarnings.length > 0 ? mergedWarnings : undefined,
    channelType: change.channelType ?? pending.channelType,
    accountId: change.accountId ?? pending.accountId,
  };
}
|
|
|
|
/**
 * Removes and returns the pending runtime change.
 *
 * @returns The queued change, or `undefined` when nothing is queued.
 */
private takeQueuedRuntimeChange(): RuntimeChangeBroadcast | undefined {
  const change = this.pendingRuntimeChange ?? undefined;
  this.pendingRuntimeChange = null;
  return change;
}
|
|
|
|
/**
 * Delivers the pending runtime change (if any) with an extra warning that
 * describes a failure.
 *
 * @param prefix Text placed before the error message in the warning.
 * @param error The failure to describe.
 */
private flushQueuedRuntimeChangeFailure(prefix: string, error: unknown): void {
  const runtimeChange = this.takeQueuedRuntimeChange();
  if (!runtimeChange) {
    return;
  }

  const warning = `${prefix}: ${toErrorMessage(error)}`;
  this.notifyRuntimeChanged({
    ...runtimeChange,
    warnings: [...(runtimeChange.warnings ?? []), warning],
  });
}
|
|
|
|
/**
 * Loads (or creates) the device identity from
 * `openclaw-device-identity.json` in the user data directory.
 *
 * Does nothing when an identity is already loaded. Failures are logged and
 * swallowed, leaving `deviceIdentity` as `null`.
 */
private async initDeviceIdentity(): Promise<void> {
  if (this.deviceIdentity) {
    return;
  }

  try {
    const identityPath = join(getUserDataDir(), 'openclaw-device-identity.json');
    this.deviceIdentity = await loadOrCreateDeviceIdentity(identityPath);
    logManager.info('OpenClaw Gateway device identity loaded', {
      deviceId: this.deviceIdentity.deviceId,
    });
  } catch (error) {
    logManager.warn('Failed to load OpenClaw device identity; scopes may be limited:', error);
  }
}
|
|
|
|
/** Terminates a Gateway child process via `terminateOwnedGatewayProcess`. */
private async terminateChild(child: GatewayProcessHandle): Promise<void> {
  await terminateOwnedGatewayProcess(child);
}
|
|
|
|
/**
 * Tears down the socket and child process and resets transport state.
 *
 * Order of operations: drop and terminate the socket, detach the child and
 * clear port/ownership, reject all pending RPCs with a "cancelled" error,
 * publish the cleared status, and only then await termination of the child.
 *
 * @param reason Included in the cancellation error and in log messages.
 */
private async disposeTransport(reason: string): Promise<void> {
  const socket = this.socket;
  this.socket = null;
  this.clearGatewayReadyFallback();

  if (socket) {
    try {
      socket.terminate();
    } catch (error) {
      logManager.warn(`Failed to close OpenClaw Gateway socket during ${reason}:`, error);
    }
  }

  // Detach the child before terminating it: `handleGatewayProcessExit`
  // ignores exits of a handle that is no longer `this.child`.
  const child = this.child;
  this.child = null;
  this.port = null;
  this.ownsProcess = false;
  if (child && this.exitCode === null) {
    // A child is being torn down without a recorded exit code.
    this.exitCode = -1;
  } else if (!child) {
    this.exitCode = null;
  }
  this.deltaSnapshots.clear();

  clearPendingGatewayRequests(
    this.pendingRequests,
    new Error(`Gateway request cancelled: ${reason}`),
  );

  this.setGatewayState({
    port: this.port,
    pid: undefined,
    connectedAt: undefined,
    uptime: undefined,
    gatewayReady: undefined,
  });

  if (child) {
    await this.terminateChild(child);
  }
}
|
|
|
|
/**
 * Handles the exit of the Gateway child process.
 *
 * Exits of a handle other than the current child are ignored. Otherwise the
 * exit code is recorded, monitors are cleared and `exit` is emitted. Unless a
 * stop is in progress, the exit is treated as unexpected: pending RPCs are
 * rejected, the state moves to `stopped` with an error, and a reconnect is
 * scheduled.
 */
private handleGatewayProcessExit(child: GatewayProcessHandle, code: number | null): void {
  if (this.child !== child) {
    return;
  }

  this.exitCode = code ?? -1;
  this.child = null;
  this.ownsProcess = false;
  this.connectionMonitor.clear();
  this.clearGatewayReadyFallback();
  this.emit('exit', code);

  if (this.stopping) {
    return;
  }

  this.lastError = `OpenClaw Gateway exited unexpectedly (code=${code ?? 'unknown'})`;
  this.socket = null;
  this.deltaSnapshots.clear();
  clearPendingGatewayRequests(
    this.pendingRequests,
    new Error(this.lastError),
  );
  this.setGatewayState({
    state: 'stopped',
    port: this.port,
    pid: undefined,
    error: this.lastError,
    connectedAt: undefined,
    uptime: undefined,
    gatewayReady: undefined,
    reconnectAttempts: this.reconnectAttempts,
  });
  logManager.warn(this.lastError);
  this.scheduleReconnect();
}
|
|
|
|
/** Records, logs and re-emits an error reported by the Gateway child process. */
private handleGatewayProcessError(error: Error): void {
  this.lastError = toErrorMessage(error);
  logManager.error('OpenClaw Gateway process error:', error);
  this.emit('error', error);
}
|
|
|
|
/**
 * Prepares the launch context and spawns the Gateway child process on the
 * already-allocated port.
 *
 * Stdout lines are logged at debug level. Stderr lines are recorded for
 * startup diagnostics, classified, and de-duplicated: the first occurrence
 * of a message is logged, repeats are suppressed except for a debug note
 * every 50th repeat.
 *
 * @throws When no port has been allocated.
 */
private async spawnGatewayProcess(): Promise<void> {
  // Capture the port once: `this.port` can be reset by a concurrent teardown
  // while the awaits below are pending.
  const port = this.port;
  if (port === null) {
    throw new Error('OpenClaw Gateway port not allocated');
  }

  const launchContext = await prepareGatewayLaunchContext({
    port,
    token: this.gatewayToken,
  });
  await unloadLaunchctlGatewayService();

  // Bound the dedup table so a long-lived process emitting many distinct
  // messages cannot grow it without limit. After a reset, a previously seen
  // message is logged once more.
  const stderrDedupMaxEntries = 1_000;
  const stderrDedup = new Map<string, number>();
  const { child, lastSpawnSummary } = await launchGatewayProcess({
    port,
    launchContext,
    sanitizeSpawnArgs: (args) => this.sanitizeSpawnArgs(args),
    onStdoutLine: (line) => {
      logManager.debug(`[OpenClaw stdout] ${line}`);
    },
    onStderrLine: (line) => {
      recordGatewayStartupStderrLine(this.recentStartupStderrLines, line);
      const classified = classifyGatewayStderrMessage(line);
      if (classified.level === 'drop') {
        return;
      }

      if (!stderrDedup.has(classified.normalized) && stderrDedup.size >= stderrDedupMaxEntries) {
        stderrDedup.clear();
      }

      const count = (stderrDedup.get(classified.normalized) ?? 0) + 1;
      stderrDedup.set(classified.normalized, count);
      if (count > 1) {
        if (count % 50 === 0) {
          logManager.debug(`[Gateway stderr] (suppressed ${count} repeats) ${classified.normalized}`);
        }
        return;
      }

      if (classified.level === 'debug') {
        logManager.debug(`[Gateway stderr] ${classified.normalized}`);
        return;
      }

      logManager.warn(`[Gateway stderr] ${classified.normalized}`);
    },
    onExit: (exitedChild, code) => {
      this.handleGatewayProcessExit(exitedChild, code);
    },
    onError: (error) => {
      this.handleGatewayProcessError(error);
    },
  });

  this.child = child;
  this.ownsProcess = true;
  this.lastSpawnSummary = lastSpawnSummary;
  this.setGatewayState({
    port: this.port,
    pid: child.pid ?? undefined,
    error: undefined,
    reconnectAttempts: this.reconnectAttempts,
  });
}
|
|
|
|
/**
 * Opens the WebSocket to the Gateway and moves the manager to `running`.
 *
 * After the handshake the socket is stored, liveness is recorded, the status
 * is published with `gatewayReady: false`, and ping, health-check and
 * ready-fallback timers are started.
 *
 * @param port Port the Gateway listens on.
 * @param externalToken Token to use instead of the manager's own token.
 */
private async connectToGateway(port: number, externalToken?: string): Promise<void> {
  const token = externalToken ?? this.gatewayToken;
  const socket = await connectGatewaySocket({
    port,
    token,
    deviceIdentity: this.deviceIdentity,
    platform: process.platform,
    onMessage: (message) => this.handleGatewayFrame(message),
    onCloseAfterHandshake: (closedSocket, code) => this.handleGatewaySocketClosed(closedSocket, code),
  });
  this.socket = socket;
  socket.on('pong', () => {
    this.connectionMonitor.markAlive('pong');
    this.recordGatewayAlive();
  });
  this.recordGatewayAlive();
  this.connectionMonitor.markAlive('message');
  this.setGatewayState({
    state: 'running',
    port,
    // Fall back to the last known pid when this manager holds no child handle.
    pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
    error: undefined,
    connectedAt: Date.now(),
    reconnectAttempts: this.reconnectAttempts,
    gatewayReady: false,
  });
  this.startPing();
  this.startHealthCheck();
  this.scheduleGatewayReadyFallback();
}
|
|
|
|
/**
 * Handles a post-handshake close of the Gateway socket.
 *
 * Closes of a stale socket, or closes during a stop, are ignored. Otherwise
 * pending RPCs are rejected and the state moves to `stopped` with an error.
 * A reconnect is scheduled only if the manager was `running`; on Windows it
 * is additionally limited to close code 1012.
 */
private handleGatewaySocketClosed(socket: WebSocket, code: number): void {
  if (this.socket !== socket) {
    return;
  }

  if (this.stopping) {
    return;
  }

  this.socket = null;
  this.connectionMonitor.clear();
  this.clearGatewayReadyFallback();
  this.recordSocketClose(code);
  this.diagnostics.consecutiveHeartbeatMisses = 0;
  this.deltaSnapshots.clear();
  this.lastError = `OpenClaw Gateway socket closed (code=${code})`;
  clearPendingGatewayRequests(
    this.pendingRequests,
    new Error(this.lastError),
  );
  // Must be read before the state update below overwrites `lifecycleState`.
  const wasRunning = this.lifecycleState === 'running';
  this.setGatewayState({
    state: 'stopped',
    port: this.port,
    pid: this.child?.pid ?? undefined,
    error: this.lastError,
    connectedAt: undefined,
    uptime: undefined,
    gatewayReady: undefined,
    reconnectAttempts: this.reconnectAttempts,
  });
  logManager.warn(this.lastError);

  if (wasRunning && (process.platform !== 'win32' || code === 1012)) {
    this.scheduleReconnect();
  }
}
|
|
|
|
/**
 * Routes one inbound frame from the Gateway socket.
 *
 * Every frame counts as a liveness signal. Frames are then matched in order:
 * 1. `type: 'res'` with a string id – settles the matching pending request;
 * 2. `type: 'event'` – chat events update streaming state, then every event
 *    is handed to `dispatchProtocolEvent`;
 * 3. JSON-RPC responses whose id matches a pending request;
 * 4. JSON-RPC notifications;
 * 5. anything else is emitted as a raw `message` event.
 */
private handleGatewayFrame(frame: unknown): void {
  this.connectionMonitor.markAlive('message');
  this.recordGatewayAlive();

  if (!isRecord(frame)) {
    return;
  }

  if (frame.type === 'res' && typeof frame.id === 'string') {
    const requestId = frame.id;
    const response = frame as GatewayResponseFrame;
    // Only an explicit `ok: false` is a failure; a missing `ok` resolves.
    if (response.ok === false) {
      rejectPendingGatewayRequest(
        this.pendingRequests,
        requestId,
        buildGatewayRpcError(response.error, `Gateway RPC failed: ${requestId}`),
      );
      return;
    }

    resolvePendingGatewayRequest(
      this.pendingRequests,
      requestId,
      response.payload,
    );
    return;
  }

  if (frame.type === 'event' && typeof frame.event === 'string') {
    const eventFrame = frame as GatewayEventFrame;
    if (eventFrame.event === 'chat' && isRecord(eventFrame.payload)) {
      this.handleChatEvent(eventFrame.payload);
    }
    dispatchProtocolEvent(this, eventFrame.event, eventFrame.payload);
    return;
  }

  if (isResponse(frame) && frame.id && this.pendingRequests.has(String(frame.id))) {
    if (frame.error) {
      rejectPendingGatewayRequest(
        this.pendingRequests,
        String(frame.id),
        buildGatewayRpcError(frame.error, `Gateway RPC failed: ${String(frame.id)}`),
      );
    } else {
      resolvePendingGatewayRequest(this.pendingRequests, String(frame.id), frame.result);
    }
    return;
  }

  if (isNotification(frame)) {
    dispatchJsonRpcNotification(this, frame as JsonRpcNotification);
    return;
  }

  this.emit('message', frame);
}
|
|
|
|
/**
 * Translates a Gateway `chat` event into `chat:*` broadcasts.
 *
 * Events without a session key, run id or state are ignored.
 * - `delta`: the payload text is compared with the previous text stored for
 *   the run; when it extends the previous text only the new suffix is
 *   broadcast, otherwise the whole text is.
 * - `final`: broadcasts the normalized message, falling back to an assistant
 *   message built from the last stored text when the payload has no message.
 * - `error` / `aborted`: drop the stored text and broadcast the outcome.
 */
private handleChatEvent(payload: Record<string, unknown>): void {
  const sessionKey = typeof payload.sessionKey === 'string'
    ? normalizeAgentSessionKey(payload.sessionKey)
    : '';
  const runId = typeof payload.runId === 'string' ? payload.runId : '';
  const state = typeof payload.state === 'string' ? payload.state : '';

  if (!sessionKey || !runId || !state) {
    return;
  }

  switch (state) {
    case 'delta': {
      const nextSnapshot = extractTextFromGatewayPayload(payload);
      if (!nextSnapshot) {
        return;
      }

      const previousSnapshot = this.deltaSnapshots.get(runId) ?? '';
      const delta = nextSnapshot.startsWith(previousSnapshot)
        ? nextSnapshot.slice(previousSnapshot.length)
        : nextSnapshot;

      this.deltaSnapshots.set(runId, nextSnapshot);

      // An unchanged text produces an empty delta; nothing to broadcast then.
      if (delta) {
        this.broadcast({
          type: 'chat:delta',
          sessionKey,
          runId,
          delta,
        });
      }
      break;
    }
    case 'final': {
      const snapshotText = this.deltaSnapshots.get(runId) ?? '';
      this.deltaSnapshots.delete(runId);

      const message = normalizeGatewayRawMessage(payload.message, {
        preferContentBlocks: true,
      }) ?? (
        snapshotText
          ? {
              role: 'assistant',
              content: toTextContentBlocks(snapshotText),
              timestamp: Date.now(),
            }
          : null
      );

      if (!message) {
        return;
      }

      this.broadcast({
        type: 'chat:final',
        sessionKey,
        runId,
        message,
      });
      break;
    }
    case 'error': {
      this.deltaSnapshots.delete(runId);
      this.broadcast({
        type: 'chat:error',
        sessionKey,
        runId,
        error: typeof payload.errorMessage === 'string'
          ? payload.errorMessage
          : 'Gateway chat error',
      });
      break;
    }
    case 'aborted': {
      this.deltaSnapshots.delete(runId);
      this.broadcast({
        type: 'chat:aborted',
        sessionKey,
        runId,
      });
      break;
    }
    default:
      break;
  }
}
|
|
|
|
/**
 * Sends a `req` frame to the Gateway and resolves with the response payload.
 *
 * @param method RPC method name; also used as the request-id prefix.
 * @param params Optional request parameters.
 * @param options.timeoutMs Milliseconds to wait for a response. Omitted →
 *   30 000 ms. Explicit `null` → no timeout (the request is settled only by a
 *   response or by the pending-request table being cleared).
 * @param options.skipStart When true, does not call `start()` first.
 * @throws When the socket is not open, the send fails, the request times
 *   out, or the Gateway answers with an error.
 */
private async sendGatewayRequest(
  method: string,
  params?: Record<string, unknown>,
  options?: {
    timeoutMs?: number | null;
    skipStart?: boolean;
  },
): Promise<unknown> {
  if (!options?.skipStart) {
    await this.start();
  }

  const socket = this.socket;
  if (!socket || socket.readyState !== WebSocket.OPEN) {
    throw new Error('OpenClaw Gateway socket is not connected');
  }

  const requestId = `${method}-${randomUUID()}`;
  // Do not use `??` here: it would also replace an explicit `null`, which
  // callers pass to disable the timeout.
  const requestedTimeoutMs = options?.timeoutMs;
  const timeoutMs = requestedTimeoutMs === undefined ? 30_000 : requestedTimeoutMs;

  try {
    const result = await new Promise<unknown>((resolve, reject) => {
      const timeout = timeoutMs === null
        ? null
        : setTimeout(() => {
            rejectPendingGatewayRequest(
              this.pendingRequests,
              requestId,
              new Error(`Gateway RPC timed out: ${method}`),
            );
          }, timeoutMs);

      this.pendingRequests.set(requestId, {
        resolve,
        reject,
        timeout,
      });

      try {
        socket.send(
          JSON.stringify({
            type: 'req',
            id: requestId,
            method,
            params,
          }),
          (sendError) => {
            // Send failures can be reported asynchronously; without this the
            // request would linger until its timeout (or forever without one).
            if (sendError && this.pendingRequests.has(requestId)) {
              rejectPendingGatewayRequest(
                this.pendingRequests,
                requestId,
                new Error(`Failed to send Gateway RPC ${method}: ${toErrorMessage(sendError)}`),
              );
            }
          },
        );
      } catch (error) {
        rejectPendingGatewayRequest(
          this.pendingRequests,
          requestId,
          new Error(`Failed to send Gateway RPC ${method}: ${toErrorMessage(error)}`),
        );
      }
    });

    this.recordRpcSuccess();
    return result;
  } catch (error) {
    // Only transport-level failures count against the RPC failure diagnostics.
    if (isTransportRpcFailure(error)) {
      this.recordRpcFailure(method);
    }
    throw error;
  }
}
|
|
|
|
/**
 * Sends an RPC to the Gateway. Thin wrapper over `sendGatewayRequest` that
 * requires `params` and exposes only the `timeoutMs` option.
 */
private async rpcGateway(
  method: string,
  params: Record<string, unknown>,
  options?: { timeoutMs?: number | null },
): Promise<unknown> {
  return await this.sendGatewayRequest(method, params, options);
}
|
|
|
|
/**
 * Asks the Gateway to shut down via the `shutdown` RPC.
 *
 * Uses `skipStart` so that requesting a shutdown never starts the Gateway.
 *
 * @param timeoutMs Milliseconds to wait for the response (default 5 000).
 */
private async requestGatewayShutdown(timeoutMs = 5_000): Promise<void> {
  await this.sendGatewayRequest(
    'shutdown',
    undefined,
    {
      timeoutMs,
      skipStart: true,
    },
  );
}
|
|
|
|
/**
 * Starts the connection monitor's health check. Checks are gated on the
 * `running` state; an unhealthy result is logged and emitted as an `error`
 * event.
 */
private startHealthCheck(): void {
  this.connectionMonitor.startHealthCheck({
    shouldCheck: () => this.lifecycleState === 'running',
    checkHealth: () => this.checkHealth(),
    onUnhealthy: (errorMessage) => {
      logManager.warn(`Gateway health check failed: ${errorMessage}`);
      this.emit('error', new Error(errorMessage));
    },
    onError: () => {
      // The monitor already logged the error.
    },
  });
}
|
|
|
|
/**
 * Starts the WebSocket heartbeat with platform-specific tuning.
 *
 * When the monitor reports a heartbeat timeout the miss is recorded and
 * logged. A restart is then attempted only if auto-reconnect is enabled and
 * the manager is `running`; on Windows no restart is attempted and recovery
 * is left to process-exit / socket-close handling.
 */
private startPing(): void {
  const isWindows = process.platform === 'win32';
  this.connectionMonitor.startPing({
    intervalMs: isWindows
      ? GatewayManager.HEARTBEAT_INTERVAL_MS_WIN
      : GatewayManager.HEARTBEAT_INTERVAL_MS,
    timeoutMs: isWindows
      ? GatewayManager.HEARTBEAT_TIMEOUT_MS_WIN
      : GatewayManager.HEARTBEAT_TIMEOUT_MS,
    maxConsecutiveMisses: isWindows
      ? GatewayManager.HEARTBEAT_MAX_MISSES_WIN
      : GatewayManager.HEARTBEAT_MAX_MISSES,
    sendPing: () => {
      if (this.socket?.readyState === WebSocket.OPEN) {
        this.socket.ping();
      }
    },
    onHeartbeatTimeout: ({ consecutiveMisses, timeoutMs }) => {
      this.recordHeartbeatTimeout(consecutiveMisses);
      logManager.warn(
        `Gateway heartbeat: ${consecutiveMisses} consecutive pong misses (timeout=${timeoutMs}ms, state=${this.lifecycleState}, autoReconnect=${this.shouldReconnect}).`,
      );
      if (!this.shouldReconnect || this.lifecycleState !== 'running') {
        return;
      }
      if (isWindows) {
        logManager.warn('Gateway heartbeat recovery skipped on Windows; waiting for process exit or socket close');
        return;
      }
      void this.restart().catch((error) => {
        logManager.warn('Gateway heartbeat recovery failed:', error);
      });
    },
  });
}
|
|
|
|
/**
 * Reloads the cached Gateway reload policy when it is stale.
 *
 * Without `force`, the call is a no-op while the cached policy is younger than
 * `RELOAD_POLICY_REFRESH_MS`. Concurrent callers share one in-flight refresh;
 * the in-flight marker is cleared whether the load succeeds or fails.
 *
 * @param force When true, skip the freshness check.
 */
private async refreshReloadPolicy(force = false): Promise<void> {
  const elapsedMs = Date.now() - this.reloadPolicyLoadedAt;
  if (!force && elapsedMs < GatewayManager.RELOAD_POLICY_REFRESH_MS) {
    return;
  }

  const inFlight = this.reloadPolicyRefreshPromise;
  if (inFlight) {
    // Another caller is already loading the policy; wait for that one.
    await inFlight;
    return;
  }

  const refresh = (async () => {
    this.reloadPolicy = await loadGatewayReloadPolicy();
    this.reloadPolicyLoadedAt = Date.now();
  })();
  this.reloadPolicyRefreshPromise = refresh;

  try {
    await refresh;
  } finally {
    this.reloadPolicyRefreshPromise = null;
  }
}
|
|
|
|
/**
 * Cancels every timer this manager tracks: the reconnect timer, the connection
 * monitor's timers, the restart debounce timer, the reload debounce timer and
 * the gateway-ready fallback timer.
 */
private clearAllTimers(): void {
  const pendingReconnect = this.reconnectTimer;
  if (pendingReconnect) {
    clearTimeout(pendingReconnect);
    this.reconnectTimer = null;
  }

  this.connectionMonitor.clear();
  this.restartController.clearDebounceTimer();

  const pendingReload = this.reloadDebounceTimer;
  if (pendingReload) {
    clearTimeout(pendingReload);
    this.reloadDebounceTimer = null;
  }

  this.clearGatewayReadyFallback();
}
|
|
|
|
/** Cancels the pending gateway-ready fallback timer, if one is scheduled. */
private clearGatewayReadyFallback(): void {
  const pending = this.gatewayReadyFallbackTimer;
  if (!pending) {
    return;
  }
  clearTimeout(pending);
  this.gatewayReadyFallbackTimer = null;
}
|
|
|
|
/**
 * (Re)schedules the gateway-ready fallback.
 *
 * Any previously scheduled fallback is cancelled first. When the timer fires
 * after `GATEWAY_READY_FALLBACK_MS` and the gateway is `running` but still not
 * flagged ready, `gatewayReady` is set to true.
 */
private scheduleGatewayReadyFallback(): void {
  this.clearGatewayReadyFallback();

  const onFallbackElapsed = (): void => {
    this.gatewayReadyFallbackTimer = null;
    const stillWaiting = this.lifecycleState === 'running' && !this.gatewayStatusDetails.gatewayReady;
    if (!stillWaiting) {
      return;
    }
    logManager.info('Gateway ready fallback triggered (no gateway.ready event within timeout)');
    this.setGatewayState({ gatewayReady: true });
  };

  this.gatewayReadyFallbackTimer = setTimeout(onFallbackElapsed, GatewayManager.GATEWAY_READY_FALLBACK_MS);
}
|
|
|
|
/**
 * Returns a shallow copy of the current diagnostics, so callers cannot mutate
 * the manager's own top-level diagnostic fields.
 */
getDiagnostics(): GatewayDiagnosticsSnapshot {
  const snapshot: GatewayDiagnosticsSnapshot = { ...this.diagnostics };
  return snapshot;
}
|
|
|
|
/** Stamps the last-alive time and resets the consecutive heartbeat miss counter. */
private recordGatewayAlive(): void {
  const diagnostics = this.diagnostics;
  diagnostics.lastAliveAt = Date.now();
  diagnostics.consecutiveHeartbeatMisses = 0;
}
|
|
|
|
/** Stamps the last successful RPC time and resets the consecutive RPC failure counter. */
private recordRpcSuccess(): void {
  const diagnostics = this.diagnostics;
  diagnostics.lastRpcSuccessAt = Date.now();
  diagnostics.consecutiveRpcFailures = 0;
}
|
|
|
|
/**
 * Records a failed RPC: failure time, failing method name, and one more
 * consecutive failure.
 *
 * @param method Name of the RPC method that failed.
 */
private recordRpcFailure(method: string): void {
  const diagnostics = this.diagnostics;
  diagnostics.lastRpcFailureAt = Date.now();
  diagnostics.lastRpcFailureMethod = method;
  diagnostics.consecutiveRpcFailures = diagnostics.consecutiveRpcFailures + 1;
}
|
|
|
|
/**
 * Records a heartbeat timeout and the current run of missed heartbeats.
 *
 * @param consecutiveMisses Miss count reported by the heartbeat monitor.
 */
private recordHeartbeatTimeout(consecutiveMisses: number): void {
  const diagnostics = this.diagnostics;
  diagnostics.lastHeartbeatTimeoutAt = Date.now();
  diagnostics.consecutiveHeartbeatMisses = consecutiveMisses;
}
|
|
|
|
/**
 * Records when the socket closed and with which close code.
 *
 * @param code Close code to store in the diagnostics.
 */
private recordSocketClose(code: number): void {
  const diagnostics = this.diagnostics;
  diagnostics.lastSocketCloseAt = Date.now();
  diagnostics.lastSocketCloseCode = code;
}
|
|
|
|
/**
 * Emits the `gateway.reconnect` metric for one reconnect attempt.
 *
 * Besides the per-attempt payload, the metric carries the running success and
 * attempt totals plus their ratio rounded to four decimals (0 when nothing has
 * been attempted yet). The `error` field is included only when non-empty.
 *
 * @param outcome Whether the attempt succeeded or failed.
 * @param payload Attempt number, attempt limit, delay used, and optional error text.
 */
private emitReconnectMetric(
  outcome: 'success' | 'failure',
  payload: {
    attemptNo: number;
    maxAttempts: number;
    delayMs: number;
    error?: string;
  },
): void {
  const attemptsTotal = this.reconnectAttemptsTotal;
  const successTotal = this.reconnectSuccessTotal;

  let successRate = 0;
  if (attemptsTotal > 0) {
    successRate = successTotal / attemptsTotal;
  }

  const errorFields = payload.error ? { error: payload.error } : {};

  trackMetric('gateway.reconnect', {
    outcome,
    attemptNo: payload.attemptNo,
    maxAttempts: payload.maxAttempts,
    delayMs: payload.delayMs,
    gateway_reconnect_success_count: successTotal,
    gateway_reconnect_attempt_count: attemptsTotal,
    gateway_reconnect_success_rate: Number(successRate.toFixed(4)),
    ...errorFields,
  });
}
|
|
|
|
/**
 * Schedules the next automatic reconnect attempt, if policy allows one.
 *
 * `getReconnectScheduleDecision` decides between doing nothing (`skip` or
 * `already-scheduled`), giving up (`fail`, which moves the gateway to `error`),
 * or scheduling an attempt after the returned delay. When the timer fires it
 * re-checks the lifecycle epoch and the reconnect flag, then calls `start()`
 * flagged as an auto-reconnect start. A failed attempt emits a failure metric
 * and schedules the next one.
 */
private scheduleReconnect(): void {
  const { maxAttempts: configuredMaxAttempts, baseDelay, maxDelay } = this.reconnectConfig;
  const decision = getReconnectScheduleDecision({
    shouldReconnect: this.shouldReconnect,
    hasReconnectTimer: this.reconnectTimer !== null,
    reconnectAttempts: this.reconnectAttempts,
    maxAttempts: configuredMaxAttempts,
    baseDelay,
    maxDelay,
  });

  // Nothing to do: reconnect is disabled or a timer is already pending.
  if (decision.action === 'skip' || decision.action === 'already-scheduled') {
    return;
  }

  if (decision.action === 'fail') {
    this.lastError = 'Failed to reconnect after maximum attempts';
    this.setGatewayState({
      state: 'error',
      port: this.port,
      pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
      error: this.lastError,
      reconnectAttempts: this.reconnectAttempts,
      gatewayReady: undefined,
    });
    return;
  }

  const { delay: delayMs, nextAttempt, maxAttempts } = decision;
  this.reconnectAttempts = nextAttempt;
  this.setGatewayState({
    state: 'reconnecting',
    port: this.port,
    pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
    error: this.lastError,
    reconnectAttempts: nextAttempt,
    gatewayReady: false,
  });

  // Captured now so the timer can tell whether a later start/stop superseded it.
  const scheduledEpoch = this.lifecycleController.getCurrentEpoch();

  const onReconnectTimer = async (): Promise<void> => {
    this.reconnectTimer = null;

    const skipReason = getReconnectSkipReason({
      scheduledEpoch,
      currentEpoch: this.lifecycleController.getCurrentEpoch(),
      shouldReconnect: this.shouldReconnect,
    });
    if (skipReason) {
      return;
    }

    const attemptNo = this.reconnectAttempts;
    this.reconnectAttemptsTotal += 1;

    try {
      this.isAutoReconnectStart = true;
      await this.start();
      this.reconnectSuccessTotal += 1;
      this.emitReconnectMetric('success', { attemptNo, maxAttempts, delayMs });
      this.reconnectAttempts = 0;
    } catch (error) {
      logManager.error(
        `Gateway reconnection attempt ${nextAttempt}/${maxAttempts} failed:`,
        error,
      );
      this.emitReconnectMetric('failure', {
        attemptNo,
        maxAttempts,
        delayMs,
        error: toErrorMessage(error),
      });
      this.scheduleReconnect();
    }
  };

  this.reconnectTimer = setTimeout(onReconnectTimer, delayMs);
}
|
|
|
|
/**
 * Queues a runtime-change notification and requests a debounced restart.
 *
 * A failed restart is logged and reported through
 * `flushQueuedRuntimeChangeFailure`.
 *
 * @param delayMs Debounce delay handed to the restart controller (default 2000 ms).
 * @param options Optional runtime-change broadcast to queue.
 */
debouncedRestart(delayMs = 2000, options?: RuntimeChangeBroadcast): void {
  this.queueRuntimeChange(options);

  const runRestart = (): void => {
    void this.restart().catch((error) => {
      logManager.warn('Debounced Gateway restart failed:', error);
      this.flushQueuedRuntimeChangeFailure('Gateway restart failed', error);
    });
  };

  this.restartController.debouncedRestart(delayMs, runRestart);
}
|
|
|
|
/**
 * Applies a runtime change, preferring an in-place reload over a full restart.
 *
 * The method falls back to `restart()` when the reload policy mode is `off` or
 * `restart`, when there is no child pid, when the lifecycle state is not
 * `running`, or on Windows. If the restart controller reports that restarts
 * are currently deferred, the reload is recorded as deferred instead.
 * Otherwise `SIGUSR1` is sent to the child process and, after 1.5 s, the queued
 * runtime change is broadcast if the gateway is still running.
 *
 * NOTE: the whole signal path sits inside one `try`, so a failure of the
 * fallback `restart()` in that path is also caught, logged as a signal failure,
 * and followed by another `restart()`.
 *
 * @param options Optional runtime-change broadcast to queue before reloading.
 */
async reload(options?: RuntimeChangeBroadcast): Promise<void> {
  this.queueRuntimeChange(options);
  await this.refreshReloadPolicy();

  const { mode } = this.reloadPolicy;
  if (mode === 'off' || mode === 'restart') {
    await this.restart();
    return;
  }

  const lifecycleSnapshot = () => ({
    state: this.lifecycleState,
    startLock: this.startLock,
  });
  if (this.restartController.isRestartDeferred(lifecycleSnapshot())) {
    this.restartController.markDeferredRestart('reload', lifecycleSnapshot());
    return;
  }

  const childPid = this.child?.pid;
  if (!childPid || this.lifecycleState !== 'running' || process.platform === 'win32') {
    await this.restart();
    return;
  }

  try {
    process.kill(childPid, 'SIGUSR1');
    await new Promise((resolve) => setTimeout(resolve, 1500));

    // Re-read live state: the gateway may have stopped while we were waiting.
    const stillRunning = this.lifecycleState === 'running' && Boolean(this.child?.pid);
    if (!stillRunning) {
      await this.restart();
      return;
    }

    const runtimeChange = this.takeQueuedRuntimeChange();
    if (runtimeChange) {
      this.notifyRuntimeChanged(runtimeChange);
    }
  } catch (error) {
    logManager.warn('Gateway reload signal failed, falling back to restart:', error);
    await this.restart();
  }
}
|
|
|
|
/**
 * Queues a runtime-change notification and schedules a debounced reload.
 *
 * A reload-policy refresh is started in the background; because it is not
 * awaited, the mode and debounce delay used below come from the policy cached
 * at call time. When the cached mode is `off` or `restart`, the call is
 * delegated to `debouncedRestart`. Otherwise any pending reload timer is
 * replaced and `reload()` runs once the delay elapses.
 *
 * @param delayMs Debounce delay; defaults to the cached policy's `debounceMs`.
 * @param options Optional runtime-change broadcast to queue.
 */
debouncedReload(delayMs?: number, options?: RuntimeChangeBroadcast): void {
  this.queueRuntimeChange(options);

  // Fire-and-forget, but never let a failed policy load surface as an
  // unhandled promise rejection; the cached policy stays in effect.
  void this.refreshReloadPolicy().catch((error) => {
    logManager.warn('Gateway reload policy refresh failed:', error);
  });

  const effectiveDelay = delayMs ?? this.reloadPolicy.debounceMs;
  if (this.reloadPolicy.mode === 'off' || this.reloadPolicy.mode === 'restart') {
    this.debouncedRestart(effectiveDelay);
    return;
  }

  if (this.reloadDebounceTimer) {
    clearTimeout(this.reloadDebounceTimer);
  }

  this.reloadDebounceTimer = setTimeout(() => {
    this.reloadDebounceTimer = null;
    void this.reload().catch((error) => {
      logManager.warn('Debounced Gateway reload failed:', error);
      this.flushQueuedRuntimeChangeFailure('Gateway reload failed', error);
    });
  }, effectiveDelay);
}
|
|
|
|
/**
 * Initializes the manager and auto-starts the Gateway when configured to.
 *
 * With auto-start disabled the state is published as `stopped`. With auto-start
 * enabled `start()` is awaited; a start failure is logged and published as
 * `stopped` with the error message rather than rethrown.
 *
 * NOTE: `initialized` is set to true as soon as the initialization body begins,
 * so a call made while initialization is still in flight returns at the first
 * guard instead of awaiting `initPromise`.
 */
async init(): Promise<void> {
  if (this.initialized) {
    return;
  }

  if (this.initPromise) {
    return await this.initPromise;
  }

  const runInit = async (): Promise<void> => {
    this.initialized = true;
    logManager.info('GatewayManager initialized in OpenClaw mode');

    const autoStart = Boolean(configManager.get(CONFIG_KEYS.GATEWAY_AUTO_START));
    if (autoStart) {
      try {
        await this.start();
      } catch (error) {
        this.lastError = toErrorMessage(error);
        this.setGatewayState({
          state: 'stopped',
          port: this.port,
          pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
          error: this.lastError,
          gatewayReady: undefined,
          reconnectAttempts: this.reconnectAttempts,
        });
        logManager.error('Failed to auto-start OpenClaw Gateway:', error);
      }
      return;
    }

    // Auto-start is disabled: just publish the idle state.
    this.setGatewayState({
      state: 'stopped',
      port: this.port,
      pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
      gatewayReady: undefined,
      reconnectAttempts: this.reconnectAttempts,
    });
  };

  this.initPromise = runInit();

  try {
    await this.initPromise;
  } finally {
    this.initPromise = null;
  }
}
|
|
|
|
/**
 * Starts the OpenClaw Gateway, or connects to one that is already running.
 *
 * Re-entrancy: while a start is in progress, callers await the same
 * `startPromise`. A call made while the gateway is `running` with an open
 * socket is a no-op. A pending `stop()` is awaited before the start proceeds.
 *
 * On success the state becomes `running` and the reconnect counter is reset.
 * On failure the transport is disposed, the state becomes `error`, and the
 * error is rethrown. A `LifecycleSupersededError` is logged at debug level and
 * swallowed, so the returned promise resolves in that case.
 *
 * The start lock is released in the `finally` block, which also flushes any
 * restart that was deferred while the start was running. Every awaited step
 * that runs after the lock is taken therefore sits inside the `try`, so a
 * failure in any of them cannot leave the lock held.
 *
 * @throws Any error raised by preparation or the startup sequence, other than
 *   `LifecycleSupersededError`.
 */
async start(): Promise<void> {
  if (this.startLock) {
    if (this.startPromise) {
      return await this.startPromise;
    }
    return;
  }

  if (this.lifecycleState === 'running' && this.socket?.readyState === WebSocket.OPEN) {
    return;
  }

  if (this.startPromise) {
    return await this.startPromise;
  }

  this.initialized = true;
  this.startPromise = (async () => {
    if (this.stopPromise) {
      await this.stopPromise;
    }

    this.startLock = true;
    const startEpoch = this.lifecycleController.bump('start');
    this.stopping = false;
    this.lastError = undefined;
    this.shouldReconnect = true;

    try {
      // Must run inside the try: a rejection here would otherwise skip the
      // finally block and leave startLock set, blocking every later start().
      await this.refreshReloadPolicy(true);

      await this.initDeviceIdentity();
      await this.processOwner.prepare();
      const runtimeStatus = this.processOwner.getStatus();
      if (!runtimeStatus.entryExists) {
        throw new Error(runtimeStatus.lastError || `OpenClaw entry not found at ${runtimeStatus.runtimePaths.entryPath}`);
      }

      // An explicit start supersedes any pending automatic reconnect.
      if (this.reconnectTimer) {
        clearTimeout(this.reconnectTimer);
        this.reconnectTimer = null;
      }

      // Consume the one-shot flag set by the reconnect timer; only a start
      // that was not triggered by auto-reconnect resets the attempt counter.
      const wasAutoReconnectStart = this.isAutoReconnectStart;
      if (!wasAutoReconnectStart) {
        this.reconnectAttempts = 0;
      }
      this.isAutoReconnectStart = false;

      // A child we own with a known port but no socket can be reused as-is.
      const reusingManagedProcess =
        this.child?.pid != null &&
        this.ownsProcess &&
        this.port != null &&
        this.socket === null;
      const shouldDisposeTransport = !reusingManagedProcess && (this.socket !== null || this.child !== null);
      if (shouldDisposeTransport) {
        await this.disposeTransport('starting OpenClaw Gateway');
      }

      if (this.port === null) {
        this.port = await findAvailablePort();
      }
      this.exitCode = null;
      if (!reusingManagedProcess) {
        // A reused process keeps its existing token.
        this.gatewayToken = randomUUID();
      }
      this.setGatewayState({
        state: wasAutoReconnectStart ? 'reconnecting' : 'starting',
        port: this.port,
        pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
        error: undefined,
        connectedAt: undefined,
        uptime: undefined,
        reconnectAttempts: this.reconnectAttempts,
        gatewayReady: false,
      });

      warmupManagedPythonReadiness();

      await runGatewayStartupSequence({
        port: this.port,
        shouldWaitForPortFree: process.platform === 'win32',
        hasOwnedProcess: () => this.child?.pid != null && this.ownsProcess,
        assertLifecycle: (phase) => {
          this.lifecycleController.assert(startEpoch, phase);
        },
        resetStartupStderrLines: () => {
          this.recentStartupStderrLines = [];
        },
        getStartupStderrLines: () => this.recentStartupStderrLines,
        findExistingGateway: async (port) => {
          return await findExistingGatewayProcess({ port, ownedPid: this.child?.pid ?? null });
        },
        connect: async (port, externalToken) => {
          await this.connectToGateway(port, externalToken);
        },
        onConnectedToExistingGateway: () => {
          const isOwnProcess = this.child?.pid != null && this.ownsProcess;
          if (!isOwnProcess) {
            // Connected to a gateway we did not spawn: drop ownership and pid.
            this.ownsProcess = false;
            this.externalShutdownSupported = null;
            this.setGatewayState({ pid: undefined });
          }
          if (isOwnProcess) {
            this.restartController.recordRestartCompleted();
          }
        },
        waitForPortFree: async (port) => {
          await waitForPortFree(port);
        },
        startProcess: async () => {
          await this.spawnGatewayProcess();
        },
        waitForReady: async (port) => {
          await waitForGatewayReady({
            port,
            getProcessExitCode: () => this.exitCode,
          });
        },
        onConnectedToManagedGateway: () => {
          logManager.info('OpenClaw Gateway startup sequence completed', {
            port: this.port,
            pid: this.child?.pid,
            spawn: this.lastSpawnSummary,
          });
        },
        runDoctorRepair: async () => await runOpenClawDoctorRepair(),
        onDoctorRepairSuccess: () => {
          this.lastError = undefined;
          this.setGatewayState({
            state: 'starting',
            port: this.port,
            pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
            error: undefined,
            reconnectAttempts: this.reconnectAttempts,
            gatewayReady: false,
          });
        },
        delay: async (ms) => {
          await new Promise((resolve) => setTimeout(resolve, ms));
        },
      });

      this.lastError = undefined;
      this.reconnectAttempts = 0;
      this.setGatewayState({
        state: 'running',
        port: this.port,
        pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
        error: undefined,
        reconnectAttempts: 0,
      });
      logManager.info('OpenClaw Gateway connected', {
        port: this.port,
        pid: this.child?.pid,
        spawn: this.lastSpawnSummary,
      });
    } catch (error) {
      // A newer lifecycle operation took over; this start is simply abandoned.
      if (error instanceof LifecycleSupersededError) {
        logManager.debug(error.message);
        return;
      }

      this.lastError = toErrorMessage(error);
      await this.disposeTransport('failed OpenClaw Gateway start');
      this.setGatewayState({
        state: 'error',
        port: this.port,
        pid: undefined,
        error: this.lastError,
        connectedAt: undefined,
        uptime: undefined,
        reconnectAttempts: this.reconnectAttempts,
        gatewayReady: undefined,
      });
      throw error;
    } finally {
      this.startLock = false;
      this.restartController.flushDeferredRestart(
        'start:finally',
        {
          state: this.lifecycleState,
          startLock: this.startLock,
          shouldReconnect: this.shouldReconnect,
        },
        () => {
          void this.restart().catch((error) => {
            logManager.warn('Deferred Gateway restart failed:', error);
          });
        },
      );
    }
  })();

  try {
    await this.startPromise;
  } finally {
    this.startPromise = null;
  }
}
|
|
|
|
/**
 * Stops the Gateway and resets reconnect bookkeeping.
 *
 * Concurrent callers share one in-flight stop. Auto-reconnect is disabled and
 * all timers are cleared first. For a gateway this manager does not own, a
 * `shutdown` request is sent while the socket is open, unless an earlier
 * attempt showed the request is unsupported. The transport is then disposed
 * and the state is published as `stopped`.
 */
async stop(): Promise<void> {
  const inFlight = this.stopPromise;
  if (inFlight) {
    return await inFlight;
  }

  const runStop = async (): Promise<void> => {
    this.lifecycleController.bump('stop');
    this.shouldReconnect = false;
    this.stopping = true;
    this.clearAllTimers();

    const shouldRequestExternalShutdown =
      !this.ownsProcess &&
      this.socket?.readyState === WebSocket.OPEN &&
      this.externalShutdownSupported !== false;
    if (shouldRequestExternalShutdown) {
      try {
        await this.requestGatewayShutdown();
        this.externalShutdownSupported = true;
      } catch (error) {
        if (!this.isUnsupportedShutdownError(error)) {
          logManager.warn('Failed to request shutdown for externally managed Gateway:', error);
        } else {
          // Remember the answer so later stops skip the request entirely.
          this.externalShutdownSupported = false;
          logManager.info('External Gateway does not support "shutdown"; skipping shutdown RPC for future stops');
        }
      }
    }

    await this.disposeTransport('stopping OpenClaw Gateway');
    this.restartController.resetDeferredRestart();
    this.isAutoReconnectStart = false;
    this.reconnectAttempts = 0;
    this.lastError = undefined;
    this.diagnostics.consecutiveHeartbeatMisses = 0;
    this.setGatewayState({
      state: 'stopped',
      port: this.port,
      pid: undefined,
      error: undefined,
      connectedAt: undefined,
      uptime: undefined,
      reconnectAttempts: 0,
      gatewayReady: undefined,
    });
    this.stopping = false;
  };

  this.stopPromise = runStop();

  try {
    await this.stopPromise;
  } finally {
    this.stopPromise = null;
  }
}
|
|
|
|
/**
 * Forcefully tears down the Gateway child process owned by this manager.
 *
 * The socket is terminated (errors ignored), the ready-fallback and connection
 * monitor timers are cleared, and all pending requests are rejected with
 * "Gateway terminated during app quit". The child is then terminated and the
 * process-related state fields are cleared.
 *
 * @returns `false` when there is no owned child process to terminate,
 *   otherwise `true`.
 */
async forceTerminateOwnedProcessForQuit(): Promise<boolean> {
  if (!this.child || !this.ownsProcess) {
    return false;
  }

  const socket = this.socket;
  if (socket) {
    try {
      socket.terminate();
    } catch {
      // ignore quit-time socket termination errors
    }
    this.socket = null;
  }

  this.clearGatewayReadyFallback();
  this.connectionMonitor.clear();

  const quitError = new Error('Gateway terminated during app quit');
  clearPendingGatewayRequests(this.pendingRequests, quitError);

  const child = this.child;
  await this.terminateChild(child);
  // Only clear the field if it still refers to the process we just terminated.
  if (this.child === child) {
    this.child = null;
  }
  this.ownsProcess = false;

  this.setGatewayState({
    pid: undefined,
    connectedAt: undefined,
    uptime: undefined,
    gatewayReady: undefined,
  });
  return true;
}
|
|
|
|
/**
 * Restarts the Gateway by running `stop()` followed by `start()`.
 *
 * - If the restart controller reports that restarts are deferred, the request
 *   is recorded as deferred and the method returns.
 * - If a restart is already in flight, the caller awaits that one.
 * - If the restart governor denies the restart, the suppression is logged and
 *   reported to telemetry, and the method returns.
 *
 * When `start()` fails after `stop()`, auto-reconnect is re-enabled and a
 * reconnect is scheduled before the error is rethrown. After a successful
 * restart, the executed metric is emitted and the queued runtime change (if
 * any) is broadcast. Any deferred restart is flushed in the `finally` block.
 *
 * @param options Optional runtime-change broadcast to queue.
 */
async restart(options?: RuntimeChangeBroadcast): Promise<void> {
  this.queueRuntimeChange(options);

  const lifecycleSnapshot = () => ({
    state: this.lifecycleState,
    startLock: this.startLock,
  });
  if (this.restartController.isRestartDeferred(lifecycleSnapshot())) {
    this.restartController.markDeferredRestart('restart', lifecycleSnapshot());
    return;
  }

  if (this.restartInFlight) {
    await this.restartInFlight;
    return;
  }

  const decision = this.restartGovernor.decide();
  if (!decision.allow) {
    const suppressedStats = this.restartGovernor.getObservability();
    logManager.warn(
      `[gateway-restart-governor] restart suppressed reason=${decision.reason} retryAfterMs=${decision.retryAfterMs} suppressed=${suppressedStats.suppressed_total} executed=${suppressedStats.executed_total} circuitOpenUntil=${suppressedStats.circuit_open_until}`,
    );
    const suppressedProps = {
      reason: decision.reason,
      retry_after_ms: decision.retryAfterMs,
      gateway_restart_suppressed_total: suppressedStats.suppressed_total,
      gateway_restart_executed_total: suppressedStats.executed_total,
      gateway_restart_circuit_open_until: suppressedStats.circuit_open_until,
    };
    trackMetric('gateway.restart.suppressed', suppressedProps);
    captureTelemetryEvent('gateway_restart_suppressed', suppressedProps);
    return;
  }

  const pidBefore = this.gatewayStatusDetails.pid ?? this.child?.pid;

  const runRestart = async (): Promise<void> => {
    this.setGatewayState({
      state: 'reconnecting',
      port: this.port,
      pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
      error: this.lastError,
      reconnectAttempts: this.reconnectAttempts,
      gatewayReady: false,
    });
    await this.stop();
    try {
      await this.start();
    } catch (error) {
      logManager.warn('Gateway restart: start() failed after stop(), enabling auto-reconnect recovery', error);
      // stop() disabled auto-reconnect; turn it back on so recovery can proceed.
      this.shouldReconnect = true;
      this.scheduleReconnect();
      throw error;
    }
  };
  this.restartInFlight = runRestart();

  try {
    await this.restartInFlight;
    this.restartGovernor.recordExecuted();
    this.restartController.recordRestartCompleted();

    const executedStats = this.restartGovernor.getObservability();
    const executedProps = {
      gateway_restart_executed_total: executedStats.executed_total,
      gateway_restart_suppressed_total: executedStats.suppressed_total,
      gateway_restart_circuit_open_until: executedStats.circuit_open_until,
      pid_before: pidBefore ?? 'n/a',
      pid_after: this.gatewayStatusDetails.pid ?? this.child?.pid ?? 'n/a',
    };
    trackMetric('gateway.restart.executed', executedProps);
    captureTelemetryEvent('gateway_restart_executed', executedProps);

    const runtimeChange = this.takeQueuedRuntimeChange();
    if (runtimeChange) {
      this.notifyRuntimeChanged(runtimeChange);
    }
  } finally {
    this.restartInFlight = null;
    this.restartController.flushDeferredRestart(
      'restart:finally',
      {
        state: this.lifecycleState,
        startLock: this.startLock,
        shouldReconnect: this.shouldReconnect,
      },
      () => {
        void this.restart().catch((error) => {
          logManager.warn('Deferred Gateway restart failed:', error);
        });
      },
    );
  }
}
|
|
|
|
/**
 * Builds a point-in-time status snapshot of the Gateway.
 *
 * Port and pid prefer the state controller's values and fall back to the
 * manager's own fields. `uptime` is computed from `connectedAt` only while the
 * state is `running`; otherwise it is `undefined`.
 *
 * @returns Connection status, lifecycle details, diagnostics and the process
 *   owner's runtime status.
 */
getStatus(): {
  status: GatewayStatus;
  initialized: boolean;
  mode: 'openclaw';
  port: number | null;
  pid: number | null;
  lastError?: string;
  lifecycleState: GatewayLifecycleState;
  gatewayReady?: boolean;
  connectedAt?: number;
  uptime?: number;
  reconnectAttempts?: number;
  diagnostics: GatewayDiagnosticsSnapshot;
  runtime: ReturnType<OpenClawProcessOwner['getStatus']>;
} {
  const runtimeStatus = this.stateController.getStatus();
  const { connectedAt } = runtimeStatus;

  let uptime: number | undefined;
  if (runtimeStatus.state === 'running' && connectedAt) {
    uptime = Date.now() - connectedAt;
  }

  return {
    status: this.status,
    initialized: this.initialized,
    mode: this.mode,
    port: runtimeStatus.port ?? this.port,
    pid: runtimeStatus.pid ?? this.child?.pid ?? null,
    lastError: this.lastError,
    lifecycleState: runtimeStatus.state,
    gatewayReady: runtimeStatus.gatewayReady,
    connectedAt,
    uptime,
    reconnectAttempts: runtimeStatus.reconnectAttempts,
    diagnostics: this.getDiagnostics(),
    runtime: this.processOwner.getStatus(),
  };
}
|
|
|
|
/**
 * Returns the status snapshot from `getStatus()` plus an `ok` flag.
 *
 * `ok` is true only when the manager is initialized, the status is
 * `connected`, and the WebSocket is currently open.
 */
async checkHealth(): Promise<{
  ok: boolean;
  status: GatewayStatus;
  initialized: boolean;
  mode: 'openclaw';
  port: number | null;
  pid: number | null;
  lastError?: string;
  lifecycleState: GatewayLifecycleState;
  gatewayReady?: boolean;
  connectedAt?: number;
  uptime?: number;
  reconnectAttempts?: number;
  diagnostics: GatewayDiagnosticsSnapshot;
  runtime: ReturnType<OpenClawProcessOwner['getStatus']>;
}> {
  const status = this.getStatus();
  const socketOpen = this.socket?.readyState === WebSocket.OPEN;
  const ok = status.initialized && status.status === 'connected' && socketOpen;

  return { ok, ...status };
}
|
|
|
|
/**
 * Dispatches a gateway RPC call through `dispatchGatewayRpcMethod`.
 *
 * The manager is initialized first if it has not been already.
 *
 * @param method RPC method name.
 * @param params Method parameters, passed through to the dispatcher unchanged.
 * @returns The dispatcher's result for a handled method.
 * @throws Error when the dispatcher does not handle `method`.
 */
async rpc(method: string, params: any): Promise<any> {
  if (!this.initialized) {
    await this.init();
  }

  const localDispatch = dispatchGatewayRpcMethod(
    method,
    params,
    (event) => this.broadcast(event),
  );

  // Nothing besides the local dispatcher handles RPC methods here.
  if (!localDispatch.handled) {
    throw new Error(`Unknown gateway RPC method: ${method}`);
  }

  return localDispatch.result;
}
|
|
|
|
/**
 * Sends a gateway event to the main window's renderer on the `gateway:event`
 * channel. The event is dropped when no window named `main` exists or that
 * window has been destroyed.
 *
 * @param event Event payload to forward.
 */
broadcast(event: GatewayEvent): void {
  const target = BrowserWindow.getAllWindows().find((win) => {
    return windowManager.getName(win) === 'main';
  });

  if (!target || target.isDestroyed()) {
    return;
  }

  target.webContents.send('gateway:event', event);
}
|
|
|
|
/**
 * Broadcasts a `runtime:changed` event for the given topics.
 *
 * Falsy topics are dropped and duplicates removed; nothing is broadcast when
 * no topic remains. An empty `warnings` array is sent as `undefined`, and
 * `syncedAt` is the current time as an ISO-8601 string.
 *
 * @param options Topics and metadata describing the runtime change.
 */
notifyRuntimeChanged(options: RuntimeChangeBroadcast): void {
  const uniqueTopics = new Set(options.topics.filter(Boolean));
  const topics = Array.from(uniqueTopics);
  if (topics.length === 0) {
    return;
  }

  const warnings = options.warnings?.length ? options.warnings : undefined;

  this.broadcast({
    type: 'runtime:changed',
    topics,
    reason: options.reason,
    warnings,
    channelType: options.channelType,
    accountId: options.accountId,
    syncedAt: new Date().toISOString(),
  });
}
|
|
|
|
/**
 * Propagates a provider configuration change.
 *
 * Topics default to `providers` and `models`, and the reason defaults to
 * `providers:reload`. While the manager is initialized and the status is
 * `connected` or `reconnecting`, a debounced reload carries the change;
 * otherwise the change is broadcast immediately.
 *
 * @param options Optional overrides for the broadcast topics and metadata.
 */
reloadProviders(options?: RuntimeChangeBroadcast): void {
  const runtimeChange: RuntimeChangeBroadcast = {
    topics: options?.topics ?? ['providers', 'models'],
    reason: options?.reason ?? 'providers:reload',
    warnings: options?.warnings,
    channelType: options?.channelType,
    accountId: options?.accountId,
  };

  const gatewayActive =
    this.initialized && (this.status === 'connected' || this.status === 'reconnecting');
  if (!gatewayActive) {
    this.notifyRuntimeChanged(runtimeChange);
    return;
  }

  this.debouncedReload(undefined, runtimeChange);
}
|
|
}
|
|
|
|
// Module-level GatewayManager instance, created at import time and exported from this file.
export const gatewayManager = new GatewayManager();
|