Files
NianAIGC/lib/server/data-store.ts
2026-05-29 10:26:02 +08:00

358 lines
13 KiB
TypeScript

import { readFile, rename, rm, writeFile } from "node:fs/promises";
import { join } from "node:path";
import { createClient, type SupabaseClient } from "@supabase/supabase-js";
import type { AppState, Asset, GenerationJob, Project, UsageEvent } from "@/lib/types";
import { createId } from "@/lib/server/ids";
import { dataDir, DEFAULT_OWNER_ID, ensureRuntimeDirs } from "@/lib/server/runtime";
// File name of the local JSON store; it is joined with dataDir() whenever the state is read or written.
const STORE_FILE = "web-app-state.json";
// Tail of the promise chain that mutateLocalState appends to, so local read-modify-write cycles run one after another.
let localWriteQueue: Promise<unknown> = Promise.resolve();
// Input accepted by createAsset: an Asset whose id and timestamps may be omitted (they are filled in when missing).
type AssetInput = Omit<Asset, "id" | "createdAt" | "updatedAt"> & Partial<Pick<Asset, "id" | "createdAt" | "updatedAt">>;
// Input accepted by createGenerationJob: a GenerationJob whose id and timestamps may be omitted.
type JobInput = Omit<GenerationJob, "id" | "createdAt" | "updatedAt"> & Partial<Pick<GenerationJob, "id" | "createdAt" | "updatedAt">>;
// Input accepted by recordUsageEvent: a UsageEvent whose id and creation time may be omitted.
type UsageInput = Omit<UsageEvent, "id" | "createdAt"> & Partial<Pick<UsageEvent, "id" | "createdAt">>;
/**
 * Lists the assets that belong to `ownerId`, newest first.
 *
 * Reads from the Supabase `assets` table when an admin client is configured;
 * otherwise filters the local JSON state.
 *
 * @param ownerId - Owner whose assets are returned; defaults to `DEFAULT_OWNER_ID`.
 * @returns The owner's assets ordered by creation time, descending.
 * @throws Error carrying the Supabase error message when the query fails.
 */
export async function listAssets(ownerId = DEFAULT_OWNER_ID): Promise<Asset[]> {
  const client = getSupabaseAdmin();

  if (!client) {
    // Local fallback: filter() yields a fresh array, so sorting it in place is safe.
    const { assets } = await readState();
    const owned = assets.filter((candidate) => candidate.ownerId === ownerId);
    owned.sort(sortNewest);
    return owned;
  }

  const { data: rows, error: failure } = await client
    .from("assets")
    .select("*")
    .eq("owner_id", ownerId)
    .order("created_at", { ascending: false });
  if (failure) throw new Error(failure.message);
  return (rows || []).map(assetFromRow);
}
/**
 * Looks up a single asset by id.
 *
 * @param id - Identifier of the asset to fetch.
 * @returns The matching asset, or `null` when none exists.
 * @throws Error carrying the Supabase error message when the query fails.
 */
export async function getAsset(id: string): Promise<Asset | null> {
  const client = getSupabaseAdmin();

  if (!client) {
    const { assets } = await readState();
    const match = assets.find((candidate) => candidate.id === id);
    return match || null;
  }

  const { data: row, error: failure } = await client.from("assets").select("*").eq("id", id).maybeSingle();
  if (failure) throw new Error(failure.message);
  if (!row) return null;
  return assetFromRow(row);
}
/**
 * Creates an asset, filling in id, owner, tags, metadata and timestamps when
 * the input leaves them out, and persists it to Supabase or the local state.
 *
 * @param input - Asset fields; `id`, `createdAt` and `updatedAt` are optional.
 * @returns The stored asset (as read back from Supabase when it is configured).
 * @throws Error carrying the Supabase error message when the insert fails.
 */
