Files
bxh/app/agents/field_mapping.py

149 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Phase 1 — Field Mapping Agent.
Suggests which CSV columns map to which entity/relation fields.
Uses LLM to analyze column names + sample values against ontology schema targets.
"""
from __future__ import annotations

import json
import logging
from dataclasses import dataclass, field

from app.config import settings
from app.llm_client import LlmClient
# System prompt for the field-mapping LLM call (written in Chinese).
# It casts the model as a knowledge-graph data-mapping expert: given CSV column
# names with sample values plus the target schema's entity types and field
# definitions, recommend a mapping for every column and reply with JSON only.
# The four numbered rules cover exact name matching, semantic matching,
# value-type inference, and marking columns that cannot be mapped as "unmapped".
# NOTE(review): the examples in rules 1-3 read as adjacent quoted pairs with no
# separator and an unclosed parenthesis (e.g. "name""name") — possibly a lost
# arrow/closing bracket; confirm against the committed file before editing,
# because this text is sent to the model verbatim.
SYSTEM_PROMPT = """你是一个知识图谱数据映射专家。给一个 CSV 的列名和样本值、以及目标 Schema 的实体类型和字段定义,
输出每个列的推荐映射关系。输出 JSON。
规则:
1. 精确匹配列名和字段名(如 "name""name"
2. 语义匹配(如 "地址""address", "电话""phone"
3. 值类型推断(如样本全是数字的可能是 "price""rating"
4. 无法映射的列标记为 "unmapped"
只输出 JSON。"""
# User-message template for the field-mapping LLM call (written in Chinese).
# It is filled with str.format, so it has exactly two placeholders:
#   {columns_json} - JSON of column name -> sample values
#   {schema_json}  - JSON of the target schema's entity types and fields
# Every literal brace of the example output is doubled ({{ / }}) so that
# str.format emits it as a single brace instead of treating it as a placeholder.
# The example output describes three top-level keys: "mappings",
# "unmapped_columns" and "suggested_new_fields".
USER_TEMPLATE = """CSV 列名 + 样本值(每列最多 3 个样本):
{columns_json}
目标 Schema — 实体类型和字段:
{schema_json}
输出 schema
{{
"mappings": [
{{
"source_column": "列名",
"target_entity_type": "目标实体类型",
"target_field": "目标字段名",
"confidence": 0.0~1.0
}}
],
"unmapped_columns": ["列名"],
"suggested_new_fields": [
{{
"column_name": "列名",
"suggested_entity_type": "建议关联的实体类型",
"sample_values": ["值1", "值2"],
"reason": "一句话理由"
}}
]
}}"""
@dataclass
class MappingSuggestion:
    """One proposed link from a CSV column to a field of a target entity type."""

    # Column name as it appears in the CSV.
    source_column: str
    # Entity type the column is proposed to belong to.
    target_entity_type: str
    # Field of that entity type the column is proposed to fill.
    target_field: str
    # Score attached to the suggestion; 0.5 when the producer gives none.
    confidence: float = 0.5
@dataclass
class FieldMappingResult:
    """Outcome of a field-mapping run; every list defaults to a fresh empty list."""

    # Accepted column -> field suggestions.
    mappings: list[MappingSuggestion] = field(default_factory=list)
    # Names of columns for which no target was found.
    unmapped_columns: list[str] = field(default_factory=list)
    # Free-form dicts describing columns that may deserve a new schema field.
    suggested_new_fields: list[dict] = field(default_factory=list)
    # Schema extension proposals.
    proposals: list[dict] = field(default_factory=list)
def _rule_based_mapping(columns: list[str], samples: dict[str, list], schema_targets: dict) -> FieldMappingResult:
    """Map columns by exact name lookup; the fallback when no LLM result is used.

    Each column name is lower-cased and stripped of surrounding whitespace,
    then looked up in a fixed table of known English/Chinese column names.
    A hit becomes a suggestion with confidence 0.9 (keeping the column's
    original spelling); any other column is reported as unmapped.

    Args:
        columns: CSV column names.
        samples: Accepted for signature parity; not consulted here.
        schema_targets: Accepted for signature parity; not consulted here.

    Returns:
        A FieldMappingResult with only ``mappings`` and ``unmapped_columns`` set.
    """
    # Normalized column name -> (entity type, field name).
    known_columns: dict[str, tuple[str, str]] = {
        "name": ("POI", "name"),
        "名称": ("POI", "name"),
        "address": ("POI", "address"),
        "地址": ("POI", "address"),
        "phone": ("POI", "contact"),
        "电话": ("POI", "contact"),
        "open": ("POI", "opening_hours"),
        "开放时间": ("POI", "opening_hours"),
        "price": ("POI", "price_level"),
        "价格": ("POI", "price_level"),
        "lat": ("POI", "latitude"),
        "lng": ("POI", "longitude"),
        "lon": ("POI", "longitude"),
        "type": ("POI", "category"),
        "类型": ("POI", "category"),
    }
    suggestions = []
    leftovers = []
    for column in columns:
        target = known_columns.get(column.lower().strip())
        if target is None:
            leftovers.append(column)
            continue
        entity_type, target_field = target
        suggestions.append(MappingSuggestion(column, entity_type, target_field, 0.9))
    return FieldMappingResult(mappings=suggestions, unmapped_columns=leftovers)
async def suggest_field_mapping(
    columns: list[str],
    samples: dict[str, list],
    schema_targets: dict,
    llm: LlmClient | None = None,
) -> FieldMappingResult:
    """Suggest field mappings for a CSV import.

    Asks the LLM for a mapping when a usable client exists. If the LLM call or
    the parsing of its reply fails for any reason, the failure is logged and
    the rule-based mapping is returned instead, so callers always get a result.

    Args:
        columns: CSV column names.
        samples: ``{column_name: [sample_values]}``; only the first three
            values of each column are sent to the LLM.
        schema_targets: Ontology schema targets (entity types + fields); it is
            serialized with ``json.dumps`` for the LLM path.
        llm: Optional LLM client. When omitted, one is built from settings if
            ``settings.llm_api_key`` is set.

    Returns:
        The result built from the LLM reply, or the rule-based result when no
        client is available or the LLM path fails.
    """
    if llm is None:
        llm = LlmClient.from_settings() if settings.llm_api_key else None
    if llm and llm.available():
        try:
            columns_json = json.dumps(
                {col: samples.get(col, [])[:3] for col in columns},
                ensure_ascii=False, indent=2,
            )
            schema_json = json.dumps(schema_targets, ensure_ascii=False, indent=2)
            # NOTE(review): chat_json is called without await inside a coroutine;
            # presumably it is a synchronous call — confirm it does not block
            # the event loop for long.
            result = llm.chat_json(
                system=SYSTEM_PROMPT,
                user=USER_TEMPLATE.format(columns_json=columns_json, schema_json=schema_json),
            )
            mappings = []
            # "or []" also covers a JSON null, which .get's default would let through.
            for m in result.get("mappings") or []:
                confidence = m.get("confidence")
                mappings.append(
                    MappingSuggestion(
                        source_column=m["source_column"],
                        target_entity_type=m["target_entity_type"],
                        target_field=m["target_field"],
                        confidence=0.5 if confidence is None else confidence,
                    )
                )
            new_fields = result.get("suggested_new_fields") or []
            return FieldMappingResult(
                mappings=mappings,
                unmapped_columns=result.get("unmapped_columns") or [],
                suggested_new_fields=new_fields,
                # Schema extension proposals: same entries, but a separate list
                # so mutating one attribute does not silently change the other.
                proposals=list(new_fields),
            )
        except Exception:
            # Deliberate best-effort: any LLM/parsing failure degrades to the
            # rule-based mapping, but leave a trace so it can be diagnosed.
            logging.getLogger(__name__).warning(
                "LLM field mapping failed; falling back to rule-based mapping",
                exc_info=True,
            )
    return _rule_based_mapping(columns, samples, schema_targets)