Files
bxh/app/agents/auditor.py

487 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""STEP 02 — Quality Audit Agent (GraphRAG).
For each tourist question we actually retrieve evidence from the FalkorDB
knowledge graph, then let the LLM judge coverage/quality against that real
evidence. Domain is scoped to **Guizhou tourism visitor service**.
Suggested actions: hit / gap / low_quality / conflict.
"""
from __future__ import annotations

import asyncio
import json
import logging
from dataclasses import dataclass
from typing import Literal

from app.config import settings
from app.db import get_question_trace, update_question_trace, get_agent_settings
from app.llm_client import LlmClient
# System prompt for the judge LLM (written in Chinese). It casts the model as
# the quality auditor of the Guizhou tourism knowledge graph (Place / Area /
# ExperienceTag / RouteTemplate), tells it to judge ONLY from the retrieved
# graph evidence (nothing retrieved means the data is missing; no filling in
# from common sense), defines the four verdicts:
#   hit         - matched, key fields complete, evidence sufficient
#   gap         - relevant entities missing from the graph
#   low_quality - matched, but key fields empty / evidence insufficient
#   conflict    - matched, but the data contradicts itself
# and demands a JSON-only answer.
SYSTEM_PROMPT = """你是「贵州旅游知识图谱」的质量稽查员。该图谱服务于来贵州旅游的游客,
覆盖:地点(Place)、区域(Area贵州行政区)、体验标签(ExperienceTag)、线路模板(RouteTemplate)。
给你一个游客问题,以及【从图谱真实检索到的实体证据】。请只依据这些真实证据判断
图谱能否答好这个问题——检索不到相关实体即为缺失,不要凭常识脑补。
判定动作:
- hit: 命中且关键字段齐全、证据充分
- gap: 图谱缺失相关实体(需要补藏新数据)
- low_quality: 命中但关键字段空缺/证据不足(需要完善纠错)
- conflict: 命中但数据互相矛盾(需要冲突处理)
只输出 JSON。"""
# Per-question user prompt for the judge LLM. audit_single_trace fills it with
# str.format(question=..., retrieved=<JSON-dumped evidence list>); the doubled
# braces are format escapes that render as the literal JSON output schema the
# model must follow.
USER_TEMPLATE = """游客问题:{question}
从贵州旅游图谱检索到的实体name/type/key_fields/evidence_count/empty_fields
{retrieved}
输出 schema只输出 JSON
{{
"coverage_score": 0.0~1.0,
"confidence": 0.0~1.0,
"evidence_count": int,
"matched_entity_ids": ["..."],
"missing_fields": ["field_name", ...],
"scenario_tags": ["雨天","夜间","亲子","美食","夜市","路线"...],
"suggested_action": "hit|gap|low_quality|conflict",
"explanation": "一句话中文解释,说明依据哪些实体/缺什么"
}}"""
# Per-label property whitelist: only these properties (when non-empty) are
# copied into an entity's "key_fields" evidence that the judge LLM sees.
_KEY_FIELDS = {
    "Place": [
        "place_type",
        "area_id",
        "best_time_start",
        "best_time_end",
        "price_band",
        "rain_suitability",
        "must_try_items",
        "night_vibe_score",
    ],
    "Area": [
        "area_type",
        "night_identity",
        "first_timer_friendliness",
        "walkability_score",
        "rain_backup_strength",
    ],
    "ExperienceTag": ["tag_id"],
    "RouteTemplate": [
        "theme",
        "target_user_type",
        "ideal_total_minutes",
        "rhythm_type",
        "is_rain_compatible",
    ],
}

# Per-label "important" properties: any that are empty/blank are reported in
# the entity's "empty_fields" (and make the rule-based fallback answer
# low_quality).
_IMPORTANT = {
    "Place": [
        "place_type",
        "area_id",
        "best_time_start",
        "must_try_items",
    ],
    "Area": [
        "area_type",
        "night_identity",
    ],
    "ExperienceTag": [],
    "RouteTemplate": [
        "theme",
        "target_user_type",
    ],
}

# Chinese scenario keywords; every one that occurs verbatim in a question is
# reported as a scenario tag.
_SCENARIOS = [
    "雨天", "夜间", "夜晚", "亲子",
    "美食", "小吃", "夜市", "路线",
    "地铁", "公交", "第一次", "本地",
    "打卡", "周边", "室内", "带娃",
]

# Controlled ExperienceTag vocabulary the intent LLM must map a question onto
# (GraphRAG step 1); tags outside this list are discarded.
_TAG_VOCAB = [
    "authentic_local",
    "worth_first_visit",
    "rain_ok",
    "less_queue",
    "night_walk_friendly",
    "good_after_dinner",
    "late_night_friendly",
    "breakfast_friendly",
    "daytime_food_friendly",
    "morning_walk_friendly",
    "low_walk",
    "day_walk_friendly",
    "daytime_drink_friendly",
    "quiet_refreshment",
    "transit",
    "metro",
]

# Allowed Place.place_type values for intent extraction.
_PLACE_TYPES = [
    "eat",
    "walk",
    "drink",
    "transit_stop",
]
# System prompt for intent extraction (GraphRAG step 1, used by
# _extract_intent). It asks the LLM for a JSON object with "areas",
# "place_types", "tags" and "keywords". The place_type and tag keys listed here
# must stay in sync with _PLACE_TYPES and _TAG_VOCAB, which _extract_intent
# uses to filter the answer. This string is never passed through str.format(),
# so its braces are literal.
_INTENT_SYSTEM = """你是贵州旅游知识图谱的检索意图解析器。把游客问题映射到图谱的受控词表,
只输出 JSON不要解释。
place_type 仅可选eat(吃/美食/小吃/餐馆) / walk(逛/散步/夜市/步道/景点/打卡) /
drink(喝/酒吧/咖啡/茶) / transit_stop(地铁/公交/交通站)
tags 仅可从此列表选(英文键):
authentic_local(本地正宗) worth_first_visit(第一次必去) rain_ok(雨天可去)
less_queue(不排队) night_walk_friendly(适合夜间散步) good_after_dinner(饭后)
late_night_friendly(深夜) breakfast_friendly(早餐) daytime_food_friendly(白天吃)
morning_walk_friendly(晨间散步) low_walk(少走路) day_walk_friendly(白天散步)
daytime_drink_friendly(白天喝) quiet_refreshment(安静小憩) transit(交通枢纽) metro(地铁)
areas问题里出现的贵州区域/片区中文名(如 甲秀楼、青云市集、南明区、贵阳…),没有就空。
keywords其它有用中文检索词。
输出:{"areas":[],"place_types":[],"tags":[],"keywords":[]}"""
@dataclass(frozen=True)
class AuditResult:
    """Immutable verdict of auditing one question trace against the graph.

    Produced by ``audit_single_trace`` (LLM judge or the rule-based fallback)
    and written back onto the question trace by ``run_audit_for_traces``.
    """

    # id of the audited question trace (_rule_based_audit uses 0 as a placeholder)
    trace_id: int
    # how well the graph can answer the question; the prompt asks for 0.0-1.0
    coverage_score: float
    # confidence in this verdict; the prompt asks for 0.0-1.0 (rule-based: 0.4)
    confidence: float
    # amount of supporting evidence; rule-based path sums the retrieved
    # entities' relationship counts
    evidence_count: int
    # identifiers of the relevant graph entities; the retrieval fallbacks use
    # entity names here, not internal node ids
    matched_entity_ids: list[str]
    # important fields found empty/missing; becomes the work-order target_fields
    missing_fields: list[str]
    # scenario labels for the question (Chinese keywords and/or tag keys)
    scenario_tags: list[str]
    # gap / low_quality / conflict open a work-order; hit does not
    suggested_action: Literal["hit", "gap", "low_quality", "conflict"]
    # one-line rationale; used as the work-order description when present
    explanation: str | None = None
async def _build_llm() -> LlmClient | None:
    """Construct the judge/intent LLM client from the Agent settings.

    This is the "consumer wiring": the key/model/base_url an admin saves on the
    "Agent 设置" (Agent settings) page is what actually drives the audit. Each
    option is resolved as: auditor override -> global agent setting -> app
    settings -> built-in default (DeepSeek base URL, ``deepseek-chat``, 30 s
    timeout).

    Returns:
        The client, or ``None`` when no API key is configured at any level (the
        audit then runs on the rule-based path). A failure to load the Agent
        settings, or an unparsable timeout, is logged and treated as "not set".
    """
    log = logging.getLogger(__name__)
    try:
        cfg = await get_agent_settings()
    except Exception:
        log.warning("auditor: could not load agent settings; using defaults",
                    exc_info=True)
        cfg = None
    cfg = cfg or {}
    # `or {}` rather than a .get() default so an explicit null section in the
    # stored settings is tolerated too (a .get default only covers a missing key).
    g = cfg.get("global") or {}
    a = (cfg.get("agents") or {}).get("auditor") or {}

    api_key = a.get("api_key") or g.get("api_key") or settings.llm_api_key
    if not api_key:
        return None
    base = (a.get("base_url") or g.get("base_url")
            or settings.llm_api_base or "https://api.deepseek.com/v1")
    model = a.get("model") or g.get("model") or settings.llm_model or "deepseek-chat"

    # Same override chain as the other options (the auditor override used to be
    # ignored for the timeout only).
    raw_timeout = (a.get("timeout") or g.get("timeout")
                   or settings.llm_timeout_seconds or 30)
    try:
        timeout = int(float(raw_timeout))
    except (TypeError, ValueError):
        # A malformed admin value must not abort the whole audit run.
        log.warning("auditor: invalid LLM timeout %r; using 30s", raw_timeout)
        timeout = 30
    return LlmClient(api_base=base, api_key=api_key, model=model, timeout=timeout)
def _load_catalog() -> list[dict]:
    """Fetch the named nodes of the FalkorDB graph for substring retrieval.

    Issues a single query (capped at 5000 rows) and returns one
    ``{"type", "name", "props"}`` dict per node whose stripped ``name`` is
    non-empty. ``type`` is the node's first label, or ``"Node"`` when that is
    null/empty. Graph and query errors propagate to the caller.
    """
    from app.api.graph import _get_graph

    graph = _get_graph()
    result = graph.query(
        "MATCH (n) WHERE n.name IS NOT NULL "
        "RETURN labels(n)[0] AS t, n AS node LIMIT 5000"
    )

    def _as_entry(label, node):
        # None for nodes whose name is blank after stripping.
        properties = getattr(node, "properties", {}) or {}
        name = str(properties.get("name") or "").strip()
        if not name:
            return None
        return {"type": label or "Node", "name": name, "props": properties}

    entries = (_as_entry(row[0], row[1]) for row in result.result_set)
    return [entry for entry in entries if entry is not None]
def _evidence_counts(names: list[str]) -> dict:
    """Return ``{name: relationship_count}`` for graph nodes with the given names.

    The count is the number of relationships (either direction) attached to
    nodes carrying that ``name``; the query groups by name, so nodes sharing a
    name are summed together. Names whose nodes have no relationships are
    absent from the result, so callers should use ``.get(name, 0)``.

    Blank and duplicate names are dropped before querying. Returns ``{}`` for
    empty input or when the graph cannot be reached or queried (logged).
    """
    unique_names = list(dict.fromkeys(nm for nm in names if nm))
    if not unique_names:
        return {}
    from app.api.graph import _get_graph

    try:
        # _get_graph() is inside the try as well, so an unreachable graph gives
        # {} like a failed query instead of raising into the audit.
        res = _get_graph().query(
            "MATCH (n)-[r]-() WHERE n.name IN $names "
            "RETURN n.name AS nm, count(r) AS c",
            {"names": unique_names},
        )
        return {row[0]: int(row[1]) for row in res.result_set}
    except Exception:
        logging.getLogger(__name__).warning(
            "auditor: evidence-count query failed", exc_info=True
        )
        return {}
def _retrieve(question: str, catalog: list[dict]) -> tuple[list[dict], list[str]]:
    """Fallback retrieval: find catalog entities whose name occurs in *question*.

    A catalog entry matches when its name is at least 2 characters long and
    appears verbatim in the question. Matches are stably re-ordered so that
    non-Area entities come first (Area names are numerous and noisy), then
    capped at 20. For every match this collects its non-empty whitelisted
    ``key_fields``, its blank ``_IMPORTANT`` fields and its relationship count.

    Returns:
        ``(retrieved, scenario_tags)``: one evidence dict per match, and the
        ``_SCENARIOS`` keywords found in the question.
    """
    candidates = [
        entry for entry in catalog
        if len(entry["name"]) >= 2 and entry["name"] in question
    ]
    # False sorts before True, so non-Area entries lead; sorted() is stable,
    # keeping catalog order inside each group.
    candidates = sorted(candidates, key=lambda entry: entry["type"] == "Area")[:20]

    degree_by_name = _evidence_counts([entry["name"] for entry in candidates])

    evidence: list[dict] = []
    for entry in candidates:
        label = entry["type"]
        props = entry["props"]
        present = {
            field: props.get(field)
            for field in _KEY_FIELDS.get(label, [])
            if props.get(field) not in (None, "")
        }
        blanks = [
            field for field in _IMPORTANT.get(label, [])
            if props.get(field) in (None, "", [], "[]")
        ]
        evidence.append({
            "name": entry["name"],
            "type": label,
            "key_fields": present,
            "evidence_count": degree_by_name.get(entry["name"], 0),
            "empty_fields": blanks,
        })

    scenario_tags = [word for word in _SCENARIOS if word in question]
    return evidence, scenario_tags
def _as_str_list(value: object) -> list[str]:
    """Normalise one field of the intent JSON into a list of stripped strings.

    The intent LLM is asked for JSON arrays but may answer with a bare string
    (e.g. ``"贵阳"``) or ``null``. A bare string becomes a one-element list
    instead of being iterated character by character. ``None`` or any other
    non-list value yields ``[]``. ``None`` items and blank strings are dropped.
    """
    if value is None:
        return []
    if isinstance(value, str):
        value = [value]
    elif not isinstance(value, (list, tuple)):
        return []
    cleaned: list[str] = []
    for item in value:
        if item is None:
            continue
        text = str(item).strip()
        if text:
            cleaned.append(text)
    return cleaned


def _extract_intent(question: str, llm: LlmClient) -> dict | None:
    """Step 1 of GraphRAG: map the question onto the graph's controlled vocabulary.

    Asks *llm* (system prompt ``_INTENT_SYSTEM``) for ``areas``,
    ``place_types``, ``tags`` and ``keywords``. Every field is normalised with
    ``_as_str_list``. ``place_types`` and ``tags`` are lower-cased and then
    filtered to ``_PLACE_TYPES`` / ``_TAG_VOCAB``.

    Returns:
        The intent dict, or ``None`` if the LLM call fails or its answer is not
        a JSON object. The caller then falls back to substring retrieval.
    """
    try:
        out = llm.chat_json(system=_INTENT_SYSTEM, user=f"问题:{question}")
        # .get() inside the try: a non-object answer (e.g. a list) -> None.
        fields = {
            key: _as_str_list(out.get(key))
            for key in ("areas", "place_types", "tags", "keywords")
        }
    except Exception:
        logging.getLogger(__name__).warning(
            "auditor: intent extraction failed", exc_info=True
        )
        return None
    place_types = [p.lower() for p in fields["place_types"]]
    tags = [t.lower() for t in fields["tags"]]
    return {
        "areas": fields["areas"],
        "place_types": [p for p in place_types if p in _PLACE_TYPES],
        "tags": [t for t in tags if t in _TAG_VOCAB],
        "keywords": fields["keywords"],
    }
def _graph_search(intent: dict) -> list[dict]:
    """Step 2 of GraphRAG: query FalkorDB with the structured intent.

    Every ``Place`` is scored against *intent* (as produced by
    ``_extract_intent``):

    * +2 if the name of an Area it is ``LOCATED_IN`` contains an intent area;
    * +2 if its ``place_type`` is one of the intent place types;
    * +1 per ``HAS_TAG`` ExperienceTag whose ``name`` or ``tag_id`` is an
      intent tag (intent tags are vocabulary keys such as ``rain_ok``);
    * +1 if its name contains any intent keyword of 2+ characters.

    A Place linked to several Areas comes back on several query rows, and only
    its best-scoring row is kept. Places scoring > 0 are ranked highest-first
    and the top 15 kept. Area nodes whose name exactly equals an intent area
    are then appended, and every entry gets an ``evidence_count``
    (relationship count).

    Returns ``[]`` if the graph is unreachable or the Place query fails.
    """
    from app.api.graph import _get_graph

    log = logging.getLogger(__name__)
    areas = [a for a in (intent.get("areas") or []) if a]
    ptypes = intent.get("place_types") or []
    wanted_tags = set(intent.get("tags") or [])
    kws = [k for k in (intent.get("keywords") or []) if len(k) >= 2]

    try:
        g = _get_graph()
        # One enriched pass over every Place (noted as ~107 rows, so cheap).
        res = g.query(
            "MATCH (p:Place) "
            "OPTIONAL MATCH (p)-[:LOCATED_IN]->(a:Area) "
            "OPTIONAL MATCH (p)-[:HAS_TAG]->(t:ExperienceTag) "
            "RETURN p AS p, a.name AS area, collect(DISTINCT t.name) AS tags, "
            "collect(DISTINCT t.tag_id) AS tag_ids"
        )
    except Exception:
        log.warning("auditor: graph search Place query failed", exc_info=True)
        return []

    # node id (or name when the client exposes no id) -> (score, entry)
    best: dict[object, tuple[int, dict]] = {}
    for row in res.result_set:
        node = row[0]
        props = getattr(node, "properties", {}) or {}
        area = row[1]
        ptags = [x for x in (row[2] or []) if x]
        tag_keys = {x for x in (row[3] or []) if isinstance(x, str) and x}
        nm = str(props.get("name") or "")

        score = 0
        if area and any(a in area for a in areas):
            score += 2
        if props.get("place_type") in ptypes:
            score += 2
        score += len(wanted_tags & (set(ptags) | tag_keys))
        if any(k in nm for k in kws):
            score += 1
        if score <= 0:
            continue

        key = getattr(node, "id", None)
        if key is None:
            key = nm
        if key in best and best[key][0] >= score:
            continue
        best[key] = (score, {
            "name": nm, "type": "Place",
            "key_fields": {k: props.get(k) for k in _KEY_FIELDS["Place"]
                           if props.get(k) not in (None, "")},
            "tags": ptags,
            "area": area,
            "empty_fields": [k for k in _IMPORTANT["Place"]
                             if props.get(k) in (None, "", [], "[]")],
        })

    # sorted() is stable, so ties keep the graph's row order.
    ranked = sorted(best.values(), key=lambda item: item[0], reverse=True)
    retrieved = [entry for _, entry in ranked[:15]]

    # Also surface the matched Area nodes themselves (exact name match).
    if areas:
        try:
            ar = g.query(
                "MATCH (a:Area) WHERE a.name IN $names RETURN a",
                {"names": areas},
            )
            for row in ar.result_set:
                ap = getattr(row[0], "properties", {}) or {}
                retrieved.append({
                    "name": str(ap.get("name") or ""), "type": "Area",
                    "key_fields": {k: ap.get(k) for k in _KEY_FIELDS["Area"]
                                   if ap.get(k) not in (None, "")},
                    "empty_fields": [k for k in _IMPORTANT["Area"]
                                     if ap.get(k) in (None, "", [], "[]")],
                })
        except Exception:
            # Best-effort: the Place results are still useful on their own.
            log.warning("auditor: graph search Area lookup failed", exc_info=True)

    deg = _evidence_counts([r["name"] for r in retrieved])
    for r in retrieved:
        r["evidence_count"] = deg.get(r["name"], 0)
    return retrieved
def _rule_based_audit(question: str, retrieved: list[dict], scen: list[str]) -> AuditResult:
    """Score a question from real retrieval results alone (the no-LLM fallback).

    Verdict rules:

    * nothing retrieved -> ``gap``, coverage 0.1;
    * any entity with empty important fields, or zero total relationship
      evidence -> ``low_quality``, coverage 0.45;
    * otherwise -> ``hit``, coverage 0.6 + 0.08 per entity, capped at 0.9.

    Confidence is always 0.4. ``question`` is currently unused. The result
    carries ``trace_id=0``; ``audit_single_trace`` substitutes the real id.
    """
    entity_count = len(retrieved)
    evidence_total = sum(item["evidence_count"] for item in retrieved)
    has_blank_fields = any(item["empty_fields"] for item in retrieved)

    if not entity_count:
        action, score = "gap", 0.1
    elif has_blank_fields or evidence_total == 0:
        action, score = "low_quality", 0.45
    else:
        action, score = "hit", min(0.6 + 0.08 * entity_count, 0.9)

    blank_fields = sorted({field for item in retrieved for field in item["empty_fields"]})
    if not blank_fields and not entity_count:
        # Nothing retrieved at all: report the missing entities themselves.
        blank_fields = ["相关实体"]

    summary = f"图谱检索评估(无 LLM命中 {entity_count} 个实体"
    if has_blank_fields:
        summary += ",存在空字段"

    return AuditResult(
        trace_id=0,
        coverage_score=score,
        confidence=0.4,
        evidence_count=evidence_total,
        matched_entity_ids=[item["name"] for item in retrieved],
        missing_fields=blank_fields,
        scenario_tags=scen,
        suggested_action=action,
        explanation=summary,
    )
# Verdicts the judge LLM may return; anything else triggers the rule-based fallback.
_VALID_ACTIONS = frozenset({"hit", "gap", "low_quality", "conflict"})


def _llm_str_list(value: object) -> list[str]:
    """Coerce a judge-LLM JSON field into a list of strings.

    A bare string becomes a one-element list. ``None``, blank items and
    non-list values are dropped or yield ``[]``.
    """
    if value is None:
        return []
    if isinstance(value, str):
        return [value] if value.strip() else []
    if isinstance(value, (list, tuple)):
        return [str(v) for v in value if v is not None and str(v).strip()]
    return []


def _unit_score(value: object, default: float = 0.5) -> float:
    """Parse an LLM score into [0.0, 1.0]; ``None`` -> *default*, NaN -> ValueError."""
    if value is None:
        return default
    score = float(value)
    if score != score:  # NaN would survive min/max clamping
        raise ValueError("score is NaN")
    return min(max(score, 0.0), 1.0)


def _audit_from_llm(trace_id: int, raw: dict, retrieved: list[dict],
                    scen: list[str]) -> AuditResult:
    """Validate the judge LLM's JSON verdict and convert it to an AuditResult.

    Raises (ValueError / TypeError / AttributeError) when the verdict is
    unusable, e.g. an action outside ``_VALID_ACTIONS``, so the caller can fall
    back to the rule-based audit.
    """
    action = str(raw.get("suggested_action") or "gap").strip().lower()
    if action not in _VALID_ACTIONS:
        raise ValueError(f"unexpected suggested_action: {action!r}")
    evidence = raw.get("evidence_count")
    explanation = raw.get("explanation")
    return AuditResult(
        trace_id=trace_id,
        coverage_score=_unit_score(raw.get("coverage_score")),
        confidence=_unit_score(raw.get("confidence")),
        evidence_count=len(retrieved) if evidence is None else max(int(evidence), 0),
        matched_entity_ids=_llm_str_list(raw.get("matched_entity_ids"))
        or [r["name"] for r in retrieved],
        missing_fields=_llm_str_list(raw.get("missing_fields")),
        scenario_tags=_llm_str_list(raw.get("scenario_tags")) or scen,
        suggested_action=action,
        explanation=None if explanation is None else str(explanation),
    )


async def _substring_retrieve(question: str, catalog: list[dict] | None
                              ) -> tuple[list[dict], list[str]]:
    """Fallback retrieval: substring-match *question* against the node catalog.

    Loads the catalog when *catalog* is ``None`` (an empty catalog if that
    fails). The catalog load and the match, which queries relationship counts,
    both run in a worker thread so the synchronous graph client does not block
    the event loop.
    """
    if catalog is None:
        try:
            catalog = await asyncio.to_thread(_load_catalog)
        except Exception:
            logging.getLogger(__name__).warning(
                "auditor: could not load graph catalog", exc_info=True
            )
            catalog = []
    return await asyncio.to_thread(_retrieve, question, catalog)


async def audit_single_trace(
    trace: dict, llm: LlmClient | None = None, catalog: list[dict] | None = None
) -> AuditResult:
    """Audit one question trace against the Guizhou tourism graph.

    Retrieval: with a usable *llm*, the question is mapped to the controlled
    vocabulary (``_extract_intent``) and the graph is queried by that intent
    (``_graph_search``). If intent extraction fails or the graph search raises,
    the question is instead substring-matched against *catalog*, which is
    loaded on demand when ``None``.

    Judgement: with a usable *llm*, its verdict is validated and returned. An
    LLM error or an unusable verdict falls back to ``_rule_based_audit``.

    Args:
        trace: question trace row; ``question_text`` and ``id`` are read.
        llm: judge/intent client, or ``None`` for the pure rule-based path.
        catalog: node catalog preloaded with ``_load_catalog`` (optional).
    """
    question = trace["question_text"]
    trace_id = trace["id"]
    log = logging.getLogger(__name__)

    # ── GraphRAG retrieval ──
    retrieved: list[dict] | None = None
    scen: list[str] = []
    if llm and llm.available():
        intent = await asyncio.to_thread(_extract_intent, question, llm)
        if intent is not None:
            try:
                retrieved = await asyncio.to_thread(_graph_search, intent)
            except Exception:
                log.warning("trace %s: graph search failed; using substring "
                            "retrieval", trace_id, exc_info=True)
            else:
                scen = (intent.get("tags") or []) + [
                    s for s in _SCENARIOS if s in question
                ]
    if retrieved is None:
        retrieved, scen = await _substring_retrieve(question, catalog)

    # ── Judgement ──
    if llm and llm.available():
        try:
            raw = await asyncio.to_thread(
                llm.chat_json,
                SYSTEM_PROMPT,
                USER_TEMPLATE.format(
                    question=question,
                    # default=str: graph property values that are not JSON
                    # types must not knock out the LLM path.
                    retrieved=json.dumps(retrieved, ensure_ascii=False,
                                         indent=2, default=str),
                ),
            )
            return _audit_from_llm(trace_id, raw, retrieved, scen)
        except Exception:
            log.warning("trace %s: LLM audit unusable; using rule-based audit",
                        trace_id, exc_info=True)

    r = _rule_based_audit(question, retrieved, scen)
    return AuditResult(
        trace_id=trace_id, coverage_score=r.coverage_score, confidence=r.confidence,
        evidence_count=r.evidence_count, matched_entity_ids=r.matched_entity_ids,
        missing_fields=r.missing_fields, scenario_tags=r.scenario_tags,
        suggested_action=r.suggested_action, explanation=r.explanation,
    )
# Verdict action -> work-order template consumed by run_audit_for_traces.
# "hit" is deliberately absent: a hit opens no work-order.
# Title prefixes: gap -> 补藏, low_quality -> 完善, conflict -> 冲突核查.
_ACTION_TASK = {
    "gap": {
        "prefix": "补藏",
        "desc": "AI稽查发现知识缺口缺相关实体",
        "priority": 4,
        "note": "该城区缺失数据,请落实采集",
    },
    "low_quality": {
        "prefix": "完善",
        "desc": "AI稽查图谱已有数据但关键字段缺失/证据不足,需完善纠错",
        "priority": 3,
        "note": "该城区数据不完整,请完善",
    },
    "conflict": {
        "prefix": "冲突核查",
        "desc": "AI稽查命中但数据相互矛盾需冲突处理",
        "priority": 5,
        "note": "该城区数据存在矛盾,请核查",
    },
}

# Strong references to in-flight background audit tasks (see schedule_audit).
_BG_TASKS: set[asyncio.Task] = set()
async def _bg_audit(trace_ids: list[int], run_id: int) -> None:
    """Background body of ``schedule_audit``: audit a batch, then close the run.

    The run is finished as ``"done"`` on success and as ``"error"`` on failure,
    with ``"<ExceptionType>: <message>"`` truncated to 300 characters. A
    cancelled task (e.g. during shutdown) also finishes its run as ``"error"``
    before the cancellation is re-raised, so the run is not left unfinished.
    Failures to record the final status are logged, never raised.
    """
    from app.db import finish_audit_run

    log = logging.getLogger(__name__)
    try:
        await run_audit_for_traces(trace_ids, run_id)
        await finish_audit_run(run_id, "done")
    except asyncio.CancelledError:
        # CancelledError is a BaseException, so `except Exception` misses it.
        log.warning("audit run %s cancelled", run_id)
        try:
            await finish_audit_run(run_id, "error", "cancelled")
        except Exception:
            log.warning("audit run %s: could not record cancellation", run_id,
                        exc_info=True)
        raise
    except Exception as exc:  # noqa: BLE001 - background task boundary
        log.exception("audit run %s failed", run_id)
        # Include the type: str() of e.g. TimeoutError() or KeyError is empty
        # or cryptic on its own.
        message = f"{type(exc).__name__}: {exc}"[:300]
        try:
            await finish_audit_run(run_id, "error", message)
        except Exception:
            log.warning("audit run %s: could not record failure", run_id,
                        exc_info=True)
def schedule_audit(trace_ids: list[int], run_id: int) -> None:
    """Start a batch audit as a background task and return immediately.

    The task is parked in ``_BG_TASKS`` until it completes, because the event
    loop keeps only weak references to tasks and an unreferenced task could
    vanish mid-run. Status for *run_id* is recorded by ``_bg_audit``. Must be
    called while an event loop is running (``asyncio.create_task`` requires one).
    """
    job = asyncio.create_task(_bg_audit(trace_ids, run_id))
    _BG_TASKS.add(job)
    # Drop the strong reference once the task is done.
    job.add_done_callback(_BG_TASKS.discard)
async def _route_work_order(tid: int, question: str, result: AuditResult) -> None:
    """Open a work-order for a non-``hit`` verdict and route it to the area owner.

    Actions without an ``_ACTION_TASK`` entry (``hit``, or anything malformed)
    are a no-op. Errors from creating the task propagate. Resolving the area,
    routing the task and notifying the responsible user are best-effort: they
    are logged on failure and never block the audit.
    """
    from app.db import (
        create_acquisition_task, create_notification, get_area_responsible,
        resolve_area_from_entities, set_task_routing,
    )

    action = result.suggested_action
    # isinstance guard: an unhashable action (e.g. a list from a malformed LLM
    # verdict) would otherwise raise TypeError on the dict lookup.
    meta = _ACTION_TASK.get(action) if isinstance(action, str) else None
    if meta is None:
        return
    task = await create_acquisition_task({
        "tenant_id": settings.default_tenant,
        "project_id": settings.default_project,
        "created_by": "auditor",
        "triggered_by_trace_id": tid,
        "title": f"{meta['prefix']}: {question[:60]}",
        "description": result.explanation or meta["desc"],
        "scenario_tags": json.dumps(result.scenario_tags),
        "target_entity_types": json.dumps([]),
        "target_fields": json.dumps(result.missing_fields),
        "suggested_collection_method": "manual",
        "priority": meta["priority"],
    })
    if not task:
        return
    try:
        area_id = await resolve_area_from_entities(result.matched_entity_ids)
        responsible = await get_area_responsible(area_id) if area_id else None
        if not responsible:
            return
        await set_task_routing(task["id"], area_id, responsible["username"])
        await create_notification(
            user_id=responsible["id"],
            title=f"{meta['prefix']}工单({area_id}",
            body=f"{question[:80]}{meta['note']}",
            ntype="task",
            related_task_id=task["id"],
            area_id=area_id,
        )
    except Exception:
        logging.getLogger(__name__).warning(
            "trace %s: work-order routing/notification failed", tid, exc_info=True
        )


async def run_audit_for_traces(trace_ids: list[int], run_id: int | None = None) -> list[dict]:
    """Audit a batch of traces, persist each verdict, and route work-orders.

    For every id that resolves to a question trace:

    1. audit it with ``audit_single_trace`` (the LLM client and node catalog
       are built once per batch);
    2. write the verdict back onto the trace;
    3. for gap / low_quality / conflict, open and route a work-order;
    4. if *run_id* is given, report the verdict via ``bump_audit_run``
       (best-effort, logged on failure).

    Unknown trace ids are skipped. Other errors propagate and end the batch.

    Returns:
        One ``{"trace_id", "coverage_score", "suggested_action"}`` dict per
        audited trace.
    """
    from app.db import bump_audit_run

    log = logging.getLogger(__name__)
    llm = await _build_llm()
    try:
        catalog = await asyncio.to_thread(_load_catalog)
    except Exception:
        log.warning("auditor: could not load graph catalog; substring "
                    "retrieval will find nothing", exc_info=True)
        catalog = []

    results = []
    for tid in trace_ids:
        trace = await get_question_trace(tid)
        if not trace:
            continue
        result = await audit_single_trace(trace, llm, catalog)
        await update_question_trace(tid, {
            "coverage_score": result.coverage_score,
            "confidence": result.confidence,
            "evidence_count": result.evidence_count,
            "matched_entity_ids": json.dumps(result.matched_entity_ids),
            "missing_fields": json.dumps(result.missing_fields),
            "scenario_tags": json.dumps(result.scenario_tags),
            "suggested_action": result.suggested_action,
            # NOTE(review): passed as the literal string "now()"; presumably
            # update_question_trace maps it to SQL now() - confirm.
            "evaluated_at": "now()",
        })
        # gap / low_quality / conflict -> work-order; hit -> no-op
        await _route_work_order(tid, trace["question_text"], result)
        if run_id is not None:
            try:
                await bump_audit_run(run_id, result.suggested_action)
            except Exception:
                log.warning("audit run %s: progress update failed", run_id,
                            exc_info=True)
        results.append({
            "trace_id": tid,
            "coverage_score": result.coverage_score,
            "suggested_action": result.suggested_action,
        })
    return results