Files
bxh/scripts/douyin_visible_probe.py

1375 lines
48 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Visible Douyin crawler probe.
This is a standalone diagnostic copy of the douyin_agent collection flow. It
does not import or modify app/agents/douyin_agent.py. The script opens a headed
Chrome/Chromium window, lets the browser produce signed Douyin requests, records
the relevant API traffic, parses videos/comments, and saves project-compatible
CSV artifacts for offline inspection.
"""
from __future__ import annotations
import argparse
import csv
import json
import os
import random
import sys
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import parse_qs, quote, urlparse
# Parent of the directory that contains this script (resolved, absolute).
ROOT = Path(__file__).resolve().parents[1]
# Default persistent browser profile directory, located under the user's home.
DEFAULT_PROFILE_DIR = Path(os.path.expanduser("~/.zn-kg/douyin-profile"))
# Default root directory for probe output files.
DEFAULT_OUT_DIR = ROOT / "data" / "douyin_probe"
# Search page URL template; ``{kw}`` is filled with the URL-quoted keyword.
SEARCH_URL = "https://www.douyin.com/search/{kw}?type=general"
# User-agent string handed to the browser context at launch.
UA = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/124.0.0.0 Safari/537.36"
)
# Extra command-line flags passed to Chrome/Chromium at launch.
# NOTE(review): "--no-sandbox" turns off the browser sandbox — confirm this is
# intended for every environment this probe runs in.
CHROME_ARGS = [
    "--disable-blink-features=AutomationControlled",
    "--no-first-run",
    "--no-default-browser-check",
    "--disable-sync",
    "--disable-default-apps",
    "--no-sandbox",
    "--disable-dev-shm-usage",
]
# Init script injected into every page of the browser context. It overrides
# several properties on Navigator.prototype (webdriver, languages, language,
# vendor, plugins), creates a minimal window.chrome object when missing,
# answers permissions.query for "notifications" from Notification.permission,
# and makes WebGL getParameter return fixed Intel strings for codes 37445 and
# 37446 (presumably the unmasked vendor/renderer constants — TODO confirm).
# Every override is wrapped in try/catch so a failure is silently ignored.
STEALTH_JS = r"""
(() => {
const d=(o,k,g)=>{try{Object.defineProperty(o,k,{get:g,configurable:true})}catch(e){}};
d(Navigator.prototype,'webdriver',()=>undefined);
d(Navigator.prototype,'languages',()=>['zh-CN','zh','en']);
d(Navigator.prototype,'language',()=>'zh-CN');
d(Navigator.prototype,'vendor',()=>'Google Inc.');
if(!window.chrome){try{Object.defineProperty(window,'chrome',{value:{runtime:{}},configurable:true})}catch(e){}}
try{window.chrome.app={isInstalled:false};window.chrome.csi=()=>({});window.chrome.loadTimes=()=>({});}catch(e){}
const mk=(a)=>{a.item=(i)=>a[i]||null;a.namedItem=(n)=>a.find(x=>x.name===n)||null;return a;};
const plugins=mk([{name:'Chrome PDF Plugin'},{name:'Chrome PDF Viewer'},{name:'Native Client'}]);
d(Navigator.prototype,'plugins',()=>plugins);
const q=navigator.permissions&&navigator.permissions.query;
if(q){navigator.permissions.query=(p)=>p&&p.name==='notifications'
?Promise.resolve({state:Notification.permission,onchange:null}):q(p);}
const pw=(proto)=>{if(!proto||!proto.getParameter)return;const o=proto.getParameter;
proto.getParameter=function(p){if(p===37445)return 'Intel Inc.';
if(p===37446)return 'Intel Iris OpenGL Engine';return o.apply(this,arguments);};};
pw(window.WebGLRenderingContext&&WebGLRenderingContext.prototype);
pw(window.WebGL2RenderingContext&&WebGL2RenderingContext.prototype);
})();
"""
# URL substrings of the API endpoints whose JSON bodies are kept in full
# (a response is stored in raw_api when its URL contains any of these).
LISTEN_PATTERNS = [
    "/aweme/v1/web/aweme/detail",
    "/aweme/v1/web/comment/list",
    "/aweme/v1/web/comment/list/reply",
    "/aweme/v1/web/general/search/stream",
    "/aweme/v1/web/general/search/single",
    "/aweme/v1/web/search/item",
    "/aweme/v1/web/general/search",
]
# URL substrings of responses that are recorded as diagnostic API events;
# responses matching none of these are ignored entirely.
DIAG_PATTERNS = [
    "https://www.douyin.com/aweme/v1/web/",
    "https://www.douyin.com/search/",
]
# CSS selectors tried, in order, to locate the comment panel / its scroller.
# NOTE(review): ".ESlRWJ2j" looks like a generated class name and may go stale.
COMMENT_SCROLLERS = (
    "[data-e2e='comment-list']",
    "[data-e2e='detail-comment']",
    ".comment-list",
    ".comment-mainContent",
    ".ESlRWJ2j",
)
# CSS selectors whose visible text is harvested as DOM comment snippets.
DOM_COMMENT_SELECTORS = (
    "[data-e2e='comment-item']",
    "[data-e2e='comment-text']",
    ".comment-mainContent",
    ".comment-item",
    ".comment-text",
)
# Column order of the evidence CSV files (social_evidence / videos / comments).
# "raw_json" holds the JSON-serialised source item of each record.
EVIDENCE_COLUMNS = [
    "platform",
    "kind",
    "source_id",
    "url",
    "entity_name",
    "place_natural_key",
    "keyword",
    "title",
    "content",
    "author",
    "author_id",
    "author_avatar",
    "likes",
    "comments",
    "collects",
    "shares",
    "publish_time",
    "location",
    "tags",
    "image_urls",
    "raw_json",
]
# Column order of video_metrics.csv (one row per parsed video record).
VIDEO_METRIC_COLUMNS = [
    "platform",
    "source_id",
    "url",
    "entity_name",
    "place_natural_key",
    "keyword",
    "video_title",
    "video_author",
    "video_author_id",
    "video_publish_time",
    "video_publish_timestamp",
    "video_like_count",
    "video_comment_count",
    "video_collect_count",
    "video_share_count",
]
def log(message: str) -> None:
    """Print *message* to stdout, prefixed with the local time as ``[HH:MM:SS]``.

    Output is flushed immediately so progress stays visible while the
    browser is being driven.
    """
    stamp = f"{datetime.now():%H:%M:%S}"
    print(f"[{stamp}] {message}", flush=True)
def unlock_profile(profile_dir: Path) -> None:
    """Remove Chrome's ``Singleton*`` marker files from *profile_dir*.

    Removal is best-effort: a marker that is missing or cannot be deleted
    (any ``OSError``) is skipped silently.
    """
    markers = ("SingletonLock", "SingletonCookie", "SingletonSocket")
    for marker_path in (profile_dir / marker for marker in markers):
        try:
            marker_path.unlink()
        except OSError:
            # Nothing to remove, or not removable; keep going.
            continue
def scalar(obj, *keys, default=""):
    """Return the first non-empty value found in *obj* under any of *keys*.

    Keys are tried in the order given. A value counts as empty when it
    equals ``None``, ``""``, ``[]`` or ``{}``; ``0`` and ``False`` are kept.
    *default* is returned when *obj* is not a dict or no key has a usable
    value.
    """
    if not isinstance(obj, dict):
        return default
    empties = (None, "", [], {})
    for candidate in map(obj.get, keys):
        if candidate in empties:
            continue
        return candidate
    return default
def first_avatar(user: dict) -> str:
    """Return the first avatar URL of *user*, or ``""`` when none is available.

    ``avatar_thumb`` is preferred over ``avatar_medium``; the URL is taken
    from the first entry of that object's ``url_list``.
    """
    if not isinstance(user, dict):
        return ""
    avatar = user.get("avatar_thumb") or user.get("avatar_medium") or {}
    if not isinstance(avatar, dict):
        return ""
    url_list = avatar.get("url_list")
    if not isinstance(url_list, list) or not url_list:
        return ""
    return str(url_list[0] or "")
def as_int(value) -> int:
    """Convert *value* to ``int``, returning 0 whenever that is not possible.

    ``None`` and ``""`` map to 0, as does anything ``int()`` rejects. That
    includes non-finite floats: ``int(float("inf"))`` raises ``OverflowError``,
    which is swallowed here so a stray ``Infinity`` in a JSON payload cannot
    abort parsing.
    """
    try:
        if value in (None, ""):
            return 0
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return 0
def format_publish_time(value) -> str:
    """Render a Unix timestamp as a local ``YYYY-MM-DD HH:MM:SS`` string.

    Values above 10_000_000_000 are treated as milliseconds. Non-positive
    timestamps give ``""``. Input that cannot be read as an integer, or that
    the platform cannot convert to a date, is returned as ``str(value or "")``.
    """
    try:
        seconds = int(value)
    except (TypeError, ValueError):
        return str(value or "")
    if seconds <= 0:
        return ""
    if seconds > 10_000_000_000:
        # Too large for seconds in any plausible range: assume milliseconds.
        seconds //= 1000
    try:
        moment = datetime.fromtimestamp(seconds)
        return moment.strftime("%Y-%m-%d %H:%M:%S")
    except (OSError, OverflowError, ValueError):
        return str(value or "")
def body_of(entry: dict):
    """Return the payload of a captured entry.

    Entries that carry a ``"__body"`` key yield that value; any other dict
    is treated as the payload itself. Non-dict input yields ``{}``.
    """
    if not isinstance(entry, dict):
        return {}
    if "__body" in entry:
        return entry.get("__body")
    return entry
def source_url_of(entry: dict) -> str:
    """Return the request URL stored on a captured entry, or ``""``.

    ``"__url"`` is preferred; ``"__probe_url"`` is the fallback.
    """
    if isinstance(entry, dict):
        return str(entry.get("__url") or entry.get("__probe_url") or "")
    return ""
def aweme_id_from_url(url: str) -> str:
    """Extract a video id from the query string of *url*.

    The parameters ``aweme_id``, ``item_id`` and ``group_id`` are checked in
    that order; the first one present wins. Returns ``""`` when none is
    present or the URL cannot be parsed.
    """
    try:
        params = parse_qs(urlparse(url).query)
    except Exception:
        return ""
    present = (
        params[name]
        for name in ("aweme_id", "item_id", "group_id")
        if params.get(name)
    )
    values = next(present, None)
    if values is None:
        return ""
    return str(values[0] or "")
def iter_lists_by_key(obj, keys: set[str]):
    """Yield every list stored under one of *keys* anywhere inside *obj*.

    Dicts and lists are walked depth-first in iteration order. A list that
    matches is yielded as-is and is not searched further, so matching keys
    nested inside an already-yielded list are not reported.
    """
    if isinstance(obj, list):
        for element in obj:
            yield from iter_lists_by_key(element, keys)
        return
    if not isinstance(obj, dict):
        return
    for name, child in obj.items():
        if isinstance(child, list) and name in keys:
            yield child
            continue
        if isinstance(child, (dict, list)):
            yield from iter_lists_by_key(child, keys)
def parse_douyin_notes(raw_api: list[dict], name: str, keyword: str) -> list[dict]:
    """Build de-duplicated video ("note") records from captured API payloads.

    For each payload, video items are collected from ``data`` (when it is a
    list), from ``aweme_detail`` / ``aweme_list`` at the top level and under
    ``data``, from ``data.data``, and from every nested ``aweme_list``. An
    item may wrap the video under ``aweme_info`` or ``aweme``. Videos are
    identified by ``aweme_id`` / ``group_id`` / ``awemeId``; only the first
    occurrence of an id is kept.

    Args:
        raw_api: Captured entries (wrapped with ``__body`` or bare payloads).
        name: Entity name copied into each record.
        keyword: Search keyword copied into each record.

    Returns:
        One dict per unique video; the originating item is kept under ``raw``.
    """
    notes: list[dict] = []
    known_ids: set[str] = set()
    for entry in raw_api or []:
        payload = body_of(entry)
        if not isinstance(payload, dict):
            continue
        data = payload.get("data")
        data_map = data if isinstance(data, dict) else {}
        sources = (
            data if isinstance(data, list) else None,
            payload.get("aweme_detail"),
            payload.get("aweme_list"),
            data_map.get("aweme_detail"),
            data_map.get("aweme_list"),
            data_map.get("data"),
        )
        pool: list = []
        for source in sources:
            if isinstance(source, list):
                pool.extend(source)
            elif isinstance(source, dict):
                pool.append(source)
        for nested_list in iter_lists_by_key(payload, {"aweme_list"}):
            pool.extend(nested_list)
        for item in pool:
            if not isinstance(item, dict):
                continue
            aweme = item.get("aweme_info") or item.get("aweme") or item
            if not isinstance(aweme, dict):
                continue
            aid = scalar(aweme, "aweme_id", "group_id", "awemeId")
            if not aid:
                continue
            video_id = str(aid)
            if video_id in known_ids:
                continue
            known_ids.add(video_id)
            author = aweme.get("author") or {}
            stats = aweme.get("statistics") or {}
            like_count = as_int(scalar(stats, "digg_count", "admire_count", default=0))
            comment_total = as_int(scalar(stats, "comment_count", default=0))
            collect_count = as_int(
                scalar(
                    stats,
                    "collect_count",
                    "favorite_count",
                    "collects_count",
                    default=0,
                )
            )
            share_count = as_int(
                scalar(stats, "share_count", "share_count_reflow", default=0)
            )
            record = {
                "platform": "douyin",
                "kind": "note",
                "source_id": video_id,
                "url": f"https://www.douyin.com/video/{aid}",
                "entity_name": name,
                "keyword": keyword,
                "title": scalar(aweme, "desc", "title", "caption"),
                "content": scalar(aweme, "desc", "content"),
                "author": scalar(author, "nickname", "name"),
                "author_id": str(scalar(author, "uid", "sec_uid", "unique_id")),
                "author_avatar": first_avatar(author),
                "likes": like_count,
                "comments": comment_total,
                "collects": collect_count,
                "shares": share_count,
                "publish_time": str(scalar(aweme, "create_time", default="")),
                "location": "",
                "tags": [],
                "image_urls": [],
                "raw": item,
            }
            notes.append(record)
    return notes
def parse_douyin_comments(raw_api: list[dict], name: str, keyword: str) -> list[dict]:
    """Build de-duplicated comment records from captured API payloads.

    Every list stored under ``comments``, ``comment_list`` or
    ``reply_comments`` is scanned. A comment needs both an id (``cid`` /
    ``comment_id`` / ``id``) and text (``text`` / ``content``); only the
    first occurrence of an id is kept. When a comment carries no
    ``aweme_id``, the id found in the request URL's query string is used.

    Args:
        raw_api: Captured entries (wrapped with ``__body`` or bare payloads).
        name: Entity name copied into each record.
        keyword: Search keyword copied into each record.

    Returns:
        One dict per unique comment; the source comment is kept under ``raw``.
    """
    wanted_keys = {"comments", "comment_list", "reply_comments"}
    records: list[dict] = []
    known_ids: set[str] = set()
    for entry in raw_api or []:
        payload = body_of(entry)
        if not isinstance(payload, dict):
            continue
        url_video_id = aweme_id_from_url(source_url_of(entry))
        for batch in list(iter_lists_by_key(payload, wanted_keys)):
            for comment in batch:
                if not isinstance(comment, dict):
                    continue
                cid = scalar(comment, "cid", "comment_id", "id")
                text = scalar(comment, "text", "content")
                if not cid or not text:
                    continue
                comment_id = str(cid)
                if comment_id in known_ids:
                    continue
                known_ids.add(comment_id)
                user = comment.get("user") or comment.get("user_info") or {}
                aweme_id = scalar(comment, "aweme_id", default=url_video_id)
                reply_total = scalar(
                    comment, "reply_comment_total", "reply_total", default=0
                )
                like_count = as_int(
                    scalar(comment, "digg_count", "like_count", default=0)
                )
                record = {
                    "platform": "douyin",
                    "kind": "comment",
                    "source_id": comment_id,
                    "url": f"https://www.douyin.com/video/{aweme_id}",
                    "entity_name": name,
                    "keyword": keyword,
                    "title": "",
                    "content": text,
                    "author": scalar(user, "nickname", "name"),
                    "author_id": str(scalar(user, "uid", "sec_uid")),
                    "author_avatar": first_avatar(user),
                    "likes": like_count,
                    "comments": as_int(reply_total),
                    "collects": 0,
                    "shares": 0,
                    "publish_time": str(scalar(comment, "create_time", default="")),
                    "location": scalar(comment, "ip_label", default=""),
                    "tags": [],
                    "image_urls": [],
                    "raw": comment,
                }
                records.append(record)
    return records
def response_shape(payload) -> dict:
    """Summarise the structure of an API response for the diagnostics log.

    *payload* may be a bare response body or a captured entry that wraps the
    body under ``"__body"``. For a non-dict body only its type name is
    reported. Otherwise the summary holds the type of ``data``, up to 16
    top-level keys, how many comment and video items the body contains, and
    the body's ``status_code`` / ``status_msg``.
    """
    wrapped = isinstance(payload, dict) and "__body" in payload
    body = payload["__body"] if wrapped else payload
    if not isinstance(body, dict):
        return {"body_type": type(body).__name__, "keys": ""}
    data = body.get("data")
    comment_items = sum(
        len(found)
        for found in iter_lists_by_key(
            body, {"comments", "comment_list", "reply_comments"}
        )
    )
    aweme_items = sum(len(found) for found in iter_lists_by_key(body, {"aweme_list"}))
    if isinstance(data, list):
        # A list-valued "data" is counted as video items as well.
        aweme_items += len(data)
    leading_keys = list(body.keys())[:16]
    return {
        "body_type": "dict",
        "data_type": type(data).__name__,
        "keys": ",".join(leading_keys),
        "comment_items": comment_items,
        "aweme_items": aweme_items,
        "status_code": body.get("status_code", ""),
        "status_msg": body.get("status_msg", ""),
    }
def write_csv(path: Path, rows: list[dict], columns: list[str] | None = None) -> None:
    """Write *rows* to *path* as a UTF-8 (with BOM) CSV file with a header.

    Parent directories are created as needed. When *columns* is omitted, the
    header is the union of all row keys in first-seen order. Row keys that
    are not in the header are dropped.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    if columns is None:
        # A dict keeps insertion order, giving an ordered de-duplicated header.
        first_seen: dict[str, None] = {}
        for row in rows:
            for key in row:
                first_seen.setdefault(key, None)
        columns = list(first_seen)
    with path.open("w", newline="", encoding="utf-8-sig") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(rows)
