fix(api): route decision manager ollama calls through fallback
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 5m59s
CD Pipeline / build-and-deploy (push) Successful in 3m30s
CD Pipeline / post-deploy-checks (push) Successful in 1m17s

This commit is contained in:
Your Name
2026-05-19 13:15:21 +08:00
parent a0ca2ccb7f
commit a379a80ce1
2 changed files with 208 additions and 23 deletions

View File

@@ -36,6 +36,7 @@ from src.models.incident import Incident
from src.models.playbook import SymptomPattern
from src.services.action_parser import parse_kubectl_action
from src.services.auto_approve import get_auto_approve_policy
from src.services.ollama_endpoint_resolver import resolve_ollama_order
from src.services.openclaw import get_openclaw
from src.services.playbook_service import get_playbook_service
from src.services.telegram_gateway import SILENCE_KEY_PREFIX # P1-24: 統一常數,禁止重複定義
@@ -645,11 +646,9 @@ async def _nemoclaw_second_opinion(incident: "Incident", primary_result: dict) -
2026-04-11 Claude Sonnet 4.6 Asia/Taipei
"""
try:
from src.core.config import settings
import httpx as _httpx
from src.services.model_registry import get_model as _get_model
ollama_url = getattr(settings, "OLLAMA_URL", "http://34.143.170.20:11434") # 2026-05-03 ogt: ADR-110 GCP-A Primary
# D1 集中化 2026-04-11: 從 models.json providers.ollama.models.nemoclaw 讀取
model = _get_model("ollama", "nemoclaw")
@@ -673,23 +672,36 @@ async def _nemoclaw_second_opinion(incident: "Incident", primary_result: dict) -
)
async with _httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
f"{ollama_url}/api/generate",
json={"model": model, "prompt": prompt, "stream": False},
)
resp.raise_for_status()
data = resp.json()
for endpoint in resolve_ollama_order("deep_rca"):
if not endpoint.url:
continue
try:
resp = await client.post(
f"{endpoint.url.rstrip('/')}/api/generate",
json={"model": model, "prompt": prompt, "stream": False},
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.debug(
"nemoclaw_second_opinion_endpoint_failed",
provider_name=endpoint.provider_name,
endpoint_reason=endpoint.reason,
error=str(e),
)
continue
advisory = data.get("response", "").strip()
# 截取 <think>...</think> 後的正文deepseek-r1 CoT 格式)
if "</think>" in advisory:
advisory = advisory.split("</think>", 1)[-1].strip()
advisory = data.get("response", "").strip()
# 截取 <think>...</think> 後的正文deepseek-r1 CoT 格式)
if "</think>" in advisory:
advisory = advisory.split("</think>", 1)[-1].strip()
return advisory[:300] if advisory else None
return advisory[:300] if advisory else None
return None
except Exception as e:
import structlog as _sl
_sl.get_logger(__name__).debug("nemoclaw_second_opinion_error", error=str(e))
logger.debug("nemoclaw_second_opinion_error", error=str(e))
return None
@@ -706,7 +718,6 @@ async def _generate_playbook_draft_if_new(incident: "Incident") -> None:
"""
try:
import httpx as _httpx
from src.core.config import settings
from src.models.knowledge import (
EntrySource, EntryStatus, EntryType, KnowledgeEntryCreate,
)
@@ -742,16 +753,32 @@ async def _generate_playbook_draft_if_new(incident: "Incident") -> None:
)
from src.services.model_registry import get_model as _get_model
ollama_url = getattr(settings, "OLLAMA_URL", "http://34.143.170.20:11434") # 2026-05-03 ogt: ADR-110 GCP-A Primary
# D1 集中化 2026-04-11: 從 models.json providers.ollama.models.playbook_draft 讀取
_pb_model = _get_model("ollama", "playbook_draft")
content = ""
async with _httpx.AsyncClient(timeout=45.0) as client:
resp = await client.post(
f"{ollama_url}/api/generate",
json={"model": _pb_model, "prompt": prompt, "stream": False},
)
resp.raise_for_status()
content = resp.json().get("response", "").strip()
for endpoint in resolve_ollama_order("deep_rca"):
if not endpoint.url:
continue
try:
resp = await client.post(
f"{endpoint.url.rstrip('/')}/api/generate",
json={"model": _pb_model, "prompt": prompt, "stream": False},
)
resp.raise_for_status()
content = resp.json().get("response", "").strip()
except Exception as e:
logger.debug(
"playbook_draft_ollama_endpoint_failed",
incident_id=incident.incident_id,
provider_name=endpoint.provider_name,
endpoint_reason=endpoint.reason,
error=str(e),
)
continue
if content:
break
if not content or len(content) < 50:
return

View File

