Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 2m7s
H2 — metric semantic 切換污染 baseline: - cpu_usage_awoooi_api → cpu_usage_node_188 - memory_usage_awoooi_api → memory_usage_node_188 原 metric_name 對應 container working set,新 PromQL 改為 node-level ratio (cadvisor 停止後的替代)。語意完全不同但保留同名 → 既有 DynamicBaseline 模型用舊單位訓練的 σ 對新值失真,5 分鐘 inspector 週期會狂報假 anomaly。 改名後 baseline 從零學習,初期 sample 數不足會被 _has_enough_samples 守門 跳過告警,安全度過 30 個週期暖機期。 H4 — probe_success 全部不可達假觸發: - 1 - avg(probe_success) + 1 - avg(probe_success or on() vector(1)) 原 expr 在 Blackbox 全部 target 失聯時 avg 回空 vector → _fetch_current_value 若把空當 0 → 1-0=1 遠超 0.05 threshold → 5min 一次假告警。 fallback 視為全部成功(值=1,1-1=0),真實 probe down 由獨立的 BlackboxProbeFailure rule 偵測,責任分離。 部署後驗證: - baseline 表新增 metric_name='memory_usage_node_188' / 'cpu_usage_node_188' 的 row - 舊 metric_name='memory_usage_awoooi_api' / 'cpu_usage_awoooi_api' 的 row 30 天後可清理 - proactive_inspection_logs 30 個週期內看 _baseline_warmup_skipped 條目而非假 anomaly Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
473 lines
22 KiB
Python
473 lines
22 KiB
Python
"""
|
||
AWOOOI AIOps Phase 4 — Proactive Inspector(主動巡檢)
|
||
======================================================
|
||
職責:每 5 分鐘主動掃描異常,聚合動態偵測信號,產生 ProactiveAlert
|
||
|
||
協調三大 Phase 4 感官:
|
||
1. DynamicBaselineService — 即時異常偵測(3σ 偏離)
|
||
2. LogAnomalyDetector — K8s Pod 新 log pattern
|
||
3. TrendPredictor — 4h 內閾值突破預警
|
||
|
||
設計原則:
|
||
- Shadow Mode(AIOPS_P4_SHADOW_MODE=True):所有偵測只記錄,不觸發 Alert
|
||
- 熔斷:任一子感官失敗 → 繼續其他感官,不中斷整個巡檢
|
||
- 去重:同一 metric/cluster 在 30 分鐘內只上報一次(Redis TTL)
|
||
- 訓練調度:每次巡檢順便觸發 Holt-Winters 重訓(async,背景執行)
|
||
|
||
ADR-084: Phase 4 動態異常偵測源頭升級
|
||
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
from dataclasses import dataclass, field
|
||
from typing import Any
|
||
|
||
import structlog
|
||
|
||
from src.utils.timezone import now_taipei
|
||
|
||
logger = structlog.get_logger(__name__)
|
||
|
||
# ── 常數 ────────────────────────────────────────────────────────────────────
|
||
INSPECTOR_INTERVAL_SEC = 300 # 每 5 分鐘巡檢一次
|
||
DEDUP_TTL_SEC = 1800 # 同一異常 30 分鐘內去重
|
||
DEDUP_KEY_PREFIX = "proactive:dedup:"
|
||
K8S_NAMESPACE = "awoooi-prod"
|
||
|
||
# 需要監控的 metrics(Prometheus PromQL + 警戒閾值)
|
||
# 2026-04-24 ogt + Claude Sonnet 4.6: P0.6 修復 — 修正 PromQL labels 使其對應實際 Prometheus 資料
|
||
# - CPU/Memory: cadvisor 無 namespace label,改用 kube_pod_container_status_restarts_total 確認存在的 namespace 篩法
|
||
# - pod_restart_rate: 改用 sum() 聚合,避免回傳多 vector 使 _fetch_current_value 只取第一筆
|
||
# - db_connection_pool: datname 實際為 awoooi_prod(非 awoooi)
|
||
# - http_error_rate: cadvisor 無 http_requests_total,改用 probe_success 替代
|
||
# 2026-04-25 P0.6 修復 by Claude Engineer-B:
|
||
# - http_error_rate: Prometheus 實測確認 metric 名稱為 probe_success(非 blackbox_probe_success)
|
||
# - cpu_usage_awoooi_api: cadvisor up=0(停止),改用 node-exporter node_cpu_seconds_total(node level)
|
||
# - memory_usage_awoooi_api: cadvisor 停止,改用 node-exporter 記憶體使用率比例(0-1 scale)
|
||
# 2026-04-26 critic-H2/H4 hotfix by Claude Opus 4.7:
|
||
# - cpu/memory metric 改名 _node_188 — 語意從 container 變 node,避免污染既有 baseline
|
||
# 舊 baseline (container working set) 的數值範圍跟新 (node ratio) 完全不同,
|
||
# 保留同名會造成 baseline 學到的 σ 完全失真,5 分鐘內狂報假 anomaly。
|
||
# - http_error_rate 加 `or vector(1)` fallback:probe_success 全部不可達時不誤觸發
|
||
MONITORED_METRICS: list[dict[str, Any]] = [
|
||
{
|
||
"name": "http_error_rate",
|
||
# probe_success:Blackbox Exporter 實際 metric 名稱(非 blackbox_probe_success)
|
||
# 2026-04-26 H4 hotfix: avg() 在所有 target 缺值時回空 vector,搭配 fallback `or vector(1)`
|
||
# 防 _fetch_current_value 把空當 0 → 1-0=1 → 5min 一次假告警洪水
|
||
# 語意:探測全失敗時 fallback=1(視為「全部成功」),避免假告警;真正 down 由 BlackboxProbeFailure 抓
|
||
"promql": '1 - avg(probe_success or on() vector(1))',
|
||
"threshold": 0.05, # > 5% probe 失敗 = 警戒
|
||
"description": "HTTP Probe 失敗率(Blackbox Exporter)",
|
||
},
|
||
{
|
||
# 2026-04-26 H2 hotfix: 改名 _node_188(原 _awoooi_api 語意是 container working set)
|
||
"name": "cpu_usage_node_188",
|
||
# cadvisor up=0(prod-docker-188 離線),改用 node-exporter node-level CPU
|
||
# 實測確認:avg(rate(node_cpu_seconds_total{mode!="idle"}[5m])) → 有資料
|
||
# threshold 0.85 = 85% CPU 使用率(node level,0-1 比例)
|
||
"promql": 'avg(rate(node_cpu_seconds_total{mode!="idle"}[5m]))',
|
||
"threshold": 0.85, # > 85% node CPU(所有 core 平均)
|
||
"description": "Node 188 CPU 使用率(node-exporter,cadvisor 停止時替代)",
|
||
},
|
||
{
|
||
# 2026-04-26 H2 hotfix: 改名 _node_188
|
||
"name": "memory_usage_node_188",
|
||
# cadvisor 停止,改用 node-exporter 節點記憶體使用率比例(0-1)
|
||
# 實測確認:188 機器 62.76 GiB,當前 ~30% 使用率
|
||
# threshold 0.85 = 85% node memory usage
|
||
"promql": '(node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes',
|
||
"threshold": 0.85, # > 85% node memory(0-1 比例)
|
||
"description": "Node 188 記憶體使用率(node-exporter,cadvisor 停止時替代)",
|
||
},
|
||
{
|
||
"name": "pod_restart_rate",
|
||
# kube-state-metrics: namespace=awoooi-prod,sum 聚合避免 multi-vector
|
||
"promql": 'sum(increase(kube_pod_container_status_restarts_total{namespace="awoooi-prod"}[15m]))',
|
||
"threshold": 2.0, # 15 分鐘內 > 2 次重啟
|
||
"description": "Pod 重啟次數(15分鐘窗口)",
|
||
},
|
||
{
|
||
"name": "db_connection_pool",
|
||
# datname 實際值為 awoooi_prod;sum 聚合所有 state
|
||
# 實測確認:curl 查詢返回有效資料,datname=awoooi_prod 存在
|
||
"promql": 'sum(pg_stat_activity_count{datname="awoooi_prod"})',
|
||
"threshold": 80.0, # > 80 個 DB 連線
|
||
"description": "PostgreSQL 連線數(awoooi_prod)",
|
||
},
|
||
]
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Data Types
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
@dataclass
|
||
class ProactiveAlert:
|
||
"""主動巡檢偵測到的預警事件"""
|
||
alert_type: str # "dynamic_anomaly" / "log_pattern" / "trend_breach"
|
||
metric_name: str
|
||
severity: str # "warning" / "critical"
|
||
description: str
|
||
current_value: float = 0.0
|
||
threshold: float = 0.0
|
||
deviation_sigma: float = 0.0
|
||
template: str = "" # log pattern(log_pattern 類型)
|
||
predicted_breach_hours: float | None = None
|
||
shadow_mode: bool = True
|
||
detected_at: str = ""
|
||
source: str = "proactive_inspector"
|
||
|
||
|
||
@dataclass
|
||
class InspectionReport:
|
||
"""一次巡檢週期的完整報告"""
|
||
started_at: str
|
||
finished_at: str
|
||
alerts: list[ProactiveAlert] = field(default_factory=list)
|
||
baseline_anomalies: int = 0
|
||
log_patterns_new: int = 0
|
||
trend_breaches: int = 0
|
||
errors: list[str] = field(default_factory=list)
|
||
shadow_mode: bool = True
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Main Service
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
class ProactiveInspector:
|
||
"""
|
||
主動巡檢協調器
|
||
|
||
每 5 分鐘執行一輪:
|
||
1. Prometheus metrics → DynamicBaselineService 異常偵測
|
||
2. K8s Pod logs → LogAnomalyDetector 新 pattern
|
||
3. 趨勢資料點 → TrendPredictor 4h 預警
|
||
4. 背景觸發 Holt-Winters 重訓(每次巡檢)
|
||
"""
|
||
|
||
def __init__(self) -> None:
|
||
# Phase 4 ADR-084: 快取最後一次巡檢報告,供 PreDecisionInvestigator 8D 感官使用
|
||
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 8D 升級
|
||
self._last_report: InspectionReport | None = None
|
||
|
||
def get_last_report(self) -> InspectionReport | None:
|
||
"""取得最近一次巡檢報告(PreDecisionInvestigator 8D 感官用)。"""
|
||
return self._last_report
|
||
|
||
async def run_inspection(self) -> InspectionReport:
|
||
"""
|
||
執行一次完整巡檢。
|
||
|
||
Returns:
|
||
InspectionReport(Shadow Mode 時只記錄,不觸發 Alert)
|
||
"""
|
||
from src.core.feature_flags import aiops_flags
|
||
|
||
if not aiops_flags.AIOPS_P4_PROACTIVE_INSPECTOR:
|
||
return InspectionReport(
|
||
started_at=now_taipei().isoformat(),
|
||
finished_at=now_taipei().isoformat(),
|
||
)
|
||
|
||
shadow_mode = aiops_flags.AIOPS_P4_SHADOW_MODE
|
||
started_at = now_taipei().isoformat()
|
||
report = InspectionReport(
|
||
started_at=started_at,
|
||
finished_at="",
|
||
shadow_mode=shadow_mode,
|
||
)
|
||
|
||
logger.info("proactive_inspection_started", shadow_mode=shadow_mode)
|
||
|
||
# 三大感官並行執行(熔斷隔離)
|
||
tasks = [
|
||
self._inspect_dynamic_baseline(report),
|
||
self._inspect_log_patterns(report),
|
||
]
|
||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||
for r in results:
|
||
if isinstance(r, Exception):
|
||
report.errors.append(str(r))
|
||
|
||
# 趨勢預測(依賴 baseline 結果,稍後執行)
|
||
try:
|
||
await self._inspect_trends(report)
|
||
except Exception as e:
|
||
report.errors.append(f"trend_inspect_error: {e}")
|
||
|
||
# 背景觸發基線重訓(不等待結果)
|
||
asyncio.create_task(self._retrain_baselines_background())
|
||
|
||
report.finished_at = now_taipei().isoformat()
|
||
self._last_report = report # 快取供 PreDecisionInvestigator 8D 感官讀取
|
||
|
||
logger.info(
|
||
"proactive_inspection_finished",
|
||
baseline_anomalies=report.baseline_anomalies,
|
||
log_patterns_new=report.log_patterns_new,
|
||
trend_breaches=report.trend_breaches,
|
||
errors=len(report.errors),
|
||
shadow_mode=shadow_mode,
|
||
)
|
||
|
||
return report
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# 感官 1: Dynamic Baseline 異常偵測
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
async def _inspect_dynamic_baseline(self, report: InspectionReport) -> None:
|
||
"""從 Prometheus 抓取當前值,與 Holt-Winters 基線比對。"""
|
||
from src.services.dynamic_baseline_service import get_dynamic_baseline_service
|
||
from src.core.feature_flags import aiops_flags
|
||
|
||
if not aiops_flags.AIOPS_P4_DYNAMIC_BASELINE:
|
||
return
|
||
|
||
svc = get_dynamic_baseline_service()
|
||
|
||
for metric_cfg in MONITORED_METRICS:
|
||
metric_name = metric_cfg["name"]
|
||
try:
|
||
current = await self._fetch_current_value(metric_cfg["promql"])
|
||
if current is None:
|
||
continue
|
||
|
||
hour_of_day = now_taipei().hour
|
||
result = await svc.is_anomaly(metric_name, current, hour_of_day=hour_of_day)
|
||
|
||
if result.is_anomaly:
|
||
dedup_key = f"dynamic:{metric_name}"
|
||
if await self._is_dedup(dedup_key):
|
||
continue
|
||
|
||
severity = "critical" if result.deviation_sigma >= 5.0 else "warning"
|
||
alert = ProactiveAlert(
|
||
alert_type="dynamic_anomaly",
|
||
metric_name=metric_name,
|
||
severity=severity,
|
||
description=(
|
||
f"{metric_cfg['description']} 偏離基線 {result.deviation_sigma:.1f}σ "
|
||
f"(當前 {current:.4f},期望 {result.expected_mean:.4f})"
|
||
),
|
||
current_value=current,
|
||
threshold=result.expected_mean + 3 * result.expected_std,
|
||
deviation_sigma=result.deviation_sigma,
|
||
shadow_mode=report.shadow_mode,
|
||
detected_at=now_taipei().isoformat(),
|
||
)
|
||
report.alerts.append(alert)
|
||
report.baseline_anomalies += 1
|
||
await self._mark_dedup(dedup_key)
|
||
|
||
except Exception as e:
|
||
logger.warning("baseline_inspect_metric_failed", metric=metric_name, error=str(e))
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# 感官 2: Log Anomaly 新 Pattern
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
async def _inspect_log_patterns(self, report: InspectionReport) -> None:
|
||
"""掃描 K8s Pod 日誌,偵測新 log pattern。"""
|
||
from src.services.log_anomaly_detector import get_log_anomaly_detector
|
||
from src.core.feature_flags import aiops_flags
|
||
|
||
if not aiops_flags.AIOPS_P4_LOG_ANOMALY:
|
||
return
|
||
|
||
detector = get_log_anomaly_detector()
|
||
try:
|
||
events = await detector.process_pod_logs(
|
||
namespace=K8S_NAMESPACE,
|
||
tail_lines=200,
|
||
)
|
||
for event in events:
|
||
dedup_key = f"log:{event.cluster_id}"
|
||
if await self._is_dedup(dedup_key):
|
||
continue
|
||
|
||
alert = ProactiveAlert(
|
||
alert_type="log_pattern",
|
||
metric_name="log_anomaly",
|
||
severity="warning",
|
||
description=f"偵測到新 log pattern:{event.template[:200]}",
|
||
template=event.template,
|
||
shadow_mode=report.shadow_mode,
|
||
detected_at=event.detected_at,
|
||
source=f"k8s/{K8S_NAMESPACE}",
|
||
)
|
||
report.alerts.append(alert)
|
||
report.log_patterns_new += 1
|
||
await self._mark_dedup(dedup_key)
|
||
|
||
except Exception as e:
|
||
logger.warning("log_inspect_failed", error=str(e))
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# 感官 3: Trend Predictor 4h 預警
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
async def _inspect_trends(self, report: InspectionReport) -> None:
|
||
"""對各 metric 做 4h 趨勢外推。"""
|
||
from src.services.trend_predictor import get_trend_predictor
|
||
from src.core.feature_flags import aiops_flags
|
||
|
||
if not aiops_flags.AIOPS_P4_TREND_PREDICTOR:
|
||
return
|
||
|
||
predictor = get_trend_predictor()
|
||
|
||
for metric_cfg in MONITORED_METRICS:
|
||
metric_name = metric_cfg["name"]
|
||
threshold = metric_cfg["threshold"]
|
||
try:
|
||
current = await self._fetch_current_value(metric_cfg["promql"])
|
||
if current is None:
|
||
continue
|
||
|
||
pred = await predictor.predict_breach(metric_name, current, threshold)
|
||
pred.metric_name = metric_name # TrendPredictor 內部未填 metric_name
|
||
|
||
if pred.will_breach and pred.confidence in ("high", "medium"):
|
||
dedup_key = f"trend:{metric_name}"
|
||
if await self._is_dedup(dedup_key):
|
||
continue
|
||
|
||
hours_str = f"{pred.breach_in_hours:.1f}h" if pred.breach_in_hours is not None else "已超越"
|
||
alert = ProactiveAlert(
|
||
alert_type="trend_breach",
|
||
metric_name=metric_name,
|
||
severity="warning",
|
||
description=(
|
||
f"{metric_cfg['description']} 趨勢預警:預計 {hours_str} 後超越閾值 "
|
||
f"(當前 {current:.4f},閾值 {threshold},斜率 {pred.slope_per_hour:+.6f}/h,R²={pred.r_squared:.2f})"
|
||
),
|
||
current_value=current,
|
||
threshold=threshold,
|
||
predicted_breach_hours=pred.breach_in_hours,
|
||
shadow_mode=report.shadow_mode,
|
||
detected_at=now_taipei().isoformat(),
|
||
)
|
||
report.alerts.append(alert)
|
||
report.trend_breaches += 1
|
||
await self._mark_dedup(dedup_key)
|
||
|
||
except Exception as e:
|
||
logger.warning("trend_inspect_metric_failed", metric=metric_name, error=str(e))
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# 背景重訓
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
async def _retrain_baselines_background(self) -> None:
|
||
"""背景重訓所有 Holt-Winters 基線(不阻塞巡檢)。"""
|
||
from src.services.dynamic_baseline_service import get_dynamic_baseline_service
|
||
from src.core.feature_flags import aiops_flags
|
||
|
||
if not aiops_flags.AIOPS_P4_DYNAMIC_BASELINE:
|
||
return
|
||
|
||
svc = get_dynamic_baseline_service()
|
||
for metric_cfg in MONITORED_METRICS:
|
||
try:
|
||
await svc.train_baseline(
|
||
metric_name=metric_cfg["name"],
|
||
promql=metric_cfg["promql"],
|
||
)
|
||
except Exception as e:
|
||
logger.warning(
|
||
"baseline_retrain_background_failed",
|
||
metric=metric_cfg["name"],
|
||
error=str(e),
|
||
)
|
||
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
# 工具方法
|
||
# ──────────────────────────────────────────────────────────────────────────
|
||
|
||
async def _fetch_current_value(self, promql: str) -> float | None:
|
||
"""從 Prometheus 抓取當前值(instant query)。"""
|
||
import httpx
|
||
from src.core.config import settings
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||
resp = await client.get(
|
||
f"{settings.PROMETHEUS_URL}/api/v1/query",
|
||
params={"query": promql},
|
||
)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
|
||
results = data.get("data", {}).get("result", [])
|
||
if not results:
|
||
return None
|
||
|
||
value_str = results[0].get("value", [None, None])[1]
|
||
if value_str is None or value_str == "NaN":
|
||
return None
|
||
return float(value_str)
|
||
|
||
except Exception as e:
|
||
logger.warning("prometheus_instant_query_failed", promql=promql[:80], error=str(e))
|
||
return None
|
||
|
||
async def _is_dedup(self, key: str) -> bool:
|
||
"""檢查是否在去重窗口內(30 分鐘)。"""
|
||
try:
|
||
from src.core.redis_client import get_redis
|
||
r = get_redis()
|
||
return bool(await r.exists(f"{DEDUP_KEY_PREFIX}{key}"))
|
||
except Exception:
|
||
return False
|
||
|
||
async def _mark_dedup(self, key: str) -> None:
|
||
"""標記去重,TTL = 30 分鐘。"""
|
||
try:
|
||
from src.core.redis_client import get_redis
|
||
r = get_redis()
|
||
await r.set(f"{DEDUP_KEY_PREFIX}{key}", "1", ex=DEDUP_TTL_SEC)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Background Loop(供 main.py lifespan 呼叫)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
async def run_proactive_inspector_loop() -> None:
|
||
"""
|
||
永久迴圈:每 INSPECTOR_INTERVAL_SEC 秒執行一次巡檢。
|
||
|
||
由 main.py lifespan 透過 asyncio.create_task() 啟動。
|
||
Loop 內部有熔斷:任一輪失敗不會終止整個迴圈。
|
||
"""
|
||
inspector = get_proactive_inspector()
|
||
logger.info("proactive_inspector_loop_started", interval_sec=INSPECTOR_INTERVAL_SEC)
|
||
|
||
while True:
|
||
try:
|
||
await inspector.run_inspection()
|
||
except Exception as e:
|
||
logger.warning("proactive_inspector_loop_error", error=str(e))
|
||
|
||
await asyncio.sleep(INSPECTOR_INTERVAL_SEC)
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Singleton
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
_inspector: ProactiveInspector | None = None
|
||
|
||
|
||
def get_proactive_inspector() -> ProactiveInspector:
|
||
global _inspector
|
||
if _inspector is None:
|
||
_inspector = ProactiveInspector()
|
||
return _inspector
|