def evidence_csv_rows(records: list[dict], place_key: str) -> list[dict]:
    """Convert parsed records into rows matching ``EVIDENCE_COLUMNS``.

    Missing columns become ``""``. ``place_natural_key`` is set to
    *place_key*; ``tags``, ``image_urls`` and the record's ``raw`` item are
    JSON-encoded (the latter into ``raw_json``) without ASCII escaping.
    """

    def encode(value, empty):
        return json.dumps(value or empty, ensure_ascii=False)

    rows = []
    for record in records:
        row = {column: record.get(column, "") for column in EVIDENCE_COLUMNS}
        row.update(
            place_natural_key=place_key,
            tags=encode(record.get("tags"), []),
            image_urls=encode(record.get("image_urls"), []),
            raw_json=encode(record.get("raw"), {}),
        )
        rows.append(row)
    return rows
def video_metric_rows(records: list[dict], place_key: str) -> list[dict]:
    """Build one metrics row per video record (``kind == "note"``).

    The title falls back to the record's content when empty. The publish
    time is emitted twice: formatted, and as the raw stored value.
    """

    def to_row(record: dict) -> dict:
        stamp = record.get("publish_time", "")
        return {
            "platform": record.get("platform", "douyin"),
            "source_id": record.get("source_id", ""),
            "url": record.get("url", ""),
            "entity_name": record.get("entity_name", ""),
            "place_natural_key": place_key,
            "keyword": record.get("keyword", ""),
            "video_title": record.get("title", "") or record.get("content", ""),
            "video_author": record.get("author", ""),
            "video_author_id": record.get("author_id", ""),
            "video_publish_time": format_publish_time(stamp),
            "video_publish_timestamp": stamp,
            "video_like_count": record.get("likes", 0),
            "video_comment_count": record.get("comments", 0),
            "video_collect_count": record.get("collects", 0),
            "video_share_count": record.get("shares", 0),
        }

    return [to_row(record) for record in records if record.get("kind") == "note"]
