"""Xiaohongshu (XHS) sub-agent (P3 v1): sentiment/experience data collection → ExperienceTag.

Adapted from the user's test.py DrissionPage response-listening approach into an
autonomous version:

* The search keyword is decided by the Super Agent from KG needs (no ``input()``).
* After searching, **an LLM reads the dynamic tab bar (综合/必吃榜/本地推荐/特色小吃…)
  and picks a tab according to the business need** (default 综合) instead of a human.
* Login uses a **persistent Chrome profile**: one manual login → cookies persist →
  later runs reuse them headless. A missing/expired login does **not block**: the
  caller is told ``need_login`` so it can escalate (open a ticket and notify an
  admin to log in again), consistent with the existing escalation mechanism.
* Collected post/comment text is distilled by an LLM (opus) into **ExperienceTag**s
  attached to the Place.

Responsibility boundary: only compliant, low-frequency collection that does not
crack the login; login/captcha is a one-time manual takeover (persisted and
reused, not credential theft). Get the data flowing first; refine its shape later.
"""
from __future__ import annotations

import asyncio
import hashlib
import json
import os
import random
import time
from urllib.parse import quote

from app.config import settings
from app.db import get_agent_settings, sa_save_evidence
from app.llm_client import LlmClient


def _as_dict(value) -> dict:
    """Return ``value`` if it is a dict, else a fresh empty dict.

    XHS payload shapes drift: a field that is normally an object can come back
    as ``None``, a list or a string. Normalizing here keeps one malformed payload
    from raising ``AttributeError`` and aborting the whole enrichment.
    """
    return value if isinstance(value, dict) else {}


def _parse_api_notes(raw_api: list, name: str, district: str, keyword: str) -> list[dict]:
    """Parse captured XHS search/feed API JSON into fully structured note evidence.

    Records deliberately keep PII (author name/id/avatar). Field lookups try
    several spellings because the XHS schema changes often (same multi-path
    fallback as test.py).

    Args:
        raw_api: JSON bodies captured from the listened API responses. Entries
            that are not dicts, or have no ``data.items[].note_card``, are ignored.
        name: Place name, stored as ``entity_name``.
        district: Unused here; kept for call-site compatibility.
        keyword: Search keyword stored on every record.

    Returns:
        One ``kind="note"`` record per distinct note id, in capture order.
    """
    out: list[dict] = []
    seen: set = set()
    for j in raw_api or []:
        if not isinstance(j, dict):
            continue
        items = _as_dict(j.get("data")).get("items") or []
        for it in items:
            if not isinstance(it, dict):
                continue
            nc = _as_dict(it.get("note_card") or it.get("noteCard"))
            if not nc:
                continue
            sid = it.get("id") or nc.get("note_id") or nc.get("id") or ""
            if not sid or sid in seen:
                continue
            seen.add(sid)
            u = _as_dict(nc.get("user") or nc.get("User"))
            ii = _as_dict(nc.get("interact_info") or nc.get("interactInfo"))
            cover = nc.get("cover") or {}
            imgs = []
            for im in nc.get("image_list") or nc.get("images") or []:
                if isinstance(im, dict):
                    iu = im.get("url_default") or im.get("url") or im.get("url_pre") or ""
                    if iu:
                        imgs.append(iu)
                elif isinstance(im, str):
                    imgs.append(im)
            cu = (cover.get("url_default") or cover.get("url") or "") \
                if isinstance(cover, dict) else ""
            # The cover goes first so the list leads with the card image.
            if cu and cu not in imgs:
                imgs.insert(0, cu)
            loc = nc.get("location")
            out.append({
                "platform": "xhs", "kind": "note", "source_id": str(sid),
                "url": f"https://www.xiaohongshu.com/explore/{sid}",
                "entity_name": name, "keyword": keyword,
                "title": nc.get("display_title") or nc.get("title") or "",
                "content": nc.get("desc") or nc.get("content") or "",
                "author": u.get("nick_name") or u.get("nickname") or u.get("name") or "",
                "author_id": u.get("user_id") or u.get("userId") or u.get("id") or "",
                "author_avatar": u.get("avatar") or u.get("image") or "",
                "likes": ii.get("liked_count") or ii.get("likes") or 0,
                "comments": ii.get("comment_count") or 0,
                "collects": ii.get("collected_count") or 0,
                "shares": ii.get("shared_count") or ii.get("share_count") or 0,
                "publish_time": nc.get("time") or nc.get("create_time") or "",
                "location": loc.get("name", "") if isinstance(loc, dict) else "",
                "tags": [t.get("name", "") for t in (nc.get("tag_list") or [])
                         if isinstance(t, dict)],
                "image_urls": imgs,
                "raw": it,
            })
    return out


