# bxh/scripts/backfill_baixinghui_scenic_resource_edges.py
# (file listing metadata: 290 lines, 11 KiB, Python)

from __future__ import annotations
import argparse
import hashlib
from datetime import datetime
from typing import Any
from falkordb import FalkorDB
# Default FalkorDB graph name; used as the default of the --graph CLI option.
GRAPH_NAME = "baixinghui_travel_agency"
# Provenance tag written to the `source` property of every scenic node and
# edge this script creates.
SOURCE = "backfill_service_scenic_edges_2026_06_08"
# Region label -> scenic names to link. Looked up with a hotel's cleaned
# `service_scenic` value; labels not listed here yield no links.
HOTEL_REGION_TO_SCENICS: dict[str, list[str]] = {
    "贵阳区域": ["青岩古镇景区", "黔灵山公园景区", "天河潭景区"],
    "黄果树区域": ["黄果树旅游景区", "天星桥景区", "陡坡塘瀑布", "龙宫景区", "平坝樱花景区"],
    "西江千户苗寨区域": ["西江千户苗寨景区"],
    "镇远古镇区域": ["镇远古城景区"],
    "梵净山区域": ["梵净山景区"],
    "织金/荔波区域": ["荔波小七孔景区", "织金洞景区", "中国天眼景区"],
    "毕节区域": ["百里杜鹃景区", "织金洞景区"],
    "开阳/猴耳天坑区域": ["南江大峡谷景区", "猴耳天坑景区"],
    "遵义区域": ["遵义会议会址", "茅台镇", "乌江寨景区", "四渡赤水纪念馆"],
}
# Region label -> scenic names to link, looked up with a restaurant's cleaned
# `service_area` value. Starts from every hotel region and adds the labels
# below. NOTE: "镇远古镇区域" is re-declared here with the same value it
# already inherits from HOTEL_REGION_TO_SCENICS, so that entry is redundant.
RESTAURANT_REGION_TO_SCENICS: dict[str, list[str]] = {
    **HOTEL_REGION_TO_SCENICS,
    "安顺/黄果树区域": ["黄果树旅游景区", "天龙屯堡景区", "龙宫景区", "花江大桥观景片区"],
    "西江苗寨区域": ["西江千户苗寨景区"],
    "镇远古镇区域": ["镇远古城景区"],
    "黔东南区域": ["西江千户苗寨景区", "肇兴侗寨", "荔波小七孔景区", "丹寨万达小镇"],
    "黔西南区域": ["兴义万峰林景区", "峰林布依景区", "马岭河峡谷景区", "晴隆二十四道拐景区"],
}
# Raw `applicable_scenic` value -> scenic names it should expand to.
# The two "中转用餐点" entries reference the very same list objects stored in
# RESTAURANT_REGION_TO_SCENICS (no copy), so these lists must not be mutated.
SCENIC_VALUE_ALIASES: dict[str, list[str]] = {
    "甲秀楼景点": ["甲秀楼景点"],
    "贵阳市区中转用餐点": RESTAURANT_REGION_TO_SCENICS["贵阳区域"],
    "毕节市区中转用餐点": RESTAURANT_REGION_TO_SCENICS["毕节区域"],
}
# Scenic names the script is allowed to create as ScenicAttraction nodes when
# they cannot be resolved against the graph. The value supplies the
# `short_name` and `city` properties of the created node.
CREATE_MISSING_SCENICS = {
    "甲秀楼景点": {"short_name": "甲秀楼", "city": "贵阳"},
    "猴耳天坑景区": {"short_name": "猴耳天坑", "city": "贵阳"},
    "乌江寨景区": {"short_name": "乌江寨", "city": "遵义"},
    "四渡赤水纪念馆": {"short_name": "四渡赤水", "city": "遵义"},
    "晴隆二十四道拐景区": {"short_name": "晴隆二十四道拐", "city": "黔西南"},
    "丹寨万达小镇": {"short_name": "丹寨万达小镇", "city": "黔东南"},
}
def clean(value: Any) -> str:
    """Return ``value`` as a string with surrounding whitespace removed.

    Every falsy input (``None``, ``""``, ``0``, ``False``, empty containers)
    collapses to the empty string.
    """
    if not value:
        return ""
    return str(value).strip()
