feat(api): ADR-030 Phase 4 自動執行機制

實作低風險操作自動執行策略:

1. auto_approve.py - 自動執行策略服務
   - AutoApprovePolicy: 評估是否可自動執行
   - 條件 (全部滿足): LOW 風險 + 信任分數 >= 5 + AI 信心度 >= 90% + Playbook 成功率 >= 95% 且成功次數 >= 3
   - CRITICAL 永遠不自動執行
   - 完整審計追蹤

2. trust_engine.py - 新增 singleton
   - get_trust_manager(): 取得全域 TrustScoreManager

3. decision_manager.py - 整合自動執行 (Tier 3 紅區)
   - Step 5 加入 AutoApprovePolicy 判斷
   - 條件滿足時跳過 Telegram,直接執行
   - _auto_execute(): 自動執行邏輯
   - 失敗時 fallback 到人工審核

流程:
Incident → 分析 → AutoApprovePolicy 評估
  ├─ 可自動執行 → 直接執行 → 完成
  └─ 需人工審核 → Telegram 通知 → 等待批准

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-26 22:13:10 +08:00
parent 17ee8838be
commit ce7f8a1b23
3 changed files with 475 additions and 4 deletions

View File

@@ -0,0 +1,391 @@
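"""
Auto-Approve Service - Phase 4 auto-execution policy
====================================================
ADR-030: Intelligent auto-remediation system

Auto-execution conditions (ALL must hold before a proposal is let through):
1. Risk level = LOW (CRITICAL is never auto-executed)
2. TrustEngine score >= 5
3. AI confidence >= 90%
4. A matching Playbook exists and its success rate is >= 95%
5. The Playbook has succeeded at least 3 times

Design principles:
- Conservative policy (prefer human review over a wrong automatic execution)
- Complete audit trail
- CRITICAL operations are never auto-executed

Version: v1.0
Created: 2026-03-26 (Taipei time zone)
"""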
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any
import structlog
from src.models.approval import RiskLevel
from src.models.playbook import Playbook
from src.services.playbook_rag import PlaybookMatch
from src.services.trust_engine import TrustScoreManager, get_trust_manager
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
# =============================================================================
# Configuration
# =============================================================================
class AutoApproveReason(str, Enum):
    """Reason codes attached to an auto-approve decision (approval or refusal)."""

    # --- Codes used when a proposal is auto-executed ---
    PLAYBOOK_MATCH = "playbook_match"  # a Playbook matched
    TRUST_SCORE = "trust_score"  # the trust score met the threshold
    LOW_RISK = "low_risk"  # low-risk operation

    # --- Codes used when auto-execution is refused ---
    HIGH_RISK = "high_risk"  # risk level too high
    CRITICAL_OPERATION = "critical_operation"  # critical operation
    LOW_TRUST = "low_trust"  # not enough trust
    NO_PLAYBOOK = "no_playbook"  # no matching Playbook
    LOW_SUCCESS_RATE = "low_success_rate"  # Playbook success rate too low
    INSUFFICIENT_HISTORY = "insufficient_history"  # not enough execution history
@dataclass
class AutoApproveConfig:
    """Thresholds and feature switches consumed by the auto-approve policy."""

    # Risk levels that are eligible for automatic execution.
    allowed_risk_levels: list[str] = field(default_factory=lambda: ["low"])

    # Trust thresholds.
    min_trust_score: int = 5  # TrustEngine score threshold
    min_confidence: float = 0.90  # AI confidence threshold

    # Playbook thresholds.
    min_playbook_success_rate: float = 0.95  # success rate >= 95%
    min_playbook_success_count: int = 3  # successful runs >= 3

    # Feature switches.
    enabled: bool = True  # master switch
    require_playbook: bool = True  # whether a Playbook match is mandatory
    audit_all: bool = True  # whether every decision is logged


# Default configuration (conservative policy).
DEFAULT_CONFIG = AutoApproveConfig()
# =============================================================================
# Data Models
# =============================================================================
@dataclass
class AutoApproveDecision:
"""自動執行決策結果"""
should_auto_approve: bool
reason: AutoApproveReason
reason_detail: str
# 判斷依據
risk_level: str
trust_score: int
confidence: float
playbook_match: PlaybookMatch | None = None
playbook_success_rate: float | None = None
playbook_success_count: int | None = None
# 時間戳
decided_at: datetime = field(default_factory=lambda: datetime.now(UTC))
def to_dict(self) -> dict[str, Any]:
return {
"should_auto_approve": self.should_auto_approve,
"reason": self.reason.value,
"reason_detail": self.reason_detail,
"risk_level": self.risk_level,
"trust_score": self.trust_score,
"confidence": self.confidence,
"playbook_match": self.playbook_match.to_dict() if self.playbook_match else None,
"playbook_success_rate": self.playbook_success_rate,
"playbook_success_count": self.playbook_success_count,
"decided_at": self.decided_at.isoformat(),
}
def to_audit_log(self) -> str:
"""生成審計日誌"""
status = "AUTO_APPROVED" if self.should_auto_approve else "REQUIRES_HUMAN"
return (
f"[{status}] {self.reason.value}: {self.reason_detail} "
f"(risk={self.risk_level}, trust={self.trust_score}, conf={self.confidence:.0%})"
)
# =============================================================================
# Auto-Approve Policy
# =============================================================================
class AutoApprovePolicy:
    """Policy that decides whether a proposal may skip human review.

    Core principles:
    - CRITICAL operations are never auto-executed.
    - A sufficient record of successful Playbook runs is required.
    - The trust score must reach the configured threshold.
    - The risk level must be in the allowed list (LOW by default).
    """

    def __init__(
        self,
        config: AutoApproveConfig | None = None,
        trust_manager: TrustScoreManager | None = None,
    ):
        """Store the configuration and an optional pre-built trust manager.

        Args:
            config: Thresholds and switches; ``DEFAULT_CONFIG`` when omitted.
            trust_manager: Trust manager to use; when omitted it is obtained
                from ``get_trust_manager()`` on first access.
        """
        self.config = config or DEFAULT_CONFIG
        self._trust_manager = trust_manager

    @property
    def trust_manager(self) -> TrustScoreManager:
        """Return the trust manager, resolving it lazily on first access."""
        if self._trust_manager is None:
            self._trust_manager = get_trust_manager()
        return self._trust_manager

    def evaluate(
        self,
        proposal_data: dict[str, Any],
        playbook: Playbook | None = None,
        playbook_match: PlaybookMatch | None = None,
    ) -> AutoApproveDecision:
        """Evaluate whether a proposal may be executed without human approval.

        Checks run in order and the first failing one produces the rejection.
        Any input that is missing or unreadable is treated conservatively so
        the proposal falls back to human review instead of raising.

        Args:
            proposal_data: Proposal fields; reads ``risk_level``,
                ``confidence``, ``action`` and ``kubectl_command``.
            playbook: The matched Playbook, if any. Its ``success_rate`` and
                ``success_count`` attributes are read.
            playbook_match: The RAG match result, if any (recorded only).

        Returns:
            An AutoApproveDecision carrying the verdict, the reason and the
            evidence used.
        """
        risk_level = self._normalize_risk_level(proposal_data.get("risk_level"))
        confidence = self._coerce_confidence(proposal_data.get("confidence", 0.5))
        action = proposal_data.get("action", "") or proposal_data.get("kubectl_command", "")
        action_pattern = self._extract_action_pattern(action)

        # Trust score recorded for this action pattern (0 when there is no record).
        trust_record = self.trust_manager.get_trust_record(action_pattern)
        trust_score = trust_record.score if trust_record else 0

        # getattr keeps an unexpected playbook shape from raising; missing
        # statistics are rejected further down instead.
        playbook_success_rate = getattr(playbook, "success_rate", None)
        playbook_success_count = getattr(playbook, "success_count", None)

        base_context: dict[str, Any] = {
            "risk_level": risk_level,
            "trust_score": trust_score,
            "confidence": confidence,
        }
        playbook_context: dict[str, Any] = {
            **base_context,
            "playbook_match": playbook_match,
            "playbook_success_rate": playbook_success_rate,
            "playbook_success_count": playbook_success_count,
        }

        # Condition 0: master switch.
        # NOTE(review): reuses LOW_TRUST because the enum has no dedicated
        # "disabled" reason code.
        if not self.config.enabled:
            return self._reject(
                reason=AutoApproveReason.LOW_TRUST,
                detail="Auto-approve is disabled",
                **base_context,
            )

        # Condition 1: CRITICAL is never auto-executed, whatever the config says.
        if risk_level == "critical":
            return self._reject(
                reason=AutoApproveReason.CRITICAL_OPERATION,
                detail="CRITICAL operations always require human approval",
                **base_context,
            )

        # Condition 2: risk level must be in the allowed list.
        if risk_level not in self.config.allowed_risk_levels:
            return self._reject(
                reason=AutoApproveReason.HIGH_RISK,
                detail=f"Risk level '{risk_level}' not in allowed list {self.config.allowed_risk_levels}",
                **base_context,
            )

        # Condition 3: trust score.
        if trust_score < self.config.min_trust_score:
            return self._reject(
                reason=AutoApproveReason.LOW_TRUST,
                detail=f"Trust score {trust_score} < {self.config.min_trust_score}",
                **base_context,
            )

        # Condition 4: AI confidence.
        if confidence < self.config.min_confidence:
            return self._reject(
                reason=AutoApproveReason.LOW_TRUST,
                detail=f"Confidence {confidence:.0%} < {self.config.min_confidence:.0%}",
                **base_context,
            )

        # Condition 5: a Playbook match, when the config requires one.
        if self.config.require_playbook and playbook is None:
            return self._reject(
                reason=AutoApproveReason.NO_PLAYBOOK,
                detail="No matching Playbook found",
                **base_context,
            )

        if playbook is not None:
            # A playbook without statistics cannot prove conditions 6 and 7,
            # so it must not be waved through.
            if playbook_success_rate is None or playbook_success_count is None:
                return self._reject(
                    reason=AutoApproveReason.INSUFFICIENT_HISTORY,
                    detail="Playbook success statistics are unavailable",
                    **playbook_context,
                )

            # Condition 6: Playbook success rate.
            if playbook_success_rate < self.config.min_playbook_success_rate:
                return self._reject(
                    reason=AutoApproveReason.LOW_SUCCESS_RATE,
                    detail=f"Playbook success rate {playbook_success_rate:.0%} < {self.config.min_playbook_success_rate:.0%}",
                    **playbook_context,
                )

            # Condition 7: Playbook success count.
            if playbook_success_count < self.config.min_playbook_success_count:
                return self._reject(
                    reason=AutoApproveReason.INSUFFICIENT_HISTORY,
                    detail=f"Playbook success count {playbook_success_count} < {self.config.min_playbook_success_count}",
                    **playbook_context,
                )

        # All conditions passed.
        return self._approve(
            reason=(
                AutoApproveReason.PLAYBOOK_MATCH
                if playbook is not None
                else AutoApproveReason.TRUST_SCORE
            ),
            detail=f"All conditions met: risk={risk_level}, trust={trust_score}, confidence={confidence:.0%}",
            **playbook_context,
        )

    @staticmethod
    def _normalize_risk_level(raw: Any) -> str:
        """Return the risk level as a lowercase string ("medium" when absent).

        Accepts plain strings and enum-like objects exposing ``.value``.
        """
        if raw is None:
            return "medium"
        return str(getattr(raw, "value", raw)).lower()

    @staticmethod
    def _coerce_confidence(raw: Any) -> float:
        """Return the confidence as a float, or 0.0 when it is unusable.

        0.0 is below any sensible threshold, so unreadable values lead to
        human review.
        """
        try:
            value = float(raw)
        except (TypeError, ValueError):
            return 0.0
        # NaN compares False against every threshold and would slip through.
        return 0.0 if value != value else value

    def _approve(
        self,
        reason: AutoApproveReason,
        detail: str,
        **kwargs,
    ) -> AutoApproveDecision:
        """Build an approving decision and log it at info level when auditing."""
        decision = AutoApproveDecision(
            should_auto_approve=True,
            reason=reason,
            reason_detail=detail,
            **kwargs,
        )
        if self.config.audit_all:
            logger.info(
                "auto_approve_decision",
                approved=True,
                reason=reason.value,
                detail=detail,
                trust_score=kwargs.get("trust_score"),
            )
        return decision

    def _reject(
        self,
        reason: AutoApproveReason,
        detail: str,
        **kwargs,
    ) -> AutoApproveDecision:
        """Build a rejecting decision and log it at debug level when auditing."""
        decision = AutoApproveDecision(
            should_auto_approve=False,
            reason=reason,
            reason_detail=detail,
            **kwargs,
        )
        if self.config.audit_all:
            logger.debug(
                "auto_approve_decision",
                approved=False,
                reason=reason.value,
                detail=detail,
                trust_score=kwargs.get("trust_score"),
            )
        return decision

    def _extract_action_pattern(self, action: str) -> str:
        """Derive the trust-lookup pattern ``<verb>:<resource name>`` from a command.

        Examples:
        - "kubectl rollout restart deployment/awoooi-api" -> "rollout_restart:awoooi-api"
        - "kubectl scale deployment/nginx --replicas=3" -> "scale:nginx"

        Returns "unknown" when the command is empty, not a string, or too
        short to contain both a verb and a resource.
        """
        if not action or not isinstance(action, str):
            return "unknown"

        parts = action.split()
        if len(parts) < 3:
            return "unknown"

        # kubectl <verb> <resource>/<name>
        verb = parts[1]
        resource_index = 2

        # `rollout` takes a subcommand before the resource. Without this the
        # subcommand would be mistaken for the resource and every rollout
        # would share one trust record.
        if verb == "rollout":
            if len(parts) < 4:
                return "unknown"
            verb = f"rollout_{parts[2]}"
            resource_index = 3

        # "deployment/nginx" -> "nginx"; a bare token is kept as-is.
        resource_name = parts[resource_index].split("/")[-1]
        return f"{verb}:{resource_name}"
# =============================================================================
# Singleton
# =============================================================================
# Shared policy instance; stays None until the accessor below is first called.
_auto_approve_policy: AutoApprovePolicy | None = None


def get_auto_approve_policy() -> AutoApprovePolicy:
    """Return the shared AutoApprovePolicy, creating it on first use."""
    global _auto_approve_policy
    if _auto_approve_policy is not None:
        return _auto_approve_policy
    _auto_approve_policy = AutoApprovePolicy()
    return _auto_approve_policy

View File

@@ -31,6 +31,7 @@ from src.core.config import settings
from src.core.redis_client import get_redis
from src.models.incident import Incident
from src.models.playbook import SymptomPattern
from src.services.auto_approve import get_auto_approve_policy
from src.services.openclaw import get_openclaw
from src.services.playbook_service import get_playbook_service
@@ -422,15 +423,89 @@ class DecisionManager:
# 4. 儲存最終結果
await self._save_token(token)
# 5. Phase 6.5: 推送到 Telegram (非阻塞)
# 5. ADR-030 Phase 4: 自動執行判斷
if token.state == DecisionState.READY and token.proposal_data:
# 使用 asyncio.create_task 非阻塞執行
# 評估是否可以自動執行
auto_policy = get_auto_approve_policy()
auto_decision = auto_policy.evaluate(
proposal_data=token.proposal_data,
playbook=token.proposal_data.get("_matched_playbook"), # 如果有
)
if auto_decision.should_auto_approve:
# 自動執行 (跳過人工審核)
logger.info(
"auto_approve_triggered",
incident_id=incident.incident_id,
reason=auto_decision.reason.value,
detail=auto_decision.reason_detail,
)
token.state = DecisionState.EXECUTING
token.proposal_data["auto_approved"] = True
token.proposal_data["auto_approve_reason"] = auto_decision.reason_detail
await self._save_token(token)
# 觸發自動執行 (非阻塞)
asyncio.create_task(
self._auto_execute(incident, token)
)
else:
# 需人工審核: 推送到 Telegram
asyncio.create_task(
_push_decision_to_telegram(incident, token.proposal_data)
)
return token
async def _auto_execute(self, incident: Incident, token: "DecisionToken") -> None:
    """ADR-030 Phase 4: execute an operation that was auto-approved.

    Meant to be called only after AutoApprovePolicy decided the proposal may
    run without human review. On success the token is marked COMPLETED. If
    execution fails, the token is marked ERROR and the decision is pushed to
    Telegram so a human can review it.

    Args:
        incident: The incident being remediated.
        token: The decision token; its ``proposal_data`` supplies the command
            and risk level, and its state is updated in place.
    """
    try:
        # Deferred imports to avoid a circular dependency.
        from src.models.approval import ApprovalRequest, ApprovalStatus
        from src.services.approval_execution import ApprovalExecutionService

        command = token.proposal_data.get("kubectl_command", "")
        if not command:
            # Never hand an empty action to the executor; go to human review.
            raise ValueError("proposal has no kubectl_command to execute")

        # Synthetic, already-approved request so the normal execution path is reused.
        approval = ApprovalRequest(
            incident_id=incident.incident_id,
            action=command,
            status=ApprovalStatus.APPROVED,
            risk_level=token.proposal_data.get("risk_level", "low"),
        )

        # NOTE(review): the return value is ignored; confirm that
        # execute_approved_action signals failure by raising.
        executor = ApprovalExecutionService()
        await executor.execute_approved_action(approval)
    except Exception as e:
        logger.error(
            "auto_execute_failed",
            incident_id=incident.incident_id,
            error=str(e),
        )
        token.state = DecisionState.ERROR
        token.error = f"Auto-execute failed: {e}"
        try:
            await self._save_token(token)
        except Exception as save_error:
            # A persistence problem must not prevent the human fallback below.
            logger.error(
                "auto_execute_state_save_failed",
                incident_id=incident.incident_id,
                error=str(save_error),
            )

        # Fall back to human review. Awaited directly because this coroutine
        # already runs in the background, and an unreferenced task could be
        # garbage-collected before it finishes.
        try:
            await _push_decision_to_telegram(incident, token.proposal_data)
        except Exception as notify_error:
            logger.error(
                "auto_execute_fallback_notify_failed",
                incident_id=incident.incident_id,
                error=str(notify_error),
            )
        return

    # Execution succeeded. This runs outside the try block so that a failure
    # to persist the new state is not mistaken for a failed execution and
    # re-sent for approval.
    token.state = DecisionState.COMPLETED
    token.proposal_data["auto_executed"] = True
    try:
        await self._save_token(token)
    except Exception as save_error:
        logger.error(
            "auto_execute_state_save_failed",
            incident_id=incident.incident_id,
            error=str(save_error),
        )
    logger.info(
        "auto_execute_completed",
        incident_id=incident.incident_id,
        action=approval.action,
    )
async def _dual_engine_analyze(
self,
incident: Incident,

View File

@@ -415,3 +415,8 @@ def normalize_action_pattern(
# Module-wide instance.
trust_engine = TrustScoreManager()


def get_trust_manager() -> TrustScoreManager:
    """Return the module-level TrustScoreManager singleton (ADR-030 Phase 4)."""
    return trust_engine