940 lines
36 KiB
Python
940 lines
36 KiB
Python
#!/usr/bin/env python3
|
||
"""AMap H3 spatial collector for a clean city knowledge graph.

The old Super Agent grid used rectangular bbox subdivision. This collector uses
district boundaries -> H3 cells -> AMap polygon search, records every task, and
writes POI spatial properties into a dedicated FalkorDB graph.
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import math
|
||
import os
|
||
import re
|
||
import sys
|
||
import time
|
||
from dataclasses import dataclass
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
import h3
|
||
import psycopg
|
||
import requests
|
||
import urllib3
|
||
from falkordb import FalkorDB
|
||
from psycopg.rows import dict_row
|
||
|
||
# ROOT is the parent of the directory that contains this file. It is put on
# sys.path so that the `app` package imported just below can be resolved when
# this file is executed directly (presumably ROOT is the project root).
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from app.config import settings  # noqa: E402

# Silence urllib3's InsecureRequestWarning; HTTP calls in this file are issued
# with verify=False.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||
|
||
# AMap Web Service endpoints: administrative district lookup and polygon POI search.
AMAP_DISTRICT_URL = "https://restapi.amap.com/v3/config/district"
AMAP_POLYGON_URL = "https://restapi.amap.com/v3/place/polygon"

# Page size sent as `offset` on every polygon search request.
PAGE_SIZE = 25
# Highest page number requested for a single cell/type task.
MAX_PAGE = 8
# Raw POI count at or above which a cell whose last page was full is split
# into child cells.
SATURATE_COUNT = 180
# Cells at this H3 resolution (or finer) are never split further.
MAX_RESOLUTION = 9
|
||
|
||
# Selectable type groups (CLI `--type-group`). Each group maps a display label
# to the typecode that is sent as the `types` parameter of the polygon search.
TYPE_GROUPS: dict[str, dict[str, str]] = {
    "core": {
        "景点": "110000",
        "美食": "050000",
        "酒店": "100000",
        "商场": "060000",
        "交通设施": "150000",
    },
    "tourism": {
        "景点": "110000",
        "美食": "050000",
        "酒店": "100000",
        "商场": "060000",
        "交通设施": "150000",
        "生活服务": "070000",
    },
    "all": {
        "汽车服务": "010000",
        "汽车销售": "020000",
        "汽车维修": "030000",
        "摩托车服务": "040000",
        "美食": "050000",
        "商场": "060000",
        "生活服务": "070000",
        "体育休闲": "080000",
        "医疗保健": "090000",
        "酒店": "100000",
        "景点": "110000",
        "商务住宅": "120000",
        "政府机构": "130000",
        "科教文化": "140000",
        "交通设施": "150000",
        "金融保险": "160000",
        "公司企业": "170000",
        "道路附属": "180000",
        "地名地址": "190000",
        "公共设施": "200000",
    },
}
|
||
|
||
# District name -> rank used when seeding tasks. The rank is multiplied by 100
# and added to the type rank to form a task's `priority`; districts missing
# from this table get rank 99.
DISTRICT_PRIORITY = {
    "云岩区": 1,
    "南明区": 2,
    "观山湖区": 3,
    "花溪区": 4,
    "白云区": 5,
    "乌当区": 6,
    "清镇市": 7,
    "修文县": 8,
    "息烽县": 9,
    "开阳县": 10,
}
|
||
|
||
# Type label -> rank added to the district rank when a task's `priority` is
# computed; labels missing from this table get rank 50.
TYPE_PRIORITY = {
    "景点": 1,
    "美食": 2,
    "酒店": 3,
    "交通设施": 4,
    "商场": 5,
    "生活服务": 6,
    "医疗保健": 7,
    "科教文化": 8,
}
|
||
|
||
# Type label -> `place_type` value stored on each POI; labels missing from
# this table are stored as "poi".
PLACE_TYPE = {
    "景点": "sight",
    "美食": "eat",
    "酒店": "hotel",
    "商场": "mall",
    "交通设施": "transit",
    "医疗保健": "medical",
    "科教文化": "education",
    "生活服务": "life",
    "公共设施": "facility",
    "政府机构": "government",
    "商务住宅": "residential",
    "公司企业": "enterprise",
}
|
||
|
||
# Type label -> extra FalkorDB node label given to a Place node; labels missing
# from this table use "POI".
FALKOR_LABEL = {
    "景点": "ScenicSpot",
    "美食": "FoodPlace",
    "酒店": "Hotel",
    "商场": "Mall",
    "交通设施": "TransitFacility",
    "医疗保健": "MedicalPlace",
    "科教文化": "EducationPlace",
    "生活服务": "LifeServicePlace",
    "公共设施": "Facility",
    "政府机构": "GovernmentPlace",
    "商务住宅": "ResidentialPlace",
    "公司企业": "EnterprisePlace",
}
|
||
|
||
|
||
def now_iso() -> str:
    """Return the current UTC time as an ISO 8601 string carrying its UTC offset."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()
|
||
|
||
|
||
def mask_secret(text: Any) -> str:
    """Return ``text`` as a string with every ``key=<value>`` query parameter masked.

    Falsy input (``None``, ``""``, ``0``) yields an empty string. Only values
    that follow ``?key=`` or ``&key=`` are replaced by ``***``.
    """
    raw = str(text or "")
    key_param = re.compile(r"([?&]key=)[^&\s]+")
    return key_param.sub(r"\1***", raw)
|
||
|
||
|
||
def clean(v: Any) -> str:
    """Normalise a raw field value to a plain string.

    Lists are joined with ``" | "`` after dropping falsy items; ``None`` and
    the empty list become ``""``; everything else goes through ``str()``.
    """
    if isinstance(v, list):
        # An empty list joins to "", matching the explicit empty-list case.
        return " | ".join(str(item) for item in v if item)
    if v is None or v == []:
        return ""
    return str(v)
|
||
|
||
|
||
def amap_key() -> str:
    """Return the AMap Web Service key.

    The first non-empty value among ``settings.amap_web_key`` and the
    ``AMAP_WEB_KEY`` / ``AMAP_KEY`` environment variables wins. If none is set,
    the key is obtained from ``app.agents.gaode_connector._amap_key``.

    Raises:
        RuntimeError: if that fallback import or call fails.
    """
    configured = (
        settings.amap_web_key,
        os.environ.get("AMAP_WEB_KEY"),
        os.environ.get("AMAP_KEY"),
    )
    first_set = next((value for value in configured if value), None)
    if first_set:
        return first_set
    try:
        from app.agents.gaode_connector import _amap_key

        return _amap_key()
    except Exception as exc:  # pragma: no cover - only for local misconfig
        raise RuntimeError("未找到高德 Web 服务 Key,请配置 AMAP_WEB_KEY 或保留旧采集脚本 Key") from exc
