Files
bxh/scripts/baike_fullpage_markdown.py

1036 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Capture Baidu Baike rendered pages as auditable full-page Markdown.
This stricter crawler is for schema-building evidence. It does not accept a
short text fallback as success: the saved Markdown always includes a completeness
report comparing the page catalog with extracted body headings.
"""
from __future__ import annotations
import argparse
import json
import random
import re
import sys
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import quote, urljoin
from bs4 import BeautifulSoup
# Parent of the directory that contains this script; base for the default paths below.
ROOT = Path(__file__).resolve().parents[1]
# Default value of --out-dir (single-page mode output directory).
DEFAULT_OUT_DIR = ROOT / "schema搭建" / "baidu_fullpage_md"
# Default value of --profile-dir (persistent browser profile used for rendering).
DEFAULT_PROFILE_DIR = ROOT / "data" / "browser_profiles" / "baike_fullpage"
# Output directory the Guizhou batch mode uses when --out-dir is left at its default.
DEFAULT_GUIZHOU_BATCH_DIR = ROOT / "schema搭建" / "city_poi_schema_v0_1" / "baidu_fullpage_guizhou_scenic"
# Substrings that make is_bad_line() reject a piece of text. The check is a
# plain ``token in text`` test, so any line containing one of these is dropped.
BAD_LINE_TOKENS = [
    "百度首页",
    "登录",
    "注册",
    "打开APP",
    "百度百科合作平台",
    "使用百度前必读",
    "百科协议",
    "隐私政策",
    "京ICP",
    "营业执照",
    "投诉建议",
    "词条统计",
    "分享你的世界",
    "查看更多",
    "上传视频",
    "免责声明",
]
# Link texts that extract_catalog() never accepts as catalog entries. Matching
# is by exact equality (set membership), so the empty-string member only
# matches empty text.
CATALOG_NOISE = {
    "首页",
    "帮助",
    "秒懂百科",
    "特色百科",
    "知识专题",
    "加入百科",
    "百科团队",
    "权威合作",
    "播报",
    "编辑",
    "讨论",
    "收藏",
    "",
}
# Keys that is_basic_info_pair() rejects: every CATALOG_NOISE entry plus the
# extra labels listed here. Matching is by exact equality on the key.
BASIC_INFO_NOISE_KEYS = CATALOG_NOISE | {
    "网页",
    "新闻",
    "贴吧",
    "知道",
    "网盘",
    "图片",
    "视频",
    "地图",
    "文库",
    "资讯",
    "采购",
    "国际版",
}
# Substrings that anti_crawl_detected() looks for in the page title plus the
# first 3000 characters of page text. The comparison is case-sensitive.
ANTI_CRAWL_TOKENS = [
    "百度安全验证",
    "验证码",
    "系统检测到异常",
    "captcha",
    "anticrawl",
]
@dataclass
class PageQuality:
    """Completeness report for one captured page, filled in by build_markdown()."""

    # "OK" when no problem was recorded, otherwise "INCOMPLETE".
    status: str
    # "ok", or the recorded problems joined with "; ".
    reason: str
    # Title chosen for the page (first <h1>, else browser title, else source name).
    page_title: str
    # URL the browser ended on after navigation.
    final_url: str
    # Length in characters of the final Markdown text.
    markdown_chars: int
    # Number of catalog entries found on the page.
    catalog_count: int
    # Number of headings extracted from the body.
    body_heading_count: int
    # Number of catalog entries that matched a body heading.
    matched_catalog_count: int
    # matched / catalog entries, rounded to 4 decimals; 0.0 when there is no catalog.
    catalog_coverage: float
    # Catalog entries with no matching body heading.
    missing_catalog_headings: list[str]
    # True when anti_crawl_detected() flagged the page.
    anti_crawl: bool
    # Number of media items kept for the page.
    media_count: int
@dataclass
class MediaItem:
    """One image or video reference collected by extract_media_items()."""

    # "video" for <video> tags, "image" otherwise.
    kind: str
    # Text of the most recent heading seen before the media tag ("" if none).
    section: str
    # Caption found near the tag, falling back to the alt text.
    caption: str
    # The tag's alt attribute, or its title attribute when alt is empty.
    alt: str
    # Absolute media URL resolved against the page URL ("" if none was found).
    src: str
    # Absolute href of the enclosing <a>, or "" when there is none.
    href: str
    # id attribute of the enclosing <a>, else of the tag itself, else "".
    asset_id: str
    # Raw width / height attributes of the tag ("" when absent).
    width: str
    height: str
def compact(text: str) -> str:
    """Collapse every whitespace run in *text* to one space and trim the ends.

    Falsy input (``None`` or ``""``) yields an empty string.
    """
    if not text:
        return ""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
def now_iso() -> str:
    """Return the current time as an ISO-8601 string with a UTC offset.

    The instant is taken in UTC, converted to the local timezone, and rendered
    with second precision.
    """
    utc_now = datetime.now(timezone.utc)
    local_now = utc_now.astimezone()
    return local_now.isoformat(timespec="seconds")
def baike_url(name: str) -> str:
    """Build the Baidu Baike item URL for the entry called *name*.

    The name is percent-encoded with ``urllib.parse.quote`` defaults, so "/"
    is left as-is.
    """
    encoded_name = quote(name)
    return "https://baike.baidu.com/item/" + encoded_name
# Built-in sample list crawled by the --batch-guizhou mode. Each entry needs a
# "name"; an optional "url" overrides the URL that would otherwise be derived
# from the name.
GUIZHOU_SCENIC_SPOTS = [
    {"name": "黄果树瀑布"},
    {"name": "小七孔风景区"},
    {"name": "梵净山"},
    {"name": "西江千户苗寨"},
    {"name": "青岩古镇"},
    {"name": "镇远古城"},
    {"name": "肇兴侗寨"},
    {"name": "万峰林"},
    {"name": "马岭河峡谷"},
    {"name": "织金洞"},
    {"name": "百里杜鹃风景名胜区"},
    {"name": "赤水丹霞"},
    {"name": "龙宫风景区"},
    {"name": "遵义会议会址"},
    {"name": "甲秀楼"},
    {"name": "黔灵山公园"},
    {
        "name": "花溪公园",
        "url": "https://baike.baidu.com/item/%E8%8A%B1%E6%BA%AA%E5%85%AC%E5%9B%AD/112398?fromModule=lemma_search-box",
    },
    {"name": "天河潭"},
    {"name": "南江大峡谷"},
    {"name": "乌蒙大草原"},
]
def slugify(text: str) -> str:
    r"""Turn *text* into a filesystem-friendly name.

    Every run of whitespace or of the characters ``\ / : * ? " < > |`` is
    collapsed into a single underscore, then leading and trailing underscores
    are removed. ``"baike_page"`` is returned when nothing is left.
    """
    # In a raw string ``\\s`` inside a character class means "a backslash and
    # the letter s", which mangled names containing "s" and left whitespace
    # untouched. ``\s`` is the whitespace class that was intended.
    return re.sub(r"[\\/:*?\"<>|\s]+", "_", text).strip("_") or "baike_page"
def clean_inline(text: str) -> str:
    """Normalise one inline text fragment scraped from the page.

    Whitespace is compacted, ``[n]`` / ``[编辑]`` markers are removed, a
    trailing "播报 编辑" control label is stripped, non-breaking spaces become
    plain spaces, and the result is compacted once more.
    """
    cleaned = compact(text)
    removal_patterns = (
        r"\[(?:\d+|编辑)\]",
        r"\s*播报\s*编辑\s*$",
    )
    for pattern in removal_patterns:
        cleaned = re.sub(pattern, "", cleaned)
    cleaned = cleaned.replace("\xa0", " ")
    return compact(cleaned)
def clean_heading(text: str) -> str:
    """Clean heading text and strip leading/trailing ``#`` and space characters."""
    return clean_inline(text).strip("# ")
