feat(P6): 提交 offline_replay_service + model_rollback_service (漏提)
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 14m59s

Phase 6 ADR-087 治理閉環兩個核心服務,
之前建立後沒有 git add,一直是 untracked 狀態。

2026-04-15 Claude Sonnet 4.6 Asia/Taipei
This commit is contained in:
OG T
2026-04-15 22:29:09 +08:00
parent 85c4e3b434
commit 77a92eb469
2 changed files with 559 additions and 0 deletions

View File

@@ -0,0 +1,318 @@
"""
AWOOOI AIOps Phase 6 — 離線回放服務(決策一致率基線)
=====================================================
職責:每週取最近 100 個已結案的 Incident,比對當時 AI 決策 vs 實際執行結果,
計算「決策一致率」,輸出基線並在持續衰退時寫入 ai_governance_events。
一致率定義(加權,共 100 分):
- Coordinator 決策正確(requires_human = auto 且有 execution = 成功)→ +60%
- Playbook 命中(coordinator recommended action 含 playbook 關鍵字)→ +20%
- 驗證結果一致(verification_result = success)→ +20%
設計原則:
1. 不呼叫 LLM — 純 DB 查詢 + 靜態規則比對,避免回放產生費用
2. best-effort — 查詢失敗只記錄 warning,不阻塞主流程
3. 結果寫入 ai_governance_events(event_type = replay_degraded / replay_ok)
4. feature flag: AIOPS_P6_GOVERNANCE_ENABLED
ADR-087 Phase 6: 自我治理閉環
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立
"""
from __future__ import annotations
import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import timedelta
import structlog
from sqlalchemy import and_, select
from src.db.base import get_session_factory
from src.db.models import AgentSession, AiGovernanceEvent, AutoRepairExecution, IncidentEvidence
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 常數
# ─────────────────────────────────────────────────────────────────────────────
REPLAY_BATCH = 100 # maximum number of incidents replayed per run
REPLAY_LOOKBACK_DAYS = 14 # replay window: only sessions from the last 14 days are considered
DEGRADED_THRESHOLD = 0.70 # consistency rate below 70% → a replay_degraded event is written
WEEKLY_INTERVAL_SEC = 7 * 86_400 # seven days expressed in seconds (sleep interval of the replay loop)
# ─────────────────────────────────────────────────────────────────────────────
# Result Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class IncidentReplayResult:
    """Outcome of replaying one incident.

    The boolean flags record which stored artefacts were found for the
    incident and how they turned out; ``consistency_score`` is the weighted
    score derived from them.
    """

    incident_id: str

    # Coordinator stage.
    has_coordinator_decision: bool = False
    coordinator_approved_auto: bool = False

    # Auto-repair execution stage.
    has_execution: bool = False
    execution_success: bool = False

    # Verification stage.
    has_verification: bool = False
    verification_success: bool = False

    # Weighted consistency score for this incident, in the range 0..1.
    consistency_score: float = 0.0
@dataclass
class OfflineReplayReport:
    """Summary of one offline replay run."""

    # ISO-8601 timestamp string of the run (empty until set by the caller).
    run_at: str = ""
    # Number of incident ids selected for replay.
    incidents_sampled: int = 0
    # Number of those incidents for which a coordinator decision was found.
    incidents_with_data: int = 0
    # Weighted consistency rate, in the range 0..1.
    consistency_rate: float = 0.0
    # True when the rate fell below the degradation threshold.
    degraded: bool = False
    # Per-incident details; a fresh list is created for every report.
    results: list[IncidentReplayResult] = field(default_factory=list)
