From ce7f8a1b2377a3f464e829ccc8728c6d75df005d Mon Sep 17 00:00:00 2001
From: OG T
Date: Thu, 26 Mar 2026 22:13:10 +0800
Subject: [PATCH] =?UTF-8?q?feat(api):=20ADR-030=20Phase=204=20=E8=87=AA?=
 =?UTF-8?q?=E5=8B=95=E5=9F=B7=E8=A1=8C=E6=A9=9F=E5=88=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

實作低風險操作自動執行策略:

1. auto_approve.py - 自動執行策略服務
   - AutoApprovePolicy: 評估是否可自動執行
   - 條件: LOW 風險 + 信任分數 >= 5 + Playbook 成功率 >= 95%
   - CRITICAL 永遠不自動執行
   - 完整審計追蹤

2. trust_engine.py - 新增 singleton
   - get_trust_manager(): 取得全域 TrustScoreManager

3. decision_manager.py - 整合自動執行 (Tier 3 紅區)
   - Step 5 加入 AutoApprovePolicy 判斷
   - 條件滿足時跳過 Telegram,直接執行
   - _auto_execute(): 自動執行邏輯
   - 失敗時 fallback 到人工審核

流程:
  Incident → 分析 → AutoApprovePolicy 評估
    ├─ 可自動執行 → 直接執行 → 完成
    └─ 需人工審核 → Telegram 通知 → 等待批准

Co-Authored-By: Claude Opus 4.5
---
 apps/api/src/services/auto_approve.py     | 415 ++++++++++++++++++++++
 apps/api/src/services/decision_manager.py |  95 ++++-
 apps/api/src/services/trust_engine.py     |   5 +
 3 files changed, 512 insertions(+), 3 deletions(-)
 create mode 100644 apps/api/src/services/auto_approve.py

