From 77a92eb469ce9411ae564544746d8e864404b8dd Mon Sep 17 00:00:00 2001 From: OG T Date: Wed, 15 Apr 2026 22:29:09 +0800 Subject: [PATCH] =?UTF-8?q?feat(P6):=20=E6=8F=90=E4=BA=A4=20offline=5Frepl?= =?UTF-8?q?ay=5Fservice=20+=20model=5Frollback=5Fservice=20(=E6=BC=8F?= =?UTF-8?q?=E6=8F=90)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 6 ADR-087 治理閉環兩個核心服務, 之前建立後沒有 git add,一直是 untracked 狀態。 2026-04-15 Claude Sonnet 4.6 Asia/Taipei --- apps/api/src/jobs/offline_replay_service.py | 318 ++++++++++++++++++ .../src/services/model_rollback_service.py | 241 +++++++++++++ 2 files changed, 559 insertions(+) create mode 100644 apps/api/src/jobs/offline_replay_service.py create mode 100644 apps/api/src/services/model_rollback_service.py diff --git a/apps/api/src/jobs/offline_replay_service.py b/apps/api/src/jobs/offline_replay_service.py new file mode 100644 index 00000000..22e7583c --- /dev/null +++ b/apps/api/src/jobs/offline_replay_service.py @@ -0,0 +1,318 @@ +""" +AWOOOI AIOps Phase 6 — 離線回放服務(決策一致率基線) +===================================================== +職責:每週取最近 100 個已結案的 Incident,比對當時 AI 決策 vs 實際執行結果, + 計算「決策一致率」,輸出基線並在持續衰退時寫入 ai_governance_events。 + +一致率定義(加權,共 100 分): + - Coordinator 決策正確(requires_human = auto 且有 execution = 成功)→ +60% + - Playbook 命中(coordinator recommended action 含 playbook 關鍵字)→ +20% + - 驗證結果一致(verification_result = success)→ +20% + +設計原則: + 1. 不呼叫 LLM — 純 DB 查詢 + 靜態規則比對,避免回放產生費用 + 2. best-effort — 查詢失敗只記錄 warning,不阻塞主流程 + 3. 結果寫入 ai_governance_events(event_type = replay_degraded / replay_ok) + 4. feature flag: AIOPS_P6_GOVERNANCE_ENABLED + +ADR-087 Phase 6: 自我治理閉環 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立 +""" + +from __future__ import annotations + +import asyncio +import uuid +from dataclasses import dataclass, field +from datetime import timedelta + +import structlog +from sqlalchemy import and_, select + +from src.db.base import get_session_factory +from src.db.models import AgentSession, AiGovernanceEvent, AutoRepairExecution, IncidentEvidence +from src.utils.timezone import now_taipei + +logger = structlog.get_logger(__name__) + +# ───────────────────────────────────────────────────────────────────────────── +# 常數 +# ───────────────────────────────────────────────────────────────────────────── + +REPLAY_BATCH = 100 # 每次回放最多 N 個 Incident +REPLAY_LOOKBACK_DAYS = 14 # 回放視窗(取最近 14 天資料) +DEGRADED_THRESHOLD = 0.70 # 一致率 < 70% → 寫 replay_degraded 事件 +WEEKLY_INTERVAL_SEC = 7 * 86_400 + + +# ───────────────────────────────────────────────────────────────────────────── +# Result Types +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class IncidentReplayResult: + """單個 Incident 回放結果""" + incident_id: str + has_coordinator_decision: bool = False + coordinator_approved_auto: bool = False + has_execution: bool = False + execution_success: bool = False + has_verification: bool = False + verification_success: bool = False + consistency_score: float = 0.0 # 0~1 + + +@dataclass +class OfflineReplayReport: + """離線回放報告""" + run_at: str = "" + incidents_sampled: int = 0 + incidents_with_data: int = 0 + consistency_rate: float = 0.0 # 0~1 加權一致率 + degraded: bool = False + results: list[IncidentReplayResult] = field(default_factory=list) + + +# ───────────────────────────────────────────────────────────────────────────── +# Main Service +# ───────────────────────────────────────────────────────────────────────────── + +class OfflineReplayService: + """ + 離線回放服務 + + Usage: + svc = OfflineReplayService() + report = await svc.run() + """ + + async def run(self) -> OfflineReplayReport: + """執行一次離線回放,返回一致率報告。""" + from src.core.feature_flags import aiops_flags + if not aiops_flags.AIOPS_P6_GOVERNANCE_ENABLED: + logger.debug("offline_replay_skipped", reason="AIOPS_P6_GOVERNANCE_ENABLED=False") + return OfflineReplayReport(run_at=now_taipei().isoformat()) + + try: + return await self._run_replay() + except Exception as e: + logger.error("offline_replay_error", error=str(e)) + return OfflineReplayReport(run_at=now_taipei().isoformat()) + + async def run_forced(self) -> OfflineReplayReport: + """強制執行(繞過 feature flag,供管理員手動觸發)。""" + try: + return await self._run_replay() + except Exception as e: + logger.error("offline_replay_forced_error", error=str(e)) + return OfflineReplayReport(run_at=now_taipei().isoformat()) + + async def _run_replay(self) -> OfflineReplayReport: + cutoff = now_taipei() - timedelta(days=REPLAY_LOOKBACK_DAYS) + session_factory = get_session_factory() + + async with session_factory() as db: + # 1. 取最近 N 個有 AgentSession(coordinator) 的 Incident + stmt = ( + select(AgentSession.incident_id) + .where( + and_( + AgentSession.agent_role == "coordinator", + AgentSession.created_at >= cutoff, + ) + ) + .distinct() + .limit(REPLAY_BATCH) + ) + result = await db.execute(stmt) + incident_ids = [row[0] for row in result.all()] + + if not incident_ids: + logger.info("offline_replay_no_incidents", lookback_days=REPLAY_LOOKBACK_DAYS) + return OfflineReplayReport( + run_at=now_taipei().isoformat(), + incidents_sampled=0, + incidents_with_data=0, + consistency_rate=1.0, # 無資料 → 保守視為正常 + ) + + results: list[IncidentReplayResult] = [] + async with session_factory() as db: + for incident_id in incident_ids: + r = await self._replay_one(db, incident_id) + results.append(r) + + # 計算整體一致率 + incidents_with_data = [r for r in results if r.has_coordinator_decision] + if not incidents_with_data: + rate = 1.0 + else: + rate = sum(r.consistency_score for r in incidents_with_data) / len(incidents_with_data) + + degraded = rate < DEGRADED_THRESHOLD + report = OfflineReplayReport( + run_at=now_taipei().isoformat(), + incidents_sampled=len(incident_ids), + incidents_with_data=len(incidents_with_data), + consistency_rate=round(rate, 4), + degraded=degraded, + results=results, + ) + + # 2. 寫入治理事件 + await self._save_governance_event(report) + + logger.info( + "offline_replay_done", + sampled=report.incidents_sampled, + with_data=report.incidents_with_data, + consistency_rate=report.consistency_rate, + degraded=report.degraded, + ) + return report + + async def _replay_one(self, db, incident_id: str) -> IncidentReplayResult: + """回放單個 Incident。""" + r = IncidentReplayResult(incident_id=incident_id) + + # Coordinator AgentSession + coord_stmt = ( + select(AgentSession) + .where( + and_( + AgentSession.incident_id == incident_id, + AgentSession.agent_role == "coordinator", + ) + ) + .order_by(AgentSession.created_at.desc()) + .limit(1) + ) + coord_result = await db.execute(coord_stmt) + coordinator = coord_result.scalar_one_or_none() + + if not coordinator: + return r + + r.has_coordinator_decision = True + + # 判斷 coordinator 是否批准自動執行 + out = coordinator.output_json or {} + if isinstance(out, dict): + r.coordinator_approved_auto = not out.get("requires_human_approval", True) + + # AutoRepairExecution + exec_stmt = ( + select(AutoRepairExecution) + .where(AutoRepairExecution.incident_id == incident_id) + .order_by(AutoRepairExecution.created_at.desc()) + .limit(1) + ) + exec_result = await db.execute(exec_stmt) + execution = exec_result.scalar_one_or_none() + + if execution: + r.has_execution = True + r.execution_success = execution.success + + # IncidentEvidence(驗證結果) + ev_stmt = ( + select(IncidentEvidence) + .where(IncidentEvidence.incident_id == incident_id) + .order_by(IncidentEvidence.collected_at.desc()) + .limit(1) + ) + ev_result = await db.execute(ev_stmt) + evidence = ev_result.scalar_one_or_none() + + if evidence and evidence.verification_result: + r.has_verification = True + r.verification_success = (evidence.verification_result == "success") + + # 計算一致率(加權) + score = 0.0 + if r.has_coordinator_decision and r.has_execution: + # 決策與執行一致:auto approve + execution success + if r.coordinator_approved_auto and r.execution_success: + score += 0.60 + elif not r.coordinator_approved_auto and not r.has_execution: + # 升人工且無自動執行 — 也算一致 + score += 0.60 + else: + score += 0.10 # 決策與執行不一致,給小分 + elif r.has_coordinator_decision: + # 有決策但無執行 — 部分分 + score += 0.30 + + if r.has_execution and r.execution_success: + score += 0.20 + if r.has_verification and r.verification_success: + score += 0.20 + + r.consistency_score = min(score, 1.0) + return r + + async def _save_governance_event(self, report: OfflineReplayReport) -> None: + """寫入 ai_governance_events。""" + try: + from sqlalchemy import insert as sa_insert + from src.db.base import get_db_context + event_type = "replay_degraded" if report.degraded else "replay_ok" + async with get_db_context() as db: + await db.execute( + sa_insert(AiGovernanceEvent).values( + id=str(uuid.uuid4()), + event_type=event_type, + details={ + "consistency_rate": report.consistency_rate, + "incidents_sampled": report.incidents_sampled, + "incidents_with_data": report.incidents_with_data, + "degraded_threshold": DEGRADED_THRESHOLD, + "run_at": report.run_at, + }, + resolved=not report.degraded, + ) + ) + except Exception as e: + logger.warning("offline_replay_governance_write_failed", error=str(e)) + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_instance: OfflineReplayService | None = None + + +def get_offline_replay_service() -> OfflineReplayService: + global _instance + if _instance is None: + _instance = OfflineReplayService() + return _instance + + +# ───────────────────────────────────────────────────────────────────────────── +# Loop(掛載到 main.py) +# ───────────────────────────────────────────────────────────────────────────── + +async def run_offline_replay_loop() -> None: + """ + 無限迴圈:每 7 天執行一次離線回放。 + 在 main.py startup 以 asyncio.create_task 掛載。 + + ADR-087 Phase 6: 離線回放基線(L7×D6) + 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立 + """ + svc = get_offline_replay_service() + while True: + try: + report = await svc.run() + if report.incidents_sampled > 0: + logger.info( + "offline_replay_loop_tick", + rate=report.consistency_rate, + degraded=report.degraded, + ) + except Exception as e: + logger.error("offline_replay_loop_error", error=str(e)) + + await asyncio.sleep(WEEKLY_INTERVAL_SEC) diff --git a/apps/api/src/services/model_rollback_service.py b/apps/api/src/services/model_rollback_service.py new file mode 100644 index 00000000..e8324b2e --- /dev/null +++ b/apps/api/src/services/model_rollback_service.py @@ -0,0 +1,241 @@ +""" +AWOOOI AIOps Phase 6 — Model Rollback 服務(決策衰退偵測) +========================================================== +職責:讀取最近 N 週的離線回放一致率,偵測連續衰退趨勢, + 在確認衰退後寫入 ai_governance_events(conservative_mode) + 並觸發 Telegram 告警通知 SRE 評估 retrain。 + +衰退定義: + - 連續 4 週一致率下降(week[i] < week[i-1]) + - 最新一週一致率 < 55%(絕對閾值) + +設計原則: + 1. 不直接執行 retrain / model rollback — 只發通知 + 寫 governance event + 2. 自我降級連動:偵測到衰退 → conservative_mode event → decision_manager 讀取 + 3. best-effort — 任何錯誤靜默記錄,不影響主流程 + 4. feature flag: AIOPS_P6_GOVERNANCE_ENABLED + +ADR-087 Phase 6: 自我治理閉環 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立 +""" + +from __future__ import annotations + +import uuid +from dataclasses import dataclass + +import structlog +from sqlalchemy import and_, desc, select + +from src.db.base import get_db_context +from src.db.models import AiGovernanceEvent +from src.utils.timezone import now_taipei + +logger = structlog.get_logger(__name__) + +# ───────────────────────────────────────────────────────────────────────────── +# 常數 +# ───────────────────────────────────────────────────────────────────────────── + +CONSECUTIVE_DECLINE_WEEKS = 4 # 連續衰退 N 週才觸發 +ABSOLUTE_FLOOR_RATE = 0.55 # 一致率低於此值視為嚴重 +RETRAIN_COOLDOWN_DAYS = 14 # 兩次 conservative_mode 事件間最短間隔 + + +# ───────────────────────────────────────────────────────────────────────────── +# Result Types +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class RollbackCheckResult: + """衰退偵測結果""" + checked_weeks: int = 0 + consistency_rates: list[float] = None # type: ignore[assignment] + consecutive_declines: int = 0 + absolute_floor_breached: bool = False + retrain_recommended: bool = False + conservative_mode_triggered: bool = False + cooldown_active: bool = False + + def __post_init__(self) -> None: + if self.consistency_rates is None: + self.consistency_rates = [] + + +# ───────────────────────────────────────────────────────────────────────────── +# Service +# ───────────────────────────────────────────────────────────────────────────── + +class ModelRollbackService: + """ + Model Rollback 衰退偵測服務 + + Usage: + svc = ModelRollbackService() + result = await svc.check() + """ + + async def check(self, force: bool = False) -> RollbackCheckResult: + """ + 檢查是否有決策衰退趨勢。 + + Args: + force: True 時繞過 feature flag + """ + from src.core.feature_flags import aiops_flags + if not force and not aiops_flags.AIOPS_P6_GOVERNANCE_ENABLED: + logger.debug("model_rollback_check_skipped") + return RollbackCheckResult() + + try: + return await self._check() + except Exception as e: + logger.error("model_rollback_check_error", error=str(e)) + return RollbackCheckResult() + + async def _check(self) -> RollbackCheckResult: + """讀取最近 N 次回放記錄,偵測衰退。""" + async with get_db_context() as db: + # 取最近 CONSECUTIVE_DECLINE_WEEKS + 1 筆回放記錄(包含 ok 和 degraded) + stmt = ( + select(AiGovernanceEvent) + .where( + AiGovernanceEvent.event_type.in_(["replay_ok", "replay_degraded"]) + ) + .order_by(desc(AiGovernanceEvent.triggered_at)) + .limit(CONSECUTIVE_DECLINE_WEEKS + 1) + ) + result = await db.execute(stmt) + events = list(result.scalars().all()) + + if len(events) < 2: + logger.info("model_rollback_insufficient_history", count=len(events)) + return RollbackCheckResult(checked_weeks=len(events)) + + # 按時間由舊到新排列 + events.reverse() + rates = [] + for ev in events: + details = ev.details or {} + rate = details.get("consistency_rate", 1.0) + rates.append(float(rate)) + + # 計算連續衰退次數(由新到舊) + recent = rates[::-1] # 最新在前 + consecutive = 0 + for i in range(len(recent) - 1): + if recent[i] < recent[i + 1]: + consecutive += 1 + else: + break + + absolute_floor = rates[-1] < ABSOLUTE_FLOOR_RATE if rates else False + retrain_recommended = ( + consecutive >= CONSECUTIVE_DECLINE_WEEKS or absolute_floor + ) + + result_obj = RollbackCheckResult( + checked_weeks=len(rates), + consistency_rates=rates, + consecutive_declines=consecutive, + absolute_floor_breached=absolute_floor, + retrain_recommended=retrain_recommended, + ) + + if retrain_recommended: + await self._maybe_trigger_conservative_mode(result_obj) + + logger.info( + "model_rollback_check_done", + consecutive_declines=consecutive, + latest_rate=rates[-1] if rates else None, + retrain_recommended=retrain_recommended, + ) + return result_obj + + async def _maybe_trigger_conservative_mode( + self, result: RollbackCheckResult + ) -> None: + """ + 若距上次 conservative_mode 事件超過 RETRAIN_COOLDOWN_DAYS, + 寫入新的 conservative_mode 事件並發送告警。 + """ + from datetime import timedelta + cutoff = now_taipei() - timedelta(days=RETRAIN_COOLDOWN_DAYS) + + async with get_db_context() as db: + # 查是否在冷卻期內 + stmt = ( + select(AiGovernanceEvent) + .where( + and_( + AiGovernanceEvent.event_type == "conservative_mode", + AiGovernanceEvent.triggered_at >= cutoff, + ) + ) + .limit(1) + ) + recent_evt = (await db.execute(stmt)).scalar_one_or_none() + + if recent_evt: + result.cooldown_active = True + logger.info( + "model_rollback_conservative_mode_cooldown", + cooldown_days=RETRAIN_COOLDOWN_DAYS, + ) + return + + # 寫入 conservative_mode 事件 + from sqlalchemy import insert as sa_insert + await db.execute( + sa_insert(AiGovernanceEvent).values( + id=str(uuid.uuid4()), + event_type="conservative_mode", + details={ + "consecutive_declines": result.consecutive_declines, + "consistency_rates": result.consistency_rates, + "absolute_floor_breached": result.absolute_floor_breached, + "triggered_by": "model_rollback_service", + }, + resolved=False, + ) + ) + result.conservative_mode_triggered = True + logger.warning( + "model_rollback_conservative_mode_triggered", + consecutive_declines=result.consecutive_declines, + rates=result.consistency_rates, + ) + + # 發送 Telegram 告警(best-effort) + await self._notify_retrain_needed(result) + + async def _notify_retrain_needed(self, result: RollbackCheckResult) -> None: + """發送 Telegram Tier-2 通知,提醒 SRE 評估 retrain。""" + try: + from src.services.notification_service import get_notification_service + svc = get_notification_service() + msg = ( + "⚠️ AI 決策品質衰退偵測\n\n" + f"連續衰退次數: {result.consecutive_declines} 週\n" + f"最近一致率: {result.consistency_rates[-1]:.1%}\n" + f"絕對閾值突破: {'是' if result.absolute_floor_breached else '否'}\n\n" + "建議動作: 評估 fine-tune retrain 或回滾至上一版本" + ) + await svc.send_system_alert(message=msg, level="warning") + except Exception as e: + logger.warning("model_rollback_notify_failed", error=str(e)) + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_instance: ModelRollbackService | None = None + + +def get_model_rollback_service() -> ModelRollbackService: + global _instance + if _instance is None: + _instance = ModelRollbackService() + return _instance