Files
awoooi/apps/api/src/models/approval.py
OG T 59c3dfb910
Some checks failed
CD Pipeline / build-and-deploy (push) Successful in 12m12s
Type Sync Check / check-type-sync (push) Failing after 52s
fix(models): approval.py 改用 timezone.utc 相容 Python 3.10
CI runner 用 Python 3.10,datetime.UTC 是 3.11 才加入。
改用 datetime.timezone.utc 全版本相容,修復 CI type-sync 全量失敗。

# 2026-04-06 ogt: root cause — CI Python 3.10 無法 import UTC

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-06 12:19:23 +08:00

307 lines
11 KiB
Python

"""
HITL Approval Models
====================
CISO-101: 授權請求與簽核資料模型
Features:
- 狀態機 (PENDING → APPROVED/REJECTED/EXPIRED)
- 風險等級判定 (LOW/MEDIUM/CRITICAL)
- Multi-Sig 簽核追蹤
- Pydantic 強型別驗證
"""
from datetime import datetime, timezone
from enum import Enum
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
# =============================================================================
# Enums
# =============================================================================
class ApprovalStatus(str, Enum):
"""
授權請求狀態機
PENDING → APPROVED → EXECUTION_SUCCESS
→ EXECUTION_FAILED
PENDING → REJECTED
PENDING → EXPIRED
"""
PENDING = "pending" # 等待簽核
APPROVED = "approved" # 已批准 (滿足簽核數,準備執行)
REJECTED = "rejected" # 已拒絕
EXPIRED = "expired" # 已過期
EXECUTION_SUCCESS = "execution_success" # 執行成功
EXECUTION_FAILED = "execution_failed" # 執行失敗
class RiskLevel(str, Enum):
"""
風險等級 - 決定所需簽核人數
- LOW: 0 人,自動放行
- MEDIUM: 需 1 人簽核
- HIGH: 需 1 人簽核 (信任引擎可降級至 MEDIUM)
- CRITICAL: 需 2 人 Multi-Sig 雙重簽核 (永不降級)
變更紀錄:
- 2026-03-25: 新增 HIGH (Phase 16 R2 合併自 trust_engine.py)
"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high" # Phase 16 R2: 從 trust_engine.py 合併 (2026-03-25)
CRITICAL = "critical"
class DataImpact(str, Enum):
"""資料影響類型"""
NONE = "none"
READ_ONLY = "read_only"
WRITE = "write"
DESTRUCTIVE = "destructive"
# =============================================================================
# Sub-models
# =============================================================================
class BlastRadius(BaseModel):
"""爆炸半徑 - 影響範圍評估"""
affected_pods: int = Field(default=0, ge=0)
estimated_downtime: str = Field(default="0")
related_services: list[str] = Field(default_factory=list)
data_impact: DataImpact = Field(default=DataImpact.NONE)
class DryRunCheck(BaseModel):
"""Dry-Run 預演檢查結果"""
name: str
passed: bool
message: str | None = None
class SignatureSource(str, Enum):
"""
簽核來源通道 (Phase 5.4.5: AuditLog 擴充)
用於追溯簽核是從哪個通道發起
"""
WEB = "web" # Web UI 簽核
TELEGRAM = "telegram" # Telegram 簽核
API = "api" # API 直接呼叫
SYSTEM = "system" # 系統自動 (LOW 風險)
class Signature(BaseModel):
"""
簽核記錄
Phase 5.4.5: 新增 Telegram 審計欄位
- source: 簽核來源通道
- telegram_user_id: Telegram User ID (永久追溯憑證)
- telegram_message_id: Telegram 訊息 ID
"""
id: UUID = Field(default_factory=uuid4)
signer_id: str = Field(..., description="簽核者 ID")
signer_name: str = Field(..., description="簽核者名稱")
signed_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
comment: str | None = None
# Phase 5.4.5: Telegram 審計軌跡
source: SignatureSource = Field(
default=SignatureSource.WEB,
description="簽核來源通道 (web/telegram/api/system)",
)
telegram_user_id: int | None = Field(
default=None,
description="Telegram User ID (永久追溯憑證)",
)
telegram_message_id: int | None = Field(
default=None,
description="Telegram 訊息 ID",
)
# [首席架構師] 移除 json_encoders (Pydantic v2 已 deprecated),原生序列化輸出格式與 .isoformat() 一致 v1.1 2026-04-01 Asia/Taipei
# =============================================================================
# Main Models
# =============================================================================
class ApprovalRequestBase(BaseModel):
"""授權請求基礎模型"""
action: str = Field(..., description="執行動作描述")
description: str = Field(..., description="詳細說明")
risk_level: RiskLevel = Field(..., description="風險等級")
blast_radius: BlastRadius = Field(default_factory=BlastRadius)
dry_run_checks: list[DryRunCheck] = Field(default_factory=list)
requested_by: str = Field(..., description="請求發起者")
expires_at: datetime | None = Field(default=None, description="到期時間")
metadata: dict | None = Field(default=None, description="額外元資料")
class ApprovalRequestCreate(ApprovalRequestBase):
"""建立授權請求 (API 輸入)"""
pass
class ApprovalRequest(ApprovalRequestBase):
"""完整授權請求模型"""
id: UUID = Field(default_factory=uuid4)
status: ApprovalStatus = Field(default=ApprovalStatus.PENDING)
required_signatures: int = Field(..., description="所需簽核數")
signatures: list[Signature] = Field(default_factory=list)
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
resolved_at: datetime | None = Field(default=None, description="解決時間")
rejection_reason: str | None = Field(default=None)
# 戰略 B: 告警風暴收斂
fingerprint: str | None = Field(default=None, description="告警指紋 Hash")
hit_count: int = Field(default=1, description="聚合觸發次數")
last_seen_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc), description="最後觸發時間")
# 2026-04-06 ogt: 關聯 Incident — 萃取 Playbook 與 KM 寫入必須知道 incident_id
incident_id: str | None = Field(default=None, description="關聯的 Incident ID")
@property
def current_signatures(self) -> int:
"""目前已收集的簽核數"""
return len(self.signatures)
@property
def is_fully_signed(self) -> bool:
"""是否已滿足所需簽核數"""
return self.current_signatures >= self.required_signatures
@property
def remaining_signatures(self) -> int:
"""還需要的簽核數"""
return max(0, self.required_signatures - self.current_signatures)
def has_signer(self, signer_id: str) -> bool:
"""檢查某人是否已簽核"""
return any(s.signer_id == signer_id for s in self.signatures)
# [首席架構師] 移除 json_encoders (Pydantic v2 已 deprecated),原生序列化輸出格式與 .isoformat() 一致 v1.1 2026-04-01 Asia/Taipei
# =============================================================================
# API Response Models
# =============================================================================
class ApprovalRequestResponse(BaseModel):
"""授權請求 API 回應"""
id: str
action: str
description: str
status: ApprovalStatus
risk_level: RiskLevel
blast_radius: BlastRadius
dry_run_checks: list[DryRunCheck]
required_signatures: int
current_signatures: int
signatures: list[Signature]
requested_by: str
created_at: datetime
expires_at: datetime | None
resolved_at: datetime | None
# 戰略 B: 告警風暴收斂
fingerprint: str | None = None
hit_count: int = 1
last_seen_at: datetime | None = None
# Phase 6.5: Incident 關聯 (用於簽核後更新 Incident 狀態)
metadata: dict | None = None
@classmethod
def from_approval(cls, approval: ApprovalRequest) -> "ApprovalRequestResponse":
"""從 ApprovalRequest 轉換"""
return cls(
id=str(approval.id),
action=approval.action,
description=approval.description,
status=approval.status,
risk_level=approval.risk_level,
blast_radius=approval.blast_radius,
dry_run_checks=approval.dry_run_checks,
required_signatures=approval.required_signatures,
current_signatures=approval.current_signatures,
signatures=approval.signatures,
requested_by=approval.requested_by,
created_at=approval.created_at,
expires_at=approval.expires_at,
resolved_at=approval.resolved_at,
# 戰略 B
fingerprint=approval.fingerprint,
hit_count=approval.hit_count,
last_seen_at=approval.last_seen_at,
# Phase 6.5
metadata=approval.metadata,
)
class SignRequest(BaseModel):
"""簽核請求"""
signer_id: str = Field(..., description="簽核者 ID")
signer_name: str = Field(..., description="簽核者名稱")
comment: str | None = Field(default=None, description="簽核備註")
class RejectRequest(BaseModel):
"""退回請求"""
rejector_id: str = Field(..., description="退回者 ID")
rejector_name: str = Field(..., description="退回者名稱")
reason: str = Field(..., description="退回原因")
class SignResponse(BaseModel):
"""簽核回應"""
success: bool
message: str
approval: ApprovalRequestResponse
execution_triggered: bool = Field(
default=False,
description="是否觸發執行 (當簽核數滿足時)"
)
class PendingApprovalsResponse(BaseModel):
"""待簽核清單回應"""
count: int
approvals: list[ApprovalRequestResponse]
# =============================================================================
# Phase 11: 批次處理 (Bulk Approval)
# =============================================================================
class BulkApproveRequest(BaseModel):
"""
批次簽核請求 (Phase 11)
安全限制:
- CRITICAL 風險禁止批次核准
- DESTRUCTIVE 資料影響禁止批次核准
"""
approval_ids: list[str] = Field(..., description="要批次簽核的 Approval ID 列表")
signer_id: str = Field(..., description="簽核者 ID")
signer_name: str = Field(..., description="簽核者名稱")
comment: str | None = Field(default=None, description="批次簽核備註")
class BulkApproveResult(BaseModel):
"""單個批次簽核結果"""
approval_id: str
success: bool
message: str
execution_triggered: bool = False
class BulkApproveResponse(BaseModel):
"""批次簽核回應"""
total: int = Field(..., description="請求處理的總數")
succeeded: int = Field(..., description="成功簽核數")
failed: int = Field(..., description="失敗數")
skipped: int = Field(..., description="跳過數 (CRITICAL/DESTRUCTIVE)")
results: list[BulkApproveResult] = Field(..., description="各個 Approval 的處理結果")