"""
Infrastructure Execution Engine
================================

CTO-201: Kubernetes operation executor.

Features:
- Asynchronous Kubernetes access via ``kubernetes_asyncio``
- Dry-run resource validation
- Defensive handling of edge cases
- Complete AuditLog recording

Supported Operations:
- RESTART_DEPLOYMENT: restart a Deployment (patches a pod-template annotation)
- DELETE_POD: delete a Pod

Defensive engineering rules:
- Dry-run mandatory: the target resource must be verified before execution
- Edge-case anticipation: timeouts and network interruptions are handled
"""

import asyncio
import json
import shlex
import time
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import Enum
from pathlib import Path
from typing import Any

import structlog

from src.core.config import settings
from src.db.base import get_db_context
from src.db.models import AuditLog
from src.models.approval import ApprovalRequest

logger = structlog.get_logger(__name__)


# =============================================================================
# Operation Types
# =============================================================================


class OperationType(str, Enum):
    """Kubernetes operation types understood by the executor."""

    RESTART_DEPLOYMENT = "RESTART_DEPLOYMENT"
    DELETE_POD = "DELETE_POD"
    # Accepted by validate_action, but execute_with_audit has no handler for it
    # and reports it as unsupported.
    SCALE_DEPLOYMENT = "SCALE_DEPLOYMENT"


# =============================================================================
# Result Types
# =============================================================================


@dataclass
class DryRunResult:
    """Outcome of a dry-run (existence) check on a target resource."""

    passed: bool
    message: str
    resource_exists: bool = False
    # Selected metadata of the resource when it was found.
    resource_info: dict[str, Any] | None = None


@dataclass
class ExecutionResult:
    """Outcome of an executed (or Shadow-Mode simulated) operation."""

    success: bool
    message: str
    operation_type: OperationType
    target_resource: str
    namespace: str
    duration_ms: int
    # Trimmed, plain-data view of the API / subprocess response.
    k8s_response: dict[str, Any] | None = None
    error: str | None = None


# =============================================================================
# Helpers
# =============================================================================


def _elapsed_ms(start_time: float) -> int:
    """Return whole milliseconds elapsed since *start_time* (a ``time.monotonic()`` reading)."""
    return int((time.monotonic() - start_time) * 1000)


def _is_not_found(error: Exception) -> bool:
    """Return True if a Kubernetes client error means "resource not found".

    The exception's HTTP ``status`` attribute is checked first when present;
    string matching on the message remains as a fallback for other error types.
    """
    if getattr(error, "status", None) == 404:
        return True
    message = str(error)
    return "404" in message or "not found" in message.lower()


def _resource_target(operation_type: OperationType, name: str) -> str:
    """Return the ``kind/name`` target string for *operation_type* on *name*.

    Uses the same format as ``restart_deployment`` (``deployment/<name>``) and
    ``delete_pod`` (``pod/<name>``) so every audit row follows one convention.
    """
    kind = {
        OperationType.RESTART_DEPLOYMENT: "deployment",
        OperationType.SCALE_DEPLOYMENT: "deployment",
        OperationType.DELETE_POD: "pod",
    }.get(operation_type, operation_type.value.lower())
    return f"{kind}/{name}"


# =============================================================================
# Action Executor
# =============================================================================


