#!/usr/bin/env python3 """AMap H3 spatial collector for a clean city knowledge graph. The old Super Agent grid used rectangular bbox subdivision. This collector uses district boundaries -> H3 cells -> AMap polygon search, records every task, and writes POI spatial properties into a dedicated FalkorDB graph. """ from __future__ import annotations import argparse import json import math import os import re import sys import time from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any import h3 import psycopg import requests import urllib3 from falkordb import FalkorDB from psycopg.rows import dict_row ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from app.config import settings # noqa: E402 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) AMAP_DISTRICT_URL = "https://restapi.amap.com/v3/config/district" AMAP_POLYGON_URL = "https://restapi.amap.com/v3/place/polygon" PAGE_SIZE = 25 MAX_PAGE = 8 SATURATE_COUNT = 180 MAX_RESOLUTION = 9 TYPE_GROUPS: dict[str, dict[str, str]] = { "core": { "景点": "110000", "美食": "050000", "酒店": "100000", "商场": "060000", "交通设施": "150000", }, "tourism": { "景点": "110000", "美食": "050000", "酒店": "100000", "商场": "060000", "交通设施": "150000", "生活服务": "070000", }, "all": { "汽车服务": "010000", "汽车销售": "020000", "汽车维修": "030000", "摩托车服务": "040000", "美食": "050000", "商场": "060000", "生活服务": "070000", "体育休闲": "080000", "医疗保健": "090000", "酒店": "100000", "景点": "110000", "商务住宅": "120000", "政府机构": "130000", "科教文化": "140000", "交通设施": "150000", "金融保险": "160000", "公司企业": "170000", "道路附属": "180000", "地名地址": "190000", "公共设施": "200000", }, } DISTRICT_PRIORITY = { "云岩区": 1, "南明区": 2, "观山湖区": 3, "花溪区": 4, "白云区": 5, "乌当区": 6, "清镇市": 7, "修文县": 8, "息烽县": 9, "开阳县": 10, } TYPE_PRIORITY = { "景点": 1, "美食": 2, "酒店": 3, "交通设施": 4, "商场": 5, "生活服务": 6, "医疗保健": 7, "科教文化": 8, } PLACE_TYPE = { "景点": "sight", "美食": "eat", "酒店": "hotel", "商场": "mall", "交通设施": "transit", "医疗保健": "medical", "科教文化": "education", "生活服务": "life", "公共设施": "facility", "政府机构": "government", "商务住宅": "residential", "公司企业": "enterprise", } FALKOR_LABEL = { "景点": "ScenicSpot", "美食": "FoodPlace", "酒店": "Hotel", "商场": "Mall", "交通设施": "TransitFacility", "医疗保健": "MedicalPlace", "科教文化": "EducationPlace", "生活服务": "LifeServicePlace", "公共设施": "Facility", "政府机构": "GovernmentPlace", "商务住宅": "ResidentialPlace", "公司企业": "EnterprisePlace", } def now_iso() -> str: return datetime.now(timezone.utc).isoformat() def mask_secret(text: Any) -> str: return re.sub(r"([?&]key=)[^&\s]+", r"\1***", str(text or "")) def clean(v: Any) -> str: if v is None or v == []: return "" if isinstance(v, list): return " | ".join(str(x) for x in v if x) return str(v) def amap_key() -> str: for candidate in ( settings.amap_web_key, os.environ.get("AMAP_WEB_KEY"), os.environ.get("AMAP_KEY"), ): if candidate: return candidate try: from app.agents.gaode_connector import _amap_key return _amap_key() except Exception as exc: # pragma: no cover - only for local misconfig raise RuntimeError("未找到高德 Web 服务 Key,请配置 AMAP_WEB_KEY 或保留旧采集脚本 Key") from exc def amap_get(url: str, params: dict[str, Any], timeout: int = 30) -> dict[str, Any]: params = {**params, "key": amap_key(), "output": "json"} response = requests.get(url, params=params, timeout=timeout, verify=False) response.raise_for_status() return response.json() def split_polyline(polyline: str) -> list[list[list[float]]]: loops: list[list[list[float]]] = [] for part in (polyline or "").split("|"): coords: list[list[float]] = [] for pair in part.split(";"): if not pair: continue lng, lat = map(float, pair.split(",")) coords.append([lng, lat]) if coords and coords[0] != coords[-1]: coords.append(coords[0]) if len(coords) >= 4: loops.append(coords) return loops def geojson_for_loops(loops: list[list[list[float]]]) -> dict[str, Any]: return {"type": "MultiPolygon", "coordinates": [[loop] for loop in loops]} def district_detail(keyword: str, subdistrict: int = 0, extensions: str = "all") -> dict[str, Any]: payload = amap_get( AMAP_DISTRICT_URL, { "keywords": keyword, "subdistrict": subdistrict, "extensions": extensions, }, ) if payload.get("status") != "1" or not payload.get("districts"): raise RuntimeError(f"行政区查询失败: {keyword} {payload.get('info')} {payload.get('infocode')}") return payload["districts"][0] def h3_polygon(cell_id: str) -> str: boundary = list(h3.cell_to_boundary(cell_id)) pts = [(lng, lat) for lat, lng in boundary] pts.append(pts[0]) return "|".join(f"{lng:.6f},{lat:.6f}" for lng, lat in pts) def h3_fields(lat: float, lng: float) -> dict[str, str]: return {f"h3_r{r}": h3.latlng_to_cell(lat, lng, r) for r in range(6, 11)} def cell_center(cell_id: str) -> tuple[float, float]: lat, lng = h3.cell_to_latlng(cell_id) return lng, lat def cell_boundary_json(cell_id: str) -> str: return json.dumps([[lng, lat] for lat, lng in h3.cell_to_boundary(cell_id)], ensure_ascii=False) def conn(): return psycopg.connect(settings.database_url, row_factory=dict_row) def graph(graph_name: str): db = FalkorDB( host=settings.falkordb_host, port=settings.falkordb_port, password=settings.falkordb_password or None, ) return db.select_graph(graph_name) SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS __SCHEMA__.amap_spatial_collect_runs ( id BIGSERIAL PRIMARY KEY, graph_name TEXT NOT NULL, scope_name TEXT NOT NULL, scope_adcode TEXT NOT NULL, type_group TEXT NOT NULL, root_resolution INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'running', api_calls INTEGER NOT NULL DEFAULT 0, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), finished_at TIMESTAMPTZ, note TEXT ); CREATE TABLE IF NOT EXISTS __SCHEMA__.amap_spatial_collect_tasks ( id BIGSERIAL PRIMARY KEY, graph_name TEXT NOT NULL, scope_name TEXT NOT NULL, scope_adcode TEXT NOT NULL, city TEXT NOT NULL, city_adcode TEXT NOT NULL, district TEXT NOT NULL, district_adcode TEXT NOT NULL, cell_id TEXT NOT NULL, resolution INTEGER NOT NULL, parent_cell_id TEXT, center_lng DOUBLE PRECISION, center_lat DOUBLE PRECISION, boundary_jsonb JSONB NOT NULL DEFAULT '[]', type_label TEXT NOT NULL, typecode TEXT NOT NULL, priority INTEGER NOT NULL DEFAULT 1000, status TEXT NOT NULL DEFAULT 'pending', next_page INTEGER NOT NULL DEFAULT 1, pages_consumed INTEGER NOT NULL DEFAULT 0, fetched_count INTEGER NOT NULL DEFAULT 0, inserted_count INTEGER NOT NULL DEFAULT 0, duplicate_count INTEGER NOT NULL DEFAULT 0, attempt_count INTEGER NOT NULL DEFAULT 0, last_error TEXT, last_infocode TEXT, started_at TIMESTAMPTZ, finished_at TIMESTAMPTZ, updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), UNIQUE (graph_name, cell_id, typecode) ); CREATE INDEX IF NOT EXISTS idx_amap_spatial_tasks_status ON __SCHEMA__.amap_spatial_collect_tasks (graph_name, status, resolution, id); CREATE INDEX IF NOT EXISTS idx_amap_spatial_tasks_district ON __SCHEMA__.amap_spatial_collect_tasks (graph_name, district_adcode, status); CREATE TABLE IF NOT EXISTS __SCHEMA__.amap_spatial_pois ( graph_name TEXT NOT NULL, gaode_poi_id TEXT NOT NULL, element_id TEXT NOT NULL, name TEXT NOT NULL, type_label TEXT NOT NULL, place_type TEXT NOT NULL, amap_type TEXT, typecode TEXT, lng DOUBLE PRECISION NOT NULL, lat DOUBLE PRECISION NOT NULL, h3_r6 TEXT NOT NULL, h3_r7 TEXT NOT NULL, h3_r8 TEXT NOT NULL, h3_r9 TEXT NOT NULL, h3_r10 TEXT NOT NULL, province TEXT, city TEXT, district TEXT, adcode TEXT, business_area TEXT, address TEXT, tel TEXT, open_time TEXT, rating TEXT, cost TEXT, level TEXT, tags TEXT, photo_urls JSONB NOT NULL DEFAULT '[]', source TEXT NOT NULL DEFAULT 'amap', source_cell_id TEXT, source_resolution INTEGER, source_scope_adcode TEXT, raw_jsonb JSONB NOT NULL DEFAULT '{}', first_fetched_at TIMESTAMPTZ NOT NULL DEFAULT now(), last_fetched_at TIMESTAMPTZ NOT NULL DEFAULT now(), PRIMARY KEY (graph_name, gaode_poi_id) ); CREATE INDEX IF NOT EXISTS idx_amap_spatial_pois_h3_r9 ON __SCHEMA__.amap_spatial_pois (graph_name, h3_r9); CREATE INDEX IF NOT EXISTS idx_amap_spatial_pois_h3_r8 ON __SCHEMA__.amap_spatial_pois (graph_name, h3_r8); CREATE INDEX IF NOT EXISTS idx_amap_spatial_pois_type ON __SCHEMA__.amap_spatial_pois (graph_name, place_type, typecode); CREATE INDEX IF NOT EXISTS idx_amap_spatial_pois_district ON __SCHEMA__.amap_spatial_pois (graph_name, city, district); """ def ensure_tables() -> None: with conn() as c: with c.cursor() as cur: cur.execute(f"CREATE SCHEMA IF NOT EXISTS {settings.db_schema}") cur.execute(SCHEMA_SQL.replace("__SCHEMA__", settings.db_schema)) cur.execute( f"""ALTER TABLE {settings.db_schema}.amap_spatial_collect_tasks ADD COLUMN IF NOT EXISTS priority INTEGER NOT NULL DEFAULT 1000""" ) cur.execute( f"""CREATE INDEX IF NOT EXISTS idx_amap_spatial_tasks_resolution_cell_priority ON {settings.db_schema}.amap_spatial_collect_tasks (graph_name, status, resolution, city_adcode, district_adcode, cell_id, priority, id)""" ) c.commit() def reset_graph_if_requested(graph_name: str, reset_graph: bool) -> None: if not reset_graph: return g = graph(graph_name) try: g.delete() except Exception: pass def seed_scope( *, scope_keyword: str, graph_name: str, type_group: str, root_resolution: int, reset: bool, reset_graph: bool, ) -> None: ensure_tables() reset_graph_if_requested(graph_name, reset_graph) typecodes = TYPE_GROUPS[type_group] city = district_detail(scope_keyword, subdistrict=1, extensions="base") city_name = city["name"] city_adcode = city["adcode"] districts = city.get("districts") or [] if not districts: districts = [{"name": city_name, "adcode": city_adcode}] with conn() as c: with c.cursor() as cur: if reset: cur.execute( f"DELETE FROM {settings.db_schema}.amap_spatial_collect_tasks WHERE graph_name=%s", (graph_name,), ) cur.execute( f"DELETE FROM {settings.db_schema}.amap_spatial_pois WHERE graph_name=%s", (graph_name,), ) cur.execute( f"""INSERT INTO {settings.db_schema}.amap_spatial_collect_runs (graph_name, scope_name, scope_adcode, type_group, root_resolution, note) VALUES (%s,%s,%s,%s,%s,%s)""", (graph_name, city_name, city_adcode, type_group, root_resolution, "seed"), ) seeded = 0 for d in districts: detail = district_detail(d["adcode"], subdistrict=0, extensions="all") loops = split_polyline(detail.get("polyline", "")) if not loops: continue cells = sorted(h3.geo_to_cells(geojson_for_loops(loops), root_resolution)) for cell_id in cells: center_lng, center_lat = cell_center(cell_id) boundary = cell_boundary_json(cell_id) for label, typecode in typecodes.items(): priority = DISTRICT_PRIORITY.get(detail["name"], 99) * 100 + TYPE_PRIORITY.get(label, 50) cur.execute( f"""INSERT INTO {settings.db_schema}.amap_spatial_collect_tasks (graph_name, scope_name, scope_adcode, city, city_adcode, district, district_adcode, cell_id, resolution, center_lng, center_lat, boundary_jsonb, type_label, typecode, priority) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s::jsonb,%s,%s,%s) ON CONFLICT (graph_name, cell_id, typecode) DO NOTHING""", ( graph_name, city_name, city_adcode, city_name, city_adcode, detail["name"], detail["adcode"], cell_id, root_resolution, center_lng, center_lat, boundary, label, typecode, priority, ), ) seeded += cur.rowcount or 0 time.sleep(0.15) c.commit() print(f"seeded {seeded} tasks for {city_name} into graph={graph_name}") def normalize_poi(poi: dict[str, Any], task: dict[str, Any]) -> dict[str, Any] | None: loc = clean(poi.get("location")) if "," not in loc: return None try: lng_s, lat_s = loc.split(",", 1) lng, lat = float(lng_s), float(lat_s) except ValueError: return None if not (103.0 <= lng <= 110.6 and 24.0 <= lat <= 29.9): return None poi_city = clean(poi.get("cityname")) poi_adcode = clean(poi.get("adcode")) # H3 cells near an administrative border can cross the city line. Keep the # source scope strict so a Guiyang scan cannot pollute the clean graph with # neighboring Qiannan / Anshun POIs. city_prefix = clean(task["city_adcode"])[:4] if poi_city and poi_city != task["city"]: return None if poi_adcode and city_prefix and not poi_adcode.startswith(city_prefix): return None biz_ext = poi.get("biz_ext") if isinstance(poi.get("biz_ext"), dict) else {} photos = poi.get("photos") if isinstance(poi.get("photos"), list) else [] photo_urls = [clean(p.get("url")) for p in photos if isinstance(p, dict) and p.get("url")] h3s = h3_fields(lat, lng) gaode_id = clean(poi.get("id")) if not gaode_id: return None return { "graph_name": task["graph_name"], "gaode_poi_id": gaode_id, "element_id": f"amap:{gaode_id}", "name": clean(poi.get("name")), "type_label": task["type_label"], "place_type": PLACE_TYPE.get(task["type_label"], "poi"), "amap_type": clean(poi.get("type")), "typecode": clean(poi.get("typecode")) or task["typecode"], "lng": lng, "lat": lat, **h3s, "province": clean(poi.get("pname")), "city": clean(poi.get("cityname")) or task["city"], "district": clean(poi.get("adname")) or task["district"], "adcode": clean(poi.get("adcode")) or task["district_adcode"], "business_area": clean(poi.get("business_area")), "address": clean(poi.get("address")), "tel": clean(poi.get("tel")), "open_time": clean(biz_ext.get("open_time")), "rating": clean(biz_ext.get("rating")), "cost": clean(biz_ext.get("cost")), "level": clean(poi.get("level")), "tags": clean(poi.get("tag")), "photo_urls": photo_urls, "source_cell_id": task["cell_id"], "source_resolution": task["resolution"], "source_scope_adcode": task["scope_adcode"], "raw_jsonb": poi, } def search_cell(task: dict[str, Any], page: int) -> tuple[list[dict[str, Any]], dict[str, Any]]: payload = amap_get( AMAP_POLYGON_URL, { "polygon": h3_polygon(task["cell_id"]), "types": task["typecode"], "offset": PAGE_SIZE, "page": page, "extensions": "all", "children": 1, }, ) if payload.get("status") != "1": return [], payload pois = payload.get("pois") or [] rows = [r for p in pois if (r := normalize_poi(p, task))] return rows, payload def upsert_pg_pois(c: psycopg.Connection, rows: list[dict[str, Any]]) -> tuple[int, int]: inserted = duplicate = 0 with c.cursor() as cur: for r in rows: cur.execute( f"""INSERT INTO {settings.db_schema}.amap_spatial_pois (graph_name, gaode_poi_id, element_id, name, type_label, place_type, amap_type, typecode, lng, lat, h3_r6, h3_r7, h3_r8, h3_r9, h3_r10, province, city, district, adcode, business_area, address, tel, open_time, rating, cost, level, tags, photo_urls, source_cell_id, source_resolution, source_scope_adcode, raw_jsonb) VALUES (%(graph_name)s,%(gaode_poi_id)s,%(element_id)s,%(name)s,%(type_label)s,%(place_type)s, %(amap_type)s,%(typecode)s,%(lng)s,%(lat)s,%(h3_r6)s,%(h3_r7)s,%(h3_r8)s,%(h3_r9)s,%(h3_r10)s, %(province)s,%(city)s,%(district)s,%(adcode)s,%(business_area)s,%(address)s,%(tel)s, %(open_time)s,%(rating)s,%(cost)s,%(level)s,%(tags)s,%(photo_urls)s::jsonb,%(source_cell_id)s, %(source_resolution)s,%(source_scope_adcode)s,%(raw_jsonb)s::jsonb) ON CONFLICT (graph_name, gaode_poi_id) DO UPDATE SET name=EXCLUDED.name, type_label=EXCLUDED.type_label, place_type=EXCLUDED.place_type, amap_type=EXCLUDED.amap_type, typecode=EXCLUDED.typecode, lng=EXCLUDED.lng, lat=EXCLUDED.lat, h3_r6=EXCLUDED.h3_r6, h3_r7=EXCLUDED.h3_r7, h3_r8=EXCLUDED.h3_r8, h3_r9=EXCLUDED.h3_r9, h3_r10=EXCLUDED.h3_r10, province=EXCLUDED.province, city=EXCLUDED.city, district=EXCLUDED.district, adcode=EXCLUDED.adcode, business_area=EXCLUDED.business_area, address=EXCLUDED.address, tel=EXCLUDED.tel, open_time=EXCLUDED.open_time, rating=EXCLUDED.rating, cost=EXCLUDED.cost, level=EXCLUDED.level, tags=EXCLUDED.tags, photo_urls=EXCLUDED.photo_urls, source_cell_id=EXCLUDED.source_cell_id, source_resolution=EXCLUDED.source_resolution, source_scope_adcode=EXCLUDED.source_scope_adcode, raw_jsonb=EXCLUDED.raw_jsonb, last_fetched_at=now() RETURNING (xmax = 0) AS inserted""", { **r, "photo_urls": json.dumps(r["photo_urls"], ensure_ascii=False), "raw_jsonb": json.dumps(r["raw_jsonb"], ensure_ascii=False), }, ) if cur.fetchone()["inserted"]: inserted += 1 else: duplicate += 1 return inserted, duplicate def upsert_graph_places(graph_name: str, rows: list[dict[str, Any]]) -> None: if not rows: return g = graph(graph_name) for r in rows: label = FALKOR_LABEL.get(r["type_label"], "POI") query = ( f"MERGE (p:Place:{label} {{element_id:$element_id}}) " "SET p.place_id=$element_id,p.gaode_poi_id=$gaode_poi_id,p.name=$name," "p.place_type=$place_type,p.type_label=$type_label,p.amap_type=$amap_type," "p.typecode=$typecode,p.lng=$lng,p.lat=$lat,p.h3_r6=$h3_r6,p.h3_r7=$h3_r7," "p.h3_r8=$h3_r8,p.h3_r9=$h3_r9,p.h3_r10=$h3_r10,p.province=$province," "p.city=$city,p.district=$district,p.adcode=$adcode,p.address=$address," "p.tel=$tel,p.open_time=$open_time,p.rating=$rating,p.cost=$cost,p.level=$level," "p.tags=$tags,p.photo_urls=$photo_urls,p.source='amap',p.source_cell_id=$source_cell_id," "p.source_resolution=$source_resolution,p.first_seen_at=coalesce(p.first_seen_at,$fetched_at)," "p.updated_at=$fetched_at " "MERGE (prov:Area {element_id:$province_key}) SET prov.name=$province,prov.level='province' " "MERGE (city:Area {element_id:$city_key}) SET city.name=$city,city.level='city' " "MERGE (dist:Area {element_id:$district_key}) SET dist.name=$district,dist.level='district',dist.adcode=$adcode " "MERGE (p)-[:LOCATED_IN]->(dist) " "MERGE (dist)-[:PART_OF]->(city) " "MERGE (city)-[:PART_OF]->(prov) " "MERGE (c9:GeoCell {h3_id:$h3_r9}) SET c9.resolution=9 " "MERGE (p)-[:IN_H3_R9]->(c9)" ) params = { **{k: r.get(k, "") for k in [ "element_id", "gaode_poi_id", "name", "place_type", "type_label", "amap_type", "typecode", "h3_r6", "h3_r7", "h3_r8", "h3_r9", "h3_r10", "province", "city", "district", "adcode", "address", "tel", "open_time", "rating", "cost", "level", "tags", "source_cell_id", ]}, "lng": float(r["lng"]), "lat": float(r["lat"]), "source_resolution": int(r["source_resolution"]), "photo_urls": "|".join(r.get("photo_urls") or [])[:1200], "province_key": f"area:province:{r.get('province') or 'unknown'}", "city_key": f"area:city:{r.get('city') or 'unknown'}", "district_key": f"area:district:{r.get('adcode') or r.get('district') or 'unknown'}", "fetched_at": now_iso(), } g.query(query, params) def update_task(cur, task_id: int, **fields: Any) -> None: if not fields: return cols = ", ".join(f"{k}=%s" for k in fields) values = list(fields.values()) + [task_id] cur.execute( f"UPDATE {settings.db_schema}.amap_spatial_collect_tasks SET {cols}, updated_at=now() WHERE id=%s", values, ) def split_task(c: psycopg.Connection, task: dict[str, Any]) -> int: if int(task["resolution"]) >= MAX_RESOLUTION: return 0 next_res = int(task["resolution"]) + 1 children = sorted(h3.cell_to_children(task["cell_id"], next_res)) created = 0 with c.cursor() as cur: for child in children: center_lng, center_lat = cell_center(child) cur.execute( f"""INSERT INTO {settings.db_schema}.amap_spatial_collect_tasks (graph_name, scope_name, scope_adcode, city, city_adcode, district, district_adcode, cell_id, resolution, parent_cell_id, center_lng, center_lat, boundary_jsonb, type_label, typecode, priority) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s::jsonb,%s,%s,%s) ON CONFLICT (graph_name, cell_id, typecode) DO NOTHING""", ( task["graph_name"], task["scope_name"], task["scope_adcode"], task["city"], task["city_adcode"], task["district"], task["district_adcode"], child, next_res, task["cell_id"], center_lng, center_lat, cell_boundary_json(child), task["type_label"], task["typecode"], task["priority"], ), ) created += cur.rowcount or 0 return created def is_quota_error(infocode: str) -> bool: return infocode in {"10003", "10004", "10014", "10029"} def run_collection(graph_name: str, max_api_calls: int, sleep_s: float) -> dict[str, Any]: ensure_tables() api_calls = 0 totals = {"tasks": 0, "pois": 0, "inserted": 0, "duplicates": 0, "split_children": 0} with conn() as c: while max_api_calls <= 0 or api_calls < max_api_calls: with c.cursor() as cur: cur.execute( f"""SELECT * FROM {settings.db_schema}.amap_spatial_collect_tasks WHERE graph_name=%s AND status='pending' ORDER BY resolution, city_adcode, district_adcode, cell_id, priority, id LIMIT 1 FOR UPDATE SKIP LOCKED""", (graph_name,), ) task = cur.fetchone() if not task: break update_task(cur, task["id"], status="running", started_at=datetime.now(timezone.utc)) c.commit() task_total = 0 task_inserted = 0 task_dup = 0 task_pages = 0 last_raw = 0 try: for page in range(int(task["next_page"]), MAX_PAGE + 1): if max_api_calls > 0 and api_calls >= max_api_calls: break rows, payload = search_cell(task, page) api_calls += 1 infocode = clean(payload.get("infocode")) if payload.get("status") != "1": with c.cursor() as cur: status = "quota_limited" if is_quota_error(infocode) else ( "error" if int(task["attempt_count"]) + 1 >= 3 else "pending" ) update_task( cur, task["id"], status=status, attempt_count=int(task["attempt_count"]) + 1, last_error=mask_secret(clean(payload.get("info"))), last_infocode=infocode, ) c.commit() if status == "quota_limited": return {**totals, "api_calls": api_calls, "stopped": "quota_limited"} break last_raw = len(payload.get("pois") or []) inserted, dup = upsert_pg_pois(c, rows) upsert_graph_places(graph_name, rows) task_total += last_raw task_inserted += inserted task_dup += dup task_pages += 1 with c.cursor() as cur: update_task( cur, task["id"], next_page=page + 1, pages_consumed=int(task["pages_consumed"]) + task_pages, fetched_count=int(task["fetched_count"]) + task_total, inserted_count=int(task["inserted_count"]) + task_inserted, duplicate_count=int(task["duplicate_count"]) + task_dup, ) c.commit() if last_raw < PAGE_SIZE: break time.sleep(sleep_s) final_status = "pending" created = 0 if last_raw < PAGE_SIZE: final_status = "done" elif task_total >= SATURATE_COUNT and last_raw >= PAGE_SIZE: created = split_task(c, task) final_status = "saturated" if created else "saturated_max_res" with c.cursor() as cur: update_task( cur, task["id"], status=final_status, finished_at=datetime.now(timezone.utc) if final_status != "pending" else None, ) c.commit() totals["tasks"] += 1 totals["pois"] += task_total totals["inserted"] += task_inserted totals["duplicates"] += task_dup totals["split_children"] += created except Exception as exc: with c.cursor() as cur: update_task( cur, task["id"], status="pending" if int(task["attempt_count"]) + 1 < 3 else "error", attempt_count=int(task["attempt_count"]) + 1, last_error=mask_secret(str(exc))[:500], ) c.commit() time.sleep(sleep_s) return {**totals, "api_calls": api_calls, "stopped": "budget_or_done"} def status(graph_name: str) -> dict[str, Any]: ensure_tables() with conn() as c: with c.cursor() as cur: cur.execute( f"""SELECT status, resolution, COUNT(*) cnt, COALESCE(SUM(fetched_count),0) fetched, COALESCE(SUM(inserted_count),0) inserted FROM {settings.db_schema}.amap_spatial_collect_tasks WHERE graph_name=%s GROUP BY status, resolution ORDER BY resolution, status""", (graph_name,), ) tasks = cur.fetchall() cur.execute( f"""SELECT place_type, type_label, COUNT(*) cnt FROM {settings.db_schema}.amap_spatial_pois WHERE graph_name=%s GROUP BY place_type, type_label ORDER BY cnt DESC""", (graph_name,), ) types = cur.fetchall() cur.execute( f"""SELECT city, district, COUNT(*) cnt FROM {settings.db_schema}.amap_spatial_pois WHERE graph_name=%s GROUP BY city, district ORDER BY city, district""", (graph_name,), ) districts = cur.fetchall() return {"graph": graph_name, "tasks": tasks, "types": types, "districts": districts} def write_report(graph_name: str, path: Path) -> None: payload = status(graph_name) lines = [ f"# 贵阳市 H3 空间采集运行报告", "", f"- 图谱:`{graph_name}`", f"- 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", "", "## 任务状态", "", "| 状态 | H3分辨率 | 任务数 | 已取原始POI | 新增入库 |", "|---|---:|---:|---:|---:|", ] for row in payload["tasks"]: lines.append(f"| {row['status']} | {row['resolution']} | {row['cnt']} | {row['fetched']} | {row['inserted']} |") lines.extend(["", "## 类型分布", "", "| 类型 | place_type | 数量 |", "|---|---|---:|"]) for row in payload["types"]: lines.append(f"| {row['type_label']} | {row['place_type']} | {row['cnt']} |") lines.extend(["", "## 行政区分布", "", "| 城市 | 区县 | 数量 |", "|---|---|---:|"]) for row in payload["districts"]: lines.append(f"| {row['city']} | {row['district']} | {row['cnt']} |") lines.extend([ "", "## 已落入图谱的关键空间字段", "", "- `lng/lat`:高德原始经纬度。", "- `h3_r6` ~ `h3_r10`:多分辨率空间索引,后续 nearby 查询按半径选择。", "- `city/district/adcode`:行政区归属,用于市县统计与责任区过滤。", "- `source_cell_id/source_resolution`:记录本次从哪个 H3 采集格命中,便于断点续扫与追溯。", "- `first_seen_at/updated_at`:图谱节点采集时间与更新时间。", ]) path.parent.mkdir(parents=True, exist_ok=True) path.write_text("\n".join(lines) + "\n", encoding="utf-8") def nearby(graph_name: str, lng: float, lat: float, radius_m: int, limit: int) -> list[dict[str, Any]]: if radius_m <= 500: res, col, k = 9, "h3_r9", 2 elif radius_m <= 1000: res, col, k = 9, "h3_r9", 4 elif radius_m <= 3000: res, col, k = 8, "h3_r8", 4 else: res, col = 7, "h3_r7" k = max(2, math.ceil(radius_m / (math.sqrt(3) * h3.average_hexagon_edge_length(res, unit="m")))) cells = list(h3.grid_disk(h3.latlng_to_cell(lat, lng, res), k)) with conn() as c: with c.cursor() as cur: cur.execute( f"""SELECT name, place_type, type_label, lng, lat, address, district, {col} AS h3_cell FROM {settings.db_schema}.amap_spatial_pois WHERE graph_name=%s AND {col}=ANY(%s) LIMIT 5000""", (graph_name, cells), ) rows = cur.fetchall() out = [] for r in rows: d = haversine(lng, lat, float(r["lng"]), float(r["lat"])) if d <= radius_m: out.append({**dict(r), "distance_m": round(d, 1)}) return sorted(out, key=lambda x: x["distance_m"])[:limit] def haversine(lng1: float, lat1: float, lng2: float, lat2: float) -> float: radius = 6371008.8 d_lng = math.radians(lng2 - lng1) d_lat = math.radians(lat2 - lat1) a = ( math.sin(d_lat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(d_lng / 2) ** 2 ) return 2 * radius * math.asin(math.sqrt(a)) def main() -> None: parser = argparse.ArgumentParser(description="AMap H3 spatial collector") sub = parser.add_subparsers(dest="cmd", required=True) p_init = sub.add_parser("init") p_init.add_argument("--scope", default="贵阳市") p_init.add_argument("--graph", default="guiyang_spatial_v1") p_init.add_argument("--type-group", choices=sorted(TYPE_GROUPS), default="all") p_init.add_argument("--resolution", type=int, default=6) p_init.add_argument("--reset", action="store_true") p_init.add_argument("--reset-graph", action="store_true") p_run = sub.add_parser("run") p_run.add_argument("--graph", default="guiyang_spatial_v1") p_run.add_argument("--max-api-calls", type=int, default=300) p_run.add_argument("--sleep", type=float, default=0.25) p_status = sub.add_parser("status") p_status.add_argument("--graph", default="guiyang_spatial_v1") p_status.add_argument("--report", default="") p_nearby = sub.add_parser("nearby") p_nearby.add_argument("--graph", default="guiyang_spatial_v1") p_nearby.add_argument("--lng", type=float, required=True) p_nearby.add_argument("--lat", type=float, required=True) p_nearby.add_argument("--radius", type=int, default=1000) p_nearby.add_argument("--limit", type=int, default=20) args = parser.parse_args() if args.cmd == "init": seed_scope( scope_keyword=args.scope, graph_name=args.graph, type_group=args.type_group, root_resolution=args.resolution, reset=args.reset, reset_graph=args.reset_graph, ) elif args.cmd == "run": print(json.dumps(run_collection(args.graph, args.max_api_calls, args.sleep), ensure_ascii=False, indent=2)) elif args.cmd == "status": payload = status(args.graph) print(json.dumps(payload, ensure_ascii=False, indent=2, default=str)) if args.report: write_report(args.graph, Path(args.report)) print(f"report written: {args.report}") elif args.cmd == "nearby": print(json.dumps(nearby(args.graph, args.lng, args.lat, args.radius, args.limit), ensure_ascii=False, indent=2)) if __name__ == "__main__": main()