def save_artifacts(
    out_dir: Path,
    records: list[dict],
    api_events: list[dict],
    raw_api: list[dict],
    dom_comments: list[dict],
    summary: dict,
    place_key: str,
) -> dict:
    """Write every output file of a run into *out_dir*.

    Files written: the combined evidence CSV, separate video and comment
    CSVs, video metrics, the API event log, DOM comment snippets, the raw API
    entries as JSON Lines, and the run summary as JSON.

    *summary* is mutated: its ``"files"`` key is set to a mapping of file
    label to path string before the summary is written. That mapping is also
    the return value.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    evidence_rows = evidence_csv_rows(records, place_key)
    note_rows: list[dict] = []
    comment_rows: list[dict] = []
    for row in evidence_rows:
        bucket = comment_rows if row.get("kind") == "comment" else note_rows
        bucket.append(row)

    file_names = {
        "social_evidence_csv": "social_evidence.csv",
        "videos_csv": "videos.csv",
        "video_metrics_csv": "video_metrics.csv",
        "comments_csv": "comments.csv",
        "api_urls_csv": "api_urls.csv",
        "dom_comments_csv": "dom_comments.csv",
        "raw_api_jsonl": "raw_api.jsonl",
        "summary_json": "run_summary.json",
    }
    files = {label: out_dir / file_name for label, file_name in file_names.items()}
    api_columns = [
        "seq",
        "captured_at",
        "status",
        "kind",
        "body_type",
        "data_type",
        "comment_items",
        "aweme_items",
        "status_code",
        "status_msg",
        "keys",
        "url",
    ]

    write_csv(files["social_evidence_csv"], evidence_rows, EVIDENCE_COLUMNS)
    write_csv(files["videos_csv"], note_rows, EVIDENCE_COLUMNS)
    metric_rows = video_metric_rows(records, place_key)
    write_csv(files["video_metrics_csv"], metric_rows, VIDEO_METRIC_COLUMNS)
    write_csv(files["comments_csv"], comment_rows, EVIDENCE_COLUMNS)
    write_csv(files["api_urls_csv"], api_events, api_columns)
    # DOM snippets have no fixed schema; the header is derived from the rows.
    write_csv(files["dom_comments_csv"], dom_comments)

    with files["raw_api_jsonl"].open("w", encoding="utf-8") as sink:
        sink.writelines(
            json.dumps(entry, ensure_ascii=False) + "\n" for entry in raw_api
        )

    summary["files"] = {label: str(location) for label, location in files.items()}
    with files["summary_json"].open("w", encoding="utf-8") as sink:
        json.dump(summary, sink, ensure_ascii=False, indent=2)
    return summary["files"]
def collect_dom_comments(page, video_url: str) -> list[dict]:
    """Collect visible comment text from the page DOM.

    Each selector in ``DOM_COMMENT_SELECTORS`` is queried in turn. Element
    text is whitespace-normalised; texts shorter than two characters or
    already seen are skipped. Stored content is capped at 1000 characters.
    Elements or selectors that raise are skipped.
    """
    snippets: list[dict] = []
    known_texts: set[str] = set()
    for selector in DOM_COMMENT_SELECTORS:
        try:
            nodes = page.query_selector_all(selector)
        except Exception:
            continue
        for node in nodes:
            try:
                text = " ".join((node.inner_text() or "").split())
            except Exception:
                continue
            too_short = len(text) < 2
            if too_short or text in known_texts:
                continue
            known_texts.add(text)
            snippet = {
                "source": "dom",
                "selector": selector,
                "video_url": video_url,
                "content": text[:1000],
            }
            snippets.append(snippet)
    return snippets
def comment_count_from_raw(raw_api: list[dict]) -> int:
    """Count comment items across all captured payloads.

    This is the summed length of every list stored under ``comments``,
    ``comment_list`` or ``reply_comments``. Items are not de-duplicated.
    """
    wanted_keys = {"comments", "comment_list", "reply_comments"}
    payloads = (body_of(entry) for entry in raw_api)
    return sum(
        len(found)
        for payload in payloads
        if isinstance(payload, dict)
        for found in iter_lists_by_key(payload, wanted_keys)
    )
def click_or_goto_detail(page, link, href: str) -> None:
    """Open a video detail view by clicking *link*, falling back to *href*.

    Three attempts are made in order:
    1. click and wait (up to 15 s) for a navigation;
    2. click again, wait 1.8 s, and accept if the page URL contains "/video/";
    3. navigate directly to *href* via goto_detail_url (only if *href* is set).
    Errors from the first two attempts are swallowed; errors from the direct
    navigation propagate to the caller.
    """
    try:
        with page.expect_navigation(timeout=15000):
            link.click()
        return
    except Exception:
        # No navigation observed (or the click failed); try the next strategy.
        pass
    try:
        link.click()
        page.wait_for_timeout(1800)
        # The URL may have changed without a navigation event being observed.
        if "/video/" in page.url:
            return
    except Exception:
        pass
    if href:
        goto_detail_url(page, href)
def goto_detail_url(page, href: str) -> None:
    """Navigate *page* to *href*, tolerating errors once a video URL is reached.

    The navigation waits only for the "commit" stage, up to 30 seconds. An
    error is re-raised unless the page URL already contains ``/video/``.
    """
    try:
        page.goto(href, wait_until="commit", timeout=30000)
    except Exception:
        on_video_page = "/video/" in page.url
        if not on_video_page:
            raise
def comment_panel_present(page) -> bool:
    """Return True when a comment container is visibly on screen.

    The in-page script checks every element matching ``COMMENT_SCROLLERS``.
    An element qualifies when it is larger than 120x120 px, more than 80 px
    of it lies inside the viewport in each dimension, and it is not hidden
    through ``display``, ``visibility`` or an opacity of 0.05 or less. Any
    evaluation error is reported as False.
    """
    script = """
