diff --git a/apps/api/src/workers/signal_worker.py b/apps/api/src/workers/signal_worker.py index e0b6b478..6f233630 100644 --- a/apps/api/src/workers/signal_worker.py +++ b/apps/api/src/workers/signal_worker.py @@ -56,6 +56,16 @@ BLOCK_MS = 5000 # 失敗重試上限 MAX_RETRIES = 3 +# ============================================================================= +# ADR-038/039: 安全網配置 (Wave 1) +# ============================================================================= +# XCLAIM: 閒置訊息回收閾值(毫秒) +PENDING_IDLE_MS = 60_000 # 1 分鐘無 ACK 則可被其他 Worker 回收 +# Active Sweeper: 掃描間隔(秒) +SWEEP_INTERVAL_S = 30 +# Graceful Shutdown: 最大等待時間(秒) +GRACEFUL_SHUTDOWN_TIMEOUT_S = 75 # K8s terminationGracePeriodSeconds: 90 + # ============================================================================= # Signal Worker @@ -80,6 +90,7 @@ class SignalWorker: def __init__(self) -> None: self._running = False self._task: asyncio.Task | None = None + self._sweeper_task: asyncio.Task | None = None # ADR-038: Active Sweeper async def _ensure_consumer_group(self) -> None: """ @@ -123,30 +134,51 @@ class SignalWorker: self._running = True self._task = asyncio.create_task(self._consume_loop()) + self._sweeper_task = asyncio.create_task(self._sweep_loop()) # ADR-038 logger.info( "signal_worker_started", stream=STREAM_KEY, group=CONSUMER_GROUP, consumer=CONSUMER_NAME, + sweep_interval_s=SWEEP_INTERVAL_S, ) async def stop(self) -> None: """ - 優雅關閉 + 優雅關閉 (ADR-038 Wave 1 強化) 等待當前處理完成後停止。 + 超時時間: 75 秒(搭配 K8s terminationGracePeriodSeconds: 90) """ if not self._running: return + logger.info( + "signal_worker_stopping", + timeout_s=GRACEFUL_SHUTDOWN_TIMEOUT_S, + ) self._running = False + # 停止 Sweeper + if self._sweeper_task: + self._sweeper_task.cancel() + try: + await self._sweeper_task + except asyncio.CancelledError: + pass + + # 等待主消費循環完成 if self._task: try: - # 給予 5 秒完成當前處理 - await asyncio.wait_for(self._task, timeout=5.0) + await asyncio.wait_for( + self._task, + timeout=GRACEFUL_SHUTDOWN_TIMEOUT_S, + ) except TimeoutError: - logger.warning("signal_worker_stop_timeout") + logger.warning( + "signal_worker_stop_timeout", + timeout_s=GRACEFUL_SHUTDOWN_TIMEOUT_S, + ) self._task.cancel() except asyncio.CancelledError: pass @@ -194,6 +226,105 @@ class SignalWorker: # 避免無限快速重試 await asyncio.sleep(1.0) + async def _sweep_loop(self) -> None: + """ + Active Sweeper: 定期掃描並回收閒置的 Pending 訊息 + + ADR-038 Wave 1: 使用 XCLAIM 回收其他 Worker 死亡或卡住的訊息 + + 流程: + 1. XPENDING 取得 Pending List 摘要 + 2. XPENDING 取得具體閒置訊息 (idle > PENDING_IDLE_MS) + 3. XCLAIM 回收訊息到本 Consumer + 4. 重新處理或強制 ACK (超過 MAX_RETRIES) + """ + redis_client = get_worker_redis() + + while self._running: + try: + # 等待下次掃描 + await asyncio.sleep(SWEEP_INTERVAL_S) + + if not self._running: + break + + # 1. 取得 Pending 摘要 + pending_info = await redis_client.xpending( + STREAM_KEY, + CONSUMER_GROUP, + ) + + if not pending_info or pending_info.get("pending", 0) == 0: + continue # 沒有 Pending 訊息 + + pending_count = pending_info.get("pending", 0) + logger.debug( + "sweep_pending_check", + pending_count=pending_count, + ) + + # 2. 取得具體的 Pending 訊息(最多 10 條) + pending_messages = await redis_client.xpending_range( + STREAM_KEY, + CONSUMER_GROUP, + min="-", + max="+", + count=10, + ) + + for msg in pending_messages: + message_id = msg.get("message_id") + idle_ms = msg.get("time_since_delivered", 0) + delivery_count = msg.get("times_delivered", 0) + + # 只處理閒置超過閾值的訊息 + if idle_ms < PENDING_IDLE_MS: + continue + + logger.info( + "sweep_claiming_message", + message_id=message_id, + idle_ms=idle_ms, + delivery_count=delivery_count, + ) + + # 3. XCLAIM 回收訊息 + claimed = await redis_client.xclaim( + STREAM_KEY, + CONSUMER_GROUP, + CONSUMER_NAME, + min_idle_time=PENDING_IDLE_MS, + message_ids=[message_id], + ) + + if not claimed: + continue + + # 4. 處理回收的訊息 + for claimed_id, claimed_data in claimed: + if delivery_count >= MAX_RETRIES: + # 超過重試上限,強制 ACK 並記錄 + logger.warning( + "sweep_force_ack_max_retries", + message_id=claimed_id, + delivery_count=delivery_count, + ) + await redis_client.xack( + STREAM_KEY, + CONSUMER_GROUP, + claimed_id, + ) + else: + # 重新處理 + await self._process_signal(claimed_id, claimed_data) + + except asyncio.CancelledError: + logger.info("sweep_loop_cancelled") + break + except Exception as e: + logger.exception("sweep_loop_error", error=str(e)) + await asyncio.sleep(5.0) # 錯誤後等待 + async def _process_signal(self, message_id: str, data: dict[str, Any]) -> None: """ 處理單一訊號 diff --git a/k8s/awoooi-prod/08-deployment-worker.yaml b/k8s/awoooi-prod/08-deployment-worker.yaml index 968dd05f..7d5e8fd9 100644 --- a/k8s/awoooi-prod/08-deployment-worker.yaml +++ b/k8s/awoooi-prod/08-deployment-worker.yaml @@ -1,8 +1,10 @@ # AWOOOI Signal Worker Deployment # 負責人: CTO -# 版本: v1.1 +# 版本: v1.3 # 日期: 2026-03-22 -# 更新: 2026-03-28 - 新增 startupProbe + revisionHistoryLimit:3 (Phase K0.5/K0.7) +# 更新: 2026-03-28 v1.1 - 新增 startupProbe + revisionHistoryLimit:3 (Phase K0.5/K0.7) +# 更新: 2026-03-28 v1.2 - liveness 改為 mtime 檢查 (心跳機制長期方案) +# 更新: 2026-03-29 v1.3 - ADR-038/039 terminationGracePeriodSeconds:90 (Graceful Shutdown) # # Phase 6.5: Redis Streams 消費者 # 職責: 消費 awoooi:signals 串流,觸發 Incident Engine @@ -39,6 +41,8 @@ spec: environment: prod component: signal-processor spec: + # ADR-038/039: Graceful Shutdown (Worker 需要 75 秒完成清理) + terminationGracePeriodSeconds: 90 containers: - name: worker # 映像標籤由 CI/CD 動態注入 (格式: {sha}-{run_id}) @@ -68,12 +72,18 @@ spec: limits: cpu: "500m" memory: "512Mi" - # Worker 健康檢查 (檔案探針) + # Worker 健康檢查 (mtime 時間戳檢查 - 長期解決方案) + # 心跳機制:Worker 每 30 秒 touch 健康文件 + # 探針檢查:文件 mtime 必須在 60 秒內 livenessProbe: exec: command: - - cat - - /tmp/worker-healthy + - /bin/sh + - -c + - | + [ -f /tmp/worker-healthy ] && \ + [ "$(find /tmp/worker-healthy -mmin -1 2>/dev/null)" ] && exit 0 + exit 1 initialDelaySeconds: 30 periodSeconds: 15 timeoutSeconds: 5