fix(api): explain auto execute slo degradation
All checks were successful
CD Pipeline / tests (push) Successful in 1m20s
Code Review / ai-code-review (push) Successful in 12s
CD Pipeline / build-and-deploy (push) Successful in 7m32s
CD Pipeline / post-deploy-checks (push) Successful in 1m46s

This commit is contained in:
Your Name
2026-06-01 17:45:08 +08:00
parent d25927d854
commit d610c7386e
3 changed files with 527 additions and 3 deletions

View File

@@ -108,6 +108,7 @@ async def _check_once() -> None:
# 修法：dedup 用穩定 violation_codes（W-N:type 格式），Telegram 照常顯示動態值
violations: list[str] = []
violation_codes: list[str] = []
probable_causes: list[str] = []
# A3 修復：cluster-shared grace period，單次查詢供所有 W-check 使用,避免 Pod 間不一致
grace = await _is_grace_active()
@@ -117,7 +118,10 @@ async def _check_once() -> None:
report = await AiSloCalculator().calculate()
if report.any_violated:
violated = [m.name for m in report.metrics if m.violated]
violations.append(f"SLO 違反: {', '.join(violated)}")
w1_line, w1_cause = _format_slo_violation_for_alert(report, violated)
violations.append(w1_line)
if w1_cause:
probable_causes.append(w1_cause)
violation_codes.append(f"W1:slo_violated:{','.join(sorted(violated))}")
except Exception as e:
logger.warning("watchdog_w1_slo_check_failed", error=str(e))
@@ -261,7 +265,9 @@ async def _check_once() -> None:
*violation_lines,
]
)
probable_cause = "治理異常與執行資料同時異常,建議先核對 AI SLO 指標與最近自修復任務執行紀錄"
probable_cause = "\n".join(probable_causes) if probable_causes else (
"治理異常與執行資料同時異常,建議先核對 AI SLO 指標與最近自修復任務執行紀錄"
)
# 發送 TYPE-8M Meta-System 告警
# 重大異常:超過 2 項即升為 critical，便於前線分流；1-2 項走 warning
@@ -290,6 +296,77 @@ async def _check_once() -> None:
logger.error("ai_slo_watchdog_telegram_failed", error=str(e), violations=violations)
def _format_slo_violation_for_alert(report, violated: list[str]) -> tuple[str, str | None]:
"""把 W-1 診斷資料壓成 Telegram 可讀摘要dedup key 仍沿用穩定 code。"""
if "auto_execute_success_rate" not in violated:
return f"SLO 違反: {', '.join(violated)}", None
diagnostics = getattr(report, "diagnostics", {}) or {}
diag = diagnostics.get("auto_execute_success_rate") or {}
summary = diag.get("summary") or {}
total = int(summary.get("total") or 0)
success = int(summary.get("success") or 0)
rate = summary.get("rate")
threshold = summary.get("threshold")
sealed = int(diag.get("sealed_failure_group_count") or 0)
open_groups = int(diag.get("open_failure_group_count") or 0)
needed = int(diag.get("immediate_successes_needed") or 0)
projected = _short_taipei_time(diag.get("projected_green_at"))
if isinstance(rate, (int, float)) and isinstance(threshold, (int, float)):
line = (
f"SLO 違反: auto_execute_success_rate "
f"({success}/{total}={rate:.1%},門檻 {threshold:.0%}"
f"已封口群組 {sealed},待查群組 {open_groups}"
)
if projected:
line += f";預估 {projected} 回綠"
elif needed:
line += f";需新增成功 {needed}"
line += ")"
else:
line = "SLO 違反: auto_execute_success_rate診斷資料不足"
groups = diag.get("top_failure_groups") or []
group_lines = []
for group in groups[:3]:
label = group.get("closure_status") or "unknown"
group_lines.append(
f"{group.get('alertname', 'unknown')}/{group.get('playbook_id', 'unknown')}"
f"×{group.get('count', 0)}={label}"
)
cause_parts = [
f"auto_execute_success_rate 仍在 7 日滾動窗內偏低:{success}/{total}"
if total else "auto_execute_success_rate 診斷資料不足",
]
if group_lines:
cause_parts.append("Top failure groups: " + "".join(group_lines))
if sealed and not open_groups:
cause_parts.append("目前已知失敗來源已封口,狀態是等待舊失敗滾出 7 日視窗。")
if projected:
cause_parts.append(f"若沒有新失敗,預估 {projected} 自然回綠;不需要重啟服務或改寫歷史資料。")
elif needed:
cause_parts.append(f"若要立即回綠,需要新增 {needed} 次真實成功自動修復樣本。")
if open_groups:
cause_parts.append("仍有未封口失敗群組,請反查 truth-chain、PlayBook 與 MCP 執行紀錄。")
return line, "\n".join(cause_parts)
def _short_taipei_time(value: str | None) -> str | None:
if not value:
return None
try:
parsed = datetime.fromisoformat(value)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=UTC)
taipei = parsed.astimezone(now_taipei().tzinfo)
return taipei.strftime("%m/%d %H:%M")
except Exception:
return None
async def _count_pending_no_tg_sent() -> int:
"""
查詢真正靜默的 PENDING 告警PENDING 超過 30 分鐘且 telegram_message_id IS NULL。

View File

@@ -23,7 +23,9 @@ from __future__ import annotations
import json
from dataclasses import dataclass, field
from datetime import timedelta
from datetime import datetime, timedelta
from math import ceil
from typing import Any
import structlog
from sqlalchemy import func, select, text
@@ -81,6 +83,7 @@ class SloReport:
any_violated: bool = False
calculated_at: str = field(default_factory=lambda: now_taipei().isoformat())
window_days: int = SLO_WINDOW_DAYS
diagnostics: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict:
return {
@@ -99,6 +102,7 @@ class SloReport:
}
for m in self.metrics
],
"diagnostics": self.diagnostics,
}
@@ -131,6 +135,11 @@ class AiSloCalculator:
slo1 = await self._calc_auto_success_rate(session, since)
slo2 = await self._calc_human_override_rate(session, since)
slo3 = await self._calc_false_neg_rate(session, since)
diagnostics = {}
if slo1.violated:
diagnostics["auto_execute_success_rate"] = (
await self._build_auto_success_diagnostics(session, since)
)
metrics = [slo1, slo2, slo3]
any_violated = any(m.violated for m in metrics)
@@ -138,6 +147,7 @@ class AiSloCalculator:
report = SloReport(
metrics=metrics,
any_violated=any_violated,
diagnostics=diagnostics,
)
logger.info(
@@ -189,6 +199,7 @@ class AiSloCalculator:
any_violated=data.get("any_violated", False),
calculated_at=data.get("calculated_at", ""),
window_days=data.get("window_days", SLO_WINDOW_DAYS),
diagnostics=data.get("diagnostics", {}),
)
except Exception as e:
logger.warning("slo_cache_read_error", error=str(e))
@@ -403,6 +414,264 @@ class AiSloCalculator:
direction="below", sample_count=0, violated=False,
)
async def _build_auto_success_diagnostics(self, session, since) -> dict[str, Any]:
    """Build explainable diagnostics for the W-1 auto_execute_success_rate SLO.

    Reads the ``auto_repair_executions`` rows created at or after ``since``
    (left-joined to ``incidents`` to resolve an alert name) and hands them to
    ``build_auto_execute_success_diagnostics``. Any exception is logged and
    turned into a ``diagnostics_unavailable`` payload instead of propagating.
    """
    try:
        statement = text("""
