Files
bxh/app/api/graph.py

1041 lines
46 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Graph Browser API — FalkorDB Cypher queries + natural-language search."""
import re
import json
import time
from typing import Any
from falkordb import FalkorDB
from fastapi import APIRouter, Depends, HTTPException
from app.auth import CurrentUser
from app.config import settings
from app.db import get_agent_settings
from app.llm_client import LlmClient
from app.project_context import ProjectContext, get_project_context
# Router on which the graph-browser endpoints defined in this module are registered.
router = APIRouter()
# Upper bound applied to the client-supplied "limit" of the query endpoints.
GRAPH_QUERY_MAX_LIMIT = 15000
# Whole-word, case-insensitive match for Cypher keywords treated as mutating.
# The pattern is searched over the entire query text, so a keyword inside a
# string literal is matched as well.
WRITE_KEYWORDS = re.compile(
r"\b(CREATE|MERGE|SET|DELETE|DETACH|DROP|REMOVE|CALL\s+DB\.IDX|CALL\s+DB\.CONSTRAINT)\b",
re.IGNORECASE,
)
# A query is only accepted when it begins with one of these clauses.
READ_START = re.compile(r"^(MATCH|RETURN|CALL)\b", re.IGNORECASE)
# Node property names probed with CONTAINS by the text-recall fallback query.
TEXT_PROPS = (
# Keep fallback search on fields that are consistently scalar strings.
# Some graph properties are arrays/lists; RedisGraph CONTAINS on them raises
# a type error, so broad "search every property" is not safe here.
"name", "title", "label", "display_name", "description",
"summary", "history", "features", "address", "district",
"category", "place_type", "event_type", "concept_type",
"product_family", "product_type", "group_mode", "vehicle_layout",
"hotel_grade", "meal_standard", "service_promise", "included_summary",
"excluded_summary", "booking_notes", "risk_notes", "route_summary",
"quote_summary", "answer_hint", "demand_summary", "special_care",
"lead_source", "vehicle_type", "room_type", "refund_policy",
"rule_text", "message_template", "trigger_scenario", "origin_text",
"destination_text", "meal_scene", "signature_dishes",
"applicable_products", "region",
"fee_name", "fee_type", "price_text", "scenic_target_name",
"parent_scenic_area_name", "route_role", "recommendation_note",
"admin_region_name", "amap_name",
"item_id", "type", "subtype", "binding_type", "status", "default_text",
"price_formula", "station_like_name", "route_line_name", "route_sequence_label",
"base_price_status", "base_price_text", "price_options", "mandatory_fee_text",
"ticket_refund_policy", "quote_formula", "pricing_notes", "pricing_source_files",
)
# Display names for Place.place_type values, used as the visual group of a
# Place node; values missing from the map are shown unchanged.
PLACE_TYPE_LABELS = {
"sight": "景点",
"eat": "美食",
"hotel": "酒店",
"mall": "商场",
"transit_stop": "交通站点",
}
# Display names for Place.station_type values; station_type takes precedence
# over place_type when a Place node's visual group is chosen.
STATION_TYPE_LABELS = {
"公交站": "公交站",
"地铁站": "地铁站",
}
# System prompt sent to the LLM when translating a natural-language question
# into Cypher. It asks for a read-only query and for the answer to be a JSON
# object with the keys "cypher" and "reason", which the NL endpoint reads back.
NL_TO_CYPHER_SYS = """你是城市知识图谱查询助手。请把用户自然语言问题转换成 FalkorDB/RedisGraph 只读 Cypher。
硬性要求:
1. 只能生成只读查询,必须以 MATCH/RETURN/CALL 开头。
2. 禁止 CREATE、MERGE、SET、DELETE、DETACH、DROP、REMOVE、索引/约束操作。
3. 查询必须 RETURN 节点、关系或路径,方便前端图谱可视化。
4. 默认 LIMIT 100除非用户明确要更少。
5. 优先用 CONTAINS 做中文名称匹配,避免要求用户输入精确 ID。
6. 如果用户问某地点的事件,优先查 HAS_EVENT问主题/适合/夜游/历史文化,优先查 HAS_CONCEPT问组成/景点/设施,优先查 HAS_PART。
7. 同名地点优先选择 place_type='sight'、source='amap' 或带 element_id 的高德主 POI避免选择 shadow_node=1 的抽取临时节点。
常见例子:
- “花溪公园有哪些历史事件”
MATCH (p:Place)-[r:HAS_EVENT]->(e:Event) WHERE p.name CONTAINS '花溪公园' AND p.place_type = 'sight' RETURN p,r,e LIMIT 100
- “花溪公园有哪些概念”
MATCH (p:Place)-[r:HAS_CONCEPT]->(c:Concept) WHERE p.name CONTAINS '花溪公园' AND p.place_type = 'sight' RETURN p,r,c LIMIT 100
- “花溪公园包含哪些景点”
MATCH (p:Place)-[r:HAS_PART]->(m) WHERE p.name CONTAINS '花溪公园' AND p.place_type = 'sight' RETURN p,r,m LIMIT 100
- “花溪公园附近有哪些公交地铁可以到”
MATCH (p:Place)-[r:NEAR_TRANSIT]->(s:Place) WHERE p.name CONTAINS '花溪公园' AND p.place_type = 'sight' OPTIONAL MATCH (b:BusLine)-[r2:STOPS_AT]->(s) RETURN p,r,s,b,r2 LIMIT 100
- “查看花溪公园”
MATCH (p:Place {element_id:'amap:B035300A51'})-[r]->(m) OPTIONAL MATCH (b:BusLine)-[r2:STOPS_AT]->(m) RETURN p,r,m,b,r2 LIMIT 100
只输出 JSON
{"cypher":"...","reason":"一句话说明"}"""
def _get_graph(graph_name: str | None = None):
    """Connect to FalkorDB and return the selected graph handle.

    A falsy *graph_name* selects ``settings.falkordb_graph``. A new client
    object is created on every call.
    """
    client = FalkorDB(host=settings.falkordb_host, port=settings.falkordb_port)
    target = graph_name if graph_name else settings.falkordb_graph
    return client.select_graph(target)
def _is_travel_item_graph_name(graph_name: str | None) -> bool:
name = (graph_name or "").lower()
return (
name == "travel_fixed_route_item"
or "fixed_route" in name
or "baixinghui" in name
or "travel_agency_2_0" in name
)
def _assert_read_only(cypher: str) -> None:
    """Validate that *cypher* is an acceptable read-only query.

    Raises:
        HTTPException: status 400 when the text is empty, contains a keyword
            matched by ``WRITE_KEYWORDS``, or does not start with a clause
            matched by ``READ_START``.
    """
    if not cypher:
        problem = "Cypher query required"
    elif WRITE_KEYWORDS.search(cypher):
        problem = "Read-only queries only (no CREATE/MERGE/DELETE/DROP)"
    elif not READ_START.match(cypher):
        problem = "Only MATCH, RETURN, or CALL queries allowed"
    else:
        return
    raise HTTPException(400, problem)
def _ensure_limit(cypher: str, limit: int) -> str:
if re.search(r"\bLIMIT\s+\d+\b", cypher, flags=re.IGNORECASE):
return cypher
return f"{cypher} LIMIT {limit}"
def _execute_graph_query(
    cypher: str,
    limit: int,
    params: dict[str, Any] | None = None,
    *,
    graph_name: str | None = None,
) -> dict:
    """Run a read-only Cypher query and shape the result for the graph browser.

    Args:
        cypher: Query text; validated with ``_assert_read_only`` first.
        limit: Maximum number of result rows kept.
        params: Optional query parameters; omitted from the call when falsy.
        graph_name: Graph to query; falls back to ``settings.falkordb_graph``.

    Returns:
        A dict with the graph name, the query text, column names, the first
        20 rows, the number of kept rows and the extracted nodes and edges.
    """
    _assert_read_only(cypher)
    graph = _get_graph(graph_name)
    if params:
        result = graph.query(cypher, params)
    else:
        result = graph.query(cypher)
    kept_rows = list(result.result_set)[:limit]
    # Each header entry is indexed at position 1 for the column name.
    column_names = [entry[1] for entry in (result.header or [])]
    nodes, edges = _extract(kept_rows)
    payload = {
        "graph_name": graph_name or settings.falkordb_graph,
        "cypher": cypher,
        "columns": column_names,
        "rows": kept_rows[:20],
        "row_count": len(kept_rows),
        "nodes": nodes,
        "edges": edges,
    }
    return payload
def _graph_schema_hint(graph_name: str | None = None) -> dict:
    """Collect label and relationship-type counts to give the LLM schema context.

    Best effort: any failure yields empty ``labels`` and ``relations`` lists.
    """
    try:
        graph = _get_graph(graph_name)
        label_rows = graph.query(
            "MATCH (n) RETURN labels(n) AS labels, count(n) AS count LIMIT 80"
        ).result_set
        rel_rows = graph.query(
            "MATCH ()-[r]->() RETURN type(r) AS type, count(r) AS count LIMIT 80"
        ).result_set
        label_stats = []
        for row in label_rows:
            # labels(n) is a list; only its first entry is reported.
            first_label = row[0][0] if row[0] else ""
            label_stats.append({"label": first_label, "count": row[1]})
        relation_stats = []
        for row in rel_rows:
            relation_stats.append({"type": row[0], "count": row[1]})
        return {"labels": label_stats, "relations": relation_stats}
    except Exception:
        return {"labels": [], "relations": []}
def _query_terms(text: str) -> list[str]:
raw = text.strip()
stop_words = (
"查询", "查看", "看看", "请问", "帮我", "给我", "一下",
"有哪些", "有什么", "哪些", "什么", "相关", "关系", "节点",
"图谱", "数据", "里面", "历史事件", "历史", "事件", "概念",
"组成", "包含", "景点", "设施", "附近", "推荐",
)
terms: list[str] = []
for chunk in re.findall(r"[\u4e00-\u9fffA-Za-z0-9_:-]{2,}", raw):
reduced = chunk
for word in stop_words:
reduced = reduced.replace(word, " ")
for part in re.split(r"\s+", reduced):
part = part.strip()
if len(part) >= 2:
terms.append(part)
if "花溪公园" in raw:
terms.insert(0, "花溪公园")
if not terms and raw:
terms.append(raw[:24])
seen: set[str] = set()
out: list[str] = []
for term in terms:
if term not in seen:
seen.add(term)
out.append(term)
return out[:5]
def _fallback_cypher(question: str, limit: int) -> tuple[str, dict[str, Any]]:
    """Build the text-recall query used when no rule or LLM query applies.

    Every extracted term is matched with CONTAINS against each property in
    ``TEXT_PROPS``; the term values travel as query parameters ``$t0``,
    ``$t1``, ... so user text is never spliced into the Cypher string.

    Args:
        question: Free-text user question.
        limit: Row limit written into the query.

    Returns:
        ``(cypher, params)``. Without any search term a plain sample of the
        graph is returned, now honouring *limit* instead of a fixed 100.
    """
    terms = _query_terms(question)
    if not terms:
        return f"MATCH (n)-[r]->(m) RETURN n,r,m LIMIT {limit}", {}
    params: dict[str, Any] = {f"t{i}": term for i, term in enumerate(terms)}
    clauses = [
        f"n.{prop} CONTAINS ${key}"
        for key in params
        for prop in TEXT_PROPS
    ]
    where = " OR ".join(clauses)
    cypher = (
        f"MATCH (n) WHERE {where} "
        "OPTIONAL MATCH (n)-[r]-(m) "
        f"RETURN n,r,m LIMIT {limit}"
    )
    return cypher, params
def _travel_rule_based_cypher(question: str, limit: int) -> tuple[str, dict[str, Any], str, str] | None:
q = question.strip()
if not any(w in q for w in ("旅行", "旅游", "线路", "行程", "推荐", "接送", "酒店", "住宿", "餐厅", "", "报价", "退费", "老人", "儿童", "孕妇", "话术", "客服", "黄果树", "小七孔", "西江", "梵净山")):
return None
known_terms = [
"黄果树", "小七孔", "西江", "梵净山", "青岩", "镇远", "百里杜鹃", "平坝樱花",
"兴义", "花江大桥", "机场", "北站", "火车站", "东站", "观山湖", "市区",
"白云", "贵阳", "7座", "5座", "商务", "保姆车", "2+1", "四钻", "4钻",
"四星", "五钻", "5钻", "五星", "接机", "晚班", "老人", "孕妇", "儿童",
"行李", "退费", "自费", "小交通", "纯玩", "费用包含", "留资", "微信", "小红书",
]
terms = [term for term in known_terms if term in q]
if "4钻" in q or "四钻" in q:
terms.extend(["四星", "4钻", "四钻"])
if "5钻" in q or "五钻" in q:
terms.extend(["五星", "5钻", "五钻"])
terms.extend([t for t in _query_terms(q) if t not in terms])
seen_terms: set[str] = set()
terms = [t for t in terms if not (t in seen_terms or seen_terms.add(t))]
scenic_terms = [t for t in ("黄果树", "小七孔", "西江", "梵净山", "青岩", "镇远", "百里杜鹃", "平坝樱花", "兴义", "花江大桥") if t in q]
if scenic_terms:
terms = scenic_terms + [t for t in terms if t not in scenic_terms]
params: dict[str, Any] = {f"t{i}": term for i, term in enumerate(terms[:8])}
def contains_clause(alias: str, props: tuple[str, ...]) -> str:
clauses: list[str] = []
for i in range(len(params)):
for prop in props:
clauses.append(f"{alias}.{prop} CONTAINS $t{i}")
return " OR ".join(clauses) or "true"
if any(w in q for w in ("门票", "小交通", "电瓶车", "环保车", "观光车", "保险", "扶梯", "索道", "游船", "自费", "二消", "二次", "儿童价", "成人价", "免费", "多少钱", "价格")):
where = " OR ".join([
contains_clause("x", ("name", "admin_region_name")),
contains_clause("f", ("name", "fee_name", "fee_type", "price_text", "inclusion_status", "scenic_target_name", "source_product_name", "rule_text")),
])
return (
"MATCH (x)-[r]->(f:TicketFee) "
"WHERE (type(r) = 'SCENIC_AREA_HAS_FEE' OR type(r) = 'ATTRACTION_HAS_FEE' OR type(r) = 'PRODUCT_HAS_FEE') "
f"AND ({where}) "
f"RETURN x,r,f LIMIT {limit}",
params,
"travel_structured_fee",
"按景区/景点下的门票、小交通、保险和二次消费项目查询。",
)
if any(w in q for w in ("退费", "老人", "儿童", "孕妇", "限制", "不接待", "行李", "年龄", "自费", "小交通")):
where = contains_clause("rule", ("name", "rule_type", "applies_to", "rule_text", "severity"))
return (
f"MATCH (rule:PolicyRule) WHERE {where} "
"OPTIONAL MATCH (p)-[r:HAS_POLICY]->(rule) "
f"RETURN p,r,rule LIMIT {limit}",
params,
"travel_policy_rule",
"按退费、年龄、儿童、孕妇、行李和自费规则查询。",
)
if any(w in q for w in ("话术", "客服", "回复", "加微信", "留资", "怎么说", "怎么回")):
where = contains_clause("s", ("name", "channel", "funnel_stage", "trigger_scenario", "message_template", "intent_tags"))
return (
f"MATCH (s:SalesScript) WHERE {where} "
"OPTIONAL MATCH (s)-[r:FROM_SOURCE]->(c:SalesChannel) "
f"RETURN s,r,c LIMIT {limit}",
params,
"travel_sales_script",
"按渠道、转化阶段和触发场景查询客服话术。",
)
if any(w in q for w in ("附近", "车程", "距离", "多久", "酒店", "住宿", "餐厅", "", "资源")):
generic_terms = {
"附近", "车程", "距离", "多久", "酒店", "住宿", "餐厅", "餐饮", "", "资源",
"哪些", "有哪", "有什么", "多少", "多远", "多久",
}
search_terms = [
term for term in terms
if term not in generic_terms and not any(g in term for g in generic_terms)
]
local_params = {f"t{i}": term for i, term in enumerate((search_terms or terms)[:6])}
def local_contains(alias: str, props: tuple[str, ...]) -> str:
clauses: list[str] = []
for i in range(len(local_params)):
for prop in props:
clauses.append(f"{alias}.{prop} CONTAINS $t{i}")
return " OR ".join(clauses) or "true"
where = " OR ".join([
local_contains("a", ("name", "amap_name", "admin_region_name")),
local_contains("child", ("name", "amap_name", "admin_region_name", "parent_scenic_area_name")),
local_contains("x", ("name", "amap_name", "admin_region_name", "city_or_area", "features", "signature_dishes")),
])
type_filters: list[str] = []
if any(w in q for w in ("酒店", "住宿", "")):
type_filters.append("x:HotelResource")
if any(w in q for w in ("餐厅", "餐饮", "", "美食")):
type_filters.append("x:RestaurantResource")
type_where = f" AND ({' OR '.join(type_filters)})" if type_filters else ""
return (
"MATCH (a)-[r:NEARBY_LOCATION_RESOURCE]->(x) "
"OPTIONAL MATCH (a)-[ar:SCENIC_AREA_HAS_ATTRACTION]->(child:ScenicAttraction) "
f"WHERE ({where}){type_where} "
"RETURN a,r,x,ar,child ORDER BY r.drive_duration_min "
f"LIMIT {limit}",
local_params,
"travel_nearby_resource",
"按景区片区/子景点归属查询合作酒店餐饮,并返回高德驾车距离与车程时间。",
)
if any(w in q for w in ("推荐", "线路", "行程", "方案", "几天", "", "", "保姆车", "纯玩", "报价")):
where = " OR ".join([
contains_clause("p", ("name", "product_family", "route_stop_sequence", "route_day_summary", "default_vehicle_type", "default_hotel_grade", "service_promise", "included_summary", "excluded_summary")),
contains_clause("d", ("name", "title", "route_path", "route_stop_sequence", "meal_text", "accommodation_text")),
contains_clause("s", ("name", "city_or_area", "route_line_name", "route_sequence_label", "evidence_text")),
contains_clause("a", ("name", "parent_scenic_area_name", "route_role")),
contains_clause("sa", ("name", "management_note")),
contains_clause("region", ("name", "city", "county")),
])
return (
"MATCH (p:TourProduct)-[lineStop:PRODUCT_HAS_ORDERED_STOP]->(s:RouteStop) "
"OPTIONAL MATCH (s)-[next:ROUTE_STOP_NEXT]->(s2:RouteStop) "
"OPTIONAL MATCH (p)-[r1:HAS_DAY]->(d:ProductDay)-[r2:DAY_HAS_STOP]->(s) "
"OPTIONAL MATCH (s)-[r3:STOP_VISITS_SCENIC_AREA]->(sa:ScenicArea) "
"OPTIONAL MATCH (s)-[r4:STOP_VISITS_ATTRACTION]->(a:ScenicAttraction) "
"OPTIONAL MATCH (s)-[sr:STOP_LOCATED_IN_REGION]->(region:AdministrativeRegion) "
f"WHERE {where} "
f"RETURN p,lineStop,s,next,s2,r1,d,r2,r3,sa,r4,a,sr,region LIMIT {limit}",
params,
"travel_existing_product_route",
"按已有路线产品的全程有序停靠点、每日行程、景区片区和行政区查询。",
)
if any(w in q for w in ("接送", "机场", "北站", "火车站", "东站", "观山湖", "市区")):
where = contains_clause("q", ("name", "origin_text", "destination_text", "vehicle_type", "quote_notes"))
return (
"MATCH (q:TransferQuote)-[r:USES_VEHICLE]->(v:VehicleService) "
f"WHERE {where} RETURN q,r,v LIMIT {limit}",
params,
"travel_transfer_quote",
"按接送报价、出发地/目的地和车型查询。",
)
if any(w in q for w in ("酒店", "住宿", "", "四钻", "4钻", "五钻", "5钻", "晚班")):
where = contains_clause("h", ("name", "hotel_grade", "region", "address", "features", "applicable_products"))
return (
f"MATCH (h:HotelResource) WHERE {where} "
"OPTIONAL MATCH (h)-[r:LOCATED_IN_REGION]->(a:AdministrativeRegion) "
f"RETURN h,r,a LIMIT {limit}",
params,
"travel_hotel_resource",
"按酒店等级、区域、特点和适用产品查询。",
)
if any(w in q for w in ("餐厅", "", "特色餐", "酸汤鱼", "餐饮", "用餐")):
where = contains_clause("m", ("name", "region", "address", "signature_dishes", "meal_scene", "per_capita_price_text"))
return (
f"MATCH (m:RestaurantResource) WHERE {where} "
"OPTIONAL MATCH (m)-[r:LOCATED_IN_REGION]->(a:AdministrativeRegion) "
f"RETURN m,r,a LIMIT {limit}",
params,
"travel_restaurant_resource",
"按餐厅区域、菜品和适用场景查询。",
)
if any(w in q for w in ("退费", "老人", "儿童", "孕妇", "限制", "不接待", "行李", "年龄", "自费", "小交通")):
where = contains_clause("rule", ("name", "rule_type", "applies_to", "rule_text", "severity"))
return (
f"MATCH (rule:PolicyRule) WHERE {where} "
"OPTIONAL MATCH (p)-[r:HAS_POLICY]->(rule) "
f"RETURN p,r,rule LIMIT {limit}",
params,
"travel_policy_rule",
"按退费、年龄、儿童、孕妇、行李和自费规则查询。",
)
if any(w in q for w in ("话术", "客服", "回复", "加微信", "留资", "怎么说", "怎么回")):
where = contains_clause("s", ("name", "channel", "funnel_stage", "trigger_scenario", "message_template", "intent_tags"))
return (
f"MATCH (s:SalesScript) WHERE {where} "
"OPTIONAL MATCH (s)-[r:FROM_SOURCE]->(c:SalesChannel) "
f"RETURN s,r,c LIMIT {limit}",
params,
"travel_sales_script",
"按渠道、转化阶段和触发场景查询客服话术。",
)
where = contains_clause("p", ("name", "product_family", "group_mode", "vehicle_layout", "hotel_grade", "service_promise", "route_summary"))
return (
f"MATCH (p:TourProduct) WHERE {where} "
"OPTIONAL MATCH (p)-[r]->(m) "
f"RETURN p,r,m LIMIT {limit}",
params,
"travel_product_lookup",
"按线路产品名称、景点、车型和服务承诺查询。",
)
def _travel_item_rule_based_cypher(question: str, limit: int) -> tuple[str, dict[str, Any], str, str] | None:
q = question.strip()
if not any(w in q for w in ("旅行", "旅游", "线路", "路线", "行程", "推荐", "酒店", "住宿", "餐厅", "", "费用", "价格", "车辆", "车型", "用车", "门票", "儿童", "老人", "黄小西", "小西", "镇梵", "黄果树", "小七孔", "西江", "梵净山", "天星桥", "青岩")):
return None
known_terms = [
"黄小西", "小西", "镇梵", "黄万马", "黄果树", "天星桥", "陡坡塘", "小七孔", "荔波", "西江", "梵净山", "青岩",
"镇远", "百里杜鹃", "平坝樱花", "兴义", "贵阳", "安顺", "酒店", "餐厅",
"车辆", "门票", "电瓶车", "环保车", "保险", "扶梯", "游船", "儿童", "老人",
]
terms = [term for term in known_terms if term in q]
terms.extend([t for t in _query_terms(q) if t not in terms])
seen: set[str] = set()
terms = [t for t in terms if not (t in seen or seen.add(t))][:8]
params: dict[str, Any] = {f"t{i}": term for i, term in enumerate(terms or [q[:20]])}
def contains_clause(alias: str, props: tuple[str, ...]) -> str:
clauses: list[str] = []
for i in range(len(params)):
for prop in props:
clauses.append(f"{alias}.{prop} CONTAINS $t{i}")
return " OR ".join(clauses) or "true"
duration_days = 0
for pattern in (r"(\d+)\s*(?:日游|天游)", r"(?:玩|计划|只能|安排)\s*(\d+)\s*天", r"(\d+)\s*天(?!星)"):
m = re.search(pattern, q)
if m:
duration_days = int(m.group(1))
break
duration_filter = f" AND p.duration_days = {duration_days}" if duration_days else ""
product_focus_filters: list[str] = []
if "镇梵" in q:
product_focus_filters.append(
"(p.name CONTAINS '镇梵' OR p.source_files CONTAINS '镇梵' OR "
"p.route_stop_sequence CONTAINS '镇远' OR p.route_stop_sequence CONTAINS '梵净山' OR "
"p.route_day_summary CONTAINS '镇远' OR p.route_day_summary CONTAINS '梵净山')"
)
if "青岩" in q:
product_focus_filters.append(
"(p.name CONTAINS '青岩' OR p.source_files CONTAINS '青岩' OR "
"p.route_stop_sequence CONTAINS '青岩' OR p.route_day_summary CONTAINS '青岩')"
)
product_focus_filter = (" AND " + " AND ".join(product_focus_filters)) if product_focus_filters else ""
core_route_where = " OR ".join([
contains_clause("p", ("name", "product_family", "route_stop_sequence", "route_day_summary", "source_files", "default_vehicle_type", "default_hotel_grade")),
contains_clause("d", ("name", "route_path", "meal_text", "accommodation_text", "route_stop_sequence")),
contains_clause("s", ("name", "city_or_area", "route_line_name", "route_sequence_label", "station_like_name")),
])
route_where = " OR ".join([
core_route_where,
contains_clause("a", ("name", "aliases", "city", "county")),
contains_clause("sub", ("name", "parent_attraction_name", "aliases")),
contains_clause("region", ("name", "city", "county")),
])
product_quote_words = ("报价", "团费", "基础价", "成人价", "儿童价", "成人结算", "儿童结算", "单房差", "结算")
product_context_words = ("线路", "行程", "路线", "产品", "日游", "天游", "", "", "公司", "学校", "酒店", "住宿", "费用")
if any(w in q for w in product_quote_words) or ("多少钱" in q and any(w in q for w in product_context_words)) or ("费用" in q and any(w in q for w in ("线路", "行程", "产品", "日游", "天游", "", ""))):
quote_where = " OR ".join([
route_where,
contains_clause("b", ("name", "binding_type", "status", "default_text", "notes", "price_formula", "quantity_formula", "pricing_scope")),
contains_clause("i", ("name", "type", "subtype", "price_text", "description", "scenic_name")),
])
return (
"MATCH (p:TourProduct)-[pd:PRODUCT_HAS_DAY]->(d:ProductDay)-[ds:DAY_HAS_STOP]->(s:RouteStop) "
f"WHERE ({core_route_where}){duration_filter}{product_focus_filter} "
"OPTIONAL MATCH (s)-[next:ROUTE_STOP_NEXT]->(s2:RouteStop) "
"OPTIONAL MATCH (s)-[sa:STOP_VISITS_ATTRACTION]->(a:ScenicAttraction) "
"OPTIONAL MATCH (s)-[ss:STOP_VISITS_SUB_ATTRACTION]->(sub:SubAttraction) "
"OPTIONAL MATCH (s)-[sr:STOP_LOCATED_IN_REGION]->(region:AdministrativeRegion) "
"OPTIONAL MATCH (s)-[sb:STOP_HAS_TRAVEL_ITEM]->(b:ItineraryResourceBinding)-[bi:BINDING_BINDS_ITEM]->(i:TravelItem) "
f"RETURN p,pd,d,ds,s,next,s2,sa,a,ss,sub,sr,region,sb,b,bi,i LIMIT {limit}",
params,
"travel_item_product_quote",
"按固定线路产品查询基础团费报价依据,并带出必付/可选 TravelItem 以支持总价计算。",
)
if any(w in q for w in ("车辆", "车型", "用车", "小车", "商务车", "大巴", "中巴", "5座", "7座")):
return (
"MATCH (i:TravelItem) WHERE i.type = 'Vehicle' "
"OPTIONAL MATCH (p:TourProduct)-[r]->(i) "
f"RETURN p,r,i ORDER BY i.seat_count LIMIT {limit}",
params,
"travel_item_vehicle_resource",
"按 TravelItem 车辆资源直接查询车型、座位、图片和 price/unit 价格字段。",
)
if any(w in q for w in ("门票", "小交通", "电瓶车", "环保车", "观光车", "保险", "扶梯", "索道", "游船", "二消", "二次", "多少钱", "价格", "费用", "儿童价", "成人价")):
item_where = " OR ".join([
contains_clause("i", ("name", "type", "subtype", "category", "raw_evidence")),
contains_clause("a", ("name", "short_name", "city", "county")),
])
fee_focus_filter = ""
for focus in ("天星桥", "陡坡塘", "大瀑布", "黄果树", "小七孔", "西江", "梵净山", "镇远", "青岩"):
if focus in q:
fee_focus_filter = (
" AND (a.name CONTAINS '%s' OR a.short_name CONTAINS '%s' OR i.name CONTAINS '%s')"
) % (focus, focus, focus)
if focus == "天星桥":
break
break
return (
"MATCH (i:TravelItem) "
"OPTIONAL MATCH (a:ScenicAttraction)-[has:ATTRACTION_HAS_ITEM]->(i) "
"WHERE i.type IN ['Ticket','ScenicTransport','Insurance','Activity','Service','ScenicOptional'] "
f"AND ({item_where}){fee_focus_filter} "
f"RETURN a,has,i LIMIT {limit}",
params,
"travel_item_fee_binding",
"按 ScenicAttraction -> TravelItem 查询门票、小交通、保险和二次消费,价格来自 TravelItem price/adult_price/unit。",
)
if any(w in q for w in ("酒店", "住宿", "餐厅", "餐饮", "", "美食", "资源", "可选", "升级", "替换")):
type_filters: list[str] = []
if any(w in q for w in ("酒店", "住宿", "")):
type_filters.append("i.type = 'Hotel'")
if any(w in q for w in ("餐厅", "餐饮", "", "美食")):
type_filters.append("i.type = 'Restaurant'")
type_where = f"AND ({' OR '.join(type_filters)})" if type_filters else "AND i.type IN ['Hotel','Restaurant','Vehicle']"
for focus in ("黄果树", "小七孔", "西江", "梵净山", "镇远", "青岩", "天星桥", "荔波", "安顺", "贵阳"):
if focus in q and type_filters:
return (
"MATCH (a:ScenicAttraction)<-[serve:ITEM_SERVES_ATTRACTION]-(i:TravelItem) "
"WHERE (a.name CONTAINS '%s' OR a.aliases CONTAINS '%s') %s "
"OPTIONAL MATCH (i)-[ir:ITEM_LOCATED_IN_REGION]->(region:AdministrativeRegion) "
"RETURN a,serve,i,ir,region LIMIT %d"
% (focus, focus, type_where, limit),
params,
"travel_item_scenic_resource_pool",
"按景区锚点直接查询可服务该景区的酒店/餐饮资源池,再带出固定路线槽位。",
)
item_where = " OR ".join([
contains_clause("s", ("name", "city_or_area", "route_line_name", "route_sequence_label")),
contains_clause("b", ("name", "binding_type", "status", "default_text", "notes")),
contains_clause("i", ("name", "type", "subtype", "region_name", "address", "features", "signature_dishes", "price_text")),
])
return (
"MATCH (s:RouteStop)-[sb:STOP_HAS_TRAVEL_ITEM]->(b:ItineraryResourceBinding)-[bi:BINDING_BINDS_ITEM]->(i:TravelItem) "
f"WHERE ({item_where}) {type_where} "
"OPTIONAL MATCH (s)-[sa:STOP_VISITS_ATTRACTION]->(a:ScenicAttraction) "
"OPTIONAL MATCH (i)-[ir:ITEM_LOCATED_IN_REGION]->(region:AdministrativeRegion) "
f"RETURN s,sb,b,bi,i,sa,a,ir,region LIMIT {limit}",
params,
"travel_item_resource_binding",
"按固定路线停靠点查询可替换/升级资源,资源本身是 TravelItem包含/可选状态在绑定槽位上。",
)
if any(w in q for w in ("推荐", "线路", "行程", "路线", "几天", "", "", "纯玩", "产品")):
return (
"MATCH (p:TourProduct)-[pd:PRODUCT_HAS_DAY]->(d:ProductDay)-[ds:DAY_HAS_STOP]->(s:RouteStop) "
f"WHERE ({core_route_where}){duration_filter}{product_focus_filter} "
"OPTIONAL MATCH (s)-[next:ROUTE_STOP_NEXT]->(s2:RouteStop) "
"OPTIONAL MATCH (s)-[sa:STOP_VISITS_ATTRACTION]->(a:ScenicAttraction) "
"OPTIONAL MATCH (s)-[ss:STOP_VISITS_SUB_ATTRACTION]->(sub:SubAttraction) "
"OPTIONAL MATCH (s)-[sr:STOP_LOCATED_IN_REGION]->(region:AdministrativeRegion) "
f"RETURN p,pd,d,ds,s,next,s2,sa,a,ss,sub,sr,region LIMIT {limit}",
params,
"travel_item_fixed_route",
"按 TourProduct -> ProductDay -> RouteStop 的固定路线骨架查询,停靠点像公交站一样保留顺序。",
)
return (
"MATCH (p:TourProduct)-[pd:PRODUCT_HAS_DAY]->(d:ProductDay)-[ds:DAY_HAS_STOP]->(s:RouteStop) "
f"WHERE ({core_route_where}){duration_filter}{product_focus_filter} "
"OPTIONAL MATCH (s)-[sa:STOP_VISITS_ATTRACTION]->(a:ScenicAttraction) "
"OPTIONAL MATCH (s)-[ss:STOP_VISITS_SUB_ATTRACTION]->(sub:SubAttraction) "
f"RETURN p,pd,d,ds,s,sa,a,ss,sub LIMIT {limit}",
params,
"travel_item_lookup",
"按固定线路、停靠点、景区和子景点查询。",
)
def _rule_based_cypher(question: str, limit: int) -> tuple[str, dict[str, Any], str] | None:
q = question.strip()
if "花溪公园" not in q:
return None
traffic_words = ("公交", "地铁", "交通", "怎么去", "到达", "坐车", "")
knowledge_words = ("知识", "总览", "详情", "全部", "完整", "全貌")
event_words = ("历史", "事件", "沿革", "建园", "名人")
concept_words = ("概念", "主题", "文化", "夜游", "生态", "适合")
part_words = ("包含", "组成", "景点", "设施", "里面", "有哪些")
if any(w in q for w in traffic_words):
return (
"MATCH (p:Place {element_id:$eid})-[r:NEAR_TRANSIT]->(s:Place) "
"OPTIONAL MATCH (b:BusLine)-[r2:STOPS_AT]->(s) "
f"RETURN p,r,s,b,r2 LIMIT {limit}",
{"eid": "amap:B035300A51"},
"rule_transit",
)
if any(w in q for w in knowledge_words) or ("事件" in q and "概念" in q):
return (
"MATCH (p:Place {element_id:$eid})-[r]->(m) "
"OPTIONAL MATCH (b:BusLine)-[r2:STOPS_AT]->(m) "
f"RETURN p,r,m,b,r2 LIMIT {limit}",
{"eid": "amap:B035300A51"},
"rule_canonical_full_knowledge",
)
if any(w in q for w in event_words):
return (
"MATCH (p:Place {element_id:$eid})-[r:HAS_EVENT]->(e:Event) "
f"RETURN p,r,e LIMIT {limit}",
{"eid": "amap:B035300A51"},
"rule_event",
)
if any(w in q for w in concept_words):
return (
"MATCH (p:Place {element_id:$eid})-[r:HAS_CONCEPT]->(c:Concept) "
f"RETURN p,r,c LIMIT {limit}",
{"eid": "amap:B035300A51"},
"rule_concept",
)
if any(w in q for w in part_words):
return (
"MATCH (p:Place {element_id:$eid})-[r:HAS_PART]->(m) "
f"RETURN p,r,m LIMIT {limit}",
{"eid": "amap:B035300A51"},
"rule_part",
)
return (
"MATCH (p:Place {element_id:$eid})-[r]->(m) "
"OPTIONAL MATCH (b:BusLine)-[r2:STOPS_AT]->(m) "
f"RETURN p,r,m,b,r2 LIMIT {limit}",
{"eid": "amap:B035300A51"},
"rule_canonical_full_knowledge",
)
async def _nl_query_client() -> LlmClient | None:
    """Build the LLM client used for natural-language-to-Cypher translation.

    The ``global`` section of the agent settings is preferred when it has
    both ``base_url`` and ``api_key``. Otherwise the ``extract`` models are
    tried in this order: the aggregator, the enabled models, then all
    models. Returns ``None`` when no usable configuration is found.
    """
    cfg = await get_agent_settings()

    shared = cfg.get("global") or {}
    if shared.get("base_url") and shared.get("api_key"):
        return LlmClient(
            shared["base_url"],
            shared["api_key"],
            shared.get("model") or "deepseek-chat",
            timeout=int(shared.get("timeout") or 30),
            max_tokens=1200,
        )

    extract = cfg.get("extract") or {}
    models = extract.get("models") or {}
    candidates = [extract.get("aggregator")]
    candidates += [name for name, spec in models.items() if spec.get("enabled")]
    candidates += list(models.keys())
    for name in candidates:
        spec = models.get(name or "")
        if not (spec and spec.get("base_url") and spec.get("api_key")):
            continue
        return LlmClient(
            spec["base_url"],
            spec["api_key"],
            spec.get("model") or "deepseek-chat",
            timeout=int(extract.get("timeout") or 60),
            max_tokens=1200,
        )
    return None
def _canon_id(x: Any) -> str:
"""Canonical id = FalkorDB internal node id.
Used for BOTH nodes and edge endpoints so they always reference the
same id space (edge.src_node / dest_node may be a Node or an int).
"""
nid = getattr(x, "id", None)
if nid is not None:
return str(nid)
return str(x)
def _place_visual_group(props: dict[str, Any]) -> str:
station_type = props.get("station_type")
if station_type:
return STATION_TYPE_LABELS.get(str(station_type), str(station_type))
place_type = props.get("place_type")
if place_type:
return PLACE_TYPE_LABELS.get(str(place_type), str(place_type))
category = props.get("category")
if category:
return str(category)
return "地点"
def _event_visual_group(props: dict[str, Any]) -> str:
event_type = props.get("event_type")
if event_type:
return str(event_type)
return "Event"
# Property keys to try (in order) as a node's display label.
# The first key whose value is neither None nor an empty string wins.
_NAME_KEYS = (
"display_name", "name", "line_name", "title", "label",
"natural_key", "product_id", "scenic_area_id", "attraction_id",
"ticket_fee_id", "fee_item_id", "template_id", "theme", "tag_id", "area_id", "place_id",
)
def _node_dict(n: Any) -> dict:
    """Convert a graph node into the dict consumed by the front-end.

    The result carries the canonical id, a display label, a visual group and
    the node's properties as truncated strings.
    """
    node_labels = getattr(n, "labels", None) or ["Node"]
    attrs = getattr(n, "properties", None) or {}
    node_id = _canon_id(n)
    primary = str(node_labels[0])

    # Place and Event nodes are grouped by their sub-type, others by label.
    if primary == "Place":
        visual = _place_visual_group(attrs)
    elif primary == "Event":
        visual = _event_visual_group(attrs)
    else:
        visual = primary

    caption = ""
    station_type = attrs.get("station_type")
    if attrs.get("name") and station_type:
        # Stations show their name followed by the station type.
        caption = f"{attrs.get('name')}{station_type}"
    if not caption:
        candidates = (attrs.get(key) for key in _NAME_KEYS)
        caption = next((str(value) for value in candidates if value not in (None, "")), "")
    if not caption:
        caption = f"{primary} #{node_id}"  # never a bare number

    # URL-like properties get a longer cut-off than ordinary values.
    wide_keys = {"photo_urls", "image_url", "image_urls", "cover_url"}
    trimmed = {}
    for key, value in attrs.items():
        cap = 2000 if key in wide_keys else 200
        trimmed[key] = str(value)[:cap]

    return {
        "id": node_id,
        "label": caption,
        "group": visual,
        "properties": trimmed,
    }
def _is_shadow_node(n: Any) -> bool:
props = getattr(n, "properties", None) or {}
value = props.get("shadow_node")
return value is True or value == 1 or str(value).lower() in {"1", "true", "yes"}
def _edge_dict(e: Any) -> dict:
    """Convert a graph edge into the dict consumed by the front-end.

    When the edge has no ``id`` a synthetic one is built from its endpoints
    and relation name. Property values are stringified and cut to 200 chars.
    """
    source_id = _canon_id(e.src_node)
    target_id = _canon_id(e.dest_node)
    relation = str(getattr(e, "relation", "") or "")
    raw_id = getattr(e, "id", None)
    if raw_id is None:
        edge_id = f"{source_id}-{relation}-{target_id}"
    else:
        edge_id = str(raw_id)
    edge_props = getattr(e, "properties", None) or {}
    trimmed = {key: str(value)[:200] for key, value in edge_props.items()}
    return {
        "id": edge_id,
        "from": source_id,
        "to": target_id,
        "label": relation,
        "properties": trimmed,
    }
def _extract(rows: list) -> tuple[list, list]:
    """Collect the distinct nodes and edges found in query result rows.

    Rows may hold Node, Edge and Path objects, possibly nested in lists or
    tuples. Shadow nodes and edges touching them are skipped, and edges
    whose endpoints are not both among the collected nodes are dropped.
    """
    nodes: list = []
    edges: list = []
    node_ids: set = set()
    edge_ids: set = set()

    def add_node(n: Any) -> None:
        if _is_shadow_node(n):
            return
        key = _canon_id(n)
        if key in node_ids:
            return
        node_ids.add(key)
        nodes.append(_node_dict(n))

    def add_edge(e: Any) -> None:
        endpoints = (getattr(e, "src_node", None), getattr(e, "dest_node", None))
        if _is_shadow_node(endpoints[0]) or _is_shadow_node(endpoints[1]):
            return
        record = _edge_dict(e)
        if record["id"] in edge_ids:
            return
        edge_ids.add(record["id"])
        edges.append(record)

    def is_path(obj: Any) -> bool:
        # Path: exposes callable nodes() and edges().
        return callable(getattr(obj, "nodes", None)) and callable(getattr(obj, "edges", None))

    pending = list(rows)
    while pending:
        current = pending.pop()
        if is_path(current):
            for n in current.nodes():
                add_node(n)
            for e in current.edges():
                add_edge(e)
        elif hasattr(current, "src_node") and hasattr(current, "dest_node"):
            # Edge: has src_node / dest_node.
            add_edge(current)
        elif hasattr(current, "labels") and hasattr(current, "properties"):
            # Node: has labels + properties.
            add_node(current)
        elif isinstance(current, (list, tuple)):
            # Result row / collection: unpack and keep walking.
            pending.extend(current)

    visible = {entry["id"] for entry in nodes}
    edges = [edge for edge in edges if edge["from"] in visible and edge["to"] in visible]
    return nodes, edges
@router.get("/graph/overview")
def graph_overview(
    context: ProjectContext = Depends(get_project_context),
    _user: CurrentUser = None,
):
    """Return node/relationship totals and per-label counts for the graph.

    Nodes flagged with ``shadow_node = 1`` are left out of every count.
    Place and Event nodes are additionally broken down by sub-type for the
    ``visual_label_counts`` list. Any failure is reported as
    ``{"connected": False, "error": ...}`` instead of raising.
    """
    try:
        graph = _get_graph(context.graph_name)

        def first_cell(cypher: str):
            # Single-value query; 0 when the result set is empty.
            result_rows = graph.query(cypher).result_set
            return result_rows[0][0] if result_rows else 0

        node_count = first_cell(
            "MATCH (n) WHERE n.shadow_node IS NULL OR n.shadow_node <> 1 RETURN count(n) AS cnt"
        )
        rel_count = first_cell(
            "MATCH (a)-[r]->(b) "
            "WHERE (a.shadow_node IS NULL OR a.shadow_node <> 1) "
            "AND (b.shadow_node IS NULL OR b.shadow_node <> 1) "
            "RETURN count(r) AS cnt"
        )

        label_rows = graph.query(
            "MATCH (n) WHERE n.shadow_node IS NULL OR n.shadow_node <> 1 "
            "RETURN labels(n) AS labels, count(n) AS count LIMIT 100"
        ).result_set
        label_counts = []
        for row in label_rows:
            first_label = row[0][0] if row[0] else "(no label)"
            label_counts.append({"label": first_label, "count": row[1]})

        visual_counts: dict[str, int] = {}

        def bump(label: str, amount: Any) -> None:
            visual_counts[label] = visual_counts.get(label, 0) + int(amount or 0)

        # Place and Event are replaced by their sub-type breakdown below.
        for entry in label_counts:
            if entry["label"] not in {"Place", "Event"}:
                bump(entry["label"], entry["count"])

        place_rows = graph.query(
            "MATCH (p:Place) WHERE p.shadow_node IS NULL OR p.shadow_node <> 1 "
            "RETURN p.place_type AS place_type, p.station_type AS station_type, "
            "p.category AS category, count(p) AS count LIMIT 200"
        ).result_set
        for place_type, station_type, category, count in place_rows:
            group = _place_visual_group({
                "place_type": place_type,
                "station_type": station_type,
                "category": category,
            })
            bump(group, count)

        event_rows = graph.query(
            "MATCH (e:Event) WHERE e.shadow_node IS NULL OR e.shadow_node <> 1 "
            "RETURN e.event_type AS event_type, count(e) AS count LIMIT 100"
        ).result_set
        for event_type, count in event_rows:
            bump(_event_visual_group({"event_type": event_type}), count)

        ranked = sorted(visual_counts.items(), key=lambda pair: pair[1], reverse=True)
        visual_label_counts = [{"label": label, "count": count} for label, count in ranked]

        rel_rows = graph.query(
            "MATCH (a)-[r]->(b) "
            "WHERE (a.shadow_node IS NULL OR a.shadow_node <> 1) "
            "AND (b.shadow_node IS NULL OR b.shadow_node <> 1) "
            "RETURN type(r) AS type, count(r) AS count LIMIT 100"
        ).result_set
        rel_counts = [{"type": row[0], "count": row[1]} for row in rel_rows]

        return {
            "graph_name": context.graph_name,
            "connected": True,
            "node_count": node_count,
            "relationship_count": rel_count,
            "label_counts": label_counts,
            "visual_label_counts": visual_label_counts,
            "relationship_type_counts": rel_counts,
        }
    except Exception as e:
        return {"connected": False, "error": str(e)}
@router.post("/graph/query")
def graph_query(
    body: dict,
    context: ProjectContext = Depends(get_project_context),
    _user: CurrentUser = None,
):
    """Execute a user-supplied read-only Cypher query.

    Body keys: ``cypher`` (required), ``limit`` (default 50, clamped to
    ``1..GRAPH_QUERY_MAX_LIMIT``) and ``graph_name`` (defaults to the
    project's graph).

    Raises:
        HTTPException: status 400 for a non-integer limit, a rejected query
            or a failing query execution.
    """
    cypher = (body.get("cypher") or "").strip().rstrip(";")
    try:
        requested = int(body.get("limit", 50))
    except (TypeError, ValueError):
        # A malformed limit is a client error, not a server fault.
        raise HTTPException(400, "limit must be an integer") from None
    limit = min(max(requested, 1), GRAPH_QUERY_MAX_LIMIT)
    graph_name = str(body.get("graph_name") or context.graph_name)
    try:
        data = _execute_graph_query(cypher, limit, graph_name=graph_name)
    except HTTPException:
        # Validation errors already carry a precise message; do not re-wrap.
        raise
    except Exception as e:
        raise HTTPException(400, f"Query failed: {str(e)[:300]}") from e
    data.update({"mode": "cypher"})
    return data
@router.post("/graph/nl-query")
async def graph_nl_query(
    body: dict,
    context: ProjectContext = Depends(get_project_context),
    _user: CurrentUser = None,
):
    """Answer a natural-language (or raw Cypher) question against the graph.

    Resolution order:
      1. Input that already looks like read-only Cypher is executed as-is.
      2. Deterministic rule-based queries (travel-item, travel, park rules).
      3. An LLM-generated query, only when ``use_llm`` is exactly ``True``.
      4. A text-recall fallback over well-known string properties.

    Body keys: ``query`` or ``text`` (required), ``limit`` (default 100,
    clamped to ``1..GRAPH_QUERY_MAX_LIMIT``), ``graph_name``, ``use_llm``.

    Raises:
        HTTPException: status 400 for missing text, a non-integer limit, or
            a failing direct/fallback query.
    """
    started_at = time.perf_counter()
    query = (body.get("query") or body.get("text") or "").strip().rstrip(";")
    try:
        requested = int(body.get("limit", 100))
    except (TypeError, ValueError):
        # A malformed limit is a client error, not a server fault.
        raise HTTPException(400, "limit must be an integer") from None
    limit = min(max(requested, 1), GRAPH_QUERY_MAX_LIMIT)
    graph_name = str(body.get("graph_name") or context.graph_name)
    use_llm = body.get("use_llm") is True
    # Failures of deterministic rules; previously stored and then overwritten.
    rule_errors: list[str] = []

    def finish(data: dict) -> dict:
        # Attach timing information to every successful response.
        trace = data.setdefault("trace", {})
        trace.update({
            "latency_ms": max(1, round((time.perf_counter() - started_at) * 1000)),
            "performance_target_ms": 1200,
            "use_llm": use_llm,
        })
        if rule_errors:
            trace["rule_errors"] = list(rule_errors)
        data.setdefault("latency_ms", trace["latency_ms"])
        return data

    def run_rule(cypher: str, params: dict[str, Any], rule_mode: str, reason: str) -> dict | None:
        # Execute one rule query; on failure record it and let the caller
        # fall through to the next strategy.
        try:
            data = _execute_graph_query(cypher, limit, params=params, graph_name=graph_name)
        except Exception as e:
            rule_errors.append(f"{rule_mode}: {str(e)[:180]}")
            return None
        data.update({
            "mode": rule_mode,
            "input": query,
            "reason": reason,
        })
        return finish(data)

    if not query:
        raise HTTPException(400, "Query text required")

    if READ_START.match(query):
        try:
            data = _execute_graph_query(query, limit, graph_name=graph_name)
        except HTTPException:
            # Validation errors already carry a precise message.
            raise
        except Exception as e:
            raise HTTPException(400, f"Query failed: {str(e)[:300]}") from e
        data.update({"mode": "cypher", "input": query})
        return finish(data)

    if _is_travel_item_graph_name(graph_name):
        travel_item_ruled = _travel_item_rule_based_cypher(query, limit)
        if travel_item_ruled:
            answered = run_rule(*travel_item_ruled)
            if answered is not None:
                return answered

    if "travel" in graph_name.lower() or "旅行" in query or "旅游" in query:
        travel_ruled = _travel_rule_based_cypher(query, limit)
        if travel_ruled:
            answered = run_rule(*travel_ruled)
            if answered is not None:
                return answered

    ruled = _rule_based_cypher(query, limit)
    if ruled:
        cypher, params, rule_mode = ruled
        answered = run_rule(
            cypher,
            params,
            rule_mode,
            "命中已对齐的高德景区主节点,避免同名公交/地铁站被当成第二个公园。",
        )
        if answered is not None:
            return answered

    llm_error = ""
    generated_cypher = ""
    if use_llm:
        try:
            client = await _nl_query_client()
            if client is None:
                raise RuntimeError("LLM 未配置")
            payload = {
                "user_question": query,
                "graph_schema": _graph_schema_hint(graph_name),
            }
            # NOTE(review): chat_json is called without await, so it appears
            # to block the event loop while the model answers; confirm.
            decided = client.chat_json(NL_TO_CYPHER_SYS, json.dumps(payload, ensure_ascii=False))
            generated_cypher = str(decided.get("cypher") or "").strip().rstrip(";")
            generated_cypher = _ensure_limit(generated_cypher, limit)
            _assert_read_only(generated_cypher)
            data = _execute_graph_query(generated_cypher, limit, graph_name=graph_name)
            if data["nodes"]:
                data.update({
                    "mode": "llm_cypher",
                    "input": query,
                    "reason": decided.get("reason") or "",
                })
                return finish(data)
            llm_error = "LLM Cypher 查询无结果,已转文本召回"
        except Exception as e:  # noqa: BLE001
            # Any LLM or query failure degrades to the text-recall fallback.
            llm_error = str(e)[:180]
    else:
        llm_error = "未启用 LLM直接走文本召回快路径"

    try:
        fallback, params = _fallback_cypher(query, limit)
        data = _execute_graph_query(fallback, limit, params=params, graph_name=graph_name)
    except Exception as e:
        raise HTTPException(400, f"NL query failed: {str(e)[:300]}") from e
    data.update({
        "mode": "text_recall",
        "input": query,
        "generated_cypher": generated_cypher,
        "fallback_reason": llm_error,
    })
    return finish(data)