diff --git a/apps/api/src/services/ai_providers/ollama.py b/apps/api/src/services/ai_providers/ollama.py index eb5dce07..49a7bdb6 100644 --- a/apps/api/src/services/ai_providers/ollama.py +++ b/apps/api/src/services/ai_providers/ollama.py @@ -133,3 +133,127 @@ class OllamaProvider: if self._http_client: await self._http_client.aclose() self._http_client = None + + +# 2026-04-26 Wave5 B1-fix by Claude Engineer-A4 — OLLAMA_188 provider 註冊 +class Ollama188Provider(OllamaProvider): + """ + Ollama 188 CPU-only 備援 Provider + + 繼承 OllamaProvider,但使用 OLLAMA_FALLBACK_URL(192.168.0.188:11434) + 作為推理端點,模型預設 OLLAMA_HEALTH_CHECK_MODEL(qwen2.5:7b-instruct)。 + + B1 修復:原本 _init_registry 未登錄此 provider,導致 + executor.execute() 遇到 "ollama_188" → not_registered → 跳過, + 188 從未被打到。此類別補全登錄鏈路。 + + 2026-04-26 Wave5 B1-fix by Claude Engineer-A4 + """ + + @property + def name(self) -> str: + return "ollama_188" + + @property + def is_enabled(self) -> bool: + import os + # 優先查 ENABLE_OLLAMA_188;若未設定(預設 true)則看 OLLAMA_FALLBACK_URL 是否有值 + env_override = os.getenv("ENABLE_OLLAMA_188", "true").lower() == "true" + if not env_override: + return False + # OLLAMA_FALLBACK_URL 空字串 → 未設定 188 節點 → 停用 + return bool(getattr(settings, "OLLAMA_FALLBACK_URL", "")) + + async def analyze( + self, + prompt: str, + context: dict | None = None, + ) -> AIResult: + start = time.perf_counter() + fallback_url = getattr(settings, "OLLAMA_FALLBACK_URL", "") + if not fallback_url: + return AIResult( + raw_response="", + success=False, + provider=self.name, + error="OLLAMA_FALLBACK_URL not configured", + ) + + try: + client = await self._get_client() + + registry = get_model_registry() + # 嘗試取 ollama_188 專屬設定,fallback 到 ollama 預設 + try: + model_name = registry.get_model("ollama_188", "rca") + except Exception: + model_name = getattr(settings, "OLLAMA_HEALTH_CHECK_MODEL", "qwen2.5:7b-instruct") + + try: + options = registry.get_provider_options("ollama_188") + except Exception: + options = registry.get_provider_options("ollama") + + # CPU-only 備援:固定使用較長 timeout(CPU 推理慢) + task_type = (context or {}).get("task_type", "") + if task_type in ("diagnose", "force_local"): + read_timeout = float(getattr(settings, "OLLAMA_DIAGNOSE_TIMEOUT_SECONDS", 200)) + else: + read_timeout = float(settings.OPENCLAW_TIMEOUT) + + response = await client.post( + f"{fallback_url}/api/generate", + json={ + "model": model_name, + "prompt": prompt, + "stream": False, + "format": "json", + "options": { + "num_predict": options.get("num_predict", 1024), + "temperature": options.get("temperature", 0.1), + "top_p": options.get("top_p", 0.9), + }, + }, + timeout=httpx.Timeout(read_timeout, connect=10.0), + ) + response.raise_for_status() + data = response.json() + result = data.get("response", "") + tokens = data.get("eval_count", 0) + data.get("prompt_eval_count", 0) + latency = (time.perf_counter() - start) * 1000 + + logger.info( + "ollama_188_provider_success", + response_length=len(result), + tokens=tokens, + latency_ms=round(latency, 1), + endpoint=fallback_url, + ) + return AIResult( + raw_response=result, + success=True, + provider=self.name, + tokens=tokens, + latency_ms=latency, + ) + + except httpx.TimeoutException as e: + latency = (time.perf_counter() - start) * 1000 + logger.warning("ollama_188_provider_timeout", error=str(e), latency_ms=round(latency, 1)) + return AIResult(raw_response="", success=False, provider=self.name, latency_ms=latency, error=f"Timeout: {e}") + + except Exception as e: + latency = (time.perf_counter() - start) * 1000 + logger.warning("ollama_188_provider_failed", error=str(e), latency_ms=round(latency, 1)) + return AIResult(raw_response="", success=False, provider=self.name, latency_ms=latency, error=str(e)) + + async def health_check(self) -> bool: + fallback_url = getattr(settings, "OLLAMA_FALLBACK_URL", "") + if not fallback_url: + return False + try: + client = await self._get_client() + resp = await client.get(f"{fallback_url}/api/tags", timeout=5.0) + return resp.status_code == 200 + except Exception: + return False diff --git a/apps/api/src/services/ai_router.py b/apps/api/src/services/ai_router.py index 634ae7dc..f939032b 100644 --- a/apps/api/src/services/ai_router.py +++ b/apps/api/src/services/ai_router.py @@ -1191,7 +1191,7 @@ _executor: AIRouterExecutor | None = None def _init_registry() -> AIProviderRegistry: """初始化 Provider Registry (首次呼叫時自動註冊所有 Provider)""" - from src.services.ai_providers.ollama import OllamaProvider + from src.services.ai_providers.ollama import OllamaProvider, Ollama188Provider # 2026-04-26 Wave5 B1-fix by Claude Engineer-A4 from src.services.ai_providers.gemini import GeminiProvider from src.services.ai_providers.claude import ClaudeProvider from src.services.ai_providers.openclaw_nemo import OpenClawNemoProvider @@ -1206,6 +1206,11 @@ def _init_registry() -> AIProviderRegistry: from src.services.ai_providers.nemotron import NemotronProvider registry.register(NemotronProvider()) + # 2026-04-26 Wave5 B1-fix by Claude Engineer-A4 — 補登 OLLAMA_188 備援 provider + # 修復:原本 failover_manager 決策返回 "ollama_188",但 executor 查不到 → not_registered + # → 188 從未被打到。必須明確 register 才能讓 executor.execute() 路由到 188。 + registry.register(Ollama188Provider()) + return registry diff --git a/apps/api/src/services/auto_repair_service.py b/apps/api/src/services/auto_repair_service.py index 4c29510c..312d85fb 100644 --- a/apps/api/src/services/auto_repair_service.py +++ b/apps/api/src/services/auto_repair_service.py @@ -435,6 +435,51 @@ class AutoRepairService: except Exception as _rg_e: logger.warning("runbook_generator_task_failed", error=str(_rg_e)) + # 2026-04-26 Wave4 P1.3+P1.4 by Claude Engineer-B3 — 飛輪閉環最後一哩 + # 成功執行後,fire-and-forget 啟動後執行驗證 + EWMA 學習回饋 + # verifier 有 10s warmup + 30s timeout,不能阻塞在主路徑 + try: + import asyncio as _asyncio + from src.services.post_execution_verifier import get_post_execution_verifier + from src.services.learning_service import get_learning_service + + _action_taken = f"auto_repair:{playbook.playbook_id}" + _verifier = get_post_execution_verifier() + _learning = get_learning_service() + + async def _verify_and_learn() -> None: + try: + verification_result = await _verifier.verify( + incident=incident, + snapshot=None, + action_taken=_action_taken, + ) + await _learning.record_verification_result( + incident_id=incident.incident_id, + action_taken=_action_taken, + verification_result=verification_result, + matched_playbook_id=playbook.playbook_id, + ) + logger.info( + "auto_repair_verify_and_learn_done", + incident_id=incident.incident_id, + playbook_id=playbook.playbook_id, + verification_result=verification_result, + ) + except Exception as _inner_e: + logger.warning( + "auto_repair_verify_and_learn_failed", + incident_id=incident.incident_id, + error=str(_inner_e), + ) + + _vl_task = _asyncio.create_task(_verify_and_learn()) + if hasattr(self, "_pending_tasks"): + self._pending_tasks.add(_vl_task) + _vl_task.add_done_callback(self._pending_tasks.discard) + except Exception as _vl_e: + logger.warning("auto_repair_verifier_setup_failed", error=str(_vl_e)) + return repair_result except Exception as e: diff --git a/apps/api/src/services/ollama_failover_manager.py b/apps/api/src/services/ollama_failover_manager.py index b342a327..dab53202 100644 --- a/apps/api/src/services/ollama_failover_manager.py +++ b/apps/api/src/services/ollama_failover_manager.py @@ -413,13 +413,21 @@ class OllamaFailoverManager: return True # fail-open quota = getattr(self._settings, "GEMINI_DAILY_QUOTA", 1000) key = f"ollama:gemini_daily_count:{datetime.date.today().isoformat()}" - count_raw = await redis.get(key) - count = int(count_raw or 0) - if count >= quota: + + # 2026-04-26 Wave5 B3-fix by Claude Engineer-A4 — atomic pipeline 修復 TOCTOU + # 原實作:GET → 判斷 → INCR → EXPIRE(分四步,INCR 後 crash 會丟 TTL, + # 且並行請求在 GET/INCR 之間競爭導致配額超發) + # 修法:pipeline 原子執行 SET NX(首次設 TTL) + INCR,用 INCR 後的新值判斷 + pipe = redis.pipeline() + pipe.set(key, 0, ex=86400, nx=True) # 僅首次寫入設 TTL;已存在則跳過 + pipe.incr(key) # 原子遞增,回傳遞增後的值 + results = await pipe.execute() + new_count = int(results[1]) # results[1] = INCR 後新值 + + if new_count > quota: + # 已超配額(INCR 後 > quota),回退不是必要的(最多超發 1 次) + # 但要回傳 False 讓 router 切到 188 return False - # atomic incr + 設定 TTL(確保跨日自動重置) - await redis.incr(key) - await redis.expire(key, 86400) return True except Exception as e: logger.warning("gemini_quota_check_failed", error=str(e)) diff --git a/apps/api/tests/test_learning_chain_e2e.py b/apps/api/tests/test_learning_chain_e2e.py new file mode 100644 index 00000000..977bde76 --- /dev/null +++ b/apps/api/tests/test_learning_chain_e2e.py @@ -0,0 +1,377 @@ +""" +飛輪閉環 E2E 測試 — auto_repair → PostExecutionVerifier → LearningService → EWMA +================================================================================ +2026-04-26 Wave4 P1.3+P1.4 by Claude Engineer-B3 — 飛輪閉環最後一哩 + +測試範圍: + - execute_auto_repair 成功 → verifier 被呼叫 → record_verification_result 被呼叫 + - execute_auto_repair 失敗 → verifier 不被呼叫(主 except 路徑) + - matched_playbook_id=None 的 record_verification_result → log warning 不 crash + - verifier 拋例外 → 修復仍回傳成功,trust 不更新 + +🔴 遵循 feedback_no_mock_testing.md: + - 禁止 MagicMock/AsyncMock/unittest.mock.patch + - 使用純 Python Stub 類別 + pytest monkeypatch(替換 module-level getter) +""" + +from __future__ import annotations + +import asyncio + +import pytest + +from src.models.incident import Incident, IncidentStatus, Severity, Signal +from src.models.playbook import ( + ActionType, + Playbook, + PlaybookStatus, + RepairStep, + RiskLevel, + SymptomPattern, +) +from src.services.auto_repair_service import AutoRepairService +from src.utils.timezone import now_taipei + + +# ============================================================================= +# Stubs +# ============================================================================= + + +class StubVerifier: + """PostExecutionVerifier 的輕量 Stub — 記錄呼叫,不真正等 K8s""" + + def __init__(self, result: str = "success", raise_exc: Exception | None = None): + self.result = result + self.raise_exc = raise_exc + self.calls: list[dict] = [] + + async def verify( + self, + incident, + snapshot, + action_taken: str, + warmup_sec: float = 0.0, + ) -> str: + self.calls.append( + {"incident_id": incident.incident_id, "snapshot": snapshot, "action_taken": action_taken} + ) + if self.raise_exc is not None: + raise self.raise_exc + return self.result + + +class StubLearningService: + """LearningService 的輕量 Stub — 記錄 record_verification_result 呼叫""" + + def __init__(self) -> None: + self.verification_calls: list[dict] = [] + + async def record_verification_result( + self, + incident_id: str, + action_taken: str, + verification_result: str, + matched_playbook_id: str | None = None, + ) -> None: + self.verification_calls.append( + { + "incident_id": incident_id, + "action_taken": action_taken, + "verification_result": verification_result, + "matched_playbook_id": matched_playbook_id, + } + ) + + +class StubPlaybookService: + """PlaybookService 的輕量 Stub — 支援 record_execution + get_recommendations""" + + def __init__(self) -> None: + self._playbooks: dict[str, Playbook] = {} + self._recommendations: list = [] + + def add_playbook(self, playbook: Playbook) -> None: + self._playbooks[playbook.playbook_id] = playbook + + def set_recommendations(self, recommendations: list) -> None: + self._recommendations = recommendations + + async def get_recommendations(self, symptoms, top_k: int = 3) -> list: + return self._recommendations + + async def get_by_id(self, playbook_id: str) -> Playbook | None: + return self._playbooks.get(playbook_id) + + async def record_execution(self, playbook_id: str, success: bool) -> bool: + playbook = self._playbooks.get(playbook_id) + if playbook is not None: + if success: + playbook.success_count += 1 + else: + playbook.failure_count += 1 + return playbook is not None + + +class StubRecommendation: + def __init__(self, playbook: Playbook, similarity_score: float = 0.9) -> None: + self.playbook = playbook + self.similarity_score = similarity_score + + +# ============================================================================= +# Factories +# ============================================================================= + + +def _make_incident( + incident_id: str = "INC-E2E-001", + severity: Severity = Severity.P2, +) -> Incident: + now = now_taipei() + return Incident( + incident_id=incident_id, + status=IncidentStatus.INVESTIGATING, + severity=severity, + affected_services=["e2e-service"], + signals=[ + Signal( + alert_name="TestAlert", + severity=severity, + source="prometheus", + fired_at=now, + labels={"namespace": "awoooi-prod", "alertname": "TestAlert"}, + ) + ], + ) + + +def _make_playbook( + playbook_id: str = "PB-E2E-001", + trust_score: float = 0.5, +) -> Playbook: + pb = Playbook( + playbook_id=playbook_id, + name="E2E 測試 Playbook", + description="飛輪閉環 E2E 測試用", + status=PlaybookStatus.APPROVED, + symptom_pattern=SymptomPattern( + alert_names=["TestAlert"], + affected_services=["e2e-service"], + severity_range=["P2"], + ), + repair_steps=[ + RepairStep( + step_number=1, + action_type=ActionType.MANUAL, + command="echo test", + risk_level=RiskLevel.LOW, + ) + ], + trust_score=trust_score, + success_count=5, + failure_count=1, + ) + return pb + + +async def _no_cooldown(*args, **kwargs) -> tuple[bool, str]: + return True, "允許修復 (test bypass)" + + +# ============================================================================= +# Tests +# ============================================================================= + + +@pytest.mark.asyncio +async def test_auto_repair_success_triggers_verify_and_learn(monkeypatch): + """ + 執行成功 → verifier.verify() 被呼叫 → record_verification_result 被呼叫 + 驗證飛輪鏈路的前兩段接通。 + """ + stub_verifier = StubVerifier(result="success") + stub_learning = StubLearningService() + + # 替換 module-level getters(pure Python, no MagicMock) + import src.services.auto_repair_service as _ars_mod + monkeypatch.setattr(_ars_mod, "_verifier_getter", None, raising=False) + + import src.services.post_execution_verifier as _pev_mod + monkeypatch.setattr(_pev_mod, "_verifier", stub_verifier) + + import src.services.learning_service as _ls_mod + monkeypatch.setattr(_ls_mod, "_learning_service", stub_learning) + + playbook = _make_playbook() + pb_service = StubPlaybookService() + pb_service.add_playbook(playbook) + pb_service.set_recommendations([StubRecommendation(playbook)]) + + service = AutoRepairService( + playbook_service=pb_service, + cooldown_checker=_no_cooldown, + ) + + incident = _make_incident() + result = await service.execute_auto_repair(incident, playbook) + + assert result.success is True + + # fire-and-forget task — 讓 event loop 執行完 + # verifier 有 warmup_sec,但 Stub 忽略 warmup(不 sleep) + await asyncio.sleep(0.05) + + assert len(stub_verifier.calls) == 1, "verifier.verify() 應被呼叫一次" + assert stub_verifier.calls[0]["incident_id"] == incident.incident_id + assert stub_verifier.calls[0]["snapshot"] is None + + assert len(stub_learning.verification_calls) == 1, "record_verification_result 應被呼叫一次" + call = stub_learning.verification_calls[0] + assert call["incident_id"] == incident.incident_id + assert call["verification_result"] == "success" + assert call["matched_playbook_id"] == playbook.playbook_id + + +@pytest.mark.asyncio +async def test_auto_repair_failure_does_not_call_verifier(monkeypatch): + """ + 執行失敗(步驟拋例外)→ verifier 不被呼叫(失敗路徑不進入 verify-and-learn 區塊) + """ + stub_verifier = StubVerifier(result="success") + + import src.services.post_execution_verifier as _pev_mod + monkeypatch.setattr(_pev_mod, "_verifier", stub_verifier) + + import src.services.learning_service as _ls_mod + stub_learning = StubLearningService() + monkeypatch.setattr(_ls_mod, "_learning_service", stub_learning) + + # 建立一個會讓 _execute_step raise 的 playbook(KUBECTL 步驟,executor 不可用時只 skip,不 raise) + # 直接讓 playbook_service.record_execution 正常工作,驗證失敗路徑不呼叫 verifier + + class FailingPlaybookService(StubPlaybookService): + async def record_execution(self, playbook_id: str, success: bool) -> bool: + # 正常記錄,不 raise + return True + + playbook = _make_playbook() + pb_service = FailingPlaybookService() + pb_service.add_playbook(playbook) + + # 讓 _execute_step 拋例外以觸發失敗路徑 + original_execute_step = AutoRepairService._execute_step + + async def _always_fail(self_inner, incident_arg, step_arg) -> str: + raise RuntimeError("強制測試失敗") + + service = AutoRepairService( + playbook_service=pb_service, + cooldown_checker=_no_cooldown, + ) + # Monkeypatch instance method + monkeypatch.setattr(AutoRepairService, "_execute_step", _always_fail) + + incident = _make_incident() + result = await service.execute_auto_repair(incident, playbook) + + assert result.success is False + + await asyncio.sleep(0.05) + + # 失敗路徑不進入 verify-and-learn 塊 + assert len(stub_verifier.calls) == 0, "執行失敗時不應呼叫 verifier" + assert len(stub_learning.verification_calls) == 0, "執行失敗時不應呼叫 record_verification_result" + + +@pytest.mark.asyncio +async def test_record_verification_result_no_playbook_id_does_not_crash(): + """ + matched_playbook_id=None → record_verification_result 正常執行,不 crash。 + 驗證 learning_service 對 None playbook_id 的防禦性。 + """ + from src.services.learning_service import LearningService + from src.repositories.interfaces import ILearningRepository, ITrustRepository + + class NullLearningRepo: + async def record_repair(self, **kwargs) -> bool: + return True + + async def get_repair_stats(self, *a, **kw): + return {} + + async def get_all_repair_stats(self, *a, **kw): + return {} + + async def record_disposition(self, *a, **kw): + return True + + async def get_dispositions(self, *a, **kw): + return {} + + class NullTrustRepo: + async def save_trust_record(self, *a, **kw): + pass + + async def load_trust_record(self, *a, **kw): + return None + + async def get_all_trust_records(self, *a, **kw): + return [] + + # 直接呼叫 record_verification_result(matched_playbook_id=None) + # 不應 raise,只應 log warning 並略過 _update_playbook_stats + svc = LearningService( + repository=NullLearningRepo(), + trust_repository=NullTrustRepo(), + ) + + # 不應拋例外 + await svc.record_verification_result( + incident_id="INC-NULL-PB-001", + action_taken="auto_repair:none", + verification_result="success", + matched_playbook_id=None, + ) + # 只要不 crash 即通過 + + +@pytest.mark.asyncio +async def test_verifier_exception_does_not_block_repair(monkeypatch): + """ + verifier 拋例外 → 修復結果仍回傳 success=True,learning 不被呼叫。 + 驗證 _verify_and_learn 的 exception 隔離。 + """ + stub_verifier = StubVerifier( + result="success", + raise_exc=RuntimeError("verifier 模擬故障"), + ) + stub_learning = StubLearningService() + + import src.services.post_execution_verifier as _pev_mod + monkeypatch.setattr(_pev_mod, "_verifier", stub_verifier) + + import src.services.learning_service as _ls_mod + monkeypatch.setattr(_ls_mod, "_learning_service", stub_learning) + + playbook = _make_playbook() + pb_service = StubPlaybookService() + pb_service.add_playbook(playbook) + + service = AutoRepairService( + playbook_service=pb_service, + cooldown_checker=_no_cooldown, + ) + + incident = _make_incident() + result = await service.execute_auto_repair(incident, playbook) + + # 主路徑成功回傳 + assert result.success is True + + await asyncio.sleep(0.05) + + # verifier 被呼叫(但拋了例外) + assert len(stub_verifier.calls) == 1 + # learning 不應被呼叫(因為 verifier raise 中斷了 _verify_and_learn) + assert len(stub_learning.verification_calls) == 0, "verifier 拋例外後 learning 不應被呼叫"