Files
zn-ai/electron/gateway/ws-client.ts
DEV_DSW 71bcc3b3c5 feat: implement telemetry system for application usage tracking
- Added telemetry utility to capture application events and metrics.
- Integrated PostHog for event tracking with distinct user identification.
- Implemented telemetry initialization, event capturing, and shutdown procedures.

feat: add UV environment setup for Python management

- Created utilities to manage Python installation and configuration.
- Implemented network optimization checks for Python installation mirrors.
- Added functions to set up managed Python environments with error handling.

feat: enhance host API communication with token management

- Introduced host API token retrieval and management for secure requests.
- Updated host API fetch functions to include token in headers.
- Added support for creating event sources with authentication.

test: add comprehensive tests for gateway protocol and startup helpers

- Implemented unit tests for gateway protocol helpers, event dispatching, and state management.
- Added tests for startup recovery strategies and process policies.
- Ensured coverage for connection monitoring and restart governance logic.
2026-04-23 17:21:57 +08:00

318 lines
8.5 KiB
TypeScript

import WebSocket from 'ws';
import type { DeviceIdentity } from '@electron/utils/device-identity';
import {
buildDeviceAuthPayload,
publicKeyRawBase64UrlFromPem,
signDevicePayload,
} from '@electron/utils/device-identity';
export const GATEWAY_CHALLENGE_TIMEOUT_MS = 10_000;
export const GATEWAY_CONNECT_HANDSHAKE_TIMEOUT_MS = 20_000;
type GatewayProtocolFrame =
| {
type?: string;
event?: string;
id?: string;
ok?: boolean;
payload?: unknown;
error?: unknown;
}
| null;
function parseGatewayFrame(data: WebSocket.RawData): GatewayProtocolFrame {
const text = data.toString();
if (!text) {
return null;
}
return JSON.parse(text) as GatewayProtocolFrame;
}
function buildGatewayConnectFrame(options: {
challengeNonce: string;
token: string;
deviceIdentity: DeviceIdentity | null;
platform: string;
}): { connectId: string; frame: Record<string, unknown> } {
const connectId = `connect-${Date.now()}`;
const clientId = 'gateway-client';
const clientMode = 'ui';
const role = 'operator';
const scopes = ['operator.admin'];
const signedAtMs = Date.now();
const device = (() => {
if (!options.deviceIdentity) {
return undefined;
}
const payload = buildDeviceAuthPayload({
deviceId: options.deviceIdentity.deviceId,
clientId,
clientMode,
role,
scopes,
signedAtMs,
token: options.token,
nonce: options.challengeNonce,
});
return {
id: options.deviceIdentity.deviceId,
publicKey: publicKeyRawBase64UrlFromPem(options.deviceIdentity.publicKeyPem),
signature: signDevicePayload(options.deviceIdentity.privateKeyPem, payload),
signedAt: signedAtMs,
nonce: options.challengeNonce,
};
})();
return {
connectId,
frame: {
type: 'req',
id: connectId,
method: 'connect',
params: {
minProtocol: 3,
maxProtocol: 3,
client: {
id: clientId,
displayName: 'zn-ai',
version: '1.0.0',
platform: options.platform,
mode: clientMode,
},
auth: {
token: options.token,
},
caps: [],
role,
scopes,
device,
},
},
};
}
export async function probeGatewayReady(
port: number,
timeoutMs = 1500,
): Promise<boolean> {
return await new Promise<boolean>((resolve) => {
const ws = new WebSocket(`ws://127.0.0.1:${port}/ws`);
let settled = false;
const resolveOnce = (value: boolean) => {
if (settled) return;
settled = true;
clearTimeout(timeout);
try {
ws.terminate();
} catch {
// ignore probe close errors
}
resolve(value);
};
const timeout = setTimeout(() => {
resolveOnce(false);
}, timeoutMs);
ws.on('message', (data) => {
try {
const message = parseGatewayFrame(data);
if (message?.type === 'event' && message.event === 'connect.challenge') {
resolveOnce(true);
}
} catch {
// ignore malformed probe payloads
}
});
ws.on('error', () => {
resolveOnce(false);
});
ws.on('close', () => {
resolveOnce(false);
});
});
}
export async function waitForGatewayReady(options: {
port: number;
getProcessExitCode: () => number | null;
timeoutMs?: number;
intervalMs?: number;
probeTimeoutMs?: number;
}): Promise<void> {
const timeoutMs = options.timeoutMs ?? (process.platform === 'win32' ? 180_000 : 90_000);
const intervalMs = options.intervalMs ?? 200;
const probeTimeoutMs = options.probeTimeoutMs ?? 1500;
const startedAt = Date.now();
while (Date.now() - startedAt < timeoutMs) {
const exitCode = options.getProcessExitCode();
if (exitCode !== null) {
throw new Error(`OpenClaw Gateway exited before becoming ready (code=${exitCode})`);
}
const ready = await probeGatewayReady(options.port, probeTimeoutMs);
if (ready) {
return;
}
await new Promise((resolve) => setTimeout(resolve, intervalMs));
}
throw new Error(`OpenClaw Gateway failed to become ready on port ${options.port} within ${timeoutMs}ms`);
}
export async function connectGatewaySocket(options: {
port: number;
token: string;
deviceIdentity: DeviceIdentity | null;
platform: string;
onMessage: (message: unknown) => void;
onCloseAfterHandshake: (socket: WebSocket, code: number) => void;
challengeTimeoutMs?: number;
connectTimeoutMs?: number;
}): Promise<WebSocket> {
const challengeTimeoutMs = options.challengeTimeoutMs ?? GATEWAY_CHALLENGE_TIMEOUT_MS;
const connectTimeoutMs = options.connectTimeoutMs ?? GATEWAY_CONNECT_HANDSHAKE_TIMEOUT_MS;
return await new Promise<WebSocket>((resolve, reject) => {
const ws = new WebSocket(`ws://127.0.0.1:${options.port}/ws`);
let handshakeComplete = false;
let settled = false;
let challengeTimer: NodeJS.Timeout | null = null;
let handshakeTimer: NodeJS.Timeout | null = null;
let connectId: string | null = null;
const cleanup = () => {
if (challengeTimer) {
clearTimeout(challengeTimer);
challengeTimer = null;
}
if (handshakeTimer) {
clearTimeout(handshakeTimer);
handshakeTimer = null;
}
};
const resolveOnce = () => {
if (settled) return;
settled = true;
cleanup();
resolve(ws);
};
const rejectOnce = (error: unknown) => {
if (settled) return;
settled = true;
cleanup();
try {
ws.terminate();
} catch {
// ignore terminate errors
}
reject(error instanceof Error ? error : new Error(String(error)));
};
challengeTimer = setTimeout(() => {
rejectOnce(new Error('Timed out waiting for connect.challenge from OpenClaw Gateway'));
}, challengeTimeoutMs);
ws.on('message', (data) => {
try {
const message = parseGatewayFrame(data);
if (!message) {
return;
}
if (!handshakeComplete && message.type === 'event' && message.event === 'connect.challenge') {
if (challengeTimer) {
clearTimeout(challengeTimer);
challengeTimer = null;
}
const nonce = (
typeof message.payload === 'object' &&
message.payload !== null &&
'nonce' in message.payload &&
typeof (message.payload as { nonce?: unknown }).nonce === 'string'
)
? (message.payload as { nonce: string }).nonce
: '';
if (!nonce) {
rejectOnce(new Error('OpenClaw Gateway connect.challenge missing nonce'));
return;
}
const payload = buildGatewayConnectFrame({
challengeNonce: nonce,
token: options.token,
deviceIdentity: options.deviceIdentity,
platform: options.platform,
});
connectId = payload.connectId;
ws.send(JSON.stringify(payload.frame));
handshakeTimer = setTimeout(() => {
rejectOnce(new Error('Timed out waiting for OpenClaw Gateway connect response'));
}, connectTimeoutMs);
return;
}
if (!handshakeComplete && message.type === 'res' && message.id === connectId) {
if (message.ok === false) {
const errorMessage =
typeof message.error === 'string'
? message.error
: (
typeof message.error === 'object' &&
message.error !== null &&
'message' in message.error &&
typeof (message.error as { message?: unknown }).message === 'string'
)
? (message.error as { message: string }).message
: 'OpenClaw Gateway connect handshake failed';
rejectOnce(new Error(errorMessage));
return;
}
handshakeComplete = true;
resolveOnce();
return;
}
if (handshakeComplete) {
options.onMessage(message);
}
} catch (error) {
if (!handshakeComplete) {
rejectOnce(error);
}
}
});
ws.on('close', (code) => {
if (!handshakeComplete) {
rejectOnce(new Error(`OpenClaw Gateway socket closed before handshake (code=${code})`));
return;
}
cleanup();
options.onCloseAfterHandshake(ws, code);
});
ws.on('error', (error) => {
if (!handshakeComplete) {
rejectOnce(error);
}
});
});
}