426 lines
17 KiB
Python
426 lines
17 KiB
Python
"""STEP 02 — Knowledge Plaza overview, usage, alerts."""
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import math
|
||
import re
|
||
import time
|
||
from typing import Any
|
||
|
||
import h3
|
||
from fastapi import APIRouter, Depends, HTTPException
|
||
|
||
from app.auth import CurrentUser
|
||
from app.config import settings
|
||
from app.db import get_agent_settings, get_plaza_overview, get_plaza_alerts, get_conn
|
||
from app.llm_client import LlmClient
|
||
from app.project_context import ProjectContext, get_project_context
|
||
|
||
router = APIRouter()
|
||
|
||
# Graph name used when a request does not specify ``graph_name``.
SPATIAL_GRAPH_NAME: str = "guiyang_spatial_v1"

# Category label -> substrings that select this category when found in a question.
# Insertion order matters: rule-based intent parsing takes the first category
# whose alias occurs in the question.
CATEGORY_ALIASES: dict[str, list[str]] = {
    "美食": [
        "美食", "吃", "餐厅", "饭店", "火锅",
        "小吃", "烧烤", "咖啡", "奶茶", "酸汤鱼",
    ],
    "景点": [
        "景点", "景区", "公园", "博物馆", "古镇",
        "夜游", "历史", "文化", "好玩",
    ],
    "酒店": ["酒店", "住宿", "住", "宾馆", "民宿"],
    "商场": ["商场", "购物", "商圈", "超市", "商城"],
    "医疗保健": ["医院", "诊所", "药店", "医疗", "看病", "急诊"],
    "交通设施": ["地铁", "公交", "车站", "交通", "停车", "机场", "高铁"],
    "生活服务": ["生活服务", "维修", "营业厅", "服务"],
    "科教文化": ["学校", "大学", "图书馆", "教育", "培训"],
}

# Category label -> ``place_type`` value matched in the POI query.
PLACE_TYPE_ALIASES: dict[str, str] = {
    "美食": "eat",
    "景点": "sight",
    "酒店": "hotel",
    "商场": "mall",
    "医疗保健": "medical",
    "交通设施": "transit",
    "生活服务": "life",
    "科教文化": "education",
}
|
||
|
||
|
||
def _haversine_m(lng1: float, lat1: float, lng2: float, lat2: float) -> float:
    """Return the great-circle distance in metres between two lng/lat points.

    Coordinates are decimal degrees. The haversine formula is evaluated on a
    sphere of radius 6,371,008.8 m.
    """
    sphere_radius_m = 6_371_008.8
    delta_lng = math.radians(lng2 - lng1)
    delta_lat = math.radians(lat2 - lat1)
    cos_lat1 = math.cos(math.radians(lat1))
    cos_lat2 = math.cos(math.radians(lat2))
    hav = math.sin(delta_lat / 2) ** 2 + cos_lat1 * cos_lat2 * math.sin(delta_lng / 2) ** 2
    central_angle = 2 * math.asin(math.sqrt(hav))
    return sphere_radius_m * central_angle
|
||
|
||
|
||
def _h3_plan(radius_m: int) -> tuple[int, str, int]:
    """Choose the H3 recall plan for a search radius.

    Returns ``(resolution, column_name, k)``: the H3 resolution, the name of
    the POI-table column holding cells at that resolution, and the grid-disk
    ring count. Radii up to 3000 m use fixed plans; larger radii use
    resolution 7 with a ring count derived from the average hexagon edge.
    """
    # (inclusive upper bound in metres, plan) checked in ascending order.
    fixed_plans = (
        (500, (9, "h3_r9", 2)),
        (1000, (9, "h3_r9", 4)),
        (3000, (8, "h3_r8", 4)),
    )
    for upper_bound_m, plan in fixed_plans:
        if radius_m <= upper_bound_m:
            return plan

    resolution = 7
    hex_edge_m = h3.average_hexagon_edge_length(resolution, unit="m")
    rings = math.ceil(radius_m / (math.sqrt(3) * hex_edge_m)) + 1
    return resolution, "h3_r7", max(2, rings)
