#!/usr/bin/env python3 """Fetch focused AMap POI fields for the travel_graph location resources. This does not add arbitrary online hotels/restaurants into the sellable resource library. It only enriches resources that already exist in the business files with AMap id/address/region/coordinate/photo fields when a high-confidence match is available. """ from __future__ import annotations import csv import importlib.util import json import os import re import time from datetime import datetime from pathlib import Path from typing import Any import requests import urllib3 from common_paths import GAODE_CRAWLER_PATH, PROJECT_ROOT, TRAVEL_KG_EXPORT_ROOT urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) BUILD_SCRIPT = PROJECT_ROOT / "scripts/build_travel_graph_existing_product_project.py" OUT_DIR = TRAVEL_KG_EXPORT_ROOT / "travel_graph_旅行社线路制定" NODES_PATH = OUT_DIR / "抽取结果_nodes.json" CACHE_PATH = OUT_DIR / "amap_poi_enrichment_cache.json" REPORT_CSV = OUT_DIR / "amap_poi_enrichment_report.csv" AMAP_TEXT_URL = "https://restapi.amap.com/v3/place/text" def load_build_module(): spec = importlib.util.spec_from_file_location("travel_build", BUILD_SCRIPT) mod = importlib.util.module_from_spec(spec) assert spec.loader spec.loader.exec_module(mod) return mod b = load_build_module() def load_key() -> str: for key in (os.environ.get("AMAP_WEB_KEY"), os.environ.get("AMAP_KEY")): if key: return key crawl_path = GAODE_CRAWLER_PATH if crawl_path.exists(): spec = importlib.util.spec_from_file_location("crawl_guiyan", crawl_path) mod = importlib.util.module_from_spec(spec) assert spec.loader spec.loader.exec_module(mod) key = (getattr(mod, "CONFIG", {}) or {}).get("key") if key: return key raise RuntimeError("未找到可用的高德 Web 服务 Key。请配置 AMAP_WEB_KEY 或保留 crawl_guiyan.py 中的 CONFIG['key']。") def clean(value: Any) -> str: return b.compact(value) def norm(value: Any) -> str: return b.norm(value) def as_list(value: Any) -> list[str]: if isinstance(value, list): return [clean(x) for x in value if clean(x)] if isinstance(value, str): try: decoded = json.loads(value) if isinstance(decoded, list): return [clean(x) for x in decoded if clean(x)] except Exception: pass return [x for x in re.split(r"[、,,;/;\n]+", value) if clean(x)] return [] def expected_type_prefix(label: str) -> str: return { "ScenicAttraction": "110", "HotelResource": "100", "RestaurantResource": "050", }.get(label, "") def query_pois(key: str, keyword: str) -> list[dict[str, Any]]: params = { "key": key, "keywords": keyword, "city": "贵州", "citylimit": "false", "output": "json", "extensions": "all", "offset": 12, "page": 1, } response = requests.get(AMAP_TEXT_URL, params=params, timeout=(3, 8), verify=False) response.raise_for_status() payload = response.json() if payload.get("status") != "1": return [] return payload.get("pois") or [] def region_hint_for_node(node: dict[str, Any]) -> tuple[str, str, str] | None: if node.get("label") == "ScenicAttraction": hint = b.ATTRACTION_ADMIN_HINTS.get(clean(node.get("name"))) if hint: return hint return b.region_hint_for_text(" ".join([ clean(node.get("name")), clean(node.get("city_or_area")), clean(node.get("address")), clean(node.get("city")), ])) def name_score(node: dict[str, Any], poi: dict[str, Any]) -> int: poi_name = norm(poi.get("name")) names = [clean(node.get("name")), *as_list(node.get("aliases"))] best = 0 for name in names: n = norm(name) if not n or not poi_name: continue if n == poi_name: best = max(best, 82) elif n in poi_name or poi_name in n: best = max(best, 64) else: common = len(set(re.findall(r"[\u4e00-\u9fff]{2,}", n)) & set(re.findall(r"[\u4e00-\u9fff]{2,}", poi_name))) best = max(best, min(45, common * 10)) return best def score_poi(node: dict[str, Any], poi: dict[str, Any]) -> tuple[int, str]: score = name_score(node, poi) reasons: list[str] = [f"name={score}"] prefix = expected_type_prefix(node.get("label", "")) typecode = clean(poi.get("typecode")) if prefix and typecode.startswith(prefix): score += 18 reasons.append("type=match") elif prefix: score -= 18 reasons.append(f"type=mismatch:{typecode}") hint = region_hint_for_node(node) if hint: _, city, county = hint cityname = clean(poi.get("cityname")) adname = clean(poi.get("adname")) if county and county == adname: score += 12 reasons.append("county=match") elif city and city == cityname: score += 8 reasons.append("city=match") elif county and adname and county != adname: score -= 12 reasons.append(f"county=conflict:{adname}") if poi.get("location"): score += 4 reasons.append("geo=yes") if poi.get("photos"): score += 2 reasons.append("photo=yes") return score, ";".join(reasons) def has_hard_region_conflict(node: dict[str, Any], poi: dict[str, Any]) -> bool: hint = region_hint_for_node(node) if not hint: return False _, city, _county = hint cityname = clean(poi.get("cityname")) # City/province level mismatch is a hard reject. County names can differ # for scenic management zones, e.g. 百里杜鹃管理区 vs 黔西市, so those stay soft. return bool(city and cityname and city != cityname) def best_match(node: dict[str, Any], pois: list[dict[str, Any]]) -> tuple[dict[str, Any] | None, int, str]: filtered = [poi for poi in pois if not has_hard_region_conflict(node, poi)] scored = [(score_poi(node, poi), poi) for poi in filtered] scored.sort(key=lambda item: item[0][0], reverse=True) if not scored: return None, 0, "no_result_or_region_conflict" (score, reason), poi = scored[0] threshold = 54 if node.get("label") == "ScenicAttraction" else 70 if score < threshold: return None, score, reason return poi, score, reason def fields_from_poi(poi: dict[str, Any], score: int, reason: str) -> dict[str, Any]: lng = lat = "" if poi.get("location") and "," in poi["location"]: lng, lat = poi["location"].split(",", 1) photos = [p.get("url") for p in (poi.get("photos") or []) if p.get("url")] return { "amap_match_status": "matched", "amap_match_confidence": round(score / 100, 2), "amap_match_reason": reason, "amap_poi_id": poi.get("id"), "amap_name": poi.get("name"), "amap_type": poi.get("type"), "amap_typecode": poi.get("typecode"), "amap_address": poi.get("address"), "amap_location": poi.get("location"), "amap_lng": float(lng) if lng else "", "amap_lat": float(lat) if lat else "", "amap_pname": poi.get("pname"), "amap_cityname": poi.get("cityname"), "amap_adname": poi.get("adname"), "amap_adcode": poi.get("adcode"), "amap_tel": clean(poi.get("tel")), "amap_rating": clean((poi.get("biz_ext") or {}).get("rating")), "amap_cost": clean((poi.get("biz_ext") or {}).get("cost")), "amap_open_time": clean((poi.get("biz_ext") or {}).get("open_time")), "amap_photo_urls": photos, "data_completeness_note": "高德POI补全:行政区、地址、坐标、电话/评分/人均/营业时间按接口可得字段写入。", } def keyword_for_node(node: dict[str, Any]) -> str: name = clean(node.get("name")) if node.get("label") == "ScenicAttraction": hint = region_hint_for_node(node) if hint: _province, city, county = hint return " ".join(part for part in [city, county, name] if clean(part)) return name region = clean(node.get("city_or_area")) region = re.sub(r"区域$", "", region) return f"{region} {name}".strip() def main() -> None: if not NODES_PATH.exists(): raise RuntimeError(f"缺少节点文件:{NODES_PATH},请先构建一次 travel_graph。") OUT_DIR.mkdir(parents=True, exist_ok=True) key = load_key() nodes = json.loads(NODES_PATH.read_text(encoding="utf-8")) targets = [n for n in nodes if n.get("label") in {"ScenicAttraction", "HotelResource", "RestaurantResource"}] print(f"targets {len(targets)}", flush=True) items: dict[str, dict[str, Any]] = {} report_rows: list[dict[str, Any]] = [] for idx, node in enumerate(targets, start=1): keyword = keyword_for_node(node) print(f"[{idx}/{len(targets)}] {node.get('label')} {node.get('name')} -> {keyword}", flush=True) try: pois = query_pois(key, keyword) poi, score, reason = best_match(node, pois) except Exception as exc: poi, score, reason = None, 0, f"api_error:{str(exc)[:120]}" if poi: fields = fields_from_poi(poi, score, reason) status = "matched" else: fields = { "amap_match_status": "unmatched", "amap_match_confidence": round(score / 100, 2) if score else 0, "amap_match_reason": reason, "data_completeness_note": "高德POI未高置信匹配,暂不写入坐标,避免误把同名/异地资源挂错。", } status = "unmatched" items[node["natural_key"]] = { "label": node.get("label"), "name": node.get("name"), "query": keyword, "status": status, "fields": fields, } report_rows.append({ "label": node.get("label"), "name": node.get("name"), "query": keyword, "status": status, "score": fields.get("amap_match_confidence"), "amap_name": fields.get("amap_name", ""), "city": fields.get("amap_cityname", ""), "district": fields.get("amap_adname", ""), "reason": fields.get("amap_match_reason", ""), }) if idx % 25 == 0: print(f"processed {idx}/{len(targets)}", flush=True) time.sleep(0.12) CACHE_PATH.write_text(json.dumps({ "generated_at": datetime.now().isoformat(timespec="seconds"), "source": "amap_place_text", "note": "只补全业务文件已有资源;不新增线上随机酒店/餐厅为可售资源。", "items": items, }, ensure_ascii=False, indent=2), encoding="utf-8") with REPORT_CSV.open("w", newline="", encoding="utf-8-sig") as fh: writer = csv.DictWriter(fh, fieldnames=list(report_rows[0].keys())) writer.writeheader() writer.writerows(report_rows) matched = sum(1 for row in report_rows if row["status"] == "matched") print(json.dumps({"targets": len(targets), "matched": matched, "unmatched": len(targets) - matched, "cache": str(CACHE_PATH)}, ensure_ascii=False, indent=2)) if __name__ == "__main__": main()