feat(p1): Ollama 多層容災系統 — P1.1 健康檢測 + P1.2 ai_router 整合 + P1.5 容災告警

ADR-092 P1 飛輪閉環的 Ollama 失敗轉移子系統,全部 Engineer-A2/C/C2 補上。

新服務 (1581 行):
- ollama_health_monitor.py (356):3 層健康檢測(TCP/HTTP/推理)
- ollama_failover_manager.py (571):111→188 自動切換 + Redis 持久化 + recovery callback
- ollama_auto_recovery.py (436):30s 背景監控 + 連續 3 次 HEALTHY → 切回 + clear_cache
- failover_alerter.py (218):P1.5 Telegram 容災告警

服務整合:
- ai_router.py: AIProviderEnum.OLLAMA_188 + 120s budget + failover fallback chain
- main.py lifespan: 啟動時 wire callback + start recovery,關閉時優雅 stop
- config.py: OLLAMA_FALLBACK_URL / OLLAMA_HEALTH_CHECK_MODEL / GEMINI_DAILY_QUOTA(帳單熔斷)

K8s 配置:
- 04-configmap.yaml.patch-188-fallback:注入 OLLAMA_FALLBACK_URL=http://192.168.0.188:11434

測試 (2082 行):
- test_ollama_health_monitor.py (402)
- test_ollama_failover_manager.py (707)
- test_ollama_auto_recovery.py (580)
- test_ai_router_failover_integration.py (257)
- test_lifespan_failover_wiring.py (136)

依賴鏈:service 三件套 + ai_router + main.py 一起 commit,缺一就 ImportError。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Your Name
2026-04-26 20:18:33 +08:00
parent d3a4fb4d15
commit 55c6b4e2d9
13 changed files with 3798 additions and 2 deletions

View File

