feat: add task workflow and asset downloads
This commit is contained in:
151
lib/server/task-manager.ts
Normal file
151
lib/server/task-manager.ts
Normal file
@@ -0,0 +1,151 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import {
|
||||
claimGenerationJobs,
|
||||
clearGenerationJobLock,
|
||||
getGenerationJob,
|
||||
updateGenerationJob
|
||||
} from "@/lib/server/data-store";
|
||||
import { advanceImageJob } from "@/lib/server/generation-service";
|
||||
import { advanceVideoJob } from "@/lib/server/video-generation-service";
|
||||
import { requestOrigin } from "@/lib/server/runtime";
|
||||
import { deliverJobWebhook } from "@/lib/server/webhook";
|
||||
import type { GenerationJob, GenerationStatus } from "@/lib/types";
|
||||
|
||||
export type WorkerTickResult = {
|
||||
workerId: string;
|
||||
claimed: number;
|
||||
jobs: Array<{
|
||||
id: string;
|
||||
status: GenerationStatus;
|
||||
action: "processed" | "retry_scheduled" | "released" | "failed";
|
||||
error?: string;
|
||||
}>;
|
||||
};
|
||||
|
||||
const TERMINAL_STATUSES = new Set<GenerationStatus>(["succeeded", "failed", "expired", "cancelled"]);
|
||||
|
||||
export async function runWorkerTick(input: {
|
||||
request?: Request;
|
||||
origin?: string;
|
||||
workerId?: string;
|
||||
limit?: number;
|
||||
} = {}): Promise<WorkerTickResult> {
|
||||
const workerId = input.workerId || `worker-${randomUUID()}`;
|
||||
const origin = input.origin || (input.request ? requestOrigin(input.request) : workerOrigin());
|
||||
const jobs = await claimGenerationJobs({
|
||||
workerId,
|
||||
limit: input.limit || workerBatchSize(),
|
||||
lockTimeoutMs: workerLockTimeoutMs()
|
||||
});
|
||||
const result: WorkerTickResult = {
|
||||
workerId,
|
||||
claimed: jobs.length,
|
||||
jobs: []
|
||||
};
|
||||
|
||||
for (const job of jobs) {
|
||||
try {
|
||||
const advanced = await advanceClaimedJob(job, origin);
|
||||
const settled = await settleAdvancedJob(advanced);
|
||||
result.jobs.push({
|
||||
id: settled.job.id,
|
||||
status: settled.job.status,
|
||||
action: settled.action
|
||||
});
|
||||
} catch (error) {
|
||||
const failed = await updateGenerationJob(job.id, {
|
||||
status: "failed",
|
||||
error: {
|
||||
message: error instanceof Error ? error.message : String(error),
|
||||
retryable: true
|
||||
}
|
||||
});
|
||||
const settled = await settleAdvancedJob(failed);
|
||||
result.jobs.push({
|
||||
id: settled.job.id,
|
||||
status: settled.job.status,
|
||||
action: "failed",
|
||||
error: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
async function advanceClaimedJob(job: GenerationJob, origin: string): Promise<GenerationJob> {
|
||||
if (job.capability === "video.generate") return advanceVideoJob(job.id, origin);
|
||||
return advanceImageJob(job.id, origin);
|
||||
}
|
||||
|
||||
async function settleAdvancedJob(job: GenerationJob): Promise<{
|
||||
job: GenerationJob;
|
||||
action: WorkerTickResult["jobs"][number]["action"];
|
||||
}> {
|
||||
const current = await getGenerationJob(job.id) || job;
|
||||
const now = new Date();
|
||||
|
||||
if (current.status === "failed" && canRetry(current)) {
|
||||
const attempts = (current.attempts || 0) + 1;
|
||||
const scheduledAt = new Date(now.getTime() + retryDelayMs(attempts)).toISOString();
|
||||
const retryJob = await clearGenerationJobLock(current.id, {
|
||||
status: "queued",
|
||||
attempts,
|
||||
scheduledAt
|
||||
}, { clearProviderTaskId: true });
|
||||
return { job: retryJob, action: "retry_scheduled" };
|
||||
}
|
||||
|
||||
if (TERMINAL_STATUSES.has(current.status)) {
|
||||
const terminalJob = await clearGenerationJobLock(current.id, {
|
||||
attempts: current.status === "failed" ? (current.attempts || 0) + 1 : current.attempts,
|
||||
completedAt: current.completedAt || now.toISOString()
|
||||
});
|
||||
const webhook = await deliverJobWebhook(terminalJob);
|
||||
if (webhook.lastStatus) {
|
||||
const withWebhook = await updateGenerationJob(terminalJob.id, {
|
||||
webhookAttempts: webhook.attempts,
|
||||
webhookLastStatus: webhook.lastStatus
|
||||
});
|
||||
return { job: withWebhook, action: "processed" };
|
||||
}
|
||||
return { job: terminalJob, action: "processed" };
|
||||
}
|
||||
|
||||
const scheduledAt = new Date(now.getTime() + workerPollIntervalMs()).toISOString();
|
||||
const released = await clearGenerationJobLock(current.id, { scheduledAt });
|
||||
return { job: released, action: "released" };
|
||||
}
|
||||
|
||||
function canRetry(job: GenerationJob): boolean {
|
||||
const attempts = job.attempts || 0;
|
||||
const maxAttempts = job.maxAttempts || 3;
|
||||
return Boolean(job.error?.retryable) && attempts < maxAttempts;
|
||||
}
|
||||
|
||||
function retryDelayMs(attempts: number): number {
|
||||
const base = readPositiveInt("ZHINIAN_WORKER_RETRY_BASE_MS", 10_000);
|
||||
const max = readPositiveInt("ZHINIAN_WORKER_RETRY_MAX_MS", 5 * 60 * 1000);
|
||||
return Math.min(max, base * 2 ** Math.max(0, attempts - 1));
|
||||
}
|
||||
|
||||
function workerPollIntervalMs(): number {
|
||||
return readPositiveInt("ZHINIAN_WORKER_POLL_INTERVAL_MS", 5_000);
|
||||
}
|
||||
|
||||
function workerLockTimeoutMs(): number {
|
||||
return readPositiveInt("ZHINIAN_WORKER_LOCK_TIMEOUT_MS", 5 * 60 * 1000);
|
||||
}
|
||||
|
||||
function workerBatchSize(): number {
|
||||
return Math.max(1, Math.min(readPositiveInt("ZHINIAN_WORKER_BATCH_SIZE", 3), 20));
|
||||
}
|
||||
|
||||
function workerOrigin(): string {
|
||||
return (process.env.NEXT_PUBLIC_APP_URL || process.env.ZHINIAN_PUBLIC_BASE_URL || "http://127.0.0.1:3000").replace(/\/$/, "");
|
||||
}
|
||||
|
||||
function readPositiveInt(name: string, fallback: number): number {
|
||||
const parsed = Number(process.env[name]);
|
||||
return Number.isFinite(parsed) && parsed > 0 ? Math.floor(parsed) : fallback;
|
||||
}
|
||||
Reference in New Issue
Block a user