class ActionExecutor:
    """
    Infrastructure execution engine.

    Responsibilities:
    1. Connect to the K3s cluster
    2. Dry-run validate that the target resource exists
    3. Perform the actual operation
    4. Write an AuditLog entry
    """

    def __init__(self):
        self._initialized = False
        self._api_client = None
        self._core_v1 = None
        self._apps_v1 = None

    async def initialize(self) -> bool:
        """
        Initialise the Kubernetes connection (no-op once initialised).

        Configuration sources, in order of preference:
        1. In-cluster config (ServiceAccount token) -- when running inside a Pod.
        2. Kubeconfig file at ``settings.KUBECONFIG_PATH`` -- for local development.
           A relative path is resolved against the grandparent of this file's
           directory.

        Returns:
            bool: True if the API clients are ready; False on any failure
            (the error is logged, not raised).
        """
        if self._initialized:
            return True

        try:
            from kubernetes_asyncio import client
            from kubernetes_asyncio.config import (
                ConfigException,
                load_incluster_config,
                load_kube_config,
            )

            config_source = None

            # Prefer in-cluster configuration (running inside a K8s Pod).
            try:
                load_incluster_config()
                config_source = "in-cluster"
                logger.info("k8s_using_incluster_config")
            except ConfigException:
                # Not inside K8s: fall back to a kubeconfig file.
                kubeconfig_path = Path(settings.KUBECONFIG_PATH)
                if not kubeconfig_path.is_absolute():
                    kubeconfig_path = Path(__file__).parent.parent.parent / settings.KUBECONFIG_PATH

                if not kubeconfig_path.exists():
                    logger.error(
                        "kubeconfig_not_found",
                        path=str(kubeconfig_path),
                    )
                    return False

                await load_kube_config(config_file=str(kubeconfig_path))
                config_source = str(kubeconfig_path)

            # Build the API clients.
            self._api_client = client.ApiClient()
            self._core_v1 = client.CoreV1Api(self._api_client)
            self._apps_v1 = client.AppsV1Api(self._api_client)

            self._initialized = True
            logger.info(
                "k8s_executor_initialized",
                config_source=config_source,
            )
            return True

        except Exception as e:
            logger.error(
                "k8s_executor_init_failed",
                error=str(e),
            )
            return False

    async def close(self) -> None:
        """Close the API client and reset the executor to its uninitialised state."""
        if self._api_client:
            await self._api_client.close()
        self._api_client = None
        self._core_v1 = None
        self._apps_v1 = None
        self._initialized = False

    # =========================================================================
    # Dry-Run Validation
    # =========================================================================

    async def validate_deployment_exists(
        self,
        name: str,
        namespace: str = "default",
    ) -> DryRunResult:
        """
        Check that a Deployment exists.

        [Dry-run mandatory] Call this before operating on a Deployment.

        Returns:
            DryRunResult: ``passed`` is True only when the Deployment was read
            successfully; ``resource_info`` then holds name, namespace, replica
            counts and uid.
        """
        if not await self.initialize():
            return DryRunResult(
                passed=False,
                message="K8s connection not available",
                resource_exists=False,
            )

        try:
            deployment = await self._apps_v1.read_namespaced_deployment(
                name=name,
                namespace=namespace,
            )

            return DryRunResult(
                passed=True,
                message=f"Deployment '{name}' found in namespace '{namespace}'",
                resource_exists=True,
                resource_info={
                    "name": deployment.metadata.name,
                    "namespace": deployment.metadata.namespace,
                    "replicas": deployment.spec.replicas,
                    "ready_replicas": deployment.status.ready_replicas or 0,
                    "uid": deployment.metadata.uid,
                },
            )

        except Exception as e:
            if _is_not_found(e):
                return DryRunResult(
                    passed=False,
                    message=f"Deployment '{name}' not found in namespace '{namespace}'",
                    resource_exists=False,
                )
            return DryRunResult(
                passed=False,
                message=f"Failed to validate deployment: {e}",
                resource_exists=False,
            )

    async def validate_pod_exists(
        self,
        name: str,
        namespace: str = "default",
    ) -> DryRunResult:
        """
        Check that a Pod exists.

        [Dry-run mandatory] Call this before operating on a Pod.

        Returns:
            DryRunResult: ``passed`` is True only when the Pod was read
            successfully; ``resource_info`` then holds name, namespace, phase
            and uid.
        """
        if not await self.initialize():
            return DryRunResult(
                passed=False,
                message="K8s connection not available",
                resource_exists=False,
            )

        try:
            pod = await self._core_v1.read_namespaced_pod(
                name=name,
                namespace=namespace,
            )

            return DryRunResult(
                passed=True,
                message=f"Pod '{name}' found in namespace '{namespace}'",
                resource_exists=True,
                resource_info={
                    "name": pod.metadata.name,
                    "namespace": pod.metadata.namespace,
                    "phase": pod.status.phase,
                    "uid": pod.metadata.uid,
                },
            )

        except Exception as e:
            if _is_not_found(e):
                return DryRunResult(
                    passed=False,
                    message=f"Pod '{name}' not found in namespace '{namespace}'",
                    resource_exists=False,
                )
            return DryRunResult(
                passed=False,
                message=f"Failed to validate pod: {e}",
                resource_exists=False,
            )

    async def validate_action(
        self,
        operation_type: OperationType,
        resource_name: str,
        namespace: str = "default",
    ) -> DryRunResult:
        """
        Generic dry-run entry point.

        Dispatches to the existence check matching the resource kind of
        *operation_type*; unknown operation types fail validation.
        """
        logger.info(
            "dry_run_validation_start",
            operation=operation_type.value,
            resource=resource_name,
            namespace=namespace,
        )

        if operation_type in (OperationType.RESTART_DEPLOYMENT, OperationType.SCALE_DEPLOYMENT):
            result = await self.validate_deployment_exists(resource_name, namespace)
        elif operation_type == OperationType.DELETE_POD:
            result = await self.validate_pod_exists(resource_name, namespace)
        else:
            result = DryRunResult(
                passed=False,
                message=f"Unknown operation type: {operation_type}",
                resource_exists=False,
            )

        logger.info(
            "dry_run_validation_complete",
            operation=operation_type.value,
            resource=resource_name,
            passed=result.passed,
            message=result.message,
        )

        return result

    # =========================================================================
    # Execute Operations
    # =========================================================================

    async def restart_deployment(
        self,
        name: str,
        namespace: str = "default",
    ) -> ExecutionResult:
        """
        Restart a Deployment.

        Implementation: patch the pod-template annotation
        ``kubectl.kubernetes.io/restartedAt`` to trigger a rollout restart,
        equivalent to ``kubectl rollout restart deployment/<name> -n <namespace>``.

        Shadow Mode: when ``settings.SHADOW_MODE_ENABLED`` is True, the
        operation is only logged and a simulated success is returned.
        """
        start_time = time.monotonic()
        target = f"deployment/{name}"

        # Shadow Mode check (hard disarm): log and simulate, never touch the cluster.
        if settings.SHADOW_MODE_ENABLED:
            duration_ms = _elapsed_ms(start_time)
            logger.warning(
                "shadow_mode_intercept",
                operation="RESTART_DEPLOYMENT",
                target=target,
                namespace=namespace,
                message="[SHADOW MODE] Operation blocked - dry-run only",
                would_execute=f"kubectl rollout restart deployment/{name} -n {namespace}",
            )
            return ExecutionResult(
                success=True,
                message=f"[SHADOW MODE] Deployment '{name}' restart simulated (dry-run only)",
                operation_type=OperationType.RESTART_DEPLOYMENT,
                target_resource=target,
                namespace=namespace,
                duration_ms=duration_ms,
                k8s_response={
                    "shadow_mode": True,
                    "dry_run": True,
                    "simulated_action": f"kubectl rollout restart deployment/{name} -n {namespace}",
                },
            )

        if not await self.initialize():
            return ExecutionResult(
                success=False,
                message="K8s connection not available",
                operation_type=OperationType.RESTART_DEPLOYMENT,
                target_resource=target,
                namespace=namespace,
                duration_ms=0,
                error="K8s not initialized",
            )

        try:
            # Changing this pod-template annotation makes the controller roll the pods.
            patch_body = {
                "spec": {
                    "template": {
                        "metadata": {
                            "annotations": {
                                "kubectl.kubernetes.io/restartedAt": datetime.now(UTC).isoformat()
                            }
                        }
                    }
                }
            }

            result = await asyncio.wait_for(
                self._apps_v1.patch_namespaced_deployment(
                    name=name,
                    namespace=namespace,
                    body=patch_body,
                ),
                timeout=settings.K8S_OPERATION_TIMEOUT,
            )

            duration_ms = _elapsed_ms(start_time)

            logger.info(
                "deployment_restart_success",
                deployment=name,
                namespace=namespace,
                duration_ms=duration_ms,
            )

            return ExecutionResult(
                success=True,
                message=f"Deployment '{name}' restart triggered",
                operation_type=OperationType.RESTART_DEPLOYMENT,
                target_resource=target,
                namespace=namespace,
                duration_ms=duration_ms,
                k8s_response={
                    "name": result.metadata.name,
                    "uid": result.metadata.uid,
                    "generation": result.metadata.generation,
                },
            )

        except TimeoutError:
            duration_ms = _elapsed_ms(start_time)
            error_msg = f"Operation timed out after {settings.K8S_OPERATION_TIMEOUT}s"
            logger.error(
                "deployment_restart_timeout",
                deployment=name,
                namespace=namespace,
            )
            return ExecutionResult(
                success=False,
                message=error_msg,
                operation_type=OperationType.RESTART_DEPLOYMENT,
                target_resource=target,
                namespace=namespace,
                duration_ms=duration_ms,
                error=error_msg,
            )

        except Exception as e:
            duration_ms = _elapsed_ms(start_time)
            error_msg = str(e)
            logger.error(
                "deployment_restart_failed",
                deployment=name,
                namespace=namespace,
                error=error_msg,
            )
            return ExecutionResult(
                success=False,
                message=f"Failed to restart deployment: {error_msg}",
                operation_type=OperationType.RESTART_DEPLOYMENT,
                target_resource=target,
                namespace=namespace,
                duration_ms=duration_ms,
                error=error_msg,
            )

    async def delete_pod(
        self,
        name: str,
        namespace: str = "default",
    ) -> ExecutionResult:
        """
        Delete a Pod.

        Equivalent to ``kubectl delete pod <name> -n <namespace>``.

        Shadow Mode: when ``settings.SHADOW_MODE_ENABLED`` is True, the
        operation is only logged and a simulated success is returned.
        """
        start_time = time.monotonic()
        target = f"pod/{name}"

        # Shadow Mode check (hard disarm): log and simulate, never touch the cluster.
        if settings.SHADOW_MODE_ENABLED:
            duration_ms = _elapsed_ms(start_time)
            logger.warning(
                "shadow_mode_intercept",
                operation="DELETE_POD",
                target=target,
                namespace=namespace,
                message="[SHADOW MODE] Operation blocked - dry-run only",
                would_execute=f"kubectl delete pod {name} -n {namespace}",
            )
            return ExecutionResult(
                success=True,
                message=f"[SHADOW MODE] Pod '{name}' deletion simulated (dry-run only)",
                operation_type=OperationType.DELETE_POD,
                target_resource=target,
                namespace=namespace,
                duration_ms=duration_ms,
                k8s_response={
                    "shadow_mode": True,
                    "dry_run": True,
                    "simulated_action": f"kubectl delete pod {name} -n {namespace}",
                },
            )

        if not await self.initialize():
            return ExecutionResult(
                success=False,
                message="K8s connection not available",
                operation_type=OperationType.DELETE_POD,
                target_resource=target,
                namespace=namespace,
                duration_ms=0,
                error="K8s not initialized",
            )

        try:
            result = await asyncio.wait_for(
                self._core_v1.delete_namespaced_pod(
                    name=name,
                    namespace=namespace,
                ),
                timeout=settings.K8S_OPERATION_TIMEOUT,
            )

            duration_ms = _elapsed_ms(start_time)

            logger.info(
                "pod_delete_success",
                pod=name,
                namespace=namespace,
                duration_ms=duration_ms,
            )

            # The client may return a model object whose ``status`` is itself a
            # model (e.g. V1PodStatus). Reduce it to a plain value so the
            # response dict, which is also passed on to AuditLog.k8s_response,
            # contains only plain data.
            pod_status = getattr(result, "status", "Deleted")
            if pod_status is not None and not isinstance(pod_status, (str, int, float, bool)):
                pod_status = getattr(pod_status, "phase", None) or str(pod_status)

            return ExecutionResult(
                success=True,
                message=f"Pod '{name}' deleted successfully",
                operation_type=OperationType.DELETE_POD,
                target_resource=target,
                namespace=namespace,
                duration_ms=duration_ms,
                k8s_response={
                    "status": pod_status,
                },
            )

        except TimeoutError:
            duration_ms = _elapsed_ms(start_time)
            error_msg = f"Operation timed out after {settings.K8S_OPERATION_TIMEOUT}s"
            logger.error(
                "pod_delete_timeout",
                pod=name,
                namespace=namespace,
            )
            return ExecutionResult(
                success=False,
                message=error_msg,
                operation_type=OperationType.DELETE_POD,
                target_resource=target,
                namespace=namespace,
                duration_ms=duration_ms,
                error=error_msg,
            )

        except Exception as e:
            duration_ms = _elapsed_ms(start_time)
            error_msg = str(e)
            logger.error(
                "pod_delete_failed",
                pod=name,
                namespace=namespace,
                error=error_msg,
            )
            return ExecutionResult(
                success=False,
                message=f"Failed to delete pod: {error_msg}",
                operation_type=OperationType.DELETE_POD,
                target_resource=target,
                namespace=namespace,
                duration_ms=duration_ms,
                error=error_msg,
            )

    # =========================================================================
    # Raw Kubectl Execution (Phase 6.6)
    # =========================================================================

    async def execute_kubectl_command(
        self,
        command: str,
        timeout_sec: int = 60,
    ) -> ExecutionResult:
        """
        Phase 6.6: run a raw ``kubectl`` command as a subprocess.

        The command line is tokenised with ``shlex.split`` and started with
        ``asyncio.create_subprocess_exec``; no shell is involved. Shell
        operators (``;``, ``&``, ``|``, backticks, ``$(``, redirections,
        newlines) are rejected up front instead of being interpreted. This
        prevents approval-supplied text from chaining extra commands.

        Guard rails:
        - The first token must be exactly ``kubectl``.
        - Destructive patterns (namespace deletion, ``rm -rf``, ...) are refused;
          matching is case-insensitive on whitespace-normalised text.
        - In Shadow Mode the command is logged but not executed.
        - The policy that every command carries ``-n <namespace>`` is expected
          of callers; it is not enforced here.

        Raw-command results reuse existing OperationType members as placeholders
        (there is no dedicated member for raw kubectl).

        Args:
            command: kubectl command line (e.g. "kubectl get pods -n awoooi-prod").
            timeout_sec: seconds to wait before the subprocess is killed.

        Returns:
            ExecutionResult: outcome; stdout/stderr are truncated in ``k8s_response``.
        """
        start_time = time.monotonic()

        # Security check: the command must tokenise cleanly and invoke kubectl itself.
        try:
            argv = shlex.split(command)
        except ValueError as e:
            return ExecutionResult(
                success=False,
                message=f"Invalid command: {e}",
                operation_type=OperationType.RESTART_DEPLOYMENT,  # Generic
                target_resource=command[:50],
                namespace="unknown",
                duration_ms=0,
                error="Security: Unparseable command",
            )

        if not argv or argv[0] != "kubectl":
            return ExecutionResult(
                success=False,
                message="Invalid command: must start with 'kubectl'",
                operation_type=OperationType.RESTART_DEPLOYMENT,  # Generic
                target_resource=command[:50],
                namespace="unknown",
                duration_ms=0,
                error="Security: Only kubectl commands allowed",
            )

        # Security check: refuse destructive commands.
        forbidden_patterns = [
            "delete namespace",
            "delete ns",
            "rm -rf",
            "drop database",
            "truncate",
            "--all-namespaces -o wide | xargs",  # Guards against piped bulk deletion
        ]
        # Collapse runs of whitespace so "delete   namespace" cannot slip past.
        command_normalized = " ".join(command.lower().split())
        for pattern in forbidden_patterns:
            if pattern in command_normalized:
                return ExecutionResult(
                    success=False,
                    message=f"Forbidden command pattern: {pattern}",
                    operation_type=OperationType.DELETE_POD,
                    target_resource=command[:50],
                    namespace="unknown",
                    duration_ms=0,
                    error=f"Security: Forbidden pattern '{pattern}'",
                )

        # Security check: no shell operators. The command runs without a shell,
        # so these would otherwise reach kubectl as meaningless literal arguments.
        shell_operators = (";", "&", "|", "`", "$(", ">", "<", "\n", "\r")
        for operator in shell_operators:
            if operator in command:
                return ExecutionResult(
                    success=False,
                    message=f"Forbidden shell operator: {operator!r}",
                    operation_type=OperationType.DELETE_POD,
                    target_resource=command[:50],
                    namespace="unknown",
                    duration_ms=0,
                    error="Security: Shell operators are not allowed",
                )

        # Shadow Mode check.
        if settings.SHADOW_MODE_ENABLED:
            duration_ms = _elapsed_ms(start_time)
            logger.warning(
                "shadow_mode_kubectl_intercept",
                command=command[:100],
                message="[SHADOW MODE] kubectl blocked - dry-run only",
            )
            return ExecutionResult(
                success=True,
                message=f"[SHADOW MODE] kubectl simulated: {command[:50]}...",
                operation_type=OperationType.RESTART_DEPLOYMENT,
                target_resource=command[:50],
                namespace="shadow",
                duration_ms=duration_ms,
                k8s_response={
                    "shadow_mode": True,
                    "simulated_command": command,
                },
            )

        # Run the kubectl command.
        try:
            logger.info(
                "kubectl_execution_start",
                command=command[:100],
            )

            process = await asyncio.create_subprocess_exec(
                *argv,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )

            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=timeout_sec,
                )
            except TimeoutError:
                # Kill and reap the child so it does not linger after the timeout.
                process.kill()
                await process.wait()
                duration_ms = _elapsed_ms(start_time)
                logger.error(
                    "kubectl_execution_timeout",
                    command=command[:100],
                    timeout_sec=timeout_sec,
                )
                return ExecutionResult(
                    success=False,
                    message=f"kubectl timed out after {timeout_sec}s",
                    operation_type=OperationType.RESTART_DEPLOYMENT,
                    target_resource=command[:50],
                    namespace="unknown",
                    duration_ms=duration_ms,
                    error=f"Timeout after {timeout_sec}s",
                )

            duration_ms = _elapsed_ms(start_time)
            stdout_str = stdout.decode("utf-8", errors="replace").strip()
            stderr_str = stderr.decode("utf-8", errors="replace").strip()

            if process.returncode == 0:
                logger.info(
                    "kubectl_execution_success",
                    command=command[:100],
                    returncode=process.returncode,
                    duration_ms=duration_ms,
                    stdout_preview=stdout_str[:200],
                )
                return ExecutionResult(
                    success=True,
                    message="kubectl executed successfully",
                    operation_type=OperationType.RESTART_DEPLOYMENT,
                    target_resource=command[:50],
                    namespace="executed",
                    duration_ms=duration_ms,
                    k8s_response={
                        "returncode": process.returncode,
                        "stdout": stdout_str[:1000],  # Cap stored output size
                        "stderr": stderr_str[:500] if stderr_str else None,
                    },
                )

            logger.error(
                "kubectl_execution_failed",
                command=command[:100],
                returncode=process.returncode,
                stderr=stderr_str[:500],
            )
            return ExecutionResult(
                success=False,
                message=f"kubectl failed with code {process.returncode}",
                operation_type=OperationType.RESTART_DEPLOYMENT,
                target_resource=command[:50],
                namespace="failed",
                duration_ms=duration_ms,
                error=stderr_str[:500] or f"Exit code: {process.returncode}",
                k8s_response={
                    "returncode": process.returncode,
                    "stdout": stdout_str[:500],
                    "stderr": stderr_str[:500],
                },
            )

        except Exception as e:
            duration_ms = _elapsed_ms(start_time)
            error_msg = str(e)
            logger.exception(
                "kubectl_execution_exception",
                command=command[:100],
                error=error_msg,
            )
            return ExecutionResult(
                success=False,
                message=f"kubectl execution error: {error_msg}",
                operation_type=OperationType.RESTART_DEPLOYMENT,
                target_resource=command[:50],
                namespace="error",
                duration_ms=duration_ms,
                error=error_msg,
            )

    # =========================================================================
    # High-Level Execution with Audit Log
    # =========================================================================

    async def execute_with_audit(
        self,
        approval: ApprovalRequest,
        operation_type: OperationType,
        resource_name: str,
        namespace: str = "default",
    ) -> ExecutionResult:
        """
        Execute an operation and record it in the AuditLog.

        Flow:
        1. Dry-run validation (on failure: audit + return, nothing executed)
        2. Execute the operation
        3. Write the AuditLog entry

        Args:
            approval: approval request; its ``id`` and ``requested_by`` are
                recorded in the audit log.
            operation_type: operation to perform.
            resource_name: name of the target Deployment/Pod.
            namespace: target namespace.

        Returns:
            ExecutionResult: outcome of the dry-run failure or the execution.
        """
        target = _resource_target(operation_type, resource_name)

        # Step 1: Dry-run validation
        dry_run = await self.validate_action(operation_type, resource_name, namespace)

        if not dry_run.passed:
            await self._write_audit_log(
                approval_id=str(approval.id),
                operation_type=operation_type,
                target_resource=target,
                namespace=namespace,
                success=False,
                error_message=dry_run.message,
                executed_by=approval.requested_by,
                dry_run_passed=False,
                dry_run_message=dry_run.message,
            )
            return ExecutionResult(
                success=False,
                message=f"Dry-run failed: {dry_run.message}",
                operation_type=operation_type,
                target_resource=target,
                namespace=namespace,
                duration_ms=0,
                error=dry_run.message,
            )

        # Step 2: Execute the operation
        if operation_type == OperationType.RESTART_DEPLOYMENT:
            result = await self.restart_deployment(resource_name, namespace)
        elif operation_type == OperationType.DELETE_POD:
            result = await self.delete_pod(resource_name, namespace)
        else:
            result = ExecutionResult(
                success=False,
                message=f"Unsupported operation: {operation_type}",
                operation_type=operation_type,
                target_resource=target,
                namespace=namespace,
                duration_ms=0,
                error="Unsupported operation",
            )

        # Step 3: Write the audit log
        await self._write_audit_log(
            approval_id=str(approval.id),
            operation_type=operation_type,
            target_resource=result.target_resource,
            namespace=namespace,
            success=result.success,
            error_message=result.error,
            k8s_response=result.k8s_response,
            executed_by=approval.requested_by,
            execution_duration_ms=result.duration_ms,
            dry_run_passed=True,
            dry_run_message=dry_run.message,
        )

        return result

    async def _write_audit_log(
        self,
        approval_id: str,
        operation_type: OperationType,
        target_resource: str,
        namespace: str,
        success: bool,
        executed_by: str,
        error_message: str | None = None,
        k8s_response: dict[str, Any] | None = None,
        execution_duration_ms: int | None = None,
        dry_run_passed: bool = True,
        dry_run_message: str | None = None,
    ) -> None:
        """
        Persist one AuditLog row (the original note says SQLite).

        Best-effort: any failure is logged as ``audit_log_write_failed`` and
        swallowed, so auditing never masks the operation's own result.
        """
        try:
            async with get_db_context() as db:
                audit_log = AuditLog(
                    approval_id=approval_id,
                    operation_type=operation_type.value,
                    target_resource=target_resource,
                    namespace=namespace,
                    success=success,
                    error_message=error_message,
                    k8s_response=k8s_response,
                    executed_by=executed_by,
                    execution_duration_ms=execution_duration_ms,
                    dry_run_passed=dry_run_passed,
                    dry_run_message=dry_run_message,
                )
                db.add(audit_log)
                await db.commit()

                logger.info(
                    "audit_log_written",
                    approval_id=approval_id,
                    operation=operation_type.value,
                    success=success,
                )

        except Exception as e:
            logger.error(
                "audit_log_write_failed",
                approval_id=approval_id,
                error=str(e),
            )

    # =========================================================================
    # Utility Methods
    # =========================================================================

    async def list_namespaces(self) -> list[str]:
        """
        List all namespace names (useful as a connectivity check).

        Returns:
            list[str]: namespace names, or an empty list when the connection
            or the API call fails.
        """
        if not await self.initialize():
            return []

        try:
            result = await self._core_v1.list_namespace()
            namespaces = [ns.metadata.name for ns in result.items]
            logger.info(
                "namespaces_listed",
                count=len(namespaces),
            )
            return namespaces
        except Exception as e:
            logger.error(
                "list_namespaces_failed",
                error=str(e),
            )
            return []


