1041 lines
46 KiB
Python
1041 lines
46 KiB
Python
"""Graph Browser API — FalkorDB Cypher queries + natural-language search."""
|
||
import re
|
||
import json
|
||
import time
|
||
from typing import Any
|
||
|
||
from falkordb import FalkorDB
|
||
from fastapi import APIRouter, Depends, HTTPException
|
||
|
||
from app.auth import CurrentUser
|
||
from app.config import settings
|
||
from app.db import get_agent_settings
|
||
from app.llm_client import LlmClient
|
||
from app.project_context import ProjectContext, get_project_context
|
||
|
||
# FastAPI router carrying the graph-browser endpoints defined in this module.
router: APIRouter = APIRouter()

# NOTE(review): presumably the largest row `limit` the query endpoints accept;
# it is not referenced in the visible part of this module — confirm with callers.
GRAPH_QUERY_MAX_LIMIT = 15_000
|
||
|
||
# Deny-list of clauses / procedures that could mutate the graph or its
# indexes and constraints. `_assert_read_only` rejects any query it matches.
WRITE_KEYWORDS = re.compile(
    r"\b(CREATE|MERGE|SET|DELETE|DETACH|DROP|REMOVE|CALL\s+DB\.IDX|CALL\s+DB\.CONSTRAINT)\b",
    flags=re.I,
)

# Leading clauses accepted for user- or LLM-supplied queries (anchored at the
# start of the string).
READ_START = re.compile(r"^(MATCH|RETURN|CALL)\b", flags=re.I)
|
||
|
||
# Properties the keyword fallback query tests with `CONTAINS`.
# Only fields that are consistently scalar strings are listed: some graph
# properties are arrays/lists and RedisGraph's CONTAINS raises a type error on
# them, so a broad "search every property" query is not safe here.
# The order below fixes the order of the generated OR predicates.
TEXT_PROPS = tuple(
    """
    name title label display_name description
    summary history features address district
    category place_type event_type concept_type
    product_family product_type group_mode vehicle_layout
    hotel_grade meal_standard service_promise included_summary
    excluded_summary booking_notes risk_notes route_summary
    quote_summary answer_hint demand_summary special_care
    lead_source vehicle_type room_type refund_policy
    rule_text message_template trigger_scenario origin_text
    destination_text meal_scene signature_dishes
    applicable_products region
    fee_name fee_type price_text scenic_target_name
    parent_scenic_area_name route_role recommendation_note
    admin_region_name amap_name
    item_id type subtype binding_type status default_text
    price_formula station_like_name route_line_name route_sequence_label
    base_price_status base_price_text price_options mandatory_fee_text
    ticket_refund_policy quote_formula pricing_notes pricing_source_files
    """.split()
)
|
||
|
||
# Chinese UI group names for `Place.place_type` codes (see _place_visual_group).
PLACE_TYPE_LABELS = dict(
    sight="景点",
    eat="美食",
    hotel="酒店",
    mall="商场",
    transit_stop="交通站点",
)

# Station types are already stored as Chinese display text, so each maps to itself.
STATION_TYPE_LABELS = {kind: kind for kind in ("公交站", "地铁站")}
|
||
|
||
# System prompt for the natural-language -> Cypher LLM call. It restricts the
# model to read-only queries starting with MATCH/RETURN/CALL (the same rule
# `_assert_read_only` enforces), gives worked examples anchored on the
# 花溪公园 (Huaxi Park) POI, and asks for a JSON reply of the form
# {"cypher": ..., "reason": ...}.
NL_TO_CYPHER_SYS = """你是城市知识图谱查询助手。请把用户自然语言问题转换成 FalkorDB/RedisGraph 只读 Cypher。
|
||
|
||
硬性要求:
|
||
1. 只能生成只读查询,必须以 MATCH/RETURN/CALL 开头。
|
||
2. 禁止 CREATE、MERGE、SET、DELETE、DETACH、DROP、REMOVE、索引/约束操作。
|
||
3. 查询必须 RETURN 节点、关系或路径,方便前端图谱可视化。
|
||
4. 默认 LIMIT 100,除非用户明确要更少。
|
||
5. 优先用 CONTAINS 做中文名称匹配,避免要求用户输入精确 ID。
|
||
6. 如果用户问某地点的事件,优先查 HAS_EVENT;问主题/适合/夜游/历史文化,优先查 HAS_CONCEPT;问组成/景点/设施,优先查 HAS_PART。
|
||
7. 同名地点优先选择 place_type='sight'、source='amap' 或带 element_id 的高德主 POI,避免选择 shadow_node=1 的抽取临时节点。
|
||
|
||
常见例子:
|
||
- “花溪公园有哪些历史事件”
|
||
MATCH (p:Place)-[r:HAS_EVENT]->(e:Event) WHERE p.name CONTAINS '花溪公园' AND p.place_type = 'sight' RETURN p,r,e LIMIT 100
|
||
- “花溪公园有哪些概念”
|
||
MATCH (p:Place)-[r:HAS_CONCEPT]->(c:Concept) WHERE p.name CONTAINS '花溪公园' AND p.place_type = 'sight' RETURN p,r,c LIMIT 100
|
||
- “花溪公园包含哪些景点”
|
||
MATCH (p:Place)-[r:HAS_PART]->(m) WHERE p.name CONTAINS '花溪公园' AND p.place_type = 'sight' RETURN p,r,m LIMIT 100
|
||
- “花溪公园附近有哪些公交地铁可以到”
|
||
MATCH (p:Place)-[r:NEAR_TRANSIT]->(s:Place) WHERE p.name CONTAINS '花溪公园' AND p.place_type = 'sight' OPTIONAL MATCH (b:BusLine)-[r2:STOPS_AT]->(s) RETURN p,r,s,b,r2 LIMIT 100
|
||
- “查看花溪公园”
|
||
MATCH (p:Place {element_id:'amap:B035300A51'})-[r]->(m) OPTIONAL MATCH (b:BusLine)-[r2:STOPS_AT]->(m) RETURN p,r,m,b,r2 LIMIT 100
|
||
|
||
只输出 JSON:
|
||
{"cypher":"...","reason":"一句话说明"}"""
|
||
|
||
|
||
# One FalkorDB client per (host, port). A client owns a Redis connection pool
# and performs server round-trips when constructed, so it is reused across calls
# instead of being rebuilt for every query. A concurrent first call may build
# two clients; one simply wins the slot, which is harmless.
_FALKORDB_CLIENTS: dict[tuple[str, int], FalkorDB] = {}


def _get_graph(graph_name: str | None = None):
    """Return a graph handle for *graph_name* (default: ``settings.falkordb_graph``).

    The underlying client is cached per configured (host, port), so repeated
    calls share one connection pool.
    """
    key = (settings.falkordb_host, settings.falkordb_port)
    db = _FALKORDB_CLIENTS.get(key)
    if db is None:
        db = FalkorDB(host=key[0], port=key[1])
        _FALKORDB_CLIENTS[key] = db
    return db.select_graph(graph_name or settings.falkordb_graph)
|
||
|
||
|
||
def _is_travel_item_graph_name(graph_name: str | None) -> bool:
|
||
name = (graph_name or "").lower()
|
||
return (
|
||
name == "travel_fixed_route_item"
|
||
or "fixed_route" in name
|
||
or "baixinghui" in name
|
||
or "travel_agency_2_0" in name
|
||
)
|
||
|
||
|
||
def _assert_read_only(cypher: str) -> None:
    """Reject anything that is not a plain read-only Cypher query.

    Args:
        cypher: Query text from a user or an LLM.

    Raises:
        HTTPException(400): if the query is empty or blank, contains a write or
            index/constraint keyword (``WRITE_KEYWORDS``), or does not start with
            MATCH, RETURN or CALL (``READ_START``).
    """
    # Leading whitespace/newlines (common in LLM output) must not make the
    # anchored start check reject an otherwise valid read query.
    text = (cypher or "").strip()
    if not text:
        raise HTTPException(400, "Cypher query required")
    if WRITE_KEYWORDS.search(text):
        raise HTTPException(400, "Read-only queries only (no CREATE/MERGE/DELETE/DROP)")
    if not READ_START.match(text):
        raise HTTPException(400, "Only MATCH, RETURN, or CALL queries allowed")
