"""
HITL Approval Models
====================

CISO-101: data models for authorization requests and their sign-offs.

Features:
- State machine (PENDING → APPROVED/REJECTED/EXPIRED)
- Risk level classification (LOW/MEDIUM/HIGH/CRITICAL)
- Multi-Sig sign-off tracking
- Strongly typed validation via Pydantic
"""

from datetime import datetime, timezone
from enum import Enum
from uuid import UUID, uuid4

from pydantic import BaseModel, Field


# =============================================================================
# Enums
# =============================================================================


class ApprovalStatus(str, Enum):
    """
    Authorization request state machine.

    PENDING → APPROVED → EXECUTION_SUCCESS
                       → EXECUTION_FAILED
    PENDING → REJECTED
    PENDING → EXPIRED

    The transitions are documented here only; nothing in this module
    enforces them.
    """

    PENDING = "pending"  # Waiting for sign-off
    APPROVED = "approved"  # Approved (signature count met, ready to execute)
    REJECTED = "rejected"  # Rejected
    EXPIRED = "expired"  # Expired
    EXECUTION_SUCCESS = "execution_success"  # Execution succeeded
    EXECUTION_FAILED = "execution_failed"  # Execution failed


class RiskLevel(str, Enum):
    """
    Risk level - determines how many signers are required.

    - LOW: 0 signers, passes automatically
    - MEDIUM: 1 signer required
    - HIGH: 1 signer required (the trust engine may downgrade it to MEDIUM)
    - CRITICAL: 2 signers, Multi-Sig double sign-off (never downgraded)

    The level-to-signer-count mapping above is the documented policy; it is
    not computed by the models in this module.

    Changelog:
    - 2026-03-25: added HIGH (Phase 16 R2, merged from trust_engine.py)
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"  # Phase 16 R2: merged from trust_engine.py (2026-03-25)
    CRITICAL = "critical"


class DataImpact(str, Enum):
    """Kind of impact an action has on data."""

    NONE = "none"
    READ_ONLY = "read_only"
    WRITE = "write"
    DESTRUCTIVE = "destructive"


# =============================================================================
# Sub-models
# =============================================================================


class BlastRadius(BaseModel):
    """Blast radius - assessment of the scope an action affects."""

    # Number of pods affected; validated to be non-negative.
    affected_pods: int = Field(default=0, ge=0)
    # Free-form downtime estimate; the string format is not validated here.
    estimated_downtime: str = Field(default="0")
    related_services: list[str] = Field(default_factory=list)
    data_impact: DataImpact = Field(default=DataImpact.NONE)


class DryRunCheck(BaseModel):
    """Result of a single dry-run (rehearsal) check."""

    name: str
    passed: bool
    message: str | None = None


class SignatureSource(str, Enum):
    """
    Channel a sign-off came from (Phase 5.4.5: AuditLog extension).

    Used to trace which channel a sign-off was issued through.
    """

    WEB = "web"  # Signed via the Web UI
    TELEGRAM = "telegram"  # Signed via Telegram
    API = "api"  # Direct API call
    SYSTEM = "system"  # Automatic, by the system (LOW risk)


class Signature(BaseModel):
    """
    A single sign-off record.

    Phase 5.4.5: added Telegram audit fields
    - source: channel the sign-off came from
    - telegram_user_id: Telegram User ID (permanent traceability credential)
    - telegram_message_id: Telegram message ID
    """

    id: UUID = Field(default_factory=uuid4)
    signer_id: str = Field(..., description="簽核者 ID")
    signer_name: str = Field(..., description="簽核者名稱")
    # Timezone-aware UTC timestamp taken when the record is created.
    signed_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    comment: str | None = None

    # Phase 5.4.5: Telegram audit trail
    source: SignatureSource = Field(
        default=SignatureSource.WEB,
        description="簽核來源通道 (web/telegram/api/system)",
    )
    telegram_user_id: int | None = Field(
        default=None,
        description="Telegram User ID (永久追溯憑證)",
    )
    telegram_message_id: int | None = Field(
        default=None,
        description="Telegram 訊息 ID",
    )

    # [Chief Architect] json_encoders removed (deprecated in Pydantic v2);
    # native serialization output matches .isoformat().
    # v1.1 2026-04-01 Asia/Taipei


# =============================================================================
# Main Models
# =============================================================================


class ApprovalRequestBase(BaseModel):
    """Fields shared by every authorization request model."""

    action: str = Field(..., description="執行動作描述")
    description: str = Field(..., description="詳細說明")
    risk_level: RiskLevel = Field(..., description="風險等級")
    blast_radius: BlastRadius = Field(default_factory=BlastRadius)
    dry_run_checks: list[DryRunCheck] = Field(default_factory=list)
    requested_by: str = Field(..., description="請求發起者")
    expires_at: datetime | None = Field(default=None, description="到期時間")
    metadata: dict | None = Field(default=None, description="額外元資料")

    # 2026-04-14 Claude Sonnet 4.6: incident_id moved up into the base model
    # so ApprovalRequestCreate can carry it too (fixes the NoneAttr bug
    # from 9b9ff5b).
    incident_id: str | None = Field(default=None, description="關聯的 Incident ID")

    # ADR-083 Phase 3: ID of the matched Playbook (required by the learning loop).
    # 2026-04-15 ogt + Claude Sonnet 4.6 (APAC): Phase 3 fix for propagating
    # matched_playbook_id.
    matched_playbook_id: str | None = Field(
        default=None,
        description="命中的 Playbook ID,供學習服務 EWMA 更新",
    )


class ApprovalRequestCreate(ApprovalRequestBase):
    """Payload for creating an authorization request (API input)."""


class ApprovalRequest(ApprovalRequestBase):
    """Complete authorization request, including sign-off state."""

    id: UUID = Field(default_factory=uuid4)
    status: ApprovalStatus = Field(default=ApprovalStatus.PENDING)
    # ge=0: a negative requirement would make `is_fully_signed` true with no
    # signatures at all. Zero stays valid (LOW risk needs no signer).
    required_signatures: int = Field(..., ge=0, description="所需簽核數")
    signatures: list[Signature] = Field(default_factory=list)
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    resolved_at: datetime | None = Field(default=None, description="解決時間")
    rejection_reason: str | None = Field(default=None)

    # Strategy B: alert-storm convergence
    fingerprint: str | None = Field(default=None, description="告警指紋 Hash")
    hit_count: int = Field(default=1, description="聚合觸發次數")
    last_seen_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
        description="最後觸發時間",
    )
    telegram_message_id: int | None = Field(
        default=None,
        description="Telegram approval card message ID",
    )
    telegram_chat_id: int | None = Field(
        default=None,
        description="Telegram chat ID for the approval card",
    )

    # 2026-04-14 Claude Sonnet 4.6: incident_id now lives on the base model
    # (so ApprovalRequestCreate does not miss the field).

    @property
    def current_signatures(self) -> int:
        """Return the number of signatures collected so far."""
        return len(self.signatures)

    @property
    def is_fully_signed(self) -> bool:
        """Return True once the collected signatures reach the required count."""
        return self.current_signatures >= self.required_signatures

    @property
    def remaining_signatures(self) -> int:
        """Return how many more signatures are needed (never negative)."""
        return max(0, self.required_signatures - self.current_signatures)

    def has_signer(self, signer_id: str) -> bool:
        """Return True if `signer_id` already appears among the signatures."""
        return any(s.signer_id == signer_id for s in self.signatures)

    # [Chief Architect] json_encoders removed (deprecated in Pydantic v2);
    # native serialization output matches .isoformat().
    # v1.1 2026-04-01 Asia/Taipei


# =============================================================================
# API Response Models
# =============================================================================


class ApprovalRequestResponse(BaseModel):
    """API response representation of an authorization request."""

    id: str
    action: str
    description: str
    status: ApprovalStatus
    risk_level: RiskLevel
    blast_radius: BlastRadius
    dry_run_checks: list[DryRunCheck]
    required_signatures: int
    current_signatures: int
    signatures: list[Signature]
    requested_by: str
    created_at: datetime
    expires_at: datetime | None
    resolved_at: datetime | None

    # Strategy B: alert-storm convergence
    fingerprint: str | None = None
    hit_count: int = 1
    last_seen_at: datetime | None = None

    # Phase 6.5: Incident linkage (used to update the Incident status after
    # sign-off)
    incident_id: str | None = None
    matched_playbook_id: str | None = None
    telegram_message_id: int | None = None
    telegram_chat_id: int | None = None
    metadata: dict | None = None

    @classmethod
    def from_approval(cls, approval: ApprovalRequest) -> "ApprovalRequestResponse":
        """
        Build a response from an ApprovalRequest.

        The UUID `id` is rendered as a string and `current_signatures` is
        taken from the request's computed property; every other field is
        copied across unchanged.
        """
        return cls(
            id=str(approval.id),
            action=approval.action,
            description=approval.description,
            status=approval.status,
            risk_level=approval.risk_level,
            blast_radius=approval.blast_radius,
            dry_run_checks=approval.dry_run_checks,
            required_signatures=approval.required_signatures,
            current_signatures=approval.current_signatures,
            signatures=approval.signatures,
            requested_by=approval.requested_by,
            created_at=approval.created_at,
            expires_at=approval.expires_at,
            resolved_at=approval.resolved_at,
            # Strategy B
            fingerprint=approval.fingerprint,
            hit_count=approval.hit_count,
            last_seen_at=approval.last_seen_at,
            # Phase 6.5
            incident_id=approval.incident_id,
            matched_playbook_id=approval.matched_playbook_id,
            telegram_message_id=approval.telegram_message_id,
            telegram_chat_id=approval.telegram_chat_id,
            metadata=approval.metadata,
        )


class SignRequest(BaseModel):
    """Request body for signing (approving) a request."""

    signer_id: str = Field(..., description="簽核者 ID")
    signer_name: str = Field(..., description="簽核者名稱")
    comment: str | None = Field(default=None, description="簽核備註")


class RejectRequest(BaseModel):
    """Request body for rejecting a request."""

    rejector_id: str = Field(..., description="退回者 ID")
    rejector_name: str = Field(..., description="退回者名稱")
    reason: str = Field(..., description="退回原因")


class SignResponse(BaseModel):
    """Response returned after a sign-off attempt."""

    success: bool
    message: str
    approval: ApprovalRequestResponse
    execution_triggered: bool = Field(
        default=False,
        description="是否觸發執行 (當簽核數滿足時)",
    )


class PendingApprovalsResponse(BaseModel):
    """Response listing approvals that are still awaiting sign-off."""

    count: int
    approvals: list[ApprovalRequestResponse]


# =============================================================================
# Phase 11: Bulk Approval
# =============================================================================


class BulkApproveRequest(BaseModel):
    """
    Bulk sign-off request (Phase 11).

    Documented safety restrictions (not enforced by this model itself):
    - CRITICAL risk must not be bulk-approved
    - DESTRUCTIVE data impact must not be bulk-approved
    """

    approval_ids: list[str] = Field(..., description="要批次簽核的 Approval ID 列表")
    signer_id: str = Field(..., description="簽核者 ID")
    signer_name: str = Field(..., description="簽核者名稱")
    comment: str | None = Field(default=None, description="批次簽核備註")


class BulkApproveResult(BaseModel):
    """Outcome of one approval within a bulk sign-off."""

    approval_id: str
    success: bool
    message: str
    execution_triggered: bool = False


class BulkApproveResponse(BaseModel):
    """Aggregate response for a bulk sign-off."""

    total: int = Field(..., description="請求處理的總數")
    succeeded: int = Field(..., description="成功簽核數")
    failed: int = Field(..., description="失敗數")
    skipped: int = Field(..., description="跳過數 (CRITICAL/DESTRUCTIVE)")
    results: list[BulkApproveResult] = Field(..., description="各個 Approval 的處理結果")