""" KM Conversion Service — ADR-071-G ================================== Incident RESOLVED 後自動轉換為 KnowledgeEntry + Playbook 草稿 設計原則: - 非同步觸發,失敗不影響主流程 - 根據 notification_type 決定 KM 品質等級 - 自動向量化(embedding) - 寫入 AlertOperationLog KM_CONVERTED 事件 建立時間: 2026-04-11 (台北時區) 建立者: Claude Sonnet 4.6 — ADR-071-G leWOOOgo 積木化: - KMConversionService → KnowledgeService + LearningService + AlertOperationLogRepository - 不直接存取 DB,透過 Repository 層 """ import structlog from src.models.knowledge import ( EntrySource, EntryStatus, EntryType, KnowledgeEntryCreate, ) from src.repositories.alert_operation_log_repository import ( ALERT_EVENT_TYPES, get_alert_operation_log_repository, ) from src.services.knowledge_service import get_knowledge_service logger = structlog.get_logger(__name__) # ADR-071 新 event_type 已在 migrations/adr071_notification_lifecycle.sql 的 DB enum 中定義 # 不在此做 runtime ALERT_EVENT_TYPES.update():避免模組副作用污染全局狀態 + 測試隔離失敗 # 若 repository 層有 ALERT_EVENT_TYPES 白名單 validation, # 請在 alert_operation_log_repository.py 的 ALERT_EVENT_TYPES 靜態集合中加入以下值: # "KM_CONVERTED", "NOTIFICATION_CLASSIFIED", "MANUAL_FIX_RECORDED", # "PLAYBOOK_DRAFT_CREATED", "STATE_GUARD_BLOCKED" # 通知類型 → KM 品質等級對應 _TYPE_TO_STATUS = { "TYPE-2": EntryStatus.APPROVED, # 自動修復成功,最高品質 "TYPE-3": EntryStatus.REVIEW, # 人工審核後執行 "TYPE-4": EntryStatus.DRAFT, # AI 無法判斷,草稿 "TYPE-4D": EntryStatus.DRAFT, # Config Drift,草稿 "TYPE-1": None, # 純資訊,不轉 KM } _TYPE_TO_SOURCE = { "TYPE-2": EntrySource.AI_EXTRACTED, "TYPE-3": EntrySource.AI_EXTRACTED, "TYPE-4": EntrySource.HUMAN, "TYPE-4D": EntrySource.AI_EXTRACTED, } class KMConversionService: """ Incident → KM 自動轉換服務 觸發時機: 1. Incident 狀態變為 RESOLVED(主要路徑) 2. 使用者點擊 [手動修復後記錄] 後(ADR-071-H) 3. 每日 03:00 cron 補轉換(vectorized=False + RESOLVED) """ def __init__(self) -> None: self._knowledge_svc = get_knowledge_service() self._op_log_repo = get_alert_operation_log_repository() async def convert(self, incident) -> dict | None: """ 將 Incident 轉換為 KnowledgeEntry Args: incident: Incident ORM 物件(呼叫前確保 signals 已 eager load) Returns: dict with km_entry_id and quality_level, or None if skipped """ try: return await self._convert_inner(incident) except Exception as e: logger.error( "km_conversion_error", incident_id=getattr(incident, "incident_id", "unknown"), error=str(e), ) raise async def _convert_inner(self, incident) -> dict | None: notification_type = getattr(incident, "notification_type", None) or "TYPE-3" # TYPE-1 不轉 KM target_status = _TYPE_TO_STATUS.get(notification_type) if target_status is None: logger.debug( "km_conversion_skipped", incident_id=incident.incident_id, reason="TYPE-1 純資訊,不轉 KM", ) return None entry_source = _TYPE_TO_SOURCE.get(notification_type, EntrySource.AI_EXTRACTED) alert_category = getattr(incident, "alert_category", None) or "unknown" # 提取 label 資訊 labels = incident.signals[0].labels if incident.signals else {} alertname = labels.get("alertname", "unknown") severity = labels.get("severity", "unknown") affected_services = ", ".join(incident.affected_services or ["unknown"]) # 計算修復耗時 resolution_time = "" if incident.resolved_at and incident.created_at: try: delta = incident.resolved_at - incident.created_at resolution_time = f"{int(delta.total_seconds())}s" except Exception: pass # 建立 KM 內容(標準格式) content = self._build_content( incident=incident, alertname=alertname, affected_services=affected_services, severity=severity, resolution_time=resolution_time, ) _inc_title = getattr(incident, "title", None) or alertname title = f"{alertname} @ {affected_services[:40]} — {_inc_title[:60]}" tags = [alertname, affected_services, severity, notification_type] if alert_category != "unknown": tags.append(alert_category) km_entry = await self._knowledge_svc.create_entry( KnowledgeEntryCreate( title=title[:255], content=content, entry_type=EntryType.INCIDENT_CASE, category=alert_category, tags=[t for t in tags if t], source=entry_source, status=target_status, related_incident_id=incident.incident_id, ) ) # 寫入操作日誌 try: await self._op_log_repo.append( event_type="KM_CONVERTED", incident_id=incident.incident_id, actor="km_conversion_service", action_detail=f"KM entry created: {km_entry.id}", success=True, context={ "km_entry_id": km_entry.id, "quality_level": target_status.value, "notification_type": notification_type, }, ) except Exception as _e: logger.warning("km_op_log_failed", incident_id=incident.incident_id, error=str(_e)) # BUG-004 修復 2026-04-11: KM entry 建立後,knowledge_service 背景觸發 embedding, # 但 incidents.vectorized 沒有被設為 True → 飛輪閉環(ADR-068)學習效果歸零。 # 等 embedding 背景任務啟動後(短延遲)更新 incidents.vectorized = True。 # 注意:embedding 為背景 asyncio task,此處標記 vectorized=True 代表「已觸發向量化」 # 真正完成以 knowledge_embedding_saved log 為準,但 vectorized flag 用於篩選補轉換 try: from src.db.base import get_db_context from src.db.models import IncidentRecord from sqlalchemy import update as _sa_update async with get_db_context() as _db: await _db.execute( _sa_update(IncidentRecord) .where(IncidentRecord.incident_id == incident.incident_id) .values(vectorized=True) ) await _db.commit() logger.info( "km_incident_vectorized_flagged", incident_id=incident.incident_id, km_entry_id=km_entry.id, ) except Exception as _ve: logger.warning( "km_vectorized_flag_failed", incident_id=incident.incident_id, error=str(_ve), ) # C2 修復 2026-04-11: DB 更新後,同步更新 Redis Working Memory 中的 vectorized 欄位 # 審計查 Redis Incident 物件,若不同步則審計仍顯示 vectorized=False # Key 格式: incident:{incident_id}(與 incident_service.save_to_working_memory 一致) try: import json as _json from src.core.redis_client import get_redis _redis = get_redis() _redis_key = f"incident:{incident.incident_id}" _raw = await _redis.get(_redis_key) if _raw: _data = _json.loads(_raw) if not _data.get("vectorized"): _data["vectorized"] = True _ttl = await _redis.ttl(_redis_key) _ex = _ttl if _ttl and _ttl > 0 else 604800 await _redis.set(_redis_key, _json.dumps(_data), ex=_ex) logger.info( "km_incident_vectorized_redis_synced", incident_id=incident.incident_id, ) except Exception as _re: logger.debug( "km_vectorized_redis_sync_failed", incident_id=incident.incident_id, error=str(_re), ) logger.info( "km_converted", incident_id=incident.incident_id, km_entry_id=km_entry.id, quality_level=target_status.value, notification_type=notification_type, ) return { "km_entry_id": km_entry.id, "quality_level": target_status.value, } def _build_content( self, incident, alertname: str, affected_services: str, severity: str, resolution_time: str, ) -> str: """ 建立 KM 條目內容(標準格式) """ created_at_str = str(incident.created_at) if incident.created_at else "未知" resolved_at_str = str(incident.resolved_at) if incident.resolved_at else "未知" context_summary = "" if incident.context_bundle: context_summary = str(incident.context_bundle.get("summary", "")) # 決策鏈資訊 decision_chain = getattr(incident, "decision_chain", None) root_cause = "" action_type = "" action_command = "" if decision_chain and isinstance(decision_chain, dict): root_cause = decision_chain.get("root_cause", "") action_type = decision_chain.get("action_type", "") action_command = decision_chain.get("action", "") # 指標快照(若有)— ADR-071-J: 使用精簡 delta 格式 (2026-04-11 Claude Sonnet 4.6) metrics_section = "" if incident.metrics_before or incident.metrics_after: mb = incident.metrics_before or {} ma = incident.metrics_after or {} delta_parts = [] for key, label in (("cpu_pct", "CPU"), ("mem_pct", "Mem"), ("disk_pct", "Disk")): if key in mb and key in ma: delta_parts.append(f"{label} {mb[key]:.0f}%→{ma[key]:.0f}%") elif key in ma: delta_parts.append(f"{label} 後={ma[key]:.0f}%") if "restart_count" in mb and "restart_count" in ma: delta_parts.append(f"重啟 {mb['restart_count']}→{ma['restart_count']}") delta_str = " | ".join(delta_parts) if delta_parts else "(無量化數據)" # K8s 狀態(若有) k8s_state = getattr(incident, "k8s_state_after", None) k8s_line = f"\n- K8s 狀態: {k8s_state}" if k8s_state else "" metrics_section = ( f"\n## 效果驗證\n" f"- 指標變化: {delta_str}\n" f"- 恢復耗時: {resolution_time}\n" + k8s_line + "\n" ) # 驗證結果(若有) verify_section = "" if incident.verification_result: verify_section = f"- 驗證方式: {incident.verification_result}\n" manual_section = "" if incident.manual_fix_steps: manual_section = ( f"\n## 手動修復步驟\n" f"- 執行者: {incident.manual_fix_by or '未知'}\n" f"```\n{incident.manual_fix_steps}\n```\n" ) return ( f"## 症狀\n" f"- 告警: {alertname}\n" f"- 服務: {affected_services}\n" f"- 嚴重度: {severity}\n" f"- 觸發時間: {created_at_str}\n" f"- 解決時間: {resolved_at_str}\n" + (f"- 即時情境: {context_summary}\n" if context_summary else "") + f"\n## 根因分析\n{root_cause or getattr(incident, 'title', None) or '未知'}\n" + ( f"\n## 執行動作\n" f"- 類型: {action_type}\n" f"- 指令: {action_command}\n" if action_type or action_command else "" ) + metrics_section + verify_section + manual_section ) # Singleton (模組層級) _km_conversion_service: KMConversionService | None = None def get_km_conversion_service() -> KMConversionService: global _km_conversion_service if _km_conversion_service is None: _km_conversion_service = KMConversionService() return _km_conversion_service