#!/usr/bin/env python3
"""Build a kg_schema_v1 preview from the Huaxi extraction comparison report.

The comparison report intentionally keeps review_status beside each candidate
for human reading. The production schema is stricter, so this script removes
review-only fields, derives evidence_links, validates the shape, and writes a
reviewable summary for discussion before anything is published to the graph.
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
from copy import deepcopy
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
# ROOT is the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]

# All report artefacts (input comparison report and both outputs) share one folder.
_REPORTS_DIR = ROOT.joinpath("docs", "reports")

# Input: the extraction comparison report read by main().
IN_JSON = _REPORTS_DIR / "huaxi_kg_extraction_comparison.json"
# JSON Schema file loaded by validate_payload() when jsonschema is importable.
SCHEMA_JSON = ROOT.joinpath("app", "schemas", "kg_extraction_v1.schema.json")
# Output: the payload with review-only fields stripped.
OUT_JSON = _REPORTS_DIR / "huaxi_kg_schema_v1_ready.json"
# Output: the Markdown review plan.
OUT_REVIEW = _REPORTS_DIR / "huaxi_kg_schema_v1_review_plan.md"
|
||
|
||
|
||
def strip_review_fields(row: dict[str, Any]) -> dict[str, Any]:
    """Return a deep copy of ``row`` without its top-level ``review_status`` key.

    The input mapping is never modified. Rows that have no ``review_status``
    key are simply deep-copied.
    """
    clone = deepcopy(row)
    if "review_status" in clone:
        del clone["review_status"]
    return clone
|
||
|
||
|
||
def evidence_links_for(target_ref: str, spans: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Turn source spans into ``supports`` evidence links pointing at ``target_ref``.

    A span contributes a link only when it has a truthy ``evidence_id`` and a
    quote that is non-empty after stripping surrounding whitespace. A falsy
    ``spans`` value (``None`` or an empty list) yields an empty list.
    """
    # Pair each span's id with its cleaned quote; evaluated lazily, span by span.
    id_quote_pairs = (
        (span.get("evidence_id"), str(span.get("quote") or "").strip())
        for span in (spans if spans else [])
    )
    return [
        {
            "target_ref": target_ref,
            "evidence_id": evidence_id,
            "support_type": "supports",
            "quote": quote,
        }
        for evidence_id, quote in id_quote_pairs
        if evidence_id and quote
    ]
|
||
|
||
|
||
def statement_ref(row: dict[str, Any], index: int) -> str:
    """Build the reference string used as ``target_ref`` for a statement row.

    The result is ``stmt_<index>:<subject_ref>:<predicate>:<object_ref>``, where
    the index is zero-padded to at least three digits, missing or falsy fields
    become empty strings, and ``object_ref`` is cut to its first 48 characters.
    """

    def text_of(key: str) -> str:
        return str(row.get(key) or "")

    segments = [
        f"stmt_{index:03d}",
        text_of("subject_ref"),
        text_of("predicate"),
        text_of("object_ref")[:48],
    ]
    return ":".join(segments)
|
||
|
||
|
||
def build_payload(source: dict[str, Any]) -> dict[str, Any]:
    """Assemble the kg_schema_v1 payload from the comparison report.

    Every row under ``source["final"]`` is copied with its review-only field
    removed, then ``evidence_links`` is derived from the ``source_spans`` of
    entities, events, concepts and statements. Relations and schema proposals
    are copied but produce no evidence links.

    Args:
        source: Parsed comparison report; must contain a ``"final"`` mapping.

    Returns:
        A dict with the keys ``entities``, ``events``, ``concepts``,
        ``relations``, ``statements``, ``schema_proposals`` and
        ``evidence_links`` (in that order), each holding a list.

    Raises:
        KeyError: If ``source`` has no ``"final"`` key, or an entity, event or
            concept row has no ``"temp_id"``.
    """
    final = source["final"]
    sections = (
        "entities",
        "events",
        "concepts",
        "relations",
        "statements",
        "schema_proposals",
    )
    # ``or []`` tolerates a section that is present but null in the report,
    # matching how ``source_spans`` is read below.
    payload: dict[str, Any] = {
        name: [strip_review_fields(row) for row in final.get(name) or []]
        for name in sections
    }
    payload["evidence_links"] = []

    candidates: list[dict[str, Any]] = []
    # Node-like rows are addressed by their own temp_id.
    for name in ("entities", "events", "concepts"):
        for row in payload[name]:
            candidates.extend(
                evidence_links_for(row["temp_id"], row.get("source_spans") or [])
            )
    # Statements get a synthesized reference based on their 1-based position.
    for position, row in enumerate(payload["statements"], start=1):
        candidates.extend(
            evidence_links_for(statement_ref(row, position), row.get("source_spans") or [])
        )

    # Keep the first occurrence of each (target, evidence, quote) triple.
    # evidence_id is compared as text, so 1 and "1" count as the same id.
    seen: set[tuple[Any, str, Any]] = set()
    for link in candidates:
        signature = (link["target_ref"], str(link["evidence_id"]), link["quote"])
        if signature in seen:
            continue
        seen.add(signature)
        payload["evidence_links"].append(link)

    return payload