def clean_catalog_heading(text: str) -> str:
    """Clean a catalog entry and drop its leading one- or two-digit ordinal.

    The ordinal is removed only when it is followed (after optional spaces)
    by a CJK character or ASCII letter other than "级", so catalog ordinals
    such as "1历史沿革" lose the number while real names like "68级跌水瀑布"
    are kept intact.
    """
    ordinal_prefix = r"^\d{1,2}\s*(?!级)(?=[\u4e00-\u9fa5A-Za-z])"
    heading = clean_heading(text)
    return re.sub(ordinal_prefix, "", heading)
def norm_heading(text: str) -> str:
    """Reduce a heading to a comparison key.

    The heading is cleaned and lower-cased, a leading run of Chinese numerals
    (with optional separator) and a leading one- or two-digit ordinal are
    removed, and finally all whitespace and punctuation listed in the last
    pattern is deleted.
    """
    key = clean_heading(text).lower()
    leading_patterns = (
        r"^[一二三四五六七八九十百千万]+[、.\s]*",
        r"^\d{1,2}\s*(?!级)(?=[\u4e00-\u9fa5A-Za-z])",
    )
    for pattern in leading_patterns:
        key = re.sub(pattern, "", key)
    return re.sub(r"[\s#:、,,。.;()\[\]【】·\-_/]+", "", key)
def is_bad_line(text: str) -> bool:
    """Tell whether *text* should be discarded as page chrome or junk.

    A line is bad when, after cleaning, it has at most one character, contains
    any BAD_LINE_TOKENS entry, or is longer than 700 characters and starts
    with "{" or "[".
    """
    line = clean_inline(text)
    if len(line) <= 1:
        return True
    for token in BAD_LINE_TOKENS:
        if token in line:
            return True
    return len(line) > 700 and line.startswith(("{", "["))
def dedupe_keep_order(items: list[str]) -> list[str]:
    """Return *items* without duplicates, keeping first occurrences in order.

    Two items are duplicates when their norm_heading() keys are equal; an item
    whose key is empty is compared by its raw text, and empty items are
    dropped.
    """
    unique: list[str] = []
    seen_keys = set()
    for entry in items:
        dedupe_key = norm_heading(entry) or entry
        if not dedupe_key or dedupe_key in seen_keys:
            continue
        seen_keys.add(dedupe_key)
        unique.append(entry)
    return unique
def anti_crawl_detected(html: str, final_url: str, title: str) -> bool:
    """Detect whether the fetched page is a verification / anti-crawl page.

    Returns True when the final URL contains "anticrawl" or "captcha", or when
    the title plus the first 3000 characters of page text contain any
    ANTI_CRAWL_TOKENS entry.
    """
    for marker in ("anticrawl", "captcha"):
        if marker in final_url:
            return True
    page_text = BeautifulSoup(html, "html.parser").get_text(" ", strip=True)
    haystack = compact(" ".join([title, page_text[:3000]]))
    for token in ANTI_CRAWL_TOKENS:
        if token in haystack:
            return True
    return False
def chrome_runtime_options() -> tuple[str, list[str], str]:
    """Return ``(user_agent, chrome_args, stealth_js)`` for the browser launch.

    The values are imported from ``app.agents.web_agent`` when that module can
    be imported with ROOT on ``sys.path``; otherwise built-in defaults are
    used. The argument list is returned as a fresh copy.
    """
    # This function runs once per fetched page; only add ROOT when it is
    # missing so batch runs do not pile up duplicate sys.path entries.
    root_entry = str(ROOT)
    if root_entry not in sys.path:
        sys.path.insert(0, root_entry)
    try:
        from app.agents.web_agent import _CHROME_ARGS, _STEALTH_JS, _UA
    except Exception:  # noqa: BLE001 - deliberate best-effort fallback to defaults
        _UA = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        )
        _CHROME_ARGS = [
            "--disable-blink-features=AutomationControlled",
            "--no-first-run",
            "--no-default-browser-check",
            "--disable-sync",
            "--disable-default-apps",
            "--no-sandbox",
            "--disable-dev-shm-usage",
        ]
        _STEALTH_JS = "Object.defineProperty(navigator,'webdriver',{get:()=>undefined});"
    return _UA, list(_CHROME_ARGS), _STEALTH_JS
