# File-viewer metadata: 290 lines, 11 KiB, Python.
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
from falkordb import FalkorDB
|
|
|
|
|
|
# Graph selected when --graph is not given on the command line.
GRAPH_NAME = "baixinghui_travel_agency"
# Provenance tag written to the `source` property of nodes and edges this script writes.
SOURCE = "backfill_service_scenic_edges_2026_06_08"

# Hotel `service_scenic` value -> names of the scenics hotels in that region are linked to.
HOTEL_REGION_TO_SCENICS: dict[str, list[str]] = {
    "贵阳区域": ["青岩古镇景区", "黔灵山公园景区", "天河潭景区"],
    "黄果树区域": ["黄果树旅游景区", "天星桥景区", "陡坡塘瀑布", "龙宫景区", "平坝樱花景区"],
    "西江千户苗寨区域": ["西江千户苗寨景区"],
    "镇远古镇区域": ["镇远古城景区"],
    "梵净山区域": ["梵净山景区"],
    "织金/荔波区域": ["荔波小七孔景区", "织金洞景区", "中国天眼景区"],
    "毕节区域": ["百里杜鹃景区", "织金洞景区"],
    "开阳/猴耳天坑区域": ["南江大峡谷景区", "猴耳天坑景区"],
    "遵义区域": ["遵义会议会址", "茅台镇", "乌江寨景区", "四渡赤水纪念馆"],
}

# Restaurant `service_area` value -> scenic names. Every hotel region is
# inherited (sharing the same list objects); the entries below add
# restaurant-only regions. "镇远古镇区域" repeats the inherited value.
RESTAURANT_REGION_TO_SCENICS: dict[str, list[str]] = HOTEL_REGION_TO_SCENICS | {
    "安顺/黄果树区域": ["黄果树旅游景区", "天龙屯堡景区", "龙宫景区", "花江大桥观景片区"],
    "西江苗寨区域": ["西江千户苗寨景区"],
    "镇远古镇区域": ["镇远古城景区"],
    "黔东南区域": ["西江千户苗寨景区", "肇兴侗寨", "荔波小七孔景区", "丹寨万达小镇"],
    "黔西南区域": ["兴义万峰林景区", "峰林布依景区", "马岭河峡谷景区", "晴隆二十四道拐景区"],
}

# Restaurant `applicable_scenic` value -> scenic names it stands for.
SCENIC_VALUE_ALIASES: dict[str, list[str]] = {
    "甲秀楼景点": ["甲秀楼景点"],
    "贵阳市区中转用餐点": RESTAURANT_REGION_TO_SCENICS["贵阳区域"],
    "毕节市区中转用餐点": RESTAURANT_REGION_TO_SCENICS["毕节区域"],
}

# Scenics this script is allowed to create as anchor nodes, with the
# `short_name` and `city` to store on them.
CREATE_MISSING_SCENICS: dict[str, dict[str, str]] = {
    "甲秀楼景点": {"short_name": "甲秀楼", "city": "贵阳"},
    "猴耳天坑景区": {"short_name": "猴耳天坑", "city": "贵阳"},
    "乌江寨景区": {"short_name": "乌江寨", "city": "遵义"},
    "四渡赤水纪念馆": {"short_name": "四渡赤水", "city": "遵义"},
    "晴隆二十四道拐景区": {"short_name": "晴隆二十四道拐", "city": "黔西南"},
    "丹寨万达小镇": {"short_name": "丹寨万达小镇", "city": "黔东南"},
}
|
|
|
|
|
|
def clean(value: Any) -> str:
    """Return ``value`` as text with surrounding whitespace removed.

    Any falsy value (``None``, ``""``, ``0``, empty containers) yields ``""``.
    """
    if not value:
        return ""
    return str(value).strip()
