refactor(api): Re-Review S1/S2/S3 改善 — 消除重複+防禦性驗證+測試隔離

S1: 抽取 _execute_and_observe() 公用方法
  - 消除 repair_by_uri 中 3 處重複的 execute+audit+langfuse 邏輯
  - 統一 AuditLog + Langfuse trace 寫入路徑

S2: SSH username 防禦性驗證
  - 新增 validate_ssh_user() + _SSH_USER_RE 正則
  - 在 _ssh_execute() 入口驗證 user 參數
  - 防止 user@host 拼接產生非預期行為
  - 新增 8 個 username 驗證測試

S3: Singleton 測試重置
  - 新增 _reset_for_test() classmethod
  - 避免跨測試狀態污染
  - 新增 2 個 singleton reset 測試

測試: 55/55 全數通過 (原 45 + 新 10)
首席架構師 Re-Review: 91/100  通過,3 個 Suggestion 全數實裝

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-07 11:17:40 +08:00
parent 78a8d3dfa5
commit 2fe8062fb8
2 changed files with 149 additions and 51 deletions

View File

@@ -56,6 +56,8 @@ _SUPPORTED_SCHEMES = {"openclaw", "ansible", "ssh"}
# Prevents: pipe (|), redirect (>, <), command substitution ($(), ${}, ``), logic ops (;, &)
_SHELL_METACHAR_RE = re.compile(r'[;&|<>`\n]|(\$\{|\$\()')
_MAX_COMMAND_LEN = 512
# 2026-04-07 Claude Code: S2 — SSH username 防禦性驗證 (小寫英數+底線+連字號)
_SSH_USER_RE = re.compile(r'^[a-z][a-z0-9_-]{0,31}$')
# Ansible 控制節點設定 — 從 env/ConfigMap 讀取
# 2026-04-06 Claude Code: Sprint 3 T3
@@ -100,6 +102,21 @@ def validate_ssh_target_host(host: str) -> None:
)
def validate_ssh_user(user: str) -> None:
"""
驗證 SSH username 僅含安全字元,防止 user@host 拼接產生非預期行為。
2026-04-07 Claude Code: Re-Review S2 — 防禦性工程
Raises:
ValueError: username 格式不合法
"""
if not _SSH_USER_RE.match(user):
raise ValueError(
f"Security Block: SSH user '{user}' contains invalid characters. "
f"Expected: lowercase alphanumeric, underscore, hyphen (1-32 chars)"
)
def parse_uri_command(command: str) -> SshCommandURI:
"""
解析 SSH_COMMAND URI scheme。
@@ -179,6 +196,13 @@ class HostRepairAgent:
cls._instance._in_process_locks = {}
return cls._instance
@classmethod
def _reset_for_test(cls) -> None:
"""測試專用:重置 singleton 狀態,避免跨測試污染。
2026-04-07 Claude Code: Re-Review S3
"""
cls._instance = None
def __init__(self) -> None:
# in-process 鎖表 — key: lock_key → asyncio.Lock
# 2026-04-06 Claude Code: Sprint 3 T4
@@ -271,12 +295,58 @@ class HostRepairAgent:
logger.error("ssh_command_audit_failed", uri=uri, error=str(e))
# Do not re-raise — audit failure must not affect repair result
async def _execute_and_observe(
self,
command: str,
uri: SshCommandURI,
execute_fn,
) -> HostRepairResult:
"""
執行修復 + AuditLog + Langfuse trace 的公用邏輯。
2026-04-07 Claude Code: Re-Review S1 — 消除 repair_by_uri 中 3 處重複
"""
import time as _time
_start = _time.monotonic()
result = await execute_fn()
duration_ms = int((_time.monotonic() - _start) * 1000)
# AuditLog (fire and forget)
try:
await self._write_audit_log(
uri=command,
success=result.success,
output=result.output,
error=result.error or None,
duration_ms=duration_ms,
)
except Exception:
pass
# Langfuse trace (fire and forget)
try:
from src.services.langfuse_client import get_langfuse
lf = get_langfuse()
if lf:
trace = lf.trace(name="ssh_command_repair")
trace.span(
name=f"{uri.scheme}_execute",
input={"uri": command},
output={"success": result.success, "output": result.output[:500] if result.output else ""},
metadata={"duration_ms": duration_ms, "scheme": uri.scheme},
)
lf.flush()
except Exception as lf_err:
logger.debug("langfuse_trace_skipped", error=str(lf_err))
return result
async def repair_by_uri(self, command: str, approved: bool = False) -> HostRepairResult:
"""
根據 URI scheme 路由至對應的執行路徑。
2026-04-06 Claude Code: Sprint 3 T3
2026-04-06 Claude Code: Sprint 3 T4 — Redis 冪等鎖防止重複修復
2026-04-06 Claude Code: Sprint 3 T5 — AuditLog + Langfuse Trace
2026-04-07 Claude Code: Re-Review S1 — 抽取 _execute_and_observe 消除重複
"""
try:
uri = parse_uri_command(command)
@@ -334,30 +404,15 @@ class HostRepairAgent:
except Exception:
redis_lock = None # Redis 未連線fail open
# Redis 無法取得 → fail open直接執行
if redis_lock is None:
import time as _time
_start = _time.monotonic()
result = await _execute()
duration_ms = int((_time.monotonic() - _start) * 1000)
try:
await self._write_audit_log(uri=command, success=result.success, output=result.output, error=result.error or None, duration_ms=duration_ms)
except Exception:
pass
return result
return await self._execute_and_observe(command, uri, _execute)
try:
acquired = await redis_lock.acquire()
except Exception:
# Redis 不可用fail open
import time as _time
_start = _time.monotonic()
result = await _execute()
duration_ms = int((_time.monotonic() - _start) * 1000)
try:
await self._write_audit_log(uri=command, success=result.success, output=result.output, error=result.error or None, duration_ms=duration_ms)
except Exception:
pass
return result
return await self._execute_and_observe(command, uri, _execute)
if not acquired:
# Redis 鎖已被其他 Pod 持有
@@ -369,44 +424,13 @@ class HostRepairAgent:
)
try:
import time as _time
_start = _time.monotonic()
result = await _execute()
duration_ms = int((_time.monotonic() - _start) * 1000)
result = await self._execute_and_observe(command, uri, _execute)
finally:
try:
await redis_lock.release()
except Exception:
pass
# AuditLog (fire and forget)
try:
await self._write_audit_log(
uri=command,
success=result.success,
output=result.output,
error=result.error or None,
duration_ms=duration_ms,
)
except Exception:
pass
# Langfuse trace (fire and forget)
try:
from src.services.langfuse_client import get_langfuse
lf = get_langfuse()
if lf:
trace = lf.trace(name="ssh_command_repair")
trace.span(
name=f"{uri.scheme}_execute",
input={"uri": command},
output={"success": result.success, "output": result.output[:500] if result.output else ""},
metadata={"duration_ms": duration_ms, "scheme": uri.scheme},
)
lf.flush()
except Exception as lf_err:
logger.debug("langfuse_trace_skipped", error=str(lf_err))
return result
async def _execute_openclaw(self, layer: str, component: str) -> HostRepairResult:
@@ -509,7 +533,10 @@ class HostRepairAgent:
)
async def _ssh_execute(self, host: str, user: str, key_path: str, command: str) -> str:
"""執行 SSH 命令,回傳 stdout。key_path 由呼叫方傳入,不反查。"""
"""執行 SSH 命令,回傳 stdout。key_path 由呼叫方傳入,不反查。
2026-04-07 Claude Code: Re-Review S2 — 加 user 防禦性驗證
"""
validate_ssh_user(user)
import time
deadline = time.monotonic() + SSH_TIMEOUT
proc = await asyncio.wait_for(

View File

@@ -437,3 +437,74 @@ class TestSSHTargetWhitelist:
result = await agent.repair_by_uri("ssh://wooo@192.168.0.999/ls", approved=True)
assert result.success is False
assert "not in allowed whitelist" in result.error
# =============================================================================
# S2 Tests: SSH username validation
# 2026-04-07 Claude Code: Re-Review S2
# =============================================================================
class TestSSHUserValidation:
"""2026-04-07 Claude Code: Re-Review S2 Tests"""
def test_valid_user_wooo(self):
from src.services.host_repair_agent import validate_ssh_user
validate_ssh_user("wooo") # must not raise
def test_valid_user_ollama(self):
from src.services.host_repair_agent import validate_ssh_user
validate_ssh_user("ollama") # must not raise
def test_valid_user_with_underscore(self):
from src.services.host_repair_agent import validate_ssh_user
validate_ssh_user("deploy_user") # must not raise
def test_invalid_user_with_space(self):
from src.services.host_repair_agent import validate_ssh_user
with pytest.raises(ValueError, match="invalid characters"):
validate_ssh_user("bad user")
def test_invalid_user_with_at(self):
from src.services.host_repair_agent import validate_ssh_user
with pytest.raises(ValueError, match="invalid characters"):
validate_ssh_user("user@host")
def test_invalid_user_root(self):
"""root 是合法格式但這裡不做權限限制,只驗格式"""
from src.services.host_repair_agent import validate_ssh_user
validate_ssh_user("root") # format-valid, authorization is separate
def test_invalid_user_empty(self):
from src.services.host_repair_agent import validate_ssh_user
with pytest.raises(ValueError, match="invalid characters"):
validate_ssh_user("")
def test_invalid_user_starts_with_digit(self):
from src.services.host_repair_agent import validate_ssh_user
with pytest.raises(ValueError, match="invalid characters"):
validate_ssh_user("1baduser")
# =============================================================================
# S3 Tests: Singleton reset for test isolation
# 2026-04-07 Claude Code: Re-Review S3
# =============================================================================
class TestSingletonReset:
"""2026-04-07 Claude Code: Re-Review S3 Tests"""
def test_reset_creates_new_instance(self):
from src.services.host_repair_agent import HostRepairAgent
agent1 = HostRepairAgent()
HostRepairAgent._reset_for_test()
agent2 = HostRepairAgent()
assert agent1 is not agent2, "_reset_for_test should create a fresh instance"
def test_reset_clears_locks(self):
from src.services.host_repair_agent import HostRepairAgent
agent = HostRepairAgent()
agent._get_in_process_lock("test_key")
assert len(agent._in_process_locks) > 0
HostRepairAgent._reset_for_test()
new_agent = HostRepairAgent()
assert len(new_agent._in_process_locks) == 0, "Reset should give fresh lock dict"