#!/usr/bin/env python3
"""Visible Douyin crawler probe.

This is a standalone diagnostic copy of the douyin_agent collection flow.
It does not import or modify app/agents/douyin_agent.py.

The script opens a headed Chrome/Chromium window, lets the browser produce
signed Douyin requests, records the relevant API traffic, parses
videos/comments, and saves project-compatible CSV artifacts for offline
inspection.
"""

from __future__ import annotations

import argparse
import csv
import json
import os
import random
import sys
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import parse_qs, quote, urlparse

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_PROFILE_DIR = Path(os.path.expanduser("~/.zn-kg/douyin-profile"))
DEFAULT_OUT_DIR = ROOT / "data" / "douyin_probe"
SEARCH_URL = "https://www.douyin.com/search/{kw}?type=general"

UA = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/124.0.0.0 Safari/537.36"
)

CHROME_ARGS = [
    "--disable-blink-features=AutomationControlled",
    "--no-first-run",
    "--no-default-browser-check",
    "--disable-sync",
    "--disable-default-apps",
    "--no-sandbox",
    "--disable-dev-shm-usage",
]

# Init script injected into every page before site scripts run.
# The permissions shim invokes the saved `query` with
# `q.call(navigator.permissions, p)`: calling the detached native method
# without its receiver throws "Illegal invocation".
STEALTH_JS = r"""
(() => {
  const d=(o,k,g)=>{try{Object.defineProperty(o,k,{get:g,configurable:true})}catch(e){}};
  d(Navigator.prototype,'webdriver',()=>undefined);
  d(Navigator.prototype,'languages',()=>['zh-CN','zh','en']);
  d(Navigator.prototype,'language',()=>'zh-CN');
  d(Navigator.prototype,'vendor',()=>'Google Inc.');
  if(!window.chrome){try{Object.defineProperty(window,'chrome',{value:{runtime:{}},configurable:true})}catch(e){}}
  try{window.chrome.app={isInstalled:false};window.chrome.csi=()=>({});window.chrome.loadTimes=()=>({});}catch(e){}
  const mk=(a)=>{a.item=(i)=>a[i]||null;a.namedItem=(n)=>a.find(x=>x.name===n)||null;return a;};
  const plugins=mk([{name:'Chrome PDF Plugin'},{name:'Chrome PDF Viewer'},{name:'Native Client'}]);
  d(Navigator.prototype,'plugins',()=>plugins);
  const q=navigator.permissions&&navigator.permissions.query;
  if(q){navigator.permissions.query=(p)=>p&&p.name==='notifications'
    ?Promise.resolve({state:Notification.permission,onchange:null})
    :q.call(navigator.permissions,p);}
  const pw=(proto)=>{if(!proto||!proto.getParameter)return;const o=proto.getParameter;
    proto.getParameter=function(p){if(p===37445)return 'Intel Inc.';
      if(p===37446)return 'Intel Iris OpenGL Engine';return o.apply(this,arguments);};};
  pw(window.WebGLRenderingContext&&WebGLRenderingContext.prototype);
  pw(window.WebGL2RenderingContext&&WebGL2RenderingContext.prototype);
})();
"""

# Responses whose JSON body is kept verbatim in raw_api.
LISTEN_PATTERNS = [
    "/aweme/v1/web/aweme/detail",
    "/aweme/v1/web/comment/list",
    "/aweme/v1/web/comment/list/reply",
    "/aweme/v1/web/general/search/stream",
    "/aweme/v1/web/general/search/single",
    "/aweme/v1/web/search/item",
    "/aweme/v1/web/general/search",
]

# Responses that are at least recorded as an event row in api_urls.csv.
DIAG_PATTERNS = [
    "https://www.douyin.com/aweme/v1/web/",
    "https://www.douyin.com/search/",
]

COMMENT_SCROLLERS = (
    "[data-e2e='comment-list']",
    "[data-e2e='detail-comment']",
    ".comment-list",
    ".comment-mainContent",
    ".ESlRWJ2j",
)

DOM_COMMENT_SELECTORS = (
    "[data-e2e='comment-item']",
    "[data-e2e='comment-text']",
    ".comment-mainContent",
    ".comment-item",
    ".comment-text",
)

# JSON keys under which comment arrays are looked up in API bodies.
COMMENT_LIST_KEYS = frozenset({"comments", "comment_list", "reply_comments"})

EVIDENCE_COLUMNS = [
    "platform",
    "kind",
    "source_id",
    "url",
    "entity_name",
    "place_natural_key",
    "keyword",
    "title",
    "content",
    "author",
    "author_id",
    "author_avatar",
    "likes",
    "comments",
    "collects",
    "shares",
    "publish_time",
    "location",
    "tags",
    "image_urls",
    "raw_json",
]

VIDEO_METRIC_COLUMNS = [
    "platform",
    "source_id",
    "url",
    "entity_name",
    "place_natural_key",
    "keyword",
    "video_title",
    "video_author",
    "video_author_id",
    "video_publish_time",
    "video_publish_timestamp",
    "video_like_count",
    "video_comment_count",
    "video_collect_count",
    "video_share_count",
]

API_EVENT_COLUMNS = [
    "seq",
    "captured_at",
    "status",
    "kind",
    "body_type",
    "data_type",
    "comment_items",
    "aweme_items",
    "status_code",
    "status_msg",
    "keys",
    "url",
]


def log(message: str) -> None:
    """Print *message* prefixed with the current wall-clock time, flushed."""
    ts = datetime.now().strftime("%H:%M:%S")
    print(f"[{ts}] {message}", flush=True)


def unlock_profile(profile_dir: Path) -> None:
    """Delete the Singleton* files from *profile_dir*, ignoring OS errors."""
    for name in ("SingletonLock", "SingletonCookie", "SingletonSocket"):
        try:
            (profile_dir / name).unlink()
        except OSError:
            pass


def scalar(obj, *keys, default=""):
    """Return the first non-empty value among ``obj[key]`` for *keys*.

    ``None``, ``""``, ``[]`` and ``{}`` count as empty; ``0`` does not.
    Returns *default* when *obj* is not a dict or nothing matches.
    """
    if not isinstance(obj, dict):
        return default
    for key in keys:
        value = obj.get(key)
        if value not in (None, "", [], {}):
            return value
    return default


def first_avatar(user: dict) -> str:
    """Return the first URL of ``avatar_thumb``/``avatar_medium``, or ``""``."""
    if not isinstance(user, dict):
        return ""
    avatar = user.get("avatar_thumb") or user.get("avatar_medium") or {}
    urls = avatar.get("url_list") if isinstance(avatar, dict) else None
    if isinstance(urls, list) and urls:
        return str(urls[0] or "")
    return ""


def as_int(value) -> int:
    """Coerce *value* to ``int``; return 0 for empty or unconvertible input."""
    if value in (None, ""):
        return 0
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")).
        return 0


def format_publish_time(value) -> str:
    """Format a Unix timestamp as local ``YYYY-mm-dd HH:MM:SS``.

    Values above 10_000_000_000 are treated as milliseconds. Non-positive
    timestamps give ``""``; values that cannot be converted are returned as
    ``str(value or "")``.
    """
    try:
        ts = int(value)
    except (TypeError, ValueError, OverflowError):
        return str(value or "")
    if ts <= 0:
        return ""
    if ts > 10_000_000_000:
        ts //= 1000
    try:
        return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")
    except (OSError, OverflowError, ValueError):
        return str(value or "")


def body_of(entry: dict):
    """Return ``entry["__body"]`` when present, else *entry* itself."""
    if not isinstance(entry, dict):
        return {}
    return entry.get("__body") if "__body" in entry else entry


def source_url_of(entry: dict) -> str:
    """Return the request URL recorded on a raw API entry, or ``""``."""
    if not isinstance(entry, dict):
        return ""
    return str(entry.get("__url") or entry.get("__probe_url") or "")


def aweme_id_from_url(url: str) -> str:
    """Extract ``aweme_id``/``item_id``/``group_id`` from a URL query string."""
    try:
        qs = parse_qs(urlparse(url).query)
    except Exception:
        return ""
    for key in ("aweme_id", "item_id", "group_id"):
        vals = qs.get(key)
        if vals:
            return str(vals[0] or "")
    return ""


def video_id_from_href(href: str) -> str:
    """Return the path segment following ``video`` in *href*, or ``""``."""
    try:
        parts = [part for part in urlparse(href).path.split("/") if part]
    except ValueError:
        return ""
    for part, following in zip(parts, parts[1:]):
        if part == "video":
            return following
    return ""


def iter_lists_by_key(obj, keys: set[str]):
    """Yield every list stored under one of *keys* anywhere inside *obj*.

    A matching list is yielded as-is and not searched further; other dicts
    and lists are traversed recursively.
    """
    if isinstance(obj, dict):
        for key, value in obj.items():
            if key in keys and isinstance(value, list):
                yield value
            elif isinstance(value, (dict, list)):
                yield from iter_lists_by_key(value, keys)
    elif isinstance(obj, list):
        for item in obj:
            yield from iter_lists_by_key(item, keys)


def parse_douyin_notes(raw_api: list[dict], name: str, keyword: str) -> list[dict]:
    """Turn captured API bodies into de-duplicated video ("note") records.

    Args:
        raw_api: Captured entries (wrapped with ``__body`` or bare bodies).
        name: Entity name copied into every record.
        keyword: Search keyword copied into every record.

    Returns:
        One record per distinct aweme id, in first-seen order.
    """
    out: list[dict] = []
    seen: set[str] = set()
    for entry in raw_api or []:
        body = body_of(entry)
        if not isinstance(body, dict):
            continue
        candidates: list = []
        data = body.get("data")
        data_dict = data if isinstance(data, dict) else {}
        for value in (
            data if isinstance(data, list) else None,
            body.get("aweme_detail"),
            body.get("aweme_list"),
            data_dict.get("aweme_detail"),
            data_dict.get("aweme_list"),
            data_dict.get("data"),
        ):
            if isinstance(value, dict):
                candidates.append(value)
            elif isinstance(value, list):
                candidates.extend(value)
        for nested in iter_lists_by_key(body, {"aweme_list"}):
            candidates.extend(nested)

        for item in candidates:
            if not isinstance(item, dict):
                continue
            aweme = item.get("aweme_info") or item.get("aweme") or item
            if not isinstance(aweme, dict):
                continue
            aid = scalar(aweme, "aweme_id", "group_id", "awemeId")
            if not aid or str(aid) in seen:
                continue
            seen.add(str(aid))
            author = aweme.get("author") or {}
            stat = aweme.get("statistics") or {}
            out.append(
                {
                    "platform": "douyin",
                    "kind": "note",
                    "source_id": str(aid),
                    "url": f"https://www.douyin.com/video/{aid}",
                    "entity_name": name,
                    "keyword": keyword,
                    "title": scalar(aweme, "desc", "title", "caption"),
                    "content": scalar(aweme, "desc", "content"),
                    "author": scalar(author, "nickname", "name"),
                    "author_id": str(scalar(author, "uid", "sec_uid", "unique_id")),
                    "author_avatar": first_avatar(author),
                    "likes": as_int(
                        scalar(stat, "digg_count", "admire_count", default=0)
                    ),
                    "comments": as_int(scalar(stat, "comment_count", default=0)),
                    "collects": as_int(
                        scalar(
                            stat,
                            "collect_count",
                            "favorite_count",
                            "collects_count",
                            default=0,
                        )
                    ),
                    "shares": as_int(
                        scalar(stat, "share_count", "share_count_reflow", default=0)
                    ),
                    "publish_time": str(scalar(aweme, "create_time", default="")),
                    "location": "",
                    "tags": [],
                    "image_urls": [],
                    "raw": item,
                }
            )
    return out


