298 lines
10 KiB
Python
298 lines
10 KiB
Python
"""Evidence quality review API.
|
|
|
|
Read-only endpoints that join graph POIs with their raw social evidence.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from falkordb import FalkorDB
|
|
from fastapi import APIRouter, HTTPException, Query
|
|
|
|
from app.auth import CurrentUser
|
|
from app.config import settings
|
|
from app.db import get_conn
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
def _get_graph():
    """Return a handle to the configured FalkorDB graph.

    A new ``FalkorDB`` client is constructed on every call from the host,
    port, and graph name found on ``settings``.
    """
    client = FalkorDB(
        host=settings.falkordb_host,
        port=settings.falkordb_port,
    )
    graph_name = settings.falkordb_graph
    return client.select_graph(graph_name)
|
|
|
|
|
|
def _node_props(node: Any) -> dict[str, Any]:
    """Return a shallow copy of ``node.properties`` as a plain dict.

    An object without a ``properties`` attribute, or whose ``properties`` is
    falsy (``None``, empty mapping), yields an empty dict.
    """
    raw = getattr(node, "properties", None)
    if not raw:
        return {}
    return dict(raw)
|
|
|
|
|
|
def _normalize_platform(platform: str | None) -> str | None:
    """Normalize a platform filter value.

    Returns ``None`` (no platform filter) for ``None``, blank input, or the
    literal ``"all"``. ``"xiaohongshu"`` is rewritten to ``"xhs"``. Any other
    value is returned with surrounding whitespace stripped. The comparison is
    case-sensitive.
    """
    cleaned = platform.strip() if platform else ""
    if cleaned in ("", "all"):
        return None
    return "xhs" if cleaned == "xiaohongshu" else cleaned
|
|
|
|
|
|
def _tag_source_label(source: str | None) -> str:
    """Return the label reported for a tag's source.

    ``"xiaohongshu"`` is shortened to ``"xhs"``; ``None`` or an empty string
    becomes ``"seed"``; every other value is returned unchanged.
    """
    if not source:
        return "seed"
    return "xhs" if source == "xiaohongshu" else source
|
|
|
|
|
|
def _row_int(row: dict[str, Any], key: str) -> int:
    """Read ``row[key]`` as an int, treating a missing or falsy value as 0."""
    value = row.get(key)
    return int(value) if value else 0
|
|
|
|
|
|
async def _evidence_summary(
    *,
    keyword: str | None,
    platform: str | None,
    limit: int,
    offset: int,
) -> tuple[list[dict[str, Any]], int]:
    """Aggregate social evidence per place and return one page of the result.

    Only rows with a non-null, non-empty ``place_natural_key`` are considered.

    Args:
        keyword: Optional search text. After stripping whitespace it is
            matched case-insensitively as a literal substring of
            ``entity_name``, ``keyword``, ``title`` or ``content``.
        platform: Optional platform filter, normalized with
            ``_normalize_platform`` (``None``/blank/``"all"`` disables it).
        limit: Maximum number of grouped rows to return.
        offset: Number of grouped rows to skip.

    Returns:
        ``(rows, total)``. ``rows`` is the page of per-place aggregates ordered
        by ``evidence_count`` descending, then ``place_natural_key``; each row
        carries ``place_natural_key``, ``evidence_entity_name``,
        ``evidence_count``, ``latest_captured_at`` and ``platform_counts``.
        ``total`` is the number of distinct ``place_natural_key`` values that
        match the same filters, independent of ``limit``/``offset``.
    """
    s = settings.db_schema
    kw = (keyword or "").strip()
    like = None
    if kw:
        # Escape LIKE metacharacters so that "%" and "_" typed by the user are
        # matched literally instead of acting as wildcards. Backslash is the
        # default LIKE/ILIKE escape character in PostgreSQL, so it has to be
        # doubled first.
        escaped = (
            kw.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        like = f"%{escaped}%"
    plat = _normalize_platform(platform)

    where = [
        "place_natural_key IS NOT NULL",
        "place_natural_key <> ''",
    ]
    params: list[Any] = []
    if plat:
        where.append("platform = %s")
        params.append(plat)
    if like:
        where.append(
            "(entity_name ILIKE %s OR keyword ILIKE %s OR "
            "title ILIKE %s OR content ILIKE %s)"
        )
        # One bound value per ILIKE placeholder above.
        params.extend([like, like, like, like])
    where_sql = " AND ".join(where)

    async with get_conn() as conn:
        async with conn.cursor() as cur:
            # Total number of distinct places matching the filters.
            await cur.execute(
                f"""SELECT count(*) AS total
                    FROM (
                        SELECT place_natural_key
                        FROM {s}.social_evidence
                        WHERE {where_sql}
                        GROUP BY place_natural_key
                    ) x""",
                params,
            )
            total = _row_int(await cur.fetchone(), "total")

            # Page of per-place aggregates; reuses the same WHERE clause and
            # parameters, followed by LIMIT/OFFSET.
            await cur.execute(
                f"""WITH filtered AS (
                        SELECT *
                        FROM {s}.social_evidence
                        WHERE {where_sql}
                    ),
                    grouped AS (
                        SELECT place_natural_key,
                               max(entity_name) FILTER (
                                   WHERE entity_name IS NOT NULL AND entity_name <> ''
                               ) AS evidence_entity_name,
                               count(*) AS evidence_count,
                               max(captured_at) AS latest_captured_at
                        FROM filtered
                        GROUP BY place_natural_key
                    ),
                    platform_counts AS (
                        SELECT place_natural_key,
                               jsonb_object_agg(platform, cnt ORDER BY platform) AS platform_counts
                        FROM (
                            SELECT place_natural_key, platform, count(*) AS cnt
                            FROM filtered
                            GROUP BY place_natural_key, platform
                        ) p
                        GROUP BY place_natural_key
                    )
                    SELECT g.place_natural_key, g.evidence_entity_name,
                           g.evidence_count, g.latest_captured_at,
                           coalesce(pc.platform_counts, '{{}}'::jsonb) AS platform_counts
                    FROM grouped g
                    LEFT JOIN platform_counts pc USING (place_natural_key)
                    ORDER BY g.evidence_count DESC, g.place_natural_key
                    LIMIT %s OFFSET %s""",
                [*params, limit, offset],
            )
            return await cur.fetchall(), total
|
|
|
|
|
|
def _graph_places(keys: list[str]) -> dict[str, dict[str, Any]]:
    """Look up ``Place`` nodes by ``element_id`` and return their properties.

    Args:
        keys: ``element_id`` values to match. An empty list short-circuits to
            an empty result without touching the graph.

    Returns:
        Mapping of ``element_id`` (as a string) to that node's property dict.
        Nodes whose ``element_id`` property is missing or falsy are skipped.
    """
    if not keys:
        return {}

    cypher = "MATCH (p:Place) WHERE p.element_id IN $keys RETURN p"
    records = _get_graph().query(cypher, {"keys": keys}).result_set

    places: dict[str, dict[str, Any]] = {}
    for record in records:
        properties = _node_props(record[0])
        element_id = str(properties.get("element_id") or "")
        if not element_id:
            continue
        places[element_id] = properties
    return places
|
|
|
|
|
|
def _graph_tags(keys: list[str]) -> dict[str, list[dict[str, str]]]:
    """Collect the experience tags attached to the given places.

    Args:
        keys: ``element_id`` values of the places to inspect. An empty list
            short-circuits to an empty result without touching the graph.

    Returns:
        Mapping of place key to a list of ``{"name", "source"}`` dicts. Every
        requested key is present (with an empty list when it has no tags).
        Duplicate ``(place, name, source)`` combinations are emitted once, and
        the source is passed through ``_tag_source_label``.
    """
    if not keys:
        return {}

    cypher = (
        "MATCH (p:Place)-[:HAS_TAG]->(t:ExperienceTag) "
        "WHERE p.element_id IN $keys "
        "RETURN p.element_id, t.name, coalesce(t.source,'seed')"
    )
    records = _get_graph().query(cypher, {"keys": keys}).result_set

    tags_by_place: dict[str, list[dict[str, str]]] = {k: [] for k in keys}
    emitted: set[tuple[str, str, str]] = set()
    for record in records:
        place_id = str(record[0] or "")
        tag_name = str(record[1] or "")
        label = _tag_source_label(str(record[2] or "seed"))
        # Rows without a place id or tag name carry nothing to report.
        if not place_id or not tag_name:
            continue
        signature = (place_id, tag_name, label)
        if signature in emitted:
            continue
        emitted.add(signature)
        tags_by_place.setdefault(place_id, []).append(
            {"name": tag_name, "source": label}
        )
    return tags_by_place
|
|
|
|
|
|
def _place_from_graph_or_evidence(
    key: str,
    graph: dict[str, Any] | None,
    row: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the place payload from graph properties with evidence fallback.

    Args:
        key: Place key; returned as ``element_id`` and used as the last-resort
            display name.
        graph: Property dict of the graph node, or ``None`` when the place was
            not found in the graph.
        row: Optional evidence summary row; only ``evidence_entity_name`` is
            read, as a fallback for the name.

    Returns:
        A dict with ``element_id``, ``name``, ``place_type``, ``station_type``,
        ``district``, ``address``, ``location``, ``longitude``, ``latitude``,
        ``tel``, ``rating`` and ``raw_graph`` (the graph dict itself, or ``{}``).
        Text fields default to ``""``; coordinates default to ``None``.
    """
    graph = graph or {}
    row = row or {}

    def coordinate(*names: str) -> Any:
        # A coordinate of 0 / 0.0 is valid, so only missing or blank values
        # fall through to the next candidate property (a plain ``or`` chain
        # would discard zero).
        for name in names:
            value = graph.get(name)
            if value is not None and value != "":
                return value
        return None

    return {
        "element_id": key,
        "name": graph.get("name") or row.get("evidence_entity_name") or key,
        "place_type": graph.get("place_type") or "",
        "station_type": graph.get("station_type") or "",
        "district": graph.get("district") or graph.get("area_name") or "",
        "address": graph.get("address") or "",
        "location": graph.get("location") or "",
        "longitude": coordinate("longitude", "lng"),
        "latitude": coordinate("latitude", "lat"),
        "tel": graph.get("tel") or graph.get("phone") or "",
        "rating": graph.get("rating") or graph.get("biz_ext_rating") or "",
        "raw_graph": graph,
    }
