All checks were successful
E2E Health Check / e2e-health (push) Successful in 24s
修復內容: 1. e2e_network_test.py: 移除 unittest.mock - 將 16 個 patch.object 改為 pytest monkeypatch - 符合 feedback_no_mock_testing.md 2. audit_logs.py: Router→Service 層封裝 - 新增 AuditLogService (audit_log_service.py) - Router 改用 get_audit_log_service() - 移除直接 Repository 存取 3. incidents.py:463: DEBUG 端點重構 - 移除 get_incident_repository() 直接呼叫 - 完全透過 IncidentService 操作 - 簡化回傳結構 遵循規範: - Skill 09: Router 層禁止直接外部 API 呼叫 - feedback_lewooogo_modular_enforcement.md: Service 層封裝 - feedback_no_mock_testing.md: 禁止 MagicMock/AsyncMock Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
511 lines
16 KiB
Python
511 lines
16 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Phase 5 E2E 網路層測試 - HMAC 安全驗證 + Nonce 防重放
|
||
=====================================================
|
||
首席架構師要求: 必須真正撞擊網路端點,驗證安全機制有效性
|
||
|
||
Phase 22 P1 修復: 移除 unittest.mock,使用 pytest monkeypatch
|
||
2026-03-31 Claude Code (首席架構師)
|
||
|
||
測試涵蓋:
|
||
1. HMAC 驗證 - 缺少 Header
|
||
2. HMAC 驗證 - 簽章錯誤
|
||
3. HMAC 驗證 - 正確簽章
|
||
4. Telegram Nonce - 重放攻擊防禦
|
||
5. Telegram 白名單 - 未授權使用者
|
||
|
||
使用方式:
|
||
cd apps/api && pytest tests/e2e_network_test.py -v
|
||
"""
|
||
|
||
import hashlib
|
||
import hmac
|
||
import json
|
||
|
||
import pytest
|
||
from httpx import ASGITransport, AsyncClient
|
||
|
||
from src.core.config import settings
|
||
from src.main import app
|
||
|
||
# =============================================================================
|
||
# Helper Functions
|
||
# =============================================================================
|
||
|
||
def compute_hmac_signature(secret: str, payload: "dict | bytes") -> str:
    """Compute the ``X-Signature-256`` style HMAC-SHA256 signature.

    An HMAC only verifies against the exact bytes that were transmitted, so
    callers that send a pre-serialised body (``content=body``) should pass
    those same bytes here instead of the source dict.

    Args:
        secret: Shared secret; UTF-8 encoded and used as the HMAC key.
        payload: Either a JSON-serialisable object, which is serialised with
            the ``json.dumps`` defaults (``", "`` and ``": "`` separators),
            or the raw request body as ``bytes``/``bytearray``, which is
            signed verbatim.

    Returns:
        The hex digest prefixed with ``sha256=``.
    """
    if isinstance(payload, (bytes, bytearray)):
        # Already the wire representation: sign it untouched.
        body = bytes(payload)
    else:
        body = json.dumps(payload).encode()

    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"
|
||
|
||
|
||
# =============================================================================
|
||
# Test Fixtures
|
||
# =============================================================================
|
||
|
||
@pytest.fixture
def hmac_secret():
    """Provide the fixed HMAC secret used by the signature tests."""
    secret = "test-hmac-secret-for-e2e-testing"
    return secret
|
||
|
||
|
||
@pytest.fixture
def valid_alert_payload():
    """Provide a well-formed alert payload for the webhook tests."""
    metrics = {"cpu_percent": 50}
    payload = {
        "alert_type": "k8s_pod_crash",
        "severity": "warning",
        "source": "prometheus",
        "target_resource": "test-pod-123",
        "namespace": "default",
        "message": "E2E Test Alert",
        "metrics": metrics,
    }
    return payload
|
||
|
||
|
||
# =============================================================================
|
||
# Test: HMAC Verification
|
||
# =============================================================================
|
||
|
||
class TestHMACVerification:
    """Test suite for HMAC signature verification on the alerts webhook.

    Every test patches ``settings`` through ``monkeypatch`` and then sends a
    real request through the ASGI app via :meth:`_post_alert`.
    """

    @staticmethod
    async def _post_alert(**request_kwargs):
        """POST to the alerts webhook through an in-process ASGI client.

        Args:
            **request_kwargs: Forwarded verbatim to ``AsyncClient.post``
                (for example ``json=``, ``content=`` or ``headers=``).

        Returns:
            The response returned by ``AsyncClient.post``.
        """
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as client:
            return await client.post("/api/v1/webhooks/alerts", **request_kwargs)

    @pytest.mark.asyncio
    async def test_missing_hmac_header_in_prod(
        self,
        hmac_secret: str,
        valid_alert_payload: dict,
        monkeypatch,
    ):
        """[Edge Case 1] Missing HMAC header in the production environment.

        Expected: 401 Unauthorized.
        """
        # Phase 22 P1: monkeypatch replaces the former patch.object usage.
        monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", hmac_secret)
        monkeypatch.setattr(settings, "ENVIRONMENT", "prod")

        # Deliberately sent without the X-Signature-256 header.
        response = await self._post_alert(json=valid_alert_payload)

        assert response.status_code == 401
        detail = response.json()["detail"]
        assert "HMAC verification failed" in detail
        assert "Missing X-Signature-256" in detail

    @pytest.mark.asyncio
    async def test_missing_hmac_header_in_dev_without_secret(
        self,
        valid_alert_payload: dict,
        monkeypatch,
    ):
        """[Edge Case 2] Dev environment with no secret configured.

        Expected: verification is skipped, so the request either succeeds
        (200) or fails for a business-logic reason, but never with 401.
        """
        monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", "")
        monkeypatch.setattr(settings, "ENVIRONMENT", "dev")

        response = await self._post_alert(json=valid_alert_payload)

        # Dev is allowed to skip HMAC, so this must not be a 401.
        assert response.status_code != 401

    @pytest.mark.asyncio
    async def test_wrong_hmac_signature(
        self,
        hmac_secret: str,
        valid_alert_payload: dict,
        monkeypatch,
    ):
        """[Edge Case 3] Wrong HMAC signature.

        Expected: 401 Unauthorized.
        """
        monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", hmac_secret)
        monkeypatch.setattr(settings, "ENVIRONMENT", "prod")

        response = await self._post_alert(
            json=valid_alert_payload,
            headers={
                "X-Signature-256": "sha256=0000000000000000000000000000000000000000000000000000000000000000",
            },
        )

        assert response.status_code == 401
        detail = response.json()["detail"]
        assert "HMAC verification failed" in detail
        assert "Invalid signature" in detail

    @pytest.mark.asyncio
    async def test_invalid_signature_format(
        self,
        hmac_secret: str,
        valid_alert_payload: dict,
        monkeypatch,
    ):
        """[Edge Case 4] Malformed signature (does not start with ``sha256=``).

        Expected: 401 Unauthorized.
        """
        monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", hmac_secret)
        monkeypatch.setattr(settings, "ENVIRONMENT", "prod")

        response = await self._post_alert(
            json=valid_alert_payload,
            headers={
                "X-Signature-256": "md5=invalid_format",
            },
        )

        assert response.status_code == 401
        assert "Invalid signature format" in response.json()["detail"]

    @pytest.mark.asyncio
    async def test_valid_hmac_signature(
        self,
        hmac_secret: str,
        valid_alert_payload: dict,
        monkeypatch,
    ):
        """[Happy Path] Correct HMAC signature.

        Expected: HMAC verification passes (200 or a business-logic error,
        but not 401).

        Note: the signature must cover exactly the bytes that are sent, so
        the body is serialised once here and posted as raw ``content``.
        """
        monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", hmac_secret)
        monkeypatch.setattr(settings, "ENVIRONMENT", "prod")

        # Compact separators (no spaces), the same serialisation as httpx.
        body = json.dumps(valid_alert_payload, separators=(",", ":")).encode()
        signature = "sha256=" + hmac.new(
            hmac_secret.encode(),
            body,
            hashlib.sha256,
        ).hexdigest()

        response = await self._post_alert(
            content=body,
            headers={
                "Content-Type": "application/json",
                "X-Signature-256": signature,
            },
        )

        # Must not be 401 (HMAC failure); may be 200 or another business
        # error such as a DB connection problem.
        assert response.status_code != 401, f"HMAC 驗證應該通過,但收到: {response.json()}"

    @pytest.mark.asyncio
    async def test_hmac_secret_missing_in_prod_blocks_request(
        self,
        valid_alert_payload: dict,
        monkeypatch,
    ):
        """[Edge Case 5] Production with no secret configured (fail-closed).

        Expected: 401 Unauthorized; skipping verification is forbidden.
        """
        monkeypatch.setattr(settings, "WEBHOOK_HMAC_SECRET", "")
        monkeypatch.setattr(settings, "ENVIRONMENT", "prod")

        response = await self._post_alert(json=valid_alert_payload)

        assert response.status_code == 401
        assert "WEBHOOK_HMAC_SECRET missing in production" in response.json()["detail"]
|
||
|
||
|
||
# =============================================================================
|
||
# Test: Telegram Security Interceptor
|
||
# =============================================================================
|
||
|
||
class TestTelegramSecurityInterceptor:
    """Test suite for the Telegram security interceptor."""

    def test_nonce_generation_and_parsing(self):
        """[Unit Test] Nonce generation and parsing.

        Checks that generated callback data parses back into the expected
        structure.
        """
        from src.services.security_interceptor import TelegramSecurityInterceptor

        guard = TelegramSecurityInterceptor()
        expected_id, expected_action = "test-approval-123", "approve"

        # Generate the callback data, then parse it straight back.
        callback_data = guard.generate_callback_nonce(expected_id, expected_action)
        fields = guard.parse_callback_data(callback_data)

        assert fields["action"] == expected_action
        assert fields["approval_id"] == expected_id
        assert "nonce" in fields

    @pytest.mark.asyncio
    async def test_nonce_replay_attack_blocked(self, monkeypatch):
        """[Edge Case] A nonce replay attack must be blocked.

        Using the same nonce a second time has to be rejected.
        """
        from src.services.security_interceptor import (
            NonceReplayError,
            TelegramSecurityInterceptor,
        )

        guard = TelegramSecurityInterceptor()
        await guard.initialize()

        # Generate callback data and pull the nonce out of it.
        callback_data = guard.generate_callback_nonce("replay-test-456", "approve")
        one_time_nonce = guard.parse_callback_data(callback_data)["nonce"]

        # Simulate a whitelisted user.
        monkeypatch.setattr(settings, "OPENCLAW_TG_USER_WHITELIST", [12345])

        # First use: should succeed.
        verified_user = await guard.verify_callback(
            user_id=12345,
            callback_id="callback-1",
            nonce=one_time_nonce,
        )
        assert verified_user.is_whitelisted

        # Second use of the same nonce: should be blocked.
        with pytest.raises(NonceReplayError):
            await guard.verify_callback(
                user_id=12345,
                callback_id="callback-2",
                nonce=one_time_nonce,
            )

    @pytest.mark.asyncio
    async def test_whitelist_enforcement(self, monkeypatch):
        """[Edge Case] Whitelist enforcement for an unauthorised user.

        A user who is not on the whitelist has to be rejected.
        """
        from src.services.security_interceptor import (
            TelegramSecurityInterceptor,
            UserNotWhitelistedError,
        )

        guard = TelegramSecurityInterceptor()
        await guard.initialize()

        # The whitelist contains 12345 only.
        monkeypatch.setattr(settings, "OPENCLAW_TG_USER_WHITELIST", [12345])

        # Whitelisted user passes; any other user is refused.
        assert guard.is_whitelisted(12345) is True
        assert guard.is_whitelisted(99999) is False

        # Verifying a non-whitelisted user should raise.
        with pytest.raises(UserNotWhitelistedError):
            await guard.verify_callback(
                user_id=99999,
                callback_id="callback-blocked",
                nonce=None,
            )
|
||
|
||
|
||
# =============================================================================
|
||
# Test: Telegram Webhook Endpoint
|
||
# =============================================================================
|
||
|
||
class TestTelegramWebhook:
    """Tests for the Telegram webhook endpoint."""

    @pytest.mark.asyncio
    async def test_webhook_ignores_non_callback_query(self):
        """[Edge Case] An update that is not a callback_query is ignored.

        Expected: 200 OK with no actual processing.
        """
        update = {
            "update_id": 123456,
            "message": {
                "text": "Hello",
            },
        }

        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.post("/api/v1/telegram/webhook", json=update)

        assert response.status_code == 200
        reply = response.json()
        assert reply["ok"] is True
        assert "Ignored" in reply["message"]

    @pytest.mark.asyncio
    async def test_webhook_rejects_invalid_callback_data(self):
        """[Edge Case] A callback_query missing required fields.

        Expected: 200 OK carrying an error message.
        """
        update = {
            "update_id": 123456,
            "callback_query": {
                "id": "callback-123",
                # "data" and "from" are intentionally absent.
            },
        }

        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.post("/api/v1/telegram/webhook", json=update)

        assert response.status_code == 200
        reply = response.json()
        assert reply["ok"] is False
        assert "Invalid callback data" in reply["message"]
|
||
|
||
|
||
# =============================================================================
|
||
# Test: Shadow Mode (物理繳械)
|
||
# =============================================================================
|
||
|
||
class TestShadowMode:
    """Shadow mode tests: make sure destructive actions are disarmed."""

    def test_shadow_mode_config_exists(self):
        """[Config] The SHADOW_MODE_ENABLED setting exists.

        Expected: the setting is present and is True.
        """
        assert hasattr(settings, "SHADOW_MODE_ENABLED")
        # Shadow mode is expected to be on by default (safety first).
        assert settings.SHADOW_MODE_ENABLED is True

    @pytest.mark.asyncio
    async def test_executor_respects_shadow_mode(self, monkeypatch):
        """[Executor] Shadow mode forces a dry run.

        Expected: operations are only recorded, never really executed.
        """
        from src.services.executor import ActionExecutor

        runner = ActionExecutor()

        # Make sure shadow mode is switched on.
        monkeypatch.setattr(settings, "SHADOW_MODE_ENABLED", True)

        # DELETE_POD should be intercepted.
        delete_outcome = await runner.delete_pod("test-pod", "default")

        assert delete_outcome.success is True
        assert "[SHADOW MODE]" in delete_outcome.message
        assert delete_outcome.k8s_response["shadow_mode"] is True
        assert delete_outcome.k8s_response["dry_run"] is True

        # RESTART_DEPLOYMENT should be intercepted.
        restart_outcome = await runner.restart_deployment("test-deploy", "default")

        assert restart_outcome.success is True
        assert "[SHADOW MODE]" in restart_outcome.message
        assert restart_outcome.k8s_response["shadow_mode"] is True
|
||
|
||
|
||
# =============================================================================
|
||
# Integration Test Summary
|
||
# =============================================================================
|
||
|
||
class TestIntegrationSummary:
    """Integration summary: make sure the listed endpoints are reachable."""

    @pytest.mark.asyncio
    async def test_health_endpoints_accessible(self):
        """Check that the health-check endpoints are reachable."""
        # Webhook health check first, then the Telegram one.
        health_paths = (
            "/api/v1/webhooks/health",
            "/api/v1/telegram/health",
        )

        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            for path in health_paths:
                response = await client.get(path)
                assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_api_docs_accessible(self):
        """Check that the API documentation is reachable."""
        # The docs are served under /api/v1/docs.
        doc_paths = (
            "/api/v1/docs",
            "/api/v1/openapi.json",
        )

        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            for path in doc_paths:
                response = await client.get(path)
                assert response.status_code == 200
|
||
|
||
|
||
if __name__ == "__main__":
    # pytest.main() returns the run's exit code; propagate it so that running
    # this file directly exits non-zero when any test fails.
    raise SystemExit(pytest.main([__file__, "-v", "--tb=short"]))
|