awoooi/apps/api/src/services/proactive_inspector.py
OG T 14a02263ae
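feat(Phase 4): proactive inspection + trend prediction + 8D sensing upgrade, all complete
## Phase 4 full delivery (ADR-084)

### New services
- trend_predictor.py: numpy linear regression, 4h threshold-breach early warning, R² confidence score
- proactive_inspector.py: proactive inspection coordinator that runs every 5 minutes
  - DynamicBaselineService (3σ deviation)
  - LogAnomalyDetector (new Drain3 patterns)
  - TrendPredictor (slope extrapolation for a 4h forecast)
  - Shadow Mode + 30-minute deduplication + Holt-Winters background retraining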
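
The slope extrapolation behind the 4h forecast can be sketched as follows. This is a minimal illustration, not the code in trend_predictor.py: the function name `extrapolate_breach`, its parameters, and its return shape are assumptions, and the least-squares fit is written out in plain Python instead of numpy so the block runs without dependencies.

```python
from __future__ import annotations


def extrapolate_breach(
    hours: list[float],
    values: list[float],
    threshold: float,
    horizon_hours: float = 4.0,
) -> tuple[bool, float | None, float]:
    """Fit value = slope * t + intercept and extrapolate to the threshold.

    Hypothetical helper. Returns (will_breach, hours_until_breach, r_squared).
    """
    n = len(hours)
    if n < 2 or n != len(values):
        raise ValueError("need at least two (time, value) pairs")

    mean_t = sum(hours) / n
    mean_y = sum(values) / n
    s_tt = sum((t - mean_t) ** 2 for t in hours)
    if s_tt == 0:
        raise ValueError("all timestamps are identical")

    # Ordinary least-squares slope and intercept.
    s_ty = sum((t - mean_t) * (y - mean_y) for t, y in zip(hours, values))
    slope = s_ty / s_tt
    intercept = mean_y - slope * mean_t

    # R² = 1 - SS_res / SS_tot; a flat series carries no trend information.
    ss_tot = sum((y - mean_y) ** 2 for y in values)
    ss_res = sum((y - (slope * t + intercept)) ** 2 for t, y in zip(hours, values))
    r_squared = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0

    fitted_now = slope * hours[-1] + intercept
    if fitted_now >= threshold:
        return True, 0.0, r_squared  # already at or above the threshold
    if slope <= 0:
        return False, None, r_squared  # flat or falling: never reaches it
    eta = (threshold - fitted_now) / slope
    return eta <= horizon_hours, eta, r_squared
```

A caller would typically also require a minimum R² before trusting the forecast, which is the role the commit message assigns to the confidence score.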

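### 8D sensing upgrade (EvidenceSnapshot Phase 4 enhancement)
- PreDecisionInvestigator._collect_phase4_anomalies(): before a decision, reads
  ProactiveInspector's most recent cached inspection + new patterns from LogAnomalyDetector
- EvidenceSnapshot.anomaly_context: new field holding the Phase 4 dynamic anomaly context
- DiagnosticianAgent._build_prompt(): the prompt now includes anomaly_context,
  so the LLM RCA can draw on dynamic-baseline deviations and trend warnings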
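
One way the cached alerts could be folded into an `anomaly_context` and rendered into a prompt is sketched below. Everything here is hypothetical: `AlertSummary` is a trimmed-down stand-in for ProactiveAlert, and `build_anomaly_context` and `render_prompt_section` are illustrative names, since the actual shape of the field and of `_build_prompt()` is not shown in this file.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class AlertSummary:
    """Hypothetical stand-in carrying four of ProactiveAlert's fields."""

    alert_type: str
    metric_name: str
    severity: str
    description: str


def build_anomaly_context(alerts: list[AlertSummary], max_items: int = 5) -> dict[str, Any]:
    """Collapse recent alerts into a JSON-serialisable dict (assumed layout)."""
    # sorted() is stable, so critical alerts move first and ties keep their order.
    ranked = sorted(alerts, key=lambda a: a.severity != "critical")
    return {
        "alert_count": len(alerts),
        "alerts": [asdict(a) for a in ranked[:max_items]],
    }


def render_prompt_section(context: dict[str, Any]) -> str:
    """Turn the context into a short text block for an LLM prompt."""
    if not context["alerts"]:
        return "Anomaly context: none detected in the last inspection."
    lines = ["Anomaly context (from the last proactive inspection):"]
    for a in context["alerts"]:
        lines.append(
            f"- [{a['severity']}] {a['alert_type']} / {a['metric_name']}: {a['description']}"
        )
    return "\n".join(lines)
```

Capping the list keeps the prompt bounded when an inspection round produces many alerts; the cap of five is an arbitrary choice for the sketch.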

### Database migration
- incident_evidence: ADD COLUMN anomaly_context JSONB (idempotent)
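
An idempotent form of this migration might look like the sketch below. It assumes PostgreSQL, where `ADD COLUMN IF NOT EXISTS` makes re-running the statement a no-op, and a connection object exposing an awaitable `execute(sql)` in the style of asyncpg; the constant and function names are hypothetical, and the real migration may achieve idempotency differently.

```python
from __future__ import annotations

from typing import Any

# Assumed PostgreSQL syntax: IF NOT EXISTS turns a repeated run into a no-op.
ADD_ANOMALY_CONTEXT_SQL = (
    "ALTER TABLE incident_evidence "
    "ADD COLUMN IF NOT EXISTS anomaly_context JSONB"
)


async def add_anomaly_context_column(conn: Any) -> None:
    """Apply the migration; safe to call on every startup.

    `conn` is assumed to provide `await conn.execute(sql)`.
    """
    await conn.execute(ADD_ANOMALY_CONTEXT_SQL)
```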

### main.py
- Starts run_proactive_inspector_loop() as an asyncio task
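
Starting and stopping such a loop around an application's lifetime can be sketched with a small async context manager. This is a generic asyncio illustration under stated assumptions, not the code in main.py: `background_loop` is a hypothetical helper, and a web framework's lifespan hook would wrap it in its own way.

```python
from __future__ import annotations

import asyncio
import contextlib
from collections.abc import AsyncIterator, Callable, Coroutine
from typing import Any


@contextlib.asynccontextmanager
async def background_loop(
    loop_fn: Callable[[], Coroutine[Any, Any, None]],
) -> AsyncIterator[asyncio.Task[None]]:
    """Run `loop_fn()` as a task for the duration of the `async with` block."""
    task = asyncio.create_task(loop_fn())
    try:
        yield task
    finally:
        # Cancel on shutdown and wait for the task to finish unwinding.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task
```

Holding the task in the context manager also keeps a strong reference to it, so the event loop cannot drop it while the application is running.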

2026-04-15 ogt + Claude Sonnet 4.6 (Asia-Pacific): Phase 4 fully complete

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-15 15:47:05 +08:00

"""
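AWOOOI AIOps Phase 4: Proactive Inspector
=========================================

Responsibility: every 5 minutes, proactively scan for anomalies, aggregate the
dynamic detection signals, and produce ProactiveAlert objects.

Coordinates the three Phase 4 senses:
1. DynamicBaselineService: real-time anomaly detection (3σ deviation)
2. LogAnomalyDetector: new log patterns in K8s Pods
3. TrendPredictor: warning of a threshold breach within 4h

Design principles:
- Shadow Mode (AIOPS_P4_SHADOW_MODE=True): every detection is only recorded; no Alert is triggered
- Circuit breaking: if any one sense fails, the others continue and the inspection is not aborted
- Deduplication: the same metric/cluster is reported at most once per 30 minutes (Redis TTL)
- Training schedule: each inspection also triggers Holt-Winters retraining (async, in the background)

ADR-084: Phase 4 dynamic anomaly detection, upgraded at the source
2026-04-15 ogt + Claude Sonnet 4.6 (Asia-Pacific): Phase 4 initial creation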
"""
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Any

import structlog

from src.utils.timezone import now_taipei

logger = structlog.get_logger(__name__)

# ── Constants ───────────────────────────────────────────────────────────────
INSPECTOR_INTERVAL_SEC = 300  # run an inspection every 5 minutes
DEDUP_TTL_SEC = 1800  # suppress repeats of the same anomaly for 30 minutes
DEDUP_KEY_PREFIX = "proactive:dedup:"
K8S_NAMESPACE = "awoooi-prod"

# Metrics to monitor (Prometheus PromQL + alert threshold)
MONITORED_METRICS: list[dict[str, Any]] = [
    {
        "name": "http_error_rate",
        "promql": 'sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m]))',
        "threshold": 0.05,  # > 5% error rate = alert
        "description": "HTTP 5xx error rate",
    },
    {
        "name": "cpu_usage_awoooi_api",
        "promql": 'avg(rate(container_cpu_usage_seconds_total{namespace="awoooi-prod",container="awoooi-api"}[5m]))',
        # The query yields CPU cores, so 0.85 means 85% only if the limit is one core.
        "threshold": 0.85,
        "description": "API container CPU usage",
    },
    {
        "name": "memory_usage_awoooi_api",
        "promql": 'avg(container_memory_usage_bytes{namespace="awoooi-prod",container="awoooi-api"}) / avg(container_spec_memory_limit_bytes{namespace="awoooi-prod",container="awoooi-api"})',
        "threshold": 0.90,  # > 90% of the memory limit
        "description": "API container memory usage",
    },
    {
        "name": "pod_restart_rate",
        "promql": 'increase(kube_pod_container_status_restarts_total{namespace="awoooi-prod"}[15m])',
        "threshold": 2.0,  # > 2 restarts within 15 minutes
        "description": "Pod restart count (15-minute window)",
    },
    {
        "name": "db_connection_pool",
        "promql": 'pg_stat_activity_count{datname="awoooi"}',
        "threshold": 80.0,  # > 80 DB connections
        "description": "PostgreSQL connection count",
    },
]


# ─────────────────────────────────────────────────────────────────────────────
# Data Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ProactiveAlert:
    """An early-warning event detected by a proactive inspection."""

    alert_type: str  # "dynamic_anomaly" / "log_pattern" / "trend_breach"
    metric_name: str
    severity: str  # "warning" / "critical"
    description: str
    current_value: float = 0.0
    threshold: float = 0.0
    deviation_sigma: float = 0.0
    template: str = ""  # log pattern (log_pattern alerts only)
    predicted_breach_hours: float | None = None
    shadow_mode: bool = True
    detected_at: str = ""
    source: str = "proactive_inspector"


@dataclass
class InspectionReport:
    """The complete report for one inspection cycle."""

    started_at: str
    finished_at: str
    alerts: list[ProactiveAlert] = field(default_factory=list)
    baseline_anomalies: int = 0
    log_patterns_new: int = 0
    trend_breaches: int = 0
    errors: list[str] = field(default_factory=list)
    shadow_mode: bool = True


# ─────────────────────────────────────────────────────────────────────────────
# Main Service
# ─────────────────────────────────────────────────────────────────────────────
class ProactiveInspector:
    """
    Proactive inspection coordinator.

    Each 5-minute round:
    1. Prometheus metrics → DynamicBaselineService anomaly detection
    2. K8s Pod logs → LogAnomalyDetector new patterns
    3. Trend data points → TrendPredictor 4h warning
    4. Holt-Winters retraining triggered in the background (every inspection)
    """

    def __init__(self) -> None:
        # Phase 4 ADR-084: cache the last inspection report for PreDecisionInvestigator's 8D sensing
        # 2026-04-15 ogt + Claude Sonnet 4.6 (Asia-Pacific): Phase 4 8D upgrade
        self._last_report: InspectionReport | None = None
        # Strong references to fire-and-forget tasks so they are not garbage-collected mid-run
        self._background_tasks: set[asyncio.Task[None]] = set()

    def get_last_report(self) -> InspectionReport | None:
        """Return the most recent inspection report (used by PreDecisionInvestigator's 8D sensing)."""
        return self._last_report

    async def run_inspection(self) -> InspectionReport:
        """
        Run one complete inspection.

        Returns:
            InspectionReport (in Shadow Mode, detections are only recorded; no Alert is triggered)
        """
        from src.core.feature_flags import aiops_flags

        if not aiops_flags.AIOPS_P4_PROACTIVE_INSPECTOR:
            return InspectionReport(
                started_at=now_taipei().isoformat(),
                finished_at=now_taipei().isoformat(),
            )

        shadow_mode = aiops_flags.AIOPS_P4_SHADOW_MODE
        started_at = now_taipei().isoformat()
        report = InspectionReport(
            started_at=started_at,
            finished_at="",
            shadow_mode=shadow_mode,
        )
        logger.info("proactive_inspection_started", shadow_mode=shadow_mode)

        # The baseline and log senses run concurrently (failures are isolated)
        tasks = [
            self._inspect_dynamic_baseline(report),
            self._inspect_log_patterns(report),
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for r in results:
            if isinstance(r, Exception):
                report.errors.append(str(r))

        # Trend prediction (depends on the baseline results, so it runs afterwards)
        try:
            await self._inspect_trends(report)
        except Exception as e:
            report.errors.append(f"trend_inspect_error: {e}")

        # Trigger baseline retraining in the background (do not wait for the result)
        retrain_task = asyncio.create_task(self._retrain_baselines_background())
        self._background_tasks.add(retrain_task)
        retrain_task.add_done_callback(self._background_tasks.discard)

        report.finished_at = now_taipei().isoformat()
        self._last_report = report  # cached for PreDecisionInvestigator's 8D sensing
        logger.info(
            "proactive_inspection_finished",
            baseline_anomalies=report.baseline_anomalies,
            log_patterns_new=report.log_patterns_new,
            trend_breaches=report.trend_breaches,
            errors=len(report.errors),
            shadow_mode=shadow_mode,
        )
        return report

    # ──────────────────────────────────────────────────────────────────────────
    # Sense 1: Dynamic Baseline anomaly detection
    # ──────────────────────────────────────────────────────────────────────────
    async def _inspect_dynamic_baseline(self, report: InspectionReport) -> None:
        """Fetch current values from Prometheus and compare them with the Holt-Winters baseline."""
        from src.services.dynamic_baseline_service import get_dynamic_baseline_service
        from src.core.feature_flags import aiops_flags

        if not aiops_flags.AIOPS_P4_DYNAMIC_BASELINE:
            return

        svc = get_dynamic_baseline_service()
        for metric_cfg in MONITORED_METRICS:
            metric_name = metric_cfg["name"]
            try:
                current = await self._fetch_current_value(metric_cfg["promql"])
                if current is None:
                    continue
                hour_of_day = now_taipei().hour
                result = await svc.is_anomaly(metric_name, current, hour_of_day=hour_of_day)
                if result.is_anomaly:
                    dedup_key = f"dynamic:{metric_name}"
                    if await self._is_dedup(dedup_key):
                        continue
                    # 5σ or more is treated as critical; anything else flagged is a warning
                    severity = "critical" if result.deviation_sigma >= 5.0 else "warning"
                    alert = ProactiveAlert(
                        alert_type="dynamic_anomaly",
                        metric_name=metric_name,
                        severity=severity,
                        description=(
                            f"{metric_cfg['description']} deviates {result.deviation_sigma:.1f}σ from baseline "
                            f"(current {current:.4f}, expected {result.expected_mean:.4f})"
                        ),
                        current_value=current,
                        threshold=result.expected_mean + 3 * result.expected_std,
                        deviation_sigma=result.deviation_sigma,
                        shadow_mode=report.shadow_mode,
                        detected_at=now_taipei().isoformat(),
                    )
                    report.alerts.append(alert)
                    report.baseline_anomalies += 1
                    await self._mark_dedup(dedup_key)
            except Exception as e:
                logger.warning("baseline_inspect_metric_failed", metric=metric_name, error=str(e))

    # ──────────────────────────────────────────────────────────────────────────
    # Sense 2: Log Anomaly new patterns
    # ──────────────────────────────────────────────────────────────────────────
    async def _inspect_log_patterns(self, report: InspectionReport) -> None:
        """Scan K8s Pod logs and detect new log patterns."""
        from src.services.log_anomaly_detector import get_log_anomaly_detector
        from src.core.feature_flags import aiops_flags

        if not aiops_flags.AIOPS_P4_LOG_ANOMALY:
            return

        detector = get_log_anomaly_detector()
        try:
            events = await detector.process_pod_logs(
                namespace=K8S_NAMESPACE,
                tail_lines=200,
            )
            for event in events:
                dedup_key = f"log:{event.cluster_id}"
                if await self._is_dedup(dedup_key):
                    continue
                alert = ProactiveAlert(
                    alert_type="log_pattern",
                    metric_name="log_anomaly",
                    severity="warning",
                    description=f"New log pattern detected: {event.template[:200]}",
                    template=event.template,
                    shadow_mode=report.shadow_mode,
                    detected_at=event.detected_at,
                    source=f"k8s/{K8S_NAMESPACE}",
                )
                report.alerts.append(alert)
                report.log_patterns_new += 1
                await self._mark_dedup(dedup_key)
        except Exception as e:
            logger.warning("log_inspect_failed", error=str(e))

    # ──────────────────────────────────────────────────────────────────────────
    # Sense 3: Trend Predictor 4h warning
    # ──────────────────────────────────────────────────────────────────────────
    async def _inspect_trends(self, report: InspectionReport) -> None:
        """Extrapolate a 4h trend for each metric."""
        from src.services.trend_predictor import get_trend_predictor
        from src.core.feature_flags import aiops_flags

        if not aiops_flags.AIOPS_P4_TREND_PREDICTOR:
            return

        predictor = get_trend_predictor()
        for metric_cfg in MONITORED_METRICS:
            metric_name = metric_cfg["name"]
            threshold = metric_cfg["threshold"]
            try:
                current = await self._fetch_current_value(metric_cfg["promql"])
                if current is None:
                    continue
                pred = await predictor.predict_breach(metric_name, current, threshold)
                pred.metric_name = metric_name  # TrendPredictor does not populate metric_name itself
                if pred.will_breach and pred.confidence in ("high", "medium"):
                    dedup_key = f"trend:{metric_name}"
                    if await self._is_dedup(dedup_key):
                        continue
                    hours_str = (
                        f"in {pred.breach_in_hours:.1f}h"
                        if pred.breach_in_hours is not None
                        else "now (already exceeded)"
                    )
                    alert = ProactiveAlert(
                        alert_type="trend_breach",
                        metric_name=metric_name,
                        severity="warning",
                        description=(
                            f"{metric_cfg['description']} trend warning: projected to cross the threshold {hours_str} "
                            f"(current {current:.4f}, threshold {threshold}, "
                            f"slope {pred.slope_per_hour:+.6f}/h, R²={pred.r_squared:.2f})"
                        ),
                        current_value=current,
                        threshold=threshold,
                        predicted_breach_hours=pred.breach_in_hours,
                        shadow_mode=report.shadow_mode,
                        detected_at=now_taipei().isoformat(),
                    )
                    report.alerts.append(alert)
                    report.trend_breaches += 1
                    await self._mark_dedup(dedup_key)
            except Exception as e:
                logger.warning("trend_inspect_metric_failed", metric=metric_name, error=str(e))

    # ──────────────────────────────────────────────────────────────────────────
    # Background retraining
    # ──────────────────────────────────────────────────────────────────────────
    async def _retrain_baselines_background(self) -> None:
        """Retrain every Holt-Winters baseline in the background (does not block the inspection)."""
        from src.services.dynamic_baseline_service import get_dynamic_baseline_service
        from src.core.feature_flags import aiops_flags

        if not aiops_flags.AIOPS_P4_DYNAMIC_BASELINE:
            return

        svc = get_dynamic_baseline_service()
        for metric_cfg in MONITORED_METRICS:
            try:
                await svc.train_baseline(
                    metric_name=metric_cfg["name"],
                    promql=metric_cfg["promql"],
                )
            except Exception as e:
                logger.warning(
                    "baseline_retrain_background_failed",
                    metric=metric_cfg["name"],
                    error=str(e),
                )

    # ──────────────────────────────────────────────────────────────────────────
    # Utility methods
    # ──────────────────────────────────────────────────────────────────────────
    async def _fetch_current_value(self, promql: str) -> float | None:
        """Fetch the current value from Prometheus (instant query).

        Only the first series in the result is used. Returns None when the
        query fails, matches no series, or yields NaN.
        """
        import httpx
        from src.core.config import settings

        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                resp = await client.get(
                    f"{settings.PROMETHEUS_URL}/api/v1/query",
                    params={"query": promql},
                )
                resp.raise_for_status()
                data = resp.json()
                results = data.get("data", {}).get("result", [])
                if not results:
                    return None
                # An instant-vector sample is [timestamp, "value"]
                value_str = results[0].get("value", [None, None])[1]
                if value_str is None or value_str == "NaN":
                    return None
                return float(value_str)
        except Exception as e:
            logger.warning("prometheus_instant_query_failed", promql=promql[:80], error=str(e))
            return None

    async def _is_dedup(self, key: str) -> bool:
        """Return True if the key is still inside the dedup window (30 minutes)."""
        try:
            from src.core.redis_client import get_redis

            r = get_redis()
            return bool(await r.exists(f"{DEDUP_KEY_PREFIX}{key}"))
        except Exception:
            # Redis unavailable: fail open so the alert is still reported
            return False

    async def _mark_dedup(self, key: str) -> None:
        """Mark the key as reported (TTL = 30 minutes)."""
        try:
            from src.core.redis_client import get_redis

            r = get_redis()
            await r.set(f"{DEDUP_KEY_PREFIX}{key}", "1", ex=DEDUP_TTL_SEC)
        except Exception:
            # Best effort: a failed mark only means the alert may repeat next round
            pass


# ─────────────────────────────────────────────────────────────────────────────
# Background Loop (called from the main.py lifespan)
# ─────────────────────────────────────────────────────────────────────────────
async def run_proactive_inspector_loop() -> None:
    """
    Infinite loop: run one inspection every INSPECTOR_INTERVAL_SEC seconds.

    Started by the main.py lifespan via asyncio.create_task().
    The loop has its own circuit breaker: a failed round does not terminate it.
    """
    inspector = get_proactive_inspector()
    logger.info("proactive_inspector_loop_started", interval_sec=INSPECTOR_INTERVAL_SEC)
    while True:
        try:
            await inspector.run_inspection()
        except Exception as e:
            logger.warning("proactive_inspector_loop_error", error=str(e))
        await asyncio.sleep(INSPECTOR_INTERVAL_SEC)


# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_inspector: ProactiveInspector | None = None


def get_proactive_inspector() -> ProactiveInspector:
    """Return the process-wide ProactiveInspector, creating it on first use."""
    global _inspector
    if _inspector is None:
        _inspector = ProactiveInspector()
    return _inspector