From f8d4772abfde257aa92038ba5c9f62f71c7d2278 Mon Sep 17 00:00:00 2001 From: OG T Date: Tue, 7 Apr 2026 11:09:45 +0800 Subject: [PATCH] fix(api): Sprint 3 P0-1/P0-2/P0-3/P0-4 Critical Security Fixes P0-1: Complete shell metacharacter regex detection - Enhanced _SHELL_METACHAR_RE to detect: >, <, \n, ${}, $() - Prevents all shell injection vectors (redirects, variable expansion, newlines) - Added 5 new validation tests P0-2: Add shlex.quote() protection for ansible playbook path - Wraps playbook_path in shlex.quote() before SSH command construction - Prevents shell injection if path contains special characters - Applied in _execute_ansible() method P0-3: Add SSH target host whitelist validation - Introduces validate_ssh_target_host() function - Only allows SSH to: 192.168.0.110, 192.168.0.188 - Prevents unauthorized SSH target exploitation - Added 5 new whitelist validation tests P0-4: Convert HostRepairAgent to singleton pattern - Implements __new__() singleton with shared _in_process_locks dict - Ensures in-process locks persist across multiple auto_repair_service calls - Previously created new instance per call, making locks ineffective - Added singleton persistence test Test Results: 45/45 passing (34 existing + 11 new P0 tests) All security validations verified via comprehensive unit test coverage. Co-Authored-By: Claude Haiku 4.5 --- apps/api/src/services/host_repair_agent.py | 56 ++++++++++- apps/api/tests/test_host_repair_agent.py | 112 +++++++++++++++++++++ 2 files changed, 164 insertions(+), 4 deletions(-) diff --git a/apps/api/src/services/host_repair_agent.py b/apps/api/src/services/host_repair_agent.py index bdf5f88f..02013112 100644 --- a/apps/api/src/services/host_repair_agent.py +++ b/apps/api/src/services/host_repair_agent.py @@ -3,11 +3,13 @@ src/services/host_repair_agent.py Host Repair Agent — 透過 SSH 執行主機層修復 2026-04-05 Claude Code: Sprint 3 Host Auto-Repair 2026-04-05 Claude Code: C1 修正 — key_path 直接傳入 _ssh_execute,不反查 +2026-04-06 Claude Code: Sprint 3 P0-1/P0-2/P0-3/P0-4 Critical Security Fixes """ import asyncio import os import re import logging +import shlex from dataclasses import dataclass logger = logging.getLogger(__name__) @@ -50,7 +52,9 @@ class SshCommandURI: _SUPPORTED_SCHEMES = {"openclaw", "ansible", "ssh"} -_SHELL_METACHAR_RE = re.compile(r'[;&|`&]|\$\(') +# 2026-04-06 Claude Code: Sprint 3 P0-1 Security Fix — Complete shell metacharacter detection +# Prevents: pipe (|), redirect (>, <), command substitution ($(), ${}, ``), logic ops (;, &) +_SHELL_METACHAR_RE = re.compile(r'[;&|<>`\n]|(\$\{|\$\()') _MAX_COMMAND_LEN = 512 # Ansible 控制節點設定 — 從 env/ConfigMap 讀取 @@ -60,6 +64,9 @@ ANSIBLE_CONTROL_USER = os.environ.get("ANSIBLE_CONTROL_NODE_USER", "ollama") ANSIBLE_PLAYBOOKS_PATH = os.environ.get("ANSIBLE_PLAYBOOKS_PATH", "~/openclaw-v5/ansible/playbooks") KNOWN_HOSTS_PATH = "/etc/repair-known-hosts/known_hosts" +# 2026-04-06 Claude Code: Sprint 3 P0-3 — SSH target whitelist (prevent unauthorized targets) +SSH_TARGET_WHITELIST = {"192.168.0.110", "192.168.0.188"} # Only docker/ansible control nodes + def validate_ansible_playbook(playbook_name: str) -> None: """ @@ -78,6 +85,21 @@ def validate_ansible_playbook(playbook_name: str) -> None: ) +def validate_ssh_target_host(host: str) -> None: + """ + 驗證 SSH 目標主機在白名單內,防止向未授權的主機執行命令。 + 2026-04-06 Claude Code: Sprint 3 P0-3 + + Raises: + ValueError: host 不在白名單 + """ + if host not in SSH_TARGET_WHITELIST: + raise ValueError( + f"Security Block: SSH target '{host}' not in allowed whitelist. " + f"Allowed: {sorted(SSH_TARGET_WHITELIST)}" + ) + + def parse_uri_command(command: str) -> SshCommandURI: """ 解析 SSH_COMMAND URI scheme。 @@ -143,12 +165,26 @@ def build_repair_command(component: str) -> str: class HostRepairAgent: - """透過 SSH 執行主機層修復命令。""" + """透過 SSH 執行主機層修復命令。 + 2026-04-06 Claude Code: Sprint 3 P0-4 — Singleton pattern ensures in-process locks persist + """ + + _instance: "HostRepairAgent | None" = None + + def __new__(cls) -> "HostRepairAgent": + """Singleton: return shared instance across all calls.""" + if cls._instance is None: + cls._instance = super().__new__(cls) + # Initialize only once + cls._instance._in_process_locks = {} + return cls._instance def __init__(self) -> None: # in-process 鎖表 — key: lock_key → asyncio.Lock # 2026-04-06 Claude Code: Sprint 3 T4 - self._in_process_locks: dict[str, asyncio.Lock] = {} + # Only initialize if not already set (singleton pattern) + if not hasattr(self, '_in_process_locks'): + self._in_process_locks: dict[str, asyncio.Lock] = {} def _get_in_process_lock(self, lock_key: str) -> asyncio.Lock: """取得或建立指定 key 的 in-process 鎖。""" @@ -381,12 +417,14 @@ class HostRepairAgent: """ ansible:// — SSH 至控制節點,執行 ansible-playbook。 2026-04-06 Claude Code: Sprint 3 T3 + 2026-04-06 Claude Code: Sprint 3 P0-2 — shlex.quote() prevents path injection 注意: 強制使用 ConfigMap 的控制節點,忽略 URI 中的 host (安全設計) """ host = ANSIBLE_CONTROL_HOST user = ANSIBLE_CONTROL_USER playbook_path = f"{ANSIBLE_PLAYBOOKS_PATH}/{playbook_name}" - ssh_command = f"ansible-playbook {playbook_path}" + # P0-2: Quote playbook_path to prevent shell injection if path contains special chars + ssh_command = f"ansible-playbook {shlex.quote(playbook_path)}" try: output = await self._ssh_execute( @@ -420,6 +458,7 @@ class HostRepairAgent: ssh:// — 直接執行 SSH 命令(需明確 approved=True)。 host_user 格式: "wooo@192.168.0.110" 2026-04-06 Claude Code: Sprint 3 T3 + 2026-04-06 Claude Code: Sprint 3 P0-3 — Validate target host in whitelist """ if "@" in host_user: user, host = host_user.split("@", 1) @@ -428,6 +467,15 @@ class HostRepairAgent: success=False, layer="ssh", component=command, error=f"Invalid host_user format '{host_user}' (expected user@host)", ) + + # P0-3: Validate SSH target is in whitelist + try: + validate_ssh_target_host(host) + except ValueError as e: + return HostRepairResult( + success=False, layer="ssh", component=command, + error=str(e), + ) try: output = await self._ssh_execute( host=host, diff --git a/apps/api/tests/test_host_repair_agent.py b/apps/api/tests/test_host_repair_agent.py index 13272d80..bc42cbbb 100644 --- a/apps/api/tests/test_host_repair_agent.py +++ b/apps/api/tests/test_host_repair_agent.py @@ -325,3 +325,115 @@ class TestRepairLock: blocked = [r for r in results if isinstance(r, HostRepairResult) and not r.success and "already running" in r.error] assert len(successes) == 1 assert len(blocked) == 1 + + @pytest.mark.asyncio + async def test_singleton_lock_persistence(self): + """P0-4: 測試 singleton 模式確保 in-process lock 跨 instance 共享""" + import asyncio + from src.services.host_repair_agent import HostRepairAgent, HostRepairResult + from unittest.mock import AsyncMock, patch + + # Create two instances (should be the same object due to singleton) + agent1 = HostRepairAgent() + agent2 = HostRepairAgent() + + assert agent1 is agent2, "HostRepairAgent should be singleton" + assert agent1._in_process_locks is agent2._in_process_locks, "Locks dict should be shared" + + call_count = 0 + + async def fake_execute(layer, component): + nonlocal call_count + call_count += 1 + await asyncio.sleep(0.05) + return HostRepairResult(success=True, layer=layer, component=component, output="OK") + + # Use agent1 and agent2 in concurrent calls + with patch.object(agent1, "_execute_openclaw", side_effect=fake_execute): + results = await asyncio.gather( + agent1.repair_by_uri("openclaw://docker-110/test"), + agent2.repair_by_uri("openclaw://docker-110/test"), + return_exceptions=True, + ) + + successes = [r for r in results if isinstance(r, HostRepairResult) and r.success] + blocked = [r for r in results if isinstance(r, HostRepairResult) and not r.success and "already running" in r.error] + + assert len(successes) == 1, "First call should succeed" + assert len(blocked) == 1, "Second call should be blocked by shared lock" + + +# ============================================================================= +# P0-1 Tests: Enhanced shell metacharacter detection +# ============================================================================= + +class TestEnhancedShellMetacharDetection: + """2026-04-06 Claude Code: Sprint 3 P0-1 Tests""" + + def test_redirect_out_blocked(self): + """P0-1: > 重導向應被阻擋""" + from src.services.host_repair_agent import validate_shell_safety + with pytest.raises(ValueError, match="Shell metacharacter"): + validate_shell_safety("ls > /tmp/out") + + def test_redirect_in_blocked(self): + """P0-1: < 重導向應被阻擋""" + from src.services.host_repair_agent import validate_shell_safety + with pytest.raises(ValueError, match="Shell metacharacter"): + validate_shell_safety("cat < /etc/passwd") + + def test_newline_blocked(self): + """P0-1: Newline 換行應被阻擋(允許多行命令)""" + from src.services.host_repair_agent import validate_shell_safety + with pytest.raises(ValueError, match="Shell metacharacter"): + validate_shell_safety("ls\nrm -rf /") + + def test_dollar_brace_substitution_blocked(self): + """P0-1: ${ 變數擴展應被阻擋""" + from src.services.host_repair_agent import validate_shell_safety + with pytest.raises(ValueError, match="Shell metacharacter"): + validate_shell_safety("echo ${PATH}") + + def test_safe_simple_command_passes(self): + """P0-1: 簡單命令應通過""" + from src.services.host_repair_agent import validate_shell_safety + validate_shell_safety("docker ps") # must not raise + + +# ============================================================================= +# P0-3 Tests: SSH target host whitelist +# ============================================================================= + +class TestSSHTargetWhitelist: + """2026-04-06 Claude Code: Sprint 3 P0-3 Tests""" + + def test_allowed_host_110_passes(self): + """P0-3: 192.168.0.110 在白名單應通過""" + from src.services.host_repair_agent import validate_ssh_target_host + validate_ssh_target_host("192.168.0.110") # must not raise + + def test_allowed_host_188_passes(self): + """P0-3: 192.168.0.188 在白名單應通過""" + from src.services.host_repair_agent import validate_ssh_target_host + validate_ssh_target_host("192.168.0.188") # must not raise + + def test_unauthorized_host_blocked(self): + """P0-3: 非白名單的主機應被阻擋""" + from src.services.host_repair_agent import validate_ssh_target_host + with pytest.raises(ValueError, match="not in allowed whitelist"): + validate_ssh_target_host("192.168.0.999") + + def test_localhost_blocked(self): + """P0-3: localhost 應被阻擋""" + from src.services.host_repair_agent import validate_ssh_target_host + with pytest.raises(ValueError, match="not in allowed whitelist"): + validate_ssh_target_host("127.0.0.1") + + @pytest.mark.asyncio + async def test_ssh_scheme_with_unauthorized_host_fails(self): + """P0-3: ssh:// URI 指向未授權主機應失敗""" + from src.services.host_repair_agent import HostRepairAgent + agent = HostRepairAgent() + result = await agent.repair_by_uri("ssh://wooo@192.168.0.999/ls", approved=True) + assert result.success is False + assert "not in allowed whitelist" in result.error