Files
zn-ai/electron/gateway/handlers/chat.ts
2026-04-14 17:02:20 +08:00

171 lines
4.8 KiB
TypeScript

import { randomUUID } from 'crypto';
import { createProvider } from '@electron/providers';
import type { BaseProvider } from '@electron/providers/BaseProvider';
import { providerApiService } from '@electron/service/provider-api-service';
import logManager from '@electron/service/logger';
import type { RawMessage } from '@src/pages/home/model/ChatModel';
import { sessionStore } from '../session-store';
import type { GatewayEvent, GatewayRpcParams, GatewayRpcReturns } from '../types';
export interface GatewayChatMessage {
role: 'system' | 'user' | 'assistant' | 'tool';
content: string;
}
function buildChatMessages(sessionMessages: RawMessage[]): GatewayChatMessage[] {
return sessionMessages
.map((msg): GatewayChatMessage | null => {
if (!msg.role || !msg.content) return null;
const role = msg.role;
if (role === 'user' || role === 'assistant' || role === 'system') {
return {
role,
content: typeof msg.content === 'string' ? msg.content : '',
};
}
// Skip toolresult and unsupported roles for now
return null;
})
.filter((m): m is GatewayChatMessage => m !== null);
}
async function processChatStream(
sessionKey: string,
runId: string,
provider: BaseProvider,
model: string,
messages: GatewayChatMessage[],
signal: AbortSignal,
broadcast: (event: GatewayEvent) => void
) {
let assistantContent = '';
try {
const chunks = await provider.chat(messages, model, { signal });
for await (const chunk of chunks) {
if (signal.aborted) break;
if (chunk.result) {
assistantContent += chunk.result;
broadcast({
type: 'chat:delta',
sessionKey,
runId,
delta: chunk.result,
});
}
if (chunk.isEnd) {
break;
}
}
if (!signal.aborted) {
const finalMessage: RawMessage = {
role: 'assistant',
content: assistantContent,
timestamp: Date.now(),
};
sessionStore.appendMessage(sessionKey, finalMessage);
sessionStore.clearActiveRun(sessionKey);
broadcast({
type: 'chat:final',
sessionKey,
runId,
message: finalMessage,
});
}
} catch (error) {
sessionStore.clearActiveRun(sessionKey);
broadcast({
type: 'chat:error',
sessionKey,
runId,
error: error instanceof Error ? error.message : String(error),
});
}
}
export function handleChatSend(
params: GatewayRpcParams['chat.send'],
broadcast: (event: GatewayEvent) => void
): GatewayRpcReturns['chat.send'] {
const { sessionKey, message, options } = params;
const runId = randomUUID();
// 1. Append user message
sessionStore.appendMessage(sessionKey, {
...message,
timestamp: message.timestamp || Date.now(),
});
// 2. Resolve provider account
const accountId = options?.providerAccountId || providerApiService.getDefault().accountId;
if (!accountId) {
throw new Error('No provider account selected');
}
const account = providerApiService.getAccounts().find((a) => a.id === accountId);
if (!account) {
throw new Error(`Provider account ${accountId} not found`);
}
const model = account.model;
if (!model) {
throw new Error(`Provider account ${accountId} has no model configured`);
}
// 3. Build messages array from session history
const session = sessionStore.getOrCreate(sessionKey);
const messages = buildChatMessages(session.messages);
// 4. Start streaming
const abortController = new AbortController();
sessionStore.setActiveRun(sessionKey, runId, abortController);
// Run async stream processing in background
const provider = createProvider(accountId);
processChatStream(sessionKey, runId, provider, model, messages, abortController.signal, broadcast).catch(
(err) => {
logManager.error('Unexpected error in processChatStream:', err);
sessionStore.clearActiveRun(sessionKey);
broadcast({
type: 'chat:error',
sessionKey,
runId,
error: err instanceof Error ? err.message : String(err),
});
}
);
return { runId };
}
export function handleChatHistory(
params: GatewayRpcParams['chat.history']
): GatewayRpcReturns['chat.history'] {
return sessionStore.getMessages(params.sessionKey, params.limit ?? 50);
}
export function handleChatAbort(
params: GatewayRpcParams['chat.abort'],
broadcast: (event: GatewayEvent) => void
): GatewayRpcReturns['chat.abort'] {
const activeRun = sessionStore.getActiveRun(params.sessionKey);
if (activeRun) {
activeRun.abortController.abort();
sessionStore.clearActiveRun(params.sessionKey);
broadcast({
type: 'chat:aborted',
sessionKey: params.sessionKey,
runId: activeRun.runId,
});
}
}
export function handleSessionList(): GatewayRpcReturns['session.list'] {
return sessionStore.getAllKeys();
}