Files
awoooi/apps/api/scripts/e2e_tool_call_verification.py
OG T 6416f56748 fix(e2e): 修正 HMAC Header 名稱 X-Webhook-Signature → X-Signature-256
- API 期望 X-Signature-256,E2E 腳本使用錯誤的 Header 名稱
- 修復後 Daily E2E Health Check 應能通過

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-29 21:16:50 +08:00

543 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
E2E Tool Call Verification Script v2.0
======================================
End-to-end verification: Alert → AI → Approval → Execution

Phase 18.2 improvements:
1. Target-resource assertion - make sure the AI did not act on the wrong resource
2. Dynamic signature count - sign automatically according to the risk level
3. Safe Label protection - guard against accidental real operations

Usage:
    cd apps/api
    python -m scripts.e2e_tool_call_verification

    # Dry-run mode (the default: validate the flow without approving/executing)
    python -m scripts.e2e_tool_call_verification --dry-run

    # Target a specific API URL
    python -m scripts.e2e_tool_call_verification --api-url http://192.168.0.120:32334

    # Full run (including the actual approval step)
    python -m scripts.e2e_tool_call_verification --no-dry-run

Author: Claude Code (Chief Architect)
Date: 2026-03-26
Version: 2.0 (Phase 18.2 improvements)
"""
import argparse
import asyncio
import hashlib
import hmac
import json
import os
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
import httpx
# =============================================================================
# Config
# =============================================================================
# Base URL used when --api-url is not given on the command line.
DEFAULT_API_URL = "http://localhost:8000"
# Timeout (seconds) passed to every httpx client created by this script.
TIMEOUT = 60.0
# Shared webhook secret; an empty string means requests are sent unsigned.
HMAC_SECRET = os.environ.get("WEBHOOK_HMAC_SECRET", "")
def compute_hmac_signature(secret: str, payload: bytes) -> str:
    """Compute the HMAC-SHA256 signature value for a payload.

    Args:
        secret: Shared secret; encoded with ``str.encode()`` to form the key.
        payload: Raw bytes to sign.

    Returns:
        The lowercase hex digest prefixed with ``sha256=``.
    """
    key = secret.encode()
    mac = hmac.new(key, msg=payload, digestmod=hashlib.sha256)
    return "sha256=" + mac.hexdigest()
# E2E signer pool: identities used, in order, when the script signs an
# approval automatically.
SIGNER_POOL = [
    dict(id="e2e-signer-alpha", name="E2E Bot Alpha"),
    dict(id="e2e-signer-beta", name="E2E Bot Beta"),
]
# Test alert payload (carries the Safe Labels).
TEST_ALERT = dict(
    alert_type="high_cpu",
    severity="warning",  # warning = 1 signature
    source="e2e-verification-script",
    target_resource="awoooi-api",  # a resource that really exists
    namespace="awoooi-prod",
    message="[E2E Test] API Pod CPU at 85% - verification test",
    metrics=dict(
        cpu_percent=85,
        memory_percent=60,
        sigma_deviation=2.5,
    ),
    labels=dict(
        app="awoooi-api",
        team="sre",
        env="e2e-test",  # Safe Label - marks the traffic as test traffic
        # Safe Label - per the original note, the Executor is expected to
        # skip real execution when it sees this (not verifiable from here).
        safe_mode="true",
    ),
)

# Critical variant of the test alert (needs 2 signatures). Only the top-level
# "severity" and "message" values differ; nested dicts are shared with
# TEST_ALERT.
CRITICAL_ALERT = TEST_ALERT | dict(
    severity="critical",
    message="[E2E Test] CRITICAL - verification test",
)
# =============================================================================
# Terminal Output Helpers
# =============================================================================
class Colors:
    """ANSI escape sequences used to style terminal output."""

    # Bright foreground colors.
    HEADER = "\x1b[95m"
    BLUE = "\x1b[94m"
    CYAN = "\x1b[96m"
    GREEN = "\x1b[92m"
    YELLOW = "\x1b[93m"
    RED = "\x1b[91m"
    # Reset all attributes.
    ENDC = "\x1b[0m"
    # Text attributes.
    BOLD = "\x1b[1m"
    DIM = "\x1b[2m"
def print_banner():
    """Print the boxed script banner in bold cyan, then reset the styling."""
    rows = (
        "╔═══════════════════════════════════════════════════════════════╗",
        "║ E2E Tool Call Verification v2.0 ║",
        "║ Alert → AI → Approval → Execution ║",
        "║ Phase 18.2: 目標驗證 + 動態簽署 + Safe Label ║",
        "╚═══════════════════════════════════════════════════════════════╝",
    )
    box = "\n".join(rows) + "\n"
    # Leading blank line, style codes on their own line, the box, then reset.
    print(f"\n{Colors.CYAN}{Colors.BOLD}\n{box}{Colors.ENDC}")
def print_step(step: int, total: int, title: str):
    """Print a numbered step heading followed by a dim separator rule.

    Args:
        step: 1-based index of the current step.
        total: Total number of steps.
        title: Human-readable step title.
    """
    print(f"\n{Colors.BLUE}{Colors.BOLD}[{step}/{total}] {title}{Colors.ENDC}")
    # Use a visible character for the rule: repeating an empty string would
    # render as a blank line instead of a separator.
    print(f"{Colors.DIM}{'-' * 60}{Colors.ENDC}")
def print_success(msg: str):
    """Print *msg* in green, preceded by a single space."""
    line = f" {Colors.GREEN}{msg}{Colors.ENDC}"
    print(line)
def print_fail(msg: str):
    """Print *msg* in red, preceded by a single space."""
    line = f" {Colors.RED}{msg}{Colors.ENDC}"
    print(line)
def print_warn(msg: str):
    """Print *msg* in yellow, preceded by a single space."""
    line = f" {Colors.YELLOW}{msg}{Colors.ENDC}"
    print(line)
def print_info(key: str, value: Any):
    """Print a ``key: value`` pair with the key and its colon in cyan."""
    label = f"{Colors.CYAN}{key}:{Colors.ENDC}"
    print(f" {label} {value}")
# =============================================================================
# Target Verification (Phase 18.2.1)
# =============================================================================
# Resource-name extractors, tried in order: a Deployment reference takes
# precedence over a Pod reference when an action mentions both. Each matches
# "<kind>/<name>" or "<kind> <name>" in an already lower-cased action string.
# (More specific "scale deployment ..." / "--replicas ... deployment ..."
# patterns are unnecessary: any text they match is matched by the first one.)
_TARGET_PATTERNS = (
    re.compile(r'deployment[/\s]+([a-z0-9-]+)'),
    re.compile(r'pod[/\s]+([a-z0-9-]+)'),
)


def verify_action_target(action: str, expected_target: str) -> tuple[bool, str]:
    """Check that an AI-generated action refers to the expected resource.

    Phase 18.2.1: make sure the AI did not act on the wrong resource.

    The comparison is case-insensitive and fuzzy: the target is accepted when
    either name contains the other (so a pod name such as
    ``awoooi-api-7d9f`` matches the expected ``awoooi-api``).

    Args:
        action: Action/command text produced by the AI.
        expected_target: Name of the resource the action should operate on.

    Returns:
        ``(is_valid, actual_target)`` where ``actual_target`` is:
        * ``""`` when ``action`` is empty (``is_valid`` is False);
        * the extracted deployment/pod name when one is found;
        * ``"(non-k8s action)"`` when no name is found and the action does
          not mention ``kubectl`` (``is_valid`` is True);
        * ``"(not found)"`` when the action mentions ``kubectl`` but no
          deployment/pod name could be extracted (``is_valid`` is False).
    """
    if not action:
        return False, ""

    action_lc = action.lower()
    expected_lc = (expected_target or "").lower()

    for pattern in _TARGET_PATTERNS:
        match = pattern.search(action_lc)
        if match is None:
            continue
        actual_target = match.group(1)
        # An empty expected name is contained in every string, so it must not
        # be allowed to validate an arbitrary target.
        is_match = bool(expected_lc) and (
            expected_lc in actual_target or actual_target in expected_lc
        )
        return is_match, actual_target

    # No resource name found: only acceptable for non-Kubernetes actions.
    if "kubectl" not in action_lc:
        return True, "(non-k8s action)"
    return False, "(not found)"
# =============================================================================
# E2E Verification Class
# =============================================================================
class E2EVerification:
    """End-to-end verifier v2.0.

    Drives the Alert → AI → Approval → Execution flow against the API in five
    steps. Each step prints its progress and returns a bool; ``run`` stores
    those flags in ``results`` and prints a summary.
    """

    def __init__(self, api_url: str, dry_run: bool = False, use_critical: bool = False):
        """Store the run configuration and pick the alert payload.

        Args:
            api_url: Base URL of the API; a trailing ``/`` is removed.
            dry_run: When True, steps 4 and 5 skip approval and execution checks.
            use_critical: When True, send ``CRITICAL_ALERT`` instead of ``TEST_ALERT``.
        """
        self.api_url = api_url.rstrip("/")
        self.dry_run = dry_run
        self.use_critical = use_critical
        self.test_alert = CRITICAL_ALERT if use_critical else TEST_ALERT
        # Filled in by step 1 (webhook response) and step 2 (approval record).
        self.approval_id: str | None = None
        self.approval_data: dict | None = None
        self.results: dict[str, bool] = {}

    async def step1_fire_alert(self) -> bool:
        """Step 1: post the test alert (with Safe Labels) to the webhook.

        Returns:
            True when the webhook answers 200 and the response carries an
            ``approval_id``; False otherwise (including connection errors).
        """
        print_step(1, 5, "發射測試 Alert (含 Safe Label)")
        print_info("Safe Labels", "env=e2e-test, safe_mode=true")
        print_info("Target", self.test_alert["target_resource"])
        print_info("Severity", self.test_alert["severity"])
        try:
            # Serialize once: the exact bytes that are signed must be the
            # bytes that are sent.
            payload_bytes = json.dumps(self.test_alert).encode("utf-8")
            headers = {"Content-Type": "application/json"}
            # Sign the payload when an HMAC secret is configured.
            # 2026-03-29 Claude Code: header name corrected to X-Signature-256
            # (to match the API).
            if HMAC_SECRET:
                signature = compute_hmac_signature(HMAC_SECRET, payload_bytes)
                headers["X-Signature-256"] = signature
                print_info("HMAC", f"已簽名 ({signature[:20]}...)")
            else:
                print_warn("無 HMAC Secret - 僅限測試環境")
            async with httpx.AsyncClient(timeout=TIMEOUT) as client:
                response = await client.post(
                    f"{self.api_url}/api/v1/webhooks/alerts",
                    content=payload_bytes,
                    headers=headers,
                )
            if response.status_code == 401:
                print_warn("HMAC 驗證失敗 - 檢查 Secret 是否正確")
                print_info("提示", "確認 WEBHOOK_HMAC_SECRET 與 K8s 一致")
                return False
            if response.status_code != 200:
                print_fail(f"Webhook 返回 {response.status_code}")
                return False
            data = response.json()
            self.approval_id = data.get("approval_id")
            if not self.approval_id:
                print_fail("未獲得 Approval ID")
                return False
            print_success("Alert 發射成功")
            print_info("Approval ID", self.approval_id)
            print_info("Risk Level", data.get("risk_level", "N/A"))
            return True
        except httpx.ConnectError:
            print_fail(f"無法連接 API: {self.api_url}")
            return False
        except Exception as e:
            print_fail(f"發生錯誤: {e}")
            return False

    async def step2_verify_ai_analysis(self) -> bool:
        """Step 2: poll the approval and assert the action's target resource.

        Polls ``/api/v1/approvals/{id}`` up to 10 times. The latest response
        body is kept in ``approval_data`` for step 4.

        Returns:
            True when the target check passes, or when the approval is
            ``pending`` with a non-empty action whose target does not match
            (reported as a warning only). False on timeout, missing approval
            id, or any exception.
        """
        print_step(2, 5, "驗證 AI 分析結果 + 目標資源斷言")
        if not self.approval_id:
            print_fail("沒有 Approval ID跳過")
            return False
        try:
            max_attempts = 10
            expected_target = self.test_alert["target_resource"]
            # One client for the whole polling loop instead of one per attempt.
            async with httpx.AsyncClient(timeout=TIMEOUT) as client:
                for attempt in range(max_attempts):
                    response = await client.get(
                        f"{self.api_url}/api/v1/approvals/{self.approval_id}",
                    )
                    if response.status_code != 200:
                        print_warn(f"Attempt {attempt + 1}: API 返回 {response.status_code}")
                        await asyncio.sleep(2)
                        continue
                    data = response.json()
                    self.approval_data = data
                    action = data.get("action", "")
                    status = data.get("status", "")
                    print_info("Status", status)
                    print_info("Action", action[:80] if action else "N/A")
                    # Phase 18.2.1: target-resource assertion.
                    is_valid, actual_target = verify_action_target(action, expected_target)
                    print_info("Expected Target", expected_target)
                    print_info("Actual Target", actual_target)
                    if is_valid:
                        print_success("目標資源驗證通過 - AI 沒殺錯人")
                        return True
                    if status == "pending" and action:
                        print_warn("目標資源不匹配,可能需要檢查")
                        print_info("警告", f"Expected: {expected_target}, Got: {actual_target}")
                        # NOTE(review): a target mismatch is deliberately not
                        # treated as a failure here, so this step cannot by
                        # itself catch a wrong target. Confirm this leniency
                        # is still wanted.
                        return True
                    print_warn(f"等待 AI 分析... ({attempt + 1}/{max_attempts})")
                    await asyncio.sleep(3)
            print_fail("AI 分析超時")
            return False
        except Exception as e:
            print_fail(f"驗證失敗: {e}")
            return False

    async def step3_verify_approval_in_redis(self) -> bool:
        """Step 3: look the approval up in the pending-approvals listing.

        Returns:
            True when ``/api/v1/approvals/pending`` answers 200, whether or
            not the approval is listed (an absent approval is only warned
            about, as it may already have been processed). False on a
            non-200 response, missing approval id, or any exception.
        """
        print_step(3, 5, "驗證 Approval 存入 Redis")
        if not self.approval_id:
            print_fail("沒有 Approval ID跳過")
            return False
        try:
            async with httpx.AsyncClient(timeout=TIMEOUT) as client:
                response = await client.get(
                    f"{self.api_url}/api/v1/approvals/pending",
                )
            if response.status_code != 200:
                print_fail(f"API 返回 {response.status_code}")
                return False
            data = response.json()
            approvals = data.get("approvals", [])
            print_info("Pending 數量", len(approvals))
            found = any(a.get("id") == self.approval_id for a in approvals)
            if found:
                print_success("Approval 在 pending 列表中")
            else:
                print_warn("Approval 不在 pending 列表 (可能已處理)")
            return True
        except Exception as e:
            print_fail(f"驗證失敗: {e}")
            return False

    async def step4_dynamic_approval(self) -> bool:
        """Step 4: sign the approval as many times as it still requires.

        The number of missing signatures is derived from the approval record
        captured in step 2 (``required_signatures`` minus the length of
        ``signatures``); signers are taken from ``SIGNER_POOL`` in order.

        Returns:
            True in dry-run mode, when no signature is missing, or when every
            missing signature was accepted (HTTP 200). False when approval
            data is unavailable, when fewer signatures than required were
            accepted, or on any exception.
        """
        print_step(4, 5, "動態簽署 (根據風險等級)")
        if not self.approval_id or not self.approval_data:
            print_fail("沒有 Approval 資料,跳過")
            return False
        if self.dry_run:
            print_warn("Dry-run 模式:跳過實際審核")
            return True
        try:
            required = self.approval_data.get("required_signatures", 1)
            # Tolerate an explicit null "signatures" field.
            current = len(self.approval_data.get("signatures") or [])
            remaining = required - current
            print_info("Required Signatures", required)
            print_info("Current Signatures", current)
            print_info("Remaining", remaining)
            if remaining <= 0:
                print_success("已有足夠簽名")
                return True
            if remaining > len(SIGNER_POOL):
                print_warn(
                    f"Signer Pool 不足: 需要 {remaining} 個簽名, 僅有 {len(SIGNER_POOL)} 個簽署者"
                )
            # Phase 18.2.2: dynamic signing.
            signed = 0
            async with httpx.AsyncClient(timeout=TIMEOUT) as client:
                for signer in SIGNER_POOL[:remaining]:
                    print_info("Signing with", signer["name"])
                    response = await client.post(
                        f"{self.api_url}/api/v1/approvals/{self.approval_id}/approve",
                        json={
                            "signer_name": signer["name"],
                            "comment": f"E2E auto-sign by {signer['id']}",
                        },
                    )
                    if response.status_code == 200:
                        print_success(f"簽名成功: {signer['name']}")
                        signed += 1
                    else:
                        print_warn(f"簽名失敗: {response.status_code}")
            # The step only passes when the approval actually received every
            # signature it was missing; otherwise execution cannot follow.
            if signed < remaining:
                print_fail(f"簽名不足: {signed}/{remaining}")
                return False
            return True
        except Exception as e:
            print_fail(f"簽署失敗: {e}")
            return False

    async def step5_verify_execution(self) -> bool:
        """Step 5: fetch the approval again and report its execution state.

        Waits 5 seconds, then prints the approval's ``status``, ``executed``
        flag and the last two timeline events whose ``event_type`` is
        ``exec``.

        Returns:
            True in dry-run mode or when the approval could be fetched
            (HTTP 200); False on a non-200 response, missing approval id, or
            any exception.
        """
        print_step(5, 5, "驗證執行結果 (Safe Mode)")
        if not self.approval_id:
            print_fail("沒有 Approval ID跳過")
            return False
        if self.dry_run:
            print_warn("Dry-run 模式:跳過執行驗證")
            return True
        try:
            # Give the backend time to act on the approval before checking.
            await asyncio.sleep(5)
            async with httpx.AsyncClient(timeout=TIMEOUT) as client:
                response = await client.get(
                    f"{self.api_url}/api/v1/approvals/{self.approval_id}",
                )
            if response.status_code != 200:
                print_fail(f"API 返回 {response.status_code}")
                return False
            data = response.json()
            status = data.get("status", "")
            executed = data.get("executed", False)
            print_info("Status", status)
            print_info("Executed", executed)
            # NOTE(review): this message is based only on the label this
            # script sent, not on anything in the API response; it does not
            # prove the executor honoured safe_mode.
            labels = self.test_alert.get("labels", {})
            if labels.get("safe_mode") == "true":
                print_success("Safe Mode 啟用 - 實際 K8s 操作已跳過")
            timeline = data.get("timeline", [])
            exec_events = [e for e in timeline if e.get("event_type") == "exec"]
            if exec_events:
                print_success(f"找到 {len(exec_events)} 個執行事件")
                for evt in exec_events[-2:]:
                    print_info("Event", f"{evt.get('title')} - {evt.get('status')}")
            return True
        except Exception as e:
            print_fail(f"驗證失敗: {e}")
            return False

    async def run(self) -> bool:
        """Run all five steps in order and print a summary.

        Every step is always invoked; later steps report their own failure
        when an earlier step did not produce the data they need.

        Returns:
            True only when every step returned True.
        """
        print_banner()
        print(f"{Colors.DIM}API URL: {self.api_url}{Colors.ENDC}")
        print(f"{Colors.DIM}Dry-run: {self.dry_run}{Colors.ENDC}")
        print(f"{Colors.DIM}Critical Mode: {self.use_critical}{Colors.ENDC}")
        print(f"{Colors.DIM}Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{Colors.ENDC}")
        start_time = time.time()
        self.results["step1_fire_alert"] = await self.step1_fire_alert()
        self.results["step2_verify_ai"] = await self.step2_verify_ai_analysis()
        self.results["step3_verify_redis"] = await self.step3_verify_approval_in_redis()
        self.results["step4_approve"] = await self.step4_dynamic_approval()
        self.results["step5_verify_exec"] = await self.step5_verify_execution()
        elapsed = time.time() - start_time
        passed = sum(1 for v in self.results.values() if v)
        total = len(self.results)
        # Visible rules around the summary heading (an empty string repeated
        # 60 times would print nothing).
        print(f"\n{Colors.BLUE}{'=' * 60}{Colors.ENDC}")
        print(f"{Colors.BOLD}驗證結果摘要{Colors.ENDC}")
        print(f"{Colors.DIM}{'-' * 60}{Colors.ENDC}")
        for step, result in self.results.items():
            status = f"{Colors.GREEN}PASS{Colors.ENDC}" if result else f"{Colors.RED}FAIL{Colors.ENDC}"
            print(f" {step}: {status}")
        print(f"\n{Colors.BOLD}總計: {passed}/{total} 通過{Colors.ENDC}")
        print(f"{Colors.DIM}耗時: {elapsed:.2f}{Colors.ENDC}")
        if passed == total:
            print(f"\n{Colors.GREEN}{Colors.BOLD}🎉 E2E 驗證全部通過!{Colors.ENDC}")
            print(f"{Colors.GREEN}AI 大腦 → kubectl 指令 → 目標正確 → 執行成功{Colors.ENDC}")
        elif passed >= 3:
            print(f"\n{Colors.YELLOW}{Colors.BOLD}⚠ 部分驗證通過{Colors.ENDC}")
        else:
            print(f"\n{Colors.RED}{Colors.BOLD}❌ 驗證失敗{Colors.ENDC}")
        return passed == total
# =============================================================================
# CLI Entry Point
# =============================================================================
def main():
    """Parse CLI arguments, run the verification, and exit with its result.

    Dry-run is the default; ``--no-dry-run`` enables the real approval and
    execution checks. The process exits with status 0 when every step
    passed and 1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="E2E Tool Call Verification v2.0",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "\n"
            "範例:\n"
            "# Dry-run (預設)\n"
            "python -m scripts.e2e_tool_call_verification --dry-run\n"
            "# 生產環境\n"
            "python -m scripts.e2e_tool_call_verification --api-url http://192.168.0.120:32334\n"
            "# 完整執行\n"
            "python -m scripts.e2e_tool_call_verification --no-dry-run\n"
            "# Critical 風險測試 (需 2 簽名)\n"
            "python -m scripts.e2e_tool_call_verification --critical --no-dry-run\n"
        ),
    )
    parser.add_argument("--api-url", type=str, default=DEFAULT_API_URL)
    # Both flags write the same destination, so --dry-run is an explicit
    # spelling of the default rather than a flag that can never change
    # anything. If both are given, the last one on the command line wins.
    parser.add_argument("--dry-run", dest="dry_run", action="store_true", default=True)
    parser.add_argument("--no-dry-run", dest="dry_run", action="store_false")
    parser.add_argument("--critical", action="store_true", help="使用 CRITICAL 風險等級測試")
    args = parser.parse_args()
    verifier = E2EVerification(
        api_url=args.api_url,
        dry_run=args.dry_run,
        use_critical=args.critical,
    )
    success = asyncio.run(verifier.run())
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()