diff --git a/apps/api/src/jobs/ai_slo_watchdog_job.py b/apps/api/src/jobs/ai_slo_watchdog_job.py index 29d0f676..6d4b39d9 100644 --- a/apps/api/src/jobs/ai_slo_watchdog_job.py +++ b/apps/api/src/jobs/ai_slo_watchdog_job.py @@ -108,6 +108,7 @@ async def _check_once() -> None: # 修法:dedup 用穩定 violation_codes(W-N:type 格式),Telegram 照常顯示動態值 violations: list[str] = [] violation_codes: list[str] = [] + probable_causes: list[str] = [] # A3 修復:cluster-shared grace period,單次查詢供所有 W-check 使用,避免 Pod 間不一致 grace = await _is_grace_active() @@ -117,7 +118,10 @@ async def _check_once() -> None: report = await AiSloCalculator().calculate() if report.any_violated: violated = [m.name for m in report.metrics if m.violated] - violations.append(f"SLO 違反: {', '.join(violated)}") + w1_line, w1_cause = _format_slo_violation_for_alert(report, violated) + violations.append(w1_line) + if w1_cause: + probable_causes.append(w1_cause) violation_codes.append(f"W1:slo_violated:{','.join(sorted(violated))}") except Exception as e: logger.warning("watchdog_w1_slo_check_failed", error=str(e)) @@ -261,7 +265,9 @@ async def _check_once() -> None: *violation_lines, ] ) - probable_cause = "治理異常與執行資料同時異常,建議先核對 AI SLO 指標與最近自修復任務執行紀錄" + probable_cause = "\n".join(probable_causes) if probable_causes else ( + "治理異常與執行資料同時異常,建議先核對 AI SLO 指標與最近自修復任務執行紀錄" + ) # 發送 TYPE-8M Meta-System 告警 # 重大異常:超過 2 項即升為 critical,便於前線分流;1-2 項走 warning @@ -290,6 +296,77 @@ async def _check_once() -> None: logger.error("ai_slo_watchdog_telegram_failed", error=str(e), violations=violations) +def _format_slo_violation_for_alert(report, violated: list[str]) -> tuple[str, str | None]: + """把 W-1 診斷資料壓成 Telegram 可讀摘要,dedup key 仍沿用穩定 code。""" + if "auto_execute_success_rate" not in violated: + return f"SLO 違反: {', '.join(violated)}", None + + diagnostics = getattr(report, "diagnostics", {}) or {} + diag = diagnostics.get("auto_execute_success_rate") or {} + summary = diag.get("summary") or {} + total = int(summary.get("total") or 0) + 
success = int(summary.get("success") or 0) + rate = summary.get("rate") + threshold = summary.get("threshold") + sealed = int(diag.get("sealed_failure_group_count") or 0) + open_groups = int(diag.get("open_failure_group_count") or 0) + needed = int(diag.get("immediate_successes_needed") or 0) + projected = _short_taipei_time(diag.get("projected_green_at")) + + if isinstance(rate, (int, float)) and isinstance(threshold, (int, float)): + line = ( + f"SLO 違反: auto_execute_success_rate " + f"({success}/{total}={rate:.1%},門檻 {threshold:.0%};" + f"已封口群組 {sealed},待查群組 {open_groups}" + ) + if projected: + line += f";預估 {projected} 回綠" + elif needed: + line += f";需新增成功 {needed} 次" + line += ")" + else: + line = "SLO 違反: auto_execute_success_rate(診斷資料不足)" + + groups = diag.get("top_failure_groups") or [] + group_lines = [] + for group in groups[:3]: + label = group.get("closure_status") or "unknown" + group_lines.append( + f"{group.get('alertname', 'unknown')}/{group.get('playbook_id', 'unknown')}" + f"×{group.get('count', 0)}={label}" + ) + + cause_parts = [ + f"auto_execute_success_rate 仍在 7 日滾動窗內偏低:{success}/{total}" + if total else "auto_execute_success_rate 診斷資料不足", + ] + if group_lines: + cause_parts.append("Top failure groups: " + ";".join(group_lines)) + if sealed and not open_groups: + cause_parts.append("目前已知失敗來源已封口,狀態是等待舊失敗滾出 7 日視窗。") + if projected: + cause_parts.append(f"若沒有新失敗,預估 {projected} 自然回綠;不需要重啟服務或改寫歷史資料。") + elif needed: + cause_parts.append(f"若要立即回綠,需要新增 {needed} 次真實成功自動修復樣本。") + if open_groups: + cause_parts.append("仍有未封口失敗群組,請反查 truth-chain、PlayBook 與 MCP 執行紀錄。") + + return line, "\n".join(cause_parts) + + +def _short_taipei_time(value: str | None) -> str | None: + if not value: + return None + try: + parsed = datetime.fromisoformat(value) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=UTC) + taipei = parsed.astimezone(now_taipei().tzinfo) + return taipei.strftime("%m/%d %H:%M") + except Exception: + return None + + async def 
_count_pending_no_tg_sent() -> int: """ 查詢真正靜默的 PENDING 告警:PENDING 超過 30 分鐘且 telegram_message_id IS NULL。 diff --git a/apps/api/src/services/ai_slo_calculator.py b/apps/api/src/services/ai_slo_calculator.py index aab4c5c6..b8ec06c4 100644 --- a/apps/api/src/services/ai_slo_calculator.py +++ b/apps/api/src/services/ai_slo_calculator.py @@ -23,7 +23,9 @@ from __future__ import annotations import json from dataclasses import dataclass, field -from datetime import timedelta +from datetime import datetime, timedelta +from math import ceil +from typing import Any import structlog from sqlalchemy import func, select, text @@ -81,6 +83,7 @@ class SloReport: any_violated: bool = False calculated_at: str = field(default_factory=lambda: now_taipei().isoformat()) window_days: int = SLO_WINDOW_DAYS + diagnostics: dict[str, Any] = field(default_factory=dict) def to_dict(self) -> dict: return { @@ -99,6 +102,7 @@ class SloReport: } for m in self.metrics ], + "diagnostics": self.diagnostics, } @@ -131,6 +135,11 @@ class AiSloCalculator: slo1 = await self._calc_auto_success_rate(session, since) slo2 = await self._calc_human_override_rate(session, since) slo3 = await self._calc_false_neg_rate(session, since) + diagnostics = {} + if slo1.violated: + diagnostics["auto_execute_success_rate"] = ( + await self._build_auto_success_diagnostics(session, since) + ) metrics = [slo1, slo2, slo3] any_violated = any(m.violated for m in metrics) @@ -138,6 +147,7 @@ class AiSloCalculator: report = SloReport( metrics=metrics, any_violated=any_violated, + diagnostics=diagnostics, ) logger.info( @@ -189,6 +199,7 @@ class AiSloCalculator: any_violated=data.get("any_violated", False), calculated_at=data.get("calculated_at", ""), window_days=data.get("window_days", SLO_WINDOW_DAYS), + diagnostics=data.get("diagnostics", {}), ) except Exception as e: logger.warning("slo_cache_read_error", error=str(e)) @@ -403,6 +414,264 @@ class AiSloCalculator: direction="below", sample_count=0, violated=False, ) + 
async def _build_auto_success_diagnostics(self, session, since) -> dict[str, Any]:
    """Load auto-repair executions since ``since`` and derive W-1 diagnostics.

    Each execution row is joined with its incident to resolve an alert name,
    then handed to ``build_auto_execute_success_diagnostics``. Any failure is
    logged and converted into a ``diagnostics_unavailable`` payload so the
    caller always receives a dict.
    """
    try:
        statement = text("""
            SELECT
                are.incident_id,
                are.playbook_id,
                are.playbook_name,
                are.success,
                are.error_message,
                are.created_at,
                COALESCE(
                    inc.signals->0->>'alertname',
                    inc.signals->0->'labels'->>'alertname',
                    inc.signals->0->>'alert_name',
                    inc.affected_services->>0,
                    'unknown'
                ) AS alertname
            FROM auto_repair_executions are
            LEFT JOIN incidents inc ON inc.incident_id = are.incident_id
            WHERE are.created_at >= :since
            ORDER BY are.created_at ASC
        """)
        cursor = await session.execute(statement, {"since": since})
        executions = [dict(record._mapping) for record in cursor]
        return build_auto_execute_success_diagnostics(
            rows=executions,
            now=now_taipei(),
            threshold=SLO_AUTO_SUCCESS_MIN,
            window_days=SLO_WINDOW_DAYS,
            min_samples=SLO_MIN_SAMPLES,
        )
    except Exception as exc:
        # Diagnostics are auxiliary: report the problem in the payload
        # (error text capped at 200 characters) instead of propagating.
        logger.warning("slo1_diagnostics_error", error=str(exc))
        unavailable: dict[str, Any] = {
            "schema_version": "ai_slo_auto_execute_diagnostics_v1",
            "status": "diagnostics_unavailable",
            "error": str(exc)[:200],
        }
        return unavailable
def build_auto_execute_success_diagnostics(
    rows: list[dict[str, Any]],
    now: datetime,
    threshold: float = SLO_AUTO_SUCCESS_MIN,
    window_days: int = SLO_WINDOW_DAYS,
    min_samples: int = SLO_MIN_SAMPLES,
) -> dict[str, Any]:
    """Build the W-1 diagnostics payload from ``auto_repair_executions`` rows.

    Pure logic with no I/O, so the same semantics can be shared by every
    consumer and the rolling-window projection can be unit tested.

    Args:
        rows: Execution rows as dicts; ``success`` and ``created_at`` are read
            here, the remaining keys by the grouping helper.
        now: Reference time for the projection; also the sort key for rows
            without a ``created_at``.
        threshold: Minimum success rate.
        window_days: Length of the rolling window in days.
        min_samples: Sample count passed to the projection helper.

    Returns:
        A dict with schema version, status, summary counters, top failure
        groups, sealed/open group counts, successes needed, projected
        recovery time (ISO string or ``None``), its reason, and next action.
    """
    ordered = sorted(rows, key=lambda row: row.get("created_at") or now)
    failures = [row for row in ordered if not row.get("success")]
    total = len(ordered)
    failed = len(failures)
    success = total - failed
    rate = success / total if total else None

    # Split the groups by whether their closure status marks them as sealed.
    failure_groups = _build_failure_groups(failures)
    sealed_groups: list[dict[str, Any]] = []
    open_groups: list[dict[str, Any]] = []
    for group in failure_groups:
        is_sealed = str(group.get("closure_status", "")).startswith("sealed_")
        (sealed_groups if is_sealed else open_groups).append(group)

    projected_green_at, projection_reason = _project_auto_success_green_at(
        rows=ordered,
        now=now,
        threshold=threshold,
        window_days=window_days,
        min_samples=min_samples,
    )

    # Open groups outrank sealed ones: anything unexplained needs a look.
    if not failed:
        status = "green"
    elif open_groups:
        status = "needs_investigation"
    elif sealed_groups:
        status = "sealed_waiting_window"
    else:
        status = "insufficient_diagnostics"

    summary = {
        "total": total,
        "success": success,
        "failed": failed,
        "rate": rate,
        "threshold": threshold,
        "window_days": window_days,
        "min_samples": min_samples,
    }
    projected_iso = projected_green_at.isoformat() if projected_green_at else None
    return {
        "schema_version": "ai_slo_auto_execute_diagnostics_v1",
        "status": status,
        "summary": summary,
        "top_failure_groups": failure_groups[:5],
        "sealed_failure_group_count": len(sealed_groups),
        "open_failure_group_count": len(open_groups),
        "immediate_successes_needed": _successes_needed_now(success, total, threshold),
        "projected_green_at": projected_iso,
        "projection_reason": projection_reason,
        "next_action": _auto_execute_diagnostics_next_action(status),
    }
created_at = row.get("created_at") + if isinstance(created_at, datetime): + if group["first_seen"] is None or created_at < group["first_seen"]: + group["first_seen"] = created_at + if group["last_seen"] is None or created_at > group["last_seen"]: + group["last_seen"] = created_at + + enriched = [] + for group in groups.values(): + closure = _classify_auto_repair_failure_closure(group) + enriched.append({ + **group, + "first_seen": group["first_seen"].isoformat() if group["first_seen"] else None, + "last_seen": group["last_seen"].isoformat() if group["last_seen"] else None, + **closure, + }) + + return sorted(enriched, key=lambda item: item["count"], reverse=True) + + +def _auto_repair_error_signature(error_message: Any) -> str: + error = str(error_message or "").strip().lower() + if not error: + return "missing_error_message" + if "unsupported scheme" in error and "docker restart" in error: + return "legacy_ssh_docker_restart" + if "nodes" in error and "not found" in error: + return "k3s_node_target_not_found" + if "http error" in error: + return "http_error" + if "timeout" in error: + return "timeout" + compact = " ".join(error.split()) + return compact[:120] or "unknown_error" + + +def _classify_auto_repair_failure_closure(group: dict[str, Any]) -> dict[str, str]: + signature = str(group.get("error_signature") or "") + alertname = str(group.get("alertname") or "") + playbook_name = str(group.get("playbook_name") or "") + text = f"{alertname} {playbook_name}".lower() + + if signature == "legacy_ssh_docker_restart": + return { + "closure_status": "sealed_by_mcp_grant", + "closure_label": "已封口:Docker restart 已改走 ssh_docker_restart/write MCP grant", + "recommended_action": "觀察後續 DockerContainerUnhealthy 執行,不回填舊歷史", + } + + if signature == "k3s_node_target_not_found" and ( + "stock" in text or "wooo.work" in text or "external" in text + ): + return { + "closure_status": "sealed_by_external_site_guard", + "closure_label": "已封口:外部站台告警已阻擋 K3s node PlayBook 誤配", + 
"recommended_action": "觀察 StockWoooWorkDown 是否改走 external_site_down / NO_ACTION", + } + + return { + "closure_status": "open_failure_source", + "closure_label": "待調查:尚未匹配到已封口修復來源", + "recommended_action": "反查 incident truth-chain、PlayBook、MCP 執行紀錄", + } + + +def _successes_needed_now(success: int, total: int, threshold: float) -> int: + if total <= 0 or threshold >= 1: + return 0 + gap = (threshold * total) - success + if gap <= 0: + return 0 + return max(0, ceil(gap / (1 - threshold))) + + +def _project_auto_success_green_at( + rows: list[dict[str, Any]], + now: datetime, + threshold: float, + window_days: int, + min_samples: int, +) -> tuple[datetime | None, str | None]: + window = timedelta(days=window_days) + current_rows = [ + row for row in rows + if isinstance(row.get("created_at"), datetime) + and row["created_at"] >= now - window + ] + current_total = len(current_rows) + current_success = sum(1 for row in current_rows if bool(row.get("success"))) + + if current_total < min_samples: + return now, "sample_window_below_min" + if current_success / current_total >= threshold: + return now, "already_green" + + candidates = sorted({ + row["created_at"] + window + timedelta(seconds=1) + for row in current_rows + if row["created_at"] + window > now + }) + for checkpoint in candidates: + active_rows = [ + row for row in rows + if isinstance(row.get("created_at"), datetime) + and row["created_at"] >= checkpoint - window + and row["created_at"] <= checkpoint + ] + active_total = len(active_rows) + active_success = sum(1 for row in active_rows if bool(row.get("success"))) + if active_total < min_samples: + return checkpoint, "sample_window_below_min" + if active_success / active_total >= threshold: + return checkpoint, "rolling_window_if_no_new_failures" + + return None, "no_projection_available" + + +def _auto_execute_diagnostics_next_action(status: str) -> str: + if status == "green": + return "keep_monitoring" + if status == "sealed_waiting_window": + return 
"observe_rolling_window_no_manual_restart" + if status == "needs_investigation": + return "investigate_open_failure_groups" + return "refresh_truth_chain_and_execution_logs" + # ───────────────────────────────────────────────────────────────────────────── # Singleton diff --git a/apps/api/tests/test_ai_slo_calculator.py b/apps/api/tests/test_ai_slo_calculator.py new file mode 100644 index 00000000..f62cf83b --- /dev/null +++ b/apps/api/tests/test_ai_slo_calculator.py @@ -0,0 +1,178 @@ +from __future__ import annotations + +from datetime import datetime, timedelta +from zoneinfo import ZoneInfo + +from src.jobs.ai_slo_watchdog_job import _format_slo_violation_for_alert +from src.services.ai_slo_calculator import ( + SLO_AUTO_SUCCESS_MIN, + SloMetric, + SloReport, + build_auto_execute_success_diagnostics, +) + + +TZ = ZoneInfo("Asia/Taipei") + + +def _row( + *, + created_at: datetime, + success: bool, + incident_id: str, + playbook_id: str = "PB-OK", + playbook_name: str = "成功修復 PlayBook", + alertname: str = "SyntheticAutoRepair", + error_message: str | None = None, +) -> dict: + return { + "created_at": created_at, + "success": success, + "incident_id": incident_id, + "playbook_id": playbook_id, + "playbook_name": playbook_name, + "alertname": alertname, + "error_message": error_message, + } + + +def test_auto_execute_diagnostics_marks_known_failures_as_sealed_and_projects_green(): + now = datetime(2026, 6, 1, 18, 0, tzinfo=TZ) + rows = [ + _row( + created_at=now - timedelta(days=2, minutes=i), + success=True, + incident_id=f"INC-SUCCESS-{i:02d}", + ) + for i in range(45) + ] + rows.extend( + _row( + created_at=now - timedelta(days=6, minutes=i), + success=False, + incident_id=f"INC-DOCKER-{i}", + playbook_id="PB-20260420-3F9C4C", + playbook_name="DockerContainerUnhealthy 修復", + alertname="DockerContainerUnhealthy", + error_message="Unsupported scheme: ssh {host} docker inspect minio && docker restart minio", + ) + for i in range(5) + ) + rows.extend( + _row( + 
from __future__ import annotations

from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

from src.jobs.ai_slo_watchdog_job import _format_slo_violation_for_alert
from src.services.ai_slo_calculator import (
    SLO_AUTO_SUCCESS_MIN,
    SloMetric,
    SloReport,
    build_auto_execute_success_diagnostics,
)


# All fixture timestamps are expressed in the Taipei timezone.
TZ = ZoneInfo("Asia/Taipei")


def _row(
    *,
    created_at: datetime,
    success: bool,
    incident_id: str,
    playbook_id: str = "PB-OK",
    playbook_name: str = "成功修復 PlayBook",
    alertname: str = "SyntheticAutoRepair",
    error_message: str | None = None,
) -> dict:
    """Build one execution row dict with the keys the diagnostics builder reads."""
    return {
        "created_at": created_at,
        "success": success,
        "incident_id": incident_id,
        "playbook_id": playbook_id,
        "playbook_name": playbook_name,
        "alertname": alertname,
        "error_message": error_message,
    }


def test_auto_execute_diagnostics_marks_known_failures_as_sealed_and_projects_green():
    """Known failure signatures are sealed and a recovery time is projected."""
    now = datetime(2026, 6, 1, 18, 0, tzinfo=TZ)
    # 45 successes, each about two days old.
    rows = [
        _row(
            created_at=now - timedelta(days=2, minutes=i),
            success=True,
            incident_id=f"INC-SUCCESS-{i:02d}",
        )
        for i in range(45)
    ]
    # 5 failures about six days old carrying the legacy docker-restart error.
    rows.extend(
        _row(
            created_at=now - timedelta(days=6, minutes=i),
            success=False,
            incident_id=f"INC-DOCKER-{i}",
            playbook_id="PB-20260420-3F9C4C",
            playbook_name="DockerContainerUnhealthy 修復",
            alertname="DockerContainerUnhealthy",
            error_message="Unsupported scheme: ssh {host} docker inspect minio && docker restart minio",
        )
        for i in range(5)
    )
    # 4 failures about five days old carrying the K3s "node not found" error.
    rows.extend(
        _row(
            created_at=now - timedelta(days=5, minutes=i),
            success=False,
            incident_id=f"INC-STOCK-{i}",
            playbook_id="PB-20260416-79EB94",
            playbook_name="K3s 節點下線修復",
            alertname="StockWoooWorkDown",
            error_message='nodes "stock-platform" not found',
        )
        for i in range(4)
    )

    diagnostics = build_auto_execute_success_diagnostics(rows, now)

    assert diagnostics["status"] == "sealed_waiting_window"
    assert diagnostics["summary"]["total"] == 54
    assert diagnostics["summary"]["success"] == 45
    assert diagnostics["summary"]["rate"] == 45 / 54
    assert diagnostics["sealed_failure_group_count"] == 2
    assert diagnostics["open_failure_group_count"] == 0
    assert diagnostics["immediate_successes_needed"] == 6
    assert diagnostics["projection_reason"] == "rolling_window_if_no_new_failures"
    # Only the minute is pinned; the projection adds a one-second offset.
    assert diagnostics["projected_green_at"].startswith("2026-06-02T17:57")
    statuses = {group["closure_status"] for group in diagnostics["top_failure_groups"]}
    assert "sealed_by_mcp_grant" in statuses
    assert "sealed_by_external_site_guard" in statuses


def test_auto_execute_diagnostics_keeps_unknown_failures_open():
    """A failure signature with no sealed match stays open for investigation."""
    now = datetime(2026, 6, 1, 18, 0, tzinfo=TZ)
    # Five hourly failures sharing one alert, playbook and timeout error.
    rows = [
        _row(
            created_at=now - timedelta(hours=i),
            success=False,
            incident_id=f"INC-UNKNOWN-{i}",
            playbook_id="PB-UNKNOWN",
            playbook_name="未知修復",
            alertname="UnknownRepair",
            error_message="timeout waiting for executor",
        )
        for i in range(5)
    ]

    diagnostics = build_auto_execute_success_diagnostics(rows, now)

    assert diagnostics["status"] == "needs_investigation"
    assert diagnostics["sealed_failure_group_count"] == 0
    assert diagnostics["open_failure_group_count"] == 1
    assert diagnostics["next_action"] == "investigate_open_failure_groups"


def test_slo_report_to_dict_includes_diagnostics():
    """``SloReport.to_dict`` exposes the diagnostics mapping unchanged."""
    report = SloReport(
        metrics=[
            SloMetric(
                name="auto_execute_success_rate",
                value=0.8,
                threshold=SLO_AUTO_SUCCESS_MIN,
                direction="above",
                sample_count=10,
                violated=True,
            )
        ],
        any_violated=True,
        diagnostics={"auto_execute_success_rate": {"status": "sealed_waiting_window"}},
    )

    assert report.to_dict()["diagnostics"]["auto_execute_success_rate"]["status"] == "sealed_waiting_window"


def test_watchdog_formats_auto_execute_diagnostics_for_meta_alert():
    """The watchdog formatter renders counts, sealed groups and the projection."""
    # Already a Taipei timestamp, so the rendered short time is 06/03 23:07.
    projected = datetime(2026, 6, 3, 23, 7, tzinfo=TZ).isoformat()
    report = SloReport(
        metrics=[
            SloMetric(
                name="auto_execute_success_rate",
                value=45 / 54,
                threshold=SLO_AUTO_SUCCESS_MIN,
                direction="above",
                sample_count=54,
                violated=True,
            )
        ],
        any_violated=True,
        diagnostics={
            "auto_execute_success_rate": {
                "summary": {
                    "total": 54,
                    "success": 45,
                    "failed": 9,
                    "rate": 45 / 54,
                    "threshold": SLO_AUTO_SUCCESS_MIN,
                },
                "sealed_failure_group_count": 2,
                "open_failure_group_count": 0,
                "immediate_successes_needed": 6,
                "projected_green_at": projected,
                "top_failure_groups": [
                    {
                        "alertname": "DockerContainerUnhealthy",
                        "playbook_id": "PB-20260420-3F9C4C",
                        "count": 5,
                        "closure_status": "sealed_by_mcp_grant",
                    }
                ],
            }
        },
    )

    line, cause = _format_slo_violation_for_alert(report, ["auto_execute_success_rate"])

    assert "45/54=83.3%" in line
    assert "已封口群組 2" in line
    assert "06/03 23:07" in line
    assert cause is not None
    assert "Top failure groups" in cause
    assert "不需要重啟服務或改寫歷史資料" in cause