"""Patch meal/fee fields and migrate hotel room rates between FalkorDB graphs.

Running this module as a script:

1. derives meal flags for every ``ProductDay`` / ``RouteStop`` in the target graph,
2. tags scenic ``TravelItem`` nodes in the target graph with a fee-status hint,
3. copies ``HotelRoomType`` nodes and their price events from the source graph
   into the target graph (as ``HotelRoomType`` / ``ResourcePriceRule`` nodes),
4. prints a JSON report with the counters and a few verification counts.
"""

from __future__ import annotations

import hashlib
import json
import re
from collections import Counter
from datetime import datetime
from typing import Any

SOURCE_GRAPH = "travel_fixed_route_item"
TARGET_GRAPH = "travel_agency_2_0_test"
MIGRATION_SOURCE = "travel_fixed_route_item_hotel_rates_v1"
# Evaluated once at import time, so every node touched by one run shares a timestamp.
UPDATED_AT = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# Whitespace and punctuation stripped by compact() before names are compared.
_SEPARATORS_RE = re.compile(r"[\s·,,/、()()【】\\++-]+")

# Fee names containing any of these tokens are classified as optional extras.
_OPTIONAL_FEE_TOKENS = (
    "电瓶车",
    "环保车",
    "观光车",
    "摆渡车",
    "小交通",
    "保险",
    "扶梯",
    "索道",
    "游船",
    "旅拍代金券",
    "青龙洞",
)

# String spellings that mean "flag not set"; this file itself stores booleans
# as the strings 'true' / 'false', so plain truthiness is not enough.
_FALSE_FLAG_TEXTS = frozenset({"", "false", "0", "no", "none", "null"})


def stable_id(prefix: str, text: str) -> str:
    """Return ``"<prefix>-<10 upper-case hex chars>"`` from the SHA-1 of *text*."""
    digest = hashlib.sha1(text.encode("utf-8")).hexdigest()
    return f"{prefix}-{digest[:10].upper()}"


def compact(value: Any) -> str:
    """Return ``str(value)`` with whitespace and separator punctuation removed.

    Falsy values (``None``, ``""``, ``0``) collapse to the empty string.
    """
    return _SEPARATORS_RE.sub("", str(value or ""))


def flag_is_set(value: Any) -> bool:
    """Interpret *value* as a boolean flag, accepting string-encoded booleans.

    Strings are compared case-insensitively after stripping; ``""``, ``"false"``,
    ``"0"``, ``"no"``, ``"none"`` and ``"null"`` count as unset, any other string
    as set. Non-string values use ordinary truthiness.
    """
    if isinstance(value, str):
        return value.strip().casefold() not in _FALSE_FLAG_TEXTS
    return bool(value)


def safe_props(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *payload* reduced to scalar property values.

    ``None`` values are dropped, ``str``/``int``/``float``/``bool`` are kept,
    lists and dicts are serialised with ``json.dumps(..., ensure_ascii=False)``,
    and anything else is converted with ``str()``.
    """
    safe: dict[str, Any] = {}
    for key, value in payload.items():
        if value is None:
            continue
        if isinstance(value, (str, int, float, bool)):
            safe[key] = value
        elif isinstance(value, (list, dict)):
            safe[key] = json.dumps(value, ensure_ascii=False)
        else:
            safe[key] = str(value)
    return safe


def meal_flags(meal_text: str) -> dict[str, str]:
    """Derive meal-plan properties from a free-text meal description.

    Returns a dict with ``meal_plan``, ``meal_status`` and the string flags
    ``breakfast_included`` / ``lunch_included`` / ``dinner_included``
    (``"true"`` or ``"false"``).

    NOTE(review): detection is plain substring matching on 早 / 中 / 午 / 晚, so
    a negated text such as "不含早餐" still yields ``breakfast_included="true"``.
    """
    text = str(meal_text or "").strip()
    if not text or text in {"/", "无", "不含"}:
        return {
            "meal_plan": text or "未标注",
            "meal_status": "NOT_INCLUDED_OR_UNKNOWN" if text else "UNKNOWN",
            "breakfast_included": "false",
            "lunch_included": "false",
            "dinner_included": "false",
        }
    return {
        "meal_plan": text,
        "meal_status": "SOURCE_EXTRACTED",
        "breakfast_included": "true" if "早" in text else "false",
        "lunch_included": "true" if "中" in text or "午" in text else "false",
        "dinner_included": "true" if "晚" in text else "false",
    }


def scenic_fee_status(name: str) -> tuple[str, str, str]:
    """Classify a scenic fee item by name; return ``(status, scope, note)``.

    Names containing one of the optional-fee tokens (after ``compact``) are
    ``OPTIONAL``; everything else is ``REVIEW``.
    """
    text = compact(name)
    if any(token in text for token in _OPTIONAL_FEE_TOKENS):
        return (
            "OPTIONAL",
            "scenic_optional_fee",
            "景区观光车/小交通/保险/扶梯/索道/游船等先按游客可选/升级项处理;基础门票价格待供应商补充。",
        )
    return "REVIEW", "scenic_fee_review", "费用状态需供应商或景区政策确认。"


def patch_meal_fields(graph: Any) -> int:
    """Write meal flags onto ``ProductDay`` nodes and ``meal_plan`` onto ``RouteStop`` nodes.

    *graph* must expose ``query(cypher, params)`` returning an object with a
    ``result_set`` attribute. Returns the number of ``ProductDay`` rows
    processed; ``RouteStop`` updates are not counted.
    """
    rows = graph.query("MATCH (d:ProductDay) RETURN d.day_id, d.meal_text").result_set
    count = 0
    for day_id, meal_text in rows:
        props = meal_flags(meal_text)
        props["customer_day_summary"] = f"用餐:{props['meal_plan']}"
        graph.query(
            "MATCH (d:ProductDay {day_id:$day_id}) SET d += $props",
            {"day_id": day_id, "props": props},
        )
        count += 1

    stop_rows = graph.query("MATCH (s:RouteStop) RETURN s.stop_id, s.meal_text").result_set
    for stop_id, meal_text in stop_rows:
        if not meal_text:
            continue
        graph.query(
            "MATCH (s:RouteStop {stop_id:$stop_id}) SET s.meal_plan=$meal_plan",
            {"stop_id": stop_id, "meal_plan": str(meal_text)},
        )
    return count


def patch_fee_status(graph: Any) -> int:
    """Tag every ``TravelItem`` that has a scenic id or scenic name with a fee status.

    Returns the number of items processed.
    """
    rows = graph.query(
        "MATCH (i:TravelItem) WHERE i.scenic_id IS NOT NULL OR i.scenic_name IS NOT NULL "
        "RETURN i.item_id, i.name"
    ).result_set
    count = 0
    for item_id, name in rows:
        status, scope, note = scenic_fee_status(name)
        graph.query(
            """
            MATCH (i:TravelItem {item_id:$item_id})
            SET i.default_status_hint=$status,
                i.item_scope=$scope,
                i.status_source='customer_service_script_and_fee_name_rule',
                i.mandatory_fee_policy=$note,
                i.customer_visible='true',
                i.updated_at=$updated_at
            """,
            {
                "item_id": item_id,
                "status": status,
                "scope": scope,
                "note": note,
                "updated_at": UPDATED_AT,
            },
        )
        count += 1
    return count


def index_hotels_by_name(hotel_rows: Any) -> dict[str, dict[str, Any]]:
    """Map ``compact(name)`` to ``{"hotel_id", "name"}`` for ``(hotel_id, name)`` rows.

    Rows whose name compacts to the empty string are skipped so that a nameless
    source hotel can never match a nameless target hotel. When several rows
    share a compacted name, the last one wins.
    """
    index: dict[str, dict[str, Any]] = {}
    for hotel_id, name in hotel_rows:
        key = compact(name)
        if key:
            index[key] = {"hotel_id": hotel_id, "name": name}
    return index


def _migrated_id(props: dict[str, Any], id_key: str, prefix: str) -> str:
    """Return the target-graph id: the source id (or a stable hash) plus ``-2_0``."""
    # str() guards stable_id(), which calls .encode() on its text argument.
    original = props.get(id_key) or stable_id(prefix, str(props.get("natural_key") or repr(props)))
    return f"{original}-2_0"


def _room_payload(
    room_props: dict[str, Any], new_room_id: str, target: dict[str, Any]
) -> dict[str, Any]:
    """Build the property map for a migrated ``HotelRoomType`` node."""
    return {
        **room_props,
        "room_id": new_room_id,
        "hotel_id": target["hotel_id"],
        "hotel_name": target["name"],
        "name": f"{target['name']} - {room_props.get('room_name') or '房型'}",
        "migration_source": MIGRATION_SOURCE,
        "inventory_status": room_props.get("inventory_status") or "NEEDS_SUPPLIER_CONFIRM",
        "inventory_source": room_props.get("inventory_source") or "supplier_pending",
        "requires_supplier_confirm": "true",
        "supplier_confirm_notes": "房态/余房未接入实时库存,报价前需供应商确认。",
        "updated_at": UPDATED_AT,
    }


def _price_payload(
    price_props: dict[str, Any],
    room_props: dict[str, Any],
    new_rate_id: str,
    new_room_id: str,
    target: dict[str, Any],
) -> dict[str, Any]:
    """Build the property map for a migrated ``ResourcePriceRule`` node."""
    return {
        **price_props,
        "price_rule_id": new_rate_id,
        "target_type": "HotelRoomType",
        "target_id": new_room_id,
        "target_name": f"{target['name']} - {room_props.get('room_name') or ''}".strip(),
        "resource_type": "Hotel",
        "resource_name": target["name"],
        "hotel_id": target["hotel_id"],
        "hotel_name": target["name"],
        "room_id": new_room_id,
        "room_name": room_props.get("room_name") or price_props.get("room_name") or "",
        # flag_is_set() so a string-encoded "false" is not mistaken for a set flag.
        "requires_supplier_confirm": (
            "true" if flag_is_set(price_props.get("simulated_price")) else "false"
        ),
        "migration_source": MIGRATION_SOURCE,
        "updated_at": UPDATED_AT,
    }


def _refresh_hotel_summaries(target_graph: Any) -> None:
    """Cache a short room/price summary on each ``Hotel`` that has room types."""
    summary_rows = target_graph.query(
        """
        MATCH (h:Hotel)-[:HOTEL_HAS_ROOM_TYPE]->(room:HotelRoomType)
        OPTIONAL MATCH (room)-[:ROOM_TYPE_HAS_PRICE_RULE]->(price:ResourcePriceRule)
        RETURN h.hotel_id, collect(DISTINCT room.room_name), collect(DISTINCT price.price_text)
        """
    ).result_set
    for hotel_id, rooms, prices in summary_rows:
        # Keep the summaries short: at most 6 room names and 4 price texts.
        room_summary = "、".join([str(x) for x in rooms if x][:6])
        price_summary = ";".join([str(x) for x in prices if x][:4])
        target_graph.query(
            """
            MATCH (h:Hotel {hotel_id:$hotel_id})
            SET h.room_type_summary=$room_summary,
                h.room_price_summary=$price_summary,
                h.requires_supplier_confirm='true',
                h.supplier_confirm_notes='房态、余房、周末/节假日价需供应商二次确认'
            """,
            {"hotel_id": hotel_id, "room_summary": room_summary, "price_summary": price_summary},
        )


def migrate_hotel_rooms(source_graph: Any, target_graph: Any) -> dict[str, int]:
    """Copy hotel room types and price events from *source_graph* into *target_graph*.

    Nodes previously created by this migration (matched on ``migration_source``)
    are deleted first, so the function can be re-run. Source hotels are matched
    to target ``Hotel`` nodes by ``compact(name)``; unmatched rows are skipped.

    Returns a dict with the counters ``skipped_unmatched_hotel``,
    ``rooms_created`` and ``price_rules_created`` (always present, possibly 0).
    """
    target_graph.query(
        "MATCH (n:ResourcePriceRule) WHERE n.migration_source=$source DETACH DELETE n",
        {"source": MIGRATION_SOURCE},
    )
    target_graph.query(
        "MATCH (n:HotelRoomType) WHERE n.migration_source=$source DETACH DELETE n",
        {"source": MIGRATION_SOURCE},
    )

    hotel_rows = target_graph.query("MATCH (h:Hotel) RETURN h.hotel_id, h.name").result_set
    target_hotels = index_hotels_by_name(hotel_rows)

    rows = source_graph.query(
        """
        MATCH (h:TravelItem)-[:HOTEL_HAS_ROOM_TYPE]->(room:HotelRoomType)
        OPTIONAL MATCH (room)-[:ROOM_TYPE_HAS_PRICE_EVENT]->(pe:HotelPriceEvent)
        RETURN h.name, room, pe
        """
    ).result_set

    room_seen: set[str] = set()
    price_seen: set[str] = set()
    # Pre-seed the counters so the report has the same keys on every run.
    stats: Counter[str] = Counter(
        {"skipped_unmatched_hotel": 0, "rooms_created": 0, "price_rules_created": 0}
    )

    for hotel_name, room_node, price_node in rows:
        target = target_hotels.get(compact(hotel_name))
        if not target:
            stats["skipped_unmatched_hotel"] += 1
            continue

        room_props = dict(room_node.properties)
        new_room_id = _migrated_id(room_props, "room_id", "ROOM")
        # A room appears once per price event in the result set; create it only once.
        if new_room_id not in room_seen:
            target_graph.query(
                "CREATE (room:HotelRoomType) SET room += $props",
                {"props": safe_props(_room_payload(room_props, new_room_id, target))},
            )
            target_graph.query(
                """
                MATCH (h:Hotel {hotel_id:$hotel_id}), (room:HotelRoomType {room_id:$room_id})
                MERGE (h)-[:HOTEL_HAS_ROOM_TYPE {source:$source}]->(room)
                """,
                {
                    "hotel_id": target["hotel_id"],
                    "room_id": new_room_id,
                    "source": MIGRATION_SOURCE,
                },
            )
            room_seen.add(new_room_id)
            stats["rooms_created"] += 1

        # OPTIONAL MATCH yields None when the room has no price event.
        if price_node is None:
            continue

        price_props = dict(price_node.properties)
        new_rate_id = _migrated_id(price_props, "rate_id", "RATE")
        if new_rate_id in price_seen:
            continue

        payload = _price_payload(price_props, room_props, new_rate_id, new_room_id, target)
        target_graph.query(
            "CREATE (price:ResourcePriceRule) SET price += $props",
            {"props": safe_props(payload)},
        )
        target_graph.query(
            """
            MATCH (room:HotelRoomType {room_id:$room_id}),
                  (price:ResourcePriceRule {price_rule_id:$price_rule_id})
            MERGE (room)-[:ROOM_TYPE_HAS_PRICE_RULE {source:$source}]->(price)
            """,
            {"room_id": new_room_id, "price_rule_id": new_rate_id, "source": MIGRATION_SOURCE},
        )
        price_seen.add(new_rate_id)
        stats["price_rules_created"] += 1

    # Cache a short room summary on Hotel for fast customer-service answers.
    _refresh_hotel_summaries(target_graph)
    return dict(stats)


def main() -> None:
    """Run all patches against the local FalkorDB instance and print a JSON report."""
    # Imported here so the pure helpers above stay importable on machines that
    # do not have the FalkorDB client installed.
    from falkordb import FalkorDB

    db = FalkorDB(host="localhost", port=6380)
    source_graph = db.select_graph(SOURCE_GRAPH)
    target_graph = db.select_graph(TARGET_GRAPH)

    stats = {
        "meal_days_patched": patch_meal_fields(target_graph),
        "scenic_fee_items_patched": patch_fee_status(target_graph),
    }
    stats.update(migrate_hotel_rooms(source_graph, target_graph))

    def count_of(cypher: str) -> Any:
        """Return the single scalar produced by a ``RETURN count(...)`` query."""
        return target_graph.query(cypher).result_set[0][0]

    verification = {
        "HotelRoomType": count_of("MATCH (n:HotelRoomType) RETURN count(n)"),
        "ResourcePriceRule": count_of("MATCH (n:ResourcePriceRule) RETURN count(n)"),
        "ProductDayWithMealPlan": count_of(
            "MATCH (d:ProductDay) WHERE d.meal_plan IS NOT NULL RETURN count(d)"
        ),
        "TravelItemWithStatusHint": count_of(
            "MATCH (i:TravelItem) WHERE i.default_status_hint IS NOT NULL RETURN count(i)"
        ),
    }
    print(json.dumps({"stats": stats, "verification": verification}, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()