新增 5 個紅區核心服務的 Protocol 介面: - IDecisionManager: 決策狀態機 - ITrustScoreManager: 信任評分引擎 - IIncidentEngine: 事件處理引擎 - IMultiSigRedisService: 分散式鎖服務 - ITelegramSecurityInterceptor: 安全攔截器 符合 leWOOOgo 積木化規範: - 支援依賴注入 (DI) - 便於測試時 Mock - 型別約束確保實作一致性 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
859 lines
29 KiB
Python
859 lines
29 KiB
Python
"""
|
||
Incident Engine v1.2 - Phase 6.4e DualMemory 整合版
|
||
====================================================
|
||
|
||
v1.2 重構內容 (Phase 6.4e):
|
||
- 整合 DualIncidentMemory 進行 DB 持久化
|
||
- 保持 Lua 原子操作進行 Redis Working Memory 更新
|
||
- 支援從 Episodic Memory (PostgreSQL) 回載 Incident
|
||
|
||
v1.1 重構內容 (2026-03-22 架構師審查後修正):
|
||
1. O(1) 反向索引: 廢除 SCAN,改用 namespace/target 索引直查
|
||
2. Lua 原子操作: 廢除 Read-Modify-Write,改用 Redis Lua Script
|
||
3. 併發防護: 確保告警風暴下不會發生 Race Condition
|
||
|
||
功能:
|
||
1. 事件聚合 (Alert Aggregation): 將相關告警聚合到同一個 Incident
|
||
2. 爆炸半徑分析 (Blast Radius): 透過 GraphRAG 分析受影響服務
|
||
3. 智能去重 (Deduplication): 避免重複告警造成 Incident 爆炸
|
||
|
||
設計原則:
|
||
- 30 分鐘時間窗口: 超過此時間的 Incident 視為新事件
|
||
- 關聯判斷: 同 namespace 或同 target 視為相關
|
||
- 狀態過濾: 只聚合 INVESTIGATING 或 MITIGATING 狀態的事件
|
||
|
||
統帥鐵律:
|
||
- 禁止告警風暴: 相關告警必須聚合,減少 Incident 數量
|
||
- 禁止 O(N) 掃描: 所有查詢必須 O(1)
|
||
- 禁止 Race Condition: 所有寫入必須原子操作
|
||
"""
|
||
|
||
import json
|
||
from datetime import UTC, datetime
|
||
from typing import Any, Protocol, runtime_checkable
|
||
|
||
import structlog
|
||
|
||
from src.core.redis_client import get_redis
|
||
from src.models.incident import (
|
||
Incident,
|
||
Severity,
|
||
Signal,
|
||
)
|
||
from src.services.graph_rag import BlastRadiusResult, topology_graph
|
||
from src.services.incident_memory import DualIncidentMemory, get_incident_memory
|
||
|
||
logger = structlog.get_logger(__name__)
|
||
|
||
|
||
# =============================================================================
|
||
# Constants
|
||
# =============================================================================
|
||
|
||
# Redis key patterns
INCIDENT_KEY_PREFIX = "incident:"
INCIDENT_INDEX_NS = "incident:idx:ns:"  # namespace -> incident_id
INCIDENT_INDEX_TARGET = "incident:idx:target:"  # target -> incident_id

# Aggregation window: 30 minutes. Used as the TTL of the reverse-index keys.
AGGREGATION_WINDOW_MINUTES = 30
AGGREGATION_WINDOW_SECONDS = AGGREGATION_WINDOW_MINUTES * 60

