- incident_memory.py: 移除 ~480 行 DualIncidentMemory + IIncidentMemory 內嵌版本 保留 IncidentDbAdapter (SQLAlchemy bridge) + get_incident_memory() singleton - incident_engine.py: 移除 ~405 行 IncidentEngine 舊版內嵌類別 保留 IncidentMemoryAdapter + BlastRadiusAdapter (lewooogo-brain 橋接) - 全面切換至 lewooogo-brain 套件 (USE_NEW_ENGINE=True 已驗證穩定) - 測試驗證: 104 passed, 13 skipped (所有 Redis-independent 測試通過) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,26 +1,15 @@
|
||||
"""
|
||||
Incident Engine v1.2 - Phase 6.4e DualMemory 整合版
|
||||
====================================================
|
||||
Incident Engine - Phase 16 lewooogo-brain 整合版
|
||||
================================================
|
||||
|
||||
v1.2 重構內容 (Phase 6.4e):
|
||||
- 整合 DualIncidentMemory 進行 DB 持久化
|
||||
- 保持 Lua 原子操作進行 Redis Working Memory 更新
|
||||
- 支援從 Episodic Memory (PostgreSQL) 回載 Incident
|
||||
Phase R-R2 (2026-04-01 ogt): 移除內嵌 IncidentEngine 重複邏輯,
|
||||
全面切換至 lewooogo-brain IncidentEngine。
|
||||
完整舊版本歸檔: src/services/_archived/incident_engine_v1.py
|
||||
|
||||
v1.1 重構內容 (2026-03-22 架構師審查後修正):
|
||||
1. O(1) 反向索引: 廢除 SCAN,改用 namespace/target 索引直查
|
||||
2. Lua 原子操作: 廢除 Read-Modify-Write,改用 Redis Lua Script
|
||||
3. 併發防護: 確保告警風暴下不會發生 Race Condition
|
||||
|
||||
功能:
|
||||
1. 事件聚合 (Alert Aggregation): 將相關告警聚合到同一個 Incident
|
||||
2. 爆炸半徑分析 (Blast Radius): 透過 GraphRAG 分析受影響服務
|
||||
3. 智能去重 (Deduplication): 避免重複告警造成 Incident 爆炸
|
||||
|
||||
設計原則:
|
||||
- 30 分鐘時間窗口: 超過此時間的 Incident 視為新事件
|
||||
- 關聯判斷: 同 namespace 或同 target 視為相關
|
||||
- 狀態過濾: 只聚合 INVESTIGATING 或 MITIGATING 狀態的事件
|
||||
架構:
|
||||
- IncidentMemoryAdapter: 橋接現有 Lua Scripts + DualIncidentMemory
|
||||
- BlastRadiusAdapter: 包裝 topology_graph 注入 lewooogo-brain
|
||||
- get_incident_engine(): 統一入口 (Singleton)
|
||||
|
||||
統帥鐵律:
|
||||
- 禁止告警風暴: 相關告警必須聚合,減少 Incident 數量
|
||||
@@ -28,20 +17,14 @@ v1.1 重構內容 (2026-03-22 架構師審查後修正):
|
||||
- 禁止 Race Condition: 所有寫入必須原子操作
|
||||
"""
|
||||
|
||||
import json
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any, Protocol, runtime_checkable
|
||||
|
||||
import structlog
|
||||
|
||||
from src.core.redis_client import get_redis
|
||||
from src.models.incident import (
|
||||
Incident,
|
||||
Severity,
|
||||
Signal,
|
||||
)
|
||||
from src.models.incident import Incident
|
||||
from src.services.graph_rag import BlastRadiusResult, topology_graph
|
||||
from src.services.incident_memory import DualIncidentMemory, get_incident_memory
|
||||
from src.services.incident_memory import get_incident_memory
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
@@ -52,7 +35,7 @@ logger = structlog.get_logger(__name__)
|
||||
|
||||
# Redis Key Patterns
|
||||
INCIDENT_KEY_PREFIX = "incident:"
|
||||
INCIDENT_INDEX_NS = "incident:idx:ns:" # namespace → incident_id
|
||||
INCIDENT_INDEX_NS = "incident:idx:ns:" # namespace → incident_id
|
||||
INCIDENT_INDEX_TARGET = "incident:idx:target:" # target → incident_id
|
||||
|
||||
# 聚合時間窗口: 30 分鐘
|
||||
@@ -64,7 +47,7 @@ WORKING_MEMORY_TTL = 604800
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Lua Scripts (原子操作)
|
||||
# Lua Scripts (原子操作) - 供 IncidentMemoryAdapter 使用
|
||||
# =============================================================================
|
||||
|
||||
# Lua Script: 原子聚合 Signal 到 Incident
|
||||
@@ -274,415 +257,6 @@ class IIncidentEngine(Protocol):
|
||||
...
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Incident Engine v1.1
|
||||
# =============================================================================
|
||||
|
||||
class IncidentEngine:
|
||||
"""
|
||||
事件引擎 v1.1 - 認知覺醒核心 (效能強化版)
|
||||
|
||||
職責:
|
||||
1. 聚合相關告警到同一 Incident (減少噪音)
|
||||
2. 整合 GraphRAG 分析爆炸半徑
|
||||
3. 雙層持久化 (Redis + SQLite/PG)
|
||||
|
||||
v1.1 重構:
|
||||
- O(1) 反向索引取代 O(N) SCAN
|
||||
- Lua 原子操作取代 Read-Modify-Write
|
||||
- 完全消除 Race Condition
|
||||
|
||||
使用方式:
|
||||
engine = IncidentEngine()
|
||||
incident = await engine.process_signal(signal_data)
|
||||
"""
|
||||
|
||||
def __init__(self, memory: DualIncidentMemory | None = None) -> None:
|
||||
"""
|
||||
初始化 IncidentEngine
|
||||
|
||||
Args:
|
||||
memory: DualIncidentMemory 實例 (可選,預設使用 Singleton)
|
||||
"""
|
||||
self._graph = topology_graph
|
||||
self._memory = memory or get_incident_memory()
|
||||
self._lua_aggregate_sha: str | None = None
|
||||
self._lua_create_sha: str | None = None
|
||||
|
||||
# =========================================================================
|
||||
# Lua Script 初始化
|
||||
# =========================================================================
|
||||
|
||||
async def _ensure_lua_scripts(self) -> None:
|
||||
"""確保 Lua Scripts 已載入 Redis (SCRIPT LOAD)"""
|
||||
if self._lua_aggregate_sha and self._lua_create_sha:
|
||||
return
|
||||
|
||||
redis_client = get_redis()
|
||||
|
||||
# Load aggregate script (for existing incident updates)
|
||||
self._lua_aggregate_sha = await redis_client.script_load(
|
||||
LUA_AGGREGATE_SIGNAL
|
||||
)
|
||||
logger.debug(
|
||||
"lua_script_loaded",
|
||||
script="aggregate_signal",
|
||||
sha=self._lua_aggregate_sha,
|
||||
)
|
||||
|
||||
# Load unified create-or-aggregate script
|
||||
self._lua_create_sha = await redis_client.script_load(
|
||||
LUA_CREATE_OR_AGGREGATE
|
||||
)
|
||||
logger.debug(
|
||||
"lua_script_loaded",
|
||||
script="create_or_aggregate",
|
||||
sha=self._lua_create_sha,
|
||||
)
|
||||
|
||||
# =========================================================================
|
||||
# 核心方法: 處理 Signal
|
||||
# =========================================================================
|
||||
|
||||
async def process_signal(
|
||||
self,
|
||||
signal_data: dict[str, Any],
|
||||
) -> Incident | None:
|
||||
"""
|
||||
處理 Signal: 原子建立或聚合 Incident
|
||||
|
||||
Phase 6.3 核心邏輯 (v1.1 重構):
|
||||
1. 解析 Signal
|
||||
2. 單一 Lua Script 原子操作: 建立或聚合 (完全消除 Race Condition)
|
||||
3. 調用 GraphRAG 分析爆炸半徑
|
||||
4. 雙層持久化
|
||||
|
||||
Args:
|
||||
signal_data: 從 Redis Stream 收到的 Signal 資料
|
||||
|
||||
Returns:
|
||||
Incident | None: 處理後的 Incident
|
||||
"""
|
||||
try:
|
||||
# 確保 Lua Scripts 已載入
|
||||
await self._ensure_lua_scripts()
|
||||
|
||||
# 1. 解析 Signal
|
||||
signal = self._parse_signal(signal_data)
|
||||
namespace = signal_data.get("namespace", "default")
|
||||
target = signal_data.get("target", "unknown")
|
||||
|
||||
# 在 labels 中加入 namespace
|
||||
signal.labels["namespace"] = namespace
|
||||
|
||||
logger.info(
|
||||
"signal_processing",
|
||||
alert_name=signal.alert_name,
|
||||
namespace=namespace,
|
||||
target=target,
|
||||
)
|
||||
|
||||
# 2. 單一 Lua Script 原子操作: 建立或聚合
|
||||
incident = await self._atomic_create_or_aggregate(
|
||||
signal=signal,
|
||||
namespace=namespace,
|
||||
target=target,
|
||||
)
|
||||
|
||||
if not incident:
|
||||
logger.error(
|
||||
"atomic_operation_failed",
|
||||
alert_name=signal.alert_name,
|
||||
namespace=namespace,
|
||||
)
|
||||
return None
|
||||
|
||||
# 3. GraphRAG 分析爆炸半徑
|
||||
await self._analyze_blast_radius(incident, target)
|
||||
|
||||
# 4. 雙層持久化 (DB 層)
|
||||
await self._persist_to_db(incident)
|
||||
|
||||
return incident
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
"process_signal_error",
|
||||
error=str(e),
|
||||
)
|
||||
return None
|
||||
|
||||
# =========================================================================
|
||||
# 原子建立或聚合 (單一 Lua Script - 完全消除 Race Condition)
|
||||
# =========================================================================
|
||||
|
||||
async def _atomic_create_or_aggregate(
|
||||
self,
|
||||
signal: Signal,
|
||||
namespace: str,
|
||||
target: str,
|
||||
) -> Incident | None:
|
||||
"""
|
||||
使用單一 Lua Script 原子建立或聚合 Incident
|
||||
|
||||
核心設計:
|
||||
1. 使用 SETNX 搶佔索引作為分散式鎖
|
||||
2. 如果搶到 → 建立新 Incident
|
||||
3. 如果沒搶到 → 聚合到已存在的 Incident
|
||||
4. 整個流程在 Lua 中原子執行
|
||||
|
||||
優點:
|
||||
- 完全消除 Race Condition
|
||||
- 單次 Redis 往返完成所有操作
|
||||
- 無論多少併發 Signal,同一 namespace/target 只會有一個 Incident
|
||||
"""
|
||||
redis_client = get_redis()
|
||||
|
||||
# Redis Keys
|
||||
ns_index_key = f"{INCIDENT_INDEX_NS}{namespace}"
|
||||
target_index_key = f"{INCIDENT_INDEX_TARGET}{target}"
|
||||
|
||||
# 準備新 Incident (如果需要建立)
|
||||
new_incident = Incident(
|
||||
severity=signal.severity,
|
||||
signals=[signal],
|
||||
affected_services=[target],
|
||||
)
|
||||
new_incident_json = new_incident.model_dump_json()
|
||||
|
||||
# Signal 參數
|
||||
signal_json = signal.model_dump_json()
|
||||
severity_str = signal.severity.value
|
||||
timestamp_str = datetime.now(UTC).isoformat()
|
||||
|
||||
try:
|
||||
# 執行統一 Lua Script (原子操作)
|
||||
result = await redis_client.evalsha(
|
||||
self._lua_create_sha,
|
||||
2, # number of keys
|
||||
ns_index_key, # KEYS[1]
|
||||
target_index_key, # KEYS[2]
|
||||
new_incident_json, # ARGV[1] - new incident JSON
|
||||
new_incident.incident_id, # ARGV[2] - new incident ID
|
||||
signal_json, # ARGV[3] - new signal JSON
|
||||
severity_str, # ARGV[4] - severity
|
||||
timestamp_str, # ARGV[5] - timestamp
|
||||
str(WORKING_MEMORY_TTL), # ARGV[6] - incident TTL
|
||||
str(AGGREGATION_WINDOW_SECONDS), # ARGV[7] - index TTL
|
||||
INCIDENT_KEY_PREFIX, # ARGV[8] - key prefix
|
||||
)
|
||||
|
||||
if not result:
|
||||
logger.error(
|
||||
"lua_script_returned_nil",
|
||||
namespace=namespace,
|
||||
target=target,
|
||||
)
|
||||
return None
|
||||
|
||||
# 解析結果
|
||||
result_str = result.decode() if isinstance(result, bytes) else result
|
||||
|
||||
if result_str.startswith("CREATED:"):
|
||||
incident_json = result_str[8:] # 移除 "CREATED:" 前綴
|
||||
incident = self._parse_lua_incident(incident_json)
|
||||
logger.info(
|
||||
"incident_created_atomic",
|
||||
incident_id=incident.incident_id,
|
||||
severity=incident.severity.value,
|
||||
namespace=namespace,
|
||||
signal_count=1,
|
||||
)
|
||||
return incident
|
||||
|
||||
elif result_str.startswith("AGGREGATED:"):
|
||||
incident_json = result_str[11:] # 移除 "AGGREGATED:" 前綴
|
||||
incident = self._parse_lua_incident(incident_json)
|
||||
logger.info(
|
||||
"signal_aggregated_atomic",
|
||||
incident_id=incident.incident_id,
|
||||
severity=incident.severity.value,
|
||||
namespace=namespace,
|
||||
signal_count=len(incident.signals),
|
||||
)
|
||||
return incident
|
||||
|
||||
else:
|
||||
logger.error(
|
||||
"lua_script_unexpected_result",
|
||||
result=result_str[:100],
|
||||
)
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
"atomic_create_or_aggregate_error",
|
||||
namespace=namespace,
|
||||
target=target,
|
||||
error=str(e),
|
||||
)
|
||||
return None
|
||||
|
||||
# =========================================================================
|
||||
# GraphRAG 整合
|
||||
# =========================================================================
|
||||
|
||||
async def _analyze_blast_radius(
|
||||
self,
|
||||
incident: Incident,
|
||||
target: str,
|
||||
) -> None:
|
||||
"""
|
||||
調用 GraphRAG 分析爆炸半徑
|
||||
|
||||
將結果寫入 incident.affected_services
|
||||
"""
|
||||
try:
|
||||
result: BlastRadiusResult = self._graph.get_blast_radius(target)
|
||||
|
||||
# 合併 affected_services (去重)
|
||||
for service in result.affected_services:
|
||||
if service not in incident.affected_services:
|
||||
incident.affected_services.append(service)
|
||||
|
||||
# 確保 target 本身在列表中
|
||||
if target not in incident.affected_services:
|
||||
incident.affected_services.append(target)
|
||||
|
||||
logger.info(
|
||||
"blast_radius_analyzed",
|
||||
incident_id=incident.incident_id,
|
||||
target=target,
|
||||
affected_count=result.affected_count,
|
||||
affected_services=incident.affected_services,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"blast_radius_analysis_failed",
|
||||
incident_id=incident.incident_id,
|
||||
target=target,
|
||||
error=str(e),
|
||||
)
|
||||
# 失敗時至少保留 target
|
||||
if target not in incident.affected_services:
|
||||
incident.affected_services.append(target)
|
||||
|
||||
# =========================================================================
|
||||
# 持久化 (DB 層) - Phase 6.4e: 委託給 DualIncidentMemory
|
||||
# =========================================================================
|
||||
|
||||
async def _persist_to_db(self, incident: Incident) -> None:
|
||||
"""
|
||||
持久化到 PostgreSQL (Episodic Memory)
|
||||
|
||||
Phase 6.4e: 委託給 DualIncidentMemory.persist_incident()
|
||||
Redis 已在 Lua Script 中更新,這裡只處理 DB
|
||||
"""
|
||||
try:
|
||||
success = await self._memory.persist_incident(incident)
|
||||
incident.persisted_to_pg = success
|
||||
|
||||
if success:
|
||||
logger.debug(
|
||||
"db_persisted_via_dual_memory",
|
||||
incident_id=incident.incident_id,
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"db_persist_failed_via_dual_memory",
|
||||
incident_id=incident.incident_id,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("db_save_error", error=str(e))
|
||||
|
||||
# =========================================================================
|
||||
# 從 Episodic Memory 載入 (Phase 6.4e 新增)
|
||||
# =========================================================================
|
||||
|
||||
async def get_incident(self, incident_id: str) -> Incident | None:
|
||||
"""
|
||||
取得 Incident
|
||||
|
||||
Phase 6.4e: 委託給 DualIncidentMemory.load_incident()
|
||||
優先從 Working Memory (Redis) 讀取,miss 時從 Episodic (PostgreSQL) 讀取
|
||||
|
||||
Args:
|
||||
incident_id: Incident ID
|
||||
|
||||
Returns:
|
||||
Incident 或 None
|
||||
"""
|
||||
return await self._memory.load_incident(incident_id)
|
||||
|
||||
# =========================================================================
|
||||
# 輔助方法
|
||||
# =========================================================================
|
||||
|
||||
def _parse_lua_incident(self, incident_json: str) -> Incident:
|
||||
"""
|
||||
解析 Lua 返回的 Incident JSON
|
||||
|
||||
修復 Lua cjson 的問題:
|
||||
- cjson.encode 會把空陣列 [] 轉成空物件 {}
|
||||
- 需要手動修復陣列欄位
|
||||
"""
|
||||
data = json.loads(incident_json)
|
||||
|
||||
# 修復可能被轉成空物件的陣列欄位
|
||||
array_fields = ["signals", "affected_services", "proposal_ids"]
|
||||
for field in array_fields:
|
||||
is_empty_dict = isinstance(data[field], dict) and len(data[field]) == 0
|
||||
if field in data and is_empty_dict:
|
||||
data[field] = []
|
||||
|
||||
return Incident.model_validate(data)
|
||||
|
||||
def _parse_signal(self, signal_data: dict[str, Any]) -> Signal:
|
||||
"""解析 Signal"""
|
||||
return Signal(
|
||||
alert_name=signal_data.get("alert_name", "unknown"),
|
||||
severity=self._parse_severity(signal_data.get("severity", "warning")),
|
||||
source=self._parse_source(signal_data.get("source", "manual")),
|
||||
fired_at=datetime.now(UTC),
|
||||
labels=self._parse_dict(signal_data.get("labels", "{}")),
|
||||
annotations=self._parse_dict(signal_data.get("annotations", "{}")),
|
||||
fingerprint=signal_data.get("fingerprint"),
|
||||
)
|
||||
|
||||
def _parse_source(self, source_str: str) -> str:
|
||||
"""解析來源"""
|
||||
valid_sources = {"prometheus", "signoz", "alertmanager", "manual", "telegram"}
|
||||
if source_str.lower() in valid_sources:
|
||||
return source_str.lower()
|
||||
return "manual"
|
||||
|
||||
def _parse_severity(self, severity_str: str) -> Severity:
|
||||
"""解析嚴重度"""
|
||||
mapping = {
|
||||
"critical": Severity.P0,
|
||||
"high": Severity.P1,
|
||||
"warning": Severity.P2,
|
||||
"medium": Severity.P2,
|
||||
"low": Severity.P3,
|
||||
"info": Severity.P3,
|
||||
}
|
||||
return mapping.get(severity_str.lower(), Severity.P2)
|
||||
|
||||
def _parse_dict(self, value: str | dict) -> dict[str, str]:
|
||||
"""解析字典"""
|
||||
if isinstance(value, dict):
|
||||
return {str(k): str(v) for k, v in value.items()}
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
parsed = json.loads(value.replace("'", '"'))
|
||||
return {str(k): str(v) for k, v in parsed.items()}
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
return {}
|
||||
return {}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Phase 16: 絞殺者模式 - Adapter 實作
|
||||
# =============================================================================
|
||||
@@ -698,7 +272,7 @@ class IncidentMemoryAdapter:
|
||||
建立者: Claude Code
|
||||
"""
|
||||
|
||||
def __init__(self, memory: DualIncidentMemory) -> None:
|
||||
def __init__(self, memory: Any) -> None:
|
||||
self._memory = memory
|
||||
self._lua_create_sha: str | None = None
|
||||
self._lua_aggregate_sha: str | None = None
|
||||
@@ -804,20 +378,23 @@ class BlastRadiusAdapter:
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Singleton + 絞殺者模式切換
|
||||
# Singleton (Phase R-R2: 僅保留 lewooogo-brain 版本)
|
||||
# =============================================================================
|
||||
|
||||
_incident_engine: IncidentEngine | None = None
|
||||
_new_incident_engine = None # Type: lewooogo_brain IncidentEngine
|
||||
_new_incident_engine: Any | None = None
|
||||
|
||||
|
||||
def _get_new_engine():
|
||||
"""取得 lewooogo-brain 的 IncidentEngine (Phase 16 新版)"""
|
||||
def get_incident_engine() -> Any:
|
||||
"""
|
||||
取得 Incident Engine 實例 (Singleton)
|
||||
|
||||
Phase R-R2: 統一使用 lewooogo-brain IncidentEngine。
|
||||
回滾方式: git revert Phase R-R2 commit + redeploy。
|
||||
"""
|
||||
global _new_incident_engine
|
||||
if _new_incident_engine is None:
|
||||
from lewooogo_brain.engines import IncidentEngine as NewIncidentEngine
|
||||
|
||||
# 建立 Adapters
|
||||
memory_adapter = IncidentMemoryAdapter(get_incident_memory())
|
||||
blast_adapter = BlastRadiusAdapter()
|
||||
|
||||
@@ -826,33 +403,5 @@ def _get_new_engine():
|
||||
blast_analyzer=blast_adapter,
|
||||
logger=logger,
|
||||
)
|
||||
logger.info("new_incident_engine_initialized", version="lewooogo-brain")
|
||||
logger.info("incident_engine_initialized", version="lewooogo-brain")
|
||||
return _new_incident_engine
|
||||
|
||||
|
||||
def _get_legacy_engine() -> IncidentEngine:
|
||||
"""取得舊版 IncidentEngine"""
|
||||
global _incident_engine
|
||||
if _incident_engine is None:
|
||||
_incident_engine = IncidentEngine()
|
||||
return _incident_engine
|
||||
|
||||
|
||||
def get_incident_engine():
|
||||
"""
|
||||
取得 Incident Engine 實例 (Singleton + 絞殺者模式)
|
||||
|
||||
Phase 16: 根據 USE_NEW_ENGINE 設定切換引擎
|
||||
- False (預設): 使用內嵌版 IncidentEngine
|
||||
- True: 使用 lewooogo-brain 的 IncidentEngine
|
||||
|
||||
回滾方式:
|
||||
kubectl set env deployment/awoooi-api USE_NEW_ENGINE=false
|
||||
"""
|
||||
from src.core.config import settings
|
||||
|
||||
if settings.USE_NEW_ENGINE:
|
||||
logger.debug("using_new_incident_engine", version="lewooogo-brain")
|
||||
return _get_new_engine()
|
||||
else:
|
||||
return _get_legacy_engine()
|
||||
|
||||
@@ -3,9 +3,11 @@ Incident Memory Provider - 事件記憶體提供者
|
||||
============================================
|
||||
Phase 6.4e: DualIncidentMemory 整合
|
||||
Phase 16 R1.2: 絞殺者模式 (Strangler Fig Pattern) 2026-03-26
|
||||
Phase R-R2 (2026-04-01 ogt): 移除內嵌 DualIncidentMemory 重複邏輯,
|
||||
全面切換至 lewooogo-brain。回滾方式: git revert + redeploy。
|
||||
|
||||
設計:
|
||||
- 實作 IIncidentMemory 協定 (Protocol)
|
||||
- IncidentDbAdapter: SQLAlchemy Bridge,注入 lewooogo-brain DualIncidentMemory
|
||||
- 雙層記憶體: Working (Redis) + Episodic (PostgreSQL)
|
||||
- 反向索引: namespace:target -> incident_id
|
||||
|
||||
@@ -13,19 +15,9 @@ Phase 16 R1.2: 絞殺者模式 (Strangler Fig Pattern) 2026-03-26
|
||||
- Working Memory (Redis): 7 天 TTL
|
||||
- Episodic Memory (PostgreSQL): 永久
|
||||
- 反向索引: 30 分鐘 TTL (聚合窗口)
|
||||
|
||||
Phase 16 絞殺者模式:
|
||||
- USE_NEW_ENGINE=False: 使用此內嵌版本 (當前預設)
|
||||
- USE_NEW_ENGINE=True: 使用 lewooogo-brain 套件版本
|
||||
- 回滾指令: kubectl set env deployment/awoooi-api USE_NEW_ENGINE=false
|
||||
- 監控週期: 48 小時驗證期
|
||||
|
||||
NOTE: 此模組為 lewooogo-brain/adapters/incident_memory.py 的 apps/api 內嵌版本
|
||||
Phase 16 R2 階段會在 48 小時驗證通過後移除此檔案
|
||||
"""
|
||||
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import Any, Protocol
|
||||
from typing import Any
|
||||
|
||||
import structlog
|
||||
|
||||
@@ -37,444 +29,6 @@ from src.models.incident import Incident
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Constants
|
||||
# =============================================================================
|
||||
|
||||
WORKING_MEMORY_TTL = 604800 # 7 天
|
||||
AGGREGATION_WINDOW_MINUTES = 30
|
||||
INDEX_TTL = 1800 # 索引 30 分鐘 TTL
|
||||
|
||||
# Redis Key Patterns
|
||||
INCIDENT_KEY_PREFIX = "awoooi:incidents:"
|
||||
INDEX_PREFIX = "awoooi:incidents:index:"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Protocol Definition (與 lewooogo-brain 保持一致)
|
||||
# =============================================================================
|
||||
|
||||
class IIncidentMemory(Protocol):
|
||||
"""Incident 專用記憶體提供者協定"""
|
||||
|
||||
async def load_incident(self, incident_id: str) -> Incident | None:
|
||||
"""從 Working Memory 載入 Incident"""
|
||||
...
|
||||
|
||||
async def save_incident(self, incident: Incident, ttl_seconds: int = WORKING_MEMORY_TTL) -> bool:
|
||||
"""儲存 Incident 到 Working Memory (預設 7 天 TTL)"""
|
||||
...
|
||||
|
||||
async def persist_incident(self, incident: Incident) -> bool:
|
||||
"""持久化到 Episodic Memory (PostgreSQL)"""
|
||||
...
|
||||
|
||||
async def find_related_incident(
|
||||
self,
|
||||
namespace: str,
|
||||
target: str,
|
||||
window_minutes: int = AGGREGATION_WINDOW_MINUTES,
|
||||
) -> Incident | None:
|
||||
"""尋找相關的活躍 Incident (用於聚合)"""
|
||||
...
|
||||
|
||||
async def update_index(
|
||||
self,
|
||||
incident_id: str,
|
||||
namespace: str,
|
||||
target: str,
|
||||
) -> bool:
|
||||
"""更新反向索引 (namespace/target -> incident_id)"""
|
||||
...
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# DualIncidentMemory Implementation
|
||||
# =============================================================================
|
||||
|
||||
class DualIncidentMemory:
|
||||
"""
|
||||
Incident 專用雙層記憶體適配器
|
||||
|
||||
實作 IIncidentMemory 協定:
|
||||
- load_incident: 從 Working/Episodic 載入
|
||||
- save_incident: 儲存到 Working
|
||||
- persist_incident: 持久化到 Episodic
|
||||
- find_related_incident: 透過反向索引尋找相關 Incident
|
||||
- update_index: 更新反向索引
|
||||
|
||||
反向索引結構:
|
||||
Key: awoooi:incidents:index:{namespace}:{target}
|
||||
Value: incident_id
|
||||
TTL: 30 分鐘 (聚合窗口)
|
||||
"""
|
||||
|
||||
def __init__(self, redis_client: Any = None, key_prefix: str = INCIDENT_KEY_PREFIX):
|
||||
"""
|
||||
初始化適配器
|
||||
|
||||
Args:
|
||||
redis_client: Redis 連線客戶端 (可選,預設使用 get_redis())
|
||||
key_prefix: Redis Key 前綴
|
||||
"""
|
||||
self._redis = redis_client
|
||||
self._key_prefix = key_prefix
|
||||
self._index_prefix = INDEX_PREFIX
|
||||
|
||||
def _get_redis(self) -> Any:
|
||||
"""取得 Redis 客戶端 (延遲初始化)"""
|
||||
if self._redis is None:
|
||||
self._redis = get_redis()
|
||||
return self._redis
|
||||
|
||||
def _make_key(self, incident_id: str) -> str:
|
||||
"""生成 Incident Key"""
|
||||
return f"{self._key_prefix}{incident_id}"
|
||||
|
||||
def _make_index_key(self, namespace: str, target: str) -> str:
|
||||
"""生成索引 Key"""
|
||||
return f"{self._index_prefix}{namespace}:{target}"
|
||||
|
||||
async def load_incident(self, incident_id: str) -> Incident | None:
|
||||
"""
|
||||
載入 Incident
|
||||
|
||||
策略:
|
||||
1. 從 Redis (Working Memory) 讀取
|
||||
2. 若 miss,從 PostgreSQL (Episodic) 讀取
|
||||
|
||||
Args:
|
||||
incident_id: Incident ID
|
||||
|
||||
Returns:
|
||||
Incident 或 None
|
||||
"""
|
||||
try:
|
||||
redis_client = self._get_redis()
|
||||
key = self._make_key(incident_id)
|
||||
data = await redis_client.get(key)
|
||||
|
||||
if data is not None:
|
||||
# JSON -> Incident
|
||||
return Incident.model_validate_json(data)
|
||||
|
||||
# Working Memory miss, 嘗試從 Episodic Memory 載入
|
||||
logger.debug("incident_not_found_in_working", incident_id=incident_id)
|
||||
|
||||
async with get_db_context() as db:
|
||||
from sqlalchemy import select
|
||||
stmt = select(IncidentRecord).where(
|
||||
IncidentRecord.incident_id == incident_id
|
||||
)
|
||||
result = await db.execute(stmt)
|
||||
record = result.scalar_one_or_none()
|
||||
|
||||
if record:
|
||||
# 從 DB 重建 Incident
|
||||
incident = self._record_to_incident(record)
|
||||
# 寫回 Working Memory (快取)
|
||||
await self.save_incident(incident)
|
||||
return incident
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error("load_incident_failed", incident_id=incident_id, error=str(e))
|
||||
return None
|
||||
|
||||
async def save_incident(
|
||||
self,
|
||||
incident: Incident,
|
||||
ttl_seconds: int = WORKING_MEMORY_TTL,
|
||||
) -> bool:
|
||||
"""
|
||||
儲存 Incident 到 Working Memory (Redis)
|
||||
|
||||
Args:
|
||||
incident: Incident 物件
|
||||
ttl_seconds: TTL (預設 7 天)
|
||||
|
||||
Returns:
|
||||
是否成功
|
||||
"""
|
||||
try:
|
||||
redis_client = self._get_redis()
|
||||
key = self._make_key(incident.incident_id)
|
||||
json_data = incident.model_dump_json()
|
||||
|
||||
await redis_client.setex(key, ttl_seconds, json_data)
|
||||
|
||||
logger.debug(
|
||||
"incident_saved_to_working",
|
||||
incident_id=incident.incident_id,
|
||||
ttl=ttl_seconds,
|
||||
)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"save_incident_failed",
|
||||
incident_id=incident.incident_id,
|
||||
error=str(e),
|
||||
)
|
||||
return False
|
||||
|
||||
async def persist_incident(self, incident: Incident) -> bool:
|
||||
"""
|
||||
持久化到 Episodic Memory (PostgreSQL)
|
||||
|
||||
Args:
|
||||
incident: Incident 物件
|
||||
|
||||
Returns:
|
||||
是否成功
|
||||
"""
|
||||
try:
|
||||
async with get_db_context() as db:
|
||||
from sqlalchemy import select
|
||||
|
||||
# 檢查是否已存在
|
||||
stmt = select(IncidentRecord).where(
|
||||
IncidentRecord.incident_id == incident.incident_id
|
||||
)
|
||||
result = await db.execute(stmt)
|
||||
existing = result.scalar_one_or_none()
|
||||
|
||||
if existing:
|
||||
# 更新現有記錄
|
||||
existing.status = incident.status.value
|
||||
existing.severity = incident.severity.value
|
||||
existing.signals = [
|
||||
s.model_dump(mode="json") for s in incident.signals
|
||||
]
|
||||
existing.affected_services = incident.affected_services
|
||||
existing.updated_at = incident.updated_at
|
||||
if incident.resolved_at:
|
||||
existing.resolved_at = incident.resolved_at
|
||||
if incident.closed_at:
|
||||
existing.closed_at = incident.closed_at
|
||||
else:
|
||||
# 建立新記錄
|
||||
record = IncidentRecord(
|
||||
incident_id=incident.incident_id,
|
||||
status=incident.status.value,
|
||||
severity=incident.severity.value,
|
||||
signals=[
|
||||
s.model_dump(mode="json") for s in incident.signals
|
||||
],
|
||||
affected_services=incident.affected_services,
|
||||
decision_chain=(
|
||||
incident.decision_chain.model_dump(mode="json")
|
||||
if incident.decision_chain
|
||||
else None
|
||||
),
|
||||
proposal_ids=[str(pid) for pid in incident.proposal_ids],
|
||||
outcome=(
|
||||
incident.outcome.model_dump(mode="json")
|
||||
if incident.outcome
|
||||
else None
|
||||
),
|
||||
created_at=incident.created_at,
|
||||
updated_at=incident.updated_at,
|
||||
resolved_at=incident.resolved_at,
|
||||
closed_at=incident.closed_at,
|
||||
ttl_days=incident.ttl_days,
|
||||
vectorized=incident.vectorized,
|
||||
)
|
||||
db.add(record)
|
||||
|
||||
logger.debug(
|
||||
"incident_persisted_to_episodic",
|
||||
incident_id=incident.incident_id,
|
||||
)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"persist_incident_failed",
|
||||
incident_id=incident.incident_id,
|
||||
error=str(e),
|
||||
)
|
||||
return False
|
||||
|
||||
async def find_related_incident(
|
||||
self,
|
||||
namespace: str,
|
||||
target: str,
|
||||
window_minutes: int = AGGREGATION_WINDOW_MINUTES,
|
||||
) -> Incident | None:
|
||||
"""
|
||||
尋找相關的活躍 Incident (用於聚合)
|
||||
|
||||
透過反向索引快速查找:
|
||||
1. 查詢索引 Key: namespace:target -> incident_id
|
||||
2. 載入 Incident
|
||||
3. 檢查是否仍在聚合窗口內
|
||||
|
||||
Args:
|
||||
namespace: 命名空間
|
||||
target: 目標服務
|
||||
window_minutes: 聚合窗口 (分鐘)
|
||||
|
||||
Returns:
|
||||
相關 Incident 或 None
|
||||
"""
|
||||
try:
|
||||
redis_client = self._get_redis()
|
||||
|
||||
# Step 1: 查詢索引
|
||||
index_key = self._make_index_key(namespace, target)
|
||||
incident_id = await redis_client.get(index_key)
|
||||
|
||||
if incident_id is None:
|
||||
return None
|
||||
|
||||
# 解碼 bytes
|
||||
if isinstance(incident_id, bytes):
|
||||
incident_id = incident_id.decode()
|
||||
|
||||
# Step 2: 載入 Incident
|
||||
incident = await self.load_incident(incident_id)
|
||||
if incident is None:
|
||||
# 索引存在但 Incident 不存在,清除索引
|
||||
await redis_client.delete(index_key)
|
||||
return None
|
||||
|
||||
# Step 3: 檢查聚合窗口
|
||||
window_start = datetime.now(UTC) - timedelta(minutes=window_minutes)
|
||||
if incident.updated_at < window_start:
|
||||
# 超出聚合窗口,不聚合
|
||||
logger.debug(
|
||||
"incident_outside_window",
|
||||
incident_id=incident_id,
|
||||
updated_at=incident.updated_at.isoformat(),
|
||||
)
|
||||
return None
|
||||
|
||||
logger.debug(
|
||||
"found_related_incident",
|
||||
incident_id=incident_id,
|
||||
namespace=namespace,
|
||||
target=target,
|
||||
)
|
||||
return incident
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"find_related_incident_failed",
|
||||
namespace=namespace,
|
||||
target=target,
|
||||
error=str(e),
|
||||
)
|
||||
return None
|
||||
|
||||
async def update_index(
|
||||
self,
|
||||
incident_id: str,
|
||||
namespace: str,
|
||||
target: str,
|
||||
) -> bool:
|
||||
"""
|
||||
更新反向索引
|
||||
|
||||
索引結構:
|
||||
Key: awoooi:incidents:index:{namespace}:{target}
|
||||
Value: incident_id
|
||||
TTL: 30 分鐘
|
||||
|
||||
Args:
|
||||
incident_id: Incident ID
|
||||
namespace: 命名空間
|
||||
target: 目標服務
|
||||
|
||||
Returns:
|
||||
是否成功
|
||||
"""
|
||||
try:
|
||||
redis_client = self._get_redis()
|
||||
index_key = self._make_index_key(namespace, target)
|
||||
await redis_client.setex(index_key, INDEX_TTL, incident_id)
|
||||
|
||||
logger.debug(
|
||||
"index_updated",
|
||||
incident_id=incident_id,
|
||||
namespace=namespace,
|
||||
target=target,
|
||||
ttl=INDEX_TTL,
|
||||
)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"update_index_failed",
|
||||
incident_id=incident_id,
|
||||
namespace=namespace,
|
||||
target=target,
|
||||
error=str(e),
|
||||
)
|
||||
return False
|
||||
|
||||
async def delete_incident(self, incident_id: str) -> bool:
|
||||
"""
|
||||
刪除 Incident
|
||||
|
||||
Args:
|
||||
incident_id: Incident ID
|
||||
|
||||
Returns:
|
||||
是否成功
|
||||
"""
|
||||
try:
|
||||
redis_client = self._get_redis()
|
||||
key = self._make_key(incident_id)
|
||||
result = await redis_client.delete(key)
|
||||
return result > 0
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"delete_incident_failed",
|
||||
incident_id=incident_id,
|
||||
error=str(e),
|
||||
)
|
||||
return False
|
||||
|
||||
def _record_to_incident(self, record: IncidentRecord) -> Incident:
|
||||
"""
|
||||
將 DB Record 轉換為 Incident 物件
|
||||
|
||||
Args:
|
||||
record: IncidentRecord
|
||||
|
||||
Returns:
|
||||
Incident
|
||||
"""
|
||||
from src.models.incident import (
|
||||
IncidentStatus,
|
||||
Severity,
|
||||
Signal,
|
||||
)
|
||||
|
||||
# 重建 Signals
|
||||
signals = []
|
||||
for s in record.signals or []:
|
||||
signals.append(Signal.model_validate(s))
|
||||
|
||||
return Incident(
|
||||
incident_id=record.incident_id,
|
||||
status=IncidentStatus(record.status),
|
||||
severity=Severity(record.severity),
|
||||
signals=signals,
|
||||
affected_services=record.affected_services or [],
|
||||
proposal_ids=record.proposal_ids or [],
|
||||
created_at=record.created_at,
|
||||
updated_at=record.updated_at,
|
||||
resolved_at=record.resolved_at,
|
||||
closed_at=record.closed_at,
|
||||
ttl_days=record.ttl_days or 30,
|
||||
vectorized=record.vectorized or False,
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Phase 16: IncidentDbAdapter (DI 注入實現)
|
||||
# =============================================================================
|
||||
@@ -512,7 +66,6 @@ class IncidentDbAdapter:
|
||||
async with get_db_context() as db:
|
||||
from sqlalchemy import select
|
||||
|
||||
# 檢查是否已存在
|
||||
stmt = select(IncidentRecord).where(
|
||||
IncidentRecord.incident_id == incident.incident_id
|
||||
)
|
||||
@@ -520,7 +73,6 @@ class IncidentDbAdapter:
|
||||
existing = result.scalar_one_or_none()
|
||||
|
||||
if existing:
|
||||
# 更新現有記錄
|
||||
existing.status = incident.status.value
|
||||
existing.severity = incident.severity.value
|
||||
existing.signals = [
|
||||
@@ -533,7 +85,6 @@ class IncidentDbAdapter:
|
||||
if incident.closed_at:
|
||||
existing.closed_at = incident.closed_at
|
||||
else:
|
||||
# 建立新記錄
|
||||
record = IncidentRecord(
|
||||
incident_id=incident.incident_id,
|
||||
status=incident.status.value,
|
||||
@@ -571,7 +122,6 @@ class IncidentDbAdapter:
|
||||
|
||||
def _record_to_incident(self, record: IncidentRecord) -> Incident:
|
||||
"""將 DB Record 轉換為 Incident (lewooogo-brain 版本)"""
|
||||
# 延遲導入 lewooogo-brain 的 Incident
|
||||
from lewooogo_brain.interfaces.incident_processor import (
|
||||
Incident as BrainIncident,
|
||||
)
|
||||
@@ -585,7 +135,6 @@ class IncidentDbAdapter:
|
||||
Signal as BrainSignal,
|
||||
)
|
||||
|
||||
# 重建 Signals
|
||||
signals = []
|
||||
for s in record.signals or []:
|
||||
signals.append(BrainSignal.model_validate(s))
|
||||
@@ -605,70 +154,44 @@ class IncidentDbAdapter:
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Singleton + 絞殺者模式 (Phase 16 R1.2)
|
||||
# Singleton (Phase R-R2: 僅保留 lewooogo-brain 版本)
|
||||
# =============================================================================
|
||||
|
||||
_dual_memory: DualIncidentMemory | None = None
|
||||
_new_engine_memory: Any | None = None # lewooogo-brain 版本
|
||||
_db_adapter: IncidentDbAdapter | None = None # DB Adapter singleton
|
||||
_new_engine_memory: Any | None = None
|
||||
_db_adapter: IncidentDbAdapter | None = None
|
||||
|
||||
|
||||
def get_incident_memory() -> DualIncidentMemory:
|
||||
def get_incident_memory() -> Any:
|
||||
"""
|
||||
取得 DualIncidentMemory 實例 (Singleton)
|
||||
|
||||
Phase 16 絞殺者模式:
|
||||
- USE_NEW_ENGINE=False (預設): 返回內嵌版本
|
||||
- USE_NEW_ENGINE=True: 返回 lewooogo-brain 套件版本
|
||||
|
||||
回滾指令: kubectl set env deployment/awoooi-api USE_NEW_ENGINE=false
|
||||
Phase R-R2: 統一使用 lewooogo-brain 套件版本。
|
||||
回滾方式: git revert Phase R-R2 commit + redeploy。
|
||||
"""
|
||||
from src.core.config import settings
|
||||
|
||||
if settings.USE_NEW_ENGINE:
|
||||
return _get_new_engine_memory()
|
||||
else:
|
||||
return _get_legacy_memory()
|
||||
|
||||
|
||||
def _get_legacy_memory() -> DualIncidentMemory:
|
||||
"""取得內嵌版本 (Phase 16 舊引擎)"""
|
||||
global _dual_memory
|
||||
if _dual_memory is None:
|
||||
_dual_memory = DualIncidentMemory()
|
||||
logger.info("incident_memory_initialized", engine="legacy_embedded")
|
||||
return _dual_memory
|
||||
return _get_new_engine_memory()
|
||||
|
||||
|
||||
def _get_new_engine_memory() -> Any:
|
||||
"""
|
||||
取得 lewooogo-brain 套件版本 (Phase 16 新引擎)
|
||||
取得 lewooogo-brain 套件版本
|
||||
|
||||
注意事項:
|
||||
- 需要 lewooogo-brain 已安裝 (Dockerfile 已配置)
|
||||
- PostgreSQL 透過 IncidentDbAdapter 注入 (Phase 16 DI 模式)
|
||||
- 初次啟用建議 48 小時監控
|
||||
|
||||
回滾: 設定 USE_NEW_ENGINE=false 即可瞬間切回
|
||||
"""
|
||||
global _new_engine_memory, _db_adapter
|
||||
|
||||
if _new_engine_memory is None:
|
||||
try:
|
||||
# 延遲導入: 避免 lewooogo-brain 未安裝時啟動失敗
|
||||
from lewooogo_brain.adapters.incident_memory import (
|
||||
DualIncidentMemory as NewDualIncidentMemory,
|
||||
)
|
||||
|
||||
from src.core.redis_client import get_redis
|
||||
|
||||
redis_client = get_redis()
|
||||
|
||||
# 初始化 DB Adapter (Phase 16 DI 模式)
|
||||
if _db_adapter is None:
|
||||
_db_adapter = IncidentDbAdapter()
|
||||
|
||||
# 初始化 lewooogo-brain 版本 (含 DB Adapter)
|
||||
_new_engine_memory = NewDualIncidentMemory(
|
||||
redis_client=redis_client,
|
||||
db_adapter=_db_adapter,
|
||||
@@ -683,19 +206,17 @@ def _get_new_engine_memory() -> Any:
|
||||
)
|
||||
|
||||
except ImportError as e:
|
||||
# lewooogo-brain 未安裝,降級到內嵌版本
|
||||
logger.warning(
|
||||
"lewooogo_brain_not_available_fallback_to_legacy",
|
||||
logger.error(
|
||||
"lewooogo_brain_not_available",
|
||||
error=str(e),
|
||||
)
|
||||
return _get_legacy_memory()
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
# 其他錯誤,降級到內嵌版本
|
||||
logger.error(
|
||||
"new_engine_init_failed_fallback_to_legacy",
|
||||
"new_engine_init_failed",
|
||||
error=str(e),
|
||||
)
|
||||
return _get_legacy_memory()
|
||||
raise
|
||||
|
||||
return _new_engine_memory
|
||||
|
||||
Reference in New Issue
Block a user