"""Render Baidu Baike pages and save clean Markdown evidence files.

This is the "web page -> Markdown -> extraction" bridge. It keeps the rendered
page structure that matters for schema work: source metadata, basic-info pairs,
headings, paragraphs, lists, and tables.
"""

from __future__ import annotations

import argparse
import json
import random
import re
import sys
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import quote

from bs4 import BeautifulSoup

# Repository root (one directory above this file's folder) and default output dir.
ROOT = Path(__file__).resolve().parents[1]
OUT_DIR = ROOT / "schema搭建" / "baidu_baike_md_data"

# Built-in batch used by --batch-guizhou.
SCENIC_SPOTS = [
    "黄果树瀑布",
    "荔波小七孔景区",
    "梵净山",
    "西江千户苗寨",
    "青岩古镇",
    "镇远古城",
    "肇兴侗寨",
    "万峰林",
    "马岭河峡谷",
    "织金洞",
    "百里杜鹃风景名胜区",
    "赤水丹霞",
    "龙宫风景区",
    "遵义会议会址",
    "甲秀楼",
    "黔灵山公园",
    "花溪公园",
    "天河潭",
    "南江大峡谷",
    "乌蒙大草原",
]

# Alternative entry names tried, in order, after the canonical name.
BAIKE_QUERY_ALIASES = {
    "黄果树瀑布": ["黄果树大瀑布", "黄果树风景名胜区", "安顺市黄果树大瀑布景区"],
    "荔波小七孔景区": ["荔波樟江风景名胜区", "小七孔"],
    "西江千户苗寨": ["西江千户苗寨景区", "雷山县西江千户苗寨景区"],
    "青岩古镇": ["贵阳市青岩古镇景区", "青岩古镇景区"],
    "镇远古城": ["黔东南苗族侗族自治州镇远古城旅游景区"],
    "肇兴侗寨": ["黎平县肇兴侗寨景区", "肇兴侗寨景区"],
    "万峰林": ["万峰林景区", "兴义万峰林"],
    "马岭河峡谷": ["马岭河峡谷风景名胜区", "马岭河峡谷景区"],
    "织金洞": ["织金洞风景名胜区", "毕节织金洞"],
    "百里杜鹃风景名胜区": ["百里杜鹃", "百里杜鹃风景区", "百里杜鹃景区", "贵州百里杜鹃风景名胜区"],
    "赤水丹霞": ["赤水丹霞旅游区", "赤水丹霞国家地质公园"],
    "遵义会议会址": ["遵义会议会址景区"],
    "甲秀楼": ["贵阳甲秀楼"],
    "花溪公园": ["贵阳市花溪公园"],
    "天河潭": ["天河潭旅游度假区", "天河潭风景区"],
    "乌蒙大草原": ["乌蒙大草原景区", "盘州乌蒙大草原"],
}

# Only basic-info keys in this set are kept.
SCHEMA_FIELD_HINTS = {
    "中文名",
    "外文名",
    "地理位置",
    "气候条件",
    "开放时间",
    "景点级别",
    "门票价格",
    "占地面积",
    "著名景点",
    "建议游玩时长",
    "适宜游玩季节",
    "所属国家",
    "所属城市",
    "保护级别",
    "主要景观",
    "最佳旅游时间",
    "海拔",
    "管理单位",
    "别名",
    "类型",
}

# A line containing any of these substrings is treated as page chrome and dropped.
BAD_LINE_TOKENS = [
    "百度首页",
    "登录",
    "注册",
    "打开APP",
    "秒懂百科",
    "百度百科合作平台",
    "使用百度前必读",
    "百科协议",
    "隐私政策",
    "©",
    "京ICP",
    "营业执照",
    "投诉建议",
    "词条统计",
    "分享你的世界",
    "相关星图",
    "查看更多",
    "上传视频",
]

# Substrings that mark a verification / blocked page instead of real content.
ANTI_CRAWL_TOKENS = [
    "百度安全验证",
    "验证码",
    "网络不给力",
    "系统检测到异常",
    "captcha",
    "anticrawl",
]

# Body text starting with one of these marks the end of the article proper.
_STOP_PREFIXES = ("参考资料", "词条标签", "免责声明")

# Markdown heading prefix per source tag; page headings are nested one level
# below the "## 页面正文 Markdown" section.
_HEADING_LEVELS = {"h1": "##", "h2": "###", "h3": "####", "h4": "#####", "div": "###"}

# ASCII and fullwidth colon, stripped from the end of basic-info keys.
_COLON_CHARS = ":\uff1a"


@dataclass
class MarkdownPage:
    """Manifest record describing one crawled (or cached / failed) entry."""

    name: str
    query_name: str
    requested_url: str
    final_url: str
    title: str
    markdown_file: str
    markdown_chars: int
    paragraph_count: int
    heading_count: int
    basic_info_count: int
    table_count: int
    error: str = ""


def _ensure_root_on_path() -> None:
    """Put the repository root on ``sys.path`` once so ``app.*`` imports resolve."""
    root = str(ROOT)
    if root not in sys.path:
        sys.path.insert(0, root)


def _import_web_agent_constants():
    """Return ``(user_agent, chrome_args, stealth_js)`` for the browser session.

    The values come from ``app.agents.web_agent`` when that module imports
    cleanly; otherwise built-in defaults are used so the script stays runnable
    on its own.
    """
    _ensure_root_on_path()
    try:
        from app.agents.web_agent import _CHROME_ARGS, _STEALTH_JS, _UA
    except Exception:  # deliberate best-effort: any import-time failure -> defaults
        _UA = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        )
        _CHROME_ARGS = [
            "--disable-blink-features=AutomationControlled",
            "--no-first-run",
            "--no-default-browser-check",
            "--disable-sync",
            "--disable-default-apps",
            "--no-sandbox",
            "--disable-dev-shm-usage",
        ]
        _STEALTH_JS = "Object.defineProperty(navigator,'webdriver',{get:()=>undefined});"
    return _UA, _CHROME_ARGS, _STEALTH_JS


def _now_iso() -> str:
    """Return the current local time as an ISO-8601 string with UTC offset."""
    return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")


def compact(text: str) -> str:
    """Collapse every whitespace run to one space and trim; ``None`` becomes ``""``."""
    return re.sub(r"\s+", " ", text or "").strip()


def slugify(name: str, idx: int | None = None) -> str:
    """Turn an entry name into a filesystem-safe stem.

    Runs of path-hostile characters (``\\ / : * ? " < > |``) and whitespace are
    replaced by a single underscore. When *idx* is given the stem is prefixed
    with a two-digit index. An empty result falls back to ``baike_page``.
    """
    # ``\s`` must be a single-backslash escape inside the raw string; the
    # doubled form matched a literal backslash plus the letter "s" instead.
    safe = re.sub(r'[\\/:*?"<>|\s]+', "_", name).strip("_") or "baike_page"
    if idx is None:
        return safe
    return f"{idx:02d}_{safe}"


def baike_url(name: str) -> str:
    """Return the Baidu Baike item URL for *name* (percent-encoded)."""
    return f"https://baike.baidu.com/item/{quote(name)}"


def query_terms(name: str) -> list[str]:
    """Return *name* followed by its configured aliases, de-duplicated in order."""
    return list(dict.fromkeys([name, *BAIKE_QUERY_ALIASES.get(name, [])]))


def is_bad_line(text: str) -> bool:
    """Return True when *text* looks like page chrome rather than article content.

    Rejected: empty or single-character text, anything containing a
    ``BAD_LINE_TOKENS`` substring, the literal table-of-contents label, lines
    shaped like a numbered contents entry ("<digits> <word>"), and very long
    lines that start like a JSON blob.
    """
    text = compact(text)
    if not text:
        return True
    if any(token in text for token in BAD_LINE_TOKENS):
        return True
    if len(text) <= 1:
        return True
    if text == "目录" or re.match(r"^\d+\s+[\u4e00-\u9fa5A-Za-z]", text):
        return True
    if len(text) > 500 and text.startswith(("{", "[")):
        return True
    return False


def clean_heading(text: str) -> str:
    """Normalise a heading: drop the trailing "播报 编辑" widget text and ``#``/spaces."""
    text = compact(text)
    text = re.sub(r"\s*播报\s*编辑\s*$", "", text)
    return text.strip("# ")


def _strip_page_noise(text: str) -> str:
    """Remove ``[n]`` / ``[编辑]`` markers and a trailing "播报 编辑" from *text*."""
    text = re.sub(r"\[(\d+|编辑)\]", "", text).strip()
    return re.sub(r"\s*播报\s*编辑\s*$", "", text).strip()


def looks_like_anti_crawl(html: str, final_url: str, title: str = "") -> bool:
    """Return True when the response is a verification page, not an article.

    Checks the final URL for ``anticrawl`` / ``captcha`` and then the title plus
    the first 3000 characters of page text for any ``ANTI_CRAWL_TOKENS`` entry.
    """
    if "anticrawl" in final_url or "captcha" in final_url:
        return True
    soup = BeautifulSoup(html, "html.parser")
    text = compact(" ".join([title, soup.get_text(" ", strip=True)[:3000]]))
    return any(token in text for token in ANTI_CRAWL_TOKENS)


def fetch_rendered_html(url: str, timeout_ms: int = 45000) -> tuple[str, str, str]:
    """Load *url* in headless Chromium and return ``(html, final_url, title)``.

    *timeout_ms* bounds the navigation. Exceptions raised by Playwright
    propagate to the caller; the browser is closed either way.
    """
    from playwright.sync_api import sync_playwright

    ua, chrome_args, stealth_js = _import_web_agent_constants()
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=chrome_args,
            ignore_default_args=["--enable-automation"],
        )
        try:
            ctx = browser.new_context(
                user_agent=ua,
                locale="zh-CN",
                viewport=random.choice(
                    [
                        {"width": 1440, "height": 900},
                        {"width": 1366, "height": 768},
                        {"width": 1600, "height": 1000},
                    ]
                ),
                extra_http_headers={
                    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                    "Referer": "https://www.baidu.com/",
                },
            )
            ctx.add_init_script(stealth_js)
            page = ctx.new_page()
            page.goto(url, timeout=timeout_ms, wait_until="domcontentloaded")
            page.wait_for_timeout(random.randint(1200, 2200))
            try:
                page.mouse.wheel(0, random.randint(700, 1500))
                page.wait_for_timeout(random.randint(600, 1100))
            except Exception:
                # The scroll is optional; a failure here must not lose the page.
                pass
            final_url = page.url
            title = page.title() or ""
            html = page.content()
        finally:
            # Close even when navigation or content retrieval raised.
            browser.close()
    return html, final_url, title


def extract_basic_info(soup: BeautifulSoup) -> list[tuple[str, str]]:
    """Collect ``(key, value)`` basic-info pairs whose key is a schema field.

    Two layouts are read: paired ``.basicInfo-item.name`` / ``.value`` nodes and
    ``<dt>`` / following ``<dd>`` siblings. Pairs are de-duplicated and returned
    in first-seen order.
    """
    pairs: list[tuple[str, str]] = []
    seen: set[tuple[str, str]] = set()

    names = soup.select(".basicInfo-item.name")
    values = soup.select(".basicInfo-item.value")
    for name, value in zip(names, values, strict=False):
        key = compact(name.get_text(" ", strip=True)).rstrip(_COLON_CHARS)
        val = compact(value.get_text(" ", strip=True))
        if key not in SCHEMA_FIELD_HINTS:
            continue
        if key and val and (key, val) not in seen:
            seen.add((key, val))
            pairs.append((key, val))

    for dt in soup.find_all("dt"):
        dd = dt.find_next_sibling("dd")
        key = compact(dt.get_text(" ", strip=True)).rstrip(_COLON_CHARS)
        val = compact(dd.get_text(" ", strip=True)) if dd else ""
        if key not in SCHEMA_FIELD_HINTS:
            continue
        if key and val and len(key) <= 24 and len(val) <= 260 and not is_bad_line(key + val):
            item = (key, val)
            if item not in seen:
                seen.add(item)
                pairs.append(item)
    return pairs


def table_to_markdown(table) -> list[str]:
    """Render an HTML ``<table>`` node as Markdown pipe-table lines.

    Empty cells are dropped, rows are capped at six cells and padded to a
    common width, and the first row becomes the header. Returns ``[]`` when the
    table has no non-empty cells.
    """
    rows: list[list[str]] = []
    for tr in table.find_all("tr"):
        cells = [
            compact(c.get_text(" ", strip=True)).replace("|", "/")
            for c in tr.find_all(["th", "td"])
        ]
        cells = [c for c in cells if c]
        if cells:
            rows.append(cells[:6])
    if not rows:
        return []
    width = max(len(r) for r in rows)
    rows = [r + [""] * (width - len(r)) for r in rows]
    lines = ["| " + " | ".join(rows[0]) + " |"]
    lines.append("| " + " | ".join(["---"] * width) + " |")
    lines.extend("| " + " | ".join(row) + " |" for row in rows[1:])
    return lines


