Files
awoooi/apps/api/src/core/trust_engine.py
OG T 4f1c8ae473 fix(ci): Resolve Python and TypeScript lint errors
- Fix 35 Python ruff errors (B904, F841, E722, E741, B007, B008)
- Add eslint config for lewooogo-core package
- Update pyproject.toml to new ruff lint config format
- Relax frontend eslint rules to warnings for unused vars
- Allow console.* for debugging (TODO: unified logger)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-24 09:20:56 +08:00

405 lines
11 KiB
Python

"""
Trust Engine - 風險判定與 Multi-Sig 簽核邏輯
==========================================
CISO-101: 信任引擎核心實作
風險等級與簽核需求:
- LOW: 0 人,自動放行 (如 scale up)
- MEDIUM: 需 1 人簽核 (如 delete pod)
- CRITICAL: 需 2 人 Multi-Sig 雙重簽核 (如 DROP TABLE)
Features:
- 自動風險分類
- 簽核數驗證
- 狀態轉換控制
"""
from collections.abc import Callable
from datetime import UTC, datetime
from uuid import UUID
from src.models.approval import (
ApprovalRequest,
ApprovalRequestCreate,
ApprovalStatus,
BlastRadius,
DataImpact,
RiskLevel,
Signature,
)
# =============================================================================
# Risk Classification Rules
# =============================================================================
# Keyword tables used to classify an action description by risk.
# Matching is done by substring against the lower-cased action text.

# Destructive / irreversible operations.
CRITICAL_KEYWORDS = [
    "drop", "delete database", "truncate", "rm -rf",
    "destroy", "format", "wipe", "purge all",
]

# Disruptive operations.
MEDIUM_KEYWORDS = [
    "delete", "remove", "stop", "restart",
    "rollback", "downgrade", "migrate",
]

# Additive / routine operations.
LOW_KEYWORDS = [
    "scale", "update config", "patch",
    "upgrade", "add", "create",
]
# =============================================================================
# Signature Requirements
# =============================================================================
# Number of approver signatures each risk level needs before it is approved.
SIGNATURE_REQUIREMENTS: dict[RiskLevel, int] = {
    RiskLevel.LOW: 0,  # auto-approved, nobody has to sign
    RiskLevel.MEDIUM: 1,  # a single approver
    RiskLevel.CRITICAL: 2,  # multi-sig: two distinct approvers
}
def get_required_signatures(risk_level: RiskLevel) -> int:
    """Return how many approver signatures ``risk_level`` requires.

    A level that is absent from ``SIGNATURE_REQUIREMENTS`` falls back to
    one signature.
    """
    try:
        return SIGNATURE_REQUIREMENTS[risk_level]
    except KeyError:
        return 1
# =============================================================================
# Risk Classification
# =============================================================================
def classify_risk_by_action(action: str) -> RiskLevel:
    """Derive a risk level from the free-text action description.

    The lower-cased action is scanned for keyword substrings, tier by tier,
    in the order CRITICAL, MEDIUM, LOW; the first tier with a hit wins.
    An action that matches no keyword at all is treated as MEDIUM.
    """
    lowered = action.lower()

    # Ordered from most to least severe so the strictest match takes priority.
    tiers = (
        (CRITICAL_KEYWORDS, RiskLevel.CRITICAL),
        (MEDIUM_KEYWORDS, RiskLevel.MEDIUM),
        (LOW_KEYWORDS, RiskLevel.LOW),
    )
    for keywords, level in tiers:
        if any(word in lowered for word in keywords):
            return level

    # Unknown actions are not auto-approved: default to MEDIUM.
    return RiskLevel.MEDIUM
