Files
bxh/scripts/baike_to_markdown.py

865 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Render Baidu Baike pages and save clean Markdown evidence files.
This is the "web page -> Markdown -> extraction" bridge. It keeps the rendered
page structure that matters for schema work: source metadata, basic-info pairs,
headings, paragraphs, lists, and tables.
"""
from __future__ import annotations
import argparse
import json
import random
import re
import sys
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import quote
from bs4 import BeautifulSoup
# Directory one level above the folder that contains this script; also
# prepended to sys.path before importing the optional app.agents.web_agent.
ROOT = Path(__file__).resolve().parents[1]
# Default value of the --out-dir command-line option.
OUT_DIR = ROOT / "schema搭建" / "baidu_baike_md_data"
# Entry names crawled, in this order, when --batch-guizhou is given.
SCENIC_SPOTS = [
    "黄果树瀑布",
    "荔波小七孔景区",
    "梵净山",
    "西江千户苗寨",
    "青岩古镇",
    "镇远古城",
    "肇兴侗寨",
    "万峰林",
    "马岭河峡谷",
    "织金洞",
    "百里杜鹃风景名胜区",
    "赤水丹霞",
    "龙宫风景区",
    "遵义会议会址",
    "甲秀楼",
    "黔灵山公园",
    "花溪公园",
    "天河潭",
    "南江大峡谷",
    "乌蒙大草原",
]
# Alternative entry names that query_terms() appends after the primary name.
# Names without an entry here are queried under the primary name only.
BAIKE_QUERY_ALIASES = {
    "黄果树瀑布": ["黄果树大瀑布", "黄果树风景名胜区", "安顺市黄果树大瀑布景区"],
    "荔波小七孔景区": ["荔波樟江风景名胜区", "小七孔"],
    "西江千户苗寨": ["西江千户苗寨景区", "雷山县西江千户苗寨景区"],
    "青岩古镇": ["贵阳市青岩古镇景区", "青岩古镇景区"],
    "镇远古城": ["黔东南苗族侗族自治州镇远古城旅游景区"],
    "肇兴侗寨": ["黎平县肇兴侗寨景区", "肇兴侗寨景区"],
    "万峰林": ["万峰林景区", "兴义万峰林"],
    "马岭河峡谷": ["马岭河峡谷风景名胜区", "马岭河峡谷景区"],
    "织金洞": ["织金洞风景名胜区", "毕节织金洞"],
    "百里杜鹃风景名胜区": ["百里杜鹃", "百里杜鹃风景区", "百里杜鹃景区", "贵州百里杜鹃风景名胜区"],
    "赤水丹霞": ["赤水丹霞旅游区", "赤水丹霞国家地质公园"],
    "遵义会议会址": ["遵义会议会址景区"],
    "甲秀楼": ["贵阳甲秀楼"],
    "花溪公园": ["贵阳市花溪公园"],
    "天河潭": ["天河潭旅游度假区", "天河潭风景区"],
    "乌蒙大草原": ["乌蒙大草原景区", "盘州乌蒙大草原"],
}
# Whitelist of basic-info field names: extract_basic_info() keeps only these
# keys, and baike_text_to_markdown() searches the flattened text for them.
SCHEMA_FIELD_HINTS = {
    "中文名",
    "外文名",
    "地理位置",
    "气候条件",
    "开放时间",
    "景点级别",
    "门票价格",
    "占地面积",
    "著名景点",
    "建议游玩时长",
    "适宜游玩季节",
    "所属国家",
    "所属城市",
    "保护级别",
    "主要景观",
    "最佳旅游时间",
    "海拔",
    "管理单位",
    "别名",
    "类型",
}
# Substrings that mark a line as site boilerplate. is_bad_line() rejects any
# text containing one of them; external_markdown_to_markdown() drops such lines.
BAD_LINE_TOKENS = [
    "百度首页",
    "登录",
    "注册",
    "打开APP",
    "秒懂百科",
    "百度百科合作平台",
    "使用百度前必读",
    "百科协议",
    "隐私政策",
    "©",
    "京ICP",
    "营业执照",
    "投诉建议",
    "词条统计",
    "分享你的世界",
    "相关星图",
    "查看更多",
    "上传视频",
]
# Substrings that looks_like_anti_crawl() searches for in the page title and
# the leading page text to recognise a verification / blocking page.
ANTI_CRAWL_TOKENS = [
    "百度安全验证",
    "验证码",
    "网络不给力",
    "系统检测到异常",
    "captcha",
    "anticrawl",
]
@dataclass
class MarkdownPage:
    """Outcome of crawling one entry; one row of the manifest written by write_manifest()."""

    name: str  # entry name the caller asked for
    query_name: str  # name (primary or alias) whose page was actually used
    requested_url: str  # URL built by baike_url() for query_name
    final_url: str  # URL reported by the fetcher; "cached" on a cache hit, "" on failure
    title: str  # page title, falling back to the browser title or the entry name
    markdown_file: str  # file name (not full path) of the Markdown file inside the output directory
    markdown_chars: int  # len() of the Markdown for fresh pages; file size in bytes for cached/failed ones
    paragraph_count: int  # paragraphs plus list items emitted (0 for cached/failed records)
    heading_count: int  # headings emitted (0 for cached/failed records)
    basic_info_count: int  # rows of the basic-info table (0 for cached/failed records)
    table_count: int  # tables emitted (0 for cached/failed records)
    error: str = ""  # empty string means success; otherwise the last error message
def _import_web_agent_constants():
    """Return the browser user agent, Chromium launch args and stealth script.

    The values are imported from ``app.agents.web_agent`` when that module can
    be imported; otherwise built-in defaults are used so the script still runs
    on its own.

    Returns:
        A ``(user_agent, chrome_args, stealth_js)`` tuple.
    """
    # Make the repository root importable once; inserting unconditionally would
    # add a duplicate sys.path entry on every call.
    root = str(ROOT)
    if root not in sys.path:
        sys.path.insert(0, root)
    try:
        from app.agents.web_agent import _CHROME_ARGS, _STEALTH_JS, _UA
    except Exception:
        # Deliberately broad: any failure while importing the optional module
        # (missing package, missing names, import-time errors) selects the
        # standalone defaults below.
        _UA = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        )
        _CHROME_ARGS = [
            "--disable-blink-features=AutomationControlled",
            "--no-first-run",
            "--no-default-browser-check",
            "--disable-sync",
            "--disable-default-apps",
            "--no-sandbox",
            "--disable-dev-shm-usage",
        ]
        _STEALTH_JS = "Object.defineProperty(navigator,'webdriver',{get:()=>undefined});"
    return _UA, _CHROME_ARGS, _STEALTH_JS
def compact(text: str) -> str:
    """Collapse each whitespace run to one space and trim both ends.

    A falsy argument (``None`` or ``""``) yields an empty string.
    """
    source = text if text else ""
    squeezed = re.sub(r"\s+", " ", source)
    return squeezed.strip()
def slugify(name: str, idx: int | None = None) -> str:
    """Turn an entry name into a file-system-safe file stem.

    Runs of whitespace and of the characters ``\\ / : * ? " < > |`` are replaced
    by a single underscore, and leading/trailing underscores are removed.

    Args:
        name: Entry name to convert.
        idx: Optional sequence number; when given it is prepended as a
            zero-padded two-digit prefix (``03_name``).

    Returns:
        The safe stem, or ``"baike_page"`` (with the prefix, if any) when
        nothing usable remains.
    """
    # The class must use a single-backslash \s: in a raw string "\\s" is a
    # literal backslash followed by the letter "s", which left whitespace in
    # place and replaced every "s" in the name.
    safe = re.sub(r'[\\/:*?"<>|\s]+', "_", name).strip("_")
    stem = safe or "baike_page"
    if idx is None:
        return stem
    return f"{idx:02d}_{stem}"
def baike_url(name: str) -> str:
    """Build the Baidu Baike item URL for *name*, percent-encoding the name."""
    encoded_name = quote(name)
    return "https://baike.baidu.com/item/" + encoded_name
def query_terms(name: str) -> list[str]:
    """Return *name* followed by its configured aliases, without duplicates.

    The first occurrence of each term keeps its position, so the primary name
    always comes first.
    """
    candidates = [name]
    candidates.extend(BAIKE_QUERY_ALIASES.get(name, []))
    # dict keys keep insertion order, which gives an order-preserving de-dup.
    return list(dict.fromkeys(candidates))
def is_bad_line(text: str) -> bool:
    """Tell whether *text* is noise that should not reach the Markdown output.

    A line is rejected when, after whitespace compaction, it is empty or a
    single character, contains any ``BAD_LINE_TOKENS`` substring, is the
    table-of-contents label or a numbered table-of-contents entry, or is a
    very long string that starts like a JSON object or array.
    """
    line = compact(text)
    if len(line) <= 1:
        # Covers both the empty string and one-character fragments.
        return True
    for token in BAD_LINE_TOKENS:
        if token in line:
            return True
    if line == "目录":
        return True
    if re.match(r"^\d+\s+[\u4e00-\u9fa5A-Za-z]", line):
        return True
    # Long text opening with "{" or "[" is treated as an embedded data blob.
    return len(line) > 500 and line.startswith(("{", "["))
def clean_heading(text: str) -> str:
    """Normalise heading text.

    Whitespace is compacted, a trailing "播报 编辑" control label is removed,
    and surrounding ``#`` and space characters are stripped.
    """
    heading = re.sub(r"\s*播报\s*编辑\s*$", "", compact(text))
    return heading.strip("# ")
def looks_like_anti_crawl(html: str, final_url: str, title: str = "") -> bool:
    """Detect a verification / blocking page instead of a real entry.

    Args:
        html: Page HTML.
        final_url: URL the browser ended up on.
        title: Page title, checked together with the page text.

    Returns:
        True when the URL contains "anticrawl" or "captcha", or when the title
        plus the first 3000 characters of page text contain any
        ``ANTI_CRAWL_TOKENS`` entry.
    """
    for marker in ("anticrawl", "captcha"):
        if marker in final_url:
            return True
    page_text = BeautifulSoup(html, "html.parser").get_text(" ", strip=True)
    haystack = compact(" ".join([title, page_text[:3000]]))
    for token in ANTI_CRAWL_TOKENS:
        if token in haystack:
            return True
    return False
def fetch_rendered_html(url: str, timeout_ms: int = 45000) -> tuple[str, str, str]:
    """Load *url* in headless Chromium and return the rendered page.

    Args:
        url: Page to open.
        timeout_ms: Navigation timeout passed to ``page.goto``.

    Returns:
        A ``(html, final_url, title)`` tuple taken after a short randomised
        wait and one scroll.

    Raises:
        Whatever Playwright raises for launch or navigation failures; only the
        scroll step is best-effort.
    """
    from playwright.sync_api import sync_playwright

    ua, chrome_args, stealth_js = _import_web_agent_constants()
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=chrome_args,
            ignore_default_args=["--enable-automation"],
        )
        # try/finally so the browser is closed explicitly even when navigation
        # or content extraction raises (e.g. a goto timeout).
        try:
            ctx = browser.new_context(
                user_agent=ua,
                locale="zh-CN",
                viewport=random.choice([
                    {"width": 1440, "height": 900},
                    {"width": 1366, "height": 768},
                    {"width": 1600, "height": 1000},
                ]),
                extra_http_headers={
                    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                    "Referer": "https://www.baidu.com/",
                },
            )
            ctx.add_init_script(stealth_js)
            page = ctx.new_page()
            page.goto(url, timeout=timeout_ms, wait_until="domcontentloaded")
            page.wait_for_timeout(random.randint(1200, 2200))
            try:
                page.mouse.wheel(0, random.randint(700, 1500))
                page.wait_for_timeout(random.randint(600, 1100))
            except Exception:
                # Scrolling is optional; a failure here must not discard a
                # page that has already loaded.
                pass
            final_url = page.url
            title = page.title() or ""
            html = page.content()
        finally:
            browser.close()
    return html, final_url, title
def extract_basic_info(soup: BeautifulSoup) -> list[tuple[str, str]]:
    """Collect whitelisted (field, value) pairs from the page's basic-info area.

    Two layouts are read: elements matching ``.basicInfo-item.name`` /
    ``.basicInfo-item.value`` paired by position, and ``<dt>`` elements with
    their next ``<dd>`` sibling. Only fields listed in ``SCHEMA_FIELD_HINTS``
    are kept, and each distinct pair is returned once, in first-seen order.
    """
    collected: list[tuple[str, str]] = []
    known = set()

    def keep(field: str, content: str) -> None:
        entry = (field, content)
        if entry in known:
            return
        known.add(entry)
        collected.append(entry)

    name_nodes = soup.select(".basicInfo-item.name")
    value_nodes = soup.select(".basicInfo-item.value")
    for name_node, value_node in zip(name_nodes, value_nodes, strict=False):
        field = compact(name_node.get_text(" ", strip=True)).rstrip(":")
        content = compact(value_node.get_text(" ", strip=True))
        if field in SCHEMA_FIELD_HINTS and field and content:
            keep(field, content)

    for dt in soup.find_all("dt"):
        dd = dt.find_next_sibling("dd")
        field = compact(dt.get_text(" ", strip=True)).rstrip(":")
        content = compact(dd.get_text(" ", strip=True)) if dd else ""
        if field not in SCHEMA_FIELD_HINTS or not (field and content):
            continue
        # The dt/dd layout is generic markup, so it is additionally bounded by
        # length and screened for boilerplate.
        if len(field) > 24 or len(content) > 260:
            continue
        if is_bad_line(field + content):
            continue
        keep(field, content)
    return collected
def table_to_markdown(table) -> list[str]:
    """Render an HTML table element as Markdown pipe-table lines.

    Cell text is compacted and ``|`` is replaced by ``/``. Empty cells are
    dropped, each row is cut to at most six cells, and shorter rows are padded
    with empty cells. The first row becomes the header.

    Returns:
        The table lines, or an empty list when no row has any text.
    """
    grid: list[list[str]] = []
    for tr in table.find_all("tr"):
        texts = (
            compact(cell.get_text(" ", strip=True)).replace("|", "/")
            for cell in tr.find_all(["th", "td"])
        )
        # NOTE(review): dropping empty cells shifts the remaining cells of
        # that row to the left — confirm this is intended.
        filled = [t for t in texts if t]
        if filled:
            grid.append(filled[:6])
    if not grid:
        return []

    width = max(map(len, grid))

    def render(cells: list[str]) -> str:
        return "| " + " | ".join(cells) + " |"

    header, *body = [row + [""] * (width - len(row)) for row in grid]
    return [render(header), render(["---"] * width), *(render(row) for row in body)]
def pick_content_root(soup: BeautifulSoup):
    """Choose the element that holds the article body.

    Candidate selectors are tried in order; the first match whose compacted
    text is longer than 400 characters wins. When none qualifies, ``<body>``
    is returned, or the soup itself if there is no body.
    """
    selectors = [
        "div.J-lemma-content",
        "div[class*='lemma-content']",
        "div[id='J-lemma-main-wrapper'] div[class*='mainContent']",
        "div[class*='mainContent']",
        "main",
        "article",
    ]
    for css in selectors:
        candidate = soup.select_one(css)
        if not candidate:
            continue
        if len(compact(candidate.get_text(" ", strip=True))) > 400:
            return candidate
    return soup.body or soup
def baike_tag_kind(tag) -> str:
    """Classify a Baidu Baike DOM node (old or new layout) as a Markdown block.

    Returns:
        ``"heading"``, ``"list"``, ``"paragraph"``, ``"stop"`` (the reference
        section was reached) or ``""`` for nodes that should be ignored.
    """
    tag_name = tag.name
    if tag_name in ("h1", "h2", "h3", "h4"):
        return "heading"
    simple_kinds = {"li": "list", "p": "paragraph"}
    if tag_name in simple_kinds:
        return simple_kinds[tag_name]
    if tag_name != "div":
        return ""

    # Only <div> nodes reach this point; they are classified by class and id.
    class_attr = " ".join(tag.get("class") or [])
    node_id = tag.get("id") or ""
    is_reference = "lemmaReference" in class_attr or node_id == "J-lemma-reference"
    if is_reference:
        return "stop"
    is_title = "paraTitle" in class_attr or re.search(r"\blevel-\d", class_attr)
    if is_title:
        return "heading"
    has_body_marker = "content" in class_attr or "MARK_MODULE" in class_attr
    if "para" in class_attr and has_body_marker:
        return "paragraph"
    return ""
def html_to_markdown(html: str, source_name: str, query_name: str, requested_url: str, final_url: str) -> tuple[str, dict]:
    """Convert rendered Baidu Baike HTML into a Markdown evidence document.

    The document has a metadata section, an optional basic-info table and the
    page body (headings, paragraphs, list items, tables). When the structured
    walk finds no paragraph at all, the body is rebuilt line by line from the
    plain page text and ``fallback_body_used`` is set.

    Args:
        html: Rendered page HTML.
        source_name: Entry name used as the document title.
        query_name: Name that was actually requested (primary name or alias).
        requested_url: URL that was requested.
        final_url: URL the browser ended up on.

    Returns:
        ``(markdown, stats)`` where ``stats`` has the keys ``page_title``,
        ``basic_info_count``, ``heading_count``, ``paragraph_count``,
        ``table_count``, ``markdown_chars`` and ``fallback_body_used``.
    """
    stop_prefixes = ("参考资料", "词条标签", "免责声明")

    def strip_markers(value: str) -> str:
        # Remove citation markers such as [3] / [编辑] and a trailing
        # "播报 编辑" control label.
        value = re.sub(r"\[(\d+|编辑)\]", "", value).strip()
        return re.sub(r"\s*播报\s*编辑\s*$", "", value).strip()

    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(["script", "style", "noscript", "iframe", "canvas", "svg", "button", "input", "form"]):
        tag.decompose()
    for selector in ("nav", "footer", "header", "[class*='navbar']", "[class*='toolbar']", "[class*='share']"):
        for tag in soup.select(selector):
            tag.decompose()

    h1 = soup.select_one("h1")
    page_title = clean_heading(h1.get_text(" ", strip=True)) if h1 is not None else source_name
    basic_info = extract_basic_info(soup)
    root = pick_content_root(soup)
    now = datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")
    lines = [
        f"# {source_name}",
        "",
        "## 元数据",
        "",
        "- 数据源:百度百科",
        f"- 请求词条:{query_name}",
        f"- 页面标题:{page_title}",
        # The two URL lines previously had no separator between label and value.
        f"- 请求 URL:{requested_url}",
        f"- 最终 URL:{final_url}",
        f"- 抓取时间:{now}",
        "",
    ]
    if basic_info:
        lines.extend(["## 基本信息", "", "| 字段 | 值 |", "| --- | --- |"])
        for key, val in basic_info:
            lines.append(f"| {key.replace('|', '/')} | {val.replace('|', '/')} |")
        lines.append("")
    lines.extend(["## 页面正文 Markdown", ""])

    def end_list() -> None:
        # A block written directly after "- item" would be read as a lazy
        # continuation of that list item, so close the list with a blank line.
        if lines and lines[-1].startswith("- "):
            lines.append("")

    seen_text = set()
    heading_count = 0
    paragraph_count = 0
    table_count = 0
    fallback_body_used = False
    heading_levels = {"h1": "##", "h2": "###", "h3": "####", "h4": "#####", "div": "###"}

    for tag in root.find_all(["h1", "h2", "h3", "h4", "p", "li", "table", "div"], recursive=True):
        if tag.name == "table":
            md_table = table_to_markdown(tag)
            key = "\n".join(md_table)
            if md_table and key not in seen_text:
                seen_text.add(key)
                end_list()
                lines.extend(md_table)
                lines.append("")
                table_count += 1
            continue
        kind = baike_tag_kind(tag)
        if kind == "stop":
            break
        if not kind:
            continue
        text = compact(tag.get_text(" ", strip=True))
        if is_bad_line(text):
            continue
        text = strip_markers(text)
        # Nested nodes repeat their parent's text; emit each text only once.
        if not text or text in seen_text:
            continue
        if text.startswith(stop_prefixes):
            # Everything from the references / tags / disclaimer on is not body.
            break
        seen_text.add(text)
        if kind == "heading":
            heading = clean_heading(text)
            if heading and not is_bad_line(heading):
                end_list()
                lines.extend([f"{heading_levels[tag.name]} {heading}", ""])
                heading_count += 1
        elif kind == "list":
            if 2 <= len(text) <= 240:
                lines.append(f"- {text}")
                paragraph_count += 1
        else:
            end_list()
            lines.extend([text, ""])
            paragraph_count += 1

    if paragraph_count == 0:
        # The structured walk found no body text: fall back to the plain page
        # text and guess headings from short lines without punctuation.
        fallback_body_used = True
        fallback_text_root = soup.body or root
        for raw_line in fallback_text_root.get_text("\n", strip=True).splitlines():
            text = strip_markers(compact(raw_line))
            if is_bad_line(text) or text in seen_text:
                continue
            if text.startswith(stop_prefixes):
                break
            if len(text) < 4:
                continue
            seen_text.add(text)
            if len(text) <= 24 and not re.search(r"[。!?;,,]", text):
                lines.extend([f"### {clean_heading(text)}", ""])
                heading_count += 1
            else:
                lines.extend([text, ""])
                paragraph_count += 1

    markdown = "\n".join(lines).strip() + "\n"
    stats = {
        "page_title": page_title,
        "basic_info_count": len(basic_info),
        "heading_count": heading_count,
        "paragraph_count": paragraph_count,
        "table_count": table_count,
        "markdown_chars": len(markdown),
        "fallback_body_used": fallback_body_used,
    }
    return markdown, stats
def fetch_existing_baike_text(url: str) -> tuple[str | None, str]:
    """Fetch entry text through the project's ``fetch_baidu_baike_text`` helper.

    Args:
        url: Baidu Baike item URL.

    Returns:
        Whatever ``app.agents.web_agent.fetch_baidu_baike_text`` returns;
        callers in this file unpack it as ``(raw_text, final_url)``.

    Raises:
        ImportError: If ``app.agents.web_agent`` cannot be imported.
    """
    # This function runs inside a retry loop; insert the repository root only
    # once instead of adding a duplicate sys.path entry per call.
    root = str(ROOT)
    if root not in sys.path:
        sys.path.insert(0, root)
    from app.agents.web_agent import fetch_baidu_baike_text

    return fetch_baidu_baike_text(url)
def fetch_existing_baike_text_with_retries(url: str, attempts: int = 3) -> tuple[str | None, str]:
    """Call ``fetch_existing_baike_text`` until it yields usable text.

    Text counts as usable when it has at least 500 characters and does not
    contain the "百度安全验证" verification marker.

    Args:
        url: Baidu Baike item URL.
        attempts: Maximum number of fetches.

    Returns:
        ``(raw_text, final_url)`` on success, otherwise ``(None, final_url)``
        where ``final_url`` is the last non-empty URL reported (or *url*).
    """
    last_final = url
    for attempt in range(1, attempts + 1):
        raw_text, final_url = fetch_existing_baike_text(url)
        last_final = final_url or last_final
        if raw_text and len(raw_text) >= 500 and "百度安全验证" not in raw_text:
            return raw_text, last_final
        # Back off only when another attempt follows; sleeping after the final
        # failure would just delay the caller.
        if attempt < attempts:
            time.sleep(1.2 * attempt + random.random())
    return None, last_final
def fetch_crawl4ai_markdown(url: str) -> tuple[str | None, str]:
    """Fetch *url* as Markdown with Crawl4AI when that package is installed.

    Returns:
        ``(markdown, final_url)`` on success. On failure the first item is
        ``None`` and the second is a message starting with
        ``"crawl4ai unavailable: "`` (import failed) or ``"crawl4ai failed: "``
        (the crawl raised), so the script stays runnable without Crawl4AI.
    """
    try:
        import asyncio
        from crawl4ai import AsyncWebCrawler, BrowserConfig, CacheMode, CrawlerRunConfig
        from crawl4ai.content_filter_strategy import PruningContentFilter
        from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
    except Exception as exc:  # noqa: BLE001
        return None, f"crawl4ai unavailable: {str(exc)[:160]}"

    async def _crawl() -> tuple[str | None, str]:
        pruning = PruningContentFilter(threshold=0.35, threshold_type="fixed")
        generator = DefaultMarkdownGenerator(content_filter=pruning)
        browser_settings = BrowserConfig(headless=True)
        run_settings = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS,
            markdown_generator=generator,
        )
        async with AsyncWebCrawler(config=browser_settings) as crawler:
            result = await crawler.arun(url=url, config=run_settings)
            # Prefer the filtered markdown, then the raw markdown, then the
            # plain string form of the markdown object.
            for attr in ("fit_markdown", "raw_markdown"):
                md = getattr(result.markdown, attr, None)
                if md:
                    break
            else:
                md = str(result.markdown or "")
            resolved_url = getattr(result, "url", url) or url
            return (md.strip() if md else None), resolved_url

    try:
        return asyncio.run(_crawl())
    except Exception as exc:  # noqa: BLE001
        return None, f"crawl4ai failed: {str(exc)[:200]}"
def baike_text_to_markdown(
    raw_text: str,
    source_name: str,
    query_name: str,
    requested_url: str,
    final_url: str,
) -> tuple[str, dict]:
    """Convert flattened entry text into a Markdown evidence document.

    The text is first compacted to a single line. The title is read from a
    "词条名:" prefix, basic-info values are located by searching for the
    ``SCHEMA_FIELD_HINTS`` field names, and section headings are recovered from
    "<title> 播报 编辑" markers in the part after "正文:".

    Args:
        raw_text: Text returned by the text-based fetcher.
        source_name: Entry name used as the document title.
        query_name: Name that was actually requested.
        requested_url: URL that was requested.
        final_url: URL reported by the fetcher.

    Returns:
        ``(markdown, stats)`` where ``stats`` has the keys ``page_title``,
        ``basic_info_count``, ``heading_count``, ``paragraph_count``,
        ``table_count`` (always 0) and ``markdown_chars``.
    """
    text = compact(raw_text)
    title = source_name
    m_title = re.search(r"词条名:(.{1,80}?)(?=\s(?:中文名|外文名|地理位置|正文)|$)", text)
    if m_title:
        title = compact(m_title.group(1))

    basic_info: list[tuple[str, str]] = []
    # Longest names first so a longer field name wins inside the alternation.
    # The name itself breaks ties: sorting a set by length alone made the row
    # order depend on set iteration order and therefore vary between runs.
    keys = ["词条名", *sorted(SCHEMA_FIELD_HINTS, key=lambda k: (-len(k), k))]
    key_alt = "|".join(re.escape(k) for k in keys)
    for key in keys:
        if key == "词条名":
            continue
        # NOTE(review): the search covers the whole text, so a field name that
        # appears only in the body prose can produce a false pair — confirm the
        # input layout before narrowing it.
        m = re.search(rf"{re.escape(key)}(.{{1,260}}?)(?=\s(?:{key_alt}|正文)|$)", text)
        if not m:
            continue
        # The capture starts right after the field name; drop a separator colon
        # if the input writes pairs as "name:value" (assumed possible, since
        # the title is matched as "词条名:").
        value = compact(m.group(1)).lstrip("::").strip()
        if "播报" in value or re.match(r"^\d+[\u4e00-\u9fa5]", value):
            continue
        if value and not is_bad_line(key + value):
            basic_info.append((key, value))

    body = text.split("正文:", 1)[1] if "正文:" in text else text
    body = re.sub(r"\[(\d+|编辑)\]", "", body)
    # Turn "<title> 播报 编辑" section markers into Markdown headings set off by
    # blank lines; these are the only block separators in the compacted text.
    body = re.sub(
        r"\s*([\u4e00-\u9fa5A-Za-z0-9·、()]{2,28})\s+播报\s+编辑\s*",
        r"\n\n### \1\n\n",
        body,
    )
    now = datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")
    lines = [
        f"# {source_name}",
        "",
        "## 元数据",
        "",
        "- 数据源:百度百科",
        f"- 请求词条:{query_name}",
        f"- 页面标题:{title}",
        # These three lines previously had no separator after the label.
        f"- 请求 URL:{requested_url}",
        f"- 最终 URL:{final_url}",
        f"- 抓取时间:{now}",
        "- 转换方式:web_agent.fetch_baidu_baike_text fallback",
        "",
    ]
    if basic_info:
        lines.extend(["## 基本信息", "", "| 字段 | 值 |", "| --- | --- |"])
        for key, val in basic_info:
            lines.append(f"| {key.replace('|', '/')} | {val.replace('|', '/')} |")
        lines.append("")
    lines.extend(["## 页面正文 Markdown", ""])

    heading_count = 0
    paragraph_count = 0
    for block in re.split(r"\n{2,}", body):
        block = compact(block)
        if not block or is_bad_line(block):
            continue
        if block.startswith("### "):
            heading = clean_heading(block.removeprefix("### "))
            if heading and not is_bad_line(heading):
                lines.extend([f"### {heading}", ""])
                heading_count += 1
            continue
        lines.extend([block, ""])
        paragraph_count += 1

    markdown = "\n".join(lines).strip() + "\n"
    stats = {
        "page_title": title,
        "basic_info_count": len(basic_info),
        "heading_count": heading_count,
        "paragraph_count": paragraph_count,
        "table_count": 0,
        "markdown_chars": len(markdown),
    }
    return markdown, stats
def external_markdown_to_markdown(
    raw_markdown: str,
    source_name: str,
    query_name: str,
    requested_url: str,
    final_url: str,
    engine: str,
) -> tuple[str, dict]:
    """Wrap Markdown produced by an external engine in the evidence layout.

    Lines containing any ``BAD_LINE_TOKENS`` substring are removed, trailing
    whitespace is trimmed, and the metadata header is prepended.

    Args:
        raw_markdown: Markdown returned by the external engine.
        source_name: Entry name used as the document and page title.
        query_name: Name that was actually requested.
        requested_url: URL that was requested.
        final_url: URL reported by the engine.
        engine: Engine name recorded in the metadata.

    Returns:
        ``(markdown, stats)`` where ``stats`` has the keys ``page_title``,
        ``basic_info_count`` (always 0), ``heading_count``,
        ``paragraph_count``, ``table_count`` and ``markdown_chars``.
    """
    raw_markdown = raw_markdown.strip()
    raw_markdown = "\n".join(
        line.rstrip()
        for line in raw_markdown.splitlines()
        if not any(token in line for token in BAD_LINE_TOKENS)
    ).strip()
    now = datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")
    lines = [
        f"# {source_name}",
        "",
        "## 元数据",
        "",
        "- 数据源:百度百科",
        f"- 请求词条:{query_name}",
        f"- 页面标题:{source_name}",
        # The two URL lines previously had no separator between label and value.
        f"- 请求 URL:{requested_url}",
        f"- 最终 URL:{final_url}",
        f"- 抓取时间:{now}",
        f"- 转换方式:{engine}",
        "",
        "## 页面正文 Markdown",
        "",
        raw_markdown,
        "",
    ]
    markdown = "\n".join(lines).strip() + "\n"
    heading_count = len(re.findall(r"(?m)^#{1,6}\s+", raw_markdown))
    paragraph_count = len([x for x in re.split(r"\n{2,}", raw_markdown) if len(compact(x)) >= 20])

    # Count tables as runs of consecutive pipe-prefixed lines. Counting "\n|"
    # occurrences gave the number of table rows and missed a table that starts
    # on the very first line.
    table_count = 0
    in_table = False
    for line in raw_markdown.splitlines():
        is_row = line.lstrip().startswith("|")
        if is_row and not in_table:
            table_count += 1
        in_table = is_row

    stats = {
        "page_title": source_name,
        "basic_info_count": 0,
        "heading_count": heading_count,
        "paragraph_count": paragraph_count,
        "table_count": table_count,
        "markdown_chars": len(markdown),
    }
    return markdown, stats
def stats_good_enough(stats: dict) -> bool:
    """Decide whether a conversion result is rich enough to be saved.

    A result built by the plain-text body fallback is never accepted.
    Otherwise it passes with at least 800 characters and 2 paragraphs, or with
    at least 600 characters, 4 paragraphs and 3 headings. Missing keys count
    as 0.
    """
    if stats.get("fallback_body_used"):
        return False
    chars = stats.get("markdown_chars", 0)
    paragraphs = stats.get("paragraph_count", 0)
    if chars >= 800 and paragraphs >= 2:
        return True
    headings = stats.get("heading_count", 0)
    return chars >= 600 and paragraphs >= 4 and headings >= 3
def crawl_one(
    name: str,
    out_dir: Path,
    idx: int | None = None,
    force: bool = False,
    attempts: int = 3,
) -> MarkdownPage:
    """Crawl one entry and write its Markdown file into *out_dir*.

    For every attempt and every query term (primary name, then aliases) three
    engines are tried in order: rendered HTML, the project's text fetcher, and
    Crawl4AI. The first result accepted by ``stats_good_enough`` is saved and
    returned. If nothing succeeds, a short failure stub is written instead.

    Args:
        name: Entry name to crawl.
        out_dir: Output directory; created when missing.
        idx: Optional sequence number used as a file-name prefix.
        force: Re-crawl even when a file larger than 500 bytes already exists.
        attempts: Number of passes over all query terms.

    Returns:
        A ``MarkdownPage`` describing the saved file. ``final_url`` is
        ``"cached"`` for a cache hit; ``error`` is non-empty on failure.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    md_name = f"{slugify(name, idx)}.md"
    md_path = out_dir / md_name

    def _placeholder(final_url: str, error: str = "") -> MarkdownPage:
        # Record for a cache hit or a failure: counts are unknown, size is the
        # size of the file on disk.
        return MarkdownPage(
            name=name,
            query_name=name,
            requested_url=baike_url(name),
            final_url=final_url,
            title=name,
            markdown_file=md_name,
            markdown_chars=md_path.stat().st_size,
            paragraph_count=0,
            heading_count=0,
            basic_info_count=0,
            table_count=0,
            error=error,
        )

    def _save(markdown: str, stats: dict, query_name: str, requested: str,
              final_url: str, browser_title: str) -> MarkdownPage:
        # Write an accepted conversion and describe it.
        md_path.write_text(markdown, encoding="utf-8")
        return MarkdownPage(
            name=name,
            query_name=query_name,
            requested_url=requested,
            final_url=final_url,
            title=stats["page_title"] or browser_title or name,
            markdown_file=md_name,
            markdown_chars=stats["markdown_chars"],
            paragraph_count=stats["paragraph_count"],
            heading_count=stats["heading_count"],
            basic_info_count=stats["basic_info_count"],
            table_count=stats["table_count"],
        )

    if md_path.exists() and md_path.stat().st_size > 500 and not force:
        return _placeholder("cached")

    last_error = ""
    for attempt in range(1, attempts + 1):
        for query_name in query_terms(name):
            requested = baike_url(query_name)
            # Defaults used by the fallbacks when the browser step fails early.
            final_url = requested
            browser_title = ""
            notes: list[str] = []

            # Each engine has its own try block so that a failure in one
            # (e.g. a navigation timeout) does not skip the remaining fallbacks.

            # Engine 1: rendered HTML.
            try:
                html, final_url, browser_title = fetch_rendered_html(requested)
                if looks_like_anti_crawl(html, final_url, browser_title):
                    notes.append(f"anticrawl: {final_url}")
                else:
                    markdown, stats = html_to_markdown(html, name, query_name, requested, final_url)
                    if stats_good_enough(stats):
                        return _save(markdown, stats, query_name, requested, final_url, browser_title)
                    notes.append(f"too short: {stats['markdown_chars']} chars from {final_url}")
            except Exception as exc:  # noqa: BLE001
                notes.append(str(exc)[:300])

            # Engine 2: text fetcher from the project's web agent.
            try:
                raw_text, text_final_url = fetch_existing_baike_text_with_retries(requested, attempts=2)
                if raw_text and len(raw_text) >= 500:
                    resolved = text_final_url or final_url
                    markdown, stats = baike_text_to_markdown(raw_text, name, query_name, requested, resolved)
                    if stats_good_enough(stats):
                        return _save(markdown, stats, query_name, requested, resolved, browser_title)
                    notes.append(f"fallback too short: {stats['markdown_chars']} chars from {text_final_url}")
            except Exception as exc:  # noqa: BLE001
                notes.append(str(exc)[:300])

            # Engine 3: Crawl4AI. Its second return value is either a URL or a
            # message starting with "crawl4ai ".
            try:
                c4_md, c4_note = fetch_crawl4ai_markdown(requested)
                c4_errored = c4_note.startswith("crawl4ai ")
                if c4_md and len(c4_md) >= 800:
                    resolved = final_url if c4_errored else c4_note
                    markdown, stats = external_markdown_to_markdown(
                        c4_md, name, query_name, requested, resolved, "Crawl4AI"
                    )
                    if stats_good_enough(stats):
                        return _save(markdown, stats, query_name, requested, resolved, browser_title)
                    notes.append(f"crawl4ai too short: {stats['markdown_chars']} chars from {resolved}")
                elif c4_errored:
                    # A missing Crawl4AI install is expected and not reported.
                    if not c4_note.startswith("crawl4ai unavailable"):
                        notes.append(c4_note)
                elif c4_note:
                    # Report the short result instead of storing a bare URL.
                    notes.append(f"crawl4ai too short: {len(c4_md or '')} chars from {c4_note}")
            except Exception as exc:  # noqa: BLE001
                notes.append(str(exc)[:300])

            if notes:
                last_error = "; ".join(notes)[:300]
            # Back off a little longer on each attempt before the next request.
            time.sleep(1.2 * attempt + random.uniform(0.4, 1.6))

    message = last_error or "unknown error"
    md_path.write_text(
        f"# {name}\n\n抓取失败:{message}\n",
        encoding="utf-8",
    )
    return _placeholder("", error=message)