|
||
|
||
|
||
def _rule_intent(question: str, radius_m: int | None) -> dict[str, Any]:
    """Parse a nearby-POI query intent from the question with rules only.

    Args:
        question: Raw user question.
        radius_m: Radius supplied by the caller; a falsy value means 1000 m.
            A distance or duration written in the question takes precedence.

    Returns:
        A dict with ``radius_m`` (clamped to 100..10000), ``category`` (a key
        of ``CATEGORY_ALIASES`` or ``""``), ``keywords`` (at most six tokens),
        ``sort_preference`` and ``user_need`` (the stripped question).
    """
    q = question.strip()
    radius = radius_m or 1000

    km = re.search(r"(\d+(?:\.\d+)?)\s*(?:公里|千米|km)", q, flags=re.I)
    meter = re.search(r"(\d+(?:\.\d+)?)\s*(?:米|m)", q, flags=re.I)
    minutes = re.search(r"(\d+(?:\.\d+)?)\s*分钟", q)
    if km:
        radius = int(float(km.group(1)) * 1000)
    elif meter:
        radius = int(float(meter.group(1)))
    elif minutes:
        # A 15-minute walk is roughly 1.1-1.3 km; recall with a conservative
        # radius for now, route duration can be plugged in later.
        radius = min(3000, max(500, int(float(minutes.group(1)) * 80)))

    # First category (in dict order) with any alias occurring in the question.
    category = next(
        (cat for cat, aliases in CATEGORY_ALIASES.items() if any(a in q for a in aliases)),
        "",
    )

    # Split on both ASCII and full-width punctuation: Chinese input normally
    # uses the full-width comma, question mark and exclamation mark, which the
    # previous ASCII-only character class never matched.
    known_aliases = {alias for aliases in CATEGORY_ALIASES.values() for alias in aliases}
    tokens = re.split(r"[,，。?？!！\s]+", q)
    keywords = [tok for tok in tokens if tok and tok not in known_aliases][:6]

    return {
        "radius_m": max(100, min(radius, 10000)),
        "category": category,
        "keywords": keywords,
        "sort_preference": "综合距离、评分和语义匹配",
        "user_need": q,
    }
|
||
|
||
|
||
async def _deepseek_client(max_tokens: int = 900) -> LlmClient | None:
    """Build an ``LlmClient`` from the agent settings, or return ``None``.

    The ``extract.models.deepseek`` entry is preferred and uses the
    ``extract.timeout`` value (default 60). Otherwise the ``global`` entry is
    used with its own ``timeout`` (default 45). An entry only counts when it
    has both ``base_url`` and ``api_key``; the model defaults to
    ``"deepseek-chat"``.
    """
    cfg = await get_agent_settings()
    extract = cfg.get("extract") or {}
    primary = (extract.get("models") or {}).get("deepseek") or {}

    if primary.get("base_url") and primary.get("api_key"):
        chosen = primary
        timeout_s = int(extract.get("timeout") or 60)
    else:
        shared = cfg.get("global") or {}
        if not (shared.get("base_url") and shared.get("api_key")):
            return None
        chosen = shared
        timeout_s = int(shared.get("timeout") or 45)

    return LlmClient(
        chosen["base_url"],
        chosen["api_key"],
        chosen.get("model") or "deepseek-chat",
        timeout=timeout_s,
        max_tokens=max_tokens,
    )