# Working Memory TTL: 7 days = 604800 seconds. Used as the TTL of incident keys.
WORKING_MEMORY_TTL = 604800
|
||
|
||
|
||
# =============================================================================
|
||
# Lua Scripts (原子操作)
|
||
# =============================================================================
|
||
|
||
# Lua script: atomically aggregate a Signal into an existing Incident.
#   KEYS[1] = incident key (incident:{id})
#   ARGV[1] = new signal JSON
#   ARGV[2] = new severity string (P0/P1/P2/P3)
#   ARGV[3] = current timestamp, ISO string
#   ARGV[4] = TTL in seconds
# Returns: the updated incident JSON, the unchanged JSON when the signal's
# fingerprint is already present, or nil when the incident key does not exist.
#
# NOTE(review): unlike LUA_CREATE_OR_AGGREGATE, this script does not patch empty
# tables before cjson.encode, so empty array fields may be written back as {}
# - confirm whether that matters for readers of the stored JSON.
# NOTE(review): in this file the script is only registered with SCRIPT LOAD; no
# EVALSHA call using its digest is visible here.
LUA_AGGREGATE_SIGNAL = """
local data = redis.call('GET', KEYS[1])
if not data then
    return nil
end

local incident = cjson.decode(data)

-- Parse new signal
local new_signal = cjson.decode(ARGV[1])

-- Check fingerprint deduplication
local fingerprint = new_signal.fingerprint
if fingerprint and fingerprint ~= cjson.null then
    for _, signal in ipairs(incident.signals) do
        if signal.fingerprint == fingerprint then
            -- Duplicate detected, return unchanged
            return data
        end
    end
end

-- Append signal atomically
table.insert(incident.signals, new_signal)

-- Severity escalation (P0 < P1 < P2 < P3, lower index = more severe)
local severity_order = {P0=0, P1=1, P2=2, P3=3}
local new_sev = ARGV[2]
local cur_sev = incident.severity
if severity_order[new_sev] and severity_order[cur_sev] then
    if severity_order[new_sev] < severity_order[cur_sev] then
        incident.severity = new_sev
    end
end

-- Update timestamp
incident.updated_at = ARGV[3]

-- Serialize and save with TTL
local new_data = cjson.encode(incident)
redis.call('SET', KEYS[1], new_data, 'EX', tonumber(ARGV[4]))

return new_data
"""
|
||
|
||
# Lua script: atomically create an Incident or aggregate into an existing one.
#   KEYS[1] = namespace index key (incident:idx:ns:{ns})
#   KEYS[2] = target index key (incident:idx:target:{target})
#   ARGV[1] = new incident JSON (used only when creating)
#   ARGV[2] = new incident_id
#   ARGV[3] = new signal JSON
#   ARGV[4] = new severity string (P0/P1/P2/P3)
#   ARGV[5] = current timestamp, ISO string
#   ARGV[6] = incident TTL in seconds
#   ARGV[7] = index TTL in seconds (aggregation window)
#   ARGV[8] = incident key prefix
# Returns: "CREATED:{incident_json}" or "AGGREGATED:{incident_json}".
#
# Flow: SET NX on the namespace index decides who creates the incident. When the
# index already exists, the referenced incident is loaded, the signal is appended
# (skipped when its fingerprint is already present), severity is escalated if the
# new one is more severe, and the incident is saved with the incident TTL.
#
# NOTE(review): the incident key is built inside the script from ARGV[8] and an
# incident id, so it is not declared in KEYS. Redis Cluster requires every key a
# script touches to be passed through KEYS - confirm the deployment is single-node.
# NOTE(review): cjson.empty_array is used below; confirm that Redis's bundled
# cjson exposes it. If it evaluates to nil, the assignment removes the field
# instead of forcing [].
LUA_CREATE_OR_AGGREGATE = """
local ns_index_key = KEYS[1]
local target_index_key = KEYS[2]
local new_incident_json = ARGV[1]
local new_incident_id = ARGV[2]
local new_signal_json = ARGV[3]
local new_severity = ARGV[4]
local timestamp = ARGV[5]
local incident_ttl = tonumber(ARGV[6])
local index_ttl = tonumber(ARGV[7])
local incident_key_prefix = ARGV[8]

-- Step 1: 嘗試搶佔 namespace 索引 (SETNX 原子操作)
local ns_set_result = redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl, 'NX')

if ns_set_result then
    -- 我們是第一個!建立新 Incident
    local incident_key = incident_key_prefix .. new_incident_id
    redis.call('SET', incident_key, new_incident_json, 'EX', incident_ttl)

    -- 設置 target 索引
    redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl, 'NX')

    return "CREATED:" .. new_incident_json
end

-- Step 2: 索引已存在,查找現有 Incident ID
local existing_incident_id = redis.call('GET', ns_index_key)
if not existing_incident_id then
    -- 可能剛好過期,嘗試 target 索引
    existing_incident_id = redis.call('GET', target_index_key)
end

if not existing_incident_id then
    -- 兩個索引都沒有,建立新的 (邊緣情況)
    redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl)
    redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl, 'NX')

    local incident_key = incident_key_prefix .. new_incident_id
    redis.call('SET', incident_key, new_incident_json, 'EX', incident_ttl)

    return "CREATED:" .. new_incident_json
end

-- Step 3: 聚合到現有 Incident
local incident_key = incident_key_prefix .. existing_incident_id
local existing_data = redis.call('GET', incident_key)

if not existing_data then
    -- Incident 已過期但索引未過期,建立新的
    redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl)
    redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl)

    local new_incident_key = incident_key_prefix .. new_incident_id
    redis.call('SET', new_incident_key, new_incident_json, 'EX', incident_ttl)

    return "CREATED:" .. new_incident_json
end

-- Step 4: 原子聚合 Signal
local incident = cjson.decode(existing_data)
local new_signal = cjson.decode(new_signal_json)

-- 修復 cjson 空陣列問題 (cjson 會把 [] 變成 {})
if type(incident.proposal_ids) == "table" and next(incident.proposal_ids) == nil then
    incident.proposal_ids = cjson.empty_array
end
if type(incident.affected_services) == "table" and next(incident.affected_services) == nil then
    incident.affected_services = cjson.empty_array
end

-- Fingerprint 去重
local fingerprint = new_signal.fingerprint
if fingerprint and fingerprint ~= cjson.null then
    for _, signal in ipairs(incident.signals) do
        if signal.fingerprint == fingerprint then
            return "AGGREGATED:" .. existing_data
        end
    end
end

-- 附加 Signal
table.insert(incident.signals, new_signal)

-- Severity 升級
local severity_order = {P0=0, P1=1, P2=2, P3=3}
if severity_order[new_severity] and severity_order[incident.severity] then
    if severity_order[new_severity] < severity_order[incident.severity] then
        incident.severity = new_severity
    end
end

-- 更新時間戳
incident.updated_at = timestamp

-- 保存並返回
local updated_json = cjson.encode(incident)
redis.call('SET', incident_key, updated_json, 'EX', incident_ttl)

return "AGGREGATED:" .. updated_json
"""
|
||
|
||
|
||
# =============================================================================
|
||
# Protocol Interface (Phase 17 P1 - 紅區治理)
|
||
# =============================================================================
|
||
|
||
@runtime_checkable
class IIncidentEngine(Protocol):
    """
    Interface definition for IncidentEngine.

    Purpose:
    - Type constraint for dependency injection (DI)
    - Type checking for mocks in tests
    - Conforms to the leWOOOgo modular ("building block") convention

    Tier 3 red-zone service: changes require sign-off by the chief architect.

    NOTE(review): the IncidentEngine class visible in this file defines
    process_signal and get_incident but no update_incident_status, so an
    isinstance() check against this runtime-checkable Protocol would fail
    for it - confirm whether the method lives elsewhere or is still pending.

    @see feedback_lewooogo_modular_enforcement.md
    @see docs/RED_ZONES.md
    """

    async def process_signal(
        self,
        signal_data: dict[str, Any],
    ) -> Incident | None:
        """Process a Signal: atomically create an Incident or aggregate into one."""
        ...

    async def get_incident(self, incident_id: str) -> Incident | None:
        """Return the Incident with the given ID, or None."""
        ...

    async def update_incident_status(
        self,
        incident_id: str,
        status: str,
    ) -> Incident | None:
        """Update the status of an Incident."""
        ...
