# Status-only fallback helpers for ``get_ai_route_status``.
#
# NOTE(review): this fragment relies on names that live elsewhere in
# platform_operator_service.py: ``asyncio``, ``time``, ``httpx``, ``logger``,
# ``get_settings``, ``HealthReport``, ``HealthStatus``,
# ``resolve_ollama_order``, ``_ai_route_health_item``,
# ``_AI_ROUTE_STATUS_SCHEMA_VERSION``, ``OllamaEndpointSelection`` and
# ``OllamaWorkloadType``.

# Budget, in seconds, for one status-only ``/api/tags`` probe.  It is used both
# as the httpx per-phase timeout and as the overall deadline of the request.
_AI_ROUTE_STATUS_CONNECTIVITY_TIMEOUT_SECONDS = 2.5


async def _ai_route_lightweight_status_from_policy(
    *,
    workload: OllamaWorkloadType,
    policy_order: list[dict[str, Any]],
    checked_at: datetime,
    route_reason: str,
    route_error: str,
) -> dict[str, Any]:
    """Build a route-status payload from cheap connectivity probes.

    Fallback read model used by ``get_ai_route_status`` when the failover
    manager's ``select_provider`` call times out or raises.  It only probes the
    endpoints returned by ``resolve_ollama_order(workload)``; it never changes
    the execution router.

    Args:
        workload: Workload type whose endpoint order is probed.
        policy_order: Pre-computed policy order, echoed back in the payload.
        checked_at: Timestamp echoed back in the payload.
        route_reason: Reason code of the primary failure; used as the prefix
            of the returned ``route_reason``.
        route_error: Error text of the primary failure.  It is logged here and
            only returned to the caller when the probes themselves fail.

    Returns:
        A route-status dict.  ``selected_provider`` is the first endpoint (in
        policy order) whose probe is not OFFLINE, ``"gemini"`` when every
        probe is OFFLINE, or ``None`` when probing itself failed.
    """
    # A probe-based answer reports ``route_error: None``, so this log line is
    # the only place the primary failure's details are recorded.
    logger.warning(
        "ai_route_status_lightweight_fallback",
        workload_type=workload,
        route_reason=route_reason,
        route_error=route_error,
    )

    try:
        # Resolved inside the ``try``: this helper runs from the caller's
        # ``except`` handlers, so it must degrade rather than raise.
        endpoints = list(resolve_ollama_order(workload))
        reports = await asyncio.gather(
            *(_ai_route_probe_connectivity(endpoint) for endpoint in endpoints),
        )
    except Exception as exc:
        logger.warning(
            "ai_route_status_lightweight_probe_failed",
            workload_type=workload,
            route_reason=route_reason,
            error=str(exc),
        )
        return _ai_route_unavailable_status(
            workload=workload,
            policy_order=policy_order,
            checked_at=checked_at,
            route_reason=route_reason,
            route_error=route_error,
            route_source="ollama_failover_manager",
        )

    # ``gather`` yields exactly one report per endpoint, in order.
    health_by_provider = {
        endpoint.provider_name: _ai_route_health_item(report)
        for endpoint, report in zip(endpoints, reports, strict=True)
    }
    selected_index = next(
        (
            index
            for index, report in enumerate(reports)
            if report.status != HealthStatus.OFFLINE
        ),
        None,
    )

    if selected_index is None:
        # Nothing answered: report the policy's final fallback, not a URL.
        selected_provider: str | None = "gemini"
        selected_url = None
        selected_model = None
        fallback_chain: list[dict[str, Any]] = []
        reason_detail = (
            "lightweight connectivity found all Ollama "
            "endpoints offline; final fallback policy is Gemini"
        )
    else:
        selected = endpoints[selected_index]
        selected_provider = selected.provider_name
        selected_url = selected.url
        selected_model = get_settings().OLLAMA_HEALTH_CHECK_MODEL
        # Remaining endpoints keep their 1-based position in the policy order.
        fallback_chain = [
            _ai_route_runtime_policy_endpoint_item(
                endpoint,
                priority=priority,
                model=selected_model,
            )
            for priority, endpoint in enumerate(
                endpoints[selected_index + 1 :],
                start=selected_index + 2,
            )
        ]
        fallback_chain.append({
            "priority": len(endpoints) + 1,
            "provider_name": "gemini",
            "url": None,
            "model": None,
            "runtime": "cloud",
        })
        reason_detail = (
            f"lightweight connectivity selected {selected.provider_name}"
        )

    return {
        "schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
        "workload_type": workload,
        "policy_order": policy_order,
        "selected_provider": selected_provider,
        "selected_url": selected_url,
        "selected_model": selected_model,
        "fallback_chain": fallback_chain,
        "route_reason": f"{route_reason}; {reason_detail}",
        "route_source": "lightweight_connectivity_fallback",
        "route_error": None,
        "health": health_by_provider,
        "checked_at": checked_at,
    }