def _parse_api_comments(raw_api: list, name: str, keyword: str) -> list[dict]:
    """Parse captured XHS comment API JSON into comment evidence (with time and PII).

    The comment time is the key anchor for phase-2 event extraction. Field
    lookups use multi-path fallbacks because the schema drifts.

    Args:
        raw_api: Captured JSON bodies; only dicts with a non-empty
            ``data.comments`` list contribute.
        name: Place name, stored as ``entity_name``.
        keyword: Search keyword stored on every record.

    Returns:
        One ``kind="comment"`` record per distinct comment id, in capture order.
    """
    out: list[dict] = []
    seen: set = set()
    for j in raw_api or []:
        if not isinstance(j, dict):
            continue
        data = _as_dict(j.get("data"))
        comments = data.get("comments")
        if not comments or not isinstance(comments, list):
            continue
        note_id = data.get("note_id") or ""
        for cm in comments:
            if not isinstance(cm, dict):
                continue
            cid = cm.get("comment_id") or cm.get("id") or ""
            if not cid or cid in seen:
                continue
            seen.add(cid)
            # Fall back to a per-comment note id when the page-level one is absent.
            nid = note_id or cm.get("note_id") or ""
            u = _as_dict(cm.get("user_info") or cm.get("user")
                         or cm.get("User") or cm.get("author"))
            imgs = []
            for im in cm.get("pictures") or cm.get("images") or cm.get("image_list") or []:
                if isinstance(im, dict):
                    iu = im.get("url") or im.get("url_default") or ""
                    if iu:
                        imgs.append(iu)
                elif isinstance(im, str):
                    imgs.append(im)
            reps = cm.get("sub_comments") or cm.get("replies") or cm.get("reply_list") or []
            out.append({
                "platform": "xhs", "kind": "comment", "source_id": str(cid),
                "url": f"https://www.xiaohongshu.com/explore/{nid}" if nid else "",
                "entity_name": name, "keyword": keyword,
                "title": "",
                "content": cm.get("content") or cm.get("text") or "",
                "author": u.get("nickname") or u.get("nick_name") or u.get("name") or "",
                "author_id": u.get("user_id") or u.get("userId") or u.get("id") or "",
                "author_avatar": u.get("image") or u.get("avatar") or "",
                "likes": cm.get("like_count") or cm.get("likes") or 0,
                # Number of replies embedded in this payload (not necessarily the total).
                "comments": len(reps) if isinstance(reps, list) else 0,
                "collects": 0, "shares": 0,
                "publish_time": str(cm.get("create_time") or cm.get("time") or ""),
                "location": cm.get("ip_location") or "",
                "tags": [],
                "image_urls": imgs,
                "raw": {"note_id": note_id, **cm},
            })
    return out


# Dedicated persistent user-data dir (isolated from your everyday Chrome;
# cookies persist after one manual login).
XHS_PROFILE_DIR = os.path.expanduser("~/.zn-kg/xhs-profile")
_SEARCH = "https://www.xiaohongshu.com/search_result?keyword={kw}"
# Response URL fragments whose JSON bodies are captured.
_LISTEN = ["/api/sns/web/v1/feed", "/api/sns/web/v2/feed",
           "/api/sns/web/v1/search/notes",
           "/api/sns/web/v1/comment/list", "/api/sns/web/v2/comment/page",
           "/api/sns/web/v1/comment/sub/page"]
_DEEP_NOTES = 3     # deep comment pass: open the first N result notes per run
_DEEP_SCROLL = 30   # max scroll rounds per note (stops early once comments stop growing)
_DEEP_STALL = 4     # N consecutive rounds without new comment payloads → exhausted
# Candidate scrollable containers on a note detail page.
_SCROLLERS = (".note-scroller", ".comment-container",
              ".interaction-container", ".comments-container")

