"""STEP 05 — Entity Alignment Agent.

Given new candidate entities, decide whether each one should be:

- merged into an existing entity (duplicate detection),
- created as a new entity, or
- rejected outright.

An LLM makes the decision when a client is available and similar existing
entities were found; otherwise a rule-based exact-name match is used.
"""

from __future__ import annotations

import json
import logging
import math
from dataclasses import dataclass
from typing import Literal

from app.config import settings
from app.db import get_conn, list_vocabulary_terms
from app.llm_client import LlmClient

# Prompt templates for the LLM alignment step.
# The text is written in Chinese on purpose: it is sent verbatim to the model.
# USER_TEMPLATE is filled via str.format(), so literal JSON braces in the
# schema example are doubled ({{ / }}).
SYSTEM_PROMPT = """你是一个知识图谱实体归一专家。给你一个新候选实体和一批已存在的相似实体,
判断新实体是已有实体的别名/重复,还是全新实体。输出 JSON。

判断依据:
1. 名称相似度(包含别名匹配)
2. 类型一致性(同类型更可能重复)
3. 地理位置重叠(地址/坐标接近)
4. 核心字段重叠(opening_hours / contact 等)

动作:
- merge: 重复,合并到已有实体
- create: 全新实体,正常创建
- reject: 无效数据,拒绝

target_entity_id 仅在 action 为 merge 时填写,且必须是“已存在相似实体”中某个实体的 id;其余情况填 null。

只输出 JSON。"""

USER_TEMPLATE = """新候选实体:
{new_candidate}

已存在相似实体:
{existing}

词汇表(标准名+别名):
{vocabulary}

输出 schema:
{{
  "candidate_id": int,
  "action": "merge|create|reject",
  "target_entity_id": "string|null",
  "confidence": 0.0~1.0,
  "reasons": ["理由1", "理由2"]
}}"""


@dataclass(frozen=True)
class AlignmentDecision:
    """Immutable alignment verdict for a single candidate entity.

    Fields cannot be reassigned after construction. Because ``reasons`` is a
    list, the generated ``__hash__`` raises ``TypeError``; do not use
    instances as dict keys or set members.
    """

    # Identifier of the candidate that was aligned.
    candidate_id: int
    # Outcome: merge into an existing entity, create a new one, or reject.
    action: Literal["merge", "create", "reject"]
    # Existing entity to merge into (used with action "merge"), else None.
    target_entity_id: str | None
    # Decision confidence, nominally within [0.0, 1.0].
    confidence: float
    # Human-readable justifications for the decision.
    reasons: list[str]


def _escape_like(text: str) -> str:
    """Escape LIKE/ILIKE metacharacters so *text* is matched literally.

    Uses backslash, PostgreSQL's default LIKE escape character.
    """
    return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


async def _find_similar_entities(candidate: dict, limit: int = 10) -> list[dict]:
    """Find existing entities with similar names/types.

    Looks in ``candidate_entities`` for rows of the same tenant, project and
    ``entity_type`` with status ``published`` or ``approved`` whose
    ``natural_key`` or payload text contains the first five characters of
    the candidate's (whitespace-stripped) ``natural_key``, case-insensitively.

    Args:
        candidate: Candidate record; reads ``natural_key`` and ``entity_type``.
        limit: Maximum number of rows to return (default 10).

    Returns:
        Rows with ``id``, ``natural_key``, ``entity_type`` and ``payload``.
        Rows whose ``natural_key`` equals the candidate name come first, so
        an exact match is never cut off by the LIMIT. Returns ``[]`` when the
        candidate has no name.
    """
    s = settings.db_schema
    name = (candidate.get("natural_key") or "").strip()
    if not name:
        return []

    # Escape wildcards so names such as "100%" or "a_b" match literally.
    pattern = f"%{_escape_like(name[:5])}%"

    async with get_conn() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                f"""SELECT id, natural_key, entity_type, payload
                FROM {s}.candidate_entities
                WHERE tenant_id=%s AND project_id=%s
                  AND entity_type=%s
                  AND status IN ('published', 'approved')
                  AND (natural_key ILIKE %s OR payload::text ILIKE %s)
                ORDER BY CASE WHEN natural_key = %s THEN 0 ELSE 1 END, id
                LIMIT %s""",
                (
                    settings.default_tenant,
                    settings.default_project,
                    candidate.get("entity_type", ""),
                    pattern,
                    pattern,
                    name,
                    limit,
                ),
            )
            return await cur.fetchall()


logger = logging.getLogger(__name__)

# Actions allowed by the alignment contract; any other LLM output is discarded.
_VALID_ACTIONS = frozenset({"merge", "create", "reject"})


def _coerce_llm_decision(
    result: object,
    cid: int,
    similar: list[dict],
) -> AlignmentDecision | None:
    """Validate a raw LLM JSON reply and turn it into an AlignmentDecision.

    Returns None when the reply is unusable, so the caller can fall back to
    the rule-based path. A reply is unusable when:

    - it is not a JSON object;
    - its action is unknown;
    - a merge names a target that is not among *similar*;
    - its confidence is not a finite number.
    """
    if not isinstance(result, dict):
        return None

    action = result.get("action", "create")
    if not isinstance(action, str) or action not in _VALID_ACTIONS:
        return None

    target: str | None = None
    if action == "merge":
        # Only accept a merge into an entity the model was actually shown.
        raw_target = result.get("target_entity_id")
        known_ids = {str(e["id"]) for e in similar}
        if raw_target is None or str(raw_target) not in known_ids:
            return None
        target = str(raw_target)

    try:
        confidence = float(result.get("confidence", 0.5))
    except (TypeError, ValueError):
        return None
    if not math.isfinite(confidence):
        return None
    confidence = min(max(confidence, 0.0), 1.0)

    raw_reasons = result.get("reasons", [])
    if isinstance(raw_reasons, str):
        raw_reasons = [raw_reasons]
    reasons = [str(r) for r in raw_reasons] if isinstance(raw_reasons, list) else []

    return AlignmentDecision(
        candidate_id=cid,
        action=action,
        target_entity_id=target,
        confidence=confidence,
        reasons=reasons,
    )


