Files
bxh/scripts/enrich_travel_agency_2_0_customer_ready_data.py

249 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import hashlib
import json
import re
from collections import Counter
from datetime import datetime
from typing import Any
from falkordb import FalkorDB
# Name of the graph that hotel room types and price events are read from.
SOURCE_GRAPH = "travel_fixed_route_item"
# Name of the graph that is patched in place and receives the migrated nodes.
TARGET_GRAPH = "travel_agency_2_0_test"
# Tag written to ``migration_source`` on every node/relationship this script
# creates; nodes carrying it are deleted at the start of each migration run.
MIGRATION_SOURCE = "travel_fixed_route_item_hotel_rates_v1"
# Timestamp stamped on ``updated_at``. Evaluated once at import time from the
# naive local clock, so every write in a run shares the same value.
UPDATED_AT = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def stable_id(prefix: str, text: str) -> str:
    """Return a deterministic identifier of the form ``<prefix>-<HEX>``.

    The suffix is the first ten hexadecimal digits (upper-cased) of the SHA-1
    digest of ``text`` encoded as UTF-8, so equal inputs always yield the same
    identifier. The hash is used only for naming, not for security.

    Args:
        prefix: Label placed before the dash, e.g. ``"ROOM"``.
        text: Source text the identifier is derived from.

    Returns:
        The identifier string.
    """
    digest = hashlib.sha1(text.encode("utf-8")).hexdigest()
    return f"{prefix}-{digest[:10].upper()}"
# Whitespace and separator punctuation that is ignored when comparing names.
# NOTE(review): the reviewed copy of this pattern listed the ASCII parentheses
# twice, which looks like full-width "()" folded to ASCII; both widths of
# the parentheses and of the comma are listed here so that names differing
# only in punctuation width still normalise to the same key. Confirm against
# the original file.
_COMPACT_STRIP_RE = re.compile(r"[\s·,,/、()()【】\\+-]+")


def compact(value: Any) -> str:
    """Return ``value`` as text with whitespace and separator punctuation removed.

    Falsy values (``None``, ``""``, ``0``) normalise to the empty string.

    Args:
        value: Any object; it is converted with ``str`` before stripping.

    Returns:
        The stripped text, used as a loose comparison key for names.
    """
    return _COMPACT_STRIP_RE.sub("", str(value or ""))
def safe_props(payload: dict[str, Any]) -> dict[str, Any]:
    """Convert ``payload`` into a flat mapping of scalar property values.

    * ``None`` values are dropped.
    * ``str``, ``int``, ``float`` and ``bool`` values are kept unchanged.
    * ``list`` and ``dict`` values are serialised to a JSON string
      (non-ASCII text is kept readable).
    * Anything else is converted with ``str``.

    Args:
        payload: Arbitrary property mapping.

    Returns:
        A new dict; ``payload`` itself is not modified.
    """
    safe: dict[str, Any] = {}
    for key, value in payload.items():
        if value is None:
            continue
        if isinstance(value, (str, int, float, bool)):
            safe[key] = value
        elif isinstance(value, (list, dict)):
            # default=str mirrors the top-level fallback below, so a nested
            # value json cannot encode (e.g. a date) is stringified instead of
            # aborting the whole run with a TypeError.
            safe[key] = json.dumps(value, ensure_ascii=False, default=str)
        else:
            safe[key] = str(value)
    return safe
def meal_flags(meal_text: str) -> dict[str, str]:
    """Derive meal-plan properties from a free-text meal description.

    Args:
        meal_text: Raw meal text; ``None`` or other falsy values are treated
            as an empty description.

    Returns:
        A dict with ``meal_plan``, ``meal_status`` and the three
        ``*_included`` flags. Flags are the strings ``"true"``/``"false"``.
    """
    text = str(meal_text or "").strip()

    # NOTE(review): in the reviewed copy of this file every single-character
    # CJK literal in this function was an empty string, so `"" in text` was
    # always true and all three meals were flagged as included for any
    # non-empty text. The keywords below (早 / 中, 午 / 晚, and 无 in the
    # "nothing included" set) are reconstructed from the flag names and the
    # two-keyword shape of the lunch test; confirm against the original file.
    if not text or text in {"/", "无", "不含"}:
        return {
            "meal_plan": text or "未标注",
            "meal_status": "NOT_INCLUDED_OR_UNKNOWN" if text else "UNKNOWN",
            "breakfast_included": "false",
            "lunch_included": "false",
            "dinner_included": "false",
        }

    def flag(*keywords: str) -> str:
        """Return "true" when any keyword occurs in the meal text."""
        return "true" if any(word in text for word in keywords) else "false"

    return {
        "meal_plan": text,
        "meal_status": "SOURCE_EXTRACTED",
        "breakfast_included": flag("早"),
        "lunch_included": flag("中", "午"),
        "dinner_included": flag("晚"),
    }
