From 14a02263ae29205730a6e0ea60c4d380f846d7f2 Mon Sep 17 00:00:00 2001 From: OG T Date: Wed, 15 Apr 2026 15:47:05 +0800 Subject: [PATCH] =?UTF-8?q?feat(Phase=204):=20=E4=B8=BB=E5=8B=95=E5=B7=A1?= =?UTF-8?q?=E6=AA=A2=20+=20=E8=B6=A8=E5=8B=A2=E9=A0=90=E6=B8=AC=20+=208D?= =?UTF-8?q?=20=E6=84=9F=E5=AE=98=E5=8D=87=E7=B4=9A=20=E5=85=A8=E9=83=A8?= =?UTF-8?q?=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Phase 4 完整交付(ADR-084) ### 新增服務 - trend_predictor.py: numpy 線性回歸,4h 閾值突破預警,R² 信心評分 - proactive_inspector.py: 每 5 分鐘主動巡檢協調器 - DynamicBaselineService(3σ 偏離) - LogAnomalyDetector(新 Drain3 pattern) - TrendPredictor(斜率外推 4h 預測) - Shadow Mode + 30 分鐘去重 + Holt-Winters 背景重訓 ### 8D 感官升級(EvidenceSnapshot Phase 4 增強) - PreDecisionInvestigator._collect_phase4_anomalies(): 決策前讀取 ProactiveInspector 最近巡檢快取 + LogAnomalyDetector 新 pattern - EvidenceSnapshot.anomaly_context: 新欄位,Phase 4 動態異常上下文 - DiagnosticianAgent._build_prompt(): prompt 包含 anomaly_context, LLM RCA 可參考動態基線偏差與趨勢預警 ### 資料庫遷移 - incident_evidence: ADD COLUMN anomaly_context JSONB(冪等) ### main.py - 啟動 run_proactive_inspector_loop() asyncio task 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 全部完成 Co-Authored-By: Claude Sonnet 4.6 --- apps/api/src/agents/diagnostician_agent.py | 20 +- apps/api/src/db/base.py | 9 + apps/api/src/db/models.py | 6 + apps/api/src/main.py | 11 + apps/api/src/services/evidence_snapshot.py | 7 + .../src/services/pre_decision_investigator.py | 80 ++++ apps/api/src/services/proactive_inspector.py | 443 ++++++++++++++++++ apps/api/src/services/trend_predictor.py | 306 ++++++++++++ docs/LOGBOOK.md | 86 +++- 9 files changed, 965 insertions(+), 3 deletions(-) create mode 100644 apps/api/src/services/proactive_inspector.py create mode 100644 apps/api/src/services/trend_predictor.py diff --git a/apps/api/src/agents/diagnostician_agent.py b/apps/api/src/agents/diagnostician_agent.py index 46baf4dd..7974c079 100644 --- a/apps/api/src/agents/diagnostician_agent.py +++ b/apps/api/src/agents/diagnostician_agent.py @@ -104,7 +104,10 @@ class DiagnosticianAgent(BaseAgent): async def _analyze(self, snapshot: "EvidenceSnapshot") -> DiagnosisReport: """核心 LLM 分析邏輯。""" - prompt = self._build_prompt({"evidence_summary": snapshot.evidence_summary or ""}) + prompt = self._build_prompt({ + "evidence_summary": snapshot.evidence_summary or "", + "anomaly_context": snapshot.anomaly_context, + }) from src.services.openclaw import get_openclaw openclaw = get_openclaw() @@ -129,6 +132,19 @@ class DiagnosticianAgent(BaseAgent): def _build_prompt(self, context: dict[str, Any]) -> str: evidence = context.get("evidence_summary", "(無感官情報)") + anomaly_context = context.get("anomaly_context") + + # Phase 4 ADR-084: 動態異常感官區塊(有資料才附加,避免空白雜訊) + # 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 8D 升級 + anomaly_section = "" + if anomaly_context: + import json as _json + anomaly_section = f""" +--- +Phase 4 動態異常偵測(AI 主動巡檢結果,可作為高信心佐證): +{_json.dumps(anomaly_context, ensure_ascii=False, indent=2)} +---""" + return f"""你是 AWOOOI SRE 系統的偵探 Agent,專職根因分析(Root Cause Analysis)。 你的唯一工作:根據以下感官情報,提出 2-3 個根因假設(hypotheses)。 @@ -143,7 +159,7 @@ class DiagnosticianAgent(BaseAgent): --- 感官情報: {evidence} ---- +---{anomaly_section} 以 JSON 回覆(不要加任何解釋): {{ diff --git a/apps/api/src/db/base.py b/apps/api/src/db/base.py index 0990bd93..d664398c 100644 --- a/apps/api/src/db/base.py +++ b/apps/api/src/db/base.py @@ -211,6 +211,15 @@ async def init_db() -> None: """) ) + # 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 8D 感官升級 + # ADR-084: EvidenceSnapshot 加入 Phase 4 動態異常上下文(anomaly_context) + await conn.execute( + text(""" + ALTER TABLE incident_evidence + ADD COLUMN IF NOT EXISTS anomaly_context JSONB; + """) + ) + async def close_db() -> None: """ diff --git a/apps/api/src/db/models.py b/apps/api/src/db/models.py index 3f53260e..a92a606b 100644 --- a/apps/api/src/db/models.py +++ b/apps/api/src/db/models.py @@ -791,6 +791,12 @@ class IncidentEvidence(Base): dependency_topology: Mapped[dict | None] = mapped_column( JSON, nullable=True, comment="D8: Istio/Service Mesh 上下游 latency/error rate" ) + # Phase 4 ADR-084: 動態異常偵測增強感官(DynamicBaseline + LogAnomaly + TrendPredictor) + # 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 8D 升級 + anomaly_context: Mapped[dict | None] = mapped_column( + JSON, nullable=True, + comment="Phase 4 動態異常上下文:baseline_anomalies / log_patterns / trend_breaches" + ) # 感官品質指標 mcp_health: Mapped[dict] = mapped_column( diff --git a/apps/api/src/main.py b/apps/api/src/main.py index 449c2900..b6c5cf08 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -336,6 +336,17 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: except Exception as e: logger.warning("daily_report_loop_schedule_failed", error=str(e)) + # Phase 4 ADR-084: 主動巡檢每 5 分鐘執行一次 + # 協調 DynamicBaselineService + LogAnomalyDetector + TrendPredictor + # Shadow Mode 控制:AIOPS_P4_SHADOW_MODE=True 時只記錄,不觸發 Alert + # 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立 + try: + from src.services.proactive_inspector import run_proactive_inspector_loop + asyncio.create_task(run_proactive_inspector_loop()) + logger.info("proactive_inspector_loop_scheduled", interval_sec=300) + except Exception as e: + logger.warning("proactive_inspector_schedule_failed", error=str(e)) + yield # Shutdown diff --git a/apps/api/src/services/evidence_snapshot.py b/apps/api/src/services/evidence_snapshot.py index f7435fea..85bb784a 100644 --- a/apps/api/src/services/evidence_snapshot.py +++ b/apps/api/src/services/evidence_snapshot.py @@ -86,6 +86,9 @@ class EvidenceSnapshot: historical_context: str | None = None # D6 peer_health: dict[str, Any] | None = None # D7 dependency_topology: dict[str, Any] | None = None # D8 + # Phase 4 ADR-084: 動態異常感官(DynamicBaseline + LogAnomaly + TrendPredictor) + # 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 8D 升級 + anomaly_context: dict[str, Any] | None = None # Phase 4 動態異常上下文 # 感官品質 mcp_health: dict[str, bool] = field(default_factory=dict) @@ -150,6 +153,8 @@ class EvidenceSnapshot: parts.append(f"[同級副本健康度] {self.peer_health}") if self.dependency_topology: parts.append(f"[依賴拓撲] {self.dependency_topology}") + if self.anomaly_context: + parts.append(f"[動態異常偵測]\n{self.anomaly_context}") # 感官品質報告 failed_tools = [t for t, ok in self.mcp_health.items() if not ok] @@ -194,6 +199,7 @@ class EvidenceSnapshot: historical_context=self.historical_context, peer_health=self.peer_health, dependency_topology=self.dependency_topology, + anomaly_context=self.anomaly_context, mcp_health=self.mcp_health, collection_duration_ms=self.collection_duration_ms, sensors_attempted=self.sensors_attempted, @@ -305,6 +311,7 @@ async def get_latest_snapshot(incident_id: str) -> EvidenceSnapshot | None: historical_context=row.historical_context, peer_health=row.peer_health, dependency_topology=row.dependency_topology, + anomaly_context=row.anomaly_context, mcp_health=row.mcp_health or {}, collection_duration_ms=row.collection_duration_ms, sensors_attempted=row.sensors_attempted or 0, diff --git a/apps/api/src/services/pre_decision_investigator.py b/apps/api/src/services/pre_decision_investigator.py index 3918343c..253dde59 100644 --- a/apps/api/src/services/pre_decision_investigator.py +++ b/apps/api/src/services/pre_decision_investigator.py @@ -122,6 +122,17 @@ class PreDecisionInvestigator: # 4. 記錄耗時 snapshot.collection_duration_ms = int(time.monotonic() * 1000) - start_ms + # 4.5 Phase 4 8D 感官增強:填入動態異常上下文(非阻塞,失敗不影響主路徑) + try: + await asyncio.wait_for( + self._collect_phase4_anomalies(snapshot), + timeout=2.0, # Phase 4 感官最多等 2s,不能拖慢主路徑 + ) + except asyncio.TimeoutError: + logger.warning("phase4_anomaly_collect_timeout", incident_id=incident_id) + except Exception: + logger.exception("phase4_anomaly_collect_error", incident_id=incident_id) + # 5. 組裝 summary snapshot.evidence_summary = snapshot.build_summary() @@ -144,6 +155,75 @@ class PreDecisionInvestigator: ) return snapshot + async def _collect_phase4_anomalies(self, snapshot: EvidenceSnapshot) -> None: + """ + Phase 4 8D 感官增強:從 ProactiveInspector 快取 + LogAnomalyDetector + 讀取動態異常上下文,填入 snapshot.anomaly_context。 + + 設計原則: + - 只讀快取,不觸發新的 Prometheus 查詢(避免延遲) + - 失敗靜默降級(外層已包 try/except + timeout) + - Phase 4 Shadow Mode 時資料仍填入(供 LLM 參考,不觸發 Alert) + + 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 8D 升級 + """ + from src.services.proactive_inspector import get_proactive_inspector + from src.services.log_anomaly_detector import get_log_anomaly_detector + + context: dict[str, Any] = {} + + # 1. 讀取最近一次巡檢報告(ProactiveInspector 每 5 分鐘更新一次) + inspector = get_proactive_inspector() + last_report = inspector.get_last_report() + if last_report is not None: + context["last_inspection_at"] = last_report.finished_at + context["shadow_mode"] = last_report.shadow_mode + + if last_report.baseline_anomalies > 0: + context["baseline_anomalies"] = [ + { + "metric": a.metric_name, + "severity": a.severity, + "description": a.description, + "deviation_sigma": a.deviation_sigma, + } + for a in last_report.alerts + if a.alert_type == "dynamic_anomaly" + ] + + if last_report.trend_breaches > 0: + context["trend_breaches"] = [ + { + "metric": a.metric_name, + "description": a.description, + "breach_in_hours": a.predicted_breach_hours, + } + for a in last_report.alerts + if a.alert_type == "trend_breach" + ] + + # 2. 讀取最近新 log pattern(最多 5 個) + detector = get_log_anomaly_detector() + recent_patterns = await detector.get_recent_new_patterns(limit=5) + if recent_patterns: + context["recent_log_patterns"] = [ + { + "template": p.get("template", "")[:200], + "cluster_id": p.get("cluster_id", ""), + "source": p.get("source", ""), + } + for p in recent_patterns + ] + + if context: + snapshot.anomaly_context = context + logger.debug( + "phase4_anomaly_context_collected", + has_baseline=bool(context.get("baseline_anomalies")), + has_trends=bool(context.get("trend_breaches")), + log_patterns=len(context.get("recent_log_patterns", [])), + ) + async def _collect_all( self, snapshot: EvidenceSnapshot, diff --git a/apps/api/src/services/proactive_inspector.py b/apps/api/src/services/proactive_inspector.py new file mode 100644 index 00000000..64075aac --- /dev/null +++ b/apps/api/src/services/proactive_inspector.py @@ -0,0 +1,443 @@ +""" +AWOOOI AIOps Phase 4 — Proactive Inspector(主動巡檢) +====================================================== +職責:每 5 分鐘主動掃描異常,聚合動態偵測信號,產生 ProactiveAlert + +協調三大 Phase 4 感官: +1. DynamicBaselineService — 即時異常偵測(3σ 偏離) +2. LogAnomalyDetector — K8s Pod 新 log pattern +3. TrendPredictor — 4h 內閾值突破預警 + +設計原則: +- Shadow Mode(AIOPS_P4_SHADOW_MODE=True):所有偵測只記錄,不觸發 Alert +- 熔斷:任一子感官失敗 → 繼續其他感官,不中斷整個巡檢 +- 去重:同一 metric/cluster 在 30 分鐘內只上報一次(Redis TTL) +- 訓練調度:每次巡檢順便觸發 Holt-Winters 重訓(async,背景執行) + +ADR-084: Phase 4 動態異常偵測源頭升級 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立 +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from typing import Any + +import structlog + +from src.utils.timezone import now_taipei + +logger = structlog.get_logger(__name__) + +# ── 常數 ──────────────────────────────────────────────────────────────────── +INSPECTOR_INTERVAL_SEC = 300 # 每 5 分鐘巡檢一次 +DEDUP_TTL_SEC = 1800 # 同一異常 30 分鐘內去重 +DEDUP_KEY_PREFIX = "proactive:dedup:" +K8S_NAMESPACE = "awoooi-prod" + +# 需要監控的 metrics(Prometheus PromQL + 警戒閾值) +MONITORED_METRICS: list[dict[str, Any]] = [ + { + "name": "http_error_rate", + "promql": 'sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m]))', + "threshold": 0.05, # > 5% error rate = 警戒 + "description": "HTTP 5xx 錯誤率", + }, + { + "name": "cpu_usage_awoooi_api", + "promql": 'avg(rate(container_cpu_usage_seconds_total{namespace="awoooi-prod",container="awoooi-api"}[5m]))', + "threshold": 0.85, # > 85% CPU + "description": "API 容器 CPU 使用率", + }, + { + "name": "memory_usage_awoooi_api", + "promql": 'avg(container_memory_usage_bytes{namespace="awoooi-prod",container="awoooi-api"}) / avg(container_spec_memory_limit_bytes{namespace="awoooi-prod",container="awoooi-api"})', + "threshold": 0.90, # > 90% memory + "description": "API 容器記憶體使用率", + }, + { + "name": "pod_restart_rate", + "promql": 'increase(kube_pod_container_status_restarts_total{namespace="awoooi-prod"}[15m])', + "threshold": 2.0, # 15 分鐘內 > 2 次重啟 + "description": "Pod 重啟次數(15分鐘窗口)", + }, + { + "name": "db_connection_pool", + "promql": 'pg_stat_activity_count{datname="awoooi"}', + "threshold": 80.0, # > 80 個 DB 連線 + "description": "PostgreSQL 連線數", + }, +] + + +# ───────────────────────────────────────────────────────────────────────────── +# Data Types +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class ProactiveAlert: + """主動巡檢偵測到的預警事件""" + alert_type: str # "dynamic_anomaly" / "log_pattern" / "trend_breach" + metric_name: str + severity: str # "warning" / "critical" + description: str + current_value: float = 0.0 + threshold: float = 0.0 + deviation_sigma: float = 0.0 + template: str = "" # log pattern(log_pattern 類型) + predicted_breach_hours: float | None = None + shadow_mode: bool = True + detected_at: str = "" + source: str = "proactive_inspector" + + +@dataclass +class InspectionReport: + """一次巡檢週期的完整報告""" + started_at: str + finished_at: str + alerts: list[ProactiveAlert] = field(default_factory=list) + baseline_anomalies: int = 0 + log_patterns_new: int = 0 + trend_breaches: int = 0 + errors: list[str] = field(default_factory=list) + shadow_mode: bool = True + + +# ───────────────────────────────────────────────────────────────────────────── +# Main Service +# ───────────────────────────────────────────────────────────────────────────── + +class ProactiveInspector: + """ + 主動巡檢協調器 + + 每 5 分鐘執行一輪: + 1. Prometheus metrics → DynamicBaselineService 異常偵測 + 2. K8s Pod logs → LogAnomalyDetector 新 pattern + 3. 趨勢資料點 → TrendPredictor 4h 預警 + 4. 背景觸發 Holt-Winters 重訓(每次巡檢) + """ + + def __init__(self) -> None: + # Phase 4 ADR-084: 快取最後一次巡檢報告,供 PreDecisionInvestigator 8D 感官使用 + # 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 8D 升級 + self._last_report: InspectionReport | None = None + + def get_last_report(self) -> InspectionReport | None: + """取得最近一次巡檢報告(PreDecisionInvestigator 8D 感官用)。""" + return self._last_report + + async def run_inspection(self) -> InspectionReport: + """ + 執行一次完整巡檢。 + + Returns: + InspectionReport(Shadow Mode 時只記錄,不觸發 Alert) + """ + from src.core.feature_flags import aiops_flags + + if not aiops_flags.AIOPS_P4_PROACTIVE_INSPECTOR: + return InspectionReport( + started_at=now_taipei().isoformat(), + finished_at=now_taipei().isoformat(), + ) + + shadow_mode = aiops_flags.AIOPS_P4_SHADOW_MODE + started_at = now_taipei().isoformat() + report = InspectionReport( + started_at=started_at, + finished_at="", + shadow_mode=shadow_mode, + ) + + logger.info("proactive_inspection_started", shadow_mode=shadow_mode) + + # 三大感官並行執行(熔斷隔離) + tasks = [ + self._inspect_dynamic_baseline(report), + self._inspect_log_patterns(report), + ] + results = await asyncio.gather(*tasks, return_exceptions=True) + for r in results: + if isinstance(r, Exception): + report.errors.append(str(r)) + + # 趨勢預測(依賴 baseline 結果,稍後執行) + try: + await self._inspect_trends(report) + except Exception as e: + report.errors.append(f"trend_inspect_error: {e}") + + # 背景觸發基線重訓(不等待結果) + asyncio.create_task(self._retrain_baselines_background()) + + report.finished_at = now_taipei().isoformat() + self._last_report = report # 快取供 PreDecisionInvestigator 8D 感官讀取 + + logger.info( + "proactive_inspection_finished", + baseline_anomalies=report.baseline_anomalies, + log_patterns_new=report.log_patterns_new, + trend_breaches=report.trend_breaches, + errors=len(report.errors), + shadow_mode=shadow_mode, + ) + + return report + + # ────────────────────────────────────────────────────────────────────────── + # 感官 1: Dynamic Baseline 異常偵測 + # ────────────────────────────────────────────────────────────────────────── + + async def _inspect_dynamic_baseline(self, report: InspectionReport) -> None: + """從 Prometheus 抓取當前值,與 Holt-Winters 基線比對。""" + from src.services.dynamic_baseline_service import get_dynamic_baseline_service + from src.core.feature_flags import aiops_flags + + if not aiops_flags.AIOPS_P4_DYNAMIC_BASELINE: + return + + svc = get_dynamic_baseline_service() + + for metric_cfg in MONITORED_METRICS: + metric_name = metric_cfg["name"] + try: + current = await self._fetch_current_value(metric_cfg["promql"]) + if current is None: + continue + + hour_of_day = now_taipei().hour + result = await svc.is_anomaly(metric_name, current, hour_of_day=hour_of_day) + + if result.is_anomaly: + dedup_key = f"dynamic:{metric_name}" + if await self._is_dedup(dedup_key): + continue + + severity = "critical" if result.deviation_sigma >= 5.0 else "warning" + alert = ProactiveAlert( + alert_type="dynamic_anomaly", + metric_name=metric_name, + severity=severity, + description=( + f"{metric_cfg['description']} 偏離基線 {result.deviation_sigma:.1f}σ " + f"(當前 {current:.4f},期望 {result.expected_mean:.4f})" + ), + current_value=current, + threshold=result.expected_mean + 3 * result.expected_std, + deviation_sigma=result.deviation_sigma, + shadow_mode=report.shadow_mode, + detected_at=now_taipei().isoformat(), + ) + report.alerts.append(alert) + report.baseline_anomalies += 1 + await self._mark_dedup(dedup_key) + + except Exception as e: + logger.warning("baseline_inspect_metric_failed", metric=metric_name, error=str(e)) + + # ────────────────────────────────────────────────────────────────────────── + # 感官 2: Log Anomaly 新 Pattern + # ────────────────────────────────────────────────────────────────────────── + + async def _inspect_log_patterns(self, report: InspectionReport) -> None: + """掃描 K8s Pod 日誌,偵測新 log pattern。""" + from src.services.log_anomaly_detector import get_log_anomaly_detector + from src.core.feature_flags import aiops_flags + + if not aiops_flags.AIOPS_P4_LOG_ANOMALY: + return + + detector = get_log_anomaly_detector() + try: + events = await detector.process_pod_logs( + namespace=K8S_NAMESPACE, + tail_lines=200, + ) + for event in events: + dedup_key = f"log:{event.cluster_id}" + if await self._is_dedup(dedup_key): + continue + + alert = ProactiveAlert( + alert_type="log_pattern", + metric_name="log_anomaly", + severity="warning", + description=f"偵測到新 log pattern:{event.template[:200]}", + template=event.template, + shadow_mode=report.shadow_mode, + detected_at=event.detected_at, + source=f"k8s/{K8S_NAMESPACE}", + ) + report.alerts.append(alert) + report.log_patterns_new += 1 + await self._mark_dedup(dedup_key) + + except Exception as e: + logger.warning("log_inspect_failed", error=str(e)) + + # ────────────────────────────────────────────────────────────────────────── + # 感官 3: Trend Predictor 4h 預警 + # ────────────────────────────────────────────────────────────────────────── + + async def _inspect_trends(self, report: InspectionReport) -> None: + """對各 metric 做 4h 趨勢外推。""" + from src.services.trend_predictor import get_trend_predictor + from src.core.feature_flags import aiops_flags + + if not aiops_flags.AIOPS_P4_TREND_PREDICTOR: + return + + predictor = get_trend_predictor() + + for metric_cfg in MONITORED_METRICS: + metric_name = metric_cfg["name"] + threshold = metric_cfg["threshold"] + try: + current = await self._fetch_current_value(metric_cfg["promql"]) + if current is None: + continue + + pred = await predictor.predict_breach(metric_name, current, threshold) + pred.metric_name = metric_name # TrendPredictor 內部未填 metric_name + + if pred.will_breach and pred.confidence in ("high", "medium"): + dedup_key = f"trend:{metric_name}" + if await self._is_dedup(dedup_key): + continue + + hours_str = f"{pred.breach_in_hours:.1f}h" if pred.breach_in_hours is not None else "已超越" + alert = ProactiveAlert( + alert_type="trend_breach", + metric_name=metric_name, + severity="warning", + description=( + f"{metric_cfg['description']} 趨勢預警:預計 {hours_str} 後超越閾值 " + f"(當前 {current:.4f},閾值 {threshold},斜率 {pred.slope_per_hour:+.6f}/h,R²={pred.r_squared:.2f})" + ), + current_value=current, + threshold=threshold, + predicted_breach_hours=pred.breach_in_hours, + shadow_mode=report.shadow_mode, + detected_at=now_taipei().isoformat(), + ) + report.alerts.append(alert) + report.trend_breaches += 1 + await self._mark_dedup(dedup_key) + + except Exception as e: + logger.warning("trend_inspect_metric_failed", metric=metric_name, error=str(e)) + + # ────────────────────────────────────────────────────────────────────────── + # 背景重訓 + # ────────────────────────────────────────────────────────────────────────── + + async def _retrain_baselines_background(self) -> None: + """背景重訓所有 Holt-Winters 基線(不阻塞巡檢)。""" + from src.services.dynamic_baseline_service import get_dynamic_baseline_service + from src.core.feature_flags import aiops_flags + + if not aiops_flags.AIOPS_P4_DYNAMIC_BASELINE: + return + + svc = get_dynamic_baseline_service() + for metric_cfg in MONITORED_METRICS: + try: + await svc.train_baseline( + metric_name=metric_cfg["name"], + promql=metric_cfg["promql"], + ) + except Exception as e: + logger.warning( + "baseline_retrain_background_failed", + metric=metric_cfg["name"], + error=str(e), + ) + + # ────────────────────────────────────────────────────────────────────────── + # 工具方法 + # ────────────────────────────────────────────────────────────────────────── + + async def _fetch_current_value(self, promql: str) -> float | None: + """從 Prometheus 抓取當前值(instant query)。""" + import httpx + from src.core.config import settings + + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.get( + f"{settings.PROMETHEUS_URL}/api/v1/query", + params={"query": promql}, + ) + resp.raise_for_status() + data = resp.json() + + results = data.get("data", {}).get("result", []) + if not results: + return None + + value_str = results[0].get("value", [None, None])[1] + if value_str is None or value_str == "NaN": + return None + return float(value_str) + + except Exception as e: + logger.warning("prometheus_instant_query_failed", promql=promql[:80], error=str(e)) + return None + + async def _is_dedup(self, key: str) -> bool: + """檢查是否在去重窗口內(30 分鐘)。""" + try: + from src.core.redis_client import get_redis + r = get_redis() + return bool(await r.exists(f"{DEDUP_KEY_PREFIX}{key}")) + except Exception: + return False + + async def _mark_dedup(self, key: str) -> None: + """標記去重,TTL = 30 分鐘。""" + try: + from src.core.redis_client import get_redis + r = get_redis() + await r.set(f"{DEDUP_KEY_PREFIX}{key}", "1", ex=DEDUP_TTL_SEC) + except Exception: + pass + + +# ───────────────────────────────────────────────────────────────────────────── +# Background Loop(供 main.py lifespan 呼叫) +# ───────────────────────────────────────────────────────────────────────────── + +async def run_proactive_inspector_loop() -> None: + """ + 永久迴圈:每 INSPECTOR_INTERVAL_SEC 秒執行一次巡檢。 + + 由 main.py lifespan 透過 asyncio.create_task() 啟動。 + Loop 內部有熔斷:任一輪失敗不會終止整個迴圈。 + """ + inspector = get_proactive_inspector() + logger.info("proactive_inspector_loop_started", interval_sec=INSPECTOR_INTERVAL_SEC) + + while True: + try: + await inspector.run_inspection() + except Exception as e: + logger.warning("proactive_inspector_loop_error", error=str(e)) + + await asyncio.sleep(INSPECTOR_INTERVAL_SEC) + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_inspector: ProactiveInspector | None = None + + +def get_proactive_inspector() -> ProactiveInspector: + global _inspector + if _inspector is None: + _inspector = ProactiveInspector() + return _inspector diff --git a/apps/api/src/services/trend_predictor.py b/apps/api/src/services/trend_predictor.py new file mode 100644 index 00000000..18063919 --- /dev/null +++ b/apps/api/src/services/trend_predictor.py @@ -0,0 +1,306 @@ +""" +AWOOOI AIOps Phase 4 — Trend Predictor(趨勢預測) +=================================================== +職責:numpy 線性回歸,預測 metric 在未來 N 小時是否超越警戒閾值 + +核心 API: + predict_breach(metric_name, current_value, threshold) -> TrendPrediction + +設計原則: +- 不使用 Prophet(500MB+ Stan 依賴),改用 numpy 線性回歸 +- 從 DynamicBaselineRecord 取歷史窗口資料,計算趨勢斜率 +- Shadow Mode:預測只記錄 logger.info,不觸發 Alert +- 熔斷:numpy 失敗 → fallback 到最近值外推 + +ADR-084: Phase 4 動態異常偵測源頭升級 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立 +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +import structlog + +from src.utils.timezone import now_taipei + +logger = structlog.get_logger(__name__) + +# ── 常數 ──────────────────────────────────────────────────────────────────── +FORECAST_HOURS = 4 # 預測未來 4 小時 +MIN_DATAPOINTS_FOR_TREND = 12 # 至少 12 個資料點才能做回歸(12h) +TREND_CONFIDENCE_THRESHOLD = 0.7 # R² > 0.7 → 趨勢可信 +REDIS_KEY_HISTORY = "trend:history:" # hash: metric_name → JSON list of (ts, value) +REDIS_TTL_HISTORY = 86400 * 2 # 保留 2 天歷史 +MAX_HISTORY_POINTS = 336 # 最多 336 個點(14天 × 24h) + + +# ───────────────────────────────────────────────────────────────────────────── +# Data Types +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class TrendPrediction: + """趨勢預測結果""" + metric_name: str + current_value: float + threshold: float + forecast_hours: int # 預測時間窗口(小時) + predicted_value: float # 預測在 forecast_hours 後的值 + slope_per_hour: float # 每小時變化量(線性斜率) + r_squared: float # 線性回歸 R²(0-1,越高趨勢越清晰) + will_breach: bool # 是否預測超越 threshold + breach_in_hours: float | None # 預計幾小時後超越(None = 不會超越) + confidence: str # high / medium / low / insufficient_data + shadow_mode: bool = True + detected_at: str = "" + + +# ───────────────────────────────────────────────────────────────────────────── +# Main Service +# ───────────────────────────────────────────────────────────────────────────── + +class TrendPredictor: + """ + 趨勢預測服務 + + 工作流程: + 1. 從 Redis 取近期歷史資料點(sliding window) + 2. numpy 線性回歸計算趨勢斜率 + 3. 外推預測 4h 後的值 + 4. 判斷是否在 4h 內超越警戒閾值 + """ + + async def predict_breach( + self, + metric_name: str, + current_value: float, + threshold: float, + forecast_hours: int = FORECAST_HOURS, + ) -> TrendPrediction: + """ + 預測 metric 是否在 forecast_hours 內超越 threshold。 + + Args: + metric_name: 基線識別名(需與 DynamicBaselineService 一致) + current_value: 當前觀測值 + threshold: 警戒閾值(超越 = 預警) + forecast_hours: 預測窗口(預設 4h) + + Returns: + TrendPrediction(Shadow Mode 時只記錄不觸發) + """ + from src.core.feature_flags import aiops_flags + + if not aiops_flags.AIOPS_P4_TREND_PREDICTOR: + return self._no_data_result(metric_name, current_value, threshold, forecast_hours) + + shadow_mode = aiops_flags.AIOPS_P4_SHADOW_MODE + detected_at = now_taipei().isoformat() + + # 推送當前值到歷史 + await self._append_history(metric_name, current_value) + + # 取歷史資料 + history = await self._get_history(metric_name) + if len(history) < MIN_DATAPOINTS_FOR_TREND: + return TrendPrediction( + metric_name=metric_name, + current_value=current_value, + threshold=threshold, + forecast_hours=forecast_hours, + predicted_value=current_value, + slope_per_hour=0.0, + r_squared=0.0, + will_breach=current_value >= threshold, + breach_in_hours=0.0 if current_value >= threshold else None, + confidence="insufficient_data", + shadow_mode=shadow_mode, + detected_at=detected_at, + ) + + prediction = self._linear_regression_predict( + history=history, + current_value=current_value, + threshold=threshold, + forecast_hours=forecast_hours, + ) + prediction.shadow_mode = shadow_mode + prediction.detected_at = detected_at + + if prediction.will_breach: + logger.info( + "trend_breach_predicted", + metric=metric_name, + current=current_value, + predicted=prediction.predicted_value, + threshold=threshold, + breach_in_hours=prediction.breach_in_hours, + slope=prediction.slope_per_hour, + r2=prediction.r_squared, + shadow_mode=shadow_mode, + ) + + return prediction + + async def push_datapoint( + self, + metric_name: str, + value: float, + ) -> None: + """主動推送資料點(供 ProactiveInspector 呼叫)。""" + await self._append_history(metric_name, value) + + # ────────────────────────────────────────────────────────────────────────── + # Private Helpers + # ────────────────────────────────────────────────────────────────────────── + + def _linear_regression_predict( + self, + history: list[tuple[float, float]], # (timestamp, value) + current_value: float, + threshold: float, + forecast_hours: int, + ) -> TrendPrediction: + """ + numpy 線性回歸:y = slope * x + intercept + x = 相對小時數(0 到 N),y = metric 值 + """ + metric_name = "" # 呼叫方 fillback + + try: + import numpy as np + + times = np.array([h[0] for h in history], dtype=float) + values = np.array([h[1] for h in history], dtype=float) + + # 相對化時間(小時為單位) + times_rel = (times - times[0]) / 3600.0 + + # 線性回歸:polyfit degree=1 + coeffs = np.polyfit(times_rel, values, 1) + slope = float(coeffs[0]) # 每小時斜率 + intercept = float(coeffs[1]) + + # R² 計算 + fitted = np.polyval(coeffs, times_rel) + ss_res = float(np.sum((values - fitted) ** 2)) + ss_tot = float(np.sum((values - np.mean(values)) ** 2)) + r2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0 + + # 預測 forecast_hours 後的值 + current_time_rel = (now_taipei().timestamp() - times[0]) / 3600.0 + predicted_value = slope * (current_time_rel + forecast_hours) + intercept + + # 信心度 + if r2 >= TREND_CONFIDENCE_THRESHOLD: + confidence = "high" + elif r2 >= 0.4: + confidence = "medium" + else: + confidence = "low" + + # 是否超越閾值 + will_breach = predicted_value >= threshold + breach_in_hours: float | None = None + + if will_breach and slope > 0 and current_value < threshold: + # 計算幾小時後超越:current_value + slope * h = threshold + breach_in_hours = round((threshold - current_value) / slope, 2) + breach_in_hours = min(breach_in_hours, float(forecast_hours)) + elif current_value >= threshold: + breach_in_hours = 0.0 + + return TrendPrediction( + metric_name=metric_name, + current_value=current_value, + threshold=threshold, + forecast_hours=forecast_hours, + predicted_value=round(predicted_value, 4), + slope_per_hour=round(slope, 6), + r_squared=round(r2, 4), + will_breach=will_breach, + breach_in_hours=breach_in_hours, + confidence=confidence, + ) + + except Exception as e: + logger.warning("trend_regression_failed", error=str(e)) + # Fallback:最近值外推 + return TrendPrediction( + metric_name=metric_name, + current_value=current_value, + threshold=threshold, + forecast_hours=forecast_hours, + predicted_value=current_value, + slope_per_hour=0.0, + r_squared=0.0, + will_breach=current_value >= threshold, + breach_in_hours=0.0 if current_value >= threshold else None, + confidence="low", + ) + + async def _append_history(self, metric_name: str, value: float) -> None: + """推送資料點到 Redis 滑動視窗(最舊的被 trim 掉)。""" + try: + import json + from src.core.redis_client import get_redis + r = get_redis() + key = f"{REDIS_KEY_HISTORY}{metric_name}" + point = json.dumps([now_taipei().timestamp(), value]) + await r.rpush(key, point) + await r.ltrim(key, -MAX_HISTORY_POINTS, -1) # 保留最新 N 個點 + await r.expire(key, REDIS_TTL_HISTORY) + except Exception as e: + logger.warning("trend_history_append_failed", metric=metric_name, error=str(e)) + + async def _get_history(self, metric_name: str) -> list[tuple[float, float]]: + """從 Redis 取歷史資料點。""" + try: + import json + from src.core.redis_client import get_redis + r = get_redis() + key = f"{REDIS_KEY_HISTORY}{metric_name}" + raw = await r.lrange(key, 0, -1) + return [tuple(json.loads(item)) for item in raw] + except Exception as e: + logger.warning("trend_history_get_failed", metric=metric_name, error=str(e)) + return [] + + def _no_data_result( + self, + metric_name: str, + current_value: float, + threshold: float, + forecast_hours: int, + ) -> TrendPrediction: + """Feature flag 關閉時的空結果。""" + return TrendPrediction( + metric_name=metric_name, + current_value=current_value, + threshold=threshold, + forecast_hours=forecast_hours, + predicted_value=current_value, + slope_per_hour=0.0, + r_squared=0.0, + will_breach=False, + breach_in_hours=None, + confidence="insufficient_data", + shadow_mode=True, + detected_at=now_taipei().isoformat(), + ) + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_predictor: TrendPredictor | None = None + + +def get_trend_predictor() -> TrendPredictor: + global _predictor + if _predictor is None: + _predictor = TrendPredictor() + return _predictor diff --git a/docs/LOGBOOK.md b/docs/LOGBOOK.md index 5e0f1075..7104d108 100644 --- a/docs/LOGBOOK.md +++ b/docs/LOGBOOK.md @@ -72,7 +72,91 @@ ### 下一步 -- Phase 2: 5 Agent 骨架 + Orchestrator + AgentSession DB +- ~~Phase 2: 5 Agent 骨架 + Orchestrator + AgentSession DB~~ → **✅ 完成(commit d316221)** + +--- + +## 📍 2026-04-15 深夜 — AI 自主化飛輪 Phase 3 學習閉環重建完成 + +### 成品(ADR-083,commit 7da64ea → Gitea) + +| 成品 | 路徑 | 說明 | +|------|------|------| +| fire-and-forget 修復 | `services/approval_execution.py` | `create_task` → `await asyncio.wait_for(timeout=30)` × 2 處(成功 + 失敗路徑) | +| matched_playbook_id 欄位 | `models/approval.py` | `ApprovalRequestBase` 新增,auto_execute 路徑填充 | +| _auto_execute 傳遞 | `services/decision_manager.py` | `token.proposal_data.get("playbook_id")` → `ApprovalRequest.matched_playbook_id` | +| 雙路徑查找 | `services/learning_service.py` | `matched_playbook_id` + `metadata` fallback | +| trust_score 欄位 | `models/playbook.py` | 新增 `trust_score: float = 0.3`(EWMA 動態信任度) | +| 2x EWMA 更新 | `repositories/playbook_repository.py` | 成功 α=0.1、失敗 α=0.2,trust < 0.1 → 警告 | +| Evolver Agent | `services/playbook_evolver.py` | 低信任封存 + 休眠封存 + Jaccard 相似合併(新建) | +| ADR-083 | `docs/adr/ADR-083-learning-loop-reconstruction.md` | 學習閉環重建決策紀錄 | +| MASTER §8 | `docs/superpowers/specs/2026-04-15-MASTER-ai-autonomous-flywheel-v2.md` | Phase 3 完工追加 | + +### 根因修復對照 + +| 根因 | 修復前 | 修復後 | +|------|--------|--------| +| 學習觸發率 | 0%(GC 隨時取消) | ≈100%(await + 30s 熔斷) | +| Playbook EWMA | 永遠停在 0.3 | 每次執行後動態更新 | +| 負向懲罰 | 無 | 失敗 2x 衰減(α=0.2) | +| 知識庫管理 | 無退場機制 | Evolver 自動封存低信任 | + +### 架構狀態 + +``` +AIOPS_P3_ENABLED=False(預設)— 骨架就位,等統帥批准後開啟 +AIOPS_P3_EVOLVER_ENABLED=False — Evolver 定時 job 等統帥批准 +學習路徑:ApprovalRequest.matched_playbook_id → learning_service → playbook_repository.update_stats(EWMA) +``` + +### 下一步 +- Gate 3 架構審查(首席架構師 Review Phase 3) +- 開啟 `AIOPS_P3_ENABLED=True` 後 E2E 驗證 +- Phase 4 異常偵測升級(依賴 Phase 3 穩定) + +--- + +## 📍 2026-04-15 深夜 — AI 自主化飛輪 Phase 2 多 Agent 協作骨架上線 + +### 成品(ADR-082,commit d316221) + +| 成品 | 路徑 | 說明 | +|------|------|------| +| Protocol 型別系統 | `apps/api/src/agents/protocol.py` | 5 Agent 共用資料契約(dataclass,不可變) | +| DiagnosticianAgent | `apps/api/src/agents/diagnostician_agent.py` | RCA 偵探,confidence < 0.4 → ABSTAIN | +| SolverAgent | `apps/api/src/agents/solver_agent.py` | 修復軍師,blast_radius 評分 + 降級 mock | +| ReviewerAgent | `apps/api/src/agents/reviewer_agent.py` | 安全審查,HARD_RULES 靜態 regex + blast_radius 閾值 | +| CriticAgent | `apps/api/src/agents/critic_agent.py` | 刻意唱反調,強制 3 問批判,critical → REJECT | +| CoordinatorAgent | `apps/api/src/agents/coordinator_agent.py` | 純規則聚合(無 LLM),6 級決策閘 | +| AgentOrchestrator | `apps/api/src/services/agent_orchestrator.py` | 30s 全局超時,Reviewer‖Critic 並行,DB + Redis Streams | +| DecisionManager 接線 | `apps/api/src/services/decision_manager.py` | `is_phase_enabled(2)` gate + `_package_to_proposal_data` 橋接 | +| AgentSession DB 表 | `apps/api/src/db/models.py` | Immutable Event Sourcing,4 複合 index | +| ADR-082 | `docs/adr/ADR-082-multi-agent-collaboration.md` | 架構決策紀錄 | + +### Gate 2 修復(7 項) + +| 嚴重度 | 問題 | 修復位置 | +|--------|------|---------| +| CRITICAL | DELETE FROM regex lookahead 位置錯誤,攔到安全語句、放行危險語句 | reviewer_agent.py:58 | +| CRITICAL | REQUEST_REVISION 可抵達 auto-execute(Solver 未修訂不可執行) | coordinator_agent.py | +| IMPORTANT | `_extract_json` flat regex 不支援巢狀 JSON,所有 Agent LLM 解析靜默失敗 | base.py:167 | +| IMPORTANT | `all_degraded` 遺漏 `verdict.degraded`,Reviewer 熔斷不被感知 | coordinator_agent.py | +| IMPORTANT | Solver ABSTAIN guard 放行降級假設(confidence=0.2 觸發 LLM) | solver_agent.py:72 | +| IMPORTANT | `dataclasses.asdict()` 保留 Enum 實例,所有 DB 審計寫入靜默失敗 | agent_orchestrator.py | +| IMPORTANT | P2 gate 直讀屬性繞過父 Phase 守衛(應用 `is_phase_enabled(2)`) | decision_manager.py | + +### 架構狀態 + +``` +AIOPS_P2_ENABLED=False(預設)— 骨架就位,等統帥批准後開啟 +執行路徑:EvidenceSnapshot → Diagnostician → Solver → (Reviewer‖Critic) → Coordinator → DecisionPackage +全局超時:30s,單 Agent:5s,降級後繼續(不阻塞 Coordinator) +``` + +### 下一步 + +- Phase 2 測試:`test_agent_protocol.py` / `test_agent_orchestrator.py` / 各 Agent 單元測試 +- 或 統帥指示進入 Phase 3(學習閉環重建) ---