fix(alerts): PENDING 收斂無 TTL → 老記錄永久封鎖 Telegram 告警
Some checks are pending
CD Pipeline / build-and-deploy (push) Has started running

根因:find_by_fingerprint 的 PENDING 匹配條件無時間上限,
2026-04-12 建立的 3 筆 PENDING approval records(hit=77/30/17)
持續吃掉所有同指紋告警,造成 2+ 小時 Telegram 靜音。

修正(approval_db.py):
  - PENDING_TTL_HOURS = 24:PENDING 記錄逾 24h 不再收斂新告警
  - 原本:OR(status=PENDING, created_at>=30min前)
  - 修正:OR(PENDING AND created_at>=24h前, created_at>=30min前)

緊急修復:kubectl exec 直接將 7 筆過期 PENDING 記錄設為 expired,
即時恢復 Telegram 告警流(不等部署)。

Phase 6 AI 自我治理閉環(ADR-087):
  - feat(db): 新增 ai_governance_events 表 + 3 個 index(base.py + models.py)
  - feat(svc): ai_slo_calculator.py — 7d 滾動 SLO(success/override/false_neg)
  - feat(svc): trust_drift_detector.py — Playbook 信任度極端偏態偵測
  - feat(job): kb_rot_cleaner.py — K8s API/Prom metric/老舊 incident_case 腐爛清理
  - feat(svc): decision_manager.py — 自我降級守衛(SLO 違反 → 提高門檻/保守模式)

2026-04-15 ogt + Claude Sonnet 4.6(亞太)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-15 18:56:16 +08:00
parent 37f4553349
commit fab65e7d7a
7 changed files with 1112 additions and 3 deletions

View File

@@ -220,6 +220,20 @@ async def init_db() -> None:
""")
)
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 自我治理閉環
# ADR-087: ai_governance_events 不可變 Event Sourcing 表
# create_all 已建表,此處補 INDEX部分環境 create 不跑 Index
await conn.execute(
text("""
CREATE INDEX IF NOT EXISTS ix_ai_governance_event_type
ON ai_governance_events (event_type);
CREATE INDEX IF NOT EXISTS ix_ai_governance_triggered_at
ON ai_governance_events (triggered_at);
CREATE INDEX IF NOT EXISTS ix_ai_governance_resolved
ON ai_governance_events (resolved);
""")
)
async def close_db() -> None:
"""

View File

@@ -1116,3 +1116,62 @@ class AgentSession(Base):
# 查詢某 session 中特定 role 的 turnCoordinator 聚合時常用)
Index("ix_agent_sessions_session_role", "session_id", "agent_role"),
)
# =============================================================================
# AiGovernanceEvent — Phase 6 self-governance event sourcing (never deleted)
# ADR-087: AI self-governance loop (SLO violation / trust drift / KB rot /
#          self-demotion)
# 2026-04-15 ogt + Claude Sonnet 4.6 (APAC): Phase 6 initial version
#
# Core rules:
#   - Immutable event sourcing — INSERT only; UPDATE/DELETE are forbidden
#   - Every governance event must be persisted to PG; the SLO dashboard
#     depends on this table
#   - resolved=True is only back-filled by a human or by a later calculation;
#     unresolved items must not be flipped automatically
# =============================================================================
class AiGovernanceEvent(Base):
    """
    Immutable record of one AI self-governance event.

    event_type values:
        slo_violation     — an SLO calculation result violated its threshold
        trust_drift       — playbook trust-score distribution is skewed
                            (everything high or everything low)
        kb_stale          — a KB entry references a deprecated K8s API /
                            Prometheus query
        self_demotion     — the confidence threshold was raised automatically
        conservative_mode — repeated SLO violations; the whole system switched
                            to conservative mode
        replay_degraded   — offline replay consistency rate keeps declining

    Immutable — INSERT only; UPDATE / DELETE are forbidden.
    """
    __tablename__ = "ai_governance_events"
    # Primary key: 36-character string filled by generate_uuid on insert.
    id: Mapped[str] = mapped_column(
        String(36), primary_key=True, default=generate_uuid,
        comment="主鍵UUID"
    )
    # Event discriminator; one of the values listed in the class docstring.
    event_type: Mapped[str] = mapped_column(
        String(40), nullable=False,
        comment="slo_violation / trust_drift / kb_stale / self_demotion / conservative_mode / replay_degraded"
    )
    # Timezone-aware trigger time; defaults to taipei_now() at insert.
    triggered_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=taipei_now, nullable=False,
        comment="事件觸發時間(台北時區)"
    )
    # Free-form event payload (SLO values, drift distribution, ...).
    # NOTE(review): the column comment says JSONB but the declared type is the
    # generic JSON — confirm which one is intended.
    details: Mapped[dict] = mapped_column(
        JSON, nullable=False, default=dict,
        comment="事件詳情 JSONBSLO 數值、漂移分布等)"
    )
    # Whether the event has been resolved; starts as False.
    resolved: Mapped[bool] = mapped_column(
        default=False, nullable=False,
        comment="是否已解決(人工確認或下次計算恢復正常後補填)"
    )
    # When the event was resolved; NULL while unresolved.
    resolved_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=True,
        comment="解決時間(僅人工/系統補填,不得自動反轉未解決項目)"
    )
    # Single-column indexes for the three filter columns.
    __table_args__ = (
        Index("ix_ai_governance_event_type", "event_type"),
        Index("ix_ai_governance_triggered_at", "triggered_at"),
        Index("ix_ai_governance_resolved", "resolved"),
    )

View File

