Files
bxh/app/agents/gaode_connector.py

180 lines
5.5 KiB
Python

"""高德(Amap) connector for the Super Agent.
Reuses the user's existing crawler functions (search_poi / format_fields)
from the external project, WITHOUT image download / CSV / JSON side effects
(those live only in crawl()/__main__, not in the functions we import).
Deterministic, legal, structured — the cheap "fetch" tool the Super Agent
calls; the expensive LLM is only used for planning, not here.
"""
from __future__ import annotations
import importlib.util
import os
from pathlib import Path
from typing import Any
import requests
import urllib3
# Silence urllib3's InsecureRequestWarning: this module sends requests with
# verify=False, which would otherwise trigger that warning.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Third ancestor directory of this file (parents[2]); used as the project root.
_PROJECT_ROOT = Path(__file__).resolve().parents[2]
# Location of the external crawler script. The GAODE_CRAWLER_PATH environment
# variable overrides it; the default is <project root>/scripts/crawl_guiyan.py.
_CRAWL_PATH = os.getenv("GAODE_CRAWLER_PATH", str(_PROJECT_ROOT / "scripts" / "crawl_guiyan.py"))
# Cache for the lazily loaded crawler module; stays None until _crawl() succeeds.
_mod: Any = None
# Official AMap top-level POI type codes, keyed by category name. Meant for
# scanning a grid by type code instead of relying on popularity-ranked keywords.
AMAP_TYPECODES = {
"景点": "110000", # scenic spots / attractions
"美食": "050000", # dining services
"酒店": "100000", # accommodation services
"商场": "060000", # shopping services
}
def _crawl():
    """Load the external crawler script once and return it as a module.

    The script at ``_CRAWL_PATH`` is executed under the module name
    ``crawl_guiyan`` and the resulting module object is cached in the
    module-level ``_mod``; later calls return the cached object without
    touching the file system again.

    Returns:
        The loaded crawler module.

    Raises:
        FileNotFoundError: ``_CRAWL_PATH`` does not exist.
        ImportError: no import spec/loader can be built for ``_CRAWL_PATH``
            (for example the path lacks a recognised Python source suffix).
    """
    global _mod
    if _mod is not None:
        return _mod
    if not os.path.exists(_CRAWL_PATH):
        raise FileNotFoundError(f"高德采集脚本不存在: {_CRAWL_PATH}")
    spec = importlib.util.spec_from_file_location("crawl_guiyan", _CRAWL_PATH)
    # spec_from_file_location returns None when it cannot pick a loader for
    # the path; fail with a clear error instead of an AttributeError on None.
    if spec is None or spec.loader is None:
        raise ImportError(f"无法加载高德采集脚本: {_CRAWL_PATH}")
    module = importlib.util.module_from_spec(spec)
    # NOTE(review): the script's top level is expected to contain only
    # definitions/CONFIG, with the crawl itself behind a __main__ guard —
    # confirm against the script.
    spec.loader.exec_module(module)
    # Cache only after a successful exec so a failed load can be retried.
    _mod = module
    return module
def available_types() -> list[str]:
    """Return the POI category names that can be searched.

    The names are the keys of the crawler module's ``SEARCH_KEYWORDS``
    mapping. If the crawler cannot be loaded or that lookup fails for any
    reason, a built-in list of four categories is returned instead.
    """
    try:
        keyword_map = _crawl().SEARCH_KEYWORDS
        names = [*keyword_map.keys()]
    except Exception:
        # Best-effort: stay usable even when the crawler script is unavailable.
        names = ["景点", "美食", "酒店", "商场"]
    return names
def _normalize(f: dict) -> dict:
    """Translate one crawler row into the connector's English-keyed record.

    Args:
        f: A row keyed by the crawler's Chinese column names. Missing
            columns default to an empty string.

    Returns:
        A dict with English keys. ``photo_urls`` is the list of non-empty
        pieces of the ``门店图片`` column split on ``" | "`` (URLs only,
        nothing is downloaded); ``source`` is always ``"amap"``.
    """
    photo_field = f.get("门店图片") or ""
    photo_urls = [piece for piece in photo_field.split(" | ") if piece]

    # (output key, crawler column) pairs, in output order.
    column_map = (
        ("gaode_poi_id", "高德POI_ID"),
        ("name", "名称"),
        ("type", "类型"),
        ("typecode", "类型编码"),
        ("lng", "经度"),
        ("lat", "纬度"),
        ("address", "详细地址"),
        ("province", "省份"),
        ("city", "城市"),
        ("district", "区县"),
        ("business_area", "商圈"),
        ("tel", "联系电话"),
        ("open_time", "营业时间"),
        ("rating", "评分"),
        ("cost", "人均消费"),
        ("level", "等级"),
        ("tags", "标签"),
    )
    record = {english: f.get(chinese, "") for english, chinese in column_map}
    record["photo_urls"] = photo_urls
    record["source"] = "amap"
    record["keyword"] = f.get("采集关键词", "")
    return record
