Files
bxh/app/agents/super_orchestrator.py

857 lines
39 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Super Agent — 知识图谱馆长(全城网格自治采集,常驻不停)。
科学合理 & 省额度的核心:**地理网格扫描**,不再用"热度关键词"
• 把贵阳整个城市切成网格,逐格用高德"多边形(矩形)搜索 + 官方类型编码"
系统性扫,每格每页都是不同地理切片 → 几乎不重复 → 不浪费 API 额度。
• 每格扫到第几页**持久化**(gaode_grid_cells.next_page),跨步/跨轮接着扫,
扫尽的格标记 exhausted **永不重复请求**(省额度的关键)。
• 稠密区一格出不完 → **自适应四叉细分**,保证不漏。
• 全城扫完不"结束",转**驻守巡检**(不再消耗额度,仅响应停止/再播种)。
• 进度=网格地理覆盖率,真实可反映,非拍脑袋数字。
"""
from __future__ import annotations
import asyncio
import re
from app.api.graph import _get_graph
from app.config import settings
from app.agents.gaode_connector import AMAP_TYPECODES, search_polygon
from app.agents.super_ingest import ingest_rows
from app.agents.distill_gate import distill_entity, ATTR_FIELDS
from app.agents.web_agent import web_enrich
from app.agents.xhs_agent import xhs_enrich
from app.agents.douyin_agent import douyin_enrich
from app.agents.event_miner import mine_events
from app.db import (
sa_append_step, sa_finish, sa_stop_requested, sa_add_task, sa_set_status,
sa_has_open_escalation, create_acquisition_task, create_notification,
get_admin_user_id, grid_seed, grid_counts, grid_pending_cats,
grid_take_next, grid_update, grid_subdivide,
sa_merge_candidate_payload, sa_record_conflict, sa_record_schema_proposal,
)
# Strong references to running background tasks: schedule_super_agent adds
# each task here and discards it once the task is done.
_BG: set = set()
# Bounding box of the whole Guiyang municipality as
# (min_lng, min_lat, max_lng, max_lat): urban core plus surrounding districts.
_GY_BBOX = (106.20, 26.10, 107.30, 27.05)
_CELL = 0.08  # initial grid cell edge in degrees, roughly 8 km
_OFFSET = 25  # per-page cap of the AMap polygon search
_MAX_PAGE_SPLIT = 8  # a cell still full at page N is too dense -> quadtree split
_MAX_DEPTH = 3  # deepest subdivision level (0.08 deg -> roughly 1 km cells)
_API_PACING = 0.35  # pause between AMap calls in seconds, to protect QPS
_HARD_MAX_STEPS = 100000  # extreme safety net only (progress is persisted and resumed across runs; not exposed to users)
_MAX_API_PER_RUN = 6000  # per-process safety cap; when hit, switch to stewarding and resume next cycle
# Length of one stewarding wait, in seconds (consumed by asyncio.sleep ticks).
_STEWARD_INTERVAL = 1800
# Seconds between stop-request checks while stewarding.
_STEWARD_TICK = 10
_ENRICH_BATCH = 5  # max entities distilled per enrichment step
_ENRICH_EVERY = 4  # while the grid is busy, insert one enrichment every N steps
_WEB_BATCH = 2  # max entities web-enriched per step (a real browser is slow)
_XHS_BATCH = 1  # the Xiaohongshu browser is slow/fragile: one entity per step
def _place_counts() -> dict:
    """Count Place nodes in the graph, grouped by ``place_type``.

    Nodes without a ``place_type`` are tallied under ``'?'``. Any error
    raised while querying or reading rows is swallowed; whatever was
    collected up to that point (possibly nothing) is returned.
    """
    graph = _get_graph()
    tally: dict = {}
    cypher = "MATCH (p:Place) RETURN coalesce(p.place_type,'?'), count(*)"
    try:
        records = graph.query(cypher).result_set
        for record in records:
            tally[record[0]] = record[1]
    except Exception:
        # Best effort: a failed query simply yields fewer (or no) counts.
        pass
    return tally
def _coverage() -> dict:
    """Report the current Place count for every AMap category.

    Returns a dict with ``items`` (one entry per key of ``AMAP_TYPECODES``
    carrying ``cat``, the mapped ``place_type`` and ``current`` count) and
    ``total`` (the sum of all ``current`` values). Categories without a
    mapping in ``_PT`` fall back to the ``"poi"`` place type.
    """
    from app.agents.super_ingest import _PT

    per_type = _place_counts()
    items = []
    for cat in AMAP_TYPECODES:
        place_type = _PT.get(cat, "poi")
        items.append({
            "cat": cat,
            "place_type": place_type,
            "current": int(per_type.get(place_type, 0)),
        })
    grand_total = sum(entry["current"] for entry in items)
    return {"items": items, "total": grand_total}
def _grid_for(bbox: tuple, step: float) -> list[tuple]:
mnlng, mnlat, mxlng, mxlat = bbox
cells, lng = [], mnlng
while lng < mxlng:
lat = mnlat
nlng = round(min(lng + step, mxlng), 6)
while lat < mxlat:
nlat = round(min(lat + step, mxlat), 6)
cells.append((round(lng, 6), round(lat, 6), nlng, nlat))
lat = nlat
lng = nlng
return cells
def _quads(c: dict) -> list[tuple]:
"""父格四等分为 4 个子格(矩形对半切)。"""
mnlng, mnlat = c["min_lng"], c["min_lat"]
mxlng, mxlat = c["max_lng"], c["max_lat"]
midlng = round((mnlng + mxlng) / 2, 6)
midlat = round((mnlat + mxlat) / 2, 6)
return [
(mnlng, mnlat, midlng, midlat), (midlng, mnlat, mxlng, midlat),
(mnlng, midlat, midlng, mxlat), (midlng, midlat, mxlng, mxlat),
]
async def _seed_if_needed() -> None:
    """Seed the initial city-wide grid for every category that has none.

    A category counts as already seeded when it appears as a key in the
    mapping returned by ``grid_counts()``.
    """
    seeded = await grid_counts()
    for cat, typecode in AMAP_TYPECODES.items():
        if cat in seeded:
            continue
        initial_cells = _grid_for(_GY_BBOX, _CELL)
        await grid_seed(cat, typecode, initial_cells)
async def _escalate(run_id: int, step: int, cat: str, cur: int) -> None:
    """Ask a human for help when a category stays sparse after a full sweep.

    If ``sa_has_open_escalation(cat)`` reports an open escalation, only a
    ``skip_dup`` task row is recorded. Otherwise an acquisition task is
    created, the admin user (if one is returned) is notified, and the
    escalation is written to the run's task list and step log. Failures
    while creating the task or sending the notification are swallowed so
    the run keeps going.
    """
    headline = "全城网格已扫尽,该类仍偏少"

    if await sa_has_open_escalation(cat):
        # An unresolved ticket already exists: log it, do not notify again.
        await sa_add_task(run_id, step, cat, "escalate", headline, "skip_dup",
                          status="escalated", note="已有未结工单,不重复打扰")
        return

    try:
        created = await create_acquisition_task({
            "tenant_id": settings.default_tenant,
            "project_id": settings.default_project,
            "created_by": "super_agent",
            "title": f"【Super Agent 求助】「{cat}」全城网格已扫尽仍偏少",
            "description": (
                f"馆长已对贵阳全城做网格化高德采集,「{cat}」当前仅 {cur} 条,"
                f"高德官方源对该类覆盖有限。建议人工核查类型编码或改用"
                f"小红书/大众点评/官方名录等渠道补全。馆长继续驻守,"
                f"工单结清/数据增长后自动恢复。"),
            "scenario_tags": ["super_agent", "escalation", cat],
            "target_entity_types": ["Place"],
            "target_fields": [],
            "suggested_collection_method": "manual_or_alt_source",
            "priority": 1,
        })
        task_id = created["id"]
    except Exception:
        # Ticket creation is best effort; continue without a ticket id.
        task_id = None

    try:
        admin_id = await get_admin_user_id()
        if admin_id:
            ticket_ref = f" #{task_id}" if task_id else ""
            await create_notification(
                admin_id,
                title=f"Super Agent 求助:「{cat}」全城网格已扫尽",
                body=(f"{cat}」仅 {cur} 条,高德源覆盖有限。已开工单"
                      + ticket_ref
                      + ",请人工/改渠道;馆长继续驻守。"),
                ntype="task", related_task_id=task_id)
    except Exception:
        # Notification is best effort as well.
        pass

    await sa_add_task(run_id, step, cat, "escalate", headline, "notify_admin",
                      status="escalated", related_task_id=task_id)
    await sa_append_step(run_id, {
        "step": step, "action": "escalate", "cat": cat,
        "reason": f"{cat}」全城网格扫尽仅{cur}条,已开工单并通知管理员,继续驻守"})
def _enrich_targets(limit: int) -> list[dict]:
    """Fetch up to ``limit`` Place nodes that have no ``enrich_done`` flag.

    Each target carries the anchor fields (``eid``, ``name``,
    ``place_type``, ``district``, ``address``) plus ``existing``: the
    non-empty soft attributes already on the node, keyed by
    ``ATTR_FIELDS``. Only these text fields are selected by the query;
    coordinates and phone numbers are not fetched. Rows without a name
    are skipped, and a failed query yields an empty list.
    """
    cypher = (
        "MATCH (p:Place) WHERE p.enrich_done IS NULL "
        "RETURN p.element_id, p.name, coalesce(p.place_type,''), "
        "coalesce(p.district,''), coalesce(p.address,''), "
        "coalesce(p.summary,''), coalesce(p.history,''), "
        "coalesce(p.features,''), coalesce(p.suitable_for,''), "
        "coalesce(p.best_season,''), coalesce(p.ticket_hint,'') "
        "LIMIT $n"
    )
    graph = _get_graph()
    try:
        rows = graph.query(cypher, {"n": limit}).result_set
    except Exception:
        return []

    targets = []
    for row in rows:
        if not row or not row[1]:
            continue
        # Columns 5..10 are the soft attributes, paired with ATTR_FIELDS.
        soft_values = [row[i] for i in range(5, 11)]
        existing = {field: value
                    for field, value in zip(ATTR_FIELDS, soft_values)
                    if value}
        targets.append({
            "eid": row[0],
            "name": row[1],
            "place_type": row[2],
            "district": row[3],
            "address": row[4],
            "existing": existing,
        })
    return targets
def _apply_enrich(eid: str, fields: dict) -> None:
    """Write agreed attribute values back onto a Place node.

    ``enrich_done=1`` is always set, even when ``fields`` contributes
    nothing, so the same node is not picked up again. Only truthy values
    for keys in ``ATTR_FIELDS`` are written. Query errors are swallowed.
    """
    graph = _get_graph()
    chosen = [key for key in ATTR_FIELDS if fields.get(key)]
    assignments = ["p.enrich_done=1"] + [f"p.{key}=${key}" for key in chosen]
    params = {"eid": eid}
    params.update((key, fields[key]) for key in chosen)
    cypher = (f"MATCH (p:Place {{element_id:$eid}}) "
              f"SET {','.join(assignments)}")
    try:
        graph.query(cypher, params)
    except Exception:
        pass
async def _distill_enrich(run_id: int, step: int,
                          targets: list[dict]) -> bool:
    """Enrich existing entities via multi-model knowledge distillation.

    For every target, ``distill_entity`` is awaited. Adopted fields are
    written to the graph node and merged into the candidate payload;
    conflicts with existing graph values are never overwritten but
    recorded through ``sa_record_conflict`` for human review. A summary
    task and step are logged, and the admin is notified when conflicts
    were found.

    Args:
        run_id: Run the audit rows are attached to.
        step: Step number used for the audit rows.
        targets: Target dicts as produced by ``_enrich_targets``.

    Returns:
        ``False`` as soon as a distillation result is not ``ok`` (the batch
        is aborted, the failing target is not flagged done, and a
        "skipped" task/step is logged); ``True`` otherwise.
    """
    enriched = adopt_fields = keep_total = conflict_total = uncertain_total = 0
    conflict_names: list[str] = []
    last = ""
    for t in targets:
        res = await distill_entity(t)
        last = res.get("summary", "")
        if not res.get("ok"):
            # Configuration/connectivity problem: abort the whole batch
            # without flagging the target, so it is retried once fixed.
            await sa_add_task(
                run_id, step, "蒸馏", "distill",
                f"知识蒸馏未就绪:{last}", "distill_enrich",
                result={"fetched": 0, "approved": 0, "pending": 0,
                        "skipped": 0}, status="skipped", note=last)
            await sa_append_step(run_id, {
                "step": step, "action": "distill",
                "reason": f"知识蒸馏未配置/不可用({last}),本步跳过,"
                          f"配好≥2蒸馏模型+全局聚合后自动恢复"})
            return False
        adopt = res.get("adopt") or {}
        # Only adopted fields are written; with none, the node is merely
        # flagged done so it is not distilled again.
        _apply_enrich(t["eid"], adopt)
        if adopt:
            await sa_merge_candidate_payload(t["eid"], adopt)
            enriched += 1
            adopt_fields += len(adopt)
        keep_total += len(res.get("keep") or [])
        uncertain_total += len(res.get("uncertain") or [])
        # Contradicts an existing graph value: do not overwrite, hand the
        # decision to a human via a recorded conflict.
        for c in (res.get("conflict") or []):
            await sa_record_conflict(
                t["eid"], c.get("field", ""), c.get("existing", ""),
                c.get("distilled", ""), c.get("note", ""))
            conflict_total += 1
            if t.get("name"):
                conflict_names.append(f"{t['name']}·{c.get('field','')}")
    res_obj = {"fetched": len(targets), "approved": adopt_fields,
               "pending": conflict_total, "skipped": uncertain_total}
    await sa_add_task(
        run_id, step, "蒸馏", "distill",
        f"多模型知识蒸馏富集 {len(targets)} 个实体({last}",
        "distill_enrich", result=res_obj, status="done",
        note=f"富集{enriched}实体/{adopt_fields}字段·一致{keep_total}"
             f"·矛盾{conflict_total}·存疑{uncertain_total}")
    await sa_append_step(run_id, {
        "step": step, "action": "distill",
        "reason": f"知识蒸馏富集:{enriched}/{len(targets)} 实体补 {adopt_fields} 字段,"
                  f"矛盾 {conflict_total} 转人工({last}"})
    if conflict_total:
        try:
            uid = await get_admin_user_id()
            if uid:
                # List at most 8 conflicts, separated so that individual
                # "name·field" entries stay readable, and mark truncation.
                shown = "、".join(conflict_names[:8])
                more = "…" if len(conflict_names) > 8 else ""
                await create_notification(
                    uid,
                    title=f"蒸馏发现 {conflict_total} 处与图谱既有数据矛盾",
                    body=("需人工裁决(蒸馏未覆盖图谱):"
                          + shown
                          + more
                          + "。详见 数据质量 / 校验问题(distill_conflict)。"),
                    ntype="task")
        except Exception:
            # Notification is best effort; conflicts are already recorded.
            pass
    return True
def _web_targets(limit: int) -> list[dict]:
    """Fetch up to ``limit`` Place nodes that have no ``web_done`` flag.

    Places of type ``sight`` are ordered first, then by ``element_id``.
    Each target carries the anchor fields plus ``existing`` (non-empty
    soft attributes keyed by ``ATTR_FIELDS``). Only these text fields are
    selected; coordinates and phone numbers are not fetched. Rows without
    a name are skipped, and a failed query yields an empty list.
    """
    # Sights are queried first (the original note: they are the most
    # likely to have a public encyclopedia page).
    cypher = (
        "MATCH (p:Place) WHERE p.web_done IS NULL "
        "RETURN p.element_id, p.name, coalesce(p.place_type,''), "
        "coalesce(p.district,''), coalesce(p.address,''), "
        "coalesce(p.summary,''), coalesce(p.history,''), "
        "coalesce(p.features,''), coalesce(p.suitable_for,''), "
        "coalesce(p.best_season,''), coalesce(p.ticket_hint,'') "
        "ORDER BY CASE WHEN p.place_type='sight' THEN 0 ELSE 1 END, "
        "p.element_id LIMIT $n"
    )
    graph = _get_graph()
    try:
        rows = graph.query(cypher, {"n": limit}).result_set
    except Exception:
        return []

    targets = []
    for row in rows:
        if not row or not row[1]:
            continue
        soft_values = [row[i] for i in range(5, 11)]
        existing = {field: value
                    for field, value in zip(ATTR_FIELDS, soft_values)
                    if value}
        targets.append({
            "eid": row[0],
            "name": row[1],
            "place_type": row[2],
            "district": row[3],
            "address": row[4],
            "existing": existing,
        })
    return targets
def _apply_web(eid: str, adopt: dict) -> None:
    """Write web-adopted fields back onto a Place node.

    ``web_done=1`` is always set. When ``adopt`` is non-empty,
    ``enrich_done=1`` is set too, together with every truthy value whose
    key is in ``ATTR_FIELDS``. Query errors are swallowed.
    """
    graph = _get_graph()
    assignments = ["p.web_done=1"]
    params = {"eid": eid}
    if adopt:
        assignments.append("p.enrich_done=1")
        chosen = [key for key in ATTR_FIELDS if adopt.get(key)]
        assignments.extend(f"p.{key}=${key}" for key in chosen)
        params.update((key, adopt[key]) for key in chosen)
    cypher = (f"MATCH (p:Place {{element_id:$eid}}) "
              f"SET {','.join(assignments)}")
    try:
        graph.query(cypher, params)
    except Exception:
        pass
async def _web_enrich(run_id: int, step: int, targets: list[dict]) -> bool:
    """Enrich targets from web pages via ``web_enrich``.

    For every target: a miss (not found, or ``entity_match`` is ``False``)
    only flags the node ``web_done``; a hit writes adopted fields, records
    conflicts for human review and files schema proposals for attributes
    the page exposes. A summary task/step is logged and the admin is
    notified when conflicts or schema proposals exist.

    Returns ``False`` as soon as a result is not ``ok`` (a "skipped"
    task/step is logged), ``True`` otherwise.
    """
    hits = field_count = conflicts = proposals = 0
    last = ""
    for place in targets:
        reply = await web_enrich(place)
        last = reply.get("summary", "")

        if not reply.get("ok"):
            await sa_add_task(run_id, step, "联网", "web",
                              f"web_agent 未就绪:{last}", "web_agent",
                              result={"fetched": 0, "approved": 0,
                                      "pending": 0, "skipped": 0},
                              status="skipped", note=last)
            await sa_append_step(run_id, {
                "step": step, "action": "web",
                "reason": f"web_agent 未配置/不可用({last}),跳过待恢复"})
            return False

        missed = (not reply.get("found")
                  or reply.get("entity_match") is False)
        if missed:
            # Flag web_done so the page is not fetched again.
            _apply_web(place["eid"], {})
            continue

        hits += 1
        adopted = reply.get("adopt") or {}
        _apply_web(place["eid"], adopted)
        if adopted:
            await sa_merge_candidate_payload(place["eid"], adopted)
            field_count += len(adopted)

        for clash in (reply.get("conflict") or []):
            await sa_record_conflict(place["eid"], clash.get("field", ""),
                                     clash.get("existing", ""),
                                     clash.get("web", ""),
                                     clash.get("note", ""))
            conflicts += 1

        for gap in (reply.get("schema_gaps") or []):
            attr = gap.get("attr", "")
            # Fall back to a field name derived from the attribute label.
            field_name = (gap.get("field", "")
                          or re.sub(r"\W+", "_", gap.get("attr", "")).strip("_"))
            issue_id = await sa_record_schema_proposal(
                attr, field_name,
                str(gap.get("value", ""))[:200],
                f"web_agent 在「{place['name']}」网页发现:{gap.get('why','')}",
                float(reply.get("confidence") or 0.7))
            if issue_id:
                proposals += 1

    total = len(targets)
    summary = {"fetched": total, "approved": field_count,
               "pending": conflicts, "skipped": proposals}
    await sa_add_task(
        run_id, step, "联网", "web",
        f"browser-use 联网采集 {total} 个实体({last}",
        "web_agent", result=summary, status="done",
        note=f"命中{hits}·补{field_count}字段·矛盾{conflicts}"
             f"·schema提案{proposals}")
    await sa_append_step(run_id, {
        "step": step, "action": "web",
        "reason": f"联网采集:命中 {hits}/{total},补 {field_count} 字段,"
                  f"矛盾 {conflicts} 转人工schema 提案 {proposals}{last}"})

    if conflicts or proposals:
        try:
            admin_id = await get_admin_user_id()
            if admin_id:
                await create_notification(
                    admin_id,
                    title=f"web_agent{conflicts} 矛盾 / {proposals} schema 提案待裁决",
                    body="联网采集与图谱矛盾或发现新属性,详见 数据质量/校验问题 "
                         "与 本体建模/字段提案。",
                    ntype="task")
        except Exception:
            # Notification is best effort.
            pass
    return True
def _xhs_targets(limit: int) -> list[dict]:
    """Fetch up to ``limit`` Place nodes that have no ``xhs_done`` flag.

    Ordering: ``eat`` places first, then ``sight``, then the rest, each
    group by ``element_id``. Rows without a name are skipped; a failed
    query yields an empty list.
    """
    cypher = (
        "MATCH (p:Place) WHERE p.xhs_done IS NULL "
        "RETURN p.element_id, p.name, coalesce(p.place_type,''), "
        "coalesce(p.district,'') "
        "ORDER BY CASE WHEN p.place_type='eat' THEN 0 "
        "WHEN p.place_type='sight' THEN 1 ELSE 2 END, p.element_id "
        "LIMIT $n"
    )
    graph = _get_graph()
    try:
        rows = graph.query(cypher, {"n": limit}).result_set
    except Exception:
        return []

    targets = []
    for row in rows:
        if row and row[1]:
            targets.append({"eid": row[0], "name": row[1],
                            "place_type": row[2], "district": row[3]})
    return targets
def _apply_xhs(eid: str, tags: list[str]) -> None:
    """Attach Xiaohongshu experience tags to a Place node.

    ``xhs_done=1`` is set first, whether or not there are tags. Each tag
    is then merged as an ``ExperienceTag`` node linked by ``HAS_TAG``,
    with its ``source`` set to ``'xiaohongshu'``. The first query error
    stops the remaining writes and is swallowed.
    """
    mark_done = "MATCH (p:Place {element_id:$eid}) SET p.xhs_done=1"
    link_tag = (
        "MATCH (p:Place {element_id:$eid}) "
        "MERGE (e:ExperienceTag {name:$t}) "
        "MERGE (p)-[:HAS_TAG]->(e) SET e.source='xiaohongshu'"
    )
    graph = _get_graph()
    try:
        graph.query(mark_done, {"eid": eid})
        for tag in tags:
            graph.query(link_tag, {"eid": eid, "t": tag})
    except Exception:
        pass
async def _xhs_enrich(run_id: int, step: int, targets: list[dict]) -> bool:
    """Collect Xiaohongshu UGC for the targets and turn it into tags.

    Returns ``False`` when a result is not ``ok`` (a "skipped" task/step
    is logged) or when the agent reports ``need_login`` (a one-time login
    ticket and admin notification are raised unless an escalation is
    already open, and an "escalated" task/step is logged). Otherwise every
    target is flagged ``xhs_done`` with whatever tags were found, a
    summary task/step is logged, and ``True`` is returned.
    """
    nothing = {"fetched": 0, "approved": 0, "pending": 0, "skipped": 0}
    hits = tag_count = evidence = 0
    last = ""
    for place in targets:
        reply = await xhs_enrich(place)
        last = reply.get("summary", "")

        if not reply.get("ok"):
            await sa_add_task(run_id, step, "小红书", "xhs",
                              f"xhs_agent 未就绪:{last}", "xhs_agent",
                              result=nothing, status="skipped", note=last)
            await sa_append_step(run_id, {"step": step, "action": "xhs",
                                          "reason": f"小红书未配置/停用({last}"})
            return False

        if reply.get("need_login"):
            already_open = await sa_has_open_escalation("小红书登录")
            if not already_open:
                try:
                    ticket = await create_acquisition_task({
                        "tenant_id": settings.default_tenant,
                        "project_id": settings.default_project,
                        "created_by": "super_agent",
                        "title": "【Super Agent 求助】小红书需一次性人工登录",
                        "description": "后台 xhs_agent 检测到未登录。请在项目根目录运行 "
                                       "`python3 scripts/xhs_login.py`,弹出浏览器里登录"
                                       "小红书一次cookie 持久化后馆长自动恢复采集。",
                        "scenario_tags": ["super_agent", "escalation", "小红书登录"],
                        "target_entity_types": ["Place"], "target_fields": [],
                        "suggested_collection_method": "manual_login",
                        "priority": 1})
                    admin_id = await get_admin_user_id()
                    if admin_id:
                        await create_notification(
                            admin_id, title="小红书需一次性登录",
                            body="运行 scripts/xhs_login.py 登录一次即可,馆长自动恢复。",
                            ntype="task", related_task_id=ticket["id"])
                except Exception:
                    # Ticket/notification are best effort.
                    pass
            # The audit rows below are written whether or not a new ticket
            # was opened (nesting reconstructed from unindented source).
            await sa_add_task(run_id, step, "小红书", "xhs",
                              "小红书未登录,已升级人工一次性登录", "notify_admin",
                              result=nothing,
                              status="escalated", note="待 scripts/xhs_login.py")
            await sa_append_step(run_id, {
                "step": step, "action": "xhs",
                "reason": "小红书未登录 → 已开工单+通知(运行 xhs_login.py),本轮暂停小红书"})
            return False

        evidence += int(reply.get("evidence_saved") or 0)
        if not reply.get("found"):
            # Flag the node anyway so it is not collected again.
            _apply_xhs(place["eid"], [])
            continue
        tags = reply.get("tags")
        _apply_xhs(place["eid"], tags or [])
        hits += 1
        if tags:
            tag_count += len(tags)

    total = len(targets)
    summary = {"fetched": total, "approved": tag_count,
               "pending": evidence, "skipped": total - hits}
    await sa_add_task(run_id, step, "小红书", "xhs",
                      f"小红书 UGC 采集 {total} 个实体({last}",
                      "xhs_agent", result=summary, status="done",
                      note=f"命中{hits}·证据{evidence}条·体验标签{tag_count}")
    await sa_append_step(run_id, {
        "step": step, "action": "xhs",
        "reason": f"小红书采集:命中 {hits}/{total},证据入库 {evidence} 条,"
                  f"产出 {tag_count} 个体验标签({last}"})
    return True
def _dy_targets(limit: int) -> list[dict]:
    """Fetch up to ``limit`` Place nodes that have no ``dy_done`` flag.

    Ordering: ``eat`` places first, then ``sight``, then the rest, each
    group by ``element_id``. Rows without a name are skipped; a failed
    query yields an empty list.
    """
    cypher = (
        "MATCH (p:Place) WHERE p.dy_done IS NULL "
        "RETURN p.element_id, p.name, coalesce(p.place_type,''), "
        "coalesce(p.district,'') "
        "ORDER BY CASE WHEN p.place_type='eat' THEN 0 "
        "WHEN p.place_type='sight' THEN 1 ELSE 2 END, p.element_id "
        "LIMIT $n"
    )
    graph = _get_graph()
    try:
        rows = graph.query(cypher, {"n": limit}).result_set
    except Exception:
        return []

    targets = []
    for row in rows:
        if row and row[1]:
            targets.append({"eid": row[0], "name": row[1],
                            "place_type": row[2], "district": row[3]})
    return targets
def _apply_dy(eid: str, tags: list[str]) -> None:
    """Attach Douyin experience tags to a Place node.

    ``dy_done=1`` is set first, whether or not there are tags. Each tag is
    then merged as an ``ExperienceTag`` node linked by ``HAS_TAG``, with
    its ``source`` set to ``'douyin'``. The first query error stops the
    remaining writes and is swallowed.
    """
    mark_done = "MATCH (p:Place {element_id:$eid}) SET p.dy_done=1"
    link_tag = (
        "MATCH (p:Place {element_id:$eid}) "
        "MERGE (e:ExperienceTag {name:$t}) "
        "MERGE (p)-[:HAS_TAG]->(e) SET e.source='douyin'"
    )
    graph = _get_graph()
    try:
        graph.query(mark_done, {"eid": eid})
        for tag in tags:
            graph.query(link_tag, {"eid": eid, "t": tag})
    except Exception:
        pass
async def _douyin_enrich(run_id: int, step: int,
                         targets: list[dict]) -> bool:
    """Collect Douyin UGC for the targets and turn it into tags.

    Returns ``False`` when a result is not ``ok`` (a "skipped" task/step
    is logged) or when the agent reports ``need_login`` (a one-time login
    ticket and admin notification are raised unless an escalation is
    already open, and an "escalated" task/step is logged). Otherwise every
    target is flagged ``dy_done`` with whatever tags were found, a summary
    task/step is logged, and ``True`` is returned.
    """
    nothing = {"fetched": 0, "approved": 0, "pending": 0, "skipped": 0}
    hits = tag_count = evidence = 0
    last = ""
    for place in targets:
        reply = await douyin_enrich(place)
        last = reply.get("summary", "")

        if not reply.get("ok"):
            await sa_add_task(run_id, step, "抖音", "douyin",
                              f"douyin_agent 未就绪:{last}", "douyin_agent",
                              result=nothing, status="skipped", note=last)
            await sa_append_step(run_id, {"step": step, "action": "douyin",
                                          "reason": f"抖音未配置/停用({last}"})
            return False

        if reply.get("need_login"):
            already_open = await sa_has_open_escalation("抖音登录")
            if not already_open:
                try:
                    ticket = await create_acquisition_task({
                        "tenant_id": settings.default_tenant,
                        "project_id": settings.default_project,
                        "created_by": "super_agent",
                        "title": "【Super Agent 求助】抖音需一次性人工登录",
                        "description": "后台 douyin_agent 检测到未登录。请在项目根目录"
                                       "运行 `python3 scripts/douyin_login.py`,弹出"
                                       "浏览器里登录抖音一次cookie 持久化后自动恢复。",
                        "scenario_tags": ["super_agent", "escalation",
                                          "抖音登录"],
                        "target_entity_types": ["Place"], "target_fields": [],
                        "suggested_collection_method": "manual_login",
                        "priority": 1})
                    admin_id = await get_admin_user_id()
                    if admin_id:
                        await create_notification(
                            admin_id, title="抖音需一次性登录",
                            body="运行 scripts/douyin_login.py 登录一次即可,"
                                 "馆长自动恢复。",
                            ntype="task", related_task_id=ticket["id"])
                except Exception:
                    # Ticket/notification are best effort.
                    pass
            # The audit rows below are written whether or not a new ticket
            # was opened (nesting reconstructed from unindented source).
            await sa_add_task(run_id, step, "抖音", "douyin",
                              "抖音未登录,已升级人工一次性登录", "notify_admin",
                              result=nothing,
                              status="escalated",
                              note="待 scripts/douyin_login.py")
            await sa_append_step(run_id, {
                "step": step, "action": "douyin",
                "reason": "抖音未登录 → 已开工单+通知(运行 douyin_login.py)"
                          "本轮暂停抖音"})
            return False

        evidence += int(reply.get("evidence_saved") or 0)
        if not reply.get("found"):
            # Flag the node anyway so it is not collected again.
            _apply_dy(place["eid"], [])
            continue
        tags = reply.get("tags")
        _apply_dy(place["eid"], tags or [])
        hits += 1
        if tags:
            tag_count += len(tags)

    total = len(targets)
    summary = {"fetched": total, "approved": tag_count,
               "pending": evidence, "skipped": total - hits}
    await sa_add_task(run_id, step, "抖音", "douyin",
                      f"抖音 UGC 采集 {total} 个实体({last}",
                      "douyin_agent", result=summary, status="done",
                      note=f"命中{hits}·证据{evidence}条·体验标签{tag_count}")
    await sa_append_step(run_id, {
        "step": step, "action": "douyin",
        "reason": f"抖音采集:命中 {hits}/{total},证据入库 {evidence} 条,"
                  f"产出 {tag_count} 个体验标签({last}"})
    return True
def _events_targets(limit: int) -> list[dict]:
    """Fetch up to ``limit`` Places ready for event mining.

    A Place qualifies when at least one of ``web_done``, ``xhs_done`` or
    ``dy_done`` equals 1 and ``events_done`` is not set, so mining does
    not depend on any single source. Rows without a name are skipped; a
    failed query yields an empty list.
    """
    cypher = (
        "MATCH (p:Place) "
        "WHERE (p.web_done=1 OR p.xhs_done=1 OR p.dy_done=1) "
        " AND p.events_done IS NULL "
        "RETURN p.element_id, p.name LIMIT $n"
    )
    graph = _get_graph()
    try:
        rows = graph.query(cypher, {"n": limit}).result_set
    except Exception:
        return []

    targets = []
    for row in rows:
        if row and row[1]:
            targets.append({"eid": row[0], "name": row[1]})
    return targets
def _apply_events(eid: str, events: list[dict]) -> None:
    """Write mined events for a Place back into the graph.

    ``events_done=1`` is set first, whether or not there are events, so
    the Place is not mined again. Each event is merged as an ``Event``
    node keyed by ``(place, title)`` and linked from the Place through a
    ``HAS_EVENT`` relationship carrying ``time`` and ``type``.

    Event fields read: ``title``, ``time``, ``time_norm`` (falls back to
    ``time``), ``type``, ``desc``, ``source_platform`` (falls back to
    ``"mixed"``), ``participants`` and ``confidence``.

    Each event is written independently: a malformed event or a failed
    query skips only that event instead of silently discarding all the
    events that follow it. If the done-flag query itself fails, nothing
    else is attempted. All errors are swallowed (best effort).
    """
    g = _get_graph()
    try:
        g.query("MATCH (p:Place {element_id:$eid}) SET p.events_done=1",
                {"eid": eid})
    except Exception:
        return
    for e in events or []:
        try:
            people = e.get("participants") or []
            if isinstance(people, str):
                # A bare string must not be joined character by character.
                people = [people]
            try:
                conf = float(e.get("confidence") or 0)
            except (TypeError, ValueError):
                # Non-numeric confidence (e.g. "high") counts as unknown.
                conf = 0.0
            g.query(
                "MATCH (p:Place {element_id:$eid}) "
                "MERGE (ev:Event {place:$eid, title:$t}) "
                "SET ev.time=$tm, ev.time_norm=$tn, ev.type=$ty, "
                " ev.desc=$d, ev.source=$src, ev.participants=$ppl, "
                " ev.confidence=$conf "
                "MERGE (p)-[:HAS_EVENT {time:$tn, type:$ty}]->(ev)",
                {"eid": eid, "t": e.get("title", ""),
                 "tm": e.get("time", ""),
                 "tn": e.get("time_norm", "") or e.get("time", ""),
                 "ty": e.get("type", ""), "d": e.get("desc", ""),
                 "src": e.get("source_platform", "") or "mixed",
                 "ppl": ",".join(str(p) for p in people),
                 "conf": conf})
        except Exception:
            # Skip this event only; keep writing the remaining ones.
            continue
async def _event_mine(run_id: int, step: int, targets: list[dict]) -> bool:
    """Mine time-anchored events for the targets via ``mine_events``.

    Every processed target is passed to ``_apply_events`` (which flags it
    done even when no events came back). Returns ``False`` as soon as a
    result is not ``ok`` (a "skipped" task/step is logged); otherwise a
    summary task/step is logged and ``True`` is returned.
    """
    with_events = event_count = 0
    last = ""
    for place in targets:
        reply = await mine_events(place)
        last = reply.get("summary", "")
        if not reply.get("ok"):
            await sa_add_task(run_id, step, "事件", "event",
                              f"event_miner 未就绪:{last}", "event_miner",
                              result={"fetched": 0, "approved": 0,
                                      "pending": 0, "skipped": 0},
                              status="skipped", note=last)
            await sa_append_step(run_id, {"step": step, "action": "event",
                                          "reason": f"事件抽取未配置({last}"})
            return False

        found = reply.get("events") or []
        _apply_events(place["eid"], found)
        if not found:
            continue
        with_events += 1
        event_count += len(found)

    total = len(targets)
    summary = {"fetched": total, "approved": event_count,
               "pending": 0, "skipped": total - with_events}
    await sa_add_task(run_id, step, "事件", "event",
                      f"评论时间→事件抽取 {total} 个实体({last}",
                      "event_miner", result=summary, status="done",
                      note=f"命中{with_events}·事件{event_count}")
    await sa_append_step(run_id, {
        "step": step, "action": "event",
        "reason": f"事件抽取:{with_events}/{total} 实体,"
                  f"产出 {event_count} 个时间锚定事件({last}"})
    return True
async def run_super_agent(run_id: int) -> None:
    """Run the resident curator loop for one run.

    Each iteration picks one action:

    1. Stop if a stop was requested (the run is finished as "stopped").
    2. Enrichment slot: when the grid is idle, or on every
       ``_ENRICH_EVERY``-th step while it is busy, the still-enabled
       sources (web / xhs / douyin / event / distill) are tried in
       round-robin order; the first one with targets runs. A source that
       reports "not ready" is switched off for the rest of this run.
    3. Stewarding: when the grid has nothing to do (or a per-process cap
       was hit) and no enrichment ran, the run status becomes
       "stewarding" and the loop waits ``_STEWARD_INTERVAL`` seconds
       while polling for a stop request.
    4. Grid scan: among categories with pending cells, the one with the
       fewest collected places is scanned by one page; the cell is then
       marked exhausted, advanced to the next page, or subdivided into
       four children when it is still full at ``_MAX_PAGE_SPLIT`` pages.

    Any unexpected exception ends the run with status "error".

    Args:
        run_id: Identifier of the run whose steps/tasks are recorded.
    """
    await _seed_if_needed()
    step = 0
    api_calls = 0
    enrich_off = False  # distillation not configured/available: off for this run
    web_off = False     # web_agent not ready: off for this run, avoids idle spinning
    xhs_off = False     # Xiaohongshu not configured / not logged in
    dy_off = False      # Douyin not configured / not logged in
    event_off = False   # event mining not configured
    enrich_turn = 0     # round-robin cursor across enrichment sources
    escalated: set[str] = set()

    async def _wait_steward() -> bool:
        """Wait one steward interval; return True if a stop was requested."""
        waited = 0
        while waited < _STEWARD_INTERVAL:
            if await sa_stop_requested(run_id):
                return True
            await asyncio.sleep(_STEWARD_TICK)
            waited += _STEWARD_TICK
        return False

    try:
        while True:
            if await sa_stop_requested(run_id):
                await sa_finish(run_id, "stopped")
                return
            # -- Tool selection --------------------------------------
            # Grid scanning is the main collector; enrichment adds the
            # knowledge layer. While the grid is busy an enrichment is
            # inserted every _ENRICH_EVERY steps; once the grid is done,
            # enrichment gets every step.
            pend = await grid_pending_cats()
            grid_busy = (bool(pend) and api_calls < _MAX_API_PER_RUN
                         and step < _HARD_MAX_STEPS)
            # Enrichment slot with round-robin scheduling, so one source
            # with a huge backlog cannot starve the others.
            if ((not web_off or not xhs_off or not dy_off or not event_off
                 or not enrich_off)
                    and ((not grid_busy)
                         or (step > 0 and step % _ENRICH_EVERY == 0))):
                pool = ([("web", _web_targets, _WEB_BATCH, _web_enrich)]
                        if not web_off else [])
                pool += ([("xhs", _xhs_targets, _XHS_BATCH, _xhs_enrich)]
                         if not xhs_off else [])
                pool += ([("douyin", _dy_targets, 1, _douyin_enrich)]
                         if not dy_off else [])
                pool += ([("event", _events_targets, 3, _event_mine)]
                         if not event_off else [])
                pool += ([("distill", _enrich_targets, _ENRICH_BATCH,
                           _distill_enrich)] if not enrich_off else [])
                hit = False
                for i in range(len(pool)):
                    name, tfn, batch, efn = pool[(enrich_turn + i) % len(pool)]
                    tg = await asyncio.to_thread(tfn, batch)
                    if not tg:
                        continue
                    enrich_turn += 1
                    step += 1
                    ok = await efn(run_id, step, tg)
                    if not ok:  # not ready -> switch this source off for the run
                        if name == "web":
                            web_off = True
                        elif name == "xhs":
                            xhs_off = True
                        elif name == "douyin":
                            dy_off = True
                        elif name == "event":
                            event_off = True
                        else:
                            enrich_off = True
                        break
                    hit = True
                    break
                if hit:
                    await asyncio.sleep(_API_PACING)
                    continue
            if not grid_busy:
                # Nothing to scan and nothing enriched: steward without
                # spending quota; can be stopped at any time.
                reason = ("全城网格已扫完、知识富集也已补全,进入驻守巡检"
                          "(不再消耗额度,新数据/工单结清后自动恢复,可停止)"
                          if not pend else
                          "达单进程安全上限,转驻守;下次启动自动从断点续扫")
                await sa_set_status(run_id, "stewarding")
                await sa_append_step(run_id, {"step": step,
                                              "action": "steward",
                                              "reason": reason})
                if await _wait_steward():
                    await sa_finish(run_id, "stopped")
                    return
                api_calls = 0  # a new steward cycle resets the in-process counter
                continue
            # Focus: among categories with pending cells, pick the one
            # with the fewest collected places (balanced progress).
            cov = {i["cat"]: i["current"] for i in _coverage()["items"]}
            focus = min(pend, key=lambda c: cov.get(c, 0))
            cell = await grid_take_next(focus)
            if not cell:
                # The category is listed as pending but no cell came back.
                # Pace the retry instead of re-entering the loop at once,
                # which would spin on database calls.
                await asyncio.sleep(_API_PACING)
                continue
            tc = cell["typecode"]
            page = cell["next_page"]
            bbox = (cell["min_lng"], cell["min_lat"],
                    cell["max_lng"], cell["max_lat"])
            step += 1
            try:
                rows, raw = await asyncio.to_thread(
                    search_polygon, tc, bbox, page, _OFFSET)
            except Exception as e:
                rows, raw = [], -1
                err = str(e)[:140]
            else:
                err = None
            # NOTE(review): counted for every attempt, failed ones included;
            # nesting reconstructed from unindented source - confirm.
            api_calls += 1
            res = await ingest_rows(rows, focus, focus) if rows else \
                {"fetched": 0, "approved": 0, "pending": 0, "skipped": 0}
            if err:
                res["error"] = err
            area = (f"经[{bbox[0]:.3f},{bbox[2]:.3f}] "
                    f"纬[{bbox[1]:.3f},{bbox[3]:.3f}]")
            # State machine: exhausted / next page / adaptive subdivision
            # (a dense cell still full at the page cap is split in four).
            if err:
                # Request failed: do not advance and do not mark the cell
                # exhausted, so it is retried later.
                nstatus, npage, note = "pending", page, f"请求失败重试 {err}"
            elif raw <= 0 or raw < _OFFSET:
                nstatus, npage, note = "exhausted", page, f"网格扫尽(本页{raw}条)"
            elif page >= _MAX_PAGE_SPLIT and cell["depth"] < _MAX_DEPTH:
                await grid_subdivide(cell["id"], focus, tc,
                                     _quads(cell), cell["depth"] + 1)
                # None: the parent cell gets no grid_update after a split.
                nstatus, npage, note = None, page, "稠密→四叉细分深扫"
            elif page >= _MAX_PAGE_SPLIT:
                nstatus, npage, note = "exhausted", page, "达最深层,止于该格"
            else:
                nstatus, npage, note = "pending", page + 1, "本页满,续下一页"
            if nstatus is not None:
                await grid_update(cell["id"], npage,
                                  res.get("approved", 0) + res.get("pending", 0),
                                  nstatus)
            reason = f"全城网格扫描 {area}{page}页({focus}/{tc}"
            await sa_add_task(run_id, step, focus, "ingest", reason,
                              "gaode_grid", result=res, status="done",
                              note=f"格#{cell['id']} d{cell['depth']} p{page}·{note}")
            await sa_append_step(run_id, {
                "step": step,
                "plan": {"poi_type": focus, "keyword": f"网格 {area} p{page}",
                         "reason": reason},
                "result": res,
            }, ingested_delta=res.get("approved", 0))
            # Category fully scanned yet still sparse -> ask the admin for
            # help (at most once per category per run; the loop goes on).
            if focus not in escalated:
                gc = await grid_counts()
                g = gc.get(focus, {})
                # .get avoids a KeyError ending the whole run when the
                # counts row has no "done" entry.
                if g.get("total") and g.get("done", 0) >= g["total"]:
                    maxc = max(cov.values()) if cov else 0
                    if cov.get(focus, 0) < max(0.25 * maxc, 30):
                        await _escalate(run_id, step, focus, cov.get(focus, 0))
                        escalated.add(focus)
            await asyncio.sleep(_API_PACING)
    except Exception as e:  # noqa: BLE001
        await sa_finish(run_id, "error", str(e)[:240])
def schedule_super_agent(run_id: int) -> None:
    """Start ``run_super_agent(run_id)`` as a background asyncio task.

    The task is kept in ``_BG`` while it runs, so a reference to it stays
    alive, and is discarded from the set when it finishes. Must be called
    while an event loop is running, as ``asyncio.create_task`` requires.
    """
    background = asyncio.create_task(run_super_agent(run_id))
    _BG.add(background)
    background.add_done_callback(_BG.discard)