SELECT
are.incident_id,
are.playbook_id,
are.playbook_name,
are.success,
are.error_message,
are.created_at,
COALESCE(
inc.signals->0->>'alertname',
inc.signals->0->'labels'->>'alertname',
inc.signals->0->>'alert_name',
inc.affected_services->>0,
'unknown'
) AS alertname
FROM auto_repair_executions are
LEFT JOIN incidents inc ON inc.incident_id = are.incident_id
WHERE are.created_at >= :since
ORDER BY are.created_at ASC
""")
        cursor = await session.execute(statement, {"since": since})
        execution_rows = []
        for record in cursor:
            # Convert each result row into a plain dict for the pure builder.
            execution_rows.append(dict(record._mapping))
        return build_auto_execute_success_diagnostics(
            rows=execution_rows,
            now=now_taipei(),
            threshold=SLO_AUTO_SUCCESS_MIN,
            window_days=SLO_WINDOW_DAYS,
            min_samples=SLO_MIN_SAMPLES,
        )
    except Exception as e:
        logger.warning("slo1_diagnostics_error", error=str(e))
        fallback = {
            "schema_version": "ai_slo_auto_execute_diagnostics_v1",
            "status": "diagnostics_unavailable",
            "error": str(e)[:200],
        }
        return fallback
def build_auto_execute_success_diagnostics(
    rows: list[dict[str, Any]],
    now: datetime,
    threshold: float = SLO_AUTO_SUCCESS_MIN,
    window_days: int = SLO_WINDOW_DAYS,
    min_samples: int = SLO_MIN_SAMPLES,
) -> dict[str, Any]:
    """Build frontend/Telegram-readable W-1 diagnostics from auto_repair_executions rows.

    The function is kept as pure logic so the watchdog and the API can share
    one set of semantics, and so unit tests can pin the rolling-window
    back-to-green projection.

    Args:
        rows: Execution rows as dicts; ``created_at`` and ``success`` are read here.
        now: Reference time used for sorting fallbacks and the projection.
        threshold: Minimum success rate of the SLO.
        window_days: Length of the rolling window in days.
        min_samples: Minimum sample count for the SLO to be evaluated.

    Returns:
        A diagnostics dict with schema version, status, summary counters,
        failure groups, projection fields and the suggested next action.
    """
    # Rows without created_at sort as if they happened at ``now``.
    ordered = sorted(rows, key=lambda r: r.get("created_at") or now)
    failures = [row for row in ordered if not bool(row.get("success"))]
    total = len(ordered)
    failed = len(failures)
    success = total - failed
    rate = None if not total else success / total

    failure_groups = _build_failure_groups(failures)
    sealed_groups: list[dict[str, Any]] = []
    open_groups: list[dict[str, Any]] = []
    for group in failure_groups:
        is_sealed = str(group.get("closure_status", "")).startswith("sealed_")
        (sealed_groups if is_sealed else open_groups).append(group)

    projected_green_at, projection_reason = _project_auto_success_green_at(
        rows=ordered,
        now=now,
        threshold=threshold,
        window_days=window_days,
        min_samples=min_samples,
    )

    # Open groups take precedence over sealed ones when choosing the status.
    if failed == 0:
        status = "green"
    elif open_groups:
        status = "needs_investigation"
    elif sealed_groups:
        status = "sealed_waiting_window"
    else:
        status = "insufficient_diagnostics"

    summary = {
        "total": total,
        "success": success,
        "failed": failed,
        "rate": rate,
        "threshold": threshold,
        "window_days": window_days,
        "min_samples": min_samples,
    }
    projected_iso = projected_green_at.isoformat() if projected_green_at else None
    return {
        "schema_version": "ai_slo_auto_execute_diagnostics_v1",
        "status": status,
        "summary": summary,
        "top_failure_groups": failure_groups[:5],
        "sealed_failure_group_count": len(sealed_groups),
        "open_failure_group_count": len(open_groups),
        "immediate_successes_needed": _successes_needed_now(success, total, threshold),
        "projected_green_at": projected_iso,
        "projection_reason": projection_reason,
        "next_action": _auto_execute_diagnostics_next_action(status),
    }
def _build_failure_groups(failures: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Group failed executions by (alertname, playbook id, playbook name, error signature).

    Each group carries its count, first/last seen timestamps (ISO strings, or
    ``None`` when no row had a ``datetime`` ``created_at``), the incident id of
    the first row seen, and the closure classification fields. Groups are
    returned ordered by descending count.
    """
    buckets: dict[tuple[str, str, str, str], dict[str, Any]] = {}
    for failure in failures:
        alertname = str(failure.get("alertname") or "unknown")
        playbook_id = str(failure.get("playbook_id") or "unknown")
        playbook_name = str(failure.get("playbook_name") or "unknown")
        error_signature = _auto_repair_error_signature(failure.get("error_message"))
        bucket_key = (alertname, playbook_id, playbook_name, error_signature)

        bucket = buckets.get(bucket_key)
        if bucket is None:
            bucket = {
                "alertname": alertname,
                "playbook_id": playbook_id,
                "playbook_name": playbook_name,
                "error_signature": error_signature,
                "count": 0,
                "first_seen": None,
                "last_seen": None,
                "example_incident_id": failure.get("incident_id"),
            }
            buckets[bucket_key] = bucket
        bucket["count"] += 1

        seen_at = failure.get("created_at")
        if not isinstance(seen_at, datetime):
            # Rows without a usable timestamp still count, but do not move the range.
            continue
        if bucket["first_seen"] is None or seen_at < bucket["first_seen"]:
            bucket["first_seen"] = seen_at
        if bucket["last_seen"] is None or seen_at > bucket["last_seen"]:
            bucket["last_seen"] = seen_at

    summaries = []
    for bucket in buckets.values():
        closure = _classify_auto_repair_failure_closure(bucket)
        first_seen = bucket["first_seen"]
        last_seen = bucket["last_seen"]
        summaries.append({
            **bucket,
            "first_seen": first_seen.isoformat() if first_seen else None,
            "last_seen": last_seen.isoformat() if last_seen else None,
            **closure,
        })
    summaries.sort(key=lambda item: item["count"], reverse=True)
    return summaries
