diff --git a/apps/api/src/services/host_repair_agent.py b/apps/api/src/services/host_repair_agent.py
index 24208368..e53c486e 100644
--- a/apps/api/src/services/host_repair_agent.py
+++ b/apps/api/src/services/host_repair_agent.py
@@ -145,6 +145,28 @@ def build_repair_command(component: str) -> str:
 class HostRepairAgent:
     """透過 SSH 執行主機層修復命令。"""
 
+    def __init__(self) -> None:
+        # In-process lock table — key: lock_key → asyncio.Lock
+        # 2026-04-06 Claude Code: Sprint 3 T4
+        self._in_process_locks: dict[str, asyncio.Lock] = {}
+
+    def _get_in_process_lock(self, lock_key: str) -> asyncio.Lock:
+        """Return the in-process lock for ``lock_key``, creating it on first use.
+
+        Idle (unlocked) entries are pruned whenever a new key is registered.
+        Lock keys embed the command payload, so without pruning the table
+        would grow without bound. This is safe because repair_by_uri checks
+        and acquires the returned lock without awaiting in between, so an
+        unlocked entry never has a pending user.
+        """
+        lock = self._in_process_locks.get(lock_key)
+        if lock is None:
+            self._in_process_locks = {
+                key: held for key, held in self._in_process_locks.items() if held.locked()
+            }
+            lock = self._in_process_locks[lock_key] = asyncio.Lock()
+        return lock
+
     async def repair(self, layer: str, component: str) -> HostRepairResult:
         """執行修復並回傳結果。"""
         try:
@@ -193,37 +215,89 @@ class HostRepairAgent:
         """
         根據 URI scheme 路由至對應的執行路徑。
         2026-04-06 Claude Code: Sprint 3 T3
+        2026-04-06 Claude Code: Sprint 3 T4 — Redis idempotency lock prevents duplicate repairs
         """
         try:
             uri = parse_uri_command(command)
         except ValueError as e:
             return HostRepairResult(success=False, layer="", component="", error=str(e))
 
-        if uri.scheme == "openclaw":
-            return await self._execute_openclaw(uri.host_or_layer, uri.payload)
+        # Idempotency lock — prevents concurrent repairs of the same component
+        # Two layers: in-process asyncio.Lock (always enforced) + Redis distributed lock (best-effort)
+        # 2026-04-06 Claude Code: Sprint 3 T4
+        lock_key = f"repair_lock:ssh_command:{uri.scheme}:{uri.host_or_layer}:{uri.payload}"
+        in_process_lock = self._get_in_process_lock(lock_key)
 
-        if uri.scheme == "ansible":
+        async def _execute() -> HostRepairResult:
+            if uri.scheme == "openclaw":
+                return await self._execute_openclaw(uri.host_or_layer, uri.payload)
+
+            if uri.scheme == "ansible":
+                try:
+                    validate_ansible_playbook(uri.payload)
+                except ValueError as e:
+                    return HostRepairResult(success=False, layer="ansible", component=uri.payload, error=str(e))
+                return await self._execute_ansible(uri.host_or_layer, uri.payload)
+
+            if uri.scheme == "ssh":
+                if not approved:
+                    return HostRepairResult(
+                        success=False,
+                        layer="ssh",
+                        component=uri.payload,
+                        error="ssh:// scheme requires_approval=True — must be explicitly approved",
+                    )
+                try:
+                    validate_shell_safety(uri.payload)
+                except ValueError as e:
+                    return HostRepairResult(success=False, layer="ssh", component=uri.payload, error=str(e))
+                return await self._execute_ssh_direct(uri.host_or_layer, uri.payload)
+
+            return HostRepairResult(success=False, layer="", component="", error=f"Unhandled scheme: {uri.scheme}")
+
+        # In-process lock: locked() means a repair is in flight, so reject the duplicate immediately
+        if in_process_lock.locked():
+            return HostRepairResult(
+                success=False,
+                layer=uri.scheme,
+                component=uri.payload,
+                error=f"Repair already running for {uri.scheme}://{uri.host_or_layer}/{uri.payload}",
+            )
+
+        async with in_process_lock:
+            # Redis distributed lock (best-effort, across Pods)
+            # blocking_timeout=0: fail immediately instead of waiting
             try:
-                validate_ansible_playbook(uri.payload)
-            except ValueError as e:
-                return HostRepairResult(success=False, layer="ansible", component=uri.payload, error=str(e))
-            return await self._execute_ansible(uri.host_or_layer, uri.payload)
+                from src.core.redis_client import RedisLock
+                redis_lock: RedisLock | None = RedisLock(lock_key, timeout=120, blocking_timeout=0)
+            except Exception:
+                redis_lock = None  # Redis not connected — fail open
 
-        if uri.scheme == "ssh":
-            if not approved:
+            if redis_lock is None:
+                return await _execute()
+
+            try:
+                acquired = await redis_lock.acquire()
+            except Exception:
+                # Redis unavailable — fail open
+                return await _execute()
+
+            if not acquired:
+                # Redis lock is already held by another Pod
                 return HostRepairResult(
                     success=False,
-                    layer="ssh",
+                    layer=uri.scheme,
                     component=uri.payload,
-                    error="ssh:// scheme requires_approval=True — must be explicitly approved",
+                    error=f"Repair already running for {uri.scheme}://{uri.host_or_layer}/{uri.payload}",
                 )
-            try:
-                validate_shell_safety(uri.payload)
-            except ValueError as e:
-                return HostRepairResult(success=False, layer="ssh", component=uri.payload, error=str(e))
-            return await self._execute_ssh_direct(uri.host_or_layer, uri.payload)
 
-        return HostRepairResult(success=False, layer="", component="", error=f"Unhandled scheme: {uri.scheme}")
+            try:
+                return await _execute()
+            finally:
+                try:
+                    await redis_lock.release()
+                except Exception:
+                    pass  # best-effort release; presumably the lock's timeout=120 expires it otherwise
 
     async def _execute_openclaw(self, layer: str, component: str) -> HostRepairResult:
         """openclaw:// — 呼叫現有的 repair(layer, component) 邏輯"""
diff --git a/apps/api/tests/test_host_repair_agent.py b/apps/api/tests/test_host_repair_agent.py
index a3530fac..38e70c0f 100644
--- a/apps/api/tests/test_host_repair_agent.py
+++ b/apps/api/tests/test_host_repair_agent.py
@@ -271,3 +271,33 @@ class TestRepairByUri:
         result = await agent.repair_by_uri("bad-format")
         assert result.success is False
         assert "Unsupported scheme" in result.error
+
+
+class TestRepairLock:
+    @pytest.mark.asyncio
+    async def test_duplicate_repair_is_blocked(self):
+        """A second concurrent repair of the same component must be blocked by the lock."""
+        import asyncio
+        from src.services.host_repair_agent import HostRepairAgent, HostRepairResult
+        from unittest.mock import patch
+        agent = HostRepairAgent()
+
+        call_count = 0
+
+        async def fake_execute_openclaw(layer, component):
+            nonlocal call_count
+            call_count += 1
+            await asyncio.sleep(0.1)  # simulate work
+            return HostRepairResult(success=True, layer=layer, component=component, output="REPAIR_OK:test")
+
+        with patch.object(agent, "_execute_openclaw", side_effect=fake_execute_openclaw):
+            results = await asyncio.gather(
+                agent.repair_by_uri("openclaw://docker-110/sentry"),
+                agent.repair_by_uri("openclaw://docker-110/sentry"),
+                return_exceptions=True,
+            )
+
+        successes = [r for r in results if isinstance(r, HostRepairResult) and r.success]
+        blocked = [r for r in results if isinstance(r, HostRepairResult) and not r.success and "already running" in r.error]
+        assert len(successes) == 1
+        assert len(blocked) == 1