Files
bxh/scripts/enrich_travel_poi_with_amap.py

568 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Enrich travel POI CSVs with AMap Web API data.
Outputs are written next to the delivery CSVs and source files are not changed.
"""
from __future__ import annotations
import csv
import hashlib
import json
import math
import ssl
import sys
import time
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
from common_paths import ENV_PATH, TRAVEL_DELIVERY_ROOT
# Directory containing the delivery CSVs that this script reads.
BASE_DIR = TRAVEL_DELIVERY_ROOT
# Every generated file is written below this sub-directory of BASE_DIR.
OUT_DIR = BASE_DIR / "amap_enriched"
# JSON file in which AMap responses are persisted between runs.
CACHE_PATH = OUT_DIR / "_amap_cache.json"
# Values sent as the AMap `types` filter when searching for scenic spots,
# hotels and restaurants respectively.
# NOTE(review): presumably AMap top-level POI category codes - confirm against
# the AMap POI classification table.
SCENIC_TYPES = "110000"
HOTEL_TYPES = "100000"
RESTAURANT_TYPES = "050000"
def read_env_key(path: Path, key: str) -> str:
    """Return the value assigned to ``key`` in a dotenv-style text file.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped.  The first line whose name (the text before the first ``=``,
    stripped) equals ``key`` wins; its value is returned with surrounding
    whitespace and then surrounding double/single quote characters removed.

    Returns an empty string when the file does not exist or the key is absent.
    """
    if path.exists():
        for raw_line in path.read_text(errors="ignore").splitlines():
            entry = raw_line.strip()
            if entry == "" or entry.startswith("#"):
                continue
            name, separator, value = entry.partition("=")
            if separator and name.strip() == key:
                return value.strip().strip('"').strip("'")
    return ""
def read_csv(path: Path) -> tuple[list[str], list[dict[str, str]]]:
    """Load a CSV file (UTF-8, optional BOM) as ``(header, rows)``.

    ``header`` is the list of column names from the first line (empty when the
    file has no header line) and ``rows`` holds one dict per data line.
    """
    with path.open("r", encoding="utf-8-sig", newline="") as handle:
        table = csv.DictReader(handle)
        header = list(table.fieldnames) if table.fieldnames else []
        records = [record for record in table]
    return header, records
def write_csv(path: Path, rows: list[dict[str, Any]], preferred: list[str] | None = None) -> None:
    """Write ``rows`` to ``path`` as a UTF-8 (with BOM) CSV file.

    Columns are the names in ``preferred`` (duplicates dropped) followed by
    every other key in order of first appearance across ``rows``.  Missing
    values are written as empty cells.  Parent directories are created.
    """
    path.parent.mkdir(parents=True, exist_ok=True)

    # A dict doubles as an insertion-ordered set of column names.
    ordered: dict[str, None] = dict.fromkeys(preferred or [])
    for record in rows:
        for column in record:
            ordered.setdefault(column, None)
    columns = list(ordered)

    with path.open("w", encoding="utf-8-sig", newline="") as handle:
        out = csv.DictWriter(handle, fieldnames=columns, extrasaction="ignore")
        out.writeheader()
        out.writerows(
            {column: record.get(column, "") for column in columns} for record in rows
        )
def load_cache() -> dict[str, Any]:
    """Load the persisted AMap response cache from ``CACHE_PATH``.

    Returns an empty dict when the file does not exist.  A file that cannot be
    decoded or parsed, or whose top-level JSON value is not an object, is
    reported on stderr and ignored, so one damaged cache file does not abort
    every later run; the cache is simply rebuilt.
    """
    try:
        cached = json.loads(CACHE_PATH.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return {}
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        print(f"ignoring unreadable AMap cache {CACHE_PATH}: {exc}", file=sys.stderr)
        return {}
    if not isinstance(cached, dict):
        print(f"ignoring AMap cache {CACHE_PATH}: top-level value is not an object", file=sys.stderr)
        return {}
    return cached
def save_cache(cache: dict[str, Any]) -> None:
    """Persist ``cache`` as pretty-printed UTF-8 JSON at ``CACHE_PATH``.

    The JSON is written to a sibling ``*.tmp`` file first and then renamed
    over ``CACHE_PATH``.  An interruption in the middle of the write therefore
    leaves the previous cache file intact instead of a truncated one.
    """
    CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(cache, ensure_ascii=False, indent=2)
    tmp_path = CACHE_PATH.with_name(CACHE_PATH.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    # Path.replace overwrites the destination in a single rename operation.
    tmp_path.replace(CACHE_PATH)
class AMapClient:
    """Minimal caching client for the AMap Web API at ``restapi.amap.com``.

    Successful responses are stored in the ``cache`` dict under a SHA-1 digest
    of the endpoint and the request parameters (the API key is excluded), so a
    repeated request is answered without touching the network.

    Attributes:
        key: API key appended to every request.
        cache: digest -> decoded JSON response; shared with the caller.
        ctx: SSL context used for HTTPS requests.
        calls: number of HTTP responses received and decoded so far.
        errors: one human-readable entry per request that ultimately failed.
    """

    def __init__(self, key: str, cache: dict[str, Any], *, verify_tls: bool = False) -> None:
        """Create a client.

        Args:
            key: AMap API key.
            cache: mutable mapping used as the response cache.
            verify_tls: when true, use ``ssl.create_default_context()`` so the
                server certificate is verified.  Defaults to ``False`` to keep
                the historical behaviour.
        """
        self.key = key
        self.cache = cache
        # NOTE(review): the default context performs NO certificate
        # verification (and relies on a private ssl helper), which exposes the
        # API key to man-in-the-middle interception.  Pass verify_tls=True
        # wherever the local certificate store works.
        self.ctx = ssl.create_default_context() if verify_tls else ssl._create_unverified_context()
        self.calls = 0
        self.errors: list[str] = []

    def get(self, endpoint: str, params: dict[str, Any], retries: int = 3) -> dict[str, Any]:
        """Perform a cached GET request and return the decoded JSON object.

        Args:
            endpoint: path below ``https://restapi.amap.com``.
            params: query parameters; the API key is added automatically.
            retries: maximum number of HTTP attempts.

        Returns:
            The response dict when AMap reports ``status == "1"``.  Otherwise a
            synthetic ``{"status": "0", "info": <error>, "pois": []}`` dict;
            the failure is also appended to ``self.errors``.
        """
        full_params = {**params, "key": self.key}
        # The key is left out of the cache identity so cached data survives a
        # key rotation and the key never ends up in error messages.
        cache_params = {k: v for k, v in full_params.items() if k != "key"}
        cache_key = endpoint + "?" + urllib.parse.urlencode(sorted(cache_params.items()), doseq=True)
        digest = hashlib.sha1(cache_key.encode()).hexdigest()
        if digest in self.cache:
            return self.cache[digest]

        url = "https://restapi.amap.com" + endpoint + "?" + urllib.parse.urlencode(full_params)
        last_error = ""
        for attempt in range(retries):
            # Backing off is pointless once no further attempt will follow.
            has_retry_left = attempt + 1 < retries
            try:
                with urllib.request.urlopen(url, timeout=20, context=self.ctx) as resp:
                    data = json.loads(resp.read().decode("utf-8"))
                self.calls += 1
                if data.get("status") == "1":
                    self.cache[digest] = data
                    # Flush periodically so a long run keeps most of its work.
                    if self.calls % 20 == 0:
                        save_cache(self.cache)
                    # Short pause between successful calls to pace requests.
                    time.sleep(0.06)
                    return data
                last_error = f"{data.get('infocode')} {data.get('info')}"
                # Only rate-limit answers (info text containing "QPS") are
                # retried; any other API error is final.
                if "QPS" not in last_error:
                    break
                if has_retry_left:
                    time.sleep(1.5 + attempt)
            except Exception as exc:  # noqa: BLE001
                # Network and decoding problems are retried with a growing delay.
                last_error = str(exc)
                if has_retry_left:
                    time.sleep(0.8 + attempt)
        self.errors.append(f"{endpoint} {cache_params} -> {last_error}")
        return {"status": "0", "info": last_error, "pois": []}

    def place_text(self, keywords: str, city: str = "", types: str = "", offset: int = 20) -> list[dict[str, Any]]:
        """Query ``/v3/place/text`` and return the ``pois`` list (possibly empty).

        ``city`` and ``types`` are only sent when non-empty; only the first
        result page of ``offset`` entries is requested.
        """
        params: dict[str, Any] = {
            "keywords": keywords,
            "offset": offset,
            "page": 1,
            "extensions": "all",
            "children": 1,
        }
        if city:
            params["city"] = city
        if types:
            params["types"] = types
        data = self.get("/v3/place/text", params)
        return data.get("pois") or []

    def place_around(self, location: str, types: str, radius: int, offset: int = 25) -> list[dict[str, Any]]:
        """Query ``/v3/place/around`` for POIs near ``location``.

        ``location`` is a ``"lng,lat"`` string.  Results are requested sorted
        by distance; only the first page of ``offset`` entries is returned.
        """
        params = {
            "location": location,
            "types": types,
            "radius": radius,
            "offset": offset,
            "page": 1,
            "extensions": "all",
            "sortrule": "distance",
        }
        data = self.get("/v3/place/around", params)
        return data.get("pois") or []

    def driving(self, origin: str, destination: str) -> dict[str, Any]:
        """Query ``/v3/direction/driving`` and summarise the first route.

        Returns a dict with ``drive_status == "OK"`` plus distance, duration,
        tolls and traffic-light fields when a route exists.  Otherwise only
        ``drive_status`` is present, holding the API ``info`` text or
        ``"NO_ROUTE"``.  Numeric fields that cannot be parsed become ``""``.
        """
        data = self.get(
            "/v3/direction/driving",
            {"origin": origin, "destination": destination, "extensions": "base", "strategy": 0},
        )
        paths = (data.get("route") or {}).get("paths") or []
        if not paths:
            return {"drive_status": data.get("info") or "NO_ROUTE"}
        p = paths[0]
        distance_m = to_float(p.get("distance"))
        duration_s = to_float(p.get("duration"))
        return {
            "drive_status": "OK",
            "drive_distance_m": int(distance_m) if distance_m is not None else "",
            "drive_distance_km": round(distance_m / 1000, 2) if distance_m is not None else "",
            "drive_duration_s": int(duration_s) if duration_s is not None else "",
            "drive_duration_min": round(duration_s / 60, 1) if duration_s is not None else "",
            "drive_tolls": p.get("tolls", ""),
            "drive_traffic_lights": p.get("traffic_lights", ""),
        }
def to_float(value: Any) -> float | None:
    """Convert ``value`` to ``float``, or return ``None`` when that is impossible.

    The empty string, ``None`` and the empty list are treated as "no value";
    any error raised during conversion also yields ``None``.
    """
    try:
        return None if value in ("", None, []) else float(value)
    except Exception:
        return None
def norm_name(value: str) -> str:
    """Reduce a place name to a comparison key.

    A fixed list of generic words is removed (in list order), every character
    that is neither alphanumeric nor in the range U+4E00..U+9FFF is dropped,
    and the result is lower-cased.  Falsy input yields an empty string.
    """
    text = str(value or "")
    for filler in ("风景名胜区", "风景区", "旅游景区", "景区", "旅游区", "景点", "国家级", "贵州省"):
        text = text.replace(filler, "")
    kept = [ch for ch in text if ch.isalnum() or "\u4e00" <= ch <= "\u9fff"]
    return "".join(kept).lower()
def poi_score(poi: dict[str, Any], target_name: str, city: str = "", district: str = "") -> int:
    """Score how well an AMap POI matches an expected place (higher is better).

    Points awarded:
        * 120 when the normalised names are equal, 80 when one contains the other;
        * 25 when the POI ``type`` text mentions "风景" or "景点";
        * 12 when ``cityname`` starts with the first two characters of ``city``;
        * 18 when the first ``/``-separated part of ``district`` occurs in ``adname``;
        * 5 when the POI has photos.
    """
    score = 0
    n1 = norm_name(target_name)
    n2 = norm_name(str(poi.get("name") or ""))
    if n1 and n2:
        if n1 == n2:
            score += 120
        elif n1 in n2 or n2 in n1:
            score += 80

    poi_type = str(poi.get("type") or "")
    if "风景" in poi_type or "景点" in poi_type:
        score += 25

    if city and str(poi.get("cityname") or "").startswith(city[:2]):
        score += 12

    # The previous clean-up was `.replace("", "").replace("", "")`, which
    # returns the string unchanged, so no cleaning ever happened.  Surrounding
    # whitespace is stripped instead so padded CSV values such as
    # " 镇宁县 /关岭县" can still match.
    # NOTE(review): the replaced characters were presumably lost in an earlier
    # edit or copy; re-add them here if specific characters must be removed.
    d_clean = district.split("/")[0].strip()
    if d_clean and d_clean in str(poi.get("adname") or ""):
        score += 18

    if poi.get("photos"):
        score += 5
    return score
def select_best_poi(pois: list[dict[str, Any]], name: str, city: str = "", district: str = "") -> dict[str, Any] | None:
    """Return the candidate with the highest ``poi_score``, or ``None`` if there are none.

    When several candidates share the top score the earliest one in ``pois``
    is returned.
    """
    if pois:
        # max() keeps the first of equally scored items, like a stable
        # descending sort followed by taking element 0.
        return max(pois, key=lambda candidate: poi_score(candidate, name, city, district))
    return None
def parse_location(location: str) -> tuple[str, str]:
    """Split a ``"lng,lat"`` string at its first comma into two stripped parts.

    Returns ``("", "")`` when ``location`` is empty or contains no comma.
    """
    if location and "," in location:
        lng, _, lat = location.partition(",")
        return lng.strip(), lat.strip()
    return "", ""
def photo_urls(poi: dict[str, Any]) -> list[str]:
    """Collect the distinct, non-empty photo URLs of a POI in original order."""
    links = (str(photo.get("url") or "").strip() for photo in poi.get("photos") or [])
    # dict.fromkeys drops repeats while keeping the first occurrence of each URL.
    return list(dict.fromkeys(link for link in links if link))
def amap_marker_url(lng: str, lat: str, name: str) -> str:
    """Build an ``https://uri.amap.com/marker`` link for a named coordinate.

    Returns an empty string when either coordinate is missing.
    """
    if not (lng and lat):
        return ""
    query = urllib.parse.urlencode(
        {
            "position": f"{lng},{lat}",
            "name": name,
            "src": "znkg",
            "coordinate": "gaode",
            "callnative": "0",
        }
    )
    return "https://uri.amap.com/marker?" + query
def poi_common_fields(poi: dict[str, Any]) -> dict[str, Any]:
    """Flatten one AMap POI record into the shared output columns.

    The insertion order of the returned dict is significant: it decides the
    column order of CSV files whose columns are discovered from the rows.
    """
    lng, lat = parse_location(str(poi.get("location") or ""))
    photos = photo_urls(poi)
    name = str(poi.get("name") or "")

    fields: dict[str, Any] = {"amap_name": name}
    # Output column -> key in the AMap record; copied verbatim, "" if absent.
    copied = (
        ("amap_poi_id", "id"),
        ("amap_type", "type"),
        ("amap_typecode", "typecode"),
        ("province", "pname"),
        ("city", "cityname"),
        ("district", "adname"),
        ("adcode", "adcode"),
        ("town", "townname"),
        ("business_area", "business_area"),
        ("formatted_address", "address"),
    )
    for column, source_key in copied:
        fields[column] = poi.get(source_key, "")
    fields["geo_lng"] = lng
    fields["geo_lat"] = lat
    fields["tel"] = poi.get("tel", "")
    fields["first_image_url"] = photos[0] if photos else ""
    fields["all_image_urls"] = "|".join(photos)
    fields["image_count"] = len(photos)
    fields["amap_url"] = amap_marker_url(lng, lat, name)
    return fields
def scenic_enrich(client: AMapClient, rows: list[dict[str, str]]) -> list[dict[str, Any]]:
    """Enrich every scenic row with the best AMap text-search match.

    For each row up to three searches are tried in order - the name limited to
    scenic types, the row's ``amap_search_keyword`` limited to scenic types,
    then the name without a type filter - and the first search that yields a
    candidate wins.  Matched rows receive the common AMap fields plus
    ``has_geo``, ``amap_match_status`` and ``amap_match_score``; unmatched
    rows only receive ``amap_match_status = "not_found"``.  Image columns
    already present in the source row are kept when AMap has no photo.
    """
    results: list[dict[str, Any]] = []
    for position, source_row in enumerate(rows, 1):
        name = source_row.get("name", "")
        city = source_row.get("city", "")
        district = source_row.get("district", "")
        attempts = (
            (name, SCENIC_TYPES),
            (source_row.get("amap_search_keyword", ""), SCENIC_TYPES),
            (name, ""),
        )

        match = None
        for keyword, types in attempts:
            if keyword:
                candidates = client.place_text(keyword, city=city, types=types, offset=20)
                match = select_best_poi(candidates, name, city, district)
                if match:
                    break

        record = dict(source_row)
        if not match:
            record["amap_match_status"] = "not_found"
        else:
            common = poi_common_fields(match)
            record.update(common)
            record["has_geo"] = bool(common.get("geo_lng") and common.get("geo_lat"))
            record["amap_match_status"] = "matched"
            record["amap_match_score"] = poi_score(match, name, city, district)
            # The update above blanked the image columns if AMap has no
            # photos; restore the values that came with the source row.
            if source_row.get("first_image_url") and not common.get("first_image_url"):
                record["first_image_url"] = source_row.get("first_image_url")
                record["all_image_urls"] = source_row.get("all_image_urls", "")
                record["image_count"] = source_row.get("image_count", "")
        results.append(record)
        print(f"[scenic] {position}/{len(rows)} {name} -> {record.get('amap_name','')}", flush=True)
    return results
def enrich_existing_pois(
    client: AMapClient,
    rows: list[dict[str, str]],
    *,
    name_field: str,
    id_prefix: str,
    types: str,
) -> list[dict[str, Any]]:
    """Match existing hotel/restaurant rows against AMap text search.

    Args:
        client: AMap client used for the searches.
        rows: source CSV rows.
        name_field: column holding the POI name.
        id_prefix: prefix of the generated ``<prefix>_id`` value; its
            lower-cased form names that column and tags the progress output.
        types: AMap ``types`` filter for the search.

    Returns:
        Copies of the rows.  Matched rows carry the common AMap fields, an
        extended ``source``, match status/score and an ID built from the AMap
        POI id; unmatched rows get ``not_found`` and a positional
        ``UNMATCHED`` ID.
    """
    tag = id_prefix.lower()
    id_column = f"{tag}_id"
    total = len(rows)
    results = []
    for position, source_row in enumerate(rows, 1):
        name = source_row.get(name_field, "")
        city = source_row.get("city") or source_row.get("expected_city") or ""
        district = source_row.get("district", "")
        keyword = source_row.get("amap_search_keyword") or f"{city} {name}"

        candidates = client.place_text(keyword, city=city, types=types, offset=20)
        match = select_best_poi(candidates, name, city, district)

        record = dict(source_row)
        if not match:
            record["amap_match_status"] = "not_found"
            record[id_column] = f"{id_prefix}_UNMATCHED_{position:04d}"
        else:
            record.update(poi_common_fields(match))
            record["source"] = (source_row.get("source") or "source_csv") + "+amap_text"
            record["amap_match_status"] = "matched"
            record["amap_match_score"] = poi_score(match, name, city, district)
            record[id_column] = f"{id_prefix}_{match.get('id')}"
        results.append(record)

        # Report every tenth row and the final one.
        if position % 10 == 0 or position == total:
            print(f"[{tag}] {position}/{total}", flush=True)
    return results
def dedupe_by_poi_id(rows: list[dict[str, Any]], name_key: str) -> list[dict[str, Any]]:
    """Drop repeated rows, keeping the first occurrence of each identity.

    A row's identity is its ``amap_poi_id``; failing that, the value under
    ``name_key``; failing that, the JSON dump of the whole row.
    """
    first_seen: dict[str, dict[str, Any]] = {}
    for row in rows:
        identity = str(row.get("amap_poi_id") or row.get(name_key) or "")
        if not identity:
            identity = json.dumps(row, ensure_ascii=False, sort_keys=True)
        # setdefault leaves an earlier row with the same identity in place.
        first_seen.setdefault(identity, row)
    return list(first_seen.values())
def nearby_candidates(
    client: AMapClient,
    scenic_rows: list[dict[str, Any]],
    *,
    types: str,
    kind: str,
    target_count: int = 10,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Find nearby POIs of one kind for every scenic spot that has coordinates.

    The search radius grows step by step (5, 10, 20, 50 km) until
    ``target_count`` distinct POIs have been collected.  For each selected POI
    a master row and a scenic-to-resource relation row (including driving
    metrics from ``client.driving``) are produced.

    Args:
        client: AMap client used for around-search and driving queries.
        scenic_rows: scenic rows; ``geo_lng``/``geo_lat`` must be set to be used.
        types: AMap ``types`` filter for the around-search.
        kind: ``"hotel"`` labels relations as ``Hotel``, anything else as
            ``Restaurant``; also used in column names, IDs and log output.
        target_count: maximum number of POIs kept per scenic spot.

    Returns:
        ``(master_rows, relation_rows)``; master rows are de-duplicated.
    """
    master: list[dict[str, Any]] = []
    relation_rows: list[dict[str, Any]] = []
    radii = [5000, 10000, 20000, 50000]
    resource_type = "Hotel" if kind == "hotel" else "Restaurant"

    for sidx, scenic in enumerate(scenic_rows, 1):
        scenic_id = scenic.get("id", "")
        scenic_name = scenic.get("name", "")
        lng = scenic.get("geo_lng", "")
        lat = scenic.get("geo_lat", "")
        if not (lng and lat):
            print(f"[nearby:{kind}] skip no geo {scenic_name}", flush=True)
            continue
        loc = f"{lng},{lat}"

        # Widen the search until enough distinct POIs are known.
        picked: list[dict[str, Any]] = []
        picked_ids: set[str] = set()
        for radius in radii:
            for poi in client.place_around(loc, types=types, radius=radius, offset=25):
                pid = str(poi.get("id") or poi.get("name") or "")
                if pid and pid not in picked_ids:
                    picked_ids.add(pid)
                    picked.append(poi)
                    if len(picked) >= target_count:
                        break
            if len(picked) >= target_count:
                break

        top = picked[:target_count]
        for rank, poi in enumerate(top, 1):
            common = poi_common_fields(poi)
            poi_name = common.get("amap_name", "")
            row_id = f"{kind.upper()}_{common.get('amap_poi_id')}"
            master.append(
                {
                    f"{kind}_id": row_id,
                    f"{kind}_name": poi_name,
                    "source": "amap_around",
                    **common,
                }
            )
            metric = client.driving(loc, f"{common.get('geo_lng')},{common.get('geo_lat')}")
            # Key order matters: it becomes the CSV column order downstream.
            relation_rows.append(
                {
                    "scenic_id": scenic_id,
                    "scenic_name": scenic_name,
                    "scenic_lng": lng,
                    "scenic_lat": lat,
                    "resource_type": resource_type,
                    "resource_id": row_id,
                    "resource_name": poi_name,
                    "amap_poi_id": common.get("amap_poi_id", ""),
                    "resource_lng": common.get("geo_lng", ""),
                    "resource_lat": common.get("geo_lat", ""),
                    "rank_for_scenic": rank,
                    "amap_around_distance_m": poi.get("distance", ""),
                    **metric,
                    "province": common.get("province", ""),
                    "city": common.get("city", ""),
                    "district": common.get("district", ""),
                    "formatted_address": common.get("formatted_address", ""),
                    "amap_type": common.get("amap_type", ""),
                    "tel": common.get("tel", ""),
                    "first_image_url": common.get("first_image_url", ""),
                    "all_image_urls": common.get("all_image_urls", ""),
                    "amap_url": common.get("amap_url", ""),
                }
            )
        print(f"[nearby:{kind}] {sidx}/{len(scenic_rows)} {scenic_name} -> {len(top)}", flush=True)
    return dedupe_by_poi_id(master, f"{kind}_name"), relation_rows
def write_dictionary() -> None:
    """Write the Markdown field dictionary describing the generated columns.

    The document is stored as ``字段字典.md`` inside ``OUT_DIR``.
    """
    target = OUT_DIR / "字段字典.md"
    text = """# 高德 POI 补全字段字典
## 通用 POI 字段
| 字段 | 说明 |
|---|---|
| amap_poi_id | 高德 POI ID可作为外部数据来源唯一标识 |
| amap_name | 高德返回名称 |
| amap_type / amap_typecode | 高德行业分类与分类编码 |
| province / city / district / town / adcode | 高德行政区划字段 |
| formatted_address | 高德地址 |
| geo_lng / geo_lat | 高德 GCJ-02 坐标,经度/纬度 |
| tel | 高德电话字段,多个号码按高德原格式保留 |
| first_image_url | 高德照片第一张 URL |
| all_image_urls | 高德照片 URL 列表,使用 `|` 分隔 |
| image_count | 图片数量 |
| amap_url | 高德 URI marker 链接,可用于前端跳转地图 |
| amap_match_status | matched / not_found表示源表记录是否匹配到高德 POI |
| amap_match_score | 本地匹配打分,仅用于质检 |
## 景区附近资源关系字段
| 字段 | 说明 |
|---|---|
| scenic_id / scenic_name | 景区 ID 与名称 |
| resource_type | Hotel 或 Restaurant |
| resource_id / resource_name | 候选资源 ID 与名称 |
| rank_for_scenic | 该景区附近资源排序,从 1 开始 |
| amap_around_distance_m | 高德周边搜索返回距离,仅作参考 |
| drive_distance_m / drive_distance_km | 高德驾车路线距离,推荐排序优先使用 |
| drive_duration_s / drive_duration_min | 高德驾车时间,推荐排序优先使用 |
| drive_status | OK 表示驾车路线成功;其他值表示高德未返回可用路线 |
## 使用建议
- 图谱实体:景区使用 `ScenicAttraction`,酒店/餐饮后续可独立为 `Hotel` / `Restaurant` POI。
- 图谱关系:`ScenicAttraction -[:NEARBY]-> Hotel/Restaurant`,关系属性放 `drive_distance_km`、`drive_duration_min`、`rank_for_scenic`。
- 费用/门票/小交通仍放 `TravelItem`,不要和 Hotel/Restaurant POI 混在同一张表里。
"""
    target.write_text(text, encoding="utf-8")