def parse_douyin_comments(raw_api: list[dict], name: str, keyword: str) -> list[dict]:
    """Turn captured API bodies into de-duplicated comment records.

    The owning video id comes from the comment itself, falling back to the
    query string of the request URL. When neither is available the record's
    ``url`` is left empty rather than pointing at a video page without an id.
    """
    out: list[dict] = []
    seen: set[str] = set()
    for entry in raw_api or []:
        body = body_of(entry)
        if not isinstance(body, dict):
            continue
        fallback_aweme_id = aweme_id_from_url(source_url_of(entry))
        for comments in iter_lists_by_key(body, COMMENT_LIST_KEYS):
            for comment in comments:
                if not isinstance(comment, dict):
                    continue
                cid = scalar(comment, "cid", "comment_id", "id")
                text = scalar(comment, "text", "content")
                if not cid or not text or str(cid) in seen:
                    continue
                seen.add(str(cid))
                user = comment.get("user") or comment.get("user_info") or {}
                aweme_id = scalar(comment, "aweme_id", default=fallback_aweme_id)
                replies = scalar(
                    comment, "reply_comment_total", "reply_total", default=0
                )
                url = f"https://www.douyin.com/video/{aweme_id}" if aweme_id else ""
                out.append(
                    {
                        "platform": "douyin",
                        "kind": "comment",
                        "source_id": str(cid),
                        "url": url,
                        "entity_name": name,
                        "keyword": keyword,
                        "title": "",
                        "content": text,
                        "author": scalar(user, "nickname", "name"),
                        "author_id": str(scalar(user, "uid", "sec_uid")),
                        "author_avatar": first_avatar(user),
                        "likes": as_int(
                            scalar(comment, "digg_count", "like_count", default=0)
                        ),
                        "comments": as_int(replies),
                        "collects": 0,
                        "shares": 0,
                        "publish_time": str(
                            scalar(comment, "create_time", default="")
                        ),
                        "location": scalar(comment, "ip_label", default=""),
                        "tags": [],
                        "image_urls": [],
                        "raw": comment,
                    }
                )
    return out


def response_shape(payload) -> dict:
    """Summarise a JSON response body for the api_urls.csv diagnostics."""
    body = payload
    if isinstance(payload, dict) and "__body" in payload:
        body = payload["__body"]
    if not isinstance(body, dict):
        return {"body_type": type(body).__name__, "keys": ""}

    data = body.get("data")
    comment_count = sum(
        len(comments) for comments in iter_lists_by_key(body, COMMENT_LIST_KEYS)
    )
    aweme_count = sum(
        len(awemes) for awemes in iter_lists_by_key(body, {"aweme_list"})
    )
    if isinstance(data, list):
        aweme_count += len(data)
    return {
        "body_type": "dict",
        "data_type": type(data).__name__,
        "keys": ",".join(list(body.keys())[:16]),
        "comment_items": comment_count,
        "aweme_items": aweme_count,
        "status_code": body.get("status_code", ""),
        "status_msg": body.get("status_msg", ""),
    }


def write_csv(path: Path, rows: list[dict], columns: list[str] | None = None) -> None:
    """Write *rows* to *path* as UTF-8 (with BOM) CSV.

    When *columns* is omitted, the header is the union of row keys in
    first-seen order. Keys not listed in *columns* are ignored.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    if columns is None:
        # dict.fromkeys keeps first-seen order without a quadratic list scan.
        columns = list(dict.fromkeys(key for row in rows for key in row))
    with path.open("w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=columns, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(rows)


def evidence_csv_rows(records: list[dict], place_key: str) -> list[dict]:
    """Project records onto EVIDENCE_COLUMNS, JSON-encoding nested fields."""
    rows = []
    for record in records:
        row = {key: record.get(key, "") for key in EVIDENCE_COLUMNS}
        row["place_natural_key"] = place_key
        row["tags"] = json.dumps(record.get("tags") or [], ensure_ascii=False)
        row["image_urls"] = json.dumps(
            record.get("image_urls") or [], ensure_ascii=False
        )
        row["raw_json"] = json.dumps(record.get("raw") or {}, ensure_ascii=False)
        rows.append(row)
    return rows


def video_metric_rows(records: list[dict], place_key: str) -> list[dict]:
    """Build VIDEO_METRIC_COLUMNS rows from the ``kind == "note"`` records."""
    rows = []
    for record in records:
        if record.get("kind") != "note":
            continue
        publish_time = record.get("publish_time", "")
        rows.append(
            {
                "platform": record.get("platform", "douyin"),
                "source_id": record.get("source_id", ""),
                "url": record.get("url", ""),
                "entity_name": record.get("entity_name", ""),
                "place_natural_key": place_key,
                "keyword": record.get("keyword", ""),
                "video_title": record.get("title", "") or record.get("content", ""),
                "video_author": record.get("author", ""),
                "video_author_id": record.get("author_id", ""),
                "video_publish_time": format_publish_time(publish_time),
                "video_publish_timestamp": publish_time,
                "video_like_count": record.get("likes", 0),
                "video_comment_count": record.get("comments", 0),
                "video_collect_count": record.get("collects", 0),
                "video_share_count": record.get("shares", 0),
            }
        )
    return rows


def save_artifacts(
    out_dir: Path,
    records: list[dict],
    api_events: list[dict],
    raw_api: list[dict],
    dom_comments: list[dict],
    summary: dict,
    place_key: str,
) -> dict:
    """Write every CSV/JSONL/JSON artifact of a run into *out_dir*.

    Side effect: ``summary["files"]`` is set to the mapping of artifact name
    to path string before the summary is serialised. That mapping is also
    the return value.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    evidence_rows = evidence_csv_rows(records, place_key)
    note_rows = [row for row in evidence_rows if row.get("kind") != "comment"]
    comment_rows = [row for row in evidence_rows if row.get("kind") == "comment"]
    files = {
        "social_evidence_csv": out_dir / "social_evidence.csv",
        "videos_csv": out_dir / "videos.csv",
        "video_metrics_csv": out_dir / "video_metrics.csv",
        "comments_csv": out_dir / "comments.csv",
        "api_urls_csv": out_dir / "api_urls.csv",
        "dom_comments_csv": out_dir / "dom_comments.csv",
        "raw_api_jsonl": out_dir / "raw_api.jsonl",
        "summary_json": out_dir / "run_summary.json",
    }
    write_csv(files["social_evidence_csv"], evidence_rows, EVIDENCE_COLUMNS)
    write_csv(files["videos_csv"], note_rows, EVIDENCE_COLUMNS)
    write_csv(
        files["video_metrics_csv"],
        video_metric_rows(records, place_key),
        VIDEO_METRIC_COLUMNS,
    )
    write_csv(files["comments_csv"], comment_rows, EVIDENCE_COLUMNS)
    write_csv(files["api_urls_csv"], api_events, API_EVENT_COLUMNS)
    write_csv(files["dom_comments_csv"], dom_comments)
    with files["raw_api_jsonl"].open("w", encoding="utf-8") as f:
        for entry in raw_api:
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")
    summary["files"] = {key: str(value) for key, value in files.items()}
    with files["summary_json"].open("w", encoding="utf-8") as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)
    return summary["files"]