def stable_id(prefix: str, *parts: str) -> str:
    """Build a deterministic identifier of the form ``<prefix>-<12 hex chars>``.

    The hex part is the first 12 characters (upper-cased) of the MD5 digest of
    the parts joined with ``|`` and encoded as UTF-8, so the same parts always
    give the same id.
    """
    joined = "|".join(parts)
    fingerprint = hashlib.md5(joined.encode("utf-8")).hexdigest()
    return prefix + "-" + fingerprint[:12].upper()
def props(node: Any) -> dict[str, Any]:
    """Return a fresh dict copy of ``node.properties``.

    An object without a ``properties`` attribute, or with a falsy one, yields
    an empty dict.
    """
    properties = getattr(node, "properties", None)
    return dict(properties) if properties else {}
def load_nodes(g: Any, label: str) -> list[dict[str, Any]]:
    """Fetch every node carrying ``label`` and return their property dicts.

    ``label`` is interpolated directly into the query text, so it must be a
    trusted label name. The first column of each result row is passed through
    ``props``.
    """
    result = g.query(f"MATCH (n:{label}) RETURN n")
    nodes: list[dict[str, Any]] = []
    for row in result.result_set:
        nodes.append(props(row[0]))
    return nodes
def scenic_index(scenics: list[dict[str, Any]]) -> dict[str, str]:
    """Build a lookup from every known spelling of a scenic to its full name.

    For each scenic the following keys map to its cleaned ``name``:
    the name itself, the name without a trailing "景区", the name without a
    trailing "景点", and its cleaned ``short_name``.

    Exact names always take precedence: an alias derived from one scenic never
    replaces the entry keyed by another scenic's full name. When two scenics
    share an alias, the later one in ``scenics`` wins. Empty keys are never
    stored, and scenics without a name are skipped because they cannot be
    addressed as a link target.

    Args:
        scenics: Property dicts of ScenicAttraction nodes.

    Returns:
        A new mutable dict mapping spelling -> full scenic name.
    """
    exact: dict[str, str] = {}
    aliases: dict[str, str] = {}
    for scenic in scenics:
        name = clean(scenic.get("name"))
        if not name:
            continue
        exact[name] = name
        short = clean(scenic.get("short_name"))
        for alias in (name.removesuffix("景区"), name.removesuffix("景点"), short):
            # A name that consists only of the suffix would strip to "".
            if alias:
                aliases[alias] = name
    # Merge exact names last so they override any colliding alias.
    return {**aliases, **exact}
def resolve_scenic_names(raw_names: list[str], index: dict[str, str]) -> tuple[list[str], list[str]]:
    """Map raw scenic names onto names usable as link targets.

    Each cleaned, non-empty raw name is tried against ``index`` in this order:
    as-is, without a trailing "景区", without a trailing "景点", and with
    "景区" appended (only when it does not already end in "景区", "景点",
    "纪念馆" or "小镇"). The first spelling present in ``index`` decides the
    outcome.

    Returns:
        ``(resolved, missing)``. ``resolved`` holds indexed names plus names
        listed in ``CREATE_MISSING_SCENICS``; ``missing`` holds everything
        else. Both lists keep first-seen order and contain no duplicates.
    """
    matched: list[str] = []
    unresolved: list[str] = []
    terminal_suffixes = ("景区", "景点", "纪念馆", "小镇")
    for raw in raw_names:
        name = clean(raw)
        if not name:
            continue
        with_suffix = name if name.endswith(terminal_suffixes) else name + "景区"
        spellings = (
            name,
            name.removesuffix("景区"),
            name.removesuffix("景点"),
            with_suffix,
        )
        canonical = ""
        for spelling in spellings:
            if spelling in index:
                # Only the first indexed spelling is consulted.
                canonical = index[spelling]
                break
        if canonical:
            target, bucket = canonical, matched
        elif name in CREATE_MISSING_SCENICS:
            target, bucket = name, matched
        else:
            target, bucket = name, unresolved
        if target not in bucket:
            bucket.append(target)
    return matched, unresolved
