"""小红书子 Agent(P3 v1)—— 舆情/体验数据采集 → ExperienceTag。

参考用户 test.py 的 DrissionPage 监听方案,改造为自治版:
• 关键词由 Super Agent 按 KG 需求决定(不再 input());
• 搜索后 **AI 读动态 tab 栏(综合/必吃榜/本地推荐/特色小吃…)按业务需求选 tab**
  (默认综合),不再人工选;
• 登录用 **持久化 Chrome Profile**:一次人工登录→cookie 持久→之后无头复用;
  未登录/失效 **不阻塞**,升级开工单+通知管理员重登(与现有升级机制一致);
• 采集到的帖子/评论文本 → opus 提炼成 **体验标签 ExperienceTag** 挂到 Place。

责任边界:仅做合规、低频、不破解登录的采集;登录/验证码走人工一次性手动接管
(持久化复用,非凭证窃取)。数据先跑通,形态后续细化。
"""
|
||
from __future__ import annotations

import asyncio
import hashlib
import json
import logging
import os
import random
import time
from urllib.parse import quote

from app.config import settings
from app.db import get_agent_settings, sa_save_evidence
from app.llm_client import LlmClient
|
||
|
||
|
||
def _parse_api_notes(raw_api: list, name: str, district: str,
                     keyword: str) -> list[dict]:
    """Turn captured search-API payloads into structured note evidence.

    A payload is expected to look like ``{"data": {"items": [...]}}`` where
    each item carries a ``note_card`` (or ``noteCard``) dict.  Field names
    are probed through several fallbacks.  Anything that does not have the
    expected shape (non-dict payload, item, card, user, ...) is skipped or
    treated as empty instead of raising, so one malformed entry cannot
    abort the whole batch.

    Args:
        raw_api: Decoded JSON bodies captured from the browser; entries that
            are not dicts are ignored.
        name: Entity name, stored as ``entity_name`` on every record.
        district: Not used by the parser; kept so existing callers work.
        keyword: Search keyword, stored on every record.

    Returns:
        One record per distinct note id, in first-seen order.  Records keep
        author identifiers (PII) and the untouched item under ``raw``.
    """
    def _as_dict(value) -> dict:
        return value if isinstance(value, dict) else {}

    def _as_list(value) -> list:
        return value if isinstance(value, list) else []

    out: list[dict] = []
    seen: set = set()
    for payload in raw_api or []:
        if not isinstance(payload, dict):
            continue
        items = _as_list(_as_dict(payload.get("data")).get("items"))
        for item in items:
            if not isinstance(item, dict):
                continue
            card = _as_dict(item.get("note_card") or item.get("noteCard"))
            if not card:
                continue
            raw_id = (item.get("id") or card.get("note_id")
                      or card.get("id") or "")
            # Dedupe on the string form so it matches ``source_id``.
            sid = str(raw_id) if raw_id else ""
            if not sid or sid in seen:
                continue
            seen.add(sid)

            user = _as_dict(card.get("user") or card.get("User"))
            stats = _as_dict(card.get("interact_info")
                             or card.get("interactInfo"))

            images = []
            for img in _as_list(card.get("image_list")
                                or card.get("images")):
                if isinstance(img, dict):
                    img_url = (img.get("url_default") or img.get("url")
                               or img.get("url_pre") or "")
                    if img_url:
                        images.append(img_url)
                elif isinstance(img, str):
                    images.append(img)

            # The cover image goes first unless it is already listed.
            cover = card.get("cover")
            if isinstance(cover, dict):
                cover_url = cover.get("url_default") or cover.get("url") or ""
            else:
                cover_url = ""
            if cover_url and cover_url not in images:
                images.insert(0, cover_url)

            location = card.get("location")
            out.append({
                "platform": "xhs", "kind": "note", "source_id": sid,
                "url": f"https://www.xiaohongshu.com/explore/{sid}",
                "entity_name": name, "keyword": keyword,
                "title": card.get("display_title") or card.get("title") or "",
                "content": card.get("desc") or card.get("content") or "",
                "author": (user.get("nick_name") or user.get("nickname")
                           or user.get("name") or ""),
                "author_id": (user.get("user_id") or user.get("userId")
                              or user.get("id") or ""),
                "author_avatar": user.get("avatar") or user.get("image") or "",
                "likes": stats.get("liked_count") or stats.get("likes") or 0,
                "comments": stats.get("comment_count") or 0,
                "collects": stats.get("collected_count") or 0,
                "shares": (stats.get("shared_count")
                           or stats.get("share_count") or 0),
                "publish_time": (card.get("time")
                                 or card.get("create_time") or ""),
                "location": (location.get("name", "")
                             if isinstance(location, dict) else ""),
                "tags": [tag.get("name", "")
                         for tag in _as_list(card.get("tag_list"))
                         if isinstance(tag, dict)],
                "image_urls": images,
                "raw": item,
            })
    return out