def collect_dom_comments(page, video_url: str) -> list[dict]:
    """Scrape visible comment text snippets from the current page DOM.

    Text is whitespace-normalised, de-duplicated across all selectors and
    truncated to 1000 characters. Selector/element errors are skipped.
    """
    rows: list[dict] = []
    seen: set[str] = set()
    for selector in DOM_COMMENT_SELECTORS:
        try:
            elements = page.query_selector_all(selector)
        except Exception:
            continue
        for element in elements:
            try:
                text = " ".join((element.inner_text() or "").split())
            except Exception:
                continue
            if len(text) < 2 or text in seen:
                continue
            seen.add(text)
            rows.append(
                {
                    "source": "dom",
                    "selector": selector,
                    "video_url": video_url,
                    "content": text[:1000],
                }
            )
    return rows


def comment_count_from_raw(raw_api: list[dict]) -> int:
    """Count comment items across all captured bodies (duplicates included)."""
    total = 0
    for entry in raw_api:
        body = body_of(entry)
        if not isinstance(body, dict):
            continue
        for comments in iter_lists_by_key(body, COMMENT_LIST_KEYS):
            total += len(comments)
    return total


def click_or_goto_detail(page, link, href: str) -> None:
    """Open a video by clicking *link*, falling back to navigating to *href*.

    Tries a click that triggers a navigation, then a plain click followed by
    a URL check, and finally a direct ``goto`` when *href* is non-empty.
    """
    try:
        with page.expect_navigation(timeout=15000):
            link.click()
        return
    except Exception:
        pass
    try:
        link.click()
        page.wait_for_timeout(1800)
        if "/video/" in page.url:
            return
    except Exception:
        pass
    if href:
        goto_detail_url(page, href)


def goto_detail_url(page, href: str) -> None:
    """Navigate to *href*; re-raise only if the page is not on a video URL."""
    try:
        page.goto(href, wait_until="commit", timeout=30000)
    except Exception:
        if "/video/" not in page.url:
            raise


def comment_panel_present(page) -> bool:
    """Return True when a COMMENT_SCROLLERS element is visibly in the viewport."""
    try:
        return bool(
            page.evaluate(
                """
                (selectors) => {
                  const visibleInViewport = (el) => {
                    if (!el) return false;
                    const r = el.getBoundingClientRect();
                    const s = getComputedStyle(el);
                    const width = Math.min(r.right, innerWidth) - Math.max(r.left, 0);
                    const height = Math.min(r.bottom, innerHeight) - Math.max(r.top, 0);
                    return r.width > 120 && r.height > 120 && width > 80 && height > 80
                      && s.display !== 'none' && s.visibility !== 'hidden'
                      && Number(s.opacity || '1') > 0.05;
                  };
                  for (const selector of selectors) {
                    for (const el of document.querySelectorAll(selector)) {
                      if (visibleInViewport(el)) return true;
                    }
                  }
                  return false;
                }
                """,
                list(COMMENT_SCROLLERS),
            )
        )
    except Exception:
        return False


def wait_for_comment_panel(page, timeout_ms: int = 3000) -> bool:
    """Poll for the comment panel for up to *timeout_ms* milliseconds."""
    # monotonic(): the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout_ms / 1000
    while time.monotonic() < deadline:
        if comment_panel_present(page):
            return True
        page.wait_for_timeout(180)
    return comment_panel_present(page)


def detail_candidates(page, raw_api: list[dict], name: str, keyword: str) -> list[dict]:
    """List videos to deep-crawl: DOM links first, then API-derived URLs.

    Candidates are de-duplicated by video id (falling back to the full href
    when no id can be extracted), so the same video reached through
    different URL spellings is only listed once.
    """
    candidates: list[dict] = []
    seen: set[str] = set()

    def first_time(href: str) -> bool:
        key = video_id_from_href(href) or href
        if key in seen:
            return False
        seen.add(key)
        return True

    try:
        links = page.query_selector_all("a[href*='/video/']")
    except Exception:
        links = []
    for link in links:
        try:
            href = link.get_attribute("href") or ""
        except Exception:
            continue
        if href.startswith("//"):
            href = "https:" + href
        elif href.startswith("/"):
            href = "https://www.douyin.com" + href
        if href and first_time(href):
            candidates.append({"href": href, "link": link, "source": "dom"})

    for note in parse_douyin_notes(raw_api, name, keyword):
        href = note.get("url") or ""
        if href and first_time(href):
            candidates.append({"href": href, "link": None, "source": "api"})
    return candidates


def reveal_player_controls(page) -> bool:
    """Move the mouse over the player (or viewport) to show its controls."""
    player_selectors = (
        ".xgplayer",
        "[class*='xgplayer']",
        "video",
    )
    for selector in player_selectors:
        try:
            loc = page.locator(selector).first
            if loc.count() <= 0:
                continue
            box = loc.bounding_box(timeout=1200)
            if not box:
                continue
            x = box["x"] + box["width"] * 0.72
            y = box["y"] + box["height"] * 0.82
            page.mouse.move(x, y)
            page.wait_for_timeout(700)
            return True
        except Exception:
            continue
    try:
        viewport = page.viewport_size or {"width": 1440, "height": 900}
        page.mouse.move(viewport["width"] * 0.55, viewport["height"] * 0.78)
        page.wait_for_timeout(700)
        return True
    except Exception:
        return False


def player_fullscreen_active(page) -> bool:
    """Return True when the document or the xgplayer reports fullscreen."""
    try:
        return bool(
            page.evaluate(
                """
                () => Boolean(
                  document.fullscreenElement
                  || document.webkitFullscreenElement
                  || document.querySelector(
                    '.xgplayer-is-fullscreen,.xgplayer-is-cssfullscreen,.xgplayer-fullscreen-active'
                  )
                )
                """
            )
        )
    except Exception:
        return False


