520 lines
20 KiB
Python
520 lines
20 KiB
Python
"""
PostExecutionVerifier 測試
===========================

ADR-081: Phase 1 執行後驗證器

測試項目:
- _assess_recovery() 三態判斷邏輯(success / degraded / failed)
- 空 post_state → degraded
- failure 信號優先於 success 信號
- restart action 時 pre/post 都 Running → success
- 非 restart action 時 pre/post 都 Running → degraded
- verify() 收斂等待 warmup(warmup=0 時跳過)
- verify() 逾時 → "timeout";蒐集拋例外 → "failed"
- verify() 在 snapshot=None 時寫入可稽核的 fallback snapshot
- capture_pre_execution_state 填入 pre_execution_state
- _collect_post_state 注入 MCP audit context,audited provider 經 McpGateway 路由
- _build_prometheus_query 產生非空查詢

2026-04-15 Claude Sonnet 4.6 + ogt: Phase 1 初始建立
"""


from __future__ import annotations
|
||
|
||
import asyncio
|
||
import pytest
|
||
from unittest.mock import AsyncMock, patch
|
||
|
||
from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult
|
||
from src.plugins.mcp.registry import AuditedMCPToolProvider
|
||
from src.services import post_execution_verifier as pev_module
|
||
from src.services.post_execution_verifier import (
|
||
PostExecutionVerifier,
|
||
_assess_recovery,
|
||
_get_incident_id,
|
||
_get_labels,
|
||
)
|
||
from src.services.evidence_snapshot import EvidenceSnapshot
|
||
from src.services.mcp_tool_registry import RegisteredTool, SensorDimension


# ─────────────────────────────────────────────────────────────────────────────
# Incident stub
# ─────────────────────────────────────────────────────────────────────────────


def _stub_incident(
|
||
alertname: str = "KubePodCrashLooping",
|
||
namespace: str = "awoooi-prod",
|
||
pod: str = "api-xyz",
|
||
) -> object:
|
||
class _Signal:
|
||
labels = {
|
||
"alertname": alertname,
|
||
"namespace": namespace,
|
||
"pod": pod,
|
||
}
|
||
|
||
class _Incident:
|
||
incident_id = "INC-TEST"
|
||
signals = [_Signal()]
|
||
|
||
return _Incident()


# ─────────────────────────────────────────────────────────────────────────────
# _assess_recovery — 核心三態邏輯
# ─────────────────────────────────────────────────────────────────────────────


