fix(api): reconcile inactive stale incidents
All checks were successful
CD Pipeline / tests (push) Successful in 1m26s
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / build-and-deploy (push) Successful in 4m23s
CD Pipeline / post-deploy-checks (push) Successful in 2m17s

This commit is contained in:
Your Name
2026-05-29 11:43:13 +08:00
parent f0a77d79f4
commit 9e093a9525

View File

@@ -18,15 +18,19 @@ from __future__ import annotations
import asyncio
from dataclasses import dataclass
import httpx
import structlog
from sqlalchemy import text
from src.core.config import settings
from src.db.base import get_db_context
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
BATCH_LIMIT = 25
BATCH_LIMIT = 100
INTERVAL_SECONDS = 1800
_PROMETHEUS_TIMEOUT_SECONDS = 5.0
@dataclass(frozen=True)
@@ -34,6 +38,7 @@ class LifecycleCandidate:
incident_id: str
resolution_type: str
reason: str
direct_db_only: bool = False
async def run_incident_lifecycle_reconciler_loop() -> None:
@@ -62,6 +67,18 @@ async def reconcile_stuck_incidents(limit: int = BATCH_LIMIT) -> tuple[int, int]
(resolved_count, error_count)
"""
candidates = await _fetch_candidates(limit)
remaining = max(0, limit - len(candidates))
if remaining > 0:
active_alertnames = await _fetch_active_alertnames()
if active_alertnames is not None:
candidates.extend(
await _fetch_inactive_or_duplicate_alert_candidates(
limit=remaining,
active_alertnames=active_alertnames,
exclude_incident_ids={c.incident_id for c in candidates},
)
)
if not candidates:
return 0, 0
@@ -73,19 +90,24 @@ async def reconcile_stuck_incidents(limit: int = BATCH_LIMIT) -> tuple[int, int]
for candidate in candidates:
try:
result = await incident_service.resolve_incident(
candidate.incident_id,
resolution_type=candidate.resolution_type,
emit_postmortem=False,
)
if result is not None:
resolved += 1
logger.info(
"incident_lifecycle_reconciled",
incident_id=candidate.incident_id,
reason=candidate.reason,
if candidate.direct_db_only:
result = await _resolve_db_only(candidate.incident_id)
else:
result = await incident_service.resolve_incident(
candidate.incident_id,
resolution_type=candidate.resolution_type,
emit_postmortem=False,
)
if not result:
continue
resolved += 1
logger.info(
"incident_lifecycle_reconciled",
incident_id=candidate.incident_id,
reason=candidate.reason,
resolution_type=candidate.resolution_type,
direct_db_only=candidate.direct_db_only,
)
except Exception as exc:
errors += 1
logger.warning(
@@ -98,6 +120,45 @@ async def reconcile_stuck_incidents(limit: int = BATCH_LIMIT) -> tuple[int, int]
return resolved, errors
async def _fetch_active_alertnames() -> set[str] | None:
"""Read current firing alertnames from Prometheus. None means fail-closed."""
try:
async with httpx.AsyncClient(timeout=_PROMETHEUS_TIMEOUT_SECONDS) as client:
response = await client.get(
f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/query",
params={"query": 'ALERTS{alertstate="firing"}'},
)
response.raise_for_status()
payload = response.json()
except Exception as exc:
logger.warning("incident_lifecycle_active_alerts_fetch_failed", error=str(exc))
return None
result = payload.get("data", {}).get("result", [])
active_alertnames = {
item.get("metric", {}).get("alertname")
for item in result
if item.get("metric", {}).get("alertname")
}
logger.info(
"incident_lifecycle_active_alerts_loaded",
active_alert_count=len(active_alertnames),
)
return active_alertnames
async def _resolve_db_only(incident_id: str) -> bool:
from src.repositories.incident_repository import get_incident_repository
now = now_taipei()
return await get_incident_repository().update_status(
incident_id=incident_id,
status="resolved",
updated_at=now,
resolved_at=now,
)
async def _fetch_candidates(limit: int) -> list[LifecycleCandidate]:
async with get_db_context() as db:
result = await db.execute(
@@ -163,3 +224,66 @@ async def _fetch_candidates(limit: int) -> list[LifecycleCandidate]:
)
for row in rows
]
async def _fetch_inactive_or_duplicate_alert_candidates(
*,
limit: int,
active_alertnames: set[str],
exclude_incident_ids: set[str],
) -> list[LifecycleCandidate]:
"""
收斂 Alertmanager 已不再 firing 的舊 incident以及同一 active alertname 的舊重複案。
若 Prometheus/Alertmanager 讀不到 active alertnames上層會 fail-closed 不呼叫本函式。
"""
active_list = list(active_alertnames) or ["__no_active_alertnames__"]
exclude_list = list(exclude_incident_ids) or ["__no_excluded_incidents__"]
async with get_db_context() as db:
result = await db.execute(
text(
"""
WITH ranked AS (
SELECT
i.incident_id,
i.alertname,
i.created_at,
row_number() OVER (
PARTITION BY i.alertname
ORDER BY i.created_at DESC, i.incident_id DESC
) AS rn
FROM incidents i
WHERE i.status = 'INVESTIGATING'
AND i.created_at <= now() - interval '24 hours'
AND NOT (i.incident_id = ANY(:exclude_incident_ids))
)
SELECT
incident_id,
CASE
WHEN alertname = ANY(:active_alertnames)
THEN 'active_duplicate_stale'
ELSE 'inactive_alert_stale'
END AS reason
FROM ranked
WHERE NOT (alertname = ANY(:active_alertnames) AND rn = 1)
ORDER BY created_at ASC
LIMIT :limit
"""
),
{
"active_alertnames": active_list,
"exclude_incident_ids": exclude_list,
"limit": limit,
},
)
rows = result.mappings().all()
return [
LifecycleCandidate(
incident_id=str(row["incident_id"]),
resolution_type="timeout",
reason=str(row["reason"]),
direct_db_only=True,
)
for row in rows
]