feat(heartbeat): show ollama endpoint topology
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m16s
CD Pipeline / build-and-deploy (push) Successful in 3m30s
CD Pipeline / post-deploy-checks (push) Successful in 1m17s

This commit is contained in:
Your Name
2026-05-06 17:58:56 +08:00
parent d79ec4f647
commit a158b77422
3 changed files with 169 additions and 22 deletions

View File

@@ -108,6 +108,7 @@ class HeartbeatReport:
timestamp: datetime
ai_services: dict[str, ProbeResult] = field(default_factory=dict)
ollama_models: dict[str, bool] = field(default_factory=dict)
ollama_endpoints: dict[str, ProbeResult] = field(default_factory=dict)
mcp_providers: dict[str, ProbeResult] = field(default_factory=dict)
flywheel: FlywheelStats = field(default_factory=FlywheelStats)
infra: dict[str, ProbeResult] = field(default_factory=dict)
@@ -181,6 +182,7 @@ class HeartbeatReportService:
ollama_data = collected["_ollama"] or {}
report.ai_services["ollama"] = ollama_data.get("probe", ProbeResult(False, "❌ 無回應"))
report.ollama_models = ollama_data.get("models", {})
report.ollama_endpoints = ollama_data.get("endpoints", {})
report.ai_services["nemotron"] = collected["_nemotron"] or ProbeResult(False, "❌ 無回應")
report.ai_services["gemini"] = collected["_gemini"] or ProbeResult(False, "❌ 無回應")
report.ai_services["claude"] = collected["_claude"] or ProbeResult(False, "❌ 無回應")
@@ -224,37 +226,62 @@ class HeartbeatReportService:
async def _probe_ollama(self) -> dict:
"""探測 Ollama 服務 + 逐一確認所需模型"""
try:
async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client:
endpoints = [
("GCP-A", settings.OLLAMA_URL),
("GCP-B", getattr(settings, "OLLAMA_SECONDARY_URL", "")),
("111", getattr(settings, "OLLAMA_FALLBACK_URL", "")),
]
async def _probe_endpoint(
client: httpx.AsyncClient,
label: str,
url: str,
) -> tuple[str, ProbeResult, set[str]]:
if not url:
return label, ProbeResult(False, "⚠️ 未設定"), set()
try:
t0 = asyncio.get_event_loop().time()
resp = await client.get(f"{settings.OLLAMA_URL}/api/tags")
resp = await client.get(f"{url}/api/tags")
latency = (asyncio.get_event_loop().time() - t0) * 1000
if resp.status_code != 200:
return label, ProbeResult(False, f"❌ HTTP {resp.status_code}", latency), set()
available = {m["name"] for m in resp.json().get("models", [])}
return label, ProbeResult(True, "✅ 正常", round(latency, 1)), available
except Exception as e:
return label, ProbeResult(False, f"{str(e)[:60]}"), set()
if resp.status_code != 200:
return {
"probe": ProbeResult(False, f"❌ HTTP {resp.status_code}", latency),
"models": {},
}
async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client:
results = await asyncio.gather(
*[_probe_endpoint(client, label, url) for label, url in endpoints],
)
available = {m["name"] for m in resp.json().get("models", [])}
endpoint_status = {label: probe for label, probe, _available in results}
primary_probe = endpoint_status.get("GCP-A", ProbeResult(False, "❌ 無回應"))
primary_available = next(
(available for label, _probe, available in results if label == "GCP-A"),
set(),
)
if primary_probe.ok:
# 也把 short name無 :tag加進去方便匹配
available_short = {n.split(":")[0] for n in available}
available_short = {n.split(":")[0] for n in primary_available}
model_status: dict[str, bool] = {}
for required in settings.OLLAMA_REQUIRED_MODELS:
req_short = required.split(":")[0]
ok = required in available or req_short in available_short
ok = required in primary_available or req_short in available_short
model_status[required] = ok
return {
"probe": ProbeResult(True, "✅ 正常", round(latency, 1)),
"probe": primary_probe,
"models": model_status,
"endpoints": endpoint_status,
}
except Exception as e:
return {
"probe": ProbeResult(False, f"{str(e)[:60]}"),
"models": {},
}
return {
"probe": primary_probe,
"models": {},
"endpoints": endpoint_status,
}
async def _probe_nemotron(self) -> ProbeResult:
"""探測 Nemotron NIM API"""
@@ -437,9 +464,11 @@ class HeartbeatReportService:
try:
# KM 向量化率DB 查詢)
from sqlalchemy import func, select
from sqlalchemy import text as sa_text
from src.db.base import get_db_context
from src.db.models import KnowledgeEntryRecord
from sqlalchemy import func, select, text as sa_text
async with get_db_context() as db:
# KM 總數
km_total = await db.scalar(select(func.count()).select_from(KnowledgeEntryRecord))
@@ -490,8 +519,9 @@ class HeartbeatReportService:
"""查 24h 告警流水線統計approval_records"""
stats = AlertPipelineStats()
try:
from src.db.base import get_db_context
from sqlalchemy import text as sa_text
from src.db.base import get_db_context
async with get_db_context() as db:
r = await db.execute(sa_text("""
SELECT
@@ -517,8 +547,9 @@ class HeartbeatReportService:
"""探測 PostgreSQL 與 Redis 連線健康"""
s = DbRedisStats()
try:
from src.db.base import get_db_context
from sqlalchemy import text as sa_text
from src.db.base import get_db_context
async with get_db_context() as db:
await db.execute(sa_text("SELECT 1"))
s.db_ok = True
@@ -652,8 +683,9 @@ class HeartbeatReportService:
logger.debug("heartbeat_automation_redis_failed", error=str(e))
try:
from src.db.base import get_db_context
from sqlalchemy import text as sa_text
from src.db.base import get_db_context
async with get_db_context() as db:
# 今日新增 KMtimestamptz 直接比較,不需 AT TIME ZONE
km_today = await db.scalar(sa_text(
@@ -686,6 +718,10 @@ class HeartbeatReportService:
if not loaded:
warnings.append(f"{model} 未載入,相關功能失效")
for name, probe in report.ollama_endpoints.items():
if not probe.ok and not probe.status.startswith("⚠️ 未設定"):
warnings.append(f"Ollama {name} 異常: {probe.status}")
# AI 服務異常
for name, probe in report.ai_services.items():
if not probe.ok and not probe.status.startswith("⚠️"):
@@ -816,6 +852,12 @@ def report_to_telegram_html(report: HeartbeatReport) -> str:
lines.append("🤖 <b>AI 服務</b>")
lines.append(f"├─ Ollama: {ollama.status}{ollama_lat} <code>{html.escape(models_str)}</code>")
if report.ollama_endpoints:
endpoint_items = list(report.ollama_endpoints.items())
for idx, (name, probe) in enumerate(endpoint_items):
branch = "" if idx == len(endpoint_items) - 1 else ""
latency = f" {probe.latency_ms:.0f}ms" if probe.latency_ms else ""
lines.append(f"{branch}{html.escape(name)}: {probe.status}{latency}")
lines.append(f"├─ Nemotron NIM: {nem.status}" + (f" {nem.latency_ms:.0f}ms" if nem.latency_ms else ""))
lines.append(f"├─ Gemini API: {gem.status}" + (f" {gem.latency_ms:.0f}ms" if gem.latency_ms else ""))
lines.append(f"└─ Claude API: {cla.status}" + (f" {cla.latency_ms:.0f}ms" if cla.latency_ms else ""))

View File

@@ -0,0 +1,77 @@
from __future__ import annotations
from datetime import datetime
from typing import Any
import pytest
from src.services import heartbeat_report_service as heartbeat
class _FakeResponse:
def __init__(self, status_code: int, payload: dict[str, Any] | None = None) -> None:
self.status_code = status_code
self._payload = payload or {}
def json(self) -> dict[str, Any]:
return self._payload
class _FakeAsyncClient:
    """Async-context-manager double used in place of ``httpx.AsyncClient``.

    Constructor arguments are accepted and ignored. ``get`` answers from a
    fixed table keyed on URL prefix and raises ``TimeoutError`` for any URL
    that matches no prefix.
    """

    def __init__(self, *_args: Any, **_kwargs: Any) -> None:
        """Accept and discard whatever the caller passes."""

    async def __aenter__(self) -> "_FakeAsyncClient":
        """Enter the ``async with`` block, yielding this instance."""
        return self

    async def __aexit__(self, *_args: Any) -> None:
        """Leave the ``async with`` block without suppressing exceptions."""
        return None

    async def get(self, url: str) -> _FakeResponse:
        """Return the canned model listing for ``url``.

        Args:
            url: Request URL; only its prefix is inspected.

        Returns:
            A 200 ``_FakeResponse`` whose JSON body lists the models
            configured for the matching prefix.

        Raises:
            TimeoutError: If ``url`` matches none of the known prefixes.
        """
        # Checked in order; the first matching prefix wins.
        canned_models = (
            ("http://gcp-a", ("qwen3:14b", "bge-m3:latest")),
            ("http://gcp-b", ("gemma3:4b",)),
        )
        for prefix, model_names in canned_models:
            if url.startswith(prefix):
                listing = [{"name": model_name} for model_name in model_names]
                return _FakeResponse(200, {"models": listing})
        raise TimeoutError("connect failed")
@pytest.mark.asyncio
async def test_probe_ollama_reports_each_endpoint(monkeypatch) -> None:
    """``_probe_ollama`` reports a separate status for every Ollama endpoint.

    ``httpx.AsyncClient`` is swapped for ``_FakeAsyncClient``, which answers
    200 for the ``gcp-a`` / ``gcp-b`` URLs and raises ``TimeoutError`` for
    anything else, so the ``111`` endpoint is expected to come back not-ok.
    """
    monkeypatch.setattr(heartbeat.httpx, "AsyncClient", _FakeAsyncClient)

    # Point each configured endpoint at a URL the fake client recognises
    # (or, for the fallback, deliberately does not recognise).
    patched_settings = {
        "OLLAMA_URL": "http://gcp-a:11434",
        "OLLAMA_SECONDARY_URL": "http://gcp-b:11434",
        "OLLAMA_FALLBACK_URL": "http://local-111:11434",
        "OLLAMA_REQUIRED_MODELS": ["qwen3:14b", "bge-m3:latest"],
    }
    for setting_name, setting_value in patched_settings.items():
        monkeypatch.setattr(heartbeat.settings, setting_name, setting_value)

    service = heartbeat.HeartbeatReportService()
    outcome = await service._probe_ollama()

    assert outcome["probe"].ok is True
    assert outcome["models"] == {"qwen3:14b": True, "bge-m3:latest": True}

    per_endpoint = outcome["endpoints"]
    assert per_endpoint["GCP-A"].ok is True
    assert per_endpoint["GCP-B"].ok is True
    assert per_endpoint["111"].ok is False
def test_report_to_telegram_html_renders_ollama_endpoint_statuses() -> None:
    """The Telegram HTML lists each Ollama endpoint and warns about the bad one.

    Builds a report whose four AI services are healthy and whose Ollama
    endpoints are two healthy entries plus one failing ``111`` entry, then
    checks both the rendered text and the generated warnings.
    """
    healthy = "✅ 正常"
    report = heartbeat.HeartbeatReport(timestamp=datetime(2026, 5, 6, 18, 0))

    service_latencies = (
        ("ollama", 1200),
        ("nemotron", 900),
        ("gemini", 800),
        ("claude", 700),
    )
    for service_name, latency_ms in service_latencies:
        report.ai_services[service_name] = heartbeat.ProbeResult(True, healthy, latency_ms)

    report.ollama_models = {"qwen3:14b": True}
    report.ollama_endpoints = {
        "GCP-A": heartbeat.ProbeResult(True, healthy, 1000),
        "GCP-B": heartbeat.ProbeResult(True, healthy, 1100),
        "111": heartbeat.ProbeResult(False, "❌ connect failed"),
    }
    report.warnings = heartbeat.HeartbeatReportService()._build_warnings(report)

    rendered = heartbeat.report_to_telegram_html(report)

    expected_fragments = (
        "GCP-A: ✅ 正常",
        "GCP-B: ✅ 正常",
        "111: ❌ connect failed",
    )
    for fragment in expected_fragments:
        assert fragment in rendered

    joined_warnings = "\n".join(report.warnings)
    assert "Ollama 111 異常" in joined_warnings

View File

@@ -3986,3 +3986,31 @@ ruff check --select F401,F821,I001 apps/api/src/services/mcp_audit_context.py ap
- `ollama188-retirement-gate.sh` 預設 24 小時窗口仍會看到退場前歷史 POST,因此短期會 fail;判斷「現在是否仍打 188」需用較短觀察窗口。
- 後續若要讓 111 真正成為第三順位可用 fallback,需要先修通 K8s / API Pod 到 `192.168.0.111:11434` 的網路路徑。
---
## 2026-05-06(台北)— 心跳報告列出 Ollama 三段式端點
**觸發**:系統報告原本只顯示單一 `Ollama: 正常`,容易讓人誤判 111、GCP-A、GCP-B 的實際狀態;在 111 網路不可達時,報告仍可能看起來「全系統正常」。
### 已修正
| 範圍 | 結果 |
|------|------|
| `heartbeat_report_service.py` | `_probe_ollama()` 改為同時探測 GCP-A、GCP-B、111 fallback,保留主 Ollama models 檢查 |
| Telegram 心跳 HTML | AI 服務區新增三個子列:`GCP-A`、`GCP-B`、`111`,各自顯示狀態與延遲 |
| warnings | 任一已設定 Ollama endpoint 異常時,明確加入 `Ollama {name} 異常` |
| 測試 | 新增 `test_heartbeat_ollama_endpoints.py`,鎖住三端點顯示與 warning 行為 |
### 驗證
```text
pytest apps/api/tests/test_heartbeat_ollama_endpoints.py apps/api/tests/test_heartbeat_pod_state_machine.py apps/api/tests/test_mcp_audit_context.py apps/api/tests/test_mcp_audit_service.py
# 19 passed
py_compile apps/api/src/services/heartbeat_report_service.py apps/api/tests/test_heartbeat_ollama_endpoints.py
# 通過
ruff check --select F401,F821,I001 apps/api/src/services/heartbeat_report_service.py apps/api/tests/test_heartbeat_ollama_endpoints.py
# All checks passed
```