export async function createAsset(input: AssetInput): Promise<Asset> {
  const timestamp = new Date().toISOString();
  const { id, ownerId, tags, metadata, createdAt, updatedAt } = input;
  const asset: Asset = {
    ...input,
    id: id || createId("asset"),
    ownerId: ownerId || DEFAULT_OWNER_ID,
    tags: tags || [],
    metadata: metadata || {},
    createdAt: createdAt || timestamp,
    updatedAt: updatedAt || timestamp
  };

  const client = getSupabaseAdmin();
  if (!client) {
    // Local fallback: newest assets are kept at the front of the list.
    return mutateLocalState((state) => {
      state.assets.unshift(asset);
      return asset;
    });
  }

  const { data: row, error: failure } = await client.from("assets").insert(assetToRow(asset)).select("*").single();
  if (failure) throw new Error(failure.message);
  return assetFromRow(row);
}
/**
 * Deletes an asset by id.
 *
 * In the local store the asset id is also stripped from every generation
 * job's input and output asset lists.
 * NOTE(review): the Supabase branch only deletes the `assets` row and does not
 * touch generation jobs — confirm whether the database handles that.
 *
 * @param id - Identifier of the asset to delete.
 * @returns The asset as it existed before deletion, or `null` when it was not found.
 * @throws Error carrying the Supabase error message when the delete fails.
 */
export async function deleteAsset(id: string): Promise<Asset | null> {
  const existing = await getAsset(id);
  if (!existing) return null;

  const client = getSupabaseAdmin();
  if (!client) {
    const withoutAsset = (ids: readonly string[]): string[] => ids.filter((assetId) => assetId !== id);
    return mutateLocalState((state) => {
      state.assets = state.assets.filter((candidate) => candidate.id !== id);
      state.generationJobs = state.generationJobs.map((job) => ({
        ...job,
        inputAssetIds: withoutAsset(job.inputAssetIds),
        outputAssetIds: withoutAsset(job.outputAssetIds)
      }));
      return existing;
    });
  }

  const { error: failure } = await client.from("assets").delete().eq("id", id);
  if (failure) throw new Error(failure.message);
  return existing;
}
/**
 * Lists generation jobs for an owner, newest first, capped at `limit`.
 *
 * @param ownerId - Owner whose jobs are returned; defaults to `DEFAULT_OWNER_ID`.
 * @param limit - Maximum number of jobs to return; defaults to 200.
 * @returns The owner's most recent jobs ordered by creation time, descending.
 * @throws Error carrying the Supabase error message when the query fails.
 */
export async function listGenerationJobs(ownerId = DEFAULT_OWNER_ID, limit = 200): Promise<GenerationJob[]> {
  const client = getSupabaseAdmin();

  if (!client) {
    const { generationJobs } = await readState();
    const owned = generationJobs.filter((job) => job.ownerId === ownerId);
    owned.sort(sortNewest);
    return owned.slice(0, limit);
  }

  const { data: rows, error: failure } = await client
    .from("generation_jobs")
    .select("*")
    .eq("owner_id", ownerId)
    .order("created_at", { ascending: false })
    .limit(limit);
  if (failure) throw new Error(failure.message);
  return (rows || []).map(jobFromRow);
}
/**
 * Looks up a single generation job by id.
 *
 * @param id - Identifier of the job to fetch.
 * @returns The matching job, or `null` when none exists.
 * @throws Error carrying the Supabase error message when the query fails.
 */
export async function getGenerationJob(id: string): Promise<GenerationJob | null> {
  const client = getSupabaseAdmin();

  if (!client) {
    const { generationJobs } = await readState();
    const match = generationJobs.find((job) => job.id === id);
    return match || null;
  }

  const { data: row, error: failure } = await client.from("generation_jobs").select("*").eq("id", id).maybeSingle();
  if (failure) throw new Error(failure.message);
  if (!row) return null;
  return jobFromRow(row);
}
/**
 * Creates a generation job, defaulting id, owner, list fields, request payload
 * and timestamps when the input omits them, then persists it.
 *
 * @param input - Job fields; `id`, `createdAt` and `updatedAt` are optional.
 * @returns The stored job (as read back from Supabase when it is configured).
 * @throws Error carrying the Supabase error message when the insert fails.
 */
