import { useSyncExternalStore } from 'react';
import type { ChatSession, RawMessage, ToolStatus } from '@shared/chat-model';
import { extractText, isToolOnlyMessage } from '@shared/chat-model';
import { gatewayRpc, onGatewayEvent } from '../lib/gateway-client';
import { hostApiFetch } from '../lib/host-api';
import type { GatewayEvent } from '../types/runtime';

/** Session key the store starts on; history is never fetched for it. */
const DEFAULT_SESSION_KEY = 'agent:main:main';
/** Agent id used when a session key does not encode one (see `getAgentIdFromSessionKey`). */
const DEFAULT_AGENT_ID = '1953462165250859011';
/** Minimum gap between two completed `loadSessions` runs. */
const SESSION_LOAD_MIN_INTERVAL_MS = 1200;
/** Minimum gap between two quiet `loadHistory` runs for the same session. */
const HISTORY_LOAD_MIN_INTERVAL_MS = 800;
/** How long a seen gateway event key is remembered for de-duplication. */
const CHAT_EVENT_DEDUPE_TTL_MS = 30000;
/** Number of messages requested per `chat.history` call. */
const HISTORY_PAGE_SIZE = 50;
/** Session labels longer than this are cut and suffixed with `...`. */
const SESSION_LABEL_MAX_LENGTH = 50;
/** Window within which a history message counts as the optimistic user message. */
const OPTIMISTIC_MATCH_WINDOW_MS = 5000;
/** Display name given to sessions that have no label yet. */
const NEW_CHAT_LABEL = 'New Chat';
/** Error shown when no default provider account can be resolved. */
const NO_DEFAULT_MODEL_ERROR = '请先前往模型管理页面配置并设置一个默认模型';

/** A file that has been staged (or inlined as a data URL) and can be attached to a message. */
export interface StagedAttachment {
  fileName: string;
  mimeType: string;
  fileSize: number;
  stagedPath: string;
  preview: string | null;
}

/** Immutable snapshot exposed to subscribers of the chat store. */
export interface ChatStoreState {
  initialized: boolean;
  messages: RawMessage[];
  loading: boolean;
  error: string | null;
  sending: boolean;
  activeRunId: string | null;
  streamingMessage: RawMessage | null;
  streamingTools: ToolStatus[];
  pendingFinal: boolean;
  lastUserMessageAt: number | null;
  sessions: ChatSession[];
  currentSessionKey: string;
  currentAgentId: string;
  /** Session key -> label. */
  sessionLabels: Record<string, string>;
  /** Session key -> last activity timestamp in milliseconds. */
  sessionLastActivity: Record<string, number>;
  gatewayStatus: 'connected' | 'disconnected' | 'reconnecting';
}

/** Fields that describe an in-progress run; reset together whenever a run ends or is abandoned. */
type RunStatePatch = Pick<
  ChatStoreState,
  'streamingMessage' | 'streamingTools' | 'activeRunId' | 'pendingFinal' | 'lastUserMessageAt'
>;

const listeners = new Set<() => void>();
const historyLoadInFlight = new Map<string, Promise<void>>();
const lastHistoryLoadAtBySession = new Map<string, number>();
const chatEventDedupe = new Map<string, number>();

let gatewaySubscribed = false;
let loadSessionsInFlight: Promise<void> | null = null;
let lastLoadSessionsAt = 0;

let state: ChatStoreState = {
  initialized: false,
  messages: [],
  loading: false,
  error: null,
  sending: false,
  activeRunId: null,
  streamingMessage: null,
  streamingTools: [],
  pendingFinal: false,
  lastUserMessageAt: null,
  sessions: [],
  currentSessionKey: DEFAULT_SESSION_KEY,
  currentAgentId: DEFAULT_AGENT_ID,
  sessionLabels: {},
  sessionLastActivity: {},
  gatewayStatus: 'disconnected',
};

/** Notifies every subscribed listener that the snapshot changed. */
function emit(): void {
  for (const listener of listeners) {
    listener();
  }
}

/**
 * Replaces the snapshot with a shallow merge of `patch` and notifies listeners.
 *
 * @returns the new snapshot.
 */
