"""STEP 04 — Inventory Issues (知识盘点).""" from fastapi import APIRouter, Depends, HTTPException from app.auth import CurrentUser from app.config import settings from app.contracts import IssueResolve from app.db import ( list_inventory_issues, create_inventory_issue, resolve_inventory_issue, get_conn, ) from app.project_context import ProjectContext, get_project_context router = APIRouter() @router.get("/inventory/issues") async def _list( status: str | None = None, severity: str | None = None, context: ProjectContext = Depends(get_project_context), _user: CurrentUser = None, ): return await list_inventory_issues( context.tenant_id, context.project_id, status, severity ) @router.post("/inventory/scan") async def scan( context: ProjectContext = Depends(get_project_context), _user: CurrentUser = None, ): """Run a daily inventory scan — detect PG-side data quality issues.""" s = settings.db_schema tenant = context.tenant_id project = context.project_id issues_created = [] async with get_conn() as conn: async with conn.cursor() as cur: # Check for orphan nodes (candidates that point to non-existent batches) await cur.execute( f"SELECT COUNT(*) AS cnt FROM {s}.candidate_entities ce " "LEFT JOIN {s}.import_batches ib ON ce.batch_id=ib.batch_id " "WHERE ce.tenant_id=%s AND ce.project_id=%s AND ce.batch_id IS NOT NULL AND ib.batch_id IS NULL", (tenant, project), ) orphan_count = (await cur.fetchone())["cnt"] if orphan_count > 0: issue = await create_inventory_issue({ "tenant_id": tenant, "project_id": project, "issue_type": "orphan_node", "severity": "warning", "description": f"Found {orphan_count} candidate entities referencing non-existent batches", }) issues_created.append(issue) # Check for stale data (not updated in 180 days) await cur.execute( f"SELECT COUNT(*) AS cnt FROM {s}.candidate_entities " "WHERE tenant_id=%s AND project_id=%s AND status='published' " "AND updated_at < now() - INTERVAL '180 days'", (tenant, project), ) stale_count = (await cur.fetchone())["cnt"] if stale_count > 0: issue = await create_inventory_issue({ "tenant_id": tenant, "project_id": project, "issue_type": "stale_data", "severity": "info", "description": f"Found {stale_count} entities not updated in 180+ days", }) issues_created.append(issue) # Check for missing required fields (empty core fields in published entities) await cur.execute( f"SELECT COUNT(*) AS cnt FROM {s}.candidate_entities " "WHERE tenant_id=%s AND project_id=%s AND status='published' " "AND (payload->>'name' IS NULL OR payload->>'name'='')", (tenant, project), ) missing_name = (await cur.fetchone())["cnt"] if missing_name > 0: issue = await create_inventory_issue({ "tenant_id": tenant, "project_id": project, "issue_type": "missing_field", "severity": "blocker", "target_field": "name", "description": f"Found {missing_name} published entities with empty name", }) issues_created.append(issue) return {"scanned": True, "issues_created": len(issues_created), "issues": issues_created} @router.post("/inventory/issues/{issue_id}/resolve") async def _resolve(issue_id: int, body: IssueResolve | None = None, user: CurrentUser = None): row = await resolve_inventory_issue( issue_id, resolved_by=user["username"] if user else "system", note=body.resolution_note if body else None, ) if not row: raise HTTPException(404, "Issue not found") return row