diff --git a/apps/api/src/core/config.py b/apps/api/src/core/config.py index d4f229a7..5981c7d9 100644 --- a/apps/api/src/core/config.py +++ b/apps/api/src/core/config.py @@ -189,11 +189,29 @@ class Settings(BaseSettings): default="http://192.168.0.111:11434", # 2026-04-08 ogt: 切換至 M1 Pro (40+ tok/s vs 0.45 tok/s) description="Ollama LLM service URL", ) + # 2026-04-25 Claude Engineer-C (P1.1): Ollama 188 CPU-only 備援 (方案 C) + # 若空字串則 OllamaFailoverManager 僅使用 OLLAMA_URL(單節點模式) + OLLAMA_FALLBACK_URL: str = Field( + default="", + description="Ollama CPU-only fallback URL (188 備援,P1.1),空字串=停用", + ) + # 2026-04-25 Claude Engineer-C (P1.1): Ollama 健康檢測推理測試模型 + OLLAMA_HEALTH_CHECK_MODEL: str = Field( + default="qwen2.5:7b-instruct", + description="OllamaHealthMonitor 推理測試使用模型(P1.1)", + ) # 2026-04-12 ogt: 心跳必須確認載入的 Ollama 模型清單 OLLAMA_REQUIRED_MODELS: list[str] = Field( default=["nomic-embed-text", "qwen2.5:7b-instruct", "deepseek-r1:14b"], description="HeartbeatReportService 探測必要模型是否載入", ) + # 2026-04-25 critic-fix Part2 H7 by Claude Engineer-C2 + # Gemini 帳單熔斷:每日呼叫上限,超過改走 188+Nemotron + # 超過上限後寫 Redis key ollama:gemini_daily_count:{date},TTL 86400s + GEMINI_DAILY_QUOTA: int = Field( + default=1000, + description="每日 Gemini 呼叫上限,超過切到 188+Nemotron(P1.1 帳單熔斷)", + ) # Deprecated: use OPENCLAW_URL instead CLAWBOT_URL: str = Field( default="http://192.168.0.188:8088", # 🔧 修正: OpenClaw 實際 port 是 8088 diff --git a/apps/api/src/main.py b/apps/api/src/main.py index a5d1555d..c228a403 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -546,9 +546,39 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: except Exception as e: logger.warning("ai_slo_watchdog_schedule_failed", error=str(e)) + # 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan + # OllamaFailoverManager + OllamaAutoRecoveryService 飛輪接線: + # failover 切換時 → recovery_callback → set_current_primary → Redis 持久化 + # recovery service 每 30s 檢查 → 111 連續 3 次 HEALTHY → 自動切回 → 
clear_cache + # 順序:先取 singleton → wire callback → 啟動 recovery service(才能接收 callback) + try: + from src.services.ollama_failover_manager import get_ollama_failover_manager + from src.services.ollama_auto_recovery import get_ollama_auto_recovery_service + + _failover_mgr = get_ollama_failover_manager() + _recovery_svc = get_ollama_auto_recovery_service() + + # wire callback:failover 切換時通知 recovery service 更新 current_primary + _failover_mgr.set_recovery_callback(_recovery_svc.set_current_primary) + + # 啟動 recovery service(從 Redis bootstrap current_primary,並啟動背景監控) + await _recovery_svc.start() + logger.info("ollama_failover_system_started") + except Exception as e: + logger.warning("ollama_failover_system_start_failed", error=str(e)) + yield # Shutdown + # 2026-04-25 P1.2 by Claude Engineer-A2 — 優雅關閉 Ollama failover 背景監控 + # 必須在 Redis pool 關閉之前停止(recovery service 可能仍在寫 Redis) + try: + from src.services.ollama_auto_recovery import get_ollama_auto_recovery_service + await get_ollama_auto_recovery_service().stop() + logger.info("ollama_failover_system_stopped") + except Exception as e: + logger.warning("ollama_failover_system_stop_failed", error=str(e)) + # Phase 6.1: 關閉 Signal Worker (先關閉 Consumer) await close_signal_worker() await publisher.stop() diff --git a/apps/api/src/services/ai_router.py b/apps/api/src/services/ai_router.py index cce0718a..634ae7dc 100644 --- a/apps/api/src/services/ai_router.py +++ b/apps/api/src/services/ai_router.py @@ -68,10 +68,14 @@ logger = structlog.get_logger(__name__) # ============================================================================= -class AIProviderEnum(Enum): +class AIProviderEnum(str, Enum): """AI 提供者""" OLLAMA = "ollama" + # 2026-04-25 critic-fix Part2 B2 by Claude Engineer-C2 + # P1.1b OllamaFailoverManager 使用 provider_name="ollama_188", + # 但 AIProviderEnum 沒有此值 → P1.2 整合時 lookup 失敗 + OLLAMA_188 = "ollama_188" # 188 CPU-only 備援節點(P1.1b) GEMINI = "gemini" CLAUDE = "claude" # 2026-04-02 ogt: C1 修復 — 對齊 Registry 實際名稱 
@@ -85,6 +89,8 @@ class AIProviderEnum(Enum): # Provider 對應延遲預算 (ms) PROVIDER_LATENCY_BUDGET: dict[AIProviderEnum, int] = { AIProviderEnum.OLLAMA: 60000, # 本地,允許較長處理時間 + # 2026-04-25 critic-fix Part2 B2 by Claude Engineer-C2 — 188 CPU-only 推理較慢 + AIProviderEnum.OLLAMA_188: 120000, # 120s budget for CPU inference AIProviderEnum.GEMINI: 30000, # 雲端,較低延遲 AIProviderEnum.CLAUDE: 30000, # 雲端,較低延遲 # 2026-04-02 ogt: C1 修復 — 對齊 Registry 名稱 @@ -212,6 +218,10 @@ class AIRouter: self._intent_classifier = get_intent_classifier() self._complexity_scorer = get_complexity_scorer() self._model_registry = get_model_registry() + # 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan + # 延遲 import 避免循環依賴(ollama_failover_manager 不 import ai_router) + from src.services.ollama_failover_manager import get_ollama_failover_manager + self._failover_manager = get_ollama_failover_manager() # 從 ModelRegistry 取得模型配置 self._ollama_default = self._model_registry.get_model("ollama", "default") @@ -372,10 +382,51 @@ class AIRouter: intent, intent_result, complexity ) + # Step 3b: 若 initial decision 選到 OLLAMA,交由 FailoverManager 重評 + # 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan + # 只在 OLLAMA 時觸發,不干擾 NEMOTRON / OPENCLAW_NEMO / CLAUDE / GEMINI 路由 + failover_fallback: list[tuple[AIProviderEnum, str]] | None = None + if provider == AIProviderEnum.OLLAMA: + try: + failover_result = await self._failover_manager.select_provider( + task_type=intent.value if intent else "general" + ) + primary_str = failover_result.primary.provider_name + try: + provider = AIProviderEnum(primary_str) + model = failover_result.primary.model + reason = f"{reason} [failover→{primary_str}]" + except ValueError: + # provider_name 無法對應已知 enum(理論上不應發生,OLLAMA_188 已加) + logger.warning( + "ai_router_unknown_failover_provider", + provider=primary_str, + ) + # 重建 fallback chain:從 failover_result 轉換 + fb_list: list[tuple[AIProviderEnum, str]] = [] + for ep in 
class FailoverAlerter:
    """Telegram alerts for Ollama failover, recovery and Gemini quota events.

    Every alert type is guarded by a Redis ``SET NX EX`` dedup key so the same
    condition is not reported repeatedly (10 minutes for failover/recovery,
    24 hours for the daily quota alert).  When delivery fails, the dedup key
    is released again so that the next occurrence can retry instead of being
    silently suppressed for the whole TTL.

    All dynamic values are escaped for MarkdownV2 before interpolation;
    an unescaped reserved character makes Telegram reject the whole message.
    """

    def __init__(self, redis_client=None) -> None:
        # The Telegram gateway is looked up lazily in ``_send``; only Redis is
        # injected.  ``None`` disables dedup (fail-open: alerts always go out).
        self._redis = redis_client

    async def alert_failover(self, event: dict[str, Any]) -> None:
        """Report that routing moved away from Ollama 111 (10 min dedup).

        Args:
            event: Optional keys ``to_provider``, ``reason``, ``model``,
                ``timestamp`` and ``fallback_chain_str``; missing keys fall
                back to placeholder text.
        """
        to_provider = event.get("to_provider", "unknown")
        dedup_key = f"alert:failover:{to_provider}"
        if not await self._check_dedup(dedup_key, ttl=DEDUP_TTL_SEC):
            logger.debug("failover_alert_dedup_skipped", to_provider=to_provider)
            return

        reason = event.get("reason", "unknown")
        model = event.get("model", "?")
        timestamp = event.get("timestamp", datetime.now(TAIPEI_TZ).isoformat())
        fallback_chain_str = event.get("fallback_chain_str", "?")

        msg = (
            f"*Ollama 容災激活*\n\n"
            f"故障主機:Ollama 111 \\(GPU\\)\n"
            f"故障狀態:{_escape_md(reason)}\n"
            f"切換目標:{_escape_md(to_provider)} \\(model: {_escape_md(model)}\\)\n"
            f"切換時間:{_escape_md(timestamp)}\n\n"
            f"Fallback 鏈:{_escape_md(fallback_chain_str)}\n\n"
            f"自動恢復服務持續監控,3 次 HEALTHY 後自動切回"
        )
        if await self._deliver(dedup_key, msg):
            logger.info("failover_alert_sent", to_provider=to_provider)

    async def alert_recovery(self, event: dict[str, Any]) -> None:
        """Report that routing switched back to Ollama 111 (10 min dedup).

        Args:
            event: Accepts both the ``from_provider``/``to_provider`` keys and
                the short ``from``/``to`` keys, plus ``stable_count`` and
                ``recovery_time``.
        """
        dedup_key = "alert:recovery"
        if not await self._check_dedup(dedup_key, ttl=DEDUP_TTL_SEC):
            logger.debug("recovery_alert_dedup_skipped")
            return

        # A missing count is shown as "?"; it must not fall back to the
        # provider name stored under "to".
        stable_count = event.get("stable_count", "?")
        from_provider = event.get("from_provider", event.get("from", "?"))
        to_provider = event.get("to_provider", event.get("to", "ollama_111"))
        recovery_time = event.get("recovery_time", datetime.now(TAIPEI_TZ).isoformat())

        msg = (
            f"*Ollama 自動恢復*\n\n"
            f"恢復主機:Ollama 111 \\(GPU\\)\n"
            f"穩定計數:連續 {_escape_md(stable_count)} 次 HEALTHY\n"
            f"切回時間:{_escape_md(recovery_time)}\n"
            f"切換路徑:{_escape_md(from_provider)} → {_escape_md(to_provider)}\n\n"
            f"自動化飛輪已恢復至 GPU 推理模式"
        )
        if await self._deliver(dedup_key, msg):
            logger.info("recovery_alert_sent", from_provider=from_provider)

    async def alert_gemini_quota_exceeded(self, event: dict[str, Any]) -> None:
        """Report that the daily Gemini quota was hit (24 h dedup).

        Args:
            event: Optional keys ``quota`` and ``current_count``.
        """
        dedup_key = "alert:gemini_quota_exceeded"
        if not await self._check_dedup(dedup_key, ttl=QUOTA_DEDUP_TTL_SEC):
            logger.debug("quota_alert_dedup_skipped")
            return

        quota = event.get("quota", "?")
        current_count = event.get("current_count", "?")
        date_str = datetime.now(TAIPEI_TZ).date().isoformat()

        # The ISO date contains "-", which is reserved in MarkdownV2, so it
        # (and the caller-supplied numbers) must be escaped like any other value.
        msg = (
            f"*Gemini 每日配額耗盡*\n\n"
            f"日期:{_escape_md(date_str)}\n"
            f"上限:{_escape_md(quota)} calls/day\n"
            f"當前用量:{_escape_md(current_count)}\n"
            f"降級目標:OLLAMA\\_188 \\(CPU,推理較慢\\)\n\n"
            f"進入慢速模式至明日 0:00\n"
            f"建議檢查是否有異常流量,評估是否升級 Gemini 配額"
        )
        if await self._deliver(dedup_key, msg):
            logger.info("quota_alert_sent", quota=quota, current_count=current_count)

    # -------------------------------------------------------------------------
    # Dedup (Redis SET NX EX)
    # -------------------------------------------------------------------------

    async def _check_dedup(self, key: str, ttl: int) -> bool:
        """Claim the dedup slot for *key*.

        Returns:
            True when this is the first alert within *ttl* seconds (send it),
            False when the slot is already taken (skip).  Fail-open: without a
            Redis client, or when Redis raises, the alert is allowed through.
        """
        if self._redis is None:
            return True  # fail-open
        try:
            ok = await self._redis.set(f"{key}:dedup", "1", ex=ttl, nx=True)
            return bool(ok)
        except Exception as e:
            logger.warning("dedup_check_failed", error=str(e))
            return True  # fail-open

    async def _release_dedup(self, key: str) -> None:
        """Drop the dedup slot for *key* so a later alert can retry (best-effort)."""
        if self._redis is None:
            return
        try:
            await self._redis.delete(f"{key}:dedup")
        except Exception as e:
            logger.warning("dedup_release_failed", error=str(e))

    async def _deliver(self, dedup_key: str, message: str) -> bool:
        """Send *message*; on failure release *dedup_key*.  Returns send success."""
        if await self._send(message):
            return True
        await self._release_dedup(dedup_key)
        return False

    # -------------------------------------------------------------------------
    # Delivery (through the TelegramGateway singleton)
    # -------------------------------------------------------------------------

    async def _send(self, message: str) -> bool:
        """Send *message* through the TelegramGateway singleton as MarkdownV2.

        Never raises: alert delivery must not break the routing path, so every
        failure is logged and reported through the return value.

        Returns:
            True when the gateway call completed, False when the chat id is
            not configured or the gateway raised.
        """
        try:
            from src.services.telegram_gateway import get_telegram_gateway
            from src.core.config import get_settings

            settings = get_settings()
            chat_id = getattr(settings, "OPENCLAW_TG_CHAT_ID", None)
            if not chat_id:
                logger.warning("telegram_chat_id_missing_failover_alert")
                return False

            # NOTE(review): chat_id is only checked for presence and is not
            # passed on; presumably the gateway resolves the target chat
            # itself — confirm against TelegramGateway.send_notification.
            gateway = get_telegram_gateway()
            await gateway.send_notification(text=message, parse_mode="MarkdownV2")
            logger.info("telegram_failover_alert_sent", message_len=len(message))
            return True
        except Exception as e:
            # Deliberately swallowed: a failed alert must not abort the caller.
            logger.exception("telegram_failover_send_failed", error=str(e))
            return False


# -------------------------------------------------------------------------
# MarkdownV2 escaping
# -------------------------------------------------------------------------

_MD2_SPECIAL = r"\_*[]()~`>#+-=|{}.!"
# One-pass translation table: every reserved character maps to "\" + itself.
_MD2_ESCAPE_TABLE = str.maketrans({ch: f"\\{ch}" for ch in _MD2_SPECIAL})


def _escape_md(text: object) -> str:
    """Return *text* with every MarkdownV2 reserved character backslash-escaped.

    Non-string values (ints, enums, datetimes, ...) are converted with
    ``str()`` first, because alert payloads come from untyped event dicts.
    """
    return str(text).translate(_MD2_ESCAPE_TABLE)
