#!/usr/bin/env python3 """Align Huaxi kg_schema_v1 demo nodes back to existing AMap/transport anchors. This is intentionally non-destructive: it does not delete the earlier demo nodes. It creates canonical links and duplicates useful knowledge edges onto the existing high-trust AMap POI so the graph browser shows one rich POI with coordinates, ratings, transit access, concepts, events, and evidence. """ from __future__ import annotations import math import sys from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from falkordb import FalkorDB # noqa: E402 from app.config import settings # noqa: E402 TEMP_ROOT_ID = "ent_huaxi_park" CANONICAL_HUAXI_ELEMENT_ID = "amap:B035300A51" QINGYAN_TEMP_ID = "ent_qingyan" QINGYAN_AMAP_ELEMENT_ID = "amap:B035300ESE" def haversine_m(lng1: float, lat1: float, lng2: float, lat2: float) -> float: radius = 6371000.0 phi1 = math.radians(lat1) phi2 = math.radians(lat2) d_phi = math.radians(lat2 - lat1) d_lam = math.radians(lng2 - lng1) a = ( math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lam / 2) ** 2 ) return 2 * radius * math.atan2(math.sqrt(a), math.sqrt(1 - a)) def graph(): return FalkorDB( host=settings.falkordb_host, port=settings.falkordb_port, ).select_graph(settings.falkordb_graph) def one_node_props(g, cypher: str, params: dict[str, Any]) -> dict[str, Any] | None: res = g.query(cypher, params) if not res.result_set: return None node = res.result_set[0][0] return getattr(node, "properties", {}) or {} def copy_root_properties_to_canonical(g) -> None: temp = one_node_props(g, "MATCH (n {id:$id}) RETURN n LIMIT 1", {"id": TEMP_ROOT_ID}) if not temp: return params = { "element_id": CANONICAL_HUAXI_ELEMENT_ID, "kg_id": TEMP_ROOT_ID, "kg_description": temp.get("description") or "", "kg_address": temp.get("address") or "", "kg_climate": temp.get("climate") or "", "kg_opening_hours": temp.get("opening_hours") or "", "kg_scenic_level": temp.get("scenic_level") or "", "kg_ticket_price": temp.get("ticket_price") or "", "kg_area": temp.get("area") or "", "kg_best_season": temp.get("best_season") or "", "kg_suggested_duration": temp.get("suggested_duration") or "", "kg_evidence_quote": temp.get("evidence_quote") or "", } g.query( """ MATCH (p:Place {element_id:$element_id}) SET p.kg_id=$kg_id, p.kg_schema_v1_enriched=1, p.kg_description=$kg_description, p.kg_address=$kg_address, p.kg_climate=$kg_climate, p.kg_opening_hours=$kg_opening_hours, p.kg_scenic_level=$kg_scenic_level, p.kg_ticket_price=$kg_ticket_price, p.kg_area=$kg_area, p.kg_best_season=$kg_best_season, p.kg_suggested_duration=$kg_suggested_duration, p.kg_evidence_quote=$kg_evidence_quote """, params, ) g.query( """ MATCH (t {id:$kg_id}) MATCH (p:Place {element_id:$element_id}) MERGE (t)-[r:SAME_AS]->(p) SET r.confidence=0.98, r.reason='same name + AMap sight anchor + Baike address compatible', r.source='entity_alignment' SET t.canonical_element_id=$element_id, t.shadow_node=1 """, params, ) def mirror_temp_edges_to_canonical(g) -> int: """Copy outgoing temp-root knowledge edges to the canonical AMap Place.""" res = g.query( "MATCH (t {id:$temp})-[r]->(m) RETURN type(r), properties(r), m", {"temp": TEMP_ROOT_ID}, ) count = 0 for rel, props, target in res.result_set: if rel == "SAME_AS": continue target_props = getattr(target, "properties", {}) or {} target_id = target_props.get("id") or target_props.get("element_id") or target_props.get("place_id") if not target_id: continue relation = "".join(ch for ch in str(rel).upper() if ch.isalnum() or ch == "_") or "RELATED_TO" g.query( f""" MATCH (p:Place {{element_id:$root}}) MATCH (m) WHERE m.id=$target_id OR m.element_id=$target_id OR m.place_id=$target_id MERGE (p)-[r:{relation}]->(m) SET r.confidence=$confidence, r.evidence_quote=$evidence_quote, r.source='entity_alignment', r.mirrored_from=$temp """, { "root": CANONICAL_HUAXI_ELEMENT_ID, "target_id": str(target_id), "confidence": float((props or {}).get("confidence") or 0.9), "evidence_quote": str((props or {}).get("evidence_quote") or ""), "temp": TEMP_ROOT_ID, }, ) count += 1 return count def align_qingyan(g) -> None: g.query( """ MATCH (t {id:$temp}) MATCH (p:Place {element_id:$amap}) MERGE (t)-[r:POSSIBLE_MATCH]->(p) SET r.confidence=0.84, r.reason='Baike mentions 青岩镇; existing graph has 青岩古镇 scenic POI. Needs final human confirmation if strict admin-town vs attraction distinction matters.', r.source='entity_alignment' SET t.candidate_element_id=$amap, t.alignment_status='possible_match' """, {"temp": QINGYAN_TEMP_ID, "amap": QINGYAN_AMAP_ELEMENT_ID}, ) g.query( """ MATCH (h:Place {element_id:$huaxi}) MATCH (q:Place {element_id:$qingyan}) MERGE (h)-[r:NEARBY_ATTRACTION]->(q) SET r.confidence=0.84, r.evidence_quote='花溪以南12公里处的青岩镇', r.source='kg_schema_v1+entity_alignment', r.alignment_note='target aligned from ent_qingyan to existing AMap 青岩古镇' """, {"huaxi": CANONICAL_HUAXI_ELEMENT_ID, "qingyan": QINGYAN_AMAP_ELEMENT_ID}, ) def link_nearby_transit(g, radius_m: float = 900.0) -> int: root = one_node_props( g, "MATCH (p:Place {element_id:$id}) RETURN p LIMIT 1", {"id": CANONICAL_HUAXI_ELEMENT_ID}, ) if not root or root.get("lng") is None or root.get("lat") is None: return 0 lng = float(root["lng"]) lat = float(root["lat"]) res = g.query( """ MATCH (s:Place) WHERE s.station_type IS NOT NULL AND s.lng IS NOT NULL AND s.lat IS NOT NULL AND (s.name CONTAINS '花溪公园' OR s.name CONTAINS '轨道花溪公园') RETURN s LIMIT 80 """ ) count = 0 for (node,) in res.result_set: props = getattr(node, "properties", {}) or {} try: dist = haversine_m(lng, lat, float(props["lng"]), float(props["lat"])) except Exception: continue if dist > radius_m: continue station_id = props.get("element_id") or props.get("place_id") if not station_id: continue g.query( """ MATCH (p:Place {element_id:$root}) MATCH (s:Place) WHERE s.element_id=$sid OR s.place_id=$sid MERGE (p)-[r:NEAR_TRANSIT]->(s) SET r.distance_m=$distance_m, r.station_type=$station_type, r.source='spatial_alignment', r.confidence=0.92 """, { "root": CANONICAL_HUAXI_ELEMENT_ID, "sid": station_id, "distance_m": round(dist, 1), "station_type": props.get("station_type") or "", }, ) count += 1 return count def main() -> None: g = graph() copy_root_properties_to_canonical(g) mirrored = mirror_temp_edges_to_canonical(g) align_qingyan(g) transit = link_nearby_transit(g) summary = { "graph": settings.falkordb_graph, "canonical_huaxi": CANONICAL_HUAXI_ELEMENT_ID, "mirrored_edges_to_canonical": mirrored, "near_transit_edges": transit, "qingyan_alignment": QINGYAN_AMAP_ELEMENT_ID, } print(summary) if __name__ == "__main__": main()