fix(api): reconcile inactive stale incidents
This commit is contained in:
@@ -18,15 +18,19 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
from sqlalchemy import text
|
||||
|
||||
from src.core.config import settings
|
||||
from src.db.base import get_db_context
|
||||
from src.utils.timezone import now_taipei
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
BATCH_LIMIT = 25
|
||||
BATCH_LIMIT = 100
|
||||
INTERVAL_SECONDS = 1800
|
||||
_PROMETHEUS_TIMEOUT_SECONDS = 5.0
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@@ -34,6 +38,7 @@ class LifecycleCandidate:
|
||||
incident_id: str
|
||||
resolution_type: str
|
||||
reason: str
|
||||
direct_db_only: bool = False
|
||||
|
||||
|
||||
async def run_incident_lifecycle_reconciler_loop() -> None:
|
||||
@@ -62,6 +67,18 @@ async def reconcile_stuck_incidents(limit: int = BATCH_LIMIT) -> tuple[int, int]
|
||||
(resolved_count, error_count)
|
||||
"""
|
||||
candidates = await _fetch_candidates(limit)
|
||||
remaining = max(0, limit - len(candidates))
|
||||
if remaining > 0:
|
||||
active_alertnames = await _fetch_active_alertnames()
|
||||
if active_alertnames is not None:
|
||||
candidates.extend(
|
||||
await _fetch_inactive_or_duplicate_alert_candidates(
|
||||
limit=remaining,
|
||||
active_alertnames=active_alertnames,
|
||||
exclude_incident_ids={c.incident_id for c in candidates},
|
||||
)
|
||||
)
|
||||
|
||||
if not candidates:
|
||||
return 0, 0
|
||||
|
||||
@@ -73,19 +90,24 @@ async def reconcile_stuck_incidents(limit: int = BATCH_LIMIT) -> tuple[int, int]
|
||||
|
||||
for candidate in candidates:
|
||||
try:
|
||||
result = await incident_service.resolve_incident(
|
||||
candidate.incident_id,
|
||||
resolution_type=candidate.resolution_type,
|
||||
emit_postmortem=False,
|
||||
)
|
||||
if result is not None:
|
||||
resolved += 1
|
||||
logger.info(
|
||||
"incident_lifecycle_reconciled",
|
||||
incident_id=candidate.incident_id,
|
||||
reason=candidate.reason,
|
||||
if candidate.direct_db_only:
|
||||
result = await _resolve_db_only(candidate.incident_id)
|
||||
else:
|
||||
result = await incident_service.resolve_incident(
|
||||
candidate.incident_id,
|
||||
resolution_type=candidate.resolution_type,
|
||||
emit_postmortem=False,
|
||||
)
|
||||
if not result:
|
||||
continue
|
||||
resolved += 1
|
||||
logger.info(
|
||||
"incident_lifecycle_reconciled",
|
||||
incident_id=candidate.incident_id,
|
||||
reason=candidate.reason,
|
||||
resolution_type=candidate.resolution_type,
|
||||
direct_db_only=candidate.direct_db_only,
|
||||
)
|
||||
except Exception as exc:
|
||||
errors += 1
|
||||
logger.warning(
|
||||
@@ -98,6 +120,45 @@ async def reconcile_stuck_incidents(limit: int = BATCH_LIMIT) -> tuple[int, int]
|
||||
return resolved, errors
|
||||
|
||||
|
||||
async def _fetch_active_alertnames() -> set[str] | None:
|
||||
"""Read current firing alertnames from Prometheus. None means fail-closed."""
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=_PROMETHEUS_TIMEOUT_SECONDS) as client:
|
||||
response = await client.get(
|
||||
f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/query",
|
||||
params={"query": 'ALERTS{alertstate="firing"}'},
|
||||
)
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
except Exception as exc:
|
||||
logger.warning("incident_lifecycle_active_alerts_fetch_failed", error=str(exc))
|
||||
return None
|
||||
|
||||
result = payload.get("data", {}).get("result", [])
|
||||
active_alertnames = {
|
||||
item.get("metric", {}).get("alertname")
|
||||
for item in result
|
||||
if item.get("metric", {}).get("alertname")
|
||||
}
|
||||
logger.info(
|
||||
"incident_lifecycle_active_alerts_loaded",
|
||||
active_alert_count=len(active_alertnames),
|
||||
)
|
||||
return active_alertnames
|
||||
|
||||
|
||||
async def _resolve_db_only(incident_id: str) -> bool:
|
||||
from src.repositories.incident_repository import get_incident_repository
|
||||
|
||||
now = now_taipei()
|
||||
return await get_incident_repository().update_status(
|
||||
incident_id=incident_id,
|
||||
status="resolved",
|
||||
updated_at=now,
|
||||
resolved_at=now,
|
||||
)
|
||||
|
||||
|
||||
async def _fetch_candidates(limit: int) -> list[LifecycleCandidate]:
|
||||
async with get_db_context() as db:
|
||||
result = await db.execute(
|
||||
@@ -163,3 +224,66 @@ async def _fetch_candidates(limit: int) -> list[LifecycleCandidate]:
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
|
||||
async def _fetch_inactive_or_duplicate_alert_candidates(
|
||||
*,
|
||||
limit: int,
|
||||
active_alertnames: set[str],
|
||||
exclude_incident_ids: set[str],
|
||||
) -> list[LifecycleCandidate]:
|
||||
"""
|
||||
收斂 Alertmanager 已不再 firing 的舊 incident,以及同一 active alertname 的舊重複案。
|
||||
|
||||
若 Prometheus/Alertmanager 讀不到 active alertnames,上層會 fail-closed 不呼叫本函式。
|
||||
"""
|
||||
active_list = list(active_alertnames) or ["__no_active_alertnames__"]
|
||||
exclude_list = list(exclude_incident_ids) or ["__no_excluded_incidents__"]
|
||||
async with get_db_context() as db:
|
||||
result = await db.execute(
|
||||
text(
|
||||
"""
|
||||
WITH ranked AS (
|
||||
SELECT
|
||||
i.incident_id,
|
||||
i.alertname,
|
||||
i.created_at,
|
||||
row_number() OVER (
|
||||
PARTITION BY i.alertname
|
||||
ORDER BY i.created_at DESC, i.incident_id DESC
|
||||
) AS rn
|
||||
FROM incidents i
|
||||
WHERE i.status = 'INVESTIGATING'
|
||||
AND i.created_at <= now() - interval '24 hours'
|
||||
AND NOT (i.incident_id = ANY(:exclude_incident_ids))
|
||||
)
|
||||
SELECT
|
||||
incident_id,
|
||||
CASE
|
||||
WHEN alertname = ANY(:active_alertnames)
|
||||
THEN 'active_duplicate_stale'
|
||||
ELSE 'inactive_alert_stale'
|
||||
END AS reason
|
||||
FROM ranked
|
||||
WHERE NOT (alertname = ANY(:active_alertnames) AND rn = 1)
|
||||
ORDER BY created_at ASC
|
||||
LIMIT :limit
|
||||
"""
|
||||
),
|
||||
{
|
||||
"active_alertnames": active_list,
|
||||
"exclude_incident_ids": exclude_list,
|
||||
"limit": limit,
|
||||
},
|
||||
)
|
||||
rows = result.mappings().all()
|
||||
|
||||
return [
|
||||
LifecycleCandidate(
|
||||
incident_id=str(row["incident_id"]),
|
||||
resolution_type="timeout",
|
||||
reason=str(row["reason"]),
|
||||
direct_db_only=True,
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user