Files
bxh/app/api/projects.py

381 lines
16 KiB
Python

"""Projects, Graph Releases, and Ontology Schema endpoints."""
import json
from pathlib import Path
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from app.auth import CurrentUser
from app.config import settings
from app.db import get_conn
from app.project_context import ProjectContext, get_project_context
# Router on which every endpoint in this module is registered.
router = APIRouter()
# Third ancestor directory of this file; used as the base for on-disk schema DSL lookups.
ROOT_DIR = Path(__file__).resolve().parents[2]
def _as_dict(value: Any) -> dict[str, Any]:
if isinstance(value, dict):
return value
if isinstance(value, str):
try:
parsed = json.loads(value)
return parsed if isinstance(parsed, dict) else {}
except json.JSONDecodeError:
return {}
return {}
def _named_items(value: Any) -> list[tuple[str, dict[str, Any]]]:
    """Turn a mapping or a list of definitions into ``(name, meta)`` pairs.

    Args:
        value: Either a dict keyed by name, or a list whose entries are
            coerced to dicts with ``_as_dict``.

    Returns:
        For a dict, one pair per key with the key stringified. For a list,
        one pair per entry named after its ``name``, ``entity_type`` or
        ``relation_type`` value (first truthy one), falling back to
        ``Item<position>`` counted from 1. Any other input gives ``[]``.
    """
    if isinstance(value, list):
        pairs: list[tuple[str, dict[str, Any]]] = []
        for position, raw in enumerate(value, start=1):
            meta = _as_dict(raw)
            label = (
                meta.get("name")
                or meta.get("entity_type")
                or meta.get("relation_type")
                or f"Item{position}"
            )
            pairs.append((str(label), meta))
        return pairs
    if isinstance(value, dict):
        return [(str(key), _as_dict(entry)) for key, entry in value.items()]
    return []
def _field_names(value: Any) -> list[str]:
if isinstance(value, dict):
return [str(key) for key in value.keys()]
if isinstance(value, list):
return [str(item) for item in value]
return []
def _inline_comment(text: str) -> str:
    """Fold *text* onto a single line, dropping blank lines and edge whitespace."""
    return " ".join(part.strip() for part in text.splitlines() if part.strip())


def _schema_source_from_json(schema: dict[str, Any]) -> str:
    """Render a schema JSON document as DSL-style source text.

    The output lists, in order: the namespace (default ``schema``), the
    version when present, the purpose as ``//`` comment lines, one ``entity``
    section per entry of ``entity_types``, one ``relation`` section per entry
    of ``relation_types``, one ``enum`` line per entry of ``enums`` and the
    ``query_recipes`` as comments. Every field and property is emitted with
    the type ``Text``.

    Free text that ends up inside a ``//`` comment is kept inside it:
    definitions are folded onto one line, while a multi-line purpose or
    query recipe is emitted as several comment lines. Single-line values
    render exactly as before.

    Args:
        schema: The decoded schema document.

    Returns:
        The rendered source, stripped and terminated by one newline.
    """
    namespace = str(schema.get("namespace") or "schema")
    version = str(schema.get("version") or "")
    lines = [f"namespace {namespace}"]
    if version:
        lines.append(f"version {version}")

    purpose = str(schema.get("purpose") or "").strip()
    if purpose:
        lines.append("")
        # One comment line per line of text, so nothing escapes the comment.
        lines.extend(f"// {part}".rstrip() for part in purpose.splitlines())

    for name, meta in _named_items(schema.get("entity_types")):
        definition = _inline_comment(str(meta.get("definition") or meta.get("description") or ""))
        lines.extend(["", f"entity {name}" + (f" // {definition}" if definition else "")])
        primary_key = str(meta.get("primary_key") or meta.get("primaryKey") or "").strip()
        if primary_key:
            lines.append(f" primary_key {primary_key}")
        lines.extend(f" property {field}: Text" for field in _field_names(meta.get("fields")))

    for name, meta in _named_items(schema.get("relation_types")):
        source = str(meta.get("from") or meta.get("source") or "").strip()
        target = str(meta.get("to") or meta.get("target") or "").strip()
        definition = _inline_comment(str(meta.get("definition") or meta.get("description") or ""))
        arrow = f": {source} -> {target}" if source or target else ""
        lines.extend(["", f"relation {name}{arrow}" + (f" // {definition}" if definition else "")])
        lines.extend(f" property {prop}: Text" for prop in _field_names(meta.get("properties")))

    enums = _as_dict(schema.get("enums"))
    if enums:
        lines.append("")
        for name, values in enums.items():
            lines.append(f"enum {name} = {' | '.join(_field_names(values))}")

    recipes = _as_dict(schema.get("query_recipes"))
    if recipes:
        lines.append("")
        lines.append("// query_recipes")
        for name, query in recipes.items():
            # Continuation lines of a multi-line query get their own comment
            # prefix instead of leaking into the DSL as bare statements.
            first, *rest = str(query).splitlines() or [""]
            lines.append(f"// {name}: {first}")
            lines.extend(f"// {extra}" for extra in rest)

    return "\n".join(lines).strip() + "\n"