|
|
|
|
|
|
def stable_id(prefix: str, *parts: str) -> str:
    """Derive a deterministic identifier from ``parts``.

    Args:
        prefix: Text placed before the hash, followed by a hyphen.
        *parts: Strings joined with ``"|"`` and hashed.

    Returns:
        ``"<prefix>-<HASH>"`` where ``HASH`` is the first 12 hex characters
        of the MD5 digest of the UTF-8 encoded joined parts, upper-cased.
    """
    joined = "|".join(parts).encode("utf-8")
    # MD5 is only a fingerprint here, not a security primitive; saying so
    # keeps the call working on FIPS-restricted Python builds.
    digest = hashlib.md5(joined, usedforsecurity=False).hexdigest()
    return f"{prefix}-{digest[:12].upper()}"
|
|
|
|
|
|
def props(node: Any) -> dict[str, Any]:
    """Return a fresh dict copy of ``node.properties``.

    An object without a ``properties`` attribute, or with a falsy one,
    yields an empty dict.
    """
    raw = getattr(node, "properties", None)
    if not raw:
        return {}
    return dict(raw)
|
|
|
|
|
|
def load_nodes(g: Any, label: str) -> list[dict[str, Any]]:
    """Fetch every node carrying ``label`` and return their property dicts.

    ``label`` is interpolated directly into the Cypher text, so it must be a
    trusted label name.
    """
    result = g.query(f"MATCH (n:{label}) RETURN n")
    nodes: list[dict[str, Any]] = []
    for record in result.result_set:
        # Each record has a single column: the matched node.
        nodes.append(props(record[0]))
    return nodes
|
|
|
|
|
|
def scenic_index(scenics: list[dict[str, Any]]) -> dict[str, str]:
    """Map every known spelling of a scenic to its full ``name``.

    For each scenic with a non-empty ``name`` the index contains the name
    itself plus these aliases: the name without a trailing "景区", the name
    without a trailing "景点", and its ``short_name``.

    An alias never replaces an entry for some scenic's exact full name, and
    empty aliases are not registered. When two scenics share an alias, the
    one appearing later in ``scenics`` wins.

    Args:
        scenics: Property dicts of scenic nodes (``name`` / ``short_name``).

    Returns:
        Dict from spelling to the full scenic name.
    """
    full_names = [clean(scenic.get("name")) for scenic in scenics]
    index: dict[str, str] = {name: name for name in full_names if name}
    # Snapshot of the exact names, taken before any alias is added.
    exact_names = set(index)

    for scenic, name in zip(scenics, full_names):
        if not name:
            continue
        aliases = (
            name.removesuffix("景区"),
            name.removesuffix("景点"),
            clean(scenic.get("short_name")),
        )
        for alias in aliases:
            # Skip empty aliases (name was exactly the suffix) and aliases
            # that collide with another scenic's real name.
            if alias and alias not in exact_names:
                index[alias] = name
    return index
|
|
|
|
|
|
def resolve_scenic_names(raw_names: list[str], index: dict[str, str]) -> tuple[list[str], list[str]]:
    """Translate raw scenic labels into indexed scenic names.

    Each non-empty label is tried against ``index`` in this order: as given,
    without a trailing "景区", without a trailing "景点", and with "景区"
    appended (only when it does not already end in "景区", "景点", "纪念馆"
    or "小镇"). Labels that do not resolve but are listed in
    ``CREATE_MISSING_SCENICS`` are accepted under their own name.

    Returns:
        ``(resolved, missing)``: both de-duplicated, in first-seen order.
    """
    known_suffixes = ("景区", "景点", "纪念馆", "小镇")
    found: list[str] = []
    unknown: list[str] = []

    for raw_name in raw_names:
        label = clean(raw_name)
        if not label:
            continue

        suffixed = label if label.endswith(known_suffixes) else label + "景区"
        variants = (
            label,
            label.removesuffix("景区"),
            label.removesuffix("景点"),
            suffixed,
        )

        # The first variant present in the index decides the match.
        match = ""
        for variant in variants:
            if variant in index:
                match = index[variant]
                break

        if match:
            target = match
        elif label in CREATE_MISSING_SCENICS:
            target = label
        else:
            if label not in unknown:
                unknown.append(label)
            continue

        if target not in found:
            found.append(target)

    return found, unknown
|
|
|
|
|
|
def restaurant_target_names(item: dict[str, Any]) -> list[str]:
    """Pick the raw scenic names a restaurant should be linked to.

    A non-empty ``applicable_scenic`` wins: it is expanded through
    ``SCENIC_VALUE_ALIASES`` or, if it has no alias entry, used as-is.
    Otherwise the restaurant's ``service_area`` is looked up in
    ``RESTAURANT_REGION_TO_SCENICS`` (empty list when unknown).
    """
    scenic_value = clean(item.get("applicable_scenic"))
    if not scenic_value:
        area = clean(item.get("service_area"))
        return RESTAURANT_REGION_TO_SCENICS.get(area, [])

    aliased = SCENIC_VALUE_ALIASES.get(scenic_value)
    if aliased:
        return aliased
    return [scenic_value]
|
|
|
|
|
|
def ensure_missing_scenic(g: Any, name: str, dry_run: bool) -> bool:
    """Make sure an anchor ``ScenicAttraction`` node exists for ``name``.

    Only names listed in ``CREATE_MISSING_SCENICS`` are eligible. The
    backfill properties are applied only when the node is newly created, so
    a scenic that already exists keeps its own properties.

    Args:
        g: Graph handle exposing ``query(cypher, params)``.
        name: Full scenic name.
        dry_run: When true, nothing is written.

    Returns:
        ``False`` if ``name`` is not in ``CREATE_MISSING_SCENICS``; otherwise
        ``True`` (also in dry-run mode, where no query is sent).
    """
    if name not in CREATE_MISSING_SCENICS:
        return False
    if dry_run:
        return True

    extra = CREATE_MISSING_SCENICS[name]
    payload = {
        "name": name,
        "short_name": extra.get("short_name") or name,
        "city": extra.get("city") or "",
        "province": "贵州省",
        "attraction_id": stable_id("SA-BACKFILL", name),
        "data_quality": "BACKFILL_ANCHOR_FROM_RESOURCE_SERVICE_SCENIC",
        "source": SOURCE,
        "updated_at": datetime.now().isoformat(timespec="seconds"),
    }
    # ON CREATE SET: a pre-existing node with this name must not have its
    # attraction_id / data_quality / source replaced by backfill values.
    g.query(
        """
        MERGE (a:ScenicAttraction {name:$name})
        ON CREATE SET a += $props
        """,
        {"name": name, "props": payload},
    )
    return True
|
|
|
|
|
|
def relation_exists(g: Any, rel_type: str, scenic_name: str, target_label: str, target_field: str, target_id: str) -> bool:
    """Report whether the scenic already has a ``rel_type`` edge to the target.

    ``rel_type``, ``target_label`` and ``target_field`` are interpolated into
    the Cypher text; ``scenic_name`` and ``target_id`` are passed as query
    parameters.

    Returns:
        ``True`` when the count in the first result row is non-zero;
        ``False`` for a zero count or an empty result set.
    """
    cypher = f"""
    MATCH (a:ScenicAttraction {{name:$scenic_name}})-[r:{rel_type}]->(x:{target_label} {{{target_field}:$target_id}})
    RETURN count(r)
    """
    params = {"scenic_name": scenic_name, "target_id": target_id}
    result_rows = g.query(cypher, params).result_set
    if not result_rows:
        return False
    return bool(result_rows[0][0])