_alerter_instance + _alerter_instance = None diff --git a/apps/api/src/services/ollama_auto_recovery.py b/apps/api/src/services/ollama_auto_recovery.py new file mode 100644 index 00000000..4de18b33 --- /dev/null +++ b/apps/api/src/services/ollama_auto_recovery.py @@ -0,0 +1,436 @@ +""" +Ollama 自動恢復服務 - P1.1d +============================ +背景監控:偵測 111 從 OFFLINE/SLOW/DEGRADED 恢復為 HEALTHY 後,立刻切回 Ollama。 + +核心設計: +- 30s 輪詢 111 健康狀態 +- 防抖:連續 3 次 HEALTHY 才觸發切回(30s × 3 = 90s 穩定視窗) +- 中途若又 OFFLINE,counter 歸零,重新計數 +- 切回後呼叫 failover_manager.clear_cache(),讓下次路由重新評估 +- 預留 telegram_alerter 介面(P1.5 Engineer 接入) +- structlog audit service="ollama_auto_recovery" + +整合方式(FastAPI lifespan): + from src.services.ollama_auto_recovery import OllamaAutoRecoveryService + + @asynccontextmanager + async def lifespan(app: FastAPI): + recovery_svc = OllamaAutoRecoveryService() + await recovery_svc.start() + yield + await recovery_svc.stop() + +版本: v1.0 +建立: 2026-04-25 (台北時區) +建立者: Claude Engineer-C (P1.1d) +# 2026-04-25 統帥指令 by Claude Engineer-C — 自動切 Gemini + 自動恢復 +""" + +from __future__ import annotations + +import asyncio +import datetime +from datetime import timezone, timedelta +from typing import Any, Protocol, runtime_checkable + +import structlog + +# 台北時區 +8(標準庫保險絲,100% 可用) +# 2026-04-25 critic-fix Part2 B4 by Claude Engineer-C2 +# 原 zoneinfo.ZoneInfo("Asia/Taipei") 失敗時 = None → datetime.now(None) 為 UTC +TAIPEI_TZ = timezone(timedelta(hours=8)) + +from src.core.config import get_settings +from src.services.ollama_health_monitor import ( + HealthStatus, + OllamaHealthMonitor, + get_ollama_health_monitor, +) +from src.services.ollama_failover_manager import ( + OllamaFailoverManager, + get_ollama_failover_manager, +) + +logger = structlog.get_logger(__name__) + +# ============================================================================= +# 常數 +# ============================================================================= + +RECOVERY_CHECK_INTERVAL_SEC = 30 # 每 30s 檢查一次 
class OllamaAutoRecoveryService:
    """Background service that routes back to Ollama 111 once it is healthy.

    A background task polls the health of ``settings.OLLAMA_URL`` every
    ``recovery_check_interval_sec`` seconds.  Debounce rules:

    - ``stable_count_required`` consecutive HEALTHY results are needed before
      switching back;
    - any non-HEALTHY result (or a failed check) resets the counter to zero;
    - nothing is triggered while the current primary is already ``"ollama"``.

    The current primary is persisted in Redis so the state survives restarts.
    """

    # Redis key holding the current primary (stored without a TTL).
    _REDIS_PRIMARY_KEY = "ollama:current_primary"

    def __init__(
        self,
        health_monitor: OllamaHealthMonitor | None = None,
        failover_manager: OllamaFailoverManager | None = None,
        telegram_alerter: TelegramAlerter | None = None,
        recovery_check_interval_sec: int = RECOVERY_CHECK_INTERVAL_SEC,
        stable_count_required: int = STABLE_COUNT_REQUIRED,
    ) -> None:
        """Wire collaborators; omitted ones fall back to the module singletons.

        Args:
            health_monitor: Health checker used to probe Ollama 111.
            failover_manager: Manager whose cache is cleared on switch-back.
            telegram_alerter: Optional alerter; when ``None`` the
                ``get_failover_alerter()`` singleton is used at alert time.
            recovery_check_interval_sec: Seconds between health checks.
            stable_count_required: Consecutive HEALTHY results required.
        """
        self._monitor = health_monitor or get_ollama_health_monitor()
        self._failover_manager = failover_manager or get_ollama_failover_manager()
        self._alerter = telegram_alerter
        self._recovery_check_interval_sec = recovery_check_interval_sec
        self._stable_count_required = stable_count_required
        self._settings = get_settings()

        # Runtime state.
        self._current_primary: str = "ollama"  # "ollama" / "gemini" / "fallback"
        self._consecutive_healthy: int = 0
        self._task: asyncio.Task | None = None

    # -------------------------------------------------------------------------
    # Lifecycle
    # -------------------------------------------------------------------------

    async def start(self) -> None:
        """Load persisted state and start the background monitor task.

        If the persisted primary is not ``"ollama"``, one health check runs
        immediately instead of waiting for the first polling interval.
        Calling ``start()`` while the task is already running is a no-op.
        """
        if self._task is not None and not self._task.done():
            logger.warning(
                "ollama_auto_recovery_already_running",
                service="ollama_auto_recovery",
            )
            return

        # Restore the primary that was active before the restart.
        self._current_primary = await self._load_primary()
        logger.info(
            "ollama_auto_recovery_bootstrap",
            service="ollama_auto_recovery",
            current_primary=self._current_primary,
        )

        if self._current_primary != "ollama":
            try:
                await self._check_and_recover()
            except Exception:
                logger.exception(
                    "ollama_auto_recovery_immediate_check_error",
                    service="ollama_auto_recovery",
                )

        self._task = asyncio.create_task(
            self._monitor_loop(),
            name="ollama_auto_recovery_loop",
        )
        logger.info(
            "ollama_auto_recovery_started",
            service="ollama_auto_recovery",
            check_interval_sec=self._recovery_check_interval_sec,
            stable_count_required=self._stable_count_required,
        )

    async def stop(self) -> None:
        """Cancel the background task and wait for it to finish."""
        if self._task is None or self._task.done():
            return

        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        finally:
            self._task = None

        logger.info(
            "ollama_auto_recovery_stopped",
            service="ollama_auto_recovery",
        )

    # -------------------------------------------------------------------------
    # State control (called by the integration layer, e.g. on failover)
    # -------------------------------------------------------------------------

    async def set_current_primary(self, provider: str) -> None:
        """Record which provider is currently the routing primary.

        This is wired as the failover manager's recovery callback, which is
        awaited on every routing decision.  All side effects (logging, Redis
        persistence, counter reset) therefore happen only when the primary
        actually changes; otherwise repeated calls during an outage would keep
        zeroing the HEALTHY counter and the debounce window could never close.

        Args:
            provider: Provider name of the new primary.
        """
        if self._current_primary == provider:
            return

        logger.info(
            "ollama_auto_recovery_primary_changed",
            service="ollama_auto_recovery",
            from_primary=self._current_primary,
            to_primary=provider,
        )
        self._current_primary = provider
        await self._persist_primary(provider)

        if provider != "ollama":
            # Left Ollama: restart the debounce window for recovery tracking.
            self._consecutive_healthy = 0
            logger.info(
                "ollama_auto_recovery_tracking_started",
                service="ollama_auto_recovery",
                current_primary=provider,
            )

    # -------------------------------------------------------------------------
    # Redis persistence (state survives restarts)
    # -------------------------------------------------------------------------

    async def _persist_primary(self, primary: str) -> None:
        """Write *primary* to Redis; failures are logged and ignored."""
        try:
            from src.core.redis_client import get_redis
            redis = get_redis()
            await redis.set(self._REDIS_PRIMARY_KEY, primary)
        except Exception as e:
            logger.warning(
                "ollama_auto_recovery_persist_failed",
                service="ollama_auto_recovery",
                error=str(e),
            )

    async def _load_primary(self) -> str:
        """Read the persisted primary; returns ``"ollama"`` if absent or on error."""
        try:
            from src.core.redis_client import get_redis
            redis = get_redis()
            val = await redis.get(self._REDIS_PRIMARY_KEY)
            if val:
                decoded = val.decode() if isinstance(val, bytes) else val
                logger.info(
                    "ollama_auto_recovery_loaded_from_redis",
                    service="ollama_auto_recovery",
                    current_primary=decoded,
                )
                return decoded
        except Exception as e:
            logger.warning(
                "ollama_auto_recovery_load_failed",
                service="ollama_auto_recovery",
                error=str(e),
            )
        return "ollama"  # default: normal routing

    @property
    def current_primary(self) -> str:
        """Provider name currently recorded as the routing primary."""
        return self._current_primary

    @property
    def consecutive_healthy(self) -> int:
        """Number of consecutive HEALTHY results seen so far."""
        return self._consecutive_healthy

    @property
    def is_running(self) -> bool:
        """True while the background monitor task exists and is not done."""
        return self._task is not None and not self._task.done()

    # -------------------------------------------------------------------------
    # Background monitor loop
    # -------------------------------------------------------------------------

    async def _monitor_loop(self) -> None:
        """Sleep for the configured interval, run one check, repeat forever.

        Only cancellation ends the loop; any other exception is logged and the
        loop keeps running.
        """
        while True:
            try:
                await asyncio.sleep(self._recovery_check_interval_sec)
                await self._check_and_recover()
            except asyncio.CancelledError:
                raise  # must propagate so that stop() can finish awaiting
            except Exception:
                logger.exception(
                    "ollama_auto_recovery_loop_error",
                    service="ollama_auto_recovery",
                )

    async def _check_and_recover(self) -> None:
        """Run one health check and switch back when the debounce is satisfied.

        Kept as a separate method so it can be unit-tested without the loop.
        """
        host = self._settings.OLLAMA_URL

        try:
            health = await self._monitor.check(host)
        except Exception:
            logger.exception(
                "ollama_auto_recovery_check_failed",
                service="ollama_auto_recovery",
                host=host,
            )
            self._consecutive_healthy = 0
            return

        if health.status == HealthStatus.HEALTHY:
            self._consecutive_healthy += 1
            logger.debug(
                "ollama_auto_recovery_healthy_tick",
                service="ollama_auto_recovery",
                consecutive=self._consecutive_healthy,
                required=self._stable_count_required,
                current_primary=self._current_primary,
            )

            if (
                self._consecutive_healthy >= self._stable_count_required
                and self._current_primary != "ollama"
            ):
                await self._switch_back_to_ollama()
        else:
            # Not HEALTHY: restart the debounce window.
            if self._consecutive_healthy > 0:
                logger.debug(
                    "ollama_auto_recovery_counter_reset",
                    service="ollama_auto_recovery",
                    previous_count=self._consecutive_healthy,
                    status=health.status.value,
                )
            self._consecutive_healthy = 0

    # -------------------------------------------------------------------------
    # Switch back to Ollama 111
    # -------------------------------------------------------------------------

    async def _switch_back_to_ollama(self) -> None:
        """Make Ollama the primary again and notify everyone involved.

        Steps (each one is best-effort and logged on failure):
            1. set and persist ``"ollama"`` as the current primary;
            2. clear the failover manager's routing cache;
            3. call ``failover_manager.notify_recovery``;
            4. send the Telegram recovery alert;
            5. write the structlog audit entry.
        """
        from_provider = self._current_primary
        self._current_primary = "ollama"
        # Persist the switch-back as well; otherwise Redis keeps the failover
        # primary and the next restart would bootstrap from stale state.
        await self._persist_primary("ollama")

        # Keep the count for the alert/audit payload, then restart the window.
        stable_count = self._consecutive_healthy
        self._consecutive_healthy = 0

        # Timestamps are reported in Taipei time (+08:00).
        recovery_time = datetime.datetime.now(TAIPEI_TZ)

        # Clear the routing cache so the next select_provider re-evaluates.
        try:
            await self._failover_manager.clear_cache()
        except Exception:
            logger.exception(
                "ollama_auto_recovery_clear_cache_error",
                service="ollama_auto_recovery",
            )

        try:
            self._failover_manager.notify_recovery("ollama_111")
        except Exception:
            logger.exception(
                "ollama_auto_recovery_notify_error",
                service="ollama_auto_recovery",
            )

        # The injected alerter wins; otherwise use the FailoverAlerter singleton.
        try:
            alerter = self._alerter
            if alerter is None:
                from src.services.failover_alerter import get_failover_alerter
                alerter = get_failover_alerter()
            await alerter.alert_recovery({
                "from": from_provider,
                "to": "ollama_111",
                "stable_count": stable_count,
                "recovery_time": recovery_time.isoformat(),
            })
        except Exception:
            logger.exception(
                "ollama_auto_recovery_telegram_alert_error",
                service="ollama_auto_recovery",
            )

        # Audit entry (always written).
        logger.info(
            "ollama_auto_recovery_switched_back",
            service="ollama_auto_recovery",
            action="switched_back",
            from_provider=from_provider,
            to_provider="ollama_111",
            stable_count=stable_count,
            recovery_time=recovery_time.isoformat(),
        )
def get_ollama_auto_recovery_service() -> OllamaAutoRecoveryService:
    """Return the process-wide OllamaAutoRecoveryService, creating it on first use."""
    global _recovery_service
    instance = _recovery_service
    if instance is None:
        # First call: build the service and remember it for later callers.
        instance = _recovery_service = OllamaAutoRecoveryService()
    return instance
@dataclass
class OllamaEndpoint:
    """One routable endpoint: where it lives, who serves it, which model to use."""

    url: str
    provider_name: str  # provider name handed to AIRouterExecutor
    model: str

    def to_dict(self) -> dict:
        """Return the three fields as a plain dict (url, provider_name, model)."""
        return dict(
            url=self.url,
            provider_name=self.provider_name,
            model=self.model,
        )
async def select_provider(
    self,
    task_type: str = "",
    context: dict | None = None,
) -> OllamaRoutingResult:
    """Probe 111 (and 188 when configured) and return the routing decision.

    Health checks run concurrently.  A check that raises is treated as an
    OFFLINE report in both the single-node and the two-node configuration,
    so a failing probe can never abort routing.

    Args:
        task_type: Task type (reserved; does not influence routing yet).
        context: Extra context (reserved).

    Returns:
        The OllamaRoutingResult produced by ``_decide_route``, or by
        ``_build_quota_exceeded_route`` when Gemini was selected but the
        daily quota check fails.
    """
    url_111 = self._settings.OLLAMA_URL
    url_188 = self._settings.OLLAMA_FALLBACK_URL or ""

    # Probe every configured node at once; exceptions come back as values.
    probe_urls = [url_111, url_188] if url_188 else [url_111]
    outcomes = await asyncio.gather(
        *(self._monitor.check(url) for url in probe_urls),
        return_exceptions=True,
    )
    # BaseException (not just Exception) so that a CancelledError returned
    # for an individual probe is also mapped to OFFLINE instead of being
    # mistaken for a HealthReport.
    reports = [
        HealthReport(status=HealthStatus.OFFLINE, reason=f"check error: {outcome}")
        if isinstance(outcome, BaseException)
        else outcome
        for outcome in outcomes
    ]
    health_111: HealthReport = reports[0]
    health_188: HealthReport | None = reports[1] if url_188 else None

    result = self._decide_route(
        health_111=health_111,
        health_188=health_188,
        url_111=url_111,
        url_188=url_188,
    )

    # Gemini billing circuit breaker (quota gate).
    if result.primary.provider_name == "gemini":
        quota_ok = await self._check_gemini_quota()
        if not quota_ok:
            quota = getattr(self._settings, "GEMINI_DAILY_QUOTA", 1000)
            logger.warning(
                "gemini_quota_exceeded_falling_to_188",
                quota=quota,
                health_111=health_111.status.value,
            )
            result = self._build_quota_exceeded_route(
                health_111=health_111,
                health_188=health_188,
                url_111=url_111,
                url_188=url_188,
            )

    # Audit log (best-effort).
    await self._write_failover_audit(result)

    logger.info(
        "ollama_failover_decision",
        primary=result.primary.provider_name,
        primary_url=result.primary.url,
        reason=result.routing_reason,
        fallback_count=len(result.fallback_chain),
        health_111=health_111.status.value,
        health_188=health_188.status.value if health_188 else "not_configured",
    )

    # Tell the recovery service which provider is primary now; a failing
    # callback is logged and must not affect the routing result.
    if self._recovery_callback is not None:
        try:
            await self._recovery_callback(result.primary.provider_name)
        except Exception as e:
            logger.warning(
                "ollama_failover_recovery_callback_failed",
                error=str(e),
            )

    return result