def _schema_source_from_file(row: dict[str, Any], schema: dict[str, Any]) -> str | None:
    """Look for a DSL source file on disk that belongs to a schema row.

    Files are searched in ``ROOT_DIR / "schema搭建" / <project_id>``. The
    candidates are tried in this order and the first existing file wins:
    ``<namespace>_schema.current``, ``<namespace>_schema.v<version>``,
    ``<project_id>_schema.current``, ``<project_id>_schema.v<version>``
    (all with the ``.dsl.md`` suffix), then any ``*.current.dsl.md``, any
    file containing ``v<version>``, and finally any ``*.dsl.md``. Dots in
    the version are replaced by underscores.

    NOTE(review): "current" files are tried before version-specific ones, so
    several versions of one project can resolve to the same file. Confirm
    this is intended.

    Args:
        row: The database row as a dict; ``project_id``, ``namespace`` and
            ``version`` are read from it.
        schema: The decoded schema document; supplies ``namespace`` and
            ``version`` fallbacks/overrides.

    Returns:
        The file content decoded as UTF-8, or ``None`` when the row has no
        usable ``project_id`` or no candidate file exists.
    """
    project_id = str(row.get("project_id") or "").strip()
    if not project_id:
        return None
    # project_id becomes a directory name, so it must be exactly one path
    # component. Separators, "." or ".." would let the lookup leave the
    # schema directory tree.
    if project_id in {".", ".."} or Path(project_id).name != project_id:
        return None

    namespace = str(row.get("namespace") or schema.get("namespace") or "").strip()
    version = str(schema.get("version") or row.get("version") or "").strip()
    schema_dir = ROOT_DIR / "schema搭建" / project_id
    version_slug = version.replace(".", "_")

    candidates = [
        schema_dir / f"{namespace}_schema.current.dsl.md",
        schema_dir / f"{namespace}_schema.v{version_slug}.dsl.md",
        schema_dir / f"{project_id}_schema.current.dsl.md",
        schema_dir / f"{project_id}_schema.v{version_slug}.dsl.md",
    ]
    candidates.extend(sorted(schema_dir.glob("*.current.dsl.md")))
    candidates.extend(sorted(schema_dir.glob(f"*v{version_slug}*.dsl.md")))
    candidates.extend(sorted(schema_dir.glob("*.dsl.md")))

    # dict.fromkeys removes duplicates while keeping the priority order.
    for path in dict.fromkeys(candidates):
        # A namespace or version containing path separators would point
        # outside schema_dir; only direct children are acceptable.
        if path.parent != schema_dir:
            continue
        if path.is_file():
            return path.read_text(encoding="utf-8")
    return None
