Files
bxh/docs/kg-redesign/spatial_grid_collect_and_query_design.md

22 KiB
Raw Permalink Blame History

贵州省 POI 空间网格采集与查询联合设计

范围:高德 POI 全省采集 + KG nearby 查询共用一套 H3 网格的完整方案 关联:

  • 现有空间 KG 总体设计 docs/kg-redesign/spatial_kg_design.md
  • 召回参数实测 docs/reports/spatial_h3_resolution_sweep_result.md
  • 高德采集器代码 app/agents/gaode_connector.py

1. 目标与原则

1.1 目标

  1. 全省覆盖:贵州省 176,167 km² 内所有公开 POI 都能被有计划地采集。
  2. 不丢数据:高德接口的截断行为可被识别,触发自适应下钻补采。
  3. 查询高效nearby 检索500m / 1km / 3km保持 sub-ms 级召回。
  4. 采集与查询零耦合:同一份 POI 表 / 节点既服务采集任务调度,也服务实时 nearby。

1.2 设计原则

  • 同源 H3:采集网格和查询网格使用同一套 H3 cell仅分辨率不同。H3 各分辨率严格嵌套1 个 r6 = 7 r7 = 49 r8 = 343 r9天然不冲突。
  • 多档预算POI 落盘时一次性算齐 h3_r6 ~ h3_r10 五档索引,查询期零计算。
  • 自适应密度:默认 r6 起手,单 cell × typecode 触发硬上限后下钻到 r7 / r8 / r9。
  • 形状分工:采集用 hex 多边形顶点串调高德;查询用 hex k-ring 召回再做圆形精确过滤。
  • 单点先做对:先把"单 cell × 单 typecode"的采集做到位(识别截断、正确去重),再讨论全省并发和调度优化。

2. 高德接口关键约束(官方)

以下数据来自高德开放平台 Web 服务 API 官方文档实查,是本方案所有阈值和上限的依据。

约束项 取值 来源
单次 polygon 查询总条数上限 200 条page_size 25 × max page 8 Web 服务 API 搜索 POI 文档
offset / page_size 最大值 25 同上
count 字段含义 本次返回条数,不是 polygon 内全量估计 同上
polygon 顶点格式 lng,lat 用逗号,顶点对用 | 分隔,小数 ≤ 6 位 同上
polygon 非矩形 首尾顶点必须相同(闭合) 同上
types 多 typecode | 分隔,但共享 200 条配额 同上
个人开发者 key 配额 500 次 / 日 控制台说明
企业开发者 key 配额 控制台查 + 工单调整 同上
v3 vs v5 v3 稳定通用v5 (/v5/place/polygon) 在企业试用阶段,字段更全(用 show_fields 控制) 文档与发布说明

两个最容易踩的坑

  1. 200 条是接口硬天花板,与 typecode 或 extensions 无关。多 typecode 合并查询只会让密度高的类型挤掉低密度类型,不要合并
  2. count 字段不能用于"是否被截断"的事前预判,必须用"翻完页后的实际累计数 + 最后一页是否仍为满页"作为事后兜底判据。

3. 整体架构

                        贵州省 GeoJSON
                              |
                              |  h3.polygon_to_cells(geojson, res=6)
                              v
                       ~4,900 个 r6 cell
                              |
                              v
                  collect_grid 任务表
                  (cell_id × typecode 行级粒度)
                              |
            +-----------------+-----------------+
            |                                   |
            v                                   |
     [采集侧]                                   |
     worker:                                    |
       hex_to_amap_polygon(cell) -> 高德 API    |
       拉满 8 页或 < 25 条提前结束              |
       fetched >= 180 且末页满 -> split_cell    |
                                                |
                              v                 |
                                                |
                  统一 places 表 / 节点         |
                  { gaode_poi_id, lng, lat,     |
                    h3_r6..h3_r10, typecode }   |
                              |                 |
                              v                 |
            +-----------------+-----------------+
                              |
                              v
                       [查询侧]
                  nearby_pois(lng, lat, radius):
                    500m / 1km -> h3_r9 + k-ring
                    3km        -> h3_r8 + k-ring
                    > 3km      -> h3_r7 + 动态 k
                  WHERE h3_rN IN (cells)
                  再做 haversine 圆形精确过滤

三层清晰分工:

  • 采集侧:以 cell × typecode 为任务粒度,自适应下钻,向 places 表写入。
  • 存储层:唯一一张 places 表 / 一组 Place 节点,承担去重、索引、多分辨率属性。
  • 查询侧:按用户半径选择分辨率列,索引召回 + 精确过滤。

采集和查询不直接互相调用,所有交互都经过 places 表


4. H3 分辨率方案

4.1 分辨率对照H3 官方统计)

res 平均边长 平均面积 贵州省总 cell 数
5 8.54 km 252 km² ~700
6 3.23 km 36.13 km² ~4,900
7 1.22 km 5.16 km² ~34,000
8 461 m 0.737 km² ~240,000
9 174 m 0.105 km² ~1,670,000
10 65.9 m 0.0151 km² ~11,700,000

sweep 报告里的"边长"是 H3 库 average_hexagon_edge_length 返回的"平均到顶点的距离",与本表"平均边长"略有差异,量级一致。

4.2 起手分辨率论证

按贵阳老城实测密度反推每 km² 单 typecode POI 数 ≈ 120

分辨率 老城单格预估 是否爆 200 上限 山区单格预估 是否浪费
r6 ~4,320 必爆 ~36
r7 ~620 必爆 ~5 略浪费
r8 ~89 安全 ~1 严重浪费
r9 ~13 安全 <1 大量空格

结论:

  • 城区合理分辨率是 r8,老城热点商圈可能要 r9
  • 山区/农村合理分辨率是 r6 或 r7
  • 单一分辨率全省扫不可行,必须从粗到细自适应
  • r6 作为起手:山区一次拉完,城区一两次下钻就能解决,初始任务数仅 4,900 × 4 = 19,600调度可控。

4.3 下钻终点

下钻终止于 r9

  • r9 单格 ~0.1 km² × 老城密度 120 ≈ 13 条/typecode绝无溢出风险。
  • r10 太碎(贵州全省 1100 万格),下钻管理成本远大于收益。
  • r9 与查询侧主索引一致,下钻产生的 cell 直接是 nearby 召回的天然单位。

4.4 多档索引存哪些

POI 落盘时一次性算齐 5 档h3_r6, h3_r7, h3_r8, h3_r9, h3_r10

  • r6 / r7:服务大半径(> 3km查询和采集任务反查。
  • r8:服务 3km 查询sweep 实测最优)。
  • r9:服务 500m / 1km 查询sweep 实测最优)。
  • r10:预留给"高密度商圈临时升精度"场景。
  • 不存 r11+sweep 已验证 r11 在 3km 时 k=31 仅覆盖 36.5%,无生产价值。

5. 采集侧设计

5.1 任务表 collect_grid

CREATE TABLE collect_grid (
  cell_id          TEXT,         -- H3 index混合分辨率字符串形式
  resolution       SMALLINT,     -- 6 / 7 / 8 / 9
  typecode         TEXT,         -- 110000 景点 / 050000 美食 / 100000 酒店 / 060000 商场
  parent_cell_id   TEXT,         -- 下钻血缘NULL 表示根 r6
  status           TEXT,         -- pending | running | done | saturated | error
  fetched_count    INT,
  pages_consumed   INT,
  attempt_count    INT DEFAULT 0,
  last_attempt_at  TIMESTAMPTZ,
  last_error       TEXT,
  PRIMARY KEY (cell_id, typecode)
);

CREATE INDEX ON collect_grid (status, resolution);
CREATE INDEX ON collect_grid (parent_cell_id);

字段说明:

  • cell_idtypecode 组成联合主键,同一个 cell 内每个 typecode 独立调度
  • parent_cell_id 用于下钻血缘追溯debug 和重采)。
  • status 状态机见 5.5。

5.2 初始化

import h3
import json

def init_province_grid(geojson_path: str) -> int:
    geo = json.load(open(geojson_path))
    # h3 v4 接受 geojson 风格的 polygon
    cells = h3.polygon_to_cells(geo, res=6)
    typecodes = ["110000", "050000", "100000", "060000"]
    rows = [
        (cell, 6, tc, None, "pending", 0, 0, 0, None, None)
        for cell in cells for tc in typecodes
    ]
    bulk_insert("collect_grid", rows)
    return len(rows)  # ~19,600

省界严格裁剪:h3.polygon_to_cells 默认只返回中心点落在 polygon 内的 cell。跨省界的 POI 不属于贵州,主动丢弃。

5.3 Polygon 参数生成

高德 polygon 参数要求 lng,lat 顺序、| 分隔、6 位小数、非矩形闭合。

