# Python source, 249 lines, 10 KiB (file-viewer header captured with the listing).
from __future__ import annotations
|
||
|
||
import hashlib
|
||
import json
|
||
import re
|
||
from collections import Counter
|
||
from datetime import datetime
|
||
from typing import Any
|
||
|
||
from falkordb import FalkorDB
|
||
|
||
|
||
# Name of the graph that hotel room types and price events are read from.
SOURCE_GRAPH = "travel_fixed_route_item"
# Name of the graph that is patched and that receives the migrated nodes.
TARGET_GRAPH = "travel_agency_2_0_test"
# Tag written to every migrated node; a re-run deletes nodes carrying it first.
MIGRATION_SOURCE = "travel_fixed_route_item_hotel_rates_v1"
# Timestamp captured once at import (naive local time), shared by all writes.
UPDATED_AT = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
|
||
|
||
|
||
def stable_id(prefix: str, text: str) -> str:
    """Build a deterministic identifier of the form ``<prefix>-<10 hex chars>``.

    The suffix is the first ten hexadecimal digits, upper-cased, of the SHA-1
    digest of ``text`` encoded as UTF-8, so equal inputs always give equal ids.
    """
    digest = hashlib.sha1(text.encode("utf-8")).hexdigest()
    suffix = digest[:10].upper()
    return f"{prefix}-{suffix}"
|
||
|
||
|
||
def compact(value: Any) -> str:
    """Reduce a value to a separator-free string suitable as a match key.

    Falsy values (``None``, ``""``, ``0`` ...) yield ``""``. Anything else is
    converted with ``str`` and every run of whitespace or of the punctuation
    listed in the character class below is removed.
    """
    # NOTE(review): ',', '(', ')' and '+' each appear twice in the class, which
    # is redundant as written. Possibly full-width variants that were
    # normalised somewhere upstream -- confirm against the original file.
    separators = re.compile(r"[\s·,,/、()()【】\\++-]+")
    text = str(value) if value else ""
    return separators.sub("", text)
|
||
|
||
|
||
def safe_props(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``payload`` holding only scalar property values.

    * ``None`` values are dropped.
    * ``str``, ``int``, ``float`` and ``bool`` values are kept as they are.
    * ``list`` and ``dict`` values are serialised to JSON text (non-ASCII
      characters are kept verbatim).
    * Any other value is converted with ``str``.

    Key order follows the input mapping.
    """

    def _coerce(item: Any) -> Any:
        # Scalars pass through; containers become JSON; the rest is stringified.
        if isinstance(item, (str, int, float, bool)):
            return item
        if isinstance(item, (list, dict)):
            return json.dumps(item, ensure_ascii=False)
        return str(item)

    return {
        name: _coerce(item)
        for name, item in payload.items()
        if item is not None
    }
|
||
|
||
|
||
def meal_flags(meal_text: str) -> dict[str, str]:
    """Derive meal-plan properties from a free-text meal description.

    The text is stripped first. An empty text, or one that is exactly ``/``,
    ``无`` or ``不含``, reports every meal as not included: the status is
    ``UNKNOWN`` for empty text and ``NOT_INCLUDED_OR_UNKNOWN`` otherwise.
    Any other text is reported as ``SOURCE_EXTRACTED`` and each meal flag is
    set by a substring test.

    All flag values are the strings ``"true"`` / ``"false"``, not booleans.
    """

    def _flag(present: bool) -> str:
        return "true" if present else "false"

    plan = str(meal_text or "").strip()

    if plan and plan not in {"/", "无", "不含"}:
        # NOTE(review): plain substring tests -- a text such as "不含早" still
        # sets breakfast_included to "true". Confirm this is acceptable.
        return {
            "meal_plan": plan,
            "meal_status": "SOURCE_EXTRACTED",
            "breakfast_included": _flag("早" in plan),
            "lunch_included": _flag("中" in plan or "午" in plan),
            "dinner_included": _flag("晚" in plan),
        }

    return {
        "meal_plan": plan or "未标注",
        "meal_status": "NOT_INCLUDED_OR_UNKNOWN" if plan else "UNKNOWN",
        "breakfast_included": "false",
        "lunch_included": "false",
        "dinner_included": "false",
    }
|
||
|
||
|
||
def scenic_fee_status(name: str) -> tuple[str, str, str]:
    """Classify a scenic fee item by keywords found in its name.

    The name is normalised with ``compact`` and searched for a fixed list of
    keywords. Returns a ``(status, scope, note)`` triple: the ``OPTIONAL``
    triple when any keyword occurs, the ``REVIEW`` triple otherwise.
    """
    optional_markers = (
        "电瓶车",
        "环保车",
        "观光车",
        "摆渡车",
        "小交通",
        "保险",
        "扶梯",
        "索道",
        "游船",
        "旅拍代金券",
        "青龙洞",
    )
    normalized = compact(name)
    for marker in optional_markers:
        if marker in normalized:
            return (
                "OPTIONAL",
                "scenic_optional_fee",
                "景区观光车/小交通/保险/扶梯/索道/游船等先按游客可选/升级项处理;基础门票价格待供应商补充。",
            )
    return "REVIEW", "scenic_fee_review", "费用状态需供应商或景区政策确认。"
|
||
|
||
|
||
def patch_meal_fields(graph) -> int:
    """Write derived meal properties onto ProductDay and RouteStop nodes.

    For every ``ProductDay`` the result of ``meal_flags(d.meal_text)`` plus a
    ``customer_day_summary`` string is merged into the node. For every
    ``RouteStop`` with a non-empty ``meal_text``, that text is copied into
    ``meal_plan`` as a string.

    Args:
        graph: graph handle exposing ``query(cypher, params)`` whose result
            has a ``result_set`` attribute.

    Returns:
        The number of ProductDay rows processed; RouteStop updates are not
        counted.
    """
    patched = 0
    day_rows = graph.query("MATCH (d:ProductDay) RETURN d.day_id, d.meal_text").result_set
    for patched, (day_id, meal_text) in enumerate(day_rows, start=1):
        flags = meal_flags(meal_text)
        props = {**flags, "customer_day_summary": f"用餐:{flags['meal_plan']}"}
        graph.query(
            "MATCH (d:ProductDay {day_id:$day_id}) SET d += $props",
            {"day_id": day_id, "props": props},
        )

    stop_rows = graph.query("MATCH (s:RouteStop) RETURN s.stop_id, s.meal_text").result_set
    for stop_id, meal_text in stop_rows:
        if meal_text:
            graph.query(
                "MATCH (s:RouteStop {stop_id:$stop_id}) SET s.meal_plan=$meal_plan",
                {"stop_id": stop_id, "meal_plan": str(meal_text)},
            )

    return patched
|
||
|
||
|
||
def patch_fee_status(graph) -> int:
    """Stamp a fee-status hint on every scenic-related TravelItem.

    Items are selected when ``scenic_id`` or ``scenic_name`` is set. Each one
    receives the ``(status, scope, note)`` triple from ``scenic_fee_status``
    applied to its name, plus fixed ``status_source`` / ``customer_visible``
    values and the module-level ``UPDATED_AT`` timestamp.

    Args:
        graph: graph handle exposing ``query(cypher, params)`` whose result
            has a ``result_set`` attribute.

    Returns:
        The number of TravelItem rows processed.
    """
    select_items = (
        "MATCH (i:TravelItem) WHERE i.scenic_id IS NOT NULL OR i.scenic_name IS NOT NULL "
        "RETURN i.item_id, i.name"
    )
    update_item = """
        MATCH (i:TravelItem {item_id:$item_id})
        SET i.default_status_hint=$status,
            i.item_scope=$scope,
            i.status_source='customer_service_script_and_fee_name_rule',
            i.mandatory_fee_policy=$note,
            i.customer_visible='true',
            i.updated_at=$updated_at
        """

    processed = 0
    for processed, (item_id, name) in enumerate(graph.query(select_items).result_set, start=1):
        status, scope, note = scenic_fee_status(name)
        params = {
            "item_id": item_id,
            "status": status,
            "scope": scope,
            "note": note,
            "updated_at": UPDATED_AT,
        }
        graph.query(update_item, params)
    return processed
|
||
|
||
|
||
def migrate_hotel_rooms(source_graph, target_graph) -> dict[str, int]:
    """Copy hotel room types and their price events into the target graph.

    Steps, in order:

    1. Delete ``ResourcePriceRule`` and ``HotelRoomType`` nodes in the target
       whose ``migration_source`` equals ``MIGRATION_SOURCE``, so the
       migration can be re-run.
    2. Index the target ``Hotel`` nodes by ``compact(name)``.
    3. For every source (hotel name, room type, optional price event) row
       whose hotel name matches a target hotel, create the room type once and
       each price rule once, and link them to the hotel / room type.
    4. Store a short room and price summary on each target ``Hotel``.

    Args:
        source_graph: graph handle exposing ``query(cypher)``; only read.
        target_graph: graph handle exposing ``query(cypher, params)``; written.

    Returns:
        Counts keyed by ``rooms_created``, ``price_rules_created`` and
        ``skipped_unmatched_hotel``. All three keys are always present, even
        when a counter stayed at zero.
    """
    _purge_previous_migration(target_graph)
    target_hotels = _index_target_hotels(target_graph)

    rows = source_graph.query(
        """
        MATCH (h:TravelItem)-[:HOTEL_HAS_ROOM_TYPE]->(room:HotelRoomType)
        OPTIONAL MATCH (room)-[:ROOM_TYPE_HAS_PRICE_EVENT]->(pe:HotelPriceEvent)
        RETURN h.name, room, pe
        """
    ).result_set

    room_seen: set[str] = set()
    price_seen: set[str] = set()
    # Pre-seed the counters so the returned mapping has a stable shape.
    stats = Counter(rooms_created=0, price_rules_created=0, skipped_unmatched_hotel=0)

    for hotel_name, room_node, price_node in rows:
        # Nameless source hotels compact to "" and never match, because the
        # index holds no empty key.
        target = target_hotels.get(compact(hotel_name))
        if not target:
            stats["skipped_unmatched_hotel"] += 1
            continue

        room_props = dict(room_node.properties)
        new_room_id = _migrated_id("ROOM", "room_id", room_props)
        # One row is returned per price event, so a room type can repeat.
        if new_room_id not in room_seen:
            _create_room_type(target_graph, target, room_props, new_room_id)
            room_seen.add(new_room_id)
            stats["rooms_created"] += 1

        # OPTIONAL MATCH yields no price node for room types without prices.
        if price_node is None:
            continue

        price_props = dict(price_node.properties)
        new_rate_id = _migrated_id("RATE", "rate_id", price_props)
        if new_rate_id in price_seen:
            continue
        _create_price_rule(
            target_graph, target, room_props, price_props, new_room_id, new_rate_id
        )
        price_seen.add(new_rate_id)
        stats["price_rules_created"] += 1

    _refresh_hotel_summaries(target_graph)
    return dict(stats)


def _purge_previous_migration(target_graph) -> None:
    """Delete nodes tagged with ``MIGRATION_SOURCE`` by an earlier run."""
    # Price rules are removed before the room types they hang off.
    for label in ("ResourcePriceRule", "HotelRoomType"):
        target_graph.query(
            f"MATCH (n:{label}) WHERE n.migration_source=$source DETACH DELETE n",
            {"source": MIGRATION_SOURCE},
        )


def _index_target_hotels(target_graph) -> dict[str, dict[str, Any]]:
    """Map ``compact(name)`` to ``{"hotel_id", "name"}`` for target hotels."""
    index: dict[str, dict[str, Any]] = {}
    hotel_rows = target_graph.query("MATCH (h:Hotel) RETURN h.hotel_id, h.name").result_set
    for hotel_id, name in hotel_rows:
        key = compact(name)
        if not key:
            # A missing or punctuation-only name would otherwise be stored
            # under "" and capture every nameless source hotel.
            continue
        # When two hotels share a key, the later row wins.
        index[key] = {"hotel_id": hotel_id, "name": name}
    return index


def _migrated_id(prefix: str, id_key: str, props: dict[str, Any]) -> str:
    """Return the target-side id: the source id (or a stable hash) plus ``-2_0``."""
    # str() guards stable_id, which encodes its text, against a non-string
    # natural_key.
    fallback_text = str(props.get("natural_key") or repr(props))
    old_id = str(props.get(id_key) or stable_id(prefix, fallback_text))
    return f"{old_id}-2_0"


def _create_room_type(
    target_graph, target: dict[str, Any], room_props: dict[str, Any], new_room_id: str
) -> None:
    """Create one HotelRoomType node and link it to its target Hotel."""
    payload = {
        **room_props,
        "room_id": new_room_id,
        "hotel_id": target["hotel_id"],
        "hotel_name": target["name"],
        "name": f"{target['name']} - {room_props.get('room_name') or '房型'}",
        "migration_source": MIGRATION_SOURCE,
        "inventory_status": room_props.get("inventory_status") or "NEEDS_SUPPLIER_CONFIRM",
        "inventory_source": room_props.get("inventory_source") or "supplier_pending",
        "requires_supplier_confirm": "true",
        "supplier_confirm_notes": "房态/余房未接入实时库存,报价前需供应商确认。",
        "updated_at": UPDATED_AT,
    }
    target_graph.query(
        "CREATE (room:HotelRoomType) SET room += $props",
        {"props": safe_props(payload)},
    )
    target_graph.query(
        """
        MATCH (h:Hotel {hotel_id:$hotel_id}), (room:HotelRoomType {room_id:$room_id})
        MERGE (h)-[:HOTEL_HAS_ROOM_TYPE {source:$source}]->(room)
        """,
        {"hotel_id": target["hotel_id"], "room_id": new_room_id, "source": MIGRATION_SOURCE},
    )


def _create_price_rule(
    target_graph,
    target: dict[str, Any],
    room_props: dict[str, Any],
    price_props: dict[str, Any],
    new_room_id: str,
    new_rate_id: str,
) -> None:
    """Create one ResourcePriceRule node and link it to its room type."""
    payload = {
        **price_props,
        "price_rule_id": new_rate_id,
        "target_type": "HotelRoomType",
        "target_id": new_room_id,
        "target_name": f"{target['name']} - {room_props.get('room_name') or ''}".strip(),
        "resource_type": "Hotel",
        "resource_name": target["name"],
        "hotel_id": target["hotel_id"],
        "hotel_name": target["name"],
        "room_id": new_room_id,
        "room_name": room_props.get("room_name") or price_props.get("room_name") or "",
        # NOTE(review): plain truthiness test -- if simulated_price is stored
        # as the string "false" it still counts as set. Confirm the source
        # schema.
        "requires_supplier_confirm": "true" if price_props.get("simulated_price") else "false",
        "migration_source": MIGRATION_SOURCE,
        "updated_at": UPDATED_AT,
    }
    target_graph.query(
        "CREATE (price:ResourcePriceRule) SET price += $props",
        {"props": safe_props(payload)},
    )
    target_graph.query(
        """
        MATCH (room:HotelRoomType {room_id:$room_id}), (price:ResourcePriceRule {price_rule_id:$price_rule_id})
        MERGE (room)-[:ROOM_TYPE_HAS_PRICE_RULE {source:$source}]->(price)
        """,
        {"room_id": new_room_id, "price_rule_id": new_rate_id, "source": MIGRATION_SOURCE},
    )


def _refresh_hotel_summaries(target_graph) -> None:
    """Cache a short room and price summary on every Hotel that has room types."""
    # Cache a short room summary on Hotel for fast customer-service answers.
    summary_rows = target_graph.query(
        """
        MATCH (h:Hotel)-[:HOTEL_HAS_ROOM_TYPE]->(room:HotelRoomType)
        OPTIONAL MATCH (room)-[:ROOM_TYPE_HAS_PRICE_RULE]->(price:ResourcePriceRule)
        RETURN h.hotel_id, collect(DISTINCT room.room_name), collect(DISTINCT price.price_text)
        """
    ).result_set
    for hotel_id, rooms, prices in summary_rows:
        # Empty entries are dropped first; then at most 6 room names and
        # 4 price texts are kept.
        room_summary = "、".join([str(x) for x in rooms if x][:6])
        price_summary = ";".join([str(x) for x in prices if x][:4])
        target_graph.query(
            """
            MATCH (h:Hotel {hotel_id:$hotel_id})
            SET h.room_type_summary=$room_summary,
                h.room_price_summary=$price_summary,
                h.requires_supplier_confirm='true',
                h.supplier_confirm_notes='房态、余房、周末/节假日价需供应商二次确认'
            """,
            {"hotel_id": hotel_id, "room_summary": room_summary, "price_summary": price_summary},
        )
|
||
|
||
|
||
def main() -> None:
    """Run all patches against the target graph and print a JSON report.

    Connects to FalkorDB on ``localhost:6380``, patches meal fields and scenic
    fee hints in the target graph, migrates hotel room types and prices from
    the source graph, then prints the collected stats together with four
    verification counts read back from the target graph.
    """
    client = FalkorDB(host="localhost", port=6380)
    source_graph = client.select_graph(SOURCE_GRAPH)
    target_graph = client.select_graph(TARGET_GRAPH)

    meal_days = patch_meal_fields(target_graph)
    fee_items = patch_fee_status(target_graph)
    stats = {
        "meal_days_patched": meal_days,
        "scenic_fee_items_patched": fee_items,
    }
    stats.update(migrate_hotel_rooms(source_graph, target_graph))

    # Report key -> count query, run in this order after all writes are done.
    count_queries = {
        "HotelRoomType": "MATCH (n:HotelRoomType) RETURN count(n)",
        "ResourcePriceRule": "MATCH (n:ResourcePriceRule) RETURN count(n)",
        "ProductDayWithMealPlan": "MATCH (d:ProductDay) WHERE d.meal_plan IS NOT NULL RETURN count(d)",
        "TravelItemWithStatusHint": "MATCH (i:TravelItem) WHERE i.default_status_hint IS NOT NULL RETURN count(i)",
    }
    verification = {
        label: target_graph.query(cypher).result_set[0][0]
        for label, cypher in count_queries.items()
    }

    report = {"stats": stats, "verification": verification}
    print(json.dumps(report, ensure_ascii=False, indent=2))
|
||
|
||
|
||
# Entry point: run the migration only when this file is executed directly.
if __name__ == "__main__":
    main()
|