feat: implement telemetry system for application usage tracking
- Added telemetry utility to capture application events and metrics. - Integrated PostHog for event tracking with distinct user identification. - Implemented telemetry initialization, event capturing, and shutdown procedures. feat: add UV environment setup for Python management - Created utilities to manage Python installation and configuration. - Implemented network optimization checks for Python installation mirrors. - Added functions to set up managed Python environments with error handling. feat: enhance host API communication with token management - Introduced host API token retrieval and management for secure requests. - Updated host API fetch functions to include token in headers. - Added support for creating event sources with authentication. test: add comprehensive tests for gateway protocol and startup helpers - Implemented unit tests for gateway protocol helpers, event dispatching, and state management. - Added tests for startup recovery strategies and process policies. - Ensured coverage for connection monitoring and restart governance logic.
This commit is contained in:
340
electron/gateway/supervisor.ts
Normal file
340
electron/gateway/supervisor.ts
Normal file
@@ -0,0 +1,340 @@
|
||||
import { exec } from 'node:child_process';
|
||||
import { once } from 'node:events';
|
||||
import { existsSync } from 'node:fs';
|
||||
import { createServer } from 'node:net';
|
||||
import { join } from 'node:path';
|
||||
import { app, utilityProcess } from 'electron';
|
||||
import logManager from '@electron/service/logger';
|
||||
import { getOpenClawDir, getOpenClawEntryPath } from '@electron/utils/paths';
|
||||
import { prependPathEntry } from '@electron/utils/env-path';
|
||||
import { getUvMirrorEnv } from '@electron/utils/uv-env';
|
||||
import { isPythonReady, setupManagedPython } from '@electron/utils/uv-setup';
|
||||
import type { GatewayProcessHandle } from './process-handle';
|
||||
import { probeGatewayReady } from './ws-client';
|
||||
|
||||
export function warmupManagedPythonReadiness(): void {
|
||||
void isPythonReady()
|
||||
.then((pythonReady) => {
|
||||
if (!pythonReady) {
|
||||
logManager.info('Python environment missing or incomplete, attempting background repair...');
|
||||
void setupManagedPython().catch((error) => {
|
||||
logManager.error('Background Python repair failed:', error);
|
||||
});
|
||||
}
|
||||
})
|
||||
.catch((error) => {
|
||||
logManager.error('Failed to check Python environment:', error);
|
||||
});
|
||||
}
|
||||
|
||||
async function getListeningProcessIds(port: number): Promise<string[]> {
|
||||
const command = process.platform === 'win32'
|
||||
? `netstat -ano | findstr :${port}`
|
||||
: `lsof -i :${port} -sTCP:LISTEN -t`;
|
||||
|
||||
const stdout = await new Promise<string>((resolve) => {
|
||||
exec(command, { timeout: 5000, windowsHide: true }, (error, result) => {
|
||||
if (error) {
|
||||
resolve('');
|
||||
return;
|
||||
}
|
||||
resolve(result);
|
||||
});
|
||||
});
|
||||
|
||||
if (!stdout.trim()) {
|
||||
return [];
|
||||
}
|
||||
|
||||
if (process.platform === 'win32') {
|
||||
const pids: string[] = [];
|
||||
for (const line of stdout.trim().split(/\r?\n/)) {
|
||||
const parts = line.trim().split(/\s+/);
|
||||
if (parts.length >= 5 && parts[3] === 'LISTENING') {
|
||||
pids.push(parts[4]);
|
||||
}
|
||||
}
|
||||
return [...new Set(pids)];
|
||||
}
|
||||
|
||||
return [...new Set(stdout.trim().split(/\r?\n/).map((value) => value.trim()).filter(Boolean))];
|
||||
}
|
||||
|
||||
async function terminateOrphanedProcessIds(port: number, pids: string[]): Promise<void> {
|
||||
logManager.warn(`Found orphaned Gateway listener on port ${port}; terminating PIDs: ${pids.join(', ')}`);
|
||||
|
||||
for (const pid of pids) {
|
||||
try {
|
||||
if (process.platform === 'win32') {
|
||||
await new Promise<void>((resolve) => {
|
||||
exec(
|
||||
`taskkill /F /PID ${pid} /T`,
|
||||
{ timeout: 5000, windowsHide: true },
|
||||
() => resolve(),
|
||||
);
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
process.kill(Number.parseInt(pid, 10), 'SIGTERM');
|
||||
} catch {
|
||||
// ignore already-exited processes
|
||||
}
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, process.platform === 'win32' ? 1500 : 1000));
|
||||
}
|
||||
|
||||
export async function terminateOwnedGatewayProcess(child: GatewayProcessHandle): Promise<void> {
|
||||
const pid = child.pid;
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
let exited = false;
|
||||
let timeout: NodeJS.Timeout | null = null;
|
||||
|
||||
child.once('exit', () => {
|
||||
exited = true;
|
||||
if (timeout) {
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
resolve();
|
||||
});
|
||||
|
||||
logManager.info(`Sending kill to Gateway process (pid=${pid ?? 'unknown'})`);
|
||||
|
||||
if (process.platform === 'win32' && pid) {
|
||||
exec(`taskkill /F /PID ${pid} /T`, { timeout: 5000, windowsHide: true }, () => {
|
||||
if (!exited) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
child.kill();
|
||||
} catch {
|
||||
resolve();
|
||||
}
|
||||
|
||||
timeout = setTimeout(() => {
|
||||
if (!exited) {
|
||||
logManager.warn(`Gateway did not exit in time, force-killing (pid=${pid ?? 'unknown'})`);
|
||||
if (pid) {
|
||||
if (process.platform === 'win32') {
|
||||
exec(`taskkill /F /PID ${pid} /T`, { timeout: 5000, windowsHide: true }, () => {
|
||||
resolve();
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
process.kill(pid, 'SIGKILL');
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
resolve();
|
||||
}, 5000);
|
||||
});
|
||||
}
|
||||
|
||||
export async function unloadLaunchctlGatewayService(): Promise<void> {
|
||||
if (process.platform !== 'darwin') {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const uid = process.getuid?.();
|
||||
if (uid === undefined) {
|
||||
return;
|
||||
}
|
||||
|
||||
const launchdLabel = 'ai.openclaw.gateway';
|
||||
const serviceTarget = `gui/${uid}/${launchdLabel}`;
|
||||
|
||||
const loaded = await new Promise<boolean>((resolve) => {
|
||||
exec(`launchctl print ${serviceTarget}`, { timeout: 5000 }, (error) => {
|
||||
resolve(!error);
|
||||
});
|
||||
});
|
||||
|
||||
if (!loaded) {
|
||||
return;
|
||||
}
|
||||
|
||||
logManager.info(`Unloading launchctl service ${serviceTarget} to prevent auto-respawn`);
|
||||
await new Promise<void>((resolve) => {
|
||||
exec(`launchctl bootout ${serviceTarget}`, { timeout: 10000 }, (error) => {
|
||||
if (error) {
|
||||
logManager.warn(`Failed to bootout launchctl service: ${error.message}`);
|
||||
} else {
|
||||
logManager.info('Successfully unloaded launchctl gateway service');
|
||||
}
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
} catch (error) {
|
||||
logManager.warn('Error while unloading launchctl gateway service:', error);
|
||||
}
|
||||
}
|
||||
|
||||
async function canListenOnPort(port: number): Promise<boolean> {
|
||||
const server = createServer();
|
||||
try {
|
||||
server.listen(port, '127.0.0.1');
|
||||
await once(server, 'listening');
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
} finally {
|
||||
try {
|
||||
server.close();
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function waitForPortFree(
|
||||
port: number,
|
||||
timeoutMs = 30_000,
|
||||
intervalMs = 500,
|
||||
): Promise<void> {
|
||||
const startAt = Date.now();
|
||||
let logged = false;
|
||||
|
||||
while (Date.now() - startAt < timeoutMs) {
|
||||
if (await canListenOnPort(port)) {
|
||||
const elapsed = Date.now() - startAt;
|
||||
if (elapsed > intervalMs) {
|
||||
logManager.info(`Port ${port} became available after ${elapsed}ms`);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (!logged) {
|
||||
logManager.info(`Waiting for port ${port} to become available (Windows TCP TIME_WAIT)...`);
|
||||
logged = true;
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, intervalMs));
|
||||
}
|
||||
|
||||
logManager.error(`Port ${port} still occupied after ${timeoutMs}ms; aborting startup to avoid port conflict`);
|
||||
throw new Error(`Port ${port} still occupied after ${timeoutMs}ms`);
|
||||
}
|
||||
|
||||
export async function findExistingGatewayProcess(options: {
|
||||
port: number;
|
||||
ownedPid?: number | null;
|
||||
}): Promise<{ port: number; externalToken?: string } | null> {
|
||||
try {
|
||||
const pids = await getListeningProcessIds(options.port);
|
||||
if (pids.length > 0 && (!options.ownedPid || !pids.includes(String(options.ownedPid)))) {
|
||||
await terminateOrphanedProcessIds(options.port, pids);
|
||||
if (process.platform === 'win32') {
|
||||
await waitForPortFree(options.port, 10_000);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
const ready = await probeGatewayReady(options.port, 5_000);
|
||||
return ready ? { port: options.port } : null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export async function runOpenClawDoctorRepair(): Promise<boolean> {
|
||||
const openclawDir = getOpenClawDir();
|
||||
const entryScript = getOpenClawEntryPath();
|
||||
if (!existsSync(entryScript)) {
|
||||
logManager.error(`Cannot run OpenClaw doctor repair: entry script not found at ${entryScript}`);
|
||||
return false;
|
||||
}
|
||||
|
||||
const platform = process.platform;
|
||||
const arch = process.arch;
|
||||
const target = `${platform}-${arch}`;
|
||||
const binPath = app.isPackaged
|
||||
? join(process.resourcesPath, 'bin')
|
||||
: join(process.cwd(), 'resources', 'bin', target);
|
||||
const binPathExists = existsSync(binPath);
|
||||
const baseProcessEnv = process.env as Record<string, string | undefined>;
|
||||
const baseEnvPatched = binPathExists
|
||||
? prependPathEntry(baseProcessEnv, binPath).env
|
||||
: baseProcessEnv;
|
||||
const uvEnv = await getUvMirrorEnv();
|
||||
const doctorArgs = ['doctor', '--fix', '--yes', '--non-interactive'];
|
||||
|
||||
logManager.info(
|
||||
`Running OpenClaw doctor repair (entry="${entryScript}", args="${doctorArgs.join(' ')}", cwd="${openclawDir}", bundledBin=${binPathExists ? 'yes' : 'no'})`,
|
||||
);
|
||||
|
||||
return await new Promise<boolean>((resolve) => {
|
||||
const forkEnv: Record<string, string | undefined> = {
|
||||
...baseEnvPatched,
|
||||
...uvEnv,
|
||||
OPENCLAW_NO_RESPAWN: '1',
|
||||
};
|
||||
|
||||
const child = utilityProcess.fork(entryScript, doctorArgs, {
|
||||
cwd: openclawDir,
|
||||
stdio: 'pipe',
|
||||
env: forkEnv as NodeJS.ProcessEnv,
|
||||
});
|
||||
|
||||
let settled = false;
|
||||
const finish = (ok: boolean) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
resolve(ok);
|
||||
};
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
logManager.error('OpenClaw doctor repair timed out after 120000ms');
|
||||
try {
|
||||
child.kill();
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
finish(false);
|
||||
}, 120000);
|
||||
|
||||
child.on('error', (error) => {
|
||||
clearTimeout(timeout);
|
||||
logManager.error('Failed to spawn OpenClaw doctor repair process:', error);
|
||||
finish(false);
|
||||
});
|
||||
|
||||
child.stdout?.on('data', (data) => {
|
||||
const raw = data.toString();
|
||||
for (const line of raw.split(/\r?\n/)) {
|
||||
const normalized = line.trim();
|
||||
if (!normalized) continue;
|
||||
logManager.debug(`[Gateway doctor stdout] ${normalized}`);
|
||||
}
|
||||
});
|
||||
|
||||
child.stderr?.on('data', (data) => {
|
||||
const raw = data.toString();
|
||||
for (const line of raw.split(/\r?\n/)) {
|
||||
const normalized = line.trim();
|
||||
if (!normalized) continue;
|
||||
logManager.warn(`[Gateway doctor stderr] ${normalized}`);
|
||||
}
|
||||
});
|
||||
|
||||
child.on('exit', (code) => {
|
||||
clearTimeout(timeout);
|
||||
if (code === 0) {
|
||||
logManager.info('OpenClaw doctor repair completed successfully');
|
||||
finish(true);
|
||||
return;
|
||||
}
|
||||
logManager.warn(`OpenClaw doctor repair exited (code=${code})`);
|
||||
finish(false);
|
||||
});
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user