(selectors) => {
const visibleInViewport = (el) => {
if (!el) return false;
const r = el.getBoundingClientRect();
const s = getComputedStyle(el);
const width = Math.min(r.right, innerWidth) - Math.max(r.left, 0);
const height = Math.min(r.bottom, innerHeight) - Math.max(r.top, 0);
return r.width > 120 && r.height > 120 && width > 80 && height > 80 &&
s.display !== 'none' && s.visibility !== 'hidden' &&
Number(s.opacity || '1') > 0.05;
};
for (const selector of selectors) {
for (const el of document.querySelectorAll(selector)) {
if (visibleInViewport(el)) return true;
}
}
return false;
}
"""
    try:
        found = page.evaluate(script, list(COMMENT_SCROLLERS))
    except Exception:
        return False
    return bool(found)
def wait_for_comment_panel(page, timeout_ms: int = 3000) -> bool:
    """Poll until the comment panel is visible or *timeout_ms* elapses.

    The panel is checked roughly every 180 ms, plus one final check after
    the deadline. The deadline is measured with ``time.monotonic()`` so that
    system clock adjustments cannot shorten or extend the wait.

    Returns:
        True if the panel was seen, False otherwise.
    """
    deadline = time.monotonic() + timeout_ms / 1000
    while time.monotonic() < deadline:
        if comment_panel_present(page):
            return True
        page.wait_for_timeout(180)
    return comment_panel_present(page)
def detail_candidates(page, raw_api: list[dict], name: str, keyword: str) -> list[dict]:
    """List the video detail pages that can be opened next.

    DOM anchors whose href contains ``/video/`` come first (scheme-relative
    and root-relative hrefs are made absolute), followed by the URLs of
    videos parsed from the captured API payloads. Each candidate is a dict
    with ``href``, ``link`` (the element handle, or None for API-derived
    entries) and ``source`` ("dom" or "api"). Duplicate hrefs are dropped.
    """
    found: list[dict] = []
    known_hrefs: set[str] = set()

    def remember(href: str, link, source: str) -> None:
        if not href or href in known_hrefs:
            return
        known_hrefs.add(href)
        found.append({"href": href, "link": link, "source": source})

    try:
        anchors = page.query_selector_all("a[href*='/video/']")
    except Exception:
        anchors = []
    for anchor in anchors:
        try:
            href = anchor.get_attribute("href") or ""
        except Exception:
            continue
        if href.startswith("//"):
            href = "https:" + href
        elif href.startswith("/"):
            href = "https://www.douyin.com" + href
        remember(href, anchor, "dom")
    for note in parse_douyin_notes(raw_api, name, keyword):
        remember(note.get("url") or "", None, "api")
    return found
def reveal_player_controls(page) -> bool:
    """Move the mouse over the video player so its control bar can appear.

    The first player element with a bounding box is targeted at 72% of its
    width and 82% of its height. If no player element is usable, the pointer
    is moved to a fixed fraction of the viewport instead (1440x900 is assumed
    when the viewport size is unknown). Each move is followed by a 700 ms
    pause.

    Returns:
        True if the mouse was moved, False if even the fallback failed.
    """
    for selector in (".xgplayer", "[class*='xgplayer']", "video"):
        try:
            player = page.locator(selector).first
            if player.count() <= 0:
                continue
            rect = player.bounding_box(timeout=1200)
            if not rect:
                continue
            target_x = rect["x"] + rect["width"] * 0.72
            target_y = rect["y"] + rect["height"] * 0.82
            page.mouse.move(target_x, target_y)
            page.wait_for_timeout(700)
            return True
        except Exception:
            continue
    try:
        size = page.viewport_size or {"width": 1440, "height": 900}
        page.mouse.move(size["width"] * 0.55, size["height"] * 0.78)
        page.wait_for_timeout(700)
        return True
    except Exception:
        return False
def player_fullscreen_active(page) -> bool:
    """Return True when the page reports a fullscreen player.

    Fullscreen is detected through ``document.fullscreenElement``, its
    webkit-prefixed variant, or the presence of one of the xgplayer
    fullscreen CSS classes. Evaluation errors are reported as False.
    """
    script = """
