diff --git a/apps/api/src/plugins/mcp/providers/k8s_provider.py b/apps/api/src/plugins/mcp/providers/k8s_provider.py index 2c43fa72..9cfb57b4 100644 --- a/apps/api/src/plugins/mcp/providers/k8s_provider.py +++ b/apps/api/src/plugins/mcp/providers/k8s_provider.py @@ -8,11 +8,25 @@ Kubernetes MCP Tool Provider - ADR-015 模組化架構 - kubectl_scale: 調整副本數 - kubectl_restart: 重啟 Deployment +MCP Phase 1 新增 (2026-04-11 Claude Sonnet 4.6 — Sprint B 後驗收): +- k8s_get_pod_logs: 取得 Pod 最後 N 行 log(取代 log_summary 手動呼叫) +- k8s_watch_rollout: 監控 rollout 狀態直到完成(真正的驗證,非 sleep 猜) +- k8s_get_events: 取得 namespace/resource 的 K8s events +- k8s_describe_pod: 完整 Pod describe(含 Conditions、Volumes、Env) +- k8s_get_hpa_status: 取得 HPA 當前副本數/上限/下限 +- k8s_get_node_conditions: 取得 Node conditions(Ready/MemoryPressure/DiskPressure) + +安全守衛(Phase 1 強化): +- namespace 限 awoooi-prod(硬編碼白名單) +- 操作類工具需 trust_score >= 0.7 + 透過 DI 注入 ActionExecutor,不直接 import services。 @see docs/adr/ADR-015-mcp-modular-architecture.md +@see docs/superpowers/specs/2026-04-10-infra-rebuild-sprint-abc-design.md §MCP Phase 1 """ +import re import uuid from typing import Any @@ -22,6 +36,26 @@ from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult logger = structlog.get_logger(__name__) +# namespace 白名單(硬編碼,防止跨命名空間操作) +ALLOWED_NAMESPACES = {"awoooi-prod"} +DEFAULT_NAMESPACE = "awoooi-prod" + +# 資源名稱白名單正則(防止 injection) +_RE_SAFE_K8S_NAME = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9._-]{0,252}$') +_RE_SAFE_RESOURCE = re.compile(r'^[a-zA-Z0-9/-]{1,64}$') + + +def _validate_namespace(ns: str) -> str: + if ns not in ALLOWED_NAMESPACES: + raise ValueError(f"Namespace '{ns}' not allowed (whitelist: {ALLOWED_NAMESPACES})") + return ns + + +def _validate_name(name: str, field: str = "name") -> str: + if not name or not _RE_SAFE_K8S_NAME.match(name): + raise ValueError(f"Unsafe {field}: {name!r}") + return name + class K8sProvider(MCPToolProvider): """ @@ -102,6 +136,105 @@ class K8sProvider(MCPToolProvider): }, server_name=self.name, ), + # ── MCP Phase 1 新增工具 ────────────────────────────────────────── + MCPTool( + name="k8s_get_pod_logs", + description=( + "Get last N lines of logs from a Pod container. " + "Use for PodRestartingTooMuch, CrashLoopBackOff diagnosis. Read-only." + ), + input_schema={ + "type": "object", + "properties": { + "pod_name": {"type": "string", "description": "Pod name (e.g. awoooi-api-xxx-yyy)"}, + "container": {"type": "string", "description": "Container name (optional, uses first if omitted)"}, + "namespace": {"type": "string", "description": "Namespace (default: awoooi-prod)"}, + "tail": {"type": "integer", "description": "Number of lines (default: 100, max: 500)"}, + "previous": {"type": "boolean", "description": "Get logs from previous (crashed) container"}, + }, + "required": ["pod_name"], + }, + server_name=self.name, + ), + MCPTool( + name="k8s_watch_rollout", + description=( + "Watch a Deployment rollout until complete or timeout. " + "Use after kubectl_restart to verify success instead of sleeping." + ), + input_schema={ + "type": "object", + "properties": { + "deployment": {"type": "string", "description": "Deployment name"}, + "namespace": {"type": "string", "description": "Namespace (default: awoooi-prod)"}, + "timeout_seconds": {"type": "integer", "description": "Timeout in seconds (default: 120, max: 300)"}, + }, + "required": ["deployment"], + }, + server_name=self.name, + ), + MCPTool( + name="k8s_get_events", + description=( + "Get Kubernetes events for a namespace or specific resource. " + "Useful for diagnosing scheduling failures, image pull errors, OOMKilled." + ), + input_schema={ + "type": "object", + "properties": { + "namespace": {"type": "string", "description": "Namespace (default: awoooi-prod)"}, + "resource_name": {"type": "string", "description": "Filter by resource name (optional)"}, + "event_type": {"type": "string", "description": "Filter: Warning or Normal (optional)"}, + }, + "required": [], + }, + server_name=self.name, + ), + MCPTool( + name="k8s_describe_pod", + description=( + "Full kubectl describe for a Pod: Conditions, Events, Volumes, Env, Resource limits. " + "Use when get_pod_logs is insufficient for diagnosis." + ), + input_schema={ + "type": "object", + "properties": { + "pod_name": {"type": "string", "description": "Pod name"}, + "namespace": {"type": "string", "description": "Namespace (default: awoooi-prod)"}, + }, + "required": ["pod_name"], + }, + server_name=self.name, + ), + MCPTool( + name="k8s_get_hpa_status", + description=( + "Get HPA (HorizontalPodAutoscaler) current/min/max replicas and CPU utilization. " + "Use for capacity planning and scaling diagnosis." + ), + input_schema={ + "type": "object", + "properties": { + "namespace": {"type": "string", "description": "Namespace (default: awoooi-prod)"}, + "name": {"type": "string", "description": "HPA name (optional, lists all if omitted)"}, + }, + "required": [], + }, + server_name=self.name, + ), + MCPTool( + name="k8s_get_node_conditions", + description=( + "Get all Node conditions: Ready, MemoryPressure, DiskPressure, PIDPressure. " + "Use for cluster-wide health diagnosis." + ), + input_schema={ + "type": "object", + "properties": {}, + "required": [], + }, + server_name=self.name, + ), ] async def execute( @@ -121,6 +254,18 @@ class K8sProvider(MCPToolProvider): output = await self._kubectl_scale(executor, parameters) elif tool_name == "kubectl_restart": output = await self._kubectl_restart(executor, parameters) + elif tool_name == "k8s_get_pod_logs": + output = await self._k8s_get_pod_logs(executor, parameters) + elif tool_name == "k8s_watch_rollout": + output = await self._k8s_watch_rollout(executor, parameters) + elif tool_name == "k8s_get_events": + output = await self._k8s_get_events(executor, parameters) + elif tool_name == "k8s_describe_pod": + output = await self._k8s_describe_pod(executor, parameters) + elif tool_name == "k8s_get_hpa_status": + output = await self._k8s_get_hpa_status(executor, parameters) + elif tool_name == "k8s_get_node_conditions": + output = await self._k8s_get_node_conditions(executor, parameters) else: return MCPToolResult( success=False, @@ -221,6 +366,113 @@ class K8sProvider(MCPToolProvider): "duration_ms": result.duration_ms, } + # ========================================================================= + # MCP Phase 1 工具實作(2026-04-11 Claude Sonnet 4.6) + # ========================================================================= + + async def _k8s_get_pod_logs(self, executor, parameters: dict) -> dict: + pod_name = _validate_name(parameters["pod_name"], "pod_name") + namespace = _validate_namespace(parameters.get("namespace", DEFAULT_NAMESPACE)) + container = parameters.get("container", "") + tail = max(1, min(int(parameters.get("tail", 100)), 500)) + previous = parameters.get("previous", False) + + container_flag = f"-c {_validate_name(container, 'container')}" if container else "" + previous_flag = "--previous" if previous else "" + cmd = f"kubectl logs {pod_name} {container_flag} {previous_flag} --tail={tail} -n {namespace}".strip() + + result = await executor.execute_kubectl_command(cmd) + return { + "pod": pod_name, + "namespace": namespace, + "tail": tail, + "previous": previous, + "logs": result.k8s_response.get("stdout", "") if result.success and result.k8s_response else "", + "error": result.error if not result.success else None, + } + + async def _k8s_watch_rollout(self, executor, parameters: dict) -> dict: + deployment = _validate_name(parameters["deployment"], "deployment") + namespace = _validate_namespace(parameters.get("namespace", DEFAULT_NAMESPACE)) + timeout = max(10, min(int(parameters.get("timeout_seconds", 120)), 300)) + + cmd = f"kubectl rollout status deployment/{deployment} -n {namespace} --timeout={timeout}s" + result = await executor.execute_kubectl_command(cmd) + + stdout = result.k8s_response.get("stdout", "") if result.success and result.k8s_response else "" + return { + "deployment": deployment, + "namespace": namespace, + "success": result.success, + "status": stdout.strip() or result.error, + } + + async def _k8s_get_events(self, executor, parameters: dict) -> dict: + namespace = _validate_namespace(parameters.get("namespace", DEFAULT_NAMESPACE)) + resource_name = parameters.get("resource_name", "") + event_type = parameters.get("event_type", "") + + # 白名單校驗可選參數 + if resource_name: + resource_name = _validate_name(resource_name, "resource_name") + if event_type and event_type not in ("Warning", "Normal"): + return {"error": "event_type must be 'Warning' or 'Normal'"} + + field_selector_parts = [] + if resource_name: + field_selector_parts.append(f"involvedObject.name={resource_name}") + if event_type: + field_selector_parts.append(f"type={event_type}") + + field_selector = f"--field-selector={','.join(field_selector_parts)}" if field_selector_parts else "" + cmd = f"kubectl get events -n {namespace} {field_selector} --sort-by='.lastTimestamp' -o json".strip() + + result = await executor.execute_kubectl_command(cmd) + return { + "namespace": namespace, + "resource_name": resource_name or None, + "event_type": event_type or None, + "events": result.k8s_response.get("stdout", "") if result.success and result.k8s_response else "", + "error": result.error if not result.success else None, + } + + async def _k8s_describe_pod(self, executor, parameters: dict) -> dict: + pod_name = _validate_name(parameters["pod_name"], "pod_name") + namespace = _validate_namespace(parameters.get("namespace", DEFAULT_NAMESPACE)) + + cmd = f"kubectl describe pod {pod_name} -n {namespace}" + result = await executor.execute_kubectl_command(cmd) + return { + "pod": pod_name, + "namespace": namespace, + "description": result.k8s_response.get("stdout", "") if result.success and result.k8s_response else "", + "error": result.error if not result.success else None, + } + + async def _k8s_get_hpa_status(self, executor, parameters: dict) -> dict: + namespace = _validate_namespace(parameters.get("namespace", DEFAULT_NAMESPACE)) + name = parameters.get("name", "") + if name: + name = _validate_name(name, "name") + + target = name if name else "" + cmd = f"kubectl get hpa {target} -n {namespace} -o json".strip() + result = await executor.execute_kubectl_command(cmd) + return { + "namespace": namespace, + "hpa": result.k8s_response.get("stdout", "") if result.success and result.k8s_response else "", + "error": result.error if not result.success else None, + } + + async def _k8s_get_node_conditions(self, executor, _parameters: dict) -> dict: + # Node 操作不需要 namespace,直接查 cluster 層 + cmd = "kubectl get nodes -o json" + result = await executor.execute_kubectl_command(cmd) + return { + "nodes": result.k8s_response.get("stdout", "") if result.success and result.k8s_response else "", + "error": result.error if not result.success else None, + } + async def health_check(self) -> bool: """Check if kubectl is accessible""" try: