Files
bxh/scripts/publish_travel_agency_2_3_schema.py

330 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import shutil
from copy import deepcopy
from pathlib import Path
import psycopg
from psycopg.rows import dict_row
from psycopg.types.json import Jsonb
from app.config import settings
from common_paths import PROJECT_ROOT, TRAVEL_KG_EXPORT_ROOT
# Tenant/project identifiers used by publish_schema() to scope the
# ontology_schemas and graph_releases rows it updates.
PROJECT_ID = "travel_agency_2_0_test"
TENANT_ID = "travel_agency"
# Value written to graph_releases.graph_name when the active release is repointed.
GRAPH_NAME = "travel_agency_2_0_test"
# Namespace stored with the published schema row; also the fallback namespace
# used by schema_to_dsl() when the payload carries none.
NAMESPACE = "travel_agency_2_0"
# Directory holding the schema JSON / DSL files this script reads and rewrites.
SCHEMA_DIR = PROJECT_ROOT / "schema搭建/travel_agency_2_0_test"
# Export directory that receives a copy of every generated schema file and the README.
DOWNLOAD_DIR = TRAVEL_KG_EXPORT_ROOT / "travel_agency_2_0_test_旅行社2.0测试"
# Baseline schema loaded by build_schema(); write_schema_files() overwrites the same path.
CURRENT_JSON = SCHEMA_DIR / "travel_agency_2_0_schema.current.json"
def add_fields(entity: dict, fields: list[str], after: str | None = None) -> None:
    """Add missing field names to ``entity["fields"]`` in place.

    Args:
        entity: Entity definition dict; its ``"fields"`` entry is replaced
            with a new list (a missing or falsy entry is treated as empty).
        fields: Field names to add. Names already present, and repeats
            within ``fields`` itself, are skipped.
        after: Existing field name to insert directly behind. When it is
            falsy or not present, the new names go at the end.
    """
    existing = list(entity.get("fields") or [])

    if after and after in existing:
        position = existing.index(after) + 1
    else:
        position = len(existing)

    # Collect the genuinely new names first, keeping their given order.
    additions: list[str] = []
    for name in fields:
        if name in existing or name in additions:
            continue
        additions.append(name)

    entity["fields"] = existing[:position] + additions + existing[position:]
def schema_to_dsl(payload: dict) -> str:
    """Render a schema payload as a fenced ``text`` DSL block.

    The output lists the namespace and version, one ``entity`` stanza per
    entry of ``entity_types`` (optional primary key, then one ``property``
    line per field), one ``relation`` stanza per entry of ``relation_types``,
    and finally the design principles, revision notes and query recipes.

    Collection-valued keys (``entity_types``, ``relation_types``, ``fields``,
    ``properties``, ``design_principles``, ``revision_notes``,
    ``query_recipes``) may be missing or ``None``; both are treated as empty,
    matching how the rest of this file reads schema collections.

    Args:
        payload: Schema dictionary, e.g. the result of ``build_schema()``.

    Returns:
        The DSL document as a single string ending with a newline.
    """
    # Only fall back to the module-level namespace when the key is absent.
    namespace = payload["namespace"] if "namespace" in payload else NAMESPACE

    lines: list[str] = [
        "```text",
        f"namespace {namespace}",
        f"version {payload.get('version', '')}",
        "",
        "// 2.3：采纳最终意见中的 POI 拆分、scenic_id 主景区挂载、Activity 纠偏；保留关系属性绑定，不恢复 Binding 实体。",
    ]

    for name, meta in (payload.get("entity_types") or {}).items():
        lines.append("")
        lines.append(f"entity {name} // {meta.get('definition', '')}")
        if meta.get("primary_key"):
            lines.append(f" primary_key {meta['primary_key']}")
        # ``or []`` tolerates an explicit null coming from the JSON file.
        lines.extend(f" property {field}: Text" for field in meta.get("fields") or [])

    for name, meta in (payload.get("relation_types") or {}).items():
        lines.append("")
        lines.append(
            f"relation {name}: {meta.get('from', '')} -> {meta.get('to', '')} // {meta.get('definition', '')}"
        )
        lines.extend(f" property {prop}: Text" for prop in meta.get("properties") or [])

    lines.extend(["", "// design_principles"])
    lines.extend(f"- {item}" for item in payload.get("design_principles") or [])

    lines.extend(["", "// revision_notes"])
    lines.extend(f"- {item}" for item in payload.get("revision_notes") or [])

    lines.extend(["", "// query_recipes"])
    for name, query in (payload.get("query_recipes") or {}).items():
        lines.append(f"{name}:")
        lines.append(query)

    lines.append("```")
    return "\n".join(lines) + "\n"