# Substrings that mark a scenic fee item as an optional add-on.
_OPTIONAL_FEE_TOKENS = (
    "电瓶车",
    "环保车",
    "观光车",
    "摆渡车",
    "小交通",
    "保险",
    "扶梯",
    "索道",
    "游船",
    "旅拍代金券",
    "青龙洞",
)


def scenic_fee_status(name: str) -> tuple[str, str, str]:
    """Classify a scenic fee item by keywords found in its name.

    The name is normalised with ``compact`` before matching, so whitespace and
    separator punctuation inside the name do not prevent a match.

    Args:
        name: Item name; falsy values normalise to an empty string.

    Returns:
        A ``(status, scope, note)`` tuple: ``"OPTIONAL"`` when any optional-fee
        keyword occurs in the name, otherwise ``"REVIEW"``.
    """
    text = compact(name)
    if any(token in text for token in _OPTIONAL_FEE_TOKENS):
        return (
            "OPTIONAL",
            "scenic_optional_fee",
            "景区观光车/小交通/保险/扶梯/索道/游船等先按游客可选/升级项处理;基础门票价格待供应商补充。",
        )
    return "REVIEW", "scenic_fee_review", "费用状态需供应商或景区政策确认。"
def patch_meal_fields(graph) -> int:
    """Write derived meal properties onto ProductDay and RouteStop nodes.

    For every ``ProductDay`` the flags from ``meal_flags`` plus a
    ``customer_day_summary`` are merged into the node. For every ``RouteStop``
    with a non-empty ``meal_text`` that text is copied to ``meal_plan``.

    Args:
        graph: Graph handle exposing ``query(cypher, params)`` whose result has
            a ``result_set`` attribute.

    Returns:
        Number of ``ProductDay`` rows an update was issued for. ``RouteStop``
        updates are not counted.
    """
    day_rows = graph.query("MATCH (d:ProductDay) RETURN d.day_id, d.meal_text").result_set
    count = 0
    for day_id, meal_text in day_rows:
        if day_id is None:
            # A null id can never satisfy the property match below, so the
            # update would be a no-op; skip it instead of counting it.
            continue
        props = meal_flags(meal_text)
        props["customer_day_summary"] = f"用餐:{props['meal_plan']}"
        graph.query(
            "MATCH (d:ProductDay {day_id:$day_id}) SET d += $props",
            {"day_id": day_id, "props": props},
        )
        count += 1

    stop_rows = graph.query("MATCH (s:RouteStop) RETURN s.stop_id, s.meal_text").result_set
    for stop_id, meal_text in stop_rows:
        if stop_id is None or not meal_text:
            continue
        graph.query(
            "MATCH (s:RouteStop {stop_id:$stop_id}) SET s.meal_plan=$meal_plan",
            {"stop_id": stop_id, "meal_plan": str(meal_text)},
        )
    return count
def patch_fee_status(graph) -> int:
    """Stamp a fee-status hint on every scenic-related TravelItem.

    Items are selected when ``scenic_id`` or ``scenic_name`` is set; the
    classification itself is derived from the item's ``name`` via
    ``scenic_fee_status``.

    Args:
        graph: Graph handle exposing ``query(cypher, params)`` whose result has
            a ``result_set`` attribute.

    Returns:
        Number of items an update was issued for.
    """
    rows = graph.query(
        "MATCH (i:TravelItem) WHERE i.scenic_id IS NOT NULL OR i.scenic_name IS NOT NULL "
        "RETURN i.item_id, i.name"
    ).result_set
    count = 0
    for item_id, name in rows:
        if item_id is None:
            # A null id can never satisfy the property match below, so the
            # update would be a no-op; skip it instead of counting it.
            continue
        status, scope, note = scenic_fee_status(name)
        graph.query(
            """
            MATCH (i:TravelItem {item_id:$item_id})
            SET i.default_status_hint=$status,
                i.item_scope=$scope,
                i.status_source='customer_service_script_and_fee_name_rule',
                i.mandatory_fee_policy=$note,
                i.customer_visible='true',
                i.updated_at=$updated_at
            """,
            {
                "item_id": item_id,
                "status": status,
                "scope": scope,
                "note": note,
                "updated_at": UPDATED_AT,
            },
        )
        count += 1
    return count
def _join_summary(values, limit: int) -> str:
    """Join the first ``limit`` truthy ``values`` into one display string."""
    # NOTE(review): the separator literal was empty in the reviewed copy of
    # this file, which ran the names together; the enumeration comma is a
    # reconstruction. Confirm against the original file.
    return "、".join([str(value) for value in values if value][:limit])


def _refresh_hotel_room_summaries(target_graph) -> None:
    """Cache a short room/price summary on each Hotel that has room types."""
    summary_rows = target_graph.query(
        """
        MATCH (h:Hotel)-[:HOTEL_HAS_ROOM_TYPE]->(room:HotelRoomType)
        OPTIONAL MATCH (room)-[:ROOM_TYPE_HAS_PRICE_RULE]->(price:ResourcePriceRule)
        RETURN h.hotel_id, collect(DISTINCT room.room_name), collect(DISTINCT price.price_text)
        """
    ).result_set
    for hotel_id, rooms, prices in summary_rows:
        target_graph.query(
            """
            MATCH (h:Hotel {hotel_id:$hotel_id})
            SET h.room_type_summary=$room_summary,
                h.room_price_summary=$price_summary,
                h.requires_supplier_confirm='true',
                h.supplier_confirm_notes='房态、余房、周末/节假日价需供应商二次确认'
            """,
            {
                "hotel_id": hotel_id,
                "room_summary": _join_summary(rooms, 6),
                "price_summary": _join_summary(prices, 4),
            },
        )


def migrate_hotel_rooms(source_graph, target_graph) -> dict[str, int]:
    """Copy hotel room types and their price events into the target graph.

    Steps:
      1. Delete ``ResourcePriceRule`` and ``HotelRoomType`` nodes in the target
         graph that carry this script's ``migration_source`` tag, so the
         migration can be re-run.
      2. Match source hotels to target ``Hotel`` nodes by normalised name.
      3. Create one ``HotelRoomType`` per source room and one
         ``ResourcePriceRule`` per source price event, linking each to its
         parent. Ids get a ``-2_0`` suffix.
      4. Refresh the cached room/price summary on the target hotels.

    Args:
        source_graph: Graph handle the rooms and price events are read from.
        target_graph: Graph handle that is modified.

    Returns:
        Counters ``skipped_unmatched_hotel``, ``rooms_created`` and
        ``price_rules_created`` (always present, possibly zero).
    """
    # Remove the output of earlier runs; price rules first, then rooms.
    for label in ("ResourcePriceRule", "HotelRoomType"):
        target_graph.query(
            f"MATCH (n:{label}) WHERE n.migration_source=$source DETACH DELETE n",
            {"source": MIGRATION_SOURCE},
        )

    hotel_rows = target_graph.query("MATCH (h:Hotel) RETURN h.hotel_id, h.name").result_set
    target_hotels: dict[str, dict[str, Any]] = {}
    for hotel_id, name in hotel_rows:
        key = compact(name)
        if key:
            # An empty key would let every unnamed source hotel match an
            # unnamed target hotel, so unnamed hotels are left out.
            target_hotels[key] = {"hotel_id": hotel_id, "name": name}

    rows = source_graph.query(
        """
        MATCH (h:TravelItem)-[:HOTEL_HAS_ROOM_TYPE]->(room:HotelRoomType)
        OPTIONAL MATCH (room)-[:ROOM_TYPE_HAS_PRICE_EVENT]->(pe:HotelPriceEvent)
        RETURN h.name, room, pe
        """
    ).result_set

    room_seen: set[str] = set()
    price_seen: set[str] = set()
    # Pre-seeded so the returned report always lists every counter.
    stats = Counter({"skipped_unmatched_hotel": 0, "rooms_created": 0, "price_rules_created": 0})

    for hotel_name, room_node, price_node in rows:
        target = target_hotels.get(compact(hotel_name))
        if not target:
            stats["skipped_unmatched_hotel"] += 1
            continue

        room_props = dict(room_node.properties)
        old_room_id = str(
            room_props.get("room_id")
            or stable_id("ROOM", room_props.get("natural_key") or repr(room_props))
        )
        new_room_id = f"{old_room_id}-2_0"

        # A room appears once per price event in the result rows; create it
        # only the first time it is seen.
        if new_room_id not in room_seen:
            payload = {
                **room_props,
                "room_id": new_room_id,
                "hotel_id": target["hotel_id"],
                "hotel_name": target["name"],
                "name": f"{target['name']} - {room_props.get('room_name') or '房型'}",
                "migration_source": MIGRATION_SOURCE,
                "inventory_status": room_props.get("inventory_status") or "NEEDS_SUPPLIER_CONFIRM",
                "inventory_source": room_props.get("inventory_source") or "supplier_pending",
                "requires_supplier_confirm": "true",
                "supplier_confirm_notes": "房态/余房未接入实时库存,报价前需供应商确认。",
                "updated_at": UPDATED_AT,
            }
            target_graph.query(
                "CREATE (room:HotelRoomType) SET room += $props",
                {"props": safe_props(payload)},
            )
            target_graph.query(
                """
                MATCH (h:Hotel {hotel_id:$hotel_id}), (room:HotelRoomType {room_id:$room_id})
                MERGE (h)-[:HOTEL_HAS_ROOM_TYPE {source:$source}]->(room)
                """,
                {"hotel_id": target["hotel_id"], "room_id": new_room_id, "source": MIGRATION_SOURCE},
            )
            room_seen.add(new_room_id)
            stats["rooms_created"] += 1

        # OPTIONAL MATCH yields no price node for rooms without price events.
        if price_node is None:
            continue

        price_props = dict(price_node.properties)
        old_rate_id = str(
            price_props.get("rate_id")
            or stable_id("RATE", price_props.get("natural_key") or repr(price_props))
        )
        new_rate_id = f"{old_rate_id}-2_0"
        if new_rate_id in price_seen:
            continue

        payload = {
            **price_props,
            "price_rule_id": new_rate_id,
            "target_type": "HotelRoomType",
            "target_id": new_room_id,
            "target_name": f"{target['name']} - {room_props.get('room_name') or ''}".strip(),
            "resource_type": "Hotel",
            "resource_name": target["name"],
            "hotel_id": target["hotel_id"],
            "hotel_name": target["name"],
            "room_id": new_room_id,
            "room_name": room_props.get("room_name") or price_props.get("room_name") or "",
            "requires_supplier_confirm": "true" if price_props.get("simulated_price") else "false",
            "migration_source": MIGRATION_SOURCE,
            "updated_at": UPDATED_AT,
        }
        target_graph.query(
            "CREATE (price:ResourcePriceRule) SET price += $props",
            {"props": safe_props(payload)},
        )
        target_graph.query(
            """
            MATCH (room:HotelRoomType {room_id:$room_id}), (price:ResourcePriceRule {price_rule_id:$price_rule_id})
            MERGE (room)-[:ROOM_TYPE_HAS_PRICE_RULE {source:$source}]->(price)
            """,
            {"room_id": new_room_id, "price_rule_id": new_rate_id, "source": MIGRATION_SOURCE},
        )
        price_seen.add(new_rate_id)
        stats["price_rules_created"] += 1

    # Cache a short room summary on Hotel for fast customer-service answers.
    _refresh_hotel_room_summaries(target_graph)
    return dict(stats)
def main(host: str = "localhost", port: int = 6380) -> None:
    """Run all enrichment steps and print a JSON report to stdout.

    The meal and fee patches run against the target graph first, then hotel
    rooms are migrated from the source graph, and finally a few node counts
    are read back from the target graph as a sanity check.

    Args:
        host: FalkorDB host name.
        port: FalkorDB port.
    """
    db = FalkorDB(host=host, port=port)
    source_graph = db.select_graph(SOURCE_GRAPH)
    target_graph = db.select_graph(TARGET_GRAPH)

    def count(cypher: str) -> int:
        """Return the single scalar produced by a ``RETURN count(...)`` query."""
        return target_graph.query(cypher).result_set[0][0]

    stats = {
        "meal_days_patched": patch_meal_fields(target_graph),
        "scenic_fee_items_patched": patch_fee_status(target_graph),
    }
    stats.update(migrate_hotel_rooms(source_graph, target_graph))

    verification = {
        "HotelRoomType": count("MATCH (n:HotelRoomType) RETURN count(n)"),
        "ResourcePriceRule": count("MATCH (n:ResourcePriceRule) RETURN count(n)"),
        "ProductDayWithMealPlan": count(
            "MATCH (d:ProductDay) WHERE d.meal_plan IS NOT NULL RETURN count(d)"
        ),
        "TravelItemWithStatusHint": count(
            "MATCH (i:TravelItem) WHERE i.default_status_hint IS NOT NULL RETURN count(i)"
        ),
    }
    print(json.dumps({"stats": stats, "verification": verification}, ensure_ascii=False, indent=2))
# Run the enrichment only when executed as a script, not when imported.
if __name__ == "__main__":
    main()