|
||
|
||
|
||
def amap_get(url: str, params: dict[str, Any], timeout: int = 30) -> dict[str, Any]:
    """GET ``url`` with ``params`` plus the API key and ``output=json``; return the decoded JSON.

    The caller's ``params`` dict is not modified.

    Raises:
        requests.HTTPError: when the response status is an HTTP error.
    """
    query = dict(params)
    query["key"] = amap_key()
    query["output"] = "json"
    # NOTE(review): verify=False disables TLS certificate checks on a request
    # that carries the API key. Left unchanged here; confirm whether the
    # deployment really needs it.
    reply = requests.get(url, params=query, timeout=timeout, verify=False)
    reply.raise_for_status()
    return reply.json()
|
||
|
||
|
||
def split_polyline(polyline: str) -> list[list[list[float]]]:
    """Parse a ``lng,lat;lng,lat|...`` polyline string into closed rings.

    ``|`` separates rings and ``;`` separates points. A ring whose first and
    last points differ is closed by repeating its first point. Rings with fewer
    than four points after closing are dropped. ``None`` or ``""`` gives ``[]``.

    Raises:
        ValueError: if a point is not two comma-separated numbers.
    """
    rings: list[list[list[float]]] = []
    for segment in (polyline or "").split("|"):
        ring: list[list[float]] = []
        for token in filter(None, segment.split(";")):
            lng_text, lat_text = token.split(",")
            ring.append([float(lng_text), float(lat_text)])
        if ring and ring[0] != ring[-1]:
            ring.append(ring[0])
        if len(ring) < 4:
            continue
        rings.append(ring)
    return rings
|
||
|
||
|
||
def geojson_for_loops(loops: list[list[list[float]]]) -> dict[str, Any]:
    """Wrap each ring as its own single-ring polygon inside a GeoJSON-style MultiPolygon dict."""
    polygons = []
    for ring in loops:
        polygons.append([ring])
    return {"type": "MultiPolygon", "coordinates": polygons}
|
||
|
||
|
||
def district_detail(keyword: str, subdistrict: int = 0, extensions: str = "all") -> dict[str, Any]:
    """Query the AMap district endpoint and return the first matching district.

    Raises:
        RuntimeError: when the payload status is not ``"1"`` or no district is returned.
    """
    query = {
        "keywords": keyword,
        "subdistrict": subdistrict,
        "extensions": extensions,
    }
    payload = amap_get(AMAP_DISTRICT_URL, query)
    if payload.get("status") == "1" and payload.get("districts"):
        return payload["districts"][0]
    raise RuntimeError(f"行政区查询失败: {keyword} {payload.get('info')} {payload.get('infocode')}")
|
||
|
||
|
||
def h3_polygon(cell_id: str) -> str:
    """Return the cell boundary as a closed ``lng,lat|lng,lat|...`` string (6 decimals)."""
    # h3 yields (lat, lng) pairs; the output string is lng-first.
    ring = [f"{lng:.6f},{lat:.6f}" for lat, lng in h3.cell_to_boundary(cell_id)]
    ring.append(ring[0])  # repeat the first vertex to close the ring
    return "|".join(ring)
|
||
|
||
|
||
def h3_fields(lat: float, lng: float) -> dict[str, str]:
    """Return the H3 cell ids of a point at resolutions 6..10, keyed ``h3_r6`` .. ``h3_r10``."""
    cells: dict[str, str] = {}
    for resolution in range(6, 11):
        cells[f"h3_r{resolution}"] = h3.latlng_to_cell(lat, lng, resolution)
    return cells
|
||
|
||
|
||
def cell_center(cell_id: str) -> tuple[float, float]:
    """Return the cell centre as ``(lng, lat)``; h3 itself reports ``(lat, lng)``."""
    lat_lng = h3.cell_to_latlng(cell_id)
    return lat_lng[1], lat_lng[0]
|
||
|
||
|
||
def cell_boundary_json(cell_id: str) -> str:
    """Return the cell boundary as a JSON array of ``[lng, lat]`` pairs."""
    vertices = []
    for lat, lng in h3.cell_to_boundary(cell_id):
        vertices.append([lng, lat])
    return json.dumps(vertices, ensure_ascii=False)
|
||
|
||
|
||
def conn():
    """Open a new psycopg connection to ``settings.database_url`` using the ``dict_row`` row factory."""
    dsn = settings.database_url
    return psycopg.connect(dsn, row_factory=dict_row)
|
||
|
||
|
||
def graph(graph_name: str):
    """Connect to FalkorDB with the configured host/port/password and select ``graph_name``."""
    # An empty configured password is passed as None.
    password = settings.falkordb_password or None
    client = FalkorDB(
        host=settings.falkordb_host,
        port=settings.falkordb_port,
        password=password,
    )
    return client.select_graph(graph_name)
