"""STEP 02 — Quality Audit Agent (GraphRAG). For each tourist question we actually retrieve evidence from the FalkorDB knowledge graph, then let the LLM judge coverage/quality against that real evidence. Domain is scoped to **Guizhou tourism visitor service**. Suggested actions: hit / gap / low_quality / conflict. """ from __future__ import annotations import asyncio import json from dataclasses import dataclass from typing import Literal from app.config import settings from app.db import get_question_trace, update_question_trace, get_agent_settings from app.llm_client import LlmClient SYSTEM_PROMPT = """你是「贵州旅游知识图谱」的质量稽查员。该图谱服务于来贵州旅游的游客, 覆盖:地点(Place)、区域(Area,贵州行政区)、体验标签(ExperienceTag)、线路模板(RouteTemplate)。 给你一个游客问题,以及【从图谱真实检索到的实体证据】。请只依据这些真实证据判断 图谱能否答好这个问题——检索不到相关实体即为缺失,不要凭常识脑补。 判定动作: - hit: 命中且关键字段齐全、证据充分 - gap: 图谱缺失相关实体(需要补藏新数据) - low_quality: 命中但关键字段空缺/证据不足(需要完善纠错) - conflict: 命中但数据互相矛盾(需要冲突处理) 只输出 JSON。""" USER_TEMPLATE = """游客问题:{question} 从贵州旅游图谱检索到的实体(name/type/key_fields/evidence_count/empty_fields): {retrieved} 输出 schema(只输出 JSON): {{ "coverage_score": 0.0~1.0, "confidence": 0.0~1.0, "evidence_count": int, "matched_entity_ids": ["..."], "missing_fields": ["field_name", ...], "scenario_tags": ["雨天","夜间","亲子","美食","夜市","路线"...], "suggested_action": "hit|gap|low_quality|conflict", "explanation": "一句话中文解释,说明依据哪些实体/缺什么" }}""" # curated per-type fields shown to the LLM, and which count as "important" _KEY_FIELDS = { "Place": ["place_type", "area_id", "best_time_start", "best_time_end", "price_band", "rain_suitability", "must_try_items", "night_vibe_score"], "Area": ["area_type", "night_identity", "first_timer_friendliness", "walkability_score", "rain_backup_strength"], "ExperienceTag": ["tag_id"], "RouteTemplate": ["theme", "target_user_type", "ideal_total_minutes", "rhythm_type", "is_rain_compatible"], } _IMPORTANT = { "Place": ["place_type", "area_id", "best_time_start", "must_try_items"], "Area": ["area_type", "night_identity"], "ExperienceTag": [], "RouteTemplate": ["theme", "target_user_type"], } _SCENARIOS = ["雨天", "夜间", "夜晚", "亲子", "美食", "小吃", "夜市", "路线", "地铁", "公交", "第一次", "本地", "打卡", "周边", "室内", "带娃"] # controlled vocabulary the LLM must map the question onto (proper GraphRAG) _TAG_VOCAB = [ "authentic_local", "worth_first_visit", "rain_ok", "less_queue", "night_walk_friendly", "good_after_dinner", "late_night_friendly", "breakfast_friendly", "daytime_food_friendly", "morning_walk_friendly", "low_walk", "day_walk_friendly", "daytime_drink_friendly", "quiet_refreshment", "transit", "metro", ] _PLACE_TYPES = ["eat", "walk", "drink", "transit_stop"] _INTENT_SYSTEM = """你是贵州旅游知识图谱的检索意图解析器。把游客问题映射到图谱的受控词表, 只输出 JSON,不要解释。 place_type 仅可选:eat(吃/美食/小吃/餐馆) / walk(逛/散步/夜市/步道/景点/打卡) / drink(喝/酒吧/咖啡/茶) / transit_stop(地铁/公交/交通站) tags 仅可从此列表选(英文键): authentic_local(本地正宗) worth_first_visit(第一次必去) rain_ok(雨天可去) less_queue(不排队) night_walk_friendly(适合夜间散步) good_after_dinner(饭后) late_night_friendly(深夜) breakfast_friendly(早餐) daytime_food_friendly(白天吃) morning_walk_friendly(晨间散步) low_walk(少走路) day_walk_friendly(白天散步) daytime_drink_friendly(白天喝) quiet_refreshment(安静小憩) transit(交通枢纽) metro(地铁) areas:问题里出现的贵州区域/片区中文名(如 甲秀楼、青云市集、南明区、贵阳…),没有就空。 keywords:其它有用中文检索词。 输出:{"areas":[],"place_types":[],"tags":[],"keywords":[]}""" @dataclass(frozen=True) class AuditResult: trace_id: int coverage_score: float confidence: float evidence_count: int matched_entity_ids: list[str] missing_fields: list[str] scenario_tags: list[str] suggested_action: Literal["hit", "gap", "low_quality", "conflict"] explanation: str | None = None async def _build_llm() -> LlmClient | None: """Construct the LLM client from Agent settings (global + auditor override). This is the "consumer wiring": the key/model/base_url an admin saves on the Agent 设置 page is what actually drives the audit. """ try: cfg = await get_agent_settings() except Exception: cfg = {} g = cfg.get("global", {}) if cfg else {} a = (cfg.get("agents", {}) or {}).get("auditor", {}) if cfg else {} api_key = a.get("api_key") or g.get("api_key") or settings.llm_api_key if not api_key: return None base = (a.get("base_url") or g.get("base_url") or settings.llm_api_base or "https://api.deepseek.com/v1") model = a.get("model") or g.get("model") or settings.llm_model or "deepseek-chat" timeout = int(g.get("timeout") or settings.llm_timeout_seconds or 30) return LlmClient(api_base=base, api_key=api_key, model=model, timeout=timeout) def _load_catalog() -> list[dict]: """Pull all named nodes from FalkorDB once per audit run.""" from app.api.graph import _get_graph g = _get_graph() res = g.query( "MATCH (n) WHERE n.name IS NOT NULL " "RETURN labels(n)[0] AS t, n AS node LIMIT 5000" ) catalog: list[dict] = [] for row in res.result_set: node = row[1] props = getattr(node, "properties", {}) or {} nm = str(props.get("name") or "").strip() if nm: catalog.append({"type": row[0] or "Node", "name": nm, "props": props}) return catalog def _evidence_counts(names: list[str]) -> dict: if not names: return {} from app.api.graph import _get_graph g = _get_graph() try: res = g.query( "MATCH (n)-[r]-() WHERE n.name IN $names " "RETURN n.name AS nm, count(r) AS c", {"names": names}, ) return {row[0]: int(row[1]) for row in res.result_set} except Exception: return {} def _retrieve(question: str, catalog: list[dict]) -> tuple[list[dict], list[str]]: """Substring-match the question against graph entity names; gather evidence.""" # non-Area entities first (Area names are noisy at 1600+ rows) hits: list[dict] = [] for c in catalog: nm = c["name"] if len(nm) >= 2 and nm in question: hits.append(c) hits.sort(key=lambda c: 0 if c["type"] != "Area" else 1) hits = hits[:20] names = [h["name"] for h in hits] deg = _evidence_counts(names) retrieved = [] for h in hits: t, props = h["type"], h["props"] kf = {k: props.get(k) for k in _KEY_FIELDS.get(t, []) if props.get(k) not in (None, "")} empty = [k for k in _IMPORTANT.get(t, []) if props.get(k) in (None, "", [], "[]")] retrieved.append({ "name": h["name"], "type": t, "key_fields": kf, "evidence_count": deg.get(h["name"], 0), "empty_fields": empty, }) scen = [s for s in _SCENARIOS if s in question] return retrieved, scen def _extract_intent(question: str, llm: LlmClient) -> dict | None: """Step 1 of GraphRAG: LLM maps the question to the controlled vocabulary.""" try: out = llm.chat_json(system=_INTENT_SYSTEM, user=f"问题:{question}") return { "areas": [str(x) for x in (out.get("areas") or [])], "place_types": [x for x in (out.get("place_types") or []) if x in _PLACE_TYPES], "tags": [x for x in (out.get("tags") or []) if x in _TAG_VOCAB], "keywords": [str(x) for x in (out.get("keywords") or [])], } except Exception: return None def _graph_search(intent: dict) -> list[dict]: """Step 2 of GraphRAG: query FalkorDB by the structured intent.""" from app.api.graph import _get_graph g = _get_graph() areas = intent.get("areas") or [] ptypes = intent.get("place_types") or [] tags = intent.get("tags") or [] kws = [k for k in (intent.get("keywords") or []) if len(k) >= 2] # one enriched pass over all places (107 rows — cheap) try: res = g.query( "MATCH (p:Place) " "OPTIONAL MATCH (p)-[:LOCATED_IN]->(a:Area) " "OPTIONAL MATCH (p)-[:HAS_TAG]->(t:ExperienceTag) " "RETURN p AS p, a.name AS area, collect(DISTINCT t.name) AS tags" ) except Exception: return [] scored: list[tuple[int, dict]] = [] for row in res.result_set: node = row[0] props = getattr(node, "properties", {}) or {} area = row[1] ptags = [x for x in (row[2] or []) if x] nm = str(props.get("name") or "") score = 0 if area and any(a and a in area for a in areas): score += 2 if props.get("place_type") in ptypes: score += 2 tag_hit = len(set(ptags) & set(tags)) score += tag_hit if any(k in nm for k in kws): score += 1 if score <= 0: continue scored.append((score, { "name": nm, "type": "Place", "key_fields": {k: props.get(k) for k in _KEY_FIELDS["Place"] if props.get(k) not in (None, "")}, "tags": ptags, "area": area, "empty_fields": [k for k in _IMPORTANT["Place"] if props.get(k) in (None, "", [], "[]")], })) scored.sort(key=lambda x: x[0], reverse=True) retrieved = [d for _, d in scored[:15]] # also surface matched Area nodes themselves if areas: try: ar = g.query( "MATCH (a:Area) WHERE a.name IN $names RETURN a", {"names": areas}, ) for row in ar.result_set: ap = getattr(row[0], "properties", {}) or {} retrieved.append({ "name": str(ap.get("name") or ""), "type": "Area", "key_fields": {k: ap.get(k) for k in _KEY_FIELDS["Area"] if ap.get(k) not in (None, "")}, "empty_fields": [k for k in _IMPORTANT["Area"] if ap.get(k) in (None, "", [], "[]")], }) except Exception: pass names = [r["name"] for r in retrieved] deg = _evidence_counts(names) for r in retrieved: r["evidence_count"] = deg.get(r["name"], 0) return retrieved def _rule_based_audit(question: str, retrieved: list[dict], scen: list[str]) -> AuditResult: """Fallback when no LLM: score from real retrieval, not toy keywords.""" n = len(retrieved) total_ev = sum(r["evidence_count"] for r in retrieved) any_empty = any(r["empty_fields"] for r in retrieved) if n == 0: action, score = "gap", 0.1 elif any_empty or total_ev == 0: action, score = "low_quality", 0.45 else: action, score = "hit", min(0.6 + 0.08 * n, 0.9) miss = sorted({f for r in retrieved for f in r["empty_fields"]}) or (["相关实体"] if n == 0 else []) return AuditResult( trace_id=0, coverage_score=score, confidence=0.4, evidence_count=total_ev, matched_entity_ids=[r["name"] for r in retrieved], missing_fields=miss, scenario_tags=scen, suggested_action=action, explanation=f"图谱检索评估(无 LLM):命中 {n} 个实体" + (",存在空字段" if any_empty else ""), ) async def audit_single_trace( trace: dict, llm: LlmClient | None = None, catalog: list[dict] | None = None ) -> AuditResult: question = trace["question_text"] trace_id = trace["id"] # ── GraphRAG retrieval ── if llm and llm.available(): intent = await asyncio.to_thread(_extract_intent, question, llm) if intent is not None: retrieved = await asyncio.to_thread(_graph_search, intent) scen = (intent.get("tags") or []) + [s for s in _SCENARIOS if s in question] else: if catalog is None: try: catalog = _load_catalog() except Exception: catalog = [] retrieved, scen = _retrieve(question, catalog) else: if catalog is None: try: catalog = _load_catalog() except Exception: catalog = [] retrieved, scen = _retrieve(question, catalog) if llm and llm.available(): try: result = await asyncio.to_thread( llm.chat_json, SYSTEM_PROMPT, USER_TEMPLATE.format( question=question, retrieved=json.dumps(retrieved, ensure_ascii=False, indent=2), ), ) return AuditResult( trace_id=trace_id, coverage_score=float(result.get("coverage_score", 0.5)), confidence=float(result.get("confidence", 0.5)), evidence_count=int(result.get("evidence_count", len(retrieved))), matched_entity_ids=result.get("matched_entity_ids") or [r["name"] for r in retrieved], missing_fields=result.get("missing_fields", []), scenario_tags=result.get("scenario_tags") or scen, suggested_action=result.get("suggested_action", "gap"), explanation=result.get("explanation"), ) except Exception: pass r = _rule_based_audit(question, retrieved, scen) return AuditResult( trace_id=trace_id, coverage_score=r.coverage_score, confidence=r.confidence, evidence_count=r.evidence_count, matched_entity_ids=r.matched_entity_ids, missing_fields=r.missing_fields, scenario_tags=r.scenario_tags, suggested_action=r.suggested_action, explanation=r.explanation, ) # action → 工单元数据(gap=补藏 / low_quality=完善 / conflict=冲突核查) _ACTION_TASK = { "gap": {"prefix": "补藏", "desc": "AI稽查发现知识缺口(缺相关实体)", "priority": 4, "note": "该城区缺失数据,请落实采集"}, "low_quality": {"prefix": "完善", "desc": "AI稽查:图谱已有数据但关键字段缺失/证据不足,需完善纠错", "priority": 3, "note": "该城区数据不完整,请完善"}, "conflict": {"prefix": "冲突核查", "desc": "AI稽查:命中但数据相互矛盾,需冲突处理", "priority": 5, "note": "该城区数据存在矛盾,请核查"}, } _BG_TASKS: set = set() async def _bg_audit(trace_ids: list[int], run_id: int) -> None: from app.db import finish_audit_run try: await run_audit_for_traces(trace_ids, run_id) await finish_audit_run(run_id, "done") except Exception as e: # noqa: BLE001 try: await finish_audit_run(run_id, "error", str(e)[:300]) except Exception: pass def schedule_audit(trace_ids: list[int], run_id: int) -> None: """Fire-and-forget background audit; progress tracked in audit_runs.""" t = asyncio.create_task(_bg_audit(trace_ids, run_id)) _BG_TASKS.add(t) t.add_done_callback(_BG_TASKS.discard) async def run_audit_for_traces(trace_ids: list[int], run_id: int | None = None) -> list[dict]: """Run audit on a batch of traces, persist results, route work-orders.""" from app.db import ( create_acquisition_task, create_notification, get_area_responsible, resolve_area_from_entities, set_task_routing, bump_audit_run, ) llm = await _build_llm() try: catalog = await asyncio.to_thread(_load_catalog) except Exception: catalog = [] results = [] for tid in trace_ids: trace = await get_question_trace(tid) if not trace: continue result = await audit_single_trace(trace, llm, catalog) await update_question_trace(tid, { "coverage_score": result.coverage_score, "confidence": result.confidence, "evidence_count": result.evidence_count, "matched_entity_ids": json.dumps(result.matched_entity_ids), "missing_fields": json.dumps(result.missing_fields), "scenario_tags": json.dumps(result.scenario_tags), "suggested_action": result.suggested_action, "evaluated_at": "now()", }) # Route gap / low_quality / conflict → a work-order; hit = no-op meta = _ACTION_TASK.get(result.suggested_action) if meta: task = await create_acquisition_task({ "tenant_id": settings.default_tenant, "project_id": settings.default_project, "created_by": "auditor", "triggered_by_trace_id": tid, "title": f"{meta['prefix']}: {trace['question_text'][:60]}", "description": result.explanation or meta["desc"], "scenario_tags": json.dumps(result.scenario_tags), "target_entity_types": json.dumps([]), "target_fields": json.dumps(result.missing_fields), "suggested_collection_method": "manual", "priority": meta["priority"], }) try: area_id = await resolve_area_from_entities(result.matched_entity_ids) responsible = await get_area_responsible(area_id) if area_id else None if task and responsible: await set_task_routing(task["id"], area_id, responsible["username"]) await create_notification( user_id=responsible["id"], title=f"新{meta['prefix']}工单({area_id})", body=f"{trace['question_text'][:80]} — {meta['note']}。", ntype="task", related_task_id=task["id"], area_id=area_id, ) except Exception: pass # routing/notification best-effort; never block audit if run_id is not None: try: await bump_audit_run(run_id, result.suggested_action) except Exception: pass results.append({ "trace_id": tid, "coverage_score": result.coverage_score, "suggested_action": result.suggested_action, }) return results