diff --git a/apps/api/src/services/host_repair_agent.py b/apps/api/src/services/host_repair_agent.py index 1b307b09..bb5f1c06 100644 --- a/apps/api/src/services/host_repair_agent.py +++ b/apps/api/src/services/host_repair_agent.py @@ -56,6 +56,8 @@ _SUPPORTED_SCHEMES = {"openclaw", "ansible", "ssh"} # Prevents: pipe (|), redirect (>, <), command substitution ($(), ${}, ``), logic ops (;, &) _SHELL_METACHAR_RE = re.compile(r'[;&|<>`\n]|(\$\{|\$\()') _MAX_COMMAND_LEN = 512 +# 2026-04-07 Claude Code: S2 — SSH username 防禦性驗證 (小寫英數+底線+連字號) +_SSH_USER_RE = re.compile(r'^[a-z][a-z0-9_-]{0,31}$') # Ansible 控制節點設定 — 從 env/ConfigMap 讀取 # 2026-04-06 Claude Code: Sprint 3 T3 @@ -100,6 +102,21 @@ def validate_ssh_target_host(host: str) -> None: ) +def validate_ssh_user(user: str) -> None: + """ + 驗證 SSH username 僅含安全字元,防止 user@host 拼接產生非預期行為。 + 2026-04-07 Claude Code: Re-Review S2 — 防禦性工程 + + Raises: + ValueError: username 格式不合法 + """ + if not _SSH_USER_RE.match(user): + raise ValueError( + f"Security Block: SSH user '{user}' contains invalid characters. " + f"Expected: lowercase alphanumeric, underscore, hyphen (1-32 chars)" + ) + + def parse_uri_command(command: str) -> SshCommandURI: """ 解析 SSH_COMMAND URI scheme。 @@ -179,6 +196,13 @@ class HostRepairAgent: cls._instance._in_process_locks = {} return cls._instance + @classmethod + def _reset_for_test(cls) -> None: + """測試專用:重置 singleton 狀態,避免跨測試污染。 + 2026-04-07 Claude Code: Re-Review S3 + """ + cls._instance = None + def __init__(self) -> None: # in-process 鎖表 — key: lock_key → asyncio.Lock # 2026-04-06 Claude Code: Sprint 3 T4 @@ -271,12 +295,58 @@ class HostRepairAgent: logger.error("ssh_command_audit_failed", uri=uri, error=str(e)) # Do not re-raise — audit failure must not affect repair result + async def _execute_and_observe( + self, + command: str, + uri: SshCommandURI, + execute_fn, + ) -> HostRepairResult: + """ + 執行修復 + AuditLog + Langfuse trace 的公用邏輯。 + 2026-04-07 Claude Code: Re-Review S1 — 消除 repair_by_uri 中 3 處重複 + """ + import time as _time + _start = _time.monotonic() + result = await execute_fn() + duration_ms = int((_time.monotonic() - _start) * 1000) + + # AuditLog (fire and forget) + try: + await self._write_audit_log( + uri=command, + success=result.success, + output=result.output, + error=result.error or None, + duration_ms=duration_ms, + ) + except Exception: + pass + + # Langfuse trace (fire and forget) + try: + from src.services.langfuse_client import get_langfuse + lf = get_langfuse() + if lf: + trace = lf.trace(name="ssh_command_repair") + trace.span( + name=f"{uri.scheme}_execute", + input={"uri": command}, + output={"success": result.success, "output": result.output[:500] if result.output else ""}, + metadata={"duration_ms": duration_ms, "scheme": uri.scheme}, + ) + lf.flush() + except Exception as lf_err: + logger.debug("langfuse_trace_skipped", error=str(lf_err)) + + return result + async def repair_by_uri(self, command: str, approved: bool = False) -> HostRepairResult: """ 根據 URI scheme 路由至對應的執行路徑。 2026-04-06 Claude Code: Sprint 3 T3 2026-04-06 Claude Code: Sprint 3 T4 — Redis 冪等鎖防止重複修復 2026-04-06 Claude Code: Sprint 3 T5 — AuditLog + Langfuse Trace + 2026-04-07 Claude Code: Re-Review S1 — 抽取 _execute_and_observe 消除重複 """ try: uri = parse_uri_command(command) @@ -334,30 +404,15 @@ class HostRepairAgent: except Exception: redis_lock = None # Redis 未連線,fail open + # Redis 無法取得 → fail open,直接執行 if redis_lock is None: - import time as _time - _start = _time.monotonic() - result = await _execute() - duration_ms = int((_time.monotonic() - _start) * 1000) - try: - await self._write_audit_log(uri=command, success=result.success, output=result.output, error=result.error or None, duration_ms=duration_ms) - except Exception: - pass - return result + return await self._execute_and_observe(command, uri, _execute) try: acquired = await redis_lock.acquire() except Exception: # Redis 不可用,fail open - import time as _time - _start = _time.monotonic() - result = await _execute() - duration_ms = int((_time.monotonic() - _start) * 1000) - try: - await self._write_audit_log(uri=command, success=result.success, output=result.output, error=result.error or None, duration_ms=duration_ms) - except Exception: - pass - return result + return await self._execute_and_observe(command, uri, _execute) if not acquired: # Redis 鎖已被其他 Pod 持有 @@ -369,44 +424,13 @@ class HostRepairAgent: ) try: - import time as _time - _start = _time.monotonic() - result = await _execute() - duration_ms = int((_time.monotonic() - _start) * 1000) + result = await self._execute_and_observe(command, uri, _execute) finally: try: await redis_lock.release() except Exception: pass - # AuditLog (fire and forget) - try: - await self._write_audit_log( - uri=command, - success=result.success, - output=result.output, - error=result.error or None, - duration_ms=duration_ms, - ) - except Exception: - pass - - # Langfuse trace (fire and forget) - try: - from src.services.langfuse_client import get_langfuse - lf = get_langfuse() - if lf: - trace = lf.trace(name="ssh_command_repair") - trace.span( - name=f"{uri.scheme}_execute", - input={"uri": command}, - output={"success": result.success, "output": result.output[:500] if result.output else ""}, - metadata={"duration_ms": duration_ms, "scheme": uri.scheme}, - ) - lf.flush() - except Exception as lf_err: - logger.debug("langfuse_trace_skipped", error=str(lf_err)) - return result async def _execute_openclaw(self, layer: str, component: str) -> HostRepairResult: @@ -509,7 +533,10 @@ class HostRepairAgent: ) async def _ssh_execute(self, host: str, user: str, key_path: str, command: str) -> str: - """執行 SSH 命令,回傳 stdout。key_path 由呼叫方傳入,不反查。""" + """執行 SSH 命令,回傳 stdout。key_path 由呼叫方傳入,不反查。 + 2026-04-07 Claude Code: Re-Review S2 — 加 user 防禦性驗證 + """ + validate_ssh_user(user) import time deadline = time.monotonic() + SSH_TIMEOUT proc = await asyncio.wait_for( diff --git a/apps/api/tests/test_host_repair_agent.py b/apps/api/tests/test_host_repair_agent.py index bc42cbbb..74548f2d 100644 --- a/apps/api/tests/test_host_repair_agent.py +++ b/apps/api/tests/test_host_repair_agent.py @@ -437,3 +437,74 @@ class TestSSHTargetWhitelist: result = await agent.repair_by_uri("ssh://wooo@192.168.0.999/ls", approved=True) assert result.success is False assert "not in allowed whitelist" in result.error + + +# ============================================================================= +# S2 Tests: SSH username validation +# 2026-04-07 Claude Code: Re-Review S2 +# ============================================================================= + +class TestSSHUserValidation: + """2026-04-07 Claude Code: Re-Review S2 Tests""" + + def test_valid_user_wooo(self): + from src.services.host_repair_agent import validate_ssh_user + validate_ssh_user("wooo") # must not raise + + def test_valid_user_ollama(self): + from src.services.host_repair_agent import validate_ssh_user + validate_ssh_user("ollama") # must not raise + + def test_valid_user_with_underscore(self): + from src.services.host_repair_agent import validate_ssh_user + validate_ssh_user("deploy_user") # must not raise + + def test_invalid_user_with_space(self): + from src.services.host_repair_agent import validate_ssh_user + with pytest.raises(ValueError, match="invalid characters"): + validate_ssh_user("bad user") + + def test_invalid_user_with_at(self): + from src.services.host_repair_agent import validate_ssh_user + with pytest.raises(ValueError, match="invalid characters"): + validate_ssh_user("user@host") + + def test_invalid_user_root(self): + """root 是合法格式但這裡不做權限限制,只驗格式""" + from src.services.host_repair_agent import validate_ssh_user + validate_ssh_user("root") # format-valid, authorization is separate + + def test_invalid_user_empty(self): + from src.services.host_repair_agent import validate_ssh_user + with pytest.raises(ValueError, match="invalid characters"): + validate_ssh_user("") + + def test_invalid_user_starts_with_digit(self): + from src.services.host_repair_agent import validate_ssh_user + with pytest.raises(ValueError, match="invalid characters"): + validate_ssh_user("1baduser") + + +# ============================================================================= +# S3 Tests: Singleton reset for test isolation +# 2026-04-07 Claude Code: Re-Review S3 +# ============================================================================= + +class TestSingletonReset: + """2026-04-07 Claude Code: Re-Review S3 Tests""" + + def test_reset_creates_new_instance(self): + from src.services.host_repair_agent import HostRepairAgent + agent1 = HostRepairAgent() + HostRepairAgent._reset_for_test() + agent2 = HostRepairAgent() + assert agent1 is not agent2, "_reset_for_test should create a fresh instance" + + def test_reset_clears_locks(self): + from src.services.host_repair_agent import HostRepairAgent + agent = HostRepairAgent() + agent._get_in_process_lock("test_key") + assert len(agent._in_process_locks) > 0 + HostRepairAgent._reset_for_test() + new_agent = HostRepairAgent() + assert len(new_agent._in_process_locks) == 0, "Reset should give fresh lock dict"