#!/usr/bin/env python3
"""Build a kg_schema_v1 preview from the Huaxi extraction comparison report.

The comparison report intentionally keeps review_status beside each candidate for
human reading. The production schema is stricter, so this script removes review-only
fields, derives evidence_links, validates the shape, and writes a reviewable summary
for discussion before anything is published to the graph.
"""

from __future__ import annotations

import json
from copy import deepcopy
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[1]
IN_JSON = ROOT / "docs/reports/huaxi_kg_extraction_comparison.json"
SCHEMA_JSON = ROOT / "app/schemas/kg_extraction_v1.schema.json"
OUT_JSON = ROOT / "docs/reports/huaxi_kg_schema_v1_ready.json"
OUT_REVIEW = ROOT / "docs/reports/huaxi_kg_schema_v1_review_plan.md"

# Candidate collections copied from the report's "final" section, in output order.
_CANDIDATE_KEYS = (
    "entities",
    "events",
    "concepts",
    "relations",
    "statements",
    "schema_proposals",
)

# Every top-level list the payload must carry (candidates + derived links).
_PAYLOAD_KEYS = (*_CANDIDATE_KEYS, "evidence_links")

# Collections whose rows are addressed by their own ``temp_id``.
_TEMP_ID_KEYS = ("entities", "events", "concepts")

# Fallback structural check: (collection, label used in errors, required row keys).
_REQUIRED_ROW_KEYS = (
    ("entities", "entity", ("temp_id", "name", "entity_type", "confidence")),
    ("events", "event", ("temp_id", "title", "event_type", "confidence")),
    ("concepts", "concept", ("temp_id", "name", "concept_type", "confidence")),
    (
        "relations",
        "relation",
        ("relation_type", "source_type", "target_type", "confidence"),
    ),
    (
        "statements",
        "statement",
        ("subject_ref", "predicate", "object_ref", "object_kind", "confidence"),
    ),
)


def strip_review_fields(row: dict[str, Any]) -> dict[str, Any]:
    """Return a deep copy of *row* without the review-only ``review_status`` key.

    The input row is left untouched.
    """
    item = deepcopy(row)
    item.pop("review_status", None)
    return item


def evidence_links_for(
    target_ref: str, spans: list[dict[str, Any]] | None
) -> list[dict[str, Any]]:
    """Turn the source spans of one candidate into evidence link records.

    A span is kept only when it has a truthy ``evidence_id`` and a quote that is
    non-empty after stripping whitespace. ``None`` or an empty list yields ``[]``.
    """
    links: list[dict[str, Any]] = []
    for span in spans or []:
        evidence_id = span.get("evidence_id")
        quote = str(span.get("quote") or "").strip()
        if not (evidence_id and quote):
            continue
        links.append(
            {
                "target_ref": target_ref,
                "evidence_id": evidence_id,
                "support_type": "supports",
                "quote": quote,
            }
        )
    return links


def statement_ref(row: dict[str, Any], index: int) -> str:
    """Build the reference string used as ``target_ref`` for a statement.

    The format is ``stmt_<index:03d>:<subject>:<predicate>:<object>``, with the
    object truncated to 48 characters. Missing parts become empty strings.
    """
    subj = str(row.get("subject_ref") or "")
    pred = str(row.get("predicate") or "")
    obj = str(row.get("object_ref") or "")[:48]
    return f"stmt_{index:03d}:{subj}:{pred}:{obj}"


def build_payload(source: dict[str, Any]) -> dict[str, Any]:
    """Assemble the schema payload from the comparison report.

    Copies each candidate collection from ``source["final"]`` with review-only
    fields removed, then derives de-duplicated ``evidence_links`` from the
    ``source_spans`` of entities, events, concepts and statements.

    Raises:
        KeyError: if *source* has no ``"final"`` section.
    """
    final = source["final"]
    payload: dict[str, Any] = {
        key: [strip_review_fields(row) for row in final.get(key, [])]
        for key in _CANDIDATE_KEYS
    }
    payload["evidence_links"] = []

    links: list[dict[str, Any]] = []
    for key in _TEMP_ID_KEYS:
        for row in payload[key]:
            temp_id = row.get("temp_id")
            if temp_id is None:
                # A row without temp_id cannot be linked; leave it in the payload
                # so validate_payload reports it instead of failing here with a
                # bare KeyError.
                continue
            links.extend(evidence_links_for(temp_id, row.get("source_spans") or []))
    for idx, row in enumerate(payload["statements"], start=1):
        links.extend(
            evidence_links_for(statement_ref(row, idx), row.get("source_spans") or [])
        )

    # Keep the first occurrence of each (target, evidence, quote) triple.
    seen: set[tuple[Any, str, str]] = set()
    for link in links:
        sig = (link["target_ref"], str(link["evidence_id"]), link["quote"])
        if sig in seen:
            continue
        seen.add(sig)
        payload["evidence_links"].append(link)
    return payload


