/**
 * Gateway State Store
 * Uses Host API + SSE for lifecycle/status and a direct renderer WebSocket for runtime RPC.
 */
import { create } from 'zustand';
import { hostApiFetch } from '@/lib/host-api';
import { invokeIpc } from '@/lib/api-client';
import { subscribeHostEvent } from '@/lib/host-events';
import type { GatewayStatus } from '../types/gateway';

/** In-flight `init()` work, shared so that concurrent callers await the same promise. */
let gatewayInitPromise: Promise<void> | null = null;

/** Unsubscribe callbacks for the host-event subscriptions; non-null once they are registered. */
let gatewayEventUnsubscribers: Array<() => void> | null = null;

/** Agent phases that mark the end of a run. */
const TERMINAL_AGENT_PHASES: ReadonlySet<unknown> = new Set<unknown>([
  'completed',
  'done',
  'finished',
  'end',
]);

/** Result of a gateway health probe as stored in `GatewayState.health`. */
interface GatewayHealth {
  ok: boolean;
  error?: string;
  uptime?: number;
}

/** State and actions exposed by `useGatewayStore`. */
interface GatewayState {
  status: GatewayStatus;
  health: GatewayHealth | null;
  isInitialized: boolean;
  lastError: string | null;
  init: () => Promise<void>;
  start: () => Promise<void>;
  stop: () => Promise<void>;
  restart: () => Promise<void>;
  checkHealth: () => Promise<GatewayHealth>;
  rpc: <T>(method: string, params?: unknown, timeoutMs?: number) => Promise<T>;
  setStatus: (status: GatewayStatus) => void;
  clearError: () => void;
}

/**
 * Builds a rejection handler for fire-and-forget work (lazy store imports and
 * the event handling chained onto them). Failures stay non-fatal but are
 * logged instead of being dropped silently.
 */
function reportBackgroundError(context: string): (error: unknown) => void {
  return (error) => {
    console.warn(`[gateway] ${context}:`, error);
  };
}

/**
 * Translates a gateway `agent` notification into chat-store updates.
 *
 * Fields are read from `params` first and from `params.data` second (except
 * `phase`, which prefers `params.data`). Depending on the content this may:
 * - forward a normalized chat event when a `state` or `message` is present,
 * - emit a synthetic `started` event when the phase is `started`,
 * - reload history and clear the sending flags when the phase is terminal.
 *
 * Notifications with any other method, or without object params, are ignored.
 */
function handleGatewayNotification(
  notification: { method?: string; params?: Record<string, unknown> } | undefined,
): void {
  if (
    !notification ||
    notification.method !== 'agent' ||
    !notification.params ||
    typeof notification.params !== 'object'
  ) {
    return;
  }

  const p = notification.params;
  const data: Record<string, unknown> =
    p.data && typeof p.data === 'object' ? (p.data as Record<string, unknown>) : {};

  const phase = data.phase ?? p.phase;
  const runId = p.runId ?? data.runId;
  const sessionKey = p.sessionKey ?? data.sessionKey;
  const state = p.state ?? data.state;
  const message = p.message ?? data.message;

  if (state || message) {
    const normalizedEvent: Record<string, unknown> = {
      ...data,
      runId,
      sessionKey,
      stream: p.stream ?? data.stream,
      seq: p.seq ?? data.seq,
      state,
      message,
    };
    // The chat store is imported lazily; each update below uses its own
    // import so that a failure in one does not prevent the others.
    import('./chat')
      .then(({ useChatStore }) => {
        useChatStore.getState().handleChatEvent(normalizedEvent);
      })
      .catch(reportBackgroundError('failed to forward agent chat event'));
  }

  if (phase === 'started' && runId != null && sessionKey != null) {
    import('./chat')
      .then(({ useChatStore }) => {
        useChatStore.getState().handleChatEvent({
          state: 'started',
          runId,
          sessionKey,
        });
      })
      .catch(reportBackgroundError('failed to forward agent start event'));
  }

  if (TERMINAL_AGENT_PHASES.has(phase)) {
    import('./chat')
      .then(({ useChatStore }) => {
        const chatState = useChatStore.getState();
        chatState.loadHistory(true);
        if (chatState.sending) {
          useChatStore.setState({
            sending: false,
            activeRunId: null,
            pendingFinal: false,
            lastUserMessageAt: null,
          });
        }
      })
      .catch(reportBackgroundError('failed to finalize agent run'));
  }
}

/**
 * Forwards a `gateway:chat-message` payload to the chat store.
 *
 * When the payload has an object-valued `message`, that nested object is the
 * event; otherwise the payload itself is. An event that carries a `state` is
 * forwarded unchanged; one without is wrapped as a `final` event.
 *
 * Non-object payloads are ignored. A `null` `message` is not treated as a
 * nested event (`typeof null === 'object'`), so the payload itself is used.
 */
function handleGatewayChatMessage(data: unknown): void {
  if (typeof data !== 'object' || data === null) {
    // The `in` operator below would throw on a non-object payload.
    return;
  }
  const chatData = data as Record<string, unknown>;
  const nested = chatData.message;
  const payload: Record<string, unknown> =
    typeof nested === 'object' && nested !== null
      ? (nested as Record<string, unknown>)
      : chatData;

  import('./chat')
    .then(({ useChatStore }) => {
      const { handleChatEvent } = useChatStore.getState();
      if (payload.state) {
        handleChatEvent(payload);
        return;
      }
      handleChatEvent({
        state: 'final',
        message: payload,
        runId: chatData.runId ?? payload.runId,
      });
    })
    .catch(reportBackgroundError('failed to forward chat message'));
}