def build_schema() -> dict:
    """Load the current schema JSON and return it upgraded to version 2.3.

    The baseline is read from ``CURRENT_JSON``. The returned dict has its
    version, display name and purpose replaced, four enum lists set, extra
    fields added to several entity types, three entity definitions rewritten,
    binding properties added to the two default-resource relations, the
    ``ATTRACTION_HAS_ITEM`` relation updated, and the query recipes, design
    principles, out-of-scope list and revision notes replaced.

    Raises:
        KeyError: if the baseline lacks one of the entity or relation types
            that are indexed directly below.
    """
    baseline_text = CURRENT_JSON.read_text(encoding="utf-8")
    schema = deepcopy(json.loads(baseline_text))

    # --- headline metadata -------------------------------------------------
    schema["version"] = "2.3"
    schema["display_name"] = "旅行社2.3固定线路 POI 与主景区费用图谱 Schema"
    purpose_parts = [
        "面向旅行社固定线路与小包团销售路线固定、站点有序Hotel/Restaurant 为独立 POI",
        "TravelItem 仅承载非 POI 服务/费用；景区费用项统一用 scenic_id 挂到主景区；",
        "默认/必付/可选资源状态继续放在 RouteStop/Product 到资源的关系属性中，不恢复 ItineraryResourceBinding 实体；",
        "行政区继续属性化，同时预留 region_id避免行政区层级把图谱撑乱。",
    ]
    schema["purpose"] = "".join(purpose_parts)

    # --- enums ---------------------------------------------------------------
    enums = schema.setdefault("enums", {})
    enums.update(
        {
            "TravelItem.type": [
                "Ticket",
                "ScenicTransport",
                "Insurance",
                "Vehicle",
                "GiftService",
                "SmallGroupPackage",
                "Shopping",
                "Service",
                "VideoGuide",
                "Guide",
                "Pickup",
                "Other",
            ],
            "ResourceRefType": ["Hotel", "Restaurant", "TravelItem"],
            "ResourceStatus": ["INCLUDED", "OPTIONAL", "MANDATORY"],
            "RouteStop.business_filter": ["ROUTE_NODE", "RESOURCE_ONLY", "TEXT_NOTE"],
        }
    )

    # --- entity types --------------------------------------------------------
    entity_types = schema.setdefault("entity_types", {})
    # (entity name, fields to add, existing field to insert after)
    field_additions = (
        ("ScenicAttraction", ["short_name", "region_id"], "name"),
        ("SubAttraction", ["parent_attraction_id"], "parent_attraction_name"),
        (
            "RouteStop",
            [
                "scenic_id",
                "main_attraction",
                "main_attraction_id",
                "sub_attractions",
                "region_text",
                "business_filter",
            ],
            "stop_type",
        ),
        ("Hotel", ["region_id", "geo"], "region_name"),
        ("Restaurant", ["region_id", "geo"], "region_name"),
        (
            "TravelItem",
            ["scenic_id", "raw_evidence", "original_type", "normalized_type_reason"],
            "scenic_name",
        ),
    )
    for entity_name, new_fields, anchor in field_additions:
        add_fields(entity_types[entity_name], new_fields, after=anchor)

    new_definitions = {
        "TravelItem": (
            "非 POI 的统一服务/费用资源。门票、小交通、保险、景区二消、车辆、赠送服务、小包团套餐等进入该类型；"
            "景区相关费用必须优先填写 scenic_id 并挂主景区Hotel/Restaurant 不放入 TravelItem。"
        ),
        "Hotel": "独立酒店 POI。酒店有地址、经纬度、图片、星级、房型和淡旺季/周末价格，可跨多条线路复用。",
        "Restaurant": "独立餐厅 POI。餐厅有地址、菜系、餐标、容量和图片可跨线路复用并作为行程用餐节点资源。",
    }
    for entity_name, definition in new_definitions.items():
        entity_types[entity_name]["definition"] = definition

    # --- relation types ------------------------------------------------------
    relation_types = schema.setdefault("relation_types", {})
    binding_props = ("binding_id", "product_id", "day_index", "route_stop_id")
    for rel_name in ("STOP_USES_DEFAULT_RESOURCE", "PRODUCT_USES_DEFAULT_RESOURCE"):
        relation = relation_types[rel_name]
        rel_props = list(relation.get("properties") or [])
        for prop in binding_props:
            if prop in rel_props:
                continue
            # Each missing name is pushed to the front, so names added later
            # in this loop end up ahead of those added earlier.
            rel_props[:0] = [prop]
        relation["properties"] = rel_props

    has_item = relation_types["ATTRACTION_HAS_ITEM"]
    has_item["definition"] = (
        "主景区拥有的非 POI 服务/费用项，例如门票、小交通、保险、二消、视频讲解。费用项只挂主景区，不挂子景点。"
    )
    item_props = list(has_item.get("properties") or [])
    missing_item_props = [
        prop
        for prop in ("scenic_id", "raw_evidence", "normalized_type_reason")
        if prop not in item_props
    ]
    relation_types["ATTRACTION_HAS_ITEM"]["properties"] = item_props + missing_item_props

    # --- documentation sections ----------------------------------------------
    schema["query_recipes"] = {
        "route_bus_like_view": "MATCH (p:TourProduct)-[:PRODUCT_HAS_DAY]->(d:ProductDay)-[:DAY_HAS_STOP]->(s:RouteStop) WHERE p.product_id=$product_id AND coalesce(s.business_filter,'ROUTE_NODE')='ROUTE_NODE' RETURN p,d,s ORDER BY d.day_no,s.order_no",
        "default_resources_for_stop": "MATCH (s:RouteStop)-[r:STOP_USES_DEFAULT_RESOURCE]->(resource) WHERE s.stop_id=$stop_id RETURN s,r,labels(resource) AS resource_labels,resource ORDER BY r.default_rank",
        "nearby_hotels_restaurants": "MATCH (s:RouteStop)-[:STOP_VISITS_ATTRACTION]->(a:ScenicAttraction)-[near:ATTRACTION_NEARBY_RESOURCE]->(resource) WHERE s.stop_id=$stop_id AND near.resource_type IN $types RETURN a,near,labels(resource) AS resource_labels,resource ORDER BY near.is_partner DESC, near.driving_minutes ASC, near.rank ASC",
        "scenic_fee_items_by_scenic_id": "MATCH (a:ScenicAttraction)-[r:ATTRACTION_HAS_ITEM]->(item:TravelItem) WHERE a.attraction_id=$scenic_id OR item.scenic_id=$scenic_id RETURN a,r,item ORDER BY item.type,item.name",
        "route_resource_bindings": "MATCH (p:TourProduct)-[:PRODUCT_HAS_DAY]->(d:ProductDay)-[:DAY_HAS_STOP]->(s:RouteStop)-[r:STOP_USES_DEFAULT_RESOURCE]->(resource) WHERE p.product_id=$product_id RETURN p,d,s,r,labels(resource) AS resource_labels,resource ORDER BY d.day_no,s.order_no,r.default_rank",
        "hotel_room_price_rules": "MATCH (hotel:Hotel)-[:HOTEL_HAS_ROOM_TYPE]->(room:HotelRoomType)-[r:ROOM_TYPE_HAS_PRICE_RULE]->(price:ResourcePriceRule) WHERE hotel.hotel_id=$hotel_id RETURN hotel,room,r,price ORDER BY room.room_name,price.season,price.event_type",
        "restaurant_price_rules": "MATCH (restaurant:Restaurant)-[r:RESTAURANT_HAS_PRICE_RULE]->(price:ResourcePriceRule) WHERE restaurant.restaurant_id=$restaurant_id RETURN restaurant,r,price ORDER BY price.event_type,price.date_range",
        "service_price_rules": "MATCH (item:TravelItem)-[r:ITEM_HAS_PRICE_RULE]->(price:ResourcePriceRule) WHERE item.item_id=$item_id RETURN item,r,price ORDER BY price.event_type,price.date_range",
    }
    schema["design_principles"] = [
        "路线像公交线路TourProduct -> ProductDay -> RouteStop -> ScenicAttraction站点顺序固定不让推荐逻辑随意改线。",
        "POI 与费用分层Hotel、Restaurant、ScenicAttraction/SubAttraction 是有物理位置的 POITicket、ScenicTransport、Insurance、Vehicle、GiftService、SmallGroupPackage 等是 TravelItem。",
        "Activity 纠偏：扶梯、观光车、电瓶车、索道、游船等历史上误分到 Activity 的项目统一归一为 ScenicTransport非交通体验可放 Service/GiftService/Shopping。",
        "费用挂主景区Ticket、ScenicTransport、Insurance、景区二消等 TravelItem 必须填 scenic_id且只挂主景区不挂子景点。",
        "默认配置轻量化：不恢复 ItineraryResourceBinding 实体；默认/必付/可选状态由 STOP_USES_DEFAULT_RESOURCE/PRODUCT_USES_DEFAULT_RESOURCE 的关系属性表达。",
        "资源替换路径：住宿/餐饮的替换推荐走 ScenicAttraction -> ATTRACTION_NEARBY_RESOURCE -> Hotel/Restaurant。",
        "行政区轻量化：不恢复 AdministrativeRegion 实体province/city/county/town/region_name/region_id 作为属性用于过滤和后续数据补全。",
        "RouteStop 清洁：小吃街、夜市、长桌宴、餐馆等资源型文本不应进入路线站点；只保留出发/城市/景区/交通/返程等路线节点。",
        "价格规则统一TravelItem、HotelRoomType、Restaurant 的价格都通过 ResourcePriceRule 表达。",
        "数据可追溯：保留 source_file/source_row/raw_evidence/data_quality方便服务商补正。",
    ]
    schema["not_in_scope_for_2_3"] = [
        "CustomerSelection 暂不入图谱；客户最终选择放订单/会话侧。",
        "AdministrativeRegion 暂不作为实体层级；当前用 region_id 和行政区属性过滤。",
        "ItineraryResourceBinding 暂不作为实体；当前用关系属性表达绑定，后期订单级选择复杂后再升级。",
        "SubAttraction 不承接费用项；费用项统一 scenic_id 挂主景区。",
        "Inventory/InventorySnapshot 暂不作为实体；当前仅标记 inventory_status/requires_supplier_confirm。",
    ]
    schema["revision_notes"] = [
        "采纳 Hotel/Restaurant 独立 POI并保留 2.2 的 HotelRoomType/RestaurantPriceRule 设计。",
        "采纳 fee 加 scenic_id景区费用项通过 ATTRACTION_HAS_ITEM 挂主景区。",
        "采纳 Activity 纠偏：删除 TravelItem.type 中的 Activity交通/游览类二消统一使用 ScenicTransport。",
        "部分采纳 Binding 意见：不恢复 ItineraryResourceBinding 实体，但在默认资源关系上增加 binding_id/resource_type/resource_id/product_id/day_index/route_stop_id。",
        "不采纳 AdministrativeRegion 实体化：当前业务优先查询路线、景区、酒店、餐饮和费用，行政区作为属性更利于减少关系数量。",
        "数据管线下一步需要输出 route_product/day/stop、fee_item、hotel_poi、restaurant_poi、scenic_items_summary 等模板文件。",
    ]
    return schema