def _auto_repair_error_signature(error_message: Any) -> str:
error = str(error_message or "").strip().lower()
if not error:
return "missing_error_message"
if "unsupported scheme" in error and "docker restart" in error:
return "legacy_ssh_docker_restart"
if "nodes" in error and "not found" in error:
return "k3s_node_target_not_found"
if "http error" in error:
return "http_error"
if "timeout" in error:
return "timeout"
compact = " ".join(error.split())
return compact[:120] or "unknown_error"
def _classify_auto_repair_failure_closure(group: dict[str, Any]) -> dict[str, str]:
signature = str(group.get("error_signature") or "")
alertname = str(group.get("alertname") or "")
playbook_name = str(group.get("playbook_name") or "")
text = f"{alertname} {playbook_name}".lower()
if signature == "legacy_ssh_docker_restart":
return {
"closure_status": "sealed_by_mcp_grant",
"closure_label": "已封口Docker restart 已改走 ssh_docker_restart/write MCP grant",
"recommended_action": "觀察後續 DockerContainerUnhealthy 執行,不回填舊歷史",
}
if signature == "k3s_node_target_not_found" and (
"stock" in text or "wooo.work" in text or "external" in text
):
return {
"closure_status": "sealed_by_external_site_guard",
"closure_label": "已封口:外部站台告警已阻擋 K3s node PlayBook 誤配",
"recommended_action": "觀察 StockWoooWorkDown 是否改走 external_site_down / NO_ACTION",
}
return {
"closure_status": "open_failure_source",
"closure_label": "待調查:尚未匹配到已封口修復來源",
"recommended_action": "反查 incident truth-chain、PlayBook、MCP 執行紀錄",
}
def _successes_needed_now(success: int, total: int, threshold: float) -> int:
if total <= 0 or threshold >= 1:
return 0
gap = (threshold * total) - success
if gap <= 0:
return 0
return max(0, ceil(gap / (1 - threshold)))
def _project_auto_success_green_at(
rows: list[dict[str, Any]],
now: datetime,
threshold: float,
window_days: int,
min_samples: int,
) -> tuple[datetime | None, str | None]:
window = timedelta(days=window_days)
current_rows = [
row for row in rows
if isinstance(row.get("created_at"), datetime)
and row["created_at"] >= now - window
]
current_total = len(current_rows)
current_success = sum(1 for row in current_rows if bool(row.get("success")))
if current_total < min_samples:
return now, "sample_window_below_min"
if current_success / current_total >= threshold:
return now, "already_green"
candidates = sorted({
row["created_at"] + window + timedelta(seconds=1)
for row in current_rows
if row["created_at"] + window > now
})
for checkpoint in candidates:
active_rows = [
row for row in rows
if isinstance(row.get("created_at"), datetime)
and row["created_at"] >= checkpoint - window
and row["created_at"] <= checkpoint
]
active_total = len(active_rows)
active_success = sum(1 for row in active_rows if bool(row.get("success")))
if active_total < min_samples:
return checkpoint, "sample_window_below_min"
if active_success / active_total >= threshold:
return checkpoint, "rolling_window_if_no_new_failures"
return None, "no_projection_available"
def _auto_execute_diagnostics_next_action(status: str) -> str:
if status == "green":
return "keep_monitoring"
if status == "sealed_waiting_window":
return "observe_rolling_window_no_manual_restart"
if status == "needs_investigation":
return "investigate_open_failure_groups"
return "refresh_truth_chain_and_execution_logs"
# ─────────────────────────────────────────────────────────────────────────────
# Singleton

