# File: bxh/scripts/publish_travel_agency_2_4_schema.py
# (363 lines, 15 KiB, Python)
#
# Repository viewer warning: this file contains ambiguous Unicode characters,
# i.e. characters that might be confused with other characters. If this is
# intentional, the warning can be safely ignored; the viewer's Escape button
# reveals them.

from __future__ import annotations
import json
import shutil
from copy import deepcopy
from pathlib import Path
import psycopg
from psycopg.rows import dict_row
from psycopg.types.json import Jsonb
from app.config import settings
from common_paths import PROJECT_ROOT, TRAVEL_KG_EXPORT_ROOT
# Identifiers used to scope the ontology_schemas / graph_releases rows that
# publish_schema() updates and inserts.
PROJECT_ID = "travel_agency_2_0_test"
TENANT_ID = "travel_agency"
# Written into graph_releases.graph_name; same literal value as PROJECT_ID.
GRAPH_NAME = "travel_agency_2_0_test"
# Default namespace for the DSL header and the value stored in ontology_schemas.namespace.
NAMESPACE = "travel_agency_2_0"
# Directory holding the schema JSON / DSL files that this script reads and rewrites.
SCHEMA_DIR = PROJECT_ROOT / "schema搭建/travel_agency_2_0_test"
# Export directory that receives a copy of every schema file written.
DOWNLOAD_DIR = TRAVEL_KG_EXPORT_ROOT / "travel_agency_2_0_test_旅行社2.0测试"
# Input for build_schema(); write_schema_files() writes the new schema back to
# this same path (its "current_json" entry).
CURRENT_JSON = SCHEMA_DIR / "travel_agency_2_0_schema.current.json"
def add_fields(entity: dict, fields: list[str], after: str | None = None) -> None:
    """Merge ``fields`` into ``entity["fields"]`` without creating duplicates.

    New names are inserted, in the order given, directly behind the ``after``
    field when that field is present; otherwise they are appended at the end.
    Names that already exist keep their current position. The entity is
    updated in place with a fresh list.

    Args:
        entity: Entity definition dict; a missing or falsy ``"fields"`` entry
            is treated as an empty list.
        fields: Field names to add.
        after: Optional anchor field name that the new names should follow.
    """
    merged = list(entity.get("fields") or [])
    # Start right behind the anchor if it exists, else at the tail.
    cursor = merged.index(after) + 1 if after and after in merged else len(merged)
    for name in fields:
        if name in merged:
            continue
        merged.insert(cursor, name)
        # Advance so successive additions keep their relative order.
        cursor += 1
    entity["fields"] = merged
def schema_to_dsl(payload: dict) -> str:
    """Render a schema payload as a fenced, line-oriented DSL text.

    The output opens with a ```text fence, the namespace (defaulting to the
    module-level ``NAMESPACE``), the version, and a fixed header comment. It
    then lists every entity type (with optional primary key and its fields),
    every relation type (with its properties), the design principles, the
    revision notes and the query recipes, and closes the fence. Every
    field/property is emitted with the type ``Text``.

    Args:
        payload: Schema dict; all sections are optional and default to empty.

    Returns:
        The DSL document as a single string terminated by a newline.
    """
    out: list[str] = [
        "```text",
        f"namespace {payload.get('namespace', NAMESPACE)}",
        f"version {payload.get('version', '')}",
        "",
        "// 2.4:面向客服快速问答,保留固定线路/nearby 资源主干,新增轻量 BusinessRule 规则层。",
    ]

    # Entities: a blank separator, the header line, then key and fields.
    for entity_name, spec in payload.get("entity_types", {}).items():
        out += ["", f"entity {entity_name} // {spec.get('definition', '')}"]
        if spec.get("primary_key"):
            out.append(f" primary_key {spec['primary_key']}")
        out.extend(f" property {field}: Text" for field in spec.get("fields", []))

    # Relations: a blank separator, the "from -> to" header, then properties.
    for relation_name, spec in payload.get("relation_types", {}).items():
        header = (
            f"relation {relation_name}: {spec.get('from', '')} -> {spec.get('to', '')} // {spec.get('definition', '')}"
        )
        out += ["", header]
        out.extend(f" property {prop}: Text" for prop in spec.get("properties", []))

    # Free-text bullet sections.
    out += ["", "// design_principles"]
    out.extend(f"- {principle}" for principle in payload.get("design_principles", []))
    out += ["", "// revision_notes"]
    out.extend(f"- {note}" for note in payload.get("revision_notes", []))

    # Recipes are written as a "name:" line followed by the raw query text.
    out += ["", "// query_recipes"]
    for recipe_name, query in payload.get("query_recipes", {}).items():
        out += [f"{recipe_name}:", query]

    out.append("```")
    return "\n".join(out) + "\n"