diff --git a/apps/api/src/services/auto_approve.py b/apps/api/src/services/auto_approve.py
new file mode 100644
index 00000000..c4519ba1
--- /dev/null
+++ b/apps/api/src/services/auto_approve.py
@@ -0,0 +1,415 @@
+"""
+Auto-Approve Service - Phase 4 auto-execution policy
+=====================================================
+ADR-030: intelligent auto-remediation system
+
+Auto-execution requirements (ALL must hold):
+1. Risk level is LOW
+2. AI confidence >= 90% AND TrustEngine score >= 5
+3. A matching Playbook exists with a success rate >= 95%
+4. That Playbook has succeeded at least 3 times
+
+Design principles:
+- Conservative: prefer human review over a wrong automatic execution
+- Full audit trail
+- CRITICAL operations are never auto-executed
+
+Version: v1.0
+Created: 2026-03-26 (Taipei time zone)
+"""
+
+import math
+from dataclasses import dataclass, field
+from datetime import UTC, datetime
+from enum import Enum
+from typing import Any
+
+import structlog
+
+from src.models.approval import RiskLevel
+from src.models.playbook import Playbook
+from src.services.playbook_rag import PlaybookMatch
+from src.services.trust_engine import TrustScoreManager, get_trust_manager
+
+logger = structlog.get_logger(__name__)
+
+
+# =============================================================================
+# Configuration
+# =============================================================================
+
+
+class AutoApproveReason(str, Enum):
+    """Reason code attached to every auto-approve decision."""
+
+    # Approved
+    PLAYBOOK_MATCH = "playbook_match"  # a Playbook matched and passed all checks
+    TRUST_SCORE = "trust_score"  # approved without a Playbook (require_playbook off)
+    LOW_RISK = "low_risk"  # low-risk operation
+
+    # Rejected
+    DISABLED = "disabled"  # master switch is off
+    HIGH_RISK = "high_risk"  # risk level not in the allowed list
+    CRITICAL_OPERATION = "critical_operation"  # CRITICAL is never automatic
+    LOW_TRUST = "low_trust"  # trust score below threshold
+    LOW_CONFIDENCE = "low_confidence"  # AI confidence below threshold
+    NO_PLAYBOOK = "no_playbook"  # no matching Playbook
+    LOW_SUCCESS_RATE = "low_success_rate"  # Playbook success rate too low
+    INSUFFICIENT_HISTORY = "insufficient_history"  # too little execution history
+
+
+@dataclass
+class AutoApproveConfig:
+    """Thresholds and switches that drive AutoApprovePolicy."""
+
+    # Risk levels that may be auto-executed (lower-case strings)
+    allowed_risk_levels: list[str] = field(
+        default_factory=lambda: ["low"]
+    )
+
+    # Trust thresholds
+    min_trust_score: int = 5  # minimum TrustEngine score
+    min_confidence: float = 0.90  # minimum AI confidence
+
+    # Playbook thresholds
+    min_playbook_success_rate: float = 0.95  # success rate >= 95%
+    min_playbook_success_count: int = 3  # successful runs >= 3
+
+    # Feature switches
+    enabled: bool = True  # master switch
+    require_playbook: bool = True  # a matching Playbook is mandatory
+    audit_all: bool = True  # log every decision
+
+
+# Default (conservative) configuration
+DEFAULT_CONFIG = AutoApproveConfig()
+
+
+# =============================================================================
+# Data Models
+# =============================================================================
+
+
+@dataclass
+class AutoApproveDecision:
+    """Outcome of one AutoApprovePolicy evaluation."""
+
+    should_auto_approve: bool
+    reason: AutoApproveReason
+    reason_detail: str
+
+    # Inputs the decision was based on
+    risk_level: str
+    trust_score: int
+    confidence: float
+    playbook_match: PlaybookMatch | None = None
+    playbook_success_rate: float | None = None
+    playbook_success_count: int | None = None
+
+    # Timestamp (UTC)
+    decided_at: datetime = field(default_factory=lambda: datetime.now(UTC))
+
+    def to_dict(self) -> dict[str, Any]:
+        """Return the decision as a plain dict (enum and datetime flattened)."""
+        return {
+            "should_auto_approve": self.should_auto_approve,
+            "reason": self.reason.value,
+            "reason_detail": self.reason_detail,
+            "risk_level": self.risk_level,
+            "trust_score": self.trust_score,
+            "confidence": self.confidence,
+            "playbook_match": self.playbook_match.to_dict() if self.playbook_match else None,
+            "playbook_success_rate": self.playbook_success_rate,
+            "playbook_success_count": self.playbook_success_count,
+            "decided_at": self.decided_at.isoformat(),
+        }
+
+    def to_audit_log(self) -> str:
+        """Render a one-line audit message for this decision."""
+        status = "AUTO_APPROVED" if self.should_auto_approve else "REQUIRES_HUMAN"
+        return (
+            f"[{status}] {self.reason.value}: {self.reason_detail} "
+            f"(risk={self.risk_level}, trust={self.trust_score}, conf={self.confidence:.0%})"
+        )
+
+
+# =============================================================================
+# Auto-Approve Policy
+# =============================================================================
+
+
+class AutoApprovePolicy:
+    """
+    Decide whether a proposal may skip human review and run automatically.
+
+    Core rules:
+    - CRITICAL operations are never auto-executed
+    - The matched Playbook needs enough successful history
+    - Trust score and AI confidence must reach their thresholds
+    - Risk level must be in the allowed list (LOW by default)
+    """
+
+    # kubectl verbs that take a sub-command before the resource argument
+    _COMPOUND_VERBS = frozenset({"rollout"})
+
+    def __init__(
+        self,
+        config: AutoApproveConfig | None = None,
+        trust_manager: TrustScoreManager | None = None,
+    ):
+        self.config = config or DEFAULT_CONFIG
+        self._trust_manager = trust_manager
+
+    @property
+    def trust_manager(self) -> TrustScoreManager:
+        """Return the trust manager, resolving the global one on first use."""
+        if self._trust_manager is None:
+            self._trust_manager = get_trust_manager()
+        return self._trust_manager
+
+    def evaluate(
+        self,
+        proposal_data: dict[str, Any],
+        playbook: Playbook | None = None,
+        playbook_match: PlaybookMatch | None = None,
+    ) -> AutoApproveDecision:
+        """
+        Evaluate whether a proposal may be executed without human approval.
+
+        Args:
+            proposal_data: Proposal fields; reads ``risk_level``,
+                ``confidence`` and ``action`` (or ``kubectl_command``).
+            playbook: The matched Playbook, if any.
+            playbook_match: The RAG match result, if any (recorded only).
+
+        Returns:
+            AutoApproveDecision carrying the verdict, its reason and the
+            values it was based on. Checks run in a fixed order and the
+            first failing one determines the rejection reason.
+        """
+        risk_level = self._normalize_risk_level(proposal_data)
+        confidence = self._normalize_confidence(proposal_data)
+        action = proposal_data.get("action", "") or proposal_data.get("kubectl_command", "")
+        action_pattern = self._extract_action_pattern(action)
+
+        # Trust score of this action pattern (0 when there is no record)
+        trust_record = self.trust_manager.get_trust_record(action_pattern)
+        trust_score = trust_record.score if trust_record else 0
+
+        # getattr: stay on the reject path if the object lacks the statistics
+        playbook_success_rate = getattr(playbook, "success_rate", None)
+        playbook_success_count = getattr(playbook, "success_count", None)
+
+        basis = {
+            "risk_level": risk_level,
+            "trust_score": trust_score,
+            "confidence": confidence,
+        }
+        playbook_basis = {
+            **basis,
+            "playbook_match": playbook_match,
+            "playbook_success_rate": playbook_success_rate,
+            "playbook_success_count": playbook_success_count,
+        }
+        cfg = self.config
+
+        # Check 0: master switch
+        if not cfg.enabled:
+            return self._reject(
+                reason=AutoApproveReason.DISABLED,
+                detail="Auto-approve is disabled",
+                **basis,
+            )
+
+        # Check 1: CRITICAL is never automatic, whatever the config allows
+        if risk_level == "critical":
+            return self._reject(
+                reason=AutoApproveReason.CRITICAL_OPERATION,
+                detail="CRITICAL operations always require human approval",
+                **basis,
+            )
+
+        # Check 2: risk level must be explicitly allowed
+        if risk_level not in cfg.allowed_risk_levels:
+            return self._reject(
+                reason=AutoApproveReason.HIGH_RISK,
+                detail=f"Risk level '{risk_level}' not in allowed list {cfg.allowed_risk_levels}",
+                **basis,
+            )
+
+        # Check 3: trust score
+        if trust_score < cfg.min_trust_score:
+            return self._reject(
+                reason=AutoApproveReason.LOW_TRUST,
+                detail=f"Trust score {trust_score} < {cfg.min_trust_score}",
+                **basis,
+            )
+
+        # Check 4: AI confidence
+        if confidence < cfg.min_confidence:
+            return self._reject(
+                reason=AutoApproveReason.LOW_CONFIDENCE,
+                detail=f"Confidence {confidence:.0%} < {cfg.min_confidence:.0%}",
+                **basis,
+            )
+
+        # Check 5: a Playbook must have matched (when required)
+        if cfg.require_playbook and playbook is None:
+            return self._reject(
+                reason=AutoApproveReason.NO_PLAYBOOK,
+                detail="No matching Playbook found",
+                **basis,
+            )
+
+        if playbook is not None:
+            # Check 6: missing statistics are "no history", not a free pass
+            if playbook_success_rate is None or playbook_success_count is None:
+                return self._reject(
+                    reason=AutoApproveReason.INSUFFICIENT_HISTORY,
+                    detail="Playbook has no success statistics",
+                    **playbook_basis,
+                )
+
+            # Check 7: Playbook success rate
+            if playbook_success_rate < cfg.min_playbook_success_rate:
+                return self._reject(
+                    reason=AutoApproveReason.LOW_SUCCESS_RATE,
+                    detail=f"Playbook success rate {playbook_success_rate:.0%} < {cfg.min_playbook_success_rate:.0%}",
+                    **playbook_basis,
+                )
+
+            # Check 8: Playbook success count
+            if playbook_success_count < cfg.min_playbook_success_count:
+                return self._reject(
+                    reason=AutoApproveReason.INSUFFICIENT_HISTORY,
+                    detail=f"Playbook success count {playbook_success_count} < {cfg.min_playbook_success_count}",
+                    **playbook_basis,
+                )
+
+        # All checks passed
+        return self._approve(
+            reason=AutoApproveReason.PLAYBOOK_MATCH if playbook else AutoApproveReason.TRUST_SCORE,
+            detail=f"All conditions met: risk={risk_level}, trust={trust_score}, confidence={confidence:.0%}",
+            **playbook_basis,
+        )
+
+    @staticmethod
+    def _normalize_risk_level(proposal_data: dict[str, Any]) -> str:
+        """Return the proposal's risk level as a lower-case string.
+
+        A missing key falls back to ``"medium"``. Enum members are reduced to
+        their ``value``; any other non-string value becomes ``"unknown"``.
+        """
+        raw = proposal_data.get("risk_level", "medium")
+        raw = getattr(raw, "value", raw)
+        return raw.lower() if isinstance(raw, str) else "unknown"
+
+    @staticmethod
+    def _normalize_confidence(proposal_data: dict[str, Any]) -> float:
+        """Return the proposal's AI confidence as a finite float.
+
+        A missing key falls back to 0.5. ``None``, non-numeric values and
+        NaN/inf become 0.0: NaN compares False against every threshold and
+        would otherwise slip through the ``<`` check.
+        """
+        try:
+            value = float(proposal_data.get("confidence", 0.5))
+        except (TypeError, ValueError):
+            return 0.0
+        return value if math.isfinite(value) else 0.0
+
+    def _approve(
+        self,
+        reason: AutoApproveReason,
+        detail: str,
+        **kwargs,
+    ) -> AutoApproveDecision:
+        """Build an approving decision and write it to the audit log."""
+        return self._decide(True, reason, detail, **kwargs)
+
+    def _reject(
+        self,
+        reason: AutoApproveReason,
+        detail: str,
+        **kwargs,
+    ) -> AutoApproveDecision:
+        """Build a rejecting decision and write it to the audit log."""
+        return self._decide(False, reason, detail, **kwargs)
+
+    def _decide(
+        self,
+        approved: bool,
+        reason: AutoApproveReason,
+        detail: str,
+        **kwargs,
+    ) -> AutoApproveDecision:
+        """Create the decision object and, if ``audit_all`` is set, log it.
+
+        Approvals and rejections are both logged at INFO so the audit trail
+        stays complete when DEBUG output is filtered out.
+        """
+        decision = AutoApproveDecision(
+            should_auto_approve=approved,
+            reason=reason,
+            reason_detail=detail,
+            **kwargs,
+        )
+
+        if self.config.audit_all:
+            logger.info(
+                "auto_approve_decision",
+                approved=approved,
+                reason=reason.value,
+                detail=detail,
+                trust_score=kwargs.get("trust_score"),
+            )
+
+        return decision
+
+    def _extract_action_pattern(self, action: str) -> str:
+        """
+        Derive the ``verb:resource_name`` pattern used as the trust-record key.
+
+        Examples:
+        - "kubectl rollout restart deployment/awoooi-api" -> "rollout_restart:awoooi-api"
+        - "kubectl scale deployment/nginx --replicas=3" -> "scale:nginx"
+
+        Returns "unknown" when the command is empty, not a string or too
+        short to contain a verb and a resource.
+        """
+        if not action or not isinstance(action, str):
+            return "unknown"
+
+        parts = action.split()
+        if len(parts) < 3:
+            return "unknown"
+
+        # parts[0] is the binary (kubectl); parts[1] is the verb
+        verb = parts[1]
+        resource_index = 2
+        if verb in self._COMPOUND_VERBS:
+            # e.g. "rollout restart <resource>": fold the sub-command into
+            # the verb so the resource is not mistaken for "restart"
+            if len(parts) < 4:
+                return "unknown"
+            verb = f"{verb}_{parts[2]}"
+            resource_index = 3
+
+        # "deployment/nginx" -> "nginx"; a bare name is kept as is
+        resource_name = parts[resource_index].split("/")[-1]
+        return f"{verb}:{resource_name}"
+
+
+# =============================================================================
+# Singleton
+# =============================================================================
+
+_auto_approve_policy: AutoApprovePolicy | None = None
+
+
+def get_auto_approve_policy() -> AutoApprovePolicy:
+    """Return the process-wide AutoApprovePolicy, creating it on first call."""
+    global _auto_approve_policy
+    if _auto_approve_policy is None:
+        _auto_approve_policy = AutoApprovePolicy()
+    return _auto_approve_policy
diff --git a/apps/api/src/services/decision_manager.py b/apps/api/src/services/decision_manager.py
index 4f0c0143..54854818 100644
--- a/apps/api/src/services/decision_manager.py
+++ b/apps/api/src/services/decision_manager.py
@@ -31,6 +31,7 @@ from src.core.config import settings
 from src.core.redis_client import get_redis
 from src.models.incident import Incident
 from src.models.playbook import SymptomPattern
