diff --git a/apps/api/src/jobs/incident_lifecycle_reconciler.py b/apps/api/src/jobs/incident_lifecycle_reconciler.py index a45e8a8c..c90316c9 100644 --- a/apps/api/src/jobs/incident_lifecycle_reconciler.py +++ b/apps/api/src/jobs/incident_lifecycle_reconciler.py @@ -18,15 +18,19 @@ from __future__ import annotations import asyncio from dataclasses import dataclass +import httpx import structlog from sqlalchemy import text +from src.core.config import settings from src.db.base import get_db_context +from src.utils.timezone import now_taipei logger = structlog.get_logger(__name__) -BATCH_LIMIT = 25 +BATCH_LIMIT = 100 INTERVAL_SECONDS = 1800 +_PROMETHEUS_TIMEOUT_SECONDS = 5.0 @dataclass(frozen=True) @@ -34,6 +38,7 @@ class LifecycleCandidate: incident_id: str resolution_type: str reason: str + direct_db_only: bool = False async def run_incident_lifecycle_reconciler_loop() -> None: @@ -62,6 +67,18 @@ async def reconcile_stuck_incidents(limit: int = BATCH_LIMIT) -> tuple[int, int] (resolved_count, error_count) """ candidates = await _fetch_candidates(limit) + remaining = max(0, limit - len(candidates)) + if remaining > 0: + active_alertnames = await _fetch_active_alertnames() + if active_alertnames is not None: + candidates.extend( + await _fetch_inactive_or_duplicate_alert_candidates( + limit=remaining, + active_alertnames=active_alertnames, + exclude_incident_ids={c.incident_id for c in candidates}, + ) + ) + if not candidates: return 0, 0 @@ -73,19 +90,24 @@ async def reconcile_stuck_incidents(limit: int = BATCH_LIMIT) -> tuple[int, int] for candidate in candidates: try: - result = await incident_service.resolve_incident( - candidate.incident_id, - resolution_type=candidate.resolution_type, - emit_postmortem=False, - ) - if result is not None: - resolved += 1 - logger.info( - "incident_lifecycle_reconciled", - incident_id=candidate.incident_id, - reason=candidate.reason, + if candidate.direct_db_only: + result = await _resolve_db_only(candidate.incident_id) + else: + result = await incident_service.resolve_incident( + candidate.incident_id, resolution_type=candidate.resolution_type, + emit_postmortem=False, ) + if not result: + continue + resolved += 1 + logger.info( + "incident_lifecycle_reconciled", + incident_id=candidate.incident_id, + reason=candidate.reason, + resolution_type=candidate.resolution_type, + direct_db_only=candidate.direct_db_only, + ) except Exception as exc: errors += 1 logger.warning( @@ -98,6 +120,45 @@ async def reconcile_stuck_incidents(limit: int = BATCH_LIMIT) -> tuple[int, int] return resolved, errors +async def _fetch_active_alertnames() -> set[str] | None: + """Read current firing alertnames from Prometheus. None means fail-closed.""" + try: + async with httpx.AsyncClient(timeout=_PROMETHEUS_TIMEOUT_SECONDS) as client: + response = await client.get( + f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/query", + params={"query": 'ALERTS{alertstate="firing"}'}, + ) + response.raise_for_status() + payload = response.json() + except Exception as exc: + logger.warning("incident_lifecycle_active_alerts_fetch_failed", error=str(exc)) + return None + + result = payload.get("data", {}).get("result", []) + active_alertnames = { + item.get("metric", {}).get("alertname") + for item in result + if item.get("metric", {}).get("alertname") + } + logger.info( + "incident_lifecycle_active_alerts_loaded", + active_alert_count=len(active_alertnames), + ) + return active_alertnames + + +async def _resolve_db_only(incident_id: str) -> bool: + from src.repositories.incident_repository import get_incident_repository + + now = now_taipei() + return await get_incident_repository().update_status( + incident_id=incident_id, + status="resolved", + updated_at=now, + resolved_at=now, + ) + + async def _fetch_candidates(limit: int) -> list[LifecycleCandidate]: async with get_db_context() as db: result = await db.execute( @@ -163,3 +224,66 @@ async def _fetch_candidates(limit: int) -> list[LifecycleCandidate]: ) for row in rows ] + + +async def _fetch_inactive_or_duplicate_alert_candidates( + *, + limit: int, + active_alertnames: set[str], + exclude_incident_ids: set[str], +) -> list[LifecycleCandidate]: + """ + 收斂 Alertmanager 已不再 firing 的舊 incident,以及同一 active alertname 的舊重複案。 + + 若 Prometheus/Alertmanager 讀不到 active alertnames,上層會 fail-closed 不呼叫本函式。 + """ + active_list = list(active_alertnames) or ["__no_active_alertnames__"] + exclude_list = list(exclude_incident_ids) or ["__no_excluded_incidents__"] + async with get_db_context() as db: + result = await db.execute( + text( + """ + WITH ranked AS ( + SELECT + i.incident_id, + i.alertname, + i.created_at, + row_number() OVER ( + PARTITION BY i.alertname + ORDER BY i.created_at DESC, i.incident_id DESC + ) AS rn + FROM incidents i + WHERE i.status = 'INVESTIGATING' + AND i.created_at <= now() - interval '24 hours' + AND NOT (i.incident_id = ANY(:exclude_incident_ids)) + ) + SELECT + incident_id, + CASE + WHEN alertname = ANY(:active_alertnames) + THEN 'active_duplicate_stale' + ELSE 'inactive_alert_stale' + END AS reason + FROM ranked + WHERE NOT (alertname = ANY(:active_alertnames) AND rn = 1) + ORDER BY created_at ASC + LIMIT :limit + """ + ), + { + "active_alertnames": active_list, + "exclude_incident_ids": exclude_list, + "limit": limit, + }, + ) + rows = result.mappings().all() + + return [ + LifecycleCandidate( + incident_id=str(row["incident_id"]), + resolution_type="timeout", + reason=str(row["reason"]), + direct_db_only=True, + ) + for row in rows + ]