Files
bxh/app/agents/xhs_agent.py

485 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""小红书子 AgentP3 v1—— 舆情/体验数据采集 → ExperienceTag。
参考用户 test.py 的 DrissionPage 监听方案,改造为自治版:
• 关键词由 Super Agent 按 KG 需求决定(不再 input()
• 搜索后 **AI 读动态 tab 栏(综合/必吃榜/本地推荐/特色小吃…)按业务需求选 tab**
(默认综合),不再人工选;
• 登录用 **持久化 Chrome Profile**一次人工登录→cookie 持久→之后无头复用;
未登录/失效 **不阻塞**,升级开工单+通知管理员重登(与现有升级机制一致);
• 采集到的帖子/评论文本 → opus 提炼成 **体验标签 ExperienceTag** 挂到 Place。
责任边界:仅做合规、低频、不破解登录的采集;登录/验证码走人工一次性手动接管
(持久化复用,非凭证窃取)。数据先跑通,形态后续细化。
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import os
import random
import time
from urllib.parse import quote
from app.config import settings
from app.db import get_agent_settings, sa_save_evidence
from app.llm_client import LlmClient
def _parse_api_notes(raw_api: list, name: str, district: str,
keyword: str) -> list[dict]:
"""解析已捕获的小红书搜索 API JSON → 全结构化证据记录(含 PII
字段容错(小红书结构常变,沿用 test.py 的多路径兜底)。
"""
out: list[dict] = []
seen: set = set()
for j in raw_api or []:
if not isinstance(j, dict):
continue
items = (j.get("data") or {}).get("items") or []
for it in items:
nc = it.get("note_card") or it.get("noteCard") or {}
if not nc:
continue
sid = (it.get("id") or nc.get("note_id")
or nc.get("id") or "")
if not sid or sid in seen:
continue
seen.add(sid)
u = nc.get("user") or nc.get("User") or {}
ii = nc.get("interact_info") or nc.get("interactInfo") or {}
cover = nc.get("cover") or {}
imgs = []
for im in (nc.get("image_list") or nc.get("images") or []):
if isinstance(im, dict):
iu = (im.get("url_default") or im.get("url")
or im.get("url_pre") or "")
if iu:
imgs.append(iu)
elif isinstance(im, str):
imgs.append(im)
cu = (cover.get("url_default") or cover.get("url") or "") \
if isinstance(cover, dict) else ""
if cu and cu not in imgs:
imgs.insert(0, cu)
out.append({
"platform": "xhs", "kind": "note", "source_id": str(sid),
"url": f"https://www.xiaohongshu.com/explore/{sid}",
"entity_name": name, "keyword": keyword,
"title": nc.get("display_title") or nc.get("title") or "",
"content": nc.get("desc") or nc.get("content") or "",
"author": u.get("nick_name") or u.get("nickname")
or u.get("name") or "",
"author_id": u.get("user_id") or u.get("userId")
or u.get("id") or "",
"author_avatar": u.get("avatar") or u.get("image") or "",
"likes": ii.get("liked_count") or ii.get("likes") or 0,
"comments": ii.get("comment_count") or 0,
"collects": ii.get("collected_count") or 0,
"shares": ii.get("shared_count") or ii.get("share_count") or 0,
"publish_time": nc.get("time") or nc.get("create_time") or "",
"location": (nc.get("location") or {}).get("name", "")
if isinstance(nc.get("location"), dict) else "",
"tags": [t.get("name", "") for t in (nc.get("tag_list") or [])
if isinstance(t, dict)],
"image_urls": imgs,
"raw": it,
})
return out
def _parse_api_comments(raw_api: list, name: str,
keyword: str) -> list[dict]:
"""解析已捕获的小红书评论 API JSON → 评论证据(含时间/PII)。
评论的"时间"是二期事件抽取的关键锚点。字段多路径容错。
"""
out: list[dict] = []
seen: set = set()
for j in raw_api or []:
if not isinstance(j, dict):
continue
data = j.get("data") or {}
comments = data.get("comments")
if not comments:
continue
note_id = data.get("note_id") or ""
for cm in comments:
cid = cm.get("comment_id") or cm.get("id") or ""
if not cid or cid in seen:
continue
seen.add(cid)
u = (cm.get("user_info") or cm.get("user")
or cm.get("User") or cm.get("author") or {})
imgs = []
for im in (cm.get("pictures") or cm.get("images")
or cm.get("image_list") or []):
if isinstance(im, dict):
iu = im.get("url") or im.get("url_default") or ""
if iu:
imgs.append(iu)
elif isinstance(im, str):
imgs.append(im)
reps = (cm.get("sub_comments") or cm.get("replies")
or cm.get("reply_list") or [])
out.append({
"platform": "xhs", "kind": "comment", "source_id": str(cid),
"url": f"https://www.xiaohongshu.com/explore/{note_id}"
if note_id else "",
"entity_name": name, "keyword": keyword,
"title": "", "content": cm.get("content")
or cm.get("text") or "",
"author": u.get("nickname") or u.get("nick_name")
or u.get("name") or "",
"author_id": u.get("user_id") or u.get("userId")
or u.get("id") or "",
"author_avatar": u.get("image") or u.get("avatar") or "",
"likes": cm.get("like_count") or cm.get("likes") or 0,
"comments": len(reps) if isinstance(reps, list) else 0,
"collects": 0, "shares": 0,
"publish_time": str(cm.get("create_time")
or cm.get("time") or ""),
"location": cm.get("ip_location") or "",
"tags": [], "image_urls": imgs,
"raw": {"note_id": note_id, **cm},
})
return out
# Dedicated persistent browser user-data directory (kept separate from the
# everyday Chrome profile; cookies stay on disk after one manual login).
XHS_PROFILE_DIR = os.path.expanduser("~/.zn-kg/xhs-profile")
# Search-results URL template; {kw} receives the URL-quoted keyword.
_SEARCH = "https://www.xiaohongshu.com/search_result?keyword={kw}"
# URL substrings identifying the XHS web API responses (feed, search notes,
# comment list/page/sub-page) whose JSON bodies the response listener keeps.
_LISTEN = ["/api/sns/web/v1/feed", "/api/sns/web/v2/feed",
"/api/sns/web/v1/search/notes", "/api/sns/web/v1/comment/list",
"/api/sns/web/v2/comment/page", "/api/sns/web/v1/comment/sub/page"]
_DEEP_NOTES = 3  # deep comment crawl: open the first N notes and scroll for comments
_DEEP_SCROLL = 30  # max scroll rounds per note (stops early once comment data stops growing)
_DEEP_STALL = 4  # N consecutive scroll rounds without new comments => note treated as exhausted
# System prompt for the tab-selection LLM call: pick the single most relevant
# search tab for the business need and answer as JSON {"tab", "reason"}.
# NOTE(review): the prompt texts below look like they lost some punctuation
# (e.g. unbalanced brackets) in transit - confirm against the canonical file.
_TAB_SYS = """你在小红书搜索结果页选筛选 tab。给你业务需求 + 当前页可选 tab 列表。
按业务需求选**最相关的一个** tab 名;拿不准就选「综合」。
只输出 JSON{"tab":"要点的tab名","reason":"一句话"}"""
# System prompt for the tag-extraction LLM call: distil short "experience
# tags" for a place from UGC and answer as JSON {"tags", "sentiment", "summary"}.
_TAG_SYS = """你从小红书 UGC帖子标题/正文/评论)提炼某地点的"体验标签"
只保留多条内容相互印证、对该地点有信息量的短标签(如:酸汤鱼必点、排队久、
人均50、网红打卡、本地人推荐、环境一般。不编造噪声/广告/无关一律丢弃。
只输出 JSON{"tags":["标签1","标签2"],"sentiment":"正面|中性|负面|混合",
"summary":"一句话口碑概述"}"""
def _unlock_profile() -> None:
"""清理被 kill 的 Chromium 残留的 Singleton 死锁。
本机单用户串行使用该持久化目录(编排器 xhs 步串行 + 登录脚本手动),
Singleton* 几乎总是上次进程被 kill 的残留;清掉可避免
"profile already in use" 误报。若真有活进程占用,新进程仍会自旋安全退出。
"""
for n in ("SingletonLock", "SingletonCookie", "SingletonSocket"):
try:
os.unlink(os.path.join(XHS_PROFILE_DIR, n))
except OSError:
pass
async def _build_llm() -> LlmClient | None:
    """Build the LLM client for this agent, or return ``None`` if unavailable.

    Each value is resolved in this order: the ``agents.xhs_agent`` section of
    the stored agent settings, then the ``global`` section, then the static
    ``settings`` object. A missing, null or malformed section is treated as
    empty, so a partial configuration falls back instead of raising.

    Returns:
        A configured ``LlmClient``, or ``None`` when the agent is explicitly
        disabled (``enabled`` is ``False``) or no API key / base URL can be
        resolved.
    """
    try:
        cfg = await get_agent_settings()
    except Exception:  # noqa: BLE001 - stored settings are optional here
        cfg = None
    if not isinstance(cfg, dict):
        cfg = {}

    g = cfg.get("global")
    if not isinstance(g, dict):
        g = {}
    agents = cfg.get("agents")
    a = agents.get("xhs_agent") if isinstance(agents, dict) else None
    if not isinstance(a, dict):
        a = {}

    # Only an explicit False disables the agent; a missing flag means enabled.
    if a.get("enabled") is False:
        return None

    key = a.get("api_key") or g.get("api_key") or settings.llm_api_key
    base = a.get("base_url") or g.get("base_url") or settings.llm_api_base
    if not key or not base:
        return None
    model = (a.get("model") or g.get("model") or settings.llm_model
             or "deepseek-chat")
    try:
        timeout = int(g.get("timeout") or 120)
    except (TypeError, ValueError):
        # A non-numeric timeout in the stored settings falls back to the default.
        timeout = 120
    return LlmClient(api_base=base, api_key=key, model=model, timeout=timeout)
def _collect_pick_tab(pg, pick_tab_cb) -> list:
    """Read the search tab bar, let ``pick_tab_cb`` choose a tab and click it.

    Best-effort: any failure leaves the page on its current tab. Returns the
    non-empty tab labels that were read (possibly an empty list).
    """
    labels: list = []
    try:
        tabs = pg.query_selector_all("div.content-container button.tab")
        labels = [(t.get_attribute("aria-details")
                   or t.inner_text() or "").strip() for t in tabs]
        labels = [x for x in labels if x]
        chosen = pick_tab_cb(labels) if labels else None
        # "综合" is the default tab, so it never needs a click.
        if chosen and chosen != "综合":
            for t in tabs:
                lab = (t.get_attribute("aria-details")
                       or t.inner_text() or "").strip()
                if lab == chosen:
                    t.click()
                    pg.wait_for_timeout(3000)
                    break
    except Exception:  # noqa: BLE001 - tab choice is optional
        pass
    return labels


def _collect_note_cards(pg) -> list:
    """Scrape title/author/likes from the result cards over up to 6 scroll rounds.

    Cards are de-duplicated by title; scrolling stops early once 40 cards
    have been gathered.
    """
    notes: list = []
    seen: set = set()
    for _ in range(6):
        for card in pg.query_selector_all("section.note-item"):
            try:
                title_el = (card.query_selector(".title")
                            or card.query_selector("a.title"))
                title = title_el.inner_text().strip() if title_el else ""
                if not title or title in seen:
                    continue
                seen.add(title)
                author_el = (card.query_selector(".author .name")
                             or card.query_selector(".name"))
                likes_el = (card.query_selector(".like-wrapper .count")
                            or card.query_selector(".count"))
                notes.append({
                    "title": title,
                    "author": author_el.inner_text().strip() if author_el else "",
                    "likes": likes_el.inner_text().strip() if likes_el else ""})
            except Exception:  # noqa: BLE001 - skip a card that fails to read
                continue
        try:
            pg.mouse.wheel(0, 2400)
        except Exception:  # noqa: BLE001
            pass
        # Randomised pause between scroll rounds.
        pg.wait_for_timeout(int(random.uniform(1200, 2000)))
        if len(notes) >= 40:
            break
    return notes


def _collect_comment_total(captured: list) -> int:
    """Return the total number of comments across all captured API payloads."""
    total = 0
    for payload in captured:
        if isinstance(payload, dict):
            comments = (payload.get("data") or {}).get("comments")
            if comments:
                total += len(comments)
    return total


def _collect_scroll_comments(pg, captured: list, scroller_selectors) -> None:
    """Scroll the open note until the captured comment count stops growing.

    Each round scrolls every matching container (mouse wheel plus
    ``scrollTo``) and presses End, then stops when an end-of-comments marker
    is present or after ``_DEEP_STALL`` consecutive rounds without new
    comments, capped at ``_DEEP_SCROLL`` rounds.
    """
    stall = 0
    prev = _collect_comment_total(captured)
    for _ in range(_DEEP_SCROLL):
        scrollers = [x for x in
                     (pg.query_selector(s) for s in scroller_selectors) if x]
        try:
            # With no known container, fall back to wheeling the page itself.
            for sc in (scrollers or [None]):
                if sc:
                    box = sc.bounding_box()
                    if box:
                        pg.mouse.move(box["x"] + box["width"] / 2,
                                      box["y"] + box["height"] / 2)
                        pg.mouse.wheel(0, 4000)
                    pg.evaluate("(e)=>e&&e.scrollTo(0,e.scrollHeight)", sc)
                else:
                    pg.mouse.wheel(0, 4000)
            pg.keyboard.press("End")
        except Exception:  # noqa: BLE001 - scrolling is best-effort
            pass
        pg.wait_for_timeout(int(random.uniform(1800, 2600)))
        if (pg.query_selector(".end-container")
                or pg.query_selector(".comment-end-container")):
            break  # end-of-comments marker is present
        cur = _collect_comment_total(captured)
        if cur > prev:
            prev, stall = cur, 0
        else:
            stall += 1
            if stall >= _DEEP_STALL:
                break  # no new comments for several rounds => exhausted


def _collect_deep_comments(pg, captured: list) -> None:
    """Open the first ``_DEEP_NOTES`` result notes and scroll their comments.

    Each note is opened by clicking its cover in the same page; the
    context-level response listener records the comment API payloads into
    ``captured``. Afterwards the page goes back to the search results. If
    anything fails for one note, the page is sent back to the search URL and
    the next note is tried.
    """
    scroller_selectors = (".note-scroller", ".comment-container",
                          ".interaction-container", ".comments-container")
    search_url = pg.url
    for idx in range(_DEEP_NOTES):
        try:
            covers = pg.query_selector_all("section.note-item a.cover")
            if idx >= len(covers):
                break
            # Click inside the page so the browser itself issues the comment
            # requests, then wait for the navigation to finish.
            try:
                with pg.expect_navigation(timeout=15000):
                    covers[idx].click()
            except Exception:  # noqa: BLE001
                # NOTE(review): this also re-clicks when the first click worked
                # but no navigation was observed - confirm that is intended.
                covers[idx].click()
            pg.wait_for_load_state("domcontentloaded", timeout=20000)
            pg.wait_for_timeout(3200)  # note detail + first screen of comments
            if not pg.query_selector(".no-comments"):
                _collect_scroll_comments(pg, captured, scroller_selectors)
                pg.wait_for_timeout(3000)  # let trailing comment responses land
            # Return to the search results for the next note.
            try:
                with pg.expect_navigation(timeout=15000):
                    pg.go_back()
            except Exception:  # noqa: BLE001
                pg.goto(search_url, wait_until="domcontentloaded",
                        timeout=30000)
            pg.wait_for_load_state("domcontentloaded", timeout=20000)
            pg.wait_for_timeout(int(random.uniform(1500, 2400)))
        except Exception:  # noqa: BLE001
            # Stuck on this note: go back to the search page so the remaining
            # notes can still be processed.
            try:
                pg.goto(search_url, wait_until="domcontentloaded",
                        timeout=30000)
                pg.wait_for_timeout(1500)
            except Exception:  # noqa: BLE001
                pass
            continue


def _collect(keyword: str, pick_tab_cb, deep: bool = False) -> dict:
    """Search XHS for ``keyword`` in a headless persistent browser context.

    Reuses the profile in ``XHS_PROFILE_DIR`` (cookies from an earlier manual
    login), records the JSON bodies of the API responses listed in
    ``_LISTEN``, optionally switches search tab, scrapes the result cards
    and, when ``deep`` is true, opens the first notes to collect comments.
    The browser context is always closed, including on errors.

    Args:
        keyword: Search keyword; URL-quoted into the search URL.
        pick_tab_cb: ``pick_tab_cb(labels: list[str]) -> str | None``,
            supplied by the caller to choose a tab.
        deep: Also open result notes and scroll their comments.

    Returns:
        ``{"logged_in": True, "notes": [...], "raw_api": [...], "tabs": [...]}``
        on success (at most 40 notes, each ``{title, author, likes}``);
        ``{"logged_in": False, "notes": [], "raw_api": [], "tabs": []}`` when
        a login wall is detected or nothing at all could be collected;
        ``{"error": "<message, at most 140 chars>"}`` on any exception.
    """
    from app.agents.web_agent import _STEALTH_JS, _UA, _CHROME_ARGS
    os.makedirs(XHS_PROFILE_DIR, exist_ok=True)
    _unlock_profile()
    captured: list = []
    not_logged_in = {"logged_in": False, "notes": [], "raw_api": [],
                     "tabs": []}

    def _on_resp(resp):
        # Keep only the JSON bodies of the API endpoints of interest.
        if any(k in resp.url for k in _LISTEN):
            try:
                captured.append(resp.json())
            except Exception:  # noqa: BLE001 - body missing or not JSON
                pass

    try:
        from playwright.sync_api import sync_playwright
        with sync_playwright() as p:
            ctx = p.chromium.launch_persistent_context(
                user_data_dir=XHS_PROFILE_DIR, headless=True,
                args=_CHROME_ARGS, ignore_default_args=["--enable-automation"],
                user_agent=_UA, locale="zh-CN",
                viewport={"width": 1440, "height": 900})
            try:
                ctx.add_init_script(_STEALTH_JS)
                ctx.on("response", _on_resp)
                pg = ctx.pages[0] if ctx.pages else ctx.new_page()
                pg.goto(_SEARCH.format(kw=quote(keyword)),
                        wait_until="domcontentloaded", timeout=45000)
                pg.wait_for_timeout(4000)
                note_eles = pg.query_selector_all("section.note-item")
                html = pg.content()
                # No result cards plus a login URL or login prompt => login wall.
                if not note_eles and ("/login" in pg.url or "扫码登录" in html
                                      or "手机号登录" in html):
                    return not_logged_in
                labels = _collect_pick_tab(pg, pick_tab_cb)
                notes = _collect_note_cards(pg)
                if deep:
                    _collect_deep_comments(pg, captured)
            finally:
                # Always close the persistent context, also on errors, rather
                # than leaving the browser to be torn down with Playwright.
                try:
                    ctx.close()
                except Exception:  # noqa: BLE001 - cleanup is best-effort
                    pass
            # A logged-in search page yields a tab bar or API data; if cards,
            # tabs and API payloads are all empty, treat it as a login wall.
            if not notes and not labels and not captured:
                return not_logged_in
            return {"logged_in": True, "notes": notes[:40],
                    "raw_api": captured, "tabs": labels}
    except Exception as e:  # noqa: BLE001
        return {"error": str(e)[:140]}
async def xhs_enrich(entity: dict) -> dict:
    """Collect XHS UGC for a place, store it as evidence and derive tags.

    Steps: build the LLM client; run ``_collect`` in a worker thread (with an
    LLM-backed tab chooser and deep comment collection); convert captured API
    payloads into note records, falling back to lightweight DOM-card records
    when no API notes were captured; append comment records; save all records
    via ``sa_save_evidence``; finally ask the LLM for experience tags based on
    the note titles and like counts only.

    Args:
        entity: Place description. Reads ``name``, ``district``, ``biz_need``
            and ``eid`` / ``natural_key``.

    Returns:
        A dict with ``ok`` and ``summary`` and, depending on the outcome,
        ``found``, ``need_login``, ``tags``, ``sentiment``,
        ``evidence_saved``, ``note_count`` and ``comment_count``.
        ``ok=False`` means the agent is not configured or is disabled;
        ``need_login=True`` means a one-off manual login is required.
    """
    llm = await _build_llm()
    if llm is None:
        return {"ok": False, "summary": "xhs_agent 未配置或停用"}
    # A present-but-null name/district must not leak "None" into the keyword.
    name = entity.get("name") or ""
    district = entity.get("district") or ""
    biz = entity.get("biz_need") or f"补全「{name}」的真实口碑/体验/网红热度"
    keyword = f"贵阳 {name}".strip()

    def _pick(labels: list[str]) -> str | None:
        # Runs inside the collector thread; any failure means "keep default tab".
        try:
            r = llm.chat_json(_TAB_SYS, json.dumps(
                {"业务需求": biz, "可选tab": labels}, ensure_ascii=False))
            return (r or {}).get("tab")
        except Exception:  # noqa: BLE001
            return None

    res = await asyncio.to_thread(_collect, keyword, _pick, True)
    # Test for the key itself: an exception with an empty message yields
    # {"error": ""}, which a truthiness check would miss.
    if "error" in res:
        return {"ok": True, "found": False,
                "summary": f"采集异常:{res['error']}"}
    if res.get("logged_in") is False:
        return {"ok": True, "found": False, "need_login": True,
                "summary": "小红书未登录,需一次性人工登录(已升级)"}

    raw_api = res.get("raw_api") or []
    dom_notes = res.get("notes") or []
    pnk = entity.get("eid") or entity.get("natural_key")

    # 1) Prefer the captured API payloads: fully structured evidence (PII kept).
    records = _parse_api_notes(raw_api, name, district, keyword)
    # 2) No API notes: use the DOM cards as lightweight evidence, with a hash
    #    of name + title as a stable source_id.
    if not records and dom_notes:
        for n in dom_notes:
            t = n.get("title") or ""
            if not t:
                continue
            sid = "domhash:" + hashlib.md5(
                (name + "|" + t).encode()).hexdigest()[:16]
            records.append({
                "platform": "xhs", "kind": "note_lite", "source_id": sid,
                "url": "", "entity_name": name, "keyword": keyword,
                "title": t, "content": "", "author": n.get("author", ""),
                "author_id": "", "author_avatar": "",
                "likes": 0, "comments": 0, "collects": 0, "shares": 0,
                "publish_time": "", "location": "",
                "tags": [], "image_urls": [], "raw": n})
    # 3) Comments gathered by the deep crawl (timestamps kept as anchors).
    comment_records = _parse_api_comments(raw_api, name, keyword)
    records += comment_records
    if not records:
        return {"ok": True, "found": False,
                "summary": f"「{name}」小红书无相关 UGC跳过"}

    for rec in records:
        rec["place_natural_key"] = pnk
    try:
        saved = await sa_save_evidence(records)  # persist the raw evidence
    except Exception:  # noqa: BLE001 - a storage failure must not block tagging
        saved = 0

    note_records = [x for x in records if x["kind"] != "comment"]
    n_note = len(note_records)
    n_cmt = len(comment_records)

    # 4) Tags are derived from the notes only; comments are left for the
    #    later event-extraction step.
    corpus = json.dumps(
        {"地点": name, "区县": district,
         "帖子": [{"标题": x["title"], "点赞": x.get("likes", 0)}
                 for x in note_records[:40]]},
        ensure_ascii=False)
    try:
        r = await asyncio.to_thread(llm.chat_json, _TAG_SYS, corpus)
    except Exception as e:  # noqa: BLE001
        return {"ok": True, "found": True, "evidence_saved": saved,
                "tags": [], "note_count": n_note, "comment_count": n_cmt,
                "summary": f"证据入库{saved}条,标签提炼失败:{str(e)[:50]}"}

    # The LLM may return nothing or a non-object; treat that as "no tags".
    if not isinstance(r, dict):
        r = {}
    raw_tags = r.get("tags")
    if not isinstance(raw_tags, list):
        raw_tags = []
    tags = [str(t).strip()[:24] for t in raw_tags if str(t).strip()][:12]
    return {"ok": True, "found": True,
            "tags": tags, "sentiment": r.get("sentiment", ""),
            "evidence_saved": saved, "note_count": n_note,
            "comment_count": n_cmt,
            "summary": f"小红书 帖{n_note}+评论{n_cmt} 入证据层(存{saved}) → "
                       f"{len(tags)} 体验标签·口碑{r.get('sentiment','?')}"}