Files
awoooi/apps/api/src/plugins/mcp/providers/ssh_provider.py
Your Name 813d088339
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
run-migration / migrate (push) Successful in 9s
CD Pipeline / tests (push) Successful in 1m11s
CD Pipeline / build-and-deploy (push) Successful in 3m17s
CD Pipeline / post-deploy-checks (push) Successful in 1m16s
feat(auto-repair): route ssh diagnostics through mcp gateway
2026-05-14 21:13:05 +08:00

705 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
SSH MCP Tool Provider — MCP Phase 2a
======================================
Host-level operations proxy — fills in the 70% of host-level alerts that K8s cannot cover.

Tool groups:
  Group A (10 read-only diagnostic tools, no trust score required)
    ssh_diagnose               — generic read-only host/backup diagnosis
    ssh_get_top_processes      — HostHighCpuLoad / HostOutOfMemory
    ssh_get_disk_usage         — HostOutOfDiskSpace
    ssh_get_memory_info        — HostOutOfMemory
    ssh_get_container_logs     — DockerContainerExited / HarborDown
    ssh_get_container_status   — all DockerContainer* alerts
    ssh_get_service_status     — OllamaDown / KaliScannerDown
    ssh_check_port             — confirm a service is down
    ssh_get_nginx_error_log    — site down / TLS problems
    ssh_get_swap_info          — HostOutOfMemory
  Group B (7 safe-operation tools, require trust_score >= 0.8)
    ssh_docker_restart         — DockerContainerExited / HarborDown
    ssh_docker_compose_restart — SentryDown / SignOzDown / GiteaDown
    ssh_systemctl_restart      — OllamaDown / KaliScannerDown
    ssh_clear_docker_logs      — HostOutOfDiskSpace (space taken by logs)
    ssh_renew_ssl              — TLSCertExpiringIn7Days
    ssh_reload_nginx           — TLSProbeFailure / after a conf update
    ssh_docker_prune           — disk-gated docker image/volume/builder prune

Four-layer security guard (every layer is mandatory):
  1. tool_name must be on the whitelist
  2. host must be in SSH_MCP_ALLOWED_HOSTS
  3. every parameter must pass the FORBIDDEN_PATTERNS regex review
  4. Group B tools require trust_score >= 0.8

SSH connection:
  Authentication: private key (read from /run/secrets/ssh_mcp_key)
  Connection library: asyncssh (pure Python)
  Strangler switch: SSH_MCP_ENABLED env var

