diff --git a/apps/api/src/api/v1/telegram.py b/apps/api/src/api/v1/telegram.py index c0f9a70a..83236fbe 100644 --- a/apps/api/src/api/v1/telegram.py +++ b/apps/api/src/api/v1/telegram.py @@ -19,6 +19,7 @@ Endpoints: - 每個 Nonce 只能使用一次 """ +import asyncio from uuid import UUID from fastapi import APIRouter, HTTPException, status @@ -27,6 +28,8 @@ from pydantic import BaseModel from src.core.config import settings from src.core.logging import get_logger from src.services.approval_db import get_approval_service +from src.services.approval_execution import get_execution_service +from src.services.incident_approval_service import get_incident_approval_service from src.services.security_interceptor import ( NonceReplayError, UserNotWhitelistedError, @@ -64,6 +67,80 @@ class TestPushRequest(BaseModel): incident_id: str = "" +async def _run_telegram_approved_execution(approval) -> None: + """Run the approved action that originated from a Telegram callback.""" + approval_id = str(getattr(approval, "id", "")) + incident_id = getattr(approval, "incident_id", None) + try: + result = await get_execution_service().execute_approved_action(approval) + logger.info( + "telegram_approval_execution_completed", + approval_id=approval_id, + incident_id=incident_id, + success=bool(result), + ) + except Exception as exc: + logger.error( + "telegram_approval_execution_failed", + approval_id=approval_id, + incident_id=incident_id, + error=str(exc), + ) + + +def _schedule_telegram_approved_execution(approval) -> bool: + """Schedule execution after Telegram approval reaches required signatures.""" + try: + asyncio.create_task(_run_telegram_approved_execution(approval)) + logger.info( + "telegram_approval_execution_scheduled", + approval_id=str(getattr(approval, "id", "")), + incident_id=getattr(approval, "incident_id", None), + ) + return True + except Exception as exc: + logger.error( + "telegram_approval_execution_schedule_failed", + approval_id=str(getattr(approval, "id", "")), + incident_id=getattr(approval, "incident_id", None), + error=str(exc), + ) + return False + + +async def _finalize_telegram_approval(approval, execution_triggered: bool) -> bool: + """Complete the execution handoff for Telegram approvals. + + ApprovalDBService only records the signature/status transition. The actual + executor scheduling lives in API callers, so Telegram must mirror the REST + approval endpoint instead of stopping at a visual approval stamp. + """ + if not execution_triggered: + return False + return _schedule_telegram_approved_execution(approval) + + +async def _sync_telegram_rejection(approval_id: str) -> bool: + """Keep Incident state aligned when an approval is rejected from Telegram.""" + try: + await get_incident_approval_service().on_approval_status_change( + approval_id=approval_id, + new_status="rejected", + ) + logger.info( + "telegram_rejection_incident_synced", + approval_id=approval_id, + ) + return True + except Exception as exc: + logger.error( + "telegram_rejection_incident_sync_failed", + approval_id=approval_id, + error=str(exc), + ) + return False + + # ============================================================================= # Endpoints # ============================================================================= @@ -198,12 +275,17 @@ async def telegram_webhook( ) if approval: + execution_scheduled = await _finalize_telegram_approval( + approval=approval, + execution_triggered=execution_triggered, + ) logger.info( "telegram_approval_signed", approval_id=approval_id, user_id=user_id, status=approval.status.value, execution_triggered=execution_triggered, + execution_scheduled=execution_scheduled, ) await _log_user_action("approve", True, getattr(approval, "incident_id", None)) @@ -213,6 +295,7 @@ async def telegram_webhook( "approval_id": approval_id, "status": approval.status.value, "execution_triggered": execution_triggered, + "execution_scheduled": execution_scheduled, } elif action == "reject": @@ -224,10 +307,12 @@ async def telegram_webhook( ) if approval: + incident_synced = await _sync_telegram_rejection(approval_id) logger.info( "telegram_approval_rejected", approval_id=approval_id, user_id=user_id, + incident_synced=incident_synced, ) await _log_user_action("reject", False, getattr(approval, "incident_id", None)) @@ -236,6 +321,7 @@ async def telegram_webhook( "message": "Rejected", "approval_id": approval_id, "status": approval.status.value, + "incident_synced": incident_synced, } return {"ok": False, "message": "Unknown action"} diff --git a/apps/api/tests/test_telegram_webhook_execution_handoff.py b/apps/api/tests/test_telegram_webhook_execution_handoff.py new file mode 100644 index 00000000..697a5c79 --- /dev/null +++ b/apps/api/tests/test_telegram_webhook_execution_handoff.py @@ -0,0 +1,174 @@ +from types import SimpleNamespace +from uuid import UUID + +import pytest + +from src.api.v1 import telegram as telegram_api + + +class _FakeGateway: + def __init__(self, result: dict) -> None: + self.result = result + + async def handle_callback(self, **_kwargs): + return self.result + + +class _FakeApprovalService: + def __init__(self, approval, execution_triggered: bool) -> None: + self.approval = approval + self.execution_triggered = execution_triggered + + async def sign_approval(self, **_kwargs): + return self.approval, "Approval complete", self.execution_triggered + + async def reject_approval(self, **_kwargs): + return self.approval, "Approval rejected" + + +class _FakeAlertOperationLogRepository: + def __init__(self) -> None: + self.rows: list[dict] = [] + + async def append(self, *args, **kwargs): + self.rows.append({"args": args, "kwargs": kwargs}) + + +def _callback_update(callback_data: str) -> telegram_api.TelegramUpdate: + return telegram_api.TelegramUpdate( + update_id=123, + callback_query={ + "id": "callback-1", + "data": callback_data, + "from": {"id": 42, "username": "ops"}, + "message": {"message_id": 99, "text": "ACTION REQUIRED"}, + }, + ) + + +@pytest.mark.asyncio +async def test_telegram_approval_schedules_executor_after_required_signature(monkeypatch): + approval_id = "11111111-1111-1111-1111-111111111111" + approval = SimpleNamespace( + id=UUID(approval_id), + status=SimpleNamespace(value="approved"), + incident_id="INC-20260513-TGEXEC", + ) + finalizer_calls: list[dict] = [] + op_log_repo = _FakeAlertOperationLogRepository() + + async def fake_finalize(*, approval, execution_triggered: bool) -> bool: + finalizer_calls.append({ + "approval_id": str(approval.id), + "incident_id": approval.incident_id, + "execution_triggered": execution_triggered, + }) + return True + + monkeypatch.setattr( + telegram_api, + "get_telegram_gateway", + lambda: _FakeGateway({ + "success": True, + "action": "approve", + "approval_id": approval_id, + "user": {"id": 42, "username": "ops"}, + }), + ) + monkeypatch.setattr( + telegram_api, + "get_approval_service", + lambda: _FakeApprovalService(approval, execution_triggered=True), + ) + monkeypatch.setattr(telegram_api, "_finalize_telegram_approval", fake_finalize) + monkeypatch.setattr( + "src.repositories.alert_operation_log_repository.get_alert_operation_log_repository", + lambda: op_log_repo, + ) + + result = await telegram_api.telegram_webhook(_callback_update(f"approve:{approval_id}:ts:nonce")) + + assert result["ok"] is True + assert result["message"] == "Approved" + assert result["execution_triggered"] is True + assert result["execution_scheduled"] is True + assert finalizer_calls == [{ + "approval_id": approval_id, + "incident_id": "INC-20260513-TGEXEC", + "execution_triggered": True, + }] + assert op_log_repo.rows[0]["kwargs"]["action_detail"] == "approve" + + +@pytest.mark.asyncio +async def test_telegram_rejection_syncs_incident_state(monkeypatch): + approval_id = "22222222-2222-2222-2222-222222222222" + approval = SimpleNamespace( + id=UUID(approval_id), + status=SimpleNamespace(value="rejected"), + incident_id="INC-20260513-TGREJ", + ) + sync_calls: list[str] = [] + op_log_repo = _FakeAlertOperationLogRepository() + + async def fake_sync(rejected_approval_id: str) -> bool: + sync_calls.append(rejected_approval_id) + return True + + monkeypatch.setattr( + telegram_api, + "get_telegram_gateway", + lambda: _FakeGateway({ + "success": True, + "action": "reject", + "approval_id": approval_id, + "user": {"id": 42, "username": "ops"}, + }), + ) + monkeypatch.setattr( + telegram_api, + "get_approval_service", + lambda: _FakeApprovalService(approval, execution_triggered=False), + ) + monkeypatch.setattr(telegram_api, "_sync_telegram_rejection", fake_sync) + monkeypatch.setattr( + "src.repositories.alert_operation_log_repository.get_alert_operation_log_repository", + lambda: op_log_repo, + ) + + result = await telegram_api.telegram_webhook(_callback_update(f"reject:{approval_id}:ts:nonce")) + + assert result["ok"] is True + assert result["message"] == "Rejected" + assert result["incident_synced"] is True + assert sync_calls == [approval_id] + assert op_log_repo.rows[0]["kwargs"]["action_detail"] == "reject" + + +@pytest.mark.asyncio +async def test_finalize_telegram_approval_runs_executor_task(monkeypatch): + executed: list[str] = [] + approval = SimpleNamespace( + id=UUID("33333333-3333-3333-3333-333333333333"), + incident_id="INC-20260513-TGRUN", + ) + + class _FakeExecutionService: + async def execute_approved_action(self, received_approval): + executed.append(str(received_approval.id)) + return True + + monkeypatch.setattr( + telegram_api, + "get_execution_service", + lambda: _FakeExecutionService(), + ) + + scheduled = await telegram_api._finalize_telegram_approval( + approval=approval, + execution_triggered=True, + ) + + assert scheduled is True + await telegram_api.asyncio.sleep(0) + assert executed == ["33333333-3333-3333-3333-333333333333"]