def _attach_schema_source(row: Any) -> dict[str, Any]:
    """Copy *row* into a dict and add its schema source text.

    The source comes from a DSL file on disk when one is found and is
    non-empty; otherwise it is rendered from the row's ``schema_jsonb``.

    Args:
        row: A database row accepted by ``dict()``.

    Returns:
        The row as a dict with ``schema_source`` and ``schema_source_format``
        (always ``"dsl"``) added.
    """
    data = dict(row)
    schema = _as_dict(data.get("schema_jsonb"))
    source = _schema_source_from_file(data, schema)
    if not source:
        source = _schema_source_from_json(schema)
    data.update(schema_source=source, schema_source_format="dsl")
    return data
@router.get("/projects")
async def list_projects(_user: CurrentUser):
    """Return every project whose status is not ``archived``.

    The projects ``CityGraph-new2`` and ``travel_agency`` are listed first,
    in that order; all others follow, most recently created first.
    """
    query = f"""SELECT * FROM {settings.db_schema}.projects
WHERE status <> 'archived'
ORDER BY
CASE project_id
WHEN 'CityGraph-new2' THEN 1
WHEN 'travel_agency' THEN 2
ELSE 99
END,
created_at DESC"""
    async with get_conn() as conn, conn.cursor() as cur:
        await cur.execute(query)
        return await cur.fetchall()
@router.post("/projects")
async def create_project(body: dict, _user: CurrentUser):
    """Create a project, or update it when the same tenant/project pair exists.

    ``project_id`` is required. ``tenant_id``, ``display_name`` (or ``name``)
    and ``default_namespace`` fall back to the project id, and ``status``
    falls back to ``active``, whenever the supplied value is missing or
    blank. The stored row is returned.

    Responds with HTTP 400 when ``project_id`` is missing or blank.
    """
    s = settings.db_schema
    project_id = str(body.get("project_id") or "").strip()
    if not project_id:
        raise HTTPException(400, "project_id required")

    # Strip first, then apply the fallback: a whitespace-only value must fall
    # back to the default instead of being stored as an empty string.
    tenant_id = str(body.get("tenant_id") or "").strip() or project_id
    display_name = (
        str(body.get("display_name") or "").strip()
        or str(body.get("name") or "").strip()
        or project_id
    )
    status = str(body.get("status") or "").strip() or "active"
    default_namespace = str(body.get("default_namespace") or "").strip() or project_id

    async with get_conn() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                f"""INSERT INTO {s}.projects (
tenant_id, project_id, display_name, description, status,
default_namespace, metadata_jsonb, created_by, updated_at
)
VALUES (
%(tenant_id)s, %(project_id)s, %(display_name)s, %(description)s, %(status)s,
%(default_namespace)s, %(metadata_jsonb)s, %(created_by)s, now()
)
ON CONFLICT (tenant_id, project_id) DO UPDATE
SET tenant_id=EXCLUDED.tenant_id,
display_name=EXCLUDED.display_name,
description=EXCLUDED.description,
status=EXCLUDED.status,
default_namespace=EXCLUDED.default_namespace,
metadata_jsonb=EXCLUDED.metadata_jsonb,
updated_at=now()
RETURNING *""",
                {
                    "project_id": project_id,
                    "tenant_id": tenant_id,
                    "display_name": display_name,
                    "description": body.get("description"),
                    "status": status,
                    "default_namespace": default_namespace,
                    "metadata_jsonb": body.get("metadata_jsonb") or {},
                    "created_by": _user["username"],
                },
            )
            row = await cur.fetchone()
        await conn.commit()
    return row
@router.get("/projects/{project_id}/graph-releases")
async def list_graph_releases(project_id: str, _user: CurrentUser):
    """Return the non-archived graph releases of one project.

    Releases with status ``active`` come first, then ``published``, then the
    rest; within each group the most recently created release is first.
    """
    query = " ".join(
        [
            f"SELECT * FROM {settings.db_schema}.graph_releases",
            "WHERE project_id=%s AND status <> 'archived'",
            "ORDER BY",
            "CASE WHEN status='active' THEN 1 WHEN status='published' THEN 2 ELSE 99 END,",
            "created_at DESC",
        ]
    )
    async with get_conn() as conn, conn.cursor() as cur:
        await cur.execute(query, (project_id,))
        return await cur.fetchall()