|
|
|
|
|
|
@router.get("/evidence/poi-quality")
async def list_poi_quality(
    keyword: str | None = None,
    platform: str | None = None,
    limit: int = Query(50, ge=1, le=100),
    offset: int = Query(0, ge=0),
    _user: CurrentUser = None,
):
    """List POIs with social evidence and the tags written back to the graph."""
    # Flow: page the per-place evidence aggregates from SQL, then enrich each
    # row with graph properties and graph tags looked up by the same keys.
    # Any failure is reported to the client as HTTP 500.
    try:
        rows, total = await _evidence_summary(
            keyword=keyword,
            platform=platform,
            limit=limit,
            offset=offset,
        )
        keys = [str(r["place_natural_key"]) for r in rows]
        places = _graph_places(keys)
        tags = _graph_tags(keys)

        items = []
        for r in rows:
            key = str(r["place_natural_key"])
            tag_list = tags.get(key, [])
            tag_sources = sorted({t["source"] for t in tag_list if t.get("source")})
            items.append({
                **_place_from_graph_or_evidence(key, places.get(key), r),
                "evidence_count": _row_int(r, "evidence_count"),
                "platform_counts": r.get("platform_counts") or {},
                "latest_captured_at": r.get("latest_captured_at"),
                # Only the first 16 tags are returned; tag_count is the full count.
                "tags": tag_list[:16],
                "tag_count": len(tag_list),
                "tag_sources": tag_sources,
            })
        return {
            "items": items,
            "total": total,
            "limit": limit,
            "offset": offset,
            "platform": _normalize_platform(platform) or "all",
        }
    except Exception as exc:
        # Chain the cause so the original failure stays attached to the
        # HTTPException for logging and debugging.
        raise HTTPException(
            500, f"Failed to load POI quality data: {str(exc)[:300]}"
        ) from exc
