feat(heartbeat): show ollama endpoint topology
This commit is contained in:
@@ -108,6 +108,7 @@ class HeartbeatReport:
|
||||
timestamp: datetime
|
||||
ai_services: dict[str, ProbeResult] = field(default_factory=dict)
|
||||
ollama_models: dict[str, bool] = field(default_factory=dict)
|
||||
ollama_endpoints: dict[str, ProbeResult] = field(default_factory=dict)
|
||||
mcp_providers: dict[str, ProbeResult] = field(default_factory=dict)
|
||||
flywheel: FlywheelStats = field(default_factory=FlywheelStats)
|
||||
infra: dict[str, ProbeResult] = field(default_factory=dict)
|
||||
@@ -181,6 +182,7 @@ class HeartbeatReportService:
|
||||
ollama_data = collected["_ollama"] or {}
|
||||
report.ai_services["ollama"] = ollama_data.get("probe", ProbeResult(False, "❌ 無回應"))
|
||||
report.ollama_models = ollama_data.get("models", {})
|
||||
report.ollama_endpoints = ollama_data.get("endpoints", {})
|
||||
report.ai_services["nemotron"] = collected["_nemotron"] or ProbeResult(False, "❌ 無回應")
|
||||
report.ai_services["gemini"] = collected["_gemini"] or ProbeResult(False, "❌ 無回應")
|
||||
report.ai_services["claude"] = collected["_claude"] or ProbeResult(False, "❌ 無回應")
|
||||
@@ -224,37 +226,62 @@ class HeartbeatReportService:
|
||||
|
||||
async def _probe_ollama(self) -> dict:
    """Probe every configured Ollama endpoint and check the required models.

    Three labelled endpoints are probed concurrently with
    ``GET {url}/api/tags``:

    * ``GCP-A`` -- ``settings.OLLAMA_URL`` (treated as the primary endpoint)
    * ``GCP-B`` -- ``settings.OLLAMA_SECONDARY_URL`` (optional)
    * ``111``   -- ``settings.OLLAMA_FALLBACK_URL`` (optional)

    An endpoint whose URL is empty or missing is reported as "not configured"
    and no request is sent for it.

    Returns:
        A dict with three keys:

        * ``"probe"``: the ``ProbeResult`` of the primary endpoint.
        * ``"models"``: ``{required_model: bool}`` for every entry of
          ``settings.OLLAMA_REQUIRED_MODELS``, checked against the primary
          endpoint only; empty when the primary probe failed.
        * ``"endpoints"``: ``{label: ProbeResult}`` for all three endpoints.
    """
    endpoints = [
        ("GCP-A", settings.OLLAMA_URL),
        ("GCP-B", getattr(settings, "OLLAMA_SECONDARY_URL", "")),
        ("111", getattr(settings, "OLLAMA_FALLBACK_URL", "")),
    ]
    primary_label = endpoints[0][0]

    async def _probe_endpoint(
        client: httpx.AsyncClient,
        label: str,
        url: str,
    ) -> tuple[str, ProbeResult, set[str]]:
        """Probe one endpoint; failures are returned as a ProbeResult, never raised."""
        if not url:
            return label, ProbeResult(False, "⚠️ 未設定"), set()
        try:
            loop = asyncio.get_running_loop()
            started = loop.time()
            # Strip a trailing slash so the request path is never "//api/tags".
            resp = await client.get(f"{url.rstrip('/')}/api/tags")
            latency = round((loop.time() - started) * 1000, 1)
            if resp.status_code != 200:
                return label, ProbeResult(False, f"❌ HTTP {resp.status_code}", latency), set()
            available = {m["name"] for m in resp.json().get("models", [])}
            return label, ProbeResult(True, "✅ 正常", latency), available
        except Exception as e:
            # Some exceptions stringify to "", which would leave a bare "❌ ";
            # fall back to the exception class name so the report stays useful.
            detail = str(e) or type(e).__name__
            return label, ProbeResult(False, f"❌ {detail[:60]}"), set()

    async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client:
        results = await asyncio.gather(
            *(_probe_endpoint(client, label, url) for label, url in endpoints),
        )

    endpoint_status = {label: probe for label, probe, _models in results}
    models_by_label = {label: models for label, _probe, models in results}
    primary_probe = endpoint_status.get(primary_label, ProbeResult(False, "❌ 無回應"))
    primary_available = models_by_label.get(primary_label, set())

    if not primary_probe.ok:
        return {
            "probe": primary_probe,
            "models": {},
            "endpoints": endpoint_status,
        }

    # Also match on the short name (without the ":tag" suffix).
    available_short = {name.split(":")[0] for name in primary_available}
    model_status: dict[str, bool] = {
        required: (
            required in primary_available
            or required.split(":")[0] in available_short
        )
        for required in settings.OLLAMA_REQUIRED_MODELS
    }

    return {
        "probe": primary_probe,
        "models": model_status,
        "endpoints": endpoint_status,
    }
