Files
awoooi/apps/api/tests/test_alert_rule_engine_validation.py
OG T cc42aa0bdb
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
feat(adr-076): Task 2.2 + 2.3 — 規則擴充 + kubectl 注入防護
Task 2.2: alert_rules.yaml 新增 3 類規則 (priority 125-127)
  - gitea_down: Gitea CI/CD 下線 → NO_ACTION (priority 125, critical)
  - ssl_cert_expiring: SSL 憑證到期 → NO_ACTION (priority 126, medium)
  - external_site_down: MoWoooWork/Dev/Blackbox probe → NO_ACTION (priority 127, medium)
  規則總數: 21 → 24

Task 2.3: alert_rule_engine.py kubectl 注入防護
  - _RULE_ENGINE_DESTRUCTIVE_RE: 阻擋 delete pvc/namespace/statefulset/deployment,
    drain/cordon, --replicas=0, rm -rf, DROP TABLE, $() 反引號
  - validate_kubectl_command(): 公開 API,SSH 指令/空字串直接通過
  - match_rule() 整合: 變數替換後驗證,阻擋時清空 + log warning
  - test_alert_rule_engine_validation.py: 34 tests (100% 通過)

測試: 776 passed, 26 skipped, 0 failed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 15:10:10 +08:00

148 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
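"""
AlertRuleEngine kubectl injection-guard tests
=============================================
Task 2.3: validate_kubectl_command() validation against a denylist of destructive patterns
Scope:
- Empty command / SSH command → pass
- Legitimate kubectl command → pass
- Destructive pattern → blocked
- match_rule() integration: a rule carrying a destructive kubectl_command → kubectl_command is cleared
🔴 Follows the "no mock tests" iron rule
- Pure Python logic: no DB/Redis/YAML required
- Uses the real validate_kubectl_command() function
Created: 2026-04-14 (Taipei time zone) Claude Sonnet 4.6 (Task 2.3)
"""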
import pytest
from src.services.alert_rule_engine import validate_kubectl_command
# =============================================================================
# Passing cases (should return True)
# =============================================================================
class TestValidKubectlCommands:
    """Commands the validator is expected to accept (return value is True)."""

    # Host-level commands issued over SSH.
    _SSH_COMMANDS = [
        "ssh 192.168.0.188 'systemctl restart ollama'",
        "ssh {host} 'docker restart minio'",
        "ssh root@192.168.0.110 'cd /data/gitea && docker compose ps'",
    ]

    # Everyday kubectl operations that must not be flagged as destructive.
    _SAFE_KUBECTL_COMMANDS = [
        "kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
        "kubectl rollout restart deployment/postgresql -n awoooi-prod",
        "kubectl scale deployment awoooi-api --replicas=3 -n awoooi-prod",
        "kubectl delete pod awoooi-api-abc123 -n awoooi-prod",
        "kubectl logs awoooi-api -n awoooi-prod --previous --tail=50",
        "kubectl get pods -n awoooi-prod",
        "kubectl describe node k3s-node-01",
        "kubectl get nodes -o wide",
        "kubectl autoscale deployment awoooi-api --min=2 --max=5 -n awoooi-prod",
        "kubectl set resources deployment/awoooi-api --limits=memory=1Gi -n awoooi-prod",
    ]

    def test_empty_string(self):
        """An empty string is accepted (the NO_ACTION rule case)."""
        verdict = validate_kubectl_command("")
        assert verdict is True

    def test_none_like_empty(self):
        """A whitespace-only string is accepted as well."""
        verdict = validate_kubectl_command(" ")
        assert verdict is True

    @pytest.mark.parametrize("cmd", _SSH_COMMANDS)
    def test_ssh_commands_pass(self, cmd):
        """SSH commands always pass (run at the host layer, not the kubectl path)."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is True

    @pytest.mark.parametrize("cmd", _SAFE_KUBECTL_COMMANDS)
    def test_safe_kubectl_commands_pass(self, cmd):
        """Common, legitimate kubectl commands are accepted."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is True

    def test_kubectl_exec_with_psql(self):
        """A kubectl exec that runs a SQL SELECT through psql is accepted."""
        exec_prefix = "kubectl exec -n awoooi-prod deployment/postgresql -- "
        psql_call = "psql -U postgres -c 'SELECT pg_terminate_backend(pid) FROM pg_stat_activity;'"
        verdict = validate_kubectl_command(exec_prefix + psql_call)
        assert verdict is True

    def test_kubectl_get_with_jq(self):
        """A kubectl get chained to a curl call with ``&&`` is accepted."""
        chained = "kubectl get pods -n monitoring && curl -s http://192.168.0.120:9093/api/v1/status"
        verdict = validate_kubectl_command(chained)
        assert verdict is True
# =============================================================================
# Blocked cases (should return False)
# =============================================================================
class TestDestructiveKubectlCommands:
    """Commands the validator is expected to reject (return value is False)."""

    # kubectl delete aimed at a PVC, namespace, StatefulSet or Deployment.
    _DESTRUCTIVE_DELETES = [
        "kubectl delete pvc awoooi-data -n awoooi-prod",
        "kubectl delete namespace awoooi-prod",
        "kubectl delete statefulset postgresql -n awoooi-prod",
        "kubectl delete deployment awoooi-api -n awoooi-prod",
    ]

    _SCALE_TO_ZERO = [
        "kubectl scale deployment awoooi-api --replicas=0 -n awoooi-prod",
        "kubectl scale deployment awoooi-api --replicas= 0 -n awoooi-prod",
        # The kubectl patch JSON form is not part of the YAML rule set, so it
        # is out of scope for these tests.
    ]

    _NODE_EVICTIONS = [
        "kubectl drain k3s-node-01 --ignore-daemonsets",
        "kubectl cordon k3s-node-01",
    ]

    _FILE_REMOVALS = [
        # ssh-prefixed commands are trusted by design (hand-written into the
        # YAML, not an injection point); these cases cover rm -rf when there
        # is no ssh prefix.
        "rm -rf /tmp/test ",
        "rm -f /important ",
        "kubectl exec deploy/api -- rm -rf /data ",
    ]

    _SQL_DDL = [
        "kubectl exec -n prod deploy/pg -- psql -c 'DROP TABLE incidents;'",
        "kubectl exec -n prod deploy/pg -- psql -c 'drop database awoooi'",
    ]

    _SHELL_INJECTIONS = [
        "kubectl get pods -n prod $(echo injected)",
        "kubectl rollout restart deployment/$(cat /etc/passwd)",
        "kubectl exec deploy/api -- `whoami`",
    ]

    @pytest.mark.parametrize("cmd", _DESTRUCTIVE_DELETES)
    def test_destructive_delete_blocked(self, cmd):
        """Destructive kubectl delete invocations are blocked."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _SCALE_TO_ZERO)
    def test_scale_to_zero_blocked(self, cmd):
        """Scaling down to zero is blocked (the --replicas=0 flag form)."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _NODE_EVICTIONS)
    def test_node_eviction_blocked(self, cmd):
        """Draining or cordoning a node is blocked."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _FILE_REMOVALS)
    def test_rm_rf_blocked(self, cmd):
        """rm -rf is blocked (destructive removal without an SSH prefix)."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _SQL_DDL)
    def test_sql_ddl_blocked(self, cmd):
        """Destructive SQL DDL statements are blocked."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _SHELL_INJECTIONS)
    def test_shell_injection_blocked(self, cmd):
        """Shell command injection is blocked."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    def test_variable_substitution_injection(self):
        """The result of injecting target='; rm -rf /' is blocked."""
        # The command as it looks once _fill() has substituted the injected value.
        injected_cmd = "kubectl rollout restart deployment/awoooi-api; rm -rf / -n prod"
        # rm -rf followed by a space → blocked.
        verdict = validate_kubectl_command(injected_cmd)
        assert verdict is False