feat(api): HostRepairAgent 三條執行路徑 + known_hosts + Ansible 白名單 (Sprint 3 T3)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-06 14:22:54 +08:00
parent d4cb9a4ac5
commit 1a654aa37d
2 changed files with 204 additions and 1 deletions

View File

@@ -5,6 +5,7 @@ Host Repair Agent — 透過 SSH 執行主機層修復
2026-04-05 Claude Code: C1 修正 — key_path 直接傳入 _ssh_execute,不反查
"""
import asyncio
import os
import re
import logging
from dataclasses import dataclass
@@ -52,6 +53,30 @@ _SUPPORTED_SCHEMES = {"openclaw", "ansible", "ssh"}
# Matches shell metacharacters (; & | `) and the "$(" command-substitution opener.
_SHELL_METACHAR_RE = re.compile(r'[;&|`&]|\$\(')
# Length limit for a command string, in characters.
_MAX_COMMAND_LEN = 512

# Ansible control-node settings. Each value is read from the environment once,
# at import time; the literal shown is the fallback used when the variable is unset.
# 2026-04-06 Claude Code: Sprint 3 T3
ANSIBLE_CONTROL_HOST = os.getenv("ANSIBLE_CONTROL_NODE_HOST", "192.168.0.188")
ANSIBLE_CONTROL_USER = os.getenv("ANSIBLE_CONTROL_NODE_USER", "ollama")
ANSIBLE_PLAYBOOKS_PATH = os.getenv("ANSIBLE_PLAYBOOKS_PATH", "~/openclaw-v5/ansible/playbooks")

# known_hosts file handed to ssh through the UserKnownHostsFile option.
KNOWN_HOSTS_PATH = "/etc/repair-ssh/known_hosts"
def validate_ansible_playbook(playbook_name: str) -> None:
    """
    Reject any playbook name that is not explicitly whitelisted.

    The whitelist is the comma-separated value of the
    ANSIBLE_PLAYBOOK_WHITELIST environment variable, re-read on every call.
    Entries are stripped of surrounding whitespace and blank entries are
    ignored. A name containing "/" or ".." is refused even if it appears in
    the whitelist, which blocks path traversal.

    Raises:
        ValueError: the name contains a path component or is not whitelisted.
    """
    entries = os.environ.get("ANSIBLE_PLAYBOOK_WHITELIST", "").split(",")
    allowed = set(filter(None, map(str.strip, entries)))

    looks_like_path = "/" in playbook_name or ".." in playbook_name
    if not looks_like_path and playbook_name in allowed:
        return

    raise ValueError(
        f"Security Block: '{playbook_name}' not in allowed whitelist. "
        f"Allowed: {sorted(allowed)}"
    )
def parse_uri_command(command: str) -> SshCommandURI:
"""
@@ -164,6 +189,121 @@ class HostRepairAgent:
error="" if success else output,
)
async def repair_by_uri(self, command: str, approved: bool = False) -> HostRepairResult:
    """
    Parse a repair URI and dispatch it to the executor for its scheme.

    - openclaw:// is forwarded to _execute_openclaw.
    - ansible:// must pass validate_ansible_playbook, then goes to
      _execute_ansible.
    - ssh:// is refused unless approved is True, must pass
      validate_shell_safety, then goes to _execute_ssh_direct.

    Parse and validation errors (ValueError) are not raised to the caller;
    they come back as a HostRepairResult with success=False and the error
    text in `error`.
    2026-04-06 Claude Code: Sprint 3 T3
    """
    def rejected(layer, component, reason):
        # Every refusal path produces the same failed-result shape.
        return HostRepairResult(success=False, layer=layer, component=component, error=reason)

    try:
        parsed = parse_uri_command(command)
    except ValueError as exc:
        return rejected("", "", str(exc))

    scheme = parsed.scheme

    if scheme == "openclaw":
        return await self._execute_openclaw(parsed.host_or_layer, parsed.payload)

    if scheme == "ansible":
        try:
            validate_ansible_playbook(parsed.payload)
        except ValueError as exc:
            return rejected("ansible", parsed.payload, str(exc))
        return await self._execute_ansible(parsed.host_or_layer, parsed.payload)

    if scheme == "ssh":
        # The approval gate is checked before the command is even inspected.
        if not approved:
            return rejected(
                "ssh",
                parsed.payload,
                "ssh:// scheme requires_approval=True — must be explicitly approved",
            )
        try:
            validate_shell_safety(parsed.payload)
        except ValueError as exc:
            return rejected("ssh", parsed.payload, str(exc))
        return await self._execute_ssh_direct(parsed.host_or_layer, parsed.payload)

    return rejected("", "", f"Unhandled scheme: {scheme}")