def maybe_click_player_fullscreen(page) -> bool:
    """Try to put the player into fullscreen.

    Strategies, in order: known selectors, coordinates of matching nodes
    found via JavaScript, then the ``f`` keyboard shortcut. Returns True as
    soon as a click was issued (or, for the shortcut, fullscreen is active).
    """
    reveal_player_controls(page)
    candidates = [
        ".xgplayer-icon:has(.xg-get-fullscreen)",
        ".xgplayer-icon .xg-get-fullscreen",
        ".xg-get-fullscreen",
        ".xgplayer-fullscreen",
        "[aria-label*='全屏']",
        "[title*='全屏']",
        "button:has-text('全屏')",
    ]
    for selector in candidates:
        try:
            reveal_player_controls(page)
            loc = page.locator(selector).first
            if loc.count() <= 0:
                continue
            try:
                loc.scroll_into_view_if_needed(timeout=1800)
            except Exception:
                pass
            try:
                loc.hover(timeout=1200)
            except Exception:
                pass
            loc.click(timeout=1800)
            page.wait_for_timeout(1600)
            return True
        except Exception:
            continue

    try:
        points = page.evaluate(
            """
            () => {
              const visible = (el) => {
                const r = el.getBoundingClientRect();
                const s = getComputedStyle(el);
                return r.width > 4 && r.height > 4 && s.visibility !== 'hidden'
                  && s.display !== 'none' && r.bottom > 0 && r.right > 0;
              };
              const nodes = [
                ...document.querySelectorAll(
                  '.xg-get-fullscreen,.xgplayer-fullscreen,[aria-label*="全屏"],[title*="全屏"]'
                )
              ];
              const points = [];
              for (const node of nodes) {
                const target = node.closest('button,[role="button"],[tabindex],.xgplayer-icon') || node;
                if (!visible(target)) continue;
                const r = target.getBoundingClientRect();
                points.push({
                  x: Math.round(r.left + r.width / 2),
                  y: Math.round(r.top + r.height / 2),
                });
              }
              return points;
            }
            """
        )
        for point in points or []:
            x = point.get("x")
            y = point.get("y")
            if not isinstance(x, int) or not isinstance(y, int):
                continue
            reveal_player_controls(page)
            page.mouse.move(x, y)
            page.mouse.click(x, y)
            page.wait_for_timeout(1600)
            return True
    except Exception:
        pass

    try:
        reveal_player_controls(page)
        page.keyboard.press("f")
        page.wait_for_timeout(1200)
        if player_fullscreen_active(page):
            return True
    except Exception:
        pass
    return False


def maybe_click_comments(page) -> bool:
    """Open the comment panel if it is not already visible.

    Tries known selectors first, then clicks candidate points computed in
    the page (comment-labelled controls and a specific comment SVG icon).
    Returns True only once the panel is confirmed present.
    """
    if wait_for_comment_panel(page, 800):
        return True
    candidates = [
        "[data-e2e='feed-comment-icon']",
        "[data-e2e='feed-comment']",
        "[data-e2e='comment-icon']",
        "[data-e2e='video-comment']",
        "[aria-label*='评论']",
        "[title*='评论']",
        "button:has-text('评论')",
        "[role='button']:has-text('评论')",
        "text=评论",
    ]
    for selector in candidates:
        try:
            loc = page.locator(selector).first
            if loc.count() > 0:
                try:
                    loc.scroll_into_view_if_needed(timeout=1800)
                except Exception:
                    pass
                loc.click(timeout=1200)
                if wait_for_comment_panel(page, 2200):
                    return True
        except Exception:
            continue

    try:
        points = page.evaluate(
            """
            () => {
              const hasPanel = () => Boolean(
                [...document.querySelectorAll("[data-e2e='comment-list'],.comment-mainContent")]
                  .some((el) => {
                    const r = el.getBoundingClientRect();
                    const s = getComputedStyle(el);
                    const width = Math.min(r.right, innerWidth) - Math.max(r.left, 0);
                    const height = Math.min(r.bottom, innerHeight) - Math.max(r.top, 0);
                    return r.width > 120 && r.height > 120 && width > 80 && height > 80
                      && s.display !== 'none' && s.visibility !== 'hidden';
                  })
              );
              if (hasPanel()) return [{ alreadyOpen: true }];
              const visible = (el) => {
                const r = el.getBoundingClientRect();
                const s = getComputedStyle(el);
                return r.width > 6 && r.height > 6 && s.visibility !== 'hidden'
                  && s.display !== 'none' && r.bottom > 0 && r.right > 0
                  && r.top < innerHeight && r.left < innerWidth;
              };
              const points = [];
              const addPoint = (el, reason) => {
                let target = el.closest('button,[role="button"],[tabindex],a') || el;
                for (let i = 0; i < 5 && target.parentElement; i += 1) {
                  const current = target.getBoundingClientRect();
                  const parent = target.parentElement.getBoundingClientRect();
                  const clickableParent = target.parentElement.matches(
                    'button,[role="button"],[tabindex],a'
                  );
                  if (clickableParent || (parent.width <= 140 && parent.height <= 140
                      && parent.width >= current.width && parent.height >= current.height)) {
                    target = target.parentElement;
                  }
                }
                if (!visible(target)) target = el;
                if (!visible(target)) return;
                const r = target.getBoundingClientRect();
                points.push({
                  x: Math.round(r.left + r.width / 2),
                  y: Math.round(r.top + r.height / 2),
                  reason,
                  rightBias: r.left > innerWidth * 0.45 ? 1 : 0,
                  size: Math.round(r.width * r.height),
                });
              };
              const textNodes = [
                ...document.querySelectorAll('button,[role="button"],[tabindex],[aria-label],[title],span')
              ];
              for (const el of textNodes) {
                const text = [
                  el.innerText || '',
                  el.getAttribute('aria-label') || '',
                  el.getAttribute('title') || ''
                ].join(' ').trim();
                if (!/评论/.test(text) || /评论区|评论列表|暂无评论/.test(text)) continue;
                addPoint(el, 'text');
              }
              for (const svg of document.querySelectorAll('svg[viewBox="0 0 99 99"]')) {
                const d = [...svg.querySelectorAll('path')]
                  .map((p) => p.getAttribute('d') || '')
                  .join(' ');
                if (!d.includes('M-5.79,5.98') && !d.includes('C-3.56,3.75')) {
                  continue;
                }
                addPoint(svg, 'comment-svg-99');
              }
              return points.sort((a, b) =>
                (b.rightBias - a.rightBias) || (a.size - b.size) || (a.y - b.y)
              );
            }
            """
        )
        for point in points or []:
            if point.get("alreadyOpen"):
                return True
            x = point.get("x")
            y = point.get("y")
            if not isinstance(x, int) or not isinstance(y, int):
                continue
            page.mouse.move(x, y)
            page.mouse.click(x, y)
            if wait_for_comment_panel(page, 2400):
                return True
    except Exception:
        pass
    return False