View File

@@ -0,0 +1,178 @@
from __future__ import annotations
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from src.jobs.ai_slo_watchdog_job import _format_slo_violation_for_alert
from src.services.ai_slo_calculator import (
SLO_AUTO_SUCCESS_MIN,
SloMetric,
SloReport,
build_auto_execute_success_diagnostics,
)
TZ = ZoneInfo("Asia/Taipei")
def _row(
*,
created_at: datetime,
success: bool,
incident_id: str,
playbook_id: str = "PB-OK",
playbook_name: str = "成功修復 PlayBook",
alertname: str = "SyntheticAutoRepair",
error_message: str | None = None,
) -> dict:
return {
"created_at": created_at,
"success": success,
"incident_id": incident_id,
"playbook_id": playbook_id,
"playbook_name": playbook_name,
"alertname": alertname,
"error_message": error_message,
}
def test_auto_execute_diagnostics_marks_known_failures_as_sealed_and_projects_green():
    """Known failure sources are reported as sealed and a green time is projected."""
    now = datetime(2026, 6, 1, 18, 0, tzinfo=TZ)

    successes = [
        _row(
            created_at=now - timedelta(days=2, minutes=i),
            success=True,
            incident_id=f"INC-SUCCESS-{i:02d}",
        )
        for i in range(45)
    ]
    docker_failures = [
        _row(
            created_at=now - timedelta(days=6, minutes=i),
            success=False,
            incident_id=f"INC-DOCKER-{i}",
            playbook_id="PB-20260420-3F9C4C",
            playbook_name="DockerContainerUnhealthy 修復",
            alertname="DockerContainerUnhealthy",
            error_message="Unsupported scheme: ssh {host} docker inspect minio && docker restart minio",
        )
        for i in range(5)
    ]
    stock_failures = [
        _row(
            created_at=now - timedelta(days=5, minutes=i),
            success=False,
            incident_id=f"INC-STOCK-{i}",
            playbook_id="PB-20260416-79EB94",
            playbook_name="K3s 節點下線修復",
            alertname="StockWoooWorkDown",
            error_message='nodes "stock-platform" not found',
        )
        for i in range(4)
    ]

    diagnostics = build_auto_execute_success_diagnostics(
        successes + docker_failures + stock_failures, now
    )

    assert diagnostics["status"] == "sealed_waiting_window"

    summary = diagnostics["summary"]
    assert summary["total"] == 54
    assert summary["success"] == 45
    assert summary["rate"] == 45 / 54

    assert diagnostics["sealed_failure_group_count"] == 2
    assert diagnostics["open_failure_group_count"] == 0
    assert diagnostics["immediate_successes_needed"] == 6
    assert diagnostics["projection_reason"] == "rolling_window_if_no_new_failures"
    assert diagnostics["projected_green_at"].startswith("2026-06-02T17:57")

    # Both known failure sources must show up with their sealed classification.
    statuses = set()
    for group in diagnostics["top_failure_groups"]:
        statuses.add(group["closure_status"])
    assert "sealed_by_mcp_grant" in statuses
    assert "sealed_by_external_site_guard" in statuses
def test_auto_execute_diagnostics_keeps_unknown_failures_open():
    """Failures that match no sealed source stay open and ask for investigation."""
    now = datetime(2026, 6, 1, 18, 0, tzinfo=TZ)
    rows = []
    for i in range(5):
        rows.append(
            _row(
                created_at=now - timedelta(hours=i),
                success=False,
                incident_id=f"INC-UNKNOWN-{i}",
                playbook_id="PB-UNKNOWN",
                playbook_name="未知修復",
                alertname="UnknownRepair",
                error_message="timeout waiting for executor",
            )
        )

    diagnostics = build_auto_execute_success_diagnostics(rows, now)

    assert diagnostics["status"] == "needs_investigation"
    assert diagnostics["sealed_failure_group_count"] == 0
    assert diagnostics["open_failure_group_count"] == 1
    assert diagnostics["next_action"] == "investigate_open_failure_groups"
def test_slo_report_to_dict_includes_diagnostics():
    """``SloReport.to_dict()`` carries the diagnostics mapping through unchanged."""
    violated_metric = SloMetric(
        name="auto_execute_success_rate",
        value=0.8,
        threshold=SLO_AUTO_SUCCESS_MIN,
        direction="above",
        sample_count=10,
        violated=True,
    )
    report = SloReport(
        metrics=[violated_metric],
        any_violated=True,
        diagnostics={"auto_execute_success_rate": {"status": "sealed_waiting_window"}},
    )

    serialized = report.to_dict()

    metric_diagnostics = serialized["diagnostics"]["auto_execute_success_rate"]
    assert metric_diagnostics["status"] == "sealed_waiting_window"
