Files
awoooi/apps/api/src/services/post_execution_verifier.py
Your Name 61f5a6a419
Some checks failed
CD Pipeline / tests (push) Has been cancelled
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
Code Review / ai-code-review (push) Has been cancelled
fix(telegram): route alerts to SRE war room
2026-04-30 15:01:23 +08:00

464 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AWOOOI AIOps Phase 1 — 執行後驗證器
=====================================
每次 AI 修復動作執行後,主動用 MCP 抓取環境後狀態,
與 EvidenceSnapshot.pre_execution_state 對比,
判斷修復是否真的有效。
驗證結果四態:
- "success" — 問題已解決(Pod Running / 指標恢復正常)
- "degraded" — 部分改善但未完全恢復
- "failed" — 執行後狀態比執行前更差,或完全未改善
- "timeout" — 驗證超時(MCP 無法回應)
驗證結果用途:
1. 填入 EvidenceSnapshot.verification_result(Phase 3 學習閉環基礎)
2. 傳給 learning_service 更新 Playbook EWMA trust_score
3. 觸發 Reviewer Agent 的 rollback 決策(Phase 2)
設計原則:
- 執行後等待 warm-up period(預設 10s),讓 K8s controller 有時間收斂
- 超時不 raise,標記 "timeout" 並繼續流程
- 不阻塞原始執行路徑(await,但結果不影響執行本身是否成功)
W2 PR-V1: SelfHealingValidator 串接 (2026-04-28 ogt + Claude Sonnet 4.6)
- ENABLE_SELF_HEALING_VALIDATOR=True 時,verify() 完成後呼叫 assess_self_healing()
- self_healing_score < 0.5 → Telegram 警示 rollback 提案(不自動執行)
- 驗證失敗不阻塞主流程(try/except 全包)
ADR-081: PreDecisionInvestigator + EvidenceSnapshot
MASTER §3.1 L6×D1
2026-04-15 ogt + Claude Sonnet 4.6 (亞太): Phase 1 初始建立
"""
from __future__ import annotations
import asyncio
import time
from typing import TYPE_CHECKING, Any
import structlog
from src.services.evidence_snapshot import EvidenceSnapshot
from src.services.mcp_tool_registry import SensorDimension, get_mcp_tool_registry
from src.services.sanitization_service import sanitize_dict_values
# W2 PR-V1: 頂層 import 讓測試 patch 路徑固定(延遲 import 無法被 patch
# ENABLE_SELF_HEALING_VALIDATOR=False 時此 import 不影響效能(純 python 模組)
from src.services import self_healing_validator as _shv_module
if TYPE_CHECKING:
from src.models.incident import Incident
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
# Post-execution settle time (seconds) — the K8s controller needs time to
# process restarts / rolling updates before the state is sampled.
POST_EXEC_WARMUP_SEC = 10.0
# Overall verification timeout (seconds).
VERIFY_TIMEOUT_SEC = 30.0
# Timeout for a single MCP tool call (seconds).
TOOL_TIMEOUT_SEC = 8.0
class PostExecutionVerifier:
    """Verify the environment state after a remediation action has run.

    Per the original design note, ``verify()`` is called from
    ``execute_approved_action()`` in approval_execution.py right after the
    action is executed; it returns the verification verdict and back-fills
    the EvidenceSnapshot.

    Usage:
        verifier = get_post_execution_verifier()
        result = await verifier.verify(
            incident=incident,
            snapshot=pre_decision_snapshot,
            action_taken="restart_service:awoooi-api",
        )
        # result: "success" | "degraded" | "failed" | "timeout"
    """

    # Head-room (seconds) added on top of the per-tool timeout when capturing
    # the pre-execution baseline. Without it the outer deadline equals the
    # per-tool deadline and fires first, so a single slow tool would throw
    # away the results of every other tool.
    _PRE_STATE_GRACE_SEC = 2.0

    def __init__(self) -> None:
        self._registry = get_mcp_tool_registry()

    async def verify(
        self,
        incident: "Incident",
        snapshot: EvidenceSnapshot | None,
        action_taken: str,
        warmup_sec: float = POST_EXEC_WARMUP_SEC,
    ) -> str:
        """Verify the outcome of an executed action.

        Args:
            incident: The original incident (its labels locate the resource).
            snapshot: The pre-execution EvidenceSnapshot; its
                ``pre_execution_state`` is the comparison baseline. May be
                ``None``, in which case nothing is back-filled.
            action_taken: Description of the executed action, e.g.
                ``"restart_service:awoooi-api"``.
            warmup_sec: Seconds to wait for K8s to converge before sampling.

        Returns:
            ``"success"``, ``"degraded"``, ``"failed"`` or ``"timeout"``.
            Collection errors are reported as ``"failed"`` and collection
            timeouts as ``"timeout"``; neither is raised to the caller.
        """
        incident_id = _get_incident_id(incident)
        logger.info(
            "verifier_start",
            incident_id=incident_id,
            action=action_taken,
            warmup_sec=warmup_sec,
        )

        # 1. Wait for the cluster to converge.
        if warmup_sec > 0:
            await asyncio.sleep(warmup_sec)

        # 2. Collect the post-execution state.
        try:
            post_state = await asyncio.wait_for(
                self._collect_post_state(incident),
                timeout=VERIFY_TIMEOUT_SEC,
            )
        except asyncio.TimeoutError:
            logger.warning("verifier_timeout", incident_id=incident_id)
            if snapshot:
                await _update_snapshot(snapshot, {}, "timeout")
            return "timeout"
        except Exception:
            logger.exception("verifier_collect_error", incident_id=incident_id)
            if snapshot:
                await _update_snapshot(snapshot, {}, "failed")
            return "failed"

        # 3. Compare the state before and after.
        pre_state = snapshot.pre_execution_state if snapshot else None
        result = _assess_recovery(pre_state, post_state, action_taken)

        # 4. Back-fill the EvidenceSnapshot.
        if snapshot:
            await _update_snapshot(snapshot, post_state, result)
        logger.info(
            "verifier_done",
            incident_id=incident_id,
            result=result,
            action=action_taken,
        )

        # 5. W2 PR-V1: SelfHealingValidator hook (gated by
        #    ENABLE_SELF_HEALING_VALIDATOR). Runs after post_state has been
        #    back-filled; the outer try/except guarantees that a validator
        #    failure never changes the value returned by verify().
        try:
            await _run_self_healing_validator(
                incident_id=incident_id,
                snapshot=snapshot,
                pre_state=pre_state,
                post_state=post_state,
                verification_result=result,
                action_taken=action_taken,
            )
        except Exception:
            logger.warning(
                "self_healing_validator_uncaught",
                incident_id=incident_id,
                exc_info=True,
            )
        return result

    async def capture_pre_execution_state(
        self,
        incident: "Incident",
        snapshot: EvidenceSnapshot,
    ) -> None:
        """Snapshot the current state into ``snapshot.pre_execution_state``.

        Intended to be called *before* the action is executed. Any failure
        (including a timeout) is logged and leaves an empty baseline rather
        than raising.
        """
        incident_id = _get_incident_id(incident)
        try:
            state = await asyncio.wait_for(
                self._collect_post_state(incident),  # same collection logic
                # Each tool is already bounded by TOOL_TIMEOUT_SEC; the grace
                # margin lets those per-tool timeouts fire first so partial
                # results survive instead of the whole baseline being lost.
                timeout=TOOL_TIMEOUT_SEC + self._PRE_STATE_GRACE_SEC,
            )
            snapshot.pre_execution_state = state
            logger.debug("verifier_pre_state_captured", incident_id=incident_id)
        except Exception:
            logger.warning(
                "verifier_pre_state_failed",
                incident_id=incident_id,
                exc_info=True,
            )
            snapshot.pre_execution_state = {}

    async def _collect_post_state(self, incident: "Incident") -> dict[str, Any]:
        """Collect the environment state used for verification.

        Only tools tagged with the D1 (K8s state) or D3 (metrics) sensor
        dimensions are queried; other dimensions (logs, topology, ...) are
        not needed for verification.

        Returns:
            Mapping of tool name to its sanitized output. Tools that fail,
            time out, or return nothing are omitted.
        """
        state: dict[str, Any] = {}
        alertname = _get_alertname(incident)
        labels = _get_labels(incident)

        # Keep only the D1 + D3 tools.
        all_tools = self._registry.suggest_tools(alertname=alertname, incident_labels=labels)
        wanted_dimensions = (SensorDimension.D1_K8S_STATE, SensorDimension.D3_METRICS)
        verify_tools = [
            t for t in all_tools
            if any(d in wanted_dimensions for d in t.dimensions)
        ]

        # "instance" is typically "host:port"; tolerate a missing/None value.
        instance = labels.get("instance") or ""
        params = {
            "namespace": labels.get("namespace", "awoooi-prod"),
            "pod_name": labels.get("pod", labels.get("name", "")),
            "deployment": labels.get("deployment", ""),
            "host": instance.split(":")[0] or labels.get("host", ""),
        }

        async def _call_one(reg: Any) -> tuple[str, Any]:
            """Run one tool; return (tool name, output or None). Best-effort."""
            tool_name = reg.tool.name
            try:
                result = await asyncio.wait_for(
                    reg.provider.execute(tool_name, params),
                    timeout=TOOL_TIMEOUT_SEC,
                )
                if result.success and result.output:
                    return tool_name, result.output
            except Exception:
                # A single failing tool must not abort verification, but leave
                # a trace so missing evidence can be diagnosed.
                logger.debug("verifier_tool_call_failed", tool=tool_name, exc_info=True)
            return tool_name, None

        results = await asyncio.gather(*[_call_one(t) for t in verify_tools])
        for tool_name, output in results:
            if output is None:
                continue
            context = f"post_state.{tool_name}"
            if isinstance(output, dict):
                state[tool_name] = sanitize_dict_values(output, context)
            else:
                # Wrap non-dict output so it goes through the same sanitizer.
                # Assumes sanitize_dict_values returns a dict with the same
                # keys, as its use for dict outputs above suggests — TODO confirm.
                state[tool_name] = sanitize_dict_values({"raw": str(output)}, context)
        return state
# ─────────────────────────────────────────────────────────────────────────────
# W2 PR-V1: SelfHealingValidator 串接
# 2026-04-28 ogt + Claude Sonnet 4.6: C6 飛輪斷鏈修復
# ─────────────────────────────────────────────────────────────────────────────
async def _run_self_healing_validator(
    incident_id: str,
    snapshot: EvidenceSnapshot | None,
    pre_state: dict[str, Any] | None,
    post_state: dict[str, Any],
    verification_result: str,
    action_taken: str,
) -> None:
    """Entry point that wires SelfHealingValidator into verification.

    Does nothing unless the ``ENABLE_SELF_HEALING_VALIDATOR`` setting is
    truthy. The whole body is wrapped in try/except so that a validator
    failure is only logged and never propagates.

    After the assessment:
    - the snapshot (if any) is updated via ``update_self_healing``;
    - a score below 0.5 triggers a Telegram rollback-proposal alert.
    """
    try:
        from src.core.config import get_settings

        if not get_settings().ENABLE_SELF_HEALING_VALIDATOR:
            return

        verdict = _shv_module.assess_self_healing(
            pre_state=pre_state,
            post_state=post_state,
            verification_result=verification_result,
            action_taken=action_taken,
        )
        healing_score: float = verdict["score"]
        logger.info(
            "self_healing_assessed",
            incident_id=incident_id,
            score=healing_score,
            regressions=verdict.get("regressions", []),
            root_cause_cleared=verdict.get("root_cause_cleared"),
            detail=verdict.get("detail"),
        )

        # Back-fill the EvidenceSnapshot; a failure here is logged and the
        # alerting step below still runs.
        if snapshot:
            try:
                await snapshot.update_self_healing(score=healing_score, detail=verdict)
            except Exception as update_error:
                logger.warning(
                    "self_healing_snapshot_update_failed",
                    incident_id=incident_id,
                    error=str(update_error),
                )

        # Low score: propose a rollback to humans via Telegram.
        if healing_score < 0.5:
            await _send_rollback_proposal_alert(
                incident_id=incident_id,
                score=healing_score,
                assessment=verdict,
                action_taken=action_taken,
            )
    except Exception:
        logger.warning(
            "self_healing_validator_error",
            incident_id=incident_id,
            exc_info=True,
        )
async def _send_rollback_proposal_alert(
    incident_id: str,
    score: float,
    assessment: dict[str, Any],
    action_taken: str,
) -> None:
    """Send a Telegram alert proposing a rollback for human evaluation.

    Called when the self-healing score is below 0.5. No rollback is executed
    here; the message only notifies people. Every failure is logged and
    swallowed so the caller is never interrupted.

    Args:
        incident_id: Identifier shown in the alert.
        score: Self-healing score, rendered with two decimals.
        assessment: Assessment dict; ``regressions`` and
            ``root_cause_cleared`` are read from it.
        action_taken: Executed action; truncated to 120 characters.
    """
    try:
        import html

        from src.core.config import get_settings
        from src.services.telegram_gateway import get_telegram_gateway

        _settings = get_settings()
        gateway = get_telegram_gateway()

        def _esc(value: Any) -> str:
            # Telegram's HTML parse mode rejects messages containing raw
            # '<', '>' or '&', so every dynamic value must be escaped.
            return html.escape(str(value), quote=False)

        regressions = list(assessment.get("regressions") or [])
        # Show at most five signals; fall back to an explicit "none" marker so
        # the line is never left blank.
        reg_str = ", ".join(_esc(item) for item in regressions[:5]) if regressions else "無"
        root_cleared = "✅" if assessment.get("root_cause_cleared") else "❌"
        text = (
            f"⚠️ <b>自愈品質警示 — 建議人工評估 Rollback</b>\n"
            f"Incident: <code>{_esc(incident_id)}</code>\n"
            # Truncate before escaping so an HTML entity is never cut in half.
            f"動作: <code>{_esc(action_taken[:120])}</code>\n"
            f"自愈分數: <b>{score:.2f}</b> (門檻 0.5)\n"
            f"Root Cause 解除: {root_cleared}\n"
            f"Regression 信號: {reg_str}\n"
            f"<i>此為提案,不會自動執行 Rollback</i>"
        )

        target_chat_id = _settings.SRE_GROUP_CHAT_ID or _settings.OPENCLAW_TG_CHAT_ID
        if not target_chat_id:
            # Nothing to send to; do not claim the proposal was delivered.
            logger.warning(
                "rollback_proposal_no_chat_id",
                incident_id=incident_id,
                score=score,
            )
            return

        response = await gateway._http_client.post(
            f"https://api.telegram.org/bot{_settings.OPENCLAW_TG_BOT_TOKEN}/sendMessage",
            json={
                "chat_id": target_chat_id,
                "text": text,
                "parse_mode": "HTML",
            },
        )
        # NOTE(review): assumes an httpx-style response exposing
        # ``status_code``; if the attribute is absent the check is skipped.
        status_code = getattr(response, "status_code", None)
        if isinstance(status_code, int) and status_code >= 400:
            logger.warning(
                "rollback_proposal_send_failed",
                incident_id=incident_id,
                status_code=status_code,
            )
            return
        logger.info(
            "rollback_proposal_sent",
            incident_id=incident_id,
            score=score,
        )
    except Exception:
        logger.warning(
            "rollback_proposal_send_failed",
            incident_id=incident_id,
            exc_info=True,
        )
# ─────────────────────────────────────────────────────────────────────────────
# Recovery Assessment
# ─────────────────────────────────────────────────────────────────────────────
def _assess_recovery(
pre_state: dict[str, Any] | None,
post_state: dict[str, Any],
action_taken: str,
) -> str:
"""
評估修復效果。
Phase 1 使用啟發式規則(基於 K8s Pod 狀態字串判斷)。
Phase 4 將改用動態基線Holt-Winters 偏差量),不再用靜態閾值。
HeuristicsPhase 1 版本):
- post_state 含 Running → success
- post_state 含 CrashLoopBackOff / Error / OOMKilled → failed
- post_state 為空MCP 無回應)→ degraded
- pre_state 與 post_state 完全相同 → degraded未改變
"""
if not post_state:
return "degraded"
# 轉為字串做啟發式掃描
post_str = str(post_state).lower()
pre_str = str(pre_state).lower() if pre_state else ""
# 失敗信號Gate 1 fix: 移除裸 "error" — 會誤觸 error_rate/error_count 等指標 key
# "error" 作為 K8s ContainerState reason 由 "failed" Pod phase 間接覆蓋
failure_signals = ["crashloopbackoff", "oomkilled", "oomkill", "failed"]
if any(sig in post_str for sig in failure_signals):
return "failed"
# 成功信號
success_signals = ["running", "ready", "1/1", "2/2", "3/3", "healthy"]
if any(sig in post_str for sig in success_signals):
# 但如果 pre_state 已經是 running可能是無效操作
if pre_str and any(sig in pre_str for sig in success_signals):
# 如果執行的是 restart即使 pre/post 都 Running 也算 success
if "restart" in action_taken.lower() or "delete" in action_taken.lower():
return "success"
return "degraded"
return "success"
# 前後無變化
if pre_str and post_str == pre_str:
return "degraded"
return "degraded"
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _get_incident_id(incident: "Incident") -> str:
    """Return ``incident.incident_id`` when present, else ``str(incident.id)``."""
    if hasattr(incident, "incident_id"):
        return incident.incident_id
    return str(incident.id)
def _get_alertname(incident: "Incident") -> str:
    """Return the ``alertname`` label of the first signal, or "" if unavailable."""
    signals = incident.signals
    if not signals:
        return ""
    first_signal = signals[0]
    return first_signal.labels.get("alertname", "")
def _get_labels(incident: "Incident") -> dict[str, Any]:
    """Return the labels mapping of the first signal, or an empty dict."""
    signals = incident.signals
    if not signals:
        return {}
    first_signal = signals[0]
    return first_signal.labels
async def _update_snapshot(
    snapshot: EvidenceSnapshot,
    post_state: dict[str, Any],
    result: str,
) -> None:
    """Write the post-execution state and verification result to the snapshot.

    Delegates to ``snapshot.update_post_execution``; any exception is logged
    with the snapshot id and suppressed.
    """
    try:
        pending_write = snapshot.update_post_execution(post_state, result)
        await pending_write
    except Exception:
        logger.exception(
            "verifier_snapshot_update_failed",
            snapshot_id=snapshot.snapshot_id,
        )
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
# Lazily created module-wide instance; populated on first access.
_verifier: PostExecutionVerifier | None = None


def get_post_execution_verifier() -> PostExecutionVerifier:
    """Return the shared PostExecutionVerifier, creating it on first use."""
    global _verifier
    instance = _verifier
    if instance is None:
        instance = PostExecutionVerifier()
        _verifier = instance
    return instance