@router.post("/projects/{project_id}/graph-releases")
async def create_graph_release(project_id: str, body: dict, _user: CurrentUser):
    """Create a graph release, or replace the one holding the same alias.

    Missing or blank body values fall back to defaults: ``graph_name`` to
    ``<project_id>_graph``, ``alias`` and ``status`` to ``active``, and
    ``graph_release_id`` to ``<project_id>_<alias>``. When no ``tenant_id``
    is supplied it is read from the project row; if no such project exists
    the project id itself is used as tenant. The stored row is returned.
    """
    s = settings.db_schema
    # Strip first, then apply the fallback: a whitespace-only value must fall
    # back to the default instead of being stored as an empty string.
    graph_name = str(body.get("graph_name") or "").strip() or f"{project_id}_graph"
    alias = str(body.get("alias") or "").strip() or "active"
    tenant_id = str(body.get("tenant_id") or "").strip()
    status = str(body.get("status") or "").strip() or "active"
    graph_release_id = str(body.get("graph_release_id") or "").strip() or f"{project_id}_{alias}"

    async with get_conn() as conn:
        async with conn.cursor() as cur:
            if not tenant_id:
                await cur.execute(
                    f"SELECT tenant_id FROM {s}.projects WHERE project_id=%s LIMIT 1",
                    (project_id,),
                )
                project = await cur.fetchone()
                tenant_id = project["tenant_id"] if project else project_id
            await cur.execute(
                f"""INSERT INTO {s}.graph_releases (
tenant_id, project_id, graph_release_id, graph_name, alias, status,
metadata_jsonb, created_by, activated_at, updated_at
)
VALUES (
%(tenant_id)s, %(project_id)s, %(graph_release_id)s, %(graph_name)s, %(alias)s, %(status)s,
%(metadata_jsonb)s, %(created_by)s, now(), now()
)
ON CONFLICT (tenant_id, project_id, alias) DO UPDATE
SET graph_release_id=EXCLUDED.graph_release_id,
graph_name=EXCLUDED.graph_name,
status=EXCLUDED.status,
metadata_jsonb=EXCLUDED.metadata_jsonb,
activated_at=now(),
updated_at=now()
RETURNING *""",
                {
                    "tenant_id": tenant_id,
                    "project_id": project_id,
                    "graph_release_id": graph_release_id,
                    "graph_name": graph_name,
                    "alias": alias,
                    "status": status,
                    "metadata_jsonb": body.get("metadata_jsonb") or {},
                    "created_by": _user["username"],
                },
            )
            row = await cur.fetchone()
        await conn.commit()
    return row
@router.get("/ontology-schemas")
async def list_ontology_schemas(
    _user: CurrentUser,
    ctx: ProjectContext = Depends(get_project_context),
):
    """List the non-archived schema versions of the current tenant and project.

    Each entry also carries the id and name of the non-archived graph release
    aliased ``active`` that points at it, if any, plus the schema source text.
    Schemas with status ``active`` come first, then by version and update
    time, both descending.
    """
    query = f"""SELECT
os.id, os.tenant_id, os.project_id, os.namespace, os.version,
os.display_name, os.description, os.status,
os.schema_jsonb,
os.published_at, os.created_at, os.updated_at,
gr.graph_release_id AS active_graph_release_id,
gr.graph_name AS active_graph_name
FROM {settings.db_schema}.ontology_schemas os
LEFT JOIN {settings.db_schema}.graph_releases gr
ON gr.schema_id=os.id
AND gr.tenant_id=os.tenant_id
AND gr.project_id=os.project_id
AND gr.alias='active'
AND gr.status <> 'archived'
WHERE os.tenant_id=%s
AND os.project_id=%s
AND os.status <> 'archived'
ORDER BY
CASE WHEN os.status='active' THEN 1 ELSE 99 END,
os.version DESC,
os.updated_at DESC"""
    scope = (ctx.tenant_id, ctx.project_id)
    async with get_conn() as conn, conn.cursor() as cur:
        await cur.execute(query, scope)
        rows = await cur.fetchall()
    return list(map(_attach_schema_source, rows))
