feat(mcp-phase1): K8s MCP 強化 — 6 個新工具 + namespace 白名單

MCP Phase 1 (ADR-069 Sprint B 後驗收):
  k8s_get_pod_logs    — Pod log 取得 (tail 1-500,支援 previous)
  k8s_watch_rollout   — rollout 狀態監控直到完成 (timeout 10-300s)
  k8s_get_events      — K8s events (可過濾 resource_name / event_type)
  k8s_describe_pod    — 完整 Pod describe (Conditions/Volumes/Env)
  k8s_get_hpa_status  — HPA 副本數/CPU utilization
  k8s_get_node_conditions — Node Ready/MemoryPressure/DiskPressure

安全強化:
  - ALLOWED_NAMESPACES = {"awoooi-prod"} 硬編碼白名單
  - _validate_namespace() + _validate_name() 參數白名單
  - 數值參數上下限夾緊 (tail 1-500, timeout 10-300s)
  - event_type 只允許 Warning / Normal

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-11 03:01:38 +08:00
parent b783f71b97
commit a29e5e1de2

View File

@@ -8,11 +8,25 @@ Kubernetes MCP Tool Provider - ADR-015 模組化架構
- kubectl_scale: 調整副本數
- kubectl_restart: 重啟 Deployment
MCP Phase 1 新增 (2026-04-11 Claude Sonnet 4.6 — Sprint B 後驗收):
- k8s_get_pod_logs: 取得 Pod 最後 N 行 log取代 log_summary 手動呼叫)
- k8s_watch_rollout: 監控 rollout 狀態直到完成(真正的驗證,非 sleep 猜)
- k8s_get_events: 取得 namespace/resource 的 K8s events
- k8s_describe_pod: 完整 Pod describe含 Conditions、Volumes、Env
- k8s_get_hpa_status: 取得 HPA 當前副本數/上限/下限
- k8s_get_node_conditions: 取得 Node conditionsReady/MemoryPressure/DiskPressure
安全守衛Phase 1 強化):
- namespace 限 awoooi-prod硬編碼白名單
- 操作類工具需 trust_score >= 0.7
透過 DI 注入 ActionExecutor不直接 import services。
@see docs/adr/ADR-015-mcp-modular-architecture.md
@see docs/superpowers/specs/2026-04-10-infra-rebuild-sprint-abc-design.md §MCP Phase 1
"""
import re
import uuid
from typing import Any
@@ -22,6 +36,26 @@ from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult
logger = structlog.get_logger(__name__)
# namespace 白名單(硬編碼,防止跨命名空間操作)
ALLOWED_NAMESPACES = {"awoooi-prod"}
DEFAULT_NAMESPACE = "awoooi-prod"
# 資源名稱白名單正則(防止 injection
_RE_SAFE_K8S_NAME = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9._-]{0,252}$')
_RE_SAFE_RESOURCE = re.compile(r'^[a-zA-Z0-9/-]{1,64}$')
def _validate_namespace(ns: str) -> str:
if ns not in ALLOWED_NAMESPACES:
raise ValueError(f"Namespace '{ns}' not allowed (whitelist: {ALLOWED_NAMESPACES})")
return ns
def _validate_name(name: str, field: str = "name") -> str:
if not name or not _RE_SAFE_K8S_NAME.match(name):
raise ValueError(f"Unsafe {field}: {name!r}")
return name
class K8sProvider(MCPToolProvider):
"""
@@ -102,6 +136,105 @@ class K8sProvider(MCPToolProvider):
},
server_name=self.name,
),
# ── MCP Phase 1 新增工具 ──────────────────────────────────────────
MCPTool(
name="k8s_get_pod_logs",
description=(
"Get last N lines of logs from a Pod container. "
"Use for PodRestartingTooMuch, CrashLoopBackOff diagnosis. Read-only."
),
input_schema={
"type": "object",
"properties": {
"pod_name": {"type": "string", "description": "Pod name (e.g. awoooi-api-xxx-yyy)"},
"container": {"type": "string", "description": "Container name (optional, uses first if omitted)"},
"namespace": {"type": "string", "description": "Namespace (default: awoooi-prod)"},
"tail": {"type": "integer", "description": "Number of lines (default: 100, max: 500)"},
"previous": {"type": "boolean", "description": "Get logs from previous (crashed) container"},
},
"required": ["pod_name"],
},
server_name=self.name,
),
MCPTool(
name="k8s_watch_rollout",
description=(
"Watch a Deployment rollout until complete or timeout. "
"Use after kubectl_restart to verify success instead of sleeping."
),
input_schema={
"type": "object",
"properties": {
"deployment": {"type": "string", "description": "Deployment name"},
"namespace": {"type": "string", "description": "Namespace (default: awoooi-prod)"},
"timeout_seconds": {"type": "integer", "description": "Timeout in seconds (default: 120, max: 300)"},
},
"required": ["deployment"],
},
server_name=self.name,
),
MCPTool(
name="k8s_get_events",
description=(
"Get Kubernetes events for a namespace or specific resource. "
"Useful for diagnosing scheduling failures, image pull errors, OOMKilled."
),
input_schema={
"type": "object",
"properties": {
"namespace": {"type": "string", "description": "Namespace (default: awoooi-prod)"},
"resource_name": {"type": "string", "description": "Filter by resource name (optional)"},
"event_type": {"type": "string", "description": "Filter: Warning or Normal (optional)"},
},
"required": [],
},
server_name=self.name,
),
MCPTool(
name="k8s_describe_pod",
description=(
"Full kubectl describe for a Pod: Conditions, Events, Volumes, Env, Resource limits. "
"Use when get_pod_logs is insufficient for diagnosis."
),
input_schema={
"type": "object",
"properties": {
"pod_name": {"type": "string", "description": "Pod name"},
"namespace": {"type": "string", "description": "Namespace (default: awoooi-prod)"},
},
"required": ["pod_name"],
},
server_name=self.name,
),
MCPTool(
name="k8s_get_hpa_status",
description=(
"Get HPA (HorizontalPodAutoscaler) current/min/max replicas and CPU utilization. "
"Use for capacity planning and scaling diagnosis."
),
input_schema={
"type": "object",
"properties": {
"namespace": {"type": "string", "description": "Namespace (default: awoooi-prod)"},
"name": {"type": "string", "description": "HPA name (optional, lists all if omitted)"},
},
"required": [],
},
server_name=self.name,
),
MCPTool(
name="k8s_get_node_conditions",
description=(
"Get all Node conditions: Ready, MemoryPressure, DiskPressure, PIDPressure. "
"Use for cluster-wide health diagnosis."
),
input_schema={
"type": "object",
"properties": {},
"required": [],
},
server_name=self.name,
),
]
async def execute(
@@ -121,6 +254,18 @@ class K8sProvider(MCPToolProvider):
output = await self._kubectl_scale(executor, parameters)
elif tool_name == "kubectl_restart":
output = await self._kubectl_restart(executor, parameters)
elif tool_name == "k8s_get_pod_logs":
output = await self._k8s_get_pod_logs(executor, parameters)
elif tool_name == "k8s_watch_rollout":
output = await self._k8s_watch_rollout(executor, parameters)
elif tool_name == "k8s_get_events":
output = await self._k8s_get_events(executor, parameters)
elif tool_name == "k8s_describe_pod":
output = await self._k8s_describe_pod(executor, parameters)
elif tool_name == "k8s_get_hpa_status":
output = await self._k8s_get_hpa_status(executor, parameters)
elif tool_name == "k8s_get_node_conditions":
output = await self._k8s_get_node_conditions(executor, parameters)
else:
return MCPToolResult(
success=False,
@@ -221,6 +366,113 @@ class K8sProvider(MCPToolProvider):
"duration_ms": result.duration_ms,
}
# =========================================================================
# MCP Phase 1 工具實作2026-04-11 Claude Sonnet 4.6
# =========================================================================
async def _k8s_get_pod_logs(self, executor, parameters: dict) -> dict:
pod_name = _validate_name(parameters["pod_name"], "pod_name")
namespace = _validate_namespace(parameters.get("namespace", DEFAULT_NAMESPACE))
container = parameters.get("container", "")
tail = max(1, min(int(parameters.get("tail", 100)), 500))
previous = parameters.get("previous", False)
container_flag = f"-c {_validate_name(container, 'container')}" if container else ""
previous_flag = "--previous" if previous else ""
cmd = f"kubectl logs {pod_name} {container_flag} {previous_flag} --tail={tail} -n {namespace}".strip()
result = await executor.execute_kubectl_command(cmd)
return {
"pod": pod_name,
"namespace": namespace,
"tail": tail,
"previous": previous,
"logs": result.k8s_response.get("stdout", "") if result.success and result.k8s_response else "",
"error": result.error if not result.success else None,
}
async def _k8s_watch_rollout(self, executor, parameters: dict) -> dict:
deployment = _validate_name(parameters["deployment"], "deployment")
namespace = _validate_namespace(parameters.get("namespace", DEFAULT_NAMESPACE))
timeout = max(10, min(int(parameters.get("timeout_seconds", 120)), 300))
cmd = f"kubectl rollout status deployment/{deployment} -n {namespace} --timeout={timeout}s"
result = await executor.execute_kubectl_command(cmd)
stdout = result.k8s_response.get("stdout", "") if result.success and result.k8s_response else ""
return {
"deployment": deployment,
"namespace": namespace,
"success": result.success,
"status": stdout.strip() or result.error,
}
async def _k8s_get_events(self, executor, parameters: dict) -> dict:
namespace = _validate_namespace(parameters.get("namespace", DEFAULT_NAMESPACE))
resource_name = parameters.get("resource_name", "")
event_type = parameters.get("event_type", "")
# 白名單校驗可選參數
if resource_name:
resource_name = _validate_name(resource_name, "resource_name")
if event_type and event_type not in ("Warning", "Normal"):
return {"error": "event_type must be 'Warning' or 'Normal'"}
field_selector_parts = []
if resource_name:
field_selector_parts.append(f"involvedObject.name={resource_name}")
if event_type:
field_selector_parts.append(f"type={event_type}")
field_selector = f"--field-selector={','.join(field_selector_parts)}" if field_selector_parts else ""
cmd = f"kubectl get events -n {namespace} {field_selector} --sort-by='.lastTimestamp' -o json".strip()
result = await executor.execute_kubectl_command(cmd)
return {
"namespace": namespace,
"resource_name": resource_name or None,
"event_type": event_type or None,
"events": result.k8s_response.get("stdout", "") if result.success and result.k8s_response else "",
"error": result.error if not result.success else None,
}
async def _k8s_describe_pod(self, executor, parameters: dict) -> dict:
pod_name = _validate_name(parameters["pod_name"], "pod_name")
namespace = _validate_namespace(parameters.get("namespace", DEFAULT_NAMESPACE))
cmd = f"kubectl describe pod {pod_name} -n {namespace}"
result = await executor.execute_kubectl_command(cmd)
return {
"pod": pod_name,
"namespace": namespace,
"description": result.k8s_response.get("stdout", "") if result.success and result.k8s_response else "",
"error": result.error if not result.success else None,
}
async def _k8s_get_hpa_status(self, executor, parameters: dict) -> dict:
namespace = _validate_namespace(parameters.get("namespace", DEFAULT_NAMESPACE))
name = parameters.get("name", "")
if name:
name = _validate_name(name, "name")
target = name if name else ""
cmd = f"kubectl get hpa {target} -n {namespace} -o json".strip()
result = await executor.execute_kubectl_command(cmd)
return {
"namespace": namespace,
"hpa": result.k8s_response.get("stdout", "") if result.success and result.k8s_response else "",
"error": result.error if not result.success else None,
}
async def _k8s_get_node_conditions(self, executor, _parameters: dict) -> dict:
# Node 操作不需要 namespace直接查 cluster 層
cmd = "kubectl get nodes -o json"
result = await executor.execute_kubectl_command(cmd)
return {
"nodes": result.k8s_response.get("stdout", "") if result.success and result.k8s_response else "",
"error": result.error if not result.success else None,
}
async def health_check(self) -> bool:
"""Check if kubectl is accessible"""
try: