import { randomUUID } from 'crypto';
import { createProvider } from '@electron/providers';
import type { BaseProvider } from '@electron/providers/BaseProvider';
import { providerApiService } from '@electron/service/provider-api-service';
import logManager from '@electron/service/logger';
import type { RawMessage } from '@src/pages/home/model/ChatModel';
import { sessionStore } from '../session-store';
import type { GatewayEvent, GatewayRpcParams, GatewayRpcReturns } from '../types';
import { appendTranscriptLine } from '@electron/utils/token-usage-writer';

/** Plain-text chat message in the shape handed to a provider's `chat()` call. */
export interface GatewayChatMessage {
  role: 'system' | 'user' | 'assistant' | 'tool';
  content: string;
}

/**
 * Flattens message content to plain text.
 *
 * Strings are returned unchanged. Arrays are scanned for `{ type: 'text', text: string }`
 * parts, whose text is concatenated in order.
 * NOTE(review): that block shape is an assumption about `RawMessage.content`; confirm
 * against the ChatModel definition. Anything else yields an empty string.
 */
function extractTextContent(content: unknown): string {
  if (typeof content === 'string') return content;
  if (!Array.isArray(content)) return '';

  const parts: string[] = [];
  for (const part of content as unknown[]) {
    if (typeof part !== 'object' || part === null) continue;
    const block = part as { type?: unknown; text?: unknown };
    if (block.type === 'text' && typeof block.text === 'string') {
      parts.push(block.text);
    }
  }
  return parts.join('');
}

/**
 * Clears the session's active run only when it is still the run identified by `runId`.
 *
 * A finished or failed stream must not wipe the registration of a newer run that was
 * started on the same session in the meantime.
 */
function clearRunIfCurrent(sessionKey: string, runId: string): void {
  if (sessionStore.getActiveRun(sessionKey)?.runId === runId) {
    sessionStore.clearActiveRun(sessionKey);
  }
}

/**
 * Converts stored session messages into the provider-facing message list.
 *
 * Only `user`, `assistant` and `system` roles are forwarded; every other role is skipped.
 * Messages that carry no text after flattening are dropped rather than being sent with
 * an empty `content`.
 *
 * @param sessionMessages Messages as stored for the session, oldest first.
 * @returns The messages to send to the provider, in the same order.
 */
function buildChatMessages(sessionMessages: RawMessage[]): GatewayChatMessage[] {
  const chatMessages: GatewayChatMessage[] = [];
  for (const msg of sessionMessages) {
    const role = msg.role;
    // Skip toolresult and unsupported roles for now
    if (role === 'user' || role === 'assistant' || role === 'system') {
      const content = extractTextContent(msg.content);
      if (content) {
        chatMessages.push({ role, content });
      }
    }
  }
  return chatMessages;
}

/**
 * Streams one chat completion and relays it through `broadcast`.
 *
 * Emits `chat:delta` for every chunk that carries text. On normal completion it appends
 * the assistant message to the session, writes a transcript line (including the last
 * reported usage) and emits `chat:final`. On failure it emits `chat:error`.
 *
 * When `signal` is aborted the function ends without emitting anything further and
 * without touching the session's active run.
 *
 * @param sessionKey Session the run belongs to.
 * @param runId Identifier of this run, echoed in every event.
 * @param provider Provider used to perform the chat call.
 * @param model Model name passed to the provider and recorded in the transcript.
 * @param providerName Provider label recorded in the transcript.
 * @param messages Conversation sent to the provider.
 * @param signal Abort signal for this run.
 * @param broadcast Sink for gateway events.
 */
async function processChatStream(
  sessionKey: string,
  runId: string,
  provider: BaseProvider,
  model: string,
  providerName: string,
  messages: GatewayChatMessage[],
  signal: AbortSignal,
  broadcast: (event: GatewayEvent) => void
): Promise<void> {
  let assistantContent = '';
  // Kept as `any`: the usage shape is owned by the provider and passed through untouched.
  let finalUsage: any = undefined;

  try {
    const chunks = await provider.chat(messages, model, { signal });
    for await (const chunk of chunks) {
      if (signal.aborted) break;

      if (chunk.result) {
        assistantContent += chunk.result;
        broadcast({
          type: 'chat:delta',
          sessionKey,
          runId,
          delta: chunk.result,
        });
      }

      if (chunk.usage !== undefined) {
        finalUsage = chunk.usage;
      }
      // Do not break on isEnd; the iterable may still yield a trailing usage chunk.
      // The loop will finish naturally when the generator is done.
    }

    // An aborted run is finalized by the abort handler; nothing more to persist or emit.
    if (signal.aborted) return;

    const finalMessage: RawMessage = {
      role: 'assistant',
      content: assistantContent,
      timestamp: Date.now(),
    };
    sessionStore.appendMessage(sessionKey, finalMessage);
    clearRunIfCurrent(sessionKey, runId);
    appendTranscriptLine(sessionKey, {
      type: 'message',
      timestamp: new Date().toISOString(),
      message: {
        role: 'assistant',
        content: assistantContent,
        model,
        provider: providerName,
        usage: finalUsage,
      },
    });
    broadcast({
      type: 'chat:final',
      sessionKey,
      runId,
      message: finalMessage,
    });
  } catch (error) {
    // A rejection that follows an abort is just the cancellation surfacing. Reporting it
    // would emit `chat:error` after `chat:aborted`, and clearing here could remove a
    // newer run registered on the same session.
    if (signal.aborted) return;

    clearRunIfCurrent(sessionKey, runId);
    broadcast({
      type: 'chat:error',
      sessionKey,
      runId,
      error: error instanceof Error ? error.message : String(error),
    });
  }
}