async def _execute_openclaw(self, layer: str, component: str) -> HostRepairResult:
    """openclaw:// — delegate to the existing repair(layer, component) logic."""
    outcome = await self.repair(layer=layer, component=component)
    return outcome
async def _execute_ansible(self, _control_host: str, playbook_name: str) -> HostRepairResult:
    """
    ansible:// — SSH to the Ansible control node and run ansible-playbook.

    2026-04-06 Claude Code: Sprint 3 T3

    Args:
        _control_host: Host taken from the URI. Deliberately ignored: the
            control node always comes from ANSIBLE_CONTROL_HOST, never from
            the URI (security design).
        playbook_name: Playbook file name, appended to ANSIBLE_PLAYBOOKS_PATH.
            It is not validated here; repair_by_uri checks it against the
            whitelist before calling this method.

    Returns:
        HostRepairResult with layer="ansible". A timeout or any other
        exception from the SSH call is reported as success=False with the
        reason in `error`. Otherwise success is decided from the playbook
        output (see the comment on the recap check below).
    """
    playbook_path = f"{ANSIBLE_PLAYBOOKS_PATH}/{playbook_name}"
    try:
        output = await self._ssh_execute(
            host=ANSIBLE_CONTROL_HOST,
            user=ANSIBLE_CONTROL_USER,
            key_path="/etc/repair-ssh/id_ed25519",
            command=f"ansible-playbook {playbook_path}",
        )
    except asyncio.TimeoutError:
        return HostRepairResult(
            success=False, layer="ansible", component=playbook_name,
            error=f"Ansible SSH timeout after {SSH_TIMEOUT}s",
        )
    except Exception as e:
        return HostRepairResult(
            success=False, layer="ansible", component=playbook_name,
            error=str(e),
        )

    # A PLAY RECAP line looks like
    #   "host : ok=2 changed=0 unreachable=0 failed=1 ..."
    # so "ok=" appears even when the run failed. When recap lines are present,
    # require unreachable=0 and failed=0 on every host.
    recap_counts = re.findall(
        r"\bok=\d+\s+changed=\d+\s+unreachable=(\d+)\s+failed=(\d+)", output
    )
    if recap_counts:
        success = all(
            int(unreachable) == 0 and int(failed) == 0
            for unreachable, failed in recap_counts
        )
    else:
        # No recognizable recap: keep the previous marker-based heuristic.
        success = "REPAIR_OK" in output or "ok=" in output

    return HostRepairResult(
        success=success,
        layer="ansible",
        component=playbook_name,
        output=output,
        error="" if success else output,
    )
async def _execute_ssh_direct(self, host_user: str, command: str) -> HostRepairResult:
    """
    ssh:// — run a command directly over SSH (the caller must have been
    given an explicit approved=True).

    2026-04-06 Claude Code: Sprint 3 T3

    Args:
        host_user: Destination in "user@host" form, e.g. "wooo@192.168.0.110".
        command: Command string passed to _ssh_execute unchanged.

    Returns:
        HostRepairResult with layer="ssh". A malformed host_user, a timeout,
        or any other exception from the SSH call yields success=False with
        the reason in `error`. Otherwise the run counts as successful unless
        the output starts with "ERROR".
    """
    user, separator, host = host_user.partition("@")
    # Both parts must be non-empty. A user starting with "-" is refused
    # because "user@host" is passed to ssh as a single argument, and ssh
    # would parse a leading dash as a command-line option.
    if not separator or not user or not host or user.startswith("-"):
        return HostRepairResult(
            success=False, layer="ssh", component=command,
            error=f"Invalid host_user format '{host_user}' (expected user@host)",
        )

    try:
        output = await self._ssh_execute(
            host=host,
            user=user,
            key_path="/etc/repair-ssh/id_ed25519",
            command=command,
        )
    except asyncio.TimeoutError:
        return HostRepairResult(
            success=False, layer="ssh", component=command,
            error=f"SSH timeout after {SSH_TIMEOUT}s",
        )
    except Exception as e:
        return HostRepairResult(success=False, layer="ssh", component=command, error=str(e))

    success = not output.startswith("ERROR")
    return HostRepairResult(
        success=success,
        layer="ssh",
        component=command,
        output=output,
        error="" if success else output,
    )
