fix(openclaw): gate alert cloud fallback behind flag
This commit is contained in:
@@ -504,6 +504,22 @@ class Settings(BaseSettings):
|
||||
"unexpected cloud spend from Gitea push/PR alerts."
|
||||
),
|
||||
)
|
||||
ALERT_AI_ALLOW_CLOUD_FALLBACK: bool = Field(
|
||||
default=True,
|
||||
description=(
|
||||
"Allow incident/alert OpenClaw analysis to use cloud fallback "
|
||||
"providers after the GCP-A/GCP-B/111 Ollama lane is exhausted. "
|
||||
"Default true so Gemini can act as the final backup, after the "
|
||||
"ordered Ollama lane is exhausted."
|
||||
),
|
||||
)
|
||||
ALERT_AI_ENFORCE_OLLAMA_FIRST: bool = Field(
|
||||
default=True,
|
||||
description=(
|
||||
"Force incident/alert OpenClaw analysis to try GCP-A, then GCP-B, "
|
||||
"then local 111 before cloud backup providers such as Gemini."
|
||||
),
|
||||
)
|
||||
# 2026-03-29 ogt: ADR-036 Nemotron Tool Calling 整合
|
||||
NVIDIA_API_KEY: str = Field(
|
||||
default="",
|
||||
|
||||
@@ -1078,11 +1078,46 @@ class AIRouterExecutor:
|
||||
cached = await redis.get(cache_key)
|
||||
if cached:
|
||||
data = _json.loads(cached)
|
||||
cached_provider = data.get("provider", "cache")
|
||||
provider_allowed = cached_provider in provider_order
|
||||
ollama_first_required = (
|
||||
bool(context)
|
||||
and any(
|
||||
key in context
|
||||
for key in (
|
||||
"alert_type",
|
||||
"alertname",
|
||||
"alert_name",
|
||||
"fingerprint",
|
||||
"incident_id",
|
||||
"severity",
|
||||
"target_resource",
|
||||
)
|
||||
)
|
||||
and bool(provider_order)
|
||||
and provider_order[0].startswith("ollama")
|
||||
)
|
||||
if (
|
||||
cached_provider == "ollama"
|
||||
and any(provider.startswith("ollama") for provider in provider_order)
|
||||
):
|
||||
provider_allowed = True
|
||||
if ollama_first_required and not cached_provider.startswith("ollama"):
|
||||
provider_allowed = False
|
||||
if not provider_allowed:
|
||||
logger.info(
|
||||
"ai_router_cache_provider_mismatch_skip",
|
||||
cache_key=cache_key[:30],
|
||||
cached_provider=cached_provider,
|
||||
provider_order=provider_order,
|
||||
ollama_first_required=ollama_first_required,
|
||||
)
|
||||
raise ValueError("cached provider not allowed by current provider_order")
|
||||
logger.info("ai_router_cache_hit", cache_key=cache_key[:30])
|
||||
return AIResult(
|
||||
raw_response=data.get("response", ""),
|
||||
success=True,
|
||||
provider=data.get("provider", "cache"),
|
||||
provider=cached_provider,
|
||||
from_cache=True,
|
||||
)
|
||||
except Exception as e:
|
||||
|
||||
@@ -5,7 +5,7 @@ Phase 5: OpenClaw 實體化升級 (2026-03-21)
|
||||
統帥校正: SignOz 為唯一全能視力中心
|
||||
|
||||
Features:
|
||||
- 真實 LLM SDK 整合 (Ollama → Gemini → Claude)
|
||||
- 真實 LLM SDK 整合 (告警預設 Ollama GCP-A → GCP-B → 111 → Gemini)
|
||||
- SignOz Gold Metrics 即時擷取 (P99/Error/RPS)
|
||||
- AIOps Agent 專業人格 (K8s 維運 + SRE RCA 專精)
|
||||
- 強制結構化 JSON 輸出 (符合 API 契約)
|
||||
@@ -144,8 +144,8 @@ class OpenClawService:
|
||||
"""
|
||||
OpenClaw AI 決策服務 - True LLM + SignOz Integration
|
||||
|
||||
實作 AI_FALLBACK_ORDER 備援機制:
|
||||
Ollama → Gemini → Claude → Mock
|
||||
實作 AI_FALLBACK_ORDER 備援機制。
|
||||
告警/incident 上下文預設套用成本防線,只允許 Ollama GCP-A → GCP-B → 111。
|
||||
|
||||
新增 SignOz 整合:
|
||||
- 自動擷取 Gold Metrics
|
||||
@@ -176,6 +176,89 @@ class OpenClawService:
|
||||
await self._http_client.aclose()
|
||||
self._http_client = None
|
||||
|
||||
def _is_incident_alert_context(self, alert_context: dict | None) -> bool:
|
||||
"""Return true when a request came from the alert/incident automation path."""
|
||||
if not alert_context:
|
||||
return False
|
||||
alert_keys = {
|
||||
"alert_type",
|
||||
"alertname",
|
||||
"alert_name",
|
||||
"fingerprint",
|
||||
"incident_id",
|
||||
"severity",
|
||||
"signals",
|
||||
"target_resource",
|
||||
}
|
||||
return any(key in alert_context for key in alert_keys)
|
||||
|
||||
def _cloud_fallback_allowed_for_alert(self, alert_context: dict | None) -> bool:
|
||||
"""Cloud fallback is allowed after the ordered Ollama lane for alerts."""
|
||||
if not self._is_incident_alert_context(alert_context):
|
||||
return True
|
||||
return bool(getattr(settings, "ALERT_AI_ALLOW_CLOUD_FALLBACK", True))
|
||||
|
||||
def _alert_enforces_ollama_first(self, alert_context: dict | None) -> bool:
|
||||
"""Alert cards must try GCP-A/GCP-B/111 before Gemini backup."""
|
||||
return (
|
||||
self._is_incident_alert_context(alert_context)
|
||||
and bool(getattr(settings, "ALERT_AI_ENFORCE_OLLAMA_FIRST", True))
|
||||
)
|
||||
|
||||
async def _resolve_alert_provider_order(
|
||||
self,
|
||||
task_type: str = "diagnose",
|
||||
alert_context: dict | None = None,
|
||||
cloud_provider_order: list[str] | None = None,
|
||||
) -> list[str]:
|
||||
"""Resolve GCP-A/GCP-B/111, then Gemini backup, for alert analysis."""
|
||||
provider_order: list[str] = []
|
||||
try:
|
||||
route = await get_ollama_failover_manager().select_provider(task_type=task_type)
|
||||
provider_order = [
|
||||
endpoint.provider_name
|
||||
for endpoint in route.all_endpoints_in_order()
|
||||
if endpoint.provider_name.startswith("ollama")
|
||||
]
|
||||
except Exception as route_error:
|
||||
logger.warning(
|
||||
"alert_ollama_route_lookup_failed",
|
||||
error=str(route_error),
|
||||
task_type=task_type,
|
||||
)
|
||||
|
||||
if not provider_order:
|
||||
provider_order = ["ollama_gcp_a", "ollama_gcp_b", "ollama_local"]
|
||||
|
||||
deduped: list[str] = []
|
||||
for provider_name in provider_order:
|
||||
if provider_name and provider_name not in deduped:
|
||||
deduped.append(provider_name)
|
||||
|
||||
if not self._alert_enforces_ollama_first(alert_context):
|
||||
return deduped
|
||||
|
||||
ollama_order = {"ollama_gcp_a": 0, "ollama_gcp_b": 1, "ollama_local": 2}
|
||||
ordered_ollama = [
|
||||
provider_name
|
||||
for provider_name in deduped
|
||||
if provider_name in ollama_order
|
||||
]
|
||||
ordered_ollama.sort(key=lambda provider_name: ollama_order[provider_name])
|
||||
if not ordered_ollama:
|
||||
ordered_ollama = ["ollama_gcp_a", "ollama_gcp_b", "ollama_local"]
|
||||
|
||||
if not self._cloud_fallback_allowed_for_alert(alert_context):
|
||||
return ordered_ollama
|
||||
|
||||
cloud_candidates = cloud_provider_order or []
|
||||
cloud_backup: list[str] = []
|
||||
for provider_name in [*cloud_candidates, "gemini"]:
|
||||
if provider_name == "gemini" and provider_name not in cloud_backup:
|
||||
cloud_backup.append(provider_name)
|
||||
|
||||
return ordered_ollama + cloud_backup
|
||||
|
||||
# =========================================================================
|
||||
# SignOz Integration
|
||||
# =========================================================================
|
||||
@@ -437,13 +520,13 @@ class OpenClawService:
|
||||
# 完整移除時機: Phase 24 完整驗收後 (ADR-052 D11)
|
||||
# =========================================================================
|
||||
|
||||
async def _call_ollama(self, prompt: str) -> tuple[str, bool]:
|
||||
async def _call_ollama(self, prompt: str, *, ollama_only: bool = False) -> tuple[str, bool]:
|
||||
"""
|
||||
呼叫 Ollama (支援 JSON Mode)。
|
||||
|
||||
USE_AI_ROUTER=true 正常會走 AIRouterExecutor;這裡是 legacy safety-net。
|
||||
2026-05-05 Codex: safety-net 也必須遵守 ADR-110 三層 Ollama
|
||||
路由,不能只打 OLLAMA_URL 後直接掉 Gemini。
|
||||
路由,告警路徑預設只允許 GCP-A/GCP-B/111,不能只打 OLLAMA_URL 後直接掉 Gemini。
|
||||
"""
|
||||
try:
|
||||
client = await self._get_client()
|
||||
@@ -484,6 +567,26 @@ class OpenClawService:
|
||||
endpoints.append((provider_name, endpoint_url))
|
||||
seen_urls.add(endpoint_url)
|
||||
|
||||
if ollama_only:
|
||||
allowed_provider_order = {"ollama_gcp_a": 0, "ollama_gcp_b": 1, "ollama_local": 2}
|
||||
endpoints = [
|
||||
(provider_name, endpoint_url)
|
||||
for provider_name, endpoint_url in endpoints
|
||||
if provider_name in allowed_provider_order
|
||||
]
|
||||
endpoints.sort(key=lambda item: allowed_provider_order[item[0]])
|
||||
if not endpoints:
|
||||
endpoints = [
|
||||
("ollama_gcp_a", settings.OLLAMA_URL),
|
||||
("ollama_gcp_b", getattr(settings, "OLLAMA_SECONDARY_URL", "")),
|
||||
("ollama_local", getattr(settings, "OLLAMA_FALLBACK_URL", "")),
|
||||
]
|
||||
endpoints = [
|
||||
(provider_name, endpoint_url)
|
||||
for provider_name, endpoint_url in endpoints
|
||||
if endpoint_url
|
||||
]
|
||||
|
||||
last_error = ""
|
||||
for provider_name, endpoint_url in endpoints:
|
||||
try:
|
||||
@@ -973,7 +1076,11 @@ class OpenClawService:
|
||||
try:
|
||||
# 2026-04-02 ogt: C2 修復 — 呼叫 AIRouter.route() 智慧路由 (非靜態 order)
|
||||
# D1 意圖分類路由、D7 隱私保護 (DIAGNOSE/CODE_REVIEW 強制 local) 生效
|
||||
from src.services.ai_router import get_ai_router, get_ai_executor, IntentType
|
||||
from src.services.ai_router import (
|
||||
IntentType,
|
||||
get_ai_executor,
|
||||
get_ai_router,
|
||||
)
|
||||
router = get_ai_router()
|
||||
executor = get_ai_executor()
|
||||
|
||||
@@ -987,7 +1094,10 @@ class OpenClawService:
|
||||
if p.value != decision.selected_provider.value
|
||||
]
|
||||
try:
|
||||
from src.services.ai_control import get_primary_provider, is_provider_disabled
|
||||
from src.services.ai_control import (
|
||||
get_primary_provider,
|
||||
is_provider_disabled,
|
||||
)
|
||||
_primary = await get_primary_provider()
|
||||
if _primary and _primary != decision.selected_provider.value:
|
||||
# 把 primary 移到首位 (保留原始 fallback)
|
||||
@@ -1003,6 +1113,20 @@ class OpenClawService:
|
||||
except Exception as _e:
|
||||
logger.warning("ai_control_override_failed", error=str(_e))
|
||||
|
||||
if self._alert_enforces_ollama_first(alert_context):
|
||||
original_provider_order = list(provider_order)
|
||||
provider_order = await self._resolve_alert_provider_order(
|
||||
task_type=decision.intent.value if decision.intent else "diagnose",
|
||||
alert_context=alert_context,
|
||||
cloud_provider_order=original_provider_order,
|
||||
)
|
||||
logger.info(
|
||||
"alert_ollama_first_provider_order",
|
||||
original_provider_order=original_provider_order,
|
||||
provider_order=provider_order,
|
||||
cloud_fallback_allowed=self._cloud_fallback_allowed_for_alert(alert_context),
|
||||
)
|
||||
|
||||
# Step 3: D7 隱私 — CODE_REVIEW 強制 local
|
||||
# 2026-04-15 ogt: DIAGNOSE 移除 require_local(v4.3 決策:NIM 為主力,無隱私問題)
|
||||
# ai_router.py v4.3 已明確:「NIM 從 Phase 22 起就是主力,無隱私問題」
|
||||
@@ -1045,13 +1169,18 @@ class OpenClawService:
|
||||
_mock_json, _rule_id = self._generate_mock_response(alert_context or {}, signoz_metrics)
|
||||
if _rule_id == "generic_fallback":
|
||||
import asyncio
|
||||
|
||||
from src.services.alert_rule_engine import auto_generate_rule
|
||||
try:
|
||||
asyncio.create_task(auto_generate_rule(
|
||||
alert_context or {},
|
||||
ollama_url=settings.OLLAMA_URL,
|
||||
model=settings.OPENCLAW_DEFAULT_MODEL,
|
||||
gemini_api_key=getattr(settings, "GEMINI_API_KEY", ""),
|
||||
gemini_api_key=(
|
||||
getattr(settings, "GEMINI_API_KEY", "")
|
||||
if self._cloud_fallback_allowed_for_alert(alert_context)
|
||||
else ""
|
||||
),
|
||||
))
|
||||
except Exception as _e:
|
||||
logger.warning("auto_rule_trigger_failed", error=str(_e))
|
||||
@@ -1086,7 +1215,18 @@ class OpenClawService:
|
||||
from src.services.ai_rate_limiter import get_ai_rate_limiter
|
||||
rate_limiter = get_ai_rate_limiter()
|
||||
|
||||
for provider in settings.AI_FALLBACK_ORDER:
|
||||
legacy_provider_order = list(settings.AI_FALLBACK_ORDER)
|
||||
if self._alert_enforces_ollama_first(alert_context):
|
||||
legacy_provider_order = ["ollama"]
|
||||
if self._cloud_fallback_allowed_for_alert(alert_context):
|
||||
legacy_provider_order.append("gemini")
|
||||
logger.info(
|
||||
"legacy_alert_ollama_first_provider_order",
|
||||
provider_order=legacy_provider_order,
|
||||
cloud_fallback_allowed=self._cloud_fallback_allowed_for_alert(alert_context),
|
||||
)
|
||||
|
||||
for provider in legacy_provider_order:
|
||||
# Rate Limit 檢查 (nvidia/gemini/claude 需檢查,ollama 不限)
|
||||
# 2026-03-30 ogt: 加入 nvidia (RPM=5 限制)
|
||||
if provider in ("nvidia", "gemini", "claude"):
|
||||
@@ -1109,7 +1249,10 @@ class OpenClawService:
|
||||
cost_usd = 0.0
|
||||
|
||||
if provider == "ollama":
|
||||
response, success = await self._call_ollama(prompt)
|
||||
response, success = await self._call_ollama(
|
||||
prompt,
|
||||
ollama_only=self._alert_enforces_ollama_first(alert_context),
|
||||
)
|
||||
elif provider == "gemini":
|
||||
response, success, total_tokens, cost_usd = await self._call_gemini(prompt)
|
||||
elif provider == "nvidia":
|
||||
@@ -1165,13 +1308,18 @@ class OpenClawService:
|
||||
_mock_json, _rule_id = self._generate_mock_response(alert_context or {}, signoz_metrics)
|
||||
if _rule_id == "generic_fallback":
|
||||
import asyncio
|
||||
|
||||
from src.services.alert_rule_engine import auto_generate_rule
|
||||
try:
|
||||
asyncio.create_task(auto_generate_rule(
|
||||
alert_context or {},
|
||||
ollama_url=settings.OLLAMA_URL,
|
||||
model=settings.OPENCLAW_DEFAULT_MODEL,
|
||||
gemini_api_key=getattr(settings, "GEMINI_API_KEY", ""),
|
||||
gemini_api_key=(
|
||||
getattr(settings, "GEMINI_API_KEY", "")
|
||||
if self._cloud_fallback_allowed_for_alert(alert_context)
|
||||
else ""
|
||||
),
|
||||
))
|
||||
except Exception as _e:
|
||||
logger.warning("auto_rule_trigger_failed", error=str(_e))
|
||||
@@ -1218,14 +1366,14 @@ class OpenClawService:
|
||||
except json.JSONDecodeError:
|
||||
# 3. 啟發式修補: 如果結尾缺少括號,嘗試補齊
|
||||
if candidate.startswith("{") and not candidate.endswith("}"):
|
||||
for i in range(1, 5): # 嘗試補 1-5 個括號/引號
|
||||
try:
|
||||
repaired = candidate + '"' * (i-1) + "}" * i
|
||||
json.loads(repaired)
|
||||
logger.info("json_repaired_heuristically", level=i)
|
||||
return repaired
|
||||
except:
|
||||
continue
|
||||
for i in range(1, 5): # 嘗試補 1-5 個括號/引號
|
||||
try:
|
||||
repaired = candidate + '"' * (i - 1) + "}" * i
|
||||
json.loads(repaired)
|
||||
logger.info("json_repaired_heuristically", level=i)
|
||||
return repaired
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
continue
|
||||
|
||||
# 4. 極端情況: 找出最後一個有效 key
|
||||
@@ -1235,11 +1383,11 @@ class OpenClawService:
|
||||
# 暴力去除非法尾綴 (如 \t\t...)
|
||||
candidate = re.sub(r"[ \t\r\n]+$", "", candidate)
|
||||
if not candidate.endswith("}"):
|
||||
candidate += '"}' # 嘗試最簡單的閉合
|
||||
candidate += '"}' # 嘗試最簡單的閉合
|
||||
try:
|
||||
json.loads(candidate)
|
||||
return candidate
|
||||
except:
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
return None
|
||||
@@ -2200,6 +2348,7 @@ Expert context: {json.dumps(expert_context or {}, ensure_ascii=False, default=st
|
||||
}
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
from src.services.nvidia_provider import get_nvidia_provider
|
||||
|
||||
nvidia = get_nvidia_provider()
|
||||
@@ -2334,7 +2483,7 @@ Expert context: {json.dumps(expert_context or {}, ensure_ascii=False, default=st
|
||||
"latency_ms": latency_ms,
|
||||
}
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
except TimeoutError:
|
||||
latency_ms = (time.time() - start_time) * 1000
|
||||
logger.error(
|
||||
"nemotron_tool_call_timeout",
|
||||
@@ -2528,6 +2677,7 @@ async def _fetch_k8s_inventory_for_openclaw(
|
||||
"awoooi-api, awoooi-web, ..." 格式字串,失敗時返回 ""
|
||||
"""
|
||||
import asyncio as _asyncio
|
||||
|
||||
import structlog as _structlog
|
||||
_logger = _structlog.get_logger(__name__)
|
||||
try:
|
||||
@@ -2542,7 +2692,7 @@ async def _fetch_k8s_inventory_for_openclaw(
|
||||
)
|
||||
try:
|
||||
stdout, _ = await _asyncio.wait_for(proc.communicate(), timeout=timeout_sec)
|
||||
except _asyncio.TimeoutError:
|
||||
except TimeoutError:
|
||||
proc.kill()
|
||||
_logger.warning("k8s_inventory_timeout_openclaw", namespace=namespace)
|
||||
return ""
|
||||
|
||||
90
apps/api/tests/test_ai_router_cache_provider_policy.py
Normal file
90
apps/api/tests/test_ai_router_cache_provider_policy.py
Normal file
@@ -0,0 +1,90 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from src.services import ai_router as ai_router_module
|
||||
from src.services.ai_providers.interfaces import AIResult
|
||||
from src.services.ai_router import AIProviderRegistry, AIRouterExecutor
|
||||
|
||||
|
||||
class _FakeRedis:
|
||||
def __init__(self, cached_provider: str) -> None:
|
||||
self.cached_provider = cached_provider
|
||||
self.set_calls: list[tuple[str, str, int | None]] = []
|
||||
|
||||
async def get(self, key: str) -> str:
|
||||
return json.dumps({
|
||||
"response": '{"provider":"stale"}',
|
||||
"provider": self.cached_provider,
|
||||
})
|
||||
|
||||
async def set(self, key: str, value: str, ex: int | None = None) -> None:
|
||||
self.set_calls.append((key, value, ex))
|
||||
|
||||
|
||||
class _FakeProvider:
|
||||
name = "ollama_gcp_a"
|
||||
privacy_level = "local"
|
||||
is_enabled = True
|
||||
capabilities = {"rca", "chat"}
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.calls = 0
|
||||
|
||||
async def analyze(self, prompt: str, context: dict[str, Any] | None = None) -> AIResult:
|
||||
self.calls += 1
|
||||
return AIResult(
|
||||
raw_response='{"provider":"fresh_ollama"}',
|
||||
success=True,
|
||||
provider=self.name,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_executor_skips_cached_cloud_provider_when_ollama_lane_is_required(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
fake_redis = _FakeRedis(cached_provider="gemini")
|
||||
fake_provider = _FakeProvider()
|
||||
registry = AIProviderRegistry()
|
||||
registry.register(fake_provider)
|
||||
|
||||
monkeypatch.setattr(ai_router_module._settings, "MOCK_MODE", False)
|
||||
monkeypatch.setattr("src.core.redis_client.get_redis", lambda: fake_redis)
|
||||
|
||||
result = await AIRouterExecutor(registry).execute(
|
||||
prompt="diagnose alert",
|
||||
provider_order=["ollama_gcp_a", "ollama_gcp_b", "ollama_local", "gemini"],
|
||||
context={"intent_hint": "diagnose", "alert_type": "HostHighCpuLoad"},
|
||||
)
|
||||
|
||||
assert result.provider == "ollama_gcp_a"
|
||||
assert result.raw_response == '{"provider":"fresh_ollama"}'
|
||||
assert fake_provider.calls == 1
|
||||
assert fake_redis.set_calls
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_executor_allows_cached_ollama_provider_for_ollama_lane(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
fake_redis = _FakeRedis(cached_provider="ollama")
|
||||
fake_provider = _FakeProvider()
|
||||
registry = AIProviderRegistry()
|
||||
registry.register(fake_provider)
|
||||
|
||||
monkeypatch.setattr(ai_router_module._settings, "MOCK_MODE", False)
|
||||
monkeypatch.setattr("src.core.redis_client.get_redis", lambda: fake_redis)
|
||||
|
||||
result = await AIRouterExecutor(registry).execute(
|
||||
prompt="diagnose alert",
|
||||
provider_order=["ollama_gcp_a", "ollama_gcp_b", "ollama_local"],
|
||||
context={"intent_hint": "diagnose", "alert_type": "HostHighCpuLoad"},
|
||||
)
|
||||
|
||||
assert result.provider == "ollama"
|
||||
assert result.from_cache is True
|
||||
assert fake_provider.calls == 0
|
||||
231
apps/api/tests/test_openclaw_alert_cloud_fallback_gate.py
Normal file
231
apps/api/tests/test_openclaw_alert_cloud_fallback_gate.py
Normal file
@@ -0,0 +1,231 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from types import SimpleNamespace
|
||||
from typing import Any
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import pytest
|
||||
|
||||
from src.services import ai_control as ai_control_module
|
||||
from src.services import ai_router as ai_router_module
|
||||
from src.services import openclaw as openclaw_module
|
||||
from src.services.ai_router import AIProviderEnum
|
||||
from src.services.intent_classifier import IntentType
|
||||
from src.services.openclaw import OpenClawService
|
||||
|
||||
|
||||
@dataclass
|
||||
class _FakeEndpoint:
|
||||
provider_name: str
|
||||
url: str = "http://example.test"
|
||||
|
||||
|
||||
class _FakeRoute:
|
||||
def all_endpoints_in_order(self) -> list[_FakeEndpoint]:
|
||||
return [
|
||||
_FakeEndpoint("ollama_gcp_a"),
|
||||
_FakeEndpoint("ollama_gcp_b"),
|
||||
_FakeEndpoint("ollama_local"),
|
||||
_FakeEndpoint("gemini", ""),
|
||||
]
|
||||
|
||||
|
||||
class _FakeFailoverManager:
|
||||
def __init__(self) -> None:
|
||||
self.task_types: list[str] = []
|
||||
|
||||
async def select_provider(self, task_type: str = "general") -> _FakeRoute:
|
||||
self.task_types.append(task_type)
|
||||
return _FakeRoute()
|
||||
|
||||
|
||||
class _UnorderedFailoverManager:
|
||||
async def select_provider(self, task_type: str = "general") -> SimpleNamespace:
|
||||
return SimpleNamespace(
|
||||
all_endpoints_in_order=lambda: [
|
||||
_FakeEndpoint("ollama_local"),
|
||||
_FakeEndpoint("gemini"),
|
||||
_FakeEndpoint("ollama_gcp_b"),
|
||||
_FakeEndpoint("ollama_gcp_a"),
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class _FakeRouter:
|
||||
async def route(self, prompt: str, context: dict[str, Any]) -> SimpleNamespace:
|
||||
return SimpleNamespace(
|
||||
selected_provider=AIProviderEnum.GEMINI,
|
||||
fallback_chain=[
|
||||
(AIProviderEnum.CLAUDE, "claude"),
|
||||
(AIProviderEnum.OLLAMA, "qwen2.5:7b-instruct"),
|
||||
],
|
||||
intent=IntentType.DIAGNOSE,
|
||||
routing_reason="high complexity would normally prefer cloud",
|
||||
)
|
||||
|
||||
|
||||
class _FakeExecutor:
|
||||
def __init__(self) -> None:
|
||||
self.provider_order: list[str] | None = None
|
||||
|
||||
async def execute(
|
||||
self,
|
||||
*,
|
||||
prompt: str,
|
||||
provider_order: list[str],
|
||||
context: dict[str, Any],
|
||||
cache_ttl: int,
|
||||
require_local: bool,
|
||||
) -> SimpleNamespace:
|
||||
self.provider_order = provider_order
|
||||
return SimpleNamespace(
|
||||
raw_response='{"root_cause":"ok","suggested_action":"NO_ACTION"}',
|
||||
provider=provider_order[0],
|
||||
success=True,
|
||||
tokens=42,
|
||||
cost_usd=0.0,
|
||||
latency_ms=10.0,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alert_context_uses_ollama_lane_then_gemini_backup(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
fake_executor = _FakeExecutor()
|
||||
fake_failover = _FakeFailoverManager()
|
||||
|
||||
monkeypatch.setattr(openclaw_module.settings, "USE_AI_ROUTER", True)
|
||||
monkeypatch.setattr(openclaw_module.settings, "MOCK_MODE", False)
|
||||
monkeypatch.setattr(openclaw_module.settings, "ALERT_AI_ALLOW_CLOUD_FALLBACK", True)
|
||||
monkeypatch.setattr(openclaw_module.settings, "ALERT_AI_ENFORCE_OLLAMA_FIRST", True)
|
||||
monkeypatch.setattr(ai_control_module, "get_ai_router_enabled", AsyncMock(return_value=None))
|
||||
monkeypatch.setattr(ai_control_module, "get_primary_provider", AsyncMock(return_value=None))
|
||||
monkeypatch.setattr(ai_control_module, "is_provider_disabled", AsyncMock(return_value=False))
|
||||
monkeypatch.setattr(ai_router_module, "get_ai_router", lambda: _FakeRouter())
|
||||
monkeypatch.setattr(ai_router_module, "get_ai_executor", lambda: fake_executor)
|
||||
monkeypatch.setattr(openclaw_module, "get_ollama_failover_manager", lambda: fake_failover)
|
||||
|
||||
service = object.__new__(OpenClawService)
|
||||
result = await service._call_with_fallback(
|
||||
"diagnose alert",
|
||||
alert_context={
|
||||
"incident_id": "INC-1",
|
||||
"alertname": "HostHighCpuLoad",
|
||||
"target_resource": "node-exporter-110",
|
||||
},
|
||||
)
|
||||
|
||||
assert result == (
|
||||
'{"root_cause":"ok","suggested_action":"NO_ACTION"}',
|
||||
"ollama_gcp_a",
|
||||
True,
|
||||
42,
|
||||
0.0,
|
||||
)
|
||||
assert fake_executor.provider_order == ["ollama_gcp_a", "ollama_gcp_b", "ollama_local", "gemini"]
|
||||
assert fake_failover.task_types == ["diagnose"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alert_context_can_disable_cloud_backup_for_cost_stop(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
fake_executor = _FakeExecutor()
|
||||
fake_failover = _FakeFailoverManager()
|
||||
|
||||
monkeypatch.setattr(openclaw_module.settings, "USE_AI_ROUTER", True)
|
||||
monkeypatch.setattr(openclaw_module.settings, "MOCK_MODE", False)
|
||||
monkeypatch.setattr(openclaw_module.settings, "ALERT_AI_ALLOW_CLOUD_FALLBACK", False)
|
||||
monkeypatch.setattr(openclaw_module.settings, "ALERT_AI_ENFORCE_OLLAMA_FIRST", True)
|
||||
monkeypatch.setattr(ai_control_module, "get_ai_router_enabled", AsyncMock(return_value=None))
|
||||
monkeypatch.setattr(ai_control_module, "get_primary_provider", AsyncMock(return_value=None))
|
||||
monkeypatch.setattr(ai_control_module, "is_provider_disabled", AsyncMock(return_value=False))
|
||||
monkeypatch.setattr(ai_router_module, "get_ai_router", lambda: _FakeRouter())
|
||||
monkeypatch.setattr(ai_router_module, "get_ai_executor", lambda: fake_executor)
|
||||
monkeypatch.setattr(openclaw_module, "get_ollama_failover_manager", lambda: fake_failover)
|
||||
|
||||
service = object.__new__(OpenClawService)
|
||||
await service._call_with_fallback(
|
||||
"diagnose alert",
|
||||
alert_context={"incident_id": "INC-1", "alertname": "HostHighCpuLoad"},
|
||||
)
|
||||
|
||||
assert fake_executor.provider_order == ["ollama_gcp_a", "ollama_gcp_b", "ollama_local"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_non_alert_context_keeps_router_cloud_order(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
fake_executor = _FakeExecutor()
|
||||
|
||||
monkeypatch.setattr(openclaw_module.settings, "USE_AI_ROUTER", True)
|
||||
monkeypatch.setattr(openclaw_module.settings, "MOCK_MODE", False)
|
||||
monkeypatch.setattr(openclaw_module.settings, "ALERT_AI_ALLOW_CLOUD_FALLBACK", False)
|
||||
monkeypatch.setattr(openclaw_module.settings, "ALERT_AI_ENFORCE_OLLAMA_FIRST", True)
|
||||
monkeypatch.setattr(ai_control_module, "get_ai_router_enabled", AsyncMock(return_value=None))
|
||||
monkeypatch.setattr(ai_control_module, "get_primary_provider", AsyncMock(return_value=None))
|
||||
monkeypatch.setattr(ai_control_module, "is_provider_disabled", AsyncMock(return_value=False))
|
||||
monkeypatch.setattr(ai_router_module, "get_ai_router", lambda: _FakeRouter())
|
||||
monkeypatch.setattr(ai_router_module, "get_ai_executor", lambda: fake_executor)
|
||||
|
||||
service = object.__new__(OpenClawService)
|
||||
await service._call_with_fallback("general question", alert_context={"intent_hint": "query"})
|
||||
|
||||
assert fake_executor.provider_order == ["gemini", "claude", "ollama"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alert_context_uses_gcp_a_gcp_b_then_111_order(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
fake_failover = _FakeFailoverManager()
|
||||
|
||||
monkeypatch.setattr(openclaw_module.settings, "ALERT_AI_ALLOW_CLOUD_FALLBACK", False)
|
||||
monkeypatch.setattr(openclaw_module.settings, "ALERT_AI_ENFORCE_OLLAMA_FIRST", True)
|
||||
monkeypatch.setattr(openclaw_module, "get_ollama_failover_manager", lambda: fake_failover)
|
||||
|
||||
service = object.__new__(OpenClawService)
|
||||
provider_order = await service._resolve_alert_provider_order(
|
||||
task_type="diagnose",
|
||||
alert_context={"incident_id": "INC-1", "alertname": "HostHighCpuLoad"},
|
||||
)
|
||||
|
||||
assert provider_order == ["ollama_gcp_a", "ollama_gcp_b", "ollama_local"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alert_context_sorts_ollama_lane_and_drops_cloud_providers(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
monkeypatch.setattr(openclaw_module.settings, "ALERT_AI_ALLOW_CLOUD_FALLBACK", False)
|
||||
monkeypatch.setattr(openclaw_module.settings, "ALERT_AI_ENFORCE_OLLAMA_FIRST", True)
|
||||
monkeypatch.setattr(openclaw_module, "get_ollama_failover_manager", lambda: _UnorderedFailoverManager())
|
||||
|
||||
service = object.__new__(OpenClawService)
|
||||
provider_order = await service._resolve_alert_provider_order(
|
||||
task_type="diagnose",
|
||||
alert_context={"incident_id": "INC-1", "alertname": "HostHighCpuLoad"},
|
||||
)
|
||||
|
||||
assert provider_order == ["ollama_gcp_a", "ollama_gcp_b", "ollama_local"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alert_context_sorts_ollama_lane_before_gemini_backup(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
monkeypatch.setattr(openclaw_module.settings, "ALERT_AI_ALLOW_CLOUD_FALLBACK", True)
|
||||
monkeypatch.setattr(openclaw_module.settings, "ALERT_AI_ENFORCE_OLLAMA_FIRST", True)
|
||||
monkeypatch.setattr(openclaw_module, "get_ollama_failover_manager", lambda: _UnorderedFailoverManager())
|
||||
|
||||
service = object.__new__(OpenClawService)
|
||||
provider_order = await service._resolve_alert_provider_order(
|
||||
task_type="diagnose",
|
||||
alert_context={"incident_id": "INC-1", "alertname": "HostHighCpuLoad"},
|
||||
cloud_provider_order=["claude", "gemini", "ollama"],
|
||||
)
|
||||
|
||||
assert provider_order == ["ollama_gcp_a", "ollama_gcp_b", "ollama_local", "gemini"]
|
||||
Reference in New Issue
Block a user