export async function createGenerationJob(input: JobInput): Promise<GenerationJob> {
  const timestamp = new Date().toISOString();
  const { id, ownerId, inputAssetIds, inputUrls, outputAssetIds, requestPayload, createdAt, updatedAt } = input;
  const job: GenerationJob = {
    ...input,
    id: id || createId("job"),
    ownerId: ownerId || DEFAULT_OWNER_ID,
    inputAssetIds: inputAssetIds || [],
    inputUrls: inputUrls || [],
    outputAssetIds: outputAssetIds || [],
    requestPayload: requestPayload || {},
    createdAt: createdAt || timestamp,
    updatedAt: updatedAt || timestamp
  };

  const client = getSupabaseAdmin();
  if (!client) {
    // Local fallback: newest jobs are kept at the front of the list.
    return mutateLocalState((state) => {
      state.generationJobs.unshift(job);
      return job;
    });
  }

  const { data: row, error: failure } = await client.from("generation_jobs").insert(jobToRow(job)).select("*").single();
  if (failure) throw new Error(failure.message);
  return jobFromRow(row);
}
/**
 * Applies a partial update to a generation job and refreshes its `updatedAt`.
 *
 * @param id - Identifier of the job to update.
 * @param patch - Fields to overwrite on the stored job.
 * @returns The job after the update.
 * @throws Error with the Supabase error message when the update fails, or
 *   `Generation job not found: <id>` when the local store has no such job.
 */
export async function updateGenerationJob(id: string, patch: Partial<GenerationJob>): Promise<GenerationJob> {
  const updatedAt = new Date().toISOString();
  const client = getSupabaseAdmin();

  if (!client) {
    return mutateLocalState((state) => {
      const position = state.generationJobs.findIndex((job) => job.id === id);
      if (position === -1) throw new Error(`Generation job not found: ${id}`);
      const merged: GenerationJob = { ...state.generationJobs[position], ...patch, updatedAt };
      state.generationJobs[position] = merged;
      return merged;
    });
  }

  // jobToRow only emits columns for fields that are not undefined, so this is a partial row.
  const changes: Partial<GenerationJob> = { ...patch, updatedAt };
  const { data: row, error: failure } = await client
    .from("generation_jobs")
    .update(jobToRow(changes))
    .eq("id", id)
    .select("*")
    .single();
  if (failure) throw new Error(failure.message);
  return jobFromRow(row);
}
/**
 * Deletes a generation job together with the usage events recorded for it.
 *
 * @param id - Identifier of the job to delete.
 * @returns The job as it existed before deletion, or `null` when it was not found.
 * @throws Error carrying the Supabase error message when either delete fails.
 */
export async function deleteGenerationJob(id: string): Promise<GenerationJob | null> {
  const existing = await getGenerationJob(id);
  if (!existing) return null;

  const client = getSupabaseAdmin();
  if (!client) {
    return mutateLocalState((state) => {
      state.generationJobs = state.generationJobs.filter((job) => job.id !== id);
      state.usageEvents = state.usageEvents.filter((event) => event.jobId !== id);
      return existing;
    });
  }

  // Usage events are removed before the job they refer to.
  const usageDeletion = await client.from("usage_events").delete().eq("job_id", id);
  if (usageDeletion.error) throw new Error(usageDeletion.error.message);

  const jobDeletion = await client.from("generation_jobs").delete().eq("id", id);
  if (jobDeletion.error) throw new Error(jobDeletion.error.message);

  return existing;
}
/**
 * Records a usage event, defaulting id, owner and creation time when omitted.
 *
 * @param input - Usage fields; `id` and `createdAt` are optional.
 * @returns The stored usage event (as read back from Supabase when it is configured).
 * @throws Error carrying the Supabase error message when the insert fails.
 */