|
||
|
||
|
||
async def _llm_intent(question: str, fallback: dict[str, Any]) -> tuple[dict[str, Any], str]:
    """Refine the rule-based intent with the LLM, falling back on any failure.

    Args:
        question: Raw user question.
        fallback: Intent produced by the rule parser; it is sent to the model
            as a hint and returned unchanged when the LLM is unavailable or
            its reply cannot be used.

    Returns:
        ``(intent, source_label)``. Non-empty fields from the model override
        the fallback. ``radius_m`` is clamped to 100..10000, ``category`` must
        be a key of ``CATEGORY_ALIASES`` and ``keywords`` is always a list of
        non-empty strings.
    """
    client = await _deepseek_client(max_tokens=700)
    if not client:
        return fallback, "DeepSeek 未配置,使用规则解析"

    system = (
        "你是城市知识图谱的游客问答意图解析器。只输出 JSON。"
        "把用户问题解析为 nearby POI 查询意图,类别只能从:"
        "美食、景点、酒店、商场、医疗保健、交通设施、生活服务、科教文化、空字符串 中选择。"
        "radius_m 为整数米,没说半径默认 1000。keywords 提取用户真正关心的语义词。"
    )
    user = json.dumps({"question": question, "rule_fallback": fallback}, ensure_ascii=False)
    try:
        # chat_json is a blocking call, so it runs in a worker thread.
        data = await asyncio.to_thread(client.chat_json, system, user)
        merged = {**fallback, **{k: v for k, v in data.items() if v not in (None, "")}}
        merged["radius_m"] = max(100, min(int(merged.get("radius_m") or fallback["radius_m"]), 10000))

        # The model may return a non-string (even unhashable) category; treat
        # that like an unknown label instead of discarding the whole reply.
        category = merged.get("category")
        if not isinstance(category, str) or category not in CATEGORY_ALIASES:
            merged["category"] = fallback.get("category", "")

        # Keywords are substring-matched against POI text downstream, so keep
        # only scalar items and normalise them to non-empty strings.
        keywords = merged.get("keywords")
        if isinstance(keywords, list):
            cleaned: list[str] = []
            for kw in keywords:
                if isinstance(kw, bool) or not isinstance(kw, (str, int, float)):
                    continue
                text = str(kw).strip()
                if text:
                    cleaned.append(text)
            merged["keywords"] = cleaned
        else:
            merged["keywords"] = fallback.get("keywords", [])
        return merged, "DeepSeek 意图解析"
    except Exception as exc:  # noqa: BLE001 - any LLM failure degrades to the rule intent
        return fallback, f"DeepSeek 意图解析失败,使用规则解析:{str(exc)[:120]}"
|
||
|
||
|
||
def _photo_urls(value: Any) -> list[str]:
    """Normalise a stored photo field into a list of strings.

    Accepts a list (truthy items are stringified), a JSON-encoded list, or a
    ``|``/``,`` separated string of which only parts starting with ``http``
    are kept. Anything else yields an empty list.
    """
    if isinstance(value, list):
        return [str(item) for item in value if item]
    if not isinstance(value, str):
        return []

    try:
        decoded = json.loads(value)
    except Exception:
        # Not JSON: fall through to the delimiter-separated form.
        decoded = None
    if isinstance(decoded, list):
        return [str(item) for item in decoded if item]

    urls: list[str] = []
    for piece in re.split(r"[|,]", value):
        candidate = piece.strip()
        if candidate.startswith("http"):
            urls.append(candidate)
    return urls