def scroll_to_render(page, max_rounds: int = 16) -> None:
    """Scroll *page* down in random steps, then jump back to the top.

    Scrolling stops after *max_rounds* rounds, or earlier once the document
    height has been unchanged for three consecutive measurements.
    """
    previous_height = 0
    unchanged_rounds = 0
    for _round in range(max_rounds):
        current_height = page.evaluate("() => document.documentElement.scrollHeight")
        unchanged_rounds = unchanged_rounds + 1 if current_height == previous_height else 0
        if unchanged_rounds >= 3:
            break
        previous_height = current_height
        # Random wheel distance and pause between steps.
        page.mouse.wheel(0, random.randint(1100, 1900))
        page.wait_for_timeout(random.randint(220, 520))
    page.evaluate("() => window.scrollTo(0, 0)")
    page.wait_for_timeout(300)
def fetch_rendered_page(
    url: str,
    *,
    profile_dir: Path,
    headful: bool,
    manual_seconds: int,
    timeout_ms: int,
) -> tuple[str, str, str]:
    """Open *url* in a persistent Chromium context and return the rendered page.

    Args:
        url: Page to load.
        profile_dir: Browser profile directory; created if missing.
        headful: Show the browser window when True.
        manual_seconds: When positive, poll for up to this many seconds while
            the page looks like a verification page.
        timeout_ms: Navigation timeout passed to ``page.goto``.

    Returns:
        ``(html, final_url, title)`` read after the page has been scrolled.
    """
    from playwright.sync_api import sync_playwright

    ua, chrome_args, stealth_js = chrome_runtime_options()
    profile_dir.mkdir(parents=True, exist_ok=True)
    with sync_playwright() as p:
        ctx = p.chromium.launch_persistent_context(
            str(profile_dir),
            headless=not headful,
            args=chrome_args,
            ignore_default_args=["--enable-automation"],
            user_agent=ua,
            locale="zh-CN",
            viewport=random.choice(
                [
                    {"width": 1440, "height": 1100},
                    {"width": 1366, "height": 900},
                    {"width": 1600, "height": 1100},
                ]
            ),
            extra_http_headers={
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Referer": "https://www.baidu.com/",
            },
        )
        # Close the context even when navigation or scrolling raises (for
        # example on a goto timeout) instead of skipping the close call.
        try:
            ctx.add_init_script(stealth_js)
            page = ctx.pages[0] if ctx.pages else ctx.new_page()
            page.goto(url, timeout=timeout_ms, wait_until="domcontentloaded")
            page.wait_for_timeout(random.randint(1000, 1800))
            if manual_seconds > 0:
                deadline = time.time() + manual_seconds
                while time.time() < deadline:
                    html = page.content()
                    if not anti_crawl_detected(html, page.url, page.title() or ""):
                        break
                    remaining = int(deadline - time.time())
                    print(f"[manual] Baidu verification page detected, waiting {remaining}s...", flush=True)
                    page.wait_for_timeout(2500)
            scroll_to_render(page)
            final_url = page.url
            title = page.title() or ""
            html = page.content()
        finally:
            ctx.close()
    return html, final_url, title
def table_to_markdown(table) -> list[str]:
    """Render an HTML table element as Markdown table lines.

    Empty cells are dropped, "|" inside a cell becomes "/", each row keeps at
    most eight cells, and shorter rows are padded to the widest row. The first
    row is used as the header. Returns an empty list when no row has content.
    """

    def render(cells: list[str]) -> str:
        return "| " + " | ".join(cells) + " |"

    grid: list[list[str]] = []
    for tr in table.find_all("tr"):
        row_cells: list[str] = []
        for cell in tr.find_all(["th", "td"]):
            cell_text = clean_inline(cell.get_text(" ", strip=True)).replace("|", "/")
            if cell_text:
                row_cells.append(cell_text)
        if row_cells:
            grid.append(row_cells[:8])
    if not grid:
        return []

    width = max(map(len, grid))
    padded = [row + [""] * (width - len(row)) for row in grid]
    header, *body = padded
    return [render(header), render(["---"] * width), *map(render, body)]
def find_content_root(soup: BeautifulSoup):
    """Pick the element that most likely holds the article content.

    The selectors are tried in order and the first match whose cleaned text is
    longer than 600 characters wins; otherwise ``soup.body`` (or the soup
    itself) is returned.
    """
    selectors = (
        "div[class*='mainContent']",
        "div[id='J-lemma-main-wrapper']",
        "div[class*='lemmaWrapper']",
        "div.J-lemma-content",
        "div[class*='lemma-content']",
        "main",
        "article",
    )
    for css in selectors:
        candidate = soup.select_one(css)
        if not candidate:
            continue
        text_length = len(clean_inline(candidate.get_text(" ", strip=True)))
        if text_length > 600:
            return candidate
    return soup.body or soup
def extract_catalog(soup: BeautifulSoup) -> list[str]:
    """Collect the page's catalog (table of contents) entries.

    Link texts inside catalog-like containers are used first; when none
    qualify, in-page anchor links (``href`` starting with "#") under the
    content root are used instead. Entries are cleaned, filtered and
    de-duplicated in page order.
    """

    def is_catalog_label(label: str) -> bool:
        # Non-empty, not a known noise label, at most 40 characters, not junk.
        return (
            bool(label)
            and label not in CATALOG_NOISE
            and len(label) <= 40
            and not is_bad_line(label)
        )

    def labels_of(links) -> list[str]:
        cleaned = (clean_catalog_heading(link.get_text(" ", strip=True)) for link in links)
        return [label for label in cleaned if is_catalog_label(label)]

    containers = soup.select(
        "[class*='catalogWrapper'], [class*='CatalogWrapper'], "
        "[class*='catalog_'], [class*='Catalog_'], [class*='catalogList'], [class*='CatalogList']"
    )
    found: list[str] = []
    for container in containers:
        found.extend(labels_of(container.find_all("a")))
    if not found:
        content_root = find_content_root(soup)
        found.extend(labels_of(content_root.select("a[href^='#']")))
    return dedupe_keep_order(found)