async def _ssh_execute(self, host: str, user: str, key_path: str, command: str) -> str:
"""執行 SSH 命令,回傳 stdout。key_path 由呼叫方傳入,不反查。"""
import time
@@ -172,7 +312,8 @@ class HostRepairAgent:
asyncio.create_subprocess_exec(
"ssh",
"-i", key_path,
"-o", "StrictHostKeyChecking=no",
"-o", "StrictHostKeyChecking=yes",
"-o", f"UserKnownHostsFile={KNOWN_HOSTS_PATH}",
"-o", "BatchMode=yes",
"-o", f"ConnectTimeout={SSH_TIMEOUT}",
f"{user}@{host}",

View File

@@ -209,3 +209,65 @@ class TestValidateShellSafety:
from src.services.host_repair_agent import validate_shell_safety
with pytest.raises(ValueError, match="too long"):
validate_shell_safety("a" * 513)
import os
from unittest.mock import patch, AsyncMock
class TestAnsibleWhitelist:
    """validate_ansible_playbook checked against an environment-provided whitelist."""

    def test_allowed_playbook_passes(self):
        """A whitelisted playbook name is accepted without raising."""
        from src.services.host_repair_agent import validate_ansible_playbook

        env = {"ANSIBLE_PLAYBOOK_WHITELIST": "vacuum_postgres.yml,clear_redis_cache.yml"}
        with patch.dict(os.environ, env):
            validate_ansible_playbook("vacuum_postgres.yml")  # must not raise

    def test_disallowed_playbook_raises(self):
        """A name missing from the whitelist raises ValueError."""
        from src.services.host_repair_agent import validate_ansible_playbook

        env = {"ANSIBLE_PLAYBOOK_WHITELIST": "vacuum_postgres.yml"}
        with patch.dict(os.environ, env), pytest.raises(ValueError, match="not in allowed whitelist"):
            validate_ansible_playbook("evil_script.sh")

    def test_path_traversal_blocked(self):
        """A path-traversal style name raises ValueError."""
        from src.services.host_repair_agent import validate_ansible_playbook

        env = {"ANSIBLE_PLAYBOOK_WHITELIST": "vacuum_postgres.yml"}
        with patch.dict(os.environ, env), pytest.raises(ValueError, match="not in allowed whitelist"):
            validate_ansible_playbook("../../../etc/passwd")
class TestRepairByUri:
    """Scheme routing performed by HostRepairAgent.repair_by_uri."""

    @pytest.mark.asyncio
    async def test_openclaw_scheme_calls_repair(self):
        """openclaw:// is handed to _execute_openclaw with (layer, component)."""
        from src.services.host_repair_agent import HostRepairAgent, HostRepairResult

        agent = HostRepairAgent()
        canned = HostRepairResult(
            success=True, layer="docker-110", component="sentry", output="REPAIR_OK:sentry"
        )
        with patch.object(agent, "_execute_openclaw", new_callable=AsyncMock) as openclaw_mock:
            openclaw_mock.return_value = canned
            outcome = await agent.repair_by_uri("openclaw://docker-110/sentry")

        assert outcome.success is True
        openclaw_mock.assert_awaited_once_with("docker-110", "sentry")

    @pytest.mark.asyncio
    async def test_ansible_scheme_calls_ansible(self):
        """ansible:// with a whitelisted playbook is handed to _execute_ansible."""
        from src.services.host_repair_agent import HostRepairAgent, HostRepairResult

        agent = HostRepairAgent()
        canned = HostRepairResult(
            success=True, layer="ansible", component="vacuum_postgres.yml", output="REPAIR_OK:ansible"
        )
        with patch.object(agent, "_execute_ansible", new_callable=AsyncMock) as ansible_mock:
            with patch.dict(os.environ, {"ANSIBLE_PLAYBOOK_WHITELIST": "vacuum_postgres.yml"}):
                ansible_mock.return_value = canned
                outcome = await agent.repair_by_uri("ansible://192.168.0.188/vacuum_postgres.yml")

        assert outcome.success is True
        ansible_mock.assert_awaited_once_with("192.168.0.188", "vacuum_postgres.yml")

    @pytest.mark.asyncio
    async def test_ssh_scheme_blocked_without_approval_flag(self):
        """ssh:// without approved=True comes back as a failure result."""
        from src.services.host_repair_agent import HostRepairAgent

        outcome = await HostRepairAgent().repair_by_uri("ssh://wooo@192.168.0.110/docker ps")

        assert outcome.success is False
        assert "requires_approval" in outcome.error

    @pytest.mark.asyncio
    async def test_invalid_uri_returns_failure(self):
        """An unparseable command yields a failure result instead of raising."""
        from src.services.host_repair_agent import HostRepairAgent

        outcome = await HostRepairAgent().repair_by_uri("bad-format")

        assert outcome.success is False
        assert "Unsupported scheme" in outcome.error