@@ -0,0 +1,158 @@
from __future__ import annotations
from types import SimpleNamespace
from typing import Any
from unittest.mock import AsyncMock
import pytest
from src.services import decision_manager as decision_module
class _FakeResponse:
def __init__(self, response: str) -> None:
self._response = response
def raise_for_status(self) -> None:
return None
def json(self) -> dict[str, str]:
return {"response": self._response}
class _FakeAsyncClient:
posted_urls: list[str] = []
fail_urls: set[str] = set()
response: str = ""
def __init__(self, *args: Any, **kwargs: Any) -> None:
self.args = args
self.kwargs = kwargs
async def __aenter__(self) -> _FakeAsyncClient:
return self
async def __aexit__(self, *args: Any) -> None:
return None
async def post(self, url: str, *, json: dict[str, Any]) -> _FakeResponse:
self.posted_urls.append(url)
if url in self.fail_urls:
raise RuntimeError("endpoint unavailable")
return _FakeResponse(self.response)
@pytest.fixture(autouse=True)
def _reset_fake_client() -> None:
_FakeAsyncClient.posted_urls = []
_FakeAsyncClient.fail_urls = set()
_FakeAsyncClient.response = ""
def _incident() -> SimpleNamespace:
return SimpleNamespace(
incident_id="INC-ROUTE-001",
affected_services=["awoooi-api"],
signals=[
SimpleNamespace(
labels={"alertname": "AwoooPRouteTest", "severity": "warning"},
alert_name="AwoooPRouteTest",
annotations={},
)
],
)
def _ollama_order(_workload_type: str) -> tuple[SimpleNamespace, ...]:
return (
SimpleNamespace(
url="http://gcp-a:11435",
provider_name="ollama_gcp_a",
reason="global_primary_gcp_a",
),
SimpleNamespace(
url="http://gcp-b:11436",
provider_name="ollama_gcp_b",
reason="global_secondary_gcp_b",
),
SimpleNamespace(
url="http://local-111:11434",
provider_name="ollama_local",
reason="global_local_111",
),
)
def _patch_ollama_dependencies(monkeypatch: pytest.MonkeyPatch) -> None:
import httpx
from src.services import model_registry
monkeypatch.setattr(httpx, "AsyncClient", _FakeAsyncClient)
monkeypatch.setattr(decision_module, "resolve_ollama_order", _ollama_order)
monkeypatch.setattr(
model_registry,
"get_model",
lambda _provider, model_key: f"{model_key}-model",
)
@pytest.mark.asyncio
async def test_nemoclaw_second_opinion_tries_gcp_b_after_gcp_a_failure(
monkeypatch: pytest.MonkeyPatch,
) -> None:
_patch_ollama_dependencies(monkeypatch)
_FakeAsyncClient.fail_urls = {"http://gcp-a:11435/api/generate"}
_FakeAsyncClient.response = "<think>scratchpad</think>GCP-B advisory"
advisory = await decision_module._nemoclaw_second_opinion(
_incident(),
{"action": "restart", "confidence": 0.4, "reasoning": "primary reasoning"},
)
assert advisory == "GCP-B advisory"
assert _FakeAsyncClient.posted_urls == [
"http://gcp-a:11435/api/generate",
"http://gcp-b:11436/api/generate",
]
@pytest.mark.asyncio
async def test_playbook_draft_tries_gcp_b_after_gcp_a_failure(
monkeypatch: pytest.MonkeyPatch,
) -> None:
_patch_ollama_dependencies(monkeypatch)
_FakeAsyncClient.fail_urls = {"http://gcp-a:11435/api/generate"}
_FakeAsyncClient.response = (
"## 症狀\nGCP-B 生成的 Playbook 草稿內容。\n"
"## 根因假設\n主要服務短暫不可用。\n"
"## 診斷步驟\n確認告警、查詢 Pod、檢查近期部署。\n"
"## 修復動作\n依標準流程處理。\n"
)
from src.repositories import alert_operation_log_repository
from src.services import knowledge_service
knowledge = SimpleNamespace(
semantic_search=AsyncMock(return_value=[]),
create_entry=AsyncMock(return_value=SimpleNamespace(entry_id="KB-ROUTE-001")),
)
op_repo = SimpleNamespace(append=AsyncMock())
monkeypatch.setattr(knowledge_service, "get_knowledge_service", lambda: knowledge)
monkeypatch.setattr(
alert_operation_log_repository,
"get_alert_operation_log_repository",
lambda: op_repo,
)
await decision_module._generate_playbook_draft_if_new(_incident())
assert _FakeAsyncClient.posted_urls == [
"http://gcp-a:11435/api/generate",
"http://gcp-b:11436/api/generate",
]
knowledge.create_entry.assert_awaited_once()
created_entry = knowledge.create_entry.await_args.args[0]
assert created_entry.related_incident_id == "INC-ROUTE-001"
assert "GCP-B 生成" in created_entry.content
op_repo.append.assert_awaited_once()