Files
bxh/app/api/inventory.py

111 lines
4.2 KiB
Python

"""STEP 04 — Inventory Issues (知识盘点)."""
from fastapi import APIRouter, Depends, HTTPException
from app.auth import CurrentUser
from app.config import settings
from app.contracts import IssueResolve
from app.db import (
list_inventory_issues,
create_inventory_issue,
resolve_inventory_issue,
get_conn,
)
from app.project_context import ProjectContext, get_project_context
# Router on which the inventory endpoints defined in this module are registered.
router = APIRouter()
@router.get("/inventory/issues")
async def _list(
    status: str | None = None,
    severity: str | None = None,
    context: ProjectContext = Depends(get_project_context),
    _user: CurrentUser = None,
):
    """Return the inventory issues of the project resolved from the request.

    ``status`` and ``severity`` are optional and are forwarded unchanged to
    ``list_inventory_issues`` (presumably as filters — confirm in ``app.db``).
    ``_user`` is not read in the body.
    """
    tenant_id, project_id = context.tenant_id, context.project_id
    issues = await list_inventory_issues(tenant_id, project_id, status, severity)
    return issues
@router.post("/inventory/scan")
async def scan(
    context: ProjectContext = Depends(get_project_context),
    _user: CurrentUser = None,
):
    """Run a daily inventory scan — detect PG-side data quality issues.

    Three COUNT queries are executed against ``candidate_entities`` for the
    tenant/project taken from ``context``:

    * orphan_node   (warning) — rows whose ``batch_id`` has no matching row
      in ``import_batches``;
    * stale_data    (info)    — published rows with ``updated_at`` older
      than 180 days;
    * missing_field (blocker) — published rows whose ``payload->>'name'``
      is NULL or empty.

    For every check with a non-zero count one issue is recorded through
    ``create_inventory_issue``.

    Returns:
        A dict with ``scanned`` (always ``True``), ``issues_created`` (number
        of issues recorded by this call) and ``issues`` (the values returned
        by ``create_inventory_issue``).
    """
    # The schema name comes from application settings, not from the request,
    # so it is interpolated into the SQL text; tenant/project stay bound
    # parameters.
    s = settings.db_schema
    tenant = context.tenant_id
    project = context.project_id
    issues_created = []

    # NOTE(review): every call records a fresh issue for each failing check;
    # whether duplicates are collapsed depends on create_inventory_issue —
    # confirm before scheduling this more often than daily.
    async with get_conn() as conn:
        async with conn.cursor() as cur:
            # Check for orphan nodes (candidates that point to non-existent batches).
            # Every fragment that contains {s} must be an f-string: adjacent
            # literals are concatenated, but the f prefix applies per literal.
            await cur.execute(
                f"SELECT COUNT(*) AS cnt FROM {s}.candidate_entities ce "
                f"LEFT JOIN {s}.import_batches ib ON ce.batch_id=ib.batch_id "
                "WHERE ce.tenant_id=%s AND ce.project_id=%s AND ce.batch_id IS NOT NULL AND ib.batch_id IS NULL",
                (tenant, project),
            )
            # Rows are read by column name, so the cursor is expected to
            # yield mapping-style rows (presumably configured in get_conn).
            orphan_count = (await cur.fetchone())["cnt"]
            if orphan_count > 0:
                issue = await create_inventory_issue({
                    "tenant_id": tenant,
                    "project_id": project,
                    "issue_type": "orphan_node",
                    "severity": "warning",
                    "description": f"Found {orphan_count} candidate entities referencing non-existent batches",
                })
                issues_created.append(issue)

            # Check for stale data (not updated in 180 days)
            await cur.execute(
                f"SELECT COUNT(*) AS cnt FROM {s}.candidate_entities "
                "WHERE tenant_id=%s AND project_id=%s AND status='published' "
                "AND updated_at < now() - INTERVAL '180 days'",
                (tenant, project),
            )
            stale_count = (await cur.fetchone())["cnt"]
            if stale_count > 0:
                issue = await create_inventory_issue({
                    "tenant_id": tenant,
                    "project_id": project,
                    "issue_type": "stale_data",
                    "severity": "info",
                    "description": f"Found {stale_count} entities not updated in 180+ days",
                })
                issues_created.append(issue)

            # Check for missing required fields (empty core fields in published entities)
            await cur.execute(
                f"SELECT COUNT(*) AS cnt FROM {s}.candidate_entities "
                "WHERE tenant_id=%s AND project_id=%s AND status='published' "
                "AND (payload->>'name' IS NULL OR payload->>'name'='')",
                (tenant, project),
            )
            missing_name = (await cur.fetchone())["cnt"]
            if missing_name > 0:
                issue = await create_inventory_issue({
                    "tenant_id": tenant,
                    "project_id": project,
                    "issue_type": "missing_field",
                    "severity": "blocker",
                    "target_field": "name",
                    "description": f"Found {missing_name} published entities with empty name",
                })
                issues_created.append(issue)

    return {"scanned": True, "issues_created": len(issues_created), "issues": issues_created}
@router.post("/inventory/issues/{issue_id}/resolve")
async def _resolve(issue_id: int, body: IssueResolve | None = None, user: CurrentUser = None):
    """Mark the inventory issue ``issue_id`` as resolved.

    The resolver is ``user["username"]`` when a user is present, otherwise
    the literal ``"system"``; the note is ``body.resolution_note`` when a
    body was sent, otherwise ``None``.

    Raises:
        HTTPException: 404 when ``resolve_inventory_issue`` returns a falsy
            value.
    """
    # NOTE(review): unlike the sibling endpoints this one takes no project
    # context, so the lookup is by issue_id alone — confirm that
    # resolve_inventory_issue enforces tenant/project ownership.
    resolver = "system"
    if user:
        resolver = user["username"]

    note = None
    if body:
        note = body.resolution_note

    row = await resolve_inventory_issue(issue_id, resolved_by=resolver, note=note)
    if row:
        return row
    raise HTTPException(404, "Issue not found")