Files
awoooi/apps/api/tests/test_source_provider_upstream_canary.py
Your Name f3fbd39898
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 5m50s
CD Pipeline / build-and-deploy (push) Successful in 3m58s
CD Pipeline / post-deploy-checks (push) Successful in 1m48s
feat(awooop): add provider upstream canary
2026-05-20 20:48:36 +08:00

171 lines
5.2 KiB
Python

from __future__ import annotations
import json
from uuid import uuid4
import pytest
from fastapi import BackgroundTasks, HTTPException
from starlette.requests import Request
def _request_with_json(payload: dict, headers: dict[str, str] | None = None) -> Request:
    """Build a POST ``Request`` for path ``/`` whose body is ``payload`` as JSON.

    Args:
        payload: Object serialized with ``json.dumps`` and UTF-8 encoded to
            form the request body.
        headers: Optional header mapping. Header names are lower-cased, and
            names and values are latin-1 encoded into the scope's header list.
            ``None`` (or an empty mapping) yields no headers.

    Returns:
        A ``Request`` built from a minimal HTTP scope and a receive callable
        that returns the whole encoded body in one final message.
    """
    encoded_body = json.dumps(payload).encode("utf-8")

    header_pairs = []
    for name, value in (headers or {}).items():
        header_pairs.append((name.lower().encode("latin-1"), value.encode("latin-1")))

    async def _receive_body() -> dict:
        # Every call returns the same complete-body message (no chunking).
        return {"type": "http.request", "body": encoded_body, "more_body": False}

    scope = {
        "type": "http",
        "method": "POST",
        "path": "/",
        "headers": header_pairs,
    }
    return Request(scope, _receive_body)
def _operator_headers() -> dict[str, str]:
    """Return the AwoooP operator id and key headers used by these tests.

    A new dict is created on every call, so callers may mutate the result.
    """
    operator_id = "gitea-e2e-health"
    operator_key = "secret"
    return {
        "X-AwoooP-Operator-Id": operator_id,
        "X-AwoooP-Operator-Key": operator_key,
    }
def _sentry_canary_payload() -> dict:
    """Return a fresh webhook payload carrying an ``awoooi_canary=true`` tag.

    The payload has an ``action`` of ``"triggered"`` and a ``data`` section
    with an ``issue`` and an ``event``. Every call builds new dict and list
    objects, so a caller can mutate the result without affecting later calls.
    """
    issue = {
        "id": "awoooi-canary-test-run",
        "shortId": "AWOOOI-CANARY",
        "title": "AwoooPSourceProviderCanary",
        "culprit": "source-provider-ingestion",
        "level": "info",
        "project": {"slug": "awoooi"},
    }
    event = {
        "message": "AwoooP upstream source provider canary",
        # Tags are [name, value] pairs.
        "tags": [["awoooi_canary", "true"], ["run_ref", "test-run"]],
    }
    return {"action": "triggered", "data": {"issue": issue, "event": event}}
@pytest.mark.asyncio
async def test_sentry_upstream_canary_records_without_signature(monkeypatch) -> None:
    """A Sentry canary sent with operator headers only is recorded as a canary.

    The request carries the operator id/key headers and no other headers.
    ``record_external_alert_event`` is replaced with a fake that captures its
    keyword arguments, and the test checks the handler's result plus the
    arguments of the first captured call.
    """
    from src.api.v1 import sentry_webhook
    from src.core.config import settings

    recorded: list[dict] = []

    async def _capture_event(**kwargs):
        # Stand-in for the real recorder: keep the kwargs, return a new UUID.
        recorded.append(kwargs)
        return uuid4()

    monkeypatch.setattr(settings, "ENVIRONMENT", "prod")
    # Same value as the key sent by _operator_headers().
    monkeypatch.setattr(settings, "AWOOOP_OPERATOR_API_KEY", "secret")
    monkeypatch.setattr(sentry_webhook, "record_external_alert_event", _capture_event)

    request = _request_with_json(_sentry_canary_payload(), _operator_headers())
    result = await sentry_webhook.handle_sentry_error(request, BackgroundTasks())

    no_side_effects = {
        "incident_created": False,
        "approval_created": False,
        "telegram_sent": False,
        "openclaw_called": False,
    }
    assert result["status"] == "canary_recorded"
    assert result["side_effects"] == no_side_effects

    first_call = recorded[0]
    assert first_call["provider"] == "sentry"
    assert first_call["stage"] == "upstream_canary"
    assert first_call["event_id"] == "awoooi-canary-test-run"
    assert first_call["labels"]["incident"] == "not_created"