def write_manifest(rows: list[MarkdownPage], out_dir: Path) -> None:
    """Write ``manifest.json`` and ``index.md`` summarising *rows* into *out_dir*.

    The JSON file holds every record field plus an ``ok`` flag (true when the
    record has no error). The Markdown index is a table with one linked row
    per record.
    """
    records = [{**asdict(row), "ok": not bool(row.error)} for row in rows]
    manifest_path = out_dir / "manifest.json"
    manifest_path.write_text(json.dumps(records, ensure_ascii=False, indent=2), encoding="utf-8")

    generated_at = datetime.now(timezone.utc).astimezone().isoformat(timespec='seconds')
    header = [
        "# 百度百科 Markdown 抓取清单",
        "",
        f"- 生成时间:{generated_at}",
        f"- 文件数:{len(rows)}",
        "",
        "| # | 名称 | Markdown | 字符 | 章节 | 段落/列表 | 基本信息 | 状态 |",
        "| ---: | --- | --- | ---: | ---: | ---: | ---: | --- |",
    ]
    table_rows = [
        f"| {number} | {row.name} | [{row.markdown_file}](./{row.markdown_file}) | "
        f"{row.markdown_chars} | {row.heading_count} | {row.paragraph_count} | "
        f"{row.basic_info_count} | {'失败' if row.error else 'OK'} |"
        for number, row in enumerate(rows, 1)
    ]
    index_path = out_dir / "index.md"
    index_path.write_text("\n".join(header + table_rows) + "\n", encoding="utf-8")
