"""Projects, Graph Releases, and Ontology Schema endpoints.""" import json from pathlib import Path from typing import Any from fastapi import APIRouter, Depends, HTTPException from app.auth import CurrentUser from app.config import settings from app.db import get_conn from app.project_context import ProjectContext, get_project_context router = APIRouter() ROOT_DIR = Path(__file__).resolve().parents[2] def _as_dict(value: Any) -> dict[str, Any]: if isinstance(value, dict): return value if isinstance(value, str): try: parsed = json.loads(value) return parsed if isinstance(parsed, dict) else {} except json.JSONDecodeError: return {} return {} def _named_items(value: Any) -> list[tuple[str, dict[str, Any]]]: if isinstance(value, dict): return [(str(name), _as_dict(meta)) for name, meta in value.items()] if isinstance(value, list): items: list[tuple[str, dict[str, Any]]] = [] for index, item in enumerate(value): meta = _as_dict(item) name = str(meta.get("name") or meta.get("entity_type") or meta.get("relation_type") or f"Item{index + 1}") items.append((name, meta)) return items return [] def _field_names(value: Any) -> list[str]: if isinstance(value, dict): return [str(key) for key in value.keys()] if isinstance(value, list): return [str(item) for item in value] return [] def _schema_source_from_json(schema: dict[str, Any]) -> str: namespace = str(schema.get("namespace") or "schema") version = str(schema.get("version") or "") lines = [f"namespace {namespace}"] if version: lines.append(f"version {version}") purpose = str(schema.get("purpose") or "").strip() if purpose: lines.extend(["", f"// {purpose}"]) for name, meta in _named_items(schema.get("entity_types")): definition = str(meta.get("definition") or meta.get("description") or "").strip() lines.extend(["", f"entity {name}" + (f" // {definition}" if definition else "")]) primary_key = str(meta.get("primary_key") or meta.get("primaryKey") or "").strip() if primary_key: lines.append(f" primary_key {primary_key}") for field in _field_names(meta.get("fields")): lines.append(f" property {field}: Text") for name, meta in _named_items(schema.get("relation_types")): source = str(meta.get("from") or meta.get("source") or "").strip() target = str(meta.get("to") or meta.get("target") or "").strip() definition = str(meta.get("definition") or meta.get("description") or "").strip() arrow = f": {source} -> {target}" if source or target else "" lines.extend(["", f"relation {name}{arrow}" + (f" // {definition}" if definition else "")]) for prop in _field_names(meta.get("properties")): lines.append(f" property {prop}: Text") enums = _as_dict(schema.get("enums")) if enums: lines.append("") for name, values in enums.items(): lines.append(f"enum {name} = {' | '.join(_field_names(values))}") recipes = _as_dict(schema.get("query_recipes")) if recipes: lines.append("") lines.append("// query_recipes") for name, query in recipes.items(): lines.append(f"// {name}: {query}") return "\n".join(lines).strip() + "\n" def _schema_source_from_file(row: dict[str, Any], schema: dict[str, Any]) -> str | None: project_id = str(row.get("project_id") or "").strip() namespace = str(row.get("namespace") or schema.get("namespace") or "").strip() version = str(schema.get("version") or row.get("version") or "").strip() if not project_id: return None schema_dir = ROOT_DIR / "schema搭建" / project_id version_slug = version.replace(".", "_") candidates = [ schema_dir / f"{namespace}_schema.current.dsl.md", schema_dir / f"{namespace}_schema.v{version_slug}.dsl.md", schema_dir / f"{project_id}_schema.current.dsl.md", schema_dir / f"{project_id}_schema.v{version_slug}.dsl.md", ] candidates.extend(sorted(schema_dir.glob("*.current.dsl.md"))) candidates.extend(sorted(schema_dir.glob(f"*v{version_slug}*.dsl.md"))) candidates.extend(sorted(schema_dir.glob("*.dsl.md"))) seen: set[Path] = set() for path in candidates: if path in seen: continue seen.add(path) if path.exists() and path.is_file(): return path.read_text(encoding="utf-8") return None def _attach_schema_source(row: Any) -> dict[str, Any]: data = dict(row) schema = _as_dict(data.get("schema_jsonb")) data["schema_source"] = _schema_source_from_file(data, schema) or _schema_source_from_json(schema) data["schema_source_format"] = "dsl" return data @router.get("/projects") async def list_projects(_user: CurrentUser): async with get_conn() as conn: async with conn.cursor() as cur: await cur.execute( f"""SELECT * FROM {settings.db_schema}.projects WHERE status <> 'archived' ORDER BY CASE project_id WHEN 'CityGraph-new2' THEN 1 WHEN 'travel_agency' THEN 2 ELSE 99 END, created_at DESC""" ) return await cur.fetchall() @router.post("/projects") async def create_project(body: dict, _user: CurrentUser): s = settings.db_schema project_id = str(body.get("project_id") or "").strip() if not project_id: raise HTTPException(400, "project_id required") tenant_id = str(body.get("tenant_id") or project_id).strip() display_name = str(body.get("display_name") or body.get("name") or project_id).strip() async with get_conn() as conn: async with conn.cursor() as cur: await cur.execute( f"""INSERT INTO {s}.projects ( tenant_id, project_id, display_name, description, status, default_namespace, metadata_jsonb, created_by, updated_at ) VALUES ( %(tenant_id)s, %(project_id)s, %(display_name)s, %(description)s, %(status)s, %(default_namespace)s, %(metadata_jsonb)s, %(created_by)s, now() ) ON CONFLICT (tenant_id, project_id) DO UPDATE SET tenant_id=EXCLUDED.tenant_id, display_name=EXCLUDED.display_name, description=EXCLUDED.description, status=EXCLUDED.status, default_namespace=EXCLUDED.default_namespace, metadata_jsonb=EXCLUDED.metadata_jsonb, updated_at=now() RETURNING *""", { "project_id": project_id, "tenant_id": tenant_id, "display_name": display_name, "description": body.get("description"), "status": body.get("status") or "active", "default_namespace": body.get("default_namespace") or project_id, "metadata_jsonb": body.get("metadata_jsonb") or {}, "created_by": _user["username"], }, ) row = await cur.fetchone() await conn.commit() return row @router.get("/projects/{project_id}/graph-releases") async def list_graph_releases(project_id: str, _user: CurrentUser): async with get_conn() as conn: async with conn.cursor() as cur: await cur.execute( f"SELECT * FROM {settings.db_schema}.graph_releases " "WHERE project_id=%s AND status <> 'archived' " "ORDER BY " "CASE WHEN status='active' THEN 1 WHEN status='published' THEN 2 ELSE 99 END, " "created_at DESC", (project_id,), ) return await cur.fetchall() @router.post("/projects/{project_id}/graph-releases") async def create_graph_release(project_id: str, body: dict, _user: CurrentUser): s = settings.db_schema graph_name = str(body.get("graph_name") or f"{project_id}_graph").strip() alias = str(body.get("alias") or "active").strip() tenant_id = str(body.get("tenant_id") or "").strip() status = str(body.get("status") or "active").strip() async with get_conn() as conn: async with conn.cursor() as cur: if not tenant_id: await cur.execute( f"SELECT tenant_id FROM {s}.projects WHERE project_id=%s LIMIT 1", (project_id,), ) project = await cur.fetchone() tenant_id = project["tenant_id"] if project else project_id await cur.execute( f"""INSERT INTO {s}.graph_releases ( tenant_id, project_id, graph_release_id, graph_name, alias, status, metadata_jsonb, created_by, activated_at, updated_at ) VALUES ( %(tenant_id)s, %(project_id)s, %(graph_release_id)s, %(graph_name)s, %(alias)s, %(status)s, %(metadata_jsonb)s, %(created_by)s, now(), now() ) ON CONFLICT (tenant_id, project_id, alias) DO UPDATE SET graph_release_id=EXCLUDED.graph_release_id, graph_name=EXCLUDED.graph_name, status=EXCLUDED.status, metadata_jsonb=EXCLUDED.metadata_jsonb, activated_at=now(), updated_at=now() RETURNING *""", { "tenant_id": tenant_id, "project_id": project_id, "graph_release_id": body.get("graph_release_id") or f"{project_id}_{alias}", "graph_name": graph_name, "alias": alias, "status": status, "metadata_jsonb": body.get("metadata_jsonb") or {}, "created_by": _user["username"], }, ) row = await cur.fetchone() await conn.commit() return row @router.get("/ontology-schemas") async def list_ontology_schemas( _user: CurrentUser, ctx: ProjectContext = Depends(get_project_context), ): """List schema versions for the current project context.""" async with get_conn() as conn: async with conn.cursor() as cur: await cur.execute( f"""SELECT os.id, os.tenant_id, os.project_id, os.namespace, os.version, os.display_name, os.description, os.status, os.schema_jsonb, os.published_at, os.created_at, os.updated_at, gr.graph_release_id AS active_graph_release_id, gr.graph_name AS active_graph_name FROM {settings.db_schema}.ontology_schemas os LEFT JOIN {settings.db_schema}.graph_releases gr ON gr.schema_id=os.id AND gr.tenant_id=os.tenant_id AND gr.project_id=os.project_id AND gr.alias='active' AND gr.status <> 'archived' WHERE os.tenant_id=%s AND os.project_id=%s AND os.status <> 'archived' ORDER BY CASE WHEN os.status='active' THEN 1 ELSE 99 END, os.version DESC, os.updated_at DESC""", (ctx.tenant_id, ctx.project_id), ) rows = await cur.fetchall() return [_attach_schema_source(row) for row in rows] @router.get("/ontology-schemas/current") async def get_current_ontology_schema( _user: CurrentUser, ctx: ProjectContext = Depends(get_project_context), ): """Return the active graph release schema for the current project.""" async with get_conn() as conn: async with conn.cursor() as cur: await cur.execute( f"""SELECT os.*, gr.graph_release_id AS active_graph_release_id, gr.graph_name AS active_graph_name, gr.alias AS active_graph_alias FROM {settings.db_schema}.graph_releases gr JOIN {settings.db_schema}.ontology_schemas os ON os.id=gr.schema_id WHERE gr.tenant_id=%s AND gr.project_id=%s AND gr.alias='active' AND gr.status <> 'archived' AND os.status <> 'archived' ORDER BY gr.activated_at DESC NULLS LAST, gr.updated_at DESC LIMIT 1""", (ctx.tenant_id, ctx.project_id), ) row = await cur.fetchone() if row: return _attach_schema_source(row) await cur.execute( f"""SELECT os.*, NULL::text AS active_graph_release_id, NULL::text AS active_graph_name, NULL::text AS active_graph_alias FROM {settings.db_schema}.ontology_schemas os WHERE os.tenant_id=%s AND os.project_id=%s AND os.status <> 'archived' ORDER BY CASE WHEN os.status='active' THEN 1 ELSE 99 END, os.version DESC, os.updated_at DESC LIMIT 1""", (ctx.tenant_id, ctx.project_id), ) row = await cur.fetchone() if not row: raise HTTPException(404, "No ontology schema for current project") return _attach_schema_source(row) @router.get("/ontology-schemas/{schema_id}") async def get_ontology_schema( schema_id: int, _user: CurrentUser, ctx: ProjectContext = Depends(get_project_context), ): """Return one schema version for the current project context.""" async with get_conn() as conn: async with conn.cursor() as cur: await cur.execute( f"""SELECT os.*, gr.graph_release_id AS active_graph_release_id, gr.graph_name AS active_graph_name, gr.alias AS active_graph_alias FROM {settings.db_schema}.ontology_schemas os LEFT JOIN {settings.db_schema}.graph_releases gr ON gr.schema_id=os.id AND gr.tenant_id=os.tenant_id AND gr.project_id=os.project_id AND gr.alias='active' AND gr.status <> 'archived' WHERE os.id=%s AND os.tenant_id=%s AND os.project_id=%s AND os.status <> 'archived' LIMIT 1""", (schema_id, ctx.tenant_id, ctx.project_id), ) row = await cur.fetchone() if not row: raise HTTPException(404, "Schema not found for current project") return _attach_schema_source(row) @router.get("/health") async def health(): return {"status": "ok"}