"""Capture Baidu Baike rendered pages as auditable full-page Markdown.

This stricter crawler is for schema-building evidence. It does not accept a
short text fallback as success: the saved Markdown always includes a
completeness report comparing the page catalog with extracted body headings.
"""

from __future__ import annotations

import argparse
import json
import random
import re
import sys
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import quote, urljoin

from bs4 import BeautifulSoup

# Directory one level above the directory that contains this file.
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_OUT_DIR = ROOT / "schema搭建" / "baidu_fullpage_md"
DEFAULT_PROFILE_DIR = ROOT / "data" / "browser_profiles" / "baike_fullpage"
DEFAULT_GUIZHOU_BATCH_DIR = ROOT / "schema搭建" / "city_poi_schema_v0_1" / "baidu_fullpage_guizhou_scenic"

# Substrings that mark a text line as site chrome / boilerplate.
BAD_LINE_TOKENS = [
    "百度首页",
    "登录",
    "注册",
    "打开APP",
    "百度百科合作平台",
    "使用百度前必读",
    "百科协议",
    "隐私政策",
    "京ICP",
    "营业执照",
    "投诉建议",
    "词条统计",
    "分享你的世界",
    "查看更多",
    "上传视频",
    "免责声明",
]

# Exact link texts that are never treated as catalog entries.
CATALOG_NOISE = {
    "首页",
    "帮助",
    "秒懂百科",
    "特色百科",
    "知识专题",
    "加入百科",
    "百科团队",
    "权威合作",
    "播报",
    "编辑",
    "讨论",
    "收藏",
    "赞",
}

# Keys that are rejected as basic-info field names (catalog noise plus a
# further set of navigation labels).
BASIC_INFO_NOISE_KEYS = CATALOG_NOISE | {
    "网页",
    "新闻",
    "贴吧",
    "知道",
    "网盘",
    "图片",
    "视频",
    "地图",
    "文库",
    "资讯",
    "采购",
    "国际版",
}

# Substrings whose presence marks a page as an anti-crawl / verification page.
ANTI_CRAWL_TOKENS = [
    "百度安全验证",
    "验证码",
    "系统检测到异常",
    "captcha",
    "anticrawl",
]


@dataclass
class PageQuality:
    """Completeness report for one captured page."""

    status: str  # "OK" or "INCOMPLETE"
    reason: str  # "ok", or the failed checks joined with "; "
    page_title: str
    final_url: str
    markdown_chars: int
    catalog_count: int
    body_heading_count: int
    matched_catalog_count: int
    catalog_coverage: float  # matched_catalog_count / catalog_count, 0.0 without a catalog
    missing_catalog_headings: list[str]
    anti_crawl: bool
    media_count: int


@dataclass
class MediaItem:
    """One image or video reference found in the page body."""

    kind: str  # "image" or "video"
    section: str  # most recent heading seen before the media element
    caption: str
    alt: str
    src: str
    href: str
    asset_id: str
    width: str
    height: str


def compact(text: str) -> str:
    """Collapse every whitespace run in *text* to one space and trim the ends.

    A falsy *text* (``None`` or ``""``) yields ``""``.
    """
    return re.sub(r"\s+", " ", text or "").strip()


def now_iso() -> str:
    """Return the current local time as a timezone-aware ISO-8601 string (seconds precision)."""
    return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")


def baike_url(name: str) -> str:
    """Build the Baidu Baike item URL for the entry called *name*.

    The name is percent-encoded as a single path segment: ``safe=""`` makes
    ``quote`` encode "/" as well, so a name such as "TCP/IP协议" is not split
    into several path segments.
    """
    return f"https://baike.baidu.com/item/{quote(name, safe='')}"
# Built-in sample list for --batch-guizhou. An entry may pin an exact "url";
# otherwise the URL is derived from "name".
GUIZHOU_SCENIC_SPOTS = [
    {"name": "黄果树瀑布"},
    {"name": "小七孔风景区"},
    {"name": "梵净山"},
    {"name": "西江千户苗寨"},
    {"name": "青岩古镇"},
    {"name": "镇远古城"},
    {"name": "肇兴侗寨"},
    {"name": "万峰林"},
    {"name": "马岭河峡谷"},
    {"name": "织金洞"},
    {"name": "百里杜鹃风景名胜区"},
    {"name": "赤水丹霞"},
    {"name": "龙宫风景区"},
    {"name": "遵义会议会址"},
    {"name": "甲秀楼"},
    {"name": "黔灵山公园"},
    {
        "name": "花溪公园",
        "url": "https://baike.baidu.com/item/%E8%8A%B1%E6%BA%AA%E5%85%AC%E5%9B%AD/112398?fromModule=lemma_search-box",
    },
    {"name": "天河潭"},
    {"name": "南江大峡谷"},
    {"name": "乌蒙大草原"},
]

# Strip catalog ordinals such as "1历史沿革", but keep real names like
# "68级跌水瀑布".
_CATALOG_ORDINAL_RE = re.compile(r"^\d{1,2}\s*(?!级)(?=[\u4e00-\u9fa5A-Za-z])")

# Selectors tried first when looking for the lemma body container.
_LEMMA_BODY_SELECTORS = ("div.J-lemma-content", "div[class*='lemma-content']")


def slugify(text: str) -> str:
    """Turn *text* into a filesystem-friendly name.

    Runs of path separators, characters in ``\\ / : * ? " < > |`` and
    whitespace are replaced by a single underscore; leading/trailing
    underscores are trimmed. Returns ``"baike_page"`` when nothing is left.
    """
    # The class must contain the regex escape \s (whitespace). The previous
    # pattern spelled it "\\s" inside a raw string, which matched a literal
    # backslash and the letter "s" instead of whitespace.
    return re.sub(r'[\\/:*?"<>|\s]+', "_", text).strip("_") or "baike_page"


def clean_inline(text: str) -> str:
    """Normalise a text fragment taken from the page.

    Collapses whitespace, removes ``[12]`` / ``[编辑]`` markers and a trailing
    "播报 编辑" widget label, then collapses whitespace again.
    """
    text = compact(text)
    text = re.sub(r"\[(?:\d+|编辑)\]", "", text)
    text = re.sub(r"\s*播报\s*编辑\s*$", "", text)
    text = text.replace("\xa0", " ")
    return compact(text)


def clean_heading(text: str) -> str:
    """Clean *text* like :func:`clean_inline` and trim surrounding ``#`` and spaces."""
    return clean_inline(text).strip("# ")


def clean_catalog_heading(text: str) -> str:
    """Clean a catalog link text and drop its leading 1-2 digit ordinal, if any."""
    return _CATALOG_ORDINAL_RE.sub("", clean_heading(text))


def norm_heading(text: str) -> str:
    """Return a comparison key for a heading (also used as a de-duplication key).

    The key is lower-cased, loses a leading ordinal ("一、", "1") and has all
    whitespace and the listed punctuation removed.
    """
    text = clean_heading(text).lower()
    # A Chinese-numeral ordinal is only stripped when a delimiter follows it.
    # With an optional delimiter, real names that merely start with a numeral
    # character ("万峰林", "三岔河") lost their first characters and headings made
    # only of numerals normalised to the empty string.
    text = re.sub(r"^[一二三四五六七八九十百千万]+[、..\s]+", "", text)
    text = _CATALOG_ORDINAL_RE.sub("", text)
    return re.sub(r"[\s#::、,,。.;;()()\[\]【】·\-_/]+", "", text)


def is_bad_line(text: str) -> bool:
    """Report whether *text* should be discarded as noise.

    A line is noise when, after cleaning, it is at most one character long,
    contains any ``BAD_LINE_TOKENS`` entry, or is longer than 700 characters
    and starts with ``{`` or ``[`` (looks like an embedded data blob).
    """
    text = clean_inline(text)
    if len(text) <= 1:
        return True
    if any(token in text for token in BAD_LINE_TOKENS):
        return True
    return len(text) > 700 and text.startswith(("{", "["))


def dedupe_keep_order(items: list[str]) -> list[str]:
    """Drop later items whose normalised heading key was already seen, keeping order."""
    out: list[str] = []
    seen = set()
    for item in items:
        # Fall back to the raw item when normalisation leaves nothing.
        key = norm_heading(item) or item
        if key and key not in seen:
            seen.add(key)
            out.append(item)
    return out


def anti_crawl_detected(html: str, final_url: str, title: str) -> bool:
    """Report whether the page looks like an anti-crawl / verification page.

    Checks the final URL for "anticrawl"/"captcha" and the title plus the
    first 3000 characters of page text for any ``ANTI_CRAWL_TOKENS`` entry.
    Matching is case-insensitive so that e.g. "Captcha" is recognised too.
    """
    lowered_url = final_url.lower()
    if "anticrawl" in lowered_url or "captcha" in lowered_url:
        return True
    soup = BeautifulSoup(html, "html.parser")
    text = compact(" ".join([title, soup.get_text(" ", strip=True)[:3000]])).lower()
    return any(token.lower() in text for token in ANTI_CRAWL_TOKENS)


def chrome_runtime_options() -> tuple[str, list[str], str]:
    """Return ``(user_agent, chrome_args, stealth_js)`` for the browser launch.

    The values are imported from ``app.agents.web_agent`` when that module can
    be imported from ``ROOT``; otherwise built-in defaults are used.
    """
    root_entry = str(ROOT)
    # Insert only once: this function runs for every page in batch mode and
    # used to prepend a duplicate sys.path entry on each call.
    if root_entry not in sys.path:
        sys.path.insert(0, root_entry)
    try:
        from app.agents.web_agent import _CHROME_ARGS, _STEALTH_JS, _UA
    except Exception:
        # Deliberate best-effort: any failure while importing the project
        # module falls back to the defaults below.
        _UA = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        )
        _CHROME_ARGS = [
            "--disable-blink-features=AutomationControlled",
            "--no-first-run",
            "--no-default-browser-check",
            "--disable-sync",
            "--disable-default-apps",
            "--no-sandbox",
            "--disable-dev-shm-usage",
        ]
        _STEALTH_JS = "Object.defineProperty(navigator,'webdriver',{get:()=>undefined});"
    return _UA, list(_CHROME_ARGS), _STEALTH_JS


def scroll_to_render(page, max_rounds: int = 16) -> None:
    """Scroll *page* down in random steps, then jump back to the top.

    Scrolling stops after *max_rounds* rounds or once the document height has
    been unchanged for three consecutive checks.
    """
    last_height = 0
    stable_rounds = 0
    for _ in range(max_rounds):
        height = page.evaluate("() => document.documentElement.scrollHeight")
        if height == last_height:
            stable_rounds += 1
        else:
            stable_rounds = 0
        if stable_rounds >= 3:
            break
        last_height = height
        page.mouse.wheel(0, random.randint(1100, 1900))
        page.wait_for_timeout(random.randint(220, 520))
    page.evaluate("() => window.scrollTo(0, 0)")
    page.wait_for_timeout(300)