|
||
|
||
|
||
def _parse_api_comments(raw_api: list, name: str,
                        keyword: str) -> list[dict]:
    """Turn captured comment-API payloads into comment evidence records.

    Only payloads shaped like ``{"data": {"comments": [...]}}`` contribute.
    Field names are probed through several fallbacks, and entries that are
    not dicts are skipped instead of raising, so one malformed comment
    cannot abort the whole batch.  The comment timestamp is kept (as a
    string) under ``publish_time``.

    Args:
        raw_api: Decoded JSON bodies captured from the browser.
        name: Entity name, stored as ``entity_name`` on every record.
        keyword: Search keyword, stored on every record.

    Returns:
        One record per distinct comment id, in first-seen order.  ``comments``
        holds the number of replies listed on the comment; the replies
        themselves are not emitted as separate records.
    """
    def _as_dict(value) -> dict:
        return value if isinstance(value, dict) else {}

    def _as_list(value) -> list:
        return value if isinstance(value, list) else []

    out: list[dict] = []
    seen: set = set()
    for payload in raw_api or []:
        if not isinstance(payload, dict):
            continue
        data = _as_dict(payload.get("data"))
        comments = _as_list(data.get("comments"))
        if not comments:
            continue
        payload_note_id = data.get("note_id") or ""
        for cm in comments:
            if not isinstance(cm, dict):
                continue
            raw_id = cm.get("comment_id") or cm.get("id") or ""
            # Dedupe on the string form so it matches ``source_id``.
            cid = str(raw_id) if raw_id else ""
            if not cid or cid in seen:
                continue
            seen.add(cid)

            # Fall back to the note id carried on the comment itself when
            # the payload has none, so the record can still link to its note.
            note_id = payload_note_id or cm.get("note_id") or ""

            user = _as_dict(cm.get("user_info") or cm.get("user")
                            or cm.get("User") or cm.get("author"))

            images = []
            for img in _as_list(cm.get("pictures") or cm.get("images")
                                or cm.get("image_list")):
                if isinstance(img, dict):
                    img_url = img.get("url") or img.get("url_default") or ""
                    if img_url:
                        images.append(img_url)
                elif isinstance(img, str):
                    images.append(img)

            replies = (cm.get("sub_comments") or cm.get("replies")
                       or cm.get("reply_list") or [])
            out.append({
                "platform": "xhs", "kind": "comment", "source_id": cid,
                "url": (f"https://www.xiaohongshu.com/explore/{note_id}"
                        if note_id else ""),
                "entity_name": name, "keyword": keyword,
                "title": "",
                "content": cm.get("content") or cm.get("text") or "",
                "author": (user.get("nickname") or user.get("nick_name")
                           or user.get("name") or ""),
                "author_id": (user.get("user_id") or user.get("userId")
                              or user.get("id") or ""),
                "author_avatar": user.get("image") or user.get("avatar") or "",
                "likes": cm.get("like_count") or cm.get("likes") or 0,
                "comments": len(replies) if isinstance(replies, list) else 0,
                "collects": 0, "shares": 0,
                "publish_time": str(cm.get("create_time")
                                    or cm.get("time") or ""),
                "location": cm.get("ip_location") or "",
                "tags": [], "image_urls": images,
                # Keys present on the comment win over the injected note_id.
                "raw": {"note_id": note_id, **cm},
            })
    return out
