fix(alerts): PENDING 收斂無 TTL → 老記錄永久封鎖 Telegram 告警
Some checks are pending
CD Pipeline / build-and-deploy (push) Has started running
Some checks are pending
CD Pipeline / build-and-deploy (push) Has started running
根因:find_by_fingerprint 的 PENDING 匹配條件無時間上限, 2026-04-12 建立的 3 筆 PENDING approval records(hit=77/30/17) 持續吃掉所有同指紋告警,造成 2+ 小時 Telegram 靜音。 修正(approval_db.py): - PENDING_TTL_HOURS = 24:PENDING 記錄逾 24h 不再收斂新告警 - 原本:OR(status=PENDING, created_at>=30min前) - 修正:OR(PENDING AND created_at>=24h前, created_at>=30min前) 緊急修復:kubectl exec 直接將 7 筆過期 PENDING 記錄設為 expired, 即時恢復 Telegram 告警流(不等部署)。 Phase 6 AI 自我治理閉環(ADR-087): - feat(db): 新增 ai_governance_events 表 + 3 個 index(base.py + models.py) - feat(svc): ai_slo_calculator.py — 7d 滾動 SLO(success/override/false_neg) - feat(svc): trust_drift_detector.py — Playbook 信任度極端偏態偵測 - feat(job): kb_rot_cleaner.py — K8s API/Prom metric/老舊 incident_case 腐爛清理 - feat(svc): decision_manager.py — 自我降級守衛(SLO 違反 → 提高門檻/保守模式) 2026-04-15 ogt + Claude Sonnet 4.6(亞太) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -220,6 +220,20 @@ async def init_db() -> None:
|
||||
""")
|
||||
)
|
||||
|
||||
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 自我治理閉環
|
||||
# ADR-087: ai_governance_events 不可變 Event Sourcing 表
|
||||
# create_all 已建表,此處補 INDEX(部分環境 create 不跑 Index)
|
||||
await conn.execute(
|
||||
text("""
|
||||
CREATE INDEX IF NOT EXISTS ix_ai_governance_event_type
|
||||
ON ai_governance_events (event_type);
|
||||
CREATE INDEX IF NOT EXISTS ix_ai_governance_triggered_at
|
||||
ON ai_governance_events (triggered_at);
|
||||
CREATE INDEX IF NOT EXISTS ix_ai_governance_resolved
|
||||
ON ai_governance_events (resolved);
|
||||
""")
|
||||
)
|
||||
|
||||
|
||||
async def close_db() -> None:
|
||||
"""
|
||||
|
||||
@@ -1116,3 +1116,62 @@ class AgentSession(Base):
|
||||
# 查詢某 session 中特定 role 的 turn(Coordinator 聚合時常用)
|
||||
Index("ix_agent_sessions_session_role", "session_id", "agent_role"),
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# AiGovernanceEvent — Phase 6 自我治理事件溯源(不可刪除)
|
||||
# ADR-087: AI 自我治理閉環:SLO 違反 / 信任漂移 / KB 腐爛 / 自我降級
|
||||
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立
|
||||
#
|
||||
# 核心鐵律:
|
||||
# - 不可變 Event Sourcing — 只 INSERT,禁止 UPDATE/DELETE
|
||||
# - 所有治理事件必須落地 PG,SLO dashboard 依賴此表
|
||||
# - resolved=True 僅由人工或下次計算時補填,不可自動翻轉未解決項目
|
||||
# =============================================================================
|
||||
|
||||
class AiGovernanceEvent(Base):
    """
    Record of one AI self-governance event (ADR-087, Phase 6).

    Known ``event_type`` values:
        slo_violation     - an SLO calculation breached its threshold
        trust_drift       - playbook trust scores are skewed (all high or all low)
        kb_stale          - a KB entry references a deprecated K8s API / Prometheus query
        self_demotion     - the confidence threshold was raised automatically
        conservative_mode - repeated SLO violations switched the system to conservative mode
        replay_degraded   - offline replay consistency keeps dropping

    Rows are intended to be append-only (INSERT only, no UPDATE / DELETE).
    NOTE(review): the column comments on ``resolved`` / ``resolved_at`` say
    those two fields are back-filled later - confirm how that is reconciled
    with the append-only rule.
    """

    __tablename__ = "ai_governance_events"

    # Primary key (UUID string).
    id: Mapped[str] = mapped_column(
        String(36),
        primary_key=True,
        default=generate_uuid,
        comment="主鍵(UUID)",
    )
    # Event category; see the class docstring for the known values.
    event_type: Mapped[str] = mapped_column(
        String(40),
        nullable=False,
        comment="slo_violation / trust_drift / kb_stale / self_demotion / conservative_mode / replay_degraded",
    )
    # When the event fired; defaults to taipei_now() at insert time.
    triggered_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        default=taipei_now,
        comment="事件觸發時間(台北時區)",
    )
    # Free-form event payload (SLO values, drift distribution, ...).
    details: Mapped[dict] = mapped_column(
        JSON,
        nullable=False,
        default=dict,
        comment="事件詳情 JSONB(SLO 數值、漂移分布等)",
    )
    # Whether the event has been resolved.
    resolved: Mapped[bool] = mapped_column(
        nullable=False,
        default=False,
        comment="是否已解決(人工確認或下次計算恢復正常後補填)",
    )
    # When the event was resolved; NULL while unresolved.
    resolved_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
        comment="解決時間(僅人工/系統補填,不得自動反轉未解決項目)",
    )

    __table_args__ = (
        Index("ix_ai_governance_event_type", "event_type"),
        Index("ix_ai_governance_triggered_at", "triggered_at"),
        Index("ix_ai_governance_resolved", "resolved"),
    )
