705 lines
31 KiB
Python
705 lines
31 KiB
Python
from __future__ import annotations

import csv
import hashlib
import json
import os
import re
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any

import psycopg
from falkordb import FalkorDB
from psycopg.rows import dict_row
from psycopg.types.json import Jsonb

from common_paths import TRAVEL_DELIVERY_ROOT, TRAVEL_KG_EXPORT_ROOT
|
||
|
||
# PostgreSQL connection string. The KG_ADMIN_DB_URL environment variable takes
# precedence so that credentials do not have to live in source control; the
# literal below is kept as the fallback used when the variable is unset or empty.
DB_URL = os.environ.get("KG_ADMIN_DB_URL") or "postgresql://admin:password@localhost:5433/kg_admin"
# The schema name is interpolated directly into SQL text by the functions in
# this file, so it must remain a trusted constant (never user input).
DB_SCHEMA = "kg_admin_new2"

# Scope of the import: every query in this file filters on tenant and project.
TENANT_ID = "travel_agency"
PROJECT_ID = "travel_agency_2_0_test"
GRAPH_NAME = "travel_agency_2_0_test"
TEMPLATE_ID = "travel_agency_2_0_poi_nearby_import_without_amap_v1"

# Input CSV files and the directory that receives the exported artefacts.
SOURCE_DIR = TRAVEL_DELIVERY_ROOT
HOTEL_FILE = SOURCE_DIR / "hotel_poi.csv"
RESTAURANT_FILE = SOURCE_DIR / "restaurant_poi.csv"
OUT_DIR = TRAVEL_KG_EXPORT_ROOT / "travel_agency_2_0_test_旅行社2.0测试/poi_nearby_import_without_amap"
# Timestamp captured once at import time and reused as the default
# ``updated_at`` value for graph nodes and relations.
RUN_UPDATED_AT = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
|
||
def stable_id(prefix: str, text: str) -> str:
    """Return ``prefix`` followed by a hyphen and a short deterministic digest of ``text``.

    The digest is the first ten hexadecimal characters of the SHA-1 hash of the
    UTF-8 encoded text, converted to upper case.
    """
    full_digest = hashlib.sha1(text.encode("utf-8")).hexdigest()
    short_code = full_digest[:10].upper()
    return f"{prefix}-{short_code}"
