Files
awoooi/apps/api/tests/e2e_network_test.py
OG T 31c9117ae7
All checks were successful
E2E Health Check / e2e-health (push) Successful in 24s
refactor(api): Phase 22 P1 模組化修復 - Router→Service 封裝
修復內容:

1. e2e_network_test.py: 移除 unittest.mock
   - 將 16 個 patch.object 改為 pytest monkeypatch
   - 符合 feedback_no_mock_testing.md

2. audit_logs.py: Router→Service 層封裝
   - 新增 AuditLogService (audit_log_service.py)
   - Router 改用 get_audit_log_service()
   - 移除直接 Repository 存取

3. incidents.py:463: DEBUG 端點重構
   - 移除 get_incident_repository() 直接呼叫
   - 完全透過 IncidentService 操作
   - 簡化回傳結構

遵循規範:
- Skill 09: Router 層禁止直接外部 API 呼叫
- feedback_lewooogo_modular_enforcement.md: Service 層封裝
- feedback_no_mock_testing.md: 禁止 MagicMock/AsyncMock

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-31 16:25:00 +08:00

511 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Phase 5 E2E 網路層測試 - HMAC 安全驗證 + Nonce 防重放
=====================================================
首席架構師要求: 必須真正撞擊網路端點,驗證安全機制有效性
Phase 22 P1 修復: 移除 unittest.mock使用 pytest monkeypatch
2026-03-31 Claude Code (首席架構師)
測試涵蓋:
1. HMAC 驗證 - 缺少 Header
2. HMAC 驗證 - 簽章錯誤
3. HMAC 驗證 - 正確簽章
4. Telegram Nonce - 重放攻擊防禦
5. Telegram 白名單 - 未授權使用者
使用方式:
cd apps/api && pytest tests/e2e_network_test.py -v
"""
import hashlib
import hmac
import json
import pytest
from httpx import ASGITransport, AsyncClient
from src.core.config import settings
from src.main import app
# =============================================================================
# Helper Functions
# =============================================================================
def compute_hmac_signature(secret: str, payload: dict) -> str:
"""計算 HMAC-SHA256 簽章"""
body = json.dumps(payload).encode()
signature = hmac.new(
secret.encode(),
body,
hashlib.sha256,
).hexdigest()
return f"sha256={signature}"
# =============================================================================
# Test Fixtures
# =============================================================================
@pytest.fixture
def hmac_secret():
"""測試用 HMAC Secret"""
return "test-hmac-secret-for-e2e-testing"
@pytest.fixture
def valid_alert_payload():
"""有效的告警 Payload"""
return {
"alert_type": "k8s_pod_crash",
"severity": "warning",
"source": "prometheus",
"target_resource": "test-pod-123",
"namespace": "default",
"message": "E2E Test Alert",
"metrics": {"cpu_percent": 50},
}
# =============================================================================
# Test: HMAC Verification
# =============================================================================
class TestHMACVerification:
"""HMAC 簽章驗證測試套件"""
@pytest.mark.asyncio
async def test_missing_hmac_header_in_prod(
self,
hmac_secret: str,
valid_alert_payload: dict,
monkeypatch,
):
"""
[Edge Case 1] 缺少 HMAC Header (生產環境)
預期: 401 Unauthorized
"""
# Phase 22 P1: 使用 monkeypatch 取代 patch.object
monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", hmac_secret)
monkeypatch.setattr(settings, "ENVIRONMENT", "prod")
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
response = await client.post(
"/api/v1/webhooks/alerts",
json=valid_alert_payload,
# 故意不帶 X-Signature-256 Header
)
assert response.status_code == 401
assert "HMAC verification failed" in response.json()["detail"]
assert "Missing X-Signature-256" in response.json()["detail"]
@pytest.mark.asyncio
async def test_missing_hmac_header_in_dev_without_secret(
self,
valid_alert_payload: dict,
monkeypatch,
):
"""
[Edge Case 2] 開發環境無 Secret 設定 - 允許跳過驗證
預期: 通過 (200) 或 業務邏輯錯誤 (非 401)
"""
monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", "")
monkeypatch.setattr(settings, "ENVIRONMENT", "dev")
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
response = await client.post(
"/api/v1/webhooks/alerts",
json=valid_alert_payload,
)
# 開發環境允許跳過 HMAC不應該是 401
assert response.status_code != 401
@pytest.mark.asyncio
async def test_wrong_hmac_signature(
self,
hmac_secret: str,
valid_alert_payload: dict,
monkeypatch,
):
"""
[Edge Case 3] HMAC 簽章錯誤
預期: 401 Unauthorized
"""
monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", hmac_secret)
monkeypatch.setattr(settings, "ENVIRONMENT", "prod")
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
response = await client.post(
"/api/v1/webhooks/alerts",
json=valid_alert_payload,
headers={
"X-Signature-256": "sha256=0000000000000000000000000000000000000000000000000000000000000000",
},
)
assert response.status_code == 401
assert "HMAC verification failed" in response.json()["detail"]
assert "Invalid signature" in response.json()["detail"]
@pytest.mark.asyncio
async def test_invalid_signature_format(
self,
hmac_secret: str,
valid_alert_payload: dict,
monkeypatch,
):
"""
[Edge Case 4] 簽章格式錯誤 (非 sha256= 開頭)
預期: 401 Unauthorized
"""
monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", hmac_secret)
monkeypatch.setattr(settings, "ENVIRONMENT", "prod")
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
response = await client.post(
"/api/v1/webhooks/alerts",
json=valid_alert_payload,
headers={
"X-Signature-256": "md5=invalid_format",
},
)
assert response.status_code == 401
assert "Invalid signature format" in response.json()["detail"]
@pytest.mark.asyncio
async def test_valid_hmac_signature(
self,
hmac_secret: str,
valid_alert_payload: dict,
monkeypatch,
):
"""
[Happy Path] 正確的 HMAC 簽章
預期: 通過 HMAC 驗證 (200 或業務邏輯錯誤,但非 401)
注意: 必須使用與 httpx 相同的 JSON 序列化方式
"""
monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", hmac_secret)
monkeypatch.setattr(settings, "ENVIRONMENT", "prod")
# 使用與 httpx 相同的 JSON 序列化 (separators 無空格)
body = json.dumps(valid_alert_payload, separators=(",", ":")).encode()
signature = "sha256=" + hmac.new(
hmac_secret.encode(),
body,
hashlib.sha256,
).hexdigest()
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
response = await client.post(
"/api/v1/webhooks/alerts",
content=body,
headers={
"Content-Type": "application/json",
"X-Signature-256": signature,
},
)
# 不應該是 401 (HMAC 錯誤)
# 可能是 200 或其他業務錯誤 (如 DB 連線)
assert response.status_code != 401, f"HMAC 驗證應該通過,但收到: {response.json()}"
@pytest.mark.asyncio
async def test_hmac_secret_missing_in_prod_blocks_request(
self,
valid_alert_payload: dict,
monkeypatch,
):
"""
[Edge Case 5] 生產環境未設定 Secret - Fail-Closed
預期: 401 Unauthorized (嚴禁跳過)
"""
monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", "")
monkeypatch.setattr(settings, "ENVIRONMENT", "prod")
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
response = await client.post(
"/api/v1/webhooks/alerts",
json=valid_alert_payload,
)
assert response.status_code == 401
assert "WEBHOOK_HMAC_SECRET missing in production" in response.json()["detail"]
# =============================================================================
# Test: Telegram Security Interceptor
# =============================================================================
class TestTelegramSecurityInterceptor:
"""Telegram 安全攔截器測試套件"""
def test_nonce_generation_and_parsing(self):
"""
[Unit Test] Nonce 生成與解析
驗證 Nonce 結構正確
"""
from src.services.security_interceptor import TelegramSecurityInterceptor
interceptor = TelegramSecurityInterceptor()
# 生成 Nonce
approval_id = "test-approval-123"
action = "approve"
nonce = interceptor.generate_callback_nonce(approval_id, action)
# 解析 Nonce
parsed = interceptor.parse_callback_data(nonce)
assert parsed["action"] == action
assert parsed["approval_id"] == approval_id
assert "nonce" in parsed
@pytest.mark.asyncio
async def test_nonce_replay_attack_blocked(self, monkeypatch):
"""
[Edge Case] Nonce 重放攻擊 - 必須被阻擋
同一個 Nonce 第二次使用應該被拒絕
"""
from src.services.security_interceptor import (
NonceReplayError,
TelegramSecurityInterceptor,
)
interceptor = TelegramSecurityInterceptor()
await interceptor.initialize()
# 生成 Nonce
approval_id = "replay-test-456"
nonce = interceptor.generate_callback_nonce(approval_id, "approve")
parsed = interceptor.parse_callback_data(nonce)
# 模擬白名單使用者
monkeypatch.setattr(settings, "OPENCLAW_TG_USER_WHITELIST", [12345])
# 第一次使用 - 應該成功
user = await interceptor.verify_callback(
user_id=12345,
callback_id="callback-1",
nonce=parsed["nonce"],
)
assert user.is_whitelisted
# 第二次使用相同 Nonce - 應該被阻擋
with pytest.raises(NonceReplayError):
await interceptor.verify_callback(
user_id=12345,
callback_id="callback-2",
nonce=parsed["nonce"],
)
@pytest.mark.asyncio
async def test_whitelist_enforcement(self, monkeypatch):
"""
[Edge Case] 白名單驗證 - 未授權使用者
非白名單使用者應該被拒絕
"""
from src.services.security_interceptor import (
TelegramSecurityInterceptor,
UserNotWhitelistedError,
)
interceptor = TelegramSecurityInterceptor()
await interceptor.initialize()
# 設定白名單只有 12345
monkeypatch.setattr(settings, "OPENCLAW_TG_USER_WHITELIST", [12345])
# 白名單使用者 - 應該通過
assert interceptor.is_whitelisted(12345) is True
# 非白名單使用者 - 應該被拒絕
assert interceptor.is_whitelisted(99999) is False
# 嘗試驗證非白名單使用者 - 應該拋出例外
with pytest.raises(UserNotWhitelistedError):
await interceptor.verify_callback(
user_id=99999,
callback_id="callback-blocked",
nonce=None,
)
# =============================================================================
# Test: Telegram Webhook Endpoint
# =============================================================================
class TestTelegramWebhook:
"""Telegram Webhook 端點測試"""
@pytest.mark.asyncio
async def test_webhook_ignores_non_callback_query(self):
"""
[Edge Case] 非 callback_query 的 Update 應該被忽略
預期: 200 OK, 但無實際處理
"""
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
response = await client.post(
"/api/v1/telegram/webhook",
json={
"update_id": 123456,
"message": {
"text": "Hello",
},
},
)
assert response.status_code == 200
data = response.json()
assert data["ok"] is True
assert "Ignored" in data["message"]
@pytest.mark.asyncio
async def test_webhook_rejects_invalid_callback_data(self):
"""
[Edge Case] 缺少必要欄位的 callback_query
預期: 200 OK, 但回傳錯誤訊息
"""
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
response = await client.post(
"/api/v1/telegram/webhook",
json={
"update_id": 123456,
"callback_query": {
"id": "callback-123",
# 缺少 data 和 from
},
},
)
assert response.status_code == 200
data = response.json()
assert data["ok"] is False
assert "Invalid callback data" in data["message"]
# =============================================================================
# Test: Shadow Mode (物理繳械)
# =============================================================================
class TestShadowMode:
"""影子模式測試 - 確保物理繳械有效"""
def test_shadow_mode_config_exists(self):
"""
[Config] SHADOW_MODE_ENABLED 設定存在
預期: 設定存在且預設為 True
"""
assert hasattr(settings, "SHADOW_MODE_ENABLED")
# 影子模式預設應該開啟 (安全優先)
assert settings.SHADOW_MODE_ENABLED is True
@pytest.mark.asyncio
async def test_executor_respects_shadow_mode(self, monkeypatch):
"""
[Executor] 影子模式下強制 Dry-Run
預期: 執行操作時僅記錄,不真正執行
"""
from src.services.executor import ActionExecutor
executor = ActionExecutor()
# 確保影子模式開啟
monkeypatch.setattr(settings, "SHADOW_MODE_ENABLED", True)
# 測試 DELETE_POD - 應該被攔截
result = await executor.delete_pod("test-pod", "default")
assert result.success is True
assert "[SHADOW MODE]" in result.message
assert result.k8s_response["shadow_mode"] is True
assert result.k8s_response["dry_run"] is True
# 測試 RESTART_DEPLOYMENT - 應該被攔截
result = await executor.restart_deployment("test-deploy", "default")
assert result.success is True
assert "[SHADOW MODE]" in result.message
assert result.k8s_response["shadow_mode"] is True
# =============================================================================
# Integration Test Summary
# =============================================================================
class TestIntegrationSummary:
"""整合測試摘要 - 確保所有端點可達"""
@pytest.mark.asyncio
async def test_health_endpoints_accessible(self):
"""驗證健康檢查端點可達"""
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
# Webhook 健康檢查
response = await client.get("/api/v1/webhooks/health")
assert response.status_code == 200
# Telegram 健康檢查
response = await client.get("/api/v1/telegram/health")
assert response.status_code == 200
@pytest.mark.asyncio
async def test_api_docs_accessible(self):
"""驗證 API 文檔可達"""
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
# Docs 位於 /api/v1/docs
response = await client.get("/api/v1/docs")
assert response.status_code == 200
response = await client.get("/api/v1/openapi.json")
assert response.status_code == 200
if __name__ == "__main__":
pytest.main([__file__, "-v", "--tb=short"])