Files
bxh/scripts/watch_amap_spatial_collect.py

188 lines
6.1 KiB
Python

#!/usr/bin/env python3
"""Watch and resume the AMap spatial collector.
The collector itself is resumable through the task table. This watchdog keeps a
single collector alive until either all pending tasks are consumed or AMap quota
limits stop the run.
"""
from __future__ import annotations
import argparse
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
import psycopg
# Directory one level above the folder that contains this script. It is put at
# the front of sys.path (once) before the project import that follows —
# presumably so the ``app`` package resolves when the script is run directly.
ROOT = Path(__file__).resolve().parents[1]
if sys.path.count(str(ROOT)) == 0:
    sys.path.insert(0, str(ROOT))
from app.config import settings # noqa: E402
def ts() -> str:
    """Return the current time in the local timezone as ``YYYY-MM-DD HH:MM:SS``."""
    utc_now = datetime.now(timezone.utc)
    # astimezone() without an argument converts to the system's local timezone.
    local_now = utc_now.astimezone()
    return f"{local_now:%Y-%m-%d %H:%M:%S}"
def _is_collector_run(argv: list[str], graph_name: str) -> bool:
    """Return True if *argv* is an ``amap_spatial_collect.py run`` command for *graph_name*."""
    adjacent = list(zip(argv, argv[1:]))
    # The script token may carry a directory prefix; compare its basename only,
    # which also rules out ``watch_amap_spatial_collect.py`` (this watchdog).
    runs_collector = any(
        script.rpartition("/")[2] == "amap_spatial_collect.py" and sub == "run"
        for script, sub in adjacent
    )
    if not runs_collector:
        return False
    if f"--graph={graph_name}" in argv:
        return True
    return any(flag == "--graph" and value == graph_name for flag, value in adjacent)


def collector_pids(graph_name: str, process_listing: str | None = None) -> list[int]:
    """Return the PIDs of running collector processes for *graph_name*.

    A process counts as a collector when its command line contains the script
    ``amap_spatial_collect.py`` immediately followed by the ``run`` sub-command
    and selects the graph via ``--graph NAME`` or ``--graph=NAME``.

    The graph name is compared as a whole argument, so a collector for
    ``foo_v10`` is not mistaken for one working on ``foo_v1`` (a plain
    substring test would match both).

    Args:
        graph_name: Graph whose collectors should be found. Names containing
            whitespace cannot be matched because the process listing joins
            arguments with spaces.
        process_listing: Optional pre-captured listing with one process per
            line in the form ``"<pid> <command line>"``. When omitted, the
            listing is obtained from ``ps``.

    Returns:
        PIDs in listing order; an empty list when nothing matches.
    """
    if process_listing is None:
        # ``ps -A -o pid= -o args=`` is POSIX and prints the full command line
        # on both Linux and BSD/macOS. ``pgrep -fl`` does so only on BSD; on
        # Linux procps-ng ``-l`` prints just the process name, which would hide
        # every collector from the filter below. ``-ww`` lifts the width limit
        # so long command lines are not truncated.
        proc = subprocess.run(
            ["ps", "-A", "-ww", "-o", "pid=", "-o", "args="],
            check=False,
            capture_output=True,
            text=True,
            errors="replace",  # other processes' arguments may not be valid text
        )
        process_listing = proc.stdout

    pids: list[int] = []
    for line in process_listing.splitlines():
        tokens = line.split()
        if len(tokens) < 2:
            continue
        try:
            pid = int(tokens[0])
        except ValueError:
            # Not a "<pid> <command>" line; ignore it.
            continue
        if _is_collector_run(tokens[1:], graph_name):
            pids.append(pid)
    return pids
def stats(graph_name: str) -> dict[str, object]:
    """Read collection progress counters for *graph_name* from the database.

    Opens a new connection using ``settings.database_url`` and queries tables
    in the schema named by ``settings.db_schema``.

    Args:
        graph_name: Value matched against the ``graph_name`` column.

    Returns:
        A dict with three entries:

        * ``"total_pois"`` - row count of ``amap_spatial_pois`` for the graph.
        * ``"status_counts"`` - mapping of task status to row count in
          ``amap_spatial_collect_tasks``.
        * ``"top_types"`` - up to ten ``(type_label, count)`` pairs, most
          frequent first.
    """
    schema = settings.db_schema
    params = (graph_name,)

    poi_total_sql = f"SELECT count(*) FROM {schema}.amap_spatial_pois WHERE graph_name=%s"
    task_status_sql = f"""SELECT status, count(*)
FROM {schema}.amap_spatial_collect_tasks
WHERE graph_name=%s
GROUP BY status"""
    top_types_sql = f"""SELECT type_label, count(*)
FROM {schema}.amap_spatial_pois
WHERE graph_name=%s
GROUP BY type_label
ORDER BY count(*) DESC
LIMIT 10"""

    with psycopg.connect(settings.database_url) as conn, conn.cursor() as cur:
        cur.execute(poi_total_sql, params)
        first_row = cur.fetchone()
        poi_total = int(first_row[0])

        cur.execute(task_status_sql, params)
        per_status: dict[str, int] = {}
        for status, n_tasks in cur.fetchall():
            per_status[str(status)] = int(n_tasks)

        cur.execute(top_types_sql, params)
        ranked_types: list[tuple[str, int]] = []
        for label, n_pois in cur.fetchall():
            ranked_types.append((str(label), int(n_pois)))

    return {
        "total_pois": poi_total,
        "status_counts": per_status,
        "top_types": ranked_types,
    }
def reset_stale_running(graph_name: str) -> int:
    """Put long-running tasks of *graph_name* back into the ``pending`` state.

    A task is requeued when its status is ``running`` and its ``updated_at``
    is more than 20 minutes in the past; ``updated_at`` is set to ``now()``
    on every requeued row. The change is committed before returning.

    Args:
        graph_name: Value matched against the ``graph_name`` column.

    Returns:
        The cursor's reported row count for the UPDATE, or 0 when that count
        is falsy.
    """
    schema = settings.db_schema
    requeue_sql = f"""UPDATE {schema}.amap_spatial_collect_tasks
SET status='pending', updated_at=now()
WHERE graph_name=%s
AND status='running'
AND updated_at < now() - interval '20 minutes'"""

    with psycopg.connect(settings.database_url) as conn:
        with conn.cursor() as cur:
            cur.execute(requeue_sql, (graph_name,))
            affected = cur.rowcount
        conn.commit()

    if not affected:
        return 0
    return int(affected)
def start_collector(graph_name: str, sleep_s: float, log_dir: Path) -> tuple[int, Path]:
    """Launch a detached collector process and return its PID and log path.

    The collector is started as
    ``<python> -u scripts/amap_spatial_collect.py run --graph <graph_name>
    --max-api-calls 0 --sleep <sleep_s>`` with ``ROOT`` as working directory.
    Its stdout and stderr are appended to a new timestamped log file inside
    *log_dir*, which is created if missing.

    Args:
        graph_name: Passed to the collector's ``--graph`` option.
        sleep_s: Passed to the collector's ``--sleep`` option.
        log_dir: Directory that receives the collector log file.

    Returns:
        ``(pid, log_path)`` of the spawned process.
    """
    log_dir.mkdir(parents=True, exist_ok=True)
    log_path = log_dir / f"guiyang_spatial_collect_auto_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
    cmd = [
        # Use the interpreter that runs this watchdog so the collector sees the
        # same environment (virtualenv, installed packages). A bare "python3"
        # is resolved through PATH and may be a different interpreter. Fall
        # back to it only when the interpreter path is unknown.
        sys.executable or "python3",
        "-u",
        "scripts/amap_spatial_collect.py",
        "run",
        "--graph",
        graph_name,
        "--max-api-calls",
        "0",
        "--sleep",
        str(sleep_s),
    ]
    # The child inherits the log file descriptor, so closing our handle when
    # the ``with`` block ends does not interrupt its logging.
    with log_path.open("ab", buffering=0) as f:
        proc = subprocess.Popen(
            cmd,
            cwd=ROOT,
            stdin=subprocess.DEVNULL,
            stdout=f,
            stderr=subprocess.STDOUT,
            # Run the child in its own session so it is not tied to the
            # watchdog's controlling terminal / process group.
            start_new_session=True,
        )
    return proc.pid, log_path
def append_report_line(report: Path, line: str) -> None:
    """Append *line* plus a trailing newline to the UTF-8 text file *report*.

    Missing parent directories of *report* are created first.
    """
    parent_dir = report.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    with report.open("a", encoding="utf-8") as handle:
        handle.write(f"{line}\n")
def main() -> None:
    """Run the watchdog loop for one graph.

    Command-line options:
        --graph     Graph name to supervise (default ``guiyang_spatial_v1``).
        --interval  Seconds between checks; values below 30 are raised to 30.
        --sleep     Value forwarded to the collector's ``--sleep`` option.
        --once      Perform a single check and exit.
        --log-dir   Directory for collector log files.
        --report    File that receives one status line per check.

    Each pass looks for a running collector. When none is found, stale
    ``running`` tasks are requeued and a new collector is started, provided
    there are pending tasks and no task is marked ``quota_limited``. Every
    pass appends one line to the report file.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--graph", default="guiyang_spatial_v1")
    parser.add_argument("--interval", type=int, default=180)
    parser.add_argument("--sleep", type=float, default=0.25)
    parser.add_argument("--once", action="store_true")
    parser.add_argument("--log-dir", default=str(ROOT / "docs" / "reports"))
    parser.add_argument("--report", default=str(ROOT / "docs" / "reports" / "guiyang_spatial_watchdog.log"))
    args = parser.parse_args()
    log_dir = Path(args.log_dir)
    report = Path(args.report)
    append_report_line(report, f"[{ts()}] watchdog started graph={args.graph} interval={args.interval}s")
    while True:
        pids = collector_pids(args.graph)
        # Requeue stale tasks BEFORE reading the counters. Otherwise tasks that
        # were just moved from running back to pending are missing from
        # ``pending`` and the collector is not restarted on this pass (or at
        # all when --once is given).
        stale = 0 if pids else reset_stale_running(args.graph)
        s = stats(args.graph)
        status_counts = s["status_counts"]
        pending = int(status_counts.get("pending", 0))
        quota_limited = int(status_counts.get("quota_limited", 0))
        if pids:
            append_report_line(
                report,
                f"[{ts()}] collector running pids={pids} total_pois={s['total_pois']} "
                f"pending={pending} status={status_counts} top_types={s['top_types']}",
            )
        elif pending > 0 and quota_limited == 0:
            pid, log_path = start_collector(args.graph, args.sleep, log_dir)
            append_report_line(
                report,
                f"[{ts()}] restarted collector pid={pid} stale_running_reset={stale} "
                f"pending={pending} total_pois={s['total_pois']} log={log_path}",
            )
        else:
            # Nothing left to do, or a quota-limited task blocks restarting.
            append_report_line(
                report,
                f"[{ts()}] collector idle pending={pending} quota_limited={quota_limited} "
                f"total_pois={s['total_pois']} status={status_counts}",
            )
        if args.once:
            break
        # Enforce a 30-second floor between checks.
        time.sleep(max(30, args.interval))


if __name__ == "__main__":
    main()