Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
Task 2.2: alert_rules.yaml 新增 3 類規則 (priority 125-127)
- gitea_down: Gitea CI/CD 下線 → NO_ACTION (priority 125, critical)
- ssl_cert_expiring: SSL 憑證到期 → NO_ACTION (priority 126, medium)
- external_site_down: MoWoooWork/Dev/Blackbox probe → NO_ACTION (priority 127, medium)
規則總數: 21 → 24
Task 2.3: alert_rule_engine.py kubectl 注入防護
- _RULE_ENGINE_DESTRUCTIVE_RE: 阻擋 delete pvc/namespace/statefulset/deployment,
drain/cordon, --replicas=0, rm -rf, DROP TABLE, $() 反引號
- validate_kubectl_command(): 公開 API,SSH 指令/空字串直接通過
- match_rule() 整合: 變數替換後驗證,阻擋時清空 + log warning
- test_alert_rule_engine_validation.py: 34 tests (100% 通過)
測試: 776 passed, 26 skipped, 0 failed
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
148 lines
5.9 KiB
Python
148 lines
5.9 KiB
Python
"""
|
||
AlertRuleEngine kubectl 注入防護測試
|
||
=====================================
|
||
Task 2.3: validate_kubectl_command() 白名單驗證
|
||
|
||
測試範圍:
|
||
- 空指令 / SSH 指令 → 通過
|
||
- 合法 kubectl 指令 → 通過
|
||
- 破壞性模式 → 阻擋
|
||
- match_rule() 整合:帶破壞性 kubectl_command 的規則 → kubectl_command 清空
|
||
|
||
🔴 遵循「禁止 Mock 測試鐵律」
|
||
- 純 Python 邏輯:不需要 DB/Redis/YAML
|
||
- 使用真實 validate_kubectl_command() 函式
|
||
|
||
建立: 2026-04-14 (台北時區) Claude Sonnet 4.6 (Task 2.3)
|
||
"""
|
||
|
||
import pytest
|
||
|
||
from src.services.alert_rule_engine import validate_kubectl_command
|
||
|
||
|
||
# =============================================================================
# Passing cases (should return True)
# =============================================================================
|
||
|
||
|
||
class TestValidKubectlCommands:
    """Commands that validate_kubectl_command() is expected to accept."""

    # SSH-prefixed commands, including one with an unfilled {host} placeholder.
    _SSH_COMMANDS = (
        "ssh 192.168.0.188 'systemctl restart ollama'",
        "ssh {host} 'docker restart minio'",
        "ssh root@192.168.0.110 'cd /data/gitea && docker compose ps'",
    )

    # Everyday kubectl operations: rollout restart, scaling to a non-zero
    # replica count, deleting a single pod, and read-only queries.
    _SAFE_KUBECTL_COMMANDS = (
        "kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
        "kubectl rollout restart deployment/postgresql -n awoooi-prod",
        "kubectl scale deployment awoooi-api --replicas=3 -n awoooi-prod",
        "kubectl delete pod awoooi-api-abc123 -n awoooi-prod",
        "kubectl logs awoooi-api -n awoooi-prod --previous --tail=50",
        "kubectl get pods -n awoooi-prod",
        "kubectl describe node k3s-node-01",
        "kubectl get nodes -o wide",
        "kubectl autoscale deployment awoooi-api --min=2 --max=5 -n awoooi-prod",
        "kubectl set resources deployment/awoooi-api --limits=memory=1Gi -n awoooi-prod",
    )

    def test_empty_string(self):
        """An empty string passes (the NO_ACTION rule case)."""
        verdict = validate_kubectl_command("")
        assert verdict is True

    def test_none_like_empty(self):
        """A single-space string, another form of empty command, also passes."""
        verdict = validate_kubectl_command(" ")
        assert verdict is True

    @pytest.mark.parametrize("cmd", _SSH_COMMANDS)
    def test_ssh_commands_pass(self, cmd):
        """SSH commands always pass (run at host level, not via the kubectl path)."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is True

    @pytest.mark.parametrize("cmd", _SAFE_KUBECTL_COMMANDS)
    def test_safe_kubectl_commands_pass(self, cmd):
        """Common legitimate kubectl commands pass validation."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is True

    def test_kubectl_exec_with_psql(self):
        """A kubectl exec query that runs a SQL SELECT through psql passes."""
        exec_prefix = "kubectl exec -n awoooi-prod deployment/postgresql -- "
        psql_call = "psql -U postgres -c 'SELECT pg_terminate_backend(pid) FROM pg_stat_activity;'"
        command = exec_prefix + psql_call
        assert validate_kubectl_command(command) is True

    def test_kubectl_get_with_jq(self):
        """kubectl get chained to a curl call with && passes.

        Despite the test name, the command contains neither jq nor a pipe.
        """
        command = "kubectl get pods -n monitoring && curl -s http://192.168.0.120:9093/api/v1/status"
        assert validate_kubectl_command(command) is True