|
||||
|
||||
247
apps/api/src/jobs/kb_rot_cleaner.py
Normal file
247
apps/api/src/jobs/kb_rot_cleaner.py
Normal file
@@ -0,0 +1,247 @@
|
||||
"""
|
||||
AWOOOI AIOps Phase 6 — KB 腐爛清理 Job
|
||||
=======================================
|
||||
職責:月度巡檢知識庫(knowledge_entries)中腐爛的知識條目,
|
||||
標記引用了已廢棄資源的條目為 stale,並寫入 ai_governance_events。
|
||||
|
||||
「腐爛」的三種形態:
|
||||
ROT-1 廢棄 K8s API 版本引用(extensions/v1beta1、apps/v1beta1、v1beta2)
|
||||
ROT-2 過時 Prometheus query pattern(已知廢棄 metric 名稱前綴)
|
||||
ROT-3 超過 180 天未被引用且成功率為 0 的 incident_case 條目
|
||||
|
||||
設計原則:
|
||||
1. 只讀掃描 + 標記(不刪除任何 entry,符合 archive_not_delete 鐵律)
|
||||
2. 標記方式:status = 'archived' + tags 追加 'kb_rot_detected'
|
||||
3. 掃描失敗 → 記錄 error,不拋出,不影響主路徑
|
||||
4. 每次執行結果寫 ai_governance_events(event_type=kb_stale)
|
||||
|
||||
ADR-087: AI 自我治理閉環
|
||||
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import timedelta
|
||||
|
||||
import structlog
|
||||
from sqlalchemy import select, update
|
||||
|
||||
from src.db.base import get_session_factory
|
||||
from src.db.models import AiGovernanceEvent, KnowledgeEntryRecord
|
||||
from src.utils.timezone import now_taipei
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# 腐爛偵測規則(不可寫死 action,只標記 stale)
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
# ROT-1: deprecated Kubernetes API group/versions. All of these have been
# removed upstream (the workload APIs in 1.16, the others in later releases
# such as 1.22 and 1.25), so a KB entry that still cites them is outdated.
DEPRECATED_K8S_APIS = [
    "extensions/v1beta1",
    "apps/v1beta1",
    "apps/v1beta2",
    "networking.k8s.io/v1beta1",
    "policy/v1beta1",
    "rbac.authorization.k8s.io/v1beta1",
]

# ROT-2: deprecated Prometheus metric patterns (metrics known to have been
# renamed). Patterns are applied with re.search against lower-cased text.
DEPRECATED_PROM_PATTERNS = [
    r"container_cpu_used_total",  # -> container_cpu_usage_seconds_total
    # `\b` (not `$`): without re.MULTILINE, `$` only matches at the very end of
    # the scanned text, so the old metric was missed anywhere else. `\b` still
    # rejects the current name because `_` in `_total` is a word character.
    r"kube_pod_container_status_restarts\b",  # -> kube_pod_container_status_restarts_total
    r"http_requests_total\{.*le=",  # histogram-style `le` label on a counter
]

# ROT-3: age (in days) after which an unreferenced entry is considered stale.
STALE_AGE_DAYS = 180
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Data Types
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
class RotScanResult:
    """Outcome of one KB rot scan."""

    total_scanned: int
    stale_ids: list[str] = field(default_factory=list)
    # Maps entry id -> reasons, e.g. {entry_id: ["ROT-1: extensions/v1beta1", ...]}
    rot_reasons: dict[str, list[str]] = field(default_factory=dict)
    # ISO-8601 timestamp taken from now_taipei() when the result is created.
    scanned_at: str = field(default_factory=lambda: now_taipei().isoformat())

    @property
    def stale_count(self) -> int:
        """Number of entries flagged as stale."""
        return len(self.stale_ids)

    def to_dict(self) -> dict:
        """Return a size-capped dict form of the result.

        Only the first 50 stale ids and the first 10 ``rot_reasons`` items are
        included.
        """
        id_sample = self.stale_ids[:50]
        reasons_sample = dict(list(self.rot_reasons.items())[:10])
        payload = {
            "total_scanned": self.total_scanned,
            "stale_count": self.stale_count,
            "stale_ids": id_sample,
            "rot_reasons_sample": reasons_sample,
            "scanned_at": self.scanned_at,
        }
        return payload
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Main Job
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class KbRotCleaner:
    """
    KB rot-cleaning job (intended to run monthly).

    Usage:
        cleaner = KbRotCleaner()
        result = await cleaner.run()
    """

    async def run(self) -> RotScanResult:
        """
        Run the full pipeline: scan -> archive stale entries -> write governance event.

        The job is best-effort: when the feature flag is off, or when any step
        raises, an empty result (``total_scanned=0``) is returned instead of
        propagating the error.

        Returns:
            RotScanResult
        """
        from src.core.feature_flags import aiops_flags

        if not aiops_flags.is_sub_flag_enabled("AIOPS_P6_KB_ROT_CLEANER"):
            logger.info("kb_rot_cleaner_skipped_feature_flag")
            return RotScanResult(total_scanned=0)

        try:
            result = await self._scan()
            if result.stale_count > 0:
                await self._mark_stale(result)
                await self._save_event(result)
            else:
                logger.info("kb_rot_scan_clean", total_scanned=result.total_scanned)
            return result
        except Exception as e:
            # Job boundary: never raise into the scheduler, but keep the
            # traceback so the failure is diagnosable.
            logger.error("kb_rot_cleaner_error", error=str(e), exc_info=True)
            return RotScanResult(total_scanned=0)

    @staticmethod
    def _rot_reasons(entry, stale_cutoff, prom_rules) -> list[str]:
        """
        Evaluate the ROT-1/2/3 rules against one entry.

        Args:
            entry: a record exposing ``content``, ``title``, ``entry_type``,
                ``updated_at`` and ``view_count``.
            stale_cutoff: entries last updated before this instant count as old.
            prom_rules: ``(pattern_source, compiled_pattern)`` pairs for ROT-2.

        Returns:
            Human-readable reasons; empty when the entry shows no rot.
        """
        reasons: list[str] = []

        # Matching is case-insensitive: text is lower-cased once here.
        combined = (entry.content or "").lower() + " " + (entry.title or "").lower()

        # ROT-1: deprecated K8s API versions
        for api in DEPRECATED_K8S_APIS:
            if api.lower() in combined:
                reasons.append(f"ROT-1: 廢棄 K8s API {api}")

        # ROT-2: deprecated Prometheus patterns
        for pattern, compiled in prom_rules:
            if compiled.search(combined):
                reasons.append(f"ROT-2: 廢棄 Prom metric pattern {pattern[:40]}")

        # ROT-3: old, never-viewed incident_case entries.
        # A missing updated_at is skipped: comparing None with a datetime would
        # raise and abort the scan for every entry.
        # NOTE(review): assumes updated_at is timezone-aware like now_taipei() - confirm.
        updated_at = entry.updated_at
        if (
            entry.entry_type == "incident_case"
            and updated_at is not None
            and updated_at < stale_cutoff
            and entry.view_count == 0
        ):
            reasons.append(
                f"ROT-3: 超過 {STALE_AGE_DAYS}d 未引用 "
                f"(last_updated={updated_at.strftime('%Y-%m-%d')})"
            )

        return reasons

    async def _scan(self) -> RotScanResult:
        """Scan all approved / draft / review entries and collect the rotten ones."""
        stale_ids: list[str] = []
        rot_reasons: dict[str, list[str]] = {}

        async with get_session_factory()() as session:
            # Only non-archived statuses are scanned.
            q = await session.execute(
                select(KnowledgeEntryRecord).where(
                    KnowledgeEntryRecord.status.in_(["approved", "draft", "review"])
                )
            )
            entries = q.scalars().all()
            total = len(entries)

            stale_cutoff = now_taipei() - timedelta(days=STALE_AGE_DAYS)
            # Compile each pattern once instead of once per entry.
            prom_rules = [(p, re.compile(p)) for p in DEPRECATED_PROM_PATTERNS]

            for entry in entries:
                reasons = self._rot_reasons(entry, stale_cutoff, prom_rules)
                if reasons:
                    stale_ids.append(entry.id)
                    rot_reasons[entry.id] = reasons

        logger.info(
            "kb_rot_scan_complete",
            total=total,
            stale_count=len(stale_ids),
        )
        return RotScanResult(
            total_scanned=total,
            stale_ids=stale_ids,
            rot_reasons=rot_reasons,
        )

    async def _mark_stale(self, result: RotScanResult) -> None:
        """
        Set the rotten entries to ``archived`` and add the ``kb_rot_detected`` tag.

        Entries are archived, never deleted.
        """
        if not result.stale_ids:
            return

        async with get_session_factory()() as session:
            # Update row by row so each entry's existing tags are preserved.
            q = await session.execute(
                select(KnowledgeEntryRecord).where(
                    KnowledgeEntryRecord.id.in_(result.stale_ids)
                )
            )
            entries = q.scalars().all()

            for entry in entries:
                entry.status = "archived"
                # Assign a fresh list rather than mutating the existing one.
                tags = list(entry.tags or [])
                if "kb_rot_detected" not in tags:
                    tags.append("kb_rot_detected")
                entry.tags = tags

            await session.commit()

        logger.warning(
            "kb_rot_entries_archived",
            count=len(result.stale_ids),
            entry_ids=result.stale_ids[:10],
        )

    async def _save_event(self, result: RotScanResult) -> None:
        """Write a ``kb_stale`` event to ai_governance_events (errors are logged, not raised)."""
        try:
            async with get_session_factory()() as session:
                event = AiGovernanceEvent(
                    event_type="kb_stale",
                    details=result.to_dict(),
                    resolved=False,
                )
                session.add(event)
                await session.commit()
            logger.info("kb_rot_event_saved", stale_count=result.stale_count)
        except Exception as e:
            logger.error("kb_rot_event_save_error", error=str(e), exc_info=True)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Singleton
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
_cleaner: KbRotCleaner | None = None


def get_kb_rot_cleaner() -> KbRotCleaner:
    """Return the module-wide KbRotCleaner, creating it on first use."""
    global _cleaner
    if _cleaner is not None:
        return _cleaner
    _cleaner = KbRotCleaner()
    return _cleaner
|
||||
418
apps/api/src/services/ai_slo_calculator.py
Normal file
418
apps/api/src/services/ai_slo_calculator.py
Normal file
@@ -0,0 +1,418 @@
|
||||
"""
|
||||
AWOOOI AIOps Phase 6 — AI SLO 計算器(決策品質自我監控)
|
||||
=========================================================
|
||||
職責:滾動計算三大 AI 決策品質 SLO;違反閾值時寫入 ai_governance_events,
|
||||
供 decision_manager 自我降級邏輯讀取。
|
||||
|
||||
三大 SLO(MASTER §3.6 ADR-087):
|
||||
SLO-1 auto_execute_success_rate > 85% (7d 滾動)
|
||||
SLO-2 human_override_rate < 20% (7d 滾動)
|
||||
SLO-3 verifier_false_neg_rate < 5% (7d 滾動,proxy: 2h 內重複告警)
|
||||
|
||||
設計原則:
|
||||
1. 純讀 + 純寫分離 — calculate() 只讀 DB,save_event() 只寫 DB
|
||||
2. 計算失敗 → 保守:假設 SLO 違反,寫 violation 事件
|
||||
3. 所有結果快取 Redis(key: ai:slo:latest, TTL 5min),避免高頻查 DB
|
||||
4. 不自動解決舊 violation — resolved 只能人工或下次「全部通過」時補填
|
||||
|
||||
ADR-087: AI 自我治理閉環
|
||||
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import timedelta
|
||||
|
||||
import structlog
|
||||
from sqlalchemy import func, select, text
|
||||
|
||||
from src.db.base import get_session_factory
|
||||
from src.db.models import AiGovernanceEvent, AutoRepairExecution, ApprovalRecord
|
||||
from src.utils.timezone import now_taipei
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# SLO 閾值(MASTER §3.6 鐵律,修改前需 ADR-087 更新)
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
# Thresholds for the three SLOs.
SLO_AUTO_SUCCESS_MIN: float = 0.85  # lower bound for the auto_execute success rate
SLO_OVERRIDE_RATE_MAX: float = 0.20  # upper bound for the human override rate
SLO_FALSE_NEG_MAX: float = 0.05  # upper bound for the verifier false-negative rate