def classify_risk_by_blast_radius(blast_radius: BlastRadius) -> RiskLevel:
    """Derive a risk level from the blast radius of an operation.

    Rules, evaluated top to bottom:
        - DESTRUCTIVE data impact                          -> CRITICAL
        - WRITE data impact with > 5 pods or > 2 services  -> CRITICAL
        - any other WRITE data impact                      -> MEDIUM
        - > 10 affected pods or > 3 related services       -> CRITICAL
        - > 3 affected pods, a downtime other than "0",
          or > 1 related service                           -> MEDIUM
        - everything else                                  -> LOW
    """
    impact = blast_radius.data_impact

    if impact == DataImpact.DESTRUCTIVE:
        return RiskLevel.CRITICAL

    if impact == DataImpact.WRITE:
        # Writes are never LOW; a wide footprint pushes them to CRITICAL.
        wide_footprint = (
            blast_radius.affected_pods > 5
            or len(blast_radius.related_services) > 2
        )
        return RiskLevel.CRITICAL if wide_footprint else RiskLevel.MEDIUM

    # Remaining data impacts are judged purely on scope.
    if blast_radius.affected_pods > 10 or len(blast_radius.related_services) > 3:
        return RiskLevel.CRITICAL

    # NOTE(review): downtime is compared with the literal string "0"; any other
    # spelling of "no downtime" would count as downtime - confirm the field format.
    if (
        blast_radius.affected_pods > 3
        or blast_radius.estimated_downtime != "0"
        or len(blast_radius.related_services) > 1
    ):
        return RiskLevel.MEDIUM

    return RiskLevel.LOW
def classify_risk(
    action: str,
    blast_radius: BlastRadius | None = None,
    explicit_level: RiskLevel | None = None,
) -> RiskLevel:
    """Combine all risk signals and return the most severe level.

    Args:
        action: Free-text description of the operation.
        blast_radius: Optional blast radius of the operation.
        explicit_level: Caller-supplied level; when given it is returned
            as-is and no automatic classification takes place.

    Returns:
        ``explicit_level`` if provided, otherwise the higher of the
        action-based and blast-radius-based levels.
    """
    # NOTE(review): an explicit level can also be lower than the computed one.
    if explicit_level is not None:
        return explicit_level

    from_action = classify_risk_by_action(action)
    # Without a blast radius, that signal contributes the lowest level.
    from_blast = (
        classify_risk_by_blast_radius(blast_radius) if blast_radius else RiskLevel.LOW
    )

    # Position in this tuple defines severity ordering.
    severity = (RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.CRITICAL)
    return max(from_action, from_blast, key=severity.index)