function patchState(patch: Partial<ChatStoreState>): ChatStoreState {
  state = { ...state, ...patch };
  emit();
  return state;
}

/** Returns a fresh patch that clears every run-related field. */
function idleRunPatch(): RunStatePatch {
  return {
    streamingMessage: null,
    streamingTools: [],
    activeRunId: null,
    pendingFinal: false,
    lastUserMessageAt: null,
  };
}

/**
 * Extracts the agent id from an `agent:<id>:...` session key.
 * Keys with another prefix, or with an empty id segment, map to `DEFAULT_AGENT_ID`.
 */
function getAgentIdFromSessionKey(sessionKey: string): string {
  if (!sessionKey.startsWith('agent:')) return DEFAULT_AGENT_ID;
  const parts = sessionKey.split(':');
  return parts[1] || DEFAULT_AGENT_ID;
}

/** Returns a copy of `entries` without the entry for `sessionKey`. */
function clearSessionEntryFromMap<T extends Record<string, unknown>>(entries: T, sessionKey: string): T {
  return Object.fromEntries(Object.entries(entries).filter(([key]) => key !== sessionKey)) as T;
}

/**
 * Returns `sessions` unchanged when it already contains `sessionKey`,
 * otherwise a copy with a new entry appended.
 */
function ensureSessionEntry(sessions: ChatSession[], sessionKey: string, displayName?: string): ChatSession[] {
  if (sessions.some((session) => session.key === sessionKey)) return sessions;
  return [...sessions, { key: sessionKey, displayName: displayName || NEW_CHAT_LABEL }];
}

/** Normalises a timestamp to milliseconds; values below 1e12 are treated as seconds. */
function toMs(ts: number): number {
  return ts < 1e12 ? ts * 1000 : ts;
}

/** Shortens a label to `SESSION_LABEL_MAX_LENGTH` characters, appending `...` when it was cut. */
function truncateLabel(label: string): string {
  return label.length > SESSION_LABEL_MAX_LENGTH ? `${label.slice(0, SESSION_LABEL_MAX_LENGTH)}...` : label;
}

/** Derives a session label from a message's text, or `null` when the text is blank. */
function deriveSessionLabel(message: RawMessage): string | null {
  const labelText = extractText(message).trim();
  return labelText ? truncateLabel(labelText) : null;
}

/**
 * True when the current session is a non-`:main` session with no messages,
 * no recorded activity and no label, i.e. it can be discarded when the user leaves it.
 */
function isLeavingEmptySession(currentState: ChatStoreState): boolean {
  const currentKey = currentState.currentSessionKey;
  return (
    !currentKey.endsWith(':main') &&
    currentState.messages.length === 0 &&
    !currentState.sessionLastActivity[currentKey] &&
    !currentState.sessionLabels[currentKey]
  );
}

/**
 * Builds the patch that moves the store from its current session to `nextSessionKey`.
 *
 * An empty session being left is removed from the session list and both per-session maps.
 * The target session is added to the list when missing, and all message, run and error
 * state is cleared.
 */
function buildSessionSwitchPatch(currentState: ChatStoreState, nextSessionKey: string): Partial<ChatStoreState> {
  const leavingKey = currentState.currentSessionKey;
  const leavingEmpty = isLeavingEmptySession(currentState);
  const nextSessions = leavingEmpty
    ? currentState.sessions.filter((session) => session.key !== leavingKey)
    : currentState.sessions;

  // NOTE(review): `sending` is not reset here, so it stays true after switching away
  // from a session with a run in progress — confirm this is intended.
  return {
    currentSessionKey: nextSessionKey,
    currentAgentId: getAgentIdFromSessionKey(nextSessionKey),
    sessions: ensureSessionEntry(nextSessions, nextSessionKey),
    sessionLabels: leavingEmpty
      ? clearSessionEntryFromMap(currentState.sessionLabels, leavingKey)
      : currentState.sessionLabels,
    sessionLastActivity: leavingEmpty
      ? clearSessionEntryFromMap(currentState.sessionLastActivity, leavingKey)
      : currentState.sessionLastActivity,
    messages: [],
    error: null,
    ...idleRunPatch(),
  };
}

