Files
awoooi/apps/api/tests/test_failover_e2e_dispatch.py
Your Name 2c57b71db9
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 10m45s
feat(wave5-p2): GovernanceAgent 4 項自檢 + Ollama 健康告警規則 + Prometheus metrics 整合
MASTER plan_complete_v3.md Wave 5 P2.2 + P2.3 完成(multiple engineers 在限額前完成代碼,補 commit):

P2.2 — GovernanceAgent 4 項自檢:
- governance_agent.py (342 行) — 每 1 小時自檢循環:
  · trust_drift(信任度漂移檢測)
  · knowledge_degradation(知識退化檢測)
  · llm_hallucination(LLM 幻覺檢測)
  · execution_blast_radius(執行爆炸半徑檢測)
- main.py lifespan: asyncio.create_task(run_governance_loop()) 啟動
  try/except 包裹,schedule 失敗不阻斷主流程
- failover_alerter.py: alert_governance(event_type, payload) 1h dedup
  四類事件 → Telegram MarkdownV2 告警

P2.3 — Ollama 健康規則 + Prometheus Metrics:
- ops/monitoring/ollama_health_rules.yaml (148 行):
  · OllamaHealthDegraded / OllamaPrimaryDown
  · OllamaFailoverTriggered / GeminiQuotaExceeded
  · 補 Prometheus 取資料的 alert rules
- core/metrics.py (57 行):
  · GEMINI_DAILY_CALL_COUNT / GEMINI_DAILY_QUOTA Gauge
  · OLLAMA_FAILOVER_TRIGGERED_TOTAL Counter
  · OLLAMA_CURRENT_PRIMARY_IS_OLLAMA Gauge
- ollama_failover_manager.py:
  · _check_gemini_quota: 每次 check 同步更新 Gauge(讓 Prometheus 取最新值)
  · select_provider: failover 時 inc Counter + 切 Primary Gauge
  · try/except 包裹,metric 失敗不阻斷主路由

E2E 測試:
- test_failover_e2e_dispatch.py (365 行)
  完整 dispatch 路徑:health check → failover decide → alerter → metrics

Tests: 54 passed (e2e_dispatch + failover_manager + failover_alerter)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Multiple Engineers (上 session Wave 5) <noreply@anthropic.com>
2026-04-26 20:56:19 +08:00

366 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# apps/api/tests/test_failover_e2e_dispatch.py | 2026-04-26 @ Asia/Taipei
# 2026-04-26 Wave5 B4 by Claude Engineer-A4 — E2E executor dispatch 測試
# 驗證 failover 切到 OLLAMA_188 後, HTTP 請求真的打到 OLLAMA_FALLBACK_URL
"""
E2E: executor dispatch 層驗證
===============================
測試覆蓋(補全 B4 — 整合測試只驗決策層,未驗執行層):
1. registry 確實有 ollama_188 provider (B1 修復後基本健全性)
2. Ollama188Provider.is_enabled 在有 OLLAMA_FALLBACK_URL 時為 True
3. Ollama188Provider.is_enabled 在 OLLAMA_FALLBACK_URL 空字串時為 False
4. Ollama188Provider.analyze() 真的把 HTTP 打到 OLLAMA_FALLBACK_URL (攔截 httpx)
5. executor.execute(provider_order=["ollama_188"]) 真的路由到 188 URL
6. Gemini quota pipeline 並行 5 次不超發 (B3 atomic 驗證)
7. Gemini quota TTL 第一次呼叫即設定
"""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# =============================================================================
# B1: registry 健全性
# =============================================================================
def test_registry_has_ollama_188_provider():
    """B1 sanity check: the registry built by ``_init_registry()`` contains ``ollama_188``."""
    from src.services.ai_router import _init_registry

    # Look at the raw ``_providers`` mapping rather than ``registry.get()``:
    # per the original author's note, ``get()`` only returns providers whose
    # ``is_enabled`` is True, and this check must not depend on that flag.
    known_providers = _init_registry()._providers
    assert "ollama_188" in known_providers, (
        "ollama_188 not found in registry._providers — B1 fix 未生效"
    )
def test_ollama_188_provider_name():
    """A default-constructed ``Ollama188Provider`` reports the name ``ollama_188``."""
    from src.services.ai_providers.ollama import Ollama188Provider

    assert Ollama188Provider().name == "ollama_188"
def test_ollama_188_provider_privacy_level():
    """``Ollama188Provider.privacy_level`` is ``'local'``.

    Per the original note, this marks local inference that may receive
    confidential data.
    """
    from src.services.ai_providers.ollama import Ollama188Provider

    assert Ollama188Provider().privacy_level == "local"