def fetch_rendered_page(
    url: str,
    *,
    profile_dir: Path,
    headful: bool,
    manual_seconds: int,
    timeout_ms: int,
) -> tuple[str, str, str]:
    """Load *url* in a persistent Chromium profile and return the rendered page.

    Args:
        url: Page to open.
        profile_dir: Browser profile directory (created when missing).
        headful: Show the browser window instead of running headless.
        manual_seconds: While the page looks like a verification page, keep
            polling for up to this many seconds so a human can solve it.
        timeout_ms: Navigation timeout passed to ``page.goto``.

    Returns:
        ``(html, final_url, title)`` of the page after scrolling.
    """
    from playwright.sync_api import sync_playwright

    ua, chrome_args, stealth_js = chrome_runtime_options()
    profile_dir.mkdir(parents=True, exist_ok=True)
    with sync_playwright() as p:
        ctx = p.chromium.launch_persistent_context(
            str(profile_dir),
            headless=not headful,
            args=chrome_args,
            ignore_default_args=["--enable-automation"],
            user_agent=ua,
            locale="zh-CN",
            viewport=random.choice(
                [
                    {"width": 1440, "height": 1100},
                    {"width": 1366, "height": 900},
                    {"width": 1600, "height": 1100},
                ]
            ),
            extra_http_headers={
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Referer": "https://www.baidu.com/",
            },
        )
        # Close the context even when navigation or scrolling raises, so the
        # persistent profile is always released cleanly.
        try:
            ctx.add_init_script(stealth_js)
            page = ctx.pages[0] if ctx.pages else ctx.new_page()
            page.goto(url, timeout=timeout_ms, wait_until="domcontentloaded")
            page.wait_for_timeout(random.randint(1000, 1800))
            if manual_seconds > 0:
                # Monotonic clock: the deadline must not move with wall-clock changes.
                deadline = time.monotonic() + manual_seconds
                while time.monotonic() < deadline:
                    html = page.content()
                    if not anti_crawl_detected(html, page.url, page.title() or ""):
                        break
                    remaining = int(deadline - time.monotonic())
                    print(f"[manual] Baidu verification page detected, waiting {remaining}s...", flush=True)
                    page.wait_for_timeout(2500)
            scroll_to_render(page)
            final_url = page.url
            title = page.title() or ""
            html = page.content()
        finally:
            ctx.close()
    return html, final_url, title


def table_to_markdown(table) -> list[str]:
    """Render an HTML ``<table>`` element as Markdown table lines.

    Each row keeps at most its first 8 cells; shorter rows are padded to the
    widest row. The first row becomes the header. Returns ``[]`` when the table
    has no row with any text.
    """
    rows: list[list[str]] = []
    for tr in table.find_all("tr"):
        cells = [clean_inline(c.get_text(" ", strip=True)).replace("|", "/") for c in tr.find_all(["th", "td"])]
        # Keep empty cells so values stay in their own column; only rows with
        # no text at all are skipped. Dropping empty cells shifted every later
        # cell of the row to the left.
        if any(cells):
            rows.append(cells[:8])
    if not rows:
        return []
    width = max(len(row) for row in rows)
    rows = [row + [""] * (width - len(row)) for row in rows]
    header, *body = rows
    lines = [
        "| " + " | ".join(header) + " |",
        "| " + " | ".join(["---"] * width) + " |",
    ]
    lines.extend("| " + " | ".join(row) + " |" for row in body)
    return lines


def find_content_root(soup: BeautifulSoup):
    """Return the first candidate container holding more than 600 characters of text.

    Falls back to ``soup.body`` (or the soup itself) when no candidate matches.
    """
    for selector in (
        "div[class*='mainContent']",
        "div[id='J-lemma-main-wrapper']",
        "div[class*='lemmaWrapper']",
        "div.J-lemma-content",
        "div[class*='lemma-content']",
        "main",
        "article",
    ):
        root = soup.select_one(selector)
        if root and len(clean_inline(root.get_text(" ", strip=True))) > 600:
            return root
    return soup.body or soup


def _lemma_body_root(soup: BeautifulSoup):
    """Return the lemma body container, falling back to :func:`find_content_root`."""
    for selector in _LEMMA_BODY_SELECTORS:
        found = soup.select_one(selector)
        if found:
            return found
    return find_content_root(soup)


def _catalog_label(link) -> str:
    """Return the cleaned catalog text of *link*, or ``""`` when it is noise or too long."""
    text = clean_catalog_heading(link.get_text(" ", strip=True))
    if not text or text in CATALOG_NOISE or is_bad_line(text) or len(text) > 40:
        return ""
    return text


def extract_catalog(soup: BeautifulSoup) -> list[str]:
    """Collect the page's own table-of-contents entries, de-duplicated in order.

    Links inside catalog-like containers are used first; when none are found,
    in-page anchor links (``href="#..."``) of the content root are used.
    """
    containers = soup.select(
        "[class*='catalogWrapper'], [class*='CatalogWrapper'], "
        "[class*='catalog_'], [class*='Catalog_'], [class*='catalogList'], [class*='CatalogList']"
    )
    links = [link for container in containers for link in container.find_all("a")]
    candidates = [label for label in map(_catalog_label, links) if label]
    if not candidates:
        root = find_content_root(soup)
        candidates = [label for label in map(_catalog_label, root.select("a[href^='#']")) if label]
    return dedupe_keep_order(candidates)


def is_basic_info_pair(key: str, val: str) -> bool:
    """Report whether ``(key, val)`` is acceptable as a basic-info table row.

    Rejects empty sides, noise keys, boilerplate text, keys longer than 24
    characters and values longer than 300 characters.
    """
    key = clean_inline(key).rstrip("::")
    val = clean_inline(val)
    if not key or not val:
        return False
    key_norm = re.sub(r"\s+", "", key)
    if key_norm in BASIC_INFO_NOISE_KEYS or key in BASIC_INFO_NOISE_KEYS:
        return False
    if is_bad_line(key + val):
        return False
    return len(key) <= 24 and len(val) <= 300


def extract_basic_info(soup: BeautifulSoup) -> list[tuple[str, str]]:
    """Extract the "basic info" key/value pairs of the page.

    Sources, in order: ``.basicInfo-item`` name/value elements, ``<dt>/<dd>``
    pairs inside a basic-info container and, only when both yield nothing,
    consecutive text tokens of basic-info containers. For a repeated key the
    longer value wins.
    """
    pairs: list[tuple[str, str]] = []

    def add_pair(key: str, val: str) -> None:
        """Append a valid pair, or keep the longer value for an existing key."""
        key = clean_inline(key).rstrip("::")
        val = clean_inline(val)
        if not is_basic_info_pair(key, val):
            return
        key_norm = re.sub(r"\s+", "", key)
        for idx, (old_key, old_val) in enumerate(pairs):
            if re.sub(r"\s+", "", old_key) == key_norm:
                if len(val) > len(old_val):
                    pairs[idx] = (key, val)
                return
        pairs.append((key, val))

    names = soup.select(".basicInfo-item.name")
    values = soup.select(".basicInfo-item.value")
    for name, value in zip(names, values, strict=False):
        add_pair(name.get_text(" ", strip=True), value.get_text(" ", strip=True))

    basic_info_class = re.compile(r"basicInfo|BasicInfo|J-basic-info")
    for dt in soup.find_all("dt"):
        if not dt.find_parent(class_=basic_info_class):
            continue
        dd = dt.find_next_sibling("dd")
        add_pair(dt.get_text(" ", strip=True), dd.get_text(" ", strip=True) if dd else "")

    if not pairs:
        containers = soup.select("[class*='basicInfo'], [class*='BasicInfo'], .J-basic-info")
        for container in containers:
            tokens = [
                cleaned
                for cleaned in map(clean_inline, container.get_text("\n", strip=True).splitlines())
                if cleaned
            ]
            i = 0
            while i < len(tokens) - 1:
                before_count = len(pairs)
                add_pair(tokens[i].rstrip("::"), tokens[i + 1])
                # Consume both tokens only when a new pair was appended;
                # otherwise slide by one to re-align keys and values.
                i += 2 if len(pairs) > before_count else 1
    return pairs


def extract_summary(soup: BeautifulSoup) -> list[str]:
    """Return the de-duplicated summary paragraphs (at least 20 characters each)."""
    root_candidates = soup.select(".J-summary, [class*='lemmaSummary'], [class*='summary_']")
    lines: list[str] = []
    seen = set()
    for root in root_candidates:
        for tag in root.find_all(["p", "div"], recursive=True):
            text = clean_inline(tag.get_text(" ", strip=True))
            if len(text) < 20 or is_bad_line(text):
                continue
            key = norm_heading(text)
            if key not in seen:
                seen.add(key)
                lines.append(text)
    return lines


def tag_classes(tag) -> str:
    """Return the tag's CSS classes joined by spaces (``""`` when it has none)."""
    return " ".join(tag.get("class") or [])


def block_kind(tag) -> str:
    """Classify a tag for body extraction.

    Returns one of ``"stop"`` (reference section reached), ``"heading"``,
    ``"table"``, ``"list"``, ``"paragraph"``, ``"caption"`` or ``""`` (ignore).
    """
    classes = tag_classes(tag)
    tag_id = tag.get("id") or ""
    if "lemmaReference" in classes or "reference" in classes.lower() or tag_id == "J-lemma-reference":
        return "stop"
    if tag.name in {"h1", "h2", "h3", "h4"}:
        return "heading"
    by_name = {"table": "table", "li": "list", "p": "paragraph"}
    if tag.name in by_name:
        return by_name[tag.name]
    if tag.name != "div":
        return ""
    # From here on only <div> elements are classified, by their class names.
    if "paraTitle" in classes or re.search(r"\blevel-\d", classes):
        return "heading"
    if "caption" in classes.lower():
        return "caption"
    if "para" in classes and ("content" in classes or "summary" in classes or "MARK_MODULE" in classes):
        return "paragraph"
    return ""


def heading_level(tag) -> str:
    """Return the Markdown heading prefix for a heading tag (``##`` to ``#####``)."""
    classes = tag_classes(tag)
    if tag.name == "h1":
        return "##"
    if tag.name == "h2" or "level-1" in classes:
        return "###"
    if tag.name == "h3" or "level-2" in classes:
        return "####"
    if tag.name == "h4" or "level-3" in classes:
        return "#####"
    return "###"


def media_src(tag, final_url: str) -> str:
    """Return the absolute media URL of *tag*, or ``""`` when none is usable.

    The first non-empty, non-``data:`` value among the usual source attributes
    wins; otherwise the ``src`` of a nested ``<source>`` element is used.
    """
    attrs = [
        "src",
        "data-src",
        "data-original",
        "data-lazy-src",
        "data-url",
        "poster",
    ]
    for attr in attrs:
        value = clean_inline(tag.get(attr) or "")
        if value and not value.startswith("data:"):
            return urljoin(final_url, value)
    source = tag.find("source") if hasattr(tag, "find") else None
    if source:
        value = clean_inline(source.get("src") or "")
        if value and not value.startswith("data:"):
            return urljoin(final_url, value)
    return ""


