49 lines
1.5 KiB
Python
49 lines
1.5 KiB
Python
from src.services.alert_chain_metrics_service import (
|
|
AlertChainEvidenceSample,
|
|
merge_alert_chain_evidence_rows,
|
|
)
|
|
|
|
|
|
def test_merge_alert_chain_evidence_rows_keeps_newest_per_source() -> None:
    """Merging yields one sample per source, carrying the newest timestamp.

    The input holds two rows whose ``source`` differs only in letter case
    ("Alertmanager" / "alertmanager") plus one "sentry" row whose timestamp
    is the string "90". The expected result is a single lower-cased
    "alertmanager" sample taken from the row with the larger timestamp,
    followed by a "sentry" sample whose timestamp is the float 90.0.
    """
    alertmanager_older = {
        "source": "Alertmanager",
        "last_success_timestamp": 100.0,
        "evidence_source": "alert_operation_log",
    }
    alertmanager_newer = {
        "source": "alertmanager",
        "last_success_timestamp": 120.0,
        "evidence_source": "awooop_conversation_event",
    }
    sentry_row = {
        "source": "sentry",
        "last_success_timestamp": "90",
        "evidence_source": "awooop_conversation_event",
    }

    merged = merge_alert_chain_evidence_rows(
        [alertmanager_older, alertmanager_newer, sentry_row],
    )

    expected_alertmanager = AlertChainEvidenceSample(
        source="alertmanager",
        last_success_timestamp=120.0,
        evidence_source="awooop_conversation_event",
    )
    expected_sentry = AlertChainEvidenceSample(
        source="sentry",
        last_success_timestamp=90.0,
        evidence_source="awooop_conversation_event",
    )
    assert merged == [expected_alertmanager, expected_sentry]
|
|
|
|
|
|
def test_merge_alert_chain_evidence_rows_ignores_invalid_rows() -> None:
    """Rows that cannot produce a usable sample are dropped from the merge.

    Four rejected shapes are exercised: an empty ``source``, a ``None``
    timestamp, a timestamp string that is not numeric, and a timestamp of
    zero. With nothing else in the input, the merged result must be an
    empty list.
    """
    empty_source = {"source": "", "last_success_timestamp": 100.0}
    missing_timestamp = {"source": "signoz", "last_success_timestamp": None}
    unparseable_timestamp = {
        "source": "sentry",
        "last_success_timestamp": "not-a-number",
    }
    zero_timestamp = {"source": "alertmanager", "last_success_timestamp": 0}

    merged = merge_alert_chain_evidence_rows(
        [
            empty_source,
            missing_timestamp,
            unparseable_timestamp,
            zero_timestamp,
        ],
    )

    assert merged == []
|