export async function recordUsageEvent(input: UsageInput): Promise<UsageEvent> {
  const { id, ownerId, createdAt } = input;
  const usage: UsageEvent = {
    ...input,
    id: id || createId("usage"),
    ownerId: ownerId || DEFAULT_OWNER_ID,
    createdAt: createdAt || new Date().toISOString()
  };

  const client = getSupabaseAdmin();
  if (!client) {
    return mutateLocalState((state) => {
      state.usageEvents.unshift(usage);
      return usage;
    });
  }

  const { data: row, error: failure } = await client.from("usage_events").insert(usageToRow(usage)).select("*").single();
  if (failure) throw new Error(failure.message);
  return usageFromRow(row);
}
/**
 * Lists the projects that belong to `ownerId`, newest first.
 *
 * Projects are always read from the local state; this function never queries Supabase.
 *
 * @param ownerId - Owner whose projects are returned; defaults to `DEFAULT_OWNER_ID`.
 * @returns The owner's projects ordered by creation time, descending.
 */
export async function listProjects(ownerId = DEFAULT_OWNER_ID): Promise<Project[]> {
  const { projects } = await readState();
  const owned = projects.filter((candidate) => candidate.ownerId === ownerId);
  return owned.sort(sortNewest);
}
/**
 * Loads the local JSON state file and normalizes it.
 *
 * A missing (or blank) file is treated as a first run: the default state is
 * written to disk and returned. Every other failure — an unreadable file,
 * invalid JSON, or a top-level value that is not an object — is rethrown, so
 * existing data is never silently replaced by an empty state.
 *
 * @returns The normalized application state.
 * @throws The underlying read error, or an Error naming the file when its
 *   contents cannot be interpreted as a state object.
 */
async function readState(): Promise<AppState> {
  await ensureRuntimeDirs();
  const path = join(dataDir(), STORE_FILE);

  let contents: string | null;
  try {
    contents = await readFile(path, "utf8");
  } catch (error: unknown) {
    const missing =
      typeof error === "object" && error !== null && (error as { code?: unknown }).code === "ENOENT";
    // Only "file does not exist" means first run; anything else must surface to the caller.
    if (!missing) throw error;
    contents = null;
  }

  if (contents === null || contents.trim() === "") {
    // Nothing stored yet: seed the store with the default state.
    const state = normalizeState({});
    await writeState(state);
    return state;
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(contents);
  } catch (error: unknown) {
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`Local state file ${path} contains invalid JSON: ${reason}`);
  }
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    throw new Error(`Local state file ${path} does not contain a JSON object`);
  }
  return normalizeState(parsed as Partial<AppState>);
}
/**
 * Persists the state to the local JSON store.
 *
 * The normalized state is written to a uniquely named temporary file which is
 * then renamed over the store file. If either step fails, the temporary file
 * is removed (best effort) and the original error is rethrown, so failed
 * writes do not leave stray `.tmp` files behind.
 *
 * @param state - Application state to persist.
 * @throws The underlying write or rename error.
 */
async function writeState(state: AppState): Promise<void> {
  await ensureRuntimeDirs();
  const path = join(dataDir(), STORE_FILE);
  const temp = `${path}.${createId("tmp")}.tmp`;
  try {
    await writeFile(temp, JSON.stringify(normalizeState(state), null, 2));
    await rename(temp, path);
  } catch (error: unknown) {
    // force: true tolerates a temp file that was never created; cleanup must not mask the real error.
    await rm(temp, { force: true }).catch(() => undefined);
    throw error;
  }
}
/**
 * Runs a read-modify-write cycle on the local state, queued behind every
 * previously scheduled mutation.
 *
 * @param mutator - Synchronous callback that edits the loaded state in place and returns a result.
 * @returns Whatever `mutator` returned, once the updated state has been written.
 * @throws Any error raised while reading, mutating or writing the state.
 */
async function mutateLocalState<T>(mutator: (state: AppState) => T): Promise<T> {
  const cycle = async (): Promise<T> => {
    const snapshot = await readState();
    const outcome = mutator(snapshot);
    await writeState(snapshot);
    return outcome;
  };

  const pending = localWriteQueue.then(cycle);
  // The queue tail swallows rejections so one failed mutation does not block later ones;
  // the caller still receives the rejection through `pending`.
  localWriteQueue = pending.catch(() => undefined);
  return pending;
}
/**
 * Fills in every missing collection of a partially populated state.
 *
 * @param raw - State that may lack some (or all) top-level collections.
 * @returns A complete state: absent lists become empty arrays, and an absent or
 *   empty `users` list is replaced by a single default demo user.
 */
