"""Evidence quality review API. Read-only endpoints that join graph POIs with their social raw evidence. """ from __future__ import annotations from typing import Any from falkordb import FalkorDB from fastapi import APIRouter, HTTPException, Query from app.auth import CurrentUser from app.config import settings from app.db import get_conn router = APIRouter() def _get_graph(): db = FalkorDB(host=settings.falkordb_host, port=settings.falkordb_port) return db.select_graph(settings.falkordb_graph) def _node_props(node: Any) -> dict[str, Any]: props = getattr(node, "properties", None) or {} return dict(props) def _normalize_platform(platform: str | None) -> str | None: p = (platform or "").strip() if not p or p == "all": return None if p == "xiaohongshu": return "xhs" return p def _tag_source_label(source: str | None) -> str: if source == "xiaohongshu": return "xhs" return source or "seed" def _row_int(row: dict[str, Any], key: str) -> int: return int(row.get(key) or 0) async def _evidence_summary( *, keyword: str | None, platform: str | None, limit: int, offset: int, ) -> tuple[list[dict[str, Any]], int]: s = settings.db_schema kw = (keyword or "").strip() like = f"%{kw}%" if kw else None plat = _normalize_platform(platform) where = [ "place_natural_key IS NOT NULL", "place_natural_key <> ''", ] params: list[Any] = [] if plat: where.append("platform = %s") params.append(plat) if like: where.append( "(entity_name ILIKE %s OR keyword ILIKE %s OR " "title ILIKE %s OR content ILIKE %s)" ) params.extend([like, like, like, like]) where_sql = " AND ".join(where) async with get_conn() as conn: async with conn.cursor() as cur: await cur.execute( f"""SELECT count(*) AS total FROM ( SELECT place_natural_key FROM {s}.social_evidence WHERE {where_sql} GROUP BY place_natural_key ) x""", params, ) total = _row_int(await cur.fetchone(), "total") await cur.execute( f"""WITH filtered AS ( SELECT * FROM {s}.social_evidence WHERE {where_sql} ), grouped AS ( SELECT place_natural_key, max(entity_name) FILTER ( WHERE entity_name IS NOT NULL AND entity_name <> '' ) AS evidence_entity_name, count(*) AS evidence_count, max(captured_at) AS latest_captured_at FROM filtered GROUP BY place_natural_key ), platform_counts AS ( SELECT place_natural_key, jsonb_object_agg(platform, cnt ORDER BY platform) AS platform_counts FROM ( SELECT place_natural_key, platform, count(*) AS cnt FROM filtered GROUP BY place_natural_key, platform ) p GROUP BY place_natural_key ) SELECT g.place_natural_key, g.evidence_entity_name, g.evidence_count, g.latest_captured_at, coalesce(pc.platform_counts, '{{}}'::jsonb) AS platform_counts FROM grouped g LEFT JOIN platform_counts pc USING (place_natural_key) ORDER BY g.evidence_count DESC, g.place_natural_key LIMIT %s OFFSET %s""", [*params, limit, offset], ) return await cur.fetchall(), total def _graph_places(keys: list[str]) -> dict[str, dict[str, Any]]: if not keys: return {} g = _get_graph() result = g.query( "MATCH (p:Place) WHERE p.element_id IN $keys RETURN p", {"keys": keys}, ) out: dict[str, dict[str, Any]] = {} for row in result.result_set: props = _node_props(row[0]) key = str(props.get("element_id") or "") if key: out[key] = props return out def _graph_tags(keys: list[str]) -> dict[str, list[dict[str, str]]]: if not keys: return {} g = _get_graph() result = g.query( "MATCH (p:Place)-[:HAS_TAG]->(t:ExperienceTag) " "WHERE p.element_id IN $keys " "RETURN p.element_id, t.name, coalesce(t.source,'seed')", {"keys": keys}, ) out: dict[str, list[dict[str, str]]] = {key: [] for key in keys} seen: set[tuple[str, str, str]] = set() for row in result.result_set: key = str(row[0] or "") name = str(row[1] or "") source = _tag_source_label(str(row[2] or "seed")) sig = (key, name, source) if key and name and sig not in seen: seen.add(sig) out.setdefault(key, []).append({"name": name, "source": source}) return out def _place_from_graph_or_evidence( key: str, graph: dict[str, Any] | None, row: dict[str, Any] | None = None, ) -> dict[str, Any]: graph = graph or {} row = row or {} return { "element_id": key, "name": graph.get("name") or row.get("evidence_entity_name") or key, "place_type": graph.get("place_type") or "", "station_type": graph.get("station_type") or "", "district": graph.get("district") or graph.get("area_name") or "", "address": graph.get("address") or "", "location": graph.get("location") or "", "longitude": graph.get("longitude") or graph.get("lng"), "latitude": graph.get("latitude") or graph.get("lat"), "tel": graph.get("tel") or graph.get("phone") or "", "rating": graph.get("rating") or graph.get("biz_ext_rating") or "", "raw_graph": graph, } @router.get("/evidence/poi-quality") async def list_poi_quality( keyword: str | None = None, platform: str | None = None, limit: int = Query(50, ge=1, le=100), offset: int = Query(0, ge=0), _user: CurrentUser = None, ): """List POIs with social evidence and the tags written back to the graph.""" try: rows, total = await _evidence_summary( keyword=keyword, platform=platform, limit=limit, offset=offset, ) keys = [str(r["place_natural_key"]) for r in rows] places = _graph_places(keys) tags = _graph_tags(keys) items = [] for r in rows: key = str(r["place_natural_key"]) tag_list = tags.get(key, []) tag_sources = sorted({t["source"] for t in tag_list if t.get("source")}) items.append({ **_place_from_graph_or_evidence(key, places.get(key), r), "evidence_count": _row_int(r, "evidence_count"), "platform_counts": r.get("platform_counts") or {}, "latest_captured_at": r.get("latest_captured_at"), "tags": tag_list[:16], "tag_count": len(tag_list), "tag_sources": tag_sources, }) return { "items": items, "total": total, "limit": limit, "offset": offset, "platform": _normalize_platform(platform) or "all", } except Exception as exc: raise HTTPException(500, f"Failed to load POI quality data: {str(exc)[:300]}") @router.get("/evidence/poi-quality/{place_key:path}") async def get_poi_quality( place_key: str, platform: str | None = None, limit: int = Query(80, ge=1, le=300), _user: CurrentUser = None, ): """Return one POI, graph tags, and raw social evidence rows.""" key = place_key.strip() if not key: raise HTTPException(400, "place_key is required") try: places = _graph_places([key]) tag_map = _graph_tags([key]) plat = _normalize_platform(platform) s = settings.db_schema where = ["place_natural_key = %s"] params: list[Any] = [key] if plat: where.append("platform = %s") params.append(plat) where_sql = " AND ".join(where) async with get_conn() as conn: async with conn.cursor() as cur: await cur.execute( f"""SELECT platform, count(*) AS count FROM {s}.social_evidence WHERE {where_sql} GROUP BY platform ORDER BY platform""", params, ) platform_counts = { str(r["platform"]): _row_int(r, "count") for r in await cur.fetchall() } await cur.execute( f"""SELECT id, platform, kind, source_id, url, entity_name, place_natural_key, keyword, title, content, author, author_id, likes, comments, collects, shares, publish_time, location, tags, image_urls, raw_jsonb, captured_at FROM {s}.social_evidence WHERE {where_sql} ORDER BY likes DESC NULLS LAST, comments DESC NULLS LAST, captured_at DESC LIMIT %s""", [*params, limit], ) evidence = await cur.fetchall() return { "place": _place_from_graph_or_evidence(key, places.get(key)), "tags": tag_map.get(key, []), "tag_count": len(tag_map.get(key, [])), "platform_counts": platform_counts, "evidence": evidence, "evidence_count": sum(platform_counts.values()), } except Exception as exc: raise HTTPException(500, f"Failed to load POI evidence: {str(exc)[:300]}")