# Sampling window.
SLO_WINDOW_DAYS: int = 7  # rolling window, in days
SLO_MIN_SAMPLES: int = 5  # below this sample count a metric is not computed

# Redis cache for the latest report.
REDIS_KEY = "ai:slo:latest"
REDIS_TTL_SEC = 300  # 5-minute cache
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Data Types
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
class SloMetric:
    """A single SLO indicator."""

    name: str
    value: float | None  # None = not enough samples, metric skipped
    threshold: float
    direction: str  # "above" = must exceed threshold / "below" = must stay under it
    sample_count: int
    violated: bool  # whether the SLO is breached

    @property
    def label(self) -> str:
        """One-line human-readable summary of the metric."""
        if self.value is not None:
            comparator = "<" if self.direction != "above" else ">"
            verdict = "✅ 合規" if not self.violated else "❌ 違反"
            return (
                f"{self.name}: {self.value:.1%} "
                f"(需 {comparator}{self.threshold:.0%}) {verdict}"
            )
        return f"{self.name}: N/A(樣本 {self.sample_count} < {SLO_MIN_SAMPLES})"
|
||||
|
||||
|
||||
@dataclass
class SloReport:
    """Full result of one SLO calculation run."""

    metrics: list[SloMetric] = field(default_factory=list)
    any_violated: bool = False
    # ISO-8601 timestamp taken from now_taipei() when the report is created.
    calculated_at: str = field(default_factory=lambda: now_taipei().isoformat())
    window_days: int = SLO_WINDOW_DAYS

    def to_dict(self) -> dict:
        """Return a plain-dict form of the report, including each metric's label."""
        serialized_metrics = []
        for metric in self.metrics:
            serialized_metrics.append(
                {
                    "name": metric.name,
                    "value": metric.value,
                    "threshold": metric.threshold,
                    "direction": metric.direction,
                    "sample_count": metric.sample_count,
                    "violated": metric.violated,
                    "label": metric.label,
                }
            )
        return {
            "calculated_at": self.calculated_at,
            "window_days": self.window_days,
            "any_violated": self.any_violated,
            "metrics": serialized_metrics,
        }
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Main Service
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class AiSloCalculator:
    """
    AI decision-quality SLO calculator.

    Usage:
        calc = AiSloCalculator()
        report = await calc.calculate()
        if report.any_violated:
            await calc.save_violation_event(report)
    """

    async def calculate(self) -> SloReport:
        """
        Compute the three SLO metrics over the rolling window.

        Returns:
            SloReport. If the calculation as a whole fails (e.g. no session can
            be opened) a conservative report with ``any_violated=True`` is
            returned.

        NOTE(review): the per-metric helpers catch their own errors and report
        ``violated=False``, so a failing query does NOT reach the conservative
        branch below - confirm this is the intended policy.
        """
        try:
            since = now_taipei() - timedelta(days=SLO_WINDOW_DAYS)

            async with get_session_factory()() as session:
                slo1 = await self._calc_auto_success_rate(session, since)
                slo2 = await self._calc_human_override_rate(session, since)
                slo3 = await self._calc_false_neg_rate(session, since)

            metrics = [slo1, slo2, slo3]
            any_violated = any(m.violated for m in metrics)

            report = SloReport(
                metrics=metrics,
                any_violated=any_violated,
            )

            logger.info(
                "slo_calculated",
                any_violated=any_violated,
                slo1=slo1.value,
                slo2=slo2.value,
                slo3=slo3.value,
            )
            return report

        except Exception as e:
            logger.error("slo_calculation_error", error=str(e), exc_info=True)
            # Conservative: a failed calculation is treated as a violation.
            violated_metric = SloMetric(
                name="calculation_error",
                value=None,
                threshold=0.0,
                direction="above",
                sample_count=0,
                violated=True,
            )
            return SloReport(
                metrics=[violated_metric],
                any_violated=True,
            )

    async def get_cached_report(self) -> SloReport | None:
        """Read the latest SLO report from Redis; return None on miss or error."""
        try:
            from src.core.redis_client import get_redis

            redis = get_redis()
            raw = await redis.get(REDIS_KEY)
            if raw:
                data = json.loads(raw)
                metrics = [
                    SloMetric(
                        name=m["name"],
                        value=m["value"],
                        threshold=m["threshold"],
                        direction=m["direction"],
                        sample_count=m["sample_count"],
                        violated=m["violated"],
                    )
                    for m in data.get("metrics", [])
                ]
                return SloReport(
                    metrics=metrics,
                    any_violated=data.get("any_violated", False),
                    calculated_at=data.get("calculated_at", ""),
                    window_days=data.get("window_days", SLO_WINDOW_DAYS),
                )
        except Exception as e:
            logger.warning("slo_cache_read_error", error=str(e))
        return None

    async def cache_report(self, report: SloReport) -> None:
        """Store the report in Redis with a TTL of REDIS_TTL_SEC (errors are logged only)."""
        try:
            from src.core.redis_client import get_redis

            redis = get_redis()
            await redis.set(REDIS_KEY, json.dumps(report.to_dict()), ex=REDIS_TTL_SEC)
        except Exception as e:
            logger.warning("slo_cache_write_error", error=str(e))

    async def save_violation_event(self, report: SloReport) -> None:
        """
        Write an ``slo_violation`` row to ai_governance_events.

        Intended to be called only when ``report.any_violated`` is true.
        Earlier violation rows are left untouched. Errors are logged, not raised.
        """
        try:
            async with get_session_factory()() as session:
                event = AiGovernanceEvent(
                    event_type="slo_violation",
                    details=report.to_dict(),
                    resolved=False,
                )
                session.add(event)
                await session.commit()
            logger.warning(
                "slo_violation_recorded",
                violated_metrics=[m.name for m in report.metrics if m.violated],
            )
        except Exception as e:
            logger.error("slo_violation_save_error", error=str(e), exc_info=True)

    async def run(self) -> SloReport:
        """
        Full run: calculate -> cache -> write an event if anything is violated.

        Returns:
            SloReport
        """
        report = await self.calculate()
        await self.cache_report(report)
        if report.any_violated:
            await self.save_violation_event(report)
        return report

    # ──────────────────────────────────────────────────────────────────────────
    # Private: SLO calculation helpers
    # ──────────────────────────────────────────────────────────────────────────

    @staticmethod
    def _no_value_metric(
        name: str, threshold: float, direction: str, sample_count: int
    ) -> SloMetric:
        """Build a non-violating metric with no value (too few samples or a failed query)."""
        return SloMetric(
            name=name,
            value=None,
            threshold=threshold,
            direction=direction,
            sample_count=sample_count,
            violated=False,
        )

    @staticmethod
    async def _reset_session(session) -> None:
        """Roll the session back after a failed query.

        The three metrics share one session; a failed statement can leave its
        transaction aborted (PostgreSQL rejects further statements until a
        rollback), which would make every later metric fail as well.
        """
        try:
            await session.rollback()
        except Exception as rollback_err:
            logger.warning("slo_session_rollback_error", error=str(rollback_err))

    async def _calc_auto_success_rate(self, session, since) -> SloMetric:
        """SLO-1: share of auto_repair_executions since ``since`` with success = TRUE."""
        try:
            total_q = await session.execute(
                select(func.count()).where(
                    AutoRepairExecution.created_at >= since
                )
            )
            total: int = total_q.scalar() or 0

            if total < SLO_MIN_SAMPLES:
                return self._no_value_metric(
                    "auto_execute_success_rate", SLO_AUTO_SUCCESS_MIN, "above", total
                )

            success_q = await session.execute(
                select(func.count()).where(
                    AutoRepairExecution.created_at >= since,
                    AutoRepairExecution.success.is_(True),
                )
            )
            success: int = success_q.scalar() or 0
            rate = success / total

            return SloMetric(
                name="auto_execute_success_rate",
                value=rate,
                threshold=SLO_AUTO_SUCCESS_MIN,
                direction="above",
                sample_count=total,
                violated=rate < SLO_AUTO_SUCCESS_MIN,
            )
        except Exception as e:
            logger.warning("slo1_calc_error", error=str(e))
            await self._reset_session(session)
            return self._no_value_metric(
                "auto_execute_success_rate", SLO_AUTO_SUCCESS_MIN, "above", 0
            )

    async def _calc_human_override_rate(self, session, since) -> SloMetric:
        """
        SLO-2: human override rate = rejected approval records / all approval records.

        Both counts cover every ApprovalRecord created since ``since``;
        "rejected" means ``status == "rejected"``.

        NOTE(review): the original description limited the denominator to
        AI-originated proposals (requested_by LIKE 'ai_%' or 'system'), but no
        such filter is applied here - confirm which one is intended.
        """
        try:
            ai_q = await session.execute(
                select(func.count()).where(
                    ApprovalRecord.created_at >= since,
                )
            )
            total: int = ai_q.scalar() or 0

            if total < SLO_MIN_SAMPLES:
                return self._no_value_metric(
                    "human_override_rate", SLO_OVERRIDE_RATE_MAX, "below", total
                )

            rejected_q = await session.execute(
                select(func.count()).where(
                    ApprovalRecord.created_at >= since,
                    ApprovalRecord.status == "rejected",
                )
            )
            rejected: int = rejected_q.scalar() or 0
            rate = rejected / total

            return SloMetric(
                name="human_override_rate",
                value=rate,
                threshold=SLO_OVERRIDE_RATE_MAX,
                direction="below",
                sample_count=total,
                violated=rate > SLO_OVERRIDE_RATE_MAX,
            )
        except Exception as e:
            logger.warning("slo2_calc_error", error=str(e))
            await self._reset_session(session)
            return self._no_value_metric(
                "human_override_rate", SLO_OVERRIDE_RATE_MAX, "below", 0
            )

    async def _calc_false_neg_rate(self, session, since) -> SloMetric:
        """
        SLO-3: verifier false-negative rate (proxy metric).

        A successful auto-repair run counts as a false negative when the same
        incident_id has a failed run within the following 2 hours ("fixed, then
        broke again").

        rate = relapsed successful runs / all successful runs since ``since``.
        Numerator and denominator both count runs, so repeated successful runs
        on one incident are weighted consistently.
        """
        try:
            result = await session.execute(
                text("""
                    WITH success_runs AS (
                        SELECT
                            s.incident_id,
                            s.created_at,
                            EXISTS (
                                SELECT 1
                                FROM auto_repair_executions f
                                WHERE f.incident_id = s.incident_id
                                  AND f.success = FALSE
                                  AND f.created_at > s.created_at
                                  AND f.created_at <= s.created_at + INTERVAL '2 hours'
                            ) AS relapsed
                        FROM auto_repair_executions s
                        WHERE s.success = TRUE
                          AND s.created_at >= :since
                    )
                    SELECT
                        COUNT(*) AS total_success,
                        COALESCE(SUM(CASE WHEN relapsed THEN 1 ELSE 0 END), 0) AS false_neg_count
                    FROM success_runs
                """),
                {"since": since},
            )
            row = result.fetchone()
            total_success: int = int(row[0] or 0) if row else 0
            false_neg: int = int(row[1] or 0) if row else 0

            if total_success < SLO_MIN_SAMPLES:
                return self._no_value_metric(
                    "verifier_false_neg_rate", SLO_FALSE_NEG_MAX, "below", total_success
                )

            rate = false_neg / total_success
            return SloMetric(
                name="verifier_false_neg_rate",
                value=rate,
                threshold=SLO_FALSE_NEG_MAX,
                direction="below",
                sample_count=total_success,
                violated=rate > SLO_FALSE_NEG_MAX,
            )
        except Exception as e:
            logger.warning("slo3_calc_error", error=str(e))
            await self._reset_session(session)
            return self._no_value_metric(
                "verifier_false_neg_rate", SLO_FALSE_NEG_MAX, "below", 0
            )
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Singleton
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
_calculator: AiSloCalculator | None = None