@@ -0,0 +1,247 @@
"""
AWOOOI AIOps Phase 6 — KB 腐爛清理 Job
=======================================
職責:月度巡檢知識庫(knowledge_entries)中腐爛的知識條目,
標記引用了已廢棄資源的條目為 stale,並寫入 ai_governance_events。
「腐爛」的三種形態:
ROT-1 廢棄 K8s API 版本引用(extensions/v1beta1、apps/v1beta1、apps/v1beta2 等)
ROT-2 過時 Prometheus query pattern(已知廢棄 metric 名稱前綴)
ROT-3 超過 180 天未更新且 view_count 為 0 的 incident_case 條目
設計原則:
1. 只讀掃描 + 標記(不刪除任何 entry,符合 archive_not_delete 鐵律)
2. 標記方式:status = 'archived' + tags 追加 'kb_rot_detected'
3. 掃描失敗 → 記錄 error,不拋出,不影響主路徑
4. 發現腐爛條目時,將結果寫入 ai_governance_events(event_type=kb_stale)
ADR-087: AI 自我治理閉環
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from datetime import timedelta
import structlog
from sqlalchemy import select, update
from src.db.base import get_session_factory
from src.db.models import AiGovernanceEvent, KnowledgeEntryRecord
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Rot detection rules (never hard-code an action here — entries are only
# flagged as stale)
# ─────────────────────────────────────────────────────────────────────────────
# ROT-1: Kubernetes API group/versions that are no longer served.
# The removals were staggered: extensions/v1beta1 and apps/v1beta1|2 workload
# kinds went away in 1.16, networking.k8s.io/v1beta1 and
# rbac.authorization.k8s.io/v1beta1 in 1.22, policy/v1beta1 in 1.25.
DEPRECATED_K8S_APIS = [
    "extensions/v1beta1",
    "apps/v1beta1",
    "apps/v1beta2",
    "networking.k8s.io/v1beta1",
    "policy/v1beta1",
    "rbac.authorization.k8s.io/v1beta1",
]
# ROT-2: regexes for Prometheus metric names/usages known to be outdated.
# They are applied with re.search against lower-cased free text, so a pattern
# must be able to match in the middle of a document.
DEPRECATED_PROM_PATTERNS = [
    r"container_cpu_used_total",  # → container_cpu_usage_seconds_total
    # → kube_pod_container_status_restarts_total
    # `\b` (not `$`): the old name is usually followed by `[5m]`, `{...}` or
    # `)`, so an end-of-string anchor would almost never match. `_` is a word
    # character, so the current `..._restarts_total` name is still excluded.
    r"kube_pod_container_status_restarts\b",
    r"http_requests_total\{.*le=",  # histogram-style `le` label on a counter
]
# ROT-3: age (in days) after which an untouched, never-viewed entry is stale.
STALE_AGE_DAYS = 180
# ─────────────────────────────────────────────────────────────────────────────
# Data Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class RotScanResult:
    """Outcome of one knowledge-base rot scan.

    ``rot_reasons`` maps an entry id to the list of reasons it was flagged,
    e.g. ``{entry_id: ["ROT-1: extensions/v1beta1", ...]}``.
    ``scanned_at`` defaults to the ISO-8601 form of ``now_taipei()``.
    """

    total_scanned: int
    stale_ids: list[str] = field(default_factory=list)
    rot_reasons: dict[str, list[str]] = field(default_factory=dict)
    scanned_at: str = field(default_factory=lambda: now_taipei().isoformat())

    @property
    def stale_count(self) -> int:
        """Number of entries flagged as stale."""
        return len(self.stale_ids)

    def to_dict(self) -> dict:
        """Return a size-capped, JSON-friendly summary of the scan.

        Only the first 50 stale ids and the first 10 reason entries are kept
        so the persisted payload stays small.
        """
        capped_ids = self.stale_ids[:50]
        first_reasons = dict(list(self.rot_reasons.items())[:10])
        summary = {
            "total_scanned": self.total_scanned,
            "stale_count": self.stale_count,
            "stale_ids": capped_ids,
            "rot_reasons_sample": first_reasons,
            "scanned_at": self.scanned_at,
        }
        return summary
# ─────────────────────────────────────────────────────────────────────────────
# Main Job
# ─────────────────────────────────────────────────────────────────────────────
class KbRotCleaner:
    """
    Knowledge-base rot cleaner job (documented as a monthly job).

    Scans active knowledge entries for three kinds of rot, archives the
    affected entries (never deletes them) and records a ``kb_stale``
    governance event.

    Usage:
        cleaner = KbRotCleaner()
        result = await cleaner.run()
    """

    async def run(self) -> RotScanResult:
        """
        Full run: scan → archive stale entries → write a governance event.

        Does nothing when the ``AIOPS_P6_KB_ROT_CLEANER`` sub-flag is off.
        Errors raised while scanning or archiving are logged and swallowed.

        Returns:
            The scan result; ``RotScanResult(total_scanned=0)`` when the job
            is disabled or failed.
        """
        from src.core.feature_flags import aiops_flags

        if not aiops_flags.is_sub_flag_enabled("AIOPS_P6_KB_ROT_CLEANER"):
            logger.info("kb_rot_cleaner_skipped_feature_flag")
            return RotScanResult(total_scanned=0)
        try:
            result = await self._scan()
            if result.stale_count > 0:
                await self._mark_stale(result)
                await self._save_event(result)
            else:
                logger.info("kb_rot_scan_clean", total_scanned=result.total_scanned)
            return result
        except Exception as e:
            logger.error("kb_rot_cleaner_error", error=str(e))
            return RotScanResult(total_scanned=0)

    @staticmethod
    def _collect_reasons(entry, stale_cutoff, prom_patterns) -> list[str]:
        """Return the rot reasons that apply to one entry (empty if healthy).

        Args:
            entry: a knowledge entry exposing content, title, entry_type,
                updated_at and view_count.
            stale_cutoff: entries last updated before this instant are old
                enough for ROT-3.
            prom_patterns: pre-compiled DEPRECATED_PROM_PATTERNS.
        """
        # Matching is case-insensitive: both fields are lower-cased first.
        combined = (entry.content or "").lower() + " " + (entry.title or "").lower()
        reasons: list[str] = []

        # ROT-1: deprecated K8s API group/version mentioned anywhere.
        reasons.extend(
            f"ROT-1: 廢棄 K8s API {api}"
            for api in DEPRECATED_K8S_APIS
            if api.lower() in combined
        )

        # ROT-2: deprecated Prometheus metric pattern.
        reasons.extend(
            f"ROT-2: 廢棄 Prom metric pattern {compiled.pattern[:40]}"
            for compiled in prom_patterns
            if compiled.search(combined)
        )

        # ROT-3: incident_case not updated within the window and never viewed.
        # An entry without an updated_at timestamp cannot be aged, so it is
        # skipped instead of raising and aborting the whole scan.
        updated_at = entry.updated_at
        if (
            entry.entry_type == "incident_case"
            and updated_at is not None
            and updated_at < stale_cutoff
            and entry.view_count == 0
        ):
            reasons.append(
                f"ROT-3: 超過 {STALE_AGE_DAYS}d 未引用 "
                f"(last_updated={updated_at.strftime('%Y-%m-%d')})"
            )
        return reasons

    async def _scan(self) -> RotScanResult:
        """Scan all approved / draft / review entries and collect rotten ones."""
        stale_ids: list[str] = []
        rot_reasons: dict[str, list[str]] = {}
        total = 0
        # Compile once per scan rather than once per entry × pattern.
        prom_patterns = [re.compile(p) for p in DEPRECATED_PROM_PATTERNS]
        async with get_session_factory()() as session:
            # Only active statuses are scanned (archived entries are skipped).
            q = await session.execute(
                select(KnowledgeEntryRecord).where(
                    KnowledgeEntryRecord.status.in_(["approved", "draft", "review"])
                )
            )
            entries = q.scalars().all()
            total = len(entries)
            stale_cutoff = now_taipei() - timedelta(days=STALE_AGE_DAYS)
            for entry in entries:
                reasons = self._collect_reasons(entry, stale_cutoff, prom_patterns)
                if reasons:
                    stale_ids.append(entry.id)
                    rot_reasons[entry.id] = reasons
        logger.info(
            "kb_rot_scan_complete",
            total=total,
            stale_count=len(stale_ids),
        )
        return RotScanResult(
            total_scanned=total,
            stale_ids=stale_ids,
            rot_reasons=rot_reasons,
        )

    async def _mark_stale(self, result: RotScanResult) -> None:
        """
        Archive the rotten entries and append the ``kb_rot_detected`` tag.

        Follows the archive_not_delete rule: entries are archived, never
        deleted.
        """
        if not result.stale_ids:
            return
        async with get_session_factory()() as session:
            # Update row by row so each entry's existing tags are preserved
            # (a bulk UPDATE would overwrite the tags column).
            q = await session.execute(
                select(KnowledgeEntryRecord).where(
                    KnowledgeEntryRecord.id.in_(result.stale_ids)
                )
            )
            entries = q.scalars().all()
            for entry in entries:
                entry.status = "archived"
                tags = list(entry.tags or [])
                if "kb_rot_detected" not in tags:
                    tags.append("kb_rot_detected")
                # Assign a new list so the change is picked up on commit.
                entry.tags = tags
            await session.commit()
        logger.warning(
            "kb_rot_entries_archived",
            count=len(result.stale_ids),
            entry_ids=result.stale_ids[:10],
        )

    async def _save_event(self, result: RotScanResult) -> None:
        """Write a ``kb_stale`` event to ai_governance_events (best effort)."""
        try:
            async with get_session_factory()() as session:
                event = AiGovernanceEvent(
                    event_type="kb_stale",
                    details=result.to_dict(),
                    resolved=False,
                )
                session.add(event)
                await session.commit()
            logger.info("kb_rot_event_saved", stale_count=result.stale_count)
        except Exception as e:
            # A failed audit write must not undo or mask the archive step.
            logger.error("kb_rot_event_save_error", error=str(e))
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_cleaner: KbRotCleaner | None = None


def get_kb_rot_cleaner() -> KbRotCleaner:
    """Return the module-level KbRotCleaner, creating it on first call."""
    global _cleaner
    if _cleaner is not None:
        return _cleaner
    _cleaner = KbRotCleaner()
    return _cleaner

View File

@@ -0,0 +1,418 @@
"""
AWOOOI AIOps Phase 6 — AI SLO 計算器(決策品質自我監控)
=========================================================
職責:滾動計算三大 AI 決策品質 SLO,違反閾值時寫入 ai_governance_events,
供 decision_manager 自我降級邏輯讀取。
三大 SLO(MASTER §3.6、ADR-087):
SLO-1 auto_execute_success_rate > 85%(7d 滾動)
SLO-2 human_override_rate < 20%(7d 滾動)
SLO-3 verifier_false_neg_rate < 5%(7d 滾動;proxy: 2h 內重複告警)
設計原則:
1. 純讀 + 純寫分離 — calculate() 只讀 DB,save_violation_event() 只寫 DB
2. 計算失敗 → 保守:假設 SLO 違反,寫 violation 事件
3. 所有結果快取 Redis(key: ai:slo:latest, TTL 5min),避免高頻查 DB
4. 不自動解決舊 violation — resolved 只能人工或下次「全部通過」時補填
ADR-087: AI 自我治理閉環
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from datetime import timedelta
import structlog
from sqlalchemy import func, select, text
from src.db.base import get_session_factory
from src.db.models import AiGovernanceEvent, AutoRepairExecution, ApprovalRecord
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# SLO thresholds (MASTER §3.6 hard rule — update ADR-087 before changing)
# ─────────────────────────────────────────────────────────────────────────────
SLO_AUTO_SUCCESS_MIN: float = 0.85 # lower bound for the auto_execute success rate
SLO_OVERRIDE_RATE_MAX: float = 0.20 # upper bound for the human override rate
SLO_FALSE_NEG_MAX: float = 0.05 # upper bound for the verifier false-negative rate
SLO_WINDOW_DAYS: int = 7 # rolling window, in days
SLO_MIN_SAMPLES: int = 5 # minimum sample count; below this a metric is not computed
REDIS_KEY = "ai:slo:latest"
REDIS_TTL_SEC = 300 # 5-minute cache
# ─────────────────────────────────────────────────────────────────────────────
# Data Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class SloMetric:
    """A single SLO indicator.

    Attributes:
        name: metric identifier.
        value: measured rate, or None when it could not be computed
            (too few samples or a query error).
        threshold: the SLO bound the value is compared against.
        direction: "above" when the value must stay above the threshold,
            "below" when it must stay below it.
        sample_count: number of samples the rate was computed from.
        violated: whether the SLO is violated; a None value is reported as
            not violated by the per-metric calculators.
    """

    name: str
    value: float | None
    threshold: float
    direction: str
    sample_count: int
    violated: bool

    @property
    def label(self) -> str:
        """Human-readable one-line summary (user-facing text is Chinese)."""
        if self.value is None:
            # Parentheses restored: the detail belongs to "N/A", not fused to it.
            return f"{self.name}: N/A(樣本 {self.sample_count} < {SLO_MIN_SAMPLES})"
        pct = f"{self.value:.1%}"
        thr = f"{self.threshold:.0%}"
        op = ">" if self.direction == "above" else "<"
        status = "❌ 違反" if self.violated else "✅ 合規"
        # The requirement clause is now closed before the status marker.
        return f"{self.name}: {pct} (需 {op}{thr}) {status}"
@dataclass
class SloReport:
    """Result of one full SLO calculation.

    ``calculated_at`` defaults to the ISO-8601 form of ``now_taipei()`` and
    ``window_days`` to ``SLO_WINDOW_DAYS``.
    """

    metrics: list[SloMetric] = field(default_factory=list)
    any_violated: bool = False
    calculated_at: str = field(default_factory=lambda: now_taipei().isoformat())
    window_days: int = SLO_WINDOW_DAYS

    def to_dict(self) -> dict:
        """Return a JSON-serialisable form of the report, labels included."""
        serialized_metrics = []
        for metric in self.metrics:
            serialized_metrics.append(
                {
                    "name": metric.name,
                    "value": metric.value,
                    "threshold": metric.threshold,
                    "direction": metric.direction,
                    "sample_count": metric.sample_count,
                    "violated": metric.violated,
                    "label": metric.label,
                }
            )
        return {
            "calculated_at": self.calculated_at,
            "window_days": self.window_days,
            "any_violated": self.any_violated,
            "metrics": serialized_metrics,
        }
# ─────────────────────────────────────────────────────────────────────────────
# Main Service
# ─────────────────────────────────────────────────────────────────────────────
class AiSloCalculator:
    """
    Calculator for the AI decision-quality SLOs.

    Usage:
        calc = AiSloCalculator()
        report = await calc.calculate()
        if report.any_violated:
            await calc.save_violation_event(report)
    """

    async def calculate(self) -> SloReport:
        """
        Compute the three SLO metrics over the rolling SLO_WINDOW_DAYS window.

        A query error inside a single metric is handled by that metric's
        calculator, which reports value=None / violated=False. Only failures
        outside those calculators (for example obtaining the session) reach
        the conservative path below.

        Returns:
            SloReport; on an unexpected failure a report with
            any_violated=True and a single "calculation_error" metric.
        """
        try:
            since = now_taipei() - timedelta(days=SLO_WINDOW_DAYS)
            async with get_session_factory()() as session:
                slo1 = await self._calc_auto_success_rate(session, since)
                slo2 = await self._calc_human_override_rate(session, since)
                slo3 = await self._calc_false_neg_rate(session, since)
            metrics = [slo1, slo2, slo3]
            any_violated = any(m.violated for m in metrics)
            report = SloReport(
                metrics=metrics,
                any_violated=any_violated,
            )
            logger.info(
                "slo_calculated",
                any_violated=any_violated,
                slo1=slo1.value,
                slo2=slo2.value,
                slo3=slo3.value,
            )
            return report
        except Exception as e:
            logger.error("slo_calculation_error", error=str(e))
            # Conservative: a failed calculation is treated as a violation.
            violated_metric = SloMetric(
                name="calculation_error",
                value=None,
                threshold=0.0,
                direction="above",
                sample_count=0,
                violated=True,
            )
            return SloReport(
                metrics=[violated_metric],
                any_violated=True,
            )

    async def get_cached_report(self) -> SloReport | None:
        """Read the most recent SLO report from Redis.

        Returns:
            The cached report, or None when the key is absent or the cache
            cannot be read or parsed.
        """
        try:
            from src.core.redis_client import get_redis

            redis = get_redis()
            raw = await redis.get(REDIS_KEY)
            if raw:
                data = json.loads(raw)
                metrics = [
                    SloMetric(
                        name=m["name"],
                        value=m["value"],
                        threshold=m["threshold"],
                        direction=m["direction"],
                        sample_count=m["sample_count"],
                        violated=m["violated"],
                    )
                    for m in data.get("metrics", [])
                ]
                return SloReport(
                    metrics=metrics,
                    any_violated=data.get("any_violated", False),
                    calculated_at=data.get("calculated_at", ""),
                    window_days=data.get("window_days", SLO_WINDOW_DAYS),
                )
        except Exception as e:
            logger.warning("slo_cache_read_error", error=str(e))
        return None

    async def cache_report(self, report: SloReport) -> None:
        """Store the report in Redis under REDIS_KEY for REDIS_TTL_SEC seconds.

        Cache failures are logged and ignored.
        """
        try:
            from src.core.redis_client import get_redis

            redis = get_redis()
            await redis.set(REDIS_KEY, json.dumps(report.to_dict()), ex=REDIS_TTL_SEC)
        except Exception as e:
            logger.warning("slo_cache_write_error", error=str(e))

    async def save_violation_event(self, report: SloReport) -> None:
        """
        Persist an ``slo_violation`` event to ai_governance_events.

        Intended to be called only when ``report.any_violated`` is True.
        Earlier violation events are left untouched, resolved or not.
        Write failures are logged and ignored.
        """
        try:
            async with get_session_factory()() as session:
                event = AiGovernanceEvent(
                    event_type="slo_violation",
                    details=report.to_dict(),
                    resolved=False,
                )
                session.add(event)
                await session.commit()
            logger.warning(
                "slo_violation_recorded",
                violated_metrics=[m.name for m in report.metrics if m.violated],
            )
        except Exception as e:
            logger.error("slo_violation_save_error", error=str(e))

    async def run(self) -> SloReport:
        """
        Full run: calculate → cache → write an event when violated.

        Returns:
            SloReport
        """
        report = await self.calculate()
        await self.cache_report(report)
        if report.any_violated:
            await self.save_violation_event(report)
        return report

    # ──────────────────────────────────────────────────────────────────────────
    # Private: shared helpers
    # ──────────────────────────────────────────────────────────────────────────
    @staticmethod
    def _no_value_metric(
        name: str, threshold: float, direction: str, sample_count: int = 0
    ) -> SloMetric:
        """Build the metric used when no rate is available (never violated)."""
        return SloMetric(
            name=name,
            value=None,
            threshold=threshold,
            direction=direction,
            sample_count=sample_count,
            violated=False,
        )

    @staticmethod
    def _rate_metric(
        name: str, hits: int, total: int, threshold: float, direction: str
    ) -> SloMetric:
        """Build a metric from hits/total and evaluate it against the bound.

        "above" metrics are violated when the rate drops below the threshold,
        "below" metrics when it rises above it.
        """
        rate = hits / total
        violated = rate < threshold if direction == "above" else rate > threshold
        return SloMetric(
            name=name,
            value=rate,
            threshold=threshold,
            direction=direction,
            sample_count=total,
            violated=violated,
        )

    @staticmethod
    async def _rollback_quietly(session) -> None:
        """Roll the session back after a failed query.

        On PostgreSQL a failed statement leaves the transaction aborted, so
        without this every later metric query on the same session would fail
        as well.
        """
        try:
            await session.rollback()
        except Exception as e:
            logger.warning("slo_session_rollback_error", error=str(e))

    # ──────────────────────────────────────────────────────────────────────────
    # Private: SLO calculators
    # ──────────────────────────────────────────────────────────────────────────
    async def _calc_auto_success_rate(self, session, since) -> SloMetric:
        """SLO-1: share of auto_repair_executions since ``since`` that succeeded."""
        name = "auto_execute_success_rate"
        try:
            total_q = await session.execute(
                select(func.count()).where(
                    AutoRepairExecution.created_at >= since
                )
            )
            total: int = total_q.scalar() or 0
            if total < SLO_MIN_SAMPLES:
                # Too few samples: skip the second query.
                return self._no_value_metric(
                    name, SLO_AUTO_SUCCESS_MIN, "above", total
                )
            success_q = await session.execute(
                select(func.count()).where(
                    AutoRepairExecution.created_at >= since,
                    AutoRepairExecution.success.is_(True),
                )
            )
            success: int = success_q.scalar() or 0
            return self._rate_metric(
                name, success, total, SLO_AUTO_SUCCESS_MIN, "above"
            )
        except Exception as e:
            logger.warning("slo1_calc_error", error=str(e))
            await self._rollback_quietly(session)
            return self._no_value_metric(name, SLO_AUTO_SUCCESS_MIN, "above")

    async def _calc_human_override_rate(self, session, since) -> SloMetric:
        """
        SLO-2: human override rate = rejected approvals / all approvals.

        Counts approval_records created since ``since``; "rejected" means
        status == 'rejected'.

        NOTE(review): the original description restricts the denominator to
        AI-originated proposals (requested_by LIKE 'ai_%' or 'system'), but no
        such filter is applied here — every ApprovalRecord in the window is
        counted. Confirm the intended population.
        """
        name = "human_override_rate"
        try:
            ai_q = await session.execute(
                select(func.count()).where(
                    ApprovalRecord.created_at >= since,
                )
            )
            total: int = ai_q.scalar() or 0
            if total < SLO_MIN_SAMPLES:
                return self._no_value_metric(
                    name, SLO_OVERRIDE_RATE_MAX, "below", total
                )
            rejected_q = await session.execute(
                select(func.count()).where(
                    ApprovalRecord.created_at >= since,
                    ApprovalRecord.status == "rejected",
                )
            )
            rejected: int = rejected_q.scalar() or 0
            return self._rate_metric(
                name, rejected, total, SLO_OVERRIDE_RATE_MAX, "below"
            )
        except Exception as e:
            logger.warning("slo2_calc_error", error=str(e))
            await self._rollback_quietly(session)
            return self._no_value_metric(name, SLO_OVERRIDE_RATE_MAX, "below")

    async def _calc_false_neg_rate(self, session, since) -> SloMetric:
        """
        SLO-3: verifier false-negative rate (proxy metric).

        A successful auto-repair run counts as a false negative when the same
        incident_id has a failed run within the following 2 hours ("fixed,
        then broken again" = the verifier wrongly reported success).

        Implemented as a CTE of successful runs self-joined against
        auto_repair_executions.

        NOTE(review): the numerator counts DISTINCT incident_id while the
        denominator counts successful runs (rows), so an incident with several
        successful runs is counted once on top and several times below.
        Confirm this is the intended ratio.
        """
        name = "verifier_false_neg_rate"
        try:
            result = await session.execute(
                text("""
                    WITH success_runs AS (
                        SELECT incident_id, created_at
                        FROM auto_repair_executions
                        WHERE success = TRUE
                        AND created_at >= :since
                    ),
                    false_negs AS (
                        SELECT DISTINCT s.incident_id
                        FROM success_runs s
                        JOIN auto_repair_executions f
                        ON f.incident_id = s.incident_id
                        AND f.success = FALSE
                        AND f.created_at > s.created_at
                        AND f.created_at <= s.created_at + INTERVAL '2 hours'
                    )
                    SELECT
                        (SELECT COUNT(*) FROM success_runs) AS total_success,
                        (SELECT COUNT(*) FROM false_negs) AS false_neg_count
                """),
                {"since": since},
            )
            row = result.fetchone()
            total_success: int = row[0] if row else 0
            false_neg: int = row[1] if row else 0
            if total_success < SLO_MIN_SAMPLES:
                return self._no_value_metric(
                    name, SLO_FALSE_NEG_MAX, "below", total_success
                )
            return self._rate_metric(
                name, false_neg, total_success, SLO_FALSE_NEG_MAX, "below"
            )
        except Exception as e:
            logger.warning("slo3_calc_error", error=str(e))
            await self._rollback_quietly(session)
            return self._no_value_metric(name, SLO_FALSE_NEG_MAX, "below")
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_calculator: AiSloCalculator | None = None


