Some checks failed
Code Review / ai-code-review (push) Successful in 16s
CD Pipeline / tests (push) Failing after 1m37s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled
890 lines
33 KiB
Python
890 lines
33 KiB
Python
"""
|
||
Auto Repair Service Tests - #8 自動升級決策
|
||
==========================================
|
||
測試自動修復服務層功能
|
||
|
||
版本: v1.0
|
||
建立: 2026-03-26 (台北時區)
|
||
建立者: Claude Code (#8 自動升級決策)
|
||
"""
|
||
|
||
import pytest
|
||
|
||
from src.models.incident import Incident, IncidentStatus, Severity, Signal
|
||
from src.models.playbook import (
|
||
ActionType,
|
||
Playbook,
|
||
PlaybookStatus,
|
||
RepairStep,
|
||
RiskLevel,
|
||
SymptomPattern,
|
||
)
|
||
from src.plugins.mcp.interfaces import MCPToolResult
|
||
from src.services.auto_repair_service import AutoRepairService
|
||
from src.utils.timezone import now_taipei
|
||
|
||
|
||
class MockPlaybookService:
    """In-memory stand-in for the playbook service used by these tests.

    Playbooks are kept in a dict keyed by ``playbook_id``; recommendations
    are whatever list the test injected via ``set_recommendations``.
    """

    def __init__(self):
        self._playbooks: dict[str, Playbook] = {}
        self._recommendations: list = []

    def add_playbook(self, playbook: Playbook):
        """Register ``playbook`` under its own ``playbook_id``."""
        self._playbooks[playbook.playbook_id] = playbook

    def set_recommendations(self, recommendations: list):
        """Replace the canned list returned by ``get_recommendations``."""
        self._recommendations = recommendations

    async def get_recommendations(self, symptoms, top_k=3):
        """Return the canned recommendations; both arguments are ignored."""
        return self._recommendations

    async def get_by_id(self, playbook_id: str):
        """Return the registered playbook, or ``None`` when the id is unknown."""
        return self._playbooks.get(playbook_id)

    async def record_execution(self, playbook_id: str, success: bool):
        """Bump the success or failure counter of a registered playbook.

        Returns ``True`` when the id is registered, ``False`` otherwise.
        """
        entry = self._playbooks.get(playbook_id)
        is_known = entry is not None
        if not entry:
            # Nothing to update; report whether the id was registered at all.
            return is_known
        if success:
            entry.success_count += 1
        else:
            entry.failure_count += 1
        return is_known
|
||
|
||
|
||
def create_test_incident(
    incident_id: str = "INC-TEST-001",
    severity: Severity = Severity.P2,
    alert_category: str | None = None,
    alert_name: str = "HighCPU",
) -> Incident:
    """Build an INVESTIGATING incident carrying one Prometheus signal.

    The single signal reuses ``alert_name`` and ``severity`` and is stamped
    with the current Taipei time; the incident always affects
    ``test-service``.
    """
    fired_at = now_taipei()
    firing_signal = Signal(
        alert_name=alert_name,
        severity=severity,
        source="prometheus",
        fired_at=fired_at,
        labels={"namespace": "prod", "alertname": alert_name},
    )
    return Incident(
        incident_id=incident_id,
        status=IncidentStatus.INVESTIGATING,
        severity=severity,
        affected_services=["test-service"],
        alert_category=alert_category,
        signals=[firing_signal],
    )
|
||
|
||
|
||
def create_high_quality_playbook(
    playbook_id: str = "PB-TEST-001",
    risk_level: RiskLevel = RiskLevel.MEDIUM,
) -> Playbook:
    """Build an APPROVED playbook with 20 successes and 1 failure.

    The counters are chosen to give a success rate of about 95.2% with at
    least 10 successes. ``risk_level`` is applied to the single kubectl
    rollout-restart step.
    """
    matching_pattern = SymptomPattern(
        alert_names=["HighCPU"],
        affected_services=["test-service"],
        severity_range=["P2"],
    )
    restart_step = RepairStep(
        step_number=1,
        action_type=ActionType.KUBECTL,
        command="kubectl rollout restart deployment/{target}",
        risk_level=risk_level,
    )
    return Playbook(
        playbook_id=playbook_id,
        name="HighCPU - test-service 修復劇本",
        description="High quality playbook for auto repair",
        status=PlaybookStatus.APPROVED,
        symptom_pattern=matching_pattern,
        repair_steps=[restart_step],
        success_count=20,  # >= 10
        failure_count=1,  # success_rate = 95.2%
        ai_confidence=0.9,
    )