class TestAssessRecovery:
    """Phase 1 heuristic rules of ``_assess_recovery`` (success / degraded / failed)."""

    def test_empty_post_state_is_degraded(self):
        """No post-execution evidence at all is reported as degraded."""
        verdict = _assess_recovery(None, {}, "restart_service")
        assert verdict == "degraded"

    def test_running_in_post_state_is_success(self):
        """A nested ``Running`` status is a success signal."""
        post_state = {"pod": {"status": "Running"}}
        verdict = _assess_recovery(None, post_state, "restart_service:api")
        assert verdict == "success"

    def test_1_of_1_ready_is_success(self):
        """A fully ready container count (``1/1``) is a success signal."""
        post_state = {"pod": {"containers": "1/1"}}
        verdict = _assess_recovery(None, post_state, "scale_up")
        assert verdict == "success"

    def test_rollout_success_is_success(self):
        """A successful ``k8s_watch_rollout`` result counts as recovery."""
        rollout = {
            "deployment": "awoooi-auto-repair-canary",
            "success": True,
            "status": 'deployment "awoooi-auto-repair-canary" successfully rolled out',
        }
        post_state = {"k8s_watch_rollout": rollout}
        verdict = _assess_recovery(None, post_state, "auto_repair_playbook:PB-CANARY")
        assert verdict == "success"

    def test_crashloopbackoff_is_failed(self):
        """CrashLoopBackOff is a failure signal."""
        post_state = {"pod": {"status": "CrashLoopBackOff"}}
        verdict = _assess_recovery(None, post_state, "restart_service")
        assert verdict == "failed"

    def test_oomkilled_is_failed(self):
        """OOMKilled is a failure signal."""
        post_state = {"pod": {"status": "OOMKilled"}}
        verdict = _assess_recovery(None, post_state, "restart_service")
        assert verdict == "failed"

    def test_pod_phase_failed_is_failed(self):
        """K8s Pod phase ``Failed`` yields failed.

        Replaces the earlier Error-state test: the Gate 1 fix dropped the bare
        "error" token so keys such as ``error_rate`` no longer match.
        """
        post_state = {"phase": "Failed"}
        verdict = _assess_recovery(None, post_state, "patch_config")
        assert verdict == "failed"

    def test_error_rate_key_does_not_trigger_failed(self):
        """Metric keys such as ``error_rate`` must not be mistaken for failure (Gate 1 regression)."""
        post_state = {"error_rate": 0.5, "status": "Running"}
        # Failure outranks success, so a "success" here proves the error_rate
        # key did not register as a failure signal.
        verdict = _assess_recovery(None, post_state, "restart")
        assert verdict == "success"

    def test_failure_signal_beats_success_signal(self):
        """Mixed state (Running plus CrashLoopBackOff): the failure signal wins."""
        post_state = {"status": "Running", "reason": "CrashLoopBackOff"}
        verdict = _assess_recovery(None, post_state, "restart")
        assert verdict == "failed"

    def test_pre_running_post_running_no_restart_is_degraded(self):
        """Non-restart action, Running before and after: no visible effect, so degraded."""
        before, after = {"status": "Running"}, {"status": "Running"}
        verdict = _assess_recovery(before, after, "scale_up")
        assert verdict == "degraded"

    def test_pre_running_post_running_restart_is_success(self):
        """Restart action, Running before and after: the restart succeeded."""
        before, after = {"status": "Running"}, {"status": "Running"}
        verdict = _assess_recovery(before, after, "restart_service:api")
        assert verdict == "success"

    def test_rollout_restart_step_success_when_pre_and_post_running(self):
        """An auto-repair label verifies as a real repair only with an actual rollout-restart step."""
        before, after = {"status": "Running"}, {"status": "Running"}
        action = "auto_repair_playbook:PB-TEST kubectl rollout restart deployment/api"
        verdict = _assess_recovery(before, after, action)
        assert verdict == "success"

    def test_diagnosis_only_step_is_not_verified_repair(self):
        """A diagnostic playbook is never a verified repair, even with a healthy post state."""
        before, after = {"status": "Running"}, {"status": "Running"}
        action = "auto_repair_playbook:PB-TEST mcp:ssh_diagnose docker stats"
        verdict = _assess_recovery(before, after, action)
        assert verdict == "degraded"

    def test_pre_running_post_running_delete_is_success(self):
        """kubectl delete action, Running before and after: success."""
        before, after = {"status": "Running"}, {"status": "Running"}
        verdict = _assess_recovery(before, after, "kubectl_delete_pod:api")
        assert verdict == "success"

    def test_pre_none_post_running_is_success(self):
        """No pre-state and a Running post-state: success."""
        verdict = _assess_recovery(None, {"status": "Running"}, "restart")
        assert verdict == "success"

    def test_healthy_signal_is_success(self):
        """A ``healthy`` value is a success signal."""
        post_state = {"health": "healthy"}
        verdict = _assess_recovery(None, post_state, "patch")
        assert verdict == "success"

    def test_pre_post_identical_no_change_is_degraded(self):
        """Unchanged, non-healthy state (same object before and after) is degraded."""
        pending = {"status": "Pending"}
        verdict = _assess_recovery(pending, pending, "patch_config")
        assert verdict == "degraded"

    def test_case_insensitive_matching(self):
        """Signal matching ignores case in both keys and values."""
        post_state = {"STATUS": "RUNNING"}
        verdict = _assess_recovery(None, post_state, "restart")
        assert verdict == "success"


# ─────────────────────────────────────────────────────────────────────────────
# Helper 函式
# ─────────────────────────────────────────────────────────────────────────────


class TestHelpers:
    """Attribute helpers the verifier uses to read incident metadata."""

    def test_get_incident_id_from_incident_id_attr(self):
        """An ``incident_id`` attribute is returned as-is."""

        class _IncidentWithIncidentId:
            incident_id = "INC-001"

        assert _get_incident_id(_IncidentWithIncidentId()) == "INC-001"

    def test_get_incident_id_fallback_to_id(self):
        """Without ``incident_id``, the ``id`` attribute is used and stringified."""

        class _IncidentWithNumericId:
            # Deliberately named ``id``: this is the fallback attribute under test.
            id = 42

        assert _get_incident_id(_IncidentWithNumericId()) == "42"

    def test_get_labels_from_signals(self):
        """Labels come from the incident's signals."""
        incident = _stub_incident(namespace="ns-test")
        labels = _get_labels(incident)
        assert labels["namespace"] == "ns-test"

    def test_get_labels_no_signals_returns_empty(self):
        """An incident with an empty ``signals`` list yields an empty dict."""

        class _IncidentWithoutSignals:
            signals = []

        assert _get_labels(_IncidentWithoutSignals()) == {}


