381 lines
16 KiB
Python
381 lines
16 KiB
Python
"""Projects, Graph Releases, and Ontology Schema endpoints."""
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
|
|
from app.auth import CurrentUser
|
|
from app.config import settings
|
|
from app.db import get_conn
|
|
from app.project_context import ProjectContext, get_project_context
|
|
|
|
router = APIRouter()
|
|
ROOT_DIR = Path(__file__).resolve().parents[2]
|
|
|
|
|
|
def _as_dict(value: Any) -> dict[str, Any]:
|
|
if isinstance(value, dict):
|
|
return value
|
|
if isinstance(value, str):
|
|
try:
|
|
parsed = json.loads(value)
|
|
return parsed if isinstance(parsed, dict) else {}
|
|
except json.JSONDecodeError:
|
|
return {}
|
|
return {}
|
|
|
|
|
|
def _named_items(value: Any) -> list[tuple[str, dict[str, Any]]]:
|
|
if isinstance(value, dict):
|
|
return [(str(name), _as_dict(meta)) for name, meta in value.items()]
|
|
if isinstance(value, list):
|
|
items: list[tuple[str, dict[str, Any]]] = []
|
|
for index, item in enumerate(value):
|
|
meta = _as_dict(item)
|
|
name = str(meta.get("name") or meta.get("entity_type") or meta.get("relation_type") or f"Item{index + 1}")
|
|
items.append((name, meta))
|
|
return items
|
|
return []
|
|
|
|
|
|
def _field_names(value: Any) -> list[str]:
|
|
if isinstance(value, dict):
|
|
return [str(key) for key in value.keys()]
|
|
if isinstance(value, list):
|
|
return [str(item) for item in value]
|
|
return []
|
|
|
|
|
|
def _schema_source_from_json(schema: dict[str, Any]) -> str:
|
|
namespace = str(schema.get("namespace") or "schema")
|
|
version = str(schema.get("version") or "")
|
|
lines = [f"namespace {namespace}"]
|
|
if version:
|
|
lines.append(f"version {version}")
|
|
purpose = str(schema.get("purpose") or "").strip()
|
|
if purpose:
|
|
lines.extend(["", f"// {purpose}"])
|
|
|
|
for name, meta in _named_items(schema.get("entity_types")):
|
|
definition = str(meta.get("definition") or meta.get("description") or "").strip()
|
|
lines.extend(["", f"entity {name}" + (f" // {definition}" if definition else "")])
|
|
primary_key = str(meta.get("primary_key") or meta.get("primaryKey") or "").strip()
|
|
if primary_key:
|
|
lines.append(f" primary_key {primary_key}")
|
|
for field in _field_names(meta.get("fields")):
|
|
lines.append(f" property {field}: Text")
|
|
|
|
for name, meta in _named_items(schema.get("relation_types")):
|
|
source = str(meta.get("from") or meta.get("source") or "").strip()
|
|
target = str(meta.get("to") or meta.get("target") or "").strip()
|
|
definition = str(meta.get("definition") or meta.get("description") or "").strip()
|
|
arrow = f": {source} -> {target}" if source or target else ""
|
|
lines.extend(["", f"relation {name}{arrow}" + (f" // {definition}" if definition else "")])
|
|
for prop in _field_names(meta.get("properties")):
|
|
lines.append(f" property {prop}: Text")
|
|
|
|
enums = _as_dict(schema.get("enums"))
|
|
if enums:
|
|
lines.append("")
|
|
for name, values in enums.items():
|
|
lines.append(f"enum {name} = {' | '.join(_field_names(values))}")
|
|
|
|
recipes = _as_dict(schema.get("query_recipes"))
|
|
if recipes:
|
|
lines.append("")
|
|
lines.append("// query_recipes")
|
|
for name, query in recipes.items():
|
|
lines.append(f"// {name}: {query}")
|
|
return "\n".join(lines).strip() + "\n"
|
|
|
|
|
|
def _schema_source_from_file(row: dict[str, Any], schema: dict[str, Any]) -> str | None:
|
|
project_id = str(row.get("project_id") or "").strip()
|
|
namespace = str(row.get("namespace") or schema.get("namespace") or "").strip()
|
|
version = str(schema.get("version") or row.get("version") or "").strip()
|
|
if not project_id:
|
|
return None
|
|
schema_dir = ROOT_DIR / "schema搭建" / project_id
|
|
version_slug = version.replace(".", "_")
|
|
candidates = [
|
|
schema_dir / f"{namespace}_schema.current.dsl.md",
|
|
schema_dir / f"{namespace}_schema.v{version_slug}.dsl.md",
|
|
schema_dir / f"{project_id}_schema.current.dsl.md",
|
|
schema_dir / f"{project_id}_schema.v{version_slug}.dsl.md",
|
|
]
|
|
candidates.extend(sorted(schema_dir.glob("*.current.dsl.md")))
|
|
candidates.extend(sorted(schema_dir.glob(f"*v{version_slug}*.dsl.md")))
|
|
candidates.extend(sorted(schema_dir.glob("*.dsl.md")))
|
|
seen: set[Path] = set()
|
|
for path in candidates:
|
|
if path in seen:
|
|
continue
|
|
seen.add(path)
|
|
if path.exists() and path.is_file():
|
|
return path.read_text(encoding="utf-8")
|
|
return None
|
|
|
|
|
|
def _attach_schema_source(row: Any) -> dict[str, Any]:
|
|
data = dict(row)
|
|
schema = _as_dict(data.get("schema_jsonb"))
|
|
data["schema_source"] = _schema_source_from_file(data, schema) or _schema_source_from_json(schema)
|
|
data["schema_source_format"] = "dsl"
|
|
return data
|
|
|
|
|
|
@router.get("/projects")
|
|
async def list_projects(_user: CurrentUser):
|
|
async with get_conn() as conn:
|
|
async with conn.cursor() as cur:
|
|
await cur.execute(
|
|
f"""SELECT * FROM {settings.db_schema}.projects
|
|
WHERE status <> 'archived'
|
|
ORDER BY
|
|
CASE project_id
|
|
WHEN 'CityGraph-new2' THEN 1
|
|
WHEN 'travel_agency' THEN 2
|
|
ELSE 99
|
|
END,
|
|
created_at DESC"""
|
|
)
|
|
return await cur.fetchall()
|
|
|
|
|
|
@router.post("/projects")
|
|
async def create_project(body: dict, _user: CurrentUser):
|
|
s = settings.db_schema
|
|
project_id = str(body.get("project_id") or "").strip()
|
|
if not project_id:
|
|
raise HTTPException(400, "project_id required")
|
|
tenant_id = str(body.get("tenant_id") or project_id).strip()
|
|
display_name = str(body.get("display_name") or body.get("name") or project_id).strip()
|
|
async with get_conn() as conn:
|
|
async with conn.cursor() as cur:
|
|
await cur.execute(
|
|
f"""INSERT INTO {s}.projects (
|
|
tenant_id, project_id, display_name, description, status,
|
|
default_namespace, metadata_jsonb, created_by, updated_at
|
|
)
|
|
VALUES (
|
|
%(tenant_id)s, %(project_id)s, %(display_name)s, %(description)s, %(status)s,
|
|
%(default_namespace)s, %(metadata_jsonb)s, %(created_by)s, now()
|
|
)
|
|
ON CONFLICT (tenant_id, project_id) DO UPDATE
|
|
SET tenant_id=EXCLUDED.tenant_id,
|
|
display_name=EXCLUDED.display_name,
|
|
description=EXCLUDED.description,
|
|
status=EXCLUDED.status,
|
|
default_namespace=EXCLUDED.default_namespace,
|
|
metadata_jsonb=EXCLUDED.metadata_jsonb,
|
|
updated_at=now()
|
|
RETURNING *""",
|
|
{
|
|
"project_id": project_id,
|
|
"tenant_id": tenant_id,
|
|
"display_name": display_name,
|
|
"description": body.get("description"),
|
|
"status": body.get("status") or "active",
|
|
"default_namespace": body.get("default_namespace") or project_id,
|
|
"metadata_jsonb": body.get("metadata_jsonb") or {},
|
|
"created_by": _user["username"],
|
|
},
|
|
)
|
|
row = await cur.fetchone()
|
|
await conn.commit()
|
|
return row
|
|
|
|
|
|
@router.get("/projects/{project_id}/graph-releases")
|
|
async def list_graph_releases(project_id: str, _user: CurrentUser):
|
|
async with get_conn() as conn:
|
|
async with conn.cursor() as cur:
|
|
await cur.execute(
|
|
f"SELECT * FROM {settings.db_schema}.graph_releases "
|
|
"WHERE project_id=%s AND status <> 'archived' "
|
|
"ORDER BY "
|
|
"CASE WHEN status='active' THEN 1 WHEN status='published' THEN 2 ELSE 99 END, "
|
|
"created_at DESC",
|
|
(project_id,),
|
|
)
|
|
return await cur.fetchall()
|
|
|
|
|
|
@router.post("/projects/{project_id}/graph-releases")
|
|
async def create_graph_release(project_id: str, body: dict, _user: CurrentUser):
|
|
s = settings.db_schema
|
|
graph_name = str(body.get("graph_name") or f"{project_id}_graph").strip()
|
|
alias = str(body.get("alias") or "active").strip()
|
|
tenant_id = str(body.get("tenant_id") or "").strip()
|
|
status = str(body.get("status") or "active").strip()
|
|
async with get_conn() as conn:
|
|
async with conn.cursor() as cur:
|
|
if not tenant_id:
|
|
await cur.execute(
|
|
f"SELECT tenant_id FROM {s}.projects WHERE project_id=%s LIMIT 1",
|
|
(project_id,),
|
|
)
|
|
project = await cur.fetchone()
|
|
tenant_id = project["tenant_id"] if project else project_id
|
|
await cur.execute(
|
|
f"""INSERT INTO {s}.graph_releases (
|
|
tenant_id, project_id, graph_release_id, graph_name, alias, status,
|
|
metadata_jsonb, created_by, activated_at, updated_at
|
|
)
|
|
VALUES (
|
|
%(tenant_id)s, %(project_id)s, %(graph_release_id)s, %(graph_name)s, %(alias)s, %(status)s,
|
|
%(metadata_jsonb)s, %(created_by)s, now(), now()
|
|
)
|
|
ON CONFLICT (tenant_id, project_id, alias) DO UPDATE
|
|
SET graph_release_id=EXCLUDED.graph_release_id,
|
|
graph_name=EXCLUDED.graph_name,
|
|
status=EXCLUDED.status,
|
|
metadata_jsonb=EXCLUDED.metadata_jsonb,
|
|
activated_at=now(),
|
|
updated_at=now()
|
|
RETURNING *""",
|
|
{
|
|
"tenant_id": tenant_id,
|
|
"project_id": project_id,
|
|
"graph_release_id": body.get("graph_release_id") or f"{project_id}_{alias}",
|
|
"graph_name": graph_name,
|
|
"alias": alias,
|
|
"status": status,
|
|
"metadata_jsonb": body.get("metadata_jsonb") or {},
|
|
"created_by": _user["username"],
|
|
},
|
|
)
|
|
row = await cur.fetchone()
|
|
await conn.commit()
|
|
return row
|
|
|
|
|
|
@router.get("/ontology-schemas")
|
|
async def list_ontology_schemas(
|
|
_user: CurrentUser,
|
|
ctx: ProjectContext = Depends(get_project_context),
|
|
):
|
|
"""List schema versions for the current project context."""
|
|
async with get_conn() as conn:
|
|
async with conn.cursor() as cur:
|
|
await cur.execute(
|
|
f"""SELECT
|
|
os.id, os.tenant_id, os.project_id, os.namespace, os.version,
|
|
os.display_name, os.description, os.status,
|
|
os.schema_jsonb,
|
|
os.published_at, os.created_at, os.updated_at,
|
|
gr.graph_release_id AS active_graph_release_id,
|
|
gr.graph_name AS active_graph_name
|
|
FROM {settings.db_schema}.ontology_schemas os
|
|
LEFT JOIN {settings.db_schema}.graph_releases gr
|
|
ON gr.schema_id=os.id
|
|
AND gr.tenant_id=os.tenant_id
|
|
AND gr.project_id=os.project_id
|
|
AND gr.alias='active'
|
|
AND gr.status <> 'archived'
|
|
WHERE os.tenant_id=%s
|
|
AND os.project_id=%s
|
|
AND os.status <> 'archived'
|
|
ORDER BY
|
|
CASE WHEN os.status='active' THEN 1 ELSE 99 END,
|
|
os.version DESC,
|
|
os.updated_at DESC""",
|
|
(ctx.tenant_id, ctx.project_id),
|
|
)
|
|
rows = await cur.fetchall()
|
|
return [_attach_schema_source(row) for row in rows]
|
|
|
|
|
|
@router.get("/ontology-schemas/current")
|
|
async def get_current_ontology_schema(
|
|
_user: CurrentUser,
|
|
ctx: ProjectContext = Depends(get_project_context),
|
|
):
|
|
"""Return the active graph release schema for the current project."""
|
|
async with get_conn() as conn:
|
|
async with conn.cursor() as cur:
|
|
await cur.execute(
|
|
f"""SELECT
|
|
os.*,
|
|
gr.graph_release_id AS active_graph_release_id,
|
|
gr.graph_name AS active_graph_name,
|
|
gr.alias AS active_graph_alias
|
|
FROM {settings.db_schema}.graph_releases gr
|
|
JOIN {settings.db_schema}.ontology_schemas os
|
|
ON os.id=gr.schema_id
|
|
WHERE gr.tenant_id=%s
|
|
AND gr.project_id=%s
|
|
AND gr.alias='active'
|
|
AND gr.status <> 'archived'
|
|
AND os.status <> 'archived'
|
|
ORDER BY gr.activated_at DESC NULLS LAST, gr.updated_at DESC
|
|
LIMIT 1""",
|
|
(ctx.tenant_id, ctx.project_id),
|
|
)
|
|
row = await cur.fetchone()
|
|
if row:
|
|
return _attach_schema_source(row)
|
|
|
|
await cur.execute(
|
|
f"""SELECT
|
|
os.*,
|
|
NULL::text AS active_graph_release_id,
|
|
NULL::text AS active_graph_name,
|
|
NULL::text AS active_graph_alias
|
|
FROM {settings.db_schema}.ontology_schemas os
|
|
WHERE os.tenant_id=%s
|
|
AND os.project_id=%s
|
|
AND os.status <> 'archived'
|
|
ORDER BY
|
|
CASE WHEN os.status='active' THEN 1 ELSE 99 END,
|
|
os.version DESC,
|
|
os.updated_at DESC
|
|
LIMIT 1""",
|
|
(ctx.tenant_id, ctx.project_id),
|
|
)
|
|
row = await cur.fetchone()
|
|
if not row:
|
|
raise HTTPException(404, "No ontology schema for current project")
|
|
return _attach_schema_source(row)
|
|
|
|
|
|
@router.get("/ontology-schemas/{schema_id}")
|
|
async def get_ontology_schema(
|
|
schema_id: int,
|
|
_user: CurrentUser,
|
|
ctx: ProjectContext = Depends(get_project_context),
|
|
):
|
|
"""Return one schema version for the current project context."""
|
|
async with get_conn() as conn:
|
|
async with conn.cursor() as cur:
|
|
await cur.execute(
|
|
f"""SELECT
|
|
os.*,
|
|
gr.graph_release_id AS active_graph_release_id,
|
|
gr.graph_name AS active_graph_name,
|
|
gr.alias AS active_graph_alias
|
|
FROM {settings.db_schema}.ontology_schemas os
|
|
LEFT JOIN {settings.db_schema}.graph_releases gr
|
|
ON gr.schema_id=os.id
|
|
AND gr.tenant_id=os.tenant_id
|
|
AND gr.project_id=os.project_id
|
|
AND gr.alias='active'
|
|
AND gr.status <> 'archived'
|
|
WHERE os.id=%s
|
|
AND os.tenant_id=%s
|
|
AND os.project_id=%s
|
|
AND os.status <> 'archived'
|
|
LIMIT 1""",
|
|
(schema_id, ctx.tenant_id, ctx.project_id),
|
|
)
|
|
row = await cur.fetchone()
|
|
if not row:
|
|
raise HTTPException(404, "Schema not found for current project")
|
|
return _attach_schema_source(row)
|
|
|
|
|
|
@router.get("/health")
|
|
async def health():
|
|
return {"status": "ok"}
|