/**
 * Persistence layer for assets, generation jobs, usage events and projects.
 *
 * Every public function first asks `getSupabaseAdmin()` for a client. When the
 * Supabase environment variables are present the operation runs against
 * Supabase; otherwise it falls back to a JSON file (`STORE_FILE`) inside
 * `dataDir()`. Local writes are serialized through `localWriteQueue`.
 */
import { readFile, rename, writeFile } from "node:fs/promises";
import { join } from "node:path";
import { createClient, type SupabaseClient } from "@supabase/supabase-js";
import type {
  AppState,
  Asset,
  GenerationCapability,
  GenerationJob,
  GenerationStatus,
  Project,
  UsageEvent
} from "@/lib/types";
import { createId } from "@/lib/server/ids";
import { dataDir, DEFAULT_OWNER_ID, ensureRuntimeDirs } from "@/lib/server/runtime";

/** File name of the local JSON store, resolved relative to `dataDir()`. */
const STORE_FILE = "web-app-state.json";

/** Tail of the chain of local mutations; each new mutation runs after it settles. */
let localWriteQueue: Promise<unknown> = Promise.resolve();

// NOTE(review): the key sets of the three *Input aliases below are derived from
// the fields that createAsset / createGenerationJob / recordUsageEvent fill in
// with defaults — confirm against the declarations in "@/lib/types".

/** Asset creation payload: fields that `createAsset` defaults are optional. */
type AssetInput = Omit<Asset, "id" | "ownerId" | "tags" | "metadata" | "createdAt" | "updatedAt"> &
  Partial<Pick<Asset, "id" | "ownerId" | "tags" | "metadata" | "createdAt" | "updatedAt">>;

/** Job creation payload: fields that `createGenerationJob` defaults are optional. */
type JobInput = Omit<
  GenerationJob,
  | "id"
  | "ownerId"
  | "inputAssetIds"
  | "inputUrls"
  | "outputAssetIds"
  | "requestPayload"
  | "priority"
  | "attempts"
  | "maxAttempts"
  | "scheduledAt"
  | "webhookAttempts"
  | "createdAt"
  | "updatedAt"
> &
  Partial<
    Pick<
      GenerationJob,
      | "id"
      | "ownerId"
      | "inputAssetIds"
      | "inputUrls"
      | "outputAssetIds"
      | "requestPayload"
      | "priority"
      | "attempts"
      | "maxAttempts"
      | "scheduledAt"
      | "webhookAttempts"
      | "createdAt"
      | "updatedAt"
    >
  >;

/** Usage-event payload: fields that `recordUsageEvent` defaults are optional. */
type UsageInput = Omit<UsageEvent, "id" | "ownerId" | "createdAt"> &
  Partial<Pick<UsageEvent, "id" | "ownerId" | "createdAt">>;

/** Optional filters accepted by `listGenerationJobsFiltered`. */
export type GenerationJobListFilters = {
  ownerId?: string;
  externalClientId?: string;
  status?: GenerationStatus;
  capability?: GenerationCapability;
  limit?: number;
  /** Only jobs whose `createdAt` is strictly less than this value are returned. */
  before?: string;
};

/** Arguments for `claimGenerationJobs`. */
export type ClaimGenerationJobsInput = {
  workerId: string;
  limit?: number;
  lockTimeoutMs?: number;
};

/**
 * Lists the assets of one owner, newest first.
 *
 * @param ownerId Owner to filter by; defaults to `DEFAULT_OWNER_ID`.
 * @throws Error carrying the Supabase error message when the query fails.
 */
export async function listAssets(ownerId = DEFAULT_OWNER_ID): Promise<Asset[]> {
  const supabase = getSupabaseAdmin();
  if (supabase) {
    const { data, error } = await supabase
      .from("assets")
      .select("*")
      .eq("owner_id", ownerId)
      .order("created_at", { ascending: false });
    if (error) throw new Error(error.message);
    return (data || []).map(assetFromRow);
  }

  const state = await readState();
  return state.assets.filter((asset) => asset.ownerId === ownerId).sort(sortNewest);
}