|
||
|
||
|
||
def _ensure_limit(cypher: str, limit: int) -> str:
|
||
if re.search(r"\bLIMIT\s+\d+\b", cypher, flags=re.IGNORECASE):
|
||
return cypher
|
||
return f"{cypher} LIMIT {limit}"
|
||
|
||
|
||
def _execute_graph_query(
    cypher: str,
    limit: int,
    params: dict[str, Any] | None = None,
    *,
    graph_name: str | None = None,
) -> dict:
    """Run a read-only Cypher query and package the result for the graph UI.

    Args:
        cypher: Query text. It is validated by ``_assert_read_only``, which
            raises HTTP 400 on rejection. The text itself is not rewritten here.
        limit: Number of result rows kept from the result set.
        params: Optional query parameters. ``None`` or an empty mapping runs the
            query without parameters.
        graph_name: Graph to query. Defaults to ``settings.falkordb_graph``.

    Returns:
        A dict with ``graph_name``, ``cypher``, ``columns``, the first 20 kept
        ``rows``, ``row_count`` (number of kept rows) and the ``nodes`` /
        ``edges`` extracted from every kept row.
    """
    _assert_read_only(cypher)
    graph = _get_graph(graph_name)
    outcome = graph.query(cypher, params) if params else graph.query(cypher)

    kept_rows = list(outcome.result_set)[:limit]
    # Header entries are indexed pairs; index 1 holds the column name
    # (per the FalkorDB client's header layout — presumably [type, name]).
    column_names = [entry[1] for entry in outcome.header or []]
    graph_nodes, graph_edges = _extract(kept_rows)

    payload = {
        "graph_name": graph_name or settings.falkordb_graph,
        "cypher": cypher,
        "columns": column_names,
        "rows": kept_rows[:20],
        "row_count": len(kept_rows),
        "nodes": graph_nodes,
        "edges": graph_edges,
    }
    return payload
|
||
|
||
|
||
def _graph_schema_hint(graph_name: str | None = None) -> dict:
    """Summarise which node labels and relationship types exist in a graph.

    Args:
        graph_name: Graph to inspect. Defaults to the configured graph.

    Returns:
        ``{"labels": [{"label", "count"}, ...], "relations": [{"type", "count"}, ...]}``
        with at most 80 entries each. Only the first label of each label
        combination is reported.

    This is best-effort. Any failure (connection, query or result shape) yields
    empty lists instead of an error, but the failure is logged rather than
    silently swallowed.
    """
    import logging

    try:
        g = _get_graph(graph_name)
        label_rows = g.query(
            "MATCH (n) RETURN labels(n) AS labels, count(n) AS count LIMIT 80"
        ).result_set
        rel_rows = g.query(
            "MATCH ()-[r]->() RETURN type(r) AS type, count(r) AS count LIMIT 80"
        ).result_set
        return {
            "labels": [{"label": r[0][0] if r[0] else "", "count": r[1]} for r in label_rows],
            "relations": [{"type": r[0], "count": r[1]} for r in rel_rows],
        }
    except Exception:
        logging.getLogger(__name__).warning(
            "Could not load graph schema hint for graph %r", graph_name, exc_info=True
        )
        return {"labels": [], "relations": []}
|
||
|
||
|
||
def _query_terms(text: str) -> list[str]:
|
||
raw = text.strip()
|
||
stop_words = (
|
||
"查询", "查看", "看看", "请问", "帮我", "给我", "一下",
|
||
"有哪些", "有什么", "哪些", "什么", "相关", "关系", "节点",
|
||
"图谱", "数据", "里面", "历史事件", "历史", "事件", "概念",
|
||
"组成", "包含", "景点", "设施", "附近", "推荐",
|
||
)
|
||
terms: list[str] = []
|
||
for chunk in re.findall(r"[\u4e00-\u9fffA-Za-z0-9_:-]{2,}", raw):
|
||
reduced = chunk
|
||
for word in stop_words:
|
||
reduced = reduced.replace(word, " ")
|
||
for part in re.split(r"\s+", reduced):
|
||
part = part.strip()
|
||
if len(part) >= 2:
|
||
terms.append(part)
|
||
if "花溪公园" in raw:
|
||
terms.insert(0, "花溪公园")
|
||
if not terms and raw:
|
||
terms.append(raw[:24])
|
||
seen: set[str] = set()
|
||
out: list[str] = []
|
||
for term in terms:
|
||
if term not in seen:
|
||
seen.add(term)
|
||
out.append(term)
|
||
return out[:5]
|
||
|
||
|
||
def _fallback_cypher(question: str, limit: int) -> tuple[str, dict[str, Any]]:
    """Build a broad keyword search, used when no rule-based template applies.

    Each term from ``_query_terms`` is bound as a ``$tI`` parameter and tested
    with ``CONTAINS`` against every property in ``TEXT_PROPS``. All predicates
    are OR-ed together. Matching nodes are returned with their optional
    neighbours.

    Args:
        question: Natural-language question.
        limit: Row limit for the generated query.

    Returns:
        ``(cypher, params)``.
    """
    terms = _query_terms(question)
    if not terms:
        # Nothing to search for: return a sample of edges. The sample is capped
        # at 100 rows, but a smaller caller-supplied limit is honoured.
        return f"MATCH (n)-[r]->(m) RETURN n,r,m LIMIT {min(limit, 100)}", {}

    params: dict[str, Any] = {f"t{i}": term for i, term in enumerate(terms)}
    # Up to 5 terms x len(TEXT_PROPS) predicates; missing properties evaluate
    # to null and simply do not match.
    where = " OR ".join(
        f"n.{prop} CONTAINS ${key}" for key in params for prop in TEXT_PROPS
    )
    cypher = (
        f"MATCH (n) WHERE {where} "
        "OPTIONAL MATCH (n)-[r]-(m) "
        f"RETURN n,r,m LIMIT {limit}"
    )
    return cypher, params
