chore(api): Phase 16 R2 封存舊版代碼

封存:
- incident_memory_v1.py (483 行) - 絞殺者模式前版本
- incident_engine_v1.py (657 行) - 絞殺者模式前版本

策略: 90 天後無問題才刪除 (2026-06-24)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-25 16:08:49 +08:00
parent ef12228cc7
commit 14dc77e4ad
3 changed files with 1170 additions and 20 deletions

View File

@@ -4,38 +4,48 @@
## 封存規則
1. 新代碼穩定運行 **48 小時** 後,才能封存舊代碼
1. 新代碼穩定運行 **8 小時** 後,才能封存舊代碼 (統帥指示縮短)
2. 封存檔案加上 `_v1` 後綴
3. **90 天後** 無問題才真正刪除
4. 所有封存必須記錄在此檔案
---
## 待封存清單 (Phase 16 R2)
| 檔案 | 行數 | 替代方案 | 預計封存日期 |
|------|------|----------|--------------|
| incident_memory.py | ~483 | lewooogo_brain.adapters.incident_memory | 48hr 驗證後 |
| incident_engine.py | ~657 | lewooogo_brain.engines.incident_engine | 48hr 驗證後 |
---
## 已封存
目前無封存檔案。
<!--
## 範例格式
### incident_memory_v1.py
| 欄位 | 值 |
|------|-----|
| 封存日期 | 2026-XX-XX |
| 封存日期 | 2026-03-26 |
| 封存原因 | Phase 16 絞殺者模式,改用 lewooogo-brain |
| 原始位置 | apps/api/src/services/incident_memory.py |
| 替代方案 | lewooogo_brain.adapters.incident_memory |
| 回復指令 | `git checkout a202a26 -- apps/api/src/services/incident_memory.py` |
| 48hr 驗證通過 | 2026-XX-XX |
| 最終刪除日期 | 2026-XX-XX (封存後 90 天) |
-->
| 原始 commit | 6f04987 |
| 回復指令 | `git checkout 6f04987 -- apps/api/src/services/incident_memory.py` |
| 行數 | 483 行 |
| 驗證通過 | 🟢 USE_NEW_ENGINE=true 運行中 |
| 最終刪除日期 | 2026-06-24 (封存後 90 天) |
### incident_engine_v1.py
| 欄位 | 值 |
|------|-----|
| 封存日期 | 2026-03-26 |
| 封存原因 | Phase 16 絞殺者模式,改用 lewooogo-brain |
| 原始位置 | apps/api/src/services/incident_engine.py |
| 替代方案 | lewooogo_brain.engines.incident_engine |
| 原始 commit | 6f04987 |
| 回復指令 | `git checkout 6f04987 -- apps/api/src/services/incident_engine.py` |
| 行數 | 657 行 |
| 驗證通過 | 🟢 USE_NEW_ENGINE=true 運行中 |
| 最終刪除日期 | 2026-06-24 (封存後 90 天) |
---
## 待清理 (90 天後)
| 檔案 | 封存日期 | 刪除日期 |
|------|----------|----------|
| incident_memory_v1.py | 2026-03-26 | 2026-06-24 |
| incident_engine_v1.py | 2026-03-26 | 2026-06-24 |

View File

