Files
awoooi/apps/api/tests/test_destructive_patterns.py
Your Name ed2a4838f2
Some checks failed
CD Pipeline / tests (push) Failing after 1m2s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
Code Review / ai-code-review (push) Successful in 24s
fix(auto): use action parser for repair gates
2026-04-30 14:06:09 +08:00

154 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
auto_approve.py DESTRUCTIVE_PATTERNS 攔截清單測試
==================================================
測試策略 (feedback_no_mock_testing.md):
- 直接呼叫 AutoApprovePolicy.evaluate(),不 Mock
- 驗證破壞性操作被攔截 (should_auto_approve=False, CRITICAL_OPERATION)
- 驗證可恢復操作允許通過(不被 pattern 誤攔)
- 2026-04-11 Claude Sonnet 4.6: Code Review I4 修補
"""
import pytest
from src.services.auto_approve import AutoApprovePolicy, _DESTRUCTIVE_PATTERNS
class TestDestructivePatternsConstant:
"""確認模組常量存在且包含關鍵項目"""
def test_constant_exists_and_is_list(self):
assert isinstance(_DESTRUCTIVE_PATTERNS, list)
assert len(_DESTRUCTIVE_PATTERNS) > 0
def test_contains_scale_to_zero(self):
assert "--replicas=0" in _DESTRUCTIVE_PATTERNS
def test_contains_delete_deployment(self):
assert "delete deployment" in _DESTRUCTIVE_PATTERNS
def test_contains_kubectl_drain(self):
assert "kubectl drain" in _DESTRUCTIVE_PATTERNS
def test_contains_docker_rm(self):
assert "docker rm" in _DESTRUCTIVE_PATTERNS
def test_contains_json_patch_replicas_zero(self):
# kubectl patch JSON 形式
assert '"replicas": 0' in _DESTRUCTIVE_PATTERNS
def test_contains_rm_rf(self):
assert "rm -rf" in _DESTRUCTIVE_PATTERNS
class TestDestructivePatternsBlocked:
"""破壞性操作必須被攔截(無論 risk_level"""
@pytest.fixture
def policy(self):
return AutoApprovePolicy()
def _proposal(self, action: str, risk_level: str = "medium", confidence: float = 0.9) -> dict:
return {"action": action, "risk_level": risk_level, "confidence": confidence}
def test_scale_to_zero_blocked(self, policy):
d = policy.evaluate(self._proposal("kubectl scale deployment api --replicas=0"))
assert not d.should_auto_approve
assert "parser rejected" in d.reason_detail
def test_delete_deployment_blocked(self, policy):
d = policy.evaluate(self._proposal("kubectl delete deployment api-server"))
assert not d.should_auto_approve
assert "parser rejected" in d.reason_detail
def test_delete_pod_allowed_by_parser(self, policy):
d = policy.evaluate(self._proposal("kubectl delete pod api-server-abc123"))
assert d.should_auto_approve
assert "Destructive pattern" not in d.reason_detail
def test_delete_pod_force_blocked(self, policy):
d = policy.evaluate(self._proposal("kubectl delete pod api-server-abc123 --force"))
assert not d.should_auto_approve
assert "parser rejected" in d.reason_detail
def test_delete_pods_plural_blocked(self, policy):
d = policy.evaluate(self._proposal("kubectl delete pods --all -n awoooi-prod"))
assert not d.should_auto_approve
def test_kubectl_drain_blocked(self, policy):
d = policy.evaluate(self._proposal("kubectl drain 192.168.0.110 --ignore-daemonsets"))
assert not d.should_auto_approve
def test_kubectl_rollout_undo_blocked(self, policy):
d = policy.evaluate(self._proposal("kubectl rollout undo deployment/api-server"))
assert not d.should_auto_approve
def test_docker_rm_blocked(self, policy):
d = policy.evaluate(self._proposal("docker rm -f sentry"))
assert not d.should_auto_approve
def test_docker_stop_blocked(self, policy):
d = policy.evaluate(self._proposal("docker stop harbor"))
assert not d.should_auto_approve
def test_drop_table_blocked(self, policy):
d = policy.evaluate(self._proposal("drop table incidents"))
assert not d.should_auto_approve
def test_rm_rf_blocked(self, policy):
d = policy.evaluate(self._proposal("ssh 192.168.0.188 'rm -rf /data'"))
assert not d.should_auto_approve
def test_json_patch_replicas_zero_blocked(self, policy):
d = policy.evaluate(self._proposal(
'kubectl patch deployment api -p \'{"spec":{"replicas": 0}}\''
))
assert not d.should_auto_approve
def test_destructive_blocked_even_low_risk(self, policy):
"""破壞性操作即使 low risk 也必須攔截"""
d = policy.evaluate(self._proposal("kubectl delete deployment api-server", risk_level="low"))
assert not d.should_auto_approve
def test_destructive_blocked_even_high_confidence(self, policy):
"""高信心度也不能繞過破壞性攔截"""
d = policy.evaluate(self._proposal("kubectl drain node-1", confidence=0.99))
assert not d.should_auto_approve
class TestSafeOperationsAllowed:
"""可恢復操作不應被誤攔"""
@pytest.fixture
def policy(self):
return AutoApprovePolicy()
def _proposal(self, action: str, risk_level: str = "medium", confidence: float = 0.9) -> dict:
return {"action": action, "risk_level": risk_level, "confidence": confidence}
def test_rollout_restart_allowed(self, policy):
"""kubectl rollout restart 是可恢復操作,不應被攔截"""
d = policy.evaluate(self._proposal("kubectl rollout restart deployment/api-server"))
# 不應因 destructive pattern 被攔截
assert "Destructive pattern" not in d.reason_detail
def test_docker_restart_allowed(self, policy):
"""docker restart 是可恢復操作"""
d = policy.evaluate(self._proposal("docker restart sentry"))
assert "Destructive pattern" not in d.reason_detail
def test_scale_up_allowed(self, policy):
"""scale to 2 replicas 不應被攔截"""
d = policy.evaluate(self._proposal("kubectl scale deployment api --replicas=2"))
assert "Destructive pattern" not in d.reason_detail
def test_kubectl_get_allowed(self, policy):
"""kubectl get 是只讀操作"""
d = policy.evaluate(self._proposal("kubectl get pods -n awoooi-prod"))
assert "Destructive pattern" not in d.reason_detail
def test_critical_severity_always_blocked(self, policy):
"""critical risk level 無論操作都需人工"""
d = policy.evaluate(self._proposal("kubectl rollout restart deployment/api", risk_level="critical"))
assert not d.should_auto_approve
assert d.reason.value == "critical_operation"