fix(ollama): cooldown provider health probes
This commit is contained in:
@@ -26,6 +26,11 @@ from pydantic import BaseModel
|
||||
from src.core.config import settings
|
||||
from src.core.logging import get_logger
|
||||
from src.services.health_check_service import get_health_check_service
|
||||
from src.services.ollama_endpoint_circuit_breaker import (
|
||||
get_ollama_endpoint_cooldown_remaining_seconds,
|
||||
record_ollama_endpoint_failure,
|
||||
record_ollama_endpoint_success,
|
||||
)
|
||||
from src.services.ollama_endpoint_resolver import resolve_ollama_order
|
||||
|
||||
router = APIRouter()
|
||||
@@ -137,11 +142,7 @@ async def check_ollama_provider_chain() -> tuple[ComponentHealth, dict[str, Comp
|
||||
|
||||
checked = await asyncio.gather(
|
||||
*(
|
||||
_http_health_check(
|
||||
selection.provider_name,
|
||||
selection.url,
|
||||
"/api/tags",
|
||||
)
|
||||
_ollama_endpoint_health_check(selection.provider_name, selection.url)
|
||||
for selection in selections
|
||||
)
|
||||
)
|
||||
@@ -187,6 +188,22 @@ async def check_ollama_provider_chain() -> tuple[ComponentHealth, dict[str, Comp
|
||||
)
|
||||
|
||||
|
||||
async def _ollama_endpoint_health_check(name: str, url: str) -> ComponentHealth:
    """Probe one Ollama endpoint's ``/api/tags`` unless it is cooling down.

    When the endpoint still has cooldown time remaining, it is reported as
    ``down`` with a cooldown message and no HTTP probe is issued. Otherwise
    the probe runs and its outcome is reported to the endpoint circuit
    breaker: an ``up`` status is recorded as a success, any other status as
    a failure.

    Args:
        name: Provider name forwarded to the HTTP health check.
        url: Endpoint base URL; also the key used for cooldown bookkeeping.

    Returns:
        The cooldown placeholder result or the actual probe result.
    """
    remaining_seconds = get_ollama_endpoint_cooldown_remaining_seconds(url)
    if remaining_seconds > 0:
        cooldown_message = f"recent endpoint failure cooldown: {remaining_seconds:.0f}s"
        return ComponentHealth(status="down", error=cooldown_message)

    probe = await _http_health_check(name, url, "/api/tags")
    # Feed the probe outcome back into the cooldown tracker.
    record_outcome = (
        record_ollama_endpoint_success
        if probe.status == "up"
        else record_ollama_endpoint_failure
    )
    record_outcome(url)
    return probe
|
||||
|
||||
|
||||
async def check_openclaw() -> ComponentHealth:
    """Check OpenClaw health asynchronously through its ``/health`` path."""
    openclaw_health = await _http_health_check(
        "openclaw",
        settings.OPENCLAW_URL,
        "/health",
    )
    return openclaw_health
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
"""
|
||||
Lightweight in-process cooldown for noisy Ollama endpoint failures.
|
||||
|
||||
This does not change ADR-110 policy order. It only suppresses endpoints that
|
||||
just failed for short-lived high-volume callers such as embedding/RAG, while
|
||||
leaving health checks and failover status free to probe the full topology.
|
||||
This does not change ADR-110 policy order. It suppresses endpoints that just
|
||||
failed for short-lived callers while still returning explicit offline/cooldown
|
||||
state to health and route-status surfaces.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -60,6 +60,26 @@ def is_ollama_endpoint_blocked(url: str, *, now: float | None = None) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def get_ollama_endpoint_cooldown_remaining_seconds(
    url: str,
    *,
    now: float | None = None,
) -> float:
    """Report how many cooldown seconds are left for ``url``.

    Intended for display/debug surfaces. An empty ``url`` or an endpoint with
    no recorded block yields ``0.0``. A block that has already expired is
    removed from the registry before ``0.0`` is returned.

    Args:
        url: Endpoint URL; it is normalized before the lookup.
        now: Clock reading to compare against; when omitted,
            ``time.monotonic()`` is used.

    Returns:
        The positive number of seconds remaining while the endpoint is
        blocked, otherwise ``0.0``.
    """
    if not url:
        return 0.0

    reference_time = now if now is not None else time.monotonic()
    key = _normalize_url(url)
    deadline = _blocked_until_by_url.get(key)
    if deadline is None:
        return 0.0

    seconds_left = deadline - reference_time
    if seconds_left <= 0:
        # The block has expired: drop the stale entry.
        _blocked_until_by_url.pop(key, None)
        return 0.0
    return seconds_left
|
||||
|
||||
|
||||
def filter_ollama_urls_with_cooldown(
|
||||
urls: Iterable[str],
|
||||
*,
|
||||
|
||||
@@ -29,6 +29,11 @@ import httpx
|
||||
import structlog
|
||||
|
||||
from src.core.config import get_settings
|
||||
from src.services.ollama_endpoint_circuit_breaker import (
|
||||
get_ollama_endpoint_cooldown_remaining_seconds,
|
||||
record_ollama_endpoint_failure,
|
||||
record_ollama_endpoint_success,
|
||||
)
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
@@ -206,9 +211,21 @@ class OllamaHealthMonitor:
|
||||
|
||||
async def _run_checks(self, host: str) -> HealthReport:
|
||||
"""依序執行三層檢查"""
|
||||
cooldown_remaining = get_ollama_endpoint_cooldown_remaining_seconds(host)
|
||||
if cooldown_remaining > 0:
|
||||
return HealthReport(
|
||||
status=HealthStatus.OFFLINE,
|
||||
host=host,
|
||||
reason=(
|
||||
"recent_endpoint_failure_cooldown:"
|
||||
f"{cooldown_remaining:.0f}s"
|
||||
),
|
||||
)
|
||||
|
||||
# 層 1:連通性
|
||||
connectivity_ok = await self._check_connectivity(host)
|
||||
if not connectivity_ok:
|
||||
record_ollama_endpoint_failure(host)
|
||||
return HealthReport(
|
||||
status=HealthStatus.OFFLINE,
|
||||
host=host,
|
||||
@@ -217,6 +234,7 @@ class OllamaHealthMonitor:
|
||||
|
||||
# 層 2:推理測試
|
||||
report = await self._check_inference(host)
|
||||
record_ollama_endpoint_success(host)
|
||||
return report
|
||||
|
||||
async def _check_connectivity(self, host: str) -> bool:
|
||||
|
||||
@@ -3,6 +3,19 @@ from __future__ import annotations
|
||||
import pytest
|
||||
|
||||
from src.api.v1 import health
|
||||
from src.services.ollama_endpoint_circuit_breaker import (
|
||||
is_ollama_endpoint_blocked,
|
||||
record_ollama_endpoint_failure,
|
||||
reset_ollama_endpoint_cooldown_for_tests,
|
||||
)
|
||||
|
||||
|
||||
def setup_function() -> None:
    """Clear the endpoint cooldown state before each test function."""
    reset_ollama_endpoint_cooldown_for_tests()
|
||||
|
||||
|
||||
def teardown_function() -> None:
    """Clear the endpoint cooldown state after each test function."""
    reset_ollama_endpoint_cooldown_for_tests()
|
||||
|
||||
|
||||
def _set_ollama_settings(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
@@ -66,3 +79,56 @@ def test_overall_status_uses_aggregate_ollama_not_endpoint_details() -> None:
|
||||
}
|
||||
|
||||
assert health._determine_overall_status(components) == "degraded"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_ollama_provider_chain_uses_cooldown_after_failure(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A failed endpoint goes on cooldown and is not probed by the next chain check."""
    _set_ollama_settings(monkeypatch)
    probed: list[str] = []

    async def fake_http_check(name: str, _url: str, _path: str) -> health.ComponentHealth:
        probed.append(name)
        if name != "ollama_gcp_a":
            return health.ComponentHealth(status="up", latency_ms=9.0)
        return health.ComponentHealth(status="down", error="http 502")

    monkeypatch.setattr(health, "_http_health_check", fake_http_check)

    # First pass: gcp-a is probed, reports down, and becomes blocked.
    first_aggregate, first_details = await health.check_ollama_provider_chain()

    assert first_aggregate.status == "degraded"
    assert first_details["ollama_gcp_a"].status == "down"
    assert is_ollama_endpoint_blocked("http://gcp-a:11434")

    # Second pass: gcp-a must be answered from the cooldown, not re-probed.
    probed.clear()
    second_aggregate, second_details = await health.check_ollama_provider_chain()

    assert second_aggregate.status == "degraded"
    gcp_a = second_details["ollama_gcp_a"]
    assert gcp_a.status == "down"
    assert "cooldown" in (gcp_a.error or "")
    assert "ollama_gcp_a" not in probed
    assert set(probed) == {"ollama_gcp_b", "ollama_local"}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_ollama_provider_chain_success_clears_cooldown(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A successful probe through the provider chain clears a previously recorded block."""
    endpoint = "http://gcp-a:11434"

    _set_ollama_settings(monkeypatch)
    record_ollama_endpoint_failure(endpoint)

    # Pretend the cooldown has elapsed so the health check actually probes.
    def no_cooldown(_url: str) -> float:
        return 0.0

    monkeypatch.setattr(
        health,
        "get_ollama_endpoint_cooldown_remaining_seconds",
        no_cooldown,
    )

    async def fake_http_check(name: str, _url: str, _path: str) -> health.ComponentHealth:
        return health.ComponentHealth(status="up", latency_ms=8.0)

    monkeypatch.setattr(health, "_http_health_check", fake_http_check)

    await health.check_ollama_provider_chain()

    assert not is_ollama_endpoint_blocked(endpoint)
|
||||
|
||||
@@ -4,6 +4,7 @@ from types import SimpleNamespace
|
||||
|
||||
from src.services.ollama_endpoint_circuit_breaker import (
|
||||
filter_ollama_urls_with_cooldown,
|
||||
get_ollama_endpoint_cooldown_remaining_seconds,
|
||||
is_ollama_endpoint_blocked,
|
||||
record_ollama_endpoint_failure,
|
||||
record_ollama_endpoint_success,
|
||||
@@ -41,6 +42,26 @@ def test_cooldown_expires_and_success_clears_block() -> None:
|
||||
assert not is_ollama_endpoint_blocked("http://gcp-a:11434", now=201.0)
|
||||
|
||||
|
||||
def test_cooldown_remaining_seconds_expires_cleanly() -> None:
    """Remaining seconds count down, reach 0.0 after expiry, and leave the endpoint unblocked."""
    endpoint = "http://gcp-a:11434"
    record_ollama_endpoint_failure(endpoint, cooldown_seconds=10.0, now=100.0)

    # Three seconds into a ten-second cooldown.
    during_cooldown = get_ollama_endpoint_cooldown_remaining_seconds(endpoint, now=103.0)
    assert during_cooldown == 7.0

    # One second past the deadline.
    after_expiry = get_ollama_endpoint_cooldown_remaining_seconds(endpoint, now=111.0)
    assert after_expiry == 0.0

    assert not is_ollama_endpoint_blocked(endpoint, now=112.0)
|
||||
|
||||
|
||||
def test_all_blocked_returns_full_order_for_recovery_probe() -> None:
|
||||
urls = ("http://gcp-a:11434", "http://gcp-b:11434")
|
||||
|
||||
|
||||
@@ -19,16 +19,17 @@ OllamaHealthMonitor 單元測試 - P1.1c
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from src.services.ollama_endpoint_circuit_breaker import (
|
||||
is_ollama_endpoint_blocked,
|
||||
record_ollama_endpoint_failure,
|
||||
reset_ollama_endpoint_cooldown_for_tests,
|
||||
)
|
||||
from src.services.ollama_health_monitor import (
|
||||
LATENCY_HEALTHY_THRESHOLD_MS,
|
||||
LATENCY_SLOW_THRESHOLD_MS,
|
||||
HealthReport,
|
||||
HealthStatus,
|
||||
OllamaHealthMonitor,
|
||||
@@ -48,8 +49,10 @@ HOST_LOCAL = "http://192.168.0.111:11434" # Local fallback(已移出 188 主
|
||||
@pytest.fixture(autouse=True)
def reset_singleton():
    """Reset cooldown state before each test; reset the monitor singleton and cooldown state after it."""
    # Setup: start every test with no endpoint on cooldown.
    reset_ollama_endpoint_cooldown_for_tests()

    yield

    # Teardown: drop the shared monitor and any cooldown the test recorded.
    reset_ollama_health_monitor()
    reset_ollama_endpoint_cooldown_for_tests()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -110,6 +113,29 @@ class TestConnectivity:
|
||||
result = await monitor._check_connectivity(HOST)
|
||||
assert result is False
|
||||
|
||||
@pytest.mark.asyncio
async def test_run_checks_records_connectivity_failure_cooldown(self, monitor, monkeypatch):
    """A connectivity failure starts a short cooldown so the same upstream is not hit repeatedly."""
    failing_connectivity = AsyncMock(return_value=False)
    monkeypatch.setattr(monitor, "_check_connectivity", failing_connectivity)

    offline_report = await monitor._run_checks(HOST)

    assert offline_report.status == HealthStatus.OFFLINE
    assert is_ollama_endpoint_blocked(HOST)
|
||||
|
||||
@pytest.mark.asyncio
async def test_run_checks_respects_existing_failure_cooldown(self, monitor, monkeypatch):
    """While a cooldown is active, report OFFLINE directly without calling /api/tags again."""
    record_ollama_endpoint_failure(HOST)

    # The probe would succeed if it were called; the test asserts it never is.
    connectivity_probe = AsyncMock(return_value=True)
    monkeypatch.setattr(monitor, "_check_connectivity", connectivity_probe)

    cooldown_report = await monitor._run_checks(HOST)

    assert cooldown_report.status == HealthStatus.OFFLINE
    assert "cooldown" in cooldown_report.reason
    connectivity_probe.assert_not_awaited()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_connectivity_timeout(self, monitor):
|
||||
"""連線 timeout → 返回 False(不 raise)"""
|
||||
@@ -160,7 +186,6 @@ class TestInference:
|
||||
|
||||
# 模擬 0.5s 延遲(< 10s threshold)
|
||||
call_count = [0]
|
||||
original_perf_counter = time.perf_counter
|
||||
|
||||
def _fake_perf_counter():
|
||||
call_count[0] += 1
|
||||
|
||||
@@ -31,7 +31,8 @@
|
||||
- `knowledge_rag_service.py`
|
||||
- `playbook_rag.py`
|
||||
- 成功時會清除 cooldown;network/timeout/5xx 失敗才短暫標記,不因 4xx 或資料錯誤誤封 endpoint。
|
||||
- 前端/health/failover status 不套用 cooldown,仍會顯示 GCP-A/B 真實紅燈,不消音、不假裝 healthy。
|
||||
- health/failover status 也回報明確 cooldown/offline 狀態,仍保留 GCP-A/B 真實紅燈,不消音、不假裝 healthy。
|
||||
- `/api/v1/health` 的 provider chain 與 `OllamaHealthMonitor` 會在 endpoint 剛失敗後短暫回報 `recent endpoint failure cooldown`,避免每個健康頁、route 頁連續撞同一個 502。
|
||||
|
||||
**local verification**:
|
||||
|
||||
@@ -49,6 +50,9 @@ pytest:
|
||||
test_ollama_endpoint_resolver.py
|
||||
test_playbook_service.py
|
||||
-> 23 passed
|
||||
test_health_ollama_provider_chain.py
|
||||
test_ollama_health_monitor.py
|
||||
-> 35 passed
|
||||
git diff --check -> ok
|
||||
```
|
||||
|
||||
|
||||
Reference in New Issue
Block a user