def pick_content_root(soup: BeautifulSoup):
    """Return the first candidate container holding more than 400 characters of text.

    Falls back to ``<body>`` (or the whole soup) when no candidate qualifies.
    """
    for selector in (
        "div.J-lemma-content",
        "div[class*='lemma-content']",
        "div[id='J-lemma-main-wrapper'] div[class*='mainContent']",
        "div[class*='mainContent']",
        "main",
        "article",
    ):
        root = soup.select_one(selector)
        if root and len(compact(root.get_text(" ", strip=True))) > 400:
            return root
    return soup.body or soup


def baike_tag_kind(tag) -> str:
    """Classify Baidu Baike's old/new DOM nodes into markdown blocks.

    Returns ``"heading"``, ``"list"``, ``"paragraph"``, ``"stop"`` (the
    reference section, where body extraction ends) or ``""`` for nodes that
    should be ignored.
    """
    if tag.name in {"h1", "h2", "h3", "h4"}:
        return "heading"
    if tag.name == "li":
        return "list"
    if tag.name == "p":
        return "paragraph"
    if tag.name != "div":
        return ""
    classes = " ".join(tag.get("class") or [])
    tag_id = tag.get("id") or ""
    if "lemmaReference" in classes or tag_id == "J-lemma-reference":
        return "stop"
    if "paraTitle" in classes or re.search(r"\blevel-\d", classes):
        return "heading"
    if "para" in classes and ("content" in classes or "MARK_MODULE" in classes):
        return "paragraph"
    return ""


def _metadata_lines(
    source_name: str,
    query_name: str,
    page_title: str,
    requested_url: str,
    final_url: str,
    method: str | None = None,
) -> list[str]:
    """Build the document title and metadata section shared by all converters.

    When *method* is not None an extra conversion-method bullet is appended.
    """
    lines = [
        f"# {source_name}",
        "",
        "## 元数据",
        "",
        "- 数据源:百度百科",
        f"- 请求词条:{query_name}",
        f"- 页面标题:{page_title}",
        f"- 请求 URL:{requested_url}",
        f"- 最终 URL:{final_url}",
        f"- 抓取时间:{_now_iso()}",
    ]
    if method is not None:
        lines.append(f"- 转换方式:{method}")
    lines.append("")
    return lines


def _basic_info_lines(basic_info: list[tuple[str, str]]) -> list[str]:
    """Render basic-info pairs as a two-column Markdown table (``[]`` if none)."""
    if not basic_info:
        return []
    lines = ["## 基本信息", "", "| 字段 | 值 |", "| --- | --- |"]
    lines.extend(
        f"| {key.replace('|', '/')} | {val.replace('|', '/')} |" for key, val in basic_info
    )
    lines.append("")
    return lines


def html_to_markdown(
    html: str,
    source_name: str,
    query_name: str,
    requested_url: str,
    final_url: str,
) -> tuple[str, dict]:
    """Convert rendered Baike HTML into Markdown plus extraction statistics.

    Returns ``(markdown, stats)``. ``stats`` carries ``page_title``, the
    heading / paragraph / table / basic-info counts, ``markdown_chars`` and
    ``fallback_body_used``, which is True when no structured paragraph was
    found and the body was rebuilt from plain text lines instead.
    """
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(
        ["script", "style", "noscript", "iframe", "canvas", "svg", "button", "input", "form"]
    ):
        tag.decompose()
    for selector in (
        "nav",
        "footer",
        "header",
        "[class*='navbar']",
        "[class*='toolbar']",
        "[class*='share']",
    ):
        for tag in soup.select(selector):
            tag.decompose()

    h1 = soup.select_one("h1")
    page_title = clean_heading(h1.get_text(" ", strip=True)) if h1 else source_name
    basic_info = extract_basic_info(soup)
    root = pick_content_root(soup)

    lines = _metadata_lines(source_name, query_name, page_title, requested_url, final_url)
    lines.extend(_basic_info_lines(basic_info))
    lines.extend(["## 页面正文 Markdown", ""])

    # Nested containers repeat their children's text, so every emitted block is
    # remembered and skipped when seen again.
    seen_text: set[str] = set()
    heading_count = 0
    paragraph_count = 0
    table_count = 0
    fallback_body_used = False

    for tag in root.find_all(["h1", "h2", "h3", "h4", "p", "li", "table", "div"], recursive=True):
        if tag.name == "table":
            md_table = table_to_markdown(tag)
            if md_table:
                key = "\n".join(md_table)
                if key not in seen_text:
                    seen_text.add(key)
                    lines.extend(md_table)
                    lines.append("")
                    table_count += 1
            continue

        kind = baike_tag_kind(tag)
        if kind == "stop":
            break
        if not kind:
            continue

        text = compact(tag.get_text(" ", strip=True))
        if is_bad_line(text):
            continue
        text = _strip_page_noise(text)
        if not text or text in seen_text:
            continue
        if text.startswith(_STOP_PREFIXES):
            # Reference / tag / disclaimer sections end the article body.
            break
        seen_text.add(text)

        if kind == "heading":
            heading = clean_heading(text)
            if heading and not is_bad_line(heading):
                lines.extend([f"{_HEADING_LEVELS[tag.name]} {heading}", ""])
                heading_count += 1
        elif kind == "list":
            if 2 <= len(text) <= 240:
                lines.append(f"- {text}")
                paragraph_count += 1
        else:
            lines.extend([text, ""])
            paragraph_count += 1

    if paragraph_count == 0:
        # No structured paragraphs were recognised: rebuild the body from the
        # page's plain text, guessing headings from short unpunctuated lines.
        fallback_body_used = True
        fallback_text_root = soup.body or root
        for raw_line in fallback_text_root.get_text("\n", strip=True).splitlines():
            text = _strip_page_noise(compact(raw_line))
            if is_bad_line(text) or text in seen_text:
                continue
            if text.startswith(_STOP_PREFIXES):
                break
            if len(text) < 4:
                continue
            seen_text.add(text)
            # Both ASCII and fullwidth sentence punctuation mark running text.
            if len(text) <= 24 and not re.search(r"[。!?;,\uff01\uff1f\uff1b\uff0c]", text):
                lines.extend([f"### {clean_heading(text)}", ""])
                heading_count += 1
            else:
                lines.extend([text, ""])
                paragraph_count += 1

    markdown = "\n".join(lines).strip() + "\n"
    stats = {
        "page_title": page_title,
        "basic_info_count": len(basic_info),
        "heading_count": heading_count,
        "paragraph_count": paragraph_count,
        "table_count": table_count,
        "markdown_chars": len(markdown),
        "fallback_body_used": fallback_body_used,
    }
    return markdown, stats


def fetch_existing_baike_text(url: str) -> tuple[str | None, str]:
    """Delegate to ``app.agents.web_agent.fetch_baidu_baike_text`` for *url*."""
    _ensure_root_on_path()
    from app.agents.web_agent import fetch_baidu_baike_text

    return fetch_baidu_baike_text(url)


def fetch_existing_baike_text_with_retries(url: str, attempts: int = 3) -> tuple[str | None, str]:
    """Call :func:`fetch_existing_baike_text` up to *attempts* times.

    A result is accepted when it has at least 500 characters and does not
    contain the security-verification marker. Returns ``(text, final_url)`` on
    success, otherwise ``(None, last_known_final_url)``.
    """
    last_final = url
    for attempt in range(1, attempts + 1):
        raw_text, final_url = fetch_existing_baike_text(url)
        last_final = final_url or last_final
        if raw_text and len(raw_text) >= 500 and "百度安全验证" not in raw_text:
            return raw_text, last_final
        if attempt < attempts:
            # Back off only when another attempt will actually follow.
            time.sleep(1.2 * attempt + random.random())
    return None, last_final


def fetch_crawl4ai_markdown(url: str) -> tuple[str | None, str]:
    """Use Crawl4AI when it is installed; keep the script runnable without it.

    Returns ``(markdown, final_url)`` on success. On failure the first element
    is ``None`` and the second is a message starting with ``"crawl4ai "``.
    """
    try:
        import asyncio

        from crawl4ai import AsyncWebCrawler, BrowserConfig, CacheMode, CrawlerRunConfig
        from crawl4ai.content_filter_strategy import PruningContentFilter
        from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
    except Exception as exc:  # noqa: BLE001
        return None, f"crawl4ai unavailable: {str(exc)[:160]}"

    async def _run() -> tuple[str | None, str]:
        md_generator = DefaultMarkdownGenerator(
            content_filter=PruningContentFilter(threshold=0.35, threshold_type="fixed")
        )
        browser_conf = BrowserConfig(headless=True)
        run_conf = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS,
            markdown_generator=md_generator,
        )
        async with AsyncWebCrawler(config=browser_conf) as crawler:
            result = await crawler.arun(url=url, config=run_conf)
            # Prefer the filtered markdown, then the raw one, then its string form.
            md = (
                getattr(result.markdown, "fit_markdown", None)
                or getattr(result.markdown, "raw_markdown", None)
                or str(result.markdown or "")
            )
            return (md.strip() if md else None), getattr(result, "url", url) or url

    try:
        return asyncio.run(_run())
    except Exception as exc:  # noqa: BLE001
        return None, f"crawl4ai failed: {str(exc)[:200]}"