def is_basic_info_pair(key: str, val: str) -> bool:
    """Decide whether ``(key, val)`` looks like a genuine basic-info field.

    Both parts must be non-empty after cleaning, the key must not be a known
    noise label (with or without inner whitespace), the combined text must not
    be a bad line, and the key / value must be at most 24 / 300 characters.
    """
    field = clean_inline(key).rstrip(":")
    value = clean_inline(val)
    if not (field and value):
        return False
    squeezed = re.sub(r"\s+", "", field)
    if squeezed in BASIC_INFO_NOISE_KEYS or field in BASIC_INFO_NOISE_KEYS:
        return False
    if is_bad_line(field + value):
        return False
    return len(field) <= 24 and len(value) <= 300
def extract_basic_info(soup: BeautifulSoup) -> list[tuple[str, str]]:
    """Extract the key/value pairs of the page's basic-info box.

    Three sources are tried: paired ``.basicInfo-item`` name/value nodes,
    ``<dt>``/``<dd>`` pairs inside a basic-info container, and, only when
    nothing was found, consecutive text lines of basic-info containers. A
    repeated key keeps the longer value.
    """
    collected: list[tuple[str, str]] = []

    def squeeze(label: str) -> str:
        return re.sub(r"\s+", "", label)

    def tidy_key(raw: str) -> str:
        return clean_inline(raw).rstrip(":")

    def add_pair(key: str, val: str) -> None:
        key = tidy_key(key)
        val = clean_inline(val)
        if not is_basic_info_pair(key, val):
            return
        wanted = squeeze(key)
        position = next(
            (idx for idx, (existing, _) in enumerate(collected) if squeeze(existing) == wanted),
            None,
        )
        if position is None:
            collected.append((key, val))
        elif len(val) > len(collected[position][1]):
            # Same key seen before: keep whichever value is longer.
            collected[position] = (key, val)

    name_nodes = soup.select(".basicInfo-item.name")
    value_nodes = soup.select(".basicInfo-item.value")
    for name_node, value_node in zip(name_nodes, value_nodes, strict=False):
        add_pair(
            tidy_key(name_node.get_text(" ", strip=True)),
            clean_inline(value_node.get_text(" ", strip=True)),
        )

    basic_info_class = re.compile(r"basicInfo|BasicInfo|J-basic-info")
    for dt in soup.find_all("dt"):
        if not dt.find_parent(class_=basic_info_class):
            continue
        dd = dt.find_next_sibling("dd")
        dd_text = clean_inline(dd.get_text(" ", strip=True)) if dd else ""
        add_pair(tidy_key(dt.get_text(" ", strip=True)), dd_text)

    if collected:
        return collected

    # Fallback: read each container as a flat list of text lines and try
    # neighbouring lines as key/value.
    for container in soup.select("[class*='basicInfo'], [class*='BasicInfo'], .J-basic-info"):
        raw_lines = container.get_text("\n", strip=True).splitlines()
        tokens = [token for token in map(clean_inline, raw_lines) if token]
        cursor = 0
        while cursor < len(tokens) - 1:
            size_before = len(collected)
            add_pair(tokens[cursor].rstrip(":"), tokens[cursor + 1])
            # Skip the value too only when a new pair was actually appended.
            cursor += 2 if len(collected) > size_before else 1
    return collected
def extract_summary(soup: BeautifulSoup) -> list[str]:
    """Return the distinct summary paragraphs of the page, in page order.

    Text is taken from ``<p>`` and ``<div>`` descendants of the summary
    containers; fragments shorter than 20 characters, bad lines, and repeats
    (by normalised key) are skipped.
    """
    summary_roots = soup.select(".J-summary, [class*='lemmaSummary'], [class*='summary_']")
    paragraphs: list[str] = []
    seen_keys = set()
    nodes = (
        node
        for summary_root in summary_roots
        for node in summary_root.find_all(["p", "div"], recursive=True)
    )
    for node in nodes:
        text = clean_inline(node.get_text(" ", strip=True))
        if len(text) < 20 or is_bad_line(text):
            continue
        key = norm_heading(text)
        if key in seen_keys:
            continue
        seen_keys.add(key)
        paragraphs.append(text)
    return paragraphs
def tag_classes(tag) -> str:
    """Return the tag's ``class`` values joined by single spaces ("" if none)."""
    class_names = tag.get("class")
    if not class_names:
        return ""
    return " ".join(class_names)
def block_kind(tag) -> str:
    """Classify a tag for the body walk.

    Returns "stop" for the reference section, "heading", "table", "list",
    "paragraph" or "caption" for recognised blocks, and "" for everything
    else.
    """
    classes = tag_classes(tag)
    lowered = classes.lower()
    element_id = tag.get("id") or ""
    is_reference = (
        "lemmaReference" in classes
        or "reference" in lowered
        or element_id == "J-lemma-reference"
    )
    if is_reference:
        return "stop"

    kind_by_name = {
        "h1": "heading",
        "h2": "heading",
        "h3": "heading",
        "h4": "heading",
        "table": "table",
        "li": "list",
        "p": "paragraph",
    }
    if tag.name in kind_by_name:
        return kind_by_name[tag.name]
    if tag.name != "div":
        return ""

    # Only <div> elements reach this point; classify them by class name.
    if "paraTitle" in classes or re.search(r"\blevel-\d", classes):
        return "heading"
    if "caption" in lowered:
        return "caption"
    has_content_marker = any(marker in classes for marker in ("content", "summary", "MARK_MODULE"))
    if "para" in classes and has_content_marker:
        return "paragraph"
    return ""
def heading_level(tag) -> str:
    """Return the Markdown heading prefix for a heading tag.

    ``h1`` maps to "##"; ``h2`` / "level-1", ``h3`` / "level-2" and ``h4`` /
    "level-3" map to "###", "####" and "#####". Anything else gets "###".
    """
    classes = tag_classes(tag)
    rules = (
        ("h1", None, "##"),
        ("h2", "level-1", "###"),
        ("h3", "level-2", "####"),
        ("h4", "level-3", "#####"),
    )
    for tag_name, class_marker, prefix in rules:
        if tag.name == tag_name:
            return prefix
        if class_marker is not None and class_marker in classes:
            return prefix
    return "###"
