Files
bxh/app/api/evidence.py

298 lines
10 KiB
Python

"""Evidence quality review API.
Read-only endpoints that join graph POIs (FalkorDB ``Place`` nodes) with their raw social-media evidence rows.
"""
from __future__ import annotations

import asyncio
import logging
from typing import Any

from falkordb import FalkorDB
from fastapi import APIRouter, HTTPException, Query

from app.auth import CurrentUser
from app.config import settings
from app.db import get_conn
# Router on which this module's read-only evidence endpoints are registered
# (via the @router.get decorators below).
router = APIRouter()
def _get_graph():
    """Connect to FalkorDB using the configured host/port and select the graph.

    A brand-new client is constructed on every call; nothing is cached.
    """
    client = FalkorDB(
        host=settings.falkordb_host,
        port=settings.falkordb_port,
    )
    graph_name = settings.falkordb_graph
    return client.select_graph(graph_name)
def _node_props(node: Any) -> dict[str, Any]:
props = getattr(node, "properties", None) or {}
return dict(props)
def _normalize_platform(platform: str | None) -> str | None:
p = (platform or "").strip()
if not p or p == "all":
return None
if p == "xiaohongshu":
return "xhs"
return p
def _tag_source_label(source: str | None) -> str:
if source == "xiaohongshu":
return "xhs"
return source or "seed"
def _row_int(row: dict[str, Any], key: str) -> int:
return int(row.get(key) or 0)
def _like_contains(term: str) -> str:
    """Build an ILIKE pattern that matches *term* as a literal substring.

    PostgreSQL's LIKE/ILIKE treat ``%`` and ``_`` as wildcards and use a
    backslash as the default escape character, so all three are escaped
    before the term is wrapped in ``%…%``.
    """
    escaped = term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    return f"%{escaped}%"


async def _evidence_summary(
    *,
    keyword: str | None,
    platform: str | None,
    limit: int,
    offset: int,
) -> tuple[list[dict[str, Any]], int]:
    """Page through places that have social evidence, most-evidenced first.

    Args:
        keyword: Optional free-text filter, matched case-insensitively as a
            literal substring of ``entity_name``, ``keyword``, ``title`` or
            ``content``. Blank means no filter.
        platform: Optional platform filter, normalized via
            ``_normalize_platform`` (``None``/``"all"`` means every platform).
        limit: Maximum number of places to return.
        offset: Number of places to skip (after ordering).

    Returns:
        ``(rows, total)``. ``rows`` holds one dict per place with
        ``place_natural_key``, ``evidence_entity_name``, ``evidence_count``,
        ``latest_captured_at`` and ``platform_counts`` (a JSON object of
        platform -> row count). It is ordered by evidence count descending,
        then key. ``total`` is the number of distinct places matching the
        filters.
    """
    s = settings.db_schema
    kw = (keyword or "").strip()
    plat = _normalize_platform(platform)
    where = [
        "place_natural_key IS NOT NULL",
        "place_natural_key <> ''",
    ]
    params: list[Any] = []
    if plat:
        where.append("platform = %s")
        params.append(plat)
    if kw:
        # Escape LIKE metacharacters so e.g. "50%" or "a_b" match literally.
        like = _like_contains(kw)
        where.append(
            "(entity_name ILIKE %s OR keyword ILIKE %s OR "
            "title ILIKE %s OR content ILIKE %s)"
        )
        params.extend([like] * 4)
    where_sql = " AND ".join(where)
    async with get_conn() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                f"""SELECT count(*) AS total
                    FROM (
                        SELECT place_natural_key
                        FROM {s}.social_evidence
                        WHERE {where_sql}
                        GROUP BY place_natural_key
                    ) x""",
                params,
            )
            total = _row_int(await cur.fetchone(), "total")
            # Nothing on the requested page: skip the heavier aggregate query.
            if offset >= total:
                return [], total
            # jsonb_object_agg rejects NULL keys, so a single evidence row
            # with a NULL platform would fail the whole query; label such
            # rows 'unknown' instead. Group positionally so the coalesced
            # label (not the raw column) defines the group.
            # NOTE(review): confirm 'unknown' is an acceptable UI label.
            await cur.execute(
                f"""WITH filtered AS (
                        SELECT *
                        FROM {s}.social_evidence
                        WHERE {where_sql}
                    ),
                    grouped AS (
                        SELECT place_natural_key,
                               max(entity_name) FILTER (
                                   WHERE entity_name IS NOT NULL AND entity_name <> ''
                               ) AS evidence_entity_name,
                               count(*) AS evidence_count,
                               max(captured_at) AS latest_captured_at
                        FROM filtered
                        GROUP BY place_natural_key
                    ),
                    platform_counts AS (
                        SELECT place_natural_key,
                               jsonb_object_agg(platform, cnt ORDER BY platform) AS platform_counts
                        FROM (
                            SELECT place_natural_key,
                                   coalesce(platform, 'unknown') AS platform,
                                   count(*) AS cnt
                            FROM filtered
                            GROUP BY 1, 2
                        ) p
                        GROUP BY place_natural_key
                    )
                    SELECT g.place_natural_key, g.evidence_entity_name,
                           g.evidence_count, g.latest_captured_at,
                           coalesce(pc.platform_counts, '{{}}'::jsonb) AS platform_counts
                    FROM grouped g
                    LEFT JOIN platform_counts pc USING (place_natural_key)
                    ORDER BY g.evidence_count DESC, g.place_natural_key
                    LIMIT %s OFFSET %s""",
                [*params, limit, offset],
            )
            return await cur.fetchall(), total
