Files
awoooi/apps/api/tests/test_alert_rule_engine_validation.py
Your Name 8c5605fadf
All checks were successful
CD Pipeline / tests (push) Successful in 1m22s
Code Review / ai-code-review (push) Successful in 12s
CD Pipeline / build-and-deploy (push) Successful in 3m45s
CD Pipeline / post-deploy-checks (push) Successful in 1m30s
fix(api): block external site k3s playbook mismatch
2026-06-01 17:28:32 +08:00

215 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AlertRuleEngine kubectl 注入防護測試
=====================================
Task 2.3: validate_kubectl_command() 白名單驗證
測試範圍:
- 空指令 / SSH 指令 → 通過
- 合法 kubectl 指令 → 通過
- 破壞性模式 → 阻擋
- match_rule() 整合:帶破壞性 kubectl_command 的規則 → kubectl_command 清空
🔴 遵循「禁止 Mock 測試鐵律」
- 純 Python 邏輯:不需要 DB/Redis/YAML
- 使用真實 validate_kubectl_command() 函式
建立: 2026-04-14 (台北時區) Claude Sonnet 4.6 (Task 2.3)
"""
import pytest
from src.services.alert_rule_engine import match_rule, validate_kubectl_command
# =============================================================================
# 通過案例(應返回 True
# =============================================================================
class TestValidKubectlCommands:
"""合法指令應通過驗證"""
def test_empty_string(self):
"""空字串 → 通過NO_ACTION 規則)"""
assert validate_kubectl_command("") is True
def test_none_like_empty(self):
"""另一種空字串"""
assert validate_kubectl_command(" ") is True
@pytest.mark.parametrize("cmd", [
"ssh 192.168.0.188 'systemctl restart ollama'",
"ssh {host} 'docker restart minio'",
"ssh root@192.168.0.110 'cd /data/gitea && docker compose ps'",
])
def test_ssh_commands_pass(self, cmd):
"""SSH 指令一律通過(由主機層執行,不走 kubectl 路徑)"""
assert validate_kubectl_command(cmd) is True
@pytest.mark.parametrize("cmd", [
"kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
"kubectl rollout restart deployment/postgresql -n awoooi-prod",
"kubectl scale deployment awoooi-api --replicas=3 -n awoooi-prod",
"kubectl delete pod awoooi-api-abc123 -n awoooi-prod",
"kubectl logs awoooi-api -n awoooi-prod --previous --tail=50",
"kubectl get pods -n awoooi-prod",
"kubectl describe node k3s-node-01",
"kubectl get nodes -o wide",
"kubectl autoscale deployment awoooi-api --min=2 --max=5 -n awoooi-prod",
"kubectl set resources deployment/awoooi-api --limits=memory=1Gi -n awoooi-prod",
])
def test_safe_kubectl_commands_pass(self, cmd):
"""常見合法 kubectl 指令應通過"""
assert validate_kubectl_command(cmd) is True
def test_kubectl_exec_with_psql_is_not_auto_executable(self):
"""kubectl exec 可執行任意 shell必須降級人工"""
cmd = (
"kubectl exec -n awoooi-prod deployment/postgresql -- "
"psql -U postgres -c 'SELECT pg_terminate_backend(pid) FROM pg_stat_activity;'"
)
assert validate_kubectl_command(cmd) is False
def test_compound_kubectl_get_is_not_auto_executable(self):
"""compound shell 指令必須降級人工"""
cmd = "kubectl get pods -n monitoring && curl -s http://192.168.0.120:9093/api/v1/status"
assert validate_kubectl_command(cmd) is False
class TestRuleMatchingSpecificity:
"""具名 alertname 規則不得被寬鬆 message keyword 誤命中。"""
def test_host_storage_alert_does_not_match_minio_disk_rule(self):
ctx = {
"alert_type": "host",
"severity": "critical",
"source": "prometheus",
"target_resource": "dirty-reboot-evidence",
"namespace": "awoooi-prod",
"message": "HostPreviousBootStorageErrorsDetected storage dirty reboot evidence",
"labels": {
"alertname": "HostPreviousBootStorageErrorsDetected",
"instance": "192.168.0.110:9100",
},
}
result = match_rule(ctx)
assert result is not None
assert result["rule_id"] != "minio_disk_high"
assert "/data/minio" not in result.get("kubectl_command", "")
def test_exact_minio_disk_alert_still_matches_minio_rule(self):
ctx = {
"alert_type": "storage",
"severity": "critical",
"source": "prometheus",
"target_resource": "minio",
"namespace": "awoooi-prod",
"message": "MinIO disk usage high",
"labels": {
"alertname": "MinioDiskUsageHigh",
"instance": "192.168.0.110:9000",
},
}
result = match_rule(ctx)
assert result is not None
assert result["rule_id"] == "minio_disk_high"
def test_stock_site_down_matches_external_no_action_rule(self):
ctx = {
"alert_type": "service_404",
"severity": "critical",
"source": "prometheus",
"target_resource": "stock-platform",
"namespace": "awoooi-prod",
"message": "stock.wooo.work probe failed",
"labels": {
"alertname": "StockWoooWorkDown",
"instance": "http://stock.wooo.work",
"layer": "external",
"component": "stock-platform",
"host": "110",
},
}
result = match_rule(ctx)
assert result is not None
assert result["rule_id"] == "external_site_down"
assert result["suggested_action"] == "NO_ACTION"
assert result["kubectl_command"] == ""
# =============================================================================
# 阻擋案例(應返回 False
# =============================================================================
class TestDestructiveKubectlCommands:
"""破壞性模式應被阻擋(返回 False"""
@pytest.mark.parametrize("cmd", [
"kubectl delete pvc awoooi-data -n awoooi-prod",
"kubectl delete namespace awoooi-prod",
"kubectl delete statefulset postgresql -n awoooi-prod",
"kubectl delete deployment awoooi-api -n awoooi-prod",
])
def test_destructive_delete_blocked(self, cmd):
"""破壞性 kubectl delete 應被阻擋"""
assert validate_kubectl_command(cmd) is False
@pytest.mark.parametrize("cmd", [
"kubectl scale deployment awoooi-api --replicas=0 -n awoooi-prod",
"kubectl scale deployment awoooi-api --replicas= 0 -n awoooi-prod",
# kubectl patch JSON 格式不在 YAML 規則集中,不納入測試範圍
])
def test_scale_to_zero_blocked(self, cmd):
"""縮容至零應被阻擋(--replicas=0 旗標形式)"""
assert validate_kubectl_command(cmd) is False
@pytest.mark.parametrize("cmd", [
"kubectl drain k3s-node-01 --ignore-daemonsets",
"kubectl cordon k3s-node-01",
])
def test_node_eviction_blocked(self, cmd):
"""節點驅逐/封鎖應被阻擋"""
assert validate_kubectl_command(cmd) is False
@pytest.mark.parametrize("cmd", [
# ssh 開頭指令設計上信任(由人工寫入 YAML不是注入點
# 測試「非 ssh 前綴」的 rm -rf 情境
"rm -rf /tmp/test ",
"rm -f /important ",
"kubectl exec deploy/api -- rm -rf /data ",
])
def test_rm_rf_blocked(self, cmd):
"""rm -rf 應被阻擋(非 SSH 前綴的破壞性刪除)"""
assert validate_kubectl_command(cmd) is False
@pytest.mark.parametrize("cmd", [
"kubectl exec -n prod deploy/pg -- psql -c 'DROP TABLE incidents;'",
"kubectl exec -n prod deploy/pg -- psql -c 'drop database awoooi'",
])
def test_sql_ddl_blocked(self, cmd):
"""破壞性 SQL DDL 應被阻擋"""
assert validate_kubectl_command(cmd) is False
@pytest.mark.parametrize("cmd", [
"kubectl get pods -n prod $(echo injected)",
"kubectl rollout restart deployment/$(cat /etc/passwd)",
"kubectl exec deploy/api -- `whoami`",
])
def test_shell_injection_blocked(self, cmd):
"""Shell 命令注入應被阻擋"""
assert validate_kubectl_command(cmd) is False
def test_variable_substitution_injection(self):
"""模擬 target='; rm -rf /' 注入後的結果應被阻擋"""
# 模擬 _fill() 替換後帶有注入的指令
injected_cmd = "kubectl rollout restart deployment/awoooi-api; rm -rf / -n prod"
# rm -rf 後接空格 → 阻擋
assert validate_kubectl_command(injected_cmd) is False