Files
awoooi/apps/api/src/models/approval.py
OG T 7da64eaad2
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 19m7s
Type Sync Check / check-type-sync (push) Failing after 1m18s
feat(Phase 3): 學習閉環重建 — 三根因修復 + 2x EWMA + Evolver Agent
ADR-083 Phase 3 學習閉環重建:

**三根因修復**
- approval_execution.py: fire-and-forget create_task → await asyncio.wait_for(timeout=30) × 2
  (成功路徑 L265 + 失敗路徑 L353,超時記錄 learning_trigger_timeout metric,主流程不 crash)
- models/approval.py: ApprovalRequestBase 新增 matched_playbook_id 欄位
- decision_manager.py: _auto_execute 建立 ApprovalRequest 時填充 matched_playbook_id
- learning_service.py: 雙路徑查找 _matched_pb_id(matched_playbook_id + metadata fallback)

**2x EWMA 負向強化**
- models/playbook.py: 新增 trust_score: float = 0.3(EWMA 動態信任度欄位)
- repositories/playbook_repository.py: update_stats 加 EWMA
  成功: trust = 0.9 × old + 0.1 × 1.0
  失敗: trust = 0.8 × old + 0.2 × 0.0(衰減速度 2x)
  trust < 0.1 → log warning,等 Evolver 封存

**Evolver Agent(新建)**
- services/playbook_evolver.py: 三功能全靜態規則
  1. 低信任封存: trust < 0.1 → DEPRECATED
  2. 休眠封存: 30d 未使用 AND trust < 0.5 → DEPRECATED
  3. 相似合併: 症狀 Jaccard > 0.9 → 保留高 trust,封存低 trust
  AIOPS_P3_EVOLVER_ENABLED=False 預設關閉

**文件**
- ADR-083 學習閉環重建
- MASTER §8 Phase 3 完工記錄

AIOPS_P3_ENABLED=False(預設),骨架就位等統帥批准開啟

Co-Authored-By: Claude Sonnet 4.6(亞太)<noreply@anthropic.com>
2026-04-15 14:01:37 +08:00