function normalizeState(raw: Partial<AppState>): AppState {
  const { users, assets, generationJobs, usageEvents, projects } = raw;
  const hasUsers = users && users.length;
  return {
    users: hasUsers
      ? users
      : [{ id: DEFAULT_OWNER_ID, email: "demo@zhinian.local", displayName: "智念演示用户" }],
    assets: assets || [],
    generationJobs: generationJobs || [],
    usageEvents: usageEvents || [],
    projects: projects || []
  };
}
/**
 * Builds a Supabase client from `NEXT_PUBLIC_SUPABASE_URL` and
 * `SUPABASE_SERVICE_ROLE_KEY`, with session persistence disabled.
 *
 * @returns A new client on each call, or `null` when either environment variable is unset or empty.
 */
function getSupabaseAdmin(): SupabaseClient | null {
  const url = process.env.NEXT_PUBLIC_SUPABASE_URL;
  const serviceRoleKey = process.env.SUPABASE_SERVICE_ROLE_KEY;

  if (url && serviceRoleKey) {
    const options = { auth: { persistSession: false } };
    return createClient(url, serviceRoleKey, options);
  }
  return null;
}
/**
 * Comparator that orders items by `createdAt` descending.
 *
 * @param a - First item.
 * @param b - Second item.
 * @returns The result of comparing `b.createdAt` against `a.createdAt` with `localeCompare`.
 */
function sortNewest<T extends { createdAt: string }>(a: T, b: T): number {
  const { createdAt: first } = a;
  const { createdAt: second } = b;
  return second.localeCompare(first);
}
/**
 * Converts an asset (or partial asset) into a snake_case row for the `assets` table.
 *
 * @param asset - Asset fields to convert.
 * @returns A row containing every column key; fields absent from `asset` are `undefined`.
 */
function assetToRow(asset: Partial<Asset>) {
  const { id, ownerId, kind, name, url, storagePath, source, tags, metadata, createdAt, updatedAt } = asset;
  return {
    id,
    owner_id: ownerId,
    kind,
    name,
    url,
    storage_path: storagePath,
    source,
    tags,
    metadata,
    created_at: createdAt,
    updated_at: updatedAt
  };
}
/**
 * Converts an `assets` table row into an `Asset`.
 *
 * Scalar columns are stringified, a falsy `storage_path` becomes `undefined`,
 * a non-array `tags` becomes `[]`, and a non-object `metadata` becomes `{}`.
 *
 * @param row - Raw row with snake_case column names.
 * @returns The asset built from the row.
 */
function assetFromRow(row: Record<string, unknown>): Asset {
  const text = (column: string): string => String(row[column]);
  const rawTags = row.tags;
  const rawMetadata = row.metadata;
  const rawStoragePath = row.storage_path;

  return {
    id: text("id"),
    ownerId: text("owner_id"),
    kind: row.kind as Asset["kind"],
    name: text("name"),
    url: text("url"),
    storagePath: rawStoragePath ? String(rawStoragePath) : undefined,
    source: row.source as Asset["source"],
    tags: Array.isArray(rawTags) ? rawTags.map(String) : [],
    metadata: isRecord(rawMetadata) ? rawMetadata : {},
    createdAt: text("created_at"),
    updatedAt: text("updated_at")
  };
}
/**
 * Converts a (possibly partial) generation job into a snake_case row.
 *
 * Only fields whose value is not `undefined` produce a column, so the result
 * can be used for inserts as well as partial updates.
 *
 * @param job - Job fields to convert.
 * @returns A row holding one column per defined field.
 */
