fix(api): fallback ai route status to connectivity
All checks were successful
CD Pipeline / tests (push) Successful in 5m59s
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / build-and-deploy (push) Successful in 4m17s
CD Pipeline / post-deploy-checks (push) Successful in 1m36s

This commit is contained in:
Your Name
2026-05-24 13:39:20 +08:00
parent 82e471a7f2
commit 478e25b6a2
2 changed files with 274 additions and 35 deletions

View File

@@ -10,6 +10,7 @@ from __future__ import annotations
import asyncio
import re
import time
import uuid
from collections import defaultdict
from collections.abc import Mapping
@@ -17,11 +18,13 @@ from datetime import UTC, datetime, timedelta
from typing import Any, get_args
from uuid import UUID
import httpx
import structlog
from fastapi import HTTPException, status
from sqlalchemy import func, select, text, update
from sqlalchemy import or_ as sa_or
from src.core.config import get_settings
from src.db.awooop_models import (
AwoooPContractRevision,
AwoooPConversationEvent,
@@ -49,7 +52,7 @@ from src.services.ollama_failover_manager import (
OllamaRoutingResult,
get_ollama_failover_manager,
)
from src.services.ollama_health_monitor import HealthReport
from src.services.ollama_health_monitor import HealthReport, HealthStatus
from src.services.run_state_machine import transition
logger = structlog.get_logger(__name__)
@@ -62,6 +65,7 @@ _MAX_TIMELINE_ITEMS = 100
_MAX_LIST_CONTEXT_ROWS = 500
_MAX_STEP_SUMMARY_CHARS = 128
_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS = 12.0
_AI_ROUTE_STATUS_CONNECTIVITY_TIMEOUT_SECONDS = 2.5
_REMEDIATION_HISTORY_LIMIT = 20
_INCIDENT_ID_RE = re.compile(r"\bINC-\d{8}-[A-Z0-9]{4,}\b")
_REMEDIATION_STATUS_FILTERS = {
@@ -509,44 +513,30 @@ async def get_ai_route_status(
get_ollama_failover_manager().select_provider(task_type=workload),
timeout=_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS,
)
except asyncio.TimeoutError:
except TimeoutError:
logger.warning(
"ai_route_status_check_timeout",
workload_type=workload,
timeout_seconds=_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS,
)
return {
"schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
"workload_type": workload,
"policy_order": policy_order,
"selected_provider": None,
"selected_url": None,
"selected_model": None,
"fallback_chain": [],
"route_reason": "route_check_timeout",
"route_source": "ollama_failover_manager",
"route_error": (
return await _ai_route_lightweight_status_from_policy(
workload=workload,
policy_order=policy_order,
checked_at=checked_at,
route_reason="route_check_timeout",
route_error=(
f"route status timed out after "
f"{_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS:g}s"
),
"health": {},
"checked_at": checked_at,
}
)
except Exception as exc:
return {
"schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
"workload_type": workload,
"policy_order": policy_order,
"selected_provider": None,
"selected_url": None,
"selected_model": None,
"fallback_chain": [],
"route_reason": "route_check_failed",
"route_source": "ollama_failover_manager",
"route_error": str(exc),
"health": {},
"checked_at": checked_at,
}
return await _ai_route_lightweight_status_from_policy(
workload=workload,
policy_order=policy_order,
checked_at=checked_at,
route_reason="route_check_failed",
route_error=str(exc),
)
return {
"schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
@@ -597,6 +587,186 @@ def _ai_route_policy_order(workload: OllamaWorkloadType) -> list[dict[str, Any]]
return items
async def _ai_route_lightweight_status_from_policy(
    *,
    workload: OllamaWorkloadType,
    policy_order: list[dict[str, Any]],
    checked_at: datetime,
    route_reason: str,
    route_error: str,
) -> dict[str, Any]:
    """Fallback read model for route status; never changes the execution router.

    Probes every endpoint returned by ``resolve_ollama_order(workload)``
    concurrently and reports the first one, in that order, whose probe is not
    ``HealthStatus.OFFLINE``. When every probe is offline the response names
    ``"gemini"`` as the selected provider.

    Args:
        workload: Workload type whose endpoint order is probed.
        policy_order: Pre-built policy listing, echoed back unchanged.
        checked_at: Timestamp echoed back unchanged.
        route_reason: Why the primary route check was abandoned; used as the
            prefix of the returned ``route_reason``.
        route_error: Error text from the primary route check. It is returned
            to the caller only when the probes themselves crash; otherwise
            the response carries ``route_error=None`` and the text is written
            to the log so the original failure is not lost.

    Returns:
        A route-status payload with the same keys as the primary response.
    """
    endpoints = list(resolve_ollama_order(workload))
    try:
        reports = await asyncio.gather(
            *[_ai_route_probe_connectivity(endpoint) for endpoint in endpoints],
        )
    except Exception as exc:
        logger.warning(
            "ai_route_status_lightweight_probe_failed",
            workload_type=workload,
            route_reason=route_reason,
            error=str(exc),
        )
        return _ai_route_unavailable_status(
            workload=workload,
            policy_order=policy_order,
            checked_at=checked_at,
            route_reason=route_reason,
            route_error=route_error,
            route_source="ollama_failover_manager",
        )

    health_by_provider = {
        endpoint.provider_name: _ai_route_health_item(report)
        for endpoint, report in zip(endpoints, reports, strict=False)
    }
    selected_index = next(
        (
            index
            for index, report in enumerate(reports)
            if report.status != HealthStatus.OFFLINE
        ),
        None,
    )
    selected = None if selected_index is None else endpoints[selected_index]

    # Both responses below report route_error=None, so without this record the
    # failure that triggered the fallback would be visible nowhere.
    logger.warning(
        "ai_route_status_lightweight_fallback_used",
        workload_type=workload,
        route_reason=route_reason,
        route_error=route_error,
        selected_provider="gemini" if selected is None else selected.provider_name,
    )

    if selected is None:
        return {
            "schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
            "workload_type": workload,
            "policy_order": policy_order,
            "selected_provider": "gemini",
            "selected_url": None,
            "selected_model": None,
            "fallback_chain": [],
            "route_reason": (
                f"{route_reason}; lightweight connectivity found all Ollama "
                "endpoints offline; final fallback policy is Gemini"
            ),
            "route_source": "lightweight_connectivity_fallback",
            "route_error": None,
            "health": health_by_provider,
            "checked_at": checked_at,
        }

    model = get_settings().OLLAMA_HEALTH_CHECK_MODEL
    # Endpoints after the selected one keep their 1-based position in the full
    # endpoint order as their priority; Gemini is appended after all of them.
    fallback_chain = [
        _ai_route_runtime_policy_endpoint_item(
            endpoint,
            priority=index + 1,
            model=model,
        )
        for index, endpoint in enumerate(
            endpoints[selected_index + 1 :],
            start=selected_index + 1,
        )
    ]
    fallback_chain.append({
        "priority": len(endpoints) + 1,
        "provider_name": "gemini",
        "url": None,
        "model": None,
        "runtime": "cloud",
    })
    return {
        "schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
        "workload_type": workload,
        "policy_order": policy_order,
        "selected_provider": selected.provider_name,
        "selected_url": selected.url,
        "selected_model": model,
        "fallback_chain": fallback_chain,
        "route_reason": (
            f"{route_reason}; lightweight connectivity selected "
            f"{selected.provider_name}"
        ),
        "route_source": "lightweight_connectivity_fallback",
        "route_error": None,
        "health": health_by_provider,
        "checked_at": checked_at,
    }
async def _ai_route_probe_connectivity(
    endpoint: OllamaEndpointSelection,
) -> HealthReport:
    """Cheap read-only /api/tags probe for Operator Console status fallback.

    Issues one ``GET <endpoint.url>/api/tags`` and maps the outcome onto a
    ``HealthReport``: HTTP 200 is HEALTHY, any other status code is OFFLINE,
    and any exception raised while probing is reported as OFFLINE with the
    exception class name in ``reason``. An endpoint without a URL is reported
    OFFLINE without any request being made.
    """
    target = endpoint.url
    if not target:
        return HealthReport(
            status=HealthStatus.OFFLINE,
            host=target,
            reason="no_ollama_endpoint_url",
        )

    began = time.perf_counter()

    def _elapsed_ms() -> float:
        # Wall-clock milliseconds since the probe started.
        return (time.perf_counter() - began) * 1000

    try:
        # NOTE(review): per the httpx docs a single-value Timeout applies to
        # each phase (connect/read/write/pool) separately, so one probe can
        # take longer than this constant in wall-clock time - confirm that is
        # acceptable for the status endpoint.
        probe_timeout = httpx.Timeout(_AI_ROUTE_STATUS_CONNECTIVITY_TIMEOUT_SECONDS)
        async with httpx.AsyncClient(timeout=probe_timeout) as client:
            response = await client.get(f"{endpoint.url.rstrip('/')}/api/tags")
        latency_ms = _elapsed_ms()
        reachable = response.status_code == 200
        return HealthReport(
            status=HealthStatus.HEALTHY if reachable else HealthStatus.OFFLINE,
            host=target,
            latency_ms=latency_ms,
            reason=(
                "status_only_connectivity_ok"
                if reachable
                else f"status_only_connectivity_http_{response.status_code}"
            ),
        )
    except Exception as exc:
        # Deliberate best-effort: a failed probe is a status, not an error.
        return HealthReport(
            status=HealthStatus.OFFLINE,
            host=target,
            latency_ms=_elapsed_ms(),
            reason=f"status_only_connectivity_error:{type(exc).__name__}",
        )
def _ai_route_runtime_policy_endpoint_item(
    endpoint: OllamaEndpointSelection,
    *,
    priority: int,
    model: str,
) -> dict[str, Any]:
    """Describe one Ollama endpoint as a fallback-chain entry.

    Args:
        endpoint: Endpoint whose ``provider_name`` and ``url`` are reported.
        priority: Priority value stored on the entry as given.
        model: Model name stored on the entry as given.

    Returns:
        A dict with ``priority``, ``provider_name``, ``url``, ``model`` and
        ``runtime`` (always ``"ollama"``). A falsy URL is reported as ``None``.
    """
    item: dict[str, Any] = {"priority": priority}
    item["provider_name"] = endpoint.provider_name
    endpoint_url = endpoint.url
    # Collapse empty/missing URLs to None rather than leaking "" to clients.
    item["url"] = endpoint_url if endpoint_url else None
    item["model"] = model
    item["runtime"] = "ollama"
    return item
def _ai_route_unavailable_status(
    *,
    workload: OllamaWorkloadType,
    policy_order: list[dict[str, Any]],
    checked_at: datetime,
    route_reason: str,
    route_error: str,
    route_source: str,
) -> dict[str, Any]:
    """Build the route-status payload for "no provider could be selected".

    The selection fields are all ``None``, the fallback chain and health map
    are empty, and the remaining arguments are echoed back unchanged.
    """
    payload: dict[str, Any] = {
        "schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
        "workload_type": workload,
        "policy_order": policy_order,
    }
    # Nothing was selected: every selection field is explicitly None.
    payload.update(
        dict.fromkeys(("selected_provider", "selected_url", "selected_model")),
    )
    payload.update(
        fallback_chain=[],
        route_reason=route_reason,
        route_source=route_source,
        route_error=route_error,
        health={},
        checked_at=checked_at,
    )
    return payload
def _ai_route_policy_endpoint_item(
endpoint: OllamaEndpointSelection,
*,

View File

@@ -10,9 +10,9 @@ from fastapi import HTTPException
import src.services.platform_operator_service as platform_operator_service
from src.api.v1.platform.operator_runs import (
AiRouteStatusResponse,
ListCicdEventsResponse,
ListApprovalsResponse,
ListCallbackRepliesResponse,
ListCicdEventsResponse,
ListRunsResponse,
)
from src.services.ollama_failover_manager import OllamaEndpoint, OllamaRoutingResult
@@ -23,8 +23,8 @@ from src.services.platform_operator_service import (
_build_awooop_status_chain,
_callback_reply_event_item,
_callback_reply_summary_matches_status,
_cicd_event_item_from_row,
_cicd_duration_seconds,
_cicd_event_item_from_row,
_collect_run_incident_ids,
_is_source_correlation_applied_link,
_legacy_mcp_timeline_status,
@@ -1190,6 +1190,20 @@ async def test_ai_route_status_times_out_before_slow_provider_checks(monkeypatch
async def select_provider(self, task_type: str = "general") -> None:
await asyncio.sleep(0.05)
async def fake_connectivity(endpoint):
if endpoint.provider_name == "ollama_gcp_a":
return HealthReport(
status=HealthStatus.OFFLINE,
host=endpoint.url,
reason="timeout",
)
return HealthReport(
status=HealthStatus.HEALTHY,
host=endpoint.url,
latency_ms=12.3,
reason="status_only_connectivity_ok",
)
monkeypatch.setattr(
platform_operator_service,
"_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS",
@@ -1200,12 +1214,26 @@ async def test_ai_route_status_times_out_before_slow_provider_checks(monkeypatch
"get_ollama_failover_manager",
lambda: SlowFailoverManager(),
)
monkeypatch.setattr(
platform_operator_service,
"_ai_route_probe_connectivity",
fake_connectivity,
)
response = await platform_operator_service.get_ai_route_status("deep_rca")
assert response["route_reason"] == "route_check_timeout"
assert response["route_error"] == "route status timed out after 0.001s"
assert response["selected_provider"] is None
assert response["route_reason"] == (
"route_check_timeout; lightweight connectivity selected ollama_gcp_b"
)
assert response["route_error"] is None
assert response["route_source"] == "lightweight_connectivity_fallback"
assert response["selected_provider"] == "ollama_gcp_b"
assert response["health"]["ollama_gcp_a"]["status"] == "offline"
assert response["health"]["ollama_gcp_b"]["status"] == "healthy"
assert [item["provider_name"] for item in response["fallback_chain"]] == [
"ollama_local",
"gemini",
]
assert [item["provider_name"] for item in response["policy_order"]] == [
"ollama_gcp_a",
"ollama_gcp_b",
@@ -1214,6 +1242,47 @@ async def test_ai_route_status_times_out_before_slow_provider_checks(monkeypatch
]
@pytest.mark.asyncio
async def test_ai_route_status_lightweight_fallback_keeps_gemini_policy_only(
    monkeypatch,
) -> None:
    """All probes offline: status reports Gemini with no Ollama fallback chain."""

    class SlowFailoverManager:
        async def select_provider(self, task_type: str = "general") -> None:
            # Sleeps longer than the patched select timeout to force the
            # timeout branch of get_ai_route_status.
            await asyncio.sleep(0.05)

    async def fake_offline_connectivity(endpoint):
        return HealthReport(
            status=HealthStatus.OFFLINE,
            host=endpoint.url,
            reason="offline",
        )

    monkeypatch.setattr(
        platform_operator_service,
        "_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS",
        0.001,
    )
    monkeypatch.setattr(
        platform_operator_service,
        "get_ollama_failover_manager",
        lambda: SlowFailoverManager(),
    )
    monkeypatch.setattr(
        platform_operator_service,
        "_ai_route_probe_connectivity",
        fake_offline_connectivity,
    )

    response = await platform_operator_service.get_ai_route_status("deep_rca")

    assert response["selected_provider"] == "gemini"
    assert response["selected_model"] is None
    assert response["route_source"] == "lightweight_connectivity_fallback"
    assert response["route_error"] is None
    assert "final fallback policy is Gemini" in response["route_reason"]
    assert response["fallback_chain"] == []
    # all() is vacuously true on an empty mapping; require that endpoints
    # were actually probed before checking their status.
    assert response["health"]
    assert all(item["status"] == "offline" for item in response["health"].values())
def test_ai_route_workload_validation_rejects_unknown_value() -> None:
assert _validate_ai_route_workload(" hermes ") == "hermes"
with pytest.raises(HTTPException) as exc_info: