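"""Douyin sub-agent (phase 3, ported from the proven douyin_visible_probe pipeline).

Core fix: comment=0 used to happen because the flow never got through
"enter detail -> click fullscreen -> explicitly open comments -> scroll into
view -> find the element that really scrolls -> watch the /comment API and the
DOM in parallel". This version brings over every technique the probe already
validated, so the autonomous version can pull the full comment set as well.

External interface is unchanged:
- `douyin_enrich(entity) -> dict`  (used by the orchestrator's _douyin_enrich)
- `DY_PROFILE_DIR`, `_unlock`      (used by scripts/douyin_login.py)
Internal upgrades:
- the listener records url+status+body (comments can back-fill note_id from the url)
- parsing uses iter_lists_by_key deep recursion to cope with douyin's varying JSON nesting
- detail page: click_or_goto + fullscreen button + explicit comment open + scroll into view + smart scrolling
- stop conditions: API growth + DOM items + the "暂时没有更多评论" end marker, any one of which triggers
"""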
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import os
|
||
import random
|
||
import time
|
||
from urllib.parse import parse_qs, quote, urlparse
|
||
|
||
from app.config import settings
|
||
from app.db import get_agent_settings, sa_save_evidence
|
||
from app.llm_client import LlmClient
|
||
|
||
# ── Paths / endpoints ───────────────────────────────────────────────────
# Persistent Chromium profile directory; also the default target of _unlock().
DY_PROFILE_DIR = os.path.expanduser("~/.zn-kg/douyin-profile")
# Search page URL template; {kw} receives the URL-quoted keyword.
_SEARCH = "https://www.douyin.com/search/{kw}?type=general"

# Listening: responses whose URL contains one of these fragments have their
# JSON body captured for parsing and counting (precise).
_LISTEN = (
    "/aweme/v1/web/aweme/detail",
    "/aweme/v1/web/comment/list",
    "/aweme/v1/web/comment/list/reply",
    "/aweme/v1/web/general/search/stream",
    "/aweme/v1/web/general/search/single",
    "/aweme/v1/web/search/item",
    "/aweme/v1/web/general/search",
)
# Diagnostics: only responses whose URL contains one of these are counted at
# all (a wider net than _LISTEN, useful when troubleshooting).
_DIAG = ("https://www.douyin.com/aweme/v1/web/", "https://www.douyin.com/search/")

# Candidate selectors for the comment container, tried in this order.
_COMMENT_SCROLLERS = (
    "[data-e2e='comment-list']",
    "[data-e2e='detail-comment']",
    ".comment-list",
    ".comment-mainContent",
    ".ESlRWJ2j",
)
# Selector matching one rendered comment item in the DOM.
_DOM_COMMENT_ITEM = "[data-e2e='comment-item']"

# ── Tunables (all module constants; edit here to retune) ─────────────────
_DEEP_NOTES = 3  # max number of videos deep-collected per run
_SEARCH_SCROLLS = 6  # max number of scrolls on the search page
_MAX_SEARCH_LINKS = 40  # stop scrolling the search page once this many video links exist
_SEARCH_WAIT_MS = 4500  # wait after the search page's first load
_SEARCH_API_WAIT_MS = 12000  # how long to wait for detail candidates to appear
_DETAIL_WAIT_MS = 3800  # layout time given to a detail page after entering it
_FULLSCREEN_WAIT_MS = 1800  # extra wait after fullscreen was triggered
_COMMENT_WAIT_MS = 3500  # wait after the comment panel was brought into view
_COMMENT_SCROLLS = 30  # max scroll rounds over the comment panel
_DEEP_STALL = 4  # stop after N consecutive rounds without new comments
|
||
|
||
# ── Prompts ─────────────────────────────────────────────────────────────
# System prompt for choosing a search-result tab; the reply is expected to be
# JSON carrying a "tab" field.
_TAB_SYS = """你在抖音搜索结果页选 tab。给业务需求 + 当前可选 tab 列表,
选最相关的一个 tab 名;拿不准选「综合」。
只输出 JSON:{"tab":"tab名","reason":"一句话"}"""

# System prompt for distilling experience tags from collected titles/comments;
# the reply is expected to be JSON carrying "tags", "sentiment" and "summary".
_TAG_SYS = """你从抖音 UGC(视频标题/文案/评论)提炼某地点的"体验标签"。
只留多条相互印证、对该地点有信息量的短标签(如:酸汤鱼必点、排队久、人均50、
网红打卡、本地人推荐、环境一般)。广告/无关/噪声一律丢弃。
只输出 JSON:{"tags":["标签1"],"sentiment":"正面|中性|负面|混合","summary":"一句话口碑"}"""
|
||
|
||
|
||
# ── 通用工具 ────────────────────────────────────────────────────────────
|
||
def _unlock(profile_dir: str = DY_PROFILE_DIR) -> None:
    """Delete the Chromium Singleton* files left behind by a killed browser.

    Args:
        profile_dir: Profile directory to clean; defaults to DY_PROFILE_DIR.
    """
    stale_names = ("SingletonLock", "SingletonCookie", "SingletonSocket")
    for stale_path in (os.path.join(profile_dir, fname) for fname in stale_names):
        try:
            os.unlink(stale_path)
        except OSError:
            # Best effort: a file that is missing or cannot be removed is skipped.
            continue
|
||
|
||
|
||
def _scalar(obj, *keys, default=""):
    """Return the first non-empty value found under ``keys`` in ``obj``.

    A value counts as empty when it equals None, "", [] or {}; note that 0 is
    not empty. ``default`` is returned when ``obj`` is not a dict or every key
    is missing/empty.
    """
    if isinstance(obj, dict):
        empties = (None, "", [], {})
        for key in keys:
            value = obj.get(key)
            if value in empties:
                continue
            return value
    return default
|
||
|
||
|
||
def _first_avatar(user) -> str:
    """Return the first avatar URL of a user dict, or "" when unavailable.

    Looks at ``avatar_thumb`` first and falls back to ``avatar_medium``; the
    URL is taken from that object's ``url_list``.
    """
    if isinstance(user, dict):
        avatar = user.get("avatar_thumb") or user.get("avatar_medium") or {}
        if isinstance(avatar, dict):
            url_list = avatar.get("url_list")
            if isinstance(url_list, list) and len(url_list) > 0:
                return str(url_list[0] or "")
    return ""
|
||
|
||
|
||
def _as_int(value) -> int:
    """Coerce ``value`` to int, returning 0 for anything that cannot be converted.

    None and "" map to 0. Floats are truncated by ``int()``. Values that make
    ``int()`` fail (non-numeric strings, unsupported types, NaN, infinity)
    also map to 0, so callers never have to handle an exception.
    """
    try:
        if value in (None, ""):
            return 0
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")) — JSON decoding can produce Infinity.
        return 0
|
||
|
||
|
||
def _body_of(entry):
    """Unwrap a captured response: return its ``__body`` when present, else ``entry`` itself."""
    is_wrapped = isinstance(entry, dict) and "__body" in entry
    return entry["__body"] if is_wrapped else entry
|
||
|
||
|
||
def _url_of(entry) -> str:
    """Return the ``__url`` recorded on a captured response, or "" when absent."""
    if not isinstance(entry, dict):
        return ""
    recorded = entry.get("__url")
    return str(recorded) if recorded else ""
|
||
|
||
|
||
def _aweme_id_from_url(url: str) -> str:
    """Extract a video id from a URL's query string.

    The parameters ``aweme_id``, ``item_id`` and ``group_id`` are checked in
    that order; the first one present wins. Returns "" when none is present or
    the URL cannot be parsed.
    """
    try:
        params = parse_qs(urlparse(url).query)
    except Exception:
        return ""
    for param in ("aweme_id", "item_id", "group_id"):
        values = params.get(param)
        if not values:
            continue
        first = values[0]
        return str(first) if first else ""
    return ""
|
||
|
||
|
||
def _iter_lists_by_key(obj, keys: set):
    """Yield every list stored under one of ``keys`` anywhere inside ``obj``.

    Walks dicts and lists recursively so that varying JSON nesting is handled.
    A matched list is yielded and then searched as well, so matches nested
    inside a matched list (e.g. ``reply_comments`` inside the items of
    ``comments``) are found too.

    Args:
        obj: Any JSON-like value; non-container values yield nothing.
        keys: Dict keys whose list values should be yielded.
    """
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k in keys and isinstance(v, list):
                yield v
            # Always descend, including into a list that was just yielded.
            if isinstance(v, (dict, list)):
                yield from _iter_lists_by_key(v, keys)
    elif isinstance(obj, list):
        for it in obj:
            yield from _iter_lists_by_key(it, keys)
|
||
|
||
|
||
# ── 解析(深递归+多路径) ────────────────────────────────────────────────
|
||
def _parse_dy_notes(raw_api: list, name: str, keyword: str) -> list[dict]:
    """Turn captured API responses into de-duplicated video ("note") records.

    Candidate items are gathered from the usual locations (``data`` as a list,
    ``aweme_detail`` / ``aweme_list`` at the top level or under ``data``,
    ``data.data``) plus every ``aweme_list`` found by deep search. Each item is
    unwrapped through ``aweme_info`` / ``aweme`` when present.

    Args:
        raw_api: Captured responses (wrapped entries or plain bodies).
        name: Entity name copied into each record.
        keyword: Search keyword copied into each record.

    Returns:
        One record per distinct video id, in first-seen order.
    """
    notes: list[dict] = []
    known_ids: set[str] = set()
    for entry in raw_api or []:
        payload = _body_of(entry)
        if not isinstance(payload, dict):
            continue

        data = payload.get("data")
        data_map = data if isinstance(data, dict) else {}
        sources = [
            data if isinstance(data, list) else None,
            payload.get("aweme_detail"),
            payload.get("aweme_list"),
            data_map.get("aweme_detail"),
            data_map.get("aweme_list"),
            data_map.get("data"),
        ]
        pool: list = []
        for source in sources:
            if isinstance(source, list):
                pool += source
            elif isinstance(source, dict):
                pool.append(source)
        for found in _iter_lists_by_key(payload, {"aweme_list"}):
            pool += found

        for item in pool:
            if not isinstance(item, dict):
                continue
            aweme = item.get("aweme_info") or item.get("aweme") or item
            if not isinstance(aweme, dict):
                continue
            raw_id = _scalar(aweme, "aweme_id", "group_id", "awemeId")
            note_id = str(raw_id)
            if not raw_id or note_id in known_ids:
                continue
            known_ids.add(note_id)

            author = aweme.get("author") or {}
            stats = aweme.get("statistics") or {}
            likes = _scalar(stats, "digg_count", "admire_count", default=0)
            collects = _scalar(stats, "collect_count", "favorite_count",
                               "collects_count", default=0)
            shares = _scalar(stats, "share_count", "share_count_reflow",
                             default=0)
            notes.append({
                "platform": "douyin",
                "kind": "note",
                "source_id": note_id,
                "url": f"https://www.douyin.com/video/{raw_id}",
                "entity_name": name,
                "keyword": keyword,
                "title": _scalar(aweme, "desc", "title", "caption"),
                "content": _scalar(aweme, "desc", "content"),
                "author": _scalar(author, "nickname", "name"),
                "author_id": str(_scalar(author, "uid", "sec_uid", "unique_id")),
                "author_avatar": _first_avatar(author),
                "likes": _as_int(likes),
                "comments": _as_int(_scalar(stats, "comment_count", default=0)),
                "collects": _as_int(collects),
                "shares": _as_int(shares),
                "publish_time": str(_scalar(aweme, "create_time", default="")),
                "location": "",
                "tags": [],
                "image_urls": [],
                "raw": item,
            })
    return notes
|
||
|
||
|
||
def _parse_dy_comments(raw_api: list, name: str, keyword: str) -> list[dict]:
    """Turn captured API responses into de-duplicated comment records.

    Every list stored under ``comments``, ``comment_list`` or
    ``reply_comments`` is scanned. A comment needs both an id and a text to be
    kept. When a comment carries no ``aweme_id``, the id found in the response
    URL's query string is used instead.

    Args:
        raw_api: Captured responses (wrapped entries or plain bodies).
        name: Entity name copied into each record.
        keyword: Search keyword copied into each record.

    Returns:
        One record per distinct comment id, in first-seen order.
    """
    list_keys = {"comments", "comment_list", "reply_comments"}
    records: list[dict] = []
    known_ids: set[str] = set()
    for entry in raw_api or []:
        payload = _body_of(entry)
        if not isinstance(payload, dict):
            continue
        # Fallback video id taken from the response URL's query string.
        url_video_id = _aweme_id_from_url(_url_of(entry))
        for batch in _iter_lists_by_key(payload, list_keys):
            for comment in batch:
                if not isinstance(comment, dict):
                    continue
                raw_id = _scalar(comment, "cid", "comment_id", "id")
                body_text = _scalar(comment, "text", "content")
                comment_id = str(raw_id)
                if not (raw_id and body_text) or comment_id in known_ids:
                    continue
                known_ids.add(comment_id)

                user = comment.get("user") or comment.get("user_info") or {}
                video_id = _scalar(comment, "aweme_id", default=url_video_id)
                reply_total = _scalar(comment, "reply_comment_total",
                                      "reply_total", default=0)
                video_url = ""
                if video_id:
                    video_url = f"https://www.douyin.com/video/{video_id}"
                likes = _scalar(comment, "digg_count", "like_count", default=0)
                records.append({
                    "platform": "douyin",
                    "kind": "comment",
                    "source_id": comment_id,
                    "url": video_url,
                    "entity_name": name,
                    "keyword": keyword,
                    "title": "",
                    "content": body_text,
                    "author": _scalar(user, "nickname", "name"),
                    "author_id": str(_scalar(user, "uid", "sec_uid")),
                    "author_avatar": _first_avatar(user),
                    "likes": _as_int(likes),
                    "comments": _as_int(reply_total),
                    "collects": 0,
                    "shares": 0,
                    "publish_time": str(_scalar(comment, "create_time",
                                                default="")),
                    "location": _scalar(comment, "ip_label", default=""),
                    "tags": [],
                    "image_urls": [],
                    "raw": comment,
                })
    return records
|
||
|
||
|
||
def _comment_count_from_raw(raw_api: list) -> int:
    """Count raw comment entries across all captured responses.

    Sums the lengths of every list stored under ``comments``,
    ``comment_list`` or ``reply_comments``; no de-duplication is applied.
    """
    list_keys = {"comments", "comment_list", "reply_comments"}
    bodies = (_body_of(entry) for entry in raw_api or [])
    return sum(
        len(batch)
        for body in bodies
        if isinstance(body, dict)
        for batch in _iter_lists_by_key(body, list_keys)
    )
|
||
|
||
|
||
# ── 浏览器交互(全部从 probe 移植,headless 下大多照样能跑) ────────────
|
||
def _comment_panel_present(pg) -> bool:
    """Return True when a comment container is visibly on screen.

    Evaluates a script in the page that inspects every element matching one of
    ``_COMMENT_SCROLLERS``. An element qualifies when it is larger than
    120x120, more than 80px of it lies inside the viewport in each dimension,
    and it is not hidden via display, visibility or near-zero opacity.

    Any error raised by the evaluation is treated as "not present".
    """
    try:
        return bool(pg.evaluate(
            """
            (selectors) => {
              const ok = (el) => {
                if (!el) return false;
                const r = el.getBoundingClientRect();
                const s = getComputedStyle(el);
                const w = Math.min(r.right, innerWidth) - Math.max(r.left, 0);
                const h = Math.min(r.bottom, innerHeight) - Math.max(r.top, 0);
                return r.width > 120 && r.height > 120 && w > 80 && h > 80 &&
                  s.display !== 'none' && s.visibility !== 'hidden' &&
                  Number(s.opacity || '1') > 0.05;
              };
              for (const sel of selectors) {
                for (const el of document.querySelectorAll(sel)) {
                  if (ok(el)) return true;
                }
              }
              return false;
            }
            """, list(_COMMENT_SCROLLERS)))
    except Exception:
        return False
|
||
|
||
|
||
def _wait_for_comment_panel(pg, timeout_ms: int = 3000) -> bool:
    """Poll until the comment panel is visible or ``timeout_ms`` has elapsed.

    Args:
        pg: Playwright page.
        timeout_ms: Maximum time to keep polling, in milliseconds.

    Returns:
        True as soon as the panel is detected; otherwise the result of one
        final check made after the deadline.
    """
    # Monotonic clock: the deadline must not move when the wall clock is adjusted.
    deadline = time.monotonic() + timeout_ms / 1000
    while time.monotonic() < deadline:
        if _comment_panel_present(pg):
            return True
        pg.wait_for_timeout(180)
    return _comment_panel_present(pg)
|
||
|
||
|
||
def _reveal_player_controls(pg) -> None:
    """Move the mouse over the video player (presumably so its control bar shows).

    Tries the player selectors in order and moves the pointer to a point in
    the lower-right part of the first one that has a bounding box. If none
    works, moves the pointer to a point in the lower half of the viewport.
    All errors are swallowed; this is best effort.
    """
    for sel in (".xgplayer", "[class*='xgplayer']", "video"):
        try:
            loc = pg.locator(sel).first
            if loc.count() <= 0:
                continue
            box = loc.bounding_box(timeout=1200)
            if not box:
                continue
            # Aim at 72% of the width / 82% of the height of the player box.
            pg.mouse.move(box["x"] + box["width"] * 0.72,
                          box["y"] + box["height"] * 0.82)
            pg.wait_for_timeout(700)
            return
        except Exception:
            continue
    # Fallback: no player element could be used, aim relative to the viewport.
    try:
        vp = pg.viewport_size or {"width": 1440, "height": 900}
        pg.mouse.move(vp["width"] * 0.55, vp["height"] * 0.78)
        pg.wait_for_timeout(700)
    except Exception:
        pass
|
||
|
||
|
||
def _maybe_click_fullscreen(pg) -> bool:
    """Click the fullscreen button or press "f"; the detail panel layout is steadier that way.

    Each candidate selector is tried in order: the controls are revealed, the
    element is scrolled into view and hovered (both best effort), then
    clicked. If no selector can be clicked, the "f" key is pressed instead.

    Returns:
        True after a successful click or after the key press was sent;
        False only when the key-press fallback itself raised.
    """
    _reveal_player_controls(pg)
    for sel in (
        ".xgplayer-icon:has(.xg-get-fullscreen)",
        ".xgplayer-icon .xg-get-fullscreen", ".xg-get-fullscreen",
        ".xgplayer-fullscreen", "[aria-label*='全屏']",
        "[title*='全屏']", "button:has-text('全屏')",
    ):
        try:
            # Reveal the controls again before each attempt.
            _reveal_player_controls(pg)
            loc = pg.locator(sel).first
            if loc.count() <= 0:
                continue
            try:
                loc.scroll_into_view_if_needed(timeout=1800)
            except Exception:
                pass
            try:
                loc.hover(timeout=1200)
            except Exception:
                pass
            loc.click(timeout=1800)
            pg.wait_for_timeout(1600)
            return True
        except Exception:
            continue
    # Fallback: keyboard shortcut.
    try:
        _reveal_player_controls(pg)
        pg.keyboard.press("f")
        pg.wait_for_timeout(1200)
        return True
    except Exception:
        return False
|
||
|
||
|
||
def _maybe_click_comments(pg) -> bool:
    """Explicitly open the comment panel (key step: some detail pages keep it collapsed).

    Steps:
      1. Return True right away if the panel is already visible.
      2. Click the first match of each known comment-button selector and wait
         for the panel after every click.
      3. Fallback: ask the page for the centre points of visible elements
         whose text, aria-label or title contains "评论" (excluding panel
         labels such as "评论区"), and click those points one by one.

    Returns:
        True once the panel is detected, False if every attempt failed.
    """
    if _wait_for_comment_panel(pg, 800):
        return True
    for sel in (
        "[data-e2e='feed-comment-icon']", "[data-e2e='feed-comment']",
        "[data-e2e='comment-icon']", "[data-e2e='video-comment']",
        "[aria-label*='评论']", "[title*='评论']",
        "button:has-text('评论')", "[role='button']:has-text('评论')",
        "text=评论",
    ):
        try:
            loc = pg.locator(sel).first
            if loc.count() > 0:
                try:
                    loc.scroll_into_view_if_needed(timeout=1800)
                except Exception:
                    pass
                loc.click(timeout=1200)
                # NOTE(review): nesting reconstructed from a source whose
                # indentation was lost; the wait is assumed to happen only
                # after a click. Confirm against the original file.
                if _wait_for_comment_panel(pg, 2200):
                    return True
        except Exception:
            continue
    # Fallback: heuristic text-based search for clickable positions.
    try:
        points = pg.evaluate(
            """
            () => {
              const hasPanel = () => Boolean(
                [...document.querySelectorAll(
                  "[data-e2e='comment-list'],.comment-mainContent")]
                  .some((el) => {
                    const r = el.getBoundingClientRect();
                    const s = getComputedStyle(el);
                    return r.width > 120 && r.height > 120 &&
                      s.display !== 'none' && s.visibility !== 'hidden';
                  })
              );
              if (hasPanel()) return [{ alreadyOpen: true }];
              const visible = (el) => {
                const r = el.getBoundingClientRect();
                const s = getComputedStyle(el);
                return r.width > 6 && r.height > 6 &&
                  s.visibility !== 'hidden' && s.display !== 'none';
              };
              const points = [];
              const add = (el) => {
                let t = el.closest('button,[role="button"],[tabindex],a') || el;
                if (!visible(t)) return;
                const r = t.getBoundingClientRect();
                points.push({
                  x: Math.round(r.left + r.width / 2),
                  y: Math.round(r.top + r.height / 2),
                });
              };
              for (const el of document.querySelectorAll(
                  'button,[role="button"],[tabindex],[aria-label],span')) {
                const tx = [el.innerText || '',
                            el.getAttribute('aria-label') || '',
                            el.getAttribute('title') || ''].join(' ');
                if (!/评论/.test(tx) || /评论区|评论列表|暂无评论/.test(tx))
                  continue;
                add(el);
              }
              return points;
            }
            """) or []
        for p in points:
            # The script reports an already-open panel with a marker entry.
            if p.get("alreadyOpen"):
                return True
            x, y = p.get("x"), p.get("y")
            if not isinstance(x, int) or not isinstance(y, int):
                continue
            pg.mouse.move(x, y)
            pg.mouse.click(x, y)
            if _wait_for_comment_panel(pg, 2200):
                return True
    except Exception:
        pass
    return False
|
||
|
||
|
||
def _move_into_box(pg, box) -> None:
    """Move the mouse to a point inside ``box``, clamped to the viewport.

    The target is the horizontal centre of the box and its vertical centre,
    but no more than 260px below the box's top edge. Both coordinates are kept
    at least 8px away from the viewport edges. A 1440x900 viewport is assumed
    when the page reports none.
    """
    viewport = pg.viewport_size or {"width": 1440, "height": 900}
    margin = 8
    centre_x = box["x"] + box["width"] / 2
    target_y = box["y"] + min(box["height"] / 2, 260)
    clamped_x = min(max(centre_x, margin), viewport["width"] - margin)
    clamped_y = min(max(target_y, margin), viewport["height"] - margin)
    pg.mouse.move(clamped_x, clamped_y)
|
||
|
||
|
||
def _bring_comments_into_view(pg) -> bool:
    """Scroll a comment container into the viewport and park the mouse over it.

    Up to four rounds: each round tries every selector in
    ``_COMMENT_SCROLLERS``; when none can be scrolled into view, the window is
    scrolled down by 68% of its height before the next round.

    Returns:
        True once a container was scrolled into view, False after all rounds
        failed.
    """
    for _ in range(4):
        for sel in _COMMENT_SCROLLERS:
            try:
                loc = pg.locator(sel).first
                if loc.count() <= 0:
                    continue
                loc.scroll_into_view_if_needed(timeout=5000)
                pg.wait_for_timeout(1800)
                sc = pg.query_selector(sel)
                if sc:
                    box = sc.bounding_box()
                    if box:
                        _move_into_box(pg, box)
                # NOTE(review): the source lost its indentation; this return is
                # assumed to follow a successful scroll-into-view regardless of
                # whether a bounding box was available. Confirm.
                return True
            except Exception:
                continue
        # Nothing matched this round: scroll the window and try again.
        try:
            pg.evaluate(
                "window.scrollBy(0, Math.round(window.innerHeight*0.68))")
            pg.wait_for_timeout(1700)
        except Exception:
            pass
    return False
|
||
|
||
|
||
def _scroll_comment_panel(pg) -> bool:
    """Find the container that really scrolls (judged via getComputedStyle overflow) and scroll it down.

    The page script starts from each known comment selector and walks up the
    ancestors until it finds an element taller than 120px that either
    overflows vertically or has overflow-y auto/scroll; if none qualifies it
    returns the document's scrolling element. That element is then scrolled
    with a mouse wheel event plus a scripted scrollBy. When this fails, a
    plain wheel event and the End key are sent instead.

    Returns:
        True when one of the two scrolling strategies ran without raising,
        False when the fallback raised as well.
    """
    try:
        handle = pg.evaluate_handle(
            """
            () => {
              const sels = [
                "[data-e2e='comment-list']", ".comment-mainContent",
                "[data-e2e='detail-comment']", ".comment-list", ".ESlRWJ2j"
              ];
              const ok = (el) => {
                if (!el) return false;
                const s = getComputedStyle(el);
                return el.scrollHeight > el.clientHeight + 20 ||
                  /(auto|scroll)/.test(s.overflowY || '');
              };
              for (const sel of sels) {
                let el = document.querySelector(sel);
                while (el && el !== document.body) {
                  if (ok(el) && el.clientHeight > 120) return el;
                  el = el.parentElement;
                }
              }
              return document.scrollingElement || document.documentElement;
            }
            """
        )
        el = handle.as_element()
    except Exception:
        el = None
    try:
        # NOTE(review): the source lost its indentation; the wheel/scrollBy
        # calls and the return are assumed to sit inside `if el:` (outside
        # `if box:`). Confirm against the original file.
        if el:
            box = el.bounding_box()
            if box:
                # Put the pointer over the container so the wheel event targets it.
                _move_into_box(pg, box)
            pg.mouse.wheel(0, 2800)
            # Scripted scroll by at least 900px, or 1.35x the container height.
            pg.evaluate(
                "(e)=>{const s=Math.max(900,Math.floor((e.clientHeight||"
                "innerHeight)*1.35));typeof e.scrollBy==='function'?"
                "e.scrollBy(0,s):e.scrollTop+=s;}", el)
            return True
    except Exception:
        pass
    # Fallback: scroll whatever is under the pointer and press End.
    try:
        pg.mouse.wheel(0, 2800)
        pg.keyboard.press("End")
        return True
    except Exception:
        return False
|
||
|
||
|
||
def _dom_comment_count(pg) -> int:
    """Return how many comment items are currently rendered in the DOM (0 on any error)."""
    rendered = 0
    try:
        rendered = len(pg.query_selector_all(_DOM_COMMENT_ITEM))
    except Exception:
        # Page query failed: report zero rather than propagate.
        rendered = 0
    return rendered
|
||
|
||
|
||
def _comments_end_reached(pg) -> bool:
    """Return True when the page text contains the end-of-comments marker.

    Any error raised while evaluating the page script yields False.
    """
    script = ("() => document.body && "
              "document.body.innerText.includes('暂时没有更多评论')")
    try:
        marker_found = pg.evaluate(script)
    except Exception:
        return False
    return bool(marker_found)
|
||
|
||
|
||
def _goto_detail_url(pg, href: str) -> None:
    """Navigate straight to a video detail URL.

    A navigation error is tolerated when the page nevertheless ended up on a
    ``/video/`` URL; otherwise the error is re-raised.
    """
    try:
        pg.goto(href, wait_until="commit", timeout=30000)
    except Exception:
        landed_on_video = "/video/" in pg.url
        if landed_on_video:
            return
        raise
|
||
|
||
|
||
def _click_or_goto_detail(pg, link, href: str) -> None:
    """Prefer an in-page (SPA) click so douyin issues its own signed requests; fall back to a direct goto.

    Args:
        pg: Playwright page.
        link: Element handle of the search-result link to click.
        href: Absolute detail URL used when clicking does not work.
    """
    # Attempt 1: click and wait for a navigation.
    try:
        with pg.expect_navigation(timeout=15000):
            link.click()
        return
    except Exception:
        pass
    # Attempt 2: click again and accept it if the URL became a /video/ URL.
    try:
        link.click()
        pg.wait_for_timeout(1800)
        if "/video/" in pg.url:
            return
    except Exception:
        pass
    # Attempt 3: plain navigation (skipped when no href is known).
    if href:
        _goto_detail_url(pg, href)
|
||
|
||
|
||
def _detail_candidates(pg, raw_api, name, keyword) -> list[dict]:
    """Build the deep-collection candidate list from search-page links and API results.

    DOM links come first (they carry a clickable element handle), followed by
    videos known only from captured API responses. Candidates are
    de-duplicated by video id, so the same video reached through a link with a
    query string and through the canonical API URL is listed once.

    Args:
        pg: Playwright page currently showing the search results.
        raw_api: Captured API responses.
        name: Entity name forwarded to the note parser.
        keyword: Search keyword forwarded to the note parser.

    Returns:
        Dicts with ``href`` (absolute URL), ``link`` (element handle or None)
        and ``source`` ("dom" or "api").
    """
    def _video_key(url: str) -> str:
        # Identity of a candidate: the id after "/video/" in the URL path,
        # or the whole URL when no such id can be extracted.
        try:
            path = urlparse(url).path
        except Exception:
            return url
        _, marker, tail = path.partition("/video/")
        video_id = tail.split("/", 1)[0] if marker else ""
        return video_id or url

    cands, seen = [], set()
    try:
        links = pg.query_selector_all("a[href*='/video/']")
    except Exception:
        links = []
    for link in links:
        try:
            href = link.get_attribute("href") or ""
        except Exception:
            continue
        # Normalise protocol-relative and site-relative links.
        if href.startswith("//"):
            href = "https:" + href
        elif href.startswith("/"):
            href = "https://www.douyin.com" + href
        if not href:
            continue
        key = _video_key(href)
        if key in seen:
            continue
        seen.add(key)
        cands.append({"href": href, "link": link, "source": "dom"})
    for note in _parse_dy_notes(raw_api, name, keyword):
        href = note.get("url") or ""
        if not href:
            continue
        key = _video_key(href)
        if key in seen:
            continue
        seen.add(key)
        cands.append({"href": href, "link": None, "source": "api"})
    return cands
|
||
|
||
|
||
# ── 主流程:_collect ────────────────────────────────────────────────────
|
||
def _collect(keyword: str, pick_tab_cb, deep: bool = True) -> dict:
    """Collect search and comment API responses with a persistent Playwright context.

    Flow: open the search page for ``keyword``, optionally switch tab via
    ``pick_tab_cb``, scroll the results, then (when ``deep``) visit up to
    ``_DEEP_NOTES`` detail pages and scroll their comment panels while the
    response listener records matching API bodies.

    Args:
        keyword: Search keyword.
        pick_tab_cb: Callable receiving the list of tab labels and returning
            the label to click (or a falsy value to keep the current tab).
        deep: Whether to visit detail pages and collect comments.

    Returns:
        ``{"logged_in", "raw_api", "tabs", "api_url_count"}`` on success, where
        ``raw_api`` holds ``{"__url", "__status", "__body"}`` entries that can
        be passed to ``_parse_dy_notes`` / ``_parse_dy_comments``;
        ``logged_in`` is False when the search page looks like a login or
        verification wall. ``{"error": message}`` when anything raised.
    """
    from app.agents.web_agent import _STEALTH_JS, _UA, _CHROME_ARGS
    os.makedirs(DY_PROFILE_DIR, exist_ok=True)
    _unlock()
    raw_api: list = []
    url_count = 0
    labels: list = []
    try:
        from playwright.sync_api import sync_playwright
        with sync_playwright() as p:
            ctx = p.chromium.launch_persistent_context(
                user_data_dir=DY_PROFILE_DIR, headless=True,
                args=_CHROME_ARGS,
                ignore_default_args=["--enable-automation"],
                user_agent=_UA, locale="zh-CN",
                viewport={"width": 1440, "height": 900})
            try:
                ctx.add_init_script(_STEALTH_JS)

                def _on(resp):
                    # Response listener: count diagnostic URLs, keep bodies of
                    # the endpoints listed in _LISTEN.
                    nonlocal url_count
                    u = resp.url
                    if not any(frag in u for frag in _DIAG):
                        return
                    url_count += 1
                    if not any(frag in u for frag in _LISTEN):
                        return  # body would be discarded, so do not parse it
                    try:
                        body = resp.json()
                    except Exception:
                        return
                    if isinstance(body, dict):
                        raw_api.append({"__url": u, "__status": resp.status,
                                        "__body": body})

                ctx.on("response", _on)

                pg = ctx.pages[0] if ctx.pages else ctx.new_page()
                pg.set_default_timeout(15000)

                # 1) Search
                pg.goto(_SEARCH.format(kw=quote(keyword)),
                        wait_until="domcontentloaded", timeout=60000)
                pg.wait_for_timeout(_SEARCH_WAIT_MS)
                html = pg.content()
                cards = pg.query_selector_all("a[href*='/video/']")
                # No result links plus login/verification wording: not logged in.
                if not cards and ("扫码登录" in html or "手机号登录" in html
                                  or "验证" in html or "/passport" in pg.url):
                    return {"logged_in": False, "raw_api": [], "tabs": [],
                            "api_url_count": url_count}

                # Let the callback choose a tab (best effort).
                try:
                    tabs = pg.query_selector_all(
                        "[data-e2e='search-tab'] span, .tab-item, "
                        "div[role='tab']")
                    labels = list({(t.inner_text() or "").strip()
                                   for t in tabs
                                   if (t.inner_text() or "").strip()})
                    chosen = pick_tab_cb(labels) if labels else None
                    if chosen and chosen != "综合":
                        for t in tabs:
                            if (t.inner_text() or "").strip() == chosen:
                                t.click()
                                pg.wait_for_timeout(3000)
                                break
                except Exception:
                    pass

                # Keep scrolling the search page to trigger more API responses.
                for _ in range(_SEARCH_SCROLLS):
                    pg.mouse.wheel(0, 2600)
                    pg.wait_for_timeout(int(random.uniform(1000, 1800)))
                    if (len(pg.query_selector_all("a[href*='/video/']"))
                            >= _MAX_SEARCH_LINKS):
                        break

                # Wait until at least one detail candidate exists.
                deadline = time.monotonic() + _SEARCH_API_WAIT_MS / 1000
                while time.monotonic() < deadline:
                    if _detail_candidates(pg, raw_api, "", keyword):
                        break
                    pg.wait_for_timeout(800)

                search_url = pg.url

                # 2) Deep collection
                if deep:
                    for idx in range(_DEEP_NOTES):
                        # Re-query each round: element handles from the
                        # previous search-page load are no longer usable.
                        cands = _detail_candidates(pg, raw_api, "", keyword)
                        if idx >= len(cands):
                            break
                        c = cands[idx]
                        href = c["href"]
                        try:
                            if c.get("link"):
                                _click_or_goto_detail(pg, c["link"], href)
                            else:
                                _goto_detail_url(pg, href)
                            try:
                                pg.wait_for_load_state(
                                    "domcontentloaded", timeout=30000)
                            except Exception:
                                if "/video/" not in pg.url:
                                    raise
                            pg.wait_for_timeout(_DETAIL_WAIT_MS)
                            if _maybe_click_fullscreen(pg):
                                pg.wait_for_timeout(_FULLSCREEN_WAIT_MS)
                            _maybe_click_comments(pg)
                            if _bring_comments_into_view(pg):
                                pg.wait_for_timeout(_COMMENT_WAIT_MS)
                            # Comment scrolling stops on: end-of-list text, or
                            # _DEEP_STALL rounds with neither API nor DOM growth.
                            stall = 0
                            prev_api = _comment_count_from_raw(raw_api)
                            prev_dom = _dom_comment_count(pg)
                            for _ in range(_COMMENT_SCROLLS):
                                _scroll_comment_panel(pg)
                                pg.wait_for_timeout(
                                    int(random.uniform(1500, 2400)))
                                if _comments_end_reached(pg):
                                    break
                                cur_api = _comment_count_from_raw(raw_api)
                                cur_dom = _dom_comment_count(pg)
                                if cur_api > prev_api or cur_dom > prev_dom:
                                    prev_api, prev_dom, stall = (
                                        cur_api, cur_dom, 0)
                                else:
                                    stall += 1
                                    if stall >= _DEEP_STALL:
                                        break
                            # Give trailing responses time to arrive.
                            pg.wait_for_timeout(2500)
                        except Exception:
                            # One failed video must not abort the whole run.
                            try:
                                pg.keyboard.press("Escape")
                            except Exception:
                                pass
                        finally:
                            # Always return to the search page for the next round.
                            try:
                                pg.goto(search_url,
                                        wait_until="domcontentloaded",
                                        timeout=45000)
                                pg.wait_for_timeout(
                                    int(random.uniform(1500, 2300)))
                            except Exception:
                                pass

                return {"logged_in": True, "raw_api": raw_api, "tabs": labels,
                        "api_url_count": url_count}
            finally:
                # Close the context on every path; a failure to close must not
                # discard data that was already collected.
                try:
                    ctx.close()
                except Exception:
                    pass
    except Exception as e:  # noqa: BLE001
        return {"error": str(e)[:200]}
|
||
|
||
|
||
# ── LLM 解析与对外接口 ─────────────────────────────────────────────────
|
||
async def _build_llm() -> LlmClient | None:
    """Build the LLM client for this agent from stored settings.

    Values are resolved in the order: ``agents.douyin_agent`` section,
    ``global`` section, then the static ``settings`` object. Missing, null or
    non-dict sections are treated as empty.

    Returns:
        An ``LlmClient``, or None when the agent is explicitly disabled or no
        API key / base URL can be resolved.
    """
    def _section(value) -> dict:
        # Treat None and non-dict values as an empty section.
        return value if isinstance(value, dict) else {}

    try:
        cfg = _section(await get_agent_settings())
    except Exception:
        cfg = {}
    g = _section(cfg.get("global"))
    a = _section(_section(cfg.get("agents")).get("douyin_agent"))
    if a.get("enabled") is False:
        return None
    key = a.get("api_key") or g.get("api_key") or settings.llm_api_key
    base = a.get("base_url") or g.get("base_url") or settings.llm_api_base
    if not key or not base:
        return None
    model = (a.get("model") or g.get("model")
             or settings.llm_model or "deepseek-chat")
    try:
        timeout = int(g.get("timeout") or 120)
    except (TypeError, ValueError):
        # Unusable timeout value in the stored config: use the default.
        timeout = 120
    return LlmClient(api_base=base, api_key=key, model=model,
                     timeout=timeout)
|
||
|
||
|
||
async def douyin_enrich(entity: dict) -> dict:
    """Collect Douyin videos and comments for a place, store them as evidence, and derive experience tags.

    Args:
        entity: Place description. Reads ``name``, optional ``biz_need``, and
            ``eid`` / ``natural_key`` (stored on every evidence record as
            ``place_natural_key``).

    Returns:
        A dict that always has ``ok`` and ``summary``. Depending on the
        outcome it also carries ``found``, ``need_login``, ``tags``,
        ``sentiment``, ``evidence_saved``, ``note_count`` and
        ``comment_count``.
    """
    llm = await _build_llm()
    if llm is None:
        return {"ok": False, "summary": "douyin_agent 未配置或停用"}
    # A missing or null name must not end up as the literal "None" in the keyword.
    name = entity.get("name") or ""
    biz = entity.get("biz_need") or f"补全「{name}」真实口碑/体验/热度"
    keyword = f"贵阳 {name}".strip()
    pnk = entity.get("eid") or entity.get("natural_key")

    def _pick(labels):
        # Tab chooser handed to _collect; any failure means "keep the default tab".
        try:
            r = llm.chat_json(_TAB_SYS, json.dumps(
                {"业务需求": biz, "可选tab": labels}, ensure_ascii=False))
            return (r or {}).get("tab")
        except Exception:
            return None

    # The collector is synchronous, so it runs in a worker thread.
    res = await asyncio.to_thread(_collect, keyword, _pick, True)
    if res.get("error"):
        return {"ok": True, "found": False,
                "summary": f"采集异常:{res['error']}"}
    if res.get("logged_in") is False:
        return {"ok": True, "found": False, "need_login": True,
                "summary": "抖音未登录,需一次性人工登录(已升级)"}

    raw = res.get("raw_api") or []
    notes = _parse_dy_notes(raw, name, keyword)
    comments = _parse_dy_comments(raw, name, keyword)
    records = notes + comments
    if not records:
        return {"ok": True, "found": False,
                "summary": f"「{name}」抖音无相关 UGC,跳过"}
    for r in records:
        r["place_natural_key"] = pnk
    try:
        saved = await sa_save_evidence(records)
    except Exception:
        # Storage failure is reported as "0 saved"; tagging still proceeds.
        saved = 0
    n_note = len(notes)
    n_cmt = len(comments)

    corpus = json.dumps(
        {"地点": name,
         "视频": [x["title"] for x in notes if x["title"]][:40],
         "评论": [x["content"] for x in comments if x["content"]][:80]},
        ensure_ascii=False)
    try:
        t = await asyncio.to_thread(llm.chat_json, _TAG_SYS, corpus)
    except Exception as e:  # noqa: BLE001
        return {"ok": True, "found": True, "evidence_saved": saved,
                "tags": [], "note_count": n_note, "comment_count": n_cmt,
                "summary": f"证据入库{saved},标签失败:{str(e)[:50]}"}
    # The model may return None or a non-object; treat that as "no tags".
    if not isinstance(t, dict):
        t = {}
    raw_tags = t.get("tags")
    if not isinstance(raw_tags, (list, tuple)):
        # A bare string would otherwise be split into single characters.
        raw_tags = []
    tags = [str(x).strip()[:24] for x in raw_tags if str(x).strip()]
    return {
        "ok": True, "found": True, "tags": tags[:12],
        "sentiment": t.get("sentiment", ""), "evidence_saved": saved,
        "note_count": n_note, "comment_count": n_cmt,
        "summary": (f"抖音 视频{n_note}+评论{n_cmt} 入证据层(存{saved}) → "
                    f"{len(tags)} 体验标签·口碑{t.get('sentiment','?')}"),
    }
|