|
||||
|
||||
async def _probe_nemotron(self) -> ProbeResult:
|
||||
"""探測 Nemotron NIM API"""
|
||||
@@ -437,9 +464,11 @@ class HeartbeatReportService:
|
||||
|
||||
try:
|
||||
# KM 向量化率(DB 查詢)
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy import text as sa_text
|
||||
|
||||
from src.db.base import get_db_context
|
||||
from src.db.models import KnowledgeEntryRecord
|
||||
from sqlalchemy import func, select, text as sa_text
|
||||
async with get_db_context() as db:
|
||||
# KM 總數
|
||||
km_total = await db.scalar(select(func.count()).select_from(KnowledgeEntryRecord))
|
||||
@@ -490,8 +519,9 @@ class HeartbeatReportService:
|
||||
"""查 24h 告警流水線統計(approval_records)"""
|
||||
stats = AlertPipelineStats()
|
||||
try:
|
||||
from src.db.base import get_db_context
|
||||
from sqlalchemy import text as sa_text
|
||||
|
||||
from src.db.base import get_db_context
|
||||
async with get_db_context() as db:
|
||||
r = await db.execute(sa_text("""
|
||||
SELECT
|
||||
@@ -517,8 +547,9 @@ class HeartbeatReportService:
|
||||
"""探測 PostgreSQL 與 Redis 連線健康"""
|
||||
s = DbRedisStats()
|
||||
try:
|
||||
from src.db.base import get_db_context
|
||||
from sqlalchemy import text as sa_text
|
||||
|
||||
from src.db.base import get_db_context
|
||||
async with get_db_context() as db:
|
||||
await db.execute(sa_text("SELECT 1"))
|
||||
s.db_ok = True
|
||||
@@ -652,8 +683,9 @@ class HeartbeatReportService:
|
||||
logger.debug("heartbeat_automation_redis_failed", error=str(e))
|
||||
|
||||
try:
|
||||
from src.db.base import get_db_context
|
||||
from sqlalchemy import text as sa_text
|
||||
|
||||
from src.db.base import get_db_context
|
||||
async with get_db_context() as db:
|
||||
# 今日新增 KM(timestamptz 直接比較,不需 AT TIME ZONE)
|
||||
km_today = await db.scalar(sa_text(
|
||||
@@ -686,6 +718,10 @@ class HeartbeatReportService:
|
||||
if not loaded:
|
||||
warnings.append(f"{model} 未載入,相關功能失效")
|
||||
|
||||
for name, probe in report.ollama_endpoints.items():
|
||||
if not probe.ok and not probe.status.startswith("⚠️ 未設定"):
|
||||
warnings.append(f"Ollama {name} 異常: {probe.status}")
|
||||
|
||||
# AI 服務異常
|
||||
for name, probe in report.ai_services.items():
|
||||
if not probe.ok and not probe.status.startswith("⚠️"):
|
||||
@@ -816,6 +852,12 @@ def report_to_telegram_html(report: HeartbeatReport) -> str:
|
||||
|
||||
lines.append("🤖 <b>AI 服務</b>")
|
||||
lines.append(f"├─ Ollama: {ollama.status}{ollama_lat} <code>{html.escape(models_str)}</code>")
|
||||
if report.ollama_endpoints:
|
||||
endpoint_items = list(report.ollama_endpoints.items())
|
||||
for idx, (name, probe) in enumerate(endpoint_items):
|
||||
branch = "└" if idx == len(endpoint_items) - 1 else "├"
|
||||
latency = f" {probe.latency_ms:.0f}ms" if probe.latency_ms else ""
|
||||
lines.append(f"│ {branch}─ {html.escape(name)}: {probe.status}{latency}")
|
||||
lines.append(f"├─ Nemotron NIM: {nem.status}" + (f" {nem.latency_ms:.0f}ms" if nem.latency_ms else ""))
|
||||
lines.append(f"├─ Gemini API: {gem.status}" + (f" {gem.latency_ms:.0f}ms" if gem.latency_ms else ""))
|
||||
lines.append(f"└─ Claude API: {cla.status}" + (f" {cla.latency_ms:.0f}ms" if cla.latency_ms else ""))
|
||||
|
||||
77
apps/api/tests/test_heartbeat_ollama_endpoints.py
Normal file
77
apps/api/tests/test_heartbeat_ollama_endpoints.py
Normal file
@@ -0,0 +1,77 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from src.services import heartbeat_report_service as heartbeat
|
||||
|
||||
|
||||
class _FakeResponse:
|
||||
def __init__(self, status_code: int, payload: dict[str, Any] | None = None) -> None:
|
||||
self.status_code = status_code
|
||||
self._payload = payload or {}
|
||||
|
||||
def json(self) -> dict[str, Any]:
|
||||
return self._payload
|
||||
|
||||
|
||||
class _FakeAsyncClient:
    """Async-client test double that answers by URL prefix.

    * ``http://gcp-a...`` -> 200 with models ``qwen3:14b`` and ``bge-m3:latest``
    * ``http://gcp-b...`` -> 200 with model ``gemma3:4b``
    * anything else       -> raises ``TimeoutError("connect failed")``
    """

    def __init__(self, *_args: Any, **_kwargs: Any) -> None:
        # Accept and ignore any constructor arguments (e.g. ``timeout=``).
        pass

    async def __aenter__(self) -> "_FakeAsyncClient":
        return self

    async def __aexit__(self, *_args: Any) -> None:
        return None

    async def get(self, url: str, **_kwargs: Any) -> "_FakeResponse":
        """Return the canned response for ``url`` or raise ``TimeoutError``."""
        if url.startswith("http://gcp-a"):
            return _FakeResponse(
                200,
                {"models": [{"name": "qwen3:14b"}, {"name": "bge-m3:latest"}]},
            )
        if url.startswith("http://gcp-b"):
            return _FakeResponse(200, {"models": [{"name": "gemma3:4b"}]})
        raise TimeoutError("connect failed")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_probe_ollama_reports_each_endpoint(monkeypatch) -> None:
    """`_probe_ollama` reports one status per endpoint and checks models on the primary."""
    monkeypatch.setattr(heartbeat.httpx, "AsyncClient", _FakeAsyncClient)
    monkeypatch.setattr(heartbeat.settings, "OLLAMA_URL", "http://gcp-a:11434")
    # The service reads these two with getattr(..., ""), so they may be absent
    # from settings; raising=False lets the patch create them in that case.
    monkeypatch.setattr(
        heartbeat.settings, "OLLAMA_SECONDARY_URL", "http://gcp-b:11434", raising=False
    )
    monkeypatch.setattr(
        heartbeat.settings, "OLLAMA_FALLBACK_URL", "http://local-111:11434", raising=False
    )
    monkeypatch.setattr(heartbeat.settings, "OLLAMA_REQUIRED_MODELS", ["qwen3:14b", "bge-m3:latest"])

    result = await heartbeat.HeartbeatReportService()._probe_ollama()

    assert result["probe"].ok is True
    assert result["models"] == {"qwen3:14b": True, "bge-m3:latest": True}
    assert result["endpoints"]["GCP-A"].ok is True
    assert result["endpoints"]["GCP-B"].ok is True
    assert result["endpoints"]["111"].ok is False
    # The failure reason from the unreachable endpoint must be surfaced.
    assert "connect failed" in result["endpoints"]["111"].status
|
||||
|
||||
|
||||
def test_report_to_telegram_html_renders_ollama_endpoint_statuses() -> None:
    """The Telegram HTML lists each Ollama endpoint and warns only about failing ones."""
    report = heartbeat.HeartbeatReport(timestamp=datetime(2026, 5, 6, 18, 0))
    report.ai_services["ollama"] = heartbeat.ProbeResult(True, "✅ 正常", 1200)
    report.ai_services["nemotron"] = heartbeat.ProbeResult(True, "✅ 正常", 900)
    report.ai_services["gemini"] = heartbeat.ProbeResult(True, "✅ 正常", 800)
    report.ai_services["claude"] = heartbeat.ProbeResult(True, "✅ 正常", 700)
    report.ollama_models = {"qwen3:14b": True}
    report.ollama_endpoints = {
        "GCP-A": heartbeat.ProbeResult(True, "✅ 正常", 1000),
        "GCP-B": heartbeat.ProbeResult(True, "✅ 正常", 1100),
        "111": heartbeat.ProbeResult(False, "❌ connect failed"),
    }
    report.warnings = heartbeat.HeartbeatReportService()._build_warnings(report)

    text = heartbeat.report_to_telegram_html(report)
    joined_warnings = "\n".join(report.warnings)

    assert "GCP-A: ✅ 正常" in text
    assert "GCP-B: ✅ 正常" in text
    assert "111: ❌ connect failed" in text
    assert "Ollama 111 異常" in joined_warnings
    # Healthy endpoints must not produce a warning.
    assert "Ollama GCP-A 異常" not in joined_warnings
    assert "Ollama GCP-B 異常" not in joined_warnings
|
||||
@@ -3986,3 +3986,31 @@ ruff check --select F401,F821,I001 apps/api/src/services/mcp_audit_context.py ap
|
||||
|
||||
- `ollama188-retirement-gate.sh` 預設 24 小時窗口仍會看到退場前歷史 POST,因此短期會 fail;判斷「現在是否仍打 188」需用較短觀察窗口。
|
||||
- 後續若要讓 111 真正成為第三順位可用 fallback,需要先修通 K8s / API Pod 到 `192.168.0.111:11434` 的網路路徑。
|
||||
|
||||
---
|
||||
|
||||
## 2026-05-06(台北)— 心跳報告列出 Ollama 三段式端點
|
||||
|
||||
**觸發**:系統報告原本只顯示單一 `Ollama: 正常`,容易讓人誤判 111、GCP-A、GCP-B 的實際狀態;在 111 網路不可達時,報告仍可能看起來「全系統正常」。
|
||||
|
||||
### 已修正
|
||||
|
||||
| 範圍 | 結果 |
|------|------|
| `heartbeat_report_service.py` | `_probe_ollama()` 改為同時探測 GCP-A、GCP-B、111 fallback,保留主 Ollama models 檢查 |
| Telegram 心跳 HTML | AI 服務區新增三個子列:`GCP-A`、`GCP-B`、`111`,各自顯示狀態與延遲 |
| warnings | 任一已設定 Ollama endpoint 異常時,明確加入 `Ollama {name} 異常` |
| 測試 | 新增 `test_heartbeat_ollama_endpoints.py`,鎖住三端點顯示與 warning 行為 |
|
||||
|
||||
### 驗證
|
||||
|
||||
```text
pytest apps/api/tests/test_heartbeat_ollama_endpoints.py apps/api/tests/test_heartbeat_pod_state_machine.py apps/api/tests/test_mcp_audit_context.py apps/api/tests/test_mcp_audit_service.py
# 19 passed

py_compile apps/api/src/services/heartbeat_report_service.py apps/api/tests/test_heartbeat_ollama_endpoints.py
# 通過

ruff check --select F401,F821,I001 apps/api/src/services/heartbeat_report_service.py apps/api/tests/test_heartbeat_ollama_endpoints.py
# All checks passed
```
|
||||
|
||||
Reference in New Issue
Block a user