#!/usr/bin/env python3
"""
AWOOOI live-fire script - automated alert testing
=================================================

Phase 5: Shadow Mode - automated live-fire drill

What it does:
1. Builds a simulated Prometheus-style OOMKilled/PodCrash/high-usage alert
2. Computes the HMAC-SHA256 signature of the serialized payload
3. POSTs the alert to the webhook endpoint
4. Prints the response and reports the outcome

Usage:
    python scripts/fire_live_alert.py oomkilled
    python scripts/fire_live_alert.py --all --dry-run

Environment variables:
    WEBHOOK_HMAC_SECRET: HMAC signing secret (requests are sent unsigned
        when it is empty)
    AWOOOI_API_URL: API base URL (default: http://192.168.0.188:8000)

Exit status:
    0 when every fired alert reported success, 1 otherwise.

Tier 2 authorization:
    This script triggers the AI analysis pipeline and requires commander
    authorization.
"""

import argparse
import hashlib
import hmac
import json
import os
import sys
from datetime import datetime, timezone
from typing import Literal, Optional, Sequence

try:
    import httpx
except ImportError:  # dry-run and --help never touch the network
    httpx = None  # type: ignore[assignment]

# =============================================================================
# Configuration
# =============================================================================

DEFAULT_API_URL = os.getenv("AWOOOI_API_URL", "http://192.168.0.188:8000")
WEBHOOK_ENDPOINT = "/api/v1/webhooks/alerts"
HMAC_SECRET = os.getenv("WEBHOOK_HMAC_SECRET", "")

# =============================================================================
# Alert Templates
# =============================================================================

# Keyed by the CLI alert name; each value is the JSON body sent to the webhook.
ALERT_TEMPLATES = {
    "oomkilled": {
        "alert_type": "k8s_pod_crash",
        "severity": "critical",
        "source": "prometheus",
        "target_resource": "harbor-core-7d4b8c9f5-xk2m3",
        "namespace": "harbor",
        "message": "Pod terminated due to OOMKilled - Container exceeded memory limit",
        "metrics": {
            "memory_percent": 99.8,
            "restart_count": 5,
            "memory_limit_mb": 512,
            "memory_usage_mb": 520,
        },
        "labels": {
            "app": "harbor-core",
            "deployment": "harbor-core",
            "pod": "harbor-core-7d4b8c9f5-xk2m3",
            "container": "harbor-core",
            "reason": "OOMKilled",
        },
    },
    "podcrash": {
        "alert_type": "k8s_pod_crash",
        "severity": "warning",
        "source": "prometheus",
        "target_resource": "nginx-ingress-7d6f8c9b5-abc12",
        "namespace": "ingress-nginx",
        "message": "Pod CrashLoopBackOff - Container restarting repeatedly",
        "metrics": {
            "restart_count": 8,
            "cpu_percent": 15.2,
            "memory_percent": 45.0,
        },
        "labels": {
            "app": "nginx-ingress",
            "deployment": "nginx-ingress-controller",
            "pod": "nginx-ingress-7d6f8c9b5-abc12",
        },
    },
    "highcpu": {
        "alert_type": "high_cpu",
        "severity": "warning",
        "source": "prometheus",
        "target_resource": "api-backend-deployment",
        "namespace": "default",
        "message": "High CPU usage detected - Pod using 95% of allocated CPU",
        "metrics": {
            "cpu_percent": 95.5,
            "memory_percent": 60.0,
            "sigma_deviation": 3.2,
        },
        "labels": {
            "app": "api-backend",
            "deployment": "api-backend",
        },
    },
    "highmemory": {
        "alert_type": "high_memory",
        "severity": "warning",
        "source": "prometheus",
        "target_resource": "redis-master-0",
        "namespace": "redis",
        "message": "High memory usage detected - Pod memory at 92%",
        "metrics": {
            "cpu_percent": 25.0,
            "memory_percent": 92.0,
            "sigma_deviation": 2.8,
        },
        "labels": {
            "app": "redis",
            "statefulset": "redis-master",
        },
    },
}

# =============================================================================
# Helper Functions
# =============================================================================


def compute_hmac_signature(secret: str, payload: bytes) -> str:
    """Return the HMAC-SHA256 of *payload* as ``sha256=<hex digest>``.

    Args:
        secret: Shared signing secret; encoded with the default codec (UTF-8).
        payload: Exact request body bytes that will be sent.
    """
    signature = hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256,
    ).hexdigest()
    return f"sha256={signature}"