|
||
|
||
|
||
# =============================================================================
|
||
# Incident Engine v1.1
|
||
# =============================================================================
|
||
|
||
class IncidentEngine:
|
||
"""
|
||
事件引擎 v1.1 - 認知覺醒核心 (效能強化版)
|
||
|
||
職責:
|
||
1. 聚合相關告警到同一 Incident (減少噪音)
|
||
2. 整合 GraphRAG 分析爆炸半徑
|
||
3. 雙層持久化 (Redis + SQLite/PG)
|
||
|
||
v1.1 重構:
|
||
- O(1) 反向索引取代 O(N) SCAN
|
||
- Lua 原子操作取代 Read-Modify-Write
|
||
- 完全消除 Race Condition
|
||
|
||
使用方式:
|
||
engine = IncidentEngine()
|
||
incident = await engine.process_signal(signal_data)
|
||
"""
|
||
|
||
def __init__(self, memory: DualIncidentMemory | None = None) -> None:
    """
    Set up the engine's collaborators.

    Args:
        memory: DualIncidentMemory instance to use. When omitted (or falsy),
            the shared instance returned by get_incident_memory() is used.
    """
    # Digests returned by SCRIPT LOAD; filled in lazily by _ensure_lua_scripts().
    self._lua_aggregate_sha: str | None = None
    self._lua_create_sha: str | None = None

    self._graph = topology_graph
    self._memory = memory if memory else get_incident_memory()
|
||
|
||
# =========================================================================
|
||
# Lua Script 初始化
|
||
# =========================================================================
|
||
|
||
async def _ensure_lua_scripts(self) -> None:
    """Register both Lua scripts with Redis (SCRIPT LOAD) unless already cached."""
    if self._lua_aggregate_sha and self._lua_create_sha:
        return

    client = get_redis()

    # (attribute that caches the digest, label used in the log line, script source)
    pending = (
        ("_lua_aggregate_sha", "aggregate_signal", LUA_AGGREGATE_SIGNAL),
        ("_lua_create_sha", "create_or_aggregate", LUA_CREATE_OR_AGGREGATE),
    )
    for attr_name, label, script_source in pending:
        digest = await client.script_load(script_source)
        setattr(self, attr_name, digest)
        logger.debug("lua_script_loaded", script=label, sha=digest)
|
||
|
||
# =========================================================================
|
||
# 核心方法: 處理 Signal
|
||
# =========================================================================
|
||
|
||
async def process_signal(
    self,
    signal_data: dict[str, Any],
) -> Incident | None:
    """
    Handle one incoming signal end to end.

    Steps:
    1. Parse the raw payload into a Signal.
    2. Run the single Lua script that atomically creates or aggregates.
    3. Ask GraphRAG for the blast radius of the target.
    4. Persist the resulting incident to the DB layer.

    Args:
        signal_data: Raw signal payload. The "namespace" and "target" keys
            are read here and default to "default" and "unknown".

    Returns:
        The created or updated Incident, or None if the atomic step returned
        nothing or any exception was raised (errors are logged, not raised).
    """
    try:
        await self._ensure_lua_scripts()

        parsed = self._parse_signal(signal_data)
        ns = signal_data.get("namespace", "default")
        tgt = signal_data.get("target", "unknown")

        # Record the namespace on the signal's labels.
        parsed.labels["namespace"] = ns

        logger.info(
            "signal_processing",
            alert_name=parsed.alert_name,
            namespace=ns,
            target=tgt,
        )

        incident = await self._atomic_create_or_aggregate(
            signal=parsed,
            namespace=ns,
            target=tgt,
        )

        if incident:
            await self._analyze_blast_radius(incident, tgt)
            await self._persist_to_db(incident)
            return incident

        logger.error(
            "atomic_operation_failed",
            alert_name=parsed.alert_name,
            namespace=ns,
        )
        return None

    except Exception as e:
        logger.exception(
            "process_signal_error",
            error=str(e),
        )
        return None
