Files
bxh/scripts/publish_huaxi_kg_schema_v1_to_falkor.py

309 lines
11 KiB
Python

#!/usr/bin/env python3
"""Publish the Huaxi kg_schema_v1 preview into FalkorDB for graph-browser QA."""
from __future__ import annotations
import hashlib
import json
import re
import sys
from pathlib import Path
from typing import Any
# Directory one level above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Put ROOT at the front of sys.path (once) so that the project import that
# follows (`app.config`) can be resolved when this file is run directly.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
from falkordb import FalkorDB # noqa: E402
from app.config import settings # noqa: E402
# JSON document that main() reads; it is indexed by the keys "entities",
# "events", "concepts" and "statements".
IN_JSON = ROOT / "docs/reports/huaxi_kg_schema_v1_ready.json"
# Node id used as the anchor for every concept and for events that carry no
# location_ref of their own.
ROOT_PLACE_ID = "ent_huaxi_park"
# Provenance values written as source_name / source_url on the nodes and
# relationships that main() creates.
BAIDU_BAIKE_SOURCE_NAME = "百度百科"
BAIDU_BAIKE_SOURCE_URL = "https://baike.baidu.com/item/%E8%8A%B1%E6%BA%AA%E5%85%AC%E5%9B%AD"
def safe_token(value: str, fallback: str) -> str:
    """Reduce *value* to ASCII letters, digits and underscores.

    Every other character is dropped. A falsy *value* (``None`` or ``""``)
    is treated as empty.

    Returns:
        *fallback* when nothing survives the filtering;
        ``"<fallback>_<token>"`` when the surviving token starts with a digit;
        otherwise the surviving token itself.
    """
    kept = re.sub(r"[^A-Za-z0-9_]", "", value or "")
    if kept == "":
        return fallback
    # A leading digit is avoided by prefixing the fallback.
    return f"{fallback}_{kept}" if kept[0].isdigit() else kept
def rel_type(value: str) -> str:
    """Convert a predicate string into a relationship-type token.

    The upper-cased predicate is sanitised with ``safe_token`` (fallback
    ``"RELATED_TO"``). The result is returned only if it is an upper-case
    identifier; anything else collapses to ``"RELATED_TO"``.
    """
    candidate = safe_token(value.upper(), "RELATED_TO")
    # Second line of defence: the token is interpolated into query text.
    if re.match(r"^[A-Z_][A-Z0-9_]*$", candidate):
        return candidate
    return "RELATED_TO"
def node_label(value: str, fallback: str = "Entity") -> str:
    """Return *value* sanitised by ``safe_token`` for use as a node label.

    *fallback* (default ``"Entity"``) is what ``safe_token`` falls back to
    when *value* yields no usable characters.
    """
    label = safe_token(value, fallback)
    return label
def literal_id(statement: dict[str, Any]) -> str:
    """Derive a deterministic node id for a statement's literal object.

    The id is ``"lit_"`` followed by the first 16 hex characters of the MD5
    digest of ``"<subject_ref>|<predicate>|<object_ref>"``; a missing field
    contributes the text ``"None"``.
    """
    fields = (
        statement.get("subject_ref"),
        statement.get("predicate"),
        statement.get("object_ref"),
    )
    key = "|".join(str(field) for field in fields)
    # MD5 is used only as a stable fingerprint here, not for security.
    return "lit_" + hashlib.md5(key.encode("utf-8")).hexdigest()[:16]
def first_quote(row: dict[str, Any]) -> str:
    """Return the ``quote`` of the first entry in ``row["source_spans"]``.

    Only the first span is inspected. The result is ``""`` when there are no
    spans, when the first span is not a dict, or when its quote is missing
    or falsy; any other quote value is converted with ``str``.
    """
    span_list = row.get("source_spans") or []
    if not span_list:
        return ""
    head = span_list[0]
    if not isinstance(head, dict):
        return ""
    return str(head.get("quote") or "")