def write_schema_files(schema: dict) -> dict[str, str]:
    """Write the schema as JSON and DSL files and mirror them for download.

    Six files are written under ``SCHEMA_DIR`` (the "current", "v2_3" and
    "v2_3.original_sample" variants, each as ``.json`` and ``.dsl.md``) and
    every one is copied into ``DOWNLOAD_DIR``. A short ``README.md`` is then
    written to both directories.

    Args:
        schema: Schema dictionary to serialise.

    Returns:
        Mapping from a short key (``current_json``, ``v_dsl``, ...) to the
        path of the corresponding file under ``SCHEMA_DIR``, as a string.
    """
    for directory in (SCHEMA_DIR, DOWNLOAD_DIR):
        directory.mkdir(parents=True, exist_ok=True)

    # key prefix -> variant tag embedded in the file name
    variants = (
        ("current", "current"),
        ("v", "v2_3"),
        ("sample", "v2_3.original_sample"),
    )
    paths = {}
    for prefix, tag in variants:
        paths[f"{prefix}_json"] = SCHEMA_DIR / f"travel_agency_2_0_schema.{tag}.json"
        paths[f"{prefix}_dsl"] = SCHEMA_DIR / f"travel_agency_2_0_schema.{tag}.dsl.md"

    json_text = json.dumps(schema, ensure_ascii=False, indent=2)
    dsl_text = schema_to_dsl(schema)

    for target in paths.values():
        # ".dsl.md" files have suffix ".md", so they receive the DSL text.
        if target.suffix == ".json":
            content = json_text
        else:
            content = dsl_text
        target.write_text(content, encoding="utf-8")
        shutil.copy2(target, DOWNLOAD_DIR / target.name)

    readme_lines = [
        "# 旅行社2.0测试 Schema",
        "当前 active 版本：`2.3`。",
        "2.3 采纳内容：",
        "- `Hotel`、`Restaurant` 独立 POI。",
        "- `TravelItem` 只保留非 POI 服务/费用。",
        "- `TravelItem.scenic_id` 挂主景区，景区费用项走 `ATTRACTION_HAS_ITEM`。",
        "- `Activity` 不再作为核心类型；扶梯/观光车/电瓶车/索道/游船统一归 `ScenicTransport`。",
        "- 默认资源绑定继续走关系属性，增加 `binding_id/resource_type/resource_id/product_id/day_index/route_stop_id`。",
        "- 行政区不单独建实体，保留 `region_id` 与行政区属性。",
    ]
    readme = "\n".join(readme_lines) + "\n"
    for directory in (SCHEMA_DIR, DOWNLOAD_DIR):
        (directory / "README.md").write_text(readme, encoding="utf-8")

    return {key: str(target) for key, target in paths.items()}