def hex_to_amap_polygon(cell_id: str) -> str:
    boundary = h3.cell_to_boundary(cell_id)   # [(lat, lng), ...] × 6
    pts = [(lng, lat) for lat, lng in boundary]
    pts.append(pts[0])                        # 闭合:首尾相同
    return "|".join(f"{lng:.6f},{lat:.6f}" for lng, lat in pts)

5.4 单 cell × 单 typecode 采集循环

HARD_CAP = 200          # 高德 polygon 接口硬上限
SATURATE = 180          # 触发下钻的阈值(留 10% 缓冲)
PAGE_SIZE = 25
MAX_PAGE = HARD_CAP // PAGE_SIZE   # = 8

def collect_cell(cell_id: str, typecode: str) -> str:
    """返回: done | saturatedPOI 已落盘"""
    all_pois: list[dict] = []
    last_raw = 0
    polygon = hex_to_amap_polygon(cell_id)
    for page in range(1, MAX_PAGE + 1):
        pois, raw = search_polygon_v3(
            polygon=polygon, typecode=typecode,
            offset=PAGE_SIZE, page=page,
        )
        all_pois.extend(pois)
        last_raw = raw
        if raw < PAGE_SIZE:        # 高德判定已拉尽
            break

    save_pois(all_pois)            # 见 7.1

    # 唯一可靠的"截断"信号:累计接近硬天花板 + 最后一页仍是满页
    if len(all_pois) >= SATURATE and last_raw >= PAGE_SIZE:
        split_cell(cell_id, typecode)
        return "saturated"
    return "done"

5.5 状态机

                    +-----------+
                    |  pending  |
                    +-----+-----+
                          |
                  (worker pick)
                          v
                    +-----------+
                    |  running  |
                    +-----+-----+
              +-----------+-----------+----------+
              |                       |          |
   fetched < SATURATE         fetched >= SATURATE  异常
              |                  且末页满          |
              v                       v            v
        +----------+          +-------------+  +---------+
        |   done   |          |  saturated  |  |  error  |
        +----------+          +------+------+  +----+----+
                                     |              |
                              split_cell    attempt < 3 -> pending
                                     |              |
                                     v              v
                              7 个子格 pending   终止

5.6 下钻

def split_cell(parent_cell: str, typecode: str) -> None:
    res = h3.get_resolution(parent_cell)
    if res >= 9:
        return                              # 已是下钻终点,不再细分
    children = h3.cell_to_children(parent_cell, res + 1)
    for child in children:                  # 1 -> 7
        upsert("collect_grid",
               cell_id=child, resolution=res + 1, typecode=typecode,
               parent_cell_id=parent_cell, status="pending",
               fetched_count=0, pages_consumed=0, attempt_count=0)

关键纪律

  • 每个 typecode 独立判断、独立下钻。同一个 cell 的美食爆了不影响酒店。
  • 子格状态从 pending 重新开始,不复用父格状态。
  • 已触发过下钻的父行保留 saturated 状态作为审计记录,不删。

