#!/usr/bin/env python3
"""
Phase 5 E2E network-layer tests - HMAC verification + nonce replay protection
=============================================================================
Requirement from the chief architect: the tests must really hit the network
endpoints and verify that the security mechanisms are effective.

Phase 22 P1 fix: removed unittest.mock in favour of pytest's monkeypatch.
2026-03-31 Claude Code (chief architect)

Coverage:
1. HMAC verification - missing header
2. HMAC verification - wrong signature
3. HMAC verification - valid signature
4. Telegram nonce - replay-attack defence
5. Telegram whitelist - unauthorized user

Usage:
    cd apps/api && pytest tests/e2e_network_test.py -v
"""

import hashlib
import hmac
import json

import pytest
from httpx import ASGITransport, AsyncClient

from src.core.config import settings
from src.main import app

# =============================================================================
# Helper Functions
# =============================================================================


def _sign_body(secret: str, body: bytes) -> str:
    """Return the ``sha256=<hex>`` HMAC-SHA256 signature of raw body bytes."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def compute_hmac_signature(secret: str, payload: dict) -> str:
    """Compute the HMAC-SHA256 signature header value for a JSON payload.

    The payload is serialized with the default ``json.dumps`` settings, so the
    result only matches a request whose body consists of exactly those bytes.

    Args:
        secret: Shared secret used as the HMAC key.
        payload: JSON-serializable request payload.

    Returns:
        The signature in ``sha256=<hexdigest>`` form.
    """
    return _sign_body(secret, json.dumps(payload).encode())


# =============================================================================
# Test Fixtures
# =============================================================================


@pytest.fixture
def hmac_secret():
    """HMAC secret used by the tests."""
    return "test-hmac-secret-for-e2e-testing"


@pytest.fixture
def valid_alert_payload():
    """A valid alert payload."""
    return {
        "alert_type": "k8s_pod_crash",
        "severity": "warning",
        "source": "prometheus",
        "target_resource": "test-pod-123",
        "namespace": "default",
        "message": "E2E Test Alert",
        "metrics": {"cpu_percent": 50},
    }


# =============================================================================
# Test: HMAC Verification
# =============================================================================


class TestHMACVerification:
    """Test suite for HMAC signature verification on the alerts webhook."""

    @pytest.mark.asyncio
    async def test_missing_hmac_header_in_prod(
        self,
        hmac_secret: str,
        valid_alert_payload: dict,
        monkeypatch,
    ):
        """
        [Edge Case 1] Missing HMAC header (production environment)

        Expected: 401 Unauthorized
        """
        # Phase 22 P1: use monkeypatch instead of patch.object
        monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", hmac_secret)
        monkeypatch.setattr(settings, "ENVIRONMENT", "prod")

        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as client:
            response = await client.post(
                "/api/v1/webhooks/alerts",
                json=valid_alert_payload,
                # Deliberately sent without the X-Signature-256 header
            )

        assert response.status_code == 401
        detail = response.json()["detail"]
        assert "HMAC verification failed" in detail
        assert "Missing X-Signature-256" in detail

    @pytest.mark.asyncio
    async def test_missing_hmac_header_in_dev_without_secret(
        self,
        valid_alert_payload: dict,
        monkeypatch,
    ):
        """
        [Edge Case 2] Dev environment with no secret configured - skipping allowed

        Expected: success (200) or a business-logic error (anything but 401)
        """
        monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", "")
        monkeypatch.setattr(settings, "ENVIRONMENT", "dev")

        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as client:
            response = await client.post(
                "/api/v1/webhooks/alerts",
                json=valid_alert_payload,
            )

        # The dev environment may skip HMAC, so the result must not be 401
        assert response.status_code != 401

    @pytest.mark.asyncio
    async def test_wrong_hmac_signature(
        self,
        hmac_secret: str,
        valid_alert_payload: dict,
        monkeypatch,
    ):
        """
        [Edge Case 3] Wrong HMAC signature

        Expected: 401 Unauthorized
        """
        monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", hmac_secret)
        monkeypatch.setattr(settings, "ENVIRONMENT", "prod")

        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as client:
            response = await client.post(
                "/api/v1/webhooks/alerts",
                json=valid_alert_payload,
                headers={
                    "X-Signature-256": "sha256=0000000000000000000000000000000000000000000000000000000000000000",
                },
            )

        assert response.status_code == 401
        detail = response.json()["detail"]
        assert "HMAC verification failed" in detail
        assert "Invalid signature" in detail

    @pytest.mark.asyncio
    async def test_invalid_signature_format(
        self,
        hmac_secret: str,
        valid_alert_payload: dict,
        monkeypatch,
    ):
        """
        [Edge Case 4] Malformed signature (does not start with ``sha256=``)

        Expected: 401 Unauthorized
        """
        monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", hmac_secret)
        monkeypatch.setattr(settings, "ENVIRONMENT", "prod")

        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as client:
            response = await client.post(
                "/api/v1/webhooks/alerts",
                json=valid_alert_payload,
                headers={
                    "X-Signature-256": "md5=invalid_format",
                },
            )

        assert response.status_code == 401
        assert "Invalid signature format" in response.json()["detail"]

    @pytest.mark.asyncio
    async def test_valid_hmac_signature(
        self,
        hmac_secret: str,
        valid_alert_payload: dict,
        monkeypatch,
    ):
        """
        [Happy Path] Correct HMAC signature

        Expected: HMAC verification passes (200 or a business-logic error,
        but not 401)

        Note: the original author requires the same JSON serialization that
        httpx uses.
        """
        monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", hmac_secret)
        monkeypatch.setattr(settings, "ENVIRONMENT", "prod")

        # Compact separators (no spaces). The body is serialized once and the
        # very same bytes are both signed and sent via ``content=``, so the
        # signature always matches what goes over the wire.
        body = json.dumps(valid_alert_payload, separators=(",", ":")).encode()
        signature = _sign_body(hmac_secret, body)

        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as client:
            response = await client.post(
                "/api/v1/webhooks/alerts",
                content=body,
                headers={
                    "Content-Type": "application/json",
                    "X-Signature-256": signature,
                },
            )

        # Must not be 401 (HMAC failure); may be 200 or another
        # business-level error (e.g. DB connectivity).
        assert response.status_code != 401, f"HMAC 驗證應該通過,但收到: {response.json()}"

    @pytest.mark.asyncio
    async def test_hmac_secret_missing_in_prod_blocks_request(
        self,
        valid_alert_payload: dict,
        monkeypatch,
    ):
        """
        [Edge Case 5] Secret not configured in production - fail closed

        Expected: 401 Unauthorized (skipping verification is forbidden)
        """
        monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", "")
        monkeypatch.setattr(settings, "ENVIRONMENT", "prod")

        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as client:
            response = await client.post(
                "/api/v1/webhooks/alerts",
                json=valid_alert_payload,
            )

        assert response.status_code == 401
        assert "WEBHOOK_HMAC_SECRET missing in production" in response.json()["detail"]
# =============================================================================
# Test: Telegram Security Interceptor
# =============================================================================


class TestTelegramSecurityInterceptor:
    """Test suite for the Telegram security interceptor."""

    def test_nonce_generation_and_parsing(self):
        """
        [Unit Test] Nonce generation and parsing

        Verifies that a generated nonce round-trips through the parser.
        """
        from src.services.security_interceptor import TelegramSecurityInterceptor

        interceptor = TelegramSecurityInterceptor()

        # Generate the nonce
        approval_id = "test-approval-123"
        action = "approve"
        nonce = interceptor.generate_callback_nonce(approval_id, action)

        # Parse the nonce
        parsed = interceptor.parse_callback_data(nonce)

        assert parsed["action"] == action
        assert parsed["approval_id"] == approval_id
        assert "nonce" in parsed

    @pytest.mark.asyncio
    async def test_nonce_replay_attack_blocked(self, monkeypatch):
        """
        [Edge Case] Nonce replay attack - must be blocked

        Using the same nonce a second time must be rejected.
        """
        from src.services.security_interceptor import (
            NonceReplayError,
            TelegramSecurityInterceptor,
        )

        interceptor = TelegramSecurityInterceptor()
        await interceptor.initialize()

        # Generate the nonce
        approval_id = "replay-test-456"
        nonce = interceptor.generate_callback_nonce(approval_id, "approve")
        parsed = interceptor.parse_callback_data(nonce)

        # Simulate a whitelisted user
        monkeypatch.setattr(settings, "OPENCLAW_TG_USER_WHITELIST", [12345])

        # First use - should succeed
        user = await interceptor.verify_callback(
            user_id=12345,
            callback_id="callback-1",
            nonce=parsed["nonce"],
        )
        assert user.is_whitelisted

        # Second use of the same nonce - should be blocked
        with pytest.raises(NonceReplayError):
            await interceptor.verify_callback(
                user_id=12345,
                callback_id="callback-2",
                nonce=parsed["nonce"],
            )

    @pytest.mark.asyncio
    async def test_whitelist_enforcement(self, monkeypatch):
        """
        [Edge Case] Whitelist enforcement - unauthorized user

        A user who is not on the whitelist must be rejected.
        """
        from src.services.security_interceptor import (
            TelegramSecurityInterceptor,
            UserNotWhitelistedError,
        )

        interceptor = TelegramSecurityInterceptor()
        await interceptor.initialize()

        # Restrict the whitelist to user 12345 only
        monkeypatch.setattr(settings, "OPENCLAW_TG_USER_WHITELIST", [12345])

        # Whitelisted user - should pass
        assert interceptor.is_whitelisted(12345) is True

        # Non-whitelisted user - should be rejected
        assert interceptor.is_whitelisted(99999) is False

        # Verifying a non-whitelisted user should raise
        with pytest.raises(UserNotWhitelistedError):
            await interceptor.verify_callback(
                user_id=99999,
                callback_id="callback-blocked",
                nonce=None,
            )


# =============================================================================
# Test: Telegram Webhook Endpoint
# =============================================================================


class TestTelegramWebhook:
    """Tests for the Telegram webhook endpoint."""

    @pytest.mark.asyncio
    async def test_webhook_ignores_non_callback_query(self):
        """
        [Edge Case] Updates that are not a callback_query should be ignored

        Expected: 200 OK, but nothing is actually processed
        """
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as client:
            response = await client.post(
                "/api/v1/telegram/webhook",
                json={
                    "update_id": 123456,
                    "message": {
                        "text": "Hello",
                    },
                },
            )

        assert response.status_code == 200
        data = response.json()
        assert data["ok"] is True
        assert "Ignored" in data["message"]

    @pytest.mark.asyncio
    async def test_webhook_rejects_invalid_callback_data(self):
        """
        [Edge Case] callback_query that lacks required fields

        Expected: 200 OK, but an error message is returned
        """
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as client:
            response = await client.post(
                "/api/v1/telegram/webhook",
                json={
                    "update_id": 123456,
                    "callback_query": {
                        "id": "callback-123",
                        # "data" and "from" are deliberately missing
                    },
                },
            )

        assert response.status_code == 200
        data = response.json()
        assert data["ok"] is False
        assert "Invalid callback data" in data["message"]


# =============================================================================
# Test: Shadow Mode (physical disarmament)
# =============================================================================


class TestShadowMode:
    """Shadow-mode tests - make sure the physical disarmament is effective."""

    def test_shadow_mode_config_exists(self):
        """
        [Config] The SHADOW_MODE_ENABLED setting exists

        Expected: the setting exists and defaults to True
        """
        assert hasattr(settings, "SHADOW_MODE_ENABLED")
        # Shadow mode should be on by default (safety first)
        assert settings.SHADOW_MODE_ENABLED is True

    @pytest.mark.asyncio
    async def test_executor_respects_shadow_mode(self, monkeypatch):
        """
        [Executor] Shadow mode forces a dry run

        Expected: operations are only recorded, never actually executed
        """
        from src.services.executor import ActionExecutor

        executor = ActionExecutor()

        # Make sure shadow mode is enabled
        monkeypatch.setattr(settings, "SHADOW_MODE_ENABLED", True)

        # DELETE_POD - should be intercepted
        result = await executor.delete_pod("test-pod", "default")

        assert result.success is True
        assert "[SHADOW MODE]" in result.message
        assert result.k8s_response["shadow_mode"] is True
        assert result.k8s_response["dry_run"] is True

        # RESTART_DEPLOYMENT - should be intercepted
        result = await executor.restart_deployment("test-deploy", "default")

        assert result.success is True
        assert "[SHADOW MODE]" in result.message
        assert result.k8s_response["shadow_mode"] is True


# =============================================================================
# Integration Test Summary
# =============================================================================


class TestIntegrationSummary:
    """Integration summary - make sure every endpoint is reachable."""

    @pytest.mark.asyncio
    async def test_health_endpoints_accessible(self):
        """Verify that the health-check endpoints are reachable."""
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as client:
            # Webhook health check
            response = await client.get("/api/v1/webhooks/health")
            assert response.status_code == 200

            # Telegram health check
            response = await client.get("/api/v1/telegram/health")
            assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_api_docs_accessible(self):
        """Verify that the API documentation is reachable."""
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as client:
            # Docs are served at /api/v1/docs
            response = await client.get("/api/v1/docs")
            assert response.status_code == 200

            response = await client.get("/api/v1/openapi.json")
            assert response.status_code == 200


if __name__ == "__main__":
    # Propagate pytest's exit code so that running this file directly reports
    # failure to the shell / CI instead of always exiting with status 0.
    raise SystemExit(pytest.main([__file__, "-v", "--tb=short"]))