Files
bxh/app/agents/douyin_agent.py

850 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Douyin sub-agent (phase 3, ported from the proven douyin_visible_probe pipeline).

Key fix: comment=0 used to happen because the flow never got all the way
through "open detail -> click fullscreen -> explicitly open comments ->
scroll into viewport -> find the element that really scrolls -> listen to the
/comment API and the DOM in parallel". This version carries over every
technique the probe already proved, so the autonomous agent can also fetch
the full comment set.

External interface is unchanged:
- `douyin_enrich(entity) -> dict` (used by the orchestrator's _douyin_enrich)
- `DY_PROFILE_DIR`, `_unlock` (used by scripts/douyin_login.py)

Internal upgrades:
- the response listener records url + status + body (comments can recover
  their note id from the response URL's query string)
- parsing uses the deep-recursive _iter_lists_by_key to cope with Douyin's
  shifting JSON nesting
- detail page: click_or_goto + fullscreen button + explicitly open comments +
  scroll into viewport + smart scrolling
- stop condition: the "暂时没有更多评论" ("no more comments") marker appears,
  or neither the API comment count nor the DOM item count grows for several
  consecutive scroll rounds
"""
from __future__ import annotations
import asyncio
import json
import os
import random
import time
from urllib.parse import parse_qs, quote, urlparse
from app.config import settings
from app.db import get_agent_settings, sa_save_evidence
from app.llm_client import LlmClient
# ── Paths / endpoints ────────────────────────────────────────────────────
# Persistent Chromium profile that holds the Douyin login session
# (also used by scripts/douyin_login.py, per the module docstring).
DY_PROFILE_DIR = os.path.expanduser("~/.zn-kg/douyin-profile")
# Search page URL template; {kw} receives the URL-quoted keyword.
_SEARCH = "https://www.douyin.com/search/{kw}?type=general"
# Listened API paths: a response is kept for parsing/counting only when its
# URL contains one of these AND its body is a JSON object (precise net).
# Matching is substring-based, so e.g. ".../comment/list" already covers
# ".../comment/list/reply"; the longer entries just document intent.
_LISTEN = (
    "/aweme/v1/web/aweme/detail",
    "/aweme/v1/web/comment/list",
    "/aweme/v1/web/comment/list/reply",
    "/aweme/v1/web/general/search/stream",
    "/aweme/v1/web/general/search/single",
    "/aweme/v1/web/search/item",
    "/aweme/v1/web/general/search",
)
# Diagnostics: every response URL containing one of these prefixes is counted
# into api_url_count (a wider net, useful when troubleshooting).
_DIAG = ("https://www.douyin.com/aweme/v1/web/", "https://www.douyin.com/search/")
# Candidate comment panel / scroll container selectors, tried in order.
# NOTE(review): ".ESlRWJ2j" looks like a generated class name and may change
# when Douyin redeploys its frontend.
_COMMENT_SCROLLERS = (
    "[data-e2e='comment-list']",
    "[data-e2e='detail-comment']",
    ".comment-list",
    ".comment-mainContent",
    ".ESlRWJ2j",
)
# Selector for one rendered comment item (DOM-side comment counting).
_DOM_COMMENT_ITEM = "[data-e2e='comment-item']"
# ── Tuning knobs (all module constants; edit them here to retune) ────────
_DEEP_NOTES = 3  # max videos deep-collected per run
_SEARCH_SCROLLS = 6  # max scroll rounds on the search page
_MAX_SEARCH_LINKS = 40  # stop scrolling once this many /video/ links are rendered
_SEARCH_WAIT_MS = 4500  # wait after the search page first loads
_SEARCH_API_WAIT_MS = 12000  # max wait for the first detail candidates to appear
_DETAIL_WAIT_MS = 3800  # layout time after entering a detail page
_FULLSCREEN_WAIT_MS = 1800  # extra wait after the fullscreen attempt
_COMMENT_WAIT_MS = 3500  # wait after the comment panel is brought into view
_COMMENT_SCROLLS = 30  # max comment-panel scroll rounds per video
_DEEP_STALL = 4  # stop after N consecutive rounds without new comments
# ── Prompts ──────────────────────────────────────────────────────────────
# System prompt for picking a search-result tab from the business need and the
# visible tab labels; the reply is JSON whose "tab" key douyin_enrich reads.
_TAB_SYS = """你在抖音搜索结果页选 tab。给业务需求 + 当前可选 tab 列表,
选最相关的一个 tab 名;拿不准选「综合」。
只输出 JSON:{"tab":"tab名","reason":"一句话"}"""
# System prompt for distilling place "experience tags" from video titles and
# comments; douyin_enrich reads the "tags" and "sentiment" keys of the reply.
_TAG_SYS = """你从抖音 UGC(视频标题/文案/评论)提炼某地点的"体验标签"
只留多条相互印证、对该地点有信息量的短标签(如:酸汤鱼必点、排队久、人均50、
网红打卡、本地人推荐、环境一般)。广告/无关/噪声一律丢弃。
只输出 JSON:{"tags":["标签1"],"sentiment":"正面|中性|负面|混合","summary":"一句话口碑"}"""
# ── Generic helpers ──────────────────────────────────────────────────────
def _unlock(profile_dir: str = DY_PROFILE_DIR) -> None:
    """Delete stale Chromium ``Singleton*`` lock files left by a killed browser.

    Files that are missing (or that cannot be removed) are silently skipped,
    so this is safe to call before every launch.
    """
    lock_paths = [
        os.path.join(profile_dir, lock_name)
        for lock_name in ("SingletonLock", "SingletonCookie", "SingletonSocket")
    ]
    for lock_path in lock_paths:
        try:
            os.unlink(lock_path)
        except OSError:
            continue
def _scalar(obj, *keys, default=""):
if not isinstance(obj, dict):
return default
for k in keys:
v = obj.get(k)
if v not in (None, "", [], {}):
return v
return default
def _first_avatar(user) -> str:
if not isinstance(user, dict):
return ""
av = user.get("avatar_thumb") or user.get("avatar_medium") or {}
urls = av.get("url_list") if isinstance(av, dict) else None
if isinstance(urls, list) and urls:
return str(urls[0] or "")
return ""
def _as_int(value) -> int:
try:
if value in (None, ""):
return 0
return int(value)
except (TypeError, ValueError):
return 0
def _body_of(entry):
if isinstance(entry, dict) and "__body" in entry:
return entry.get("__body")
return entry
def _url_of(entry) -> str:
if isinstance(entry, dict):
return str(entry.get("__url") or "")
return ""
def _aweme_id_from_url(url: str) -> str:
try:
qs = parse_qs(urlparse(url).query)
except Exception:
return ""
for k in ("aweme_id", "item_id", "group_id"):
v = qs.get(k)
if v:
return str(v[0] or "")
return ""
def _iter_lists_by_key(obj, keys: set):
"""深递归找到所有匹配 key 的 list 节点(适配 douyin 多变嵌套)。"""
if isinstance(obj, dict):
for k, v in obj.items():
if k in keys and isinstance(v, list):
yield v
elif isinstance(v, (dict, list)):
yield from _iter_lists_by_key(v, keys)
elif isinstance(obj, list):
for it in obj:
yield from _iter_lists_by_key(it, keys)
# ── Parsing (deep recursion + multiple paths) ────────────────────────────
def _dy_note_candidates(body: dict) -> list:
    """Gather every item of one API body that may wrap an aweme (video) object.

    Checks the known direct locations first (``data`` as a list,
    ``aweme_detail``/``aweme_list`` at top level or under ``data``, and
    ``data.data``), then appends every nested ``aweme_list`` found by deep
    recursion. Duplicates are expected; callers de-duplicate by aweme id.
    """
    data = body.get("data")
    inner = data if isinstance(data, dict) else {}
    direct_sources = [
        data if isinstance(data, list) else None,
        body.get("aweme_detail"),
        body.get("aweme_list"),
        inner.get("aweme_detail"),
        inner.get("aweme_list"),
        inner.get("data"),
    ]
    found: list = []
    for source in direct_sources:
        if isinstance(source, dict):
            found.append(source)
        elif isinstance(source, list):
            found.extend(source)
    for nested in _iter_lists_by_key(body, {"aweme_list"}):
        found.extend(nested)
    return found


def _parse_dy_notes(raw_api: list, name: str, keyword: str) -> list[dict]:
    """Convert captured search/detail API bodies into note evidence records.

    Each candidate may wrap the aweme under ``aweme_info`` / ``aweme`` or be
    the aweme itself. Records are de-duplicated by aweme id (first occurrence
    wins) and carry stats, author info and the original item under ``raw``.
    """
    notes: list[dict] = []
    seen_ids: set = set()
    for entry in raw_api or []:
        body = _body_of(entry)
        if not isinstance(body, dict):
            continue
        for item in _dy_note_candidates(body):
            if not isinstance(item, dict):
                continue
            aweme = item.get("aweme_info") or item.get("aweme") or item
            if not isinstance(aweme, dict):
                continue
            aid = _scalar(aweme, "aweme_id", "group_id", "awemeId")
            if not aid or str(aid) in seen_ids:
                continue
            seen_ids.add(str(aid))
            author = aweme.get("author") or {}
            stats = aweme.get("statistics") or {}
            notes.append({
                "platform": "douyin",
                "kind": "note",
                "source_id": str(aid),
                "url": f"https://www.douyin.com/video/{aid}",
                "entity_name": name,
                "keyword": keyword,
                "title": _scalar(aweme, "desc", "title", "caption"),
                "content": _scalar(aweme, "desc", "content"),
                "author": _scalar(author, "nickname", "name"),
                "author_id": str(_scalar(author, "uid", "sec_uid", "unique_id")),
                "author_avatar": _first_avatar(author),
                "likes": _as_int(_scalar(stats, "digg_count", "admire_count",
                                         default=0)),
                "comments": _as_int(_scalar(stats, "comment_count", default=0)),
                "collects": _as_int(_scalar(stats, "collect_count",
                                            "favorite_count",
                                            "collects_count", default=0)),
                "shares": _as_int(_scalar(stats, "share_count",
                                          "share_count_reflow", default=0)),
                "publish_time": str(_scalar(aweme, "create_time", default="")),
                "location": "",
                "tags": [],
                "image_urls": [],
                "raw": item,
            })
    return notes
def _parse_dy_comments(raw_api: list, name: str, keyword: str) -> list[dict]:
    """Convert captured comment API bodies into comment evidence records.

    Scans every list under ``comments``/``comment_list``/``reply_comments``.
    Items need both an id and text and are de-duplicated by id. When a comment
    lacks ``aweme_id``, the id from the response URL's query string is used to
    build the video URL.
    """
    comment_keys = {"comments", "comment_list", "reply_comments"}
    records: list[dict] = []
    seen_cids: set = set()
    for entry in raw_api or []:
        body = _body_of(entry)
        if not isinstance(body, dict):
            continue
        url_aid = _aweme_id_from_url(_url_of(entry))
        for bucket in _iter_lists_by_key(body, comment_keys):
            for cm in bucket:
                if not isinstance(cm, dict):
                    continue
                cid = _scalar(cm, "cid", "comment_id", "id")
                text = _scalar(cm, "text", "content")
                if not cid or not text or str(cid) in seen_cids:
                    continue
                seen_cids.add(str(cid))
                user = cm.get("user") or cm.get("user_info") or {}
                aid = _scalar(cm, "aweme_id", default=url_aid)
                reply_total = _scalar(cm, "reply_comment_total",
                                      "reply_total", default=0)
                video_url = f"https://www.douyin.com/video/{aid}" if aid else ""
                records.append({
                    "platform": "douyin",
                    "kind": "comment",
                    "source_id": str(cid),
                    "url": video_url,
                    "entity_name": name,
                    "keyword": keyword,
                    "title": "",
                    "content": text,
                    "author": _scalar(user, "nickname", "name"),
                    "author_id": str(_scalar(user, "uid", "sec_uid")),
                    "author_avatar": _first_avatar(user),
                    "likes": _as_int(_scalar(cm, "digg_count", "like_count",
                                             default=0)),
                    "comments": _as_int(reply_total),
                    "collects": 0,
                    "shares": 0,
                    "publish_time": str(_scalar(cm, "create_time", default="")),
                    "location": _scalar(cm, "ip_label", default=""),
                    "tags": [],
                    "image_urls": [],
                    "raw": cm,
                })
    return records
def _comment_count_from_raw(raw_api: list) -> int:
    """Sum the lengths of all comment-bearing lists in the captured API bodies.

    No de-duplication is done: the value serves as a growth signal for the
    comment-scroll stop condition in ``_collect``.
    """
    comment_keys = {"comments", "comment_list", "reply_comments"}
    bodies = (_body_of(entry) for entry in raw_api or [])
    return sum(
        len(bucket)
        for body in bodies if isinstance(body, dict)
        for bucket in _iter_lists_by_key(body, comment_keys)
    )
# ── Browser interaction (ported from the probe; mostly works headless too) ──
def _comment_panel_present(pg) -> bool:
    """Return True when some ``_COMMENT_SCROLLERS`` element is visibly on screen.

    An element qualifies when it is larger than 120x120 px, overlaps the
    viewport by more than 80x80 px, is not ``display:none`` /
    ``visibility:hidden`` and has opacity above 0.05. Any Playwright error
    counts as "not present".
    """
    visibility_js = """
(selectors) => {
const ok = (el) => {
if (!el) return false;
const r = el.getBoundingClientRect();
const s = getComputedStyle(el);
const w = Math.min(r.right, innerWidth) - Math.max(r.left, 0);
const h = Math.min(r.bottom, innerHeight) - Math.max(r.top, 0);
return r.width > 120 && r.height > 120 && w > 80 && h > 80 &&
s.display !== 'none' && s.visibility !== 'hidden' &&
Number(s.opacity || '1') > 0.05;
};
for (const sel of selectors) {
for (const el of document.querySelectorAll(sel)) {
if (ok(el)) return true;
}
}
return false;
}
"""
    try:
        found = pg.evaluate(visibility_js, list(_COMMENT_SCROLLERS))
        return bool(found)
    except Exception:
        return False
def _wait_for_comment_panel(pg, timeout_ms: int = 3000) -> bool:
    """Poll every 180 ms until the comment panel is visible or ``timeout_ms`` passes.

    The deadline uses the monotonic clock, so wall-clock adjustments (NTP,
    manual changes) cannot cut the wait short or stretch it. One last check
    runs after the deadline.

    Returns:
        True as soon as ``_comment_panel_present`` reports the panel.
    """
    deadline = time.monotonic() + timeout_ms / 1000
    while time.monotonic() < deadline:
        if _comment_panel_present(pg):
            return True
        pg.wait_for_timeout(180)
    return _comment_panel_present(pg)
def _reveal_player_controls(pg) -> None:
for sel in (".xgplayer", "[class*='xgplayer']", "video"):
try:
loc = pg.locator(sel).first
if loc.count() <= 0:
continue
box = loc.bounding_box(timeout=1200)
if not box:
continue
pg.mouse.move(box["x"] + box["width"] * 0.72,
box["y"] + box["height"] * 0.82)
pg.wait_for_timeout(700)
return
except Exception:
continue
try:
vp = pg.viewport_size or {"width": 1440, "height": 900}
pg.mouse.move(vp["width"] * 0.55, vp["height"] * 0.78)
pg.wait_for_timeout(700)
except Exception:
pass
def _maybe_click_fullscreen(pg) -> bool:
    """Try to put the player into fullscreen (the detail layout is steadier then).

    Each known fullscreen control is tried in order. The player is re-hovered
    before every attempt (presumably because the controls auto-hide). The
    first successful click returns True. If none works, pressing "f" is the
    fallback. Returns False only when even the key press fails, so True means
    "attempted", not "confirmed fullscreen".
    """
    fullscreen_selectors = (
        ".xgplayer-icon:has(.xg-get-fullscreen)",
        ".xgplayer-icon .xg-get-fullscreen", ".xg-get-fullscreen",
        ".xgplayer-fullscreen", "[aria-label*='全屏']",
        "[title*='全屏']", "button:has-text('全屏')",
    )
    _reveal_player_controls(pg)
    for selector in fullscreen_selectors:
        try:
            _reveal_player_controls(pg)
            button = pg.locator(selector).first
            if button.count() <= 0:
                continue
            try:
                button.scroll_into_view_if_needed(timeout=1800)
            except Exception:
                pass
            try:
                button.hover(timeout=1200)
            except Exception:
                pass
            button.click(timeout=1800)
            pg.wait_for_timeout(1600)
        except Exception:
            continue
        return True
    try:
        _reveal_player_controls(pg)
        pg.keyboard.press("f")
        pg.wait_for_timeout(1200)
    except Exception:
        return False
    return True
def _maybe_click_comments(pg) -> bool:
    """Explicitly open the comment panel (some detail pages start with it closed).

    Order of attempts, stopping as soon as the panel becomes visible:
    1. the panel is already open (800 ms check);
    2. click each known comment-button selector, waiting up to 2.2 s each;
    3. fallback: an in-page scan collects centre points of visible elements
       whose text / aria-label / title mentions "评论" (excluding panel
       labels), and the mouse clicks each point in turn.

    Returns True once the panel is visible, otherwise False.
    """
    if _wait_for_comment_panel(pg, 800):
        return True
    comment_selectors = (
        "[data-e2e='feed-comment-icon']", "[data-e2e='feed-comment']",
        "[data-e2e='comment-icon']", "[data-e2e='video-comment']",
        "[aria-label*='评论']", "[title*='评论']",
        "button:has-text('评论')", "[role='button']:has-text('评论')",
        "text=评论",
    )
    for selector in comment_selectors:
        try:
            trigger = pg.locator(selector).first
            if trigger.count() <= 0:
                continue
            try:
                trigger.scroll_into_view_if_needed(timeout=1800)
            except Exception:
                pass
            trigger.click(timeout=1200)
            if _wait_for_comment_panel(pg, 2200):
                return True
        except Exception:
            continue
    # Fallback: heuristic text / attribute scan for clickable "评论" spots.
    point_scan_js = """
() => {
const hasPanel = () => Boolean(
[...document.querySelectorAll(
"[data-e2e='comment-list'],.comment-mainContent")]
.some((el) => {
const r = el.getBoundingClientRect();
const s = getComputedStyle(el);
return r.width > 120 && r.height > 120 &&
s.display !== 'none' && s.visibility !== 'hidden';
})
);
if (hasPanel()) return [{ alreadyOpen: true }];
const visible = (el) => {
const r = el.getBoundingClientRect();
const s = getComputedStyle(el);
return r.width > 6 && r.height > 6 &&
s.visibility !== 'hidden' && s.display !== 'none';
};
const points = [];
const add = (el) => {
let t = el.closest('button,[role="button"],[tabindex],a') || el;
if (!visible(t)) return;
const r = t.getBoundingClientRect();
points.push({
x: Math.round(r.left + r.width / 2),
y: Math.round(r.top + r.height / 2),
});
};
for (const el of document.querySelectorAll(
'button,[role="button"],[tabindex],[aria-label],span')) {
const tx = [el.innerText || '',
el.getAttribute('aria-label') || '',
el.getAttribute('title') || ''].join(' ');
if (!/评论/.test(tx) || /评论区|评论列表|暂无评论/.test(tx))
continue;
add(el);
}
return points;
}
"""
    try:
        candidates = pg.evaluate(point_scan_js) or []
        for point in candidates:
            if point.get("alreadyOpen"):
                return True
            x, y = point.get("x"), point.get("y")
            if isinstance(x, int) and isinstance(y, int):
                pg.mouse.move(x, y)
                pg.mouse.click(x, y)
                if _wait_for_comment_panel(pg, 2200):
                    return True
    except Exception:
        pass
    return False
def _move_into_box(pg, box) -> None:
vp = pg.viewport_size or {"width": 1440, "height": 900}
x = min(max(box["x"] + box["width"] / 2, 8), vp["width"] - 8)
y = min(max(box["y"] + min(box["height"] / 2, 260), 8), vp["height"] - 8)
pg.mouse.move(x, y)
def _bring_comments_into_view(pg) -> bool:
    """Scroll a comment container into view and park the mouse over it.

    Runs up to 4 rounds. In each round, every ``_COMMENT_SCROLLERS`` selector
    that exists is scrolled into view; the first one with a bounding box is
    hovered and True is returned. After an unsuccessful round the window is
    scrolled down by ~68% of its height. Returns False if all rounds fail.
    """
    for _round in range(4):
        for selector in _COMMENT_SCROLLERS:
            try:
                target = pg.locator(selector).first
                if target.count() <= 0:
                    continue
                target.scroll_into_view_if_needed(timeout=5000)
                pg.wait_for_timeout(1800)
                element = pg.query_selector(selector)
                box = element.bounding_box() if element else None
                if box:
                    _move_into_box(pg, box)
                    return True
            except Exception:
                continue
        try:
            pg.evaluate(
                "window.scrollBy(0, Math.round(window.innerHeight*0.68))")
            pg.wait_for_timeout(1700)
        except Exception:
            pass
    return False
def _scroll_comment_panel(pg) -> bool:
"""找到真正可滚的容器(用 getComputedStyle 判 overflow)并滚到底。"""
try:
handle = pg.evaluate_handle(
"""
() => {
const sels = [
"[data-e2e='comment-list']", ".comment-mainContent",
"[data-e2e='detail-comment']", ".comment-list", ".ESlRWJ2j"
];
const ok = (el) => {
if (!el) return false;
const s = getComputedStyle(el);
return el.scrollHeight > el.clientHeight + 20 ||
/(auto|scroll)/.test(s.overflowY || '');
};
for (const sel of sels) {
let el = document.querySelector(sel);
while (el && el !== document.body) {
if (ok(el) && el.clientHeight > 120) return el;
el = el.parentElement;
}
}
return document.scrollingElement || document.documentElement;
}
"""
)
el = handle.as_element()
except Exception:
el = None
try:
if el:
box = el.bounding_box()
if box:
_move_into_box(pg, box)
pg.mouse.wheel(0, 2800)
pg.evaluate(
"(e)=>{const s=Math.max(900,Math.floor((e.clientHeight||"
"innerHeight)*1.35));typeof e.scrollBy==='function'?"
"e.scrollBy(0,s):e.scrollTop+=s;}", el)
return True
except Exception:
pass
try:
pg.mouse.wheel(0, 2800)
pg.keyboard.press("End")
return True
except Exception:
return False
def _dom_comment_count(pg) -> int:
    """Number of rendered comment items currently in the DOM (0 on any error)."""
    try:
        items = pg.query_selector_all(_DOM_COMMENT_ITEM)
    except Exception:
        return 0
    return len(items)
def _comments_end_reached(pg) -> bool:
try:
return bool(pg.evaluate(
"() => document.body && "
"document.body.innerText.includes('暂时没有更多评论')"))
except Exception:
return False
def _goto_detail_url(pg, href: str) -> None:
try:
pg.goto(href, wait_until="commit", timeout=30000)
except Exception:
if "/video/" not in pg.url:
raise
def _click_or_goto_detail(pg, link, href: str) -> None:
"""优先 SPA 内点击(让 douyin 自己发签名请求),失败回退直接 goto。"""
try:
with pg.expect_navigation(timeout=15000):
link.click()
return
except Exception:
pass
try:
link.click()
pg.wait_for_timeout(1800)
if "/video/" in pg.url:
return
except Exception:
pass
if href:
_goto_detail_url(pg, href)
def _detail_candidates(pg, raw_api, name, keyword) -> list[dict]:
    """Union of search-page DOM video links and API-discovered videos.

    DOM links come first (they can be clicked in-app); notes parsed from
    ``raw_api`` add videos that are not rendered (yet). Candidates are now
    de-duplicated by their ``/video/<id>`` path rather than the full href.
    Previously a DOM link carrying query parameters and the API-built URL of
    the same video both became candidates.

    Returns:
        A list of ``{"href", "link", "source"}`` dicts. ``link`` is the DOM
        element handle for ``source == "dom"`` and None for ``"api"``.
    """
    def _key(url: str) -> str:
        # Identity of a detail URL: its /video/<id> path when present, else the URL.
        try:
            path = urlparse(url).path.rstrip("/")
        except ValueError:
            return url
        return path if "/video/" in path else url

    cands: list[dict] = []
    seen: set = set()
    try:
        links = pg.query_selector_all("a[href*='/video/']")
    except Exception:
        links = []
    for link in links:
        try:
            href = link.get_attribute("href") or ""
        except Exception:
            continue
        if href.startswith("//"):
            href = "https:" + href
        elif href.startswith("/"):
            href = "https://www.douyin.com" + href
        if not href or _key(href) in seen:
            continue
        seen.add(_key(href))
        cands.append({"href": href, "link": link, "source": "dom"})
    for note in _parse_dy_notes(raw_api, name, keyword):
        href = note.get("url") or ""
        if not href or _key(href) in seen:
            continue
        seen.add(_key(href))
        cands.append({"href": href, "link": None, "source": "api"})
    return cands
# ── Main flow: _collect ──────────────────────────────────────────────────
def _dy_video_key(href: str) -> str:
    """Identity of a detail URL: its ``/video/<id>`` path when present, else the URL."""
    try:
        path = urlparse(href).path.rstrip("/")
    except ValueError:
        return href
    return path if "/video/" in path else href


def _dy_search_ready(pg, keyword: str) -> bool:
    """Open the search page; return False when a login / verification wall shows.

    The page counts as usable whenever at least one ``/video/`` link is
    rendered, even if login-related text is also on the page.
    """
    pg.goto(_SEARCH.format(kw=quote(keyword)),
            wait_until="domcontentloaded", timeout=60000)
    pg.wait_for_timeout(_SEARCH_WAIT_MS)
    html = pg.content()
    if pg.query_selector_all("a[href*='/video/']"):
        return True
    return not ("扫码登录" in html or "手机号登录" in html
                or "验证" in html or "/passport" in pg.url)


def _dy_pick_tab(pg, pick_tab_cb) -> list:
    """Let ``pick_tab_cb`` choose a result tab and click it (best effort).

    Returns the de-duplicated non-empty tab labels (empty list on failure).
    Choosing "综合" (the default tab) or nothing leaves the page as is.
    """
    labels: list = []
    try:
        tabs = pg.query_selector_all(
            "[data-e2e='search-tab'] span, .tab-item, "
            "div[role='tab']")
        texts = [(t.inner_text() or "").strip() for t in tabs]
        labels = list({text for text in texts if text})
        chosen = pick_tab_cb(labels) if labels else None
        if chosen and chosen != "综合":
            for tab, text in zip(tabs, texts):
                if text == chosen:
                    tab.click()
                    pg.wait_for_timeout(3000)
                    break
    except Exception:
        pass
    return labels


def _dy_warm_search(pg, raw_api: list, keyword: str) -> None:
    """Scroll the result list to trigger search APIs, then wait for candidates."""
    for _ in range(_SEARCH_SCROLLS):
        pg.mouse.wheel(0, 2600)
        pg.wait_for_timeout(int(random.uniform(1000, 1800)))
        if (len(pg.query_selector_all("a[href*='/video/']"))
                >= _MAX_SEARCH_LINKS):
            break
    deadline = time.monotonic() + _SEARCH_API_WAIT_MS / 1000
    while time.monotonic() < deadline:
        if _detail_candidates(pg, raw_api, "", keyword):
            break
        pg.wait_for_timeout(800)


def _dy_open_detail(pg, cand: dict) -> None:
    """Open one candidate video and get its comment panel on screen."""
    href = cand["href"]
    if cand.get("link"):
        _click_or_goto_detail(pg, cand["link"], href)
    else:
        _goto_detail_url(pg, href)
    try:
        pg.wait_for_load_state("domcontentloaded", timeout=30000)
    except Exception:
        if "/video/" not in pg.url:
            raise
    pg.wait_for_timeout(_DETAIL_WAIT_MS)
    if _maybe_click_fullscreen(pg):
        pg.wait_for_timeout(_FULLSCREEN_WAIT_MS)
    _maybe_click_comments(pg)
    if _bring_comments_into_view(pg):
        pg.wait_for_timeout(_COMMENT_WAIT_MS)


def _dy_scroll_comments(pg, raw_api: list) -> None:
    """Scroll comments until the end marker or ``_DEEP_STALL`` idle rounds.

    A round counts as progress when either the API comment count or the DOM
    comment-item count grew.
    """
    stall = 0
    prev_api = _comment_count_from_raw(raw_api)
    prev_dom = _dom_comment_count(pg)
    for _ in range(_COMMENT_SCROLLS):
        _scroll_comment_panel(pg)
        pg.wait_for_timeout(int(random.uniform(1500, 2400)))
        if _comments_end_reached(pg):
            break
        cur_api = _comment_count_from_raw(raw_api)
        cur_dom = _dom_comment_count(pg)
        if cur_api > prev_api or cur_dom > prev_dom:
            prev_api, prev_dom, stall = cur_api, cur_dom, 0
        else:
            stall += 1
            if stall >= _DEEP_STALL:
                break
    pg.wait_for_timeout(2500)  # let the last API packets arrive


def _dy_deep_collect(pg, raw_api: list, keyword: str) -> None:
    """Visit up to ``_DEEP_NOTES`` distinct videos, returning to the results each time.

    Candidates are recomputed after every return to the search page (the DOM
    is reloaded and ``raw_api`` grows). Visited videos are therefore tracked
    by video key, instead of indexing into a list whose order can change.
    """
    search_url = pg.url
    visited: set = set()
    for _ in range(_DEEP_NOTES):
        cand = next(
            (c for c in _detail_candidates(pg, raw_api, "", keyword)
             if _dy_video_key(c["href"]) not in visited),
            None,
        )
        if cand is None:
            break
        visited.add(_dy_video_key(cand["href"]))
        try:
            _dy_open_detail(pg, cand)
            _dy_scroll_comments(pg, raw_api)
        except Exception:
            # Best effort: one failed video must not abort the run.
            try:
                pg.keyboard.press("Escape")
            except Exception:
                pass
        finally:
            # Always go back so the next candidate's DOM link is live again.
            try:
                pg.goto(search_url, wait_until="domcontentloaded",
                        timeout=45000)
                pg.wait_for_timeout(int(random.uniform(1500, 2300)))
            except Exception:
                pass


def _collect(keyword: str, pick_tab_cb, deep: bool = True) -> dict:
    """Collect Douyin API payloads with a persistent Playwright context.

    Flow (aligned with the probe): open the search page, let ``pick_tab_cb``
    pick a result tab, scroll to trigger search APIs, then (if ``deep``) open
    up to ``_DEEP_NOTES`` distinct videos and scroll their comments. While
    this runs, a response listener records matching API bodies.

    Args:
        keyword: search keyword (URL-quoted into ``_SEARCH``).
        pick_tab_cb: called with the visible tab labels; returns the label to
            click, or None / "综合" to stay on the default tab.
        deep: whether to visit detail pages and scroll comments.

    Returns:
        - ``{"logged_in": True, "raw_api": [...], "tabs": [...], "api_url_count": int}``
          on success. Each ``raw_api`` entry is ``{"__url", "__status", "__body"}``
          and feeds ``_parse_dy_notes`` / ``_parse_dy_comments``.
        - ``{"logged_in": False, "raw_api": [], "tabs": [], "api_url_count": int}``
          when a login / verification wall is shown.
        - ``{"error": str}`` on any unexpected failure.

    The browser context is always closed, including on errors.
    """
    from app.agents.web_agent import _STEALTH_JS, _UA, _CHROME_ARGS
    os.makedirs(DY_PROFILE_DIR, exist_ok=True)
    _unlock()
    raw_api: list = []
    url_count = 0
    labels: list = []
    try:
        from playwright.sync_api import sync_playwright
        with sync_playwright() as pw:
            ctx = pw.chromium.launch_persistent_context(
                user_data_dir=DY_PROFILE_DIR, headless=True,
                args=_CHROME_ARGS,
                ignore_default_args=["--enable-automation"],
                user_agent=_UA, locale="zh-CN",
                viewport={"width": 1440, "height": 900})
            try:
                ctx.add_init_script(_STEALTH_JS)

                def _on(resp):
                    # Count every diagnostic URL; parse JSON only for listened APIs.
                    nonlocal url_count
                    u = resp.url
                    if not any(prefix in u for prefix in _DIAG):
                        return
                    url_count += 1
                    if not any(path in u for path in _LISTEN):
                        return
                    try:
                        body = resp.json()
                    except Exception:
                        return
                    if isinstance(body, dict):
                        raw_api.append({"__url": u, "__status": resp.status,
                                        "__body": body})

                ctx.on("response", _on)
                pg = ctx.pages[0] if ctx.pages else ctx.new_page()
                pg.set_default_timeout(15000)
                # 1) search (bail out on a login wall)
                if not _dy_search_ready(pg, keyword):
                    return {"logged_in": False, "raw_api": [], "tabs": [],
                            "api_url_count": url_count}
                labels = _dy_pick_tab(pg, pick_tab_cb)
                _dy_warm_search(pg, raw_api, keyword)
                # 2) deep collection
                if deep:
                    _dy_deep_collect(pg, raw_api, keyword)
            finally:
                try:
                    ctx.close()
                except Exception:
                    pass
        return {"logged_in": True, "raw_api": raw_api, "tabs": labels,
                "api_url_count": url_count}
    except Exception as e:  # noqa: BLE001
        return {"error": str(e)[:200]}
# ── LLM parsing and public interface ─────────────────────────────────────
async def _build_llm() -> LlmClient | None:
    """Build the LLM client for douyin_agent, or None if disabled / unconfigured.

    Each field is resolved in this order: ``agents.douyin_agent`` settings,
    then ``global`` settings, then app ``settings``. The model defaults to
    "deepseek-chat". Missing, null or non-dict config sections are treated as
    empty; previously a null section raised AttributeError. A non-numeric
    ``global.timeout`` falls back to 120 s instead of raising.
    """
    try:
        cfg = await get_agent_settings()
    except Exception:
        cfg = {}

    def _section(parent, key) -> dict:
        value = parent.get(key) if isinstance(parent, dict) else None
        return value if isinstance(value, dict) else {}

    g = _section(cfg, "global")
    a = _section(_section(cfg, "agents"), "douyin_agent")
    if a.get("enabled") is False:
        return None
    key = a.get("api_key") or g.get("api_key") or settings.llm_api_key
    base = a.get("base_url") or g.get("base_url") or settings.llm_api_base
    if not key or not base:
        return None
    model = (a.get("model") or g.get("model")
             or settings.llm_model or "deepseek-chat")
    try:
        timeout = int(g.get("timeout") or 120)
    except (TypeError, ValueError):
        timeout = 120
    return LlmClient(api_base=base, api_key=key, model=model,
                     timeout=timeout)
async def douyin_enrich(entity: dict) -> dict:
    """Enrich a Place with Douyin videos + comments (evidence layer + experience tags).

    Steps:
    1. Build the LLM client.
    2. Collect "贵阳 <name>" search results in a worker thread; the LLM picks
       the result tab.
    3. Parse notes and comments from the captured API bodies and save them
       via ``sa_save_evidence``, tagged with the place key.
    4. Ask the LLM for tags and sentiment over the video titles and comments.

    Args:
        entity: place dict; reads ``name``, optional ``biz_need`` and
            ``eid`` / ``natural_key``.

    Returns:
        A subset of ``{ok, found, need_login, tags, sentiment, evidence_saved,
        note_count, comment_count, summary}``. ``ok`` is False only when the
        agent is unconfigured or disabled.
    """
    llm = await _build_llm()
    if llm is None:
        return {"ok": False, "summary": "douyin_agent 未配置或停用"}
    name = str(entity.get("name") or "")
    if not name.strip():
        # An empty name would search only the city and attach unrelated UGC to the place.
        return {"ok": True, "found": False,
                "summary": "实体缺少名称,跳过抖音采集"}
    biz = entity.get("biz_need") or f"补全「{name}」真实口碑/体验/热度"
    keyword = f"贵阳 {name}".strip()
    pnk = entity.get("eid") or entity.get("natural_key")

    def _pick(labels):
        # Called from inside _collect, i.e. already in the worker thread.
        try:
            r = llm.chat_json(_TAB_SYS, json.dumps(
                {"业务需求": biz, "可选tab": labels}, ensure_ascii=False))
            return (r or {}).get("tab")
        except Exception:
            return None

    res = await asyncio.to_thread(_collect, keyword, _pick, True)
    if res.get("error"):
        return {"ok": True, "found": False,
                "summary": f"采集异常:{res['error']}"}
    if res.get("logged_in") is False:
        return {"ok": True, "found": False, "need_login": True,
                "summary": "抖音未登录,需一次性人工登录(已升级)"}
    raw = res.get("raw_api") or []
    notes = _parse_dy_notes(raw, name, keyword)
    comments = _parse_dy_comments(raw, name, keyword)
    records = notes + comments
    if not records:
        return {"ok": True, "found": False,
                "summary": f"「{name}」抖音无相关 UGC,跳过"}
    for r in records:
        r["place_natural_key"] = pnk
    try:
        saved = await sa_save_evidence(records)
    except Exception:
        saved = 0
    n_note = len(notes)
    n_cmt = len(comments)
    corpus = json.dumps(
        {"地点": name,
         "视频": [x["title"] for x in notes if x["title"]][:40],
         "评论": [x["content"] for x in comments if x["content"]][:80]},
        ensure_ascii=False)
    try:
        t = await asyncio.to_thread(llm.chat_json, _TAG_SYS, corpus)
    except Exception as e:  # noqa: BLE001
        return {"ok": True, "found": True, "evidence_saved": saved,
                "tags": [], "note_count": n_note, "comment_count": n_cmt,
                "summary": f"证据入库{saved},标签失败:{str(e)[:50]}"}
    # The LLM may return None / non-dict JSON; treat that as "no tags".
    if not isinstance(t, dict):
        t = {}
    raw_tags = t.get("tags") or []
    if not isinstance(raw_tags, (list, tuple)):
        # A bare string would otherwise be split into single characters.
        raw_tags = [raw_tags]
    tags = [str(x).strip()[:24] for x in raw_tags if str(x).strip()]
    return {
        "ok": True, "found": True, "tags": tags[:12],
        "sentiment": t.get("sentiment", ""), "evidence_saved": saved,
        "note_count": n_note, "comment_count": n_cmt,
        "summary": (f"抖音 视频{n_note}+评论{n_cmt} 入证据层(存{saved}) → "
                    f"{len(tags)} 体验标签·口碑{t.get('sentiment','?')}"),
    }