Files
bxh/app/api/plaza.py

426 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""STEP 02 — Knowledge Plaza overview, usage, alerts."""
from __future__ import annotations
import asyncio
import json
import math
import re
import time
from typing import Any
import h3
from fastapi import APIRouter, Depends, HTTPException
from app.auth import CurrentUser
from app.config import settings
from app.db import get_agent_settings, get_plaza_overview, get_plaza_alerts, get_conn
from app.llm_client import LlmClient
from app.project_context import ProjectContext, get_project_context
# Router on which every Knowledge Plaza endpoint in this module is registered.
router = APIRouter()
# Graph name used by the user-query endpoint when the request does not supply one.
SPATIAL_GRAPH_NAME = "guiyang_spatial_v1"

# Category label -> phrases whose presence in a question selects that category.
# Questions are matched by substring, so an alias must never be empty: "" is a
# substring of every string and would make the first category match everything.
# NOTE(review): two aliases ("美食" and "酒店") were empty strings and have been
# dropped; they may have been characters lost in transit — confirm and restore
# the intended text if so.
CATEGORY_ALIASES: dict[str, list[str]] = {
    "美食": ["美食", "餐厅", "饭店", "火锅", "小吃", "烧烤", "咖啡", "奶茶", "酸汤鱼"],
    "景点": ["景点", "景区", "公园", "博物馆", "古镇", "夜游", "历史", "文化", "好玩"],
    "酒店": ["酒店", "住宿", "宾馆", "民宿"],
    "商场": ["商场", "购物", "商圈", "超市", "商城"],
    "医疗保健": ["医院", "诊所", "药店", "医疗", "看病", "急诊"],
    "交通设施": ["地铁", "公交", "车站", "交通", "停车", "机场", "高铁"],
    "生活服务": ["生活服务", "维修", "营业厅", "服务"],
    "科教文化": ["学校", "大学", "图书馆", "教育", "培训"],
}

# Category label -> value compared against the `place_type` column in the POI query.
PLACE_TYPE_ALIASES = {
    "美食": "eat",
    "景点": "sight",
    "酒店": "hotel",
    "商场": "mall",
    "医疗保健": "medical",
    "交通设施": "transit",
    "生活服务": "life",
    "科教文化": "education",
}
def _haversine_m(lng1: float, lat1: float, lng2: float, lat2: float) -> float:
    """Return the haversine distance between two lng/lat points.

    Inputs are converted with ``math.radians``, i.e. they are taken as degrees;
    the result is scaled by a sphere radius of 6,371,008.8 (metres, per the
    function name).
    """
    sphere_radius = 6_371_008.8
    phi_a = math.radians(lat1)
    phi_b = math.radians(lat2)
    half_dlat = math.radians(lat2 - lat1) / 2
    half_dlng = math.radians(lng2 - lng1) / 2
    sin_sq_lat = math.sin(half_dlat) ** 2
    sin_sq_lng = math.sin(half_dlng) ** 2
    # Haversine term: hav(dlat) + cos(lat1) * cos(lat2) * hav(dlng).
    hav = sin_sq_lat + math.cos(phi_a) * math.cos(phi_b) * sin_sq_lng
    return 2 * sphere_radius * math.asin(math.sqrt(hav))
def _h3_plan(radius_m: int) -> tuple[int, str, int]:
    """Pick the H3 resolution, its column name and the disk size k for a radius.

    Radii up to 3000 use fixed tiers; anything larger uses resolution 7 with k
    derived from the average hexagon edge length at that resolution.
    """
    # (upper radius bound, resolution, column name, k)
    fixed_tiers = (
        (500, 9, "h3_r9", 2),
        (1000, 9, "h3_r9", 4),
        (3000, 8, "h3_r8", 4),
    )
    for upper_bound, resolution, column, ring in fixed_tiers:
        if radius_m <= upper_bound:
            return resolution, column, ring

    resolution = 7
    edge_len_m = h3.average_hexagon_edge_length(resolution, unit="m")
    # sqrt(3) * edge is the spacing used per ring; one extra ring is added as margin.
    ring = math.ceil(radius_m / (math.sqrt(3) * edge_len_m)) + 1
    return resolution, "h3_r7", max(2, ring)
def _rule_intent(question: str, radius_m: int | None) -> dict[str, Any]:
    """Derive a nearby-POI intent from the question using regexes and alias tables.

    Args:
        question: Raw user question.
        radius_m: Caller-supplied radius; a falsy value means 1000. A distance
            or duration mentioned in the question overrides it.

    Returns:
        Dict with ``radius_m`` (clamped to 100..10000), ``category`` (a
        ``CATEGORY_ALIASES`` key or ""), ``keywords`` (at most 6),
        ``sort_preference`` and ``user_need``.
    """
    q = question.strip()
    radius = radius_m or 1000

    km = re.search(r"(\d+(?:\.\d+)?)\s*(?:公里|千米|km)", q, flags=re.I)
    meter = re.search(r"(\d+(?:\.\d+)?)\s*(?:米|m)", q, flags=re.I)
    minutes = re.search(r"(\d+(?:\.\d+)?)\s*分钟", q)
    if km:
        radius = int(float(km.group(1)) * 1000)
    elif meter:
        radius = int(float(meter.group(1)))
    elif minutes:
        # A 15-minute walk is roughly 1.1-1.3 km; recall with a conservative
        # 80 m/min radius for now (route duration can be plugged in later).
        radius = min(3000, max(500, int(float(minutes.group(1)) * 80)))

    # First category with an alias occurring in the question wins. Empty aliases
    # are ignored: "" is a substring of every string and would match everything.
    category = next(
        (
            cat
            for cat, aliases in CATEGORY_ALIASES.items()
            if any(alias and alias in q for alias in aliases)
        ),
        "",
    )

    known_aliases = {
        alias for aliases in CATEGORY_ALIASES.values() for alias in aliases if alias
    }
    # Split on ASCII and full-width punctuation alike, since Chinese questions
    # normally use the full-width forms; drop fragments that are just an alias.
    fragments = re.split(r"[,,。??!!;;、\s]+", q)
    keywords = [w for w in fragments if w and w not in known_aliases][:6]

    return {
        "radius_m": max(100, min(radius, 10000)),
        "category": category,
        "keywords": keywords,
        "sort_preference": "综合距离、评分和语义匹配",
        "user_need": q,
    }
async def _deepseek_client(max_tokens: int = 900) -> LlmClient | None:
    """Build an ``LlmClient`` from agent settings, or return None if unconfigured.

    The ``extract.models.deepseek`` entry is preferred; the ``global`` entry is
    the fallback. An entry is usable only when it has both ``base_url`` and
    ``api_key``. The model name defaults to "deepseek-chat".
    """
    agent_cfg = await get_agent_settings()

    extract_cfg = agent_cfg.get("extract") or {}
    dedicated = (extract_cfg.get("models") or {}).get("deepseek") or {}
    if dedicated.get("base_url") and dedicated.get("api_key"):
        # The timeout comes from the enclosing extract section, defaulting to 60.
        return LlmClient(
            dedicated["base_url"],
            dedicated["api_key"],
            dedicated.get("model") or "deepseek-chat",
            timeout=int(extract_cfg.get("timeout") or 60),
            max_tokens=max_tokens,
        )

    shared = agent_cfg.get("global") or {}
    if not (shared.get("base_url") and shared.get("api_key")):
        return None
    return LlmClient(
        shared["base_url"],
        shared["api_key"],
        shared.get("model") or "deepseek-chat",
        timeout=int(shared.get("timeout") or 45),
        max_tokens=max_tokens,
    )
async def _llm_intent(question: str, fallback: dict[str, Any]) -> tuple[dict[str, Any], str]:
    """Refine the rule-based intent with the LLM, falling back on any failure.

    Args:
        question: Raw user question.
        fallback: Intent produced by the rule parser; returned unchanged when
            no client is configured or the LLM call/parsing fails.

    Returns:
        ``(intent, source)`` where ``source`` is a human-readable label saying
        which path produced the intent.
    """
    client = await _deepseek_client(max_tokens=700)
    if not client:
        return fallback, "DeepSeek 未配置,使用规则解析"

    system = (
        "你是城市知识图谱的游客问答意图解析器。只输出 JSON。"
        "把用户问题解析为 nearby POI 查询意图,类别只能从:"
        "美食、景点、酒店、商场、医疗保健、交通设施、生活服务、科教文化、空字符串 中选择。"
        "radius_m 为整数米,没说半径默认 1000。keywords 提取用户真正关心的语义词。"
    )
    user = json.dumps({"question": question, "rule_fallback": fallback}, ensure_ascii=False)
    try:
        # chat_json is called via to_thread so it does not block the event loop.
        data = await asyncio.to_thread(client.chat_json, system, user)
        # LLM values win over the rule fallback unless they are None or "".
        merged = {**fallback, **{k: v for k, v in data.items() if v not in (None, "")}}

        # Accept "1500", 1500.0 or "1500.0"; anything unparsable raises and
        # drops to the rule fallback below.
        raw_radius = merged.get("radius_m") or fallback["radius_m"]
        merged["radius_m"] = max(100, min(int(float(raw_radius)), 10000))

        if merged.get("category") not in CATEGORY_ALIASES:
            merged["category"] = fallback.get("category", "")

        keywords = merged.get("keywords")
        if isinstance(keywords, list):
            # Scoring does substring checks against text, so keep only
            # non-empty strings; other JSON values would raise TypeError there.
            merged["keywords"] = [
                kw.strip() for kw in keywords if isinstance(kw, str) and kw.strip()
            ]
        else:
            merged["keywords"] = fallback.get("keywords", [])
        return merged, "DeepSeek 意图解析"
    except Exception as exc:  # noqa: BLE001 - any LLM failure must degrade to rules
        return fallback, f"DeepSeek 意图解析失败,使用规则解析:{str(exc)[:120]}"
def _photo_urls(value: Any) -> list[str]:
    """Normalise a stored photo field into a list of strings.

    Lists (native or JSON-encoded) yield their truthy items as strings. Any
    other string is split on ``|`` or ``,`` and only pieces starting with
    "http" are kept. Every other input yields an empty list.
    """
    if isinstance(value, list):
        items = value
    elif isinstance(value, str):
        try:
            decoded = json.loads(value)
        except Exception:
            # Not JSON: handled by the delimiter split below.
            decoded = None
        if not isinstance(decoded, list):
            pieces = (piece.strip() for piece in re.split(r"[|,]", value))
            return [piece for piece in pieces if piece.startswith("http")]
        items = decoded
    else:
        return []
    return [str(item) for item in items if item]
def _rating_num(value: Any) -> float:
    """Coerce a stored rating to a finite float; anything unusable becomes 0.0."""
    try:
        rating = float(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    # NaN/inf would poison the score and make the result ordering meaningless.
    return rating if math.isfinite(rating) else 0.0


def _score_place(row: dict[str, Any], distance_m: float, radius_m: int, intent: dict[str, Any]) -> tuple[float, list[str]]:
    """Score one POI against the intent and list the reasons for the score.

    Score components: up to 55 for proximity (linear in distance / radius),
    7 per rating point (rating capped at 5), 20 when ``type_label`` equals the
    intent category, and 12 for every intent keyword found in the POI text.

    Returns:
        ``(score rounded to 3 decimals, at most 4 reason strings)``.
    """
    score = max(0.0, 55.0 * (1 - distance_m / max(radius_m, 1)))
    reasons = [f"距离约 {round(distance_m)} 米"]

    rating = _rating_num(row.get("rating"))
    if rating:
        score += min(rating, 5) * 7
        reasons.append(f"评分 {rating:g}")

    category = intent.get("category") or ""
    if category and row.get("type_label") == category:
        score += 20
        reasons.append(f"匹配类别「{category}」")

    haystack = " ".join(
        str(row.get(k) or "") for k in ("name", "address", "tags", "amap_type", "type_label")
    )
    for kw in intent.get("keywords") or []:
        # Keywords may come from LLM JSON; a non-string would raise on `in`.
        if isinstance(kw, str) and kw and kw in haystack:
            score += 12
            reasons.append(f"命中关键词「{kw}」")
    return round(score, 3), reasons[:4]
# System prompt for answer composition: the model must reply with JSON holding
# `answer` and `reasons`, the latter mapping candidate id -> recommendation reason.
_ANSWER_SYSTEM_PROMPT = (
    "你是面向游客的城市知识图谱问答助手。只输出 JSON。"
    "基于候选 POI 回答用户问题,不能编造候选中没有的地点。"
    "输出 answer 和 reasons;reasons 是 {候选id: 推荐理由}。"
    "回答要像真实产品结果页,简洁、可解释。"
)


async def _llm_answer(question: str, intent: dict[str, Any], results: list[dict[str, Any]]) -> tuple[str, dict[str, str], str]:
    """Ask the LLM to phrase an answer over the ranked candidates.

    Args:
        question: Raw user question.
        intent: Parsed intent, forwarded to the model as context.
        results: Ranked candidates; only the first 20 are sent to the model.

    Returns:
        ``(answer, reasons, source)``. ``reasons`` maps place id to a reason
        string and ``source`` labels which path produced the answer. With no
        candidates, no configured client, or a failing LLM call, a canned
        answer is returned instead of raising.
    """
    if not results:
        return "当前已采集的知识图谱中,没有在这个半径内找到匹配结果。可以扩大半径,或等高德网格续采完成后再试。", {}, "no_candidates"

    client = await _deepseek_client(max_tokens=1200)
    if not client:
        first = results[0]
        return (
            f"根据当前知识图谱,优先推荐 {first['name']},距离约 {round(first['distance_m'])} 米。"
            "下面结果已按距离、类别匹配和评分综合排序。",
            {r["place_id"]: "距离近、类别匹配、来自当前空间知识图谱" for r in results[:8]},
            "fallback_answer",
        )

    # Send a compact projection of each candidate rather than the full record.
    compact = [
        {
            "id": r["place_id"],
            "name": r["name"],
            "type": r["type_label"],
            "distance_m": r["distance_m"],
            "rating": r.get("rating"),
            "address": r.get("address"),
            "tags": r.get("tags"),
            "score": r.get("score"),
        }
        for r in results[:20]
    ]
    user = json.dumps({"question": question, "intent": intent, "candidates": compact}, ensure_ascii=False)
    try:
        # chat_json is called via to_thread so it does not block the event loop.
        data = await asyncio.to_thread(client.chat_json, _ANSWER_SYSTEM_PROMPT, user)
        answer = str(data.get("answer") or "").strip()
        reasons = data.get("reasons") if isinstance(data.get("reasons"), dict) else {}
        return (
            answer or "已根据当前知识图谱完成附近结果排序。",
            {str(k): str(v) for k, v in reasons.items()},
            "DeepSeek 回答排序",
        )
    except Exception as exc:  # noqa: BLE001 - the page must still render rule-ranked results
        first = results[0]
        return (
            f"根据当前知识图谱,优先推荐 {first['name']},距离约 {round(first['distance_m'])} 米。"
            "DeepSeek 回答组织暂时失败,页面仍展示规则排序结果。",
            {},
            f"DeepSeek 回答失败:{str(exc)[:120]}",
        )
def _rule_answer(question: str, intent: dict[str, Any], results: list[dict[str, Any]]) -> tuple[str, dict[str, str], str]:
    """Compose the answer without an LLM, straight from the ranked results.

    Args:
        question: Raw user question (unused; kept for signature parity with
            ``_llm_answer``).
        intent: Parsed intent; only ``category`` is read.
        results: Ranked candidates; reasons are produced for the first 12.

    Returns:
        ``(answer, reasons, "rule_fast_answer")`` where ``reasons`` maps place
        id to that candidate's ranking reasons joined into one string.
    """
    if not results:
        return "当前已采集的知识图谱中,没有在这个半径内找到匹配结果。可以扩大半径,或等采集完成后再试。", {}, "rule_fast_answer"

    category = intent.get("category") or "相关地点"
    top = results[0]
    answer = (
        f"根据当前知识图谱,{category}共召回 {len(results)} 个候选;"
        f"优先推荐 {top['name']},距离约 {round(top['distance_m'])} 米。"
        "下方已按距离、类别匹配和评分综合排序。"
    )
    # Join with a separator: the reasons are independent phrases and are
    # unreadable when concatenated back to back.
    reasons = {
        r["place_id"]: ";".join(r.get("rank_reasons") or ["距离、类别和评分综合靠前"])
        for r in results[:12]
    }
    return answer, reasons, "rule_fast_answer"
@router.get("/plaza/overview")
async def overview(
    context: ProjectContext = Depends(get_project_context),
    _user: CurrentUser = None,
):
    """Return the plaza overview for the tenant/project of the request context."""
    tenant_id, project_id = context.tenant_id, context.project_id
    return await get_plaza_overview(tenant_id, project_id)
@router.get("/plaza/usage")
async def usage(
    context: ProjectContext = Depends(get_project_context),
    _user: CurrentUser = None,
):
    """Return published entity counts per type and the pending-review count.

    Both figures are read from ``candidate_entities`` and scoped to the
    tenant/project of the request context.
    """
    schema = settings.db_schema
    scope = (context.tenant_id, context.project_id)
    # Only the schema name (from settings) is interpolated; tenant and project
    # ids are bound as query parameters.
    published_sql = (
        f"SELECT entity_type, COUNT(*) AS cnt FROM {schema}.candidate_entities "
        "WHERE tenant_id=%s AND project_id=%s AND status='published' "
        "GROUP BY entity_type ORDER BY cnt DESC"
    )
    pending_sql = (
        f"SELECT COUNT(*) AS cnt FROM {schema}.candidate_entities "
        "WHERE tenant_id=%s AND project_id=%s AND status='pending_review'"
    )
    async with get_conn() as conn, conn.cursor() as cur:
        await cur.execute(published_sql, scope)
        published_rows = await cur.fetchall()
        await cur.execute(pending_sql, scope)
        pending_row = await cur.fetchone()
        pending_count = pending_row["cnt"]
    return {
        "entities_by_type": published_rows,
        "pending_review": pending_count,
    }
@router.get("/plaza/alerts")
async def alerts(
    context: ProjectContext = Depends(get_project_context),
    _user: CurrentUser = None,
):
    """Return the plaza alerts for the tenant/project of the request context."""
    tenant_id, project_id = context.tenant_id, context.project_id
    return await get_plaza_alerts(tenant_id, project_id)
@router.get("/plaza/amap-config")
async def amap_config(_user: CurrentUser = None):
    """Return the AMap JS API key and security code, with flags saying whether each is set."""
    js_key = settings.amap_js_key
    security_code = settings.amap_security_jscode
    # NOTE(review): the security code is returned to the client verbatim —
    # confirm this endpoint is restricted to trusted (admin) callers.
    payload = {
        "configured": bool(js_key),
        "js_key": js_key,
        "security_jscode": security_code,
        "security_configured": bool(security_code),
    }
    return payload
def _parse_user_query_body(body: dict) -> tuple[str, float, float, int | None, str, bool]:
    """Validate the user-query request body, raising HTTP 400 on bad fields."""
    question = str(body.get("question") or "").strip()
    if not question:
        raise HTTPException(400, "question required")

    try:
        # Defaults apply only when the key is absent — presumably a point in
        # the default graph's city; TODO confirm.
        lng = float(body.get("lng", 106.7135))
        lat = float(body.get("lat", 26.5744))
    except (TypeError, ValueError, OverflowError) as exc:
        raise HTTPException(400, "lng/lat required") from exc
    # NaN/inf or out-of-range coordinates would otherwise fail later as a 500.
    if not (math.isfinite(lng) and math.isfinite(lat) and -180 <= lng <= 180 and -90 <= lat <= 90):
        raise HTTPException(400, "lng/lat out of range")

    radius_input = body.get("radius_m")
    radius_m: int | None = None
    if radius_input not in (None, ""):
        try:
            radius_m = int(radius_input)
        except (TypeError, ValueError, OverflowError) as exc:
            raise HTTPException(400, "radius_m must be an integer") from exc

    graph_name = str(body.get("graph_name") or SPATIAL_GRAPH_NAME)
    # Only a literal JSON `true` enables the LLM path.
    use_llm = body.get("use_llm") is True
    return question, lng, lat, radius_m, graph_name, use_llm


def _poi_result(
    row: Any,
    poi_lng: float,
    poi_lat: float,
    distance_m: float,
    score: float,
    reasons: list[str],
) -> dict[str, Any]:
    """Shape one POI row plus its ranking data into the response item."""
    return {
        "place_id": row["element_id"],
        "gaode_poi_id": row["gaode_poi_id"],
        "name": row["name"],
        "type_label": row["type_label"],
        "place_type": row["place_type"],
        "amap_type": row["amap_type"],
        "typecode": row["typecode"],
        "lng": poi_lng,
        "lat": poi_lat,
        "address": row["address"] or "",
        "district": row["district"] or "",
        "city": row["city"] or "",
        "adcode": row["adcode"] or "",
        "business_area": row["business_area"] or "",
        "tel": row["tel"] or "",
        "rating": row["rating"] or "",
        "cost": row["cost"] or "",
        "open_time": row["open_time"] or "",
        "tags": row["tags"] or "",
        "photo_urls": _photo_urls(row["photo_urls"]),
        "h3_cell": row["h3_cell"],
        "source": row["source"],
        "last_fetched_at": row["last_fetched_at"].isoformat() if row.get("last_fetched_at") else "",
        "distance_m": round(distance_m, 1),
        "score": score,
        "rank_reasons": reasons,
    }


@router.post("/plaza/user-query")
async def user_query(body: dict, _user: CurrentUser = None):
    """User-facing KG nearby query: NL intent -> H3 recall -> distance/rank -> LLM answer.

    Body fields: ``question`` (required), ``lng``/``lat``, ``radius_m``,
    ``graph_name`` and ``use_llm`` (all optional).

    Returns:
        Dict with the question, resolved intent, answer text, up to 60 ranked
        results and a ``trace`` section describing how the answer was produced.

    Raises:
        HTTPException: 400 when ``question`` is empty, or when ``lng``/``lat``
            or ``radius_m`` are malformed or out of range.
    """
    started_at = time.perf_counter()
    question, lng, lat, radius_m, graph_name, use_llm = _parse_user_query_body(body)

    fallback = _rule_intent(question, radius_m)
    if use_llm:
        intent, intent_source = await _llm_intent(question, fallback)
    else:
        intent, intent_source = fallback, "规则意图快路径"

    radius = int(intent["radius_m"])
    res, h3_col, k = _h3_plan(radius)
    cells = list(h3.grid_disk(h3.latlng_to_cell(lat, lng, res), k))
    category = intent.get("category") or ""
    place_type = PLACE_TYPE_ALIASES.get(category, "")

    s = settings.db_schema
    async with get_conn() as conn:
        async with conn.cursor() as cur:
            # Schema and H3 column names come from settings / _h3_plan, never
            # from the request; every request-derived value is a bound parameter.
            await cur.execute(
                f"""SELECT gaode_poi_id, element_id, name, type_label, place_type, amap_type,
                typecode, lng, lat, address, district, city, adcode, business_area,
                tel, rating, cost, open_time, tags, photo_urls, {h3_col} AS h3_cell,
                source, first_fetched_at, last_fetched_at
                FROM {s}.amap_spatial_pois
                WHERE graph_name=%s AND {h3_col}=ANY(%s)
                AND (%s='' OR type_label=%s OR place_type=%s)
                LIMIT 8000""",
                (graph_name, cells, category, category, place_type),
            )
            rows = await cur.fetchall()

    results: list[dict[str, Any]] = []
    for row in rows:
        # A POI without coordinates cannot be distance-ranked; skip it rather
        # than fail the whole request.
        if row["lng"] is None or row["lat"] is None:
            continue
        poi_lng, poi_lat = float(row["lng"]), float(row["lat"])
        distance_m = _haversine_m(lng, lat, poi_lng, poi_lat)
        # The H3 disk over-covers the circle; keep only true in-radius hits.
        if distance_m > radius:
            continue
        score, reasons = _score_place(dict(row), distance_m, radius, intent)
        results.append(_poi_result(row, poi_lng, poi_lat, distance_m, score, reasons))

    results.sort(key=lambda r: (-r["score"], r["distance_m"]))
    # Count before truncation so the trace reports the real in-radius total.
    radius_filtered = len(results)
    results = results[:60]

    if use_llm:
        answer, llm_reasons, answer_source = await _llm_answer(question, intent, results)
    else:
        answer, llm_reasons, answer_source = _rule_answer(question, intent, results)
    for r in results:
        # LLM reasons are keyed by stringified id; rule reasons by the raw id.
        reason = llm_reasons.get(r["place_id"]) or llm_reasons.get(str(r["place_id"]))
        if reason:
            r["llm_reason"] = reason

    return {
        "question": question,
        "graph_name": graph_name,
        "user_location": {"lng": lng, "lat": lat},
        "intent": intent,
        "answer": answer,
        "results": results,
        "trace": {
            "intent_source": intent_source,
            "answer_source": answer_source,
            "latency_ms": max(1, round((time.perf_counter() - started_at) * 1000)),
            "performance_target_ms": 1200,
            "use_llm": use_llm,
            "h3_resolution": res,
            "h3_column": h3_col,
            "h3_k": k,
            "h3_cells": len(cells),
            "h3_candidates": len(rows),
            "radius_filtered": radius_filtered,
            "note": "当前结果来自已采集 guiyang_spatial_v1 空间知识图谱;采集未完成的区域会影响召回。",
        },
    }