215 lines
8.2 KiB
Python
215 lines
8.2 KiB
Python
"""
|
||
AlertRuleEngine kubectl injection-protection tests
==================================================
Task 2.3: validate_kubectl_command() allowlist validation

Test scope:
- Empty command / SSH command → passes
- Legitimate kubectl command → passes
- Destructive pattern → blocked
- match_rule() integration: a rule carrying a destructive kubectl_command → kubectl_command is cleared

🔴 Follows the "no mock tests" iron rule
- Pure Python logic: no DB/Redis/YAML required
- Uses the real validate_kubectl_command() function

Created: 2026-04-14 (Taipei time zone) Claude Sonnet 4.6 (Task 2.3)
|
||
"""
|
||
|
||
import pytest
|
||
|
||
from src.services.alert_rule_engine import match_rule, validate_kubectl_command
|
||
|
||
|
||
# =============================================================================
# Passing cases (expected to return True)
# =============================================================================
|
||
|
||
|
||
class TestValidKubectlCommands:
    """Commands that are safe to auto-execute must pass validation.

    Besides the passing cases, this class also pins two commands that
    must NOT be auto-executed: a ``kubectl exec`` invocation and a
    compound (``&&``) shell command. Both are expected to return False.
    """

    # Host-level commands that start with ``ssh``.
    _SSH_COMMANDS = (
        "ssh 192.168.0.188 'systemctl restart ollama'",
        "ssh {host} 'docker restart minio'",
        "ssh root@192.168.0.110 'cd /data/gitea && docker compose ps'",
    )

    # Everyday kubectl invocations that are expected to be accepted.
    _SAFE_KUBECTL_COMMANDS = (
        "kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
        "kubectl rollout restart deployment/postgresql -n awoooi-prod",
        "kubectl scale deployment awoooi-api --replicas=3 -n awoooi-prod",
        "kubectl delete pod awoooi-api-abc123 -n awoooi-prod",
        "kubectl logs awoooi-api -n awoooi-prod --previous --tail=50",
        "kubectl get pods -n awoooi-prod",
        "kubectl describe node k3s-node-01",
        "kubectl get nodes -o wide",
        "kubectl autoscale deployment awoooi-api --min=2 --max=5 -n awoooi-prod",
        "kubectl set resources deployment/awoooi-api --limits=memory=1Gi -n awoooi-prod",
    )

    def test_empty_string(self):
        """An empty string passes (NO_ACTION rule)."""
        verdict = validate_kubectl_command("")
        assert verdict is True

    def test_none_like_empty(self):
        """Another form of empty command: whitespace only."""
        verdict = validate_kubectl_command(" ")
        assert verdict is True

    @pytest.mark.parametrize("cmd", _SSH_COMMANDS)
    def test_ssh_commands_pass(self, cmd):
        """SSH commands always pass (run at host level, not via the kubectl path)."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is True

    @pytest.mark.parametrize("cmd", _SAFE_KUBECTL_COMMANDS)
    def test_safe_kubectl_commands_pass(self, cmd):
        """Common legitimate kubectl commands must pass."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is True

    def test_kubectl_exec_with_psql_is_not_auto_executable(self):
        """kubectl exec can run arbitrary shell, so it must be downgraded to manual handling."""
        exec_prefix = "kubectl exec -n awoooi-prod deployment/postgresql -- "
        psql_payload = (
            "psql -U postgres -c 'SELECT pg_terminate_backend(pid) FROM pg_stat_activity;'"
        )
        verdict = validate_kubectl_command(exec_prefix + psql_payload)
        assert verdict is False

    def test_compound_kubectl_get_is_not_auto_executable(self):
        """A compound shell command must be downgraded to manual handling."""
        compound = (
            "kubectl get pods -n monitoring"
            " && curl -s http://192.168.0.120:9093/api/v1/status"
        )
        verdict = validate_kubectl_command(compound)
        assert verdict is False
