"""Phase 1 — Field Mapping Agent.

Suggests which CSV columns map to which entity/relation fields.
Uses an LLM to analyze column names + sample values against ontology schema targets.
"""
|
||
from __future__ import annotations

import json
import logging
from dataclasses import dataclass, field

from app.config import settings
from app.llm_client import LlmClient
|
||
|
||
# System prompt for the mapping request. The text is sent to the model verbatim,
# so it stays in Chinese. It asks for a recommended mapping per CSV column as
# JSON, using exact name matches, semantic matches and value-type inference, and
# tells the model to mark columns it cannot place as "unmapped".
SYSTEM_PROMPT = """你是一个知识图谱数据映射专家。给一个 CSV 的列名和样本值、以及目标 Schema 的实体类型和字段定义,
输出每个列的推荐映射关系。输出 JSON。

规则:
1. 精确匹配列名和字段名(如 "name" → "name")
2. 语义匹配(如 "地址" → "address", "电话" → "phone")
3. 值类型推断(如样本全是数字的可能是 "price" 或 "rating")
4. 无法映射的列标记为 "unmapped"

只输出 JSON。"""
|
||
|
||
# User-message template, rendered with str.format(). The two placeholders are
# {columns_json} (column name -> sample values) and {schema_json} (the target
# schema). Literal braces of the JSON example are doubled so that format()
# emits them as single braces.
USER_TEMPLATE = """CSV 列名 + 样本值(每列最多 3 个样本):
{columns_json}

目标 Schema — 实体类型和字段:
{schema_json}

输出 schema:
{{
  "mappings": [
    {{
      "source_column": "列名",
      "target_entity_type": "目标实体类型",
      "target_field": "目标字段名",
      "confidence": 0.0~1.0
    }}
  ],
  "unmapped_columns": ["列名"],
  "suggested_new_fields": [
    {{
      "column_name": "列名",
      "suggested_entity_type": "建议关联的实体类型",
      "sample_values": ["值1", "值2"],
      "reason": "一句话理由"
    }}
  ]
}}"""
|
||
|
||
|
||
@dataclass
class MappingSuggestion:
    """One proposed link from a CSV column to a field of a schema entity type."""

    # Name of the CSV column the suggestion is about.
    source_column: str
    # Entity type in the target schema that the column is mapped onto.
    target_entity_type: str
    # Field of that entity type the column is mapped onto.
    target_field: str
    # Score attached to the suggestion; defaults to 0.5 and is not validated here.
    confidence: float = 0.5
|
||
|
||
|
||
@dataclass
class FieldMappingResult:
    """Outcome of one field-mapping pass; every list starts out empty."""

    # Column -> field suggestions.
    mappings: list[MappingSuggestion] = field(default_factory=list)
    # Names of columns that received no mapping.
    unmapped_columns: list[str] = field(default_factory=list)
    # Free-form dicts describing columns that may warrant a new schema field.
    suggested_new_fields: list[dict] = field(default_factory=list)
    # Schema extension proposals.
    proposals: list[dict] = field(default_factory=list)
|
||
|
||
|
||
def _rule_based_mapping(
    columns: list[str],
    samples: dict[str, list],
    schema_targets: dict,
) -> FieldMappingResult:
    """Map columns through a fixed name table; used when no LLM result is available.

    Each column name is lower-cased and stripped, then looked up verbatim in the
    table below. A hit becomes a suggestion with confidence 0.9; a miss is
    reported as unmapped.

    Args:
        columns: CSV column names.
        samples: Accepted for signature parity with the caller; not read here.
        schema_targets: Accepted for signature parity with the caller; not read
            here, so every suggestion targets the hard-coded "POI" entity type.

    Returns:
        A FieldMappingResult with only ``mappings`` and ``unmapped_columns`` set.
    """
    known_targets: dict[str, tuple[str, str]] = {
        "name": ("POI", "name"),
        "名称": ("POI", "name"),
        "address": ("POI", "address"),
        "地址": ("POI", "address"),
        "phone": ("POI", "contact"),
        "电话": ("POI", "contact"),
        "open": ("POI", "opening_hours"),
        "开放时间": ("POI", "opening_hours"),
        "price": ("POI", "price_level"),
        "价格": ("POI", "price_level"),
        "lat": ("POI", "latitude"),
        "lng": ("POI", "longitude"),
        "lon": ("POI", "longitude"),
        "type": ("POI", "category"),
        "类型": ("POI", "category"),
    }

    matched = []
    leftover = []
    for column in columns:
        target = known_targets.get(column.lower().strip())
        if target is None:
            leftover.append(column)
            continue
        entity_type, target_field = target
        # The original column spelling is kept; only the lookup key is normalised.
        matched.append(MappingSuggestion(column, entity_type, target_field, 0.9))

    return FieldMappingResult(mappings=matched, unmapped_columns=leftover)
|
||
|
||
|
||
async def suggest_field_mapping(
    columns: list[str],
    samples: dict[str, list],
    schema_targets: dict,
    llm: LlmClient | None = None,
) -> FieldMappingResult:
    """Suggest field mappings for a CSV import.

    The LLM path is tried first when a client is available. Any failure on that
    path is logged and the rule-based mapping is returned instead, so the call
    stays best-effort.

    Args:
        columns: CSV column names.
        samples: {column_name: [sample_values]}; at most three per column are
            sent to the model.
        schema_targets: Ontology schema targets (entity types + fields).
        llm: Optional LLM client. When omitted, one is built from settings if
            ``settings.llm_api_key`` is set.

    Returns:
        The LLM-derived FieldMappingResult, or the rule-based one as fallback.
    """
    if llm is None:
        llm = LlmClient.from_settings() if settings.llm_api_key else None

    if llm and llm.available():
        try:
            columns_json = json.dumps(
                {col: samples.get(col, [])[:3] for col in columns},
                ensure_ascii=False,
                indent=2,
                # Samples are only shown to the model, so values that are not
                # JSON-native are stringified instead of aborting the LLM path.
                default=str,
            )
            schema_json = json.dumps(schema_targets, ensure_ascii=False, indent=2)
            # NOTE(review): chat_json is called without await; if LlmClient
            # declares it as a coroutine function this needs `await` — confirm.
            result = llm.chat_json(
                system=SYSTEM_PROMPT,
                user=USER_TEMPLATE.format(columns_json=columns_json, schema_json=schema_json),
            )
            if not isinstance(result, dict):
                raise TypeError(
                    f"expected a JSON object from the LLM, got {type(result).__name__}"
                )

            # A mapping entry missing a required key raises KeyError here, which
            # sends the whole call to the rule-based fallback below.
            mappings = [
                MappingSuggestion(
                    source_column=m["source_column"],
                    target_entity_type=m["target_entity_type"],
                    target_field=m["target_field"],
                    confidence=m.get("confidence", 0.5),
                )
                for m in result.get("mappings") or []
            ]
            new_fields = result.get("suggested_new_fields") or []
            return FieldMappingResult(
                mappings=mappings,
                unmapped_columns=result.get("unmapped_columns") or [],
                suggested_new_fields=new_fields,
                # Schema extension proposals: a copy, so mutating one list does
                # not silently change the other.
                proposals=list(new_fields),
            )
        except Exception:
            # Deliberate best-effort boundary: record why the LLM path failed,
            # then fall through to the rule-based mapping.
            logging.getLogger(__name__).warning(
                "LLM field mapping failed; falling back to rule-based mapping",
                exc_info=True,
            )

    return _rule_based_mapping(columns, samples, schema_targets)
|