Created: 2026-04-11 (Taipei time zone)
Author: Claude Sonnet 4.6 — MCP Phase 2a
@see docs/superpowers/specs/2026-04-10-infra-rebuild-sprint-abc-design.md §MCP-2a
"""
import logging
import re
import uuid
from datetime import UTC, datetime
from typing import Any
import structlog
from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult
logger = structlog.get_logger(__name__)
# Flipped to True by _quiet_asyncssh_info_logs() so the logger is only configured once.
_asyncssh_logger_configured = False
# =============================================================================
# Security constants
# =============================================================================
SSH_KEY_PATH = "/run/secrets/ssh_mcp_key"
SSH_USER = "wooo"
SSH_PORT = 22
DEFAULT_HOST_USERS = {
    # AI/Web host is operated by the ollama account in the current topology.
    "192.168.0.188": "ollama",
}
SHORT_HOST_MAP = {
    "110": "192.168.0.110",
    "120": "192.168.0.120",
    "121": "192.168.0.121",
    "188": "192.168.0.188",
}
DIAG_TIMEOUT = 10  # timeout for diagnostic tools (seconds)
OP_TIMEOUT = 60  # timeout for operation tools (seconds)
# Forbidden-string regexes (hard-coded guard)
FORBIDDEN_PATTERNS = [
    r"rm\s+-rf",  # recursive delete
    r"/etc/passwd",  # system accounts
    r"authorized_keys",  # SSH keys
    r"sudoers",  # privilege configuration
    r"\$\(",  # command substitution
    r"`",  # backtick execution
    r"\|.*rm",  # pipe into a delete
    r">\s*/etc/",  # redirect into the system config directory
]
# 參數白名單正則 — 用於 _validate_param()
# container_name / service / filter_name: 英數字、連字號、底線、點k8s/docker 命名規範)
_RE_SAFE_NAME = re.compile(r'^[a-zA-Z0-9._-]{1,128}$')
# compose_dir: 必須以 /opt/ 或 /srv/ 開頭,不含 ..
_RE_SAFE_DIR = re.compile(r'^/(opt|srv)/[a-zA-Z0-9._/-]{1,200}$')
# domain: FQDN + 萬用字元
_RE_SAFE_DOMAIN = re.compile(r'^(\*\.)?[a-zA-Z0-9.-]{1,253}$')
def _validate_param(key: str, value: str) -> str:
"""
對用戶提供的字串參數套用白名單驗證。
傳回驗證後的值;驗證失敗拋出 ValueError呼叫方應攔截並拒絕執行
"""
if key in ("container_name", "filter_name", "service"):
if not _RE_SAFE_NAME.match(value):
raise ValueError(f"Unsafe {key}: {value!r}")
elif key == "compose_dir":
# 禁止路徑穿越
if ".." in value or not _RE_SAFE_DIR.match(value):
raise ValueError(f"Unsafe compose_dir: {value!r}")
elif key == "domain":
if not _RE_SAFE_DOMAIN.match(value):
raise ValueError(f"Unsafe domain: {value!r}")
# tail / port / lines 由呼叫方 int() 轉換,不需字串白名單
return value
def _normalize_ssh_host(value: str) -> str:
"""
Normalize host labels before they enter asyncssh.
Prometheus labels often arrive as ``192.168.0.110:9100``. That port is the
exporter port, not SSH. The SSH provider must connect to the host on the
platform SSH port, otherwise asyncssh can receive a stringly port from
config/labels and fail with ``%d format`` before the tool even runs.
"""
host = (value or "").strip()
if host.startswith("ssh://"):
host = host.removeprefix("ssh://")
if "@" in host:
host = host.rsplit("@", 1)[1]
if host.startswith("[") and "]" in host:
return host[1:host.index("]")]
if host.count(":") == 1:
maybe_host, maybe_port = host.rsplit(":", 1)
if maybe_port.isdigit():
host = maybe_host
if host in SHORT_HOST_MAP:
return SHORT_HOST_MAP[host]
return host
def _quiet_asyncssh_info_logs() -> None:
    """Raise the ``asyncssh`` logger to WARNING, once per process.

    Some target SSH servers send exit status as a string. AsyncSSH then emits
    an INFO log with ``%d`` and that string argument before our code sees the
    result, which produces noisy ``TypeError: %d format`` tracebacks. The tool
    result itself is still available, so production keeps asyncssh at WARNING
    and relies on the structured MCP audit logs instead.
    """
    global _asyncssh_logger_configured
    if not _asyncssh_logger_configured:
        asyncssh_log = logging.getLogger("asyncssh")
        asyncssh_log.setLevel(logging.WARNING)
        # Remember that the level is set so later calls return immediately.
        _asyncssh_logger_configured = True
# Group A (read-only)
GROUP_A_TOOLS = {
    "ssh_diagnose",
    "ssh_get_top_processes",
    "ssh_get_disk_usage",
    "ssh_get_memory_info",
    "ssh_get_container_logs",
    "ssh_get_container_status",
    "ssh_get_service_status",
    "ssh_check_port",
    "ssh_get_nginx_error_log",
    "ssh_get_swap_info",
}
# Group B (safe operations, require trust_score)
GROUP_B_TOOLS = {
    "ssh_docker_restart",
    "ssh_docker_compose_restart",
    "ssh_systemctl_restart",
    "ssh_clear_docker_logs",
    "ssh_renew_ssl",
    "ssh_reload_nginx",
    "ssh_docker_prune",
}
# Disk usage gate for ssh_docker_prune (only run when usage >= this %)
# 2026-05-02 ogt + Claude Sonnet 4.6: prevent accidental prune on healthy hosts
DOCKER_PRUNE_DISK_GATE_PCT = 75
# Every tool name accepted by the whitelist guard in SSHProvider.execute().
ALL_TOOLS = GROUP_A_TOOLS | GROUP_B_TOOLS
# Minimum trust_score a caller must supply to run a Group B tool.
MIN_TRUST_SCORE_FOR_GROUP_B = 0.8
class SSHProvider(MCPToolProvider):
    """
    SSH MCP Provider — host-level operations proxy.

    Phase MCP-2a: fills in the 70% of host-level alerts that K8s cannot cover.
    Strangler switch: SSH_MCP_ENABLED env var.
    """
@property
def name(self) -> str:
    """Provider identifier; also used as ``server_name`` on every listed tool."""
    provider_name = "ssh_host"
    return provider_name
@property
def enabled(self) -> bool:
    """Value of the ``SSH_MCP_ENABLED`` setting; False when the setting is absent."""
    from src.core.config import settings

    switch = getattr(settings, "SSH_MCP_ENABLED", False)
    return switch
def _allowed_hosts(self) -> list[str]:
    """Return the hosts this provider may connect to.

    Reads ``SSH_MCP_ALLOWED_HOSTS`` from settings. A non-empty list is returned
    as-is, a non-empty string is split on commas (blank entries dropped), and
    an empty/missing value falls back to the built-in default host list.
    """
    from src.core.config import settings

    configured = getattr(settings, "SSH_MCP_ALLOWED_HOSTS", "")
    if isinstance(configured, list) and configured:
        return configured
    if not configured:
        # P0.4 fix 2026-04-24 ogt + Claude Sonnet 4.6: added 120/121 (missing from the original default)
        return ["192.168.0.188", "192.168.0.110", "192.168.0.111", "192.168.0.120", "192.168.0.121"]
    trimmed = (piece.strip() for piece in configured.split(","))
    return [entry for entry in trimmed if entry]
def _host_users(self) -> dict[str, str]:
    """Build the host -> SSH username mapping.

    Starts from a copy of ``DEFAULT_HOST_USERS`` and overlays the
    comma-separated ``host=user`` pairs found in the ``SSH_MCP_HOST_USERS``
    setting. Entries without ``=`` or with an empty side are ignored.
    """
    from src.core.config import settings

    mapping = dict(DEFAULT_HOST_USERS)
    configured = getattr(settings, "SSH_MCP_HOST_USERS", "") or ""
    for entry in configured.split(","):
        host_part, separator, user_part = entry.partition("=")
        if not separator:
            # Blank entry or one without "=": nothing to record.
            continue
        host_key, login = host_part.strip(), user_part.strip()
        if host_key and login:
            mapping[host_key] = login
    return mapping
def _ssh_user_for_host(self, host: str) -> str:
    """Return the SSH username configured for ``host``, or ``SSH_USER`` if none is."""
    per_host_logins = self._host_users()
    return per_host_logins.get(host, SSH_USER)
# =========================================================================
# list_tools
# =========================================================================
async def list_tools(self) -> list[MCPTool]:
    """Describe every tool this provider exposes.

    Returns the ten Group A (read-only) tools followed by the seven Group B
    (trust-gated) tools, each with its JSON input schema and this provider's
    name as ``server_name``.
    """

    def field(kind: str, description: str | None = None) -> dict[str, str]:
        # Fresh dict per call so no schema fragment is shared between tools.
        spec = {"type": kind}
        if description is not None:
            spec["description"] = description
        return spec

    def tool(
        tool_name: str,
        description: str,
        properties: dict[str, dict[str, str]],
        required: list[str],
    ) -> MCPTool:
        return MCPTool(
            name=tool_name,
            description=description,
            input_schema={"type": "object", "properties": properties, "required": required},
            server_name=self.name,
        )

    # ---- Group A ----
    read_only_tools = [
        tool(
            "ssh_diagnose",
            "Collect host CPU, memory, disk and load evidence. "
            "Read-only; used when host/backup alerts need generic SSH diagnosis.",
            {
                "host": field("string", "Target host IP"),
                "container_name": field(
                    "string",
                    "Optional Docker container name for container-focused diagnostics",
                ),
            },
            ["host"],
        ),
        tool(
            "ssh_get_top_processes",
            "Get top CPU/memory consuming processes on the target host (ps aux --sort=-%cpu | head 15). Read-only.",
            {"host": field("string", "Target host IP (e.g. 192.168.0.188)")},
            ["host"],
        ),
        tool(
            "ssh_get_disk_usage",
            "Get disk usage on the target host (df -h && du -sh /var/lib/docker). Read-only.",
            {"host": field("string")},
            ["host"],
        ),
        tool(
            "ssh_get_memory_info",
            "Get memory info (free -h). Read-only.",
            {"host": field("string")},
            ["host"],
        ),
        tool(
            "ssh_get_container_logs",
            "Get Docker container logs (last 50 lines). Read-only.",
            {
                "host": field("string"),
                "container_name": field("string", "Docker container name"),
                "tail": field("integer", "Number of lines (default: 50)"),
            },
            ["host", "container_name"],
        ),
        tool(
            "ssh_get_container_status",
            "Get Docker container status filtered by name. Read-only.",
            {
                "host": field("string"),
                "filter_name": field("string", "Container name filter"),
            },
            ["host", "filter_name"],
        ),
        tool(
            "ssh_get_service_status",
            "Get systemd service status. Read-only.",
            {
                "host": field("string"),
                "service": field("string", "Service name (e.g. ollama)"),
            },
            ["host", "service"],
        ),
        tool(
            "ssh_check_port",
            "Check if a port is listening on the target host. Read-only.",
            {
                "host": field("string"),
                "port": field("integer", "Port number"),
            },
            ["host", "port"],
        ),
        tool(
            "ssh_get_nginx_error_log",
            "Get last 50 lines of nginx error log. Read-only.",
            {
                "host": field("string"),
                "lines": field("integer", "Number of lines (default: 50)"),
            },
            ["host"],
        ),
        tool(
            "ssh_get_swap_info",
            "Get swap and memory info (swapon --show && free -h). Read-only.",
            {"host": field("string")},
            ["host"],
        ),
    ]

    # ---- Group B ----
    trust_gated_tools = [
        tool(
            "ssh_docker_restart",
            "Restart a Docker container (docker restart <name>). Requires trust_score >= 0.8.",
            {
                "host": field("string"),
                "container_name": field("string"),
                "trust_score": field("number", "Current trust score (0.0-1.0)"),
            },
            ["host", "container_name", "trust_score"],
        ),
        tool(
            "ssh_docker_compose_restart",
            "Restart a service via docker compose. Requires trust_score >= 0.8.",
            {
                "host": field("string"),
                "compose_dir": field("string", "Directory containing docker-compose.yml"),
                "service": field("string", "Service name in compose file"),
                "trust_score": field("number"),
            },
            ["host", "compose_dir", "service", "trust_score"],
        ),
        tool(
            "ssh_systemctl_restart",
            "Restart a systemd service (systemctl restart <service>). Requires trust_score >= 0.8.",
            {
                "host": field("string"),
                "service": field("string"),
                "trust_score": field("number"),
            },
            ["host", "service", "trust_score"],
        ),
        tool(
            "ssh_clear_docker_logs",
            "Truncate Docker container log file to free disk space. Requires trust_score >= 0.8.",
            {
                "host": field("string"),
                "container_name": field("string"),
                "trust_score": field("number"),
            },
            ["host", "container_name", "trust_score"],
        ),
        tool(
            "ssh_renew_ssl",
            "Renew SSL certificate via certbot. Requires trust_score >= 0.8.",
            {
                "host": field("string"),
                "domain": field("string", "Certificate name (certbot --cert-name)"),
                "trust_score": field("number"),
            },
            ["host", "domain", "trust_score"],
        ),
        tool(
            "ssh_reload_nginx",
            "Test and reload nginx config (nginx -t && systemctl reload nginx). Requires trust_score >= 0.8.",
            {
                "host": field("string"),
                "trust_score": field("number"),
            },
            ["host", "trust_score"],
        ),
        tool(
            "ssh_docker_prune",
            "Reclaim disk space via docker image+volume+builder prune. "
            f"Gated: only runs when root filesystem usage >= {DOCKER_PRUNE_DISK_GATE_PCT}%. "
            "Requires trust_score >= 0.8.",
            {
                "host": field("string"),
                "trust_score": field("number"),
            },
            ["host", "trust_score"],
        ),
    ]

    return read_only_tools + trust_gated_tools
# =========================================================================
# execute
# =========================================================================
async def execute(
    self,
    tool_name: str,
    parameters: dict[str, Any],
) -> MCPToolResult:
    """Run one SSH tool after it has passed every security guard.

    Guards, in order:
      1. ``tool_name`` must be in ``ALL_TOOLS``.
      2. The normalized ``host`` parameter must be in ``_allowed_hosts()``.
      3. No string parameter may match ``FORBIDDEN_PATTERNS``.
      4. Group B tools need a numeric ``trust_score`` of at least
         ``MIN_TRUST_SCORE_FOR_GROUP_B``.
      5. Group B tools need ``SSH_MCP_KNOWN_HOSTS_FILE`` to point at an
         existing file.

    Args:
        tool_name: Name of the tool to run.
        parameters: Tool arguments; ``host`` is always read, ``trust_score``
            is read for Group B tools.

    Returns:
        An ``MCPToolResult``. Guard rejections and execution errors are
        reported as ``success=False`` with an ``error`` message rather than
        raised.
    """
    import os

    execution_id = str(uuid.uuid4())
    start = datetime.now(UTC)

    def rejected(message: str) -> MCPToolResult:
        # Shared shape for every guard rejection (no duration, as before).
        return MCPToolResult(
            success=False,
            execution_id=execution_id,
            error=message,
        )

    # Guard 1: tool whitelist
    if tool_name not in ALL_TOOLS:
        return rejected(f"Unknown tool: {tool_name}")

    host = _normalize_ssh_host(str(parameters.get("host", "")))

    # Guard 2: allowed hosts
    if host not in self._allowed_hosts():
        return rejected(f"Host '{host}' not in SSH_MCP_ALLOWED_HOSTS")

    # Guard 3: parameter safety scan
    security_error = self._check_params_safety(parameters)
    if security_error:
        logger.warning(
            "ssh_mcp_forbidden_pattern",
            tool=tool_name,
            host=host,
            reason=security_error,
        )
        return rejected(f"Security guard blocked: {security_error}")

    is_group_b = tool_name in GROUP_B_TOOLS
    if is_group_b:
        # Guard 4: Group B trust score
        raw_trust = parameters.get("trust_score", 0.0)
        try:
            trust_score = float(raw_trust)
        except (TypeError, ValueError):
            # A non-numeric score must be a clean rejection, not an
            # unhandled exception escaping the provider.
            return rejected(
                f"Group B tool '{tool_name}' requires a numeric trust_score, "
                f"got {raw_trust!r}."
            )
        # Written as "not >=" on purpose: NaN compares False against
        # everything, so "trust_score < MIN" would let NaN through.
        if not trust_score >= MIN_TRUST_SCORE_FOR_GROUP_B:
            return rejected(
                f"Group B tool '{tool_name}' requires trust_score >= "
                f"{MIN_TRUST_SCORE_FOR_GROUP_B}, got {trust_score:.2f}. "
                "Consider manual execution."
            )

        # P1 fix 2026-04-11: Group B write tools must have known_hosts,
        # otherwise execution is refused to prevent MITM. Read tools
        # (Group A) may run with known_hosts=None for easy diagnosis;
        # write operations may not.
        known_hosts_file = os.environ.get("SSH_MCP_KNOWN_HOSTS_FILE")
        if not known_hosts_file or not os.path.exists(known_hosts_file):
            return rejected(
                "Group B write tool refused: SSH_MCP_KNOWN_HOSTS_FILE not set or missing. "
                "Set up known_hosts per docs/runbooks/ssh-mcp-setup.md before write operations."
            )

    # Execute
    try:
        timeout = OP_TIMEOUT if is_group_b else DIAG_TIMEOUT
        output = await self._run_tool(tool_name, parameters, host, timeout)
        duration = (datetime.now(UTC) - start).total_seconds()
        logger.info("ssh_mcp_executed", tool=tool_name, host=host, duration=duration)
        return MCPToolResult(
            success=True,
            execution_id=execution_id,
            output=output,
            duration=duration,
        )
    except Exception as e:
        # Provider boundary: any failure (validation, SSH, timeout) becomes
        # a failed result so the caller always receives an MCPToolResult.
        duration = (datetime.now(UTC) - start).total_seconds()
        logger.warning("ssh_mcp_failed", tool=tool_name, host=host, error=str(e))
        return MCPToolResult(
            success=False,
            execution_id=execution_id,
            error=str(e),
            duration=duration,
        )
async def health_check(self) -> bool:
    """Report whether the SSH key file exists.

    No SSH connection is made, so the health check cannot trigger side effects.
    """
    import os

    key_present = os.path.exists(SSH_KEY_PATH)
    return key_present
# =========================================================================
# 安全守衛
# =========================================================================
def _check_params_safety(self, params: dict) -> str | None:
"""
掃描所有字串參數,找到禁止字串即返回 error 字串
"""
for value in params.values():
if not isinstance(value, str):
continue
for pattern in FORBIDDEN_PATTERNS:
if re.search(pattern, value, re.IGNORECASE):
return f"Forbidden pattern '{pattern}' found in parameter"
return None
# =========================================================================
# 工具執行
# =========================================================================
async def _run_tool(
    self,
    tool_name: str,
    params: dict,
    host: str,
    timeout: int,
) -> dict:
    """Build the tool's command, run it on ``host`` and package the outcome.

    The returned dict carries ``host``, ``username``, ``tool``, ``command``,
    ``stdout`` and ``stderr``.
    """
    # The command is built first so a validation error surfaces before any
    # username lookup or connection attempt.
    command = self._build_command(tool_name, params)
    login = self._ssh_user_for_host(host)
    captured_out, captured_err = await self._ssh_exec(host, command, timeout, login)
    report = dict(
        host=host,
        username=login,
        tool=tool_name,
        command=command,
        stdout=captured_out,
        stderr=captured_err,
    )
    return report
def _build_command(self, tool_name: str, params: dict) -> str:
# 所有接受用戶字串的工具,必須先通過 _validate_param() 白名單驗證
if tool_name == "ssh_diagnose":
# 2026-04-27 Claude Sonnet 4.6: 主機告警自動診斷 — 只讀,不修改任何狀態
command = (
"echo '=== CPU TOP ===' && ps aux --sort=-%cpu | head -15 && "
"echo '=== MEMORY ===' && free -h && "
"echo '=== DISK ===' && df -h && "
"echo '=== LOAD ===' && uptime"
)
container_name = params.get("container_name")
if container_name:
name = _validate_param("container_name", str(container_name))
command = (
f"{command} && "
f"echo '=== DOCKER STATS {name} ===' && "
f"docker stats --no-stream {name} 2>&1 && "
f"echo '=== DOCKER INSPECT {name} ===' && "
f"docker inspect {name} 2>&1 | head -80"
)
return command
if tool_name == "ssh_get_top_processes":
return "ps aux --sort=-%cpu | head -15"
if tool_name == "ssh_get_disk_usage":
return "df -h && echo '---' && du -sh /var/lib/docker 2>/dev/null || true"
if tool_name == "ssh_get_memory_info":
return "free -h"
if tool_name == "ssh_get_container_logs":
name = _validate_param("container_name", params["container_name"])
tail = max(1, min(int(params.get("tail", 50)), 1000)) # 上限 1000 行
return f"docker logs {name} --tail {tail} 2>&1"
if tool_name == "ssh_get_container_status":
name = _validate_param("filter_name", params["filter_name"])
return f"docker ps -a --filter name={name}"
if tool_name == "ssh_get_service_status":
svc = _validate_param("service", params["service"])
return f"systemctl status {svc} --no-pager -l 2>&1 | head -30"
if tool_name == "ssh_check_port":
port = max(1, min(int(params["port"]), 65535))
return f"ss -tlnp | grep :{port}"
if tool_name == "ssh_get_nginx_error_log":
lines = max(1, min(int(params.get("lines", 50)), 500)) # 上限 500 行
return f"tail -n {lines} /var/log/nginx/error.log 2>/dev/null || echo 'Log not found'"
if tool_name == "ssh_get_swap_info":
return "swapon --show; echo '---'; free -h"
if tool_name == "ssh_docker_restart":
name = _validate_param("container_name", params["container_name"])
return f"docker restart {name}"
if tool_name == "ssh_docker_compose_restart":
compose_dir = _validate_param("compose_dir", params["compose_dir"])
service = _validate_param("service", params["service"])
return f"cd {compose_dir} && docker compose restart {service}"
if tool_name == "ssh_systemctl_restart":
svc = _validate_param("service", params["service"])
return f"systemctl restart {svc}"
if tool_name == "ssh_clear_docker_logs":
name = _validate_param("container_name", params["container_name"])
# 透過 docker inspect 取得 log 路徑,再截斷
return (
f"LOG_PATH=$(docker inspect --format='{{{{.LogPath}}}}' {name} 2>/dev/null) "
f"&& [ -n \"$LOG_PATH\" ] && truncate -s 0 \"$LOG_PATH\" && echo 'Cleared' "
f"|| echo 'Container not found'"
)
if tool_name == "ssh_renew_ssl":
domain = _validate_param("domain", params["domain"])
return f"/snap/bin/certbot renew --cert-name {domain} --non-interactive 2>&1"
if tool_name == "ssh_reload_nginx":
return "nginx -t 2>&1 && systemctl reload nginx && echo 'Nginx reloaded'"
if tool_name == "ssh_docker_prune":
# Disk-gated docker prune: only acts when the alerting condition still holds.
# 2026-05-02 ogt + Claude Sonnet 4.6: ADR-068 飛輪 — disk full SOP
gate = DOCKER_PRUNE_DISK_GATE_PCT
return (
"USAGE=$(df --output=pcent / | tail -1 | tr -dc '0-9'); "
f"if [ -z \"$USAGE\" ] || [ \"$USAGE\" -lt {gate} ]; then "
f"echo \"skip: disk usage ${{USAGE}}% < {gate}% gate\"; exit 0; fi; "
"echo '=== DISK BEFORE ==='; df -h /; "
"echo '=== IMAGE PRUNE ==='; docker image prune -a -f 2>&1 | tail -3; "
"echo '=== VOLUME PRUNE ==='; docker volume prune -f 2>&1 | tail -3; "
"echo '=== BUILDER PRUNE ==='; docker builder prune -a -f 2>&1 | tail -3; "
"echo '=== DISK AFTER ==='; df -h /"
)
raise ValueError(f"No command builder for tool: {tool_name}")
async def _ssh_exec(
    self,
    host: str,
    cmd: str,
    timeout: int,
    username: str | None = None,
) -> tuple[str, str]:
    """
    Run ``cmd`` on ``host`` through asyncssh and return ``(stdout, stderr)``.

    Args:
        host: Target host, already normalized and checked against the allow list.
        cmd: Shell command to execute.
        timeout: Seconds, used both as connect timeout and command timeout.
        username: Login name; ``SSH_USER`` is used when omitted or empty.

    Returns:
        The command's stdout and stderr, each ``""`` when nothing was
        captured. The exit status is not checked (``check=False``).

    Raises:
        RuntimeError: If asyncssh is not installed or the private key file is
            missing — explicit errors instead of an obscure crash.
    """
    try:
        import asyncssh
    except ImportError:
        raise RuntimeError(
            "asyncssh is not installed. "
            "Add 'asyncssh' to pyproject.toml dependencies."
        ) from None
    import os

    _quiet_asyncssh_info_logs()
    if not os.path.exists(SSH_KEY_PATH):
        raise RuntimeError(
            f"SSH key not found at {SSH_KEY_PATH}. "
            "Ensure K8s Secret 'ssh-mcp-key' is mounted correctly."
        )
    # known_hosts: defaults to None (quick start on the internal network).
    # Production hardening: point SSH_MCP_KNOWN_HOSTS_FILE at a file produced
    # by ssh-keyscan.
    # An empty value is treated as unset, matching execute(); otherwise ""
    # would be handed to asyncssh as a file path.
    known_hosts_path = os.environ.get("SSH_MCP_KNOWN_HOSTS_FILE") or None
    if known_hosts_path is None:
        logger.warning(
            "ssh_mcp.known_hosts_disabled",
            note="Set SSH_MCP_KNOWN_HOSTS_FILE env var to enable host key verification",
        )
    async with asyncssh.connect(
        host,
        port=SSH_PORT,
        username=username or SSH_USER,
        client_keys=[SSH_KEY_PATH],
        known_hosts=known_hosts_path,  # None = skip verification (internal network), or a file path
        config=None,  # never read the user's ssh config, so a string Port cannot leak into asyncssh
        connect_timeout=float(timeout),
    ) as conn:
        # Root cause of an earlier bug: asyncssh has no top-level run();
        # conn.run() is the right call (2026-04-24 Claude Sonnet 4.6).
        result = await conn.run(cmd, timeout=float(timeout), check=False)
        return (result.stdout or ""), (result.stderr or "")