|
||
|
||
|
||
def _travel_rule_based_cypher(question: str, limit: int) -> tuple[str, dict[str, Any], str, str] | None:
    """Translate a travel-agency question into a templated read-only Cypher query.

    Intent is detected from fixed keyword lists. The first matching intent
    selects a template:

    1. ticket / small-transport / insurance fees,
    2. policy rules,
    3. sales scripts,
    4. nearby hotels/restaurants,
    5. existing product routes,
    6. transfer quotes,
    7. hotels,
    8. restaurants,
    9. otherwise a product lookup.

    Search terms taken from the question are bound as ``$t0``.. parameters and
    OR-ed into ``CONTAINS`` predicates.

    Args:
        question: Natural-language question.
        limit: Value emitted as the query's ``LIMIT``.

    Returns:
        ``(cypher, params, strategy, explanation)``, or ``None`` when the
        question contains none of the travel trigger words.
    """
    q = question.strip()
    trigger_words = (
        "旅行", "旅游", "线路", "行程", "推荐", "接送", "酒店", "住宿", "餐厅", "吃", "报价",
        "退费", "老人", "儿童", "孕妇", "话术", "客服", "黄果树", "小七孔", "西江", "梵净山",
    )
    if not any(w in q for w in trigger_words):
        return None

    known_terms = [
        "黄果树", "小七孔", "西江", "梵净山", "青岩", "镇远", "百里杜鹃", "平坝樱花",
        "兴义", "花江大桥", "机场", "北站", "火车站", "东站", "观山湖", "市区",
        "白云", "贵阳", "7座", "5座", "商务", "保姆车", "2+1", "四钻", "4钻",
        "四星", "五钻", "5钻", "五星", "接机", "晚班", "老人", "孕妇", "儿童",
        "行李", "退费", "自费", "小交通", "纯玩", "费用包含", "留资", "微信", "小红书",
    ]
    terms = [term for term in known_terms if term in q]
    # Hotel grades: a "钻" grade is searched together with its "星" equivalent.
    if "4钻" in q or "四钻" in q:
        terms.extend(["四星", "4钻", "四钻"])
    if "5钻" in q or "五钻" in q:
        terms.extend(["五星", "5钻", "五钻"])
    terms.extend([t for t in _query_terms(q) if t not in terms])
    terms = list(dict.fromkeys(terms))  # de-duplicate, keep first occurrence
    scenic_terms = [
        t for t in ("黄果树", "小七孔", "西江", "梵净山", "青岩", "镇远", "百里杜鹃", "平坝樱花", "兴义", "花江大桥")
        if t in q
    ]
    if scenic_terms:
        # Scenic-area names go first so they survive the 8-term cap below.
        terms = scenic_terms + [t for t in terms if t not in scenic_terms]
    params: dict[str, Any] = {f"t{i}": term for i, term in enumerate(terms[:8])}

    def contains_clause(
        alias: str,
        props: tuple[str, ...],
        bound: dict[str, Any] | None = None,
    ) -> str:
        """OR of ``alias.prop CONTAINS $tI`` over every prop and bound term.

        ``bound`` defaults to ``params``. Returns ``"true"`` when there are no terms.
        """
        count = len(params if bound is None else bound)
        return " OR ".join(
            f"{alias}.{prop} CONTAINS $t{i}" for i in range(count) for prop in props
        ) or "true"

    if any(w in q for w in ("门票", "小交通", "电瓶车", "环保车", "观光车", "保险", "扶梯", "索道", "游船", "自费", "二消", "二次", "儿童价", "成人价", "免费", "多少钱", "价格")):
        where = " OR ".join([
            contains_clause("x", ("name", "admin_region_name")),
            contains_clause("f", ("name", "fee_name", "fee_type", "price_text", "inclusion_status", "scenic_target_name", "source_product_name", "rule_text")),
        ])
        return (
            "MATCH (x)-[r]->(f:TicketFee) "
            "WHERE (type(r) = 'SCENIC_AREA_HAS_FEE' OR type(r) = 'ATTRACTION_HAS_FEE' OR type(r) = 'PRODUCT_HAS_FEE') "
            f"AND ({where}) "
            f"RETURN x,r,f LIMIT {limit}",
            params,
            "travel_structured_fee",
            "按景区/景点下的门票、小交通、保险和二次消费项目查询。",
        )

    if any(w in q for w in ("退费", "老人", "儿童", "孕妇", "限制", "不接待", "行李", "年龄", "自费", "小交通")):
        where = contains_clause("rule", ("name", "rule_type", "applies_to", "rule_text", "severity"))
        return (
            f"MATCH (rule:PolicyRule) WHERE {where} "
            "OPTIONAL MATCH (p)-[r:HAS_POLICY]->(rule) "
            f"RETURN p,r,rule LIMIT {limit}",
            params,
            "travel_policy_rule",
            "按退费、年龄、儿童、孕妇、行李和自费规则查询。",
        )

    if any(w in q for w in ("话术", "客服", "回复", "加微信", "留资", "怎么说", "怎么回")):
        where = contains_clause("s", ("name", "channel", "funnel_stage", "trigger_scenario", "message_template", "intent_tags"))
        return (
            f"MATCH (s:SalesScript) WHERE {where} "
            "OPTIONAL MATCH (s)-[r:FROM_SOURCE]->(c:SalesChannel) "
            f"RETURN s,r,c LIMIT {limit}",
            params,
            "travel_sales_script",
            "按渠道、转化阶段和触发场景查询客服话术。",
        )

    if any(w in q for w in ("附近", "车程", "距离", "多久", "酒店", "住宿", "餐厅", "吃", "资源")):
        generic_terms = {
            "附近", "车程", "距离", "多久", "酒店", "住宿", "餐厅", "餐饮", "吃", "资源",
            "哪些", "有哪", "有什么", "多少", "多远",
        }
        # Drop terms that are (or contain) generic nearby/resource words. This
        # also covers exact matches, since every word contains itself.
        search_terms = [term for term in terms if not any(g in term for g in generic_terms)]
        local_params = {f"t{i}": term for i, term in enumerate((search_terms or terms)[:6])}
        where = " OR ".join([
            contains_clause("a", ("name", "amap_name", "admin_region_name"), local_params),
            contains_clause("child", ("name", "amap_name", "admin_region_name", "parent_scenic_area_name"), local_params),
            contains_clause("x", ("name", "amap_name", "admin_region_name", "city_or_area", "features", "signature_dishes"), local_params),
        ])
        type_filters: list[str] = []
        if any(w in q for w in ("酒店", "住宿", "住")):
            type_filters.append("x:HotelResource")
        if any(w in q for w in ("餐厅", "餐饮", "吃", "美食")):
            type_filters.append("x:RestaurantResource")
        type_where = f" AND ({' OR '.join(type_filters)})" if type_filters else ""
        # The filter must follow a WITH. A WHERE placed directly after the
        # OPTIONAL MATCH only constrains that optional pattern, which would let
        # every NEARBY_LOCATION_RESOURCE edge through unfiltered.
        return (
            "MATCH (a)-[r:NEARBY_LOCATION_RESOURCE]->(x) "
            "OPTIONAL MATCH (a)-[ar:SCENIC_AREA_HAS_ATTRACTION]->(child:ScenicAttraction) "
            "WITH a, r, x, ar, child "
            f"WHERE ({where}){type_where} "
            "RETURN a,r,x,ar,child ORDER BY r.drive_duration_min "
            f"LIMIT {limit}",
            local_params,
            "travel_nearby_resource",
            "按景区片区/子景点归属查询合作酒店餐饮,并返回高德驾车距离与车程时间。",
        )

    if any(w in q for w in ("推荐", "线路", "行程", "方案", "几天", "天", "人", "保姆车", "纯玩", "报价")):
        where = " OR ".join([
            contains_clause("p", ("name", "product_family", "route_stop_sequence", "route_day_summary", "default_vehicle_type", "default_hotel_grade", "service_promise", "included_summary", "excluded_summary")),
            contains_clause("d", ("name", "title", "route_path", "route_stop_sequence", "meal_text", "accommodation_text")),
            contains_clause("s", ("name", "city_or_area", "route_line_name", "route_sequence_label", "evidence_text")),
            contains_clause("a", ("name", "parent_scenic_area_name", "route_role")),
            contains_clause("sa", ("name", "management_note")),
            contains_clause("region", ("name", "city", "county")),
        ])
        # As above: filter after WITH so the predicate constrains whole rows,
        # not just the last OPTIONAL MATCH.
        return (
            "MATCH (p:TourProduct)-[lineStop:PRODUCT_HAS_ORDERED_STOP]->(s:RouteStop) "
            "OPTIONAL MATCH (s)-[next:ROUTE_STOP_NEXT]->(s2:RouteStop) "
            "OPTIONAL MATCH (p)-[r1:HAS_DAY]->(d:ProductDay)-[r2:DAY_HAS_STOP]->(s) "
            "OPTIONAL MATCH (s)-[r3:STOP_VISITS_SCENIC_AREA]->(sa:ScenicArea) "
            "OPTIONAL MATCH (s)-[r4:STOP_VISITS_ATTRACTION]->(a:ScenicAttraction) "
            "OPTIONAL MATCH (s)-[sr:STOP_LOCATED_IN_REGION]->(region:AdministrativeRegion) "
            "WITH p,lineStop,s,next,s2,r1,d,r2,r3,sa,r4,a,sr,region "
            f"WHERE {where} "
            f"RETURN p,lineStop,s,next,s2,r1,d,r2,r3,sa,r4,a,sr,region LIMIT {limit}",
            params,
            "travel_existing_product_route",
            "按已有路线产品的全程有序停靠点、每日行程、景区片区和行政区查询。",
        )

    if any(w in q for w in ("接送", "机场", "北站", "火车站", "东站", "观山湖", "市区")):
        where = contains_clause("q", ("name", "origin_text", "destination_text", "vehicle_type", "quote_notes"))
        return (
            "MATCH (q:TransferQuote)-[r:USES_VEHICLE]->(v:VehicleService) "
            f"WHERE {where} RETURN q,r,v LIMIT {limit}",
            params,
            "travel_transfer_quote",
            "按接送报价、出发地/目的地和车型查询。",
        )

    if any(w in q for w in ("酒店", "住宿", "住", "四钻", "4钻", "五钻", "5钻", "晚班")):
        where = contains_clause("h", ("name", "hotel_grade", "region", "address", "features", "applicable_products"))
        return (
            f"MATCH (h:HotelResource) WHERE {where} "
            "OPTIONAL MATCH (h)-[r:LOCATED_IN_REGION]->(a:AdministrativeRegion) "
            f"RETURN h,r,a LIMIT {limit}",
            params,
            "travel_hotel_resource",
            "按酒店等级、区域、特点和适用产品查询。",
        )

    if any(w in q for w in ("餐厅", "吃", "特色餐", "酸汤鱼", "餐饮", "用餐")):
        where = contains_clause("m", ("name", "region", "address", "signature_dishes", "meal_scene", "per_capita_price_text"))
        return (
            f"MATCH (m:RestaurantResource) WHERE {where} "
            "OPTIONAL MATCH (m)-[r:LOCATED_IN_REGION]->(a:AdministrativeRegion) "
            f"RETURN m,r,a LIMIT {limit}",
            params,
            "travel_restaurant_resource",
            "按餐厅区域、菜品和适用场景查询。",
        )

    # Policy-rule and sales-script questions were already routed above.
    # Re-checking them here could never match, so the duplicate branches are gone.
    where = contains_clause("p", ("name", "product_family", "group_mode", "vehicle_layout", "hotel_grade", "service_promise", "route_summary"))
    return (
        f"MATCH (p:TourProduct) WHERE {where} "
        "OPTIONAL MATCH (p)-[r]->(m) "
        f"RETURN p,r,m LIMIT {limit}",
        params,
        "travel_product_lookup",
        "按线路产品名称、景点、车型和服务承诺查询。",
    )
