132 lines
5.0 KiB
PL/PgSQL
132 lines
5.0 KiB
PL/PgSQL
create table if not exists assets (
|
|
id text primary key,
|
|
owner_id text not null,
|
|
kind text not null,
|
|
name text not null,
|
|
url text not null,
|
|
storage_path text,
|
|
source text not null,
|
|
tags text[] not null default '{}',
|
|
metadata jsonb not null default '{}'::jsonb,
|
|
created_at timestamptz not null default now(),
|
|
updated_at timestamptz not null default now()
|
|
);
|
|
|
|
create table if not exists generation_jobs (
|
|
id text primary key,
|
|
owner_id text not null,
|
|
external_client_id text,
|
|
capability text not null,
|
|
provider text not null,
|
|
req_key text not null,
|
|
status text not null,
|
|
prompt text,
|
|
input_asset_ids text[] not null default '{}',
|
|
input_urls text[] not null default '{}',
|
|
output_asset_ids text[] not null default '{}',
|
|
provider_task_id text,
|
|
request_payload jsonb not null default '{}'::jsonb,
|
|
response_payload jsonb,
|
|
error jsonb,
|
|
retry_of text,
|
|
idempotency_key text,
|
|
idempotency_fingerprint text,
|
|
priority integer not null default 0,
|
|
attempts integer not null default 0,
|
|
max_attempts integer not null default 3,
|
|
scheduled_at timestamptz not null default now(),
|
|
locked_at timestamptz,
|
|
locked_by text,
|
|
started_at timestamptz,
|
|
completed_at timestamptz,
|
|
webhook_url text,
|
|
webhook_attempts integer not null default 0,
|
|
webhook_last_status jsonb,
|
|
created_at timestamptz not null default now(),
|
|
updated_at timestamptz not null default now()
|
|
);
|
|
|
|
alter table generation_jobs add column if not exists external_client_id text;
|
|
alter table generation_jobs add column if not exists idempotency_key text;
|
|
alter table generation_jobs add column if not exists idempotency_fingerprint text;
|
|
alter table generation_jobs add column if not exists priority integer not null default 0;
|
|
alter table generation_jobs add column if not exists attempts integer not null default 0;
|
|
alter table generation_jobs add column if not exists max_attempts integer not null default 3;
|
|
alter table generation_jobs add column if not exists scheduled_at timestamptz not null default now();
|
|
alter table generation_jobs add column if not exists locked_at timestamptz;
|
|
alter table generation_jobs add column if not exists locked_by text;
|
|
alter table generation_jobs add column if not exists started_at timestamptz;
|
|
alter table generation_jobs add column if not exists completed_at timestamptz;
|
|
alter table generation_jobs add column if not exists webhook_url text;
|
|
alter table generation_jobs add column if not exists webhook_attempts integer not null default 0;
|
|
alter table generation_jobs add column if not exists webhook_last_status jsonb;
|
|
|
|
create table if not exists usage_events (
|
|
id text primary key,
|
|
owner_id text not null,
|
|
job_id text not null references generation_jobs(id) on delete cascade,
|
|
capability text not null,
|
|
quantity integer not null default 1,
|
|
estimated_unit text not null default 'image',
|
|
created_at timestamptz not null default now()
|
|
);
|
|
|
|
create table if not exists projects (
|
|
id text primary key,
|
|
owner_id text not null,
|
|
name text not null,
|
|
brief text not null default '',
|
|
type text not null default 'custom',
|
|
asset_ids text[] not null default '{}',
|
|
created_at timestamptz not null default now(),
|
|
updated_at timestamptz not null default now()
|
|
);
|
|
|
|
create index if not exists assets_owner_created_idx on assets(owner_id, created_at desc);
|
|
create index if not exists generation_jobs_owner_created_idx on generation_jobs(owner_id, created_at desc);
|
|
create index if not exists generation_jobs_status_idx on generation_jobs(status);
|
|
create index if not exists generation_jobs_claim_idx on generation_jobs(status, scheduled_at, locked_at, priority desc);
|
|
create index if not exists generation_jobs_external_client_idx on generation_jobs(owner_id, external_client_id, created_at desc);
|
|
create unique index if not exists generation_jobs_idempotency_idx
|
|
on generation_jobs(owner_id, external_client_id, idempotency_key)
|
|
where external_client_id is not null and idempotency_key is not null;
|
|
create index if not exists usage_events_owner_created_idx on usage_events(owner_id, created_at desc);
|
|
|
|
create or replace function claim_generation_jobs(
|
|
p_worker_id text,
|
|
p_limit integer default 1,
|
|
p_lock_timeout_seconds integer default 300
|
|
)
|
|
returns setof generation_jobs
|
|
language plpgsql
|
|
as $$
|
|
declare
|
|
v_now timestamptz := now();
|
|
begin
|
|
return query
|
|
with candidates as (
|
|
select id
|
|
from generation_jobs
|
|
where status in ('queued', 'running')
|
|
and coalesce(scheduled_at, created_at) <= v_now
|
|
and (
|
|
locked_at is null
|
|
or locked_at < v_now - make_interval(secs => p_lock_timeout_seconds)
|
|
)
|
|
order by coalesce(priority, 0) desc, coalesce(scheduled_at, created_at) asc, created_at asc
|
|
limit greatest(1, least(coalesce(p_limit, 1), 20))
|
|
for update skip locked
|
|
),
|
|
updated as (
|
|
update generation_jobs
|
|
set locked_at = v_now,
|
|
locked_by = p_worker_id,
|
|
started_at = coalesce(generation_jobs.started_at, v_now),
|
|
updated_at = v_now
|
|
where id in (select id from candidates)
|
|
returning generation_jobs.*
|
|
)
|
|
select * from updated;
|
|
end;
|
|
$$;
|