# =============================================================================
# B1: is_enabled 邏輯
# =============================================================================
def test_ollama_188_is_enabled_with_fallback_url(monkeypatch):
    """``is_enabled`` is True when the flag is on and a fallback URL is configured.

    Setup: the ``ENABLE_OLLAMA_188`` environment variable is set to ``"true"``
    and the provider module's ``settings`` object is replaced with a mock whose
    ``OLLAMA_FALLBACK_URL`` is non-empty.

    Args:
        monkeypatch: pytest fixture used to set the environment variable for
            the duration of this test only.
    """
    from src.services.ai_providers.ollama import Ollama188Provider

    monkeypatch.setenv("ENABLE_OLLAMA_188", "true")

    # Stand-in for the module-level settings object read by the provider.
    mock_settings = MagicMock()
    mock_settings.OLLAMA_FALLBACK_URL = "http://192.168.0.188:11434"
    mock_settings.OPENCLAW_TIMEOUT = "60"

    p = Ollama188Provider()
    # Patch the ``settings`` name inside the provider module itself so the
    # property reads the mock when it is evaluated.
    with patch("src.services.ai_providers.ollama.settings", mock_settings):
        assert p.is_enabled is True
def test_ollama_188_is_disabled_without_fallback_url(monkeypatch):
    """An empty ``OLLAMA_FALLBACK_URL`` disables the provider even with the flag on.

    Args:
        monkeypatch: pytest fixture used to set ``ENABLE_OLLAMA_188`` to "true".
    """
    from src.services.ai_providers.ollama import Ollama188Provider

    monkeypatch.setenv("ENABLE_OLLAMA_188", "true")

    # Settings stand-in representing "188 node not configured".
    blank_url_settings = MagicMock(OLLAMA_FALLBACK_URL="")
    provider_under_test = Ollama188Provider()

    with patch("src.services.ai_providers.ollama.settings", new=blank_url_settings):
        enabled = provider_under_test.is_enabled
        assert enabled is False
def test_ollama_188_is_disabled_by_env_flag(monkeypatch):
    """``ENABLE_OLLAMA_188=false`` disables the provider even when a URL is set.

    Args:
        monkeypatch: pytest fixture used to set ``ENABLE_OLLAMA_188`` to "false".
    """
    from src.services.ai_providers.ollama import Ollama188Provider

    monkeypatch.setenv("ENABLE_OLLAMA_188", "false")

    # A valid fallback URL is present; only the env flag should veto enabling.
    url_present_settings = MagicMock(OLLAMA_FALLBACK_URL="http://192.168.0.188:11434")
    provider_under_test = Ollama188Provider()

    with patch("src.services.ai_providers.ollama.settings", new=url_present_settings):
        enabled = provider_under_test.is_enabled
        assert enabled is False
# =============================================================================
# B4 核心: HTTP dispatch 驗證
# =============================================================================
@pytest.mark.asyncio
async def test_ollama_188_analyze_dispatches_to_fallback_url():
    """B4 core check: ``Ollama188Provider.analyze()`` sends HTTP to ``OLLAMA_FALLBACK_URL``.

    ``httpx.AsyncClient.post`` is replaced at class level with a recorder, and
    the test asserts that at least one recorded URL contains the 188 host IP.
    """
    import httpx

    from src.services.ai_providers.ollama import Ollama188Provider

    fallback_url = "http://192.168.0.188:11434"
    seen_urls: list[str] = []

    # Canned successful HTTP response handed back by the recorder.
    fake_response = MagicMock()
    fake_response.status_code = 200
    fake_response.raise_for_status = MagicMock()
    fake_response.json = MagicMock(
        return_value={
            "response": '{"action_title": "test", "confidence": 0.9}',
            "eval_count": 10,
            "prompt_eval_count": 5,
        }
    )

    # ``post`` is an instance method, so the replacement receives the client
    # instance as its first positional argument.
    async def record_post(client, url, **kwargs):
        seen_urls.append(url)
        return fake_response

    fake_settings = MagicMock()
    fake_settings.OLLAMA_FALLBACK_URL = fallback_url
    fake_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct"
    fake_settings.OPENCLAW_TIMEOUT = "60"
    fake_settings.OLLAMA_DIAGNOSE_TIMEOUT_SECONDS = 200

    # Stand-in for the model registry consulted by the provider.
    fake_model_registry = MagicMock()
    fake_model_registry.get_model = MagicMock(return_value="qwen2.5:7b-instruct")
    fake_model_registry.get_provider_options = MagicMock(
        return_value={
            "num_predict": 1024,
            "temperature": 0.1,
            "top_p": 0.9,
        }
    )

    provider = Ollama188Provider()

    settings_patch = patch("src.services.ai_providers.ollama.settings", fake_settings)
    registry_patch = patch(
        "src.services.ai_providers.ollama.get_model_registry",
        return_value=fake_model_registry,
    )
    # Class-level patch so it applies to every AsyncClient instance.
    post_patch = patch.object(httpx.AsyncClient, "post", new=record_post)

    with settings_patch:
        with registry_patch:
            with post_patch:
                result = await provider.analyze("test prompt", context={})

    assert len(seen_urls) > 0, "analyze() 未發出任何 HTTP 請求"
    hit_188 = any("192.168.0.188" in url for url in seen_urls)
    assert hit_188, (
        f"HTTP 請求未打到 188實際 URL: {seen_urls}"
    )
    assert result.provider == "ollama_188"