|
||
|
||
|
||
def _travel_item_rule_based_cypher(question: str, limit: int) -> tuple[str, dict[str, Any], str, str] | None:
    """Translate a question into Cypher for the TravelItem fixed-route graph.

    Intent is detected from keyword lists. In priority order the templates are:

    1. product quote,
    2. vehicles,
    3. ticket / scenic fees,
    4. hotel/restaurant resources,
    5. fixed routes,
    6. a generic route lookup.

    A trip length such as "3日游", "玩3天" or "3天" becomes a
    ``p.duration_days`` filter. "镇梵" and "青岩" add product-focus filters.
    Search terms (at most 8, or the first 20 characters of the question when
    none are found) are bound as ``$t0``.. parameters.

    Args:
        question: Natural-language question.
        limit: Value emitted as the query's ``LIMIT``.

    Returns:
        ``(cypher, params, strategy, explanation)``, or ``None`` when the
        question contains none of the trigger words.
    """
    q = question.strip()
    trigger_words = (
        "旅行", "旅游", "线路", "路线", "行程", "推荐", "酒店", "住宿", "餐厅", "吃", "费用",
        "价格", "车辆", "车型", "用车", "门票", "儿童", "老人", "黄小西", "小西", "镇梵",
        "黄果树", "小七孔", "西江", "梵净山", "天星桥", "青岩",
    )
    if not any(w in q for w in trigger_words):
        return None

    known_terms = [
        "黄小西", "小西", "镇梵", "黄万马", "黄果树", "天星桥", "陡坡塘", "小七孔", "荔波", "西江", "梵净山", "青岩",
        "镇远", "百里杜鹃", "平坝樱花", "兴义", "贵阳", "安顺", "酒店", "餐厅",
        "车辆", "门票", "电瓶车", "环保车", "保险", "扶梯", "游船", "儿童", "老人",
    ]
    terms = [term for term in known_terms if term in q]
    terms.extend([t for t in _query_terms(q) if t not in terms])
    terms = list(dict.fromkeys(terms))[:8]  # de-duplicate, keep order, cap at 8
    params: dict[str, Any] = {f"t{i}": term for i, term in enumerate(terms or [q[:20]])}

    def contains_clause(alias: str, props: tuple[str, ...]) -> str:
        """OR of ``alias.prop CONTAINS $tI`` over every prop and every bound term."""
        return " OR ".join(
            f"{alias}.{prop} CONTAINS $t{i}" for i in range(len(params)) for prop in props
        ) or "true"

    # Trip length. The "天(?!星)" lookahead keeps "天星桥" from reading as "N 天".
    duration_days = 0
    for pattern in (r"(\d+)\s*(?:日游|天游)", r"(?:玩|计划|只能|安排)\s*(\d+)\s*天", r"(\d+)\s*天(?!星)"):
        m = re.search(pattern, q)
        if m:
            duration_days = int(m.group(1))
            break
    duration_filter = f" AND p.duration_days = {duration_days}" if duration_days else ""

    product_focus_filters: list[str] = []
    if "镇梵" in q:
        product_focus_filters.append(
            "(p.name CONTAINS '镇梵' OR p.source_files CONTAINS '镇梵' OR "
            "p.route_stop_sequence CONTAINS '镇远' OR p.route_stop_sequence CONTAINS '梵净山' OR "
            "p.route_day_summary CONTAINS '镇远' OR p.route_day_summary CONTAINS '梵净山')"
        )
    if "青岩" in q:
        product_focus_filters.append(
            "(p.name CONTAINS '青岩' OR p.source_files CONTAINS '青岩' OR "
            "p.route_stop_sequence CONTAINS '青岩' OR p.route_day_summary CONTAINS '青岩')"
        )
    product_focus_filter = (" AND " + " AND ".join(product_focus_filters)) if product_focus_filters else ""

    # Only p/d/s are bound by the leading MATCH of the route templates, so the
    # route WHERE may only reference those aliases.
    core_route_where = " OR ".join([
        contains_clause("p", ("name", "product_family", "route_stop_sequence", "route_day_summary", "source_files", "default_vehicle_type", "default_hotel_grade")),
        contains_clause("d", ("name", "route_path", "meal_text", "accommodation_text", "route_stop_sequence")),
        contains_clause("s", ("name", "city_or_area", "route_line_name", "route_sequence_label", "station_like_name")),
    ])

    product_quote_words = ("报价", "团费", "基础价", "成人价", "儿童价", "成人结算", "儿童结算", "单房差", "结算")
    product_context_words = ("线路", "行程", "路线", "产品", "日游", "天游", "人", "团", "公司", "学校", "酒店", "住宿", "费用")
    wants_quote = (
        any(w in q for w in product_quote_words)
        or ("多少钱" in q and any(w in q for w in product_context_words))
        or ("费用" in q and any(w in q for w in ("线路", "行程", "产品", "日游", "天游", "人", "团")))
    )
    if wants_quote:
        return (
            "MATCH (p:TourProduct)-[pd:PRODUCT_HAS_DAY]->(d:ProductDay)-[ds:DAY_HAS_STOP]->(s:RouteStop) "
            f"WHERE ({core_route_where}){duration_filter}{product_focus_filter} "
            "OPTIONAL MATCH (s)-[next:ROUTE_STOP_NEXT]->(s2:RouteStop) "
            "OPTIONAL MATCH (s)-[sa:STOP_VISITS_ATTRACTION]->(a:ScenicAttraction) "
            "OPTIONAL MATCH (s)-[ss:STOP_VISITS_SUB_ATTRACTION]->(sub:SubAttraction) "
            "OPTIONAL MATCH (s)-[sr:STOP_LOCATED_IN_REGION]->(region:AdministrativeRegion) "
            "OPTIONAL MATCH (s)-[sb:STOP_HAS_TRAVEL_ITEM]->(b:ItineraryResourceBinding)-[bi:BINDING_BINDS_ITEM]->(i:TravelItem) "
            f"RETURN p,pd,d,ds,s,next,s2,sa,a,ss,sub,sr,region,sb,b,bi,i LIMIT {limit}",
            params,
            "travel_item_product_quote",
            "按固定线路产品查询基础团费报价依据,并带出必付/可选 TravelItem 以支持总价计算。",
        )

    if any(w in q for w in ("车辆", "车型", "用车", "小车", "商务车", "大巴", "中巴", "5座", "7座")):
        return (
            "MATCH (i:TravelItem) WHERE i.type = 'Vehicle' "
            "OPTIONAL MATCH (p:TourProduct)-[r]->(i) "
            f"RETURN p,r,i ORDER BY i.seat_count LIMIT {limit}",
            params,
            "travel_item_vehicle_resource",
            "按 TravelItem 车辆资源直接查询车型、座位、图片和 price/unit 价格字段。",
        )

    if any(w in q for w in ("门票", "小交通", "电瓶车", "环保车", "观光车", "保险", "扶梯", "索道", "游船", "二消", "二次", "多少钱", "价格", "费用", "儿童价", "成人价")):
        item_where = " OR ".join([
            contains_clause("i", ("name", "type", "subtype", "category", "raw_evidence")),
            contains_clause("a", ("name", "short_name", "city", "county")),
        ])
        # The first scenic name found (in this priority order) narrows the
        # result. The names come from this fixed whitelist, never from user
        # text, so inlining them into the query is safe.
        focus = next(
            (f for f in ("天星桥", "陡坡塘", "大瀑布", "黄果树", "小七孔", "西江", "梵净山", "镇远", "青岩") if f in q),
            None,
        )
        fee_focus_filter = (
            f" AND (a.name CONTAINS '{focus}' OR a.short_name CONTAINS '{focus}' OR i.name CONTAINS '{focus}')"
            if focus
            else ""
        )
        # The type filter belongs to the main MATCH, and the term/focus filter
        # must follow a WITH. A WHERE placed directly after the OPTIONAL MATCH
        # would only constrain the optional attraction, returning every item.
        return (
            "MATCH (i:TravelItem) "
            "WHERE i.type IN ['Ticket','ScenicTransport','Insurance','Activity','Service','ScenicOptional'] "
            "OPTIONAL MATCH (a:ScenicAttraction)-[has:ATTRACTION_HAS_ITEM]->(i) "
            "WITH a, has, i "
            f"WHERE ({item_where}){fee_focus_filter} "
            f"RETURN a,has,i LIMIT {limit}",
            params,
            "travel_item_fee_binding",
            "按 ScenicAttraction -> TravelItem 查询门票、小交通、保险和二次消费,价格来自 TravelItem price/adult_price/unit。",
        )

    if any(w in q for w in ("酒店", "住宿", "餐厅", "餐饮", "吃", "美食", "资源", "可选", "升级", "替换")):
        type_filters: list[str] = []
        if any(w in q for w in ("酒店", "住宿", "住")):
            type_filters.append("i.type = 'Hotel'")
        if any(w in q for w in ("餐厅", "餐饮", "吃", "美食")):
            type_filters.append("i.type = 'Restaurant'")
        type_where = f"AND ({' OR '.join(type_filters)})" if type_filters else "AND i.type IN ['Hotel','Restaurant','Vehicle']"

        if type_filters:
            # Scenic anchor from a fixed whitelist (safe to inline).
            anchor = next(
                (f for f in ("黄果树", "小七孔", "西江", "梵净山", "镇远", "青岩", "天星桥", "荔波", "安顺", "贵阳") if f in q),
                None,
            )
            if anchor:
                return (
                    "MATCH (a:ScenicAttraction)<-[serve:ITEM_SERVES_ATTRACTION]-(i:TravelItem) "
                    f"WHERE (a.name CONTAINS '{anchor}' OR a.aliases CONTAINS '{anchor}') {type_where} "
                    "OPTIONAL MATCH (i)-[ir:ITEM_LOCATED_IN_REGION]->(region:AdministrativeRegion) "
                    f"RETURN a,serve,i,ir,region LIMIT {limit}",
                    params,
                    "travel_item_scenic_resource_pool",
                    "按景区锚点直接查询可服务该景区的酒店/餐饮资源池,再带出固定路线槽位。",
                )

        item_where = " OR ".join([
            contains_clause("s", ("name", "city_or_area", "route_line_name", "route_sequence_label")),
            contains_clause("b", ("name", "binding_type", "status", "default_text", "notes")),
            contains_clause("i", ("name", "type", "subtype", "region_name", "address", "features", "signature_dishes", "price_text")),
        ])
        return (
            "MATCH (s:RouteStop)-[sb:STOP_HAS_TRAVEL_ITEM]->(b:ItineraryResourceBinding)-[bi:BINDING_BINDS_ITEM]->(i:TravelItem) "
            f"WHERE ({item_where}) {type_where} "
            "OPTIONAL MATCH (s)-[sa:STOP_VISITS_ATTRACTION]->(a:ScenicAttraction) "
            "OPTIONAL MATCH (i)-[ir:ITEM_LOCATED_IN_REGION]->(region:AdministrativeRegion) "
            f"RETURN s,sb,b,bi,i,sa,a,ir,region LIMIT {limit}",
            params,
            "travel_item_resource_binding",
            "按固定路线停靠点查询可替换/升级资源,资源本身是 TravelItem,包含/可选状态在绑定槽位上。",
        )

    if any(w in q for w in ("推荐", "线路", "行程", "路线", "几天", "天", "人", "纯玩", "产品")):
        return (
            "MATCH (p:TourProduct)-[pd:PRODUCT_HAS_DAY]->(d:ProductDay)-[ds:DAY_HAS_STOP]->(s:RouteStop) "
            f"WHERE ({core_route_where}){duration_filter}{product_focus_filter} "
            "OPTIONAL MATCH (s)-[next:ROUTE_STOP_NEXT]->(s2:RouteStop) "
            "OPTIONAL MATCH (s)-[sa:STOP_VISITS_ATTRACTION]->(a:ScenicAttraction) "
            "OPTIONAL MATCH (s)-[ss:STOP_VISITS_SUB_ATTRACTION]->(sub:SubAttraction) "
            "OPTIONAL MATCH (s)-[sr:STOP_LOCATED_IN_REGION]->(region:AdministrativeRegion) "
            f"RETURN p,pd,d,ds,s,next,s2,sa,a,ss,sub,sr,region LIMIT {limit}",
            params,
            "travel_item_fixed_route",
            "按 TourProduct -> ProductDay -> RouteStop 的固定路线骨架查询,停靠点像公交站一样保留顺序。",
        )

    return (
        "MATCH (p:TourProduct)-[pd:PRODUCT_HAS_DAY]->(d:ProductDay)-[ds:DAY_HAS_STOP]->(s:RouteStop) "
        f"WHERE ({core_route_where}){duration_filter}{product_focus_filter} "
        "OPTIONAL MATCH (s)-[sa:STOP_VISITS_ATTRACTION]->(a:ScenicAttraction) "
        "OPTIONAL MATCH (s)-[ss:STOP_VISITS_SUB_ATTRACTION]->(sub:SubAttraction) "
        f"RETURN p,pd,d,ds,s,sa,a,ss,sub LIMIT {limit}",
        params,
        "travel_item_lookup",
        "按固定线路、停靠点、景区和子景点查询。",
    )
|
||
|
||
|
||
def _rule_based_cypher(question: str, limit: int) -> tuple[str, dict[str, Any], str] | None:
|
||
q = question.strip()
|
||
if "花溪公园" not in q:
|
||
return None
|
||
traffic_words = ("公交", "地铁", "交通", "怎么去", "到达", "坐车", "站")
|
||
knowledge_words = ("知识", "总览", "详情", "全部", "完整", "全貌")
|
||
event_words = ("历史", "事件", "沿革", "建园", "名人")
|
||
concept_words = ("概念", "主题", "文化", "夜游", "生态", "适合")
|
||
part_words = ("包含", "组成", "景点", "设施", "里面", "有哪些")
|
||
if any(w in q for w in traffic_words):
|
||
return (
|
||
"MATCH (p:Place {element_id:$eid})-[r:NEAR_TRANSIT]->(s:Place) "
|
||
"OPTIONAL MATCH (b:BusLine)-[r2:STOPS_AT]->(s) "
|
||
f"RETURN p,r,s,b,r2 LIMIT {limit}",
|
||
{"eid": "amap:B035300A51"},
|
||
"rule_transit",
|
||
)
|
||
if any(w in q for w in knowledge_words) or ("事件" in q and "概念" in q):
|
||
return (
|
||
"MATCH (p:Place {element_id:$eid})-[r]->(m) "
|
||
"OPTIONAL MATCH (b:BusLine)-[r2:STOPS_AT]->(m) "
|
||
f"RETURN p,r,m,b,r2 LIMIT {limit}",
|
||
{"eid": "amap:B035300A51"},
|
||
"rule_canonical_full_knowledge",
|
||
)
|
||
if any(w in q for w in event_words):
|
||
return (
|
||
"MATCH (p:Place {element_id:$eid})-[r:HAS_EVENT]->(e:Event) "
|
||
f"RETURN p,r,e LIMIT {limit}",
|
||
{"eid": "amap:B035300A51"},
|
||
"rule_event",
|
||
)
|
||
if any(w in q for w in concept_words):
|
||
return (
|
||
"MATCH (p:Place {element_id:$eid})-[r:HAS_CONCEPT]->(c:Concept) "
|
||
f"RETURN p,r,c LIMIT {limit}",
|
||
{"eid": "amap:B035300A51"},
|
||
"rule_concept",
|
||
)
|
||
if any(w in q for w in part_words):
|
||
return (
|
||
"MATCH (p:Place {element_id:$eid})-[r:HAS_PART]->(m) "
|
||
f"RETURN p,r,m LIMIT {limit}",
|
||
{"eid": "amap:B035300A51"},
|
||
"rule_part",
|
||
)
|
||
return (
|
||
"MATCH (p:Place {element_id:$eid})-[r]->(m) "
|
||
"OPTIONAL MATCH (b:BusLine)-[r2:STOPS_AT]->(m) "
|
||
f"RETURN p,r,m,b,r2 LIMIT {limit}",
|
||
{"eid": "amap:B035300A51"},
|
||
"rule_canonical_full_knowledge",
|
||
)
|
||
|
||
|
||
async def _nl_query_client() -> LlmClient | None:
    """Build the LLM client used to translate questions into Cypher.

    Preference order:

    1. The ``global`` agent settings, when they carry both ``base_url`` and
       ``api_key``. The timeout defaults to 30.
    2. Otherwise the ``extract`` models. Candidates are tried in this order:
       the configured ``aggregator``, then the enabled models, then every
       model. The first candidate with both ``base_url`` and ``api_key`` wins.
       Its timeout comes from ``extract.timeout`` and defaults to 60.

    The model name falls back to ``"deepseek-chat"``. ``max_tokens`` is 1200.

    Returns:
        An ``LlmClient``, or ``None`` when no usable credentials are configured.
    """
    agent_cfg = await get_agent_settings()

    primary = agent_cfg.get("global") or {}
    if primary.get("base_url") and primary.get("api_key"):
        return LlmClient(
            primary["base_url"],
            primary["api_key"],
            primary.get("model") or "deepseek-chat",
            timeout=int(primary.get("timeout") or 30),
            max_tokens=1200,
        )

    extract_cfg = agent_cfg.get("extract") or {}
    model_table = extract_cfg.get("models") or {}
    enabled_names = [name for name, entry in model_table.items() if entry.get("enabled")]
    candidate_names = [extract_cfg.get("aggregator"), *enabled_names, *model_table.keys()]

    for name in candidate_names:
        entry = model_table.get(name or "")
        if not entry or not entry.get("base_url") or not entry.get("api_key"):
            continue
        return LlmClient(
            entry["base_url"],
            entry["api_key"],
            entry.get("model") or "deepseek-chat",
            timeout=int(extract_cfg.get("timeout") or 60),
            max_tokens=1200,
        )
    return None
|
||
|
||
|
||
def _canon_id(x: Any) -> str:
|
||
"""Canonical id = FalkorDB internal node id.
|
||
|
||
Used for BOTH nodes and edge endpoints so they always reference the
|
||
same id space (edge.src_node / dest_node may be a Node or an int).
|
||
"""
|
||
nid = getattr(x, "id", None)
|
||
if nid is not None:
|
||
return str(nid)
|
||
return str(x)
|
||
|
||
|
||
def _place_visual_group(props: dict[str, Any]) -> str:
|
||
station_type = props.get("station_type")
|
||
if station_type:
|
||
return STATION_TYPE_LABELS.get(str(station_type), str(station_type))
|
||
place_type = props.get("place_type")
|
||
if place_type:
|
||
return PLACE_TYPE_LABELS.get(str(place_type), str(place_type))
|
||
category = props.get("category")
|
||
if category:
|
||
return str(category)
|
||
return "地点"
|
||
|
||
|
||
def _event_visual_group(props: dict[str, Any]) -> str:
|
||
event_type = props.get("event_type")
|
||
if event_type:
|
||
return str(event_type)
|
||
return "Event"
|
||
|
||
|
||
# Property keys to try (in order) as a node's display label
|
||
_NAME_KEYS = (
|
||
"display_name", "name", "line_name", "title", "label",
|
||
"natural_key", "product_id", "scenic_area_id", "attraction_id",
|
||
"ticket_fee_id", "fee_item_id", "template_id", "theme", "tag_id", "area_id", "place_id",
|
||
)
|
||
|
||
|
||
def _node_dict(n: Any) -> dict:
    """Convert a FalkorDB node into the JSON shape consumed by the graph UI.

    Returns:
        A dict with:

        * ``id``: the canonical id from ``_canon_id``.
        * ``label``: display text. This is "name(station_type)" for stations;
          otherwise the first non-empty value among ``_NAME_KEYS``; otherwise
          "<Label> #<id>".
        * ``group``: the first label, refined for ``Place`` and ``Event`` nodes.
        * ``properties``: every property stringified. Image/photo URL fields
          are cut to 2000 characters and all others to 200.
    """
    labels = getattr(n, "labels", None) or ["Node"]
    props = getattr(n, "properties", None) or {}
    nid = _canon_id(n)

    # `labels` is never empty here because of the ["Node"] fallback above.
    group = str(labels[0])
    if group == "Place":
        visual_group = _place_visual_group(props)
    elif group == "Event":
        visual_group = _event_visual_group(props)
    else:
        visual_group = group

    name = props.get("name")
    station_type = props.get("station_type")
    if name and station_type:
        disp = f"{name}({station_type})"
    else:
        disp = ""
        for key in _NAME_KEYS:
            value = props.get(key)
            if value not in (None, ""):
                disp = str(value)
                break
    if not disp:
        disp = f"{group} #{nid}"  # never a bare number

    long_fields = {"photo_urls", "image_url", "image_urls", "cover_url"}
    return {
        "id": nid,
        "label": disp,
        "group": visual_group,
        "properties": {
            k: str(v)[:2000 if k in long_fields else 200]
            for k, v in props.items()
        },
    }