|
|
|
|
|
|
def create_relation(
    g: Any,
    rel_type: str,
    scenic_name: str,
    target_label: str,
    target_field: str,
    target_id: str,
    target_name: str,
    basis: str,
    dry_run: bool,
) -> str:
    """Link a scenic to a resource node unless the edge is already there.

    Args:
        g: Graph handle exposing ``query(cypher, params)``.
        rel_type: Relationship type (interpolated into the Cypher text).
        scenic_name: ``name`` of the source ``ScenicAttraction`` node.
        target_label: Label of the target node (interpolated).
        target_field: Property identifying the target node (interpolated).
        target_id: Value of ``target_field`` on the target node.
        target_name: Stored on the edge as ``resource_name``.
        basis: Free text placed at the start of the edge's ``remark``.
        dry_run: When true, nothing is written.

    Returns:
        ``"exists"`` if the edge is already present, ``"planned"`` in dry-run
        mode, otherwise ``"created"``.
    """
    already_linked = relation_exists(g, rel_type, scenic_name, target_label, target_field, target_id)
    if already_linked:
        return "exists"
    if dry_run:
        return "planned"

    edge_props = {
        "edge_id": stable_id("EDGE", scenic_name, rel_type, target_id),
        "natural_key": f"{scenic_name}->{rel_type}->{target_id}",
        "recommend_rank": 900,
        "resource_id": target_id,
        "resource_name": target_name,
        "distance_status": "pending_amap_driving",
        "match_method": "service_scenic_field_backfill",
        "usage_note": "按服务景区/适用景区字段补挂,未补高德车程",
        "remark": f"{basis};关系补全用于客服召回,车程、房态、餐标和价格仍需供应商确认",
        "source": SOURCE,
        "updated_at": datetime.now().isoformat(timespec="seconds"),
    }
    cypher = f"""
    MATCH (a:ScenicAttraction {{name:$scenic_name}}), (x:{target_label} {{{target_field}:$target_id}})
    CREATE (a)-[r:{rel_type}]->(x)
    SET r += $props
    """
    params = {"scenic_name": scenic_name, "target_id": target_id, "props": edge_props}
    # NOTE(review): the query result is not inspected, so "created" is
    # returned even if the MATCH found no scenic/target pair.
    g.query(cypher, params)
    return "created"
|
|
|
|
|
|
def counts(g: Any) -> dict[str, int]:
    """Collect node and edge totals from the graph.

    Runs one count query per metric, in the order listed below, and returns
    the first cell of each result converted to ``int``.
    """
    metric_queries = (
        ("hotels", "MATCH (h:Hotel) RETURN count(h)"),
        ("linked_hotels", "MATCH (:ScenicAttraction)-[:ATTRACTION_NEARBY_HOTEL]->(h:Hotel) RETURN count(DISTINCT h)"),
        ("hotel_edges", "MATCH (:ScenicAttraction)-[r:ATTRACTION_NEARBY_HOTEL]->(:Hotel) RETURN count(r)"),
        ("restaurants", "MATCH (r:Restaurant) RETURN count(r)"),
        ("linked_restaurants", "MATCH (:ScenicAttraction)-[:ATTRACTION_NEARBY_RESTAURANT]->(r:Restaurant) RETURN count(DISTINCT r)"),
        ("restaurant_edges", "MATCH (:ScenicAttraction)-[r:ATTRACTION_NEARBY_RESTAURANT]->(:Restaurant) RETURN count(r)"),
        ("scenics", "MATCH (a:ScenicAttraction) RETURN count(a)"),
    )
    return {
        metric: int(g.query(cypher).result_set[0][0])
        for metric, cypher in metric_queries
    }
|
|
|
|
|
|
def _link_resource(
    g: Any,
    stats: dict[str, Any],
    index: dict[str, str],
    ensured_scenics: set[str],
    *,
    kind: str,
    rel_type: str,
    label: str,
    id_field: str,
    resource: dict[str, Any],
    scenic_names: list[str],
    basis: str,
    dry_run: bool,
) -> None:
    """Create the scenic -> resource edges for one hotel or restaurant and tally the outcomes in ``stats``."""
    if not scenic_names:
        return

    resource_id = clean(resource.get(id_field))
    if not resource_id:
        # Without an id the target node cannot be addressed; report it
        # rather than counting edges that were never created.
        stats["missing_targets"].append(f"{label}:{resource.get('name')} -> <missing {id_field}>")
        return

    resource_name = clean(resource.get("name"))
    for scenic_name in scenic_names:
        # Each configured anchor scenic only needs to be ensured once per run.
        if scenic_name in CREATE_MISSING_SCENICS and scenic_name not in ensured_scenics:
            if ensure_missing_scenic(g, scenic_name, dry_run):
                ensured_scenics.add(scenic_name)
                index[scenic_name] = scenic_name
        status = create_relation(
            g,
            rel_type,
            scenic_name,
            label,
            id_field,
            resource_id,
            resource_name,
            basis,
            dry_run,
        )
        stats[f"{kind}_{status}"] += 1