|
||
|
||
# =========================================================================
|
||
# 原子建立或聚合 (單一 Lua Script - 完全消除 Race Condition)
|
||
# =========================================================================
|
||
|
||
async def _atomic_create_or_aggregate(
    self,
    signal: Signal,
    namespace: str,
    target: str,
) -> Incident | None:
    """
    Create a new Incident or aggregate into an existing one via one Lua script.

    The script (LUA_CREATE_OR_AGGREGATE) claims the namespace index with
    SET NX. The caller that wins creates the incident; everyone else appends
    their signal to the incident the index points at. The whole decision runs
    inside one script invocation.

    If Redis reports that the cached script digest is unknown (NOSCRIPT, e.g.
    after a restart, failover or SCRIPT FLUSH), both scripts are reloaded and
    the call is retried once, so a long-lived engine instance recovers without
    a process restart.

    Args:
        signal: Parsed signal to store.
        namespace: Namespace used for the namespace index key.
        target: Target used for the target index key and as the initial
            affected service of a newly created incident.

    Returns:
        The created or updated Incident, or None when the script returns an
        empty or unrecognised result or any exception occurs (logged).
    """
    created_prefix = "CREATED:"
    aggregated_prefix = "AGGREGATED:"

    redis_client = get_redis()

    ns_index_key = f"{INCIDENT_INDEX_NS}{namespace}"
    target_index_key = f"{INCIDENT_INDEX_TARGET}{target}"

    # Candidate incident; only stored if the script decides to create.
    new_incident = Incident(
        severity=signal.severity,
        signals=[signal],
        affected_services=[target],
    )

    script_args = (
        2,  # number of keys
        ns_index_key,  # KEYS[1]
        target_index_key,  # KEYS[2]
        new_incident.model_dump_json(),  # ARGV[1] - new incident JSON
        new_incident.incident_id,  # ARGV[2] - new incident ID
        signal.model_dump_json(),  # ARGV[3] - new signal JSON
        signal.severity.value,  # ARGV[4] - severity
        datetime.now(UTC).isoformat(),  # ARGV[5] - timestamp
        str(WORKING_MEMORY_TTL),  # ARGV[6] - incident TTL
        str(AGGREGATION_WINDOW_SECONDS),  # ARGV[7] - index TTL
        INCIDENT_KEY_PREFIX,  # ARGV[8] - key prefix
    )

    try:
        # No-op when the digests are cached; protects direct callers that did
        # not go through process_signal().
        await self._ensure_lua_scripts()

        try:
            result = await redis_client.evalsha(self._lua_create_sha, *script_args)
        except Exception as exc:
            # Detect NOSCRIPT without importing the redis package here: the
            # client may expose it as a NoScriptError class or only in the text.
            script_missing = (
                type(exc).__name__ == "NoScriptError" or "NOSCRIPT" in str(exc)
            )
            if not script_missing:
                raise
            logger.warning(
                "lua_script_cache_miss",
                script="create_or_aggregate",
            )
            self._lua_aggregate_sha = None
            self._lua_create_sha = None
            await self._ensure_lua_scripts()
            result = await redis_client.evalsha(self._lua_create_sha, *script_args)

        if not result:
            logger.error(
                "lua_script_returned_nil",
                namespace=namespace,
                target=target,
            )
            return None

        result_str = result.decode() if isinstance(result, bytes) else result

        if result_str.startswith(created_prefix):
            incident = self._parse_lua_incident(result_str[len(created_prefix):])
            logger.info(
                "incident_created_atomic",
                incident_id=incident.incident_id,
                severity=incident.severity.value,
                namespace=namespace,
                signal_count=1,
            )
            return incident

        if result_str.startswith(aggregated_prefix):
            incident = self._parse_lua_incident(result_str[len(aggregated_prefix):])
            logger.info(
                "signal_aggregated_atomic",
                incident_id=incident.incident_id,
                severity=incident.severity.value,
                namespace=namespace,
                signal_count=len(incident.signals),
            )
            return incident

        logger.error(
            "lua_script_unexpected_result",
            result=result_str[:100],
        )
        return None

    except Exception as e:
        logger.exception(
            "atomic_create_or_aggregate_error",
            namespace=namespace,
            target=target,
            error=str(e),
        )
        return None
|
||
|
||
# =========================================================================
|
||
# GraphRAG 整合
|
||
# =========================================================================
|
||
|
||
async def _analyze_blast_radius(
    self,
    incident: Incident,
    target: str,
) -> None:
    """
    Merge the GraphRAG blast radius of ``target`` into the incident.

    Services reported by the topology graph are appended to
    incident.affected_services without duplicates, and the target itself is
    always included. If the graph lookup fails, a warning is logged and only
    the target is added.
    """
    try:
        blast: BlastRadiusResult = self._graph.get_blast_radius(target)

        # Graph results first, then the target itself; skip anything present.
        for name in [*blast.affected_services, target]:
            if name not in incident.affected_services:
                incident.affected_services.append(name)

        logger.info(
            "blast_radius_analyzed",
            incident_id=incident.incident_id,
            target=target,
            affected_count=blast.affected_count,
            affected_services=incident.affected_services,
        )

    except Exception as e:
        logger.warning(
            "blast_radius_analysis_failed",
            incident_id=incident.incident_id,
            target=target,
            error=str(e),
        )
        # Even on failure the target must be listed.
        already_listed = target in incident.affected_services
        if not already_listed:
            incident.affected_services.append(target)