|
||
|
||
|
||
def _is_shadow_node(n: Any) -> bool:
|
||
props = getattr(n, "properties", None) or {}
|
||
value = props.get("shadow_node")
|
||
return value is True or value == 1 or str(value).lower() in {"1", "true", "yes"}
|
||
|
||
|
||
def _edge_dict(e: Any) -> dict:
    """Convert a FalkorDB edge into the JSON shape consumed by the graph UI.

    Endpoints are normalised with ``_canon_id`` so they line up with node ids.
    An edge without an internal id gets the synthetic id "src-REL-dst".
    Property values are stringified and cut to 200 characters.
    """
    source_id = _canon_id(e.src_node)
    target_id = _canon_id(e.dest_node)
    internal_id = getattr(e, "id", None)
    relation = str(getattr(e, "relation", "") or "")
    raw_props = getattr(e, "properties", None) or {}

    if internal_id is None:
        edge_id = f"{source_id}-{relation}-{target_id}"
    else:
        edge_id = str(internal_id)

    return {
        "id": edge_id,
        "from": source_id,
        "to": target_id,
        "label": relation,
        "properties": {key: str(value)[:200] for key, value in raw_props.items()},
    }
|
||
|
||
|
||
def _extract(rows: list) -> tuple[list, list]:
    """Collect unique nodes and edges from query rows for visualisation.

    Rows are walked with an explicit LIFO stack. Nested lists/tuples are
    unpacked, and Path objects (callable ``nodes()`` / ``edges()``), Edge
    objects (``src_node`` / ``dest_node``) and Node objects (``labels`` /
    ``properties``) are recognised; anything else is ignored. Shadow nodes,
    and edges touching them, are dropped. Nodes are de-duplicated by canonical
    id and edges by their dict id. Finally, only edges whose endpoints are
    both among the returned nodes are kept.

    Returns:
        ``(nodes, edges)`` as lists of UI dicts.
    """
    node_list: list = []
    edge_list: list = []
    node_ids: set = set()
    edge_ids: set = set()

    def keep_node(candidate: Any) -> None:
        if _is_shadow_node(candidate):
            return
        key = _canon_id(candidate)
        if key in node_ids:
            return
        node_ids.add(key)
        node_list.append(_node_dict(candidate))

    def keep_edge(candidate: Any) -> None:
        endpoints = (getattr(candidate, "src_node", None), getattr(candidate, "dest_node", None))
        if any(_is_shadow_node(end) for end in endpoints):
            return
        payload = _edge_dict(candidate)
        if payload["id"] in edge_ids:
            return
        edge_ids.add(payload["id"])
        edge_list.append(payload)

    pending = list(rows)
    while pending:
        item = pending.pop()
        if callable(getattr(item, "nodes", None)) and callable(getattr(item, "edges", None)):
            # Path: expand into its nodes and edges.
            for node in item.nodes():
                keep_node(node)
            for edge in item.edges():
                keep_edge(edge)
        elif hasattr(item, "src_node") and hasattr(item, "dest_node"):
            keep_edge(item)
        elif hasattr(item, "labels") and hasattr(item, "properties"):
            keep_node(item)
        elif isinstance(item, (list, tuple)):
            # A result row or nested collection: keep walking its members.
            pending.extend(item)

    visible = {node["id"] for node in node_list}
    connected = [edge for edge in edge_list if edge["from"] in visible and edge["to"] in visible]
    return node_list, connected
|
||
|
||
|
||
@router.get("/graph/overview")
def graph_overview(
    context: ProjectContext = Depends(get_project_context),
    _user: CurrentUser = None,
):
    """Return summary statistics for the project's graph.

    The response contains:
      - node and relationship totals;
      - raw per-label counts, keyed by each node's first label;
      - "visual" counts, sorted by count descending, in which Place and Event
        nodes are re-bucketed by ``_place_visual_group`` /
        ``_event_visual_group``;
      - per-type relationship counts.

    Only nodes whose ``shadow_node`` is NULL or ``<> 1`` are counted, and
    relationships are counted only when both endpoints qualify. Any exception
    is reported in the body as ``{"connected": False, "error": ...}`` rather
    than as an HTTP error.

    NOTE(review): ``_is_shadow_node`` also treats ``true`` / ``"true"`` /
    ``"yes"`` as shadow, but these Cypher filters only exclude ``1`` — confirm
    how ``shadow_node`` is stored. The LIMIT clauses on the grouped queries
    may also truncate counts on graphs with many distinct groups.
    """
    # Shared MATCH/WHERE prefix: relationships between two non-shadow nodes.
    live_edges = (
        "MATCH (a)-[r]->(b) "
        "WHERE (a.shadow_node IS NULL OR a.shadow_node <> 1) "
        "AND (b.shadow_node IS NULL OR b.shadow_node <> 1) "
    )

    def scalar(res: Any) -> Any:
        """First cell of a result set, or 0 when the result set is empty."""
        table = res.result_set
        return table[0][0] if table else 0

    try:
        graph = _get_graph(context.graph_name)
        node_total = scalar(graph.query(
            "MATCH (n) WHERE n.shadow_node IS NULL OR n.shadow_node <> 1 RETURN count(n) AS cnt"
        ))
        edge_total = scalar(graph.query(live_edges + "RETURN count(r) AS cnt"))

        per_label = graph.query(
            "MATCH (n) WHERE n.shadow_node IS NULL OR n.shadow_node <> 1 "
            "RETURN labels(n) AS labels, count(n) AS count LIMIT 100"
        ).result_set
        label_counts = [
            {"label": row[0][0] if row[0] else "(no label)", "count": row[1]}
            for row in per_label
        ]

        visual_counts: dict[str, int] = {}

        def tally(group_name: str, amount: Any) -> None:
            visual_counts[group_name] = visual_counts.get(group_name, 0) + int(amount or 0)

        for entry in label_counts:
            # Place/Event totals are replaced by the finer-grained splits below.
            if entry["label"] not in {"Place", "Event"}:
                tally(entry["label"], entry["count"])

        place_rows = graph.query(
            "MATCH (p:Place) WHERE p.shadow_node IS NULL OR p.shadow_node <> 1 "
            "RETURN p.place_type AS place_type, p.station_type AS station_type, "
            "p.category AS category, count(p) AS count LIMIT 200"
        ).result_set
        for place_type, station_type, category, count in place_rows:
            tally(
                _place_visual_group({
                    "place_type": place_type,
                    "station_type": station_type,
                    "category": category,
                }),
                count,
            )

        event_rows = graph.query(
            "MATCH (e:Event) WHERE e.shadow_node IS NULL OR e.shadow_node <> 1 "
            "RETURN e.event_type AS event_type, count(e) AS count LIMIT 100"
        ).result_set
        for event_type, count in event_rows:
            tally(_event_visual_group({"event_type": event_type}), count)

        # sorted() is stable even with reverse=True: ties keep first-seen order.
        ranked = sorted(visual_counts.items(), key=lambda kv: kv[1], reverse=True)
        visual_label_counts = [{"label": name, "count": total} for name, total in ranked]

        type_rows = graph.query(
            live_edges + "RETURN type(r) AS type, count(r) AS count LIMIT 100"
        ).result_set
        rel_counts = [{"type": row[0], "count": row[1]} for row in type_rows]

        return {
            "graph_name": context.graph_name,
            "connected": True,
            "node_count": node_total,
            "relationship_count": edge_total,
            "label_counts": label_counts,
            "visual_label_counts": visual_label_counts,
            "relationship_type_counts": rel_counts,
        }
    except Exception as e:
        return {"connected": False, "error": str(e)}
