Files
awoooi/apps/api/scripts/demo_multisig.py
OG T 30153496d1 fix(api): 修復全部 lint 錯誤 (ruff --fix)
- Import sorting (I001)
- Unused imports (F401)
- f-string without placeholders (F541)
- Loop variable unused (B007)
- zip() strict parameter (B905)
- Exception chaining (B904)
- collections.abc imports (UP035)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-26 16:06:20 +08:00

266 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""
CISO-101 Multi-Sig Demo Script
==============================
展示 CRITICAL 任務從發起到完成的完整信任鏈生命週期
流程:
1. OpenClaw 發起 CRITICAL 操作 (DROP TABLE)
2. 第一位簽核者簽核 → 仍為 PENDING (1/2)
3. 第二位簽核者簽核 → 轉為 APPROVED → 觸發執行
執行方式:
cd apps/api
source .venv/bin/activate
python scripts/demo_multisig.py
"""
import sys
from datetime import UTC, datetime, timedelta
from pathlib import Path
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.core.trust_engine import TrustEngine, get_required_signatures
from src.models.approval import (
ApprovalRequestCreate,
ApprovalStatus,
BlastRadius,
DataImpact,
DryRunCheck,
RiskLevel,
)
def print_header(title: str) -> None:
"""Print a formatted header"""
print("\n" + "=" * 60)
print(f" {title}")
print("=" * 60)
def print_approval_status(approval) -> None:
"""Print approval status summary"""
print(f"""
ID: {approval.id}
Action: {approval.action}
Status: {approval.status.value.upper()}
Risk Level: {approval.risk_level.value.upper()}
Required Sigs: {approval.required_signatures}
Current Sigs: {approval.current_signatures}
Is Fully Signed: {approval.is_fully_signed}
""")
if approval.signatures:
print(" Signatures:")
for sig in approval.signatures:
print(f" - {sig.signer_name} ({sig.signer_id}) at {sig.signed_at.strftime('%H:%M:%S')}")
if sig.comment:
print(f" Comment: {sig.comment}")
def main():
"""Run the Multi-Sig demo"""
print_header("CISO-101 Multi-Sig Trust Engine Demo")
print("""
This demo shows the complete CRITICAL approval lifecycle:
1. OpenClaw initiates a CRITICAL operation (DROP TABLE)
2. First signer signs → Still PENDING (1/2)
3. Second signer signs → APPROVED → Execution triggered
""")
# ==========================================================================
# Step 0: Show signature requirements
# ==========================================================================
print_header("Step 0: Signature Requirements")
print("""
Risk Level Required Signatures
---------- -------------------
LOW 0 (Auto-approve)
MEDIUM 1
CRITICAL 2 (Multi-Sig)
""")
for level in RiskLevel:
req = get_required_signatures(level)
print(f" {level.value.upper():10}{req} signature(s)")
# ==========================================================================
# Step 1: Create CRITICAL approval request
# ==========================================================================
print_header("Step 1: OpenClaw Initiates CRITICAL Operation")
# Track approved requests
approved_requests = []
def on_approved(approval):
approved_requests.append(approval)
print(f"\n 🚀 EXECUTION TRIGGERED: {approval.action}")
def on_rejected(approval):
print(f"\n ❌ REJECTED: {approval.rejection_reason}")
engine = TrustEngine(
on_approved=on_approved,
on_rejected=on_rejected,
)
# Create the CRITICAL request
request = ApprovalRequestCreate(
action="DROP TABLE user_sessions",
description="清除所有用戶 session 以強制重新登入。此操作將影響所有線上用戶。",
risk_level=RiskLevel.CRITICAL,
blast_radius=BlastRadius(
affected_pods=0,
estimated_downtime="0",
related_services=["auth-service", "api-gateway", "user-service"],
data_impact=DataImpact.DESTRUCTIVE,
),
dry_run_checks=[
DryRunCheck(name="RBAC Check", passed=True, message="db-admin"),
DryRunCheck(name="Syntax Check", passed=True),
DryRunCheck(name="Backup Available", passed=False, message="No recent backup!"),
],
requested_by="OpenClaw",
expires_at=datetime.now(UTC) + timedelta(hours=1),
)
approval = engine.create_approval(request)
print(f"""
OpenClaw 發起 CRITICAL 操作請求:
動作: {request.action}
描述: {request.description}
風險等級: {request.risk_level.value.upper()}
資料影響: {request.blast_radius.data_impact.value.upper()}
""")
print_approval_status(approval)
# ==========================================================================
# Step 2: First signer signs
# ==========================================================================
print_header("Step 2: First Signer (Alice) Signs")
approval, message, triggered = engine.sign_approval(
approval_id=approval.id,
signer_id="alice-001",
signer_name="Alice Chen (CTO)",
comment="已確認風險,建議在低流量時段執行",
)
print(f"""
Alice (CTO) 已簽核:
結果: {message}
觸發執行: {triggered}
""")
print_approval_status(approval)
assert approval.status == ApprovalStatus.PENDING, "Should still be PENDING after first signature"
assert approval.current_signatures == 1, "Should have 1 signature"
assert not triggered, "Should not trigger execution yet"
# ==========================================================================
# Step 3: Second signer signs
# ==========================================================================
print_header("Step 3: Second Signer (Bob) Signs - Multi-Sig Complete")
approval, message, triggered = engine.sign_approval(
approval_id=approval.id,
signer_id="bob-002",
signer_name="Bob Wu (CISO)",
comment="CISO 核准。已通知 DBA 團隊待命。",
)
print(f"""
Bob (CISO) 已簽核:
結果: {message}
觸發執行: {triggered}
""")
print_approval_status(approval)
assert approval.status == ApprovalStatus.APPROVED, "Should be APPROVED after second signature"
assert approval.current_signatures == 2, "Should have 2 signatures"
assert approval.is_fully_signed, "Should be fully signed"
assert triggered, "Should trigger execution"
# ==========================================================================
# Step 4: Verify final state
# ==========================================================================
print_header("Step 4: Verification")
pending = engine.get_pending_approvals()
print(f"""
驗證結果:
✅ 待簽核清單數量: {len(pending)} (應為 0)
✅ 已批准請求數量: {len(approved_requests)} (應為 1)
✅ 最終狀態: {approval.status.value.upper()}
✅ 簽核數: {approval.current_signatures}/{approval.required_signatures}
✅ 解決時間: {approval.resolved_at.strftime('%Y-%m-%d %H:%M:%S') if approval.resolved_at else 'N/A'}
""")
# ==========================================================================
# Bonus: Demo LOW risk auto-approval
# ==========================================================================
print_header("Bonus: LOW Risk Auto-Approval Demo")
low_request = ApprovalRequestCreate(
action="Scale deployment api-backend to 5 replicas",
description="增加後端服務副本數以應對流量增長",
risk_level=RiskLevel.LOW,
blast_radius=BlastRadius(
affected_pods=5,
estimated_downtime="0",
related_services=["api-backend"],
data_impact=DataImpact.NONE,
),
dry_run_checks=[
DryRunCheck(name="Resource Check", passed=True, message="5/20 pods"),
],
requested_by="OpenClaw",
)
low_approval = engine.create_approval(low_request)
print(f"""
LOW 風險操作自動放行:
動作: {low_request.action}
風險等級: LOW
狀態: {low_approval.status.value.upper()} (自動批准!)
簽核數: {low_approval.required_signatures} (不需要簽核)
""")
assert low_approval.status == ApprovalStatus.APPROVED, "LOW risk should be auto-approved"
# ==========================================================================
# Summary
# ==========================================================================
print_header("Demo Complete!")
print("""
CISO-101 Multi-Sig Trust Engine 功能驗證完成:
✅ 風險等級分類 (LOW/MEDIUM/CRITICAL)
✅ 簽核數自動判定 (0/1/2)
✅ LOW 風險自動放行
✅ CRITICAL 雙重簽核 (Multi-Sig)
✅ 狀態機正確轉換 (PENDING → APPROVED)
✅ 簽核完成觸發執行回調
信任鏈完整性已驗證。
""")
if __name__ == "__main__":
main()