- Python: ruff --fix 修復 280 個 lint 錯誤 - lewooogo-core: src/ 目錄未追蹤,導致 CI eslint 失敗 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
370 lines
11 KiB
Python
Executable File
370 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
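AWOOOI live-fire script - automated alert testing
=================================================
Phase 5: Shadow Mode - automated live-fire drill

Features:
    1. Simulates Prometheus-format OOMKilled/PodCrash alerts
    2. Computes the HMAC-SHA256 signature automatically
    3. Posts directly to the local webhook endpoint
    4. Validates the response and prints the result

Usage:
    python scripts/fire_live_alert.py <alert_type> [--dry-run]

Environment variables:
    WEBHOOK_HMAC_SECRET: HMAC signing secret (required outside dev; when it is
        unset the request is sent unsigned and the script prints a warning)
    AWOOOI_API_URL: API endpoint (default: http://192.168.0.188:8000)

Tier 2 authorization: this script triggers the AI analysis pipeline and
requires commander authorization.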
|
||
"""
|
||
|
||
import argparse
|
||
import hashlib
|
||
import hmac
|
||
import json
|
||
import os
|
||
from datetime import UTC, datetime
|
||
|
||
import httpx
|
||
|
||
# =============================================================================
|
||
# Configuration
|
||
# =============================================================================
|
||
|
||
# Base URL of the AWOOOI API; the AWOOOI_API_URL environment variable overrides it.
DEFAULT_API_URL = os.environ.get("AWOOOI_API_URL", "http://192.168.0.188:8000")

# Path of the alert webhook, appended to the base URL when posting.
WEBHOOK_ENDPOINT = "/api/v1/webhooks/alerts"

# HMAC signing secret; an empty string (the fallback) means "send unsigned".
HMAC_SECRET = os.environ.get("WEBHOOK_HMAC_SECRET", "")
|
||
|
||
|
||
# =============================================================================
|
||
# Alert Templates
|
||
# =============================================================================
|
||
|
||
# Simulated alert payloads, keyed by the CLI alert-type name.
# Key order is significant: each template is serialized as-is and the
# resulting bytes are what gets HMAC-signed and posted.
ALERT_TEMPLATES = {
    # Critical: container killed for exceeding its memory limit.
    "oomkilled": {
        "alert_type": "k8s_pod_crash",
        "severity": "critical",
        "source": "prometheus",
        "target_resource": "harbor-core-7d4b8c9f5-xk2m3",
        "namespace": "harbor",
        "message": "Pod terminated due to OOMKilled - Container exceeded memory limit",
        "metrics": {
            "memory_percent": 99.8,
            "restart_count": 5,
            "memory_limit_mb": 512,
            "memory_usage_mb": 520,
        },
        "labels": {
            "app": "harbor-core",
            "deployment": "harbor-core",
            "pod": "harbor-core-7d4b8c9f5-xk2m3",
            "container": "harbor-core",
            "reason": "OOMKilled",
        },
    },
    # Warning: container stuck in CrashLoopBackOff.
    "podcrash": {
        "alert_type": "k8s_pod_crash",
        "severity": "warning",
        "source": "prometheus",
        "target_resource": "nginx-ingress-7d6f8c9b5-abc12",
        "namespace": "ingress-nginx",
        "message": "Pod CrashLoopBackOff - Container restarting repeatedly",
        "metrics": {
            "restart_count": 8,
            "cpu_percent": 15.2,
            "memory_percent": 45.0,
        },
        "labels": {
            "app": "nginx-ingress",
            "deployment": "nginx-ingress-controller",
            "pod": "nginx-ingress-7d6f8c9b5-abc12",
        },
    },
    # Warning: sustained high CPU usage.
    "highcpu": {
        "alert_type": "high_cpu",
        "severity": "warning",
        "source": "prometheus",
        "target_resource": "api-backend-deployment",
        "namespace": "default",
        "message": "High CPU usage detected - Pod using 95% of allocated CPU",
        "metrics": {
            "cpu_percent": 95.5,
            "memory_percent": 60.0,
            "sigma_deviation": 3.2,
        },
        "labels": {
            "app": "api-backend",
            "deployment": "api-backend",
        },
    },
    # Warning: sustained high memory usage.
    "highmemory": {
        "alert_type": "high_memory",
        "severity": "warning",
        "source": "prometheus",
        "target_resource": "redis-master-0",
        "namespace": "redis",
        "message": "High memory usage detected - Pod memory at 92%",
        "metrics": {
            "cpu_percent": 25.0,
            "memory_percent": 92.0,
            "sigma_deviation": 2.8,
        },
        "labels": {
            "app": "redis",
            "statefulset": "redis-master",
        },
    },
}
|
||
|
||
|
||
# =============================================================================
|
||
# Helper Functions
|
||
# =============================================================================
|
||
|
||
def compute_hmac_signature(secret: str, payload: bytes) -> str:
    """Return the HMAC-SHA256 of *payload* keyed by *secret*.

    Args:
        secret: Signing key; encoded with ``str.encode()`` before use.
        payload: Exact bytes to sign.

    Returns:
        The lowercase hex digest prefixed with ``sha256=``.
    """
    mac = hmac.new(key=secret.encode(), msg=payload, digestmod=hashlib.sha256)
    hex_digest = mac.hexdigest()
    return f"sha256={hex_digest}"
|
||
|
||
|
||
def print_header(title: str) -> None:
    """Print *title* between two 60-character ``=`` rules, after a blank line."""
    rule = "=" * 60
    banner_lines = (f"\n{rule}", f" {title}", rule)
    print("\n".join(banner_lines))
|
||
|
||
|
||
def print_success(message: str) -> None:
    """Print *message* on one line, prefixed with the success marker."""
    line = f" ✅ {message}"
    print(line)
|
||
|
||
|
||
def print_error(message: str) -> None:
    """Print *message* on one line, prefixed with the error marker."""
    line = f" ❌ {message}"
    print(line)
|
||
|
||
|
||
def print_info(message: str) -> None:
    """Print *message* on one line, prefixed with the info marker."""
    line = f" ℹ️ {message}"
    print(line)
|
||
|
||
|
||
def print_warning(message: str) -> None:
    """Print *message* on one line, prefixed with the warning marker."""
    line = f" ⚠️ {message}"
    print(line)
|
||
|
||
|
||
# =============================================================================
|
||
# Main Logic
|
||
# =============================================================================
|
||
|
||
def _report_api_result(status_code: int, result: dict) -> None:
    """Print a human-readable summary of a decoded webhook response object."""
    if status_code != 200 or not result.get("success"):
        print_error(f"處理失敗: {result.get('message', result.get('detail', 'Unknown error'))}")
        return

    print_success("告警已成功接收並處理!")

    if result.get("converged"):
        print_info(f"告警收斂: 相同指紋已聚合 x{result.get('hit_count', 1)} 次")
    else:
        print_info(f"風險等級: {result.get('risk_level', 'N/A')}")
        print_info(f"建議操作: {result.get('suggested_action', 'N/A')}")

    # Reported whenever the response says an approval was created.
    # NOTE(review): confirm whether this should be limited to non-converged alerts.
    if result.get("approval_created"):
        print_success(f"待簽核卡片已建立: {result.get('approval_id', 'N/A')}")


def fire_alert(
    alert_type: str,
    api_url: str = DEFAULT_API_URL,
    hmac_secret: str = HMAC_SECRET,
    dry_run: bool = False,
) -> dict:
    """Send one simulated alert to the webhook endpoint and report the outcome.

    Args:
        alert_type: Key into ``ALERT_TEMPLATES``
            (oomkilled, podcrash, highcpu, highmemory).
        api_url: Base URL of the API; a trailing ``/`` is tolerated.
        hmac_secret: HMAC signing secret. When empty the request is sent
            without an ``X-Signature-256`` header.
        dry_run: When true, print the payload and signature but send nothing.

    Returns:
        The decoded JSON object returned by the API, or a dict with
        ``success`` (and ``error`` on failure) describing why nothing usable
        was received. A dry run returns ``{"success": True, "dry_run": True}``.
    """
    # Normalize so "http://host/" does not produce "http://host//api/...".
    endpoint = f"{api_url.rstrip('/')}{WEBHOOK_ENDPOINT}"

    print_header(f"AWOOOI 實彈射擊 - {alert_type.upper()}")
    print(f"執行時間: {datetime.now(UTC).isoformat()}")
    print(f"目標端點: {endpoint}")

    # Look up the alert template.
    payload = ALERT_TEMPLATES.get(alert_type)
    if payload is None:
        print_error(f"未知的告警類型: {alert_type}")
        print_info(f"可用類型: {', '.join(ALERT_TEMPLATES)}")
        return {"success": False, "error": "Unknown alert type"}

    # Serialize once and reuse the same bytes for signing and sending, so the
    # signature always covers exactly the request body.
    payload_bytes = json.dumps(payload, separators=(",", ":")).encode()

    print("\n📦 告警 Payload:")
    print(json.dumps(payload, indent=2, ensure_ascii=False))

    # Compute the HMAC signature (skipped when no secret is configured).
    if hmac_secret:
        signature = compute_hmac_signature(hmac_secret, payload_bytes)
        print_success(f"HMAC 簽章: {signature[:40]}...")
    else:
        signature = None
        print_warning("無 HMAC Secret - 簽章將被跳過 (僅限 dev 環境)")

    if dry_run:
        print("\n🔒 [DRY-RUN MODE] 不實際發送請求")
        print_info("移除 --dry-run 參數以實際發射")
        return {"success": True, "dry_run": True}

    print("\n🚀 發射中...")

    headers = {"Content-Type": "application/json"}
    if signature:
        headers["X-Signature-256"] = signature

    # Only the network call lives in this try block, so each handler maps to
    # a transport-level failure rather than to a bug in the reporting code.
    try:
        with httpx.Client(timeout=30.0) as client:
            response = client.post(endpoint, content=payload_bytes, headers=headers)
    except httpx.ConnectError as exc:
        print_error(f"連線失敗: {exc}")
        print_info(f"請確認 API 服務正在執行: {api_url}")
        return {"success": False, "error": "Connection failed"}
    except httpx.TimeoutException as exc:
        print_error(f"請求超時: {exc}")
        return {"success": False, "error": "Timeout"}
    except Exception as exc:
        # Script boundary: report and return instead of aborting, so that a
        # caller firing several alerts in a row can continue with the next.
        print_error(f"未預期錯誤: {exc}")
        return {"success": False, "error": str(exc)}

    print(f"\n📡 HTTP Status: {response.status_code}")

    try:
        result = response.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print_error(f"回應解析失敗: {response.text}")
        return {"success": False, "error": "Response parse error", "raw": response.text}

    print("\n📋 API 回應:")
    print(json.dumps(result, indent=2, ensure_ascii=False))

    # Valid JSON is not necessarily an object; a list or scalar has no .get().
    if not isinstance(result, dict):
        print_error("回應格式非預期: 應為 JSON 物件")
        return {"success": False, "error": "Unexpected response shape", "raw": response.text}

    _report_api_result(response.status_code, result)
    return result
|
||
|
||
|
||
def main():
    """Parse command-line arguments and fire one alert, or all of them.

    ``alert_type`` may be omitted when ``--all`` is given; if neither is
    supplied the parser exits with a usage error. With ``--all`` every
    template in ``ALERT_TEMPLATES`` is fired in order and a per-type summary
    is printed; any positional ``alert_type`` is then ignored.
    """
    parser = argparse.ArgumentParser(
        description="AWOOOI 實彈射擊腳本 - 自動化告警測試",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
告警類型:
oomkilled - Pod OOMKilled (Critical)
podcrash - Pod CrashLoopBackOff (Warning)
highcpu - High CPU Usage (Warning)
highmemory - High Memory Usage (Warning)

範例:
# 發射 OOMKilled 告警
python scripts/fire_live_alert.py oomkilled

# Dry-run 模式 (不實際發送)
python scripts/fire_live_alert.py oomkilled --dry-run

# 指定 HMAC Secret
WEBHOOK_HMAC_SECRET=mysecret python scripts/fire_live_alert.py oomkilled

# 依序發射所有類型的告警
python scripts/fire_live_alert.py --all
""",
    )

    # Optional positional so that `--all` can be used on its own; the
    # "one of the two is required" rule is enforced after parsing.
    parser.add_argument(
        "alert_type",
        nargs="?",
        choices=list(ALERT_TEMPLATES),
        help="告警類型 (使用 --all 時可省略)",
    )
    parser.add_argument(
        "--api-url",
        default=DEFAULT_API_URL,
        help=f"API 端點 URL (預設: {DEFAULT_API_URL})",
    )
    parser.add_argument(
        "--hmac-secret",
        default=HMAC_SECRET,
        help="HMAC 簽章密鑰 (也可用環境變數 WEBHOOK_HMAC_SECRET)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Dry-run 模式 - 僅輸出不實際發送",
    )
    parser.add_argument(
        "--all",
        action="store_true",
        help="依序發射所有類型的告警",
    )

    args = parser.parse_args()

    if not args.all and args.alert_type is None:
        # parser.error prints usage and exits with status 2, the same outcome
        # argparse produces for a missing required positional.
        parser.error("必須指定告警類型,或使用 --all 發射所有類型")

    print_header("AWOOOI 實彈射擊系統")
    print(f"API 端點: {args.api_url}")
    print(f"HMAC 配置: {'已設定' if args.hmac_secret else '未設定 (dev mode)'}")
    print("Shadow Mode: 已啟用 (K8s 操作將被安全攔截)")

    if args.all:
        # Fire every alert type in template order.
        print("\n🎯 連續發射所有告警類型...")
        results = {
            alert_type: fire_alert(
                alert_type=alert_type,
                api_url=args.api_url,
                hmac_secret=args.hmac_secret,
                dry_run=args.dry_run,
            )
            for alert_type in ALERT_TEMPLATES
        }

        # Summary
        print_header("射擊結果摘要")
        for alert_type, result in results.items():
            status = "✅" if result.get("success") else "❌"
            print(f" {status} {alert_type}: {result.get('message', result.get('error', 'N/A'))}")
    else:
        # Fire a single alert.
        fire_alert(
            alert_type=args.alert_type,
            api_url=args.api_url,
            hmac_secret=args.hmac_secret,
            dry_run=args.dry_run,
        )

    # Closing banner: same blank line + rule + title + rule layout as the
    # other section headers.
    print_header("實彈射擊完成")
|
||
|
||
|
||
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|