|
||
|
||
# =========================================================================
|
||
# 持久化 (DB 層) - Phase 6.4e: 委託給 DualIncidentMemory
|
||
# =========================================================================
|
||
|
||
async def _persist_to_db(self, incident: Incident) -> None:
    """
    Persist the incident through DualIncidentMemory.persist_incident().

    The boolean outcome is stored on incident.persisted_to_pg and logged.
    Exceptions are logged and swallowed. Redis is not touched here.
    """
    try:
        persisted = await self._memory.persist_incident(incident)
        incident.persisted_to_pg = persisted

        if not persisted:
            logger.warning(
                "db_persist_failed_via_dual_memory",
                incident_id=incident.incident_id,
            )
        else:
            logger.debug(
                "db_persisted_via_dual_memory",
                incident_id=incident.incident_id,
            )

    except Exception as e:
        logger.exception("db_save_error", error=str(e))
|
||
|
||
# =========================================================================
|
||
# 從 Episodic Memory 載入 (Phase 6.4e 新增)
|
||
# =========================================================================
|
||
|
||
async def get_incident(self, incident_id: str) -> Incident | None:
    """
    Fetch an Incident by ID.

    Phase 6.4e: delegates to DualIncidentMemory.load_incident(). According to
    the original author's note, that call reads Working Memory (Redis) first
    and falls back to Episodic Memory (PostgreSQL) on a miss; the lookup order
    is implemented in DualIncidentMemory and is not visible in this file.

    Args:
        incident_id: Incident ID.

    Returns:
        The Incident, or None.
    """
    return await self._memory.load_incident(incident_id)
|
||
|
||
# =========================================================================
|
||
# 輔助方法
|
||
# =========================================================================
|
||
|
||
def _parse_lua_incident(self, incident_json: str) -> Incident:
    """
    Build an Incident from the JSON returned by the Lua script.

    Lua's cjson cannot tell an empty array from an empty object, so an array
    field that was [] may come back as {}. Any of the known array fields that
    is an empty object is normalised back to [] before validation.

    A field that is absent from the payload is left absent (the model decides
    what to do) instead of raising KeyError during normalisation.

    Args:
        incident_json: Incident JSON text with the result prefix removed.

    Returns:
        The validated Incident.

    Raises:
        json.JSONDecodeError: if ``incident_json`` is not valid JSON.
        Whatever Incident.model_validate raises for an invalid payload.
    """
    data = json.loads(incident_json)

    for field in ("signals", "affected_services", "proposal_ids"):
        # .get() tolerates a missing key; only an empty dict equals {}.
        if data.get(field) == {}:
            data[field] = []

    return Incident.model_validate(data)
|
||
|
||
def _parse_signal(self, signal_data: dict[str, Any]) -> Signal:
    """
    Convert a raw signal payload into a Signal.

    Missing keys fall back to defaults: alert_name "unknown", severity
    "warning", source "manual", empty labels and annotations. fired_at is set
    to the current UTC time.
    """
    alert_name = signal_data.get("alert_name", "unknown")
    severity = self._parse_severity(signal_data.get("severity", "warning"))
    source = self._parse_source(signal_data.get("source", "manual"))
    fired_at = datetime.now(UTC)
    labels = self._parse_dict(signal_data.get("labels", "{}"))
    annotations = self._parse_dict(signal_data.get("annotations", "{}"))

    return Signal(
        alert_name=alert_name,
        severity=severity,
        source=source,
        fired_at=fired_at,
        labels=labels,
        annotations=annotations,
        fingerprint=signal_data.get("fingerprint"),
    )
|
||
|
||
def _parse_source(self, source_str: str) -> str:
|
||
"""解析來源"""
|
||
valid_sources = {"prometheus", "signoz", "alertmanager", "manual", "telegram"}
|
||
if source_str.lower() in valid_sources:
|
||
return source_str.lower()
|
||
return "manual"
|
||
|
||
def _parse_severity(self, severity_str: str) -> Severity:
    """
    Map a textual severity (case-insensitive) to a Severity level.

    critical -> P0, high -> P1, warning/medium -> P2, low/info -> P3.
    Anything else is treated as P2.
    """
    level = severity_str.lower()
    if level == "critical":
        return Severity.P0
    if level == "high":
        return Severity.P1
    if level in ("low", "info"):
        return Severity.P3
    # "warning", "medium" and every unrecognised value.
    return Severity.P2
|
||
|
||
def _parse_dict(self, value: str | dict) -> dict[str, str]:
|
||
"""解析字典"""
|
||
if isinstance(value, dict):
|
||
return {str(k): str(v) for k, v in value.items()}
|
||
if isinstance(value, str):
|
||
try:
|
||
parsed = json.loads(value.replace("'", '"'))
|
||
return {str(k): str(v) for k, v in parsed.items()}
|
||
except (json.JSONDecodeError, TypeError):
|
||
return {}
|
||
return {}
|
||
|
||
|
||
# =============================================================================
|
||
# Phase 16: 絞殺者模式 - Adapter 實作
|
||
# =============================================================================
|
||
|
||
class IncidentMemoryAdapter:
    """
    Incident Memory Adapter - implements lewooogo-brain's IIncidentMemory Protocol.

    Phase 16 R1.3: bridges the existing Lua scripts and DualIncidentMemory to
    the new IncidentEngine.

    Version: v1.0
    Created: 2026-03-26 (Taipei time zone)
    Author: Claude Code
    """

    def __init__(self, memory: DualIncidentMemory) -> None:
        self._memory = memory
        # Digests returned by SCRIPT LOAD; filled in by _ensure_lua_scripts().
        self._lua_create_sha: str | None = None
        self._lua_aggregate_sha: str | None = None

    async def _ensure_lua_scripts(self) -> None:
        """Load both Lua scripts into Redis unless the create script is cached."""
        if self._lua_create_sha:
            return
        redis_client = get_redis()
        self._lua_create_sha = await redis_client.script_load(LUA_CREATE_OR_AGGREGATE)
        self._lua_aggregate_sha = await redis_client.script_load(LUA_AGGREGATE_SIGNAL)

    async def load_incident(self, incident_id: str) -> Incident | None:
        """Load an Incident by delegating to DualIncidentMemory.load_incident()."""
        return await self._memory.load_incident(incident_id)

    async def save_incident(
        self, incident: Incident, ttl_seconds: int = 604800
    ) -> bool:
        """
        Write the Incident JSON to its Redis key with the given TTL.

        Returns:
            True on success, False if any exception occurred (logged).
        """
        try:
            redis_client = get_redis()
            key = f"{INCIDENT_KEY_PREFIX}{incident.incident_id}"
            await redis_client.set(key, incident.model_dump_json(), ex=ttl_seconds)
            return True
        except Exception as e:
            logger.exception("save_incident_error", error=str(e))
            return False

    async def persist_incident(self, incident: Incident) -> bool:
        """Persist by delegating to DualIncidentMemory.persist_incident()."""
        return await self._memory.persist_incident(incident)

    async def find_related_incident(
        self,
        namespace: str,
        target: str,
        window_minutes: int = 30,  # noqa: ARG002
    ) -> Incident | None:
        """
        Find an active Incident related to the namespace or target.

        The namespace index is consulted first, then the target index. An
        index entry whose incident can no longer be loaded is skipped, so a
        stale namespace index does not hide a valid target match.

        Args:
            namespace: Namespace to look up in the namespace index.
            target: Target to look up in the target index.
            window_minutes: Unused here; the window is enforced by the TTL of
                the index keys.

        Returns:
            The first loadable Incident referenced by an index, or None.
        """
        redis_client = get_redis()
        index_keys = (
            f"{INCIDENT_INDEX_NS}{namespace}",
            f"{INCIDENT_INDEX_TARGET}{target}",
        )

        tried_ids: set[str] = set()
        for index_key in index_keys:
            raw_id = await redis_client.get(index_key)
            if not raw_id:
                continue

            incident_id = raw_id.decode() if isinstance(raw_id, bytes) else raw_id
            if incident_id in tried_ids:
                # Both indexes point at the same incident; do not load twice.
                continue
            tried_ids.add(incident_id)

            incident = await self.load_incident(incident_id)
            if incident is not None:
                return incident

        return None

    async def update_index(
        self,
        incident_id: str,
        namespace: str,
        target: str,
    ) -> bool:
        """
        Set the namespace and target reverse-index keys for an incident.

        Both keys are written with NX and the aggregation-window TTL, so an
        index entry that already exists is left unchanged.

        Returns:
            True if both SET calls completed, False on any exception (logged).
        """
        try:
            redis_client = get_redis()
            ttl = AGGREGATION_WINDOW_SECONDS

            ns_key = f"{INCIDENT_INDEX_NS}{namespace}"
            target_key = f"{INCIDENT_INDEX_TARGET}{target}"

            await redis_client.set(ns_key, incident_id, ex=ttl, nx=True)
            await redis_client.set(target_key, incident_id, ex=ttl, nx=True)
            return True
        except Exception as e:
            logger.exception("update_index_error", error=str(e))
            return False
