Files
awoooi/apps/api/scripts/test_race_condition.py
OG T 6f049877fc fix(lint): ruff auto-fix + lewooogo-core src 加入 git
- Python: ruff --fix 修復 280 個 lint 錯誤
- lewooogo-core: src/ 目錄未追蹤,導致 CI eslint 失敗

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-23 23:51:37 +08:00

287 lines
8.5 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Phase 6.3 Race Condition 測試腳本
==================================
功能:
1. 使用 asyncio.gather 同時發射 20 筆同源告警
2. 證明 Lua Script 原子操作成功擋下 Race Condition
3. 驗證最終 Incident JSON 精準包含 20 筆 Signals
使用方式:
cd apps/api
python scripts/test_race_condition.py
預期結果:
- 只有 1 個 Incident 被建立
- signals 陣列長度 = 20
- 無任何 Signal 遺失
統帥鐵律:
- 嚴禁人工 QA
- 必須程式化驗證
"""
import asyncio
import json
from datetime import datetime
import httpx
# API 端點
API_BASE = "http://localhost:8000"
SIGNALS_ENDPOINT = f"{API_BASE}/api/v1/webhooks/signals"
# 併發數量
CONCURRENT_SIGNALS = 20
# 測試 namespace 和 target (同源)
TEST_NAMESPACE = "race-test-ns"
TEST_TARGET = "race-test-service"
def generate_alert(index: int) -> dict:
"""生成測試告警 (同 namespace + 同 target不同 alert_name)"""
return {
"alert_name": f"RaceConditionTest_{index:03d}",
"severity": "warning",
"source": "prometheus",
"namespace": TEST_NAMESPACE,
"target": TEST_TARGET,
"fingerprint": f"fp_race_{index:03d}", # 唯一 fingerprint 防止去重
"labels": {
"namespace": TEST_NAMESPACE,
"test_index": str(index),
},
"annotations": {
"summary": f"Race condition test signal #{index}",
},
}
async def send_alert(client: httpx.AsyncClient, index: int) -> dict:
"""發送單一告警"""
alert = generate_alert(index)
try:
response = await client.post(
SIGNALS_ENDPOINT,
json=alert,
timeout=30.0,
)
return {
"index": index,
"status_code": response.status_code,
"message_id": response.json().get("message_id"),
"success": response.status_code == 200,
}
except Exception as e:
return {
"index": index,
"status_code": 0,
"message_id": None,
"success": False,
"error": str(e),
}
async def fire_concurrent_alerts() -> list[dict]:
"""併發發射所有告警"""
async with httpx.AsyncClient() as client:
tasks = [send_alert(client, i) for i in range(CONCURRENT_SIGNALS)]
results = await asyncio.gather(*tasks)
return list(results)
async def verify_redis_incident() -> dict | None:
"""從 Redis 查詢 Incident 並驗證"""
import subprocess
# 查詢所有 incident keys
result = subprocess.run(
["docker", "exec", "awoooi-redis", "redis-cli", "KEYS", "incident:INC-*"],
capture_output=True,
text=True,
)
keys = [k.strip() for k in result.stdout.strip().split("\n") if k.strip()]
if not keys:
return None
# 找到最新的 Incident (假設測試環境已清空)
# 對於測試,我們檢查所有 incident 找到包含 race-test-ns 的那個
for key in keys:
get_result = subprocess.run(
["docker", "exec", "awoooi-redis", "redis-cli", "GET", key],
capture_output=True,
text=True,
)
if get_result.stdout.strip():
try:
incident = json.loads(get_result.stdout.strip())
# 檢查是否是我們的測試 Incident
if any(
s.get("labels", {}).get("namespace") == TEST_NAMESPACE
for s in incident.get("signals", [])
):
return incident
except json.JSONDecodeError:
continue
return None
async def main():
"""主測試流程"""
print("=" * 70)
print("Phase 6.3 Race Condition 併發測試")
print("=" * 70)
print(f"時間: {datetime.now().isoformat()}")
print(f"併發數量: {CONCURRENT_SIGNALS} 筆告警")
print(f"測試 Namespace: {TEST_NAMESPACE}")
print(f"測試 Target: {TEST_TARGET}")
print()
# 0. 清除舊的測試 Incident (可選)
print("[0] 準備測試環境...")
import subprocess
# 刪除舊的索引 (如果存在)
subprocess.run(
[
"docker", "exec", "awoooi-redis", "redis-cli",
"DEL",
f"incident:idx:ns:{TEST_NAMESPACE}",
f"incident:idx:target:{TEST_TARGET}",
],
capture_output=True,
)
print(" 已清除舊索引")
# 1. 檢查 API
print("\n[1] 檢查 API 健康狀態...")
async with httpx.AsyncClient() as client:
try:
health = await client.get(f"{API_BASE}/api/v1/health", timeout=5.0)
print(f" API status: {health.status_code}")
except Exception as e:
print(f" API 連線失敗: {e}")
print(" 請確認 API 已啟動: docker compose up -d")
return
# 2. 併發發射告警
print("\n" + "-" * 70)
print("[2] 併發發射 20 筆告警 (asyncio.gather)")
print("-" * 70)
start_time = datetime.now()
results = await fire_concurrent_alerts()
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
success_count = sum(1 for r in results if r["success"])
fail_count = sum(1 for r in results if not r["success"])
print("\n發射結果:")
print(f" 成功: {success_count}/{CONCURRENT_SIGNALS}")
print(f" 失敗: {fail_count}/{CONCURRENT_SIGNALS}")
print(f" 耗時: {duration:.3f}")
if fail_count > 0:
print("\n失敗詳情:")
for r in results:
if not r["success"]:
print(f" - Index {r['index']}: {r.get('error', 'Unknown')}")
# 3. 等待 Consumer 處理
print("\n" + "-" * 70)
print("[3] 等待 Consumer 處理 (5 秒)")
print("-" * 70)
await asyncio.sleep(5)
# 4. 驗證 Redis Incident
print("\n" + "-" * 70)
print("[4] 驗證 Redis Incident")
print("-" * 70)
incident = await verify_redis_incident()
if not incident:
print("\n❌ 錯誤: 找不到測試 Incident!")
print(" 請檢查 API 日誌: docker logs awoooi-api --tail 100")
return
incident_id = incident.get("incident_id", "N/A")
signals = incident.get("signals", [])
signal_count = len(signals)
severity = incident.get("severity", "N/A")
affected_services = incident.get("affected_services", [])
print("\n找到 Incident:")
print(f" incident_id: {incident_id}")
print(f" signal_count: {signal_count}")
print(f" severity: {severity}")
print(f" affected_services: {affected_services}")
# 5. 驗證結果
print("\n" + "=" * 70)
print("驗證結果")
print("=" * 70)
# 計算聚合的告警數量
race_signals = [
s for s in signals
if s.get("alert_name", "").startswith("RaceConditionTest_")
]
race_signal_count = len(race_signals)
# 檢查告警名稱分布
alert_names = [s.get("alert_name") for s in race_signals]
unique_names = set(alert_names)
print()
passed = True
# 驗證 1: signal_count
if race_signal_count == CONCURRENT_SIGNALS:
print(f"[✅ PASS] Signal 數量: {race_signal_count}/{CONCURRENT_SIGNALS}")
else:
print(f"[❌ FAIL] Signal 數量: {race_signal_count}/{CONCURRENT_SIGNALS}")
print(f" 遺失 {CONCURRENT_SIGNALS - race_signal_count} 筆 Signal!")
passed = False
# 驗證 2: unique names (無重複跳過)
if len(unique_names) == race_signal_count:
print(f"[✅ PASS] 唯一告警名稱: {len(unique_names)} 個 (無重複)")
else:
print(f"[❌ FAIL] 唯一告警名稱: {len(unique_names)} 個 (有重複被覆蓋)")
passed = False
# 驗證 3: affected_services
if TEST_TARGET in affected_services:
print(f"[✅ PASS] affected_services 包含 '{TEST_TARGET}'")
else:
print(f"[❌ FAIL] affected_services 不包含 '{TEST_TARGET}'")
passed = False
# 最終結論
print()
print("=" * 70)
if passed:
print("🎉 Race Condition 測試 PASSED!")
print(f" {CONCURRENT_SIGNALS} 筆併發告警全部成功聚合!")
print(" Lua Script 原子操作有效防止了資料遺失!")
else:
print("💥 Race Condition 測試 FAILED!")
print(" 存在資料遺失,需要進一步調查!")
print("=" * 70)
# 輸出詳細日誌指令
print("\n檢查詳細日誌:")
print("docker logs awoooi-api --tail 100 | grep -E '(atomic|aggregate|race)'")
if __name__ == "__main__":
asyncio.run(main())