/**
 * Looks up a single asset by id.
 *
 * @returns The asset, or `null` when no asset has that id.
 * @throws Error carrying the Supabase error message when the query fails.
 */
export async function getAsset(id: string): Promise<Asset | null> {
  const supabase = getSupabaseAdmin();
  if (supabase) {
    const { data, error } = await supabase.from("assets").select("*").eq("id", id).maybeSingle();
    if (error) throw new Error(error.message);
    return data ? assetFromRow(data) : null;
  }

  const state = await readState();
  return state.assets.find((asset) => asset.id === id) || null;
}

/**
 * Looks up a single asset by its storage path.
 *
 * @returns The asset, or `null` when no asset has that storage path.
 * @throws Error carrying the Supabase error message when the query fails.
 */
export async function getAssetByStoragePath(storagePath: string): Promise<Asset | null> {
  const supabase = getSupabaseAdmin();
  if (supabase) {
    const { data, error } = await supabase
      .from("assets")
      .select("*")
      .eq("storage_path", storagePath)
      .maybeSingle();
    if (error) throw new Error(error.message);
    return data ? assetFromRow(data) : null;
  }

  const state = await readState();
  return state.assets.find((asset) => asset.storagePath === storagePath) || null;
}

/**
 * Creates an asset, filling in id, owner, tags, metadata and timestamps when
 * the caller did not supply them.
 *
 * @returns The stored asset (as read back from Supabase, or the local object).
 * @throws Error carrying the Supabase error message when the insert fails.
 */
export async function createAsset(input: AssetInput): Promise<Asset> {
  const now = new Date().toISOString();
  const asset: Asset = {
    ...input,
    id: input.id || createId("asset"),
    ownerId: input.ownerId || DEFAULT_OWNER_ID,
    tags: input.tags || [],
    metadata: input.metadata || {},
    createdAt: input.createdAt || now,
    updatedAt: input.updatedAt || now
  };

  const supabase = getSupabaseAdmin();
  if (supabase) {
    const { data, error } = await supabase.from("assets").insert(assetToRow(asset)).select("*").single();
    if (error) throw new Error(error.message);
    return assetFromRow(data);
  }

  return mutateLocalState((state) => {
    state.assets.unshift(asset);
    return asset;
  });
}

/**
 * Deletes an asset by id.
 *
 * In local mode the asset id is also removed from every job's
 * `inputAssetIds` / `outputAssetIds`.
 * NOTE(review): the Supabase branch issues only the `assets` delete; whether
 * job references are cleaned up by the database is not visible here.
 *
 * @returns The asset as it was before deletion, or `null` when it did not exist.
 * @throws Error carrying the Supabase error message when the delete fails.
 */
export async function deleteAsset(id: string): Promise<Asset | null> {
  const existing = await getAsset(id);
  if (!existing) return null;

  const supabase = getSupabaseAdmin();
  if (supabase) {
    const { error } = await supabase.from("assets").delete().eq("id", id);
    if (error) throw new Error(error.message);
    return existing;
  }

  return mutateLocalState((state) => {
    state.assets = state.assets.filter((asset) => asset.id !== id);
    state.generationJobs = state.generationJobs.map((job) => ({
      ...job,
      inputAssetIds: job.inputAssetIds.filter((assetId) => assetId !== id),
      outputAssetIds: job.outputAssetIds.filter((assetId) => assetId !== id)
    }));
    return existing;
  });
}

/**
 * Lists generation jobs of one owner, newest first.
 * Thin wrapper over `listGenerationJobsFiltered`.
 */
export async function listGenerationJobs(ownerId = DEFAULT_OWNER_ID, limit = 200): Promise<GenerationJob[]> {
  return listGenerationJobsFiltered({ ownerId, limit });
}

/**
 * Lists generation jobs matching the given filters, newest first.
 *
 * `ownerId` defaults to `DEFAULT_OWNER_ID`; a missing or zero `limit` becomes 200.
 *
 * @throws Error carrying the Supabase error message when the query fails.
 */
