Files
awoooi/apps/api/src/models/approval.py
Your Name cd17a67774
Some checks failed
CD Pipeline / tests (push) Successful in 1m21s
Code Review / ai-code-review (push) Successful in 13s
Type Sync Check / check-type-sync (push) Failing after 40s
CD Pipeline / build-and-deploy (push) Successful in 5m22s
CD Pipeline / post-deploy-checks (push) Successful in 2m19s
fix(alerts): surface legacy hitl backlog
2026-05-31 13:16:22 +08:00

322 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
HITL Approval Models
====================
CISO-101: 授權請求與簽核資料模型
Features:
- 狀態機 (PENDING → APPROVED/REJECTED/EXPIRED)
- 風險等級判定 (LOW/MEDIUM/CRITICAL)
- Multi-Sig 簽核追蹤
- Pydantic 強型別驗證
"""
from datetime import datetime, timezone
from enum import Enum
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
# =============================================================================
# Enums
# =============================================================================
class ApprovalStatus(str, Enum):
    """Lifecycle states of an approval request.

    Intended state machine (documented here; transitions are not enforced
    by this enum):

        PENDING -> APPROVED -> EXECUTION_SUCCESS
                            -> EXECUTION_FAILED
        PENDING -> REJECTED
        PENDING -> EXPIRED

    Members are ``str`` subclasses, so they compare equal to their raw
    string values.
    """

    # Waiting for sign-off.
    PENDING = "pending"
    # Approved: the signature quota is met and the action is ready to run.
    APPROVED = "approved"
    # Rejected by a reviewer.
    REJECTED = "rejected"
    # Expired before being resolved.
    EXPIRED = "expired"
    # The approved action ran successfully.
    EXECUTION_SUCCESS = "execution_success"
    # The approved action ran and failed.
    EXECUTION_FAILED = "execution_failed"
class RiskLevel(str, Enum):
    """Risk level of a requested action; drives how many signers are needed.

    Signer policy as documented by the original author (the mapping itself
    is not implemented in this enum):

    - LOW: 0 signers, passes automatically
    - MEDIUM: 1 signer
    - HIGH: 1 signer (the trust engine may downgrade it to MEDIUM)
    - CRITICAL: 2 signers, Multi-Sig dual sign-off (never downgraded)

    Changelog:
    - 2026-03-25: added HIGH (Phase 16 R2, merged from trust_engine.py)
    """

    LOW = "low"
    MEDIUM = "medium"
    # Phase 16 R2: merged in from trust_engine.py (2026-03-25).
    HIGH = "high"
    CRITICAL = "critical"
class DataImpact(str, Enum):
    """Kind of impact an action has on data."""

    NONE = "none"
    READ_ONLY = "read_only"
    WRITE = "write"
    DESTRUCTIVE = "destructive"
# =============================================================================
# Sub-models
# =============================================================================
class BlastRadius(BaseModel):
    """Blast radius: an assessment of how far an action's impact reaches."""

    # Number of affected pods; validated to be non-negative.
    affected_pods: int = Field(ge=0, default=0)
    # Free-form downtime estimate, kept as a string; defaults to "0".
    estimated_downtime: str = "0"
    # Names of related services; every instance gets its own fresh list.
    related_services: list[str] = Field(default_factory=list)
    # Kind of data impact; defaults to no impact.
    data_impact: DataImpact = DataImpact.NONE
class DryRunCheck(BaseModel):
    """Outcome of a single dry-run (rehearsal) check."""

    # Name of the check.
    name: str
    # Whether the check passed.
    passed: bool
    # Optional message attached to the result.
    message: str | None = None
class SignatureSource(str, Enum):
    """Channel a sign-off originated from (Phase 5.4.5: AuditLog extension).

    Recorded so a sign-off can be traced back to the channel that issued it.
    """

    # Signed through the Web UI.
    WEB = "web"
    # Signed through Telegram.
    TELEGRAM = "telegram"
    # Signed by calling the API directly.
    API = "api"
    # Signed automatically by the system (LOW risk).
    SYSTEM = "system"