def _structural_check(payload: dict[str, Any]) -> None:
    """Minimal shape check used when ``jsonschema`` is not installed.

    Raises:
        ValueError: if a top-level list is missing or a row lacks a required key.
    """
    missing = [key for key in _PAYLOAD_KEYS if not isinstance(payload.get(key), list)]
    if missing:
        raise ValueError(f"missing list fields: {missing}")
    for collection, label, required in _REQUIRED_ROW_KEYS:
        for row in payload[collection]:
            for key in required:
                if key not in row:
                    raise ValueError(f"{label} missing {key}: {row}")


def validate_payload(payload: dict[str, Any]) -> str:
    """Validate *payload* and return a short status string.

    With ``jsonschema`` installed the payload is validated against ``SCHEMA_JSON``
    and ``"valid"`` is returned. Otherwise a structural check runs and
    ``"structural check passed"`` is returned.

    Raises:
        ValueError: from the structural fallback check.
        jsonschema.ValidationError: from the full schema validation.
    """
    try:
        import jsonschema  # type: ignore
    except ImportError:
        # Only a missing dependency triggers the fallback; any other failure
        # while importing should surface rather than weaken validation silently.
        _structural_check(payload)
        return "structural check passed"

    schema = json.loads(SCHEMA_JSON.read_text(encoding="utf-8"))
    jsonschema.Draft202012Validator(schema).validate(payload)
    return "valid"


AUTO_PUBLISH_THRESHOLD = 0.8


def _confidence(row: dict[str, Any]) -> float:
    """Return the row's confidence as a float; missing or unparsable values are 0."""
    try:
        return float(row.get("confidence") or 0)
    except (TypeError, ValueError):
        return 0.0


def review_bucket(row: dict[str, Any], kind: str) -> str:
    """Return the review recommendation text for one candidate row.

    A row qualifies for the automatic path when its confidence reaches
    ``AUTO_PUBLISH_THRESHOLD`` and neither ``model_disagreement`` nor ``conflict``
    is truthy. A confidence that cannot be parsed counts as 0, so the row is sent
    to manual review. ``kind == "schema_proposal"`` uses its own pair of messages.
    """
    has_disagreement = bool(row.get("model_disagreement") or row.get("conflict"))
    auto_ok = _confidence(row) >= AUTO_PUBLISH_THRESHOLD and not has_disagreement

    if kind == "schema_proposal":
        if auto_ok:
            return "进入 Schema Proposal 队列:不阻塞数据入图"
        return "人工审核:schema 建议分数低或模型不一致"
    if auto_ok:
        return f"自动入候选发布:多模型一致且分数 >= {AUTO_PUBLISH_THRESHOLD}"
    return f"人工审核:模型不一致或最终分数 < {AUTO_PUBLISH_THRESHOLD}"


def _md_cell(value: Any) -> str:
    """Render *value* as one Markdown table cell.

    Pipes are escaped and line breaks flattened so extracted text cannot add
    columns or split the row.
    """
    text = str(value).replace("|", "\\|")
    return " ".join(text.splitlines()) if ("\n" in text or "\r" in text) else text


