import { randomUUID } from "node:crypto";
import {
  claimGenerationJobs,
  clearGenerationJobLock,
  getGenerationJob,
  updateGenerationJob
} from "@/lib/server/data-store";
import { advanceImageJob } from "@/lib/server/generation-service";
import { advanceVideoJob } from "@/lib/server/video-generation-service";
import { recordAppLog } from "@/lib/server/log-manager";
import { requestOrigin } from "@/lib/server/runtime";
import { deliverJobWebhook } from "@/lib/server/webhook";
import type { GenerationJob, GenerationStatus } from "@/lib/types";

/** Summary of one worker tick: which worker ran, how many jobs it claimed, and what happened to each. */
export type WorkerTickResult = {
  workerId: string;
  claimed: number;
  jobs: Array<{
    id: string;
    status: GenerationStatus;
    action: "processed" | "retry_scheduled" | "released" | "failed";
    error?: string;
  }>;
};

/** Per-job outcome label reported in {@link WorkerTickResult}. */
type WorkerJobAction = WorkerTickResult["jobs"][number]["action"];

/** Statuses after which a job is finalized (lock cleared, webhook delivered) instead of being polled again. */
const TERMINAL_STATUSES = new Set(["succeeded", "failed", "expired", "cancelled"]);

/**
 * Claims a batch of generation jobs and advances each of them one step, sequentially.
 *
 * @param input.request  Optional incoming request; used to derive the origin when `origin` is not given.
 * @param input.origin   Explicit origin passed to the job advancers; takes precedence over `request`.
 * @param input.workerId Identifier used when claiming jobs; defaults to `worker-<random UUID>`.
 * @param input.limit    Maximum number of jobs to claim; a falsy value falls back to the configured batch size.
 * @returns The worker id, the number of claimed jobs, and one entry per job describing its outcome.
 */
export async function runWorkerTick(
  input: {
    request?: Request;
    origin?: string;
    workerId?: string;
    limit?: number;
  } = {}
): Promise<WorkerTickResult> {
  const workerId = input.workerId || `worker-${randomUUID()}`;
  const origin = input.origin || (input.request ? requestOrigin(input.request) : workerOrigin());
  const jobs = await claimGenerationJobs({
    workerId,
    limit: input.limit || workerBatchSize(),
    lockTimeoutMs: workerLockTimeoutMs()
  });
  const result: WorkerTickResult = { workerId, claimed: jobs.length, jobs: [] };

  for (const job of jobs) {
    try {
      const advanced = await advanceClaimedJob(job, origin);
      const settled = await settleAdvancedJob(advanced);
      result.jobs.push({ id: settled.job.id, status: settled.job.status, action: settled.action });
    } catch (error) {
      // NOTE(review): this catch also covers errors thrown by settleAdvancedJob, so a job that
      // advanced successfully but failed to settle is marked "failed" here as well — confirm intended.
      const message = error instanceof Error ? error.message : String(error);

      // Logging is best-effort: a logging failure must not mask the job failure being handled.
      await recordAppLog({
        level: "error",
        source: "worker.job",
        message: `任务处理失败:${job.id}`,
        error,
        details: {
          jobId: job.id,
          capability: job.capability,
          provider: job.provider,
          attempts: job.attempts,
          workerId
        }
      }).catch(() => undefined);

      // Mark the failure as retryable so settleAdvancedJob can re-queue it while attempts remain.
      const failed = await updateGenerationJob(job.id, {
        status: "failed",
        error: { message, retryable: true }
      });
      const settled = await settleAdvancedJob(failed);
      result.jobs.push({
        id: settled.job.id,
        status: settled.job.status,
        action: "failed",
        error: message
      });
    }
  }

  return result;
}

/** Dispatches a claimed job to the video advancer for `video.generate`, otherwise to the image advancer. */
async function advanceClaimedJob(job: GenerationJob, origin: string): Promise<GenerationJob> {
  if (job.capability === "video.generate") return advanceVideoJob(job.id, origin);
  return advanceImageJob(job.id, origin);
}