export async function listGenerationJobsFiltered(
  filters: GenerationJobListFilters = {}
): Promise<GenerationJob[]> {
  const ownerId = filters.ownerId || DEFAULT_OWNER_ID;
  const limit = filters.limit || 200;

  const supabase = getSupabaseAdmin();
  if (supabase) {
    let query = supabase
      .from("generation_jobs")
      .select("*")
      .eq("owner_id", ownerId)
      .order("created_at", { ascending: false })
      .limit(limit);
    if (filters.externalClientId) query = query.eq("external_client_id", filters.externalClientId);
    if (filters.status) query = query.eq("status", filters.status);
    if (filters.capability) query = query.eq("capability", filters.capability);
    if (filters.before) query = query.lt("created_at", filters.before);

    const { data, error } = await query;
    if (error) throw new Error(error.message);
    return (data || []).map(jobFromRow);
  }

  const state = await readState();
  return state.generationJobs
    .filter((job) => job.ownerId === ownerId)
    .filter((job) => !filters.externalClientId || job.externalClientId === filters.externalClientId)
    .filter((job) => !filters.status || job.status === filters.status)
    .filter((job) => !filters.capability || job.capability === filters.capability)
    .filter((job) => !filters.before || job.createdAt < filters.before)
    .sort(sortNewest)
    .slice(0, limit);
}

/**
 * Looks up a single generation job by id.
 *
 * @returns The job, or `null` when no job has that id.
 * @throws Error carrying the Supabase error message when the query fails.
 */
export async function getGenerationJob(id: string): Promise<GenerationJob | null> {
  const supabase = getSupabaseAdmin();
  if (supabase) {
    const { data, error } = await supabase.from("generation_jobs").select("*").eq("id", id).maybeSingle();
    if (error) throw new Error(error.message);
    return data ? jobFromRow(data) : null;
  }

  const state = await readState();
  return state.generationJobs.find((job) => job.id === id) || null;
}

/**
 * Creates a generation job, filling in id, owner, the asset/url arrays, the
 * request payload, the scheduling counters (priority 0, attempts 0,
 * maxAttempts 3, webhookAttempts 0), `scheduledAt` and timestamps when the
 * caller did not supply them.
 *
 * @returns The stored job (as read back from Supabase, or the local object).
 * @throws Error carrying the Supabase error message when the insert fails.
 */
export async function createGenerationJob(input: JobInput): Promise<GenerationJob> {
  const now = new Date().toISOString();
  const job: GenerationJob = {
    ...input,
    id: input.id || createId("job"),
    ownerId: input.ownerId || DEFAULT_OWNER_ID,
    inputAssetIds: input.inputAssetIds || [],
    inputUrls: input.inputUrls || [],
    outputAssetIds: input.outputAssetIds || [],
    requestPayload: input.requestPayload || {},
    priority: input.priority ?? 0,
    attempts: input.attempts ?? 0,
    maxAttempts: input.maxAttempts ?? 3,
    scheduledAt: input.scheduledAt || now,
    webhookAttempts: input.webhookAttempts ?? 0,
    createdAt: input.createdAt || now,
    updatedAt: input.updatedAt || now
  };

  const supabase = getSupabaseAdmin();
  if (supabase) {
    const { data, error } = await supabase
      .from("generation_jobs")
      .insert(jobToRow(job))
      .select("*")
      .single();
    if (error) throw new Error(error.message);
    return jobFromRow(data);
  }

  return mutateLocalState((state) => {
    state.generationJobs.unshift(job);
    return job;
  });
}

/**
 * Finds the job previously created by `externalClientId` with `idempotencyKey`
 * for the given owner.
 *
 * @returns The matching job, or `null` when there is none.
 * @throws Error carrying the Supabase error message when the query fails.
 */
