fix(aiops): ADR-092 三修 — Playbook enum崩潰 + Telegram永久靜默 + 採納失敗 + AI自健診
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 13m33s

B1 playbook_service.py: evolver setattr傳str而非PlaybookStatus enum
  → _pg_upsert playbook.status.value炸(163次/48h),修:update_with_validation強制enum轉型

B2 approval_db.py + webhooks.py: find_by_fingerprint PENDING誤收斂
  → PENDING≠Telegram已發;修:成功push後mark tg_sent:{fingerprint} Redis(24h TTL)
  → find_by_fingerprint debounce窗外PENDING必須Redis確認才收斂

drift_adopt_service.py: telegram_gateway呼叫adopt_drift(report_id)但方法不存在
  → 新增adopt_drift()包裝:從DB載入DriftReport後委派adopt(),修復採納失敗

B3 ai_slo_watchdog_job.py + main.py: AI無法感知自身故障(MASTER §1.1盲區)
  → 新增每15分鐘自健診:W-1 SLO違反 W-2 TG靜默偵測 W-3 飛輪成功率
  → 任一異常→TYPE-8M send_meta_alert;Redis去重1h

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Your Name
2026-04-20 20:00:06 +08:00
parent 1744b1e923
commit 156a52f807
6 changed files with 230 additions and 0 deletions

View File

@@ -312,6 +312,8 @@ async def _push_to_telegram_background(
diff_summary: str = "",
# 2026-04-12 ogt: ADR-075 斷點 E 修復 — alert_category 傳入以啟用 TYPE-8M 路由
alert_category: str = "",
# 2026-04-20 ogt: ADR-092 tg_sent Redis 標記(防收斂靜默)
fingerprint: str = "",
) -> None:
"""
背景任務: 推送待簽核卡片到 Telegram (v7.0 含 SignOz 整合)
@@ -347,6 +349,8 @@ async def _push_to_telegram_background(
approval_id=approval_id,
incident_id=incident_id,
)
if fingerprint:
await get_approval_service().mark_telegram_confirmed(fingerprint)
return
# 2026-04-12 ogt: ADR-075 斷點 E 修復 — TYPE-8M Meta-System 使用專屬卡片
@@ -367,6 +371,8 @@ async def _push_to_telegram_background(
incident_id=incident_id,
alert_category=alert_category,
)
if fingerprint:
await get_approval_service().mark_telegram_confirmed(fingerprint)
return
# 如果是收斂告警,在訊息中加入聚合次數
@@ -414,6 +420,8 @@ async def _push_to_telegram_background(
ai_tokens=ai_tokens,
ai_cost=f"${ai_cost:.6f}",
)
if fingerprint:
await get_approval_service().mark_telegram_confirmed(fingerprint)
# 2026-04-08 Claude Code: 記錄 Telegram 推送事件
try:
@@ -989,6 +997,7 @@ async def receive_alert(
"disk_full": "storage",
"ssl_expiry": "ssl_cert",
}.get(alert.alert_type, "general"),
fingerprint=fingerprint,
)
return AlertResponse(
@@ -1204,6 +1213,7 @@ async def _process_new_alert_background(
incident_id=incident_id,
notification_type=notification_type,
alert_category=alert_category,
fingerprint=fingerprint,
)
record_alert_chain_success("alertmanager")
@@ -1257,6 +1267,7 @@ async def _process_new_alert_background(
incident_id=fallback_incident_id,
notification_type=notification_type,
alert_category=alert_category,
fingerprint=fingerprint,
)
except Exception as e:

View File

@@ -0,0 +1,148 @@
"""
AI SLO Watchdog Job — 系統自健診(每 15 分鐘)
=============================================
MASTER §1.1 AI 自主化方向:系統必須能感知自身故障。
ADR-092 (2026-04-20 ogt + Claude Opus 4.7 Asia/Taipei)
檢查項目:
W-1 AI SLO 違反決策品質7d 滾動)
W-2 Telegram 靜默偵測PENDING 告警無 tg_sent 確認超過 30 分鐘)
W-3 飛輪 execution_success_rate 低落(< 30%
任一異常 → send_meta_alertTYPE-8Mflywheel_health
去重Redis watchdog:alert:{dedup_hash} TTL 1h避免每 15 分鐘重複洗版
"""
from __future__ import annotations
import asyncio
import uuid
from datetime import UTC, datetime, timedelta
import structlog
from sqlalchemy import and_, select
from src.core.redis_client import get_redis
from src.db.base import get_db_context
from src.db.models import ApprovalRecord
from src.models.approval import ApprovalStatus
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
_INTERVAL_SEC = 900 # 每 15 分鐘
_DEDUP_TTL_SEC = 3600 # 同一告警 1 小時內不重複發送
_TG_SILENCE_THRESHOLD = 2 # PENDING 無 tg_sent 確認數量告警門檻
_FLYWHEEL_SUCCESS_MIN = 0.30 # 執行成功率下限
async def run_ai_slo_watchdog_loop() -> None:
"""
永久迴圈:每 15 分鐘自健診,異常時發送 TYPE-8M Meta-System 告警。
由 main.py lifespan 透過 asyncio.create_task() 啟動。
"""
logger.info("ai_slo_watchdog_started", interval_sec=_INTERVAL_SEC)
while True:
try:
await _check_once()
except Exception as e:
logger.warning("ai_slo_watchdog_error", error=str(e))
await asyncio.sleep(_INTERVAL_SEC)
async def _check_once() -> None:
violations: list[str] = []
# W-1: AI SLO 違反(決策品質 7d 滾動)
try:
from src.services.ai_slo_calculator import AiSloCalculator
report = await AiSloCalculator().calculate()
if report.any_violated:
violated = [m.name for m in report.metrics if m.violated]
violations.append(f"SLO 違反: {', '.join(violated)}")
except Exception as e:
logger.warning("watchdog_w1_slo_check_failed", error=str(e))
# W-2: Telegram 靜默偵測PENDING 無 tg_sent 確認 > 30 分鐘)
try:
silent_count = await _count_pending_no_tg_sent()
if silent_count >= _TG_SILENCE_THRESHOLD:
violations.append(f"{silent_count} 個 PENDING 告警超 30 分鐘無 Telegram 確認(疑似靜默故障)")
except Exception as e:
logger.warning("watchdog_w2_tg_silence_check_failed", error=str(e))
# W-3: 飛輪執行成功率過低
try:
from src.services.flywheel_stats_service import FlywheelStatsService
metrics = await FlywheelStatsService().compute()
if metrics and metrics.execution_success_rate < _FLYWHEEL_SUCCESS_MIN:
violations.append(f"飛輪執行成功率 {metrics.execution_success_rate:.1%} < {_FLYWHEEL_SUCCESS_MIN:.0%}")
except Exception as e:
logger.warning("watchdog_w3_flywheel_check_failed", error=str(e))
if not violations:
logger.debug("ai_slo_watchdog_all_ok", checks=3)
return
# 去重violations 相同內容 1 小時內不重複發
dedup_hash = f"{hash(tuple(sorted(violations))) & 0xFFFFFF:06x}"
dedup_key = f"watchdog:alert:{dedup_hash}"
redis = get_redis()
if await redis.exists(dedup_key):
logger.debug("ai_slo_watchdog_deduped", key=dedup_key)
return
await redis.setex(dedup_key, _DEDUP_TTL_SEC, "1")
# 發送 TYPE-8M Meta-System 告警
diagnosis = " | ".join(violations)
incident_id = f"META-{now_taipei().strftime('%Y%m%d%H%M%S')}"
try:
from src.services.telegram_gateway import get_telegram_gateway
await get_telegram_gateway().send_meta_alert(
incident_id=incident_id,
approval_id=str(uuid.uuid4()),
alertname="AI 自健診異常",
alert_category="flywheel_health",
diagnosis=diagnosis,
severity_level="critical",
system_impact=f"{len(violations)} 項 KPI 異常,飛輪自動化能力可能降級",
)
logger.warning(
"ai_slo_watchdog_alert_sent",
incident_id=incident_id,
violation_count=len(violations),
violations=violations,
)
except Exception as e:
logger.error("ai_slo_watchdog_telegram_failed", error=str(e), violations=violations)
async def _count_pending_no_tg_sent() -> int:
"""
查詢 PENDING 超過 30 分鐘且 Redis tg_sent:{fingerprint} 無確認的告警數量。
與 ADR-092 B2 修復配合B2 修復後新告警會標記 tg_sent
此查詢偵測仍存在的靜默告警B2 修復前殘留 + 未來潛在故障)。
"""
cutoff = datetime.now(UTC) - timedelta(minutes=30)
redis = get_redis()
silent = 0
async with get_db_context() as db:
result = await db.execute(
select(ApprovalRecord.id, ApprovalRecord.fingerprint)
.where(
and_(
ApprovalRecord.status == ApprovalStatus.PENDING,
ApprovalRecord.created_at <= cutoff,
ApprovalRecord.fingerprint.isnot(None),
)
)
.limit(20)
)
rows = result.all()
for row in rows:
fp = row.fingerprint
if fp and not await redis.exists(f"tg_sent:{fp}"):
silent += 1
return silent

View File

@@ -536,6 +536,15 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("offline_replay_loop_schedule_failed", error=str(e))
# ADR-092 (2026-04-20 ogt + Claude Opus 4.7): AI SLO Watchdog — 每 15 分鐘自健診
# MASTER §1.1系統必須能感知自身故障W-1 SLO + W-2 Telegram靜默 + W-3 飛輪成功率
try:
from src.jobs.ai_slo_watchdog_job import run_ai_slo_watchdog_loop
asyncio.create_task(run_ai_slo_watchdog_loop())
logger.info("ai_slo_watchdog_scheduled", interval_sec=900)
except Exception as e:
logger.warning("ai_slo_watchdog_schedule_failed", error=str(e))
yield
# Shutdown

View File

@@ -20,6 +20,7 @@ from uuid import UUID
import structlog
from sqlalchemy import and_, or_, select, update
from src.core.redis_client import get_redis
from src.core.trust_engine import classify_risk, get_required_signatures
from src.db.base import get_db_context
from src.db.models import ApprovalRecord, TimelineEvent
@@ -330,10 +331,45 @@ class ApprovalDBService:
hit_count=record.hit_count,
status=record.status.value if hasattr(record.status, 'value') else record.status,
)
# 2026-04-20 ogt + Claude Opus 4.7: ADR-092 tg_sent Redis 驗證
# PENDING 記錄不代表 Telegram 已發送(可能因網路/Token錯誤而靜默失敗
# 僅在 debounce 窗口外的 PENDING 收斂時,必須確認 Redis 有 tg_sent 標記
within_debounce = record.created_at >= cutoff_time
if not within_debounce:
try:
r = get_redis()
tg_confirmed = await r.exists(f"tg_sent:{fingerprint}")
except Exception as _re:
tg_confirmed = False
logger.warning("tg_sent_redis_check_failed", fingerprint=fingerprint, error=str(_re))
if not tg_confirmed:
logger.warning(
"fingerprint_pending_no_tg_confirmation",
fingerprint=fingerprint,
approval_id=str(record.id),
created_at=record.created_at.isoformat(),
)
return None # 視為新告警,重新發送 Telegram
return approval_record_to_request(record)
return None
async def mark_telegram_confirmed(self, fingerprint: str, ttl: int = 86400) -> None:
"""
2026-04-20 ogt + Claude Opus 4.7: ADR-092
記錄 Telegram 已成功發送,防止 PENDING 誤收斂造成永久靜默。
TTL 與 PENDING_TTL_HOURS 對齊24h
"""
try:
r = get_redis()
await r.setex(f"tg_sent:{fingerprint}", ttl, "1")
logger.debug("tg_sent_marked", fingerprint=fingerprint, ttl=ttl)
except Exception as e:
logger.warning("tg_sent_mark_failed", fingerprint=fingerprint, error=str(e))
async def increment_hit_count(
self,
approval_id: UUID,

View File

@@ -59,6 +59,18 @@ class DriftAdoptService:
self._repo = settings.GITEA_REPO_NAME
self._k8s_dir = pathlib.Path("k8s")
async def adopt_drift(self, report_id: str) -> dict:
"""
2026-04-20 ogt + Claude Opus 4.7: Telegram 按鈕呼叫入口
從 DB 載入 DriftReport 後委派給 adopt()。
telegram_gateway._handle_drift_action 呼叫此方法。
"""
from src.repositories.drift_repository import get_drift_repository
report = await get_drift_repository().get(report_id)
if not report:
return {"success": False, "message": f"Report {report_id} not found"}
return await self.adopt(report)
async def adopt(self, report: "DriftReport", field_description: str = "") -> dict:
"""
將漂移寫回 Git建立 branch + commit + PR

View File

@@ -537,8 +537,22 @@ class PlaybookService:
return None
# 應用更新
# 2026-04-20 ogt + Claude Opus 4.7: setattr 不觸發 Pydantic validation
# Evolver 傳入 PlaybookStatus.DEPRECATED.valuestr "deprecated"
# → _pg_upsert playbook.status.value 炸:'str' has no attribute 'value'
# 修Enum 欄位強制轉型,防止 str 混入 Playbook 物件
for field, value in update_data.items():
if value is not None and hasattr(playbook, field):
if field == "status" and isinstance(value, str) and not isinstance(value, PlaybookStatus):
try:
value = PlaybookStatus(value)
except ValueError:
pass
elif field == "source" and isinstance(value, str) and not isinstance(value, PlaybookSource):
try:
value = PlaybookSource(value)
except ValueError:
pass
setattr(playbook, field, value)
return await self._repository.update(playbook)