/** Drops de-duplication entries older than `CHAT_EVENT_DEDUPE_TTL_MS` relative to `now`. */
function pruneChatEventDedupe(now: number): void {
  for (const [key, timestamp] of chatEventDedupe.entries()) {
    if (now - timestamp > CHAT_EVENT_DEDUPE_TTL_MS) {
      chatEventDedupe.delete(key);
    }
  }
}

/**
 * Builds the key used to recognise a re-delivered gateway event.
 *
 * @returns `null` when the event must not be de-duplicated.
 */
function buildChatEventDedupeKey(event: GatewayEvent): string | null {
  // One run emits many deltas sharing the same run, session and type. Nothing read from
  // the event here tells a replay apart from the next chunk, so de-duplicating deltas
  // would discard every chunk after the first.
  // NOTE(review): if the gateway exposes a per-event sequence number, fold it into the
  // key and remove this exemption.
  if (event.type === 'chat:delta') return null;

  const runId = 'runId' in event && typeof event.runId === 'string' ? event.runId : '';
  const sessionKey = 'sessionKey' in event && typeof event.sessionKey === 'string' ? event.sessionKey : '';
  const type = event.type;
  // A run can deliver more than one final (the handler keeps `pendingFinal` set after a
  // tool-only or empty one), so distinct final messages must not collapse into one key.
  const messageId = event.type === 'chat:final' && typeof event.message.id === 'string' ? event.message.id : '';

  if (!runId && !sessionKey && !type) return null;
  return `${runId}|${sessionKey}|${type}|${messageId}`;
}

/** Records the event and reports whether an identical one was seen within the TTL. */
function isDuplicateChatEvent(event: GatewayEvent): boolean {
  const key = buildChatEventDedupeKey(event);
  if (!key) return false;

  const now = Date.now();
  pruneChatEventDedupe(now);
  if (chatEventDedupe.has(key)) return true;

  chatEventDedupe.set(key, now);
  return false;
}

/**
 * Resolves the default provider account id, asking the gateway first and the host API second.
 *
 * @returns the account id, or `null` when neither source yields one.
 */
async function resolveDefaultAccountId(): Promise<string | null> {
  try {
    const result = await gatewayRpc<{ accountId: string | null }>('provider.getDefault', {});
    if (result?.accountId) return result.accountId;
  } catch {
    // fall through to the host API
  }

  try {
    const result = await hostApiFetch<{ accountId: string | null }>('/api/provider-accounts/default');
    return result?.accountId ?? null;
  } catch {
    return null;
  }
}

/** Number of bytes encoded by a base64 string, accounting for trailing `=` padding. */
function base64ByteLength(base64: string): number {
  const padding = base64.endsWith('==') ? 2 : base64.endsWith('=') ? 1 : 0;
  return Math.max(0, Math.floor((base64.length * 3) / 4) - padding);
}

/**
 * Stages a base64 payload through the host API.
 *
 * When the request fails or returns no `stagedPath`, the payload is inlined as a data URL,
 * which also serves as the preview for `image/*` types.
 */
async function stageBuffer(base64: string, fileName: string, mimeType: string): Promise<StagedAttachment> {
  try {
    const result = await hostApiFetch<StagedAttachment>('/api/files/stage-buffer', {
      method: 'POST',
      body: JSON.stringify({ base64, fileName, mimeType }),
    });
    if (result?.stagedPath) {
      return result;
    }
  } catch {
    // fall through to local fallback
  }

  const dataUrl = `data:${mimeType};base64,${base64}`;
  return {
    fileName,
    mimeType,
    fileSize: base64ByteLength(base64),
    stagedPath: dataUrl,
    preview: mimeType.startsWith('image/') ? dataUrl : null,
  };
}

/**
 * Registers the gateway event listener once per module lifetime.
 *
 * Status events update `gatewayStatus`. Other events are dropped when they name a session
 * other than the current one, and are otherwise handed to `handleGatewayEvent`.
 */
