1. proposal_service._load_incident() 改用 incident_service.get_from_working_memory() - brain engine 使用 awoooi:incidents: prefix,資料實際在 incident: prefix - 兩個 prefix 不符導致永遠 404 (Y/n 按鈕全部失敗) - 2026-04-02 ogt 2. trust_engine CRITICAL required_signatures 2→1 - 統帥決策: 所有審核只需 1 層簽核 - 2026-04-02 ogt Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
405 lines
11 KiB
Python
405 lines
11 KiB
Python
"""
|
||
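Trust Engine - risk classification and Multi-Sig approval logic
================================================================
CISO-101: core implementation of the trust engine

Risk levels and signature requirements:
- LOW: 0 signers, auto-approved (e.g. scale up)
- MEDIUM: 1 signer required (e.g. delete pod)
- CRITICAL: 1 signer required (e.g. DROP TABLE); the former 2-signer
  Multi-Sig requirement was disabled on 2026-04-02

Features:
- Automatic risk classification
- Signature-count validation
- State-transition control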
|
||
"""
|
||
|
||
from collections.abc import Callable
|
||
from datetime import UTC, datetime
|
||
from uuid import UUID
|
||
|
||
from src.models.approval import (
|
||
ApprovalRequest,
|
||
ApprovalRequestCreate,
|
||
ApprovalStatus,
|
||
BlastRadius,
|
||
DataImpact,
|
||
RiskLevel,
|
||
Signature,
|
||
)
|
||
|
||
# =============================================================================
|
||
# Risk Classification Rules
|
||
# =============================================================================
|
||
|
||
# Keyword tiers used to classify an action description by risk.
# The action classifier looks for these strings inside the lower-cased
# action text, so every entry here is written in lower case.

# Keywords that mark an action as CRITICAL.
CRITICAL_KEYWORDS = [
    "drop", "delete database", "truncate", "rm -rf",
    "destroy", "format", "wipe", "purge all",
]

# Keywords that mark an action as MEDIUM.
MEDIUM_KEYWORDS = [
    "delete", "remove", "stop", "restart",
    "rollback", "downgrade", "migrate",
]

# Keywords that mark an action as LOW.
LOW_KEYWORDS = [
    "scale", "update config", "patch",
    "upgrade", "add", "create",
]
|
||
|
||
|
||
# =============================================================================
|
||
# Signature Requirements
|
||
# =============================================================================
|
||
|
||
# Number of signatures an approval request must collect, per risk level.
SIGNATURE_REQUIREMENTS: dict[RiskLevel, int] = {
    RiskLevel.LOW: 0,  # auto-approved, no signer needed
    RiskLevel.MEDIUM: 1,  # single signer
    RiskLevel.CRITICAL: 1,  # 2026-04-02 ogt: commander decision — one review layer only, Multi-Sig disabled
}
|
||
|
||
|
||
def get_required_signatures(risk_level: RiskLevel) -> int:
    """Return how many signatures an approval at ``risk_level`` must collect.

    Risk levels that have no entry in ``SIGNATURE_REQUIREMENTS`` fall back
    to a single required signature.
    """
    try:
        return SIGNATURE_REQUIREMENTS[risk_level]
    except KeyError:
        return 1
|
||
|
||
|
||
# =============================================================================
|
||
# Risk Classification
|
||
# =============================================================================
|
||
|
||
def classify_risk_by_action(action: str) -> RiskLevel:
    """Classify an action description into a risk level by keyword.

    The action text is lower-cased and the keyword tiers are checked in
    the order CRITICAL, MEDIUM, LOW; the first tier with a matching
    keyword wins. Actions that match no keyword are treated as MEDIUM.

    NOTE(review): matching is a plain substring test, so a keyword also
    matches inside longer words (e.g. "add" is found in "address").

    Args:
        action: Free-text description of the action.

    Returns:
        The risk level of the first matching tier, or MEDIUM if none match.
    """
    lowered = action.lower()

    # Ordered from most to least severe so the highest tier takes priority.
    tiers = (
        (CRITICAL_KEYWORDS, RiskLevel.CRITICAL),
        (MEDIUM_KEYWORDS, RiskLevel.MEDIUM),
        (LOW_KEYWORDS, RiskLevel.LOW),
    )
    for keywords, level in tiers:
        if any(keyword in lowered for keyword in keywords):
            return level

    # Unknown actions are not auto-approved: default to MEDIUM.
    return RiskLevel.MEDIUM
|
||
|
||
|
||
def classify_risk_by_blast_radius(blast_radius: BlastRadius) -> RiskLevel:
    """Classify risk from the blast radius of an action.

    Rules, evaluated in this order:
      - DESTRUCTIVE data impact -> CRITICAL
      - WRITE data impact -> CRITICAL when more than 5 pods or more than
        2 related services are affected, otherwise MEDIUM
      - more than 10 pods or more than 3 related services -> CRITICAL
      - more than 3 pods, an estimated downtime other than "0", or more
        than 1 related service -> MEDIUM
      - anything else -> LOW

    Args:
        blast_radius: Impact description of the action.

    Returns:
        The risk level implied by the blast radius.
    """
    impact = blast_radius.data_impact

    # Destructive data impact is always CRITICAL, regardless of scope.
    if impact == DataImpact.DESTRUCTIVE:
        return RiskLevel.CRITICAL

    pods = blast_radius.affected_pods

    # Writes are at least MEDIUM and use tighter scope thresholds.
    if impact == DataImpact.WRITE:
        wide_write = pods > 5 or len(blast_radius.related_services) > 2
        return RiskLevel.CRITICAL if wide_write else RiskLevel.MEDIUM

    # Remaining data impacts: judge purely by scope.
    if pods > 10 or len(blast_radius.related_services) > 3:
        return RiskLevel.CRITICAL

    if (
        pods > 3
        or blast_radius.estimated_downtime != "0"
        or len(blast_radius.related_services) > 1
    ):
        return RiskLevel.MEDIUM

    return RiskLevel.LOW
|
||
|
||
|
||
def classify_risk(
    action: str,
    blast_radius: BlastRadius | None = None,
    explicit_level: RiskLevel | None = None,
) -> RiskLevel:
    """Combine all risk signals and return the highest resulting level.

    Args:
        action: Free-text description of the action.
        blast_radius: Optional impact description; skipped when falsy.
        explicit_level: Explicitly requested risk level. When given it is
            returned as-is and no classification is performed.

    Returns:
        ``explicit_level`` if provided, otherwise the more severe of the
        action-based and blast-radius-based classifications.
    """
    # An explicit level overrides any automatic classification.
    if explicit_level is not None:
        return explicit_level

    candidates = [classify_risk_by_action(action)]
    if blast_radius:
        candidates.append(classify_risk_by_blast_radius(blast_radius))

    # Position in this tuple defines severity: later means more severe.
    severity = (RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.CRITICAL)
    return max(candidates, key=severity.index)
|
||
|
||
|
||
# =============================================================================
|
||
# Approval State Machine
|
||
# =============================================================================
|
||
|
||
class TrustEngine:
    """Trust engine that manages the lifecycle of approval requests.

    State machine:
        PENDING -> APPROVED  (required number of signatures collected)
        PENDING -> REJECTED  (explicitly rejected)
        PENDING -> EXPIRED   (``expires_at`` has passed)

    Requests are held in an in-memory dict keyed by approval id.
    """

    def __init__(
        self,
        on_approved: Callable[[ApprovalRequest], None] | None = None,
        on_rejected: Callable[[ApprovalRequest], None] | None = None,
    ):
        """Initialise the engine with optional lifecycle callbacks.

        Args:
            on_approved: Called with the request once it becomes APPROVED.
            on_rejected: Called with the request once it becomes REJECTED.
        """
        self._approvals: dict[UUID, ApprovalRequest] = {}
        self._on_approved = on_approved
        self._on_rejected = on_rejected

    @staticmethod
    def _expire_if_stale(approval: ApprovalRequest, now: datetime) -> bool:
        """Move a PENDING request whose deadline has passed to EXPIRED.

        Single place where the PENDING -> EXPIRED transition happens, so
        every code path stamps the same fields.

        Args:
            approval: Request to check.
            now: Reference time used for the comparison and the timestamps.

        Returns:
            True if this call expired the request, False otherwise.
        """
        if approval.status != ApprovalStatus.PENDING:
            return False
        if approval.expires_at and approval.expires_at < now:
            approval.status = ApprovalStatus.EXPIRED
            approval.resolved_at = now
            approval.updated_at = now
            return True
        return False

    def create_approval(
        self,
        request: ApprovalRequestCreate,
    ) -> ApprovalRequest:
        """Create and store a new approval request.

        The risk level is classified from the request, the required number
        of signatures is derived from that level, and LOW-risk requests are
        approved immediately.

        Args:
            request: Creation payload.

        Returns:
            The stored approval request.
        """
        risk_level = classify_risk(
            action=request.action,
            blast_radius=request.blast_radius,
            explicit_level=request.risk_level,
        )
        required_sigs = get_required_signatures(risk_level)

        approval = ApprovalRequest(
            action=request.action,
            description=request.description,
            risk_level=risk_level,
            blast_radius=request.blast_radius,
            dry_run_checks=request.dry_run_checks,
            requested_by=request.requested_by,
            expires_at=request.expires_at,
            metadata=request.metadata,
            required_signatures=required_sigs,
        )

        auto_approved = risk_level == RiskLevel.LOW
        if auto_approved:
            approval.status = ApprovalStatus.APPROVED
            approval.resolved_at = datetime.now(UTC)

        # Store before notifying, as sign_approval does, so a callback that
        # looks the request up through get_approval() can find it.
        self._approvals[approval.id] = approval

        if auto_approved and self._on_approved:
            self._on_approved(approval)

        return approval

    def get_approval(self, approval_id: UUID) -> ApprovalRequest | None:
        """Return the approval request with this id, or None if unknown."""
        return self._approvals.get(approval_id)

    def get_pending_approvals(self) -> list[ApprovalRequest]:
        """Return all still-pending requests, newest first.

        PENDING requests whose deadline has passed are moved to EXPIRED as
        a side effect and are left out of the result.
        """
        now = datetime.now(UTC)
        pending = []

        for approval in self._approvals.values():
            if approval.status != ApprovalStatus.PENDING:
                continue
            if not self._expire_if_stale(approval, now):
                pending.append(approval)

        # Newest request first.
        pending.sort(key=lambda item: item.created_at, reverse=True)
        return pending

    def sign_approval(
        self,
        approval_id: UUID,
        signer_id: str,
        signer_name: str,
        comment: str | None = None,
    ) -> tuple[ApprovalRequest | None, str, bool]:
        """Add a signature to an approval request.

        Args:
            approval_id: Id of the request to sign.
            signer_id: Identifier of the signer; each signer may sign once.
            signer_name: Display name of the signer.
            comment: Optional comment stored with the signature.

        Returns:
            ``(approval, message, execution_triggered)`` where
            - approval: the updated request (None if it was not found)
            - message: human-readable outcome
            - execution_triggered: True only when this signature completed
              the approval
        """
        approval = self._approvals.get(approval_id)
        if approval is None:
            return None, "Approval not found", False

        # Enforce the deadline here too: a request past expires_at must not
        # be signable just because no sweep has marked it EXPIRED yet.
        self._expire_if_stale(approval, datetime.now(UTC))

        if approval.status != ApprovalStatus.PENDING:
            return approval, f"Cannot sign: status is {approval.status.value}", False

        # Each signer may only sign once.
        if approval.has_signer(signer_id):
            return approval, f"Signer {signer_id} has already signed", False

        signature = Signature(
            signer_id=signer_id,
            signer_name=signer_name,
            comment=comment,
        )
        approval.signatures.append(signature)

        # One timestamp for the whole transition keeps updated_at and
        # resolved_at identical when this signature completes the approval.
        signed_at = datetime.now(UTC)
        approval.updated_at = signed_at

        if approval.is_fully_signed:
            approval.status = ApprovalStatus.APPROVED
            approval.resolved_at = signed_at

            if self._on_approved:
                self._on_approved(approval)

            return approval, "Approval completed - execution triggered", True

        remaining = approval.remaining_signatures
        return approval, f"Signed. {remaining} more signature(s) required", False

    def reject_approval(
        self,
        approval_id: UUID,
        rejector_id: str,
        rejector_name: str,
        reason: str,
    ) -> tuple[ApprovalRequest | None, str]:
        """Reject a pending approval request.

        Args:
            approval_id: Id of the request to reject.
            rejector_id: Identifier of the rejecting user (not stored here).
            rejector_name: Display name, prefixed to the stored reason.
            reason: Why the request is rejected.

        Returns:
            ``(approval, message)``; approval is None if it was not found.
        """
        approval = self._approvals.get(approval_id)
        if approval is None:
            return None, "Approval not found"

        if approval.status != ApprovalStatus.PENDING:
            return approval, f"Cannot reject: status is {approval.status.value}"

        # One timestamp so resolved_at and updated_at agree exactly.
        now = datetime.now(UTC)
        approval.status = ApprovalStatus.REJECTED
        approval.rejection_reason = f"[{rejector_name}] {reason}"
        approval.resolved_at = now
        approval.updated_at = now

        if self._on_rejected:
            self._on_rejected(approval)

        return approval, "Approval rejected"

    def expire_stale_approvals(self) -> list[ApprovalRequest]:
        """Expire every pending request whose deadline has passed.

        Returns:
            The requests that were moved to EXPIRED by this call.
        """
        now = datetime.now(UTC)
        expired = []

        for approval in self._approvals.values():
            if self._expire_if_stale(approval, now):
                expired.append(approval)

        return expired
|
||
|
||
|
||
# =============================================================================
|
||
# Singleton Instance
|
||
# =============================================================================
|
||
|
||
# Shared engine instance; None until get_trust_engine() creates it, and
# set back to None by reset_trust_engine().
_trust_engine: TrustEngine | None = None
|
||
|
||
|
||
def get_trust_engine() -> TrustEngine:
    """Return the module-level TrustEngine, creating it on first use."""
    global _trust_engine
    engine = _trust_engine
    if engine is None:
        # First call (or first call after a reset): build and remember it.
        engine = _trust_engine = TrustEngine()
    return engine
|
||
|
||
|
||
def reset_trust_engine() -> None:
    """Drop the module-level TrustEngine instance (intended for tests only).

    The next call to get_trust_engine() builds a fresh engine.
    """
    global _trust_engine
    _trust_engine = None
|