def get_ai_slo_calculator() -> AiSloCalculator:
    """Return the module-level AiSloCalculator, creating it on first call."""
    global _calculator
    if _calculator is not None:
        return _calculator
    _calculator = AiSloCalculator()
    return _calculator

View File

@@ -272,14 +272,21 @@ class ApprovalDBService:
查詢條件:
1. 相同指紋
2. 狀態為 PENDING,或
3. 在 debounce_minutes 分鐘內建立
2. 狀態為 PENDING 且在 24 小時內建立(超過 24h 的 PENDING 視為過期,不再收斂)
3. 在 debounce_minutes 分鐘內建立(不論狀態)
ADR-073 補丁 2026-04-15 ogt + Claude Sonnet 4.6:
原邏輯 PENDING 無 TTL → 3 天前 PENDING 記錄永久封鎖同指紋告警。
修正PENDING 收斂窗口上限 PENDING_TTL_HOURS24h
Returns:
ApprovalRequest if found, None otherwise
"""
PENDING_TTL_HOURS = 24 # PENDING 記錄最長收斂時效(超過則視為已過期)
now = datetime.now(UTC)
cutoff_time = now - timedelta(minutes=debounce_minutes)
pending_cutoff = now - timedelta(hours=PENDING_TTL_HOURS)
async with get_db_context() as db:
result = await db.execute(
@@ -287,7 +294,12 @@ class ApprovalDBService:
.where(ApprovalRecord.fingerprint == fingerprint)
.where(
or_(
ApprovalRecord.status == ApprovalStatus.PENDING,
# PENDING 狀態但必須在 24h 內,防止老 PENDING 永久封鎖
and_(
ApprovalRecord.status == ApprovalStatus.PENDING,
ApprovalRecord.created_at >= pending_cutoff,
),
# 最近 debounce_minutes 分鐘內建立的任何記錄
ApprovalRecord.created_at >= cutoff_time,
)
)

View File

@@ -1316,6 +1316,122 @@ class DecisionManager:
"""
action = token.proposal_data.get("kubectl_command", "")
# Phase 6 ADR-087: 自我降級守衛AIOPS_P6_SELF_DEMOTION 控制)
# SLO 違反 → 全域信心閾值調高;連續違反 → 保守模式,所有自動執行降為人工
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立
try:
from src.core.feature_flags import aiops_flags as _p6_flags
if _p6_flags.is_sub_flag_enabled("AIOPS_P6_SELF_DEMOTION"):
from src.db.base import get_session_factory as _p6_sf
from src.db.models import AiGovernanceEvent as _GovernanceEvent
from sqlalchemy import select as _p6_select, func as _p6_func
from datetime import timedelta as _p6_td
_now = __import__("src.utils.timezone", fromlist=["now_taipei"]).now_taipei()
async with _p6_sf()() as _p6_sess:
# 過去 7 天有幾筆未解決的 slo_violation
_viol_7d_q = await _p6_sess.execute(
_p6_select(_p6_func.count()).where(
_GovernanceEvent.event_type == "slo_violation",
_GovernanceEvent.resolved.is_(False),
_GovernanceEvent.triggered_at >= _now - _p6_td(days=7),
)
)
_viol_7d: int = _viol_7d_q.scalar() or 0
# 過去 14 天有幾筆未解決的 slo_violation
_viol_14d_q = await _p6_sess.execute(
_p6_select(_p6_func.count()).where(
_GovernanceEvent.event_type == "slo_violation",
_GovernanceEvent.resolved.is_(False),
_GovernanceEvent.triggered_at >= _now - _p6_td(days=14),
)
)
_viol_14d: int = _viol_14d_q.scalar() or 0
if _viol_14d >= 2:
# 連續 2 週違反 → 保守模式:全部降為人工
logger.warning(
"auto_execute_conservative_mode",
incident_id=incident.incident_id,
viol_14d=_viol_14d,
reason="Phase 6 保守模式:連續 SLO 違反,所有自動執行暫停",
)
token.state = DecisionState.READY
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["p6_conservative_mode"] = True
token.proposal_data["p6_reason"] = f"SLO 連續違反 {_viol_14d}d系統進入保守模式"
await self._save_token(token)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
# 記錄保守模式事件
try:
from src.db.base import get_session_factory as _p6_sf2
async with _p6_sf2()() as _s2:
_s2.add(_GovernanceEvent(
event_type="conservative_mode",
details={
"incident_id": incident.incident_id,
"viol_14d": _viol_14d,
"triggered_at": _now.isoformat(),
},
resolved=False,
))
await _s2.commit()
except Exception:
pass
return
elif _viol_7d >= 1:
# 近 7 天有違反 → 自我降級:信心閾值提高,記錄 demotion 事件
_confidence = float(token.proposal_data.get("confidence", 0.0))
_raised_threshold = 0.75 # 原 0.70 → 調高 0.05
if _confidence < _raised_threshold:
logger.warning(
"auto_execute_self_demoted",
incident_id=incident.incident_id,
confidence=_confidence,
raised_threshold=_raised_threshold,
reason="Phase 6 自我降級:近 7d SLO 違反,信心閾值提高",
)
token.state = DecisionState.READY
token.proposal_data["decision_state"] = DecisionState.READY.value
token.proposal_data["auto_executed"] = False
token.proposal_data["p6_self_demoted"] = True
token.proposal_data["p6_reason"] = (
f"Phase 6 自我降級:近 7d SLO 違反,"
f"信心 {_confidence:.2f} < {_raised_threshold},升為人工"
)
await self._save_token(token)
_fire_and_forget(
_push_decision_to_telegram(incident, token.proposal_data)
)
try:
from src.db.base import get_session_factory as _p6_sf3
async with _p6_sf3()() as _s3:
_s3.add(_GovernanceEvent(
event_type="self_demotion",
details={
"incident_id": incident.incident_id,
"confidence": _confidence,
"raised_threshold": _raised_threshold,
"viol_7d": _viol_7d,
"triggered_at": _now.isoformat(),
},
resolved=False,
))
await _s3.commit()
except Exception:
pass
return
# confidence >= raised_threshold → 允許繼續自動執行
except Exception as _p6_err:
logger.warning("p6_self_demotion_check_error", error=str(_p6_err))
# 保守P6 check 出錯 → 不阻擋(避免因 P6 bug 把所有修復都堵住)
# ADR-073 Phase 3-5: action | parse fix (2026-04-12 ogt)
# LLM 有時輸出 "kubectl rollout restart X | kubectl get pods -n Y"
# | 後面是查詢指令,取第一個才是真正的修復操作