function jobToRow(job: Partial<GenerationJob>) {
  // Field-to-column pairs, listed in the order the columns are written to the row.
  const columns: ReadonlyArray<readonly [keyof GenerationJob, string]> = [
    ["id", "id"],
    ["ownerId", "owner_id"],
    ["capability", "capability"],
    ["provider", "provider"],
    ["reqKey", "req_key"],
    ["status", "status"],
    ["prompt", "prompt"],
    ["inputAssetIds", "input_asset_ids"],
    ["inputUrls", "input_urls"],
    ["outputAssetIds", "output_asset_ids"],
    ["providerTaskId", "provider_task_id"],
    ["requestPayload", "request_payload"],
    ["responsePayload", "response_payload"],
    ["error", "error"],
    ["retryOf", "retry_of"],
    ["createdAt", "created_at"],
    ["updatedAt", "updated_at"]
  ];

  const row: Record<string, unknown> = {};
  for (const [field, column] of columns) {
    const value = job[field];
    if (value !== undefined) row[column] = value;
  }
  return row;
}
/**
 * Converts a `generation_jobs` table row into a `GenerationJob`.
 *
 * Required scalar columns are stringified; falsy optional columns become
 * `undefined`; non-array id/url lists become `[]`; a non-object request
 * payload becomes `{}`; a non-object response payload or error becomes `undefined`.
 *
 * @param row - Raw row with snake_case column names.
 * @returns The job built from the row.
 */
function jobFromRow(row: Record<string, unknown>): GenerationJob {
  const optionalText = (value: unknown): string | undefined => (value ? String(value) : undefined);
  const textList = (value: unknown): string[] => (Array.isArray(value) ? value.map(String) : []);

  const rawRequest = row.request_payload;
  const rawResponse = row.response_payload;
  const rawError = row.error;
  const error = isRecord(rawError)
    ? {
        message: String(rawError.message || "Unknown error"),
        code: rawError.code as string | number | undefined,
        retryable: Boolean(rawError.retryable)
      }
    : undefined;

  return {
    id: String(row.id),
    ownerId: String(row.owner_id),
    capability: row.capability as GenerationJob["capability"],
    provider: row.provider as GenerationJob["provider"],
    reqKey: String(row.req_key),
    status: row.status as GenerationJob["status"],
    prompt: optionalText(row.prompt),
    inputAssetIds: textList(row.input_asset_ids),
    inputUrls: textList(row.input_urls),
    outputAssetIds: textList(row.output_asset_ids),
    providerTaskId: optionalText(row.provider_task_id),
    requestPayload: isRecord(rawRequest) ? rawRequest : {},
    responsePayload: isRecord(rawResponse) ? rawResponse : undefined,
    error,
    retryOf: optionalText(row.retry_of),
    createdAt: String(row.created_at),
    updatedAt: String(row.updated_at)
  };
}
/**
 * Converts a usage event into a snake_case row for the `usage_events` table.
 *
 * @param usage - Usage event to convert.
 * @returns The row with one column per usage field.
 */
function usageToRow(usage: UsageEvent) {
  const { id, ownerId, jobId, capability, quantity, estimatedUnit, createdAt } = usage;
  return {
    id,
    owner_id: ownerId,
    job_id: jobId,
    capability,
    quantity,
    estimated_unit: estimatedUnit,
    created_at: createdAt
  };
}
/**
 * Converts a `usage_events` table row into a `UsageEvent`.
 *
 * Identifier and timestamp columns are stringified; a falsy `quantity` becomes 0.
 *
 * @param row - Raw row with snake_case column names.
 * @returns The usage event built from the row.
 */
function usageFromRow(row: Record<string, unknown>): UsageEvent {
  const text = (column: string): string => String(row[column]);
  const quantity = Number(row.quantity || 0);
  return {
    id: text("id"),
    ownerId: text("owner_id"),
    jobId: text("job_id"),
    capability: row.capability as UsageEvent["capability"],
    quantity,
    estimatedUnit: row.estimated_unit as UsageEvent["estimatedUnit"],
    createdAt: text("created_at")
  };
}
/**
 * Type guard for plain key/value containers.
 *
 * @param value - Value to inspect.
 * @returns `true` when `value` is a non-null object that is not an array.
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (value === null || typeof value !== "object") return false;
  return !Array.isArray(value);
}