|
||
|
||
|
||
# =============================================================================
# Blocked cases (should return False)
# =============================================================================
|
||
|
||
|
||
class TestDestructiveKubectlCommands:
    """Destructive patterns that validate_kubectl_command() must reject (return False)."""

    # kubectl delete aimed at a pvc, namespace, statefulset or deployment.
    _DESTRUCTIVE_DELETES = (
        "kubectl delete pvc awoooi-data -n awoooi-prod",
        "kubectl delete namespace awoooi-prod",
        "kubectl delete statefulset postgresql -n awoooi-prod",
        "kubectl delete deployment awoooi-api -n awoooi-prod",
    )

    # The --replicas=0 flag, with and without a space after the equals sign.
    _SCALE_TO_ZERO = (
        "kubectl scale deployment awoooi-api --replicas=0 -n awoooi-prod",
        "kubectl scale deployment awoooi-api --replicas= 0 -n awoooi-prod",
        # The kubectl patch JSON form does not appear in the YAML rule set,
        # so it is left out of the test scope.
    )

    _NODE_EVICTIONS = (
        "kubectl drain k3s-node-01 --ignore-daemonsets",
        "kubectl cordon k3s-node-01",
    )

    # Commands that start with ssh are trusted by design (they are written
    # into the YAML by hand and are not an injection point), so these cases
    # cover rm without an ssh prefix. The trailing spaces are intentional
    # parts of the inputs.
    _RM_COMMANDS = (
        "rm -rf /tmp/test ",
        "rm -f /important ",
        "kubectl exec deploy/api -- rm -rf /data ",
    )

    _SQL_DDL_COMMANDS = (
        "kubectl exec -n prod deploy/pg -- psql -c 'DROP TABLE incidents;'",
        "kubectl exec -n prod deploy/pg -- psql -c 'drop database awoooi'",
    )

    # $(...) command substitution and backtick substitution.
    _SHELL_INJECTIONS = (
        "kubectl get pods -n prod $(echo injected)",
        "kubectl rollout restart deployment/$(cat /etc/passwd)",
        "kubectl exec deploy/api -- `whoami`",
    )

    @pytest.mark.parametrize("cmd", _DESTRUCTIVE_DELETES)
    def test_destructive_delete_blocked(self, cmd):
        """Destructive kubectl delete commands are blocked."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _SCALE_TO_ZERO)
    def test_scale_to_zero_blocked(self, cmd):
        """Scaling down to zero is blocked (the --replicas=0 flag form)."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _NODE_EVICTIONS)
    def test_node_eviction_blocked(self, cmd):
        """Node drain and cordon commands are blocked."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _RM_COMMANDS)
    def test_rm_rf_blocked(self, cmd):
        """rm -rf is blocked (destructive deletion without an SSH prefix)."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _SQL_DDL_COMMANDS)
    def test_sql_ddl_blocked(self, cmd):
        """Destructive SQL DDL statements are blocked."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    @pytest.mark.parametrize("cmd", _SHELL_INJECTIONS)
    def test_shell_injection_blocked(self, cmd):
        """Shell command injection is blocked."""
        verdict = validate_kubectl_command(cmd)
        assert verdict is False

    def test_variable_substitution_injection(self):
        """The result of a simulated target='; rm -rf /' injection is blocked."""
        # Stands in for the command produced once _fill() has substituted an
        # injected value into the template.
        tainted_command = "kubectl rollout restart deployment/awoooi-api; rm -rf / -n prod"
        # rm -rf followed by a space -> blocked.
        assert validate_kubectl_command(tainted_command) is False
|