def publish_schema(schema: dict) -> int:
    """Insert the schema as the new active ontology row and repoint the release.

    Four statements run on one connection and are committed together:

    1. existing ``active`` rows of this tenant/project in ``ontology_schemas``
       are set to ``published``;
    2. a new ``active`` row holding ``schema`` is inserted;
    3. the ``graph_releases`` row with alias ``active`` is pointed at the new
       schema id and its metadata is merged with the schema version/id;
    4. the import template row is inserted, or updated on conflict.

    Args:
        schema: Schema dictionary; ``display_name``, ``purpose``,
            ``entity_types`` and ``relation_types`` are read from it.

    Returns:
        The ``id`` returned by the insert into ``ontology_schemas``.
    """
    with psycopg.connect(settings.database_url, row_factory=dict_row) as conn:
        with conn.cursor() as cur:
            # 1. Retire whatever is currently active for this tenant/project.
            retire_sql = (
                "\n"
                f"UPDATE {settings.db_schema}.ontology_schemas\n"
                "SET status='published', updated_at=now()\n"
                "WHERE tenant_id=%s AND project_id=%s AND status='active'\n"
            )
            cur.execute(retire_sql, (TENANT_ID, PROJECT_ID))

            # 2. Insert the new active schema row and capture its id.
            insert_sql = (
                "\n"
                f"INSERT INTO {settings.db_schema}.ontology_schemas (\n"
                "tenant_id, project_id, namespace, version, display_name, description,\n"
                "status, schema_jsonb, created_by, published_by, published_at, updated_at\n"
                ")\n"
                "VALUES (%s,%s,%s,%s,%s,%s,'active',%s,%s,%s,now(),now())\n"
                "RETURNING id\n"
            )
            insert_params = (
                TENANT_ID,
                PROJECT_ID,
                NAMESPACE,
                5,
                schema["display_name"],
                schema["purpose"],
                Jsonb(schema),
                "codex",
                "codex",
            )
            cur.execute(insert_sql, insert_params)
            inserted_row = cur.fetchone()
            schema_id = inserted_row["id"]

            # 3. Point the active graph release at the new schema.
            release_sql = (
                "\n"
                f"UPDATE {settings.db_schema}.graph_releases\n"
                "SET schema_id=%s,\n"
                "graph_release_id=%s,\n"
                "graph_name=%s,\n"
                "status='active',\n"
                "activated_at=now(),\n"
                "updated_at=now(),\n"
                "metadata_jsonb = coalesce(metadata_jsonb, '{}'::jsonb) || %s::jsonb\n"
                "WHERE tenant_id=%s AND project_id=%s AND alias='active'\n"
            )
            release_params = (
                schema_id,
                "travel_agency_2_0_test_v2_3",
                GRAPH_NAME,
                Jsonb({"schema_version": "2.3", "schema_id": schema_id}),
                TENANT_ID,
                PROJECT_ID,
            )
            cur.execute(release_sql, release_params)

            # 4. Upsert the import template that mirrors the schema's type names.
            template_sql = (
                "\n"
                f"INSERT INTO {settings.db_schema}.import_templates (\n"
                "template_id, version, display_name, primary_entity, template_jsonb, status, updated_at\n"
                ")\n"
                "VALUES (%s, 4, %s, 'TourProduct', %s, 'active', now())\n"
                "ON CONFLICT (template_id, version) DO UPDATE\n"
                "SET display_name=EXCLUDED.display_name,\n"
                "primary_entity=EXCLUDED.primary_entity,\n"
                "template_jsonb=EXCLUDED.template_jsonb,\n"
                "status=EXCLUDED.status,\n"
                "updated_at=now()\n"
            )
            template_payload = {
                "schema_version": "2.3",
                "entity_types": list(schema["entity_types"].keys()),
                "relation_types": list(schema["relation_types"].keys()),
                "notes": "Hotel/Restaurant 独立 POITravelItem 费用项必须挂 scenic_id默认配置关系携带 binding_id/resource_type/resource_id。",
            }
            template_params = (
                "travel_agency_2_0_fixed_route_nearby_v1",
                "旅行社2.3固定线路 POI 与主景区费用导入模板",
                Jsonb(template_payload),
            )
            cur.execute(template_sql, template_params)
        conn.commit()
    return schema_id
def main() -> None:
    """Build the 2.3 schema, write its files, publish it and print a summary.

    The summary is printed to stdout as indented JSON with the new schema id,
    the schema version, entity and relation type names, the
    ``TravelItem.type`` enum values and the written file paths.
    """
    schema = build_schema()
    written_files = write_schema_files(schema)
    new_schema_id = publish_schema(schema)

    summary = {
        "schema_id": new_schema_id,
        "schema_version": schema["version"],
        "entities": list(schema["entity_types"].keys()),
        "relations": list(schema["relation_types"].keys()),
        "travel_item_types": schema["enums"]["TravelItem.type"],
        "files": written_files,
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()