def search_pois(
    poi_type: str | None = None,
    keyword: str | None = None,
    max_pages: int = 2,
    limit: int = 60,
) -> list[dict]:
    """Fetch POIs from AMap through the crawler script and normalize them.

    Keyword selection: an explicit ``keyword`` wins; otherwise ``poi_type``
    is looked up in the crawler's ``SEARCH_KEYWORDS`` (falling back to the
    type name itself as the only keyword); with neither, nothing is searched.

    For each keyword, pages ``1..max_pages`` are requested via the crawler's
    ``search_poi``. Paging for a keyword stops at the first response that is
    empty, has a status other than ``"1"``, or carries no POIs. Rows are
    de-duplicated on ``高德POI_ID``; rows that fail ``format_fields`` or lack
    longitude/latitude are skipped.

    NOTE(review): per the module docstring the imported crawler functions are
    expected to have no file/image side effects — not verifiable from here.

    Args:
        poi_type: Category name used to pick the crawler's keyword list.
        keyword: Single search keyword; takes precedence over ``poi_type``.
        max_pages: Highest page number requested per keyword.
        limit: Maximum number of rows returned; a non-positive value yields
            an empty list.

    Returns:
        Up to ``limit`` normalized POI dicts, each with non-empty
        ``lng``/``lat``.
    """
    m = _crawl()
    if limit <= 0:
        # The cap below is only checked after an append, so without this
        # guard a non-positive limit would still fetch and return one row.
        return []
    if keyword:
        kws = [keyword]
    elif poi_type:
        kws = m.SEARCH_KEYWORDS.get(poi_type, [poi_type])
    else:
        kws = []
    out: list[dict] = []
    seen: set[str] = set()
    for kw in kws:
        for page in range(1, max_pages + 1):
            res = m.search_poi(kw, page)
            if not res or res.get("status") != "1":
                break
            pois = res.get("pois") or []
            if not pois:
                break
            for p in pois:
                try:
                    f = m.format_fields(p, len(out) + 1, kw)
                except Exception:
                    # Best-effort: one malformed POI must not abort the batch.
                    continue
                pid = f.get("高德POI_ID")
                if not pid or pid in seen:
                    continue
                n = _normalize(f)
                if not (n["lng"] and n["lat"]):
                    continue
                seen.add(pid)
                out.append(n)
                if len(out) >= limit:
                    return out
    return out
def _amap_key() -> str:
    """Return the AMap API key stored under ``CONFIG["key"]`` in the crawler module."""
    crawler_config = _crawl().CONFIG
    return crawler_config["key"]
def search_polygon(
    typecode: str,
    bbox: tuple[float, float, float, float],
    page: int = 1,
    offset: int = 25,
) -> tuple[list[dict], int]:
    """Run one AMap polygon (rectangle) search for a type code inside a bbox.

    Intended for sweeping a city cell by cell over a geographic grid rather
    than relying on popularity-ranked results.

    Args:
        typecode: AMap POI type code, sent as the ``types`` parameter.
        bbox: ``(min_lng, min_lat, max_lng, max_lat)``.
        page: Page number to request.
        offset: Page size to request.

    Returns:
        ``(rows, raw_count)``: ``rows`` are the normalized POIs that have
        both longitude and latitude, and ``raw_count`` is the number of POIs
        AMap returned on this page before filtering. A ``raw_count`` below
        ``offset`` is meant to signal that the cell is exhausted for this
        type. ``([], 0)`` is returned when all three request attempts fail
        or the response status is not ``"1"``.
    """
    min_lng, min_lat, max_lng, max_lat = bbox
    # The rectangle is sent as two corners: top-left | bottom-right.
    top_left = f"{min_lng:.6f},{max_lat:.6f}"
    bottom_right = f"{max_lng:.6f},{min_lat:.6f}"
    query = {
        "key": _amap_key(),
        "polygon": f"{top_left}|{bottom_right}",
        "types": typecode,
        "offset": offset,
        "page": page,
        "extensions": "all",
        "output": "json",
    }
    request_headers = {"User-Agent": "Mozilla/5.0"}

    # Up to three immediate attempts; any request or JSON-decoding error
    # counts as a failed attempt.
    for _ in range(3):
        try:
            # NOTE(review): verify=False disables TLS certificate verification.
            response = requests.get(
                "https://restapi.amap.com/v3/place/polygon",
                params=query,
                headers=request_headers,
                timeout=30,
                verify=False,
            )
            payload = response.json()
        except Exception:
            continue
        break
    else:
        # Every attempt raised.
        return [], 0

    if payload.get("status") != "1":
        return [], 0

    raw_pois = payload.get("pois") or []
    crawler = _crawl()
    rows: list[dict] = []
    for seq, poi in enumerate(raw_pois, start=1):
        try:
            fields = crawler.format_fields(poi, seq, "grid")
        except Exception:
            # Skip POIs the crawler cannot format; keep the rest of the page.
            continue
        row = _normalize(fields)
        if row["lng"] and row["lat"]:
            rows.append(row)
    return rows, len(raw_pois)