|
||
|
||
|
||
# Dedicated persistent browser profile directory, kept apart from the user's
# everyday Chrome profile; cookies from a one-time manual login stay here.
XHS_PROFILE_DIR = os.path.expanduser("~/.zn-kg/xhs-profile")
# Search-results URL template; ``kw`` receives the URL-quoted keyword.
_SEARCH = "https://www.xiaohongshu.com/search_result?keyword={kw}"
# URL fragments of the API responses whose JSON bodies are captured.
_LISTEN = ["/api/sns/web/v1/feed", "/api/sns/web/v2/feed",
           "/api/sns/web/v1/search/notes", "/api/sns/web/v1/comment/list",
           "/api/sns/web/v2/comment/page", "/api/sns/web/v1/comment/sub/page"]
# Deep comment collection: how many of the first notes get opened per run.
_DEEP_NOTES = 3
# Upper bound on scroll rounds per note (the loop stops earlier once the
# captured comment count stops growing).
_DEEP_SCROLL = 30
# Consecutive scroll rounds without new comments after which a note is
# considered exhausted.
_DEEP_STALL = 4

# System prompt for choosing a filter tab on the search-results page.
_TAB_SYS = """你在小红书搜索结果页选筛选 tab。给你:业务需求 + 当前页可选 tab 列表。
按业务需求选**最相关的一个** tab 名;拿不准就选「综合」。
只输出 JSON:{"tab":"要点的tab名","reason":"一句话"}"""

# System prompt for distilling experience tags from the collected UGC.
_TAG_SYS = """你从小红书 UGC(帖子标题/正文/评论)提炼某地点的"体验标签"。
只保留多条内容相互印证、对该地点有信息量的短标签(如:酸汤鱼必点、排队久、
人均50、网红打卡、本地人推荐、环境一般)。不编造,噪声/广告/无关一律丢弃。
只输出 JSON:{"tags":["标签1","标签2"],"sentiment":"正面|中性|负面|混合",
"summary":"一句话口碑概述"}"""
|
||
|
||
|
||
def _unlock_profile() -> None:
    """Delete leftover Chromium ``Singleton*`` files from the profile dir.

    Removes ``SingletonLock``, ``SingletonCookie`` and ``SingletonSocket``
    under ``XHS_PROFILE_DIR`` when they exist.  Per the original author's
    note, the profile is only used serially on a single machine, so these
    files are almost always residue of a killed browser and clearing them
    avoids a spurious "profile already in use" failure.
    """
    stale_names = ("SingletonLock", "SingletonCookie", "SingletonSocket")
    stale_paths = [os.path.join(XHS_PROFILE_DIR, fname)
                   for fname in stale_names]
    for path in stale_paths:
        try:
            os.unlink(path)
        except OSError:
            # Best effort: a missing file (or any other OS-level failure)
            # is not worth reporting here.
            continue
|
||
|
||
|
||
async def _build_llm() -> LlmClient | None:
    """Build the LLM client for the xhs agent, or None when it is unusable.

    Each of api key, base URL and model is resolved with the precedence:
    the ``agents.xhs_agent`` section of the stored agent settings, then its
    ``global`` section, then the static ``settings`` object.  The model
    finally falls back to ``"deepseek-chat"``.  The timeout comes from the
    ``global`` section and defaults to 120.

    Returns:
        A configured ``LlmClient``; ``None`` when the agent is explicitly
        disabled (``enabled`` is ``False``) or no api key / base URL can be
        resolved.
    """
    try:
        cfg = await get_agent_settings()
    except Exception:  # noqa: BLE001
        # Best effort: fall back to the static settings below.
        cfg = None
    if not isinstance(cfg, dict):
        cfg = {}

    # Sections stored as null (or any non-dict) are treated as empty instead
    # of crashing on ``.get``.
    global_cfg = cfg.get("global")
    if not isinstance(global_cfg, dict):
        global_cfg = {}
    agents = cfg.get("agents")
    agent_cfg = agents.get("xhs_agent") if isinstance(agents, dict) else None
    if not isinstance(agent_cfg, dict):
        agent_cfg = {}

    if agent_cfg.get("enabled") is False:
        return None

    key = (agent_cfg.get("api_key") or global_cfg.get("api_key")
           or settings.llm_api_key)
    base = (agent_cfg.get("base_url") or global_cfg.get("base_url")
            or settings.llm_api_base)
    if not key or not base:
        return None
    model = (agent_cfg.get("model") or global_cfg.get("model")
             or settings.llm_model or "deepseek-chat")

    try:
        timeout = int(global_cfg.get("timeout") or 120)
    except (TypeError, ValueError):
        # A malformed timeout must not disable the agent; use the default.
        timeout = 120
    return LlmClient(api_base=base, api_key=key, model=model,
                     timeout=timeout)
