fix(aiops): ADR-092 三修 — Playbook enum崩潰 + Telegram永久靜默 + 採納失敗 + AI自健診
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 13m33s
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 13m33s
B1 playbook_service.py: evolver setattr傳str而非PlaybookStatus enum
→ _pg_upsert playbook.status.value炸(163次/48h),修:update_with_validation強制enum轉型
B2 approval_db.py + webhooks.py: find_by_fingerprint PENDING誤收斂
→ PENDING≠Telegram已發;修:成功push後mark tg_sent:{fingerprint} Redis(24h TTL)
→ find_by_fingerprint debounce窗外PENDING必須Redis確認才收斂
drift_adopt_service.py: telegram_gateway呼叫adopt_drift(report_id)但方法不存在
→ 新增adopt_drift()包裝:從DB載入DriftReport後委派adopt(),修復採納失敗
B3 ai_slo_watchdog_job.py + main.py: AI無法感知自身故障(MASTER §1.1盲區)
→ 新增每15分鐘自健診:W-1 SLO違反 W-2 TG靜默偵測 W-3 飛輪成功率
→ 任一異常→TYPE-8M send_meta_alert;Redis去重1h
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -312,6 +312,8 @@ async def _push_to_telegram_background(
|
||||
diff_summary: str = "",
|
||||
# 2026-04-12 ogt: ADR-075 斷點 E 修復 — alert_category 傳入以啟用 TYPE-8M 路由
|
||||
alert_category: str = "",
|
||||
# 2026-04-20 ogt: ADR-092 tg_sent Redis 標記(防收斂靜默)
|
||||
fingerprint: str = "",
|
||||
) -> None:
|
||||
"""
|
||||
背景任務: 推送待簽核卡片到 Telegram (v7.0 含 SignOz 整合)
|
||||
@@ -347,6 +349,8 @@ async def _push_to_telegram_background(
|
||||
approval_id=approval_id,
|
||||
incident_id=incident_id,
|
||||
)
|
||||
if fingerprint:
|
||||
await get_approval_service().mark_telegram_confirmed(fingerprint)
|
||||
return
|
||||
|
||||
# 2026-04-12 ogt: ADR-075 斷點 E 修復 — TYPE-8M Meta-System 使用專屬卡片
|
||||
@@ -367,6 +371,8 @@ async def _push_to_telegram_background(
|
||||
incident_id=incident_id,
|
||||
alert_category=alert_category,
|
||||
)
|
||||
if fingerprint:
|
||||
await get_approval_service().mark_telegram_confirmed(fingerprint)
|
||||
return
|
||||
|
||||
# 如果是收斂告警,在訊息中加入聚合次數
|
||||
@@ -414,6 +420,8 @@ async def _push_to_telegram_background(
|
||||
ai_tokens=ai_tokens,
|
||||
ai_cost=f"${ai_cost:.6f}",
|
||||
)
|
||||
if fingerprint:
|
||||
await get_approval_service().mark_telegram_confirmed(fingerprint)
|
||||
|
||||
# 2026-04-08 Claude Code: 記錄 Telegram 推送事件
|
||||
try:
|
||||
@@ -989,6 +997,7 @@ async def receive_alert(
|
||||
"disk_full": "storage",
|
||||
"ssl_expiry": "ssl_cert",
|
||||
}.get(alert.alert_type, "general"),
|
||||
fingerprint=fingerprint,
|
||||
)
|
||||
|
||||
return AlertResponse(
|
||||
@@ -1204,6 +1213,7 @@ async def _process_new_alert_background(
|
||||
incident_id=incident_id,
|
||||
notification_type=notification_type,
|
||||
alert_category=alert_category,
|
||||
fingerprint=fingerprint,
|
||||
)
|
||||
|
||||
record_alert_chain_success("alertmanager")
|
||||
@@ -1257,6 +1267,7 @@ async def _process_new_alert_background(
|
||||
incident_id=fallback_incident_id,
|
||||
notification_type=notification_type,
|
||||
alert_category=alert_category,
|
||||
fingerprint=fingerprint,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
|
||||
148
apps/api/src/jobs/ai_slo_watchdog_job.py
Normal file
148
apps/api/src/jobs/ai_slo_watchdog_job.py
Normal file
@@ -0,0 +1,148 @@
|
||||
"""
|
||||
AI SLO Watchdog Job — 系統自健診(每 15 分鐘)
|
||||
=============================================
|
||||
MASTER §1.1 AI 自主化方向:系統必須能感知自身故障。
|
||||
ADR-092 (2026-04-20 ogt + Claude Opus 4.7 Asia/Taipei)
|
||||
|
||||
檢查項目:
|
||||
W-1 AI SLO 違反(決策品質,7d 滾動)
|
||||
W-2 Telegram 靜默偵測(PENDING 告警無 tg_sent 確認超過 30 分鐘)
|
||||
W-3 飛輪 execution_success_rate 低落(< 30%)
|
||||
|
||||
任一異常 → send_meta_alert(TYPE-8M,flywheel_health)
|
||||
去重:Redis watchdog:alert:{dedup_hash} TTL 1h,避免每 15 分鐘重複洗版
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import uuid
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
import structlog
|
||||
from sqlalchemy import and_, select
|
||||
|
||||
from src.core.redis_client import get_redis
|
||||
from src.db.base import get_db_context
|
||||
from src.db.models import ApprovalRecord
|
||||
from src.models.approval import ApprovalStatus
|
||||
from src.utils.timezone import now_taipei
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
_INTERVAL_SEC = 900 # 每 15 分鐘
|
||||
_DEDUP_TTL_SEC = 3600 # 同一告警 1 小時內不重複發送
|
||||
_TG_SILENCE_THRESHOLD = 2 # PENDING 無 tg_sent 確認數量告警門檻
|
||||
_FLYWHEEL_SUCCESS_MIN = 0.30 # 執行成功率下限
|
||||
|
||||
|
||||
async def run_ai_slo_watchdog_loop() -> None:
    """Run the self-health check forever, pausing ``_INTERVAL_SEC`` between passes.

    Scheduled from the main.py lifespan via ``asyncio.create_task()``.
    A failing pass is logged as a warning and never terminates the loop;
    the first pass runs immediately, before the first sleep.
    """

    async def _guarded_pass() -> None:
        # One pass; any ordinary exception is logged instead of propagated.
        try:
            await _check_once()
        except Exception as exc:
            logger.warning("ai_slo_watchdog_error", error=str(exc))

    logger.info("ai_slo_watchdog_started", interval_sec=_INTERVAL_SEC)
    while True:
        await _guarded_pass()
        await asyncio.sleep(_INTERVAL_SEC)
