feat(api): Phase 6.6 implement k8s execution engine with subprocess

ActionExecutor enhancements:
- Add execute_kubectl_command() using asyncio.create_subprocess_shell
- Security: Only kubectl commands allowed, forbidden patterns blocked
- Shadow Mode: Simulate execution without actual kubectl calls
- Capture stdout/stderr with PIPE, handle timeout gracefully

New execute_approved_proposal() function:
- Background task entry point for approved proposals
- Read approval from Redis/DB, verify status='approved'
- Extract kubectl_command from metadata
- Execute via execute_kubectl_command()
- Update status to 'executed' or 'failed' with execution_log

Security guardrails:
- Forbid delete namespace/ns, rm -rf, drop database
- Forbid batch deletion patterns
- 60 second default timeout

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-23 12:46:47 +08:00
parent 28fa8e6af4
commit eee4ab9b36

View File

@@ -561,6 +561,182 @@ class ActionExecutor:
error=error_msg,
)
# =========================================================================
# Raw Kubectl Execution (Phase 6.6)
# =========================================================================
async def execute_kubectl_command(
self,
command: str,
timeout_sec: int = 60,
) -> ExecutionResult:
"""
Phase 6.6: 使用 asyncio.create_subprocess_shell 執行原生 kubectl 指令
統帥鐵律:
- 所有指令必須包含 -n namespace
- 禁止毀滅性指令 (rm -rf, DROP, etc.)
- Shadow Mode 下僅記錄不執行
Args:
command: kubectl 指令 (e.g., "kubectl get pods -n awoooi-prod")
timeout_sec: 超時秒數
Returns:
ExecutionResult: 執行結果
"""
import shlex
start_time = time.monotonic()
# 安全檢查: 必須是 kubectl 指令
if not command.strip().startswith("kubectl"):
return ExecutionResult(
success=False,
message="Invalid command: must start with 'kubectl'",
operation_type=OperationType.RESTART_DEPLOYMENT, # Generic
target_resource=command[:50],
namespace="unknown",
duration_ms=0,
error="Security: Only kubectl commands allowed",
)
# 安全檢查: 禁止毀滅性指令
forbidden_patterns = [
"delete namespace", "delete ns",
"rm -rf", "drop database", "truncate",
"--all-namespaces -o wide | xargs", # 防止批次刪除
]
command_lower = command.lower()
for pattern in forbidden_patterns:
if pattern in command_lower:
return ExecutionResult(
success=False,
message=f"Forbidden command pattern: {pattern}",
operation_type=OperationType.DELETE_POD,
target_resource=command[:50],
namespace="unknown",
duration_ms=0,
error=f"Security: Forbidden pattern '{pattern}'",
)
# Shadow Mode 檢查
if settings.SHADOW_MODE_ENABLED:
duration_ms = int((time.monotonic() - start_time) * 1000)
logger.warning(
"shadow_mode_kubectl_intercept",
command=command[:100],
message="[SHADOW MODE] kubectl blocked - dry-run only",
)
return ExecutionResult(
success=True,
message=f"[SHADOW MODE] kubectl simulated: {command[:50]}...",
operation_type=OperationType.RESTART_DEPLOYMENT,
target_resource=command[:50],
namespace="shadow",
duration_ms=duration_ms,
k8s_response={
"shadow_mode": True,
"simulated_command": command,
},
)
# 執行 kubectl 指令
try:
logger.info(
"kubectl_execution_start",
command=command[:100],
)
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=timeout_sec,
)
except asyncio.TimeoutError:
process.kill()
await process.wait()
duration_ms = int((time.monotonic() - start_time) * 1000)
return ExecutionResult(
success=False,
message=f"kubectl timed out after {timeout_sec}s",
operation_type=OperationType.RESTART_DEPLOYMENT,
target_resource=command[:50],
namespace="unknown",
duration_ms=duration_ms,
error=f"Timeout after {timeout_sec}s",
)
duration_ms = int((time.monotonic() - start_time) * 1000)
stdout_str = stdout.decode("utf-8", errors="replace").strip()
stderr_str = stderr.decode("utf-8", errors="replace").strip()
if process.returncode == 0:
logger.info(
"kubectl_execution_success",
command=command[:100],
returncode=process.returncode,
duration_ms=duration_ms,
stdout_preview=stdout_str[:200],
)
return ExecutionResult(
success=True,
message=f"kubectl executed successfully",
operation_type=OperationType.RESTART_DEPLOYMENT,
target_resource=command[:50],
namespace="executed",
duration_ms=duration_ms,
k8s_response={
"returncode": process.returncode,
"stdout": stdout_str[:1000], # 限制長度
"stderr": stderr_str[:500] if stderr_str else None,
},
)
else:
logger.error(
"kubectl_execution_failed",
command=command[:100],
returncode=process.returncode,
stderr=stderr_str[:500],
)
return ExecutionResult(
success=False,
message=f"kubectl failed with code {process.returncode}",
operation_type=OperationType.RESTART_DEPLOYMENT,
target_resource=command[:50],
namespace="failed",
duration_ms=duration_ms,
error=stderr_str[:500] or f"Exit code: {process.returncode}",
k8s_response={
"returncode": process.returncode,
"stdout": stdout_str[:500],
"stderr": stderr_str[:500],
},
)
except Exception as e:
duration_ms = int((time.monotonic() - start_time) * 1000)
error_msg = str(e)
logger.exception(
"kubectl_execution_exception",
command=command[:100],
error=error_msg,
)
return ExecutionResult(
success=False,
message=f"kubectl execution error: {error_msg}",
operation_type=OperationType.RESTART_DEPLOYMENT,
target_resource=command[:50],
namespace="error",
duration_ms=duration_ms,
error=error_msg,
)
# =========================================================================
# High-Level Execution with Audit Log
# =========================================================================
@@ -739,3 +915,166 @@ async def close_executor() -> None:
if _executor is not None:
await _executor.close()
_executor = None
# =============================================================================
# Phase 6.6: Execute Approved Proposal (Background Task Entry Point)
# =============================================================================
async def execute_approved_proposal(approval_id: str) -> ExecutionResult:
"""
Phase 6.6: 執行已核准的 Proposal
此函數由 BackgroundTasks 呼叫,完整流程:
1. 從 Redis/DB 讀取 approval 資料
2. 驗證狀態為 approved
3. 提取 kubectl_command
4. 使用 asyncio.create_subprocess_shell 執行
5. 更新狀態為 executed 或 failed
Args:
approval_id: Approval ID (UUID string)
Returns:
ExecutionResult: 執行結果
"""
from src.services.multi_sig_redis import MultiSigRedisService
from src.services.approval_db import get_approval_service
logger.info(
"execute_approved_proposal_start",
approval_id=approval_id,
)
# Step 1: 讀取 approval 資料 (Redis 優先)
redis_service = MultiSigRedisService()
approval_data = await redis_service.get_approval(approval_id)
if not approval_data:
# Fallback to DB
db_service = get_approval_service()
approval_obj = await db_service.get_approval_by_id(approval_id)
if approval_obj:
approval_data = {
"id": str(approval_obj.id),
"status": approval_obj.status.value,
"action": approval_obj.action,
"metadata": approval_obj.metadata or {},
}
if not approval_data:
logger.error(
"execute_approved_proposal_not_found",
approval_id=approval_id,
)
return ExecutionResult(
success=False,
message=f"Approval {approval_id} not found",
operation_type=OperationType.RESTART_DEPLOYMENT,
target_resource=approval_id,
namespace="unknown",
duration_ms=0,
error="Approval not found",
)
# Step 2: 驗證狀態
status = approval_data.get("status", "").lower()
if status != "approved":
logger.warning(
"execute_approved_proposal_wrong_status",
approval_id=approval_id,
status=status,
)
return ExecutionResult(
success=False,
message=f"Cannot execute: status is '{status}', expected 'approved'",
operation_type=OperationType.RESTART_DEPLOYMENT,
target_resource=approval_id,
namespace="unknown",
duration_ms=0,
error=f"Invalid status: {status}",
)
# Step 3: 提取 kubectl_command
metadata = approval_data.get("metadata", {})
if isinstance(metadata, str):
import json
try:
metadata = json.loads(metadata)
except:
metadata = {}
kubectl_command = metadata.get("kubectl_command", "")
# 若無 kubectl_command嘗試從 action 解析
if not kubectl_command:
action = approval_data.get("action", "")
if "kubectl" in action.lower():
kubectl_command = action
else:
# 建構預設指令
namespace = approval_data.get("namespace", "awoooi-prod")
resource = approval_data.get("resource_name", "")
if "restart" in action.lower():
kubectl_command = f"kubectl rollout restart deployment/{resource} -n {namespace}"
elif "scale" in action.lower():
kubectl_command = f"kubectl get deployment/{resource} -n {namespace}"
else:
kubectl_command = f"kubectl get pods -n {namespace}"
if not kubectl_command:
logger.error(
"execute_approved_proposal_no_command",
approval_id=approval_id,
)
return ExecutionResult(
success=False,
message="No kubectl_command found in approval",
operation_type=OperationType.RESTART_DEPLOYMENT,
target_resource=approval_id,
namespace="unknown",
duration_ms=0,
error="Missing kubectl_command",
)
logger.info(
"execute_approved_proposal_command_extracted",
approval_id=approval_id,
kubectl_command=kubectl_command[:100],
)
# Step 4: 執行 kubectl 指令
executor = get_executor()
result = await executor.execute_kubectl_command(kubectl_command)
# Step 5: 更新狀態
new_status = "executed" if result.success else "failed"
execution_log = {
"executed_at": datetime.now(timezone.utc).isoformat(),
"success": result.success,
"stdout": result.k8s_response.get("stdout", "") if result.k8s_response else "",
"stderr": result.error or "",
"duration_ms": result.duration_ms,
}
try:
await redis_service.update_status(
approval_id=approval_id,
status=new_status,
executor_id="k8s_executor",
execution_result=execution_log,
)
logger.info(
"execute_approved_proposal_status_updated",
approval_id=approval_id,
new_status=new_status,
success=result.success,
)
except Exception as e:
logger.error(
"execute_approved_proposal_status_update_failed",
approval_id=approval_id,
error=str(e),
)
return result