259 lines
11 KiB
Python
259 lines
11 KiB
Python
"""
|
||
AWOOOI AIOps Phase 6 — Trust Drift Detector(信任度漂移偵測器)
|
||
===============================================================
|
||
【LIB ONLY — NO SIDE EFFECTS】
|
||
|
||
2026-05-02 ogt + Claude Sonnet 4.6(亞太): 整併雙寫路徑
|
||
背景:原本 watchdog W-6 呼叫 detector.run() 會直接寫 event_type=trust_drift 到
|
||
ai_governance_events;governance_agent.check_trust_drift() 每 1h 也寫同一 event_type。
|
||
造成雙寫、語義混淆,下游 consumer 無法區分 source-of-truth。
|
||
整併決策:governance_agent.check_trust_drift() 為唯一 source-of-truth(功能更完整:
|
||
含 auto-deprecate + Telegram 推送)。本模組降為純統計 lib,不再自行寫 PG。
|
||
|
||
職責(整併後):純統計 lib,偵測 Playbook trust_score 分布的兩種極端偏態:
|
||
|
||
極端 A「盲目樂觀」:> 70% Playbook trust_score > 0.9
|
||
→ 可能是 PostExecutionVerifier 失效,或 RAG 資料被污染,讓所有 AI 都以為「我很棒」
|
||
→ 真正的好系統不會所有 Playbook 都高分
|
||
|
||
極端 B「學習鎖死」:> 70% Playbook trust_score < 0.3
|
||
→ 可能是 EWMA 計算出錯,或所有執行都被誤判失敗,讓 AI 對自己完全沒信心
|
||
→ 學習機制可能卡死
|
||
|
||
設計原則(整併後):
|
||
1. 只讀 DB,不修改任何數據
|
||
2. detect() / run() 只回傳 TrustDistribution,不寫 ai_governance_events
|
||
3. save_drift_event() 保留供呼叫方(如需要分布事件)顯式呼叫,不在 run() 內自動觸發
|
||
4. 樣本不足(< 10 個 approved Playbook)→ 跳過偵測,不告警
|
||
5. AI 治理事件的唯一寫入點:governance_agent.check_trust_drift()
|
||
|
||
ADR-087: AI 自我治理閉環
|
||
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立
|
||
2026-05-02 ogt + Claude Sonnet 4.6(亞太): 降為 lib only,移除 run() 自動 PG 寫入
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from dataclasses import dataclass
|
||
|
||
import structlog
|
||
from sqlalchemy import func, select
|
||
|
||
from src.db.base import get_db_context
|
||
from src.db.models import AiGovernanceEvent, PlaybookRecord
|
||
from src.utils.timezone import now_taipei
|
||
|
||
logger = structlog.get_logger(__name__)
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 偵測閾值(MASTER §3.6,修改需 ADR-087 更新)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
# A playbook is "too high" when its trust_score is strictly above this value.
DRIFT_HIGH_THRESHOLD: float = 0.9
# A playbook is "too low" when its trust_score is strictly below this value.
DRIFT_LOW_THRESHOLD: float = 0.3
# Drift is flagged once the share of playbooks in one extreme reaches this ratio.
DRIFT_RATIO_TRIGGER: float = 0.70
# Detection is skipped when fewer approved playbooks than this exist.
DRIFT_MIN_SAMPLES: int = 10
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Data Types
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
@dataclass
class TrustDistribution:
    """Snapshot of the playbook trust_score distribution.

    ``drift_type`` is ``"optimism_bias"``, ``"confidence_collapse"`` or
    ``None``; ``drift_detected`` tells whether a drift was flagged.
    """

    total: int
    high_count: int  # trust_score > 0.9
    low_count: int  # trust_score < 0.3
    mid_count: int  # 0.3 <= trust_score <= 0.9 (the normal band)
    high_ratio: float
    low_ratio: float
    mean_trust: float
    drift_type: str | None  # "optimism_bias" / "confidence_collapse" / None
    drift_detected: bool

    def to_dict(self) -> dict:
        """Return the snapshot as a plain dict.

        Ratios and the mean are rounded to 4 decimal places, and the
        module-level thresholds are embedded under the ``"thresholds"`` key.
        """
        payload: dict = {
            name: getattr(self, name)
            for name in ("total", "high_count", "low_count", "mid_count")
        }
        for name in ("high_ratio", "low_ratio", "mean_trust"):
            payload[name] = round(getattr(self, name), 4)
        payload["drift_type"] = self.drift_type
        payload["drift_detected"] = self.drift_detected
        # Thresholds go last so the key order of the emitted dict is unchanged.
        payload["thresholds"] = {
            "high": DRIFT_HIGH_THRESHOLD,
            "low": DRIFT_LOW_THRESHOLD,
            "ratio_trigger": DRIFT_RATIO_TRIGGER,
            "min_samples": DRIFT_MIN_SAMPLES,
        }
        return payload
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Main Service
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
class TrustDriftDetector:
    """Detect skew in the trust_score distribution of approved playbooks.

    Two extremes are flagged, each when its share of approved playbooks
    reaches ``DRIFT_RATIO_TRIGGER``:

    * ``"optimism_bias"``: trust_score above ``DRIFT_HIGH_THRESHOLD``
    * ``"confidence_collapse"``: trust_score below ``DRIFT_LOW_THRESHOLD``

    ``detect()`` and ``run()`` only read; ``save_drift_event()`` is the only
    method that writes, and it is never called implicitly.

    Usage:
        detector = TrustDriftDetector()
        dist = await detector.detect()
        if dist.drift_detected:
            await detector.save_drift_event(dist)
    """

    @staticmethod
    def _empty_distribution(total: int = 0) -> TrustDistribution:
        """Build the "no drift" snapshot returned for skipped or failed runs."""
        return TrustDistribution(
            total=total,
            high_count=0,
            low_count=0,
            mid_count=0,
            high_ratio=0.0,
            low_ratio=0.0,
            mean_trust=0.0,
            drift_type=None,
            drift_detected=False,
        )

    @staticmethod
    def _classify(high_ratio: float, low_ratio: float) -> str | None:
        """Map the two extreme ratios to a drift label, or None if neither triggers."""
        # The high extreme is checked first, so it wins if both ever qualify.
        if high_ratio >= DRIFT_RATIO_TRIGGER:
            return "optimism_bias"
        if low_ratio >= DRIFT_RATIO_TRIGGER:
            return "confidence_collapse"
        return None

    async def detect(self) -> TrustDistribution:
        """Compute the trust_score distribution of approved playbooks and flag drift.

        Returns:
            A TrustDistribution. When fewer than ``DRIFT_MIN_SAMPLES`` approved
            playbooks exist, only ``total`` is populated and
            ``drift_detected`` is False. When any error occurs, an all-zero
            snapshot with ``drift_detected=False`` is returned; the error is
            logged, never raised.
        """
        try:
            # One aggregate statement instead of four separate queries, so all
            # figures come from the same snapshot (no negative mid_count or
            # ratios above 1 under concurrent writes) and cost one round trip.
            stmt = (
                select(
                    func.count(),
                    func.count().filter(
                        PlaybookRecord.trust_score > DRIFT_HIGH_THRESHOLD
                    ),
                    func.count().filter(
                        PlaybookRecord.trust_score < DRIFT_LOW_THRESHOLD
                    ),
                    func.avg(PlaybookRecord.trust_score),
                )
                .select_from(PlaybookRecord)
                .where(PlaybookRecord.status == "approved")
            )
            async with get_db_context() as session:
                result = await session.execute(stmt)
                # Aggregates without GROUP BY always yield exactly one row.
                total_raw, high_raw, low_raw, mean_raw = result.one()

            total = int(total_raw or 0)

            # The explicit zero check keeps the ratio division below safe even
            # if DRIFT_MIN_SAMPLES is ever configured as 0.
            if total == 0 or total < DRIFT_MIN_SAMPLES:
                logger.info(
                    "trust_drift_skip_insufficient_samples",
                    total=total,
                    required=DRIFT_MIN_SAMPLES,
                )
                return self._empty_distribution(total)

            high_count = int(high_raw or 0)
            low_count = int(low_raw or 0)
            # AVG yields NULL when every trust_score is NULL; treat that as 0.0.
            mean_trust = float(mean_raw or 0.0)

            high_ratio = high_count / total
            low_ratio = low_count / total
            drift_type = self._classify(high_ratio, low_ratio)

            dist = TrustDistribution(
                total=total,
                high_count=high_count,
                low_count=low_count,
                # Everything not in an extreme (including NULL scores) is "mid".
                mid_count=total - high_count - low_count,
                high_ratio=high_ratio,
                low_ratio=low_ratio,
                mean_trust=mean_trust,
                drift_type=drift_type,
                drift_detected=drift_type is not None,
            )

            if dist.drift_detected:
                logger.warning(
                    "trust_drift_detected",
                    drift_type=drift_type,
                    high_ratio=round(high_ratio, 3),
                    low_ratio=round(low_ratio, 3),
                    mean_trust=round(mean_trust, 3),
                    total=total,
                )
            else:
                logger.info(
                    "trust_drift_ok",
                    mean_trust=round(mean_trust, 3),
                    total=total,
                    high_ratio=round(high_ratio, 3),
                )

            return dist

        except Exception as e:
            # Conservative by design: a failed detection must not raise an
            # alarm. exc_info keeps the traceback for diagnosis.
            logger.error("trust_drift_detect_error", error=str(e), exc_info=True)
            return self._empty_distribution()

    async def save_drift_event(self, dist: TrustDistribution) -> None:
        """Persist ``dist`` as a ``trust_drift`` row in ai_governance_events.

        Best-effort: failures are logged and swallowed, never raised.
        """
        try:
            async with get_db_context() as session:
                event = AiGovernanceEvent(
                    event_type="trust_drift",
                    details={
                        **dist.to_dict(),
                        "detected_at": now_taipei().isoformat(),
                    },
                    resolved=False,
                )
                session.add(event)
                await session.commit()
                logger.warning(
                    "trust_drift_event_saved",
                    drift_type=dist.drift_type,
                )
        except Exception as e:
            logger.error(
                "trust_drift_event_save_error", error=str(e), exc_info=True
            )

    async def run(self) -> TrustDistribution:
        """Run detection and return the result without writing any event.

        This is a thin alias for ``detect()``; callers decide whether to
        persist the outcome via ``save_drift_event()``.
        """
        return await self.detect()
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Singleton
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
# Module-level cache for the shared detector; filled on first request.
_detector: TrustDriftDetector | None = None


def get_trust_drift_detector() -> TrustDriftDetector:
    """Return the shared TrustDriftDetector, creating it on the first call."""
    global _detector
    if _detector is not None:
        return _detector
    _detector = TrustDriftDetector()
    return _detector
|