|
||
|
||
|
||
# Top-level keys that must each hold a list.
_REQUIRED_LIST_FIELDS = (
    "entities", "events", "concepts", "relations", "statements",
    "schema_proposals", "evidence_links",
)

# (payload section, label used in error messages, keys every row must carry).
_REQUIRED_ROW_KEYS = (
    ("entities", "entity", ("temp_id", "name", "entity_type", "confidence")),
    ("events", "event", ("temp_id", "title", "event_type", "confidence")),
    ("concepts", "concept", ("temp_id", "name", "concept_type", "confidence")),
    ("relations", "relation", ("relation_type", "source_type", "target_type", "confidence")),
    (
        "statements",
        "statement",
        ("subject_ref", "predicate", "object_ref", "object_kind", "confidence"),
    ),
)


def _structural_check(payload: dict[str, Any]) -> str:
    """Check list fields and required row keys without using jsonschema."""
    missing = [
        key for key in _REQUIRED_LIST_FIELDS
        if not isinstance(payload.get(key), list)
    ]
    if missing:
        raise ValueError(f"missing list fields: {missing}")
    for section, label, keys in _REQUIRED_ROW_KEYS:
        for row in payload[section]:
            # A non-mapping row would make ``key not in row`` either raise a
            # TypeError or do a substring test; report it explicitly instead.
            if not isinstance(row, dict):
                raise ValueError(f"{label} is not an object: {row!r}")
            for key in keys:
                if key not in row:
                    raise ValueError(f"{label} missing {key}: {row}")
    return "structural check passed"


def validate_payload(payload: dict[str, Any]) -> str:
    """Validate ``payload`` and return a short status string for the report.

    When ``jsonschema`` can be imported, the payload is validated against the
    schema stored at ``SCHEMA_JSON`` with the Draft 2020-12 validator and
    ``"valid"`` is returned. When it is not installed, a lighter structural
    check runs instead and ``"structural check passed"`` is returned.

    Args:
        payload: The payload produced by ``build_payload``.

    Returns:
        ``"valid"`` or ``"structural check passed"``.

    Raises:
        ValueError: From the structural check, when a list field is missing or
            a row is malformed or lacks a required key.
        jsonschema.ValidationError: From the schema validation path.
    """
    try:
        import jsonschema  # type: ignore
    except ImportError:
        # Only a missing package selects the fallback; any other failure while
        # importing propagates rather than silently weakening the validation.
        return _structural_check(payload)

    schema = json.loads(SCHEMA_JSON.read_text(encoding="utf-8"))
    jsonschema.Draft202012Validator(schema).validate(payload)
    return "valid"
|
||
|
||
|
||
# Minimum confidence at which a row with no model disagreement skips manual review.
AUTO_PUBLISH_THRESHOLD = 0.8