async def _llm_decision(
    candidate: dict,
    cid: int,
    similar: list[dict],
    llm: LlmClient,
) -> AlignmentDecision | None:
    """Ask the LLM to align *candidate* against *similar*.

    Returns None, after logging, if building the prompt, the LLM call, or
    validation of the reply fails.
    """
    vocab_terms = await list_vocabulary_terms(
        settings.default_tenant,
        settings.default_project,
        candidate.get("entity_type"),
        limit=50,
    )

    try:
        # Payloads are sent because the system prompt asks the model to
        # compare location and core fields (opening_hours / contact), which
        # presumably live in the payload rather than in name/type — confirm.
        user_prompt = USER_TEMPLATE.format(
            new_candidate=json.dumps(
                {
                    "id": cid,
                    "name": candidate.get("natural_key"),
                    "type": candidate.get("entity_type"),
                    "payload": candidate.get("payload"),
                },
                ensure_ascii=False,
            ),
            existing=json.dumps(
                [
                    {
                        "id": e["id"],
                        "name": e["natural_key"],
                        "type": e["entity_type"],
                        "payload": e["payload"],
                    }
                    for e in similar
                ],
                ensure_ascii=False,
            ),
            vocabulary=json.dumps(
                [{"name": v["canonical_name"], "aliases": v.get("aliases", [])} for v in vocab_terms],
                ensure_ascii=False,
            ),
        )
        # NOTE(review): chat_json is called without await; if it performs
        # blocking network I/O it stalls the event loop — consider
        # asyncio.to_thread. TODO confirm against LlmClient.
        result = llm.chat_json(system=SYSTEM_PROMPT, user=user_prompt)
    except Exception:
        # Best-effort: fall back to rules, but never fail silently.
        logger.warning(
            "LLM alignment failed for candidate %s; using rule-based fallback",
            cid,
            exc_info=True,
        )
        return None

    decision = _coerce_llm_decision(result, cid, similar)
    if decision is None:
        logger.warning("Unusable LLM alignment reply for candidate %s: %r", cid, result)
    return decision


def _rule_based_decision(
    candidate: dict,
    cid: int,
    similar: list[dict],
) -> AlignmentDecision:
    """Merge on an exact (whitespace-trimmed) name match; otherwise create."""
    name = (candidate.get("natural_key") or "").strip()
    if name:
        for entity in similar:
            if (entity["natural_key"] or "").strip() == name:
                return AlignmentDecision(
                    candidate_id=cid,
                    action="merge",
                    target_entity_id=str(entity["id"]),
                    confidence=0.95,
                    reasons=["规则匹配: 名称完全一致"],
                )

    return AlignmentDecision(
        candidate_id=cid,
        action="create",
        target_entity_id=None,
        confidence=0.6,
        reasons=["无相似实体或LLM不可用"],
    )


async def align_single_candidate(
    candidate: dict,
    llm: LlmClient | None = None,
) -> AlignmentDecision:
    """Align a single candidate against existing entities and vocabulary.

    When *llm* is given, reports ``available()``, and similar entities exist,
    the LLM decides; its reply is validated first. Otherwise, or when the LLM
    fails or replies with something unusable, a rule-based decision is
    returned: merge on an exact name match (confidence 0.95), else create
    (confidence 0.6).

    Args:
        candidate: Candidate record; must contain ``id``. Also reads
            ``natural_key``, ``entity_type`` and (for the LLM) ``payload``.
        llm: Optional LLM client.

    Returns:
        The AlignmentDecision for this candidate.

    Raises:
        KeyError: If ``candidate`` has no ``id``.
        Database errors from the similarity and vocabulary lookups propagate.
    """
    cid = candidate["id"]
    similar = await _find_similar_entities(candidate)

    if llm and llm.available() and similar:
        decision = await _llm_decision(candidate, cid, similar, llm)
        if decision is not None:
            return decision

    return _rule_based_decision(candidate, cid, similar)


async def align_candidates(candidates: list[dict]) -> list[dict]:
    """Align each candidate and return the decisions as plain dicts.

    One LLM client is created from settings when ``settings.llm_api_key`` is
    set and is shared by all candidates; otherwise ``None`` is passed and
    each candidate is aligned without an LLM.

    Returns:
        One dict per input candidate, in input order, with the keys
        ``candidate_id``, ``action``, ``target_entity_id``, ``confidence``
        and ``reasons``.
    """
    client: LlmClient | None = None
    if settings.llm_api_key:
        client = LlmClient.from_settings()

    # Candidates are aligned one at a time, in input order.
    decisions = [await align_single_candidate(item, client) for item in candidates]

    return [
        {
            "candidate_id": d.candidate_id,
            "action": d.action,
            "target_entity_id": d.target_entity_id,
            "confidence": d.confidence,
            "reasons": d.reasons,
        }
        for d in decisions
    ]