def move_mouse_into_box(page, box: dict) -> None:
    """Move the mouse to a point inside *box*, clamped to the viewport."""
    viewport = page.viewport_size or {"width": 1440, "height": 900}
    x = min(max(box["x"] + box["width"] / 2, 8), viewport["width"] - 8)
    y = min(max(box["y"] + min(box["height"] / 2, 260), 8), viewport["height"] - 8)
    page.mouse.move(x, y)


def bring_comments_into_view(page) -> bool:
    """Scroll a comment container into view and hover it.

    Makes up to four passes over COMMENT_SCROLLERS, scrolling the window
    down between passes. Returns True once a container was scrolled into
    view, False if none was found.
    """
    for _attempt in range(4):
        for selector in COMMENT_SCROLLERS:
            try:
                locator = page.locator(selector).first
                if locator.count() <= 0:
                    continue
                locator.scroll_into_view_if_needed(timeout=5000)
                page.wait_for_timeout(1800)
                scroller = page.query_selector(selector)
                if scroller:
                    box = scroller.bounding_box()
                    if box:
                        move_mouse_into_box(page, box)
                return True
            except Exception:
                continue
        try:
            page.evaluate(
                "window.scrollBy(0, Math.round(window.innerHeight * 0.68))"
            )
            page.wait_for_timeout(1700)
        except Exception:
            pass
    return False


def dom_comment_item_count(page) -> int:
    """Return the number of ``[data-e2e='comment-item']`` nodes (0 on error)."""
    try:
        return len(page.query_selector_all("[data-e2e='comment-item']"))
    except Exception:
        return 0


def comments_end_reached(page) -> bool:
    """Return True when the page text contains the "no more comments" notice."""
    try:
        return bool(
            page.evaluate(
                "() => document.body && document.body.innerText.includes('暂时没有更多评论')"
            )
        )
    except Exception:
        return False


def scroll_comment_panel(page) -> bool:
    """Scroll the comment container one step.

    Locates the nearest scrollable ancestor of a comment selector (falling
    back to the document scroller), wheels over it and scrolls it via
    JavaScript. If that fails, wheels at the current mouse position and
    presses End. Returns False only when every attempt raised.
    """
    try:
        handle = page.evaluate_handle(
            """
            () => {
              const selectors = [
                "[data-e2e='comment-list']",
                ".comment-mainContent",
                "[data-e2e='detail-comment']",
                ".comment-list",
                ".ESlRWJ2j"
              ];
              const scrollable = (el) => {
                if (!el) return false;
                const s = getComputedStyle(el);
                return el.scrollHeight > el.clientHeight + 20
                  || /(auto|scroll)/.test(s.overflowY || '');
              };
              for (const selector of selectors) {
                let el = document.querySelector(selector);
                while (el && el !== document.body) {
                  if (scrollable(el) && el.clientHeight > 120) return el;
                  el = el.parentElement;
                }
              }
              return document.scrollingElement || document.documentElement;
            }
            """
        )
        element = handle.as_element()
    except Exception:
        element = None

    try:
        if element:
            box = element.bounding_box()
            if box:
                move_mouse_into_box(page, box)
            page.mouse.wheel(0, 2800)
            page.evaluate(
                """
                (e) => {
                  const step = Math.max(900, Math.floor((e.clientHeight || innerHeight) * 1.35));
                  if (typeof e.scrollBy === 'function') {
                    e.scrollBy(0, step);
                  } else {
                    e.scrollTop += step;
                  }
                }
                """,
                element,
            )
            return True
    except Exception:
        pass

    try:
        page.mouse.wheel(0, 2800)
        page.keyboard.press("End")
        return True
    except Exception:
        return False


def launch_context(playwright, args):
    """Launch a persistent Chromium context using the CLI options in *args*.

    If a browser channel was requested and launching with it fails, the
    launch is retried once without the channel. Without a channel the
    original exception propagates.
    """
    profile_dir = Path(args.profile_dir).expanduser()
    profile_dir.mkdir(parents=True, exist_ok=True)
    unlock_profile(profile_dir)
    launch_args = {
        "user_data_dir": str(profile_dir),
        "headless": args.headless,
        "args": CHROME_ARGS,
        "ignore_default_args": ["--enable-automation"],
        "user_agent": UA,
        "locale": "zh-CN",
        "viewport": {"width": args.width, "height": args.height},
        "slow_mo": args.slow_ms,
    }
    if args.browser_channel:
        launch_args["channel"] = args.browser_channel
    try:
        return playwright.chromium.launch_persistent_context(**launch_args)
    except Exception as exc:
        if not args.browser_channel:
            raise
        log(f"指定 channel={args.browser_channel!r} 启动失败,回退到 Playwright Chromium: {exc}")
        launch_args.pop("channel", None)
        return playwright.chromium.launch_persistent_context(**launch_args)


def finalize_run(
    out_dir: Path,
    raw_api: list[dict],
    api_events: list[dict],
    dom_comments: list[dict],
    args,
    keyword: str,
    logged_in: bool,
) -> dict:
    """Parse the captured traffic, write all artifacts and return the summary."""
    notes = parse_douyin_notes(raw_api, args.name, keyword)
    comments = parse_douyin_comments(raw_api, args.name, keyword)
    records = notes + comments
    endpoint_kinds = sorted({row.get("kind", "") for row in api_events})
    summary = {
        "ok": True,
        "logged_in": logged_in,
        "name": args.name,
        "place_natural_key": args.place_key,
        "keyword": keyword,
        "note_count": len(notes),
        "comment_count": len(comments),
        "dom_comment_snippet_count": len(dom_comments),
        "api_event_count": len(api_events),
        "raw_api_count": len(raw_api),
        "api_kinds": endpoint_kinds,
        "comment_api_event_count": sum(
            1 for row in api_events if row.get("kind") == "comment"
        ),
        "search_api_event_count": sum(
            1 for row in api_events if row.get("kind") == "search"
        ),
    }
    files = save_artifacts(
        out_dir,
        records,
        api_events,
        raw_api,
        dom_comments,
        summary,
        args.place_key,
    )
    log(
        "完成: "
        f"视频 {len(notes)} 条, API评论 {len(comments)} 条, "
        f"DOM评论片段 {len(dom_comments)} 条, API事件 {len(api_events)} 条"
    )
    log(f"CSV: {files['social_evidence_csv']}")
    if summary["comment_api_event_count"] == 0:
        log("诊断提示: 没有捕到 comment API,优先看页面是否进入详情和评论区是否展开。")
    elif len(comments) == 0:
        log("诊断提示: 捕到 comment API 但解析为 0,优先看 raw_api.jsonl 的响应结构。")
    return summary