def build_schema() -> dict:
    """Build the version 2.4 schema dict on top of the current schema JSON.

    Reads ``CURRENT_JSON``, then sets version/display name/purpose, registers
    the ``BusinessRule`` enums, adds customer-service fields to existing
    entity types via ``add_fields``, defines the ``BusinessRule`` entity and
    the ``RULE_APPLIES_TO`` relation, replaces the design principles and
    revision notes, and merges new query recipes over the existing ones.

    Returns:
        The updated schema dict.

    Raises:
        KeyError: If one of the entity types that receives extra fields is
            absent from the current schema.
    """
    raw_text = CURRENT_JSON.read_text(encoding="utf-8")
    schema = deepcopy(json.loads(raw_text))

    # --- Top-level metadata -------------------------------------------------
    schema["version"] = "2.4"
    schema["display_name"] = "旅行社2.4客服固定线路资源图谱 Schema"
    schema["purpose"] = "".join(
        [
            "面向旅行社客服与小包团销售固定路线像公交线路一样可查RouteStop 串联景区;",
            "Hotel/Restaurant 作为 POI 通过 nearby 推荐;门票、小交通、保险、车辆等非 POI 放 TravelItem",
            "客服高频的退改、付款、成团、老人儿童、车型和费用包含通过轻量 BusinessRule 查询。",
        ]
    )

    # --- Enumerations for the BusinessRule entity ---------------------------
    schema.setdefault("enums", {}).update(
        {
            "BusinessRule.rule_type": [
                "InquiryIntake",
                "ProductRecommendation",
                "Payment",
                "Refund",
                "GroupSize",
                "VehicleCapacity",
                "HotelPolicy",
                "MealPolicy",
                "TicketDiscount",
                "ScenicFee",
                "ElderChildCare",
                "ShoppingPromise",
                "ServicePromise",
                "RiskNotice",
                "SupplierConfirmation",
                "Other",
            ],
            "BusinessRule.scope": [
                "Global",
                "Product",
                "ScenicAttraction",
                "TravelItem",
                "Hotel",
                "Restaurant",
            ],
        }
    )

    # --- Extra fields on existing entities ----------------------------------
    # Each row: (entity type, anchor field the new fields follow, new fields).
    field_additions = (
        (
            "TourProduct",
            "group_size_max",
            [
                "sales_priority",
                "supported_customer_scenarios",
                "group_size_rule_text",
                "deposit_policy",
                "payment_policy",
                "refund_policy",
                "meal_policy",
                "shopping_policy",
                "service_promise",
                "elder_child_policy",
                "quote_required_fields",
            ],
        ),
        (
            "ProductDay",
            "meal_plan",
            [
                "breakfast_included",
                "lunch_included",
                "dinner_included",
                "meal_status",
                "hotel_area",
                "hotel_grade_default",
                "customer_day_summary",
            ],
        ),
        (
            "RouteStop",
            "route_display_text",
            [
                "customer_visible",
                "can_replace_resource",
                "walk_note",
                "elderly_note",
                "child_note",
                "expense_summary",
            ],
        ),
        (
            "ScenicAttraction",
            "free_ticket_policy",
            [
                "must_pay_items_summary",
                "optional_items_summary",
                "reservation_required",
                "id_card_required",
            ],
        ),
        (
            "Hotel",
            "breakfast_policy",
            [
                "has_elevator",
                "low_floor_available",
                "family_room_available",
                "room_count_policy",
                "booking_note",
            ],
        ),
        (
            "Restaurant",
            "meal_type",
            [
                "meal_standard_text",
                "suitable_group_size",
                "booking_note",
                "classification_status",
            ],
        ),
        (
            "TravelItem",
            "normalized_type_reason",
            [
                "customer_visible",
                "item_scope",
                "default_status_hint",
                "status_source",
                "dedupe_key",
                "canonical_item_id",
            ],
        ),
    )
    entity_types = schema.setdefault("entity_types", {})
    for entity_name, anchor, new_fields in field_additions:
        add_fields(entity_types[entity_name], new_fields, after=anchor)

    # --- New BusinessRule entity and its relation ---------------------------
    entity_types["BusinessRule"] = {
        "definition": "客服高频业务规则与话术依据。只承载可复用规则,不把客户单次选择写进图谱。",
        "primary_key": "rule_id",
        "fields": [
            "rule_id",
            "rule_type",
            "scope",
            "target_type",
            "target_id",
            "title",
            "content",
            "customer_reply",
            "trigger_keywords",
            "applies_to_days",
            "applies_to_group_type",
            "priority",
            "source_file",
            "source_text",
            "data_quality",
            "updated_at",
        ],
    }
    relation_types = schema.setdefault("relation_types", {})
    relation_types["RULE_APPLIES_TO"] = {
        "from": "BusinessRule",
        "to": "TourProduct|ScenicAttraction|TravelItem|Hotel|Restaurant",
        "definition": "业务规则适用到产品、景区、费用项、酒店或餐厅。全局规则可只保留 target_type/target_id 属性,不强制连边。",
        "properties": ["target_type", "target_id", "scope", "priority", "source_file", "evidence_level"],
    }

    # --- Narrative sections (replaced wholesale) ----------------------------
    schema["design_principles"] = [
        "主干不变TourProduct -> ProductDay -> RouteStop -> ScenicAttraction路线固定且有序。",
        "附近资源不写死到产品Hotel/Restaurant 通过 ScenicAttraction -> ATTRACTION_NEARBY_RESOURCE 动态推荐。",
        "默认资源不污染资源本体RouteStop/Product 到资源的默认、包含、必付、可选状态仍放关系属性。",
        "非 POI 统一 TravelItem门票、小交通、保险、车辆、赠送服务、视频讲解等不再拆一堆实体。",
        "客服规则单独轻量化:退改、付款、成团、老人儿童、车型等放 BusinessRule避免在长文本里临时找答案。",
        "不引入 CustomerSelection客户选择属于订单/会话侧,暂不进入基础知识图谱。",
        "行政区继续属性化:当前业务按景区/线路/资源查询,行政区不再额外建层级关系。",
        "数据不确定必须标注:模拟价格、供应商待确认、政策冲突必须写 data_quality/requires_supplier_confirm。",
    ]
    schema["revision_notes"] = [
        "从 2.3 升级到 2.4,新增 BusinessRule 和 RULE_APPLIES_TO。",
        "补充 TourProduct/ProductDay/RouteStop 的客服展示字段,支撑住哪、吃几餐、是否可替换、费用关注点。",
        "补充 Hotel/Restaurant 的客服筛选字段,支撑电梯、低楼层、家庭房、餐标和团型适配。",
        "补充 TravelItem 的去重与状态提示字段,解决同一观光车/扶梯被多个文本口径重复抽取的问题。",
        "继续保留 HotelRoomType 和 ResourcePriceRule当前图谱数据中这两类节点为空需要后续从住宿资源库补入。",
    ]

    # --- Query recipes: keep existing ones, add/override the 2.4 recipes ----
    recipes = dict(schema.get("query_recipes", {}))
    recipes.update(
        {
            "customer_route_recommendation": "MATCH (p:TourProduct) WHERE toInteger(p.duration_days)=$days AND coalesce(p.small_group_supported,false)=true RETURN p ORDER BY toInteger(coalesce(p.sales_priority,'999')), toFloat(coalesce(p.base_price_min_adult,'999999')) LIMIT 5",
            "customer_day_detail": "MATCH (p:TourProduct)-[:PRODUCT_HAS_DAY]->(d:ProductDay)-[:DAY_HAS_STOP]->(s:RouteStop) WHERE p.product_id=$product_id RETURN p,d,s ORDER BY toInteger(d.day_no), toInteger(s.order_no)",
            "route_fee_summary": "MATCH (p:TourProduct)-[:PRODUCT_HAS_DAY]->(d:ProductDay)-[:DAY_HAS_STOP]->(s:RouteStop)-[:STOP_VISITS_ATTRACTION]->(a:ScenicAttraction)-[:ATTRACTION_HAS_ITEM]->(item:TravelItem) WHERE p.product_id=$product_id RETURN a.name,item.type,item.name,item.price,item.default_status_hint,item.mandatory_fee_policy ORDER BY a.name,item.type,item.price",
            "nearby_replaceable_resources": "MATCH (s:RouteStop)-[:STOP_VISITS_ATTRACTION]->(a:ScenicAttraction)-[near:ATTRACTION_NEARBY_RESOURCE]->(resource) WHERE s.stop_id=$stop_id AND labels(resource)[0] IN ['Hotel','Restaurant'] RETURN a,near,resource ORDER BY near.resource_type, toInteger(coalesce(near.rank,'999')) LIMIT 20",
            "vehicle_capacity_for_people": "MATCH (v:TravelItem) WHERE v.type='Vehicle' AND toInteger(coalesce(v.safe_passenger_capacity,v.seat_count,'0')) >= $people RETURN v ORDER BY toInteger(coalesce(v.safe_passenger_capacity,v.seat_count,'999')) LIMIT 5",
            "customer_service_rules": "MATCH (r:BusinessRule) WHERE r.rule_type IN $rule_types OR any(k IN split(coalesce(r.trigger_keywords,''),'|') WHERE $question CONTAINS k) RETURN r ORDER BY toInteger(coalesce(r.priority,'999')) LIMIT 10",
        }
    )
    schema["query_recipes"] = recipes
    return schema
