feat
This commit is contained in:
@@ -11,7 +11,6 @@ import { PORTS } from '../utils/config';
|
|||||||
import {
|
import {
|
||||||
appendNodeRequireToNodeOptions,
|
appendNodeRequireToNodeOptions,
|
||||||
} from '../utils/paths';
|
} from '../utils/paths';
|
||||||
import { getSetting } from '../utils/store';
|
|
||||||
import { JsonRpcNotification, isNotification, isResponse } from './protocol';
|
import { JsonRpcNotification, isNotification, isResponse } from './protocol';
|
||||||
import { logger } from '../utils/logger';
|
import { logger } from '../utils/logger';
|
||||||
import {
|
import {
|
||||||
@@ -37,7 +36,7 @@ import {
|
|||||||
import { dispatchJsonRpcNotification, dispatchProtocolEvent } from './event-dispatch';
|
import { dispatchJsonRpcNotification, dispatchProtocolEvent } from './event-dispatch';
|
||||||
import { GatewayStateController } from './state';
|
import { GatewayStateController } from './state';
|
||||||
import { prepareGatewayLaunchContext } from './config-sync';
|
import { prepareGatewayLaunchContext } from './config-sync';
|
||||||
import { buildGatewayConnectFrame, probeGatewayReady } from './ws-client';
|
import { connectGatewaySocket, probeGatewayReady } from './ws-client';
|
||||||
import {
|
import {
|
||||||
findExistingGatewayProcess,
|
findExistingGatewayProcess,
|
||||||
isTransientGatewayStartError,
|
isTransientGatewayStartError,
|
||||||
@@ -788,172 +787,30 @@ export class GatewayManager extends EventEmitter {
|
|||||||
* Connect WebSocket to Gateway
|
* Connect WebSocket to Gateway
|
||||||
*/
|
*/
|
||||||
private async connect(port: number, _externalToken?: string): Promise<void> {
  // connectGatewaySocket calls this only when it builds the connect frame,
  // so the token is read from the store at handshake time.
  const loadGatewayToken = async () => {
    const { getSetting } = await import('../utils/store');
    return getSetting('gatewayToken');
  };

  const socket = await connectGatewaySocket({
    port,
    deviceIdentity: this.deviceIdentity,
    platform: process.platform,
    pendingRequests: this.pendingRequests,
    getToken: loadGatewayToken,
    onHandshakeComplete: (readySocket) => {
      // Invoked before the promise above resolves, so publish the socket here
      // as well as after the await.
      this.ws = readySocket;
      this.setStatus({ state: 'running', port, connectedAt: Date.now() });
      this.startPing();
    },
    onMessage: (message) => {
      this.handleMessage(message);
    },
    onCloseAfterHandshake: () => {
      // Only a connection that was marked running triggers a reconnect.
      if (this.status.state !== 'running') {
        return;
      }
      this.setStatus({ state: 'stopped' });
      this.scheduleReconnect();
    },
  });

  this.ws = socket;
}
|
||||||
|
|
||||||
|
|||||||
@@ -12,6 +12,14 @@ const INVALID_CONFIG_PATTERNS: RegExp[] = [
|
|||||||
/\brun:\s*openclaw doctor --fix\b/i,
|
/\brun:\s*openclaw doctor --fix\b/i,
|
||||||
];
|
];
|
||||||
|
|
||||||
|
// Error-text patterns that isTransientGatewayStartError treats as transient,
// i.e. worth retrying (see getGatewayStartupRecoveryAction).
// "WebSocket closed before handshake", "Timed out waiting for connect.challenge"
// and "Connect handshake timeout" mirror the messages connectGatewaySocket
// rejects with; keep the wording in sync if those messages change.
// NOTE(review): the origin of "Gateway process exited before becoming ready"
// is not visible in this diff — confirm against the process launcher.
const TRANSIENT_START_ERROR_PATTERNS: RegExp[] = [
|
||||||
|
/WebSocket closed before handshake/i,
|
||||||
|
/ECONNREFUSED/i,
|
||||||
|
/Gateway process exited before becoming ready/i,
|
||||||
|
/Timed out waiting for connect\.challenge/i,
|
||||||
|
/Connect handshake timeout/i,
|
||||||
|
];
|
||||||
|
|
||||||
/** Returns the given log line with leading and trailing whitespace removed. */
function normalizeLogLine(value: string): string {
|
/** Returns the given log line with leading and trailing whitespace removed. */
function normalizeLogLine(value: string): string {
|
||||||
return value.trim();
|
return value.trim();
|
||||||
}
|
}
|
||||||
@@ -58,3 +66,34 @@ export function shouldAttemptConfigAutoRepair(
|
|||||||
return hasInvalidConfigFailureSignal(startupError, startupStderrLines);
|
return hasInvalidConfigFailureSignal(startupError, startupStderrLines);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Reports whether a Gateway startup failure matches one of
 * TRANSIENT_START_ERROR_PATTERNS.
 *
 * Besides the `name: message` text of an Error, this also inspects:
 *  - a string `code` property (Node socket errors carry e.g. `ECONNREFUSED`
 *    there, and an aggregated multi-address connect failure can have an empty
 *    message with only the code set),
 *  - the `cause` chain,
 *  - the nested `errors` of an aggregate error,
 *  - the `message` field of plain (non-Error) error objects, which would
 *    otherwise stringify to "[object Object]".
 *
 * Traversal is depth-limited and cycle-safe.
 *
 * @param error - The value thrown or rejected during startup; may be anything.
 * @returns `true` when any collected text fragment matches a transient pattern.
 */
export function isTransientGatewayStartError(error: unknown): boolean {
  const maxDepth = 5;
  const fragments: string[] = [];
  const visited = new Set<object>();

  const collect = (value: unknown, depth: number): void => {
    if (value === null || value === undefined || depth > maxDepth) {
      return;
    }
    if (typeof value !== 'object') {
      fragments.push(String(value));
      return;
    }
    if (visited.has(value)) {
      return;
    }
    visited.add(value);

    const fields = value as {
      message?: unknown;
      code?: unknown;
      cause?: unknown;
      errors?: unknown;
    };

    if (value instanceof Error) {
      fragments.push(`${value.name}: ${value.message}`);
    } else {
      if (typeof fields.message === 'string') {
        fragments.push(fields.message);
      }
      fragments.push(String(value));
    }

    if (typeof fields.code === 'string') {
      fragments.push(fields.code);
    }

    collect(fields.cause, depth + 1);

    if (Array.isArray(fields.errors)) {
      for (const nested of fields.errors) {
        collect(nested, depth + 1);
      }
    }
  };

  collect(error, 0);

  return TRANSIENT_START_ERROR_PATTERNS.some((pattern) =>
    fragments.some((text) => pattern.test(text)),
  );
}
|
||||||
|
|
||||||
|
/** Decision produced by getGatewayStartupRecoveryAction. */
export type GatewayStartupRecoveryAction = 'repair' | 'retry' | 'fail';

/**
 * Chooses how to react to a failed Gateway startup.
 *
 * Precedence:
 *  1. `'repair'` when shouldAttemptConfigAutoRepair accepts the error, the
 *     captured stderr lines and the "repair already attempted" flag.
 *  2. `'retry'` when `attempt < maxAttempts` and the error is classified as
 *     transient by isTransientGatewayStartError.
 *  3. `'fail'` otherwise.
 *
 * @param options.startupError - The value the startup attempt failed with.
 * @param options.startupStderrLines - Stderr lines captured during startup.
 * @param options.configRepairAttempted - Whether a config repair was already tried.
 * @param options.attempt - Number of the current attempt (NOTE(review): presumably
 *   1-based — confirm against the caller).
 * @param options.maxAttempts - Attempt count at which retrying stops.
 * @returns The recovery action to take.
 */
export function getGatewayStartupRecoveryAction(options: {
  startupError: unknown;
  startupStderrLines: string[];
  configRepairAttempted: boolean;
  attempt: number;
  maxAttempts: number;
}): GatewayStartupRecoveryAction {
  const {
    startupError,
    startupStderrLines,
    configRepairAttempted,
    attempt,
    maxAttempts,
  } = options;

  const canRepairConfig = shouldAttemptConfigAutoRepair(
    startupError,
    startupStderrLines,
    configRepairAttempted,
  );
  if (canRepairConfig) {
    return 'repair';
  }

  const attemptsRemain = attempt < maxAttempts;
  return attemptsRemain && isTransientGatewayStartError(startupError) ? 'retry' : 'fail';
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,10 +1,12 @@
|
|||||||
import WebSocket from 'ws';
|
import WebSocket from 'ws';
|
||||||
import type { DeviceIdentity } from '../utils/device-identity';
|
import type { DeviceIdentity } from '../utils/device-identity';
|
||||||
|
import type { PendingGatewayRequest } from './request-store';
|
||||||
import {
|
import {
|
||||||
buildDeviceAuthPayload,
|
buildDeviceAuthPayload,
|
||||||
publicKeyRawBase64UrlFromPem,
|
publicKeyRawBase64UrlFromPem,
|
||||||
signDevicePayload,
|
signDevicePayload,
|
||||||
} from '../utils/device-identity';
|
} from '../utils/device-identity';
|
||||||
|
import { logger } from '../utils/logger';
|
||||||
|
|
||||||
export async function probeGatewayReady(
|
export async function probeGatewayReady(
|
||||||
port: number,
|
port: number,
|
||||||
@@ -120,3 +122,160 @@ export function buildGatewayConnectFrame(options: {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Opens a WebSocket to the local Gateway and performs the connect handshake.
 *
 * Flow:
 *  1. Wait for the server's `connect.challenge` event.
 *  2. Build the connect frame from the challenge nonce and the token returned
 *     by `getToken`, send it, and register the frame's `connectId` in
 *     `pendingRequests`.
 *  3. The handshake completes when that pending entry's `resolve` is invoked.
 *     This function never resolves it itself; whoever consumes `onMessage`
 *     is expected to settle pending requests.
 *
 * Every non-challenge message, including those received before the handshake
 * completes, is forwarded to `onMessage`.
 *
 * On any failure before the handshake completes, the pending entry and timers
 * are cleaned up and the socket is closed, because the caller never receives
 * the socket and could not close it itself.
 *
 * @param options.port - Local port the Gateway listens on.
 * @param options.deviceIdentity - Identity passed to buildGatewayConnectFrame.
 * @param options.platform - Platform string passed to buildGatewayConnectFrame.
 * @param options.pendingRequests - Shared request map; the handshake request is
 *   added here and removed again during cleanup.
 * @param options.getToken - Called once the challenge arrives to obtain the token.
 * @param options.onHandshakeComplete - Called with the socket right before the
 *   returned promise resolves.
 * @param options.onMessage - Receives each parsed non-challenge message.
 * @param options.onCloseAfterHandshake - Called when the socket closes after a
 *   completed handshake.
 * @param options.handshakeTimeoutMs - Time limit applied separately to waiting
 *   for the challenge and to waiting for the handshake response. Defaults to 10000.
 * @returns The connected socket once the handshake has completed.
 * @throws Rejects with an Error when the challenge or handshake times out, the
 *   challenge has no nonce, the frame cannot be built or sent, the handshake
 *   request is rejected, or the socket errors/closes before completion.
 */
export async function connectGatewaySocket(options: {
  port: number;
  deviceIdentity: DeviceIdentity | null;
  platform: string;
  pendingRequests: Map<string, PendingGatewayRequest>;
  getToken: () => Promise<string>;
  onHandshakeComplete: (ws: WebSocket) => void;
  onMessage: (message: unknown) => void;
  onCloseAfterHandshake: () => void;
  handshakeTimeoutMs?: number;
}): Promise<WebSocket> {
  const wsUrl = `ws://localhost:${options.port}/ws`;
  const timeoutMs = options.handshakeTimeoutMs ?? 10000;
  logger.debug(`Connecting Gateway WebSocket (${wsUrl})`);

  return await new Promise<WebSocket>((resolve, reject) => {
    // Token is sent in the connect handshake frame, not in the URL.
    const ws = new WebSocket(wsUrl);
    let handshakeComplete = false;
    let connectId: string | null = null;
    let handshakeTimeout: NodeJS.Timeout | null = null;
    let challengeTimer: NodeJS.Timeout | null = null;
    let challengeReceived = false;
    let settled = false;

    const cleanupHandshakeRequest = () => {
      if (challengeTimer) {
        clearTimeout(challengeTimer);
        challengeTimer = null;
      }
      if (handshakeTimeout) {
        clearTimeout(handshakeTimeout);
        handshakeTimeout = null;
      }
      if (connectId && options.pendingRequests.has(connectId)) {
        const request = options.pendingRequests.get(connectId);
        if (request) {
          clearTimeout(request.timeout);
        }
        options.pendingRequests.delete(connectId);
      }
    };

    // Closes the socket unless it is already closing/closed.
    const closeSocketIfOpen = () => {
      if (ws.readyState === WebSocket.CLOSING || ws.readyState === WebSocket.CLOSED) {
        return;
      }
      ws.close();
    };

    const resolveOnce = () => {
      if (settled) return;
      settled = true;
      cleanupHandshakeRequest();
      resolve(ws);
    };

    const rejectOnce = (error: unknown) => {
      if (settled) return;
      settled = true;
      cleanupHandshakeRequest();
      // The caller never sees this socket on failure, so release it here.
      closeSocketIfOpen();
      reject(error instanceof Error ? error : new Error(String(error)));
    };

    // Sends the connect frame using the server-issued challenge nonce.
    const sendConnectHandshake = async (challengeNonce: string): Promise<void> => {
      logger.debug('Sending connect handshake with challenge nonce');

      const currentToken = await options.getToken();
      // The attempt may have been abandoned (close/error) while the token was
      // loading; do not send or register anything for a settled attempt.
      if (settled) return;

      const connectPayload = buildGatewayConnectFrame({
        challengeNonce,
        token: currentToken,
        deviceIdentity: options.deviceIdentity,
        platform: options.platform,
      });
      connectId = connectPayload.connectId;

      ws.send(JSON.stringify(connectPayload.frame));

      const requestTimeout = setTimeout(() => {
        if (!handshakeComplete) {
          logger.error('Gateway connect handshake timed out');
          ws.close();
          rejectOnce(new Error('Connect handshake timeout'));
        }
      }, timeoutMs);
      handshakeTimeout = requestTimeout;

      options.pendingRequests.set(connectPayload.connectId, {
        resolve: () => {
          handshakeComplete = true;
          logger.debug('Gateway connect handshake completed');
          options.onHandshakeComplete(ws);
          resolveOnce();
        },
        reject: (error) => {
          logger.error('Gateway connect handshake failed:', error);
          rejectOnce(error);
        },
        timeout: requestTimeout,
      });
    };

    // Without this timer, a server that never sends connect.challenge would
    // leave the returned promise pending forever.
    challengeTimer = setTimeout(() => {
      if (!challengeReceived && !settled) {
        logger.error('Gateway connect.challenge not received within timeout');
        ws.close();
        rejectOnce(new Error('Timed out waiting for connect.challenge from Gateway'));
      }
    }, timeoutMs);

    ws.on('open', () => {
      logger.debug('Gateway WebSocket opened, waiting for connect.challenge...');
    });

    ws.on('message', (data) => {
      let parsed: unknown;
      try {
        parsed = JSON.parse(data.toString());
      } catch (error) {
        logger.debug('Failed to parse Gateway WebSocket message:', error);
        return;
      }

      const envelope =
        typeof parsed === 'object' && parsed !== null
          ? (parsed as { type?: unknown; event?: unknown; payload?: { nonce?: unknown } | null })
          : null;

      // Intercept the connect.challenge event before the general handler.
      if (
        !challengeReceived &&
        envelope !== null &&
        envelope.type === 'event' &&
        envelope.event === 'connect.challenge'
      ) {
        challengeReceived = true;
        if (challengeTimer) {
          clearTimeout(challengeTimer);
          challengeTimer = null;
        }
        const nonce = envelope.payload?.nonce;
        if (typeof nonce !== 'string' || nonce.length === 0) {
          rejectOnce(new Error('Gateway connect.challenge missing nonce'));
          return;
        }
        logger.debug('Received connect.challenge, sending handshake');
        // A failure to load the token, build the frame or send it must reject
        // the connect attempt instead of becoming an unhandled rejection.
        void sendConnectHandshake(nonce).catch((error: unknown) => {
          logger.error('Gateway connect handshake could not be sent:', error);
          rejectOnce(error);
        });
        return;
      }

      // Handler failures stay contained (as before) but are no longer reported
      // as JSON parse failures.
      try {
        options.onMessage(parsed);
      } catch (error) {
        logger.error('Gateway WebSocket message handler failed:', error);
      }
    });

    ws.on('close', (code, reason) => {
      const reasonStr = reason?.toString() || 'unknown';
      logger.warn(`Gateway WebSocket closed (code=${code}, reason=${reasonStr}, handshake=${handshakeComplete ? 'ok' : 'pending'})`);
      if (!handshakeComplete) {
        // Closing before the handshake completes rejects the attempt so the
        // caller can decide whether to retry.
        rejectOnce(new Error(`WebSocket closed before handshake: ${reasonStr}`));
        return;
      }
      cleanupHandshakeRequest();
      options.onCloseAfterHandshake();
    });

    ws.on('error', (error) => {
      // Connection-refused / early-close errors are logged at debug level only.
      if (error.message?.includes('closed before handshake') || (error as NodeJS.ErrnoException).code === 'ECONNREFUSED') {
        logger.debug(`Gateway WebSocket connection error (transient): ${error.message}`);
      } else {
        logger.error('Gateway WebSocket error:', error);
      }
      if (!handshakeComplete) {
        rejectOnce(error);
      }
    });
  });
}
|
||||||
|
|||||||
Reference in New Issue
Block a user