awoooi/apps/api/src/jobs/incident_analysis_sweeper.py
Your Name b3a0f0d766
All checks were successful
CD Pipeline / tests (push) Successful in 2m22s
Code Review / ai-code-review (push) Successful in 57s
CD Pipeline / build-and-deploy (push) Successful in 21m3s
CD Pipeline / post-deploy-checks (push) Successful in 5m2s
fix(telegram): dedup by fingerprint + 24h TTL to stop repeat alerts
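Hard evidence of duplicate Telegram alerts (real data from 4 agents):
- INC-6FE3BD (HostBackupFailed) pushed 15 times within 24h
- INC-FD6E21 (HostHighCpuLoad) pushed 6 times within 24h
- 06:44:18: two sends in the same second = concurrency race between pods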
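The same-second double send is what a check-then-set dedup looks like under concurrency: two senders both read the key as absent before either one writes it. The sketch below reproduces this with a hypothetical in-memory `FakeRedis` (TTL handling omitted), using two coroutines in one event loop to stand in for two pods. It contrasts that with an atomic claim, which real Redis provides as a single `SET key value NX EX seconds` command. The commit does not say whether decision_manager uses such an atomic claim, so this illustrates the race only, not the project's code.

```python
import asyncio


class FakeRedis:
    """Hypothetical in-memory stand-in for Redis; TTLs are ignored here."""

    def __init__(self) -> None:
        self.store: dict[str, str] = {}

    async def exists(self, key: str) -> bool:
        await asyncio.sleep(0)  # simulate a network round trip (yields to other tasks)
        return key in self.store

    async def set(self, key: str, value: str, nx: bool = False) -> bool:
        await asyncio.sleep(0)  # round trip happens before the server-side check
        if nx and key in self.store:
            return False  # key already claimed by another sender
        self.store[key] = value  # check + write with no await between: atomic here
        return True


async def send_check_then_set(r: FakeRedis, key: str, sent: list[str]) -> None:
    # Racy: another sender can run between exists() and set()
    if not await r.exists(key):
        await r.set(key, "1")
        sent.append(key)


async def send_atomic(r: FakeRedis, key: str, sent: list[str]) -> None:
    # Atomic claim: only the sender whose set-if-absent succeeds sends
    if await r.set(key, "1", nx=True):
        sent.append(key)


async def demo() -> tuple[int, int]:
    racy: list[str] = []
    r1 = FakeRedis()
    await asyncio.gather(*(send_check_then_set(r1, "telegram_sent:X", racy) for _ in range(2)))

    atomic: list[str] = []
    r2 = FakeRedis()
    await asyncio.gather(*(send_atomic(r2, "telegram_sent:X", atomic) for _ in range(2)))
    return len(racy), len(atomic)


print(asyncio.run(demo()))  # → (2, 1)
```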

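Root cause:
1. The `telegram_sent:{incident_id}` dedup key is bound to the random uuid4-based INC ID,
   so a new INC with the same fingerprint is never deduplicated
2. The dedup TTL of 600s is shorter than both the incident_analysis_sweeper re-trigger cycle (1h)
   and the alertmanager repeat_interval (4h) → the key has expired by every round, so each round gets through
3. Pod restart goes through _resend_unconfirmed_ready_tokens with the same incident_id key
   → every restart is guaranteed to set off a burst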
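Root cause 1 comes down to what the dedup key is derived from. The sketch below is a minimal illustration that assumes a `telegram_sent:fp:` prefix and a truncated SHA-1 of `alertname|target`. The real key format lives in decision_manager.py:207-225 and is not shown here, so both choices are hypothetical, as are the sample ID `INC-0A1B2C` and the target `node-1`.

```python
import hashlib


def incident_key(incident_id: str) -> str:
    # Old scheme: one key per incident ID, and the ID is random per incident
    return f"telegram_sent:{incident_id}"


def fingerprint_key(alertname: str, target: str) -> str:
    # New scheme (hypothetical key format): one key per alertname+target,
    # so every incident raised for the same symptom maps to the same key
    digest = hashlib.sha1(f"{alertname}|{target}".encode("utf-8")).hexdigest()[:16]
    return f"telegram_sent:fp:{digest}"


# Two incidents for the same symptom
print(incident_key("INC-6FE3BD") == incident_key("INC-0A1B2C"))  # False: no dedup
print(fingerprint_key("HostBackupFailed", "node-1")
      == fingerprint_key("HostBackupFailed", "node-1"))  # True: deduplicated
```

Because the fingerprint key depends only on the alert's identity, any code path that recomputes it, including the pod-restart resend path, lands on the same key instead of a fresh one.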

Fix (not muting alerts, but letting the AI recognize that this is the same incident):
- decision_manager.py:207-225 dedup key changed to an alertname+target fingerprint
- decision_manager.py:573-578 TTL 600s → 86400s (covers both the 1h sweeper cycle and the 4h alertmanager repeat)
- decision_manager.py:3189-3208 pod-restart resend path switched to the fingerprint as well
- incident_analysis_sweeper.py:37-42 sweeper_done TTL 3600s → 86400s
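The TTL reasoning in root cause 2 and the 86400s fix can be checked with some simple arithmetic. The sketch below uses a simplified model. It assumes the same symptom re-fires at a fixed interval starting at t=0, and that every send rewrites the dedup key with a fresh TTL. `sends_in_window` is a hypothetical helper, and real traffic (restarts, jitter, several pods) is messier.

```python
def sends_in_window(ttl_sec: int, repeat_sec: int, window_sec: int = 86_400) -> int:
    """Count how many re-triggers pass a TTL-based dedup key within one window.

    Simplified model: the same symptom re-fires every `repeat_sec` seconds,
    starting at t=0. A send happens only if the dedup key is missing or has
    expired; each send writes the key with a fresh TTL of `ttl_sec`.
    """
    sends = 0
    key_expires_at = -1  # no key yet
    for t in range(0, window_sec, repeat_sec):
        if t >= key_expires_at:  # key missing or expired → dedup lets it through
            sends += 1
            key_expires_at = t + ttl_sec
    return sends


# Sweeper re-trigger every 1h, alertmanager repeat_interval every 4h
for label, repeat in (("sweeper 1h", 3_600), ("alertmanager 4h", 14_400)):
    print(label, "old TTL:", sends_in_window(600, repeat),
          "new TTL:", sends_in_window(86_400, repeat))
```

Under this model, the old 600s TTL lets every re-trigger through: 24 sends a day at the 1h sweeper cadence and 6 at the 4h alertmanager cadence. The 86400s TTL allows one.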

Expected: at most 1 decision card per symptom within 24h. Once an incident is resolved, the status check at lines 220-226
returns early, so recurrence detection is unaffected.

Tests: 35 passed (test_telegram_adr050 + test_decision_manager_docker_prune_routing)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 16:25:48 +08:00

"""
Incident Analysis Sweeper — 自動觸發 INVESTIGATING 事件 AI 分析
================================================================
問題背景:
Signal Worker 創建 Incident 後AI 分析 (decision_manager) 原本只在
GET /api/v1/incidents 被呼叫時才觸發 (背景 fire-and-forget)。
若前端沒人看或 Telegram Bot 未呼叫該端點,新 Incident 永遠沒有 AI 分析。
解法:
每 90 秒掃描 INVESTIGATING 狀態且無 decision token 的 Incident
自動在背景觸發 get_or_create_decision()。
限流:
Semaphore(3) — 避免並發壓垮 OPENCLAW_NEMO/Ollama
每批最多 5 個 incident避免啟動雪崩
Key 格式說明:
decision token 儲存為 decision:DEC-{HEX},內部 incident_id 欄位對應 INC-*。
使用 sweeper_done:{incident_id} 輕量標記避免重複掃描。
get_or_create_decision() 本身已有 COMPLETED/READY 去重,雙重保護。
2026-04-16 Claude Sonnet 4.6 Asia/Taipei — 修正 key 格式 BUG
"""
from __future__ import annotations
import asyncio
import structlog
from src.models.incident import Incident, IncidentStatus, Severity
logger = structlog.get_logger(__name__)
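
_SWEEP_INTERVAL_SEC = 90  # scan every 90 seconds
_MAX_BATCH = 5  # at most 5 incidents per batch
_SEMAPHORE_LIMIT = 3  # at most 3 concurrent AI analyses
_DONE_MARKER_PREFIX = "sweeper_done:"  # lightweight marker: analysis already triggered
# 2026-05-02 Claude Opus 4.7 + Commander ogt: raised the TTL from 3600s to 86400s.
# Reason: when sweeper_done expired, the same incident was re-scanned and re-triggered a
# decision, which then got past the Telegram dedup and was sent again.
# Aligned with the 24h Telegram dedup in decision_manager.py:574 to finally stop
# "an INC still INVESTIGATING gets pushed once an hour".
_DONE_MARKER_TTL = 86400  # 24-hour TTL; after that, get_or_create handles dedup
# 2026-04-16 ogt: only process incidents from the last 48h, so the first startup
# does not flood Telegram with every historical incident
_MAX_INCIDENT_AGE_HOURS = 48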


async def run_incident_analysis_sweeper() -> None:
    """
    Infinite loop: every 90 seconds, trigger AI analysis for INVESTIGATING Incidents
    that have not been analyzed yet.
    Started from the main.py lifespan via asyncio.create_task().
    """
    logger.info("incident_analysis_sweeper_started", interval_sec=_SWEEP_INTERVAL_SEC)
    sem = asyncio.Semaphore(_SEMAPHORE_LIMIT)
    while True:
        try:
            await _sweep_once(sem)
        except Exception as e:
            logger.warning("incident_analysis_sweeper_error", error=str(e))
        await asyncio.sleep(_SWEEP_INTERVAL_SEC)


async def _sweep_once(sem: asyncio.Semaphore) -> None:
    """
    Run one scan: find INVESTIGATING incidents without a decision token
    and trigger AI analysis for them in the background.

    Decision token key format: decision:DEC-{HEX12} (not decision:INC-*)
    A lightweight sweeper_done:{incident_id} marker prevents duplicate triggers.
    """
    from src.services.decision_manager import get_decision_manager
    from src.services.incident_service import get_incident_service
    from src.core.redis_client import get_redis

    redis = get_redis()
    incident_service = get_incident_service()
    dm = get_decision_manager()

    # Fetch all INVESTIGATING incidents
    try:
        incidents: list[Incident] = await incident_service.get_active_incidents()
    except Exception as e:
        logger.warning("sweeper_get_incidents_failed", error=str(e))
        return
    if not incidents:
        return

    # Filter: only process incidents from the last 48h, so the first startup
    # does not flood Telegram with the entire incident history
    from datetime import datetime, timezone, timedelta

    now_utc = datetime.now(timezone.utc)
    cutoff = now_utc - timedelta(hours=_MAX_INCIDENT_AGE_HOURS)
    recent_incidents = []
    for incident in incidents:
        created = getattr(incident, "created_at", None)
        if created is None:
            # Legacy records without created_at: skip
            continue
        # Make sure created_at is timezone-aware (naive timestamps are treated as UTC)
        if created.tzinfo is None:
            created = created.replace(tzinfo=timezone.utc)
        if created >= cutoff:
            recent_incidents.append(incident)
    if not recent_incidents:
        return

    # Find incidents whose analysis has not been triggered yet
    # (via the lightweight marker, instead of scanning every decision:DEC-* key)
    unanalyzed = []
    for incident in recent_incidents:
        done_key = f"{_DONE_MARKER_PREFIX}{incident.incident_id}"
        if not await redis.exists(done_key):
            unanalyzed.append(incident)
    if not unanalyzed:
        return

    # Cap the batch size
    batch = unanalyzed[:_MAX_BATCH]
    logger.info(
        "sweeper_triggering_analysis",
        total_unanalyzed=len(unanalyzed),
        batch_size=len(batch),
    )

    async def _analyze(incident: Incident) -> None:
        async with sem:
            try:
                timeout = 120.0 if incident.severity in (Severity.P0, Severity.P1) else 180.0
                # 2026-04-26 P2.4 by Claude: 12-Agent Consensus integration
                # ENABLE_12AGENT_CONSENSUS=True + P0/P1 → take the consensus path
                # (gated by a flag inside dm)
                from src.core.config import settings as _settings

                if (
                    _settings.ENABLE_12AGENT_CONSENSUS
                    and incident.severity in (Severity.P0, Severity.P1)
                ):
                    await dm.get_or_create_decision_with_consensus(
                        incident=incident, timeout_sec=timeout
                    )
                else:
                    await dm.get_or_create_decision(incident=incident, timeout_sec=timeout)
                # Set the done marker so the next scan does not trigger this incident again
                done_key = f"{_DONE_MARKER_PREFIX}{incident.incident_id}"
                await redis.set(done_key, "1", ex=_DONE_MARKER_TTL)
                logger.info("sweeper_analysis_done", incident_id=incident.incident_id)
            except Exception as e:
                logger.warning(
                    "sweeper_analysis_failed",
                    incident_id=incident.incident_id,
                    error=str(e),
                )

    tasks = [asyncio.create_task(_analyze(inc)) for inc in batch]
    await asyncio.gather(*tasks, return_exceptions=True)
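The control flow of `_sweep_once` can be tested in isolation with a stripped-down sketch that keeps only its filtering and concurrency logic: the 48h age cutoff, the done-marker check, the batch cap of 5 and a `Semaphore(3)`. Redis, the incident service and the decision manager are replaced by an in-memory set and a short sleep. `FakeIncident`, `select_batch` and `analyze_batch` are hypothetical names. Unlike the real sweeper, which shares one semaphore across sweeps, this sketch creates the semaphore per batch.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

MAX_AGE = timedelta(hours=48)
MAX_BATCH = 5
SEMAPHORE_LIMIT = 3


@dataclass
class FakeIncident:
    # Hypothetical stand-in for src.models.incident.Incident
    incident_id: str
    created_at: Optional[datetime]


def select_batch(incidents: list[FakeIncident], done: set[str], now: datetime) -> list[FakeIncident]:
    """Same filtering order as _sweep_once: age cutoff, done marker, batch cap."""
    cutoff = now - MAX_AGE
    picked = []
    for inc in incidents:
        created = inc.created_at
        if created is None:
            continue  # legacy record without created_at: skip
        if created.tzinfo is None:
            created = created.replace(tzinfo=timezone.utc)  # treat naive as UTC
        if created >= cutoff and inc.incident_id not in done:
            picked.append(inc)
    return picked[:MAX_BATCH]


async def analyze_batch(batch: list[FakeIncident], done: set[str]) -> int:
    """Run the batch with at most SEMAPHORE_LIMIT analyses in flight; return the peak."""
    sem = asyncio.Semaphore(SEMAPHORE_LIMIT)
    in_flight = 0
    peak = 0

    async def analyze(inc: FakeIncident) -> None:
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # placeholder for the AI analysis call
            in_flight -= 1
            done.add(inc.incident_id)  # marker written only after the analysis finishes

    await asyncio.gather(*(analyze(inc) for inc in batch))
    return peak


now = datetime.now(timezone.utc)
incidents = [FakeIncident(f"INC-{i:02d}", now - timedelta(hours=1)) for i in range(8)]
incidents.append(FakeIncident("INC-OLD", now - timedelta(hours=72)))  # too old
incidents.append(FakeIncident("INC-NOTS", None))  # no created_at
done = {"INC-00"}  # already analyzed
batch = select_batch(incidents, done, now)
print([i.incident_id for i in batch])  # ['INC-01', 'INC-02', 'INC-03', 'INC-04', 'INC-05']
print(asyncio.run(analyze_batch(batch, done)))  # 3
```

Writing the marker only after a successful analysis mirrors the sweeper: a failed analysis leaves no marker, so the next sweep retries it.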