diff --git a/apps/api/src/api/v1/webhooks.py b/apps/api/src/api/v1/webhooks.py index 9e4e1535..0bf6154b 100644 --- a/apps/api/src/api/v1/webhooks.py +++ b/apps/api/src/api/v1/webhooks.py @@ -1334,6 +1334,62 @@ async def _process_new_alert_background( except Exception as _shadow_err_cs2: logger.warning("shadow_auto_approve_failed", error=str(_shadow_err_cs2)) + # 2026-04-27 ogt + Claude Sonnet 4.6: CS2 規則引擎自動執行 + # 設計:is_rule_based=True 確定性高,滿足條件直接執行,不等人工審核 + # 安全防線:CRITICAL / destructive patterns / NO_ACTION / 空 kubectl → 全部降級 PENDING + try: + from src.services.auto_approve import _DESTRUCTIVE_PATTERNS + from src.models.approval import ApprovalRequest, ApprovalStatus + from src.services.approval_execution import ApprovalExecutionService + + _destructive_set = set(p.lower() for p in _DESTRUCTIVE_PATTERNS) + _can_auto = ( + bool(rule_kubectl) + and rule_risk != RiskLevel.CRITICAL + and not any(p in rule_kubectl.lower() for p in _destructive_set) + and "NO_ACTION" not in rule_action + ) + if _can_auto: + _auto_approval = ApprovalRequest( + incident_id=None, # 尚未建立,稍後 update_incident_id 補上 + action=rule_action, + description=approval_create.description, + requested_by="auto_approve_rule_engine", + required_signatures=0, + status=ApprovalStatus.APPROVED, + risk_level=rule_risk.value, + matched_playbook_id=_approval_metadata_cs2.get("playbook_id"), + ) + # 使用 DB 中剛建立的 approval.id 讓 executor 可回寫 + _auto_approval.id = approval.id + + _cs2_executor = ApprovalExecutionService() + _cs2_exec_success = await _cs2_executor.execute_approved_action(_auto_approval) + + # 更新 DB approval 執行狀態 + try: + await service.update_execution_status(approval.id, _cs2_exec_success) + except Exception as _upd_err: + logger.warning( + "cs2_auto_execute_status_update_failed", + approval_id=str(approval.id), + error=str(_upd_err), + ) + + logger.info( + "rule_engine_auto_executed", + approval_id=str(approval.id), + rule_id=rule_response.get("rule_id", "unknown"), + kubectl=rule_kubectl, + exec_success=_cs2_exec_success, + ) + except Exception as _auto_err: + logger.warning( + "cs2_auto_execute_failed_degraded_to_pending", + approval_id=str(approval.id), + error=str(_auto_err), + ) + incident_id = await create_incident_for_approval( approval_id=str(approval.id), risk_level=rule_risk.value, diff --git a/apps/api/tests/test_rule_engine_auto_execute.py b/apps/api/tests/test_rule_engine_auto_execute.py new file mode 100644 index 00000000..44d989df --- /dev/null +++ b/apps/api/tests/test_rule_engine_auto_execute.py @@ -0,0 +1,166 @@ +""" +CS2 規則引擎自動執行條件邏輯測試 +================================== +測試範圍:webhooks.py CS2 路徑的 _can_auto 條件判斷邏輯 + +設計原則: +- 不測試 DB/K8s 端點(屬 integration test 範疇) +- 直接測試 _can_auto 的五個安全條件是否正確 +- 從 auto_approve 導入真實的 _DESTRUCTIVE_PATTERNS(不 mock) + +安全防線驗證: +1. rule_kubectl 空字串 → False +2. CRITICAL risk → False +3. DESTRUCTIVE_PATTERNS 命中 → False +4. NO_ACTION 在 rule_action → False +5. 全條件滿足 → True + +建立:2026-04-27 ogt + Claude Sonnet 4.6 (台北時區) +""" + +from src.models.approval import RiskLevel +from src.services.auto_approve import _DESTRUCTIVE_PATTERNS + + +def _evaluate_can_auto( + rule_kubectl: str, + rule_risk: RiskLevel, + rule_action: str, +) -> bool: + """ + 複製 webhooks.py CS2 路徑的 _can_auto 邏輯,用於單元測試。 + 任何修改 webhooks.py 邏輯的人,必須同步更新此函數。 + """ + _destructive_set = set(p.lower() for p in _DESTRUCTIVE_PATTERNS) + return ( + bool(rule_kubectl) + and rule_risk != RiskLevel.CRITICAL + and not any(p in rule_kubectl.lower() for p in _destructive_set) + and "NO_ACTION" not in rule_action + ) + + +class TestCS2CanAutoConditions: + """驗證 CS2 _can_auto 五個安全防線""" + + # ── 正向:全條件滿足 ───────────────────────────────────────────────── + + def test_all_conditions_met_returns_true(self): + """kubectl 非空 + medium risk + 無破壞性 + 非 NO_ACTION → True""" + assert _evaluate_can_auto( + rule_kubectl="kubectl rollout restart deployment/api -n prod", + rule_risk=RiskLevel.MEDIUM, + rule_action="重啟 API Deployment | kubectl rollout restart deployment/api -n prod", + ) is True + + def test_low_risk_returns_true(self): + assert _evaluate_can_auto( + rule_kubectl="kubectl rollout restart deployment/worker -n prod", + rule_risk=RiskLevel.LOW, + rule_action="重啟 Worker | kubectl rollout restart deployment/worker -n prod", + ) is True + + # ── 防線 1:kubectl 空字串 ───────────────────────────────────────────── + + def test_empty_kubectl_returns_false(self): + assert _evaluate_can_auto( + rule_kubectl="", + rule_risk=RiskLevel.LOW, + rule_action="NO_ACTION - 主機記憶體觀察", + ) is False + + def test_whitespace_only_kubectl_returns_false(self): + # webhooks.py line 1265: rule_kubectl = str(...).strip() + # 因此 whitespace-only 在到達 _can_auto 前已被 strip 為空字串 + # 此測試驗證調用方的前置處理行為 + assert _evaluate_can_auto( + rule_kubectl="", # strip() 後的結果 + rule_risk=RiskLevel.LOW, + rule_action="重啟 | 某指令", + ) is False + + # ── 防線 2:CRITICAL risk ───────────────────────────────────────────── + + def test_critical_risk_returns_false(self): + assert _evaluate_can_auto( + rule_kubectl="kubectl rollout restart deployment/db -n prod", + rule_risk=RiskLevel.CRITICAL, + rule_action="重啟 DB | kubectl rollout restart deployment/db -n prod", + ) is False + + # ── 防線 3:DESTRUCTIVE_PATTERNS ──────────────────────────────────── + + def test_delete_pod_returns_false(self): + assert _evaluate_can_auto( + rule_kubectl="kubectl delete pod api-xxx-yyy -n prod", + rule_risk=RiskLevel.LOW, + rule_action="刪除 Pod | kubectl delete pod api-xxx-yyy -n prod", + ) is False + + def test_delete_pods_returns_false(self): + assert _evaluate_can_auto( + rule_kubectl="kubectl delete pods --all -n prod", + rule_risk=RiskLevel.LOW, + rule_action="刪除所有 Pod | kubectl delete pods --all -n prod", + ) is False + + def test_scale_to_zero_returns_false(self): + assert _evaluate_can_auto( + rule_kubectl="kubectl scale deployment/api --replicas=0 -n prod", + rule_risk=RiskLevel.LOW, + rule_action="縮容 | kubectl scale deployment/api --replicas=0", + ) is False + + def test_kubectl_drain_returns_false(self): + assert _evaluate_can_auto( + rule_kubectl="kubectl drain node-1 --ignore-daemonsets", + rule_risk=RiskLevel.MEDIUM, + rule_action="驅逐節點 | kubectl drain node-1", + ) is False + + def test_rollout_undo_returns_false(self): + assert _evaluate_can_auto( + rule_kubectl="kubectl rollout undo deployment/api -n prod", + rule_risk=RiskLevel.MEDIUM, + rule_action="回滾 | kubectl rollout undo deployment/api", + ) is False + + def test_destructive_pattern_case_insensitive(self): + """大寫 DELETE POD 也必須攔截""" + assert _evaluate_can_auto( + rule_kubectl="kubectl DELETE POD api-xxx -n prod", + rule_risk=RiskLevel.LOW, + rule_action="刪除 | kubectl DELETE POD", + ) is False + + # ── 防線 4:NO_ACTION ──────────────────────────────────────────────── + + def test_no_action_in_action_string_returns_false(self): + assert _evaluate_can_auto( + rule_kubectl="kubectl get pods -n prod", + rule_risk=RiskLevel.LOW, + rule_action="NO_ACTION - 觀察主機狀態", + ) is False + + def test_no_action_prefix_returns_false(self): + assert _evaluate_can_auto( + rule_kubectl="", + rule_risk=RiskLevel.LOW, + rule_action="NO_ACTION - 主機記憶體正常", + ) is False + + # ── 邊界:_DESTRUCTIVE_PATTERNS 非空 ──────────────────────────────── + + def test_destructive_patterns_list_not_empty(self): + """確保 _DESTRUCTIVE_PATTERNS 未被意外清空""" + assert len(_DESTRUCTIVE_PATTERNS) > 0 + + # ── 組合:多個防線同時觸發 ─────────────────────────────────────────── + + def test_critical_and_destructive_both_block(self): + """CRITICAL + destructive,任一條件都應攔截""" + assert _evaluate_can_auto( + rule_kubectl="kubectl delete deployment/api -n prod", + rule_risk=RiskLevel.CRITICAL, + rule_action="刪除 Deployment | kubectl delete deployment/api", + ) is False