Files
awoooi/apps/api/tests/test_channel_event_dossier_service.py
Your Name e947e60d11
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 1m3s
CD Pipeline / build-and-deploy (push) Successful in 3m47s
CD Pipeline / post-deploy-checks (push) Successful in 1m43s
fix(awooop): type dossier run filter
2026-05-13 22:08:00 +08:00

117 lines
3.5 KiB
Python

from __future__ import annotations
import pytest
from fastapi import HTTPException
from uuid import UUID
from src.services import channel_event_dossier_service
from src.services.channel_event_dossier_service import (
build_dossier_event,
fetch_channel_event_dossier,
)
def test_build_dossier_event_summarizes_source_envelope() -> None:
event = build_dossier_event({
"event_id": "event-1",
"project_id": "awoooi",
"channel_type": "internal",
"provider_event_id": "sentry:received:issue-1",
"content_hash": "h" * 64,
"content_preview": "Sentry issue",
"content_redacted": "Sentry issue redacted",
"redaction_version": "audit_sink_v1",
"source_envelope": {
"provider": "sentry",
"stage": "received",
"source_url": "https://sentry.example.invalid/issues/issue-1",
"content_sha256": "a" * 64,
"content_length": 42,
"source_refs": {
"event_ids": ["issue-1"],
"sentry_issue_ids": ["issue-1", "sentry:received:issue-1"],
"fingerprints": ["sentry-issue-1"],
},
"log_correlation": {
"alertname": "Sentry Issue",
"severity": "error",
"namespace": "sentry",
"target_resource": "frontend",
"fingerprint": "sentry-issue-1",
},
},
"is_duplicate": False,
"provider_ts": None,
"received_at": "2026-05-13T13:46:00",
})
assert event["provider"] == "sentry"
assert event["stage"] == "received"
assert event["alertname"] == "Sentry Issue"
assert event["severity"] == "error"
assert event["source_ref_count"] == 4
assert event["has_redacted_content"] is True
assert event["content_sha256"] == "a" * 64
@pytest.mark.asyncio
async def test_fetch_channel_event_dossier_requires_source() -> None:
with pytest.raises(HTTPException) as exc_info:
await fetch_channel_event_dossier(
project_id="awoooi",
run_id=None,
provider_event_id=None,
limit=20,
)
assert exc_info.value.status_code == 422
@pytest.mark.asyncio
async def test_fetch_channel_event_dossier_uses_typed_run_filter(monkeypatch) -> None:
captured: dict[str, object] = {}
class FakeMappings:
def all(self) -> list[dict[str, object]]:
return []
class FakeResult:
def mappings(self) -> FakeMappings:
return FakeMappings()
class FakeDb:
async def execute(self, statement, params): # noqa: ANN001
captured["sql"] = str(statement)
captured["params"] = params
return FakeResult()
class FakeContext:
async def __aenter__(self) -> FakeDb:
return FakeDb()
async def __aexit__(self, exc_type, exc, tb) -> None: # noqa: ANN001
return None
monkeypatch.setattr(
channel_event_dossier_service,
"get_db_context",
lambda _project_id: FakeContext(),
)
run_id = UUID("0a4c365f-609e-5441-bc29-4c7ebc3603b6")
result = await fetch_channel_event_dossier(
project_id="awoooi",
run_id=run_id,
provider_event_id=None,
limit=20,
)
assert result["total"] == 0
assert "run_id = CAST(:run_id AS uuid)" in str(captured["sql"])
assert ":run_id IS NULL" not in str(captured["sql"])
assert captured["params"] == {
"project_id": "awoooi",
"run_id": str(run_id),
"limit": 20,
}