From 156a52f807e30bbfc067fa7788865507e12721f1 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 20 Apr 2026 20:00:06 +0800 Subject: [PATCH] =?UTF-8?q?fix(aiops):=20ADR-092=20=E4=B8=89=E4=BF=AE=20?= =?UTF-8?q?=E2=80=94=20Playbook=20enum=E5=B4=A9=E6=BD=B0=20+=20Telegram?= =?UTF-8?q?=E6=B0=B8=E4=B9=85=E9=9D=9C=E9=BB=98=20+=20=E6=8E=A1=E7=B4=8D?= =?UTF-8?q?=E5=A4=B1=E6=95=97=20+=20AI=E8=87=AA=E5=81=A5=E8=A8=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit B1 playbook_service.py: evolver setattr傳str而非PlaybookStatus enum → _pg_upsert playbook.status.value炸(163次/48h),修:update_with_validation強制enum轉型 B2 approval_db.py + webhooks.py: find_by_fingerprint PENDING誤收斂 → PENDING≠Telegram已發;修:成功push後mark tg_sent:{fingerprint} Redis(24h TTL) → find_by_fingerprint debounce窗外PENDING必須Redis確認才收斂 drift_adopt_service.py: telegram_gateway呼叫adopt_drift(report_id)但方法不存在 → 新增adopt_drift()包裝:從DB載入DriftReport後委派adopt(),修復採納失敗 B3 ai_slo_watchdog_job.py + main.py: AI無法感知自身故障(MASTER §1.1盲區) → 新增每15分鐘自健診:W-1 SLO違反 W-2 TG靜默偵測 W-3 飛輪成功率 → 任一異常→TYPE-8M send_meta_alert;Redis去重1h Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/api/src/api/v1/webhooks.py | 11 ++ apps/api/src/jobs/ai_slo_watchdog_job.py | 148 +++++++++++++++++++ apps/api/src/main.py | 9 ++ apps/api/src/services/approval_db.py | 36 +++++ apps/api/src/services/drift_adopt_service.py | 12 ++ apps/api/src/services/playbook_service.py | 14 ++ 6 files changed, 230 insertions(+) create mode 100644 apps/api/src/jobs/ai_slo_watchdog_job.py diff --git a/apps/api/src/api/v1/webhooks.py b/apps/api/src/api/v1/webhooks.py index 2767d3a0..e1beccce 100644 --- a/apps/api/src/api/v1/webhooks.py +++ b/apps/api/src/api/v1/webhooks.py @@ -312,6 +312,8 @@ async def _push_to_telegram_background( diff_summary: str = "", # 2026-04-12 ogt: ADR-075 斷點 E 修復 — alert_category 傳入以啟用 TYPE-8M 路由 alert_category: str = "", + # 2026-04-20 ogt: ADR-092 tg_sent Redis 標記(防收斂靜默) + fingerprint: str = "", ) -> None: """ 背景任務: 推送待簽核卡片到 Telegram (v7.0 含 SignOz 整合) @@ -347,6 +349,8 @@ async def _push_to_telegram_background( approval_id=approval_id, incident_id=incident_id, ) + if fingerprint: + await get_approval_service().mark_telegram_confirmed(fingerprint) return # 2026-04-12 ogt: ADR-075 斷點 E 修復 — TYPE-8M Meta-System 使用專屬卡片 @@ -367,6 +371,8 @@ async def _push_to_telegram_background( incident_id=incident_id, alert_category=alert_category, ) + if fingerprint: + await get_approval_service().mark_telegram_confirmed(fingerprint) return # 如果是收斂告警,在訊息中加入聚合次數 @@ -414,6 +420,8 @@ async def _push_to_telegram_background( ai_tokens=ai_tokens, ai_cost=f"${ai_cost:.6f}", ) + if fingerprint: + await get_approval_service().mark_telegram_confirmed(fingerprint) # 2026-04-08 Claude Code: 記錄 Telegram 推送事件 try: @@ -989,6 +997,7 @@ async def receive_alert( "disk_full": "storage", "ssl_expiry": "ssl_cert", }.get(alert.alert_type, "general"), + fingerprint=fingerprint, ) return AlertResponse( @@ -1204,6 +1213,7 @@ async def _process_new_alert_background( incident_id=incident_id, notification_type=notification_type, alert_category=alert_category, + fingerprint=fingerprint, ) record_alert_chain_success("alertmanager") @@ -1257,6 +1267,7 @@ async def _process_new_alert_background( incident_id=fallback_incident_id, notification_type=notification_type, alert_category=alert_category, + fingerprint=fingerprint, ) except Exception as e: diff --git a/apps/api/src/jobs/ai_slo_watchdog_job.py b/apps/api/src/jobs/ai_slo_watchdog_job.py new file mode 100644 index 00000000..02b1bc6a --- /dev/null +++ b/apps/api/src/jobs/ai_slo_watchdog_job.py @@ -0,0 +1,148 @@ +""" +AI SLO Watchdog Job — 系統自健診(每 15 分鐘) +============================================= +MASTER §1.1 AI 自主化方向:系統必須能感知自身故障。 +ADR-092 (2026-04-20 ogt + Claude Opus 4.7 Asia/Taipei) + +檢查項目: + W-1 AI SLO 違反(決策品質,7d 滾動) + W-2 Telegram 靜默偵測(PENDING 告警無 tg_sent 確認超過 30 分鐘) + W-3 飛輪 execution_success_rate 低落(< 30%) + +任一異常 → send_meta_alert(TYPE-8M,flywheel_health) +去重:Redis watchdog:alert:{dedup_hash} TTL 1h,避免每 15 分鐘重複洗版 +""" +from __future__ import annotations + +import asyncio +import uuid +from datetime import UTC, datetime, timedelta + +import structlog +from sqlalchemy import and_, select + +from src.core.redis_client import get_redis +from src.db.base import get_db_context +from src.db.models import ApprovalRecord +from src.models.approval import ApprovalStatus +from src.utils.timezone import now_taipei + +logger = structlog.get_logger(__name__) + +_INTERVAL_SEC = 900 # 每 15 分鐘 +_DEDUP_TTL_SEC = 3600 # 同一告警 1 小時內不重複發送 +_TG_SILENCE_THRESHOLD = 2 # PENDING 無 tg_sent 確認數量告警門檻 +_FLYWHEEL_SUCCESS_MIN = 0.30 # 執行成功率下限 + + +async def run_ai_slo_watchdog_loop() -> None: + """ + 永久迴圈:每 15 分鐘自健診,異常時發送 TYPE-8M Meta-System 告警。 + 由 main.py lifespan 透過 asyncio.create_task() 啟動。 + """ + logger.info("ai_slo_watchdog_started", interval_sec=_INTERVAL_SEC) + while True: + try: + await _check_once() + except Exception as e: + logger.warning("ai_slo_watchdog_error", error=str(e)) + await asyncio.sleep(_INTERVAL_SEC) + + +async def _check_once() -> None: + violations: list[str] = [] + + # W-1: AI SLO 違反(決策品質 7d 滾動) + try: + from src.services.ai_slo_calculator import AiSloCalculator + report = await AiSloCalculator().calculate() + if report.any_violated: + violated = [m.name for m in report.metrics if m.violated] + violations.append(f"SLO 違反: {', '.join(violated)}") + except Exception as e: + logger.warning("watchdog_w1_slo_check_failed", error=str(e)) + + # W-2: Telegram 靜默偵測(PENDING 無 tg_sent 確認 > 30 分鐘) + try: + silent_count = await _count_pending_no_tg_sent() + if silent_count >= _TG_SILENCE_THRESHOLD: + violations.append(f"{silent_count} 個 PENDING 告警超 30 分鐘無 Telegram 確認(疑似靜默故障)") + except Exception as e: + logger.warning("watchdog_w2_tg_silence_check_failed", error=str(e)) + + # W-3: 飛輪執行成功率過低 + try: + from src.services.flywheel_stats_service import FlywheelStatsService + metrics = await FlywheelStatsService().compute() + if metrics and metrics.execution_success_rate < _FLYWHEEL_SUCCESS_MIN: + violations.append(f"飛輪執行成功率 {metrics.execution_success_rate:.1%} < {_FLYWHEEL_SUCCESS_MIN:.0%}") + except Exception as e: + logger.warning("watchdog_w3_flywheel_check_failed", error=str(e)) + + if not violations: + logger.debug("ai_slo_watchdog_all_ok", checks=3) + return + + # 去重:violations 相同內容 1 小時內不重複發 + dedup_hash = f"{hash(tuple(sorted(violations))) & 0xFFFFFF:06x}" + dedup_key = f"watchdog:alert:{dedup_hash}" + redis = get_redis() + if await redis.exists(dedup_key): + logger.debug("ai_slo_watchdog_deduped", key=dedup_key) + return + await redis.setex(dedup_key, _DEDUP_TTL_SEC, "1") + + # 發送 TYPE-8M Meta-System 告警 + diagnosis = " | ".join(violations) + incident_id = f"META-{now_taipei().strftime('%Y%m%d%H%M%S')}" + try: + from src.services.telegram_gateway import get_telegram_gateway + await get_telegram_gateway().send_meta_alert( + incident_id=incident_id, + approval_id=str(uuid.uuid4()), + alertname="AI 自健診異常", + alert_category="flywheel_health", + diagnosis=diagnosis, + severity_level="critical", + system_impact=f"{len(violations)} 項 KPI 異常,飛輪自動化能力可能降級", + ) + logger.warning( + "ai_slo_watchdog_alert_sent", + incident_id=incident_id, + violation_count=len(violations), + violations=violations, + ) + except Exception as e: + logger.error("ai_slo_watchdog_telegram_failed", error=str(e), violations=violations) + + +async def _count_pending_no_tg_sent() -> int: + """ + 查詢 PENDING 超過 30 分鐘且 Redis tg_sent:{fingerprint} 無確認的告警數量。 + 與 ADR-092 B2 修復配合:B2 修復後新告警會標記 tg_sent; + 此查詢偵測仍存在的靜默告警(B2 修復前殘留 + 未來潛在故障)。 + """ + cutoff = datetime.now(UTC) - timedelta(minutes=30) + redis = get_redis() + silent = 0 + + async with get_db_context() as db: + result = await db.execute( + select(ApprovalRecord.id, ApprovalRecord.fingerprint) + .where( + and_( + ApprovalRecord.status == ApprovalStatus.PENDING, + ApprovalRecord.created_at <= cutoff, + ApprovalRecord.fingerprint.isnot(None), + ) + ) + .limit(20) + ) + rows = result.all() + + for row in rows: + fp = row.fingerprint + if fp and not await redis.exists(f"tg_sent:{fp}"): + silent += 1 + + return silent diff --git a/apps/api/src/main.py b/apps/api/src/main.py index 37f86e1c..1ad6a07c 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -536,6 +536,15 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: except Exception as e: logger.warning("offline_replay_loop_schedule_failed", error=str(e)) + # ADR-092 (2026-04-20 ogt + Claude Opus 4.7): AI SLO Watchdog — 每 15 分鐘自健診 + # MASTER §1.1:系統必須能感知自身故障,W-1 SLO + W-2 Telegram靜默 + W-3 飛輪成功率 + try: + from src.jobs.ai_slo_watchdog_job import run_ai_slo_watchdog_loop + asyncio.create_task(run_ai_slo_watchdog_loop()) + logger.info("ai_slo_watchdog_scheduled", interval_sec=900) + except Exception as e: + logger.warning("ai_slo_watchdog_schedule_failed", error=str(e)) + yield # Shutdown diff --git a/apps/api/src/services/approval_db.py b/apps/api/src/services/approval_db.py index fcd71664..f5739f57 100644 --- a/apps/api/src/services/approval_db.py +++ b/apps/api/src/services/approval_db.py @@ -20,6 +20,7 @@ from uuid import UUID import structlog from sqlalchemy import and_, or_, select, update +from src.core.redis_client import get_redis from src.core.trust_engine import classify_risk, get_required_signatures from src.db.base import get_db_context from src.db.models import ApprovalRecord, TimelineEvent @@ -330,10 +331,45 @@ class ApprovalDBService: hit_count=record.hit_count, status=record.status.value if hasattr(record.status, 'value') else record.status, ) + + # 2026-04-20 ogt + Claude Opus 4.7: ADR-092 tg_sent Redis 驗證 + # PENDING 記錄不代表 Telegram 已發送(可能因網路/Token錯誤而靜默失敗) + # 僅在 debounce 窗口外的 PENDING 收斂時,必須確認 Redis 有 tg_sent 標記 + within_debounce = record.created_at >= cutoff_time + if not within_debounce: + try: + r = get_redis() + tg_confirmed = await r.exists(f"tg_sent:{fingerprint}") + except Exception as _re: + tg_confirmed = False + logger.warning("tg_sent_redis_check_failed", fingerprint=fingerprint, error=str(_re)) + + if not tg_confirmed: + logger.warning( + "fingerprint_pending_no_tg_confirmation", + fingerprint=fingerprint, + approval_id=str(record.id), + created_at=record.created_at.isoformat(), + ) + return None # 視為新告警,重新發送 Telegram + return approval_record_to_request(record) return None + async def mark_telegram_confirmed(self, fingerprint: str, ttl: int = 86400) -> None: + """ + 2026-04-20 ogt + Claude Opus 4.7: ADR-092 + 記錄 Telegram 已成功發送,防止 PENDING 誤收斂造成永久靜默。 + TTL 與 PENDING_TTL_HOURS 對齊(24h)。 + """ + try: + r = get_redis() + await r.setex(f"tg_sent:{fingerprint}", ttl, "1") + logger.debug("tg_sent_marked", fingerprint=fingerprint, ttl=ttl) + except Exception as e: + logger.warning("tg_sent_mark_failed", fingerprint=fingerprint, error=str(e)) + async def increment_hit_count( self, approval_id: UUID, diff --git a/apps/api/src/services/drift_adopt_service.py b/apps/api/src/services/drift_adopt_service.py index 26fd8c9b..f049c87b 100644 --- a/apps/api/src/services/drift_adopt_service.py +++ b/apps/api/src/services/drift_adopt_service.py @@ -59,6 +59,18 @@ class DriftAdoptService: self._repo = settings.GITEA_REPO_NAME self._k8s_dir = pathlib.Path("k8s") + async def adopt_drift(self, report_id: str) -> dict: + """ + 2026-04-20 ogt + Claude Opus 4.7: Telegram 按鈕呼叫入口 + 從 DB 載入 DriftReport 後委派給 adopt()。 + telegram_gateway._handle_drift_action 呼叫此方法。 + """ + from src.repositories.drift_repository import get_drift_repository + report = await get_drift_repository().get(report_id) + if not report: + return {"success": False, "message": f"Report {report_id} not found"} + return await self.adopt(report) + async def adopt(self, report: "DriftReport", field_description: str = "") -> dict: """ 將漂移寫回 Git:建立 branch + commit + PR diff --git a/apps/api/src/services/playbook_service.py b/apps/api/src/services/playbook_service.py index a4259bae..703b10fb 100644 --- a/apps/api/src/services/playbook_service.py +++ b/apps/api/src/services/playbook_service.py @@ -537,8 +537,22 @@ class PlaybookService: return None # 應用更新 + # 2026-04-20 ogt + Claude Opus 4.7: setattr 不觸發 Pydantic validation + # Evolver 傳入 PlaybookStatus.DEPRECATED.value(str "deprecated") + # → _pg_upsert playbook.status.value 炸:'str' has no attribute 'value' + # 修:Enum 欄位強制轉型,防止 str 混入 Playbook 物件 for field, value in update_data.items(): if value is not None and hasattr(playbook, field): + if field == "status" and isinstance(value, str) and not isinstance(value, PlaybookStatus): + try: + value = PlaybookStatus(value) + except ValueError: + pass + elif field == "source" and isinstance(value, str) and not isinstance(value, PlaybookSource): + try: + value = PlaybookSource(value) + except ValueError: + pass setattr(playbook, field, value) return await self._repository.update(playbook)