fix(metrics): refresh alert chain timestamp from durable evidence
This commit is contained in:
@@ -97,6 +97,7 @@ from src.routers import proposals as proposals_router
|
||||
# Legacy route imports (to be migrated)
|
||||
from src.routes import agent, notifications, pipelines, plugins
|
||||
from src.services.adr100_slo_metrics_service import get_adr100_slo_metrics_service
|
||||
from src.services.alert_chain_metrics_service import get_alert_chain_metrics_service
|
||||
from src.services.executor import close_executor
|
||||
from src.services.flywheel_stats_service import get_flywheel_stats_service
|
||||
|
||||
@@ -1025,6 +1026,15 @@ app.include_router(platform_v1.router, prefix="/api/v1/platform", tags=["AwoooP
|
||||
@app.get("/metrics", include_in_schema=False)
|
||||
async def prometheus_metrics() -> Response:
|
||||
"""Prometheus metrics endpoint for alerting"""
|
||||
# 2026-05-19 Codex — T85 Alert Chain DB evidence refresh.
|
||||
# record_alert_chain_success() 是 process-local gauge;部署後第一個 scrape
|
||||
# 可能尚未收到新 webhook,導致 smoke test 誤判 metric 不存在。
|
||||
# 先用 AwoooP inbound / alert_operation_log 的 durable evidence 回填 last_success。
|
||||
try:
|
||||
await get_alert_chain_metrics_service().refresh_last_success_gauge()
|
||||
except Exception as exc:
|
||||
logger.warning("prometheus_metrics_alert_chain_evidence_error", error=str(exc))
|
||||
|
||||
content = generate_latest().decode("utf-8")
|
||||
# 2026-05-07 ogt + Claude Sonnet 4.6 — INC-20260507-99ADF2 修復
|
||||
# 飛輪指標(awoooi_flywheel_*)原本只在 /api/v1/stats/flywheel/metrics 暴露,
|
||||
|
||||
160
apps/api/src/services/alert_chain_metrics_service.py
Normal file
160
apps/api/src/services/alert_chain_metrics_service.py
Normal file
@@ -0,0 +1,160 @@
|
||||
"""DB-backed Alert Chain metric refresh.
|
||||
|
||||
`record_alert_chain_success()` updates Prometheus gauges in process memory. A
|
||||
deployment or Pod restart clears that memory, so the first scrape after deploy
|
||||
can report `awoooi_alert_chain_last_success_timestamp` as missing even though
|
||||
Alertmanager / Sentry / SigNoz evidence is already in DB.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from time import time
|
||||
from typing import Any, Mapping
|
||||
|
||||
import structlog
|
||||
from sqlalchemy import text
|
||||
|
||||
from src.core.metrics import ALERT_CHAIN_LAST_SUCCESS
|
||||
from src.db.base import get_db_context
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# Number of seconds for which a non-empty cached query result is reused by
# AlertChainMetricsService.refresh_last_success_gauge() before the database
# is queried again (callers can bypass the cache with force=True).
_REFRESH_INTERVAL_SECONDS = 15
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class AlertChainEvidenceSample:
    """Newest durable success evidence found for one alert-chain source.

    Instances are immutable (``frozen=True``) and compare by value.
    """

    # Normalised (stripped, lower-cased) source name; used as the ``source``
    # label of the last-success gauge.
    source: str
    # Unix epoch seconds of the newest success evidence for ``source``.
    last_success_timestamp: float
    # Where the timestamp was read from (a table name, or "db" when the row
    # did not say).
    evidence_source: str
|
||||
|
||||
|
||||
class AlertChainMetricsService:
    """Refresh process-local alert-chain gauges from durable DB evidence.

    The result of the last successful query is cached per instance and reused
    for ``_REFRESH_INTERVAL_SECONDS`` so that frequent callers do not issue a
    DB query on every call.
    """

    def __init__(self) -> None:
        # time() captured at the start of the last successful refresh;
        # 0.0 means no refresh has succeeded yet.
        self._last_refresh_at = 0.0
        # Samples returned by the last successful DB query.
        self._last_samples: list[AlertChainEvidenceSample] = []
        # Project the cached samples were fetched for. Without this, samples
        # cached for one project would be returned for a different
        # ``project_id`` requested inside the refresh window.
        self._last_project_id: str | None = None

    async def refresh_last_success_gauge(
        self,
        *,
        project_id: str = "awoooi",
        force: bool = False,
    ) -> list[AlertChainEvidenceSample]:
        """Apply the newest durable success evidence to the gauge.

        Args:
            project_id: Project whose evidence is queried.
            force: When true, always query the DB, ignoring the cache.

        Returns:
            The samples that were written to the gauge: the cached list on a
            cache hit, otherwise the freshly fetched one.

        Raises:
            Any exception raised by the DB query propagates to the caller; the
            cache is left untouched in that case.
        """
        now_ts = time()
        # An empty cached result never counts as a hit, so while no evidence
        # exists every call queries the DB.
        cache_is_fresh = (
            bool(self._last_samples)
            and project_id == self._last_project_id
            and now_ts - self._last_refresh_at < _REFRESH_INTERVAL_SECONDS
        )
        if cache_is_fresh and not force:
            # Re-apply the cached samples instead of querying the DB again.
            apply_alert_chain_last_success_samples(self._last_samples)
            return self._last_samples

        samples = await self.fetch_latest_success_samples(project_id=project_id)
        apply_alert_chain_last_success_samples(samples)
        # Only reached when the query and the gauge update both succeeded.
        self._last_samples = samples
        self._last_project_id = project_id
        self._last_refresh_at = now_ts
        return samples

    async def fetch_latest_success_samples(
        self,
        *,
        project_id: str = "awoooi",
    ) -> list[AlertChainEvidenceSample]:
        """Query the DB and return the newest evidence sample per source.

        Args:
            project_id: Passed to ``get_db_context`` and bound to the
                ``:project_id`` parameter of ``_ALERT_CHAIN_EVIDENCE_SQL``.

        Returns:
            One sample per source, as produced by
            ``merge_alert_chain_evidence_rows``.
        """
        async with get_db_context(project_id) as db:
            result = await db.execute(
                text(_ALERT_CHAIN_EVIDENCE_SQL),
                {"project_id": project_id},
            )
            return merge_alert_chain_evidence_rows(result.mappings().all())
|
||||
|
||||
|
||||
def apply_alert_chain_last_success_samples(samples: list[AlertChainEvidenceSample]) -> None:
    """Write durable last-success timestamps into the existing Prometheus gauge.

    For every sample, the ``ALERT_CHAIN_LAST_SUCCESS`` child labelled with the
    sample's ``source`` is set to the sample's timestamp. An empty list is a
    no-op.

    Do not write `awoooi_alert_chain_healthy` here: that gauge represents an
    explicit runtime failure signal. Durable success evidence should not mask a
    later in-memory failure from the same process.

    Args:
        samples: Evidence samples to apply, one gauge write per sample.
    """
    for sample in samples:
        # NOTE(review): set() overwrites unconditionally. If the in-process
        # gauge can already hold a newer timestamp than the DB evidence, this
        # would move it backwards — confirm the DB evidence is always at least
        # as new as any in-memory value.
        ALERT_CHAIN_LAST_SUCCESS.labels(source=sample.source).set(
            sample.last_success_timestamp
        )
|
||||
|
||||
|
||||
def merge_alert_chain_evidence_rows(
    rows: list[Mapping[str, Any]] | list[Any],
) -> list[AlertChainEvidenceSample]:
    """Normalize SQL rows and keep the newest evidence per source.

    Rows are skipped when the source is empty, or when the timestamp is
    missing, not convertible to ``float``, non-finite (NaN / ±inf) or not
    strictly positive. Source names are stripped and lower-cased before
    grouping, so ``"Alertmanager"`` and ``"alertmanager"`` are one source.

    Args:
        rows: Mapping-like or attribute-style rows exposing ``source``,
            ``last_success_timestamp`` and optionally ``evidence_source``.

    Returns:
        One sample per source (the one with the greatest timestamp), sorted
        by source name.
    """
    # Function-scope import keeps this fix self-contained.
    from math import isfinite

    latest_by_source: dict[str, AlertChainEvidenceSample] = {}

    for row in rows:
        source = str(_row_value(row, "source") or "").strip().lower()
        evidence_source = str(_row_value(row, "evidence_source") or "db").strip()
        raw_timestamp = _row_value(row, "last_success_timestamp")
        if not source or raw_timestamp is None:
            continue

        try:
            timestamp = float(raw_timestamp)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: integers too large to be represented as a float.
            continue
        # NaN compares false against everything, so a plain "<= 0" check would
        # let it through; reject every non-finite value explicitly.
        if not isfinite(timestamp) or timestamp <= 0:
            continue

        sample = AlertChainEvidenceSample(
            source=source,
            last_success_timestamp=timestamp,
            evidence_source=evidence_source,
        )
        previous = latest_by_source.get(source)
        if previous is None or sample.last_success_timestamp > previous.last_success_timestamp:
            latest_by_source[source] = sample

    return sorted(latest_by_source.values(), key=lambda item: item.source)


def _row_value(row: Mapping[str, Any] | Any, key: str) -> Any:
    """Read ``key`` from a mapping row or an attribute-style row.

    Returns ``None`` when the key / attribute is absent.
    """
    if isinstance(row, Mapping):
        return row.get(key)
    return getattr(row, key, None)
|
||||
|
||||
|
||||
# Newest success evidence per alert source, as Unix epoch seconds.
#
# * conversation_evidence: internal-channel rows of awooop_conversation_event
#   for the bound :project_id whose platform_subject_id is alertmanager,
#   sentry or signoz (case-insensitive); the provider timestamp is preferred
#   and received_at is the fallback.
# * alertmanager_log_evidence: ALERT_RECEIVED rows written by the
#   alertmanager actor in alert_operation_log.
#
# Both sources may yield a row for "alertmanager"; the caller keeps the newer.
# NOTE(review): the alert_operation_log CTE is not filtered by :project_id —
# confirm whether that table is project-scoped.
_ALERT_CHAIN_EVIDENCE_SQL = """
WITH conversation_evidence AS (
    SELECT
        lower(platform_subject_id) AS source,
        EXTRACT(EPOCH FROM MAX(COALESCE(provider_ts, received_at))) AS last_success_timestamp,
        'awooop_conversation_event' AS evidence_source
    FROM awooop_conversation_event
    WHERE project_id = :project_id
      AND channel_type = 'internal'
      AND lower(COALESCE(platform_subject_id, '')) IN ('alertmanager', 'sentry', 'signoz')
    GROUP BY lower(platform_subject_id)
),
alertmanager_log_evidence AS (
    SELECT
        'alertmanager' AS source,
        EXTRACT(EPOCH FROM MAX(created_at)) AS last_success_timestamp,
        'alert_operation_log' AS evidence_source
    FROM alert_operation_log
    WHERE event_type = 'ALERT_RECEIVED'
      AND actor = 'alertmanager'
)
SELECT source, last_success_timestamp, evidence_source
FROM conversation_evidence
WHERE last_success_timestamp IS NOT NULL
UNION ALL
SELECT source, last_success_timestamp, evidence_source
FROM alertmanager_log_evidence
WHERE last_success_timestamp IS NOT NULL
"""
|
||||
|
||||
|
||||
# Module-level singleton holder; stays None until
# get_alert_chain_metrics_service() creates the instance on first use.
_alert_chain_metrics_service: AlertChainMetricsService | None = None
|
||||
|
||||
|
||||
def get_alert_chain_metrics_service() -> AlertChainMetricsService:
    """Return the module-wide AlertChainMetricsService, creating it on first use.

    All callers share one instance, and therefore one evidence cache.
    """
    global _alert_chain_metrics_service
    if _alert_chain_metrics_service is None:
        _alert_chain_metrics_service = AlertChainMetricsService()
    return _alert_chain_metrics_service
|
||||
48
apps/api/tests/test_alert_chain_metrics_service.py
Normal file
48
apps/api/tests/test_alert_chain_metrics_service.py
Normal file
@@ -0,0 +1,48 @@
|
||||
from src.services.alert_chain_metrics_service import (
|
||||
AlertChainEvidenceSample,
|
||||
merge_alert_chain_evidence_rows,
|
||||
)
|
||||
|
||||
|
||||
def test_merge_alert_chain_evidence_rows_keeps_newest_per_source() -> None:
    """Rows for the same source (case-insensitive) collapse to the newest one.

    Also checks that a numeric string timestamp is converted to ``float`` and
    that the result is ordered by source name.
    """
    samples = merge_alert_chain_evidence_rows([
        {
            "source": "Alertmanager",
            "last_success_timestamp": 100.0,
            "evidence_source": "alert_operation_log",
        },
        {
            "source": "alertmanager",
            "last_success_timestamp": 120.0,
            "evidence_source": "awooop_conversation_event",
        },
        {
            "source": "sentry",
            "last_success_timestamp": "90",
            "evidence_source": "awooop_conversation_event",
        },
    ])

    assert samples == [
        AlertChainEvidenceSample(
            source="alertmanager",
            last_success_timestamp=120.0,
            evidence_source="awooop_conversation_event",
        ),
        AlertChainEvidenceSample(
            source="sentry",
            last_success_timestamp=90.0,
            evidence_source="awooop_conversation_event",
        ),
    ]
|
||||
|
||||
|
||||
def test_merge_alert_chain_evidence_rows_ignores_invalid_rows() -> None:
    """Rows with an empty source or an unusable timestamp produce no samples.

    Covers: empty source, ``None`` timestamp, non-numeric timestamp and a
    non-positive timestamp.
    """
    samples = merge_alert_chain_evidence_rows([
        {"source": "", "last_success_timestamp": 100.0},
        {"source": "signoz", "last_success_timestamp": None},
        {"source": "sentry", "last_success_timestamp": "not-a-number"},
        {"source": "alertmanager", "last_success_timestamp": 0},
    ])

    assert samples == []
|
||||
Reference in New Issue
Block a user