""" AWOOOI AIOps Phase 1 — 執行後驗證器 ===================================== 每次 AI 修復動作執行後,主動用 MCP 抓取環境後狀態, 與 EvidenceSnapshot.pre_execution_state 對比, 判斷修復是否真的有效。 驗證結果三態: - "success" — 問題已解決(Pod Running / 指標恢復正常) - "degraded" — 部分改善但未完全恢復 - "failed" — 執行後狀態比執行前更差,或完全未改善 - "timeout" — 驗證超時(MCP 無法回應) 驗證結果用途: 1. 填入 EvidenceSnapshot.verification_result(Phase 3 學習閉環基礎) 2. 傳給 learning_service 更新 Playbook EWMA trust_score 3. 觸發 Reviewer Agent 的 rollback 決策(Phase 2) 設計原則: - 執行後等待 warm-up period(預設 10s),讓 K8s controller 有時間收斂 - 超時不 raise,標記 "timeout" 並繼續流程 - 不阻塞原始執行路徑(await,但結果不影響執行本身是否成功) ADR-081: PreDecisionInvestigator + EvidenceSnapshot MASTER §3.1 L6×D1 2026-04-15 ogt + Claude Sonnet 4.6 (亞太): Phase 1 初始建立 """ from __future__ import annotations import asyncio import time from typing import TYPE_CHECKING, Any import structlog from src.services.evidence_snapshot import EvidenceSnapshot from src.services.mcp_tool_registry import SensorDimension, get_mcp_tool_registry from src.services.sanitization_service import sanitize_dict_values if TYPE_CHECKING: from src.models.incident import Incident logger = structlog.get_logger(__name__) # 執行後等待收斂時間(秒)— K8s controller 需要時間處理重啟/滾動更新 POST_EXEC_WARMUP_SEC = 10.0 # 驗證超時(秒) VERIFY_TIMEOUT_SEC = 30.0 # MCP 單工具超時(秒) TOOL_TIMEOUT_SEC = 8.0 class PostExecutionVerifier: """ 執行後環境狀態驗證器。 在 approval_execution.py 的 execute_approved_action() 中, 執行動作後呼叫 verify(),取得驗證結果並補填 EvidenceSnapshot。 Usage: verifier = get_post_execution_verifier() result = await verifier.verify( incident=incident, snapshot=pre_decision_snapshot, action_taken="restart_service:awoooi-api", ) # result: "success" | "degraded" | "failed" | "timeout" """ def __init__(self) -> None: self._registry = get_mcp_tool_registry() async def verify( self, incident: "Incident", snapshot: EvidenceSnapshot | None, action_taken: str, warmup_sec: float = POST_EXEC_WARMUP_SEC, ) -> str: """ 執行後驗證。 Args: incident: 原始 Incident(用於取 labels 定位資源) snapshot: 執行前的 EvidenceSnapshot(取 pre_execution_state 作基準線) action_taken: 執行的動作描述(例如 "restart_service:awoooi-api") warmup_sec: 等待 K8s 收斂的秒數 Returns: str: "success" | "degraded" | "failed" | "timeout" """ incident_id = _get_incident_id(incident) logger.info( "verifier_start", incident_id=incident_id, action=action_taken, warmup_sec=warmup_sec, ) # 1. 等待收斂 if warmup_sec > 0: await asyncio.sleep(warmup_sec) # 2. 抓後狀態 try: post_state = await asyncio.wait_for( self._collect_post_state(incident), timeout=VERIFY_TIMEOUT_SEC, ) except asyncio.TimeoutError: logger.warning("verifier_timeout", incident_id=incident_id) if snapshot: await _update_snapshot(snapshot, {}, "timeout") return "timeout" except Exception: logger.exception("verifier_collect_error", incident_id=incident_id) if snapshot: await _update_snapshot(snapshot, {}, "failed") return "failed" # 3. 對比前後狀態 pre_state = snapshot.pre_execution_state if snapshot else None result = _assess_recovery(pre_state, post_state, action_taken) # 4. 更新 EvidenceSnapshot if snapshot: await _update_snapshot(snapshot, post_state, result) logger.info( "verifier_done", incident_id=incident_id, result=result, action=action_taken, ) return result async def capture_pre_execution_state( self, incident: "Incident", snapshot: EvidenceSnapshot, ) -> None: """ 執行前快照當前狀態,寫入 snapshot.pre_execution_state。 在 approval_execution.py 的動作執行「之前」呼叫。 """ incident_id = _get_incident_id(incident) try: state = await asyncio.wait_for( self._collect_post_state(incident), # 同樣的抓取邏輯 timeout=TOOL_TIMEOUT_SEC, ) snapshot.pre_execution_state = state logger.debug("verifier_pre_state_captured", incident_id=incident_id) except Exception: logger.warning("verifier_pre_state_failed", incident_id=incident_id) snapshot.pre_execution_state = {} async def _collect_post_state(self, incident: "Incident") -> dict[str, Any]: """ 蒐集執行後環境狀態(K8s Pod 狀態 + 關鍵指標)。 只選 D1(K8s 狀態)和 D3(指標)作為驗證基準線, 其他感官維度(日誌、拓撲等)在驗證時不必要。 """ state: dict[str, Any] = {} alertname = _get_alertname(incident) labels = _get_labels(incident) # 取 D1 + D3 工具 all_tools = self._registry.suggest_tools(alertname=alertname, incident_labels=labels) verify_tools = [ t for t in all_tools if any(d in (SensorDimension.D1_K8S_STATE, SensorDimension.D3_METRICS) for d in t.dimensions) ] params = { "namespace": labels.get("namespace", "awoooi-prod"), "pod_name": labels.get("pod", labels.get("name", "")), "deployment": labels.get("deployment", ""), "host": labels.get("instance", "").split(":")[0] or labels.get("host", ""), } async def _call_one(reg) -> tuple[str, Any]: try: result = await asyncio.wait_for( reg.provider.execute(reg.tool.name, params), timeout=TOOL_TIMEOUT_SEC, ) if result.success and result.output: return reg.tool.name, result.output except Exception: pass return reg.tool.name, None results = await asyncio.gather(*[_call_one(t) for t in verify_tools]) for tool_name, output in results: if output is not None: if isinstance(output, dict): state[tool_name] = sanitize_dict_values(output, f"post_state.{tool_name}") else: state[tool_name] = {"raw": sanitize(str(output), f"post_state.{tool_name}")} return state # ───────────────────────────────────────────────────────────────────────────── # Recovery Assessment # ───────────────────────────────────────────────────────────────────────────── def _assess_recovery( pre_state: dict[str, Any] | None, post_state: dict[str, Any], action_taken: str, ) -> str: """ 評估修復效果。 Phase 1 使用啟發式規則(基於 K8s Pod 狀態字串判斷)。 Phase 4 將改用動態基線(Holt-Winters 偏差量),不再用靜態閾值。 Heuristics(Phase 1 版本): - post_state 含 Running → success - post_state 含 CrashLoopBackOff / Error / OOMKilled → failed - post_state 為空(MCP 無回應)→ degraded - pre_state 與 post_state 完全相同 → degraded(未改變) """ if not post_state: return "degraded" # 轉為字串做啟發式掃描 post_str = str(post_state).lower() pre_str = str(pre_state).lower() if pre_state else "" # 失敗信號(Gate 1 fix: 移除裸 "error" — 會誤觸 error_rate/error_count 等指標 key) # "error" 作為 K8s ContainerState reason 由 "failed" Pod phase 間接覆蓋 failure_signals = ["crashloopbackoff", "oomkilled", "oomkill", "failed"] if any(sig in post_str for sig in failure_signals): return "failed" # 成功信號 success_signals = ["running", "ready", "1/1", "2/2", "3/3", "healthy"] if any(sig in post_str for sig in success_signals): # 但如果 pre_state 已經是 running,可能是無效操作 if pre_str and any(sig in pre_str for sig in success_signals): # 如果執行的是 restart,即使 pre/post 都 Running 也算 success if "restart" in action_taken.lower() or "delete" in action_taken.lower(): return "success" return "degraded" return "success" # 前後無變化 if pre_str and post_str == pre_str: return "degraded" return "degraded" # ───────────────────────────────────────────────────────────────────────────── # Helpers # ───────────────────────────────────────────────────────────────────────────── def _get_incident_id(incident: "Incident") -> str: return incident.incident_id if hasattr(incident, "incident_id") else str(incident.id) def _get_alertname(incident: "Incident") -> str: if incident.signals: return incident.signals[0].labels.get("alertname", "") return "" def _get_labels(incident: "Incident") -> dict[str, Any]: if incident.signals: return incident.signals[0].labels return {} async def _update_snapshot( snapshot: EvidenceSnapshot, post_state: dict[str, Any], result: str, ) -> None: """補填 EvidenceSnapshot 的 post_execution_state + verification_result。""" try: await snapshot.update_post_execution(post_state, result) except Exception: logger.exception("verifier_snapshot_update_failed", snapshot_id=snapshot.snapshot_id) # ───────────────────────────────────────────────────────────────────────────── # Singleton # ───────────────────────────────────────────────────────────────────────────── _verifier: PostExecutionVerifier | None = None def get_post_execution_verifier() -> PostExecutionVerifier: """取得 PostExecutionVerifier Singleton。""" global _verifier if _verifier is None: _verifier = PostExecutionVerifier() return _verifier