Files
bxh/app/api/acquisition_tasks.py

252 lines
9.5 KiB
Python

"""STEP 04 — Acquisition Tasks (补藏队列) — work-order state machine + timeline."""
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, HTTPException
from app.auth import CurrentUser
from app.config import settings
from app.contracts import AcquisitionTaskCreate, TaskFromGap, TaskAssign, TaskComplete
from app.db import (
list_acquisition_tasks,
create_acquisition_task,
get_acquisition_task,
update_acquisition_task,
get_question_trace,
log_task_event,
list_task_events,
)
from app.project_context import ProjectContext, get_project_context
router = APIRouter()
# 工单状态流:待派单→已派单→处理中→待复核→已完成 / 已取消
STATUS_LABELS = {
"pending": "待派单",
"assigned": "已派单",
"in_progress": "处理中",
"pending_review": "待复核",
"done": "已完成",
"cancelled": "已取消",
}
def _sla(task: dict) -> dict:
st = task.get("status")
if st in ("done", "cancelled"):
return {"level": "closed", "label": "已关闭"}
due = task.get("due_at")
if not due:
return {"level": "none", "label": "无时限"}
now = datetime.now(timezone.utc)
if now > due:
return {"level": "overdue", "label": "已超时"}
if now > due - timedelta(hours=24):
return {"level": "soon", "label": "即将超时"}
return {"level": "ok", "label": "正常"}
async def _transition(task: dict, action: str, to_status: str,
actor: str, note: str | None = None,
extra: dict | None = None) -> dict:
data = {"status": to_status}
if extra:
data.update(extra)
updated = await update_acquisition_task(task["id"], data)
await log_task_event(task["id"], action, task.get("status"), to_status, actor, note)
return updated
@router.get("/acquisition-tasks")
async def _list(
status: str | None = None,
limit: int = 50,
offset: int = 0,
context: ProjectContext = Depends(get_project_context),
_user: CurrentUser = None,
):
tasks = await list_acquisition_tasks(
context.tenant_id, context.project_id, status, limit, offset
)
for t in tasks:
t["sla"] = _sla(t)
return tasks
@router.post("/acquisition-tasks")
async def _create(
body: AcquisitionTaskCreate,
user: CurrentUser,
context: ProjectContext = Depends(get_project_context),
):
data = body.model_dump()
data["tenant_id"] = context.tenant_id
data["project_id"] = context.project_id
data["created_by"] = user["username"]
task = await create_acquisition_task(data)
await log_task_event(task["id"], "created", None, task["status"], user["username"], "工单创建")
return task
@router.post("/acquisition-tasks/from-gap")
async def _from_gap(
body: TaskFromGap,
user: CurrentUser,
context: ProjectContext = Depends(get_project_context),
):
"""Create acquisition task from an audit gap trace."""
trace = await get_question_trace(body.trace_id)
if not trace:
raise HTTPException(404, "Trace not found")
import json
scenario_tags = trace.get("scenario_tags")
if isinstance(scenario_tags, str):
scenario_tags = json.loads(scenario_tags)
missing_fields = trace.get("missing_fields")
if isinstance(missing_fields, str):
missing_fields = json.loads(missing_fields)
data = {
"tenant_id": context.tenant_id,
"project_id": context.project_id,
"created_by": user["username"],
"triggered_by_trace_id": body.trace_id,
"title": body.title or f"补藏: {trace['question_text'][:60]}",
"scenario_tags": json.dumps(scenario_tags or []),
"target_entity_types": json.dumps([]),
"target_fields": json.dumps(missing_fields or []),
"priority": body.priority,
}
task = await create_acquisition_task(data)
await log_task_event(task["id"], "created", None, task["status"],
user["username"], "由 AI 稽查缺口生成")
return task
@router.get("/acquisition-tasks/{task_id}")
async def _get(task_id: int, _user: CurrentUser = None):
task = await get_acquisition_task(task_id)
if not task:
raise HTTPException(404, "Task not found")
events = await list_task_events(task_id)
# synthetic "创建" entry only when there is no real one — covers
# AI-auto-created tasks that predate event logging without duplicating
if not any(e.get("action") == "created" for e in events):
events = [{
"id": 0,
"task_id": task_id,
"from_status": None,
"to_status": "pending",
"action": "created",
"actor": task.get("created_by") or "系统",
"note": "工单创建",
"created_at": task.get("created_at"),
}] + events
timeline = events
trace = None
if task.get("triggered_by_trace_id"):
tr = await get_question_trace(task["triggered_by_trace_id"])
if tr:
trace = {
"id": tr.get("id"),
"question_text": tr.get("question_text"),
"suggested_action": tr.get("suggested_action"),
"missing_fields": tr.get("missing_fields"),
}
return {**task, "events": timeline, "sla": _sla(task), "trace": trace,
"status_label": STATUS_LABELS.get(task.get("status"), task.get("status"))}
@router.post("/acquisition-tasks/{task_id}/assign")
async def _assign(task_id: int, body: TaskAssign, user: CurrentUser):
task = await get_acquisition_task(task_id)
if not task:
raise HTTPException(404, "Task not found")
if task["status"] not in ("pending",):
raise HTTPException(400, f"当前状态「{STATUS_LABELS.get(task['status'])}」不可派单")
return await _transition(
task, "assign", "assigned", user["username"],
note=f"派单给 {body.assignee}",
extra={"assignee": body.assignee, "assigned_at": datetime.now(timezone.utc)},
)
@router.post("/acquisition-tasks/{task_id}/start")
async def _start(task_id: int, user: CurrentUser):
task = await get_acquisition_task(task_id)
if not task:
raise HTTPException(404, "Task not found")
if task["status"] not in ("assigned", "in_progress"):
raise HTTPException(400, f"当前状态「{STATUS_LABELS.get(task['status'])}」不可开始")
return await _transition(task, "start", "in_progress", user["username"], "开始处理")
@router.post("/acquisition-tasks/{task_id}/complete")
async def _submit_review(task_id: int, body: TaskComplete | None = None, user: CurrentUser = None):
"""采集员提交复核:处理中 → 待复核。"""
task = await get_acquisition_task(task_id)
if not task:
raise HTTPException(404, "Task not found")
if task["status"] not in ("in_progress", "assigned"):
raise HTTPException(400, f"当前状态「{STATUS_LABELS.get(task['status'])}」不可提交复核")
extra = {}
note = "提交复核"
if body and body.result_summary:
extra["result_summary"] = body.result_summary
note = f"提交复核:{body.result_summary[:80]}"
return await _transition(task, "submit_review", "pending_review",
user["username"] if user else "", note, extra)
@router.post("/acquisition-tasks/{task_id}/review-approve")
async def _approve(task_id: int, user: CurrentUser, body: dict | None = None):
"""运营复核通过:待复核 → 已完成。"""
task = await get_acquisition_task(task_id)
if not task:
raise HTTPException(404, "Task not found")
if task["status"] != "pending_review":
raise HTTPException(400, "仅「待复核」工单可复核通过")
note = (body or {}).get("note") or "复核通过,工单闭环"
return await _transition(task, "approve", "done", user["username"], note,
{"completed_at": datetime.now(timezone.utc)})
@router.post("/acquisition-tasks/{task_id}/review-reject")
async def _reject(task_id: int, user: CurrentUser, body: dict | None = None):
"""运营复核打回:待复核 → 处理中。"""
task = await get_acquisition_task(task_id)
if not task:
raise HTTPException(404, "Task not found")
if task["status"] != "pending_review":
raise HTTPException(400, "仅「待复核」工单可打回")
note = (body or {}).get("note") or "复核未通过,打回重做"
return await _transition(task, "reject", "in_progress", user["username"], note)
@router.post("/acquisition-tasks/{task_id}/urge")
async def _urge(task_id: int, user: CurrentUser, body: dict | None = None):
"""催办:不改状态,仅记录一条催办事件。"""
task = await get_acquisition_task(task_id)
if not task:
raise HTTPException(404, "Task not found")
if task["status"] in ("done", "cancelled"):
raise HTTPException(400, "已关闭工单无需催办")
note = (body or {}).get("note") or "催办:请尽快处理"
await log_task_event(task_id, "urge", task["status"], task["status"],
user["username"], note)
return {"ok": True, "task_id": task_id}
@router.post("/acquisition-tasks/{task_id}/cancel")
async def _cancel(task_id: int, user: CurrentUser, body: dict | None = None):
task = await get_acquisition_task(task_id)
if not task:
raise HTTPException(404, "Task not found")
if task["status"] in ("done", "cancelled"):
raise HTTPException(400, "工单已关闭")
note = (body or {}).get("note") or "工单取消"
return await _transition(task, "cancel", "cancelled", user["username"], note)