""" Auto-Approve Service - Phase 4 自動執行策略 ========================================== ADR-030: 智能自動修復系統 自動執行條件 (全部滿足才放行): 1. 風險等級 = LOW 2. 信任度 >= 90% (或 TrustEngine score >= 5) 3. 有匹配的 Playbook 且成功率 >= 95% 4. Playbook 成功執行次數 >= 3 設計原則: - 保守策略 (寧可人工審核,不可錯誤自動執行) - 完整審計追蹤 - CRITICAL 永遠不自動執行 版本: v1.0 建立: 2026-03-26 (台北時區) """ from dataclasses import dataclass, field from datetime import UTC, datetime from enum import Enum from typing import Any import structlog from src.models.playbook import Playbook from src.services.action_parser import parse_kubectl_action from src.services.playbook_rag import PlaybookMatch from src.services.trust_engine import TrustScoreManager, get_trust_manager logger = structlog.get_logger(__name__) # ============================================================================= # Configuration # ============================================================================= class AutoApproveReason(str, Enum): """自動執行/拒絕原因""" # 自動執行 PLAYBOOK_MATCH = "playbook_match" # Playbook 匹配成功 TRUST_SCORE = "trust_score" # 信任分數達標 LOW_RISK = "low_risk" # 低風險操作 # 拒絕自動執行 HIGH_RISK = "high_risk" # 風險過高 CRITICAL_OPERATION = "critical_operation" # 關鍵操作 LOW_TRUST = "low_trust" # 信任不足 NO_PLAYBOOK = "no_playbook" # 無匹配 Playbook NO_EXECUTABLE_ACTION = "no_executable_action" # action 為自然語言,無法執行 LOW_SUCCESS_RATE = "low_success_rate" # Playbook 成功率不足 INSUFFICIENT_HISTORY = "insufficient_history" # 執行歷史不足 @dataclass class AutoApproveConfig: """自動執行配置""" # 風險等級閾值 # 2026-04-11 Claude Sonnet 4.6: ADR-070 全自動化方向 — low/medium/high 全開放 # 真正需要人工的由 DESTRUCTIVE_PATTERNS 攔截(scale=0, delete, drop) # 原: ["low", "medium"] → 導致所有 high risk 告警永遠走人工審核 allowed_risk_levels: list[str] = field( default_factory=lambda: ["low", "medium", "high"] ) # 信任度閾值 # 2026-04-10 Claude Sonnet 4.6: trust_score 是 in-memory,Pod 重啟歸零 # → 改為 0,讓 medium risk + confidence >= 0.65 的操作直接自動執行 # 歷史原因: min_trust_score=1 導致所有告警永遠走審批,從未自動修復 min_trust_score: int = 0 # 不要求執行歷史 (原: 1) # 2026-04-11 Claude Sonnet 4.6: ADR-070 全自動化 — 0.5 即可執行 # 真正風險由 DESTRUCTIVE_PATTERNS + risk_level=critical 把關 min_confidence: float = 0.50 # AI 有基本把握即可 (原: 0.90, 後: 0.65) # Playbook 閾值 # 2026-04-01 ogt: 降低啟動門檻,1次成功記錄即可 min_playbook_success_rate: float = 0.80 # 成功率 >= 80% (原: 95%) min_playbook_success_count: int = 1 # 成功次數 >= 1 (原: 3) # 功能開關 enabled: bool = True # 總開關 require_playbook: bool = False # 不強制要求 Playbook (原: True),Gemini 分析直接可執行 audit_all: bool = True # 是否記錄所有判斷 # 預設配置 (實用策略 - 2026-04-01 ogt Phase 22 P1) DEFAULT_CONFIG = AutoApproveConfig() # ============================================================================= # 破壞性指令攔截清單 (ADR-070, 2026-04-11 Claude Sonnet 4.6) # C3+M1 修復 (Code Review 2026-04-11): 移至模組常量 + 補全 K8s/Docker 高風險操作 # 原則: 可恢復操作 → 自動執行; 不可逆 / 業務衝擊 → 人工確認 # ============================================================================= _DESTRUCTIVE_PATTERNS: list[str] = [ # --- Scale to zero (停機) --- "--replicas=0", # kubectl scale --replicas=0 '"replicas": 0', # kubectl patch JSON patch "'replicas': 0", # kubectl patch 單引號變體 "replicas=0", # 任何形式的 replicas=0 # --- K8s 刪除操作 --- "delete pod --all", # 批次刪除 pod "delete pod -A", # 跨 namespace 刪除 pod "delete pod --all-namespaces", "delete pods", # 複數形式 "delete deployment", # 刪除 deployment "delete pvc", # 刪除 PVC (資料丟失) "delete namespace", # 刪除 namespace "kubectl drain", # 驅逐節點所有 pod "kubectl cordon", # 封鎖節點(業務影響) "kubectl rollout undo", # 回滾部署(需人工確認版本) # --- Docker 破壞性操作 --- "docker rm", # 刪除容器 "docker stop", # 停止容器(不同於 restart) "docker kill", # 強制殺死容器 # --- DB DDL (不可逆) --- "drop table", "drop database", "truncate table", # --- SSH 危險指令 --- "rm -rf", # 遞迴刪除 "rm -f /", # 刪除根目錄 ] # ============================================================================= # Data Models # ============================================================================= @dataclass class AutoApproveDecision: """自動執行決策結果""" should_auto_approve: bool reason: AutoApproveReason reason_detail: str # 判斷依據 risk_level: str trust_score: int confidence: float playbook_match: PlaybookMatch | None = None playbook_success_rate: float | None = None playbook_success_count: int | None = None # 時間戳 decided_at: datetime = field(default_factory=lambda: datetime.now(UTC)) def to_dict(self) -> dict[str, Any]: return { "should_auto_approve": self.should_auto_approve, "reason": self.reason.value, "reason_detail": self.reason_detail, "risk_level": self.risk_level, "trust_score": self.trust_score, "confidence": self.confidence, "playbook_match": self.playbook_match.to_dict() if self.playbook_match else None, "playbook_success_rate": self.playbook_success_rate, "playbook_success_count": self.playbook_success_count, "decided_at": self.decided_at.isoformat(), } def to_audit_log(self) -> str: """生成審計日誌""" status = "AUTO_APPROVED" if self.should_auto_approve else "REQUIRES_HUMAN" return ( f"[{status}] {self.reason.value}: {self.reason_detail} " f"(risk={self.risk_level}, trust={self.trust_score}, conf={self.confidence:.0%})" ) # ============================================================================= # Auto-Approve Policy # ============================================================================= class AutoApprovePolicy: """ 自動執行策略 判斷提案是否可以跳過人工審核直接執行 核心原則: - CRITICAL 永遠不自動執行 - 必須有足夠的歷史成功記錄 - 信任度達標 - 風險等級為 LOW """ def __init__( self, config: AutoApproveConfig | None = None, trust_manager: TrustScoreManager | None = None, ): self.config = config or DEFAULT_CONFIG self._trust_manager = trust_manager @property def trust_manager(self) -> TrustScoreManager: """Lazy load trust manager""" if self._trust_manager is None: self._trust_manager = get_trust_manager() return self._trust_manager def evaluate( self, proposal_data: dict[str, Any], playbook: Playbook | None = None, playbook_match: PlaybookMatch | None = None, ) -> AutoApproveDecision: """ 評估提案是否可自動執行 Args: proposal_data: 提案資料 (含 risk_level, confidence, action 等) playbook: 匹配的 Playbook (可選) playbook_match: RAG 匹配結果 (可選) Returns: AutoApproveDecision 包含決策結果和原因 """ # 基本資訊 risk_level = proposal_data.get("risk_level", "medium").lower() confidence = proposal_data.get("confidence", 0.0) # 🔴 無信心度=規則匹配 action = proposal_data.get("action", "") or proposal_data.get("kubectl_command", "") action_pattern = self._extract_action_pattern(action) # 取得信任分數 trust_record = self.trust_manager.get_trust_record(action_pattern) trust_score = trust_record.score if trust_record else 0 # Playbook 資訊 playbook_success_rate = playbook.success_rate if playbook else None playbook_success_count = playbook.success_count if playbook else None # ========== 檢查條件 ========== # 條件 0: 功能是否啟用 if not self.config.enabled: return self._reject( reason=AutoApproveReason.LOW_TRUST, detail="Auto-approve is disabled", risk_level=risk_level, trust_score=trust_score, confidence=confidence, ) # 條件 1: CRITICAL 永遠不自動執行 if risk_level == "critical": return self._reject( reason=AutoApproveReason.CRITICAL_OPERATION, detail="CRITICAL operations always require human approval", risk_level=risk_level, trust_score=trust_score, confidence=confidence, ) # 條件 1b: structured action parser 安全閘 (SPF-2, 2026-04-30) # kubectl 指令以 token grammar 判斷,避免 substring regex 誤殺 # `kubectl delete pod `,同時攔截 delete deployment / # delete --all / rollout undo / replicas=0 / shell injection。 action_stripped = action.strip() action_lower = action_stripped.lower() kubectl_cmd_raw = str(proposal_data.get("kubectl_command", "") or "").strip() kubectl_candidate = kubectl_cmd_raw if not kubectl_candidate and "kubectl" in action_lower: kubectl_candidate = action_stripped[action_lower.index("kubectl"):].strip() if kubectl_candidate.lower().startswith("kubectl"): parsed_action = parse_kubectl_action(kubectl_candidate) if not parsed_action.ok: return self._reject( reason=AutoApproveReason.CRITICAL_OPERATION, detail=f"kubectl action parser rejected action: {parsed_action.reason} — requires human approval", risk_level=risk_level, trust_score=trust_score, confidence=confidence, ) else: for pattern in _DESTRUCTIVE_PATTERNS: if pattern in action_lower: return self._reject( reason=AutoApproveReason.CRITICAL_OPERATION, detail=f"Destructive pattern detected: '{pattern}' in action — requires human approval", risk_level=risk_level, trust_score=trust_score, confidence=confidence, ) # 條件 1c: 無可執行指令 → 拒絕自動執行(2026-04-16 ogt + Claude Sonnet 4.6) # 根因:INVALID_TARGET 導致 rule engine 清空 kubectl_command,action 為空 # 原本繼續走 auto_approve 流程,系統誤報「即將執行」但實際無指令 # 修復:action 為空字串時直接拒絕,強制 SRE 人工確認 if not action.strip(): return self._reject( reason=AutoApproveReason.NO_PLAYBOOK, detail="No executable action/kubectl_command — INVALID_TARGET or NO_ACTION, requires human review", risk_level=risk_level, trust_score=trust_score, confidence=confidence, ) # 條件 1d: 自然語言描述不可自動執行(2026-04-17 ogt + Claude Sonnet 4.6) # 根因:Solver 經 OpenClaw Nemo 路徑輸出「重啟 Crash Looping Pod」等自然語言 # action 非空 → 條件 1c 通過 → auto_approved=True # 但 kubectl_command 為空 → 實際無法執行 → incident 卡在 investigating # 修復:直接讀 proposal_data["action"] 原始值(非 fallback 後的 action 變數) # 避免「action 空 → fallback 成 kubectl_command → action 含 kubectl → 誤放行」 # Code Review 2026-04-17: P0-1 修正 action fallback 語意混淆 # P1-2 改用 NO_EXECUTABLE_ACTION(避免污染 KM 飛輪學習資料) _raw_action = proposal_data.get("action", "") or "" _kubectl_cmd = proposal_data.get("kubectl_command", "") or "" # 2026-04-27 Claude Sonnet 4.6: 擴充可執行指令識別,加入 SSH 診斷路徑 # 根因:_has_kubectl 只認 kubectl,SSH 診斷指令(主機告警)被全部攔截 → 飛輪停轉 # 修復:ssh {host} '...' 格式也是可執行指令,允許走 _ssh_execute() 路徑 _has_executable = ( "kubectl" in _raw_action.lower() or "kubectl" in _kubectl_cmd.lower() or _raw_action.lower().strip().startswith("ssh ") or _kubectl_cmd.lower().strip().startswith("ssh ") ) if not _has_executable: return self._reject( reason=AutoApproveReason.NO_EXECUTABLE_ACTION, detail=f"Action '{_raw_action[:60] or _kubectl_cmd[:60]}' is natural language — no kubectl/ssh command, requires human review", risk_level=risk_level, trust_score=trust_score, confidence=confidence, ) # 條件 2: 風險等級必須在允許列表中 if risk_level not in self.config.allowed_risk_levels: return self._reject( reason=AutoApproveReason.HIGH_RISK, detail=f"Risk level '{risk_level}' not in allowed list {self.config.allowed_risk_levels}", risk_level=risk_level, trust_score=trust_score, confidence=confidence, ) # 條件 3: 信任分數 if trust_score < self.config.min_trust_score: return self._reject( reason=AutoApproveReason.LOW_TRUST, detail=f"Trust score {trust_score} < {self.config.min_trust_score}", risk_level=risk_level, trust_score=trust_score, confidence=confidence, ) # 條件 4: AI 信心度 # 2026-04-15 Claude Sonnet 4.6 (飛輪沉默節點 1 修復): # 規則匹配的 confidence 固定 0.0(ADR-073 防偽造),會被此條件擋下 # 但 YAML 規則是人工審核過的,應直接信任 → bypass min_confidence # 改用「Playbook 成功率」或「規則 source」判斷可信度 _is_rule_based = ( proposal_data.get("is_rule_based") is True or proposal_data.get("source") == "expert_system" or (proposal_data.get("rule_id") or "") != "" or (proposal_data.get("matched_rule") or "") != "" # 2026-04-24 ogt + Claude Sonnet 4.6: Phase 2 五 agent 協作輸出 bypass confidence 閾值 # 根因:phase2_agent_debate 的 is_rule_based=False + confidence 低 → 被誤攔截 # 修法:識別 phase2_agent_debate source,視同規則可信路徑 or (proposal_data.get("source") or "").startswith("phase2_agent_debate") # 2026-04-27 Wave8-B3 by Claude — fusion 三斷鏈修復: # P2.1 fusion composite > 0.7 → auto_execute_eligible,bypass min_confidence 閾值 # auto_execute_eligible 是 FusionScore.to_dict() 的 bool 欄位 or ( proposal_data.get("decision_fusion", {}).get("auto_execute_eligible") is True ) # 2026-04-27 Wave8-B5 by Claude — Consensus auto_approve 不認修復: # source=consensus_engine + consensus_score >= 0.6 → 視同規則可信路徑 or ( proposal_data.get("source") == "consensus_engine" and float(proposal_data.get("consensus_score", 0)) >= 0.6 ) ) if not _is_rule_based and confidence < self.config.min_confidence: return self._reject( reason=AutoApproveReason.LOW_TRUST, detail=f"Confidence {confidence:.0%} < {self.config.min_confidence:.0%}", risk_level=risk_level, trust_score=trust_score, confidence=confidence, ) # 條件 5: Playbook 匹配 (如果要求) if self.config.require_playbook: if playbook is None: return self._reject( reason=AutoApproveReason.NO_PLAYBOOK, detail="No matching Playbook found", risk_level=risk_level, trust_score=trust_score, confidence=confidence, ) # 條件 6: Playbook 成功率 if playbook_success_rate is not None: if playbook_success_rate < self.config.min_playbook_success_rate: return self._reject( reason=AutoApproveReason.LOW_SUCCESS_RATE, detail=f"Playbook success rate {playbook_success_rate:.0%} < {self.config.min_playbook_success_rate:.0%}", risk_level=risk_level, trust_score=trust_score, confidence=confidence, playbook_match=playbook_match, playbook_success_rate=playbook_success_rate, playbook_success_count=playbook_success_count, ) # 條件 7: Playbook 成功次數 if playbook_success_count is not None: if playbook_success_count < self.config.min_playbook_success_count: return self._reject( reason=AutoApproveReason.INSUFFICIENT_HISTORY, detail=f"Playbook success count {playbook_success_count} < {self.config.min_playbook_success_count}", risk_level=risk_level, trust_score=trust_score, confidence=confidence, playbook_match=playbook_match, playbook_success_rate=playbook_success_rate, playbook_success_count=playbook_success_count, ) # ========== 所有條件通過 ========== return self._approve( reason=AutoApproveReason.PLAYBOOK_MATCH if playbook else AutoApproveReason.TRUST_SCORE, detail=f"All conditions met: risk={risk_level}, trust={trust_score}, confidence={confidence:.0%}", risk_level=risk_level, trust_score=trust_score, confidence=confidence, playbook_match=playbook_match, playbook_success_rate=playbook_success_rate, playbook_success_count=playbook_success_count, ) def _approve( self, reason: AutoApproveReason, detail: str, **kwargs, ) -> AutoApproveDecision: """建立自動執行決策""" decision = AutoApproveDecision( should_auto_approve=True, reason=reason, reason_detail=detail, **kwargs, ) if self.config.audit_all: logger.info( "auto_approve_decision", approved=True, reason=reason.value, detail=detail, trust_score=kwargs.get("trust_score"), ) return decision def _reject( self, reason: AutoApproveReason, detail: str, **kwargs, ) -> AutoApproveDecision: """建立拒絕自動執行決策""" decision = AutoApproveDecision( should_auto_approve=False, reason=reason, reason_detail=detail, **kwargs, ) if self.config.audit_all: logger.debug( "auto_approve_decision", approved=False, reason=reason.value, detail=detail, trust_score=kwargs.get("trust_score"), ) # 記錄拒絕原因計數(供系統報告分析人工審核積壓根因) # 在 async context 中呼叫,用 create_task 不阻塞主流程 try: import asyncio as _asyncio from datetime import datetime as _dt _today = _dt.now().strftime("%Y%m%d") _reject_key = f"stats:auto_approve_rejected:{reason.value}:{_today}" async def _incr_reject_stat() -> None: try: from src.core.redis_client import get_redis as _get_redis _r = _get_redis() await _r.incr(_reject_key) await _r.expire(_reject_key, 86400 * 7) except Exception: pass # Redis 不可用時靜默降級,不影響核心流程 loop = _asyncio.get_running_loop() loop.create_task(_incr_reject_stat()) except RuntimeError: pass # 非 async context(如單元測試),靜默跳過 return decision def _extract_action_pattern(self, action: str) -> str: """ 從 action 字串提取 pattern 例如: - "kubectl rollout restart deployment/awoooi-api" → "rollout_restart:awoooi-api" - "kubectl scale deployment/nginx --replicas=3" → "scale:nginx" """ if not action: return "unknown" parts = action.split() if len(parts) < 3: return "unknown" # kubectl / verb = parts[1] if len(parts) > 1 else "unknown" resource_part = parts[2] if len(parts) > 2 else "" if "/" in resource_part: resource_name = resource_part.split("/")[-1] else: resource_name = resource_part # 移除可能的選項 resource_name = resource_name.split()[0] if " " in resource_name else resource_name return f"{verb}:{resource_name}" # ============================================================================= # Singleton # ============================================================================= _auto_approve_policy: AutoApprovePolicy | None = None def get_auto_approve_policy() -> AutoApprovePolicy: """取得自動執行策略 singleton""" global _auto_approve_policy if _auto_approve_policy is None: _auto_approve_policy = AutoApprovePolicy() return _auto_approve_policy