"""Super Agent — 知识图谱馆长(全城网格自治采集,常驻不停)。 科学合理 & 省额度的核心:**地理网格扫描**,不再用"热度关键词"。 • 把贵阳整个城市切成网格,逐格用高德"多边形(矩形)搜索 + 官方类型编码" 系统性扫,每格每页都是不同地理切片 → 几乎不重复 → 不浪费 API 额度。 • 每格扫到第几页**持久化**(gaode_grid_cells.next_page),跨步/跨轮接着扫, 扫尽的格标记 exhausted **永不重复请求**(省额度的关键)。 • 稠密区一格出不完 → **自适应四叉细分**,保证不漏。 • 全城扫完不"结束",转**驻守巡检**(不再消耗额度,仅响应停止/再播种)。 • 进度=网格地理覆盖率,真实可反映,非拍脑袋数字。 """ from __future__ import annotations import asyncio import re from app.api.graph import _get_graph from app.config import settings from app.agents.gaode_connector import AMAP_TYPECODES, search_polygon from app.agents.super_ingest import ingest_rows from app.agents.distill_gate import distill_entity, ATTR_FIELDS from app.agents.web_agent import web_enrich from app.agents.xhs_agent import xhs_enrich from app.agents.douyin_agent import douyin_enrich from app.agents.event_miner import mine_events from app.db import ( sa_append_step, sa_finish, sa_stop_requested, sa_add_task, sa_set_status, sa_has_open_escalation, create_acquisition_task, create_notification, get_admin_user_id, grid_seed, grid_counts, grid_pending_cats, grid_take_next, grid_update, grid_subdivide, sa_merge_candidate_payload, sa_record_conflict, sa_record_schema_proposal, ) _BG: set = set() # 贵阳整市外接框 (min_lng, min_lat, max_lng, max_lat):含主城+周边区县 _GY_BBOX = (106.20, 26.10, 107.30, 27.05) _CELL = 0.08 # 初始网格边长(°),约 8km _OFFSET = 25 # 高德多边形单页上限 _MAX_PAGE_SPLIT = 8 # 单格翻到第N页仍满 → 太密 → 四叉细分 _MAX_DEPTH = 3 # 最深细分层级(0.08→约1km格) _API_PACING = 0.35 # 每次高德调用间隔(秒),护 QPS _HARD_MAX_STEPS = 100000 # 仅极端兜底(持久化,跨轮续扫,不暴露用户) _MAX_API_PER_RUN = 6000 # 单进程跑安全上限;命中转驻守,下轮自动续 _STEWARD_INTERVAL = 1800 _STEWARD_TICK = 10 _ENRICH_BATCH = 5 # 每步最多蒸馏富集多少个实体 _ENRICH_EVERY = 4 # 网格忙时每 N 步插一次知识富集 _WEB_BATCH = 2 # 每步最多联网富集多少个实体(真浏览器较慢) _XHS_BATCH = 1 # 小红书浏览器很慢/脆,每步只采 1 个 def _place_counts() -> dict: g = _get_graph() out: dict = {} try: for row in g.query( "MATCH (p:Place) RETURN coalesce(p.place_type,'?'), count(*)" ).result_set: out[row[0]] = row[1] except Exception: pass return out def _coverage() -> dict: from app.agents.super_ingest import _PT counts = _place_counts() cat2pt = {cat: _PT.get(cat, "poi") for cat in AMAP_TYPECODES} items = [{"cat": cat, "place_type": cat2pt[cat], "current": int(counts.get(cat2pt[cat], 0))} for cat in AMAP_TYPECODES] return {"items": items, "total": sum(i["current"] for i in items)} def _grid_for(bbox: tuple, step: float) -> list[tuple]: mnlng, mnlat, mxlng, mxlat = bbox cells, lng = [], mnlng while lng < mxlng: lat = mnlat nlng = round(min(lng + step, mxlng), 6) while lat < mxlat: nlat = round(min(lat + step, mxlat), 6) cells.append((round(lng, 6), round(lat, 6), nlng, nlat)) lat = nlat lng = nlng return cells def _quads(c: dict) -> list[tuple]: """父格四等分为 4 个子格(矩形对半切)。""" mnlng, mnlat = c["min_lng"], c["min_lat"] mxlng, mxlat = c["max_lng"], c["max_lat"] midlng = round((mnlng + mxlng) / 2, 6) midlat = round((mnlat + mxlat) / 2, 6) return [ (mnlng, mnlat, midlng, midlat), (midlng, mnlat, mxlng, midlat), (mnlng, midlat, midlng, mxlat), (midlng, midlat, mxlng, mxlat), ] async def _seed_if_needed() -> None: have = await grid_counts() for cat, tc in AMAP_TYPECODES.items(): if cat not in have: await grid_seed(cat, tc, _grid_for(_GY_BBOX, _CELL)) async def _escalate(run_id: int, step: int, cat: str, cur: int) -> None: if await sa_has_open_escalation(cat): await sa_add_task(run_id, step, cat, "escalate", "全城网格已扫尽,该类仍偏少", "skip_dup", status="escalated", note="已有未结工单,不重复打扰") return task_id = None try: task = await create_acquisition_task({ "tenant_id": settings.default_tenant, "project_id": settings.default_project, "created_by": "super_agent", "title": f"【Super Agent 求助】「{cat}」全城网格已扫尽仍偏少", "description": ( f"馆长已对贵阳全城做网格化高德采集,「{cat}」当前仅 {cur} 条," f"高德官方源对该类覆盖有限。建议人工核查类型编码或改用" f"小红书/大众点评/官方名录等渠道补全。馆长继续驻守," f"工单结清/数据增长后自动恢复。"), "scenario_tags": ["super_agent", "escalation", cat], "target_entity_types": ["Place"], "target_fields": [], "suggested_collection_method": "manual_or_alt_source", "priority": 1, }) task_id = task["id"] except Exception: task_id = None try: uid = await get_admin_user_id() if uid: await create_notification( uid, title=f"Super Agent 求助:「{cat}」全城网格已扫尽", body=(f"「{cat}」仅 {cur} 条,高德源覆盖有限。已开工单" + (f" #{task_id}" if task_id else "") + ",请人工/改渠道;馆长继续驻守。"), ntype="task", related_task_id=task_id) except Exception: pass await sa_add_task(run_id, step, cat, "escalate", "全城网格已扫尽,该类仍偏少", "notify_admin", status="escalated", related_task_id=task_id) await sa_append_step(run_id, { "step": step, "action": "escalate", "cat": cat, "reason": f"「{cat}」全城网格扫尽仅{cur}条,已开工单并通知管理员,继续驻守"}) def _enrich_targets(limit: int) -> list[dict]: """未富集过的 Place + 锚点(名/址/区/类) + 现有软字段。 取数显式排除 经纬度/电话(隐私红线,绝不外发给蒸馏模型)。 """ g = _get_graph() try: rs = g.query( "MATCH (p:Place) WHERE p.enrich_done IS NULL " "RETURN p.element_id, p.name, coalesce(p.place_type,''), " "coalesce(p.district,''), coalesce(p.address,''), " "coalesce(p.summary,''), coalesce(p.history,''), " "coalesce(p.features,''), coalesce(p.suitable_for,''), " "coalesce(p.best_season,''), coalesce(p.ticket_hint,'') " "LIMIT $n", {"n": limit}).result_set except Exception: return [] out = [] for r in rs: if not (r and r[1]): continue existing = {k: v for k, v in zip( ATTR_FIELDS, [r[5], r[6], r[7], r[8], r[9], r[10]]) if v} out.append({"eid": r[0], "name": r[1], "place_type": r[2], "district": r[3], "address": r[4], "existing": existing}) return out def _apply_enrich(eid: str, fields: dict) -> None: """共识字段写回 FalkorDB 节点;无论是否有字段都打 enrich_done 防重复空跑。""" g = _get_graph() sets = ["p.enrich_done=1"] params = {"eid": eid} for k in ATTR_FIELDS: if fields.get(k): sets.append(f"p.{k}=${k}") params[k] = fields[k] try: g.query(f"MATCH (p:Place {{element_id:$eid}}) SET {','.join(sets)}", params) except Exception: pass async def _distill_enrich(run_id: int, step: int, targets: list[dict]) -> bool: """工具:多模型知识蒸馏,给已有实体补"知识层"属性。 独立数据来源(不是高德质检闸门):问 N 个模型脑内知识 → 全局模型聚合 跨模型共识 → 写回 FalkorDB 节点 + 候选 payload(审计可溯)。 """ enriched = adopt_fields = keep_total = conflict_total = uncertain_total = 0 conflict_names: list[str] = [] last = "" for t in targets: res = await distill_entity(t) last = res.get("summary", "") if not res.get("ok"): # 配置/连通问题:整批中止——不打标记(待配置后重试)、不空转 await sa_add_task( run_id, step, "蒸馏", "distill", f"知识蒸馏未就绪:{last}", "distill_enrich", result={"fetched": 0, "approved": 0, "pending": 0, "skipped": 0}, status="skipped", note=last) await sa_append_step(run_id, { "step": step, "action": "distill", "reason": f"知识蒸馏未配置/不可用({last}),本步跳过," f"配好≥2蒸馏模型+全局聚合后自动恢复"}) return False adopt = res.get("adopt") or {} _apply_enrich(t["eid"], adopt) # 只写 adopt;无则仅打标记防空转 if adopt: await sa_merge_candidate_payload(t["eid"], adopt) enriched += 1 adopt_fields += len(adopt) keep_total += len(res.get("keep") or []) uncertain_total += len(res.get("uncertain") or []) # 与图谱既有值矛盾 → 不覆盖,落 validation_issues 转人工 for c in (res.get("conflict") or []): await sa_record_conflict( t["eid"], c.get("field", ""), c.get("existing", ""), c.get("distilled", ""), c.get("note", "")) conflict_total += 1 if t.get("name"): conflict_names.append(f"{t['name']}·{c.get('field','')}") res_obj = {"fetched": len(targets), "approved": adopt_fields, "pending": conflict_total, "skipped": uncertain_total} await sa_add_task( run_id, step, "蒸馏", "distill", f"多模型知识蒸馏富集 {len(targets)} 个实体({last})", "distill_enrich", result=res_obj, status="done", note=f"富集{enriched}实体/{adopt_fields}字段·一致{keep_total}" f"·矛盾{conflict_total}·存疑{uncertain_total}") await sa_append_step(run_id, { "step": step, "action": "distill", "reason": f"知识蒸馏富集:{enriched}/{len(targets)} 实体补 {adopt_fields} 字段," f"矛盾 {conflict_total} 转人工({last})"}) if conflict_total: try: uid = await get_admin_user_id() if uid: await create_notification( uid, title=f"蒸馏发现 {conflict_total} 处与图谱既有数据矛盾", body=("需人工裁决(蒸馏未覆盖图谱):" + "、".join(conflict_names[:8]) + ("…" if len(conflict_names) > 8 else "") + "。详见 数据质量 / 校验问题(distill_conflict)。"), ntype="task") except Exception: pass return True def _web_targets(limit: int) -> list[dict]: """未联网富集过的 Place(无 web_done)+ 锚点 + 现有软字段。 取数显式排除 经纬度/电话(隐私红线)。""" g = _get_graph() try: # 景点最可能有公开百科页 → 优先联网富集,命中率最高 rs = g.query( "MATCH (p:Place) WHERE p.web_done IS NULL " "RETURN p.element_id, p.name, coalesce(p.place_type,''), " "coalesce(p.district,''), coalesce(p.address,''), " "coalesce(p.summary,''), coalesce(p.history,''), " "coalesce(p.features,''), coalesce(p.suitable_for,''), " "coalesce(p.best_season,''), coalesce(p.ticket_hint,'') " "ORDER BY CASE WHEN p.place_type='sight' THEN 0 ELSE 1 END, " "p.element_id LIMIT $n", {"n": limit}).result_set except Exception: return [] out = [] for r in rs: if not (r and r[1]): continue existing = {k: v for k, v in zip( ATTR_FIELDS, [r[5], r[6], r[7], r[8], r[9], r[10]]) if v} out.append({"eid": r[0], "name": r[1], "place_type": r[2], "district": r[3], "address": r[4], "existing": existing}) return out def _apply_web(eid: str, adopt: dict) -> None: """网页采纳字段写回;打 web_done(必);有 adopt 则连 enrich_done 一并打 (网页权威,省一次记忆蒸馏)。""" g = _get_graph() sets = ["p.web_done=1"] params = {"eid": eid} if adopt: sets.append("p.enrich_done=1") for k in ATTR_FIELDS: if adopt.get(k): sets.append(f"p.{k}=${k}") params[k] = adopt[k] try: g.query(f"MATCH (p:Place {{element_id:$eid}}) SET {','.join(sets)}", params) except Exception: pass async def _web_enrich(run_id: int, step: int, targets: list[dict]) -> bool: """工具:browser-use 式联网采集(真浏览器抓权威页 → opus 抽取对齐)。""" enriched = adopt_fields = found = conflict_total = gap_total = 0 last = "" for t in targets: r = await web_enrich(t) last = r.get("summary", "") if not r.get("ok"): await sa_add_task(run_id, step, "联网", "web", f"web_agent 未就绪:{last}", "web_agent", result={"fetched": 0, "approved": 0, "pending": 0, "skipped": 0}, status="skipped", note=last) await sa_append_step(run_id, { "step": step, "action": "web", "reason": f"web_agent 未配置/不可用({last}),跳过待恢复"}) return False if not r.get("found") or r.get("entity_match") is False: _apply_web(t["eid"], {}) # 标记 web_done,不再重复抓 continue found += 1 adopt = r.get("adopt") or {} _apply_web(t["eid"], adopt) if adopt: await sa_merge_candidate_payload(t["eid"], adopt) enriched += 1 adopt_fields += len(adopt) for c in (r.get("conflict") or []): await sa_record_conflict(t["eid"], c.get("field", ""), c.get("existing", ""), c.get("web", ""), c.get("note", "")) conflict_total += 1 for sgap in (r.get("schema_gaps") or []): iid = await sa_record_schema_proposal( sgap.get("attr", ""), sgap.get("field", "") or re.sub(r"\W+", "_", sgap.get("attr", "")).strip("_"), str(sgap.get("value", ""))[:200], f"web_agent 在「{t['name']}」网页发现:{sgap.get('why','')}", float(r.get("confidence") or 0.7)) if iid: gap_total += 1 res_obj = {"fetched": len(targets), "approved": adopt_fields, "pending": conflict_total, "skipped": gap_total} await sa_add_task( run_id, step, "联网", "web", f"browser-use 联网采集 {len(targets)} 个实体({last})", "web_agent", result=res_obj, status="done", note=f"命中{found}·补{adopt_fields}字段·矛盾{conflict_total}" f"·schema提案{gap_total}") await sa_append_step(run_id, { "step": step, "action": "web", "reason": f"联网采集:命中 {found}/{len(targets)},补 {adopt_fields} 字段," f"矛盾 {conflict_total} 转人工,schema 提案 {gap_total}({last})"}) if conflict_total or gap_total: try: uid = await get_admin_user_id() if uid: await create_notification( uid, title=f"web_agent:{conflict_total} 矛盾 / {gap_total} schema 提案待裁决", body="联网采集与图谱矛盾或发现新属性,详见 数据质量/校验问题 " "与 本体建模/字段提案。", ntype="task") except Exception: pass return True def _xhs_targets(limit: int) -> list[dict]: """未采过小红书(无 xhs_done)的 Place;美食最优先(小红书食/玩为主)。""" g = _get_graph() try: rs = g.query( "MATCH (p:Place) WHERE p.xhs_done IS NULL " "RETURN p.element_id, p.name, coalesce(p.place_type,''), " "coalesce(p.district,'') " "ORDER BY CASE WHEN p.place_type='eat' THEN 0 " "WHEN p.place_type='sight' THEN 1 ELSE 2 END, p.element_id " "LIMIT $n", {"n": limit}).result_set except Exception: return [] return [{"eid": r[0], "name": r[1], "place_type": r[2], "district": r[3]} for r in rs if r and r[1]] def _apply_xhs(eid: str, tags: list[str]) -> None: """体验标签写回:MERGE ExperienceTag + (Place)-[:HAS_TAG]->(tag); 无论有无标签都打 xhs_done 防重复。""" g = _get_graph() try: g.query("MATCH (p:Place {element_id:$eid}) SET p.xhs_done=1", {"eid": eid}) for t in tags: g.query( "MATCH (p:Place {element_id:$eid}) " "MERGE (e:ExperienceTag {name:$t}) " "MERGE (p)-[:HAS_TAG]->(e) SET e.source='xiaohongshu'", {"eid": eid, "t": t}) except Exception: pass async def _xhs_enrich(run_id: int, step: int, targets: list[dict]) -> bool: """工具:小红书 UGC → 体验标签。未登录→升级人工(一次性登录),本轮关闭。""" done = tags_total = found = ev_total = 0 last = "" for t in targets: r = await xhs_enrich(t) last = r.get("summary", "") if not r.get("ok"): await sa_add_task(run_id, step, "小红书", "xhs", f"xhs_agent 未就绪:{last}", "xhs_agent", result={"fetched": 0, "approved": 0, "pending": 0, "skipped": 0}, status="skipped", note=last) await sa_append_step(run_id, {"step": step, "action": "xhs", "reason": f"小红书未配置/停用({last})"}) return False if r.get("need_login"): if not await sa_has_open_escalation("小红书登录"): try: task = await create_acquisition_task({ "tenant_id": settings.default_tenant, "project_id": settings.default_project, "created_by": "super_agent", "title": "【Super Agent 求助】小红书需一次性人工登录", "description": "后台 xhs_agent 检测到未登录。请在项目根目录运行 " "`python3 scripts/xhs_login.py`,弹出浏览器里登录" "小红书一次;cookie 持久化后馆长自动恢复采集。", "scenario_tags": ["super_agent", "escalation", "小红书登录"], "target_entity_types": ["Place"], "target_fields": [], "suggested_collection_method": "manual_login", "priority": 1}) uid = await get_admin_user_id() if uid: await create_notification( uid, title="小红书需一次性登录", body="运行 scripts/xhs_login.py 登录一次即可,馆长自动恢复。", ntype="task", related_task_id=task["id"]) except Exception: pass await sa_add_task(run_id, step, "小红书", "xhs", "小红书未登录,已升级人工一次性登录", "notify_admin", result={"fetched": 0, "approved": 0, "pending": 0, "skipped": 0}, status="escalated", note="待 scripts/xhs_login.py") await sa_append_step(run_id, { "step": step, "action": "xhs", "reason": "小红书未登录 → 已开工单+通知(运行 xhs_login.py),本轮暂停小红书"}) return False ev_total += int(r.get("evidence_saved") or 0) if r.get("found"): _apply_xhs(t["eid"], r.get("tags") or []) found += 1 if r.get("tags"): done += 1 tags_total += len(r["tags"]) else: _apply_xhs(t["eid"], []) # 标记,避免重复 res_obj = {"fetched": len(targets), "approved": tags_total, "pending": ev_total, "skipped": len(targets) - found} await sa_add_task(run_id, step, "小红书", "xhs", f"小红书 UGC 采集 {len(targets)} 个实体({last})", "xhs_agent", result=res_obj, status="done", note=f"命中{found}·证据{ev_total}条·体验标签{tags_total}") await sa_append_step(run_id, { "step": step, "action": "xhs", "reason": f"小红书采集:命中 {found}/{len(targets)},证据入库 {ev_total} 条," f"产出 {tags_total} 个体验标签({last})"}) return True def _dy_targets(limit: int) -> list[dict]: """未采过抖音(无 dy_done)的 Place;美食/景点优先(抖音食玩为主)。""" g = _get_graph() try: rs = g.query( "MATCH (p:Place) WHERE p.dy_done IS NULL " "RETURN p.element_id, p.name, coalesce(p.place_type,''), " "coalesce(p.district,'') " "ORDER BY CASE WHEN p.place_type='eat' THEN 0 " "WHEN p.place_type='sight' THEN 1 ELSE 2 END, p.element_id " "LIMIT $n", {"n": limit}).result_set except Exception: return [] return [{"eid": r[0], "name": r[1], "place_type": r[2], "district": r[3]} for r in rs if r and r[1]] def _apply_dy(eid: str, tags: list[str]) -> None: g = _get_graph() try: g.query("MATCH (p:Place {element_id:$eid}) SET p.dy_done=1", {"eid": eid}) for t in tags: g.query( "MATCH (p:Place {element_id:$eid}) " "MERGE (e:ExperienceTag {name:$t}) " "MERGE (p)-[:HAS_TAG]->(e) SET e.source='douyin'", {"eid": eid, "t": t}) except Exception: pass async def _douyin_enrich(run_id: int, step: int, targets: list[dict]) -> bool: """工具:抖音 UGC → 证据层+体验标签。未登录→升级一次性登录,本轮关闭。""" done = tags_total = found = ev_total = 0 last = "" for t in targets: r = await douyin_enrich(t) last = r.get("summary", "") if not r.get("ok"): await sa_add_task(run_id, step, "抖音", "douyin", f"douyin_agent 未就绪:{last}", "douyin_agent", result={"fetched": 0, "approved": 0, "pending": 0, "skipped": 0}, status="skipped", note=last) await sa_append_step(run_id, {"step": step, "action": "douyin", "reason": f"抖音未配置/停用({last})"}) return False if r.get("need_login"): if not await sa_has_open_escalation("抖音登录"): try: task = await create_acquisition_task({ "tenant_id": settings.default_tenant, "project_id": settings.default_project, "created_by": "super_agent", "title": "【Super Agent 求助】抖音需一次性人工登录", "description": "后台 douyin_agent 检测到未登录。请在项目根目录" "运行 `python3 scripts/douyin_login.py`,弹出" "浏览器里登录抖音一次;cookie 持久化后自动恢复。", "scenario_tags": ["super_agent", "escalation", "抖音登录"], "target_entity_types": ["Place"], "target_fields": [], "suggested_collection_method": "manual_login", "priority": 1}) uid = await get_admin_user_id() if uid: await create_notification( uid, title="抖音需一次性登录", body="运行 scripts/douyin_login.py 登录一次即可," "馆长自动恢复。", ntype="task", related_task_id=task["id"]) except Exception: pass await sa_add_task(run_id, step, "抖音", "douyin", "抖音未登录,已升级人工一次性登录", "notify_admin", result={"fetched": 0, "approved": 0, "pending": 0, "skipped": 0}, status="escalated", note="待 scripts/douyin_login.py") await sa_append_step(run_id, { "step": step, "action": "douyin", "reason": "抖音未登录 → 已开工单+通知(运行 douyin_login.py)," "本轮暂停抖音"}) return False ev_total += int(r.get("evidence_saved") or 0) if r.get("found"): _apply_dy(t["eid"], r.get("tags") or []) found += 1 if r.get("tags"): done += 1 tags_total += len(r["tags"]) else: _apply_dy(t["eid"], []) res_obj = {"fetched": len(targets), "approved": tags_total, "pending": ev_total, "skipped": len(targets) - found} await sa_add_task(run_id, step, "抖音", "douyin", f"抖音 UGC 采集 {len(targets)} 个实体({last})", "douyin_agent", result=res_obj, status="done", note=f"命中{found}·证据{ev_total}条·体验标签{tags_total}") await sa_append_step(run_id, { "step": step, "action": "douyin", "reason": f"抖音采集:命中 {found}/{len(targets)},证据入库 {ev_total} 条," f"产出 {tags_total} 个体验标签({last})"}) return True def _events_targets(limit: int) -> list[dict]: """已采过任何源(web/xhs/dy) 但还没抽过事件(无 events_done)的 Place。 平台无关:只要 social_evidence 里有该 pnk 的证据(任意 platform), 都可以挖事件,不再硬要求 xhs_done。 """ g = _get_graph() try: rs = g.query( "MATCH (p:Place) " "WHERE (p.web_done=1 OR p.xhs_done=1 OR p.dy_done=1) " " AND p.events_done IS NULL " "RETURN p.element_id, p.name LIMIT $n", {"n": limit}).result_set except Exception: return [] return [{"eid": r[0], "name": r[1]} for r in rs if r and r[1]] def _apply_events(eid: str, events: list[dict]) -> None: """事件写回:MERGE Event + (Place)-[:HAS_EVENT{time,type}]->(Event)。 - source 不再硬编 'xiaohongshu',按事件实际来源(baike/wiki/xhs/douyin)写 - 加 time_norm(排序用) / participants(涉及人物) / confidence(置信度) - MERGE 用 (place,title) 幂等,同名事件不会重复 - 无论有无都打 events_done 防重复整批 """ g = _get_graph() try: g.query("MATCH (p:Place {element_id:$eid}) SET p.events_done=1", {"eid": eid}) for e in events: g.query( "MATCH (p:Place {element_id:$eid}) " "MERGE (ev:Event {place:$eid, title:$t}) " "SET ev.time=$tm, ev.time_norm=$tn, ev.type=$ty, " " ev.desc=$d, ev.source=$src, ev.participants=$ppl, " " ev.confidence=$conf " "MERGE (p)-[:HAS_EVENT {time:$tn, type:$ty}]->(ev)", {"eid": eid, "t": e.get("title", ""), "tm": e.get("time", ""), "tn": e.get("time_norm", "") or e.get("time", ""), "ty": e.get("type", ""), "d": e.get("desc", ""), "src": e.get("source_platform", "") or "mixed", "ppl": ",".join(e.get("participants") or []), "conf": float(e.get("confidence") or 0)}) except Exception: pass async def _event_mine(run_id: int, step: int, targets: list[dict]) -> bool: """工具:从证据层评论/帖子抽时间锚定事件(纯 LLM,不爬网)。""" done = ev_total = 0 last = "" for t in targets: r = await mine_events(t) last = r.get("summary", "") if not r.get("ok"): await sa_add_task(run_id, step, "事件", "event", f"event_miner 未就绪:{last}", "event_miner", result={"fetched": 0, "approved": 0, "pending": 0, "skipped": 0}, status="skipped", note=last) await sa_append_step(run_id, {"step": step, "action": "event", "reason": f"事件抽取未配置({last})"}) return False evs = r.get("events") or [] _apply_events(t["eid"], evs) if evs: done += 1 ev_total += len(evs) res_obj = {"fetched": len(targets), "approved": ev_total, "pending": 0, "skipped": len(targets) - done} await sa_add_task(run_id, step, "事件", "event", f"评论时间→事件抽取 {len(targets)} 个实体({last})", "event_miner", result=res_obj, status="done", note=f"命中{done}·事件{ev_total}") await sa_append_step(run_id, { "step": step, "action": "event", "reason": f"事件抽取:{done}/{len(targets)} 实体," f"产出 {ev_total} 个时间锚定事件({last})"}) return True async def run_super_agent(run_id: int) -> None: await _seed_if_needed() step = 0 api_calls = 0 enrich_off = False # 记忆蒸馏未配置/不可用时本轮关闭 web_off = False # web_agent 未就绪时本轮关闭,避免空转 xhs_off = False # 小红书未配置/未登录时本轮关闭 dy_off = False # 抖音未配置/未登录时本轮关闭 event_off = False # 事件抽取未配置时本轮关闭 enrich_turn = 0 # 富集源轮转(web/xhs/douyin/event/distill 公平) escalated: set[str] = set() async def _wait_steward() -> bool: waited = 0 while waited < _STEWARD_INTERVAL: if await sa_stop_requested(run_id): return True await asyncio.sleep(_STEWARD_TICK) waited += _STEWARD_TICK return False try: while True: if await sa_stop_requested(run_id): await sa_finish(run_id, "stopped") return # ── 指挥大脑选工具 ─────────────────────────────────────── # 高德网格法 = 主力快采(最快、结构化、带坐标); # 多模型蒸馏 = 知识富集(补高德给不了的知识层,独立来源)。 # 网格忙时每 _ENRICH_EVERY 步插一次富集;网格扫完后全力富集。 pend = await grid_pending_cats() grid_busy = (bool(pend) and api_calls < _MAX_API_PER_RUN and step < _HARD_MAX_STEPS) # 富集插槽:web/小红书/抖音/事件/记忆蒸馏 **轮转调度** # (避免某源目标海量把其它源饿死,多源公平持续推进) if ((not web_off or not xhs_off or not dy_off or not event_off or not enrich_off) and ((not grid_busy) or (step > 0 and step % _ENRICH_EVERY == 0))): pool = ([("web", _web_targets, _WEB_BATCH, _web_enrich)] if not web_off else []) pool += ([("xhs", _xhs_targets, _XHS_BATCH, _xhs_enrich)] if not xhs_off else []) pool += ([("douyin", _dy_targets, 1, _douyin_enrich)] if not dy_off else []) pool += ([("event", _events_targets, 3, _event_mine)] if not event_off else []) pool += ([("distill", _enrich_targets, _ENRICH_BATCH, _distill_enrich)] if not enrich_off else []) hit = False for i in range(len(pool)): name, tfn, batch, efn = pool[(enrich_turn + i) % len(pool)] tg = await asyncio.to_thread(tfn, batch) if not tg: continue enrich_turn += 1 step += 1 ok = await efn(run_id, step, tg) if not ok: # 未就绪 → 本轮关该源 if name == "web": web_off = True elif name == "xhs": xhs_off = True elif name == "douyin": dy_off = True elif name == "event": event_off = True else: enrich_off = True break hit = True break if hit: await asyncio.sleep(_API_PACING) continue if not grid_busy: # 网格扫完且无可富集 → 驻守(不烧额度,下轮自动续,可一键停止) reason = ("全城网格已扫完、知识富集也已补全,进入驻守巡检" "(不再消耗额度,新数据/工单结清后自动恢复,可停止)" if not pend else "达单进程安全上限,转驻守;下次启动自动从断点续扫") await sa_set_status(run_id, "stewarding") await sa_append_step(run_id, {"step": step, "action": "steward", "reason": reason}) if await _wait_steward(): await sa_finish(run_id, "stopped") return api_calls = 0 # 新驻守周期重置进程内计数 continue # 聚焦:仍有待扫格的类里,挑当前藏品最少的(均衡推进) cov = {i["cat"]: i["current"] for i in _coverage()["items"]} focus = min(pend, key=lambda c: cov.get(c, 0)) cell = await grid_take_next(focus) if not cell: continue tc = cell["typecode"] page = cell["next_page"] bbox = (cell["min_lng"], cell["min_lat"], cell["max_lng"], cell["max_lat"]) step += 1 try: rows, raw = await asyncio.to_thread( search_polygon, tc, bbox, page, _OFFSET) except Exception as e: rows, raw = [], -1 err = str(e)[:140] else: err = None api_calls += 1 res = await ingest_rows(rows, focus, focus) if rows else \ {"fetched": 0, "approved": 0, "pending": 0, "skipped": 0} if err: res["error"] = err area = (f"经[{bbox[0]:.3f},{bbox[2]:.3f}] " f"纬[{bbox[1]:.3f},{bbox[3]:.3f}]") # 状态机:扫尽 / 翻页 / 自适应细分(密集区 8 页仍满→四叉细分,防 POI 丢) if err: # 单格请求出错:不前进、不标尽,留待后续重试(不浪费已得) nstatus, npage, note = "pending", page, f"请求失败重试 {err}" elif raw <= 0 or raw < _OFFSET: nstatus, npage, note = "exhausted", page, f"网格扫尽(本页{raw}条)" elif page >= _MAX_PAGE_SPLIT and cell["depth"] < _MAX_DEPTH: await grid_subdivide(cell["id"], focus, tc, _quads(cell), cell["depth"] + 1) nstatus, npage, note = None, page, "稠密→四叉细分深扫" elif page >= _MAX_PAGE_SPLIT: nstatus, npage, note = "exhausted", page, "达最深层,止于该格" else: nstatus, npage, note = "pending", page + 1, "本页满,续下一页" if nstatus is not None: await grid_update(cell["id"], npage, res.get("approved", 0) + res.get("pending", 0), nstatus) reason = f"全城网格扫描 {area} 第{page}页({focus}/{tc})" await sa_add_task(run_id, step, focus, "ingest", reason, "gaode_grid", result=res, status="done", note=f"格#{cell['id']} d{cell['depth']} p{page}·{note}") await sa_append_step(run_id, { "step": step, "plan": {"poi_type": focus, "keyword": f"网格 {area} p{page}", "reason": reason}, "result": res, }, ingested_delta=res.get("approved", 0)) # 该类全城扫尽却仍偏少 → 求助管理员(去重,馆长不停) if focus not in escalated: gc = await grid_counts() g = gc.get(focus, {}) if g.get("total") and g["done"] >= g["total"]: maxc = max(cov.values()) if cov else 0 if cov.get(focus, 0) < max(0.25 * maxc, 30): await _escalate(run_id, step, focus, cov.get(focus, 0)) escalated.add(focus) await asyncio.sleep(_API_PACING) except Exception as e: # noqa: BLE001 await sa_finish(run_id, "error", str(e)[:240]) def schedule_super_agent(run_id: int) -> None: t = asyncio.create_task(run_super_agent(run_id)) _BG.add(t) t.add_done_callback(_BG.discard)