|
||
|
||
|
||
SCHEMA_SQL = """
|
||
CREATE TABLE IF NOT EXISTS __SCHEMA__.amap_spatial_collect_runs (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
graph_name TEXT NOT NULL,
|
||
scope_name TEXT NOT NULL,
|
||
scope_adcode TEXT NOT NULL,
|
||
type_group TEXT NOT NULL,
|
||
root_resolution INTEGER NOT NULL,
|
||
status TEXT NOT NULL DEFAULT 'running',
|
||
api_calls INTEGER NOT NULL DEFAULT 0,
|
||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||
finished_at TIMESTAMPTZ,
|
||
note TEXT
|
||
);
|
||
|
||
CREATE TABLE IF NOT EXISTS __SCHEMA__.amap_spatial_collect_tasks (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
graph_name TEXT NOT NULL,
|
||
scope_name TEXT NOT NULL,
|
||
scope_adcode TEXT NOT NULL,
|
||
city TEXT NOT NULL,
|
||
city_adcode TEXT NOT NULL,
|
||
district TEXT NOT NULL,
|
||
district_adcode TEXT NOT NULL,
|
||
cell_id TEXT NOT NULL,
|
||
resolution INTEGER NOT NULL,
|
||
parent_cell_id TEXT,
|
||
center_lng DOUBLE PRECISION,
|
||
center_lat DOUBLE PRECISION,
|
||
boundary_jsonb JSONB NOT NULL DEFAULT '[]',
|
||
type_label TEXT NOT NULL,
|
||
typecode TEXT NOT NULL,
|
||
priority INTEGER NOT NULL DEFAULT 1000,
|
||
status TEXT NOT NULL DEFAULT 'pending',
|
||
next_page INTEGER NOT NULL DEFAULT 1,
|
||
pages_consumed INTEGER NOT NULL DEFAULT 0,
|
||
fetched_count INTEGER NOT NULL DEFAULT 0,
|
||
inserted_count INTEGER NOT NULL DEFAULT 0,
|
||
duplicate_count INTEGER NOT NULL DEFAULT 0,
|
||
attempt_count INTEGER NOT NULL DEFAULT 0,
|
||
last_error TEXT,
|
||
last_infocode TEXT,
|
||
started_at TIMESTAMPTZ,
|
||
finished_at TIMESTAMPTZ,
|
||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||
UNIQUE (graph_name, cell_id, typecode)
|
||
);
|
||
|
||
CREATE INDEX IF NOT EXISTS idx_amap_spatial_tasks_status
|
||
ON __SCHEMA__.amap_spatial_collect_tasks (graph_name, status, resolution, id);
|
||
CREATE INDEX IF NOT EXISTS idx_amap_spatial_tasks_district
|
||
ON __SCHEMA__.amap_spatial_collect_tasks (graph_name, district_adcode, status);
|
||
|
||
CREATE TABLE IF NOT EXISTS __SCHEMA__.amap_spatial_pois (
|
||
graph_name TEXT NOT NULL,
|
||
gaode_poi_id TEXT NOT NULL,
|
||
element_id TEXT NOT NULL,
|
||
name TEXT NOT NULL,
|
||
type_label TEXT NOT NULL,
|
||
place_type TEXT NOT NULL,
|
||
amap_type TEXT,
|
||
typecode TEXT,
|
||
lng DOUBLE PRECISION NOT NULL,
|
||
lat DOUBLE PRECISION NOT NULL,
|
||
h3_r6 TEXT NOT NULL,
|
||
h3_r7 TEXT NOT NULL,
|
||
h3_r8 TEXT NOT NULL,
|
||
h3_r9 TEXT NOT NULL,
|
||
h3_r10 TEXT NOT NULL,
|
||
province TEXT,
|
||
city TEXT,
|
||
district TEXT,
|
||
adcode TEXT,
|
||
business_area TEXT,
|
||
address TEXT,
|
||
tel TEXT,
|
||
open_time TEXT,
|
||
rating TEXT,
|
||
cost TEXT,
|
||
level TEXT,
|
||
tags TEXT,
|
||
photo_urls JSONB NOT NULL DEFAULT '[]',
|
||
source TEXT NOT NULL DEFAULT 'amap',
|
||
source_cell_id TEXT,
|
||
source_resolution INTEGER,
|
||
source_scope_adcode TEXT,
|
||
raw_jsonb JSONB NOT NULL DEFAULT '{}',
|
||
first_fetched_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||
last_fetched_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||
PRIMARY KEY (graph_name, gaode_poi_id)
|
||
);
|
||
|
||
CREATE INDEX IF NOT EXISTS idx_amap_spatial_pois_h3_r9
|
||
ON __SCHEMA__.amap_spatial_pois (graph_name, h3_r9);
|
||
CREATE INDEX IF NOT EXISTS idx_amap_spatial_pois_h3_r8
|
||
ON __SCHEMA__.amap_spatial_pois (graph_name, h3_r8);
|
||
CREATE INDEX IF NOT EXISTS idx_amap_spatial_pois_type
|
||
ON __SCHEMA__.amap_spatial_pois (graph_name, place_type, typecode);
|
||
CREATE INDEX IF NOT EXISTS idx_amap_spatial_pois_district
|
||
ON __SCHEMA__.amap_spatial_pois (graph_name, city, district);
|
||
"""
|
||
|
||
|
||
def ensure_tables() -> None:
    """Create the schema, tables and indexes used by the collector if they are missing.

    Also adds the ``priority`` column and the resolution/cell/priority index to
    a task table that was created without them.
    """
    schema = settings.db_schema
    statements = [
        f"CREATE SCHEMA IF NOT EXISTS {schema}",
        SCHEMA_SQL.replace("__SCHEMA__", schema),
        f"""ALTER TABLE {schema}.amap_spatial_collect_tasks
            ADD COLUMN IF NOT EXISTS priority INTEGER NOT NULL DEFAULT 1000""",
        f"""CREATE INDEX IF NOT EXISTS idx_amap_spatial_tasks_resolution_cell_priority
            ON {schema}.amap_spatial_collect_tasks
            (graph_name, status, resolution, city_adcode, district_adcode, cell_id, priority, id)""",
    ]
    with conn() as c:
        with c.cursor() as cur:
            for ddl in statements:
                cur.execute(ddl)
        c.commit()
|
||
|
||
|
||
def reset_graph_if_requested(graph_name: str, reset_graph: bool) -> None:
    """Delete the FalkorDB graph ``graph_name`` when ``reset_graph`` is true.

    Deletion stays best-effort: a failure does not abort the caller. The
    failure is reported on stderr instead of being swallowed silently, so
    authentication or connection problems remain visible.
    """
    if not reset_graph:
        return
    g = graph(graph_name)
    try:
        g.delete()
    except Exception as exc:
        # Presumably this also fires when the graph does not exist yet, which
        # is harmless for a reset; anything else is worth seeing.
        print(f"warning: could not delete graph {graph_name!r}: {exc}", file=sys.stderr)
