feat(api): Redis 冪等鎖防止重複修復 (Sprint 3 T4)

雙層鎖設計: in-process asyncio.Lock (必定生效) + Redis 分散式鎖 (跨 Pod best-effort)
同一 URI 的第二次修復呼叫立即返回 "already running" 錯誤

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-06 14:26:53 +08:00
parent 1a654aa37d
commit 4561f141bb
2 changed files with 110 additions and 17 deletions

View File

@@ -145,6 +145,17 @@ def build_repair_command(component: str) -> str:
class HostRepairAgent:
"""透過 SSH 執行主機層修復命令。"""
def __init__(self) -> None:
# in-process 鎖表 — key: lock_key → asyncio.Lock
# 2026-04-06 Claude Code: Sprint 3 T4
self._in_process_locks: dict[str, asyncio.Lock] = {}
def _get_in_process_lock(self, lock_key: str) -> asyncio.Lock:
"""取得或建立指定 key 的 in-process 鎖。"""
if lock_key not in self._in_process_locks:
self._in_process_locks[lock_key] = asyncio.Lock()
return self._in_process_locks[lock_key]
async def repair(self, layer: str, component: str) -> HostRepairResult:
"""執行修復並回傳結果。"""
try:
@@ -193,37 +204,89 @@ class HostRepairAgent:
"""
根據 URI scheme 路由至對應的執行路徑。
2026-04-06 Claude Code: Sprint 3 T3
2026-04-06 Claude Code: Sprint 3 T4 — Redis 冪等鎖防止重複修復
"""
try:
uri = parse_uri_command(command)
except ValueError as e:
return HostRepairResult(success=False, layer="", component="", error=str(e))
if uri.scheme == "openclaw":
return await self._execute_openclaw(uri.host_or_layer, uri.payload)
# 冪等鎖 — 防止同一 component 並發修復
# 雙層鎖: in-process asyncio.Lock (必定生效) + Redis 分散式鎖 (best-effort)
# 2026-04-06 Claude Code: Sprint 3 T4
lock_key = f"repair_lock:ssh_command:{uri.scheme}:{uri.host_or_layer}:{uri.payload}"
in_process_lock = self._get_in_process_lock(lock_key)
if uri.scheme == "ansible":
async def _execute() -> HostRepairResult:
if uri.scheme == "openclaw":
return await self._execute_openclaw(uri.host_or_layer, uri.payload)
if uri.scheme == "ansible":
try:
validate_ansible_playbook(uri.payload)
except ValueError as e:
return HostRepairResult(success=False, layer="ansible", component=uri.payload, error=str(e))
return await self._execute_ansible(uri.host_or_layer, uri.payload)
if uri.scheme == "ssh":
if not approved:
return HostRepairResult(
success=False,
layer="ssh",
component=uri.payload,
error="ssh:// scheme requires_approval=True — must be explicitly approved",
)
try:
validate_shell_safety(uri.payload)
except ValueError as e:
return HostRepairResult(success=False, layer="ssh", component=uri.payload, error=str(e))
return await self._execute_ssh_direct(uri.host_or_layer, uri.payload)
return HostRepairResult(success=False, layer="", component="", error=f"Unhandled scheme: {uri.scheme}")
# in-process 鎖: locked() 代表正在進行,立即拒絕重複
if in_process_lock.locked():
return HostRepairResult(
success=False,
layer=uri.scheme,
component=uri.payload,
error=f"Repair already running for {uri.scheme}://{uri.host_or_layer}/{uri.payload}",
)
async with in_process_lock:
# Redis 分散式鎖 (best-effort跨 Pod)
# blocking_timeout=0: 立即失敗,不等待
try:
validate_ansible_playbook(uri.payload)
except ValueError as e:
return HostRepairResult(success=False, layer="ansible", component=uri.payload, error=str(e))
return await self._execute_ansible(uri.host_or_layer, uri.payload)
from src.core.redis_client import RedisLock
redis_lock: RedisLock | None = RedisLock(lock_key, timeout=120, blocking_timeout=0)
except Exception:
redis_lock = None # Redis 未連線fail open
if uri.scheme == "ssh":
if not approved:
if redis_lock is None:
return await _execute()
try:
acquired = await redis_lock.acquire()
except Exception:
# Redis 不可用fail open
return await _execute()
if not acquired:
# Redis 鎖已被其他 Pod 持有
return HostRepairResult(
success=False,
layer="ssh",
layer=uri.scheme,
component=uri.payload,
error="ssh:// scheme requires_approval=True — must be explicitly approved",
error=f"Repair already running for {uri.scheme}://{uri.host_or_layer}/{uri.payload}",
)
try:
validate_shell_safety(uri.payload)
except ValueError as e:
return HostRepairResult(success=False, layer="ssh", component=uri.payload, error=str(e))
return await self._execute_ssh_direct(uri.host_or_layer, uri.payload)
return HostRepairResult(success=False, layer="", component="", error=f"Unhandled scheme: {uri.scheme}")
try:
return await _execute()
finally:
try:
await redis_lock.release()
except Exception:
pass
async def _execute_openclaw(self, layer: str, component: str) -> HostRepairResult:
"""openclaw:// — 呼叫現有的 repair(layer, component) 邏輯"""

View File

@@ -271,3 +271,33 @@ class TestRepairByUri:
result = await agent.repair_by_uri("bad-format")
assert result.success is False
assert "Unsupported scheme" in result.error
class TestRepairLock:
@pytest.mark.asyncio
async def test_duplicate_repair_is_blocked(self):
"""同一個 component 的修復,第二次呼叫應被 lock 阻擋"""
import asyncio
from src.services.host_repair_agent import HostRepairAgent, HostRepairResult
from unittest.mock import AsyncMock, patch
agent = HostRepairAgent()
call_count = 0
async def fake_execute_openclaw(layer, component):
nonlocal call_count
call_count += 1
await asyncio.sleep(0.1) # simulate work
return HostRepairResult(success=True, layer=layer, component=component, output="REPAIR_OK:test")
with patch.object(agent, "_execute_openclaw", side_effect=fake_execute_openclaw):
results = await asyncio.gather(
agent.repair_by_uri("openclaw://docker-110/sentry"),
agent.repair_by_uri("openclaw://docker-110/sentry"),
return_exceptions=True,
)
successes = [r for r in results if isinstance(r, HostRepairResult) and r.success]
blocked = [r for r in results if isinstance(r, HostRepairResult) and not r.success and "already running" in r.error]
assert len(successes) == 1
assert len(blocked) == 1