_TAB_SYS = """你在小红书搜索结果页选筛选 tab。给你:业务需求 + 当前页可选 tab 列表。 按业务需求选**最相关的一个** tab 名;拿不准就选「综合」。 只输出 JSON:{"tab":"要点的tab名","reason":"一句话"}"""

_TAG_SYS = """你从小红书 UGC(帖子标题/正文/评论)提炼某地点的"体验标签"。 只保留多条内容相互印证、对该地点有信息量的短标签(如:酸汤鱼必点、排队久、 人均50、网红打卡、本地人推荐、环境一般)。不编造,噪声/广告/无关一律丢弃。 只输出 JSON:{"tags":["标签1","标签2"],"sentiment":"正面|中性|负面|混合", "summary":"一句话口碑概述"}"""


def _unlock_profile() -> None:
    """Remove stale Chromium Singleton* lock files from the persistent profile.

    This profile dir is used serially by a single local user (the orchestrator's
    xhs step runs serially, the login script is run by hand), so Singleton* files
    are almost always leftovers of a killed process. Removing them avoids false
    "profile already in use" errors. Per the original design, if a live process
    really holds the profile, the new process still spins and exits safely.
    """
    for n in ("SingletonLock", "SingletonCookie", "SingletonSocket"):
        try:
            os.unlink(os.path.join(XHS_PROFILE_DIR, n))
        except OSError:
            pass


async def _build_llm() -> LlmClient | None:
    """Build the LLM client for this agent, or None if disabled/unconfigured.

    Precedence for each setting: ``agents.xhs_agent`` → ``global`` → app settings.
    Missing or malformed config sections are treated as empty instead of raising.
    """
    try:
        cfg = await get_agent_settings()
    except Exception:  # noqa: BLE001 — settings store unavailable: fall back to app settings
        cfg = {}
    cfg = _as_dict(cfg)
    g = _as_dict(cfg.get("global"))
    a = _as_dict(_as_dict(cfg.get("agents")).get("xhs_agent"))
    if a.get("enabled") is False:
        return None
    key = a.get("api_key") or g.get("api_key") or settings.llm_api_key
    base = a.get("base_url") or g.get("base_url") or settings.llm_api_base
    if not key or not base:
        return None
    model = a.get("model") or g.get("model") or settings.llm_model or "deepseek-chat"
    try:
        timeout = int(g.get("timeout") or 120)
    except (TypeError, ValueError):
        timeout = 120
    return LlmClient(api_base=base, api_key=key, model=model, timeout=timeout)


def _count_comments(captured: list) -> int:
    """Total number of comments across all captured payloads with ``data.comments``."""
    total = 0
    for j in captured:
        if isinstance(j, dict):
            cs = _as_dict(j.get("data")).get("comments")
            if cs and isinstance(cs, list):
                total += len(cs)
    return total


def _select_tab(pg, pick_tab_cb) -> list[str]:
    """Read the search-result tab bar, ask ``pick_tab_cb`` for one, and click it.

    Best-effort: on any failure the default tab stays active. A choice of None
    or "综合" also keeps the default.

    Returns:
        The non-empty tab labels that were read (``[]`` if none).
    """
    labels: list[str] = []
    try:
        tabs = pg.query_selector_all("div.content-container button.tab")
        labels = [(t.get_attribute("aria-details") or t.inner_text() or "").strip()
                  for t in tabs]
        labels = [x for x in labels if x]
        chosen = pick_tab_cb(labels) if labels else None
        if chosen and chosen != "综合":
            for t in tabs:
                lab = (t.get_attribute("aria-details") or t.inner_text() or "").strip()
                if lab == chosen:
                    t.click()
                    pg.wait_for_timeout(3000)
                    break
    except Exception:  # noqa: BLE001 — tab choice is optional
        pass
    return labels