def baike_text_to_markdown(
    raw_text: str,
    source_name: str,
    query_name: str,
    requested_url: str,
    final_url: str,
) -> tuple[str, dict]:
    """Convert the plain-text fallback output into Markdown plus statistics.

    The text is flattened to one line, ``key:value`` basic-info pairs are pulled
    out with look-ahead regexes, and "<title> 播报 编辑" markers in the body are
    turned into ``###`` headings. Returns ``(markdown, stats)``.
    """
    text = compact(raw_text)
    title = source_name
    m_title = re.search(r"词条名:(.{1,80}?)(?=\s(?:中文名|外文名|地理位置|正文):|$)", text)
    if m_title:
        title = compact(m_title.group(1))

    basic_info: list[tuple[str, str]] = []
    # Longest keys first so a longer key wins in the alternation; the secondary
    # sort key makes the order independent of set iteration order.
    field_keys = sorted(SCHEMA_FIELD_HINTS, key=lambda k: (-len(k), k))
    key_alt = "|".join(re.escape(k) for k in ["词条名", *field_keys])
    for key in field_keys:
        m = re.search(rf"{re.escape(key)}:(.{{1,260}}?)(?=\s(?:{key_alt}|正文):|$)", text)
        if not m:
            continue
        value = compact(m.group(1))
        # Skip matches that captured a section heading or numbered entry.
        if "播报" in value or re.match(r"^\d+[\u4e00-\u9fa5]", value):
            continue
        if value and not is_bad_line(key + value):
            basic_info.append((key, value))

    body = text.split("正文:", 1)[1] if "正文:" in text else text
    body = re.sub(r"\[(\d+|编辑)\]", "", body)
    body = re.sub(
        r"\s*([\u4e00-\u9fa5A-Za-z0-9·、()\uff08\uff09]{2,28})\s+播报\s+编辑\s*",
        r"\n\n### \1\n\n",
        body,
    )

    lines = _metadata_lines(
        source_name,
        query_name,
        title,
        requested_url,
        final_url,
        method="web_agent.fetch_baidu_baike_text fallback",
    )
    lines.extend(_basic_info_lines(basic_info))
    lines.extend(["## 页面正文 Markdown", ""])

    heading_count = 0
    paragraph_count = 0
    for block in re.split(r"\n{2,}", body):
        block = compact(block)
        if not block or is_bad_line(block):
            continue
        if block.startswith("### "):
            heading = clean_heading(block.removeprefix("### "))
            if heading and not is_bad_line(heading):
                lines.extend([f"### {heading}", ""])
                heading_count += 1
            continue
        lines.extend([block, ""])
        paragraph_count += 1

    markdown = "\n".join(lines).strip() + "\n"
    stats = {
        "page_title": title,
        "basic_info_count": len(basic_info),
        "heading_count": heading_count,
        "paragraph_count": paragraph_count,
        "table_count": 0,
        "markdown_chars": len(markdown),
    }
    return markdown, stats


def _count_pipe_tables(markdown: str) -> int:
    """Count runs of consecutive lines that start with ``|`` (one run per table)."""
    count = 0
    in_table = False
    for line in markdown.splitlines():
        is_row = line.lstrip().startswith("|")
        if is_row and not in_table:
            count += 1
        in_table = is_row
    return count


def external_markdown_to_markdown(
    raw_markdown: str,
    source_name: str,
    query_name: str,
    requested_url: str,
    final_url: str,
    engine: str,
) -> tuple[str, dict]:
    """Wrap Markdown produced by an external *engine* in the standard envelope.

    Lines containing a ``BAD_LINE_TOKENS`` substring are removed first.
    Returns ``(markdown, stats)``; paragraphs are blank-line separated blocks
    of at least 20 characters.
    """
    raw_markdown = "\n".join(
        line.rstrip()
        for line in raw_markdown.strip().splitlines()
        if not any(token in line for token in BAD_LINE_TOKENS)
    ).strip()

    lines = _metadata_lines(
        source_name, query_name, source_name, requested_url, final_url, method=engine
    )
    lines.extend(["## 页面正文 Markdown", "", raw_markdown, ""])
    markdown = "\n".join(lines).strip() + "\n"

    heading_count = len(re.findall(r"(?m)^#{1,6}\s+", raw_markdown))
    paragraph_count = sum(
        1 for chunk in re.split(r"\n{2,}", raw_markdown) if len(compact(chunk)) >= 20
    )
    stats = {
        "page_title": source_name,
        "basic_info_count": 0,
        "heading_count": heading_count,
        "paragraph_count": paragraph_count,
        "table_count": _count_pipe_tables(raw_markdown),
        "markdown_chars": len(markdown),
    }
    return markdown, stats


def stats_good_enough(stats: dict) -> bool:
    """Decide whether a conversion is rich enough to keep.

    Plain-text fallback bodies are always rejected. Otherwise the result needs
    either >= 800 chars with >= 2 paragraphs, or >= 600 chars with >= 4
    paragraphs and >= 3 headings.
    """
    if stats.get("fallback_body_used"):
        return False
    chars = stats.get("markdown_chars", 0)
    paragraphs = stats.get("paragraph_count", 0)
    headings = stats.get("heading_count", 0)
    if chars >= 800 and paragraphs >= 2:
        return True
    return chars >= 600 and paragraphs >= 4 and headings >= 3


def _save_page(
    md_path: Path,
    markdown: str,
    stats: dict,
    *,
    name: str,
    query_name: str,
    requested_url: str,
    final_url: str,
    browser_title: str,
) -> MarkdownPage:
    """Write *markdown* to *md_path* and return the matching manifest record."""
    md_path.write_text(markdown, encoding="utf-8")
    return MarkdownPage(
        name=name,
        query_name=query_name,
        requested_url=requested_url,
        final_url=final_url,
        title=stats["page_title"] or browser_title or name,
        markdown_file=md_path.name,
        markdown_chars=stats["markdown_chars"],
        paragraph_count=stats["paragraph_count"],
        heading_count=stats["heading_count"],
        basic_info_count=stats["basic_info_count"],
        table_count=stats["table_count"],
    )


