fix(api): route direct ollama callers through ordered fallback
All checks were successful
Code Review / ai-code-review (push) Successful in 23s
CD Pipeline / tests (push) Successful in 5m51s
CD Pipeline / build-and-deploy (push) Successful in 3m29s
CD Pipeline / post-deploy-checks (push) Successful in 1m14s

This commit is contained in:
Your Name
2026-05-19 12:56:13 +08:00
parent 8a0a3f89aa
commit 35fe37c82a
13 changed files with 579 additions and 384 deletions

View File

@@ -64,6 +64,7 @@ async def rag_debug() -> dict:
"""診斷用:確認容器內 docs 路徑 + Ollama 連線"""
import os
from pathlib import Path
import httpx
paths_check = {}
@@ -78,12 +79,23 @@ async def rag_debug() -> dict:
try:
async with httpx.AsyncClient(timeout=10.0) as c:
from src.core.config import get_settings as _gs
from src.services.ollama_endpoint_resolver import resolve_ollama_order
settings = _gs()
r = await c.post(
f"{settings.OLLAMA_URL}/api/embeddings",
json={"model": settings.OLLAMA_EMBEDDING_MODEL, "prompt": "test"},
)
ollama_ok = r.status_code == 200 if r.status_code == 200 else f"http_{r.status_code}"
statuses: list[str] = []
for endpoint in resolve_ollama_order("embedding"):
if not endpoint.url:
continue
r = await c.post(
f"{endpoint.url}/api/embeddings",
json={"model": settings.OLLAMA_EMBEDDING_MODEL, "prompt": "test"},
)
if r.status_code == 200:
ollama_ok = True
break
statuses.append(f"{endpoint.provider_name}=http_{r.status_code}")
if ollama_ok is not True:
ollama_ok = ", ".join(statuses) or "no_endpoint"
except Exception as e:
ollama_ok = f"error: {type(e).__name__}: {e}"

View File

@@ -23,6 +23,7 @@ from src.db.base import get_db_context
from src.hermes.agent_loader import get_agent_system_prompt
from src.hermes.display_names import DEFAULT_AGENT, format_response_header
from src.hermes.safety_hooks import is_dangerous_input, is_mutate_intent
from src.services.ollama_endpoint_resolver import resolve_ollama_order
logger = structlog.get_logger(__name__)
@@ -261,44 +262,46 @@ async def process_nl_message(
t0 = time.monotonic()
# 呼叫 Ollama 本地模型111零費用按 agent 選模型)
# 呼叫 Ollama 模型(GCP-A → GCP-B → 111零費用按 agent 選模型)
model = _pick_model(agent_name)
success = False
error_type: str | None = None
try:
from src.services.ollama_endpoint_resolver import resolve_ollama_endpoint
ollama_base = resolve_ollama_endpoint("hermes")
async with httpx.AsyncClient(timeout=_OLLAMA_TIMEOUT) as _hc:
resp = await _hc.post(
f"{ollama_base}/api/chat",
json={
"model": model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt_with_ctx},
],
"stream": False,
"options": {"num_predict": 1500, "temperature": 0.3},
},
)
resp.raise_for_status()
result_text = resp.json().get("message", {}).get("content", "")
result_text = _strip_think_tags(result_text)
if not result_text:
result_text = "_Agent 回應為空請稍後再試。_"
success = True
except Exception as exc:
error_type = type(exc).__name__
logger.error(
"hermes_nl_ollama_error",
error=str(exc),
agent=agent_name,
model=model,
exc_type=error_type,
)
result_text = ""
async with httpx.AsyncClient(timeout=_OLLAMA_TIMEOUT) as _hc:
for endpoint in resolve_ollama_order("hermes"):
if not endpoint.url:
continue
try:
resp = await _hc.post(
f"{endpoint.url}/api/chat",
json={
"model": model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt_with_ctx},
],
"stream": False,
"options": {"num_predict": 1500, "temperature": 0.3},
},
)
resp.raise_for_status()
result_text = resp.json().get("message", {}).get("content", "")
result_text = _strip_think_tags(result_text)
if not result_text:
result_text = "_Agent 回應為空請稍後再試。_"
success = True
break
except Exception as exc:
error_type = type(exc).__name__
logger.error(
"hermes_nl_ollama_error",
error=str(exc),
agent=agent_name,
model=model,
provider=endpoint.provider_name,
exc_type=error_type,
)
if not success:
result_text = f"_Hermes 暫時無法連線({error_type}請稍後再試。_"
latency_ms = int((time.monotonic() - t0) * 1000)

View File

@@ -19,10 +19,11 @@ router = APIRouter()
logger = logging.getLogger(__name__)
# ==================== Ollama Config ====================
# 2026-05-03 ogt: ADR-110 GCP-A Primary — 改從 settings 讀取,不再硬編碼 111
def _get_ollama_base_url() -> str:
from src.core.config import get_settings
return get_settings().OLLAMA_URL
# 2026-05-19 Codex: agent thinking stream follows GCP-A → GCP-B → 111.
def _get_ollama_endpoints():
    """Return the resolver's endpoint sequence for the "interactive" workload.

    The resolver is imported inside the function, so the import only happens
    when the endpoints are actually requested.
    """
    from src.services.ollama_endpoint_resolver import resolve_ollama_order as _resolve_order

    interactive_endpoints = _resolve_order("interactive")
    return interactive_endpoints
