Files
zn-ai/electron/gateway/manager.ts
duanshuwen df600272d6 feat: add tool status management and localization for skill installation
- Updated chat message types to include tool statuses.
- Enhanced localization files for English, Thai, and Chinese to support new tool status messages.
- Modified HomePage and SkillsPage components to handle tool statuses in chat messages.
- Implemented tool status merging and updating logic in the chat store.
- Added handling for tool status events in the gateway event processing.
- Created tests for chat message rendering with tool statuses and skill installation shortcuts.
- Improved gateway event dispatching for tool lifecycle events.
2026-04-23 20:27:54 +08:00

1935 lines
60 KiB
TypeScript

import { EventEmitter } from 'node:events';
import { createServer } from 'node:net';
import { join } from 'node:path';
import { BrowserWindow } from 'electron';
import { windowManager } from '@electron/service/window-service';
import logManager from '@electron/service/logger';
import configManager from '@electron/service/config-service';
import { updateTrayStatus } from '@electron/service/tray';
import { getUserDataDir } from '@electron/utils/paths';
import { captureTelemetryEvent, trackMetric } from '@electron/utils/telemetry';
import WebSocket from 'ws';
import {
loadOrCreateDeviceIdentity,
type DeviceIdentity,
} from '@electron/utils/device-identity';
import { CONFIG_KEYS } from '@runtime/lib/constants';
import { normalizeAgentSessionKey } from '@runtime/lib/models';
import type { ContentBlock, RawMessage } from '@runtime/shared/chat-model';
import type { GatewayEvent, GatewayRpcParams, RuntimeRefreshTopic } from './types';
import {
createInitialGatewayDiagnostics,
type GatewayDiagnosticsSnapshot,
} from './diagnostics';
import { dispatchJsonRpcNotification, dispatchProtocolEvent } from './event-dispatch';
import { OpenClawProcessOwner } from './openclaw-process-owner';
import { launchGatewayProcess } from './process-launcher';
import { prepareGatewayLaunchContext } from './config-sync';
import {
DEFAULT_RECONNECT_CONFIG,
type ReconnectConfig,
type GatewayLifecycleState,
getReconnectScheduleDecision,
getReconnectSkipReason,
} from './process-policy';
import { isNotification, isResponse, type JsonRpcNotification } from './protocol';
import {
clearPendingGatewayRequests,
rejectPendingGatewayRequest,
resolvePendingGatewayRequest,
type PendingGatewayRequest,
} from './request-store';
import { classifyGatewayStderrMessage, recordGatewayStartupStderrLine } from './startup-stderr';
import { runGatewayStartupSequence } from './startup-orchestrator';
import {
findExistingGatewayProcess,
runOpenClawDoctorRepair,
terminateOwnedGatewayProcess,
unloadLaunchctlGatewayService,
waitForPortFree,
warmupManagedPythonReadiness,
} from './supervisor';
import type { GatewayProcessHandle } from './process-handle';
import { GatewayConnectionMonitor } from './connection-monitor';
import { GatewayLifecycleController, LifecycleSupersededError } from './lifecycle-controller';
import { GatewayRestartController } from './restart-controller';
import { GatewayRestartGovernor } from './restart-governor';
import {
DEFAULT_GATEWAY_RELOAD_POLICY,
loadGatewayReloadPolicy,
type GatewayReloadPolicy,
} from './reload-policy';
import { GatewayStateController, type GatewayRuntimeStatus } from './state';
import { connectGatewaySocket, waitForGatewayReady } from './ws-client';
import { dispatchGatewayRpcMethod } from './rpc-dispatch';
import { createRandomId } from './random-id';
/**
 * Description of a runtime change that is queued by `queueRuntimeChange` and
 * later handed out by `takeQueuedRuntimeChange`.
 *
 * When several changes are queued before delivery they are merged: `topics`
 * and `warnings` are unioned, and the most recent defined `reason`,
 * `channelType` and `accountId` win.
 */
type RuntimeChangeBroadcast = {
topics: RuntimeRefreshTopic[];
reason?: string;
warnings?: string[];
channelType?: string;
accountId?: string;
};
/**
 * Coarse connection status derived from the lifecycle state by
 * `GatewayManager.toGatewayStatus`; it is passed to the tray and included in
 * the `gateway:status` broadcast.
 */
type GatewayStatus = 'connected' | 'disconnected' | 'reconnecting';
/**
 * Shape of a `type: 'res'` frame as consumed by `handleGatewayFrame`:
 * `ok === false` rejects the pending request identified by `id` using `error`,
 * anything else resolves it with `payload`.
 */
type GatewayResponseFrame = {
type?: string;
id?: string;
ok?: boolean;
payload?: unknown;
error?: unknown;
};
/**
 * Shape of a `type: 'event'` frame as consumed by `handleGatewayFrame`:
 * `event` names the event and `payload` carries its untyped data.
 */
type GatewayEventFrame = {
type?: string;
event?: string;
payload?: unknown;
};
/**
 * Argument type of the `notification` event listener: either a
 * `JsonRpcNotification` or a plain `{ method, params }` object.
 */
type GatewayNotificationEvent =
| JsonRpcNotification
| {
method: string;
params?: unknown;
};
/**
 * Argument of the `tool:status` event. `status` is the fallback status used by
 * `normalizeToolStatusEvent` when the payload carries no `status` of its own;
 * `payload` is the raw, unvalidated tool data.
 */
type GatewayToolStatusNotification = {
status: 'running' | 'completed' | 'error';
payload?: unknown;
};
/**
 * Listener signatures for the events that `GatewayManager` exposes through its
 * typed `on` / `once` / `off` overloads. Keys are the event names.
 */
export interface GatewayManagerEvents {
status: (status: GatewayRuntimeStatus) => void;
message: (message: unknown) => void;
notification: (notification: GatewayNotificationEvent) => void;
exit: (code: number | null) => void;
error: (error: Error) => void;
'channel:status': (data: { channelId: string; status: string }) => void;
'chat:message': (data: { message: unknown }) => void;
'gateway:ready': (payload: unknown) => void;
'tool:status': (payload: GatewayToolStatusNotification) => void;
}
/**
 * Type guard for any non-null value whose `typeof` is `'object'`.
 * Arrays pass this check as well.
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  return value !== null && typeof value === 'object';
}
/**
 * Turns an arbitrary thrown value into a message string.
 *
 * @param error - Any caught value.
 * @returns `error.message` for `Error` instances, otherwise `String(error)`.
 */
function toErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}
/**
 * Reports whether an error message matches one of the known transport-level
 * RPC failure markers (timeout, disconnected socket, cancellation, send
 * failure).
 *
 * @param error - Any caught value; non-`Error` values are stringified first.
 * @returns `true` when the message contains one of the markers.
 */
function isTransportRpcFailure(error: unknown): boolean {
  const text = error instanceof Error ? error.message : String(error);
  const transportMarkers = [
    'Gateway RPC timed out:',
    'OpenClaw Gateway socket is not connected',
    'Gateway request cancelled:',
    'Failed to send Gateway RPC',
  ];
  return transportMarkers.some((marker) => text.includes(marker));
}
/**
 * Coerces a timestamp-like value to a finite number.
 *
 * @param value - A number, or a string that `Date.parse` understands.
 * @returns The finite number (numbers are returned unchanged, strings are
 *   parsed with `Date.parse`), or `undefined` for anything else.
 */
function normalizeTimestamp(value: unknown): number | undefined {
  let candidate: number;
  if (typeof value === 'number') {
    candidate = value;
  } else if (typeof value === 'string') {
    candidate = Date.parse(value);
  } else {
    return undefined;
  }
  return Number.isFinite(candidate) ? candidate : undefined;
}
/**
 * Maps a raw role value onto a `RawMessage` role.
 *
 * The input is trimmed and lower-cased. `user`, `assistant`, `system`,
 * `tool_result` and `toolresult` are kept as they are, `tool` becomes
 * `toolresult`, and everything else (including non-strings) falls back to
 * `assistant`.
 */
function normalizeMessageRole(value: unknown): RawMessage['role'] {
  if (typeof value !== 'string') {
    return 'assistant';
  }
  const role = value.trim().toLowerCase();
  if (role === 'tool') {
    return 'toolresult';
  }
  if (role === 'tool_result') {
    return 'tool_result';
  }
  if (role === 'user' || role === 'assistant' || role === 'system' || role === 'toolresult') {
    return role as RawMessage['role'];
  }
  return 'assistant';
}
/**
 * Wraps plain text in a single `text` content block.
 *
 * @param text - Text to wrap.
 * @returns A one-element array, or an empty array when `text` is empty.
 */
function toTextContentBlocks(text: string): ContentBlock[] {
  if (!text) {
    return [];
  }
  return [{ type: 'text', text }];
}
/**
 * Normalizes an untyped gateway message into a `RawMessage`.
 *
 * Content resolution, in priority order:
 * 1. string `content`: kept as a string, or wrapped in text blocks when
 *    `options.preferContentBlocks` is set;
 * 2. array `content`: used as the content blocks without validation;
 * 3. string `text`: wrapped in text blocks;
 * 4. otherwise an empty string.
 *
 * All other fields of `value` are copied through; `role` and `timestamp` are
 * replaced by their normalized forms.
 *
 * @returns The normalized message, or `null` when `value` is not an object.
 */
function normalizeGatewayRawMessage(
  value: unknown,
  options?: { preferContentBlocks?: boolean },
): RawMessage | null {
  if (!isRecord(value)) {
    return null;
  }

  const { content: rawContent, text: rawText } = value;

  const resolveContent = (): RawMessage['content'] => {
    if (typeof rawContent === 'string') {
      return options?.preferContentBlocks ? toTextContentBlocks(rawContent) : rawContent;
    }
    if (Array.isArray(rawContent)) {
      return rawContent as ContentBlock[];
    }
    if (typeof rawText === 'string') {
      return toTextContentBlocks(rawText);
    }
    return '';
  };

  return {
    ...(value as Partial<RawMessage>),
    role: normalizeMessageRole(value.role),
    content: resolveContent(),
    timestamp: normalizeTimestamp(value.timestamp),
  };
}
/**
 * Flattens message content into plain text.
 *
 * String content is returned unchanged. For an array, each object block
 * contributes the first of its `text`, `thinking` or `content` fields that is
 * a string; empty contributions are dropped and the rest are joined with
 * newlines. Any other content yields an empty string.
 */
