""" Platform Worker ================ AwoooP Phase 4: SKIP LOCKED worker + stale run reaper(ADR-114) 2026-05-04 ogt + Claude Sonnet 4.6 功能: 1. Worker Loop:以 SKIP LOCKED 從 awooop_run_state 取 PENDING run 並執行 2. Stale Run Reaper:每 60 秒掃描 lease 過期的 RUNNING run 3. Shadow Mode Enforcer:所有 Phase 4 run 強制 is_shadow=True Worker 設計: - 啟動時以 asyncio.create_task 掛入 main.py lifespan - 多個 worker 安全並行(SKIP LOCKED 保證每 run 只被一個 worker 取得) - Heartbeat 每 15 秒更新 lease(防 stale reaper 誤殺) - 優雅停機:收到 stop signal 後完成當前 run 再退出 與 legacy 的關係: - 完全獨立,不碰任何既有 signal_worker.py / aider_event_processor.py - 只處理 awooop_run_state 表(legacy signal 不寫入此表) """ from __future__ import annotations import asyncio from datetime import datetime, timezone import structlog from src.services.platform_runtime import shadow_execute from src.services.run_state_machine import ( HEARTBEAT_INTERVAL_SECONDS, STALE_REAPER_INTERVAL_SECONDS, acquire_pending_run, heartbeat, reap_stale_runs, ) logger = structlog.get_logger(__name__) # Phase 4 固定處理 awoooi 租戶(Phase 6+ 改為多租戶掃描) _DEFAULT_PROJECT_ID = "awoooi" _WORKER_POLL_INTERVAL_SECONDS = 5 # 無任務時的等待間隔 _WORKER_CONCURRENCY = 2 # 同時最多幾個 run 並行 class PlatformWorker: """ Platform Worker:SKIP LOCKED + shadow execution + stale reaper。 Usage(在 main.py lifespan 中): worker = PlatformWorker() asyncio.create_task(worker.run_loop()) asyncio.create_task(worker.reaper_loop()) """ def __init__(self, project_id: str = _DEFAULT_PROJECT_ID) -> None: self.project_id = project_id self._stop_event = asyncio.Event() self._active_runs: set[str] = set() def stop(self) -> None: """優雅停機信號""" self._stop_event.set() async def run_loop(self) -> None: """ 主 worker loop: 1. 取一筆 PENDING run(SKIP LOCKED) 2. 執行 shadow_execute(不產生 user response) 3. Heartbeat(每 15 秒) 4. 等待 5 秒後重新掃描 """ logger.info("platform_worker_started", project_id=self.project_id) while not self._stop_event.is_set(): try: # 控制並行度 if len(self._active_runs) >= _WORKER_CONCURRENCY: await asyncio.sleep(1) continue run = await acquire_pending_run(self.project_id) if run is None: await asyncio.sleep(_WORKER_POLL_INTERVAL_SECONDS) continue run_id_str = str(run.run_id) self._active_runs.add(run_id_str) # 每個 run 獨立 task,不阻塞 loop asyncio.create_task( self._execute_with_heartbeat(run), name=f"platform_run_{run_id_str[:8]}", ) except asyncio.CancelledError: break except Exception as exc: logger.exception("platform_worker_loop_error", error=str(exc)) await asyncio.sleep(_WORKER_POLL_INTERVAL_SECONDS) logger.info("platform_worker_stopped", project_id=self.project_id) async def _execute_with_heartbeat(self, run: object) -> None: """ 在 shadow_execute 執行期間,同步 heartbeat 防 stale reaper 誤殺。 """ from src.db.awooop_models import AwoooPRunState assert isinstance(run, AwoooPRunState) run_id_str = str(run.run_id) # Heartbeat task(每 15 秒更新 lease) heartbeat_task = asyncio.create_task( self._heartbeat_loop(run.run_id, self.project_id), name=f"heartbeat_{run_id_str[:8]}", ) try: await shadow_execute(run) except Exception as exc: logger.exception( "platform_run_execution_error", run_id=run_id_str, error=str(exc), ) finally: heartbeat_task.cancel() self._active_runs.discard(run_id_str) async def _heartbeat_loop(self, run_id: object, project_id: str) -> None: """每 HEARTBEAT_INTERVAL_SECONDS 秒更新 lease,直到被 cancel""" import uuid as _uuid while True: await asyncio.sleep(HEARTBEAT_INTERVAL_SECONDS) try: await heartbeat(run_id, project_id) # type: ignore[arg-type] except Exception as exc: logger.warning( "platform_heartbeat_failed", run_id=str(run_id), error=str(exc), ) async def reaper_loop(self) -> None: """ Stale run reaper:每 60 秒掃描 lease 過期的 RUNNING run。 lease < NOW() + attempt < max → PENDING(retry) lease < NOW() + attempt >= max → FAILED(E-RUN-002) """ logger.info("stale_run_reaper_started", project_id=self.project_id) while not self._stop_event.is_set(): try: await asyncio.sleep(STALE_REAPER_INTERVAL_SECONDS) reaped = await reap_stale_runs(self.project_id) if reaped: logger.info( "stale_run_reaper_cycle", project_id=self.project_id, reaped=reaped, ts=datetime.now(timezone.utc).isoformat(), ) except asyncio.CancelledError: break except Exception as exc: logger.exception("stale_run_reaper_error", error=str(exc)) logger.info("stale_run_reaper_stopped", project_id=self.project_id) # ───────────────────────────────────────────────────────────────────────────── # Singleton(掛入 lifespan 用) # ───────────────────────────────────────────────────────────────────────────── _platform_worker: PlatformWorker | None = None def get_platform_worker() -> PlatformWorker: global _platform_worker if _platform_worker is None: _platform_worker = PlatformWorker() return _platform_worker async def start_platform_worker() -> None: """在 main.py lifespan 中呼叫此函數啟動 worker""" worker = get_platform_worker() asyncio.create_task(worker.run_loop(), name="platform_worker_run_loop") asyncio.create_task(worker.reaper_loop(), name="platform_worker_reaper_loop") logger.info("platform_worker_tasks_started") async def stop_platform_worker() -> None: """在 main.py lifespan 關閉時呼叫""" worker = get_platform_worker() worker.stop() logger.info("platform_worker_stop_requested")