() => Boolean(
document.fullscreenElement ||
document.webkitFullscreenElement ||
document.querySelector(
'.xgplayer-is-fullscreen,.xgplayer-is-cssfullscreen,.xgplayer-fullscreen-active'
)
)
"""
    try:
        active = page.evaluate(script)
    except Exception:
        return False
    return bool(active)
def maybe_click_player_fullscreen(page) -> bool:
    """Try to put the video player into fullscreen mode.

    Strategies, in order:
    1. click the first element matching one of several fullscreen-button
       selectors;
    2. collect the on-screen centre points of fullscreen-button nodes via
       in-page JavaScript and click the first usable point with the mouse;
    3. press the "f" key and check whether fullscreen is active.

    Returns True as soon as a click was issued without raising (strategies 1
    and 2 do not verify that fullscreen was actually entered) or when the
    keyboard shortcut is confirmed by player_fullscreen_active. Returns False
    when every strategy failed.
    """
    # Hover the player first so the control bar is rendered.
    reveal_player_controls(page)
    candidates = [
        ".xgplayer-icon:has(.xg-get-fullscreen)",
        ".xgplayer-icon .xg-get-fullscreen",
        ".xg-get-fullscreen",
        ".xgplayer-fullscreen",
        "[aria-label*='全屏']",
        "[title*='全屏']",
        "button:has-text('全屏')",
    ]
    for selector in candidates:
        try:
            # Hover again before each attempt; presumably the controls can
            # auto-hide between attempts — TODO confirm.
            reveal_player_controls(page)
            loc = page.locator(selector).first
            if loc.count() <= 0:
                continue
            # Scrolling and hovering are best-effort; failures do not block
            # the click attempt.
            try:
                loc.scroll_into_view_if_needed(timeout=1800)
            except Exception:
                pass
            try:
                loc.hover(timeout=1200)
            except Exception:
                pass
            loc.click(timeout=1800)
            page.wait_for_timeout(1600)
            return True
        except Exception:
            continue
    try:
        # Fallback: compute click coordinates in the page and click by position.
        points = page.evaluate(
            """
() => {
const visible = (el) => {
const r = el.getBoundingClientRect();
const s = getComputedStyle(el);
return r.width > 4 && r.height > 4 && s.visibility !== 'hidden' &&
s.display !== 'none' && r.bottom > 0 && r.right > 0;
};
const nodes = [
...document.querySelectorAll(
'.xg-get-fullscreen,.xgplayer-fullscreen,[aria-label*="全屏"],[title*="全屏"]'
)
];
const points = [];
for (const node of nodes) {
const target = node.closest('button,[role="button"],[tabindex],.xgplayer-icon') || node;
if (!visible(target)) continue;
const r = target.getBoundingClientRect();
points.push({
x: Math.round(r.left + r.width / 2),
y: Math.round(r.top + r.height / 2),
});
}
return points;
}
"""
        )
        for point in points or []:
            x = point.get("x")
            y = point.get("y")
            if not isinstance(x, int) or not isinstance(y, int):
                continue
            reveal_player_controls(page)
            page.mouse.move(x, y)
            page.mouse.click(x, y)
            page.wait_for_timeout(1600)
            return True
    except Exception:
        pass
    try:
        # Last resort: keyboard shortcut; this is the only verified path.
        reveal_player_controls(page)
        page.keyboard.press("f")
        page.wait_for_timeout(1200)
        if player_fullscreen_active(page):
            return True
    except Exception:
        pass
    return False
def maybe_click_comments(page) -> bool:
    """Open the comment panel if it is not already visible.

    Strategies, in order:
    1. return immediately if the panel shows up within 800 ms;
    2. click the first element of each known comment-button selector and
       wait for the panel after each click;
    3. let in-page JavaScript find clickable points (elements whose text,
       aria-label or title mentions "评论", plus a specific comment SVG icon)
       and click them one by one with the mouse.

    Returns True once the panel is confirmed visible, False otherwise.
    """
    if wait_for_comment_panel(page, 800):
        return True
    candidates = [
        "[data-e2e='feed-comment-icon']",
        "[data-e2e='feed-comment']",
        "[data-e2e='comment-icon']",
        "[data-e2e='video-comment']",
        "[aria-label*='评论']",
        "[title*='评论']",
        "button:has-text('评论')",
        "[role='button']:has-text('评论')",
        "text=评论",
    ]
    for selector in candidates:
        try:
            loc = page.locator(selector).first
            if loc.count() > 0:
                # Scrolling is best-effort; the click is attempted regardless.
                try:
                    loc.scroll_into_view_if_needed(timeout=1800)
                except Exception:
                    pass
                loc.click(timeout=1200)
                if wait_for_comment_panel(page, 2200):
                    return True
        except Exception:
            continue
    try:
        # Fallback: the script returns [{alreadyOpen: true}] when a panel is
        # already visible, otherwise candidate click points sorted so that
        # points in the right part of the viewport come first, then smaller
        # targets, then targets higher on the page.
        points = page.evaluate(
            """
() => {
const hasPanel = () => Boolean(
[...document.querySelectorAll("[data-e2e='comment-list'],.comment-mainContent")]
.some((el) => {
const r = el.getBoundingClientRect();
const s = getComputedStyle(el);
const width = Math.min(r.right, innerWidth) - Math.max(r.left, 0);
const height = Math.min(r.bottom, innerHeight) - Math.max(r.top, 0);
return r.width > 120 && r.height > 120 && width > 80 && height > 80 &&
s.display !== 'none' && s.visibility !== 'hidden';
})
);
if (hasPanel()) return [{ alreadyOpen: true }];
const visible = (el) => {
const r = el.getBoundingClientRect();
const s = getComputedStyle(el);
return r.width > 6 && r.height > 6 && s.visibility !== 'hidden' &&
s.display !== 'none' && r.bottom > 0 && r.right > 0 &&
r.top < innerHeight && r.left < innerWidth;
};
const points = [];
const addPoint = (el, reason) => {
let target = el.closest('button,[role="button"],[tabindex],a') || el;
for (let i = 0; i < 5 && target.parentElement; i += 1) {
const current = target.getBoundingClientRect();
const parent = target.parentElement.getBoundingClientRect();
const clickableParent = target.parentElement.matches(
'button,[role="button"],[tabindex],a'
);
if (clickableParent || (parent.width <= 140 && parent.height <= 140 &&
parent.width >= current.width && parent.height >= current.height)) {
target = target.parentElement;
}
}
if (!visible(target)) target = el;
if (!visible(target)) return;
const r = target.getBoundingClientRect();
points.push({
x: Math.round(r.left + r.width / 2),
y: Math.round(r.top + r.height / 2),
reason,
rightBias: r.left > innerWidth * 0.45 ? 1 : 0,
size: Math.round(r.width * r.height),
});
};
const textNodes = [
...document.querySelectorAll('button,[role="button"],[tabindex],[aria-label],[title],span')
];
for (const el of textNodes) {
const text = [
el.innerText || '',
el.getAttribute('aria-label') || '',
el.getAttribute('title') || ''
].join(' ').trim();
if (!/评论/.test(text) || /评论区|评论列表|暂无评论/.test(text)) continue;
addPoint(el, 'text');
}
for (const svg of document.querySelectorAll('svg[viewBox="0 0 99 99"]')) {
const d = [...svg.querySelectorAll('path')]
.map((p) => p.getAttribute('d') || '')
.join(' ');
if (!d.includes('M-5.79,5.98') && !d.includes('C-3.56,3.75')) {
continue;
}
addPoint(svg, 'comment-svg-99');
}
return points.sort((a, b) =>
(b.rightBias - a.rightBias) || (a.size - b.size) || (a.y - b.y)
);
}
"""
        )
        for point in points or []:
            if point.get("alreadyOpen"):
                return True
            x = point.get("x")
            y = point.get("y")
            if not isinstance(x, int) or not isinstance(y, int):
                continue
            page.mouse.move(x, y)
            page.mouse.click(x, y)
            # Keep trying further points until the panel is confirmed.
            if wait_for_comment_panel(page, 2400):
                return True
    except Exception:
        pass
    return False
def move_mouse_into_box(page, box: dict) -> None:
    """Move the mouse to a point inside *box*, kept within the viewport.

    The target is the horizontal centre of the box and its vertical centre,
    capped at 260 px below the top edge. Both coordinates are clamped to stay
    at least 8 px inside the viewport (1440x900 is assumed when the viewport
    size is unknown).
    """
    viewport = page.viewport_size or {"width": 1440, "height": 900}
    center_x = box["x"] + box["width"] / 2
    upper_y = box["y"] + min(box["height"] / 2, 260)
    x = min(max(center_x, 8), viewport["width"] - 8)
    y = min(max(upper_y, 8), viewport["height"] - 8)
    page.mouse.move(x, y)
def bring_comments_into_view(page) -> bool:
    """Scroll the comment area into the viewport and park the mouse over it.

    Up to four rounds are made. In each round every selector in
    COMMENT_SCROLLERS is tried; the first element that can be scrolled into
    view ends the search. If no selector worked, the window is scrolled down
    by 68% of its height before the next round.

    Returns True once an element was scrolled into view, False after all
    rounds failed.
    """
    for _attempt in range(4):
        for selector in COMMENT_SCROLLERS:
            try:
                locator = page.locator(selector).first
                if locator.count() <= 0:
                    continue
                locator.scroll_into_view_if_needed(timeout=5000)
                page.wait_for_timeout(1800)
                scroller = page.query_selector(selector)
                if scroller:
                    box = scroller.bounding_box()
                    if box:
                        # Place the pointer inside the panel so later wheel
                        # events are delivered at that position.
                        move_mouse_into_box(page, box)
                return True
            except Exception:
                continue
        try:
            # Nothing matched yet: scroll further down and retry.
            page.evaluate(
                "window.scrollBy(0, Math.round(window.innerHeight * 0.68))"
            )
            page.wait_for_timeout(1700)
        except Exception:
            pass
    return False
def dom_comment_item_count(page) -> int:
    """Return how many ``[data-e2e='comment-item']`` elements are in the DOM.

    A failing query is reported as 0.
    """
    try:
        items = page.query_selector_all("[data-e2e='comment-item']")
    except Exception:
        return 0
    return len(items)
def comments_end_reached(page) -> bool:
    """Return True when the page text contains the "no more comments" notice.

    The check looks for the literal text "暂时没有更多评论" in the body's
    inner text. Evaluation errors are reported as False.
    """
    script = "() => document.body && document.body.innerText.includes('暂时没有更多评论')"
    try:
        reached = page.evaluate(script)
    except Exception:
        return False
    return bool(reached)
def scroll_comment_panel(page) -> bool:
    """Scroll the comment list down by one step to trigger further loading.

    In-page JavaScript picks the element to scroll: starting from each known
    comment selector it walks up the ancestors until it finds one taller than
    120 px that overflows or has overflow-y auto/scroll; if none is found it
    falls back to the document's scrolling element. The mouse is moved into
    that element, a wheel event is sent, and the element is also scrolled
    programmatically. If that fails, a plain wheel event plus the End key is
    used instead.

    Returns True when one of the scroll attempts ran without raising (it does
    not verify that the content actually moved), False otherwise.
    """
    try:
        handle = page.evaluate_handle(
            """
() => {
const selectors = [
"[data-e2e='comment-list']",
".comment-mainContent",
"[data-e2e='detail-comment']",
".comment-list",
".ESlRWJ2j"
];
const scrollable = (el) => {
if (!el) return false;
const s = getComputedStyle(el);
return el.scrollHeight > el.clientHeight + 20 ||
/(auto|scroll)/.test(s.overflowY || '');
};
for (const selector of selectors) {
let el = document.querySelector(selector);
while (el && el !== document.body) {
if (scrollable(el) && el.clientHeight > 120) return el;
el = el.parentElement;
}
}
return document.scrollingElement || document.documentElement;
}
"""
        )
        element = handle.as_element()
    except Exception:
        element = None
    try:
        if element:
            box = element.bounding_box()
            if box:
                move_mouse_into_box(page, box)
            # Send both a wheel event and a programmatic scroll.
            page.mouse.wheel(0, 2800)
            page.evaluate(
                """