# ─────────────────────────────────────────────────────────────────────────────
# PostExecutionVerifier.verify() — 端對端邏輯(mock MCP 層)
# ─────────────────────────────────────────────────────────────────────────────


class TestVerify:
    """Integration tests for ``verify()``; real MCP collection and DB writes are mocked out."""

    # Upper bound for awaiting verify() in the timeout test. It sits far above the
    # patched VERIFY_TIMEOUT_SEC (0.05 s), so it only fires if that patch stops
    # taking effect. Without it, verify() would block on the 9999 s stub and hang CI.
    _HANG_GUARD_SEC = 5.0

    @pytest.mark.asyncio
    async def test_warmup_zero_skips_sleep(self):
        """warmup_sec=0 must run immediately without sleeping.

        NOTE(review): only the resulting verdict is asserted here; the absence
        of a sleep call is not checked directly.
        """
        verifier = PostExecutionVerifier()
        incident = _stub_incident()

        with patch.object(
            verifier,
            "_collect_post_state",
            new=AsyncMock(return_value={"status": "Running"}),
        ):
            result = await verifier.verify(
                incident=incident,
                snapshot=None,
                action_taken="restart_service:api",
                warmup_sec=0.0,
            )

        assert result == "success"

    @pytest.mark.asyncio
    async def test_post_state_failed_signals_returns_failed(self):
        """A failure signal in the collected post state propagates as "failed"."""
        verifier = PostExecutionVerifier()
        incident = _stub_incident()

        with patch.object(
            verifier,
            "_collect_post_state",
            new=AsyncMock(return_value={"status": "CrashLoopBackOff"}),
        ):
            result = await verifier.verify(
                incident=incident,
                snapshot=None,
                action_taken="restart_service:api",
                warmup_sec=0.0,
            )

        assert result == "failed"

    @pytest.mark.asyncio
    async def test_snapshot_missing_persists_fallback_verification(self):
        """With snapshot=None the verdict must still be persisted as an auditable fallback snapshot."""
        verifier = PostExecutionVerifier()
        incident = _stub_incident()
        persist = AsyncMock()

        with patch.object(
            verifier,
            "_collect_post_state",
            new=AsyncMock(return_value={"status": "Running"}),
        ):
            with patch(
                "src.services.post_execution_verifier._persist_fallback_snapshot",
                new=persist,
            ):
                result = await verifier.verify(
                    incident=incident,
                    snapshot=None,
                    action_taken="auto_repair_playbook:PB-TEST",
                    warmup_sec=0.0,
                )

        assert result == "success"
        persist.assert_awaited_once()
        call_kwargs = persist.await_args.kwargs
        assert call_kwargs["incident"] is incident
        assert call_kwargs["post_state"] == {"status": "Running"}
        assert call_kwargs["result"] == "success"
        assert call_kwargs["action_taken"] == "auto_repair_playbook:PB-TEST"

    @pytest.mark.asyncio
    async def test_collect_timeout_returns_timeout(self):
        """A stalled MCP collection yields "timeout" instead of raising.

        The collector stub sleeps for 9999 s. The outer ``asyncio.wait_for``
        turns a regression in timeout handling into a fast, explicit failure
        instead of an indefinite hang.
        """
        verifier = PostExecutionVerifier()
        incident = _stub_incident()

        async def _slow(*args, **kwargs):
            await asyncio.sleep(9999)

        with patch.object(verifier, "_collect_post_state", new=_slow):
            with patch("src.services.post_execution_verifier.VERIFY_TIMEOUT_SEC", 0.05):
                try:
                    result = await asyncio.wait_for(
                        verifier.verify(
                            incident=incident,
                            snapshot=None,
                            action_taken="restart_service",
                            warmup_sec=0.0,
                        ),
                        timeout=self._HANG_GUARD_SEC,
                    )
                except asyncio.TimeoutError:
                    pytest.fail(
                        "verify() did not honour VERIFY_TIMEOUT_SEC and hung on a stalled collector"
                    )

        assert result == "timeout"

    @pytest.mark.asyncio
    async def test_collect_exception_returns_failed(self):
        """An exception during MCP collection yields "failed" instead of raising."""
        verifier = PostExecutionVerifier()
        incident = _stub_incident()

        async def _raise(*args, **kwargs):
            raise ConnectionError("k8s unreachable")

        with patch.object(verifier, "_collect_post_state", new=_raise):
            result = await verifier.verify(
                incident=incident,
                snapshot=None,
                action_taken="restart_service",
                warmup_sec=0.0,
            )

        assert result == "failed"

    @pytest.mark.asyncio
    async def test_snapshot_pre_state_used_for_comparison(self):
        """``snapshot.pre_execution_state`` must be fed into the recovery comparison."""
        verifier = PostExecutionVerifier()
        incident = _stub_incident()
        snapshot = EvidenceSnapshot(incident_id="INC-TEST")
        snapshot.pre_execution_state = {"status": "Running"}

        # The post state is also Running and the action is not a restart, so the
        # pre/post comparison should make this "degraded". Presumably, without a
        # pre-state, a Running post-state alone would read as "success".
        with patch.object(
            verifier,
            "_collect_post_state",
            new=AsyncMock(return_value={"status": "Running"}),
        ):
            with patch(
                "src.services.post_execution_verifier._update_snapshot",
                new=AsyncMock(),
            ):
                result = await verifier.verify(
                    incident=incident,
                    snapshot=snapshot,
                    action_taken="scale_up",
                    warmup_sec=0.0,
                )

        assert result == "degraded"


