Files
NianAIGC/lib/server/task-manager.ts
2026-06-03 12:03:14 +08:00

166 lines
5.4 KiB
TypeScript

import { randomUUID } from "node:crypto";
import {
claimGenerationJobs,
clearGenerationJobLock,
getGenerationJob,
updateGenerationJob
} from "@/lib/server/data-store";
import { advanceImageJob } from "@/lib/server/generation-service";
import { advanceVideoJob } from "@/lib/server/video-generation-service";
import { recordAppLog } from "@/lib/server/log-manager";
import { requestOrigin } from "@/lib/server/runtime";
import { deliverJobWebhook } from "@/lib/server/webhook";
import type { GenerationJob, GenerationStatus } from "@/lib/types";
/**
 * Summary returned by `runWorkerTick` for a single worker pass.
 */
export type WorkerTickResult = {
/** Identifier of the worker that claimed the jobs in this pass. */
workerId: string;
/** Number of jobs that were claimed (length of the claimed batch). */
claimed: number;
/** One entry per claimed job, appended in the order the jobs were handled. */
jobs: Array<{
id: string;
/** Status of the job after it has been settled. */
status: GenerationStatus;
/**
 * What the worker did with the job: `processed` for a job that reached a
 * terminal status, `retry_scheduled` for a failed job that was re-queued,
 * `released` for a job that is not finished yet, and `failed` when handling
 * the job threw an error.
 */
action: "processed" | "retry_scheduled" | "released" | "failed";
/** Message of the thrown error; only set when `action` is `failed`. */
error?: string;
}>;
};
// Statuses that settleAdvancedJob treats as final: such a job gets a
// completedAt timestamp and a webhook delivery instead of being rescheduled
// (a retryable "failed" job is re-queued before this set is consulted).
const TERMINAL_STATUSES = new Set<GenerationStatus>(["succeeded", "failed", "expired", "cancelled"]);
/**
 * Runs one worker pass: claims a batch of generation jobs, advances each one,
 * settles the outcome, and reports what happened to every claimed job.
 *
 * Jobs are handled sequentially. An error while handling one job is logged and
 * recorded in the result; it does not stop the remaining claimed jobs from
 * being handled, even when persisting the failure itself fails.
 *
 * @param input.request - Optional incoming request; its origin is used when `origin` is not given.
 * @param input.origin - Explicit origin handed to the job-advancing services.
 * @param input.workerId - Identifier used to claim jobs; a random `worker-<uuid>` is generated when omitted.
 * @param input.limit - Maximum number of jobs to claim; the configured batch size is used when omitted or 0.
 * @returns The worker id, the number of claimed jobs, and one entry per claimed job.
 */
export async function runWorkerTick(input: {
  request?: Request;
  origin?: string;
  workerId?: string;
  limit?: number;
} = {}): Promise<WorkerTickResult> {
  const workerId = input.workerId || `worker-${randomUUID()}`;
  const origin = input.origin || (input.request ? requestOrigin(input.request) : workerOrigin());
  const jobs = await claimGenerationJobs({
    workerId,
    limit: input.limit || workerBatchSize(),
    lockTimeoutMs: workerLockTimeoutMs()
  });
  const result: WorkerTickResult = {
    workerId,
    claimed: jobs.length,
    jobs: []
  };

  for (const job of jobs) {
    try {
      const advanced = await advanceClaimedJob(job, origin);
      const settled = await settleAdvancedJob(advanced);
      result.jobs.push({
        id: settled.job.id,
        status: settled.job.status,
        action: settled.action
      });
    } catch (error) {
      // NOTE(review): this path is also reached when settling throws after the
      // job was advanced successfully; the job is then marked as failed and
      // retryable. Confirm that this is the intended behaviour.
      const message = error instanceof Error ? error.message : String(error);
      const details = {
        jobId: job.id,
        capability: job.capability,
        provider: job.provider,
        attempts: job.attempts,
        workerId
      };
      // Logging is best-effort: a failing log write must not mask the job error.
      await recordAppLog({
        level: "error",
        source: "worker.job",
        message: `任务处理失败：${job.id}`,
        error,
        details
      }).catch(() => undefined);

      try {
        const failed = await updateGenerationJob(job.id, {
          status: "failed",
          error: {
            message,
            retryable: true
          }
        });
        const settled = await settleAdvancedJob(failed);
        result.jobs.push({
          id: settled.job.id,
          status: settled.job.status,
          action: "failed",
          error: message
        });
      } catch (recoveryError) {
        // Persisting the failure failed as well. Report the job with the status
        // it was claimed with and continue, so the other claimed jobs of this
        // batch are still handled and the caller still receives a result.
        await recordAppLog({
          level: "error",
          source: "worker.job",
          message: `任务失败状态保存失败：${job.id}`,
          error: recoveryError,
          details
        }).catch(() => undefined);
        result.jobs.push({
          id: job.id,
          status: job.status,
          action: "failed",
          error: message
        });
      }
    }
  }

  return result;
}
/**
 * Dispatches a claimed job to the service that advances it.
 *
 * Jobs whose capability is `video.generate` go to `advanceVideoJob`; every
 * other capability is handled by `advanceImageJob`.
 *
 * @param job - The claimed job; only its `capability` and `id` are read here.
 * @param origin - Origin forwarded unchanged to the selected service.
 * @returns The job as returned by the selected service.
 */