|
||||
|
||||
|
||||
def _violations_dedup_key(violations: list[str]) -> str:
    """Return the Redis de-duplication key for a list of violation messages.

    The key is derived from a SHA-256 digest of the sorted messages, so it is
    independent of message order and identical in every process. The built-in
    ``hash()`` is salted per interpreter for ``str`` and would yield a
    different key in each worker and after each restart.
    """
    import hashlib

    canonical = "\n".join(sorted(violations))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]
    return f"watchdog:alert:{digest}"


async def _check_once() -> None:
    """Run one self-health pass and send a TYPE-8M meta alert on any violation.

    Checks performed (each one is isolated: a failing check is logged and
    skipped, it does not abort the others):

    * W-1: AI SLO violations reported by ``AiSloCalculator``.
    * W-2: PENDING approvals older than 30 minutes without a ``tg_sent`` marker.
    * W-3: flywheel ``execution_success_rate`` below ``_FLYWHEEL_SUCCESS_MIN``.

    When at least one violation is found, an alert is sent through the Telegram
    gateway unless an identical set of violation messages was already delivered
    within ``_DEDUP_TTL_SEC``. The de-duplication marker is written only after
    a successful send, so a delivery failure is retried on the next pass
    instead of being silenced for the whole TTL.

    Raises:
        Exception: Redis errors from the de-duplication lookup/store are not
            caught here; the caller loop logs them.
    """
    violations: list[str] = []

    # W-1: AI SLO violations.
    try:
        from src.services.ai_slo_calculator import AiSloCalculator

        report = await AiSloCalculator().calculate()
        if report.any_violated:
            violated = [m.name for m in report.metrics if m.violated]
            violations.append(f"SLO 違反: {', '.join(violated)}")
    except Exception as e:
        logger.warning("watchdog_w1_slo_check_failed", error=str(e))

    # W-2: Telegram silence detection.
    try:
        silent_count = await _count_pending_no_tg_sent()
        if silent_count >= _TG_SILENCE_THRESHOLD:
            violations.append(f"{silent_count} 個 PENDING 告警超 30 分鐘無 Telegram 確認(疑似靜默故障)")
    except Exception as e:
        logger.warning("watchdog_w2_tg_silence_check_failed", error=str(e))

    # W-3: flywheel execution success rate too low.
    try:
        from src.services.flywheel_stats_service import FlywheelStatsService

        metrics = await FlywheelStatsService().compute()
        if metrics and metrics.execution_success_rate < _FLYWHEEL_SUCCESS_MIN:
            violations.append(f"飛輪執行成功率 {metrics.execution_success_rate:.1%} < {_FLYWHEEL_SUCCESS_MIN:.0%}")
    except Exception as e:
        logger.warning("watchdog_w3_flywheel_check_failed", error=str(e))

    if not violations:
        logger.debug("ai_slo_watchdog_all_ok", checks=3)
        return

    # De-duplicate: the same set of violation messages is sent at most once
    # per TTL window. The key must be stable across processes (see helper).
    dedup_key = _violations_dedup_key(violations)
    redis = get_redis()
    if await redis.exists(dedup_key):
        logger.debug("ai_slo_watchdog_deduped", key=dedup_key)
        return

    # Send the TYPE-8M Meta-System alert.
    diagnosis = " | ".join(violations)
    incident_id = f"META-{now_taipei().strftime('%Y%m%d%H%M%S')}"
    try:
        from src.services.telegram_gateway import get_telegram_gateway

        await get_telegram_gateway().send_meta_alert(
            incident_id=incident_id,
            approval_id=str(uuid.uuid4()),
            alertname="AI 自健診異常",
            alert_category="flywheel_health",
            diagnosis=diagnosis,
            severity_level="critical",
            system_impact=f"{len(violations)} 項 KPI 異常,飛輪自動化能力可能降級",
        )
    except Exception as e:
        # Do not write the dedup marker: the alert was not delivered, so the
        # next pass must be allowed to try again.
        logger.error("ai_slo_watchdog_telegram_failed", error=str(e), violations=violations)
        return

    logger.warning(
        "ai_slo_watchdog_alert_sent",
        incident_id=incident_id,
        violation_count=len(violations),
        violations=violations,
    )
    # Mark as delivered only now, so a failed send is never suppressed.
    await redis.setex(dedup_key, _DEDUP_TTL_SEC, "1")