async function subscribeToGateway(): Promise<void> {
  if (gatewaySubscribed) return;
  gatewaySubscribed = true;

  onGatewayEvent((event) => {
    if (event.type === 'gateway:status') {
      patchState({ gatewayStatus: event.status });
      return;
    }
    if (
      'sessionKey' in event &&
      typeof event.sessionKey === 'string' &&
      event.sessionKey !== state.currentSessionKey
    ) {
      return;
    }
    void handleGatewayEvent(event);
  });
}

/**
 * Fetches a session's history in the background and records its label (first user message)
 * and last activity (timestamp of the last message). Failures are ignored.
 */
async function loadSessionSummary(sessionKey: string): Promise<void> {
  try {
    const messages = await gatewayRpc<RawMessage[]>('chat.history', {
      sessionKey,
      limit: HISTORY_PAGE_SIZE,
    });
    const firstUser = messages.find((message) => message.role === 'user');
    const lastMessage = messages[messages.length - 1];
    const nextPatch: Partial<ChatStoreState> = {};

    const label = firstUser ? deriveSessionLabel(firstUser) : null;
    if (label) {
      nextPatch.sessionLabels = { ...state.sessionLabels, [sessionKey]: label };
    }
    if (lastMessage?.timestamp) {
      nextPatch.sessionLastActivity = {
        ...state.sessionLastActivity,
        [sessionKey]: toMs(lastMessage.timestamp),
      };
    }
    if (Object.keys(nextPatch).length > 0) {
      patchState(nextPatch);
    }
  } catch {
    // ignore background label loads
  }
}

/**
 * Refreshes the session list from the gateway.
 *
 * Concurrent callers share the in-flight request, and calls made within
 * `SESSION_LOAD_MIN_INTERVAL_MS` of the previous completion are skipped. Sessions already
 * in the store whose key does not start with `local:` are kept. If the current session is
 * missing from the merged list, the first listed session becomes current.
 * A rejection of `session.list` propagates to the caller.
 */
async function loadSessions(): Promise<void> {
  const now = Date.now();
  if (loadSessionsInFlight) {
    await loadSessionsInFlight;
    return;
  }
  if (now - lastLoadSessionsAt < SESSION_LOAD_MIN_INTERVAL_MS) {
    return;
  }

  loadSessionsInFlight = (async () => {
    try {
      const listedKeys = new Set(await gatewayRpc<string[]>('session.list', {}));
      const listedSessions: ChatSession[] = [...listedKeys].map((key) => ({
        key,
        displayName: state.sessionLabels[key] || NEW_CHAT_LABEL,
        updatedAt: state.sessionLastActivity[key] || Date.now(),
      }));
      // Keep non-local sessions already known, but never alongside a freshly listed entry
      // with the same key — otherwise duplicates would accumulate on every refresh.
      const existingNonLocal = state.sessions.filter(
        (session) => !session.key.startsWith('local:') && !listedKeys.has(session.key),
      );
      const sessions = [...existingNonLocal, ...listedSessions];

      let nextSessionKey = state.currentSessionKey || DEFAULT_SESSION_KEY;
      const firstSession = sessions[0];
      if (firstSession && !sessions.some((session) => session.key === nextSessionKey)) {
        nextSessionKey = firstSession.key;
      }
      const sessionsWithCurrent = sessions.some((session) => session.key === nextSessionKey)
        ? sessions
        : [...sessions, { key: nextSessionKey, displayName: nextSessionKey }];

      const discoveredActivity: Record<string, number> = {};
      for (const session of sessionsWithCurrent) {
        const { updatedAt } = session;
        if (typeof updatedAt === 'number' && Number.isFinite(updatedAt)) {
          discoveredActivity[session.key] = updatedAt;
        }
      }

      patchState({
        initialized: true,
        sessions: sessionsWithCurrent,
        currentSessionKey: nextSessionKey,
        currentAgentId: getAgentIdFromSessionKey(nextSessionKey),
        sessionLastActivity: {
          ...state.sessionLastActivity,
          ...discoveredActivity,
        },
      });

      if (nextSessionKey && nextSessionKey !== DEFAULT_SESSION_KEY) {
        await loadHistory(nextSessionKey, true);
      }

      // Fire-and-forget: each summary load swallows its own errors, so this never rejects.
      void Promise.all(
        sessionsWithCurrent
          .filter((session) => !state.sessionLabels[session.key])
          .map((session) => loadSessionSummary(session.key)),
      );
    } finally {
      lastLoadSessionsAt = Date.now();
    }
  })();

  try {
    await loadSessionsInFlight;
  } finally {
    loadSessionsInFlight = null;
  }
}

