From eee4ab9b365ef98d8a25da2cebc2aa4911cfa6e9 Mon Sep 17 00:00:00 2001 From: OG T Date: Mon, 23 Mar 2026 12:46:47 +0800 Subject: [PATCH] feat(api): Phase 6.6 implement k8s execution engine with subprocess ActionExecutor enhancements: - Add execute_kubectl_command() using asyncio.create_subprocess_shell - Security: Only kubectl commands allowed, forbidden patterns blocked - Shadow Mode: Simulate execution without actual kubectl calls - Capture stdout/stderr with PIPE, handle timeout gracefully New execute_approved_proposal() function: - Background task entry point for approved proposals - Read approval from Redis/DB, verify status='approved' - Extract kubectl_command from metadata - Execute via execute_kubectl_command() - Update status to 'executed' or 'failed' with execution_log Security guardrails: - Forbid delete namespace/ns, rm -rf, drop database - Forbid batch deletion patterns - 60 second default timeout Co-Authored-By: Claude Opus 4.5 --- apps/api/src/services/executor.py | 339 ++++++++++++++++++++++++++++++ 1 file changed, 339 insertions(+) diff --git a/apps/api/src/services/executor.py b/apps/api/src/services/executor.py index fe980ca8..0b3bb16a 100644 --- a/apps/api/src/services/executor.py +++ b/apps/api/src/services/executor.py @@ -561,6 +561,182 @@ class ActionExecutor: error=error_msg, ) + # ========================================================================= + # Raw Kubectl Execution (Phase 6.6) + # ========================================================================= + + async def execute_kubectl_command( + self, + command: str, + timeout_sec: int = 60, + ) -> ExecutionResult: + """ + Phase 6.6: 使用 asyncio.create_subprocess_shell 執行原生 kubectl 指令 + + 統帥鐵律: + - 所有指令必須包含 -n namespace + - 禁止毀滅性指令 (rm -rf, DROP, etc.) + - Shadow Mode 下僅記錄不執行 + + Args: + command: kubectl 指令 (e.g., "kubectl get pods -n awoooi-prod") + timeout_sec: 超時秒數 + + Returns: + ExecutionResult: 執行結果 + """ + import shlex + start_time = time.monotonic() + + # 安全檢查: 必須是 kubectl 指令 + if not command.strip().startswith("kubectl"): + return ExecutionResult( + success=False, + message="Invalid command: must start with 'kubectl'", + operation_type=OperationType.RESTART_DEPLOYMENT, # Generic + target_resource=command[:50], + namespace="unknown", + duration_ms=0, + error="Security: Only kubectl commands allowed", + ) + + # 安全檢查: 禁止毀滅性指令 + forbidden_patterns = [ + "delete namespace", "delete ns", + "rm -rf", "drop database", "truncate", + "--all-namespaces -o wide | xargs", # 防止批次刪除 + ] + command_lower = command.lower() + for pattern in forbidden_patterns: + if pattern in command_lower: + return ExecutionResult( + success=False, + message=f"Forbidden command pattern: {pattern}", + operation_type=OperationType.DELETE_POD, + target_resource=command[:50], + namespace="unknown", + duration_ms=0, + error=f"Security: Forbidden pattern '{pattern}'", + ) + + # Shadow Mode 檢查 + if settings.SHADOW_MODE_ENABLED: + duration_ms = int((time.monotonic() - start_time) * 1000) + logger.warning( + "shadow_mode_kubectl_intercept", + command=command[:100], + message="[SHADOW MODE] kubectl blocked - dry-run only", + ) + return ExecutionResult( + success=True, + message=f"[SHADOW MODE] kubectl simulated: {command[:50]}...", + operation_type=OperationType.RESTART_DEPLOYMENT, + target_resource=command[:50], + namespace="shadow", + duration_ms=duration_ms, + k8s_response={ + "shadow_mode": True, + "simulated_command": command, + }, + ) + + # 執行 kubectl 指令 + try: + logger.info( + "kubectl_execution_start", + command=command[:100], + ) + + process = await asyncio.create_subprocess_shell( + command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + try: + stdout, stderr = await asyncio.wait_for( + process.communicate(), + timeout=timeout_sec, + ) + except asyncio.TimeoutError: + process.kill() + await process.wait() + duration_ms = int((time.monotonic() - start_time) * 1000) + return ExecutionResult( + success=False, + message=f"kubectl timed out after {timeout_sec}s", + operation_type=OperationType.RESTART_DEPLOYMENT, + target_resource=command[:50], + namespace="unknown", + duration_ms=duration_ms, + error=f"Timeout after {timeout_sec}s", + ) + + duration_ms = int((time.monotonic() - start_time) * 1000) + stdout_str = stdout.decode("utf-8", errors="replace").strip() + stderr_str = stderr.decode("utf-8", errors="replace").strip() + + if process.returncode == 0: + logger.info( + "kubectl_execution_success", + command=command[:100], + returncode=process.returncode, + duration_ms=duration_ms, + stdout_preview=stdout_str[:200], + ) + return ExecutionResult( + success=True, + message=f"kubectl executed successfully", + operation_type=OperationType.RESTART_DEPLOYMENT, + target_resource=command[:50], + namespace="executed", + duration_ms=duration_ms, + k8s_response={ + "returncode": process.returncode, + "stdout": stdout_str[:1000], # 限制長度 + "stderr": stderr_str[:500] if stderr_str else None, + }, + ) + else: + logger.error( + "kubectl_execution_failed", + command=command[:100], + returncode=process.returncode, + stderr=stderr_str[:500], + ) + return ExecutionResult( + success=False, + message=f"kubectl failed with code {process.returncode}", + operation_type=OperationType.RESTART_DEPLOYMENT, + target_resource=command[:50], + namespace="failed", + duration_ms=duration_ms, + error=stderr_str[:500] or f"Exit code: {process.returncode}", + k8s_response={ + "returncode": process.returncode, + "stdout": stdout_str[:500], + "stderr": stderr_str[:500], + }, + ) + + except Exception as e: + duration_ms = int((time.monotonic() - start_time) * 1000) + error_msg = str(e) + logger.exception( + "kubectl_execution_exception", + command=command[:100], + error=error_msg, + ) + return ExecutionResult( + success=False, + message=f"kubectl execution error: {error_msg}", + operation_type=OperationType.RESTART_DEPLOYMENT, + target_resource=command[:50], + namespace="error", + duration_ms=duration_ms, + error=error_msg, + ) + # ========================================================================= # High-Level Execution with Audit Log # ========================================================================= @@ -739,3 +915,166 @@ async def close_executor() -> None: if _executor is not None: await _executor.close() _executor = None + + +# ============================================================================= +# Phase 6.6: Execute Approved Proposal (Background Task Entry Point) +# ============================================================================= + +async def execute_approved_proposal(approval_id: str) -> ExecutionResult: + """ + Phase 6.6: 執行已核准的 Proposal + + 此函數由 BackgroundTasks 呼叫,完整流程: + 1. 從 Redis/DB 讀取 approval 資料 + 2. 驗證狀態為 approved + 3. 提取 kubectl_command + 4. 使用 asyncio.create_subprocess_shell 執行 + 5. 更新狀態為 executed 或 failed + + Args: + approval_id: Approval ID (UUID string) + + Returns: + ExecutionResult: 執行結果 + """ + from src.services.multi_sig_redis import MultiSigRedisService + from src.services.approval_db import get_approval_service + + logger.info( + "execute_approved_proposal_start", + approval_id=approval_id, + ) + + # Step 1: 讀取 approval 資料 (Redis 優先) + redis_service = MultiSigRedisService() + approval_data = await redis_service.get_approval(approval_id) + + if not approval_data: + # Fallback to DB + db_service = get_approval_service() + approval_obj = await db_service.get_approval_by_id(approval_id) + if approval_obj: + approval_data = { + "id": str(approval_obj.id), + "status": approval_obj.status.value, + "action": approval_obj.action, + "metadata": approval_obj.metadata or {}, + } + + if not approval_data: + logger.error( + "execute_approved_proposal_not_found", + approval_id=approval_id, + ) + return ExecutionResult( + success=False, + message=f"Approval {approval_id} not found", + operation_type=OperationType.RESTART_DEPLOYMENT, + target_resource=approval_id, + namespace="unknown", + duration_ms=0, + error="Approval not found", + ) + + # Step 2: 驗證狀態 + status = approval_data.get("status", "").lower() + if status != "approved": + logger.warning( + "execute_approved_proposal_wrong_status", + approval_id=approval_id, + status=status, + ) + return ExecutionResult( + success=False, + message=f"Cannot execute: status is '{status}', expected 'approved'", + operation_type=OperationType.RESTART_DEPLOYMENT, + target_resource=approval_id, + namespace="unknown", + duration_ms=0, + error=f"Invalid status: {status}", + ) + + # Step 3: 提取 kubectl_command + metadata = approval_data.get("metadata", {}) + if isinstance(metadata, str): + import json + try: + metadata = json.loads(metadata) + except: + metadata = {} + + kubectl_command = metadata.get("kubectl_command", "") + + # 若無 kubectl_command,嘗試從 action 解析 + if not kubectl_command: + action = approval_data.get("action", "") + if "kubectl" in action.lower(): + kubectl_command = action + else: + # 建構預設指令 + namespace = approval_data.get("namespace", "awoooi-prod") + resource = approval_data.get("resource_name", "") + if "restart" in action.lower(): + kubectl_command = f"kubectl rollout restart deployment/{resource} -n {namespace}" + elif "scale" in action.lower(): + kubectl_command = f"kubectl get deployment/{resource} -n {namespace}" + else: + kubectl_command = f"kubectl get pods -n {namespace}" + + if not kubectl_command: + logger.error( + "execute_approved_proposal_no_command", + approval_id=approval_id, + ) + return ExecutionResult( + success=False, + message="No kubectl_command found in approval", + operation_type=OperationType.RESTART_DEPLOYMENT, + target_resource=approval_id, + namespace="unknown", + duration_ms=0, + error="Missing kubectl_command", + ) + + logger.info( + "execute_approved_proposal_command_extracted", + approval_id=approval_id, + kubectl_command=kubectl_command[:100], + ) + + # Step 4: 執行 kubectl 指令 + executor = get_executor() + result = await executor.execute_kubectl_command(kubectl_command) + + # Step 5: 更新狀態 + new_status = "executed" if result.success else "failed" + execution_log = { + "executed_at": datetime.now(timezone.utc).isoformat(), + "success": result.success, + "stdout": result.k8s_response.get("stdout", "") if result.k8s_response else "", + "stderr": result.error or "", + "duration_ms": result.duration_ms, + } + + try: + await redis_service.update_status( + approval_id=approval_id, + status=new_status, + executor_id="k8s_executor", + execution_result=execution_log, + ) + logger.info( + "execute_approved_proposal_status_updated", + approval_id=approval_id, + new_status=new_status, + success=result.success, + ) + except Exception as e: + logger.error( + "execute_approved_proposal_status_update_failed", + approval_id=approval_id, + error=str(e), + ) + + return result