refractor

This commit is contained in:
paisley
2026-03-07 17:40:40 +08:00
parent c4f3749516
commit 7b3dadd8b0
3 changed files with 218 additions and 106 deletions

View File

@@ -0,0 +1,59 @@
import { logger } from '../utils/logger';
type HealthResult = { ok: boolean; error?: string };
export class GatewayConnectionMonitor {
private pingInterval: NodeJS.Timeout | null = null;
private healthCheckInterval: NodeJS.Timeout | null = null;
startPing(sendPing: () => void, intervalMs = 30000): void {
if (this.pingInterval) {
clearInterval(this.pingInterval);
}
this.pingInterval = setInterval(() => {
sendPing();
}, intervalMs);
}
startHealthCheck(options: {
shouldCheck: () => boolean;
checkHealth: () => Promise<HealthResult>;
onUnhealthy: (errorMessage: string) => void;
onError: (error: unknown) => void;
intervalMs?: number;
}): void {
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
}
this.healthCheckInterval = setInterval(async () => {
if (!options.shouldCheck()) {
return;
}
try {
const health = await options.checkHealth();
if (!health.ok) {
const errorMessage = health.error ?? 'Health check failed';
logger.warn(`Gateway health check failed: ${errorMessage}`);
options.onUnhealthy(errorMessage);
}
} catch (error) {
logger.error('Gateway health check error:', error);
options.onError(error);
}
}, options.intervalMs ?? 30000);
}
clear(): void {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
this.healthCheckInterval = null;
}
}
}

View File