def print_header(title: str) -> None:
    """Print *title* framed by two 60-character rules."""
    print("\n" + "=" * 60)
    print(f" {title}")
    print("=" * 60)


def print_success(message: str) -> None:
    """Print a success line."""
    print(f" ✅ {message}")


def print_error(message: str) -> None:
    """Print an error line."""
    print(f" ❌ {message}")


def print_info(message: str) -> None:
    """Print an informational line."""
    print(f" ℹ️ {message}")


def print_warning(message: str) -> None:
    """Print a warning line."""
    print(f" ⚠️ {message}")


# =============================================================================
# Main Logic
# =============================================================================


def _report_result(status_code: int, result: dict) -> None:
    """Print a human-readable interpretation of the webhook's JSON reply."""
    if status_code == 200 and result.get("success"):
        print_success("告警已成功接收並處理!")

        if result.get("converged"):
            print_info(f"告警收斂: 相同指紋已聚合 x{result.get('hit_count', 1)} 次")
        else:
            print_info(f"風險等級: {result.get('risk_level', 'N/A')}")
            print_info(f"建議操作: {result.get('suggested_action', 'N/A')}")

        if result.get("approval_created"):
            print_success(f"待簽核卡片已建立: {result.get('approval_id', 'N/A')}")
    else:
        print_error(f"處理失敗: {result.get('message', result.get('detail', 'Unknown error'))}")


def fire_alert(
    alert_type: str,
    api_url: str = DEFAULT_API_URL,
    hmac_secret: str = HMAC_SECRET,
    dry_run: bool = False,
) -> dict:
    """Fire one simulated alert at the webhook endpoint.

    Args:
        alert_type: Key of ``ALERT_TEMPLATES`` (oomkilled, podcrash, highcpu,
            highmemory).
        api_url: API base URL; a trailing slash is ignored.
        hmac_secret: Signing secret. When empty the request is sent without
            an ``X-Signature-256`` header.
        dry_run: Build, sign and print the payload but do not send it.

    Returns:
        The decoded JSON object returned by the API, or a locally built
        ``{"success": False, "error": ...}`` dict when the alert type is
        unknown, the request fails, or the reply is not a JSON object.
        A dry run returns ``{"success": True, "dry_run": True}``.
    """
    # Avoid "//api/..." when the base URL was given with a trailing slash.
    target_url = f"{api_url.rstrip('/')}{WEBHOOK_ENDPOINT}"

    print_header(f"AWOOOI 實彈射擊 - {alert_type.upper()}")
    print(f"執行時間: {datetime.now(timezone.utc).isoformat()}")
    print(f"目標端點: {target_url}")

    # Look up the alert template.
    if alert_type not in ALERT_TEMPLATES:
        print_error(f"未知的告警類型: {alert_type}")
        print_info(f"可用類型: {', '.join(ALERT_TEMPLATES)}")
        return {"success": False, "error": "Unknown alert type"}

    payload = ALERT_TEMPLATES[alert_type].copy()

    # Serialize once: the signature must cover the exact bytes that are sent.
    payload_json = json.dumps(payload, separators=(",", ":"))
    payload_bytes = payload_json.encode()

    print("\n📦 告警 Payload:")
    print(json.dumps(payload, indent=2, ensure_ascii=False))

    # Compute the HMAC signature.
    if hmac_secret:
        signature = compute_hmac_signature(hmac_secret, payload_bytes)
        print_success(f"HMAC 簽章: {signature[:40]}...")
    else:
        signature = None
        print_warning("無 HMAC Secret - 簽章將被跳過 (僅限 dev 環境)")

    # Dry-run mode stops before any network activity.
    if dry_run:
        print("\n🔒 [DRY-RUN MODE] 不實際發送請求")
        print_info("移除 --dry-run 參數以實際發射")
        return {"success": True, "dry_run": True}

    if httpx is None:
        print_error("未安裝 httpx 套件 - 無法發送請求 (pip install httpx)")
        return {"success": False, "error": "httpx not installed"}

    # Send the request.
    print("\n🚀 發射中...")

    headers = {"Content-Type": "application/json"}
    if signature:
        headers["X-Signature-256"] = signature

    try:
        with httpx.Client(timeout=30.0) as client:
            response = client.post(
                target_url,
                content=payload_bytes,
                headers=headers,
            )
    except httpx.ConnectError as e:
        print_error(f"連線失敗: {str(e)}")
        print_info(f"請確認 API 服務正在執行: {api_url}")
        return {"success": False, "error": "Connection failed"}
    except httpx.TimeoutException as e:
        print_error(f"請求超時: {str(e)}")
        return {"success": False, "error": "Timeout"}
    except Exception as e:  # CLI boundary: report instead of a traceback
        print_error(f"未預期錯誤: {str(e)}")
        return {"success": False, "error": str(e)}

    # Parse the response.
    print(f"\n📡 HTTP Status: {response.status_code}")

    try:
        result = response.json()
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError (both ValueError).
        print_error(f"回應解析失敗: {response.text}")
        return {"success": False, "error": "Response parse error", "raw": response.text}

    print("\n📋 API 回應:")
    print(json.dumps(result, indent=2, ensure_ascii=False))

    if not isinstance(result, dict):
        # Valid JSON but not an object: the .get() lookups below would fail.
        print_error("回應格式非預期 (應為 JSON 物件)")
        return {"success": False, "error": "Unexpected response format", "raw": response.text}

    _report_result(response.status_code, result)
    return result