/**
 * Loads the message history of a session into the store.
 *
 * The default session is never fetched and is shown as empty. Concurrent loads of the same
 * session share one request. The result is discarded when the user has switched to another
 * session in the meantime. While a send is in progress, the optimistic user message is kept
 * if the fetched history does not contain it yet.
 *
 * @param sessionKey session to load; defaults to the current session.
 * @param quiet when true, does not set `loading`, keeps the existing `error` on failure,
 *   and is skipped if the session was loaded within `HISTORY_LOAD_MIN_INTERVAL_MS`.
 */
async function loadHistory(sessionKey = state.currentSessionKey, quiet = false): Promise<void> {
  if (!sessionKey || sessionKey === DEFAULT_SESSION_KEY) {
    patchState({
      messages: [],
      loading: false,
    });
    return;
  }

  const existingLoad = historyLoadInFlight.get(sessionKey);
  if (existingLoad) {
    await existingLoad;
    return;
  }

  const lastLoadAt = lastHistoryLoadAtBySession.get(sessionKey) || 0;
  if (quiet && Date.now() - lastLoadAt < HISTORY_LOAD_MIN_INTERVAL_MS) {
    return;
  }

  if (!quiet) {
    patchState({ loading: true });
  }

  const loadPromise = (async () => {
    try {
      const messages = await gatewayRpc<RawMessage[]>('chat.history', {
        sessionKey,
        limit: HISTORY_PAGE_SIZE,
      });
      if (state.currentSessionKey !== sessionKey) return;

      // Drop missing entries and entries with an explicitly empty role; everything below
      // dereferences `message.role`.
      let nextMessages = messages.filter(
        (message) => message != null && (!('role' in message) || Boolean(message.role)),
      );

      const lastUserAt = state.lastUserMessageAt;
      if (state.sending && lastUserAt) {
        const isRecentUserMessage = (message: RawMessage): boolean => {
          if (message.role !== 'user' || !message.timestamp) return false;
          return Math.abs(toMs(message.timestamp) - lastUserAt) < OPTIMISTIC_MATCH_WINDOW_MS;
        };
        if (!nextMessages.some(isRecentUserMessage)) {
          const optimistic = [...state.messages].reverse().find(isRecentUserMessage);
          if (optimistic) {
            nextMessages = [...nextMessages, optimistic];
          }
        }
      }

      const nextPatch: Partial<ChatStoreState> = {
        messages: nextMessages,
        loading: false,
      };

      const firstUser = nextMessages.find((message) => message.role === 'user');
      if (firstUser && !sessionKey.endsWith(':main')) {
        const label = deriveSessionLabel(firstUser);
        if (label) {
          nextPatch.sessionLabels = { ...state.sessionLabels, [sessionKey]: label };
        }
      }

      const lastMessage = nextMessages[nextMessages.length - 1];
      if (lastMessage?.timestamp) {
        nextPatch.sessionLastActivity = {
          ...state.sessionLastActivity,
          [sessionKey]: toMs(lastMessage.timestamp),
        };
      }

      patchState(nextPatch);
    } catch (error) {
      patchState({
        error: quiet ? state.error : String(error),
        loading: false,
      });
    }
  })();

  historyLoadInFlight.set(sessionKey, loadPromise);
  try {
    await loadPromise;
  } finally {
    lastHistoryLoadAtBySession.set(sessionKey, Date.now());
    if (historyLoadInFlight.get(sessionKey) === loadPromise) {
      historyLoadInFlight.delete(sessionKey);
    }
  }
}

