Files
awoooi/apps/api/src/services/auto_approve.py
OG T 6c7f648b60
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 18m56s
fix: 3 個飛輪沉默未打通節點 — 統帥截圖盤出
統帥截圖證據 (Telegram MEDIUM 告警仍走人工審核):
INC-20260411-A03B2E / A2BB29 顯示「[規則匹配]」+ action=unknown-service

節點 1: AutoApprovePolicy 擋下規則匹配 (飛輪主因)
  - ADR-073 規則匹配 confidence=0.0 (防偽造)
  - AutoApprovePolicy.min_confidence=0.50 → 擋下
  - 結果: MEDIUM 規則匹配永遠人工審核,飛輪不轉
  修復: auto_approve.py 加 _is_rule_based 判斷
        (is_rule_based / source=expert_system / rule_id / matched_rule)
        → bypass min_confidence 檢查
        → 驗證: should_auto_approve=True 

節點 2: _is_bad_target 漏 unknown-service magic string
  - _resolve_target_from_k8s fallback 產 unknown-service / unknown-pod
  - GAP-A4 Phase 1/2 只擋 'unknown' 而非前綴
  修復: alert_rule_engine.py 加 unknown-/none-/null-/undefined- 前綴黑名單
        → 驗證: 4 個 magic 全 bad 

節點 3: stale_ready_tokens_resend 無時效過濾
  - 截圖是 2026-04-11 (4 天前) 告警
  - 舊 labels 過期,重 process 也產不出新 target
  - 壓爆 Ollama + 污染 Telegram 卡片
  修復: decision_manager.py 跳過 > 3 天的 stale incident
        → skip + log stale_ready_token_skipped_too_old

回歸: 113/113

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-04-15 10:56:48 +08:00

464 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
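"""
Auto-Approve Service - Phase 4 auto-execution policy
====================================================
ADR-030: intelligent auto-remediation system.

Auto-execution conditions (all must hold; the numbers are the
AutoApproveConfig defaults defined in this module):
1. Risk level is low, medium or high. CRITICAL is never auto-executed, and
   any command matching a destructive pattern always goes to a human.
2. Trust score >= min_trust_score (default 0).
3. Confidence >= min_confidence (default 50%); rule-based proposals skip
   this check.
4. When a Playbook is supplied: success rate >= 80% and at least 1
   successful run. A Playbook is only mandatory if require_playbook is set.

Historical note: the original v1.0 policy required risk level = LOW,
confidence >= 90% (or TrustEngine score >= 5), a matching Playbook with a
success rate >= 95%, and at least 3 successful Playbook runs.

Design principles:
- Conservative policy (prefer human review over a wrong automatic execution)
- Complete audit trail
- CRITICAL is never auto-executed

Version: v1.0
Created: 2026-03-26 (Taipei time zone)
"""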
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any
import structlog
from src.models.playbook import Playbook
from src.services.playbook_rag import PlaybookMatch
from src.services.trust_engine import TrustScoreManager, get_trust_manager
# Module-level structlog logger; the policy below emits its
# "auto_approve_decision" audit events through it.
logger = structlog.get_logger(__name__)
# =============================================================================
# Configuration
# =============================================================================
class AutoApproveReason(str, Enum):
    """Reason code attached to every auto-approve decision.

    The first group is used when a proposal is approved automatically, the
    second group when it is handed to a human reviewer.
    """

    # --- Approved automatically ---------------------------------------
    PLAYBOOK_MATCH = "playbook_match"  # a Playbook matched the proposal
    TRUST_SCORE = "trust_score"  # trust score reached the threshold
    LOW_RISK = "low_risk"  # low-risk operation

    # --- Rejected (requires human approval) ---------------------------
    HIGH_RISK = "high_risk"  # risk level too high
    CRITICAL_OPERATION = "critical_operation"  # critical operation
    LOW_TRUST = "low_trust"  # not enough trust
    NO_PLAYBOOK = "no_playbook"  # no matching Playbook
    LOW_SUCCESS_RATE = "low_success_rate"  # Playbook success rate too low
    INSUFFICIENT_HISTORY = "insufficient_history"  # too little execution history
@dataclass
class AutoApproveConfig:
    """Thresholds and feature switches that drive the auto-approve policy."""

    # --- Risk level -----------------------------------------------------
    # 2026-04-11 Claude Sonnet 4.6: ADR-070 full-automation direction -
    # low/medium/high are all allowed. Operations that really need a human
    # are caught by DESTRUCTIVE_PATTERNS (scale=0, delete, drop).
    # Previously ["low", "medium"], which sent every high-risk alert to
    # manual review.
    allowed_risk_levels: list[str] = field(
        default_factory=lambda: ["low", "medium", "high"],
    )

    # --- Trust ----------------------------------------------------------
    # 2026-04-10 Claude Sonnet 4.6: trust_score is held in memory and resets
    # to zero when the Pod restarts, so the minimum was lowered to 0 to let
    # medium-risk operations with confidence >= 0.65 run automatically.
    # History: min_trust_score=1 pushed every alert into the approval flow
    # and nothing was ever auto-remediated.
    min_trust_score: int = 0  # no execution history required (was: 1)

    # 2026-04-11 Claude Sonnet 4.6: ADR-070 full automation - 0.5 is enough.
    # The real risk is guarded by DESTRUCTIVE_PATTERNS + risk_level=critical.
    min_confidence: float = 0.50  # basic AI confidence (was: 0.90, then 0.65)

    # --- Playbook -------------------------------------------------------
    # 2026-04-01 ogt: lowered the entry bar; a single successful run suffices.
    min_playbook_success_rate: float = 0.80  # success rate >= 80% (was: 95%)
    min_playbook_success_count: int = 1  # successful runs >= 1 (was: 3)

    # --- Feature switches -----------------------------------------------
    enabled: bool = True  # master switch
    # A Playbook is not mandatory (was: True); a Gemini analysis may be
    # executed directly.
    require_playbook: bool = False
    audit_all: bool = True  # log every decision
# Default configuration (pragmatic policy - 2026-04-01 ogt, Phase 22 P1).
# AutoApprovePolicy falls back to this shared instance when it is constructed
# without an explicit config.
DEFAULT_CONFIG = AutoApproveConfig()
# =============================================================================
# 破壞性指令攔截清單 (ADR-070, 2026-04-11 Claude Sonnet 4.6)
# C3+M1 修復 (Code Review 2026-04-11): 移至模組常量 + 補全 K8s/Docker 高風險操作
# 原則: 可恢復操作 → 自動執行; 不可逆 / 業務衝擊 → 人工確認
# =============================================================================
# Lower-case substrings that force human approval when found in a proposal's
# command text. Order matters: the first entry that matches is the one
# reported in the rejection detail.
_DESTRUCTIVE_PATTERNS: list[str] = [
    # --- Scale to zero (takes the service down) ---
    "--replicas=0",  # kubectl scale --replicas=0
    '"replicas": 0',  # kubectl patch with a JSON patch
    "'replicas': 0",  # kubectl patch, single-quote variant
    "replicas=0",  # any other replicas=0 spelling
    # --- Kubernetes delete operations ---
    "delete pod",  # force-delete a pod (kubectl delete pod / pods)
    "delete pods",  # plural form
    "delete deployment",  # delete a deployment
    "delete pvc",  # delete a PVC (data loss)
    "delete namespace",  # delete a namespace
    "kubectl drain",  # evict every pod from a node
    "kubectl cordon",  # cordon a node (business impact)
    "kubectl rollout undo",  # roll back a deployment (a human must confirm the version)
    # --- Destructive Docker operations ---
    "docker rm",  # remove a container
    "docker stop",  # stop a container (not the same as restart)
    "docker kill",  # force-kill a container
    # --- Database DDL (irreversible) ---
    "drop table",
    "drop database",
    "truncate table",
    # --- Dangerous shell commands over SSH ---
    "rm -rf",  # recursive delete
    "rm -f /",  # delete under the root directory
]
# =============================================================================
# Data Models
# =============================================================================
@dataclass
class AutoApproveDecision:
"""自動執行決策結果"""
should_auto_approve: bool
reason: AutoApproveReason
reason_detail: str
# 判斷依據
risk_level: str
trust_score: int
confidence: float
playbook_match: PlaybookMatch | None = None
playbook_success_rate: float | None = None
playbook_success_count: int | None = None
# 時間戳
decided_at: datetime = field(default_factory=lambda: datetime.now(UTC))
def to_dict(self) -> dict[str, Any]:
return {
"should_auto_approve": self.should_auto_approve,
"reason": self.reason.value,
"reason_detail": self.reason_detail,
"risk_level": self.risk_level,
"trust_score": self.trust_score,
"confidence": self.confidence,
"playbook_match": self.playbook_match.to_dict() if self.playbook_match else None,
"playbook_success_rate": self.playbook_success_rate,
"playbook_success_count": self.playbook_success_count,
"decided_at": self.decided_at.isoformat(),
}
def to_audit_log(self) -> str:
"""生成審計日誌"""
status = "AUTO_APPROVED" if self.should_auto_approve else "REQUIRES_HUMAN"
return (
f"[{status}] {self.reason.value}: {self.reason_detail} "
f"(risk={self.risk_level}, trust={self.trust_score}, conf={self.confidence:.0%})"
)
# =============================================================================
# Auto-Approve Policy
# =============================================================================
class AutoApprovePolicy:
    """Decide whether a remediation proposal may skip human review.

    ``evaluate`` runs the checks below in order; the first one that fails
    produces a rejection, and a proposal that passes all of them is approved:

    0. the master switch ``config.enabled``;
    1. ``risk_level == "critical"`` is never auto-approved;
    1b. command text containing an entry of ``_DESTRUCTIVE_PATTERNS``;
    2. the risk level must be listed in ``config.allowed_risk_levels``;
    3. the trust score must reach ``config.min_trust_score``;
    4. the confidence must reach ``config.min_confidence`` (skipped for
       rule-based proposals);
    5. a Playbook must be supplied when ``config.require_playbook`` is set;
    6. / 7. when a Playbook is supplied, its success rate and success count
       must reach the configured minimums.
    """

    def __init__(
        self,
        config: AutoApproveConfig | None = None,
        trust_manager: TrustScoreManager | None = None,
    ):
        """Store the configuration and an optional trust manager.

        Args:
            config: Policy thresholds; ``DEFAULT_CONFIG`` is used when omitted.
            trust_manager: Trust-score source; resolved lazily through
                ``get_trust_manager()`` when omitted.
        """
        self.config = config or DEFAULT_CONFIG
        self._trust_manager = trust_manager

    @property
    def trust_manager(self) -> TrustScoreManager:
        """Return the trust manager, fetching it on first access."""
        if self._trust_manager is None:
            self._trust_manager = get_trust_manager()
        return self._trust_manager

    def evaluate(
        self,
        proposal_data: dict[str, Any],
        playbook: Playbook | None = None,
        playbook_match: PlaybookMatch | None = None,
    ) -> AutoApproveDecision:
        """Evaluate whether a proposal can be executed without human approval.

        Args:
            proposal_data: Proposal fields. Read here: ``risk_level``,
                ``confidence``, ``action``, ``kubectl_command``,
                ``is_rule_based``, ``source``, ``rule_id``, ``matched_rule``.
                Values that are missing or ``None`` fall back to safe defaults
                instead of raising.
            playbook: Matched Playbook (optional).
            playbook_match: RAG match result (optional); only echoed into the
                returned decision.

        Returns:
            AutoApproveDecision carrying the verdict and its reason.
        """
        # ---------- Normalised inputs ----------
        risk_level = self._normalize_risk_level(proposal_data.get("risk_level"))
        # A missing confidence means "rule match" and is treated as 0.0.
        confidence = self._normalize_confidence(proposal_data.get("confidence"))
        kubectl_command = str(proposal_data.get("kubectl_command") or "")
        action = str(proposal_data.get("action") or "") or kubectl_command

        # Trust score recorded for this kind of action
        trust_record = self.trust_manager.get_trust_record(
            self._extract_action_pattern(action)
        )
        trust_score = trust_record.score if trust_record else 0

        # Playbook statistics
        playbook_success_rate = playbook.success_rate if playbook else None
        playbook_success_count = playbook.success_count if playbook else None

        # Fields shared by every decision, and the extended set used once the
        # Playbook statistics are relevant.
        basics: dict[str, Any] = {
            "risk_level": risk_level,
            "trust_score": trust_score,
            "confidence": confidence,
        }
        with_playbook: dict[str, Any] = {
            **basics,
            "playbook_match": playbook_match,
            "playbook_success_rate": playbook_success_rate,
            "playbook_success_count": playbook_success_count,
        }

        # ---------- Checks ----------
        # Check 0: master switch. LOW_TRUST is the reason code this path has
        # always reported, so it is kept for callers that inspect it.
        if not self.config.enabled:
            return self._reject(
                reason=AutoApproveReason.LOW_TRUST,
                detail="Auto-approve is disabled",
                **basics,
            )

        # Check 1: CRITICAL is never auto-executed.
        if risk_level == "critical":
            return self._reject(
                reason=AutoApproveReason.CRITICAL_OPERATION,
                detail="CRITICAL operations always require human approval",
                **basics,
            )

        # Check 1b: destructive commands (ADR-070). Even at low/medium risk
        # these need a human: recoverable -> automatic, irreversible or
        # business-impacting -> manual. Both command fields are scanned so a
        # descriptive ``action`` cannot hide a destructive ``kubectl_command``.
        destructive = self._find_destructive_pattern(
            _DESTRUCTIVE_PATTERNS, action, kubectl_command
        )
        if destructive is not None:
            return self._reject(
                reason=AutoApproveReason.CRITICAL_OPERATION,
                detail=f"Destructive pattern detected: '{destructive}' in action — requires human approval",
                **basics,
            )

        # Check 2: the risk level must be explicitly allowed.
        if risk_level not in self.config.allowed_risk_levels:
            return self._reject(
                reason=AutoApproveReason.HIGH_RISK,
                detail=f"Risk level '{risk_level}' not in allowed list {self.config.allowed_risk_levels}",
                **basics,
            )

        # Check 3: trust score.
        if trust_score < self.config.min_trust_score:
            return self._reject(
                reason=AutoApproveReason.LOW_TRUST,
                detail=f"Trust score {trust_score} < {self.config.min_trust_score}",
                **basics,
            )

        # Check 4: AI confidence.
        # Rule matches carry a fixed confidence of 0.0 (ADR-073,
        # anti-forgery) and would always fail here, but the YAML rules are
        # human-reviewed, so rule-based proposals bypass min_confidence.
        is_rule_based = (
            proposal_data.get("is_rule_based") is True
            or proposal_data.get("source") == "expert_system"
            or bool(proposal_data.get("rule_id"))
            or bool(proposal_data.get("matched_rule"))
        )
        if not is_rule_based and confidence < self.config.min_confidence:
            return self._reject(
                reason=AutoApproveReason.LOW_TRUST,
                detail=f"Confidence {confidence:.0%} < {self.config.min_confidence:.0%}",
                **basics,
            )

        # Check 5: a Playbook is present when one is required.
        if self.config.require_playbook and playbook is None:
            return self._reject(
                reason=AutoApproveReason.NO_PLAYBOOK,
                detail="No matching Playbook found",
                **basics,
            )

        # Check 6: Playbook success rate.
        if (
            playbook_success_rate is not None
            and playbook_success_rate < self.config.min_playbook_success_rate
        ):
            return self._reject(
                reason=AutoApproveReason.LOW_SUCCESS_RATE,
                detail=f"Playbook success rate {playbook_success_rate:.0%} < {self.config.min_playbook_success_rate:.0%}",
                **with_playbook,
            )

        # Check 7: Playbook success count.
        if (
            playbook_success_count is not None
            and playbook_success_count < self.config.min_playbook_success_count
        ):
            return self._reject(
                reason=AutoApproveReason.INSUFFICIENT_HISTORY,
                detail=f"Playbook success count {playbook_success_count} < {self.config.min_playbook_success_count}",
                **with_playbook,
            )

        # ---------- Every check passed ----------
        return self._approve(
            reason=AutoApproveReason.PLAYBOOK_MATCH if playbook else AutoApproveReason.TRUST_SCORE,
            detail=f"All conditions met: risk={risk_level}, trust={trust_score}, confidence={confidence:.0%}",
            **with_playbook,
        )

    @staticmethod
    def _normalize_risk_level(raw: Any) -> str:
        """Return the lower-cased risk level; a missing/None value means "medium"."""
        if raw is None:
            return "medium"
        return str(raw).lower()

    @staticmethod
    def _normalize_confidence(raw: Any) -> float:
        """Coerce a confidence value to float; None, junk and NaN become 0.0."""
        if raw is None:
            return 0.0
        try:
            value = float(raw)
        except (TypeError, ValueError):
            return 0.0
        # NaN compares False against every threshold and would otherwise slip
        # through the ``confidence < min_confidence`` gate.
        if value != value:
            return 0.0
        return value

    @staticmethod
    def _find_destructive_pattern(patterns: list[str], *commands: str) -> str | None:
        """Return the first pattern found in any command, or None.

        Each command is lower-cased and matched both as given and with runs
        of whitespace collapsed to single spaces, so extra spaces or tabs
        cannot be used to dodge a pattern.
        """
        for command in commands:
            lowered = command.lower()
            collapsed = " ".join(lowered.split())
            for pattern in patterns:
                if pattern in lowered or pattern in collapsed:
                    return pattern
        return None

    def _approve(
        self,
        reason: AutoApproveReason,
        detail: str,
        **kwargs,
    ) -> AutoApproveDecision:
        """Build an approving decision and, if auditing is on, log it at info level."""
        decision = AutoApproveDecision(
            should_auto_approve=True,
            reason=reason,
            reason_detail=detail,
            **kwargs,
        )
        if self.config.audit_all:
            logger.info(
                "auto_approve_decision",
                approved=True,
                reason=reason.value,
                detail=detail,
                trust_score=kwargs.get("trust_score"),
            )
        return decision

    def _reject(
        self,
        reason: AutoApproveReason,
        detail: str,
        **kwargs,
    ) -> AutoApproveDecision:
        """Build a rejecting decision and, if auditing is on, log it at debug level."""
        decision = AutoApproveDecision(
            should_auto_approve=False,
            reason=reason,
            reason_detail=detail,
            **kwargs,
        )
        if self.config.audit_all:
            logger.debug(
                "auto_approve_decision",
                approved=False,
                reason=reason.value,
                detail=detail,
                trust_score=kwargs.get("trust_score"),
            )
        return decision

    def _extract_action_pattern(self, action: str) -> str:
        """Derive the trust-record key ``"<verb>:<name>"`` from a command string.

        Examples:
            - "kubectl rollout restart deployment/awoooi-api"
              -> "rollout_restart:awoooi-api"
            - "kubectl scale deployment/nginx --replicas=3" -> "scale:nginx"

        Returns "unknown" for an empty command or one with fewer than three
        whitespace-separated tokens.
        """
        if not action:
            return "unknown"
        parts = action.split()
        if len(parts) < 3:
            return "unknown"

        # kubectl <verb> <resource>/<name>
        verb, target = parts[1], parts[2]
        # kubectl rollout <subcommand> <resource>/<name>: fold the subcommand
        # into the verb so the key names the workload, not the subcommand.
        if verb == "rollout" and len(parts) >= 4:
            verb = f"rollout_{target}"
            target = parts[3]

        resource_name = target.rsplit("/", 1)[-1]
        return f"{verb}:{resource_name}"
# =============================================================================
# Singleton
# =============================================================================
# Cached policy instance; created on first request.
_auto_approve_policy: AutoApprovePolicy | None = None


def get_auto_approve_policy() -> AutoApprovePolicy:
    """Return the shared AutoApprovePolicy, building it on the first call."""
    global _auto_approve_policy
    policy = _auto_approve_policy
    if policy is None:
        policy = AutoApprovePolicy()
        _auto_approve_policy = policy
    return policy