def main(argv: Optional[Sequence[str]] = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        0 when every fired alert reported success, 1 otherwise. Invalid
        arguments terminate through argparse with ``SystemExit(2)``.
    """
    parser = argparse.ArgumentParser(
        description="AWOOOI 實彈射擊腳本 - 自動化告警測試",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
告警類型:
  oomkilled - Pod OOMKilled (Critical)
  podcrash - Pod CrashLoopBackOff (Warning)
  highcpu - High CPU Usage (Warning)
  highmemory - High Memory Usage (Warning)

範例:
  # 發射 OOMKilled 告警
  python scripts/fire_live_alert.py oomkilled

  # Dry-run 模式 (不實際發送)
  python scripts/fire_live_alert.py oomkilled --dry-run

  # 指定 HMAC Secret
  WEBHOOK_HMAC_SECRET=mysecret python scripts/fire_live_alert.py oomkilled
        """,
    )

    # Optional so that "--all" can be used without naming a single type.
    parser.add_argument(
        "alert_type",
        nargs="?",
        choices=list(ALERT_TEMPLATES),
        help="告警類型",
    )

    parser.add_argument(
        "--api-url",
        default=DEFAULT_API_URL,
        help=f"API 端點 URL (預設: {DEFAULT_API_URL})",
    )

    parser.add_argument(
        "--hmac-secret",
        default=HMAC_SECRET,
        help="HMAC 簽章密鑰 (也可用環境變數 WEBHOOK_HMAC_SECRET)",
    )

    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Dry-run 模式 - 僅輸出不實際發送",
    )

    parser.add_argument(
        "--all",
        action="store_true",
        help="依序發射所有類型的告警",
    )

    args = parser.parse_args(argv)

    if not args.all and args.alert_type is None:
        parser.error("請指定告警類型,或使用 --all 發射所有類型")

    print_header("AWOOOI 實彈射擊系統")
    print(f"API 端點: {args.api_url}")
    print(f"HMAC 配置: {'已設定' if args.hmac_secret else '未設定 (dev mode)'}")
    print("Shadow Mode: 已啟用 (K8s 操作將被安全攔截)")

    if args.all:
        # Fire every alert type in template order.
        print("\n🎯 連續發射所有告警類型...")
        results = {
            alert_type: fire_alert(
                alert_type=alert_type,
                api_url=args.api_url,
                hmac_secret=args.hmac_secret,
                dry_run=args.dry_run,
            )
            for alert_type in ALERT_TEMPLATES
        }

        # Summary.
        print_header("射擊結果摘要")
        for alert_type, result in results.items():
            status = "✅" if result.get("success") else "❌"
            summary = result.get("message", result.get("detail", result.get("error", "N/A")))
            print(f" {status} {alert_type}: {summary}")

        all_ok = all(result.get("success") for result in results.values())
    else:
        # Fire a single alert.
        result = fire_alert(
            alert_type=args.alert_type,
            api_url=args.api_url,
            hmac_secret=args.hmac_secret,
            dry_run=args.dry_run,
        )
        all_ok = bool(result.get("success"))

    print("\n" + "=" * 60)
    print(" 實彈射擊完成")
    print("=" * 60)

    # Surface failures to the shell / CI instead of always exiting 0.
    return 0 if all_ok else 1


if __name__ == "__main__":
    sys.exit(main())