Files
awoooi/apps/api/tests/test_emergency_escalation_service.py
Your Name 7795f027d2
Some checks failed
CD Pipeline / tests (push) Successful in 2m56s
Code Review / ai-code-review (push) Failing after 39s
CD Pipeline / build-and-deploy (push) Successful in 12m54s
CD Pipeline / post-deploy-checks (push) Successful in 4m40s
fix(aiops): persist emergency intervention traces
2026-05-01 20:34:33 +08:00

72 lines
2.1 KiB
Python

from types import SimpleNamespace
import pytest
from src.services import emergency_escalation_service as service
@pytest.mark.asyncio
async def test_drift_emergency_escalation_writes_aol_and_timeline(monkeypatch):
sent_cards = []
aol_calls = []
timeline_calls = []
async def fake_dedup(*args, **kwargs):
return True
class FakeGateway:
async def send_escalation_card(self, **kwargs):
sent_cards.append(kwargs)
class FakeRepo:
async def append(self, *args, **kwargs):
aol_calls.append((args, kwargs))
return object()
class FakeTimeline:
async def add_event(self, **kwargs):
timeline_calls.append(kwargs)
return {"id": "timeline-1"}
monkeypatch.setattr(service, "_dedup_first_send", fake_dedup)
monkeypatch.setattr(
"src.services.telegram_gateway.get_telegram_gateway",
lambda: FakeGateway(),
)
monkeypatch.setattr(
"src.repositories.alert_operation_log_repository.get_alert_operation_log_repository",
lambda: FakeRepo(),
)
monkeypatch.setattr(
"src.services.approval_db.get_timeline_service",
lambda: FakeTimeline(),
)
report = SimpleNamespace(
report_id="drift-123",
namespace="awoooi-prod",
high_count=1,
medium_count=2,
items=[
SimpleNamespace(is_allowlisted=False),
SimpleNamespace(is_allowlisted=True),
],
)
interpretation = SimpleNamespace(
intent=SimpleNamespace(value="emergency_hotfix"),
confidence=0.72,
risk="high",
)
await service.escalate_drift_auto_adopt_blocked(
report=report,
reason="unsafe drift",
interpretation=interpretation,
)
assert sent_cards and sent_cards[0]["incident_id"] == "drift-123"
assert aol_calls and aol_calls[0][0][0] == "APPROVAL_ESCALATED"
assert aol_calls[0][1]["actor"] == "drift_auto_adopt"
assert aol_calls[0][1]["context"]["intent"] == "emergency_hotfix"
assert timeline_calls and timeline_calls[0]["actor_role"] == "emergency_intervention"