|
||
|
||
|
||
def _xhs_comment_total(captured: list) -> int:
    """Count the comments contained in the captured API payloads so far."""
    total = 0
    for payload in captured:
        if isinstance(payload, dict):
            comments = (payload.get("data") or {}).get("comments")
            if comments:
                total += len(comments)
    return total


def _xhs_choose_tab(pg, pick_tab_cb) -> list:
    """Read the filter tabs, let the callback choose one and click it.

    Returns the non-empty tab labels (empty when none were found or reading
    them failed).  The whole step is best effort: on any failure the page
    simply stays on the tab it is currently showing.
    """
    labels: list = []
    try:
        tabs = pg.query_selector_all("div.content-container button.tab")
        labelled = []
        for tab in tabs:
            text = (tab.get_attribute("aria-details")
                    or tab.inner_text() or "").strip()
            if text:
                labelled.append((text, tab))
        labels = [text for text, _ in labelled]
        chosen = pick_tab_cb(labels) if labels else None
        # "综合" is skipped on purpose: no click is issued for it.
        if chosen and chosen != "综合":
            for text, tab in labelled:
                if text == chosen:
                    tab.click()
                    pg.wait_for_timeout(3000)
                    break
    except Exception:  # noqa: BLE001
        pass
    return labels


def _xhs_scrape_cards(pg, notes: list) -> None:
    """Scroll the result list (up to 6 rounds) and append card summaries.

    Cards are deduplicated by title; scrolling stops early once 40 cards
    have been gathered.  A card that fails to parse is skipped.
    """
    seen_titles: set = set()
    for _ in range(6):
        for card in pg.query_selector_all("section.note-item"):
            try:
                title_el = (card.query_selector(".title")
                            or card.query_selector("a.title"))
                title = title_el.inner_text().strip() if title_el else ""
                if not title or title in seen_titles:
                    continue
                seen_titles.add(title)
                author_el = (card.query_selector(".author .name")
                             or card.query_selector(".name"))
                likes_el = (card.query_selector(".like-wrapper .count")
                            or card.query_selector(".count"))
                notes.append({
                    "title": title,
                    "author": (author_el.inner_text().strip()
                               if author_el else ""),
                    "likes": (likes_el.inner_text().strip()
                              if likes_el else ""),
                })
            except Exception:  # noqa: BLE001
                continue
        try:
            pg.mouse.wheel(0, 2400)
        except Exception:  # noqa: BLE001
            pass
        pg.wait_for_timeout(int(random.uniform(1200, 2000)))
        if len(notes) >= 40:
            break


