fix(api): Phase R-R2.1 修復架構審查 P0+P1 問題
P0-01: IncidentDbAdapter._record_to_incident 返回型別標注為 Any
(實際返回 BrainIncident,非本地 Incident,避免型別誤報)
P0-02: get_incident_engine() 加入 try/except ImportError 保護
(仿照 get_incident_memory() 錯誤處理模式,確保可觀測性)
P1-01: 移除 IncidentMemoryAdapter 死碼 (-170 行 Lua scripts + _ensure_lua_scripts)
(lewooogo-brain 不調用此方法,已確認)
P1-03: IncidentMemoryAdapter.save_incident() 委派給 self._memory
(修復 key prefix 不一致: "incident:" vs "awoooi:incidents:")
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -6,8 +6,11 @@ Phase R-R2 (2026-04-01 ogt): 移除內嵌 IncidentEngine 重複邏輯,
|
||||
全面切換至 lewooogo-brain IncidentEngine。
|
||||
完整舊版本歸檔: src/services/_archived/incident_engine_v1.py
|
||||
|
||||
Phase R-R2.1 (2026-04-01 ogt): 修復 P0-02 ImportError 保護 + P1-01 死碼移除
|
||||
+ P1-03 save_incident key prefix 不一致 (IncidentMemoryAdapter 委派給 self._memory)
|
||||
|
||||
架構:
|
||||
- IncidentMemoryAdapter: 橋接現有 Lua Scripts + DualIncidentMemory
|
||||
- IncidentMemoryAdapter: 橋接 DualIncidentMemory 接口到 lewooogo-brain IncidentEngine
|
||||
- BlastRadiusAdapter: 包裝 topology_graph 注入 lewooogo-brain
|
||||
- get_incident_engine(): 統一入口 (Singleton)
|
||||
|
||||
@@ -33,189 +36,14 @@ logger = structlog.get_logger(__name__)
|
||||
# Constants
|
||||
# =============================================================================
|
||||
|
||||
# Redis Key Patterns
|
||||
INCIDENT_KEY_PREFIX = "incident:"
|
||||
# Redis Key Patterns (索引 Key,與 lewooogo-brain DualIncidentMemory 保持一致)
|
||||
INCIDENT_INDEX_NS = "incident:idx:ns:" # namespace → incident_id
|
||||
INCIDENT_INDEX_TARGET = "incident:idx:target:" # target → incident_id
|
||||
|
||||
# 聚合時間窗口: 30 分鐘
|
||||
# 聚合時間窗口
|
||||
AGGREGATION_WINDOW_MINUTES = 30
|
||||
AGGREGATION_WINDOW_SECONDS = AGGREGATION_WINDOW_MINUTES * 60
|
||||
|
||||
# Working Memory TTL: 7 天 = 604800 秒
|
||||
WORKING_MEMORY_TTL = 604800
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Lua Scripts (原子操作) - 供 IncidentMemoryAdapter 使用
|
||||
# =============================================================================
|
||||
|
||||
# Lua Script: 原子聚合 Signal 到 Incident
|
||||
# KEYS[1] = incident key (incident:{id})
|
||||
# ARGV[1] = new signal JSON
|
||||
# ARGV[2] = new severity string (P0/P1/P2/P3)
|
||||
# ARGV[3] = current timestamp ISO string
|
||||
# ARGV[4] = TTL seconds
|
||||
# Returns: updated incident JSON or nil if not found
|
||||
LUA_AGGREGATE_SIGNAL = """
|
||||
local data = redis.call('GET', KEYS[1])
|
||||
if not data then
|
||||
return nil
|
||||
end
|
||||
|
||||
local incident = cjson.decode(data)
|
||||
|
||||
-- Parse new signal
|
||||
local new_signal = cjson.decode(ARGV[1])
|
||||
|
||||
-- Check fingerprint deduplication
|
||||
local fingerprint = new_signal.fingerprint
|
||||
if fingerprint and fingerprint ~= cjson.null then
|
||||
for _, signal in ipairs(incident.signals) do
|
||||
if signal.fingerprint == fingerprint then
|
||||
-- Duplicate detected, return unchanged
|
||||
return data
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
-- Append signal atomically
|
||||
table.insert(incident.signals, new_signal)
|
||||
|
||||
-- Severity escalation (P0 < P1 < P2 < P3, lower index = more severe)
|
||||
local severity_order = {P0=0, P1=1, P2=2, P3=3}
|
||||
local new_sev = ARGV[2]
|
||||
local cur_sev = incident.severity
|
||||
if severity_order[new_sev] and severity_order[cur_sev] then
|
||||
if severity_order[new_sev] < severity_order[cur_sev] then
|
||||
incident.severity = new_sev
|
||||
end
|
||||
end
|
||||
|
||||
-- Update timestamp
|
||||
incident.updated_at = ARGV[3]
|
||||
|
||||
-- Serialize and save with TTL
|
||||
local new_data = cjson.encode(incident)
|
||||
redis.call('SET', KEYS[1], new_data, 'EX', tonumber(ARGV[4]))
|
||||
|
||||
return new_data
|
||||
"""
|
||||
|
||||
# Lua Script: 原子建立或聚合 Incident (完全消除 Race Condition)
|
||||
# KEYS[1] = namespace index key (incident:idx:ns:{ns})
|
||||
# KEYS[2] = target index key (incident:idx:target:{target})
|
||||
# ARGV[1] = new incident JSON (if creating)
|
||||
# ARGV[2] = new incident_id
|
||||
# ARGV[3] = new signal JSON
|
||||
# ARGV[4] = new severity string (P0/P1/P2/P3)
|
||||
# ARGV[5] = current timestamp ISO string
|
||||
# ARGV[6] = incident TTL seconds
|
||||
# ARGV[7] = index TTL seconds (aggregation window)
|
||||
# ARGV[8] = incident key prefix
|
||||
# Returns: "CREATED:{incident_json}" or "AGGREGATED:{incident_json}"
|
||||
LUA_CREATE_OR_AGGREGATE = """
|
||||
local ns_index_key = KEYS[1]
|
||||
local target_index_key = KEYS[2]
|
||||
local new_incident_json = ARGV[1]
|
||||
local new_incident_id = ARGV[2]
|
||||
local new_signal_json = ARGV[3]
|
||||
local new_severity = ARGV[4]
|
||||
local timestamp = ARGV[5]
|
||||
local incident_ttl = tonumber(ARGV[6])
|
||||
local index_ttl = tonumber(ARGV[7])
|
||||
local incident_key_prefix = ARGV[8]
|
||||
|
||||
-- Step 1: 嘗試搶佔 namespace 索引 (SETNX 原子操作)
|
||||
local ns_set_result = redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl, 'NX')
|
||||
|
||||
if ns_set_result then
|
||||
-- 我們是第一個!建立新 Incident
|
||||
local incident_key = incident_key_prefix .. new_incident_id
|
||||
redis.call('SET', incident_key, new_incident_json, 'EX', incident_ttl)
|
||||
|
||||
-- 設置 target 索引
|
||||
redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl, 'NX')
|
||||
|
||||
return "CREATED:" .. new_incident_json
|
||||
end
|
||||
|
||||
-- Step 2: 索引已存在,查找現有 Incident ID
|
||||
local existing_incident_id = redis.call('GET', ns_index_key)
|
||||
if not existing_incident_id then
|
||||
-- 可能剛好過期,嘗試 target 索引
|
||||
existing_incident_id = redis.call('GET', target_index_key)
|
||||
end
|
||||
|
||||
if not existing_incident_id then
|
||||
-- 兩個索引都沒有,建立新的 (邊緣情況)
|
||||
redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl)
|
||||
redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl, 'NX')
|
||||
|
||||
local incident_key = incident_key_prefix .. new_incident_id
|
||||
redis.call('SET', incident_key, new_incident_json, 'EX', incident_ttl)
|
||||
|
||||
return "CREATED:" .. new_incident_json
|
||||
end
|
||||
|
||||
-- Step 3: 聚合到現有 Incident
|
||||
local incident_key = incident_key_prefix .. existing_incident_id
|
||||
local existing_data = redis.call('GET', incident_key)
|
||||
|
||||
if not existing_data then
|
||||
-- Incident 已過期但索引未過期,建立新的
|
||||
redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl)
|
||||
redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl)
|
||||
|
||||
local new_incident_key = incident_key_prefix .. new_incident_id
|
||||
redis.call('SET', new_incident_key, new_incident_json, 'EX', incident_ttl)
|
||||
|
||||
return "CREATED:" .. new_incident_json
|
||||
end
|
||||
|
||||
-- Step 4: 原子聚合 Signal
|
||||
local incident = cjson.decode(existing_data)
|
||||
local new_signal = cjson.decode(new_signal_json)
|
||||
|
||||
-- 修復 cjson 空陣列問題 (cjson 會把 [] 變成 {})
|
||||
if type(incident.proposal_ids) == "table" and next(incident.proposal_ids) == nil then
|
||||
incident.proposal_ids = cjson.empty_array
|
||||
end
|
||||
if type(incident.affected_services) == "table" and next(incident.affected_services) == nil then
|
||||
incident.affected_services = cjson.empty_array
|
||||
end
|
||||
|
||||
-- Fingerprint 去重
|
||||
local fingerprint = new_signal.fingerprint
|
||||
if fingerprint and fingerprint ~= cjson.null then
|
||||
for _, signal in ipairs(incident.signals) do
|
||||
if signal.fingerprint == fingerprint then
|
||||
return "AGGREGATED:" .. existing_data
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
-- 附加 Signal
|
||||
table.insert(incident.signals, new_signal)
|
||||
|
||||
-- Severity 升級
|
||||
local severity_order = {P0=0, P1=1, P2=2, P3=3}
|
||||
if severity_order[new_severity] and severity_order[incident.severity] then
|
||||
if severity_order[new_severity] < severity_order[incident.severity] then
|
||||
incident.severity = new_severity
|
||||
end
|
||||
end
|
||||
|
||||
-- 更新時間戳
|
||||
incident.updated_at = timestamp
|
||||
|
||||
-- 保存並返回
|
||||
local updated_json = cjson.encode(incident)
|
||||
redis.call('SET', incident_key, updated_json, 'EX', incident_ttl)
|
||||
|
||||
return "AGGREGATED:" .. updated_json
|
||||
"""
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Protocol Interface (Phase 17 P1 - 紅區治理)
|
||||
@@ -265,45 +93,28 @@ class IncidentMemoryAdapter:
|
||||
"""
|
||||
Incident Memory Adapter - 實作 lewooogo-brain 的 IIncidentMemory Protocol
|
||||
|
||||
Phase 16 R1.3: 橋接現有 Lua Scripts + DualIncidentMemory 到新 IncidentEngine
|
||||
Phase 16 R1.3: 橋接 DualIncidentMemory 接口到 lewooogo-brain IncidentEngine
|
||||
Phase R-R2.1 (2026-04-01 ogt): 修復 save_incident key prefix 不一致
|
||||
- 全部委派給 self._memory (lewooogo-brain DualIncidentMemory)
|
||||
- 不再直接存取 Redis,確保 key prefix 一致性
|
||||
|
||||
版本: v1.0
|
||||
建立: 2026-03-26 (台北時區)
|
||||
建立者: Claude Code
|
||||
NOTE: self._memory 是 lewooogo-brain.adapters.incident_memory.DualIncidentMemory
|
||||
key_prefix = "awoooi:incidents" (由 get_incident_memory() 初始化時設定)
|
||||
"""
|
||||
|
||||
def __init__(self, memory: Any) -> None:
|
||||
self._memory = memory
|
||||
self._lua_create_sha: str | None = None
|
||||
self._lua_aggregate_sha: str | None = None
|
||||
|
||||
async def _ensure_lua_scripts(self) -> None:
|
||||
"""確保 Lua Scripts 已載入"""
|
||||
if self._lua_create_sha:
|
||||
return
|
||||
redis_client = get_redis()
|
||||
self._lua_create_sha = await redis_client.script_load(LUA_CREATE_OR_AGGREGATE)
|
||||
self._lua_aggregate_sha = await redis_client.script_load(LUA_AGGREGATE_SIGNAL)
|
||||
|
||||
async def load_incident(self, incident_id: str) -> Incident | None:
|
||||
async def load_incident(self, incident_id: str) -> Any:
|
||||
"""從 Working Memory 載入 Incident"""
|
||||
return await self._memory.load_incident(incident_id)
|
||||
|
||||
async def save_incident(
|
||||
self, incident: Incident, ttl_seconds: int = 604800
|
||||
) -> bool:
|
||||
"""儲存 Incident 到 Working Memory"""
|
||||
try:
|
||||
redis_client = get_redis()
|
||||
key = f"{INCIDENT_KEY_PREFIX}{incident.incident_id}"
|
||||
await redis_client.set(key, incident.model_dump_json(), ex=ttl_seconds)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.exception("save_incident_error", error=str(e))
|
||||
return False
|
||||
async def save_incident(self, incident: Any, ttl_seconds: int = 604800) -> bool:
|
||||
"""儲存 Incident 到 Working Memory (委派給 DualIncidentMemory,保持 key prefix 一致)"""
|
||||
return await self._memory.save_incident(incident, ttl_seconds)
|
||||
|
||||
async def persist_incident(self, incident: Incident) -> bool:
|
||||
"""持久化到 Episodic Memory (PostgreSQL)"""
|
||||
async def persist_incident(self, incident: Any) -> bool:
|
||||
"""持久化到 Episodic Memory (PostgreSQL,透過 IncidentDbAdapter)"""
|
||||
return await self._memory.persist_incident(incident)
|
||||
|
||||
async def find_related_incident(
|
||||
@@ -311,7 +122,7 @@ class IncidentMemoryAdapter:
|
||||
namespace: str,
|
||||
target: str,
|
||||
window_minutes: int = 30, # noqa: ARG002
|
||||
) -> Incident | None:
|
||||
) -> Any:
|
||||
"""尋找相關的活躍 Incident (用於聚合)"""
|
||||
redis_client = get_redis()
|
||||
|
||||
@@ -368,7 +179,7 @@ class BlastRadiusAdapter:
|
||||
self._graph = graph or topology_graph
|
||||
|
||||
def analyze(self, target: str) -> list[str]:
|
||||
"""分析受影響的服務列表"""
|
||||
"""分析受影響的服務列表 (外部依賴: topology_graph GraphRAG,失敗時降級返回 [target])"""
|
||||
try:
|
||||
result: BlastRadiusResult = self._graph.get_blast_radius(target)
|
||||
return result.affected_services
|
||||
@@ -390,18 +201,32 @@ def get_incident_engine() -> Any:
|
||||
|
||||
Phase R-R2: 統一使用 lewooogo-brain IncidentEngine。
|
||||
回滾方式: git revert Phase R-R2 commit + redeploy。
|
||||
|
||||
Raises:
|
||||
ImportError: lewooogo-brain 未安裝
|
||||
RuntimeError: 引擎初始化失敗
|
||||
"""
|
||||
global _new_incident_engine
|
||||
if _new_incident_engine is None:
|
||||
from lewooogo_brain.engines import IncidentEngine as NewIncidentEngine
|
||||
try:
|
||||
from lewooogo_brain.engines import IncidentEngine as NewIncidentEngine
|
||||
|
||||
memory_adapter = IncidentMemoryAdapter(get_incident_memory())
|
||||
blast_adapter = BlastRadiusAdapter()
|
||||
memory_adapter = IncidentMemoryAdapter(get_incident_memory())
|
||||
blast_adapter = BlastRadiusAdapter()
|
||||
|
||||
_new_incident_engine = NewIncidentEngine(
|
||||
memory=memory_adapter,
|
||||
blast_analyzer=blast_adapter,
|
||||
logger=logger,
|
||||
)
|
||||
logger.info("incident_engine_initialized", version="lewooogo-brain")
|
||||
|
||||
except ImportError as e:
|
||||
logger.error("lewooogo_brain_engines_not_available", error=str(e))
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
logger.error("incident_engine_init_failed", error=str(e))
|
||||
raise
|
||||
|
||||
_new_incident_engine = NewIncidentEngine(
|
||||
memory=memory_adapter,
|
||||
blast_analyzer=blast_adapter,
|
||||
logger=logger,
|
||||
)
|
||||
logger.info("incident_engine_initialized", version="lewooogo-brain")
|
||||
return _new_incident_engine
|
||||
|
||||
@@ -120,7 +120,9 @@ class IncidentDbAdapter:
|
||||
logger.error("db_adapter_save_failed", incident_id=incident.incident_id, error=str(e))
|
||||
return False
|
||||
|
||||
def _record_to_incident(self, record: IncidentRecord) -> Incident:
|
||||
def _record_to_incident(self, record: IncidentRecord) -> Any:
|
||||
# 2026-04-01 ogt: 返回 BrainIncident (lewooogo-brain),非本地 Incident
|
||||
# 型別為 Any 避免靜態分析誤報,實際為 lewooogo_brain.interfaces.incident_processor.Incident
|
||||
"""將 DB Record 轉換為 Incident (lewooogo-brain 版本)"""
|
||||
from lewooogo_brain.interfaces.incident_processor import (
|
||||
Incident as BrainIncident,
|
||||
|
||||
Reference in New Issue
Block a user