def crawl_one(
    name: str,
    out_dir: Path,
    idx: int | None = None,
    force: bool = False,
    attempts: int = 3,
) -> MarkdownPage:
    """Fetch one entry, write its Markdown file and return a manifest record.

    An existing file larger than 500 bytes is reused unless *force* is set.
    Otherwise each query term is tried, up to *attempts* rounds, through three
    converters in order: rendered HTML, the ``web_agent`` text fetcher, and
    Crawl4AI. The first result accepted by :func:`stats_good_enough` wins. If
    everything fails, a short failure note is written and the record carries
    the last error.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    md_name = f"{slugify(name, idx)}.md"
    md_path = out_dir / md_name

    if not force and md_path.exists() and md_path.stat().st_size > 500:
        return MarkdownPage(
            name=name,
            query_name=name,
            requested_url=baike_url(name),
            final_url="cached",
            title=name,
            markdown_file=md_name,
            markdown_chars=md_path.stat().st_size,  # file size in bytes
            paragraph_count=0,
            heading_count=0,
            basic_info_count=0,
            table_count=0,
        )

    last_error = ""
    for attempt in range(1, attempts + 1):
        for query_name in query_terms(name):
            requested = baike_url(query_name)
            try:
                # 1) Rendered page through the headless browser.
                html, final_url, browser_title = fetch_rendered_html(requested)
                if looks_like_anti_crawl(html, final_url, browser_title):
                    last_error = f"anticrawl: {final_url}"
                else:
                    markdown, stats = html_to_markdown(
                        html, name, query_name, requested, final_url
                    )
                    if stats_good_enough(stats):
                        return _save_page(
                            md_path,
                            markdown,
                            stats,
                            name=name,
                            query_name=query_name,
                            requested_url=requested,
                            final_url=final_url,
                            browser_title=browser_title,
                        )
                    last_error = f"too short: {stats['markdown_chars']} chars from {final_url}"

                # 2) Plain-text fetcher from the web agent.
                raw_text, text_final_url = fetch_existing_baike_text_with_retries(
                    requested, attempts=2
                )
                if raw_text and len(raw_text) >= 500:
                    text_url = text_final_url or final_url
                    markdown, stats = baike_text_to_markdown(
                        raw_text, name, query_name, requested, text_url
                    )
                    if stats_good_enough(stats):
                        return _save_page(
                            md_path,
                            markdown,
                            stats,
                            name=name,
                            query_name=query_name,
                            requested_url=requested,
                            final_url=text_url,
                            browser_title=browser_title,
                        )
                    last_error = (
                        f"fallback too short: {stats['markdown_chars']} chars from {text_final_url}"
                    )

                # 3) Crawl4AI; on failure its second return value is a message.
                c4_md, c4_final_url = fetch_crawl4ai_markdown(requested)
                if c4_md and len(c4_md) >= 800:
                    c4_url = final_url if c4_final_url.startswith("crawl4ai ") else c4_final_url
                    markdown, stats = external_markdown_to_markdown(
                        c4_md, name, query_name, requested, c4_url, "Crawl4AI"
                    )
                    if stats_good_enough(stats):
                        return _save_page(
                            md_path,
                            markdown,
                            stats,
                            name=name,
                            query_name=query_name,
                            requested_url=requested,
                            final_url=c4_url,
                            browser_title=browser_title,
                        )
                elif c4_final_url and not c4_final_url.startswith("crawl4ai unavailable"):
                    last_error = c4_final_url
            except Exception as exc:  # noqa: BLE001
                last_error = str(exc)[:300]
            # Back off before the next query; the delay grows with the round.
            time.sleep(1.2 * attempt + random.uniform(0.4, 1.6))

    md_path.write_text(
        f"# {name}\n\n抓取失败:{last_error or 'unknown error'}\n",
        encoding="utf-8",
    )
    return MarkdownPage(
        name=name,
        query_name=name,
        requested_url=baike_url(name),
        final_url="",
        title=name,
        markdown_file=md_name,
        markdown_chars=md_path.stat().st_size,  # file size in bytes
        paragraph_count=0,
        heading_count=0,
        basic_info_count=0,
        table_count=0,
        error=last_error or "unknown error",
    )


def write_manifest(rows: list[MarkdownPage], out_dir: Path) -> None:
    """Write ``manifest.json`` and a human-readable ``index.md`` into *out_dir*.

    Each JSON item is the dataclass as a dict plus an ``ok`` flag that is True
    when the row has no error.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    data = []
    for row in rows:
        item = asdict(row)
        item["ok"] = not bool(row.error)
        data.append(item)
    (out_dir / "manifest.json").write_text(
        json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    lines = [
        "# 百度百科 Markdown 抓取清单",
        "",
        f"- 生成时间:{_now_iso()}",
        f"- 文件数:{len(rows)}",
        "",
        "| # | 名称 | Markdown | 字符 | 章节 | 段落/列表 | 基本信息 | 状态 |",
        "| ---: | --- | --- | ---: | ---: | ---: | ---: | --- |",
    ]
    for i, row in enumerate(rows, 1):
        status = "失败" if row.error else "OK"
        lines.append(
            f"| {i} | {row.name} | [{row.markdown_file}](./{row.markdown_file}) | "
            f"{row.markdown_chars} | {row.heading_count} | {row.paragraph_count} | "
            f"{row.basic_info_count} | {status} |"
        )
    (out_dir / "index.md").write_text("\n".join(lines) + "\n", encoding="utf-8")


def main() -> int:
    """Parse CLI arguments, crawl the requested entries and write the manifest.

    Returns the process exit status (always 0; per-entry failures are recorded
    in the manifest rather than aborting the run).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", help="Baidu Baike entry name")
    parser.add_argument(
        "--batch-guizhou",
        action="store_true",
        help="Crawl built-in 20 Guizhou scenic spots",
    )
    parser.add_argument("--out-dir", default=str(OUT_DIR))
    parser.add_argument("--sleep", type=float, default=1.2)
    parser.add_argument("--force", action="store_true")
    parser.add_argument("--attempts", type=int, default=3)
    args = parser.parse_args()

    if not args.name and not args.batch_guizhou:
        parser.error("use --name or --batch-guizhou")

    out_dir = Path(args.out_dir)
    names = SCENIC_SPOTS if args.batch_guizhou else [args.name]
    rows: list[MarkdownPage] = []
    for idx, name in enumerate(names, 1):
        print(f"[crawl] {idx:02d}/{len(names)} {name}", flush=True)
        row = crawl_one(
            name,
            out_dir,
            idx if args.batch_guizhou else None,  # index prefix only in batch mode
            force=args.force,
            attempts=max(1, args.attempts),
        )
        rows.append(row)
        mark = "FAIL" if row.error else "OK"
        print(
            f" [{mark}] chars={row.markdown_chars} headings={row.heading_count} "
            f"paras={row.paragraph_count} basic={row.basic_info_count} file={row.markdown_file}",
            flush=True,
        )
        if idx < len(names):
            # Pause between entries, with jitter, but not after the last one.
            time.sleep(args.sleep + random.random())

    write_manifest(rows, out_dir)
    print(f"[done] {out_dir}", flush=True)
    return 0
# Script entry point: run the crawler and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    exit_code = main()
    raise SystemExit(exit_code)