def write_schema_files(schema: dict) -> dict[str, str]:
    """Write the schema as JSON and DSL files and copy them to the export directory.

    Six files are written under ``SCHEMA_DIR`` (current, v2_4 and
    v2_4.original_sample, each as ``.json`` and ``.dsl.md``); all JSON files
    share one rendering and all DSL files share another. Each file is then
    copied into ``DOWNLOAD_DIR``, where a short Markdown note pointing at the
    v2_4 files is also written. Both directories are created if missing.

    Args:
        schema: Schema dict to serialise.

    Returns:
        Mapping from logical file key to the written path (as a string) under
        ``SCHEMA_DIR``.
    """
    for directory in (SCHEMA_DIR, DOWNLOAD_DIR):
        directory.mkdir(parents=True, exist_ok=True)

    file_names = {
        "current_json": "travel_agency_2_0_schema.current.json",
        "current_dsl": "travel_agency_2_0_schema.current.dsl.md",
        "v_json": "travel_agency_2_0_schema.v2_4.json",
        "v_dsl": "travel_agency_2_0_schema.v2_4.dsl.md",
        "sample_json": "travel_agency_2_0_schema.v2_4.original_sample.json",
        "sample_dsl": "travel_agency_2_0_schema.v2_4.original_sample.dsl.md",
    }
    paths = {key: SCHEMA_DIR / file_name for key, file_name in file_names.items()}

    json_text = json.dumps(schema, ensure_ascii=False, indent=2)
    dsl_text = schema_to_dsl(schema)

    for target in paths.values():
        # Only the final suffix decides the format: ".json" gets JSON, anything
        # else (here ".md") gets the DSL rendering.
        if target.suffix == ".json":
            content = json_text
        else:
            content = dsl_text
        target.write_text(content, encoding="utf-8")
        shutil.copy2(target, DOWNLOAD_DIR / target.name)

    # Pointer note placed next to the exported copies.
    note_lines = [
        "# 当前旅行社2.0 Schema 文件位置",
        "",
        f"- DSL 源码:`{paths['v_dsl']}`",
        f"- JSON`{paths['v_json']}`",
        "- 系统页面:`http://localhost:8102/admin/modeling/schema`",
        "",
        "当前版本2.4。新增客服业务规则层 BusinessRule不新增 CustomerSelection。",
    ]
    note_path = DOWNLOAD_DIR / "当前schema文件位置.md"
    note_path.write_text("\n".join(note_lines) + "\n", encoding="utf-8")

    return {key: str(target) for key, target in paths.items()}