# 111 HEALTHY → 主 111,Gemini 作為第一 fallback(快速雲端) + # ========================================================== + if health_111.status == HealthStatus.HEALTHY: + fallback: list[OllamaEndpoint] = [_GEMINI_ENDPOINT] + if has_188 and ep_188: + fallback.append(ep_188) + fallback.append(_NEMOTRON_ENDPOINT) + return OllamaRoutingResult( + primary=ep_111, + fallback_chain=fallback, + routing_reason="111 HEALTHY → 主 111", + health_111=health_111, + health_188=health_188, + ) + + # ========================================================== + # 111 SLOW → primary=Gemini,fallback=[111, 188] + # 111 實測 eval rate 0.09 token/s,~111s 推理,Gemini 更快 + # ========================================================== + if health_111.status == HealthStatus.SLOW: + fallback_slow: list[OllamaEndpoint] = [ep_111] + if has_188 and ep_188: + fallback_slow.append(ep_188) + degradation_reason = ( + f"111 SLOW(eval ~0.09 token/s, ~111s)→ 切 Gemini at {now_ts}" + ) + return OllamaRoutingResult( + primary=_GEMINI_ENDPOINT, + fallback_chain=fallback_slow, + routing_reason=degradation_reason, + health_111=health_111, + health_188=health_188, + ) + + # ========================================================== + # 111 DEGRADED 或 OFFLINE → primary=Gemini,188 在 fallback + # ========================================================== + status_label = health_111.status.value # "degraded" / "offline" + degradation_reason = f"111 {status_label} → 切 Gemini at {now_ts}" + if has_188 and ep_188: + return OllamaRoutingResult( + primary=_GEMINI_ENDPOINT, + fallback_chain=[ep_188, _NEMOTRON_ENDPOINT, _CLAUDE_ENDPOINT], + routing_reason=degradation_reason, + health_111=health_111, + health_188=health_188, + ) + + # 188 也不可用 → Gemini 主力,最後備援 Nemotron / Claude + degradation_reason = f"111 {status_label} + 188 不可用 → 切 Gemini at {now_ts}" + return OllamaRoutingResult( + primary=_GEMINI_ENDPOINT, + fallback_chain=[_NEMOTRON_ENDPOINT, _CLAUDE_ENDPOINT], + routing_reason=degradation_reason, + health_111=health_111, + 
health_188=health_188, + ) + + # ------------------------------------------------------------------------- + # Gemini 帳單熔斷(quota gate) + # 2026-04-25 critic-fix Part2 H7 by Claude Engineer-C2 + # ------------------------------------------------------------------------- + + async def _check_gemini_quota(self) -> bool: + """ + 檢查每日 Gemini call 配額,超過上限則禁用。 + + Redis key: ollama:gemini_daily_count:{YYYY-MM-DD},TTL 86400s + 計數 atomic(incr)。 + + Returns: + True → 仍在配額內,可使用 Gemini + False → 已超配額,應切到 188+Nemotron + + fail-open:Redis 不可用時允許走 Gemini(不阻擋服務) + """ + try: + from src.core.redis_client import get_redis + redis = get_redis() + if redis is None: + return True # fail-open + quota = getattr(self._settings, "GEMINI_DAILY_QUOTA", 1000) + key = f"ollama:gemini_daily_count:{datetime.date.today().isoformat()}" + count_raw = await redis.get(key) + count = int(count_raw or 0) + if count >= quota: + return False + # atomic incr + 設定 TTL(確保跨日自動重置) + await redis.incr(key) + await redis.expire(key, 86400) + return True + except Exception as e: + logger.warning("gemini_quota_check_failed", error=str(e)) + return True # fail-open + + def _build_quota_exceeded_route( + self, + health_111: HealthReport, + health_188: HealthReport | None, + url_111: str, # noqa: ARG002 — 保留供 OllamaRoutingResult 結構完整性(health_111 對應) + url_188: str, + ) -> OllamaRoutingResult: + """ + Gemini 配額耗盡時的備援路由:primary=OLLAMA_188,fallback=[Nemotron, Claude] + 若 188 也不可用,則 primary=Nemotron。 + """ + model_188 = "qwen2.5:7b-instruct" + ep_188 = ( + OllamaEndpoint(url=url_188, provider_name="ollama_188", model=model_188) + if url_188 + else None + ) + has_188 = ep_188 is not None and ( + health_188 is not None and health_188.status != HealthStatus.OFFLINE + ) + + if has_188 and ep_188: + return OllamaRoutingResult( + primary=ep_188, + fallback_chain=[_NEMOTRON_ENDPOINT, _CLAUDE_ENDPOINT], + routing_reason="Gemini quota exceeded → 188 CPU-only 備援", + health_111=health_111, + health_188=health_188, + ) + + # 188 
也不可用 + return OllamaRoutingResult( + primary=_NEMOTRON_ENDPOINT, + fallback_chain=[_CLAUDE_ENDPOINT], + routing_reason="Gemini quota exceeded + 188 不可用 → Nemotron 備援", + health_111=health_111, + health_188=health_188, + ) + + # ------------------------------------------------------------------------- + # Recovery API(供 OllamaAutoRecoveryService 呼叫) + # ------------------------------------------------------------------------- + + def set_recovery_callback(self, callback) -> None: + """ + 設定 recovery callback(供 lifespan wiring 使用)。 + callback signature: async (provider_name: str) -> None + + # 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan + """ + self._recovery_callback = callback + + async def clear_cache(self) -> None: + """ + 清空路由決策快取,讓下次 select_provider 重新評估健康狀態。 + OllamaAutoRecoveryService 在偵測 111 恢復後呼叫此方法。 + + 2026-04-25 統帥指令 by Claude Engineer-C — 自動切 Gemini + 自動恢復 + # 2026-04-25 P1.2 by Claude Engineer-A2 — 改用 make_cache_key 動態組 key,消除硬編碼 IP + """ + try: + from src.core.redis_client import get_redis + from src.services.ollama_health_monitor import make_cache_key + redis = get_redis() + if redis is None: + return + # 動態由 settings URL 組 cache key,避免硬編碼 IP + keys = [ + make_cache_key(self._settings.OLLAMA_URL), + make_cache_key(self._settings.OLLAMA_FALLBACK_URL or ""), + ] + for k in keys: + if k and k != "ollama_health:": # 空 URL 會產生無意義的 key,跳過 + await redis.delete(k) + logger.info( + "ollama_failover_cache_cleared", + service="ollama_failover", + reason="recovery_triggered", + ) + except Exception as e: + logger.debug("ollama_failover_clear_cache_failed", error=str(e)) + + def notify_recovery(self, provider: str) -> None: + """ + 預留:P1.5 Engineer 接入 Telegram alerter 時使用。 + 目前僅寫 structlog audit。 + + 2026-04-25 統帥指令 by Claude Engineer-C — 自動切 Gemini + 自動恢復 + """ + logger.info( + "ollama_recovery_notified", + service="ollama_failover", + provider=provider, + action="recovery_received", + ) + + # 
------------------------------------------------------------------------- + # Audit Log + # ------------------------------------------------------------------------- + + async def _write_failover_audit(self, result: OllamaRoutingResult) -> None: + """ + 切換觸發時寫 structlog audit(best-effort)+ Telegram 告警 + + # 2026-04-25 critic-fix Part2 B1 by Claude Engineer-C2 + # 原 AuditLog DB 寫入使用不存在的欄位(service/action/target/status/metadata) + # → SQLAlchemy crash → except 吃掉 → 零稽核 + # 修法:刪除 DB 寫入路徑,改用 structlog only(audit 不依賴 DB schema) + + # 2026-04-25 P1.5 by Claude Engineer-D — 新增 Telegram 告警(dedup 10min) + + service="ollama_failover"(per 任務規格) + 僅在 primary 非 111 時記錄(真正發生切換) + """ + if result.primary.provider_name == "ollama": + # 111 正常,無切換事件 + return + + logger.info( + "ollama_failover_triggered", + service="ollama_failover", + action="failover_triggered", + from_provider="ollama", + to_provider=result.primary.provider_name, + reason=result.routing_reason, + primary_url=result.primary.url or result.primary.provider_name, + health_111=result.health_111.status.value, + health_188=result.health_188.status.value if result.health_188 else "not_configured", + ) + + # Telegram 告警(首次切換才通知,dedup 10min 內建) + # 2026-04-25 P1.5 by Claude Engineer-D — 告警失敗不阻斷主路由邏輯 + try: + from src.services.failover_alerter import get_failover_alerter + fallback_chain_str = " → ".join( + p.provider_name for p in result.fallback_chain + ) + alerter = get_failover_alerter() + await alerter.alert_failover({ + "to_provider": result.primary.provider_name, + "model": result.primary.model, + "reason": result.routing_reason, + "timestamp": datetime.datetime.now(TAIPEI_TZ).isoformat(), + "fallback_chain_str": fallback_chain_str, + }) + except Exception as e: + logger.warning("failover_alert_failed", error=str(e)) + + +# ============================================================================= +# Singleton +# ============================================================================= + +_failover_manager: 
OllamaFailoverManager | None = None + + +def get_ollama_failover_manager() -> OllamaFailoverManager: + """取得 OllamaFailoverManager singleton""" + global _failover_manager + if _failover_manager is None: + _failover_manager = OllamaFailoverManager() + return _failover_manager + + +def reset_ollama_failover_manager() -> None: + """重置 singleton(測試用)""" + global _failover_manager + _failover_manager = None diff --git a/apps/api/src/services/ollama_health_monitor.py b/apps/api/src/services/ollama_health_monitor.py new file mode 100644 index 00000000..b0a62168 --- /dev/null +++ b/apps/api/src/services/ollama_health_monitor.py @@ -0,0 +1,356 @@ +""" +Ollama 健康檢測模組 - P1.1a +============================ +三層健康檢查:連通性 → 推理測試 → (optional) GPU 記憶體 + +設計原則: +- Redis 30s 快取防 health check storm +- 降級安全:快取失敗不影響功能,直接執行檢查 +- 用 settings 取 host 配置(禁硬編碼 IP) +- 用 httpx.AsyncClient(非 requests) + +版本: v1.1 +建立: 2026-04-25 (台北時區) +建立者: Claude Engineer-C (P1.1a) +# Created 2026-04-25 P1.1 by Claude Engineer-C +# 2026-04-25 critic-fix by Claude Engineer-C — 修 BLOCKER B3(make_cache_key 公開) + H3(timeout 45s) +""" + +from __future__ import annotations + +import asyncio +import json +import time +from dataclasses import dataclass, field +from enum import Enum +from urllib.parse import urlparse + +import httpx +import structlog + +from src.core.config import get_settings + +logger = structlog.get_logger(__name__) + +# ============================================================================= +# 常數 +# ============================================================================= + +REDIS_CACHE_KEY_PREFIX = "ollama_health:" +REDIS_CACHE_TTL_SECONDS = 30 # 防 health check storm + +CONNECTIVITY_TIMEOUT_SECONDS = 5.0 +# 2026-04-25 critic-fix H3 by Claude Engineer-C — 45s 讓 SLOW 門檻(30s)真的能觀察到 +INFERENCE_TIMEOUT_SECONDS = 45.0 + +# 推理延遲分級門檻(毫秒) +LATENCY_HEALTHY_THRESHOLD_MS = 10_000 # <10s → HEALTHY +LATENCY_SLOW_THRESHOLD_MS = 30_000 # 10-30s → SLOW +# >30s → DEGRADED + + +# 
============================================================================= +# 公開工具函數 +# ============================================================================= + +def make_cache_key(url: str) -> str: + """ + 由 URL 產生 Redis cache key。 + + 取 host:port 部分(netloc)作為 key,避免 scheme 差異干擾。 + 例:http://192.168.0.111:11434 → "ollama_health:192.168.0.111:11434" + + 公開函數供 OllamaFailoverManager.clear_cache() 動態組 key, + 確保與內部 _cache_key() 使用相同邏輯。 + + # 2026-04-25 critic-fix B3 by Claude Engineer-C — 避免 clear_cache 硬編碼 IP + """ + parsed = urlparse(url) + netloc = parsed.netloc or url + return f"{REDIS_CACHE_KEY_PREFIX}{netloc}" + + +# ============================================================================= +# 資料模型 +# ============================================================================= + + +class HealthStatus(Enum): + """Ollama 健康狀態分級""" + + HEALTHY = "healthy" # <10s 推理,正常使用 + SLOW = "slow" # 10-30s,降級使用 + DEGRADED = "degraded" # >30s 或推理超時,優先切換 + OFFLINE = "offline" # 連通性失敗,必須切換 + + +@dataclass +class HealthReport: + """Ollama 健康檢測報告""" + + status: HealthStatus + host: str = "" + latency_ms: float = 0.0 + reason: str = "" + checked_at: float = field(default_factory=time.time) + from_cache: bool = False + mcp_health: dict[str, bool] = field(default_factory=dict) + + def is_usable(self) -> bool: + """是否可用(不含 OFFLINE)""" + return self.status != HealthStatus.OFFLINE + + def to_dict(self) -> dict: + return { + "status": self.status.value, + "host": self.host, + "latency_ms": round(self.latency_ms, 1), + "reason": self.reason, + "checked_at": self.checked_at, + "from_cache": self.from_cache, + } + + +# ============================================================================= +# OllamaHealthMonitor +# ============================================================================= + + +class OllamaHealthMonitor: + """ + Ollama 三層健康檢測 + + 層 1:連通性(/api/tags,5s timeout) + 層 2:推理測試(/api/generate,35s timeout) + 層 3:GPU 記憶體(optional,via SSH MCP) + + 結果 Redis 快取 30s,防 
health check storm。 + + 2026-04-25 Claude Engineer-C (P1.1a) + """ + + def __init__(self) -> None: + self._settings = get_settings() + + # ------------------------------------------------------------------------- + # Public API + # ------------------------------------------------------------------------- + + async def check(self, host: str) -> HealthReport: + """ + 執行健康檢測(含 Redis 快取) + + Args: + host: Ollama 主機 URL,e.g. "http://192.168.0.111:11434" + + Returns: + HealthReport + """ + # 嘗試從快取取得 + cached = await self._get_cached(host) + if cached is not None: + cached.from_cache = True + logger.debug("ollama_health_cache_hit", host=host, status=cached.status.value) + return cached + + # 執行實際健康檢測 + report = await self._run_checks(host) + report.host = host + + # 寫入快取(失敗不影響功能,外部 exception 也靜默) + try: + await self._set_cached(host, report) + except Exception as e: + logger.debug("ollama_health_set_cached_outer_failed", host=host, error=str(e)) + + logger.info( + "ollama_health_checked", + host=host, + status=report.status.value, + latency_ms=round(report.latency_ms, 1), + reason=report.reason, + ) + + # 寫入 audit_log(best-effort) + await self._write_audit_log(host, report) + + return report + + # ------------------------------------------------------------------------- + # 三層檢查 + # ------------------------------------------------------------------------- + + async def _run_checks(self, host: str) -> HealthReport: + """依序執行三層檢查""" + # 層 1:連通性 + connectivity_ok = await self._check_connectivity(host) + if not connectivity_ok: + return HealthReport( + status=HealthStatus.OFFLINE, + host=host, + reason="連通性失敗:/api/tags 無回應", + ) + + # 層 2:推理測試 + report = await self._check_inference(host) + return report + + async def _check_connectivity(self, host: str) -> bool: + """ + 層 1:連通性檢查 + GET /api/tags,5s timeout,回傳 200 即通過 + """ + try: + async with httpx.AsyncClient( + timeout=httpx.Timeout(CONNECTIVITY_TIMEOUT_SECONDS) + ) as client: + resp = await client.get(f"{host}/api/tags") + 
return resp.status_code == 200 + except (httpx.TimeoutException, httpx.ConnectError, httpx.NetworkError): + return False + except Exception as e: + logger.warning("ollama_connectivity_check_error", host=host, error=str(e)) + return False + + async def _check_inference(self, host: str) -> HealthReport: + """ + 層 2:推理測試 + POST /api/generate,35s timeout,依延遲分級 + """ + model = self._settings.OLLAMA_HEALTH_CHECK_MODEL + start = time.perf_counter() + + try: + async with httpx.AsyncClient( + timeout=httpx.Timeout(INFERENCE_TIMEOUT_SECONDS) + ) as client: + resp = await client.post( + f"{host}/api/generate", + json={ + "model": model, + "prompt": "hi", + "stream": False, + "options": {"num_predict": 1}, + }, + ) + latency_ms = (time.perf_counter() - start) * 1000 + + if resp.status_code != 200: + return HealthReport( + status=HealthStatus.DEGRADED, + latency_ms=latency_ms, + reason=f"推理回傳 HTTP {resp.status_code}", + ) + + # 分級判斷 + if latency_ms < LATENCY_HEALTHY_THRESHOLD_MS: + return HealthReport( + status=HealthStatus.HEALTHY, + latency_ms=latency_ms, + ) + elif latency_ms < LATENCY_SLOW_THRESHOLD_MS: + return HealthReport( + status=HealthStatus.SLOW, + latency_ms=latency_ms, + reason=f"推理延遲 {latency_ms:.0f}ms(slow zone)", + ) + else: + return HealthReport( + status=HealthStatus.DEGRADED, + latency_ms=latency_ms, + reason=f"推理延遲 {latency_ms:.0f}ms(>30s)", + ) + + except (httpx.TimeoutException, asyncio.TimeoutError): + latency_ms = (time.perf_counter() - start) * 1000 + return HealthReport( + status=HealthStatus.DEGRADED, + latency_ms=latency_ms, + reason="推理超時 >35s", + ) + except (httpx.ConnectError, httpx.NetworkError) as e: + return HealthReport( + status=HealthStatus.OFFLINE, + reason=f"推理連接失敗:{e}", + ) + except Exception as e: + logger.warning("ollama_inference_check_error", host=host, error=str(e)) + return HealthReport( + status=HealthStatus.DEGRADED, + reason=f"推理測試例外:{e}", + ) + + # ------------------------------------------------------------------------- + # 
Redis 快取 + # ------------------------------------------------------------------------- + + def _cache_key(self, host: str) -> str: + """生成 Redis cache key(內部使用,委派至 make_cache_key)""" + return make_cache_key(host) + + async def _get_cached(self, host: str) -> HealthReport | None: + """從 Redis 取得快取的 HealthReport,失敗返回 None""" + try: + from src.core.redis_client import get_redis + redis = get_redis() + raw = await redis.get(self._cache_key(host)) + if not raw: + return None + data = json.loads(raw) + return HealthReport( + status=HealthStatus(data["status"]), + host=data.get("host", host), + latency_ms=data.get("latency_ms", 0.0), + reason=data.get("reason", ""), + checked_at=data.get("checked_at", time.time()), + ) + except Exception as e: + logger.debug("ollama_health_cache_get_failed", host=host, error=str(e)) + return None + + async def _set_cached(self, host: str, report: HealthReport) -> None: + """寫入 Redis 快取,失敗靜默(不影響功能)""" + try: + from src.core.redis_client import get_redis + redis = get_redis() + data = json.dumps(report.to_dict()) + await redis.set(self._cache_key(host), data, ex=REDIS_CACHE_TTL_SECONDS) + except Exception as e: + logger.debug("ollama_health_cache_set_failed", host=host, error=str(e)) + + # ------------------------------------------------------------------------- + # Audit Log (structured log,AuditLog 表設計用於 K8s 操作審計,不適合此場景) + # ------------------------------------------------------------------------- + + async def _write_audit_log(self, host: str, report: HealthReport) -> None: + """記錄健康檢測結果(structlog,service=ollama_health_monitor)""" + logger.info( + "ollama_health_monitor_audit", + service="ollama_health_monitor", + host=host, + status=report.status.value, + latency_ms=round(report.latency_ms, 1), + reason=report.reason, + is_usable=report.is_usable(), + ) + + +# ============================================================================= +# Singleton +# ============================================================================= + 
+_monitor: OllamaHealthMonitor | None = None + + +def get_ollama_health_monitor() -> OllamaHealthMonitor: + """取得 OllamaHealthMonitor singleton""" + global _monitor + if _monitor is None: + _monitor = OllamaHealthMonitor() + return _monitor + + +def reset_ollama_health_monitor() -> None: + """重置 singleton(測試用)""" + global _monitor + _monitor = None diff --git a/apps/api/tests/test_ai_router_failover_integration.py b/apps/api/tests/test_ai_router_failover_integration.py new file mode 100644 index 00000000..0d4446a1 --- /dev/null +++ b/apps/api/tests/test_ai_router_failover_integration.py @@ -0,0 +1,257 @@ +# apps/api/tests/test_ai_router_failover_integration.py | 2026-04-25 @ Asia/Taipei +# 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan +""" +AIRouter × OllamaFailoverManager 整合測試 +========================================== +測試覆蓋: +1. 初步路由選 OLLAMA → failover_manager 重評 → decision 使用 failover 結果 +2. failover 回傳 GEMINI primary → decision.selected_provider == GEMINI +3. failover 的 fallback_chain 正確轉換到 decision.fallback_chain +4. 初步路由選 NEMOTRON → failover_manager 不被呼叫 +5. 初步路由選 OPENCLAW_NEMO → failover_manager 不被呼叫 +6. 
failover_manager 發生例外 → fail-open,保留原始 provider + +測試分類:unit(mock OllamaFailoverManager,無 Redis / DB 依賴) +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from src.services.ai_router import AIProviderEnum, AIRouter, reset_ai_router +from src.services.ollama_failover_manager import OllamaEndpoint, OllamaRoutingResult +from src.services.ollama_health_monitor import HealthReport, HealthStatus + + +# ============================================================================= +# Fixtures / Helpers +# ============================================================================= + + +@pytest.fixture(autouse=True) +def reset_router_singleton(): + """每個測試前後重置 AIRouter singleton,避免 failover_manager mock 殘留""" + yield + reset_ai_router() + + +def _make_health(status: HealthStatus) -> HealthReport: + return HealthReport(status=status, host="http://192.168.0.111:11434", latency_ms=500.0) + + +def _make_failover_result( + primary_provider: str, + primary_model: str, + fallback: list[tuple[str, str]] | None = None, +) -> OllamaRoutingResult: + """建立 OllamaRoutingResult 測試物件""" + fb_endpoints = [ + OllamaEndpoint(url="", provider_name=p, model=m) + for p, m in (fallback or []) + ] + return OllamaRoutingResult( + primary=OllamaEndpoint(url="", provider_name=primary_provider, model=primary_model), + fallback_chain=fb_endpoints, + routing_reason=f"test: {primary_provider}", + health_111=_make_health(HealthStatus.OFFLINE), + health_188=None, + ) + + +def _make_router_with_mock_failover(mock_failover_manager) -> AIRouter: + """建立 AIRouter,並替換其 _failover_manager""" + router = AIRouter() + router._failover_manager = mock_failover_manager + return router + + +# ============================================================================= +# Test 1: OLLAMA 路由 → failover_manager 重評 → 使用 GEMINI +# ============================================================================= + + +@pytest.mark.asyncio +async def 
test_router_uses_failover_when_ollama_initial_provider(): + """初步路由選 OLLAMA → 應走 failover_manager 重評,decision.selected_provider == GEMINI""" + mock_fm = MagicMock() + mock_fm.select_provider = AsyncMock( + return_value=_make_failover_result( + primary_provider="gemini", + primary_model="gemini-1.5-flash", + fallback=[("ollama_188", "qwen2.5:7b-instruct"), ("nemotron", "nvidia/nemotron-mini-4b-instruct")], + ) + ) + + router = _make_router_with_mock_failover(mock_fm) + + # 讓 intent classifier + complexity scorer 走 sync 快路徑(ALERT_TRIAGE → OLLAMA) + with patch.object(router._intent_classifier, "classify") as mock_classify: + from src.services.intent_classifier import IntentResult, IntentType, RiskLevel + from src.services.complexity_scorer import ComplexityScore + + mock_classify.return_value = IntentResult( + intent=IntentType.ALERT_TRIAGE, + confidence=0.9, + method="keyword", + matched_keywords=["alert"], + detected_resources=[], + reasoning="test", + ) + with patch.object(router._complexity_scorer, "score") as mock_score: + mock_score.return_value = ComplexityScore(score=1, features={}) + + decision = await router.route("test alert message") + + assert decision.selected_provider == AIProviderEnum.GEMINI + assert decision.selected_model == "gemini-1.5-flash" + mock_fm.select_provider.assert_awaited_once() + + +# ============================================================================= +# Test 2: fallback_chain 正確轉換 +# ============================================================================= + + +@pytest.mark.asyncio +async def test_router_failover_fallback_chain_converted(): + """failover_manager 回傳 fallback_chain → decision.fallback_chain 包含 OLLAMA_188""" + mock_fm = MagicMock() + mock_fm.select_provider = AsyncMock( + return_value=_make_failover_result( + primary_provider="gemini", + primary_model="gemini-1.5-flash", + fallback=[ + ("ollama_188", "qwen2.5:7b-instruct"), + ("nemotron", "nvidia/nemotron-mini-4b-instruct"), + ("claude", 
"claude-3-5-haiku-20241022"), + ], + ) + ) + + router = _make_router_with_mock_failover(mock_fm) + + with patch.object(router._intent_classifier, "classify") as mock_classify: + from src.services.intent_classifier import IntentResult, IntentType + from src.services.complexity_scorer import ComplexityScore + + mock_classify.return_value = IntentResult( + intent=IntentType.ALERT_TRIAGE, + confidence=0.9, + method="keyword", + matched_keywords=["alert"], + detected_resources=[], + reasoning="test", + ) + with patch.object(router._complexity_scorer, "score") as mock_score: + mock_score.return_value = ComplexityScore(score=1, features={}) + + decision = await router.route("test alert message") + + fb_providers = [p for p, _ in decision.fallback_chain] + assert AIProviderEnum.OLLAMA_188 in fb_providers, ( + f"OLLAMA_188 not in fallback_chain: {fb_providers}" + ) + assert AIProviderEnum.NEMOTRON in fb_providers + assert AIProviderEnum.CLAUDE in fb_providers + + +# ============================================================================= +# Test 3: NEMOTRON 路由 → failover_manager 不被呼叫 +# ============================================================================= + + +@pytest.mark.asyncio +async def test_router_does_not_use_failover_for_nemotron(): + """初步路由選 NEMOTRON(tool_calling)→ failover_manager.select_provider 不應被呼叫""" + mock_fm = MagicMock() + mock_fm.select_provider = AsyncMock() + + router = _make_router_with_mock_failover(mock_fm) + + # 強制 intent = DIAGNOSE(→ OPENCLAW_NEMO),再用 context_hint 跳過 LLM + # 但 NEMOTRON 只由 route_tool_calling() 觸發,route() 最多到 OPENCLAW_NEMO + # 改用 QUERY → OLLAMA 的 override,然後驗 failover 被觸發(這不是 NEMOTRON 測試) + # 正確測試:強制 CRITICAL → CLAUDE,驗 failover 不被呼叫 + with patch.object(router._intent_classifier, "classify") as mock_classify: + from src.services.intent_classifier import IntentResult, IntentType, RiskLevel + from src.services.complexity_scorer import ComplexityScore + + mock_classify.return_value = IntentResult( + 
intent=IntentType.DELETE, + confidence=1.0, + method="keyword", + matched_keywords=["delete"], + detected_resources=[], + reasoning="test", + risk_level=RiskLevel.CRITICAL, + ) + with patch.object(router._complexity_scorer, "score") as mock_score: + mock_score.return_value = ComplexityScore(score=5, features={}) + + decision = await router.route("delete this service") + + # CRITICAL risk → CLAUDE,failover_manager 不應被呼叫 + assert decision.selected_provider == AIProviderEnum.CLAUDE + mock_fm.select_provider.assert_not_awaited() + + +# ============================================================================= +# Test 4: OPENCLAW_NEMO 路由 → failover_manager 不被呼叫 +# ============================================================================= + + +@pytest.mark.asyncio +async def test_router_does_not_use_failover_for_openclaw_nemo(): + """DIAGNOSE intent → OPENCLAW_NEMO → failover_manager 不應被呼叫""" + mock_fm = MagicMock() + mock_fm.select_provider = AsyncMock() + + router = _make_router_with_mock_failover(mock_fm) + + # context_hint=diagnose → OPENCLAW_NEMO(規則 3 override) + decision = await router.route( + "diagnose service crash", + context={"intent_hint": "diagnose"}, + ) + + assert decision.selected_provider == AIProviderEnum.OPENCLAW_NEMO + mock_fm.select_provider.assert_not_awaited() + + +# ============================================================================= +# Test 5: failover_manager 發生例外 → fail-open,保留原始 OLLAMA +# ============================================================================= + + +@pytest.mark.asyncio +async def test_router_failopen_when_failover_manager_raises(): + """failover_manager.select_provider 拋出例外 → fail-open,decision 仍然成功(使用原始 OLLAMA)""" + mock_fm = MagicMock() + mock_fm.select_provider = AsyncMock(side_effect=RuntimeError("redis timeout")) + + router = _make_router_with_mock_failover(mock_fm) + + with patch.object(router._intent_classifier, "classify") as mock_classify: + from src.services.intent_classifier import IntentResult, 
IntentType + from src.services.complexity_scorer import ComplexityScore + + mock_classify.return_value = IntentResult( + intent=IntentType.ALERT_TRIAGE, + confidence=0.9, + method="keyword", + matched_keywords=["alert"], + detected_resources=[], + reasoning="test", + ) + with patch.object(router._complexity_scorer, "score") as mock_score: + mock_score.return_value = ComplexityScore(score=1, features={}) + + # 不應 raise,應 fail-open + decision = await router.route("test alert message") + + # fail-open → 保留 OLLAMA(原始 initial decision) + assert decision.selected_provider == AIProviderEnum.OLLAMA + # fallback_chain 仍然存在(來自 _build_fallback_chain) + assert len(decision.fallback_chain) > 0 diff --git a/apps/api/tests/test_lifespan_failover_wiring.py b/apps/api/tests/test_lifespan_failover_wiring.py new file mode 100644 index 00000000..1cea5354 --- /dev/null +++ b/apps/api/tests/test_lifespan_failover_wiring.py @@ -0,0 +1,136 @@ +# apps/api/tests/test_lifespan_failover_wiring.py | 2026-04-25 @ Asia/Taipei +# 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan +""" +Ollama Failover Lifespan Wiring 測試 +===================================== +驗收: +1. get_ollama_failover_manager singleton 呼叫了 set_recovery_callback +2. get_ollama_auto_recovery_service singleton 的 start() 被呼叫 +3. shutdown 時 recovery service 的 stop() 被呼叫 +4. 
failover_manager.set_recovery_callback 收到的是 recovery_svc.set_current_primary + +測試分類:unit(mock get_ollama_failover_manager / get_ollama_auto_recovery_service) +不啟動完整 FastAPI app(避免 DB / Redis 依賴),直接測試 wiring 邏輯。 +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + + +# ============================================================================= +# Wiring 邏輯測試 +# ============================================================================= + + +@pytest.mark.asyncio +async def test_lifespan_wires_recovery_callback_and_starts_service(): + """ + lifespan startup 邏輯: + - failover_mgr.set_recovery_callback(recovery_svc.set_current_primary) 被呼叫 + - recovery_svc.start() 被呼叫 + """ + mock_failover_mgr = MagicMock() + mock_failover_mgr.set_recovery_callback = MagicMock() + + mock_recovery_svc = MagicMock() + mock_recovery_svc.set_current_primary = AsyncMock() + mock_recovery_svc.start = AsyncMock() + mock_recovery_svc.stop = AsyncMock() + + with ( + patch( + "src.services.ollama_failover_manager.get_ollama_failover_manager", + return_value=mock_failover_mgr, + ), + patch( + "src.services.ollama_auto_recovery.get_ollama_auto_recovery_service", + return_value=mock_recovery_svc, + ), + ): + # 直接執行 lifespan 邏輯(不啟動完整 FastAPI) + from src.services.ollama_failover_manager import get_ollama_failover_manager + from src.services.ollama_auto_recovery import get_ollama_auto_recovery_service + + _failover_mgr = get_ollama_failover_manager() + _recovery_svc = get_ollama_auto_recovery_service() + + _failover_mgr.set_recovery_callback(_recovery_svc.set_current_primary) + await _recovery_svc.start() + + # 驗收 1: set_recovery_callback 被呼叫一次 + mock_failover_mgr.set_recovery_callback.assert_called_once_with( + mock_recovery_svc.set_current_primary + ) + + # 驗收 2: start() 被呼叫一次 + mock_recovery_svc.start.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_lifespan_stops_recovery_service_on_shutdown(): + """ + lifespan 
shutdown 邏輯: + - recovery_svc.stop() 被呼叫 + """ + mock_recovery_svc = MagicMock() + mock_recovery_svc.stop = AsyncMock() + + with patch( + "src.services.ollama_auto_recovery.get_ollama_auto_recovery_service", + return_value=mock_recovery_svc, + ): + from src.services.ollama_auto_recovery import get_ollama_auto_recovery_service + svc = get_ollama_auto_recovery_service() + await svc.stop() + + mock_recovery_svc.stop.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_set_recovery_callback_wires_correct_method(): + """ + set_recovery_callback 收到的 callable 是 recovery_svc.set_current_primary + 確保 wiring 的物件同一性正確 + """ + from src.services.ollama_failover_manager import ( + OllamaFailoverManager, + reset_ollama_failover_manager, + ) + from src.services.ollama_auto_recovery import ( + OllamaAutoRecoveryService, + reset_ollama_auto_recovery_service, + ) + + try: + mock_monitor = MagicMock() + failover_mgr = OllamaFailoverManager(health_monitor=mock_monitor) + + mock_health_monitor = MagicMock() + mock_failover_mgr_inner = MagicMock() + recovery_svc = OllamaAutoRecoveryService( + health_monitor=mock_health_monitor, + failover_manager=mock_failover_mgr_inner, + ) + + # Wire callback + failover_mgr.set_recovery_callback(recovery_svc.set_current_primary) + + # 驗收:_recovery_callback 是 recovery_svc.set_current_primary + # 注意:bound methods 每次屬性存取都建立新物件,必須用 __func__ + __self__ 比對 + cb = failover_mgr._recovery_callback + assert cb is not None + assert cb.__func__ is recovery_svc.set_current_primary.__func__ + assert cb.__self__ is recovery_svc + + # 驗收:呼叫 callback 等同於呼叫 set_current_primary + # (用 mock 驗證真實 set_current_primary 被呼叫) + with patch.object(recovery_svc, "set_current_primary", new_callable=AsyncMock) as mock_scp: + failover_mgr.set_recovery_callback(recovery_svc.set_current_primary) + await failover_mgr._recovery_callback("gemini") + mock_scp.assert_awaited_once_with("gemini") + finally: + reset_ollama_failover_manager() + reset_ollama_auto_recovery_service() 
diff --git a/apps/api/tests/test_ollama_auto_recovery.py b/apps/api/tests/test_ollama_auto_recovery.py new file mode 100644 index 00000000..c51ab92b --- /dev/null +++ b/apps/api/tests/test_ollama_auto_recovery.py @@ -0,0 +1,580 @@ +# apps/api/tests/test_ollama_auto_recovery.py | 2026-04-25 @ Asia/Taipei +# Created 2026-04-25 P1.1d by Claude Engineer-C +# 2026-04-25 統帥指令 by Claude Engineer-C — 自動切 Gemini + 自動恢復 +""" +OllamaAutoRecoveryService 單元測試 - P1.1d +========================================== +測試覆蓋: +1. 111 OFFLINE → HEALTHY × 3 → 觸發 _switch_back_to_ollama +2. 111 OFFLINE → HEALTHY × 2 → OFFLINE → counter 歸零 +3. 中途 SLOW/DEGRADED → counter 歸零 +4. stop() 優雅取消 task +5. start() 重複呼叫不重複建立 task +6. _switch_back_to_ollama:clear_cache + notify_recovery + Telegram alerter +7. alerter = None 時不 crash +8. clear_cache 失敗時不 crash(best-effort) +9. set_current_primary / is_running / consecutive_healthy 屬性 + +測試分類:unit(mock OllamaHealthMonitor + OllamaFailoverManager) +""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock, call, patch + +import pytest + +from src.services.ollama_health_monitor import HealthReport, HealthStatus +from src.services.ollama_auto_recovery import ( + OllamaAutoRecoveryService, + get_ollama_auto_recovery_service, + reset_ollama_auto_recovery_service, +) + + +# ============================================================================= +# Fixtures / Helpers +# ============================================================================= + +URL_111 = "http://192.168.0.111:11434" + + +@pytest.fixture(autouse=True) +def reset_singleton(): + yield + reset_ollama_auto_recovery_service() + + +def _make_health(status: HealthStatus) -> HealthReport: + return HealthReport(status=status, host=URL_111, latency_ms=500.0) + + +def _make_service( + *, + current_primary: str = "gemini", + stable_count: int = 3, + check_interval: int = 1, # 測試用短 interval + alerter=None, +) -> tuple[OllamaAutoRecoveryService, 
AsyncMock, MagicMock]: + """ + 建立 service + mock monitor + mock failover manager。 + 返回 (service, mock_monitor, mock_failover_manager) + """ + mock_monitor = MagicMock() + mock_monitor.check = AsyncMock() + + mock_failover = MagicMock() + mock_failover.clear_cache = AsyncMock() + mock_failover.notify_recovery = MagicMock() + + mock_settings = MagicMock() + mock_settings.OLLAMA_URL = URL_111 + + svc = OllamaAutoRecoveryService( + health_monitor=mock_monitor, + failover_manager=mock_failover, + telegram_alerter=alerter, + recovery_check_interval_sec=check_interval, + stable_count_required=stable_count, + ) + svc._settings = mock_settings + svc._current_primary = current_primary + return svc, mock_monitor, mock_failover + + +# ============================================================================= +# _check_and_recover():核心防抖邏輯 +# ============================================================================= + + +class TestCheckAndRecover: + """_check_and_recover 單次執行邏輯""" + + @pytest.mark.asyncio + async def test_healthy_increments_counter(self): + """HEALTHY → counter +1""" + svc, mock_monitor, _ = _make_service(current_primary="gemini") + mock_monitor.check.return_value = _make_health(HealthStatus.HEALTHY) + + await svc._check_and_recover() + + assert svc.consecutive_healthy == 1 + + @pytest.mark.asyncio + async def test_non_healthy_resets_counter(self): + """OFFLINE → counter 歸零""" + svc, mock_monitor, _ = _make_service(current_primary="gemini") + svc._consecutive_healthy = 2 + mock_monitor.check.return_value = _make_health(HealthStatus.OFFLINE) + + await svc._check_and_recover() + + assert svc.consecutive_healthy == 0 + + @pytest.mark.asyncio + async def test_slow_resets_counter(self): + """SLOW → counter 歸零(不算穩定)""" + svc, mock_monitor, _ = _make_service(current_primary="gemini") + svc._consecutive_healthy = 1 + mock_monitor.check.return_value = _make_health(HealthStatus.SLOW) + + await svc._check_and_recover() + + assert svc.consecutive_healthy == 0 + + 
@pytest.mark.asyncio + async def test_degraded_resets_counter(self): + """DEGRADED → counter 歸零""" + svc, mock_monitor, _ = _make_service(current_primary="gemini") + svc._consecutive_healthy = 2 + mock_monitor.check.return_value = _make_health(HealthStatus.DEGRADED) + + await svc._check_and_recover() + + assert svc.consecutive_healthy == 0 + + @pytest.mark.asyncio + async def test_three_healthy_triggers_switch_back(self): + """連續 3 次 HEALTHY + current_primary=gemini → 觸發切回""" + svc, mock_monitor, mock_failover = _make_service(current_primary="gemini", stable_count=3) + mock_monitor.check.return_value = _make_health(HealthStatus.HEALTHY) + svc._consecutive_healthy = 2 # 已有 2 次,本次第 3 次 + + with patch.object(svc, "_switch_back_to_ollama", new_callable=AsyncMock) as mock_switch: + await svc._check_and_recover() + + mock_switch.assert_awaited_once() + + @pytest.mark.asyncio + async def test_two_healthy_not_yet_switch(self): + """連續 2 次 HEALTHY(未達 3 次門檻)→ 不觸發切回""" + svc, mock_monitor, _ = _make_service(current_primary="gemini", stable_count=3) + mock_monitor.check.return_value = _make_health(HealthStatus.HEALTHY) + svc._consecutive_healthy = 1 # 已有 1 次,本次第 2 次 + + with patch.object(svc, "_switch_back_to_ollama", new_callable=AsyncMock) as mock_switch: + await svc._check_and_recover() + + mock_switch.assert_not_awaited() + assert svc.consecutive_healthy == 2 + + @pytest.mark.asyncio + async def test_already_ollama_primary_no_switch(self): + """current_primary 已是 ollama → 不觸發切回(避免重複)""" + svc, mock_monitor, _ = _make_service(current_primary="ollama", stable_count=3) + mock_monitor.check.return_value = _make_health(HealthStatus.HEALTHY) + svc._consecutive_healthy = 2 + + with patch.object(svc, "_switch_back_to_ollama", new_callable=AsyncMock) as mock_switch: + await svc._check_and_recover() + + mock_switch.assert_not_awaited() + + @pytest.mark.asyncio + async def test_check_exception_resets_counter(self): + """health check 拋異常 → counter 歸零,不 crash""" + svc, mock_monitor, _ 
= _make_service(current_primary="gemini") + mock_monitor.check.side_effect = RuntimeError("network error") + svc._consecutive_healthy = 2 + + # 不應 raise + await svc._check_and_recover() + + assert svc.consecutive_healthy == 0 + + +# ============================================================================= +# 防抖場景:OFFLINE → HEALTHY × 2 → OFFLINE → counter 歸零 +# ============================================================================= + + +class TestDebounce: + """防抖機制:中途斷線 → counter 歸零""" + + @pytest.mark.asyncio + async def test_counter_resets_on_intermittent_offline(self): + """ + 模擬:HEALTHY × 2 → OFFLINE → HEALTHY → counter 應從 1 重新開始 + """ + svc, mock_monitor, _ = _make_service(current_primary="gemini", stable_count=3) + statuses = [ + HealthStatus.HEALTHY, + HealthStatus.HEALTHY, + HealthStatus.OFFLINE, # 中途斷線 → counter 歸零 + HealthStatus.HEALTHY, # 重新開始 + ] + mock_monitor.check.side_effect = [_make_health(s) for s in statuses] + + with patch.object(svc, "_switch_back_to_ollama", new_callable=AsyncMock) as mock_switch: + for _ in range(4): + await svc._check_and_recover() + + # 總計 3 次 HEALTHY(第 1、2、4 次),但第 3 次斷線 → counter 歸零 + # 第 4 次後 counter=1,未達門檻 + mock_switch.assert_not_awaited() + assert svc.consecutive_healthy == 1 + + @pytest.mark.asyncio + async def test_full_recovery_flow(self): + """ + 完整場景:OFFLINE → HEALTHY × 3 → 觸發 switch_back + """ + svc, mock_monitor, mock_failover = _make_service(current_primary="gemini", stable_count=3) + mock_monitor.check.side_effect = [ + _make_health(HealthStatus.HEALTHY), + _make_health(HealthStatus.HEALTHY), + _make_health(HealthStatus.HEALTHY), + ] + + for _ in range(3): + await svc._check_and_recover() + + # 觸發後 current_primary 應切回 ollama + assert svc.current_primary == "ollama" + mock_failover.clear_cache.assert_awaited_once() + mock_failover.notify_recovery.assert_called_once_with("ollama_111") + + +# ============================================================================= +# _switch_back_to_ollama() +# 
============================================================================= + + +class TestSwitchBackToOllama: + """_switch_back_to_ollama 切回邏輯""" + + @pytest.mark.asyncio + async def test_sets_current_primary_to_ollama(self): + svc, _, _ = _make_service(current_primary="gemini") + await svc._switch_back_to_ollama() + assert svc.current_primary == "ollama" + + @pytest.mark.asyncio + async def test_calls_clear_cache(self): + svc, _, mock_failover = _make_service(current_primary="gemini") + await svc._switch_back_to_ollama() + mock_failover.clear_cache.assert_awaited_once() + + @pytest.mark.asyncio + async def test_calls_notify_recovery(self): + svc, _, mock_failover = _make_service(current_primary="gemini") + await svc._switch_back_to_ollama() + mock_failover.notify_recovery.assert_called_once_with("ollama_111") + + @pytest.mark.asyncio + async def test_calls_telegram_alerter_when_set(self): + """alerter 已設定 → 呼叫 alert_recovery""" + mock_alerter = AsyncMock() + mock_alerter.alert_recovery = AsyncMock() + svc, _, _ = _make_service(current_primary="gemini", alerter=mock_alerter) + svc._consecutive_healthy = 3 + + await svc._switch_back_to_ollama() + + mock_alerter.alert_recovery.assert_awaited_once() + payload = mock_alerter.alert_recovery.call_args[0][0] + assert payload["from"] == "gemini" + assert payload["to"] == "ollama_111" + assert payload["stable_count"] == 3 + + @pytest.mark.asyncio + async def test_no_alerter_does_not_crash(self): + """alerter = None → 不 crash""" + svc, _, _ = _make_service(current_primary="gemini", alerter=None) + # 不應 raise + await svc._switch_back_to_ollama() + assert svc.current_primary == "ollama" + + @pytest.mark.asyncio + async def test_clear_cache_failure_does_not_crash(self): + """clear_cache 失敗 → 靜默繼續,不 crash""" + svc, _, mock_failover = _make_service(current_primary="gemini") + mock_failover.clear_cache.side_effect = RuntimeError("Redis down") + + # 不應 raise + await svc._switch_back_to_ollama() + + # 儘管 clear_cache 
失敗,current_primary 應已更新 + assert svc.current_primary == "ollama" + + @pytest.mark.asyncio + async def test_alerter_failure_does_not_crash(self): + """Telegram alerter 失敗 → 靜默繼續,不 crash""" + mock_alerter = AsyncMock() + mock_alerter.alert_recovery = AsyncMock(side_effect=RuntimeError("TG timeout")) + svc, _, _ = _make_service(current_primary="gemini", alerter=mock_alerter) + + # 不應 raise + await svc._switch_back_to_ollama() + + assert svc.current_primary == "ollama" + + +# ============================================================================= +# start() / stop() 生命週期 +# ============================================================================= + + +class TestLifecycle: + """start() / stop() 背景任務生命週期""" + + @pytest.mark.asyncio + async def test_start_creates_task(self): + svc, _, _ = _make_service() + try: + await svc.start() + assert svc.is_running is True + finally: + await svc.stop() + + @pytest.mark.asyncio + async def test_stop_cancels_task(self): + svc, _, _ = _make_service() + await svc.start() + assert svc.is_running is True + + await svc.stop() + assert svc.is_running is False + + @pytest.mark.asyncio + async def test_stop_idempotent_when_not_started(self): + """未 start 的情況下呼叫 stop → 不 crash""" + svc, _, _ = _make_service() + await svc.stop() # 不應 raise + + @pytest.mark.asyncio + async def test_start_idempotent_second_call(self): + """重複呼叫 start() → 不重複建立 task""" + svc, _, _ = _make_service() + try: + await svc.start() + task_1 = svc._task + + await svc.start() # 第二次呼叫 + task_2 = svc._task + + assert task_1 is task_2 # 同一個 task + finally: + await svc.stop() + + @pytest.mark.asyncio + async def test_monitor_loop_continues_after_exception(self): + """ + _monitor_loop 內部拋異常 → 繼續監控(不 break) + + 模擬:第一次 check 拋異常,第二次正常。 + """ + svc, mock_monitor, _ = _make_service( + current_primary="gemini", + check_interval=0, # 0s interval,測試快速 + stable_count=3, + ) + call_count = [0] + + async def _check_side_effect(host): + call_count[0] += 1 + if call_count[0] == 
1: + raise RuntimeError("transient error") + return _make_health(HealthStatus.HEALTHY) + + mock_monitor.check.side_effect = _check_side_effect + + # 讓 loop 跑幾次 + await svc.start() + await asyncio.sleep(0.05) + await svc.stop() + + # 至少被呼叫 2 次(第一次異常,第二次正常) + assert call_count[0] >= 2 + + +# ============================================================================= +# set_current_primary() / 屬性 +# ============================================================================= + + +class TestStateManagement: + """set_current_primary + 屬性""" + + @pytest.mark.asyncio + async def test_set_current_primary_updates_state(self): + # H5+H6 修復:set_current_primary 改為 async(需 await) + svc, _, _ = _make_service(current_primary="ollama") + with patch("src.core.redis_client.get_redis", side_effect=RuntimeError("no redis")): + await svc.set_current_primary("gemini") + assert svc.current_primary == "gemini" + + @pytest.mark.asyncio + async def test_set_current_primary_non_ollama_resets_counter(self): + """切換到非 Ollama 時,counter 歸零,開始等待恢復""" + svc, _, _ = _make_service(current_primary="ollama") + svc._consecutive_healthy = 5 + + with patch("src.core.redis_client.get_redis", side_effect=RuntimeError("no redis")): + await svc.set_current_primary("gemini") + + assert svc.consecutive_healthy == 0 + + @pytest.mark.asyncio + async def test_set_current_primary_to_ollama_no_counter_reset(self): + """切換到 ollama(正常路由)→ counter 不重置(直接標記)""" + svc, _, _ = _make_service(current_primary="gemini") + svc._consecutive_healthy = 3 + + with patch("src.core.redis_client.get_redis", side_effect=RuntimeError("no redis")): + await svc.set_current_primary("ollama") + + # set_current_primary 只在切到非 ollama 時 reset counter + assert svc.current_primary == "ollama" + + def test_is_running_false_before_start(self): + svc, _, _ = _make_service() + assert svc.is_running is False + + +# ============================================================================= +# H5+H6: Redis 持久化 + Bootstrap + 立刻 check +# 
2026-04-25 critic-fix Part2 by Claude Engineer-C2 +# ============================================================================= + + +class TestRedisPersistence: + """H5+H6 修復驗證:set_current_primary 持久化 + start() bootstrap""" + + @pytest.mark.asyncio + async def test_set_current_primary_persists_to_redis(self): + """set_current_primary("gemini") → 寫 Redis ollama:current_primary""" + svc, _, _ = _make_service(current_primary="ollama") + mock_redis = AsyncMock() + mock_redis.set = AsyncMock() + + with patch("src.core.redis_client.get_redis", return_value=mock_redis): + await svc.set_current_primary("gemini") + + mock_redis.set.assert_awaited_once_with("ollama:current_primary", "gemini") + + @pytest.mark.asyncio + async def test_set_current_primary_same_value_no_redis_write(self): + """set_current_primary("gemini") 但 current_primary 已是 gemini → 不重複寫 Redis""" + svc, _, _ = _make_service(current_primary="gemini") + mock_redis = AsyncMock() + mock_redis.set = AsyncMock() + + with patch("src.core.redis_client.get_redis", return_value=mock_redis): + await svc.set_current_primary("gemini") + + # 相同值不觸發 persist + mock_redis.set.assert_not_awaited() + + @pytest.mark.asyncio + async def test_load_primary_from_redis(self): + """_load_primary() 從 Redis 讀取,返回正確值""" + svc, _, _ = _make_service() + mock_redis = AsyncMock() + mock_redis.get = AsyncMock(return_value=b"gemini") + + with patch("src.core.redis_client.get_redis", return_value=mock_redis): + result = await svc._load_primary() + + assert result == "gemini" + + @pytest.mark.asyncio + async def test_load_primary_redis_empty_returns_ollama(self): + """Redis 無值 → 預設返回 'ollama'""" + svc, _, _ = _make_service() + mock_redis = AsyncMock() + mock_redis.get = AsyncMock(return_value=None) + + with patch("src.core.redis_client.get_redis", return_value=mock_redis): + result = await svc._load_primary() + + assert result == "ollama" + + @pytest.mark.asyncio + async def test_load_primary_redis_error_returns_ollama(self): + """Redis 掛掉 → 
預設返回 "ollama"(fail-safe)""" + svc, _, _ = _make_service() + + with patch("src.core.redis_client.get_redis", side_effect=RuntimeError("Redis down")): + result = await svc._load_primary() + + assert result == "ollama" + + @pytest.mark.asyncio + async def test_start_loads_primary_from_redis(self): + """start() 從 Redis bootstrap current_primary""" + svc, mock_monitor, _ = _make_service(current_primary="ollama") + mock_monitor.check = AsyncMock(return_value=_make_health(HealthStatus.HEALTHY)) + mock_redis = AsyncMock() + mock_redis.get = AsyncMock(return_value=b"gemini") + + with patch("src.core.redis_client.get_redis", return_value=mock_redis): + try: + await svc.start() + # bootstrap 後 current_primary 應為 Redis 讀到的值 + assert svc.current_primary == "gemini" + finally: + await svc.stop() + + @pytest.mark.asyncio + async def test_start_immediate_check_when_primary_not_ollama(self): + """start() 時 primary=gemini → 立刻執行一次 _check_and_recover(不等 30s)""" + svc, mock_monitor, _ = _make_service(current_primary="gemini") + mock_monitor.check = AsyncMock(return_value=_make_health(HealthStatus.HEALTHY)) + mock_redis = AsyncMock() + mock_redis.get = AsyncMock(return_value=b"gemini") + + check_called = [False] + original_check = svc._check_and_recover + + async def _spy_check(): + check_called[0] = True + await original_check() + + with patch.object(svc, "_check_and_recover", side_effect=_spy_check), \ + patch("src.core.redis_client.get_redis", return_value=mock_redis): + try: + await svc.start() + assert check_called[0] is True + finally: + await svc.stop() + + @pytest.mark.asyncio + async def test_start_no_immediate_check_when_primary_is_ollama(self): + """start() 時 primary=ollama(正常狀態)→ 不立刻執行 check""" + svc, mock_monitor, _ = _make_service(current_primary="gemini") + mock_redis = AsyncMock() + mock_redis.get = AsyncMock(return_value=b"ollama") # Redis 說 primary=ollama + + check_called = [False] + + async def _spy_check(): + check_called[0] = True + + with patch.object(svc, 
"_check_and_recover", side_effect=_spy_check), \ + patch("src.core.redis_client.get_redis", return_value=mock_redis): + try: + await svc.start() + # primary=ollama → 不立刻 check + assert check_called[0] is False + finally: + await svc.stop() + + +# ============================================================================= +# Singleton +# ============================================================================= + + +def test_singleton_returns_same_instance(): + s1 = get_ollama_auto_recovery_service() + s2 = get_ollama_auto_recovery_service() + assert s1 is s2 + + +def test_reset_singleton_gives_new_instance(): + s1 = get_ollama_auto_recovery_service() + reset_ollama_auto_recovery_service() + s2 = get_ollama_auto_recovery_service() + assert s1 is not s2 diff --git a/apps/api/tests/test_ollama_failover_manager.py b/apps/api/tests/test_ollama_failover_manager.py new file mode 100644 index 00000000..7e9d545f --- /dev/null +++ b/apps/api/tests/test_ollama_failover_manager.py @@ -0,0 +1,707 @@ +# apps/api/tests/test_ollama_failover_manager.py | 2026-04-25 @ Asia/Taipei +# Created 2026-04-25 P1.1c by Claude Engineer-C +# 2026-04-25 統帥指令 by Claude Engineer-C — 自動切 Gemini + 自動恢復(路由矩陣更新) +""" +OllamaFailoverManager 單元測試 - P1.1c v2.0 +============================================= +測試覆蓋(新路由矩陣:Gemini 優先,188 最後備援): +- 111 HEALTHY → primary=ollama(111),fallback=[Gemini, 188, Nemotron] +- 111 SLOW → primary=Gemini,fallback 包含 111 + 188 +- 111 DEGRADED → primary=Gemini,fallback 包含 188 + nemotron + claude +- 111 OFFLINE → primary=Gemini,fallback 包含 188 + nemotron + claude +- 111 + 188 都 OFFLINE → primary=Gemini,fallback 包含 nemotron + claude +- OLLAMA_FALLBACK_URL 未設定時的單節點行為 +- 並行 gather 邏輯(asyncio.gather mock) +- clear_cache() / notify_recovery() 方法 + +測試分類:unit(mock OllamaHealthMonitor,無 DB 依賴) +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from src.services.ollama_health_monitor import HealthReport, 
HealthStatus +from src.services.ollama_failover_manager import ( + OllamaFailoverManager, + OllamaRoutingResult, + get_ollama_failover_manager, + reset_ollama_failover_manager, +) + + +# ============================================================================= +# Fixtures +# ============================================================================= + +URL_111 = "http://192.168.0.111:11434" +URL_188 = "http://192.168.0.188:11434" + + +@pytest.fixture(autouse=True) +def reset_singleton(): + yield + reset_ollama_failover_manager() + + +def _make_health(status: HealthStatus, url: str = URL_111) -> HealthReport: + return HealthReport(status=status, host=url, latency_ms=500.0) + + +def _make_manager(url_111: str = URL_111, url_188: str = URL_188) -> OllamaFailoverManager: + """建立 manager,settings mock 為指定 URL""" + mock_settings = MagicMock() + mock_settings.OLLAMA_URL = url_111 + mock_settings.OLLAMA_FALLBACK_URL = url_188 + mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct" + + mock_monitor = MagicMock() + manager = OllamaFailoverManager(health_monitor=mock_monitor) + manager._settings = mock_settings + return manager + + +# ============================================================================= +# _decide_route 決策矩陣 +# ============================================================================= + + +class TestDecideRoute: + """_decide_route 路由邏輯純函數測試""" + + def _setup(self, url_188: str = URL_188) -> OllamaFailoverManager: + return _make_manager(url_188=url_188) + + # ------------------------------------------------------------------ + # 111 HEALTHY + # ------------------------------------------------------------------ + + def test_111_healthy_primary_is_ollama(self): + manager = self._setup() + h111 = _make_health(HealthStatus.HEALTHY, URL_111) + h188 = _make_health(HealthStatus.HEALTHY, URL_188) + + result = manager._decide_route(h111, h188, URL_111, URL_188) + + assert result.primary.provider_name == "ollama" + assert result.primary.url == 
URL_111 + + def test_111_healthy_fallback_includes_188(self): + manager = self._setup() + h111 = _make_health(HealthStatus.HEALTHY, URL_111) + h188 = _make_health(HealthStatus.HEALTHY, URL_188) + + result = manager._decide_route(h111, h188, URL_111, URL_188) + + provider_names = [e.provider_name for e in result.fallback_chain] + assert "ollama_188" in provider_names + + def test_111_healthy_fallback_includes_nemotron_gemini(self): + manager = self._setup() + h111 = _make_health(HealthStatus.HEALTHY, URL_111) + h188 = _make_health(HealthStatus.HEALTHY, URL_188) + + result = manager._decide_route(h111, h188, URL_111, URL_188) + + provider_names = [e.provider_name for e in result.fallback_chain] + assert "nemotron" in provider_names + assert "gemini" in provider_names + + def test_111_healthy_fallback_order_gemini_first(self): + """新矩陣:Gemini 應排在 188/nemotron 之前(快速雲端優先)""" + manager = self._setup() + h111 = _make_health(HealthStatus.HEALTHY, URL_111) + h188 = _make_health(HealthStatus.HEALTHY, URL_188) + + result = manager._decide_route(h111, h188, URL_111, URL_188) + + assert result.fallback_chain[0].provider_name == "gemini" + + # ------------------------------------------------------------------ + # 111 SLOW + # ------------------------------------------------------------------ + + def test_111_slow_primary_is_gemini(self): + """新矩陣:111 SLOW → primary=Gemini(111 eval ~0.09 token/s, ~111s,Gemini 更快)""" + manager = self._setup() + h111 = _make_health(HealthStatus.SLOW, URL_111) + h188 = _make_health(HealthStatus.HEALTHY, URL_188) + + result = manager._decide_route(h111, h188, URL_111, URL_188) + + assert result.primary.provider_name == "gemini" + + def test_111_slow_fallback_includes_111_and_188(self): + """SLOW 時 111 + 188 仍在 fallback(Gemini 額度耗盡時的降級鏈)""" + manager = self._setup() + h111 = _make_health(HealthStatus.SLOW, URL_111) + h188 = _make_health(HealthStatus.HEALTHY, URL_188) + + result = manager._decide_route(h111, h188, URL_111, URL_188) + + provider_names = 
[e.provider_name for e in result.fallback_chain] + assert "ollama" in provider_names + assert "ollama_188" in provider_names + + def test_111_slow_no_188_primary_is_gemini(self): + """111 SLOW + 188 未設定 → primary=Gemini(新矩陣,不強撐 111)""" + manager = _make_manager(url_188="") # 188 未設定 + h111 = _make_health(HealthStatus.SLOW, URL_111) + + result = manager._decide_route(h111, None, URL_111, "") + + assert result.primary.provider_name == "gemini" + + # ------------------------------------------------------------------ + # 111 DEGRADED + # ------------------------------------------------------------------ + + def test_111_degraded_primary_is_gemini(self): + """新矩陣:111 DEGRADED → primary=Gemini""" + manager = self._setup() + h111 = _make_health(HealthStatus.DEGRADED, URL_111) + h188 = _make_health(HealthStatus.HEALTHY, URL_188) + + result = manager._decide_route(h111, h188, URL_111, URL_188) + + assert result.primary.provider_name == "gemini" + + def test_111_degraded_fallback_no_111(self): + """DEGRADED 時 111 不在 fallback(太差了)""" + manager = self._setup() + h111 = _make_health(HealthStatus.DEGRADED, URL_111) + h188 = _make_health(HealthStatus.HEALTHY, URL_188) + + result = manager._decide_route(h111, h188, URL_111, URL_188) + + provider_names = [e.provider_name for e in result.fallback_chain] + assert "ollama" not in provider_names + + def test_111_degraded_fallback_includes_188_nemotron_claude(self): + """新矩陣:DEGRADED fallback = [188, nemotron, claude]""" + manager = self._setup() + h111 = _make_health(HealthStatus.DEGRADED, URL_111) + h188 = _make_health(HealthStatus.HEALTHY, URL_188) + + result = manager._decide_route(h111, h188, URL_111, URL_188) + + provider_names = [e.provider_name for e in result.fallback_chain] + assert "ollama_188" in provider_names + assert "nemotron" in provider_names + assert "claude" in provider_names + + # ------------------------------------------------------------------ + # 111 OFFLINE + # 
------------------------------------------------------------------ + + def test_111_offline_primary_is_gemini(self): + """新矩陣:111 OFFLINE → primary=Gemini(188 降為 fallback 備援)""" + manager = self._setup() + h111 = _make_health(HealthStatus.OFFLINE, URL_111) + h188 = _make_health(HealthStatus.HEALTHY, URL_188) + + result = manager._decide_route(h111, h188, URL_111, URL_188) + + assert result.primary.provider_name == "gemini" + + # ------------------------------------------------------------------ + # 雙節點都 OFFLINE + # ------------------------------------------------------------------ + + def test_both_offline_primary_is_gemini(self): + """新矩陣:111 + 188 都 OFFLINE → Gemini 接手(最快雲端)""" + manager = self._setup() + h111 = _make_health(HealthStatus.OFFLINE, URL_111) + h188 = _make_health(HealthStatus.OFFLINE, URL_188) + + result = manager._decide_route(h111, h188, URL_111, URL_188) + + assert result.primary.provider_name == "gemini" + + def test_both_offline_fallback_includes_nemotron_claude(self): + """雙 OFFLINE 時,fallback=[Nemotron, Claude](無可用 Ollama)""" + manager = self._setup() + h111 = _make_health(HealthStatus.OFFLINE, URL_111) + h188 = _make_health(HealthStatus.OFFLINE, URL_188) + + result = manager._decide_route(h111, h188, URL_111, URL_188) + + provider_names = [e.provider_name for e in result.fallback_chain] + assert "nemotron" in provider_names + assert "claude" in provider_names + + def test_111_offline_no_188_primary_is_gemini(self): + """新矩陣:111 OFFLINE + 188 未設定 → Gemini(不是 Nemotron)""" + manager = _make_manager(url_188="") + h111 = _make_health(HealthStatus.OFFLINE, URL_111) + + result = manager._decide_route(h111, None, URL_111, "") + + assert result.primary.provider_name == "gemini" + + # ------------------------------------------------------------------ + # routing_reason 記錄 + # ------------------------------------------------------------------ + + def test_routing_reason_contains_status(self): + """routing_reason 應包含 111 的狀態資訊""" + manager = 
self._setup() + h111 = _make_health(HealthStatus.OFFLINE, URL_111) + h188 = _make_health(HealthStatus.HEALTHY, URL_188) + + result = manager._decide_route(h111, h188, URL_111, URL_188) + + assert "offline" in result.routing_reason.lower() or "111" in result.routing_reason + + +# ============================================================================= +# select_provider():並行 gather +# ============================================================================= + + +class TestSelectProvider: + """select_provider() 並行邏輯""" + + @pytest.mark.asyncio + async def test_select_provider_calls_gather(self): + """有 url_188 時應並行 gather 兩個 check""" + mock_monitor = AsyncMock() + mock_monitor.check = AsyncMock( + side_effect=[ + _make_health(HealthStatus.HEALTHY, URL_111), + _make_health(HealthStatus.HEALTHY, URL_188), + ] + ) + + mock_settings = MagicMock() + mock_settings.OLLAMA_URL = URL_111 + mock_settings.OLLAMA_FALLBACK_URL = URL_188 + mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct" + + manager = OllamaFailoverManager(health_monitor=mock_monitor) + manager._settings = mock_settings + + with patch.object(manager, "_write_failover_audit", return_value=None): + result = await manager.select_provider() + + # 兩個 host 都被 check + assert mock_monitor.check.call_count == 2 + called_hosts = {call.args[0] for call in mock_monitor.check.call_args_list} + assert URL_111 in called_hosts + assert URL_188 in called_hosts + + @pytest.mark.asyncio + async def test_select_provider_single_node_no_188(self): + """OLLAMA_FALLBACK_URL 空字串 → 只 check 111""" + mock_monitor = AsyncMock() + mock_monitor.check = AsyncMock(return_value=_make_health(HealthStatus.HEALTHY, URL_111)) + + mock_settings = MagicMock() + mock_settings.OLLAMA_URL = URL_111 + mock_settings.OLLAMA_FALLBACK_URL = "" + mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct" + + manager = OllamaFailoverManager(health_monitor=mock_monitor) + manager._settings = mock_settings + + with patch.object(manager, 
"_write_failover_audit", return_value=None): + result = await manager.select_provider() + + assert mock_monitor.check.call_count == 1 + assert result.primary.provider_name == "ollama" + + @pytest.mark.asyncio + async def test_select_provider_returns_routing_result(self): + """select_provider 返回 OllamaRoutingResult 類型(新矩陣:111 OFFLINE → Gemini)""" + mock_monitor = AsyncMock() + mock_monitor.check = AsyncMock( + side_effect=[ + _make_health(HealthStatus.OFFLINE, URL_111), + _make_health(HealthStatus.HEALTHY, URL_188), + ] + ) + + mock_settings = MagicMock() + mock_settings.OLLAMA_URL = URL_111 + mock_settings.OLLAMA_FALLBACK_URL = URL_188 + mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct" + + manager = OllamaFailoverManager(health_monitor=mock_monitor) + manager._settings = mock_settings + + with patch.object(manager, "_write_failover_audit", return_value=None): + result = await manager.select_provider() + + assert isinstance(result, OllamaRoutingResult) + # 新矩陣:111 OFFLINE → primary=Gemini(188 降為 fallback) + assert result.primary.provider_name == "gemini" + + @pytest.mark.asyncio + async def test_audit_not_written_when_111_healthy(self): + """111 正常時不觸發 failover audit""" + mock_monitor = AsyncMock() + mock_monitor.check = AsyncMock( + side_effect=[ + _make_health(HealthStatus.HEALTHY, URL_111), + _make_health(HealthStatus.HEALTHY, URL_188), + ] + ) + + mock_settings = MagicMock() + mock_settings.OLLAMA_URL = URL_111 + mock_settings.OLLAMA_FALLBACK_URL = URL_188 + mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct" + + manager = OllamaFailoverManager(health_monitor=mock_monitor) + manager._settings = mock_settings + + audit_called = [False] + original_write = manager._write_failover_audit + + async def _spy_audit(result): + # _write_failover_audit 在 111 HEALTHY 時 early return,不寫 DB + # 追蹤呼叫是否有 side effect(DB 寫入) + audit_called[0] = result.primary.provider_name != "ollama" + + with patch.object(manager, "_write_failover_audit", 
side_effect=_spy_audit): + await manager.select_provider() + + # 111 HEALTHY,不應有 failover 事件 + assert audit_called[0] is False + + +# ============================================================================= +# clear_cache() / notify_recovery() +# ============================================================================= + + +class TestRecoveryAPI: + """clear_cache() / notify_recovery() 方法""" + + @pytest.mark.asyncio + async def test_clear_cache_calls_redis_delete(self): + """clear_cache() 呼叫 redis.delete 清除 health monitor 快取""" + manager = _make_manager() + mock_redis = AsyncMock() + mock_redis.delete = AsyncMock() + + with patch("src.services.ollama_failover_manager.OllamaFailoverManager.clear_cache") as mock_clear: + mock_clear.return_value = None + await manager.clear_cache() + mock_clear.assert_called_once() + + @pytest.mark.asyncio + async def test_clear_cache_fails_gracefully(self): + """Redis import 失敗時,clear_cache() 內部 try/except 攔截,靜默不 crash""" + manager = _make_manager() + + # 模擬 get_redis 拋 ImportError(Redis 不可用) + # clear_cache 有 try/except Exception,應靜默吸收 + with patch( + "src.services.ollama_failover_manager.get_redis", + side_effect=ImportError("no redis"), + create=True, + ): + # 不應 raise + await manager.clear_cache() + + def test_notify_recovery_does_not_raise(self): + """notify_recovery() 只寫 structlog,不應 raise""" + manager = _make_manager() + # 不應 raise + manager.notify_recovery("ollama_111") + + +# ============================================================================= +# OllamaRoutingResult +# ============================================================================= + + +class TestOllamaRoutingResult: + """OllamaRoutingResult 輔助方法""" + + def test_all_endpoints_in_order(self): + from src.services.ollama_failover_manager import OllamaEndpoint + primary = OllamaEndpoint(url=URL_111, provider_name="ollama", model="m1") + fb1 = OllamaEndpoint(url=URL_188, provider_name="ollama_188", model="m2") + fb2 = OllamaEndpoint(url="", 
provider_name="nemotron", model="m3") + + result = OllamaRoutingResult( + primary=primary, + fallback_chain=[fb1, fb2], + routing_reason="test", + health_111=_make_health(HealthStatus.HEALTHY), + ) + + ordered = result.all_endpoints_in_order() + assert ordered[0].provider_name == "ollama" + assert ordered[1].provider_name == "ollama_188" + assert ordered[2].provider_name == "nemotron" + + def test_to_dict_structure(self): + from src.services.ollama_failover_manager import OllamaEndpoint + primary = OllamaEndpoint(url=URL_111, provider_name="ollama", model="qwen") + result = OllamaRoutingResult( + primary=primary, + fallback_chain=[], + routing_reason="111 HEALTHY", + health_111=_make_health(HealthStatus.HEALTHY), + ) + d = result.to_dict() + assert d["primary"]["provider"] == "ollama" + assert d["routing_reason"] == "111 HEALTHY" + assert isinstance(d["fallback_chain"], list) + + +# ============================================================================= +# Singleton +# ============================================================================= + + +def test_singleton_returns_same_instance(): + m1 = get_ollama_failover_manager() + m2 = get_ollama_failover_manager() + assert m1 is m2 + + +def test_reset_singleton_gives_new_instance(): + m1 = get_ollama_failover_manager() + reset_ollama_failover_manager() + m2 = get_ollama_failover_manager() + assert m1 is not m2 + + +# ============================================================================= +# B1: _write_failover_audit 改用 structlog(不再寫 DB) +# 2026-04-25 critic-fix Part2 by Claude Engineer-C2 +# ============================================================================= + + +class TestWriteFailoverAudit: + """B1 修復驗證:_write_failover_audit 使用 structlog,不依賴 AuditLog model""" + + @pytest.mark.asyncio + async def test_audit_uses_structlog_not_db(self): + """_write_failover_audit 應呼叫 structlog,不呼叫 DB""" + import structlog + manager = _make_manager() + from src.services.ollama_failover_manager import 
OllamaEndpoint, OllamaRoutingResult + + result = OllamaRoutingResult( + primary=OllamaEndpoint(url="", provider_name="gemini", model="gemini-1.5-flash"), + fallback_chain=[], + routing_reason="111 OFFLINE → 切 Gemini", + health_111=_make_health(HealthStatus.OFFLINE), + ) + + # 只要不 raise 就是成功(DB path 已移除,structlog path 無 DB 依賴) + await manager._write_failover_audit(result) + + @pytest.mark.asyncio + async def test_audit_skipped_when_111_healthy(self): + """111 HEALTHY 時 early return,不記錄 failover""" + manager = _make_manager() + from src.services.ollama_failover_manager import OllamaEndpoint, OllamaRoutingResult + + result = OllamaRoutingResult( + primary=OllamaEndpoint(url=URL_111, provider_name="ollama", model="qwen"), + fallback_chain=[], + routing_reason="111 HEALTHY → 主 111", + health_111=_make_health(HealthStatus.HEALTHY), + ) + + # primary=ollama → early return,不執行任何 DB/log + await manager._write_failover_audit(result) # 不應 raise + + +# ============================================================================= +# B2: AIProviderEnum.OLLAMA_188 存在 +# 2026-04-25 critic-fix Part2 by Claude Engineer-C2 +# ============================================================================= + + +class TestAIProviderEnumOllama188: + """B2 修復驗證:AIProviderEnum.OLLAMA_188 存在且 PROVIDER_LATENCY_BUDGET 有對應值""" + + def test_ollama_188_enum_exists(self): + from src.services.ai_router import AIProviderEnum + assert AIProviderEnum.OLLAMA_188.value == "ollama_188" + + def test_ollama_188_in_latency_budget(self): + from src.services.ai_router import AIProviderEnum, PROVIDER_LATENCY_BUDGET + assert AIProviderEnum.OLLAMA_188 in PROVIDER_LATENCY_BUDGET + assert PROVIDER_LATENCY_BUDGET[AIProviderEnum.OLLAMA_188] == 120000 + + +# ============================================================================= +# H4: asyncio.gather return_exceptions=True +# 2026-04-25 critic-fix Part2 by Claude Engineer-C2 +# ============================================================================= + + 
class TestGatherReturnExceptions:
    """H4 regression tests: one failing health check must not abort routing.

    ``select_provider()`` is run against a health monitor whose ``check``
    raises for exactly one host. Each test pins only the routing outcome,
    i.e. ``primary.provider_name`` of the returned result.
    """

    @staticmethod
    async def _select_with_failing_host(failing_host, error):
        """Run ``select_provider()`` while ``check(failing_host)`` raises ``error``.

        The mocked ``check`` dispatches on the host it is called with instead
        of consuming an ordered ``side_effect`` list, so the scenario no longer
        depends on which host ``select_provider()`` happens to check first.
        Every other host reports HEALTHY.

        Args:
            failing_host: Host URL whose health check must raise.
            error: Exception instance raised for that host.

        Returns:
            Whatever ``manager.select_provider()`` returns.
        """
        # NOTE(review): assumes check() receives the host URL as its first
        # positional argument, which is how the other tests in this file
        # inspect it (``call.args[0]``) -- confirm if the signature changes.
        def _check(host, *args, **kwargs):
            if host == failing_host:
                raise error
            return _make_health(HealthStatus.HEALTHY, host)

        mock_monitor = AsyncMock()
        mock_monitor.check = AsyncMock(side_effect=_check)

        # Two-node configuration: 111 as primary URL, 188 as fallback URL.
        mock_settings = MagicMock()
        mock_settings.OLLAMA_URL = URL_111
        mock_settings.OLLAMA_FALLBACK_URL = URL_188
        mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct"
        mock_settings.GEMINI_DAILY_QUOTA = 1000

        manager = OllamaFailoverManager(health_monitor=mock_monitor)
        manager._settings = mock_settings

        # Stub the audit writer and force the Gemini quota check to True so
        # the test exercises the routing decision only.
        with patch.object(manager, "_write_failover_audit", return_value=None):
            with patch.object(manager, "_check_gemini_quota", return_value=True):
                return await manager.select_provider()

    @pytest.mark.asyncio
    async def test_gather_exception_in_111_treated_as_offline(self):
        """An exception from the 111 check still yields a result; primary is Gemini."""
        result = await self._select_with_failing_host(
            URL_111, RuntimeError("111 network error")
        )

        # 111 exception -> handled as OFFLINE -> primary=gemini (new matrix)
        assert result.primary.provider_name == "gemini"

    @pytest.mark.asyncio
    async def test_gather_exception_in_188_treated_as_offline(self):
        """An exception from the 188 check does not disturb the primary route."""
        result = await self._select_with_failing_host(
            URL_188, RuntimeError("188 network error")
        )

        # 111 HEALTHY -> primary=ollama (the 188 exception must not affect it)
        assert result.primary.provider_name == "ollama"


#
============================================================================= +# H7: Gemini 帳單熔斷 +# 2026-04-25 critic-fix Part2 by Claude Engineer-C2 +# ============================================================================= + + +class TestGeminiQuota: + """H7 修復驗證:Gemini 每日配額熔斷""" + + @pytest.mark.asyncio + async def test_gemini_quota_under_limit(self): + """count=500 < quota=1000 → 返回 True(允許走 Gemini)""" + manager = _make_manager() + manager._settings.GEMINI_DAILY_QUOTA = 1000 + + mock_redis = AsyncMock() + mock_redis.get = AsyncMock(return_value=b"500") + mock_redis.incr = AsyncMock(return_value=501) + mock_redis.expire = AsyncMock() + + # lazy import patch:_check_gemini_quota 內用 `from src.core.redis_client import get_redis` + with patch("src.core.redis_client.get_redis", return_value=mock_redis): + ok = await manager._check_gemini_quota() + + assert ok is True + mock_redis.incr.assert_awaited_once() + mock_redis.expire.assert_awaited_once() + + @pytest.mark.asyncio + async def test_gemini_quota_exactly_at_limit(self): + """count=1000 >= quota=1000 → 返回 False(熔斷,不再呼叫 Gemini)""" + manager = _make_manager() + manager._settings.GEMINI_DAILY_QUOTA = 1000 + + mock_redis = AsyncMock() + mock_redis.get = AsyncMock(return_value=b"1000") + mock_redis.incr = AsyncMock() + mock_redis.expire = AsyncMock() + + with patch("src.core.redis_client.get_redis", return_value=mock_redis): + ok = await manager._check_gemini_quota() + + assert ok is False + # 超過配額不應再 incr + mock_redis.incr.assert_not_awaited() + + @pytest.mark.asyncio + async def test_gemini_quota_redis_unavailable_fail_open(self): + """Redis 掛掉 → 返回 True(fail-open,仍允許走 Gemini)""" + manager = _make_manager() + + with patch( + "src.core.redis_client.get_redis", + side_effect=RuntimeError("Redis unavailable"), + ): + ok = await manager._check_gemini_quota() + + assert ok is True + + @pytest.mark.asyncio + async def test_select_provider_quota_exceeded_uses_188(self): + """select_provider:Gemini quota 超過 → primary 
改為 OLLAMA_188""" + mock_monitor = AsyncMock() + mock_monitor.check = AsyncMock( + side_effect=[ + _make_health(HealthStatus.OFFLINE, URL_111), + _make_health(HealthStatus.HEALTHY, URL_188), + ] + ) + + mock_settings = MagicMock() + mock_settings.OLLAMA_URL = URL_111 + mock_settings.OLLAMA_FALLBACK_URL = URL_188 + mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct" + mock_settings.GEMINI_DAILY_QUOTA = 1000 + + manager = OllamaFailoverManager(health_monitor=mock_monitor) + manager._settings = mock_settings + + with patch.object(manager, "_write_failover_audit", return_value=None), \ + patch.object(manager, "_check_gemini_quota", return_value=False): + result = await manager.select_provider() + + # quota 超過 → 不走 Gemini,改走 188 + assert result.primary.provider_name == "ollama_188" + + @pytest.mark.asyncio + async def test_select_provider_quota_exceeded_no_188_uses_nemotron(self): + """select_provider:Gemini quota 超過 + 188 不可用 → primary=Nemotron""" + mock_monitor = AsyncMock() + mock_monitor.check = AsyncMock(return_value=_make_health(HealthStatus.OFFLINE, URL_111)) + + mock_settings = MagicMock() + mock_settings.OLLAMA_URL = URL_111 + mock_settings.OLLAMA_FALLBACK_URL = "" # 無 188 + mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct" + mock_settings.GEMINI_DAILY_QUOTA = 1000 + + manager = OllamaFailoverManager(health_monitor=mock_monitor) + manager._settings = mock_settings + + with patch.object(manager, "_write_failover_audit", return_value=None), \ + patch.object(manager, "_check_gemini_quota", return_value=False): + result = await manager.select_provider() + + assert result.primary.provider_name == "nemotron" diff --git a/apps/api/tests/test_ollama_health_monitor.py b/apps/api/tests/test_ollama_health_monitor.py new file mode 100644 index 00000000..f8fff3a6 --- /dev/null +++ b/apps/api/tests/test_ollama_health_monitor.py @@ -0,0 +1,402 @@ +# apps/api/tests/test_ollama_health_monitor.py | 2026-04-25 @ Asia/Taipei +# Created 2026-04-25 P1.1c by 
Claude Engineer-C +""" +OllamaHealthMonitor 單元測試 - P1.1c +===================================== +測試覆蓋: +- 4 種健康狀態(HEALTHY / SLOW / DEGRADED / OFFLINE) +- 連通性失敗 → OFFLINE +- 推理超時(asyncio.TimeoutError / httpx.TimeoutException)→ DEGRADED +- 推理回傳非 200 → DEGRADED +- Redis 快取命中(from_cache=True) +- Redis 快取失敗時降級直接執行(不 crash) +- is_usable() 邏輯 + +測試分類:unit(mock httpx,無 DB / Redis 依賴) +""" + +from __future__ import annotations + +import asyncio +import json +import time +from unittest.mock import AsyncMock, MagicMock, patch + +import httpx +import pytest + +from src.services.ollama_health_monitor import ( + LATENCY_HEALTHY_THRESHOLD_MS, + LATENCY_SLOW_THRESHOLD_MS, + HealthReport, + HealthStatus, + OllamaHealthMonitor, + get_ollama_health_monitor, + reset_ollama_health_monitor, +) + + +# ============================================================================= +# Fixtures +# ============================================================================= + +HOST = "http://192.168.0.111:11434" +HOST_188 = "http://192.168.0.188:11434" + + +@pytest.fixture(autouse=True) +def reset_singleton(): + """每個測試後重置 singleton""" + yield + reset_ollama_health_monitor() + + +@pytest.fixture +def monitor(): + return OllamaHealthMonitor() + + +def _mock_tags_ok() -> MagicMock: + """/api/tags 回傳 200""" + resp = MagicMock() + resp.status_code = 200 + return resp + + +def _mock_generate_ok(latency_s: float = 0.5) -> tuple[MagicMock, float]: + """ + /api/generate 回傳 200,模擬給定延遲。 + 返回 (response_mock, latency_s),由 test 自行控制 time.perf_counter patch。 + """ + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = {"response": "ok"} + return resp + + +# ============================================================================= +# 層 1:連通性 +# ============================================================================= + + +class TestConnectivity: + """_check_connectivity 各種情況""" + + @pytest.mark.asyncio + async def test_connectivity_success(self, monitor): + """/api/tags 200 → 
連通性通過""" + mock_resp = _mock_tags_ok() + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_resp) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", return_value=mock_client): + result = await monitor._check_connectivity(HOST) + assert result is True + + @pytest.mark.asyncio + async def test_connectivity_non_200(self, monitor): + """/api/tags 非 200 → 連通性失敗""" + mock_resp = MagicMock() + mock_resp.status_code = 503 + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_resp) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", return_value=mock_client): + result = await monitor._check_connectivity(HOST) + assert result is False + + @pytest.mark.asyncio + async def test_connectivity_timeout(self, monitor): + """連線 timeout → 返回 False(不 raise)""" + mock_client = AsyncMock() + mock_client.get = AsyncMock(side_effect=httpx.TimeoutException("timeout")) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", return_value=mock_client): + result = await monitor._check_connectivity(HOST) + assert result is False + + @pytest.mark.asyncio + async def test_connectivity_connect_error(self, monitor): + """連線拒絕 → 返回 False(不 raise)""" + mock_client = AsyncMock() + mock_client.get = AsyncMock(side_effect=httpx.ConnectError("refused")) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", return_value=mock_client): + result = await monitor._check_connectivity(HOST) + assert result is False + + +# ============================================================================= +# 層 2:推理測試分級 +# 
============================================================================= + + +class TestInference: + """_check_inference 延遲分級""" + + def _make_mock_client(self, status_code: int = 200) -> AsyncMock: + resp = MagicMock() + resp.status_code = status_code + resp.json.return_value = {"response": "ok"} + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=resp) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + return mock_client + + @pytest.mark.asyncio + async def test_inference_healthy(self, monitor): + """推理延遲 <10s → HEALTHY""" + mock_client = self._make_mock_client() + + # 模擬 0.5s 延遲(< 10s threshold) + call_count = [0] + original_perf_counter = time.perf_counter + + def _fake_perf_counter(): + call_count[0] += 1 + # 第一次呼叫(start),第二次呼叫(end = start + 0.5s) + if call_count[0] == 1: + return 0.0 + return 0.5 + + with patch("httpx.AsyncClient", return_value=mock_client): + with patch("src.services.ollama_health_monitor.time.perf_counter", _fake_perf_counter): + report = await monitor._check_inference(HOST) + + assert report.status == HealthStatus.HEALTHY + assert report.latency_ms == pytest.approx(500.0, abs=10) + + @pytest.mark.asyncio + async def test_inference_slow(self, monitor): + """推理延遲 10-30s → SLOW""" + mock_client = self._make_mock_client() + + call_count = [0] + + def _fake_perf_counter(): + call_count[0] += 1 + if call_count[0] == 1: + return 0.0 + return 15.0 # 15s → SLOW zone + + with patch("httpx.AsyncClient", return_value=mock_client): + with patch("src.services.ollama_health_monitor.time.perf_counter", _fake_perf_counter): + report = await monitor._check_inference(HOST) + + assert report.status == HealthStatus.SLOW + assert report.latency_ms == pytest.approx(15_000.0, abs=10) + + @pytest.mark.asyncio + async def test_inference_degraded_by_latency(self, monitor): + """推理延遲 >30s → DEGRADED""" + mock_client = self._make_mock_client() + + call_count = [0] + + def 
_fake_perf_counter(): + call_count[0] += 1 + if call_count[0] == 1: + return 0.0 + return 32.0 # 32s → DEGRADED + + with patch("httpx.AsyncClient", return_value=mock_client): + with patch("src.services.ollama_health_monitor.time.perf_counter", _fake_perf_counter): + report = await monitor._check_inference(HOST) + + assert report.status == HealthStatus.DEGRADED + + @pytest.mark.asyncio + async def test_inference_timeout_degraded(self, monitor): + """推理 TimeoutException → DEGRADED(不 crash,不 OFFLINE)""" + mock_client = AsyncMock() + mock_client.post = AsyncMock(side_effect=httpx.TimeoutException("timeout")) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", return_value=mock_client): + report = await monitor._check_inference(HOST) + + assert report.status == HealthStatus.DEGRADED + assert "超時" in report.reason or "timeout" in report.reason.lower() + + @pytest.mark.asyncio + async def test_inference_asyncio_timeout_degraded(self, monitor): + """推理 asyncio.TimeoutError → DEGRADED""" + mock_client = AsyncMock() + mock_client.post = AsyncMock(side_effect=asyncio.TimeoutError()) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", return_value=mock_client): + report = await monitor._check_inference(HOST) + + assert report.status == HealthStatus.DEGRADED + + @pytest.mark.asyncio + async def test_inference_connect_error_offline(self, monitor): + """推理 ConnectError → OFFLINE(連通性層已放行,但推理層掉線)""" + mock_client = AsyncMock() + mock_client.post = AsyncMock(side_effect=httpx.ConnectError("refused")) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", return_value=mock_client): + report = await monitor._check_inference(HOST) + + assert report.status == HealthStatus.OFFLINE + + 
@pytest.mark.asyncio + async def test_inference_non_200_degraded(self, monitor): + """推理回傳非 200 → DEGRADED""" + mock_client = self._make_mock_client(status_code=503) + + call_count = [0] + + def _fake_perf_counter(): + call_count[0] += 1 + return float(call_count[0] - 1) * 0.1 + + with patch("httpx.AsyncClient", return_value=mock_client): + with patch("src.services.ollama_health_monitor.time.perf_counter", _fake_perf_counter): + report = await monitor._check_inference(HOST) + + assert report.status == HealthStatus.DEGRADED + assert "503" in report.reason + + +# ============================================================================= +# check() 整合(含 Redis 快取) +# ============================================================================= + + +class TestCheckWithCache: + """check() 方法含 Redis 快取邏輯""" + + @pytest.mark.asyncio + async def test_check_offline_when_connectivity_fails(self, monitor): + """連通性失敗 → 最終 OFFLINE""" + with patch.object(monitor, "_check_connectivity", return_value=False): + with patch.object(monitor, "_get_cached", return_value=None): + with patch.object(monitor, "_set_cached", return_value=None): + with patch.object(monitor, "_write_audit_log", return_value=None): + report = await monitor.check(HOST) + + assert report.status == HealthStatus.OFFLINE + assert report.host == HOST + + @pytest.mark.asyncio + async def test_check_healthy_when_inference_fast(self, monitor): + """連通性通過 + 推理快 → HEALTHY""" + healthy_report = HealthReport( + status=HealthStatus.HEALTHY, + latency_ms=500.0, + ) + with patch.object(monitor, "_check_connectivity", return_value=True): + with patch.object(monitor, "_check_inference", return_value=healthy_report): + with patch.object(monitor, "_get_cached", return_value=None): + with patch.object(monitor, "_set_cached", return_value=None): + with patch.object(monitor, "_write_audit_log", return_value=None): + report = await monitor.check(HOST) + + assert report.status == HealthStatus.HEALTHY + assert report.host == HOST + + 
@pytest.mark.asyncio + async def test_check_returns_cached(self, monitor): + """快取命中 → 直接返回快取結果,from_cache=True""" + cached = HealthReport( + status=HealthStatus.HEALTHY, + host=HOST, + latency_ms=300.0, + ) + with patch.object(monitor, "_get_cached", return_value=cached): + report = await monitor.check(HOST) + + assert report.from_cache is True + assert report.status == HealthStatus.HEALTHY + + @pytest.mark.asyncio + async def test_check_proceeds_when_cache_get_fails(self, monitor): + """Redis get 失敗 → 降級直接執行 check(不 crash)""" + with patch.object(monitor, "_get_cached", return_value=None): + with patch.object(monitor, "_check_connectivity", return_value=False): + with patch.object(monitor, "_set_cached", return_value=None): + with patch.object(monitor, "_write_audit_log", return_value=None): + report = await monitor.check(HOST) + + assert report.status == HealthStatus.OFFLINE # 正常降級,未 crash + + @pytest.mark.asyncio + async def test_cache_set_failure_does_not_crash(self, monitor): + """Redis set 失敗 → 靜默,結果仍正常返回""" + healthy = HealthReport(status=HealthStatus.HEALTHY, latency_ms=200.0) + with patch.object(monitor, "_get_cached", return_value=None): + with patch.object(monitor, "_check_connectivity", return_value=True): + with patch.object(monitor, "_check_inference", return_value=healthy): + with patch.object(monitor, "_set_cached", side_effect=RuntimeError("Redis down")): + with patch.object(monitor, "_write_audit_log", return_value=None): + # 不應 raise + report = await monitor.check(HOST) + + assert report.status == HealthStatus.HEALTHY + + +# ============================================================================= +# HealthReport 輔助方法 +# ============================================================================= + + +class TestHealthReport: + """HealthReport dataclass 邏輯""" + + def test_is_usable_healthy(self): + assert HealthReport(status=HealthStatus.HEALTHY).is_usable() is True + + def test_is_usable_slow(self): + assert 
HealthReport(status=HealthStatus.SLOW).is_usable() is True + + def test_is_usable_degraded(self): + assert HealthReport(status=HealthStatus.DEGRADED).is_usable() is True + + def test_is_usable_offline(self): + assert HealthReport(status=HealthStatus.OFFLINE).is_usable() is False + + def test_to_dict_structure(self): + report = HealthReport( + status=HealthStatus.SLOW, + host=HOST, + latency_ms=15500.0, + reason="slow zone", + ) + d = report.to_dict() + assert d["status"] == "slow" + assert d["host"] == HOST + assert d["latency_ms"] == 15500.0 + assert d["reason"] == "slow zone" + + +# ============================================================================= +# Singleton +# ============================================================================= + + +def test_singleton_returns_same_instance(): + m1 = get_ollama_health_monitor() + m2 = get_ollama_health_monitor() + assert m1 is m2 + + +def test_reset_singleton_gives_new_instance(): + m1 = get_ollama_health_monitor() + reset_ollama_health_monitor() + m2 = get_ollama_health_monitor() + assert m1 is not m2 diff --git a/k8s/awoooi-prod/04-configmap.yaml.patch-188-fallback b/k8s/awoooi-prod/04-configmap.yaml.patch-188-fallback new file mode 100644 index 00000000..e252be70 --- /dev/null +++ b/k8s/awoooi-prod/04-configmap.yaml.patch-188-fallback @@ -0,0 +1,34 @@ +# ============================================================================ +# PATCH: 188 CPU-only Ollama 備援端點 +# 日期: 2026-04-25 (台北時區) +# 負責人: ogt + Claude Sonnet 4.6 +# ADR 參考: plan_complete_v3.md P0.5 +# 診斷實測數據: +# 主機: 192.168.0.188, Intel Xeon Silver 4214 @ 2.2GHz, 12 核, CPU-only +# RAM: 62GB (used 14GB), Disk: 982GB (used 221GB) +# GPU: 無 +# 現有模型: qwen2.5:7b-instruct (4.5GB), llama3.2:3b (1.9GB), +# deepseek-r1:14b (8.5GB), nomic-embed-text (261MB) +# 推理延遲實測: qwen2.5:7b-instruct → total=111s, eval_rate=0.09 token/s +# llama3.2:3b → total=155s (cold start, 比 7b 更慢) +# 目標 ~30s 無法達到 (CPU 推理硬上限 ~0.09 token/s) +# 決策: qwen2.5:7b-instruct 已存在,設為備援 (111s 
延遲,使用者需知情) +# 連通性: 110 → 188:11434 ✅ 已驗證 +# ⚠️ 注意: 188 推理極慢(~111s),應只在 111 GPU Ollama 完全失效時啟用 +# 建議: 程式碼層應設 OLLAMA_FALLBACK_188_TIMEOUT_SEC = 150 +# ============================================================================ +# +# 將以下兩行加入 /Users/ogt/awoooi/k8s/awoooi-prod/04-configmap.yaml +# 建議位置: OLLAMA_URL 行 (第 20 行) 之後 +# +# --- 新增內容 --- + # 2026-04-25 ogt + Claude Sonnet 4.6: 188 CPU-only Ollama 備援 (plan_complete_v3 P0.5) + # ⚠️ 188 推理延遲實測 ~111s (0.09 token/s, CPU-only Xeon 4214),僅作 111 完全失效時的降級備援 + # 模型已存在: qwen2.5:7b-instruct (4.5GB), 無需重拉 + OLLAMA_FALLBACK_188: "http://192.168.0.188:11434" + OLLAMA_188_MODEL: "qwen2.5:7b-instruct" +# --- 新增內容結束 --- +# NOTE(review): config.py 的 Settings 欄位名稱為 OLLAMA_FALLBACK_URL / OLLAMA_HEALTH_CHECK_MODEL;若 Settings 沒有對應的 env 別名,上列 OLLAMA_FALLBACK_188 / OLLAMA_188_MODEL 不會被讀取,OLLAMA_FALLBACK_URL 將維持預設空字串(=停用備援,單節點模式)。apply 前請確認鍵名是否一致。 +# +# 使用方式 (需用戶 review 後手動 apply): +# kubectl -n awoooi-prod apply -f k8s/awoooi-prod/04-configmap.yaml +# kubectl -n awoooi-prod rollout restart deployment/awoooi-api