166 lines
5.4 KiB
TypeScript
166 lines
5.4 KiB
TypeScript
import { randomUUID } from "node:crypto";
|
|
import {
|
|
claimGenerationJobs,
|
|
clearGenerationJobLock,
|
|
getGenerationJob,
|
|
updateGenerationJob
|
|
} from "@/lib/server/data-store";
|
|
import { advanceImageJob } from "@/lib/server/generation-service";
|
|
import { advanceVideoJob } from "@/lib/server/video-generation-service";
|
|
import { recordAppLog } from "@/lib/server/log-manager";
|
|
import { requestOrigin } from "@/lib/server/runtime";
|
|
import { deliverJobWebhook } from "@/lib/server/webhook";
|
|
import type { GenerationJob, GenerationStatus } from "@/lib/types";
|
|
|
|
export type WorkerTickResult = {
|
|
workerId: string;
|
|
claimed: number;
|
|
jobs: Array<{
|
|
id: string;
|
|
status: GenerationStatus;
|
|
action: "processed" | "retry_scheduled" | "released" | "failed";
|
|
error?: string;
|
|
}>;
|
|
};
|
|
|
|
const TERMINAL_STATUSES = new Set<GenerationStatus>(["succeeded", "failed", "expired", "cancelled"]);
|
|
|
|
export async function runWorkerTick(input: {
|
|
request?: Request;
|
|
origin?: string;
|
|
workerId?: string;
|
|
limit?: number;
|
|
} = {}): Promise<WorkerTickResult> {
|
|
const workerId = input.workerId || `worker-${randomUUID()}`;
|
|
const origin = input.origin || (input.request ? requestOrigin(input.request) : workerOrigin());
|
|
const jobs = await claimGenerationJobs({
|
|
workerId,
|
|
limit: input.limit || workerBatchSize(),
|
|
lockTimeoutMs: workerLockTimeoutMs()
|
|
});
|
|
const result: WorkerTickResult = {
|
|
workerId,
|
|
claimed: jobs.length,
|
|
jobs: []
|
|
};
|
|
|
|
for (const job of jobs) {
|
|
try {
|
|
const advanced = await advanceClaimedJob(job, origin);
|
|
const settled = await settleAdvancedJob(advanced);
|
|
result.jobs.push({
|
|
id: settled.job.id,
|
|
status: settled.job.status,
|
|
action: settled.action
|
|
});
|
|
} catch (error) {
|
|
await recordAppLog({
|
|
level: "error",
|
|
source: "worker.job",
|
|
message: `任务处理失败:${job.id}`,
|
|
error,
|
|
details: {
|
|
jobId: job.id,
|
|
capability: job.capability,
|
|
provider: job.provider,
|
|
attempts: job.attempts,
|
|
workerId
|
|
}
|
|
}).catch(() => undefined);
|
|
const failed = await updateGenerationJob(job.id, {
|
|
status: "failed",
|
|
error: {
|
|
message: error instanceof Error ? error.message : String(error),
|
|
retryable: true
|
|
}
|
|
});
|
|
const settled = await settleAdvancedJob(failed);
|
|
result.jobs.push({
|
|
id: settled.job.id,
|
|
status: settled.job.status,
|
|
action: "failed",
|
|
error: error instanceof Error ? error.message : String(error)
|
|
});
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
async function advanceClaimedJob(job: GenerationJob, origin: string): Promise<GenerationJob> {
|
|
if (job.capability === "video.generate") return advanceVideoJob(job.id, origin);
|
|
return advanceImageJob(job.id, origin);
|
|
}
|
|
|
|
async function settleAdvancedJob(job: GenerationJob): Promise<{
|
|
job: GenerationJob;
|
|
action: WorkerTickResult["jobs"][number]["action"];
|
|
}> {
|
|
const current = await getGenerationJob(job.id) || job;
|
|
const now = new Date();
|
|
|
|
if (current.status === "failed" && canRetry(current)) {
|
|
const attempts = (current.attempts || 0) + 1;
|
|
const scheduledAt = new Date(now.getTime() + retryDelayMs(attempts)).toISOString();
|
|
const retryJob = await clearGenerationJobLock(current.id, {
|
|
status: "queued",
|
|
attempts,
|
|
scheduledAt
|
|
}, { clearProviderTaskId: true });
|
|
return { job: retryJob, action: "retry_scheduled" };
|
|
}
|
|
|
|
if (TERMINAL_STATUSES.has(current.status)) {
|
|
const terminalJob = await clearGenerationJobLock(current.id, {
|
|
attempts: current.status === "failed" ? (current.attempts || 0) + 1 : current.attempts,
|
|
completedAt: current.completedAt || now.toISOString()
|
|
});
|
|
const webhook = await deliverJobWebhook(terminalJob);
|
|
if (webhook.lastStatus) {
|
|
const withWebhook = await updateGenerationJob(terminalJob.id, {
|
|
webhookAttempts: webhook.attempts,
|
|
webhookLastStatus: webhook.lastStatus
|
|
});
|
|
return { job: withWebhook, action: "processed" };
|
|
}
|
|
return { job: terminalJob, action: "processed" };
|
|
}
|
|
|
|
const scheduledAt = new Date(now.getTime() + workerPollIntervalMs()).toISOString();
|
|
const released = await clearGenerationJobLock(current.id, { scheduledAt });
|
|
return { job: released, action: "released" };
|
|
}
|
|
|
|
function canRetry(job: GenerationJob): boolean {
|
|
const attempts = job.attempts || 0;
|
|
const maxAttempts = job.maxAttempts || 3;
|
|
return Boolean(job.error?.retryable) && attempts < maxAttempts;
|
|
}
|
|
|
|
function retryDelayMs(attempts: number): number {
|
|
const base = readPositiveInt("ZHINIAN_WORKER_RETRY_BASE_MS", 10_000);
|
|
const max = readPositiveInt("ZHINIAN_WORKER_RETRY_MAX_MS", 5 * 60 * 1000);
|
|
return Math.min(max, base * 2 ** Math.max(0, attempts - 1));
|
|
}
|
|
|
|
function workerPollIntervalMs(): number {
|
|
return readPositiveInt("ZHINIAN_WORKER_POLL_INTERVAL_MS", 5_000);
|
|
}
|
|
|
|
function workerLockTimeoutMs(): number {
|
|
return readPositiveInt("ZHINIAN_WORKER_LOCK_TIMEOUT_MS", 5 * 60 * 1000);
|
|
}
|
|
|
|
function workerBatchSize(): number {
|
|
return Math.max(1, Math.min(readPositiveInt("ZHINIAN_WORKER_BATCH_SIZE", 3), 20));
|
|
}
|
|
|
|
function workerOrigin(): string {
|
|
return (process.env.NEXT_PUBLIC_APP_URL || process.env.ZHINIAN_PUBLIC_BASE_URL || "http://127.0.0.1:3000").replace(/\/$/, "");
|
|
}
|
|
|
|
function readPositiveInt(name: string, fallback: number): number {
|
|
const parsed = Number(process.env[name]);
|
|
return Number.isFinite(parsed) && parsed > 0 ? Math.floor(parsed) : fallback;
|
|
}
|