(e) => {
const step = Math.max(900, Math.floor((e.clientHeight || innerHeight) * 1.35));
if (typeof e.scrollBy === 'function') {
e.scrollBy(0, step);
} else {
e.scrollTop += step;
}
}
""",
                element,
            )
            return True
    except Exception:
        pass
    try:
        # Generic fallback when no scroll target could be used.
        page.mouse.wheel(0, 2800)
        page.keyboard.press("End")
        return True
    except Exception:
        return False
def launch_context(playwright, args):
    """Launch a persistent Chromium context using the configured profile.

    The profile directory is created if needed and leftover ``Singleton*``
    marker files are removed first. When ``args.browser_channel`` is set and
    launching with that channel fails, the launch is retried once without a
    channel; without a channel the original error propagates.
    """
    profile_dir = Path(args.profile_dir).expanduser()
    profile_dir.mkdir(parents=True, exist_ok=True)
    unlock_profile(profile_dir)
    options = dict(
        user_data_dir=str(profile_dir),
        headless=args.headless,
        args=CHROME_ARGS,
        ignore_default_args=["--enable-automation"],
        user_agent=UA,
        locale="zh-CN",
        viewport={"width": args.width, "height": args.height},
        slow_mo=args.slow_ms,
    )
    if args.browser_channel:
        options["channel"] = args.browser_channel
    try:
        return playwright.chromium.launch_persistent_context(**options)
    except Exception as exc:
        if not args.browser_channel:
            raise
        log(f"指定 channel={args.browser_channel!r} 启动失败,回退到 Playwright Chromium: {exc}")
        options.pop("channel", None)
        return playwright.chromium.launch_persistent_context(**options)
def finalize_run(
    out_dir: Path,
    raw_api: list[dict],
    api_events: list[dict],
    dom_comments: list[dict],
    args,
    keyword: str,
    logged_in: bool,
) -> dict:
    """Parse the captured traffic, write all output files, and log a recap.

    Returns the run summary dict. It holds counts of parsed videos and
    comments, DOM snippets and API events, the distinct event kinds, and the
    output file paths added while saving. A diagnostic hint is logged when no
    comment API call was captured, or when comment calls were captured but
    nothing could be parsed from them.
    """
    notes = parse_douyin_notes(raw_api, args.name, keyword)
    comments = parse_douyin_comments(raw_api, args.name, keyword)
    event_kinds = [row.get("kind", "") for row in api_events]
    summary = {
        "ok": True,
        "logged_in": logged_in,
        "name": args.name,
        "place_natural_key": args.place_key,
        "keyword": keyword,
        "note_count": len(notes),
        "comment_count": len(comments),
        "dom_comment_snippet_count": len(dom_comments),
        "api_event_count": len(api_events),
        "raw_api_count": len(raw_api),
        "api_kinds": sorted(set(event_kinds)),
        "comment_api_event_count": event_kinds.count("comment"),
        "search_api_event_count": event_kinds.count("search"),
    }
    files = save_artifacts(
        out_dir,
        notes + comments,
        api_events,
        raw_api,
        dom_comments,
        summary,
        args.place_key,
    )
    log(
        "完成: "
        f"视频 {len(notes)} 条, API评论 {len(comments)} 条, "
        f"DOM评论片段 {len(dom_comments)} 条, API事件 {len(api_events)}"
    )
    log(f"CSV: {files['social_evidence_csv']}")
    no_comment_calls = summary["comment_api_event_count"] == 0
    if no_comment_calls:
        log("诊断提示: 没有捕到 comment API优先看页面是否进入详情和评论区是否展开。")
    elif not comments:
        log("诊断提示: 捕到 comment API 但解析为 0优先看 raw_api.jsonl 的响应结构。")
    return summary
def run_probe(args) -> dict:
    """Run one visible collection session and return its summary.

    Flow:
    1. Launch the persistent browser context and record every response whose
       URL matches ``DIAG_PATTERNS``; JSON bodies of ``LISTEN_PATTERNS``
       endpoints are kept in full.
    2. Unless ``args.video_url`` is given, open the search page for the
       keyword, scroll it, and wait for video candidates (DOM links or
       API-derived URLs).
    3. For up to ``args.max_notes`` candidates: open the detail page,
       optionally enter fullscreen, open and scroll the comment panel until
       the end notice appears, the scroll budget is used up, or no progress
       was made for ``args.stall_rounds`` rounds; then harvest DOM comment
       text and return to the search page.
    4. Parse and save everything through ``finalize_run``, optionally keeping
       the browser open afterwards.

    Errors while processing a single video are logged and do not stop the
    run. The browser context is always closed, including when an exception
    or Ctrl+C ends the run early.

    Args:
        args: Parsed command-line namespace produced by ``build_parser``.

    Returns:
        The summary dict produced by ``finalize_run``.
    """
    from playwright.sync_api import sync_playwright

    keyword = args.keyword or f"贵阳 {args.name}".strip()
    run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_dir = Path(args.out_dir).expanduser() / run_id
    raw_api: list[dict] = []
    api_events: list[dict] = []
    dom_comments: list[dict] = []
    captured_urls: set[str] = set()
    event_seq = 0
    log(f"关键词: {keyword}")
    log(f"输出目录: {out_dir}")
    log("即将打开可视化 Chrome 窗口,后续搜索/点视频/滚评论都会在窗口里可见。")
    forced_candidates = []
    if args.video_url:
        forced_candidates = [
            {"href": args.video_url, "link": None, "source": "arg"}
        ]
    with sync_playwright() as p:
        ctx = launch_context(p, args)
        try:
            ctx.add_init_script(STEALTH_JS)

            def on_response(resp):
                """Record one network response as a diagnostic event."""
                nonlocal event_seq
                url = resp.url
                if not any(pattern in url for pattern in DIAG_PATTERNS):
                    return
                body = None
                shape = {}
                matched = any(pattern in url for pattern in LISTEN_PATTERNS)
                try:
                    body = resp.json()
                    shape = response_shape(body)
                except Exception as exc:
                    # Non-JSON (or unreadable) body: keep a short reason.
                    shape = {"body_type": "non_json", "keys": str(exc)[:80]}
                kind = (
                    "comment"
                    if "/comment/" in url or "/comment/list" in url
                    else "search"
                    if "/search" in url
                    else "aweme"
                )
                event_seq += 1
                event = {
                    "seq": event_seq,
                    "captured_at": datetime.now().isoformat(timespec="seconds"),
                    "status": resp.status,
                    "kind": kind,
                    "url": url,
                    **shape,
                }
                api_events.append(event)
                if matched and isinstance(body, dict):
                    raw_api.append(
                        {
                            "__url": url,
                            "__status": resp.status,
                            "__captured_at": event["captured_at"],
                            "__body": body,
                        }
                    )
                # Each distinct URL is echoed to the console only once.
                if url not in captured_urls:
                    captured_urls.add(url)
                    cc = event.get("comment_items") or 0
                    ac = event.get("aweme_items") or 0
                    log(f"API {kind} status={resp.status} comments={cc} aweme={ac} {url[:120]}")

            ctx.on("response", on_response)
            page = ctx.pages[0] if ctx.pages else ctx.new_page()
            page.set_default_timeout(15000)
            logged_in = True
            if forced_candidates:
                search_url = args.video_url
                log(f"跳过搜索,直接诊断视频: {args.video_url}")
            else:
                search_url = SEARCH_URL.format(kw=quote(keyword))
                page.goto(search_url, wait_until="domcontentloaded", timeout=60000)
                page.wait_for_timeout(args.search_wait_ms)
                html = page.content()
                cards = page.query_selector_all("a[href*='/video/']")
                # No video links plus login/verification markers is treated
                # as a login wall.
                logged_in = not (
                    (not cards)
                    and ("扫码登录" in html or "手机号登录" in html
                         or "验证" in html or "/passport" in page.url)
                )
                if not logged_in:
                    log("看起来遇到登录墙/验证页;脚本仍会保存诊断文件。")
                for i in range(args.search_scrolls):
                    page.mouse.wheel(0, 2600)
                    page.wait_for_timeout(int(random.uniform(1000, 1800)))
                    cards = page.query_selector_all("a[href*='/video/']")
                    log(
                        f"搜索页滚动 {i + 1}/{args.search_scrolls}: "
                        f"当前 video 链接 {len(cards)}"
                    )
                    if len(cards) >= args.max_search_links:
                        break
                # Monotonic clock: unaffected by system clock adjustments.
                deadline = time.monotonic() + args.search_api_wait_ms / 1000
                while time.monotonic() < deadline:
                    candidates = detail_candidates(page, raw_api, args.name, keyword)
                    if candidates:
                        log(
                            f"搜索候选已就绪: {len(candidates)} "
                            f"(DOM/API 混合,等待后进入深采)"
                        )
                        break
                    page.wait_for_timeout(1000)
                search_url = page.url
            for idx in range(args.max_notes):
                # Candidates are re-read every round because element handles
                # from a previous page load cannot be reused after navigating.
                candidates = forced_candidates or detail_candidates(
                    page, raw_api, args.name, keyword
                )
                if idx >= len(candidates):
                    log(
                        f"视频候选不足,停止深采: idx={idx}, "
                        f"candidates={len(candidates)}"
                    )
                    break
                candidate = candidates[idx]
                href = candidate["href"]
                link = candidate.get("link")
                before_comments = comment_count_from_raw(raw_api)
                log(
                    f"打开第 {idx + 1}/{args.max_notes} 个视频 "
                    f"({candidate['source']}): {href}"
                )
                try:
                    if link:
                        click_or_goto_detail(page, link, href)
                    else:
                        goto_detail_url(page, href)
                    try:
                        page.wait_for_load_state("domcontentloaded", timeout=30000)
                    except Exception:
                        if "/video/" not in page.url:
                            raise
                    page.wait_for_timeout(args.detail_wait_ms)
                    if not args.skip_fullscreen:
                        if maybe_click_player_fullscreen(page):
                            log(" 已尝试点击播放器全屏按钮")
                            page.wait_for_timeout(args.fullscreen_wait_ms)
                        else:
                            log(" 未找到播放器全屏按钮,继续尝试普通详情页评论")
                    if maybe_click_comments(page):
                        log(" 评论面板已确认打开")
                    else:
                        log(" 未确认评论面板打开,继续尝试滚入评论区")
                    if bring_comments_into_view(page):
                        log(" 评论区已滚入视口,等待首屏评论 API")
                    page.wait_for_timeout(args.comment_wait_ms)
                    stall = 0
                    previous = comment_count_from_raw(raw_api)
                    previous_dom = dom_comment_item_count(page)
                    for round_idx in range(args.comment_scrolls):
                        scroll_comment_panel(page)
                        page.wait_for_timeout(int(random.uniform(1500, 2400)))
                        current = comment_count_from_raw(raw_api)
                        current_dom = dom_comment_item_count(page)
                        log(
                            f" 评论滚动 {round_idx + 1}/{args.comment_scrolls}: "
                            f"API评论累计 {current}, DOM评论项 {current_dom}"
                        )
                        if comments_end_reached(page):
                            log(" 页面提示暂时没有更多评论,停止滚动。")
                            break
                        # Progress is either more API comments or more DOM items.
                        if current > previous or current_dom > previous_dom:
                            previous = current
                            previous_dom = current_dom
                            stall = 0
                        else:
                            stall += 1
                        if stall >= args.stall_rounds:
                            break
                    dom_comments.extend(collect_dom_comments(page, href or page.url))
                    after_comments = comment_count_from_raw(raw_api)
                    log(
                        f"{idx + 1} 个视频结束: 新增 API 评论 "
                        f"{after_comments - before_comments}"
                    )
                except Exception as exc:
                    log(f"{idx + 1} 个视频深采异常: {exc}")
                finally:
                    # Go back to the search page for the next candidate. This
                    # is skipped in single-video mode. No `continue` is used
                    # here: jumping out of a `finally` clause would silently
                    # discard an in-flight exception such as KeyboardInterrupt.
                    if not forced_candidates:
                        try:
                            page.goto(
                                search_url,
                                wait_until="domcontentloaded",
                                timeout=45000,
                            )
                            page.wait_for_timeout(int(random.uniform(1500, 2300)))
                        except Exception:
                            pass
            summary = finalize_run(
                out_dir, raw_api, api_events, dom_comments, args, keyword, logged_in
            )
            if args.keep_open_seconds > 0 and not args.headless and not args.leave_open:
                log(f"保留浏览器 {args.keep_open_seconds}s方便最后查看页面状态。")
                page.wait_for_timeout(args.keep_open_seconds * 1000)
            if args.leave_open and not args.headless:
                log("已保存 CSV/JSONLChrome 将保持打开。终端按 Ctrl+C 才会关闭浏览器。")
                try:
                    while True:
                        page.wait_for_timeout(60000)
                except KeyboardInterrupt:
                    log("收到 Ctrl+C准备关闭浏览器。")
        finally:
            # Always close the context, also when the run ends with an error.
            ctx.close()
    return summary
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the probe.

    Options cover the target (name, keyword, video URL, place key), where
    files and the browser profile live, browser behaviour, and the waits and
    limits that pace searching and comment scrolling.
    """
    parser = argparse.ArgumentParser(
        description="Run a visible Douyin collection probe and save CSV diagnostics."
    )
    add = parser.add_argument

    # Target and output locations.
    add("--name", default="老凯里酸汤鱼", help="实体名称")
    add("--keyword", default="", help="搜索关键词,默认“贵阳 {name}")
    add("--video-url", default="", help="跳过搜索,直接诊断单个视频 URL")
    add(
        "--place-key",
        default="place-lao-kaili-sourfish",
        help="写入 CSV 的 place_natural_key/eid",
    )
    add("--out-dir", default=str(DEFAULT_OUT_DIR), help="输出根目录")
    add(
        "--profile-dir",
        default=str(DEFAULT_PROFILE_DIR),
        help="抖音持久化登录 profile 目录",
    )

    # Browser behaviour.
    add(
        "--browser-channel",
        default="chrome",
        help="Playwright channel默认 chrome失败会回退 chromium",
    )
    add("--headless", action="store_true", help="改为无头运行")
    for flag, default in (("--width", 1440), ("--height", 900)):
        add(flag, type=int, default=default)
    add("--slow-ms", type=int, default=220, help="可视化慢动作毫秒")

    # Integer waits (milliseconds) and limits, all without help text.
    pacing_options = (
        ("--search-wait-ms", 4500),
        ("--search-api-wait-ms", 12000),
        ("--detail-wait-ms", 3800),
        ("--fullscreen-wait-ms", 1800),
        ("--comment-wait-ms", 3500),
        ("--search-scrolls", 6),
        ("--max-search-links", 40),
        ("--max-notes", 3),
        ("--comment-scrolls", 30),
        ("--stall-rounds", 4),
        ("--keep-open-seconds", 12),
    )
    for flag, default in pacing_options:
        add(flag, type=int, default=default)

    add(
        "--skip-fullscreen",
        action="store_true",
        help="不点击播放器全屏按钮,直接在普通详情页尝试评论区",
    )
    add(
        "--leave-open",
        action="store_true",
        help="采集结束后不关闭 Chrome保持 Python 进程等待 Ctrl+C",
    )
    return parser
def main() -> int:
    """Parse the command line, run the probe, and return a process exit code.

    Returns 0 after printing the summary as JSON, 130 when interrupted by
    the user, and 1 when the run failed with any other exception.
    """
    parsed = build_parser().parse_args()
    try:
        summary = run_probe(parsed)
    except KeyboardInterrupt:
        log("用户中断。")
        return 130
    except Exception as exc:
        log(f"运行失败: {exc}")
        return 1
    else:
        rendered = json.dumps(summary, ensure_ascii=False, indent=2)
        print(rendered)
        return 0
# Script entry point: exit the process with the code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())