|
||||
|
||||
|
||||
async def _count_pending_no_tg_sent() -> int:
    """Count stale PENDING approvals that have no ``tg_sent:{fingerprint}`` key.

    Selects up to 20 ``ApprovalRecord`` rows that are PENDING, were created at
    least 30 minutes ago and have a non-null fingerprint, then checks Redis for
    each fingerprint. Because of the row limit the result is at most 20.

    Complements the ADR-092 B2 fix, which marks ``tg_sent`` after a successful
    push: rows counted here are approvals for which no such marker exists.

    Returns:
        Number of inspected rows whose fingerprint has no ``tg_sent`` marker.
    """
    stale_before = datetime.now(UTC) - timedelta(minutes=30)
    redis = get_redis()

    stmt = (
        select(ApprovalRecord.id, ApprovalRecord.fingerprint)
        .where(
            and_(
                ApprovalRecord.status == ApprovalStatus.PENDING,
                ApprovalRecord.created_at <= stale_before,
                ApprovalRecord.fingerprint.isnot(None),
            )
        )
        .limit(20)
    )

    async with get_db_context() as db:
        pending_rows = (await db.execute(stmt)).all()

    unconfirmed = 0
    for pending in pending_rows:
        fingerprint = pending.fingerprint
        if not fingerprint:
            # Empty fingerprints cannot be looked up; they are not counted.
            continue
        if await redis.exists(f"tg_sent:{fingerprint}"):
            continue
        unconfirmed += 1

    return unconfirmed
|
||||
@@ -536,6 +536,15 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
except Exception as e:
|
||||
logger.warning("offline_replay_loop_schedule_failed", error=str(e))
|
||||
|
||||
# ADR-092 (2026-04-20 ogt + Claude Opus 4.7): AI SLO Watchdog — 每 15 分鐘自健診
|
||||
# MASTER §1.1:系統必須能感知自身故障,W-1 SLO + W-2 Telegram靜默 + W-3 飛輪成功率
|
||||
try:
|
||||
from src.jobs.ai_slo_watchdog_job import run_ai_slo_watchdog_loop
|
||||
asyncio.create_task(run_ai_slo_watchdog_loop())
|
||||
logger.info("ai_slo_watchdog_scheduled", interval_sec=900)
|
||||
except Exception as e:
|
||||
logger.warning("ai_slo_watchdog_schedule_failed", error=str(e))
|
||||
|
||||
yield
|
||||
|
||||
# Shutdown
|
||||
|
||||
@@ -20,6 +20,7 @@ from uuid import UUID
|
||||
import structlog
|
||||
from sqlalchemy import and_, or_, select, update
|
||||
|
||||
from src.core.redis_client import get_redis
|
||||
from src.core.trust_engine import classify_risk, get_required_signatures
|
||||
from src.db.base import get_db_context
|
||||
from src.db.models import ApprovalRecord, TimelineEvent
|
||||
@@ -330,10 +331,45 @@ class ApprovalDBService:
|
||||
hit_count=record.hit_count,
|
||||
status=record.status.value if hasattr(record.status, 'value') else record.status,
|
||||
)
|
||||
|
||||
# 2026-04-20 ogt + Claude Opus 4.7: ADR-092 tg_sent Redis 驗證
|
||||
# PENDING 記錄不代表 Telegram 已發送(可能因網路/Token錯誤而靜默失敗)
|
||||
# 僅在 debounce 窗口外的 PENDING 收斂時,必須確認 Redis 有 tg_sent 標記
|
||||
within_debounce = record.created_at >= cutoff_time
|
||||
if not within_debounce:
|
||||
try:
|
||||
r = get_redis()
|
||||
tg_confirmed = await r.exists(f"tg_sent:{fingerprint}")
|
||||
except Exception as _re:
|
||||
tg_confirmed = False
|
||||
logger.warning("tg_sent_redis_check_failed", fingerprint=fingerprint, error=str(_re))
|
||||
|
||||
if not tg_confirmed:
|
||||
logger.warning(
|
||||
"fingerprint_pending_no_tg_confirmation",
|
||||
fingerprint=fingerprint,
|
||||
approval_id=str(record.id),
|
||||
created_at=record.created_at.isoformat(),
|
||||
)
|
||||
return None # 視為新告警,重新發送 Telegram
|
||||
|
||||
return approval_record_to_request(record)
|
||||
|
||||
return None
|
||||
|
||||
async def mark_telegram_confirmed(self, fingerprint: str, ttl: int = 86400) -> None:
    """Record in Redis that the Telegram push for ``fingerprint`` succeeded.

    ADR-092: the ``tg_sent:{fingerprint}`` marker tells the fingerprint lookup
    that a PENDING record was actually delivered to Telegram, so that a PENDING
    record without delivery is not converged into permanent silence.

    Best-effort: any Redis error is logged as a warning and swallowed.

    Args:
        fingerprint: Alert fingerprint used as the key suffix.
        ttl: Marker lifetime in seconds; defaults to 86400 (24 hours).
    """
    marker_key = f"tg_sent:{fingerprint}"
    try:
        redis_client = get_redis()
        await redis_client.setex(marker_key, ttl, "1")
        logger.debug("tg_sent_marked", fingerprint=fingerprint, ttl=ttl)
    except Exception as exc:
        logger.warning("tg_sent_mark_failed", fingerprint=fingerprint, error=str(exc))
|
||||
|
||||
async def increment_hit_count(
|
||||
self,
|
||||
approval_id: UUID,
|
||||
|
||||
@@ -59,6 +59,18 @@ class DriftAdoptService:
|
||||
self._repo = settings.GITEA_REPO_NAME
|
||||
self._k8s_dir = pathlib.Path("k8s")
|
||||
|
||||
async def adopt_drift(self, report_id: str) -> dict:
    """Entry point for the Telegram button: adopt a drift report by its id.

    Loads the report through the drift repository and delegates to
    ``adopt()``. Called by ``telegram_gateway._handle_drift_action``.

    Args:
        report_id: Identifier of the drift report to load.

    Returns:
        The result of ``adopt()`` when the report exists; otherwise a dict
        with ``success`` set to ``False`` and a "not found" ``message``.
    """
    from src.repositories.drift_repository import get_drift_repository

    repository = get_drift_repository()
    drift_report = await repository.get(report_id)
    if drift_report:
        return await self.adopt(drift_report)
    return {"success": False, "message": f"Report {report_id} not found"}
|
||||
|
||||
async def adopt(self, report: "DriftReport", field_description: str = "") -> dict:
|
||||
"""
|
||||
將漂移寫回 Git:建立 branch + commit + PR
|
||||
|
||||
@@ -537,8 +537,22 @@ class PlaybookService:
|
||||
return None
|
||||
|
||||
# 應用更新
|
||||
# 2026-04-20 ogt + Claude Opus 4.7: setattr 不觸發 Pydantic validation
|
||||
# Evolver 傳入 PlaybookStatus.DEPRECATED.value(str "deprecated")
|
||||
# → _pg_upsert playbook.status.value 炸:'str' has no attribute 'value'
|
||||
# 修:Enum 欄位強制轉型,防止 str 混入 Playbook 物件
|
||||
for field, value in update_data.items():
|
||||
if value is not None and hasattr(playbook, field):
|
||||
if field == "status" and isinstance(value, str) and not isinstance(value, PlaybookStatus):
|
||||
try:
|
||||
value = PlaybookStatus(value)
|
||||
except ValueError:
|
||||
pass
|
||||
elif field == "source" and isinstance(value, str) and not isinstance(value, PlaybookSource):
|
||||
try:
|
||||
value = PlaybookSource(value)
|
||||
except ValueError:
|
||||
pass
|
||||
setattr(playbook, field, value)
|
||||
|
||||
return await self._repository.update(playbook)
|
||||
|
||||
Reference in New Issue
Block a user