def _graph_places(keys: list[str]) -> dict[str, dict[str, Any]]:
if not keys:
return {}
g = _get_graph()
result = g.query(
"MATCH (p:Place) WHERE p.element_id IN $keys RETURN p",
{"keys": keys},
)
out: dict[str, dict[str, Any]] = {}
for row in result.result_set:
props = _node_props(row[0])
key = str(props.get("element_id") or "")
if key:
out[key] = props
return out
def _graph_tags(keys: list[str]) -> dict[str, list[dict[str, str]]]:
if not keys:
return {}
g = _get_graph()
result = g.query(
"MATCH (p:Place)-[:HAS_TAG]->(t:ExperienceTag) "
"WHERE p.element_id IN $keys "
"RETURN p.element_id, t.name, coalesce(t.source,'seed')",
{"keys": keys},
)
out: dict[str, list[dict[str, str]]] = {key: [] for key in keys}
seen: set[tuple[str, str, str]] = set()
for row in result.result_set:
key = str(row[0] or "")
name = str(row[1] or "")
source = _tag_source_label(str(row[2] or "seed"))
sig = (key, name, source)
if key and name and sig not in seen:
seen.add(sig)
out.setdefault(key, []).append({"name": name, "source": source})
return out
def _place_from_graph_or_evidence(
key: str,
graph: dict[str, Any] | None,
row: dict[str, Any] | None = None,
) -> dict[str, Any]:
graph = graph or {}
row = row or {}
return {
"element_id": key,
"name": graph.get("name") or row.get("evidence_entity_name") or key,
"place_type": graph.get("place_type") or "",
"station_type": graph.get("station_type") or "",
"district": graph.get("district") or graph.get("area_name") or "",
"address": graph.get("address") or "",
"location": graph.get("location") or "",
"longitude": graph.get("longitude") or graph.get("lng"),
"latitude": graph.get("latitude") or graph.get("lat"),
"tel": graph.get("tel") or graph.get("phone") or "",
"rating": graph.get("rating") or graph.get("biz_ext_rating") or "",
"raw_graph": graph,
}
@router.get("/evidence/poi-quality")
async def list_poi_quality(
    keyword: str | None = None,
    platform: str | None = None,
    limit: int = Query(50, ge=1, le=100),
    offset: int = Query(0, ge=0),
    _user: CurrentUser = None,
):
    """List POIs with social evidence and the tags written back to the graph.

    Places are paged and ranked by evidence count (see ``_evidence_summary``),
    then enriched with their graph ``Place`` properties and ExperienceTags.
    At most 16 tags are returned per item; ``tag_count`` gives the full count.

    Raises:
        HTTPException: 500 when any database or graph lookup fails; the
            detail carries the first 300 characters of the error message.
    """
    try:
        rows, total = await _evidence_summary(
            keyword=keyword,
            platform=platform,
            limit=limit,
            offset=offset,
        )
        keys = [str(r["place_natural_key"]) for r in rows]
        # The FalkorDB client used by _graph_places/_graph_tags is synchronous.
        # Run both lookups in worker threads so they don't stall the event
        # loop, and so they overlap. Each call builds its own client via
        # _get_graph, so nothing is shared between the threads.
        places, tags = await asyncio.gather(
            asyncio.to_thread(_graph_places, keys),
            asyncio.to_thread(_graph_tags, keys),
        )
        items = []
        for r in rows:
            key = str(r["place_natural_key"])
            tag_list = tags.get(key, [])
            tag_sources = sorted({t["source"] for t in tag_list if t.get("source")})
            items.append({
                **_place_from_graph_or_evidence(key, places.get(key), r),
                "evidence_count": _row_int(r, "evidence_count"),
                "platform_counts": r.get("platform_counts") or {},
                "latest_captured_at": r.get("latest_captured_at"),
                "tags": tag_list[:16],
                "tag_count": len(tag_list),
                "tag_sources": tag_sources,
            })
        return {
            "items": items,
            "total": total,
            "limit": limit,
            "offset": offset,
            "platform": _normalize_platform(platform) or "all",
        }
    except Exception as exc:
        # Keep the full traceback server-side; the client only sees a snippet.
        logging.getLogger(__name__).exception("Failed to load POI quality data")
        raise HTTPException(
            500, f"Failed to load POI quality data: {str(exc)[:300]}"
        ) from exc
