feat(api): HostRepairAgent — SSH 主機層修復 (Task 11)

- host_repair_agent.py: layer路由、command injection防護、asyncio SSH執行
- 測試: 12 cases 全通過 (routing/sanitize/success/fail/timeout/denied)
- SSH key: /etc/repair-ssh/id_ed25519 (K8s secret mount)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-05 11:22:00 +08:00
parent 892c5d53a7
commit e7d8da85f6
2 changed files with 264 additions and 0 deletions

View File

@@ -0,0 +1,134 @@
"""
src/services/host_repair_agent.py
Host Repair Agent — 透過 SSH 執行主機層修復
2026-04-05 Claude Code: Sprint 3 Host Auto-Repair
"""
import asyncio
import re
import logging
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
# SSH connection settings: maps a layer name to the host, login user and
# private-key path used to reach it. "docker-188" and "systemd-188" point at
# the same host and user.
LAYER_SSH_CONFIG: dict[str, dict] = {
    "docker-110": {
        "host": "192.168.0.110",
        "user": "wooo",
        "key_path": "/etc/repair-ssh/id_ed25519",
    },
    "docker-188": {
        "host": "192.168.0.188",
        "user": "ollama",
        "key_path": "/etc/repair-ssh/id_ed25519",
    },
    "systemd-188": {
        "host": "192.168.0.188",
        "user": "ollama",
        "key_path": "/etc/repair-ssh/id_ed25519",
    },
}
# Component name rule: lowercase alphanumerics and hyphens, 1-31 characters,
# and the first character must be alphanumeric.
# NOTE(review): with re.match(), "$" also matches just before a trailing
# newline, so callers that need a strict check should use fullmatch().
_COMPONENT_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,30}$")
SSH_TIMEOUT = 60  # seconds
@dataclass
class HostRepairResult:
    """Outcome of a single host-level repair attempt."""

    success: bool  # True only when the remote output starts with "REPAIR_OK:"
    layer: str  # layer name the repair was requested for
    component: str  # component name the repair was requested for
    output: str = ""  # stripped stdout of the SSH command; empty if nothing ran
    error: str = ""  # failure description; empty on success
def get_ssh_config_for_layer(layer: str) -> dict:
    """Look up the SSH connection settings registered for *layer*.

    Layers whose name begins with ``k8s`` are repaired through kubectl and
    therefore have no SSH settings.

    Args:
        layer: Layer name, e.g. ``"docker-110"``.

    Returns:
        The matching entry of ``LAYER_SSH_CONFIG``.

    Raises:
        ValueError: If the layer is a k8s layer, or is not a key of
            ``LAYER_SSH_CONFIG``.
    """
    # The prefix test also covers the exact name "k8s".
    if layer.startswith("k8s"):
        raise ValueError(f"Layer '{layer}' uses kubectl, not SSH")

    settings = LAYER_SSH_CONFIG.get(layer)
    if settings is not None:
        return settings
    raise ValueError(f"Unknown layer: '{layer}'")
def build_repair_command(component: str) -> str:
    """Build the remote repair command for *component*.

    The component name is validated against ``_COMPONENT_RE`` before it is
    placed in the command string, so shell metacharacters cannot reach the
    remote host.

    Args:
        component: Name of the component to repair, e.g. ``"sentry"``.

    Returns:
        The command string ``"repair:<component>"``.

    Raises:
        ValueError: If *component* is not a string or does not match the
            component naming rule.
    """
    # fullmatch() is required here: with match(), a "$" anchor also matches
    # just before a trailing newline, so "sentry\n" would be accepted.
    # Non-string input is rejected with ValueError as well, so callers that
    # only handle ValueError still get a clean failure.
    if not isinstance(component, str) or _COMPONENT_RE.fullmatch(component) is None:
        raise ValueError(f"Invalid component name: '{component}'")
    return f"repair:{component}"