View File

@@ -0,0 +1,243 @@
"""
AWOOOI AIOps Phase 6 — Trust Drift Detector信任度漂移偵測器
===============================================================
職責:偵測 Playbook trust_score 分布的兩種極端偏態:
極端 A「盲目樂觀」:≥ 70% 的 Playbook trust_score > 0.9
→ 可能是 PostExecutionVerifier 失效,或 RAG 資料被污染,讓所有 AI 都以為「我很棒」
→ 真正的好系統不會所有 Playbook 都高分
極端 B「學習鎖死」:≥ 70% 的 Playbook trust_score < 0.3
→ 可能是 EWMA 計算出錯,或所有執行都被誤判失敗,讓 AI 對自己完全沒信心
→ 學習機制可能卡死
設計原則:
1. 偵測只讀 DB,不修改任何 Playbook 數據
2. 違反 → 寫 trust_drift 事件到 ai_governance_events
3. 樣本不足(< 10 個 approved Playbook)→ 跳過偵測,不告警
ADR-087: AI 自我治理閉環
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立
"""
from __future__ import annotations
from dataclasses import dataclass
import structlog
from sqlalchemy import func, select
from src.db.base import get_session_factory
from src.db.models import AiGovernanceEvent, PlaybookRecord
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Detection thresholds (MASTER §3.6 — update ADR-087 before changing)
# ─────────────────────────────────────────────────────────────────────────────
DRIFT_HIGH_THRESHOLD: float = 0.9 # trust_score above this counts as "too high"
DRIFT_LOW_THRESHOLD: float = 0.3 # trust_score below this counts as "too low"
DRIFT_RATIO_TRIGGER: float = 0.70 # share of playbooks in one extreme that triggers an alert
DRIFT_MIN_SAMPLES: int = 10 # minimum number of approved playbooks required
# ─────────────────────────────────────────────────────────────────────────────
# Data Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class TrustDistribution:
    """Snapshot of how playbook trust scores are distributed."""

    total: int
    high_count: int  # trust_score > 0.9
    low_count: int  # trust_score < 0.3
    mid_count: int  # 0.3 <= trust_score <= 0.9 (normal band)
    high_ratio: float
    low_ratio: float
    mean_trust: float
    drift_type: str | None  # "optimism_bias" / "confidence_collapse" / None
    drift_detected: bool

    def to_dict(self) -> dict:
        """Return a JSON-friendly dict; ratios and mean rounded to 4 places.

        The thresholds in force are embedded so a stored event stays
        interpretable on its own.
        """
        active_thresholds = {
            "high": DRIFT_HIGH_THRESHOLD,
            "low": DRIFT_LOW_THRESHOLD,
            "ratio_trigger": DRIFT_RATIO_TRIGGER,
            "min_samples": DRIFT_MIN_SAMPLES,
        }
        snapshot = {
            "total": self.total,
            "high_count": self.high_count,
            "low_count": self.low_count,
            "mid_count": self.mid_count,
            "high_ratio": round(self.high_ratio, 4),
            "low_ratio": round(self.low_ratio, 4),
            "mean_trust": round(self.mean_trust, 4),
            "drift_type": self.drift_type,
            "drift_detected": self.drift_detected,
        }
        snapshot["thresholds"] = active_thresholds
        return snapshot