async function advanceClaimedJob(job: GenerationJob, origin: string): Promise<GenerationJob> {
  switch (job.capability) {
    case "video.generate":
      return advanceVideoJob(job.id, origin);
    default:
      return advanceImageJob(job.id, origin);
  }
}
/**
 * Decides what happens to a job after the worker has advanced it.
 *
 * The job is re-read via `getGenerationJob` (the passed-in copy is used when
 * the lookup yields nothing) and then handled in one of three ways:
 *
 * - failed and still retryable: re-queued with an incremented attempt count
 *   and a back-off `scheduledAt` (`retry_scheduled`);
 * - terminal status: stamped with `completedAt`, the webhook is delivered and
 *   its outcome stored when it reports a status (`processed`);
 * - anything else: rescheduled after the poll interval (`released`).
 *
 * Every path goes through `clearGenerationJobLock`.
 *
 * @param job - The job as returned by the advancing step.
 * @returns The updated job together with the action that was taken.
 */
async function settleAdvancedJob(job: GenerationJob): Promise<{
  job: GenerationJob;
  action: WorkerTickResult["jobs"][number]["action"];
}> {
  const latest = (await getGenerationJob(job.id)) || job;
  // Captured after the lookup so all timestamps below share one reference point.
  const nowMs = Date.now();
  const isoAfter = (delayMs: number): string => new Date(nowMs + delayMs).toISOString();

  const hasFailed = latest.status === "failed";
  const previousAttempts = latest.attempts || 0;

  // Retryable failure: put the job back into the queue with a back-off delay.
  if (hasFailed && canRetry(latest)) {
    const attempts = previousAttempts + 1;
    const retryJob = await clearGenerationJobLock(
      latest.id,
      {
        status: "queued",
        attempts,
        scheduledAt: isoAfter(retryDelayMs(attempts))
      },
      { clearProviderTaskId: true }
    );
    return { job: retryJob, action: "retry_scheduled" };
  }

  // Not finished yet: schedule the next look at the job.
  if (!TERMINAL_STATUSES.has(latest.status)) {
    const released = await clearGenerationJobLock(latest.id, {
      scheduledAt: isoAfter(workerPollIntervalMs())
    });
    return { job: released, action: "released" };
  }

  // Terminal: a final failure still counts as one more attempt.
  const terminalJob = await clearGenerationJobLock(latest.id, {
    attempts: hasFailed ? previousAttempts + 1 : latest.attempts,
    completedAt: latest.completedAt || new Date(nowMs).toISOString()
  });

  const webhook = await deliverJobWebhook(terminalJob);
  if (!webhook.lastStatus) {
    return { job: terminalJob, action: "processed" };
  }

  // Persist the delivery outcome only when the webhook reported a status.
  const withWebhook = await updateGenerationJob(terminalJob.id, {
    webhookAttempts: webhook.attempts,
    webhookLastStatus: webhook.lastStatus
  });
  return { job: withWebhook, action: "processed" };
}
/**
 * Tells whether a job may be attempted again.
 *
 * A job qualifies when its error is flagged `retryable` and its attempt count
 * (0 when unset) is below `maxAttempts` (3 when unset or 0).
 *
 * @param job - The job to inspect.
 * @returns `true` when another attempt is allowed.
 */
