from __future__ import annotations import argparse import hashlib from datetime import datetime from typing import Any from falkordb import FalkorDB GRAPH_NAME = "baixinghui_travel_agency" SOURCE = "backfill_service_scenic_edges_2026_06_08" HOTEL_REGION_TO_SCENICS: dict[str, list[str]] = { "贵阳区域": ["青岩古镇景区", "黔灵山公园景区", "天河潭景区"], "黄果树区域": ["黄果树旅游景区", "天星桥景区", "陡坡塘瀑布", "龙宫景区", "平坝樱花景区"], "西江千户苗寨区域": ["西江千户苗寨景区"], "镇远古镇区域": ["镇远古城景区"], "梵净山区域": ["梵净山景区"], "织金/荔波区域": ["荔波小七孔景区", "织金洞景区", "中国天眼景区"], "毕节区域": ["百里杜鹃景区", "织金洞景区"], "开阳/猴耳天坑区域": ["南江大峡谷景区", "猴耳天坑景区"], "遵义区域": ["遵义会议会址", "茅台镇", "乌江寨景区", "四渡赤水纪念馆"], } RESTAURANT_REGION_TO_SCENICS: dict[str, list[str]] = { **HOTEL_REGION_TO_SCENICS, "安顺/黄果树区域": ["黄果树旅游景区", "天龙屯堡景区", "龙宫景区", "花江大桥观景片区"], "西江苗寨区域": ["西江千户苗寨景区"], "镇远古镇区域": ["镇远古城景区"], "黔东南区域": ["西江千户苗寨景区", "肇兴侗寨", "荔波小七孔景区", "丹寨万达小镇"], "黔西南区域": ["兴义万峰林景区", "峰林布依景区", "马岭河峡谷景区", "晴隆二十四道拐景区"], } SCENIC_VALUE_ALIASES: dict[str, list[str]] = { "甲秀楼景点": ["甲秀楼景点"], "贵阳市区中转用餐点": RESTAURANT_REGION_TO_SCENICS["贵阳区域"], "毕节市区中转用餐点": RESTAURANT_REGION_TO_SCENICS["毕节区域"], } CREATE_MISSING_SCENICS = { "甲秀楼景点": {"short_name": "甲秀楼", "city": "贵阳"}, "猴耳天坑景区": {"short_name": "猴耳天坑", "city": "贵阳"}, "乌江寨景区": {"short_name": "乌江寨", "city": "遵义"}, "四渡赤水纪念馆": {"short_name": "四渡赤水", "city": "遵义"}, "晴隆二十四道拐景区": {"short_name": "晴隆二十四道拐", "city": "黔西南"}, "丹寨万达小镇": {"short_name": "丹寨万达小镇", "city": "黔东南"}, } def clean(value: Any) -> str: return str(value or "").strip() def stable_id(prefix: str, *parts: str) -> str: digest = hashlib.md5("|".join(parts).encode("utf-8")).hexdigest()[:12].upper() return f"{prefix}-{digest}" def props(node: Any) -> dict[str, Any]: return dict(getattr(node, "properties", None) or {}) def load_nodes(g: Any, label: str) -> list[dict[str, Any]]: rows = g.query(f"MATCH (n:{label}) RETURN n").result_set return [props(row[0]) for row in rows] def scenic_index(scenics: list[dict[str, Any]]) -> dict[str, str]: index: dict[str, str] = {} for scenic in scenics: name = clean(scenic.get("name")) short = clean(scenic.get("short_name")) if name: index[name] = name index[name.removesuffix("景区")] = name index[name.removesuffix("景点")] = name if short: index[short] = name return index def resolve_scenic_names(raw_names: list[str], index: dict[str, str]) -> tuple[list[str], list[str]]: resolved: list[str] = [] missing: list[str] = [] for raw in raw_names: name = clean(raw) if not name: continue candidates = [ name, name.removesuffix("景区"), name.removesuffix("景点"), name + "景区" if not name.endswith(("景区", "景点", "纪念馆", "小镇")) else name, ] hit = next((index[candidate] for candidate in candidates if candidate in index), "") if hit: if hit not in resolved: resolved.append(hit) elif name in CREATE_MISSING_SCENICS: if name not in resolved: resolved.append(name) elif name not in missing: missing.append(name) return resolved, missing def restaurant_target_names(item: dict[str, Any]) -> list[str]: applicable = clean(item.get("applicable_scenic")) if applicable: return SCENIC_VALUE_ALIASES.get(applicable) or [applicable] return RESTAURANT_REGION_TO_SCENICS.get(clean(item.get("service_area")), []) def ensure_missing_scenic(g: Any, name: str, dry_run: bool) -> bool: if name not in CREATE_MISSING_SCENICS: return False if dry_run: return True extra = CREATE_MISSING_SCENICS[name] payload = { "name": name, "short_name": extra.get("short_name") or name, "city": extra.get("city") or "", "province": "贵州省", "attraction_id": stable_id("SA-BACKFILL", name), "data_quality": "BACKFILL_ANCHOR_FROM_RESOURCE_SERVICE_SCENIC", "source": SOURCE, "updated_at": datetime.now().isoformat(timespec="seconds"), } g.query( """ MERGE (a:ScenicAttraction {name:$name}) SET a += $props """, {"name": name, "props": payload}, ) return True def relation_exists(g: Any, rel_type: str, scenic_name: str, target_label: str, target_field: str, target_id: str) -> bool: rows = g.query( f""" MATCH (a:ScenicAttraction {{name:$scenic_name}})-[r:{rel_type}]->(x:{target_label} {{{target_field}:$target_id}}) RETURN count(r) """, {"scenic_name": scenic_name, "target_id": target_id}, ).result_set return bool(rows and rows[0][0]) def create_relation( g: Any, rel_type: str, scenic_name: str, target_label: str, target_field: str, target_id: str, target_name: str, basis: str, dry_run: bool, ) -> str: if relation_exists(g, rel_type, scenic_name, target_label, target_field, target_id): return "exists" if dry_run: return "planned" relation_id = stable_id("EDGE", scenic_name, rel_type, target_id) payload = { "edge_id": relation_id, "natural_key": f"{scenic_name}->{rel_type}->{target_id}", "recommend_rank": 900, "resource_id": target_id, "resource_name": target_name, "distance_status": "pending_amap_driving", "match_method": "service_scenic_field_backfill", "usage_note": "按服务景区/适用景区字段补挂,未补高德车程", "remark": f"{basis};关系补全用于客服召回,车程、房态、餐标和价格仍需供应商确认", "source": SOURCE, "updated_at": datetime.now().isoformat(timespec="seconds"), } g.query( f""" MATCH (a:ScenicAttraction {{name:$scenic_name}}), (x:{target_label} {{{target_field}:$target_id}}) CREATE (a)-[r:{rel_type}]->(x) SET r += $props """, {"scenic_name": scenic_name, "target_id": target_id, "props": payload}, ) return "created" def counts(g: Any) -> dict[str, int]: queries = { "hotels": "MATCH (h:Hotel) RETURN count(h)", "linked_hotels": "MATCH (:ScenicAttraction)-[:ATTRACTION_NEARBY_HOTEL]->(h:Hotel) RETURN count(DISTINCT h)", "hotel_edges": "MATCH (:ScenicAttraction)-[r:ATTRACTION_NEARBY_HOTEL]->(:Hotel) RETURN count(r)", "restaurants": "MATCH (r:Restaurant) RETURN count(r)", "linked_restaurants": "MATCH (:ScenicAttraction)-[:ATTRACTION_NEARBY_RESTAURANT]->(r:Restaurant) RETURN count(DISTINCT r)", "restaurant_edges": "MATCH (:ScenicAttraction)-[r:ATTRACTION_NEARBY_RESTAURANT]->(:Restaurant) RETURN count(r)", "scenics": "MATCH (a:ScenicAttraction) RETURN count(a)", } out: dict[str, int] = {} for key, query in queries.items(): out[key] = int(g.query(query).result_set[0][0]) return out def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--graph", default=GRAPH_NAME) parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() g = FalkorDB(host="localhost", port=6380).select_graph(args.graph) before = counts(g) scenics = load_nodes(g, "ScenicAttraction") index = scenic_index(scenics) hotels = load_nodes(g, "Hotel") restaurants = load_nodes(g, "Restaurant") created_scenic_names: set[str] = set() stats = { "created_scenics": 0, "hotel_created": 0, "hotel_exists": 0, "hotel_planned": 0, "restaurant_created": 0, "restaurant_exists": 0, "restaurant_planned": 0, "missing_targets": [], } for hotel in hotels: target_names, missing = resolve_scenic_names(HOTEL_REGION_TO_SCENICS.get(clean(hotel.get("service_scenic")), []), index) stats["missing_targets"].extend(f"Hotel:{hotel.get('name')} -> {name}" for name in missing) for scenic_name in target_names: if scenic_name in CREATE_MISSING_SCENICS and ensure_missing_scenic(g, scenic_name, args.dry_run): created_scenic_names.add(scenic_name) index[scenic_name] = scenic_name status = create_relation( g, "ATTRACTION_NEARBY_HOTEL", scenic_name, "Hotel", "hotel_id", clean(hotel.get("hotel_id")), clean(hotel.get("name")), f"酒店服务景区字段={clean(hotel.get('service_scenic'))}", args.dry_run, ) stats[f"hotel_{status}"] += 1 for restaurant in restaurants: target_names, missing = resolve_scenic_names(restaurant_target_names(restaurant), index) if not target_names and clean(restaurant.get("service_area")): target_names, extra_missing = resolve_scenic_names( RESTAURANT_REGION_TO_SCENICS.get(clean(restaurant.get("service_area")), []), index, ) missing.extend(extra_missing) stats["missing_targets"].extend(f"Restaurant:{restaurant.get('name')} -> {name}" for name in missing) for scenic_name in target_names: if scenic_name in CREATE_MISSING_SCENICS and ensure_missing_scenic(g, scenic_name, args.dry_run): created_scenic_names.add(scenic_name) index[scenic_name] = scenic_name status = create_relation( g, "ATTRACTION_NEARBY_RESTAURANT", scenic_name, "Restaurant", "restaurant_id", clean(restaurant.get("restaurant_id")), clean(restaurant.get("name")), f"餐饮适用景区={clean(restaurant.get('applicable_scenic')) or '-'};服务区域={clean(restaurant.get('service_area')) or '-'}", args.dry_run, ) stats[f"restaurant_{status}"] += 1 stats["created_scenics"] = len(created_scenic_names) after = counts(g) if not args.dry_run else before print({"dry_run": args.dry_run, "graph": args.graph, "before": before, "after": after, "stats": stats}) if stats["missing_targets"]: print("missing_targets") for item in stats["missing_targets"][:80]: print("-", item) if __name__ == "__main__": main()