Files
awoooi/apps/api/src/services/approval.py
OG T 196d269b92 feat: add all application source code
- apps/api: FastAPI backend with Dockerfile
- apps/web: Next.js frontend with Dockerfile
- apps/sensor: Signal collection agent
- packages: shared packages

Co-Authored-By: Claude <noreply@anthropic.com>
2026-03-22 18:57:44 +08:00

391 lines
12 KiB
Python

"""
Multi-Sig 多重簽核引擎
Phase 2.3: HITL 風險分級審批機制
風險矩陣:
- low: 自動執行,不需人類
- medium: 需要 1 位 admin 或 devops
- high: 需要 2 位管理員
- critical: 必須有 2 人,且其中 1 人必須是 cto 或 ciso
TOCTOU 防護:
- 簽章收集完畢後,執行前強制重新 Dry-Run
- 若 Dry-Run 失敗,清空簽章並拋出例外
"""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Literal
from uuid import UUID
from .dry_run import dry_run_engine, DryRunResult
# ==================== Types ====================
class UserRole(str, Enum):
"""使用者角色"""
VIEWER = "viewer"
DEVELOPER = "developer"
DEVOPS = "devops"
ADMIN = "admin"
CTO = "cto"
CISO = "ciso"
CEO = "ceo"
class ApprovalStatus(str, Enum):
"""審批狀態"""
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
EXPIRED = "expired"
VOIDED = "voided" # TOCTOU 衝突 (保留歷史,符合資安稽核)
@dataclass
class Signature:
"""簽章記錄"""
user_id: str
user_role: UserRole
signed_at: datetime
comment: str | None = None
@dataclass
class ApprovalState:
"""審批狀態 (In-Memory)"""
approval_id: UUID
operation: str
parameters: dict
risk_level: Literal["low", "medium", "high", "critical"]
status: ApprovalStatus = ApprovalStatus.PENDING
signatures: list[Signature] = field(default_factory=list)
created_at: datetime = field(default_factory=datetime.utcnow)
last_dry_run: DryRunResult | None = None
executed_at: datetime | None = None
# ==================== Exceptions ====================
class ApprovalError(Exception):
"""審批錯誤基類"""
pass
class InsufficientPermissionError(ApprovalError):
"""權限不足"""
def __init__(self, role: str, required_roles: list[str]):
self.role = role
self.required_roles = required_roles
super().__init__(
f"Role '{role}' cannot sign. Required: {required_roles}"
)
class DuplicateSignatureError(ApprovalError):
"""重複簽章"""
def __init__(self, user_id: str):
self.user_id = user_id
super().__init__(f"User '{user_id}' has already signed")
class TOCTOUConflictError(ApprovalError):
"""
TOCTOU (Time-of-Check to Time-of-Use) 衝突
當簽章收集完畢,準備執行前重新 Dry-Run 發現狀態已改變
"""
def __init__(self, reason: str, failed_checks: list[str]):
self.reason = reason
self.failed_checks = failed_checks
super().__init__(
f"TOCTOU Conflict: {reason}. Failed checks: {failed_checks}"
)
class ApprovalNotFoundError(ApprovalError):
"""找不到審批項目"""
pass
class ApprovalAlreadyDecidedError(ApprovalError):
"""審批已決定"""
pass
# ==================== Risk Matrix ====================
@dataclass
class SignatureRequirement:
"""簽章需求"""
min_signatures: int
allowed_roles: list[UserRole]
required_roles: list[UserRole] # 至少需要其中一個角色
# 風險矩陣配置
RISK_MATRIX: dict[str, SignatureRequirement] = {
"low": SignatureRequirement(
min_signatures=0, # 自動執行
allowed_roles=[],
required_roles=[],
),
"medium": SignatureRequirement(
min_signatures=1,
allowed_roles=[UserRole.ADMIN, UserRole.DEVOPS, UserRole.CTO, UserRole.CISO, UserRole.CEO],
required_roles=[], # 任一 allowed_role 即可
),
"high": SignatureRequirement(
min_signatures=2,
allowed_roles=[UserRole.ADMIN, UserRole.DEVOPS, UserRole.CTO, UserRole.CISO, UserRole.CEO],
required_roles=[], # 任二 allowed_roles 即可
),
"critical": SignatureRequirement(
min_signatures=2,
allowed_roles=[UserRole.ADMIN, UserRole.CTO, UserRole.CISO, UserRole.CEO],
required_roles=[UserRole.CTO, UserRole.CISO], # 至少需要 CTO 或 CISO 其中一人
),
}
# ==================== Multi-Sig Engine ====================
class MultiSigEngine:
"""
多重簽核引擎
負責:
1. 驗證簽章權限
2. 收集簽章
3. 判斷是否達到閾值
4. TOCTOU 防護 (執行前重新 Dry-Run)
"""
def __init__(self):
# In-memory storage (Phase 3+ 換成 Redis/PostgreSQL)
self._approvals: dict[UUID, ApprovalState] = {}
def create_approval(
self,
approval_id: UUID,
operation: str,
parameters: dict,
risk_level: Literal["low", "medium", "high", "critical"],
) -> ApprovalState:
"""建立新的審批項目"""
state = ApprovalState(
approval_id=approval_id,
operation=operation,
parameters=parameters,
risk_level=risk_level,
)
self._approvals[approval_id] = state
# Low risk 自動執行
if risk_level == "low":
state.status = ApprovalStatus.APPROVED
state.executed_at = datetime.utcnow()
return state
def get_approval(self, approval_id: UUID) -> ApprovalState:
"""取得審批狀態"""
if approval_id not in self._approvals:
raise ApprovalNotFoundError(f"Approval {approval_id} not found")
return self._approvals[approval_id]
def approve_request(
self,
approval_id: UUID,
user_id: str,
user_role: str | UserRole,
comment: str | None = None,
) -> ApprovalState:
"""
提交簽章
Args:
approval_id: 審批項目 ID
user_id: 使用者 ID
user_role: 使用者角色
comment: 簽章備註
Returns:
更新後的 ApprovalState
Raises:
ApprovalNotFoundError: 找不到審批項目
ApprovalAlreadyDecidedError: 審批已決定
InsufficientPermissionError: 權限不足
DuplicateSignatureError: 重複簽章
TOCTOUConflictError: TOCTOU 衝突
"""
# 1. 取得審批狀態
state = self.get_approval(approval_id)
# 2. 檢查是否已決定
if state.status != ApprovalStatus.PENDING:
raise ApprovalAlreadyDecidedError(
f"Approval {approval_id} is already {state.status.value}"
)
# 3. 轉換角色
if isinstance(user_role, str):
try:
user_role = UserRole(user_role.lower())
except ValueError:
raise InsufficientPermissionError(
user_role, [r.value for r in RISK_MATRIX[state.risk_level].allowed_roles]
)
# 4. 檢查角色是否有權簽章
requirement = RISK_MATRIX[state.risk_level]
if user_role not in requirement.allowed_roles:
raise InsufficientPermissionError(
user_role.value,
[r.value for r in requirement.allowed_roles],
)
# 5. 檢查重複簽章
if any(sig.user_id == user_id for sig in state.signatures):
raise DuplicateSignatureError(user_id)
# 6. 新增簽章
signature = Signature(
user_id=user_id,
user_role=user_role,
signed_at=datetime.utcnow(),
comment=comment,
)
state.signatures.append(signature)
# 7. 檢查是否達到閾值
if self._check_threshold_met(state, requirement):
# ⚠️ TOCTOU 防護: 執行前強制重新 Dry-Run
self._verify_and_execute(state)
return state
def reject_request(
self,
approval_id: UUID,
user_id: str,
user_role: str | UserRole,
reason: str | None = None,
) -> ApprovalState:
"""拒絕審批"""
state = self.get_approval(approval_id)
if state.status != ApprovalStatus.PENDING:
raise ApprovalAlreadyDecidedError(
f"Approval {approval_id} is already {state.status.value}"
)
state.status = ApprovalStatus.REJECTED
return state
def _check_threshold_met(
self,
state: ApprovalState,
requirement: SignatureRequirement,
) -> bool:
"""檢查簽章是否達到閾值"""
# 檢查數量
if len(state.signatures) < requirement.min_signatures:
return False
# 檢查必要角色 (critical 需要 CTO 或 CISO)
if requirement.required_roles:
has_required = any(
sig.user_role in requirement.required_roles
for sig in state.signatures
)
if not has_required:
return False
return True
def _verify_and_execute(self, state: ApprovalState) -> None:
"""
⚠️ TOCTOU 防護核心邏輯
當簽章收集完畢,準備執行前:
1. 強制重新執行 Dry-Run
2. 如果 Dry-Run 失敗 → 標記 VOIDED (保留簽章歷史) + 拋出例外
3. 如果 Dry-Run 通過 → 更新狀態為 APPROVED
"""
# 1. 重新執行 Dry-Run
dry_run_result = dry_run_engine.evaluate(
operation=state.operation,
parameters=state.parameters,
user_role="cluster-admin", # TODO: 使用實際簽核者角色
)
# 2. 儲存最新 Dry-Run 結果
state.last_dry_run = dry_run_result
# 3. 檢查 Dry-Run 是否通過
if not dry_run_result.overall_passed:
# ❌ TOCTOU 衝突!狀態已改變
failed_checks = [
c.name for c in dry_run_result.checks if not c.passed
]
# ⚠️ 企業級稽核: 保留簽章歷史,僅標記狀態為 VOIDED
# 不使用 clear(),確保所有審批軌跡可追溯
signature_count = len(state.signatures)
state.status = ApprovalStatus.VOIDED
raise TOCTOUConflictError(
reason=f"Dry-Run failed after {signature_count} signatures collected. "
f"Resource state has changed since initial request. "
f"Approval voided - signatures preserved for audit.",
failed_checks=failed_checks,
)
# 4. ✅ Dry-Run 通過,執行操作
state.status = ApprovalStatus.APPROVED
state.executed_at = datetime.utcnow()
# TODO: 實際執行操作 (呼叫 K8s API / Database)
# executor.execute(state.operation, state.parameters)
def get_signature_status(self, approval_id: UUID) -> dict:
"""取得簽章狀態摘要"""
state = self.get_approval(approval_id)
requirement = RISK_MATRIX[state.risk_level]
# 檢查是否有必要角色
has_required_role = (
not requirement.required_roles or
any(sig.user_role in requirement.required_roles for sig in state.signatures)
)
return {
"approval_id": str(state.approval_id),
"risk_level": state.risk_level,
"status": state.status.value,
"current_signatures": len(state.signatures),
"required_signatures": requirement.min_signatures,
"has_required_role": has_required_role,
"required_roles": [r.value for r in requirement.required_roles],
"signers": [
{
"user_id": sig.user_id,
"role": sig.user_role.value,
"signed_at": sig.signed_at.isoformat(),
}
for sig in state.signatures
],
}
# 全域引擎實例
multi_sig_engine = MultiSigEngine()