async def _ai_route_probe_connectivity(
    endpoint: OllamaEndpointSelection,
) -> HealthReport:
    """Probe one endpoint with a read-only ``GET {url}/api/tags``.

    Args:
        endpoint: Endpoint to probe; only ``endpoint.url`` is read.

    Returns:
        A ``HealthReport`` that is HEALTHY on HTTP 200 and OFFLINE otherwise
        (missing URL, non-200 response, or any exception, including exceeding
        the connectivity budget).  This function does not raise ``Exception``
        subclasses.
    """
    if not endpoint.url:
        return HealthReport(
            status=HealthStatus.OFFLINE,
            host=endpoint.url,
            reason="no_ollama_endpoint_url",
        )

    budget = _AI_ROUTE_STATUS_CONNECTIVITY_TIMEOUT_SECONDS
    tags_url = f"{endpoint.url.rstrip('/')}/api/tags"
    start = time.perf_counter()
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(budget)) as client:
            # httpx applies its timeout per phase (connect/read/write/pool);
            # ``wait_for`` caps the request as a whole at the same budget.
            response = await asyncio.wait_for(client.get(tags_url), timeout=budget)
            latency_ms = (time.perf_counter() - start) * 1000
    except Exception as exc:
        return HealthReport(
            status=HealthStatus.OFFLINE,
            host=endpoint.url,
            latency_ms=(time.perf_counter() - start) * 1000,
            reason=f"status_only_connectivity_error:{type(exc).__name__}",
        )

    if response.status_code == 200:
        return HealthReport(
            status=HealthStatus.HEALTHY,
            host=endpoint.url,
            latency_ms=latency_ms,
            reason="status_only_connectivity_ok",
        )
    return HealthReport(
        status=HealthStatus.OFFLINE,
        host=endpoint.url,
        latency_ms=latency_ms,
        reason=f"status_only_connectivity_http_{response.status_code}",
    )


def _ai_route_runtime_policy_endpoint_item(
    endpoint: OllamaEndpointSelection,
    *,
    priority: int,
    model: str,
) -> dict[str, Any]:
    """Return the ``fallback_chain`` entry for one Ollama endpoint.

    Args:
        endpoint: Endpoint providing ``provider_name`` and ``url``.
        priority: Priority value stored in the entry.
        model: Model name stored in the entry.

    Returns:
        A dict with ``priority``, ``provider_name``, ``url``, ``model`` and
        ``runtime`` (always ``"ollama"``).  A falsy ``url`` becomes ``None``.
    """
    return {
        "priority": priority,
        "provider_name": endpoint.provider_name,
        "url": endpoint.url or None,
        "model": model,
        "runtime": "ollama",
    }


def _ai_route_unavailable_status(
    *,
    workload: OllamaWorkloadType,
    policy_order: list[dict[str, Any]],
    checked_at: datetime,
    route_reason: str,
    route_error: str,
    route_source: str,
) -> dict[str, Any]:
    """Return a route-status payload with no selected provider.

    The selection fields are ``None``, ``fallback_chain`` is empty and
    ``health`` is empty; the remaining arguments are echoed back unchanged.
    """
    return {
        "schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
        "workload_type": workload,
        "policy_order": policy_order,
        "selected_provider": None,
        "selected_url": None,
        "selected_model": None,
        "fallback_chain": [],
        "route_reason": route_reason,
        "route_source": route_source,
        "route_error": route_error,
        "health": {},
        "checked_at": checked_at,
    }