"""web_agent —— 通用联网采集(P2,多站点 + 反爬加固)。 来源策略(稳 → 富): 1) 维基百科官方 API(为程序访问设计、合法、无反爬,带 UA 即可)—— 主源; 2) 隐身真浏览器 `fetch_page(url)` —— 通用兜底,可抓任意站点(百度百科、 官方文旅站等,未来多站点任务复用),带业界标准反检测层。 反检测层为本项目自写实现(webdriver/chrome/plugins/languages/permissions/ WebGL 伪装 + 启动参数 + 拟人化),思路与开源 playwright-stealth 一致; 研究了 agent-browser-runtime 的做法,但未拷贝其受限源码。 隐私红线沿用:只用 名称/地址/区县/类别 锚定(**不发坐标、不发电话**); 公开网页/百科数据非高德机密。Playwright 用同步 API 跑在 to_thread。 """ from __future__ import annotations import asyncio import json import random import re from urllib.parse import quote import httpx import hashlib from app.config import settings # noqa: F401 (some helpers may use) from app.db import get_agent_settings, sa_save_evidence from app.llm_client import LlmClient # noqa: F401 (fetch helpers may use) from app.agents.distill_gate import ATTR_FIELDS from app.agents.multi_extract import build_extract_pool, fan_out, decide _UA = ("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/124.0.0.0 Safari/537.36") _WIKI_UA = "ZNKG-KnowledgeGraphBot/1.0 (admin tourism KG; contact ops)" # 业务口径:不截断网页正文(全部喂 opus,保抽取质量;原页另存证据层留底) # 业界标准反检测 init 脚本(本项目自写;进真站点前注入) _STEALTH_JS = r""" (() => { const d=(o,k,g)=>{try{Object.defineProperty(o,k,{get:g,configurable:true})}catch(e){}}; d(Navigator.prototype,'webdriver',()=>undefined); d(Navigator.prototype,'languages',()=>['zh-CN','zh','en']); d(Navigator.prototype,'language',()=>'zh-CN'); d(Navigator.prototype,'vendor',()=>'Google Inc.'); if(!window.chrome){try{Object.defineProperty(window,'chrome',{value:{runtime:{}},configurable:true})}catch(e){}} try{window.chrome.app={isInstalled:false};window.chrome.csi=()=>({});window.chrome.loadTimes=()=>({});}catch(e){} const mk=(a)=>{a.item=(i)=>a[i]||null;a.namedItem=(n)=>a.find(x=>x.name===n)||null;return a;}; const plugins=mk([{name:'Chrome PDF Plugin'},{name:'Chrome PDF Viewer'},{name:'Native Client'}]); d(Navigator.prototype,'plugins',()=>plugins); const q=navigator.permissions&&navigator.permissions.query; if(q){navigator.permissions.query=(p)=>p&&p.name==='notifications' ?Promise.resolve({state:Notification.permission,onchange:null}):q(p);} const pw=(proto)=>{if(!proto||!proto.getParameter)return;const o=proto.getParameter; proto.getParameter=function(p){if(p===37445)return 'Intel Inc.'; if(p===37446)return 'Intel Iris OpenGL Engine';return o.apply(this,arguments);};}; pw(window.WebGLRenderingContext&&WebGLRenderingContext.prototype); pw(window.WebGL2RenderingContext&&WebGL2RenderingContext.prototype); })(); """ _CHROME_ARGS = [ "--disable-blink-features=AutomationControlled", "--no-first-run", "--no-default-browser-check", "--disable-sync", "--disable-default-apps", "--no-sandbox", "--disable-dev-shm-usage", ] _EXTRACT_SYS = """你是知识图谱数据抽取与对齐器。下面给你: ①【实体锚点】名称/高德地址/区县/类别(用于确认网页是不是这个地点); ②【网页正文】从公开权威页面抓取的真实文本; ③【图谱现有软字段】该地点图谱里已存在的值(可能来自上一次富集)。 任务: 1) 先判断网页是否就是这个锚点地点:不是/无法确认 → entity_match=false; 2) 仅依据网页正文(不得编造),抽取并与图谱对齐这些软字段: summary/history/features/suitable_for/best_season/ticket_hint - 图谱该字段为空 → 放 adopt(给出依据网页的准确简洁值) - 图谱已有且与网页一致 → 放 keep - 图谱已有且与网页实质矛盾 → 放 conflict(existing/web/理由,绝不擅改) 3) 网页中明显有、但上面6个软字段装不下的"结构化知识点" (如 占地面积、海拔、文保级别、4A/5A、著名人物、重大历史事件、所属景区等) → 放 schema_gaps:[{"attr":"中文属性名","field":"英文蛇形","value":"取值","why":"为何值得入模型"}] 只输出 JSON: {"entity_match":true|false, "adopt":{"字段":"值"},"keep":["字段"], "conflict":[{"field":"","existing":"","web":"","note":""}], "schema_gaps":[{"attr":"","field":"","value":"","why":""}], "confidence":0~1}""" _AGG_SYS = """你是网页抽取裁决器。下面是多个模型对**同一份网页正文 + 同一个锚点** 各自给出的抽取结果(JSON)。请你裁决合并: A. entity_match: 若 ≥半数模型说 false → 最终 false; 否则 true. B. adopt 软字段(summary/history/features/suitable_for/best_season/ticket_hint): - ≥2 模型对同一字段给出兼容/一致内容 → 采纳, 综合最完整版作为最终值 - 只 1 模型给的或互相矛盾 → 不进 adopt, 进 uncertain C. keep: ≥半数模型说一致 → 进 keep D. conflict: 任一模型标出与图谱矛盾 → 进 conflict, 多模型矛盾合并描述 E. schema_gaps: 同一 attr 在 ≥2 模型出现 → 高信(consensus=true); 单模型独发 → 低信(consensus=false) 合并时: - 列表型(著名人物/景点/事件): 取所有模型并集去重 - 数值型(面积/海拔/绿地率): 多模型一致 → 高信, 不一致 → 注明分歧 - 描述型: 取最完整一份 F. 最终 confidence: 看共识程度, ≥3/4 模型一致 → 0.9+, 多分歧 → ≤0.6 只输出 JSON: {"entity_match":true|false, "adopt":{"字段":"值"},"keep":["字段"], "conflict":[{"field":"","existing":"","web":"","note":""}], "uncertain":["字段"], "schema_gaps":[{"attr":"","field":"","value":"","why":"", "consensus":true|false,"voted_by":["model_key1","..."]}], "confidence":0~1}""" def _wiki_text(name: str) -> tuple[str | None, str | None]: """维基百科官方 API 取纯文本摘录(主源:合法、稳定、无反爬)。""" try: r = httpx.get( "https://zh.wikipedia.org/w/api.php", params={"format": "json", "action": "query", "prop": "extracts", "explaintext": 1, "redirects": 1, "titles": name}, headers={"User-Agent": _WIKI_UA, "Accept": "application/json"}, timeout=20, verify=False, follow_redirects=True) pages = r.json().get("query", {}).get("pages", {}) page = next(iter(pages.values()), {}) ex = (page.get("extract") or "").strip() if len(ex) >= 120: title = page.get("title", name) return (re.sub(r"\s+", " ", ex), f"https://zh.wikipedia.org/wiki/{quote(title)}") except Exception: pass return None, None def fetch_page(url: str, wait_selector: str | None = None) -> tuple[str | None, str]: """通用隐身浏览器抓任意页面渲染正文。返回 (text, final_url)。 供未来多站点采集复用(百科/官方文旅站/名录…)。 """ try: from playwright.sync_api import sync_playwright with sync_playwright() as p: b = p.chromium.launch( headless=True, args=_CHROME_ARGS, ignore_default_args=["--enable-automation"]) ctx = b.new_context( user_agent=_UA, locale="zh-CN", viewport={"width": 1440, "height": 900}, extra_http_headers={"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8"}) ctx.add_init_script(_STEALTH_JS) pg = ctx.new_page() pg.goto(url, timeout=45000, wait_until="domcontentloaded") if wait_selector: try: pg.wait_for_selector(wait_selector, timeout=8000) except Exception: pass pg.wait_for_timeout(random.randint(900, 1800)) # 拟人化停顿 try: pg.mouse.wheel(0, random.randint(600, 1600)) pg.wait_for_timeout(random.randint(400, 900)) except Exception: pass final = pg.url body = "" for sel in ("div.J-lemma-content", "main", "article", "body"): try: body = pg.inner_text(sel) if body and len(body) > 120: break except Exception: continue b.close() except Exception as e: # noqa: BLE001 return None, f"ERR:{str(e)[:120]}" if not body or len(body) < 120: return None, final return re.sub(r"\s+", " ", body).strip(), final def fetch_baidu_baike_text(url: str) -> tuple[str | None, str]: """抓百度百科词条的摘要、基本信息表和正文。 百度百科新版页面的“基本信息”不在 div.J-lemma-content 内;如果只抓正文, 会漏掉地理位置、开放时间、景点级别、门票、面积等高价值结构化字段。 """ try: from playwright.sync_api import sync_playwright with sync_playwright() as p: b = p.chromium.launch( headless=True, args=_CHROME_ARGS, ignore_default_args=["--enable-automation"]) ctx = b.new_context( user_agent=_UA, locale="zh-CN", viewport={"width": 1440, "height": 900}, extra_http_headers={"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8"}) ctx.add_init_script(_STEALTH_JS) pg = ctx.new_page() pg.goto(url, timeout=45000, wait_until="domcontentloaded") pg.wait_for_timeout(random.randint(900, 1800)) final = pg.url parts = pg.evaluate( r"""() => { const clean = (s) => (s || '').replace(/\s+/g, ' ').trim(); const out = []; const seen = new Set(); const add = (label, text) => { text = clean(text); if (!text || text.length < 2 || seen.has(label + text)) return; seen.add(label + text); out.push(label ? `${label}:${text}` : text); }; const title = clean(document.querySelector('h1')?.innerText); if (title) add('词条名', title); const wanted = new Set([ '中文名', '地理位置', '气候条件', '开放时间', '景点级别', '门票价格', '占地面积', '著名景点', '邻近景点', '美誉', '美 誉', '所属国家', '所属城市', '建议游玩时长', '适宜游玩季节' ]); // 新版百度百科基本信息通常是 dt/dd 对,class 名带 hash。 const dts = Array.from(document.querySelectorAll('dt')); for (const dt of dts) { const key = clean(dt.innerText); if (!wanted.has(key)) continue; const dd = dt.nextElementSibling; const val = clean(dd && dd.tagName === 'DD' ? dd.innerText : ''); if (key && val) add(key, val); } // 兼容少数新版卡片:label/value 不一定使用 dt/dd。 const bodyText = clean(document.body?.innerText || ''); for (const key of wanted) { if (out.some(x => x.startsWith(key + ':'))) continue; const re = new RegExp(`${key}\\s+([^\\n]{1,80})`); const m = bodyText.match(re); if (m) add(key, m[1]); } const summary = clean(document.querySelector('.lemmaSummary, .lemmaWgt-lemmaSummary')?.innerText); if (summary) add('摘要', summary); const content = clean(document.querySelector('div.J-lemma-content')?.innerText); if (content) add('正文', content); return out.join('\n'); }""" ) b.close() except Exception as e: # noqa: BLE001 return None, f"ERR:{str(e)[:120]}" if not parts or len(parts) < 120: return None, final return re.sub(r"\s+", " ", parts).strip(), final def _baike_text(name: str) -> tuple[str | None, str | None]: url = f"https://baike.baidu.com/item/{quote(name)}" text, final = fetch_baidu_baike_text(url) if not text: return None, url return text, final async def fetch_entity_text(name: str) -> tuple[str | None, str | None, str]: """稳→富:维基 API 主源;不足则隐身浏览器抓百度百科兜底。""" text, url = await asyncio.to_thread(_wiki_text, name) if text: return text, url, "wikipedia" text, url = await asyncio.to_thread(_baike_text, name) if text: return text, url, "baike" return None, url, "none" async def web_enrich(entity: dict) -> dict: """联网抓权威网页 → 多模型抽取(3 抽 + 1 决策) → 对齐 + schema 缺口。 主 agent (opus/global) 不参与抽取, 只调度。抽取走 extract 池(deepseek/doubao/qwen 等), 共享 distill.models 的 API 配置, 避免主 agent 一处欠费全瘫。 返回 {ok, found, entity_match, adopt, keep, uncertain, conflict, schema_gaps, confidence, url, source, summary}。 schema_gaps 每条额外带 consensus/voted_by (谁支持). """ cfg = await get_agent_settings() extractors, agg, status_msg = build_extract_pool(cfg) if len(extractors) < 1 or agg is None: return {"ok": False, "summary": f"知识抽取未配置:{status_msg}" f"(在系统设置 → 知识抽取 卡里启用)"} name = entity.get("name", "") pnk = entity.get("eid") or entity.get("natural_key") or "" text, url, source = await fetch_entity_text(name) if not text: return {"ok": True, "found": False, "url": url, "source": source, "adopt": {}, "keep": [], "conflict": [], "schema_gaps": [], "summary": "公开权威源无该词条/抓取失败,跳过(已标记)"} # 原页留底证据层(抓一次挖多次,后续业务需求免重抓) try: sid = "webpage:" + hashlib.md5( (url or name).encode("utf-8")).hexdigest()[:16] await sa_save_evidence([{ "platform": source, "kind": "web_page", "source_id": sid, "url": url or "", "entity_name": name, "place_natural_key": pnk, "keyword": name, "title": name, "content": text, "author": "", "author_id": "", "author_avatar": "", "likes": 0, "comments": 0, "collects": 0, "shares": 0, "publish_time": "", "location": "", "tags": [], "image_urls": [], "raw": {"url": url, "source": source, "char_count": len(text)}, }]) except Exception: pass body = json.dumps({ "实体锚点": {"名称": entity.get("name"), "高德地址": entity.get("address") or "", "区县": entity.get("district") or "", "类别": entity.get("place_type") or ""}, "网页正文": text, "图谱现有软字段": {k: v for k, v in (entity.get("existing") or {}).items() if v}, }, ensure_ascii=False) # === 阶段 1: 多模型扇出抽取 === responses = await fan_out(_EXTRACT_SYS, body, extractors) valid = [r for r in responses if r.get("data") and isinstance(r.get("data"), dict)] if not valid: errs = "; ".join(f"{r['model']}:{r.get('error','无返回')}" for r in responses if r.get("error"))[:120] return {"ok": False, "url": url, "source": source, "summary": f"多模型抽取全部失败({status_msg}); {errs}"} ok_models = [r["model"] for r in valid] # === 阶段 2: 决策器合并 === agg_input = json.dumps({ "锚点": {"名称": entity.get("name"), "地址": entity.get("address") or "", "区县": entity.get("district") or "", "类别": entity.get("place_type") or ""}, "图谱现有软字段": {k: v for k, v in (entity.get("existing") or {}).items() if v}, "多模型抽取": [{"model": r["model"], "data": r["data"]} for r in valid], }, ensure_ascii=False) decided, err = await decide(_AGG_SYS, agg_input, agg) if not decided: return {"ok": False, "url": url, "source": source, "summary": f"决策器({agg[0]})失败({err}); " f"抽取器={ok_models}"} if decided.get("entity_match") is False: return {"ok": True, "found": True, "entity_match": False, "adopt": {}, "keep": [], "conflict": [], "uncertain": [], "schema_gaps": [], "url": url, "source": source, "summary": f"网页({source})与锚点不符(同名异地),跳过"} adopt = {k: str(v).strip() for k, v in (decided.get("adopt") or {}).items() if k in ATTR_FIELDS and str(v).strip()} keep = [k for k in (decided.get("keep") or []) if k in ATTR_FIELDS] conflict = [c for c in (decided.get("conflict") or []) if isinstance(c, dict) and c.get("field") in ATTR_FIELDS] uncertain = [u for u in (decided.get("uncertain") or []) if u in ATTR_FIELDS] gaps = [] for s in (decided.get("schema_gaps") or []): if not isinstance(s, dict) or not s.get("attr") or not s.get("value"): continue gaps.append({ "attr": str(s.get("attr"))[:40], "field": str(s.get("field") or "")[:60], "value": s.get("value"), "why": str(s.get("why") or "")[:160], "consensus": bool(s.get("consensus")), "voted_by": [str(m)[:16] for m in (s.get("voted_by") or []) if m][:8], }) return {"ok": True, "found": True, "entity_match": True, "adopt": adopt, "keep": keep, "conflict": conflict, "uncertain": uncertain, "schema_gaps": gaps[:30], "confidence": decided.get("confidence"), "url": url, "source": source, "summary": f"{source} · {len(ok_models)}/{len(extractors)}模型抽取" f"({','.join(ok_models)}) · {agg[0]}决策 → " f"采纳{len(adopt)}·一致{len(keep)}·矛盾{len(conflict)}" f"·存疑{len(uncertain)}·schema缺口{len(gaps)}" f"·正文{len(text)}字"}