|
||
|
||
|
||
def seed_scope(
    *,
    scope_keyword: str,
    graph_name: str,
    type_group: str,
    root_resolution: int,
    reset: bool,
    reset_graph: bool,
) -> None:
    """Seed one pending task per (H3 cell, type) for every district of a scope.

    The scope is resolved through the district API; when it has no
    sub-districts the scope itself is used as the only district. Each district
    polygon is filled with H3 cells at ``root_resolution`` and one task row is
    inserted per cell and per type of ``type_group``. Existing
    (graph, cell, typecode) rows are left untouched.

    With ``reset`` the graph's existing task and POI rows are deleted first, in
    the same transaction. With ``reset_graph`` the FalkorDB graph is deleted.
    A row describing this seeding is added to the runs table.
    """
    ensure_tables()
    reset_graph_if_requested(graph_name, reset_graph)
    typecodes = TYPE_GROUPS[type_group]

    city = district_detail(scope_keyword, subdistrict=1, extensions="base")
    city_name = city["name"]
    city_adcode = city["adcode"]
    districts = city.get("districts") or [{"name": city_name, "adcode": city_adcode}]

    schema = settings.db_schema
    insert_task_sql = f"""INSERT INTO {schema}.amap_spatial_collect_tasks
        (graph_name, scope_name, scope_adcode, city, city_adcode,
         district, district_adcode, cell_id, resolution,
         center_lng, center_lat, boundary_jsonb, type_label, typecode, priority)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s::jsonb,%s,%s,%s)
        ON CONFLICT (graph_name, cell_id, typecode) DO NOTHING"""

    with conn() as c:
        with c.cursor() as cur:
            if reset:
                for table in ("amap_spatial_collect_tasks", "amap_spatial_pois"):
                    cur.execute(
                        f"DELETE FROM {schema}.{table} WHERE graph_name=%s",
                        (graph_name,),
                    )
            cur.execute(
                f"""INSERT INTO {schema}.amap_spatial_collect_runs
                    (graph_name, scope_name, scope_adcode, type_group, root_resolution, note)
                    VALUES (%s,%s,%s,%s,%s,%s)""",
                (graph_name, city_name, city_adcode, type_group, root_resolution, "seed"),
            )

            seeded = 0
            for d in districts:
                detail = district_detail(d["adcode"], subdistrict=0, extensions="all")
                loops = split_polyline(detail.get("polyline", ""))
                if not loops:
                    continue
                cells = sorted(h3.geo_to_cells(geojson_for_loops(loops), root_resolution))
                for cell_id in cells:
                    center_lng, center_lat = cell_center(cell_id)
                    boundary = cell_boundary_json(cell_id)
                    for label, typecode in typecodes.items():
                        district_rank = DISTRICT_PRIORITY.get(detail["name"], 99)
                        type_rank = TYPE_PRIORITY.get(label, 50)
                        task_row = (
                            graph_name,
                            city_name,
                            city_adcode,
                            city_name,
                            city_adcode,
                            detail["name"],
                            detail["adcode"],
                            cell_id,
                            root_resolution,
                            center_lng,
                            center_lat,
                            boundary,
                            label,
                            typecode,
                            district_rank * 100 + type_rank,
                        )
                        cur.execute(insert_task_sql, task_row)
                        seeded += cur.rowcount or 0
                # Pause between per-district boundary lookups.
                time.sleep(0.15)
        c.commit()
    print(f"seeded {seeded} tasks for {city_name} into graph={graph_name}")
|
||
|
||
|
||
def normalize_poi(
    poi: dict[str, Any],
    task: dict[str, Any],
    *,
    bounds: tuple[float, float, float, float] = (103.0, 110.6, 24.0, 29.9),
) -> dict[str, Any] | None:
    """Convert one raw AMap POI into a row dict, or return ``None`` to reject it.

    A POI is rejected when it has no id, no parsable ``"lng,lat"`` location, a
    location outside ``bounds``, a ``cityname`` different from the task's city,
    or an ``adcode`` that does not start with the first four characters of the
    task's city adcode.

    Args:
        poi: raw POI object from the polygon search response.
        task: the task row that produced the POI.
        bounds: ``(min_lng, max_lng, min_lat, max_lat)`` sanity box. The
            default keeps the previously hard-coded values, so existing callers
            behave as before; pass another box when scanning a different region.

    Returns:
        A dict with the POI columns written by the collector, or ``None``.
    """
    # Cheapest rejection first: no id means nothing can be stored, so skip the
    # coordinate parsing and H3 indexing entirely.
    gaode_id = clean(poi.get("id"))
    if not gaode_id:
        return None

    loc = clean(poi.get("location"))
    if "," not in loc:
        return None
    try:
        lng_s, lat_s = loc.split(",", 1)
        lng, lat = float(lng_s), float(lat_s)
    except ValueError:
        return None

    min_lng, max_lng, min_lat, max_lat = bounds
    if not (min_lng <= lng <= max_lng and min_lat <= lat <= max_lat):
        return None

    poi_city = clean(poi.get("cityname"))
    poi_adcode = clean(poi.get("adcode"))
    # H3 cells near an administrative border can cross the city line. Keep the
    # source scope strict so a Guiyang scan cannot pollute the clean graph with
    # neighboring Qiannan / Anshun POIs.
    city_prefix = clean(task["city_adcode"])[:4]
    if poi_city and poi_city != task["city"]:
        return None
    if poi_adcode and city_prefix and not poi_adcode.startswith(city_prefix):
        return None

    biz_ext = poi.get("biz_ext") if isinstance(poi.get("biz_ext"), dict) else {}
    photos = poi.get("photos") if isinstance(poi.get("photos"), list) else []
    photo_urls = [clean(p.get("url")) for p in photos if isinstance(p, dict) and p.get("url")]

    return {
        "graph_name": task["graph_name"],
        "gaode_poi_id": gaode_id,
        "element_id": f"amap:{gaode_id}",
        "name": clean(poi.get("name")),
        "type_label": task["type_label"],
        "place_type": PLACE_TYPE.get(task["type_label"], "poi"),
        "amap_type": clean(poi.get("type")),
        "typecode": clean(poi.get("typecode")) or task["typecode"],
        "lng": lng,
        "lat": lat,
        **h3_fields(lat, lng),
        "province": clean(poi.get("pname")),
        "city": poi_city or task["city"],
        "district": clean(poi.get("adname")) or task["district"],
        "adcode": poi_adcode or task["district_adcode"],
        "business_area": clean(poi.get("business_area")),
        "address": clean(poi.get("address")),
        "tel": clean(poi.get("tel")),
        "open_time": clean(biz_ext.get("open_time")),
        "rating": clean(biz_ext.get("rating")),
        "cost": clean(biz_ext.get("cost")),
        "level": clean(poi.get("level")),
        "tags": clean(poi.get("tag")),
        "photo_urls": photo_urls,
        "source_cell_id": task["cell_id"],
        "source_resolution": task["resolution"],
        "source_scope_adcode": task["scope_adcode"],
        "raw_jsonb": poi,
    }
