All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 10m45s
MASTER plan_complete_v3.md Wave 5 P2.2 + P2.3 完成(multiple engineers 在限額前完成代碼,補 commit): P2.2 — GovernanceAgent 4 項自檢: - governance_agent.py (342 行) — 每 1 小時自檢循環: · trust_drift(信任度漂移檢測) · knowledge_degradation(知識退化檢測) · llm_hallucination(LLM 幻覺檢測) · execution_blast_radius(執行爆炸半徑檢測) - main.py lifespan: asyncio.create_task(run_governance_loop()) 啟動 try/except 包裹,schedule 失敗不阻斷主流程 - failover_alerter.py: alert_governance(event_type, payload) 1h dedup 四類事件 → Telegram MarkdownV2 告警 P2.3 — Ollama 健康規則 + Prometheus Metrics: - ops/monitoring/ollama_health_rules.yaml (148 行): · OllamaHealthDegraded / OllamaPrimaryDown · OllamaFailoverTriggered / GeminiQuotaExceeded · 補 Prometheus 取資料的 alert rules - core/metrics.py (57 行): · GEMINI_DAILY_CALL_COUNT / GEMINI_DAILY_QUOTA Gauge · OLLAMA_FAILOVER_TRIGGERED_TOTAL Counter · OLLAMA_CURRENT_PRIMARY_IS_OLLAMA Gauge - ollama_failover_manager.py: · _check_gemini_quota: 每次 check 同步更新 Gauge(讓 Prometheus 取最新值) · select_provider: failover 時 inc Counter + 切 Primary Gauge · try/except 包裹,metric 失敗不阻斷主路由 E2E 測試: - test_failover_e2e_dispatch.py (365 行) 完整 dispatch 路徑:health check → failover decide → alerter → metrics Tests: 54 passed (e2e_dispatch + failover_manager + failover_alerter) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-Authored-By: Multiple Engineers (上 session Wave 5) <noreply@anthropic.com>
366 lines
13 KiB
Python
366 lines
13 KiB
Python
# apps/api/tests/test_failover_e2e_dispatch.py | 2026-04-26 @ Asia/Taipei
|
||
# 2026-04-26 Wave5 B4 by Claude Engineer-A4 — E2E executor dispatch 測試
|
||
# 驗證 failover 切到 OLLAMA_188 後,HTTP 請求真的打到 OLLAMA_FALLBACK_URL
|
||
"""
|
||
E2E: verification of the executor dispatch layer
================================================
Test coverage (completes B4 — the integration tests only verified the decision layer, not the execution layer):

1. The registry really contains the ollama_188 provider (basic sanity check after the B1 fix)
2. Ollama188Provider.is_enabled is True when OLLAMA_FALLBACK_URL is set
3. Ollama188Provider.is_enabled is False when OLLAMA_FALLBACK_URL is an empty string
4. Ollama188Provider.analyze() really sends its HTTP request to OLLAMA_FALLBACK_URL (httpx intercepted)
5. executor.execute(provider_order=["ollama_188"]) really routes to the 188 URL
6. Five concurrent Gemini quota pipeline calls do not over-issue (B3 atomicity check)
7. The Gemini quota TTL is set on the very first call
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
from unittest.mock import AsyncMock, MagicMock, patch
|
||
|
||
import pytest
|
||
|
||
|
||
# =============================================================================
|
||
# B1:registry 健全性
|
||
# =============================================================================
|
||
|
||
|
||
def test_registry_has_ollama_188_provider():
    """B1 sanity check: the registry built by _init_registry() holds "ollama_188"."""
    from src.services.ai_router import _init_registry

    # Inspect the private _providers mapping directly so the check does not
    # depend on is_enabled (per the original author's note, registry.get()
    # only hands back providers whose is_enabled is True).
    registered = _init_registry()._providers
    assert "ollama_188" in registered, (
        "ollama_188 not found in registry._providers — B1 fix 未生效"
    )
|
||
|
||
|
||
def test_ollama_188_provider_name():
    """A freshly constructed Ollama188Provider reports the name "ollama_188"."""
    from src.services.ai_providers.ollama import Ollama188Provider

    assert Ollama188Provider().name == "ollama_188"
|
||
|
||
|
||
def test_ollama_188_provider_privacy_level():
    """Ollama188Provider.privacy_level is "local".

    The original author's note: local inference, so it may receive
    confidential data.
    """
    from src.services.ai_providers.ollama import Ollama188Provider

    assert Ollama188Provider().privacy_level == "local"
|
||
|
||
|
||
# =============================================================================
|
||
# B1:is_enabled 邏輯
|
||
# =============================================================================
|
||
|
||
|
||
def test_ollama_188_is_enabled_with_fallback_url(monkeypatch):
    """OLLAMA_FALLBACK_URL set + ENABLE_OLLAMA_188=true → is_enabled is True.

    Args:
        monkeypatch: pytest fixture used to set ENABLE_OLLAMA_188 for the
            duration of this test only.
    """
    from src.services.ai_providers.ollama import Ollama188Provider

    monkeypatch.setenv("ENABLE_OLLAMA_188", "true")

    # Stand-in for the module-level settings object read by the provider.
    mock_settings = MagicMock()
    mock_settings.OLLAMA_FALLBACK_URL = "http://192.168.0.188:11434"
    mock_settings.OPENCLAW_TIMEOUT = "60"

    p = Ollama188Provider()

    # Patch the settings object inside the provider module itself, so the
    # property is evaluated against the fake configuration.
    with patch("src.services.ai_providers.ollama.settings", mock_settings):
        assert p.is_enabled is True
|
||
|
||
|
||
def test_ollama_188_is_disabled_without_fallback_url(monkeypatch):
    """Empty OLLAMA_FALLBACK_URL → is_enabled is False (188 node not configured)."""
    from src.services.ai_providers.ollama import Ollama188Provider

    # The env flag is switched on, so only the empty URL can disable the provider.
    monkeypatch.setenv("ENABLE_OLLAMA_188", "true")
    fake_settings = MagicMock(OLLAMA_FALLBACK_URL="")

    provider = Ollama188Provider()
    settings_patch = patch("src.services.ai_providers.ollama.settings", fake_settings)
    with settings_patch:
        assert provider.is_enabled is False
|
||
|
||
|
||
def test_ollama_188_is_disabled_by_env_flag(monkeypatch):
    """ENABLE_OLLAMA_188=false → is_enabled is False even though a URL is configured."""
    from src.services.ai_providers.ollama import Ollama188Provider

    monkeypatch.setenv("ENABLE_OLLAMA_188", "false")
    # A valid fallback URL is supplied, so the env flag is the only off switch.
    fake_settings = MagicMock(OLLAMA_FALLBACK_URL="http://192.168.0.188:11434")

    provider = Ollama188Provider()
    settings_patch = patch("src.services.ai_providers.ollama.settings", fake_settings)
    with settings_patch:
        assert provider.is_enabled is False
|
||
|
||
|
||
# =============================================================================
|
||
# B4 核心:HTTP dispatch 驗證
|
||
# =============================================================================
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_ollama_188_analyze_dispatches_to_fallback_url():
    """
    B4 core: Ollama188Provider.analyze() must send its HTTP request to
    OLLAMA_FALLBACK_URL.

    httpx.AsyncClient.post is replaced by a recorder; the test then asserts
    that at least one recorded URL contains the 188 node's IP address.
    """
    import httpx

    from src.services.ai_providers.ollama import Ollama188Provider

    fallback_url = "http://192.168.0.188:11434"
    seen_urls: list[str] = []

    # Canned successful reply returned for every intercepted POST.
    reply = MagicMock()
    reply.status_code = 200
    reply.raise_for_status = MagicMock()
    reply.json = MagicMock(
        return_value={
            "response": '{"action_title": "test", "confidence": 0.9}',
            "eval_count": 10,
            "prompt_eval_count": 5,
        }
    )

    # Installed on the class, so it is called as a method: the first
    # positional argument is the client instance.
    async def record_post(client, url, **kwargs):
        seen_urls.append(url)
        return reply

    fake_settings = MagicMock()
    fake_settings.OLLAMA_FALLBACK_URL = fallback_url
    fake_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct"
    fake_settings.OPENCLAW_TIMEOUT = "60"
    fake_settings.OLLAMA_DIAGNOSE_TIMEOUT_SECONDS = 200

    # Stand-in for the model registry consulted by the provider module.
    fake_model_registry = MagicMock()
    fake_model_registry.get_model = MagicMock(return_value="qwen2.5:7b-instruct")
    fake_model_registry.get_provider_options = MagicMock(
        return_value={
            "num_predict": 1024,
            "temperature": 0.1,
            "top_p": 0.9,
        }
    )

    provider = Ollama188Provider()

    settings_patch = patch("src.services.ai_providers.ollama.settings", fake_settings)
    registry_patch = patch(
        "src.services.ai_providers.ollama.get_model_registry",
        return_value=fake_model_registry,
    )
    # Class-level patch: applies to every AsyncClient instance.
    post_patch = patch.object(httpx.AsyncClient, "post", new=record_post)

    with settings_patch, registry_patch, post_patch:
        result = await provider.analyze("test prompt", context={})

    assert seen_urls, "analyze() 未發出任何 HTTP 請求"
    assert any("192.168.0.188" in url for url in seen_urls), (
        f"HTTP 請求未打到 188,實際 URL: {seen_urls}"
    )
    assert result.provider == "ollama_188"
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_ollama_188_analyze_returns_error_when_no_fallback_url():
    """With an empty OLLAMA_FALLBACK_URL, analyze() reports a failure.

    Asserts success is False, the provider is "ollama_188", and the error
    text mentions OLLAMA_FALLBACK_URL.
    """
    from src.services.ai_providers.ollama import Ollama188Provider

    fake_settings = MagicMock(OLLAMA_FALLBACK_URL="")
    provider = Ollama188Provider()

    settings_patch = patch("src.services.ai_providers.ollama.settings", fake_settings)
    with settings_patch:
        outcome = await provider.analyze("test prompt")

    assert outcome.success is False
    assert outcome.provider == "ollama_188"
    # error may be None, so fall back to an empty string before searching it.
    error_text = outcome.error or ""
    assert "OLLAMA_FALLBACK_URL" in error_text
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_executor_dispatches_ollama_188_to_fallback_url():
    """
    B4 execution layer: AIRouterExecutor.execute(provider_order=["ollama_188"])
    must hand the request to the registered Ollama188Provider.

    NOTE: analyze() is replaced by a stub that records a hard-coded
    FALLBACK_URL, so the URL assertion only proves that the executor invoked
    this provider's analyze(); the real HTTP dispatch is exercised by
    test_ollama_188_analyze_dispatches_to_fallback_url.
    """
    from src.services.ai_providers.interfaces import AIResult
    from src.services.ai_providers.ollama import Ollama188Provider
    from src.services.ai_router import AIProviderRegistry, AIRouterExecutor, reset_ai_router

    reset_ai_router()

    FALLBACK_URL = "http://192.168.0.188:11434"
    captured_urls: list[str] = []

    # Stub analyze: records the URL it would have hit and returns success.
    async def fake_analyze(prompt, context=None):
        captured_urls.append(f"{FALLBACK_URL}/api/generate")
        return AIResult(
            raw_response='{"action_title":"ok","confidence":0.9}',
            success=True,
            provider="ollama_188",
            tokens=10,
        )

    provider = Ollama188Provider()
    # Instance-level override; the class itself keeps its real analyze().
    provider.analyze = fake_analyze  # type: ignore[method-assign]

    # Redis stand-in so the test does not depend on a real Redis.
    mock_redis = AsyncMock()
    mock_redis.get = AsyncMock(return_value=None)
    mock_redis.set = AsyncMock(return_value=True)

    # Force is_enabled to True without going through settings. The override
    # is scoped with patch.object so it is undone when the block exits;
    # assigning to type(provider).is_enabled directly would permanently
    # replace the property on the class and leak into every later test.
    enabled_patch = patch.object(
        Ollama188Provider, "is_enabled", new=property(lambda self: True)
    )
    with enabled_patch:
        # A real registry that only knows about ollama_188.
        registry = AIProviderRegistry()
        registry.register(provider)
        executor = AIRouterExecutor(registry)

        with patch("src.core.redis_client.get_redis", return_value=mock_redis):
            with patch("src.services.ai_router._settings") as mock_settings:
                mock_settings.MOCK_MODE = False
                result = await executor.execute(
                    prompt="test alert",
                    provider_order=["ollama_188"],
                    context={},
                )

    assert result.success is True, f"execute 失敗: {result.error}"
    assert result.provider == "ollama_188", f"provider 不是 ollama_188: {result.provider}"
    assert any("192.168.0.188" in u for u in captured_urls), (
        f"HTTP 未打到 188,captured: {captured_urls}"
    )
|
||
|
||
|
||
# =============================================================================
|
||
# B3:Gemini quota atomic pipeline 驗證
|
||
# =============================================================================
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_gemini_quota_concurrent_no_overshoot():
    """
    B3 atomicity check: with GEMINI_DAILY_QUOTA = 5, five calls to
    _check_gemini_quota() launched through asyncio.gather must all return
    True, and a sixth call after five sequential ones must return False.
    """
    from src.services.ollama_failover_manager import OllamaFailoverManager
    from src.services.ollama_health_monitor import OllamaHealthMonitor

    # In-memory counter standing in for the Redis key space.
    _store: dict[str, int] = {}

    class FakePipeline:
        """Minimal pipeline double: remembers the last key and applies SET NX + INCR on execute()."""

        def __init__(self):
            self._key = None
            self._nx_val = 0
            self._ex = None

        def set(self, key, val, ex=None, nx=False):
            self._key = key
            self._nx_val = val
            self._ex = ex
            return self

        def incr(self, key):
            self._key = key
            return self

        async def execute(self):
            # No await inside, so the SET NX + INCR pair below runs as one
            # uninterrupted step.
            key = self._key
            # SET NX: only initialise the key when it does not exist yet.
            if key not in _store:
                _store[key] = self._nx_val
            # INCR
            _store[key] = _store.get(key, 0) + 1
            return [True, _store[key]]

    # One Redis double for the whole test; every pipeline() call gets a fresh
    # FakePipeline, all of them sharing _store.
    mock_redis = MagicMock()
    mock_redis.pipeline = MagicMock(side_effect=lambda *args, **kwargs: FakePipeline())

    mock_settings = MagicMock()
    mock_settings.GEMINI_DAILY_QUOTA = 5

    mock_monitor = MagicMock(spec=OllamaHealthMonitor)
    manager = OllamaFailoverManager(health_monitor=mock_monitor)
    manager._settings = mock_settings

    # Patch get_redis once around the whole scenario. Entering patch() inside
    # each gathered coroutine would risk non-LIFO unwinding if the coroutines
    # ever interleave, which can leave get_redis patched after the test.
    with patch("src.core.redis_client.get_redis", return_value=mock_redis):
        # Five calls against a quota of 5: every one must be admitted.
        results = await asyncio.gather(
            *(manager._check_gemini_quota() for _ in range(5))
        )
        assert all(results), f"5 個並行呼叫中有失敗: {results}"

        # Reset the counter, fill the quota again sequentially, then make the
        # sixth call that exceeds it.
        _store.clear()
        for _ in range(5):
            await manager._check_gemini_quota()

        result_6 = await manager._check_gemini_quota()

    assert result_6 is False, f"第 6 次超出 quota 應返回 False,實際: {result_6}"
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_gemini_quota_ttl_set_atomically():
    """
    B3 TTL check: a single call to _check_gemini_quota() must issue exactly
    one pipeline SET carrying nx=True and ex=86400, i.e. the TTL is attached
    to the SET itself rather than applied by a separate EXPIRE.
    """
    from src.services.ollama_failover_manager import OllamaFailoverManager
    from src.services.ollama_health_monitor import OllamaHealthMonitor

    set_calls: list[dict] = []

    class CapturingPipeline:
        """Pipeline double that records the arguments of every set() call."""

        def set(self, key, val, ex=None, nx=False):
            set_calls.append({"key": key, "val": val, "ex": ex, "nx": nx})
            return self

        def incr(self, key):
            return self

        async def execute(self):
            # Fixed reply: SET result True, counter value 1.
            return [True, 1]

    mock_redis = MagicMock()
    mock_redis.pipeline = MagicMock(return_value=CapturingPipeline())

    # Quota set far above the canned counter value of 1.
    mock_settings = MagicMock()
    mock_settings.GEMINI_DAILY_QUOTA = 1000

    mock_monitor = MagicMock(spec=OllamaHealthMonitor)
    manager = OllamaFailoverManager(health_monitor=mock_monitor)
    manager._settings = mock_settings

    with patch("src.core.redis_client.get_redis", return_value=mock_redis):
        await manager._check_gemini_quota()

    assert len(set_calls) == 1, f"pipeline.set() 應被呼叫一次,實際: {len(set_calls)}"
    call = set_calls[0]
    assert call["nx"] is True, "SET 必須帶 NX=True(只首次設定)"
    # Check for a missing TTL first: placed after the equality check this
    # assertion could never fire, hiding the more specific diagnostic.
    assert call["ex"] is not None, "TTL 必須在 SET 時設定,不能分開 EXPIRE(B3 修復驗證)"
    assert call["ex"] == 86400, f"TTL 必須 86400s,實際: {call['ex']}"
|