def media_caption(tag) -> str:
    """Return a caption for a media tag, or ``""`` when none is found.

    Looks for a caption-like element in the tag and its first five ancestors,
    then falls back to the tag's ``alt`` / ``title`` / ``aria-label``. Only
    texts of 2 to 120 characters that are not boilerplate are accepted.
    """

    def acceptable(text: str) -> bool:
        return 1 < len(text) <= 120 and not is_bad_line(text)

    for parent in [tag, *list(tag.parents)[:5]]:
        if not parent:
            continue
        for selector in (
            "[class*='swiperDesc']",
            "[class*='imageCaption']",
            "[class*='picDesc']",
            "[class*='caption']",
            "[class*='desc']",
            "figcaption",
        ):
            found = parent.select_one(selector) if hasattr(parent, "select_one") else None
            if found:
                text = clean_inline(found.get_text(" ", strip=True))
                if acceptable(text):
                    return text
    for attr in ("alt", "title", "aria-label"):
        text = clean_inline(tag.get(attr) or "")
        if acceptable(text):
            return text
    return ""


def text_without_media(tag) -> str:
    """Return the text of *tag* with media elements and their captions removed.

    Works on a re-parsed copy, so *tag* itself is not modified.
    """
    clone = BeautifulSoup(str(tag), "html.parser")
    root = clone.find()
    media_containers = []
    for media in clone.find_all(["img", "video", "source"]):
        container = media
        for parent in media.parents:
            # Identity checks: bs4 implements == as a deep comparison of name,
            # attributes and contents, which is slow and is not what is meant.
            if parent is clone or parent is root:
                break
            if parent.find(["img", "video"]):
                container = parent
        # Identity again: two distinct wrappers with identical markup must both
        # be removed, which an equality-based "not in" test would prevent.
        if not any(container is known for known in media_containers):
            media_containers.append(container)
    for container in media_containers:
        container.decompose()
    for selector in (
        "img",
        "video",
        "source",
        "[class*='swiperUl']",
        "[class*='swiperLi']",
        "[class*='swiperDesc']",
        "[class*='SwiperDesc']",
        "[class*='lemmaPicture']",
        "[class*='LemmaPicture']",
        "[class*='imageCaption']",
        "[class*='ImageCaption']",
        "[class*='picDesc']",
        "[class*='PicDesc']",
        "[class*='caption']",
        "[class*='Caption']",
        "[class*='desc']",
        "[class*='Desc']",
    ):
        for found in clone.select(selector):
            found.decompose()
    return clean_inline(clone.get_text(" ", strip=True))


def is_media_noise(item: MediaItem) -> bool:
    """Report whether a media item is UI decoration rather than page evidence."""
    if not item.src:
        return True
    if not item.caption and not item.alt:
        return True
    text = f"{item.caption} {item.alt}"
    if any(token in text for token in ("订阅", "收藏", "赞", "播放", "编辑")):
        return True
    if "subscribe" in item.src or "front-end/swanapp-baike" in item.src:
        return True
    return item.section in {"相关星图", "词条统计"}


def extract_media_items(soup: BeautifulSoup, final_url: str) -> list[MediaItem]:
    """Collect images and videos of the page body, tagged with their section.

    Walks the body in document order, tracks the most recent heading as the
    current section and stops at the reference section. Noise items and
    duplicates (same kind, section, caption and src) are skipped.
    """
    root = _lemma_body_root(soup)
    items: list[MediaItem] = []
    seen = set()
    current_section = ""
    for tag in root.find_all(["h1", "h2", "h3", "h4", "div", "img", "video"], recursive=True):
        kind = block_kind(tag)
        if kind == "stop":
            break
        if kind == "heading":
            heading = clean_heading(tag.get_text(" ", strip=True))
            if heading and not is_bad_line(heading):
                current_section = heading
            continue
        if tag.name not in {"img", "video"}:
            continue
        alt = clean_inline(tag.get("alt") or tag.get("title") or "")
        parent_link = tag.find_parent("a")
        link_href = parent_link.get("href") if parent_link else None
        # Prefer the enclosing link's id, then the media element's own id.
        link_id = parent_link.get("id") if parent_link else None
        item = MediaItem(
            kind="video" if tag.name == "video" else "image",
            section=current_section,
            caption=media_caption(tag) or alt,
            alt=alt,
            src=media_src(tag, final_url),
            href=urljoin(final_url, link_href) if link_href else "",
            asset_id=clean_inline(link_id or tag.get("id") or ""),
            width=clean_inline(tag.get("width") or ""),
            height=clean_inline(tag.get("height") or ""),
        )
        key = (item.kind, item.section, item.caption, item.src)
        if key in seen or is_media_noise(item):
            continue
        seen.add(key)
        items.append(item)
    return items


def extract_body_blocks(soup: BeautifulSoup) -> tuple[list[str], list[str], int, int]:
    """Convert the page body into Markdown lines.

    Returns:
        ``(lines, headings, paragraph_count, table_count)`` where *lines* are
        the Markdown lines (each block followed by an empty line), *headings*
        the heading texts in order, and the counts cover emitted
        paragraphs/list items and tables.
    """
    root = _lemma_body_root(soup)
    lines: list[str] = []
    headings: list[str] = []
    seen_blocks = set()
    paragraph_count = 0
    table_count = 0
    for tag in root.find_all(["h1", "h2", "h3", "h4", "p", "li", "table", "div"], recursive=True):
        kind = block_kind(tag)
        if kind == "stop":
            break
        if not kind or kind == "caption":
            continue
        if kind == "table":
            table_lines = table_to_markdown(tag)
            key = "\n".join(table_lines)
            if table_lines and key not in seen_blocks:
                seen_blocks.add(key)
                lines.extend(table_lines)
                lines.append("")
                table_count += 1
            continue
        if kind in {"paragraph", "list"}:
            text = text_without_media(tag)
        else:
            text = clean_inline(tag.get_text(" ", strip=True))
        if is_bad_line(text):
            continue
        # Everything from the references / tags footer onwards is not body text.
        if text.startswith(("参考资料", "词条标签", "开放分类")):
            break
        if kind == "heading":
            text = clean_heading(text)
            key = norm_heading(text)
            if not key or key in seen_blocks:
                continue
            seen_blocks.add(key)
            headings.append(text)
            lines.extend([f"{heading_level(tag)} {text}", ""])
            continue
        if kind == "list":
            # Very long list items are skipped.
            if len(text) > 240:
                continue
            rendered = f"- {text}"
        else:
            rendered = text
        # Nested containers yield the same text more than once; emit it once.
        key = norm_heading(rendered)
        if not key or key in seen_blocks:
            continue
        seen_blocks.add(key)
        lines.extend([rendered, ""])
        paragraph_count += 1
    return lines, headings, paragraph_count, table_count


