fix(metrics): refresh alert chain timestamp from durable evidence
All checks were successful
Code Review / ai-code-review (push) Successful in 9s
CD Pipeline / tests (push) Successful in 5m53s
CD Pipeline / build-and-deploy (push) Successful in 4m13s
CD Pipeline / post-deploy-checks (push) Successful in 1m29s

This commit is contained in:
Your Name
2026-05-19 17:55:47 +08:00
parent f0a9b1e00a
commit c516f9fc71
3 changed files with 218 additions and 0 deletions

View File

@@ -97,6 +97,7 @@ from src.routers import proposals as proposals_router
# Legacy route imports (to be migrated)
from src.routes import agent, notifications, pipelines, plugins
from src.services.adr100_slo_metrics_service import get_adr100_slo_metrics_service
from src.services.alert_chain_metrics_service import get_alert_chain_metrics_service
from src.services.executor import close_executor
from src.services.flywheel_stats_service import get_flywheel_stats_service
@@ -1025,6 +1026,15 @@ app.include_router(platform_v1.router, prefix="/api/v1/platform", tags=["AwoooP
@app.get("/metrics", include_in_schema=False)
async def prometheus_metrics() -> Response:
"""Prometheus metrics endpoint for alerting"""
# 2026-05-19 Codex — T85 Alert Chain DB evidence refresh.
# record_alert_chain_success() 是 process-local gauge部署後第一個 scrape
# 可能尚未收到新 webhook導致 smoke test 誤判 metric 不存在。
# 先用 AwoooP inbound / alert_operation_log 的 durable evidence 回填 last_success。
try:
await get_alert_chain_metrics_service().refresh_last_success_gauge()
except Exception as exc:
logger.warning("prometheus_metrics_alert_chain_evidence_error", error=str(exc))
content = generate_latest().decode("utf-8")
# 2026-05-07 ogt + Claude Sonnet 4.6 — INC-20260507-99ADF2 修復
# 飛輪指標awoooi_flywheel_*)原本只在 /api/v1/stats/flywheel/metrics 暴露,

View File

@@ -0,0 +1,160 @@
"""DB-backed Alert Chain metric refresh.
`record_alert_chain_success()` updates Prometheus gauges in process memory. A
deployment or Pod restart clears that memory, so the first scrape after deploy
can report `awoooi_alert_chain_last_success_timestamp` as missing even though
Alertmanager / Sentry / SigNoz evidence is already in DB.
"""
from __future__ import annotations
from dataclasses import dataclass
from time import time
from typing import Any, Mapping
import structlog
from sqlalchemy import text
from src.core.metrics import ALERT_CHAIN_LAST_SUCCESS
from src.db.base import get_db_context
logger = structlog.get_logger(__name__)
_REFRESH_INTERVAL_SECONDS = 15
@dataclass(frozen=True)
class AlertChainEvidenceSample:
source: str
last_success_timestamp: float
evidence_source: str
class AlertChainMetricsService:
"""Refresh process-local alert-chain gauges from durable DB evidence."""
def __init__(self) -> None:
self._last_refresh_at = 0.0
self._last_samples: list[AlertChainEvidenceSample] = []
async def refresh_last_success_gauge(
self,
*,
project_id: str = "awoooi",
force: bool = False,
) -> list[AlertChainEvidenceSample]:
now_ts = time()
if (
not force
and self._last_samples
and now_ts - self._last_refresh_at < _REFRESH_INTERVAL_SECONDS
):
apply_alert_chain_last_success_samples(self._last_samples)
return self._last_samples
samples = await self.fetch_latest_success_samples(project_id=project_id)
apply_alert_chain_last_success_samples(samples)
self._last_samples = samples
self._last_refresh_at = now_ts
return samples
async def fetch_latest_success_samples(
self,
*,
project_id: str = "awoooi",
) -> list[AlertChainEvidenceSample]:
async with get_db_context(project_id) as db:
result = await db.execute(text(_ALERT_CHAIN_EVIDENCE_SQL), {"project_id": project_id})
return merge_alert_chain_evidence_rows(result.mappings().all())
def apply_alert_chain_last_success_samples(samples: list[AlertChainEvidenceSample]) -> None:
"""Write durable last-success timestamps into the existing Prometheus gauge.
Do not write `awoooi_alert_chain_healthy` here: that gauge represents an
explicit runtime failure signal. Durable success evidence should not mask a
later in-memory failure from the same process.
"""
for sample in samples:
ALERT_CHAIN_LAST_SUCCESS.labels(source=sample.source).set(
sample.last_success_timestamp
)
def merge_alert_chain_evidence_rows(
rows: list[Mapping[str, Any]] | list[Any],
) -> list[AlertChainEvidenceSample]:
"""Normalize SQL rows and keep the newest evidence per source."""
latest_by_source: dict[str, AlertChainEvidenceSample] = {}
for row in rows:
source = str(_row_value(row, "source") or "").strip().lower()
evidence_source = str(_row_value(row, "evidence_source") or "db").strip()
raw_timestamp = _row_value(row, "last_success_timestamp")
if not source or raw_timestamp is None:
continue
try:
timestamp = float(raw_timestamp)
except (TypeError, ValueError):
continue
if timestamp <= 0:
continue
sample = AlertChainEvidenceSample(
source=source,
last_success_timestamp=timestamp,
evidence_source=evidence_source,
)
previous = latest_by_source.get(source)
if previous is None or sample.last_success_timestamp > previous.last_success_timestamp:
latest_by_source[source] = sample
return sorted(latest_by_source.values(), key=lambda item: item.source)
def _row_value(row: Mapping[str, Any] | Any, key: str) -> Any:
if isinstance(row, Mapping):
return row.get(key)
return getattr(row, key, None)
_ALERT_CHAIN_EVIDENCE_SQL = """
WITH conversation_evidence AS (
SELECT
lower(platform_subject_id) AS source,
EXTRACT(EPOCH FROM MAX(COALESCE(provider_ts, received_at))) AS last_success_timestamp,
'awooop_conversation_event' AS evidence_source
FROM awooop_conversation_event
WHERE project_id = :project_id
AND channel_type = 'internal'
AND lower(COALESCE(platform_subject_id, '')) IN ('alertmanager', 'sentry', 'signoz')
GROUP BY lower(platform_subject_id)
),
alertmanager_log_evidence AS (
SELECT
'alertmanager' AS source,
EXTRACT(EPOCH FROM MAX(created_at)) AS last_success_timestamp,
'alert_operation_log' AS evidence_source
FROM alert_operation_log
WHERE event_type = 'ALERT_RECEIVED'
AND actor = 'alertmanager'
)
SELECT source, last_success_timestamp, evidence_source
FROM conversation_evidence
WHERE last_success_timestamp IS NOT NULL
UNION ALL
SELECT source, last_success_timestamp, evidence_source
FROM alertmanager_log_evidence
WHERE last_success_timestamp IS NOT NULL
"""
_alert_chain_metrics_service: AlertChainMetricsService | None = None
def get_alert_chain_metrics_service() -> AlertChainMetricsService:
global _alert_chain_metrics_service
if _alert_chain_metrics_service is None:
_alert_chain_metrics_service = AlertChainMetricsService()
return _alert_chain_metrics_service

View File

@@ -0,0 +1,48 @@
from src.services.alert_chain_metrics_service import (
AlertChainEvidenceSample,
merge_alert_chain_evidence_rows,
)
def test_merge_alert_chain_evidence_rows_keeps_newest_per_source() -> None:
samples = merge_alert_chain_evidence_rows([
{
"source": "Alertmanager",
"last_success_timestamp": 100.0,
"evidence_source": "alert_operation_log",
},
{
"source": "alertmanager",
"last_success_timestamp": 120.0,
"evidence_source": "awooop_conversation_event",
},
{
"source": "sentry",
"last_success_timestamp": "90",
"evidence_source": "awooop_conversation_event",
},
])
assert samples == [
AlertChainEvidenceSample(
source="alertmanager",
last_success_timestamp=120.0,
evidence_source="awooop_conversation_event",
),
AlertChainEvidenceSample(
source="sentry",
last_success_timestamp=90.0,
evidence_source="awooop_conversation_event",
),
]
def test_merge_alert_chain_evidence_rows_ignores_invalid_rows() -> None:
samples = merge_alert_chain_evidence_rows([
{"source": "", "last_success_timestamp": 100.0},
{"source": "signoz", "last_success_timestamp": None},
{"source": "sentry", "last_success_timestamp": "not-a-number"},
{"source": "alertmanager", "last_success_timestamp": 0},
])
assert samples == []