function extractTextFromMessageContent(content: RawMessage['content'] | unknown): string {
  if (typeof content === 'string') {
    return content;
  }
  if (!Array.isArray(content)) {
    return '';
  }

  const pieces: string[] = [];
  for (const block of content) {
    if (!isRecord(block)) {
      continue;
    }
    // The first string-typed field wins even when it is empty, in which case
    // the block contributes nothing.
    const candidate = [block.text, block.thinking, block.content]
      .find((field) => typeof field === 'string');
    if (typeof candidate === 'string' && candidate) {
      pieces.push(candidate);
    }
  }
  return pieces.join('\n');
}
/** Returns the plain text of a message by flattening its `content` field. */
function extractTextFromRawMessage(message: RawMessage): string {
return extractTextFromMessageContent(message.content);
}
/**
 * Pulls the text out of a gateway chat payload.
 *
 * Checks, in order: a string `delta`, a string `text`, then the text of the
 * normalized `message`. Returns an empty string when none is available.
 */
function extractTextFromGatewayPayload(payload: Record<string, unknown>): string {
  const { delta, text, message } = payload;
  if (typeof delta === 'string') {
    return delta;
  }
  if (typeof text === 'string') {
    return text;
  }
  const normalized = normalizeGatewayRawMessage(message);
  return normalized ? extractTextFromRawMessage(normalized) : '';
}
/**
 * Looks up the first non-blank string among several candidate keys.
 *
 * @param record - Object to read from.
 * @param keys - Keys to try, in priority order.
 * @returns The first value that is a string with non-whitespace content
 *   (returned untrimmed), or `undefined` when no key qualifies.
 */
function getRecordStringField(record: Record<string, unknown>, ...keys: string[]): string | undefined {
  return keys
    .map((key) => record[key])
    .find((candidate): candidate is string => (
      typeof candidate === 'string' && candidate.trim().length > 0
    ));
}
/**
 * Looks up the first finite number among several candidate keys.
 *
 * @param record - Object to read from.
 * @param keys - Keys to try, in priority order.
 * @returns The first value that is a finite number, or `undefined` when no
 *   key qualifies.
 */
function getRecordNumberField(record: Record<string, unknown>, ...keys: string[]): number | undefined {
  return keys
    .map((key) => record[key])
    .find((candidate): candidate is number => (
      typeof candidate === 'number' && Number.isFinite(candidate)
    ));
}
/**
 * Maps a raw tool status onto `'running' | 'completed' | 'error'`.
 *
 * The input is trimmed and lower-cased. `failed` is treated as `error`;
 * unknown strings and non-string values fall back to `completed`.
 */
function normalizeGatewayToolStatus(value: unknown): 'running' | 'completed' | 'error' {
  const key = typeof value === 'string' ? value.trim().toLowerCase() : undefined;
  if (key === 'running' || key === 'completed' || key === 'error') {
    return key;
  }
  return key === 'failed' ? 'error' : 'completed';
}
/**
 * Converts a raw `tool:status` notification into a `tool:status` gateway
 * event.
 *
 * Field names are accepted in both camelCase and snake_case. A non-blank
 * string `error` in the payload forces the status to `error` and replaces the
 * summary; otherwise the status comes from the payload, falling back to the
 * notification's own `status`.
 *
 * @returns The event, or `null` when the session key, run id or tool name is
 *   missing.
 */
function normalizeToolStatusEvent(value: GatewayToolStatusNotification): GatewayEvent | null {
  const data = isRecord(value.payload) ? value.payload : {};

  const rawSessionKey = getRecordStringField(data, 'sessionKey', 'session_key');
  const runId = getRecordStringField(data, 'runId', 'run_id');
  const toolName = getRecordStringField(data, 'toolName', 'tool_name', 'name');
  const hasRequiredFields = Boolean(rawSessionKey && runId && toolName);
  if (!hasRequiredFields || !rawSessionKey || !runId || !toolName) {
    return null;
  }

  const toolCallId = getRecordStringField(data, 'toolCallId', 'tool_call_id', 'id');
  const summaryText = getRecordStringField(data, 'summary', 'message');
  const durationMs = getRecordNumberField(data, 'durationMs', 'duration_ms');
  const failure = getRecordStringField(data, 'error');

  let status: 'running' | 'completed' | 'error';
  if (failure) {
    status = 'error';
  } else {
    status = normalizeGatewayToolStatus(data.status ?? value.status);
  }

  return {
    type: 'tool:status',
    sessionKey: normalizeAgentSessionKey(rawSessionKey),
    runId,
    toolCallId,
    toolName,
    status,
    updatedAt: normalizeTimestamp(data.timestamp) ?? Date.now(),
    summary: failure ?? summaryText,
    durationMs,
    input: data.input ?? data.arguments ?? data.args,
    result: data.result,
  };
}
/**
 * Builds an `Error` from a gateway RPC error value.
 *
 * @param error - A non-blank string, or an object with a non-blank string
 *   `message`; anything else is ignored.
 * @param fallback - Message used when `error` carries no usable text.
 */
function buildGatewayRpcError(error: unknown, fallback: string): Error {
  let text: string | undefined;
  if (typeof error === 'string') {
    text = error;
  } else if (isRecord(error) && typeof error.message === 'string') {
    text = error.message;
  }
  const usable = text !== undefined && text.trim().length > 0;
  return new Error(usable ? text : fallback);
}
/**
 * Asks the operating system for a free TCP port on 127.0.0.1.
 *
 * A temporary server is bound to port 0, the assigned port is read, and the
 * server is closed again before the promise settles.
 *
 * @returns The allocated port number.
 * @throws When listening or closing fails, or when no port could be read.
 */
async function findAvailablePort(): Promise<number> {
  return await new Promise<number>((resolve, reject) => {
    const probe = createServer();
    probe.once('error', reject);

    const onListening = (): void => {
      const bound = probe.address();
      const allocatedPort = bound && typeof bound === 'object' ? bound.port : 0;

      probe.close((closeError) => {
        if (closeError) {
          reject(closeError);
        } else if (!allocatedPort) {
          reject(new Error('Failed to allocate an available Gateway port'));
        } else {
          resolve(allocatedPort);
        }
      });
    };

    probe.listen(0, '127.0.0.1', onListening);
  });
}
/**
 * Event emitter that manages the OpenClaw Gateway child process and the
 * WebSocket connection to it: spawning, connecting, heartbeats, RPC requests,
 * and forwarding gateway events to listeners and to the app via `broadcast`.
 */