@pytest.mark.asyncio
async def test_ollama_188_analyze_returns_error_when_no_fallback_url():
    """With an empty ``OLLAMA_FALLBACK_URL``, ``analyze()`` returns a failed result.

    The result must have ``success=False``, be attributed to ``ollama_188``,
    and mention ``OLLAMA_FALLBACK_URL`` in its error text.
    """
    from src.services.ai_providers.ollama import Ollama188Provider

    unconfigured_settings = MagicMock(OLLAMA_FALLBACK_URL="")
    provider = Ollama188Provider()

    with patch("src.services.ai_providers.ollama.settings", new=unconfigured_settings):
        outcome = await provider.analyze("test prompt")

    assert outcome.success is False
    assert outcome.provider == "ollama_188"
    # ``error`` may be None; fall back to an empty string for the substring check.
    error_text = outcome.error or ""
    assert "OLLAMA_FALLBACK_URL" in error_text
@pytest.mark.asyncio
async def test_executor_dispatches_ollama_188_to_fallback_url():
    """B4 execution layer: ``execute(provider_order=["ollama_188"])`` routes to the 188 provider.

    A real ``AIProviderRegistry`` holding only an ``Ollama188Provider`` is
    wrapped in an ``AIRouterExecutor``. The provider's ``analyze`` is replaced
    with a stub, so this test verifies executor routing only.

    NOTE(review): the URL assertion at the end is satisfied by the stub itself
    (it records ``FALLBACK_URL`` unconditionally). The real HTTP dispatch is
    covered by ``test_ollama_188_analyze_dispatches_to_fallback_url``.
    """
    from src.services.ai_router import AIProviderRegistry, AIRouterExecutor, reset_ai_router
    from src.services.ai_providers.ollama import Ollama188Provider
    from src.services.ai_providers.interfaces import AIResult

    reset_ai_router()

    FALLBACK_URL = "http://192.168.0.188:11434"
    captured_urls: list[str] = []

    # Stub for ``analyze``: records the URL the provider would target and
    # returns a successful result attributed to ollama_188.
    async def fake_analyze(prompt, context=None):
        captured_urls.append(f"{FALLBACK_URL}/api/generate")
        return AIResult(
            raw_response='{"action_title":"ok","confidence":0.9}',
            success=True,
            provider="ollama_188",
            tokens=10,
        )

    provider = Ollama188Provider()
    provider.analyze = fake_analyze  # type: ignore[method-assign]

    # Mocked Redis so the test does not depend on a live server.
    mock_redis = AsyncMock()
    mock_redis.get = AsyncMock(return_value=None)
    mock_redis.set = AsyncMock(return_value=True)

    # Force ``is_enabled`` to True for the duration of this test only.
    # ``patch.object`` restores the class attribute on exit; a bare assignment
    # on ``type(provider)`` would leak into every later test in the process.
    with patch.object(type(provider), "is_enabled", new=property(lambda self: True)):
        registry = AIProviderRegistry()
        registry.register(provider)
        executor = AIRouterExecutor(registry)

        with patch("src.core.redis_client.get_redis", return_value=mock_redis):
            with patch("src.services.ai_router._settings") as mock_settings:
                mock_settings.MOCK_MODE = False
                result = await executor.execute(
                    prompt="test alert",
                    provider_order=["ollama_188"],
                    context={},
                )

    assert result.success is True, f"execute 失敗: {result.error}"
    assert result.provider == "ollama_188", f"provider 不是 ollama_188: {result.provider}"
    assert any("192.168.0.188" in u for u in captured_urls), (
        f"HTTP 未打到 188captured: {captured_urls}"
    )