312 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
HITL Approval Models
====================
CISO-101: 授權請求與簽核資料模型
Features:
- 狀態機 (PENDING → APPROVED/REJECTED/EXPIRED)
- 風險等級判定 (LOW/MEDIUM/CRITICAL)
- Multi-Sig 簽核追蹤
- Pydantic 強型別驗證
"""
from datetime import datetime, timezone
from enum import Enum
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
# =============================================================================
# Enums
# =============================================================================
class ApprovalStatus(str, Enum):
"""
授權請求狀態機
PENDING → APPROVED → EXECUTION_SUCCESS
→ EXECUTION_FAILED
PENDING → REJECTED
PENDING → EXPIRED
"""
PENDING = "pending" # 等待簽核
APPROVED = "approved" # 已批准 (滿足簽核數,準備執行)
REJECTED = "rejected" # 已拒絕
EXPIRED = "expired" # 已過期
EXECUTION_SUCCESS = "execution_success" # 執行成功
EXECUTION_FAILED = "execution_failed" # 執行失敗
class RiskLevel(str, Enum):
"""
風險等級 - 決定所需簽核人數
- LOW: 0 人,自動放行
- MEDIUM: 需 1 人簽核
- HIGH: 需 1 人簽核 (信任引擎可降級至 MEDIUM)
- CRITICAL: 需 2 人 Multi-Sig 雙重簽核 (永不降級)
變更紀錄:
- 2026-03-25: 新增 HIGH (Phase 16 R2 合併自 trust_engine.py)
"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high" # Phase 16 R2: 從 trust_engine.py 合併 (2026-03-25)
CRITICAL = "critical"
class DataImpact(str, Enum):
"""資料影響類型"""
NONE = "none"
READ_ONLY = "read_only"
WRITE = "write"
DESTRUCTIVE = "destructive"
# =============================================================================
# Sub-models
# =============================================================================
class BlastRadius(BaseModel):
"""爆炸半徑 - 影響範圍評估"""
affected_pods: int = Field(default=0, ge=0)
estimated_downtime: str = Field(default="0")
related_services: list[str] = Field(default_factory=list)
data_impact: DataImpact = Field(default=DataImpact.NONE)
class DryRunCheck(BaseModel):
"""Dry-Run 預演檢查結果"""
name: str
passed: bool
message: str | None = None
class SignatureSource(str, Enum):
"""
簽核來源通道 (Phase 5.4.5: AuditLog 擴充)
用於追溯簽核是從哪個通道發起
"""
WEB = "web" # Web UI 簽核
TELEGRAM = "telegram" # Telegram 簽核
API = "api" # API 直接呼叫
SYSTEM = "system" # 系統自動 (LOW 風險)
class Signature(BaseModel):
"""
簽核記錄
Phase 5.4.5: 新增 Telegram 審計欄位
- source: 簽核來源通道
- telegram_user_id: Telegram User ID (永久追溯憑證)
- telegram_message_id: Telegram 訊息 ID
"""
id: UUID = Field(default_factory=uuid4)
signer_id: str = Field(..., description="簽核者 ID")
signer_name: str = Field(..., description="簽核者名稱")
signed_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
comment: str | None = None
# Phase 5.4.5: Telegram 審計軌跡
source: SignatureSource = Field(
default=SignatureSource.WEB,
description="簽核來源通道 (web/telegram/api/system)",
)
telegram_user_id: int | None = Field(
default=None,
description="Telegram User ID (永久追溯憑證)",
)
telegram_message_id: int | None = Field(
default=None,
description="Telegram 訊息 ID",
)
# [首席架構師] 移除 json_encoders (Pydantic v2 已 deprecated),原生序列化輸出格式與 .isoformat() 一致 v1.1 2026-04-01 Asia/Taipei
# =============================================================================
# Main Models
# =============================================================================
class ApprovalRequestBase(BaseModel):
"""授權請求基礎模型"""
action: str = Field(..., description="執行動作描述")
description: str = Field(..., description="詳細說明")
risk_level: RiskLevel = Field(..., description="風險等級")
blast_radius: BlastRadius = Field(default_factory=BlastRadius)
dry_run_checks: list[DryRunCheck] = Field(default_factory=list)
requested_by: str = Field(..., description="請求發起者")
expires_at: datetime | None = Field(default=None, description="到期時間")
metadata: dict | None = Field(default=None, description="額外元資料")
# 2026-04-14 Claude Sonnet 4.6: 上移 incident_id 到 Base
# 讓 ApprovalRequestCreate 也能攜帶(修 9b9ff5b 的 NoneAttr bug
incident_id: str | None = Field(default=None, description="關聯的 Incident ID")
# ADR-083 Phase 3: 命中的 Playbook ID學習迴路必填
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3 matched_playbook_id 傳遞修復
matched_playbook_id: str | None = Field(default=None, description="命中的 Playbook ID供學習服務 EWMA 更新")
class ApprovalRequestCreate(ApprovalRequestBase):
"""建立授權請求 (API 輸入)"""
pass
class ApprovalRequest(ApprovalRequestBase):
"""完整授權請求模型"""
id: UUID = Field(default_factory=uuid4)
status: ApprovalStatus = Field(default=ApprovalStatus.PENDING)
required_signatures: int = Field(..., description="所需簽核數")
signatures: list[Signature] = Field(default_factory=list)
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
resolved_at: datetime | None = Field(default=None, description="解決時間")
rejection_reason: str | None = Field(default=None)
# 戰略 B: 告警風暴收斂
fingerprint: str | None = Field(default=None, description="告警指紋 Hash")
hit_count: int = Field(default=1, description="聚合觸發次數")
last_seen_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), description="最後觸發時間")
# 2026-04-14 Claude Sonnet 4.6: incident_id 已移至 Base避免 ApprovalRequestCreate 缺欄位)
@property
def current_signatures(self) -> int:
"""目前已收集的簽核數"""
return len(self.signatures)
@property
def is_fully_signed(self) -> bool:
"""是否已滿足所需簽核數"""
return self.current_signatures >= self.required_signatures
@property
def remaining_signatures(self) -> int:
"""還需要的簽核數"""
return max(0, self.required_signatures - self.current_signatures)
def has_signer(self, signer_id: str) -> bool:
"""檢查某人是否已簽核"""
return any(s.signer_id == signer_id for s in self.signatures)
# [首席架構師] 移除 json_encoders (Pydantic v2 已 deprecated),原生序列化輸出格式與 .isoformat() 一致 v1.1 2026-04-01 Asia/Taipei
# =============================================================================
# API Response Models
# =============================================================================
class ApprovalRequestResponse(BaseModel):
"""授權請求 API 回應"""
id: str
action: str
description: str
status: ApprovalStatus
risk_level: RiskLevel
blast_radius: BlastRadius
dry_run_checks: list[DryRunCheck]
required_signatures: int
current_signatures: int
signatures: list[Signature]
requested_by: str
created_at: datetime
expires_at: datetime | None
resolved_at: datetime | None
# 戰略 B: 告警風暴收斂
fingerprint: str | None = None
hit_count: int = 1
last_seen_at: datetime | None = None
# Phase 6.5: Incident 關聯 (用於簽核後更新 Incident 狀態)
metadata: dict | None = None
@classmethod
def from_approval(cls, approval: ApprovalRequest) -> "ApprovalRequestResponse":
"""從 ApprovalRequest 轉換"""
return cls(
id=str(approval.id),
action=approval.action,
description=approval.description,
status=approval.status,
risk_level=approval.risk_level,
blast_radius=approval.blast_radius,
dry_run_checks=approval.dry_run_checks,
required_signatures=approval.required_signatures,
current_signatures=approval.current_signatures,
signatures=approval.signatures,
requested_by=approval.requested_by,
created_at=approval.created_at,
expires_at=approval.expires_at,
resolved_at=approval.resolved_at,
# 戰略 B
fingerprint=approval.fingerprint,
hit_count=approval.hit_count,
last_seen_at=approval.last_seen_at,
# Phase 6.5
metadata=approval.metadata,
)
class SignRequest(BaseModel):
"""簽核請求"""
signer_id: str = Field(..., description="簽核者 ID")
signer_name: str = Field(..., description="簽核者名稱")
comment: str | None = Field(default=None, description="簽核備註")
class RejectRequest(BaseModel):
"""退回請求"""
rejector_id: str = Field(..., description="退回者 ID")
rejector_name: str = Field(..., description="退回者名稱")
reason: str = Field(..., description="退回原因")
class SignResponse(BaseModel):
"""簽核回應"""
success: bool
message: str
approval: ApprovalRequestResponse
execution_triggered: bool = Field(
default=False,
description="是否觸發執行 (當簽核數滿足時)"
)
class PendingApprovalsResponse(BaseModel):
"""待簽核清單回應"""
count: int
approvals: list[ApprovalRequestResponse]
# =============================================================================
# Phase 11: 批次處理 (Bulk Approval)
# =============================================================================
class BulkApproveRequest(BaseModel):
"""
批次簽核請求 (Phase 11)
安全限制:
- CRITICAL 風險禁止批次核准
- DESTRUCTIVE 資料影響禁止批次核准
"""
approval_ids: list[str] = Field(..., description="要批次簽核的 Approval ID 列表")
signer_id: str = Field(..., description="簽核者 ID")
signer_name: str = Field(..., description="簽核者名稱")
comment: str | None = Field(default=None, description="批次簽核備註")
class BulkApproveResult(BaseModel):
"""單個批次簽核結果"""
approval_id: str
success: bool
message: str
execution_triggered: bool = False
class BulkApproveResponse(BaseModel):
"""批次簽核回應"""
total: int = Field(..., description="請求處理的總數")
succeeded: int = Field(..., description="成功簽核數")
failed: int = Field(..., description="失敗數")
skipped: int = Field(..., description="跳過數 (CRITICAL/DESTRUCTIVE)")
results: list[BulkApproveResult] = Field(..., description="各個 Approval 的處理結果")