def media_src(tag, final_url: str) -> str:
    """Resolve the media URL of an image/video tag against *final_url*.

    The listed attributes are checked in order, then the ``src`` of a nested
    ``<source>`` element; ``data:`` URIs are ignored. Returns "" when nothing
    usable is found.
    """

    def usable(raw) -> str:
        value = clean_inline(raw or "")
        if value and not value.startswith("data:"):
            return value
        return ""

    candidate_attrs = (
        "src",
        "data-src",
        "data-original",
        "data-lazy-src",
        "data-url",
        "poster",
    )
    for attr in candidate_attrs:
        candidate = usable(tag.get(attr))
        if candidate:
            return urljoin(final_url, candidate)

    nested_source = tag.find("source") if hasattr(tag, "find") else None
    if nested_source:
        candidate = usable(nested_source.get("src"))
        if candidate:
            return urljoin(final_url, candidate)
    return ""
def media_caption(tag) -> str:
    """Find a short caption for a media tag.

    The tag and up to five of its nearest ancestors are searched for a
    caption-like element; failing that, the tag's ``alt``, ``title`` and
    ``aria-label`` attributes are tried. A caption must be 2-120 characters
    long and not a bad line. Returns "" when none qualifies.
    """

    def acceptable(text: str) -> bool:
        return 1 < len(text) <= 120 and not is_bad_line(text)

    caption_selectors = (
        "[class*='swiperDesc']",
        "[class*='imageCaption']",
        "[class*='picDesc']",
        "[class*='caption']",
        "[class*='desc']",
        "figcaption",
    )
    scopes = [tag, *list(tag.parents)[:5]]
    for scope in scopes:
        if not scope or not hasattr(scope, "select_one"):
            continue
        for css in caption_selectors:
            hit = scope.select_one(css)
            if not hit:
                continue
            caption = clean_inline(hit.get_text(" ", strip=True))
            if acceptable(caption):
                return caption

    for attr in ("alt", "title", "aria-label"):
        fallback = clean_inline(tag.get(attr) or "")
        if acceptable(fallback):
            return fallback
    return ""
def text_without_media(tag) -> str:
    """Return the cleaned text of *tag* with media and caption markup removed.

    The tag is re-parsed from its HTML string, so all removals happen on a
    separate tree and *tag* itself is not modified.
    """
    clone = BeautifulSoup(str(tag), "html.parser")
    # First element of the re-parsed fragment.
    root = clone.find()
    media_containers = []
    for media in clone.find_all(["img", "video", "source"]):
        container = media
        # Walk up from the media node, stopping at the top of the clone. Every
        # ancestor that contains an <img>/<video> replaces the current
        # container, so the last one reached before the stop is removed.
        for parent in media.parents:
            if parent is clone or parent == root:
                break
            if parent.find(["img", "video"]):
                container = parent
        if container not in media_containers:
            media_containers.append(container)
    for container in media_containers:
        container.decompose()
    # Second pass: remove any remaining media tags and caption/description
    # elements matched by class-name fragments.
    for selector in (
        "img",
        "video",
        "source",
        "[class*='swiperUl']",
        "[class*='swiperLi']",
        "[class*='swiperDesc']",
        "[class*='SwiperDesc']",
        "[class*='lemmaPicture']",
        "[class*='LemmaPicture']",
        "[class*='imageCaption']",
        "[class*='ImageCaption']",
        "[class*='picDesc']",
        "[class*='PicDesc']",
        "[class*='caption']",
        "[class*='Caption']",
        "[class*='desc']",
        "[class*='Desc']",
    ):
        for found in clone.select(selector):
            found.decompose()
    return clean_inline(clone.get_text(" ", strip=True))
def is_media_noise(item: MediaItem) -> bool:
    """Tell whether a media item is UI decoration rather than page content.

    An item is noise when it has no source URL, has neither caption nor alt
    text, mentions a UI action word in its caption/alt, points at a known
    widget asset, or belongs to a non-content section.
    """
    if not item.src:
        return True
    if not item.caption and not item.alt:
        return True
    # The token list must not contain an empty string: ``"" in text`` is
    # always true, which classified every captioned item as noise.
    noise_tokens = ("订阅", "收藏", "播放", "编辑")
    text = f"{item.caption} {item.alt}"
    if any(token in text for token in noise_tokens):
        return True
    if "subscribe" in item.src or "front-end/swanapp-baike" in item.src:
        return True
    return item.section in {"相关星图", "词条统计"}
def extract_media_items(soup: BeautifulSoup, final_url: str) -> list[MediaItem]:
    """Collect the images and videos of the article body with their section.

    Tags are visited in document order until the reference section is
    reached. Headings update the current section; each ``<img>`` / ``<video>``
    becomes a MediaItem unless it is a duplicate or is_media_noise() rejects
    it.
    """
    root = (
        soup.select_one("div.J-lemma-content")
        or soup.select_one("div[class*='lemma-content']")
        or find_content_root(soup)
    )
    items: list[MediaItem] = []
    seen = set()
    current_section = ""
    for tag in root.find_all(["h1", "h2", "h3", "h4", "div", "img", "video"], recursive=True):
        kind = block_kind(tag)
        if kind == "stop":
            break
        if kind == "heading":
            heading = clean_heading(tag.get_text(" ", strip=True))
            if heading and not is_bad_line(heading):
                current_section = heading
            continue
        if tag.name not in {"img", "video"}:
            continue

        src = media_src(tag, final_url)
        caption = media_caption(tag)
        alt = clean_inline(tag.get("alt") or tag.get("title") or "")

        parent_link = tag.find_parent("a")
        link_href = parent_link.get("href") if parent_link else None
        href = urljoin(final_url, link_href) if link_href else ""
        # Read the link id only when a link exists: a tag that has an id but
        # no enclosing <a> previously raised AttributeError on None.
        link_id = parent_link.get("id") if parent_link else None
        asset_id = clean_inline(link_id or tag.get("id") or "")

        item = MediaItem(
            kind="video" if tag.name == "video" else "image",
            section=current_section,
            caption=caption or alt,
            alt=alt,
            src=src,
            href=href,
            asset_id=asset_id,
            width=clean_inline(tag.get("width") or ""),
            height=clean_inline(tag.get("height") or ""),
        )
        key = (item.kind, item.section, item.caption, item.src)
        if key in seen or is_media_noise(item):
            continue
        seen.add(key)
        items.append(item)
    return items
