fix(api): fallback ai route status to connectivity
This commit is contained in:
@@ -10,6 +10,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import re
|
||||
import time
|
||||
import uuid
|
||||
from collections import defaultdict
|
||||
from collections.abc import Mapping
|
||||
@@ -17,11 +18,13 @@ from datetime import UTC, datetime, timedelta
|
||||
from typing import Any, get_args
|
||||
from uuid import UUID
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
from fastapi import HTTPException, status
|
||||
from sqlalchemy import func, select, text, update
|
||||
from sqlalchemy import or_ as sa_or
|
||||
|
||||
from src.core.config import get_settings
|
||||
from src.db.awooop_models import (
|
||||
AwoooPContractRevision,
|
||||
AwoooPConversationEvent,
|
||||
@@ -49,7 +52,7 @@ from src.services.ollama_failover_manager import (
|
||||
OllamaRoutingResult,
|
||||
get_ollama_failover_manager,
|
||||
)
|
||||
from src.services.ollama_health_monitor import HealthReport
|
||||
from src.services.ollama_health_monitor import HealthReport, HealthStatus
|
||||
from src.services.run_state_machine import transition
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
@@ -62,6 +65,7 @@ _MAX_TIMELINE_ITEMS = 100
|
||||
_MAX_LIST_CONTEXT_ROWS = 500
|
||||
_MAX_STEP_SUMMARY_CHARS = 128
|
||||
_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS = 12.0
|
||||
_AI_ROUTE_STATUS_CONNECTIVITY_TIMEOUT_SECONDS = 2.5
|
||||
_REMEDIATION_HISTORY_LIMIT = 20
|
||||
_INCIDENT_ID_RE = re.compile(r"\bINC-\d{8}-[A-Z0-9]{4,}\b")
|
||||
_REMEDIATION_STATUS_FILTERS = {
|
||||
@@ -509,44 +513,30 @@ async def get_ai_route_status(
|
||||
get_ollama_failover_manager().select_provider(task_type=workload),
|
||||
timeout=_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
except TimeoutError:
|
||||
logger.warning(
|
||||
"ai_route_status_check_timeout",
|
||||
workload_type=workload,
|
||||
timeout_seconds=_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS,
|
||||
)
|
||||
return {
|
||||
"schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
|
||||
"workload_type": workload,
|
||||
"policy_order": policy_order,
|
||||
"selected_provider": None,
|
||||
"selected_url": None,
|
||||
"selected_model": None,
|
||||
"fallback_chain": [],
|
||||
"route_reason": "route_check_timeout",
|
||||
"route_source": "ollama_failover_manager",
|
||||
"route_error": (
|
||||
return await _ai_route_lightweight_status_from_policy(
|
||||
workload=workload,
|
||||
policy_order=policy_order,
|
||||
checked_at=checked_at,
|
||||
route_reason="route_check_timeout",
|
||||
route_error=(
|
||||
f"route status timed out after "
|
||||
f"{_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS:g}s"
|
||||
),
|
||||
"health": {},
|
||||
"checked_at": checked_at,
|
||||
}
|
||||
)
|
||||
except Exception as exc:
|
||||
return {
|
||||
"schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
|
||||
"workload_type": workload,
|
||||
"policy_order": policy_order,
|
||||
"selected_provider": None,
|
||||
"selected_url": None,
|
||||
"selected_model": None,
|
||||
"fallback_chain": [],
|
||||
"route_reason": "route_check_failed",
|
||||
"route_source": "ollama_failover_manager",
|
||||
"route_error": str(exc),
|
||||
"health": {},
|
||||
"checked_at": checked_at,
|
||||
}
|
||||
return await _ai_route_lightweight_status_from_policy(
|
||||
workload=workload,
|
||||
policy_order=policy_order,
|
||||
checked_at=checked_at,
|
||||
route_reason="route_check_failed",
|
||||
route_error=str(exc),
|
||||
)
|
||||
|
||||
return {
|
||||
"schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
|
||||
@@ -597,6 +587,186 @@ def _ai_route_policy_order(workload: OllamaWorkloadType) -> list[dict[str, Any]]
|
||||
return items
|
||||
|
||||
|
||||
async def _ai_route_lightweight_status_from_policy(
|
||||
*,
|
||||
workload: OllamaWorkloadType,
|
||||
policy_order: list[dict[str, Any]],
|
||||
checked_at: datetime,
|
||||
route_reason: str,
|
||||
route_error: str,
|
||||
) -> dict[str, Any]:
|
||||
"""Fallback read model for route status; never changes the execution router."""
|
||||
endpoints = list(resolve_ollama_order(workload))
|
||||
try:
|
||||
reports = await asyncio.gather(
|
||||
*[_ai_route_probe_connectivity(endpoint) for endpoint in endpoints],
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"ai_route_status_lightweight_probe_failed",
|
||||
workload_type=workload,
|
||||
route_reason=route_reason,
|
||||
error=str(exc),
|
||||
)
|
||||
return _ai_route_unavailable_status(
|
||||
workload=workload,
|
||||
policy_order=policy_order,
|
||||
checked_at=checked_at,
|
||||
route_reason=route_reason,
|
||||
route_error=route_error,
|
||||
route_source="ollama_failover_manager",
|
||||
)
|
||||
|
||||
health_by_provider = {
|
||||
endpoint.provider_name: _ai_route_health_item(report)
|
||||
for endpoint, report in zip(endpoints, reports, strict=False)
|
||||
}
|
||||
selected_index = next(
|
||||
(
|
||||
index
|
||||
for index, report in enumerate(reports)
|
||||
if report.status != HealthStatus.OFFLINE
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
if selected_index is None:
|
||||
return {
|
||||
"schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
|
||||
"workload_type": workload,
|
||||
"policy_order": policy_order,
|
||||
"selected_provider": "gemini",
|
||||
"selected_url": None,
|
||||
"selected_model": None,
|
||||
"fallback_chain": [],
|
||||
"route_reason": (
|
||||
f"{route_reason}; lightweight connectivity found all Ollama "
|
||||
"endpoints offline; final fallback policy is Gemini"
|
||||
),
|
||||
"route_source": "lightweight_connectivity_fallback",
|
||||
"route_error": None,
|
||||
"health": health_by_provider,
|
||||
"checked_at": checked_at,
|
||||
}
|
||||
|
||||
selected = endpoints[selected_index]
|
||||
model = get_settings().OLLAMA_HEALTH_CHECK_MODEL
|
||||
fallback_chain = [
|
||||
_ai_route_runtime_policy_endpoint_item(
|
||||
endpoint,
|
||||
priority=index + 1,
|
||||
model=model,
|
||||
)
|
||||
for index, endpoint in enumerate(endpoints[selected_index + 1 :], start=selected_index + 1)
|
||||
]
|
||||
fallback_chain.append({
|
||||
"priority": len(endpoints) + 1,
|
||||
"provider_name": "gemini",
|
||||
"url": None,
|
||||
"model": None,
|
||||
"runtime": "cloud",
|
||||
})
|
||||
|
||||
return {
|
||||
"schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
|
||||
"workload_type": workload,
|
||||
"policy_order": policy_order,
|
||||
"selected_provider": selected.provider_name,
|
||||
"selected_url": selected.url,
|
||||
"selected_model": model,
|
||||
"fallback_chain": fallback_chain,
|
||||
"route_reason": (
|
||||
f"{route_reason}; lightweight connectivity selected "
|
||||
f"{selected.provider_name}"
|
||||
),
|
||||
"route_source": "lightweight_connectivity_fallback",
|
||||
"route_error": None,
|
||||
"health": health_by_provider,
|
||||
"checked_at": checked_at,
|
||||
}
|
||||
|
||||
|
||||
async def _ai_route_probe_connectivity(
|
||||
endpoint: OllamaEndpointSelection,
|
||||
) -> HealthReport:
|
||||
"""Cheap read-only /api/tags probe for Operator Console status fallback."""
|
||||
if not endpoint.url:
|
||||
return HealthReport(
|
||||
status=HealthStatus.OFFLINE,
|
||||
host=endpoint.url,
|
||||
reason="no_ollama_endpoint_url",
|
||||
)
|
||||
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
async with httpx.AsyncClient(
|
||||
timeout=httpx.Timeout(_AI_ROUTE_STATUS_CONNECTIVITY_TIMEOUT_SECONDS),
|
||||
) as client:
|
||||
response = await client.get(f"{endpoint.url.rstrip('/')}/api/tags")
|
||||
latency_ms = (time.perf_counter() - start) * 1000
|
||||
if response.status_code == 200:
|
||||
return HealthReport(
|
||||
status=HealthStatus.HEALTHY,
|
||||
host=endpoint.url,
|
||||
latency_ms=latency_ms,
|
||||
reason="status_only_connectivity_ok",
|
||||
)
|
||||
return HealthReport(
|
||||
status=HealthStatus.OFFLINE,
|
||||
host=endpoint.url,
|
||||
latency_ms=latency_ms,
|
||||
reason=f"status_only_connectivity_http_{response.status_code}",
|
||||
)
|
||||
except Exception as exc:
|
||||
latency_ms = (time.perf_counter() - start) * 1000
|
||||
return HealthReport(
|
||||
status=HealthStatus.OFFLINE,
|
||||
host=endpoint.url,
|
||||
latency_ms=latency_ms,
|
||||
reason=f"status_only_connectivity_error:{type(exc).__name__}",
|
||||
)
|
||||
|
||||
|
||||
def _ai_route_runtime_policy_endpoint_item(
|
||||
endpoint: OllamaEndpointSelection,
|
||||
*,
|
||||
priority: int,
|
||||
model: str,
|
||||
) -> dict[str, Any]:
|
||||
return {
|
||||
"priority": priority,
|
||||
"provider_name": endpoint.provider_name,
|
||||
"url": endpoint.url or None,
|
||||
"model": model,
|
||||
"runtime": "ollama",
|
||||
}
|
||||
|
||||
|
||||
def _ai_route_unavailable_status(
|
||||
*,
|
||||
workload: OllamaWorkloadType,
|
||||
policy_order: list[dict[str, Any]],
|
||||
checked_at: datetime,
|
||||
route_reason: str,
|
||||
route_error: str,
|
||||
route_source: str,
|
||||
) -> dict[str, Any]:
|
||||
return {
|
||||
"schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
|
||||
"workload_type": workload,
|
||||
"policy_order": policy_order,
|
||||
"selected_provider": None,
|
||||
"selected_url": None,
|
||||
"selected_model": None,
|
||||
"fallback_chain": [],
|
||||
"route_reason": route_reason,
|
||||
"route_source": route_source,
|
||||
"route_error": route_error,
|
||||
"health": {},
|
||||
"checked_at": checked_at,
|
||||
}
|
||||
|
||||
|
||||
def _ai_route_policy_endpoint_item(
|
||||
endpoint: OllamaEndpointSelection,
|
||||
*,
|
||||
|
||||
@@ -10,9 +10,9 @@ from fastapi import HTTPException
|
||||
import src.services.platform_operator_service as platform_operator_service
|
||||
from src.api.v1.platform.operator_runs import (
|
||||
AiRouteStatusResponse,
|
||||
ListCicdEventsResponse,
|
||||
ListApprovalsResponse,
|
||||
ListCallbackRepliesResponse,
|
||||
ListCicdEventsResponse,
|
||||
ListRunsResponse,
|
||||
)
|
||||
from src.services.ollama_failover_manager import OllamaEndpoint, OllamaRoutingResult
|
||||
@@ -23,8 +23,8 @@ from src.services.platform_operator_service import (
|
||||
_build_awooop_status_chain,
|
||||
_callback_reply_event_item,
|
||||
_callback_reply_summary_matches_status,
|
||||
_cicd_event_item_from_row,
|
||||
_cicd_duration_seconds,
|
||||
_cicd_event_item_from_row,
|
||||
_collect_run_incident_ids,
|
||||
_is_source_correlation_applied_link,
|
||||
_legacy_mcp_timeline_status,
|
||||
@@ -1190,6 +1190,20 @@ async def test_ai_route_status_times_out_before_slow_provider_checks(monkeypatch
|
||||
async def select_provider(self, task_type: str = "general") -> None:
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
async def fake_connectivity(endpoint):
|
||||
if endpoint.provider_name == "ollama_gcp_a":
|
||||
return HealthReport(
|
||||
status=HealthStatus.OFFLINE,
|
||||
host=endpoint.url,
|
||||
reason="timeout",
|
||||
)
|
||||
return HealthReport(
|
||||
status=HealthStatus.HEALTHY,
|
||||
host=endpoint.url,
|
||||
latency_ms=12.3,
|
||||
reason="status_only_connectivity_ok",
|
||||
)
|
||||
|
||||
monkeypatch.setattr(
|
||||
platform_operator_service,
|
||||
"_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS",
|
||||
@@ -1200,12 +1214,26 @@ async def test_ai_route_status_times_out_before_slow_provider_checks(monkeypatch
|
||||
"get_ollama_failover_manager",
|
||||
lambda: SlowFailoverManager(),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
platform_operator_service,
|
||||
"_ai_route_probe_connectivity",
|
||||
fake_connectivity,
|
||||
)
|
||||
|
||||
response = await platform_operator_service.get_ai_route_status("deep_rca")
|
||||
|
||||
assert response["route_reason"] == "route_check_timeout"
|
||||
assert response["route_error"] == "route status timed out after 0.001s"
|
||||
assert response["selected_provider"] is None
|
||||
assert response["route_reason"] == (
|
||||
"route_check_timeout; lightweight connectivity selected ollama_gcp_b"
|
||||
)
|
||||
assert response["route_error"] is None
|
||||
assert response["route_source"] == "lightweight_connectivity_fallback"
|
||||
assert response["selected_provider"] == "ollama_gcp_b"
|
||||
assert response["health"]["ollama_gcp_a"]["status"] == "offline"
|
||||
assert response["health"]["ollama_gcp_b"]["status"] == "healthy"
|
||||
assert [item["provider_name"] for item in response["fallback_chain"]] == [
|
||||
"ollama_local",
|
||||
"gemini",
|
||||
]
|
||||
assert [item["provider_name"] for item in response["policy_order"]] == [
|
||||
"ollama_gcp_a",
|
||||
"ollama_gcp_b",
|
||||
@@ -1214,6 +1242,47 @@ async def test_ai_route_status_times_out_before_slow_provider_checks(monkeypatch
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ai_route_status_lightweight_fallback_keeps_gemini_policy_only(
|
||||
monkeypatch,
|
||||
) -> None:
|
||||
class SlowFailoverManager:
|
||||
async def select_provider(self, task_type: str = "general") -> None:
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
async def fake_offline_connectivity(endpoint):
|
||||
return HealthReport(
|
||||
status=HealthStatus.OFFLINE,
|
||||
host=endpoint.url,
|
||||
reason="offline",
|
||||
)
|
||||
|
||||
monkeypatch.setattr(
|
||||
platform_operator_service,
|
||||
"_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS",
|
||||
0.001,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
platform_operator_service,
|
||||
"get_ollama_failover_manager",
|
||||
lambda: SlowFailoverManager(),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
platform_operator_service,
|
||||
"_ai_route_probe_connectivity",
|
||||
fake_offline_connectivity,
|
||||
)
|
||||
|
||||
response = await platform_operator_service.get_ai_route_status("deep_rca")
|
||||
|
||||
assert response["selected_provider"] == "gemini"
|
||||
assert response["selected_model"] is None
|
||||
assert response["route_source"] == "lightweight_connectivity_fallback"
|
||||
assert response["route_error"] is None
|
||||
assert "final fallback policy is Gemini" in response["route_reason"]
|
||||
assert all(item["status"] == "offline" for item in response["health"].values())
|
||||
|
||||
|
||||
def test_ai_route_workload_validation_rejects_unknown_value() -> None:
|
||||
assert _validate_ai_route_workload(" hermes ") == "hermes"
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
|
||||
Reference in New Issue
Block a user