121 lines
5.2 KiB
Python
121 lines
5.2 KiB
Python
"""Super Agent P1 — ingest 高德 POIs into BOTH stores, idempotent.
|
||
|
||
High-trust deterministic gate (高德 = official structured source):
|
||
valid(name + lat/lng in Guizhou + has address/rating) → approved → write
|
||
FalkorDB Place + PG candidate_entities
|
||
else → pending_review (PG only, await audit/human)
|
||
Re-runnable: FalkorDB MERGE by element_id; PG pre-check by natural_key.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
from app.api.graph import _get_graph
|
||
from app.config import settings
|
||
from app.db import get_conn
|
||
from app.agents.gaode_connector import search_pois
|
||
|
||
# Map a Gaode poi_type category name to a coarse place_type bucket; the
# overview groups places by this bucket. Unknown types fall back elsewhere.
_PT = dict(
    [
        ("美食", "eat"),
        ("景点", "sight"),
        ("酒店", "hotel"),
        ("商场", "mall"),
    ]
)

# Deliberately generous bounding box around Guizhou, used only as a coarse
# sanity check on coordinates: (min, max) longitude and (min, max) latitude.
_LNG = 103.0, 110.5
_LAT = 24.0, 29.8
|
||
|
||
|
||
def _valid(r: dict) -> bool:
|
||
try:
|
||
lng, lat = float(r["lng"]), float(r["lat"])
|
||
except (TypeError, ValueError):
|
||
return False
|
||
if not (_LNG[0] <= lng <= _LNG[1] and _LAT[0] <= lat <= _LAT[1]):
|
||
return False
|
||
if not r.get("name"):
|
||
return False
|
||
return bool(r.get("address") or r.get("rating"))
|
||
|
||
|
||
async def ingest_gaode(poi_type: str | None, keyword: str | None,
                       max_pages: int = 2, limit: int = 60) -> dict:
    """Fetch POIs from Gaode with ``search_pois`` and hand them to ``ingest_rows``.

    Args:
        poi_type: Gaode category name; also selects the place_type bucket.
        keyword: free-text search keyword.
        max_pages: forwarded to ``search_pois``.
        limit: forwarded to ``search_pois``.

    Returns:
        The counters dict produced by ``ingest_rows``.
    """
    # The batch label prefers the category and falls back to the keyword.
    batch_label = poi_type or keyword
    # NOTE(review): search_pois is called without await, i.e. synchronously
    # inside this coroutine; if it performs network I/O it blocks the event
    # loop -- confirm and consider asyncio.to_thread.
    fetched = search_pois(
        poi_type=poi_type,
        keyword=keyword,
        max_pages=max_pages,
        limit=limit,
    )
    return await ingest_rows(fetched, batch_label, poi_type)
|
||
|
||
|
||
async def ingest_rows(rows: list[dict], label: str | None,
|
||
poi_type: str | None) -> dict:
|
||
"""去重 + 双写(PG candidate_entities + FalkorDB),幂等。
|
||
|
||
label → 批次名/标签(关键词或大类,仅用于归档)
|
||
poi_type → 决定 place_type 桶(与 _PT 对齐)
|
||
"""
|
||
if not rows:
|
||
return {"fetched": 0, "approved": 0, "pending": 0, "skipped": 0}
|
||
|
||
s = settings.db_schema
|
||
T, P = settings.default_tenant, settings.default_project
|
||
g = _get_graph()
|
||
pt = _PT.get(poi_type or "", "poi")
|
||
|
||
async with get_conn() as conn:
|
||
async with conn.cursor() as cur:
|
||
# batch (idempotent by file_hash sentinel)
|
||
tag = f"amap_poi:{label}"
|
||
await cur.execute(
|
||
f"SELECT id FROM {s}.import_batches WHERE tenant_id=%s AND file_hash=%s",
|
||
(T, tag))
|
||
b = await cur.fetchone()
|
||
if b:
|
||
batch_id = b["id"]
|
||
else:
|
||
await cur.execute(
|
||
f"""INSERT INTO {s}.import_batches
|
||
(tenant_id,project_id,graph_name,template_id,file_name,
|
||
file_hash,status,total_rows,success_rows)
|
||
VALUES (%s,%s,%s,'amap_poi',%s,%s,'approved',%s,%s)
|
||
RETURNING id""",
|
||
(T, P, settings.falkordb_graph, f"高德-{label}",
|
||
tag, len(rows), len(rows)))
|
||
batch_id = (await cur.fetchone())["id"]
|
||
|
||
await cur.execute(
|
||
f"""SELECT natural_key FROM {s}.candidate_entities
|
||
WHERE tenant_id=%s AND project_id=%s AND natural_key LIKE 'amap:%%'""",
|
||
(T, P))
|
||
seen = {r["natural_key"] for r in await cur.fetchall()}
|
||
|
||
appr = pend = skip = 0
|
||
import json as _j
|
||
for r in rows:
|
||
nk = "amap:" + str(r["gaode_poi_id"])
|
||
if not r["gaode_poi_id"] or nk in seen:
|
||
skip += 1
|
||
continue
|
||
seen.add(nk)
|
||
ok = _valid(r)
|
||
status = "approved" if ok else "pending_review"
|
||
payload = {**r, "place_type": pt, "confidence": 0.9 if ok else 0.5}
|
||
await cur.execute(
|
||
f"""INSERT INTO {s}.candidate_entities
|
||
(tenant_id,project_id,batch_id,template_id,entity_type,
|
||
natural_key,display_name,payload_jsonb,confidence,status,reviewed_by)
|
||
VALUES (%s,%s,%s,'amap_poi','Place',%s,%s,%s,%s,%s,'super_agent')""",
|
||
(T, P, batch_id, nk, r["name"],
|
||
_j.dumps(payload, ensure_ascii=False),
|
||
0.9 if ok else 0.5, status))
|
||
if ok:
|
||
g.query(
|
||
"MERGE (p:Place {element_id:$eid}) SET p.place_id=$eid,"
|
||
"p.name=$nm,p.lat=$lat,p.lng=$lng,p.place_type=$pt,"
|
||
"p.category=$cat,p.address=$ad,p.district=$di,p.tel=$tel,"
|
||
"p.rating=$rt,p.cost=$co,p.open_time=$ot,p.source='amap',"
|
||
"p.photo_urls=$ph",
|
||
{"eid": nk, "nm": r["name"], "lat": float(r["lat"]),
|
||
"lng": float(r["lng"]), "pt": pt, "cat": label,
|
||
"ad": r.get("address", ""), "di": r.get("district", ""),
|
||
"tel": r.get("tel", ""), "rt": str(r.get("rating", "")),
|
||
"co": str(r.get("cost", "")), "ot": r.get("open_time", ""),
|
||
"ph": ",".join(r.get("photo_urls", []))[:500]})
|
||
appr += 1
|
||
else:
|
||
pend += 1
|
||
await conn.commit()
|
||
return {"fetched": len(rows), "approved": appr, "pending": pend, "skipped": skip}
|