353 lines
13 KiB
Python
353 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""Fetch AMap driving distance metrics for travel_graph route/resource pairs.
|
|
|
|
The graph stores only cooperative business resources. This script does not add
|
|
online hotels or restaurants; it enriches existing attraction-to-attraction and
|
|
same-region attraction-to-resource pairs with driving distance/duration.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import importlib.util
|
|
import json
|
|
import math
|
|
import os
|
|
import re
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import requests
|
|
import urllib3
|
|
|
|
from common_paths import GAODE_CRAWLER_PATH, PROJECT_ROOT, TRAVEL_KG_EXPORT_ROOT
|
|
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
BUILD_SCRIPT = PROJECT_ROOT / "scripts/build_travel_graph_existing_product_project.py"
|
|
OUT_DIR = TRAVEL_KG_EXPORT_ROOT / "travel_graph_旅行社线路制定"
|
|
NODES_PATH = OUT_DIR / "抽取结果_nodes.json"
|
|
CACHE_PATH = OUT_DIR / "amap_driving_distance_cache.json"
|
|
REPORT_CSV = OUT_DIR / "amap_driving_distance_report.csv"
|
|
|
|
AMAP_DISTANCE_URL = "https://restapi.amap.com/v3/distance"
|
|
MAX_ORIGINS_PER_REQUEST = 80
|
|
|
|
|
|
def load_build_module():
|
|
spec = importlib.util.spec_from_file_location("travel_build", BUILD_SCRIPT)
|
|
mod = importlib.util.module_from_spec(spec)
|
|
assert spec.loader
|
|
spec.loader.exec_module(mod)
|
|
return mod
|
|
|
|
|
|
b = load_build_module()
|
|
|
|
|
|
def load_key() -> str:
|
|
for key in (os.environ.get("AMAP_WEB_KEY"), os.environ.get("AMAP_KEY")):
|
|
if key:
|
|
return key
|
|
crawl_path = GAODE_CRAWLER_PATH
|
|
if crawl_path.exists():
|
|
spec = importlib.util.spec_from_file_location("crawl_guiyan", crawl_path)
|
|
mod = importlib.util.module_from_spec(spec)
|
|
assert spec.loader
|
|
spec.loader.exec_module(mod)
|
|
key = (getattr(mod, "CONFIG", {}) or {}).get("key")
|
|
if key:
|
|
return key
|
|
raise RuntimeError("未找到可用的高德 Web 服务 Key。请配置 AMAP_WEB_KEY 或保留 crawl_guiyan.py 中的 CONFIG['key']。")
|
|
|
|
|
|
def clean(value: Any) -> str:
|
|
return b.compact(value)
|
|
|
|
|
|
def number(value: Any) -> float | None:
|
|
return b.number(value)
|
|
|
|
|
|
def node_location(node: dict[str, Any]) -> tuple[float, float] | None:
|
|
lng = number(node.get("location_lng") or node.get("amap_lng"))
|
|
lat = number(node.get("location_lat") or node.get("amap_lat"))
|
|
if lng is None or lat is None:
|
|
return None
|
|
return lng, lat
|
|
|
|
|
|
def location_text(loc: tuple[float, float]) -> str:
|
|
return f"{loc[0]:.6f},{loc[1]:.6f}"
|
|
|
|
|
|
def chunked(items: list[Any], size: int) -> list[list[Any]]:
|
|
return [items[idx: idx + size] for idx in range(0, len(items), size)]
|
|
|
|
|
|
def pair_key(source_key: str, target_key: str) -> str:
|
|
return f"{source_key}=>{target_key}"
|
|
|
|
|
|
def resource_type(label: str) -> str:
|
|
return {
|
|
"ScenicArea": "scenic_area",
|
|
"ScenicAttraction": "attraction",
|
|
"HotelResource": "hotel",
|
|
"RestaurantResource": "restaurant",
|
|
}.get(label, "resource")
|
|
|
|
|
|
def same_admin_region(source: dict[str, Any], target: dict[str, Any]) -> str:
|
|
source_region = clean(source.get("admin_region_name"))
|
|
target_region = clean(target.get("admin_region_name"))
|
|
if source_region and target_region and source_region == target_region:
|
|
return "same_admin_region"
|
|
return ""
|
|
|
|
|
|
def straight_distance_km(source: dict[str, Any], target: dict[str, Any]) -> float | None:
|
|
src = node_location(source)
|
|
dst = node_location(target)
|
|
if not src or not dst:
|
|
return None
|
|
return round(b.haversine_km(src[0], src[1], dst[0], dst[1]), 2)
|
|
|
|
|
|
def nearby_match(source: dict[str, Any], target: dict[str, Any]) -> str:
|
|
distance = straight_distance_km(source, target)
|
|
if distance is None:
|
|
return ""
|
|
threshold = 35.0 if target.get("label") == "HotelResource" else 15.0
|
|
region_match = same_admin_region(source, target)
|
|
if region_match:
|
|
return region_match
|
|
source_city = clean(source.get("amap_cityname") or source.get("city") or source.get("city_or_area"))
|
|
target_city = clean(target.get("amap_cityname") or target.get("city") or target.get("city_or_area"))
|
|
same_city = source_city and target_city and (source_city in target_city or target_city in source_city)
|
|
if distance <= threshold:
|
|
return "nearby_distance"
|
|
if same_city and distance <= threshold * 1.6:
|
|
return "same_city_nearby_distance"
|
|
return ""
|
|
|
|
|
|
def load_existing_cache() -> dict[str, dict[str, Any]]:
|
|
if not CACHE_PATH.exists():
|
|
return {}
|
|
try:
|
|
payload = json.loads(CACHE_PATH.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
if isinstance(payload, dict) and isinstance(payload.get("items"), dict):
|
|
return payload["items"]
|
|
return payload if isinstance(payload, dict) else {}
|
|
|
|
|
|
def build_pairs(nodes: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
|
route_anchors = [
|
|
n for n in nodes
|
|
if node_location(n)
|
|
and (
|
|
n.get("label") == "ScenicArea"
|
|
or (
|
|
n.get("label") == "ScenicAttraction"
|
|
and n.get("is_independent_destination") is not False
|
|
and not clean(n.get("parent_scenic_area_name"))
|
|
)
|
|
)
|
|
]
|
|
hotels = [n for n in nodes if n.get("label") == "HotelResource" and node_location(n)]
|
|
restaurants = [n for n in nodes if n.get("label") == "RestaurantResource" and node_location(n)]
|
|
pairs: list[dict[str, Any]] = []
|
|
seen: set[str] = set()
|
|
|
|
def add_pair(source: dict[str, Any], target: dict[str, Any], scope: str, region_match: str) -> None:
|
|
key = pair_key(source["natural_key"], target["natural_key"])
|
|
if key in seen:
|
|
return
|
|
src_loc = node_location(source)
|
|
dst_loc = node_location(target)
|
|
if not src_loc or not dst_loc:
|
|
return
|
|
seen.add(key)
|
|
pairs.append({
|
|
"key": key,
|
|
"source_key": source["natural_key"],
|
|
"target_key": target["natural_key"],
|
|
"source_name": source.get("name"),
|
|
"target_name": target.get("name"),
|
|
"source_label": source.get("label"),
|
|
"target_label": target.get("label"),
|
|
"target_resource_type": resource_type(target.get("label", "")),
|
|
"source_region": source.get("admin_region_name"),
|
|
"target_region": target.get("admin_region_name"),
|
|
"metric_scope": scope,
|
|
"region_match_level": region_match,
|
|
"same_admin_region": bool(region_match),
|
|
"origin_location": location_text(src_loc),
|
|
"destination_location": location_text(dst_loc),
|
|
"straight_distance_km": straight_distance_km(source, target),
|
|
})
|
|
|
|
for source in route_anchors:
|
|
for target in route_anchors:
|
|
if source["natural_key"] == target["natural_key"]:
|
|
continue
|
|
add_pair(source, target, "route_anchor_to_route_anchor", same_admin_region(source, target) or "cross_region_route_metric")
|
|
for target in hotels:
|
|
region_match = nearby_match(source, target)
|
|
if region_match:
|
|
scope = "scenic_anchor_to_same_region_hotel" if region_match == "same_admin_region" else "scenic_anchor_to_nearby_hotel"
|
|
add_pair(source, target, scope, region_match)
|
|
for target in restaurants:
|
|
region_match = nearby_match(source, target)
|
|
if region_match:
|
|
scope = "scenic_anchor_to_same_region_restaurant" if region_match == "same_admin_region" else "scenic_anchor_to_nearby_restaurant"
|
|
add_pair(source, target, scope, region_match)
|
|
return pairs
|
|
|
|
|
|
def cache_matches_pair(cached: dict[str, Any], pair: dict[str, Any]) -> bool:
|
|
"""Reuse cached AMap metrics only when the current semantic scope is identical."""
|
|
if cached.get("status") != "matched":
|
|
return False
|
|
for field in [
|
|
"source_key",
|
|
"target_key",
|
|
"source_label",
|
|
"target_label",
|
|
"metric_scope",
|
|
"origin_location",
|
|
"destination_location",
|
|
]:
|
|
if cached.get(field) != pair.get(field):
|
|
return False
|
|
return True
|
|
|
|
|
|
def query_distance_batch(key: str, destination: str, origin_rows: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
|
|
params = {
|
|
"key": key,
|
|
"origins": "|".join(row["origin_location"] for row in origin_rows),
|
|
"destination": destination,
|
|
"type": "1",
|
|
"output": "json",
|
|
}
|
|
response = requests.get(AMAP_DISTANCE_URL, params=params, timeout=(3, 12), verify=False)
|
|
response.raise_for_status()
|
|
payload = response.json()
|
|
if payload.get("status") != "1":
|
|
raise RuntimeError(f"{payload.get('infocode')} {payload.get('info')}")
|
|
results = payload.get("results") or []
|
|
by_pair: dict[str, dict[str, Any]] = {}
|
|
for result in results:
|
|
raw_idx = clean(result.get("origin_id"))
|
|
try:
|
|
idx = int(raw_idx) - 1
|
|
except Exception:
|
|
continue
|
|
if idx < 0 or idx >= len(origin_rows):
|
|
continue
|
|
row = origin_rows[idx]
|
|
distance_m = number(result.get("distance"))
|
|
duration_s = number(result.get("duration"))
|
|
if distance_m is None or duration_s is None:
|
|
continue
|
|
by_pair[row["key"]] = {
|
|
**row,
|
|
"status": "matched",
|
|
"amap_distance_m": int(distance_m),
|
|
"amap_duration_s": int(duration_s),
|
|
"drive_distance_km": round(distance_m / 1000, 2),
|
|
"drive_duration_min": int(math.ceil(duration_s / 60)),
|
|
"provider": "amap",
|
|
"api": "v3/distance",
|
|
"route_type": "driving",
|
|
"updated_at": datetime.now().isoformat(timespec="seconds"),
|
|
}
|
|
return by_pair
|
|
|
|
|
|
def main() -> None:
|
|
if not NODES_PATH.exists():
|
|
raise RuntimeError(f"缺少节点文件:{NODES_PATH},请先构建一次 travel_graph。")
|
|
OUT_DIR.mkdir(parents=True, exist_ok=True)
|
|
amap_key = load_key()
|
|
nodes = json.loads(NODES_PATH.read_text(encoding="utf-8"))
|
|
pairs = build_pairs(nodes)
|
|
existing_cache = load_existing_cache()
|
|
cache: dict[str, dict[str, Any]] = {}
|
|
missing: list[dict[str, Any]] = []
|
|
for row in pairs:
|
|
cached = existing_cache.get(row["key"])
|
|
if isinstance(cached, dict) and cache_matches_pair(cached, row):
|
|
cache[row["key"]] = cached
|
|
else:
|
|
missing.append(row)
|
|
print(json.dumps({
|
|
"candidate_pairs": len(pairs),
|
|
"cached_matched": len(pairs) - len(missing),
|
|
"missing": len(missing),
|
|
}, ensure_ascii=False), flush=True)
|
|
|
|
by_destination: dict[str, list[dict[str, Any]]] = {}
|
|
for row in missing:
|
|
by_destination.setdefault(row["destination_location"], []).append(row)
|
|
|
|
processed = 0
|
|
for destination, rows in by_destination.items():
|
|
for batch in chunked(rows, MAX_ORIGINS_PER_REQUEST):
|
|
try:
|
|
cache.update(query_distance_batch(amap_key, destination, batch))
|
|
except Exception as exc:
|
|
now = datetime.now().isoformat(timespec="seconds")
|
|
for row in batch:
|
|
cache[row["key"]] = {
|
|
**row,
|
|
"status": "api_error",
|
|
"error": str(exc)[:180],
|
|
"provider": "amap",
|
|
"api": "v3/distance",
|
|
"route_type": "driving",
|
|
"updated_at": now,
|
|
}
|
|
processed += len(batch)
|
|
if processed % 200 == 0 or processed == len(missing):
|
|
print(f"processed {processed}/{len(missing)}", flush=True)
|
|
time.sleep(0.10)
|
|
|
|
rows = [cache[row["key"]] for row in pairs if row["key"] in cache]
|
|
CACHE_PATH.write_text(json.dumps({
|
|
"generated_at": datetime.now().isoformat(timespec="seconds"),
|
|
"source": "amap_v3_distance_driving",
|
|
"note": "仅对业务资料库已有资源补充驾车距离/耗时;不新增线上随机商家。",
|
|
"items": cache,
|
|
}, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
fieldnames = [
|
|
"status", "metric_scope", "source_label", "source_name", "source_region",
|
|
"target_label", "target_name", "target_region", "region_match_level",
|
|
"straight_distance_km", "drive_distance_km", "drive_duration_min",
|
|
"amap_distance_m", "amap_duration_s", "origin_location", "destination_location",
|
|
"error",
|
|
]
|
|
with REPORT_CSV.open("w", newline="", encoding="utf-8-sig") as fh:
|
|
writer = csv.DictWriter(fh, fieldnames=fieldnames, extrasaction="ignore")
|
|
writer.writeheader()
|
|
writer.writerows(rows)
|
|
|
|
matched = sum(1 for row in rows if row.get("status") == "matched")
|
|
print(json.dumps({
|
|
"pairs": len(pairs),
|
|
"matched": matched,
|
|
"failed": len(rows) - matched,
|
|
"cache": str(CACHE_PATH),
|
|
"report": str(REPORT_CSV),
|
|
}, ensure_ascii=False, indent=2))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|