fix(telegram): execute approved callbacks
This commit is contained in:
@@ -19,6 +19,7 @@ Endpoints:
|
||||
- 每個 Nonce 只能使用一次
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import APIRouter, HTTPException, status
|
||||
@@ -27,6 +28,8 @@ from pydantic import BaseModel
|
||||
from src.core.config import settings
|
||||
from src.core.logging import get_logger
|
||||
from src.services.approval_db import get_approval_service
|
||||
from src.services.approval_execution import get_execution_service
|
||||
from src.services.incident_approval_service import get_incident_approval_service
|
||||
from src.services.security_interceptor import (
|
||||
NonceReplayError,
|
||||
UserNotWhitelistedError,
|
||||
@@ -64,6 +67,80 @@ class TestPushRequest(BaseModel):
|
||||
incident_id: str = ""
|
||||
|
||||
|
||||
async def _run_telegram_approved_execution(approval) -> None:
|
||||
"""Run the approved action that originated from a Telegram callback."""
|
||||
approval_id = str(getattr(approval, "id", ""))
|
||||
incident_id = getattr(approval, "incident_id", None)
|
||||
try:
|
||||
result = await get_execution_service().execute_approved_action(approval)
|
||||
logger.info(
|
||||
"telegram_approval_execution_completed",
|
||||
approval_id=approval_id,
|
||||
incident_id=incident_id,
|
||||
success=bool(result),
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
"telegram_approval_execution_failed",
|
||||
approval_id=approval_id,
|
||||
incident_id=incident_id,
|
||||
error=str(exc),
|
||||
)
|
||||
|
||||
|
||||
def _schedule_telegram_approved_execution(approval) -> bool:
|
||||
"""Schedule execution after Telegram approval reaches required signatures."""
|
||||
try:
|
||||
asyncio.create_task(_run_telegram_approved_execution(approval))
|
||||
logger.info(
|
||||
"telegram_approval_execution_scheduled",
|
||||
approval_id=str(getattr(approval, "id", "")),
|
||||
incident_id=getattr(approval, "incident_id", None),
|
||||
)
|
||||
return True
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
"telegram_approval_execution_schedule_failed",
|
||||
approval_id=str(getattr(approval, "id", "")),
|
||||
incident_id=getattr(approval, "incident_id", None),
|
||||
error=str(exc),
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
async def _finalize_telegram_approval(approval, execution_triggered: bool) -> bool:
|
||||
"""Complete the execution handoff for Telegram approvals.
|
||||
|
||||
ApprovalDBService only records the signature/status transition. The actual
|
||||
executor scheduling lives in API callers, so Telegram must mirror the REST
|
||||
approval endpoint instead of stopping at a visual approval stamp.
|
||||
"""
|
||||
if not execution_triggered:
|
||||
return False
|
||||
return _schedule_telegram_approved_execution(approval)
|
||||
|
||||
|
||||
async def _sync_telegram_rejection(approval_id: str) -> bool:
|
||||
"""Keep Incident state aligned when an approval is rejected from Telegram."""
|
||||
try:
|
||||
await get_incident_approval_service().on_approval_status_change(
|
||||
approval_id=approval_id,
|
||||
new_status="rejected",
|
||||
)
|
||||
logger.info(
|
||||
"telegram_rejection_incident_synced",
|
||||
approval_id=approval_id,
|
||||
)
|
||||
return True
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
"telegram_rejection_incident_sync_failed",
|
||||
approval_id=approval_id,
|
||||
error=str(exc),
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Endpoints
|
||||
# =============================================================================
|
||||
@@ -198,12 +275,17 @@ async def telegram_webhook(
|
||||
)
|
||||
|
||||
if approval:
|
||||
execution_scheduled = await _finalize_telegram_approval(
|
||||
approval=approval,
|
||||
execution_triggered=execution_triggered,
|
||||
)
|
||||
logger.info(
|
||||
"telegram_approval_signed",
|
||||
approval_id=approval_id,
|
||||
user_id=user_id,
|
||||
status=approval.status.value,
|
||||
execution_triggered=execution_triggered,
|
||||
execution_scheduled=execution_scheduled,
|
||||
)
|
||||
await _log_user_action("approve", True, getattr(approval, "incident_id", None))
|
||||
|
||||
@@ -213,6 +295,7 @@ async def telegram_webhook(
|
||||
"approval_id": approval_id,
|
||||
"status": approval.status.value,
|
||||
"execution_triggered": execution_triggered,
|
||||
"execution_scheduled": execution_scheduled,
|
||||
}
|
||||
|
||||
elif action == "reject":
|
||||
@@ -224,10 +307,12 @@ async def telegram_webhook(
|
||||
)
|
||||
|
||||
if approval:
|
||||
incident_synced = await _sync_telegram_rejection(approval_id)
|
||||
logger.info(
|
||||
"telegram_approval_rejected",
|
||||
approval_id=approval_id,
|
||||
user_id=user_id,
|
||||
incident_synced=incident_synced,
|
||||
)
|
||||
await _log_user_action("reject", False, getattr(approval, "incident_id", None))
|
||||
|
||||
@@ -236,6 +321,7 @@ async def telegram_webhook(
|
||||
"message": "Rejected",
|
||||
"approval_id": approval_id,
|
||||
"status": approval.status.value,
|
||||
"incident_synced": incident_synced,
|
||||
}
|
||||
|
||||
return {"ok": False, "message": "Unknown action"}
|
||||
|
||||
174
apps/api/tests/test_telegram_webhook_execution_handoff.py
Normal file
174
apps/api/tests/test_telegram_webhook_execution_handoff.py
Normal file
@@ -0,0 +1,174 @@
|
||||
from types import SimpleNamespace
|
||||
from uuid import UUID
|
||||
|
||||
import pytest
|
||||
|
||||
from src.api.v1 import telegram as telegram_api
|
||||
|
||||
|
||||
class _FakeGateway:
|
||||
def __init__(self, result: dict) -> None:
|
||||
self.result = result
|
||||
|
||||
async def handle_callback(self, **_kwargs):
|
||||
return self.result
|
||||
|
||||
|
||||
class _FakeApprovalService:
|
||||
def __init__(self, approval, execution_triggered: bool) -> None:
|
||||
self.approval = approval
|
||||
self.execution_triggered = execution_triggered
|
||||
|
||||
async def sign_approval(self, **_kwargs):
|
||||
return self.approval, "Approval complete", self.execution_triggered
|
||||
|
||||
async def reject_approval(self, **_kwargs):
|
||||
return self.approval, "Approval rejected"
|
||||
|
||||
|
||||
class _FakeAlertOperationLogRepository:
|
||||
def __init__(self) -> None:
|
||||
self.rows: list[dict] = []
|
||||
|
||||
async def append(self, *args, **kwargs):
|
||||
self.rows.append({"args": args, "kwargs": kwargs})
|
||||
|
||||
|
||||
def _callback_update(callback_data: str) -> telegram_api.TelegramUpdate:
|
||||
return telegram_api.TelegramUpdate(
|
||||
update_id=123,
|
||||
callback_query={
|
||||
"id": "callback-1",
|
||||
"data": callback_data,
|
||||
"from": {"id": 42, "username": "ops"},
|
||||
"message": {"message_id": 99, "text": "ACTION REQUIRED"},
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_telegram_approval_schedules_executor_after_required_signature(monkeypatch):
|
||||
approval_id = "11111111-1111-1111-1111-111111111111"
|
||||
approval = SimpleNamespace(
|
||||
id=UUID(approval_id),
|
||||
status=SimpleNamespace(value="approved"),
|
||||
incident_id="INC-20260513-TGEXEC",
|
||||
)
|
||||
finalizer_calls: list[dict] = []
|
||||
op_log_repo = _FakeAlertOperationLogRepository()
|
||||
|
||||
async def fake_finalize(*, approval, execution_triggered: bool) -> bool:
|
||||
finalizer_calls.append({
|
||||
"approval_id": str(approval.id),
|
||||
"incident_id": approval.incident_id,
|
||||
"execution_triggered": execution_triggered,
|
||||
})
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(
|
||||
telegram_api,
|
||||
"get_telegram_gateway",
|
||||
lambda: _FakeGateway({
|
||||
"success": True,
|
||||
"action": "approve",
|
||||
"approval_id": approval_id,
|
||||
"user": {"id": 42, "username": "ops"},
|
||||
}),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
telegram_api,
|
||||
"get_approval_service",
|
||||
lambda: _FakeApprovalService(approval, execution_triggered=True),
|
||||
)
|
||||
monkeypatch.setattr(telegram_api, "_finalize_telegram_approval", fake_finalize)
|
||||
monkeypatch.setattr(
|
||||
"src.repositories.alert_operation_log_repository.get_alert_operation_log_repository",
|
||||
lambda: op_log_repo,
|
||||
)
|
||||
|
||||
result = await telegram_api.telegram_webhook(_callback_update(f"approve:{approval_id}:ts:nonce"))
|
||||
|
||||
assert result["ok"] is True
|
||||
assert result["message"] == "Approved"
|
||||
assert result["execution_triggered"] is True
|
||||
assert result["execution_scheduled"] is True
|
||||
assert finalizer_calls == [{
|
||||
"approval_id": approval_id,
|
||||
"incident_id": "INC-20260513-TGEXEC",
|
||||
"execution_triggered": True,
|
||||
}]
|
||||
assert op_log_repo.rows[0]["kwargs"]["action_detail"] == "approve"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_telegram_rejection_syncs_incident_state(monkeypatch):
|
||||
approval_id = "22222222-2222-2222-2222-222222222222"
|
||||
approval = SimpleNamespace(
|
||||
id=UUID(approval_id),
|
||||
status=SimpleNamespace(value="rejected"),
|
||||
incident_id="INC-20260513-TGREJ",
|
||||
)
|
||||
sync_calls: list[str] = []
|
||||
op_log_repo = _FakeAlertOperationLogRepository()
|
||||
|
||||
async def fake_sync(rejected_approval_id: str) -> bool:
|
||||
sync_calls.append(rejected_approval_id)
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(
|
||||
telegram_api,
|
||||
"get_telegram_gateway",
|
||||
lambda: _FakeGateway({
|
||||
"success": True,
|
||||
"action": "reject",
|
||||
"approval_id": approval_id,
|
||||
"user": {"id": 42, "username": "ops"},
|
||||
}),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
telegram_api,
|
||||
"get_approval_service",
|
||||
lambda: _FakeApprovalService(approval, execution_triggered=False),
|
||||
)
|
||||
monkeypatch.setattr(telegram_api, "_sync_telegram_rejection", fake_sync)
|
||||
monkeypatch.setattr(
|
||||
"src.repositories.alert_operation_log_repository.get_alert_operation_log_repository",
|
||||
lambda: op_log_repo,
|
||||
)
|
||||
|
||||
result = await telegram_api.telegram_webhook(_callback_update(f"reject:{approval_id}:ts:nonce"))
|
||||
|
||||
assert result["ok"] is True
|
||||
assert result["message"] == "Rejected"
|
||||
assert result["incident_synced"] is True
|
||||
assert sync_calls == [approval_id]
|
||||
assert op_log_repo.rows[0]["kwargs"]["action_detail"] == "reject"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_finalize_telegram_approval_runs_executor_task(monkeypatch):
|
||||
executed: list[str] = []
|
||||
approval = SimpleNamespace(
|
||||
id=UUID("33333333-3333-3333-3333-333333333333"),
|
||||
incident_id="INC-20260513-TGRUN",
|
||||
)
|
||||
|
||||
class _FakeExecutionService:
|
||||
async def execute_approved_action(self, received_approval):
|
||||
executed.append(str(received_approval.id))
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(
|
||||
telegram_api,
|
||||
"get_execution_service",
|
||||
lambda: _FakeExecutionService(),
|
||||
)
|
||||
|
||||
scheduled = await telegram_api._finalize_telegram_approval(
|
||||
approval=approval,
|
||||
execution_triggered=True,
|
||||
)
|
||||
|
||||
assert scheduled is True
|
||||
await telegram_api.asyncio.sleep(0)
|
||||
assert executed == ["33333333-3333-3333-3333-333333333333"]
|
||||
Reference in New Issue
Block a user