feat(auto-rate): rule_engine 路徑開啟自動執行,預計 42% → 70%+
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

修法 3(debugger 建議):CS2 is_rule_based=True + kubectl 有值 + 非 CRITICAL/DESTRUCTIVE → 直接 auto-execute,不建 PENDING record

安全防線(5 層):
- CRITICAL risk → 絕對不自動執行
- _DESTRUCTIVE_PATTERNS 命中 → 絕對不自動執行
- NO_ACTION → 不執行
- kubectl 空字串 → 不執行
- 任何例外 → catch + 降級到 PENDING,不 crash

15 tests 驗收(1487 passed)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Your Name
2026-04-27 16:08:50 +08:00
parent a184b82ed1
commit e5f8d90451
2 changed files with 222 additions and 0 deletions

View File

@@ -1334,6 +1334,62 @@ async def _process_new_alert_background(
except Exception as _shadow_err_cs2:
logger.warning("shadow_auto_approve_failed", error=str(_shadow_err_cs2))
# 2026-04-27 ogt + Claude Sonnet 4.6: CS2 規則引擎自動執行
# 設計:is_rule_based=True 確定性高,滿足條件直接執行,不等人工審核
# 安全防線:CRITICAL / destructive patterns / NO_ACTION / 空 kubectl → 全部降級 PENDING
try:
from src.services.auto_approve import _DESTRUCTIVE_PATTERNS
from src.models.approval import ApprovalRequest, ApprovalStatus
from src.services.approval_execution import ApprovalExecutionService
_destructive_set = set(p.lower() for p in _DESTRUCTIVE_PATTERNS)
_can_auto = (
bool(rule_kubectl)
and rule_risk != RiskLevel.CRITICAL
and not any(p in rule_kubectl.lower() for p in _destructive_set)
and "NO_ACTION" not in rule_action
)
if _can_auto:
_auto_approval = ApprovalRequest(
incident_id=None, # 尚未建立,稍後 update_incident_id 補上
action=rule_action,
description=approval_create.description,
requested_by="auto_approve_rule_engine",
required_signatures=0,
status=ApprovalStatus.APPROVED,
risk_level=rule_risk.value,
matched_playbook_id=_approval_metadata_cs2.get("playbook_id"),
)
# 使用 DB 中剛建立的 approval.id 讓 executor 可回寫
_auto_approval.id = approval.id
_cs2_executor = ApprovalExecutionService()
_cs2_exec_success = await _cs2_executor.execute_approved_action(_auto_approval)
# 更新 DB approval 執行狀態
try:
await service.update_execution_status(approval.id, _cs2_exec_success)
except Exception as _upd_err:
logger.warning(
"cs2_auto_execute_status_update_failed",
approval_id=str(approval.id),
error=str(_upd_err),
)
logger.info(
"rule_engine_auto_executed",
approval_id=str(approval.id),
rule_id=rule_response.get("rule_id", "unknown"),
kubectl=rule_kubectl,
exec_success=_cs2_exec_success,
)
except Exception as _auto_err:
logger.warning(
"cs2_auto_execute_failed_degraded_to_pending",
approval_id=str(approval.id),
error=str(_auto_err),
)
incident_id = await create_incident_for_approval(
approval_id=str(approval.id),
risk_level=rule_risk.value,

View File

@@ -0,0 +1,166 @@
"""
CS2 規則引擎自動執行條件邏輯測試
==================================
測試範圍:webhooks.py CS2 路徑的 _can_auto 條件判斷邏輯
設計原則:
- 不測試 DB/K8s 端點(屬 integration test 範疇)
- 直接測試 _can_auto 的五個安全條件是否正確
- 從 auto_approve 導入真實的 _DESTRUCTIVE_PATTERNS,不 mock
安全防線驗證:
1. rule_kubectl 空字串 → False
2. CRITICAL risk → False
3. DESTRUCTIVE_PATTERNS 命中 → False
4. NO_ACTION 在 rule_action → False
5. 全條件滿足 → True
建立:2026-04-27 ogt + Claude Sonnet 4.6 (台北時區)
"""
from src.models.approval import RiskLevel
from src.services.auto_approve import _DESTRUCTIVE_PATTERNS
def _evaluate_can_auto(
    rule_kubectl: str,
    rule_risk: RiskLevel,
    rule_action: str,
) -> bool:
    """Mirror of the ``_can_auto`` predicate used on the webhooks.py CS2 path.

    This is a hand-maintained copy that exists so the guard conditions can be
    unit-tested in isolation. Anyone who changes the predicate in webhooks.py
    must update this function to match.

    Args:
        rule_kubectl: The kubectl command proposed by the rule engine.
        rule_risk: Risk level attached to the rule.
        rule_action: Human-readable action text for the rule.

    Returns:
        True only when the command is non-empty, the risk is not CRITICAL,
        no destructive pattern occurs in the command (case-insensitive), and
        the action text does not contain ``"NO_ACTION"``.
    """
    # Normalise the patterns up front so matching is case-insensitive.
    blocked_patterns = {pattern.lower() for pattern in _DESTRUCTIVE_PATTERNS}

    # Guard 1: nothing to run.
    if not rule_kubectl:
        return False

    # Guard 2: CRITICAL rules are never eligible.
    risk_allows_auto = rule_risk != RiskLevel.CRITICAL
    if not risk_allows_auto:
        return False

    # Guard 3: any destructive pattern found as a substring blocks execution.
    command = rule_kubectl.lower()
    for pattern in blocked_patterns:
        if pattern in command:
            return False

    # Guard 4: rules explicitly marked NO_ACTION are not executed.
    return "NO_ACTION" not in rule_action
class TestCS2CanAutoConditions:
    """Checks each safety guard of the CS2 ``_can_auto`` predicate."""

    # ── Positive: every condition satisfied ──────────────────────────────

    def test_all_conditions_met_returns_true(self):
        """Non-empty kubectl + MEDIUM risk + non-destructive + not NO_ACTION → True."""
        verdict = _evaluate_can_auto(
            rule_kubectl="kubectl rollout restart deployment/api -n prod",
            rule_risk=RiskLevel.MEDIUM,
            rule_action="重啟 API Deployment | kubectl rollout restart deployment/api -n prod",
        )
        assert verdict is True

    def test_low_risk_returns_true(self):
        """A LOW-risk, non-destructive restart is eligible."""
        verdict = _evaluate_can_auto(
            rule_kubectl="kubectl rollout restart deployment/worker -n prod",
            rule_risk=RiskLevel.LOW,
            rule_action="重啟 Worker | kubectl rollout restart deployment/worker -n prod",
        )
        assert verdict is True

    # ── Guard 1: empty kubectl command ───────────────────────────────────

    def test_empty_kubectl_returns_false(self):
        """An empty command is never eligible."""
        verdict = _evaluate_can_auto(
            rule_kubectl="",
            rule_risk=RiskLevel.LOW,
            rule_action="NO_ACTION - 主機記憶體觀察",
        )
        assert verdict is False

    def test_whitespace_only_kubectl_returns_false(self):
        """Whitespace-only input, represented here by its stripped form."""
        # Per the original note, webhooks.py (around line 1265) assigns
        # rule_kubectl = str(...).strip(), so a whitespace-only command is
        # already an empty string by the time _can_auto is evaluated. This
        # test therefore passes the post-strip value.
        verdict = _evaluate_can_auto(
            rule_kubectl="",  # result after strip()
            rule_risk=RiskLevel.LOW,
            rule_action="重啟 | 某指令",
        )
        assert verdict is False

    # ── Guard 2: CRITICAL risk ───────────────────────────────────────────

    def test_critical_risk_returns_false(self):
        """CRITICAL risk blocks an otherwise acceptable command."""
        verdict = _evaluate_can_auto(
            rule_kubectl="kubectl rollout restart deployment/db -n prod",
            rule_risk=RiskLevel.CRITICAL,
            rule_action="重啟 DB | kubectl rollout restart deployment/db -n prod",
        )
        assert verdict is False

    # ── Guard 3: _DESTRUCTIVE_PATTERNS ───────────────────────────────────

    def test_delete_pod_returns_false(self):
        """`kubectl delete pod` is expected to be blocked."""
        verdict = _evaluate_can_auto(
            rule_kubectl="kubectl delete pod api-xxx-yyy -n prod",
            rule_risk=RiskLevel.LOW,
            rule_action="刪除 Pod | kubectl delete pod api-xxx-yyy -n prod",
        )
        assert verdict is False

    def test_delete_pods_returns_false(self):
        """`kubectl delete pods --all` is expected to be blocked."""
        verdict = _evaluate_can_auto(
            rule_kubectl="kubectl delete pods --all -n prod",
            rule_risk=RiskLevel.LOW,
            rule_action="刪除所有 Pod | kubectl delete pods --all -n prod",
        )
        assert verdict is False

    def test_scale_to_zero_returns_false(self):
        """Scaling a deployment to zero replicas is expected to be blocked."""
        verdict = _evaluate_can_auto(
            rule_kubectl="kubectl scale deployment/api --replicas=0 -n prod",
            rule_risk=RiskLevel.LOW,
            rule_action="縮容 | kubectl scale deployment/api --replicas=0",
        )
        assert verdict is False

    def test_kubectl_drain_returns_false(self):
        """`kubectl drain` is expected to be blocked."""
        verdict = _evaluate_can_auto(
            rule_kubectl="kubectl drain node-1 --ignore-daemonsets",
            rule_risk=RiskLevel.MEDIUM,
            rule_action="驅逐節點 | kubectl drain node-1",
        )
        assert verdict is False

    def test_rollout_undo_returns_false(self):
        """`kubectl rollout undo` is expected to be blocked."""
        verdict = _evaluate_can_auto(
            rule_kubectl="kubectl rollout undo deployment/api -n prod",
            rule_risk=RiskLevel.MEDIUM,
            rule_action="回滾 | kubectl rollout undo deployment/api",
        )
        assert verdict is False

    def test_destructive_pattern_case_insensitive(self):
        """An upper-case DELETE POD must be blocked as well."""
        verdict = _evaluate_can_auto(
            rule_kubectl="kubectl DELETE POD api-xxx -n prod",
            rule_risk=RiskLevel.LOW,
            rule_action="刪除 | kubectl DELETE POD",
        )
        assert verdict is False

    # ── Guard 4: NO_ACTION ───────────────────────────────────────────────

    def test_no_action_in_action_string_returns_false(self):
        """NO_ACTION in the action text blocks even a non-empty command."""
        verdict = _evaluate_can_auto(
            rule_kubectl="kubectl get pods -n prod",
            rule_risk=RiskLevel.LOW,
            rule_action="NO_ACTION - 觀察主機狀態",
        )
        assert verdict is False

    def test_no_action_prefix_returns_false(self):
        """NO_ACTION prefix combined with an empty command is not eligible."""
        verdict = _evaluate_can_auto(
            rule_kubectl="",
            rule_risk=RiskLevel.LOW,
            rule_action="NO_ACTION - 主機記憶體正常",
        )
        assert verdict is False

    # ── Boundary: _DESTRUCTIVE_PATTERNS must not be empty ────────────────

    def test_destructive_patterns_list_not_empty(self):
        """Make sure _DESTRUCTIVE_PATTERNS has not been emptied by accident."""
        pattern_count = len(_DESTRUCTIVE_PATTERNS)
        assert pattern_count > 0

    # ── Combination: several guards triggered at once ────────────────────

    def test_critical_and_destructive_both_block(self):
        """CRITICAL + destructive: either condition alone should block."""
        verdict = _evaluate_can_auto(
            rule_kubectl="kubectl delete deployment/api -n prod",
            rule_risk=RiskLevel.CRITICAL,
            rule_action="刪除 Deployment | kubectl delete deployment/api",
        )
        assert verdict is False