Files
awoooi/apps/api/tests/test_emergency_escalation_service.py
Your Name 0367dde686
All checks were successful
Code Review / ai-code-review (push) Successful in 9s
CD Pipeline / tests (push) Successful in 1m4s
CD Pipeline / build-and-deploy (push) Successful in 3m41s
CD Pipeline / post-deploy-checks (push) Successful in 1m39s
fix(drift): dedupe blocked auto-adopt escalations
2026-05-19 00:13:41 +08:00

78 lines
2.4 KiB
Python

from types import SimpleNamespace
import pytest
from src.services import emergency_escalation_service as service
@pytest.mark.asyncio
async def test_drift_emergency_escalation_writes_aol_and_timeline(monkeypatch):
    """Check the side effects of escalating a blocked drift auto-adopt.

    The dedup helper, Telegram gateway, alert-operation-log repository and
    timeline service are swapped for in-memory recorders. After one call to
    ``escalate_drift_auto_adopt_blocked`` the test expects:

    * an escalation card for the report, carrying a ``dfp_`` fingerprint;
    * a dedup lookup keyed on that fingerprint with a 86400 second TTL;
    * an ``APPROVAL_ESCALATED`` operation-log entry with actor and context;
    * a timeline event with the ``emergency_intervention`` actor role.
    """
    # One recorder list per patched collaborator.
    card_log = []
    oplog_log = []
    timeline_log = []
    dedup_log = []

    async def record_dedup(*args, **kwargs):
        # Always report "first send" so the escalation path runs.
        dedup_log.append((args, kwargs))
        return True

    class StubGateway:
        async def send_escalation_card(self, **kwargs):
            card_log.append(kwargs)

    class StubRepo:
        async def append(self, *args, **kwargs):
            oplog_log.append((args, kwargs))
            return object()

    class StubTimeline:
        async def add_event(self, **kwargs):
            timeline_log.append(kwargs)
            return {"id": "timeline-1"}

    monkeypatch.setattr(service, "_dedup_first_send", record_dedup)

    # Each accessor is replaced by the stub class itself: calling it with no
    # arguments yields a fresh stub instance, just like a factory function.
    accessor_stubs = (
        ("src.services.telegram_gateway.get_telegram_gateway", StubGateway),
        (
            "src.repositories.alert_operation_log_repository.get_alert_operation_log_repository",
            StubRepo,
        ),
        ("src.services.approval_db.get_timeline_service", StubTimeline),
    )
    for dotted_target, stub_factory in accessor_stubs:
        monkeypatch.setattr(dotted_target, stub_factory)

    # Minimal stand-ins exposing only the attributes set here.
    drift_items = [
        SimpleNamespace(is_allowlisted=allowlisted)
        for allowlisted in (False, True)
    ]
    report = SimpleNamespace(
        report_id="drift-123",
        namespace="awoooi-prod",
        high_count=1,
        medium_count=2,
        items=drift_items,
    )
    intent = SimpleNamespace(value="emergency_hotfix")
    interpretation = SimpleNamespace(intent=intent, confidence=0.72, risk="high")

    await service.escalate_drift_auto_adopt_blocked(
        report=report,
        reason="unsafe drift",
        interpretation=interpretation,
    )

    # Escalation card.
    assert card_log
    first_card = card_log[0]
    assert first_card["incident_id"] == "drift-123"
    assert "fingerprint=dfp_" in first_card["current_impact"]

    # Dedup lookup: key is the first positional argument, TTL is a keyword.
    dedup_args, dedup_kwargs = dedup_log[0]
    assert dedup_args[0].startswith("drift:auto_adopt_emergency:fp:dfp_")
    assert dedup_kwargs["ttl"] == 86400

    # Alert operation log entry.
    assert oplog_log
    oplog_args, oplog_kwargs = oplog_log[0]
    assert oplog_args[0] == "APPROVAL_ESCALATED"
    assert oplog_kwargs["actor"] == "drift_auto_adopt"
    assert oplog_kwargs["context"]["intent"] == "emergency_hotfix"
    assert oplog_kwargs["context"]["fingerprint"].startswith("dfp_")

    # Timeline event.
    assert timeline_log
    assert timeline_log[0]["actor_role"] == "emergency_intervention"