Files
awoooi/apps/api/scripts/test_signal_stream.py
OG T 196d269b92 feat: add all application source code
- apps/api: FastAPI backend with Dockerfile
- apps/web: Next.js frontend with Dockerfile
- apps/sensor: Signal collection agent
- packages: shared packages

Co-Authored-By: Claude <noreply@anthropic.com>
2026-03-22 18:57:44 +08:00

84 lines
2.3 KiB
Python

#!/usr/bin/env python3
"""
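Phase 6.1 test script: Redis Streams signal flow check
======================================================
What it does:
1. Sends a test Signal to /api/v1/webhooks/signals
2. Prints how to confirm that the Signal Worker (consumer) received the message
3. Prints the redis-cli commands for inspecting the Redis Stream state manually
Usage:
    python scripts/test_signal_stream.py
Environment variables:
    API_BASE_URL: base URL of the API (default: http://localhost:8000)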
"""
import asyncio
import json
import os
import sys
import httpx
# Base URL of the API under test; override with the API_BASE_URL environment
# variable, otherwise the local default below is used.
API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000")
# Full URL of the signal webhook that the test payload is POSTed to.
SIGNAL_ENDPOINT = f"{API_BASE_URL}/api/v1/webhooks/signals"
async def send_test_signal() -> dict:
    """POST one synthetic signal to SIGNAL_ENDPOINT and return the decoded JSON reply.

    Returns:
        Whatever ``response.json()`` yields for the webhook's response body.

    Raises:
        httpx.HTTPStatusError: propagated from ``response.raise_for_status()``
            when the server answers with an error status.
    """
    signal_labels = {"team": "devops", "env": "test"}
    signal_annotations = {"runbook_url": "https://wiki.example.com/runbook"}
    signal_body = dict(
        source="test-script",
        alert_name="TestSignal",
        severity="warning",
        namespace="awoooi-test",
        target="test-pod-123",
        message="Phase 6.1 Event Bus 驗證測試",
        labels=signal_labels,
        annotations=signal_annotations,
    )

    # A 10-second timeout keeps the script from hanging on an unreachable API.
    http_client = httpx.AsyncClient(timeout=10.0)
    async with http_client as session:
        reply = await session.post(SIGNAL_ENDPOINT, json=signal_body)
        # Turn error statuses into exceptions so the caller can report them.
        reply.raise_for_status()
        return reply.json()
async def main():
    """Run the Phase 6.1 event-bus smoke test and print follow-up instructions.

    Sends a single test signal, reports the ``message_id`` and ``stream``
    fields of the reply, then prints the manual commands for checking the
    consumer log and the Redis Stream. Exits with status 1 if sending fails.
    """
    divider = "=" * 60

    for heading_line in (divider, "Phase 6.1 Event Bus 測試", divider, ""):
        print(heading_line)

    print(f"[1] 發送測試 Signal 到 {SIGNAL_ENDPOINT}")
    try:
        reply = await send_test_signal()
        print(" ✅ 成功!")
        message_id = reply.get("message_id")
        print(f" Message ID: {message_id}")
        stream_name = reply.get("stream")
        print(f" Stream: {stream_name}")
    except httpx.HTTPStatusError as http_err:
        # The server answered, but with an error status: show code and body.
        failed_response = http_err.response
        print(f" ❌ HTTP 錯誤: {failed_response.status_code}")
        print(f" {failed_response.text}")
        sys.exit(1)
    except Exception as err:
        # Top-level boundary of a CLI script: report any other failure
        # (connection error, bad JSON, ...) and exit non-zero.
        print(f" ❌ 錯誤: {err}")
        sys.exit(1)

    # The remaining checks are manual; the script only prints how to do them.
    follow_up_lines = (
        "",
        "[2] 驗證 Signal Worker (Consumer) 是否收到訊息",
        " 查看 API 日誌: docker logs awoooi-api | grep signal_received",
        "",
        "[3] 手動檢查 Redis Stream 狀態",
        " redis-cli XINFO STREAM stream:awoooi_signals",
        " redis-cli XINFO GROUPS stream:awoooi_signals",
        "",
        divider,
        "測試完成!",
        divider,
    )
    for follow_up in follow_up_lines:
        print(follow_up)


# Script entry point: drive the async main() to completion.
if __name__ == "__main__":
    asyncio.run(main())