""" Approval Execution Service - Phase 16 R4.2 瘦身 Router 抽取 ============================================================ 從 approvals.py 抽取執行編排邏輯,整合: - OperationParser: 解析操作類型 - K8s Executor: 執行 K8s 操作 - ApprovalDBService: 更新狀態 - TimelineService: 記錄事件 - NotificationManager: 發送通知 - Phase 7.6: Playbook 自動萃取 版本: v1.1 建立: 2026-03-25 (台北時區) 更新: 2026-03-26 (Phase 7.6 自動萃取) 建立者: Claude Code (Phase 16 R4.2) """ import asyncio from typing import TYPE_CHECKING import structlog from src.core.config import settings from src.models.approval import ApprovalRequest from src.services.approval_db import get_approval_service, get_timeline_service from src.services.executor import get_executor from src.services.operation_parser import parse_operation_from_action if TYPE_CHECKING: from src.services.notifications import ExecutionStatus logger = structlog.get_logger(__name__) class ApprovalExecutionService: """ 授權執行服務 - 編排整個執行流程 職責: 1. 解析操作類型 2. 呼叫 K8s Executor 執行 3. 更新資料庫狀態 4. 記錄 Timeline 事件 5. 發送通知 """ async def execute_approved_action(self, approval: ApprovalRequest) -> None: """ 背景執行已批准的操作 此函數由 BackgroundTasks 呼叫,不阻塞 API 回應 Phase 5: 執行後更新資料庫狀態 Phase 6: 執行後發送通知 (Post-Execution Hook) Args: approval: 已批准的授權請求 """ from src.services.notifications import ExecutionStatus logger.info( "background_execution_start", approval_id=str(approval.id), action=approval.action, ) service = get_approval_service() timeline = get_timeline_service() # Parse operation details parsed = parse_operation_from_action(approval.action) operation_type = parsed.operation_type resource_name = parsed.resource_name namespace = parsed.namespace if operation_type is None or resource_name is None: logger.warning( "background_execution_skip", approval_id=str(approval.id), reason="Could not parse operation type from action", action=approval.action, ) # Phase 5: 更新資料庫狀態 await service.update_execution_status(approval.id, success=False) await timeline.add_event( event_type="exec", status="error", title="執行失敗: 無法解析操作類型", description=f"Action: {approval.action}", actor="leWOOOgo", actor_role="executor", approval_id=str(approval.id), ) # Phase 6: 發送失敗通知 (fire-and-forget) asyncio.create_task( self._send_execution_notification( approval=approval, execution_status=ExecutionStatus.FAILED, operation_type="unknown", namespace=namespace, error_message="Could not parse operation type", ) ) return # Execute with audit executor = get_executor() result = await executor.execute_with_audit( approval=approval, operation_type=operation_type, resource_name=resource_name, namespace=namespace, ) # Phase 5: 更新資料庫狀態 await service.update_execution_status(approval.id, success=result.success) # Update approval status based on result if result.success: logger.info( "background_execution_success", approval_id=str(approval.id), operation=operation_type.value, target=resource_name, namespace=namespace, duration_ms=result.duration_ms, ) await timeline.add_event( event_type="exec", status="success", title=f"✅ K8s 執行成功: {operation_type.value}", description=f"Target: {resource_name} @ {namespace} ({result.duration_ms}ms)", actor="leWOOOgo", actor_role="executor", approval_id=str(approval.id), ) # Phase 6: 發送成功通知 (fire-and-forget) asyncio.create_task( self._send_execution_notification( approval=approval, execution_status=ExecutionStatus.SUCCESS, operation_type=operation_type.value, namespace=namespace, duration_ms=result.duration_ms, ) ) # Phase 7.6: 觸發 Playbook 自動萃取 (fire-and-forget) asyncio.create_task( self._trigger_playbook_extraction(approval) ) # ADR-030 Phase 5: 觸發學習服務 (fire-and-forget) asyncio.create_task( self._trigger_learning( approval=approval, success=True, duration_seconds=result.duration_ms / 1000 if result.duration_ms else 0, ) ) else: logger.error( "background_execution_failed", approval_id=str(approval.id), operation=operation_type.value, target=resource_name, namespace=namespace, error=result.error, ) await timeline.add_event( event_type="exec", status="error", title=f"❌ K8s 執行失敗: {operation_type.value}", description=f"Error: {result.error}", actor="leWOOOgo", actor_role="executor", approval_id=str(approval.id), ) # Phase 6: 發送失敗通知 (fire-and-forget, 包含 Dry-Run 攔截) exec_status = ( ExecutionStatus.DRY_RUN_BLOCKED if "not found" in (result.error or "") else ExecutionStatus.FAILED ) asyncio.create_task( self._send_execution_notification( approval=approval, execution_status=exec_status, operation_type=operation_type.value, namespace=namespace, error_message=result.error, duration_ms=result.duration_ms, ) ) # ADR-030 Phase 5: 觸發學習服務 (失敗案例) asyncio.create_task( self._trigger_learning( approval=approval, success=False, error_message=result.error, duration_seconds=result.duration_ms / 1000 if result.duration_ms else 0, ) ) async def _trigger_learning( self, approval: ApprovalRequest, success: bool, duration_seconds: float = 0, error_message: str | None = None, ) -> None: """ ADR-030 Phase 5: 觸發學習服務 處理執行結果,調整信任度和 Playbook 統計 """ try: from src.services.learning_service import ( ExecutionResult, get_learning_service, ) learning = get_learning_service() result = ExecutionResult( approval_id=str(approval.id), incident_id=approval.incident_id or "", action=approval.action, success=success, error_message=error_message, duration_seconds=duration_seconds, ) await learning.process_execution_result( approval=approval, result=result, ) except Exception as e: # 學習失敗不影響主流程 logger.warning( "learning_trigger_failed", approval_id=str(approval.id), error=str(e), ) # 2026-04-04 ogt: 執行結果沉澱到 KM — 移出 try/except 確保 learning 失敗也寫入 # 統帥鐵律: 所有異常與自動修復紀錄必須回寫 KM asyncio.create_task( self._write_execution_result_to_km(approval, success, error_message) ) async def _write_execution_result_to_km( self, approval: "ApprovalRequest", success: bool, error_message: str | None, ) -> None: """ 執行結果沉澱到 KM (Knowledge Base) 2026-04-04 ogt: 統帥鐵律 — 成功/失敗執行記錄都必須回寫 KM """ try: from src.models.knowledge import EntrySource, EntryType, KnowledgeEntryCreate from src.services.knowledge_service import get_knowledge_service status_icon = "✅" if success else "❌" status_text = "成功" if success else f"失敗: {error_message or '未知原因'}" content = ( f"# {status_icon} 執行記錄: {approval.action[:80]}\n\n" f"**Approval ID**: {approval.id}\n" f"**Incident ID**: {approval.incident_id or '未關聯'}\n" f"**執行結果**: {status_text}\n" f"**風險等級**: {approval.risk_level.value if approval.risk_level else '未知'}\n\n" f"## 操作內容\n{approval.description or '無描述'}\n" ) entry_data = KnowledgeEntryCreate( title=f"[執行記錄] {status_icon} {approval.action[:60]}", content=content, entry_type=EntryType.INCIDENT_CASE, category="execution_result", tags=["execution", "auto_repair" if success else "execution_failed"], source=EntrySource.AI_EXTRACTED, related_incident_id=approval.incident_id or None, created_by="approval_execution", ) await get_knowledge_service().create_entry(entry_data) logger.debug( "execution_result_written_to_km", approval_id=str(approval.id), success=success, ) except Exception as e: logger.warning( "execution_result_km_write_failed", approval_id=str(approval.id), error=str(e), ) async def _send_execution_notification( self, approval: ApprovalRequest, execution_status: "ExecutionStatus", operation_type: str, namespace: str, duration_ms: int | None = None, error_message: str | None = None, ) -> None: """ Phase 6: 發送執行通知 (Post-Execution Hook) 將執行結果發送至所有已配置的通知頻道 (Discord, Slack, etc.) """ from src.services.notifications import ( NotificationMessage, get_notification_manager, ) if not settings.NOTIFICATION_ENABLED: logger.info("notification_disabled", approval_id=str(approval.id)) return try: # 建構簽核者列表 signers = [ {"name": sig.signer_name, "comment": sig.comment or ""} for sig in approval.signatures ] # 建構通知訊息 message = NotificationMessage( execution_status=execution_status, action_title=approval.action[:100], action_description=approval.description[:200] if approval.description else "", approval_id=str(approval.id), signers=signers, required_signatures=approval.required_signatures, affected_pods=approval.blast_radius.affected_pods if approval.blast_radius else 0, estimated_downtime=approval.blast_radius.estimated_downtime if approval.blast_radius else "N/A", related_services=approval.blast_radius.related_services if approval.blast_radius else [], data_impact=approval.blast_radius.data_impact.value if approval.blast_radius else "none", namespace=namespace, operation_type=operation_type, duration_ms=duration_ms, error_message=error_message, risk_level=approval.risk_level.value, ai_provider=approval.requested_by, ) # 發送通知 manager = get_notification_manager() results = await manager.send_all(message) for result in results: logger.info( "notification_result", approval_id=str(approval.id), provider=result.provider, status=result.status.value, message=result.message, ) except Exception as e: logger.exception( "notification_failed", approval_id=str(approval.id), error=str(e), ) async def _trigger_playbook_extraction( self, approval: ApprovalRequest, ) -> None: """ Phase 7.6: 觸發 Playbook 自動萃取 條件: - 執行成功 - 關聯的 Incident 狀態為 RESOLVED 或 CLOSED - effectiveness_score >= 4 此函數為 fire-and-forget,失敗不影響主流程 """ try: # 1. 從 approval 取得關聯的 incident_id # approval.requested_by 可能包含 incident 資訊,或從 metadata 取得 # 暫時從 description 或 action 解析 incident_id = self._extract_incident_id_from_approval(approval) if not incident_id: logger.info( "playbook_extraction_skipped", approval_id=str(approval.id), reason="No incident_id found", ) return # 2. 取得 Incident from src.services.incident_service import get_incident_service incident_service = get_incident_service() incident = await incident_service.get_incident(incident_id) if not incident: logger.info( "playbook_extraction_skipped", approval_id=str(approval.id), incident_id=incident_id, reason="Incident not found", ) return # 3. 執行成功後自動設定 outcome (冷啟動關鍵) # 2026-04-04 ogt: 首席架構師 Review — 補上 execution_success + effectiveness_score # 確保 Playbook 萃取前置條件能成立,不再依賴人工填分 from src.models.incident import IncidentOutcome, IncidentStatus from src.utils.timezone import now_taipei if incident.outcome is None: incident.outcome = IncidentOutcome() if not incident.outcome.execution_success: incident.outcome.execution_success = True if incident.outcome.effectiveness_score is None or incident.outcome.effectiveness_score < 4: incident.outcome.effectiveness_score = 4 # 系統判斷:K8s 執行成功 = 有效 if incident.status not in [IncidentStatus.RESOLVED, IncidentStatus.CLOSED]: incident.status = IncidentStatus.RESOLVED incident.resolved_at = now_taipei() # 回存 Incident(fire-and-forget 路徑,失敗不影響主流程) await incident_service.save_to_working_memory(incident) logger.info( "playbook_extraction_incident_updated", approval_id=str(approval.id), incident_id=incident_id, effectiveness_score=incident.outcome.effectiveness_score, status=incident.status.value, ) # 4. 觸發萃取(effectiveness 已保證 >= 4) from src.services.playbook_service import get_playbook_service playbook_service = get_playbook_service() effectiveness = incident.outcome.effectiveness_score or 4 playbook = await playbook_service.extract_from_incident( incident=incident, auto_approve=effectiveness >= 5, # 滿分自動核准 ) if playbook: logger.info( "playbook_auto_extracted", approval_id=str(approval.id), incident_id=incident_id, playbook_id=playbook.playbook_id, playbook_name=playbook.name, auto_approved=playbook.status.value == "approved", ) else: logger.debug( "playbook_extraction_no_result", approval_id=str(approval.id), incident_id=incident_id, ) except Exception as e: # 萃取失敗不影響主流程 logger.warning( "playbook_extraction_error", approval_id=str(approval.id), error=str(e), ) def _extract_incident_id_from_approval( self, approval: ApprovalRequest, ) -> str | None: """ 從 approval 提取關聯的 incident_id 嘗試以下來源: 1. approval.metadata (如果有) 2. approval.description 中的 INC- 模式 3. approval.requested_by 中的 incident 資訊 """ import re # 從 description 或 action 中尋找 INC-XXXXXX 模式 text = f"{approval.description or ''} {approval.action or ''}" match = re.search(r"INC-([A-Z0-9-]+)", text) if match: return match.group(0) # 返回完整的 INC-XXXXX # 從 requested_by 尋找 if approval.requested_by and "INC-" in approval.requested_by: match = re.search(r"INC-([A-Z0-9-]+)", approval.requested_by) if match: return match.group(0) return None # ============================================================================= # Singleton Instance # ============================================================================= _execution_service: ApprovalExecutionService | None = None def get_execution_service() -> ApprovalExecutionService: """ 取得 ApprovalExecutionService 單例 Returns: ApprovalExecutionService: 執行服務實例 """ global _execution_service if _execution_service is None: _execution_service = ApprovalExecutionService() return _execution_service