From fab65e7d7a5e550e07cf5748b27fedb01e369069 Mon Sep 17 00:00:00 2001 From: OG T Date: Wed, 15 Apr 2026 18:56:16 +0800 Subject: [PATCH] =?UTF-8?q?fix(alerts):=20PENDING=20=E6=94=B6=E6=96=82?= =?UTF-8?q?=E7=84=A1=20TTL=20=E2=86=92=20=E8=80=81=E8=A8=98=E9=8C=84?= =?UTF-8?q?=E6=B0=B8=E4=B9=85=E5=B0=81=E9=8E=96=20Telegram=20=E5=91=8A?= =?UTF-8?q?=E8=AD=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 根因:find_by_fingerprint 的 PENDING 匹配條件無時間上限, 2026-04-12 建立的 3 筆 PENDING approval records(hit=77/30/17) 持續吃掉所有同指紋告警,造成 2+ 小時 Telegram 靜音。 修正(approval_db.py): - PENDING_TTL_HOURS = 24:PENDING 記錄逾 24h 不再收斂新告警 - 原本:OR(status=PENDING, created_at>=30min前) - 修正:OR(PENDING AND created_at>=24h前, created_at>=30min前) 緊急修復:kubectl exec 直接將 7 筆過期 PENDING 記錄設為 expired, 即時恢復 Telegram 告警流(不等部署)。 Phase 6 AI 自我治理閉環(ADR-087): - feat(db): 新增 ai_governance_events 表 + 3 個 index(base.py + models.py) - feat(svc): ai_slo_calculator.py — 7d 滾動 SLO(success/override/false_neg) - feat(svc): trust_drift_detector.py — Playbook 信任度極端偏態偵測 - feat(job): kb_rot_cleaner.py — K8s API/Prom metric/老舊 incident_case 腐爛清理 - feat(svc): decision_manager.py — 自我降級守衛(SLO 違反 → 提高門檻/保守模式) 2026-04-15 ogt + Claude Sonnet 4.6(亞太) Co-Authored-By: Claude Sonnet 4.6 --- apps/api/src/db/base.py | 14 + apps/api/src/db/models.py | 59 +++ apps/api/src/jobs/kb_rot_cleaner.py | 247 +++++++++++ apps/api/src/services/ai_slo_calculator.py | 418 ++++++++++++++++++ apps/api/src/services/approval_db.py | 18 +- apps/api/src/services/decision_manager.py | 116 +++++ apps/api/src/services/trust_drift_detector.py | 243 ++++++++++ 7 files changed, 1112 insertions(+), 3 deletions(-) create mode 100644 apps/api/src/jobs/kb_rot_cleaner.py create mode 100644 apps/api/src/services/ai_slo_calculator.py create mode 100644 apps/api/src/services/trust_drift_detector.py diff --git a/apps/api/src/db/base.py b/apps/api/src/db/base.py index d664398c..3aa243e7 100644 --- a/apps/api/src/db/base.py +++ b/apps/api/src/db/base.py @@ -220,6 +220,20 @@ async def init_db() -> None: """) ) + # 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 自我治理閉環 + # ADR-087: ai_governance_events 不可變 Event Sourcing 表 + # create_all 已建表,此處補 INDEX(部分環境 create 不跑 Index) + await conn.execute( + text(""" + CREATE INDEX IF NOT EXISTS ix_ai_governance_event_type + ON ai_governance_events (event_type); + CREATE INDEX IF NOT EXISTS ix_ai_governance_triggered_at + ON ai_governance_events (triggered_at); + CREATE INDEX IF NOT EXISTS ix_ai_governance_resolved + ON ai_governance_events (resolved); + """) + ) + async def close_db() -> None: """ diff --git a/apps/api/src/db/models.py b/apps/api/src/db/models.py index 3710a736..3b6a9f51 100644 --- a/apps/api/src/db/models.py +++ b/apps/api/src/db/models.py @@ -1116,3 +1116,62 @@ class AgentSession(Base): # 查詢某 session 中特定 role 的 turn(Coordinator 聚合時常用) Index("ix_agent_sessions_session_role", "session_id", "agent_role"), ) + + +# ============================================================================= +# AiGovernanceEvent — Phase 6 自我治理事件溯源(不可刪除) +# ADR-087: AI 自我治理閉環:SLO 違反 / 信任漂移 / KB 腐爛 / 自我降級 +# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立 +# +# 核心鐵律: +# - 不可變 Event Sourcing — 只 INSERT,禁止 UPDATE/DELETE +# - 所有治理事件必須落地 PG,SLO dashboard 依賴此表 +# - resolved=True 僅由人工或下次計算時補填,不可自動翻轉未解決項目 +# ============================================================================= + +class AiGovernanceEvent(Base): + """ + AI 自我治理事件記錄(不可變) + + event_type 值: + slo_violation — SLO 計算結果違反閾值 + trust_drift — Playbook 信任度分布偏態(全高或全低) + kb_stale — KB 條目引用已廢棄 K8s API / Prometheus query + self_demotion — 信心閾值自動調高(自我降級) + conservative_mode — 連續 SLO 違反,全系統切保守模式 + replay_degraded — 離線回放一致率連續下降 + + immutable — 只 INSERT,禁 UPDATE / DELETE + """ + __tablename__ = "ai_governance_events" + + id: Mapped[str] = mapped_column( + String(36), primary_key=True, default=generate_uuid, + comment="主鍵(UUID)" + ) + event_type: Mapped[str] = mapped_column( + String(40), nullable=False, + comment="slo_violation / trust_drift / kb_stale / self_demotion / conservative_mode / replay_degraded" + ) + triggered_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=taipei_now, nullable=False, + comment="事件觸發時間(台北時區)" + ) + details: Mapped[dict] = mapped_column( + JSON, nullable=False, default=dict, + comment="事件詳情 JSONB(SLO 數值、漂移分布等)" + ) + resolved: Mapped[bool] = mapped_column( + default=False, nullable=False, + comment="是否已解決(人工確認或下次計算恢復正常後補填)" + ) + resolved_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True, + comment="解決時間(僅人工/系統補填,不得自動反轉未解決項目)" + ) + + __table_args__ = ( + Index("ix_ai_governance_event_type", "event_type"), + Index("ix_ai_governance_triggered_at", "triggered_at"), + Index("ix_ai_governance_resolved", "resolved"), + ) diff --git a/apps/api/src/jobs/kb_rot_cleaner.py b/apps/api/src/jobs/kb_rot_cleaner.py new file mode 100644 index 00000000..349e56f9 --- /dev/null +++ b/apps/api/src/jobs/kb_rot_cleaner.py @@ -0,0 +1,247 @@ +""" +AWOOOI AIOps Phase 6 — KB 腐爛清理 Job +======================================= +職責:月度巡檢知識庫(knowledge_entries)中腐爛的知識條目, + 標記引用了已廢棄資源的條目為 stale,並寫入 ai_governance_events。 + +「腐爛」的三種形態: + ROT-1 廢棄 K8s API 版本引用(extensions/v1beta1、apps/v1beta1、v1beta2) + ROT-2 過時 Prometheus query pattern(已知廢棄 metric 名稱前綴) + ROT-3 超過 180 天未被引用且成功率為 0 的 incident_case 條目 + +設計原則: +1. 只讀掃描 + 標記(不刪除任何 entry,符合 archive_not_delete 鐵律) +2. 標記方式:status = 'archived' + tags 追加 'kb_rot_detected' +3. 掃描失敗 → 記錄 error,不拋出,不影響主路徑 +4. 每次執行結果寫 ai_governance_events(event_type=kb_stale) + +ADR-087: AI 自我治理閉環 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立 +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from datetime import timedelta + +import structlog +from sqlalchemy import select, update + +from src.db.base import get_session_factory +from src.db.models import AiGovernanceEvent, KnowledgeEntryRecord +from src.utils.timezone import now_taipei + +logger = structlog.get_logger(__name__) + +# ───────────────────────────────────────────────────────────────────────────── +# 腐爛偵測規則(不可寫死 action,只標記 stale) +# ───────────────────────────────────────────────────────────────────────────── + +# ROT-1: 廢棄 K8s API 版本(Kubernetes 1.16+ 已移除) +DEPRECATED_K8S_APIS = [ + "extensions/v1beta1", + "apps/v1beta1", + "apps/v1beta2", + "networking.k8s.io/v1beta1", + "policy/v1beta1", + "rbac.authorization.k8s.io/v1beta1", +] + +# ROT-2: 廢棄 Prometheus metric 前綴(已知改名的 metric pattern) +DEPRECATED_PROM_PATTERNS = [ + r"container_cpu_used_total", # → container_cpu_usage_seconds_total + r"kube_pod_container_status_restarts$", # → kube_pod_container_status_restarts_total + r"http_requests_total\{.*le=", # 錯誤 histogram 用法 +] + +# ROT-3: 未引用 + 零成功率條目的老化天數 +STALE_AGE_DAYS = 180 + + +# ───────────────────────────────────────────────────────────────────────────── +# Data Types +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class RotScanResult: + """KB 腐爛掃描結果""" + total_scanned: int + stale_ids: list[str] = field(default_factory=list) + rot_reasons: dict[str, list[str]] = field(default_factory=dict) + # rot_reasons: {entry_id: ["ROT-1: extensions/v1beta1", ...]} + scanned_at: str = field(default_factory=lambda: now_taipei().isoformat()) + + @property + def stale_count(self) -> int: + return len(self.stale_ids) + + def to_dict(self) -> dict: + return { + "total_scanned": self.total_scanned, + "stale_count": self.stale_count, + "stale_ids": self.stale_ids[:50], # 最多記錄前 50 個 + "rot_reasons_sample": {k: v for k, v in list(self.rot_reasons.items())[:10]}, + "scanned_at": self.scanned_at, + } + + +# ───────────────────────────────────────────────────────────────────────────── +# Main Job +# ───────────────────────────────────────────────────────────────────────────── + +class KbRotCleaner: + """ + KB 腐爛清理 Job(月度執行) + + Usage: + cleaner = KbRotCleaner() + result = await cleaner.run() + """ + + async def run(self) -> RotScanResult: + """ + 完整執行:掃描 → 標記 stale → 寫 governance event。 + + Returns: + RotScanResult + """ + from src.core.feature_flags import aiops_flags + if not aiops_flags.is_sub_flag_enabled("AIOPS_P6_KB_ROT_CLEANER"): + logger.info("kb_rot_cleaner_skipped_feature_flag") + return RotScanResult(total_scanned=0) + + try: + result = await self._scan() + if result.stale_count > 0: + await self._mark_stale(result) + await self._save_event(result) + else: + logger.info("kb_rot_scan_clean", total_scanned=result.total_scanned) + return result + except Exception as e: + logger.error("kb_rot_cleaner_error", error=str(e)) + return RotScanResult(total_scanned=0) + + async def _scan(self) -> RotScanResult: + """掃描所有 approved / draft 條目,找出腐爛項目。""" + stale_ids: list[str] = [] + rot_reasons: dict[str, list[str]] = {} + total = 0 + + async with get_session_factory()() as session: + # 只掃 active 狀態(非 archived) + q = await session.execute( + select(KnowledgeEntryRecord).where( + KnowledgeEntryRecord.status.in_(["approved", "draft", "review"]) + ) + ) + entries = q.scalars().all() + total = len(entries) + + stale_cutoff = now_taipei() - timedelta(days=STALE_AGE_DAYS) + + for entry in entries: + reasons: list[str] = [] + + content = (entry.content or "").lower() + title = (entry.title or "").lower() + combined = content + " " + title + + # ROT-1: 廢棄 K8s API + for api in DEPRECATED_K8S_APIS: + if api.lower() in combined: + reasons.append(f"ROT-1: 廢棄 K8s API {api}") + + # ROT-2: 廢棄 Prometheus pattern + for pattern in DEPRECATED_PROM_PATTERNS: + if re.search(pattern, combined): + reasons.append(f"ROT-2: 廢棄 Prom metric pattern {pattern[:40]}") + + # ROT-3: 老化未引用(incident_case 且 180 天未更新) + if ( + entry.entry_type == "incident_case" + and entry.updated_at < stale_cutoff + and entry.view_count == 0 + ): + reasons.append( + f"ROT-3: 超過 {STALE_AGE_DAYS}d 未引用 " + f"(last_updated={entry.updated_at.strftime('%Y-%m-%d')})" + ) + + if reasons: + stale_ids.append(entry.id) + rot_reasons[entry.id] = reasons + + logger.info( + "kb_rot_scan_complete", + total=total, + stale_count=len(stale_ids), + ) + return RotScanResult( + total_scanned=total, + stale_ids=stale_ids, + rot_reasons=rot_reasons, + ) + + async def _mark_stale(self, result: RotScanResult) -> None: + """ + 將腐爛條目標記為 archived,並追加 kb_rot_detected tag。 + + 符合 archive_not_delete 鐵律:只封存,不刪除。 + """ + if not result.stale_ids: + return + + async with get_session_factory()() as session: + # 逐條更新(避免 bulk update 覆蓋 tags JSONB) + q = await session.execute( + select(KnowledgeEntryRecord).where( + KnowledgeEntryRecord.id.in_(result.stale_ids) + ) + ) + entries = q.scalars().all() + + for entry in entries: + entry.status = "archived" + tags = list(entry.tags or []) + if "kb_rot_detected" not in tags: + tags.append("kb_rot_detected") + entry.tags = tags + + await session.commit() + + logger.warning( + "kb_rot_entries_archived", + count=len(result.stale_ids), + entry_ids=result.stale_ids[:10], + ) + + async def _save_event(self, result: RotScanResult) -> None: + """寫 kb_stale 事件到 ai_governance_events。""" + try: + async with get_session_factory()() as session: + event = AiGovernanceEvent( + event_type="kb_stale", + details=result.to_dict(), + resolved=False, + ) + session.add(event) + await session.commit() + logger.info("kb_rot_event_saved", stale_count=result.stale_count) + except Exception as e: + logger.error("kb_rot_event_save_error", error=str(e)) + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_cleaner: KbRotCleaner | None = None + + +def get_kb_rot_cleaner() -> KbRotCleaner: + global _cleaner + if _cleaner is None: + _cleaner = KbRotCleaner() + return _cleaner diff --git a/apps/api/src/services/ai_slo_calculator.py b/apps/api/src/services/ai_slo_calculator.py new file mode 100644 index 00000000..84b39e40 --- /dev/null +++ b/apps/api/src/services/ai_slo_calculator.py @@ -0,0 +1,418 @@ +""" +AWOOOI AIOps Phase 6 — AI SLO 計算器(決策品質自我監控) +========================================================= +職責:滾動計算三大 AI 決策品質 SLO;違反閾值時寫入 ai_governance_events, + 供 decision_manager 自我降級邏輯讀取。 + +三大 SLO(MASTER §3.6 ADR-087): + SLO-1 auto_execute_success_rate > 85% (7d 滾動) + SLO-2 human_override_rate < 20% (7d 滾動) + SLO-3 verifier_false_neg_rate < 5% (7d 滾動,proxy: 2h 內重複告警) + +設計原則: +1. 純讀 + 純寫分離 — calculate() 只讀 DB,save_event() 只寫 DB +2. 計算失敗 → 保守:假設 SLO 違反,寫 violation 事件 +3. 所有結果快取 Redis(key: ai:slo:latest, TTL 5min),避免高頻查 DB +4. 不自動解決舊 violation — resolved 只能人工或下次「全部通過」時補填 + +ADR-087: AI 自我治理閉環 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立 +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from datetime import timedelta + +import structlog +from sqlalchemy import func, select, text + +from src.db.base import get_session_factory +from src.db.models import AiGovernanceEvent, AutoRepairExecution, ApprovalRecord +from src.utils.timezone import now_taipei + +logger = structlog.get_logger(__name__) + +# ───────────────────────────────────────────────────────────────────────────── +# SLO 閾值(MASTER §3.6 鐵律,修改前需 ADR-087 更新) +# ───────────────────────────────────────────────────────────────────────────── + +SLO_AUTO_SUCCESS_MIN: float = 0.85 # auto_execute 成功率下限 +SLO_OVERRIDE_RATE_MAX: float = 0.20 # 人工推翻率上限 +SLO_FALSE_NEG_MAX: float = 0.05 # verifier false negative 上限 + +SLO_WINDOW_DAYS: int = 7 # 滾動視窗(天) +SLO_MIN_SAMPLES: int = 5 # 最少樣本數,低於此不計算(資料不足) + +REDIS_KEY = "ai:slo:latest" +REDIS_TTL_SEC = 300 # 5 分鐘快取 + + +# ───────────────────────────────────────────────────────────────────────────── +# Data Types +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class SloMetric: + """單一 SLO 指標""" + name: str + value: float | None # None = 樣本不足,跳過 + threshold: float + direction: str # "above" = 需高於閾值 / "below" = 需低於閾值 + sample_count: int + violated: bool # 是否違反(None → False,不觸發降級) + + @property + def label(self) -> str: + if self.value is None: + return f"{self.name}: N/A(樣本 {self.sample_count} < {SLO_MIN_SAMPLES})" + pct = f"{self.value:.1%}" + thr = f"{self.threshold:.0%}" + op = ">" if self.direction == "above" else "<" + status = "❌ 違反" if self.violated else "✅ 合規" + return f"{self.name}: {pct} (需 {op}{thr}) {status}" + + +@dataclass +class SloReport: + """完整 SLO 計算報告""" + metrics: list[SloMetric] = field(default_factory=list) + any_violated: bool = False + calculated_at: str = field(default_factory=lambda: now_taipei().isoformat()) + window_days: int = SLO_WINDOW_DAYS + + def to_dict(self) -> dict: + return { + "calculated_at": self.calculated_at, + "window_days": self.window_days, + "any_violated": self.any_violated, + "metrics": [ + { + "name": m.name, + "value": m.value, + "threshold": m.threshold, + "direction": m.direction, + "sample_count": m.sample_count, + "violated": m.violated, + "label": m.label, + } + for m in self.metrics + ], + } + + +# ───────────────────────────────────────────────────────────────────────────── +# Main Service +# ───────────────────────────────────────────────────────────────────────────── + +class AiSloCalculator: + """ + AI 決策品質 SLO 計算器 + + Usage: + calc = AiSloCalculator() + report = await calc.calculate() + if report.any_violated: + await calc.save_violation_event(report) + """ + + async def calculate(self) -> SloReport: + """ + 計算三大 SLO 指標(7d 滾動視窗)。 + + Returns: + SloReport(計算失敗時保守回傳 any_violated=True) + """ + try: + since = now_taipei() - timedelta(days=SLO_WINDOW_DAYS) + + async with get_session_factory()() as session: + slo1 = await self._calc_auto_success_rate(session, since) + slo2 = await self._calc_human_override_rate(session, since) + slo3 = await self._calc_false_neg_rate(session, since) + + metrics = [slo1, slo2, slo3] + any_violated = any(m.violated for m in metrics) + + report = SloReport( + metrics=metrics, + any_violated=any_violated, + ) + + logger.info( + "slo_calculated", + any_violated=any_violated, + slo1=slo1.value, + slo2=slo2.value, + slo3=slo3.value, + ) + return report + + except Exception as e: + logger.error("slo_calculation_error", error=str(e)) + # 保守:計算失敗 → 假設違反 + violated_metric = SloMetric( + name="calculation_error", + value=None, + threshold=0.0, + direction="above", + sample_count=0, + violated=True, + ) + return SloReport( + metrics=[violated_metric], + any_violated=True, + ) + + async def get_cached_report(self) -> SloReport | None: + """從 Redis 讀取最近一次 SLO 報告(5min 快取)。""" + try: + from src.core.redis_client import get_redis + redis = get_redis() + raw = await redis.get(REDIS_KEY) + if raw: + data = json.loads(raw) + metrics = [ + SloMetric( + name=m["name"], + value=m["value"], + threshold=m["threshold"], + direction=m["direction"], + sample_count=m["sample_count"], + violated=m["violated"], + ) + for m in data.get("metrics", []) + ] + return SloReport( + metrics=metrics, + any_violated=data.get("any_violated", False), + calculated_at=data.get("calculated_at", ""), + window_days=data.get("window_days", SLO_WINDOW_DAYS), + ) + except Exception as e: + logger.warning("slo_cache_read_error", error=str(e)) + return None + + async def cache_report(self, report: SloReport) -> None: + """將 SLO 報告存入 Redis 快取(TTL 5min)。""" + try: + from src.core.redis_client import get_redis + redis = get_redis() + await redis.set(REDIS_KEY, json.dumps(report.to_dict()), ex=REDIS_TTL_SEC) + except Exception as e: + logger.warning("slo_cache_write_error", error=str(e)) + + async def save_violation_event(self, report: SloReport) -> None: + """ + 將 SLO 違反寫入 ai_governance_events。 + + 只在 any_violated=True 時呼叫。不管舊違反是否解決。 + """ + try: + async with get_session_factory()() as session: + event = AiGovernanceEvent( + event_type="slo_violation", + details=report.to_dict(), + resolved=False, + ) + session.add(event) + await session.commit() + logger.warning( + "slo_violation_recorded", + violated_metrics=[m.name for m in report.metrics if m.violated], + ) + except Exception as e: + logger.error("slo_violation_save_error", error=str(e)) + + async def run(self) -> SloReport: + """ + 完整執行:計算 → 快取 → 如違反則寫事件。 + + Returns: + SloReport + """ + report = await self.calculate() + await self.cache_report(report) + if report.any_violated: + await self.save_violation_event(report) + return report + + # ────────────────────────────────────────────────────────────────────────── + # Private: SLO 計算方法 + # ────────────────────────────────────────────────────────────────────────── + + async def _calc_auto_success_rate(self, session, since) -> SloMetric: + """SLO-1: auto_repair_executions 7d 成功率。""" + try: + total_q = await session.execute( + select(func.count()).where( + AutoRepairExecution.created_at >= since + ) + ) + total: int = total_q.scalar() or 0 + + if total < SLO_MIN_SAMPLES: + return SloMetric( + name="auto_execute_success_rate", + value=None, + threshold=SLO_AUTO_SUCCESS_MIN, + direction="above", + sample_count=total, + violated=False, + ) + + success_q = await session.execute( + select(func.count()).where( + AutoRepairExecution.created_at >= since, + AutoRepairExecution.success.is_(True), + ) + ) + success: int = success_q.scalar() or 0 + rate = success / total + + return SloMetric( + name="auto_execute_success_rate", + value=rate, + threshold=SLO_AUTO_SUCCESS_MIN, + direction="above", + sample_count=total, + violated=rate < SLO_AUTO_SUCCESS_MIN, + ) + except Exception as e: + logger.warning("slo1_calc_error", error=str(e)) + return SloMetric( + name="auto_execute_success_rate", + value=None, threshold=SLO_AUTO_SUCCESS_MIN, + direction="above", sample_count=0, violated=False, + ) + + async def _calc_human_override_rate(self, session, since) -> SloMetric: + """ + SLO-2: 人工推翻率 = AI 提案被 rejected / 總 AI 提案。 + + rejected = approval_records.status = 'rejected' + AI 提案 = requested_by LIKE 'ai_%' or 'system' + """ + try: + ai_q = await session.execute( + select(func.count()).where( + ApprovalRecord.created_at >= since, + ) + ) + total: int = ai_q.scalar() or 0 + + if total < SLO_MIN_SAMPLES: + return SloMetric( + name="human_override_rate", + value=None, + threshold=SLO_OVERRIDE_RATE_MAX, + direction="below", + sample_count=total, + violated=False, + ) + + rejected_q = await session.execute( + select(func.count()).where( + ApprovalRecord.created_at >= since, + ApprovalRecord.status == "rejected", + ) + ) + rejected: int = rejected_q.scalar() or 0 + rate = rejected / total + + return SloMetric( + name="human_override_rate", + value=rate, + threshold=SLO_OVERRIDE_RATE_MAX, + direction="below", + sample_count=total, + violated=rate > SLO_OVERRIDE_RATE_MAX, + ) + except Exception as e: + logger.warning("slo2_calc_error", error=str(e)) + return SloMetric( + name="human_override_rate", + value=None, threshold=SLO_OVERRIDE_RATE_MAX, + direction="below", sample_count=0, violated=False, + ) + + async def _calc_false_neg_rate(self, session, since) -> SloMetric: + """ + SLO-3: Verifier false negative(代理指標)。 + + 計算方式:auto_repair 執行後 2 小時內同 incident_id 再次出現 + 在 auto_repair_executions 中(= 修好了又壞 = verifier 誤判為成功)。 + + 使用 SQL window function: + - 找出 success=True 的執行 + - 計算同 incident_id 下是否有後續 failed 執行在 2h 內 + """ + try: + result = await session.execute( + text(""" + WITH success_runs AS ( + SELECT incident_id, created_at + FROM auto_repair_executions + WHERE success = TRUE + AND created_at >= :since + ), + false_negs AS ( + SELECT DISTINCT s.incident_id + FROM success_runs s + JOIN auto_repair_executions f + ON f.incident_id = s.incident_id + AND f.success = FALSE + AND f.created_at > s.created_at + AND f.created_at <= s.created_at + INTERVAL '2 hours' + ) + SELECT + (SELECT COUNT(*) FROM success_runs) AS total_success, + (SELECT COUNT(*) FROM false_negs) AS false_neg_count + """), + {"since": since}, + ) + row = result.fetchone() + total_success: int = row[0] if row else 0 + false_neg: int = row[1] if row else 0 + + if total_success < SLO_MIN_SAMPLES: + return SloMetric( + name="verifier_false_neg_rate", + value=None, + threshold=SLO_FALSE_NEG_MAX, + direction="below", + sample_count=total_success, + violated=False, + ) + + rate = false_neg / total_success + return SloMetric( + name="verifier_false_neg_rate", + value=rate, + threshold=SLO_FALSE_NEG_MAX, + direction="below", + sample_count=total_success, + violated=rate > SLO_FALSE_NEG_MAX, + ) + except Exception as e: + logger.warning("slo3_calc_error", error=str(e)) + return SloMetric( + name="verifier_false_neg_rate", + value=None, threshold=SLO_FALSE_NEG_MAX, + direction="below", sample_count=0, violated=False, + ) + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_calculator: AiSloCalculator | None = None + + +def get_ai_slo_calculator() -> AiSloCalculator: + global _calculator + if _calculator is None: + _calculator = AiSloCalculator() + return _calculator diff --git a/apps/api/src/services/approval_db.py b/apps/api/src/services/approval_db.py index 677d2fea..14ec07dd 100644 --- a/apps/api/src/services/approval_db.py +++ b/apps/api/src/services/approval_db.py @@ -272,14 +272,21 @@ class ApprovalDBService: 查詢條件: 1. 相同指紋 - 2. 狀態為 PENDING,或 - 3. 在 debounce_minutes 分鐘內建立 + 2. 狀態為 PENDING 且在 24 小時內建立(超過 24h 的 PENDING 視為過期,不再收斂) + 3. 或在 debounce_minutes 分鐘內建立(不論狀態) + + ADR-073 補丁 2026-04-15 ogt + Claude Sonnet 4.6: + 原邏輯 PENDING 無 TTL → 3 天前 PENDING 記錄永久封鎖同指紋告警。 + 修正:PENDING 收斂窗口上限 PENDING_TTL_HOURS(24h)。 Returns: ApprovalRequest if found, None otherwise """ + PENDING_TTL_HOURS = 24 # PENDING 記錄最長收斂時效(超過則視為已過期) + now = datetime.now(UTC) cutoff_time = now - timedelta(minutes=debounce_minutes) + pending_cutoff = now - timedelta(hours=PENDING_TTL_HOURS) async with get_db_context() as db: result = await db.execute( @@ -287,7 +294,12 @@ class ApprovalDBService: .where(ApprovalRecord.fingerprint == fingerprint) .where( or_( - ApprovalRecord.status == ApprovalStatus.PENDING, + # PENDING 狀態但必須在 24h 內,防止老 PENDING 永久封鎖 + and_( + ApprovalRecord.status == ApprovalStatus.PENDING, + ApprovalRecord.created_at >= pending_cutoff, + ), + # 最近 debounce_minutes 分鐘內建立的任何記錄 ApprovalRecord.created_at >= cutoff_time, ) ) diff --git a/apps/api/src/services/decision_manager.py b/apps/api/src/services/decision_manager.py index 725a6daa..7df9bac1 100644 --- a/apps/api/src/services/decision_manager.py +++ b/apps/api/src/services/decision_manager.py @@ -1316,6 +1316,122 @@ class DecisionManager: """ action = token.proposal_data.get("kubectl_command", "") + # Phase 6 ADR-087: 自我降級守衛(AIOPS_P6_SELF_DEMOTION 控制) + # SLO 違反 → 全域信心閾值調高;連續違反 → 保守模式,所有自動執行降為人工 + # 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立 + try: + from src.core.feature_flags import aiops_flags as _p6_flags + if _p6_flags.is_sub_flag_enabled("AIOPS_P6_SELF_DEMOTION"): + from src.db.base import get_session_factory as _p6_sf + from src.db.models import AiGovernanceEvent as _GovernanceEvent + from sqlalchemy import select as _p6_select, func as _p6_func + from datetime import timedelta as _p6_td + + _now = __import__("src.utils.timezone", fromlist=["now_taipei"]).now_taipei() + + async with _p6_sf()() as _p6_sess: + # 過去 7 天有幾筆未解決的 slo_violation? + _viol_7d_q = await _p6_sess.execute( + _p6_select(_p6_func.count()).where( + _GovernanceEvent.event_type == "slo_violation", + _GovernanceEvent.resolved.is_(False), + _GovernanceEvent.triggered_at >= _now - _p6_td(days=7), + ) + ) + _viol_7d: int = _viol_7d_q.scalar() or 0 + + # 過去 14 天有幾筆未解決的 slo_violation? + _viol_14d_q = await _p6_sess.execute( + _p6_select(_p6_func.count()).where( + _GovernanceEvent.event_type == "slo_violation", + _GovernanceEvent.resolved.is_(False), + _GovernanceEvent.triggered_at >= _now - _p6_td(days=14), + ) + ) + _viol_14d: int = _viol_14d_q.scalar() or 0 + + if _viol_14d >= 2: + # 連續 2 週違反 → 保守模式:全部降為人工 + logger.warning( + "auto_execute_conservative_mode", + incident_id=incident.incident_id, + viol_14d=_viol_14d, + reason="Phase 6 保守模式:連續 SLO 違反,所有自動執行暫停", + ) + token.state = DecisionState.READY + token.proposal_data["decision_state"] = DecisionState.READY.value + token.proposal_data["auto_executed"] = False + token.proposal_data["p6_conservative_mode"] = True + token.proposal_data["p6_reason"] = f"SLO 連續違反 {_viol_14d}d,系統進入保守模式" + await self._save_token(token) + _fire_and_forget( + _push_decision_to_telegram(incident, token.proposal_data) + ) + # 記錄保守模式事件 + try: + from src.db.base import get_session_factory as _p6_sf2 + async with _p6_sf2()() as _s2: + _s2.add(_GovernanceEvent( + event_type="conservative_mode", + details={ + "incident_id": incident.incident_id, + "viol_14d": _viol_14d, + "triggered_at": _now.isoformat(), + }, + resolved=False, + )) + await _s2.commit() + except Exception: + pass + return + + elif _viol_7d >= 1: + # 近 7 天有違反 → 自我降級:信心閾值提高,記錄 demotion 事件 + _confidence = float(token.proposal_data.get("confidence", 0.0)) + _raised_threshold = 0.75 # 原 0.70 → 調高 0.05 + if _confidence < _raised_threshold: + logger.warning( + "auto_execute_self_demoted", + incident_id=incident.incident_id, + confidence=_confidence, + raised_threshold=_raised_threshold, + reason="Phase 6 自我降級:近 7d SLO 違反,信心閾值提高", + ) + token.state = DecisionState.READY + token.proposal_data["decision_state"] = DecisionState.READY.value + token.proposal_data["auto_executed"] = False + token.proposal_data["p6_self_demoted"] = True + token.proposal_data["p6_reason"] = ( + f"Phase 6 自我降級:近 7d SLO 違反," + f"信心 {_confidence:.2f} < {_raised_threshold},升為人工" + ) + await self._save_token(token) + _fire_and_forget( + _push_decision_to_telegram(incident, token.proposal_data) + ) + try: + from src.db.base import get_session_factory as _p6_sf3 + async with _p6_sf3()() as _s3: + _s3.add(_GovernanceEvent( + event_type="self_demotion", + details={ + "incident_id": incident.incident_id, + "confidence": _confidence, + "raised_threshold": _raised_threshold, + "viol_7d": _viol_7d, + "triggered_at": _now.isoformat(), + }, + resolved=False, + )) + await _s3.commit() + except Exception: + pass + return + # confidence >= raised_threshold → 允許繼續自動執行 + except Exception as _p6_err: + logger.warning("p6_self_demotion_check_error", error=str(_p6_err)) + # 保守:P6 check 出錯 → 不阻擋(避免因 P6 bug 把所有修復都堵住) + # ADR-073 Phase 3-5: action | parse fix (2026-04-12 ogt) # LLM 有時輸出 "kubectl rollout restart X | kubectl get pods -n Y" # | 後面是查詢指令,取第一個才是真正的修復操作 diff --git a/apps/api/src/services/trust_drift_detector.py b/apps/api/src/services/trust_drift_detector.py new file mode 100644 index 00000000..efac1361 --- /dev/null +++ b/apps/api/src/services/trust_drift_detector.py @@ -0,0 +1,243 @@ +""" +AWOOOI AIOps Phase 6 — Trust Drift Detector(信任度漂移偵測器) +=============================================================== +職責:偵測 Playbook trust_score 分布的兩種極端偏態: + + 極端 A「盲目樂觀」:> 70% Playbook trust_score > 0.9 + → 可能是 PostExecutionVerifier 失效,或 RAG 資料被污染,讓所有 AI 都以為「我很棒」 + → 真正的好系統不會所有 Playbook 都高分 + + 極端 B「學習鎖死」:> 70% Playbook trust_score < 0.3 + → 可能是 EWMA 計算出錯,或所有執行都被誤判失敗,讓 AI 對自己完全沒信心 + → 學習機制可能卡死 + +設計原則: +1. 只讀 DB,不修改任何數據 +2. 違反 → 寫 trust_drift 事件到 ai_governance_events +3. 樣本不足(< 10 個 approved Playbook)→ 跳過偵測,不告警 + +ADR-087: AI 自我治理閉環 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立 +""" + +from __future__ import annotations + +from dataclasses import dataclass + +import structlog +from sqlalchemy import func, select + +from src.db.base import get_session_factory +from src.db.models import AiGovernanceEvent, PlaybookRecord +from src.utils.timezone import now_taipei + +logger = structlog.get_logger(__name__) + +# ───────────────────────────────────────────────────────────────────────────── +# 偵測閾值(MASTER §3.6,修改需 ADR-087 更新) +# ───────────────────────────────────────────────────────────────────────────── + +DRIFT_HIGH_THRESHOLD: float = 0.9 # trust_score > 此值算「過高」 +DRIFT_LOW_THRESHOLD: float = 0.3 # trust_score < 此值算「過低」 +DRIFT_RATIO_TRIGGER: float = 0.70 # 超過 70% Playbook 落在極端 → 觸發警報 +DRIFT_MIN_SAMPLES: int = 10 # 最少 approved Playbook 數量 + + +# ───────────────────────────────────────────────────────────────────────────── +# Data Types +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class TrustDistribution: + """Playbook 信任度分布快照""" + total: int + high_count: int # trust_score > 0.9 + low_count: int # trust_score < 0.3 + mid_count: int # 0.3 <= trust_score <= 0.9(正常區間) + high_ratio: float + low_ratio: float + mean_trust: float + drift_type: str | None # "optimism_bias" / "confidence_collapse" / None + drift_detected: bool + + def to_dict(self) -> dict: + return { + "total": self.total, + "high_count": self.high_count, + "low_count": self.low_count, + "mid_count": self.mid_count, + "high_ratio": round(self.high_ratio, 4), + "low_ratio": round(self.low_ratio, 4), + "mean_trust": round(self.mean_trust, 4), + "drift_type": self.drift_type, + "drift_detected": self.drift_detected, + "thresholds": { + "high": DRIFT_HIGH_THRESHOLD, + "low": DRIFT_LOW_THRESHOLD, + "ratio_trigger": DRIFT_RATIO_TRIGGER, + "min_samples": DRIFT_MIN_SAMPLES, + }, + } + + +# ───────────────────────────────────────────────────────────────────────────── +# Main Service +# ───────────────────────────────────────────────────────────────────────────── + +class TrustDriftDetector: + """ + 信任度漂移偵測器 + + Usage: + detector = TrustDriftDetector() + dist = await detector.detect() + if dist.drift_detected: + await detector.save_drift_event(dist) + """ + + async def detect(self) -> TrustDistribution: + """ + 讀取所有 approved Playbook,計算信任度分布,偵測漂移。 + + Returns: + TrustDistribution(樣本不足時 drift_detected=False) + """ + try: + async with get_session_factory()() as session: + # 只計算 approved 狀態的 Playbook + total_q = await session.execute( + select(func.count()).where( + PlaybookRecord.status == "approved" + ) + ) + total: int = total_q.scalar() or 0 + + if total < DRIFT_MIN_SAMPLES: + logger.info( + "trust_drift_skip_insufficient_samples", + total=total, + required=DRIFT_MIN_SAMPLES, + ) + return TrustDistribution( + total=total, + high_count=0, low_count=0, mid_count=0, + high_ratio=0.0, low_ratio=0.0, mean_trust=0.0, + drift_type=None, drift_detected=False, + ) + + high_q = await session.execute( + select(func.count()).where( + PlaybookRecord.status == "approved", + PlaybookRecord.trust_score > DRIFT_HIGH_THRESHOLD, + ) + ) + high_count: int = high_q.scalar() or 0 + + low_q = await session.execute( + select(func.count()).where( + PlaybookRecord.status == "approved", + PlaybookRecord.trust_score < DRIFT_LOW_THRESHOLD, + ) + ) + low_count: int = low_q.scalar() or 0 + + mean_q = await session.execute( + select(func.avg(PlaybookRecord.trust_score)).where( + PlaybookRecord.status == "approved" + ) + ) + mean_trust: float = float(mean_q.scalar() or 0.0) + + mid_count = total - high_count - low_count + high_ratio = high_count / total + low_ratio = low_count / total + + # 偵測漂移類型 + drift_type = None + if high_ratio >= DRIFT_RATIO_TRIGGER: + drift_type = "optimism_bias" # 所有 Playbook 都覺得自己很好 → 可疑 + elif low_ratio >= DRIFT_RATIO_TRIGGER: + drift_type = "confidence_collapse" # AI 對自己完全沒信心 → 學習卡死 + + dist = TrustDistribution( + total=total, + high_count=high_count, + low_count=low_count, + mid_count=mid_count, + high_ratio=high_ratio, + low_ratio=low_ratio, + mean_trust=mean_trust, + drift_type=drift_type, + drift_detected=drift_type is not None, + ) + + if dist.drift_detected: + logger.warning( + "trust_drift_detected", + drift_type=drift_type, + high_ratio=round(high_ratio, 3), + low_ratio=round(low_ratio, 3), + mean_trust=round(mean_trust, 3), + total=total, + ) + else: + logger.info( + "trust_drift_ok", + mean_trust=round(mean_trust, 3), + total=total, + high_ratio=round(high_ratio, 3), + ) + + return dist + + except Exception as e: + logger.error("trust_drift_detect_error", error=str(e)) + # 保守:偵測失敗 → 不告警(不知道比亂告警好) + return TrustDistribution( + total=0, + high_count=0, low_count=0, mid_count=0, + high_ratio=0.0, low_ratio=0.0, mean_trust=0.0, + drift_type=None, drift_detected=False, + ) + + async def save_drift_event(self, dist: TrustDistribution) -> None: + """將信任度漂移事件寫入 ai_governance_events。""" + try: + async with get_session_factory()() as session: + event = AiGovernanceEvent( + event_type="trust_drift", + details={ + **dist.to_dict(), + "detected_at": now_taipei().isoformat(), + }, + resolved=False, + ) + session.add(event) + await session.commit() + logger.warning( + "trust_drift_event_saved", + drift_type=dist.drift_type, + ) + except Exception as e: + logger.error("trust_drift_event_save_error", error=str(e)) + + async def run(self) -> TrustDistribution: + """完整執行:偵測 → 如有漂移則寫事件。""" + dist = await self.detect() + if dist.drift_detected: + await self.save_drift_event(dist) + return dist + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_detector: TrustDriftDetector | None = None + + +def get_trust_drift_detector() -> TrustDriftDetector: + global _detector + if _detector is None: + _detector = TrustDriftDetector() + return _detector