def main() -> int:
    """Command-line entry point: crawl one entry or the built-in batch.

    Parses the options, crawls each selected name with ``crawl_one``, prints a
    progress line per entry, then writes the manifest files.

    Returns:
        Always 0; failures are reported per entry, not through the exit code.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", help="Baidu Baike entry name")
    parser.add_argument("--batch-guizhou", action="store_true", help="Crawl built-in 20 Guizhou scenic spots")
    parser.add_argument("--out-dir", default=str(OUT_DIR))
    parser.add_argument("--sleep", type=float, default=1.2)
    parser.add_argument("--force", action="store_true")
    parser.add_argument("--attempts", type=int, default=3)
    args = parser.parse_args()
    if not (args.name or args.batch_guizhou):
        parser.error("use --name or --batch-guizhou")

    batch_mode = args.batch_guizhou
    out_dir = Path(args.out_dir)
    # Batch mode takes precedence when both options are given.
    names = SCENIC_SPOTS if batch_mode else [args.name]
    total = len(names)

    rows: list[MarkdownPage] = []
    for idx, name in enumerate(names, 1):
        print(f"[crawl] {idx:02d}/{total} {name}", flush=True)
        # Only batch output files get a numeric prefix.
        file_index = idx if batch_mode else None
        row = crawl_one(
            name,
            out_dir,
            file_index,
            force=args.force,
            attempts=max(1, args.attempts),
        )
        rows.append(row)
        mark = "FAIL" if row.error else "OK"
        print(
            f" [{mark}] chars={row.markdown_chars} headings={row.heading_count} "
            f"paras={row.paragraph_count} basic={row.basic_info_count} file={row.markdown_file}",
            flush=True,
        )
        # Pause between entries, but not after the last one.
        if idx < total:
            time.sleep(args.sleep + random.random())

    write_manifest(rows, out_dir)
    print(f"[done] {out_dir}", flush=True)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())