325 lines
10 KiB
Python
325 lines
10 KiB
Python
from types import SimpleNamespace
|
|
from uuid import UUID
|
|
|
|
import pytest
|
|
|
|
from src.api.v1 import telegram as telegram_api
|
|
|
|
|
|
class _FakeGateway:
|
|
def __init__(self, result: dict) -> None:
|
|
self.result = result
|
|
self.mirror_calls: list[dict] = []
|
|
|
|
async def handle_callback(self, **_kwargs):
|
|
return self.result
|
|
|
|
async def mirror_callback_query_received(self, **kwargs):
|
|
self.mirror_calls.append(kwargs)
|
|
return "event-1"
|
|
|
|
|
|
class _FakeApprovalService:
|
|
def __init__(
|
|
self,
|
|
approval,
|
|
execution_triggered: bool,
|
|
sign_message: str = "Approval complete",
|
|
) -> None:
|
|
self.approval = approval
|
|
self.execution_triggered = execution_triggered
|
|
self.sign_message = sign_message
|
|
|
|
async def sign_approval(self, **_kwargs):
|
|
return self.approval, self.sign_message, self.execution_triggered
|
|
|
|
async def reject_approval(self, **_kwargs):
|
|
return self.approval, "Approval rejected"
|
|
|
|
|
|
class _FakeAlertOperationLogRepository:
|
|
def __init__(self) -> None:
|
|
self.rows: list[dict] = []
|
|
|
|
async def append(self, *args, **kwargs):
|
|
self.rows.append({"args": args, "kwargs": kwargs})
|
|
|
|
|
|
def _callback_update(callback_data: str) -> telegram_api.TelegramUpdate:
|
|
return telegram_api.TelegramUpdate(
|
|
update_id=123,
|
|
callback_query={
|
|
"id": "callback-1",
|
|
"data": callback_data,
|
|
"from": {"id": 42, "username": "ops"},
|
|
"message": {"message_id": 99, "text": "ACTION REQUIRED"},
|
|
},
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telegram_approval_schedules_executor_after_required_signature(monkeypatch):
|
|
approval_id = "11111111-1111-1111-1111-111111111111"
|
|
approval = SimpleNamespace(
|
|
id=UUID(approval_id),
|
|
status=SimpleNamespace(value="approved"),
|
|
incident_id="INC-20260513-TGEXEC",
|
|
)
|
|
finalizer_calls: list[dict] = []
|
|
op_log_repo = _FakeAlertOperationLogRepository()
|
|
|
|
async def fake_finalize(*, approval, execution_triggered: bool) -> bool:
|
|
finalizer_calls.append({
|
|
"approval_id": str(approval.id),
|
|
"incident_id": approval.incident_id,
|
|
"execution_triggered": execution_triggered,
|
|
})
|
|
return True
|
|
|
|
fake_gateway = _FakeGateway({
|
|
"success": True,
|
|
"action": "approve",
|
|
"approval_id": approval_id,
|
|
"user": {"id": 42, "username": "ops"},
|
|
})
|
|
monkeypatch.setattr(telegram_api, "get_telegram_gateway", lambda: fake_gateway)
|
|
monkeypatch.setattr(
|
|
telegram_api,
|
|
"get_approval_service",
|
|
lambda: _FakeApprovalService(approval, execution_triggered=True),
|
|
)
|
|
monkeypatch.setattr(telegram_api, "_finalize_telegram_approval", fake_finalize)
|
|
monkeypatch.setattr(
|
|
"src.repositories.alert_operation_log_repository.get_alert_operation_log_repository",
|
|
lambda: op_log_repo,
|
|
)
|
|
|
|
result = await telegram_api.telegram_webhook(_callback_update(f"approve:{approval_id}:ts:nonce"))
|
|
|
|
assert result["ok"] is True
|
|
assert result["message"] == "Approved"
|
|
assert result["execution_triggered"] is True
|
|
assert result["execution_scheduled"] is True
|
|
assert fake_gateway.mirror_calls == [{
|
|
"update_id": 123,
|
|
"callback_query_id": "callback-1",
|
|
"callback_data": f"approve:{approval_id}:ts:nonce",
|
|
"user_id": 42,
|
|
"username": "ops",
|
|
"message_id": 99,
|
|
"chat_id": None,
|
|
}]
|
|
assert finalizer_calls == [{
|
|
"approval_id": approval_id,
|
|
"incident_id": "INC-20260513-TGEXEC",
|
|
"execution_triggered": True,
|
|
}]
|
|
assert op_log_repo.rows[0]["kwargs"]["action_detail"] == "approve"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telegram_approval_suppresses_executor_for_no_action(monkeypatch):
|
|
approval_id = "55555555-5555-5555-5555-555555555555"
|
|
approval = SimpleNamespace(
|
|
id=UUID(approval_id),
|
|
status=SimpleNamespace(value="approved"),
|
|
incident_id="INC-20260611-NOEXEC",
|
|
action="NO_ACTION - REPAIR_CANDIDATE_MISSING: LLM 分析失敗",
|
|
)
|
|
finalizer_calls: list[dict] = []
|
|
op_log_repo = _FakeAlertOperationLogRepository()
|
|
|
|
async def fake_finalize(*, approval, execution_triggered: bool) -> bool:
|
|
finalizer_calls.append({
|
|
"approval_id": str(approval.id),
|
|
"execution_triggered": execution_triggered,
|
|
})
|
|
return False
|
|
|
|
fake_gateway = _FakeGateway({
|
|
"success": True,
|
|
"action": "approve",
|
|
"approval_id": approval_id,
|
|
"user": {"id": 42, "username": "ops"},
|
|
})
|
|
monkeypatch.setattr(telegram_api, "get_telegram_gateway", lambda: fake_gateway)
|
|
monkeypatch.setattr(
|
|
telegram_api,
|
|
"get_approval_service",
|
|
lambda: _FakeApprovalService(approval, execution_triggered=True),
|
|
)
|
|
monkeypatch.setattr(telegram_api, "_finalize_telegram_approval", fake_finalize)
|
|
monkeypatch.setattr(
|
|
"src.repositories.alert_operation_log_repository.get_alert_operation_log_repository",
|
|
lambda: op_log_repo,
|
|
)
|
|
|
|
result = await telegram_api.telegram_webhook(_callback_update(f"approve:{approval_id}:ts:nonce"))
|
|
|
|
assert result["ok"] is True
|
|
assert result["message"] == "ApprovedWithoutExecution"
|
|
assert result["execution_triggered"] is True
|
|
assert result["execution_scheduled"] is False
|
|
assert result["execution_suppressed"] is True
|
|
assert finalizer_calls == [{
|
|
"approval_id": approval_id,
|
|
"execution_triggered": True,
|
|
}]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telegram_approval_duplicate_does_not_schedule_executor(monkeypatch):
|
|
approval_id = "33333333-3333-3333-3333-333333333333"
|
|
approval = SimpleNamespace(
|
|
id=UUID(approval_id),
|
|
status=SimpleNamespace(value="execution_success"),
|
|
incident_id="INC-20260531-DUPE",
|
|
)
|
|
finalizer_calls: list[dict] = []
|
|
op_log_repo = _FakeAlertOperationLogRepository()
|
|
|
|
async def fake_finalize(*, approval, execution_triggered: bool) -> bool:
|
|
finalizer_calls.append({
|
|
"approval_id": str(approval.id),
|
|
"execution_triggered": execution_triggered,
|
|
})
|
|
return True
|
|
|
|
monkeypatch.setattr(
|
|
telegram_api,
|
|
"get_telegram_gateway",
|
|
lambda: _FakeGateway({
|
|
"success": True,
|
|
"action": "approve",
|
|
"approval_id": approval_id,
|
|
"user": {"id": 42, "username": "ops"},
|
|
}),
|
|
)
|
|
monkeypatch.setattr(
|
|
telegram_api,
|
|
"get_approval_service",
|
|
lambda: _FakeApprovalService(
|
|
approval,
|
|
execution_triggered=False,
|
|
sign_message="Cannot sign: status is execution_success",
|
|
),
|
|
)
|
|
monkeypatch.setattr(telegram_api, "_finalize_telegram_approval", fake_finalize)
|
|
monkeypatch.setattr(
|
|
"src.repositories.alert_operation_log_repository.get_alert_operation_log_repository",
|
|
lambda: op_log_repo,
|
|
)
|
|
|
|
result = await telegram_api.telegram_webhook(_callback_update(f"approve:{approval_id}:ts:nonce"))
|
|
|
|
assert result["ok"] is True
|
|
assert result["message"] == "Already processed"
|
|
assert result["execution_triggered"] is False
|
|
assert result["execution_scheduled"] is False
|
|
assert finalizer_calls == []
|
|
assert op_log_repo.rows[0]["kwargs"]["action_detail"] == "approve_duplicate"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telegram_rejection_syncs_incident_state(monkeypatch):
|
|
approval_id = "22222222-2222-2222-2222-222222222222"
|
|
approval = SimpleNamespace(
|
|
id=UUID(approval_id),
|
|
status=SimpleNamespace(value="rejected"),
|
|
incident_id="INC-20260513-TGREJ",
|
|
)
|
|
sync_calls: list[str] = []
|
|
op_log_repo = _FakeAlertOperationLogRepository()
|
|
|
|
async def fake_sync(rejected_approval_id: str) -> bool:
|
|
sync_calls.append(rejected_approval_id)
|
|
return True
|
|
|
|
monkeypatch.setattr(
|
|
telegram_api,
|
|
"get_telegram_gateway",
|
|
lambda: _FakeGateway({
|
|
"success": True,
|
|
"action": "reject",
|
|
"approval_id": approval_id,
|
|
"user": {"id": 42, "username": "ops"},
|
|
}),
|
|
)
|
|
monkeypatch.setattr(
|
|
telegram_api,
|
|
"get_approval_service",
|
|
lambda: _FakeApprovalService(approval, execution_triggered=False),
|
|
)
|
|
monkeypatch.setattr(telegram_api, "_sync_telegram_rejection", fake_sync)
|
|
monkeypatch.setattr(
|
|
"src.repositories.alert_operation_log_repository.get_alert_operation_log_repository",
|
|
lambda: op_log_repo,
|
|
)
|
|
|
|
result = await telegram_api.telegram_webhook(_callback_update(f"reject:{approval_id}:ts:nonce"))
|
|
|
|
assert result["ok"] is True
|
|
assert result["message"] == "Rejected"
|
|
assert result["incident_synced"] is True
|
|
assert sync_calls == [approval_id]
|
|
assert op_log_repo.rows[0]["kwargs"]["action_detail"] == "reject"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_finalize_telegram_approval_runs_executor_task(monkeypatch):
|
|
executed: list[str] = []
|
|
approval = SimpleNamespace(
|
|
id=UUID("33333333-3333-3333-3333-333333333333"),
|
|
incident_id="INC-20260513-TGRUN",
|
|
)
|
|
|
|
class _FakeExecutionService:
|
|
async def execute_approved_action(self, received_approval):
|
|
executed.append(str(received_approval.id))
|
|
return True
|
|
|
|
monkeypatch.setattr(
|
|
telegram_api,
|
|
"get_execution_service",
|
|
lambda: _FakeExecutionService(),
|
|
)
|
|
|
|
scheduled = await telegram_api._finalize_telegram_approval(
|
|
approval=approval,
|
|
execution_triggered=True,
|
|
)
|
|
|
|
assert scheduled is True
|
|
await telegram_api.asyncio.sleep(0)
|
|
assert executed == ["33333333-3333-3333-3333-333333333333"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_finalize_telegram_approval_does_not_schedule_no_action(monkeypatch):
|
|
executed: list[str] = []
|
|
approval = SimpleNamespace(
|
|
id=UUID("66666666-6666-6666-6666-666666666666"),
|
|
incident_id="INC-20260611-NOOP",
|
|
action="OBSERVE",
|
|
)
|
|
|
|
class _FakeExecutionService:
|
|
async def execute_approved_action(self, received_approval):
|
|
executed.append(str(received_approval.id))
|
|
return True
|
|
|
|
monkeypatch.setattr(
|
|
telegram_api,
|
|
"get_execution_service",
|
|
lambda: _FakeExecutionService(),
|
|
)
|
|
|
|
scheduled = await telegram_api._finalize_telegram_approval(
|
|
approval=approval,
|
|
execution_triggered=True,
|
|
)
|
|
|
|
assert scheduled is False
|
|
await telegram_api.asyncio.sleep(0)
|
|
assert executed == []
|