|
||
|
||
|
||
def _rating_num(value: Any) -> float:
    """Convert a stored rating to a finite float, using 0.0 when unusable.

    Falsy values, values ``float()`` cannot convert, and non-finite results
    (``nan``/``inf``) all map to 0.0. Non-finite ratings are rejected because
    they would propagate into the ranking score and cannot be serialised as
    standard JSON.
    """
    try:
        rating = float(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    return rating if math.isfinite(rating) else 0.0
|
||
|
||
|
||
def _score_place(row: dict[str, Any], distance_m: float, radius_m: int, intent: dict[str, Any]) -> tuple[float, list[str]]:
    """Score one POI row and list the reasons behind the score.

    The score adds up: up to 55 points for proximity (linear in
    ``distance_m / radius_m``, floored at 0), 7 points per rating star (rating
    capped at 5), 20 points when ``type_label`` equals the intent category,
    and 12 points per intent keyword found in the row's text fields.

    Returns:
        ``(score rounded to 3 decimals, at most four reason strings)``.
    """
    proximity = 55.0 * (1 - distance_m / max(radius_m, 1))
    total = max(0.0, proximity)
    why = [f"距离约 {round(distance_m)} 米"]

    stars = _rating_num(row.get("rating"))
    if stars:
        total += min(stars, 5) * 7
        why.append(f"评分 {stars:g}")

    wanted = intent.get("category") or ""
    if wanted and row.get("type_label") == wanted:
        total += 20
        why.append(f"匹配类别「{wanted}」")

    text_fields = ("name", "address", "tags", "amap_type", "type_label")
    searchable = " ".join(str(row.get(field) or "") for field in text_fields)
    for kw in intent.get("keywords") or []:
        if not kw or kw not in searchable:
            continue
        total += 12
        why.append(f"命中关键词「{kw}」")

    return round(total, 3), why[:4]
|
||
|
||
|
||
async def _llm_answer(question: str, intent: dict[str, Any], results: list[dict[str, Any]]) -> tuple[str, dict[str, str], str]:
    """Compose the answer text and per-place reasons for ranked results.

    Returns ``(answer, reasons_by_place_id, source_label)``. With no results a
    fixed message is returned and no LLM call is made. Without a configured
    client, or when the LLM call fails, the answer recommends the top-ranked
    result. Otherwise the model receives the first 20 candidates in compact
    form.
    """
    if not results:
        return "当前已采集的知识图谱中,没有在这个半径内找到匹配结果。可以扩大半径,或等高德网格续采完成后再试。", {}, "no_candidates"

    def lead_sentence() -> str:
        # Shared opening of both non-LLM answers; built only when needed.
        top = results[0]
        return f"根据当前知识图谱,优先推荐 {top['name']},距离约 {round(top['distance_m'])} 米。"

    client = await _deepseek_client(max_tokens=1200)
    if not client:
        canned = {item["place_id"]: "距离近、类别匹配、来自当前空间知识图谱" for item in results[:8]}
        return lead_sentence() + "下面结果已按距离、类别匹配和评分综合排序。", canned, "fallback_answer"

    compact = []
    for item in results[:20]:
        compact.append({
            "id": item["place_id"],
            "name": item["name"],
            "type": item["type_label"],
            "distance_m": item["distance_m"],
            "rating": item.get("rating"),
            "address": item.get("address"),
            "tags": item.get("tags"),
            "score": item.get("score"),
        })
    system = (
        "你是面向游客的城市知识图谱问答助手。只输出 JSON。"
        "基于候选 POI 回答用户问题,不能编造候选中没有的地点。"
        "输出 answer 和 reasons,reasons 是 {候选id: 推荐理由}。"
        "回答要像真实产品结果页,简洁、可解释。"
    )
    user = json.dumps({"question": question, "intent": intent, "candidates": compact}, ensure_ascii=False)

    try:
        data = await asyncio.to_thread(client.chat_json, system, user)
        answer = str(data.get("answer") or "").strip()
        raw_reasons = data.get("reasons")
        if not isinstance(raw_reasons, dict):
            raw_reasons = {}
        reasons = {str(key): str(text) for key, text in raw_reasons.items()}
        return answer or "已根据当前知识图谱完成附近结果排序。", reasons, "DeepSeek 回答排序"
    except Exception as exc:  # noqa: BLE001 - LLM failure must not fail the request
        return (
            lead_sentence() + "DeepSeek 回答组织暂时失败,页面仍展示规则排序结果。",
            {},
            f"DeepSeek 回答失败:{str(exc)[:120]}",
        )
|
||
|
||
|
||
def _rule_answer(question: str, intent: dict[str, Any], results: list[dict[str, Any]]) -> tuple[str, dict[str, str], str]:
    """Build the answer without an LLM, from the ranked results alone.

    Returns ``(answer, reasons_by_place_id, "rule_fast_answer")``. Reasons
    cover the first 12 results and join each result's ``rank_reasons``; a
    generic reason is used when a result has none. ``question`` is accepted
    for signature parity with the LLM variant and is not used.
    """
    source = "rule_fast_answer"
    if not results:
        return "当前已采集的知识图谱中,没有在这个半径内找到匹配结果。可以扩大半径,或等采集完成后再试。", {}, source

    label = intent.get("category") or "相关地点"
    top = results[0]
    sentences = [
        f"根据当前知识图谱,{label}共召回 {len(results)} 个候选;",
        f"优先推荐 {top['name']},距离约 {round(top['distance_m'])} 米。",
        "下方已按距离、类别匹配和评分综合排序。",
    ]

    reasons: dict[str, str] = {}
    for item in results[:12]:
        parts = item.get("rank_reasons") or ["距离、类别和评分综合靠前"]
        reasons[item["place_id"]] = ";".join(parts)

    return "".join(sentences), reasons, source
|
||
|
||
|
||
@router.get("/plaza/overview")
async def overview(
    context: ProjectContext = Depends(get_project_context),
    _user: CurrentUser = None,
):
    """Return the plaza overview for the tenant and project of the request context."""
    tenant_id, project_id = context.tenant_id, context.project_id
    summary = await get_plaza_overview(tenant_id, project_id)
    return summary
|
||
|
||
|
||
@router.get("/plaza/usage")
async def usage(
    context: ProjectContext = Depends(get_project_context),
    _user: CurrentUser = None,
):
    """Return published entity counts per type and the pending-review total.

    Both figures come from ``candidate_entities`` and are scoped to the
    tenant and project of the request context.
    """
    schema = settings.db_schema
    scope = (context.tenant_id, context.project_id)
    # The schema name comes from application settings; tenant and project are
    # always passed as bound parameters.
    published_sql = (
        f"SELECT entity_type, COUNT(*) AS cnt FROM {schema}.candidate_entities "
        "WHERE tenant_id=%s AND project_id=%s AND status='published' "
        "GROUP BY entity_type ORDER BY cnt DESC"
    )
    pending_sql = (
        f"SELECT COUNT(*) AS cnt FROM {schema}.candidate_entities "
        "WHERE tenant_id=%s AND project_id=%s AND status='pending_review'"
    )

    async with get_conn() as conn, conn.cursor() as cur:
        await cur.execute(published_sql, scope)
        by_type = await cur.fetchall()

        await cur.execute(pending_sql, scope)
        pending_row = await cur.fetchone()
        pending = pending_row["cnt"]

    return {"entities_by_type": by_type, "pending_review": pending}
|
||
|
||
|
||
@router.get("/plaza/alerts")
async def alerts(
    context: ProjectContext = Depends(get_project_context),
    _user: CurrentUser = None,
):
    """Return the plaza alerts for the tenant and project of the request context."""
    tenant_id, project_id = context.tenant_id, context.project_id
    current_alerts = await get_plaza_alerts(tenant_id, project_id)
    return current_alerts
|
||
|
||
|
||
@router.get("/plaza/amap-config")
async def amap_config(_user: CurrentUser = None):
    """Return the browser-side AMap JS API settings used by admin map canvases.

    The two ``*configured`` flags report whether the JS key and the security
    code are set (truthy) in the application settings.
    """
    js_key = settings.amap_js_key
    security_code = settings.amap_security_jscode
    payload = {
        "configured": bool(js_key),
        "js_key": js_key,
        "security_jscode": security_code,
        "security_configured": bool(security_code),
    }
    return payload
|
||
|
||
|
||
def _query_location(body: dict) -> tuple[float, float]:
    """Read ``lng``/``lat`` from the request body, rejecting unusable values with 400."""
    try:
        lng = float(body.get("lng", 106.7135))
        lat = float(body.get("lat", 26.5744))
    except (TypeError, ValueError, OverflowError) as exc:
        raise HTTPException(400, "lng/lat required") from exc
    # NaN/inf and out-of-range coordinates cannot be indexed meaningfully, so
    # reject them here instead of letting the H3 call fail with a 500.
    in_range = -180.0 <= lng <= 180.0 and -90.0 <= lat <= 90.0
    if not (math.isfinite(lng) and math.isfinite(lat) and in_range):
        raise HTTPException(400, "lng/lat out of range")
    return lng, lat


def _query_radius(body: dict) -> int | None:
    """Read the optional ``radius_m`` from the request body; a malformed value yields 400."""
    raw = body.get("radius_m")
    if raw in (None, ""):
        return None
    try:
        # Going through float() also accepts numeric strings such as "1500.0".
        return int(float(raw))
    except (TypeError, ValueError, OverflowError) as exc:
        raise HTTPException(400, "radius_m must be a number") from exc


def _place_result(row: Any, distance_m: float, score: float, reasons: list[str]) -> dict[str, Any]:
    """Shape one POI row plus its ranking data into the response item."""
    fetched_at = row.get("last_fetched_at")
    return {
        "place_id": row["element_id"],
        "gaode_poi_id": row["gaode_poi_id"],
        "name": row["name"],
        "type_label": row["type_label"],
        "place_type": row["place_type"],
        "amap_type": row["amap_type"],
        "typecode": row["typecode"],
        "lng": float(row["lng"]),
        "lat": float(row["lat"]),
        "address": row["address"] or "",
        "district": row["district"] or "",
        "city": row["city"] or "",
        "adcode": row["adcode"] or "",
        "business_area": row["business_area"] or "",
        "tel": row["tel"] or "",
        "rating": row["rating"] or "",
        "cost": row["cost"] or "",
        "open_time": row["open_time"] or "",
        "tags": row["tags"] or "",
        "photo_urls": _photo_urls(row["photo_urls"]),
        "h3_cell": row["h3_cell"],
        "source": row["source"],
        "last_fetched_at": fetched_at.isoformat() if fetched_at else "",
        "distance_m": round(distance_m, 1),
        "score": score,
        "rank_reasons": reasons,
    }


@router.post("/plaza/user-query")
async def user_query(body: dict, _user: CurrentUser = None):
    """User-facing KG nearby query: NL intent -> H3 recall -> distance/rank -> answer.

    Request body keys: ``question`` (required), ``lng``/``lat`` (optional,
    defaulting to a fixed point), ``radius_m`` (optional), ``graph_name``
    (optional, defaults to ``SPATIAL_GRAPH_NAME``) and ``use_llm`` (the LLM is
    used only when this is exactly ``true``).

    Returns:
        A dict with the question, graph name, user location, parsed intent,
        answer text, up to 60 ranked results and a ``trace`` section.

    Raises:
        HTTPException: 400 when the question is empty, lng/lat are missing,
            non-numeric or out of range, or ``radius_m`` is not numeric.
    """
    started_at = time.perf_counter()
    question = str(body.get("question") or "").strip()
    if not question:
        raise HTTPException(400, "question required")
    lng, lat = _query_location(body)
    radius_m = _query_radius(body)
    graph_name = str(body.get("graph_name") or SPATIAL_GRAPH_NAME)

    use_llm = body.get("use_llm") is True
    fallback = _rule_intent(question, radius_m)
    if use_llm:
        intent, intent_source = await _llm_intent(question, fallback)
    else:
        intent, intent_source = fallback, "规则意图快路径"

    radius = int(intent["radius_m"])
    res, h3_col, k = _h3_plan(radius)
    cells = list(h3.grid_disk(h3.latlng_to_cell(lat, lng, res), k))
    category = intent.get("category") or ""
    place_type = PLACE_TYPE_ALIASES.get(category, "")

    s = settings.db_schema
    async with get_conn() as conn:
        async with conn.cursor() as cur:
            # h3_col is one of the fixed names returned by _h3_plan and the
            # schema comes from settings; user-supplied values are bound.
            await cur.execute(
                f"""SELECT gaode_poi_id, element_id, name, type_label, place_type, amap_type,
                       typecode, lng, lat, address, district, city, adcode, business_area,
                       tel, rating, cost, open_time, tags, photo_urls, {h3_col} AS h3_cell,
                       source, first_fetched_at, last_fetched_at
                FROM {s}.amap_spatial_pois
                WHERE graph_name=%s AND {h3_col}=ANY(%s)
                  AND (%s='' OR type_label=%s OR place_type=%s)
                LIMIT 8000""",
                (graph_name, cells, category, category, place_type),
            )
            rows = await cur.fetchall()

    # The H3 disk over-covers the circle, so filter by exact distance.
    results: list[dict[str, Any]] = []
    for row in rows:
        d = _haversine_m(lng, lat, float(row["lng"]), float(row["lat"]))
        if d > radius:
            continue
        score, reasons = _score_place(dict(row), d, radius, intent)
        results.append(_place_result(row, d, score, reasons))
    results.sort(key=lambda r: (-r["score"], r["distance_m"]))
    results = results[:60]

    if use_llm:
        answer, llm_reasons, answer_source = await _llm_answer(question, intent, results)
    else:
        answer, llm_reasons, answer_source = _rule_answer(question, intent, results)
    for r in results:
        # LLM reasons are keyed by str(id) while rule reasons use the raw id,
        # so try both forms.
        reason = llm_reasons.get(r["place_id"]) or llm_reasons.get(str(r["place_id"]))
        if reason:
            r["llm_reason"] = reason

    return {
        "question": question,
        "graph_name": graph_name,
        "user_location": {"lng": lng, "lat": lat},
        "intent": intent,
        "answer": answer,
        "results": results,
        "trace": {
            "intent_source": intent_source,
            "answer_source": answer_source,
            "latency_ms": max(1, round((time.perf_counter() - started_at) * 1000)),
            "performance_target_ms": 1200,
            "use_llm": use_llm,
            "h3_resolution": res,
            "h3_column": h3_col,
            "h3_k": k,
            "h3_cells": len(cells),
            "h3_candidates": len(rows),
            # NOTE(review): counted after the top-60 cut, so this never exceeds
            # 60 — confirm whether the pre-truncation count is wanted here.
            "radius_filtered": len(results),
            "note": "当前结果来自已采集 guiyang_spatial_v1 空间知识图谱;采集未完成的区域会影响召回。",
        },
    }
|