def test_watchdog_formats_auto_execute_diagnostics_for_meta_alert():
    """The watchdog renders rate, sealed count, projection and cause text for the alert."""
    projected = datetime(2026, 6, 3, 23, 7, tzinfo=TZ).isoformat()
    metric = SloMetric(
        name="auto_execute_success_rate",
        value=45 / 54,
        threshold=SLO_AUTO_SUCCESS_MIN,
        direction="above",
        sample_count=54,
        violated=True,
    )
    docker_group = {
        "alertname": "DockerContainerUnhealthy",
        "playbook_id": "PB-20260420-3F9C4C",
        "count": 5,
        "closure_status": "sealed_by_mcp_grant",
    }
    auto_execute_diagnostics = {
        "summary": {
            "total": 54,
            "success": 45,
            "failed": 9,
            "rate": 45 / 54,
            "threshold": SLO_AUTO_SUCCESS_MIN,
        },
        "sealed_failure_group_count": 2,
        "open_failure_group_count": 0,
        "immediate_successes_needed": 6,
        "projected_green_at": projected,
        "top_failure_groups": [docker_group],
    }
    report = SloReport(
        metrics=[metric],
        any_violated=True,
        diagnostics={"auto_execute_success_rate": auto_execute_diagnostics},
    )

    line, cause = _format_slo_violation_for_alert(report, ["auto_execute_success_rate"])

    # Violation line: ratio, sealed group count and the projected time.
    for fragment in ("45/54=83.3%", "已封口群組 2", "06/03 23:07"):
        assert fragment in line

    # Probable cause: failure groups plus the "no restart needed" guidance.
    assert cause is not None
    assert "Top failure groups" in cause
    assert "不需要重啟服務或改寫歷史資料" in cause