|
||
|
||
|
||
class BlastRadiusAdapter:
    """
    Blast Radius Adapter - implements lewooogo-brain's IBlastRadiusAnalyzer Protocol.

    Phase 16 R1.3: wraps the existing topology_graph.

    Version: v1.0
    Created: 2026-03-26 (Taipei time zone)
    Author: Claude Code
    """

    def __init__(self, graph=None) -> None:
        # Fall back to the module-level topology graph when none is supplied.
        self._graph = graph if graph else topology_graph

    def analyze(self, target: str) -> list[str]:
        """
        Return the services affected by a failure of ``target``.

        On any error a warning is logged and the fallback is the target
        alone, or an empty list when the target is "unknown".
        """
        try:
            blast: BlastRadiusResult = self._graph.get_blast_radius(target)
            return blast.affected_services
        except Exception as e:
            logger.warning("blast_radius_analysis_failed", target=target, error=str(e))
            if target == "unknown":
                return []
            return [target]
|
||
|
||
|
||
# =============================================================================
|
||
# Singleton + 絞殺者模式切換
|
||
# =============================================================================
|
||
|
||
# Lazily created singletons, one per engine flavour; see get_incident_engine().
_incident_engine: IncidentEngine | None = None
_new_incident_engine = None  # Type: lewooogo_brain IncidentEngine
|
||
|
||
|
||
def _get_new_engine():
    """Return the lewooogo-brain IncidentEngine (Phase 16), creating it on first use."""
    global _new_incident_engine
    if _new_incident_engine is not None:
        return _new_incident_engine

    # Imported lazily so the package is only required when this engine is used.
    from lewooogo_brain.engines import IncidentEngine as NewIncidentEngine

    _new_incident_engine = NewIncidentEngine(
        memory=IncidentMemoryAdapter(get_incident_memory()),
        blast_analyzer=BlastRadiusAdapter(),
        logger=logger,
    )
    logger.info("new_incident_engine_initialized", version="lewooogo-brain")
    return _new_incident_engine
|
||
|
||
|
||
def _get_legacy_engine() -> IncidentEngine:
    """Return the in-module IncidentEngine singleton, creating it on first use."""
    global _incident_engine
    if _incident_engine is not None:
        return _incident_engine
    _incident_engine = IncidentEngine()
    return _incident_engine
|
||
|
||
|
||
def get_incident_engine():
    """
    Return the Incident Engine instance (singleton, strangler-pattern switch).

    Phase 16: the engine is selected by settings.USE_NEW_ENGINE.
    - Falsy: the IncidentEngine defined in this module
    - Truthy: the IncidentEngine from lewooogo-brain

    Rollback:
        kubectl set env deployment/awoooi-api USE_NEW_ENGINE=false
    """
    from src.core.config import settings

    if not settings.USE_NEW_ENGINE:
        return _get_legacy_engine()

    logger.debug("using_new_incident_engine", version="lewooogo-brain")
    return _get_new_engine()
|