export async function findGenerationJobByIdempotency(
  externalClientId: string,
  idempotencyKey: string,
  ownerId = DEFAULT_OWNER_ID
): Promise<GenerationJob | null> {
  const supabase = getSupabaseAdmin();
  if (supabase) {
    const { data, error } = await supabase
      .from("generation_jobs")
      .select("*")
      .eq("owner_id", ownerId)
      .eq("external_client_id", externalClientId)
      .eq("idempotency_key", idempotencyKey)
      .maybeSingle();
    if (error) throw new Error(error.message);
    return data ? jobFromRow(data) : null;
  }

  const state = await readState();
  return (
    state.generationJobs.find(
      (job) =>
        job.ownerId === ownerId &&
        job.externalClientId === externalClientId &&
        job.idempotencyKey === idempotencyKey
    ) || null
  );
}

/**
 * Claims up to `limit` runnable jobs for a worker.
 *
 * `limit` is clamped to the range 1..20 (default 1) and the lock timeout
 * defaults to five minutes. With Supabase the work is delegated to the
 * `claim_generation_jobs` RPC. Locally, claimable jobs (see `isClaimableJob`)
 * are ordered by `sortClaimableJobs`, stamped with the worker's lock, and
 * returned as shallow copies.
 *
 * @throws Error prefixed with `claim_generation_jobs failed:` when the RPC fails.
 */
export async function claimGenerationJobs(input: ClaimGenerationJobsInput): Promise<GenerationJob[]> {
  const limit = Math.max(1, Math.min(input.limit || 1, 20));
  const lockTimeoutMs = input.lockTimeoutMs ?? 5 * 60 * 1000;

  const supabase = getSupabaseAdmin();
  if (supabase) {
    const { data, error } = await supabase.rpc("claim_generation_jobs", {
      p_worker_id: input.workerId,
      p_limit: limit,
      p_lock_timeout_seconds: Math.ceil(lockTimeoutMs / 1000)
    });
    if (error) throw new Error(`claim_generation_jobs failed: ${error.message}`);
    return (Array.isArray(data) ? data : []).map(jobFromRow);
  }

  return mutateLocalState((state) => {
    const now = new Date();
    const nowIso = now.toISOString();
    // Locks taken before this instant are considered abandoned.
    const staleBefore = new Date(now.getTime() - lockTimeoutMs).toISOString();

    const selected = state.generationJobs
      .filter((job) => isClaimableJob(job, nowIso, staleBefore))
      .sort(sortClaimableJobs)
      .slice(0, limit);

    // `selected` holds references into `state`, so these writes are persisted.
    for (const job of selected) {
      job.lockedAt = nowIso;
      job.lockedBy = input.workerId;
      if (!job.startedAt) job.startedAt = nowIso;
      job.updatedAt = nowIso;
    }
    return selected.map((job) => ({ ...job }));
  });
}

/**
 * Applies `patch` to a job and releases its worker lock in the same write.
 *
 * @param options.clearProviderTaskId When true, the provider task id is cleared as well.
 * @returns The updated job.
 * @throws Error with the Supabase message on failure, or
 *   `Generation job not found: <id>` in local mode.
 */
export async function clearGenerationJobLock(
  id: string,
  patch: Partial<GenerationJob> = {},
  options: { clearProviderTaskId?: boolean } = {}
): Promise<GenerationJob> {
  const updatedAt = new Date().toISOString();

  const supabase = getSupabaseAdmin();
  if (supabase) {
    const { data, error } = await supabase
      .from("generation_jobs")
      .update({
        ...jobToRow({ ...patch, updatedAt }),
        // Explicit nulls: jobToRow drops undefined fields, so it cannot clear columns.
        locked_at: null,
        locked_by: null,
        ...(options.clearProviderTaskId ? { provider_task_id: null } : {})
      })
      .eq("id", id)
      .select("*")
      .single();
    if (error) throw new Error(error.message);
    return jobFromRow(data);
  }

  return mutateLocalState((state) => {
    const index = state.generationJobs.findIndex((job) => job.id === id);
    if (index === -1) throw new Error(`Generation job not found: ${id}`);
    state.generationJobs[index] = {
      ...state.generationJobs[index],
      ...patch,
      lockedAt: undefined,
      lockedBy: undefined,
      ...(options.clearProviderTaskId ? { providerTaskId: undefined } : {}),
      updatedAt
    };
    return state.generationJobs[index];
  });
}

/**
 * Applies `patch` to a job and refreshes `updatedAt`.
 *
 * In the Supabase branch, fields whose value is `undefined` are not sent
 * (see `jobToRow`), so they are left untouched rather than cleared.
 *
 * @returns The updated job.
 * @throws Error with the Supabase message on failure, or
 *   `Generation job not found: <id>` in local mode.
 */
export async function updateGenerationJob(id: string, patch: Partial<GenerationJob>): Promise<GenerationJob> {
  const updatedAt = new Date().toISOString();

  const supabase = getSupabaseAdmin();
  if (supabase) {
    const { data, error } = await supabase
      .from("generation_jobs")
      .update(jobToRow({ ...patch, updatedAt }))
      .eq("id", id)
      .select("*")
      .single();
    if (error) throw new Error(error.message);
    return jobFromRow(data);
  }

  return mutateLocalState((state) => {
    const index = state.generationJobs.findIndex((job) => job.id === id);
    if (index === -1) throw new Error(`Generation job not found: ${id}`);
    state.generationJobs[index] = { ...state.generationJobs[index], ...patch, updatedAt };
    return state.generationJobs[index];
  });
}

/**
 * Deletes a job together with the usage events that reference it.
 *
 * @returns The job as it was before deletion, or `null` when it did not exist.
 * @throws Error carrying the Supabase error message when either delete fails.
 */
export async function deleteGenerationJob(id: string): Promise<GenerationJob | null> {
  const existing = await getGenerationJob(id);
  if (!existing) return null;

  const supabase = getSupabaseAdmin();
  if (supabase) {
    // Usage events are removed first, then the job itself.
    const { error: usageError } = await supabase.from("usage_events").delete().eq("job_id", id);
    if (usageError) throw new Error(usageError.message);
    const { error } = await supabase.from("generation_jobs").delete().eq("id", id);
    if (error) throw new Error(error.message);
    return existing;
  }

  return mutateLocalState((state) => {
    state.generationJobs = state.generationJobs.filter((job) => job.id !== id);
    state.usageEvents = state.usageEvents.filter((event) => event.jobId !== id);
    return existing;
  });
}

/**
 * Records a usage event, filling in id, owner and `createdAt` when missing.
 *
 * @returns The stored event (as read back from Supabase, or the local object).
 * @throws Error carrying the Supabase error message when the insert fails.
 */
export async function recordUsageEvent(input: UsageInput): Promise<UsageEvent> {
  const usage: UsageEvent = {
    ...input,
    id: input.id || createId("usage"),
    ownerId: input.ownerId || DEFAULT_OWNER_ID,
    createdAt: input.createdAt || new Date().toISOString()
  };

  const supabase = getSupabaseAdmin();
  if (supabase) {
    const { data, error } = await supabase.from("usage_events").insert(usageToRow(usage)).select("*").single();
    if (error) throw new Error(error.message);
    return usageFromRow(data);
  }

  return mutateLocalState((state) => {
    state.usageEvents.unshift(usage);
    return usage;
  });
}

/**
 * Lists the projects of one owner, newest first.
 *
 * NOTE(review): unlike the other readers this always reads the local JSON
 * store, even when Supabase is configured — confirm that is intended.
 */
export async function listProjects(ownerId = DEFAULT_OWNER_ID): Promise<Project[]> {
  const state = await readState();
  return state.projects.filter((project) => project.ownerId === ownerId).sort(sortNewest);
}

/**
 * Loads and normalizes the local JSON store.
 *
 * - Missing file (`ENOENT`): a fresh default state is written and returned.
 * - Any other read failure is rethrown, so a transient I/O error can no
 *   longer cause the persisted state to be overwritten with an empty one.
 * - Content that is not a JSON object: the unreadable file is moved aside
 *   (`<path>.<id>.bak`) before a fresh state is written, so nothing is lost.
 */
