fix(api): route decision manager ollama calls through fallback
This commit is contained in:
@@ -36,6 +36,7 @@ from src.models.incident import Incident
|
||||
from src.models.playbook import SymptomPattern
|
||||
from src.services.action_parser import parse_kubectl_action
|
||||
from src.services.auto_approve import get_auto_approve_policy
|
||||
from src.services.ollama_endpoint_resolver import resolve_ollama_order
|
||||
from src.services.openclaw import get_openclaw
|
||||
from src.services.playbook_service import get_playbook_service
|
||||
from src.services.telegram_gateway import SILENCE_KEY_PREFIX # P1-24: 統一常數,禁止重複定義
|
||||
@@ -645,11 +646,9 @@ async def _nemoclaw_second_opinion(incident: "Incident", primary_result: dict) -
|
||||
2026-04-11 Claude Sonnet 4.6 Asia/Taipei
|
||||
"""
|
||||
try:
|
||||
from src.core.config import settings
|
||||
import httpx as _httpx
|
||||
|
||||
from src.services.model_registry import get_model as _get_model
|
||||
ollama_url = getattr(settings, "OLLAMA_URL", "http://34.143.170.20:11434") # 2026-05-03 ogt: ADR-110 GCP-A Primary
|
||||
# D1 集中化 2026-04-11: 從 models.json providers.ollama.models.nemoclaw 讀取
|
||||
model = _get_model("ollama", "nemoclaw")
|
||||
|
||||
@@ -673,23 +672,36 @@ async def _nemoclaw_second_opinion(incident: "Incident", primary_result: dict) -
|
||||
)
|
||||
|
||||
async with _httpx.AsyncClient(timeout=30.0) as client:
|
||||
resp = await client.post(
|
||||
f"{ollama_url}/api/generate",
|
||||
json={"model": model, "prompt": prompt, "stream": False},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
for endpoint in resolve_ollama_order("deep_rca"):
|
||||
if not endpoint.url:
|
||||
continue
|
||||
try:
|
||||
resp = await client.post(
|
||||
f"{endpoint.url.rstrip('/')}/api/generate",
|
||||
json={"model": model, "prompt": prompt, "stream": False},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
"nemoclaw_second_opinion_endpoint_failed",
|
||||
provider_name=endpoint.provider_name,
|
||||
endpoint_reason=endpoint.reason,
|
||||
error=str(e),
|
||||
)
|
||||
continue
|
||||
|
||||
advisory = data.get("response", "").strip()
|
||||
# 截取 <think>...</think> 後的正文(deepseek-r1 CoT 格式)
|
||||
if "</think>" in advisory:
|
||||
advisory = advisory.split("</think>", 1)[-1].strip()
|
||||
advisory = data.get("response", "").strip()
|
||||
# 截取 <think>...</think> 後的正文(deepseek-r1 CoT 格式)
|
||||
if "</think>" in advisory:
|
||||
advisory = advisory.split("</think>", 1)[-1].strip()
|
||||
|
||||
return advisory[:300] if advisory else None
|
||||
return advisory[:300] if advisory else None
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
import structlog as _sl
|
||||
_sl.get_logger(__name__).debug("nemoclaw_second_opinion_error", error=str(e))
|
||||
logger.debug("nemoclaw_second_opinion_error", error=str(e))
|
||||
return None
|
||||
|
||||
|
||||
@@ -706,7 +718,6 @@ async def _generate_playbook_draft_if_new(incident: "Incident") -> None:
|
||||
"""
|
||||
try:
|
||||
import httpx as _httpx
|
||||
from src.core.config import settings
|
||||
from src.models.knowledge import (
|
||||
EntrySource, EntryStatus, EntryType, KnowledgeEntryCreate,
|
||||
)
|
||||
@@ -742,16 +753,32 @@ async def _generate_playbook_draft_if_new(incident: "Incident") -> None:
|
||||
)
|
||||
|
||||
from src.services.model_registry import get_model as _get_model
|
||||
ollama_url = getattr(settings, "OLLAMA_URL", "http://34.143.170.20:11434") # 2026-05-03 ogt: ADR-110 GCP-A Primary
|
||||
# D1 集中化 2026-04-11: 從 models.json providers.ollama.models.playbook_draft 讀取
|
||||
_pb_model = _get_model("ollama", "playbook_draft")
|
||||
content = ""
|
||||
async with _httpx.AsyncClient(timeout=45.0) as client:
|
||||
resp = await client.post(
|
||||
f"{ollama_url}/api/generate",
|
||||
json={"model": _pb_model, "prompt": prompt, "stream": False},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
content = resp.json().get("response", "").strip()
|
||||
for endpoint in resolve_ollama_order("deep_rca"):
|
||||
if not endpoint.url:
|
||||
continue
|
||||
try:
|
||||
resp = await client.post(
|
||||
f"{endpoint.url.rstrip('/')}/api/generate",
|
||||
json={"model": _pb_model, "prompt": prompt, "stream": False},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
content = resp.json().get("response", "").strip()
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
"playbook_draft_ollama_endpoint_failed",
|
||||
incident_id=incident.incident_id,
|
||||
provider_name=endpoint.provider_name,
|
||||
endpoint_reason=endpoint.reason,
|
||||
error=str(e),
|
||||
)
|
||||
continue
|
||||
|
||||
if content:
|
||||
break
|
||||
|
||||
if not content or len(content) < 50:
|
||||
return
|
||||
|
||||
158
apps/api/tests/test_decision_manager_ollama_routing.py
Normal file
158
apps/api/tests/test_decision_manager_ollama_routing.py
Normal file
@@ -0,0 +1,158 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from types import SimpleNamespace
|
||||
from typing import Any
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import pytest
|
||||
|
||||
from src.services import decision_manager as decision_module
|
||||
|
||||
|
||||
class _FakeResponse:
    """Minimal stand-in for an HTTP response that carries one canned text."""

    def __init__(self, response: str) -> None:
        # Text later exposed under the "response" key by json().
        self._payload_text = response

    def raise_for_status(self) -> None:
        """Do nothing: this fake never reports an HTTP error."""

    def json(self) -> dict[str, str]:
        """Return the canned text wrapped as ``{"response": <text>}``."""
        return dict(response=self._payload_text)
|
||||
|
||||
|
||||
class _FakeAsyncClient:
    """Async-context-manager fake used in place of ``httpx.AsyncClient``.

    Configuration and recorded calls live on the class itself, so a test can
    set them up before the code under test builds its own instance and can
    inspect them afterwards.
    """

    # Every URL handed to post(), in call order (shared by all instances).
    posted_urls: list[str] = []
    # URLs for which post() raises instead of answering.
    fail_urls: set[str] = set()
    # Text placed in the "response" field of every successful reply.
    response: str = ""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Constructor arguments are only recorded, never interpreted.
        self.args, self.kwargs = args, kwargs

    async def __aenter__(self) -> "_FakeAsyncClient":
        return self

    async def __aexit__(self, *args: Any) -> None:
        return None

    async def post(self, url: str, *, json: dict[str, Any]) -> "_FakeResponse":
        """Record *url*, then either raise or return the canned response."""
        self.posted_urls.append(url)
        if url not in self.fail_urls:
            return _FakeResponse(self.response)
        raise RuntimeError("endpoint unavailable")
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _reset_fake_client() -> None:
    """Give the fake client fresh class-level state before each test."""
    pristine_state = (
        ("posted_urls", []),
        ("fail_urls", set()),
        ("response", ""),
    )
    for attr_name, fresh_value in pristine_state:
        setattr(_FakeAsyncClient, attr_name, fresh_value)
|
||||
|
||||
|
||||
def _incident() -> SimpleNamespace:
    """Build a minimal incident stub carrying a single warning-level signal."""
    alert_name = "AwoooPRouteTest"
    only_signal = SimpleNamespace(
        labels={"alertname": alert_name, "severity": "warning"},
        alert_name=alert_name,
        annotations={},
    )
    return SimpleNamespace(
        incident_id="INC-ROUTE-001",
        affected_services=["awoooi-api"],
        signals=[only_signal],
    )
|
||||
|
||||
|
||||
def _ollama_order(_workload_type: str) -> tuple[SimpleNamespace, ...]:
    """Return a fixed three-endpoint order (GCP-A, GCP-B, local).

    The workload type is accepted but ignored; every call yields the same
    endpoints in the same order.
    """
    endpoint_rows = (
        ("http://gcp-a:11435", "ollama_gcp_a", "global_primary_gcp_a"),
        ("http://gcp-b:11436", "ollama_gcp_b", "global_secondary_gcp_b"),
        ("http://local-111:11434", "ollama_local", "global_local_111"),
    )
    return tuple(
        SimpleNamespace(url=url, provider_name=provider_name, reason=reason)
        for url, provider_name, reason in endpoint_rows
    )
|
||||
|
||||
|
||||
def _patch_ollama_dependencies(monkeypatch: pytest.MonkeyPatch) -> None:
    """Replace the HTTP client, endpoint resolver and model lookup with fakes."""
    import httpx

    from src.services import model_registry

    def _fake_get_model(_provider, model_key):
        # Deterministic model name derived from the requested key only.
        return f"{model_key}-model"

    monkeypatch.setattr(httpx, "AsyncClient", _FakeAsyncClient)
    monkeypatch.setattr(decision_module, "resolve_ollama_order", _ollama_order)
    monkeypatch.setattr(model_registry, "get_model", _fake_get_model)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_nemoclaw_second_opinion_tries_gcp_b_after_gcp_a_failure(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A failing GCP-A endpoint must fall through to GCP-B for the second opinion."""
    gcp_a_url = "http://gcp-a:11435/api/generate"
    gcp_b_url = "http://gcp-b:11436/api/generate"
    primary_result = {
        "action": "restart",
        "confidence": 0.4,
        "reasoning": "primary reasoning",
    }

    _patch_ollama_dependencies(monkeypatch)
    _FakeAsyncClient.fail_urls = {gcp_a_url}
    # The reply starts with a <think> section; only the text after it is
    # expected in the returned advisory (see the assertion below).
    _FakeAsyncClient.response = "<think>scratchpad</think>GCP-B advisory"

    advisory = await decision_module._nemoclaw_second_opinion(
        _incident(),
        primary_result,
    )

    assert advisory == "GCP-B advisory"
    # GCP-A is attempted first, then GCP-B; the local endpoint is never reached.
    assert _FakeAsyncClient.posted_urls == [gcp_a_url, gcp_b_url]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_playbook_draft_tries_gcp_b_after_gcp_a_failure(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Playbook drafting must retry on GCP-B after GCP-A fails, then store the draft."""
    gcp_a_url = "http://gcp-a:11435/api/generate"
    gcp_b_url = "http://gcp-b:11436/api/generate"

    _patch_ollama_dependencies(monkeypatch)
    _FakeAsyncClient.fail_urls = {gcp_a_url}
    _FakeAsyncClient.response = "".join(
        (
            "## 症狀\nGCP-B 生成的 Playbook 草稿內容。\n",
            "## 根因假設\n主要服務短暫不可用。\n",
            "## 診斷步驟\n確認告警、查詢 Pod、檢查近期部署。\n",
            "## 修復動作\n依標準流程處理。\n",
        )
    )

    # Imported after the fakes are installed, matching the original ordering.
    from src.repositories import alert_operation_log_repository
    from src.services import knowledge_service

    # Knowledge service stub: the search finds nothing and entry creation
    # returns an object exposing only an entry_id.
    knowledge_stub = SimpleNamespace(
        semantic_search=AsyncMock(return_value=[]),
        create_entry=AsyncMock(return_value=SimpleNamespace(entry_id="KB-ROUTE-001")),
    )
    op_log_stub = SimpleNamespace(append=AsyncMock())
    monkeypatch.setattr(
        knowledge_service,
        "get_knowledge_service",
        lambda: knowledge_stub,
    )
    monkeypatch.setattr(
        alert_operation_log_repository,
        "get_alert_operation_log_repository",
        lambda: op_log_stub,
    )

    await decision_module._generate_playbook_draft_if_new(_incident())

    # GCP-A is attempted first, then GCP-B; the local endpoint is never reached.
    assert _FakeAsyncClient.posted_urls == [gcp_a_url, gcp_b_url]

    knowledge_stub.create_entry.assert_awaited_once()
    draft_entry = knowledge_stub.create_entry.await_args.args[0]
    assert draft_entry.related_incident_id == "INC-ROUTE-001"
    assert "GCP-B 生成" in draft_entry.content
    op_log_stub.append.assert_awaited_once()
|
||||
Reference in New Issue
Block a user