fix(awooop): use shared redis for approval gates
Some checks failed
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 1m0s
CD Pipeline / build-and-deploy (push) Failing after 4m6s
CD Pipeline / post-deploy-checks (push) Has been skipped

This commit is contained in:
OG T
2026-05-06 13:18:43 +08:00
parent 56b4d8165b
commit 2c2bf9d665
6 changed files with 266 additions and 19 deletions

View File

@@ -0,0 +1,138 @@
from __future__ import annotations
import pytest
from src.services import awooop_approval_token as approval_token
class FakeRedis:
def __init__(self) -> None:
self.values: dict[str, str] = {}
self.sets: dict[str, set[str]] = {}
self.expirations: dict[str, int] = {}
async def set(
self,
key: str,
value: str,
*,
nx: bool = False,
ex: int | None = None,
) -> bool:
if nx and key in self.values:
return False
self.values[key] = value
if ex is not None:
self.expirations[key] = ex
return True
async def sadd(self, key: str, member: str) -> int:
members = self.sets.setdefault(key, set())
before = len(members)
members.add(member)
return 1 if len(members) > before else 0
async def expire(self, key: str, ttl: int) -> bool:
self.expirations[key] = ttl
return True
async def scard(self, key: str) -> int:
return len(self.sets.get(key, set()))
@pytest.fixture(autouse=True)
def stable_hmac(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(approval_token, "_get_hmac_key", lambda: b"test-hmac-key")
@pytest.mark.asyncio
async def test_record_approval_uses_shared_redis_pool(
monkeypatch: pytest.MonkeyPatch,
) -> None:
redis = FakeRedis()
monkeypatch.setattr(approval_token, "get_redis", lambda: redis)
token = approval_token.issue_approval_token(
project_id="awoooi",
run_id="run-1",
tool_name="operator_console_approve",
approver_id="telegram:123456",
)
count = await approval_token.record_approval(
project_id="awoooi",
run_id="run-1",
tool_name="operator_console_approve",
approver_id="telegram:123456",
token=token,
)
assert count == 1
assert any(key.startswith("awooop_appr:jti:") for key in redis.values)
assert redis.sets[
"awooop_appr:sigs:awoooi:run-1:operator_console_approve"
] == {"telegram:123456"}
@pytest.mark.asyncio
async def test_record_approval_rejects_token_replay(
monkeypatch: pytest.MonkeyPatch,
) -> None:
redis = FakeRedis()
monkeypatch.setattr(approval_token, "get_redis", lambda: redis)
token = approval_token.issue_approval_token(
project_id="awoooi",
run_id="run-1",
tool_name="operator_console_approve",
approver_id="telegram:123456",
)
await approval_token.record_approval(
project_id="awoooi",
run_id="run-1",
tool_name="operator_console_approve",
approver_id="telegram:123456",
token=token,
)
with pytest.raises(approval_token.TokenReplayError):
await approval_token.record_approval(
project_id="awoooi",
run_id="run-1",
tool_name="operator_console_approve",
approver_id="telegram:123456",
token=token,
)
@pytest.mark.asyncio
async def test_check_approval_quorum_uses_shared_redis_pool(
monkeypatch: pytest.MonkeyPatch,
) -> None:
redis = FakeRedis()
redis.sets["awooop_appr:sigs:awoooi:run-1:tool"] = {"operator-a"}
monkeypatch.setattr(approval_token, "get_redis", lambda: redis)
assert await approval_token.check_approval_quorum(
project_id="awoooi",
run_id="run-1",
tool_name="tool",
required_count=1,
)
@pytest.mark.asyncio
async def test_check_approval_quorum_rejects_insufficient_count(
monkeypatch: pytest.MonkeyPatch,
) -> None:
redis = FakeRedis()
monkeypatch.setattr(approval_token, "get_redis", lambda: redis)
with pytest.raises(approval_token.QuorumNotMetError):
await approval_token.check_approval_quorum(
project_id="awoooi",
run_id="run-1",
tool_name="tool",
required_count=1,
)

View File

@@ -0,0 +1,104 @@
from __future__ import annotations
import uuid
import pytest
from src.plugins.mcp import gateway as gateway_module
from src.plugins.mcp.gateway import (
GateApprovalError,
GateCheckResult,
GatewayContext,
McpGateway,
)
class FakeRedis:
def __init__(self, values: dict[str, str] | None = None) -> None:
self.values = values or {}
self.closed = False
async def get(self, key: str) -> str | None:
return self.values.get(key)
async def aclose(self) -> None:
self.closed = True
@pytest.mark.asyncio
async def test_gate5_uses_shared_redis_pool_without_closing(
monkeypatch: pytest.MonkeyPatch,
) -> None:
run_id = uuid.uuid4()
approval_key = f"mcp_approval:awoooi:openclaw-sre:k8s_apply:{run_id}"
redis = FakeRedis({approval_key: "approved"})
monkeypatch.setattr(gateway_module, "get_redis", lambda: redis)
gate_result = GateCheckResult()
await McpGateway(db=None)._gate5_approval(
GatewayContext(
project_id="awoooi",
agent_id="openclaw-sre",
tool_name="k8s_apply",
run_id=run_id,
is_shadow=False,
required_scope="write",
),
grant_row=None,
gate_result=gate_result,
)
assert gate_result.gate5_approval is True
assert redis.closed is False
@pytest.mark.asyncio
async def test_gate5_rejects_missing_approval(
monkeypatch: pytest.MonkeyPatch,
) -> None:
redis = FakeRedis()
monkeypatch.setattr(gateway_module, "get_redis", lambda: redis)
with pytest.raises(GateApprovalError):
await McpGateway(db=None)._gate5_approval(
GatewayContext(
project_id="awoooi",
agent_id="openclaw-sre",
tool_name="k8s_apply",
run_id=uuid.uuid4(),
is_shadow=False,
required_scope="write",
),
grant_row=None,
gate_result=GateCheckResult(),
)
@pytest.mark.asyncio
async def test_gate5_read_scope_skips_redis(
monkeypatch: pytest.MonkeyPatch,
) -> None:
called = False
def fail_if_called() -> FakeRedis:
nonlocal called
called = True
return FakeRedis()
monkeypatch.setattr(gateway_module, "get_redis", fail_if_called)
gate_result = GateCheckResult()
await McpGateway(db=None)._gate5_approval(
GatewayContext(
project_id="awoooi",
agent_id="openclaw-sre",
tool_name="k8s_get",
is_shadow=False,
required_scope="read",
),
grant_row=None,
gate_result=gate_result,
)
assert called is False
assert gate_result.gate5_approval is True