+from src.services.auto_approve import get_auto_approve_policy
 from src.services.openclaw import get_openclaw
 from src.services.playbook_service import get_playbook_service
 
@@ -422,15 +423,103 @@ class DecisionManager:
         # 4. 儲存最終結果
         await self._save_token(token)
 
-        # 5. Phase 6.5: 推送到 Telegram (非阻塞)
+        # 5. ADR-030 Phase 4: auto-execute when the policy allows it
         if token.state == DecisionState.READY and token.proposal_data:
-            # 使用 asyncio.create_task 非阻塞執行
+            # A failing policy evaluation must never block human review,
+            # so any error here is treated as "not auto-approved".
+            auto_decision = None
+            try:
+                auto_decision = get_auto_approve_policy().evaluate(
+                    proposal_data=token.proposal_data,
+                    playbook=token.proposal_data.get("_matched_playbook"),  # if present
+                )
+            except Exception as e:
+                logger.error(
+                    "auto_approve_evaluation_failed",
+                    incident_id=incident.incident_id,
+                    error=str(e),
+                    exc_info=True,
+                )
+
+            if auto_decision is not None and auto_decision.should_auto_approve:
+                # Auto-approved: skip human review
+                logger.info(
+                    "auto_approve_triggered",
+                    incident_id=incident.incident_id,
+                    reason=auto_decision.reason.value,
+                    detail=auto_decision.reason_detail,
+                )
+                token.state = DecisionState.EXECUTING
+                token.proposal_data["auto_approved"] = True
+                token.proposal_data["auto_approve_reason"] = auto_decision.reason_detail
+                await self._save_token(token)
+
+                # Run the action in the background (non-blocking)
+                asyncio.create_task(
+                    self._auto_execute(incident, token)
+                )
+            else:
+                # Human review required: push to Telegram
+                asyncio.create_task(
+                    _push_decision_to_telegram(incident, token.proposal_data)
+                )
+
+        return token
+
+    async def _auto_execute(self, incident: Incident, token: "DecisionToken") -> None:
+        """
+        ADR-030 Phase 4: execute an action that AutoApprovePolicy approved.
+
+        If execution fails, the token is marked ERROR and the decision is
+        pushed to Telegram for human review. Success bookkeeping runs after
+        the ``try`` block so that a failure while saving the token cannot
+        trigger that fallback for an action that has already been executed.
+        """
+        try:
+            # Deferred imports to avoid a circular dependency
+            from src.services.approval_execution import ApprovalExecutionService
+            from src.models.approval import ApprovalRequest, ApprovalStatus
+
+            # Synthetic, already-approved ApprovalRequest
+            approval = ApprovalRequest(
+                incident_id=incident.incident_id,
+                action=token.proposal_data.get("kubectl_command", ""),
+                status=ApprovalStatus.APPROVED,
+                risk_level=token.proposal_data.get("risk_level", "low"),
+            )
+
+            executor = ApprovalExecutionService()
+            await executor.execute_approved_action(approval)
+        except Exception as e:
+            logger.error(
+                "auto_execute_failed",
+                incident_id=incident.incident_id,
+                error=str(e),
+                exc_info=True,
+            )
+            token.state = DecisionState.ERROR
+            token.error = f"Auto-execute failed: {e}"
+            await self._save_token(token)
+
+            # Fall back to human review
+            # NOTE(review): the token stays in ERROR while approval is requested -
+            # confirm the approval flow accepts that state.
             asyncio.create_task(
                 _push_decision_to_telegram(incident, token.proposal_data)
             )
+            return
 
-        return token
+        # Execution succeeded: record the outcome
+        token.state = DecisionState.COMPLETED
+        token.proposal_data["auto_executed"] = True
+        await self._save_token(token)
+
+        logger.info(
+            "auto_execute_completed",
+            incident_id=incident.incident_id,
+            action=approval.action,
+        )
 
     async def _dual_engine_analyze(
         self,
         incident: Incident,
diff --git a/apps/api/src/services/trust_engine.py b/apps/api/src/services/trust_engine.py
index b58ea923..b643adbf 100644
--- a/apps/api/src/services/trust_engine.py
+++ b/apps/api/src/services/trust_engine.py
@@ -415,3 +415,8 @@ def normalize_action_pattern(
 
 # 全域實例
 trust_engine = TrustScoreManager()
+
+
+def get_trust_manager() -> TrustScoreManager:
+    """Return the module-level TrustScoreManager instance (ADR-030 Phase 4)."""
+    return trust_engine