def get_ai_slo_calculator() -> AiSloCalculator:
    """Return the module-wide AiSloCalculator, creating it on first use."""
    global _calculator
    if _calculator is not None:
        return _calculator
    _calculator = AiSloCalculator()
    return _calculator
|
||||
@@ -272,14 +272,21 @@ class ApprovalDBService:
|
||||
|
||||
查詢條件:
|
||||
1. 相同指紋
|
||||
2. 狀態為 PENDING,或
|
||||
3. 在 debounce_minutes 分鐘內建立
|
||||
2. 狀態為 PENDING 且在 24 小時內建立(超過 24h 的 PENDING 視為過期,不再收斂)
|
||||
3. 或在 debounce_minutes 分鐘內建立(不論狀態)
|
||||
|
||||
ADR-073 補丁 2026-04-15 ogt + Claude Sonnet 4.6:
|
||||
原邏輯 PENDING 無 TTL → 3 天前 PENDING 記錄永久封鎖同指紋告警。
|
||||
修正:PENDING 收斂窗口上限 PENDING_TTL_HOURS(24h)。
|
||||
|
||||
Returns:
|
||||
ApprovalRequest if found, None otherwise
|
||||
"""
|
||||
PENDING_TTL_HOURS = 24 # PENDING 記錄最長收斂時效(超過則視為已過期)
|
||||
|
||||
now = datetime.now(UTC)
|
||||
cutoff_time = now - timedelta(minutes=debounce_minutes)
|
||||
pending_cutoff = now - timedelta(hours=PENDING_TTL_HOURS)
|
||||
|
||||
async with get_db_context() as db:
|
||||
result = await db.execute(
|
||||
@@ -287,7 +294,12 @@ class ApprovalDBService:
|
||||
.where(ApprovalRecord.fingerprint == fingerprint)
|
||||
.where(
|
||||
or_(
|
||||
ApprovalRecord.status == ApprovalStatus.PENDING,
|
||||
# PENDING 狀態但必須在 24h 內,防止老 PENDING 永久封鎖
|
||||
and_(
|
||||
ApprovalRecord.status == ApprovalStatus.PENDING,
|
||||
ApprovalRecord.created_at >= pending_cutoff,
|
||||
),
|
||||
# 最近 debounce_minutes 分鐘內建立的任何記錄
|
||||
ApprovalRecord.created_at >= cutoff_time,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -1316,6 +1316,122 @@ class DecisionManager:
|
||||
"""
|
||||
action = token.proposal_data.get("kubectl_command", "")
|
||||
|
||||
# Phase 6 ADR-087: 自我降級守衛(AIOPS_P6_SELF_DEMOTION 控制)
|
||||
# SLO 違反 → 全域信心閾值調高;連續違反 → 保守模式,所有自動執行降為人工
|
||||
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立
|
||||
try:
|
||||
from src.core.feature_flags import aiops_flags as _p6_flags
|
||||
if _p6_flags.is_sub_flag_enabled("AIOPS_P6_SELF_DEMOTION"):
|
||||
from src.db.base import get_session_factory as _p6_sf
|
||||
from src.db.models import AiGovernanceEvent as _GovernanceEvent
|
||||
from sqlalchemy import select as _p6_select, func as _p6_func
|
||||
from datetime import timedelta as _p6_td
|
||||
|
||||
_now = __import__("src.utils.timezone", fromlist=["now_taipei"]).now_taipei()
|
||||
|
||||
async with _p6_sf()() as _p6_sess:
|
||||
# 過去 7 天有幾筆未解決的 slo_violation?
|
||||
_viol_7d_q = await _p6_sess.execute(
|
||||
_p6_select(_p6_func.count()).where(
|
||||
_GovernanceEvent.event_type == "slo_violation",
|
||||
_GovernanceEvent.resolved.is_(False),
|
||||
_GovernanceEvent.triggered_at >= _now - _p6_td(days=7),
|
||||
)
|
||||
)
|
||||
_viol_7d: int = _viol_7d_q.scalar() or 0
|
||||
|
||||
# 過去 14 天有幾筆未解決的 slo_violation?
|
||||
_viol_14d_q = await _p6_sess.execute(
|
||||
_p6_select(_p6_func.count()).where(
|
||||
_GovernanceEvent.event_type == "slo_violation",
|
||||
_GovernanceEvent.resolved.is_(False),
|
||||
_GovernanceEvent.triggered_at >= _now - _p6_td(days=14),
|
||||
)
|
||||
)
|
||||
_viol_14d: int = _viol_14d_q.scalar() or 0
|
||||
|
||||
if _viol_14d >= 2:
|
||||
# 連續 2 週違反 → 保守模式:全部降為人工
|
||||
logger.warning(
|
||||
"auto_execute_conservative_mode",
|
||||
incident_id=incident.incident_id,
|
||||
viol_14d=_viol_14d,
|
||||
reason="Phase 6 保守模式:連續 SLO 違反,所有自動執行暫停",
|
||||
)
|
||||
token.state = DecisionState.READY
|
||||
token.proposal_data["decision_state"] = DecisionState.READY.value
|
||||
token.proposal_data["auto_executed"] = False
|
||||
token.proposal_data["p6_conservative_mode"] = True
|
||||
token.proposal_data["p6_reason"] = f"SLO 連續違反 {_viol_14d}d,系統進入保守模式"
|
||||
await self._save_token(token)
|
||||
_fire_and_forget(
|
||||
_push_decision_to_telegram(incident, token.proposal_data)
|
||||
)
|
||||
# 記錄保守模式事件
|
||||
try:
|
||||
from src.db.base import get_session_factory as _p6_sf2
|
||||
async with _p6_sf2()() as _s2:
|
||||
_s2.add(_GovernanceEvent(
|
||||
event_type="conservative_mode",
|
||||
details={
|
||||
"incident_id": incident.incident_id,
|
||||
"viol_14d": _viol_14d,
|
||||
"triggered_at": _now.isoformat(),
|
||||
},
|
||||
resolved=False,
|
||||
))
|
||||
await _s2.commit()
|
||||
except Exception:
|
||||
pass
|
||||
return
|
||||
|
||||
elif _viol_7d >= 1:
|
||||
# 近 7 天有違反 → 自我降級:信心閾值提高,記錄 demotion 事件
|
||||
_confidence = float(token.proposal_data.get("confidence", 0.0))
|
||||
_raised_threshold = 0.75 # 原 0.70 → 調高 0.05
|
||||
if _confidence < _raised_threshold:
|
||||
logger.warning(
|
||||
"auto_execute_self_demoted",
|
||||
incident_id=incident.incident_id,
|
||||
confidence=_confidence,
|
||||
raised_threshold=_raised_threshold,
|
||||
reason="Phase 6 自我降級:近 7d SLO 違反,信心閾值提高",
|
||||
)
|
||||
token.state = DecisionState.READY
|
||||
token.proposal_data["decision_state"] = DecisionState.READY.value
|
||||
token.proposal_data["auto_executed"] = False
|
||||
token.proposal_data["p6_self_demoted"] = True
|
||||
token.proposal_data["p6_reason"] = (
|
||||
f"Phase 6 自我降級:近 7d SLO 違反,"
|
||||
f"信心 {_confidence:.2f} < {_raised_threshold},升為人工"
|
||||
)
|
||||
await self._save_token(token)
|
||||
_fire_and_forget(
|
||||
_push_decision_to_telegram(incident, token.proposal_data)
|
||||
)
|
||||
try:
|
||||
from src.db.base import get_session_factory as _p6_sf3
|
||||
async with _p6_sf3()() as _s3:
|
||||
_s3.add(_GovernanceEvent(
|
||||
event_type="self_demotion",
|
||||
details={
|
||||
"incident_id": incident.incident_id,
|
||||
"confidence": _confidence,
|
||||
"raised_threshold": _raised_threshold,
|
||||
"viol_7d": _viol_7d,
|
||||
"triggered_at": _now.isoformat(),
|
||||
},
|
||||
resolved=False,
|
||||
))
|
||||
await _s3.commit()
|
||||
except Exception:
|
||||
pass
|
||||
return
|
||||
# confidence >= raised_threshold → 允許繼續自動執行
|
||||
except Exception as _p6_err:
|
||||
logger.warning("p6_self_demotion_check_error", error=str(_p6_err))
|
||||
# 保守:P6 check 出錯 → 不阻擋(避免因 P6 bug 把所有修復都堵住)
|
||||
|
||||
# ADR-073 Phase 3-5: action | parse fix (2026-04-12 ogt)
|
||||
# LLM 有時輸出 "kubectl rollout restart X | kubectl get pods -n Y"
|
||||
# | 後面是查詢指令,取第一個才是真正的修復操作
|
||||
|
||||
243
apps/api/src/services/trust_drift_detector.py
Normal file
243
apps/api/src/services/trust_drift_detector.py
Normal file
@@ -0,0 +1,243 @@
|
||||
"""
|
||||
AWOOOI AIOps Phase 6 — Trust Drift Detector(信任度漂移偵測器)
|
||||
===============================================================
|
||||
職責:偵測 Playbook trust_score 分布的兩種極端偏態:
|
||||
|
||||
極端 A「盲目樂觀」:> 70% Playbook trust_score > 0.9
|
||||
→ 可能是 PostExecutionVerifier 失效,或 RAG 資料被污染,讓所有 AI 都以為「我很棒」
|
||||
→ 真正的好系統不會所有 Playbook 都高分
|
||||
|
||||
極端 B「學習鎖死」:> 70% Playbook trust_score < 0.3
|
||||
→ 可能是 EWMA 計算出錯,或所有執行都被誤判失敗,讓 AI 對自己完全沒信心
|
||||
→ 學習機制可能卡死
|
||||
|
||||
設計原則:
|
||||
1. 只讀 DB,不修改任何數據
|
||||
2. 違反 → 寫 trust_drift 事件到 ai_governance_events
|
||||
3. 樣本不足(< 10 個 approved Playbook)→ 跳過偵測,不告警
|
||||
|
||||
ADR-087: AI 自我治理閉環
|
||||
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
import structlog
|
||||
from sqlalchemy import func, select
|
||||
|
||||
from src.db.base import get_session_factory
|
||||
from src.db.models import AiGovernanceEvent, PlaybookRecord
|
||||
from src.utils.timezone import now_taipei
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# 偵測閾值(MASTER §3.6,修改需 ADR-087 更新)
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
DRIFT_HIGH_THRESHOLD: float = 0.9   # a trust_score strictly above this counts as "too high"
|
||||
DRIFT_LOW_THRESHOLD: float = 0.3    # a trust_score strictly below this counts as "too low"
|
||||
DRIFT_RATIO_TRIGGER: float = 0.70   # drift is flagged when the high or low share is >= this ratio
|
||||
DRIFT_MIN_SAMPLES: int = 10         # detection is skipped when fewer approved Playbooks than this exist
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Data Types
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
class TrustDistribution:
    """Snapshot of the Playbook trust-score distribution.

    Holds the raw bucket counts, the derived ratios, the mean trust score
    and the drift verdict. ``to_dict`` converts the snapshot into a plain
    dict (ratios and mean rounded to 4 decimals) together with the
    module-level thresholds in force.
    """

    total: int
    high_count: int           # trust_score > 0.9
    low_count: int            # trust_score < 0.3
    mid_count: int            # 0.3 <= trust_score <= 0.9 (normal band)
    high_ratio: float
    low_ratio: float
    mean_trust: float
    drift_type: str | None    # "optimism_bias" / "confidence_collapse" / None
    drift_detected: bool

    def to_dict(self) -> dict:
        """Return the snapshot as a plain dict.

        Counts are copied verbatim, ``high_ratio`` / ``low_ratio`` /
        ``mean_trust`` are rounded to 4 decimal places, and the current
        module-level thresholds are attached under ``"thresholds"``.
        """
        limits = {
            "high": DRIFT_HIGH_THRESHOLD,
            "low": DRIFT_LOW_THRESHOLD,
            "ratio_trigger": DRIFT_RATIO_TRIGGER,
            "min_samples": DRIFT_MIN_SAMPLES,
        }

        # Insertion order mirrors the field order so the emitted dict keeps
        # the same key sequence.
        payload: dict = {
            field_name: getattr(self, field_name)
            for field_name in ("total", "high_count", "low_count", "mid_count")
        }
        for field_name in ("high_ratio", "low_ratio", "mean_trust"):
            payload[field_name] = round(getattr(self, field_name), 4)
        payload["drift_type"] = self.drift_type
        payload["drift_detected"] = self.drift_detected
        payload["thresholds"] = limits
        return payload
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Main Service
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TrustDriftDetector:
    """
    Trust-score drift detector.

    Usage:
        detector = TrustDriftDetector()
        dist = await detector.detect()
        if dist.drift_detected:
            await detector.save_drift_event(dist)
    """

    @staticmethod
    def _blank_distribution(total: int) -> TrustDistribution:
        """Build a no-drift snapshot with zeroed counts, ratios and mean."""
        return TrustDistribution(
            total=total,
            high_count=0,
            low_count=0,
            mid_count=0,
            high_ratio=0.0,
            low_ratio=0.0,
            mean_trust=0.0,
            drift_type=None,
            drift_detected=False,
        )

    async def detect(self) -> TrustDistribution:
        """
        Read the approved Playbooks, compute the trust distribution and
        classify drift.

        Returns:
            TrustDistribution. ``drift_detected`` is False when fewer than
            DRIFT_MIN_SAMPLES approved Playbooks exist, and also when any
            exception is raised while querying (the error is logged and a
            zeroed snapshot with ``total=0`` is returned).
        """
        try:
            async with get_session_factory()() as db:

                async def _count(*criteria) -> int:
                    # COUNT(*) over the rows matching every criterion.
                    rows = await db.execute(select(func.count()).where(*criteria))
                    return rows.scalar() or 0

                # Only Playbooks in the "approved" state are considered.
                n_approved: int = await _count(PlaybookRecord.status == "approved")

                if n_approved < DRIFT_MIN_SAMPLES:
                    logger.info(
                        "trust_drift_skip_insufficient_samples",
                        total=n_approved,
                        required=DRIFT_MIN_SAMPLES,
                    )
                    return self._blank_distribution(n_approved)

                n_high: int = await _count(
                    PlaybookRecord.status == "approved",
                    PlaybookRecord.trust_score > DRIFT_HIGH_THRESHOLD,
                )
                n_low: int = await _count(
                    PlaybookRecord.status == "approved",
                    PlaybookRecord.trust_score < DRIFT_LOW_THRESHOLD,
                )

                avg_rows = await db.execute(
                    select(func.avg(PlaybookRecord.trust_score)).where(
                        PlaybookRecord.status == "approved"
                    )
                )
                avg_trust: float = float(avg_rows.scalar() or 0.0)

                share_high = n_high / n_approved
                share_low = n_low / n_approved

                # Classify: the high-side check wins when both would match.
                verdict = (
                    "optimism_bias"
                    if share_high >= DRIFT_RATIO_TRIGGER
                    else "confidence_collapse"
                    if share_low >= DRIFT_RATIO_TRIGGER
                    else None
                )

                snapshot = TrustDistribution(
                    total=n_approved,
                    high_count=n_high,
                    low_count=n_low,
                    mid_count=n_approved - n_high - n_low,
                    high_ratio=share_high,
                    low_ratio=share_low,
                    mean_trust=avg_trust,
                    drift_type=verdict,
                    drift_detected=verdict is not None,
                )

                if not snapshot.drift_detected:
                    logger.info(
                        "trust_drift_ok",
                        mean_trust=round(avg_trust, 3),
                        total=n_approved,
                        high_ratio=round(share_high, 3),
                    )
                else:
                    logger.warning(
                        "trust_drift_detected",
                        drift_type=verdict,
                        high_ratio=round(share_high, 3),
                        low_ratio=round(share_low, 3),
                        mean_trust=round(avg_trust, 3),
                        total=n_approved,
                    )

                return snapshot

        except Exception as e:
            logger.error("trust_drift_detect_error", error=str(e))
            # Conservative choice: a failed detection raises no alert
            # (not knowing is preferred over a spurious alarm).
            return self._blank_distribution(0)

    async def save_drift_event(self, dist: TrustDistribution) -> None:
        """Persist a ``trust_drift`` row to ai_governance_events.

        The event details are ``dist.to_dict()`` plus a ``detected_at``
        ISO timestamp. Any exception is logged and swallowed.
        """
        try:
            async with get_session_factory()() as db:
                details = dist.to_dict()
                details["detected_at"] = now_taipei().isoformat()
                db.add(
                    AiGovernanceEvent(
                        event_type="trust_drift",
                        details=details,
                        resolved=False,
                    )
                )
                await db.commit()
                logger.warning(
                    "trust_drift_event_saved",
                    drift_type=dist.drift_type,
                )
        except Exception as e:
            logger.error("trust_drift_event_save_error", error=str(e))

    async def run(self) -> TrustDistribution:
        """Full pass: detect, and write an event only when drift was found."""
        snapshot = await self.detect()
        if not snapshot.drift_detected:
            return snapshot
        await self.save_drift_event(snapshot)
        return snapshot
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Singleton
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
# Module-level cache for the shared detector; filled on first request.
_detector: TrustDriftDetector | None = None


def get_trust_drift_detector() -> TrustDriftDetector:
    """Return the shared TrustDriftDetector, creating it on the first call."""
    global _detector
    if _detector is not None:
        return _detector
    _detector = TrustDriftDetector()
    return _detector
|
||||
Reference in New Issue
Block a user