def review_bucket(row: dict[str, Any], kind: str) -> str:
    """Return the review recommendation text for one candidate row.

    A row is auto-approved when its confidence is at least
    ``AUTO_PUBLISH_THRESHOLD`` and neither ``model_disagreement`` nor
    ``conflict`` is truthy; otherwise it goes to manual review. Rows of kind
    ``"schema_proposal"`` use their own pair of messages.

    Args:
        row: Candidate row; ``confidence``, ``model_disagreement`` and
            ``conflict`` are read from it.
        kind: Row kind. Only ``"schema_proposal"`` is treated specially.

    Returns:
        The recommendation text (Chinese) shown in the review table.
    """
    try:
        confidence = float(row.get("confidence") or 0)
    except (TypeError, ValueError):
        # A confidence that is not a number cannot justify auto-publishing;
        # score it as 0 so the row is routed to manual review.
        confidence = 0.0
    has_disagreement = bool(row.get("model_disagreement") or row.get("conflict"))
    auto_ok = confidence >= AUTO_PUBLISH_THRESHOLD and not has_disagreement

    if kind == "schema_proposal":
        if auto_ok:
            return "进入 Schema Proposal 队列:不阻塞数据入图"
        return "人工审核:schema 建议分数低或模型不一致"

    # The threshold shown in the text follows the constant, so the message
    # cannot drift from the rule that is actually applied.
    if auto_ok:
        return f"自动入候选发布:多模型一致且分数 >= {AUTO_PUBLISH_THRESHOLD}"
    return f"人工审核:模型不一致或最终分数 < {AUTO_PUBLISH_THRESHOLD}"
|
||
|
||
|
||
def _md_cell(value: Any) -> str:
    """Render ``value`` as text that stays inside a single Markdown table cell."""
    # Line breaks would end the table row and a bare pipe would start a new
    # column, so flatten the former and escape the latter.
    flattened = " ".join(str(value).splitlines())
    return flattened.replace("|", "\\|")


def _confidence_cell(row: dict[str, Any]) -> str:
    """Format a row's confidence with two decimals; non-numeric values show as 0.00."""
    try:
        score = float(row.get("confidence") or 0)
    except (TypeError, ValueError):
        score = 0.0
    return f"{score:.2f}"