# =============================================================================
# Singleton Instance
# =============================================================================

_executor: ActionExecutor | None = None


def get_executor() -> ActionExecutor:
    """Return the process-wide executor, creating it lazily on first use."""
    global _executor
    if _executor is None:
        _executor = ActionExecutor()
    return _executor


async def close_executor() -> None:
    """Close the shared executor's connection and drop the singleton."""
    global _executor
    if _executor is not None:
        await _executor.close()
        _executor = None


# =============================================================================
# Phase 6.6: Execute Approved Proposal (Background Task Entry Point)
# =============================================================================


async def execute_approved_proposal(approval_id: str) -> ExecutionResult:
    """
    Phase 6.6: execute an approved proposal.

    Invoked from BackgroundTasks. Flow:
    1. Load the approval record (Redis first, database as fallback)
    2. Verify its status is "approved"
    3. Extract the kubectl command (metadata ``kubectl_command``, else the
       ``action`` text, else a default built from the record)
    4. Run it via ``ActionExecutor.execute_kubectl_command``
    5. Record status "executed" or "failed" in Redis

    Args:
        approval_id: Approval ID (UUID string).

    Returns:
        ExecutionResult: lookup/validation problems are returned as
        unsuccessful results rather than raised.
    """
    from src.services.approval_db import get_approval_service
    from src.services.multi_sig_redis import MultiSigRedisService

    logger.info(
        "execute_approved_proposal_start",
        approval_id=approval_id,
    )

    # Step 1: Load approval data (Redis first; a Redis error falls through to the DB).
    redis_service = MultiSigRedisService()
    try:
        approval_data = await redis_service.get_approval(approval_id)
    except Exception as e:
        logger.warning(
            "execute_approved_proposal_redis_read_failed",
            approval_id=approval_id,
            error=str(e),
        )
        approval_data = None

    if not approval_data:
        db_service = get_approval_service()
        approval_obj = await db_service.get_approval_by_id(approval_id)
        if approval_obj:
            approval_data = {
                "id": str(approval_obj.id),
                "status": approval_obj.status.value,
                "action": approval_obj.action,
                "metadata": approval_obj.metadata or {},
            }

    if not approval_data:
        logger.error(
            "execute_approved_proposal_not_found",
            approval_id=approval_id,
        )
        return ExecutionResult(
            success=False,
            message=f"Approval {approval_id} not found",
            operation_type=OperationType.RESTART_DEPLOYMENT,
            target_resource=approval_id,
            namespace="unknown",
            duration_ms=0,
            error="Approval not found",
        )

    # Step 2: Verify status (tolerates a missing or None status field).
    status = str(approval_data.get("status") or "").lower()
    if status != "approved":
        logger.warning(
            "execute_approved_proposal_wrong_status",
            approval_id=approval_id,
            status=status,
        )
        return ExecutionResult(
            success=False,
            message=f"Cannot execute: status is '{status}', expected 'approved'",
            operation_type=OperationType.RESTART_DEPLOYMENT,
            target_resource=approval_id,
            namespace="unknown",
            duration_ms=0,
            error=f"Invalid status: {status}",
        )

    # Step 3: Extract the kubectl command. Metadata may arrive as a dict, a
    # JSON string, or None.
    metadata = approval_data.get("metadata") or {}
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except ValueError:
            metadata = {}
    if not isinstance(metadata, dict):
        metadata = {}

    kubectl_command = metadata.get("kubectl_command") or ""
    if not isinstance(kubectl_command, str):
        kubectl_command = ""

    # Without an explicit command, derive one from the action text.
    if not kubectl_command:
        action = str(approval_data.get("action") or "")
        action_lower = action.lower()
        if "kubectl" in action_lower:
            kubectl_command = action
        else:
            # Build a default command from top-level fields. Names are
            # shell-quoted so they stay single tokens for shlex.split.
            namespace = str(approval_data.get("namespace") or "awoooi-prod")
            resource = str(approval_data.get("resource_name") or "")
            deployment_ref = shlex.quote(f"deployment/{resource}")
            namespace_arg = shlex.quote(namespace)
            if "restart" in action_lower:
                kubectl_command = f"kubectl rollout restart {deployment_ref} -n {namespace_arg}"
            elif "scale" in action_lower:
                # Note: a scale request maps to a read-only `kubectl get`, not an actual scale.
                kubectl_command = f"kubectl get {deployment_ref} -n {namespace_arg}"
            else:
                kubectl_command = f"kubectl get pods -n {namespace_arg}"

    if not kubectl_command:
        logger.error(
            "execute_approved_proposal_no_command",
            approval_id=approval_id,
        )
        return ExecutionResult(
            success=False,
            message="No kubectl_command found in approval",
            operation_type=OperationType.RESTART_DEPLOYMENT,
            target_resource=approval_id,
            namespace="unknown",
            duration_ms=0,
            error="Missing kubectl_command",
        )

    logger.info(
        "execute_approved_proposal_command_extracted",
        approval_id=approval_id,
        kubectl_command=kubectl_command[:100],
    )

    # Step 4: Run the kubectl command.
    executor = get_executor()
    result = await executor.execute_kubectl_command(kubectl_command)

    # Step 5: Record the outcome in Redis (best-effort; failures are logged).
    new_status = "executed" if result.success else "failed"
    execution_log = {
        "executed_at": datetime.now(UTC).isoformat(),
        "success": result.success,
        "stdout": result.k8s_response.get("stdout", "") if result.k8s_response else "",
        "stderr": result.error or "",
        "duration_ms": result.duration_ms,
    }

    try:
        await redis_service.update_status(
            approval_id=approval_id,
            status=new_status,
            executor_id="k8s_executor",
            execution_result=execution_log,
        )
        logger.info(
            "execute_approved_proposal_status_updated",
            approval_id=approval_id,
            new_status=new_status,
            success=result.success,
        )
    except Exception as e:
        logger.error(
            "execute_approved_proposal_status_update_failed",
            approval_id=approval_id,
            error=str(e),
        )

    return result