|
||
|
||
|
||
def search_cell(task: dict[str, Any], page: int) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Fetch one result page of the polygon search for a task's cell and type.

    Returns:
        ``(rows, payload)``: the POIs that survived ``normalize_poi`` and the
        raw response. ``rows`` is empty when the payload status is not ``"1"``.
    """
    query = {
        "polygon": h3_polygon(task["cell_id"]),
        "types": task["typecode"],
        "offset": PAGE_SIZE,
        "page": page,
        "extensions": "all",
        "children": 1,
    }
    payload = amap_get(AMAP_POLYGON_URL, query)
    if payload.get("status") != "1":
        return [], payload

    rows = []
    for raw in payload.get("pois") or []:
        normalized = normalize_poi(raw, task)
        if normalized:
            rows.append(normalized)
    return rows, payload
|
||
|
||
|
||
def upsert_pg_pois(c: psycopg.Connection, rows: list[dict[str, Any]]) -> tuple[int, int]:
    """Upsert POI rows into ``amap_spatial_pois`` without committing.

    Rows are keyed by (graph_name, gaode_poi_id). On conflict every column
    except ``first_fetched_at`` is overwritten and ``last_fetched_at`` is set
    to ``now()``.

    Returns:
        ``(inserted, duplicate)``: how many rows were newly inserted and how
        many hit an existing row, as reported by ``RETURNING (xmax = 0)``.
    """
    upsert_sql = f"""INSERT INTO {settings.db_schema}.amap_spatial_pois
        (graph_name, gaode_poi_id, element_id, name, type_label, place_type,
         amap_type, typecode, lng, lat, h3_r6, h3_r7, h3_r8, h3_r9, h3_r10,
         province, city, district, adcode, business_area, address, tel,
         open_time, rating, cost, level, tags, photo_urls, source_cell_id,
         source_resolution, source_scope_adcode, raw_jsonb)
        VALUES
        (%(graph_name)s,%(gaode_poi_id)s,%(element_id)s,%(name)s,%(type_label)s,%(place_type)s,
         %(amap_type)s,%(typecode)s,%(lng)s,%(lat)s,%(h3_r6)s,%(h3_r7)s,%(h3_r8)s,%(h3_r9)s,%(h3_r10)s,
         %(province)s,%(city)s,%(district)s,%(adcode)s,%(business_area)s,%(address)s,%(tel)s,
         %(open_time)s,%(rating)s,%(cost)s,%(level)s,%(tags)s,%(photo_urls)s::jsonb,%(source_cell_id)s,
         %(source_resolution)s,%(source_scope_adcode)s,%(raw_jsonb)s::jsonb)
        ON CONFLICT (graph_name, gaode_poi_id) DO UPDATE SET
            name=EXCLUDED.name, type_label=EXCLUDED.type_label,
            place_type=EXCLUDED.place_type, amap_type=EXCLUDED.amap_type,
            typecode=EXCLUDED.typecode, lng=EXCLUDED.lng, lat=EXCLUDED.lat,
            h3_r6=EXCLUDED.h3_r6, h3_r7=EXCLUDED.h3_r7,
            h3_r8=EXCLUDED.h3_r8, h3_r9=EXCLUDED.h3_r9,
            h3_r10=EXCLUDED.h3_r10, province=EXCLUDED.province,
            city=EXCLUDED.city, district=EXCLUDED.district,
            adcode=EXCLUDED.adcode, business_area=EXCLUDED.business_area,
            address=EXCLUDED.address, tel=EXCLUDED.tel,
            open_time=EXCLUDED.open_time, rating=EXCLUDED.rating,
            cost=EXCLUDED.cost, level=EXCLUDED.level, tags=EXCLUDED.tags,
            photo_urls=EXCLUDED.photo_urls,
            source_cell_id=EXCLUDED.source_cell_id,
            source_resolution=EXCLUDED.source_resolution,
            source_scope_adcode=EXCLUDED.source_scope_adcode,
            raw_jsonb=EXCLUDED.raw_jsonb,
            last_fetched_at=now()
        RETURNING (xmax = 0) AS inserted"""

    inserted = 0
    duplicate = 0
    with c.cursor() as cur:
        for r in rows:
            bound = dict(r)
            # The two JSONB columns are sent as JSON text and cast in SQL.
            bound["photo_urls"] = json.dumps(r["photo_urls"], ensure_ascii=False)
            bound["raw_jsonb"] = json.dumps(r["raw_jsonb"], ensure_ascii=False)
            cur.execute(upsert_sql, bound)
            if cur.fetchone()["inserted"]:
                inserted += 1
            else:
                duplicate += 1
    return inserted, duplicate
|
||
|
||
|
||
def upsert_graph_places(graph_name: str, rows: list[dict[str, Any]]) -> None:
    """Merge POI rows into the FalkorDB graph as ``Place`` nodes with area and H3 links.

    For every row this merges the Place node (keyed by ``element_id``), its
    province / city / district ``Area`` nodes, the ``LOCATED_IN`` / ``PART_OF``
    relationships, and the resolution-9 ``GeoCell`` with an ``IN_H3_R9`` link.

    Two fixes over the previous version:

    * The node is merged on ``(:Place {element_id})`` only and the type label
      is added with ``SET``. MERGE matches on the whole pattern, labels
      included, so merging on ``:Place:<TypeLabel>`` created a second node for
      a POI that was returned by searches of two different types.
    * The ``photo_urls`` property keeps only whole URLs within the 1200
      character budget instead of cutting the joined string mid-URL.
    """
    if not rows:
        return
    g = graph(graph_name)

    copied_keys = [
        "element_id", "gaode_poi_id", "name", "place_type", "type_label",
        "amap_type", "typecode", "h3_r6", "h3_r7", "h3_r8", "h3_r9", "h3_r10",
        "province", "city", "district", "adcode", "address", "tel", "open_time",
        "rating", "cost", "level", "tags", "source_cell_id",
    ]
    # Everything after the label-specific MERGE/SET prefix is identical for all rows.
    query_tail = (
        "SET p.place_id=$element_id,p.gaode_poi_id=$gaode_poi_id,p.name=$name,"
        "p.place_type=$place_type,p.type_label=$type_label,p.amap_type=$amap_type,"
        "p.typecode=$typecode,p.lng=$lng,p.lat=$lat,p.h3_r6=$h3_r6,p.h3_r7=$h3_r7,"
        "p.h3_r8=$h3_r8,p.h3_r9=$h3_r9,p.h3_r10=$h3_r10,p.province=$province,"
        "p.city=$city,p.district=$district,p.adcode=$adcode,p.address=$address,"
        "p.tel=$tel,p.open_time=$open_time,p.rating=$rating,p.cost=$cost,p.level=$level,"
        "p.tags=$tags,p.photo_urls=$photo_urls,p.source='amap',p.source_cell_id=$source_cell_id,"
        "p.source_resolution=$source_resolution,p.first_seen_at=coalesce(p.first_seen_at,$fetched_at),"
        "p.updated_at=$fetched_at "
        "MERGE (prov:Area {element_id:$province_key}) SET prov.name=$province,prov.level='province' "
        "MERGE (city:Area {element_id:$city_key}) SET city.name=$city,city.level='city' "
        "MERGE (dist:Area {element_id:$district_key}) SET dist.name=$district,dist.level='district',dist.adcode=$adcode "
        "MERGE (p)-[:LOCATED_IN]->(dist) "
        "MERGE (dist)-[:PART_OF]->(city) "
        "MERGE (city)-[:PART_OF]->(prov) "
        "MERGE (c9:GeoCell {h3_id:$h3_r9}) SET c9.resolution=9 "
        "MERGE (p)-[:IN_H3_R9]->(c9)"
    )
    photo_budget = 1200

    for r in rows:
        # The label comes from a fixed table (or the "POI" fallback), never from
        # API data, so interpolating it into the query text is safe.
        label = FALKOR_LABEL.get(r["type_label"], "POI")
        # NOTE(review): a POI seen under several types accumulates one label
        # per type; p.type_label / p.place_type reflect the latest write.
        query = f"MERGE (p:Place {{element_id:$element_id}}) SET p:{label} " + query_tail

        kept_urls: list[str] = []
        used = 0
        for url in r.get("photo_urls") or []:
            cost = len(url) + (1 if kept_urls else 0)  # +1 for the "|" separator
            if used + cost > photo_budget:
                break
            kept_urls.append(url)
            used += cost

        params = {key: r.get(key, "") for key in copied_keys}
        params.update(
            {
                "lng": float(r["lng"]),
                "lat": float(r["lat"]),
                "source_resolution": int(r["source_resolution"]),
                "photo_urls": "|".join(kept_urls),
                "province_key": f"area:province:{r.get('province') or 'unknown'}",
                "city_key": f"area:city:{r.get('city') or 'unknown'}",
                "district_key": f"area:district:{r.get('adcode') or r.get('district') or 'unknown'}",
                "fetched_at": now_iso(),
            }
        )
        g.query(query, params)
|
||
|
||
|
||
def update_task(cur, task_id: int, **fields: Any) -> None:
    """Set the given columns (and ``updated_at=now()``) on one task row.

    Does nothing when no field is passed. Keyword names are used verbatim as
    column names, so they must come from code, never from external input.
    """
    if not fields:
        return
    assignments = [f"{column}=%s" for column in fields]
    cols = ", ".join(assignments)
    sql = f"UPDATE {settings.db_schema}.amap_spatial_collect_tasks SET {cols}, updated_at=now() WHERE id=%s"
    cur.execute(sql, [*fields.values(), task_id])
|
||
|
||
|
||
def split_task(c: psycopg.Connection, task: dict[str, Any]) -> int:
    """Insert one child task per child cell at the next finer H3 resolution.

    Children inherit the parent's scope, district, type and priority and record
    the parent cell id. Existing (graph, cell, typecode) rows are left alone.
    Nothing is committed here.

    Returns:
        Number of rows actually inserted; ``0`` when the task is already at
        ``MAX_RESOLUTION`` or finer.
    """
    parent_res = int(task["resolution"])
    if parent_res >= MAX_RESOLUTION:
        return 0
    next_res = parent_res + 1

    insert_child_sql = f"""INSERT INTO {settings.db_schema}.amap_spatial_collect_tasks
        (graph_name, scope_name, scope_adcode, city, city_adcode,
         district, district_adcode, cell_id, resolution, parent_cell_id,
         center_lng, center_lat, boundary_jsonb, type_label, typecode, priority)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s::jsonb,%s,%s,%s)
        ON CONFLICT (graph_name, cell_id, typecode) DO NOTHING"""

    created = 0
    with c.cursor() as cur:
        for child in sorted(h3.cell_to_children(task["cell_id"], next_res)):
            center_lng, center_lat = cell_center(child)
            child_row = (
                task["graph_name"],
                task["scope_name"],
                task["scope_adcode"],
                task["city"],
                task["city_adcode"],
                task["district"],
                task["district_adcode"],
                child,
                next_res,
                task["cell_id"],
                center_lng,
                center_lat,
                cell_boundary_json(child),
                task["type_label"],
                task["typecode"],
                task["priority"],
            )
            cur.execute(insert_child_sql, child_row)
            created += cur.rowcount or 0
    return created
|
||
|
||
|
||
def is_quota_error(infocode: str) -> bool:
    """Return whether ``infocode`` is one of the codes this collector treats as quota/rate limiting."""
    quota_codes = frozenset(("10003", "10004", "10014", "10029"))
    return infocode in quota_codes
|
||
|
||
|
||
def run_collection(graph_name: str, max_api_calls: int, sleep_s: float) -> dict[str, Any]:
    """Process pending tasks of ``graph_name`` until none is left or the API budget is spent.

    Each task is claimed (``FOR UPDATE SKIP LOCKED``), marked ``running`` and
    paged from its stored ``next_page`` up to ``MAX_PAGE``. Every page is
    written to PostgreSQL and FalkorDB and the task's progress is committed
    page by page, so an interrupted task resumes where it stopped.

    Final task states:
      * ``done``: the last fetched page was not full.
      * ``saturated`` / ``saturated_max_res``: all pages were full and the
        cumulative fetched count reached ``SATURATE_COUNT``; child cells were
        created (or could not be, e.g. at ``MAX_RESOLUTION``).
      * ``pending``: the API budget ran out mid-task, or a failure will be retried.
      * ``error``: the third failed attempt.
      * ``quota_limited``: a quota/rate-limit code was returned; the run stops.

    Fixes over the previous version:
      * An API failure or a budget stop no longer falls through to the
        finalisation step, which used to overwrite the recorded failure and
        could mark a task ``done`` without a single page fetched.
      * Saturation is judged on the cumulative fetched count, so a task
        resumed from a later page is still split.
      * The transaction is rolled back before the failure is recorded, so a
        failed SQL statement cannot make the bookkeeping update fail too.

    Args:
        graph_name: graph whose tasks are processed.
        max_api_calls: budget of search requests; ``<= 0`` means unlimited.
        sleep_s: pause between pages and between tasks, in seconds.

    Returns:
        Totals (``tasks``, ``pois``, ``inserted``, ``duplicates``,
        ``split_children``) plus ``api_calls`` and ``stopped``
        (``"quota_limited"`` or ``"budget_or_done"``).
    """
    ensure_tables()
    api_calls = 0
    totals = {"tasks": 0, "pois": 0, "inserted": 0, "duplicates": 0, "split_children": 0}
    unlimited = max_api_calls <= 0

    with conn() as c:
        while unlimited or api_calls < max_api_calls:
            # --- claim the next pending task ---------------------------------
            with c.cursor() as cur:
                cur.execute(
                    f"""SELECT * FROM {settings.db_schema}.amap_spatial_collect_tasks
                        WHERE graph_name=%s AND status='pending'
                        ORDER BY resolution, city_adcode, district_adcode, cell_id, priority, id
                        LIMIT 1 FOR UPDATE SKIP LOCKED""",
                    (graph_name,),
                )
                task = cur.fetchone()
                if not task:
                    break
                update_task(cur, task["id"], status="running", started_at=datetime.now(timezone.utc))
            c.commit()

            attempts = int(task["attempt_count"]) + 1
            task_total = 0
            task_inserted = 0
            task_dup = 0
            task_pages = 0
            last_raw = 0
            created = 0
            api_failed = False
            budget_hit = False
            try:
                # --- page through the cell ------------------------------------
                for page in range(int(task["next_page"]), MAX_PAGE + 1):
                    if not unlimited and api_calls >= max_api_calls:
                        budget_hit = True
                        break
                    rows, payload = search_cell(task, page)
                    api_calls += 1
                    infocode = clean(payload.get("infocode"))

                    if payload.get("status") != "1":
                        api_failed = True
                        if is_quota_error(infocode):
                            # NOTE(review): nothing in this file moves a
                            # quota_limited task back to pending; confirm that
                            # this is handled elsewhere.
                            failure_status = "quota_limited"
                        elif attempts >= 3:
                            failure_status = "error"
                        else:
                            failure_status = "pending"
                        with c.cursor() as cur:
                            update_task(
                                cur,
                                task["id"],
                                status=failure_status,
                                attempt_count=attempts,
                                last_error=mask_secret(clean(payload.get("info"))),
                                last_infocode=infocode,
                            )
                        c.commit()
                        if failure_status == "quota_limited":
                            return {**totals, "api_calls": api_calls, "stopped": "quota_limited"}
                        break

                    last_raw = len(payload.get("pois") or [])
                    inserted, dup = upsert_pg_pois(c, rows)
                    upsert_graph_places(graph_name, rows)
                    task_total += last_raw
                    task_inserted += inserted
                    task_dup += dup
                    task_pages += 1
                    with c.cursor() as cur:
                        update_task(
                            cur,
                            task["id"],
                            next_page=page + 1,
                            pages_consumed=int(task["pages_consumed"]) + task_pages,
                            fetched_count=int(task["fetched_count"]) + task_total,
                            inserted_count=int(task["inserted_count"]) + task_inserted,
                            duplicate_count=int(task["duplicate_count"]) + task_dup,
                        )
                    c.commit()
                    if last_raw < PAGE_SIZE:
                        break
                    time.sleep(sleep_s)

                # --- decide the final state -----------------------------------
                # Skipped after an API failure: that status is already stored.
                if not api_failed:
                    fetched_so_far = int(task["fetched_count"]) + task_total
                    if budget_hit:
                        # Out of budget mid-task: resume from next_page later.
                        final_status = "pending"
                    elif task_pages and last_raw < PAGE_SIZE:
                        final_status = "done"
                    elif fetched_so_far >= SATURATE_COUNT:
                        # Every page up to MAX_PAGE was full: the cell holds
                        # more than one scan can return, so subdivide it.
                        created = split_task(c, task)
                        final_status = "saturated" if created else "saturated_max_res"
                    else:
                        final_status = "done"
                    with c.cursor() as cur:
                        update_task(
                            cur,
                            task["id"],
                            status=final_status,
                            finished_at=datetime.now(timezone.utc) if final_status != "pending" else None,
                        )
                    c.commit()

                totals["tasks"] += 1
                totals["pois"] += task_total
                totals["inserted"] += task_inserted
                totals["duplicates"] += task_dup
                totals["split_children"] += created
            except Exception as exc:
                # Drop whatever the failed page left in the open transaction;
                # pages committed earlier are kept.
                c.rollback()
                with c.cursor() as cur:
                    update_task(
                        cur,
                        task["id"],
                        status="pending" if attempts < 3 else "error",
                        attempt_count=attempts,
                        last_error=mask_secret(str(exc))[:500],
                    )
                c.commit()
            time.sleep(sleep_s)
    return {**totals, "api_calls": api_calls, "stopped": "budget_or_done"}
|
||
|
||
|
||
def status(graph_name: str) -> dict[str, Any]:
    """Summarise collection progress for ``graph_name``.

    Returns:
        A dict with ``graph`` plus three row lists: ``tasks`` (count, fetched
        and inserted totals per status and resolution), ``types`` (POI count
        per place type and label) and ``districts`` (POI count per city and
        district).
    """
    ensure_tables()
    schema = settings.db_schema
    tasks_sql = f"""SELECT status, resolution, COUNT(*) cnt,
                           COALESCE(SUM(fetched_count),0) fetched,
                           COALESCE(SUM(inserted_count),0) inserted
                    FROM {schema}.amap_spatial_collect_tasks
                    WHERE graph_name=%s
                    GROUP BY status, resolution
                    ORDER BY resolution, status"""
    types_sql = f"""SELECT place_type, type_label, COUNT(*) cnt
                    FROM {schema}.amap_spatial_pois
                    WHERE graph_name=%s
                    GROUP BY place_type, type_label
                    ORDER BY cnt DESC"""
    districts_sql = f"""SELECT city, district, COUNT(*) cnt
                        FROM {schema}.amap_spatial_pois
                        WHERE graph_name=%s
                        GROUP BY city, district
                        ORDER BY city, district"""

    summary: dict[str, Any] = {"graph": graph_name}
    with conn() as c, c.cursor() as cur:
        for key, sql in (("tasks", tasks_sql), ("types", types_sql), ("districts", districts_sql)):
            cur.execute(sql, (graph_name,))
            summary[key] = cur.fetchall()
    return summary
|
||
|
||
|
||
def write_report(graph_name: str, path: Path) -> None:
    """Write a Markdown progress report for ``graph_name`` to ``path`` (UTF-8).

    The report contains the task-state table, the POI type distribution, the
    district distribution and a fixed description of the spatial fields.
    Missing parent directories of ``path`` are created.
    """
    payload = status(graph_name)
    generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    lines = [
        "# 贵阳市 H3 空间采集运行报告",
        "",
        f"- 图谱:`{graph_name}`",
        f"- 生成时间:{generated_at}",
        "",
        "## 任务状态",
        "",
        "| 状态 | H3分辨率 | 任务数 | 已取原始POI | 新增入库 |",
        "|---|---:|---:|---:|---:|",
    ]
    lines += [
        f"| {row['status']} | {row['resolution']} | {row['cnt']} | {row['fetched']} | {row['inserted']} |"
        for row in payload["tasks"]
    ]

    lines += ["", "## 类型分布", "", "| 类型 | place_type | 数量 |", "|---|---|---:|"]
    lines += [
        f"| {row['type_label']} | {row['place_type']} | {row['cnt']} |"
        for row in payload["types"]
    ]

    lines += ["", "## 行政区分布", "", "| 城市 | 区县 | 数量 |", "|---|---|---:|"]
    lines += [
        f"| {row['city']} | {row['district']} | {row['cnt']} |"
        for row in payload["districts"]
    ]

    lines += [
        "",
        "## 已落入图谱的关键空间字段",
        "",
        "- `lng/lat`:高德原始经纬度。",
        "- `h3_r6` ~ `h3_r10`:多分辨率空间索引,后续 nearby 查询按半径选择。",
        "- `city/district/adcode`:行政区归属,用于市县统计与责任区过滤。",
        "- `source_cell_id/source_resolution`:记录本次从哪个 H3 采集格命中,便于断点续扫与追溯。",
        "- `first_seen_at/updated_at`:图谱节点采集时间与更新时间。",
    ]

    path.parent.mkdir(parents=True, exist_ok=True)
    report_text = "\n".join(lines) + "\n"
    path.write_text(report_text, encoding="utf-8")
|
||
|
||
|
||
def nearby(graph_name: str, lng: float, lat: float, radius_m: int, limit: int) -> list[dict[str, Any]]:
    """Return up to ``limit`` stored POIs within ``radius_m`` metres of a point, nearest first.

    Candidates are pre-selected through the H3 index column that matches the
    radius (r9 up to 1000 m, r8 up to 3000 m, r7 beyond) and then filtered by
    exact haversine distance.

    Fixes over the previous version:
      * The ring count ``k`` is derived from the worst-case reach of a hexagon
        disk. Along its flat sides a k-ring disk only reaches about
        ``(1.5*k + 0.5) * edge`` from the centre cell's centre, and the query
        point may sit up to one edge away from that centre. The former values
        (based on centre spacing ``sqrt(3) * edge``) left POIs near the radius
        limit out, increasingly so for large radii. ``k`` is never smaller
        than before.
      * Candidates are ordered by approximate distance before ``LIMIT 5000``,
        so a dense area cannot push the nearest POIs out of the candidate set.

    Returns:
        Row dicts (``name``, ``place_type``, ``type_label``, ``lng``, ``lat``,
        ``address``, ``district``, ``h3_cell``) plus ``distance_m``.
    """
    if limit <= 0:
        return []

    if radius_m <= 500:
        res, col, min_k = 9, "h3_r9", 2
    elif radius_m <= 1000:
        res, col, min_k = 9, "h3_r9", 4
    elif radius_m <= 3000:
        res, col, min_k = 8, "h3_r8", 4
    else:
        res, col, min_k = 7, "h3_r7", 2

    # The average edge length is an approximation of the local cell size; a
    # full edge of slack (instead of the minimal half edge) absorbs some of
    # that difference.
    edge = h3.average_hexagon_edge_length(res, unit="m")
    k = max(min_k, math.ceil((radius_m + edge) / (1.5 * edge)))
    cells = list(h3.grid_disk(h3.latlng_to_cell(lat, lng, res), k))

    # Equirectangular ordering key: longitude differences are scaled by
    # cos(latitude) so that both axes are comparable.
    lng_scale = math.cos(math.radians(lat))
    with conn() as c:
        with c.cursor() as cur:
            cur.execute(
                f"""SELECT name, place_type, type_label, lng, lat, address, district, {col} AS h3_cell
                    FROM {settings.db_schema}.amap_spatial_pois
                    WHERE graph_name=%s AND {col}=ANY(%s)
                    ORDER BY power((lng - %s) * %s, 2) + power(lat - %s, 2)
                    LIMIT 5000""",
                (graph_name, cells, lng, lng_scale, lat),
            )
            rows = cur.fetchall()

    out = []
    for r in rows:
        d = haversine(lng, lat, float(r["lng"]), float(r["lat"]))
        if d <= radius_m:
            out.append({**dict(r), "distance_m": round(d, 1)})
    return sorted(out, key=lambda x: x["distance_m"])[:limit]
|
||
|
||
|
||
def haversine(lng1: float, lat1: float, lng2: float, lat2: float) -> float:
    """Return the great-circle distance in metres between two lng/lat points (degrees).

    Uses a sphere of radius 6371008.8 m. The haversine term is clamped to
    ``[0, 1]`` before the square root: for (nearly) antipodal points rounding
    can push it marginally above 1, which would raise ``ValueError`` in
    ``math.asin``.
    """
    radius = 6371008.8
    d_lng = math.radians(lng2 - lng1)
    d_lat = math.radians(lat2 - lat1)
    a = (
        math.sin(d_lat / 2) ** 2
        + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(d_lng / 2) ** 2
    )
    a = min(1.0, max(0.0, a))
    return 2 * radius * math.asin(math.sqrt(a))
|
||
|
||
|
||
def main() -> None:
    """Command-line entry point with the sub-commands ``init``, ``run``, ``status`` and ``nearby``."""
    default_graph = "guiyang_spatial_v1"
    parser = argparse.ArgumentParser(description="AMap H3 spatial collector")
    commands = parser.add_subparsers(dest="cmd", required=True)

    init_cmd = commands.add_parser("init")
    init_cmd.add_argument("--scope", default="贵阳市")
    init_cmd.add_argument("--graph", default=default_graph)
    init_cmd.add_argument("--type-group", choices=sorted(TYPE_GROUPS), default="all")
    init_cmd.add_argument("--resolution", type=int, default=6)
    init_cmd.add_argument("--reset", action="store_true")
    init_cmd.add_argument("--reset-graph", action="store_true")

    run_cmd = commands.add_parser("run")
    run_cmd.add_argument("--graph", default=default_graph)
    run_cmd.add_argument("--max-api-calls", type=int, default=300)
    run_cmd.add_argument("--sleep", type=float, default=0.25)

    status_cmd = commands.add_parser("status")
    status_cmd.add_argument("--graph", default=default_graph)
    status_cmd.add_argument("--report", default="")

    nearby_cmd = commands.add_parser("nearby")
    nearby_cmd.add_argument("--graph", default=default_graph)
    nearby_cmd.add_argument("--lng", type=float, required=True)
    nearby_cmd.add_argument("--lat", type=float, required=True)
    nearby_cmd.add_argument("--radius", type=int, default=1000)
    nearby_cmd.add_argument("--limit", type=int, default=20)

    args = parser.parse_args()
    command = args.cmd

    if command == "init":
        seed_scope(
            scope_keyword=args.scope,
            graph_name=args.graph,
            type_group=args.type_group,
            root_resolution=args.resolution,
            reset=args.reset,
            reset_graph=args.reset_graph,
        )
        return

    if command == "run":
        result = run_collection(args.graph, args.max_api_calls, args.sleep)
        print(json.dumps(result, ensure_ascii=False, indent=2))
        return

    if command == "status":
        payload = status(args.graph)
        # default=str stringifies values json cannot encode natively.
        print(json.dumps(payload, ensure_ascii=False, indent=2, default=str))
        if args.report:
            write_report(args.graph, Path(args.report))
            print(f"report written: {args.report}")
        return

    if command == "nearby":
        hits = nearby(args.graph, args.lng, args.lat, args.radius, args.limit)
        print(json.dumps(hits, ensure_ascii=False, indent=2))
|
||
|
||
|
||
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|