def _render_review_plan(payload: dict[str, Any], validation: str) -> str:
    """Build the Markdown text of the review plan."""
    counts = {key: len(payload[key]) for key in _PAYLOAD_KEYS}
    threshold = AUTO_PUBLISH_THRESHOLD
    lines = [
        "# 花溪公园 kg_schema_v1 效果与人工审核方案",
        "",
        "## 这次新模式会得到什么",
        "",
        f"- Entity:{counts['entities']} 个",
        f"- Event:{counts['events']} 个",
        f"- Concept:{counts['concepts']} 个",
        f"- Relation Schema:{counts['relations']} 个",
        f"- Statement:{counts['statements']} 条",
        f"- Schema Proposal:{counts['schema_proposals']} 条",
        f"- Evidence Link:{counts['evidence_links']} 条",
        f"- JSON Schema 校验:{validation}",
        "",
        "## 和旧 web_agent 软字段模式的差别",
        "",
        "| 维度 | 旧模式 | kg_schema_v1 新模式 |",
        "|---|---|---|",
        "| 抽取目标 | summary/history/features 等软字段 + schema_gaps | Entity/Event/Concept/Relation/Statement 全量候选 |",
        "| 基本信息 | 容易被放进 schema_gaps,不能稳定入图 | 地址、开放时间、门票、面积、等级等变成可入图 Statement |",
        "| 历史信息 | 部分进入 history 文本 | 变成 Event 节点,并通过 HAS_EVENT 连到地点 |",
        "| 推荐语义 | 缺少概念层 | 历史文化、生态公园、亲水游憩等变成 Concept |",
        "| 人工审核 | 主要审候选实体字段 | 审 Entity/Event/Concept/Statement/SchemaProposal,能细粒度放行 |",
        "",
        "## 人工介入规则",
        "",
        f"1. 多模型结果一致、证据 quote 可定位、final_score/confidence >= {threshold}:自动进入发布候选,可直接写入正式图谱。",
        f"2. 多模型结果不一致、存在冲突字段、或 final_score/confidence < {threshold}:转人工审核。",
        "3. 新关系、新事件类型、新字段进入 Schema Proposal 队列;它不阻塞事实数据入图,但 schema 是否升级单独治理。",
        "4. 人工只处理低分/冲突/拿不准的数据,不再默认审所有候选。",
        "5. 发布时每条节点和关系保留 evidence_id、quote、confidence,后续可追溯和回滚。",
        "",
        "## 建议入图谱结构",
        "",
        "```cypher",
        "(:Place {id:'ent_huaxi_park', name:'花溪公园'})",
        "(:Event {id:'evt_1937_park_build', event_id:'evt_1937_park_build', title:'花溪正式辟建为公园', event_date:'1937年', event_date_norm:'1937', event_type:'ConstructionEvent', evidence_id, evidence_quote})",
        "(:Concept {id:'cpt_historical_culture', name:'历史文化景区'})",
        "(:Place)-[:HAS_EVENT {event_date, event_type, evidence_id, confidence}]->(:Event)",
        "(:Place)-[:HAS_CONCEPT {evidence_id, confidence}]->(:Concept)",
        "(:Place)-[:HAS_PART {evidence_id, confidence}]->(:ScenicSpot)",
        "(:Place)-[:NEARBY_ATTRACTION {distance_text, evidence_id}]->(:Place)",
        "```",
        "",
        "## 自动发布 / 人工审核判定",
        "",
        "| 类型 | ID/关系 | 名称/内容 | 置信度 | 建议 |",
        "|---|---|---|---:|---|",
    ]

    for kind, key, label_key in (
        ("entity", "entities", "name"),
        ("event", "events", "title"),
        ("concept", "concepts", "name"),
    ):
        for row in payload[key]:
            lines.append(
                f"| {kind} | {_md_cell(row.get('temp_id'))} | "
                f"{_md_cell(row.get(label_key))} | "
                f"{_confidence(row):.2f} | {review_bucket(row, kind)} |"
            )
    for row in payload["schema_proposals"]:
        lines.append(
            f"| schema_proposal | {_md_cell(row.get('proposal_type'))} | "
            f"{_md_cell(row.get('name'))} | "
            f"{_confidence(row):.2f} | {review_bucket(row, 'schema_proposal')} |"
        )

    lines += [
        "",
        "## 当前结论",
        "",
        "花溪公园这份百度百科证据使用 kg_schema_v1 后,已经足够形成一批完整候选知识。",
        "但为了保证城市知识图谱可长期复用,正确做法不是所有条目都人工审,而是:",
        "",
        "```text",
        "Evidence -> kg_schema_v1 多模型抽取 -> 决策合并/评分",
        f"-> final_score >= {threshold} 且无冲突:自动发布到 FalkorDB",
        f"-> final_score < {threshold} 或模型冲突:进入人工审核",
        "```",
        "",
        "对应严格 JSON 输出:`docs/reports/huaxi_kg_schema_v1_ready.json`",
    ]
    return "\n".join(lines)


def write_review_plan(raw: dict[str, Any], payload: dict[str, Any], validation: str) -> None:
    """Write the Markdown review plan for *payload* to ``OUT_REVIEW``.

    *raw* is accepted for interface compatibility and is not read.
    """
    OUT_REVIEW.parent.mkdir(parents=True, exist_ok=True)
    OUT_REVIEW.write_text(_render_review_plan(payload, validation), encoding="utf-8")


def main() -> None:
    """Read the comparison report, then write the payload JSON and review plan."""
    raw = json.loads(IN_JSON.read_text(encoding="utf-8"))
    payload = build_payload(raw)
    validation = validate_payload(payload)

    # Create the output directory so a fresh checkout does not fail on write.
    OUT_JSON.parent.mkdir(parents=True, exist_ok=True)
    OUT_JSON.write_text(
        json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    write_review_plan(raw, payload, validation)

    print(OUT_JSON)
    print(OUT_REVIEW)
    print({k: len(v) for k, v in payload.items() if isinstance(v, list)})
    print(validation)


if __name__ == "__main__":
    main()