export class GatewayManager extends EventEmitter {
private initialized = false;
private initPromise: Promise<void> | null = null;
private startPromise: Promise<void> | null = null;
private stopPromise: Promise<void> | null = null;
// The next three fields mirror the most recent status emitted by `stateController`
// (they are assigned in the `emitStatus` callback set up in the constructor).
private status: GatewayStatus = 'disconnected';
private lifecycleState: GatewayLifecycleState = 'stopped';
private gatewayStatusDetails: GatewayRuntimeStatus = { state: 'stopped', port: null };
private readonly mode = 'openclaw' as const;
private readonly processOwner = new OpenClawProcessOwner();
// In-flight RPC requests, keyed by request id.
private readonly pendingRequests = new Map<string, PendingGatewayRequest>();
// Last streamed text snapshot per chat run id; used to compute incremental deltas.
private readonly deltaSnapshots = new Map<string, string>();
// Token handed to the spawned gateway and used when opening the socket.
private gatewayToken = createRandomId();
private socket: WebSocket | null = null;
private child: GatewayProcessHandle | null = null;
private port: number | null = null;
private exitCode: number | null = null;
private lastError?: string;
// While true, process-exit and socket-close handlers skip their recovery logic.
private stopping = false;
private deviceIdentity: DeviceIdentity | null = null;
private ownsProcess = false;
private externalShutdownSupported: boolean | null = null;
private reconnectTimer: NodeJS.Timeout | null = null;
private reconnectAttempts = 0;
private reconnectAttemptsTotal = 0;
private reconnectSuccessTotal = 0;
private reconnectConfig: ReconnectConfig;
private shouldReconnect = true;
private startLock = false;
private restartInFlight: Promise<void> | null = null;
private readonly connectionMonitor = new GatewayConnectionMonitor();
private readonly lifecycleController = new GatewayLifecycleController();
private readonly restartController = new GatewayRestartController();
private readonly restartGovernor = new GatewayRestartGovernor();
private reloadDebounceTimer: NodeJS.Timeout | null = null;
private reloadPolicy: GatewayReloadPolicy = { ...DEFAULT_GATEWAY_RELOAD_POLICY };
// Time (ms since epoch) of the last successful reload-policy load; 0 means never loaded.
private reloadPolicyLoadedAt = 0;
// Shared in-flight reload-policy load so concurrent callers wait on the same promise.
private reloadPolicyRefreshPromise: Promise<void> | null = null;
private isAutoReconnectStart = false;
// Runtime change merged by `queueRuntimeChange` until it is taken for delivery.
private pendingRuntimeChange: RuntimeChangeBroadcast | null = null;
private lastSpawnSummary: string | null = null;
private recentStartupStderrLines: string[] = [];
private readonly stateController: GatewayStateController;
// Timer that marks the gateway ready if no `gateway:ready` event arrives in time.
private gatewayReadyFallbackTimer: NodeJS.Timeout | null = null;
private diagnostics: GatewayDiagnosticsSnapshot = createInitialGatewayDiagnostics();
// Minimum age of the cached reload policy before a non-forced refresh reloads it.
private static readonly RELOAD_POLICY_REFRESH_MS = 15_000;
// Heartbeat settings used on non-Windows platforms (see `startPing`).
private static readonly HEARTBEAT_INTERVAL_MS = 30_000;
private static readonly HEARTBEAT_TIMEOUT_MS = 12_000;
private static readonly HEARTBEAT_MAX_MISSES = 3;
// Heartbeat settings used when `process.platform === 'win32'`.
private static readonly HEARTBEAT_INTERVAL_MS_WIN = 60_000;
private static readonly HEARTBEAT_TIMEOUT_MS_WIN = 25_000;
private static readonly HEARTBEAT_MAX_MISSES_WIN = 5;
// Delay before `scheduleGatewayReadyFallback` assumes the gateway is ready.
private static readonly GATEWAY_READY_FALLBACK_MS = 30_000;
/**
 * Typed overloads of `EventEmitter.on`: one per event in
 * `GatewayManagerEvents`, plus a catch-all for any other event name.
 * The implementation simply delegates to `super.on`.
 */
override on(eventName: 'status', listener: GatewayManagerEvents['status']): this;
override on(eventName: 'message', listener: GatewayManagerEvents['message']): this;
override on(eventName: 'notification', listener: GatewayManagerEvents['notification']): this;
override on(eventName: 'exit', listener: GatewayManagerEvents['exit']): this;
override on(eventName: 'error', listener: GatewayManagerEvents['error']): this;
override on(eventName: 'channel:status', listener: GatewayManagerEvents['channel:status']): this;
override on(eventName: 'chat:message', listener: GatewayManagerEvents['chat:message']): this;
override on(eventName: 'gateway:ready', listener: GatewayManagerEvents['gateway:ready']): this;
override on(eventName: 'tool:status', listener: GatewayManagerEvents['tool:status']): this;
override on(eventName: string | symbol, listener: (...args: any[]) => void): this;
override on(eventName: string | symbol, listener: (...args: any[]) => void): this {
return super.on(eventName, listener);
}
/**
 * Typed overloads of `EventEmitter.once`: one per event in
 * `GatewayManagerEvents`, plus a catch-all for any other event name.
 * The implementation simply delegates to `super.once`.
 */
override once(eventName: 'status', listener: GatewayManagerEvents['status']): this;
override once(eventName: 'message', listener: GatewayManagerEvents['message']): this;
override once(eventName: 'notification', listener: GatewayManagerEvents['notification']): this;
override once(eventName: 'exit', listener: GatewayManagerEvents['exit']): this;
override once(eventName: 'error', listener: GatewayManagerEvents['error']): this;
override once(eventName: 'channel:status', listener: GatewayManagerEvents['channel:status']): this;
override once(eventName: 'chat:message', listener: GatewayManagerEvents['chat:message']): this;
override once(eventName: 'gateway:ready', listener: GatewayManagerEvents['gateway:ready']): this;
override once(eventName: 'tool:status', listener: GatewayManagerEvents['tool:status']): this;
override once(eventName: string | symbol, listener: (...args: any[]) => void): this;
override once(eventName: string | symbol, listener: (...args: any[]) => void): this {
return super.once(eventName, listener);
}
/**
 * Typed overloads of `EventEmitter.off`: one per event in
 * `GatewayManagerEvents`, plus a catch-all for any other event name.
 * The implementation simply delegates to `super.off`.
 */
override off(eventName: 'status', listener: GatewayManagerEvents['status']): this;
override off(eventName: 'message', listener: GatewayManagerEvents['message']): this;
override off(eventName: 'notification', listener: GatewayManagerEvents['notification']): this;
override off(eventName: 'exit', listener: GatewayManagerEvents['exit']): this;
override off(eventName: 'error', listener: GatewayManagerEvents['error']): this;
override off(eventName: 'channel:status', listener: GatewayManagerEvents['channel:status']): this;
override off(eventName: 'chat:message', listener: GatewayManagerEvents['chat:message']): this;
override off(eventName: 'gateway:ready', listener: GatewayManagerEvents['gateway:ready']): this;
override off(eventName: 'tool:status', listener: GatewayManagerEvents['tool:status']): this;
override off(eventName: string | symbol, listener: (...args: any[]) => void): this;
override off(eventName: string | symbol, listener: (...args: any[]) => void): this {
return super.off(eventName, listener);
}
/**
 * Creates the manager and wires up its internal listeners.
 *
 * @param config - Optional overrides merged on top of `DEFAULT_RECONNECT_CONFIG`.
 */
constructor(config?: Partial<ReconnectConfig>) {
super();
this.reconnectConfig = { ...DEFAULT_RECONNECT_CONFIG, ...config };
this.stateController = new GatewayStateController({
// Every emitted status updates the cached fields first, then notifies
// listeners, the tray and the app (in that order).
emitStatus: (status) => {
this.gatewayStatusDetails = status;
this.lifecycleState = status.state;
this.status = this.toGatewayStatus(status.state);
this.emit('status', { ...status });
updateTrayStatus(this.status);
this.broadcast({ type: 'gateway:status', status: this.status });
},
onTransition: (previousState, nextState) => {
if (nextState === 'running') {
this.restartGovernor.onRunning();
}
// On every transition, give the restart controller a chance to run a
// restart that was deferred earlier; failures are only logged.
this.restartController.flushDeferredRestart(
`status:${previousState}->${nextState}`,
{
state: this.lifecycleState,
startLock: this.startLock,
shouldReconnect: this.shouldReconnect,
},
() => {
void this.restart().catch((error) => {
logManager.warn('Deferred Gateway restart failed:', error);
});
},
);
},
});
// A `gateway:ready` event cancels the fallback timer and marks the gateway
// ready if it is running and not already flagged.
this.on('gateway:ready', () => {
this.clearGatewayReadyFallback();
if (this.lifecycleState === 'running' && !this.gatewayStatusDetails.gatewayReady) {
logManager.info('Gateway subsystems ready (event received)');
this.setGatewayState({ gatewayReady: true });
}
});
// Tool status notifications are normalized and broadcast; payloads missing
// required fields normalize to null and are dropped.
this.on('tool:status', (payload) => {
const event = normalizeToolStatusEvent(payload);
if (event) {
this.broadcast(event);
}
});
// Registering an 'error' listener keeps EventEmitter from throwing when an
// 'error' event is emitted with no other listeners attached.
this.on('error', (error) => {
logManager.debug('GatewayManager emitted error event', error);
});
}
/**
 * Returns a copy of the spawn arguments with the value that follows the first
 * `--token` flag replaced by `<redacted>`. The input array is not modified.
 */
private sanitizeSpawnArgs(args: string[]): string[] {
  const flagPosition = args.indexOf('--token');
  const valuePosition = flagPosition + 1;
  const hasTokenValue = flagPosition >= 0 && valuePosition < args.length;
  return args.map((arg, position) => (
    hasTokenValue && position === valuePosition ? '<redacted>' : arg
  ));
}
/**
 * Reports whether an error message says the `shutdown` method is unknown
 * (case-insensitive match on "unknown method: shutdown").
 */
private isUnsupportedShutdownError(error: unknown): boolean {
  const unknownShutdownPattern = /unknown method:\s*shutdown/i;
  const text = error instanceof Error ? error.message : String(error);
  return unknownShutdownPattern.test(text);
}
/**
 * Collapses a lifecycle state into the coarse connection status:
 * `running` is `connected`, `starting` and `reconnecting` are `reconnecting`,
 * and everything else is `disconnected`.
 */
private toGatewayStatus(state: GatewayLifecycleState): GatewayStatus {
  if (state === 'running') {
    return 'connected';
  }
  if (state === 'starting' || state === 'reconnecting') {
    return 'reconnecting';
  }
  return 'disconnected';
}
/** Forwards a partial status update to the state controller. */
private setGatewayState(update: Partial<GatewayRuntimeStatus>): void {
this.stateController.setStatus(update);
}
/**
 * Queues a runtime change, merging it into any change that is already pending.
 *
 * When merging, topics and warnings are unioned (duplicates removed) and the
 * new change's `reason`, `channelType` and `accountId` take precedence when
 * defined. A missing `change` is ignored.
 */
private queueRuntimeChange(change?: RuntimeChangeBroadcast): void {
  if (!change) {
    return;
  }

  const queued = this.pendingRuntimeChange;
  if (queued) {
    const mergedTopics = new Set([...queued.topics, ...change.topics]);
    const mergedWarnings = new Set([
      ...(queued.warnings ?? []),
      ...(change.warnings ?? []),
    ]);
    this.pendingRuntimeChange = {
      topics: Array.from(mergedTopics),
      reason: change.reason ?? queued.reason,
      warnings: Array.from(mergedWarnings),
      channelType: change.channelType ?? queued.channelType,
      accountId: change.accountId ?? queued.accountId,
    };
    return;
  }

  // Nothing pending yet: store a copy so later mutations of the caller's
  // arrays do not affect the queued change.
  this.pendingRuntimeChange = {
    topics: [...change.topics],
    reason: change.reason,
    warnings: change.warnings ? [...change.warnings] : undefined,
    channelType: change.channelType,
    accountId: change.accountId,
  };
}
/**
 * Removes and returns the pending runtime change.
 *
 * @returns The queued change, or `undefined` when nothing is pending.
 */
private takeQueuedRuntimeChange(): RuntimeChangeBroadcast | undefined {
  const queued = this.pendingRuntimeChange ?? undefined;
  this.pendingRuntimeChange = null;
  return queued;
}
/**
 * Delivers the pending runtime change (if any) with an extra warning that
 * describes a failure.
 *
 * @param prefix - Text placed before the error message in the warning.
 * @param error - The failure; its message is appended after `prefix`.
 */
private flushQueuedRuntimeChangeFailure(prefix: string, error: unknown): void {
  const queued = this.takeQueuedRuntimeChange();
  if (queued) {
    const failureWarning = `${prefix}: ${toErrorMessage(error)}`;
    const existingWarnings = queued.warnings ?? [];
    this.notifyRuntimeChanged({
      ...queued,
      warnings: existingWarnings.concat(failureWarning),
    });
  }
}
/**
 * Loads (or creates) the device identity stored in
 * `openclaw-device-identity.json` under the user data directory.
 *
 * Does nothing when an identity is already loaded. Failures are logged as a
 * warning and leave the identity unset; they are not rethrown.
 */
private async initDeviceIdentity(): Promise<void> {
  if (!this.deviceIdentity) {
    try {
      const identityFile = join(getUserDataDir(), 'openclaw-device-identity.json');
      const identity = await loadOrCreateDeviceIdentity(identityFile);
      this.deviceIdentity = identity;
      logManager.info('OpenClaw Gateway device identity loaded', {
        deviceId: identity.deviceId,
      });
    } catch (error) {
      logManager.warn('Failed to load OpenClaw device identity; scopes may be limited:', error);
    }
  }
}
/** Terminates the given gateway process via `terminateOwnedGatewayProcess`. */
private async terminateChild(child: GatewayProcessHandle): Promise<void> {
await terminateOwnedGatewayProcess(child);
}
/**
 * Tears down the socket and child process and resets the related state.
 *
 * @param reason - Used in the log message and in the error that cancels
 *   pending requests ("Gateway request cancelled: <reason>").
 */
private async disposeTransport(reason: string): Promise<void> {
// Detach the socket from the manager before terminating it, so the close
// handler sees a socket that is no longer current.
const socket = this.socket;
this.socket = null;
this.clearGatewayReadyFallback();
if (socket) {
try {
socket.terminate();
} catch (error) {
logManager.warn(`Failed to close OpenClaw Gateway socket during ${reason}:`, error);
}
}
// Same for the child: clear the reference first, terminate at the very end.
const child = this.child;
this.child = null;
this.port = null;
this.ownsProcess = false;
// With a child but no recorded exit code, record -1; without a child, reset
// the exit code to null.
if (child && this.exitCode === null) {
this.exitCode = -1;
} else if (!child) {
this.exitCode = null;
}
this.deltaSnapshots.clear();
clearPendingGatewayRequests(
this.pendingRequests,
new Error(`Gateway request cancelled: ${reason}`),
);
// `this.port` is already null here; the lifecycle `state` is left untouched.
this.setGatewayState({
port: this.port,
pid: undefined,
connectedAt: undefined,
uptime: undefined,
gatewayReady: undefined,
});
if (child) {
await this.terminateChild(child);
}
}
/**
 * Handles the exit of a gateway child process.
 *
 * Exits reported for a process other than the current `child` are ignored.
 * The `exit` event is always emitted for the current child; unless the manager
 * is stopping, the exit is then treated as unexpected: pending requests are
 * rejected, the state becomes `stopped` with an error, and a reconnect is
 * scheduled.
 *
 * @param child - The process that exited.
 * @param code - Its exit code, or `null` when none was reported.
 */
private handleGatewayProcessExit(child: GatewayProcessHandle, code: number | null): void {
// Ignore exits from a process that has already been replaced or disposed.
if (this.child !== child) {
return;
}
this.exitCode = code ?? -1;
this.child = null;
this.ownsProcess = false;
this.connectionMonitor.clear();
this.clearGatewayReadyFallback();
this.emit('exit', code);
// During a deliberate stop nothing further needs to happen.
if (this.stopping) {
return;
}
this.lastError = `OpenClaw Gateway exited unexpectedly (code=${code ?? 'unknown'})`;
this.socket = null;
this.deltaSnapshots.clear();
clearPendingGatewayRequests(
this.pendingRequests,
new Error(this.lastError),
);
this.setGatewayState({
state: 'stopped',
port: this.port,
pid: undefined,
error: this.lastError,
connectedAt: undefined,
uptime: undefined,
gatewayReady: undefined,
reconnectAttempts: this.reconnectAttempts,
});
logManager.warn(this.lastError);
this.scheduleReconnect();
}
/**
 * Records a gateway process error as `lastError`, logs it, and re-emits it as
 * an `error` event.
 */
private handleGatewayProcessError(error: Error): void {
  const message = toErrorMessage(error);
  this.lastError = message;
  logManager.error('OpenClaw Gateway process error:', error);
  this.emit('error', error);
}
/**
 * Launches the gateway child process on the already-allocated port and records
 * it as the owned process.
 *
 * @throws When no port has been allocated (`this.port` is null).
 */
private async spawnGatewayProcess(): Promise<void> {
if (this.port === null) {
throw new Error('OpenClaw Gateway port not allocated');
}
const launchContext = await prepareGatewayLaunchContext({
port: this.port,
token: this.gatewayToken,
});
await unloadLaunchctlGatewayService();
// Occurrence count per normalized stderr message, used to suppress repeats.
const stderrDedup = new Map<string, number>();
const { child, lastSpawnSummary } = await launchGatewayProcess({
port: this.port,
launchContext,
sanitizeSpawnArgs: (args) => this.sanitizeSpawnArgs(args),
onStdoutLine: (line) => {
logManager.debug(`[OpenClaw stdout] ${line}`);
},
onStderrLine: (line) => {
// Every line is recorded for startup diagnostics, even if it is not logged.
recordGatewayStartupStderrLine(this.recentStartupStderrLines, line);
const classified = classifyGatewayStderrMessage(line);
if (classified.level === 'drop') {
return;
}
const count = (stderrDedup.get(classified.normalized) ?? 0) + 1;
stderrDedup.set(classified.normalized, count);
// Only the first occurrence is logged in full; afterwards a debug summary
// is written on every 50th repeat.
if (count > 1) {
if (count % 50 === 0) {
logManager.debug(`[Gateway stderr] (suppressed ${count} repeats) ${classified.normalized}`);
}
return;
}
if (classified.level === 'debug') {
logManager.debug(`[Gateway stderr] ${classified.normalized}`);
return;
}
logManager.warn(`[Gateway stderr] ${classified.normalized}`);
},
onExit: (exitedChild, code) => {
this.handleGatewayProcessExit(exitedChild, code);
},
onError: (error) => {
this.handleGatewayProcessError(error);
},
});
this.child = child;
this.ownsProcess = true;
this.lastSpawnSummary = lastSpawnSummary;
this.setGatewayState({
port: this.port,
pid: child.pid ?? undefined,
error: undefined,
reconnectAttempts: this.reconnectAttempts,
});
}
/**
 * Opens the WebSocket to the gateway and moves the manager into the `running`
 * state.
 *
 * @param port - Port the gateway listens on.
 * @param externalToken - Token to use instead of the manager's own
 *   `gatewayToken`, when provided.
 */
private async connectToGateway(port: number, externalToken?: string): Promise<void> {
const token = externalToken ?? this.gatewayToken;
const socket = await connectGatewaySocket({
port,
token,
deviceIdentity: this.deviceIdentity,
platform: process.platform,
onMessage: (message) => this.handleGatewayFrame(message),
onCloseAfterHandshake: (socket, code) => this.handleGatewaySocketClosed(socket, code),
});
this.socket = socket;
// Pongs count as proof of life for both the monitor and the diagnostics.
socket.on('pong', () => {
this.connectionMonitor.markAlive('pong');
this.recordGatewayAlive();
});
this.recordGatewayAlive();
this.connectionMonitor.markAlive('message');
// `gatewayReady` starts as false; it is set to true by the `gateway:ready`
// event or by the fallback timer scheduled below.
this.setGatewayState({
state: 'running',
port,
pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
error: undefined,
connectedAt: Date.now(),
reconnectAttempts: this.reconnectAttempts,
gatewayReady: false,
});
this.startPing();
this.startHealthCheck();
this.scheduleGatewayReadyFallback();
}
/**
 * Handles the gateway socket closing after a completed handshake.
 *
 * Closes of a socket that is no longer current, and closes while the manager
 * is stopping, are ignored. Otherwise pending requests are rejected, the state
 * becomes `stopped` with an error, and a reconnect may be scheduled.
 *
 * @param socket - The socket that closed.
 * @param code - The WebSocket close code.
 */
private handleGatewaySocketClosed(socket: WebSocket, code: number): void {
if (this.socket !== socket) {
return;
}
if (this.stopping) {
return;
}
this.socket = null;
this.connectionMonitor.clear();
this.clearGatewayReadyFallback();
this.recordSocketClose(code);
this.diagnostics.consecutiveHeartbeatMisses = 0;
this.deltaSnapshots.clear();
this.lastError = `OpenClaw Gateway socket closed (code=${code})`;
clearPendingGatewayRequests(
this.pendingRequests,
new Error(this.lastError),
);
// Capture the state before it is overwritten below.
const wasRunning = this.lifecycleState === 'running';
this.setGatewayState({
state: 'stopped',
port: this.port,
pid: this.child?.pid ?? undefined,
error: this.lastError,
connectedAt: undefined,
uptime: undefined,
gatewayReady: undefined,
reconnectAttempts: this.reconnectAttempts,
});
logManager.warn(this.lastError);
// Reconnect only if the gateway was running. On Windows this is further
// limited to close code 1012; other closes there are not retried from here.
// NOTE(review): 1012 is registered as "Service Restart" in the WebSocket
// close code registry - confirm that is the intent.
if (wasRunning && (process.platform !== 'win32' || code === 1012)) {
this.scheduleReconnect();
}
}
/**
 * Routes one incoming gateway frame.
 *
 * Every frame first counts as proof of life. Non-object frames are then
 * dropped. Object frames are handled by the first matching rule:
 * 1. `type: 'res'` with a string id: settles the matching pending request;
 * 2. `type: 'event'` with a string event name: chat events go through
 *    `handleChatEvent`, and every event is passed to `dispatchProtocolEvent`;
 * 3. a JSON-RPC response for a known pending request: settles it;
 * 4. a JSON-RPC notification: passed to `dispatchJsonRpcNotification`;
 * 5. anything else: emitted as a `message` event.
 */
private handleGatewayFrame(frame: unknown): void {
this.connectionMonitor.markAlive('message');
this.recordGatewayAlive();
if (!isRecord(frame)) {
return;
}
if (frame.type === 'res' && typeof frame.id === 'string') {
const response = frame as GatewayResponseFrame;
// Only an explicit `ok: false` is a failure; a missing `ok` resolves.
if (response.ok === false) {
rejectPendingGatewayRequest(
this.pendingRequests,
response.id!,
buildGatewayRpcError(response.error, `Gateway RPC failed: ${response.id}`),
);
return;
}
resolvePendingGatewayRequest(
this.pendingRequests,
response.id!,
response.payload,
);
return;
}
if (frame.type === 'event' && typeof frame.event === 'string') {
const eventFrame = frame as GatewayEventFrame;
if (eventFrame.event === 'chat' && isRecord(eventFrame.payload)) {
this.handleChatEvent(eventFrame.payload);
}
dispatchProtocolEvent(this, eventFrame.event, eventFrame.payload);
return;
}
if (isResponse(frame) && frame.id && this.pendingRequests.has(String(frame.id))) {
if (frame.error) {
rejectPendingGatewayRequest(
this.pendingRequests,
String(frame.id),
buildGatewayRpcError(frame.error, `Gateway RPC failed: ${String(frame.id)}`),
);
} else {
resolvePendingGatewayRequest(this.pendingRequests, String(frame.id), frame.result);
}
return;
}
if (isNotification(frame)) {
dispatchJsonRpcNotification(this, frame as JsonRpcNotification);
return;
}
this.emit('message', frame);
}
/**
 * Translates a gateway `chat` event payload into `chat:*` broadcasts.
 *
 * Payloads without a string `sessionKey`, `runId` and `state` are ignored.
 * Handled states: `delta`, `final`, `error`, `aborted`; other states are
 * ignored.
 */
private handleChatEvent(payload: Record<string, unknown>): void {
const sessionKey = typeof payload.sessionKey === 'string'
? normalizeAgentSessionKey(payload.sessionKey)
: '';
const runId = typeof payload.runId === 'string' ? payload.runId : '';
const state = typeof payload.state === 'string' ? payload.state : '';
if (!sessionKey || !runId || !state) {
return;
}
switch (state) {
case 'delta': {
const nextSnapshot = extractTextFromGatewayPayload(payload);
if (!nextSnapshot) {
return;
}
// The extracted text is treated as a snapshot of the run so far: when it
// extends the previous snapshot only the new suffix is broadcast, otherwise
// the whole text is broadcast.
const previousSnapshot = this.deltaSnapshots.get(runId) ?? '';
const delta = nextSnapshot.startsWith(previousSnapshot)
? nextSnapshot.slice(previousSnapshot.length)
: nextSnapshot;
this.deltaSnapshots.set(runId, nextSnapshot);
if (delta) {
this.broadcast({
type: 'chat:delta',
sessionKey,
runId,
delta,
});
}
break;
}
case 'final': {
const snapshotText = this.deltaSnapshots.get(runId) ?? '';
this.deltaSnapshots.delete(runId);
// Prefer the message carried by the payload; if there is none, fall back to
// an assistant message built from the streamed snapshot (if any).
const message = normalizeGatewayRawMessage(payload.message, {
preferContentBlocks: true,
}) ?? (
snapshotText
? {
role: 'assistant',
content: toTextContentBlocks(snapshotText),
timestamp: Date.now(),
}
: null
);
if (!message) {
return;
}
this.broadcast({
type: 'chat:final',
sessionKey,
runId,
message,
});
break;
}
case 'error': {
this.deltaSnapshots.delete(runId);
this.broadcast({
type: 'chat:error',
sessionKey,
runId,
error: typeof payload.errorMessage === 'string'
? payload.errorMessage
: 'Gateway chat error',
});
break;
}
case 'aborted': {
this.deltaSnapshots.delete(runId);
this.broadcast({
type: 'chat:aborted',
sessionKey,
runId,
});
break;
}
default:
break;
}
}
/**
 * Sends one RPC request over the gateway socket and waits for its response.
 *
 * @param method - RPC method name; also used as the prefix of the request id.
 * @param params - Optional request parameters.
 * @param options - `timeoutMs`: time to wait for a response in milliseconds
 *   (defaults to 30 000 when omitted; `null` disables the timeout).
 *   `skipStart`: when true, the gateway is not started first.
 * @returns The response payload.
 * @throws When the socket is not open, the request cannot be sent, the
 *   gateway reports a failure, or the timeout elapses.
 */
private async sendGatewayRequest(
  method: string,
  params?: Record<string, unknown>,
  options?: {
    timeoutMs?: number | null;
    skipStart?: boolean;
  },
): Promise<unknown> {
  if (!options?.skipStart) {
    await this.start();
  }
  const socket = this.socket;
  if (!socket || socket.readyState !== WebSocket.OPEN) {
    throw new Error('OpenClaw Gateway socket is not connected');
  }

  // `createRandomId` is the id helper this file imports; the previous
  // `randomUUID()` call referenced a name that is not imported here.
  const requestId = `${method}-${createRandomId()}`;

  // Apply the default only when the option is omitted. `??` would also replace
  // an explicit `null`, which callers use to request "no timeout".
  const requestedTimeoutMs = options?.timeoutMs;
  const timeoutMs = requestedTimeoutMs === undefined ? 30_000 : requestedTimeoutMs;

  try {
    const result = await new Promise<unknown>((resolve, reject) => {
      const timeout = timeoutMs === null
        ? null
        : setTimeout(() => {
          rejectPendingGatewayRequest(
            this.pendingRequests,
            requestId,
            new Error(`Gateway RPC timed out: ${method}`),
          );
        }, timeoutMs);
      // Register before sending so a fast response always finds its entry.
      this.pendingRequests.set(requestId, {
        resolve,
        reject,
        timeout,
      });
      try {
        socket.send(JSON.stringify({
          type: 'req',
          id: requestId,
          method,
          params,
        }));
      } catch (error) {
        rejectPendingGatewayRequest(
          this.pendingRequests,
          requestId,
          new Error(`Failed to send Gateway RPC ${method}: ${toErrorMessage(error)}`),
        );
      }
    });
    this.recordRpcSuccess();
    return result;
  } catch (error) {
    // Only transport-level failures count towards the RPC failure diagnostics;
    // errors reported by the gateway itself do not.
    if (isTransportRpcFailure(error)) {
      this.recordRpcFailure(method);
    }
    throw error;
  }
}
/**
 * Sends an RPC request with required params; delegates to
 * `sendGatewayRequest` with the same arguments.
 */
private async rpcGateway(
method: string,
params: Record<string, unknown>,
options?: { timeoutMs?: number | null },
): Promise<unknown> {
return await this.sendGatewayRequest(method, params, options);
}
/**
 * Sends the `shutdown` RPC to the gateway without starting it first.
 *
 * @param timeoutMs - How long to wait for the response (default 5 000 ms).
 */
private async requestGatewayShutdown(timeoutMs = 5_000): Promise<void> {
await this.sendGatewayRequest(
'shutdown',
undefined,
{
timeoutMs,
// Never start the gateway just to shut it down.
skipStart: true,
},
);
}
/**
 * Starts the connection monitor's health check. Checks are only requested
 * while the lifecycle state is `running`; an unhealthy result is logged and
 * emitted as an `error` event.
 */
private startHealthCheck(): void {
this.connectionMonitor.startHealthCheck({
shouldCheck: () => this.lifecycleState === 'running',
checkHealth: () => this.checkHealth(),
onUnhealthy: (errorMessage) => {
logManager.warn(`Gateway health check failed: ${errorMessage}`);
this.emit('error', new Error(errorMessage));
},
onError: () => {
// The monitor already logged the error.
},
});
}
/**
 * Starts the heartbeat ping loop on the connection monitor.
 *
 * Windows uses its own interval, timeout and miss limit. When the monitor
 * reports a heartbeat timeout, the miss is recorded and logged; a restart is
 * then attempted only if auto-reconnect is enabled, the gateway is running,
 * and the platform is not Windows.
 */
private startPing(): void {
  const heartbeat = process.platform === 'win32'
    ? {
      intervalMs: GatewayManager.HEARTBEAT_INTERVAL_MS_WIN,
      timeoutMs: GatewayManager.HEARTBEAT_TIMEOUT_MS_WIN,
      maxConsecutiveMisses: GatewayManager.HEARTBEAT_MAX_MISSES_WIN,
    }
    : {
      intervalMs: GatewayManager.HEARTBEAT_INTERVAL_MS,
      timeoutMs: GatewayManager.HEARTBEAT_TIMEOUT_MS,
      maxConsecutiveMisses: GatewayManager.HEARTBEAT_MAX_MISSES,
    };

  this.connectionMonitor.startPing({
    ...heartbeat,
    sendPing: () => {
      // Pings are skipped silently while the socket is not open.
      if (this.socket?.readyState === WebSocket.OPEN) {
        this.socket.ping();
      }
    },
    onHeartbeatTimeout: ({ consecutiveMisses, timeoutMs }) => {
      this.recordHeartbeatTimeout(consecutiveMisses);
      logManager.warn(
        `Gateway heartbeat: ${consecutiveMisses} consecutive pong misses (timeout=${timeoutMs}ms, state=${this.lifecycleState}, autoReconnect=${this.shouldReconnect}).`,
      );

      const canRecover = this.shouldReconnect && this.lifecycleState === 'running';
      if (!canRecover) {
        return;
      }
      if (process.platform === 'win32') {
        logManager.warn('Gateway heartbeat recovery skipped on Windows; waiting for process exit or socket close');
        return;
      }
      void this.restart().catch((error) => {
        logManager.warn('Gateway heartbeat recovery failed:', error);
      });
    },
  });
}
/**
 * Reloads the gateway reload policy if the cached copy is stale.
 *
 * @param force - When true, reload even if the cached policy is younger than
 *   `RELOAD_POLICY_REFRESH_MS`.
 *
 * Concurrent callers share one in-flight load: if a load is already running,
 * this call just waits for it.
 */
private async refreshReloadPolicy(force = false): Promise<void> {
  const age = Date.now() - this.reloadPolicyLoadedAt;
  const isFresh = age < GatewayManager.RELOAD_POLICY_REFRESH_MS;
  if (!force && isFresh) {
    return;
  }

  const inFlight = this.reloadPolicyRefreshPromise;
  if (inFlight) {
    await inFlight;
    return;
  }

  const load = async (): Promise<void> => {
    this.reloadPolicy = await loadGatewayReloadPolicy();
    this.reloadPolicyLoadedAt = Date.now();
  };

  this.reloadPolicyRefreshPromise = load();
  try {
    await this.reloadPolicyRefreshPromise;
  } finally {
    // Always clear the marker so a failed load does not block later refreshes.
    this.reloadPolicyRefreshPromise = null;
  }
}
/**
 * Cancels every timer this manager tracks: the reconnect timer, the
 * connection monitor's timers, the restart debounce timer, the reload
 * debounce timer, and the gateway-ready fallback timer.
 */
private clearAllTimers(): void {
  const pendingReconnect = this.reconnectTimer;
  if (pendingReconnect) {
    clearTimeout(pendingReconnect);
    this.reconnectTimer = null;
  }

  this.connectionMonitor.clear();
  this.restartController.clearDebounceTimer();

  const pendingReload = this.reloadDebounceTimer;
  if (pendingReload) {
    clearTimeout(pendingReload);
    this.reloadDebounceTimer = null;
  }

  this.clearGatewayReadyFallback();
}
/** Cancels the pending gateway-ready fallback timer, if one is set. */
private clearGatewayReadyFallback(): void {
  const pending = this.gatewayReadyFallbackTimer;
  if (!pending) {
    return;
  }
  clearTimeout(pending);
  this.gatewayReadyFallbackTimer = null;
}
/**
 * (Re)arms the gateway-ready fallback timer.
 *
 * When the timer fires after `GATEWAY_READY_FALLBACK_MS` and the manager is
 * `running` but still not flagged as ready, `gatewayReady` is set to true.
 */
private scheduleGatewayReadyFallback(): void {
  const markReadyIfStillWaiting = (): void => {
    this.gatewayReadyFallbackTimer = null;
    const stillWaiting =
      this.lifecycleState === 'running' && !this.gatewayStatusDetails.gatewayReady;
    if (!stillWaiting) {
      return;
    }
    logManager.info('Gateway ready fallback triggered (no gateway.ready event within timeout)');
    this.setGatewayState({ gatewayReady: true });
  };

  // Replace any previously armed fallback before scheduling a new one.
  this.clearGatewayReadyFallback();
  this.gatewayReadyFallbackTimer = setTimeout(
    markReadyIfStillWaiting,
    GatewayManager.GATEWAY_READY_FALLBACK_MS,
  );
}
/**
 * Returns a shallow copy of the current diagnostics counters/timestamps, so
 * callers cannot mutate the manager's own diagnostics object.
 */
getDiagnostics(): GatewayDiagnosticsSnapshot {
  const snapshot: GatewayDiagnosticsSnapshot = { ...this.diagnostics };
  return snapshot;
}
/** Stamps the last-alive time and resets the consecutive heartbeat-miss counter. */
private recordGatewayAlive(): void {
  const diag = this.diagnostics;
  diag.lastAliveAt = Date.now();
  diag.consecutiveHeartbeatMisses = 0;
}
/** Stamps the last RPC success time and resets the consecutive RPC failure counter. */
private recordRpcSuccess(): void {
  const diag = this.diagnostics;
  diag.lastRpcSuccessAt = Date.now();
  diag.consecutiveRpcFailures = 0;
}
/**
 * Records a failed RPC: failure time, the failing method name, and one more
 * consecutive failure.
 *
 * @param method Name of the RPC method that failed.
 */
private recordRpcFailure(method: string): void {
  const diag = this.diagnostics;
  diag.lastRpcFailureAt = Date.now();
  diag.lastRpcFailureMethod = method;
  diag.consecutiveRpcFailures = diag.consecutiveRpcFailures + 1;
}
/**
 * Records a heartbeat timeout and the current run of missed pongs.
 *
 * @param consecutiveMisses Miss count reported by the connection monitor.
 */
private recordHeartbeatTimeout(consecutiveMisses: number): void {
  const diag = this.diagnostics;
  diag.lastHeartbeatTimeoutAt = Date.now();
  diag.consecutiveHeartbeatMisses = consecutiveMisses;
}
/**
 * Records when the socket closed and with which close code.
 *
 * @param code Close code to store in diagnostics.
 */
private recordSocketClose(code: number): void {
  const diag = this.diagnostics;
  diag.lastSocketCloseAt = Date.now();
  diag.lastSocketCloseCode = code;
}
/**
 * Emits the `gateway.reconnect` metric for one reconnect attempt.
 *
 * Includes the running success/attempt totals and the success rate rounded
 * to four decimals (0 when no attempt has been counted yet). The `error`
 * field is attached only when a non-empty error message is supplied.
 *
 * @param outcome Whether the attempt succeeded or failed.
 * @param payload Attempt number, attempt limit, delay used, and optional error text.
 */
private emitReconnectMetric(
  outcome: 'success' | 'failure',
  payload: {
    attemptNo: number;
    maxAttempts: number;
    delayMs: number;
    error?: string;
  },
): void {
  const attemptsSoFar = this.reconnectAttemptsTotal;
  const successesSoFar = this.reconnectSuccessTotal;

  let successRate = 0;
  if (attemptsSoFar > 0) {
    successRate = successesSoFar / attemptsSoFar;
  }

  const errorField = payload.error ? { error: payload.error } : {};
  trackMetric('gateway.reconnect', {
    outcome,
    attemptNo: payload.attemptNo,
    maxAttempts: payload.maxAttempts,
    delayMs: payload.delayMs,
    gateway_reconnect_success_count: successesSoFar,
    gateway_reconnect_attempt_count: attemptsSoFar,
    gateway_reconnect_success_rate: Number(successRate.toFixed(4)),
    ...errorField,
  });
}
/**
 * Schedules the next automatic reconnect attempt, if one is allowed.
 *
 * The decision (skip / already-scheduled / fail / schedule with a delay) is
 * delegated to `getReconnectScheduleDecision`. On `fail` the manager moves to
 * the `error` state; otherwise it moves to `reconnecting` and arms a timer
 * that calls `start()`. A failed attempt re-enters this method to schedule
 * the following attempt.
 */
private scheduleReconnect(): void {
const decision = getReconnectScheduleDecision({
shouldReconnect: this.shouldReconnect,
hasReconnectTimer: this.reconnectTimer !== null,
reconnectAttempts: this.reconnectAttempts,
maxAttempts: this.reconnectConfig.maxAttempts,
baseDelay: this.reconnectConfig.baseDelay,
maxDelay: this.reconnectConfig.maxDelay,
});
// Nothing to do for these two outcomes.
if (decision.action === 'skip') {
return;
}
if (decision.action === 'already-scheduled') {
return;
}
// Attempt budget exhausted: surface a terminal error state and stop retrying.
if (decision.action === 'fail') {
this.lastError = 'Failed to reconnect after maximum attempts';
this.setGatewayState({
state: 'error',
port: this.port,
pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
error: this.lastError,
reconnectAttempts: this.reconnectAttempts,
gatewayReady: undefined,
});
return;
}
const { delay, nextAttempt, maxAttempts } = decision;
this.reconnectAttempts = nextAttempt;
this.setGatewayState({
state: 'reconnecting',
port: this.port,
pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
error: this.lastError,
reconnectAttempts: nextAttempt,
gatewayReady: false,
});
// Remember the lifecycle epoch at scheduling time so the timer callback can
// compare it with the epoch current when it fires.
const scheduledEpoch = this.lifecycleController.getCurrentEpoch();
this.reconnectTimer = setTimeout(async () => {
this.reconnectTimer = null;
const skipReason = getReconnectSkipReason({
scheduledEpoch,
currentEpoch: this.lifecycleController.getCurrentEpoch(),
shouldReconnect: this.shouldReconnect,
});
// Any skip reason silently abandons this attempt.
if (skipReason) {
return;
}
const attemptNo = this.reconnectAttempts;
this.reconnectAttemptsTotal += 1;
try {
// Flag read by start() so it keeps the reconnect attempt counter and
// reports the `reconnecting` state instead of `starting`.
this.isAutoReconnectStart = true;
await this.start();
this.reconnectSuccessTotal += 1;
this.emitReconnectMetric('success', {
attemptNo,
maxAttempts,
delayMs: delay,
});
this.reconnectAttempts = 0;
} catch (error) {
logManager.error(
`Gateway reconnection attempt ${nextAttempt}/${maxAttempts} failed:`,
error,
);
this.emitReconnectMetric('failure', {
attemptNo,
maxAttempts,
delayMs: delay,
error: toErrorMessage(error),
});
// Chain the next attempt (or the terminal `fail` branch above).
this.scheduleReconnect();
}
}, delay);
}
/**
 * Queues an optional runtime-change broadcast and asks the restart
 * controller to run `restart()` after a debounce delay.
 *
 * @param delayMs Debounce delay in milliseconds (default 2000).
 * @param options Optional runtime-change broadcast to queue.
 */
debouncedRestart(delayMs = 2000, options?: RuntimeChangeBroadcast): void {
  const performRestart = (): void => {
    // Fire-and-forget: failures are logged and reported via the queued change.
    void this.restart().catch((error) => {
      logManager.warn('Debounced Gateway restart failed:', error);
      this.flushQueuedRuntimeChangeFailure('Gateway restart failed', error);
    });
  };

  this.queueRuntimeChange(options);
  this.restartController.debouncedRestart(delayMs, performRestart);
}
/**
 * Applies a runtime change to the Gateway, preferring an in-place reload.
 *
 * A full `restart()` is used when the reload policy mode is `off` or
 * `restart`, when there is no running child process, or on Windows. If the
 * restart controller reports restarts as deferred, the reload is recorded as
 * deferred and the method returns. Otherwise `SIGUSR1` is sent to the child,
 * and after a 1.5 s wait the queued runtime change is broadcast if the
 * Gateway is still running. Any error in that path falls back to `restart()`.
 *
 * @param options Optional runtime-change broadcast to queue before reloading.
 */
async reload(options?: RuntimeChangeBroadcast): Promise<void> {
  this.queueRuntimeChange(options);
  await this.refreshReloadPolicy();

  const policyMode = this.reloadPolicy.mode;
  if (policyMode === 'off' || policyMode === 'restart') {
    await this.restart();
    return;
  }

  const restartDeferred = this.restartController.isRestartDeferred({
    state: this.lifecycleState,
    startLock: this.startLock,
  });
  if (restartDeferred) {
    this.restartController.markDeferredRestart('reload', {
      state: this.lifecycleState,
      startLock: this.startLock,
    });
    return;
  }

  // No signalable child, not running, or Windows: restart instead of signaling.
  if (
    !this.child?.pid ||
    this.lifecycleState !== 'running' ||
    process.platform === 'win32'
  ) {
    await this.restart();
    return;
  }

  try {
    process.kill(this.child.pid, 'SIGUSR1');
    // Wait 1.5 s before checking whether the Gateway is still running.
    await new Promise((resolve) => setTimeout(resolve, 1500));
    if (this.lifecycleState !== 'running' || !this.child?.pid) {
      await this.restart();
      return;
    }
    const pendingChange = this.takeQueuedRuntimeChange();
    if (pendingChange) {
      this.notifyRuntimeChanged(pendingChange);
    }
  } catch (error) {
    logManager.warn('Gateway reload signal failed, falling back to restart:', error);
    await this.restart();
  }
}
/**
 * Queues an optional runtime-change broadcast and schedules a debounced
 * `reload()`.
 *
 * The reload policy is refreshed in the background; the decision below uses
 * whatever policy is cached at call time. When the policy mode is `off` or
 * `restart`, the call is delegated to `debouncedRestart`. Otherwise any
 * pending reload timer is replaced by a new one.
 *
 * @param delayMs Debounce delay; defaults to the policy's `debounceMs`.
 * @param options Optional runtime-change broadcast to queue.
 */
debouncedReload(delayMs?: number, options?: RuntimeChangeBroadcast): void {
  this.queueRuntimeChange(options);
  // Fire-and-forget refresh. The rejection handler keeps a failing policy
  // load from surfacing as an unhandled promise rejection; the cached policy
  // stays in effect in that case.
  void this.refreshReloadPolicy().catch((error) => {
    logManager.warn('Failed to refresh Gateway reload policy:', error);
  });
  const effectiveDelay = delayMs ?? this.reloadPolicy.debounceMs;
  if (this.reloadPolicy.mode === 'off' || this.reloadPolicy.mode === 'restart') {
    this.debouncedRestart(effectiveDelay);
    return;
  }
  // Restart the debounce window on every call.
  if (this.reloadDebounceTimer) {
    clearTimeout(this.reloadDebounceTimer);
  }
  this.reloadDebounceTimer = setTimeout(() => {
    this.reloadDebounceTimer = null;
    void this.reload().catch((error) => {
      logManager.warn('Debounced Gateway reload failed:', error);
      this.flushQueuedRuntimeChangeFailure('Gateway reload failed', error);
    });
  }, effectiveDelay);
}
/**
 * One-time initialization of the manager.
 *
 * Returns immediately when already initialized and joins an in-flight
 * initialization if there is one. When the `GATEWAY_AUTO_START` config value
 * is falsy, the state is set to `stopped`. Otherwise `start()` is attempted;
 * a start failure is logged and recorded as a `stopped` state with the error
 * message rather than being thrown.
 */
async init(): Promise<void> {
  if (this.initialized) {
    return;
  }
  const pendingInit = this.initPromise;
  if (pendingInit) {
    await pendingInit;
    return;
  }

  const bootstrap = async (): Promise<void> => {
    this.initialized = true;
    logManager.info('GatewayManager initialized in OpenClaw mode');

    const autoStartEnabled = Boolean(configManager.get(CONFIG_KEYS.GATEWAY_AUTO_START));
    if (autoStartEnabled) {
      try {
        await this.start();
      } catch (error) {
        // Auto-start failures are reported through state and logs only.
        this.lastError = toErrorMessage(error);
        this.setGatewayState({
          state: 'stopped',
          port: this.port,
          pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
          error: this.lastError,
          gatewayReady: undefined,
          reconnectAttempts: this.reconnectAttempts,
        });
        logManager.error('Failed to auto-start OpenClaw Gateway:', error);
      }
      return;
    }

    this.setGatewayState({
      state: 'stopped',
      port: this.port,
      pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
      gatewayReady: undefined,
      reconnectAttempts: this.reconnectAttempts,
    });
  };

  this.initPromise = bootstrap();
  try {
    await this.initPromise;
  } finally {
    this.initPromise = null;
  }
}
/**
 * Starts the OpenClaw Gateway, or connects to one that is already running.
 *
 * Concurrent calls share the in-flight start. The call is a no-op when the
 * manager is already `running` with an open socket. A pending `stop()` is
 * awaited first. The actual connect/spawn sequence is delegated to
 * `runGatewayStartupSequence`, which receives callbacks for each phase.
 *
 * On failure the transport is disposed, the state becomes `error`, and the
 * error is rethrown. A `LifecycleSupersededError` is logged at debug level
 * and swallowed. The start lock is always released and any deferred restart
 * is flushed afterwards.
 *
 * @throws Rethrows any startup error other than `LifecycleSupersededError`.
 */
async start(): Promise<void> {
  if (this.startLock) {
    if (this.startPromise) {
      return await this.startPromise;
    }
    return;
  }
  if (this.lifecycleState === 'running' && this.socket?.readyState === WebSocket.OPEN) {
    return;
  }
  if (this.startPromise) {
    return await this.startPromise;
  }
  this.initialized = true;
  this.startPromise = (async () => {
    if (this.stopPromise) {
      await this.stopPromise;
    }
    this.startLock = true;
    const startEpoch = this.lifecycleController.bump('start');
    this.stopping = false;
    this.lastError = undefined;
    this.shouldReconnect = true;
    try {
      // Must run inside the try: the start lock is already held, so a
      // rejection here has to reach the `finally` below. Otherwise the lock
      // would never be released and every later start() would return early.
      await this.refreshReloadPolicy(true);
      await this.initDeviceIdentity();
      await this.processOwner.prepare();
      const runtimeStatus = this.processOwner.getStatus();
      if (!runtimeStatus.entryExists) {
        throw new Error(runtimeStatus.lastError || `OpenClaw entry not found at ${runtimeStatus.runtimePaths.entryPath}`);
      }
      if (this.reconnectTimer) {
        clearTimeout(this.reconnectTimer);
        this.reconnectTimer = null;
      }
      // A manual start resets the attempt counter; an auto-reconnect keeps it.
      const wasAutoReconnectStart = this.isAutoReconnectStart;
      if (!wasAutoReconnectStart) {
        this.reconnectAttempts = 0;
      }
      this.isAutoReconnectStart = false;
      // An owned child with a known port but no socket can be reconnected to
      // without disposing the existing transport.
      const reusingManagedProcess =
        this.child?.pid != null &&
        this.ownsProcess &&
        this.port != null &&
        this.socket === null;
      const shouldDisposeTransport = !reusingManagedProcess && (this.socket !== null || this.child !== null);
      if (shouldDisposeTransport) {
        await this.disposeTransport('starting OpenClaw Gateway');
      }
      if (this.port === null) {
        this.port = await findAvailablePort();
      }
      this.exitCode = null;
      if (!reusingManagedProcess) {
        this.gatewayToken = randomUUID();
      }
      this.setGatewayState({
        state: wasAutoReconnectStart ? 'reconnecting' : 'starting',
        port: this.port,
        pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
        error: undefined,
        connectedAt: undefined,
        uptime: undefined,
        reconnectAttempts: this.reconnectAttempts,
        gatewayReady: false,
      });
      warmupManagedPythonReadiness();
      await runGatewayStartupSequence({
        port: this.port,
        shouldWaitForPortFree: process.platform === 'win32',
        hasOwnedProcess: () => this.child?.pid != null && this.ownsProcess,
        assertLifecycle: (phase) => {
          this.lifecycleController.assert(startEpoch, phase);
        },
        resetStartupStderrLines: () => {
          this.recentStartupStderrLines = [];
        },
        getStartupStderrLines: () => this.recentStartupStderrLines,
        findExistingGateway: async (port) => {
          return await findExistingGatewayProcess({ port, ownedPid: this.child?.pid ?? null });
        },
        connect: async (port, externalToken) => {
          await this.connectToGateway(port, externalToken);
        },
        onConnectedToExistingGateway: () => {
          const isOwnProcess = this.child?.pid != null && this.ownsProcess;
          if (!isOwnProcess) {
            // Connected to a Gateway this manager did not spawn.
            this.ownsProcess = false;
            this.externalShutdownSupported = null;
            this.setGatewayState({ pid: undefined });
          }
          if (isOwnProcess) {
            this.restartController.recordRestartCompleted();
          }
        },
        waitForPortFree: async (port) => {
          await waitForPortFree(port);
        },
        startProcess: async () => {
          await this.spawnGatewayProcess();
        },
        waitForReady: async (port) => {
          await waitForGatewayReady({
            port,
            getProcessExitCode: () => this.exitCode,
          });
        },
        onConnectedToManagedGateway: () => {
          logManager.info('OpenClaw Gateway startup sequence completed', {
            port: this.port,
            pid: this.child?.pid,
            spawn: this.lastSpawnSummary,
          });
        },
        runDoctorRepair: async () => await runOpenClawDoctorRepair(),
        onDoctorRepairSuccess: () => {
          this.lastError = undefined;
          this.setGatewayState({
            state: 'starting',
            port: this.port,
            pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
            error: undefined,
            reconnectAttempts: this.reconnectAttempts,
            gatewayReady: false,
          });
        },
        delay: async (ms) => {
          await new Promise((resolve) => setTimeout(resolve, ms));
        },
      });
      this.lastError = undefined;
      this.reconnectAttempts = 0;
      this.setGatewayState({
        state: 'running',
        port: this.port,
        pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
        error: undefined,
        reconnectAttempts: 0,
      });
      logManager.info('OpenClaw Gateway connected', {
        port: this.port,
        pid: this.child?.pid,
        spawn: this.lastSpawnSummary,
      });
    } catch (error) {
      // Swallow this error type quietly (debug log only) instead of treating
      // it as a failed start.
      if (error instanceof LifecycleSupersededError) {
        logManager.debug(error.message);
        return;
      }
      this.lastError = toErrorMessage(error);
      await this.disposeTransport('failed OpenClaw Gateway start');
      this.setGatewayState({
        state: 'error',
        port: this.port,
        pid: undefined,
        error: this.lastError,
        connectedAt: undefined,
        uptime: undefined,
        reconnectAttempts: this.reconnectAttempts,
        gatewayReady: undefined,
      });
      throw error;
    } finally {
      // Release the lock first so the deferred-restart check sees it cleared.
      this.startLock = false;
      this.restartController.flushDeferredRestart(
        'start:finally',
        {
          state: this.lifecycleState,
          startLock: this.startLock,
          shouldReconnect: this.shouldReconnect,
        },
        () => {
          void this.restart().catch((error) => {
            logManager.warn('Deferred Gateway restart failed:', error);
          });
        },
      );
    }
  })();
  try {
    await this.startPromise;
  } finally {
    this.startPromise = null;
  }
}
/**
 * Stops the Gateway connection and resets the manager to the `stopped` state.
 *
 * Concurrent calls share the in-flight stop. Auto-reconnect is disabled and
 * all timers are cleared before the transport is disposed. When connected to
 * a Gateway this manager does not own, a `shutdown` RPC is attempted first
 * unless a previous attempt showed it is unsupported.
 */
async stop(): Promise<void> {
// Join a stop that is already in progress.
if (this.stopPromise) {
return await this.stopPromise;
}
this.stopPromise = (async () => {
this.lifecycleController.bump('stop');
this.shouldReconnect = false;
this.stopping = true;
this.clearAllTimers();
// Externally managed Gateway with an open socket: ask it to shut down.
// `externalShutdownSupported` is null when unknown, so only an explicit
// `false` from an earlier attempt skips the RPC.
if (!this.ownsProcess && this.socket?.readyState === WebSocket.OPEN && this.externalShutdownSupported !== false) {
try {
await this.requestGatewayShutdown();
this.externalShutdownSupported = true;
} catch (error) {
if (this.isUnsupportedShutdownError(error)) {
this.externalShutdownSupported = false;
logManager.info('External Gateway does not support "shutdown"; skipping shutdown RPC for future stops');
} else {
// Best effort: other failures are logged and the stop continues.
logManager.warn('Failed to request shutdown for externally managed Gateway:', error);
}
}
}
await this.disposeTransport('stopping OpenClaw Gateway');
// Reset restart/reconnect bookkeeping and error/diagnostic state.
this.restartController.resetDeferredRestart();
this.isAutoReconnectStart = false;
this.reconnectAttempts = 0;
this.lastError = undefined;
this.diagnostics.consecutiveHeartbeatMisses = 0;
this.setGatewayState({
state: 'stopped',
port: this.port,
pid: undefined,
error: undefined,
connectedAt: undefined,
uptime: undefined,
reconnectAttempts: 0,
gatewayReady: undefined,
});
this.stopping = false;
})();
try {
await this.stopPromise;
} finally {
// Always release the shared promise so a later stop() can run.
this.stopPromise = null;
}
}
/**
 * Terminates the Gateway child process owned by this manager (the name
 * indicates app quit as the intended use).
 *
 * Terminates the socket (ignoring errors), clears the ready-fallback and
 * connection-monitor timers, rejects all pending Gateway requests, and then
 * terminates the child process.
 *
 * @returns `false` when there is no child or it is not owned by this
 *          manager; `true` after the owned child has been terminated.
 */
async forceTerminateOwnedProcessForQuit(): Promise<boolean> {
  const hasOwnedChild = Boolean(this.child) && this.ownsProcess;
  if (!hasOwnedChild) {
    return false;
  }

  const openSocket = this.socket;
  if (openSocket) {
    try {
      openSocket.terminate();
    } catch {
      // ignore quit-time socket termination errors
    }
    this.socket = null;
  }

  this.clearGatewayReadyFallback();
  this.connectionMonitor.clear();

  const quitError = new Error('Gateway terminated during app quit');
  clearPendingGatewayRequests(this.pendingRequests, quitError);

  const owned = this.child;
  await this.terminateChild(owned);
  // Only clear the reference if it was not replaced while terminating.
  if (this.child === owned) {
    this.child = null;
  }
  this.ownsProcess = false;

  this.setGatewayState({
    pid: undefined,
    connectedAt: undefined,
    uptime: undefined,
    gatewayReady: undefined,
  });
  return true;
}
/**
 * Restarts the Gateway (stop followed by start), subject to the restart
 * controller and restart governor.
 *
 * - If the restart controller reports restarts as deferred, the restart is
 *   recorded as deferred and the method returns.
 * - If a restart is already in flight, this call awaits it.
 * - If the governor denies the restart, a warning and telemetry are emitted
 *   and the method returns without restarting.
 *
 * After a successful restart, telemetry is emitted and any queued runtime
 * change is broadcast. If `start()` fails after `stop()`, auto-reconnect is
 * re-enabled and a reconnect is scheduled before the error is rethrown.
 *
 * @param options Optional runtime-change broadcast to queue.
 */
async restart(options?: RuntimeChangeBroadcast): Promise<void> {
this.queueRuntimeChange(options);
if (this.restartController.isRestartDeferred({
state: this.lifecycleState,
startLock: this.startLock,
})) {
this.restartController.markDeferredRestart('restart', {
state: this.lifecycleState,
startLock: this.startLock,
});
return;
}
// Join the restart that is already running.
if (this.restartInFlight) {
await this.restartInFlight;
return;
}
const decision = this.restartGovernor.decide();
if (!decision.allow) {
// Governor suppressed this restart: report and bail out.
const observability = this.restartGovernor.getObservability();
logManager.warn(
`[gateway-restart-governor] restart suppressed reason=${decision.reason} retryAfterMs=${decision.retryAfterMs} suppressed=${observability.suppressed_total} executed=${observability.executed_total} circuitOpenUntil=${observability.circuit_open_until}`,
);
const props = {
reason: decision.reason,
retry_after_ms: decision.retryAfterMs,
gateway_restart_suppressed_total: observability.suppressed_total,
gateway_restart_executed_total: observability.executed_total,
gateway_restart_circuit_open_until: observability.circuit_open_until,
};
trackMetric('gateway.restart.suppressed', props);
captureTelemetryEvent('gateway_restart_suppressed', props);
return;
}
// Captured before stop() so telemetry can report the pid before and after.
const pidBefore = this.gatewayStatusDetails.pid ?? this.child?.pid;
this.restartInFlight = (async () => {
this.setGatewayState({
state: 'reconnecting',
port: this.port,
pid: this.child?.pid ?? this.gatewayStatusDetails.pid,
error: this.lastError,
reconnectAttempts: this.reconnectAttempts,
gatewayReady: false,
});
await this.stop();
try {
await this.start();
} catch (error) {
// stop() disabled auto-reconnect; turn it back on so recovery can proceed.
logManager.warn('Gateway restart: start() failed after stop(), enabling auto-reconnect recovery', error);
this.shouldReconnect = true;
this.scheduleReconnect();
throw error;
}
})();
try {
await this.restartInFlight;
this.restartGovernor.recordExecuted();
this.restartController.recordRestartCompleted();
const observability = this.restartGovernor.getObservability();
const props = {
gateway_restart_executed_total: observability.executed_total,
gateway_restart_suppressed_total: observability.suppressed_total,
gateway_restart_circuit_open_until: observability.circuit_open_until,
pid_before: pidBefore ?? 'n/a',
pid_after: this.gatewayStatusDetails.pid ?? this.child?.pid ?? 'n/a',
};
trackMetric('gateway.restart.executed', props);
captureTelemetryEvent('gateway_restart_executed', props);
// Broadcast the queued runtime change only after a successful restart.
const runtimeChange = this.takeQueuedRuntimeChange();
if (runtimeChange) {
this.notifyRuntimeChanged(runtimeChange);
}
} finally {
// Clear the in-flight marker before flushing, so a deferred restart can run.
this.restartInFlight = null;
this.restartController.flushDeferredRestart(
'restart:finally',
{
state: this.lifecycleState,
startLock: this.startLock,
shouldReconnect: this.shouldReconnect,
},
() => {
void this.restart().catch((error) => {
logManager.warn('Deferred Gateway restart failed:', error);
});
},
);
}
}
/**
 * Builds a status snapshot from the state controller, the manager's own
 * fields, the diagnostics copy and the process owner's status.
 *
 * `uptime` is computed from `connectedAt` only while the lifecycle state is
 * `running`; otherwise it is `undefined`.
 */
getStatus(): {
  status: GatewayStatus;
  initialized: boolean;
  mode: 'openclaw';
  port: number | null;
  pid: number | null;
  lastError?: string;
  lifecycleState: GatewayLifecycleState;
  gatewayReady?: boolean;
  connectedAt?: number;
  uptime?: number;
  reconnectAttempts?: number;
  diagnostics: GatewayDiagnosticsSnapshot;
  runtime: ReturnType<OpenClawProcessOwner['getStatus']>;
} {
  const controllerStatus = this.stateController.getStatus();
  const connectedAt = controllerStatus.connectedAt;

  let uptime: number | undefined;
  if (controllerStatus.state === 'running' && connectedAt) {
    uptime = Date.now() - connectedAt;
  }

  // Prefer the state controller's values and fall back to the manager's own.
  const port = controllerStatus.port ?? this.port;
  const pid = controllerStatus.pid ?? this.child?.pid ?? null;

  return {
    status: this.status,
    initialized: this.initialized,
    mode: this.mode,
    port,
    pid,
    lastError: this.lastError,
    lifecycleState: controllerStatus.state,
    gatewayReady: controllerStatus.gatewayReady,
    connectedAt,
    uptime,
    reconnectAttempts: controllerStatus.reconnectAttempts,
    diagnostics: this.getDiagnostics(),
    runtime: this.processOwner.getStatus(),
  };
}
/**
 * Returns the current status snapshot plus an `ok` flag.
 *
 * `ok` is true only when the manager is initialized, its status is
 * `connected`, and the socket is open.
 */
async checkHealth(): Promise<{
  ok: boolean;
  status: GatewayStatus;
  initialized: boolean;
  mode: 'openclaw';
  port: number | null;
  pid: number | null;
  lastError?: string;
  lifecycleState: GatewayLifecycleState;
  gatewayReady?: boolean;
  connectedAt?: number;
  uptime?: number;
  reconnectAttempts?: number;
  diagnostics: GatewayDiagnosticsSnapshot;
  runtime: ReturnType<OpenClawProcessOwner['getStatus']>;
}> {
  const snapshot = this.getStatus();
  const socketIsOpen = this.socket?.readyState === WebSocket.OPEN;
  const ok = snapshot.initialized && snapshot.status === 'connected' && socketIsOpen;
  return { ok, ...snapshot };
}
/**
 * Handles a Gateway RPC call through the local dispatcher.
 *
 * Initializes the manager first if needed. Methods the local dispatcher does
 * not handle are rejected as unknown.
 *
 * @param method RPC method name.
 * @param params RPC parameters, passed through to the dispatcher.
 * @returns The dispatcher's result for a handled method.
 * @throws Error when the method is not handled locally.
 */
async rpc(method: string, params: any): Promise<any> {
  if (!this.initialized) {
    await this.init();
  }

  const emitEvent = (event: GatewayEvent): void => this.broadcast(event);
  const outcome = dispatchGatewayRpcMethod(method, params, emitEvent);
  if (outcome.handled) {
    return outcome.result;
  }

  throw new Error(`Unknown gateway RPC method: ${method}`);
}
/**
 * Sends a Gateway event to the renderer of the window named `main` on the
 * `gateway:event` channel. Does nothing when there is no such window or the
 * first matching window has been destroyed.
 *
 * @param event Event payload to deliver.
 */
broadcast(event: GatewayEvent): void {
  for (const candidate of BrowserWindow.getAllWindows()) {
    if (windowManager.getName(candidate) !== 'main') {
      continue;
    }
    // Only the first window named `main` is considered.
    if (!candidate.isDestroyed()) {
      candidate.webContents.send('gateway:event', event);
    }
    return;
  }
}
/**
 * Broadcasts a `runtime:changed` event for the given topics.
 *
 * Falsy topics are dropped and duplicates removed; nothing is broadcast when
 * no topic remains. An empty `warnings` list is sent as `undefined`.
 *
 * @param options Topics plus optional reason, warnings, channel type and account id.
 */
notifyRuntimeChanged(options: RuntimeChangeBroadcast): void {
  const meaningfulTopics = options.topics.filter(Boolean);
  const topics = Array.from(new Set(meaningfulTopics));
  if (topics.length === 0) {
    return;
  }

  const warnings = options.warnings?.length ? options.warnings : undefined;
  const syncedAt = new Date().toISOString();

  this.broadcast({
    type: 'runtime:changed',
    topics,
    reason: options.reason,
    warnings,
    channelType: options.channelType,
    accountId: options.accountId,
    syncedAt,
  });
}
/**
 * Announces a provider/model change.
 *
 * When the manager is initialized and its status is `connected` or
 * `reconnecting`, a debounced reload is scheduled with the change attached.
 * Otherwise the change is broadcast to the renderer immediately.
 *
 * @param options Optional overrides; topics default to `providers` and
 *                `models`, reason defaults to `providers:reload`.
 */
reloadProviders(options?: RuntimeChangeBroadcast): void {
  const topics = options?.topics ?? ['providers', 'models'];
  const reason = options?.reason ?? 'providers:reload';
  const change: RuntimeChangeBroadcast = {
    topics,
    reason,
    warnings: options?.warnings,
    channelType: options?.channelType,
    accountId: options?.accountId,
  };

  const gatewayActive =
    this.initialized && (this.status === 'connected' || this.status === 'reconnecting');
  if (!gatewayActive) {
    this.notifyRuntimeChanged(change);
    return;
  }
  this.debouncedReload(undefined, change);
}
}
/** Module-level `GatewayManager` instance, created when this module is first imported. */
export const gatewayManager = new GatewayManager();