feat(Phase 4): 主動巡檢 + 趨勢預測 + 8D 感官升級 全部完成
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 12m32s

## Phase 4 完整交付(ADR-084)

### 新增服務
- trend_predictor.py: numpy 線性回歸,4h 閾值突破預警,R² 信心評分
- proactive_inspector.py: 每 5 分鐘主動巡檢協調器
  - DynamicBaselineService(3σ 偏離)
  - LogAnomalyDetector(新 Drain3 pattern)
  - TrendPredictor(斜率外推 4h 預測)
  - Shadow Mode + 30 分鐘去重 + Holt-Winters 背景重訓

### 8D 感官升級(EvidenceSnapshot Phase 4 增強)
- PreDecisionInvestigator._collect_phase4_anomalies(): 決策前讀取
  ProactiveInspector 最近巡檢快取 + LogAnomalyDetector 新 pattern
- EvidenceSnapshot.anomaly_context: 新欄位,Phase 4 動態異常上下文
- DiagnosticianAgent._build_prompt(): prompt 包含 anomaly_context,
  LLM RCA 可參考動態基線偏差與趨勢預警

### 資料庫遷移
- incident_evidence: ADD COLUMN anomaly_context JSONB(冪等)

### main.py
- 啟動 run_proactive_inspector_loop() asyncio task

2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 全部完成

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-15 15:47:05 +08:00
parent 952c10955b
commit 14a02263ae
9 changed files with 965 additions and 3 deletions

View File

@@ -104,7 +104,10 @@ class DiagnosticianAgent(BaseAgent):
async def _analyze(self, snapshot: "EvidenceSnapshot") -> DiagnosisReport:
"""核心 LLM 分析邏輯。"""
prompt = self._build_prompt({"evidence_summary": snapshot.evidence_summary or ""})
prompt = self._build_prompt({
"evidence_summary": snapshot.evidence_summary or "",
"anomaly_context": snapshot.anomaly_context,
})
from src.services.openclaw import get_openclaw
openclaw = get_openclaw()
@@ -129,6 +132,19 @@ class DiagnosticianAgent(BaseAgent):
def _build_prompt(self, context: dict[str, Any]) -> str:
evidence = context.get("evidence_summary", "(無感官情報)")
anomaly_context = context.get("anomaly_context")
# Phase 4 ADR-084: 動態異常感官區塊(有資料才附加,避免空白雜訊)
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 8D 升級
anomaly_section = ""
if anomaly_context:
import json as _json
anomaly_section = f"""
---
Phase 4 動態異常偵測AI 主動巡檢結果,可作為高信心佐證):
{_json.dumps(anomaly_context, ensure_ascii=False, indent=2)}
---"""
return f"""你是 AWOOOI SRE 系統的偵探 Agent專職根因分析Root Cause Analysis
你的唯一工作:根據以下感官情報,提出 2-3 個根因假設hypotheses
@@ -143,7 +159,7 @@ class DiagnosticianAgent(BaseAgent):
---
感官情報:
{evidence}
---
---{anomaly_section}
以 JSON 回覆(不要加任何解釋):
{{

View File

@@ -211,6 +211,15 @@ async def init_db() -> None:
""")
)
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 8D 感官升級
# ADR-084: EvidenceSnapshot 加入 Phase 4 動態異常上下文anomaly_context
await conn.execute(
text("""
ALTER TABLE incident_evidence
ADD COLUMN IF NOT EXISTS anomaly_context JSONB;
""")
)
async def close_db() -> None:
"""

View File

@@ -791,6 +791,12 @@ class IncidentEvidence(Base):
dependency_topology: Mapped[dict | None] = mapped_column(
JSON, nullable=True, comment="D8: Istio/Service Mesh 上下游 latency/error rate"
)
# Phase 4 ADR-084: 動態異常偵測增強感官DynamicBaseline + LogAnomaly + TrendPredictor
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 8D 升級
anomaly_context: Mapped[dict | None] = mapped_column(
JSON, nullable=True,
comment="Phase 4 動態異常上下文baseline_anomalies / log_patterns / trend_breaches"
)
# 感官品質指標
mcp_health: Mapped[dict] = mapped_column(

View File

@@ -336,6 +336,17 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("daily_report_loop_schedule_failed", error=str(e))
# Phase 4 ADR-084: 主動巡檢每 5 分鐘執行一次
# 協調 DynamicBaselineService + LogAnomalyDetector + TrendPredictor
# Shadow Mode 控制AIOPS_P4_SHADOW_MODE=True 時只記錄,不觸發 Alert
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立
try:
from src.services.proactive_inspector import run_proactive_inspector_loop
asyncio.create_task(run_proactive_inspector_loop())
logger.info("proactive_inspector_loop_scheduled", interval_sec=300)
except Exception as e:
logger.warning("proactive_inspector_schedule_failed", error=str(e))
yield
# Shutdown

View File

@@ -86,6 +86,9 @@ class EvidenceSnapshot:
historical_context: str | None = None # D6
peer_health: dict[str, Any] | None = None # D7
dependency_topology: dict[str, Any] | None = None # D8
# Phase 4 ADR-084: 動態異常感官DynamicBaseline + LogAnomaly + TrendPredictor
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 8D 升級
anomaly_context: dict[str, Any] | None = None # Phase 4 動態異常上下文
# 感官品質
mcp_health: dict[str, bool] = field(default_factory=dict)
@@ -150,6 +153,8 @@ class EvidenceSnapshot:
parts.append(f"[同級副本健康度] {self.peer_health}")
if self.dependency_topology:
parts.append(f"[依賴拓撲] {self.dependency_topology}")
if self.anomaly_context:
parts.append(f"[動態異常偵測]\n{self.anomaly_context}")
# 感官品質報告
failed_tools = [t for t, ok in self.mcp_health.items() if not ok]
@@ -194,6 +199,7 @@ class EvidenceSnapshot:
historical_context=self.historical_context,
peer_health=self.peer_health,
dependency_topology=self.dependency_topology,
anomaly_context=self.anomaly_context,
mcp_health=self.mcp_health,
collection_duration_ms=self.collection_duration_ms,
sensors_attempted=self.sensors_attempted,
@@ -305,6 +311,7 @@ async def get_latest_snapshot(incident_id: str) -> EvidenceSnapshot | None:
historical_context=row.historical_context,
peer_health=row.peer_health,
dependency_topology=row.dependency_topology,
anomaly_context=row.anomaly_context,
mcp_health=row.mcp_health or {},
collection_duration_ms=row.collection_duration_ms,
sensors_attempted=row.sensors_attempted or 0,

View File

@@ -122,6 +122,17 @@ class PreDecisionInvestigator:
# 4. 記錄耗時
snapshot.collection_duration_ms = int(time.monotonic() * 1000) - start_ms
# 4.5 Phase 4 8D 感官增強:填入動態異常上下文(非阻塞,失敗不影響主路徑)
try:
await asyncio.wait_for(
self._collect_phase4_anomalies(snapshot),
timeout=2.0, # Phase 4 感官最多等 2s不能拖慢主路徑
)
except asyncio.TimeoutError:
logger.warning("phase4_anomaly_collect_timeout", incident_id=incident_id)
except Exception:
logger.exception("phase4_anomaly_collect_error", incident_id=incident_id)
# 5. 組裝 summary
snapshot.evidence_summary = snapshot.build_summary()
@@ -144,6 +155,75 @@ class PreDecisionInvestigator:
)
return snapshot
async def _collect_phase4_anomalies(self, snapshot: EvidenceSnapshot) -> None:
"""
Phase 4 8D 感官增強:從 ProactiveInspector 快取 + LogAnomalyDetector
讀取動態異常上下文,填入 snapshot.anomaly_context。
設計原則:
- 只讀快取,不觸發新的 Prometheus 查詢(避免延遲)
- 失敗靜默降級(外層已包 try/except + timeout
- Phase 4 Shadow Mode 時資料仍填入(供 LLM 參考,不觸發 Alert
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 8D 升級
"""
from src.services.proactive_inspector import get_proactive_inspector
from src.services.log_anomaly_detector import get_log_anomaly_detector
context: dict[str, Any] = {}
# 1. 讀取最近一次巡檢報告ProactiveInspector 每 5 分鐘更新一次)
inspector = get_proactive_inspector()
last_report = inspector.get_last_report()
if last_report is not None:
context["last_inspection_at"] = last_report.finished_at
context["shadow_mode"] = last_report.shadow_mode
if last_report.baseline_anomalies > 0:
context["baseline_anomalies"] = [
{
"metric": a.metric_name,
"severity": a.severity,
"description": a.description,
"deviation_sigma": a.deviation_sigma,
}
for a in last_report.alerts
if a.alert_type == "dynamic_anomaly"
]
if last_report.trend_breaches > 0:
context["trend_breaches"] = [
{
"metric": a.metric_name,
"description": a.description,
"breach_in_hours": a.predicted_breach_hours,
}
for a in last_report.alerts
if a.alert_type == "trend_breach"
]
# 2. 讀取最近新 log pattern最多 5 個)
detector = get_log_anomaly_detector()
recent_patterns = await detector.get_recent_new_patterns(limit=5)
if recent_patterns:
context["recent_log_patterns"] = [
{
"template": p.get("template", "")[:200],
"cluster_id": p.get("cluster_id", ""),
"source": p.get("source", ""),
}
for p in recent_patterns
]
if context:
snapshot.anomaly_context = context
logger.debug(
"phase4_anomaly_context_collected",
has_baseline=bool(context.get("baseline_anomalies")),
has_trends=bool(context.get("trend_breaches")),
log_patterns=len(context.get("recent_log_patterns", [])),
)
async def _collect_all(
self,
snapshot: EvidenceSnapshot,

View File

@@ -0,0 +1,443 @@
"""
AWOOOI AIOps Phase 4 — Proactive Inspector主動巡檢
======================================================
職責:每 5 分鐘主動掃描異常,聚合動態偵測信號,產生 ProactiveAlert
協調三大 Phase 4 感官:
1. DynamicBaselineService — 即時異常偵測3σ 偏離)
2. LogAnomalyDetector — K8s Pod 新 log pattern
3. TrendPredictor — 4h 內閾值突破預警
設計原則:
- Shadow ModeAIOPS_P4_SHADOW_MODE=True所有偵測只記錄不觸發 Alert
- 熔斷:任一子感官失敗 → 繼續其他感官,不中斷整個巡檢
- 去重:同一 metric/cluster 在 30 分鐘內只上報一次Redis TTL
- 訓練調度:每次巡檢順便觸發 Holt-Winters 重訓async背景執行
ADR-084: Phase 4 動態異常偵測源頭升級
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立
"""
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from typing import Any
import structlog
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# ── 常數 ────────────────────────────────────────────────────────────────────
INSPECTOR_INTERVAL_SEC = 300 # 每 5 分鐘巡檢一次
DEDUP_TTL_SEC = 1800 # 同一異常 30 分鐘內去重
DEDUP_KEY_PREFIX = "proactive:dedup:"
K8S_NAMESPACE = "awoooi-prod"
# 需要監控的 metricsPrometheus PromQL + 警戒閾值)
MONITORED_METRICS: list[dict[str, Any]] = [
{
"name": "http_error_rate",
"promql": 'sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m]))',
"threshold": 0.05, # > 5% error rate = 警戒
"description": "HTTP 5xx 錯誤率",
},
{
"name": "cpu_usage_awoooi_api",
"promql": 'avg(rate(container_cpu_usage_seconds_total{namespace="awoooi-prod",container="awoooi-api"}[5m]))',
"threshold": 0.85, # > 85% CPU
"description": "API 容器 CPU 使用率",
},
{
"name": "memory_usage_awoooi_api",
"promql": 'avg(container_memory_usage_bytes{namespace="awoooi-prod",container="awoooi-api"}) / avg(container_spec_memory_limit_bytes{namespace="awoooi-prod",container="awoooi-api"})',
"threshold": 0.90, # > 90% memory
"description": "API 容器記憶體使用率",
},
{
"name": "pod_restart_rate",
"promql": 'increase(kube_pod_container_status_restarts_total{namespace="awoooi-prod"}[15m])',
"threshold": 2.0, # 15 分鐘內 > 2 次重啟
"description": "Pod 重啟次數15分鐘窗口",
},
{
"name": "db_connection_pool",
"promql": 'pg_stat_activity_count{datname="awoooi"}',
"threshold": 80.0, # > 80 個 DB 連線
"description": "PostgreSQL 連線數",
},
]
# ─────────────────────────────────────────────────────────────────────────────
# Data Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ProactiveAlert:
"""主動巡檢偵測到的預警事件"""
alert_type: str # "dynamic_anomaly" / "log_pattern" / "trend_breach"
metric_name: str
severity: str # "warning" / "critical"
description: str
current_value: float = 0.0
threshold: float = 0.0
deviation_sigma: float = 0.0
template: str = "" # log patternlog_pattern 類型)
predicted_breach_hours: float | None = None
shadow_mode: bool = True
detected_at: str = ""
source: str = "proactive_inspector"
@dataclass
class InspectionReport:
"""一次巡檢週期的完整報告"""
started_at: str
finished_at: str
alerts: list[ProactiveAlert] = field(default_factory=list)
baseline_anomalies: int = 0
log_patterns_new: int = 0
trend_breaches: int = 0
errors: list[str] = field(default_factory=list)
shadow_mode: bool = True
# ─────────────────────────────────────────────────────────────────────────────
# Main Service
# ─────────────────────────────────────────────────────────────────────────────
class ProactiveInspector:
"""
主動巡檢協調器
每 5 分鐘執行一輪:
1. Prometheus metrics → DynamicBaselineService 異常偵測
2. K8s Pod logs → LogAnomalyDetector 新 pattern
3. 趨勢資料點 → TrendPredictor 4h 預警
4. 背景觸發 Holt-Winters 重訓(每次巡檢)
"""
def __init__(self) -> None:
# Phase 4 ADR-084: 快取最後一次巡檢報告,供 PreDecisionInvestigator 8D 感官使用
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 8D 升級
self._last_report: InspectionReport | None = None
def get_last_report(self) -> InspectionReport | None:
"""取得最近一次巡檢報告PreDecisionInvestigator 8D 感官用)。"""
return self._last_report
async def run_inspection(self) -> InspectionReport:
"""
執行一次完整巡檢。
Returns:
InspectionReportShadow Mode 時只記錄,不觸發 Alert
"""
from src.core.feature_flags import aiops_flags
if not aiops_flags.AIOPS_P4_PROACTIVE_INSPECTOR:
return InspectionReport(
started_at=now_taipei().isoformat(),
finished_at=now_taipei().isoformat(),
)
shadow_mode = aiops_flags.AIOPS_P4_SHADOW_MODE
started_at = now_taipei().isoformat()
report = InspectionReport(
started_at=started_at,
finished_at="",
shadow_mode=shadow_mode,
)
logger.info("proactive_inspection_started", shadow_mode=shadow_mode)
# 三大感官並行執行(熔斷隔離)
tasks = [
self._inspect_dynamic_baseline(report),
self._inspect_log_patterns(report),
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
if isinstance(r, Exception):
report.errors.append(str(r))
# 趨勢預測(依賴 baseline 結果,稍後執行)
try:
await self._inspect_trends(report)
except Exception as e:
report.errors.append(f"trend_inspect_error: {e}")
# 背景觸發基線重訓(不等待結果)
asyncio.create_task(self._retrain_baselines_background())
report.finished_at = now_taipei().isoformat()
self._last_report = report # 快取供 PreDecisionInvestigator 8D 感官讀取
logger.info(
"proactive_inspection_finished",
baseline_anomalies=report.baseline_anomalies,
log_patterns_new=report.log_patterns_new,
trend_breaches=report.trend_breaches,
errors=len(report.errors),
shadow_mode=shadow_mode,
)
return report
# ──────────────────────────────────────────────────────────────────────────
# 感官 1: Dynamic Baseline 異常偵測
# ──────────────────────────────────────────────────────────────────────────
async def _inspect_dynamic_baseline(self, report: InspectionReport) -> None:
"""從 Prometheus 抓取當前值,與 Holt-Winters 基線比對。"""
from src.services.dynamic_baseline_service import get_dynamic_baseline_service
from src.core.feature_flags import aiops_flags
if not aiops_flags.AIOPS_P4_DYNAMIC_BASELINE:
return
svc = get_dynamic_baseline_service()
for metric_cfg in MONITORED_METRICS:
metric_name = metric_cfg["name"]
try:
current = await self._fetch_current_value(metric_cfg["promql"])
if current is None:
continue
hour_of_day = now_taipei().hour
result = await svc.is_anomaly(metric_name, current, hour_of_day=hour_of_day)
if result.is_anomaly:
dedup_key = f"dynamic:{metric_name}"
if await self._is_dedup(dedup_key):
continue
severity = "critical" if result.deviation_sigma >= 5.0 else "warning"
alert = ProactiveAlert(
alert_type="dynamic_anomaly",
metric_name=metric_name,
severity=severity,
description=(
f"{metric_cfg['description']} 偏離基線 {result.deviation_sigma:.1f}σ "
f"(當前 {current:.4f},期望 {result.expected_mean:.4f}"
),
current_value=current,
threshold=result.expected_mean + 3 * result.expected_std,
deviation_sigma=result.deviation_sigma,
shadow_mode=report.shadow_mode,
detected_at=now_taipei().isoformat(),
)
report.alerts.append(alert)
report.baseline_anomalies += 1
await self._mark_dedup(dedup_key)
except Exception as e:
logger.warning("baseline_inspect_metric_failed", metric=metric_name, error=str(e))
# ──────────────────────────────────────────────────────────────────────────
# 感官 2: Log Anomaly 新 Pattern
# ──────────────────────────────────────────────────────────────────────────
async def _inspect_log_patterns(self, report: InspectionReport) -> None:
"""掃描 K8s Pod 日誌,偵測新 log pattern。"""
from src.services.log_anomaly_detector import get_log_anomaly_detector
from src.core.feature_flags import aiops_flags
if not aiops_flags.AIOPS_P4_LOG_ANOMALY:
return
detector = get_log_anomaly_detector()
try:
events = await detector.process_pod_logs(
namespace=K8S_NAMESPACE,
tail_lines=200,
)
for event in events:
dedup_key = f"log:{event.cluster_id}"
if await self._is_dedup(dedup_key):
continue
alert = ProactiveAlert(
alert_type="log_pattern",
metric_name="log_anomaly",
severity="warning",
description=f"偵測到新 log pattern{event.template[:200]}",
template=event.template,
shadow_mode=report.shadow_mode,
detected_at=event.detected_at,
source=f"k8s/{K8S_NAMESPACE}",
)
report.alerts.append(alert)
report.log_patterns_new += 1
await self._mark_dedup(dedup_key)
except Exception as e:
logger.warning("log_inspect_failed", error=str(e))
# ──────────────────────────────────────────────────────────────────────────
# 感官 3: Trend Predictor 4h 預警
# ──────────────────────────────────────────────────────────────────────────
async def _inspect_trends(self, report: InspectionReport) -> None:
"""對各 metric 做 4h 趨勢外推。"""
from src.services.trend_predictor import get_trend_predictor
from src.core.feature_flags import aiops_flags
if not aiops_flags.AIOPS_P4_TREND_PREDICTOR:
return
predictor = get_trend_predictor()
for metric_cfg in MONITORED_METRICS:
metric_name = metric_cfg["name"]
threshold = metric_cfg["threshold"]
try:
current = await self._fetch_current_value(metric_cfg["promql"])
if current is None:
continue
pred = await predictor.predict_breach(metric_name, current, threshold)
pred.metric_name = metric_name # TrendPredictor 內部未填 metric_name
if pred.will_breach and pred.confidence in ("high", "medium"):
dedup_key = f"trend:{metric_name}"
if await self._is_dedup(dedup_key):
continue
hours_str = f"{pred.breach_in_hours:.1f}h" if pred.breach_in_hours is not None else "已超越"
alert = ProactiveAlert(
alert_type="trend_breach",
metric_name=metric_name,
severity="warning",
description=(
f"{metric_cfg['description']} 趨勢預警:預計 {hours_str} 後超越閾值 "
f"(當前 {current:.4f},閾值 {threshold},斜率 {pred.slope_per_hour:+.6f}/hR²={pred.r_squared:.2f}"
),
current_value=current,
threshold=threshold,
predicted_breach_hours=pred.breach_in_hours,
shadow_mode=report.shadow_mode,
detected_at=now_taipei().isoformat(),
)
report.alerts.append(alert)
report.trend_breaches += 1
await self._mark_dedup(dedup_key)
except Exception as e:
logger.warning("trend_inspect_metric_failed", metric=metric_name, error=str(e))
# ──────────────────────────────────────────────────────────────────────────
# 背景重訓
# ──────────────────────────────────────────────────────────────────────────
async def _retrain_baselines_background(self) -> None:
"""背景重訓所有 Holt-Winters 基線(不阻塞巡檢)。"""
from src.services.dynamic_baseline_service import get_dynamic_baseline_service
from src.core.feature_flags import aiops_flags
if not aiops_flags.AIOPS_P4_DYNAMIC_BASELINE:
return
svc = get_dynamic_baseline_service()
for metric_cfg in MONITORED_METRICS:
try:
await svc.train_baseline(
metric_name=metric_cfg["name"],
promql=metric_cfg["promql"],
)
except Exception as e:
logger.warning(
"baseline_retrain_background_failed",
metric=metric_cfg["name"],
error=str(e),
)
# ──────────────────────────────────────────────────────────────────────────
# 工具方法
# ──────────────────────────────────────────────────────────────────────────
async def _fetch_current_value(self, promql: str) -> float | None:
"""從 Prometheus 抓取當前值instant query"""
import httpx
from src.core.config import settings
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{settings.PROMETHEUS_URL}/api/v1/query",
params={"query": promql},
)
resp.raise_for_status()
data = resp.json()
results = data.get("data", {}).get("result", [])
if not results:
return None
value_str = results[0].get("value", [None, None])[1]
if value_str is None or value_str == "NaN":
return None
return float(value_str)
except Exception as e:
logger.warning("prometheus_instant_query_failed", promql=promql[:80], error=str(e))
return None
async def _is_dedup(self, key: str) -> bool:
"""檢查是否在去重窗口內30 分鐘)。"""
try:
from src.core.redis_client import get_redis
r = get_redis()
return bool(await r.exists(f"{DEDUP_KEY_PREFIX}{key}"))
except Exception:
return False
async def _mark_dedup(self, key: str) -> None:
"""標記去重TTL = 30 分鐘。"""
try:
from src.core.redis_client import get_redis
r = get_redis()
await r.set(f"{DEDUP_KEY_PREFIX}{key}", "1", ex=DEDUP_TTL_SEC)
except Exception:
pass
# ─────────────────────────────────────────────────────────────────────────────
# Background Loop供 main.py lifespan 呼叫)
# ─────────────────────────────────────────────────────────────────────────────
async def run_proactive_inspector_loop() -> None:
"""
永久迴圈:每 INSPECTOR_INTERVAL_SEC 秒執行一次巡檢。
由 main.py lifespan 透過 asyncio.create_task() 啟動。
Loop 內部有熔斷:任一輪失敗不會終止整個迴圈。
"""
inspector = get_proactive_inspector()
logger.info("proactive_inspector_loop_started", interval_sec=INSPECTOR_INTERVAL_SEC)
while True:
try:
await inspector.run_inspection()
except Exception as e:
logger.warning("proactive_inspector_loop_error", error=str(e))
await asyncio.sleep(INSPECTOR_INTERVAL_SEC)
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_inspector: ProactiveInspector | None = None
def get_proactive_inspector() -> ProactiveInspector:
global _inspector
if _inspector is None:
_inspector = ProactiveInspector()
return _inspector

View File

@@ -0,0 +1,306 @@
"""
AWOOOI AIOps Phase 4 — Trend Predictor趨勢預測
===================================================
職責numpy 線性回歸,預測 metric 在未來 N 小時是否超越警戒閾值
核心 API
predict_breach(metric_name, current_value, threshold) -> TrendPrediction
設計原則:
- 不使用 Prophet500MB+ Stan 依賴),改用 numpy 線性回歸
- 從 DynamicBaselineRecord 取歷史窗口資料,計算趨勢斜率
- Shadow Mode預測只記錄 logger.info不觸發 Alert
- 熔斷numpy 失敗 → fallback 到最近值外推
ADR-084: Phase 4 動態異常偵測源頭升級
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import structlog
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# ── 常數 ────────────────────────────────────────────────────────────────────
FORECAST_HOURS = 4 # 預測未來 4 小時
MIN_DATAPOINTS_FOR_TREND = 12 # 至少 12 個資料點才能做回歸12h
TREND_CONFIDENCE_THRESHOLD = 0.7 # R² > 0.7 → 趨勢可信
REDIS_KEY_HISTORY = "trend:history:" # hash: metric_name → JSON list of (ts, value)
REDIS_TTL_HISTORY = 86400 * 2 # 保留 2 天歷史
MAX_HISTORY_POINTS = 336 # 最多 336 個點14天 × 24h
# ─────────────────────────────────────────────────────────────────────────────
# Data Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class TrendPrediction:
"""趨勢預測結果"""
metric_name: str
current_value: float
threshold: float
forecast_hours: int # 預測時間窗口(小時)
predicted_value: float # 預測在 forecast_hours 後的值
slope_per_hour: float # 每小時變化量(線性斜率)
r_squared: float # 線性回歸 R²0-1越高趨勢越清晰
will_breach: bool # 是否預測超越 threshold
breach_in_hours: float | None # 預計幾小時後超越None = 不會超越)
confidence: str # high / medium / low / insufficient_data
shadow_mode: bool = True
detected_at: str = ""
# ─────────────────────────────────────────────────────────────────────────────
# Main Service
# ─────────────────────────────────────────────────────────────────────────────
class TrendPredictor:
"""
趨勢預測服務
工作流程:
1. 從 Redis 取近期歷史資料點sliding window
2. numpy 線性回歸計算趨勢斜率
3. 外推預測 4h 後的值
4. 判斷是否在 4h 內超越警戒閾值
"""
async def predict_breach(
self,
metric_name: str,
current_value: float,
threshold: float,
forecast_hours: int = FORECAST_HOURS,
) -> TrendPrediction:
"""
預測 metric 是否在 forecast_hours 內超越 threshold。
Args:
metric_name: 基線識別名(需與 DynamicBaselineService 一致)
current_value: 當前觀測值
threshold: 警戒閾值(超越 = 預警)
forecast_hours: 預測窗口(預設 4h
Returns:
TrendPredictionShadow Mode 時只記錄不觸發)
"""
from src.core.feature_flags import aiops_flags
if not aiops_flags.AIOPS_P4_TREND_PREDICTOR:
return self._no_data_result(metric_name, current_value, threshold, forecast_hours)
shadow_mode = aiops_flags.AIOPS_P4_SHADOW_MODE
detected_at = now_taipei().isoformat()
# 推送當前值到歷史
await self._append_history(metric_name, current_value)
# 取歷史資料
history = await self._get_history(metric_name)
if len(history) < MIN_DATAPOINTS_FOR_TREND:
return TrendPrediction(
metric_name=metric_name,
current_value=current_value,
threshold=threshold,
forecast_hours=forecast_hours,
predicted_value=current_value,
slope_per_hour=0.0,
r_squared=0.0,
will_breach=current_value >= threshold,
breach_in_hours=0.0 if current_value >= threshold else None,
confidence="insufficient_data",
shadow_mode=shadow_mode,
detected_at=detected_at,
)
prediction = self._linear_regression_predict(
history=history,
current_value=current_value,
threshold=threshold,
forecast_hours=forecast_hours,
)
prediction.shadow_mode = shadow_mode
prediction.detected_at = detected_at
if prediction.will_breach:
logger.info(
"trend_breach_predicted",
metric=metric_name,
current=current_value,
predicted=prediction.predicted_value,
threshold=threshold,
breach_in_hours=prediction.breach_in_hours,
slope=prediction.slope_per_hour,
r2=prediction.r_squared,
shadow_mode=shadow_mode,
)
return prediction
async def push_datapoint(
self,
metric_name: str,
value: float,
) -> None:
"""主動推送資料點(供 ProactiveInspector 呼叫)。"""
await self._append_history(metric_name, value)
# ──────────────────────────────────────────────────────────────────────────
# Private Helpers
# ──────────────────────────────────────────────────────────────────────────
def _linear_regression_predict(
self,
history: list[tuple[float, float]], # (timestamp, value)
current_value: float,
threshold: float,
forecast_hours: int,
) -> TrendPrediction:
"""
numpy 線性回歸y = slope * x + intercept
x = 相對小時數0 到 Ny = metric 值
"""
metric_name = "" # 呼叫方 fillback
try:
import numpy as np
times = np.array([h[0] for h in history], dtype=float)
values = np.array([h[1] for h in history], dtype=float)
# 相對化時間(小時為單位)
times_rel = (times - times[0]) / 3600.0
# 線性回歸polyfit degree=1
coeffs = np.polyfit(times_rel, values, 1)
slope = float(coeffs[0]) # 每小時斜率
intercept = float(coeffs[1])
# R² 計算
fitted = np.polyval(coeffs, times_rel)
ss_res = float(np.sum((values - fitted) ** 2))
ss_tot = float(np.sum((values - np.mean(values)) ** 2))
r2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0
# 預測 forecast_hours 後的值
current_time_rel = (now_taipei().timestamp() - times[0]) / 3600.0
predicted_value = slope * (current_time_rel + forecast_hours) + intercept
# 信心度
if r2 >= TREND_CONFIDENCE_THRESHOLD:
confidence = "high"
elif r2 >= 0.4:
confidence = "medium"
else:
confidence = "low"
# 是否超越閾值
will_breach = predicted_value >= threshold
breach_in_hours: float | None = None
if will_breach and slope > 0 and current_value < threshold:
# 計算幾小時後超越current_value + slope * h = threshold
breach_in_hours = round((threshold - current_value) / slope, 2)
breach_in_hours = min(breach_in_hours, float(forecast_hours))
elif current_value >= threshold:
breach_in_hours = 0.0
return TrendPrediction(
metric_name=metric_name,
current_value=current_value,
threshold=threshold,
forecast_hours=forecast_hours,
predicted_value=round(predicted_value, 4),
slope_per_hour=round(slope, 6),
r_squared=round(r2, 4),
will_breach=will_breach,
breach_in_hours=breach_in_hours,
confidence=confidence,
)
except Exception as e:
logger.warning("trend_regression_failed", error=str(e))
# Fallback最近值外推
return TrendPrediction(
metric_name=metric_name,
current_value=current_value,
threshold=threshold,
forecast_hours=forecast_hours,
predicted_value=current_value,
slope_per_hour=0.0,
r_squared=0.0,
will_breach=current_value >= threshold,
breach_in_hours=0.0 if current_value >= threshold else None,
confidence="low",
)
async def _append_history(self, metric_name: str, value: float) -> None:
"""推送資料點到 Redis 滑動視窗(最舊的被 trim 掉)。"""
try:
import json
from src.core.redis_client import get_redis
r = get_redis()
key = f"{REDIS_KEY_HISTORY}{metric_name}"
point = json.dumps([now_taipei().timestamp(), value])
await r.rpush(key, point)
await r.ltrim(key, -MAX_HISTORY_POINTS, -1) # 保留最新 N 個點
await r.expire(key, REDIS_TTL_HISTORY)
except Exception as e:
logger.warning("trend_history_append_failed", metric=metric_name, error=str(e))
async def _get_history(self, metric_name: str) -> list[tuple[float, float]]:
"""從 Redis 取歷史資料點。"""
try:
import json
from src.core.redis_client import get_redis
r = get_redis()
key = f"{REDIS_KEY_HISTORY}{metric_name}"
raw = await r.lrange(key, 0, -1)
return [tuple(json.loads(item)) for item in raw]
except Exception as e:
logger.warning("trend_history_get_failed", metric=metric_name, error=str(e))
return []
def _no_data_result(
self,
metric_name: str,
current_value: float,
threshold: float,
forecast_hours: int,
) -> TrendPrediction:
"""Feature flag 關閉時的空結果。"""
return TrendPrediction(
metric_name=metric_name,
current_value=current_value,
threshold=threshold,
forecast_hours=forecast_hours,
predicted_value=current_value,
slope_per_hour=0.0,
r_squared=0.0,
will_breach=False,
breach_in_hours=None,
confidence="insufficient_data",
shadow_mode=True,
detected_at=now_taipei().isoformat(),
)
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_predictor: TrendPredictor | None = None
def get_trend_predictor() -> TrendPredictor:
global _predictor
if _predictor is None:
_predictor = TrendPredictor()
return _predictor

View File

@@ -72,7 +72,91 @@
### 下一步
- Phase 2: 5 Agent 骨架 + Orchestrator + AgentSession DB
- ~~Phase 2: 5 Agent 骨架 + Orchestrator + AgentSession DB~~ → **✅ 完成commit d316221**
---
## 📍 2026-04-15 深夜 — AI 自主化飛輪 Phase 3 學習閉環重建完成
### 成品ADR-083commit 7da64ea → Gitea
| 成品 | 路徑 | 說明 |
|------|------|------|
| fire-and-forget 修復 | `services/approval_execution.py` | `create_task``await asyncio.wait_for(timeout=30)` × 2 處(成功 + 失敗路徑) |
| matched_playbook_id 欄位 | `models/approval.py` | `ApprovalRequestBase` 新增auto_execute 路徑填充 |
| _auto_execute 傳遞 | `services/decision_manager.py` | `token.proposal_data.get("playbook_id")``ApprovalRequest.matched_playbook_id` |
| 雙路徑查找 | `services/learning_service.py` | `matched_playbook_id` + `metadata` fallback |
| trust_score 欄位 | `models/playbook.py` | 新增 `trust_score: float = 0.3`EWMA 動態信任度) |
| 2x EWMA 更新 | `repositories/playbook_repository.py` | 成功 α=0.1、失敗 α=0.2trust < 0.1 → 警告 |
| Evolver Agent | `services/playbook_evolver.py` | 低信任封存 + 休眠封存 + Jaccard 相似合併(新建) |
| ADR-083 | `docs/adr/ADR-083-learning-loop-reconstruction.md` | 學習閉環重建決策紀錄 |
| MASTER §8 | `docs/superpowers/specs/2026-04-15-MASTER-ai-autonomous-flywheel-v2.md` | Phase 3 完工追加 |
### 根因修復對照
| 根因 | 修復前 | 修復後 |
|------|--------|--------|
| 學習觸發率 | 0%GC 隨時取消) | ≈100%await + 30s 熔斷) |
| Playbook EWMA | 永遠停在 0.3 | 每次執行後動態更新 |
| 負向懲罰 | 無 | 失敗 2x 衰減(α=0.2 |
| 知識庫管理 | 無退場機制 | Evolver 自動封存低信任 |
### 架構狀態
```
AIOPS_P3_ENABLED=False預設— 骨架就位,等統帥批准後開啟
AIOPS_P3_EVOLVER_ENABLED=False — Evolver 定時 job 等統帥批准
學習路徑ApprovalRequest.matched_playbook_id → learning_service → playbook_repository.update_stats(EWMA)
```
### 下一步
- Gate 3 架構審查(首席架構師 Review Phase 3
- 開啟 `AIOPS_P3_ENABLED=True` 後 E2E 驗證
- Phase 4 異常偵測升級(依賴 Phase 3 穩定)
---
## 📍 2026-04-15 深夜 — AI 自主化飛輪 Phase 2 多 Agent 協作骨架上線
### 成品ADR-082commit d316221
| 成品 | 路徑 | 說明 |
|------|------|------|
| Protocol 型別系統 | `apps/api/src/agents/protocol.py` | 5 Agent 共用資料契約dataclass不可變 |
| DiagnosticianAgent | `apps/api/src/agents/diagnostician_agent.py` | RCA 偵探confidence < 0.4 → ABSTAIN |
| SolverAgent | `apps/api/src/agents/solver_agent.py` | 修復軍師blast_radius 評分 + 降級 mock |
| ReviewerAgent | `apps/api/src/agents/reviewer_agent.py` | 安全審查HARD_RULES 靜態 regex + blast_radius 閾值 |
| CriticAgent | `apps/api/src/agents/critic_agent.py` | 刻意唱反調,強制 3 問批判critical → REJECT |
| CoordinatorAgent | `apps/api/src/agents/coordinator_agent.py` | 純規則聚合(無 LLM6 級決策閘 |
| AgentOrchestrator | `apps/api/src/services/agent_orchestrator.py` | 30s 全局超時Reviewer‖Critic 並行DB + Redis Streams |
| DecisionManager 接線 | `apps/api/src/services/decision_manager.py` | `is_phase_enabled(2)` gate + `_package_to_proposal_data` 橋接 |
| AgentSession DB 表 | `apps/api/src/db/models.py` | Immutable Event Sourcing4 複合 index |
| ADR-082 | `docs/adr/ADR-082-multi-agent-collaboration.md` | 架構決策紀錄 |
### Gate 2 修復7 項)
| 嚴重度 | 問題 | 修復位置 |
|--------|------|---------|
| CRITICAL | DELETE FROM regex lookahead 位置錯誤,攔到安全語句、放行危險語句 | reviewer_agent.py:58 |
| CRITICAL | REQUEST_REVISION 可抵達 auto-executeSolver 未修訂不可執行) | coordinator_agent.py |
| IMPORTANT | `_extract_json` flat regex 不支援巢狀 JSON所有 Agent LLM 解析靜默失敗 | base.py:167 |
| IMPORTANT | `all_degraded` 遺漏 `verdict.degraded`Reviewer 熔斷不被感知 | coordinator_agent.py |
| IMPORTANT | Solver ABSTAIN guard 放行降級假設confidence=0.2 觸發 LLM | solver_agent.py:72 |
| IMPORTANT | `dataclasses.asdict()` 保留 Enum 實例,所有 DB 審計寫入靜默失敗 | agent_orchestrator.py |
| IMPORTANT | P2 gate 直讀屬性繞過父 Phase 守衛(應用 `is_phase_enabled(2)` | decision_manager.py |
### 架構狀態
```
AIOPS_P2_ENABLED=False預設— 骨架就位,等統帥批准後開啟
執行路徑EvidenceSnapshot → Diagnostician → Solver → (Reviewer‖Critic) → Coordinator → DecisionPackage
全局超時30s單 Agent5s降級後繼續不阻塞 Coordinator
```
### 下一步
- Phase 2 測試:`test_agent_protocol.py` / `test_agent_orchestrator.py` / 各 Agent 單元測試
- 或 統帥指示進入 Phase 3學習閉環重建
---