feat(wave4-5): P1.3+P1.4 真接線 + Ollama_188 provider 註冊 + quota atomic 修復
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 2m0s

3 個 engineers 在限額前的 Wave 4/5 完成工作(補 commit):

Engineer-B3 — Wave 4 P1.3+P1.4 真飛輪閉環(auto_repair_service.py 才是正確接線位置):
- execute_auto_repair 成功後 fire-and-forget 啟動 PostExecutionVerifier
- record_verification_result 觸發 EWMA trust_score 演化
- snapshot=None(不依賴 EvidenceSnapshot,避免我之前 webhooks.py 補丁的 B2 bug)
- _pending_tasks 管理生命週期,Lifespan shutdown 時等任務完成

Engineer-A4 — Wave 5 B1-fix Ollama188Provider 註冊:
- ai_providers/ollama.py: 新增 Ollama188Provider(OllamaProvider) 子類
  - name="ollama_188", is_enabled 看 ENABLE_OLLAMA_188 + OLLAMA_FALLBACK_URL
  - analyze() 用 OLLAMA_FALLBACK_URL(192.168.0.188:11434)作為推理端點
- ai_router.py:_init_registry 補 registry.register(Ollama188Provider())
- 修復 BLOCKER:原本 failover_manager 決策返回 "ollama_188",但 executor 查不到
  → not_registered → 188 從未被打到。Wave 2 P1.1 整套容災系統前段卡住。

Engineer-A4 — Wave 5 B3-fix Gemini quota TOCTOU 修復:
- ollama_failover_manager.py:_check_gemini_quota 改用 redis.pipeline()
  原 GET → 判斷 → INCR → EXPIRE 四步分離,並行請求在 GET/INCR 間競爭超發
  修法:SET NX(首次設 TTL) + INCR atomic pipeline,用 INCR 後新值判斷

Engineer-B3 — test_learning_chain_e2e.py(377 行 No-Mock 整合測試):
- 純 Python Stub + monkeypatch(feedback_no_mock_testing.md 合規)
- execute_auto_repair 成功 → verifier 被呼叫 ✓
- execute_auto_repair 失敗 → verifier 不被呼叫 ✓
- matched_playbook_id=None → log warning 不 crash ✓
- verifier 拋例外 → 修復回傳成功,trust 不更新 ✓

Tests: 42 passed (failover_manager + ai_router_failover_integration 全綠)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Engineer-A4 + Engineer-B3 (上 session) <noreply@anthropic.com>
This commit is contained in:
Your Name
2026-04-26 20:44:19 +08:00
parent 75b404379b
commit 02362eddcf
5 changed files with 566 additions and 7 deletions

View File