5.7 限流与重试

  • 启动 worker 时按 key 类型设置 QPS 上限(个人 key 建议 ≤ 2 QPS企业按工单配额
  • 任意 HTTP 异常或高德返回 status != "1"attempt_count += 1,回 pending3 次后终止为 error
  • 退避:每次重试间隔 2 ** attempt 秒。

6. 查询侧设计

完全沿用 docs/reports/spatial_h3_resolution_sweep_result.md 的实测推荐,无需任何参数修改

6.1 半径 → 分辨率 + k-ring 映射

半径 索引列 k-ring sweep 实测耗时 候选膨胀
≤ 500m h3_r9 2 0.30 ms 2.11×
≤ 1000m h3_r9 4 0.89 ms 1.71×
≤ 3000m h3_r8 4 3.58 ms 1.21×
> 3000m h3_r7 按需

6.2 查询函数

import math
import h3

def nearby_pois(lng: float, lat: float, radius_m: int,
                typecode: str | None = None) -> list[dict]:
    if radius_m <= 500:
        res, col, k = 9, "h3_r9", 2
    elif radius_m <= 1000:
        res, col, k = 9, "h3_r9", 4
    elif radius_m <= 3000:
        res, col, k = 8, "h3_r8", 4
    else:
        res, col = 7, "h3_r7"
        edge = h3.average_hexagon_edge_length(7, unit="m")
        k = max(2, math.ceil(radius_m / edge))

    center = h3.latlng_to_cell(lat, lng, res)
    cells = list(h3.grid_disk(center, k))

    candidates = query_db(f"""
        SELECT gaode_poi_id, name, typecode, lng, lat
        FROM places
        WHERE {col} = ANY(:cells)
          {"AND typecode = :typecode" if typecode else ""}
    """, cells=cells, typecode=typecode)

    # H3 是六边形不是圆,必须再做精确距离过滤
    return [
        {**p, "distance_m": d}
        for p in candidates
        if (d := haversine_m(lng, lat, p["lng"], p["lat"])) <= radius_m
    ]

关键纪律

  • 查询时不再算 h3(落盘已算齐),只做 WHERE h3_rN IN (cells)B-tree 索引秒命中。
  • 六边形 k-ring 召回 ≠ 圆形,最后一定要 haversine 圆形过滤。

7. 存储设计

7.1 POI 表

CREATE TABLE places (
  gaode_poi_id    TEXT PRIMARY KEY,           -- ★ 去重主键
  name            TEXT NOT NULL,
  typecode        TEXT NOT NULL,
  type            TEXT,                       -- 中文类型名
  lng             DOUBLE PRECISION NOT NULL,
  lat             DOUBLE PRECISION NOT NULL,
  h3_r6           TEXT NOT NULL,
  h3_r7           TEXT NOT NULL,
  h3_r8           TEXT NOT NULL,
  h3_r9           TEXT NOT NULL,
  h3_r10          TEXT NOT NULL,
  province        TEXT,
  city            TEXT,
  district        TEXT,
  business_area   TEXT,
  address         TEXT,
  tel             TEXT,
  rating          REAL,
  source_cell_id  TEXT,                       -- 最近一次采集来源 cell (debug)
  source_res      SMALLINT,
  fetched_at      TIMESTAMPTZ NOT NULL,
  updated_at      TIMESTAMPTZ NOT NULL
);

CREATE INDEX ON places (h3_r9);               -- 500m / 1km 主查询索引
CREATE INDEX ON places (h3_r8);               -- 3km 查询索引
CREATE INDEX ON places (h3_r7);               -- > 3km 预留
CREATE INDEX ON places (typecode);
CREATE INDEX ON places (city, district);

图谱版本FalkorDB / Neo4j把以上列作为节点属性索引等价

CREATE INDEX FOR (p:Place) ON (p.h3_r9);
CREATE INDEX FOR (p:Place) ON (p.h3_r8);
CREATE INDEX FOR (p:Place) ON (p.h3_r7);
CREATE INDEX FOR (p:Place) ON (p.typecode);

7.2 写入函数

def save_pois(pois: list[dict]) -> int:
    rows = []
    now = datetime.utcnow()
    for p in pois:
        lat = float(p["lat"]); lng = float(p["lng"])
        if not (103.0 <= lng <= 110.5 and 24.0 <= lat <= 29.8):
            continue                          # 落在贵州合理范围外
        rows.append({
            "gaode_poi_id": p["gaode_poi_id"],
            "name": p["name"], "typecode": p["typecode"], "type": p.get("type"),
            "lng": lng, "lat": lat,
            "h3_r6":  h3.latlng_to_cell(lat, lng, 6),
            "h3_r7":  h3.latlng_to_cell(lat, lng, 7),
            "h3_r8":  h3.latlng_to_cell(lat, lng, 8),
            "h3_r9":  h3.latlng_to_cell(lat, lng, 9),
            "h3_r10": h3.latlng_to_cell(lat, lng, 10),
            **{k: p.get(k) for k in [
                "province","city","district","business_area",
                "address","tel","rating"]},
            "fetched_at": now, "updated_at": now,
        })
    return upsert_on_conflict("places", rows, key="gaode_poi_id")

UPSERT 在 PG 用 ON CONFLICT (gaode_poi_id) DO UPDATE,保证多次采集重叠时只保留最新版本。


8. 联合不冲突的原理

H3 严格嵌套:

1 个 r6 cell  ==  7 个 r7 子 cell  ==  49 个 r8  ==  343 个 r9  ==  2,401 个 r10

因此一个 POI 在 r6 是 cell-A必然在 cell-A 的某个 r7 / r8 / r9 / r10 子格里。落盘时一次算齐所有分辨率后:

  • 采集侧:调度任务以 cell × typecode 为单位,下钻不会丢失 POI子格集合恰好等于父格
  • 查询侧按任意分辨率召回POI 的 h3_rN 列直接命中索引。
  • 去重:始终以 gaode_poi_id 为唯一主键,跨 cell 重叠(如果用矩形 bbox 而非 hex不会产生重复行。

形状差异由各侧自己处理,互不影响:

形状 处理方式
采集 hex 顶点串 直接调高德 polygon APIhex tiling 无重叠
查询召回 hex k-ring H3 库返回
查询精排 圆形 haversine 距离过滤

9. API 用量预估

按贵州 POI 分布估算(基于贵阳老城实测密度推断):

区域类型 估计 r6 数 平均最终 cell × 4 typecode 平均页数 API 调用
老城(贵阳/遵义/凯里) ~50 50 × 49 r8 × 4 ≈ 9,800 ~4 ~39,200
各市县城区 ~400 400 × 7 r7 × 4 ≈ 11,200 ~3 ~33,600
镇区 ~1,500 1,500 × 4 = 6,000 ~2 ~12,000
乡村山区 ~2,950 2,950 × 4 = 11,800 12 ~17,700
合计 ~4,900 ~38,800 cell-task ~102,500 次

按 key 类型折算:

Key 类型 估计日配额 首轮全省采集耗时
个人开发者 500 ~205 天(不可行)
企业(小) ~50,000 ~2 天
企业(标准) ~300,000 ~8 小时

结论:业务正式上线必须使用企业 key。个人 key 仅适合 5.4 节验证脚本。


10. 风险与对策

风险 影响 对策
高德 polygon count 字段不可信用于事前预测 浪费 quota 或误判 仅用"累计 ≥ 180 且末页满"作为唯一截断信号
单一 typecode 在小区域内极端密集(如商场区美食) 即使 r9 仍可能爆 200 下钻终点 r9 通常足够;极端情况手工标记 r10
高德接口 typecode 编码升级 / 子类型新增 漏类别 启动时调用 available_types,定期与高德官方对照表 diff
POI 跨 cell 漂移(坐标更新) 旧 cell 索引失效 增量采集时按 gaode_poi_id UPSERT重算 h3
高德 QPS / 日配额超限 限流或封禁 worker 启动配 token bucket监控错误码 10003 / 10014
个人 key 多账号轮转 封号 / 数据合规问题 不推荐;正式采集走企业 key
省界裁剪时跨省 POI 数据归属混乱 h3.polygon_to_cells 默认按中心点裁POI 落盘时再按 bbox 二次校验
v3 接口未来下线 重写迁移成本 接口层封装 search_h3_cell(cell, typecode),底层切换 v3/v5 透明

11. 落地清单

按顺序,每步独立可验证。

  1. scripts/build_province_grid.py:拉贵州 geojsonh3.polygon_to_cells(res=6),写入 collect_grid 共 ~19,600 行 pending。

    • 验证点cell 数量在 4,8005,000 之间;可在地图上可视化省界覆盖。
    • 零 API 调用。
  2. 校准小样本30 min< 100 次 API:选 1 个贵阳老城 r6 cell + 1 个山区 r6 cell × 4 typecode验证

    • 200 条上限是否真的触发
    • hex polygon 闭合格式是否被高德正确解析
    • 各 typecode 的实际密度,校准 SATURATE 阈值
  3. 改造 gaode_connector.py:新增 search_h3_cell(cell_id, typecode, page),内部用 hex_to_amap_polygon,保留现有 search_polygon(bbox) 兼容。

  4. scripts/collect_worker.py:实现 5.4 / 5.5 / 5.6 的 worker配 QPS 限流和重试退避。

  5. POI 写入适配save_pois 一次算齐 5 档 h3UPSERT 入库 / 图谱。

  6. 单市试点(贵阳)~200 个 r6 cell × 4 typecode端到端跑通采集 → 落盘 → 查询闭环。

  7. 全省扩展:按试点实测的 API 用量等比放大,确认企业 key 配额够。

  8. nearby 查询替换:把现有 h3_neighbor_cells 调用换成 6.2 的 nearby_poissweep 验证过的参数直接用。


12. 待确认项

正式开工前必须回答:

  1. 当前高德 key 是个人还是企业认证? 实际日配额 / QPS 是多少?(去控制台 → 流量分析 → 配额管理)
  2. 存储后端POI 主表落在 Postgres 还是 FalkorDB两者 schema 设计兼容,但索引语法和并发模型不同。
  3. 采集触发方式:一次性全省扫 + 增量更新,还是按需扫(用户区域热力驱动)?影响 collect_grid 的优先级字段设计。
  4. v3 还是 v5v5 字段更全但企业试用,是否申请?

13. 参考资料