fix(api): Phase R-R2.1 修復架構審查 P0+P1 問題

P0-01: IncidentDbAdapter._record_to_incident 返回型別標注為 Any
       (實際返回 BrainIncident,非本地 Incident,避免型別誤報)
P0-02: get_incident_engine() 加入 try/except ImportError 保護
       (仿照 get_incident_memory() 錯誤處理模式,確保可觀測性)
P1-01: 移除 IncidentMemoryAdapter 死碼 (-170 行 Lua scripts + _ensure_lua_scripts)
       (lewooogo-brain 不調用此方法,已確認)
P1-03: IncidentMemoryAdapter.save_incident() 委派給 self._memory
       (修復 key prefix 不一致: "incident:" vs "awoooi:incidents:")

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-31 22:15:06 +08:00
parent 67ef98e737
commit d17b67c823
2 changed files with 46 additions and 219 deletions

View File

@@ -6,8 +6,11 @@ Phase R-R2 (2026-04-01 ogt): 移除內嵌 IncidentEngine 重複邏輯,
全面切換至 lewooogo-brain IncidentEngine。
完整舊版本歸檔: src/services/_archived/incident_engine_v1.py
Phase R-R2.1 (2026-04-01 ogt): 修復 P0-02 ImportError 保護 + P1-01 死碼移除
+ P1-03 save_incident key prefix 不一致 (IncidentMemoryAdapter 委派給 self._memory)
架構:
- IncidentMemoryAdapter: 橋接現有 Lua Scripts + DualIncidentMemory
- IncidentMemoryAdapter: 橋接 DualIncidentMemory 接口到 lewooogo-brain IncidentEngine
- BlastRadiusAdapter: 包裝 topology_graph 注入 lewooogo-brain
- get_incident_engine(): 統一入口 (Singleton)
@@ -33,189 +36,14 @@ logger = structlog.get_logger(__name__)
# Constants
# =============================================================================
# Redis Key Patterns
INCIDENT_KEY_PREFIX = "incident:"
# Redis Key Patterns (索引 Key與 lewooogo-brain DualIncidentMemory 保持一致)
INCIDENT_INDEX_NS = "incident:idx:ns:" # namespace → incident_id
INCIDENT_INDEX_TARGET = "incident:idx:target:" # target → incident_id
# 聚合時間窗口: 30 分鐘
# 聚合時間窗口
AGGREGATION_WINDOW_MINUTES = 30
AGGREGATION_WINDOW_SECONDS = AGGREGATION_WINDOW_MINUTES * 60
# Working Memory TTL: 7 天 = 604800 秒
WORKING_MEMORY_TTL = 604800
# =============================================================================
# Lua Scripts (原子操作) - 供 IncidentMemoryAdapter 使用
# =============================================================================
# Lua Script: 原子聚合 Signal 到 Incident
# KEYS[1] = incident key (incident:{id})
# ARGV[1] = new signal JSON
# ARGV[2] = new severity string (P0/P1/P2/P3)
# ARGV[3] = current timestamp ISO string
# ARGV[4] = TTL seconds
# Returns: updated incident JSON or nil if not found
LUA_AGGREGATE_SIGNAL = """
local data = redis.call('GET', KEYS[1])
if not data then
return nil
end
local incident = cjson.decode(data)
-- Parse new signal
local new_signal = cjson.decode(ARGV[1])
-- Check fingerprint deduplication
local fingerprint = new_signal.fingerprint
if fingerprint and fingerprint ~= cjson.null then
for _, signal in ipairs(incident.signals) do
if signal.fingerprint == fingerprint then
-- Duplicate detected, return unchanged
return data
end
end
end
-- Append signal atomically
table.insert(incident.signals, new_signal)
-- Severity escalation (P0 < P1 < P2 < P3, lower index = more severe)
local severity_order = {P0=0, P1=1, P2=2, P3=3}
local new_sev = ARGV[2]
local cur_sev = incident.severity
if severity_order[new_sev] and severity_order[cur_sev] then
if severity_order[new_sev] < severity_order[cur_sev] then
incident.severity = new_sev
end
end
-- Update timestamp
incident.updated_at = ARGV[3]
-- Serialize and save with TTL
local new_data = cjson.encode(incident)
redis.call('SET', KEYS[1], new_data, 'EX', tonumber(ARGV[4]))
return new_data
"""
# Lua Script: 原子建立或聚合 Incident (完全消除 Race Condition)
# KEYS[1] = namespace index key (incident:idx:ns:{ns})
# KEYS[2] = target index key (incident:idx:target:{target})
# ARGV[1] = new incident JSON (if creating)
# ARGV[2] = new incident_id
# ARGV[3] = new signal JSON
# ARGV[4] = new severity string (P0/P1/P2/P3)
# ARGV[5] = current timestamp ISO string
# ARGV[6] = incident TTL seconds
# ARGV[7] = index TTL seconds (aggregation window)
# ARGV[8] = incident key prefix
# Returns: "CREATED:{incident_json}" or "AGGREGATED:{incident_json}"
LUA_CREATE_OR_AGGREGATE = """
local ns_index_key = KEYS[1]
local target_index_key = KEYS[2]
local new_incident_json = ARGV[1]
local new_incident_id = ARGV[2]
local new_signal_json = ARGV[3]
local new_severity = ARGV[4]
local timestamp = ARGV[5]
local incident_ttl = tonumber(ARGV[6])
local index_ttl = tonumber(ARGV[7])
local incident_key_prefix = ARGV[8]
-- Step 1: 嘗試搶佔 namespace 索引 (SETNX 原子操作)
local ns_set_result = redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl, 'NX')
if ns_set_result then
-- 我們是第一個!建立新 Incident
local incident_key = incident_key_prefix .. new_incident_id
redis.call('SET', incident_key, new_incident_json, 'EX', incident_ttl)
-- 設置 target 索引
redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl, 'NX')
return "CREATED:" .. new_incident_json
end
-- Step 2: 索引已存在,查找現有 Incident ID
local existing_incident_id = redis.call('GET', ns_index_key)
if not existing_incident_id then
-- 可能剛好過期,嘗試 target 索引
existing_incident_id = redis.call('GET', target_index_key)
end
if not existing_incident_id then
-- 兩個索引都沒有,建立新的 (邊緣情況)
redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl)
redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl, 'NX')
local incident_key = incident_key_prefix .. new_incident_id
redis.call('SET', incident_key, new_incident_json, 'EX', incident_ttl)
return "CREATED:" .. new_incident_json
end
-- Step 3: 聚合到現有 Incident
local incident_key = incident_key_prefix .. existing_incident_id
local existing_data = redis.call('GET', incident_key)
if not existing_data then
-- Incident 已過期但索引未過期,建立新的
redis.call('SET', ns_index_key, new_incident_id, 'EX', index_ttl)
redis.call('SET', target_index_key, new_incident_id, 'EX', index_ttl)
local new_incident_key = incident_key_prefix .. new_incident_id
redis.call('SET', new_incident_key, new_incident_json, 'EX', incident_ttl)
return "CREATED:" .. new_incident_json
end
-- Step 4: 原子聚合 Signal
local incident = cjson.decode(existing_data)
local new_signal = cjson.decode(new_signal_json)
-- 修復 cjson 空陣列問題 (cjson 會把 [] 變成 {})
if type(incident.proposal_ids) == "table" and next(incident.proposal_ids) == nil then
incident.proposal_ids = cjson.empty_array
end
if type(incident.affected_services) == "table" and next(incident.affected_services) == nil then
incident.affected_services = cjson.empty_array
end
-- Fingerprint 去重
local fingerprint = new_signal.fingerprint
if fingerprint and fingerprint ~= cjson.null then
for _, signal in ipairs(incident.signals) do
if signal.fingerprint == fingerprint then
return "AGGREGATED:" .. existing_data
end
end
end
-- 附加 Signal
table.insert(incident.signals, new_signal)
-- Severity 升級
local severity_order = {P0=0, P1=1, P2=2, P3=3}
if severity_order[new_severity] and severity_order[incident.severity] then
if severity_order[new_severity] < severity_order[incident.severity] then
incident.severity = new_severity
end
end
-- 更新時間戳
incident.updated_at = timestamp
-- 保存並返回
local updated_json = cjson.encode(incident)
redis.call('SET', incident_key, updated_json, 'EX', incident_ttl)
return "AGGREGATED:" .. updated_json
"""
# =============================================================================
# Protocol Interface (Phase 17 P1 - 紅區治理)
@@ -265,45 +93,28 @@ class IncidentMemoryAdapter:
"""
Incident Memory Adapter - 實作 lewooogo-brain 的 IIncidentMemory Protocol
Phase 16 R1.3: 橋接現有 Lua Scripts + DualIncidentMemory 到新 IncidentEngine
Phase 16 R1.3: 橋接 DualIncidentMemory 接口到 lewooogo-brain IncidentEngine
Phase R-R2.1 (2026-04-01 ogt): 修復 save_incident key prefix 不一致
- 全部委派給 self._memory (lewooogo-brain DualIncidentMemory)
- 不再直接存取 Redis確保 key prefix 一致性
版本: v1.0
建立: 2026-03-26 (台北時區)
建立者: Claude Code
NOTE: self._memory 是 lewooogo-brain.adapters.incident_memory.DualIncidentMemory
key_prefix = "awoooi:incidents" (由 get_incident_memory() 初始化時設定)
"""
def __init__(self, memory: Any) -> None:
self._memory = memory
self._lua_create_sha: str | None = None
self._lua_aggregate_sha: str | None = None
async def _ensure_lua_scripts(self) -> None:
"""確保 Lua Scripts 已載入"""
if self._lua_create_sha:
return
redis_client = get_redis()
self._lua_create_sha = await redis_client.script_load(LUA_CREATE_OR_AGGREGATE)
self._lua_aggregate_sha = await redis_client.script_load(LUA_AGGREGATE_SIGNAL)
async def load_incident(self, incident_id: str) -> Incident | None:
async def load_incident(self, incident_id: str) -> Any:
"""從 Working Memory 載入 Incident"""
return await self._memory.load_incident(incident_id)
async def save_incident(
self, incident: Incident, ttl_seconds: int = 604800
) -> bool:
"""儲存 Incident 到 Working Memory"""
try:
redis_client = get_redis()
key = f"{INCIDENT_KEY_PREFIX}{incident.incident_id}"
await redis_client.set(key, incident.model_dump_json(), ex=ttl_seconds)
return True
except Exception as e:
logger.exception("save_incident_error", error=str(e))
return False
async def save_incident(self, incident: Any, ttl_seconds: int = 604800) -> bool:
"""儲存 Incident 到 Working Memory (委派給 DualIncidentMemory保持 key prefix 一致)"""
return await self._memory.save_incident(incident, ttl_seconds)
async def persist_incident(self, incident: Incident) -> bool:
"""持久化到 Episodic Memory (PostgreSQL)"""
async def persist_incident(self, incident: Any) -> bool:
"""持久化到 Episodic Memory (PostgreSQL,透過 IncidentDbAdapter)"""
return await self._memory.persist_incident(incident)
async def find_related_incident(
@@ -311,7 +122,7 @@ class IncidentMemoryAdapter:
namespace: str,
target: str,
window_minutes: int = 30, # noqa: ARG002
) -> Incident | None:
) -> Any:
"""尋找相關的活躍 Incident (用於聚合)"""
redis_client = get_redis()
@@ -368,7 +179,7 @@ class BlastRadiusAdapter:
self._graph = graph or topology_graph
def analyze(self, target: str) -> list[str]:
"""分析受影響的服務列表"""
"""分析受影響的服務列表 (外部依賴: topology_graph GraphRAG失敗時降級返回 [target])"""
try:
result: BlastRadiusResult = self._graph.get_blast_radius(target)
return result.affected_services
@@ -390,18 +201,32 @@ def get_incident_engine() -> Any:
Phase R-R2: 統一使用 lewooogo-brain IncidentEngine。
回滾方式: git revert Phase R-R2 commit + redeploy。
Raises:
ImportError: lewooogo-brain 未安裝
RuntimeError: 引擎初始化失敗
"""
global _new_incident_engine
if _new_incident_engine is None:
from lewooogo_brain.engines import IncidentEngine as NewIncidentEngine
try:
from lewooogo_brain.engines import IncidentEngine as NewIncidentEngine
memory_adapter = IncidentMemoryAdapter(get_incident_memory())
blast_adapter = BlastRadiusAdapter()
memory_adapter = IncidentMemoryAdapter(get_incident_memory())
blast_adapter = BlastRadiusAdapter()
_new_incident_engine = NewIncidentEngine(
memory=memory_adapter,
blast_analyzer=blast_adapter,
logger=logger,
)
logger.info("incident_engine_initialized", version="lewooogo-brain")
except ImportError as e:
logger.error("lewooogo_brain_engines_not_available", error=str(e))
raise
except Exception as e:
logger.error("incident_engine_init_failed", error=str(e))
raise
_new_incident_engine = NewIncidentEngine(
memory=memory_adapter,
blast_analyzer=blast_adapter,
logger=logger,
)
logger.info("incident_engine_initialized", version="lewooogo-brain")
return _new_incident_engine

View File

@@ -120,7 +120,9 @@ class IncidentDbAdapter:
logger.error("db_adapter_save_failed", incident_id=incident.incident_id, error=str(e))
return False
def _record_to_incident(self, record: IncidentRecord) -> Incident:
def _record_to_incident(self, record: IncidentRecord) -> Any:
# 2026-04-01 ogt: 返回 BrainIncident (lewooogo-brain),非本地 Incident
# 型別為 Any 避免靜態分析誤報,實際為 lewooogo_brain.interfaces.incident_processor.Incident
"""將 DB Record 轉換為 Incident (lewooogo-brain 版本)"""
from lewooogo_brain.interfaces.incident_processor import (
Incident as BrainIncident,