async function readState(): Promise<AppState> {
  await ensureRuntimeDirs();
  const path = join(dataDir(), STORE_FILE);

  let text: string;
  try {
    text = await readFile(path, "utf8");
  } catch (error: unknown) {
    if (!isMissingFileError(error)) throw error;
    return initializeState();
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(text);
  } catch {
    parsed = undefined;
  }
  if (isRecord(parsed)) return normalizeState(parsed as Partial<AppState>);

  // Best effort: if the backup rename fails we still recover with a fresh state.
  await rename(path, `${path}.${createId("corrupt")}.bak`).catch(() => undefined);
  return initializeState();
}

/** Writes a default state to disk and returns it. */
async function initializeState(): Promise<AppState> {
  const state = normalizeState({});
  await writeState(state);
  return state;
}

/** True when `error` carries the Node.js `ENOENT` error code. */
function isMissingFileError(error: unknown): boolean {
  return isRecord(error) && error.code === "ENOENT";
}

/**
 * Persists the state: the JSON is written to a uniquely named temp file next
 * to the store and then renamed over the store file.
 */
async function writeState(state: AppState): Promise<void> {
  await ensureRuntimeDirs();
  const path = join(dataDir(), STORE_FILE);
  const temp = `${path}.${createId("tmp")}.tmp`;
  await writeFile(temp, JSON.stringify(normalizeState(state), null, 2));
  await rename(temp, path);
}

/**
 * Runs a read-modify-write cycle on the local store after all previously
 * queued mutations have settled.
 *
 * @param mutator Synchronous callback that edits `state` in place; its return
 *   value becomes the resolved value. If it throws, nothing is written.
 */
async function mutateLocalState<T>(mutator: (state: AppState) => T): Promise<T> {
  const run = localWriteQueue.then(async () => {
    const state = await readState();
    const result = mutator(state);
    await writeState(state);
    return result;
  });
  // Keep the queue alive after a failure; the caller still sees the rejection via `run`.
  localWriteQueue = run.catch(() => undefined);
  return run;
}

/** Fills in every top-level collection, seeding a demo user when there are no users. */
function normalizeState(raw: Partial<AppState>): AppState {
  return {
    users: raw.users?.length
      ? raw.users
      : [{ id: DEFAULT_OWNER_ID, email: "demo@zhinian.local", displayName: "智念演示用户" }],
    assets: raw.assets || [],
    generationJobs: raw.generationJobs || [],
    usageEvents: raw.usageEvents || [],
    projects: raw.projects || []
  };
}

/**
 * Builds a Supabase client from `NEXT_PUBLIC_SUPABASE_URL` and
 * `SUPABASE_SERVICE_ROLE_KEY`.
 *
 * @returns A new client with session persistence disabled, or `null` when
 *   either variable is unset (which selects the local JSON store).
 */
function getSupabaseAdmin(): SupabaseClient | null {
  const url = process.env.NEXT_PUBLIC_SUPABASE_URL;
  const serviceRoleKey = process.env.SUPABASE_SERVICE_ROLE_KEY;
  if (!url || !serviceRoleKey) return null;
  return createClient(url, serviceRoleKey, { auth: { persistSession: false } });
}

/** Comparator: descending by `createdAt` (string comparison). */
function sortNewest<T extends { createdAt: string }>(a: T, b: T): number {
  return b.createdAt.localeCompare(a.createdAt);
}

/**
 * A job is claimable when its status is not terminal, its scheduled time
 * (or creation time) is not in the future, and it is either unlocked or its
 * lock is older than `staleBefore`.
 */
function isClaimableJob(job: GenerationJob, nowIso: string, staleBefore: string): boolean {
  if (["succeeded", "failed", "expired", "cancelled"].includes(job.status)) return false;
  if ((job.scheduledAt || job.createdAt) > nowIso) return false;
  return !job.lockedAt || job.lockedAt < staleBefore;
}