class HostRepairAgent:
    """Run host-level repair commands over SSH.

    ``repair()`` is the public entry point. It never raises for routing,
    validation, timeout or SSH errors; every outcome is reported through a
    ``HostRepairResult``.
    """

    # Private key used when no LAYER_SSH_CONFIG entry matches the target.
    _DEFAULT_KEY_PATH = "/etc/repair-ssh/id_ed25519"

    async def repair(self, layer: str, component: str) -> HostRepairResult:
        """Repair *component* on the host that serves *layer*.

        Args:
            layer: Layer name used to select the SSH target.
            component: Component name; validated before it is sent.

        Returns:
            A ``HostRepairResult``. ``success`` is True only when the remote
            output starts with ``"REPAIR_OK:"``; otherwise ``error`` holds the
            validation message, the timeout message, the SSH failure text, or
            the remote output.
        """
        try:
            config = get_ssh_config_for_layer(layer)
            command = build_repair_command(component)
        except ValueError as e:
            # Unknown/k8s layer or rejected component name: nothing is executed.
            return HostRepairResult(
                success=False,
                layer=layer,
                component=component,
                error=str(e),
            )

        try:
            output = await self._ssh_execute(
                host=config["host"],
                user=config["user"],
                command=command,
            )
        except asyncio.TimeoutError:
            return HostRepairResult(
                success=False,
                layer=layer,
                component=component,
                error=f"SSH timeout after {SSH_TIMEOUT}s",
            )
        except Exception as e:
            # Boundary handler: keep the traceback in the log, and never return
            # an empty error string for an exception without a message.
            logger.exception("SSH repair failed for layer=%s component=%s", layer, component)
            return HostRepairResult(
                success=False,
                layer=layer,
                component=component,
                error=str(e) or type(e).__name__,
            )

        success = output.startswith("REPAIR_OK:")
        return HostRepairResult(
            success=success,
            layer=layer,
            component=component,
            output=output,
            error="" if success else (output or "Empty response from repair command"),
        )

    async def _ssh_execute(self, host: str, user: str, command: str) -> str:
        """Run *command* on ``user@host`` through ``ssh`` and return its stdout.

        Args:
            host: Target host address.
            user: Login user on the target host.
            command: Command string handed to ssh as a single argument.

        Returns:
            The remote stdout, decoded and stripped.

        Raises:
            asyncio.TimeoutError: If ssh does not finish within ``SSH_TIMEOUT``
                seconds; the ssh process is killed before this propagates.
            RuntimeError: If ssh exits non-zero without writing to stdout
                (e.g. connection or authentication failure); the message
                carries the exit code and stderr.
        """
        # Use the key of the first configured layer matching this host/user.
        key_path = next(
            (
                cfg.get("key_path", self._DEFAULT_KEY_PATH)
                for cfg in LAYER_SSH_CONFIG.values()
                if cfg["host"] == host and cfg["user"] == user
            ),
            self._DEFAULT_KEY_PATH,
        )

        # Spawning is a local fork/exec; the timeout budget is applied once,
        # to the remote round-trip below, so the reported timeout is accurate.
        proc = await asyncio.create_subprocess_exec(
            "ssh",
            "-i", key_path,
            # NOTE(security): host-key verification is disabled, which allows
            # man-in-the-middle attacks. Prefer a mounted known_hosts file.
            "-o", "StrictHostKeyChecking=no",
            "-o", "BatchMode=yes",
            "-o", f"ConnectTimeout={SSH_TIMEOUT}",
            f"{user}@{host}",
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=SSH_TIMEOUT)
        finally:
            # On timeout or cancellation the child is still running; kill and
            # reap it so no ssh process is left behind.
            if proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass  # exited between the check and the kill
                await proc.wait()

        # Tolerate non-UTF-8 bytes so odd output cannot mask a real result.
        output = stdout.decode(errors="replace").strip()
        if proc.returncode != 0 and not output:
            detail = stderr.decode(errors="replace").strip()
            raise RuntimeError(
                f"ssh exited with code {proc.returncode}: {detail or 'no error output'}"
            )
        logger.info("SSH repair %s@%s %s -> %s", user, host, command, output)
        return output

View File