def _scrape_note_cards(pg, limit: int = 40) -> list[dict]:
    """Scroll the results page up to 6 times, collecting visible note cards.

    Cards are de-duplicated by title. Stops once at least ``limit`` cards were
    seen (the list may slightly exceed ``limit``).
    """
    notes: list[dict] = []
    seen: set = set()
    for _ in range(6):
        for it in pg.query_selector_all("section.note-item"):
            try:
                te = it.query_selector(".title") or it.query_selector("a.title")
                title = te.inner_text().strip() if te else ""
                if not title or title in seen:
                    continue
                seen.add(title)
                au = it.query_selector(".author .name") or it.query_selector(".name")
                lk = (it.query_selector(".like-wrapper .count")
                      or it.query_selector(".count"))
                notes.append({
                    "title": title,
                    "author": au.inner_text().strip() if au else "",
                    "likes": lk.inner_text().strip() if lk else ""})
            except Exception:  # noqa: BLE001 — a card detached mid-read; skip it
                continue
        try:
            pg.mouse.wheel(0, 2400)
        except Exception:  # noqa: BLE001
            pass
        pg.wait_for_timeout(int(random.uniform(1200, 2000)))
        if len(notes) >= limit:
            break
    return notes


def _drain_comments(pg, captured: list) -> None:
    """Scroll the open note page until its comments appear exhausted.

    Each round does a real mouse wheel over every visible comment scroller, a
    container ``scrollTo(bottom)``, and an ``End`` key press, then waits.
    It stops when the "THE END" marker appears, after ``_DEEP_STALL``
    consecutive rounds with no new comments in ``captured``, or after
    ``_DEEP_SCROLL`` rounds.
    """
    stall = 0
    prev = _count_comments(captured)
    for _ in range(_DEEP_SCROLL):
        scrollers = [el for el in (pg.query_selector(s) for s in _SCROLLERS) if el]
        try:
            for sc in scrollers or [None]:
                if sc:
                    box = sc.bounding_box()
                    if box:
                        pg.mouse.move(box["x"] + box["width"] / 2,
                                      box["y"] + box["height"] / 2)
                        pg.mouse.wheel(0, 4000)
                    pg.evaluate("(e)=>e&&e.scrollTo(0,e.scrollHeight)", sc)
                else:
                    pg.mouse.wheel(0, 4000)
            pg.keyboard.press("End")
        except Exception:  # noqa: BLE001 — scrolling is best-effort
            pass
        pg.wait_for_timeout(int(random.uniform(1800, 2600)))
        if pg.query_selector(".end-container") or pg.query_selector(".comment-end-container"):
            break  # "THE END" marker
        cur = _count_comments(captured)
        if cur > prev:
            prev, stall = cur, 0
        else:
            stall += 1
            if stall >= _DEEP_STALL:
                break  # no new comment payloads → exhausted
    pg.wait_for_timeout(3000)  # let the last comment payload arrive


def _deep_collect_comments(pg, captured: list) -> None:
    """Open the first ``_DEEP_NOTES`` result notes one by one to load their comments.

    Clicking a cover in the current XHS web app navigates the page (not an
    overlay), so each note is opened in the *same* tab. Afterwards the page is
    taken back to the results, via ``go_back`` or, if that fails, by reloading
    the search URL. Comment payloads reach ``captured`` through the
    context-level response listener. A failure on one note is swallowed after
    returning to the search page, so later notes still get a chance.
    """
    search_url = pg.url
    for idx in range(_DEEP_NOTES):
        try:
            covers = pg.query_selector_all("section.note-item a.cover")
            if idx >= len(covers):
                break
            # Click inside the SPA (per the original design, the browser then
            # signs the comment requests itself) and wait for the navigation.
            try:
                with pg.expect_navigation(timeout=15000):
                    covers[idx].click()
            except Exception:  # noqa: BLE001
                covers[idx].click()
            pg.wait_for_load_state("domcontentloaded", timeout=20000)
            pg.wait_for_timeout(3200)  # note detail + first screen of comments
            if not pg.query_selector(".no-comments"):
                _drain_comments(pg, captured)
            # Back to the search results, ready for the next note.
            try:
                with pg.expect_navigation(timeout=15000):
                    pg.go_back()
            except Exception:  # noqa: BLE001
                pg.goto(search_url, wait_until="domcontentloaded", timeout=30000)
            pg.wait_for_load_state("domcontentloaded", timeout=20000)
            pg.wait_for_timeout(int(random.uniform(1500, 2400)))
        except Exception:  # noqa: BLE001 — stuck: go back to search so the loop continues
            try:
                pg.goto(search_url, wait_until="domcontentloaded", timeout=30000)
                pg.wait_for_timeout(1500)
            except Exception:  # noqa: BLE001
                pass


def _collect(keyword: str, pick_tab_cb, deep: bool = False) -> dict:
    """Run one headless Playwright session on the persistent XHS profile.

    Reuses the stealth setup from ``app.agents.web_agent`` (init script, UA,
    Chrome args). It also reuses the cookies persisted in ``XHS_PROFILE_DIR``
    by a one-time manual login (scripts/xhs_login.py).

    Args:
        keyword: Search keyword.
        pick_tab_cb: ``(tab_labels: list[str]) -> str | None``, injected by the
            caller (LLM decision). None or "综合" keeps the default tab.
        deep: Also open the first ``_DEEP_NOTES`` notes and scroll their comments.

    Returns:
        ``{"logged_in": True, "notes": [{title, author, likes}] (at most 40),
        "raw_api": [...], "tabs": [...]}`` on success.
        ``{"logged_in": False, "notes": [], "raw_api": []}`` when a login wall is
        detected.
        ``{"error": "<message, at most 140 chars>"}`` on any other failure,
        including import/setup errors.

        The browser context is closed (best-effort) on every path once it was
        launched.
    """
    captured: list = []
    try:
        from playwright.sync_api import sync_playwright
        from app.agents.web_agent import _STEALTH_JS, _UA, _CHROME_ARGS

        os.makedirs(XHS_PROFILE_DIR, exist_ok=True)
        _unlock_profile()
        with sync_playwright() as p:
            ctx = p.chromium.launch_persistent_context(
                user_data_dir=XHS_PROFILE_DIR, headless=True,
                args=_CHROME_ARGS, ignore_default_args=["--enable-automation"],
                user_agent=_UA, locale="zh-CN",
                viewport={"width": 1440, "height": 900})
            try:
                ctx.add_init_script(_STEALTH_JS)

                def _on_resp(resp):
                    if any(k in resp.url for k in _LISTEN):
                        try:
                            captured.append(resp.json())
                        except Exception:  # noqa: BLE001 — body not JSON / unavailable
                            pass

                ctx.on("response", _on_resp)
                pg = ctx.pages[0] if ctx.pages else ctx.new_page()
                pg.goto(_SEARCH.format(kw=quote(keyword)),
                        wait_until="domcontentloaded", timeout=45000)
                pg.wait_for_timeout(4000)
                note_eles = pg.query_selector_all("section.note-item")
                html = pg.content()
                if not note_eles and ("/login" in pg.url or "扫码登录" in html
                                      or "手机号登录" in html):
                    return {"logged_in": False, "notes": [], "raw_api": []}
                labels = _select_tab(pg, pick_tab_cb)
                notes = _scrape_note_cards(pg)
                if deep:
                    _deep_collect_comments(pg, captured)
            finally:
                # Close gracefully so the profile (cookies) is flushed and its
                # lock released, even when the session failed midway.
                try:
                    ctx.close()
                except Exception:  # noqa: BLE001
                    pass
        # A logged-in search page always yields a tab bar or search API payloads;
        # all three empty means it was really a login wall / block.
        if not notes and not labels and not captured:
            return {"logged_in": False, "notes": [], "raw_api": []}
        return {"logged_in": True, "notes": notes[:40], "raw_api": captured,
                "tabs": labels}
    except Exception as e:  # noqa: BLE001
        return {"error": str(e)[:140]}


