- Import sorting (I001) - Unused imports (F401) - f-string without placeholders (F541) - Loop variable unused (B007) - zip() strict parameter (B905) - Exception chaining (B904) - collections.abc imports (UP035) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
515 lines
18 KiB
Python
515 lines
18 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
E2E Tool Call Verification Script v2.0
|
||
======================================
|
||
端到端驗證:Alert → AI → Approval → Execution
|
||
|
||
Phase 18.2 優化版:
|
||
1. 目標資源斷言 - 確保 AI 沒殺錯人
|
||
2. 動態簽署數 - 根據風險等級自動簽核
|
||
3. Safe Label 防護 - 防止誤操作
|
||
|
||
執行方式:
|
||
cd apps/api
|
||
python -m scripts.e2e_tool_call_verification
|
||
|
||
# Dry-run 模式 (不執行,只驗證流程)
|
||
python -m scripts.e2e_tool_call_verification --dry-run
|
||
|
||
# 指定 API URL
|
||
python -m scripts.e2e_tool_call_verification --api-url http://192.168.0.120:32334
|
||
|
||
# 完整執行 (包括實際審核)
|
||
python -m scripts.e2e_tool_call_verification --no-dry-run
|
||
|
||
Author: Claude Code (首席架構師)
|
||
Date: 2026-03-26
|
||
Version: 2.0 (Phase 18.2 優化)
|
||
"""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import re
|
||
import sys
|
||
import time
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
||
|
||
import httpx
|
||
|
||
# =============================================================================
|
||
# Config
|
||
# =============================================================================
|
||
|
||
DEFAULT_API_URL = "http://localhost:8000"
|
||
TIMEOUT = 60.0
|
||
|
||
# E2E Signer Pool (用於動態簽署)
|
||
SIGNER_POOL = [
|
||
{"id": "e2e-signer-alpha", "name": "E2E Bot Alpha"},
|
||
{"id": "e2e-signer-beta", "name": "E2E Bot Beta"},
|
||
]
|
||
|
||
# 測試用 Alert (含 Safe Label)
|
||
TEST_ALERT = {
|
||
"alert_type": "high_cpu",
|
||
"severity": "warning", # warning = 1 簽名
|
||
"source": "e2e-verification-script",
|
||
"target_resource": "awoooi-api", # 使用真實存在的資源
|
||
"namespace": "awoooi-prod",
|
||
"message": "[E2E Test] API Pod CPU at 85% - verification test",
|
||
"metrics": {
|
||
"cpu_percent": 85,
|
||
"memory_percent": 60,
|
||
"sigma_deviation": 2.5,
|
||
},
|
||
"labels": {
|
||
"app": "awoooi-api",
|
||
"team": "sre",
|
||
"env": "e2e-test", # Safe Label - 識別測試流量
|
||
"safe_mode": "true", # Safe Label - Executor 看到會跳過真實執行
|
||
},
|
||
}
|
||
|
||
# Critical 測試用 Alert (需 2 簽名)
|
||
CRITICAL_ALERT = {
|
||
**TEST_ALERT,
|
||
"severity": "critical",
|
||
"message": "[E2E Test] CRITICAL - verification test",
|
||
}
|
||
|
||
|
||
# =============================================================================
|
||
# Terminal Output Helpers
|
||
# =============================================================================
|
||
|
||
class Colors:
|
||
HEADER = '\033[95m'
|
||
BLUE = '\033[94m'
|
||
CYAN = '\033[96m'
|
||
GREEN = '\033[92m'
|
||
YELLOW = '\033[93m'
|
||
RED = '\033[91m'
|
||
ENDC = '\033[0m'
|
||
BOLD = '\033[1m'
|
||
DIM = '\033[2m'
|
||
|
||
|
||
def print_banner():
|
||
banner = f"""
|
||
{Colors.CYAN}{Colors.BOLD}
|
||
╔═══════════════════════════════════════════════════════════════╗
|
||
║ E2E Tool Call Verification v2.0 ║
|
||
║ Alert → AI → Approval → Execution ║
|
||
║ Phase 18.2: 目標驗證 + 動態簽署 + Safe Label ║
|
||
╚═══════════════════════════════════════════════════════════════╝
|
||
{Colors.ENDC}"""
|
||
print(banner)
|
||
|
||
|
||
def print_step(step: int, total: int, title: str):
|
||
print(f"\n{Colors.BLUE}{Colors.BOLD}[{step}/{total}] {title}{Colors.ENDC}")
|
||
print(f"{Colors.DIM}{'─' * 60}{Colors.ENDC}")
|
||
|
||
|
||
def print_success(msg: str):
|
||
print(f" {Colors.GREEN}✓ {msg}{Colors.ENDC}")
|
||
|
||
|
||
def print_fail(msg: str):
|
||
print(f" {Colors.RED}✗ {msg}{Colors.ENDC}")
|
||
|
||
|
||
def print_warn(msg: str):
|
||
print(f" {Colors.YELLOW}⚠ {msg}{Colors.ENDC}")
|
||
|
||
|
||
def print_info(key: str, value: Any):
|
||
print(f" {Colors.CYAN}{key}:{Colors.ENDC} {value}")
|
||
|
||
|
||
# =============================================================================
|
||
# Target Verification (Phase 18.2.1)
|
||
# =============================================================================
|
||
|
||
def verify_action_target(action: str, expected_target: str) -> tuple[bool, str]:
|
||
"""
|
||
驗證 AI 產生的 action 是否包含正確的目標資源
|
||
|
||
Phase 18.2.1: 確保 AI 沒殺錯人
|
||
|
||
Args:
|
||
action: AI 產生的動作/指令
|
||
expected_target: 預期的目標資源名稱
|
||
|
||
Returns:
|
||
(is_valid, actual_target)
|
||
"""
|
||
if not action:
|
||
return False, ""
|
||
|
||
# 嘗試從 action 中提取 deployment/pod 名稱
|
||
patterns = [
|
||
r'deployment[/\s]+([a-z0-9-]+)', # deployment/xxx 或 deployment xxx
|
||
r'pod[/\s]+([a-z0-9-]+)',
|
||
r'--replicas.*deployment[/\s]+([a-z0-9-]+)',
|
||
r'scale\s+deployment[/\s]+([a-z0-9-]+)',
|
||
]
|
||
|
||
for pattern in patterns:
|
||
match = re.search(pattern, action.lower())
|
||
if match:
|
||
actual_target = match.group(1)
|
||
# 模糊匹配 - 目標名稱應該包含在內
|
||
if expected_target.lower() in actual_target or actual_target in expected_target.lower():
|
||
return True, actual_target
|
||
else:
|
||
return False, actual_target
|
||
|
||
# 沒找到資源名稱,檢查是否是非 K8s 操作
|
||
if "kubectl" not in action.lower():
|
||
return True, "(non-k8s action)"
|
||
|
||
return False, "(not found)"
|
||
|
||
|
||
# =============================================================================
|
||
# E2E Verification Class
|
||
# =============================================================================
|
||
|
||
class E2EVerification:
|
||
"""端到端驗證器 v2.0"""
|
||
|
||
def __init__(self, api_url: str, dry_run: bool = False, use_critical: bool = False):
|
||
self.api_url = api_url.rstrip("/")
|
||
self.dry_run = dry_run
|
||
self.use_critical = use_critical
|
||
self.test_alert = CRITICAL_ALERT if use_critical else TEST_ALERT
|
||
self.approval_id: str | None = None
|
||
self.approval_data: dict | None = None
|
||
self.results: dict[str, bool] = {}
|
||
|
||
async def step1_fire_alert(self) -> bool:
|
||
"""Step 1: 發射測試 Alert (含 Safe Label)"""
|
||
print_step(1, 5, "發射測試 Alert (含 Safe Label)")
|
||
|
||
print_info("Safe Labels", "env=e2e-test, safe_mode=true")
|
||
print_info("Target", self.test_alert["target_resource"])
|
||
print_info("Severity", self.test_alert["severity"])
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
|
||
response = await client.post(
|
||
f"{self.api_url}/api/v1/webhooks/alerts",
|
||
json=self.test_alert,
|
||
headers={"Content-Type": "application/json"},
|
||
)
|
||
|
||
if response.status_code == 401:
|
||
print_warn("HMAC 驗證啟用中 - 生產環境需要簽名")
|
||
print_info("提示", "請在測試環境執行,或配置 HMAC Secret")
|
||
return False
|
||
|
||
if response.status_code != 200:
|
||
print_fail(f"Webhook 返回 {response.status_code}")
|
||
return False
|
||
|
||
data = response.json()
|
||
self.approval_id = data.get("approval_id")
|
||
|
||
if not self.approval_id:
|
||
print_fail("未獲得 Approval ID")
|
||
return False
|
||
|
||
print_success("Alert 發射成功")
|
||
print_info("Approval ID", self.approval_id)
|
||
print_info("Risk Level", data.get("risk_level", "N/A"))
|
||
return True
|
||
|
||
except httpx.ConnectError:
|
||
print_fail(f"無法連接 API: {self.api_url}")
|
||
return False
|
||
except Exception as e:
|
||
print_fail(f"發生錯誤: {e}")
|
||
return False
|
||
|
||
async def step2_verify_ai_analysis(self) -> bool:
|
||
"""Step 2: 驗證 AI 分析結果 + 目標資源斷言"""
|
||
print_step(2, 5, "驗證 AI 分析結果 + 目標資源斷言")
|
||
|
||
if not self.approval_id:
|
||
print_fail("沒有 Approval ID,跳過")
|
||
return False
|
||
|
||
try:
|
||
max_attempts = 10
|
||
for attempt in range(max_attempts):
|
||
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
|
||
response = await client.get(
|
||
f"{self.api_url}/api/v1/approvals/{self.approval_id}",
|
||
)
|
||
|
||
if response.status_code != 200:
|
||
print_warn(f"Attempt {attempt + 1}: API 返回 {response.status_code}")
|
||
await asyncio.sleep(2)
|
||
continue
|
||
|
||
data = response.json()
|
||
self.approval_data = data
|
||
|
||
action = data.get("action", "")
|
||
status = data.get("status", "")
|
||
|
||
print_info("Status", status)
|
||
print_info("Action", action[:80] if action else "N/A")
|
||
|
||
# Phase 18.2.1: 目標資源斷言
|
||
expected_target = self.test_alert["target_resource"]
|
||
is_valid, actual_target = verify_action_target(action, expected_target)
|
||
|
||
print_info("Expected Target", expected_target)
|
||
print_info("Actual Target", actual_target)
|
||
|
||
if is_valid:
|
||
print_success("目標資源驗證通過 - AI 沒殺錯人")
|
||
return True
|
||
elif status == "pending" and action:
|
||
print_warn("目標資源不匹配,可能需要檢查")
|
||
print_info("警告", f"Expected: {expected_target}, Got: {actual_target}")
|
||
return True # 不算完全失敗
|
||
else:
|
||
print_warn(f"等待 AI 分析... ({attempt + 1}/{max_attempts})")
|
||
await asyncio.sleep(3)
|
||
|
||
print_fail("AI 分析超時")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print_fail(f"驗證失敗: {e}")
|
||
return False
|
||
|
||
async def step3_verify_approval_in_redis(self) -> bool:
|
||
"""Step 3: 驗證 Approval 存入 Redis"""
|
||
print_step(3, 5, "驗證 Approval 存入 Redis")
|
||
|
||
if not self.approval_id:
|
||
print_fail("沒有 Approval ID,跳過")
|
||
return False
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
|
||
response = await client.get(
|
||
f"{self.api_url}/api/v1/approvals/pending",
|
||
)
|
||
|
||
if response.status_code != 200:
|
||
print_fail(f"API 返回 {response.status_code}")
|
||
return False
|
||
|
||
data = response.json()
|
||
approvals = data.get("approvals", [])
|
||
|
||
print_info("Pending 數量", len(approvals))
|
||
|
||
found = any(a.get("id") == self.approval_id for a in approvals)
|
||
|
||
if found:
|
||
print_success("Approval 在 pending 列表中")
|
||
return True
|
||
else:
|
||
print_warn("Approval 不在 pending 列表 (可能已處理)")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print_fail(f"驗證失敗: {e}")
|
||
return False
|
||
|
||
async def step4_dynamic_approval(self) -> bool:
|
||
"""Step 4: 動態簽署 (根據風險等級)"""
|
||
print_step(4, 5, "動態簽署 (根據風險等級)")
|
||
|
||
if not self.approval_id or not self.approval_data:
|
||
print_fail("沒有 Approval 資料,跳過")
|
||
return False
|
||
|
||
if self.dry_run:
|
||
print_warn("Dry-run 模式:跳過實際審核")
|
||
return True
|
||
|
||
try:
|
||
required = self.approval_data.get("required_signatures", 1)
|
||
current = len(self.approval_data.get("signatures", []))
|
||
remaining = required - current
|
||
|
||
print_info("Required Signatures", required)
|
||
print_info("Current Signatures", current)
|
||
print_info("Remaining", remaining)
|
||
|
||
if remaining <= 0:
|
||
print_success("已有足夠簽名")
|
||
return True
|
||
|
||
# Phase 18.2.2: 動態簽署
|
||
for i in range(min(remaining, len(SIGNER_POOL))):
|
||
signer = SIGNER_POOL[i]
|
||
print_info("Signing with", signer["name"])
|
||
|
||
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
|
||
response = await client.post(
|
||
f"{self.api_url}/api/v1/approvals/{self.approval_id}/approve",
|
||
json={
|
||
"signer_name": signer["name"],
|
||
"comment": f"E2E auto-sign by {signer['id']}",
|
||
},
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
print_success(f"簽名成功: {signer['name']}")
|
||
else:
|
||
print_warn(f"簽名失敗: {response.status_code}")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print_fail(f"簽署失敗: {e}")
|
||
return False
|
||
|
||
async def step5_verify_execution(self) -> bool:
|
||
"""Step 5: 驗證執行結果"""
|
||
print_step(5, 5, "驗證執行結果 (Safe Mode)")
|
||
|
||
if not self.approval_id:
|
||
print_fail("沒有 Approval ID,跳過")
|
||
return False
|
||
|
||
if self.dry_run:
|
||
print_warn("Dry-run 模式:跳過執行驗證")
|
||
return True
|
||
|
||
try:
|
||
await asyncio.sleep(5)
|
||
|
||
async with httpx.AsyncClient(timeout=TIMEOUT) as client:
|
||
response = await client.get(
|
||
f"{self.api_url}/api/v1/approvals/{self.approval_id}",
|
||
)
|
||
|
||
if response.status_code != 200:
|
||
print_fail(f"API 返回 {response.status_code}")
|
||
return False
|
||
|
||
data = response.json()
|
||
status = data.get("status", "")
|
||
executed = data.get("executed", False)
|
||
|
||
print_info("Status", status)
|
||
print_info("Executed", executed)
|
||
|
||
# 檢查 Safe Mode 是否生效
|
||
labels = self.test_alert.get("labels", {})
|
||
if labels.get("safe_mode") == "true":
|
||
print_success("Safe Mode 啟用 - 實際 K8s 操作已跳過")
|
||
|
||
timeline = data.get("timeline", [])
|
||
exec_events = [e for e in timeline if e.get("event_type") == "exec"]
|
||
if exec_events:
|
||
print_success(f"找到 {len(exec_events)} 個執行事件")
|
||
for evt in exec_events[-2:]:
|
||
print_info("Event", f"{evt.get('title')} - {evt.get('status')}")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print_fail(f"驗證失敗: {e}")
|
||
return False
|
||
|
||
async def run(self) -> bool:
|
||
"""執行完整驗證"""
|
||
print_banner()
|
||
print(f"{Colors.DIM}API URL: {self.api_url}{Colors.ENDC}")
|
||
print(f"{Colors.DIM}Dry-run: {self.dry_run}{Colors.ENDC}")
|
||
print(f"{Colors.DIM}Critical Mode: {self.use_critical}{Colors.ENDC}")
|
||
print(f"{Colors.DIM}Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{Colors.ENDC}")
|
||
|
||
start_time = time.time()
|
||
|
||
self.results["step1_fire_alert"] = await self.step1_fire_alert()
|
||
self.results["step2_verify_ai"] = await self.step2_verify_ai_analysis()
|
||
self.results["step3_verify_redis"] = await self.step3_verify_approval_in_redis()
|
||
self.results["step4_approve"] = await self.step4_dynamic_approval()
|
||
self.results["step5_verify_exec"] = await self.step5_verify_execution()
|
||
|
||
elapsed = time.time() - start_time
|
||
passed = sum(1 for v in self.results.values() if v)
|
||
total = len(self.results)
|
||
|
||
print(f"\n{Colors.BLUE}{'═' * 60}{Colors.ENDC}")
|
||
print(f"{Colors.BOLD}驗證結果摘要{Colors.ENDC}")
|
||
print(f"{Colors.DIM}{'─' * 60}{Colors.ENDC}")
|
||
|
||
for step, result in self.results.items():
|
||
status = f"{Colors.GREEN}PASS{Colors.ENDC}" if result else f"{Colors.RED}FAIL{Colors.ENDC}"
|
||
print(f" {step}: {status}")
|
||
|
||
print(f"\n{Colors.BOLD}總計: {passed}/{total} 通過{Colors.ENDC}")
|
||
print(f"{Colors.DIM}耗時: {elapsed:.2f} 秒{Colors.ENDC}")
|
||
|
||
if passed == total:
|
||
print(f"\n{Colors.GREEN}{Colors.BOLD}🎉 E2E 驗證全部通過!{Colors.ENDC}")
|
||
print(f"{Colors.GREEN}AI 大腦 → kubectl 指令 → 目標正確 → 執行成功{Colors.ENDC}")
|
||
elif passed >= 3:
|
||
print(f"\n{Colors.YELLOW}{Colors.BOLD}⚠ 部分驗證通過{Colors.ENDC}")
|
||
else:
|
||
print(f"\n{Colors.RED}{Colors.BOLD}❌ 驗證失敗{Colors.ENDC}")
|
||
|
||
return passed == total
|
||
|
||
|
||
# =============================================================================
|
||
# CLI Entry Point
|
||
# =============================================================================
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(
|
||
description="E2E Tool Call Verification v2.0",
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog="""
|
||
範例:
|
||
# Dry-run (預設)
|
||
python -m scripts.e2e_tool_call_verification --dry-run
|
||
|
||
# 生產環境
|
||
python -m scripts.e2e_tool_call_verification --api-url http://192.168.0.120:32334
|
||
|
||
# 完整執行
|
||
python -m scripts.e2e_tool_call_verification --no-dry-run
|
||
|
||
# Critical 風險測試 (需 2 簽名)
|
||
python -m scripts.e2e_tool_call_verification --critical --no-dry-run
|
||
""",
|
||
)
|
||
|
||
parser.add_argument("--api-url", type=str, default=DEFAULT_API_URL)
|
||
parser.add_argument("--dry-run", action="store_true", default=True)
|
||
parser.add_argument("--no-dry-run", action="store_true")
|
||
parser.add_argument("--critical", action="store_true", help="使用 CRITICAL 風險等級測試")
|
||
|
||
args = parser.parse_args()
|
||
dry_run = args.dry_run and not args.no_dry_run
|
||
|
||
verifier = E2EVerification(
|
||
api_url=args.api_url,
|
||
dry_run=dry_run,
|
||
use_critical=args.critical,
|
||
)
|
||
|
||
success = asyncio.run(verifier.run())
|
||
sys.exit(0 if success else 1)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|