@@ -133,3 +133,127 @@ class OllamaProvider:
if self._http_client:
await self._http_client.aclose()
self._http_client = None
# 2026-04-26 Wave5 B1-fix by Claude Engineer-A4 — OLLAMA_188 provider 註冊
class Ollama188Provider(OllamaProvider):
"""
Ollama 188 CPU-only 備援 Provider
繼承 OllamaProvider但使用 OLLAMA_FALLBACK_URL192.168.0.188:11434
作為推理端點,模型預設 OLLAMA_HEALTH_CHECK_MODELqwen2.5:7b-instruct
B1 修復:原本 _init_registry 未登錄此 provider導致
executor.execute() 遇到 "ollama_188" → not_registered → 跳過,
188 從未被打到。此類別補全登錄鏈路。
2026-04-26 Wave5 B1-fix by Claude Engineer-A4
"""
@property
def name(self) -> str:
return "ollama_188"
@property
def is_enabled(self) -> bool:
import os
# 優先查 ENABLE_OLLAMA_188若未設定預設 true則看 OLLAMA_FALLBACK_URL 是否有值
env_override = os.getenv("ENABLE_OLLAMA_188", "true").lower() == "true"
if not env_override:
return False
# OLLAMA_FALLBACK_URL 空字串 → 未設定 188 節點 → 停用
return bool(getattr(settings, "OLLAMA_FALLBACK_URL", ""))
async def analyze(
self,
prompt: str,
context: dict | None = None,
) -> AIResult:
start = time.perf_counter()
fallback_url = getattr(settings, "OLLAMA_FALLBACK_URL", "")
if not fallback_url:
return AIResult(
raw_response="",
success=False,
provider=self.name,
error="OLLAMA_FALLBACK_URL not configured",
)
try:
client = await self._get_client()
registry = get_model_registry()
# 嘗試取 ollama_188 專屬設定fallback 到 ollama 預設
try:
model_name = registry.get_model("ollama_188", "rca")
except Exception:
model_name = getattr(settings, "OLLAMA_HEALTH_CHECK_MODEL", "qwen2.5:7b-instruct")
try:
options = registry.get_provider_options("ollama_188")
except Exception:
options = registry.get_provider_options("ollama")
# CPU-only 備援:固定使用較長 timeoutCPU 推理慢)
task_type = (context or {}).get("task_type", "")
if task_type in ("diagnose", "force_local"):
read_timeout = float(getattr(settings, "OLLAMA_DIAGNOSE_TIMEOUT_SECONDS", 200))
else:
read_timeout = float(settings.OPENCLAW_TIMEOUT)
response = await client.post(
f"{fallback_url}/api/generate",
json={
"model": model_name,
"prompt": prompt,
"stream": False,
"format": "json",
"options": {
"num_predict": options.get("num_predict", 1024),
"temperature": options.get("temperature", 0.1),
"top_p": options.get("top_p", 0.9),
},
},
timeout=httpx.Timeout(read_timeout, connect=10.0),
)
response.raise_for_status()
data = response.json()
result = data.get("response", "")
tokens = data.get("eval_count", 0) + data.get("prompt_eval_count", 0)
latency = (time.perf_counter() - start) * 1000
logger.info(
"ollama_188_provider_success",
response_length=len(result),
tokens=tokens,
latency_ms=round(latency, 1),
endpoint=fallback_url,
)
return AIResult(
raw_response=result,
success=True,
provider=self.name,
tokens=tokens,
latency_ms=latency,
)
except httpx.TimeoutException as e:
latency = (time.perf_counter() - start) * 1000
logger.warning("ollama_188_provider_timeout", error=str(e), latency_ms=round(latency, 1))
return AIResult(raw_response="", success=False, provider=self.name, latency_ms=latency, error=f"Timeout: {e}")
except Exception as e:
latency = (time.perf_counter() - start) * 1000
logger.warning("ollama_188_provider_failed", error=str(e), latency_ms=round(latency, 1))
return AIResult(raw_response="", success=False, provider=self.name, latency_ms=latency, error=str(e))
async def health_check(self) -> bool:
fallback_url = getattr(settings, "OLLAMA_FALLBACK_URL", "")
if not fallback_url:
return False
try:
client = await self._get_client()
resp = await client.get(f"{fallback_url}/api/tags", timeout=5.0)
return resp.status_code == 200
except Exception:
return False

View File

@@ -1191,7 +1191,7 @@ _executor: AIRouterExecutor | None = None
def _init_registry() -> AIProviderRegistry:
"""初始化 Provider Registry (首次呼叫時自動註冊所有 Provider)"""
from src.services.ai_providers.ollama import OllamaProvider
from src.services.ai_providers.ollama import OllamaProvider, Ollama188Provider # 2026-04-26 Wave5 B1-fix by Claude Engineer-A4
from src.services.ai_providers.gemini import GeminiProvider
from src.services.ai_providers.claude import ClaudeProvider
from src.services.ai_providers.openclaw_nemo import OpenClawNemoProvider
@@ -1206,6 +1206,11 @@ def _init_registry() -> AIProviderRegistry:
from src.services.ai_providers.nemotron import NemotronProvider
registry.register(NemotronProvider())
# 2026-04-26 Wave5 B1-fix by Claude Engineer-A4 — 補登 OLLAMA_188 備援 provider
# 修復:原本 failover_manager 決策返回 "ollama_188",但 executor 查不到 → not_registered
# → 188 從未被打到。必須明確 register 才能讓 executor.execute() 路由到 188。
registry.register(Ollama188Provider())
return registry

View File

@@ -435,6 +435,51 @@ class AutoRepairService:
except Exception as _rg_e:
logger.warning("runbook_generator_task_failed", error=str(_rg_e))
# 2026-04-26 Wave4 P1.3+P1.4 by Claude Engineer-B3 — 飛輪閉環最後一哩
# 成功執行後fire-and-forget 啟動後執行驗證 + EWMA 學習回饋
# verifier 有 10s warmup + 30s timeout不能阻塞在主路徑
try:
import asyncio as _asyncio
from src.services.post_execution_verifier import get_post_execution_verifier
from src.services.learning_service import get_learning_service
_action_taken = f"auto_repair:{playbook.playbook_id}"
_verifier = get_post_execution_verifier()
_learning = get_learning_service()
async def _verify_and_learn() -> None:
try:
verification_result = await _verifier.verify(
incident=incident,
snapshot=None,
action_taken=_action_taken,
)
await _learning.record_verification_result(
incident_id=incident.incident_id,
action_taken=_action_taken,
verification_result=verification_result,
matched_playbook_id=playbook.playbook_id,
)
logger.info(
"auto_repair_verify_and_learn_done",
incident_id=incident.incident_id,
playbook_id=playbook.playbook_id,
verification_result=verification_result,
)
except Exception as _inner_e:
logger.warning(
"auto_repair_verify_and_learn_failed",
incident_id=incident.incident_id,
error=str(_inner_e),
)
_vl_task = _asyncio.create_task(_verify_and_learn())
if hasattr(self, "_pending_tasks"):
self._pending_tasks.add(_vl_task)
_vl_task.add_done_callback(self._pending_tasks.discard)
except Exception as _vl_e:
logger.warning("auto_repair_verifier_setup_failed", error=str(_vl_e))
return repair_result
except Exception as e:

View File

@@ -413,13 +413,21 @@ class OllamaFailoverManager:
return True # fail-open
quota = getattr(self._settings, "GEMINI_DAILY_QUOTA", 1000)
key = f"ollama:gemini_daily_count:{datetime.date.today().isoformat()}"
count_raw = await redis.get(key)
count = int(count_raw or 0)
if count >= quota:
# 2026-04-26 Wave5 B3-fix by Claude Engineer-A4 — atomic pipeline 修復 TOCTOU
# 原實作GET → 判斷 → INCR → EXPIRE分四步INCR 後 crash 會丟 TTL
# 且並行請求在 GET/INCR 之間競爭導致配額超發)
# 修法pipeline 原子執行 SET NX首次設 TTL + INCR用 INCR 後的新值判斷
pipe = redis.pipeline()
pipe.set(key, 0, ex=86400, nx=True) # 僅首次寫入設 TTL已存在則跳過
pipe.incr(key) # 原子遞增,回傳遞增後的值
results = await pipe.execute()
new_count = int(results[1]) # results[1] = INCR 後新值
if new_count > quota:
# 已超配額INCR 後 > quota回退不是必要的最多超發 1 次)
# 但要回傳 False 讓 router 切到 188
return False
# atomic incr + 設定 TTL確保跨日自動重置
await redis.incr(key)
await redis.expire(key, 86400)
return True
except Exception as e:
logger.warning("gemini_quota_check_failed", error=str(e))

View File

