Files
awoooi/apps/api/src/core/circuit_breaker.py
OG T 27509db212 feat(api): Wave 1 安全網 - Circuit Breaker + Global Repair Cooldown
ADR-038: OpenClaw 雙層保護
- Layer 1: Circuit Breaker (5 failures → 60s cooldown)
- Layer 2: Concurrency Semaphore (max 3 concurrent)
- 新增 src/core/circuit_breaker.py

ADR-039: 全域修復熔斷
- Global Cooldown: 5 repairs/15min → freeze
- StatefulSet Blacklist: postgres/redis/clickhouse 禁止自動重啟
- 新增 src/services/global_repair_cooldown.py
- 整合到 auto_repair_service.py

測試:
- test_circuit_breaker.py (狀態轉換 + Semaphore)
- test_global_repair_cooldown.py (黑名單 + 計數閾值)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-29 15:48:03 +08:00

135 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
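OpenClaw inference-engine protection
====================================
ADR-038: two-layer protection strategy
- Layer 1: Circuit Breaker (stops failures from propagating)
- Layer 2: Concurrency Semaphore (guards against a thundering herd)
Follows the leWOOOgo modularization rules:
- This module belongs to the core/ infrastructure layer
- It does not depend on any Service layer
- It provides global state through a Singleton
2026-03-29 ogt - ADR-038 implementation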
"""
import asyncio
import time
from dataclasses import dataclass
from enum import Enum
import structlog
logger = structlog.get_logger(__name__)
class CircuitState(Enum):
    """Lifecycle states of the circuit breaker."""

    # Normal operation: calls are allowed through.
    CLOSED = "closed"
    # Tripped: callers should fail fast instead of calling.
    OPEN = "open"
    # Tentative recovery after the cooldown has elapsed.
    HALF_OPEN = "half_open"
@dataclass
class CircuitBreakerConfig:
    """Tunable thresholds for the circuit breaker and the concurrency limit."""

    # Number of consecutive failures that trips the breaker.
    failure_threshold: int = 5
    # Cooldown, in seconds, after the breaker has tripped.
    timeout_s: float = 60.0
    # Maximum number of concurrent LLM inference calls.
    max_concurrent: int = 3
class OpenClawGuard:
    """
    Two-layer guard in front of OpenClaw inference calls.

    Layer 1 is a circuit breaker: once ``failure_count`` reaches
    ``config.failure_threshold`` the state becomes OPEN and
    ``is_circuit_open()`` reports True until more than ``config.timeout_s``
    seconds have passed, at which point the state moves to HALF_OPEN and
    calls are let through again. Layer 2 is an ``asyncio.Semaphore`` sized
    by ``config.max_concurrent``.

    Usage:
        guard = get_openclaw_guard()
        if guard.is_circuit_open():
            return None  # fail fast
        async with guard.semaphore:  # queue for a slot
            try:
                result = await call_openclaw(...)
                guard.record_success()
                return result
            except Exception:
                guard.record_failure()
                raise
    """

    def __init__(self, config: CircuitBreakerConfig | None = None):
        """Start in the CLOSED state, using ``config`` or the default settings."""
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        # time.time() timestamp of the most recent trip; None until the first trip.
        self._opened_at: float | None = None
        # Built lazily by the ``semaphore`` property rather than here; the
        # original note says the Semaphore must be created inside the event loop.
        self._semaphore: asyncio.Semaphore | None = None

    @property
    def semaphore(self) -> asyncio.Semaphore:
        """Return the concurrency semaphore, creating it on first access."""
        if self._semaphore is None:
            self._semaphore = asyncio.Semaphore(self.config.max_concurrent)
        return self._semaphore

    def is_circuit_open(self) -> bool:
        """
        Report whether callers should fail fast.

        Returns True only while the state is OPEN and the cooldown has not yet
        elapsed. When the state is OPEN and more than ``config.timeout_s``
        seconds have passed since the trip, the state is switched to HALF_OPEN
        as a side effect and False is returned.
        """
        if self.state != CircuitState.OPEN:
            return False
        # Compare against None explicitly: a timestamp of 0.0 is a legitimate
        # value (e.g. an injected clock) and must not be read as "never
        # opened", which would keep the breaker open forever.
        cooled_down = (
            self._opened_at is not None
            and time.time() - self._opened_at > self.config.timeout_s
        )
        if cooled_down:
            self.state = CircuitState.HALF_OPEN
            logger.info("circuit_breaker_half_open")
            return False
        return True

    def record_success(self) -> None:
        """Record a successful call: clear the failure count and close the circuit."""
        self.failure_count = 0
        if self.state != CircuitState.CLOSED:
            # Only log an actual transition back to CLOSED.
            logger.info("circuit_breaker_closed")
        self.state = CircuitState.CLOSED

    def record_failure(self) -> None:
        """Record a failed call; trip the breaker once the threshold is reached."""
        self.failure_count += 1
        if self.failure_count >= self.config.failure_threshold:
            # The count is not cleared when the state moves to HALF_OPEN, so a
            # failure in HALF_OPEN lands here again and restarts the cooldown.
            self.state = CircuitState.OPEN
            self._opened_at = time.time()
            logger.warning(
                "circuit_breaker_opened",
                failure_count=self.failure_count,
                cooldown_s=self.config.timeout_s,
            )

    def get_metrics(self) -> dict:
        """Return a snapshot of the current breaker state as a plain dict."""
        return {
            "state": self.state.value,
            "failure_count": self.failure_count,
            "max_concurrent": self.config.max_concurrent,
            "opened_at": self._opened_at,
        }

    def reset(self) -> None:
        """Restore the initial state, dropping the semaphore (intended for tests)."""
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self._opened_at = None
        self._semaphore = None
# Module-wide singleton instance, created lazily on first access.
_guard: OpenClawGuard | None = None


def get_openclaw_guard() -> OpenClawGuard:
    """Return the global OpenClaw guard, building it on the first call."""
    global _guard
    if _guard is not None:
        return _guard
    _guard = OpenClawGuard()
    return _guard
def reset_openclaw_guard() -> None:
    """Clear the module-level guard reference by setting it to None (intended for tests)."""
    global _guard
    _guard = None