def _make_response_recorder(raw_api: list[dict], api_events: list[dict]):
    """Build the ``response`` handler that fills *api_events* and *raw_api*."""
    logged_urls: set[str] = set()

    def on_response(resp) -> None:
        url = resp.url
        if not any(pattern in url for pattern in DIAG_PATTERNS):
            return
        matched = any(pattern in url for pattern in LISTEN_PATTERNS)
        body = None
        try:
            body = resp.json()
            shape = response_shape(body)
        except Exception as exc:
            shape = {"body_type": "non_json", "keys": str(exc)[:80]}
        if "/comment/" in url:
            kind = "comment"
        elif "/search" in url:
            kind = "search"
        else:
            kind = "aweme"
        event = {
            # Events are only ever appended, so this is a 1-based sequence.
            "seq": len(api_events) + 1,
            "captured_at": datetime.now().isoformat(timespec="seconds"),
            "status": resp.status,
            "kind": kind,
            "url": url,
            **shape,
        }
        api_events.append(event)
        if matched and isinstance(body, dict):
            raw_api.append(
                {
                    "__url": url,
                    "__status": resp.status,
                    "__captured_at": event["captured_at"],
                    "__body": body,
                }
            )
        if url not in logged_urls:
            logged_urls.add(url)
            cc = event.get("comment_items") or 0
            ac = event.get("aweme_items") or 0
            log(f"API {kind} status={resp.status} comments={cc} aweme={ac} {url[:120]}")

    return on_response


def _run_search(page, args, keyword: str, raw_api: list[dict]) -> tuple[str, bool]:
    """Open the search page, scroll it and wait for candidates.

    Returns ``(search_url, logged_in)`` where *search_url* is the page URL
    after loading and *logged_in* is False when a login/verification wall
    was detected.
    """
    page.goto(
        SEARCH_URL.format(kw=quote(keyword)),
        wait_until="domcontentloaded",
        timeout=60000,
    )
    page.wait_for_timeout(args.search_wait_ms)
    html = page.content()
    cards = page.query_selector_all("a[href*='/video/']")
    login_wall = not cards and (
        "扫码登录" in html
        or "手机号登录" in html
        or "验证" in html
        or "/passport" in page.url
    )
    if login_wall:
        log("看起来遇到登录墙/验证页;脚本仍会保存诊断文件。")

    for i in range(args.search_scrolls):
        page.mouse.wheel(0, 2600)
        page.wait_for_timeout(int(random.uniform(1000, 1800)))
        cards = page.query_selector_all("a[href*='/video/']")
        log(
            f"搜索页滚动 {i + 1}/{args.search_scrolls}: "
            f"当前 video 链接 {len(cards)}"
        )
        if len(cards) >= args.max_search_links:
            break

    deadline = time.monotonic() + args.search_api_wait_ms / 1000
    while time.monotonic() < deadline:
        candidates = detail_candidates(page, raw_api, args.name, keyword)
        if candidates:
            log(
                f"搜索候选已就绪: {len(candidates)} "
                f"(DOM/API 混合,等待后进入深采)"
            )
            break
        page.wait_for_timeout(1000)
    return page.url, not login_wall


def _scroll_comments(page, args, raw_api: list[dict]) -> None:
    """Scroll the comment panel until the end, a stall, or the round limit."""
    stall = 0
    previous = comment_count_from_raw(raw_api)
    previous_dom = dom_comment_item_count(page)
    for round_idx in range(args.comment_scrolls):
        scroll_comment_panel(page)
        page.wait_for_timeout(int(random.uniform(1500, 2400)))
        current = comment_count_from_raw(raw_api)
        current_dom = dom_comment_item_count(page)
        log(
            f" 评论滚动 {round_idx + 1}/{args.comment_scrolls}: "
            f"API评论累计 {current}, DOM评论项 {current_dom}"
        )
        if comments_end_reached(page):
            log(" 页面提示暂时没有更多评论,停止滚动。")
            break
        if current > previous or current_dom > previous_dom:
            previous = current
            previous_dom = current_dom
            stall = 0
        else:
            stall += 1
            if stall >= args.stall_rounds:
                break


def _deep_crawl_video(
    page, args, candidate: dict, raw_api: list[dict], dom_comments: list[dict]
) -> None:
    """Open one candidate video, expand its comments and scroll through them."""
    href = candidate["href"]
    link = candidate.get("link")
    if link:
        click_or_goto_detail(page, link, href)
    else:
        goto_detail_url(page, href)
    try:
        page.wait_for_load_state("domcontentloaded", timeout=30000)
    except Exception:
        if "/video/" not in page.url:
            raise
    page.wait_for_timeout(args.detail_wait_ms)

    if not args.skip_fullscreen:
        if maybe_click_player_fullscreen(page):
            log(" 已尝试点击播放器全屏按钮")
            page.wait_for_timeout(args.fullscreen_wait_ms)
        else:
            log(" 未找到播放器全屏按钮,继续尝试普通详情页评论")

    if maybe_click_comments(page):
        log(" 评论面板已确认打开")
    else:
        log(" 未确认评论面板打开,继续尝试滚入评论区")
    if bring_comments_into_view(page):
        log(" 评论区已滚入视口,等待首屏评论 API")
    page.wait_for_timeout(args.comment_wait_ms)

    _scroll_comments(page, args, raw_api)
    dom_comments.extend(collect_dom_comments(page, href or page.url))


def _return_to_search(page, search_url: str) -> None:
    """Best-effort navigation back to the search page between videos."""
    try:
        page.goto(search_url, wait_until="domcontentloaded", timeout=45000)
        page.wait_for_timeout(int(random.uniform(1500, 2300)))
    except Exception as exc:
        log(f"返回搜索页失败: {exc}")