def first_evidence_id(row: dict[str, Any]) -> str:
    """Return the ``evidence_id`` of the first entry in ``row["source_spans"]``.

    Only the first span is inspected. The result is ``""`` when there are no
    spans, when the first span is not a dict, or when its evidence id is
    missing or falsy; any other value is converted with ``str``.
    """
    span_list = row.get("source_spans") or []
    if not span_list:
        return ""
    head = span_list[0]
    if not isinstance(head, dict):
        return ""
    return str(head.get("evidence_id") or "")
# --- Cypher text used by main() and its helpers ------------------------------
# The two *_TEMPLATE strings are rendered with str.format(); doubled braces in
# them are literal Cypher map braces. All other strings are sent verbatim.

_ENTITY_UPSERT_TEMPLATE = """
MERGE (n:{label} {{id:$id}})
SET n.name=$name,
n.entity_type=$entity_type,
n.description=$description,
n.source='baidu_baike',
n.source_name=$source_name,
n.source_url=$source_url,
n.extraction_schema='kg_schema_v1',
n.review_status='auto_published',
n.confidence=$confidence,
n.evidence_quote=$evidence_quote
"""

_EVENT_UPSERT = """
MERGE (n:Event {id:$id})
SET n.title=$title,
n.name=$title,
n.event_id=$id,
n.event_type=$event_type,
n.event_date=$time_text,
n.event_time=$time_text,
n.event_date_norm=$time_norm,
n.time_text=$time_text,
n.time_norm=$time_norm,
n.description=$description,
n.source='baidu_baike',
n.source_name=$source_name,
n.source_url=$source_url,
n.evidence_url=$source_url,
n.extraction_schema='kg_schema_v1',
n.review_status='auto_published',
n.confidence=$confidence,
n.evidence_id=$evidence_id,
n.evidence_quote=$evidence_quote
"""

_CONCEPT_UPSERT = """
MERGE (n:Concept {id:$id})
SET n.name=$name,
n.concept_type=$concept_type,
n.description=$description,
n.source='baidu_baike',
n.source_name=$source_name,
n.source_url=$source_url,
n.extraction_schema='kg_schema_v1',
n.review_status='auto_published',
n.confidence=$confidence,
n.evidence_quote=$evidence_quote
"""

_LITERAL_UPSERT = """
MERGE (o:Literal {id:$oid})
SET o.name=$value, o.value=$value, o.predicate=$predicate,
o.source='baidu_baike',
o.source_name=$source_name,
o.source_url=$source_url,
o.extraction_schema='kg_schema_v1'
"""

_STATEMENT_EDGE_TEMPLATE = """
MATCH (s {{id:$sid}})
MATCH (o {{id:$oid}})
MERGE (s)-[r:{pred}]->(o)
SET r.confidence=$confidence,
r.evidence_quote=$evidence_quote,
r.source=$source,
r.source_name=$source_name,
r.source_url=$source_url,
r.extraction_schema='kg_schema_v1'
"""

_EVENT_LINK = """
MATCH (p {id:$pid})
MATCH (e:Event {id:$eid})
MERGE (p)-[r:HAS_EVENT]->(e)
SET r.confidence=$confidence,
r.event_type=$event_type,
r.event_date=$time_text,
r.event_time=$time_text,
r.evidence_id=$evidence_id,
r.evidence_quote=$evidence_quote,
r.evidence_url=$source_url,
r.source='baidu_baike',
r.source_name=$source_name,
r.source_url=$source_url,
r.extraction_schema='kg_schema_v1',
r.inferred_from='event.location_ref'
"""

_CONCEPT_LINK = """
MATCH (p {id:$pid})
MATCH (c:Concept {id:$cid})
MERGE (p)-[r:HAS_CONCEPT]->(c)
SET r.confidence=$confidence,
r.evidence_quote=$evidence_quote,
r.source='baidu_baike',
r.source_name=$source_name,
r.source_url=$source_url,
r.extraction_schema='kg_schema_v1',
r.inferred_from='document_anchor'
"""


def _provenance() -> dict[str, str]:
    """Return the source_name / source_url parameters bound into every query."""
    return {
        "source_name": BAIDU_BAIKE_SOURCE_NAME,
        "source_url": BAIDU_BAIKE_SOURCE_URL,
    }