|
|
|
|
|
|
@router.get("/evidence/poi-quality/{place_key:path}")
async def get_poi_quality(
    place_key: str,
    platform: str | None = None,
    limit: int = Query(80, ge=1, le=300),
    _user: CurrentUser = None,
):
    """Return one POI, graph tags, and raw social evidence rows."""
    key = place_key.strip()
    if not key:
        raise HTTPException(400, "place_key is required")

    try:
        places = _graph_places([key])
        tag_list = _graph_tags([key]).get(key, [])

        plat = _normalize_platform(platform)
        s = settings.db_schema
        where = ["place_natural_key = %s"]
        params: list[Any] = [key]
        if plat:
            where.append("platform = %s")
            params.append(plat)
        where_sql = " AND ".join(where)

        async with get_conn() as conn:
            async with conn.cursor() as cur:
                # Per-platform counts over all matching rows (not limited).
                await cur.execute(
                    f"""SELECT platform, count(*) AS count
                        FROM {s}.social_evidence
                        WHERE {where_sql}
                        GROUP BY platform
                        ORDER BY platform""",
                    params,
                )
                platform_counts = {
                    str(r["platform"]): _row_int(r, "count")
                    for r in await cur.fetchall()
                }

                # Evidence rows, most-liked first, capped at ``limit``.
                await cur.execute(
                    f"""SELECT id, platform, kind, source_id, url, entity_name,
                               place_natural_key, keyword, title, content, author,
                               author_id, likes, comments, collects, shares,
                               publish_time, location, tags, image_urls, raw_jsonb,
                               captured_at
                        FROM {s}.social_evidence
                        WHERE {where_sql}
                        ORDER BY likes DESC NULLS LAST,
                                 comments DESC NULLS LAST,
                                 captured_at DESC
                        LIMIT %s""",
                    [*params, limit],
                )
                evidence = await cur.fetchall()

        return {
            "place": _place_from_graph_or_evidence(key, places.get(key)),
            "tags": tag_list,
            "tag_count": len(tag_list),
            "platform_counts": platform_counts,
            "evidence": evidence,
            # Derived from the unlimited counts, so it may exceed len(evidence).
            "evidence_count": sum(platform_counts.values()),
        }
    except Exception as exc:
        # Chain the cause so the original failure stays attached to the
        # HTTPException for logging and debugging.
        raise HTTPException(
            500, f"Failed to load POI evidence: {str(exc)[:300]}"
        ) from exc
|