@@ -0,0 +1,130 @@
"""
tests/test_host_repair_agent.py
Host Repair Agent 單元測試
不需要實際 SSH 連線 — 測試路由邏輯和命令組裝
"""
import asyncio
import pytest
from unittest.mock import AsyncMock, patch
# =============================================================================
# 測試 HostRepairConfig 路由
# =============================================================================
class TestHostRepairConfig:
    """Routing from a layer name to its SSH connection settings."""

    def test_layer_docker_110_routes_to_110(self):
        from src.services.host_repair_agent import get_ssh_config_for_layer

        routed = get_ssh_config_for_layer("docker-110")
        assert (routed["user"], routed["host"]) == ("wooo", "192.168.0.110")

    def test_layer_docker_188_routes_to_188(self):
        from src.services.host_repair_agent import get_ssh_config_for_layer

        routed = get_ssh_config_for_layer("docker-188")
        assert (routed["user"], routed["host"]) == ("ollama", "192.168.0.188")

    def test_layer_systemd_188_routes_to_188(self):
        from src.services.host_repair_agent import get_ssh_config_for_layer

        routed = get_ssh_config_for_layer("systemd-188")
        assert (routed["user"], routed["host"]) == ("ollama", "192.168.0.188")

    def test_unknown_layer_raises(self):
        from src.services.host_repair_agent import get_ssh_config_for_layer

        expect_error = pytest.raises(ValueError, match="Unknown layer")
        with expect_error:
            get_ssh_config_for_layer("unknown-layer")

    def test_k8s_layer_raises(self):
        """The k8s layer does not go through SSH, so the lookup must raise."""
        from src.services.host_repair_agent import get_ssh_config_for_layer

        expect_error = pytest.raises(ValueError, match="kubectl")
        with expect_error:
            get_ssh_config_for_layer("k8s")
# =============================================================================
# 測試 SSH 命令組裝
# =============================================================================
class TestSSHCommandBuilding:
    """Assembly and validation of the remote repair command."""

    def test_repair_command_format(self):
        from src.services.host_repair_agent import build_repair_command

        assert build_repair_command("sentry") == "repair:sentry"

    def test_repair_command_component_sanitized(self):
        """A component name carrying shell syntax must be rejected."""
        from src.services.host_repair_agent import build_repair_command

        expect_error = pytest.raises(ValueError, match="Invalid component")
        with expect_error:
            build_repair_command("sentry; rm -rf /")

    def test_repair_command_valid_components(self):
        from src.services.host_repair_agent import build_repair_command

        names = ["sentry", "harbor", "gitea", "openclaw", "gitea-runner", "alertmanager", "redis", "nginx"]
        built = [build_repair_command(name) for name in names]
        assert built == [f"repair:{name}" for name in names]
# =============================================================================
# 測試 HostRepairAgent.repair() 路由
# =============================================================================
class TestHostRepairAgent:
    """Behaviour of HostRepairAgent.repair() with the SSH call mocked out."""

    @pytest.mark.asyncio
    async def test_repair_success_returns_ok(self):
        from src.services.host_repair_agent import HostRepairAgent

        agent = HostRepairAgent()
        fake_ssh = AsyncMock(return_value="REPAIR_OK:sentry")
        with patch.object(agent, "_ssh_execute", new=fake_ssh):
            outcome = await agent.repair(layer="docker-110", component="sentry")

        assert outcome.success is True
        assert outcome.component == "sentry"
        assert outcome.layer == "docker-110"
        fake_ssh.assert_called_once_with(
            host="192.168.0.110",
            user="wooo",
            command="repair:sentry",
        )

    @pytest.mark.asyncio
    async def test_repair_fail_returns_failure(self):
        from src.services.host_repair_agent import HostRepairAgent

        agent = HostRepairAgent()
        fake_ssh = AsyncMock(return_value="REPAIR_FAIL:harbor:exit_1")
        with patch.object(agent, "_ssh_execute", new=fake_ssh):
            outcome = await agent.repair(layer="docker-110", component="harbor")

        assert outcome.success is False
        assert "REPAIR_FAIL" in outcome.error

    @pytest.mark.asyncio
    async def test_repair_ssh_timeout_returns_failure(self):
        from src.services.host_repair_agent import HostRepairAgent

        agent = HostRepairAgent()
        fake_ssh = AsyncMock(side_effect=asyncio.TimeoutError())
        with patch.object(agent, "_ssh_execute", new=fake_ssh):
            outcome = await agent.repair(layer="docker-110", component="sentry")

        assert outcome.success is False
        assert "timeout" in outcome.error.lower()

    @pytest.mark.asyncio
    async def test_repair_denied_returns_failure(self):
        from src.services.host_repair_agent import HostRepairAgent

        agent = HostRepairAgent()
        fake_ssh = AsyncMock(return_value="REPAIR_DENIED:unknown_component:badcomponent")
        with patch.object(agent, "_ssh_execute", new=fake_ssh):
            outcome = await agent.repair(layer="docker-110", component="badcomponent")

        assert outcome.success is False