|
||
|
||
|
||
class TestRuleMatchingSpecificity:
    """Rules keyed on a specific alertname must not be falsely hit by loose message keywords."""

    def test_host_storage_alert_does_not_match_minio_disk_rule(self):
        """A host storage alert must resolve to a rule other than ``minio_disk_high``."""
        alert_labels = {
            "alertname": "HostPreviousBootStorageErrorsDetected",
            "instance": "192.168.0.110:9100",
        }
        alert_context = {
            "alert_type": "host",
            "severity": "critical",
            "source": "prometheus",
            "target_resource": "dirty-reboot-evidence",
            "namespace": "awoooi-prod",
            "message": "HostPreviousBootStorageErrorsDetected storage dirty reboot evidence",
            "labels": alert_labels,
        }

        matched = match_rule(alert_context)

        assert matched is not None
        assert matched["rule_id"] != "minio_disk_high"
        # The matched rule's command must not touch the MinIO data path.
        command = matched.get("kubectl_command", "")
        assert "/data/minio" not in command

    def test_exact_minio_disk_alert_still_matches_minio_rule(self):
        """The exact MinIO disk alert must still resolve to ``minio_disk_high``."""
        alert_labels = {
            "alertname": "MinioDiskUsageHigh",
            "instance": "192.168.0.110:9000",
        }
        alert_context = {
            "alert_type": "storage",
            "severity": "critical",
            "source": "prometheus",
            "target_resource": "minio",
            "namespace": "awoooi-prod",
            "message": "MinIO disk usage high",
            "labels": alert_labels,
        }

        matched = match_rule(alert_context)

        assert matched is not None
        assert matched["rule_id"] == "minio_disk_high"

    def test_stock_site_down_matches_external_no_action_rule(self):
        """An external site-down alert must resolve to the NO_ACTION rule with an empty command."""
        alert_labels = {
            "alertname": "StockWoooWorkDown",
            "instance": "http://stock.wooo.work",
            "layer": "external",
            "component": "stock-platform",
            "host": "110",
        }
        alert_context = {
            "alert_type": "service_404",
            "severity": "critical",
            "source": "prometheus",
            "target_resource": "stock-platform",
            "namespace": "awoooi-prod",
            "message": "stock.wooo.work probe failed",
            "labels": alert_labels,
        }

        matched = match_rule(alert_context)

        assert matched is not None
        assert matched["rule_id"] == "external_site_down"
        assert matched["suggested_action"] == "NO_ACTION"
        assert matched["kubectl_command"] == ""
|
||
|
||
|
||
# =============================================================================
# Blocking cases (expected to return False)
# =============================================================================
|
||
|
||
|
||
class TestDestructiveKubectlCommands:
    """Destructive patterns must be blocked (validation returns False)."""

    _DESTRUCTIVE_DELETES = (
        "kubectl delete pvc awoooi-data -n awoooi-prod",
        "kubectl delete namespace awoooi-prod",
        "kubectl delete statefulset postgresql -n awoooi-prod",
        "kubectl delete deployment awoooi-api -n awoooi-prod",
    )

    # The kubectl patch JSON form is not in the YAML rule set, so it is
    # out of scope for these tests.
    _SCALE_TO_ZERO = (
        "kubectl scale deployment awoooi-api --replicas=0 -n awoooi-prod",
        "kubectl scale deployment awoooi-api --replicas= 0 -n awoooi-prod",
    )

    _NODE_EVICTIONS = (
        "kubectl drain k3s-node-01 --ignore-daemonsets",
        "kubectl cordon k3s-node-01",
    )

    # Commands starting with ssh are trusted by design (written into the
    # YAML by hand, not an injection point), so these exercise the
    # rm -rf scenario WITHOUT an ssh prefix.
    _RM_COMMANDS = (
        "rm -rf /tmp/test ",
        "rm -f /important ",
        "kubectl exec deploy/api -- rm -rf /data ",
    )

    _SQL_DDL_COMMANDS = (
        "kubectl exec -n prod deploy/pg -- psql -c 'DROP TABLE incidents;'",
        "kubectl exec -n prod deploy/pg -- psql -c 'drop database awoooi'",
    )

    _SHELL_INJECTIONS = (
        "kubectl get pods -n prod $(echo injected)",
        "kubectl rollout restart deployment/$(cat /etc/passwd)",
        "kubectl exec deploy/api -- `whoami`",
    )

    @pytest.mark.parametrize("cmd", _DESTRUCTIVE_DELETES)
    def test_destructive_delete_blocked(self, cmd):
        """Destructive kubectl delete must be blocked."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _SCALE_TO_ZERO)
    def test_scale_to_zero_blocked(self, cmd):
        """Scaling down to zero must be blocked (--replicas=0 flag form)."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _NODE_EVICTIONS)
    def test_node_eviction_blocked(self, cmd):
        """Node eviction / cordoning must be blocked."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _RM_COMMANDS)
    def test_rm_rf_blocked(self, cmd):
        """rm -rf must be blocked (destructive deletion without an SSH prefix)."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _SQL_DDL_COMMANDS)
    def test_sql_ddl_blocked(self, cmd):
        """Destructive SQL DDL must be blocked."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _SHELL_INJECTIONS)
    def test_shell_injection_blocked(self, cmd):
        """Shell command injection must be blocked."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    def test_variable_substitution_injection(self):
        """The command resulting from injecting target='; rm -rf /' must be blocked."""
        # Simulates the command after _fill() substitution with an injected value.
        injected_cmd = "kubectl rollout restart deployment/awoooi-api; rm -rf / -n prod"
        # rm -rf followed by a space -> blocked.
        verdict = validate_kubectl_command(injected_cmd)
        assert verdict is False
|