188 lines
6.1 KiB
Python
188 lines
6.1 KiB
Python
#!/usr/bin/env python3
|
|
"""Watch and resume the AMap spatial collector.
|
|
|
|
The collector itself is resumable through the task table. This watchdog keeps a
|
|
single collector alive until either all pending tasks are consumed or AMap quota
|
|
limits stop the run.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import psycopg
|
|
|
|
# Repository root: the directory one level above the one containing this script.
ROOT = Path(__file__).resolve().parent.parent

# Put the repository root at the front of sys.path so that the ``app`` package
# can be imported when this file is executed directly as a script.
if sys.path.count(str(ROOT)) == 0:
    sys.path.insert(0, str(ROOT))

from app.config import settings  # noqa: E402
|
|
|
|
|
|
def ts() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    utc_now = datetime.now(timezone.utc)
    # astimezone() without an argument converts to the system local timezone.
    local_now = utc_now.astimezone()
    return f"{local_now:%Y-%m-%d %H:%M:%S}"
|
|
|
|
|
|
def collector_pids(graph_name: str, process_listing: str | None = None) -> list[int]:
    """Return the PIDs of collector processes currently running for ``graph_name``.

    A listed process counts as a collector when its command line contains a
    token whose basename is exactly ``amap_spatial_collect.py`` immediately
    followed by the ``run`` sub-command, and it was started with
    ``--graph <graph_name>`` or ``--graph=<graph_name>``. Tokens are compared
    exactly, so a graph named ``foo_v1`` does not match a collector running
    ``foo_v10``, and the watchdog script itself is never reported.

    Args:
        graph_name: Graph identifier to look for. It is assumed to contain no
            whitespace, because the listing joins arguments with spaces.
        process_listing: Optional pre-captured listing with one
            ``<pid> <command line>`` entry per line. When ``None`` (the
            default) the listing is taken from
            ``pgrep -fl amap_spatial_collect.py``.

    Returns:
        Matching PIDs, in listing order.

    Raises:
        RuntimeError: If ``pgrep`` exits with a status other than 0 (matches
            found) or 1 (no match).
    """
    if process_listing is None:
        # NOTE(review): BSD/macOS pgrep prints the full argument list for
        # ``-fl``; procps-ng on Linux appears to need ``-a`` for that. Verify on
        # the deployment platform, or pass ``process_listing`` explicitly.
        proc = subprocess.run(
            ["pgrep", "-fl", "amap_spatial_collect.py"],
            check=False,
            capture_output=True,
            text=True,
        )
        # Exit status 1 only means "nothing matched". Any other non-zero status
        # is a real failure; treating it as "no collector" could make the
        # caller spawn a duplicate collector.
        if proc.returncode not in (0, 1):
            raise RuntimeError(
                f"pgrep failed with exit status {proc.returncode}: {proc.stderr.strip()}"
            )
        process_listing = proc.stdout

    collector_script = "amap_spatial_collect.py"
    watchdog_script = "watch_amap_spatial_collect.py"
    inline_flag = f"--graph={graph_name}"

    pids: list[int] = []
    for line in process_listing.splitlines():
        tokens = line.split()
        if len(tokens) < 2:
            continue
        try:
            pid = int(tokens[0])
        except ValueError:
            # Not a "<pid> <command line>" entry; ignore it.
            continue

        argv = tokens[1:]
        # Compare script names by basename so any directory prefix is accepted.
        scripts = [tok.rsplit("/", 1)[-1] for tok in argv]
        if watchdog_script in scripts:
            continue

        runs_collector = any(
            script == collector_script and following == "run"
            for script, following in zip(scripts, argv[1:])
        )
        if not runs_collector:
            continue

        targets_graph = inline_flag in argv or any(
            flag == "--graph" and value == graph_name
            for flag, value in zip(argv, argv[1:])
        )
        if targets_graph:
            pids.append(pid)
    return pids
|
|
|
|
|
|
def stats(graph_name: str) -> dict[str, object]:
    """Read collection progress counters for ``graph_name`` from the database.

    Three queries are run on one connection opened from
    ``settings.database_url``, against tables in ``settings.db_schema``.

    Returns:
        A dict with three keys:

        * ``"total_pois"`` - row count of ``amap_spatial_pois`` for the graph.
        * ``"status_counts"`` - mapping of task status to task count from
          ``amap_spatial_collect_tasks``.
        * ``"top_types"`` - up to ten ``(type_label, count)`` pairs from
          ``amap_spatial_pois``, most frequent first.
    """
    schema = settings.db_schema
    graph_params = (graph_name,)

    with psycopg.connect(settings.database_url) as conn, conn.cursor() as cursor:
        # Total number of POIs stored for this graph.
        cursor.execute(
            f"SELECT count(*) FROM {schema}.amap_spatial_pois WHERE graph_name=%s",
            graph_params,
        )
        count_row = cursor.fetchone()
        total_pois = int(count_row[0])

        # Number of collector tasks per status.
        cursor.execute(
            f"""SELECT status, count(*)
                FROM {schema}.amap_spatial_collect_tasks
                WHERE graph_name=%s
                GROUP BY status""",
            graph_params,
        )
        status_counts: dict[str, int] = {}
        for status, task_count in cursor.fetchall():
            status_counts[str(status)] = int(task_count)

        # The ten POI type labels with the most rows.
        cursor.execute(
            f"""SELECT type_label, count(*)
                FROM {schema}.amap_spatial_pois
                WHERE graph_name=%s
                GROUP BY type_label
                ORDER BY count(*) DESC
                LIMIT 10""",
            graph_params,
        )
        top_types: list[tuple[str, int]] = []
        for label, poi_count in cursor.fetchall():
            top_types.append((str(label), int(poi_count)))

    summary: dict[str, object] = {}
    summary["total_pois"] = total_pois
    summary["status_counts"] = status_counts
    summary["top_types"] = top_types
    return summary
|
|
|
|
|
|
def reset_stale_running(graph_name: str) -> int:
    """Re-queue ``running`` tasks of ``graph_name`` not updated for 20 minutes.

    Matching rows in ``amap_spatial_collect_tasks`` get ``status='pending'``
    and a fresh ``updated_at``; the change is committed before returning.

    Returns:
        The cursor's reported row count for the UPDATE, or 0 when that count
        is falsy.
    """
    schema = settings.db_schema
    requeue_sql = f"""UPDATE {schema}.amap_spatial_collect_tasks
                SET status='pending', updated_at=now()
                WHERE graph_name=%s
                  AND status='running'
                  AND updated_at < now() - interval '20 minutes'"""

    with psycopg.connect(settings.database_url) as conn:
        with conn.cursor() as cursor:
            cursor.execute(requeue_sql, (graph_name,))
            affected = cursor.rowcount
        conn.commit()

    if not affected:
        return 0
    return int(affected)
|
|
|
|
|
|
def start_collector(graph_name: str, sleep_s: float, log_dir: Path) -> tuple[int, Path]:
    """Launch a detached collector process for ``graph_name``.

    The collector is started as
    ``<python> -u scripts/amap_spatial_collect.py run --graph <graph_name>
    --max-api-calls 0 --sleep <sleep_s>`` with ``ROOT`` as working directory.
    Its stdout and stderr are appended to a new timestamped log file.

    Args:
        graph_name: Graph identifier passed to the collector via ``--graph``.
        sleep_s: Value passed to the collector via ``--sleep``.
        log_dir: Directory for the log file; created if it does not exist.

    Returns:
        ``(pid, log_path)`` of the spawned process and its log file.
    """
    log_dir.mkdir(parents=True, exist_ok=True)
    log_path = log_dir / f"guiyang_spatial_collect_auto_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
    cmd = [
        # Use the interpreter running this watchdog so the collector sees the
        # same environment (e.g. a virtualenv that is not on PATH). Fall back
        # to PATH lookup if the interpreter path is unavailable.
        sys.executable or "python3",
        "-u",  # unbuffered, so the log file is written as output is produced
        "scripts/amap_spatial_collect.py",
        "run",
        "--graph",
        graph_name,
        "--max-api-calls",
        "0",
        "--sleep",
        str(sleep_s),
    ]
    # The child inherits its own copy of the log descriptor, so closing the
    # parent's handle when the ``with`` block ends does not affect it.
    with log_path.open("ab", buffering=0) as f:
        proc = subprocess.Popen(
            cmd,
            cwd=ROOT,
            stdin=subprocess.DEVNULL,
            stdout=f,
            stderr=subprocess.STDOUT,
            # Run in a new session, detached from the watchdog's own session.
            start_new_session=True,
        )
    return proc.pid, log_path
|
|
|
|
|
|
def append_report_line(report: Path, line: str) -> None:
    """Append ``line`` plus a newline to the UTF-8 file at ``report``.

    Missing parent directories of ``report`` are created first.
    """
    parent_dir = report.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    with report.open("a", encoding="utf-8") as handle:
        handle.write(f"{line}\n")
|
|
|
|
|
|
def main() -> None:
    """Run the watchdog loop for the AMap spatial collector.

    Each iteration looks for a running collector and reads task statistics.
    When no collector is running, stale ``running`` tasks are re-queued and a
    new collector is started if tasks are pending and none is
    ``quota_limited``. One status line per iteration is appended to the report
    file. The loop runs until interrupted, or exactly once with ``--once``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--graph", default="guiyang_spatial_v1", help="graph name to watch")
    parser.add_argument(
        "--interval",
        type=int,
        default=180,
        help="seconds between checks (values below 30 are raised to 30)",
    )
    parser.add_argument("--sleep", type=float, default=0.25, help="value passed to the collector's --sleep")
    parser.add_argument("--once", action="store_true", help="run a single check and exit")
    parser.add_argument("--log-dir", default=str(ROOT / "docs" / "reports"), help="directory for collector logs")
    parser.add_argument(
        "--report",
        default=str(ROOT / "docs" / "reports" / "guiyang_spatial_watchdog.log"),
        help="file the watchdog appends its status lines to",
    )
    args = parser.parse_args()

    log_dir = Path(args.log_dir)
    report = Path(args.report)
    append_report_line(report, f"[{ts()}] watchdog started graph={args.graph} interval={args.interval}s")

    while True:
        pids = collector_pids(args.graph)
        s = stats(args.graph)
        status_counts = s["status_counts"]
        pending = int(status_counts.get("pending", 0))
        quota_limited = int(status_counts.get("quota_limited", 0))

        if not pids:
            stale = reset_stale_running(args.graph)
            if stale > 0:
                # The snapshot above was taken before the reset, so it does not
                # count the tasks just moved from 'running' to 'pending'.
                # Re-read it; otherwise a run whose only remaining work was
                # stale tasks is reported idle (and never resumed with --once).
                s = stats(args.graph)
                status_counts = s["status_counts"]
                pending = int(status_counts.get("pending", 0))
                quota_limited = int(status_counts.get("quota_limited", 0))

            if pending > 0 and quota_limited == 0:
                pid, log_path = start_collector(args.graph, args.sleep, log_dir)
                pids = [pid]
                append_report_line(
                    report,
                    f"[{ts()}] restarted collector pid={pid} stale_running_reset={stale} "
                    f"pending={pending} total_pois={s['total_pois']} log={log_path}",
                )
            else:
                # Nothing left to do, or at least one task is quota-limited.
                append_report_line(
                    report,
                    f"[{ts()}] collector idle pending={pending} quota_limited={quota_limited} "
                    f"total_pois={s['total_pois']} status={status_counts}",
                )
        else:
            append_report_line(
                report,
                f"[{ts()}] collector running pids={pids} total_pois={s['total_pois']} "
                f"pending={pending} status={status_counts} top_types={s['top_types']}",
            )

        if args.once:
            break
        # Enforce a 30-second floor on the polling interval.
        time.sleep(max(30, args.interval))
|
|
|
|
|
|
# Script entry point: start the watchdog only when executed directly,
# not when this module is imported.
if "__main__" == __name__:
    main()
|