def write_report(
    scenic_rows: list[dict[str, Any]],
    hotel_rows: list[dict[str, Any]],
    restaurant_rows: list[dict[str, Any]],
    scenic_hotels: list[dict[str, Any]],
    scenic_restaurants: list[dict[str, Any]],
    client: AMapClient,
) -> None:
    """Write the Markdown summary report ``高德补全报告.md`` into ``OUT_DIR``.

    The report lists match counts per table, the number of nearby relations,
    the number of API calls made by ``client``, a per-scenic coverage table
    sorted by scenic name, and up to 80 recorded API errors.
    """

    def count_matched(table: list[dict[str, Any]]) -> int:
        # A row counts when it is flagged as matched or carries an AMap POI id.
        return len(
            [row for row in table if row.get("amap_match_status") == "matched" or row.get("amap_poi_id")]
        )

    # scenic name -> number of hotel / restaurant candidates
    coverage: dict[str, dict[str, int]] = {}
    for relations, bucket in ((scenic_hotels, "hotel"), (scenic_restaurants, "restaurant")):
        for relation in relations:
            counts = coverage.setdefault(relation["scenic_name"], {"hotel": 0, "restaurant": 0})
            counts[bucket] += 1

    lines = [
        "# 高德 POI 补全报告",
        "",
        f"- 景区补全:{count_matched(scenic_rows)}/{len(scenic_rows)}",
        f"- 原酒店表 POI 匹配:{count_matched(hotel_rows)}/{len(hotel_rows)}",
        f"- 原餐饮表 POI 匹配:{count_matched(restaurant_rows)}/{len(restaurant_rows)}",
        f"- 景区附近酒店关系:{len(scenic_hotels)}",
        f"- 景区附近餐饮关系:{len(scenic_restaurants)}",
        f"- 本次高德实际请求数:{client.calls}",
        "",
        "## 每个景区 nearby 覆盖",
        "",
        "| 景区 | 酒店候选 | 餐饮候选 |",
        "|---|---:|---:|",
    ]
    lines.extend(
        f"| {scenic} | {coverage[scenic].get('hotel', 0)} | {coverage[scenic].get('restaurant', 0)} |"
        for scenic in sorted(coverage)
    )
    if client.errors:
        lines += ["", "## API 异常/未返回", ""]
        lines += [f"- {err}" for err in client.errors[:80]]

    report_path = OUT_DIR / "高德补全报告.md"
    report_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def main() -> int:
    """Run the whole enrichment pipeline and return a process exit code.

    Steps: read the API key, enrich the scenic / hotel / restaurant CSVs,
    collect nearby hotels and restaurants for each scenic spot, then write
    the master tables, relation tables, field dictionary and report.  The
    response cache is saved after every stage.

    Returns:
        ``2`` when ``AMAP_API_KEY`` is missing from the env file, else ``0``.
    """
    api_key = read_env_key(ENV_PATH, "AMAP_API_KEY")
    if not api_key:
        print(f"缺少 AMAP_API_KEY: {ENV_PATH}", file=sys.stderr)
        return 2

    OUT_DIR.mkdir(parents=True, exist_ok=True)
    cache = load_cache()
    client = AMapClient(api_key, cache)

    scenic_headers, scenic_source = read_csv(BASE_DIR / "scenic_for_amap.csv")
    hotel_headers, hotel_source = read_csv(BASE_DIR / "hotel_poi.csv")
    restaurant_headers, restaurant_source = read_csv(BASE_DIR / "restaurant_poi.csv")

    # Stage 1: scenic spots.
    scenic_rows = scenic_enrich(client, scenic_source)
    write_csv(OUT_DIR / "scenic_for_amap_enriched.csv", scenic_rows, scenic_headers)
    save_cache(cache)

    # Stage 2: hotels already listed in the source table.
    hotel_rows = enrich_existing_pois(
        client,
        hotel_source,
        name_field="hotel_name",
        id_prefix="HOTEL",
        types=HOTEL_TYPES,
    )
    write_csv(OUT_DIR / "hotel_poi_enriched.csv", hotel_rows, hotel_headers)
    save_cache(cache)

    # Stage 3: restaurants already listed in the source table.
    restaurant_rows = enrich_existing_pois(
        client,
        restaurant_source,
        name_field="restaurant_name",
        id_prefix="RESTAURANT",
        types=RESTAURANT_TYPES,
    )
    write_csv(OUT_DIR / "restaurant_poi_enriched.csv", restaurant_rows, restaurant_headers)
    save_cache(cache)

    # Stage 4: nearby candidates around every scenic spot.
    hotel_master, scenic_hotels = nearby_candidates(
        client, scenic_rows, types=HOTEL_TYPES, kind="hotel"
    )
    restaurant_master, scenic_restaurants = nearby_candidates(
        client, scenic_rows, types=RESTAURANT_TYPES, kind="restaurant"
    )

    # Include existing matched POIs in master output, then de-duplicate by AMap POI ID.
    hotel_master = dedupe_by_poi_id(hotel_rows + hotel_master, "hotel_name")
    restaurant_master = dedupe_by_poi_id(restaurant_rows + restaurant_master, "restaurant_name")

    outputs = (
        ("hotel_poi_amap_master.csv", hotel_master),
        ("restaurant_poi_amap_master.csv", restaurant_master),
        ("scenic_hotel_nearby_10.csv", scenic_hotels),
        ("scenic_restaurant_nearby_10.csv", scenic_restaurants),
        ("scenic_resource_drive_metrics.csv", scenic_hotels + scenic_restaurants),
    )
    for filename, table in outputs:
        write_csv(OUT_DIR / filename, table)

    write_dictionary()
    write_report(scenic_rows, hotel_rows, restaurant_rows, scenic_hotels, scenic_restaurants, client)
    save_cache(cache)
    print(f"done: {OUT_DIR}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())