def publish_schema(schema: dict) -> int:
    """Store ``schema`` as the new active ontology schema and repoint the active graph release.

    Three statements run on one connection, followed by an explicit commit:

    1. ``ontology_schemas`` rows of this tenant/project with status 'active'
       are set to 'published'.
    2. A new 'active' ``ontology_schemas`` row is inserted with ``schema`` as
       JSONB, and its generated ``id`` is read back.
    3. The ``graph_releases`` row with alias 'active' for this tenant/project
       is pointed at the new schema id, and its ``metadata_jsonb`` is merged
       with the schema version and id.

    NOTE(review): the ``version`` column receives the literal 6 rather than a
    value derived from ``schema["version"]`` ("2.4") — confirm this is
    intended and that re-running the script cannot collide with an existing row.

    Args:
        schema: Schema dict; ``display_name`` and ``purpose`` are read from it.

    Returns:
        The ``id`` returned by the INSERT (annotated as ``int``; the actual
        type depends on the column — TODO confirm).
    """
    owner = (TENANT_ID, PROJECT_ID)
    with psycopg.connect(settings.database_url, row_factory=dict_row) as conn:
        with conn.cursor() as cur:
            # SQL bodies stay flush-left so the statement text is unchanged.
            # The table schema name comes from settings, not from user input.

            # 1) Retire whichever schema is currently active.
            retire_sql = f"""
UPDATE {settings.db_schema}.ontology_schemas
SET status='published', updated_at=now()
WHERE tenant_id=%s AND project_id=%s AND status='active'
"""
            cur.execute(retire_sql, owner)

            # 2) Insert the new schema as the active one and fetch its id.
            insert_sql = f"""
INSERT INTO {settings.db_schema}.ontology_schemas (
tenant_id, project_id, namespace, version, display_name, description,
status, schema_jsonb, created_by, published_by, published_at, updated_at
)
VALUES (%s,%s,%s,%s,%s,%s,'active',%s,%s,%s,now(),now())
RETURNING id
"""
            insert_params = (
                *owner,
                NAMESPACE,
                6,
                schema["display_name"],
                schema["purpose"],
                Jsonb(schema),
                "codex",
                "codex",
            )
            cur.execute(insert_sql, insert_params)
            schema_id = cur.fetchone()["id"]

            # 3) Point the active graph release at the new schema. The doubled
            #    braces render as '{}' in the final SQL text.
            release_sql = f"""
UPDATE {settings.db_schema}.graph_releases
SET schema_id=%s,
graph_release_id=%s,
graph_name=%s,
status='active',
activated_at=now(),
updated_at=now(),
metadata_jsonb = coalesce(metadata_jsonb, '{{}}'::jsonb) || %s::jsonb
WHERE tenant_id=%s AND project_id=%s AND alias='active'
"""
            release_params = (
                schema_id,
                "travel_agency_2_0_test_v2_4",
                GRAPH_NAME,
                Jsonb({"schema_version": "2.4", "schema_id": schema_id}),
                *owner,
            )
            cur.execute(release_sql, release_params)
        conn.commit()
    return schema_id
def main() -> None:
    """Build the 2.4 schema, write its files, publish it, and print a JSON summary.

    The summary lists the new schema id, the schema version, the number of
    entity and relation types, and the paths of the files written.
    """
    schema = build_schema()
    written = write_schema_files(schema)
    new_schema_id = publish_schema(schema)

    summary = {
        "schema_id": new_schema_id,
        "schema_version": schema["version"],
        "entity_count": len(schema["entity_types"]),
        "relation_count": len(schema["relation_types"]),
        "files": written,
    }
    print(json.dumps(summary, ensure_ascii=False, indent=2))
# Run the build/write/publish workflow only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()