refactor clawx

This commit is contained in:
paisley
2026-03-07 15:51:44 +08:00
parent b41a8eedd9
commit 3c9cec9fb3
40 changed files with 2567 additions and 638 deletions

View File

@@ -17,7 +17,7 @@ import {
import { getAllSettings, getSetting } from '../utils/store';
import { getApiKey, getDefaultProvider, getProvider } from '../utils/secure-storage';
import { getProviderEnvVar, getKeyableProviderTypes } from '../utils/provider-registry';
import { GatewayEventType, JsonRpcNotification, isNotification, isResponse } from './protocol';
import { JsonRpcNotification, isNotification, isResponse } from './protocol';
import { logger } from '../utils/logger';
import { getUvMirrorEnv } from '../utils/uv-env';
import { isPythonReady, setupManagedPython } from '../utils/uv-setup';
@@ -40,6 +40,13 @@ import {
nextLifecycleEpoch,
shouldDeferRestart,
} from './process-policy';
import {
clearPendingGatewayRequests,
rejectPendingGatewayRequest,
resolvePendingGatewayRequest,
type PendingGatewayRequest,
} from './request-store';
import { dispatchJsonRpcNotification, dispatchProtocolEvent } from './event-dispatch';
/**
* Gateway connection status
@@ -213,11 +220,7 @@ export class GatewayManager extends EventEmitter {
private startLock = false;
private lastSpawnSummary: string | null = null;
private recentStartupStderrLines: string[] = [];
private pendingRequests: Map<string, {
resolve: (value: unknown) => void;
reject: (error: Error) => void;
timeout: NodeJS.Timeout;
}> = new Map();
private pendingRequests: Map<string, PendingGatewayRequest> = new Map();
private deviceIdentity: DeviceIdentity | null = null;
private restartDebounceTimer: NodeJS.Timeout | null = null;
private lifecycleEpoch = 0;
@@ -580,12 +583,7 @@ export class GatewayManager extends EventEmitter {
}
this.ownsProcess = false;
// Reject all pending requests
for (const [, request] of this.pendingRequests) {
clearTimeout(request.timeout);
request.reject(new Error('Gateway stopped'));
}
this.pendingRequests.clear();
clearPendingGatewayRequests(this.pendingRequests, new Error('Gateway stopped'));
this.deferredRestartPending = false;
this.setStatus({ state: 'stopped', error: undefined, pid: undefined, connectedAt: undefined, uptime: undefined });
@@ -677,8 +675,7 @@ export class GatewayManager extends EventEmitter {
// Set timeout for request
const timeout = setTimeout(() => {
this.pendingRequests.delete(id);
reject(new Error(`RPC timeout: ${method}`));
rejectPendingGatewayRequest(this.pendingRequests, id, new Error(`RPC timeout: ${method}`));
}, timeoutMs);
// Store pending request
@@ -699,9 +696,7 @@ export class GatewayManager extends EventEmitter {
try {
this.ws.send(JSON.stringify(request));
} catch (error) {
this.pendingRequests.delete(id);
clearTimeout(timeout);
reject(new Error(`Failed to send RPC request: ${error}`));
rejectPendingGatewayRequest(this.pendingRequests, id, new Error(`Failed to send RPC request: ${error}`));
}
});
}
@@ -1565,118 +1560,45 @@ export class GatewayManager extends EventEmitter {
// Handle OpenClaw protocol response format: { type: "res", id: "...", ok: true/false, ... }
if (msg.type === 'res' && typeof msg.id === 'string') {
if (this.pendingRequests.has(msg.id)) {
const request = this.pendingRequests.get(msg.id)!;
clearTimeout(request.timeout);
this.pendingRequests.delete(msg.id);
if (msg.ok === false || msg.error) {
const errorObj = msg.error as { message?: string; code?: number } | undefined;
const errorMsg = errorObj?.message || JSON.stringify(msg.error) || 'Unknown error';
request.reject(new Error(errorMsg));
} else {
request.resolve(msg.payload ?? msg);
if (msg.ok === false || msg.error) {
const errorObj = msg.error as { message?: string; code?: number } | undefined;
const errorMsg = errorObj?.message || JSON.stringify(msg.error) || 'Unknown error';
if (rejectPendingGatewayRequest(this.pendingRequests, msg.id, new Error(errorMsg))) {
return;
}
} else if (resolvePendingGatewayRequest(this.pendingRequests, msg.id, msg.payload ?? msg)) {
return;
}
}
// Handle OpenClaw protocol event format: { type: "event", event: "...", payload: {...} }
if (msg.type === 'event' && typeof msg.event === 'string') {
this.handleProtocolEvent(msg.event, msg.payload);
dispatchProtocolEvent(this, msg.event, msg.payload);
return;
}
// Fallback: Check if this is a JSON-RPC 2.0 response (legacy support)
if (isResponse(message) && message.id && this.pendingRequests.has(String(message.id))) {
const request = this.pendingRequests.get(String(message.id))!;
clearTimeout(request.timeout);
this.pendingRequests.delete(String(message.id));
if (message.error) {
const errorMsg = typeof message.error === 'object'
? (message.error as { message?: string }).message || JSON.stringify(message.error)
: String(message.error);
request.reject(new Error(errorMsg));
rejectPendingGatewayRequest(this.pendingRequests, String(message.id), new Error(errorMsg));
} else {
request.resolve(message.result);
resolvePendingGatewayRequest(this.pendingRequests, String(message.id), message.result);
}
return;
}
// Check if this is a JSON-RPC notification (server-initiated event)
if (isNotification(message)) {
this.handleNotification(message);
dispatchJsonRpcNotification(this, message);
return;
}
this.emit('message', message);
}
/**
* Handle OpenClaw protocol events
*/
private handleProtocolEvent(event: string, payload: unknown): void {
switch (event) {
case 'tick':
break;
case 'chat':
this.emit('chat:message', { message: payload });
break;
case 'agent': {
// Agent events may carry chat streaming data inside payload.data,
// or be lifecycle events (phase=started/completed) with no message.
const p = payload as Record<string, unknown>;
const data = (p.data && typeof p.data === 'object') ? p.data as Record<string, unknown> : {};
const chatEvent: Record<string, unknown> = {
...data,
runId: p.runId ?? data.runId,
sessionKey: p.sessionKey ?? data.sessionKey,
state: p.state ?? data.state,
message: p.message ?? data.message,
};
if (chatEvent.state || chatEvent.message) {
this.emit('chat:message', { message: chatEvent });
}
this.emit('notification', { method: event, params: payload });
break;
}
case 'channel.status':
this.emit('channel:status', payload as { channelId: string; status: string });
break;
default:
this.emit('notification', { method: event, params: payload });
}
}
/**
* Handle server-initiated notifications
*/
private handleNotification(notification: JsonRpcNotification): void {
this.emit('notification', notification);
// Route specific events
switch (notification.method) {
case GatewayEventType.CHANNEL_STATUS_CHANGED:
this.emit('channel:status', notification.params as { channelId: string; status: string });
break;
case GatewayEventType.MESSAGE_RECEIVED:
this.emit('chat:message', notification.params as { message: unknown });
break;
case GatewayEventType.ERROR: {
const errorData = notification.params as { message?: string };
this.emit('error', new Error(errorData.message || 'Gateway error'));
break;
}
default:
// Unknown notification type, just log it
logger.debug(`Unknown Gateway notification: ${notification.method}`);
}
}
/**
* Start ping interval to keep connection alive
*/