diff --git a/apps/api/src/services/incident_engine.py b/apps/api/src/services/incident_engine.py index 3e15599f..a2e36bf9 100644 --- a/apps/api/src/services/incident_engine.py +++ b/apps/api/src/services/incident_engine.py @@ -1,26 +1,15 @@ """ -Incident Engine v1.2 - Phase 6.4e DualMemory 整合版 -==================================================== +Incident Engine - Phase 16 lewooogo-brain 整合版 +================================================ -v1.2 重構內容 (Phase 6.4e): -- 整合 DualIncidentMemory 進行 DB 持久化 -- 保持 Lua 原子操作進行 Redis Working Memory 更新 -- 支援從 Episodic Memory (PostgreSQL) 回載 Incident +Phase R-R2 (2026-04-01 ogt): 移除內嵌 IncidentEngine 重複邏輯, + 全面切換至 lewooogo-brain IncidentEngine。 + 完整舊版本歸檔: src/services/_archived/incident_engine_v1.py -v1.1 重構內容 (2026-03-22 架構師審查後修正): -1. O(1) 反向索引: 廢除 SCAN,改用 namespace/target 索引直查 -2. Lua 原子操作: 廢除 Read-Modify-Write,改用 Redis Lua Script -3. 併發防護: 確保告警風暴下不會發生 Race Condition - -功能: -1. 事件聚合 (Alert Aggregation): 將相關告警聚合到同一個 Incident -2. 爆炸半徑分析 (Blast Radius): 透過 GraphRAG 分析受影響服務 -3. 智能去重 (Deduplication): 避免重複告警造成 Incident 爆炸 - -設計原則: -- 30 分鐘時間窗口: 超過此時間的 Incident 視為新事件 -- 關聯判斷: 同 namespace 或同 target 視為相關 -- 狀態過濾: 只聚合 INVESTIGATING 或 MITIGATING 狀態的事件 +架構: +- IncidentMemoryAdapter: 橋接現有 Lua Scripts + DualIncidentMemory +- BlastRadiusAdapter: 包裝 topology_graph 注入 lewooogo-brain +- get_incident_engine(): 統一入口 (Singleton) 統帥鐵律: - 禁止告警風暴: 相關告警必須聚合,減少 Incident 數量 @@ -28,20 +17,14 @@ v1.1 重構內容 (2026-03-22 架構師審查後修正): - 禁止 Race Condition: 所有寫入必須原子操作 """ -import json -from datetime import UTC, datetime from typing import Any, Protocol, runtime_checkable import structlog from src.core.redis_client import get_redis -from src.models.incident import ( - Incident, - Severity, - Signal, -) +from src.models.incident import Incident from src.services.graph_rag import BlastRadiusResult, topology_graph -from src.services.incident_memory import DualIncidentMemory, get_incident_memory +from src.services.incident_memory import get_incident_memory logger = structlog.get_logger(__name__) @@ -52,7 +35,7 @@ logger = structlog.get_logger(__name__) # Redis Key Patterns INCIDENT_KEY_PREFIX = "incident:" -INCIDENT_INDEX_NS = "incident:idx:ns:" # namespace → incident_id +INCIDENT_INDEX_NS = "incident:idx:ns:" # namespace → incident_id INCIDENT_INDEX_TARGET = "incident:idx:target:" # target → incident_id # 聚合時間窗口: 30 分鐘 @@ -64,7 +47,7 @@ WORKING_MEMORY_TTL = 604800 # ============================================================================= -# Lua Scripts (原子操作) +# Lua Scripts (原子操作) - 供 IncidentMemoryAdapter 使用 # ============================================================================= # Lua Script: 原子聚合 Signal 到 Incident @@ -274,415 +257,6 @@ class IIncidentEngine(Protocol): ... -# ============================================================================= -# Incident Engine v1.1 -# ============================================================================= - -class IncidentEngine: - """ - 事件引擎 v1.1 - 認知覺醒核心 (效能強化版) - - 職責: - 1. 聚合相關告警到同一 Incident (減少噪音) - 2. 整合 GraphRAG 分析爆炸半徑 - 3. 雙層持久化 (Redis + SQLite/PG) - - v1.1 重構: - - O(1) 反向索引取代 O(N) SCAN - - Lua 原子操作取代 Read-Modify-Write - - 完全消除 Race Condition - - 使用方式: - engine = IncidentEngine() - incident = await engine.process_signal(signal_data) - """ - - def __init__(self, memory: DualIncidentMemory | None = None) -> None: - """ - 初始化 IncidentEngine - - Args: - memory: DualIncidentMemory 實例 (可選,預設使用 Singleton) - """ - self._graph = topology_graph - self._memory = memory or get_incident_memory() - self._lua_aggregate_sha: str | None = None - self._lua_create_sha: str | None = None - - # ========================================================================= - # Lua Script 初始化 - # ========================================================================= - - async def _ensure_lua_scripts(self) -> None: - """確保 Lua Scripts 已載入 Redis (SCRIPT LOAD)""" - if self._lua_aggregate_sha and self._lua_create_sha: - return - - redis_client = get_redis() - - # Load aggregate script (for existing incident updates) - self._lua_aggregate_sha = await redis_client.script_load( - LUA_AGGREGATE_SIGNAL - ) - logger.debug( - "lua_script_loaded", - script="aggregate_signal", - sha=self._lua_aggregate_sha, - ) - - # Load unified create-or-aggregate script - self._lua_create_sha = await redis_client.script_load( - LUA_CREATE_OR_AGGREGATE - ) - logger.debug( - "lua_script_loaded", - script="create_or_aggregate", - sha=self._lua_create_sha, - ) - - # ========================================================================= - # 核心方法: 處理 Signal - # ========================================================================= - - async def process_signal( - self, - signal_data: dict[str, Any], - ) -> Incident | None: - """ - 處理 Signal: 原子建立或聚合 Incident - - Phase 6.3 核心邏輯 (v1.1 重構): - 1. 解析 Signal - 2. 單一 Lua Script 原子操作: 建立或聚合 (完全消除 Race Condition) - 3. 調用 GraphRAG 分析爆炸半徑 - 4. 雙層持久化 - - Args: - signal_data: 從 Redis Stream 收到的 Signal 資料 - - Returns: - Incident | None: 處理後的 Incident - """ - try: - # 確保 Lua Scripts 已載入 - await self._ensure_lua_scripts() - - # 1. 解析 Signal - signal = self._parse_signal(signal_data) - namespace = signal_data.get("namespace", "default") - target = signal_data.get("target", "unknown") - - # 在 labels 中加入 namespace - signal.labels["namespace"] = namespace - - logger.info( - "signal_processing", - alert_name=signal.alert_name, - namespace=namespace, - target=target, - ) - - # 2. 單一 Lua Script 原子操作: 建立或聚合 - incident = await self._atomic_create_or_aggregate( - signal=signal, - namespace=namespace, - target=target, - ) - - if not incident: - logger.error( - "atomic_operation_failed", - alert_name=signal.alert_name, - namespace=namespace, - ) - return None - - # 3. GraphRAG 分析爆炸半徑 - await self._analyze_blast_radius(incident, target) - - # 4. 雙層持久化 (DB 層) - await self._persist_to_db(incident) - - return incident - - except Exception as e: - logger.exception( - "process_signal_error", - error=str(e), - ) - return None - - # ========================================================================= - # 原子建立或聚合 (單一 Lua Script - 完全消除 Race Condition) - # ========================================================================= - - async def _atomic_create_or_aggregate( - self, - signal: Signal, - namespace: str, - target: str, - ) -> Incident | None: - """ - 使用單一 Lua Script 原子建立或聚合 Incident - - 核心設計: - 1. 使用 SETNX 搶佔索引作為分散式鎖 - 2. 如果搶到 → 建立新 Incident - 3. 如果沒搶到 → 聚合到已存在的 Incident - 4. 整個流程在 Lua 中原子執行 - - 優點: - - 完全消除 Race Condition - - 單次 Redis 往返完成所有操作 - - 無論多少併發 Signal,同一 namespace/target 只會有一個 Incident - """ - redis_client = get_redis() - - # Redis Keys - ns_index_key = f"{INCIDENT_INDEX_NS}{namespace}" - target_index_key = f"{INCIDENT_INDEX_TARGET}{target}" - - # 準備新 Incident (如果需要建立) - new_incident = Incident( - severity=signal.severity, - signals=[signal], - affected_services=[target], - ) - new_incident_json = new_incident.model_dump_json() - - # Signal 參數 - signal_json = signal.model_dump_json() - severity_str = signal.severity.value - timestamp_str = datetime.now(UTC).isoformat() - - try: - # 執行統一 Lua Script (原子操作) - result = await redis_client.evalsha( - self._lua_create_sha, - 2, # number of keys - ns_index_key, # KEYS[1] - target_index_key, # KEYS[2] - new_incident_json, # ARGV[1] - new incident JSON - new_incident.incident_id, # ARGV[2] - new incident ID - signal_json, # ARGV[3] - new signal JSON - severity_str, # ARGV[4] - severity - timestamp_str, # ARGV[5] - timestamp - str(WORKING_MEMORY_TTL), # ARGV[6] - incident TTL - str(AGGREGATION_WINDOW_SECONDS), # ARGV[7] - index TTL - INCIDENT_KEY_PREFIX, # ARGV[8] - key prefix - ) - - if not result: - logger.error( - "lua_script_returned_nil", - namespace=namespace, - target=target, - ) - return None - - # 解析結果 - result_str = result.decode() if isinstance(result, bytes) else result - - if result_str.startswith("CREATED:"): - incident_json = result_str[8:] # 移除 "CREATED:" 前綴 - incident = self._parse_lua_incident(incident_json) - logger.info( - "incident_created_atomic", - incident_id=incident.incident_id, - severity=incident.severity.value, - namespace=namespace, - signal_count=1, - ) - return incident - - elif result_str.startswith("AGGREGATED:"): - incident_json = result_str[11:] # 移除 "AGGREGATED:" 前綴 - incident = self._parse_lua_incident(incident_json) - logger.info( - "signal_aggregated_atomic", - incident_id=incident.incident_id, - severity=incident.severity.value, - namespace=namespace, - signal_count=len(incident.signals), - ) - return incident - - else: - logger.error( - "lua_script_unexpected_result", - result=result_str[:100], - ) - return None - - except Exception as e: - logger.exception( - "atomic_create_or_aggregate_error", - namespace=namespace, - target=target, - error=str(e), - ) - return None - - # ========================================================================= - # GraphRAG 整合 - # ========================================================================= - - async def _analyze_blast_radius( - self, - incident: Incident, - target: str, - ) -> None: - """ - 調用 GraphRAG 分析爆炸半徑 - - 將結果寫入 incident.affected_services - """ - try: - result: BlastRadiusResult = self._graph.get_blast_radius(target) - - # 合併 affected_services (去重) - for service in result.affected_services: - if service not in incident.affected_services: - incident.affected_services.append(service) - - # 確保 target 本身在列表中 - if target not in incident.affected_services: - incident.affected_services.append(target) - - logger.info( - "blast_radius_analyzed", - incident_id=incident.incident_id, - target=target, - affected_count=result.affected_count, - affected_services=incident.affected_services, - ) - - except Exception as e: - logger.warning( - "blast_radius_analysis_failed", - incident_id=incident.incident_id, - target=target, - error=str(e), - ) - # 失敗時至少保留 target - if target not in incident.affected_services: - incident.affected_services.append(target) - - # ========================================================================= - # 持久化 (DB 層) - Phase 6.4e: 委託給 DualIncidentMemory - # ========================================================================= - - async def _persist_to_db(self, incident: Incident) -> None: - """ - 持久化到 PostgreSQL (Episodic Memory) - - Phase 6.4e: 委託給 DualIncidentMemory.persist_incident() - Redis 已在 Lua Script 中更新,這裡只處理 DB - """ - try: - success = await self._memory.persist_incident(incident) - incident.persisted_to_pg = success - - if success: - logger.debug( - "db_persisted_via_dual_memory", - incident_id=incident.incident_id, - ) - else: - logger.warning( - "db_persist_failed_via_dual_memory", - incident_id=incident.incident_id, - ) - - except Exception as e: - logger.exception("db_save_error", error=str(e)) - - # ========================================================================= - # 從 Episodic Memory 載入 (Phase 6.4e 新增) - # ========================================================================= - - async def get_incident(self, incident_id: str) -> Incident | None: - """ - 取得 Incident - - Phase 6.4e: 委託給 DualIncidentMemory.load_incident() - 優先從 Working Memory (Redis) 讀取,miss 時從 Episodic (PostgreSQL) 讀取 - - Args: - incident_id: Incident ID - - Returns: - Incident 或 None - """ - return await self._memory.load_incident(incident_id) - - # ========================================================================= - # 輔助方法 - # ========================================================================= - - def _parse_lua_incident(self, incident_json: str) -> Incident: - """ - 解析 Lua 返回的 Incident JSON - - 修復 Lua cjson 的問題: - - cjson.encode 會把空陣列 [] 轉成空物件 {} - - 需要手動修復陣列欄位 - """ - data = json.loads(incident_json) - - # 修復可能被轉成空物件的陣列欄位 - array_fields = ["signals", "affected_services", "proposal_ids"] - for field in array_fields: - is_empty_dict = isinstance(data[field], dict) and len(data[field]) == 0 - if field in data and is_empty_dict: - data[field] = [] - - return Incident.model_validate(data) - - def _parse_signal(self, signal_data: dict[str, Any]) -> Signal: - """解析 Signal""" - return Signal( - alert_name=signal_data.get("alert_name", "unknown"), - severity=self._parse_severity(signal_data.get("severity", "warning")), - source=self._parse_source(signal_data.get("source", "manual")), - fired_at=datetime.now(UTC), - labels=self._parse_dict(signal_data.get("labels", "{}")), - annotations=self._parse_dict(signal_data.get("annotations", "{}")), - fingerprint=signal_data.get("fingerprint"), - ) - - def _parse_source(self, source_str: str) -> str: - """解析來源""" - valid_sources = {"prometheus", "signoz", "alertmanager", "manual", "telegram"} - if source_str.lower() in valid_sources: - return source_str.lower() - return "manual" - - def _parse_severity(self, severity_str: str) -> Severity: - """解析嚴重度""" - mapping = { - "critical": Severity.P0, - "high": Severity.P1, - "warning": Severity.P2, - "medium": Severity.P2, - "low": Severity.P3, - "info": Severity.P3, - } - return mapping.get(severity_str.lower(), Severity.P2) - - def _parse_dict(self, value: str | dict) -> dict[str, str]: - """解析字典""" - if isinstance(value, dict): - return {str(k): str(v) for k, v in value.items()} - if isinstance(value, str): - try: - parsed = json.loads(value.replace("'", '"')) - return {str(k): str(v) for k, v in parsed.items()} - except (json.JSONDecodeError, TypeError): - return {} - return {} - - # ============================================================================= # Phase 16: 絞殺者模式 - Adapter 實作 # ============================================================================= @@ -698,7 +272,7 @@ class IncidentMemoryAdapter: 建立者: Claude Code """ - def __init__(self, memory: DualIncidentMemory) -> None: + def __init__(self, memory: Any) -> None: self._memory = memory self._lua_create_sha: str | None = None self._lua_aggregate_sha: str | None = None @@ -804,20 +378,23 @@ class BlastRadiusAdapter: # ============================================================================= -# Singleton + 絞殺者模式切換 +# Singleton (Phase R-R2: 僅保留 lewooogo-brain 版本) # ============================================================================= -_incident_engine: IncidentEngine | None = None -_new_incident_engine = None # Type: lewooogo_brain IncidentEngine +_new_incident_engine: Any | None = None -def _get_new_engine(): - """取得 lewooogo-brain 的 IncidentEngine (Phase 16 新版)""" +def get_incident_engine() -> Any: + """ + 取得 Incident Engine 實例 (Singleton) + + Phase R-R2: 統一使用 lewooogo-brain IncidentEngine。 + 回滾方式: git revert Phase R-R2 commit + redeploy。 + """ global _new_incident_engine if _new_incident_engine is None: from lewooogo_brain.engines import IncidentEngine as NewIncidentEngine - # 建立 Adapters memory_adapter = IncidentMemoryAdapter(get_incident_memory()) blast_adapter = BlastRadiusAdapter() @@ -826,33 +403,5 @@ def _get_new_engine(): blast_analyzer=blast_adapter, logger=logger, ) - logger.info("new_incident_engine_initialized", version="lewooogo-brain") + logger.info("incident_engine_initialized", version="lewooogo-brain") return _new_incident_engine - - -def _get_legacy_engine() -> IncidentEngine: - """取得舊版 IncidentEngine""" - global _incident_engine - if _incident_engine is None: - _incident_engine = IncidentEngine() - return _incident_engine - - -def get_incident_engine(): - """ - 取得 Incident Engine 實例 (Singleton + 絞殺者模式) - - Phase 16: 根據 USE_NEW_ENGINE 設定切換引擎 - - False (預設): 使用內嵌版 IncidentEngine - - True: 使用 lewooogo-brain 的 IncidentEngine - - 回滾方式: - kubectl set env deployment/awoooi-api USE_NEW_ENGINE=false - """ - from src.core.config import settings - - if settings.USE_NEW_ENGINE: - logger.debug("using_new_incident_engine", version="lewooogo-brain") - return _get_new_engine() - else: - return _get_legacy_engine() diff --git a/apps/api/src/services/incident_memory.py b/apps/api/src/services/incident_memory.py index 99f57590..35175b69 100644 --- a/apps/api/src/services/incident_memory.py +++ b/apps/api/src/services/incident_memory.py @@ -3,9 +3,11 @@ Incident Memory Provider - 事件記憶體提供者 ============================================ Phase 6.4e: DualIncidentMemory 整合 Phase 16 R1.2: 絞殺者模式 (Strangler Fig Pattern) 2026-03-26 +Phase R-R2 (2026-04-01 ogt): 移除內嵌 DualIncidentMemory 重複邏輯, + 全面切換至 lewooogo-brain。回滾方式: git revert + redeploy。 設計: -- 實作 IIncidentMemory 協定 (Protocol) +- IncidentDbAdapter: SQLAlchemy Bridge,注入 lewooogo-brain DualIncidentMemory - 雙層記憶體: Working (Redis) + Episodic (PostgreSQL) - 反向索引: namespace:target -> incident_id @@ -13,19 +15,9 @@ Phase 16 R1.2: 絞殺者模式 (Strangler Fig Pattern) 2026-03-26 - Working Memory (Redis): 7 天 TTL - Episodic Memory (PostgreSQL): 永久 - 反向索引: 30 分鐘 TTL (聚合窗口) - -Phase 16 絞殺者模式: -- USE_NEW_ENGINE=False: 使用此內嵌版本 (當前預設) -- USE_NEW_ENGINE=True: 使用 lewooogo-brain 套件版本 -- 回滾指令: kubectl set env deployment/awoooi-api USE_NEW_ENGINE=false -- 監控週期: 48 小時驗證期 - -NOTE: 此模組為 lewooogo-brain/adapters/incident_memory.py 的 apps/api 內嵌版本 - Phase 16 R2 階段會在 48 小時驗證通過後移除此檔案 """ -from datetime import UTC, datetime, timedelta -from typing import Any, Protocol +from typing import Any import structlog @@ -37,444 +29,6 @@ from src.models.incident import Incident logger = structlog.get_logger(__name__) -# ============================================================================= -# Constants -# ============================================================================= - -WORKING_MEMORY_TTL = 604800 # 7 天 -AGGREGATION_WINDOW_MINUTES = 30 -INDEX_TTL = 1800 # 索引 30 分鐘 TTL - -# Redis Key Patterns -INCIDENT_KEY_PREFIX = "awoooi:incidents:" -INDEX_PREFIX = "awoooi:incidents:index:" - - -# ============================================================================= -# Protocol Definition (與 lewooogo-brain 保持一致) -# ============================================================================= - -class IIncidentMemory(Protocol): - """Incident 專用記憶體提供者協定""" - - async def load_incident(self, incident_id: str) -> Incident | None: - """從 Working Memory 載入 Incident""" - ... - - async def save_incident(self, incident: Incident, ttl_seconds: int = WORKING_MEMORY_TTL) -> bool: - """儲存 Incident 到 Working Memory (預設 7 天 TTL)""" - ... - - async def persist_incident(self, incident: Incident) -> bool: - """持久化到 Episodic Memory (PostgreSQL)""" - ... - - async def find_related_incident( - self, - namespace: str, - target: str, - window_minutes: int = AGGREGATION_WINDOW_MINUTES, - ) -> Incident | None: - """尋找相關的活躍 Incident (用於聚合)""" - ... - - async def update_index( - self, - incident_id: str, - namespace: str, - target: str, - ) -> bool: - """更新反向索引 (namespace/target -> incident_id)""" - ... - - -# ============================================================================= -# DualIncidentMemory Implementation -# ============================================================================= - -class DualIncidentMemory: - """ - Incident 專用雙層記憶體適配器 - - 實作 IIncidentMemory 協定: - - load_incident: 從 Working/Episodic 載入 - - save_incident: 儲存到 Working - - persist_incident: 持久化到 Episodic - - find_related_incident: 透過反向索引尋找相關 Incident - - update_index: 更新反向索引 - - 反向索引結構: - Key: awoooi:incidents:index:{namespace}:{target} - Value: incident_id - TTL: 30 分鐘 (聚合窗口) - """ - - def __init__(self, redis_client: Any = None, key_prefix: str = INCIDENT_KEY_PREFIX): - """ - 初始化適配器 - - Args: - redis_client: Redis 連線客戶端 (可選,預設使用 get_redis()) - key_prefix: Redis Key 前綴 - """ - self._redis = redis_client - self._key_prefix = key_prefix - self._index_prefix = INDEX_PREFIX - - def _get_redis(self) -> Any: - """取得 Redis 客戶端 (延遲初始化)""" - if self._redis is None: - self._redis = get_redis() - return self._redis - - def _make_key(self, incident_id: str) -> str: - """生成 Incident Key""" - return f"{self._key_prefix}{incident_id}" - - def _make_index_key(self, namespace: str, target: str) -> str: - """生成索引 Key""" - return f"{self._index_prefix}{namespace}:{target}" - - async def load_incident(self, incident_id: str) -> Incident | None: - """ - 載入 Incident - - 策略: - 1. 從 Redis (Working Memory) 讀取 - 2. 若 miss,從 PostgreSQL (Episodic) 讀取 - - Args: - incident_id: Incident ID - - Returns: - Incident 或 None - """ - try: - redis_client = self._get_redis() - key = self._make_key(incident_id) - data = await redis_client.get(key) - - if data is not None: - # JSON -> Incident - return Incident.model_validate_json(data) - - # Working Memory miss, 嘗試從 Episodic Memory 載入 - logger.debug("incident_not_found_in_working", incident_id=incident_id) - - async with get_db_context() as db: - from sqlalchemy import select - stmt = select(IncidentRecord).where( - IncidentRecord.incident_id == incident_id - ) - result = await db.execute(stmt) - record = result.scalar_one_or_none() - - if record: - # 從 DB 重建 Incident - incident = self._record_to_incident(record) - # 寫回 Working Memory (快取) - await self.save_incident(incident) - return incident - - return None - - except Exception as e: - logger.error("load_incident_failed", incident_id=incident_id, error=str(e)) - return None - - async def save_incident( - self, - incident: Incident, - ttl_seconds: int = WORKING_MEMORY_TTL, - ) -> bool: - """ - 儲存 Incident 到 Working Memory (Redis) - - Args: - incident: Incident 物件 - ttl_seconds: TTL (預設 7 天) - - Returns: - 是否成功 - """ - try: - redis_client = self._get_redis() - key = self._make_key(incident.incident_id) - json_data = incident.model_dump_json() - - await redis_client.setex(key, ttl_seconds, json_data) - - logger.debug( - "incident_saved_to_working", - incident_id=incident.incident_id, - ttl=ttl_seconds, - ) - return True - - except Exception as e: - logger.error( - "save_incident_failed", - incident_id=incident.incident_id, - error=str(e), - ) - return False - - async def persist_incident(self, incident: Incident) -> bool: - """ - 持久化到 Episodic Memory (PostgreSQL) - - Args: - incident: Incident 物件 - - Returns: - 是否成功 - """ - try: - async with get_db_context() as db: - from sqlalchemy import select - - # 檢查是否已存在 - stmt = select(IncidentRecord).where( - IncidentRecord.incident_id == incident.incident_id - ) - result = await db.execute(stmt) - existing = result.scalar_one_or_none() - - if existing: - # 更新現有記錄 - existing.status = incident.status.value - existing.severity = incident.severity.value - existing.signals = [ - s.model_dump(mode="json") for s in incident.signals - ] - existing.affected_services = incident.affected_services - existing.updated_at = incident.updated_at - if incident.resolved_at: - existing.resolved_at = incident.resolved_at - if incident.closed_at: - existing.closed_at = incident.closed_at - else: - # 建立新記錄 - record = IncidentRecord( - incident_id=incident.incident_id, - status=incident.status.value, - severity=incident.severity.value, - signals=[ - s.model_dump(mode="json") for s in incident.signals - ], - affected_services=incident.affected_services, - decision_chain=( - incident.decision_chain.model_dump(mode="json") - if incident.decision_chain - else None - ), - proposal_ids=[str(pid) for pid in incident.proposal_ids], - outcome=( - incident.outcome.model_dump(mode="json") - if incident.outcome - else None - ), - created_at=incident.created_at, - updated_at=incident.updated_at, - resolved_at=incident.resolved_at, - closed_at=incident.closed_at, - ttl_days=incident.ttl_days, - vectorized=incident.vectorized, - ) - db.add(record) - - logger.debug( - "incident_persisted_to_episodic", - incident_id=incident.incident_id, - ) - return True - - except Exception as e: - logger.error( - "persist_incident_failed", - incident_id=incident.incident_id, - error=str(e), - ) - return False - - async def find_related_incident( - self, - namespace: str, - target: str, - window_minutes: int = AGGREGATION_WINDOW_MINUTES, - ) -> Incident | None: - """ - 尋找相關的活躍 Incident (用於聚合) - - 透過反向索引快速查找: - 1. 查詢索引 Key: namespace:target -> incident_id - 2. 載入 Incident - 3. 檢查是否仍在聚合窗口內 - - Args: - namespace: 命名空間 - target: 目標服務 - window_minutes: 聚合窗口 (分鐘) - - Returns: - 相關 Incident 或 None - """ - try: - redis_client = self._get_redis() - - # Step 1: 查詢索引 - index_key = self._make_index_key(namespace, target) - incident_id = await redis_client.get(index_key) - - if incident_id is None: - return None - - # 解碼 bytes - if isinstance(incident_id, bytes): - incident_id = incident_id.decode() - - # Step 2: 載入 Incident - incident = await self.load_incident(incident_id) - if incident is None: - # 索引存在但 Incident 不存在,清除索引 - await redis_client.delete(index_key) - return None - - # Step 3: 檢查聚合窗口 - window_start = datetime.now(UTC) - timedelta(minutes=window_minutes) - if incident.updated_at < window_start: - # 超出聚合窗口,不聚合 - logger.debug( - "incident_outside_window", - incident_id=incident_id, - updated_at=incident.updated_at.isoformat(), - ) - return None - - logger.debug( - "found_related_incident", - incident_id=incident_id, - namespace=namespace, - target=target, - ) - return incident - - except Exception as e: - logger.error( - "find_related_incident_failed", - namespace=namespace, - target=target, - error=str(e), - ) - return None - - async def update_index( - self, - incident_id: str, - namespace: str, - target: str, - ) -> bool: - """ - 更新反向索引 - - 索引結構: - Key: awoooi:incidents:index:{namespace}:{target} - Value: incident_id - TTL: 30 分鐘 - - Args: - incident_id: Incident ID - namespace: 命名空間 - target: 目標服務 - - Returns: - 是否成功 - """ - try: - redis_client = self._get_redis() - index_key = self._make_index_key(namespace, target) - await redis_client.setex(index_key, INDEX_TTL, incident_id) - - logger.debug( - "index_updated", - incident_id=incident_id, - namespace=namespace, - target=target, - ttl=INDEX_TTL, - ) - return True - - except Exception as e: - logger.error( - "update_index_failed", - incident_id=incident_id, - namespace=namespace, - target=target, - error=str(e), - ) - return False - - async def delete_incident(self, incident_id: str) -> bool: - """ - 刪除 Incident - - Args: - incident_id: Incident ID - - Returns: - 是否成功 - """ - try: - redis_client = self._get_redis() - key = self._make_key(incident_id) - result = await redis_client.delete(key) - return result > 0 - - except Exception as e: - logger.error( - "delete_incident_failed", - incident_id=incident_id, - error=str(e), - ) - return False - - def _record_to_incident(self, record: IncidentRecord) -> Incident: - """ - 將 DB Record 轉換為 Incident 物件 - - Args: - record: IncidentRecord - - Returns: - Incident - """ - from src.models.incident import ( - IncidentStatus, - Severity, - Signal, - ) - - # 重建 Signals - signals = [] - for s in record.signals or []: - signals.append(Signal.model_validate(s)) - - return Incident( - incident_id=record.incident_id, - status=IncidentStatus(record.status), - severity=Severity(record.severity), - signals=signals, - affected_services=record.affected_services or [], - proposal_ids=record.proposal_ids or [], - created_at=record.created_at, - updated_at=record.updated_at, - resolved_at=record.resolved_at, - closed_at=record.closed_at, - ttl_days=record.ttl_days or 30, - vectorized=record.vectorized or False, - ) - - # ============================================================================= # Phase 16: IncidentDbAdapter (DI 注入實現) # ============================================================================= @@ -512,7 +66,6 @@ class IncidentDbAdapter: async with get_db_context() as db: from sqlalchemy import select - # 檢查是否已存在 stmt = select(IncidentRecord).where( IncidentRecord.incident_id == incident.incident_id ) @@ -520,7 +73,6 @@ class IncidentDbAdapter: existing = result.scalar_one_or_none() if existing: - # 更新現有記錄 existing.status = incident.status.value existing.severity = incident.severity.value existing.signals = [ @@ -533,7 +85,6 @@ class IncidentDbAdapter: if incident.closed_at: existing.closed_at = incident.closed_at else: - # 建立新記錄 record = IncidentRecord( incident_id=incident.incident_id, status=incident.status.value, @@ -571,7 +122,6 @@ class IncidentDbAdapter: def _record_to_incident(self, record: IncidentRecord) -> Incident: """將 DB Record 轉換為 Incident (lewooogo-brain 版本)""" - # 延遲導入 lewooogo-brain 的 Incident from lewooogo_brain.interfaces.incident_processor import ( Incident as BrainIncident, ) @@ -585,7 +135,6 @@ class IncidentDbAdapter: Signal as BrainSignal, ) - # 重建 Signals signals = [] for s in record.signals or []: signals.append(BrainSignal.model_validate(s)) @@ -605,70 +154,44 @@ class IncidentDbAdapter: # ============================================================================= -# Singleton + 絞殺者模式 (Phase 16 R1.2) +# Singleton (Phase R-R2: 僅保留 lewooogo-brain 版本) # ============================================================================= -_dual_memory: DualIncidentMemory | None = None -_new_engine_memory: Any | None = None # lewooogo-brain 版本 -_db_adapter: IncidentDbAdapter | None = None # DB Adapter singleton +_new_engine_memory: Any | None = None +_db_adapter: IncidentDbAdapter | None = None -def get_incident_memory() -> DualIncidentMemory: +def get_incident_memory() -> Any: """ 取得 DualIncidentMemory 實例 (Singleton) - Phase 16 絞殺者模式: - - USE_NEW_ENGINE=False (預設): 返回內嵌版本 - - USE_NEW_ENGINE=True: 返回 lewooogo-brain 套件版本 - - 回滾指令: kubectl set env deployment/awoooi-api USE_NEW_ENGINE=false + Phase R-R2: 統一使用 lewooogo-brain 套件版本。 + 回滾方式: git revert Phase R-R2 commit + redeploy。 """ - from src.core.config import settings - - if settings.USE_NEW_ENGINE: - return _get_new_engine_memory() - else: - return _get_legacy_memory() - - -def _get_legacy_memory() -> DualIncidentMemory: - """取得內嵌版本 (Phase 16 舊引擎)""" - global _dual_memory - if _dual_memory is None: - _dual_memory = DualIncidentMemory() - logger.info("incident_memory_initialized", engine="legacy_embedded") - return _dual_memory + return _get_new_engine_memory() def _get_new_engine_memory() -> Any: """ - 取得 lewooogo-brain 套件版本 (Phase 16 新引擎) + 取得 lewooogo-brain 套件版本 注意事項: - 需要 lewooogo-brain 已安裝 (Dockerfile 已配置) - PostgreSQL 透過 IncidentDbAdapter 注入 (Phase 16 DI 模式) - - 初次啟用建議 48 小時監控 - - 回滾: 設定 USE_NEW_ENGINE=false 即可瞬間切回 """ global _new_engine_memory, _db_adapter if _new_engine_memory is None: try: - # 延遲導入: 避免 lewooogo-brain 未安裝時啟動失敗 from lewooogo_brain.adapters.incident_memory import ( DualIncidentMemory as NewDualIncidentMemory, ) - from src.core.redis_client import get_redis - redis_client = get_redis() - # 初始化 DB Adapter (Phase 16 DI 模式) if _db_adapter is None: _db_adapter = IncidentDbAdapter() - # 初始化 lewooogo-brain 版本 (含 DB Adapter) _new_engine_memory = NewDualIncidentMemory( redis_client=redis_client, db_adapter=_db_adapter, @@ -683,19 +206,17 @@ def _get_new_engine_memory() -> Any: ) except ImportError as e: - # lewooogo-brain 未安裝,降級到內嵌版本 - logger.warning( - "lewooogo_brain_not_available_fallback_to_legacy", + logger.error( + "lewooogo_brain_not_available", error=str(e), ) - return _get_legacy_memory() + raise except Exception as e: - # 其他錯誤,降級到內嵌版本 logger.error( - "new_engine_init_failed_fallback_to_legacy", + "new_engine_init_failed", error=str(e), ) - return _get_legacy_memory() + raise return _new_engine_memory