# ─────────────────────────────────────────────────────────────────────────────
# Main Service
# ─────────────────────────────────────────────────────────────────────────────
class OfflineReplayService:
    """
    Offline replay service.

    Replays recently handled incidents from stored records only (database
    queries plus static rules), derives a weighted decision-consistency rate
    and persists the outcome as an ``AiGovernanceEvent`` row.

    Usage:
        svc = OfflineReplayService()
        report = await svc.run()
    """

    async def run(self) -> OfflineReplayReport:
        """Run one replay pass, honouring the governance feature flag.

        Returns:
            The replay report. When ``AIOPS_P6_GOVERNANCE_ENABLED`` is off, or
            when the replay raises, a default report carrying only ``run_at``
            is returned instead (best-effort: errors are logged, not raised).
        """
        from src.core.feature_flags import aiops_flags

        if not aiops_flags.AIOPS_P6_GOVERNANCE_ENABLED:
            logger.debug("offline_replay_skipped", reason="AIOPS_P6_GOVERNANCE_ENABLED=False")
            return OfflineReplayReport(run_at=now_taipei().isoformat())
        try:
            return await self._run_replay()
        except Exception as e:
            logger.error("offline_replay_error", error=str(e))
            return OfflineReplayReport(run_at=now_taipei().isoformat())

    async def run_forced(self) -> OfflineReplayReport:
        """Run one replay pass regardless of the feature flag (manual trigger).

        Returns:
            The replay report, or a default report carrying only ``run_at``
            when the replay raises.
        """
        try:
            return await self._run_replay()
        except Exception as e:
            logger.error("offline_replay_forced_error", error=str(e))
            return OfflineReplayReport(run_at=now_taipei().isoformat())

    async def _run_replay(self) -> OfflineReplayReport:
        """Select recent incidents, replay each one and build the report.

        Returns:
            A populated report. When no incident falls inside the look-back
            window the report has ``consistency_rate=1.0`` and no governance
            event is written.
        """
        # Local import, matching the function-scope import style used elsewhere
        # in this module.
        from sqlalchemy import func

        cutoff = now_taipei() - timedelta(days=REPLAY_LOOKBACK_DAYS)
        session_factory = get_session_factory()

        async with session_factory() as db:
            # 1. Pick the N incidents with the most recent coordinator session.
            #    GROUP BY + ORDER BY max(created_at) makes "most recent"
            #    deterministic; a bare DISTINCT + LIMIT returns an arbitrary
            #    subset once more than REPLAY_BATCH incidents are in the window.
            latest_session = func.max(AgentSession.created_at)
            stmt = (
                select(AgentSession.incident_id)
                .where(
                    and_(
                        AgentSession.agent_role == "coordinator",
                        AgentSession.created_at >= cutoff,
                    )
                )
                .group_by(AgentSession.incident_id)
                .order_by(latest_session.desc())
                .limit(REPLAY_BATCH)
            )
            result = await db.execute(stmt)
            incident_ids = [row[0] for row in result.all()]

        if not incident_ids:
            logger.info("offline_replay_no_incidents", lookback_days=REPLAY_LOOKBACK_DAYS)
            return OfflineReplayReport(
                run_at=now_taipei().isoformat(),
                incidents_sampled=0,
                incidents_with_data=0,
                consistency_rate=1.0,  # no data → conservatively treated as healthy
            )

        results: list[IncidentReplayResult] = []
        async with session_factory() as db:
            for incident_id in incident_ids:
                results.append(await self._replay_one(db, incident_id))

        # Overall rate = mean score over incidents that have a coordinator decision.
        incidents_with_data = [r for r in results if r.has_coordinator_decision]
        if not incidents_with_data:
            rate = 1.0
        else:
            rate = sum(r.consistency_score for r in incidents_with_data) / len(incidents_with_data)

        degraded = rate < DEGRADED_THRESHOLD
        report = OfflineReplayReport(
            run_at=now_taipei().isoformat(),
            incidents_sampled=len(incident_ids),
            incidents_with_data=len(incidents_with_data),
            consistency_rate=round(rate, 4),
            degraded=degraded,
            results=results,
        )

        # 2. Persist the governance event (best-effort).
        await self._save_governance_event(report)
        logger.info(
            "offline_replay_done",
            sampled=report.incidents_sampled,
            with_data=report.incidents_with_data,
            consistency_rate=report.consistency_rate,
            degraded=report.degraded,
        )
        return report

    async def _replay_one(self, db, incident_id: str) -> IncidentReplayResult:
        """Replay a single incident from its stored records.

        Args:
            db: Open database session used for the three lookups.
            incident_id: Identifier of the incident to replay.

        Returns:
            The per-incident result. When no coordinator session exists the
            result is returned with all flags False and a score of 0.0.
        """
        r = IncidentReplayResult(incident_id=incident_id)

        # Latest coordinator AgentSession for this incident.
        coord_stmt = (
            select(AgentSession)
            .where(
                and_(
                    AgentSession.incident_id == incident_id,
                    AgentSession.agent_role == "coordinator",
                )
            )
            .order_by(AgentSession.created_at.desc())
            .limit(1)
        )
        coord_result = await db.execute(coord_stmt)
        coordinator = coord_result.scalar_one_or_none()
        if not coordinator:
            return r
        r.has_coordinator_decision = True

        # Auto-execution counts as approved only when the output explicitly
        # says human approval is not required; a missing key means "required".
        out = coordinator.output_json or {}
        if isinstance(out, dict):
            r.coordinator_approved_auto = not out.get("requires_human_approval", True)

        # Latest AutoRepairExecution.
        exec_stmt = (
            select(AutoRepairExecution)
            .where(AutoRepairExecution.incident_id == incident_id)
            .order_by(AutoRepairExecution.created_at.desc())
            .limit(1)
        )
        exec_result = await db.execute(exec_stmt)
        execution = exec_result.scalar_one_or_none()
        if execution:
            r.has_execution = True
            # Coerce so a NULL/None success value is stored as False.
            r.execution_success = bool(execution.success)

        # Latest IncidentEvidence (verification outcome).
        ev_stmt = (
            select(IncidentEvidence)
            .where(IncidentEvidence.incident_id == incident_id)
            .order_by(IncidentEvidence.collected_at.desc())
            .limit(1)
        )
        ev_result = await db.execute(ev_stmt)
        evidence = ev_result.scalar_one_or_none()
        if evidence and evidence.verification_result:
            r.has_verification = True
            r.verification_success = (evidence.verification_result == "success")

        r.consistency_score = self._compute_consistency_score(r)
        return r

    @staticmethod
    def _compute_consistency_score(r: IncidentReplayResult) -> float:
        """Compute the weighted consistency score for a replayed incident.

        Only meaningful for results that have a coordinator decision.

        Decision component:
            * auto-approved and the execution succeeded        → 0.60
            * escalated to a human and nothing was executed    → 0.60
            * an execution exists but contradicts the decision → 0.10
            * auto-approved but nothing was executed           → 0.30
        Bonuses: +0.20 for a successful execution, +0.20 for a successful
        verification. The total is capped at 1.0.

        NOTE(review): the module header describes the second 20% as a
        "playbook hit"; the implementation awards it for execution success.

        Args:
            r: Replay result whose flags have already been filled in.

        Returns:
            Score in the range 0..1.
        """
        score = 0.0
        if r.has_execution:
            if r.coordinator_approved_auto and r.execution_success:
                score += 0.60
            else:
                score += 0.10  # decision and execution disagree: small credit
        elif not r.coordinator_approved_auto:
            # Escalated to a human and no automatic execution happened —
            # decision and outcome agree. This case used to sit in an
            # unreachable branch and was scored as 0.30.
            score += 0.60
        else:
            # Auto-approved but no execution record: partial credit.
            score += 0.30

        if r.has_execution and r.execution_success:
            score += 0.20
        if r.has_verification and r.verification_success:
            score += 0.20
        return min(score, 1.0)

    async def _save_governance_event(self, report: OfflineReplayReport) -> None:
        """Insert one ``AiGovernanceEvent`` row describing this run.

        The event type is ``replay_degraded`` or ``replay_ok`` depending on
        ``report.degraded``. Failures are logged as warnings and swallowed.
        NOTE(review): committing is presumably handled by ``get_db_context``
        on exit — confirm against its implementation.

        Args:
            report: The report whose summary fields are stored in ``details``.
        """
        try:
            from sqlalchemy import insert as sa_insert
            from src.db.base import get_db_context

            event_type = "replay_degraded" if report.degraded else "replay_ok"
            async with get_db_context() as db:
                await db.execute(
                    sa_insert(AiGovernanceEvent).values(
                        id=str(uuid.uuid4()),
                        event_type=event_type,
                        details={
                            "consistency_rate": report.consistency_rate,
                            "incidents_sampled": report.incidents_sampled,
                            "incidents_with_data": report.incidents_with_data,
                            "degraded_threshold": DEGRADED_THRESHOLD,
                            "run_at": report.run_at,
                        },
                        resolved=not report.degraded,
                    )
                )
        except Exception as e:
            logger.warning("offline_replay_governance_write_failed", error=str(e))
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
# Module-level cache for the lazily created service instance.
_instance: OfflineReplayService | None = None


def get_offline_replay_service() -> OfflineReplayService:
    """Return the shared OfflineReplayService, creating it on first use."""
    global _instance
    if _instance is not None:
        return _instance
    _instance = OfflineReplayService()
    return _instance
# ─────────────────────────────────────────────────────────────────────────────
# Loop掛載到 main.py
# ─────────────────────────────────────────────────────────────────────────────
async def run_offline_replay_loop() -> None:
    """
    Endless loop: run one offline replay, then sleep for seven days.

    Per the original note this coroutine is meant to be scheduled from
    main.py startup via asyncio.create_task (the caller is not visible here).
    Any exception raised by a replay pass is logged and the loop continues.

    ADR-087 Phase 6: offline replay baseline (L7×D6)
    2026-04-15 ogt + Claude Sonnet 4.6 (APAC): Phase 6 initial version
    """
    service = get_offline_replay_service()
    while True:
        try:
            outcome = await service.run()
            sampled_any = outcome.incidents_sampled > 0
            if sampled_any:
                # Only emit a tick when the pass actually looked at incidents.
                logger.info(
                    "offline_replay_loop_tick",
                    rate=outcome.consistency_rate,
                    degraded=outcome.degraded,
                )
        except Exception as exc:
            logger.error("offline_replay_loop_error", error=str(exc))
        await asyncio.sleep(WEEKLY_INTERVAL_SEC)

View File

@@ -0,0 +1,241 @@
"""
AWOOOI AIOps Phase 6 — Model Rollback 服務(決策衰退偵測)
==========================================================
職責:讀取最近 N 週的離線回放一致率,偵測連續衰退趨勢,
在確認衰退後寫入 ai_governance_events(conservative_mode),
並觸發 Telegram 告警通知 SRE 評估 retrain。
衰退定義:
- 連續 4 週一致率下降(week[i] < week[i-1])
- 最新一週一致率 < 55%(絕對閾值)
設計原則:
1. 不直接執行 retrain / model rollback — 只發通知 + 寫 governance event
2. 自我降級連動:偵測到衰退 → conservative_mode event → decision_manager 讀取
3. best-effort — 任何錯誤靜默記錄,不影響主流程
4. feature flag: AIOPS_P6_GOVERNANCE_ENABLED
ADR-087 Phase 6: 自我治理閉環
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 初始建立
"""
from __future__ import annotations
import uuid
from dataclasses import dataclass
import structlog
from sqlalchemy import and_, desc, select
from src.db.base import get_db_context
from src.db.models import AiGovernanceEvent
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 常數
# ─────────────────────────────────────────────────────────────────────────────
CONSECUTIVE_DECLINE_WEEKS = 4 # number of consecutive declines required before triggering
ABSOLUTE_FLOOR_RATE = 0.55 # a latest consistency rate below this value is treated as severe
RETRAIN_COOLDOWN_DAYS = 14 # minimum gap, in days, between two conservative_mode events
# ─────────────────────────────────────────────────────────────────────────────
# Result Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class RollbackCheckResult:
    """Outcome of one decision-degradation check."""

    # Number of replay records that were examined.
    checked_weeks: int = 0
    # Consistency rates of those records; None is replaced by a fresh list
    # in __post_init__ so instances never share one list object.
    consistency_rates: list[float] = None  # type: ignore[assignment]
    # Length of the most recent run of strictly decreasing rates.
    consecutive_declines: int = 0
    # True when the latest rate is below the absolute floor.
    absolute_floor_breached: bool = False
    # True when either degradation condition was met.
    retrain_recommended: bool = False
    # True when a conservative_mode event was written by this check.
    conservative_mode_triggered: bool = False
    # True when a recent conservative_mode event suppressed a new one.
    cooldown_active: bool = False

    def __post_init__(self) -> None:
        """Swap the None placeholder for a per-instance empty list."""
        if self.consistency_rates is not None:
            return
        self.consistency_rates = []