def extract_body_blocks(soup: BeautifulSoup) -> tuple[list[str], list[str], int, int]:
    """Convert the article body into Markdown lines.

    Returns ``(lines, headings, paragraph_count, table_count)``. Blocks are
    visited in document order and the walk ends at the reference section or
    at text starting with a reference/tag label. Headings, paragraphs, list
    items and tables are each emitted at most once, judged by a normalised
    key shared across all block kinds.
    """
    root = (
        soup.select_one("div.J-lemma-content")
        or soup.select_one("div[class*='lemma-content']")
        or find_content_root(soup)
    )
    out_lines: list[str] = []
    found_headings: list[str] = []
    emitted = set()
    n_paragraphs = 0
    n_tables = 0
    stop_prefixes = ("参考资料", "词条标签", "开放分类")

    for node in root.find_all(["h1", "h2", "h3", "h4", "p", "li", "table", "div"], recursive=True):
        kind = block_kind(node)
        if kind == "stop":
            break
        if kind in ("", "caption"):
            continue

        if kind == "table":
            rendered_table = table_to_markdown(node)
            table_key = "\n".join(rendered_table)
            if rendered_table and table_key not in emitted:
                emitted.add(table_key)
                out_lines += rendered_table
                out_lines.append("")
                n_tables += 1
            continue

        if kind in {"paragraph", "list"}:
            text = text_without_media(node)
        else:
            text = clean_inline(node.get_text(" ", strip=True))
        if is_bad_line(text):
            continue
        if text.startswith(stop_prefixes):
            break

        if kind == "heading":
            title = clean_heading(text)
            title_key = norm_heading(title)
            if title_key and title_key not in emitted:
                emitted.add(title_key)
                found_headings.append(title)
                out_lines += [f"{heading_level(node)} {title}", ""]
            continue

        # Overlong list items are skipped.
        if kind == "list" and len(text) > 240:
            continue
        rendered = f"- {text}" if kind == "list" else text
        block_key = norm_heading(rendered)
        if not block_key or block_key in emitted:
            continue
        emitted.add(block_key)
        out_lines += [rendered, ""]
        n_paragraphs += 1
    return out_lines, found_headings, n_paragraphs, n_tables
def match_headings(catalog: list[str], body_headings: list[str]) -> tuple[list[str], list[str]]:
    """Split catalog entries into those found in the body and those missing.

    Comparison uses norm_heading() keys; an entry matches when its key
    contains, or is contained in, the key of any body heading. Entries whose
    key is empty appear in neither list. Returns ``(matched, missing)``.
    """
    body_keys = [key for key in map(norm_heading, body_headings) if key]
    matched: list[str] = []
    missing: list[str] = []
    for entry in catalog:
        entry_key = norm_heading(entry)
        if not entry_key:
            continue
        found = any(entry_key in body_key or body_key in entry_key for body_key in body_keys)
        bucket = matched if found else missing
        bucket.append(entry)
    return matched, missing
def build_markdown(
    *,
    html: str,
    source_name: str,
    query_name: str,
    requested_url: str,
    final_url: str,
    browser_title: str,
    min_catalog_coverage: float,
    min_chars: int,
) -> tuple[str, PageQuality, list[MediaItem]]:
    """Turn rendered page HTML into Markdown plus a completeness report.

    Args:
        html: Rendered page HTML.
        source_name: Fallback title when the page provides none.
        query_name: Entry name or URL the caller asked for (reported as-is).
        requested_url: URL that was requested.
        final_url: URL the browser ended on.
        browser_title: Browser window title of the page.
        min_catalog_coverage: Minimum matched/total catalog ratio for "OK".
        min_chars: Minimum Markdown length for "OK".

    Returns:
        ``(markdown, quality, media_items)``. ``quality.status`` is "OK" only
        when no anti-crawl page was hit, body blocks were extracted, catalog
        coverage meets the threshold (when a catalog exists) and the Markdown
        is long enough.
    """
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(["script", "style", "noscript", "iframe", "canvas", "svg", "button", "input", "form"]):
        tag.decompose()

    page_title = source_name
    h1 = soup.select_one("h1")
    if h1:
        page_title = clean_heading(h1.get_text(" ", strip=True)) or page_title
    elif browser_title:
        page_title = clean_heading(browser_title.split("_百度百科", 1)[0]) or page_title

    anti_crawl = anti_crawl_detected(html, final_url, browser_title)
    basic_info = extract_basic_info(soup)
    catalog = extract_catalog(soup)
    summary = extract_summary(soup)
    media_items = extract_media_items(soup, final_url)
    body_lines, body_headings, paragraph_count, table_count = extract_body_blocks(soup)
    matched_catalog, missing_catalog = match_headings(catalog, body_headings)
    catalog_coverage = (len(matched_catalog) / len(catalog)) if catalog else 0.0

    reasons: list[str] = []
    if anti_crawl:
        reasons.append("hit Baidu anti-crawl/verification page")
    if not body_lines:
        reasons.append("no body blocks extracted")
    if catalog and catalog_coverage < min_catalog_coverage:
        reasons.append(f"catalog coverage {catalog_coverage:.0%} < {min_catalog_coverage:.0%}")

    crawl_time = now_iso()
    # The status depends on the Markdown length, so a placeholder line is
    # written first and swapped for the real status once the length is known.
    status_placeholder = "- 状态PENDING"
    lines = [
        f"# {page_title or source_name}",
        "",
        "## 完整度检查",
        "",
        status_placeholder,
        f"- 页面目录项:{len(catalog)}",
        f"- 正文标题数:{len(body_headings)}",
        f"- 目录覆盖率:{catalog_coverage:.0%} ({len(matched_catalog)}/{len(catalog)})",
        f"- 段落/列表:{paragraph_count}",
        f"- 表格:{table_count}",
        # Previously both branches were empty strings, so the report never
        # showed whether a verification page had been hit.
        f"- 反爬/验证页:{'是' if anti_crawl else '否'}",
    ]
    if missing_catalog:
        # Join with a separator; without one the missing headings ran together.
        lines.append("- 缺失目录项:" + "、".join(missing_catalog[:30]))
    lines.append("")
    lines.extend(
        [
            "## 页面正文 Markdown",
            "",
            "### 抓取信息(非原页面正文)",
            "",
            "- 数据源:百度百科",
            f"- 请求词条:{query_name}",
            f"- 页面标题:{page_title}",
            # Label and value are separated like the neighbouring lines.
            f"- 请求 URL:{requested_url}",
            f"- 最终 URL:{final_url}",
            f"- 抓取时间:{crawl_time}",
            "- 转换方式:rendered DOM fullpage + catalog validation",
            "",
        ]
    )
    if basic_info:
        lines.extend(["### 基本信息", "", "| 字段 | 值 |", "| --- | --- |"])
        for key, val in basic_info:
            lines.append(f"| {key.replace('|', '/')} | {val.replace('|', '/')} |")
        lines.append("")
    if summary:
        lines.extend(["### 摘要", ""])
        for item in summary:
            lines.extend([item, ""])
    if catalog:
        lines.extend(["### 页面目录", ""])
        lines.extend(f"- {item}" for item in catalog)
        lines.append("")
    if media_items:
        lines.extend(
            [
                "### 媒体证据(图片/视频)",
                "",
                "| 所属章节 | 类型 | 说明 | URL |",
                "| --- | --- | --- | --- |",
            ]
        )
        for item in media_items:
            label = item.caption or item.alt
            url = item.href or item.src
            lines.append(
                f"| {item.section.replace('|', '/')} | {item.kind} | "
                f"{label.replace('|', '/')} | {url.replace('|', '/')} |"
            )
        lines.append("")
    lines.extend(body_lines)

    markdown_without_status = "\n".join(lines).strip() + "\n"
    markdown_chars = len(markdown_without_status)
    if markdown_chars < min_chars:
        reasons.append(f"markdown chars {markdown_chars} < {min_chars}")
    status = "OK" if not reasons else "INCOMPLETE"
    reason = "ok" if not reasons else "; ".join(reasons)
    markdown = markdown_without_status.replace(status_placeholder, f"- 状态:{status}", 1)

    quality = PageQuality(
        status=status,
        reason=reason,
        page_title=page_title,
        final_url=final_url,
        markdown_chars=len(markdown),
        catalog_count=len(catalog),
        body_heading_count=len(body_headings),
        matched_catalog_count=len(matched_catalog),
        catalog_coverage=round(catalog_coverage, 4),
        missing_catalog_headings=missing_catalog,
        anti_crawl=anti_crawl,
        media_count=len(media_items),
    )
    return markdown, quality, media_items
