-- Schema for assets, generation jobs, usage events and projects, plus the
-- function workers call to claim jobs.
-- Every statement is idempotent (IF NOT EXISTS / OR REPLACE), so the script
-- can be re-run against an existing database.

-- ---------------------------------------------------------------------------
-- Tables
-- ---------------------------------------------------------------------------

create table if not exists assets (
    id           text        primary key,
    owner_id     text        not null,
    kind         text        not null,
    name         text        not null,
    url          text        not null,
    storage_path text,
    source       text        not null,
    tags         text[]      not null default '{}',
    metadata     jsonb       not null default '{}'::jsonb,
    created_at   timestamptz not null default now(),
    updated_at   timestamptz not null default now()
);

create table if not exists generation_jobs (
    id                      text        primary key,
    owner_id                text        not null,
    external_client_id      text,
    capability              text        not null,
    provider                text        not null,
    req_key                 text        not null,
    status                  text        not null,
    prompt                  text,
    input_asset_ids         text[]      not null default '{}',
    input_urls              text[]      not null default '{}',
    output_asset_ids        text[]      not null default '{}',
    provider_task_id        text,
    request_payload         jsonb       not null default '{}'::jsonb,
    response_payload        jsonb,
    error                   jsonb,
    retry_of                text,
    idempotency_key         text,
    idempotency_fingerprint text,
    priority                integer     not null default 0,
    attempts                integer     not null default 0,
    max_attempts            integer     not null default 3,
    scheduled_at            timestamptz not null default now(),
    locked_at               timestamptz,
    locked_by               text,
    started_at              timestamptz,
    completed_at            timestamptz,
    webhook_url             text,
    webhook_attempts        integer     not null default 0,
    webhook_last_status     jsonb,
    created_at              timestamptz not null default now(),
    updated_at              timestamptz not null default now()
);

-- Bring a generation_jobs table that already existed (so the CREATE above was
-- skipped) up to the current column set. Columns that are already present are
-- left untouched, including their existing nullability and defaults.
alter table generation_jobs
    add column if not exists external_client_id      text,
    add column if not exists idempotency_key         text,
    add column if not exists idempotency_fingerprint text,
    add column if not exists priority                integer     not null default 0,
    add column if not exists attempts                integer     not null default 0,
    add column if not exists max_attempts            integer     not null default 3,
    add column if not exists scheduled_at            timestamptz not null default now(),
    add column if not exists locked_at               timestamptz,
    add column if not exists locked_by               text,
    add column if not exists started_at              timestamptz,
    add column if not exists completed_at            timestamptz,
    add column if not exists webhook_url             text,
    add column if not exists webhook_attempts        integer     not null default 0,
    add column if not exists webhook_last_status     jsonb;

create table if not exists usage_events (
    id             text        primary key,
    owner_id       text        not null,
    -- Usage rows are removed together with the job they belong to.
    job_id         text        not null references generation_jobs(id) on delete cascade,
    capability     text        not null,
    quantity       integer     not null default 1,
    estimated_unit text        not null default 'image',
    created_at     timestamptz not null default now()
);

create table if not exists projects (
    id         text        primary key,
    owner_id   text        not null,
    name       text        not null,
    brief      text        not null default '',
    type       text        not null default 'custom',
    asset_ids  text[]      not null default '{}',
    created_at timestamptz not null default now(),
    updated_at timestamptz not null default now()
);

-- ---------------------------------------------------------------------------
-- Indexes
-- ---------------------------------------------------------------------------

create index if not exists assets_owner_created_idx
    on assets (owner_id, created_at desc);

create index if not exists generation_jobs_owner_created_idx
    on generation_jobs (owner_id, created_at desc);

create index if not exists generation_jobs_status_idx
    on generation_jobs (status);

-- Covers the columns filtered and ordered on by claim_generation_jobs().
create index if not exists generation_jobs_claim_idx
    on generation_jobs (status, scheduled_at, locked_at, priority desc);

create index if not exists generation_jobs_external_client_idx
    on generation_jobs (owner_id, external_client_id, created_at desc);

-- Uniqueness is enforced only for rows that carry both an external client id
-- and an idempotency key; rows missing either are unconstrained.
create unique index if not exists generation_jobs_idempotency_idx
    on generation_jobs (owner_id, external_client_id, idempotency_key)
    where external_client_id is not null
      and idempotency_key is not null;

create index if not exists usage_events_owner_created_idx
    on usage_events (owner_id, created_at desc);

-- Index the referencing column of the ON DELETE CASCADE foreign key so that
-- deleting a job does not have to scan all of usage_events.
create index if not exists usage_events_job_idx
    on usage_events (job_id);

-- ---------------------------------------------------------------------------
-- Functions
-- ---------------------------------------------------------------------------

-- claim_generation_jobs: lock a batch of due jobs for one worker and return
-- the locked rows.
--
-- Parameters:
--   p_worker_id             value written to locked_by on every claimed row.
--   p_limit                 maximum rows to claim; NULL is treated as 1 and
--                           the value is clamped to the range 1..20.
--   p_lock_timeout_seconds  age after which an existing lock counts as stale
--                           and may be taken over; NULL is treated as the
--                           declared default of 300.
--
-- A job is eligible when its status is 'queued' or 'running', its scheduled
-- time is not in the future, and it is either unlocked or its lock is stale.
-- Eligible jobs are taken highest priority first, then earliest scheduled,
-- then earliest created.
--
-- Returns the claimed rows (after the update) in that same order. The
-- function sets locked_at, locked_by, updated_at and, if still NULL,
-- started_at. It does not modify status or attempts.
create or replace function claim_generation_jobs(
    p_worker_id            text,
    p_limit                integer default 1,
    p_lock_timeout_seconds integer default 300
)
returns setof generation_jobs
language plpgsql
as $$
declare
    v_now timestamptz := now();
    -- Locks taken before this instant are considered abandoned. The coalesce
    -- matters: make_interval() yields NULL for a NULL argument, which would
    -- make the comparison below NULL and silently stop stale locks from ever
    -- being reclaimed.
    v_stale_before timestamptz :=
        v_now - make_interval(secs => coalesce(p_lock_timeout_seconds, 300));
begin
    return query
    with candidates as (
        select id
        from generation_jobs
        where status in ('queued', 'running')
          -- scheduled_at and priority are declared NOT NULL in this script;
          -- the coalesce calls are kept as a defensive fallback.
          and coalesce(scheduled_at, created_at) <= v_now
          and (
              locked_at is null
              or locked_at < v_stale_before
          )
        order by
            coalesce(priority, 0) desc,
            coalesce(scheduled_at, created_at) asc,
            created_at asc
        limit greatest(1, least(coalesce(p_limit, 1), 20))
        -- Rows already locked by a concurrent claimer are skipped instead of
        -- waited on.
        for update skip locked
    ),
    updated as (
        update generation_jobs
        set locked_at  = v_now,
            locked_by  = p_worker_id,
            -- Preserve the first start time when a job is re-claimed.
            started_at = coalesce(generation_jobs.started_at, v_now),
            updated_at = v_now
        where id in (select id from candidates)
        returning generation_jobs.*
    )
    -- `*` is intentional: the result must match the generation_jobs row type.
    -- UPDATE ... RETURNING emits rows in no guaranteed order, so the claim
    -- ordering is re-applied here.
    select *
    from updated
    order by
        coalesce(priority, 0) desc,
        coalesce(scheduled_at, created_at) asc,
        created_at asc;
end;
$$;