- Python: ruff --fix 修復 280 個 lint 錯誤 - lewooogo-core: src/ 目錄未追蹤,導致 CI eslint 失敗 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
287 lines
8.5 KiB
Python
Executable File
287 lines
8.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
Phase 6.3 Race Condition 測試腳本
|
||
==================================
|
||
|
||
功能:
|
||
1. 使用 asyncio.gather 同時發射 20 筆同源告警
|
||
2. 證明 Lua Script 原子操作成功擋下 Race Condition
|
||
3. 驗證最終 Incident JSON 精準包含 20 筆 Signals
|
||
|
||
使用方式:
|
||
cd apps/api
|
||
python scripts/test_race_condition.py
|
||
|
||
預期結果:
|
||
- 只有 1 個 Incident 被建立
|
||
- signals 陣列長度 = 20
|
||
- 無任何 Signal 遺失
|
||
|
||
統帥鐵律:
|
||
- 嚴禁人工 QA
|
||
- 必須程式化驗證
|
||
"""
|
||
|
||
import asyncio
import json
import os
from datetime import datetime

import httpx
|
||
|
||
# API endpoint. Defaults to the local dev stack; set RACE_TEST_API_BASE to
# point the test at another deployment (e.g. in CI) without editing the file.
API_BASE = os.environ.get("RACE_TEST_API_BASE", "http://localhost:8000").rstrip("/")
SIGNALS_ENDPOINT = f"{API_BASE}/api/v1/webhooks/signals"

# Number of alerts fired concurrently.
CONCURRENT_SIGNALS = 20

# Shared namespace and target: every test alert comes from the same source.
TEST_NAMESPACE = "race-test-ns"
TEST_TARGET = "race-test-service"
|
||
|
||
|
||
def generate_alert(index: int) -> dict:
    """Build one synthetic alert payload for the race-condition test.

    All alerts share TEST_NAMESPACE and TEST_TARGET (same source), but each
    gets its own alert_name and fingerprint derived from *index*, so that
    deduplication does not drop any of them.
    """
    tag = f"{index:03d}"
    return dict(
        alert_name=f"RaceConditionTest_{tag}",
        severity="warning",
        source="prometheus",
        namespace=TEST_NAMESPACE,
        target=TEST_TARGET,
        fingerprint=f"fp_race_{tag}",  # unique per alert to avoid deduplication
        labels={"namespace": TEST_NAMESPACE, "test_index": str(index)},
        annotations={"summary": f"Race condition test signal #{index}"},
    )
|
||
|
||
|
||
async def send_alert(client: httpx.AsyncClient, index: int) -> dict:
    """POST one test alert to the signals webhook and summarize the outcome.

    Request errors are captured in the returned dict rather than raised, so a
    single failure does not abort the surrounding ``asyncio.gather``.

    Args:
        client: Shared HTTP client used for the request.
        index: Alert index, passed to ``generate_alert``.

    Returns:
        A dict with ``index``, ``status_code`` (0 when no response was
        received), ``message_id`` (from a JSON object body, else None),
        ``success`` (True only for HTTP 200) and, when not successful, an
        ``error`` description.
    """
    alert = generate_alert(index)
    try:
        response = await client.post(
            SIGNALS_ENDPOINT,
            json=alert,
            timeout=30.0,
        )
    except Exception as e:  # report per request instead of aborting the gather
        return {
            "index": index,
            "status_code": 0,
            "message_id": None,
            "success": False,
            # str(e) alone can be empty, so keep the exception type as well.
            "error": f"{type(e).__name__}: {e}",
        }

    # Parse the body separately: a non-JSON body must not turn an accepted
    # (HTTP 200) request into a failure or hide the real status code.
    try:
        body = response.json()
    except ValueError:  # json.JSONDecodeError / UnicodeDecodeError
        body = None

    success = response.status_code == 200
    result = {
        "index": index,
        "status_code": response.status_code,
        "message_id": body.get("message_id") if isinstance(body, dict) else None,
        "success": success,
    }
    if not success:
        # Give the failure report something better than "Unknown".
        result["error"] = f"HTTP {response.status_code}: {response.text[:200]}"
    return result
|
||
|
||
|
||
async def fire_concurrent_alerts() -> list[dict]:
    """Send CONCURRENT_SIGNALS alerts at once over a single shared client.

    All ``send_alert`` coroutines are handed to ``asyncio.gather`` together,
    so the requests are in flight concurrently. The returned list is in index
    order, because gather preserves the order of its arguments.
    """
    async with httpx.AsyncClient() as client:
        outcomes = await asyncio.gather(
            *(send_alert(client, i) for i in range(CONCURRENT_SIGNALS))
        )
    return [*outcomes]
|
||
|
||
|
||
async def verify_redis_incident() -> dict | None:
    """Find the Redis incident that holds this test's signals.

    Iterates ``incident:INC-*`` keys through ``redis-cli`` in the
    ``awoooi-redis`` container. Returns the first incident whose signals carry
    ``labels.namespace == TEST_NAMESPACE``.

    The test expects exactly one such incident. If several are found (the
    signals were split across incidents, or earlier runs left incidents
    behind), a warning listing their keys is printed.

    Returns:
        The parsed incident dict, or None when nothing matched or Redis could
        not be queried (the redis-cli error is printed in that case).
    """
    import subprocess

    def _redis_cli(*args: str) -> str | None:
        """Run redis-cli in the container; return stdout, or None on failure."""
        proc = subprocess.run(
            ["docker", "exec", "awoooi-redis", "redis-cli", *args],
            capture_output=True,
            text=True,
        )
        if proc.returncode != 0:
            print(f"   redis-cli {' '.join(args)} 失敗: {proc.stderr.strip()}")
            return None
        return proc.stdout

    def _is_test_signal(signal: object) -> bool:
        """True if *signal* is a dict whose labels.namespace is TEST_NAMESPACE."""
        if not isinstance(signal, dict):
            return False
        labels = signal.get("labels")
        return isinstance(labels, dict) and labels.get("namespace") == TEST_NAMESPACE

    # --scan iterates incrementally with SCAN instead of KEYS, which blocks the
    # server while walking the whole keyspace.
    listing = _redis_cli("--scan", "--pattern", "incident:INC-*")
    if listing is None:
        return None
    # SCAN may return the same key more than once; de-duplicate, keeping order.
    keys = list(dict.fromkeys(k.strip() for k in listing.splitlines() if k.strip()))

    matches: list[tuple[str, dict]] = []
    for key in keys:
        raw = _redis_cli("GET", key)
        if not raw or not raw.strip():
            continue
        try:
            incident = json.loads(raw.strip())
        except json.JSONDecodeError:
            continue  # value is not JSON; not an incident we can inspect
        if isinstance(incident, dict) and any(
            _is_test_signal(s) for s in incident.get("signals") or []
        ):
            matches.append((key, incident))

    if not matches:
        return None
    if len(matches) > 1:
        print(
            f"   ⚠️ 警告: 找到 {len(matches)} 個包含測試 Signal 的 Incident "
            f"(預期 1 個): {[k for k, _ in matches]}"
        )
    return matches[0][1]
|
||
|
||
|
||
def _delete_stale_test_incidents(redis_cli: list[str]) -> int:
    """Delete incidents from earlier runs that hold TEST_NAMESPACE signals.

    Without this, verification can pick up an incident left by a previous run
    and report on stale data instead of this run's result.

    Args:
        redis_cli: Command prefix that invokes ``redis-cli``.

    Returns:
        The number of incident keys deleted.
    """
    import subprocess

    def _run(*args: str) -> str:
        return subprocess.run(
            [*redis_cli, *args], capture_output=True, text=True
        ).stdout

    def _is_test_signal(signal: object) -> bool:
        if not isinstance(signal, dict):
            return False
        labels = signal.get("labels")
        return isinstance(labels, dict) and labels.get("namespace") == TEST_NAMESPACE

    # SCAN may return the same key more than once; de-duplicate, keeping order.
    keys = dict.fromkeys(
        k.strip()
        for k in _run("--scan", "--pattern", "incident:INC-*").splitlines()
        if k.strip()
    )
    deleted = 0
    for key in keys:
        try:
            incident = json.loads(_run("GET", key).strip())
        except json.JSONDecodeError:
            continue
        if not isinstance(incident, dict):
            continue
        if any(_is_test_signal(s) for s in incident.get("signals") or []):
            # NOTE(review): only the incident value is removed; if the API also
            # tracks incident ids elsewhere, confirm a dangling id is tolerated.
            _run("DEL", key)
            deleted += 1
    return deleted


async def main() -> bool:
    """Run the race-condition test end to end and print the verdict.

    Steps: reset Redis state left by earlier runs, check the API, fire
    CONCURRENT_SIGNALS alerts concurrently, wait for the consumer, then verify
    the aggregated incident in Redis.

    Returns:
        True if every check passed; False otherwise, including setup failures.
    """
    import subprocess
    import time

    redis_cli = ["docker", "exec", "awoooi-redis", "redis-cli"]

    print("=" * 70)
    print("Phase 6.3 Race Condition 併發測試")
    print("=" * 70)
    print(f"時間: {datetime.now().isoformat()}")
    print(f"併發數量: {CONCURRENT_SIGNALS} 筆告警")
    print(f"測試 Namespace: {TEST_NAMESPACE}")
    print(f"測試 Target: {TEST_TARGET}")
    print()

    # 0. Reset state left by earlier runs so this run is judged on fresh data.
    print("[0] 準備測試環境...")
    # Drop the namespace/target index keys of the test source (presumably so
    # new signals open a fresh incident — confirm against the aggregator).
    cleared = subprocess.run(
        [
            *redis_cli,
            "DEL",
            f"incident:idx:ns:{TEST_NAMESPACE}",
            f"incident:idx:target:{TEST_TARGET}",
        ],
        capture_output=True,
        text=True,
    )
    if cleared.returncode != 0:
        # Redis unreachable: every later check would fail with a misleading message.
        print(f"   無法連線 Redis: {cleared.stderr.strip()}")
        return False
    print("   已清除舊索引")
    removed = _delete_stale_test_incidents(redis_cli)
    print(f"   已清除 {removed} 個舊測試 Incident")

    # 1. Check that the API is reachable before firing anything.
    print("\n[1] 檢查 API 健康狀態...")
    async with httpx.AsyncClient() as client:
        try:
            health = await client.get(f"{API_BASE}/api/v1/health", timeout=5.0)
            print(f"   API status: {health.status_code}")
        except Exception as e:
            print(f"   API 連線失敗: {e}")
            print("   請確認 API 已啟動: docker compose up -d")
            return False

    # 2. Fire all alerts concurrently.
    print("\n" + "-" * 70)
    print(f"[2] 併發發射 {CONCURRENT_SIGNALS} 筆告警 (asyncio.gather)")
    print("-" * 70)

    started = time.perf_counter()  # monotonic clock for elapsed time
    results = await fire_concurrent_alerts()
    duration = time.perf_counter() - started

    success_count = sum(1 for r in results if r["success"])
    fail_count = len(results) - success_count

    print("\n發射結果:")
    print(f"   成功: {success_count}/{CONCURRENT_SIGNALS}")
    print(f"   失敗: {fail_count}/{CONCURRENT_SIGNALS}")
    print(f"   耗時: {duration:.3f} 秒")

    if fail_count > 0:
        print("\n失敗詳情:")
        for r in results:
            if not r["success"]:
                print(f"   - Index {r['index']}: {r.get('error', 'Unknown')}")

    # 3. Give the consumer time to process the queued signals.
    print("\n" + "-" * 70)
    print("[3] 等待 Consumer 處理 (5 秒)")
    print("-" * 70)
    await asyncio.sleep(5)

    # 4. Look up the aggregated incident in Redis.
    print("\n" + "-" * 70)
    print("[4] 驗證 Redis Incident")
    print("-" * 70)

    incident = await verify_redis_incident()
    if not incident:
        print("\n❌ 錯誤: 找不到測試 Incident!")
        print("   請檢查 API 日誌: docker logs awoooi-api --tail 100")
        return False

    incident_id = incident.get("incident_id", "N/A")
    signals = incident.get("signals", [])
    signal_count = len(signals)
    severity = incident.get("severity", "N/A")
    affected_services = incident.get("affected_services", [])

    print("\n找到 Incident:")
    print(f"   incident_id: {incident_id}")
    print(f"   signal_count: {signal_count}")
    print(f"   severity: {severity}")
    print(f"   affected_services: {affected_services}")

    # 5. Evaluate the checks.
    print("\n" + "=" * 70)
    print("驗證結果")
    print("=" * 70)

    # Only count signals produced by this test.
    race_signals = [
        s
        for s in signals
        if isinstance(s, dict)
        and (s.get("alert_name") or "").startswith("RaceConditionTest_")
    ]
    race_signal_count = len(race_signals)

    alert_names = [s.get("alert_name") for s in race_signals]
    unique_names = set(alert_names)

    print()
    passed = True

    # Check 1: every fired signal was aggregated.
    if race_signal_count == CONCURRENT_SIGNALS:
        print(f"[✅ PASS] Signal 數量: {race_signal_count}/{CONCURRENT_SIGNALS}")
    else:
        print(f"[❌ FAIL] Signal 數量: {race_signal_count}/{CONCURRENT_SIGNALS}")
        print(f"         遺失 {CONCURRENT_SIGNALS - race_signal_count} 筆 Signal!")
        passed = False

    # Check 2: no alert name appears twice among the aggregated signals.
    if len(unique_names) == race_signal_count:
        print(f"[✅ PASS] 唯一告警名稱: {len(unique_names)} 個 (無重複)")
    else:
        print(f"[❌ FAIL] 唯一告警名稱: {len(unique_names)} 個 (有重複被覆蓋)")
        passed = False

    # Check 3: the test target is recorded as affected.
    if TEST_TARGET in affected_services:
        print(f"[✅ PASS] affected_services 包含 '{TEST_TARGET}'")
    else:
        print(f"[❌ FAIL] affected_services 不包含 '{TEST_TARGET}'")
        passed = False

    print()
    print("=" * 70)
    if passed:
        print("🎉 Race Condition 測試 PASSED!")
        print(f"   {CONCURRENT_SIGNALS} 筆併發告警全部成功聚合!")
        print("   Lua Script 原子操作有效防止了資料遺失!")
    else:
        print("💥 Race Condition 測試 FAILED!")
        print("   存在資料遺失,需要進一步調查!")
    print("=" * 70)

    print("\n檢查詳細日誌:")
    print("docker logs awoooi-api --tail 100 | grep -E '(atomic|aggregate|race)'")

    return passed


if __name__ == "__main__":
    # Exit non-zero on failure so CI and scripts can detect it without parsing output.
    raise SystemExit(0 if asyncio.run(main()) else 1)
|