/**
 * Creates a new `local:<accountId>:<uuid>` session and makes it current.
 * Sets an error and does nothing else when no default provider account is configured.
 */
async function newSession(): Promise<void> {
  const defaultAccountId = await resolveDefaultAccountId();
  if (!defaultAccountId) {
    patchState({ error: NO_DEFAULT_MODEL_ERROR });
    return;
  }

  const newKey = `local:${defaultAccountId}:${crypto.randomUUID()}`;
  patchState({
    ...buildSessionSwitchPatch(state, newKey),
    // NOTE(review): later patches for the same key (sendMessage, switchSession) derive the
    // agent id via getAgentIdFromSessionKey, which yields DEFAULT_AGENT_ID for `local:` keys —
    // confirm which value is intended.
    currentAgentId: 'local',
    // No history is fetched for a brand-new session, and a load still in flight for the
    // previous session returns without touching `loading`, so clear it here.
    loading: false,
  });
}

/** Makes `sessionKey` the current session and starts loading its history. */
function switchSession(sessionKey: string): void {
  if (sessionKey === state.currentSessionKey) return;
  patchState(buildSessionSwitchPatch(state, sessionKey));
  void loadHistory(sessionKey);
}

/**
 * Deletes a session on the gateway and removes it from the store.
 *
 * Local cleanup happens even when the gateway call fails. When the deleted session was
 * current, the first remaining session (or the default session) becomes current.
 */
async function deleteSession(sessionKey: string): Promise<void> {
  try {
    await gatewayRpc('session.delete', { sessionKey });
  } catch {
    // keep local cleanup even if gateway delete fails
  }

  const remaining = state.sessions.filter((session) => session.key !== sessionKey);
  const basePatch: Partial<ChatStoreState> = {
    sessions: remaining,
    sessionLabels: clearSessionEntryFromMap(state.sessionLabels, sessionKey),
    sessionLastActivity: clearSessionEntryFromMap(state.sessionLastActivity, sessionKey),
  };

  if (state.currentSessionKey !== sessionKey) {
    patchState(basePatch);
    return;
  }

  const nextSession = remaining[0]?.key ?? DEFAULT_SESSION_KEY;
  patchState({
    ...basePatch,
    currentSessionKey: nextSession,
    currentAgentId: getAgentIdFromSessionKey(nextSession),
    messages: [],
    error: null,
    // A load still in flight for the deleted session will not clear `loading` itself.
    loading: false,
    ...idleRunPatch(),
  });

  if (nextSession !== DEFAULT_SESSION_KEY) {
    await loadHistory(nextSession);
  }
}

/** Sets a session's label to the trimmed `nextLabel`; blank labels are ignored. */
function renameSession(sessionKey: string, nextLabel: string): void {
  const trimmedLabel = nextLabel.trim();
  if (!trimmedLabel) return;

  patchState({
    sessionLabels: {
      ...state.sessionLabels,
      [sessionKey]: trimmedLabel,
    },
  });
}

/**
 * Sends a user message, with optional attachments, to the current session.
 *
 * The message is appended optimistically before the gateway call. When the current session
 * is the default one, a new `local:` session is created and used instead. Attachments are
 * referenced in the outgoing text as `[media attached: ...]` lines.
 *
 * @returns `true` when the gateway accepted the message; `false` when there was nothing to
 *   send, no default provider account is configured, or the gateway call failed.
 */
