fix(telegram): execute approved callbacks
All checks were successful
Code Review / ai-code-review (push) Successful in 12s
CD Pipeline / tests (push) Successful in 1m10s
CD Pipeline / build-and-deploy (push) Successful in 3m39s
CD Pipeline / post-deploy-checks (push) Successful in 1m31s

This commit is contained in:
Your Name
2026-05-17 23:54:50 +08:00
parent ba971e7a29
commit 913e1abcfa
2 changed files with 260 additions and 0 deletions

View File

@@ -19,6 +19,7 @@ Endpoints:
- 每個 Nonce 只能使用一次
"""
import asyncio
from uuid import UUID
from fastapi import APIRouter, HTTPException, status
@@ -27,6 +28,8 @@ from pydantic import BaseModel
from src.core.config import settings
from src.core.logging import get_logger
from src.services.approval_db import get_approval_service
from src.services.approval_execution import get_execution_service
from src.services.incident_approval_service import get_incident_approval_service
from src.services.security_interceptor import (
NonceReplayError,
UserNotWhitelistedError,
@@ -64,6 +67,80 @@ class TestPushRequest(BaseModel):
incident_id: str = ""
async def _run_telegram_approved_execution(approval) -> None:
"""Run the approved action that originated from a Telegram callback."""
approval_id = str(getattr(approval, "id", ""))
incident_id = getattr(approval, "incident_id", None)
try:
result = await get_execution_service().execute_approved_action(approval)
logger.info(
"telegram_approval_execution_completed",
approval_id=approval_id,
incident_id=incident_id,
success=bool(result),
)
except Exception as exc:
logger.error(
"telegram_approval_execution_failed",
approval_id=approval_id,
incident_id=incident_id,
error=str(exc),
)
def _schedule_telegram_approved_execution(approval) -> bool:
"""Schedule execution after Telegram approval reaches required signatures."""
try:
asyncio.create_task(_run_telegram_approved_execution(approval))
logger.info(
"telegram_approval_execution_scheduled",
approval_id=str(getattr(approval, "id", "")),
incident_id=getattr(approval, "incident_id", None),
)
return True
except Exception as exc:
logger.error(
"telegram_approval_execution_schedule_failed",
approval_id=str(getattr(approval, "id", "")),
incident_id=getattr(approval, "incident_id", None),
error=str(exc),
)
return False
async def _finalize_telegram_approval(approval, execution_triggered: bool) -> bool:
"""Complete the execution handoff for Telegram approvals.
ApprovalDBService only records the signature/status transition. The actual
executor scheduling lives in API callers, so Telegram must mirror the REST
approval endpoint instead of stopping at a visual approval stamp.
"""
if not execution_triggered:
return False
return _schedule_telegram_approved_execution(approval)
async def _sync_telegram_rejection(approval_id: str) -> bool:
"""Keep Incident state aligned when an approval is rejected from Telegram."""
try:
await get_incident_approval_service().on_approval_status_change(
approval_id=approval_id,
new_status="rejected",
)
logger.info(
"telegram_rejection_incident_synced",
approval_id=approval_id,
)
return True
except Exception as exc:
logger.error(
"telegram_rejection_incident_sync_failed",
approval_id=approval_id,
error=str(exc),
)
return False
# =============================================================================
# Endpoints
# =============================================================================
@@ -198,12 +275,17 @@ async def telegram_webhook(
)
if approval:
execution_scheduled = await _finalize_telegram_approval(
approval=approval,
execution_triggered=execution_triggered,
)
logger.info(
"telegram_approval_signed",
approval_id=approval_id,
user_id=user_id,
status=approval.status.value,
execution_triggered=execution_triggered,
execution_scheduled=execution_scheduled,
)
await _log_user_action("approve", True, getattr(approval, "incident_id", None))
@@ -213,6 +295,7 @@ async def telegram_webhook(
"approval_id": approval_id,
"status": approval.status.value,
"execution_triggered": execution_triggered,
"execution_scheduled": execution_scheduled,
}
elif action == "reject":
@@ -224,10 +307,12 @@ async def telegram_webhook(
)
if approval:
incident_synced = await _sync_telegram_rejection(approval_id)
logger.info(
"telegram_approval_rejected",
approval_id=approval_id,
user_id=user_id,
incident_synced=incident_synced,
)
await _log_user_action("reject", False, getattr(approval, "incident_id", None))
@@ -236,6 +321,7 @@ async def telegram_webhook(
"message": "Rejected",
"approval_id": approval_id,
"status": approval.status.value,
"incident_synced": incident_synced,
}
return {"ok": False, "message": "Unknown action"}

View File

@@ -0,0 +1,174 @@
from types import SimpleNamespace
from uuid import UUID
import pytest
from src.api.v1 import telegram as telegram_api
class _FakeGateway:
def __init__(self, result: dict) -> None:
self.result = result
async def handle_callback(self, **_kwargs):
return self.result
class _FakeApprovalService:
def __init__(self, approval, execution_triggered: bool) -> None:
self.approval = approval
self.execution_triggered = execution_triggered
async def sign_approval(self, **_kwargs):
return self.approval, "Approval complete", self.execution_triggered
async def reject_approval(self, **_kwargs):
return self.approval, "Approval rejected"
class _FakeAlertOperationLogRepository:
def __init__(self) -> None:
self.rows: list[dict] = []
async def append(self, *args, **kwargs):
self.rows.append({"args": args, "kwargs": kwargs})
def _callback_update(callback_data: str) -> telegram_api.TelegramUpdate:
return telegram_api.TelegramUpdate(
update_id=123,
callback_query={
"id": "callback-1",
"data": callback_data,
"from": {"id": 42, "username": "ops"},
"message": {"message_id": 99, "text": "ACTION REQUIRED"},
},
)
@pytest.mark.asyncio
async def test_telegram_approval_schedules_executor_after_required_signature(monkeypatch):
approval_id = "11111111-1111-1111-1111-111111111111"
approval = SimpleNamespace(
id=UUID(approval_id),
status=SimpleNamespace(value="approved"),
incident_id="INC-20260513-TGEXEC",
)
finalizer_calls: list[dict] = []
op_log_repo = _FakeAlertOperationLogRepository()
async def fake_finalize(*, approval, execution_triggered: bool) -> bool:
finalizer_calls.append({
"approval_id": str(approval.id),
"incident_id": approval.incident_id,
"execution_triggered": execution_triggered,
})
return True
monkeypatch.setattr(
telegram_api,
"get_telegram_gateway",
lambda: _FakeGateway({
"success": True,
"action": "approve",
"approval_id": approval_id,
"user": {"id": 42, "username": "ops"},
}),
)
monkeypatch.setattr(
telegram_api,
"get_approval_service",
lambda: _FakeApprovalService(approval, execution_triggered=True),
)
monkeypatch.setattr(telegram_api, "_finalize_telegram_approval", fake_finalize)
monkeypatch.setattr(
"src.repositories.alert_operation_log_repository.get_alert_operation_log_repository",
lambda: op_log_repo,
)
result = await telegram_api.telegram_webhook(_callback_update(f"approve:{approval_id}:ts:nonce"))
assert result["ok"] is True
assert result["message"] == "Approved"
assert result["execution_triggered"] is True
assert result["execution_scheduled"] is True
assert finalizer_calls == [{
"approval_id": approval_id,
"incident_id": "INC-20260513-TGEXEC",
"execution_triggered": True,
}]
assert op_log_repo.rows[0]["kwargs"]["action_detail"] == "approve"
@pytest.mark.asyncio
async def test_telegram_rejection_syncs_incident_state(monkeypatch):
approval_id = "22222222-2222-2222-2222-222222222222"
approval = SimpleNamespace(
id=UUID(approval_id),
status=SimpleNamespace(value="rejected"),
incident_id="INC-20260513-TGREJ",
)
sync_calls: list[str] = []
op_log_repo = _FakeAlertOperationLogRepository()
async def fake_sync(rejected_approval_id: str) -> bool:
sync_calls.append(rejected_approval_id)
return True
monkeypatch.setattr(
telegram_api,
"get_telegram_gateway",
lambda: _FakeGateway({
"success": True,
"action": "reject",
"approval_id": approval_id,
"user": {"id": 42, "username": "ops"},
}),
)
monkeypatch.setattr(
telegram_api,
"get_approval_service",
lambda: _FakeApprovalService(approval, execution_triggered=False),
)
monkeypatch.setattr(telegram_api, "_sync_telegram_rejection", fake_sync)
monkeypatch.setattr(
"src.repositories.alert_operation_log_repository.get_alert_operation_log_repository",
lambda: op_log_repo,
)
result = await telegram_api.telegram_webhook(_callback_update(f"reject:{approval_id}:ts:nonce"))
assert result["ok"] is True
assert result["message"] == "Rejected"
assert result["incident_synced"] is True
assert sync_calls == [approval_id]
assert op_log_repo.rows[0]["kwargs"]["action_detail"] == "reject"
@pytest.mark.asyncio
async def test_finalize_telegram_approval_runs_executor_task(monkeypatch):
executed: list[str] = []
approval = SimpleNamespace(
id=UUID("33333333-3333-3333-3333-333333333333"),
incident_id="INC-20260513-TGRUN",
)
class _FakeExecutionService:
async def execute_approved_action(self, received_approval):
executed.append(str(received_approval.id))
return True
monkeypatch.setattr(
telegram_api,
"get_execution_service",
lambda: _FakeExecutionService(),
)
scheduled = await telegram_api._finalize_telegram_approval(
approval=approval,
execution_triggered=True,
)
assert scheduled is True
await telegram_api.asyncio.sleep(0)
assert executed == ["33333333-3333-3333-3333-333333333333"]