def write_review_plan(raw: dict[str, Any], payload: dict[str, Any], validation: str) -> None:
    """Write the Markdown review plan for ``payload`` to ``OUT_REVIEW``.

    The document contains the per-section counts, a fixed comparison with the
    previous extraction mode, the manual-review rules, a suggested graph
    structure, one table row per entity, event, concept and schema proposal
    with its ``review_bucket`` recommendation, and a closing summary.

    Args:
        raw: The parsed comparison report. Accepted for interface
            compatibility; it is not read by this function.
        payload: The payload produced by ``build_payload``.
        validation: Status text returned by ``validate_payload``.
    """
    counts = {k: len(payload[k]) for k in [
        "entities", "events", "concepts", "relations",
        "statements", "schema_proposals", "evidence_links",
    ]}
    lines = [
        "# 花溪公园 kg_schema_v1 效果与人工审核方案",
        "",
        "## 这次新模式会得到什么",
        "",
        f"- Entity:{counts['entities']} 个",
        f"- Event:{counts['events']} 个",
        f"- Concept:{counts['concepts']} 个",
        f"- Relation Schema:{counts['relations']} 个",
        f"- Statement:{counts['statements']} 条",
        f"- Schema Proposal:{counts['schema_proposals']} 条",
        f"- Evidence Link:{counts['evidence_links']} 条",
        f"- JSON Schema 校验:{validation}",
        "",
        "## 和旧 web_agent 软字段模式的差别",
        "",
        "| 维度 | 旧模式 | kg_schema_v1 新模式 |",
        "|---|---|---|",
        "| 抽取目标 | summary/history/features 等软字段 + schema_gaps | Entity/Event/Concept/Relation/Statement 全量候选 |",
        "| 基本信息 | 容易被放进 schema_gaps,不能稳定入图 | 地址、开放时间、门票、面积、等级等变成可入图 Statement |",
        "| 历史信息 | 部分进入 history 文本 | 变成 Event 节点,并通过 HAS_EVENT 连到地点 |",
        "| 推荐语义 | 缺少概念层 | 历史文化、生态公园、亲水游憩等变成 Concept |",
        "| 人工审核 | 主要审候选实体字段 | 审 Entity/Event/Concept/Statement/SchemaProposal,能细粒度放行 |",
        "",
        "## 人工介入规则",
        "",
        "1. 多模型结果一致、证据 quote 可定位、final_score/confidence >= 0.8:自动进入发布候选,可直接写入正式图谱。",
        "2. 多模型结果不一致、存在冲突字段、或 final_score/confidence < 0.8:转人工审核。",
        "3. 新关系、新事件类型、新字段进入 Schema Proposal 队列;它不阻塞事实数据入图,但 schema 是否升级单独治理。",
        "4. 人工只处理低分/冲突/拿不准的数据,不再默认审所有候选。",
        "5. 发布时每条节点和关系保留 evidence_id、quote、confidence,后续可追溯和回滚。",
        "",
        "## 建议入图谱结构",
        "",
        "```cypher",
        "(:Place {id:'ent_huaxi_park', name:'花溪公园'})",
        "(:Event {id:'evt_1937_park_build', event_id:'evt_1937_park_build', title:'花溪正式辟建为公园', event_date:'1937年', event_date_norm:'1937', event_type:'ConstructionEvent', evidence_id, evidence_quote})",
        "(:Concept {id:'cpt_historical_culture', name:'历史文化景区'})",
        "(:Place)-[:HAS_EVENT {event_date, event_type, evidence_id, confidence}]->(:Event)",
        "(:Place)-[:HAS_CONCEPT {evidence_id, confidence}]->(:Concept)",
        "(:Place)-[:HAS_PART {evidence_id, confidence}]->(:ScenicSpot)",
        "(:Place)-[:NEARBY_ATTRACTION {distance_text, evidence_id}]->(:Place)",
        "```",
        "",
        "## 自动发布 / 人工审核判定",
        "",
        "| 类型 | ID/关系 | 名称/内容 | 置信度 | 建议 |",
        "|---|---|---|---:|---|",
    ]

    # One table row per node-like candidate; the label column is the row's
    # name or title depending on the section.
    for kind, key, label_key in [
        ("entity", "entities", "name"),
        ("event", "events", "title"),
        ("concept", "concepts", "name"),
    ]:
        for row in payload[key]:
            lines.append(
                f"| {kind} | {_md_cell(row.get('temp_id'))} | {_md_cell(row.get(label_key))} | "
                f"{_confidence_cell(row)} | {review_bucket(row, kind)} |"
            )
    # Schema proposals are identified by proposal_type rather than temp_id.
    for row in payload["schema_proposals"]:
        lines.append(
            f"| schema_proposal | {_md_cell(row.get('proposal_type'))} | {_md_cell(row.get('name'))} | "
            f"{_confidence_cell(row)} | {review_bucket(row, 'schema_proposal')} |"
        )

    lines += [
        "",
        "## 当前结论",
        "",
        "花溪公园这份百度百科证据使用 kg_schema_v1 后,已经足够形成一批完整候选知识。",
        "但为了保证城市知识图谱可长期复用,正确做法不是所有条目都人工审,而是:",
        "",
        "```text",
        "Evidence -> kg_schema_v1 多模型抽取 -> 决策合并/评分",
        "-> final_score >= 0.8 且无冲突:自动发布到 FalkorDB",
        "-> final_score < 0.8 或模型冲突:进入人工审核",
        "```",
        "",
        "对应严格 JSON 输出:`docs/reports/huaxi_kg_schema_v1_ready.json`",
    ]
    OUT_REVIEW.write_text("\n".join(lines), encoding="utf-8")
|
||
|
||
|
||
def main() -> None:
    """Run the pipeline: load the report, build, validate, write, and print a summary.

    Reads ``IN_JSON``, writes the payload to ``OUT_JSON`` and the review plan
    to ``OUT_REVIEW``, then prints both output paths, the size of every list
    in the payload, and the validation status. Validation runs before either
    file is written.
    """
    source = json.loads(IN_JSON.read_text(encoding="utf-8"))
    payload = build_payload(source)
    validation = validate_payload(payload)

    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    OUT_JSON.write_text(serialized, encoding="utf-8")
    write_review_plan(source, payload, validation)

    list_sizes = {
        name: len(value)
        for name, value in payload.items()
        if isinstance(value, list)
    }
    for item in (OUT_JSON, OUT_REVIEW, list_sizes, validation):
        print(item)


if __name__ == "__main__":
    main()
|