"""Super Agent P1 — ingest 高德 POIs into BOTH stores, idempotent. High-trust deterministic gate (高德 = official structured source): valid(name + lat/lng in Guizhou + has address/rating) → approved → write FalkorDB Place + PG candidate_entities else → pending_review (PG only, await audit/human) Re-runnable: FalkorDB MERGE by element_id; PG pre-check by natural_key. """ from __future__ import annotations from app.api.graph import _get_graph from app.config import settings from app.db import get_conn from app.agents.gaode_connector import search_pois # poi_type → place_type bucket (overview groups by this) _PT = {"美食": "eat", "景点": "sight", "酒店": "hotel", "商场": "mall"} # generous Guizhou bbox sanity check _LNG = (103.0, 110.5) _LAT = (24.0, 29.8) def _valid(r: dict) -> bool: try: lng, lat = float(r["lng"]), float(r["lat"]) except (TypeError, ValueError): return False if not (_LNG[0] <= lng <= _LNG[1] and _LAT[0] <= lat <= _LAT[1]): return False if not r.get("name"): return False return bool(r.get("address") or r.get("rating")) async def ingest_gaode(poi_type: str | None, keyword: str | None, max_pages: int = 2, limit: int = 60) -> dict: rows = search_pois(poi_type=poi_type, keyword=keyword, max_pages=max_pages, limit=limit) return await ingest_rows(rows, poi_type or keyword, poi_type) async def ingest_rows(rows: list[dict], label: str | None, poi_type: str | None) -> dict: """去重 + 双写(PG candidate_entities + FalkorDB),幂等。 label → 批次名/标签(关键词或大类,仅用于归档) poi_type → 决定 place_type 桶(与 _PT 对齐) """ if not rows: return {"fetched": 0, "approved": 0, "pending": 0, "skipped": 0} s = settings.db_schema T, P = settings.default_tenant, settings.default_project g = _get_graph() pt = _PT.get(poi_type or "", "poi") async with get_conn() as conn: async with conn.cursor() as cur: # batch (idempotent by file_hash sentinel) tag = f"amap_poi:{label}" await cur.execute( f"SELECT id FROM {s}.import_batches WHERE tenant_id=%s AND file_hash=%s", (T, tag)) b = await cur.fetchone() if b: batch_id = b["id"] else: await cur.execute( f"""INSERT INTO {s}.import_batches (tenant_id,project_id,graph_name,template_id,file_name, file_hash,status,total_rows,success_rows) VALUES (%s,%s,%s,'amap_poi',%s,%s,'approved',%s,%s) RETURNING id""", (T, P, settings.falkordb_graph, f"高德-{label}", tag, len(rows), len(rows))) batch_id = (await cur.fetchone())["id"] await cur.execute( f"""SELECT natural_key FROM {s}.candidate_entities WHERE tenant_id=%s AND project_id=%s AND natural_key LIKE 'amap:%%'""", (T, P)) seen = {r["natural_key"] for r in await cur.fetchall()} appr = pend = skip = 0 import json as _j for r in rows: nk = "amap:" + str(r["gaode_poi_id"]) if not r["gaode_poi_id"] or nk in seen: skip += 1 continue seen.add(nk) ok = _valid(r) status = "approved" if ok else "pending_review" payload = {**r, "place_type": pt, "confidence": 0.9 if ok else 0.5} await cur.execute( f"""INSERT INTO {s}.candidate_entities (tenant_id,project_id,batch_id,template_id,entity_type, natural_key,display_name,payload_jsonb,confidence,status,reviewed_by) VALUES (%s,%s,%s,'amap_poi','Place',%s,%s,%s,%s,%s,'super_agent')""", (T, P, batch_id, nk, r["name"], _j.dumps(payload, ensure_ascii=False), 0.9 if ok else 0.5, status)) if ok: g.query( "MERGE (p:Place {element_id:$eid}) SET p.place_id=$eid," "p.name=$nm,p.lat=$lat,p.lng=$lng,p.place_type=$pt," "p.category=$cat,p.address=$ad,p.district=$di,p.tel=$tel," "p.rating=$rt,p.cost=$co,p.open_time=$ot,p.source='amap'," "p.photo_urls=$ph", {"eid": nk, "nm": r["name"], "lat": float(r["lat"]), "lng": float(r["lng"]), "pt": pt, "cat": label, "ad": r.get("address", ""), "di": r.get("district", ""), "tel": r.get("tel", ""), "rt": str(r.get("rating", "")), "co": str(r.get("cost", "")), "ot": r.get("open_time", ""), "ph": ",".join(r.get("photo_urls", []))[:500]}) appr += 1 else: pend += 1 await conn.commit() return {"fetched": len(rows), "approved": appr, "pending": pend, "skipped": skip}