async function sendMessage(text: string, attachments: StagedAttachment[] = []): Promise<boolean> {
  const trimmedText = text.trim();
  if (!trimmedText && attachments.length === 0) return false;

  const defaultAccountId = await resolveDefaultAccountId();
  if (!defaultAccountId) {
    patchState({ error: NO_DEFAULT_MODEL_ERROR });
    return false;
  }

  let targetSessionKey = state.currentSessionKey;
  if (!targetSessionKey || targetSessionKey === DEFAULT_SESSION_KEY) {
    targetSessionKey = `local:${defaultAccountId}:${crypto.randomUUID()}`;
  }

  const nowMs = Date.now();
  const userMessage: RawMessage = {
    role: 'user',
    content: trimmedText || (attachments.length > 0 ? '(file attached)' : ''),
    timestamp: nowMs,
    id: crypto.randomUUID(),
    _attachedFiles: attachments.map((attachment) => ({
      fileName: attachment.fileName,
      mimeType: attachment.mimeType,
      fileSize: attachment.fileSize,
      preview: attachment.preview,
      filePath: attachment.stagedPath,
    })),
  };

  const nextSessions = ensureSessionEntry(state.sessions, targetSessionKey, NEW_CHAT_LABEL);
  const isFirstUserMessage = !state.messages.some((message) => message.role === 'user');
  const shouldLabel = !targetSessionKey.endsWith(':main') && isFirstUserMessage && Boolean(trimmedText);
  const nextLabels = shouldLabel
    ? { ...state.sessionLabels, [targetSessionKey]: truncateLabel(trimmedText) }
    : state.sessionLabels;

  patchState({
    ...idleRunPatch(),
    messages: [...state.messages, userMessage],
    sending: true,
    error: null,
    lastUserMessageAt: nowMs,
    sessions: nextSessions,
    currentSessionKey: targetSessionKey,
    currentAgentId: getAgentIdFromSessionKey(targetSessionKey),
    sessionLabels: nextLabels,
    sessionLastActivity: {
      ...state.sessionLastActivity,
      [targetSessionKey]: nowMs,
    },
  });

  try {
    let messageContent = trimmedText;
    if (attachments.length > 0) {
      const refs = attachments
        .map((attachment) => `[media attached: ${attachment.fileName} (${attachment.mimeType}) | ${attachment.stagedPath}]`)
        .join('\n');
      messageContent = messageContent ? `${messageContent}\n\n${refs}` : refs;
    }

    const result = await gatewayRpc<{ runId: string }>('chat.send', {
      sessionKey: targetSessionKey,
      message: {
        role: 'user',
        content: messageContent,
      },
      options: {
        providerAccountId: defaultAccountId,
      },
    });

    patchState({
      activeRunId: result.runId,
    });
    return true;
  } catch (error) {
    patchState({
      error: String(error),
      sending: false,
      ...idleRunPatch(),
    });
    return false;
  }
}

/**
 * Clears local run state immediately, then asks the gateway to abort the current
 * session's run. No gateway call is made for the default session.
 */
async function abortRun(): Promise<void> {
  const sessionKey = state.currentSessionKey;
  patchState({
    sending: false,
    ...idleRunPatch(),
  });

  if (!sessionKey || sessionKey === DEFAULT_SESSION_KEY) return;

  try {
    await gatewayRpc('chat.abort', { sessionKey });
  } catch (error) {
    patchState({ error: String(error) });
  }
}

/**
 * Applies a chat event from the gateway to the store.
 *
 * Duplicate events are ignored, as are events whose `runId` differs from the active run.
 */