# ─────────────────────────────────────────────────────────────────────────────
# Service
# ─────────────────────────────────────────────────────────────────────────────
class ModelRollbackService:
    """
    Decision-degradation detection service.

    Reads the most recent offline-replay governance events, looks for a run
    of declining consistency rates or a breach of the absolute floor, and —
    outside the cooldown window — records a ``conservative_mode`` event and
    sends an alert. It never performs a retrain or rollback itself.

    Usage:
        svc = ModelRollbackService()
        result = await svc.check()
    """

    async def check(self, force: bool = False) -> RollbackCheckResult:
        """
        Check whether decision quality shows a degradation trend.

        Args:
            force: When True the feature flag is bypassed.

        Returns:
            The check result. A default (empty) result is returned when the
            flag is off or when the check raises; errors are logged only.
        """
        from src.core.feature_flags import aiops_flags

        if not force and not aiops_flags.AIOPS_P6_GOVERNANCE_ENABLED:
            logger.debug("model_rollback_check_skipped")
            return RollbackCheckResult()
        try:
            return await self._check()
        except Exception as e:
            logger.error("model_rollback_check_error", error=str(e))
            return RollbackCheckResult()

    async def _check(self) -> RollbackCheckResult:
        """Load the latest replay records and evaluate both degradation rules.

        Returns:
            The populated check result. With fewer than two replay records
            only ``checked_weeks`` is filled in.
        """
        async with get_db_context() as db:
            # Latest CONSECUTIVE_DECLINE_WEEKS + 1 replay records (ok and
            # degraded): N + 1 points are needed to observe N declines.
            stmt = (
                select(AiGovernanceEvent)
                .where(
                    AiGovernanceEvent.event_type.in_(["replay_ok", "replay_degraded"])
                )
                .order_by(desc(AiGovernanceEvent.triggered_at))
                .limit(CONSECUTIVE_DECLINE_WEEKS + 1)
            )
            result = await db.execute(stmt)
            events = list(result.scalars().all())
            # Read the JSON payloads while the session is still open so no
            # attribute access happens on detached/expired rows afterwards.
            raw_rates = [(ev.details or {}).get("consistency_rate") for ev in events]

        if len(raw_rates) < 2:
            logger.info("model_rollback_insufficient_history", count=len(raw_rates))
            return RollbackCheckResult(checked_weeks=len(raw_rates))

        # Oldest → newest. A missing or null rate counts as 1.0 instead of
        # aborting the whole check with a conversion error.
        rates = [1.0 if raw is None else float(raw) for raw in reversed(raw_rates)]

        consecutive = self._count_consecutive_declines(rates)
        absolute_floor = rates[-1] < ABSOLUTE_FLOOR_RATE
        # Either rule on its own is enough to recommend a retrain.
        retrain_recommended = (
            consecutive >= CONSECUTIVE_DECLINE_WEEKS or absolute_floor
        )

        result_obj = RollbackCheckResult(
            checked_weeks=len(rates),
            consistency_rates=rates,
            consecutive_declines=consecutive,
            absolute_floor_breached=absolute_floor,
            retrain_recommended=retrain_recommended,
        )
        if retrain_recommended:
            await self._maybe_trigger_conservative_mode(result_obj)

        logger.info(
            "model_rollback_check_done",
            consecutive_declines=consecutive,
            latest_rate=rates[-1],
            retrain_recommended=retrain_recommended,
        )
        return result_obj

    @staticmethod
    def _count_consecutive_declines(rates: list[float]) -> int:
        """Count the most recent run of strictly decreasing rates.

        Args:
            rates: Consistency rates ordered oldest → newest.

        Returns:
            How many consecutive steps, walking back from the newest value,
            are strictly lower than the value before them.
        """
        newest_first = rates[::-1]
        declines = 0
        for newer, older in zip(newest_first, newest_first[1:]):
            if newer < older:
                declines += 1
            else:
                break
        return declines

    async def _maybe_trigger_conservative_mode(
        self, result: RollbackCheckResult
    ) -> None:
        """
        Write a ``conservative_mode`` event and alert, unless one was already
        written within the last RETRAIN_COOLDOWN_DAYS days.

        Args:
            result: Check result; ``cooldown_active`` or
                ``conservative_mode_triggered`` is set on it in place.
        """
        from datetime import timedelta
        from sqlalchemy import insert as sa_insert

        cutoff = now_taipei() - timedelta(days=RETRAIN_COOLDOWN_DAYS)
        async with get_db_context() as db:
            # Is there a conservative_mode event inside the cooldown window?
            stmt = (
                select(AiGovernanceEvent)
                .where(
                    and_(
                        AiGovernanceEvent.event_type == "conservative_mode",
                        AiGovernanceEvent.triggered_at >= cutoff,
                    )
                )
                .limit(1)
            )
            recent_evt = (await db.execute(stmt)).scalar_one_or_none()
            if recent_evt is not None:
                result.cooldown_active = True
                logger.info(
                    "model_rollback_conservative_mode_cooldown",
                    cooldown_days=RETRAIN_COOLDOWN_DAYS,
                )
                return

            # Record the conservative_mode event.
            await db.execute(
                sa_insert(AiGovernanceEvent).values(
                    id=str(uuid.uuid4()),
                    event_type="conservative_mode",
                    details={
                        "consecutive_declines": result.consecutive_declines,
                        "consistency_rates": result.consistency_rates,
                        "absolute_floor_breached": result.absolute_floor_breached,
                        "triggered_by": "model_rollback_service",
                    },
                    resolved=False,
                )
            )

        # The DB context has exited here, so the alert below is not sent
        # while a database session is held open for the network call.
        result.conservative_mode_triggered = True
        logger.warning(
            "model_rollback_conservative_mode_triggered",
            consecutive_declines=result.consecutive_declines,
            rates=result.consistency_rates,
        )
        # Telegram alert (best-effort).
        await self._notify_retrain_needed(result)

    @staticmethod
    def _format_retrain_message(result: RollbackCheckResult) -> str:
        """Build the alert text sent to SRE.

        Args:
            result: Check result providing the decline count, the rates and
                the floor-breach flag.

        Returns:
            The multi-line alert message.
        """
        rates = result.consistency_rates
        latest = f"{rates[-1]:.1%}" if rates else "N/A"
        # Both branches used to be empty strings, so the line carried no value.
        breached = "是" if result.absolute_floor_breached else "否"
        return (
            "⚠️ AI 決策品質衰退偵測\n\n"
            f"連續衰退次數: {result.consecutive_declines}\n"
            f"最近一致率: {latest}\n"
            f"絕對閾值突破: {breached}\n\n"
            "建議動作: 評估 fine-tune retrain 或回滾至上一版本"
        )

    async def _notify_retrain_needed(self, result: RollbackCheckResult) -> None:
        """Send the warning-level system alert asking SRE to evaluate a retrain.

        Any failure is logged as a warning and swallowed.

        Args:
            result: Check result used to render the message.
        """
        try:
            from src.services.notification_service import get_notification_service

            svc = get_notification_service()
            msg = self._format_retrain_message(result)
            await svc.send_system_alert(message=msg, level="warning")
        except Exception as e:
            logger.warning("model_rollback_notify_failed", error=str(e))
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
# Module-level cache for the lazily created service instance.
_instance: ModelRollbackService | None = None


def get_model_rollback_service() -> ModelRollbackService:
    """Return the shared ModelRollbackService, creating it on first use."""
    global _instance
    if _instance is not None:
        return _instance
    _instance = ModelRollbackService()
    return _instance