/**
 * Handles `chat.send`: stores the user message, resolves the provider account and
 * starts streaming the reply in the background.
 *
 * The user message is appended to the session and the transcript before the provider
 * account is validated, so it is persisted even when this function throws.
 * NOTE(review): an already-active run on the session is replaced without being aborted;
 * confirm whether callers guarantee one run per session.
 *
 * @param params RPC parameters (`sessionKey`, `message`, optional `options`).
 * @param broadcast Sink for the streaming events of the new run.
 * @returns The identifier of the started run.
 * @throws Error when no account is selected, the account does not exist, or it has no
 *   model configured.
 */
export function handleChatSend(
  params: GatewayRpcParams['chat.send'],
  broadcast: (event: GatewayEvent) => void
): GatewayRpcReturns['chat.send'] {
  const { sessionKey, message, options } = params;
  const runId = randomUUID();

  // 1. Append user message
  const userMessage: RawMessage = {
    ...message,
    timestamp: message.timestamp || Date.now(),
  };
  sessionStore.appendMessage(sessionKey, userMessage);
  appendTranscriptLine(sessionKey, {
    type: 'message',
    timestamp: new Date().toISOString(),
    message: {
      role: 'user',
      content: extractTextContent(userMessage.content),
    },
  });

  // 2. Resolve provider account
  const accountId = options?.providerAccountId || providerApiService.getDefault().accountId;
  if (!accountId) {
    throw new Error('No provider account selected');
  }
  const account = providerApiService.getAccounts().find((a) => a.id === accountId);
  if (!account) {
    throw new Error(`Provider account ${accountId} not found`);
  }
  const model = account.model;
  if (!model) {
    throw new Error(`Provider account ${accountId} has no model configured`);
  }

  // 3. Build messages array from session history
  const session = sessionStore.getOrCreate(sessionKey);
  const messages = buildChatMessages(session.messages);

  // 4. Start streaming
  // Create the provider before registering the run: if construction throws, no active
  // run is left behind for a stream that never started.
  const provider = createProvider(accountId);
  const providerName = account.vendorId || account.label || account.model || 'unknown';

  const abortController = new AbortController();
  sessionStore.setActiveRun(sessionKey, runId, abortController);

  // Run async stream processing in background
  processChatStream(
    sessionKey,
    runId,
    provider,
    model,
    providerName,
    messages,
    abortController.signal,
    broadcast
  ).catch((err) => {
    // Safety net: processChatStream handles its own errors, so this only fires when its
    // error handling itself fails.
    logManager.error('Unexpected error in processChatStream:', err);
    clearRunIfCurrent(sessionKey, runId);
    broadcast({
      type: 'chat:error',
      sessionKey,
      runId,
      error: err instanceof Error ? err.message : String(err),
    });
  });

  return { runId };
}

/**
 * Handles `chat.history`: returns stored messages for a session.
 *
 * @param params RPC parameters; `limit` defaults to 50 when omitted.
 * @returns Whatever `sessionStore.getMessages` returns for the session and limit.
 */
export function handleChatHistory(
  params: GatewayRpcParams['chat.history']
): GatewayRpcReturns['chat.history'] {
  return sessionStore.getMessages(params.sessionKey, params.limit ?? 50);
}

/**
 * Handles `chat.abort`: cancels the session's active run, if there is one.
 *
 * Aborts the run's controller, clears the active run and emits `chat:aborted` with the
 * run's id. Does nothing when the session has no active run.
 *
 * @param params RPC parameters carrying the `sessionKey`.
 * @param broadcast Sink for the `chat:aborted` event.
 */
export function handleChatAbort(
  params: GatewayRpcParams['chat.abort'],
  broadcast: (event: GatewayEvent) => void
): GatewayRpcReturns['chat.abort'] {
  const activeRun = sessionStore.getActiveRun(params.sessionKey);
  if (activeRun) {
    activeRun.abortController.abort();
    sessionStore.clearActiveRun(params.sessionKey);
    broadcast({
      type: 'chat:aborted',
      sessionKey: params.sessionKey,
      runId: activeRun.runId,
    });
  }
}

/** Handles `session.list`: returns every session key known to the session store. */
export function handleSessionList(): GatewayRpcReturns['session.list'] {
  return sessionStore.getAllKeys();
}