Files
awoooi/apps/api/src/services/anomaly_counter.py
OG T 89f0bae3f2
Some checks failed
E2E Health Check / e2e-health (push) Has been cancelled
CD Pipeline / build-and-deploy (push) Has been cancelled
feat(safety-net): complete wave 1 atomicity (adr-038, adr-039, debounce, graceful degrade, xclaim)
2026-03-29 23:55:38 +08:00

610 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
異常頻率統計服務
================================
ADR-037: 監控增強架構 - 異常頻率統計與根本修復
建立: 2026-03-29 (台北時區) Claude Code
使用 Redis Sorted Set 實作滑動窗口計數:
- ZADD anomaly:timeline:{key} {timestamp} {timestamp}
- ZCOUNT anomaly:timeline:{key} {start} +inf
- ZREMRANGEBYSCORE anomaly:timeline:{key} -inf {cutoff}
設計原則:
- 遵循 leWOOOgo 積木化鐵律
- 不直接存取 DB,只用 Redis
- 完整審計追蹤
"""
from __future__ import annotations

import hashlib
import json
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import TYPE_CHECKING

import structlog
if TYPE_CHECKING:
    # Only imported while type checking: the module uses `redis` solely in
    # annotations (e.g. the `redis_client` parameter of AnomalyCounter).
    import redis.asyncio as redis

# Structured logger used by AnomalyCounter for its event logs.
logger = structlog.get_logger(__name__)
# =============================================================================
# Data Types
# =============================================================================
@dataclass
class AnomalyFrequency:
    """
    Frequency snapshot for a single anomaly key.

    Holds sliding-window occurrence counts, first/last occurrence times,
    repair statistics and the escalation level derived from the 24h count.
    """

    anomaly_key: str
    count_1h: int
    count_24h: int
    count_7d: int
    count_30d: int
    first_seen: datetime
    last_seen: datetime
    auto_repair_count: int
    permanent_fix_applied: bool
    escalation_level: str | None  # None, REPEAT, ESCALATE, PERMANENT_FIX

    def to_dict(self) -> dict:
        """
        Serialize to a plain dict (used for Telegram alerts).

        Datetime fields are emitted as ISO-8601 strings; every other field is
        copied as-is. Keys keep the field declaration order.
        """
        payload = dict(
            anomaly_key=self.anomaly_key,
            count_1h=self.count_1h,
            count_24h=self.count_24h,
            count_7d=self.count_7d,
            count_30d=self.count_30d,
        )
        # datetime objects are not JSON-friendly; convert them to strings.
        payload["first_seen"] = self.first_seen.isoformat()
        payload["last_seen"] = self.last_seen.isoformat()
        payload.update(
            auto_repair_count=self.auto_repair_count,
            permanent_fix_applied=self.permanent_fix_applied,
            escalation_level=self.escalation_level,
        )
        return payload
# =============================================================================
# AnomalyCounter Service
# =============================================================================
class AnomalyCounter:
    """
    Anomaly counter: tracks how often each kind of anomaly occurs.

    Directives (2026-03-29):
    - "A restart only treats the symptom, not the root cause! Anomalies that
      happen too often must be fixed for good."
    - "We need statistics and counts! Users must be told!!"

    Escalation thresholds (occurrences within the last 24 hours):
    - REPEAT: 3         -> mark as repeated, notify the user
    - ESCALATE: 5       -> escalate the tier, notify the owner
    - PERMANENT_FIX: 10 -> force a root-cause (permanent) fix

    Redis keys used (each suffixed with the key from ``hash_signature``):
    - ``anomaly:timeline:``       sorted set, one member per occurrence,
                                  score = occurrence timestamp (30-day window)
    - ``anomaly:repair_count:``   number of repair attempts
    - ``anomaly:permanent_fix:``  "1" while a permanent fix is marked
    - ``anomaly:metadata:``       hash with the signature and fix details
    - ``anomaly:repair_history:`` JSON list of the latest 100 repair attempts

    Public methods degrade gracefully: Redis errors are logged and a neutral
    value is returned instead of raising.
    """

    # Escalation thresholds, compared against the 24h occurrence count.
    # NOTE(review): an earlier comment said these can be overridden via
    # environment variables, but nothing in this class reads the environment;
    # confirm whether an override exists elsewhere.
    THRESHOLDS = {
        "REPEAT": 3,  # 3 times -> repeated alert
        "ESCALATE": 5,  # 5 times -> human intervention
        "PERMANENT_FIX": 10,  # 10 times -> permanent fix required
    }

    # Redis key prefixes
    PREFIX_TIMELINE = "anomaly:timeline:"
    PREFIX_REPAIR_COUNT = "anomaly:repair_count:"
    PREFIX_PERMANENT_FIX = "anomaly:permanent_fix:"
    PREFIX_METADATA = "anomaly:metadata:"
    PREFIX_REPAIR_HISTORY = "anomaly:repair_history:"

    # TTL: 35 days, a bit longer than the 30-day window the timeline keeps.
    TTL_SECONDS = 35 * 24 * 3600
    # Lifetime of the permanent-fix flag and of the metadata describing it.
    PERMANENT_FIX_TTL_SECONDS = 90 * 24 * 3600

    def __init__(self, redis_client: redis.Redis) -> None:
        self.redis = redis_client

    @staticmethod
    def hash_signature(signature: dict) -> str:
        """
        Build a stable 16-hex-character key for an anomaly signature.

        Only identifying fields are hashed, so volatile fields such as
        timestamps do not split one anomaly into many keys:
        - alert_name: alert name (falls back to ``alertname``), e.g. PodCrashLoopBackOff
        - service: service name (falls back to ``job``), e.g. awoooi-api
        - namespace: K8s namespace, e.g. awoooi-prod
        - error_type: error type (falls back to ``reason``), e.g. OOMKilled

        The output must stay stable: existing Redis keys are derived from it.
        """
        key_fields = {
            "alert_name": signature.get("alert_name", signature.get("alertname", "")),
            "service": signature.get("service", signature.get("job", "")),
            "namespace": signature.get("namespace", ""),
            "error_type": signature.get("error_type", signature.get("reason", "")),
        }
        # sort_keys gives a canonical serialization independent of dict order.
        canonical = json.dumps(key_fields, sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()[:16]

    @staticmethod
    def _timeline_member(timestamp: float) -> str:
        """
        Build a unique sorted-set member for one occurrence.

        The score carries the timestamp; the member only has to be unique.
        Using the bare timestamp as the member let two occurrences with the
        same timestamp collapse into a single entry (ZADD updates an existing
        member), which under-counted them. The timestamp prefix is kept so
        members stay readable when inspecting Redis.
        """
        return f"{timestamp}:{uuid.uuid4().hex}"

    @staticmethod
    def _is_flag_set(value: str | bytes | None) -> bool:
        """
        Return True when a Redis flag value is "1".

        Accepts bytes too: a client created without ``decode_responses=True``
        returns ``b"1"``, and ``b"1" == "1"`` is False.
        """
        return value in ("1", b"1")

    async def record_anomaly(self, anomaly_signature: dict) -> AnomalyFrequency:
        """
        Record one occurrence of an anomaly.

        ADR-038/039 graceful degradation: if Redis fails, the error is logged
        and an empty AnomalyFrequency (all counts 0) is returned so the main
        flow is not interrupted.

        Args:
            anomaly_signature: anomaly signature dict (see ``hash_signature``)

        Returns:
            AnomalyFrequency: current frequency stats (defaults on Redis failure)
        """
        anomaly_key = self.hash_signature(anomaly_signature)
        now = datetime.now()

        try:
            return await self._record_anomaly_impl(
                anomaly_key=anomaly_key,
                anomaly_signature=anomaly_signature,
                now=now,
            )
        except Exception as e:
            # ADR-038: graceful degradation on Redis failure
            logger.warning(
                "anomaly_counter_redis_error",
                error=str(e),
                anomaly_key=anomaly_key,
                fallback="returning_default_frequency",
            )
            # Neutral default; never block the caller.
            return AnomalyFrequency(
                anomaly_key=anomaly_key,
                count_1h=0,
                count_24h=0,
                count_7d=0,
                count_30d=0,
                first_seen=now,
                last_seen=now,
                auto_repair_count=0,
                permanent_fix_applied=False,
                escalation_level=None,
            )

    async def _record_anomaly_impl(
        self,
        anomaly_key: str,
        anomaly_signature: dict,
        now: datetime,
    ) -> AnomalyFrequency:
        """Record the occurrence and return fresh stats (may raise Redis errors)."""
        timestamp = now.timestamp()
        timeline_key = f"{self.PREFIX_TIMELINE}{anomaly_key}"

        # 1. Add the occurrence (score = timestamp, member = unique id)
        await self.redis.zadd(
            timeline_key, {self._timeline_member(timestamp): timestamp}
        )

        # 2. Drop entries older than 30 days
        cutoff_30d = (now - timedelta(days=30)).timestamp()
        await self.redis.zremrangebyscore(timeline_key, "-inf", cutoff_30d)

        # 3. Refresh the TTL
        await self.redis.expire(timeline_key, self.TTL_SECONDS)

        # 4. Window counts, first/last seen, repair stats, escalation level
        freq = await self._read_frequency(anomaly_key, timeline_key, now)

        # 5. Store the signature the first time it is seen. Checking for the
        # "signature" field (not just the key) matters because
        # mark_permanent_fix_applied may already have created this hash.
        metadata_key = f"{self.PREFIX_METADATA}{anomaly_key}"
        if not await self.redis.hexists(metadata_key, "signature"):
            await self.redis.hset(
                metadata_key,
                mapping={
                    "signature": json.dumps(anomaly_signature),
                    "first_seen": now.isoformat(),
                },
            )
            # Only add a TTL when the hash has none (TTL < 0), so a longer
            # TTL set by mark_permanent_fix_applied is not shortened.
            if await self.redis.ttl(metadata_key) < 0:
                await self.redis.expire(metadata_key, self.TTL_SECONDS)

        # 6. Log
        logger.info(
            "anomaly_recorded",
            anomaly_key=anomaly_key,
            count_1h=freq.count_1h,
            count_24h=freq.count_24h,
            count_30d=freq.count_30d,
            escalation_level=freq.escalation_level,
        )

        return freq

    async def _read_frequency(
        self,
        anomaly_key: str,
        timeline_key: str,
        now: datetime,
    ) -> AnomalyFrequency:
        """
        Read the window counts, first/last seen and repair stats for one key.

        Shared by ``_record_anomaly_impl`` and ``get_frequency`` so both report
        identical statistics. May raise whatever the Redis client raises.
        """
        windows = (
            timedelta(hours=1),
            timedelta(hours=24),
            timedelta(days=7),
            timedelta(days=30),
        )
        count_1h, count_24h, count_7d, count_30d = [
            await self.redis.zcount(timeline_key, (now - window).timestamp(), "+inf")
            for window in windows
        ]

        # Lowest / highest scores are the first / latest occurrence times.
        first_seen_data = await self.redis.zrange(timeline_key, 0, 0, withscores=True)
        last_seen_data = await self.redis.zrange(timeline_key, -1, -1, withscores=True)
        first_seen = (
            datetime.fromtimestamp(first_seen_data[0][1]) if first_seen_data else now
        )
        last_seen = (
            datetime.fromtimestamp(last_seen_data[0][1]) if last_seen_data else now
        )

        repair_count_raw = await self.redis.get(f"{self.PREFIX_REPAIR_COUNT}{anomaly_key}")
        auto_repair_count = int(repair_count_raw) if repair_count_raw else 0
        permanent_fix_raw = await self.redis.get(f"{self.PREFIX_PERMANENT_FIX}{anomaly_key}")

        return AnomalyFrequency(
            anomaly_key=anomaly_key,
            count_1h=count_1h,
            count_24h=count_24h,
            count_7d=count_7d,
            count_30d=count_30d,
            first_seen=first_seen,
            last_seen=last_seen,
            auto_repair_count=auto_repair_count,
            permanent_fix_applied=self._is_flag_set(permanent_fix_raw),
            escalation_level=self._get_escalation_level(count_24h),
        )

    def _get_escalation_level(self, count_24h: int) -> str | None:
        """Return the escalation level for a 24h count (highest threshold wins)."""
        if count_24h >= self.THRESHOLDS["PERMANENT_FIX"]:
            return "PERMANENT_FIX"
        elif count_24h >= self.THRESHOLDS["ESCALATE"]:
            return "ESCALATE"
        elif count_24h >= self.THRESHOLDS["REPEAT"]:
            return "REPEAT"
        return None

    async def record_repair_attempt(
        self,
        anomaly_key: str,
        action: str,
        success: bool,
    ) -> None:
        """
        Record one repair attempt.

        Args:
            anomaly_key: anomaly key
            action: repair action (e.g. restart_pod, scale_up)
            success: whether the attempt succeeded
        """
        try:
            repair_key = f"{self.PREFIX_REPAIR_COUNT}{anomaly_key}"

            # Increment the attempt counter
            await self.redis.incr(repair_key)
            await self.redis.expire(repair_key, self.TTL_SECONDS)

            # Append to the repair history (used for learning)
            history_key = f"{self.PREFIX_REPAIR_HISTORY}{anomaly_key}"
            await self.redis.lpush(
                history_key,
                json.dumps(
                    {
                        "action": action,
                        "success": success,
                        "timestamp": datetime.now().isoformat(),
                    }
                ),
            )
            await self.redis.ltrim(history_key, 0, 99)  # keep the latest 100
            await self.redis.expire(history_key, self.TTL_SECONDS)

            logger.info(
                "repair_attempt_recorded",
                anomaly_key=anomaly_key,
                action=action,
                success=success,
            )
        except Exception as e:
            logger.warning("record_repair_attempt_redis_error", error=str(e), anomaly_key=anomaly_key)

    async def mark_permanent_fix_applied(
        self,
        anomaly_key: str,
        fix_description: str,
    ) -> None:
        """
        Mark that a permanent fix has been applied.

        Args:
            anomaly_key: anomaly key
            fix_description: description of the fix
        """
        try:
            await self.redis.set(
                f"{self.PREFIX_PERMANENT_FIX}{anomaly_key}",
                "1",
                ex=self.PERMANENT_FIX_TTL_SECONDS,  # 90 days
            )

            # Record the fix details
            metadata_key = f"{self.PREFIX_METADATA}{anomaly_key}"
            await self.redis.hset(
                metadata_key,
                mapping={
                    "permanent_fix_applied": "true",
                    "permanent_fix_description": fix_description,
                    "permanent_fix_time": datetime.now().isoformat(),
                },
            )
            # HSET on a missing hash creates it without a TTL, so the key could
            # otherwise live forever; keep it exactly as long as the flag.
            await self.redis.expire(metadata_key, self.PERMANENT_FIX_TTL_SECONDS)

            logger.info(
                "permanent_fix_marked",
                anomaly_key=anomaly_key,
                fix_description=fix_description,
            )
        except Exception as e:
            logger.warning("mark_permanent_fix_applied_redis_error", error=str(e), anomaly_key=anomaly_key)

    @staticmethod
    def _tally_repair_history(history: list) -> tuple[dict[str, dict], int]:
        """
        Aggregate raw repair-history entries into per-action statistics.

        Args:
            history: raw list items (str or bytes JSON) from the history list

        Returns:
            ``(stats, skipped)``. ``stats`` maps action to
            ``{"total", "success", "success_rate"}``. ``skipped`` counts entries
            that were not JSON objects with a string ``action``. Skipping them
            keeps one corrupt entry from discarding every statistic.
        """
        stats: dict[str, dict] = {}
        skipped = 0
        for raw in history:
            try:
                entry = json.loads(raw)
            except (ValueError, TypeError):
                skipped += 1
                continue
            action = entry.get("action") if isinstance(entry, dict) else None
            if not isinstance(action, str):
                skipped += 1
                continue
            bucket = stats.setdefault(action, {"total": 0, "success": 0})
            bucket["total"] += 1
            if entry.get("success"):
                bucket["success"] += 1

        # Every bucket has total >= 1, so the division is safe.
        for bucket in stats.values():
            bucket["success_rate"] = bucket["success"] / bucket["total"]
        return stats, skipped

    async def _load_repair_stats(self, anomaly_key: str) -> dict[str, dict]:
        """Fetch and aggregate the repair history (may raise Redis errors)."""
        history_key = f"{self.PREFIX_REPAIR_HISTORY}{anomaly_key}"
        history = await self.redis.lrange(history_key, 0, -1)
        stats, skipped = self._tally_repair_history(history)
        if skipped:
            logger.warning(
                "repair_history_malformed_entries",
                anomaly_key=anomaly_key,
                skipped=skipped,
            )
        return stats

    async def get_repair_success_rate(
        self,
        anomaly_key: str,
        action: str,
    ) -> dict:
        """
        Return the repair success rate of one action.

        Returns:
            {
                'action': 'restart_pod',
                'total': 10,
                'success': 3,
                'success_rate': 0.3,
            }
            All-zero counts if the action was never recorded or Redis fails.
        """
        empty = {"action": action, "total": 0, "success": 0, "success_rate": 0.0}
        try:
            stats = await self._load_repair_stats(anomaly_key)
        except Exception as e:
            logger.warning("get_repair_success_rate_redis_error", error=str(e), anomaly_key=anomaly_key)
            return empty

        action_stats = stats.get(action)
        if action_stats is None:
            return empty
        return {"action": action, **action_stats}

    async def get_all_repair_stats(self, anomaly_key: str) -> dict[str, dict]:
        """
        Return statistics for every recorded repair action.

        Returns:
            {
                'restart_pod': {'total': 10, 'success': 3, 'success_rate': 0.3},
                'scale_up': {'total': 2, 'success': 1, 'success_rate': 0.5},
            }
            An empty dict if Redis fails.
        """
        try:
            return await self._load_repair_stats(anomaly_key)
        except Exception as e:
            logger.warning("get_all_repair_stats_redis_error", error=str(e), anomaly_key=anomaly_key)
            return {}

    async def get_frequency(self, anomaly_key: str) -> AnomalyFrequency | None:
        """
        Return the frequency stats without recording a new occurrence.

        Args:
            anomaly_key: anomaly key

        Returns:
            AnomalyFrequency, or None if there is no timeline or Redis fails.
        """
        try:
            timeline_key = f"{self.PREFIX_TIMELINE}{anomaly_key}"
            if not await self.redis.exists(timeline_key):
                return None
            return await self._read_frequency(anomaly_key, timeline_key, datetime.now())
        except Exception as e:
            logger.warning("get_frequency_redis_error", error=str(e), anomaly_key=anomaly_key)
            return None

    async def should_skip_action(
        self,
        anomaly_key: str,
        action: str,
        min_success_rate: float = 0.2,
        min_attempts: int = 2,
    ) -> bool:
        """
        Decide whether a repair action should be skipped for this anomaly.

        Directive: when an action's success rate is below 20%, skip it and try
        another action.

        Args:
            anomaly_key: anomaly key
            action: repair action
            min_success_rate: minimum acceptable success rate (default 20%)
            min_attempts: attempts required before judging (default 2)

        Returns:
            True if the action should be skipped.
        """
        stats = await self.get_repair_success_rate(anomaly_key, action)

        # Too few attempts to judge: never skip.
        if stats["total"] < min_attempts:
            return False

        return stats["success_rate"] < min_success_rate
# =============================================================================
# Singleton Factory (遵循現有模式)
# =============================================================================
# Lazily created process-wide instance: populated by get_anomaly_counter()
# and cleared by reset_anomaly_counter().
_anomaly_counter: "AnomalyCounter | None" = None
def get_anomaly_counter() -> AnomalyCounter:
    """
    Return the shared AnomalyCounter, creating it on first call.

    Singleton: every caller receives the same instance and therefore shares
    the Redis client (and its connection pool) returned by ``get_redis()``.
    """
    global _anomaly_counter
    if _anomaly_counter is not None:
        return _anomaly_counter

    # Deferred import; presumably avoids an import cycle or import-time Redis
    # setup — TODO confirm.
    from src.core.redis_client import get_redis

    _anomaly_counter = AnomalyCounter(get_redis())
    return _anomaly_counter
def reset_anomaly_counter() -> None:
    """
    Drop the cached AnomalyCounter (intended for tests).

    The next get_anomaly_counter() call builds a fresh instance.
    """
    global _anomaly_counter
    _anomaly_counter = None