99 lines
3.1 KiB
Python
99 lines
3.1 KiB
Python
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
from src.api.v1.platform import events
|
|
from src.core.context import (
|
|
clear_project_context,
|
|
get_current_project_context,
|
|
set_project_context,
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_source_correlation_review_uses_body_project_context(monkeypatch):
    """Check that the review endpoint runs its fetch under the body's project.

    The ambient project context is installed with ``project_id=None``; the
    request body carries ``project_id="awoooi"``. The stubbed fetch must see
    a context sourced from ``request.body`` (keeping the ambient request id),
    and once the endpoint returns the ambient context must be back in place.
    """
    captured: dict[str, object] = {}

    async def _record_review_call(**call_kwargs):
        # Snapshot both the arguments and the context visible *during* the call.
        captured["kwargs"] = call_kwargs
        captured["context"] = get_current_project_context()
        return {"review_record_status": "recorded"}

    monkeypatch.setattr(
        events,
        "fetch_source_correlation_review_decision",
        _record_review_call,
    )

    # Context in force before (and expected again after) the endpoint call.
    ambient_context = {
        "project_id": None,
        "source": "request.project_id.missing",
        "request_id": "request-123",
    }
    # Context the stub is expected to observe while the endpoint is running.
    body_context = {
        "project_id": "awoooi",
        "source": "request.body",
        "request_id": "request-123",
    }

    context_tokens = set_project_context(**ambient_context)
    try:
        review_request = events.SourceCorrelationReviewDecisionRequest(
            project_id="awoooi",
            work_item_id="source-evidence:sentry:canary",
            decision="accepted",
            target_incident_id="INC-20260505-25E744",
        )
        result = await events.review_source_correlation_work_item(review_request)

        assert result == {"review_record_status": "recorded"}
        assert captured["kwargs"]["project_id"] == "awoooi"
        assert captured["context"] == body_context
        # The override must not leak out of the endpoint call.
        assert get_current_project_context() == ambient_context
    finally:
        clear_project_context(context_tokens)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_source_correlation_apply_uses_body_project_context(monkeypatch):
    """Check that the apply endpoint runs its fetch under the body's project.

    The ambient project context is installed with ``project_id=None``; the
    request body carries ``project_id="awoooi"``. The stubbed fetch must see
    a context sourced from ``request.body`` (keeping the ambient request id),
    and once the endpoint returns the ambient context must be back in place.
    """
    captured: dict[str, object] = {}

    async def _record_apply_call(**call_kwargs):
        # Snapshot both the arguments and the context visible *during* the call.
        captured["kwargs"] = call_kwargs
        captured["context"] = get_current_project_context()
        return {"apply_status": "applied"}

    monkeypatch.setattr(
        events,
        "fetch_source_correlation_apply",
        _record_apply_call,
    )

    # Context in force before (and expected again after) the endpoint call.
    ambient_context = {
        "project_id": None,
        "source": "request.project_id.missing",
        "request_id": "request-456",
    }
    # Context the stub is expected to observe while the endpoint is running.
    body_context = {
        "project_id": "awoooi",
        "source": "request.body",
        "request_id": "request-456",
    }

    context_tokens = set_project_context(**ambient_context)
    try:
        apply_request = events.SourceCorrelationApplyRequest(
            project_id="awoooi",
            work_item_id="source-evidence:sentry:canary",
        )
        result = await events.apply_source_correlation_work_item(apply_request)

        assert result == {"apply_status": "applied"}
        assert captured["kwargs"]["project_id"] == "awoooi"
        assert captured["context"] == body_context
        # The override must not leak out of the endpoint call.
        assert get_current_project_context() == ambient_context
    finally:
        clear_project_context(context_tokens)
|