def _xhs_scroll_comments(pg, captured: list) -> None:
    """Scroll an open note until its comments stop growing.

    Each round wheels/scrolls every known scroll container and presses End,
    then waits.  The loop ends on an end-of-list marker, after
    ``_DEEP_STALL`` consecutive rounds without newly captured comments, or
    after ``_DEEP_SCROLL`` rounds.
    """
    scrollers = (".note-scroller", ".comment-container",
                 ".interaction-container", ".comments-container")
    stall = 0
    prev = _xhs_comment_total(captured)
    for _ in range(_DEEP_SCROLL):
        targets = [el for el in (pg.query_selector(sel) for sel in scrollers)
                   if el]
        try:
            for el in (targets or [None]):
                if el:
                    # NOTE(review): the nesting of the wheel call was
                    # reconstructed from a paste that had lost its
                    # indentation - confirm against the original file.
                    box = el.bounding_box()
                    if box:
                        pg.mouse.move(box["x"] + box["width"] / 2,
                                      box["y"] + box["height"] / 2)
                        pg.mouse.wheel(0, 4000)
                    pg.evaluate("(e)=>e&&e.scrollTo(0,e.scrollHeight)", el)
                else:
                    pg.mouse.wheel(0, 4000)
            pg.keyboard.press("End")
        except Exception:  # noqa: BLE001
            pass
        pg.wait_for_timeout(int(random.uniform(1800, 2600)))
        if (pg.query_selector(".end-container")
                or pg.query_selector(".comment-end-container")):
            break  # end-of-comments marker reached
        current = _xhs_comment_total(captured)
        if current > prev:
            prev, stall = current, 0
        else:
            stall += 1
            if stall >= _DEEP_STALL:
                break  # nothing new for several rounds -> exhausted


def _xhs_deep_comments(pg, captured: list) -> None:
    """Open the first ``_DEEP_NOTES`` notes in turn to harvest comments.

    Each note is opened by clicking its cover on the search page itself
    (same page, not a separate tab), scrolled for comments, and then the
    page is sent back to the search results.  The context-level response
    listener does the actual capturing.  If anything fails for a note, the
    page is steered back to the search URL and the next note is tried.
    """
    search_url = pg.url
    for idx in range(_DEEP_NOTES):
        try:
            covers = pg.query_selector_all("section.note-item a.cover")
            if idx >= len(covers):
                break
            try:
                with pg.expect_navigation(timeout=15000):
                    covers[idx].click()
            except Exception:  # noqa: BLE001
                covers[idx].click()
            pg.wait_for_load_state("domcontentloaded", timeout=20000)
            pg.wait_for_timeout(3200)  # note detail + first comments
            if not pg.query_selector(".no-comments"):
                _xhs_scroll_comments(pg, captured)
                pg.wait_for_timeout(3000)  # let the last packet arrive
            # Back to the search results for the next note.
            try:
                with pg.expect_navigation(timeout=15000):
                    pg.go_back()
            except Exception:  # noqa: BLE001
                pg.goto(search_url, wait_until="domcontentloaded",
                        timeout=30000)
            pg.wait_for_load_state("domcontentloaded", timeout=20000)
            pg.wait_for_timeout(int(random.uniform(1500, 2400)))
        except Exception:  # noqa: BLE001
            try:
                pg.goto(search_url, wait_until="domcontentloaded",
                        timeout=30000)
                pg.wait_for_timeout(1500)
            except Exception:  # noqa: BLE001
                pass