def _publish_entities(graph: Any, rows: list[dict[str, Any]]) -> None:
    """MERGE one node per entity row and copy its ``attributes`` onto the node."""
    for row in rows:
        label = node_label(row.get("entity_type") or "Entity")
        graph.query(
            _ENTITY_UPSERT_TEMPLATE.format(label=label),
            {
                "id": row["temp_id"],
                "name": row.get("name") or "",
                "entity_type": row.get("entity_type") or "",
                "description": row.get("description") or "",
                "confidence": float(row.get("confidence") or 0),
                "evidence_quote": first_quote(row),
                **_provenance(),
            },
        )
        attrs = row.get("attributes") or {}
        if not attrs:
            continue
        # Property names cannot be bound as parameters, so each key is
        # sanitised into the query text and only the values are parameters.
        # NOTE(review): a key that sanitises to an existing property name
        # (e.g. "id" or "name") overwrites that property - confirm the
        # extractor never emits such keys.
        assignments = []
        params = {"id": row["temp_id"]}
        for i, (key, value) in enumerate(attrs.items()):
            prop = safe_token(str(key), f"attr_{i}")
            pkey = f"v{i}"
            assignments.append(f"n.{prop}=${pkey}")
            params[pkey] = str(value)
        graph.query(f"MATCH (n:{label} {{id:$id}}) SET {', '.join(assignments)}", params)


def _publish_events(graph: Any, rows: list[dict[str, Any]]) -> None:
    """MERGE one ``Event`` node per event row."""
    for row in rows:
        graph.query(
            _EVENT_UPSERT,
            {
                "id": row["temp_id"],
                "title": row.get("title") or "",
                "event_type": row.get("event_type") or "",
                "time_text": row.get("time_text") or "",
                "time_norm": row.get("time_norm") or "",
                "description": row.get("description") or "",
                "confidence": float(row.get("confidence") or 0),
                "evidence_id": first_evidence_id(row),
                "evidence_quote": first_quote(row),
                **_provenance(),
            },
        )


def _publish_concepts(graph: Any, rows: list[dict[str, Any]]) -> None:
    """MERGE one ``Concept`` node per concept row."""
    for row in rows:
        graph.query(
            _CONCEPT_UPSERT,
            {
                "id": row["temp_id"],
                "name": row.get("name") or "",
                "concept_type": row.get("concept_type") or "",
                "description": row.get("description") or "",
                "confidence": float(row.get("confidence") or 0),
                "evidence_quote": first_quote(row),
                **_provenance(),
            },
        )


def _publish_statements(
    graph: Any,
    rows: list[dict[str, Any]],
    node_ids: set[str],
) -> tuple[int, int, set[tuple[str, str, str]]]:
    """Write one relationship per statement, creating ``Literal`` nodes as needed.

    A statement whose object is marked ``"literal"`` or whose ``object_ref`` is
    not one of *node_ids* gets a ``Literal`` node (id from ``literal_id``) as
    its target. Statements without a ``subject_ref`` are skipped.

    Returns:
        ``(edge_queries, literal_nodes, explicit_edges)`` where
        *explicit_edges* holds the ``(subject, rel_type, object_id)`` triples
        that were submitted.
    """
    edge_count = 0
    literal_count = 0
    explicit_edges: set[tuple[str, str, str]] = set()
    for row in rows:
        subj = row.get("subject_ref")
        pred = rel_type(str(row.get("predicate") or "RELATED_TO"))
        obj = str(row.get("object_ref") or "")
        kind = row.get("object_kind")
        if not subj:
            continue
        params = {
            "sid": subj,
            "confidence": float(row.get("confidence") or 0),
            "evidence_quote": first_quote(row),
            "source": "baidu_baike",
            **_provenance(),
        }
        if kind == "literal" or obj not in node_ids:
            params.update({"oid": literal_id(row), "value": obj, "predicate": pred})
            graph.query(_LITERAL_UPSERT, params)
            literal_count += 1
        else:
            params["oid"] = obj
        explicit_edges.add((str(subj), pred, str(params["oid"])))
        graph.query(_STATEMENT_EDGE_TEMPLATE.format(pred=pred), params)
        # NOTE(review): this counts queries issued; if either MATCH finds no
        # node, nothing is created but the counter still advances.
        edge_count += 1
    return edge_count, literal_count, explicit_edges