@pytest.mark.asyncio
async def test_sentry_non_canary_still_requires_signature(monkeypatch) -> None:
    """A non-canary Sentry payload is rejected with 401 despite operator headers.

    The canary payload is rewritten into an ordinary issue (new id, short id
    and title, and an empty tag list) and sent with the operator headers and
    no other headers. The handler is expected to raise ``HTTPException`` with
    status 401.
    """
    from src.api.v1 import sentry_webhook
    from src.core.config import settings

    monkeypatch.setattr(settings, "ENVIRONMENT", "prod")
    monkeypatch.setattr(settings, "SENTRY_WEBHOOK_SECRET", "required-secret")
    # Pin the operator key to the value sent by _operator_headers(), as the
    # sibling canary tests do. Without this the outcome depends on whatever
    # key the ambient settings hold, so the 401 could not be attributed to
    # the payload being a non-canary.
    monkeypatch.setattr(settings, "AWOOOP_OPERATOR_API_KEY", "secret")

    payload = _sentry_canary_payload()
    issue = payload["data"]["issue"]
    issue["id"] = "real-issue-1"
    issue["shortId"] = "REAL-1"
    issue["title"] = "Real issue"
    # Dropping the tags removes the awoooi_canary marker from the event.
    payload["data"]["event"]["tags"] = []

    with pytest.raises(HTTPException) as exc_info:
        await sentry_webhook.handle_sentry_error(
            _request_with_json(payload, _operator_headers()),
            BackgroundTasks(),
        )

    assert exc_info.value.status_code == 401
@pytest.mark.asyncio
async def test_signoz_upstream_canary_records_without_incident_side_effects(monkeypatch) -> None:
    """A SigNoz canary alert is recorded as a canary with no side effects.

    A single firing alert labelled ``awoooi_canary=true`` is posted with the
    operator headers. ``record_external_alert_event`` is replaced with a fake
    that captures its keyword arguments, and the test checks the per-alert
    result plus the arguments of the first captured call.
    """
    from src.api.v1 import signoz_webhook
    from src.core.config import settings

    recorded: list[dict] = []

    async def _capture_event(**kwargs):
        # Stand-in for the real recorder: keep the kwargs, return a new UUID.
        recorded.append(kwargs)
        return uuid4()

    monkeypatch.setattr(settings, "ENVIRONMENT", "prod")
    # Same value as the key sent by _operator_headers().
    monkeypatch.setattr(settings, "AWOOOP_OPERATOR_API_KEY", "secret")
    monkeypatch.setattr(signoz_webhook, "record_external_alert_event", _capture_event)

    canary_labels = {
        "alertname": "AwoooPSourceProviderCanary",
        "severity": "info",
        "namespace": "awoooi-prod",
        "service": "source-provider-ingestion",
        "awoooi_canary": "true",
        "run_ref": "test-run",
    }
    canary_alert = {
        "status": "firing",
        "alertname": "AwoooPSourceProviderCanary",
        "labels": canary_labels,
        "annotations": {"summary": "AwoooP upstream source provider canary"},
    }
    payload = {"alerts": [canary_alert]}

    request = _request_with_json(payload, _operator_headers())
    result = await signoz_webhook.handle_signoz_alert(request, BackgroundTasks())

    no_side_effects = {
        "incident_created": False,
        "approval_created": False,
        "telegram_sent": False,
        "openclaw_called": False,
    }
    assert result["status"] == "ok"
    assert result["results"][0]["status"] == "canary_recorded"
    assert result["results"][0]["side_effects"] == no_side_effects

    first_call = recorded[0]
    assert first_call["provider"] == "signoz"
    assert first_call["stage"] == "upstream_canary"
    assert first_call["labels"]["approval"] == "not_created"