""" AlertRuleEngine kubectl 注入防護測試 ===================================== Task 2.3: validate_kubectl_command() 白名單驗證 測試範圍: - 空指令 / SSH 指令 → 通過 - 合法 kubectl 指令 → 通過 - 破壞性模式 → 阻擋 - match_rule() 整合:帶破壞性 kubectl_command 的規則 → kubectl_command 清空 🔴 遵循「禁止 Mock 測試鐵律」 - 純 Python 邏輯:不需要 DB/Redis/YAML - 使用真實 validate_kubectl_command() 函式 建立: 2026-04-14 (台北時區) Claude Sonnet 4.6 (Task 2.3) """ import pytest from src.services.alert_rule_engine import match_rule, validate_kubectl_command # ============================================================================= # 通過案例(應返回 True) # ============================================================================= class TestValidKubectlCommands: """合法指令應通過驗證""" def test_empty_string(self): """空字串 → 通過(NO_ACTION 規則)""" assert validate_kubectl_command("") is True def test_none_like_empty(self): """另一種空字串""" assert validate_kubectl_command(" ") is True @pytest.mark.parametrize("cmd", [ "ssh 192.168.0.188 'systemctl restart ollama'", "ssh {host} 'docker restart minio'", "ssh root@192.168.0.110 'cd /data/gitea && docker compose ps'", ]) def test_ssh_commands_pass(self, cmd): """SSH 指令一律通過(由主機層執行,不走 kubectl 路徑)""" assert validate_kubectl_command(cmd) is True @pytest.mark.parametrize("cmd", [ "kubectl rollout restart deployment/awoooi-api -n awoooi-prod", "kubectl rollout restart deployment/postgresql -n awoooi-prod", "kubectl scale deployment awoooi-api --replicas=3 -n awoooi-prod", "kubectl delete pod awoooi-api-abc123 -n awoooi-prod", "kubectl logs awoooi-api -n awoooi-prod --previous --tail=50", "kubectl get pods -n awoooi-prod", "kubectl describe node k3s-node-01", "kubectl get nodes -o wide", "kubectl autoscale deployment awoooi-api --min=2 --max=5 -n awoooi-prod", "kubectl set resources deployment/awoooi-api --limits=memory=1Gi -n awoooi-prod", ]) def test_safe_kubectl_commands_pass(self, cmd): """常見合法 kubectl 指令應通過""" assert validate_kubectl_command(cmd) is True def test_kubectl_exec_with_psql_is_not_auto_executable(self): """kubectl exec 可執行任意 shell,必須降級人工""" cmd = ( "kubectl exec -n awoooi-prod deployment/postgresql -- " "psql -U postgres -c 'SELECT pg_terminate_backend(pid) FROM pg_stat_activity;'" ) assert validate_kubectl_command(cmd) is False def test_compound_kubectl_get_is_not_auto_executable(self): """compound shell 指令必須降級人工""" cmd = "kubectl get pods -n monitoring && curl -s http://192.168.0.120:9093/api/v1/status" assert validate_kubectl_command(cmd) is False class TestRuleMatchingSpecificity: """具名 alertname 規則不得被寬鬆 message keyword 誤命中。""" def test_host_storage_alert_does_not_match_minio_disk_rule(self): ctx = { "alert_type": "host", "severity": "critical", "source": "prometheus", "target_resource": "dirty-reboot-evidence", "namespace": "awoooi-prod", "message": "HostPreviousBootStorageErrorsDetected storage dirty reboot evidence", "labels": { "alertname": "HostPreviousBootStorageErrorsDetected", "instance": "192.168.0.110:9100", }, } result = match_rule(ctx) assert result is not None assert result["rule_id"] != "minio_disk_high" assert "/data/minio" not in result.get("kubectl_command", "") def test_exact_minio_disk_alert_still_matches_minio_rule(self): ctx = { "alert_type": "storage", "severity": "critical", "source": "prometheus", "target_resource": "minio", "namespace": "awoooi-prod", "message": "MinIO disk usage high", "labels": { "alertname": "MinioDiskUsageHigh", "instance": "192.168.0.110:9000", }, } result = match_rule(ctx) assert result is not None assert result["rule_id"] == "minio_disk_high" def test_stock_site_down_matches_external_no_action_rule(self): ctx = { "alert_type": "service_404", "severity": "critical", "source": "prometheus", "target_resource": "stock-platform", "namespace": "awoooi-prod", "message": "stock.wooo.work probe failed", "labels": { "alertname": "StockWoooWorkDown", "instance": "http://stock.wooo.work", "layer": "external", "component": "stock-platform", "host": "110", }, } result = match_rule(ctx) assert result is not None assert result["rule_id"] == "external_site_down" assert result["suggested_action"] == "NO_ACTION" assert result["kubectl_command"] == "" # ============================================================================= # 阻擋案例(應返回 False) # ============================================================================= class TestDestructiveKubectlCommands: """破壞性模式應被阻擋(返回 False)""" @pytest.mark.parametrize("cmd", [ "kubectl delete pvc awoooi-data -n awoooi-prod", "kubectl delete namespace awoooi-prod", "kubectl delete statefulset postgresql -n awoooi-prod", "kubectl delete deployment awoooi-api -n awoooi-prod", ]) def test_destructive_delete_blocked(self, cmd): """破壞性 kubectl delete 應被阻擋""" assert validate_kubectl_command(cmd) is False @pytest.mark.parametrize("cmd", [ "kubectl scale deployment awoooi-api --replicas=0 -n awoooi-prod", "kubectl scale deployment awoooi-api --replicas= 0 -n awoooi-prod", # kubectl patch JSON 格式不在 YAML 規則集中,不納入測試範圍 ]) def test_scale_to_zero_blocked(self, cmd): """縮容至零應被阻擋(--replicas=0 旗標形式)""" assert validate_kubectl_command(cmd) is False @pytest.mark.parametrize("cmd", [ "kubectl drain k3s-node-01 --ignore-daemonsets", "kubectl cordon k3s-node-01", ]) def test_node_eviction_blocked(self, cmd): """節點驅逐/封鎖應被阻擋""" assert validate_kubectl_command(cmd) is False @pytest.mark.parametrize("cmd", [ # ssh 開頭指令設計上信任(由人工寫入 YAML,不是注入點) # 測試「非 ssh 前綴」的 rm -rf 情境 "rm -rf /tmp/test ", "rm -f /important ", "kubectl exec deploy/api -- rm -rf /data ", ]) def test_rm_rf_blocked(self, cmd): """rm -rf 應被阻擋(非 SSH 前綴的破壞性刪除)""" assert validate_kubectl_command(cmd) is False @pytest.mark.parametrize("cmd", [ "kubectl exec -n prod deploy/pg -- psql -c 'DROP TABLE incidents;'", "kubectl exec -n prod deploy/pg -- psql -c 'drop database awoooi'", ]) def test_sql_ddl_blocked(self, cmd): """破壞性 SQL DDL 應被阻擋""" assert validate_kubectl_command(cmd) is False @pytest.mark.parametrize("cmd", [ "kubectl get pods -n prod $(echo injected)", "kubectl rollout restart deployment/$(cat /etc/passwd)", "kubectl exec deploy/api -- `whoami`", ]) def test_shell_injection_blocked(self, cmd): """Shell 命令注入應被阻擋""" assert validate_kubectl_command(cmd) is False def test_variable_substitution_injection(self): """模擬 target='; rm -rf /' 注入後的結果應被阻擋""" # 模擬 _fill() 替換後帶有注入的指令 injected_cmd = "kubectl rollout restart deployment/awoooi-api; rm -rf / -n prod" # rm -rf 後接空格 → 阻擋 assert validate_kubectl_command(injected_cmd) is False