async def xhs_enrich(entity: dict) -> dict:
    """Collect XHS UGC for one Place and distill experience tags.

    Args:
        entity: Place dict. Reads ``name``, ``district``, ``biz_need`` and
            ``eid``/``natural_key``; the last is stored on every evidence record
            as ``place_natural_key``.

    Returns:
        A subset of ``{ok, found, need_login, tags, sentiment, summary,
        evidence_saved, note_count, comment_count}`` depending on the path.
        ``ok=False`` means the agent is unconfigured/disabled.
        ``need_login=True`` means a one-time manual login is required (the
        caller escalates).
        ``note_count`` always counts posts only; comments go in
        ``comment_count``.
    """
    llm = await _build_llm()
    if llm is None:
        return {"ok": False, "summary": "xhs_agent 未配置或停用"}
    name = str(entity.get("name") or "")
    if not name.strip():
        # Without a name the search would be just "贵阳", i.e. generic city UGC.
        return {"ok": True, "found": False, "summary": "实体缺少名称,跳过小红书采集"}
    district = entity.get("district", "")
    biz = entity.get("biz_need") or f"补全「{name}」的真实口碑/体验/网红热度"
    keyword = f"贵阳 {name}".strip()

    def _pick(labels: list[str]) -> str | None:
        # Runs inside the collector's worker thread, so the sync LLM call is fine.
        try:
            r = llm.chat_json(_TAB_SYS, json.dumps(
                {"业务需求": biz, "可选tab": labels}, ensure_ascii=False))
            return (r or {}).get("tab")
        except Exception:  # noqa: BLE001 — fall back to the default tab
            return None

    res = await asyncio.to_thread(_collect, keyword, _pick, True)
    if res.get("error"):
        return {"ok": True, "found": False, "summary": f"采集异常:{res['error']}"}
    if res.get("logged_in") is False:
        return {"ok": True, "found": False, "need_login": True,
                "summary": "小红书未登录,需一次性人工登录(已升级)"}
    dom_notes = res.get("notes") or []
    pnk = entity.get("eid") or entity.get("natural_key")
    # 1) Prefer the captured official API payloads → fully structured evidence
    #    (PII is kept, per the owner's decision).
    records = _parse_api_notes(res.get("raw_api") or [], name, district, keyword)
    # 2) No API payload captured: fall back to DOM cards as light evidence
    #    (a title hash gives a stable source_id).
    if not records and dom_notes:
        for n in dom_notes:
            t = n.get("title") or ""
            if not t:
                continue
            sid = "domhash:" + hashlib.md5((name + "|" + t).encode()).hexdigest()[:16]
            records.append({
                "platform": "xhs", "kind": "note_lite", "source_id": sid,
                "url": "", "entity_name": name, "keyword": keyword,
                "title": t, "content": "", "author": n.get("author", ""),
                "author_id": "", "author_avatar": "", "likes": 0,
                "comments": 0, "collects": 0, "shares": 0,
                "publish_time": "", "location": "", "tags": [],
                "image_urls": [], "raw": n})
    # 3) Deep-scrolled comments → comment evidence (with time: event-extraction anchor).
    comment_records = _parse_api_comments(res.get("raw_api") or [], name, keyword)
    records += comment_records
    if not records:
        return {"ok": True, "found": False,
                "summary": f"「{name}」小红书无相关 UGC,跳过"}
    for rec in records:
        rec["place_natural_key"] = pnk
    try:
        saved = await sa_save_evidence(records)  # persist raw evidence
    except Exception:  # noqa: BLE001 — best-effort; reported as 0 saved
        saved = 0
    n_note = sum(1 for x in records if x["kind"] != "comment")
    n_cmt = len(comment_records)
    # 4) Experience tags come from post content only (comments are left for event
    #    extraction), so every tag stays traceable to its posts.
    corpus = json.dumps(
        {"地点": name, "区县": district,
         "帖子": [{"标题": x["title"], "赞": x.get("likes", 0)}
                for x in records if x["kind"] != "comment"][:40]},
        ensure_ascii=False)
    try:
        r = await asyncio.to_thread(llm.chat_json, _TAG_SYS, corpus)
    except Exception as e:  # noqa: BLE001
        return {"ok": True, "found": True, "evidence_saved": saved, "tags": [],
                "note_count": n_note, "comment_count": n_cmt,
                "summary": f"证据入库{saved}条,标签提炼失败:{str(e)[:50]}"}
    if not isinstance(r, dict):
        r = {}
    raw_tags = r.get("tags")
    if not isinstance(raw_tags, (list, tuple)):
        raw_tags = []
    tags = [str(t).strip()[:24] for t in raw_tags if str(t).strip()]
    return {"ok": True, "found": True, "tags": tags[:12],
            "sentiment": r.get("sentiment", ""),
            "evidence_saved": saved, "note_count": n_note,
            "comment_count": n_cmt,
            "summary": f"小红书 帖{n_note}+评论{n_cmt} 入证据层(存{saved}) → "
                       f"{len(tags)} 体验标签·口碑{r.get('sentiment','?')}"}