Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 8m39s
Critical:
- C1: decision_manager _collect_mcp_context container 變數 Python ternary 優先度 bug 修正
原: `A or B or C[0] if list else ""` (ternary 控制全式)
修: `A or B or (C[0] if list else "")` (明確括號)
- C2: 所有 MCP 呼叫加 asyncio.wait_for timeout=5s,防止阻塞決策主路徑
同時加 unknown host warning log (C4)
- C3+M1: _DESTRUCTIVE_PATTERNS 補全移至模組頂層常量
新增: delete pods(複數)/kubectl drain/kubectl cordon/kubectl rollout undo/
docker rm/docker stop/docker kill/rm -rf/"replicas": 0(JSON patch)
Important:
- I1: webhooks.py IP 排除改用 is_internal_ip() 支援全 RFC-1918 (10.x/172.16-31.x/192.168.x)
- I4: 新增 test_destructive_patterns.py — 25 測試全過
涵蓋: 常量存在、攔截、誤攔迴歸、critical 永遠攔截
🔴 Tier 3 紅區 — 首席架構師 Code Review 通過後 push
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
148 lines
5.9 KiB
Python
148 lines
5.9 KiB
Python
"""
|
||
auto_approve.py DESTRUCTIVE_PATTERNS 攔截清單測試
|
||
==================================================
|
||
測試策略 (feedback_no_mock_testing.md):
|
||
- 直接呼叫 AutoApprovePolicy.evaluate(),不 Mock
|
||
- 驗證破壞性操作被攔截 (should_auto_approve=False, CRITICAL_OPERATION)
|
||
- 驗證可恢復操作允許通過(不被 pattern 誤攔)
|
||
- 2026-04-11 Claude Sonnet 4.6: Code Review I4 修補
|
||
"""
|
||
|
||
import pytest
|
||
|
||
from src.services.auto_approve import AutoApprovePolicy, _DESTRUCTIVE_PATTERNS
|
||
|
||
|
||
class TestDestructivePatternsConstant:
|
||
"""確認模組常量存在且包含關鍵項目"""
|
||
|
||
def test_constant_exists_and_is_list(self):
|
||
assert isinstance(_DESTRUCTIVE_PATTERNS, list)
|
||
assert len(_DESTRUCTIVE_PATTERNS) > 0
|
||
|
||
def test_contains_scale_to_zero(self):
|
||
assert "--replicas=0" in _DESTRUCTIVE_PATTERNS
|
||
|
||
def test_contains_delete_deployment(self):
|
||
assert "delete deployment" in _DESTRUCTIVE_PATTERNS
|
||
|
||
def test_contains_kubectl_drain(self):
|
||
assert "kubectl drain" in _DESTRUCTIVE_PATTERNS
|
||
|
||
def test_contains_docker_rm(self):
|
||
assert "docker rm" in _DESTRUCTIVE_PATTERNS
|
||
|
||
def test_contains_json_patch_replicas_zero(self):
|
||
# kubectl patch JSON 形式
|
||
assert '"replicas": 0' in _DESTRUCTIVE_PATTERNS
|
||
|
||
def test_contains_rm_rf(self):
|
||
assert "rm -rf" in _DESTRUCTIVE_PATTERNS
|
||
|
||
|
||
class TestDestructivePatternsBlocked:
|
||
"""破壞性操作必須被攔截(無論 risk_level)"""
|
||
|
||
@pytest.fixture
|
||
def policy(self):
|
||
return AutoApprovePolicy()
|
||
|
||
def _proposal(self, action: str, risk_level: str = "medium", confidence: float = 0.9) -> dict:
|
||
return {"action": action, "risk_level": risk_level, "confidence": confidence}
|
||
|
||
def test_scale_to_zero_blocked(self, policy):
|
||
d = policy.evaluate(self._proposal("kubectl scale deployment api --replicas=0"))
|
||
assert not d.should_auto_approve
|
||
assert "Destructive pattern" in d.reason_detail
|
||
|
||
def test_delete_deployment_blocked(self, policy):
|
||
d = policy.evaluate(self._proposal("kubectl delete deployment api-server"))
|
||
assert not d.should_auto_approve
|
||
assert "Destructive pattern" in d.reason_detail
|
||
|
||
def test_delete_pod_blocked(self, policy):
|
||
d = policy.evaluate(self._proposal("kubectl delete pod api-server-abc123"))
|
||
assert not d.should_auto_approve
|
||
|
||
def test_delete_pods_plural_blocked(self, policy):
|
||
d = policy.evaluate(self._proposal("kubectl delete pods --all -n awoooi-prod"))
|
||
assert not d.should_auto_approve
|
||
|
||
def test_kubectl_drain_blocked(self, policy):
|
||
d = policy.evaluate(self._proposal("kubectl drain 192.168.0.110 --ignore-daemonsets"))
|
||
assert not d.should_auto_approve
|
||
|
||
def test_kubectl_rollout_undo_blocked(self, policy):
|
||
d = policy.evaluate(self._proposal("kubectl rollout undo deployment/api-server"))
|
||
assert not d.should_auto_approve
|
||
|
||
def test_docker_rm_blocked(self, policy):
|
||
d = policy.evaluate(self._proposal("docker rm -f sentry"))
|
||
assert not d.should_auto_approve
|
||
|
||
def test_docker_stop_blocked(self, policy):
|
||
d = policy.evaluate(self._proposal("docker stop harbor"))
|
||
assert not d.should_auto_approve
|
||
|
||
def test_drop_table_blocked(self, policy):
|
||
d = policy.evaluate(self._proposal("drop table incidents"))
|
||
assert not d.should_auto_approve
|
||
|
||
def test_rm_rf_blocked(self, policy):
|
||
d = policy.evaluate(self._proposal("ssh 192.168.0.188 'rm -rf /data'"))
|
||
assert not d.should_auto_approve
|
||
|
||
def test_json_patch_replicas_zero_blocked(self, policy):
|
||
d = policy.evaluate(self._proposal(
|
||
'kubectl patch deployment api -p \'{"spec":{"replicas": 0}}\''
|
||
))
|
||
assert not d.should_auto_approve
|
||
|
||
def test_destructive_blocked_even_low_risk(self, policy):
|
||
"""破壞性操作即使 low risk 也必須攔截"""
|
||
d = policy.evaluate(self._proposal("kubectl delete deployment api-server", risk_level="low"))
|
||
assert not d.should_auto_approve
|
||
|
||
def test_destructive_blocked_even_high_confidence(self, policy):
|
||
"""高信心度也不能繞過破壞性攔截"""
|
||
d = policy.evaluate(self._proposal("kubectl drain node-1", confidence=0.99))
|
||
assert not d.should_auto_approve
|
||
|
||
|
||
class TestSafeOperationsAllowed:
|
||
"""可恢復操作不應被誤攔"""
|
||
|
||
@pytest.fixture
|
||
def policy(self):
|
||
return AutoApprovePolicy()
|
||
|
||
def _proposal(self, action: str, risk_level: str = "medium", confidence: float = 0.9) -> dict:
|
||
return {"action": action, "risk_level": risk_level, "confidence": confidence}
|
||
|
||
def test_rollout_restart_allowed(self, policy):
|
||
"""kubectl rollout restart 是可恢復操作,不應被攔截"""
|
||
d = policy.evaluate(self._proposal("kubectl rollout restart deployment/api-server"))
|
||
# 不應因 destructive pattern 被攔截
|
||
assert "Destructive pattern" not in d.reason_detail
|
||
|
||
def test_docker_restart_allowed(self, policy):
|
||
"""docker restart 是可恢復操作"""
|
||
d = policy.evaluate(self._proposal("docker restart sentry"))
|
||
assert "Destructive pattern" not in d.reason_detail
|
||
|
||
def test_scale_up_allowed(self, policy):
|
||
"""scale to 2 replicas 不應被攔截"""
|
||
d = policy.evaluate(self._proposal("kubectl scale deployment api --replicas=2"))
|
||
assert "Destructive pattern" not in d.reason_detail
|
||
|
||
def test_kubectl_get_allowed(self, policy):
|
||
"""kubectl get 是只讀操作"""
|
||
d = policy.evaluate(self._proposal("kubectl get pods -n awoooi-prod"))
|
||
assert "Destructive pattern" not in d.reason_detail
|
||
|
||
def test_critical_severity_always_blocked(self, policy):
|
||
"""critical risk level 無論操作都需人工"""
|
||
d = policy.evaluate(self._proposal("kubectl rollout restart deployment/api", risk_level="critical"))
|
||
assert not d.should_auto_approve
|
||
assert d.reason.value == "critical_operation"
|