#!/usr/bin/env python3 """Watch and resume the AMap spatial collector. The collector itself is resumable through the task table. This watchdog keeps a single collector alive until either all pending tasks are consumed or AMap quota limits stop the run. """ from __future__ import annotations import argparse import subprocess import sys import time from datetime import datetime, timezone from pathlib import Path import psycopg ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from app.config import settings # noqa: E402 def ts() -> str: return datetime.now(timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S") def collector_pids(graph_name: str) -> list[int]: proc = subprocess.run( ["pgrep", "-fl", "amap_spatial_collect.py"], check=False, capture_output=True, text=True, ) pids: list[int] = [] for line in proc.stdout.splitlines(): if " watch_amap_spatial_collect.py" in line: continue if "amap_spatial_collect.py run" not in line: continue if f"--graph {graph_name}" not in line and f"--graph={graph_name}" not in line: continue parts = line.split(maxsplit=1) try: pids.append(int(parts[0])) except (ValueError, IndexError): continue return pids def stats(graph_name: str) -> dict[str, object]: schema = settings.db_schema with psycopg.connect(settings.database_url) as c: with c.cursor() as cur: cur.execute( f"SELECT count(*) FROM {schema}.amap_spatial_pois WHERE graph_name=%s", (graph_name,), ) total_pois = int(cur.fetchone()[0]) cur.execute( f"""SELECT status, count(*) FROM {schema}.amap_spatial_collect_tasks WHERE graph_name=%s GROUP BY status""", (graph_name,), ) status_counts = {str(k): int(v) for k, v in cur.fetchall()} cur.execute( f"""SELECT type_label, count(*) FROM {schema}.amap_spatial_pois WHERE graph_name=%s GROUP BY type_label ORDER BY count(*) DESC LIMIT 10""", (graph_name,), ) top_types = [(str(k), int(v)) for k, v in cur.fetchall()] return { "total_pois": total_pois, "status_counts": status_counts, "top_types": top_types, } def reset_stale_running(graph_name: str) -> int: schema = settings.db_schema with psycopg.connect(settings.database_url) as c: with c.cursor() as cur: cur.execute( f"""UPDATE {schema}.amap_spatial_collect_tasks SET status='pending', updated_at=now() WHERE graph_name=%s AND status='running' AND updated_at < now() - interval '20 minutes'""", (graph_name,), ) count = cur.rowcount c.commit() return int(count or 0) def start_collector(graph_name: str, sleep_s: float, log_dir: Path) -> tuple[int, Path]: log_dir.mkdir(parents=True, exist_ok=True) log_path = log_dir / f"guiyang_spatial_collect_auto_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" cmd = [ "python3", "-u", "scripts/amap_spatial_collect.py", "run", "--graph", graph_name, "--max-api-calls", "0", "--sleep", str(sleep_s), ] with log_path.open("ab", buffering=0) as f: proc = subprocess.Popen( cmd, cwd=ROOT, stdin=subprocess.DEVNULL, stdout=f, stderr=subprocess.STDOUT, start_new_session=True, ) return proc.pid, log_path def append_report_line(report: Path, line: str) -> None: report.parent.mkdir(parents=True, exist_ok=True) with report.open("a", encoding="utf-8") as f: f.write(line + "\n") def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--graph", default="guiyang_spatial_v1") parser.add_argument("--interval", type=int, default=180) parser.add_argument("--sleep", type=float, default=0.25) parser.add_argument("--once", action="store_true") parser.add_argument("--log-dir", default=str(ROOT / "docs" / "reports")) parser.add_argument("--report", default=str(ROOT / "docs" / "reports" / "guiyang_spatial_watchdog.log")) args = parser.parse_args() log_dir = Path(args.log_dir) report = Path(args.report) append_report_line(report, f"[{ts()}] watchdog started graph={args.graph} interval={args.interval}s") while True: pids = collector_pids(args.graph) s = stats(args.graph) status_counts = s["status_counts"] pending = int(status_counts.get("pending", 0)) quota_limited = int(status_counts.get("quota_limited", 0)) if not pids: stale = reset_stale_running(args.graph) if pending > 0 and quota_limited == 0: pid, log_path = start_collector(args.graph, args.sleep, log_dir) pids = [pid] append_report_line( report, f"[{ts()}] restarted collector pid={pid} stale_running_reset={stale} " f"pending={pending} total_pois={s['total_pois']} log={log_path}", ) else: append_report_line( report, f"[{ts()}] collector idle pending={pending} quota_limited={quota_limited} " f"total_pois={s['total_pois']} status={status_counts}", ) else: append_report_line( report, f"[{ts()}] collector running pids={pids} total_pois={s['total_pois']} " f"pending={pending} status={status_counts} top_types={s['top_types']}", ) if args.once: break time.sleep(max(30, args.interval)) if __name__ == "__main__": main()