|
||
|
||
|
||
@router.post("/graph/query")
def graph_query(
    body: dict,
    context: ProjectContext = Depends(get_project_context),
    _user: CurrentUser = None,
):
    """Execute a user-supplied Cypher query and return graph data.

    Body keys:
        cypher: query text; surrounding whitespace and trailing ``;`` are removed.
        limit: result cap, default 50, clamped to ``1..GRAPH_QUERY_MAX_LIMIT``.
            A JSON ``null`` is treated as the default.
        graph_name: graph to query; defaults to the project's graph.

    Returns:
        The dict produced by ``_execute_graph_query`` with ``"mode": "cypher"``.

    Raises:
        HTTPException(400): empty query, non-integer ``limit``, or any failure
            while executing the query.
    """
    cypher = str(body.get("cypher") or "").strip().rstrip(";")
    if not cypher:
        raise HTTPException(400, "Cypher query required")

    raw_limit = body.get("limit")
    try:
        requested_limit = int(50 if raw_limit is None else raw_limit)
    except (TypeError, ValueError, OverflowError):
        # A malformed limit is a client error; without this it surfaces as a 500.
        raise HTTPException(400, "limit must be an integer") from None
    limit = min(max(requested_limit, 1), GRAPH_QUERY_MAX_LIMIT)

    # NOTE(review): a body-supplied graph_name overrides context.graph_name, so a
    # caller can target graphs outside its project context — confirm intended.
    graph_name = str(body.get("graph_name") or context.graph_name)
    # NOTE(review): no read-only check is visible here (the LLM path in
    # graph_nl_query calls _assert_read_only explicitly); confirm that
    # _execute_graph_query rejects write clauses.

    try:
        data = _execute_graph_query(cypher, limit, graph_name=graph_name)
        data.update({"mode": "cypher"})
        return data
    except Exception as e:
        raise HTTPException(400, f"Query failed: {str(e)[:300]}") from e
|
||
|
||
|
||
@router.post("/graph/nl-query")
async def graph_nl_query(
    body: dict,
    context: ProjectContext = Depends(get_project_context),
    _user: CurrentUser = None,
):
    """Answer a graph question given as raw Cypher or natural language.

    Strategies are tried in order; the first that produces a response wins:

    1. Input starting with MATCH/RETURN/CALL is executed directly as Cypher
       (a failure is reported as HTTP 400).
    2. Deterministic rule-based Cypher, run in this order:
       - travel-item rules, only for graphs accepted by
         ``_is_travel_item_graph_name``;
       - travel rules, only when the graph name contains "travel" or the query
         mentions 旅行/旅游;
       - the generic ``_rule_based_cypher`` rules.
       A rule that matches but fails to execute is skipped, and its error is
       reported in ``trace.rule_errors``.
    3. LLM-generated Cypher, only when ``body["use_llm"] is True``. It is
       accepted only if it returns at least one node.
    4. Text-recall fallback via ``_fallback_cypher``.

    Blocking work (graph queries, the LLM call, the schema hint) runs via
    ``asyncio.to_thread`` so this async endpoint does not stall the event loop.

    Body keys: ``query`` (or ``text``); ``limit`` (default 100, clamped to
    ``1..GRAPH_QUERY_MAX_LIMIT``, JSON null means the default); ``graph_name``
    (defaults to the project's graph); ``use_llm``.

    Every successful response carries ``trace`` with ``latency_ms``,
    ``performance_target_ms`` and ``use_llm``, and a top-level ``latency_ms``
    if not already present.

    Raises:
        HTTPException(400): empty query, non-integer limit, failing direct
            Cypher, or failing text-recall fallback.
    """
    import asyncio  # local import: offload the synchronous helpers below

    started_at = time.perf_counter()
    query = str(body.get("query") or body.get("text") or "").strip().rstrip(";")
    raw_limit = body.get("limit")
    try:
        requested_limit = int(100 if raw_limit is None else raw_limit)
    except (TypeError, ValueError, OverflowError):
        raise HTTPException(400, "limit must be an integer") from None
    limit = min(max(requested_limit, 1), GRAPH_QUERY_MAX_LIMIT)
    graph_name = str(body.get("graph_name") or context.graph_name)
    use_llm = body.get("use_llm") is True
    # Errors from rules that matched but failed to execute. These are kept so a
    # fall-through to the LLM / text-recall path stays diagnosable.
    rule_errors: list[str] = []

    def finish(data: dict) -> dict:
        """Attach timing/trace metadata to a response dict and return it."""
        data.setdefault("trace", {})
        data["trace"].update({
            "latency_ms": max(1, round((time.perf_counter() - started_at) * 1000)),
            "performance_target_ms": 1200,
            "use_llm": use_llm,
        })
        if rule_errors:
            data["trace"]["rule_errors"] = list(rule_errors)
        data.setdefault("latency_ms", data["trace"]["latency_ms"])
        return data

    async def run_cypher(cypher: str, **extra: Any) -> dict:
        """Run the synchronous _execute_graph_query in a worker thread."""
        return await asyncio.to_thread(
            _execute_graph_query, cypher, limit, graph_name=graph_name, **extra
        )

    async def try_rule(cypher: str, params: Any, mode: str, reason: str) -> Any:
        """Execute one rule-generated query.

        Returns the finished response, or None after recording the error so
        the caller can fall through to the next strategy.
        """
        try:
            data = await run_cypher(cypher, params=params)
            data.update({"mode": mode, "input": query, "reason": reason})
            return finish(data)
        except Exception as e:  # noqa: BLE001
            rule_errors.append(f"{mode}: {str(e)[:180]}")
            return None

    if not query:
        raise HTTPException(400, "Query text required")

    # 1. Input that already looks like Cypher is executed verbatim.
    if READ_START.match(query):
        try:
            data = await run_cypher(query)
            data.update({"mode": "cypher", "input": query})
            return finish(data)
        except Exception as e:
            raise HTTPException(400, f"Query failed: {str(e)[:300]}") from e

    # 2. Deterministic rules, most specific first.
    if _is_travel_item_graph_name(graph_name):
        travel_item_ruled = _travel_item_rule_based_cypher(query, limit)
        if travel_item_ruled:
            cypher, params, rule_mode, reason = travel_item_ruled
            response = await try_rule(cypher, params, rule_mode, reason)
            if response is not None:
                return response

    travel_ruled = _travel_rule_based_cypher(query, limit)
    if travel_ruled and ("travel" in graph_name.lower() or "旅行" in query or "旅游" in query):
        cypher, params, rule_mode, reason = travel_ruled
        response = await try_rule(cypher, params, rule_mode, reason)
        if response is not None:
            return response

    ruled = _rule_based_cypher(query, limit)
    if ruled:
        cypher, params, rule_mode = ruled
        response = await try_rule(
            cypher,
            params,
            rule_mode,
            "命中已对齐的高德景区主节点,避免同名公交/地铁站被当成第二个公园。",
        )
        if response is not None:
            return response

    # 3. Optional LLM-generated Cypher; accepted only when it returns nodes.
    generated_cypher = ""
    if use_llm:
        try:
            client = await _nl_query_client()
            if client is None:
                raise RuntimeError("LLM 未配置")
            payload = {
                "user_question": query,
                "graph_schema": await asyncio.to_thread(_graph_schema_hint, graph_name),
            }
            decided = await asyncio.to_thread(
                client.chat_json, NL_TO_CYPHER_SYS, json.dumps(payload, ensure_ascii=False)
            )
            generated_cypher = str(decided.get("cypher") or "").strip().rstrip(";")
            generated_cypher = _ensure_limit(generated_cypher, limit)
            _assert_read_only(generated_cypher)
            data = await run_cypher(generated_cypher)
            if data["nodes"]:
                data.update({
                    "mode": "llm_cypher",
                    "input": query,
                    "reason": decided.get("reason") or "",
                })
                return finish(data)
            llm_error = "LLM Cypher 查询无结果,已转文本召回"
        except Exception as e:  # noqa: BLE001
            llm_error = str(e)[:180]
    else:
        llm_error = "未启用 LLM,直接走文本召回快路径"

    # 4. Text-recall fallback.
    try:
        fallback, params = _fallback_cypher(query, limit)
        data = await run_cypher(fallback, params=params)
        data.update({
            "mode": "text_recall",
            "input": query,
            "generated_cypher": generated_cypher,
            "fallback_reason": llm_error,
        })
        return finish(data)
    except Exception as e:
        raise HTTPException(400, f"NL query failed: {str(e)[:300]}") from e
|