async function handleGatewayEvent(event: GatewayEvent): Promise<void> {
  if (isDuplicateChatEvent(event)) return;
  if (
    state.activeRunId &&
    'runId' in event &&
    typeof event.runId === 'string' &&
    event.runId !== state.activeRunId
  ) {
    return;
  }

  switch (event.type) {
    case 'chat:delta': {
      // Append the chunk to whatever has been streamed so far.
      const previousContent = state.streamingMessage ? extractText(state.streamingMessage) : '';
      patchState({
        sending: true,
        error: null,
        activeRunId: typeof event.runId === 'string' ? event.runId : state.activeRunId,
        streamingMessage: {
          role: 'assistant',
          content: previousContent + event.delta,
          timestamp: Date.now(),
          id: state.streamingMessage?.id || `stream-${event.runId || Date.now()}`,
        },
      });
      break;
    }

    case 'chat:final': {
      // NOTE(review): this concatenates streamed text with the final string content, which
      // assumes the final payload holds only text that was not streamed — confirm against
      // the gateway protocol.
      const composedMessage =
        state.streamingMessage && typeof event.message.content === 'string'
          ? {
              ...event.message,
              content: `${extractText(state.streamingMessage)}${event.message.content}`,
            }
          : event.message;
      const messageId = composedMessage.id || `run-${event.runId || Date.now()}`;
      const hasOutput = Boolean(extractText(composedMessage).trim());
      const toolOnly = isToolOnlyMessage(composedMessage);
      // An empty or tool-only final does not end the run; keep it active.
      const awaitingMore = !hasOutput || toolOnly;

      const settlePatch: Partial<ChatStoreState> = {
        streamingMessage: null,
        streamingTools: [],
        pendingFinal: awaitingMore,
        sending: awaitingMore,
        activeRunId: awaitingMore ? state.activeRunId : null,
        lastUserMessageAt: awaitingMore ? state.lastUserMessageAt : null,
      };

      const alreadyStored = state.messages.some((message) => message.id === messageId);
      if (alreadyStored) {
        patchState(settlePatch);
      } else {
        patchState({
          ...settlePatch,
          messages: [...state.messages, { ...composedMessage, id: messageId }],
          sessionLastActivity: {
            ...state.sessionLastActivity,
            [state.currentSessionKey]: composedMessage.timestamp ? toMs(composedMessage.timestamp) : Date.now(),
          },
        });
      }

      if (!awaitingMore) {
        await loadHistory(state.currentSessionKey, true);
      }
      break;
    }

    case 'chat:error': {
      // Preserve whatever was streamed before the failure as a regular message.
      const partial = state.streamingMessage;
      const shouldSnapshot = partial !== null && !state.messages.some((message) => message.id === partial.id);
      const snapshotPatch: Partial<ChatStoreState> =
        partial !== null && shouldSnapshot
          ? {
              messages: [
                ...state.messages,
                { ...partial, id: partial.id || `error-snap-${Date.now()}` },
              ],
            }
          : {};

      patchState({
        ...snapshotPatch,
        error: event.error,
        sending: false,
        ...idleRunPatch(),
      });
      break;
    }

    case 'chat:aborted': {
      patchState({
        sending: false,
        ...idleRunPatch(),
      });
      break;
    }

    default:
      break;
  }
}

/** Clears the current error. */
function clearError(): void {
  patchState({ error: null });
}

/** Subscribes to gateway events and performs the initial session load. */
async function initChatStore(): Promise<void> {
  await subscribeToGateway();
  await loadSessions();
}

/**
 * Reads a file and returns the base64 payload of its data URL
 * (an empty string when the result has no payload part).
 */
function readFileAsBase64(file: File): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const reader = new FileReader();
    reader.onerror = () => reject(reader.error || new Error('Failed to read file'));
    reader.onloadend = () => {
      const dataUrl = String(reader.result || '');
      resolve(dataUrl.split(',')[1] || '');
    };
    reader.readAsDataURL(file);
  });
}

/**
 * Reads and stages the given files one after another, preserving their order.
 * Files without a MIME type are staged as `application/octet-stream`.
 */
async function stageAttachmentFiles(files: File[]): Promise<StagedAttachment[]> {
  const stagedFiles: StagedAttachment[] = [];
  for (const file of files) {
    const base64 = await readFileAsBase64(file);
    stagedFiles.push(await stageBuffer(base64, file.name, file.type || 'application/octet-stream'));
  }
  return stagedFiles;
}

/**
 * Registers a change listener.
 *
 * @returns a function that removes the listener.
 */
function subscribe(listener: () => void): () => void {
  listeners.add(listener);
  return () => {
    listeners.delete(listener);
  };
}

/** Returns the current snapshot; the reference changes only when state is patched. */
function getSnapshot(): ChatStoreState {
  return state;
}

/** Public API of the chat store. */
export const chatStore = {
  subscribe,
  getSnapshot,
  getState: () => state,
  init: initChatStore,
  loadSessions,
  loadHistory,
  switchSession,
  newSession,
  deleteSession,
  renameSession,
  sendMessage,
  abortRun,
  clearError,
  stageAttachmentFiles,
};

/** React hook that re-renders the component whenever the chat store snapshot changes. */
export function useChatStore(): ChatStoreState {
  return useSyncExternalStore(chatStore.subscribe, chatStore.getSnapshot, chatStore.getSnapshot);
}