# =============================================================================
# B3: Gemini quota atomic pipeline 驗證
# =============================================================================
@pytest.mark.asyncio
async def test_gemini_quota_concurrent_no_overshoot():
    """B3 atomicity check: five gathered quota checks pass at quota=5, the sixth fails.

    Redis is replaced by an in-memory store and a fake pipeline that applies
    "SET NX then INCR" inside a single ``execute()`` call. Phase one runs five
    ``_check_gemini_quota()`` calls through ``asyncio.gather`` and expects every
    one to return a truthy value. Phase two clears the store, makes five
    sequential calls to fill the quota, and expects the sixth to return False.

    NOTE(review): ``FakePipeline.execute`` contains no suspension point, so the
    gathered calls may effectively run one after another — confirm against
    ``_check_gemini_quota`` if real interleaving matters.
    """
    from src.services.ollama_failover_manager import OllamaFailoverManager
    from src.services.ollama_health_monitor import OllamaHealthMonitor

    # In-memory counter standing in for Redis keys.
    _store: dict[str, int] = {}

    class FakePipeline:
        """Minimal pipeline double: records the key, applies NX-set + INCR on execute."""

        def __init__(self):
            self._key = None
            self._nx_val = 0
            self._ex = None

        def set(self, key, val, ex=None, nx=False):
            self._key = key
            self._nx_val = val
            self._ex = ex
            return self

        def incr(self, key):
            self._key = key
            return self

        async def execute(self):
            key = self._key
            # NX set: only initialise the key when it does not exist yet.
            if key not in _store:
                _store[key] = self._nx_val
            # INCR
            _store[key] = _store.get(key, 0) + 1
            new_val = _store[key]
            return [True, new_val]

    # One shared Redis double; every ``pipeline()`` call gets a fresh pipeline
    # so per-call state is never shared between quota checks.
    mock_redis = MagicMock()
    mock_redis.pipeline = MagicMock(side_effect=lambda *args, **kwargs: FakePipeline())

    mock_settings = MagicMock()
    mock_settings.GEMINI_DAILY_QUOTA = 5

    mock_monitor = MagicMock(spec=OllamaHealthMonitor)
    manager = OllamaFailoverManager(health_monitor=mock_monitor)
    manager._settings = mock_settings

    # Patch ``get_redis`` once for the whole test. Opening a separate patch
    # inside each gathered coroutine could restore the target out of order if
    # the coroutines ever interleave.
    with patch("src.core.redis_client.get_redis", return_value=mock_redis):
        # Five gathered calls with quota=5: each one should be allowed.
        results = await asyncio.gather(
            *[manager._check_gemini_quota() for _ in range(5)]
        )
        assert all(results), f"5 個並行呼叫中有失敗: {results}"

        # Reset, fill the quota with five calls, then make the sixth call.
        _store.clear()
        for _ in range(5):
            await manager._check_gemini_quota()
        result_6 = await manager._check_gemini_quota()

    assert result_6 is False, f"第 6 次超出 quota 應返回 False實際: {result_6}"
@pytest.mark.asyncio
async def test_gemini_quota_ttl_set_atomically():
    """B3 TTL check: the first ``_check_gemini_quota()`` sets the TTL inside ``SET NX``.

    A capturing pipeline records every ``set()`` call. After one quota check
    the test expects exactly one ``set()`` with ``nx=True`` and ``ex=86400``,
    i.e. the TTL is supplied with the SET rather than by a separate EXPIRE.
    """
    from src.services.ollama_failover_manager import OllamaFailoverManager
    from src.services.ollama_health_monitor import OllamaHealthMonitor

    set_calls: list[dict] = []

    class CapturingPipeline:
        """Pipeline double that records ``set()`` arguments and returns a fixed result."""

        def set(self, key, val, ex=None, nx=False):
            set_calls.append({"key": key, "val": val, "ex": ex, "nx": nx})
            return self

        def incr(self, key):
            return self

        async def execute(self):
            # [SET result, INCR result] — counter value 1 on the first call.
            return [True, 1]

    mock_redis = MagicMock()
    mock_redis.pipeline = MagicMock(return_value=CapturingPipeline())

    mock_settings = MagicMock()
    mock_settings.GEMINI_DAILY_QUOTA = 1000

    mock_monitor = MagicMock(spec=OllamaHealthMonitor)
    manager = OllamaFailoverManager(health_monitor=mock_monitor)
    manager._settings = mock_settings

    with patch("src.core.redis_client.get_redis", return_value=mock_redis):
        await manager._check_gemini_quota()

    assert len(set_calls) == 1, f"pipeline.set() 應被呼叫一次,實際: {len(set_calls)}"
    call = set_calls[0]
    assert call["nx"] is True, "SET 必須帶 NX=True只首次設定"
    # Check for a missing TTL first so its dedicated message can fire; after
    # the equality check it would be unreachable.
    assert call["ex"] is not None, "TTL 必須在 SET 時設定,不能分開 EXPIREB3 修復驗證)"
    assert call["ex"] == 86400, f"TTL 必須 86400s實際: {call['ex']}"