/**
 * Decides what happens to a job after an advance step and clears its lock accordingly.
 *
 * The job is re-read from the store (falling back to the passed-in copy), then:
 * - failed and still retryable: re-queued with an incremented attempt count and a backoff delay;
 * - terminal status: finalized, then the job webhook is delivered and its result recorded;
 * - anything else: released and scheduled for another poll after the poll interval.
 */
async function settleAdvancedJob(
  job: GenerationJob
): Promise<{ job: GenerationJob; action: WorkerJobAction }> {
  const current = (await getGenerationJob(job.id)) || job;
  const now = new Date();

  if (current.status === "failed" && canRetry(current)) {
    const attempts = (current.attempts || 0) + 1;
    const scheduledAt = new Date(now.getTime() + retryDelayMs(attempts)).toISOString();
    const retryJob = await clearGenerationJobLock(
      current.id,
      { status: "queued", attempts, scheduledAt },
      { clearProviderTaskId: true }
    );
    return { job: retryJob, action: "retry_scheduled" };
  }

  if (TERMINAL_STATUSES.has(current.status)) {
    const terminalJob = await clearGenerationJobLock(current.id, {
      // A final (non-retried) failure still counts as an attempt.
      attempts: current.status === "failed" ? (current.attempts || 0) + 1 : current.attempts,
      completedAt: current.completedAt || now.toISOString()
    });
    const webhook = await deliverJobWebhook(terminalJob);
    if (webhook.lastStatus) {
      const withWebhook = await updateGenerationJob(terminalJob.id, {
        webhookAttempts: webhook.attempts,
        webhookLastStatus: webhook.lastStatus
      });
      return { job: withWebhook, action: "processed" };
    }
    return { job: terminalJob, action: "processed" };
  }

  const scheduledAt = new Date(now.getTime() + workerPollIntervalMs()).toISOString();
  const released = await clearGenerationJobLock(current.id, { scheduledAt });
  return { job: released, action: "released" };
}

/** A job may be retried when its error is flagged retryable and attempts are below `maxAttempts` (default 3). */
function canRetry(job: GenerationJob): boolean {
  const attempts = job.attempts || 0;
  const maxAttempts = job.maxAttempts || 3;
  return Boolean(job.error?.retryable) && attempts < maxAttempts;
}

/** Exponential backoff: `base * 2^(attempts - 1)`, capped at the configured maximum. */
function retryDelayMs(attempts: number): number {
  const base = readPositiveInt("ZHINIAN_WORKER_RETRY_BASE_MS", 10_000);
  const max = readPositiveInt("ZHINIAN_WORKER_RETRY_MAX_MS", 5 * 60 * 1000);
  return Math.min(max, base * 2 ** Math.max(0, attempts - 1));
}

/** Delay before a released (non-terminal) job is scheduled again; default 5 seconds. */
function workerPollIntervalMs(): number {
  return readPositiveInt("ZHINIAN_WORKER_POLL_INTERVAL_MS", 5_000);
}

/** Lock timeout passed to `claimGenerationJobs`; default 5 minutes. */
function workerLockTimeoutMs(): number {
  return readPositiveInt("ZHINIAN_WORKER_LOCK_TIMEOUT_MS", 5 * 60 * 1000);
}

/** Default number of jobs claimed per tick, clamped to the range 1..20 (default 3). */
function workerBatchSize(): number {
  return Math.max(1, Math.min(readPositiveInt("ZHINIAN_WORKER_BATCH_SIZE", 3), 20));
}

/** Origin used when neither `origin` nor `request` is supplied; a single trailing slash is stripped. */
function workerOrigin(): string {
  return (
    process.env.NEXT_PUBLIC_APP_URL ||
    process.env.ZHINIAN_PUBLIC_BASE_URL ||
    "http://127.0.0.1:3000"
  ).replace(/\/$/, "");
}

/**
 * Reads an environment variable as a positive integer.
 *
 * The value is floored BEFORE the positivity check so that fractional values below 1
 * (e.g. "0.5") fall back to the default instead of yielding 0.
 *
 * @param name     Environment variable name.
 * @param fallback Value returned when the variable is unset, non-numeric, non-finite, or floors to <= 0.
 */
function readPositiveInt(name: string, fallback: number): number {
  const value = Math.floor(Number(process.env[name]));
  return Number.isFinite(value) && value > 0 ? value : fallback;
}