def match_headings(catalog: list[str], body_headings: list[str]) -> tuple[list[str], list[str]]:
    """Split *catalog* into entries found / not found among *body_headings*.

    Two headings match when their normalised keys are equal or one contains
    the other. Catalog entries with an empty key are ignored.

    Returns:
        ``(matched, missing)`` lists of catalog entries.
    """
    matched: list[str] = []
    missing: list[str] = []
    body_norms = [norm for norm in map(norm_heading, body_headings) if norm]
    for item in catalog:
        item_norm = norm_heading(item)
        if not item_norm:
            continue
        found = any(item_norm == body or item_norm in body or body in item_norm for body in body_norms)
        (matched if found else missing).append(item)
    return matched, missing


def build_markdown(
    *,
    html: str,
    source_name: str,
    query_name: str,
    requested_url: str,
    final_url: str,
    browser_title: str,
    min_catalog_coverage: float,
    min_chars: int,
) -> tuple[str, PageQuality, list[MediaItem]]:
    """Build the audited Markdown document and its quality report from page HTML.

    Args:
        html: Rendered page HTML.
        source_name: Fallback title when the page provides none.
        query_name: Requested entry name (or URL) recorded in the document.
        requested_url: URL that was requested.
        final_url: URL after redirects.
        browser_title: Browser window title of the page.
        min_catalog_coverage: Minimum share of catalog entries that must be
            matched by body headings (only checked when a catalog exists).
        min_chars: Minimum length of the generated Markdown.

    Returns:
        ``(markdown, quality, media_items)``. ``quality.status`` is ``"OK"``
        only when no check failed, otherwise ``"INCOMPLETE"``.
    """
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(["script", "style", "noscript", "iframe", "canvas", "svg", "button", "input", "form"]):
        tag.decompose()

    # Title preference: page <h1>, then the browser title without its site
    # suffix, then the caller's name. An empty <h1> no longer hides the
    # browser title.
    h1 = soup.select_one("h1")
    h1_title = clean_heading(h1.get_text(" ", strip=True)) if h1 else ""
    browser_name = clean_heading(browser_title.split("_百度百科", 1)[0]) if browser_title else ""
    page_title = h1_title or browser_name or source_name

    anti_crawl = anti_crawl_detected(html, final_url, browser_title)
    basic_info = extract_basic_info(soup)
    catalog = extract_catalog(soup)
    summary = extract_summary(soup)
    media_items = extract_media_items(soup, final_url)
    body_lines, body_headings, paragraph_count, table_count = extract_body_blocks(soup)
    matched_catalog, missing_catalog = match_headings(catalog, body_headings)
    catalog_coverage = (len(matched_catalog) / len(catalog)) if catalog else 0.0

    reasons: list[str] = []
    if anti_crawl:
        reasons.append("hit Baidu anti-crawl/verification page")
    if not body_lines:
        reasons.append("no body blocks extracted")
    if catalog and catalog_coverage < min_catalog_coverage:
        reasons.append(f"catalog coverage {catalog_coverage:.0%} < {min_catalog_coverage:.0%}")

    crawl_time = now_iso()
    # The status line is a placeholder: the final status depends on the length
    # of the finished document and is substituted at the end.
    lines = [
        f"# {page_title or source_name}",
        "",
        "## 完整度检查",
        "",
        "- 状态:PENDING",
        f"- 页面目录项:{len(catalog)}",
        f"- 正文标题数:{len(body_headings)}",
        f"- 目录覆盖率:{catalog_coverage:.0%} ({len(matched_catalog)}/{len(catalog)})",
        f"- 段落/列表:{paragraph_count}",
        f"- 表格:{table_count}",
        f"- 反爬/验证页:{'是' if anti_crawl else '否'}",
    ]
    if missing_catalog:
        lines.append("- 缺失目录项:" + "、".join(missing_catalog[:30]))
    lines.append("")
    lines.extend(
        [
            "## 页面正文 Markdown",
            "",
            "### 抓取信息(非原页面正文)",
            "",
            "- 数据源:百度百科",
            f"- 请求词条:{query_name}",
            f"- 页面标题:{page_title}",
            f"- 请求 URL:{requested_url}",
            f"- 最终 URL:{final_url}",
            f"- 抓取时间:{crawl_time}",
            "- 转换方式:rendered DOM fullpage + catalog validation",
            "",
        ]
    )
    if basic_info:
        lines.extend(["### 基本信息", "", "| 字段 | 值 |", "| --- | --- |"])
        for key, val in basic_info:
            lines.append(f"| {key.replace('|', '/')} | {val.replace('|', '/')} |")
        lines.append("")
    if summary:
        lines.extend(["### 摘要", ""])
        for item in summary:
            lines.extend([item, ""])
    if catalog:
        lines.extend(["### 页面目录", ""])
        lines.extend(f"- {item}" for item in catalog)
        lines.append("")
    if media_items:
        lines.extend(
            [
                "### 媒体证据(图片/视频)",
                "",
                "| 所属章节 | 类型 | 说明 | URL |",
                "| --- | --- | --- | --- |",
            ]
        )
        for item in media_items:
            label = item.caption or item.alt
            url = item.href or item.src
            lines.append(
                f"| {item.section.replace('|', '/')} | {item.kind} | "
                f"{label.replace('|', '/')} | {url.replace('|', '/')} |"
            )
        lines.append("")
    lines.extend(body_lines)

    markdown_without_status = "\n".join(lines).strip() + "\n"
    markdown_chars = len(markdown_without_status)
    if markdown_chars < min_chars:
        reasons.append(f"markdown chars {markdown_chars} < {min_chars}")
    status = "OK" if not reasons else "INCOMPLETE"
    reason = "ok" if not reasons else "; ".join(reasons)
    markdown = markdown_without_status.replace("- 状态:PENDING", f"- 状态:{status}", 1)
    quality = PageQuality(
        status=status,
        reason=reason,
        page_title=page_title,
        final_url=final_url,
        markdown_chars=len(markdown),
        catalog_count=len(catalog),
        body_heading_count=len(body_headings),
        matched_catalog_count=len(matched_catalog),
        catalog_coverage=round(catalog_coverage, 4),
        missing_catalog_headings=missing_catalog,
        anti_crawl=anti_crawl,
        media_count=len(media_items),
    )
    return markdown, quality, media_items


def write_outputs(
    markdown: str,
    quality: PageQuality,
    media_items: list[MediaItem],
    out_dir: Path,
    filename: str,
    force: bool,
    write_json: bool,
) -> Path:
    """Write the Markdown file and, optionally, its JSON sidecar files.

    With *write_json* the ``.quality.json`` and ``.media.json`` sidecars are
    written; without it, sidecars left over from an earlier run are deleted.

    Returns:
        Path of the Markdown file.

    Raises:
        FileExistsError: The Markdown file exists and *force* is false.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    md_path = out_dir / filename
    if md_path.exists() and not force:
        raise FileExistsError(f"Output exists, use --force: {md_path}")
    md_path.write_text(markdown, encoding="utf-8")
    quality_path = md_path.with_suffix(".quality.json")
    media_path = md_path.with_suffix(".media.json")
    if write_json:
        quality_path.write_text(
            json.dumps(asdict(quality), ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        media_path.write_text(
            json.dumps([asdict(item) for item in media_items], ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
    else:
        for path in (quality_path, media_path):
            path.unlink(missing_ok=True)
    return md_path


def _entry_name_from_url(url: str) -> str:
    """Return the decoded entry name of a Baike item URL (``""`` when the path is empty).

    The name is the path segment following ``item``; for other URLs the last
    path segment is used. The query string and fragment are ignored.
    """
    from urllib.parse import unquote, urlparse

    # Split before decoding so an encoded "/" inside a name stays in its segment.
    segments = [unquote(part) for part in urlparse(url).path.split("/") if part]
    if "item" in segments[:-1]:
        return segments[segments.index("item") + 1]
    return segments[-1] if segments else ""


def crawl_fullpage(args: argparse.Namespace) -> tuple[Path, PageQuality]:
    """Crawl one page described by *args* and write its outputs.

    Returns:
        ``(markdown_path, quality)``.

    Raises:
        SystemExit: Neither ``args.name`` nor ``args.url`` is set.
    """
    if not args.name and not args.url:
        raise SystemExit("use --name or --url")
    query_name = args.name or args.url
    requested_url = args.url or baike_url(args.name)
    # Path(url).name used to return the numeric lemma id plus the raw query
    # string (e.g. "112398?fromModule=..."); take the decoded entry name.
    source_name = args.name or clean_heading(_entry_name_from_url(requested_url))
    html, final_url, browser_title = fetch_rendered_page(
        requested_url,
        profile_dir=Path(args.profile_dir),
        headful=args.headful,
        manual_seconds=max(0, args.manual_seconds),
        timeout_ms=args.timeout_ms,
    )
    markdown, quality, media_items = build_markdown(
        html=html,
        source_name=source_name,
        query_name=query_name,
        requested_url=requested_url,
        final_url=final_url,
        browser_title=browser_title,
        min_catalog_coverage=args.min_catalog_coverage,
        min_chars=args.min_chars,
    )
    filename = args.output_name or f"{slugify(source_name)}.md"
    path = write_outputs(markdown, quality, media_items, Path(args.out_dir), filename, args.force, args.write_json)
    return path, quality


def write_batch_index(rows: list[dict], out_dir: Path) -> None:
    """Write ``index.md`` in *out_dir* summarising the batch result rows."""

    def cell(value) -> str:
        # A "|" inside a value would break the Markdown table row.
        return str(value).replace("|", "/")

    out_dir.mkdir(parents=True, exist_ok=True)
    ok_count = sum(1 for row in rows if row.get("status") == "OK")
    lines = [
        "# 贵州景点百度百科 Markdown 抓取清单",
        "",
        f"- 生成时间:{now_iso()}",
        f"- 文件数:{len(rows)}",
        f"- 成功:{ok_count}",
        f"- 需复核:{len(rows) - ok_count}",
        "- 说明:完整度只用页面自身目录与正文标题校验,不预设业务目录。",
        "",
        "| # | 名称 | 页面标题 | Markdown | 目录覆盖 | 字符 | 媒体 | 状态 |",
        "| ---: | --- | --- | --- | ---: | ---: | ---: | --- |",
    ]
    for idx, row in enumerate(rows, 1):
        md_name = Path(row["path"]).name if row.get("path") else ""
        md_link = f"[{md_name}](./{md_name})" if md_name else "-"
        lines.append(
            f"| {idx} | {cell(row.get('name', ''))} | {cell(row.get('page_title', ''))} | "
            f"{md_link} | {row.get('catalog_matched', 0)}/{row.get('catalog_count', 0)} | "
            f"{row.get('chars', 0)} | {row.get('media_count', 0)} | {row.get('status', '')} |"
        )
    (out_dir / "index.md").write_text("\n".join(lines) + "\n", encoding="utf-8")


def crawl_guizhou_batch(args: argparse.Namespace) -> list[dict]:
    """Crawl the built-in Guizhou sample list and return one result row per entry.

    A failing entry is recorded with status ``"ERROR"`` and does not stop the
    batch. The index file is rewritten after every entry.
    """
    out_dir = Path(args.out_dir) if args.out_dir != str(DEFAULT_OUT_DIR) else DEFAULT_GUIZHOU_BATCH_DIR
    out_dir.mkdir(parents=True, exist_ok=True)
    rows: list[dict] = []
    total = len(GUIZHOU_SCENIC_SPOTS)
    # Clamp so the progress total is right and no delay follows the last entry
    # when --limit exceeds the list length.
    limit = min(args.limit, total) if args.limit and args.limit > 0 else total
    for idx, entry in enumerate(GUIZHOU_SCENIC_SPOTS[:limit], 1):
        name = entry["name"]
        item_args = argparse.Namespace(**vars(args))
        item_args.name = name
        item_args.url = entry.get("url")
        item_args.out_dir = str(out_dir)
        item_args.output_name = f"{idx:02d}_{slugify(name)}.md"
        print(f"[batch] {idx:02d}/{limit} {name}", flush=True)
        try:
            path, quality = crawl_fullpage(item_args)
        except Exception as exc:  # noqa: BLE001
            # Per-entry boundary: record the failure and continue the batch.
            rows.append(
                {
                    "name": name,
                    "path": "",
                    "status": "ERROR",
                    "page_title": "",
                    "catalog_matched": 0,
                    "catalog_count": 0,
                    "chars": 0,
                    "media_count": 0,
                    "reason": str(exc)[:200],
                }
            )
            print(f" [ERROR] {str(exc)[:200]}", flush=True)
        else:
            rows.append(
                {
                    "name": name,
                    "path": str(path),
                    "status": quality.status,
                    "page_title": quality.page_title,
                    "catalog_matched": quality.matched_catalog_count,
                    "catalog_count": quality.catalog_count,
                    "chars": quality.markdown_chars,
                    "media_count": quality.media_count,
                    "reason": quality.reason,
                }
            )
            print(
                f" [{quality.status}] title={quality.page_title} "
                f"catalog={quality.matched_catalog_count}/{quality.catalog_count} "
                f"chars={quality.markdown_chars} media={quality.media_count}",
                flush=True,
            )
        # Rewrite the index after every entry so an interrupted batch still
        # leaves an up-to-date index behind.
        write_batch_index(rows, out_dir)
        if idx < limit and args.sleep > 0:
            time.sleep(args.sleep + random.random() * 0.7)
    return rows


def main() -> int:
    """Parse the command line and run a single crawl or the Guizhou batch.

    Returns:
        Process exit code: 2 when ``--strict`` is set and a result is not
        ``OK``, otherwise 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", help="Baidu Baike entry name")
    parser.add_argument("--url", help="Exact Baidu Baike URL; prefer item URLs with numeric lemma ID")
    parser.add_argument("--batch-guizhou", action="store_true", help="Crawl built-in Guizhou scenic-spot sample list")
    parser.add_argument("--out-dir", default=str(DEFAULT_OUT_DIR))
    parser.add_argument("--output-name", help="Markdown filename, for example 02_小七孔风景区_6899702.md")
    parser.add_argument("--profile-dir", default=str(DEFAULT_PROFILE_DIR))
    parser.add_argument("--headful", action="store_true", help="Open a visible browser; useful when Baidu asks for verification")
    parser.add_argument("--manual-seconds", type=int, default=0, help="Wait this many seconds for manual verification if needed")
    parser.add_argument("--timeout-ms", type=int, default=60000)
    parser.add_argument("--min-catalog-coverage", type=float, default=0.9)
    parser.add_argument("--min-chars", type=int, default=2000)
    parser.add_argument("--limit", type=int, help="Batch mode: crawl only first N items")
    parser.add_argument("--sleep", type=float, default=1.0, help="Batch mode: delay between pages")
    parser.add_argument("--write-json", action="store_true", help="Also write .quality.json and .media.json sidecar files")
    parser.add_argument("--force", action="store_true")
    parser.add_argument("--strict", action="store_true", help="Exit 2 when quality status is not OK")
    args = parser.parse_args()

    if args.batch_guizhou:
        rows = crawl_guizhou_batch(args)
        bad = [row for row in rows if row.get("status") != "OK"]
        print(f"[done] {DEFAULT_GUIZHOU_BATCH_DIR if args.out_dir == str(DEFAULT_OUT_DIR) else args.out_dir}", flush=True)
        return 2 if args.strict and bad else 0

    path, quality = crawl_fullpage(args)
    print(
        f"[{quality.status}] file={path} chars={quality.markdown_chars} "
        f"catalog={quality.matched_catalog_count}/{quality.catalog_count} "
        f"headings={quality.body_heading_count} reason={quality.reason}",
        flush=True,
    )
    return 2 if args.strict and quality.status != "OK" else 0


if __name__ == "__main__":
    raise SystemExit(main())