def _collect(keyword: str, pick_tab_cb, deep: bool = False) -> dict:
    """Search Xiaohongshu in a persistent headless browser and collect data.

    Launches a Playwright persistent context on ``XHS_PROFILE_DIR`` (so the
    cookies stored in that profile are reused), opens the search page for
    ``keyword``, optionally switches the filter tab, scrapes the result
    cards and, when ``deep`` is true, opens the first notes to capture
    their comment API responses.

    Args:
        keyword: Search keyword (URL-quoted before use).
        pick_tab_cb: ``callable(list[str]) -> str | None`` that receives
            the available tab labels and returns the label to click.
        deep: Also harvest comments from the first ``_DEEP_NOTES`` notes.

    Returns:
        ``{"logged_in": True, "notes": [...], "raw_api": [...],
        "tabs": [...]}`` on success, where each note is
        ``{"title", "author", "likes"}`` and ``raw_api`` holds the captured
        JSON bodies.  ``{"logged_in": False, "notes": [], "raw_api": []}``
        when a login wall is detected or nothing at all could be read.
        ``{"error": "..."}`` when the browser session raised.
    """
    from app.agents.web_agent import _STEALTH_JS, _UA, _CHROME_ARGS
    os.makedirs(XHS_PROFILE_DIR, exist_ok=True)
    _unlock_profile()
    captured: list = []
    notes: list = []
    labels: list = []

    def _on_resp(resp) -> None:
        # Keep the JSON body of every response whose URL is listened for;
        # bodies that cannot be decoded are ignored.
        url = resp.url
        if any(fragment in url for fragment in _LISTEN):
            try:
                captured.append(resp.json())
            except Exception:  # noqa: BLE001
                pass

    def _session(ctx) -> bool:
        """Drive one session; return False when a login wall is shown."""
        ctx.add_init_script(_STEALTH_JS)
        ctx.on("response", _on_resp)
        pg = ctx.pages[0] if ctx.pages else ctx.new_page()
        pg.goto(_SEARCH.format(kw=quote(keyword)),
                wait_until="domcontentloaded", timeout=45000)
        pg.wait_for_timeout(4000)

        cards = pg.query_selector_all("section.note-item")
        html = pg.content()
        if not cards and ("/login" in pg.url or "扫码登录" in html
                          or "手机号登录" in html):
            return False

        labels.extend(_xhs_choose_tab(pg, pick_tab_cb))
        _xhs_scrape_cards(pg, notes)
        if deep:
            _xhs_deep_comments(pg, captured)
        return True

    try:
        from playwright.sync_api import sync_playwright
        with sync_playwright() as p:
            ctx = p.chromium.launch_persistent_context(
                user_data_dir=XHS_PROFILE_DIR,
                headless=True,
                args=_CHROME_ARGS,
                ignore_default_args=["--enable-automation"],
                user_agent=_UA,
                locale="zh-CN",
                viewport={"width": 1440, "height": 900})
            try:
                logged_in = _session(ctx)
            finally:
                # Always close the context, also when the session raised,
                # so the profile is released and cookies get flushed.  A
                # failing close must not mask the session's own outcome.
                try:
                    ctx.close()
                except Exception:  # noqa: BLE001
                    pass
        # No cards, no tabs and no API payloads at all is also treated as a
        # login wall / block (per the original author's note, a logged-in
        # search page always shows a tab bar or triggers the search API).
        if not logged_in or not (notes or labels or captured):
            return {"logged_in": False, "notes": [], "raw_api": []}
        return {"logged_in": True, "notes": notes[:40],
                "raw_api": captured, "tabs": labels}
    except Exception as e:  # noqa: BLE001
        return {"error": str(e)[:140]}
