72 lines
2.6 KiB
Python
72 lines
2.6 KiB
Python
"""STEP 05 — Conflict Desk (冲突处理台)."""
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
|
|
from app.auth import CurrentUser
|
|
from app.config import settings
|
|
from app.contracts import ConflictResolve
|
|
from app.db import get_conn
|
|
|
|
# Router that the conflict endpoints defined in this module are registered on.
router = APIRouter()
|
|
|
|
|
|
@router.get("/conflicts")
async def _list(_user: CurrentUser = None):
    """List provenance-tracked fields of entities that are pending review.

    Reads up to 50 of the newest rows from ``candidate_entities`` (in the
    schema named by ``settings.db_schema``) that belong to the default
    tenant/project from settings, have status ``pending_review`` and carry a
    non-empty ``field_provenance_jsonb``.

    Returns:
        A list of dicts, one per provenance entry that is a dict with a
        truthy ``source_batch_id``. Each dict has the keys ``entity_id``,
        ``natural_key``, ``entity_type``, ``field``, ``current_value``,
        ``source`` and ``confidence``.

    NOTE(review): an entry is reported as soon as it has a
    ``source_batch_id``; nothing here compares values across several
    sources. Confirm that this matches the intended meaning of "conflict".
    """
    import json

    schema = settings.db_schema
    query = f"""SELECT id, natural_key, entity_type, field_provenance_jsonb, confidence
                FROM {schema}.candidate_entities
                WHERE tenant_id=%s AND project_id=%s
                  AND status='pending_review'
                  AND field_provenance_jsonb != '{{}}'::jsonb
                ORDER BY created_at DESC LIMIT 50"""
    params = (settings.default_tenant, settings.default_project)

    async with get_conn() as conn, conn.cursor() as cur:
        await cur.execute(query, params)
        entities = await cur.fetchall()

    found = []
    for entity in entities:
        provenance = entity.get("field_provenance_jsonb")
        # The column may come back as JSON text rather than a decoded object.
        if isinstance(provenance, str):
            provenance = json.loads(provenance)
        if not provenance:
            continue

        for field_name, details in provenance.items():
            # Only dict entries that name a source batch are reported.
            if not isinstance(details, dict):
                continue
            if not details.get("source_batch_id"):
                continue
            found.append(
                {
                    "entity_id": entity["id"],
                    "natural_key": entity["natural_key"],
                    "entity_type": entity["entity_type"],
                    "field": field_name,
                    "current_value": details.get("value"),
                    "source": details.get("source_code"),
                    "confidence": details.get("confidence"),
                }
            )

    return found
|
|
|
|
|
|
@router.post("/conflicts/{conflict_id}/resolve")
async def _resolve(conflict_id: int, body: ConflictResolve, user: CurrentUser):
    """Record the resolution chosen for a candidate entity.

    ``conflict_id`` is used directly as the candidate-entity id. The entity's
    payload is loaded (decoded from JSON text when stored as a string), the
    chosen value — if one was supplied — is stored under the payload key
    ``"resolved_value"``, and the payload is written back as JSON. The write
    happens even when no value was chosen.

    Args:
        conflict_id: Id of the candidate entity to update.
        body: Request body; ``chosen_value`` and ``resolution`` are read.
        user: Authenticated caller; not referenced in the body.

    Returns:
        ``{"resolved": conflict_id, "resolution": body.resolution}``.

    Raises:
        HTTPException: 404 when no entity is found for ``conflict_id``.

    NOTE(review): the value is always written to ``"resolved_value"`` rather
    than to the name of the conflicted field — confirm this is intended.
    """
    import json

    from app.db import get_candidate_entity, update_candidate_entity

    record = await get_candidate_entity(conflict_id)
    if not record:
        raise HTTPException(404, "Entity not found")

    stored = record.get("payload") or {}
    data = json.loads(stored) if isinstance(stored, str) else stored

    chosen = body.chosen_value
    if chosen is not None:
        data["resolved_value"] = chosen

    serialized = json.dumps(data)
    await update_candidate_entity(conflict_id, {"payload": serialized})

    return {"resolved": conflict_id, "resolution": body.resolution}
|