@@ -23,12 +23,10 @@ import {
DEFAULT_RECONNECT_CONFIG, DEFAULT_RECONNECT_CONFIG,
type ReconnectConfig, type ReconnectConfig,
type GatewayLifecycleState, type GatewayLifecycleState,
getDeferredRestartAction,
getReconnectScheduleDecision, getReconnectScheduleDecision,
getReconnectSkipReason, getReconnectSkipReason,
isLifecycleSuperseded, isLifecycleSuperseded,
nextLifecycleEpoch, nextLifecycleEpoch,
shouldDeferRestart,
} from './process-policy'; } from './process-policy';
import { import {
clearPendingGatewayRequests, clearPendingGatewayRequests,
@@ -49,6 +47,8 @@ import {
waitForPortFree, waitForPortFree,
warmupManagedPythonReadiness, warmupManagedPythonReadiness,
} from './supervisor'; } from './supervisor';
import { GatewayConnectionMonitor } from './connection-monitor';
import { GatewayRestartController } from './restart-controller';
import { classifyGatewayStderrMessage, recordGatewayStartupStderrLine } from './startup-stderr'; import { classifyGatewayStderrMessage, recordGatewayStartupStderrLine } from './startup-stderr';
/** /**
@@ -201,8 +201,6 @@ export class GatewayManager extends EventEmitter {
private status: GatewayStatus = { state: 'stopped', port: PORTS.OPENCLAW_GATEWAY }; private status: GatewayStatus = { state: 'stopped', port: PORTS.OPENCLAW_GATEWAY };
private readonly stateController: GatewayStateController; private readonly stateController: GatewayStateController;
private reconnectTimer: NodeJS.Timeout | null = null; private reconnectTimer: NodeJS.Timeout | null = null;
private pingInterval: NodeJS.Timeout | null = null;
private healthCheckInterval: NodeJS.Timeout | null = null;
private reconnectAttempts = 0; private reconnectAttempts = 0;
private reconnectConfig: ReconnectConfig; private reconnectConfig: ReconnectConfig;
private shouldReconnect = true; private shouldReconnect = true;
@@ -211,10 +209,10 @@ export class GatewayManager extends EventEmitter {
private recentStartupStderrLines: string[] = []; private recentStartupStderrLines: string[] = [];
private pendingRequests: Map<string, PendingGatewayRequest> = new Map(); private pendingRequests: Map<string, PendingGatewayRequest> = new Map();
private deviceIdentity: DeviceIdentity | null = null; private deviceIdentity: DeviceIdentity | null = null;
private restartDebounceTimer: NodeJS.Timeout | null = null;
private lifecycleEpoch = 0; private lifecycleEpoch = 0;
private deferredRestartPending = false;
private restartInFlight: Promise<void> | null = null; private restartInFlight: Promise<void> | null = null;
private readonly connectionMonitor = new GatewayConnectionMonitor();
private readonly restartController = new GatewayRestartController();
constructor(config?: Partial<ReconnectConfig>) { constructor(config?: Partial<ReconnectConfig>) {
super(); super();
@@ -224,7 +222,19 @@ export class GatewayManager extends EventEmitter {
this.emit('status', status); this.emit('status', status);
}, },
onTransition: (previousState, nextState) => { onTransition: (previousState, nextState) => {
this.flushDeferredRestart(`status:${previousState}->${nextState}`); this.restartController.flushDeferredRestart(
`status:${previousState}->${nextState}`,
{
state: this.status.state,
startLock: this.startLock,
shouldReconnect: this.shouldReconnect,
},
() => {
void this.restart().catch((error) => {
logger.warn('Deferred Gateway restart failed:', error);
});
},
);
}, },
}); });
this.reconnectConfig = { ...DEFAULT_RECONNECT_CONFIG, ...config }; this.reconnectConfig = { ...DEFAULT_RECONNECT_CONFIG, ...config };
@@ -266,56 +276,6 @@ export class GatewayManager extends EventEmitter {
} }
} }
private isRestartDeferred(): boolean {
return shouldDeferRestart({
state: this.status.state,
startLock: this.startLock,
});
}
private markDeferredRestart(reason: string): void {
if (!this.deferredRestartPending) {
logger.info(
`Deferring Gateway restart (${reason}) until startup/reconnect settles (state=${this.status.state}, startLock=${this.startLock})`
);
} else {
logger.debug(
`Gateway restart already deferred; keeping pending request (${reason}, state=${this.status.state}, startLock=${this.startLock})`
);
}
this.deferredRestartPending = true;
}
private flushDeferredRestart(trigger: string): void {
const action = getDeferredRestartAction({
hasPendingRestart: this.deferredRestartPending,
state: this.status.state,
startLock: this.startLock,
shouldReconnect: this.shouldReconnect,
});
if (action === 'none') return;
if (action === 'wait') {
logger.debug(
`Deferred Gateway restart still waiting (${trigger}, state=${this.status.state}, startLock=${this.startLock})`
);
return;
}
this.deferredRestartPending = false;
if (action === 'drop') {
logger.info(
`Dropping deferred Gateway restart (${trigger}) because lifecycle already recovered (state=${this.status.state}, shouldReconnect=${this.shouldReconnect})`
);
return;
}
logger.info(`Executing deferred Gateway restart now (${trigger})`);
void this.restart().catch((error) => {
logger.warn('Deferred Gateway restart failed:', error);
});
}
/** /**
* Get current Gateway status * Get current Gateway status
*/ */
@@ -466,7 +426,19 @@ export class GatewayManager extends EventEmitter {
throw error; throw error;
} finally { } finally {
this.startLock = false; this.startLock = false;
this.flushDeferredRestart('start:finally'); this.restartController.flushDeferredRestart(
'start:finally',
{
state: this.status.state,
startLock: this.startLock,
shouldReconnect: this.shouldReconnect,
},
() => {
void this.restart().catch((error) => {
logger.warn('Deferred Gateway restart failed:', error);
});
},
);
} }
} }
@@ -511,7 +483,7 @@ export class GatewayManager extends EventEmitter {
clearPendingGatewayRequests(this.pendingRequests, new Error('Gateway stopped')); clearPendingGatewayRequests(this.pendingRequests, new Error('Gateway stopped'));
this.deferredRestartPending = false; this.restartController.resetDeferredRestart();
this.setStatus({ state: 'stopped', error: undefined, pid: undefined, connectedAt: undefined, uptime: undefined }); this.setStatus({ state: 'stopped', error: undefined, pid: undefined, connectedAt: undefined, uptime: undefined });
} }
@@ -519,8 +491,14 @@ export class GatewayManager extends EventEmitter {
* Restart Gateway process * Restart Gateway process
*/ */
async restart(): Promise<void> { async restart(): Promise<void> {
if (this.isRestartDeferred()) { if (this.restartController.isRestartDeferred({
this.markDeferredRestart('restart'); state: this.status.state,
startLock: this.startLock,
})) {
this.restartController.markDeferredRestart('restart', {
state: this.status.state,
startLock: this.startLock,
});
return; return;
} }
@@ -540,7 +518,19 @@ export class GatewayManager extends EventEmitter {
await this.restartInFlight; await this.restartInFlight;
} finally { } finally {
this.restartInFlight = null; this.restartInFlight = null;
this.flushDeferredRestart('restart:finally'); this.restartController.flushDeferredRestart(
'restart:finally',
{
state: this.status.state,
startLock: this.startLock,
shouldReconnect: this.shouldReconnect,
},
() => {
void this.restart().catch((error) => {
logger.warn('Deferred Gateway restart failed:', error);
});
},
);
} }
} }
@@ -552,16 +542,11 @@ export class GatewayManager extends EventEmitter {
* of each other during setup. * of each other during setup.
*/ */
debouncedRestart(delayMs = 2000): void { debouncedRestart(delayMs = 2000): void {
if (this.restartDebounceTimer) { this.restartController.debouncedRestart(delayMs, () => {
clearTimeout(this.restartDebounceTimer);
}
logger.debug(`Gateway restart debounced (will fire in ${delayMs}ms)`);
this.restartDebounceTimer = setTimeout(() => {
this.restartDebounceTimer = null;
void this.restart().catch((err) => { void this.restart().catch((err) => {
logger.warn('Debounced Gateway restart failed:', err); logger.warn('Debounced Gateway restart failed:', err);
}); });
}, delayMs); });
} }
/** /**
@@ -572,18 +557,8 @@ export class GatewayManager extends EventEmitter {
clearTimeout(this.reconnectTimer); clearTimeout(this.reconnectTimer);
this.reconnectTimer = null; this.reconnectTimer = null;
} }
if (this.pingInterval) { this.connectionMonitor.clear();
clearInterval(this.pingInterval); this.restartController.clearDebounceTimer();
this.pingInterval = null;
}
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
this.healthCheckInterval = null;
}
if (this.restartDebounceTimer) {
clearTimeout(this.restartDebounceTimer);
this.restartDebounceTimer = null;
}
} }
/** /**
@@ -631,25 +606,16 @@ export class GatewayManager extends EventEmitter {
* Start health check monitoring * Start health check monitoring
*/ */
private startHealthCheck(): void { private startHealthCheck(): void {
if (this.healthCheckInterval) { this.connectionMonitor.startHealthCheck({
clearInterval(this.healthCheckInterval); shouldCheck: () => this.status.state === 'running',
} checkHealth: () => this.checkHealth(),
onUnhealthy: (errorMessage) => {
this.healthCheckInterval = setInterval(async () => { this.emit('error', new Error(errorMessage));
if (this.status.state !== 'running') { },
return; onError: () => {
} // The monitor already logged the error; nothing else to do here.
},
try { });
const health = await this.checkHealth();
if (!health.ok) {
logger.warn(`Gateway health check failed: ${health.error ?? 'unknown'}`);
this.emit('error', new Error(health.error || 'Health check failed'));
}
} catch (error) {
logger.error('Gateway health check error:', error);
}
}, 30000); // Check every 30 seconds
} }
/** /**
@@ -1047,15 +1013,11 @@ export class GatewayManager extends EventEmitter {
* Start ping interval to keep connection alive * Start ping interval to keep connection alive
*/ */
private startPing(): void { private startPing(): void {
if (this.pingInterval) { this.connectionMonitor.startPing(() => {
clearInterval(this.pingInterval);
}
this.pingInterval = setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) { if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.ping(); this.ws.ping();
} }
}, 30000); });
} }
/** /**

View File

@@ -0,0 +1,91 @@
import { logger } from '../utils/logger';
import {
getDeferredRestartAction,
shouldDeferRestart,
type GatewayLifecycleState,
} from './process-policy';
type RestartDeferralState = {
state: GatewayLifecycleState;
startLock: boolean;
};
type DeferredRestartContext = RestartDeferralState & {
shouldReconnect: boolean;
};
export class GatewayRestartController {
private deferredRestartPending = false;
private restartDebounceTimer: NodeJS.Timeout | null = null;
isRestartDeferred(context: RestartDeferralState): boolean {
return shouldDeferRestart(context);
}
markDeferredRestart(reason: string, context: RestartDeferralState): void {
if (!this.deferredRestartPending) {
logger.info(
`Deferring Gateway restart (${reason}) until startup/reconnect settles (state=${context.state}, startLock=${context.startLock})`,
);
} else {
logger.debug(
`Gateway restart already deferred; keeping pending request (${reason}, state=${context.state}, startLock=${context.startLock})`,
);
}
this.deferredRestartPending = true;
}
flushDeferredRestart(
trigger: string,
context: DeferredRestartContext,
executeRestart: () => void,
): void {
const action = getDeferredRestartAction({
hasPendingRestart: this.deferredRestartPending,
state: context.state,
startLock: context.startLock,
shouldReconnect: context.shouldReconnect,
});
if (action === 'none') return;
if (action === 'wait') {
logger.debug(
`Deferred Gateway restart still waiting (${trigger}, state=${context.state}, startLock=${context.startLock})`,
);
return;
}
this.deferredRestartPending = false;
if (action === 'drop') {
logger.info(
`Dropping deferred Gateway restart (${trigger}) because lifecycle already recovered (state=${context.state}, shouldReconnect=${context.shouldReconnect})`,
);
return;
}
logger.info(`Executing deferred Gateway restart now (${trigger})`);
executeRestart();
}
debouncedRestart(delayMs: number, executeRestart: () => void): void {
if (this.restartDebounceTimer) {
clearTimeout(this.restartDebounceTimer);
}
logger.debug(`Gateway restart debounced (will fire in ${delayMs}ms)`);
this.restartDebounceTimer = setTimeout(() => {
this.restartDebounceTimer = null;
executeRestart();
}, delayMs);
}
clearDebounceTimer(): void {
if (this.restartDebounceTimer) {
clearTimeout(this.restartDebounceTimer);
this.restartDebounceTimer = null;
}
}
resetDeferredRestart(): void {
this.deferredRestartPending = false;
}
}