- apps/api: FastAPI backend with Dockerfile - apps/web: Next.js frontend with Dockerfile - apps/sensor: Signal collection agent - packages: shared packages Co-Authored-By: Claude <noreply@anthropic.com>
326 lines
13 KiB
Python
326 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Webhook → Telegram 全鏈路整合測試
|
||
==================================
|
||
Phase 5: 修復一級整合事故
|
||
|
||
測試涵蓋:
|
||
1. 新告警 → 自動推送 Telegram
|
||
2. 收斂告警 → 也必須推送 Telegram (含聚合次數)
|
||
3. 斷言 TelegramGateway.send_approval_card 被正確參數呼叫
|
||
4. 驗證 SOUL.md 格式資料完整性
|
||
|
||
使用方式:
|
||
cd apps/api && pytest tests/test_webhook_telegram_integration.py -v
|
||
"""
|
||
|
||
import json
|
||
import pytest
|
||
from unittest.mock import AsyncMock, patch, MagicMock
|
||
from uuid import UUID
|
||
|
||
import httpx
|
||
from httpx import ASGITransport, AsyncClient
|
||
|
||
from src.main import app
|
||
from src.core.config import settings
|
||
|
||
|
||
# =============================================================================
|
||
# Test Fixtures
|
||
# =============================================================================
|
||
|
||
@pytest.fixture
def valid_alert_payload():
    """Return a well-formed alert payload accepted by the webhook endpoint.

    The payload describes a critical ``k8s_pod_crash`` alert reported by
    Prometheus for an OOM-killed pod in the ``harbor`` namespace.
    """
    pod_name = "harbor-core-7d4b8c9f5-xk2m3"
    payload = dict(
        alert_type="k8s_pod_crash",
        severity="critical",
        source="prometheus",
        target_resource=pod_name,
        namespace="harbor",
        message="Pod terminated due to OOMKilled",
        metrics=dict(memory_percent=99.8, restart_count=5),
        labels=dict(app="harbor-core", reason="OOMKilled"),
    )
    return payload
|
||
|
||
|
||
@pytest.fixture
def mock_approval_service():
    """Provide an ``AsyncMock`` approval service for the new-alert scenario.

    ``find_by_fingerprint`` resolves to ``None`` (no existing record) and
    ``create_approval_with_fingerprint`` resolves to a stub approval with a
    fixed UUID, ``hit_count`` of 1, ``pending`` status and ``critical`` risk.
    """
    # Stub record handed back when the service is asked to create an approval.
    created = MagicMock(
        id=UUID("12345678-1234-5678-1234-567812345678"),
        action="kubectl delete pod harbor-core-7d4b8c9f5-xk2m3 -n harbor",
        hit_count=1,
    )
    created.status.value = "pending"
    created.risk_level.value = "critical"

    service = AsyncMock()
    # No fingerprint match: this is what marks the alert as "new".
    service.find_by_fingerprint.return_value = None
    service.create_approval_with_fingerprint.return_value = created
    return service
|
||
|
||
|
||
@pytest.fixture
def mock_converged_approval_service():
    """Provide an ``AsyncMock`` approval service for the converged scenario.

    ``find_by_fingerprint`` resolves to an existing approval with
    ``hit_count`` 5, and ``increment_hit_count`` resolves to an approval with
    the same id and ``hit_count`` 6 (the value after aggregating one more hit).
    """
    approval_id = UUID("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee")

    def build_approval(hit_count):
        # Both stubs differ only in their hit counter.
        record = MagicMock(
            id=approval_id,
            hit_count=hit_count,
            action="kubectl delete pod harbor-core -n harbor",
        )
        record.risk_level.value = "critical"
        return record

    service = AsyncMock()
    # A fingerprint match is what marks the alert as "converged".
    service.find_by_fingerprint.return_value = build_approval(5)
    # State after the aggregation step: previous count + 1.
    service.increment_hit_count.return_value = build_approval(6)
    return service
|
||
|
||
|
||
# =============================================================================
|
||
# Test: 新告警 → Telegram 推送
|
||
# =============================================================================
|
||
|
||
class TestNewAlertTelegramPush:
    """A newly created alert must be pushed to Telegram."""

    @pytest.mark.asyncio
    async def test_new_alert_triggers_telegram_push(
        self,
        valid_alert_payload: dict,
        mock_approval_service,
    ):
        """Posting a new alert must call ``send_approval_card`` exactly once.

        Core assertion: after the webhook creates the approval record, the
        Telegram gateway's ``send_approval_card`` is invoked with the keyword
        arguments expected by the SOUL.md card format.
        """
        import asyncio

        telegram = AsyncMock()
        telegram.send_approval_card = AsyncMock(return_value={"ok": True})

        # Build every patcher up front so they can be entered in one `with`.
        service_patch = patch(
            "src.api.v1.webhooks.get_approval_service",
            return_value=mock_approval_service,
        )
        openclaw_patch = patch("src.api.v1.webhooks.get_openclaw")
        gateway_patch = patch(
            "src.api.v1.webhooks.get_telegram_gateway",
            return_value=telegram,
        )
        settings_patch = patch.multiple(
            settings,
            OPENCLAW_TG_BOT_TOKEN="test-token",
            WEBHOOK_HMAC_SECRET="",
            ENVIRONMENT="dev",
        )

        with service_patch, openclaw_patch as openclaw_factory, gateway_patch, settings_patch:
            # OpenClaw yields no analysis result; the original test notes this
            # is meant to make the webhook use its static analysis instead.
            openclaw_factory.return_value.analyze_alert = AsyncMock(
                return_value=(None, "mock", "")
            )

            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                response = await client.post(
                    "/api/v1/webhooks/alerts",
                    json=valid_alert_payload,
                )

        # Verify the HTTP response.
        assert response.status_code == 200
        body = response.json()
        assert body["success"] is True
        assert body["approval_created"] is True

        # Core assertion: TelegramGateway.send_approval_card must be called.
        # The push goes through BackgroundTasks, so give it a moment to run.
        await asyncio.sleep(0.1)
        telegram.send_approval_card.assert_called_once()

        # Verify the call arguments follow the SOUL.md format.
        sent = telegram.send_approval_card.call_args.kwargs
        required_keys = (
            "approval_id",
            "risk_level",
            "resource_name",
            "root_cause",
            "suggested_action",
        )
        for key in required_keys:
            assert key in sent
        assert sent["approval_id"] == "12345678-1234-5678-1234-567812345678"
        assert sent["resource_name"] == "harbor-core-7d4b8c9f5-xk2m3"
|
||
|
||
|
||
# =============================================================================
|
||
# Test: 收斂告警 → Telegram 推送 (含聚合次數)
|
||
# =============================================================================
|
||
|
||
class TestConvergedAlertTelegramPush:
    """A converged (aggregated) alert must also be pushed to Telegram."""

    @pytest.mark.asyncio
    async def test_converged_alert_also_triggers_telegram_push(
        self,
        valid_alert_payload: dict,
        mock_converged_approval_service,
    ):
        """A repeated alert must still push a card carrying the hit count.

        Core assertion: when the alert matches an existing fingerprint and is
        aggregated, ``send_approval_card`` is still called once and the
        aggregated count appears in ``root_cause``.
        """
        import asyncio

        telegram = AsyncMock()
        telegram.send_approval_card = AsyncMock(return_value={"ok": True})

        # Build every patcher up front so they can be entered in one `with`.
        service_patch = patch(
            "src.api.v1.webhooks.get_approval_service",
            return_value=mock_converged_approval_service,
        )
        gateway_patch = patch(
            "src.api.v1.webhooks.get_telegram_gateway",
            return_value=telegram,
        )
        settings_patch = patch.multiple(
            settings,
            OPENCLAW_TG_BOT_TOKEN="test-token",
            WEBHOOK_HMAC_SECRET="",
            ENVIRONMENT="dev",
        )

        with service_patch, gateway_patch, settings_patch:
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                response = await client.post(
                    "/api/v1/webhooks/alerts",
                    json=valid_alert_payload,
                )

        # Verify the HTTP response.
        assert response.status_code == 200
        body = response.json()
        assert body["success"] is True
        assert body["converged"] is True
        assert body["hit_count"] == 6  # 5 + 1

        # Core assertion: converged alerts must reach the TelegramGateway too.
        # Allow the background push a moment to run before inspecting the mock.
        await asyncio.sleep(0.1)
        telegram.send_approval_card.assert_called_once()

        # The aggregated hit count must be embedded in the root_cause string.
        call_kwargs = telegram.send_approval_card.call_args.kwargs
        assert "[x6]" in call_kwargs["root_cause"], \
            f"hit_count should be embedded in root_cause, got: {call_kwargs['root_cause']}"
|
||
|
||
|
||
# =============================================================================
|
||
# Test: Telegram 推送失敗不影響主流程
|
||
# =============================================================================
|
||
|
||
class TestTelegramPushFailureIsolation:
    """A failing Telegram push must not affect the webhook response."""

    @pytest.mark.asyncio
    async def test_telegram_failure_does_not_break_webhook(
        self,
        valid_alert_payload: dict,
        mock_approval_service,
    ):
        """The webhook must still answer 200 when the Telegram call raises.

        Defensive check: ``send_approval_card`` is mocked to raise, the alert
        is posted, and the response must still report success. The test also
        verifies that the failing push was actually attempted, so it cannot
        pass vacuously when the push is never triggered.
        """
        import asyncio

        telegram = AsyncMock()
        # Simulate a Telegram API failure.
        telegram.send_approval_card = AsyncMock(
            side_effect=Exception("Telegram API timeout")
        )

        # Build every patcher up front so they can be entered in one `with`.
        service_patch = patch(
            "src.api.v1.webhooks.get_approval_service",
            return_value=mock_approval_service,
        )
        openclaw_patch = patch("src.api.v1.webhooks.get_openclaw")
        gateway_patch = patch(
            "src.api.v1.webhooks.get_telegram_gateway",
            return_value=telegram,
        )
        settings_patch = patch.multiple(
            settings,
            OPENCLAW_TG_BOT_TOKEN="test-token",
            WEBHOOK_HMAC_SECRET="",
            ENVIRONMENT="dev",
        )

        with service_patch, openclaw_patch as openclaw_factory, gateway_patch, settings_patch:
            openclaw_factory.return_value.analyze_alert = AsyncMock(
                return_value=(None, "mock", "")
            )

            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                response = await client.post(
                    "/api/v1/webhooks/alerts",
                    json=valid_alert_payload,
                )

        # Core assertion: even if Telegram fails, the webhook returns 200.
        assert response.status_code == 200
        body = response.json()
        assert body["success"] is True
        assert body["approval_created"] is True

        # Same wait the sibling tests use for the background push.
        await asyncio.sleep(0.1)
        # Prove the failure path was exercised. `assert_called()` rather than
        # `assert_called_once()` because the webhook may retry a failed push
        # (NOTE(review): retry behaviour is not visible from this file).
        telegram.send_approval_card.assert_called()
|
||
|
||
|
||
# =============================================================================
|
||
# Test: SOUL.md 格式驗證
|
||
# =============================================================================
|
||
|
||
class TestSOULMDFormatCompliance:
    """The pushed data must comply with the SOUL.md format rules."""

    @pytest.mark.asyncio
    async def test_telegram_payload_respects_soul_md_limits(
        self,
        mock_approval_service,
    ):
        """Over-long fields must be truncated before reaching Telegram.

        SOUL.md length limits checked here:
        - resource_name: 50 characters
        - root_cause: 100 characters
        - suggested_action: 50 characters
        """
        import asyncio

        max_resource_name = 50
        max_root_cause = 100
        max_suggested_action = 50

        # Deliberately oversized input.
        long_alert_payload = {
            "alert_type": "k8s_pod_crash",
            "severity": "critical",
            "source": "prometheus",
            "target_resource": "x" * 100,  # longer than 50 characters
            "namespace": "default",
            "message": "y" * 200,  # longer than 100 characters
            "metrics": {},
        }

        telegram = AsyncMock()
        telegram.send_approval_card = AsyncMock(return_value={"ok": True})

        # Build every patcher up front so they can be entered in one `with`.
        service_patch = patch(
            "src.api.v1.webhooks.get_approval_service",
            return_value=mock_approval_service,
        )
        openclaw_patch = patch("src.api.v1.webhooks.get_openclaw")
        gateway_patch = patch(
            "src.api.v1.webhooks.get_telegram_gateway",
            return_value=telegram,
        )
        settings_patch = patch.multiple(
            settings,
            OPENCLAW_TG_BOT_TOKEN="test-token",
            WEBHOOK_HMAC_SECRET="",
            ENVIRONMENT="dev",
        )

        with service_patch, openclaw_patch as openclaw_factory, gateway_patch, settings_patch:
            openclaw_factory.return_value.analyze_alert = AsyncMock(
                return_value=(None, "mock", "")
            )

            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                response = await client.post(
                    "/api/v1/webhooks/alerts",
                    json=long_alert_payload,
                )

        assert response.status_code == 200

        # Allow the background push a moment to run before inspecting the mock.
        await asyncio.sleep(0.1)

        # Without this guard a missing push leaves `call_args` as None and the
        # test dies with an opaque AttributeError instead of a clear failure.
        telegram.send_approval_card.assert_called_once()

        # Verify the call arguments were truncated.
        sent = telegram.send_approval_card.call_args.kwargs
        assert len(sent["resource_name"]) <= max_resource_name
        assert len(sent["root_cause"]) <= max_root_cause
        assert len(sent["suggested_action"]) <= max_suggested_action
|
||
|
||
|
||
if __name__ == "__main__":
    # Allow running this file directly. pytest.main() returns the exit code;
    # propagate it so a failing run does not exit with status 0.
    raise SystemExit(pytest.main([__file__, "-v", "--tb=short"]))
|