/** Comparator: higher priority first, then earlier schedule, then earlier creation. */
function sortClaimableJobs(a: GenerationJob, b: GenerationJob): number {
  const priority = (b.priority || 0) - (a.priority || 0);
  if (priority !== 0) return priority;
  const scheduled = (a.scheduledAt || a.createdAt).localeCompare(b.scheduledAt || b.createdAt);
  if (scheduled !== 0) return scheduled;
  return a.createdAt.localeCompare(b.createdAt);
}

/** Maps an asset to the snake_case column names of the `assets` table. */
function assetToRow(asset: Partial<Asset>) {
  return {
    id: asset.id,
    owner_id: asset.ownerId,
    kind: asset.kind,
    name: asset.name,
    url: asset.url,
    storage_path: asset.storagePath,
    source: asset.source,
    tags: asset.tags,
    metadata: asset.metadata,
    created_at: asset.createdAt,
    updated_at: asset.updatedAt
  };
}

/** Maps an `assets` row to an `Asset`, coercing scalars with `String`. */
function assetFromRow(row: Record<string, unknown>): Asset {
  return {
    id: String(row.id),
    ownerId: String(row.owner_id),
    kind: row.kind as Asset["kind"],
    name: String(row.name),
    url: String(row.url),
    storagePath: row.storage_path ? String(row.storage_path) : undefined,
    source: row.source as Asset["source"],
    tags: Array.isArray(row.tags) ? row.tags.map(String) : [],
    metadata: isRecord(row.metadata) ? row.metadata : {},
    createdAt: String(row.created_at),
    updatedAt: String(row.updated_at)
  };
}

/** Job field -> `generation_jobs` column, in the order the columns are emitted. */
const JOB_COLUMNS: ReadonlyArray<readonly [keyof GenerationJob, string]> = [
  ["id", "id"],
  ["ownerId", "owner_id"],
  ["externalClientId", "external_client_id"],
  ["capability", "capability"],
  ["provider", "provider"],
  ["reqKey", "req_key"],
  ["status", "status"],
  ["prompt", "prompt"],
  ["inputAssetIds", "input_asset_ids"],
  ["inputUrls", "input_urls"],
  ["outputAssetIds", "output_asset_ids"],
  ["providerTaskId", "provider_task_id"],
  ["requestPayload", "request_payload"],
  ["responsePayload", "response_payload"],
  ["error", "error"],
  ["retryOf", "retry_of"],
  ["idempotencyKey", "idempotency_key"],
  ["idempotencyFingerprint", "idempotency_fingerprint"],
  ["priority", "priority"],
  ["attempts", "attempts"],
  ["maxAttempts", "max_attempts"],
  ["scheduledAt", "scheduled_at"],
  ["lockedAt", "locked_at"],
  ["lockedBy", "locked_by"],
  ["startedAt", "started_at"],
  ["completedAt", "completed_at"],
  ["webhookUrl", "webhook_url"],
  ["webhookAttempts", "webhook_attempts"],
  ["webhookLastStatus", "webhook_last_status"],
  ["createdAt", "created_at"],
  ["updatedAt", "updated_at"]
];

/**
 * Maps a (possibly partial) job to a `generation_jobs` row.
 * Fields that are `undefined` are omitted, which makes the result usable as
 * an update patch; `null` and other falsy values are kept.
 */
function jobToRow(job: Partial<GenerationJob>): Record<string, unknown> {
  const row: Record<string, unknown> = {};
  for (const [field, column] of JOB_COLUMNS) {
    const value = job[field];
    if (value !== undefined) row[column] = value;
  }
  return row;
}