@@ -189,11 +189,29 @@ class Settings(BaseSettings):
default="http://192.168.0.111:11434", # 2026-04-08 ogt: 切換至 M1 Pro (40+ tok/s vs 0.45 tok/s)
description="Ollama LLM service URL",
)
# 2026-04-25 Claude Engineer-C (P1.1): Ollama 188 CPU-only 備援 (方案 C)
# 若空字串則 OllamaFailoverManager 僅使用 OLLAMA_URL單節點模式
OLLAMA_FALLBACK_URL: str = Field(
default="",
description="Ollama CPU-only fallback URL (188 備援P1.1),空字串=停用",
)
# 2026-04-25 Claude Engineer-C (P1.1): Ollama 健康檢測推理測試模型
OLLAMA_HEALTH_CHECK_MODEL: str = Field(
default="qwen2.5:7b-instruct",
description="OllamaHealthMonitor 推理測試使用模型P1.1",
)
# 2026-04-12 ogt: 心跳必須確認載入的 Ollama 模型清單
OLLAMA_REQUIRED_MODELS: list[str] = Field(
default=["nomic-embed-text", "qwen2.5:7b-instruct", "deepseek-r1:14b"],
description="HeartbeatReportService 探測必要模型是否載入",
)
# 2026-04-25 critic-fix Part2 H7 by Claude Engineer-C2
# Gemini 帳單熔斷:每日呼叫上限,超過改走 188+Nemotron
# 超過上限後寫 Redis key ollama:gemini_daily_count:{date}TTL 86400s
GEMINI_DAILY_QUOTA: int = Field(
default=1000,
description="每日 Gemini 呼叫上限,超過切到 188+NemotronP1.1 帳單熔斷)",
)
# Deprecated: use OPENCLAW_URL instead
CLAWBOT_URL: str = Field(
default="http://192.168.0.188:8088", # 🔧 修正: OpenClaw 實際 port 是 8088

View File

@@ -546,9 +546,39 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("ai_slo_watchdog_schedule_failed", error=str(e))
# 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan
# OllamaFailoverManager + OllamaAutoRecoveryService 飛輪接線:
# failover 切換時 → recovery_callback → set_current_primary → Redis 持久化
# recovery service 每 30s 檢查 → 111 連續 3 次 HEALTHY → 自動切回 → clear_cache
# 順序:先取 singleton → wire callback → 啟動 recovery service才能接收 callback
try:
from src.services.ollama_failover_manager import get_ollama_failover_manager
from src.services.ollama_auto_recovery import get_ollama_auto_recovery_service
_failover_mgr = get_ollama_failover_manager()
_recovery_svc = get_ollama_auto_recovery_service()
# wire callbackfailover 切換時通知 recovery service 更新 current_primary
_failover_mgr.set_recovery_callback(_recovery_svc.set_current_primary)
# 啟動 recovery service從 Redis bootstrap current_primary並啟動背景監控
await _recovery_svc.start()
logger.info("ollama_failover_system_started")
except Exception as e:
logger.warning("ollama_failover_system_start_failed", error=str(e))
yield
# Shutdown
# 2026-04-25 P1.2 by Claude Engineer-A2 — 優雅關閉 Ollama failover 背景監控
# 必須在 Redis pool 關閉之前停止recovery service 可能仍在寫 Redis
try:
from src.services.ollama_auto_recovery import get_ollama_auto_recovery_service
await get_ollama_auto_recovery_service().stop()
logger.info("ollama_failover_system_stopped")
except Exception as e:
logger.warning("ollama_failover_system_stop_failed", error=str(e))
# Phase 6.1: 關閉 Signal Worker (先關閉 Consumer)
await close_signal_worker()
await publisher.stop()

View File

@@ -68,10 +68,14 @@ logger = structlog.get_logger(__name__)
# =============================================================================
class AIProviderEnum(Enum):
class AIProviderEnum(str, Enum):
"""AI 提供者"""
OLLAMA = "ollama"
# 2026-04-25 critic-fix Part2 B2 by Claude Engineer-C2
# P1.1b OllamaFailoverManager 使用 provider_name="ollama_188"
# 但 AIProviderEnum 沒有此值 → P1.2 整合時 lookup 失敗
OLLAMA_188 = "ollama_188" # 188 CPU-only 備援節點P1.1b
GEMINI = "gemini"
CLAUDE = "claude"
# 2026-04-02 ogt: C1 修復 — 對齊 Registry 實際名稱
@@ -85,6 +89,8 @@ class AIProviderEnum(Enum):
# Provider 對應延遲預算 (ms)
PROVIDER_LATENCY_BUDGET: dict[AIProviderEnum, int] = {
AIProviderEnum.OLLAMA: 60000, # 本地,允許較長處理時間
# 2026-04-25 critic-fix Part2 B2 by Claude Engineer-C2 — 188 CPU-only 推理較慢
AIProviderEnum.OLLAMA_188: 120000, # 120s budget for CPU inference
AIProviderEnum.GEMINI: 30000, # 雲端,較低延遲
AIProviderEnum.CLAUDE: 30000, # 雲端,較低延遲
# 2026-04-02 ogt: C1 修復 — 對齊 Registry 名稱
@@ -212,6 +218,10 @@ class AIRouter:
self._intent_classifier = get_intent_classifier()
self._complexity_scorer = get_complexity_scorer()
self._model_registry = get_model_registry()
# 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan
# 延遲 import 避免循環依賴ollama_failover_manager 不 import ai_router
from src.services.ollama_failover_manager import get_ollama_failover_manager
self._failover_manager = get_ollama_failover_manager()
# 從 ModelRegistry 取得模型配置
self._ollama_default = self._model_registry.get_model("ollama", "default")
@@ -372,10 +382,51 @@ class AIRouter:
intent, intent_result, complexity
)
# Step 3b: 若 initial decision 選到 OLLAMA交由 FailoverManager 重評
# 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan
# 只在 OLLAMA 時觸發,不干擾 NEMOTRON / OPENCLAW_NEMO / CLAUDE / GEMINI 路由
failover_fallback: list[tuple[AIProviderEnum, str]] | None = None
if provider == AIProviderEnum.OLLAMA:
try:
failover_result = await self._failover_manager.select_provider(
task_type=intent.value if intent else "general"
)
primary_str = failover_result.primary.provider_name
try:
provider = AIProviderEnum(primary_str)
model = failover_result.primary.model
reason = f"{reason} [failover→{primary_str}]"
except ValueError:
# provider_name 無法對應已知 enum理論上不應發生OLLAMA_188 已加)
logger.warning(
"ai_router_unknown_failover_provider",
provider=primary_str,
)
# 重建 fallback chain從 failover_result 轉換
fb_list: list[tuple[AIProviderEnum, str]] = []
for ep in failover_result.fallback_chain:
try:
fb_provider = AIProviderEnum(ep.provider_name)
fb_list.append((fb_provider, ep.model))
except ValueError:
logger.warning(
"ai_router_unknown_failover_fallback_provider",
provider=ep.provider_name,
)
failover_fallback = fb_list
except Exception as e:
# failover_manager 異常 → 保留原始 providerfail-open
logger.warning("ai_router_failover_manager_error", error=str(e))
# Step 4: 建立 Fallback 鏈
# 2026-04-05 Claude Code: v4.3 — DIAGNOSE 改回 _full_fallback_chain
# NIM 從 Phase 22 起就是主力無隱私問題Ollama CPU-only 不可用(實測 238s
fallback_chain = self._build_fallback_chain(provider)
# 2026-04-25 P1.2: 若 failover_manager 回傳了 fallback chain,優先使用
fallback_chain = (
failover_fallback
if failover_fallback is not None
else self._build_fallback_chain(provider)
)
# Step 5: 計算延遲預算
latency_budget = PROVIDER_LATENCY_BUDGET.get(provider, 30000)

View File

@@ -0,0 +1,218 @@
"""Ollama 容災 / 自動恢復 / Gemini 帳單 Telegram 告警
設計原則:
- 每次 failover 觸發都通知(用戶明確指示)
- 但用 10min Redis dedup TTL 防同樣狀態重複告警
- 三種告警類型failover_triggered / recovery_succeeded / gemini_quota_exceeded
2026-04-25 P1.5 by Claude Engineer-D — Telegram Alerter for Ollama Failover
"""
from __future__ import annotations
from datetime import datetime, timezone, timedelta
from typing import Any
import structlog
TAIPEI_TZ = timezone(timedelta(hours=8))
DEDUP_TTL_SEC = 600 # 10 min
QUOTA_DEDUP_TTL_SEC = 86400 # 24h每日 quota 告警只發一次)
logger = structlog.get_logger(__name__)
class FailoverAlerter:
"""Ollama 容災 Telegram 告警
2026-04-25 P1.5 by Claude Engineer-D — Telegram Alerter for Ollama Failover
"""
def __init__(self, redis_client=None) -> None:
# telegram_gateway 從 singleton 取不注入lifespan 已確保初始化)
self._redis = redis_client
async def alert_failover(self, event: dict[str, Any]) -> None:
"""111 故障切換到 Gemini/188 — 10min dedup"""
to_provider = event.get("to_provider", "unknown")
dedup_key = f"alert:failover:{to_provider}"
if not await self._check_dedup(dedup_key, ttl=DEDUP_TTL_SEC):
logger.debug("failover_alert_dedup_skipped", to_provider=to_provider)
return
reason = event.get("reason", "unknown")
model = event.get("model", "?")
timestamp = event.get("timestamp", datetime.now(TAIPEI_TZ).isoformat())
fallback_chain_str = event.get("fallback_chain_str", "?")
msg = (
f"*Ollama 容災激活*\n\n"
f"故障主機Ollama 111 \\(GPU\\)\n"
f"故障狀態:{_escape_md(reason)}\n"
f"切換目標:{_escape_md(to_provider)} \\(model: {_escape_md(model)}\\)\n"
f"切換時間:{_escape_md(timestamp)}\n\n"
f"Fallback 鏈:{_escape_md(fallback_chain_str)}\n\n"
f"自動恢復服務持續監控3 次 HEALTHY 後自動切回"
)
await self._send(msg)
logger.info("failover_alert_sent", to_provider=to_provider)
async def alert_recovery(self, event: dict[str, Any]) -> None:
"""111 恢復後自動切回 — 10min dedup
2026-04-25 P1.5 by Claude Engineer-D — 用戶明確要求要通知
"""
dedup_key = "alert:recovery"
if not await self._check_dedup(dedup_key, ttl=DEDUP_TTL_SEC):
logger.debug("recovery_alert_dedup_skipped")
return
stable_count = event.get("stable_count", event.get("to", "?"))
# 相容 auto_recovery 傳入的 {"from": ..., "to": ...} 格式
from_provider = event.get("from_provider", event.get("from", "?"))
to_provider = event.get("to_provider", event.get("to", "ollama_111"))
recovery_time = event.get("recovery_time", datetime.now(TAIPEI_TZ).isoformat())
msg = (
f"*Ollama 自動恢復*\n\n"
f"恢復主機Ollama 111 \\(GPU\\)\n"
f"穩定計數:連續 {stable_count} 次 HEALTHY\n"
f"切回時間:{_escape_md(str(recovery_time))}\n"
f"切換路徑:{_escape_md(str(from_provider))}{_escape_md(str(to_provider))}\n\n"
f"自動化飛輪已恢復至 GPU 推理模式"
)
await self._send(msg)
logger.info("recovery_alert_sent", from_provider=from_provider)
async def alert_gemini_quota_exceeded(self, event: dict[str, Any]) -> None:
"""Gemini 每日上限觸發,降級到 188 CPU 備援 — 24h dedup"""
dedup_key = "alert:gemini_quota_exceeded"
if not await self._check_dedup(dedup_key, ttl=QUOTA_DEDUP_TTL_SEC):
logger.debug("quota_alert_dedup_skipped")
return
quota = event.get("quota", "?")
current_count = event.get("current_count", "?")
date_str = datetime.now(TAIPEI_TZ).date().isoformat()
msg = (
f"*Gemini 每日配額耗盡*\n\n"
f"日期:{date_str}\n"
f"上限:{quota} calls/day\n"
f"當前用量:{current_count}\n"
f"降級目標OLLAMA\\_188 \\(CPU推理較慢\\)\n\n"
f"進入慢速模式至明日 0:00\n"
f"建議檢查是否有異常流量,評估是否升級 Gemini 配額"
)
await self._send(msg)
logger.info("quota_alert_sent", quota=quota, current_count=current_count)
# -------------------------------------------------------------------------
# DedupRedis SET NX EX
# -------------------------------------------------------------------------
async def _check_dedup(self, key: str, ttl: int) -> bool:
"""
Redis SET NX EX 防止重複告警。
True = 第一次應送出False = 已送過(跳過)。
fail-openRedis 不可用時允許送出(不阻擋通知)
2026-04-25 P1.5 by Claude Engineer-D — Telegram dedup 鐵律 10min/24h TTL
"""
if self._redis is None:
return True # fail-open
try:
ok = await self._redis.set(f"{key}:dedup", "1", ex=ttl, nx=True)
return bool(ok)
except Exception as e:
logger.warning("dedup_check_failed", error=str(e))
return True # fail-open
# -------------------------------------------------------------------------
# 發送(透過 TelegramGateway singleton
# -------------------------------------------------------------------------
async def _send(self, message: str) -> None:
"""發送至 Telegram OPENCLAW_TG_CHAT_ID
使用現有 TelegramGateway singleton不另建 HTTP client
parse_mode=MarkdownV2 對應 MarkdownV2 escape 規則。
alerter 失敗不阻斷主路由邏輯exception 吞掉只 log
2026-04-25 P1.5 by Claude Engineer-D — 告警失敗不能阻斷主流程
"""
try:
from src.services.telegram_gateway import get_telegram_gateway
from src.core.config import get_settings
settings = get_settings()
chat_id = getattr(settings, "OPENCLAW_TG_CHAT_ID", None)
if not chat_id:
logger.warning("telegram_chat_id_missing_failover_alert")
return
gateway = get_telegram_gateway()
await gateway.send_notification(text=message, parse_mode="MarkdownV2")
logger.info("telegram_failover_alert_sent", message_len=len(message))
except Exception as e:
# 不 raise — 告警失敗不該阻斷主流程(鐵律)
logger.exception("telegram_failover_send_failed", error=str(e))
# -------------------------------------------------------------------------
# MarkdownV2 escape 工具
# -------------------------------------------------------------------------
_MD2_SPECIAL = r"\_*[]()~`>#+-=|{}.!"
def _escape_md(text: str) -> str:
"""Escape MarkdownV2 特殊字元,防止 Telegram parse error。
2026-04-25 P1.5 by Claude Engineer-D — MarkdownV2 安全逸出
"""
for ch in _MD2_SPECIAL:
text = text.replace(ch, f"\\{ch}")
return text
# =============================================================================
# Singleton
# =============================================================================
_alerter_instance: FailoverAlerter | None = None
def get_failover_alerter() -> FailoverAlerter:
"""取得 FailoverAlerter singletonlifespan 中注入依賴前也可安全呼叫,僅 fail-open
2026-04-25 P1.5 by Claude Engineer-D — Singleton 取得 alerter
"""
global _alerter_instance
if _alerter_instance is None:
_alerter_instance = FailoverAlerter()
return _alerter_instance
def configure_alerter(redis_client) -> None:
"""Lifespan 注入redis_client 就緒後呼叫,讓 dedup 功能生效。
telegram 不另外注入,直接從 get_telegram_gateway() singleton 取得
lifespan startup 已保證 TelegramGateway 初始化完成)。
2026-04-25 P1.5 by Claude Engineer-D — Lifespan 注入
"""
global _alerter_instance
_alerter_instance = FailoverAlerter(redis_client=redis_client)
logger.info("failover_alerter_configured")
def reset_failover_alerter() -> None:
"""重置 singleton測試用
2026-04-25 P1.5 by Claude Engineer-D
"""
global _alerter_instance
_alerter_instance = None

View File

@@ -0,0 +1,436 @@
"""
Ollama 自動恢復服務 - P1.1d
============================
背景監控:偵測 111 從 OFFLINE/SLOW/DEGRADED 恢復為 HEALTHY 後,立刻切回 Ollama。
核心設計:
- 30s 輪詢 111 健康狀態
- 防抖:連續 3 次 HEALTHY 才觸發切回30s × 3 = 90s 穩定視窗)
- 中途若又 OFFLINEcounter 歸零,重新計數
- 切回後呼叫 failover_manager.clear_cache(),讓下次路由重新評估
- 預留 telegram_alerter 介面P1.5 Engineer 接入)
- structlog audit service="ollama_auto_recovery"
整合方式FastAPI lifespan
from src.services.ollama_auto_recovery import OllamaAutoRecoveryService
@asynccontextmanager
async def lifespan(app: FastAPI):
recovery_svc = OllamaAutoRecoveryService()
await recovery_svc.start()
yield
await recovery_svc.stop()
版本: v1.0
建立: 2026-04-25 (台北時區)
建立者: Claude Engineer-C (P1.1d)
# 2026-04-25 統帥指令 by Claude Engineer-C — 自動切 Gemini + 自動恢復
"""
from __future__ import annotations
import asyncio
import datetime
from datetime import timezone, timedelta
from typing import Any, Protocol, runtime_checkable
import structlog
# 台北時區 +8標準庫保險絲100% 可用)
# 2026-04-25 critic-fix Part2 B4 by Claude Engineer-C2
# 原 zoneinfo.ZoneInfo("Asia/Taipei") 失敗時 = None → datetime.now(None) 為 UTC
TAIPEI_TZ = timezone(timedelta(hours=8))
from src.core.config import get_settings
from src.services.ollama_health_monitor import (
HealthStatus,
OllamaHealthMonitor,
get_ollama_health_monitor,
)
from src.services.ollama_failover_manager import (
OllamaFailoverManager,
get_ollama_failover_manager,
)
logger = structlog.get_logger(__name__)
# =============================================================================
# 常數
# =============================================================================
RECOVERY_CHECK_INTERVAL_SEC = 30 # 每 30s 檢查一次
STABLE_COUNT_REQUIRED = 3 # 連續 3 次 HEALTHY 才切回(防抖)
# =============================================================================
# Telegram Alerter Protocol預留介面P1.5 Engineer 實作)
# =============================================================================
@runtime_checkable
class TelegramAlerter(Protocol):
"""Telegram 通知介面預留P1.5 Engineer 接入)"""
async def alert_recovery(self, payload: dict[str, Any]) -> None:
"""111 恢復通知"""
...
# =============================================================================
# OllamaAutoRecoveryService
# =============================================================================
class OllamaAutoRecoveryService:
"""
Ollama 111 自動恢復服務
偵測 111 從 OFFLINE/SLOW/DEGRADED 恢復為 HEALTHY 後,立刻切回 Ollama 路由。
防抖邏輯:
- 連續 3 次 HEALTHY90s 穩定視窗)才觸發切回
- 中途任一次非 HEALTHY → counter 歸零
- 已是 ollama primary 時不重複觸發
2026-04-25 統帥指令 by Claude Engineer-C — 自動切 Gemini + 自動恢復
"""
def __init__(
self,
health_monitor: OllamaHealthMonitor | None = None,
failover_manager: OllamaFailoverManager | None = None,
telegram_alerter: TelegramAlerter | None = None,
recovery_check_interval_sec: int = RECOVERY_CHECK_INTERVAL_SEC,
stable_count_required: int = STABLE_COUNT_REQUIRED,
) -> None:
self._monitor = health_monitor or get_ollama_health_monitor()
self._failover_manager = failover_manager or get_ollama_failover_manager()
self._alerter = telegram_alerter
self._recovery_check_interval_sec = recovery_check_interval_sec
self._stable_count_required = stable_count_required
self._settings = get_settings()
# 狀態追蹤
self._current_primary: str = "ollama" # "ollama" / "gemini" / "fallback"
self._consecutive_healthy: int = 0
self._task: asyncio.Task | None = None
# -------------------------------------------------------------------------
# Lifecycle
# -------------------------------------------------------------------------
async def start(self) -> None:
"""
啟動背景監控任務。
在 FastAPI lifespan 的 startup 階段呼叫:
recovery_svc = OllamaAutoRecoveryService()
await recovery_svc.start()
# 2026-04-25 critic-fix Part2 H5+H6 by Claude Engineer-C2
# 啟動時從 Redis 載入狀態(跨重啟持久化),
# 若 primary != "ollama" 立刻執行一次 check不等 30s
"""
if self._task is not None and not self._task.done():
logger.warning(
"ollama_auto_recovery_already_running",
service="ollama_auto_recovery",
)
return
# 從 Redis 載入持久化狀態(跨重啟恢復)
# 2026-04-25 critic-fix Part2 H5+H6 by Claude Engineer-C2
self._current_primary = await self._load_primary()
logger.info(
"ollama_auto_recovery_bootstrap",
service="ollama_auto_recovery",
current_primary=self._current_primary,
)
# 若 primary 不是 ollama立刻評估一次不等 30s interval
if self._current_primary != "ollama":
try:
await self._check_and_recover()
except Exception:
logger.exception(
"ollama_auto_recovery_immediate_check_error",
service="ollama_auto_recovery",
)
self._task = asyncio.create_task(
self._monitor_loop(),
name="ollama_auto_recovery_loop",
)
logger.info(
"ollama_auto_recovery_started",
service="ollama_auto_recovery",
check_interval_sec=self._recovery_check_interval_sec,
stable_count_required=self._stable_count_required,
)
async def stop(self) -> None:
"""
優雅關閉背景任務。
在 FastAPI lifespan 的 shutdown 階段呼叫:
await recovery_svc.stop()
"""
if self._task is None or self._task.done():
return
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
finally:
self._task = None
logger.info(
"ollama_auto_recovery_stopped",
service="ollama_auto_recovery",
)
# -------------------------------------------------------------------------
# 狀態控制(供整合層設定當前 primary例如 failover 觸發時更新)
# -------------------------------------------------------------------------
async def set_current_primary(self, provider: str) -> None:
"""
通知 recovery service 當前路由 primary 是哪個 provider。
OllamaFailoverManager.select_provider() 觸發 failover 後呼叫此方法更新狀態。
# 2026-04-25 critic-fix Part2 H5+H6 by Claude Engineer-C2
# 改為 async切換時同步寫 Redis 持久化(跨重啟恢復)
"""
if self._current_primary != provider:
logger.info(
"ollama_auto_recovery_primary_changed",
service="ollama_auto_recovery",
from_primary=self._current_primary,
to_primary=provider,
)
self._current_primary = provider
# Redis 持久化(跨重啟恢復)
await self._persist_primary(provider)
if provider != "ollama":
# 切換到非 Ollama → 重置 counter開始監控恢復
self._consecutive_healthy = 0
logger.info(
"ollama_auto_recovery_tracking_started",
service="ollama_auto_recovery",
current_primary=provider,
)
# -------------------------------------------------------------------------
# Redis 持久化(跨重啟恢復)
# 2026-04-25 critic-fix Part2 H5+H6 by Claude Engineer-C2
# -------------------------------------------------------------------------
_REDIS_PRIMARY_KEY = "ollama:current_primary"
async def _persist_primary(self, primary: str) -> None:
"""持久化 current_primary 到 Redis無 TTL跨重啟恢復"""
try:
from src.core.redis_client import get_redis
redis = get_redis()
await redis.set(self._REDIS_PRIMARY_KEY, primary)
except Exception as e:
logger.warning(
"ollama_auto_recovery_persist_failed",
service="ollama_auto_recovery",
error=str(e),
)
async def _load_primary(self) -> str:
"""從 Redis 載入 current_primary找不到時預設 "ollama""""
try:
from src.core.redis_client import get_redis
redis = get_redis()
val = await redis.get(self._REDIS_PRIMARY_KEY)
if val:
decoded = val.decode() if isinstance(val, bytes) else val
logger.info(
"ollama_auto_recovery_loaded_from_redis",
service="ollama_auto_recovery",
current_primary=decoded,
)
return decoded
except Exception as e:
logger.warning(
"ollama_auto_recovery_load_failed",
service="ollama_auto_recovery",
error=str(e),
)
return "ollama" # 預設:正常路由
@property
def current_primary(self) -> str:
return self._current_primary
@property
def consecutive_healthy(self) -> int:
return self._consecutive_healthy
@property
def is_running(self) -> bool:
return self._task is not None and not self._task.done()
# -------------------------------------------------------------------------
# 背景監控迴圈
# -------------------------------------------------------------------------
async def _monitor_loop(self) -> None:
"""
每 30s 檢查 111 健康狀態。
連續 3 次 HEALTHY 且 current_primary != "ollama" → 觸發切回。
"""
while True:
try:
await asyncio.sleep(self._recovery_check_interval_sec)
await self._check_and_recover()
except asyncio.CancelledError:
raise # 必須重新 raise讓 stop() 的 await 正常結束
except Exception:
logger.exception(
"ollama_auto_recovery_loop_error",
service="ollama_auto_recovery",
)
# 不 break繼續監控異常不應停止監控迴圈
async def _check_and_recover(self) -> None:
"""
單次健康狀態檢查 + 切回邏輯。
抽取為獨立方法,方便單元測試。
"""
host = self._settings.OLLAMA_URL
try:
health = await self._monitor.check(host)
except Exception:
logger.exception(
"ollama_auto_recovery_check_failed",
service="ollama_auto_recovery",
host=host,
)
self._consecutive_healthy = 0
return
if health.status == HealthStatus.HEALTHY:
self._consecutive_healthy += 1
logger.debug(
"ollama_auto_recovery_healthy_tick",
service="ollama_auto_recovery",
consecutive=self._consecutive_healthy,
required=self._stable_count_required,
current_primary=self._current_primary,
)
if (
self._consecutive_healthy >= self._stable_count_required
and self._current_primary != "ollama"
):
await self._switch_back_to_ollama()
else:
# 非 HEALTHY → counter 歸零,繼續等
if self._consecutive_healthy > 0:
logger.debug(
"ollama_auto_recovery_counter_reset",
service="ollama_auto_recovery",
previous_count=self._consecutive_healthy,
status=health.status.value,
)
self._consecutive_healthy = 0
# -------------------------------------------------------------------------
# 切回 Ollama 111
# -------------------------------------------------------------------------
async def _switch_back_to_ollama(self) -> None:
"""
切回 Ollama 111
1. 更新 _current_primary
2. 呼叫 failover_manager.clear_cache() 清空路由快取
3. 通知 failover_managernotify_recovery
4. Telegram 通知(若 alerter 已設定)
5. structlog audit
"""
from_provider = self._current_primary
self._current_primary = "ollama"
stable_count = self._consecutive_healthy
# counter 不立即清零:保留供 audit下次 loop 開始後自然計數
self._consecutive_healthy = 0
# 取台北時間(標準庫保證 +8禁 UTC
# 2026-04-25 critic-fix Part2 B4 by Claude Engineer-C2
recovery_time = datetime.datetime.now(TAIPEI_TZ)
# 清空 failover manager 快取(讓下次 select_provider 重新評估)
try:
await self._failover_manager.clear_cache()
except Exception:
logger.exception(
"ollama_auto_recovery_clear_cache_error",
service="ollama_auto_recovery",
)
# 通知 failover_manager
try:
self._failover_manager.notify_recovery("ollama_111")
except Exception:
logger.exception(
"ollama_auto_recovery_notify_error",
service="ollama_auto_recovery",
)
# Telegram 通知P1.5 Engineer-D 接入)
# 2026-04-25 P1.5 by Claude Engineer-D — 接 FailoverAlerterself._alerter 優先,
# 無則 fallback 到 get_failover_alerter() singleton
try:
alerter = self._alerter
if alerter is None:
from src.services.failover_alerter import get_failover_alerter
alerter = get_failover_alerter()
await alerter.alert_recovery({
"from": from_provider,
"to": "ollama_111",
"stable_count": stable_count,
"recovery_time": recovery_time.isoformat(),
})
except Exception:
logger.exception(
"ollama_auto_recovery_telegram_alert_error",
service="ollama_auto_recovery",
)
# structlog audit必須記錄
logger.info(
"ollama_auto_recovery_switched_back",
service="ollama_auto_recovery",
action="switched_back",
from_provider=from_provider,
to_provider="ollama_111",
stable_count=stable_count,
recovery_time=recovery_time.isoformat(),
)
# =============================================================================
# Singleton
# =============================================================================
_recovery_service: OllamaAutoRecoveryService | None = None
def get_ollama_auto_recovery_service() -> OllamaAutoRecoveryService:
"""取得 OllamaAutoRecoveryService singleton"""
global _recovery_service
if _recovery_service is None:
_recovery_service = OllamaAutoRecoveryService()
return _recovery_service
def reset_ollama_auto_recovery_service() -> None:
"""重置 singleton測試用"""
global _recovery_service
_recovery_service = None

View File

@@ -0,0 +1,571 @@
"""
Ollama 自動容災管理 - P1.1b
============================
依 OllamaHealthMonitor 健康狀態決定 Ollama 路由方案。
路由邏輯2026-04-25 統帥指令Gemini 優先188 最後備援):
111 HEALTHY → 主 111fallback [Gemini, 188, Nemotron]
111 SLOW → 主 Geminifallback [111, 188]
111 DEGRADED → 主 Geminifallback [188, Nemotron, Claude]
111 OFFLINE → 主 Geminifallback [188, Nemotron, Claude]
111 OFFLINE + 188 OFFLINE → 主 Geminifallback [Nemotron, Claude]
設計說明:
- 不直接依賴 AIProviderEnumP1.2 Engineer-A 整合時再對齊)
- 返回輕量 OllamaRoutingResult含主 endpoint + fallback 清單
- 並行檢查 111 + 188asyncio.gather
- 切換觸發時寫 audit_logs service="ollama_failover"
- clear_cache() 方法供 OllamaAutoRecoveryService 切回後清空路由快取
版本: v2.0
建立: 2026-04-25 (台北時區)
建立者: Claude Engineer-C (P1.1b)
# Created 2026-04-25 P1.1 by Claude Engineer-C
# 2026-04-25 統帥指令 by Claude Engineer-C — 自動切 Gemini + 自動恢復
"""
from __future__ import annotations
import asyncio
import datetime
from dataclasses import dataclass, field
# 2026-04-25 critic-fix Part2 B4 by Claude Engineer-C2
# 用標準庫 timezone(timedelta(hours=8)) 取代 zoneinfo保證一定有 +8 時區
# 原 zoneinfo.ZoneInfo("Asia/Taipei") 失敗時 = None → datetime.now(None) 為 UTC
from datetime import timezone, timedelta
import structlog
from src.core.config import get_settings
# 台北時區 +8標準庫保險絲100% 可用)
# 2026-04-25 critic-fix Part2 B4 by Claude Engineer-C2
TAIPEI_TZ = timezone(timedelta(hours=8))
from src.services.ollama_health_monitor import (
HealthReport,
HealthStatus,
OllamaHealthMonitor,
get_ollama_health_monitor,
)
logger = structlog.get_logger(__name__)
# =============================================================================
# 路由結果模型輕量P1.2 整合時轉換為 RoutingDecision
# =============================================================================
@dataclass
class OllamaEndpoint:
"""Ollama 端點描述"""
url: str
provider_name: str # 給 AIRouterExecutor 用的 provider 名稱
model: str
def to_dict(self) -> dict:
return {"url": self.url, "provider_name": self.provider_name, "model": self.model}
@dataclass
class OllamaRoutingResult:
"""
Ollama 容災路由結果
P1.2 Engineer-A 整合時,將此結果轉換為 ai_router.RoutingDecision
- selected_provider = AIProviderEnum[result.primary.provider_name.upper()] or 新的 OLLAMA_188
- selected_model = result.primary.model
- fallback_chain = [(AIProviderEnum[p.provider_name.upper()], p.model) for p in result.fallback_chain]
"""
primary: OllamaEndpoint
fallback_chain: list[OllamaEndpoint]
routing_reason: str
health_111: HealthReport
health_188: HealthReport | None = None
def all_endpoints_in_order(self) -> list[OllamaEndpoint]:
"""返回完整的優先序端點列表primary 在前)"""
return [self.primary, *self.fallback_chain]
def to_dict(self) -> dict:
return {
"primary": {
"url": self.primary.url,
"provider": self.primary.provider_name,
"model": self.primary.model,
},
"fallback_chain": [
{"url": e.url, "provider": e.provider_name, "model": e.model}
for e in self.fallback_chain
],
"routing_reason": self.routing_reason,
"health_111": self.health_111.to_dict(),
"health_188": self.health_188.to_dict() if self.health_188 else None,
}
# =============================================================================
# 已知 Fallback 端點定義Nemotron / Gemini / Claude
# =============================================================================
# 以 provider_name 對應 ai_router.AIProviderEnum 的 value
_NEMOTRON_ENDPOINT = OllamaEndpoint(
url="", # Nemotron 不是 HTTP URL由 AIRouterExecutor 從 Registry 取得
provider_name="nemotron",
model="nvidia/nemotron-mini-4b-instruct",
)
_GEMINI_ENDPOINT = OllamaEndpoint(
url="",
provider_name="gemini",
model="gemini-1.5-flash",
)
_CLAUDE_ENDPOINT = OllamaEndpoint(
url="",
provider_name="claude",
model="claude-3-5-haiku-20241022",
)
# =============================================================================
# OllamaFailoverManager
# =============================================================================
class OllamaFailoverManager:
"""
Ollama 自動容災管理器
並行檢查 111 + 188依健康狀態選擇最佳路由。
使用方式:
manager = OllamaFailoverManager()
result = await manager.select_provider()
# result.primary.url → 使用的 Ollama URL
# result.fallback_chain → 依序 fallback
2026-04-25 Claude Engineer-C (P1.1b)
"""
def __init__(
self,
health_monitor: OllamaHealthMonitor | None = None,
recovery_callback=None,
) -> None:
self._monitor = health_monitor or get_ollama_health_monitor()
self._settings = get_settings()
# 2026-04-25 critic-fix Part2 H5+H6 by Claude Engineer-C2
# recovery_callback: async callable(provider_name: str) → None
# OllamaAutoRecoveryService.set_current_primary 在 failover 時被通知,
# 避免重啟後 _current_primary 停留在 "ollama" 而永不啟動恢復監控
self._recovery_callback = recovery_callback
# -------------------------------------------------------------------------
# Public API
# -------------------------------------------------------------------------
async def select_provider(
self,
task_type: str = "",
context: dict | None = None,
) -> OllamaRoutingResult:
"""
並行檢查 111 + 188返回路由結果
Args:
task_type: 任務類型(預留,目前未影響路由邏輯)
context: 額外上下文(預留)
Returns:
OllamaRoutingResult
"""
url_111 = self._settings.OLLAMA_URL
url_188 = self._settings.OLLAMA_FALLBACK_URL or ""
# 並行檢查
# 2026-04-25 critic-fix Part2 H4 by Claude Engineer-C2
# return_exceptions=True 防止任一 check 例外導致整個 select_provider 炸
if url_188:
results = await asyncio.gather(
self._monitor.check(url_111),
self._monitor.check(url_188),
return_exceptions=True,
)
# 處理 exception — 任一失敗視為 OFFLINE
health_111_raw, health_188_raw = results
health_111: HealthReport = (
HealthReport(status=HealthStatus.OFFLINE, reason=f"check error: {health_111_raw}")
if isinstance(health_111_raw, Exception)
else health_111_raw
)
health_188: HealthReport | None = (
HealthReport(status=HealthStatus.OFFLINE, reason=f"check error: {health_188_raw}")
if isinstance(health_188_raw, Exception)
else health_188_raw
)
else:
health_111 = await self._monitor.check(url_111)
health_188 = None
result = self._decide_route(
health_111=health_111,
health_188=health_188,
url_111=url_111,
url_188=url_188,
)
# Gemini 帳單熔斷quota gate
# 2026-04-25 critic-fix Part2 H7 by Claude Engineer-C2
if result.primary.provider_name == "gemini":
quota_ok = await self._check_gemini_quota()
if not quota_ok:
quota = getattr(self._settings, "GEMINI_DAILY_QUOTA", 1000)
logger.warning(
"gemini_quota_exceeded_falling_to_188",
quota=quota,
health_111=health_111.status.value,
)
result = self._build_quota_exceeded_route(
health_111=health_111,
health_188=health_188,
url_111=url_111,
url_188=url_188,
)
# 寫入 audit_logbest-effort
await self._write_failover_audit(result)
logger.info(
"ollama_failover_decision",
primary=result.primary.provider_name,
primary_url=result.primary.url,
reason=result.routing_reason,
fallback_count=len(result.fallback_chain),
health_111=health_111.status.value,
health_188=health_188.status.value if health_188 else "not_configured",
)
# 通知 recovery service 當前 primary跨重啟持久化
# 2026-04-25 critic-fix Part2 H5+H6 by Claude Engineer-C2
if self._recovery_callback is not None:
try:
await self._recovery_callback(result.primary.provider_name)
except Exception as e:
logger.warning(
"ollama_failover_recovery_callback_failed",
error=str(e),
)
return result
# -------------------------------------------------------------------------
# 路由決策邏輯
# -------------------------------------------------------------------------
def _decide_route(
self,
health_111: HealthReport,
health_188: HealthReport | None,
url_111: str,
url_188: str,
) -> OllamaRoutingResult:
"""
決策矩陣2026-04-25 統帥指令Gemini 優先188 最後備援):
111 HEALTHY → primary=111, fallback=[Gemini, 188, Nemotron]
111 SLOW → primary=Gemini, fallback=[111, 188]
111 DEGRADED → primary=Gemini, fallback=[188, Nemotron, Claude]
111 OFFLINE → primary=Gemini, fallback=[188, Nemotron, Claude]
111 OFFLINE + 188 OFFLINE → primary=Gemini, fallback=[Nemotron, Claude]
關鍵原則:
- 111 非 HEALTHY 時primary 必為 Gemini快速雲端不等 188 慢推理)
- 188 永遠在 fallback chain作為 Gemini 額度耗盡的最後備援
- degradation_reason 記錄切換原因 + 時間戳
2026-04-25 統帥指令 by Claude Engineer-C — 自動切 Gemini + 自動恢復
"""
model_111 = self._settings.OLLAMA_HEALTH_CHECK_MODEL
model_188 = "qwen2.5:7b-instruct" # 188 CPU-only 備援推薦模型plan 方案 C
ep_111 = OllamaEndpoint(url=url_111, provider_name="ollama", model=model_111)
ep_188 = (
OllamaEndpoint(url=url_188, provider_name="ollama_188", model=model_188)
if url_188
else None
)
# 188 可用性判斷(僅供 fallback 使用)
has_188 = ep_188 is not None and (
health_188 is not None and health_188.status != HealthStatus.OFFLINE
)
# 切換時間戳(台北時區 +8標準庫保證
# 2026-04-25 critic-fix Part2 B4 by Claude Engineer-C2
now_ts = datetime.datetime.now(TAIPEI_TZ).isoformat()
# ==========================================================
# 111 HEALTHY → 主 111Gemini 作為第一 fallback快速雲端
# ==========================================================
if health_111.status == HealthStatus.HEALTHY:
fallback: list[OllamaEndpoint] = [_GEMINI_ENDPOINT]
if has_188 and ep_188:
fallback.append(ep_188)
fallback.append(_NEMOTRON_ENDPOINT)
return OllamaRoutingResult(
primary=ep_111,
fallback_chain=fallback,
routing_reason="111 HEALTHY → 主 111",
health_111=health_111,
health_188=health_188,
)
# ==========================================================
# 111 SLOW → primary=Geminifallback=[111, 188]
# 111 實測 eval rate 0.09 token/s~111s 推理Gemini 更快
# ==========================================================
if health_111.status == HealthStatus.SLOW:
fallback_slow: list[OllamaEndpoint] = [ep_111]
if has_188 and ep_188:
fallback_slow.append(ep_188)
degradation_reason = (
f"111 SLOWeval ~0.09 token/s, ~111s→ 切 Gemini at {now_ts}"
)
return OllamaRoutingResult(
primary=_GEMINI_ENDPOINT,
fallback_chain=fallback_slow,
routing_reason=degradation_reason,
health_111=health_111,
health_188=health_188,
)
# ==========================================================
# 111 DEGRADED 或 OFFLINE → primary=Gemini188 在 fallback
# ==========================================================
status_label = health_111.status.value # "degraded" / "offline"
degradation_reason = f"111 {status_label} → 切 Gemini at {now_ts}"
if has_188 and ep_188:
return OllamaRoutingResult(
primary=_GEMINI_ENDPOINT,
fallback_chain=[ep_188, _NEMOTRON_ENDPOINT, _CLAUDE_ENDPOINT],
routing_reason=degradation_reason,
health_111=health_111,
health_188=health_188,
)
# 188 也不可用 → Gemini 主力,最後備援 Nemotron / Claude
degradation_reason = f"111 {status_label} + 188 不可用 → 切 Gemini at {now_ts}"
return OllamaRoutingResult(
primary=_GEMINI_ENDPOINT,
fallback_chain=[_NEMOTRON_ENDPOINT, _CLAUDE_ENDPOINT],
routing_reason=degradation_reason,
health_111=health_111,
health_188=health_188,
)
# -------------------------------------------------------------------------
# Gemini 帳單熔斷quota gate
# 2026-04-25 critic-fix Part2 H7 by Claude Engineer-C2
# -------------------------------------------------------------------------
async def _check_gemini_quota(self) -> bool:
"""
檢查每日 Gemini call 配額,超過上限則禁用。
Redis key: ollama:gemini_daily_count:{YYYY-MM-DD}TTL 86400s
計數 atomicincr
Returns:
True → 仍在配額內,可使用 Gemini
False → 已超配額,應切到 188+Nemotron
fail-openRedis 不可用時允許走 Gemini不阻擋服務
"""
try:
from src.core.redis_client import get_redis
redis = get_redis()
if redis is None:
return True # fail-open
quota = getattr(self._settings, "GEMINI_DAILY_QUOTA", 1000)
key = f"ollama:gemini_daily_count:{datetime.date.today().isoformat()}"
count_raw = await redis.get(key)
count = int(count_raw or 0)
if count >= quota:
return False
# atomic incr + 設定 TTL確保跨日自動重置
await redis.incr(key)
await redis.expire(key, 86400)
return True
except Exception as e:
logger.warning("gemini_quota_check_failed", error=str(e))
return True # fail-open
def _build_quota_exceeded_route(
self,
health_111: HealthReport,
health_188: HealthReport | None,
url_111: str, # noqa: ARG002 — 保留供 OllamaRoutingResult 結構完整性health_111 對應)
url_188: str,
) -> OllamaRoutingResult:
"""
Gemini 配額耗盡時的備援路由primary=OLLAMA_188fallback=[Nemotron, Claude]
若 188 也不可用,則 primary=Nemotron。
"""
model_188 = "qwen2.5:7b-instruct"
ep_188 = (
OllamaEndpoint(url=url_188, provider_name="ollama_188", model=model_188)
if url_188
else None
)
has_188 = ep_188 is not None and (
health_188 is not None and health_188.status != HealthStatus.OFFLINE
)
if has_188 and ep_188:
return OllamaRoutingResult(
primary=ep_188,
fallback_chain=[_NEMOTRON_ENDPOINT, _CLAUDE_ENDPOINT],
routing_reason="Gemini quota exceeded → 188 CPU-only 備援",
health_111=health_111,
health_188=health_188,
)
# 188 也不可用
return OllamaRoutingResult(
primary=_NEMOTRON_ENDPOINT,
fallback_chain=[_CLAUDE_ENDPOINT],
routing_reason="Gemini quota exceeded + 188 不可用 → Nemotron 備援",
health_111=health_111,
health_188=health_188,
)
# -------------------------------------------------------------------------
# Recovery API供 OllamaAutoRecoveryService 呼叫)
# -------------------------------------------------------------------------
def set_recovery_callback(self, callback) -> None:
"""
設定 recovery callback供 lifespan wiring 使用)。
callback signature: async (provider_name: str) -> None
# 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan
"""
self._recovery_callback = callback
async def clear_cache(self) -> None:
"""
清空路由決策快取,讓下次 select_provider 重新評估健康狀態。
OllamaAutoRecoveryService 在偵測 111 恢復後呼叫此方法。
2026-04-25 統帥指令 by Claude Engineer-C — 自動切 Gemini + 自動恢復
# 2026-04-25 P1.2 by Claude Engineer-A2 — 改用 make_cache_key 動態組 key消除硬編碼 IP
"""
try:
from src.core.redis_client import get_redis
from src.services.ollama_health_monitor import make_cache_key
redis = get_redis()
if redis is None:
return
# 動態由 settings URL 組 cache key避免硬編碼 IP
keys = [
make_cache_key(self._settings.OLLAMA_URL),
make_cache_key(self._settings.OLLAMA_FALLBACK_URL or ""),
]
for k in keys:
if k and k != "ollama_health:": # 空 URL 會產生無意義的 key跳過
await redis.delete(k)
logger.info(
"ollama_failover_cache_cleared",
service="ollama_failover",
reason="recovery_triggered",
)
except Exception as e:
logger.debug("ollama_failover_clear_cache_failed", error=str(e))
def notify_recovery(self, provider: str) -> None:
"""
預留P1.5 Engineer 接入 Telegram alerter 時使用。
目前僅寫 structlog audit。
2026-04-25 統帥指令 by Claude Engineer-C — 自動切 Gemini + 自動恢復
"""
logger.info(
"ollama_recovery_notified",
service="ollama_failover",
provider=provider,
action="recovery_received",
)
# -------------------------------------------------------------------------
# Audit Log
# -------------------------------------------------------------------------
async def _write_failover_audit(self, result: OllamaRoutingResult) -> None:
"""
切換觸發時寫 structlog auditbest-effort+ Telegram 告警
# 2026-04-25 critic-fix Part2 B1 by Claude Engineer-C2
# 原 AuditLog DB 寫入使用不存在的欄位service/action/target/status/metadata
# → SQLAlchemy crash → except 吃掉 → 零稽核
# 修法:刪除 DB 寫入路徑,改用 structlog onlyaudit 不依賴 DB schema
# 2026-04-25 P1.5 by Claude Engineer-D — 新增 Telegram 告警dedup 10min
service="ollama_failover"per 任務規格)
僅在 primary 非 111 時記錄(真正發生切換)
"""
if result.primary.provider_name == "ollama":
# 111 正常,無切換事件
return
logger.info(
"ollama_failover_triggered",
service="ollama_failover",
action="failover_triggered",
from_provider="ollama",
to_provider=result.primary.provider_name,
reason=result.routing_reason,
primary_url=result.primary.url or result.primary.provider_name,
health_111=result.health_111.status.value,
health_188=result.health_188.status.value if result.health_188 else "not_configured",
)
# Telegram 告警首次切換才通知dedup 10min 內建)
# 2026-04-25 P1.5 by Claude Engineer-D — 告警失敗不阻斷主路由邏輯
try:
from src.services.failover_alerter import get_failover_alerter
fallback_chain_str = "".join(
p.provider_name for p in result.fallback_chain
)
alerter = get_failover_alerter()
await alerter.alert_failover({
"to_provider": result.primary.provider_name,
"model": result.primary.model,
"reason": result.routing_reason,
"timestamp": datetime.datetime.now(TAIPEI_TZ).isoformat(),
"fallback_chain_str": fallback_chain_str,
})
except Exception as e:
logger.warning("failover_alert_failed", error=str(e))
# =============================================================================
# Singleton
# =============================================================================
_failover_manager: OllamaFailoverManager | None = None
def get_ollama_failover_manager() -> OllamaFailoverManager:
"""取得 OllamaFailoverManager singleton"""
global _failover_manager
if _failover_manager is None:
_failover_manager = OllamaFailoverManager()
return _failover_manager
def reset_ollama_failover_manager() -> None:
"""重置 singleton測試用"""
global _failover_manager
_failover_manager = None

View File

@@ -0,0 +1,356 @@
"""
Ollama 健康檢測模組 - P1.1a
============================
三層健康檢查:連通性 → 推理測試 → (optional) GPU 記憶體
設計原則:
- Redis 30s 快取防 health check storm
- 降級安全:快取失敗不影響功能,直接執行檢查
- 用 settings 取 host 配置(禁硬編碼 IP
- 用 httpx.AsyncClient非 requests
版本: v1.1
建立: 2026-04-25 (台北時區)
建立者: Claude Engineer-C (P1.1a)
# Created 2026-04-25 P1.1 by Claude Engineer-C
# 2026-04-25 critic-fix by Claude Engineer-C — 修 BLOCKER B3(make_cache_key 公開) + H3(timeout 45s)
"""
from __future__ import annotations
import asyncio
import json
import time
from dataclasses import dataclass, field
from enum import Enum
from urllib.parse import urlparse
import httpx
import structlog
from src.core.config import get_settings
logger = structlog.get_logger(__name__)
# =============================================================================
# 常數
# =============================================================================
REDIS_CACHE_KEY_PREFIX = "ollama_health:"
REDIS_CACHE_TTL_SECONDS = 30 # 防 health check storm
CONNECTIVITY_TIMEOUT_SECONDS = 5.0
# 2026-04-25 critic-fix H3 by Claude Engineer-C — 45s 讓 SLOW 門檻(30s)真的能觀察到
INFERENCE_TIMEOUT_SECONDS = 45.0
# 推理延遲分級門檻(毫秒)
LATENCY_HEALTHY_THRESHOLD_MS = 10_000 # <10s → HEALTHY
LATENCY_SLOW_THRESHOLD_MS = 30_000 # 10-30s → SLOW
# >30s → DEGRADED
# =============================================================================
# 公開工具函數
# =============================================================================
def make_cache_key(url: str) -> str:
"""
由 URL 產生 Redis cache key。
取 host:port 部分netloc作為 key避免 scheme 差異干擾。
http://192.168.0.111:11434 → "ollama_health:192.168.0.111:11434"
公開函數供 OllamaFailoverManager.clear_cache() 動態組 key
確保與內部 _cache_key() 使用相同邏輯。
# 2026-04-25 critic-fix B3 by Claude Engineer-C — 避免 clear_cache 硬編碼 IP
"""
parsed = urlparse(url)
netloc = parsed.netloc or url
return f"{REDIS_CACHE_KEY_PREFIX}{netloc}"
# =============================================================================
# 資料模型
# =============================================================================
class HealthStatus(Enum):
"""Ollama 健康狀態分級"""
HEALTHY = "healthy" # <10s 推理,正常使用
SLOW = "slow" # 10-30s降級使用
DEGRADED = "degraded" # >30s 或推理超時,優先切換
OFFLINE = "offline" # 連通性失敗,必須切換
@dataclass
class HealthReport:
"""Ollama 健康檢測報告"""
status: HealthStatus
host: str = ""
latency_ms: float = 0.0
reason: str = ""
checked_at: float = field(default_factory=time.time)
from_cache: bool = False
mcp_health: dict[str, bool] = field(default_factory=dict)
def is_usable(self) -> bool:
"""是否可用(不含 OFFLINE"""
return self.status != HealthStatus.OFFLINE
def to_dict(self) -> dict:
return {
"status": self.status.value,
"host": self.host,
"latency_ms": round(self.latency_ms, 1),
"reason": self.reason,
"checked_at": self.checked_at,
"from_cache": self.from_cache,
}
# =============================================================================
# OllamaHealthMonitor
# =============================================================================
class OllamaHealthMonitor:
"""
Ollama 三層健康檢測
層 1連通性/api/tags5s timeout
層 2推理測試/api/generate35s timeout
層 3GPU 記憶體optionalvia SSH MCP
結果 Redis 快取 30s防 health check storm。
2026-04-25 Claude Engineer-C (P1.1a)
"""
def __init__(self) -> None:
self._settings = get_settings()
# -------------------------------------------------------------------------
# Public API
# -------------------------------------------------------------------------
async def check(self, host: str) -> HealthReport:
"""
執行健康檢測(含 Redis 快取)
Args:
host: Ollama 主機 URLe.g. "http://192.168.0.111:11434"
Returns:
HealthReport
"""
# 嘗試從快取取得
cached = await self._get_cached(host)
if cached is not None:
cached.from_cache = True
logger.debug("ollama_health_cache_hit", host=host, status=cached.status.value)
return cached
# 執行實際健康檢測
report = await self._run_checks(host)
report.host = host
# 寫入快取(失敗不影響功能,外部 exception 也靜默)
try:
await self._set_cached(host, report)
except Exception as e:
logger.debug("ollama_health_set_cached_outer_failed", host=host, error=str(e))
logger.info(
"ollama_health_checked",
host=host,
status=report.status.value,
latency_ms=round(report.latency_ms, 1),
reason=report.reason,
)
# 寫入 audit_logbest-effort
await self._write_audit_log(host, report)
return report
# -------------------------------------------------------------------------
# 三層檢查
# -------------------------------------------------------------------------
async def _run_checks(self, host: str) -> HealthReport:
"""依序執行三層檢查"""
# 層 1連通性
connectivity_ok = await self._check_connectivity(host)
if not connectivity_ok:
return HealthReport(
status=HealthStatus.OFFLINE,
host=host,
reason="連通性失敗:/api/tags 無回應",
)
# 層 2推理測試
report = await self._check_inference(host)
return report
async def _check_connectivity(self, host: str) -> bool:
"""
層 1連通性檢查
GET /api/tags5s timeout回傳 200 即通過
"""
try:
async with httpx.AsyncClient(
timeout=httpx.Timeout(CONNECTIVITY_TIMEOUT_SECONDS)
) as client:
resp = await client.get(f"{host}/api/tags")
return resp.status_code == 200
except (httpx.TimeoutException, httpx.ConnectError, httpx.NetworkError):
return False
except Exception as e:
logger.warning("ollama_connectivity_check_error", host=host, error=str(e))
return False
async def _check_inference(self, host: str) -> HealthReport:
"""
層 2推理測試
POST /api/generate35s timeout依延遲分級
"""
model = self._settings.OLLAMA_HEALTH_CHECK_MODEL
start = time.perf_counter()
try:
async with httpx.AsyncClient(
timeout=httpx.Timeout(INFERENCE_TIMEOUT_SECONDS)
) as client:
resp = await client.post(
f"{host}/api/generate",
json={
"model": model,
"prompt": "hi",
"stream": False,
"options": {"num_predict": 1},
},
)
latency_ms = (time.perf_counter() - start) * 1000
if resp.status_code != 200:
return HealthReport(
status=HealthStatus.DEGRADED,
latency_ms=latency_ms,
reason=f"推理回傳 HTTP {resp.status_code}",
)
# 分級判斷
if latency_ms < LATENCY_HEALTHY_THRESHOLD_MS:
return HealthReport(
status=HealthStatus.HEALTHY,
latency_ms=latency_ms,
)
elif latency_ms < LATENCY_SLOW_THRESHOLD_MS:
return HealthReport(
status=HealthStatus.SLOW,
latency_ms=latency_ms,
reason=f"推理延遲 {latency_ms:.0f}msslow zone",
)
else:
return HealthReport(
status=HealthStatus.DEGRADED,
latency_ms=latency_ms,
reason=f"推理延遲 {latency_ms:.0f}ms>30s",
)
except (httpx.TimeoutException, asyncio.TimeoutError):
latency_ms = (time.perf_counter() - start) * 1000
return HealthReport(
status=HealthStatus.DEGRADED,
latency_ms=latency_ms,
reason="推理超時 >35s",
)
except (httpx.ConnectError, httpx.NetworkError) as e:
return HealthReport(
status=HealthStatus.OFFLINE,
reason=f"推理連接失敗:{e}",
)
except Exception as e:
logger.warning("ollama_inference_check_error", host=host, error=str(e))
return HealthReport(
status=HealthStatus.DEGRADED,
reason=f"推理測試例外:{e}",
)
# -------------------------------------------------------------------------
# Redis 快取
# -------------------------------------------------------------------------
def _cache_key(self, host: str) -> str:
"""生成 Redis cache key內部使用委派至 make_cache_key"""
return make_cache_key(host)
async def _get_cached(self, host: str) -> HealthReport | None:
"""從 Redis 取得快取的 HealthReport失敗返回 None"""
try:
from src.core.redis_client import get_redis
redis = get_redis()
raw = await redis.get(self._cache_key(host))
if not raw:
return None
data = json.loads(raw)
return HealthReport(
status=HealthStatus(data["status"]),
host=data.get("host", host),
latency_ms=data.get("latency_ms", 0.0),
reason=data.get("reason", ""),
checked_at=data.get("checked_at", time.time()),
)
except Exception as e:
logger.debug("ollama_health_cache_get_failed", host=host, error=str(e))
return None
async def _set_cached(self, host: str, report: HealthReport) -> None:
"""寫入 Redis 快取,失敗靜默(不影響功能)"""
try:
from src.core.redis_client import get_redis
redis = get_redis()
data = json.dumps(report.to_dict())
await redis.set(self._cache_key(host), data, ex=REDIS_CACHE_TTL_SECONDS)
except Exception as e:
logger.debug("ollama_health_cache_set_failed", host=host, error=str(e))
# -------------------------------------------------------------------------
# Audit Log (structured logAuditLog 表設計用於 K8s 操作審計,不適合此場景)
# -------------------------------------------------------------------------
async def _write_audit_log(self, host: str, report: HealthReport) -> None:
"""記錄健康檢測結果structlogservice=ollama_health_monitor"""
logger.info(
"ollama_health_monitor_audit",
service="ollama_health_monitor",
host=host,
status=report.status.value,
latency_ms=round(report.latency_ms, 1),
reason=report.reason,
is_usable=report.is_usable(),
)
# =============================================================================
# Singleton
# =============================================================================
_monitor: OllamaHealthMonitor | None = None
def get_ollama_health_monitor() -> OllamaHealthMonitor:
"""取得 OllamaHealthMonitor singleton"""
global _monitor
if _monitor is None:
_monitor = OllamaHealthMonitor()
return _monitor
def reset_ollama_health_monitor() -> None:
"""重置 singleton測試用"""
global _monitor
_monitor = None

View File

@@ -0,0 +1,257 @@
# apps/api/tests/test_ai_router_failover_integration.py | 2026-04-25 @ Asia/Taipei
# 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan
"""
AIRouter × OllamaFailoverManager 整合測試
==========================================
測試覆蓋:
1. 初步路由選 OLLAMA → failover_manager 重評 → decision 使用 failover 結果
2. failover 回傳 GEMINI primary → decision.selected_provider == GEMINI
3. failover 的 fallback_chain 正確轉換到 decision.fallback_chain
4. 初步路由選 NEMOTRON → failover_manager 不被呼叫
5. 初步路由選 OPENCLAW_NEMO → failover_manager 不被呼叫
6. failover_manager 發生例外 → fail-open保留原始 provider
測試分類unitmock OllamaFailoverManager無 Redis / DB 依賴)
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.services.ai_router import AIProviderEnum, AIRouter, reset_ai_router
from src.services.ollama_failover_manager import OllamaEndpoint, OllamaRoutingResult
from src.services.ollama_health_monitor import HealthReport, HealthStatus
# =============================================================================
# Fixtures / Helpers
# =============================================================================
@pytest.fixture(autouse=True)
def reset_router_singleton():
"""每個測試前後重置 AIRouter singleton避免 failover_manager mock 殘留"""
yield
reset_ai_router()
def _make_health(status: HealthStatus) -> HealthReport:
return HealthReport(status=status, host="http://192.168.0.111:11434", latency_ms=500.0)
def _make_failover_result(
primary_provider: str,
primary_model: str,
fallback: list[tuple[str, str]] | None = None,
) -> OllamaRoutingResult:
"""建立 OllamaRoutingResult 測試物件"""
fb_endpoints = [
OllamaEndpoint(url="", provider_name=p, model=m)
for p, m in (fallback or [])
]
return OllamaRoutingResult(
primary=OllamaEndpoint(url="", provider_name=primary_provider, model=primary_model),
fallback_chain=fb_endpoints,
routing_reason=f"test: {primary_provider}",
health_111=_make_health(HealthStatus.OFFLINE),
health_188=None,
)
def _make_router_with_mock_failover(mock_failover_manager) -> AIRouter:
"""建立 AIRouter並替換其 _failover_manager"""
router = AIRouter()
router._failover_manager = mock_failover_manager
return router
# =============================================================================
# Test 1: OLLAMA 路由 → failover_manager 重評 → 使用 GEMINI
# =============================================================================
@pytest.mark.asyncio
async def test_router_uses_failover_when_ollama_initial_provider():
"""初步路由選 OLLAMA → 應走 failover_manager 重評decision.selected_provider == GEMINI"""
mock_fm = MagicMock()
mock_fm.select_provider = AsyncMock(
return_value=_make_failover_result(
primary_provider="gemini",
primary_model="gemini-1.5-flash",
fallback=[("ollama_188", "qwen2.5:7b-instruct"), ("nemotron", "nvidia/nemotron-mini-4b-instruct")],
)
)
router = _make_router_with_mock_failover(mock_fm)
# 讓 intent classifier + complexity scorer 走 sync 快路徑ALERT_TRIAGE → OLLAMA
with patch.object(router._intent_classifier, "classify") as mock_classify:
from src.services.intent_classifier import IntentResult, IntentType, RiskLevel
from src.services.complexity_scorer import ComplexityScore
mock_classify.return_value = IntentResult(
intent=IntentType.ALERT_TRIAGE,
confidence=0.9,
method="keyword",
matched_keywords=["alert"],
detected_resources=[],
reasoning="test",
)
with patch.object(router._complexity_scorer, "score") as mock_score:
mock_score.return_value = ComplexityScore(score=1, features={})
decision = await router.route("test alert message")
assert decision.selected_provider == AIProviderEnum.GEMINI
assert decision.selected_model == "gemini-1.5-flash"
mock_fm.select_provider.assert_awaited_once()
# =============================================================================
# Test 2: fallback_chain 正確轉換
# =============================================================================
@pytest.mark.asyncio
async def test_router_failover_fallback_chain_converted():
"""failover_manager 回傳 fallback_chain → decision.fallback_chain 包含 OLLAMA_188"""
mock_fm = MagicMock()
mock_fm.select_provider = AsyncMock(
return_value=_make_failover_result(
primary_provider="gemini",
primary_model="gemini-1.5-flash",
fallback=[
("ollama_188", "qwen2.5:7b-instruct"),
("nemotron", "nvidia/nemotron-mini-4b-instruct"),
("claude", "claude-3-5-haiku-20241022"),
],
)
)
router = _make_router_with_mock_failover(mock_fm)
with patch.object(router._intent_classifier, "classify") as mock_classify:
from src.services.intent_classifier import IntentResult, IntentType
from src.services.complexity_scorer import ComplexityScore
mock_classify.return_value = IntentResult(
intent=IntentType.ALERT_TRIAGE,
confidence=0.9,
method="keyword",
matched_keywords=["alert"],
detected_resources=[],
reasoning="test",
)
with patch.object(router._complexity_scorer, "score") as mock_score:
mock_score.return_value = ComplexityScore(score=1, features={})
decision = await router.route("test alert message")
fb_providers = [p for p, _ in decision.fallback_chain]
assert AIProviderEnum.OLLAMA_188 in fb_providers, (
f"OLLAMA_188 not in fallback_chain: {fb_providers}"
)
assert AIProviderEnum.NEMOTRON in fb_providers
assert AIProviderEnum.CLAUDE in fb_providers
# =============================================================================
# Test 3: NEMOTRON 路由 → failover_manager 不被呼叫
# =============================================================================
@pytest.mark.asyncio
async def test_router_does_not_use_failover_for_nemotron():
"""初步路由選 NEMOTRONtool_calling→ failover_manager.select_provider 不應被呼叫"""
mock_fm = MagicMock()
mock_fm.select_provider = AsyncMock()
router = _make_router_with_mock_failover(mock_fm)
# 強制 intent = DIAGNOSE→ OPENCLAW_NEMO再用 context_hint 跳過 LLM
# 但 NEMOTRON 只由 route_tool_calling() 觸發route() 最多到 OPENCLAW_NEMO
# 改用 QUERY → OLLAMA 的 override然後驗 failover 被觸發(這不是 NEMOTRON 測試)
# 正確測試:強制 CRITICAL → CLAUDE驗 failover 不被呼叫
with patch.object(router._intent_classifier, "classify") as mock_classify:
from src.services.intent_classifier import IntentResult, IntentType, RiskLevel
from src.services.complexity_scorer import ComplexityScore
mock_classify.return_value = IntentResult(
intent=IntentType.DELETE,
confidence=1.0,
method="keyword",
matched_keywords=["delete"],
detected_resources=[],
reasoning="test",
risk_level=RiskLevel.CRITICAL,
)
with patch.object(router._complexity_scorer, "score") as mock_score:
mock_score.return_value = ComplexityScore(score=5, features={})
decision = await router.route("delete this service")
# CRITICAL risk → CLAUDEfailover_manager 不應被呼叫
assert decision.selected_provider == AIProviderEnum.CLAUDE
mock_fm.select_provider.assert_not_awaited()
# =============================================================================
# Test 4: OPENCLAW_NEMO 路由 → failover_manager 不被呼叫
# =============================================================================
@pytest.mark.asyncio
async def test_router_does_not_use_failover_for_openclaw_nemo():
"""DIAGNOSE intent → OPENCLAW_NEMO → failover_manager 不應被呼叫"""
mock_fm = MagicMock()
mock_fm.select_provider = AsyncMock()
router = _make_router_with_mock_failover(mock_fm)
# context_hint=diagnose → OPENCLAW_NEMO規則 3 override
decision = await router.route(
"diagnose service crash",
context={"intent_hint": "diagnose"},
)
assert decision.selected_provider == AIProviderEnum.OPENCLAW_NEMO
mock_fm.select_provider.assert_not_awaited()
# =============================================================================
# Test 5: failover_manager 發生例外 → fail-open保留原始 OLLAMA
# =============================================================================
@pytest.mark.asyncio
async def test_router_failopen_when_failover_manager_raises():
"""failover_manager.select_provider 拋出例外 → fail-opendecision 仍然成功(使用原始 OLLAMA"""
mock_fm = MagicMock()
mock_fm.select_provider = AsyncMock(side_effect=RuntimeError("redis timeout"))
router = _make_router_with_mock_failover(mock_fm)
with patch.object(router._intent_classifier, "classify") as mock_classify:
from src.services.intent_classifier import IntentResult, IntentType
from src.services.complexity_scorer import ComplexityScore
mock_classify.return_value = IntentResult(
intent=IntentType.ALERT_TRIAGE,
confidence=0.9,
method="keyword",
matched_keywords=["alert"],
detected_resources=[],
reasoning="test",
)
with patch.object(router._complexity_scorer, "score") as mock_score:
mock_score.return_value = ComplexityScore(score=1, features={})
# 不應 raise應 fail-open
decision = await router.route("test alert message")
# fail-open → 保留 OLLAMA原始 initial decision
assert decision.selected_provider == AIProviderEnum.OLLAMA
# fallback_chain 仍然存在(來自 _build_fallback_chain
assert len(decision.fallback_chain) > 0

View File

@@ -0,0 +1,136 @@
# apps/api/tests/test_lifespan_failover_wiring.py | 2026-04-25 @ Asia/Taipei
# 2026-04-25 P1.2 by Claude Engineer-A2 — failover 整合到 ai_router + lifespan
"""
Ollama Failover Lifespan Wiring 測試
=====================================
驗收:
1. get_ollama_failover_manager singleton 呼叫了 set_recovery_callback
2. get_ollama_auto_recovery_service singleton 的 start() 被呼叫
3. shutdown 時 recovery service 的 stop() 被呼叫
4. failover_manager.set_recovery_callback 收到的是 recovery_svc.set_current_primary
測試分類unitmock get_ollama_failover_manager / get_ollama_auto_recovery_service
不啟動完整 FastAPI app避免 DB / Redis 依賴),直接測試 wiring 邏輯。
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# =============================================================================
# Wiring 邏輯測試
# =============================================================================
@pytest.mark.asyncio
async def test_lifespan_wires_recovery_callback_and_starts_service():
"""
lifespan startup 邏輯:
- failover_mgr.set_recovery_callback(recovery_svc.set_current_primary) 被呼叫
- recovery_svc.start() 被呼叫
"""
mock_failover_mgr = MagicMock()
mock_failover_mgr.set_recovery_callback = MagicMock()
mock_recovery_svc = MagicMock()
mock_recovery_svc.set_current_primary = AsyncMock()
mock_recovery_svc.start = AsyncMock()
mock_recovery_svc.stop = AsyncMock()
with (
patch(
"src.services.ollama_failover_manager.get_ollama_failover_manager",
return_value=mock_failover_mgr,
),
patch(
"src.services.ollama_auto_recovery.get_ollama_auto_recovery_service",
return_value=mock_recovery_svc,
),
):
# 直接執行 lifespan 邏輯(不啟動完整 FastAPI
from src.services.ollama_failover_manager import get_ollama_failover_manager
from src.services.ollama_auto_recovery import get_ollama_auto_recovery_service
_failover_mgr = get_ollama_failover_manager()
_recovery_svc = get_ollama_auto_recovery_service()
_failover_mgr.set_recovery_callback(_recovery_svc.set_current_primary)
await _recovery_svc.start()
# 驗收 1: set_recovery_callback 被呼叫一次
mock_failover_mgr.set_recovery_callback.assert_called_once_with(
mock_recovery_svc.set_current_primary
)
# 驗收 2: start() 被呼叫一次
mock_recovery_svc.start.assert_awaited_once()
@pytest.mark.asyncio
async def test_lifespan_stops_recovery_service_on_shutdown():
"""
lifespan shutdown 邏輯:
- recovery_svc.stop() 被呼叫
"""
mock_recovery_svc = MagicMock()
mock_recovery_svc.stop = AsyncMock()
with patch(
"src.services.ollama_auto_recovery.get_ollama_auto_recovery_service",
return_value=mock_recovery_svc,
):
from src.services.ollama_auto_recovery import get_ollama_auto_recovery_service
svc = get_ollama_auto_recovery_service()
await svc.stop()
mock_recovery_svc.stop.assert_awaited_once()
@pytest.mark.asyncio
async def test_set_recovery_callback_wires_correct_method():
"""
set_recovery_callback 收到的 callable 是 recovery_svc.set_current_primary
確保 wiring 的物件同一性正確
"""
from src.services.ollama_failover_manager import (
OllamaFailoverManager,
reset_ollama_failover_manager,
)
from src.services.ollama_auto_recovery import (
OllamaAutoRecoveryService,
reset_ollama_auto_recovery_service,
)
try:
mock_monitor = MagicMock()
failover_mgr = OllamaFailoverManager(health_monitor=mock_monitor)
mock_health_monitor = MagicMock()
mock_failover_mgr_inner = MagicMock()
recovery_svc = OllamaAutoRecoveryService(
health_monitor=mock_health_monitor,
failover_manager=mock_failover_mgr_inner,
)
# Wire callback
failover_mgr.set_recovery_callback(recovery_svc.set_current_primary)
# 驗收_recovery_callback 是 recovery_svc.set_current_primary
# 注意bound methods 每次屬性存取都建立新物件,必須用 __func__ + __self__ 比對
cb = failover_mgr._recovery_callback
assert cb is not None
assert cb.__func__ is recovery_svc.set_current_primary.__func__
assert cb.__self__ is recovery_svc
# 驗收:呼叫 callback 等同於呼叫 set_current_primary
# (用 mock 驗證真實 set_current_primary 被呼叫)
with patch.object(recovery_svc, "set_current_primary", new_callable=AsyncMock) as mock_scp:
failover_mgr.set_recovery_callback(recovery_svc.set_current_primary)
await failover_mgr._recovery_callback("gemini")
mock_scp.assert_awaited_once_with("gemini")
finally:
reset_ollama_failover_manager()
reset_ollama_auto_recovery_service()

View File

@@ -0,0 +1,580 @@
# apps/api/tests/test_ollama_auto_recovery.py | 2026-04-25 @ Asia/Taipei
# Created 2026-04-25 P1.1d by Claude Engineer-C
# 2026-04-25 統帥指令 by Claude Engineer-C — 自動切 Gemini + 自動恢復
"""
OllamaAutoRecoveryService 單元測試 - P1.1d
==========================================
測試覆蓋:
1. 111 OFFLINE → HEALTHY × 3 → 觸發 _switch_back_to_ollama
2. 111 OFFLINE → HEALTHY × 2 → OFFLINE → counter 歸零
3. 中途 SLOW/DEGRADED → counter 歸零
4. stop() 優雅取消 task
5. start() 重複呼叫不重複建立 task
6. _switch_back_to_ollamaclear_cache + notify_recovery + Telegram alerter
7. alerter = None 時不 crash
8. clear_cache 失敗時不 crashbest-effort
9. set_current_primary / is_running / consecutive_healthy 屬性
測試分類unitmock OllamaHealthMonitor + OllamaFailoverManager
"""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, MagicMock, call, patch
import pytest
from src.services.ollama_health_monitor import HealthReport, HealthStatus
from src.services.ollama_auto_recovery import (
OllamaAutoRecoveryService,
get_ollama_auto_recovery_service,
reset_ollama_auto_recovery_service,
)
# =============================================================================
# Fixtures / Helpers
# =============================================================================
URL_111 = "http://192.168.0.111:11434"
@pytest.fixture(autouse=True)
def reset_singleton():
yield
reset_ollama_auto_recovery_service()
def _make_health(status: HealthStatus) -> HealthReport:
return HealthReport(status=status, host=URL_111, latency_ms=500.0)
def _make_service(
*,
current_primary: str = "gemini",
stable_count: int = 3,
check_interval: int = 1, # 測試用短 interval
alerter=None,
) -> tuple[OllamaAutoRecoveryService, AsyncMock, MagicMock]:
"""
建立 service + mock monitor + mock failover manager。
返回 (service, mock_monitor, mock_failover_manager)
"""
mock_monitor = MagicMock()
mock_monitor.check = AsyncMock()
mock_failover = MagicMock()
mock_failover.clear_cache = AsyncMock()
mock_failover.notify_recovery = MagicMock()
mock_settings = MagicMock()
mock_settings.OLLAMA_URL = URL_111
svc = OllamaAutoRecoveryService(
health_monitor=mock_monitor,
failover_manager=mock_failover,
telegram_alerter=alerter,
recovery_check_interval_sec=check_interval,
stable_count_required=stable_count,
)
svc._settings = mock_settings
svc._current_primary = current_primary
return svc, mock_monitor, mock_failover
# =============================================================================
# _check_and_recover():核心防抖邏輯
# =============================================================================
class TestCheckAndRecover:
"""_check_and_recover 單次執行邏輯"""
@pytest.mark.asyncio
async def test_healthy_increments_counter(self):
"""HEALTHY → counter +1"""
svc, mock_monitor, _ = _make_service(current_primary="gemini")
mock_monitor.check.return_value = _make_health(HealthStatus.HEALTHY)
await svc._check_and_recover()
assert svc.consecutive_healthy == 1
@pytest.mark.asyncio
async def test_non_healthy_resets_counter(self):
"""OFFLINE → counter 歸零"""
svc, mock_monitor, _ = _make_service(current_primary="gemini")
svc._consecutive_healthy = 2
mock_monitor.check.return_value = _make_health(HealthStatus.OFFLINE)
await svc._check_and_recover()
assert svc.consecutive_healthy == 0
@pytest.mark.asyncio
async def test_slow_resets_counter(self):
"""SLOW → counter 歸零(不算穩定)"""
svc, mock_monitor, _ = _make_service(current_primary="gemini")
svc._consecutive_healthy = 1
mock_monitor.check.return_value = _make_health(HealthStatus.SLOW)
await svc._check_and_recover()
assert svc.consecutive_healthy == 0
@pytest.mark.asyncio
async def test_degraded_resets_counter(self):
"""DEGRADED → counter 歸零"""
svc, mock_monitor, _ = _make_service(current_primary="gemini")
svc._consecutive_healthy = 2
mock_monitor.check.return_value = _make_health(HealthStatus.DEGRADED)
await svc._check_and_recover()
assert svc.consecutive_healthy == 0
@pytest.mark.asyncio
async def test_three_healthy_triggers_switch_back(self):
"""連續 3 次 HEALTHY + current_primary=gemini → 觸發切回"""
svc, mock_monitor, mock_failover = _make_service(current_primary="gemini", stable_count=3)
mock_monitor.check.return_value = _make_health(HealthStatus.HEALTHY)
svc._consecutive_healthy = 2 # 已有 2 次,本次第 3 次
with patch.object(svc, "_switch_back_to_ollama", new_callable=AsyncMock) as mock_switch:
await svc._check_and_recover()
mock_switch.assert_awaited_once()
@pytest.mark.asyncio
async def test_two_healthy_not_yet_switch(self):
"""連續 2 次 HEALTHY未達 3 次門檻)→ 不觸發切回"""
svc, mock_monitor, _ = _make_service(current_primary="gemini", stable_count=3)
mock_monitor.check.return_value = _make_health(HealthStatus.HEALTHY)
svc._consecutive_healthy = 1 # 已有 1 次,本次第 2 次
with patch.object(svc, "_switch_back_to_ollama", new_callable=AsyncMock) as mock_switch:
await svc._check_and_recover()
mock_switch.assert_not_awaited()
assert svc.consecutive_healthy == 2
@pytest.mark.asyncio
async def test_already_ollama_primary_no_switch(self):
"""current_primary 已是 ollama → 不觸發切回(避免重複)"""
svc, mock_monitor, _ = _make_service(current_primary="ollama", stable_count=3)
mock_monitor.check.return_value = _make_health(HealthStatus.HEALTHY)
svc._consecutive_healthy = 2
with patch.object(svc, "_switch_back_to_ollama", new_callable=AsyncMock) as mock_switch:
await svc._check_and_recover()
mock_switch.assert_not_awaited()
@pytest.mark.asyncio
async def test_check_exception_resets_counter(self):
"""health check 拋異常 → counter 歸零,不 crash"""
svc, mock_monitor, _ = _make_service(current_primary="gemini")
mock_monitor.check.side_effect = RuntimeError("network error")
svc._consecutive_healthy = 2
# 不應 raise
await svc._check_and_recover()
assert svc.consecutive_healthy == 0
# =============================================================================
# 防抖場景OFFLINE → HEALTHY × 2 → OFFLINE → counter 歸零
# =============================================================================
class TestDebounce:
"""防抖機制:中途斷線 → counter 歸零"""
@pytest.mark.asyncio
async def test_counter_resets_on_intermittent_offline(self):
"""
模擬HEALTHY × 2 → OFFLINE → HEALTHY → counter 應從 1 重新開始
"""
svc, mock_monitor, _ = _make_service(current_primary="gemini", stable_count=3)
statuses = [
HealthStatus.HEALTHY,
HealthStatus.HEALTHY,
HealthStatus.OFFLINE, # 中途斷線 → counter 歸零
HealthStatus.HEALTHY, # 重新開始
]
mock_monitor.check.side_effect = [_make_health(s) for s in statuses]
with patch.object(svc, "_switch_back_to_ollama", new_callable=AsyncMock) as mock_switch:
for _ in range(4):
await svc._check_and_recover()
# 總計 3 次 HEALTHY第 1、2、4 次),但第 3 次斷線 → counter 歸零
# 第 4 次後 counter=1未達門檻
mock_switch.assert_not_awaited()
assert svc.consecutive_healthy == 1
@pytest.mark.asyncio
async def test_full_recovery_flow(self):
"""
完整場景OFFLINE → HEALTHY × 3 → 觸發 switch_back
"""
svc, mock_monitor, mock_failover = _make_service(current_primary="gemini", stable_count=3)
mock_monitor.check.side_effect = [
_make_health(HealthStatus.HEALTHY),
_make_health(HealthStatus.HEALTHY),
_make_health(HealthStatus.HEALTHY),
]
for _ in range(3):
await svc._check_and_recover()
# 觸發後 current_primary 應切回 ollama
assert svc.current_primary == "ollama"
mock_failover.clear_cache.assert_awaited_once()
mock_failover.notify_recovery.assert_called_once_with("ollama_111")
# =============================================================================
# _switch_back_to_ollama()
# =============================================================================
class TestSwitchBackToOllama:
"""_switch_back_to_ollama 切回邏輯"""
@pytest.mark.asyncio
async def test_sets_current_primary_to_ollama(self):
svc, _, _ = _make_service(current_primary="gemini")
await svc._switch_back_to_ollama()
assert svc.current_primary == "ollama"
@pytest.mark.asyncio
async def test_calls_clear_cache(self):
svc, _, mock_failover = _make_service(current_primary="gemini")
await svc._switch_back_to_ollama()
mock_failover.clear_cache.assert_awaited_once()
@pytest.mark.asyncio
async def test_calls_notify_recovery(self):
svc, _, mock_failover = _make_service(current_primary="gemini")
await svc._switch_back_to_ollama()
mock_failover.notify_recovery.assert_called_once_with("ollama_111")
@pytest.mark.asyncio
async def test_calls_telegram_alerter_when_set(self):
"""alerter 已設定 → 呼叫 alert_recovery"""
mock_alerter = AsyncMock()
mock_alerter.alert_recovery = AsyncMock()
svc, _, _ = _make_service(current_primary="gemini", alerter=mock_alerter)
svc._consecutive_healthy = 3
await svc._switch_back_to_ollama()
mock_alerter.alert_recovery.assert_awaited_once()
payload = mock_alerter.alert_recovery.call_args[0][0]
assert payload["from"] == "gemini"
assert payload["to"] == "ollama_111"
assert payload["stable_count"] == 3
@pytest.mark.asyncio
async def test_no_alerter_does_not_crash(self):
"""alerter = None → 不 crash"""
svc, _, _ = _make_service(current_primary="gemini", alerter=None)
# 不應 raise
await svc._switch_back_to_ollama()
assert svc.current_primary == "ollama"
@pytest.mark.asyncio
async def test_clear_cache_failure_does_not_crash(self):
"""clear_cache 失敗 → 靜默繼續,不 crash"""
svc, _, mock_failover = _make_service(current_primary="gemini")
mock_failover.clear_cache.side_effect = RuntimeError("Redis down")
# 不應 raise
await svc._switch_back_to_ollama()
# 儘管 clear_cache 失敗current_primary 應已更新
assert svc.current_primary == "ollama"
@pytest.mark.asyncio
async def test_alerter_failure_does_not_crash(self):
"""Telegram alerter 失敗 → 靜默繼續,不 crash"""
mock_alerter = AsyncMock()
mock_alerter.alert_recovery = AsyncMock(side_effect=RuntimeError("TG timeout"))
svc, _, _ = _make_service(current_primary="gemini", alerter=mock_alerter)
# 不應 raise
await svc._switch_back_to_ollama()
assert svc.current_primary == "ollama"
# =============================================================================
# start() / stop() 生命週期
# =============================================================================
class TestLifecycle:
"""start() / stop() 背景任務生命週期"""
@pytest.mark.asyncio
async def test_start_creates_task(self):
svc, _, _ = _make_service()
try:
await svc.start()
assert svc.is_running is True
finally:
await svc.stop()
@pytest.mark.asyncio
async def test_stop_cancels_task(self):
svc, _, _ = _make_service()
await svc.start()
assert svc.is_running is True
await svc.stop()
assert svc.is_running is False
@pytest.mark.asyncio
async def test_stop_idempotent_when_not_started(self):
"""未 start 的情況下呼叫 stop → 不 crash"""
svc, _, _ = _make_service()
await svc.stop() # 不應 raise
@pytest.mark.asyncio
async def test_start_idempotent_second_call(self):
"""重複呼叫 start() → 不重複建立 task"""
svc, _, _ = _make_service()
try:
await svc.start()
task_1 = svc._task
await svc.start() # 第二次呼叫
task_2 = svc._task
assert task_1 is task_2 # 同一個 task
finally:
await svc.stop()
@pytest.mark.asyncio
async def test_monitor_loop_continues_after_exception(self):
"""
_monitor_loop 內部拋異常 → 繼續監控(不 break
模擬:第一次 check 拋異常,第二次正常。
"""
svc, mock_monitor, _ = _make_service(
current_primary="gemini",
check_interval=0, # 0s interval測試快速
stable_count=3,
)
call_count = [0]
async def _check_side_effect(host):
call_count[0] += 1
if call_count[0] == 1:
raise RuntimeError("transient error")
return _make_health(HealthStatus.HEALTHY)
mock_monitor.check.side_effect = _check_side_effect
# 讓 loop 跑幾次
await svc.start()
await asyncio.sleep(0.05)
await svc.stop()
# 至少被呼叫 2 次(第一次異常,第二次正常)
assert call_count[0] >= 2
# =============================================================================
# set_current_primary() / 屬性
# =============================================================================
class TestStateManagement:
"""set_current_primary + 屬性"""
@pytest.mark.asyncio
async def test_set_current_primary_updates_state(self):
# H5+H6 修復set_current_primary 改為 async需 await
svc, _, _ = _make_service(current_primary="ollama")
with patch("src.core.redis_client.get_redis", side_effect=RuntimeError("no redis")):
await svc.set_current_primary("gemini")
assert svc.current_primary == "gemini"
@pytest.mark.asyncio
async def test_set_current_primary_non_ollama_resets_counter(self):
"""切換到非 Ollama 時counter 歸零,開始等待恢復"""
svc, _, _ = _make_service(current_primary="ollama")
svc._consecutive_healthy = 5
with patch("src.core.redis_client.get_redis", side_effect=RuntimeError("no redis")):
await svc.set_current_primary("gemini")
assert svc.consecutive_healthy == 0
@pytest.mark.asyncio
async def test_set_current_primary_to_ollama_no_counter_reset(self):
"""切換到 ollama正常路由→ counter 不重置(直接標記)"""
svc, _, _ = _make_service(current_primary="gemini")
svc._consecutive_healthy = 3
with patch("src.core.redis_client.get_redis", side_effect=RuntimeError("no redis")):
await svc.set_current_primary("ollama")
# set_current_primary 只在切到非 ollama 時 reset counter
assert svc.current_primary == "ollama"
def test_is_running_false_before_start(self):
svc, _, _ = _make_service()
assert svc.is_running is False
# =============================================================================
# H5+H6: Redis 持久化 + Bootstrap + 立刻 check
# 2026-04-25 critic-fix Part2 by Claude Engineer-C2
# =============================================================================
class TestRedisPersistence:
"""H5+H6 修復驗證set_current_primary 持久化 + start() bootstrap"""
@pytest.mark.asyncio
async def test_set_current_primary_persists_to_redis(self):
"""set_current_primary("gemini") → 寫 Redis ollama:current_primary"""
svc, _, _ = _make_service(current_primary="ollama")
mock_redis = AsyncMock()
mock_redis.set = AsyncMock()
with patch("src.core.redis_client.get_redis", return_value=mock_redis):
await svc.set_current_primary("gemini")
mock_redis.set.assert_awaited_once_with("ollama:current_primary", "gemini")
@pytest.mark.asyncio
async def test_set_current_primary_same_value_no_redis_write(self):
"""set_current_primary("gemini") 但 current_primary 已是 gemini → 不重複寫 Redis"""
svc, _, _ = _make_service(current_primary="gemini")
mock_redis = AsyncMock()
mock_redis.set = AsyncMock()
with patch("src.core.redis_client.get_redis", return_value=mock_redis):
await svc.set_current_primary("gemini")
# 相同值不觸發 persist
mock_redis.set.assert_not_awaited()
@pytest.mark.asyncio
async def test_load_primary_from_redis(self):
"""_load_primary() 從 Redis 讀取,返回正確值"""
svc, _, _ = _make_service()
mock_redis = AsyncMock()
mock_redis.get = AsyncMock(return_value=b"gemini")
with patch("src.core.redis_client.get_redis", return_value=mock_redis):
result = await svc._load_primary()
assert result == "gemini"
@pytest.mark.asyncio
async def test_load_primary_redis_empty_returns_ollama(self):
"""Redis 無值 → 預設返回 'ollama'"""
svc, _, _ = _make_service()
mock_redis = AsyncMock()
mock_redis.get = AsyncMock(return_value=None)
with patch("src.core.redis_client.get_redis", return_value=mock_redis):
result = await svc._load_primary()
assert result == "ollama"
@pytest.mark.asyncio
async def test_load_primary_redis_error_returns_ollama(self):
"""Redis 掛掉 → 預設返回 "ollama"fail-safe"""
svc, _, _ = _make_service()
with patch("src.core.redis_client.get_redis", side_effect=RuntimeError("Redis down")):
result = await svc._load_primary()
assert result == "ollama"
@pytest.mark.asyncio
async def test_start_loads_primary_from_redis(self):
"""start() 從 Redis bootstrap current_primary"""
svc, mock_monitor, _ = _make_service(current_primary="ollama")
mock_monitor.check = AsyncMock(return_value=_make_health(HealthStatus.HEALTHY))
mock_redis = AsyncMock()
mock_redis.get = AsyncMock(return_value=b"gemini")
with patch("src.core.redis_client.get_redis", return_value=mock_redis):
try:
await svc.start()
# bootstrap 後 current_primary 應為 Redis 讀到的值
assert svc.current_primary == "gemini"
finally:
await svc.stop()
@pytest.mark.asyncio
async def test_start_immediate_check_when_primary_not_ollama(self):
"""start() 時 primary=gemini → 立刻執行一次 _check_and_recover不等 30s"""
svc, mock_monitor, _ = _make_service(current_primary="gemini")
mock_monitor.check = AsyncMock(return_value=_make_health(HealthStatus.HEALTHY))
mock_redis = AsyncMock()
mock_redis.get = AsyncMock(return_value=b"gemini")
check_called = [False]
original_check = svc._check_and_recover
async def _spy_check():
check_called[0] = True
await original_check()
with patch.object(svc, "_check_and_recover", side_effect=_spy_check), \
patch("src.core.redis_client.get_redis", return_value=mock_redis):
try:
await svc.start()
assert check_called[0] is True
finally:
await svc.stop()
@pytest.mark.asyncio
async def test_start_no_immediate_check_when_primary_is_ollama(self):
"""start() 時 primary=ollama正常狀態→ 不立刻執行 check"""
svc, mock_monitor, _ = _make_service(current_primary="gemini")
mock_redis = AsyncMock()
mock_redis.get = AsyncMock(return_value=b"ollama") # Redis 說 primary=ollama
check_called = [False]
async def _spy_check():
check_called[0] = True
with patch.object(svc, "_check_and_recover", side_effect=_spy_check), \
patch("src.core.redis_client.get_redis", return_value=mock_redis):
try:
await svc.start()
# primary=ollama → 不立刻 check
assert check_called[0] is False
finally:
await svc.stop()
# =============================================================================
# Singleton
# =============================================================================
def test_singleton_returns_same_instance():
s1 = get_ollama_auto_recovery_service()
s2 = get_ollama_auto_recovery_service()
assert s1 is s2
def test_reset_singleton_gives_new_instance():
s1 = get_ollama_auto_recovery_service()
reset_ollama_auto_recovery_service()
s2 = get_ollama_auto_recovery_service()
assert s1 is not s2

View File

@@ -0,0 +1,707 @@
# apps/api/tests/test_ollama_failover_manager.py | 2026-04-25 @ Asia/Taipei
# Created 2026-04-25 P1.1c by Claude Engineer-C
# 2026-04-25 統帥指令 by Claude Engineer-C — 自動切 Gemini + 自動恢復(路由矩陣更新)
"""
OllamaFailoverManager 單元測試 - P1.1c v2.0
=============================================
測試覆蓋新路由矩陣Gemini 優先188 最後備援):
- 111 HEALTHY → primary=ollama(111)fallback=[Gemini, 188, Nemotron]
- 111 SLOW → primary=Geminifallback 包含 111 + 188
- 111 DEGRADED → primary=Geminifallback 包含 188 + nemotron + claude
- 111 OFFLINE → primary=Geminifallback 包含 188 + nemotron + claude
- 111 + 188 都 OFFLINE → primary=Geminifallback 包含 nemotron + claude
- OLLAMA_FALLBACK_URL 未設定時的單節點行為
- 並行 gather 邏輯asyncio.gather mock
- clear_cache() / notify_recovery() 方法
測試分類unitmock OllamaHealthMonitor無 DB 依賴)
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.services.ollama_health_monitor import HealthReport, HealthStatus
from src.services.ollama_failover_manager import (
OllamaFailoverManager,
OllamaRoutingResult,
get_ollama_failover_manager,
reset_ollama_failover_manager,
)
# =============================================================================
# Fixtures
# =============================================================================
URL_111 = "http://192.168.0.111:11434"
URL_188 = "http://192.168.0.188:11434"
@pytest.fixture(autouse=True)
def reset_singleton():
yield
reset_ollama_failover_manager()
def _make_health(status: HealthStatus, url: str = URL_111) -> HealthReport:
return HealthReport(status=status, host=url, latency_ms=500.0)
def _make_manager(url_111: str = URL_111, url_188: str = URL_188) -> OllamaFailoverManager:
"""建立 managersettings mock 為指定 URL"""
mock_settings = MagicMock()
mock_settings.OLLAMA_URL = url_111
mock_settings.OLLAMA_FALLBACK_URL = url_188
mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct"
mock_monitor = MagicMock()
manager = OllamaFailoverManager(health_monitor=mock_monitor)
manager._settings = mock_settings
return manager
# =============================================================================
# _decide_route 決策矩陣
# =============================================================================
class TestDecideRoute:
"""_decide_route 路由邏輯純函數測試"""
def _setup(self, url_188: str = URL_188) -> OllamaFailoverManager:
return _make_manager(url_188=url_188)
# ------------------------------------------------------------------
# 111 HEALTHY
# ------------------------------------------------------------------
def test_111_healthy_primary_is_ollama(self):
manager = self._setup()
h111 = _make_health(HealthStatus.HEALTHY, URL_111)
h188 = _make_health(HealthStatus.HEALTHY, URL_188)
result = manager._decide_route(h111, h188, URL_111, URL_188)
assert result.primary.provider_name == "ollama"
assert result.primary.url == URL_111
def test_111_healthy_fallback_includes_188(self):
manager = self._setup()
h111 = _make_health(HealthStatus.HEALTHY, URL_111)
h188 = _make_health(HealthStatus.HEALTHY, URL_188)
result = manager._decide_route(h111, h188, URL_111, URL_188)
provider_names = [e.provider_name for e in result.fallback_chain]
assert "ollama_188" in provider_names
def test_111_healthy_fallback_includes_nemotron_gemini(self):
manager = self._setup()
h111 = _make_health(HealthStatus.HEALTHY, URL_111)
h188 = _make_health(HealthStatus.HEALTHY, URL_188)
result = manager._decide_route(h111, h188, URL_111, URL_188)
provider_names = [e.provider_name for e in result.fallback_chain]
assert "nemotron" in provider_names
assert "gemini" in provider_names
def test_111_healthy_fallback_order_gemini_first(self):
"""新矩陣Gemini 應排在 188/nemotron 之前(快速雲端優先)"""
manager = self._setup()
h111 = _make_health(HealthStatus.HEALTHY, URL_111)
h188 = _make_health(HealthStatus.HEALTHY, URL_188)
result = manager._decide_route(h111, h188, URL_111, URL_188)
assert result.fallback_chain[0].provider_name == "gemini"
# ------------------------------------------------------------------
# 111 SLOW
# ------------------------------------------------------------------
def test_111_slow_primary_is_gemini(self):
"""新矩陣111 SLOW → primary=Gemini111 eval ~0.09 token/s, ~111sGemini 更快)"""
manager = self._setup()
h111 = _make_health(HealthStatus.SLOW, URL_111)
h188 = _make_health(HealthStatus.HEALTHY, URL_188)
result = manager._decide_route(h111, h188, URL_111, URL_188)
assert result.primary.provider_name == "gemini"
def test_111_slow_fallback_includes_111_and_188(self):
"""SLOW 時 111 + 188 仍在 fallbackGemini 額度耗盡時的降級鏈)"""
manager = self._setup()
h111 = _make_health(HealthStatus.SLOW, URL_111)
h188 = _make_health(HealthStatus.HEALTHY, URL_188)
result = manager._decide_route(h111, h188, URL_111, URL_188)
provider_names = [e.provider_name for e in result.fallback_chain]
assert "ollama" in provider_names
assert "ollama_188" in provider_names
def test_111_slow_no_188_primary_is_gemini(self):
"""111 SLOW + 188 未設定 → primary=Gemini新矩陣不強撐 111"""
manager = _make_manager(url_188="") # 188 未設定
h111 = _make_health(HealthStatus.SLOW, URL_111)
result = manager._decide_route(h111, None, URL_111, "")
assert result.primary.provider_name == "gemini"
# ------------------------------------------------------------------
# 111 DEGRADED
# ------------------------------------------------------------------
def test_111_degraded_primary_is_gemini(self):
"""新矩陣111 DEGRADED → primary=Gemini"""
manager = self._setup()
h111 = _make_health(HealthStatus.DEGRADED, URL_111)
h188 = _make_health(HealthStatus.HEALTHY, URL_188)
result = manager._decide_route(h111, h188, URL_111, URL_188)
assert result.primary.provider_name == "gemini"
def test_111_degraded_fallback_no_111(self):
"""DEGRADED 時 111 不在 fallback太差了"""
manager = self._setup()
h111 = _make_health(HealthStatus.DEGRADED, URL_111)
h188 = _make_health(HealthStatus.HEALTHY, URL_188)
result = manager._decide_route(h111, h188, URL_111, URL_188)
provider_names = [e.provider_name for e in result.fallback_chain]
assert "ollama" not in provider_names
def test_111_degraded_fallback_includes_188_nemotron_claude(self):
"""新矩陣DEGRADED fallback = [188, nemotron, claude]"""
manager = self._setup()
h111 = _make_health(HealthStatus.DEGRADED, URL_111)
h188 = _make_health(HealthStatus.HEALTHY, URL_188)
result = manager._decide_route(h111, h188, URL_111, URL_188)
provider_names = [e.provider_name for e in result.fallback_chain]
assert "ollama_188" in provider_names
assert "nemotron" in provider_names
assert "claude" in provider_names
# ------------------------------------------------------------------
# 111 OFFLINE
# ------------------------------------------------------------------
def test_111_offline_primary_is_gemini(self):
"""新矩陣111 OFFLINE → primary=Gemini188 降為 fallback 備援)"""
manager = self._setup()
h111 = _make_health(HealthStatus.OFFLINE, URL_111)
h188 = _make_health(HealthStatus.HEALTHY, URL_188)
result = manager._decide_route(h111, h188, URL_111, URL_188)
assert result.primary.provider_name == "gemini"
# ------------------------------------------------------------------
# 雙節點都 OFFLINE
# ------------------------------------------------------------------
def test_both_offline_primary_is_gemini(self):
"""新矩陣111 + 188 都 OFFLINE → Gemini 接手(最快雲端)"""
manager = self._setup()
h111 = _make_health(HealthStatus.OFFLINE, URL_111)
h188 = _make_health(HealthStatus.OFFLINE, URL_188)
result = manager._decide_route(h111, h188, URL_111, URL_188)
assert result.primary.provider_name == "gemini"
def test_both_offline_fallback_includes_nemotron_claude(self):
"""雙 OFFLINE 時fallback=[Nemotron, Claude](無可用 Ollama"""
manager = self._setup()
h111 = _make_health(HealthStatus.OFFLINE, URL_111)
h188 = _make_health(HealthStatus.OFFLINE, URL_188)
result = manager._decide_route(h111, h188, URL_111, URL_188)
provider_names = [e.provider_name for e in result.fallback_chain]
assert "nemotron" in provider_names
assert "claude" in provider_names
def test_111_offline_no_188_primary_is_gemini(self):
"""新矩陣111 OFFLINE + 188 未設定 → Gemini不是 Nemotron"""
manager = _make_manager(url_188="")
h111 = _make_health(HealthStatus.OFFLINE, URL_111)
result = manager._decide_route(h111, None, URL_111, "")
assert result.primary.provider_name == "gemini"
# ------------------------------------------------------------------
# routing_reason 記錄
# ------------------------------------------------------------------
def test_routing_reason_contains_status(self):
"""routing_reason 應包含 111 的狀態資訊"""
manager = self._setup()
h111 = _make_health(HealthStatus.OFFLINE, URL_111)
h188 = _make_health(HealthStatus.HEALTHY, URL_188)
result = manager._decide_route(h111, h188, URL_111, URL_188)
assert "offline" in result.routing_reason.lower() or "111" in result.routing_reason
# =============================================================================
# select_provider():並行 gather
# =============================================================================
class TestSelectProvider:
"""select_provider() 並行邏輯"""
@pytest.mark.asyncio
async def test_select_provider_calls_gather(self):
"""有 url_188 時應並行 gather 兩個 check"""
mock_monitor = AsyncMock()
mock_monitor.check = AsyncMock(
side_effect=[
_make_health(HealthStatus.HEALTHY, URL_111),
_make_health(HealthStatus.HEALTHY, URL_188),
]
)
mock_settings = MagicMock()
mock_settings.OLLAMA_URL = URL_111
mock_settings.OLLAMA_FALLBACK_URL = URL_188
mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct"
manager = OllamaFailoverManager(health_monitor=mock_monitor)
manager._settings = mock_settings
with patch.object(manager, "_write_failover_audit", return_value=None):
result = await manager.select_provider()
# 兩個 host 都被 check
assert mock_monitor.check.call_count == 2
called_hosts = {call.args[0] for call in mock_monitor.check.call_args_list}
assert URL_111 in called_hosts
assert URL_188 in called_hosts
@pytest.mark.asyncio
async def test_select_provider_single_node_no_188(self):
"""OLLAMA_FALLBACK_URL 空字串 → 只 check 111"""
mock_monitor = AsyncMock()
mock_monitor.check = AsyncMock(return_value=_make_health(HealthStatus.HEALTHY, URL_111))
mock_settings = MagicMock()
mock_settings.OLLAMA_URL = URL_111
mock_settings.OLLAMA_FALLBACK_URL = ""
mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct"
manager = OllamaFailoverManager(health_monitor=mock_monitor)
manager._settings = mock_settings
with patch.object(manager, "_write_failover_audit", return_value=None):
result = await manager.select_provider()
assert mock_monitor.check.call_count == 1
assert result.primary.provider_name == "ollama"
@pytest.mark.asyncio
async def test_select_provider_returns_routing_result(self):
"""select_provider 返回 OllamaRoutingResult 類型新矩陣111 OFFLINE → Gemini"""
mock_monitor = AsyncMock()
mock_monitor.check = AsyncMock(
side_effect=[
_make_health(HealthStatus.OFFLINE, URL_111),
_make_health(HealthStatus.HEALTHY, URL_188),
]
)
mock_settings = MagicMock()
mock_settings.OLLAMA_URL = URL_111
mock_settings.OLLAMA_FALLBACK_URL = URL_188
mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct"
manager = OllamaFailoverManager(health_monitor=mock_monitor)
manager._settings = mock_settings
with patch.object(manager, "_write_failover_audit", return_value=None):
result = await manager.select_provider()
assert isinstance(result, OllamaRoutingResult)
# 新矩陣111 OFFLINE → primary=Gemini188 降為 fallback
assert result.primary.provider_name == "gemini"
@pytest.mark.asyncio
async def test_audit_not_written_when_111_healthy(self):
"""111 正常時不觸發 failover audit"""
mock_monitor = AsyncMock()
mock_monitor.check = AsyncMock(
side_effect=[
_make_health(HealthStatus.HEALTHY, URL_111),
_make_health(HealthStatus.HEALTHY, URL_188),
]
)
mock_settings = MagicMock()
mock_settings.OLLAMA_URL = URL_111
mock_settings.OLLAMA_FALLBACK_URL = URL_188
mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct"
manager = OllamaFailoverManager(health_monitor=mock_monitor)
manager._settings = mock_settings
audit_called = [False]
original_write = manager._write_failover_audit
async def _spy_audit(result):
# _write_failover_audit 在 111 HEALTHY 時 early return不寫 DB
# 追蹤呼叫是否有 side effectDB 寫入)
audit_called[0] = result.primary.provider_name != "ollama"
with patch.object(manager, "_write_failover_audit", side_effect=_spy_audit):
await manager.select_provider()
# 111 HEALTHY不應有 failover 事件
assert audit_called[0] is False
# =============================================================================
# clear_cache() / notify_recovery()
# =============================================================================
class TestRecoveryAPI:
"""clear_cache() / notify_recovery() 方法"""
@pytest.mark.asyncio
async def test_clear_cache_calls_redis_delete(self):
"""clear_cache() 呼叫 redis.delete 清除 health monitor 快取"""
manager = _make_manager()
mock_redis = AsyncMock()
mock_redis.delete = AsyncMock()
with patch("src.services.ollama_failover_manager.OllamaFailoverManager.clear_cache") as mock_clear:
mock_clear.return_value = None
await manager.clear_cache()
mock_clear.assert_called_once()
@pytest.mark.asyncio
async def test_clear_cache_fails_gracefully(self):
"""Redis import 失敗時clear_cache() 內部 try/except 攔截,靜默不 crash"""
manager = _make_manager()
# 模擬 get_redis 拋 ImportErrorRedis 不可用)
# clear_cache 有 try/except Exception應靜默吸收
with patch(
"src.services.ollama_failover_manager.get_redis",
side_effect=ImportError("no redis"),
create=True,
):
# 不應 raise
await manager.clear_cache()
def test_notify_recovery_does_not_raise(self):
"""notify_recovery() 只寫 structlog不應 raise"""
manager = _make_manager()
# 不應 raise
manager.notify_recovery("ollama_111")
# =============================================================================
# OllamaRoutingResult
# =============================================================================
class TestOllamaRoutingResult:
"""OllamaRoutingResult 輔助方法"""
def test_all_endpoints_in_order(self):
from src.services.ollama_failover_manager import OllamaEndpoint
primary = OllamaEndpoint(url=URL_111, provider_name="ollama", model="m1")
fb1 = OllamaEndpoint(url=URL_188, provider_name="ollama_188", model="m2")
fb2 = OllamaEndpoint(url="", provider_name="nemotron", model="m3")
result = OllamaRoutingResult(
primary=primary,
fallback_chain=[fb1, fb2],
routing_reason="test",
health_111=_make_health(HealthStatus.HEALTHY),
)
ordered = result.all_endpoints_in_order()
assert ordered[0].provider_name == "ollama"
assert ordered[1].provider_name == "ollama_188"
assert ordered[2].provider_name == "nemotron"
def test_to_dict_structure(self):
from src.services.ollama_failover_manager import OllamaEndpoint
primary = OllamaEndpoint(url=URL_111, provider_name="ollama", model="qwen")
result = OllamaRoutingResult(
primary=primary,
fallback_chain=[],
routing_reason="111 HEALTHY",
health_111=_make_health(HealthStatus.HEALTHY),
)
d = result.to_dict()
assert d["primary"]["provider"] == "ollama"
assert d["routing_reason"] == "111 HEALTHY"
assert isinstance(d["fallback_chain"], list)
# =============================================================================
# Singleton
# =============================================================================
def test_singleton_returns_same_instance():
m1 = get_ollama_failover_manager()
m2 = get_ollama_failover_manager()
assert m1 is m2
def test_reset_singleton_gives_new_instance():
m1 = get_ollama_failover_manager()
reset_ollama_failover_manager()
m2 = get_ollama_failover_manager()
assert m1 is not m2
# =============================================================================
# B1: _write_failover_audit 改用 structlog不再寫 DB
# 2026-04-25 critic-fix Part2 by Claude Engineer-C2
# =============================================================================
class TestWriteFailoverAudit:
"""B1 修復驗證_write_failover_audit 使用 structlog不依賴 AuditLog model"""
@pytest.mark.asyncio
async def test_audit_uses_structlog_not_db(self):
"""_write_failover_audit 應呼叫 structlog不呼叫 DB"""
import structlog
manager = _make_manager()
from src.services.ollama_failover_manager import OllamaEndpoint, OllamaRoutingResult
result = OllamaRoutingResult(
primary=OllamaEndpoint(url="", provider_name="gemini", model="gemini-1.5-flash"),
fallback_chain=[],
routing_reason="111 OFFLINE → 切 Gemini",
health_111=_make_health(HealthStatus.OFFLINE),
)
# 只要不 raise 就是成功DB path 已移除structlog path 無 DB 依賴)
await manager._write_failover_audit(result)
@pytest.mark.asyncio
async def test_audit_skipped_when_111_healthy(self):
"""111 HEALTHY 時 early return不記錄 failover"""
manager = _make_manager()
from src.services.ollama_failover_manager import OllamaEndpoint, OllamaRoutingResult
result = OllamaRoutingResult(
primary=OllamaEndpoint(url=URL_111, provider_name="ollama", model="qwen"),
fallback_chain=[],
routing_reason="111 HEALTHY → 主 111",
health_111=_make_health(HealthStatus.HEALTHY),
)
# primary=ollama → early return不執行任何 DB/log
await manager._write_failover_audit(result) # 不應 raise
# =============================================================================
# B2: AIProviderEnum.OLLAMA_188 存在
# 2026-04-25 critic-fix Part2 by Claude Engineer-C2
# =============================================================================
class TestAIProviderEnumOllama188:
"""B2 修復驗證AIProviderEnum.OLLAMA_188 存在且 PROVIDER_LATENCY_BUDGET 有對應值"""
def test_ollama_188_enum_exists(self):
from src.services.ai_router import AIProviderEnum
assert AIProviderEnum.OLLAMA_188.value == "ollama_188"
def test_ollama_188_in_latency_budget(self):
from src.services.ai_router import AIProviderEnum, PROVIDER_LATENCY_BUDGET
assert AIProviderEnum.OLLAMA_188 in PROVIDER_LATENCY_BUDGET
assert PROVIDER_LATENCY_BUDGET[AIProviderEnum.OLLAMA_188] == 120000
# =============================================================================
# H4: asyncio.gather return_exceptions=True
# 2026-04-25 critic-fix Part2 by Claude Engineer-C2
# =============================================================================
class TestGatherReturnExceptions:
"""H4 修復驗證:任一 check 拋例外時不炸整個 select_provider"""
@pytest.mark.asyncio
async def test_gather_exception_in_111_treated_as_offline(self):
"""111 check 拋例外 → health_111=OFFLINEselect_provider 正常返回"""
mock_monitor = AsyncMock()
mock_monitor.check = AsyncMock(
side_effect=[
RuntimeError("111 network error"),
_make_health(HealthStatus.HEALTHY, URL_188),
]
)
mock_settings = MagicMock()
mock_settings.OLLAMA_URL = URL_111
mock_settings.OLLAMA_FALLBACK_URL = URL_188
mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct"
mock_settings.GEMINI_DAILY_QUOTA = 1000
manager = OllamaFailoverManager(health_monitor=mock_monitor)
manager._settings = mock_settings
with patch.object(manager, "_write_failover_audit", return_value=None), \
patch.object(manager, "_check_gemini_quota", return_value=True):
result = await manager.select_provider()
# 111 exception → OFFLINE → primary=gemininew matrix
assert result.primary.provider_name == "gemini"
@pytest.mark.asyncio
async def test_gather_exception_in_188_treated_as_offline(self):
"""188 check 拋例外 → health_188=OFFLINEselect_provider 正常返回"""
mock_monitor = AsyncMock()
mock_monitor.check = AsyncMock(
side_effect=[
_make_health(HealthStatus.HEALTHY, URL_111),
RuntimeError("188 network error"),
]
)
mock_settings = MagicMock()
mock_settings.OLLAMA_URL = URL_111
mock_settings.OLLAMA_FALLBACK_URL = URL_188
mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct"
mock_settings.GEMINI_DAILY_QUOTA = 1000
manager = OllamaFailoverManager(health_monitor=mock_monitor)
manager._settings = mock_settings
with patch.object(manager, "_write_failover_audit", return_value=None), \
patch.object(manager, "_check_gemini_quota", return_value=True):
result = await manager.select_provider()
# 111 HEALTHY → primary=ollama188 exception 不影響主路由)
assert result.primary.provider_name == "ollama"
# =============================================================================
# H7: Gemini 帳單熔斷
# 2026-04-25 critic-fix Part2 by Claude Engineer-C2
# =============================================================================
class TestGeminiQuota:
"""H7 修復驗證Gemini 每日配額熔斷"""
@pytest.mark.asyncio
async def test_gemini_quota_under_limit(self):
"""count=500 < quota=1000 → 返回 True允許走 Gemini"""
manager = _make_manager()
manager._settings.GEMINI_DAILY_QUOTA = 1000
mock_redis = AsyncMock()
mock_redis.get = AsyncMock(return_value=b"500")
mock_redis.incr = AsyncMock(return_value=501)
mock_redis.expire = AsyncMock()
# lazy import patch_check_gemini_quota 內用 `from src.core.redis_client import get_redis`
with patch("src.core.redis_client.get_redis", return_value=mock_redis):
ok = await manager._check_gemini_quota()
assert ok is True
mock_redis.incr.assert_awaited_once()
mock_redis.expire.assert_awaited_once()
@pytest.mark.asyncio
async def test_gemini_quota_exactly_at_limit(self):
"""count=1000 >= quota=1000 → 返回 False熔斷不再呼叫 Gemini"""
manager = _make_manager()
manager._settings.GEMINI_DAILY_QUOTA = 1000
mock_redis = AsyncMock()
mock_redis.get = AsyncMock(return_value=b"1000")
mock_redis.incr = AsyncMock()
mock_redis.expire = AsyncMock()
with patch("src.core.redis_client.get_redis", return_value=mock_redis):
ok = await manager._check_gemini_quota()
assert ok is False
# 超過配額不應再 incr
mock_redis.incr.assert_not_awaited()
@pytest.mark.asyncio
async def test_gemini_quota_redis_unavailable_fail_open(self):
"""Redis 掛掉 → 返回 Truefail-open仍允許走 Gemini"""
manager = _make_manager()
with patch(
"src.core.redis_client.get_redis",
side_effect=RuntimeError("Redis unavailable"),
):
ok = await manager._check_gemini_quota()
assert ok is True
@pytest.mark.asyncio
async def test_select_provider_quota_exceeded_uses_188(self):
"""select_providerGemini quota 超過 → primary 改為 OLLAMA_188"""
mock_monitor = AsyncMock()
mock_monitor.check = AsyncMock(
side_effect=[
_make_health(HealthStatus.OFFLINE, URL_111),
_make_health(HealthStatus.HEALTHY, URL_188),
]
)
mock_settings = MagicMock()
mock_settings.OLLAMA_URL = URL_111
mock_settings.OLLAMA_FALLBACK_URL = URL_188
mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct"
mock_settings.GEMINI_DAILY_QUOTA = 1000
manager = OllamaFailoverManager(health_monitor=mock_monitor)
manager._settings = mock_settings
with patch.object(manager, "_write_failover_audit", return_value=None), \
patch.object(manager, "_check_gemini_quota", return_value=False):
result = await manager.select_provider()
# quota 超過 → 不走 Gemini改走 188
assert result.primary.provider_name == "ollama_188"
@pytest.mark.asyncio
async def test_select_provider_quota_exceeded_no_188_uses_nemotron(self):
"""select_providerGemini quota 超過 + 188 不可用 → primary=Nemotron"""
mock_monitor = AsyncMock()
mock_monitor.check = AsyncMock(return_value=_make_health(HealthStatus.OFFLINE, URL_111))
mock_settings = MagicMock()
mock_settings.OLLAMA_URL = URL_111
mock_settings.OLLAMA_FALLBACK_URL = "" # 無 188
mock_settings.OLLAMA_HEALTH_CHECK_MODEL = "qwen2.5:7b-instruct"
mock_settings.GEMINI_DAILY_QUOTA = 1000
manager = OllamaFailoverManager(health_monitor=mock_monitor)
manager._settings = mock_settings
with patch.object(manager, "_write_failover_audit", return_value=None), \
patch.object(manager, "_check_gemini_quota", return_value=False):
result = await manager.select_provider()
assert result.primary.provider_name == "nemotron"

View File

@@ -0,0 +1,402 @@
# apps/api/tests/test_ollama_health_monitor.py | 2026-04-25 @ Asia/Taipei
# Created 2026-04-25 P1.1c by Claude Engineer-C
"""
OllamaHealthMonitor 單元測試 - P1.1c
=====================================
測試覆蓋:
- 4 種健康狀態HEALTHY / SLOW / DEGRADED / OFFLINE
- 連通性失敗 → OFFLINE
- 推理超時asyncio.TimeoutError / httpx.TimeoutException→ DEGRADED
- 推理回傳非 200 → DEGRADED
- Redis 快取命中from_cache=True
- Redis 快取失敗時降級直接執行(不 crash
- is_usable() 邏輯
測試分類unitmock httpx無 DB / Redis 依賴)
"""
from __future__ import annotations
import asyncio
import json
import time
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from src.services.ollama_health_monitor import (
LATENCY_HEALTHY_THRESHOLD_MS,
LATENCY_SLOW_THRESHOLD_MS,
HealthReport,
HealthStatus,
OllamaHealthMonitor,
get_ollama_health_monitor,
reset_ollama_health_monitor,
)
# =============================================================================
# Fixtures
# =============================================================================
HOST = "http://192.168.0.111:11434"
HOST_188 = "http://192.168.0.188:11434"
@pytest.fixture(autouse=True)
def reset_singleton():
"""每個測試後重置 singleton"""
yield
reset_ollama_health_monitor()
@pytest.fixture
def monitor():
return OllamaHealthMonitor()
def _mock_tags_ok() -> MagicMock:
"""/api/tags 回傳 200"""
resp = MagicMock()
resp.status_code = 200
return resp
def _mock_generate_ok(latency_s: float = 0.5) -> tuple[MagicMock, float]:
"""
/api/generate 回傳 200模擬給定延遲。
返回 (response_mock, latency_s),由 test 自行控制 time.perf_counter patch。
"""
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {"response": "ok"}
return resp
# =============================================================================
# 層 1連通性
# =============================================================================
class TestConnectivity:
"""_check_connectivity 各種情況"""
@pytest.mark.asyncio
async def test_connectivity_success(self, monitor):
"""/api/tags 200 → 連通性通過"""
mock_resp = _mock_tags_ok()
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_resp)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("httpx.AsyncClient", return_value=mock_client):
result = await monitor._check_connectivity(HOST)
assert result is True
@pytest.mark.asyncio
async def test_connectivity_non_200(self, monitor):
"""/api/tags 非 200 → 連通性失敗"""
mock_resp = MagicMock()
mock_resp.status_code = 503
mock_client = AsyncMock()
mock_client.get = AsyncMock(return_value=mock_resp)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("httpx.AsyncClient", return_value=mock_client):
result = await monitor._check_connectivity(HOST)
assert result is False
@pytest.mark.asyncio
async def test_connectivity_timeout(self, monitor):
"""連線 timeout → 返回 False不 raise"""
mock_client = AsyncMock()
mock_client.get = AsyncMock(side_effect=httpx.TimeoutException("timeout"))
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("httpx.AsyncClient", return_value=mock_client):
result = await monitor._check_connectivity(HOST)
assert result is False
@pytest.mark.asyncio
async def test_connectivity_connect_error(self, monitor):
"""連線拒絕 → 返回 False不 raise"""
mock_client = AsyncMock()
mock_client.get = AsyncMock(side_effect=httpx.ConnectError("refused"))
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("httpx.AsyncClient", return_value=mock_client):
result = await monitor._check_connectivity(HOST)
assert result is False
# =============================================================================
# 層 2推理測試分級
# =============================================================================
class TestInference:
"""_check_inference 延遲分級"""
def _make_mock_client(self, status_code: int = 200) -> AsyncMock:
resp = MagicMock()
resp.status_code = status_code
resp.json.return_value = {"response": "ok"}
mock_client = AsyncMock()
mock_client.post = AsyncMock(return_value=resp)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
return mock_client
@pytest.mark.asyncio
async def test_inference_healthy(self, monitor):
"""推理延遲 <10s → HEALTHY"""
mock_client = self._make_mock_client()
# 模擬 0.5s 延遲(< 10s threshold
call_count = [0]
original_perf_counter = time.perf_counter
def _fake_perf_counter():
call_count[0] += 1
# 第一次呼叫start第二次呼叫end = start + 0.5s
if call_count[0] == 1:
return 0.0
return 0.5
with patch("httpx.AsyncClient", return_value=mock_client):
with patch("src.services.ollama_health_monitor.time.perf_counter", _fake_perf_counter):
report = await monitor._check_inference(HOST)
assert report.status == HealthStatus.HEALTHY
assert report.latency_ms == pytest.approx(500.0, abs=10)
@pytest.mark.asyncio
async def test_inference_slow(self, monitor):
"""推理延遲 10-30s → SLOW"""
mock_client = self._make_mock_client()
call_count = [0]
def _fake_perf_counter():
call_count[0] += 1
if call_count[0] == 1:
return 0.0
return 15.0 # 15s → SLOW zone
with patch("httpx.AsyncClient", return_value=mock_client):
with patch("src.services.ollama_health_monitor.time.perf_counter", _fake_perf_counter):
report = await monitor._check_inference(HOST)
assert report.status == HealthStatus.SLOW
assert report.latency_ms == pytest.approx(15_000.0, abs=10)
@pytest.mark.asyncio
async def test_inference_degraded_by_latency(self, monitor):
"""推理延遲 >30s → DEGRADED"""
mock_client = self._make_mock_client()
call_count = [0]
def _fake_perf_counter():
call_count[0] += 1
if call_count[0] == 1:
return 0.0
return 32.0 # 32s → DEGRADED
with patch("httpx.AsyncClient", return_value=mock_client):
with patch("src.services.ollama_health_monitor.time.perf_counter", _fake_perf_counter):
report = await monitor._check_inference(HOST)
assert report.status == HealthStatus.DEGRADED
@pytest.mark.asyncio
async def test_inference_timeout_degraded(self, monitor):
"""推理 TimeoutException → DEGRADED不 crash不 OFFLINE"""
mock_client = AsyncMock()
mock_client.post = AsyncMock(side_effect=httpx.TimeoutException("timeout"))
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("httpx.AsyncClient", return_value=mock_client):
report = await monitor._check_inference(HOST)
assert report.status == HealthStatus.DEGRADED
assert "超時" in report.reason or "timeout" in report.reason.lower()
@pytest.mark.asyncio
async def test_inference_asyncio_timeout_degraded(self, monitor):
"""推理 asyncio.TimeoutError → DEGRADED"""
mock_client = AsyncMock()
mock_client.post = AsyncMock(side_effect=asyncio.TimeoutError())
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("httpx.AsyncClient", return_value=mock_client):
report = await monitor._check_inference(HOST)
assert report.status == HealthStatus.DEGRADED
@pytest.mark.asyncio
async def test_inference_connect_error_offline(self, monitor):
"""推理 ConnectError → OFFLINE連通性層已放行但推理層掉線"""
mock_client = AsyncMock()
mock_client.post = AsyncMock(side_effect=httpx.ConnectError("refused"))
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
with patch("httpx.AsyncClient", return_value=mock_client):
report = await monitor._check_inference(HOST)
assert report.status == HealthStatus.OFFLINE
@pytest.mark.asyncio
async def test_inference_non_200_degraded(self, monitor):
"""推理回傳非 200 → DEGRADED"""
mock_client = self._make_mock_client(status_code=503)
call_count = [0]
def _fake_perf_counter():
call_count[0] += 1
return float(call_count[0] - 1) * 0.1
with patch("httpx.AsyncClient", return_value=mock_client):
with patch("src.services.ollama_health_monitor.time.perf_counter", _fake_perf_counter):
report = await monitor._check_inference(HOST)
assert report.status == HealthStatus.DEGRADED
assert "503" in report.reason
# =============================================================================
# check() 整合(含 Redis 快取)
# =============================================================================
class TestCheckWithCache:
"""check() 方法含 Redis 快取邏輯"""
@pytest.mark.asyncio
async def test_check_offline_when_connectivity_fails(self, monitor):
"""連通性失敗 → 最終 OFFLINE"""
with patch.object(monitor, "_check_connectivity", return_value=False):
with patch.object(monitor, "_get_cached", return_value=None):
with patch.object(monitor, "_set_cached", return_value=None):
with patch.object(monitor, "_write_audit_log", return_value=None):
report = await monitor.check(HOST)
assert report.status == HealthStatus.OFFLINE
assert report.host == HOST
@pytest.mark.asyncio
async def test_check_healthy_when_inference_fast(self, monitor):
"""連通性通過 + 推理快 → HEALTHY"""
healthy_report = HealthReport(
status=HealthStatus.HEALTHY,
latency_ms=500.0,
)
with patch.object(monitor, "_check_connectivity", return_value=True):
with patch.object(monitor, "_check_inference", return_value=healthy_report):
with patch.object(monitor, "_get_cached", return_value=None):
with patch.object(monitor, "_set_cached", return_value=None):
with patch.object(monitor, "_write_audit_log", return_value=None):
report = await monitor.check(HOST)
assert report.status == HealthStatus.HEALTHY
assert report.host == HOST
@pytest.mark.asyncio
async def test_check_returns_cached(self, monitor):
"""快取命中 → 直接返回快取結果from_cache=True"""
cached = HealthReport(
status=HealthStatus.HEALTHY,
host=HOST,
latency_ms=300.0,
)
with patch.object(monitor, "_get_cached", return_value=cached):
report = await monitor.check(HOST)
assert report.from_cache is True
assert report.status == HealthStatus.HEALTHY
@pytest.mark.asyncio
async def test_check_proceeds_when_cache_get_fails(self, monitor):
"""Redis get 失敗 → 降級直接執行 check不 crash"""
with patch.object(monitor, "_get_cached", return_value=None):
with patch.object(monitor, "_check_connectivity", return_value=False):
with patch.object(monitor, "_set_cached", return_value=None):
with patch.object(monitor, "_write_audit_log", return_value=None):
report = await monitor.check(HOST)
assert report.status == HealthStatus.OFFLINE # 正常降級,未 crash
@pytest.mark.asyncio
async def test_cache_set_failure_does_not_crash(self, monitor):
"""Redis set 失敗 → 靜默,結果仍正常返回"""
healthy = HealthReport(status=HealthStatus.HEALTHY, latency_ms=200.0)
with patch.object(monitor, "_get_cached", return_value=None):
with patch.object(monitor, "_check_connectivity", return_value=True):
with patch.object(monitor, "_check_inference", return_value=healthy):
with patch.object(monitor, "_set_cached", side_effect=RuntimeError("Redis down")):
with patch.object(monitor, "_write_audit_log", return_value=None):
# 不應 raise
report = await monitor.check(HOST)
assert report.status == HealthStatus.HEALTHY
# =============================================================================
# HealthReport 輔助方法
# =============================================================================
class TestHealthReport:
"""HealthReport dataclass 邏輯"""
def test_is_usable_healthy(self):
assert HealthReport(status=HealthStatus.HEALTHY).is_usable() is True
def test_is_usable_slow(self):
assert HealthReport(status=HealthStatus.SLOW).is_usable() is True
def test_is_usable_degraded(self):
assert HealthReport(status=HealthStatus.DEGRADED).is_usable() is True
def test_is_usable_offline(self):
assert HealthReport(status=HealthStatus.OFFLINE).is_usable() is False
def test_to_dict_structure(self):
report = HealthReport(
status=HealthStatus.SLOW,
host=HOST,
latency_ms=15500.0,
reason="slow zone",
)
d = report.to_dict()
assert d["status"] == "slow"
assert d["host"] == HOST
assert d["latency_ms"] == 15500.0
assert d["reason"] == "slow zone"
# =============================================================================
# Singleton
# =============================================================================
def test_singleton_returns_same_instance():
m1 = get_ollama_health_monitor()
m2 = get_ollama_health_monitor()
assert m1 is m2
def test_reset_singleton_gives_new_instance():
m1 = get_ollama_health_monitor()
reset_ollama_health_monitor()
m2 = get_ollama_health_monitor()
assert m1 is not m2

View File

@@ -0,0 +1,34 @@
# ============================================================================
# PATCH: 188 CPU-only Ollama 備援端點
# 日期: 2026-04-25 (台北時區)
# 負責人: ogt + Claude Sonnet 4.6
# ADR 參考: plan_complete_v3.md P0.5
# 診斷實測數據:
# 主機: 192.168.0.188, Intel Xeon Silver 4214 @ 2.2GHz, 12 核, CPU-only
# RAM: 62GB (used 14GB), Disk: 982GB (used 221GB)
# GPU: 無
# 現有模型: qwen2.5:7b-instruct (4.5GB), llama3.2:3b (1.9GB),
# deepseek-r1:14b (8.5GB), nomic-embed-text (261MB)
# 推理延遲實測: qwen2.5:7b-instruct → total=111s, eval_rate=0.09 token/s
# llama3.2:3b → total=155s (cold start, 比 7b 更慢)
# 目標 ~30s 無法達到 (CPU 推理硬上限 ~0.09 token/s)
# 決策: qwen2.5:7b-instruct 已存在,設為備援 (111s 延遲,使用者需知情)
# 連通性: 110 → 188:11434 ✅ 已驗證
# ⚠️ 注意: 188 推理極慢(~111s),應只在 111 GPU Ollama 完全失效時啟用
# 建議: 程式碼層應設 OLLAMA_FALLBACK_188_TIMEOUT_SEC = 150
# ============================================================================
#
# 將以下兩行加入 /Users/ogt/awoooi/k8s/awoooi-prod/04-configmap.yaml
# 建議位置: OLLAMA_URL 行 (第 20 行) 之後
#
# --- 新增內容 ---
# 2026-04-25 ogt + Claude Sonnet 4.6: 188 CPU-only Ollama 備援 (plan_complete_v3 P0.5)
# ⚠️ 188 推理延遲實測 ~111s (0.09 token/s, CPU-only Xeon 4214),僅作 111 完全失效時的降級備援
# 模型已存在: qwen2.5:7b-instruct (4.5GB), 無需重拉
OLLAMA_FALLBACK_188: "http://192.168.0.188:11434"
OLLAMA_188_MODEL: "qwen2.5:7b-instruct"
# --- 新增內容結束 ---
#
# 使用方式 (需用戶 review 後手動 apply):
# kubectl -n awoooi-prod apply -f k8s/awoooi-prod/04-configmap.yaml
# kubectl -n awoooi-prod rollout restart deployment/awoooi-api