# =============================================================================
# Approval State Machine
# =============================================================================
class TrustEngine:
    """Trust engine that manages the lifecycle of approval requests.

    State machine:
        PENDING -> APPROVED  (enough signatures collected)
        PENDING -> REJECTED  (explicitly rejected)
        PENDING -> EXPIRED   (``expires_at`` is in the past)

    Requests are kept in an in-memory dict keyed by their id.
    """

    def __init__(
        self,
        on_approved: Callable[[ApprovalRequest], None] | None = None,
        on_rejected: Callable[[ApprovalRequest], None] | None = None,
    ):
        """Initialise an empty engine.

        Args:
            on_approved: Optional callback invoked with the request once it
                becomes APPROVED.
            on_rejected: Optional callback invoked with the request once it
                becomes REJECTED.
        """
        self._approvals: dict[UUID, ApprovalRequest] = {}
        self._on_approved = on_approved
        self._on_rejected = on_rejected

    @staticmethod
    def _expire_if_stale(approval: ApprovalRequest, now: datetime) -> bool:
        """Move a PENDING request whose deadline has passed to EXPIRED.

        Returns:
            True if the request was transitioned by this call, else False.
        """
        if approval.status != ApprovalStatus.PENDING:
            return False
        if approval.expires_at is None or not approval.expires_at < now:
            return False
        approval.status = ApprovalStatus.EXPIRED
        approval.resolved_at = now
        approval.updated_at = now
        return True

    def create_approval(
        self,
        request: ApprovalRequestCreate,
    ) -> ApprovalRequest:
        """Create and store a new approval request.

        The risk level is classified from the request, the required number
        of signatures is derived from it, and LOW-risk requests are approved
        immediately (firing ``on_approved`` if set).

        Returns:
            The stored approval request.
        """
        risk_level = classify_risk(
            action=request.action,
            blast_radius=request.blast_radius,
            explicit_level=request.risk_level,
        )

        approval = ApprovalRequest(
            action=request.action,
            description=request.description,
            risk_level=risk_level,
            blast_radius=request.blast_radius,
            dry_run_checks=request.dry_run_checks,
            requested_by=request.requested_by,
            expires_at=request.expires_at,
            metadata=request.metadata,
            required_signatures=get_required_signatures(risk_level),
        )

        auto_approved = risk_level == RiskLevel.LOW
        if auto_approved:
            approval.status = ApprovalStatus.APPROVED
            approval.resolved_at = datetime.now(UTC)

        # Store before notifying so a callback that looks the request up by
        # id (via get_approval) can already find it.
        self._approvals[approval.id] = approval

        if auto_approved and self._on_approved:
            self._on_approved(approval)
        return approval

    def get_approval(self, approval_id: UUID) -> ApprovalRequest | None:
        """Return the request with the given id, or None if unknown."""
        return self._approvals.get(approval_id)

    def get_pending_approvals(self) -> list[ApprovalRequest]:
        """Return all requests still awaiting signatures, newest first.

        PENDING requests whose deadline has passed are marked EXPIRED as a
        side effect and are not included in the result.
        """
        now = datetime.now(UTC)
        pending = []
        for approval in self._approvals.values():
            if approval.status != ApprovalStatus.PENDING:
                continue
            if self._expire_if_stale(approval, now):
                continue
            pending.append(approval)

        # Newest first.
        pending.sort(key=lambda item: item.created_at, reverse=True)
        return pending

    def sign_approval(
        self,
        approval_id: UUID,
        signer_id: str,
        signer_name: str,
        comment: str | None = None,
    ) -> tuple[ApprovalRequest | None, str, bool]:
        """Add a signature to a pending request.

        Returns:
            ``(approval, message, execution_triggered)`` where ``approval``
            is None if the id is unknown, ``message`` describes the outcome
            and ``execution_triggered`` is True only when this signature
            completed the approval.
        """
        approval = self._approvals.get(approval_id)
        if approval is None:
            return None, "Approval not found", False

        now = datetime.now(UTC)
        # A request past its deadline must not be signable even if no sweep
        # has marked it EXPIRED yet; expire it now so the status check below
        # rejects the signature.
        self._expire_if_stale(approval, now)

        if approval.status != ApprovalStatus.PENDING:
            return approval, f"Cannot sign: status is {approval.status.value}", False

        # Each signer may sign only once.
        if approval.has_signer(signer_id):
            return approval, f"Signer {signer_id} has already signed", False

        approval.signatures.append(
            Signature(
                signer_id=signer_id,
                signer_name=signer_name,
                comment=comment,
            )
        )
        approval.updated_at = now

        if approval.is_fully_signed:
            approval.status = ApprovalStatus.APPROVED
            approval.resolved_at = now
            if self._on_approved:
                self._on_approved(approval)
            return approval, "Approval completed - execution triggered", True

        remaining = approval.remaining_signatures
        return approval, f"Signed. {remaining} more signature(s) required", False

    def reject_approval(
        self,
        approval_id: UUID,
        rejector_id: str,
        rejector_name: str,
        reason: str,
    ) -> tuple[ApprovalRequest | None, str]:
        """Reject a pending request.

        Returns:
            ``(approval, message)``; ``approval`` is None if the id is
            unknown.
        """
        approval = self._approvals.get(approval_id)
        if approval is None:
            return None, "Approval not found"

        now = datetime.now(UTC)
        # Same lazy expiry as in sign_approval: an overdue request ends up
        # EXPIRED rather than REJECTED.
        self._expire_if_stale(approval, now)

        if approval.status != ApprovalStatus.PENDING:
            return approval, f"Cannot reject: status is {approval.status.value}"

        approval.status = ApprovalStatus.REJECTED
        approval.rejection_reason = f"[{rejector_name}] {reason}"
        approval.resolved_at = now
        approval.updated_at = now

        if self._on_rejected:
            self._on_rejected(approval)
        return approval, "Approval rejected"

    def expire_stale_approvals(self) -> list[ApprovalRequest]:
        """Expire every pending request whose deadline has passed.

        Returns:
            The requests that were moved to EXPIRED by this call.
        """
        now = datetime.now(UTC)
        expired = []
        for approval in self._approvals.values():
            if self._expire_if_stale(approval, now):
                expired.append(approval)
        return expired
# =============================================================================
# Singleton Instance
# =============================================================================
# Process-wide engine instance, created on first use.
_trust_engine: TrustEngine | None = None


def get_trust_engine() -> TrustEngine:
    """Return the shared trust engine, creating it on the first call."""
    global _trust_engine
    engine = _trust_engine
    if engine is None:
        engine = TrustEngine()
        _trust_engine = engine
    return engine
def reset_trust_engine() -> None:
    """Drop the shared trust engine instance (intended for tests only).

    The module-level reference is cleared, so the next call to
    ``get_trust_engine`` builds a fresh engine.
    """
    global _trust_engine
    _trust_engine = None