class Signature(BaseModel):
    """A single sign-off record.

    Phase 5.4.5 added the Telegram audit fields:
    - source: channel the sign-off came from
    - telegram_user_id: Telegram user ID (permanent traceability credential)
    - telegram_message_id: Telegram message ID
    """

    # Unique ID of this signature record, generated per instance.
    id: UUID = Field(default_factory=uuid4)
    # Required signer identity (ID and display name).
    signer_id: str = Field(description="簽核者 ID")
    signer_name: str = Field(description="簽核者名稱")
    # Timezone-aware UTC timestamp taken when the record is created.
    signed_at: datetime = Field(
        default_factory=lambda: datetime.now(tz=timezone.utc),
    )
    comment: str | None = None

    # Phase 5.4.5: Telegram audit trail.
    source: SignatureSource = Field(
        SignatureSource.WEB,
        description="簽核來源通道 (web/telegram/api/system)",
    )
    telegram_user_id: int | None = Field(
        None,
        description="Telegram User ID (永久追溯憑證)",
    )
    telegram_message_id: int | None = Field(
        None,
        description="Telegram 訊息 ID",
    )
    # [Chief Architect] json_encoders removed (deprecated in Pydantic v2); the
    # native serialization output matches .isoformat(). v1.1 2026-04-01 Asia/Taipei
# =============================================================================
# Main Models
# =============================================================================
class ApprovalRequestBase(BaseModel):
    """Fields shared by every approval-request model."""

    # Required descriptive fields.
    action: str = Field(description="執行動作描述")
    description: str = Field(description="詳細說明")
    risk_level: RiskLevel = Field(description="風險等級")

    # Impact assessment; both default to an empty value built per instance.
    blast_radius: BlastRadius = Field(default_factory=BlastRadius)
    dry_run_checks: list[DryRunCheck] = Field(default_factory=list)

    requested_by: str = Field(description="請求發起者")
    expires_at: datetime | None = Field(None, description="到期時間")
    metadata: dict | None = Field(None, description="額外元資料")

    # 2026-04-14 Claude Sonnet 4.6: incident_id was moved up into the base
    # model so ApprovalRequestCreate can carry it too (fixes the NoneAttr bug
    # from 9b9ff5b).
    incident_id: str | None = Field(None, description="關聯的 Incident ID")

    # ADR-083 Phase 3: ID of the matched Playbook; required by the learning loop.
    # 2026-04-15 ogt + Claude Sonnet 4.6 (APAC): Phase 3 fix for propagating
    # matched_playbook_id.
    matched_playbook_id: str | None = Field(
        None,
        description="命中的 Playbook ID供學習服務 EWMA 更新",
    )
class ApprovalRequestCreate(ApprovalRequestBase):
    """Payload for creating an approval request (API input).

    Adds nothing to ApprovalRequestBase; it exists as a distinct input type.
    """
class ApprovalRequest(ApprovalRequestBase):
    """Complete approval request, including sign-off and tracking state."""

    id: UUID = Field(default_factory=uuid4)
    status: ApprovalStatus = ApprovalStatus.PENDING
    # Number of signatures needed; required, no default.
    required_signatures: int = Field(description="所需簽核數")
    signatures: list[Signature] = Field(default_factory=list)

    # Timezone-aware UTC timestamps; each default is evaluated per instance.
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(tz=timezone.utc),
    )
    updated_at: datetime = Field(
        default_factory=lambda: datetime.now(tz=timezone.utc),
    )
    resolved_at: datetime | None = Field(None, description="解決時間")
    rejection_reason: str | None = None

    # Strategy B: alert-storm convergence.
    fingerprint: str | None = Field(None, description="告警指紋 Hash")
    hit_count: int = Field(1, description="聚合觸發次數")
    last_seen_at: datetime = Field(
        default_factory=lambda: datetime.now(tz=timezone.utc),
        description="最後觸發時間",
    )

    telegram_message_id: int | None = Field(
        None,
        description="Telegram approval card message ID",
    )
    telegram_chat_id: int | None = Field(
        None,
        description="Telegram chat ID for the approval card",
    )
    # 2026-04-14 Claude Sonnet 4.6: incident_id now lives on the base model
    # (so ApprovalRequestCreate is not missing the field).

    @property
    def current_signatures(self) -> int:
        """Number of signatures collected so far."""
        collected = self.signatures
        return len(collected)

    @property
    def is_fully_signed(self) -> bool:
        """Whether the collected signatures meet the required count."""
        return self.required_signatures <= self.current_signatures

    @property
    def remaining_signatures(self) -> int:
        """Number of signatures still needed; never negative."""
        outstanding = self.required_signatures - self.current_signatures
        return outstanding if outstanding > 0 else 0

    def has_signer(self, signer_id: str) -> bool:
        """Return True if ``signer_id`` appears among the recorded signatures."""
        for recorded in self.signatures:
            if recorded.signer_id == signer_id:
                return True
        return False

    # [Chief Architect] json_encoders removed (deprecated in Pydantic v2); the
    # native serialization output matches .isoformat(). v1.1 2026-04-01 Asia/Taipei