def run_probe(args) -> dict:
    """Run the whole probe: search, deep-crawl videos, save artifacts.

    Args:
        args: Parsed CLI namespace from :func:`build_parser`.

    Returns:
        The run summary dict that is also written to ``run_summary.json``.

    The browser context is closed in a ``finally`` block so it is released
    on errors and Ctrl+C as well as on success.
    """
    from playwright.sync_api import sync_playwright

    keyword = args.keyword or f"贵阳 {args.name}".strip()
    run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_dir = Path(args.out_dir).expanduser() / run_id
    raw_api: list[dict] = []
    api_events: list[dict] = []
    dom_comments: list[dict] = []

    log(f"关键词: {keyword}")
    log(f"输出目录: {out_dir}")
    log("即将打开可视化 Chrome 窗口,后续搜索/点视频/滚评论都会在窗口里可见。")

    forced_candidates = []
    if args.video_url:
        forced_candidates = [
            {"href": args.video_url, "link": None, "source": "arg"}
        ]

    with sync_playwright() as p:
        ctx = launch_context(p, args)
        try:
            ctx.add_init_script(STEALTH_JS)
            ctx.on("response", _make_response_recorder(raw_api, api_events))
            page = ctx.pages[0] if ctx.pages else ctx.new_page()
            page.set_default_timeout(15000)

            logged_in = True
            if forced_candidates:
                search_url = args.video_url
                log(f"跳过搜索,直接诊断视频: {args.video_url}")
            else:
                search_url, logged_in = _run_search(page, args, keyword, raw_api)

            for idx in range(args.max_notes):
                candidates = forced_candidates or detail_candidates(
                    page, raw_api, args.name, keyword
                )
                if idx >= len(candidates):
                    log(
                        f"视频候选不足,停止深采: idx={idx}, "
                        f"candidates={len(candidates)}"
                    )
                    break
                candidate = candidates[idx]
                before_comments = comment_count_from_raw(raw_api)
                log(
                    f"打开第 {idx + 1}/{args.max_notes} 个视频 "
                    f"({candidate['source']}): {candidate['href']}"
                )
                try:
                    _deep_crawl_video(page, args, candidate, raw_api, dom_comments)
                    after_comments = comment_count_from_raw(raw_api)
                    log(
                        f"第 {idx + 1} 个视频结束: 新增 API 评论 "
                        f"{after_comments - before_comments}"
                    )
                except Exception as exc:
                    log(f"第 {idx + 1} 个视频深采异常: {exc}")
                # Runs after success and after handled errors. Deliberately
                # not in a `finally`: a `continue` there would discard a
                # pending KeyboardInterrupt.
                if not forced_candidates:
                    _return_to_search(page, search_url)

            summary = finalize_run(
                out_dir, raw_api, api_events, dom_comments, args, keyword, logged_in
            )

            if args.keep_open_seconds > 0 and not args.headless and not args.leave_open:
                log(f"保留浏览器 {args.keep_open_seconds}s,方便最后查看页面状态。")
                page.wait_for_timeout(args.keep_open_seconds * 1000)
            if args.leave_open and not args.headless:
                log("已保存 CSV/JSONL;Chrome 将保持打开。终端按 Ctrl+C 才会关闭浏览器。")
                try:
                    while True:
                        page.wait_for_timeout(60000)
                except KeyboardInterrupt:
                    log("收到 Ctrl+C,准备关闭浏览器。")
        finally:
            try:
                ctx.close()
            except Exception as exc:
                # Do not let a failed close mask the real outcome of the run.
                log(f"关闭浏览器失败: {exc}")
    return summary


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the probe."""
    parser = argparse.ArgumentParser(
        description="Run a visible Douyin collection probe and save CSV diagnostics."
    )
    parser.add_argument("--name", default="老凯里酸汤鱼", help="实体名称")
    parser.add_argument("--keyword", default="", help="搜索关键词,默认“贵阳 {name}”")
    parser.add_argument("--video-url", default="", help="跳过搜索,直接诊断单个视频 URL")
    parser.add_argument(
        "--place-key",
        default="place-lao-kaili-sourfish",
        help="写入 CSV 的 place_natural_key/eid",
    )
    parser.add_argument("--out-dir", default=str(DEFAULT_OUT_DIR), help="输出根目录")
    parser.add_argument(
        "--profile-dir",
        default=str(DEFAULT_PROFILE_DIR),
        help="抖音持久化登录 profile 目录",
    )
    parser.add_argument(
        "--browser-channel",
        default="chrome",
        help="Playwright channel,默认 chrome;失败会回退 chromium",
    )
    parser.add_argument("--headless", action="store_true", help="改为无头运行")
    parser.add_argument("--width", type=int, default=1440)
    parser.add_argument("--height", type=int, default=900)
    parser.add_argument("--slow-ms", type=int, default=220, help="可视化慢动作毫秒")
    parser.add_argument("--search-wait-ms", type=int, default=4500)
    parser.add_argument("--search-api-wait-ms", type=int, default=12000)
    parser.add_argument("--detail-wait-ms", type=int, default=3800)
    parser.add_argument("--fullscreen-wait-ms", type=int, default=1800)
    parser.add_argument("--comment-wait-ms", type=int, default=3500)
    parser.add_argument("--search-scrolls", type=int, default=6)
    parser.add_argument("--max-search-links", type=int, default=40)
    parser.add_argument("--max-notes", type=int, default=3)
    parser.add_argument("--comment-scrolls", type=int, default=30)
    parser.add_argument("--stall-rounds", type=int, default=4)
    parser.add_argument("--keep-open-seconds", type=int, default=12)
    parser.add_argument(
        "--skip-fullscreen",
        action="store_true",
        help="不点击播放器全屏按钮,直接在普通详情页尝试评论区",
    )
    parser.add_argument(
        "--leave-open",
        action="store_true",
        help="采集结束后不关闭 Chrome,保持 Python 进程等待 Ctrl+C",
    )
    return parser


def main() -> int:
    """CLI entry point; returns the process exit code (0, 1, or 130)."""
    args = build_parser().parse_args()
    try:
        summary = run_probe(args)
    except KeyboardInterrupt:
        log("用户中断。")
        return 130
    except Exception as exc:
        log(f"运行失败: {exc}")
        return 1
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())