def _link_events(
    graph: Any,
    rows: list[dict[str, Any]],
    explicit_edges: set[tuple[str, str, str]],
) -> int:
    """Attach each event to its ``location_ref`` (or the root place) via HAS_EVENT.

    Events without a ``temp_id`` and events whose HAS_EVENT triple is already
    in *explicit_edges* are skipped. Returns the number of queries issued.
    """
    issued = 0
    for row in rows:
        location_ref = row.get("location_ref") or ROOT_PLACE_ID
        event_id = row.get("temp_id")
        if not event_id or (location_ref, "HAS_EVENT", event_id) in explicit_edges:
            continue
        graph.query(
            _EVENT_LINK,
            {
                "pid": location_ref,
                "eid": event_id,
                "confidence": float(row.get("confidence") or 0),
                "event_type": row.get("event_type") or "",
                "time_text": row.get("time_text") or "",
                "evidence_id": first_evidence_id(row),
                "evidence_quote": first_quote(row),
                **_provenance(),
            },
        )
        issued += 1
    return issued


def _link_concepts(
    graph: Any,
    rows: list[dict[str, Any]],
    explicit_edges: set[tuple[str, str, str]],
) -> int:
    """Attach each concept to the root place via HAS_CONCEPT.

    Concepts without a ``temp_id`` and concepts whose HAS_CONCEPT triple is
    already in *explicit_edges* are skipped. Returns the number of queries
    issued.
    """
    issued = 0
    for row in rows:
        concept_id = row.get("temp_id")
        if not concept_id or (ROOT_PLACE_ID, "HAS_CONCEPT", concept_id) in explicit_edges:
            continue
        graph.query(
            _CONCEPT_LINK,
            {
                "pid": ROOT_PLACE_ID,
                "cid": concept_id,
                "confidence": float(row.get("confidence") or 0),
                "evidence_quote": first_quote(row),
                **_provenance(),
            },
        )
        issued += 1
    return issued


def main() -> None:
    """Load ``IN_JSON`` and publish its contents into the configured FalkorDB graph.

    Steps, in order:
      1. MERGE nodes for entities, events and concepts.
      2. Write one relationship per statement (with ``Literal`` nodes for
         objects that are not known nodes).
      3. Add inferred HAS_EVENT / HAS_CONCEPT relationships that the
         statements did not already provide.
      4. Print a summary dict of the counts.

    Raises:
        KeyError: if the payload lacks "entities", "events", "concepts" or
            "statements", or an entity/event/concept row lacks "temp_id".
    """
    payload = json.loads(IN_JSON.read_text(encoding="utf-8"))
    graph = FalkorDB(host=settings.falkordb_host, port=settings.falkordb_port).select_graph(
        settings.falkordb_graph
    )

    # Collected before any write so a malformed payload fails early.
    entity_ids = {row["temp_id"] for row in payload["entities"]}
    event_ids = {row["temp_id"] for row in payload["events"]}
    concept_ids = {row["temp_id"] for row in payload["concepts"]}

    _publish_entities(graph, payload["entities"])
    _publish_events(graph, payload["events"])
    _publish_concepts(graph, payload["concepts"])

    node_ids = entity_ids | event_ids | concept_ids
    edge_count, literal_count, explicit_edges = _publish_statements(
        graph, payload["statements"], node_ids
    )
    edge_count += _link_events(graph, payload["events"], explicit_edges)
    edge_count += _link_concepts(graph, payload["concepts"], explicit_edges)

    print({
        "graph": settings.falkordb_graph,
        "entities": len(payload["entities"]),
        "events": len(payload["events"]),
        "concepts": len(payload["concepts"]),
        "statements": len(payload["statements"]),
        "literal_nodes": literal_count,
        "edges": edge_count,
    })
# Publish only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()