# =============================================================================
# API Response Models
# =============================================================================
class ApprovalRequestResponse(BaseModel):
    """API response representation of an approval request."""

    # The approval's ID rendered as a string.
    id: str
    action: str
    description: str
    status: ApprovalStatus
    risk_level: RiskLevel
    blast_radius: BlastRadius
    dry_run_checks: list[DryRunCheck]
    required_signatures: int
    current_signatures: int
    signatures: list[Signature]
    requested_by: str
    created_at: datetime
    expires_at: datetime | None
    resolved_at: datetime | None

    # Strategy B: alert-storm convergence.
    fingerprint: str | None = None
    hit_count: int = 1
    last_seen_at: datetime | None = None

    # Phase 6.5: Incident link (used to update the Incident after sign-off).
    incident_id: str | None = None
    matched_playbook_id: str | None = None
    telegram_message_id: int | None = None
    telegram_chat_id: int | None = None
    metadata: dict | None = None

    @classmethod
    def from_approval(cls, approval: ApprovalRequest) -> "ApprovalRequestResponse":
        """Build a response from an ApprovalRequest.

        Every field is read from the attribute of the same name on
        ``approval``, except ``id``, which is converted to ``str``.
        ``current_signatures`` is read from the source model's property.
        """
        passthrough = (
            "action",
            "description",
            "status",
            "risk_level",
            "blast_radius",
            "dry_run_checks",
            "required_signatures",
            "current_signatures",
            "signatures",
            "requested_by",
            "created_at",
            "expires_at",
            "resolved_at",
            # Strategy B
            "fingerprint",
            "hit_count",
            "last_seen_at",
            # Phase 6.5
            "incident_id",
            "matched_playbook_id",
            "telegram_message_id",
            "telegram_chat_id",
            "metadata",
        )
        copied = {name: getattr(approval, name) for name in passthrough}
        return cls(id=str(approval.id), **copied)
class SignRequest(BaseModel):
    """Request body for signing (approving) an approval request."""

    # Required signer identity.
    signer_id: str = Field(description="簽核者 ID")
    signer_name: str = Field(description="簽核者名稱")
    # Optional remark attached to the sign-off.
    comment: str | None = Field(None, description="簽核備註")
class RejectRequest(BaseModel):
    """Request body for rejecting an approval request; all fields required."""

    rejector_id: str = Field(description="退回者 ID")
    rejector_name: str = Field(description="退回者名稱")
    reason: str = Field(description="退回原因")
class SignResponse(BaseModel):
    """Response returned after a sign-off attempt."""

    success: bool
    message: str
    # State of the approval carried in the response.
    approval: ApprovalRequestResponse
    # Whether execution was triggered (per the field description: when the
    # signature count is met); defaults to False.
    execution_triggered: bool = Field(
        False,
        description="是否觸發執行 (當簽核數滿足時)",
    )
class PendingApprovalsResponse(BaseModel):
    """Response listing approval requests that are awaiting sign-off."""

    # Count reported alongside the list.
    count: int
    approvals: list[ApprovalRequestResponse]
# =============================================================================
# Phase 11: 批次處理 (Bulk Approval)
# =============================================================================
class BulkApproveRequest(BaseModel):
    """Bulk sign-off request (Phase 11).

    Safety limits documented by the original author (not enforced by this
    model):
    - CRITICAL risk must not be bulk-approved
    - DESTRUCTIVE data impact must not be bulk-approved
    """

    # IDs of the approvals to sign in this batch.
    approval_ids: list[str] = Field(description="要批次簽核的 Approval ID 列表")
    signer_id: str = Field(description="簽核者 ID")
    signer_name: str = Field(description="簽核者名稱")
    # Optional remark applied to the batch sign-off.
    comment: str | None = Field(None, description="批次簽核備註")
class BulkApproveResult(BaseModel):
    """Outcome for one approval inside a bulk sign-off."""

    approval_id: str
    success: bool
    message: str
    # Defaults to False when not supplied.
    execution_triggered: bool = False
class BulkApproveResponse(BaseModel):
    """Aggregate response for a bulk sign-off; every field is required."""

    # Total number of approvals the request asked to process.
    total: int = Field(description="請求處理的總數")
    # Number signed successfully.
    succeeded: int = Field(description="成功簽核數")
    # Number that failed.
    failed: int = Field(description="失敗數")
    # Number skipped (CRITICAL / DESTRUCTIVE, per the field description).
    skipped: int = Field(description="跳過數 (CRITICAL/DESTRUCTIVE)")
    # Per-approval results.
    results: list[BulkApproveResult] = Field(description="各個 Approval 的處理結果")