# ─────────────────────────────────────────────────────────────────────────────
# Main Service
# ─────────────────────────────────────────────────────────────────────────────
class TrustDriftDetector:
    """
    Detector for extreme skew in playbook trust scores.

    Usage:
        detector = TrustDriftDetector()
        dist = await detector.detect()
        if dist.drift_detected:
            await detector.save_drift_event(dist)
    """

    @staticmethod
    def _empty_distribution(total: int) -> TrustDistribution:
        """Build the all-zero, no-drift distribution for the given total."""
        return TrustDistribution(
            total=total,
            high_count=0, low_count=0, mid_count=0,
            high_ratio=0.0, low_ratio=0.0, mean_trust=0.0,
            drift_type=None, drift_detected=False,
        )

    async def detect(self) -> TrustDistribution:
        """
        Compute the trust-score distribution of approved playbooks.

        Returns:
            TrustDistribution; drift_detected is False when there are fewer
            than DRIFT_MIN_SAMPLES approved playbooks or when detection fails.
        """
        try:
            async with get_session_factory()() as session:
                # Every query below is restricted to approved playbooks.
                is_approved = PlaybookRecord.status == "approved"

                count_all = await session.execute(
                    select(func.count()).where(is_approved)
                )
                total: int = count_all.scalar() or 0
                if total < DRIFT_MIN_SAMPLES:
                    logger.info(
                        "trust_drift_skip_insufficient_samples",
                        total=total,
                        required=DRIFT_MIN_SAMPLES,
                    )
                    return self._empty_distribution(total)

                count_high = await session.execute(
                    select(func.count()).where(
                        is_approved,
                        PlaybookRecord.trust_score > DRIFT_HIGH_THRESHOLD,
                    )
                )
                n_high: int = count_high.scalar() or 0

                count_low = await session.execute(
                    select(func.count()).where(
                        is_approved,
                        PlaybookRecord.trust_score < DRIFT_LOW_THRESHOLD,
                    )
                )
                n_low: int = count_low.scalar() or 0

                avg_result = await session.execute(
                    select(func.avg(PlaybookRecord.trust_score)).where(is_approved)
                )
                average: float = float(avg_result.scalar() or 0.0)

                share_high = n_high / total
                share_low = n_low / total

                # The high extreme is checked first, then the low one.
                if share_high >= DRIFT_RATIO_TRIGGER:
                    kind = "optimism_bias"  # nearly every playbook rates itself highly → suspicious
                elif share_low >= DRIFT_RATIO_TRIGGER:
                    kind = "confidence_collapse"  # nearly no confidence left → learning may be stuck
                else:
                    kind = None

                dist = TrustDistribution(
                    total=total,
                    high_count=n_high,
                    low_count=n_low,
                    mid_count=total - n_high - n_low,
                    high_ratio=share_high,
                    low_ratio=share_low,
                    mean_trust=average,
                    drift_type=kind,
                    drift_detected=kind is not None,
                )
                if not dist.drift_detected:
                    logger.info(
                        "trust_drift_ok",
                        mean_trust=round(average, 3),
                        total=total,
                        high_ratio=round(share_high, 3),
                    )
                else:
                    logger.warning(
                        "trust_drift_detected",
                        drift_type=kind,
                        high_ratio=round(share_high, 3),
                        low_ratio=round(share_low, 3),
                        mean_trust=round(average, 3),
                        total=total,
                    )
                return dist
        except Exception as e:
            logger.error("trust_drift_detect_error", error=str(e))
            # Conservative: a failed detection raises no alert
            # (not knowing is preferred over a spurious alarm).
            return self._empty_distribution(0)

    async def save_drift_event(self, dist: TrustDistribution) -> None:
        """Write a ``trust_drift`` event to ai_governance_events (best effort)."""
        try:
            payload = {
                **dist.to_dict(),
                "detected_at": now_taipei().isoformat(),
            }
            async with get_session_factory()() as session:
                session.add(
                    AiGovernanceEvent(
                        event_type="trust_drift",
                        details=payload,
                        resolved=False,
                    )
                )
                await session.commit()
            logger.warning(
                "trust_drift_event_saved",
                drift_type=dist.drift_type,
            )
        except Exception as e:
            logger.error("trust_drift_event_save_error", error=str(e))

    async def run(self) -> TrustDistribution:
        """Full run: detect, then persist an event when drift was found."""
        dist = await self.detect()
        if dist.drift_detected:
            await self.save_drift_event(dist)
        return dist
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_detector: TrustDriftDetector | None = None


def get_trust_drift_detector() -> TrustDriftDetector:
    """Return the module-level TrustDriftDetector, creating it on first call."""
    global _detector
    if _detector is not None:
        return _detector
    _detector = TrustDriftDetector()
    return _detector