|
||
|
||
|
||
class MockPlaybookRecommendation:
    """Test double pairing a playbook with its similarity score."""

    def __init__(self, playbook: Playbook, similarity_score: float):
        self.playbook, self.similarity_score = playbook, similarity_score
|
||
|
||
|
||
async def _no_cooldown(*args, **kwargs) -> tuple[bool, str]:
    """Cooldown checker for unit tests: always allows (no Redis needed)."""
    allowed = True
    reason = "允許自動修復 (test bypass)"
    return allowed, reason
|
||
|
||
|
||
class _DbContext:
    """Async context manager that yields a bare placeholder object.

    Used below as the replacement for ``src.db.base.get_db_context``.
    """

    async def __aenter__(self) -> object:
        placeholder = object()
        return placeholder

    async def __aexit__(self, *_args: object) -> None:
        # Falls through and returns None, so exceptions are never suppressed.
        pass
|
||
|
||
|
||
class TestAutoRepairService:
    """Unit tests for AutoRepairService evaluation, routing and step execution."""

    @pytest.fixture
    def mock_playbook_service(self):
        """Provide a fresh in-memory playbook service for each test."""
        return MockPlaybookService()

    @pytest.fixture
    def service(self, mock_playbook_service):
        """Build the service under test with the cooldown check bypassed."""
        # 2026-04-01 ogt: inject a no-op cooldown to isolate the Redis dependency
        return AutoRepairService(
            playbook_service=mock_playbook_service,
            cooldown_checker=_no_cooldown,
        )

    @pytest.mark.asyncio
    async def test_evaluate_allows_p1_severity_with_approved_playbook(
        self,
        service,
        mock_playbook_service,
    ):
        """P1 severity no longer blocks approved LOW/MEDIUM/HIGH controlled repair."""
        playbook = create_high_quality_playbook(risk_level=RiskLevel.HIGH)
        mock_playbook_service.add_playbook(playbook)
        mock_playbook_service.set_recommendations([
            MockPlaybookRecommendation(playbook, similarity_score=0.9)
        ])
        incident = create_test_incident(severity=Severity.P1)
        decision = await service.evaluate_auto_repair(incident)

        assert decision.can_auto_repair is True
        assert decision.risk_level == RiskLevel.HIGH
        assert decision.blocked_by is None

    @pytest.mark.asyncio
    async def test_evaluate_allows_p0_severity_with_approved_playbook(
        self,
        service,
        mock_playbook_service,
    ):
        """P0 severity is handled by PlayBook/Registry guardrails, not a blanket human gate."""
        playbook = create_high_quality_playbook(risk_level=RiskLevel.MEDIUM)
        mock_playbook_service.add_playbook(playbook)
        mock_playbook_service.set_recommendations([
            MockPlaybookRecommendation(playbook, similarity_score=0.9)
        ])
        incident = create_test_incident(severity=Severity.P0)
        decision = await service.evaluate_auto_repair(incident)

        assert decision.can_auto_repair is True
        assert decision.risk_level == RiskLevel.MEDIUM
        assert decision.blocked_by is None

    @pytest.mark.asyncio
    async def test_evaluate_no_playbook_match(self, service, mock_playbook_service):
        """Test when no playbook matches"""
        mock_playbook_service.set_recommendations([])
        incident = create_test_incident(severity=Severity.P2)

        decision = await service.evaluate_auto_repair(incident)

        assert decision.can_auto_repair is False
        assert decision.blocked_by == "NO_MATCH"

    @pytest.mark.asyncio
    async def test_evaluate_low_similarity(self, service, mock_playbook_service):
        """Test that low similarity no longer blocks auto-repair.

        2026-04-07: similarity threshold removed per the commander's
        directive — any matching APPROVED playbook is executed.
        2026-04-08 Claude Sonnet 4.6: updated the test expectations to match
        the current design.
        """
        playbook = create_high_quality_playbook()
        mock_playbook_service.add_playbook(playbook)
        mock_playbook_service.set_recommendations([
            MockPlaybookRecommendation(playbook, similarity_score=0.5)  # Below old 0.7 threshold
        ])

        incident = create_test_incident(severity=Severity.P2)
        decision = await service.evaluate_auto_repair(incident)

        # Similarity threshold removed — an APPROVED playbook should pass
        # even with a low similarity score.
        assert decision.can_auto_repair is True
        assert decision.blocked_by is None

    @pytest.mark.asyncio
    async def test_evaluate_not_high_quality(self, service, mock_playbook_service):
        """Test low-quality playbook is now approved (gates removed 2026-04-07).

        2026-04-07: quality threshold removed per the commander's directive —
        APPROVED status alone is sufficient for execution.
        2026-04-08 Claude Sonnet 4.6: updated the test expectations to match
        the current design.
        """
        playbook = Playbook(
            playbook_id="PB-LOW-QUALITY",
            name="Low quality playbook",
            description="Not enough executions",
            status=PlaybookStatus.APPROVED,
            symptom_pattern=SymptomPattern(
                alert_names=["HighCPU"],
                affected_services=["test-service"],
            ),
            repair_steps=[
                RepairStep(
                    step_number=1,
                    action_type=ActionType.KUBECTL,
                    command="kubectl rollout restart",
                    risk_level=RiskLevel.MEDIUM,
                    description="restart deployment",
                ),
            ],
            success_count=2,
            failure_count=0,
        )
        mock_playbook_service.add_playbook(playbook)
        mock_playbook_service.set_recommendations([
            MockPlaybookRecommendation(playbook, similarity_score=0.9)
        ])

        incident = create_test_incident(severity=Severity.P2)
        decision = await service.evaluate_auto_repair(incident)

        # Quality threshold removed — an APPROVED playbook passes directly.
        assert decision.can_auto_repair is True
        assert decision.blocked_by is None

    @pytest.mark.asyncio
    async def test_evaluate_high_risk_blocked(self, service, mock_playbook_service):
        """Test HIGH risk playbook is now approved (gates removed 2026-04-07).

        2026-04-07: risk-level threshold removed per the commander's
        directive — APPROVED status alone is sufficient for execution.
        2026-04-08 Claude Sonnet 4.6: updated the test expectations to match
        the current design.

        NOTE(review): the test name is historical; the assertions below
        expect the HIGH risk playbook to be allowed.
        """
        playbook = create_high_quality_playbook(risk_level=RiskLevel.HIGH)
        mock_playbook_service.add_playbook(playbook)
        mock_playbook_service.set_recommendations([
            MockPlaybookRecommendation(playbook, similarity_score=0.9)
        ])

        incident = create_test_incident(severity=Severity.P2)
        decision = await service.evaluate_auto_repair(incident)

        # Risk-level threshold removed — a HIGH risk APPROVED playbook also passes.
        assert decision.can_auto_repair is True
        assert decision.blocked_by is None

    @pytest.mark.asyncio
    async def test_evaluate_critical_risk_blocked(self, service, mock_playbook_service):
        """CRITICAL remains break-glass and is outside LOW/MEDIUM/HIGH authorization."""
        playbook = create_high_quality_playbook(risk_level=RiskLevel.CRITICAL)
        mock_playbook_service.add_playbook(playbook)
        mock_playbook_service.set_recommendations([
            MockPlaybookRecommendation(playbook, similarity_score=0.9)
        ])

        incident = create_test_incident(severity=Severity.P2)
        decision = await service.evaluate_auto_repair(incident)

        assert decision.can_auto_repair is False
        assert decision.blocked_by == "CRITICAL_BREAK_GLASS"

    @pytest.mark.asyncio
    async def test_evaluate_success(self, service, mock_playbook_service):
        """Test successful auto repair evaluation"""
        playbook = create_high_quality_playbook(risk_level=RiskLevel.MEDIUM)
        mock_playbook_service.add_playbook(playbook)
        mock_playbook_service.set_recommendations([
            MockPlaybookRecommendation(playbook, similarity_score=0.85)
        ])

        incident = create_test_incident(severity=Severity.P2)
        decision = await service.evaluate_auto_repair(incident)

        assert decision.can_auto_repair is True
        assert decision.playbook is not None
        assert decision.playbook.playbook_id == playbook.playbook_id
        assert decision.blocked_by is None

    @pytest.mark.asyncio
    async def test_exact_alert_match_wins_over_higher_fuzzy_similarity(
        self,
        service,
        mock_playbook_service,
    ):
        """Exact alert/service playbooks must outrank unrelated fuzzy matches."""
        fuzzy_playbook = create_high_quality_playbook(
            playbook_id="PB-FUZZY-HOST",
            risk_level=RiskLevel.LOW,
        )
        fuzzy_playbook.symptom_pattern = SymptomPattern(
            alert_names=["HostCPUHigh"],
            affected_services=["node-exporter"],
            severity_range=["P2"],
        )
        exact_playbook = create_high_quality_playbook(
            playbook_id="PB-EXACT-CANARY",
            risk_level=RiskLevel.LOW,
        )
        exact_playbook.symptom_pattern = SymptomPattern(
            alert_names=["AwoooPAutoRepairCanaryT16"],
            affected_services=["awoooi-auto-repair-canary"],
            severity_range=["P3"],
        )
        mock_playbook_service.add_playbook(fuzzy_playbook)
        mock_playbook_service.add_playbook(exact_playbook)
        # The fuzzy playbook is listed first and carries the higher score.
        mock_playbook_service.set_recommendations([
            MockPlaybookRecommendation(fuzzy_playbook, similarity_score=0.95),
            MockPlaybookRecommendation(exact_playbook, similarity_score=0.45),
        ])

        incident = create_test_incident(
            severity=Severity.P3,
            alert_name="AwoooPAutoRepairCanaryT16",
        )
        incident.affected_services = ["awoooi-auto-repair-canary"]
        decision = await service.evaluate_auto_repair(incident)

        assert decision.can_auto_repair is True
        assert decision.playbook is not None
        assert decision.playbook.playbook_id == "PB-EXACT-CANARY"

    @pytest.mark.asyncio
    async def test_backup_failure_blocks_k8s_playbook(self, service, mock_playbook_service):
        """Backup/host incidents must not execute K8s rollout playbooks."""
        playbook = create_high_quality_playbook(risk_level=RiskLevel.MEDIUM)
        mock_playbook_service.add_playbook(playbook)
        mock_playbook_service.set_recommendations([
            MockPlaybookRecommendation(playbook, similarity_score=0.85)
        ])

        incident = create_test_incident(
            severity=Severity.P2,
            alert_category="backup_failure",
            alert_name="HostBackupFailed",
        )
        decision = await service.evaluate_auto_repair(incident)

        assert decision.can_auto_repair is False
        assert decision.blocked_by == "HOST_BACKUP_K8S_PLAYBOOK"

    @pytest.mark.asyncio
    async def test_external_site_blocks_k3s_node_playbook(self, service, mock_playbook_service):
        """External site probes must not fuzzy-match into K3s node repair."""
        playbook = Playbook(
            playbook_id="PB-K3S-NODE",
            name="K3s 節點下線",
            description="K3s node repair",
            status=PlaybookStatus.APPROVED,
            symptom_pattern=SymptomPattern(
                alert_names=["K3sNodeDown", "K3sVIPDown"],
                affected_services=[],
            ),
            repair_steps=[
                RepairStep(
                    step_number=1,
                    action_type=ActionType.KUBECTL,
                    command="kubectl get nodes -o wide && kubectl describe node {target}",
                    risk_level=RiskLevel.HIGH,
                ),
            ],
            success_count=11,
            failure_count=2,
        )
        mock_playbook_service.add_playbook(playbook)
        mock_playbook_service.set_recommendations([
            MockPlaybookRecommendation(playbook, similarity_score=0.91)
        ])

        incident = create_test_incident(
            severity=Severity.P2,
            alert_category="external_site",
            alert_name="StockWoooWorkDown",
        )
        incident.affected_services = ["stock-platform"]
        incident.signals[0].labels.update({
            "layer": "external",
            "component": "stock-platform",
            "host": "110",
        })

        decision = await service.evaluate_auto_repair(incident)

        assert decision.can_auto_repair is False
        assert decision.blocked_by == "EXTERNAL_SITE_K3S_PLAYBOOK"
        assert decision.playbook == playbook

    @pytest.mark.asyncio
    async def test_backup_failure_blocks_observe_only_ssh_playbook(self, service, mock_playbook_service):
        """Backup/host read-only diagnostics must not be counted as auto-repair."""
        playbook = Playbook(
            playbook_id="PB-BACKUP-SSH",
            name="Backup SSH diagnostics",
            description="Read-only backup diagnosis",
            status=PlaybookStatus.APPROVED,
            symptom_pattern=SymptomPattern(
                alert_names=["HostBackupFailed"],
                affected_services=["test-service"],
            ),
            repair_steps=[
                RepairStep(
                    step_number=1,
                    action_type=ActionType.SSH_COMMAND,
                    command="ssh {host} 'tail -80 /var/log/backup.log'",
                    risk_level=RiskLevel.LOW,
                    description="collect backup logs",
                ),
            ],
            success_count=20,
            failure_count=1,
        )
        mock_playbook_service.add_playbook(playbook)
        mock_playbook_service.set_recommendations([
            MockPlaybookRecommendation(playbook, similarity_score=0.85)
        ])

        incident = create_test_incident(
            severity=Severity.P2,
            alert_category="backup_failure",
            alert_name="HostBackupFailed",
        )
        decision = await service.evaluate_auto_repair(incident)

        assert decision.can_auto_repair is False
        assert decision.blocked_by == "OBSERVE_ONLY_PLAYBOOK"

    @pytest.mark.asyncio
    async def test_docker_restart_ssh_playbook_remains_auto_repair_candidate(
        self,
        service,
        mock_playbook_service,
    ):
        """Safe Docker restart PlayBooks still qualify as real mutating repair."""
        playbook = Playbook(
            playbook_id="PB-DOCKER-RESTART",
            name="Docker health restart",
            description="Restart unhealthy Docker container",
            status=PlaybookStatus.APPROVED,
            symptom_pattern=SymptomPattern(
                alert_names=["DockerContainerUnhealthy"],
                affected_services=["stockplatform-v2-edge-1"],
            ),
            repair_steps=[
                RepairStep(
                    step_number=1,
                    action_type=ActionType.SSH_COMMAND,
                    command="ssh {host} 'docker restart {container}'",
                    risk_level=RiskLevel.MEDIUM,
                ),
            ],
            success_count=20,
            failure_count=1,
        )
        mock_playbook_service.add_playbook(playbook)
        mock_playbook_service.set_recommendations([
            MockPlaybookRecommendation(playbook, similarity_score=0.85)
        ])

        incident = create_test_incident(
            severity=Severity.P2,
            alert_category="infrastructure",
            alert_name="DockerContainerUnhealthy",
        )
        incident.affected_services = ["stockplatform-v2-edge-1"]
        incident.signals[0].labels.update({
            "host": "110",
            "container_name": "stockplatform-v2-edge-1",
        })
        decision = await service.evaluate_auto_repair(incident)

        assert decision.can_auto_repair is True
        assert decision.blocked_by is None

    def test_failed_verification_escalates_for_host_backup_ssh_playbook(self, service):
        """Failed backup SSH diagnostics must not synthesize K8s rollback."""
        playbook = Playbook(
            playbook_id="PB-BACKUP-SSH",
            name="Backup SSH diagnostics",
            description="Read-only backup diagnosis",
            status=PlaybookStatus.APPROVED,
            symptom_pattern=SymptomPattern(
                alert_names=["HostBackupFailed"],
                affected_services=["test-service"],
            ),
            repair_steps=[
                RepairStep(
                    step_number=1,
                    action_type=ActionType.SSH_COMMAND,
                    command="ssh {host} 'tail -80 /var/log/backup.log'",
                    risk_level=RiskLevel.LOW,
                ),
            ],
            success_count=20,
            failure_count=1,
        )
        incident = create_test_incident(
            severity=Severity.P2,
            alert_category="backup_failure",
            alert_name="HostBackupFailed",
        )

        assert service._should_escalate_failed_verification(incident, playbook) is True

    def test_failed_verification_allows_k8s_rollback_for_k8s_playbook(self, service):
        """K8s playbooks may still use the existing K8s rollback path."""
        playbook = create_high_quality_playbook(risk_level=RiskLevel.MEDIUM)
        incident = create_test_incident(severity=Severity.P2)

        assert service._should_escalate_failed_verification(incident, playbook) is False

    def test_legacy_ssh_diagnostic_routes_to_mcp_gateway(self, service):
        """Legacy YAML_RULE SSH diagnostics become governed read-only MCP calls."""
        incident = create_test_incident(
            severity=Severity.P2,
            alert_category="infrastructure",
            alert_name="DockerContainerMemoryLimitPressure",
        )
        incident.signals[0].labels.update({
            "host": "110",
            "container_name": "sentry-self-hosted-clickhouse-1",
        })

        route = service._route_legacy_ssh_command_to_mcp(
            incident,
            'ssh {host} \'echo "=== LOAD ==="; uptime; docker stats --no-stream\'',
        )

        assert route is not None
        assert route.tool_name == "ssh_diagnose"
        # The short host label "110" is expected to come back as a full address.
        assert route.params == {
            "host": "192.168.0.110",
            "container_name": "sentry-self-hosted-clickhouse-1",
        }

    def test_legacy_ssh_write_action_does_not_route_to_read_mcp(self, service):
        """Write operations must stay out of the read-only diagnostic grant."""
        incident = create_test_incident(severity=Severity.P2)

        route = service._route_legacy_ssh_command_to_mcp(
            incident,
            "ssh {host} 'docker restart minio'",
        )

        assert route is None

    def test_legacy_ssh_docker_restart_routes_to_write_mcp_gateway(self, service):
        """Safe legacy Docker restart steps use the governed write MCP tool."""
        incident = create_test_incident(
            severity=Severity.P2,
            alert_category="infrastructure",
            alert_name="DockerContainerUnhealthy",
        )
        incident.signals[0].labels.update({
            "host": "110",
            "container_name": "minio",
        })

        route = service._route_legacy_ssh_write_command_to_mcp(
            incident,
            'ssh {host} \'docker inspect {container} --format="{{.State.Health.Status}}" && docker restart {container}\'',
        )

        assert route is not None
        assert route.tool_name == "ssh_docker_restart"
        assert route.required_scope == "write"
        assert route.params == {
            "host": "192.168.0.110",
            "container_name": "minio",
            "trust_score": 0.85,
        }

    def test_legacy_ssh_complex_restart_stays_blocked(self, service):
        """Command substitution/fallback shell must not enter auto write MCP."""
        incident = create_test_incident(severity=Severity.P2)
        incident.signals[0].labels.update({"host": "110", "container_name": "node-exporter"})

        route = service._route_legacy_ssh_write_command_to_mcp(
            incident,
            'ssh {host} \'docker restart $(docker ps -a --filter name=exporter --format "{{.Names}}" | head -1) || systemctl restart node_exporter\'',
        )

        assert route is None

    @pytest.mark.asyncio
    async def test_execute_legacy_ssh_restart_noops_when_upstream_repair_succeeded(
        self,
        service,
        monkeypatch,
    ):
        """Do not restart twice when the host monitor already repaired it."""
        incident = create_test_incident(
            severity=Severity.P2,
            alert_category="infrastructure",
            alert_name="DockerContainerUnhealthy",
        )
        # Labels reporting that the upstream monitor already restarted the container.
        incident.signals[0].labels.update({
            "host": "wooo",
            "container": "stockplatform-v2-edge-1",
            "source": "docker-health-monitor",
            "repair_action": "restarted",
            "repair_result": "success",
        })
        step = RepairStep(
            step_number=1,
            action_type=ActionType.SSH_COMMAND,
            command='ssh {host} \'docker inspect {container} --format="{{.State.Health.Status}}" && docker restart {container}\'',
            risk_level=RiskLevel.MEDIUM,
            requires_approval=False,
        )

        async def fail_if_called(*args, **kwargs):
            raise AssertionError("MCP Gateway should not be called after upstream repair success")

        # Any attempt to reach the MCP route fails the test immediately.
        monkeypatch.setattr(service, "_execute_ssh_mcp_route", fail_if_called)

        result = await service._execute_step(incident, step)

        assert result.startswith("SUCCESS: upstream repair already completed")
        assert "docker-health-monitor" in result
        assert "stockplatform-v2-edge-1" in result

    @pytest.mark.asyncio
    async def test_execute_legacy_ssh_diagnostic_uses_mcp_gateway(
        self,
        service,
        monkeypatch,
    ):
        """A read-only legacy SSH diagnostic step is executed through the MCP gateway with read scope."""
        incident = create_test_incident(
            severity=Severity.P2,
            alert_category="infrastructure",
            alert_name="DockerContainerMemoryLimitPressure",
        )
        incident.signals[0].labels.update({
            "host": "110",
            "container_name": "momo-scheduler",
        })
        step = RepairStep(
            step_number=1,
            action_type=ActionType.SSH_COMMAND,
            command='ssh {host} \'echo "=== LOAD ==="; uptime; docker stats --no-stream\'',
            risk_level=RiskLevel.LOW,
        )
        calls = []

        class FakeGateway:
            """Records every gateway call and reports success."""

            def __init__(self, db: object) -> None:
                self.db = db

            async def call(self, ctx, params):
                calls.append({"ctx": ctx, "params": params, "db": self.db})
                return MCPToolResult(
                    success=True,
                    execution_id="gw-ok",
                    output={"stdout": "=== CPU TOP ==="},
                )

        # Replace the DB context and the gateway so no real backend is touched.
        monkeypatch.setattr("src.db.base.get_db_context", lambda _project_id: _DbContext())
        monkeypatch.setattr("src.plugins.mcp.gateway.McpGateway", FakeGateway)

        result = await service._execute_step(incident, step)

        assert result.startswith("SUCCESS: mcp:ssh_diagnose")
        assert calls
        assert calls[0]["ctx"].agent_id == "auto_repair_executor"
        assert calls[0]["ctx"].required_scope == "read"
        assert calls[0]["ctx"].is_shadow is False
        assert calls[0]["params"]["host"] == "192.168.0.110"
        assert calls[0]["params"]["container_name"] == "momo-scheduler"
        assert calls[0]["params"]["_mcp_audit"]["flywheel_node"] == "execute"

    @pytest.mark.asyncio
    async def test_execute_legacy_ssh_docker_restart_uses_write_mcp_gateway(
        self,
        service,
        monkeypatch,
    ):
        """A safe legacy Docker restart step goes through the MCP gateway with write scope and records an approval key in Redis."""
        incident = create_test_incident(
            severity=Severity.P2,
            alert_category="infrastructure",
            alert_name="DockerContainerUnhealthy",
        )
        incident.signals[0].labels.update({
            "host": "110",
            "container_name": "minio",
        })
        step = RepairStep(
            step_number=1,
            action_type=ActionType.SSH_COMMAND,
            command='ssh {host} \'docker inspect {container} --format="{{.State.Health.Status}}" && docker restart {container}\'',
            risk_level=RiskLevel.MEDIUM,
            requires_approval=False,
        )
        calls = []
        redis_sets = []

        class FakeRedis:
            """Records every ``set`` call instead of talking to Redis."""

            async def set(self, key, value, ex=None):
                redis_sets.append({"key": key, "value": value, "ex": ex})
                return True

        class FakeGateway:
            """Records every gateway call and reports success."""

            def __init__(self, db: object) -> None:
                self.db = db

            async def call(self, ctx, params):
                calls.append({"ctx": ctx, "params": params, "db": self.db})
                return MCPToolResult(
                    success=True,
                    execution_id="gw-write-ok",
                    output={"stdout": "minio"},
                )

        # Replace Redis, the DB context and the gateway so no real backend is touched.
        monkeypatch.setattr("src.core.redis_client.get_redis", lambda: FakeRedis())
        monkeypatch.setattr("src.db.base.get_db_context", lambda _project_id: _DbContext())
        monkeypatch.setattr("src.plugins.mcp.gateway.McpGateway", FakeGateway)

        result = await service._execute_step(incident, step)

        assert result.startswith("SUCCESS: mcp:ssh_docker_restart")
        assert calls
        assert redis_sets
        assert calls[0]["ctx"].agent_id == "auto_repair_executor"
        assert calls[0]["ctx"].tool_name == "ssh_docker_restart"
        assert calls[0]["ctx"].required_scope == "write"
        assert calls[0]["ctx"].is_shadow is False
        assert calls[0]["ctx"].run_id is not None
        assert calls[0]["params"]["host"] == "192.168.0.110"
        assert calls[0]["params"]["container_name"] == "minio"
        assert calls[0]["params"]["trust_score"] == 0.85
        assert calls[0]["params"]["_mcp_audit"]["flywheel_node"] == "execute"
        assert redis_sets[0]["key"].startswith(
            "mcp_approval:awoooi:auto_repair_executor:ssh_docker_restart:"
        )
        assert redis_sets[0]["value"] == "approved:auto_repair_policy"

    @pytest.mark.asyncio
    async def test_evaluate_low_risk_allowed(self, service, mock_playbook_service):
        """Test that LOW risk actions are allowed"""
        playbook = create_high_quality_playbook(risk_level=RiskLevel.LOW)
        mock_playbook_service.add_playbook(playbook)
        mock_playbook_service.set_recommendations([
            MockPlaybookRecommendation(playbook, similarity_score=0.85)
        ])

        incident = create_test_incident(severity=Severity.P2)
        decision = await service.evaluate_auto_repair(incident)

        assert decision.can_auto_repair is True
        assert decision.risk_level == RiskLevel.LOW

    def test_is_high_quality_calculation(self):
        """Test is_high_quality property"""
        # Plain synchronous test: it only reads model properties and awaits
        # nothing, so no event loop or asyncio marker is needed.
        # High quality: APPROVED + 95%+ success rate + 10+ successes
        playbook = create_high_quality_playbook()
        assert playbook.is_high_quality is True
        assert playbook.success_rate >= 0.95
        assert playbook.success_count >= 10

    def test_not_high_quality_low_success_rate(self):
        """Test playbook with low success rate is not high quality"""
        # Plain synchronous test: it only reads model properties and awaits
        # nothing, so no event loop or asyncio marker is needed.
        playbook = Playbook(
            playbook_id="PB-LOW-RATE",
            name="Low success rate",
            description="Too many failures",
            status=PlaybookStatus.APPROVED,
            symptom_pattern=SymptomPattern(
                alert_names=["Test"],
                affected_services=["test"],
            ),
            repair_steps=[],
            success_count=15,
            failure_count=5,  # 75% success rate
        )
        assert playbook.is_high_quality is False
        assert playbook.success_rate < 0.95