/**
 * Maps a raw channel status string onto the four states used by the channels
 * store. Unrecognized values map to `'disconnected'`.
 */
function mapChannelStatus(status: string): 'connected' | 'connecting' | 'disconnected' | 'error' {
  switch (status) {
    case 'connected':
    case 'running':
      return 'connected';
    case 'connecting':
    case 'starting':
      return 'connecting';
    case 'error':
    case 'failed':
      return 'error';
    default:
      return 'disconnected';
  }
}

/**
 * Zustand store holding gateway lifecycle status, health and the last error,
 * plus actions to control the gateway through the Host API and to issue RPCs.
 */
export const useGatewayStore = create<GatewayState>((set, get) => {
  /**
   * Shared body of `start` and `restart`: marks the gateway as starting,
   * POSTs to `endpoint`, and records an error state if the call reports
   * failure or throws. Never rejects.
   */
  const requestLaunch = async (endpoint: string, fallbackError: string): Promise<void> => {
    try {
      set({ status: { ...get().status, state: 'starting' }, lastError: null });
      const result = await hostApiFetch<{ success: boolean; error?: string }>(endpoint, {
        method: 'POST',
      });
      if (!result.success) {
        set({
          status: { ...get().status, state: 'error', error: result.error },
          lastError: result.error || fallbackError,
        });
      }
    } catch (error) {
      set({
        status: { ...get().status, state: 'error', error: String(error) },
        lastError: String(error),
      });
    }
  };

  /** Registers the host-event subscriptions and returns their unsubscribe callbacks. */
  const subscribeToGatewayEvents = (): Array<() => void> => [
    subscribeHostEvent<GatewayStatus>('gateway:status', (payload) => {
      set({ status: payload });
    }),
    subscribeHostEvent<{ message?: string }>('gateway:error', (payload) => {
      set({ lastError: payload.message || 'Gateway error' });
    }),
    subscribeHostEvent<{ method?: string; params?: Record<string, unknown> }>(
      'gateway:notification',
      (payload) => {
        handleGatewayNotification(payload);
      },
    ),
    subscribeHostEvent<unknown>('gateway:chat-message', (payload) => {
      handleGatewayChatMessage(payload);
    }),
    subscribeHostEvent<{ channelId?: string; status?: string }>(
      'gateway:channel-status',
      (update) => {
        const { channelId, status } = update;
        if (!channelId || !status) return;
        import('./channels')
          .then(({ useChannelsStore }) => {
            const channelsState = useChannelsStore.getState();
            // The event's `channelId` is matched against the channel's `type`.
            const channel = channelsState.channels.find((item) => item.type === channelId);
            if (channel) {
              channelsState.updateChannel(channel.id, { status: mapChannelStatus(status) });
            }
          })
          .catch(reportBackgroundError('failed to apply channel status update'));
      },
    ),
  ];

  return {
    status: {
      state: 'stopped',
      port: 18789,
    },
    health: null,
    isInitialized: false,
    lastError: null,

    /**
     * Fetches the current gateway status and registers the host-event
     * subscriptions (once). Concurrent calls share one in-flight attempt.
     * Failures are logged and stored in `lastError`; the promise never rejects
     * and `isInitialized` stays false so a later call can retry.
     */
    init: async () => {
      if (get().isInitialized) return;
      if (gatewayInitPromise) {
        await gatewayInitPromise;
        return;
      }

      gatewayInitPromise = (async () => {
        try {
          const status = await hostApiFetch<GatewayStatus>('/api/gateway/status');
          set({ status, isInitialized: true });

          if (!gatewayEventUnsubscribers) {
            gatewayEventUnsubscribers = subscribeToGatewayEvents();
          }
        } catch (error) {
          console.error('Failed to initialize Gateway:', error);
          set({ lastError: String(error) });
        } finally {
          gatewayInitPromise = null;
        }
      })();

      await gatewayInitPromise;
    },

    /** Asks the host to start the gateway; errors are reflected in `status` and `lastError`. */
    start: () => requestLaunch('/api/gateway/start', 'Failed to start Gateway'),

    /** Asks the host to stop the gateway; on failure only `lastError` is updated. */
    stop: async () => {
      try {
        await hostApiFetch('/api/gateway/stop', { method: 'POST' });
        set({ status: { ...get().status, state: 'stopped' }, lastError: null });
      } catch (error) {
        console.error('Failed to stop Gateway:', error);
        set({ lastError: String(error) });
      }
    },

    /** Asks the host to restart the gateway; errors are reflected in `status` and `lastError`. */
    restart: () => requestLaunch('/api/gateway/restart', 'Failed to restart Gateway'),

    /**
     * Fetches gateway health, stores it and returns it. If the request throws,
     * a `{ ok: false, error }` record is stored and returned instead.
     */
    checkHealth: async () => {
      try {
        const result = await hostApiFetch<GatewayHealth>('/api/gateway/health');
        set({ health: result });
        return result;
      } catch (error) {
        const health: GatewayHealth = { ok: false, error: String(error) };
        set({ health });
        return health;
      }
    },

    /**
     * Invokes a gateway RPC over the `gateway:rpc` IPC channel.
     *
     * @throws Error when the response reports `success: false`.
     */
    rpc: async <T>(method: string, params?: unknown, timeoutMs?: number): Promise<T> => {
      const response = await invokeIpc<{
        success: boolean;
        result?: T;
        error?: string;
      }>('gateway:rpc', method, params, timeoutMs);
      if (!response.success) {
        throw new Error(response.error || `Gateway RPC failed: ${method}`);
      }
      return response.result as T;
    },

    setStatus: (status) => set({ status }),
    clearError: () => set({ lastError: null }),
  };
});