def main() -> None:
    """Backfill scenic -> hotel and scenic -> restaurant edges, then print a summary.

    Command-line options:
        --graph    Graph name (default ``GRAPH_NAME``).
        --host     FalkorDB host (default ``localhost``).
        --port     FalkorDB port (default ``6380``).
        --dry-run  Report what would be created without writing anything.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--graph", default=GRAPH_NAME)
    parser.add_argument("--host", default="localhost")
    parser.add_argument("--port", type=int, default=6380)
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    g = FalkorDB(host=args.host, port=args.port).select_graph(args.graph)
    before = counts(g)
    scenics = load_nodes(g, "ScenicAttraction")
    index = scenic_index(scenics)
    hotels = load_nodes(g, "Hotel")
    restaurants = load_nodes(g, "Restaurant")

    # Names present before this run; used so that "created_scenics" only
    # counts scenics that were actually absent.
    existing_scenic_names = {clean(scenic.get("name")) for scenic in scenics}
    ensured_scenics: set[str] = set()
    stats: dict[str, Any] = {
        "created_scenics": 0,
        "hotel_created": 0,
        "hotel_exists": 0,
        "hotel_planned": 0,
        "restaurant_created": 0,
        "restaurant_exists": 0,
        "restaurant_planned": 0,
        "missing_targets": [],
    }

    for hotel in hotels:
        region = clean(hotel.get("service_scenic"))
        target_names, missing = resolve_scenic_names(HOTEL_REGION_TO_SCENICS.get(region, []), index)
        stats["missing_targets"].extend(f"Hotel:{hotel.get('name')} -> {name}" for name in missing)
        _link_resource(
            g,
            stats,
            index,
            ensured_scenics,
            kind="hotel",
            rel_type="ATTRACTION_NEARBY_HOTEL",
            label="Hotel",
            id_field="hotel_id",
            resource=hotel,
            scenic_names=target_names,
            basis=f"酒店服务景区字段={region}",
            dry_run=args.dry_run,
        )

    for restaurant in restaurants:
        applicable = clean(restaurant.get("applicable_scenic"))
        service_area = clean(restaurant.get("service_area"))
        target_names, missing = resolve_scenic_names(restaurant_target_names(restaurant), index)
        if not target_names and service_area:
            # Nothing resolved from the primary choice: retry with the
            # restaurant's service area.
            target_names, extra_missing = resolve_scenic_names(
                RESTAURANT_REGION_TO_SCENICS.get(service_area, []),
                index,
            )
            # The retry may repeat the same names; report each one once.
            missing.extend(name for name in extra_missing if name not in missing)
        stats["missing_targets"].extend(f"Restaurant:{restaurant.get('name')} -> {name}" for name in missing)
        _link_resource(
            g,
            stats,
            index,
            ensured_scenics,
            kind="restaurant",
            rel_type="ATTRACTION_NEARBY_RESTAURANT",
            label="Restaurant",
            id_field="restaurant_id",
            resource=restaurant,
            scenic_names=target_names,
            basis=f"餐饮适用景区={applicable or '-'};服务区域={service_area or '-'}",
            dry_run=args.dry_run,
        )

    stats["created_scenics"] = len(ensured_scenics - existing_scenic_names)
    # A dry run writes nothing, so the "after" counts equal the "before" ones.
    after = before if args.dry_run else counts(g)
    print({"dry_run": args.dry_run, "graph": args.graph, "before": before, "after": after, "stats": stats})

    missing_targets = stats["missing_targets"]
    if missing_targets:
        shown_limit = 80
        print("missing_targets")
        for item in missing_targets[:shown_limit]:
            print("-", item)
        hidden = len(missing_targets) - shown_limit
        if hidden > 0:
            print(f"... and {hidden} more")


if __name__ == "__main__":
    main()
|