@@ -0,0 +1,657 @@
"""
Incident Engine v1.2 - Phase 6.4e DualMemory 整合版
====================================================
v1.2 重構內容 (Phase 6.4e):
- 整合 DualIncidentMemory 進行 DB 持久化
- 保持 Lua 原子操作進行 Redis Working Memory 更新
- 支援從 Episodic Memory (PostgreSQL) 回載 Incident
v1.1 重構內容 (2026-03-22 架構師審查後修正):
1. O(1) 反向索引: 廢除 SCAN改用 namespace/target 索引直查
2. Lua 原子操作: 廢除 Read-Modify-Write改用 Redis Lua Script
3. 併發防護: 確保告警風暴下不會發生 Race Condition
功能:
1. 事件聚合 (Alert Aggregation): 將相關告警聚合到同一個 Incident
2. 爆炸半徑分析 (Blast Radius): 透過 GraphRAG 分析受影響服務
3. 智能去重 (Deduplication): 避免重複告警造成 Incident 爆炸
設計原則:
- 30 分鐘時間窗口: 超過此時間的 Incident 視為新事件
- 關聯判斷: 同 namespace 或同 target 視為相關
- 狀態過濾: 只聚合 INVESTIGATING 或 MITIGATING 狀態的事件
統帥鐵律:
- 禁止告警風暴: 相關告警必須聚合,減少 Incident 數量
- 禁止 O(N) 掃描: 所有查詢必須 O(1)
- 禁止 Race Condition: 所有寫入必須原子操作
"""
import json
from datetime import UTC, datetime
from typing import Any
import structlog
from src.core.redis_client import get_redis
from src.models.incident import (
Incident,
Severity,
Signal,
)
from src.services.graph_rag import BlastRadiusResult, topology_graph
from src.services.incident_memory import DualIncidentMemory, get_incident_memory
logger = structlog.get_logger(__name__)
# =============================================================================
# Constants
# =============================================================================
# Redis Key Patterns
INCIDENT_KEY_PREFIX = "incident:"
INCIDENT_INDEX_NS = "incident:idx:ns:" # namespace → incident_id
INCIDENT_INDEX_TARGET = "incident:idx:target:" # target → incident_id
# 聚合時間窗口: 30 分鐘
AGGREGATION_WINDOW_MINUTES = 30
AGGREGATION_WINDOW_SECONDS = AGGREGATION_WINDOW_MINUTES * 60
# Working Memory TTL: 7 天 = 604800 秒
WORKING_MEMORY_TTL = 604800
# =============================================================================
# Lua Scripts (原子操作)
# =============================================================================
# Lua Script: 原子聚合 Signal 到 Incident
# KEYS[1] = incident key (incident:{id})
# ARGV[1] = new signal JSON
# ARGV[2] = new severity string (P0/P1/P2/P3)
# ARGV[3] = current timestamp ISO string
# ARGV[4] = TTL seconds
# Returns: updated incident JSON or nil if not found
LUA_AGGREGATE_SIGNAL = """
local data = redis.call('GET', KEYS[1])
if not data then
return nil
end
local incident = cjson.decode(data)
-- Parse new signal
local new_signal = cjson.decode(ARGV[1])
-- Check fingerprint deduplication
local fingerprint = new_signal.fingerprint
if fingerprint and fingerprint ~= cjson.null then
for _, signal in ipairs(incident.signals) do
if signal.fingerprint == fingerprint then
-- Duplicate detected, return unchanged
return data
end
end
end
-- Append signal atomically
table.insert(incident.signals, new_signal)
-- Severity escalation (P0 < P1 < P2 < P3, lower index = more severe)
local severity_order = {P0=0, P1=1, P2=2, P3=3}
local new_sev = ARGV[2]
local cur_sev = incident.severity
if severity_order[new_sev] and severity_order[cur_sev] then
if severity_order[new_sev] < severity_order[cur_sev] then
incident.severity = new_sev
end
end
-- Update timestamp
incident.updated_at = ARGV[3]
-- Serialize and save with TTL
local new_data = cjson.encode(incident)
redis.call('SET', KEYS[1], new_data, 'EX', tonumber(ARGV[4]))
return new_data
"""
# Lua Script: 原子建立或聚合 Incident (完全消除 Race Condition)
# KEYS[1] = namespace index key (incident:idx:ns:{ns})
# KEYS[2] = target index key (incident:idx:target:{target})
# ARGV[1] = new incident JSON (if creating)
# ARGV[2] = new incident_id
# ARGV[3] = new signal JSON
# ARGV[4] = new severity string (P0/P1/P2/P3)
# ARGV[5] = current timestamp ISO string
# ARGV[6] = incident TTL seconds
# ARGV[7] = index TTL seconds (aggregation window)
# ARGV[8] = incident key prefix
# Returns: "CREATED:{incident_json}" or "AGGREGATED:{incident_json}"
LUA_CREATE_OR_AGGREGATE = """
local ns_index_key = KEYS[1]
local target_index_key = KEYS[2]
local new_incident_json = ARGV[1]
local new_incident_id = ARGV[2]
local new_signal_json = ARGV[3]
local new_severity = ARGV[4]
local timestamp = ARGV[5]
local incident_ttl = tonumber(ARGV[6])
local index_ttl = tonumber(ARGV[7])
local incident_key_prefix = ARGV[8]
-- Step 1: 嘗試搶佔 namespace 索引 (SETNX 原子操作)
local ns_set_result = redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl, 'NX')
if ns_set_result then
-- 我們是第一個!建立新 Incident
local incident_key = incident_key_prefix .. new_incident_id
redis.call('SET', incident_key, new_incident_json, 'EX', incident_ttl)
-- 設置 target 索引
redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl, 'NX')
return "CREATED:" .. new_incident_json
end
-- Step 2: 索引已存在,查找現有 Incident ID
local existing_incident_id = redis.call('GET', ns_index_key)
if not existing_incident_id then
-- 可能剛好過期,嘗試 target 索引
existing_incident_id = redis.call('GET', target_index_key)
end
if not existing_incident_id then
-- 兩個索引都沒有,建立新的 (邊緣情況)
redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl)
redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl, 'NX')
local incident_key = incident_key_prefix .. new_incident_id
redis.call('SET', incident_key, new_incident_json, 'EX', incident_ttl)
return "CREATED:" .. new_incident_json
end
-- Step 3: 聚合到現有 Incident
local incident_key = incident_key_prefix .. existing_incident_id
local existing_data = redis.call('GET', incident_key)
if not existing_data then
-- Incident 已過期但索引未過期,建立新的
redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl)
redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl)
local new_incident_key = incident_key_prefix .. new_incident_id
redis.call('SET', new_incident_key, new_incident_json, 'EX', incident_ttl)
return "CREATED:" .. new_incident_json
end
-- Step 4: 原子聚合 Signal
local incident = cjson.decode(existing_data)
local new_signal = cjson.decode(new_signal_json)
-- 修復 cjson 空陣列問題 (cjson 會把 [] 變成 {})
if type(incident.proposal_ids) == "table" and next(incident.proposal_ids) == nil then
incident.proposal_ids = cjson.empty_array
end
if type(incident.affected_services) == "table" and next(incident.affected_services) == nil then
incident.affected_services = cjson.empty_array
end
-- Fingerprint 去重
local fingerprint = new_signal.fingerprint
if fingerprint and fingerprint ~= cjson.null then
for _, signal in ipairs(incident.signals) do
if signal.fingerprint == fingerprint then
return "AGGREGATED:" .. existing_data
end
end
end
-- 附加 Signal
table.insert(incident.signals, new_signal)
-- Severity 升級
local severity_order = {P0=0, P1=1, P2=2, P3=3}
if severity_order[new_severity] and severity_order[incident.severity] then
if severity_order[new_severity] < severity_order[incident.severity] then
incident.severity = new_severity
end
end
-- 更新時間戳
incident.updated_at = timestamp
-- 保存並返回
local updated_json = cjson.encode(incident)
redis.call('SET', incident_key, updated_json, 'EX', incident_ttl)
return "AGGREGATED:" .. updated_json
"""
# =============================================================================
# Incident Engine v1.1
# =============================================================================
class IncidentEngine:
"""
事件引擎 v1.1 - 認知覺醒核心 (效能強化版)
職責:
1. 聚合相關告警到同一 Incident (減少噪音)
2. 整合 GraphRAG 分析爆炸半徑
3. 雙層持久化 (Redis + SQLite/PG)
v1.1 重構:
- O(1) 反向索引取代 O(N) SCAN
- Lua 原子操作取代 Read-Modify-Write
- 完全消除 Race Condition
使用方式:
engine = IncidentEngine()
incident = await engine.process_signal(signal_data)
"""
def __init__(self, memory: DualIncidentMemory | None = None) -> None:
"""
初始化 IncidentEngine
Args:
memory: DualIncidentMemory 實例 (可選,預設使用 Singleton)
"""
self._graph = topology_graph
self._memory = memory or get_incident_memory()
self._lua_aggregate_sha: str | None = None
self._lua_create_sha: str | None = None
# =========================================================================
# Lua Script 初始化
# =========================================================================
async def _ensure_lua_scripts(self) -> None:
"""確保 Lua Scripts 已載入 Redis (SCRIPT LOAD)"""
if self._lua_aggregate_sha and self._lua_create_sha:
return
redis_client = get_redis()
# Load aggregate script (for existing incident updates)
self._lua_aggregate_sha = await redis_client.script_load(
LUA_AGGREGATE_SIGNAL
)
logger.debug(
"lua_script_loaded",
script="aggregate_signal",
sha=self._lua_aggregate_sha,
)
# Load unified create-or-aggregate script
self._lua_create_sha = await redis_client.script_load(
LUA_CREATE_OR_AGGREGATE
)
logger.debug(
"lua_script_loaded",
script="create_or_aggregate",
sha=self._lua_create_sha,
)
# =========================================================================
# 核心方法: 處理 Signal
# =========================================================================
async def process_signal(
self,
signal_data: dict[str, Any],
) -> Incident | None:
"""
處理 Signal: 原子建立或聚合 Incident
Phase 6.3 核心邏輯 (v1.1 重構):
1. 解析 Signal
2. 單一 Lua Script 原子操作: 建立或聚合 (完全消除 Race Condition)
3. 調用 GraphRAG 分析爆炸半徑
4. 雙層持久化
Args:
signal_data: 從 Redis Stream 收到的 Signal 資料
Returns:
Incident | None: 處理後的 Incident
"""
try:
# 確保 Lua Scripts 已載入
await self._ensure_lua_scripts()
# 1. 解析 Signal
signal = self._parse_signal(signal_data)
namespace = signal_data.get("namespace", "default")
target = signal_data.get("target", "unknown")
# 在 labels 中加入 namespace
signal.labels["namespace"] = namespace
logger.info(
"signal_processing",
alert_name=signal.alert_name,
namespace=namespace,
target=target,
)
# 2. 單一 Lua Script 原子操作: 建立或聚合
incident = await self._atomic_create_or_aggregate(
signal=signal,
namespace=namespace,
target=target,
)
if not incident:
logger.error(
"atomic_operation_failed",
alert_name=signal.alert_name,
namespace=namespace,
)
return None
# 3. GraphRAG 分析爆炸半徑
await self._analyze_blast_radius(incident, target)
# 4. 雙層持久化 (DB 層)
await self._persist_to_db(incident)
return incident
except Exception as e:
logger.exception(
"process_signal_error",
error=str(e),
)
return None
# =========================================================================
# 原子建立或聚合 (單一 Lua Script - 完全消除 Race Condition)
# =========================================================================
async def _atomic_create_or_aggregate(
self,
signal: Signal,
namespace: str,
target: str,
) -> Incident | None:
"""
使用單一 Lua Script 原子建立或聚合 Incident
核心設計:
1. 使用 SETNX 搶佔索引作為分散式鎖
2. 如果搶到 → 建立新 Incident
3. 如果沒搶到 → 聚合到已存在的 Incident
4. 整個流程在 Lua 中原子執行
優點:
- 完全消除 Race Condition
- 單次 Redis 往返完成所有操作
- 無論多少併發 Signal同一 namespace/target 只會有一個 Incident
"""
redis_client = get_redis()
# Redis Keys
ns_index_key = f"{INCIDENT_INDEX_NS}{namespace}"
target_index_key = f"{INCIDENT_INDEX_TARGET}{target}"
# 準備新 Incident (如果需要建立)
new_incident = Incident(
severity=signal.severity,
signals=[signal],
affected_services=[target],
)
new_incident_json = new_incident.model_dump_json()
# Signal 參數
signal_json = signal.model_dump_json()
severity_str = signal.severity.value
timestamp_str = datetime.now(UTC).isoformat()
try:
# 執行統一 Lua Script (原子操作)
result = await redis_client.evalsha(
self._lua_create_sha,
2, # number of keys
ns_index_key, # KEYS[1]
target_index_key, # KEYS[2]
new_incident_json, # ARGV[1] - new incident JSON
new_incident.incident_id, # ARGV[2] - new incident ID
signal_json, # ARGV[3] - new signal JSON
severity_str, # ARGV[4] - severity
timestamp_str, # ARGV[5] - timestamp
str(WORKING_MEMORY_TTL), # ARGV[6] - incident TTL
str(AGGREGATION_WINDOW_SECONDS), # ARGV[7] - index TTL
INCIDENT_KEY_PREFIX, # ARGV[8] - key prefix
)
if not result:
logger.error(
"lua_script_returned_nil",
namespace=namespace,
target=target,
)
return None
# 解析結果
result_str = result.decode() if isinstance(result, bytes) else result
if result_str.startswith("CREATED:"):
incident_json = result_str[8:] # 移除 "CREATED:" 前綴
incident = self._parse_lua_incident(incident_json)
logger.info(
"incident_created_atomic",
incident_id=incident.incident_id,
severity=incident.severity.value,
namespace=namespace,
signal_count=1,
)
return incident
elif result_str.startswith("AGGREGATED:"):
incident_json = result_str[11:] # 移除 "AGGREGATED:" 前綴
incident = self._parse_lua_incident(incident_json)
logger.info(
"signal_aggregated_atomic",
incident_id=incident.incident_id,
severity=incident.severity.value,
namespace=namespace,
signal_count=len(incident.signals),
)
return incident
else:
logger.error(
"lua_script_unexpected_result",
result=result_str[:100],
)
return None
except Exception as e:
logger.exception(
"atomic_create_or_aggregate_error",
namespace=namespace,
target=target,
error=str(e),
)
return None
# =========================================================================
# GraphRAG 整合
# =========================================================================
async def _analyze_blast_radius(
self,
incident: Incident,
target: str,
) -> None:
"""
調用 GraphRAG 分析爆炸半徑
將結果寫入 incident.affected_services
"""
try:
result: BlastRadiusResult = self._graph.get_blast_radius(target)
# 合併 affected_services (去重)
for service in result.affected_services:
if service not in incident.affected_services:
incident.affected_services.append(service)
# 確保 target 本身在列表中
if target not in incident.affected_services:
incident.affected_services.append(target)
logger.info(
"blast_radius_analyzed",
incident_id=incident.incident_id,
target=target,
affected_count=result.affected_count,
affected_services=incident.affected_services,
)
except Exception as e:
logger.warning(
"blast_radius_analysis_failed",
incident_id=incident.incident_id,
target=target,
error=str(e),
)
# 失敗時至少保留 target
if target not in incident.affected_services:
incident.affected_services.append(target)
# =========================================================================
# 持久化 (DB 層) - Phase 6.4e: 委託給 DualIncidentMemory
# =========================================================================
async def _persist_to_db(self, incident: Incident) -> None:
"""
持久化到 PostgreSQL (Episodic Memory)
Phase 6.4e: 委託給 DualIncidentMemory.persist_incident()
Redis 已在 Lua Script 中更新,這裡只處理 DB
"""
try:
success = await self._memory.persist_incident(incident)
incident.persisted_to_pg = success
if success:
logger.debug(
"db_persisted_via_dual_memory",
incident_id=incident.incident_id,
)
else:
logger.warning(
"db_persist_failed_via_dual_memory",
incident_id=incident.incident_id,
)
except Exception as e:
logger.exception("db_save_error", error=str(e))
# =========================================================================
# 從 Episodic Memory 載入 (Phase 6.4e 新增)
# =========================================================================
async def get_incident(self, incident_id: str) -> Incident | None:
"""
取得 Incident
Phase 6.4e: 委託給 DualIncidentMemory.load_incident()
優先從 Working Memory (Redis) 讀取miss 時從 Episodic (PostgreSQL) 讀取
Args:
incident_id: Incident ID
Returns:
Incident 或 None
"""
return await self._memory.load_incident(incident_id)
# =========================================================================
# 輔助方法
# =========================================================================
def _parse_lua_incident(self, incident_json: str) -> Incident:
"""
解析 Lua 返回的 Incident JSON
修復 Lua cjson 的問題:
- cjson.encode 會把空陣列 [] 轉成空物件 {}
- 需要手動修復陣列欄位
"""
data = json.loads(incident_json)
# 修復可能被轉成空物件的陣列欄位
array_fields = ["signals", "affected_services", "proposal_ids"]
for field in array_fields:
if field in data and isinstance(data[field], dict) and len(data[field]) == 0:
data[field] = []
return Incident.model_validate(data)
def _parse_signal(self, signal_data: dict[str, Any]) -> Signal:
"""解析 Signal"""
return Signal(
alert_name=signal_data.get("alert_name", "unknown"),
severity=self._parse_severity(signal_data.get("severity", "warning")),
source=self._parse_source(signal_data.get("source", "manual")),
fired_at=datetime.now(UTC),
labels=self._parse_dict(signal_data.get("labels", "{}")),
annotations=self._parse_dict(signal_data.get("annotations", "{}")),
fingerprint=signal_data.get("fingerprint"),
)
def _parse_source(self, source_str: str) -> str:
"""解析來源"""
valid_sources = {"prometheus", "signoz", "alertmanager", "manual", "telegram"}
if source_str.lower() in valid_sources:
return source_str.lower()
return "manual"
def _parse_severity(self, severity_str: str) -> Severity:
"""解析嚴重度"""
mapping = {
"critical": Severity.P0,
"high": Severity.P1,
"warning": Severity.P2,
"medium": Severity.P2,
"low": Severity.P3,
"info": Severity.P3,
}
return mapping.get(severity_str.lower(), Severity.P2)
def _parse_dict(self, value: str | dict) -> dict[str, str]:
"""解析字典"""
if isinstance(value, dict):
return {str(k): str(v) for k, v in value.items()}
if isinstance(value, str):
try:
parsed = json.loads(value.replace("'", '"'))
return {str(k): str(v) for k, v in parsed.items()}
except (json.JSONDecodeError, TypeError):
return {}
return {}
# =============================================================================
# Singleton
# =============================================================================
_incident_engine: IncidentEngine | None = None
def get_incident_engine() -> IncidentEngine:
"""取得 Incident Engine 實例 (Singleton)"""
global _incident_engine
if _incident_engine is None:
_incident_engine = IncidentEngine()
return _incident_engine

View File

@@ -0,0 +1,483 @@
"""
Incident Memory Provider - 事件記憶體提供者
============================================
Phase 6.4e: DualIncidentMemory 整合
設計:
- 實作 IIncidentMemory 協定 (Protocol)
- 雙層記憶體: Working (Redis) + Episodic (PostgreSQL)
- 反向索引: namespace:target -> incident_id
統帥鐵律:
- Working Memory (Redis): 7 天 TTL
- Episodic Memory (PostgreSQL): 永久
- 反向索引: 30 分鐘 TTL (聚合窗口)
NOTE: 此模組為 lewooogo-brain/adapters/incident_memory.py 的 apps/api 內嵌版本
待 Phase 6.4i 完成 monorepo Docker 解法後,將直接引用 lewooogo-brain 套件
"""
from datetime import UTC, datetime, timedelta
from typing import Any, Protocol
import structlog
from src.core.redis_client import get_redis
from src.db.base import get_db_context
from src.db.models import IncidentRecord
from src.models.incident import Incident
logger = structlog.get_logger(__name__)
# =============================================================================
# Constants
# =============================================================================
WORKING_MEMORY_TTL = 604800 # 7 天
AGGREGATION_WINDOW_MINUTES = 30
INDEX_TTL = 1800 # 索引 30 分鐘 TTL
# Redis Key Patterns
INCIDENT_KEY_PREFIX = "awoooi:incidents:"
INDEX_PREFIX = "awoooi:incidents:index:"
# =============================================================================
# Protocol Definition (與 lewooogo-brain 保持一致)
# =============================================================================
class IIncidentMemory(Protocol):
"""Incident 專用記憶體提供者協定"""
async def load_incident(self, incident_id: str) -> Incident | None:
"""從 Working Memory 載入 Incident"""
...
async def save_incident(self, incident: Incident, ttl_seconds: int = WORKING_MEMORY_TTL) -> bool:
"""儲存 Incident 到 Working Memory (預設 7 天 TTL)"""
...
async def persist_incident(self, incident: Incident) -> bool:
"""持久化到 Episodic Memory (PostgreSQL)"""
...
async def find_related_incident(
self,
namespace: str,
target: str,
window_minutes: int = AGGREGATION_WINDOW_MINUTES,
) -> Incident | None:
"""尋找相關的活躍 Incident (用於聚合)"""
...
async def update_index(
self,
incident_id: str,
namespace: str,
target: str,
) -> bool:
"""更新反向索引 (namespace/target -> incident_id)"""
...
# =============================================================================
# DualIncidentMemory Implementation
# =============================================================================
class DualIncidentMemory:
"""
Incident 專用雙層記憶體適配器
實作 IIncidentMemory 協定:
- load_incident: 從 Working/Episodic 載入
- save_incident: 儲存到 Working
- persist_incident: 持久化到 Episodic
- find_related_incident: 透過反向索引尋找相關 Incident
- update_index: 更新反向索引
反向索引結構:
Key: awoooi:incidents:index:{namespace}:{target}
Value: incident_id
TTL: 30 分鐘 (聚合窗口)
"""
def __init__(self, redis_client: Any = None, key_prefix: str = INCIDENT_KEY_PREFIX):
"""
初始化適配器
Args:
redis_client: Redis 連線客戶端 (可選,預設使用 get_redis())
key_prefix: Redis Key 前綴
"""
self._redis = redis_client
self._key_prefix = key_prefix
self._index_prefix = INDEX_PREFIX
def _get_redis(self) -> Any:
"""取得 Redis 客戶端 (延遲初始化)"""
if self._redis is None:
self._redis = get_redis()
return self._redis
def _make_key(self, incident_id: str) -> str:
"""生成 Incident Key"""
return f"{self._key_prefix}{incident_id}"
def _make_index_key(self, namespace: str, target: str) -> str:
"""生成索引 Key"""
return f"{self._index_prefix}{namespace}:{target}"
async def load_incident(self, incident_id: str) -> Incident | None:
"""
載入 Incident
策略:
1. 從 Redis (Working Memory) 讀取
2. 若 miss從 PostgreSQL (Episodic) 讀取
Args:
incident_id: Incident ID
Returns:
Incident 或 None
"""
try:
redis_client = self._get_redis()
key = self._make_key(incident_id)
data = await redis_client.get(key)
if data is not None:
# JSON -> Incident
return Incident.model_validate_json(data)
# Working Memory miss, 嘗試從 Episodic Memory 載入
logger.debug("incident_not_found_in_working", incident_id=incident_id)
async with get_db_context() as db:
from sqlalchemy import select
stmt = select(IncidentRecord).where(
IncidentRecord.incident_id == incident_id
)
result = await db.execute(stmt)
record = result.scalar_one_or_none()
if record:
# 從 DB 重建 Incident
incident = self._record_to_incident(record)
# 寫回 Working Memory (快取)
await self.save_incident(incident)
return incident
return None
except Exception as e:
logger.error("load_incident_failed", incident_id=incident_id, error=str(e))
return None
async def save_incident(
self,
incident: Incident,
ttl_seconds: int = WORKING_MEMORY_TTL,
) -> bool:
"""
儲存 Incident 到 Working Memory (Redis)
Args:
incident: Incident 物件
ttl_seconds: TTL (預設 7 天)
Returns:
是否成功
"""
try:
redis_client = self._get_redis()
key = self._make_key(incident.incident_id)
json_data = incident.model_dump_json()
await redis_client.setex(key, ttl_seconds, json_data)
logger.debug(
"incident_saved_to_working",
incident_id=incident.incident_id,
ttl=ttl_seconds,
)
return True
except Exception as e:
logger.error(
"save_incident_failed",
incident_id=incident.incident_id,
error=str(e),
)
return False
async def persist_incident(self, incident: Incident) -> bool:
"""
持久化到 Episodic Memory (PostgreSQL)
Args:
incident: Incident 物件
Returns:
是否成功
"""
try:
async with get_db_context() as db:
from sqlalchemy import select
# 檢查是否已存在
stmt = select(IncidentRecord).where(
IncidentRecord.incident_id == incident.incident_id
)
result = await db.execute(stmt)
existing = result.scalar_one_or_none()
if existing:
# 更新現有記錄
existing.status = incident.status.value
existing.severity = incident.severity.value
existing.signals = [
s.model_dump(mode="json") for s in incident.signals
]
existing.affected_services = incident.affected_services
existing.updated_at = incident.updated_at
if incident.resolved_at:
existing.resolved_at = incident.resolved_at
if incident.closed_at:
existing.closed_at = incident.closed_at
else:
# 建立新記錄
record = IncidentRecord(
incident_id=incident.incident_id,
status=incident.status.value,
severity=incident.severity.value,
signals=[
s.model_dump(mode="json") for s in incident.signals
],
affected_services=incident.affected_services,
decision_chain=(
incident.decision_chain.model_dump(mode="json")
if incident.decision_chain
else None
),
proposal_ids=[str(pid) for pid in incident.proposal_ids],
outcome=(
incident.outcome.model_dump(mode="json")
if incident.outcome
else None
),
created_at=incident.created_at,
updated_at=incident.updated_at,
resolved_at=incident.resolved_at,
closed_at=incident.closed_at,
ttl_days=incident.ttl_days,
vectorized=incident.vectorized,
)
db.add(record)
logger.debug(
"incident_persisted_to_episodic",
incident_id=incident.incident_id,
)
return True
except Exception as e:
logger.error(
"persist_incident_failed",
incident_id=incident.incident_id,
error=str(e),
)
return False
async def find_related_incident(
self,
namespace: str,
target: str,
window_minutes: int = AGGREGATION_WINDOW_MINUTES,
) -> Incident | None:
"""
尋找相關的活躍 Incident (用於聚合)
透過反向索引快速查找:
1. 查詢索引 Key: namespace:target -> incident_id
2. 載入 Incident
3. 檢查是否仍在聚合窗口內
Args:
namespace: 命名空間
target: 目標服務
window_minutes: 聚合窗口 (分鐘)
Returns:
相關 Incident 或 None
"""
try:
redis_client = self._get_redis()
# Step 1: 查詢索引
index_key = self._make_index_key(namespace, target)
incident_id = await redis_client.get(index_key)
if incident_id is None:
return None
# 解碼 bytes
if isinstance(incident_id, bytes):
incident_id = incident_id.decode()
# Step 2: 載入 Incident
incident = await self.load_incident(incident_id)
if incident is None:
# 索引存在但 Incident 不存在,清除索引
await redis_client.delete(index_key)
return None
# Step 3: 檢查聚合窗口
window_start = datetime.now(UTC) - timedelta(minutes=window_minutes)
if incident.updated_at < window_start:
# 超出聚合窗口,不聚合
logger.debug(
"incident_outside_window",
incident_id=incident_id,
updated_at=incident.updated_at.isoformat(),
)
return None
logger.debug(
"found_related_incident",
incident_id=incident_id,
namespace=namespace,
target=target,
)
return incident
except Exception as e:
logger.error(
"find_related_incident_failed",
namespace=namespace,
target=target,
error=str(e),
)
return None
async def update_index(
self,
incident_id: str,
namespace: str,
target: str,
) -> bool:
"""
更新反向索引
索引結構:
Key: awoooi:incidents:index:{namespace}:{target}
Value: incident_id
TTL: 30 分鐘
Args:
incident_id: Incident ID
namespace: 命名空間
target: 目標服務
Returns:
是否成功
"""
try:
redis_client = self._get_redis()
index_key = self._make_index_key(namespace, target)
await redis_client.setex(index_key, INDEX_TTL, incident_id)
logger.debug(
"index_updated",
incident_id=incident_id,
namespace=namespace,
target=target,
ttl=INDEX_TTL,
)
return True
except Exception as e:
logger.error(
"update_index_failed",
incident_id=incident_id,
namespace=namespace,
target=target,
error=str(e),
)
return False
async def delete_incident(self, incident_id: str) -> bool:
"""
刪除 Incident
Args:
incident_id: Incident ID
Returns:
是否成功
"""
try:
redis_client = self._get_redis()
key = self._make_key(incident_id)
result = await redis_client.delete(key)
return result > 0
except Exception as e:
logger.error(
"delete_incident_failed",
incident_id=incident_id,
error=str(e),
)
return False
def _record_to_incident(self, record: IncidentRecord) -> Incident:
"""
將 DB Record 轉換為 Incident 物件
Args:
record: IncidentRecord
Returns:
Incident
"""
from src.models.incident import (
IncidentStatus,
Severity,
Signal,
)
# 重建 Signals
signals = []
for s in record.signals or []:
signals.append(Signal.model_validate(s))
return Incident(
incident_id=record.incident_id,
status=IncidentStatus(record.status),
severity=Severity(record.severity),
signals=signals,
affected_services=record.affected_services or [],
proposal_ids=record.proposal_ids or [],
created_at=record.created_at,
updated_at=record.updated_at,
resolved_at=record.resolved_at,
closed_at=record.closed_at,
ttl_days=record.ttl_days or 30,
vectorized=record.vectorized or False,
)
# =============================================================================
# Singleton
# =============================================================================
_dual_memory: DualIncidentMemory | None = None
def get_incident_memory() -> DualIncidentMemory:
"""取得 DualIncidentMemory 實例 (Singleton)"""
global _dual_memory
if _dual_memory is None:
_dual_memory = DualIncidentMemory()
return _dual_memory