Files
bxh/scripts/crawl_guizhou_baike_scenic_md.py

636 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Crawl Baidu Baike pages for 20 well-known Guizhou scenic spots.
The output is a schema-building Markdown dataset: source metadata, basic-info
fields, short summary snippets, page outline, and candidate schema fields.
It intentionally does not store full article bodies.
"""
from __future__ import annotations
import argparse
import json
import random
import re
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import quote
# Parent of the directory that contains this script; also pushed onto
# sys.path elsewhere in this file so that `app.agents.web_agent` can be imported.
ROOT = Path(__file__).resolve().parents[1]
# Default output folder for the generated Markdown dataset (see --out-dir).
OUT_DIR = ROOT / "schema搭建" / "baidu_baike_guizhou_scenic_20"
# Scenic spots to crawl, in output order; the 1-based position becomes the
# numeric prefix of each generated Markdown file name.
SCENIC_SPOTS = [
    "黄果树瀑布",
    "荔波小七孔景区",
    "梵净山",
    "西江千户苗寨",
    "青岩古镇",
    "镇远古城",
    "肇兴侗寨",
    "万峰林",
    "马岭河峡谷",
    "织金洞",
    "百里杜鹃风景名胜区",
    "赤水丹霞",
    "龙宫风景区",
    "遵义会议会址",
    "甲秀楼",
    "黔灵山公园",
    "花溪公园",
    "天河潭",
    "南江大峡谷",
    "乌蒙大草原",
]
# Alternative entry names tried (after the spot name itself) when building
# Baike URLs for a spot; consumed by query_terms().
BAIKE_QUERY_ALIASES = {
    "荔波小七孔景区": ["荔波樟江风景名胜区", "小七孔"],
    "镇远古城": ["黔东南苗族侗族自治州镇远古城旅游景区"],
    "百里杜鹃风景名胜区": ["百里杜鹃景区", "贵州百里杜鹃风景名胜区"],
}
# Basic-info field labels this script looks for; used both when parsing plain
# text and when listing schema candidates.
SCHEMA_FIELD_HINTS = [
    "中文名",
    "外文名",
    "地理位置",
    "气候条件",
    "开放时间",
    "景点级别",
    "门票价格",
    "占地面积",
    "著名景点",
    "建议游玩时长",
    "适宜游玩季节",
    "所属国家",
    "所属城市",
    "保护级别",
    "主要景观",
    "最佳旅游时间",
]
# A summary snippet containing any of these substrings is discarded by
# usable_snippet() (they look like page footer / legal boilerplate).
BAD_SNIPPET_TOKENS = [
    "©",
    "使用百度前必读",
    "百科协议",
    "隐私政策",
    "百度百科合作平台",
    "京ICP",
    "营业执照",
]
@dataclass
class PageData:
    """Extraction result for one scenic spot's Baidu Baike page.

    Instances are produced by the fetch/parse helpers in this module and
    consumed by the Markdown and manifest writers. A non-empty ``error``
    marks a failed or unusable fetch.
    """

    # Scenic-spot name the crawl was started for.
    name: str
    # URL that was requested, and the URL reported after navigation.
    requested_url: str
    final_url: str
    # Page title (falls back to ``name`` when nothing better was found).
    title: str
    # Summary text extracted from the page; may be empty.
    summary: str
    # Basic-info label -> value pairs.
    basic_info: dict[str, str]
    # Section headings in page order.
    headings: list[str]
    # Size indicators reported in the generated Markdown.
    paragraph_count: int
    text_char_count: int
    # Failure description; empty string means no error was recorded.
    error: str = ""
def _import_web_agent_constants():
    """Return ``(user_agent, chrome_args, stealth_js)`` for launching Chromium.

    The values are taken from ``app.agents.web_agent`` when that module can be
    imported; otherwise built-in fallbacks defined here are returned.
    """
    root = str(ROOT)
    # This function runs once per page fetch; only insert when ROOT is not
    # already first so sys.path does not grow by one entry on every call.
    if sys.path[:1] != [root]:
        sys.path.insert(0, root)
    try:
        from app.agents.web_agent import _CHROME_ARGS, _STEALTH_JS, _UA
    except Exception:
        # Deliberately broad: any failure while importing the project module
        # (not just ImportError) should fall back to the defaults below.
        _UA = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        )
        _CHROME_ARGS = [
            "--disable-blink-features=AutomationControlled",
            "--no-first-run",
            "--no-default-browser-check",
            "--disable-sync",
            "--disable-default-apps",
            "--no-sandbox",
            "--disable-dev-shm-usage",
        ]
        _STEALTH_JS = "Object.defineProperty(navigator,'webdriver',{get:()=>undefined});"
    return _UA, _CHROME_ARGS, _STEALTH_JS
def slugify(name: str, idx: int) -> str:
    """Return a file-name stem of the form ``NN_<name>`` for a scenic spot.

    Runs of characters that are unsafe in file names (``\\ / : * ? " < > |``)
    and runs of whitespace are each collapsed to a single underscore, and
    leading/trailing underscores are removed. If nothing is left, the stem
    falls back to ``scenic_spot``.

    Args:
        name: Scenic-spot name.
        idx: Sequence number, zero-padded to at least two digits.
    """
    # In a raw string "\\s" inside a character class means "backslash or the
    # letter s"; "\s" is what matches whitespace.
    safe = re.sub(r'[\\/:*?"<>|\s]+', "_", name).strip("_")
    return f"{idx:02d}_{safe or 'scenic_spot'}"
def compact(text: str) -> str:
    """Collapse every whitespace run in *text* to one space and trim the ends.

    Falsy input (``None`` or an empty string) yields an empty string.
    """
    if not text:
        return ""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
def short_snippet(text: str, limit: int = 120) -> str:
    """Return *text* whitespace-compacted and cut to at most *limit* characters.

    Text that already fits is returned unchanged (after compaction). Longer
    text is truncated, trailing punctuation/space from the set below is
    removed, and ``...`` is appended.
    """
    cleaned = compact(text)
    if len(cleaned) > limit:
        head = cleaned[:limit]
        return head.rstrip(",。;、 ") + "..."
    return cleaned
def usable_snippet(text: str, limit: int = 120) -> str:
    """Return a short snippet of *text*, or ``""`` if it looks like boilerplate.

    The snippet is built with ``short_snippet`` and rejected when it contains
    any entry of ``BAD_SNIPPET_TOKENS``.
    """
    candidate = short_snippet(text, limit)
    for token in BAD_SNIPPET_TOKENS:
        if token in candidate:
            return ""
    return candidate
def baike_url(name: str) -> str:
    """Build the Baidu Baike item URL for *name* (percent-encoded)."""
    encoded = quote(name)
    return "https://baike.baidu.com/item/" + encoded
def query_terms(name: str) -> list[str]:
    """Return the entry names to try for *name*: the name first, then aliases.

    Aliases come from ``BAIKE_QUERY_ALIASES``; duplicates are dropped while
    the first-seen order is kept.
    """
    aliases = BAIKE_QUERY_ALIASES.get(name, [])
    # dict keys keep insertion order, which gives an order-preserving dedupe.
    return list(dict.fromkeys([name, *aliases]))
def is_blocked_or_empty(data: PageData) -> bool:
    """Tell whether *data* is a failed, verification, or content-free page.

    A page counts as blocked/empty when it carries an error, when its final
    URL contains ``anticrawl`` or its title contains ``验证``, or when it has
    fewer than 500 characters of text and neither basic info nor headings.
    """
    blocked = bool(data.error) or "anticrawl" in data.final_url or "验证" in data.title
    if blocked:
        return True
    has_structure = bool(data.basic_info) or bool(data.headings)
    return data.text_char_count < 500 and not has_structure
def is_good_enough(data: PageData) -> bool:
    """Tell whether *data* is rich enough to stop retrying.

    A page that is not blocked/empty qualifies when it has at least 1200
    characters of text, any basic-info field, or at least five headings.
    """
    if is_blocked_or_empty(data):
        return False
    if data.text_char_count >= 1200:
        return True
    if data.basic_info:
        return True
    return len(data.headings) >= 5
def data_score(data: PageData) -> int:
    """Score *data* so competing fetch results can be ranked.

    Blocked/empty pages score ``-1``. Otherwise the score is the text length
    plus 600 per basic-info field plus 120 per heading.
    """
    if is_blocked_or_empty(data):
        return -1
    info_bonus = 600 * len(data.basic_info)
    heading_bonus = 120 * len(data.headings)
    return data.text_char_count + info_bonus + heading_bonus
def parse_web_agent_text(name: str, text: str, final_url: str, requested_url: str) -> PageData:
    """Parse the plain-text rendering of a Baike page into a ``PageData``.

    Args:
        name: Scenic-spot name the crawl was started for.
        text: Plain text returned by the fallback fetcher. The patterns below
            look for ``词条名``/basic-info labels, a ``摘要:`` marker, a
            ``正文`` marker and ``<heading> 播报 编辑`` sequences.
        final_url: URL reported by the fetcher.
        requested_url: URL that was requested.

    Returns:
        A ``PageData`` with whatever fields could be recognised; at most 80
        headings are kept.
    """
    info: dict[str, str] = {}
    keys = ["词条名", *SCHEMA_FIELD_HINTS]
    key_alt = "|".join(re.escape(k) for k in keys)
    for key in keys:
        # A value runs (lazily, up to 260 chars) until whitespace followed by
        # the next known label or "正文", or until the end of the text.
        pat = rf"{re.escape(key)}(.{{1,260}}?)(?=\s(?:{key_alt}|正文)|$)"
        m = re.search(pat, text)
        if not m:
            continue
        value = compact(m.group(1))
        if value and len(value) <= 240:
            info[key] = value

    summary = ""
    m = re.search(r"摘要:(.{20,360}?)(?=\s(?:正文|词条名|中文名|地理位置|开放时间)|$)", text)
    if m:
        summary = compact(m.group(1))
    elif "正文:" in text:
        # No explicit summary: fall back to the first 360 chars of the body.
        summary = compact(text.split("正文:", 1)[1])[:360]

    headings = []
    for heading in re.findall(r"([\u4e00-\u9fa5A-Za-z0-9·、()]{2,24})\s+播报\s+编辑", text):
        heading = compact(heading)
        if heading and heading not in headings and not heading.startswith("参考资料"):
            headings.append(heading)

    # The entry title, when present, is reported as the page title rather
    # than as a basic-info field.
    title = info.pop("词条名", None) or name

    # str.count("") is always len(text) + 1, which is not a paragraph count.
    # Count non-blank lines instead.
    # NOTE(review): assumes the fetcher separates paragraphs with newlines - confirm.
    paragraph_count = sum(1 for line in text.splitlines() if line.strip())

    return PageData(
        name=name,
        requested_url=requested_url,
        final_url=final_url,
        title=title,
        summary=summary,
        basic_info=info,
        headings=headings[:80],
        paragraph_count=paragraph_count,
        text_char_count=len(text),
    )
def fetch_with_existing_baike_code(name: str, query_name: str) -> PageData:
    """Fetch a Baike page through the project's own ``fetch_baidu_baike_text``.

    Args:
        name: Scenic-spot name recorded on the result.
        query_name: Entry name used to build the Baike URL.

    Returns:
        The parsed page, or an error ``PageData`` when the fetcher returns no
        text.

    Raises:
        Whatever importing or calling ``app.agents.web_agent`` raises; the
        caller is expected to handle it.
    """
    root = str(ROOT)
    # Called once per attempt; avoid stacking duplicate sys.path entries.
    if sys.path[:1] != [root]:
        sys.path.insert(0, root)
    from app.agents.web_agent import fetch_baidu_baike_text

    requested = baike_url(query_name)
    text, final_url = fetch_baidu_baike_text(requested)
    if not text:
        return PageData(
            name=name,
            requested_url=requested,
            final_url=final_url or requested,
            title=name,
            summary="",
            basic_info={},
            headings=[],
            paragraph_count=0,
            text_char_count=0,
            error="百度百科抓取为空或进入验证页",
        )
    return parse_web_agent_text(name, text, final_url or requested, requested)
# In-page extraction script evaluated by fetch_page_data(). It returns an
# object with title, summary, basic_info, headings, paragraph_count,
# text_char_count and final_url. The script text is kept verbatim.
_EXTRACT_PAGE_JS = r"""() => {
const clean = (s) => (s || '').replace(/\s+/g, ' ').trim();
const pickText = (sel) => clean(document.querySelector(sel)?.innerText || '');
const title = pickText('h1') || document.title;
let summary = pickText('.lemmaSummary, .lemmaWgt-lemmaSummary');
const basic = {};
const wanted = new Set([
'中文名', '外文名', '地理位置', '气候条件', '开放时间', '景点级别',
'门票价格', '占地面积', '著名景点', '建议游玩时长', '适宜游玩季节',
'所属国家', '所属城市', '保护级别', '主要景观', '最佳旅游时间'
]);
const addPair = (k, v) => {
k = clean(k).replace(/[:]+$/, '');
v = clean(v);
if (!wanted.has(k)) return;
if (!k || !v || k.length > 24 || v.length > 240) return;
if (!basic[k]) basic[k] = v;
};
// Old Baike layout.
const oldNames = Array.from(document.querySelectorAll('.basicInfo-item.name'));
const oldVals = Array.from(document.querySelectorAll('.basicInfo-item.value'));
for (let i = 0; i < Math.min(oldNames.length, oldVals.length); i++) {
addPair(oldNames[i].innerText, oldVals[i].innerText);
}
// New Baike layout often uses dt/dd.
const dts = Array.from(document.querySelectorAll('dt'));
for (const dt of dts) {
const dd = dt.nextElementSibling;
if (dd && dd.tagName === 'DD') addPair(dt.innerText, dd.innerText);
}
const headings = Array.from(document.querySelectorAll('h2, h3'))
.map(x => clean(x.innerText).replace(/\s*播报\s*编辑\s*$/, ''))
.filter(Boolean)
.filter((x, i, arr) => arr.indexOf(x) === i)
.slice(0, 80);
const ps = Array.from(document.querySelectorAll('div.J-lemma-content p, main p, article p, p'))
.map(x => clean(x.innerText))
.filter(x => x.length >= 8);
if (!summary && ps.length) summary = ps[0];
const body = clean(document.body?.innerText || '');
return {
title, summary, basic_info: basic, headings,
paragraph_count: ps.length,
text_char_count: body.length,
final_url: location.href
};
}"""


def fetch_page_data(
    name: str,
    query_name: str,
    timeout_ms: int = 45000,
    headless: bool = True,
    keep_open_seconds: int = 0,
) -> PageData:
    """Load a Baike entry in Chromium via Playwright and extract its structure.

    Args:
        name: Scenic-spot name recorded on the result.
        query_name: Entry name used to build the Baike URL.
        timeout_ms: Navigation timeout passed to ``page.goto``.
        headless: Run the browser without a window; a visible browser is
            additionally slowed down by 250 ms per operation.
        keep_open_seconds: When positive, print extraction diagnostics and
            keep the page open for that many seconds before closing.

    Returns:
        A ``PageData`` built from the in-page extraction result.

    Raises:
        Any Playwright error (navigation timeout, evaluation failure, ...);
        the browser is closed before the error propagates.
    """
    from playwright.sync_api import sync_playwright

    ua, chrome_args, stealth_js = _import_web_agent_constants()
    requested = baike_url(query_name)
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=headless,
            args=chrome_args,
            ignore_default_args=["--enable-automation"],
            slow_mo=250 if not headless else 0,
        )
        try:
            ctx = browser.new_context(
                user_agent=ua,
                locale="zh-CN",
                viewport={"width": 1440, "height": 900},
                extra_http_headers={"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8"},
            )
            ctx.add_init_script(stealth_js)
            page = ctx.new_page()
            page.goto(requested, timeout=timeout_ms, wait_until="domcontentloaded")
            page.wait_for_timeout(random.randint(900, 1600))
            try:
                page.mouse.wheel(0, random.randint(600, 1400))
                page.wait_for_timeout(random.randint(400, 900))
            except Exception:
                # Best effort only: a failed scroll must not abort extraction.
                pass
            payload = page.evaluate(_EXTRACT_PAGE_JS)
            if keep_open_seconds > 0:
                print(
                    "[debug] extracted "
                    f"title={compact(payload.get('title') or name)!r} "
                    f"final_url={payload.get('final_url') or requested} "
                    f"chars={int(payload.get('text_char_count') or 0)} "
                    f"basic={len(payload.get('basic_info') or {})} "
                    f"headings={len(payload.get('headings') or [])}",
                    flush=True,
                )
                print(
                    "[debug] browser kept open "
                    f"{keep_open_seconds}s; final_url={payload.get('final_url')}",
                    flush=True,
                )
                page.wait_for_timeout(keep_open_seconds * 1000)
        finally:
            # Close the browser even when navigation or extraction raises.
            browser.close()
    return PageData(
        name=name,
        requested_url=requested,
        final_url=payload.get("final_url") or requested,
        title=compact(payload.get("title") or name),
        summary=compact(payload.get("summary") or ""),
        basic_info={str(k): str(v) for k, v in (payload.get("basic_info") or {}).items()},
        headings=[str(x) for x in (payload.get("headings") or [])],
        paragraph_count=int(payload.get("paragraph_count") or 0),
        text_char_count=int(payload.get("text_char_count") or 0),
    )
def debug_visible_flow(name: str, keep_open_seconds: int) -> int:
    """Open every query term for *name* in a visible browser and print diagnostics.

    Each term is fetched with ``headless=False``; failures are printed and the
    next term is tried. Always returns ``0``.
    """
    print(f"[debug] target={name}", flush=True)
    attempt_no = 0
    for query_name in query_terms(name):
        attempt_no += 1
        target_url = baike_url(query_name)
        print(f"[debug] try {attempt_no}: {query_name} -> {target_url}", flush=True)
        try:
            data = fetch_page_data(
                name,
                query_name,
                headless=False,
                keep_open_seconds=keep_open_seconds,
            )
        except Exception as exc:  # noqa: BLE001
            reason = str(exc)[:300]
            print(f"[debug] failed: {reason}", flush=True)
            continue

        blocked = is_blocked_or_empty(data)
        print(
            "[debug] result "
            f"title={data.title!r} final_url={data.final_url} "
            f"chars={data.text_char_count} basic={len(data.basic_info)} "
            f"headings={len(data.headings)} blocked={blocked}",
            flush=True,
        )
        if data.summary:
            shown = usable_snippet(data.summary) or '<unusable/empty>'
            print(f"[debug] summary={shown}", flush=True)
        if data.basic_info:
            info_json = json.dumps(data.basic_info, ensure_ascii=False)
            print(f"[debug] basic_info={info_json}", flush=True)
    return 0
def _failed_page(name: str, query_name: str, error: str) -> PageData:
    """Build an empty ``PageData`` recording why a fetch of *query_name* failed."""
    url = baike_url(query_name)
    return PageData(
        name=name,
        requested_url=url,
        final_url=url,
        title=name,
        summary="",
        basic_info={},
        headings=[],
        paragraph_count=0,
        text_char_count=0,
        error=error,
    )


def fetch_with_retries(name: str, attempts: int = 3) -> PageData:
    """Fetch *name* from Baike, retrying across query terms and fetchers.

    For every attempt and every query term, the Playwright fetcher is tried
    first and the project's existing Baike fetcher second, with a growing
    randomized pause after each try. The first result that is good enough is
    returned immediately; otherwise the best-scoring usable result wins, and
    if there is none, the last (failed) result is returned with ``error`` set.

    Args:
        name: Scenic-spot name.
        attempts: Number of rounds over all query terms; must be at least 1.

    Raises:
        ValueError: If *attempts* is less than 1.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")

    terms = query_terms(name)
    # (fetcher, error-message prefix, max chars of the exception text kept)
    fetchers = (
        (fetch_page_data, "", 300),
        (fetch_with_existing_baike_code, "existing baike fallback failed: ", 260),
    )
    last: PageData | None = None
    best: PageData | None = None
    best_score = -1
    for attempt in range(1, attempts + 1):
        for query_name in terms:
            for fetcher, prefix, max_len in fetchers:
                try:
                    data = fetcher(name, query_name)
                except Exception as exc:  # noqa: BLE001
                    last = _failed_page(name, query_name, f"{prefix}{str(exc)[:max_len]}")
                else:
                    last = data
                    score = data_score(data)
                    # Blocked/empty pages score -1 and therefore never become `best`.
                    if score > best_score:
                        best, best_score = data, score
                    if is_good_enough(data):
                        return data
                # Back off a little longer on each round before the next try.
                time.sleep(1.5 * attempt + random.random())

    result = best if best is not None else last
    if result is None:
        # Not reachable: attempts >= 1 and `terms` always contains `name`.
        raise RuntimeError(f"no fetch was attempted for {name!r}")
    if is_blocked_or_empty(result) and not result.error:
        result.error = f"百度百科页面为空、验证页或未抓到有效结构化信息;已尝试词条:{', '.join(terms)}"
    return result
def schema_candidates(data: PageData) -> list[dict[str, str]]:
    """List up to 40 schema candidates derived from *data*.

    Candidates come first from basic-info fields named in
    ``SCHEMA_FIELD_HINTS`` (in that order, skipping empty values) and then
    from headings that mention one of a fixed set of topic keywords.
    """
    topic_tokens = ("历史", "地理", "景点", "景观", "交通", "文化", "保护", "荣誉")

    from_basic_info = [
        {
            "field_cn": key,
            "value": data.basic_info[key],
            "source": "百度百科基本信息",
        }
        for key in SCHEMA_FIELD_HINTS
        if data.basic_info.get(key)
    ]
    from_headings = [
        {
            "field_cn": f"章节:{heading}",
            "value": "页面存在该主题章节,可作为 schema/关系抽取候选",
            "source": "百度百科章节结构",
        }
        for heading in data.headings
        if any(token in heading for token in topic_tokens)
    ]
    return (from_basic_info + from_headings)[:40]
def markdown_for(data: PageData, idx: int) -> str:
    """Render *data* as the Markdown document stored for one scenic spot.

    The document has a source section, a short summary, the basic-info table
    (or a note, including the fetch error, when there is none), the schema
    candidates, the page outline and usage notes.

    Args:
        data: Extraction result to render.
        idx: Sequence number of the spot; currently unused, kept so existing
            callers keep working.
    """
    now = datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")
    snippet = usable_snippet(data.summary)
    lines = [
        f"# {data.name}",
        "",
        "## 来源",
        "",
        "- 数据源:百度百科",
        f"- 请求词条:{data.name}",
        f"- 页面标题:{data.title}",
        # The two URL lines were missing the label/value separator that every
        # other line in this list uses.
        f"- 请求 URL:{data.requested_url}",
        f"- 最终 URL:{data.final_url}",
        f"- 抓取时间:{now}",
        f"- 页面文本规模:约 {data.text_char_count} 字符,段落 {data.paragraph_count}",
        "",
        "## 短摘要",
        "",
        f"> {snippet if snippet else '未抓到可用短摘要。'}",
        "",
        "## 基本信息",
        "",
    ]

    if data.basic_info:
        lines.extend(["| 字段 | 值 |", "| --- | --- |"])
        for key, value in data.basic_info.items():
            # A literal "|" would break the table row.
            lines.append(f"| {key} | {value.replace('|', '/')} |")
    else:
        msg = "未抓到基本信息表。"
        if data.error:
            msg += f" 抓取备注:{data.error}"
        lines.append(msg)

    lines.extend(["", "## Schema 搭建候选", ""])
    candidates = schema_candidates(data)
    if candidates:
        lines.extend(["| 候选字段/主题 | 值或说明 | 来源 |", "| --- | --- | --- |"])
        for item in candidates:
            lines.append(
                f"| {item['field_cn']} | {str(item['value']).replace('|', '/')} | {item['source']} |"
            )
    else:
        lines.append("暂无。")

    lines.extend(["", "## 页面结构", ""])
    if data.headings:
        lines.extend(f"- {heading}" for heading in data.headings)
    else:
        lines.append("未抓到章节标题。")

    lines.extend([
        "",
        "## 使用说明",
        "",
        "- 本文件用于 schema 搭建和字段候选分析。",
        "- 为避免直接沉淀整篇百科长文,只保存结构化字段、短摘要和页面结构。",
        "- 需要复核事实时,请回到上方最终 URL 查看原页面。",
        "",
    ])
    return "\n".join(lines)
def write_dataset(rows: list[PageData], out_dir: Path) -> None:
    """Write *rows* to *out_dir* as the complete dataset.

    One Markdown file per row (numbered from 1 in list order), a
    ``manifest.json`` describing them, and an ``index.md`` table are written.
    Previously generated per-spot files are removed first so the folder never
    mixes files from different runs.

    Args:
        rows: Extraction results, in output order.
        out_dir: Target folder; created if missing.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    # Only remove files that follow this script's own "NN_<name>.md" naming,
    # so unrelated Markdown in a user-supplied --out-dir is left alone.
    # index.md is overwritten below.
    for stale in out_dir.glob("[0-9][0-9]*_*.md"):
        stale.unlink()

    manifest = []
    for idx, row in enumerate(rows, 1):
        stem = slugify(row.name, idx)
        md_path = out_dir / f"{stem}.md"
        md_path.write_text(markdown_for(row, idx), encoding="utf-8")
        manifest.append({
            "index": idx,
            "name": row.name,
            "title": row.title,
            "requested_url": row.requested_url,
            "final_url": row.final_url,
            "markdown_file": md_path.name,
            "basic_info_count": len(row.basic_info),
            "heading_count": len(row.headings),
            "text_char_count": row.text_char_count,
            "error": row.error,
        })
    (out_dir / "manifest.json").write_text(
        json.dumps(manifest, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

    index_lines = [
        "# 贵州著名景区百度百科 Schema 搭建数据集",
        "",
        f"- 生成时间:{datetime.now(timezone.utc).astimezone().isoformat(timespec='seconds')}",
        f"- 景区数量:{len(rows)}",
        # This line was missing the separator after its label.
        "- 说明:Markdown 文件保存结构化字段、短摘要、页面结构和 schema 候选,不保存整篇百科长文。",
        "",
        "| 序号 | 景区 | Markdown | 基本信息字段 | 章节数 | 来源 |",
        "| --- | --- | --- | ---: | ---: | --- |",
    ]
    for item in manifest:
        index_lines.append(
            f"| {item['index']} | {item['name']} | "
            f"[{item['markdown_file']}](./{item['markdown_file']}) | "
            f"{item['basic_info_count']} | {item['heading_count']} | "
            f"[百度百科]({item['final_url']}) |"
        )
    (out_dir / "index.md").write_text("\n".join(index_lines) + "\n", encoding="utf-8")
def main() -> int:
    """Command-line entry point; returns the process exit status.

    With ``--debug-name`` a single spot is opened in a visible browser.
    Otherwise the first ``--limit`` scenic spots are crawled and written as a
    dataset. If every expected Markdown file and ``manifest.json`` already
    exist, nothing is crawled unless ``--force`` is given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--out-dir", default=str(OUT_DIR), help="Output folder")
    parser.add_argument("--limit", type=int, default=20, help="Number of scenic spots to crawl")
    parser.add_argument("--sleep", type=float, default=1.5, help="Polite delay between pages")
    parser.add_argument("--force", action="store_true", help="Re-crawl even if files already exist")
    parser.add_argument("--debug-name", help="Open one scenic spot in a visible browser and print extraction diagnostics")
    parser.add_argument("--keep-open-seconds", type=int, default=600, help="Visible debug browser hold time")
    args = parser.parse_args()

    if args.debug_name:
        return debug_visible_flow(args.debug_name, args.keep_open_seconds)

    out_dir = Path(args.out_dir)
    names = SCENIC_SPOTS[: max(1, min(args.limit, len(SCENIC_SPOTS)))]

    # write_dataset() rewrites the whole folder and numbers rows from 1, so a
    # partial crawl would delete the skipped files and misnumber the new ones.
    # The skip is therefore all-or-nothing: either the dataset is complete, or
    # every spot is crawled again.
    expected = [out_dir / f"{slugify(name, idx)}.md" for idx, name in enumerate(names, 1)]
    complete = (out_dir / "manifest.json").exists() and all(p.exists() for p in expected)
    if complete and not args.force:
        print(f"[done] no recrawl needed: {out_dir}", flush=True)
        return 0

    rows: list[PageData] = []
    for idx, name in enumerate(names, 1):
        print(f"[crawl] {idx:02d}/{len(names)} {name}", flush=True)
        try:
            row = fetch_with_retries(name)
        except Exception as exc:  # noqa: BLE001
            # Keep going: record the failure as a row so the spot still shows
            # up in the dataset.
            row = PageData(
                name=name,
                requested_url=baike_url(name),
                final_url=baike_url(name),
                title=name,
                summary="",
                basic_info={},
                headings=[],
                paragraph_count=0,
                text_char_count=0,
                error=str(exc)[:300],
            )
            print(f" [error] {row.error}", flush=True)
        else:
            if row.error:
                print(f" [warn] {row.error}", flush=True)
        rows.append(row)
        if idx < len(names):
            # Polite delay between pages; not needed after the last one.
            time.sleep(args.sleep + random.random() * 0.8)

    write_dataset(rows, out_dir)
    print(f"[done] wrote {len(rows)} markdown files to {out_dir}", flush=True)
    return 0
# When run as a script, execute the CLI and exit with main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())