|
||
|
||
|
||
def graph_safe_props(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``payload`` whose values are all scalars.

    ``None`` values are dropped. Strings, ints, floats and bools are kept as
    they are; lists and dicts are serialised to JSON text (non-ASCII characters
    preserved); anything else is converted with ``str``.
    """

    def to_scalar(item: Any) -> Any:
        # Scalars pass through untouched.
        if isinstance(item, (str, int, float, bool)):
            return item
        # Containers are flattened into a JSON string.
        if isinstance(item, (list, dict)):
            return json.dumps(item, ensure_ascii=False)
        return str(item)

    return {name: to_scalar(item) for name, item in payload.items() if item is not None}
|
||
|
||
|
||
def cypher_label(value: str) -> str:
    """Strip every character that is not an ASCII letter, digit or underscore.

    Falls back to ``"Entity"`` when nothing is left after stripping.
    """
    sanitized = re.sub(r"[^A-Za-z0-9_]", "", value)
    if sanitized:
        return sanitized
    return "Entity"
|
||
|
||
|
||
def read_csv(path: Path) -> list[dict[str, str]]:
    """Read the CSV file at ``path`` and return its rows as dictionaries.

    The file is decoded as ``utf-8-sig``, so a leading byte-order mark is
    ignored. The first line supplies the column names.
    """
    with path.open("r", encoding="utf-8-sig", newline="") as handle:
        reader = csv.DictReader(handle)
        records = [record for record in reader]
    return records
|
||
|
||
|
||
def clean(value: Any) -> str:
    """Return ``value`` as a string with surrounding whitespace removed.

    Any falsy value (``None``, ``""``, ``0``, empty containers, ...) yields an
    empty string.
    """
    if not value:
        return ""
    return str(value).strip()
|
||
|
||
|
||
def compact(value: str) -> str:
    """Normalise ``value`` for substring matching.

    The value is first passed through ``clean``; then every run of whitespace
    or of the separator characters in the pattern below (middle dot, commas,
    slashes, parentheses, lenticular brackets, backslash, hyphen) is removed.
    """
    separator_runs = r"[\s·,,/、()()【】\\-]+"
    normalized = clean(value)
    return re.sub(separator_runs, "", normalized)
|
||
|
||
|
||
def text_blob(row: dict[str, str]) -> str:
    """Join all cleaned values of ``row`` into one space-separated string."""
    pieces = [clean(cell) for cell in row.values()]
    return " ".join(pieces)
|
||
|
||
|
||
def scenic_key(scenic: dict[str, Any]) -> str:
    """Build the compacted search key of a scenic record.

    The key concatenates the record's name, short name, city, county and
    region name (in that order) and runs the result through ``compact``.
    """
    key_fields = ("name", "short_name", "city", "county", "region_name")
    joined = " ".join(clean(scenic.get(field)) for field in key_fields)
    return compact(joined)
|
||
|
||
|
||
def load_scenics_from_pg() -> list[dict[str, Any]]:
    """Load the project's ``ScenicAttraction`` candidate entities from PostgreSQL.

    Each returned dictionary is a copy of the row's ``payload_jsonb`` (an empty
    dict when the column is null) enriched with ``pg_id``, ``natural_key``,
    ``name`` (the payload's own name, falling back to the display name) and
    ``display_name``. Rows are ordered by ``id``.
    """
    query = f"""
        SELECT id, natural_key, display_name, payload_jsonb
        FROM {DB_SCHEMA}.candidate_entities
        WHERE tenant_id=%s AND project_id=%s AND entity_type='ScenicAttraction'
        ORDER BY id
    """
    with psycopg.connect(DB_URL, row_factory=dict_row) as conn, conn.cursor() as cur:
        cur.execute(query, (TENANT_ID, PROJECT_ID))
        records = cur.fetchall()

    scenics: list[dict[str, Any]] = []
    for record in records:
        display_name = record["display_name"]
        scenic = dict(record["payload_jsonb"] or {})
        scenic.update(
            pg_id=record["id"],
            natural_key=record["natural_key"],
            # Prefer the name stored in the payload; use the display name otherwise.
            name=scenic.get("name") or display_name,
            display_name=display_name,
        )
        scenics.append(scenic)
    return scenics
|
||
|
||
|
||
def scenic_matches(row: dict[str, str], scenics: list[dict[str, Any]], kind: str) -> list[tuple[dict[str, Any], str, float]]:
    """Find the scenic attractions that a hotel/restaurant CSV row should be linked to.

    Matching is rule based and works on compacted text: a list of hand-written
    region rules is evaluated in order, and every scenic whose key satisfies a
    fired rule is collected once (the first rule that selects it supplies the
    reason and confidence).

    Args:
        row: One CSV row of the hotel or restaurant source file.
        scenics: Scenic records; each must carry a ``natural_key`` entry.
        kind: Resource kind of the row. Accepted for interface compatibility;
            the current rules do not depend on it.

    Returns:
        A list of ``(scenic, reason, confidence)`` tuples in the order the
        rules selected them; empty when nothing matches or when the row's
        source region is one of the explicitly excluded regions.
    """
    region = compact(row.get("source_region", ""))

    # Avoid pretending far resources are nearby when current scenic anchors are
    # absent: rows from these regions never get a relation, so stop before
    # evaluating any rule.
    if "开阳" in region or "猴耳" in region or "六盘水" in region:
        return []

    address = compact(row.get("source_address", ""))
    city = compact(row.get("city", ""))
    district = compact(row.get("district", ""))
    town = compact(row.get("town", ""))
    resource_name = compact(row.get("hotel_name") or row.get("restaurant_name") or "")
    # Location evidence is intentionally separated from resource names. Names like
    # “西江传说” can be a Guiyang restaurant brand, not a resource near Xijiang.
    loc_hay = compact(" ".join([region, address, city, district, town, clean(row.get("formatted_address"))]))
    name_hay = resource_name

    matched: list[tuple[dict[str, Any], str, float]] = []
    seen_natural_keys: set[Any] = set()
    # scenic_key() is pure, so compute it at most once per scenic per call
    # instead of once per rule; entries are filled lazily on first use.
    key_cache: dict[int, str] = {}

    def key_of(scenic: dict[str, Any]) -> str:
        cached = key_cache.get(id(scenic))
        if cached is None:
            cached = key_cache[id(scenic)] = scenic_key(scenic)
        return cached

    def add_by(predicate, reason: str, confidence: float) -> None:
        # Collect every scenic accepted by ``predicate`` that is not matched yet.
        for scenic in scenics:
            if not predicate(scenic):
                continue
            natural_key = scenic["natural_key"]
            if natural_key in seen_natural_keys:
                continue
            seen_natural_keys.add(natural_key)
            matched.append((scenic, reason, confidence))

    def has_loc(*tokens: str) -> bool:
        return any(token and token in loc_hay for token in tokens)

    def has_name(*tokens: str) -> bool:
        return any(token and token in name_hay for token in tokens)

    def has_any(*tokens: str) -> bool:
        return has_loc(*tokens) or has_name(*tokens)

    # Direct scenic-area matches. For composite resource groups such as “织金/荔波”,
    # exact city/county/address wins over the group label to avoid false links.
    is_zhijin_specific = has_any("织金")
    is_libo_specific = "荔波" in compact(" ".join([address, city, district, town, name_hay]))
    if has_loc("黄果树") or (has_name("黄果树") and has_loc("安顺", "镇宁")):
        add_by(lambda s: "黄果树" in key_of(s), "资源区域明确匹配黄果树/黄果树旅游区", 0.82)
    if is_libo_specific or (has_loc("小七孔") and not is_zhijin_specific):
        add_by(lambda s: "荔波小七孔" in key_of(s) or "小七孔" in key_of(s), "资源区县/地址明确匹配荔波小七孔", 0.82)
    if is_zhijin_specific:
        add_by(lambda s: "织金洞" in key_of(s), "资源区县/名称明确匹配织金洞", 0.82)
    if has_loc("西江", "苗寨") or (has_name("西江", "苗寨") and has_loc("雷山", "黔东南")):
        add_by(lambda s: "西江千户苗寨" in key_of(s) or "西江" in key_of(s), "资源区域明确匹配西江千户苗寨", 0.82)
    if has_loc("镇远") or (has_name("镇远") and has_loc("黔东南")):
        add_by(lambda s: "镇远古城" in key_of(s) or "镇远" in key_of(s), "资源区域明确匹配镇远古城", 0.82)
    if has_loc("梵净山", "江口") or (has_name("梵净山") and has_loc("铜仁", "江口")):
        add_by(lambda s: "梵净山" in key_of(s), "资源区域明确匹配梵净山/江口", 0.82)
    if has_loc("茅台", "仁怀") or (has_name("茅台") and has_loc("遵义", "仁怀")):
        add_by(lambda s: "茅台" in key_of(s), "资源区域明确匹配茅台/仁怀", 0.78)

    # City-level rules: weaker evidence, hence the lower confidence values.
    if has_loc("贵阳") or city == "贵阳市":
        if "花溪" in district or "花溪" in address:
            add_by(lambda s: "青岩古镇" in key_of(s) or "天河潭" in key_of(s), "资源在贵阳花溪区,匹配花溪方向景区", 0.72)
        elif "南明" in district or "南明" in address:
            add_by(lambda s: "甲秀楼" in key_of(s), "资源在贵阳南明区,匹配甲秀楼片区", 0.72)
        elif "云岩" in district or "云岩" in address:
            add_by(lambda s: "黔灵山" in key_of(s), "资源在贵阳云岩区,匹配黔灵山片区", 0.72)
        else:
            add_by(lambda s: "贵阳市" in clean(s.get("city")), "资源为贵阳市通用合作资源,匹配贵阳市景区", 0.66)

    if has_loc("安顺") or city == "安顺市":
        add_by(
            lambda s: "安顺市" in clean(s.get("city")) or any(t in key_of(s) for t in ("黄果树", "龙宫", "平坝", "花江", "安顺古城")),
            "资源为安顺通用合作资源,匹配安顺方向景区",
            0.66,
        )

    if has_loc("毕节", "黔西", "大方", "百里杜鹃"):
        add_by(
            lambda s: any(t in key_of(s) for t in ("百里杜鹃", "织金洞")),
            "资源为毕节/黔西方向合作资源,匹配百里杜鹃/织金洞",
            0.68,
        )

    if has_loc("遵义"):
        add_by(
            lambda s: "遵义市" in clean(s.get("city")) or any(t in key_of(s) for t in ("遵义会议", "茅台", "乌江寨")),
            "资源为遵义方向合作资源,匹配遵义线路景区",
            0.66,
        )

    if has_loc("兴义", "黔西南", "万峰林", "马岭河"):
        add_by(
            lambda s: "兴义市" in clean(s.get("county")) or any(t in key_of(s) for t in ("万峰林", "万峰湖", "马岭河")),
            "资源为兴义/黔西南方向合作资源,匹配兴义景区",
            0.68,
        )

    if has_loc("铜仁", "碧江"):
        add_by(
            lambda s: "铜仁市" in clean(s.get("city")) or any(t in key_of(s) for t in ("中南门", "梵净山")),
            "资源为铜仁方向合作资源,匹配铜仁景区",
            0.64,
        )

    # Do not connect generic “黔东南/榕江” resources to all nearby-looking scenic nodes.
    # Without route evidence or driving distance this creates noisy false positives.

    # If a county/district exactly appears in a scenic, keep it even when no region rule fired.
    if not matched and (district or town):
        add_by(
            lambda s: (district and district in key_of(s)) or (town and town in key_of(s)),
            "资源区县字段与景区区县字段匹配",
            0.62,
        )

    return matched
|
||
|
||
|
||
def build_hotel_node(row: dict[str, str], row_no: int, nearby_names: list[str]) -> dict[str, Any]:
    """Convert one hotel CSV row into a ``Hotel`` node dictionary.

    Args:
        row: The CSV row; every value is read through ``clean``.
        row_no: Row number recorded as ``source_row``.
        nearby_names: Names of the matched scenic attractions; the first one
            becomes ``nearby_scenic_name`` and all are joined with "、".

    Returns:
        The node dictionary. Its ``hotel_id`` is derived deterministically from
        the hotel name, source region and source address.
    """

    def field(column: str) -> str:
        return clean(row.get(column))

    name = field("hotel_name")
    region = field("source_region")
    source_address = field("source_address")
    star = field("star_rating")
    feature = field("feature")
    province = field("province")
    city = field("city")
    district = field("district")

    # Assemble the human-readable price summary from whichever prices exist.
    low = field("low_season_price")
    high = field("high_season_price")
    list_price = field("list_price")
    price_parts: list[str] = []
    if low:
        price_parts.append(f"淡季:{low}")
    if high:
        price_parts.append(f"旺季:{high}")
    if list_price:
        price_parts.append(f"挂牌价:{list_price}")
    price_text = ";".join(price_parts)

    hotel_id = stable_id("HOTEL", "|".join([name, region, source_address]))
    # The region id uses the raw province value (no default applied).
    region_id = stable_id("REG", "|".join([province, city, district, region]))

    if price_text:
        price_source = "住宿资源库(四钻及以上)抽取;价格待服务商确认"
    else:
        price_source = "价格缺失,待服务商确认"

    return {
        "label": "Hotel",
        "natural_key": f"hotel:{hotel_id}",
        "hotel_id": hotel_id,
        "name": name,
        "type": "Hotel",
        "subtype": star or "酒店",
        "star_rating": star,
        "hotel_grade": star,
        "supplier_name": "",
        "partner_status": "合作资源",
        "business_status": "active",
        "description": feature,
        "address": field("formatted_address") or source_address,
        "province": province or "贵州省",
        "city": city,
        "county": district,
        "town": field("town"),
        "region_name": region,
        "region_id": region_id,
        "geo": "",
        "nearby_scenic_name": nearby_names[0] if nearby_names else "",
        "nearby_scenic_names": "、".join(nearby_names),
        "lat": field("geo_lat"),
        "lng": field("geo_lng"),
        "amap_poi_id": field("amap_poi_id"),
        "contact_phone": field("contact"),
        "features": feature,
        "room_type_summary": price_text,
        "base_price_text": price_text,
        "applicable_products": field("applicable_products"),
        "primary_image_url": "",
        "image_urls": "",
        "source_file": str(HOTEL_FILE),
        "source_row": row_no,
        "price_source": price_source,
        "data_quality": "SOURCE_PARTIAL",
        "inventory_status": "UNKNOWN",
        "inventory_source": "未接入房态",
        "requires_supplier_confirm": True,
        "supplier_confirm_notes": "高德坐标/驾车距离/房态待补全;当前按业务区域建立候选关系。",
    }
|
||
|
||
|
||
def build_restaurant_node(row: dict[str, str], row_no: int, nearby_names: list[str]) -> dict[str, Any]:
    """Convert one restaurant CSV row into a ``Restaurant`` node dictionary.

    Args:
        row: The CSV row; every value is read through ``clean``.
        row_no: Row number recorded as ``source_row``.
        nearby_names: Names of the matched scenic attractions; the first one
            becomes ``nearby_scenic_name`` and all are joined with "、".

    Returns:
        The node dictionary. Its ``restaurant_id`` is derived deterministically
        from the restaurant name, source region and source address.
    """

    def field(column: str) -> str:
        return clean(row.get(column))

    name = field("restaurant_name")
    region = field("source_region")
    source_address = field("source_address")
    dishes = field("specialty_dishes")
    scene = field("applicable_scene")
    avg_price = field("avg_price_per_person")
    province = field("province")
    city = field("city")
    district = field("district")

    restaurant_id = stable_id("REST", "|".join([name, region, source_address]))
    # The region id uses the raw province value (no default applied).
    region_id = stable_id("REG", "|".join([province, city, district, region]))

    return {
        "label": "Restaurant",
        "natural_key": f"restaurant:{restaurant_id}",
        "restaurant_id": restaurant_id,
        "name": name,
        "type": "Restaurant",
        "subtype": "餐厅",
        "cuisine_type": dishes,
        "meal_type": scene,
        "supplier_name": "",
        "partner_status": "合作资源",
        "business_status": "active",
        "description": dishes,
        "avg_price": avg_price,
        "price_text": avg_price,
        "currency": "CNY",
        "unit": "人",
        "capacity": "",
        "private_room": "",
        "signature_dishes": dishes,
        "address": field("formatted_address") or source_address,
        "province": province or "贵州省",
        "city": city,
        "county": district,
        "town": field("town"),
        "region_name": region,
        "region_id": region_id,
        "geo": "",
        "nearby_scenic_name": nearby_names[0] if nearby_names else "",
        "nearby_scenic_names": "、".join(nearby_names),
        "lat": field("geo_lat"),
        "lng": field("geo_lng"),
        "amap_poi_id": field("amap_poi_id"),
        "contact_phone": field("contact"),
        "primary_image_url": "",
        "image_urls": "",
        "applicable_scene": scene,
        "source_file": str(RESTAURANT_FILE),
        "source_row": row_no,
        "data_quality": "SOURCE_PARTIAL",
        "policy_json": "",
        "refund_policy": "",
        "inventory_status": "UNKNOWN",
        "inventory_source": "未接入桌位库存",
        "requires_supplier_confirm": True,
        "supplier_confirm_notes": "高德坐标/驾车距离/餐位容量待补全;当前按业务区域建立候选关系。",
    }
|
||
|
||
|
||
def build_nodes_and_relations(scenics: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
    """Build hotel/restaurant nodes and scenic-to-resource relations from the CSV files.

    Rows without a name are skipped. For every remaining row the single best
    scenic match (highest confidence, ties broken by scenic name) becomes its
    primary anchor; rows with no match are recorded as unmatched but still
    produce a node.

    Args:
        scenics: Scenic records used as relation sources.

    Returns:
        ``(nodes, relations, stats)`` where nodes are de-duplicated by
        ``natural_key`` (first occurrence wins), relations are de-duplicated by
        ``(source, relation_type, target)`` and ``stats`` summarises entity,
        relation and unmatched counts.
    """
    inputs = (
        ("Hotel", HOTEL_FILE, "hotel_name", build_hotel_node),
        ("Restaurant", RESTAURANT_FILE, "restaurant_name", build_restaurant_node),
    )
    all_nodes: list[dict[str, Any]] = []
    all_relations: list[dict[str, Any]] = []
    unmatched: dict[str, list[dict[str, str]]] = {"Hotel": [], "Restaurant": []}
    # Running count of relations per (scenic, resource kind); used as the rank.
    rank_counter: dict[tuple[str, str], int] = {}

    def strongest_first(candidate: tuple[dict[str, Any], str, float]) -> tuple[float, str]:
        scenic, _reason, confidence = candidate
        return (-confidence, clean(scenic.get("name")))

    for kind, path, name_col, builder in inputs:
        # Data rows start at line 2 of the file (line 1 is the header).
        for row_no, row in enumerate(read_csv(path), start=2):
            if not clean(row.get(name_col)):
                continue

            candidates = scenic_matches(row, scenics, kind)
            if candidates:
                # In this business graph, NEARBY should represent the primary scenic anchor
                # for a POI. Broader same-city/same-region suggestions can be retrieved by
                # attributes and should not explode into many weak graph edges.
                chosen = [min(candidates, key=strongest_first)]
            else:
                chosen = []
                unmatched[kind].append(row)

            nearby_names = [clean(s.get("short_name")) or clean(s.get("name")) for s, _, _ in chosen]
            node = builder(row, row_no, nearby_names)
            all_nodes.append(node)

            for scenic, reason, confidence in chosen:
                scenic_natural_key = scenic["natural_key"]
                slot = (scenic_natural_key, kind)
                rank = rank_counter.get(slot, 0) + 1
                rank_counter[slot] = rank
                properties = {
                    "resource_type": kind,
                    "resource_id": node.get("hotel_id") or node.get("restaurant_id"),
                    "distance_km": "",
                    "driving_minutes": "",
                    "walking_minutes": "",
                    "distance_status": "pending_amap_driving",
                    "rank": rank,
                    "is_partner": True,
                    "is_default_candidate": False,
                    "fit_reason": reason,
                    "source": "business_region_match_without_amap",
                    "match_method": "business_region",
                    "confidence": confidence,
                    "updated_at": datetime.now().isoformat(timespec="seconds"),
                }
                all_relations.append(
                    {
                        "source": scenic_natural_key,
                        "relation_type": "ATTRACTION_NEARBY_RESOURCE",
                        "target": node["natural_key"],
                        "properties": properties,
                    }
                )

    # Deduplicate nodes by natural_key and relations by endpoint/type.
    unique_nodes: dict[str, dict[str, Any]] = {}
    for node in all_nodes:
        if node["natural_key"] not in unique_nodes:
            unique_nodes[node["natural_key"]] = node

    unique_relations: list[dict[str, Any]] = []
    seen_signatures: set[tuple[str, str, str]] = set()
    for rel in all_relations:
        signature = (rel["source"], rel["relation_type"], rel["target"])
        if signature not in seen_signatures:
            seen_signatures.add(signature)
            unique_relations.append(rel)

    unmatched_examples = {}
    for kind, rows in unmatched.items():
        # Keep at most twenty "name / region" examples per kind.
        unmatched_examples[kind] = [
            clean(r.get("hotel_name") or r.get("restaurant_name")) + " / " + clean(r.get("source_region"))
            for r in rows[:20]
        ]

    stats = {
        "entity_counts": dict(Counter(n["label"] for n in unique_nodes.values())),
        "relation_counts": dict(Counter(r["relation_type"] for r in unique_relations)),
        "unmatched_counts": {kind: len(rows) for kind, rows in unmatched.items()},
        "unmatched_examples": unmatched_examples,
        "nearby_by_scenic": dict(Counter(r["source"] for r in unique_relations)),
    }
    return list(unique_nodes.values()), unique_relations, stats
|
||
|
||
|
||
def write_postgres(nodes: list[dict[str, Any]], relations: list[dict[str, Any]], stats: dict[str, Any]) -> dict[str, Any]:
    """Replace the project's hotel/restaurant candidates in PostgreSQL with ``nodes`` and ``relations``.

    Steps, all on one connection: delete existing nearby relations and any
    relation touching a Hotel/Restaurant entity, delete the Hotel/Restaurant
    entities, insert a new import batch, insert the nodes, insert the relations
    whose two endpoints can be resolved to entity ids, and merge ``stats`` into
    the metadata of the active graph release.

    Args:
        nodes: Node dictionaries with ``label``, ``natural_key`` and ``name``;
            the remaining keys are stored as the JSON payload.
        relations: Relation dictionaries with ``source``, ``target``,
            ``relation_type`` and ``properties``.
        stats: Import statistics stored under ``poi_nearby_import``.

    Returns:
        ``{"batch_id": ..., "postgres_relations": ...}`` with the new batch id
        and the number of relations actually inserted.
    """
    snapshot = json.dumps({"nodes": nodes, "relations": relations}, ensure_ascii=False, sort_keys=True)
    file_hash = hashlib.md5(snapshot.encode()).hexdigest()
    scope = (TENANT_ID, PROJECT_ID)
    row_total = len(nodes) + len(relations)
    # Keys stored in dedicated columns rather than in the JSON payload.
    payload_excluded = {"label", "natural_key", "name"}

    purge_relations_sql = f"""
        DELETE FROM {DB_SCHEMA}.candidate_relations r
        WHERE r.tenant_id=%s AND r.project_id=%s AND (
            r.relation_type='ATTRACTION_NEARBY_RESOURCE'
            OR r.source_candidate_id IN (
                SELECT id FROM {DB_SCHEMA}.candidate_entities
                WHERE tenant_id=%s AND project_id=%s AND entity_type IN ('Hotel','Restaurant')
            )
            OR r.target_candidate_id IN (
                SELECT id FROM {DB_SCHEMA}.candidate_entities
                WHERE tenant_id=%s AND project_id=%s AND entity_type IN ('Hotel','Restaurant')
            )
        )
    """
    purge_entities_sql = f"""
        DELETE FROM {DB_SCHEMA}.candidate_entities
        WHERE tenant_id=%s AND project_id=%s AND entity_type IN ('Hotel','Restaurant')
    """
    insert_batch_sql = f"""
        INSERT INTO {DB_SCHEMA}.import_batches (
            tenant_id, project_id, graph_name, template_id, source_name, file_name,
            file_hash, status, total_rows, success_rows, failed_rows, created_by, updated_at
        )
        VALUES (%s,%s,%s,%s,%s,%s,%s,'published',%s,%s,0,'codex-import',now())
        RETURNING id
    """
    select_scenics_sql = f"""
        SELECT id, natural_key
        FROM {DB_SCHEMA}.candidate_entities
        WHERE tenant_id=%s AND project_id=%s AND entity_type='ScenicAttraction'
    """
    insert_entity_sql = f"""
        INSERT INTO {DB_SCHEMA}.candidate_entities (
            tenant_id, project_id, batch_id, template_id, entity_type, natural_key,
            display_name, payload_jsonb, confidence, status, reviewed_by, reviewed_at, updated_at
        )
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,0.82,'published','codex-import',now(),now())
        RETURNING id
    """
    insert_relation_sql = f"""
        INSERT INTO {DB_SCHEMA}.candidate_relations (
            tenant_id, project_id, batch_id, source_candidate_id, relation_type,
            target_candidate_id, target_ref_jsonb, payload_jsonb, status
        )
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,'published')
    """
    update_release_sql = f"""
        UPDATE {DB_SCHEMA}.graph_releases
        SET metadata_jsonb = COALESCE(metadata_jsonb, '{{}}'::jsonb) || %s,
            updated_at=now()
        WHERE tenant_id=%s AND project_id=%s AND graph_name=%s AND status='active'
    """

    with psycopg.connect(DB_URL, row_factory=dict_row) as conn:
        with conn.cursor() as cur:
            # Clear the previous import; relations go first, then the entities
            # they reference.
            cur.execute(purge_relations_sql, scope * 3)
            cur.execute(purge_entities_sql, scope)

            cur.execute(
                insert_batch_sql,
                (
                    *scope,
                    GRAPH_NAME,
                    TEMPLATE_ID,
                    "酒店餐饮 POI 增量导入(高德限额前先按业务区域 NEARBY)",
                    f"{HOTEL_FILE.name};{RESTAURANT_FILE.name}",
                    file_hash,
                    row_total,
                    row_total,
                ),
            )
            batch_id = cur.fetchone()["id"]

            # Map natural keys to entity ids: existing scenics first, then the
            # freshly inserted hotel/restaurant nodes.
            cur.execute(select_scenics_sql, scope)
            id_by_key: dict[str, int] = {}
            for record in cur.fetchall():
                id_by_key[record["natural_key"]] = record["id"]

            for node in nodes:
                extras = {field: value for field, value in node.items() if field not in payload_excluded}
                cur.execute(
                    insert_entity_sql,
                    (
                        *scope,
                        batch_id,
                        TEMPLATE_ID,
                        node["label"],
                        node["natural_key"],
                        node["name"],
                        Jsonb(extras),
                    ),
                )
                id_by_key[node["natural_key"]] = cur.fetchone()["id"]

            inserted_relations = 0
            for rel in relations:
                src_id = id_by_key.get(rel["source"])
                dst_id = id_by_key.get(rel["target"])
                # Relations with an unresolved endpoint are skipped.
                if src_id and dst_id:
                    cur.execute(
                        insert_relation_sql,
                        (
                            *scope,
                            batch_id,
                            src_id,
                            rel["relation_type"],
                            dst_id,
                            Jsonb({"natural_key": rel["target"]}),
                            Jsonb(rel["properties"]),
                        ),
                    )
                    inserted_relations += 1

            release_patch = Jsonb(
                {
                    "last_poi_nearby_import_at": datetime.now().isoformat(timespec="seconds"),
                    "poi_nearby_import": stats,
                }
            )
            cur.execute(update_release_sql, (release_patch, *scope, GRAPH_NAME))
        conn.commit()
    return {"batch_id": batch_id, "postgres_relations": inserted_relations}
|
||
|
||
|
||
def write_falkor(nodes: list[dict[str, Any]], relations: list[dict[str, Any]]) -> dict[str, int]:
    """Rebuild the hotel/restaurant part of the FalkorDB graph and return graph counts.

    Existing ``ATTRACTION_NEARBY_RESOURCE`` relations leaving scenic nodes and
    all ``Hotel``/``Restaurant`` nodes are deleted, then ``nodes`` and
    ``relations`` are merged back in.

    Note: each node dictionary is modified in place — ``updated_at`` is set to
    the run timestamp when the key is missing.

    Returns:
        Counts of all nodes, all relations, hotel nodes, restaurant nodes and
        nearby relations in the graph after the import.
    """
    graph = FalkorDB(host="localhost", port=6380).select_graph(GRAPH_NAME)

    cleanup_queries = (
        "MATCH (:ScenicAttraction)-[r:ATTRACTION_NEARBY_RESOURCE]->() DELETE r",
        "MATCH (n:Hotel) DETACH DELETE n",
        "MATCH (n:Restaurant) DETACH DELETE n",
    )
    for cleanup in cleanup_queries:
        graph.query(cleanup)

    for node in nodes:
        node.setdefault("updated_at", RUN_UPDATED_AT)
        # The label cannot be passed as a query parameter, so it is sanitised
        # and embedded in the query text.
        merge_node = f"MERGE (n:{cypher_label(node['label'])} {{natural_key:$natural_key}}) SET n += $props"
        graph.query(merge_node, {"natural_key": node["natural_key"], "props": graph_safe_props(node)})

    merge_relation = """
        MATCH (a {natural_key:$source}), (b {natural_key:$target})
        MERGE (a)-[r:ATTRACTION_NEARBY_RESOURCE]->(b)
        SET r += $props
        """
    for rel in relations:
        source_key = rel["source"]
        target_key = rel["target"]
        # The relation's own properties override the default run timestamp.
        edge_props = {
            "natural_key": f"{source_key}->ATTRACTION_NEARBY_RESOURCE->{target_key}",
            "updated_at": RUN_UPDATED_AT,
            **rel["properties"],
        }
        graph.query(
            merge_relation,
            {"source": source_key, "target": target_key, "props": graph_safe_props(edge_props)},
        )

    def count(query: str) -> int:
        # Every counting query returns a single row with a single column.
        return graph.query(query).result_set[0][0]

    return {
        "graph_nodes": count("MATCH (n) RETURN count(n)"),
        "graph_relations": count("MATCH ()-[r]->() RETURN count(r)"),
        "hotel_count": count("MATCH (n:Hotel) RETURN count(n)"),
        "restaurant_count": count("MATCH (n:Restaurant) RETURN count(n)"),
        "nearby_count": count("MATCH ()-[r:ATTRACTION_NEARBY_RESOURCE]->() RETURN count(r)"),
    }
|
||
|
||
|
||
def update_release_counts(graph_info: dict[str, int]) -> None:
    """Merge the graph counts into the metadata of the active graph release.

    Args:
        graph_info: Mapping with ``graph_nodes``, ``graph_relations``,
            ``hotel_count``, ``restaurant_count`` and ``nearby_count``.
    """
    counts = {
        "node_count": graph_info["graph_nodes"],
        "relation_count": graph_info["graph_relations"],
        "hotel_count": graph_info["hotel_count"],
        "restaurant_count": graph_info["restaurant_count"],
        "nearby_count": graph_info["nearby_count"],
    }
    update_sql = f"""
        UPDATE {DB_SCHEMA}.graph_releases
        SET metadata_jsonb = COALESCE(metadata_jsonb, '{{}}'::jsonb) || %s,
            updated_at=now()
        WHERE tenant_id=%s AND project_id=%s AND graph_name=%s AND status='active'
    """
    with psycopg.connect(DB_URL, row_factory=dict_row) as conn:
        with conn.cursor() as cur:
            cur.execute(update_sql, (Jsonb(counts), TENANT_ID, PROJECT_ID, GRAPH_NAME))
        conn.commit()
|
||
|
||
|
||
def write_outputs(nodes: list[dict[str, Any]], relations: list[dict[str, Any]], stats: dict[str, Any], summary: dict[str, Any]) -> None:
    """Write the import artefacts (JSON, CSV and a Markdown report) into ``OUT_DIR``.

    Args:
        nodes: Imported node dictionaries.
        relations: Imported relation dictionaries.
        stats: Statistics providing ``unmatched_counts`` and ``unmatched_examples``.
        summary: Run summary providing the counts shown in the report.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    def dump_json(file_name: str, data: Any) -> None:
        text = json.dumps(data, ensure_ascii=False, indent=2)
        (OUT_DIR / file_name).write_text(text, encoding="utf-8")

    dump_json("hotel_restaurant_nodes.json", nodes)
    dump_json("scenic_nearby_relations.json", relations)
    dump_json("import_summary.json", summary)

    # Node overview; columns a node does not have are written as empty cells.
    node_columns = ["label", "natural_key", "name", "region_name", "city", "county", "base_price_text", "price_text", "nearby_scenic_names", "data_quality"]
    with (OUT_DIR / "hotel_restaurant_nodes.csv").open("w", newline="", encoding="utf-8-sig") as fh:
        writer = csv.DictWriter(fh, fieldnames=node_columns)
        writer.writeheader()
        writer.writerows({column: node.get(column, "") for column in node_columns} for node in nodes)

    # Relation overview: endpoint columns come from the relation itself, the
    # remaining columns from its properties.
    endpoint_columns = ("relation_type", "source", "target")
    property_columns = ("resource_type", "rank", "fit_reason", "distance_status", "confidence")
    with (OUT_DIR / "scenic_nearby_relations.csv").open("w", newline="", encoding="utf-8-sig") as fh:
        writer = csv.DictWriter(fh, fieldnames=[*endpoint_columns, *property_columns])
        writer.writeheader()
        for rel in relations:
            props = rel["properties"]
            record = {column: rel[column] for column in endpoint_columns}
            for column in property_columns:
                record[column] = props.get(column, "")
            writer.writerow(record)

    unmatched_counts = stats["unmatched_counts"]
    unmatched_examples = stats["unmatched_examples"]

    report: list[str] = []
    report.extend(
        [
            "# 酒店餐饮 POI 与景区 NEARBY 增量导入说明",
            "",
            f"执行时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "",
        ]
    )
    report.extend(
        [
            "## 处理原则",
            "- 高德 API 当前限额,因此本次不生成经纬度、驾车距离和驾车时间。",
            "- 酒店、餐饮作为独立实体导入,不放入 TravelItem。",
            "- 景区到酒店/餐饮使用 `ATTRACTION_NEARBY_RESOURCE` 关系。",
            "- 关系先按业务资源库的区域字段匹配,并写入 `distance_status=pending_amap_driving`,后续可用高德回填真实车程。",
            "- 开阳/猴耳天坑、六盘水等当前无明确景区锚点的资源暂不强连。",
            "",
        ]
    )
    report.extend(
        [
            "## 导入统计",
            f"- Hotel 节点:{summary['hotel_count']}",
            f"- Restaurant 节点:{summary['restaurant_count']}",
            f"- NEARBY 关系:{summary['nearby_count']}",
            f"- 图谱总节点:{summary['graph_nodes']}",
            f"- 图谱总关系:{summary['graph_relations']}",
            "",
        ]
    )
    report.extend(
        [
            "## 未匹配资源",
            f"- 酒店未匹配:{unmatched_counts.get('Hotel', 0)}",
            f"- 餐饮未匹配:{unmatched_counts.get('Restaurant', 0)}",
            "",
        ]
    )
    report.append("### 酒店未匹配样例")
    report.extend(f"- {x}" for x in unmatched_examples.get("Hotel", []))
    report.append("")
    report.append("### 餐饮未匹配样例")
    report.extend(f"- {x}" for x in unmatched_examples.get("Restaurant", []))
    report.append("")
    report.extend(
        [
            "## 输出文件",
            "- `hotel_restaurant_nodes.csv`",
            "- `scenic_nearby_relations.csv`",
            "- `hotel_restaurant_nodes.json`",
            "- `scenic_nearby_relations.json`",
            "- `import_summary.json`",
        ]
    )
    (OUT_DIR / "导入说明.md").write_text("\n".join(report), encoding="utf-8")
|
||
|
||
|
||
def main() -> None:
    """Run the whole import: load scenics, build, persist, export and print a summary.

    Raises:
        RuntimeError: If the target project contains no ScenicAttraction nodes.
    """
    scenics = load_scenics_from_pg()
    if not scenics:
        raise RuntimeError("No ScenicAttraction nodes found in target project")

    nodes, relations, stats = build_nodes_and_relations(scenics)

    # Persist to PostgreSQL first, then to the graph, then record graph counts.
    pg_info = write_postgres(nodes, relations, stats)
    graph_info = write_falkor(nodes, relations)
    update_release_counts(graph_info)

    summary: dict[str, Any] = {
        "project_id": PROJECT_ID,
        "graph_name": GRAPH_NAME,
        "nodes_to_import": len(nodes),
        "relations_to_import": len(relations),
    }
    summary.update(pg_info)
    summary.update(graph_info)
    summary["output_dir"] = str(OUT_DIR)
    summary["stats"] = stats

    write_outputs(nodes, relations, stats, summary)
    print(json.dumps(summary, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
|