Files
awoooi/apps/api/src/services/incident_service.py
OG T 89f0bae3f2
Some checks failed
E2E Health Check / e2e-health (push) Has been cancelled
CD Pipeline / build-and-deploy (push) Has been cancelled
feat(safety-net): complete wave 1 atomicity (adr-038, adr-039, debounce, graceful degrade, xclaim)
2026-03-29 23:55:38 +08:00

763 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Incident Service - Phase 6.2 雙層記憶寫入
==========================================
功能:
- Working Memory (Redis): 活躍事件,7 天 TTL
- Episodic Memory (PostgreSQL): 歷史事件,永久保留
設計原則:
- 先寫 Redis (快),再寫 PostgreSQL (持久)
- 兩者都成功才算完成
- 失敗時記錄日誌但不中斷主流程
統帥鐵律:
- 禁止硬編碼 IP 或密碼,嚴格讀取 .env
- 所有寫入操作都必須有結構化日誌
"""
import json
from datetime import UTC, datetime
from typing import Any, Literal
import structlog
from src.core.redis_client import get_redis
from src.db.base import get_db_context
from src.db.models import IncidentRecord
from src.models.incident import (
Incident,
IncidentStatus,
Severity,
Signal,
)
# Module-level structlog logger, named after this module; used for every log event below.
logger = structlog.get_logger(__name__)
# =============================================================================
# Legacy Value Normalization (方案 C - 代碼相容舊格式)
# =============================================================================
# 問題: Redis 有舊 Enum 值 (status='open', severity='critical')
# 解法: 解析時正規化,不動 Redis 資料
# 回滾: git revert (秒級恢復)
# =============================================================================
def normalize_status(value: str) -> str:
    """
    Normalize a legacy IncidentStatus value to its current spelling.

    Legacy value -> current value:
        - 'open' -> 'investigating'

    Any value that is not a known legacy spelling is returned unchanged.
    The lookup is case-sensitive.
    """
    try:
        return {"open": "investigating"}[value]
    except KeyError:
        # Not a legacy spelling: pass the value through untouched.
        return value
def normalize_severity(value: str) -> str:
    """
    Normalize a legacy Severity value to its current P-level spelling.

    Legacy value -> current value:
        - 'critical' -> 'P0'
        - 'high'     -> 'P1'
        - 'warning'  -> 'P2'
        - 'medium'   -> 'P2'
        - 'info'     -> 'P3'
        - 'low'      -> 'P3'
        - 'none'     -> 'P3'

    Any value that is not a known legacy spelling is returned unchanged.
    The lookup is case-sensitive.
    """
    # Grouped by target level; flattened below into a legacy -> level lookup.
    legacy_by_level = {
        "P0": ("critical",),
        "P1": ("high",),
        "P2": ("warning", "medium"),
        "P3": ("info", "low", "none"),
    }
    lookup = {
        legacy: level
        for level, legacy_names in legacy_by_level.items()
        for legacy in legacy_names
    }
    return lookup.get(value, value)
# =============================================================================
# Constants
# =============================================================================
# Prefix of every incident key stored in Redis
INCIDENT_KEY_PREFIX = "incident:"
# Working Memory TTL: 7 days, expressed in seconds (= 604800)
WORKING_MEMORY_TTL = 7 * 24 * 60 * 60
# =============================================================================
# Incident Service
# =============================================================================
class IncidentService:
"""
雙層記憶服務
職責:
1. Working Memory (Redis): 活躍事件快取
2. Episodic Memory (PostgreSQL): 歷史事件持久化
使用方式:
service = IncidentService()
incident = await service.create_incident_from_signal(signal_data)
"""
# =========================================================================
# Working Memory (Redis)
# =========================================================================
async def save_to_working_memory(self, incident: Incident) -> bool:
"""
將 Incident 寫入 Working Memory (Redis)
使用 Redis Hash 儲存Key 格式: incident:{incident_id}
TTL: 7 天 (604800 秒)
Returns:
bool: 是否成功寫入
"""
redis_client = get_redis()
key = f"{INCIDENT_KEY_PREFIX}{incident.incident_id}"
try:
# 序列化為 JSON
incident_json = incident.model_dump_json()
# SET with TTL
await redis_client.set(
key,
incident_json,
ex=WORKING_MEMORY_TTL,
)
logger.info(
"working_memory_saved",
incident_id=incident.incident_id,
key=key,
ttl_seconds=WORKING_MEMORY_TTL,
)
return True
except Exception as e:
logger.exception(
"working_memory_save_error",
incident_id=incident.incident_id,
error=str(e),
)
return False
async def get_from_working_memory(self, incident_id: str) -> Incident | None:
"""
從 Working Memory 讀取 Incident
方案 C: 解析時正規化舊格式 Enum 值
Returns:
Incident | None: 事件資料,若不存在則返回 None
"""
redis_client = get_redis()
key = f"{INCIDENT_KEY_PREFIX}{incident_id}"
try:
data = await redis_client.get(key)
if data is None:
return None
# 方案 C: 正規化舊格式 Enum 值
incident_dict = json.loads(data)
if "status" in incident_dict:
incident_dict["status"] = normalize_status(incident_dict["status"])
if "severity" in incident_dict:
incident_dict["severity"] = normalize_severity(incident_dict["severity"])
# 同時正規化 signals 內的 severity
for signal in incident_dict.get("signals", []):
if "severity" in signal:
signal["severity"] = normalize_severity(signal["severity"])
return Incident.model_validate(incident_dict)
except Exception as e:
logger.exception(
"working_memory_get_error",
incident_id=incident_id,
error=str(e),
)
return None
async def get_active_incidents(self) -> list[Incident]:
"""
列出所有活躍的 Incidents (從 Working Memory)
方案 C: 解析時正規化舊格式 Enum 值
Returns:
list[Incident]: 活躍事件列表 (investigating 或 mitigating)
"""
redis_client = get_redis()
incidents: list[Incident] = []
try:
# SCAN 所有 incident:* keys
async for key in redis_client.scan_iter(
match=f"{INCIDENT_KEY_PREFIX}*",
count=100,
):
# 排除索引 keys
if ":idx:" in key:
continue
data = await redis_client.get(key)
if data is None:
continue
try:
# 方案 C: 正規化舊格式 Enum 值
incident_dict = json.loads(data)
if "status" in incident_dict:
incident_dict["status"] = normalize_status(incident_dict["status"])
if "severity" in incident_dict:
incident_dict["severity"] = normalize_severity(incident_dict["severity"])
# 正規化 signals 內的 severity
for signal in incident_dict.get("signals", []):
if "severity" in signal:
signal["severity"] = normalize_severity(signal["severity"])
incident = Incident.model_validate(incident_dict)
# 只返回活躍狀態的 Incident
if incident.status in (
IncidentStatus.INVESTIGATING,
IncidentStatus.MITIGATING,
):
incidents.append(incident)
except Exception as e:
logger.warning(
"incident_parse_error",
key=key,
error=str(e),
)
continue
logger.info(
"get_active_incidents",
count=len(incidents),
)
return incidents
except Exception as e:
logger.exception(
"get_active_incidents_error",
error=str(e),
)
return []
# =========================================================================
# Episodic Memory (PostgreSQL)
# =========================================================================
async def save_to_episodic_memory(self, incident: Incident) -> bool:
"""
將 Incident 寫入 Episodic Memory (PostgreSQL)
使用 SQLAlchemy async session 寫入 incidents 表。
Returns:
bool: 是否成功寫入
"""
try:
async with get_db_context() as db:
# 轉換為 SQLAlchemy model
# 使用 model_dump(mode="json") 確保 datetime 正確序列化
record = IncidentRecord(
incident_id=incident.incident_id,
status=incident.status.value,
severity=incident.severity.value,
signals=[
s.model_dump(mode="json") for s in incident.signals
],
affected_services=incident.affected_services,
decision_chain=(
incident.decision_chain.model_dump(mode="json")
if incident.decision_chain
else None
),
proposal_ids=[str(pid) for pid in incident.proposal_ids],
outcome=(
incident.outcome.model_dump(mode="json")
if incident.outcome
else None
),
created_at=incident.created_at,
updated_at=incident.updated_at,
resolved_at=incident.resolved_at,
closed_at=incident.closed_at,
ttl_days=incident.ttl_days,
vectorized=incident.vectorized,
)
db.add(record)
# commit 由 get_db_context 自動處理
logger.info(
"episodic_memory_saved",
incident_id=incident.incident_id,
table="incidents",
)
return True
except Exception as e:
logger.exception(
"episodic_memory_save_error",
incident_id=incident.incident_id,
error=str(e),
)
return False
async def get_from_episodic_memory(self, incident_id: str) -> Incident | None:
"""
從 Episodic Memory 讀取 Incident
Returns:
Incident | None: 事件資料,若不存在則返回 None
"""
try:
async with get_db_context() as db:
from sqlalchemy import select
stmt = select(IncidentRecord).where(
IncidentRecord.incident_id == incident_id
)
result = await db.execute(stmt)
record = result.scalar_one_or_none()
if record is None:
return None
# 轉換回 Pydantic model
return self._record_to_incident(record)
except Exception as e:
logger.exception(
"episodic_memory_get_error",
incident_id=incident_id,
error=str(e),
)
return None
def _record_to_incident(self, record: IncidentRecord) -> Incident:
"""
將 SQLAlchemy record 轉換為 Pydantic Incident
方案 C: 解析時正規化舊格式 Enum 值
"""
from src.models.incident import AIDecisionChain, IncidentOutcome
# 方案 C: 正規化 signals 內的舊格式 severity
signals = []
for s in (record.signals or []):
signal_data = s.copy()
if "severity" in signal_data:
signal_data["severity"] = normalize_severity(signal_data["severity"])
signals.append(Signal(**signal_data))
decision_chain = (
AIDecisionChain(**record.decision_chain)
if record.decision_chain
else None
)
outcome = (
IncidentOutcome(**record.outcome)
if record.outcome
else None
)
# 方案 C: 正規化舊格式 Enum 值
normalized_status = normalize_status(record.status)
normalized_severity = normalize_severity(record.severity)
return Incident(
incident_id=record.incident_id,
status=IncidentStatus(normalized_status),
severity=Severity(normalized_severity),
signals=signals,
affected_services=record.affected_services or [],
decision_chain=decision_chain,
proposal_ids=record.proposal_ids or [],
outcome=outcome,
created_at=record.created_at,
updated_at=record.updated_at,
resolved_at=record.resolved_at,
closed_at=record.closed_at,
ttl_days=record.ttl_days,
persisted_to_pg=True, # 從 PG 讀取,必為 True
vectorized=record.vectorized,
)
# =========================================================================
# 雙層寫入核心邏輯
# =========================================================================
async def create_incident_from_signal(
self,
signal_data: dict[str, Any],
frequency_stats: dict[str, Any] | None = None,
) -> Incident | None:
"""
從 Signal 建立 Incident 並雙層寫入
Phase 6.2 核心邏輯:
1. 建立 Incident (含 Signal)
2. 寫入 Working Memory (Redis) - 7 天 TTL
3. 寫入 Episodic Memory (PostgreSQL) - 永久保留
4. 標記 persisted_to_pg = True
Phase 21 (ADR-037) 擴展:
5. 含異常頻率統計 (用於 Tier 分級修復策略)
Args:
signal_data: 從 Redis Stream 收到的 Signal 資料
frequency_stats: ADR-037 異常頻率統計 (可選)
Returns:
Incident | None: 成功返回 Incident失敗返回 None
"""
try:
# 0. 去抖動 (Debounce) - 防止告警風暴
fingerprint = signal_data.get("fingerprint")
if fingerprint:
try:
redis_client = get_redis()
debounce_key = f"debounce:{fingerprint}"
# SETNX 若成功表示是新的,給予 3 分鐘 TTL (180s)
is_new = await redis_client.set(debounce_key, "1", ex=180, nx=True)
if not is_new:
logger.info(
"incident_debounced",
fingerprint=fingerprint,
reason="Duplicate signal within 3 minutes",
)
return None
except Exception as e:
logger.warning("incident_debounce_redis_error", error=str(e))
# 1. 解析 Signal
signal = Signal(
alert_name=signal_data.get("alert_name", "unknown"),
severity=self._parse_severity(signal_data.get("severity", "warning")),
source=self._parse_source(signal_data.get("source", "manual")),
fired_at=datetime.now(UTC),
labels=self._parse_dict(signal_data.get("labels", "{}")),
annotations=self._parse_dict(signal_data.get("annotations", "{}")),
fingerprint=signal_data.get("fingerprint"),
)
# 2. 建立 Incident (含頻率統計)
# ADR-037: 統帥指示「重啟只是治標,太常發生的異常必須徹底解決」
from src.models.incident import IncidentFrequencyStats
freq_stats = None
if frequency_stats:
freq_stats = IncidentFrequencyStats(
anomaly_key=frequency_stats.get("anomaly_key", "unknown"),
count_1h=frequency_stats.get("count_1h", 0),
count_24h=frequency_stats.get("count_24h", 0),
count_7d=frequency_stats.get("count_7d", 0),
count_30d=frequency_stats.get("count_30d", 0),
escalation_level=frequency_stats.get("escalation_level"),
auto_repair_count=frequency_stats.get("auto_repair_count", 0),
)
incident = Incident(
severity=signal.severity,
signals=[signal],
affected_services=[signal_data.get("target", "unknown")],
frequency_stats=freq_stats,
)
logger.info(
"incident_created",
incident_id=incident.incident_id,
severity=incident.severity.value,
signal_count=len(incident.signals),
)
# 3. 寫入 Working Memory (Redis)
redis_success = await self.save_to_working_memory(incident)
# 4. 寫入 Episodic Memory (PostgreSQL)
pg_success = await self.save_to_episodic_memory(incident)
# 5. 更新狀態
if pg_success:
incident.persisted_to_pg = True
# 更新 Redis 中的狀態
if redis_success:
await self.save_to_working_memory(incident)
# 6. 記錄雙層寫入結果
logger.info(
"dual_layer_memory_result",
incident_id=incident.incident_id,
redis_success=redis_success,
pg_success=pg_success,
persisted_to_pg=incident.persisted_to_pg,
)
return incident
except Exception as e:
logger.exception(
"create_incident_error",
error=str(e),
)
return None
def _parse_source(
self,
source_str: str,
) -> Literal["prometheus", "signoz", "alertmanager", "manual", "telegram"]:
"""
解析來源字串,映射到 Signal 允許的 Literal 值
不在白名單中的來源一律映射為 'manual'
"""
valid_sources = {"prometheus", "signoz", "alertmanager", "manual", "telegram"}
if source_str.lower() in valid_sources:
return source_str.lower() # type: ignore
return "manual"
def _parse_severity(self, severity_str: str) -> Severity:
"""解析嚴重度字串"""
mapping = {
"critical": Severity.P0,
"high": Severity.P1,
"warning": Severity.P2,
"medium": Severity.P2,
"low": Severity.P3,
"info": Severity.P3,
}
return mapping.get(severity_str.lower(), Severity.P2)
def _parse_dict(self, value: str | dict) -> dict[str, str]:
"""解析字典字串或字典"""
if isinstance(value, dict):
return {str(k): str(v) for k, v in value.items()}
if isinstance(value, str):
try:
# 嘗試解析 JSON
parsed = json.loads(value.replace("'", '"'))
return {str(k): str(v) for k, v in parsed.items()}
except (json.JSONDecodeError, TypeError):
return {}
return {}
# =========================================================================
# Phase 17 P0: Router 層違規修復 - 新增方法
# =========================================================================
async def update_outcome(
self,
incident_id: str,
effectiveness_score: int | None = None,
human_feedback: str | None = None,
learning_notes: str | None = None,
should_remember: bool = True,
) -> Incident | None:
"""
更新 Incident 的 outcome (人類回饋)
Phase 17: 從 Router 層遷移至 Service 層
Args:
incident_id: 事件 ID
effectiveness_score: 有效性評分 (1-5)
human_feedback: 文字回饋
learning_notes: 學習筆記
should_remember: 是否納入長期記憶
Returns:
Incident | None: 更新後的事件,失敗返回 None
"""
from src.models.incident import IncidentOutcome
from src.repositories.incident_repository import get_incident_repository
from src.utils.timezone import now_taipei
# 1. 從 Working Memory 讀取
incident = await self.get_from_working_memory(incident_id)
if incident is None:
logger.warning("incident_not_found_for_outcome", incident_id=incident_id)
return None
# 2. 更新 outcome
if incident.outcome is None:
incident.outcome = IncidentOutcome()
if effectiveness_score is not None:
incident.outcome.effectiveness_score = effectiveness_score
if human_feedback is not None:
incident.outcome.human_feedback = human_feedback
if learning_notes is not None:
incident.outcome.learning_notes = learning_notes
incident.outcome.should_remember = should_remember
incident.updated_at = now_taipei()
# 3. 寫入 Working Memory
redis_success = await self.save_to_working_memory(incident)
if not redis_success:
logger.error("outcome_redis_write_failed", incident_id=incident_id)
return None
# 4. 同步到 Episodic Memory (PostgreSQL)
try:
repo = get_incident_repository()
await repo.update_outcome(
incident_id=incident_id,
outcome=incident.outcome.model_dump(mode="json"),
updated_at=now_taipei(),
)
logger.info("outcome_db_updated", incident_id=incident_id)
except Exception as e:
logger.warning(
"outcome_db_update_failed",
incident_id=incident_id,
error=str(e),
)
# DB 失敗不影響主流程
return incident
async def resolve_incident(self, incident_id: str) -> Incident | None:
"""
將 Incident 狀態更新為 RESOLVED
Phase 17: 從 Router 層遷移至 Service 層
Args:
incident_id: 事件 ID
Returns:
Incident | None: 更新後的事件,失敗返回 None
"""
from src.repositories.incident_repository import get_incident_repository
from src.utils.timezone import now_taipei
# 1. 從 Working Memory 讀取
incident = await self.get_from_working_memory(incident_id)
if incident is None:
logger.warning("incident_not_found_for_resolve", incident_id=incident_id)
return None
# 2. 更新狀態
incident.status = IncidentStatus.RESOLVED
incident.resolved_at = now_taipei()
incident.updated_at = now_taipei()
# 3. 寫入 Working Memory
redis_success = await self.save_to_working_memory(incident)
if not redis_success:
logger.error("resolve_redis_write_failed", incident_id=incident_id)
return None
# 4. 同步到 Episodic Memory
try:
repo = get_incident_repository()
await repo.update_status(
incident_id=incident_id,
status="resolved",
updated_at=now_taipei(),
)
logger.info("resolve_db_updated", incident_id=incident_id)
except Exception as e:
logger.warning(
"resolve_db_update_failed",
incident_id=incident_id,
error=str(e),
)
return incident
async def find_by_proposal_id(self, proposal_id: str) -> Incident | None:
"""
根據 proposal_id 查找關聯的 Incident
Phase 17: 從 Router 層遷移至 Service 層
Args:
proposal_id: 提案 ID (UUID 字串)
Returns:
Incident | None: 找到的事件,未找到返回 None
"""
from uuid import UUID
redis_client = get_redis()
try:
target_uuid = UUID(proposal_id)
async for key in redis_client.scan_iter(
match=f"{INCIDENT_KEY_PREFIX}INC-*",
count=100,
):
data = await redis_client.get(key)
if data is None:
continue
try:
# 方案 C: 正規化舊格式 Enum 值
incident_dict = json.loads(data)
if "status" in incident_dict:
incident_dict["status"] = normalize_status(incident_dict["status"])
if "severity" in incident_dict:
incident_dict["severity"] = normalize_severity(incident_dict["severity"])
for signal in incident_dict.get("signals", []):
if "severity" in signal:
signal["severity"] = normalize_severity(signal["severity"])
incident = Incident.model_validate(incident_dict)
if target_uuid in incident.proposal_ids:
return incident
except Exception as e:
logger.warning(
"incident_parse_error_in_find",
key=key,
error=str(e),
)
continue
return None
except Exception as e:
logger.exception(
"find_by_proposal_id_error",
proposal_id=proposal_id,
error=str(e),
)
return None
# =============================================================================
# Singleton
# =============================================================================
# Lazily created module-wide instance; populated on first request.
_incident_service: IncidentService | None = None


def get_incident_service() -> IncidentService:
    """Return the shared IncidentService instance, creating it on first use."""
    global _incident_service
    if _incident_service is not None:
        return _incident_service
    _incident_service = IncidentService()
    return _incident_service