function canRetry(job: GenerationJob): boolean {
  if (!job.error?.retryable) {
    return false;
  }
  const used = job.attempts || 0;
  const allowed = job.maxAttempts || 3;
  return used < allowed;
}
/**
 * Computes the back-off delay before the next retry.
 *
 * The delay starts at `ZHINIAN_WORKER_RETRY_BASE_MS` (default 10 000 ms),
 * doubles with every attempt after the first, and is capped at
 * `ZHINIAN_WORKER_RETRY_MAX_MS` (default 5 minutes).
 *
 * @param attempts - Attempt count the delay is computed for; values below 1 behave like 1.
 * @returns The delay in milliseconds.
 */
function retryDelayMs(attempts: number): number {
  const baseMs = readPositiveInt("ZHINIAN_WORKER_RETRY_BASE_MS", 10_000);
  const capMs = readPositiveInt("ZHINIAN_WORKER_RETRY_MAX_MS", 5 * 60 * 1000);
  const doublings = Math.max(0, attempts - 1);
  const uncappedMs = baseMs * Math.pow(2, doublings);
  return Math.min(capMs, uncappedMs);
}
/**
 * Delay, in milliseconds, added to "now" when an unfinished job is released
 * and rescheduled. Read from `ZHINIAN_WORKER_POLL_INTERVAL_MS`, default 5 000.
 */
function workerPollIntervalMs(): number {
  const defaultMs = 5_000;
  return readPositiveInt("ZHINIAN_WORKER_POLL_INTERVAL_MS", defaultMs);
}
/**
 * Lock timeout, in milliseconds, passed as `lockTimeoutMs` when claiming jobs.
 * Read from `ZHINIAN_WORKER_LOCK_TIMEOUT_MS`, default 5 minutes.
 */
function workerLockTimeoutMs(): number {
  const fiveMinutesMs = 5 * 60 * 1000;
  return readPositiveInt("ZHINIAN_WORKER_LOCK_TIMEOUT_MS", fiveMinutesMs);
}
/**
 * Default number of jobs to claim per tick. Read from
 * `ZHINIAN_WORKER_BATCH_SIZE` (default 3) and clamped to the range 1..20.
 */
function workerBatchSize(): number {
  const configured = readPositiveInt("ZHINIAN_WORKER_BATCH_SIZE", 3);
  if (configured > 20) {
    return 20;
  }
  if (configured < 1) {
    return 1;
  }
  return configured;
}
/**
 * Origin used when a tick runs without a request or explicit origin.
 *
 * Takes the first non-empty value of `NEXT_PUBLIC_APP_URL`,
 * `ZHINIAN_PUBLIC_BASE_URL`, and `http://127.0.0.1:3000`, then drops a single
 * trailing slash if there is one.
 */
function workerOrigin(): string {
  const { NEXT_PUBLIC_APP_URL: appUrl, ZHINIAN_PUBLIC_BASE_URL: publicBaseUrl } = process.env;
  const configured = appUrl || publicBaseUrl || "http://127.0.0.1:3000";
  return configured.endsWith("/") ? configured.slice(0, -1) : configured;
}
/**
 * Reads an environment variable as a positive integer.
 *
 * The value is converted with `Number` and rounded down. The fallback is
 * returned when the variable is unset, empty, not a finite number, or rounds
 * down to less than 1 (for example "0.5"), so the result is never 0 unless the
 * fallback itself is.
 *
 * @param name - Name of the environment variable.
 * @param fallback - Value returned when the variable holds no positive integer.
 * @returns The parsed integer (>= 1) or `fallback`.
 */
function readPositiveInt(name: string, fallback: number): number {
  // Floor before validating: checking `> 0` first would let 0 < x < 1 through as 0.
  const parsed = Math.floor(Number(process.env[name]));
  return Number.isFinite(parsed) && parsed >= 1 ? parsed : fallback;
}