111 lines
4.2 KiB
Python
111 lines
4.2 KiB
Python
"""STEP 04 — Inventory Issues (知识盘点)."""
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
|
|
from app.auth import CurrentUser
|
|
from app.config import settings
|
|
from app.contracts import IssueResolve
|
|
from app.db import (
|
|
list_inventory_issues,
|
|
create_inventory_issue,
|
|
resolve_inventory_issue,
|
|
get_conn,
|
|
)
|
|
from app.project_context import ProjectContext, get_project_context
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/inventory/issues")
|
|
async def _list(
|
|
status: str | None = None,
|
|
severity: str | None = None,
|
|
context: ProjectContext = Depends(get_project_context),
|
|
_user: CurrentUser = None,
|
|
):
|
|
return await list_inventory_issues(
|
|
context.tenant_id, context.project_id, status, severity
|
|
)
|
|
|
|
|
|
@router.post("/inventory/scan")
|
|
async def scan(
|
|
context: ProjectContext = Depends(get_project_context),
|
|
_user: CurrentUser = None,
|
|
):
|
|
"""Run a daily inventory scan — detect PG-side data quality issues."""
|
|
s = settings.db_schema
|
|
tenant = context.tenant_id
|
|
project = context.project_id
|
|
issues_created = []
|
|
|
|
async with get_conn() as conn:
|
|
async with conn.cursor() as cur:
|
|
# Check for orphan nodes (candidates that point to non-existent batches)
|
|
await cur.execute(
|
|
f"SELECT COUNT(*) AS cnt FROM {s}.candidate_entities ce "
|
|
"LEFT JOIN {s}.import_batches ib ON ce.batch_id=ib.batch_id "
|
|
"WHERE ce.tenant_id=%s AND ce.project_id=%s AND ce.batch_id IS NOT NULL AND ib.batch_id IS NULL",
|
|
(tenant, project),
|
|
)
|
|
orphan_count = (await cur.fetchone())["cnt"]
|
|
if orphan_count > 0:
|
|
issue = await create_inventory_issue({
|
|
"tenant_id": tenant,
|
|
"project_id": project,
|
|
"issue_type": "orphan_node",
|
|
"severity": "warning",
|
|
"description": f"Found {orphan_count} candidate entities referencing non-existent batches",
|
|
})
|
|
issues_created.append(issue)
|
|
|
|
# Check for stale data (not updated in 180 days)
|
|
await cur.execute(
|
|
f"SELECT COUNT(*) AS cnt FROM {s}.candidate_entities "
|
|
"WHERE tenant_id=%s AND project_id=%s AND status='published' "
|
|
"AND updated_at < now() - INTERVAL '180 days'",
|
|
(tenant, project),
|
|
)
|
|
stale_count = (await cur.fetchone())["cnt"]
|
|
if stale_count > 0:
|
|
issue = await create_inventory_issue({
|
|
"tenant_id": tenant,
|
|
"project_id": project,
|
|
"issue_type": "stale_data",
|
|
"severity": "info",
|
|
"description": f"Found {stale_count} entities not updated in 180+ days",
|
|
})
|
|
issues_created.append(issue)
|
|
|
|
# Check for missing required fields (empty core fields in published entities)
|
|
await cur.execute(
|
|
f"SELECT COUNT(*) AS cnt FROM {s}.candidate_entities "
|
|
"WHERE tenant_id=%s AND project_id=%s AND status='published' "
|
|
"AND (payload->>'name' IS NULL OR payload->>'name'='')",
|
|
(tenant, project),
|
|
)
|
|
missing_name = (await cur.fetchone())["cnt"]
|
|
if missing_name > 0:
|
|
issue = await create_inventory_issue({
|
|
"tenant_id": tenant,
|
|
"project_id": project,
|
|
"issue_type": "missing_field",
|
|
"severity": "blocker",
|
|
"target_field": "name",
|
|
"description": f"Found {missing_name} published entities with empty name",
|
|
})
|
|
issues_created.append(issue)
|
|
|
|
return {"scanned": True, "issues_created": len(issues_created), "issues": issues_created}
|
|
|
|
|
|
@router.post("/inventory/issues/{issue_id}/resolve")
|
|
async def _resolve(issue_id: int, body: IssueResolve | None = None, user: CurrentUser = None):
|
|
row = await resolve_inventory_issue(
|
|
issue_id,
|
|
resolved_by=user["username"] if user else "system",
|
|
note=body.resolution_note if body else None,
|
|
)
|
|
if not row:
|
|
raise HTTPException(404, "Issue not found")
|
|
return row
|