OLLAMA_MODEL = "llama3.2:latest" # 可根據實際部署調整
OLLAMA_TIMEOUT = 120.0 # 串流超時
@@ -112,66 +113,82 @@ async def get_agent_thinking(
# 1. 開始思考
yield f"data: {json.dumps({'type': 'thinking', 'content': '正在連接 AI 模型...'}, ensure_ascii=False)}\n\n"
try:
async with httpx.AsyncClient(timeout=OLLAMA_TIMEOUT) as client:
# 2. 發送請求到 Ollama
yield f"data: {json.dumps({'type': 'thinking', 'content': f'模型: {model}'}, ensure_ascii=False)}\n\n"
last_error = ""
async with httpx.AsyncClient(timeout=OLLAMA_TIMEOUT) as client:
# 2. 發送請求到 Ollama
yield f"data: {json.dumps({'type': 'thinking', 'content': f'模型: {model}'}, ensure_ascii=False)}\n\n"
async with client.stream(
"POST",
f"{_get_ollama_base_url()}/api/generate",
json={
"model": model,
"prompt": prompt,
"stream": True,
},
) as response:
if response.status_code != 200:
yield f"data: {json.dumps({'type': 'error', 'content': f'Ollama 錯誤: HTTP {response.status_code}'}, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
return
yield f"data: {json.dumps({'type': 'thinking', 'content': '開始接收 AI 回應...'}, ensure_ascii=False)}\n\n"
# 3. 串流讀取 Ollama 回應
buffer = ""
async for line in response.aiter_lines():
if not line:
for endpoint in _get_ollama_endpoints():
if not endpoint.url:
continue
try:
async with client.stream(
"POST",
f"{endpoint.url}/api/generate",
json={
"model": model,
"prompt": prompt,
"stream": True,
},
) as response:
if response.status_code != 200:
last_error = f"HTTP {response.status_code}"
logger.warning(
"agent_thinking_ollama_http_error",
provider=endpoint.provider_name,
status=response.status_code,
)
continue
try:
chunk = json.loads(line)
token = chunk.get("response", "")
done = chunk.get("done", False)
yield f"data: {json.dumps({'type': 'thinking', 'content': '開始接收 AI 回應...'}, ensure_ascii=False)}\n\n"
if token:
# 累積 token每 10 字符或遇到標點符號時發送
buffer += token
if len(buffer) >= 10 or any(p in buffer for p in "。!?,、\n"):
yield f"data: {json.dumps({'type': 'thinking', 'content': buffer}, ensure_ascii=False)}\n\n"
buffer = ""
# 3. 串流讀取 Ollama 回應
buffer = ""
async for line in response.aiter_lines():
if not line:
continue
if done:
# 發送剩餘 buffer
if buffer:
yield f"data: {json.dumps({'type': 'thinking', 'content': buffer}, ensure_ascii=False)}\n\n"
# 發送完成訊息
yield f"data: {json.dumps({'type': 'result', 'content': '分析完成'}, ensure_ascii=False)}\n\n"
break
try:
chunk = json.loads(line)
token = chunk.get("response", "")
done = chunk.get("done", False)
except json.JSONDecodeError as e:
logger.warning(f"JSON 解析失敗: {line[:100]}... - {e}")
continue
if token:
# 累積 token每 10 字符或遇到標點符號時發送
buffer += token
if len(buffer) >= 10 or any(p in buffer for p in "。!?,、\n"):
yield f"data: {json.dumps({'type': 'thinking', 'content': buffer}, ensure_ascii=False)}\n\n"
buffer = ""
except httpx.ConnectError as e:
logger.error(f"無法連接 Ollama: {e}")
yield f"data: {json.dumps({'type': 'error', 'content': f'無法連接 Ollama ({_get_ollama_base_url()})'}, ensure_ascii=False)}\n\n"
except httpx.TimeoutException as e:
logger.error(f"Ollama 超時: {e}")
yield f"data: {json.dumps({'type': 'error', 'content': '請求超時'}, ensure_ascii=False)}\n\n"
except Exception as e:
logger.error(f"未知錯誤: {e}")
yield f"data: {json.dumps({'type': 'error', 'content': f'未知錯誤: {str(e)}'}, ensure_ascii=False)}\n\n"
if done:
# 發送剩餘 buffer
if buffer:
yield f"data: {json.dumps({'type': 'thinking', 'content': buffer}, ensure_ascii=False)}\n\n"
# 發送完成訊息
yield f"data: {json.dumps({'type': 'result', 'content': '分析完成'}, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
return
except json.JSONDecodeError as e:
logger.warning(f"JSON 解析失敗: {line[:100]}... - {e}")
continue
except (httpx.ConnectError, httpx.TimeoutException) as e:
last_error = type(e).__name__
logger.error(
"agent_thinking_ollama_endpoint_failed",
provider=endpoint.provider_name,
error=str(e),
)
except Exception as e:
last_error = str(e)
logger.error(
"agent_thinking_unknown_error",
provider=endpoint.provider_name,
error=str(e),
)
error_content = f"Ollama 全端點不可用: {last_error or 'unknown'}"
yield f"data: {json.dumps({'type': 'error', 'content': error_content}, ensure_ascii=False)}\n\n"
# 4. 結束標記
yield "data: [DONE]\n\n"

View File

@@ -33,6 +33,7 @@ import yaml
from src.constants.alert_types import ALERTNAME_TO_TYPE
from src.services.action_parser import parse_kubectl_action
from src.services.ollama_endpoint_resolver import resolve_ollama_order
logger = structlog.get_logger(__name__)
@@ -634,10 +635,12 @@ async def _insert_catalog_ai_generated(
)
return
from sqlalchemy import text as _sql
import src.db.base as _db_base
import json as _json
from sqlalchemy import text as _sql
import src.db.base as _db_base
# 從 rule_dict 提取欄位
# 'expr' 在 OpenClaw YAML 規則中不存在(非 PromQL
# 使用 alertname 作為語意佔位(與 yaml_hardcoded 同等策略)
@@ -704,17 +707,30 @@ async def _insert_catalog_ai_generated(
async def _call_ollama(prompt: str, ollama_url: str, model: str) -> str | None:
"""呼叫 Ollama 生成規則 YAML"""
try:
async with httpx.AsyncClient(timeout=60) as client:
resp = await client.post(
f"{ollama_url}/api/generate",
json={"model": model, "prompt": prompt, "stream": False, "options": {"temperature": 0.1}},
)
resp.raise_for_status()
return resp.json().get("response", "")
except Exception as e:
logger.warning("auto_rule_ollama_failed", error=str(e))
return None
endpoints = list(resolve_ollama_order("deep_rca"))
if ollama_url and all(endpoint.url != ollama_url for endpoint in endpoints):
from types import SimpleNamespace
endpoints.insert(0, SimpleNamespace(url=ollama_url, provider_name="custom"))
async with httpx.AsyncClient(timeout=60) as client:
for endpoint in endpoints:
if not endpoint.url:
continue
try:
resp = await client.post(
f"{endpoint.url}/api/generate",
json={"model": model, "prompt": prompt, "stream": False, "options": {"temperature": 0.1}},
)
resp.raise_for_status()
return resp.json().get("response", "")
except Exception as e:
logger.warning(
"auto_rule_ollama_failed",
provider=endpoint.provider_name,
error=str(e),
)
return None
async def _call_gemini(prompt: str, api_key: str) -> str | None:

View File

@@ -25,7 +25,7 @@ import structlog
from src.core.config import get_settings
from src.repositories.incident_repository import get_incident_repository
from src.repositories.k8s_repository import get_k8s_repository
from src.services.ollama_endpoint_resolver import resolve_ollama_endpoint
from src.services.ollama_endpoint_resolver import resolve_ollama_order
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
@@ -96,42 +96,51 @@ class ChatManager:
settings = get_settings()
model = settings.OPENCLAW_DEFAULT_MODEL
ollama_url = resolve_ollama_endpoint("interactive")
try:
async with httpx.AsyncClient(timeout=40.0) as client:
resp = await client.post(
f"{ollama_url}/api/chat",
json={
"model": model,
"stream": False,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message},
],
"options": {"num_predict": 900, "temperature": 0.2},
},
)
resp.raise_for_status()
data = resp.json()
raw = data.get("message", {}).get("content", "").strip()
text = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip() or raw
async with httpx.AsyncClient(timeout=40.0) as client:
for endpoint in resolve_ollama_order("interactive"):
if not endpoint.url:
continue
try:
resp = await client.post(
f"{endpoint.url}/api/chat",
json={
"model": model,
"stream": False,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message},
],
"options": {"num_predict": 900, "temperature": 0.2},
},
)
resp.raise_for_status()
data = resp.json()
raw = data.get("message", {}).get("content", "").strip()
text = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip() or raw
eval_count = data.get("eval_count", 0)
prompt_eval_count = data.get("prompt_eval_count", 0)
total_tokens = eval_count + prompt_eval_count
eval_count = data.get("eval_count", 0)
prompt_eval_count = data.get("prompt_eval_count", 0)
total_tokens = eval_count + prompt_eval_count
logger.info(
"openclaw_ollama_chat_usage",
model=model,
endpoint=ollama_url,
prompt_tokens=prompt_eval_count,
output_tokens=eval_count,
)
logger.info(
"openclaw_ollama_chat_usage",
model=model,
endpoint=endpoint.url,
provider=endpoint.provider_name,
prompt_tokens=prompt_eval_count,
output_tokens=eval_count,
)
return f"{text}\n\n<i>🦙 {model} | {total_tokens} tokens | 免費</i>"
except Exception as e:
logger.warning("openclaw_chat_failed", error=str(e))
return None
return f"{text}\n\n<i>🦙 {model} | {total_tokens} tokens | 免費</i>"
except Exception as e:
logger.warning(
"openclaw_chat_endpoint_failed",
provider=endpoint.provider_name,
endpoint=endpoint.url,
error=str(e),
)
logger.warning("openclaw_chat_failed_all_endpoints", model=model)
return None
async def _call_nemotron(self, system_prompt: str, user_message: str) -> str | None:
"""
@@ -146,43 +155,57 @@ class ChatManager:
system_prompt = f"{NEMOCLAW_PERSONA}\n{system_prompt}"
# 2026-05-05 Codex: ADR-110 interactive lane由 resolver 管理 GCP-A/GCP-B/111 拓撲
OLLAMA_URL = resolve_ollama_endpoint("interactive")
MODEL = "deepseek-r1:14b"
try:
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
f"{OLLAMA_URL}/api/chat",
json={
"model": MODEL,
"stream": False,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message},
],
"options": {"num_predict": 1200},
},
)
resp.raise_for_status()
data = resp.json()
raw = data.get("message", {}).get("content", "").strip()
async with httpx.AsyncClient(timeout=120.0) as client:
for endpoint in resolve_ollama_order("interactive"):
if not endpoint.url:
continue
try:
resp = await client.post(
f"{endpoint.url}/api/chat",
json={
"model": MODEL,
"stream": False,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message},
],
"options": {"num_predict": 1200},
},
)
resp.raise_for_status()
data = resp.json()
raw = data.get("message", {}).get("content", "").strip()
# 過濾 deepseek-r1 的 <think>...</think> 推理區塊
text = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
if not text:
text = raw # 萬一全是 think block直接回傳原文
# 過濾 deepseek-r1 的 <think>...</think> 推理區塊
text = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
if not text:
text = raw # 萬一全是 think block直接回傳原文
eval_count = data.get("eval_count", 0)
prompt_eval_count = data.get("prompt_eval_count", 0)
total_tokens = eval_count + prompt_eval_count
eval_count = data.get("eval_count", 0)
prompt_eval_count = data.get("prompt_eval_count", 0)
total_tokens = eval_count + prompt_eval_count
logger.info("nemotron_ollama_usage", model=MODEL,
prompt_tokens=prompt_eval_count, output_tokens=eval_count)
logger.info(
"nemotron_ollama_usage",
model=MODEL,
provider=endpoint.provider_name,
prompt_tokens=prompt_eval_count,
output_tokens=eval_count,
)
return f"{text}\n\n<i>🦙 {MODEL} | {total_tokens} tokens | 免費</i>"
except Exception as e:
logger.warning("nemotron_chat_failed", model=MODEL, error=str(e))
return None
return f"{text}\n\n<i>🦙 {MODEL} | {total_tokens} tokens | 免費</i>"
except Exception as e:
logger.warning(
"nemotron_chat_endpoint_failed",
model=MODEL,
provider=endpoint.provider_name,
endpoint=endpoint.url,
error=str(e),
)
logger.warning("nemotron_chat_failed_all_endpoints", model=MODEL)
return None
async def generate_response(
self,

View File

@@ -29,6 +29,7 @@ import httpx
import structlog
from src.core.config import get_settings
from src.services.ollama_endpoint_resolver import resolve_ollama_order
if TYPE_CHECKING:
from src.models.incident import Incident
@@ -186,9 +187,41 @@ class DecisionFusionEngine:
# settings 延遲讀取(避免測試環境初始化問題)
self._settings = get_settings()
@property
def _ollama_url(self) -> str:
return getattr(self._settings, "OLLAMA_URL", "http://34.143.170.20:11434") # 2026-05-03 ogt: ADR-110 GCP-A Primary
async def _call_ollama_generate(
    self,
    *,
    prompt: str,
    timeout_sec: float,
    num_predict: int,
    model: str = "qwen3:8b",
) -> str:
    """Run a non-streaming Ollama generate call, trying each resolved endpoint in turn.

    Endpoints come from ``resolve_ollama_order("deep_rca")``; entries with an
    empty ``url`` are skipped. The first endpoint that answers without raising
    (including ``raise_for_status``) wins and its text is returned immediately.

    Args:
        prompt: Prompt text sent to the model.
        timeout_sec: Default timeout in seconds for each request; the connect
            timeout is fixed at 5 seconds.
        num_predict: Value passed as the ``num_predict`` generation option.
        model: Ollama model name. Defaults to ``qwen3:8b``, the value that was
            previously hard-coded, so existing callers are unaffected.

    Returns:
        The stripped ``response`` field of the first successful reply
        (an empty string if the reply carries no ``response``).

    Raises:
        RuntimeError: If no endpoint was usable or every endpoint failed. The
            last endpoint error, if any, is attached as ``__cause__``.
    """
    last_error: Exception | None = None
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(timeout_sec, connect=5.0)
    ) as client:
        for endpoint in resolve_ollama_order("deep_rca"):
            if not endpoint.url:
                continue
            try:
                resp = await client.post(
                    f"{endpoint.url}{_OLLAMA_GENERATE_PATH}",
                    json={
                        "model": model,
                        "prompt": prompt,
                        "stream": False,
                        "options": {"num_predict": num_predict, "temperature": 0.1},
                    },
                )
                resp.raise_for_status()
                return resp.json().get("response", "").strip()
            except Exception as exc:
                # Best-effort fallback: remember the failure and try the next endpoint.
                last_error = exc
                logger.debug(
                    "decision_fusion_ollama_endpoint_failed",
                    provider=endpoint.provider_name,
                    error=str(exc),
                )
    # Chain the last failure so the original traceback is not lost to callers.
    raise RuntimeError(
        str(last_error) if last_error else "no_ollama_endpoint"
    ) from last_error
# =========================================================================
# Public API
@@ -322,21 +355,12 @@ class DecisionFusionEngine:
)
try:
async with httpx.AsyncClient(
timeout=httpx.Timeout(_HERMES_TIMEOUT_SEC, connect=5.0)
) as client:
resp = await client.post(
f"{self._ollama_url}{_OLLAMA_GENERATE_PATH}",
json={
"model": "qwen3:8b",
"prompt": prompt,
"stream": False,
"options": {"num_predict": 16, "temperature": 0.1},
},
)
if resp.status_code == 200:
text = resp.json().get("response", "").strip()
return self._extract_float(text, default=0.5)
text = await self._call_ollama_generate(
prompt=prompt,
timeout_sec=_HERMES_TIMEOUT_SEC,
num_predict=16,
)
return self._extract_float(text, default=0.5)
except Exception as exc:
logger.debug("hermes_score_failed", error=str(exc))
@@ -451,20 +475,11 @@ class DecisionFusionEngine:
"只回覆一個 0-1 的小數,不要解釋。"
)
async with httpx.AsyncClient(
timeout=httpx.Timeout(_ELEPHANT_TIMEOUT_SEC, connect=5.0)
) as client:
resp = await client.post(
f"{self._ollama_url}{_OLLAMA_GENERATE_PATH}",
json={
"model": "qwen3:8b",
"prompt": prompt,
"stream": False,
"options": {"num_predict": 32, "temperature": 0.1},
},
)
resp.raise_for_status()
raw_text = resp.json().get("response", "").strip()
raw_text = await self._call_ollama_generate(
prompt=prompt,
timeout_sec=_ELEPHANT_TIMEOUT_SEC,
num_predict=32,
)
# 移除 deepseek/qwen3 <think> 標籤
clean = re.sub(r"<think>.*?</think>", "", raw_text, flags=re.DOTALL).strip()

View File

@@ -33,6 +33,7 @@ import httpx
import structlog
from src.core.config import get_settings
from src.services.ollama_endpoint_resolver import resolve_ollama_order
if TYPE_CHECKING:
from src.db.models import AiGovernanceEvent
@@ -254,35 +255,47 @@ class DecisionFusionAdapter:
"只輸出 CONFIDENCE 和 ACTION 兩行,不要其他解釋。"
)
from src.services.ollama_endpoint_resolver import resolve_ollama_endpoint
ollama_url = resolve_ollama_endpoint("deep_rca")
try:
async with httpx.AsyncClient(
timeout=httpx.Timeout(_LLM_TIMEOUT_SEC, connect=5.0)
) as client:
resp = await client.post(
f"{ollama_url}/api/generate",
json={
"model": "qwen3:8b",
"prompt": prompt,
"stream": False,
"options": {"num_predict": 128, "temperature": 0.1},
},
)
if resp.status_code != 200:
logger.warning(
"fusion_llm_http_error",
status=resp.status_code,
event_id=event.id,
raw_text = ""
last_error = ""
async with httpx.AsyncClient(
timeout=httpx.Timeout(_LLM_TIMEOUT_SEC, connect=5.0)
) as client:
for endpoint in resolve_ollama_order("deep_rca"):
if not endpoint.url:
continue
try:
resp = await client.post(
f"{endpoint.url}/api/generate",
json={
"model": "qwen3:8b",
"prompt": prompt,
"stream": False,
"options": {"num_predict": 128, "temperature": 0.1},
},
)
return 0.5, "LLM 不可用,使用中立值)", {"error": f"http_{resp.status_code}"}
if resp.status_code != 200:
last_error = f"http_{resp.status_code}"
logger.warning(
"fusion_llm_http_error",
provider=endpoint.provider_name,
status=resp.status_code,
event_id=event.id,
)
continue
raw_text = resp.json().get("response", "").strip()
except Exception as exc:
logger.warning("fusion_llm_request_failed", event_id=event.id, error=str(exc))
return 0.5, "LLM 連線失敗,使用中立值)", {"error": str(exc)}
raw_text = resp.json().get("response", "").strip()
break
except Exception as exc:
last_error = str(exc)
logger.warning(
"fusion_llm_request_failed",
provider=endpoint.provider_name,
event_id=event.id,
error=str(exc),
)
if not raw_text:
return 0.5, "LLM 連線失敗,使用中立值)", {"error": last_error or "no_ollama_endpoint"}
# 移除 <think> 標籤qwen3 CoT 輸出)
clean = re.sub(r"<think>.*?</think>", "", raw_text, flags=re.DOTALL).strip()

View File

@@ -33,11 +33,6 @@ logger = structlog.get_logger(__name__)
# ============================================================
# 設定
# ============================================================
# 2026-05-05 Codex: 重摘要走 111 lane避免污染 GCP alert-fast lane
def _get_ollama_url() -> str:
from src.services.ollama_endpoint_resolver import resolve_ollama_endpoint
return resolve_ollama_endpoint("deep_rca")
# D1 集中化 2026-04-11: 從 models.json providers.ollama.models.drift_summary 讀取
NARRATOR_MODEL = get_model("ollama", "drift_summary")
NARRATOR_TIMEOUT = 90.0 # seconds

View File

@@ -29,7 +29,7 @@ import httpx
import structlog
from src.services.model_registry import get_model
from src.services.ollama_endpoint_resolver import resolve_ollama_endpoint
from src.services.ollama_endpoint_resolver import resolve_ollama_order
if TYPE_CHECKING:
pass
@@ -118,31 +118,51 @@ class ImageAnalysisService:
async def _call_llava(self, image_path: Path, question: str) -> str | None:
"""呼叫 llava:latest via Ollama /api/generate with base64 image"""
timed_out = False
try:
image_b64 = base64.b64encode(image_path.read_bytes()).decode()
http = await self._get_http()
resp = await http.post(
f"{resolve_ollama_endpoint('image_analysis')}/api/generate",
json={
"model": _MODEL,
"prompt": question,
"images": [image_b64],
"stream": False,
"options": {
"num_predict": 512,
"temperature": 0.3,
},
},
)
if resp.status_code == 200:
text = resp.json().get("response", "").strip()
return text or None
else:
logger.warning("image_analysis_ollama_error", status=resp.status_code)
return None
except httpx.TimeoutException:
logger.warning("image_analysis_timeout", path=str(image_path))
return "⚠️ 圖片分析超時llava 處理中),請稍後重試"
for endpoint in resolve_ollama_order("image_analysis"):
if not endpoint.url:
continue
try:
resp = await http.post(
f"{endpoint.url}/api/generate",
json={
"model": _MODEL,
"prompt": question,
"images": [image_b64],
"stream": False,
"options": {
"num_predict": 512,
"temperature": 0.3,
},
},
)
if resp.status_code == 200:
text = resp.json().get("response", "").strip()
return text or None
logger.warning(
"image_analysis_ollama_error",
provider=endpoint.provider_name,
status=resp.status_code,
)
except httpx.TimeoutException:
timed_out = True
logger.warning(
"image_analysis_timeout",
provider=endpoint.provider_name,
path=str(image_path),
)
except Exception as e:
logger.error(
"image_analysis_failed",
provider=endpoint.provider_name,
error=str(e),
)
if timed_out:
return "⚠️ 圖片分析超時llava 處理中),請稍後重試"
return None
except Exception as e:
logger.error("image_analysis_failed", error=str(e))
return None

View File

@@ -32,7 +32,7 @@ import httpx
import structlog
from src.services.model_registry import get_model_registry
from src.services.ollama_endpoint_resolver import resolve_ollama_endpoint
from src.services.ollama_endpoint_resolver import resolve_ollama_order
logger = structlog.get_logger(__name__)
@@ -546,27 +546,56 @@ class IntentClassifier:
# 取得模型配置
model_name = self.llm_model # qwen2.5:1b 或配置值
# 呼叫 Ollama
# 呼叫 OllamaGCP-A → GCP-B → 111
result_text = ""
last_error: Exception | None = None
async with httpx.AsyncClient() as client:
response = await client.post(
f"{resolve_ollama_endpoint('hermes')}/api/generate",
json={
"model": model_name,
"prompt": prompt,
"stream": False,
"format": "json",
"options": {
"num_predict": 128, # 意圖分類只需短回應
"temperature": 0.0, # 確定性輸出
"top_p": 0.9,
},
},
timeout=httpx.Timeout(5.0, connect=2.0), # 嚴格超時
)
for endpoint in resolve_ollama_order("hermes"):
if not endpoint.url:
continue
try:
response = await client.post(
f"{endpoint.url}/api/generate",
json={
"model": model_name,
"prompt": prompt,
"stream": False,
"format": "json",
"options": {
"num_predict": 128, # 意圖分類只需短回應
"temperature": 0.0, # 確定性輸出
"top_p": 0.9,
},
},
timeout=httpx.Timeout(5.0, connect=2.0), # 嚴格超時
)
response.raise_for_status()
data = response.json()
result_text = data.get("response", "")
response.raise_for_status()
data = response.json()
result_text = data.get("response", "")
break
except Exception as e:
last_error = e
logger.warning(
"intent_llm_endpoint_failed",
provider=endpoint.provider_name,
error=str(e),
error_type=type(e).__name__,
)
if not result_text:
_kw_result = self._keyword_classify(text)
return IntentResult(
intent=_kw_result.intent,
confidence=_kw_result.confidence,
method="llm_unavailable_keyword",
matched_keywords=_kw_result.matched_keywords,
detected_resources=_kw_result.detected_resources,
reasoning=(
f"LLM 全端點不可用({type(last_error).__name__ if last_error else 'no_endpoint'}"
f"{_kw_result.reasoning}"
),
)
# 解析 JSON 回應
elapsed_ms = (time.time() - start_time) * 1000

View File

@@ -33,11 +33,11 @@ logger = structlog.get_logger(__name__)
# ============================================================
# 設定
# ============================================================
# 2026-05-03 ogt: ADR-110 GCP-A Primary — 改從 settings 讀取,不再硬編碼 111
def _get_ollama_url() -> str:
from src.services.ollama_endpoint_resolver import resolve_ollama_endpoint
# 2026-05-19 Codex: 所有 Ollama workload 固定 GCP-A → GCP-B → 111
def _get_ollama_endpoints():
from src.services.ollama_endpoint_resolver import resolve_ollama_order
return resolve_ollama_endpoint("deep_rca")
return resolve_ollama_order("deep_rca")
# D1 集中化 2026-04-11: 從 models.json providers.ollama.models.log_anomaly 讀取
SUMMARY_MODEL = get_model("ollama", "log_anomaly")
LLM_TIMEOUT = 180.0 # deepseek-r1 硬超時
@@ -209,31 +209,40 @@ class LogSummaryService:
log_snippet=log_snippet[:3000], # 避免超出 context
)
try:
async with httpx.AsyncClient(timeout=LLM_TIMEOUT) as client:
resp = await client.post(
f"{_get_ollama_url()}/api/generate",
json={
"model": SUMMARY_MODEL,
"prompt": prompt,
"stream": False,
"options": {"temperature": 0.1, "num_predict": 200},
},
)
resp.raise_for_status()
data = resp.json()
raw = data.get("response", "").strip()
async with httpx.AsyncClient(timeout=LLM_TIMEOUT) as client:
for endpoint in _get_ollama_endpoints():
if not endpoint.url:
continue
try:
resp = await client.post(
f"{endpoint.url}/api/generate",
json={
"model": SUMMARY_MODEL,
"prompt": prompt,
"stream": False,
"options": {"temperature": 0.1, "num_predict": 200},
},
)
resp.raise_for_status()
data = resp.json()
raw = data.get("response", "").strip()
# 過濾 deepseek-r1 的 <think>...</think> 推理區塊
text = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
return text or raw or None
except httpx.TimeoutException:
logger.warning("log_summary_llm_timeout", model=SUMMARY_MODEL)
return None
except Exception as e:
logger.warning("log_summary_llm_error", error=str(e))
return None
# 過濾 deepseek-r1 的 <think>...</think> 推理區塊
text = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip()
return text or raw or None
except httpx.TimeoutException:
logger.warning(
"log_summary_llm_timeout",
model=SUMMARY_MODEL,
provider=endpoint.provider_name,
)
except Exception as e:
logger.warning(
"log_summary_llm_error",
provider=endpoint.provider_name,
error=str(e),
)
return None
# ============================================================

View File

@@ -40,7 +40,10 @@ from src.models.nvidia import (
from src.services.langfuse_client import ( # 2026-03-29 ogt: P1-1 Langfuse 整合
LangfuseTraceContext,
)
from src.services.ollama_endpoint_resolver import resolve_ollama_endpoint
from src.services.ollama_endpoint_resolver import (
resolve_ollama_endpoint,
resolve_ollama_order,
)
logger = structlog.get_logger(__name__)
settings = get_settings()
@@ -874,15 +877,30 @@ class OllamaToolProvider:
return [tc for tc in tool_calls if self.is_high_risk_tool(tc.tool_name)]
def _base_url(self) -> str:
"""Tool-calling/Hermes models stay off the GCP alert lane."""
"""Return the first Hermes/tool endpoint for backward compatibility."""
return resolve_ollama_endpoint("hermes").rstrip("/")
def _base_urls(self) -> list[str]:
    """List the Hermes/tool-calling base URLs to try, in order, without duplicates.

    The URL from ``_base_url()`` always comes first, followed by every non-empty
    endpoint URL from ``resolve_ollama_order("hermes")`` with trailing slashes
    removed. Empty values are dropped and the first occurrence of each URL wins.
    """
    candidates = [self._base_url()]
    for endpoint in resolve_ollama_order("hermes"):
        if endpoint.url:
            candidates.append(endpoint.url.rstrip("/"))
    # dict.fromkeys keeps insertion order, giving an order-preserving dedupe.
    return list(dict.fromkeys(candidate for candidate in candidates if candidate))
async def health_check(self) -> bool:
try:
client = await self._get_client()
base_url = self._base_url()
resp = await client.get(f"{base_url}/api/tags", timeout=5.0)
return resp.status_code == 200
for base_url in self._base_urls():
try:
resp = await client.get(f"{base_url}/api/tags", timeout=5.0)
if resp.status_code == 200:
return True
except Exception:
continue
return False
except Exception:
return False
@@ -897,8 +915,6 @@ class OllamaToolProvider:
"""Ollama /v1/chat/completions tool calling"""
start_time = time.perf_counter()
model = model or settings.OLLAMA_TOOL_MODEL
base_url = self._base_url()
url = f"{base_url}/v1/chat/completions"
# 轉換 tools 為 dict 格式(同 NvidiaProvider
tools_data = []
@@ -919,68 +935,78 @@ class OllamaToolProvider:
try:
client = await self._get_client()
response = await client.post(url, json=request_body)
latency_ms = (time.perf_counter() - start_time) * 1000
if response.status_code != 200:
logger.warning(
"ollama_tool_call_http_error",
status=response.status_code,
body=response.text[:200],
last_error = ""
for base_url in self._base_urls():
response = await client.post(
f"{base_url}/v1/chat/completions",
json=request_body,
)
latency_ms = (time.perf_counter() - start_time) * 1000
if response.status_code != 200:
last_error = f"Ollama HTTP {response.status_code}"
logger.warning(
"ollama_tool_call_http_error",
status=response.status_code,
body=response.text[:200],
endpoint=base_url,
)
continue
data = response.json()
# 解析 tool_callsOpenAI 格式)
choices = data.get("choices", [])
if not choices:
last_error = "Ollama 無回應"
continue
message = choices[0].get("message", {})
raw_tool_calls = message.get("tool_calls", [])
tool_call_results: list[ToolCallValidationResult] = []
for tc in raw_tool_calls:
fn = tc.get("function", {})
name = fn.get("name", "")
args_raw = fn.get("arguments", "{}")
try:
args = json.loads(args_raw) if isinstance(args_raw, str) else args_raw
except json.JSONDecodeError:
args = {}
tool_call_results.append(ToolCallValidationResult(
tool_name=name,
arguments=args,
valid=bool(name),
))
usage_data = data.get("usage", {})
from src.models.nvidia import NvidiaUsage
usage = NvidiaUsage(
prompt_tokens=usage_data.get("prompt_tokens", 0),
completion_tokens=usage_data.get("completion_tokens", 0),
total_tokens=usage_data.get("total_tokens", 0),
)
logger.info(
"ollama_tool_call_success",
model=model,
tool_count=len(tool_call_results),
latency_ms=round(latency_ms, 1),
tokens=usage.total_tokens,
endpoint=base_url,
)
return NvidiaProviderResult(
success=False,
error=f"Ollama HTTP {response.status_code}",
success=True,
tool_calls=tool_call_results,
latency_ms=latency_ms,
fallback_triggered=True,
usage=usage,
)
data = response.json()
# 解析 tool_callsOpenAI 格式)
choices = data.get("choices", [])
if not choices:
return NvidiaProviderResult(
success=False, error="Ollama 無回應", latency_ms=latency_ms, fallback_triggered=True
)
message = choices[0].get("message", {})
raw_tool_calls = message.get("tool_calls", [])
tool_call_results: list[ToolCallValidationResult] = []
for tc in raw_tool_calls:
fn = tc.get("function", {})
name = fn.get("name", "")
args_raw = fn.get("arguments", "{}")
try:
args = json.loads(args_raw) if isinstance(args_raw, str) else args_raw
except json.JSONDecodeError:
args = {}
tool_call_results.append(ToolCallValidationResult(
tool_name=name,
arguments=args,
valid=bool(name),
))
usage_data = data.get("usage", {})
from src.models.nvidia import NvidiaUsage
usage = NvidiaUsage(
prompt_tokens=usage_data.get("prompt_tokens", 0),
completion_tokens=usage_data.get("completion_tokens", 0),
total_tokens=usage_data.get("total_tokens", 0),
)
logger.info(
"ollama_tool_call_success",
model=model,
tool_count=len(tool_call_results),
latency_ms=round(latency_ms, 1),
tokens=usage.total_tokens,
)
latency_ms = (time.perf_counter() - start_time) * 1000
return NvidiaProviderResult(
success=True,
tool_calls=tool_call_results,
success=False,
error=last_error or "Ollama 無回應",
latency_ms=latency_ms,
usage=usage,
fallback_triggered=True,
)
except Exception as e:
@@ -993,16 +1019,25 @@ class OllamaToolProvider:
async def chat(self, prompt: str, model: str = "", temperature: float = 0.7, max_tokens: int = 512) -> str:
"""簡單 chat非 tool calling 路徑,保持 INvidiaProvider 相容)"""
model = model or settings.OLLAMA_TOOL_MODEL
base_url = self._base_url()
try:
client = await self._get_client()
resp = await client.post(
f"{base_url}/v1/chat/completions",
json={"model": model, "messages": [{"role": "user", "content": prompt}],
"temperature": temperature, "max_tokens": max_tokens},
)
data = resp.json()
return data.get("choices", [{}])[0].get("message", {}).get("content", "")
last_error = ""
for base_url in self._base_urls():
try:
resp = await client.post(
f"{base_url}/v1/chat/completions",
json={"model": model, "messages": [{"role": "user", "content": prompt}],
"temperature": temperature, "max_tokens": max_tokens},
)
if resp.status_code != 200:
last_error = f"http_{resp.status_code}"
continue
data = resp.json()
return data.get("choices", [{}])[0].get("message", {}).get("content", "")
except Exception as e:
last_error = str(e)
continue
return f"Ollama chat error: {last_error or 'no endpoint'}"
except Exception as e:
return f"Ollama chat error: {e}"

View File

@@ -47,6 +47,14 @@ def _settings() -> SimpleNamespace:
return SimpleNamespace(OPENCLAW_DEFAULT_MODEL="qwen3:14b")
def _fake_ollama_order(_workload_type: str) -> tuple[SimpleNamespace, ...]:
return (
SimpleNamespace(url="http://gcp-a:11435", provider_name="ollama_gcp_a"),
SimpleNamespace(url="http://gcp-b:11436", provider_name="ollama_gcp_b"),
SimpleNamespace(url="http://local-111:11434", provider_name="ollama_local"),
)
@pytest.fixture(autouse=True)
def _reset_fake_client() -> None:
_FakeAsyncClient.posted = []
@@ -60,8 +68,8 @@ async def test_openclaw_chat_uses_ollama_interactive_lane(
monkeypatch.setattr(chat_module, "get_settings", _settings)
monkeypatch.setattr(
chat_module,
"resolve_ollama_endpoint",
lambda workload_type: "http://gcp-a:11435",
"resolve_ollama_order",
_fake_ollama_order,
)
result = await ChatManager()._call_openclaw("system context", "幫我看狀態")
@@ -85,8 +93,8 @@ async def test_nemoclaw_chat_uses_resolved_interactive_lane(
monkeypatch.setattr(chat_module.httpx, "AsyncClient", _FakeAsyncClient)
monkeypatch.setattr(
chat_module,
"resolve_ollama_endpoint",
lambda workload_type: "http://gcp-a:11435",
"resolve_ollama_order",
_fake_ollama_order,
)
result = await ChatManager()._call_nemotron("system context", "補充觀點")