Files
awoooi/apps/api/tests/test_post_execution_verifier.py
Your Name fa0e956c0e
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 59s
CD Pipeline / build-and-deploy (push) Successful in 3m22s
CD Pipeline / post-deploy-checks (push) Successful in 1m19s
fix(mcp): tag legacy provider calls with audit context
2026-05-06 17:18:52 +08:00

360 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
PostExecutionVerifier 測試
===========================
ADR-081: Phase 1 執行後驗證器
測試項目:
- _assess_recovery() 三態判斷邏輯success / degraded / failed
- 空 post_state → degraded
- failure 信號優先於 success 信號
- restart action 時 pre/post 都 Running → success
- 非 restart action 時 pre/post 都 Running → degraded
- verify() 收斂等待 warmupwarmup=0 時跳過)
- verify() 逾時 → "timeout"
- capture_pre_execution_state 填入 pre_execution_state
2026-04-15 Claude Sonnet 4.6 + ogt: Phase 1 初始建立
"""
from __future__ import annotations
import asyncio
import pytest
from unittest.mock import AsyncMock, patch
from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult
from src.services.post_execution_verifier import (
PostExecutionVerifier,
_assess_recovery,
_get_incident_id,
_get_labels,
)
from src.services.evidence_snapshot import EvidenceSnapshot
from src.services.mcp_tool_registry import RegisteredTool, SensorDimension
# ─────────────────────────────────────────────────────────────────────────────
# Incident stub
# ─────────────────────────────────────────────────────────────────────────────
def _stub_incident(
alertname: str = "KubePodCrashLooping",
namespace: str = "awoooi-prod",
pod: str = "api-xyz",
) -> object:
class _Signal:
labels = {
"alertname": alertname,
"namespace": namespace,
"pod": pod,
}
class _Incident:
incident_id = "INC-TEST"
signals = [_Signal()]
return _Incident()
# ─────────────────────────────────────────────────────────────────────────────
# _assess_recovery — 核心三態邏輯
# ─────────────────────────────────────────────────────────────────────────────
class TestAssessRecovery:
"""Phase 1 啟發式規則驗證"""
def test_empty_post_state_is_degraded(self):
assert _assess_recovery(None, {}, "restart_service") == "degraded"
def test_running_in_post_state_is_success(self):
post = {"pod": {"status": "Running"}}
assert _assess_recovery(None, post, "restart_service:api") == "success"
def test_1_of_1_ready_is_success(self):
post = {"pod": {"containers": "1/1"}}
assert _assess_recovery(None, post, "scale_up") == "success"
def test_crashloopbackoff_is_failed(self):
post = {"pod": {"status": "CrashLoopBackOff"}}
assert _assess_recovery(None, post, "restart_service") == "failed"
def test_oomkilled_is_failed(self):
post = {"pod": {"status": "OOMKilled"}}
assert _assess_recovery(None, post, "restart_service") == "failed"
def test_pod_phase_failed_is_failed(self):
"""K8s Pod phase 'Failed' 正確觸發 failed原 Error state 測試 — Gate 1 fix: 移除裸 "error" 防止 error_rate 等 key 誤觸)"""
post = {"phase": "Failed"}
assert _assess_recovery(None, post, "patch_config") == "failed"
def test_error_rate_key_does_not_trigger_failed(self):
"""error_rate 等指標 key 不得誤判為 failed — Gate 1 回歸"""
post = {"error_rate": 0.5, "status": "Running"}
# 含 Running → successerror_rate key 不觸發 failed
assert _assess_recovery(None, post, "restart") == "success"
def test_failure_signal_beats_success_signal(self):
# CrashLoopBackOff 且含 Running混合狀態— 失敗優先
post = {"status": "Running", "reason": "CrashLoopBackOff"}
assert _assess_recovery(None, post, "restart") == "failed"
def test_pre_running_post_running_no_restart_is_degraded(self):
"""非 restart 動作,前後都 Running → 操作無效 → degraded"""
pre = {"status": "Running"}
post = {"status": "Running"}
assert _assess_recovery(pre, post, "scale_up") == "degraded"
def test_pre_running_post_running_restart_is_success(self):
"""restart 動作,前後都 Running → 重啟成功 → success"""
pre = {"status": "Running"}
post = {"status": "Running"}
assert _assess_recovery(pre, post, "restart_service:api") == "success"
def test_pre_running_post_running_delete_is_success(self):
"""kubectl delete 動作,前後都 Running → success"""
pre = {"status": "Running"}
post = {"status": "Running"}
assert _assess_recovery(pre, post, "kubectl_delete_pod:api") == "success"
def test_pre_none_post_running_is_success(self):
"""無前狀態 + 後狀態 Running → success"""
assert _assess_recovery(None, {"status": "Running"}, "restart") == "success"
def test_healthy_signal_is_success(self):
post = {"health": "healthy"}
assert _assess_recovery(None, post, "patch") == "success"
def test_pre_post_identical_no_change_is_degraded(self):
state = {"status": "Pending"}
assert _assess_recovery(state, state, "patch_config") == "degraded"
def test_case_insensitive_matching(self):
"""信號匹配必須不分大小寫"""
post = {"STATUS": "RUNNING"}
assert _assess_recovery(None, post, "restart") == "success"
# ─────────────────────────────────────────────────────────────────────────────
# Helper 函式
# ─────────────────────────────────────────────────────────────────────────────
class TestHelpers:
def test_get_incident_id_from_incident_id_attr(self):
class I:
incident_id = "INC-001"
assert _get_incident_id(I()) == "INC-001"
def test_get_incident_id_fallback_to_id(self):
class I:
id = 42
assert _get_incident_id(I()) == "42"
def test_get_labels_from_signals(self):
inc = _stub_incident(namespace="ns-test")
labels = _get_labels(inc)
assert labels["namespace"] == "ns-test"
def test_get_labels_no_signals_returns_empty(self):
class I:
signals = []
assert _get_labels(I()) == {}
# ─────────────────────────────────────────────────────────────────────────────
# PostExecutionVerifier.verify() — 端對端邏輯mock MCP 層)
# ─────────────────────────────────────────────────────────────────────────────
class TestVerify:
"""verify() 整合測試:使用 mock 繞過真實 MCP / DB"""
@pytest.mark.asyncio
async def test_warmup_zero_skips_sleep(self):
"""warmup_sec=0 必須立即執行,不 sleep"""
verifier = PostExecutionVerifier()
incident = _stub_incident()
with patch.object(
verifier,
"_collect_post_state",
new=AsyncMock(return_value={"status": "Running"}),
):
result = await verifier.verify(
incident=incident,
snapshot=None,
action_taken="restart_service:api",
warmup_sec=0.0,
)
assert result == "success"
@pytest.mark.asyncio
async def test_post_state_failed_signals_returns_failed(self):
verifier = PostExecutionVerifier()
incident = _stub_incident()
with patch.object(
verifier,
"_collect_post_state",
new=AsyncMock(return_value={"status": "CrashLoopBackOff"}),
):
result = await verifier.verify(
incident=incident,
snapshot=None,
action_taken="restart_service:api",
warmup_sec=0.0,
)
assert result == "failed"
@pytest.mark.asyncio
async def test_collect_timeout_returns_timeout(self):
"""MCP 蒐集超時 → "timeout",不 raise"""
verifier = PostExecutionVerifier()
incident = _stub_incident()
async def _slow(*args, **kwargs):
await asyncio.sleep(9999)
with patch.object(verifier, "_collect_post_state", new=_slow):
with patch("src.services.post_execution_verifier.VERIFY_TIMEOUT_SEC", 0.05):
result = await verifier.verify(
incident=incident,
snapshot=None,
action_taken="restart_service",
warmup_sec=0.0,
)
assert result == "timeout"
@pytest.mark.asyncio
async def test_collect_exception_returns_failed(self):
"""MCP 蒐集拋例外 → "failed",不 raise"""
verifier = PostExecutionVerifier()
incident = _stub_incident()
async def _raise(*args, **kwargs):
raise ConnectionError("k8s unreachable")
with patch.object(verifier, "_collect_post_state", new=_raise):
result = await verifier.verify(
incident=incident,
snapshot=None,
action_taken="restart_service",
warmup_sec=0.0,
)
assert result == "failed"
@pytest.mark.asyncio
async def test_snapshot_pre_state_used_for_comparison(self):
"""pre_execution_state 須傳入 _assess_recovery 做對比"""
verifier = PostExecutionVerifier()
incident = _stub_incident()
snapshot = EvidenceSnapshot(incident_id="INC-TEST")
snapshot.pre_execution_state = {"status": "Running"}
# post_state 也 Running但動作不是 restart → degraded
with patch.object(
verifier,
"_collect_post_state",
new=AsyncMock(return_value={"status": "Running"}),
):
with patch(
"src.services.post_execution_verifier._update_snapshot",
new=AsyncMock(),
):
result = await verifier.verify(
incident=incident,
snapshot=snapshot,
action_taken="scale_up",
warmup_sec=0.0,
)
assert result == "degraded"
# ─────────────────────────────────────────────────────────────────────────────
# capture_pre_execution_state
# ─────────────────────────────────────────────────────────────────────────────
class TestCapturePreState:
@pytest.mark.asyncio
async def test_captures_state_into_snapshot(self):
verifier = PostExecutionVerifier()
incident = _stub_incident()
snapshot = EvidenceSnapshot(incident_id="INC-TEST")
with patch.object(
verifier,
"_collect_post_state",
new=AsyncMock(return_value={"status": "Running", "ready": "2/2"}),
):
await verifier.capture_pre_execution_state(incident, snapshot)
assert snapshot.pre_execution_state is not None
assert "Running" in str(snapshot.pre_execution_state)
@pytest.mark.asyncio
async def test_exception_sets_empty_pre_state(self):
"""蒐集失敗 → pre_execution_state = {},不 raise"""
verifier = PostExecutionVerifier()
incident = _stub_incident()
snapshot = EvidenceSnapshot(incident_id="INC-TEST")
async def _raise(*args, **kwargs):
raise RuntimeError("k8s down")
with patch.object(verifier, "_collect_post_state", new=_raise):
await verifier.capture_pre_execution_state(incident, snapshot)
assert snapshot.pre_execution_state == {}
class _CaptureProvider(MCPToolProvider):
name = "capture"
seen_parameters: dict | None = None
async def list_tools(self) -> list[MCPTool]:
return []
async def execute(self, tool_name: str, parameters: dict) -> MCPToolResult:
self.seen_parameters = dict(parameters)
return MCPToolResult(success=True, output={"status": "Running"})
class _FakeRegistry:
def __init__(self, provider: _CaptureProvider) -> None:
self.provider = provider
def suggest_tools(self, alertname: str = "", incident_labels: dict | None = None) -> list[RegisteredTool]:
return [
RegisteredTool(
tool=MCPTool(
name="kubectl_get",
description="",
input_schema={},
server_name="capture",
),
provider=self.provider,
dimensions=[SensorDimension.D1_K8S_STATE],
)
]
class TestCollectPostStateAuditContext:
@pytest.mark.asyncio
async def test_collect_post_state_injects_mcp_audit_context(self):
provider = _CaptureProvider()
verifier = PostExecutionVerifier()
verifier._registry = _FakeRegistry(provider)
state = await verifier._collect_post_state(_stub_incident())
assert state["kubectl_get"]["status"] == "Running"
assert provider.seen_parameters is not None
audit_context = provider.seen_parameters["_mcp_audit"]
assert audit_context["incident_id"] == "INC-TEST"
assert audit_context["session_id"] == "incident:INC-TEST:post_execution"
assert audit_context["flywheel_node"] == "verify"
assert audit_context["agent_role"] == "post_execution_verifier"