@@ -0,0 +1,377 @@
"""
飛輪閉環 E2E 測試 — auto_repair → PostExecutionVerifier → LearningService → EWMA
================================================================================
2026-04-26 Wave4 P1.3+P1.4 by Claude Engineer-B3 — 飛輪閉環最後一哩
測試範圍:
- execute_auto_repair 成功 → verifier 被呼叫 → record_verification_result 被呼叫
- execute_auto_repair 失敗 → verifier 不被呼叫(主 except 路徑)
- matched_playbook_id=None 的 record_verification_result → log warning 不 crash
- verifier 拋例外 → 修復仍回傳成功trust 不更新
🔴 遵循 feedback_no_mock_testing.md:
- 禁止 MagicMock/AsyncMock/unittest.mock.patch
- 使用純 Python Stub 類別 + pytest monkeypatch替換 module-level getter
"""
from __future__ import annotations
import asyncio
import pytest
from src.models.incident import Incident, IncidentStatus, Severity, Signal
from src.models.playbook import (
ActionType,
Playbook,
PlaybookStatus,
RepairStep,
RiskLevel,
SymptomPattern,
)
from src.services.auto_repair_service import AutoRepairService
from src.utils.timezone import now_taipei
# =============================================================================
# Stubs
# =============================================================================
class StubVerifier:
"""PostExecutionVerifier 的輕量 Stub — 記錄呼叫,不真正等 K8s"""
def __init__(self, result: str = "success", raise_exc: Exception | None = None):
self.result = result
self.raise_exc = raise_exc
self.calls: list[dict] = []
async def verify(
self,
incident,
snapshot,
action_taken: str,
warmup_sec: float = 0.0,
) -> str:
self.calls.append(
{"incident_id": incident.incident_id, "snapshot": snapshot, "action_taken": action_taken}
)
if self.raise_exc is not None:
raise self.raise_exc
return self.result
class StubLearningService:
"""LearningService 的輕量 Stub — 記錄 record_verification_result 呼叫"""
def __init__(self) -> None:
self.verification_calls: list[dict] = []
async def record_verification_result(
self,
incident_id: str,
action_taken: str,
verification_result: str,
matched_playbook_id: str | None = None,
) -> None:
self.verification_calls.append(
{
"incident_id": incident_id,
"action_taken": action_taken,
"verification_result": verification_result,
"matched_playbook_id": matched_playbook_id,
}
)
class StubPlaybookService:
"""PlaybookService 的輕量 Stub — 支援 record_execution + get_recommendations"""
def __init__(self) -> None:
self._playbooks: dict[str, Playbook] = {}
self._recommendations: list = []
def add_playbook(self, playbook: Playbook) -> None:
self._playbooks[playbook.playbook_id] = playbook
def set_recommendations(self, recommendations: list) -> None:
self._recommendations = recommendations
async def get_recommendations(self, symptoms, top_k: int = 3) -> list:
return self._recommendations
async def get_by_id(self, playbook_id: str) -> Playbook | None:
return self._playbooks.get(playbook_id)
async def record_execution(self, playbook_id: str, success: bool) -> bool:
playbook = self._playbooks.get(playbook_id)
if playbook is not None:
if success:
playbook.success_count += 1
else:
playbook.failure_count += 1
return playbook is not None
class StubRecommendation:
def __init__(self, playbook: Playbook, similarity_score: float = 0.9) -> None:
self.playbook = playbook
self.similarity_score = similarity_score
# =============================================================================
# Factories
# =============================================================================
def _make_incident(
incident_id: str = "INC-E2E-001",
severity: Severity = Severity.P2,
) -> Incident:
now = now_taipei()
return Incident(
incident_id=incident_id,
status=IncidentStatus.INVESTIGATING,
severity=severity,
affected_services=["e2e-service"],
signals=[
Signal(
alert_name="TestAlert",
severity=severity,
source="prometheus",
fired_at=now,
labels={"namespace": "awoooi-prod", "alertname": "TestAlert"},
)
],
)
def _make_playbook(
playbook_id: str = "PB-E2E-001",
trust_score: float = 0.5,
) -> Playbook:
pb = Playbook(
playbook_id=playbook_id,
name="E2E 測試 Playbook",
description="飛輪閉環 E2E 測試用",
status=PlaybookStatus.APPROVED,
symptom_pattern=SymptomPattern(
alert_names=["TestAlert"],
affected_services=["e2e-service"],
severity_range=["P2"],
),
repair_steps=[
RepairStep(
step_number=1,
action_type=ActionType.MANUAL,
command="echo test",
risk_level=RiskLevel.LOW,
)
],
trust_score=trust_score,
success_count=5,
failure_count=1,
)
return pb
async def _no_cooldown(*args, **kwargs) -> tuple[bool, str]:
return True, "允許修復 (test bypass)"
# =============================================================================
# Tests
# =============================================================================
@pytest.mark.asyncio
async def test_auto_repair_success_triggers_verify_and_learn(monkeypatch):
"""
執行成功 → verifier.verify() 被呼叫 → record_verification_result 被呼叫
驗證飛輪鏈路的前兩段接通。
"""
stub_verifier = StubVerifier(result="success")
stub_learning = StubLearningService()
# 替換 module-level getterspure Python, no MagicMock
import src.services.auto_repair_service as _ars_mod
monkeypatch.setattr(_ars_mod, "_verifier_getter", None, raising=False)
import src.services.post_execution_verifier as _pev_mod
monkeypatch.setattr(_pev_mod, "_verifier", stub_verifier)
import src.services.learning_service as _ls_mod
monkeypatch.setattr(_ls_mod, "_learning_service", stub_learning)
playbook = _make_playbook()
pb_service = StubPlaybookService()
pb_service.add_playbook(playbook)
pb_service.set_recommendations([StubRecommendation(playbook)])
service = AutoRepairService(
playbook_service=pb_service,
cooldown_checker=_no_cooldown,
)
incident = _make_incident()
result = await service.execute_auto_repair(incident, playbook)
assert result.success is True
# fire-and-forget task — 讓 event loop 執行完
# verifier 有 warmup_sec但 Stub 忽略 warmup不 sleep
await asyncio.sleep(0.05)
assert len(stub_verifier.calls) == 1, "verifier.verify() 應被呼叫一次"
assert stub_verifier.calls[0]["incident_id"] == incident.incident_id
assert stub_verifier.calls[0]["snapshot"] is None
assert len(stub_learning.verification_calls) == 1, "record_verification_result 應被呼叫一次"
call = stub_learning.verification_calls[0]
assert call["incident_id"] == incident.incident_id
assert call["verification_result"] == "success"
assert call["matched_playbook_id"] == playbook.playbook_id
@pytest.mark.asyncio
async def test_auto_repair_failure_does_not_call_verifier(monkeypatch):
"""
執行失敗(步驟拋例外)→ verifier 不被呼叫(失敗路徑不進入 verify-and-learn 區塊)
"""
stub_verifier = StubVerifier(result="success")
import src.services.post_execution_verifier as _pev_mod
monkeypatch.setattr(_pev_mod, "_verifier", stub_verifier)
import src.services.learning_service as _ls_mod
stub_learning = StubLearningService()
monkeypatch.setattr(_ls_mod, "_learning_service", stub_learning)
# 建立一個會讓 _execute_step raise 的 playbookKUBECTL 步驟executor 不可用時只 skip不 raise
# 直接讓 playbook_service.record_execution 正常工作,驗證失敗路徑不呼叫 verifier
class FailingPlaybookService(StubPlaybookService):
async def record_execution(self, playbook_id: str, success: bool) -> bool:
# 正常記錄,不 raise
return True
playbook = _make_playbook()
pb_service = FailingPlaybookService()
pb_service.add_playbook(playbook)
# 讓 _execute_step 拋例外以觸發失敗路徑
original_execute_step = AutoRepairService._execute_step
async def _always_fail(self_inner, incident_arg, step_arg) -> str:
raise RuntimeError("強制測試失敗")
service = AutoRepairService(
playbook_service=pb_service,
cooldown_checker=_no_cooldown,
)
# Monkeypatch instance method
monkeypatch.setattr(AutoRepairService, "_execute_step", _always_fail)
incident = _make_incident()
result = await service.execute_auto_repair(incident, playbook)
assert result.success is False
await asyncio.sleep(0.05)
# 失敗路徑不進入 verify-and-learn 塊
assert len(stub_verifier.calls) == 0, "執行失敗時不應呼叫 verifier"
assert len(stub_learning.verification_calls) == 0, "執行失敗時不應呼叫 record_verification_result"
@pytest.mark.asyncio
async def test_record_verification_result_no_playbook_id_does_not_crash():
"""
matched_playbook_id=None → record_verification_result 正常執行,不 crash。
驗證 learning_service 對 None playbook_id 的防禦性。
"""
from src.services.learning_service import LearningService
from src.repositories.interfaces import ILearningRepository, ITrustRepository
class NullLearningRepo:
async def record_repair(self, **kwargs) -> bool:
return True
async def get_repair_stats(self, *a, **kw):
return {}
async def get_all_repair_stats(self, *a, **kw):
return {}
async def record_disposition(self, *a, **kw):
return True
async def get_dispositions(self, *a, **kw):
return {}
class NullTrustRepo:
async def save_trust_record(self, *a, **kw):
pass
async def load_trust_record(self, *a, **kw):
return None
async def get_all_trust_records(self, *a, **kw):
return []
# 直接呼叫 record_verification_result(matched_playbook_id=None)
# 不應 raise只應 log warning 並略過 _update_playbook_stats
svc = LearningService(
repository=NullLearningRepo(),
trust_repository=NullTrustRepo(),
)
# 不應拋例外
await svc.record_verification_result(
incident_id="INC-NULL-PB-001",
action_taken="auto_repair:none",
verification_result="success",
matched_playbook_id=None,
)
# 只要不 crash 即通過
@pytest.mark.asyncio
async def test_verifier_exception_does_not_block_repair(monkeypatch):
"""
verifier 拋例外 → 修復結果仍回傳 success=Truelearning 不被呼叫。
驗證 _verify_and_learn 的 exception 隔離。
"""
stub_verifier = StubVerifier(
result="success",
raise_exc=RuntimeError("verifier 模擬故障"),
)
stub_learning = StubLearningService()
import src.services.post_execution_verifier as _pev_mod
monkeypatch.setattr(_pev_mod, "_verifier", stub_verifier)
import src.services.learning_service as _ls_mod
monkeypatch.setattr(_ls_mod, "_learning_service", stub_learning)
playbook = _make_playbook()
pb_service = StubPlaybookService()
pb_service.add_playbook(playbook)
service = AutoRepairService(
playbook_service=pb_service,
cooldown_checker=_no_cooldown,
)
incident = _make_incident()
result = await service.execute_auto_repair(incident, playbook)
# 主路徑成功回傳
assert result.success is True
await asyncio.sleep(0.05)
# verifier 被呼叫(但拋了例外)
assert len(stub_verifier.calls) == 1
# learning 不應被呼叫(因為 verifier raise 中斷了 _verify_and_learn
assert len(stub_learning.verification_calls) == 0, "verifier 拋例外後 learning 不應被呼叫"