""" Approval Execution Service - Phase 16 R4.2 瘦身 Router 抽取 ============================================================ 從 approvals.py 抽取執行編排邏輯,整合: - OperationParser: 解析操作類型 - K8s Executor: 執行 K8s 操作 - ApprovalDBService: 更新狀態 - TimelineService: 記錄事件 - NotificationManager: 發送通知 版本: v1.0 建立: 2026-03-25 (台北時區) 建立者: Claude Code (Phase 16 R4.2) """ import asyncio from typing import TYPE_CHECKING import structlog from src.core.config import settings from src.models.approval import ApprovalRequest from src.services.approval_db import get_approval_service, get_timeline_service from src.services.executor import get_executor from src.services.operation_parser import parse_operation_from_action if TYPE_CHECKING: from src.services.notifications import ExecutionStatus logger = structlog.get_logger(__name__) class ApprovalExecutionService: """ 授權執行服務 - 編排整個執行流程 職責: 1. 解析操作類型 2. 呼叫 K8s Executor 執行 3. 更新資料庫狀態 4. 記錄 Timeline 事件 5. 發送通知 """ async def execute_approved_action(self, approval: ApprovalRequest) -> None: """ 背景執行已批准的操作 此函數由 BackgroundTasks 呼叫,不阻塞 API 回應 Phase 5: 執行後更新資料庫狀態 Phase 6: 執行後發送通知 (Post-Execution Hook) Args: approval: 已批准的授權請求 """ from src.services.notifications import ExecutionStatus logger.info( "background_execution_start", approval_id=str(approval.id), action=approval.action, ) service = get_approval_service() timeline = get_timeline_service() # Parse operation details parsed = parse_operation_from_action(approval.action) operation_type = parsed.operation_type resource_name = parsed.resource_name namespace = parsed.namespace if operation_type is None or resource_name is None: logger.warning( "background_execution_skip", approval_id=str(approval.id), reason="Could not parse operation type from action", action=approval.action, ) # Phase 5: 更新資料庫狀態 await service.update_execution_status(approval.id, success=False) await timeline.add_event( event_type="exec", status="error", title="執行失敗: 無法解析操作類型", description=f"Action: {approval.action}", actor="leWOOOgo", actor_role="executor", approval_id=str(approval.id), ) # Phase 6: 發送失敗通知 (fire-and-forget) asyncio.create_task( self._send_execution_notification( approval=approval, execution_status=ExecutionStatus.FAILED, operation_type="unknown", namespace=namespace, error_message="Could not parse operation type", ) ) return # Execute with audit executor = get_executor() result = await executor.execute_with_audit( approval=approval, operation_type=operation_type, resource_name=resource_name, namespace=namespace, ) # Phase 5: 更新資料庫狀態 await service.update_execution_status(approval.id, success=result.success) # Update approval status based on result if result.success: logger.info( "background_execution_success", approval_id=str(approval.id), operation=operation_type.value, target=resource_name, namespace=namespace, duration_ms=result.duration_ms, ) await timeline.add_event( event_type="exec", status="success", title=f"✅ K8s 執行成功: {operation_type.value}", description=f"Target: {resource_name} @ {namespace} ({result.duration_ms}ms)", actor="leWOOOgo", actor_role="executor", approval_id=str(approval.id), ) # Phase 6: 發送成功通知 (fire-and-forget) asyncio.create_task( self._send_execution_notification( approval=approval, execution_status=ExecutionStatus.SUCCESS, operation_type=operation_type.value, namespace=namespace, duration_ms=result.duration_ms, ) ) else: logger.error( "background_execution_failed", approval_id=str(approval.id), operation=operation_type.value, target=resource_name, namespace=namespace, error=result.error, ) await timeline.add_event( event_type="exec", status="error", title=f"❌ K8s 執行失敗: {operation_type.value}", description=f"Error: {result.error}", actor="leWOOOgo", actor_role="executor", approval_id=str(approval.id), ) # Phase 6: 發送失敗通知 (fire-and-forget, 包含 Dry-Run 攔截) exec_status = ( ExecutionStatus.DRY_RUN_BLOCKED if "not found" in (result.error or "") else ExecutionStatus.FAILED ) asyncio.create_task( self._send_execution_notification( approval=approval, execution_status=exec_status, operation_type=operation_type.value, namespace=namespace, error_message=result.error, duration_ms=result.duration_ms, ) ) async def _send_execution_notification( self, approval: ApprovalRequest, execution_status: "ExecutionStatus", operation_type: str, namespace: str, duration_ms: int | None = None, error_message: str | None = None, ) -> None: """ Phase 6: 發送執行通知 (Post-Execution Hook) 將執行結果發送至所有已配置的通知頻道 (Discord, Slack, etc.) """ from src.services.notifications import ( NotificationMessage, get_notification_manager, ) if not settings.NOTIFICATION_ENABLED: logger.info("notification_disabled", approval_id=str(approval.id)) return try: # 建構簽核者列表 signers = [ {"name": sig.signer_name, "comment": sig.comment or ""} for sig in approval.signatures ] # 建構通知訊息 message = NotificationMessage( execution_status=execution_status, action_title=approval.action[:100], action_description=approval.description[:200] if approval.description else "", approval_id=str(approval.id), signers=signers, required_signatures=approval.required_signatures, affected_pods=approval.blast_radius.affected_pods if approval.blast_radius else 0, estimated_downtime=approval.blast_radius.estimated_downtime if approval.blast_radius else "N/A", related_services=approval.blast_radius.related_services if approval.blast_radius else [], data_impact=approval.blast_radius.data_impact.value if approval.blast_radius else "none", namespace=namespace, operation_type=operation_type, duration_ms=duration_ms, error_message=error_message, risk_level=approval.risk_level.value, ai_provider=approval.requested_by, ) # 發送通知 manager = get_notification_manager() results = await manager.send_all(message) for result in results: logger.info( "notification_result", approval_id=str(approval.id), provider=result.provider, status=result.status.value, message=result.message, ) except Exception as e: logger.exception( "notification_failed", approval_id=str(approval.id), error=str(e), ) # ============================================================================= # Singleton Instance # ============================================================================= _execution_service: ApprovalExecutionService | None = None def get_execution_service() -> ApprovalExecutionService: """ 取得 ApprovalExecutionService 單例 Returns: ApprovalExecutionService: 執行服務實例 """ global _execution_service if _execution_service is None: _execution_service = ApprovalExecutionService() return _execution_service