# ─────────────────────────────────────────────────────────────────────────────
# capture_pre_execution_state
# ─────────────────────────────────────────────────────────────────────────────


class TestCapturePreState:
    """``capture_pre_execution_state`` records a baseline on the evidence snapshot."""

    @pytest.mark.asyncio
    async def test_captures_state_into_snapshot(self):
        """The collected state ends up in ``snapshot.pre_execution_state``."""
        verifier = PostExecutionVerifier()
        incident = _stub_incident()
        snapshot = EvidenceSnapshot(incident_id="INC-TEST")
        collector = AsyncMock(return_value={"status": "Running", "ready": "2/2"})

        with patch.object(verifier, "_collect_post_state", new=collector):
            await verifier.capture_pre_execution_state(incident, snapshot)

        baseline = snapshot.pre_execution_state
        assert baseline is not None
        assert "Running" in str(baseline)

    @pytest.mark.asyncio
    async def test_exception_sets_empty_pre_state(self):
        """A failing collection leaves ``pre_execution_state == {}`` and does not raise."""
        verifier = PostExecutionVerifier()
        incident = _stub_incident()
        snapshot = EvidenceSnapshot(incident_id="INC-TEST")
        failing_collector = AsyncMock(side_effect=RuntimeError("k8s down"))

        with patch.object(verifier, "_collect_post_state", new=failing_collector):
            await verifier.capture_pre_execution_state(incident, snapshot)

        assert snapshot.pre_execution_state == {}
class _CaptureProvider(MCPToolProvider):
    """Spy provider: remembers the last ``execute`` parameters and reports a Running pod."""

    name = "capture"
    # Copy of the parameters from the most recent execute() call; None until first call.
    seen_parameters: dict | None = None

    async def list_tools(self) -> list[MCPTool]:
        """Advertise no tools; the fake registries build their RegisteredTool entries directly."""
        return list()

    async def execute(self, tool_name: str, parameters: dict) -> MCPToolResult:
        """Record a shallow copy of *parameters* and return a successful Running result."""
        self.seen_parameters = {**parameters}
        healthy = {"status": "Running"}
        return MCPToolResult(success=True, output=healthy)
class _SingleToolRegistry:
    """Registry stub whose ``suggest_tools`` always offers exactly one tool.

    Subclasses set ``_tool_name`` and ``_dimension``; every suggested tool is
    bound to the provider given at construction and reports server "capture".
    """

    _tool_name: str
    _dimension: SensorDimension

    def __init__(self, provider: MCPToolProvider | AuditedMCPToolProvider) -> None:
        # Tests pass either the raw spy provider or an AuditedMCPToolProvider wrapper.
        self.provider = provider

    def suggest_tools(self, alertname: str = "", incident_labels: dict | None = None) -> list[RegisteredTool]:
        """Return a fresh one-element list; *alertname* and *incident_labels* are ignored."""
        return [
            RegisteredTool(
                tool=MCPTool(
                    name=self._tool_name,
                    description="",
                    input_schema={},
                    server_name="capture",
                ),
                provider=self.provider,
                dimensions=[self._dimension],
            )
        ]


class _FakeRegistry(_SingleToolRegistry):
    """Suggests a single ``kubectl_get`` tool in the K8s-state dimension."""

    _tool_name = "kubectl_get"
    _dimension = SensorDimension.D1_K8S_STATE


class _PrometheusRegistry(_SingleToolRegistry):
    """Suggests a single ``prometheus_query`` tool in the metrics dimension."""

    _tool_name = "prometheus_query"
    _dimension = SensorDimension.D3_METRICS
