Files
awoooi/apps/api/src/services/incident_service.py
OG T 670cd5df86
Some checks are pending
CD Pipeline / build-and-deploy (push) Has started running
refactor(flywheel): 首席架構師審查修正 C1/C2/I1/I2/I3/I4/M1
C1 — Repository 層修正 (積木化鐵律):
  新增 PlaybookEmbeddingRepository (pgvector UPSERT)
  playbook_embedding_service 改透過 Repository 存取 DB,不再直接 db.execute(text(...))

C2 — Router 層業務邏輯移入 Service 層:
  create_incident_for_approval + extract_affected_services (去掉底線前綴) 移入 incident_service.py
  webhooks.py 改從 incident_service import,自身不再含業務邏輯

I1 — _infra_jobs 提升為 module-level frozenset (_INFRA_JOB_NAMES),避免每次呼叫重建

I2 — _persist_embeddings_to_db 補齊 PlaybookRAGService / list[Playbook] 型別標注

I3 — embedding 格式顯式化: "[" + ",".join(str(float(x)) for x in embedding) + "]"
  防止 pgvector 因格式差異靜默解析失敗

I4 — import asyncio 移至 main.py 頂層,移除 try 區塊內重複 import

M1 — similarity.py: 移除死代碼 `if union > 0 else 0.0`
  union 在兩個集合都非空時不可能為 0

2026-04-10 Asia/Taipei — Claude Sonnet 4.6
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-10 11:35:10 +08:00

1014 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Incident Service - Phase 6.2 雙層記憶寫入
==========================================
功能:
- Working Memory (Redis): 活躍事件7 天 TTL
- Episodic Memory (PostgreSQL): 歷史事件,永久保留
設計原則:
- 先寫 Redis (快),再寫 PostgreSQL (持久)
- 兩者都成功才算完成
- 失敗時記錄日誌但不中斷主流程
統帥鐵律:
- 禁止硬編碼 IP 或密碼,嚴格讀取 .env
- 所有寫入操作都必須有結構化日誌
C2 修正 (首席架構師審查 2026-04-10 Claude Sonnet 4.6 Asia/Taipei):
create_incident_for_approval + _extract_affected_services 從 Router 層移入此 Service 層
原違規: 業務邏輯 (Severity 映射, Signal 建立, Incident 建立) 放在 api/v1/webhooks.py
"""
import json
from datetime import UTC, datetime
from typing import Any, Literal
from uuid import UUID
import structlog
from src.core.redis_client import get_redis
from src.db.base import get_db_context
from src.db.models import IncidentRecord
from src.models.incident import (
Incident,
IncidentStatus,
Severity,
Signal,
)
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# =============================================================================
# C2 修正: 從 webhooks.py 遷入的業務邏輯
# 2026-04-10 Claude Sonnet 4.6 Asia/Taipei
# =============================================================================
# 風險等級 → 事件嚴重度映射 (原在 webhooks.py)
_RISK_TO_SEVERITY = {
"critical": Severity.P0,
"high": Severity.P1,
"medium": Severity.P2,
"low": Severity.P3,
}
# I1 修正: 提升為 module-level frozenset避免每次呼叫重建 (原在 webhooks.py 函數體內)
_INFRA_JOB_NAMES: frozenset[str] = frozenset(
j.lower().replace("-", "").replace("_", "")
for j in {"node", "node-exporter", "pushgateway", "blackbox",
"prometheus", "alertmanager", "cadvisor"}
)
def extract_affected_services(labels: dict, target_resource: str) -> list[str]:
"""
從告警 labels 提取真實服務名,防止 IP 或 alertname 污染 affected_services。
優先序:
1. component labelDocker-compose 層告警最可靠)
2. job label排除 node-exporter / pushgateway 等基礎設施 job
3. pod label取 deployment name去掉 hash suffix
4. target_resource不含冒號、不等於 alertname 時才採用)
5. 空列表(讓通用型 Playbook 透過空集合豁免規則匹配)
Phase 1 飛輪修復 — 2026-04-10 Claude Sonnet 4.6 Asia/Taipei
C2 修正: 從 api/v1/webhooks.py 移入 Service 層(純業務邏輯,無 I/O
"""
alertname = labels.get("alertname", "")
if comp := labels.get("component"):
return [comp]
if job := labels.get("job"):
normalized = job.lower().replace("-", "").replace("_", "")
if normalized not in _INFRA_JOB_NAMES:
return [job]
if pod := labels.get("pod"):
parts = pod.rsplit("-", 2)
if len(parts) >= 3 and len(parts[-1]) == 5 and len(parts[-2]) in (9, 10):
return [parts[0]]
elif len(parts) >= 2:
return ["-".join(parts[:-1])]
if (target_resource
and ":" not in target_resource
and target_resource != alertname
and not target_resource[0].isdigit()):
return [target_resource]
return []
async def create_incident_for_approval(
approval_id: str,
risk_level: str,
target_resource: str,
namespace: str,
alert_type: str,
message: str,
source: str = "alertmanager",
alertname: str | None = None,
alert_labels: dict | None = None,
) -> str:
"""
為 Approval 創建對應的 Incident (活躍事件同步)。
設計原則:
- Approval 和 Incident 必須同時存在
- Incident 存入 Redis (Working Memory) + PostgreSQL (Episodic Memory)
- 7 天 TTL 自動過期
C2 修正: 從 api/v1/webhooks.py 移入 Service 層(業務邏輯不屬 Router 層)
Returns:
str: Incident ID
"""
incident_service = get_incident_service()
severity = _RISK_TO_SEVERITY.get(risk_level.lower(), Severity.P2)
_labels: dict = {
"namespace": namespace,
"resource": target_resource,
"alertname": alertname or alert_type,
**(alert_labels or {}),
}
signal = Signal(
alert_name=alertname or alert_type,
severity=severity,
source=source,
fired_at=now_taipei(),
labels=_labels,
annotations={"message": message},
)
_affected_services = extract_affected_services(_labels, target_resource)
incident = Incident(
status=IncidentStatus.INVESTIGATING,
severity=severity,
signals=[signal],
affected_services=_affected_services,
proposal_ids=[UUID(approval_id)],
)
await incident_service.save_to_working_memory(incident)
try:
await incident_service.save_to_episodic_memory(incident)
except Exception as _pg_err:
logger.warning(
"incident_episodic_memory_failed",
incident_id=incident.incident_id,
error=str(_pg_err),
)
logger.info(
"incident_created_for_approval",
incident_id=incident.incident_id,
approval_id=approval_id,
severity=severity.value,
target=target_resource,
)
return incident.incident_id
# =============================================================================
# Legacy Value Normalization (方案 C - 代碼相容舊格式)
# =============================================================================
# 問題: Redis 有舊 Enum 值 (status='open', severity='critical')
# 解法: 解析時正規化,不動 Redis 資料
# 回滾: git revert (秒級恢復)
# =============================================================================
def normalize_status(value: str) -> str:
"""
正規化 IncidentStatus 舊格式值
舊值 → 新值:
- 'open''investigating'
"""
legacy_map = {
"open": "investigating",
}
return legacy_map.get(value, value)
def normalize_severity(value: str) -> str:
"""
正規化 Severity 舊格式值
舊值 → 新值:
- 'critical''P0'
- 'high''P1'
- 'warning''P2'
- 'medium''P2'
- 'info''P3'
- 'low''P3'
- 'none''P3'
"""
legacy_map = {
"critical": "P0",
"high": "P1",
"warning": "P2",
"medium": "P2",
"info": "P3",
"low": "P3",
"none": "P3",
}
return legacy_map.get(value, value)
# =============================================================================
# Constants
# =============================================================================
# Redis Key Prefix
INCIDENT_KEY_PREFIX = "incident:"
# Working Memory TTL: 7 天 = 604800 秒
WORKING_MEMORY_TTL = 604800
# =============================================================================
# Incident Service
# =============================================================================
class IncidentService:
"""
雙層記憶服務
職責:
1. Working Memory (Redis): 活躍事件快取
2. Episodic Memory (PostgreSQL): 歷史事件持久化
使用方式:
service = IncidentService()
incident = await service.create_incident_from_signal(signal_data)
"""
# =========================================================================
# Working Memory (Redis)
# =========================================================================
async def save_to_working_memory(self, incident: Incident) -> bool:
"""
將 Incident 寫入 Working Memory (Redis)
使用 Redis Hash 儲存Key 格式: incident:{incident_id}
TTL: 7 天 (604800 秒)
Returns:
bool: 是否成功寫入
"""
redis_client = get_redis()
key = f"{INCIDENT_KEY_PREFIX}{incident.incident_id}"
try:
# 序列化為 JSON
incident_json = incident.model_dump_json()
# SET with TTL
await redis_client.set(
key,
incident_json,
ex=WORKING_MEMORY_TTL,
)
logger.info(
"working_memory_saved",
incident_id=incident.incident_id,
key=key,
ttl_seconds=WORKING_MEMORY_TTL,
)
return True
except Exception as e:
logger.exception(
"working_memory_save_error",
incident_id=incident.incident_id,
error=str(e),
)
return False
async def get_from_working_memory(self, incident_id: str) -> Incident | None:
"""
從 Working Memory 讀取 Incident
方案 C: 解析時正規化舊格式 Enum 值
Returns:
Incident | None: 事件資料,若不存在則返回 None
"""
redis_client = get_redis()
key = f"{INCIDENT_KEY_PREFIX}{incident_id}"
try:
data = await redis_client.get(key)
if data is None:
return None
# 方案 C: 正規化舊格式 Enum 值
incident_dict = json.loads(data)
if "status" in incident_dict:
incident_dict["status"] = normalize_status(incident_dict["status"])
if "severity" in incident_dict:
incident_dict["severity"] = normalize_severity(incident_dict["severity"])
# 同時正規化 signals 內的 severity
for signal in incident_dict.get("signals", []):
if "severity" in signal:
signal["severity"] = normalize_severity(signal["severity"])
return Incident.model_validate(incident_dict)
except Exception as e:
logger.exception(
"working_memory_get_error",
incident_id=incident_id,
error=str(e),
)
return None
async def get_active_incidents(self) -> list[Incident]:
"""
列出所有活躍的 Incidents (從 Working Memory)
方案 C: 解析時正規化舊格式 Enum 值
Returns:
list[Incident]: 活躍事件列表 (investigating 或 mitigating)
"""
redis_client = get_redis()
incidents: list[Incident] = []
try:
# SCAN 所有 incident:* keys
async for key in redis_client.scan_iter(
match=f"{INCIDENT_KEY_PREFIX}*",
count=100,
):
# 排除索引 keys
if ":idx:" in key:
continue
data = await redis_client.get(key)
if data is None:
continue
try:
# 方案 C: 正規化舊格式 Enum 值
incident_dict = json.loads(data)
if "status" in incident_dict:
incident_dict["status"] = normalize_status(incident_dict["status"])
if "severity" in incident_dict:
incident_dict["severity"] = normalize_severity(incident_dict["severity"])
# 正規化 signals 內的 severity
for signal in incident_dict.get("signals", []):
if "severity" in signal:
signal["severity"] = normalize_severity(signal["severity"])
incident = Incident.model_validate(incident_dict)
# 只返回活躍狀態的 Incident
if incident.status in (
IncidentStatus.INVESTIGATING,
IncidentStatus.MITIGATING,
):
incidents.append(incident)
except Exception as e:
logger.warning(
"incident_parse_error",
key=key,
error=str(e),
)
continue
logger.info(
"get_active_incidents",
count=len(incidents),
)
return incidents
except Exception as e:
logger.exception(
"get_active_incidents_error",
error=str(e),
)
return []
# =========================================================================
# Episodic Memory (PostgreSQL)
# =========================================================================
async def save_to_episodic_memory(self, incident: Incident) -> bool:
"""
將 Incident 寫入 Episodic Memory (PostgreSQL)
使用 SQLAlchemy async session 寫入 incidents 表。
Returns:
bool: 是否成功寫入
"""
try:
async with get_db_context() as db:
# 轉換為 SQLAlchemy model
# 使用 model_dump(mode="json") 確保 datetime 正確序列化
record = IncidentRecord(
incident_id=incident.incident_id,
status=incident.status.value,
severity=incident.severity.value,
signals=[
s.model_dump(mode="json") for s in incident.signals
],
affected_services=incident.affected_services,
decision_chain=(
incident.decision_chain.model_dump(mode="json")
if incident.decision_chain
else None
),
proposal_ids=[str(pid) for pid in incident.proposal_ids],
outcome=(
incident.outcome.model_dump(mode="json")
if incident.outcome
else None
),
created_at=incident.created_at,
updated_at=incident.updated_at,
resolved_at=incident.resolved_at,
closed_at=incident.closed_at,
ttl_days=incident.ttl_days,
vectorized=incident.vectorized,
)
db.add(record)
# commit 由 get_db_context 自動處理
logger.info(
"episodic_memory_saved",
incident_id=incident.incident_id,
table="incidents",
)
return True
except Exception as e:
logger.exception(
"episodic_memory_save_error",
incident_id=incident.incident_id,
error=str(e),
)
return False
async def get_from_episodic_memory(self, incident_id: str) -> Incident | None:
"""
從 Episodic Memory 讀取 Incident
Returns:
Incident | None: 事件資料,若不存在則返回 None
"""
try:
async with get_db_context() as db:
from sqlalchemy import select
stmt = select(IncidentRecord).where(
IncidentRecord.incident_id == incident_id
)
result = await db.execute(stmt)
record = result.scalar_one_or_none()
if record is None:
return None
# 轉換回 Pydantic model
return self._record_to_incident(record)
except Exception as e:
logger.exception(
"episodic_memory_get_error",
incident_id=incident_id,
error=str(e),
)
return None
def _record_to_incident(self, record: IncidentRecord) -> Incident:
"""
將 SQLAlchemy record 轉換為 Pydantic Incident
方案 C: 解析時正規化舊格式 Enum 值
"""
from src.models.incident import AIDecisionChain, IncidentOutcome
# 方案 C: 正規化 signals 內的舊格式 severity
signals = []
for s in (record.signals or []):
signal_data = s.copy()
if "severity" in signal_data:
signal_data["severity"] = normalize_severity(signal_data["severity"])
signals.append(Signal(**signal_data))
decision_chain = (
AIDecisionChain(**record.decision_chain)
if record.decision_chain
else None
)
outcome = (
IncidentOutcome(**record.outcome)
if record.outcome
else None
)
# 方案 C: 正規化舊格式 Enum 值
normalized_status = normalize_status(record.status)
normalized_severity = normalize_severity(record.severity)
return Incident(
incident_id=record.incident_id,
status=IncidentStatus(normalized_status),
severity=Severity(normalized_severity),
signals=signals,
affected_services=record.affected_services or [],
decision_chain=decision_chain,
proposal_ids=record.proposal_ids or [],
outcome=outcome,
created_at=record.created_at,
updated_at=record.updated_at,
resolved_at=record.resolved_at,
closed_at=record.closed_at,
ttl_days=record.ttl_days,
persisted_to_pg=True, # 從 PG 讀取,必為 True
vectorized=record.vectorized,
)
# =========================================================================
# 雙層寫入核心邏輯
# =========================================================================
async def create_incident_from_signal(
self,
signal_data: dict[str, Any],
frequency_stats: dict[str, Any] | None = None,
) -> Incident | None:
"""
從 Signal 建立 Incident 並雙層寫入
Phase 6.2 核心邏輯:
1. 建立 Incident (含 Signal)
2. 寫入 Working Memory (Redis) - 7 天 TTL
3. 寫入 Episodic Memory (PostgreSQL) - 永久保留
4. 標記 persisted_to_pg = True
Phase 21 (ADR-037) 擴展:
5. 含異常頻率統計 (用於 Tier 分級修復策略)
Args:
signal_data: 從 Redis Stream 收到的 Signal 資料
frequency_stats: ADR-037 異常頻率統計 (可選)
Returns:
Incident | None: 成功返回 Incident失敗返回 None
"""
try:
# 0. 去抖動 (Debounce) - 防止告警風暴
fingerprint = signal_data.get("fingerprint")
if fingerprint:
try:
redis_client = get_redis()
debounce_key = f"debounce:{fingerprint}"
# SETNX 若成功表示是新的,給予 3 分鐘 TTL (180s)
is_new = await redis_client.set(debounce_key, "1", ex=180, nx=True)
if not is_new:
logger.info(
"incident_debounced",
fingerprint=fingerprint,
reason="Duplicate signal within 3 minutes",
)
return None
except Exception as e:
logger.warning("incident_debounce_redis_error", error=str(e))
# 1. 解析 Signal
signal = Signal(
alert_name=signal_data.get("alert_name", "unknown"),
severity=self._parse_severity(signal_data.get("severity", "warning")),
source=self._parse_source(signal_data.get("source", "manual")),
fired_at=datetime.now(UTC),
labels=self._parse_dict(signal_data.get("labels", "{}")),
annotations=self._parse_dict(signal_data.get("annotations", "{}")),
fingerprint=signal_data.get("fingerprint"),
)
# 2. 建立 Incident (含頻率統計)
# ADR-037: 統帥指示「重啟只是治標,太常發生的異常必須徹底解決」
from src.models.incident import IncidentFrequencyStats
freq_stats = None
if frequency_stats:
freq_stats = IncidentFrequencyStats(
anomaly_key=frequency_stats.get("anomaly_key", "unknown"),
count_1h=frequency_stats.get("count_1h", 0),
count_24h=frequency_stats.get("count_24h", 0),
count_7d=frequency_stats.get("count_7d", 0),
count_30d=frequency_stats.get("count_30d", 0),
escalation_level=frequency_stats.get("escalation_level"),
auto_repair_count=frequency_stats.get("auto_repair_count", 0),
)
incident = Incident(
severity=signal.severity,
signals=[signal],
affected_services=[signal_data.get("target", "unknown")],
frequency_stats=freq_stats,
)
logger.info(
"incident_created",
incident_id=incident.incident_id,
severity=incident.severity.value,
signal_count=len(incident.signals),
)
# 3. 寫入 Working Memory (Redis)
redis_success = await self.save_to_working_memory(incident)
# 4. 寫入 Episodic Memory (PostgreSQL)
pg_success = await self.save_to_episodic_memory(incident)
# 5. 更新狀態
if pg_success:
incident.persisted_to_pg = True
# 更新 Redis 中的狀態
if redis_success:
await self.save_to_working_memory(incident)
# 6. 記錄雙層寫入結果
logger.info(
"dual_layer_memory_result",
incident_id=incident.incident_id,
redis_success=redis_success,
pg_success=pg_success,
persisted_to_pg=incident.persisted_to_pg,
)
return incident
except Exception as e:
logger.exception(
"create_incident_error",
error=str(e),
)
return None
def _parse_source(
self,
source_str: str,
) -> Literal["prometheus", "signoz", "alertmanager", "manual", "telegram"]:
"""
解析來源字串,映射到 Signal 允許的 Literal 值
不在白名單中的來源一律映射為 'manual'
"""
valid_sources = {"prometheus", "signoz", "alertmanager", "manual", "telegram"}
if source_str.lower() in valid_sources:
return source_str.lower() # type: ignore
return "manual"
def _parse_severity(self, severity_str: str) -> Severity:
"""解析嚴重度字串"""
mapping = {
"critical": Severity.P0,
"high": Severity.P1,
"warning": Severity.P2,
"medium": Severity.P2,
"low": Severity.P3,
"info": Severity.P3,
}
return mapping.get(severity_str.lower(), Severity.P2)
def _parse_dict(self, value: str | dict) -> dict[str, str]:
"""解析字典字串或字典"""
if isinstance(value, dict):
return {str(k): str(v) for k, v in value.items()}
if isinstance(value, str):
try:
# 嘗試解析 JSON
parsed = json.loads(value.replace("'", '"'))
return {str(k): str(v) for k, v in parsed.items()}
except (json.JSONDecodeError, TypeError):
return {}
return {}
# =========================================================================
# Phase 17 P0: Router 層違規修復 - 新增方法
# =========================================================================
async def update_outcome(
self,
incident_id: str,
effectiveness_score: int | None = None,
human_feedback: str | None = None,
learning_notes: str | None = None,
should_remember: bool = True,
) -> Incident | None:
"""
更新 Incident 的 outcome (人類回饋)
Phase 17: 從 Router 層遷移至 Service 層
Args:
incident_id: 事件 ID
effectiveness_score: 有效性評分 (1-5)
human_feedback: 文字回饋
learning_notes: 學習筆記
should_remember: 是否納入長期記憶
Returns:
Incident | None: 更新後的事件,失敗返回 None
"""
from src.models.incident import IncidentOutcome
from src.repositories.incident_repository import get_incident_repository
from src.utils.timezone import now_taipei
# 1. 從 Working Memory 讀取
incident = await self.get_from_working_memory(incident_id)
if incident is None:
logger.warning("incident_not_found_for_outcome", incident_id=incident_id)
return None
# 2. 更新 outcome
if incident.outcome is None:
incident.outcome = IncidentOutcome()
if effectiveness_score is not None:
incident.outcome.effectiveness_score = effectiveness_score
if human_feedback is not None:
incident.outcome.human_feedback = human_feedback
if learning_notes is not None:
incident.outcome.learning_notes = learning_notes
incident.outcome.should_remember = should_remember
incident.updated_at = now_taipei()
# 3. 寫入 Working Memory
redis_success = await self.save_to_working_memory(incident)
if not redis_success:
logger.error("outcome_redis_write_failed", incident_id=incident_id)
return None
# 4. 同步到 Episodic Memory (PostgreSQL)
try:
repo = get_incident_repository()
await repo.update_outcome(
incident_id=incident_id,
outcome=incident.outcome.model_dump(mode="json"),
updated_at=now_taipei(),
)
logger.info("outcome_db_updated", incident_id=incident_id)
except Exception as e:
logger.warning(
"outcome_db_update_failed",
incident_id=incident_id,
error=str(e),
)
# DB 失敗不影響主流程
return incident
async def resolve_incident(self, incident_id: str) -> Incident | None:
"""
將 Incident 狀態更新為 RESOLVED
Phase 17: 從 Router 層遷移至 Service 層
Args:
incident_id: 事件 ID
Returns:
Incident | None: 更新後的事件,失敗返回 None
"""
from src.repositories.incident_repository import get_incident_repository
from src.utils.timezone import now_taipei
# 1. 從 Working Memory 讀取
incident = await self.get_from_working_memory(incident_id)
if incident is None:
logger.warning("incident_not_found_for_resolve", incident_id=incident_id)
return None
# 2. 更新狀態
incident.status = IncidentStatus.RESOLVED
incident.resolved_at = now_taipei()
incident.updated_at = now_taipei()
# 3. 寫入 Working Memory
redis_success = await self.save_to_working_memory(incident)
if not redis_success:
logger.error("resolve_redis_write_failed", incident_id=incident_id)
return None
# 4. 同步到 Episodic Memory
try:
repo = get_incident_repository()
await repo.update_status(
incident_id=incident_id,
status="resolved",
updated_at=now_taipei(),
)
logger.info("resolve_db_updated", incident_id=incident_id)
except Exception as e:
logger.warning(
"resolve_db_update_failed",
incident_id=incident_id,
error=str(e),
)
# KB Phase 2-A: 自動萃取 KB 草稿 (fire-and-forget, 2026-04-03 ogt)
try:
import asyncio
from src.services.knowledge_extractor_service import get_knowledge_extractor
asyncio.create_task(
get_knowledge_extractor().extract_from_incident(incident)
)
except Exception:
logger.exception("kb_extract_task_create_failed", incident_id=incident_id)
# 2026-04-07 Claude Code: Sprint 4 B4 — 手動處理推斷
# I1+S1 Fix: 委託 derive_key_from_incident() 統一推導
try:
from src.services.anomaly_counter import AnomalyCounter, get_anomaly_counter
counter = get_anomaly_counter()
anomaly_key = AnomalyCounter.derive_key_from_incident(incident)
if anomaly_key:
disposition = await counter.get_disposition_stats(anomaly_key)
has_system_resolution = (
disposition["auto_repair"] > 0
or disposition["human_approved"] > 0
or disposition["cold_start_trust"] > 0
)
if not has_system_resolution:
await counter.record_disposition(anomaly_key, "manual_resolved")
except Exception as _disp_e:
logger.warning("disposition_manual_resolve_failed", error=str(_disp_e))
return incident
async def find_by_proposal_id(self, proposal_id: str) -> Incident | None:
"""
根據 proposal_id 查找關聯的 Incident
Phase 17: 從 Router 層遷移至 Service 層
Args:
proposal_id: 提案 ID (UUID 字串)
Returns:
Incident | None: 找到的事件,未找到返回 None
"""
from uuid import UUID
redis_client = get_redis()
try:
target_uuid = UUID(proposal_id)
async for key in redis_client.scan_iter(
match=f"{INCIDENT_KEY_PREFIX}INC-*",
count=100,
):
data = await redis_client.get(key)
if data is None:
continue
try:
# 方案 C: 正規化舊格式 Enum 值
incident_dict = json.loads(data)
if "status" in incident_dict:
incident_dict["status"] = normalize_status(incident_dict["status"])
if "severity" in incident_dict:
incident_dict["severity"] = normalize_severity(incident_dict["severity"])
for signal in incident_dict.get("signals", []):
if "severity" in signal:
signal["severity"] = normalize_severity(signal["severity"])
incident = Incident.model_validate(incident_dict)
if target_uuid in incident.proposal_ids:
return incident
except Exception as e:
logger.warning(
"incident_parse_error_in_find",
key=key,
error=str(e),
)
continue
return None
except Exception as e:
logger.exception(
"find_by_proposal_id_error",
proposal_id=proposal_id,
error=str(e),
)
return None
async def trigger_reanalysis(self, incident_id: str) -> dict:
"""
觸發 Incident 重診 (ADR-050 P2: reanalyze button)
去重保護:同一 incident 10 分鐘內只觸發一次。
觸發後將 incident status 標記為 analyzing等待 AI 自動接手。
Args:
incident_id: Incident ID
Returns:
dict: {
"triggered": bool,
"message": str,
"already_analyzing": bool,
}
2026-04-01 Claude Code (ADR-050 P2): reanalyze button handler
"""
REANALYZE_TTL_SECONDS = 600 # 10 分鐘去重 TTL (ADR-050)
dedup_key = f"reanalyze_dedup:{incident_id}"
try:
redis_client = get_redis()
# 去重檢查 (SETNX: 只有第一次設定會成功)
is_new = await redis_client.set(dedup_key, "1", ex=REANALYZE_TTL_SECONDS, nx=True)
if not is_new:
logger.info(
"reanalyze_deduplicated",
incident_id=incident_id,
reason="Already triggered within 10 minutes",
)
return {
"triggered": False,
"message": "重診已在進行中,請 10 分鐘後再試",
"already_analyzing": True,
}
# 從 Working Memory 取得 Incident
incident = await self.get_from_working_memory(incident_id)
if not incident:
incident = await self.get_from_episodic_memory(incident_id)
if not incident:
# 刪除剛設定的去重 key讓下次能重試
await redis_client.delete(dedup_key)
logger.warning("reanalyze_incident_not_found", incident_id=incident_id)
return {
"triggered": False,
"message": f"找不到事件 {incident_id}",
"already_analyzing": False,
}
# 標記 status 為 analyzing讓 AI 引擎接手)
# 使用延遲 import 避免循環依賴(同 create_incident_from_signal 模式)
from src.models.incident import IncidentStatus
# 使用 INVESTIGATING 若 ANALYZING 不存在
analyzing_status = getattr(IncidentStatus, "ANALYZING", None) or getattr(IncidentStatus, "INVESTIGATING", None)
if analyzing_status:
incident.status = analyzing_status
await self.save_to_working_memory(incident)
logger.info(
"reanalyze_triggered",
incident_id=incident_id,
severity=incident.severity.value,
)
return {
"triggered": True,
"message": "重診已排程AI 正在分析中",
"already_analyzing": False,
}
except Exception as e:
logger.exception("reanalyze_failed", incident_id=incident_id, error=str(e))
return {
"triggered": False,
"message": f"重診觸發失敗: {str(e)[:80]}",
"already_analyzing": False,
}
# =============================================================================
# Singleton
# =============================================================================
_incident_service: IncidentService | None = None
def get_incident_service() -> IncidentService:
"""取得 Incident Service 實例 (Singleton)"""
global _incident_service
if _incident_service is None:
_incident_service = IncidentService()
return _incident_service