|
||
|
||
|
||
# =============================================================================
# B25/B26 — drain_pending_tasks
# 2026-04-27 Wave8-X3 by Claude — K8s rolling restart drain fix
# =============================================================================
|
||
|
||
class TestDrainPendingTasks:
    """drain_pending_tasks gracefully shuts down background tasks."""

    @pytest.fixture
    def service(self):
        """Build a service with an empty mock playbook service and no cooldown."""
        return AutoRepairService(
            playbook_service=MockPlaybookService(),
            cooldown_checker=_no_cooldown,
        )

    @pytest.mark.asyncio
    async def test_drain_no_pending_tasks_returns_zero(self, service):
        """No pending tasks → drained == 0 and no timeout."""
        result = await service.drain_pending_tasks(timeout=5.0)
        assert result["drained"] == 0
        assert result["timeout"] is False

    @pytest.mark.asyncio
    async def test_drain_waits_for_pending_tasks(self, service):
        """With a pending task, drain waits for it and reports the right count."""
        import asyncio

        completed = []

        async def quick_task():
            await asyncio.sleep(0.01)
            completed.append(1)

        task = asyncio.create_task(quick_task())
        # Register the task by hand; it removes itself from the set when done.
        service._pending_tasks.add(task)
        task.add_done_callback(service._pending_tasks.discard)

        result = await service.drain_pending_tasks(timeout=5.0)

        assert result["drained"] == 1
        assert result.get("still_pending", 0) == 0
        assert result["timeout"] is False
        assert len(completed) == 1

    @pytest.mark.asyncio
    async def test_drain_timeout_reports_still_pending(self, service):
        """Task outlives the timeout → timeout=True and still_pending > 0."""
        import asyncio

        async def slow_task():
            await asyncio.sleep(10)  # far longer than the timeout

        task = asyncio.create_task(slow_task())
        service._pending_tasks.add(task)
        task.add_done_callback(service._pending_tasks.discard)

        try:
            result = await service.drain_pending_tasks(timeout=0.05)

            assert result["timeout"] is True
            assert result.get("still_pending", 0) >= 1
        finally:
            # Cleanup runs even when an assertion above fails, so the
            # 10-second task never leaks out of this test.
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

    @pytest.mark.asyncio
    async def test_drain_multiple_tasks_all_complete(self, service):
        """Several tasks → all complete and drained equals the task count."""
        import asyncio

        async def quick():
            await asyncio.sleep(0.01)

        tasks = [asyncio.create_task(quick()) for _ in range(3)]
        for t in tasks:
            service._pending_tasks.add(t)
            t.add_done_callback(service._pending_tasks.discard)

        result = await service.drain_pending_tasks(timeout=5.0)

        assert result["drained"] == 3
        assert result["timeout"] is False
|