|
||
|
||
|
||
async def xhs_enrich(entity: dict) -> dict:
    """Collect Xiaohongshu UGC for one place and distil experience tags.

    Steps: build the LLM client, run the browser collector in a worker
    thread, turn the captured API payloads into evidence records (falling
    back to the scraped result cards when no note payload was captured),
    persist the records, then ask the LLM for tags based on the note titles.

    Args:
        entity: Place description.  Reads ``name``, ``district``,
            ``biz_need`` and ``eid`` / ``natural_key``.

    Returns:
        A dict that always has ``ok`` and ``summary``.  ``ok`` is ``False``
        when the agent is not configured or disabled.  Otherwise ``found``
        tells whether any UGC was collected, ``need_login`` is set when a
        manual login is required, and on success the dict also carries
        ``tags``, ``sentiment``, ``evidence_saved``, ``note_count`` and
        ``comment_count``.
    """
    llm = await _build_llm()
    if llm is None:
        return {"ok": False, "summary": "xhs_agent 未配置或停用"}

    name = entity.get("name", "")
    district = entity.get("district", "")
    biz = entity.get("biz_need") or f"补全「{name}」的真实口碑/体验/网红热度"
    keyword = f"贵阳 {name}".strip()

    def _pick(labels: list[str]) -> str | None:
        # Called by the collector, which itself runs in a worker thread, so
        # this blocking LLM call does not stall the event loop.
        try:
            reply = llm.chat_json(_TAB_SYS, json.dumps(
                {"业务需求": biz, "可选tab": labels}, ensure_ascii=False))
            return (reply or {}).get("tab")
        except Exception:  # noqa: BLE001
            return None

    res = await asyncio.to_thread(_collect, keyword, _pick, True)
    if res.get("error"):
        return {"ok": True, "found": False,
                "summary": f"采集异常:{res['error']}"}
    if res.get("logged_in") is False:
        return {"ok": True, "found": False, "need_login": True,
                "summary": "小红书未登录,需一次性人工登录(已升级)"}

    dom_notes = res.get("notes") or []
    raw_api = res.get("raw_api") or []
    pnk = entity.get("eid") or entity.get("natural_key")

    # 1) Prefer the captured API payloads: fully structured evidence.
    records = _parse_api_notes(raw_api, name, district, keyword)
    # 2) Without any API note, fall back to the scraped cards as light
    #    evidence; a hash of place name + title is the stable source_id.
    if not records and dom_notes:
        for note in dom_notes:
            title = note.get("title") or ""
            if not title:
                continue
            sid = "domhash:" + hashlib.md5(
                (name + "|" + title).encode()).hexdigest()[:16]
            records.append({
                "platform": "xhs", "kind": "note_lite", "source_id": sid,
                "url": "", "entity_name": name, "keyword": keyword,
                "title": title, "content": "",
                "author": note.get("author", ""),
                "author_id": "", "author_avatar": "",
                "likes": 0, "comments": 0, "collects": 0, "shares": 0,
                "publish_time": "", "location": "",
                "tags": [], "image_urls": [], "raw": note})

    # 3) Comments captured during deep collection become comment evidence.
    comment_records = _parse_api_comments(raw_api, name, keyword)
    records += comment_records

    if not records:
        return {"ok": True, "found": False,
                "summary": f"「{name}」小红书无相关 UGC,跳过"}

    for rec in records:
        rec["place_natural_key"] = pnk
    try:
        saved = await sa_save_evidence(records)
    except Exception:  # noqa: BLE001
        # Tag extraction still proceeds, but the failure must be visible.
        logging.getLogger(__name__).exception(
            "xhs_enrich: failed to persist %d evidence records",
            len(records))
        saved = 0
    n_note = sum(1 for rec in records if rec["kind"] != "comment")
    n_cmt = len(comment_records)

    # 4) Tags are derived from the notes only (titles + like counts);
    #    comment records are not part of the corpus.
    corpus = json.dumps(
        {"地点": name, "区县": district,
         "帖子": [{"标题": rec["title"], "赞": rec.get("likes", 0)}
                  for rec in records if rec["kind"] != "comment"][:40]},
        ensure_ascii=False)
    try:
        reply = await asyncio.to_thread(llm.chat_json, _TAG_SYS, corpus)
    except Exception as e:  # noqa: BLE001
        return {"ok": True, "found": True, "evidence_saved": saved,
                "tags": [], "note_count": n_note, "comment_count": n_cmt,
                "summary": f"证据入库{saved}条,标签提炼失败:{str(e)[:50]}"}

    # The model may return nothing or a non-object; treat that as "no tags"
    # rather than failing after the evidence has already been saved.
    if not isinstance(reply, dict):
        reply = {}
    raw_tags = reply.get("tags")
    if isinstance(raw_tags, str):
        raw_tags = [raw_tags]  # a bare string is one tag, not characters
    elif not isinstance(raw_tags, (list, tuple)):
        raw_tags = []
    tags = [str(tag).strip()[:24] for tag in raw_tags
            if tag is not None and str(tag).strip()]
    return {"ok": True, "found": True,
            "tags": tags[:12], "sentiment": reply.get("sentiment", ""),
            "evidence_saved": saved, "note_count": n_note,
            "comment_count": n_cmt,
            "summary": f"小红书 帖{n_note}+评论{n_cmt} 入证据层(存{saved}) → "
                       f"{len(tags)} 体验标签·口碑{reply.get('sentiment','?')}"}
|