def write_outputs(
    markdown: str,
    quality: PageQuality,
    media_items: list[MediaItem],
    out_dir: Path,
    filename: str,
    force: bool,
    write_json: bool,
) -> Path:
    """Write the Markdown file and, optionally, its JSON sidecar files.

    With *write_json* the ``.quality.json`` and ``.media.json`` sidecars are
    written next to the Markdown file; without it, existing sidecars of the
    same name are deleted.

    Returns:
        Path of the Markdown file.

    Raises:
        FileExistsError: The Markdown file exists and *force* is false.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    target = out_dir / filename
    if not force and target.exists():
        raise FileExistsError(f"Output exists, use --force: {target}")
    target.write_text(markdown, encoding="utf-8")

    quality_sidecar = target.with_suffix(".quality.json")
    media_sidecar = target.with_suffix(".media.json")
    if not write_json:
        # Remove sidecars left over from an earlier run with --write-json.
        for stale in (quality_sidecar, media_sidecar):
            if stale.exists():
                stale.unlink()
        return target

    def dump(payload) -> str:
        return json.dumps(payload, ensure_ascii=False, indent=2)

    quality_sidecar.write_text(dump(asdict(quality)), encoding="utf-8")
    media_sidecar.write_text(dump([asdict(entry) for entry in media_items]), encoding="utf-8")
    return target
def crawl_fullpage(args: argparse.Namespace) -> tuple[Path, PageQuality]:
    """Fetch one Baike page, convert it, and write the output files.

    The page is identified by ``args.url`` when given, otherwise by the item
    URL built from ``args.name``.

    Returns:
        ``(markdown_path, quality)``.

    Raises:
        SystemExit: Neither ``args.name`` nor ``args.url`` is set.
    """
    if not args.name and not args.url:
        raise SystemExit("use --name or --url")
    query_name = args.name or args.url
    requested_url = args.url or baike_url(args.name)
    if args.name:
        source_name = args.name
    else:
        # Derive the fallback name from the URL *path* only. Treating the
        # whole URL as a filesystem path leaked the query string and left the
        # name percent-encoded.
        from urllib.parse import unquote, urlsplit

        last_segment = urlsplit(requested_url).path.rstrip("/").rsplit("/", 1)[-1]
        source_name = clean_heading(unquote(last_segment))

    html, final_url, browser_title = fetch_rendered_page(
        requested_url,
        profile_dir=Path(args.profile_dir),
        headful=args.headful,
        manual_seconds=max(0, args.manual_seconds),
        timeout_ms=args.timeout_ms,
    )
    markdown, quality, media_items = build_markdown(
        html=html,
        source_name=source_name,
        query_name=query_name,
        requested_url=requested_url,
        final_url=final_url,
        browser_title=browser_title,
        min_catalog_coverage=args.min_catalog_coverage,
        min_chars=args.min_chars,
    )
    filename = args.output_name or f"{slugify(source_name)}.md"
    path = write_outputs(markdown, quality, media_items, Path(args.out_dir), filename, args.force, args.write_json)
    return path, quality
def write_batch_index(rows: list[dict], out_dir: Path) -> None:
    """Write ``index.md`` in *out_dir* summarising the batch results.

    Each row dict supplies the table cells through the keys "name",
    "page_title", "path", "catalog_matched", "catalog_count", "chars",
    "media_count" and "status"; missing keys fall back to empty/zero values.
    """

    def cell(value) -> str:
        # A literal "|" would split the Markdown table cell; replace it the
        # same way the per-page tables do.
        return str(value).replace("|", "/")

    out_dir.mkdir(parents=True, exist_ok=True)
    lines = [
        "# 贵州景点百度百科 Markdown 抓取清单",
        "",
        f"- 生成时间:{now_iso()}",
        f"- 文件数:{len(rows)}",
        f"- 成功:{sum(1 for row in rows if row.get('status') == 'OK')}",
        f"- 需复核:{sum(1 for row in rows if row.get('status') != 'OK')}",
        "- 说明:完整度只用页面自身目录与正文标题校验,不预设业务目录。",
        "",
        "| # | 名称 | 页面标题 | Markdown | 目录覆盖 | 字符 | 媒体 | 状态 |",
        "| ---: | --- | --- | --- | ---: | ---: | ---: | --- |",
    ]
    for idx, row in enumerate(rows, 1):
        md_name = Path(row["path"]).name if row.get("path") else ""
        md_link = f"[{md_name}](./{md_name})" if md_name else "-"
        lines.append(
            f"| {idx} | {cell(row.get('name', ''))} | {cell(row.get('page_title', ''))} | "
            f"{md_link} | {row.get('catalog_matched', 0)}/{row.get('catalog_count', 0)} | "
            f"{row.get('chars', 0)} | {row.get('media_count', 0)} | {row.get('status', '')} |"
        )
    (out_dir / "index.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
def crawl_guizhou_batch(args: argparse.Namespace) -> list[dict]:
    """Crawl the built-in Guizhou scenic-spot list and return one row per entry.

    Each entry is crawled with a copy of *args* whose name, url, out_dir and
    output_name are overridden. A failing entry is recorded with status
    "ERROR" and the batch continues. The index file is rewritten after every
    entry, so a partial run still leaves an up-to-date index.
    """
    out_dir = Path(args.out_dir) if args.out_dir != str(DEFAULT_OUT_DIR) else DEFAULT_GUIZHOU_BATCH_DIR
    out_dir.mkdir(parents=True, exist_ok=True)
    rows: list[dict] = []
    total = len(GUIZHOU_SCENIC_SPOTS)
    # Clamp the limit to the list size; otherwise the progress total is wrong
    # and the loop sleeps once more after the final entry.
    limit = min(args.limit, total) if args.limit and args.limit > 0 else total
    for idx, entry in enumerate(GUIZHOU_SCENIC_SPOTS[:limit], 1):
        name = entry["name"]
        item_args = argparse.Namespace(**vars(args))
        item_args.name = name
        item_args.url = entry.get("url")
        item_args.out_dir = str(out_dir)
        item_args.output_name = f"{idx:02d}_{slugify(name)}.md"
        print(f"[batch] {idx:02d}/{limit} {name}", flush=True)
        try:
            path, quality = crawl_fullpage(item_args)
            rows.append(
                {
                    "name": name,
                    "path": str(path),
                    "status": quality.status,
                    "page_title": quality.page_title,
                    "catalog_matched": quality.matched_catalog_count,
                    "catalog_count": quality.catalog_count,
                    "chars": quality.markdown_chars,
                    "media_count": quality.media_count,
                    "reason": quality.reason,
                }
            )
            print(
                f" [{quality.status}] title={quality.page_title} "
                f"catalog={quality.matched_catalog_count}/{quality.catalog_count} "
                f"chars={quality.markdown_chars} media={quality.media_count}",
                flush=True,
            )
        except Exception as exc:  # noqa: BLE001 - one bad page must not abort the batch
            rows.append(
                {
                    "name": name,
                    "path": "",
                    "status": "ERROR",
                    "page_title": "",
                    "catalog_matched": 0,
                    "catalog_count": 0,
                    "chars": 0,
                    "media_count": 0,
                    "reason": str(exc)[:200],
                }
            )
            print(f" [ERROR] {str(exc)[:200]}", flush=True)
        write_batch_index(rows, out_dir)
        if idx < limit and args.sleep > 0:
            time.sleep(args.sleep + random.random() * 0.7)
    return rows
def main() -> int:
    """Parse the command line, run single-page or batch mode, return the exit code.

    The exit code is 2 when ``--strict`` is set and any result is not "OK",
    otherwise 0.
    """
    cli = argparse.ArgumentParser()
    option = cli.add_argument
    option("--name", help="Baidu Baike entry name")
    option("--url", help="Exact Baidu Baike URL; prefer item URLs with numeric lemma ID")
    option("--batch-guizhou", action="store_true", help="Crawl built-in Guizhou scenic-spot sample list")
    option("--out-dir", default=str(DEFAULT_OUT_DIR))
    option("--output-name", help="Markdown filename, for example 02_小七孔风景区_6899702.md")
    option("--profile-dir", default=str(DEFAULT_PROFILE_DIR))
    option("--headful", action="store_true", help="Open a visible browser; useful when Baidu asks for verification")
    option("--manual-seconds", type=int, default=0, help="Wait this many seconds for manual verification if needed")
    option("--timeout-ms", type=int, default=60000)
    option("--min-catalog-coverage", type=float, default=0.9)
    option("--min-chars", type=int, default=2000)
    option("--limit", type=int, help="Batch mode: crawl only first N items")
    option("--sleep", type=float, default=1.0, help="Batch mode: delay between pages")
    option("--write-json", action="store_true", help="Also write .quality.json and .media.json sidecar files")
    option("--force", action="store_true")
    option("--strict", action="store_true", help="Exit 2 when quality status is not OK")
    args = cli.parse_args()

    if not args.batch_guizhou:
        path, quality = crawl_fullpage(args)
        print(
            f"[{quality.status}] file={path} chars={quality.markdown_chars} "
            f"catalog={quality.matched_catalog_count}/{quality.catalog_count} "
            f"headings={quality.body_heading_count} reason={quality.reason}",
            flush=True,
        )
        failed = quality.status != "OK"
        return 2 if args.strict and failed else 0

    rows = crawl_guizhou_batch(args)
    bad = [row for row in rows if row.get("status") != "OK"]
    # Report the directory the batch actually used.
    if args.out_dir == str(DEFAULT_OUT_DIR):
        shown_dir = DEFAULT_GUIZHOU_BATCH_DIR
    else:
        shown_dir = args.out_dir
    print(f"[done] {shown_dir}", flush=True)
    return 2 if args.strict and bad else 0


if __name__ == "__main__":
    raise SystemExit(main())