fix(api): enforce global ollama endpoint order
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 5m13s
CD Pipeline / build-and-deploy (push) Successful in 3m31s
CD Pipeline / post-deploy-checks (push) Successful in 1m18s

This commit is contained in:
Your Name
2026-05-19 12:31:56 +08:00
parent 5fa0e1452c
commit 45cd55b2da
7 changed files with 359 additions and 228 deletions

View File

@@ -22,7 +22,7 @@ import httpx
import structlog
from src.services.model_registry import get_model as _get_model
from src.services.ollama_endpoint_resolver import resolve_ollama_endpoint
from src.services.ollama_endpoint_resolver import resolve_ollama_order
logger = structlog.get_logger(__name__)
@@ -98,7 +98,15 @@ class OllamaEmbeddingService:
P1 修復 (2026-03-29): 維度配置化,支援更多模型
"""
self._model = model
self._ollama_url = ollama_url or resolve_ollama_endpoint("embedding")
if ollama_url:
self._ollama_endpoints = ((ollama_url, "custom"),)
else:
self._ollama_endpoints = tuple(
(endpoint.url, endpoint.provider_name)
for endpoint in resolve_ollama_order("embedding")
if endpoint.url
)
self._ollama_url = self._ollama_endpoints[0][0] if self._ollama_endpoints else ""
self._timeout = timeout
self._default_dimension = default_dimension or self.MODEL_DIMENSIONS.get(
model, self.DEFAULT_DIMENSION
@@ -142,43 +150,65 @@ class OllamaEmbeddingService:
"""
client = await self._get_client()
try:
response = await client.post(
f"{self._ollama_url}/api/embeddings",
json={
"model": self._model,
"prompt": text,
},
)
response.raise_for_status()
last_error: Exception | None = None
for endpoint_url, provider_name in self._ollama_endpoints:
try:
response = await client.post(
f"{endpoint_url}/api/embeddings",
json={
"model": self._model,
"prompt": text,
},
)
response.raise_for_status()
data = response.json()
embedding = data.get("embedding", [])
data = response.json()
embedding = data.get("embedding", [])
# 更新維度快取
if self._dimension is None and embedding:
self._dimension = len(embedding)
logger.info(
"embedding_dimension_detected",
# 更新維度快取
if self._dimension is None and embedding:
self._dimension = len(embedding)
logger.info(
"embedding_dimension_detected",
model=self._model,
dimension=self._dimension,
provider=provider_name,
)
return embedding
except httpx.TimeoutException as e:
last_error = e
logger.error(
"embedding_timeout",
model=self._model,
dimension=self._dimension,
text_len=len(text),
provider=provider_name,
)
except httpx.HTTPStatusError as e:
last_error = e
logger.error(
"embedding_http_error",
status=e.response.status_code,
model=self._model,
provider=provider_name,
)
except Exception as e:
last_error = e
logger.error(
"embedding_error",
error=str(e),
model=self._model,
provider=provider_name,
)
return embedding
except httpx.TimeoutException as e:
logger.error("embedding_timeout", model=self._model, text_len=len(text))
raise EmbeddingError(f"Embedding timeout after {self._timeout}s") from e
except httpx.HTTPStatusError as e:
logger.error(
"embedding_http_error",
status=e.response.status_code,
model=self._model,
)
raise EmbeddingError(f"Ollama API error: {e.response.status_code}") from e
except Exception as e:
logger.error("embedding_error", error=str(e), model=self._model)
raise EmbeddingError(f"Embedding failed: {e}") from e
if isinstance(last_error, httpx.TimeoutException):
raise EmbeddingError(f"Embedding timeout after {self._timeout}s") from last_error
if isinstance(last_error, httpx.HTTPStatusError):
raise EmbeddingError(
f"Ollama API error: {last_error.response.status_code}"
) from last_error
raise EmbeddingError("Embedding failed on all Ollama endpoints") from last_error
async def embed_batch(
self,

View File

@@ -4,7 +4,7 @@ Knowledge Extractor Service — KB Phase 2-A
Incident resolved 後自動萃取 KB 草稿。
設計原則:
- 強制使用 Ollama llama3.2:3b(本地推理,符合 Phase 24 D7 隱私規則)
- 使用 Ollama llama3.2:3b,依全域順序 GCP-A → GCP-B → 111 嘗試
- fire-and-forget失敗不影響 resolve 主流程
- logger.exception 保留完整 Stack Trace 供 Prompt 調優
@@ -15,11 +15,11 @@ import structlog
logger = structlog.get_logger(__name__)
# 2026-05-05 Codex: KB 萃取走 111 lane避免污染 GCP alert-fast lane
def _get_ollama_base() -> str:
from src.services.ollama_endpoint_resolver import resolve_ollama_endpoint
# 2026-05-19 Codex: 統帥校正,全 Ollama workload 固定 GCP-A → GCP-B → 111。
def _get_ollama_endpoints():
from src.services.ollama_endpoint_resolver import resolve_ollama_order
return resolve_ollama_endpoint("deep_rca")
return resolve_ollama_order("deep_rca")
_EXTRACT_MODEL = "llama3.2:3b"
_EXTRACT_TIMEOUT = 30.0 # 秒,容忍慢速
@@ -160,36 +160,54 @@ class KnowledgeExtractorService:
不走 AIRouter 是刻意設計:
- KB 萃取是背景工作,不需要完整的路由/閘門/Cache 邏輯
- 強制本地,不允許 fallback 到 cloud provider
- Ollama endpoint 固定依 GCP-A → GCP-B → 111 嘗試
"""
import httpx
try:
async with httpx.AsyncClient(timeout=_EXTRACT_TIMEOUT) as client:
r = await client.post(
f"{_get_ollama_base()}/api/generate",
json={
"model": _EXTRACT_MODEL,
"prompt": prompt,
"stream": False,
"options": {
"temperature": 0.3, # 低溫:減少幻覺
"num_predict": 800, # 控制長度
"stop": ["\n\n\n"], # 防止無限生成
endpoints = _get_ollama_endpoints()
async with httpx.AsyncClient(timeout=_EXTRACT_TIMEOUT) as client:
for endpoint in endpoints:
if not endpoint.url:
continue
try:
r = await client.post(
f"{endpoint.url}/api/generate",
json={
"model": _EXTRACT_MODEL,
"prompt": prompt,
"stream": False,
"options": {
"temperature": 0.3, # 低溫:減少幻覺
"num_predict": 800, # 控制長度
"stop": ["\n\n\n"], # 防止無限生成
},
},
},
)
r.raise_for_status()
text = r.json().get("response", "").strip()
return text or None
)
r.raise_for_status()
text = r.json().get("response", "").strip()
if text:
logger.info(
"kb_ollama_call_success",
model=_EXTRACT_MODEL,
provider=endpoint.provider_name,
base=endpoint.url,
)
return text
except Exception as e:
logger.warning(
"kb_ollama_call_failed",
model=_EXTRACT_MODEL,
provider=endpoint.provider_name,
base=endpoint.url,
error=str(e),
)
except Exception:
logger.exception(
"kb_ollama_call_failed",
model=_EXTRACT_MODEL,
base=_get_ollama_base(),
)
return None
logger.error(
"kb_ollama_all_endpoints_failed",
model=_EXTRACT_MODEL,
attempted=[endpoint.provider_name for endpoint in endpoints],
)
return None
def _extract_title(self, markdown: str, incident) -> str:
"""

View File

@@ -8,7 +8,7 @@ AWOOOI — Knowledge RAG Service (Phase 33, ADR-067)
- 超過 100 筆: 執行 CREATE INDEX ivfflat (手動觸發)
向量模型: bge-m3 (GCP-A/GCP-B/111 Ollama lane, 1024維)
生成模型: qwen2.5:7b-instruct (Ollama 111)
生成模型: qwen2.5:7b-instruct (Ollama GCP-A/GCP-B/111)
leWOOOgo: Service 層只處理業務邏輯DB 存取委派 rag_chunk_repository
架構審查 C1 修正: 2026-04-10 Claude Sonnet 4.6 Asia/Taipei
@@ -22,7 +22,7 @@ import structlog
import src.repositories.rag_chunk_repository as rag_repo
from src.core.config import settings
from src.services.ollama_endpoint_resolver import resolve_ollama_endpoint
from src.services.ollama_endpoint_resolver import resolve_ollama_order
logger = structlog.get_logger(__name__)
@@ -128,19 +128,35 @@ class KnowledgeRAGService:
# ------------------------------------------------------------------
async def _embed(self, text: str) -> list[float] | None:
try:
http = await self._get_http()
resp = await http.post(
f"{resolve_ollama_endpoint('embedding')}/api/embeddings",
json={
"model": getattr(settings, "OLLAMA_EMBEDDING_MODEL", _EMBED_MODEL),
"prompt": text,
},
)
if resp.status_code == 200:
return resp.json().get("embedding")
except Exception as e:
logger.warning("rag_embed_failed", error=str(e))
http = await self._get_http()
for endpoint in resolve_ollama_order("embedding"):
if not endpoint.url:
continue
try:
resp = await http.post(
f"{endpoint.url}/api/embeddings",
json={
"model": getattr(settings, "OLLAMA_EMBEDDING_MODEL", _EMBED_MODEL),
"prompt": text,
},
)
if resp.status_code == 200:
logger.debug(
"rag_embed_success",
provider=endpoint.provider_name,
)
return resp.json().get("embedding")
logger.warning(
"rag_embed_http_error",
provider=endpoint.provider_name,
status=resp.status_code,
)
except Exception as e:
logger.warning(
"rag_embed_failed",
provider=endpoint.provider_name,
error=str(e),
)
return None
async def _generate_answer(self, question: str, context: str) -> str:
@@ -150,22 +166,38 @@ class KnowledgeRAGService:
f"=== 相關資料 ===\n{context[:6000]}\n\n"
f"=== 問題 ===\n{question}"
)
try:
http = await self._get_http()
resp = await http.post(
f"{resolve_ollama_endpoint('rag')}/api/generate",
json={
"model": _GEN_MODEL,
"prompt": prompt,
"stream": False,
"options": {"num_predict": 512, "temperature": 0.2},
},
timeout=httpx.Timeout(90.0, connect=10.0),
)
if resp.status_code == 200:
return resp.json().get("response", "").strip()
except Exception as e:
logger.error("rag_generate_failed", error=str(e))
http = await self._get_http()
for endpoint in resolve_ollama_order("rag"):
if not endpoint.url:
continue
try:
resp = await http.post(
f"{endpoint.url}/api/generate",
json={
"model": _GEN_MODEL,
"prompt": prompt,
"stream": False,
"options": {"num_predict": 512, "temperature": 0.2},
},
timeout=httpx.Timeout(90.0, connect=10.0),
)
if resp.status_code == 200:
logger.debug(
"rag_generate_success",
provider=endpoint.provider_name,
)
return resp.json().get("response", "").strip()
logger.warning(
"rag_generate_http_error",
provider=endpoint.provider_name,
status=resp.status_code,
)
except Exception as e:
logger.error(
"rag_generate_failed",
provider=endpoint.provider_name,
error=str(e),
)
return "⚠️ RAG 生成失敗,請稍後再試"
# ------------------------------------------------------------------

View File

@@ -20,7 +20,7 @@ import structlog
from src.core.config import get_settings
from src.services.model_registry import get_model
from src.services.ollama_endpoint_resolver import resolve_ollama_endpoint
from src.services.ollama_endpoint_resolver import resolve_ollama_order
logger = structlog.get_logger(__name__)
settings = get_settings()
@@ -120,26 +120,54 @@ class LocalCodeReviewService:
"格式:每個問題獨立一行,以「⚠️」開頭。如果沒有問題,說「✅ 程式碼品質良好」\n\n"
f"=== Diff ===\n{diff[:40000]}\n=== 結束 ==="
)
try:
http = await self._get_http()
resp = await http.post(
f"{resolve_ollama_endpoint('code_review')}/api/generate",
json={
"model": _MODEL_OLLAMA,
"prompt": prompt,
"stream": False,
"options": {"num_predict": 1024, "temperature": 0.1},
},
)
if resp.status_code == 200:
text = resp.json().get("response", "").strip()
issues = text.count("⚠️")
logger.info("pr_review_ollama_done", pr_id=pr_id, issues=issues)
return {"review_text": text, "issues_count": issues, "model": _MODEL_OLLAMA, "provider": "ollama"}
except httpx.TimeoutException:
logger.warning("pr_review_ollama_timeout", pr_id=pr_id)
except Exception as e:
logger.error("pr_review_ollama_failed", pr_id=pr_id, error=str(e))
http = await self._get_http()
for endpoint in resolve_ollama_order("code_review"):
if not endpoint.url:
continue
try:
resp = await http.post(
f"{endpoint.url}/api/generate",
json={
"model": _MODEL_OLLAMA,
"prompt": prompt,
"stream": False,
"options": {"num_predict": 1024, "temperature": 0.1},
},
)
if resp.status_code == 200:
text = resp.json().get("response", "").strip()
issues = text.count("⚠️")
logger.info(
"pr_review_ollama_done",
pr_id=pr_id,
issues=issues,
provider=endpoint.provider_name,
)
return {
"review_text": text,
"issues_count": issues,
"model": _MODEL_OLLAMA,
"provider": endpoint.provider_name,
}
logger.warning(
"pr_review_ollama_http_error",
pr_id=pr_id,
provider=endpoint.provider_name,
status=resp.status_code,
)
except httpx.TimeoutException:
logger.warning(
"pr_review_ollama_timeout",
pr_id=pr_id,
provider=endpoint.provider_name,
)
except Exception as e:
logger.error(
"pr_review_ollama_failed",
pr_id=pr_id,
provider=endpoint.provider_name,
error=str(e),
)
return None
async def _review_with_gemini(
@@ -243,26 +271,55 @@ class LocalCodeReviewService:
"格式:每個問題以「⚠️」開頭,如無問題說「✅ Push 品質正常」\n"
"5 行以內,繁體中文。"
)
try:
http = await self._get_http()
resp = await http.post(
f"{resolve_ollama_endpoint('code_review')}/api/generate",
json={
"model": _MODEL_OLLAMA,
"prompt": prompt,
"stream": False,
"options": {"num_predict": 512, "temperature": 0.1},
},
)
if resp.status_code == 200:
text = resp.json().get("response", "").strip()
issues = text.count("⚠️")
logger.info("push_review_ollama_done", repo=repo_name, branch=branch, issues=issues)
return {"review_text": text, "issues_count": issues, "model": _MODEL_OLLAMA, "provider": "ollama"}
except httpx.TimeoutException:
logger.warning("push_review_ollama_timeout", repo=repo_name)
except Exception as e:
logger.error("push_review_ollama_failed", repo=repo_name, error=str(e))
http = await self._get_http()
for endpoint in resolve_ollama_order("code_review"):
if not endpoint.url:
continue
try:
resp = await http.post(
f"{endpoint.url}/api/generate",
json={
"model": _MODEL_OLLAMA,
"prompt": prompt,
"stream": False,
"options": {"num_predict": 512, "temperature": 0.1},
},
)
if resp.status_code == 200:
text = resp.json().get("response", "").strip()
issues = text.count("⚠️")
logger.info(
"push_review_ollama_done",
repo=repo_name,
branch=branch,
issues=issues,
provider=endpoint.provider_name,
)
return {
"review_text": text,
"issues_count": issues,
"model": _MODEL_OLLAMA,
"provider": endpoint.provider_name,
}
logger.warning(
"push_review_ollama_http_error",
repo=repo_name,
provider=endpoint.provider_name,
status=resp.status_code,
)
except httpx.TimeoutException:
logger.warning(
"push_review_ollama_timeout",
repo=repo_name,
provider=endpoint.provider_name,
)
except Exception as e:
logger.error(
"push_review_ollama_failed",
repo=repo_name,
provider=endpoint.provider_name,
error=str(e),
)
return None
async def close(self) -> None:

View File

@@ -1,9 +1,9 @@
"""
Ollama endpoint resolver for non-critical workload placement.
Ollama endpoint resolver for AWOOOI workload placement.
ADR-110 gives AWOOOI three Ollama endpoints. This resolver is intentionally
small: it chooses the preferred endpoint by workload class, while health-aware
failover remains owned by ollama_failover_manager.py.
ADR-110 gives AWOOOI three Ollama endpoints. The global order is always
GCP-A -> GCP-B -> 111 local; Gemini is owned by the caller/AI Router as the
final non-Ollama fallback.
"""
from __future__ import annotations
@@ -31,28 +31,6 @@ OllamaWorkloadType = Literal[
"dr",
]
_GCP_A_PREFERRED_WORKLOADS = {
"interactive",
"healthcheck",
"alert_fast",
"batch",
"embedding",
"rag",
"code_review",
"shadow",
"canary",
"deep_rca",
"image_analysis",
"hermes",
}
_LOCAL_PREFERRED_WORKLOADS = {
"local_required",
"privacy_sensitive",
"dr",
}
class _OllamaSettings(Protocol):
OLLAMA_URL: str
OLLAMA_SECONDARY_URL: str
@@ -73,56 +51,47 @@ def resolve_ollama_selection(
config: _OllamaSettings | None = None,
) -> OllamaEndpointSelection:
"""Return the preferred Ollama endpoint for a workload class."""
return resolve_ollama_order(workload_type, config=config)[0]
def resolve_ollama_order(
workload_type: OllamaWorkloadType = "interactive",
*,
config: _OllamaSettings | None = None,
) -> tuple[OllamaEndpointSelection, ...]:
"""Return the global Ollama fallback order: GCP-A -> GCP-B -> 111."""
cfg = config or settings
primary = cfg.OLLAMA_URL
secondary = cfg.OLLAMA_SECONDARY_URL
fallback = cfg.OLLAMA_FALLBACK_URL
if workload_type in _LOCAL_PREFERRED_WORKLOADS:
if fallback:
return OllamaEndpointSelection(
url=fallback,
provider_name="ollama_local",
candidates = (
(cfg.OLLAMA_URL, "ollama_gcp_a", "global_primary_gcp_a"),
(cfg.OLLAMA_SECONDARY_URL, "ollama_gcp_b", "global_secondary_gcp_b"),
(cfg.OLLAMA_FALLBACK_URL, "ollama_local", "global_local_111"),
)
selections: list[OllamaEndpointSelection] = []
seen: set[str] = set()
for url, provider_name, reason in candidates:
if not url or url in seen:
continue
seen.add(url)
selections.append(
OllamaEndpointSelection(
url=url,
provider_name=provider_name,
workload_type=workload_type,
reason="local_heavy_or_privacy_lane",
reason=reason,
)
if secondary:
return OllamaEndpointSelection(
url=secondary,
provider_name="ollama_gcp_b",
workload_type=workload_type,
reason="local_missing_gcp_b_fallback",
)
if workload_type not in _GCP_A_PREFERRED_WORKLOADS and secondary:
return OllamaEndpointSelection(
url=secondary,
provider_name="ollama_gcp_b",
workload_type=workload_type,
reason="gcp_b_default_non_alert_lane",
)
if primary:
return OllamaEndpointSelection(
url=primary,
provider_name="ollama_gcp_a",
workload_type=workload_type,
reason="primary_interactive_lane",
)
if selections:
return tuple(selections)
if secondary:
return OllamaEndpointSelection(
url=secondary,
provider_name="ollama_gcp_b",
return (
OllamaEndpointSelection(
url="",
provider_name="ollama_unconfigured",
workload_type=workload_type,
reason="primary_missing_gcp_b_fallback",
)
return OllamaEndpointSelection(
url=fallback,
provider_name="ollama_local",
workload_type=workload_type,
reason="gcp_missing_local_fallback",
reason="no_ollama_endpoint_configured",
),
)

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
from types import SimpleNamespace
from typing import Any
import httpx
@@ -32,6 +33,14 @@ async def _noop_save(*args: Any, **kwargs: Any) -> None:
return None
def _fake_ollama_order(_workload_type: str) -> tuple[SimpleNamespace, ...]:
return (
SimpleNamespace(url="http://gcp-a:11434", provider_name="ollama_gcp_a"),
SimpleNamespace(url="http://gcp-b:11434", provider_name="ollama_gcp_b"),
SimpleNamespace(url="http://local-111:11434", provider_name="ollama_local"),
)
@pytest.mark.asyncio
async def test_large_pr_uses_local_ollama_when_gemini_fallback_disabled(
monkeypatch: pytest.MonkeyPatch,
@@ -43,8 +52,8 @@ async def test_large_pr_uses_local_ollama_when_gemini_fallback_disabled(
)
monkeypatch.setattr(
review_module,
"resolve_ollama_endpoint",
lambda workload_type: "http://local-111:11434",
"resolve_ollama_order",
_fake_ollama_order,
)
client = _FakeClient()
@@ -68,8 +77,8 @@ async def test_large_pr_uses_local_ollama_when_gemini_fallback_disabled(
)
assert result is not None
assert result["provider"] == "ollama"
assert client.posted_urls == ["http://local-111:11434/api/generate"]
assert result["provider"] == "ollama_gcp_a"
assert client.posted_urls == ["http://gcp-a:11434/api/generate"]
@pytest.mark.asyncio
@@ -83,8 +92,8 @@ async def test_ollama_failure_does_not_fall_back_to_gemini_by_default(
)
monkeypatch.setattr(
review_module,
"resolve_ollama_endpoint",
lambda workload_type: "http://local-111:11434",
"resolve_ollama_order",
_fake_ollama_order,
)
client = _FakeClient(fail=True)
@@ -110,7 +119,11 @@ async def test_ollama_failure_does_not_fall_back_to_gemini_by_default(
assert result is not None
assert result["provider"] == "ollama_unavailable"
assert result["cloud_fallback_skipped"] is True
assert client.posted_urls == ["http://local-111:11434/api/generate"]
assert client.posted_urls == [
"http://gcp-a:11434/api/generate",
"http://gcp-b:11434/api/generate",
"http://local-111:11434/api/generate",
]
@pytest.mark.asyncio

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from types import SimpleNamespace
from src.services.ollama_endpoint_resolver import (
resolve_ollama_order,
resolve_ollama_selection,
)
@@ -20,10 +21,13 @@ def _settings(
)
def test_non_sensitive_workloads_prefer_gcp_a_lane() -> None:
def test_all_workloads_prefer_gcp_a_lane() -> None:
cfg = _settings()
for workload in (
"interactive",
"healthcheck",
"alert_fast",
"batch",
"embedding",
"rag",
@@ -33,29 +37,31 @@ def test_non_sensitive_workloads_prefer_gcp_a_lane() -> None:
"deep_rca",
"image_analysis",
"hermes",
"local_required",
"privacy_sensitive",
"dr",
):
selection = resolve_ollama_selection(workload, config=cfg)
assert selection.url == "http://192.168.0.110:11435"
assert selection.provider_name == "ollama_gcp_a"
assert selection.reason == "primary_interactive_lane"
assert selection.reason == "global_primary_gcp_a"
def test_interactive_workloads_stay_on_gcp_a() -> None:
def test_all_workloads_share_global_ollama_order() -> None:
cfg = _settings()
for workload in ("interactive", "healthcheck", "alert_fast"):
selection = resolve_ollama_selection(workload, config=cfg)
assert selection.url == "http://192.168.0.110:11435"
assert selection.provider_name == "ollama_gcp_a"
def test_local_required_workloads_use_local_lane() -> None:
cfg = _settings()
for workload in ("local_required", "privacy_sensitive", "dr"):
selection = resolve_ollama_selection(workload, config=cfg)
assert selection.url == "http://192.168.0.110:11437"
assert selection.provider_name == "ollama_local"
for workload in ("interactive", "deep_rca", "local_required", "privacy_sensitive", "dr"):
order = resolve_ollama_order(workload, config=cfg)
assert [selection.url for selection in order] == [
"http://192.168.0.110:11435",
"http://192.168.0.110:11436",
"http://192.168.0.110:11437",
]
assert [selection.provider_name for selection in order] == [
"ollama_gcp_a",
"ollama_gcp_b",
"ollama_local",
]
def test_non_sensitive_workloads_fall_back_to_gcp_b_when_primary_missing() -> None:
@@ -64,4 +70,10 @@ def test_non_sensitive_workloads_fall_back_to_gcp_b_when_primary_missing() -> No
selection = resolve_ollama_selection("embedding", config=cfg)
assert selection.url == "http://192.168.0.110:11436"
assert selection.provider_name == "ollama_gcp_b"
assert selection.reason == "primary_missing_gcp_b_fallback"
assert selection.reason == "global_secondary_gcp_b"
order = resolve_ollama_order("embedding", config=cfg)
assert [selection.provider_name for selection in order] == [
"ollama_gcp_b",
"ollama_local",
]