class _DbContext:
|
||
async def __aenter__(self) -> object:
|
||
return object()
|
||
|
||
async def __aexit__(self, *_args: object) -> None:
|
||
return None
|
||
|
||
|
||
class TestCollectPostStateAuditContext:
    """``_collect_post_state`` must tag its MCP calls with post-execution audit metadata."""

    @pytest.mark.asyncio
    async def test_collect_post_state_injects_mcp_audit_context(self):
        """A plain provider receives an ``_mcp_audit`` block inside its parameters."""
        spy = _CaptureProvider()
        verifier = PostExecutionVerifier()
        verifier._registry = _FakeRegistry(spy)

        collected = await verifier._collect_post_state(_stub_incident())

        assert collected["kubectl_get"]["status"] == "Running"
        assert spy.seen_parameters is not None
        audit = spy.seen_parameters["_mcp_audit"]
        expected_audit = {
            "incident_id": "INC-TEST",
            "session_id": "incident:INC-TEST:post_execution",
            "flywheel_node": "verify",
            "agent_role": "post_execution_verifier",
        }
        for key, expected in expected_audit.items():
            assert audit[key] == expected

    @pytest.mark.asyncio
    async def test_collect_post_state_routes_audited_provider_through_gateway(
        self,
        monkeypatch: pytest.MonkeyPatch,
    ):
        """An audited provider is invoked through McpGateway rather than directly."""
        inner = _CaptureProvider()
        verifier = PostExecutionVerifier()
        verifier._registry = _FakeRegistry(AuditedMCPToolProvider(inner))
        gateway_calls: list[dict] = []

        class FakeGateway:
            def __init__(self, db: object) -> None:
                self.db = db

            async def call(self, ctx, parameters: dict) -> MCPToolResult:
                gateway_calls.append({"ctx": ctx, "parameters": parameters, "db": self.db})
                return MCPToolResult(
                    success=True,
                    execution_id="gw",
                    output={"status": "Running"},
                )

        monkeypatch.setattr(pev_module, "get_db_context", lambda _project_id: _DbContext())
        monkeypatch.setattr(pev_module, "McpGateway", FakeGateway)

        collected = await verifier._collect_post_state(_stub_incident())

        assert collected["kubectl_get"]["status"] == "Running"
        # The wrapped provider is never hit directly; the gateway owns the call.
        assert inner.seen_parameters is None
        assert gateway_calls

        first_call = gateway_calls[0]
        ctx = first_call["ctx"]
        assert ctx.project_id == "awoooi"
        assert ctx.agent_id == "post_execution_verifier"
        assert ctx.tool_name == "kubectl_get"
        assert ctx.trace_id == "INC-TEST"
        assert ctx.required_scope == "read"

        audit = first_call["parameters"]["_mcp_audit"]
        assert audit["incident_id"] == "INC-TEST"
        assert audit["flywheel_node"] == "verify"

    @pytest.mark.asyncio
    async def test_collect_post_state_sends_prometheus_query_parameter(self):
        """The Prometheus tool receives a ``query`` scoped to the alert's host and container."""
        spy = _CaptureProvider()
        verifier = PostExecutionVerifier()
        verifier._registry = _PrometheusRegistry(spy)
        incident = _stub_incident(alertname="DockerContainerMemoryLimitPressure")
        incident.signals[0].labels.update({
            "host": "110",
            "container_name": "momo-scheduler",
        })

        await verifier._collect_post_state(incident)

        assert spy.seen_parameters is not None
        promql = spy.seen_parameters["query"]
        for fragment in (
            "docker_container_memory_usage_bytes",
            'host="110"',
            'container_name="momo-scheduler"',
        ):
            assert fragment in promql
class TestPrometheusQueryBuilder:
    """``_build_prometheus_query`` must always produce a usable, non-empty query."""

    def test_docker_memory_alert_query_is_not_empty(self):
        """Docker memory-pressure alerts reference both the usage and the limit series."""
        alert_labels = {"host": "110", "container_name": "momo-scheduler"}
        promql = pev_module._build_prometheus_query(
            "DockerContainerMemoryLimitPressure",
            alert_labels,
        )

        for metric in (
            "docker_container_memory_usage_bytes",
            "docker_container_memory_limit_bytes",
        ):
            assert metric in promql

    def test_canary_alert_uses_generic_non_empty_query(self):
        """The canary alert with no labels still yields a non-empty query mentioning ``up``."""
        promql = pev_module._build_prometheus_query("AwoooPAutoRepairCanaryT16", {})

        assert promql
        assert "up" in promql