@router.get("/ontology-schemas/current")
async def get_current_ontology_schema(
    _user: CurrentUser,
    ctx: ProjectContext = Depends(get_project_context),
):
    """Return the schema in use for the current tenant and project.

    The schema referenced by the most recently activated, non-archived graph
    release aliased ``active`` is preferred. Without such a release, the best
    non-archived schema is returned instead (status ``active`` first, then
    highest version, then latest update) with the release fields set to null.

    Responds with HTTP 404 when the project has no non-archived schema.
    """
    via_release_sql = f"""SELECT
os.*,
gr.graph_release_id AS active_graph_release_id,
gr.graph_name AS active_graph_name,
gr.alias AS active_graph_alias
FROM {settings.db_schema}.graph_releases gr
JOIN {settings.db_schema}.ontology_schemas os
ON os.id=gr.schema_id
WHERE gr.tenant_id=%s
AND gr.project_id=%s
AND gr.alias='active'
AND gr.status <> 'archived'
AND os.status <> 'archived'
ORDER BY gr.activated_at DESC NULLS LAST, gr.updated_at DESC
LIMIT 1"""
    fallback_sql = f"""SELECT
os.*,
NULL::text AS active_graph_release_id,
NULL::text AS active_graph_name,
NULL::text AS active_graph_alias
FROM {settings.db_schema}.ontology_schemas os
WHERE os.tenant_id=%s
AND os.project_id=%s
AND os.status <> 'archived'
ORDER BY
CASE WHEN os.status='active' THEN 1 ELSE 99 END,
os.version DESC,
os.updated_at DESC
LIMIT 1"""
    scope = (ctx.tenant_id, ctx.project_id)
    async with get_conn() as conn, conn.cursor() as cur:
        await cur.execute(via_release_sql, scope)
        row = await cur.fetchone()
        if not row:
            # No active release points at a schema: fall back to the schema table.
            await cur.execute(fallback_sql, scope)
            row = await cur.fetchone()
    if not row:
        raise HTTPException(404, "No ontology schema for current project")
    return _attach_schema_source(row)
@router.get("/ontology-schemas/{schema_id}")
async def get_ontology_schema(
    schema_id: int,
    _user: CurrentUser,
    ctx: ProjectContext = Depends(get_project_context),
):
    """Return one non-archived schema of the current tenant and project by id.

    The result includes the non-archived graph release aliased ``active``
    that references the schema, if any, plus the schema source text.

    Responds with HTTP 404 when no matching schema exists.
    """
    query = f"""SELECT
os.*,
gr.graph_release_id AS active_graph_release_id,
gr.graph_name AS active_graph_name,
gr.alias AS active_graph_alias
FROM {settings.db_schema}.ontology_schemas os
LEFT JOIN {settings.db_schema}.graph_releases gr
ON gr.schema_id=os.id
AND gr.tenant_id=os.tenant_id
AND gr.project_id=os.project_id
AND gr.alias='active'
AND gr.status <> 'archived'
WHERE os.id=%s
AND os.tenant_id=%s
AND os.project_id=%s
AND os.status <> 'archived'
LIMIT 1"""
    params = (schema_id, ctx.tenant_id, ctx.project_id)
    async with get_conn() as conn, conn.cursor() as cur:
        await cur.execute(query, params)
        row = await cur.fetchone()
    if row:
        return _attach_schema_source(row)
    raise HTTPException(404, "Schema not found for current project")
@router.get("/health")
async def health():
    """Report that the service is reachable with a constant payload."""
    return dict(status="ok")