diff --git a/apps/api/src/services/_archived/ARCHIVE_LOG.md b/apps/api/src/services/_archived/ARCHIVE_LOG.md index 47a210e3..8bfafd09 100644 --- a/apps/api/src/services/_archived/ARCHIVE_LOG.md +++ b/apps/api/src/services/_archived/ARCHIVE_LOG.md @@ -4,38 +4,48 @@ ## 封存規則 -1. 新代碼穩定運行 **48 小時** 後,才能封存舊代碼 +1. 新代碼穩定運行 **8 小時** 後,才能封存舊代碼 (統帥指示縮短) 2. 封存檔案加上 `_v1` 後綴 3. **90 天後** 無問題才真正刪除 4. 所有封存必須記錄在此檔案 --- -## 待封存清單 (Phase 16 R2) - -| 檔案 | 行數 | 替代方案 | 預計封存日期 | -|------|------|----------|--------------| -| incident_memory.py | ~483 | lewooogo_brain.adapters.incident_memory | 48hr 驗證後 | -| incident_engine.py | ~657 | lewooogo_brain.engines.incident_engine | 48hr 驗證後 | - ---- - ## 已封存 -目前無封存檔案。 - - +| 原始 commit | 6f04987 | +| 回復指令 | `git checkout 6f04987 -- apps/api/src/services/incident_memory.py` | +| 行數 | 483 行 | +| 驗證通過 | 🟢 USE_NEW_ENGINE=true 運行中 | +| 最終刪除日期 | 2026-06-24 (封存後 90 天) | + +### incident_engine_v1.py + +| 欄位 | 值 | +|------|-----| +| 封存日期 | 2026-03-26 | +| 封存原因 | Phase 16 絞殺者模式,改用 lewooogo-brain | +| 原始位置 | apps/api/src/services/incident_engine.py | +| 替代方案 | lewooogo_brain.engines.incident_engine | +| 原始 commit | 6f04987 | +| 回復指令 | `git checkout 6f04987 -- apps/api/src/services/incident_engine.py` | +| 行數 | 657 行 | +| 驗證通過 | 🟢 USE_NEW_ENGINE=true 運行中 | +| 最終刪除日期 | 2026-06-24 (封存後 90 天) | + +--- + +## 待清理 (90 天後) + +| 檔案 | 封存日期 | 刪除日期 | +|------|----------|----------| +| incident_memory_v1.py | 2026-03-26 | 2026-06-24 | +| incident_engine_v1.py | 2026-03-26 | 2026-06-24 | diff --git a/apps/api/src/services/_archived/incident_engine_v1.py b/apps/api/src/services/_archived/incident_engine_v1.py new file mode 100644 index 00000000..31287bd5 --- /dev/null +++ b/apps/api/src/services/_archived/incident_engine_v1.py @@ -0,0 +1,657 @@ +""" +Incident Engine v1.2 - Phase 6.4e DualMemory 整合版 +==================================================== + +v1.2 重構內容 (Phase 6.4e): +- 整合 DualIncidentMemory 進行 DB 持久化 +- 保持 Lua 原子操作進行 Redis Working Memory 更新 +- 支援從 Episodic Memory (PostgreSQL) 回載 Incident + +v1.1 重構內容 (2026-03-22 架構師審查後修正): +1. O(1) 反向索引: 廢除 SCAN,改用 namespace/target 索引直查 +2. Lua 原子操作: 廢除 Read-Modify-Write,改用 Redis Lua Script +3. 併發防護: 確保告警風暴下不會發生 Race Condition + +功能: +1. 事件聚合 (Alert Aggregation): 將相關告警聚合到同一個 Incident +2. 爆炸半徑分析 (Blast Radius): 透過 GraphRAG 分析受影響服務 +3. 智能去重 (Deduplication): 避免重複告警造成 Incident 爆炸 + +設計原則: +- 30 分鐘時間窗口: 超過此時間的 Incident 視為新事件 +- 關聯判斷: 同 namespace 或同 target 視為相關 +- 狀態過濾: 只聚合 INVESTIGATING 或 MITIGATING 狀態的事件 + +統帥鐵律: +- 禁止告警風暴: 相關告警必須聚合,減少 Incident 數量 +- 禁止 O(N) 掃描: 所有查詢必須 O(1) +- 禁止 Race Condition: 所有寫入必須原子操作 +""" + +import json +from datetime import UTC, datetime +from typing import Any + +import structlog + +from src.core.redis_client import get_redis +from src.models.incident import ( + Incident, + Severity, + Signal, +) +from src.services.graph_rag import BlastRadiusResult, topology_graph +from src.services.incident_memory import DualIncidentMemory, get_incident_memory + +logger = structlog.get_logger(__name__) + + +# ============================================================================= +# Constants +# ============================================================================= + +# Redis Key Patterns +INCIDENT_KEY_PREFIX = "incident:" +INCIDENT_INDEX_NS = "incident:idx:ns:" # namespace → incident_id +INCIDENT_INDEX_TARGET = "incident:idx:target:" # target → incident_id + +# 聚合時間窗口: 30 分鐘 +AGGREGATION_WINDOW_MINUTES = 30 +AGGREGATION_WINDOW_SECONDS = AGGREGATION_WINDOW_MINUTES * 60 + +# Working Memory TTL: 7 天 = 604800 秒 +WORKING_MEMORY_TTL = 604800 + + +# ============================================================================= +# Lua Scripts (原子操作) +# ============================================================================= + +# Lua Script: 原子聚合 Signal 到 Incident +# KEYS[1] = incident key (incident:{id}) +# ARGV[1] = new signal JSON +# ARGV[2] = new severity string (P0/P1/P2/P3) +# ARGV[3] = current timestamp ISO string +# ARGV[4] = TTL seconds +# Returns: updated incident JSON or nil if not found +LUA_AGGREGATE_SIGNAL = """ +local data = redis.call('GET', KEYS[1]) +if not data then + return nil +end + +local incident = cjson.decode(data) + +-- Parse new signal +local new_signal = cjson.decode(ARGV[1]) + +-- Check fingerprint deduplication +local fingerprint = new_signal.fingerprint +if fingerprint and fingerprint ~= cjson.null then + for _, signal in ipairs(incident.signals) do + if signal.fingerprint == fingerprint then + -- Duplicate detected, return unchanged + return data + end + end +end + +-- Append signal atomically +table.insert(incident.signals, new_signal) + +-- Severity escalation (P0 < P1 < P2 < P3, lower index = more severe) +local severity_order = {P0=0, P1=1, P2=2, P3=3} +local new_sev = ARGV[2] +local cur_sev = incident.severity +if severity_order[new_sev] and severity_order[cur_sev] then + if severity_order[new_sev] < severity_order[cur_sev] then + incident.severity = new_sev + end +end + +-- Update timestamp +incident.updated_at = ARGV[3] + +-- Serialize and save with TTL +local new_data = cjson.encode(incident) +redis.call('SET', KEYS[1], new_data, 'EX', tonumber(ARGV[4])) + +return new_data +""" + +# Lua Script: 原子建立或聚合 Incident (完全消除 Race Condition) +# KEYS[1] = namespace index key (incident:idx:ns:{ns}) +# KEYS[2] = target index key (incident:idx:target:{target}) +# ARGV[1] = new incident JSON (if creating) +# ARGV[2] = new incident_id +# ARGV[3] = new signal JSON +# ARGV[4] = new severity string (P0/P1/P2/P3) +# ARGV[5] = current timestamp ISO string +# ARGV[6] = incident TTL seconds +# ARGV[7] = index TTL seconds (aggregation window) +# ARGV[8] = incident key prefix +# Returns: "CREATED:{incident_json}" or "AGGREGATED:{incident_json}" +LUA_CREATE_OR_AGGREGATE = """ +local ns_index_key = KEYS[1] +local target_index_key = KEYS[2] +local new_incident_json = ARGV[1] +local new_incident_id = ARGV[2] +local new_signal_json = ARGV[3] +local new_severity = ARGV[4] +local timestamp = ARGV[5] +local incident_ttl = tonumber(ARGV[6]) +local index_ttl = tonumber(ARGV[7]) +local incident_key_prefix = ARGV[8] + +-- Step 1: 嘗試搶佔 namespace 索引 (SETNX 原子操作) +local ns_set_result = redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl, 'NX') + +if ns_set_result then + -- 我們是第一個!建立新 Incident + local incident_key = incident_key_prefix .. new_incident_id + redis.call('SET', incident_key, new_incident_json, 'EX', incident_ttl) + + -- 設置 target 索引 + redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl, 'NX') + + return "CREATED:" .. new_incident_json +end + +-- Step 2: 索引已存在,查找現有 Incident ID +local existing_incident_id = redis.call('GET', ns_index_key) +if not existing_incident_id then + -- 可能剛好過期,嘗試 target 索引 + existing_incident_id = redis.call('GET', target_index_key) +end + +if not existing_incident_id then + -- 兩個索引都沒有,建立新的 (邊緣情況) + redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl) + redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl, 'NX') + + local incident_key = incident_key_prefix .. new_incident_id + redis.call('SET', incident_key, new_incident_json, 'EX', incident_ttl) + + return "CREATED:" .. new_incident_json +end + +-- Step 3: 聚合到現有 Incident +local incident_key = incident_key_prefix .. existing_incident_id +local existing_data = redis.call('GET', incident_key) + +if not existing_data then + -- Incident 已過期但索引未過期,建立新的 + redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl) + redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl) + + local new_incident_key = incident_key_prefix .. new_incident_id + redis.call('SET', new_incident_key, new_incident_json, 'EX', incident_ttl) + + return "CREATED:" .. new_incident_json +end + +-- Step 4: 原子聚合 Signal +local incident = cjson.decode(existing_data) +local new_signal = cjson.decode(new_signal_json) + +-- 修復 cjson 空陣列問題 (cjson 會把 [] 變成 {}) +if type(incident.proposal_ids) == "table" and next(incident.proposal_ids) == nil then + incident.proposal_ids = cjson.empty_array +end +if type(incident.affected_services) == "table" and next(incident.affected_services) == nil then + incident.affected_services = cjson.empty_array +end + +-- Fingerprint 去重 +local fingerprint = new_signal.fingerprint +if fingerprint and fingerprint ~= cjson.null then + for _, signal in ipairs(incident.signals) do + if signal.fingerprint == fingerprint then + return "AGGREGATED:" .. existing_data + end + end +end + +-- 附加 Signal +table.insert(incident.signals, new_signal) + +-- Severity 升級 +local severity_order = {P0=0, P1=1, P2=2, P3=3} +if severity_order[new_severity] and severity_order[incident.severity] then + if severity_order[new_severity] < severity_order[incident.severity] then + incident.severity = new_severity + end +end + +-- 更新時間戳 +incident.updated_at = timestamp + +-- 保存並返回 +local updated_json = cjson.encode(incident) +redis.call('SET', incident_key, updated_json, 'EX', incident_ttl) + +return "AGGREGATED:" .. updated_json +""" + + +# ============================================================================= +# Incident Engine v1.1 +# ============================================================================= + +class IncidentEngine: + """ + 事件引擎 v1.1 - 認知覺醒核心 (效能強化版) + + 職責: + 1. 聚合相關告警到同一 Incident (減少噪音) + 2. 整合 GraphRAG 分析爆炸半徑 + 3. 雙層持久化 (Redis + SQLite/PG) + + v1.1 重構: + - O(1) 反向索引取代 O(N) SCAN + - Lua 原子操作取代 Read-Modify-Write + - 完全消除 Race Condition + + 使用方式: + engine = IncidentEngine() + incident = await engine.process_signal(signal_data) + """ + + def __init__(self, memory: DualIncidentMemory | None = None) -> None: + """ + 初始化 IncidentEngine + + Args: + memory: DualIncidentMemory 實例 (可選,預設使用 Singleton) + """ + self._graph = topology_graph + self._memory = memory or get_incident_memory() + self._lua_aggregate_sha: str | None = None + self._lua_create_sha: str | None = None + + # ========================================================================= + # Lua Script 初始化 + # ========================================================================= + + async def _ensure_lua_scripts(self) -> None: + """確保 Lua Scripts 已載入 Redis (SCRIPT LOAD)""" + if self._lua_aggregate_sha and self._lua_create_sha: + return + + redis_client = get_redis() + + # Load aggregate script (for existing incident updates) + self._lua_aggregate_sha = await redis_client.script_load( + LUA_AGGREGATE_SIGNAL + ) + logger.debug( + "lua_script_loaded", + script="aggregate_signal", + sha=self._lua_aggregate_sha, + ) + + # Load unified create-or-aggregate script + self._lua_create_sha = await redis_client.script_load( + LUA_CREATE_OR_AGGREGATE + ) + logger.debug( + "lua_script_loaded", + script="create_or_aggregate", + sha=self._lua_create_sha, + ) + + # ========================================================================= + # 核心方法: 處理 Signal + # ========================================================================= + + async def process_signal( + self, + signal_data: dict[str, Any], + ) -> Incident | None: + """ + 處理 Signal: 原子建立或聚合 Incident + + Phase 6.3 核心邏輯 (v1.1 重構): + 1. 解析 Signal + 2. 單一 Lua Script 原子操作: 建立或聚合 (完全消除 Race Condition) + 3. 調用 GraphRAG 分析爆炸半徑 + 4. 雙層持久化 + + Args: + signal_data: 從 Redis Stream 收到的 Signal 資料 + + Returns: + Incident | None: 處理後的 Incident + """ + try: + # 確保 Lua Scripts 已載入 + await self._ensure_lua_scripts() + + # 1. 解析 Signal + signal = self._parse_signal(signal_data) + namespace = signal_data.get("namespace", "default") + target = signal_data.get("target", "unknown") + + # 在 labels 中加入 namespace + signal.labels["namespace"] = namespace + + logger.info( + "signal_processing", + alert_name=signal.alert_name, + namespace=namespace, + target=target, + ) + + # 2. 單一 Lua Script 原子操作: 建立或聚合 + incident = await self._atomic_create_or_aggregate( + signal=signal, + namespace=namespace, + target=target, + ) + + if not incident: + logger.error( + "atomic_operation_failed", + alert_name=signal.alert_name, + namespace=namespace, + ) + return None + + # 3. GraphRAG 分析爆炸半徑 + await self._analyze_blast_radius(incident, target) + + # 4. 雙層持久化 (DB 層) + await self._persist_to_db(incident) + + return incident + + except Exception as e: + logger.exception( + "process_signal_error", + error=str(e), + ) + return None + + # ========================================================================= + # 原子建立或聚合 (單一 Lua Script - 完全消除 Race Condition) + # ========================================================================= + + async def _atomic_create_or_aggregate( + self, + signal: Signal, + namespace: str, + target: str, + ) -> Incident | None: + """ + 使用單一 Lua Script 原子建立或聚合 Incident + + 核心設計: + 1. 使用 SETNX 搶佔索引作為分散式鎖 + 2. 如果搶到 → 建立新 Incident + 3. 如果沒搶到 → 聚合到已存在的 Incident + 4. 整個流程在 Lua 中原子執行 + + 優點: + - 完全消除 Race Condition + - 單次 Redis 往返完成所有操作 + - 無論多少併發 Signal,同一 namespace/target 只會有一個 Incident + """ + redis_client = get_redis() + + # Redis Keys + ns_index_key = f"{INCIDENT_INDEX_NS}{namespace}" + target_index_key = f"{INCIDENT_INDEX_TARGET}{target}" + + # 準備新 Incident (如果需要建立) + new_incident = Incident( + severity=signal.severity, + signals=[signal], + affected_services=[target], + ) + new_incident_json = new_incident.model_dump_json() + + # Signal 參數 + signal_json = signal.model_dump_json() + severity_str = signal.severity.value + timestamp_str = datetime.now(UTC).isoformat() + + try: + # 執行統一 Lua Script (原子操作) + result = await redis_client.evalsha( + self._lua_create_sha, + 2, # number of keys + ns_index_key, # KEYS[1] + target_index_key, # KEYS[2] + new_incident_json, # ARGV[1] - new incident JSON + new_incident.incident_id, # ARGV[2] - new incident ID + signal_json, # ARGV[3] - new signal JSON + severity_str, # ARGV[4] - severity + timestamp_str, # ARGV[5] - timestamp + str(WORKING_MEMORY_TTL), # ARGV[6] - incident TTL + str(AGGREGATION_WINDOW_SECONDS), # ARGV[7] - index TTL + INCIDENT_KEY_PREFIX, # ARGV[8] - key prefix + ) + + if not result: + logger.error( + "lua_script_returned_nil", + namespace=namespace, + target=target, + ) + return None + + # 解析結果 + result_str = result.decode() if isinstance(result, bytes) else result + + if result_str.startswith("CREATED:"): + incident_json = result_str[8:] # 移除 "CREATED:" 前綴 + incident = self._parse_lua_incident(incident_json) + logger.info( + "incident_created_atomic", + incident_id=incident.incident_id, + severity=incident.severity.value, + namespace=namespace, + signal_count=1, + ) + return incident + + elif result_str.startswith("AGGREGATED:"): + incident_json = result_str[11:] # 移除 "AGGREGATED:" 前綴 + incident = self._parse_lua_incident(incident_json) + logger.info( + "signal_aggregated_atomic", + incident_id=incident.incident_id, + severity=incident.severity.value, + namespace=namespace, + signal_count=len(incident.signals), + ) + return incident + + else: + logger.error( + "lua_script_unexpected_result", + result=result_str[:100], + ) + return None + + except Exception as e: + logger.exception( + "atomic_create_or_aggregate_error", + namespace=namespace, + target=target, + error=str(e), + ) + return None + + # ========================================================================= + # GraphRAG 整合 + # ========================================================================= + + async def _analyze_blast_radius( + self, + incident: Incident, + target: str, + ) -> None: + """ + 調用 GraphRAG 分析爆炸半徑 + + 將結果寫入 incident.affected_services + """ + try: + result: BlastRadiusResult = self._graph.get_blast_radius(target) + + # 合併 affected_services (去重) + for service in result.affected_services: + if service not in incident.affected_services: + incident.affected_services.append(service) + + # 確保 target 本身在列表中 + if target not in incident.affected_services: + incident.affected_services.append(target) + + logger.info( + "blast_radius_analyzed", + incident_id=incident.incident_id, + target=target, + affected_count=result.affected_count, + affected_services=incident.affected_services, + ) + + except Exception as e: + logger.warning( + "blast_radius_analysis_failed", + incident_id=incident.incident_id, + target=target, + error=str(e), + ) + # 失敗時至少保留 target + if target not in incident.affected_services: + incident.affected_services.append(target) + + # ========================================================================= + # 持久化 (DB 層) - Phase 6.4e: 委託給 DualIncidentMemory + # ========================================================================= + + async def _persist_to_db(self, incident: Incident) -> None: + """ + 持久化到 PostgreSQL (Episodic Memory) + + Phase 6.4e: 委託給 DualIncidentMemory.persist_incident() + Redis 已在 Lua Script 中更新,這裡只處理 DB + """ + try: + success = await self._memory.persist_incident(incident) + incident.persisted_to_pg = success + + if success: + logger.debug( + "db_persisted_via_dual_memory", + incident_id=incident.incident_id, + ) + else: + logger.warning( + "db_persist_failed_via_dual_memory", + incident_id=incident.incident_id, + ) + + except Exception as e: + logger.exception("db_save_error", error=str(e)) + + # ========================================================================= + # 從 Episodic Memory 載入 (Phase 6.4e 新增) + # ========================================================================= + + async def get_incident(self, incident_id: str) -> Incident | None: + """ + 取得 Incident + + Phase 6.4e: 委託給 DualIncidentMemory.load_incident() + 優先從 Working Memory (Redis) 讀取,miss 時從 Episodic (PostgreSQL) 讀取 + + Args: + incident_id: Incident ID + + Returns: + Incident 或 None + """ + return await self._memory.load_incident(incident_id) + + # ========================================================================= + # 輔助方法 + # ========================================================================= + + def _parse_lua_incident(self, incident_json: str) -> Incident: + """ + 解析 Lua 返回的 Incident JSON + + 修復 Lua cjson 的問題: + - cjson.encode 會把空陣列 [] 轉成空物件 {} + - 需要手動修復陣列欄位 + """ + data = json.loads(incident_json) + + # 修復可能被轉成空物件的陣列欄位 + array_fields = ["signals", "affected_services", "proposal_ids"] + for field in array_fields: + if field in data and isinstance(data[field], dict) and len(data[field]) == 0: + data[field] = [] + + return Incident.model_validate(data) + + def _parse_signal(self, signal_data: dict[str, Any]) -> Signal: + """解析 Signal""" + return Signal( + alert_name=signal_data.get("alert_name", "unknown"), + severity=self._parse_severity(signal_data.get("severity", "warning")), + source=self._parse_source(signal_data.get("source", "manual")), + fired_at=datetime.now(UTC), + labels=self._parse_dict(signal_data.get("labels", "{}")), + annotations=self._parse_dict(signal_data.get("annotations", "{}")), + fingerprint=signal_data.get("fingerprint"), + ) + + def _parse_source(self, source_str: str) -> str: + """解析來源""" + valid_sources = {"prometheus", "signoz", "alertmanager", "manual", "telegram"} + if source_str.lower() in valid_sources: + return source_str.lower() + return "manual" + + def _parse_severity(self, severity_str: str) -> Severity: + """解析嚴重度""" + mapping = { + "critical": Severity.P0, + "high": Severity.P1, + "warning": Severity.P2, + "medium": Severity.P2, + "low": Severity.P3, + "info": Severity.P3, + } + return mapping.get(severity_str.lower(), Severity.P2) + + def _parse_dict(self, value: str | dict) -> dict[str, str]: + """解析字典""" + if isinstance(value, dict): + return {str(k): str(v) for k, v in value.items()} + if isinstance(value, str): + try: + parsed = json.loads(value.replace("'", '"')) + return {str(k): str(v) for k, v in parsed.items()} + except (json.JSONDecodeError, TypeError): + return {} + return {} + + +# ============================================================================= +# Singleton +# ============================================================================= + +_incident_engine: IncidentEngine | None = None + + +def get_incident_engine() -> IncidentEngine: + """取得 Incident Engine 實例 (Singleton)""" + global _incident_engine + if _incident_engine is None: + _incident_engine = IncidentEngine() + return _incident_engine diff --git a/apps/api/src/services/_archived/incident_memory_v1.py b/apps/api/src/services/_archived/incident_memory_v1.py new file mode 100644 index 00000000..c0fdaa82 --- /dev/null +++ b/apps/api/src/services/_archived/incident_memory_v1.py @@ -0,0 +1,483 @@ +""" +Incident Memory Provider - 事件記憶體提供者 +============================================ +Phase 6.4e: DualIncidentMemory 整合 + +設計: +- 實作 IIncidentMemory 協定 (Protocol) +- 雙層記憶體: Working (Redis) + Episodic (PostgreSQL) +- 反向索引: namespace:target -> incident_id + +統帥鐵律: +- Working Memory (Redis): 7 天 TTL +- Episodic Memory (PostgreSQL): 永久 +- 反向索引: 30 分鐘 TTL (聚合窗口) + +NOTE: 此模組為 lewooogo-brain/adapters/incident_memory.py 的 apps/api 內嵌版本 + 待 Phase 6.4i 完成 monorepo Docker 解法後,將直接引用 lewooogo-brain 套件 +""" + +from datetime import UTC, datetime, timedelta +from typing import Any, Protocol + +import structlog + +from src.core.redis_client import get_redis +from src.db.base import get_db_context +from src.db.models import IncidentRecord +from src.models.incident import Incident + +logger = structlog.get_logger(__name__) + + +# ============================================================================= +# Constants +# ============================================================================= + +WORKING_MEMORY_TTL = 604800 # 7 天 +AGGREGATION_WINDOW_MINUTES = 30 +INDEX_TTL = 1800 # 索引 30 分鐘 TTL + +# Redis Key Patterns +INCIDENT_KEY_PREFIX = "awoooi:incidents:" +INDEX_PREFIX = "awoooi:incidents:index:" + + +# ============================================================================= +# Protocol Definition (與 lewooogo-brain 保持一致) +# ============================================================================= + +class IIncidentMemory(Protocol): + """Incident 專用記憶體提供者協定""" + + async def load_incident(self, incident_id: str) -> Incident | None: + """從 Working Memory 載入 Incident""" + ... + + async def save_incident(self, incident: Incident, ttl_seconds: int = WORKING_MEMORY_TTL) -> bool: + """儲存 Incident 到 Working Memory (預設 7 天 TTL)""" + ... + + async def persist_incident(self, incident: Incident) -> bool: + """持久化到 Episodic Memory (PostgreSQL)""" + ... + + async def find_related_incident( + self, + namespace: str, + target: str, + window_minutes: int = AGGREGATION_WINDOW_MINUTES, + ) -> Incident | None: + """尋找相關的活躍 Incident (用於聚合)""" + ... + + async def update_index( + self, + incident_id: str, + namespace: str, + target: str, + ) -> bool: + """更新反向索引 (namespace/target -> incident_id)""" + ... + + +# ============================================================================= +# DualIncidentMemory Implementation +# ============================================================================= + +class DualIncidentMemory: + """ + Incident 專用雙層記憶體適配器 + + 實作 IIncidentMemory 協定: + - load_incident: 從 Working/Episodic 載入 + - save_incident: 儲存到 Working + - persist_incident: 持久化到 Episodic + - find_related_incident: 透過反向索引尋找相關 Incident + - update_index: 更新反向索引 + + 反向索引結構: + Key: awoooi:incidents:index:{namespace}:{target} + Value: incident_id + TTL: 30 分鐘 (聚合窗口) + """ + + def __init__(self, redis_client: Any = None, key_prefix: str = INCIDENT_KEY_PREFIX): + """ + 初始化適配器 + + Args: + redis_client: Redis 連線客戶端 (可選,預設使用 get_redis()) + key_prefix: Redis Key 前綴 + """ + self._redis = redis_client + self._key_prefix = key_prefix + self._index_prefix = INDEX_PREFIX + + def _get_redis(self) -> Any: + """取得 Redis 客戶端 (延遲初始化)""" + if self._redis is None: + self._redis = get_redis() + return self._redis + + def _make_key(self, incident_id: str) -> str: + """生成 Incident Key""" + return f"{self._key_prefix}{incident_id}" + + def _make_index_key(self, namespace: str, target: str) -> str: + """生成索引 Key""" + return f"{self._index_prefix}{namespace}:{target}" + + async def load_incident(self, incident_id: str) -> Incident | None: + """ + 載入 Incident + + 策略: + 1. 從 Redis (Working Memory) 讀取 + 2. 若 miss,從 PostgreSQL (Episodic) 讀取 + + Args: + incident_id: Incident ID + + Returns: + Incident 或 None + """ + try: + redis_client = self._get_redis() + key = self._make_key(incident_id) + data = await redis_client.get(key) + + if data is not None: + # JSON -> Incident + return Incident.model_validate_json(data) + + # Working Memory miss, 嘗試從 Episodic Memory 載入 + logger.debug("incident_not_found_in_working", incident_id=incident_id) + + async with get_db_context() as db: + from sqlalchemy import select + stmt = select(IncidentRecord).where( + IncidentRecord.incident_id == incident_id + ) + result = await db.execute(stmt) + record = result.scalar_one_or_none() + + if record: + # 從 DB 重建 Incident + incident = self._record_to_incident(record) + # 寫回 Working Memory (快取) + await self.save_incident(incident) + return incident + + return None + + except Exception as e: + logger.error("load_incident_failed", incident_id=incident_id, error=str(e)) + return None + + async def save_incident( + self, + incident: Incident, + ttl_seconds: int = WORKING_MEMORY_TTL, + ) -> bool: + """ + 儲存 Incident 到 Working Memory (Redis) + + Args: + incident: Incident 物件 + ttl_seconds: TTL (預設 7 天) + + Returns: + 是否成功 + """ + try: + redis_client = self._get_redis() + key = self._make_key(incident.incident_id) + json_data = incident.model_dump_json() + + await redis_client.setex(key, ttl_seconds, json_data) + + logger.debug( + "incident_saved_to_working", + incident_id=incident.incident_id, + ttl=ttl_seconds, + ) + return True + + except Exception as e: + logger.error( + "save_incident_failed", + incident_id=incident.incident_id, + error=str(e), + ) + return False + + async def persist_incident(self, incident: Incident) -> bool: + """ + 持久化到 Episodic Memory (PostgreSQL) + + Args: + incident: Incident 物件 + + Returns: + 是否成功 + """ + try: + async with get_db_context() as db: + from sqlalchemy import select + + # 檢查是否已存在 + stmt = select(IncidentRecord).where( + IncidentRecord.incident_id == incident.incident_id + ) + result = await db.execute(stmt) + existing = result.scalar_one_or_none() + + if existing: + # 更新現有記錄 + existing.status = incident.status.value + existing.severity = incident.severity.value + existing.signals = [ + s.model_dump(mode="json") for s in incident.signals + ] + existing.affected_services = incident.affected_services + existing.updated_at = incident.updated_at + if incident.resolved_at: + existing.resolved_at = incident.resolved_at + if incident.closed_at: + existing.closed_at = incident.closed_at + else: + # 建立新記錄 + record = IncidentRecord( + incident_id=incident.incident_id, + status=incident.status.value, + severity=incident.severity.value, + signals=[ + s.model_dump(mode="json") for s in incident.signals + ], + affected_services=incident.affected_services, + decision_chain=( + incident.decision_chain.model_dump(mode="json") + if incident.decision_chain + else None + ), + proposal_ids=[str(pid) for pid in incident.proposal_ids], + outcome=( + incident.outcome.model_dump(mode="json") + if incident.outcome + else None + ), + created_at=incident.created_at, + updated_at=incident.updated_at, + resolved_at=incident.resolved_at, + closed_at=incident.closed_at, + ttl_days=incident.ttl_days, + vectorized=incident.vectorized, + ) + db.add(record) + + logger.debug( + "incident_persisted_to_episodic", + incident_id=incident.incident_id, + ) + return True + + except Exception as e: + logger.error( + "persist_incident_failed", + incident_id=incident.incident_id, + error=str(e), + ) + return False + + async def find_related_incident( + self, + namespace: str, + target: str, + window_minutes: int = AGGREGATION_WINDOW_MINUTES, + ) -> Incident | None: + """ + 尋找相關的活躍 Incident (用於聚合) + + 透過反向索引快速查找: + 1. 查詢索引 Key: namespace:target -> incident_id + 2. 載入 Incident + 3. 檢查是否仍在聚合窗口內 + + Args: + namespace: 命名空間 + target: 目標服務 + window_minutes: 聚合窗口 (分鐘) + + Returns: + 相關 Incident 或 None + """ + try: + redis_client = self._get_redis() + + # Step 1: 查詢索引 + index_key = self._make_index_key(namespace, target) + incident_id = await redis_client.get(index_key) + + if incident_id is None: + return None + + # 解碼 bytes + if isinstance(incident_id, bytes): + incident_id = incident_id.decode() + + # Step 2: 載入 Incident + incident = await self.load_incident(incident_id) + if incident is None: + # 索引存在但 Incident 不存在,清除索引 + await redis_client.delete(index_key) + return None + + # Step 3: 檢查聚合窗口 + window_start = datetime.now(UTC) - timedelta(minutes=window_minutes) + if incident.updated_at < window_start: + # 超出聚合窗口,不聚合 + logger.debug( + "incident_outside_window", + incident_id=incident_id, + updated_at=incident.updated_at.isoformat(), + ) + return None + + logger.debug( + "found_related_incident", + incident_id=incident_id, + namespace=namespace, + target=target, + ) + return incident + + except Exception as e: + logger.error( + "find_related_incident_failed", + namespace=namespace, + target=target, + error=str(e), + ) + return None + + async def update_index( + self, + incident_id: str, + namespace: str, + target: str, + ) -> bool: + """ + 更新反向索引 + + 索引結構: + Key: awoooi:incidents:index:{namespace}:{target} + Value: incident_id + TTL: 30 分鐘 + + Args: + incident_id: Incident ID + namespace: 命名空間 + target: 目標服務 + + Returns: + 是否成功 + """ + try: + redis_client = self._get_redis() + index_key = self._make_index_key(namespace, target) + await redis_client.setex(index_key, INDEX_TTL, incident_id) + + logger.debug( + "index_updated", + incident_id=incident_id, + namespace=namespace, + target=target, + ttl=INDEX_TTL, + ) + return True + + except Exception as e: + logger.error( + "update_index_failed", + incident_id=incident_id, + namespace=namespace, + target=target, + error=str(e), + ) + return False + + async def delete_incident(self, incident_id: str) -> bool: + """ + 刪除 Incident + + Args: + incident_id: Incident ID + + Returns: + 是否成功 + """ + try: + redis_client = self._get_redis() + key = self._make_key(incident_id) + result = await redis_client.delete(key) + return result > 0 + + except Exception as e: + logger.error( + "delete_incident_failed", + incident_id=incident_id, + error=str(e), + ) + return False + + def _record_to_incident(self, record: IncidentRecord) -> Incident: + """ + 將 DB Record 轉換為 Incident 物件 + + Args: + record: IncidentRecord + + Returns: + Incident + """ + from src.models.incident import ( + IncidentStatus, + Severity, + Signal, + ) + + # 重建 Signals + signals = [] + for s in record.signals or []: + signals.append(Signal.model_validate(s)) + + return Incident( + incident_id=record.incident_id, + status=IncidentStatus(record.status), + severity=Severity(record.severity), + signals=signals, + affected_services=record.affected_services or [], + proposal_ids=record.proposal_ids or [], + created_at=record.created_at, + updated_at=record.updated_at, + resolved_at=record.resolved_at, + closed_at=record.closed_at, + ttl_days=record.ttl_days or 30, + vectorized=record.vectorized or False, + ) + + +# ============================================================================= +# Singleton +# ============================================================================= + +_dual_memory: DualIncidentMemory | None = None + + +def get_incident_memory() -> DualIncidentMemory: + """取得 DualIncidentMemory 實例 (Singleton)""" + global _dual_memory + if _dual_memory is None: + _dual_memory = DualIncidentMemory() + return _dual_memory