def restaurant_target_names(item: dict[str, Any]) -> list[str]:
    """Return the raw scenic names a restaurant should be linked to.

    A non-empty ``applicable_scenic`` wins: it is expanded through
    ``SCENIC_VALUE_ALIASES`` or used as a single-element list. Otherwise the
    cleaned ``service_area`` is looked up in ``RESTAURANT_REGION_TO_SCENICS``,
    giving an empty list for unknown areas.
    """
    applicable = clean(item.get("applicable_scenic"))
    if not applicable:
        area = clean(item.get("service_area"))
        return RESTAURANT_REGION_TO_SCENICS.get(area, [])
    aliased = SCENIC_VALUE_ALIASES.get(applicable)
    return aliased if aliased else [applicable]
def ensure_missing_scenic(g: Any, name: str, dry_run: bool) -> bool:
    """Make sure a whitelisted scenic exists as a ScenicAttraction node.

    Only names listed in ``CREATE_MISSING_SCENICS`` are eligible. The backfill
    properties are applied with ``ON CREATE SET``, i.e. only when the MERGE
    actually creates the node. A scenic that already exists under the same
    name keeps its own ``attraction_id``, ``data_quality``, ``source`` and
    ``updated_at`` instead of being overwritten by anchor values.

    Args:
        g: Graph handle exposing ``query(text, params)``.
        name: Full scenic name.
        dry_run: When true, no query is issued.

    Returns:
        ``False`` if ``name`` is not whitelisted; ``True`` otherwise (the node
        exists after the call, or would be ensured in a real run).
    """
    if name not in CREATE_MISSING_SCENICS:
        return False
    if dry_run:
        return True
    extra = CREATE_MISSING_SCENICS[name]
    payload = {
        "name": name,
        "short_name": extra.get("short_name") or name,
        "city": extra.get("city") or "",
        "province": "贵州省",
        "attraction_id": stable_id("SA-BACKFILL", name),
        "data_quality": "BACKFILL_ANCHOR_FROM_RESOURCE_SERVICE_SCENIC",
        "source": SOURCE,
        "updated_at": datetime.now().isoformat(timespec="seconds"),
    }
    g.query(
        """
        MERGE (a:ScenicAttraction {name:$name})
        ON CREATE SET a += $props
        """,
        {"name": name, "props": payload},
    )
    return True
def relation_exists(g: Any, rel_type: str, scenic_name: str, target_label: str, target_field: str, target_id: str) -> bool:
    """Tell whether a ``rel_type`` edge already links the scenic to the target.

    ``rel_type``, ``target_label`` and ``target_field`` are interpolated into
    the query text and must be trusted identifiers; ``scenic_name`` and
    ``target_id`` are passed as query parameters.

    Returns:
        ``True`` when the counted number of matching edges is non-zero.
    """
    cypher = (
        "\n"
        f"MATCH (a:ScenicAttraction {{name:$scenic_name}})-[r:{rel_type}]->(x:{target_label} {{{target_field}:$target_id}})\n"
        "RETURN count(r)\n"
    )
    params = {"scenic_name": scenic_name, "target_id": target_id}
    result_rows = g.query(cypher, params).result_set
    if not result_rows:
        return False
    return bool(result_rows[0][0])
def create_relation(
    g: Any,
    rel_type: str,
    scenic_name: str,
    target_label: str,
    target_field: str,
    target_id: str,
    target_name: str,
    basis: str,
    dry_run: bool,
) -> str:
    """Link a scenic to a resource node unless the edge is already there.

    The existence check always runs, including in dry-run mode. When the edge
    is absent and this is not a dry run, a ``CREATE`` is issued between the
    nodes matched by scenic name and by ``target_field == target_id``, and the
    edge receives the backfill properties built below.

    Returns:
        ``"exists"`` if the edge was already present, ``"planned"`` for a dry
        run, ``"created"`` after the create query has been issued.
    """
    already_linked = relation_exists(g, rel_type, scenic_name, target_label, target_field, target_id)
    if already_linked:
        return "exists"
    if dry_run:
        return "planned"

    edge_properties = {
        "edge_id": stable_id("EDGE", scenic_name, rel_type, target_id),
        "natural_key": f"{scenic_name}->{rel_type}->{target_id}",
        "recommend_rank": 900,
        "resource_id": target_id,
        "resource_name": target_name,
        "distance_status": "pending_amap_driving",
        "match_method": "service_scenic_field_backfill",
        "usage_note": "按服务景区/适用景区字段补挂,未补高德车程",
        "remark": f"{basis};关系补全用于客服召回,车程、房态、餐标和价格仍需供应商确认",
        "source": SOURCE,
        "updated_at": datetime.now().isoformat(timespec="seconds"),
    }
    cypher = (
        "\n"
        f"MATCH (a:ScenicAttraction {{name:$scenic_name}}), (x:{target_label} {{{target_field}:$target_id}})\n"
        f"CREATE (a)-[r:{rel_type}]->(x)\n"
        "SET r += $props\n"
    )
    params = {"scenic_name": scenic_name, "target_id": target_id, "props": edge_properties}
    g.query(cypher, params)
    return "created"
def counts(g: Any) -> dict[str, int]:
    """Collect node and edge totals for the hotel/restaurant/scenic subgraph.

    Runs one count query per metric, in the order listed, and converts the
    first cell of the first result row to ``int``.
    """
    metric_queries = (
        ("hotels", "MATCH (h:Hotel) RETURN count(h)"),
        ("linked_hotels", "MATCH (:ScenicAttraction)-[:ATTRACTION_NEARBY_HOTEL]->(h:Hotel) RETURN count(DISTINCT h)"),
        ("hotel_edges", "MATCH (:ScenicAttraction)-[r:ATTRACTION_NEARBY_HOTEL]->(:Hotel) RETURN count(r)"),
        ("restaurants", "MATCH (r:Restaurant) RETURN count(r)"),
        ("linked_restaurants", "MATCH (:ScenicAttraction)-[:ATTRACTION_NEARBY_RESTAURANT]->(r:Restaurant) RETURN count(DISTINCT r)"),
        ("restaurant_edges", "MATCH (:ScenicAttraction)-[r:ATTRACTION_NEARBY_RESTAURANT]->(:Restaurant) RETURN count(r)"),
        ("scenics", "MATCH (a:ScenicAttraction) RETURN count(a)"),
    )
    return {metric: int(g.query(cypher).result_set[0][0]) for metric, cypher in metric_queries}