@router.get("/evidence/poi-quality/{place_key:path}")
async def get_poi_quality(
    place_key: str,
    platform: str | None = None,
    limit: int = Query(80, ge=1, le=300),
    _user: CurrentUser = None,
):
    """Return one POI, its graph tags, and its raw social evidence rows.

    Args:
        place_key: Place natural key (may contain ``/``; surrounding
            whitespace is stripped).
        platform: Optional platform filter (see ``_normalize_platform``).
        limit: Maximum number of evidence rows to return, ordered by likes,
            then comments, then most recent capture.

    Returns:
        ``place`` (display fields), ``tags``/``tag_count``, per-platform
        ``platform_counts``, the ``evidence`` rows, and ``evidence_count``
        (the total across platforms, not just the returned page).

    Raises:
        HTTPException: 400 for a blank key; 500 when any database or graph
            lookup fails, with the first 300 characters of the error.
    """
    key = place_key.strip()
    if not key:
        raise HTTPException(400, "place_key is required")
    try:
        # Synchronous FalkorDB calls: run them off the event loop, in parallel.
        places, tag_map = await asyncio.gather(
            asyncio.to_thread(_graph_places, [key]),
            asyncio.to_thread(_graph_tags, [key]),
        )
        plat = _normalize_platform(platform)
        s = settings.db_schema
        where = ["place_natural_key = %s"]
        params: list[Any] = [key]
        if plat:
            where.append("platform = %s")
            params.append(plat)
        where_sql = " AND ".join(where)
        async with get_conn() as conn:
            async with conn.cursor() as cur:
                # NULL platforms are labelled 'unknown' (grouped positionally so
                # the label defines the group), matching the list endpoint's
                # platform_counts. NOTE(review): confirm the label with the UI.
                await cur.execute(
                    f"""SELECT coalesce(platform, 'unknown') AS platform,
                               count(*) AS count
                        FROM {s}.social_evidence
                        WHERE {where_sql}
                        GROUP BY 1
                        ORDER BY 1""",
                    params,
                )
                platform_counts = {
                    str(r["platform"]): _row_int(r, "count")
                    for r in await cur.fetchall()
                }
                await cur.execute(
                    f"""SELECT id, platform, kind, source_id, url, entity_name,
                               place_natural_key, keyword, title, content, author,
                               author_id, likes, comments, collects, shares,
                               publish_time, location, tags, image_urls, raw_jsonb,
                               captured_at
                        FROM {s}.social_evidence
                        WHERE {where_sql}
                        ORDER BY likes DESC NULLS LAST,
                                 comments DESC NULLS LAST,
                                 captured_at DESC NULLS LAST
                        LIMIT %s""",
                    [*params, limit],
                )
                evidence = await cur.fetchall()
        # When the graph has no name for this place, fall back to the first
        # non-empty evidence entity name, as the list endpoint does.
        evidence_name = next(
            (e.get("entity_name") for e in evidence if e.get("entity_name")),
            None,
        )
        tags = tag_map.get(key, [])
        return {
            "place": _place_from_graph_or_evidence(
                key, places.get(key), {"evidence_entity_name": evidence_name}
            ),
            "tags": tags,
            "tag_count": len(tags),
            "platform_counts": platform_counts,
            "evidence": evidence,
            "evidence_count": sum(platform_counts.values()),
        }
    except Exception as exc:
        # Keep the full traceback server-side; the client only sees a snippet.
        logging.getLogger(__name__).exception("Failed to load POI evidence")
        raise HTTPException(
            500, f"Failed to load POI evidence: {str(exc)[:300]}"
        ) from exc