/** Maps a `generation_jobs` row to a `GenerationJob`, tolerating missing or null columns. */
function jobFromRow(row: Record<string, unknown>): GenerationJob {
  return {
    id: String(row.id),
    ownerId: String(row.owner_id),
    externalClientId: optionalString(row.external_client_id),
    capability: row.capability as GenerationJob["capability"],
    provider: row.provider as GenerationJob["provider"],
    reqKey: String(row.req_key),
    status: row.status as GenerationJob["status"],
    prompt: row.prompt ? String(row.prompt) : undefined,
    inputAssetIds: Array.isArray(row.input_asset_ids) ? row.input_asset_ids.map(String) : [],
    inputUrls: Array.isArray(row.input_urls) ? row.input_urls.map(String) : [],
    outputAssetIds: Array.isArray(row.output_asset_ids) ? row.output_asset_ids.map(String) : [],
    providerTaskId: row.provider_task_id ? String(row.provider_task_id) : undefined,
    requestPayload: isRecord(row.request_payload) ? row.request_payload : {},
    responsePayload: isRecord(row.response_payload) ? row.response_payload : undefined,
    error: isRecord(row.error)
      ? {
          message: String(row.error.message || "Unknown error"),
          code: row.error.code as string | number | undefined,
          retryable: Boolean(row.error.retryable)
        }
      : undefined,
    retryOf: row.retry_of ? String(row.retry_of) : undefined,
    idempotencyKey: optionalString(row.idempotency_key),
    idempotencyFingerprint: optionalString(row.idempotency_fingerprint),
    priority: optionalNumber(row.priority),
    attempts: optionalNumber(row.attempts),
    maxAttempts: optionalNumber(row.max_attempts),
    scheduledAt: optionalString(row.scheduled_at),
    lockedAt: optionalString(row.locked_at),
    lockedBy: optionalString(row.locked_by),
    startedAt: optionalString(row.started_at),
    completedAt: optionalString(row.completed_at),
    webhookUrl: optionalString(row.webhook_url),
    webhookAttempts: optionalNumber(row.webhook_attempts),
    // The status blob is read with either camelCase or snake_case keys.
    webhookLastStatus: isRecord(row.webhook_last_status)
      ? {
          ok: Boolean(row.webhook_last_status.ok),
          status: optionalNumber(row.webhook_last_status.status),
          error: optionalString(row.webhook_last_status.error),
          attemptedAt: String(
            row.webhook_last_status.attemptedAt || row.webhook_last_status.attempted_at || ""
          ),
          nextAttemptAt: optionalString(
            row.webhook_last_status.nextAttemptAt || row.webhook_last_status.next_attempt_at
          )
        }
      : undefined,
    createdAt: String(row.created_at),
    updatedAt: String(row.updated_at)
  };
}

/** Maps a usage event to the snake_case column names of the `usage_events` table. */
function usageToRow(usage: UsageEvent) {
  return {
    id: usage.id,
    owner_id: usage.ownerId,
    job_id: usage.jobId,
    capability: usage.capability,
    quantity: usage.quantity,
    estimated_unit: usage.estimatedUnit,
    created_at: usage.createdAt
  };
}

/** Maps a `usage_events` row to a `UsageEvent`; a missing quantity becomes 0. */
function usageFromRow(row: Record<string, unknown>): UsageEvent {
  return {
    id: String(row.id),
    ownerId: String(row.owner_id),
    jobId: String(row.job_id),
    capability: row.capability as UsageEvent["capability"],
    quantity: Number(row.quantity || 0),
    estimatedUnit: row.estimated_unit as UsageEvent["estimatedUnit"],
    createdAt: String(row.created_at)
  };
}

/** Type guard: a non-null, non-array object. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

/** Returns the trimmed string, or `undefined` for non-strings and blank strings. */
function optionalString(value: unknown): string | undefined {
  if (typeof value !== "string") return undefined;
  const trimmed = value.trim();
  return trimmed || undefined;
}

/** Returns a finite number, or `undefined` for null/undefined/"" and non-finite input. */
function optionalNumber(value: unknown): number | undefined {
  if (value === undefined || value === null || value === "") return undefined;
  const parsed = Number(value);
  return Number.isFinite(parsed) ? parsed : undefined;
}