def main() -> None:
    """Backfill scenic -> hotel and scenic -> restaurant edges.

    Hotels are linked through their ``service_scenic`` region label;
    restaurants through ``applicable_scenic``, falling back to the
    ``service_area`` region when the applicable scenic resolves to nothing.
    Whitelisted scenics missing from the graph are created as anchor nodes,
    at most once each and never for scenics the graph already holds.

    CLI options: ``--graph`` (default ``GRAPH_NAME``), ``--host`` (default
    ``localhost``), ``--port`` (default ``6380``), ``--dry-run`` (report what
    would be done without writing).

    Prints a summary dict, followed by up to 80 unresolved scenic targets.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--graph", default=GRAPH_NAME)
    parser.add_argument("--host", default="localhost")
    parser.add_argument("--port", type=int, default=6380)
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    g = FalkorDB(host=args.host, port=args.port).select_graph(args.graph)
    before = counts(g)
    scenics = load_nodes(g, "ScenicAttraction")
    index = scenic_index(scenics)
    # Names that exist in the graph before this run; these must never be
    # treated as "created" or have anchor properties written onto them.
    known_scenic_names = {clean(scenic.get("name")) for scenic in scenics}
    hotels = load_nodes(g, "Hotel")
    restaurants = load_nodes(g, "Restaurant")
    created_scenic_names: set[str] = set()
    stats: dict[str, Any] = {
        "created_scenics": 0,
        "hotel_created": 0,
        "hotel_exists": 0,
        "hotel_planned": 0,
        "restaurant_created": 0,
        "restaurant_exists": 0,
        "restaurant_planned": 0,
        "missing_targets": [],
        "skipped_without_id": [],
    }

    def link(
        kind: str,
        rel_type: str,
        label: str,
        id_field: str,
        resource: dict[str, Any],
        target_names: list[str],
        basis: str,
    ) -> None:
        """Create (or plan) one edge per target scenic for a single resource."""
        if not target_names:
            return
        resource_id = clean(resource.get(id_field))
        if not resource_id:
            # Without an id the MATCH cannot address the node, so no edge
            # would be written; report it instead of counting a bogus result.
            stats["skipped_without_id"].append(f"{label}:{resource.get('name')}")
            return
        for scenic_name in target_names:
            needs_anchor = (
                scenic_name in CREATE_MISSING_SCENICS
                and scenic_name not in known_scenic_names
                and scenic_name not in created_scenic_names
            )
            if needs_anchor and ensure_missing_scenic(g, scenic_name, args.dry_run):
                created_scenic_names.add(scenic_name)
                index[scenic_name] = scenic_name
            status = create_relation(
                g,
                rel_type,
                scenic_name,
                label,
                id_field,
                resource_id,
                clean(resource.get("name")),
                basis,
                args.dry_run,
            )
            stats[f"{kind}_{status}"] += 1

    for hotel in hotels:
        service_scenic = clean(hotel.get("service_scenic"))
        target_names, missing = resolve_scenic_names(HOTEL_REGION_TO_SCENICS.get(service_scenic, []), index)
        stats["missing_targets"].extend(f"Hotel:{hotel.get('name')} -> {name}" for name in missing)
        link(
            "hotel",
            "ATTRACTION_NEARBY_HOTEL",
            "Hotel",
            "hotel_id",
            hotel,
            target_names,
            f"酒店服务景区字段={service_scenic}",
        )

    for restaurant in restaurants:
        applicable = clean(restaurant.get("applicable_scenic"))
        service_area = clean(restaurant.get("service_area"))
        target_names, missing = resolve_scenic_names(restaurant_target_names(restaurant), index)
        # Fall back to the region only when the first attempt was driven by
        # applicable_scenic; otherwise the region list was already tried and
        # resolving it again would merely duplicate the missing entries.
        if not target_names and applicable and service_area:
            target_names, extra_missing = resolve_scenic_names(
                RESTAURANT_REGION_TO_SCENICS.get(service_area, []),
                index,
            )
            missing.extend(name for name in extra_missing if name not in missing)
        stats["missing_targets"].extend(f"Restaurant:{restaurant.get('name')} -> {name}" for name in missing)
        link(
            "restaurant",
            "ATTRACTION_NEARBY_RESTAURANT",
            "Restaurant",
            "restaurant_id",
            restaurant,
            target_names,
            f"餐饮适用景区={applicable or '-'};服务区域={service_area or '-'}",
        )

    stats["created_scenics"] = len(created_scenic_names)
    # A dry run writes nothing, so the "after" snapshot equals "before".
    after = counts(g) if not args.dry_run else before
    print({"dry_run": args.dry_run, "graph": args.graph, "before": before, "after": after, "stats": stats})
    if stats["missing_targets"]:
        print("missing_targets")
        for item in stats["missing_targets"][:80]:
            print("-", item)
# Run the backfill only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()