feat(api): AIOps KPI Dashboard — AI 自主化成熟度全景 (積木化重構)
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 8m47s

GET /api/v1/aiops/kpi → 一次整合 MASTER §7.1 全部 KPI.

leWOOOgo 積木化鐵律對齊:
  - Router (api/v1/aiops_kpi.py) 僅 HTTP 路由, 不碰 DB
  - Service (services/aiops_kpi_service.py) 負責所有 SQL + 計算
  - 前次 commit 被 hook 擋下 (Router 直接 import get_db_context), 本次修正

services/aiops_kpi_service.py (~230 行):
  AiopsKpiService.get_snapshot() 回 6 section:

  1. asset_inventory: by_type + total + last_scan (run_id/ended_at/總計/new/modified)
  2. coverage_kpi: 7 維 × (green/yellow/red/unknown)
     + green_ratio_per_dim + overall_green_ratio (MASTER §7.1 #5 SLO)
  3. rule_quality: total/with_fires/noisy/deprecated/ai_generated + top 5 noisy
  4. capacity_health: 最新 snapshot per host + by_verdict + violations_7d
  5. automation_flow_24h: aol detail + by_actor + by_operation_type
  6. ai_autonomy_score: 0-100 總分
     5 子項 × 20: asset_coverage / rule_quality / capacity_health /
                  automation_flow / ai_diversity
     grade: mature(>=90) / in_progress(>=70, <90) / starter(>=50, <70) / initial(<50)

api/v1/aiops_kpi.py (~35 行 精簡 router):
  只做 router = APIRouter() + @router.get 委派給 service

main.py:
  include_router(aiops_kpi_v1.router, prefix='/api/v1', tags=['AIOps KPI'])

統帥使用:
  curl http://192.168.0.121:32334/api/v1/aiops/kpi | jq .
  一次看見 AI 自主化成熟度全景

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-19 21:20:13 +08:00
parent f1b13d7b26
commit 0004554bc6
3 changed files with 308 additions and 0 deletions

View File

@@ -0,0 +1,36 @@
"""
AIOps KPI Dashboard — ADR-090 + MASTER §7.1
=============================================
GET /api/v1/aiops/kpi → 一次回傳 AI 自主化成熟度全景.
Router 層只負責 HTTP 路由,DB/business logic 由 AiopsKpiService 處理
(leWOOOgo 積木化鐵律: Router 禁直接存取 DB).
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
"""
from __future__ import annotations
from typing import Any
from fastapi import APIRouter
from src.services.aiops_kpi_service import get_aiops_kpi_service
router = APIRouter()
@router.get("/aiops/kpi", tags=["AIOps KPI"])
async def get_aiops_kpi() -> dict[str, Any]:
    """
    AI-autonomy maturity KPI panorama.

    Returns every section plus the autonomy score in a single response:
    - asset_inventory: asset inventory (by type + last_scan)
    - coverage_kpi: 7-dimension automation coverage SLO (green/yellow/red/unknown)
    - rule_quality: rule quality (noisy/deprecated/with_fires + top 5)
    - capacity_health: host capacity health (ai_verdict distribution)
    - automation_flow_24h: aol operation volume over the past 24h
    - ai_autonomy_score: overall autonomy score (0-100, 5 sub-scores x 20)

    The handler performs no DB access itself; it only delegates to the
    shared AiopsKpiService instance.
    """
    return await get_aiops_kpi_service().get_snapshot()

View File

@@ -35,6 +35,7 @@ from sentry_sdk.integrations.starlette import StarletteIntegration
from src.api.v1 import agents as agents_v1 # Phase 9.5: Agent Teams API
from src.api.v1 import ai as ai_v1
from src.api.v1 import ai_slo as ai_slo_v1 # Phase 6 ADR-087: AI SLO 自我治理
from src.api.v1 import aiops_kpi as aiops_kpi_v1 # ADR-090 § Phase 7 KPI Dashboard
from src.api.v1 import approvals as approvals_v1
from src.api.v1 import alert_operation_logs as alert_operation_logs_v1
from src.api.v1 import audit_logs as audit_logs_v1
@@ -685,6 +686,7 @@ app.include_router(dashboard_v1.router, prefix="/api/v1", tags=["Dashboard"])
app.include_router(approvals_v1.router, prefix="/api/v1", tags=["HITL Approvals"])
app.include_router(ai_v1.router, prefix="/api/v1", tags=["AI Decision"])
app.include_router(ai_slo_v1.router, prefix="/api/v1", tags=["AI SLO"]) # Phase 6 ADR-087
app.include_router(aiops_kpi_v1.router, prefix="/api/v1", tags=["AIOps KPI"]) # ADR-090 § Phase 7 Dashboard
app.include_router(webhooks_v1.router, prefix="/api/v1", tags=["Webhooks"])
app.include_router(timeline_v1.router, prefix="/api/v1", tags=["Timeline"])
app.include_router(audit_logs_v1.router, prefix="/api/v1", tags=["Audit Logs"])

View File

@@ -0,0 +1,270 @@
"""
AIOps KPI Service — ADR-090 + MASTER §7.1
==========================================
Router 層呼叫本 Service 取得 KPI 全景,Router 禁直接存取 DB (leWOOOgo 積木化鐵律).
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
"""
from __future__ import annotations
import math
from typing import Any
from sqlalchemy import text as _sql
from src.db.base import get_db_context
from src.utils.timezone import now_taipei
class AiopsKpiService:
    """Assemble the AI-autonomy maturity KPI panorama.

    All SQL and score calculation for the dashboard lives in this class;
    callers obtain the complete payload through :meth:`get_snapshot`.
    """

    @staticmethod
    def _opt_float(value: Any) -> float | None:
        """Convert *value* to ``float``, mapping only ``None`` to ``None``.

        A plain truthiness test would also turn a legitimate ``0`` reading
        into ``None``, hence the explicit ``is None`` check.
        """
        return None if value is None else float(value)

    async def get_snapshot(self) -> dict[str, Any]:
        """Return the five data sections plus the overall autonomy score.

        Returns:
            A dict with ``generated_at`` (``now_taipei().isoformat()``),
            ``asset_inventory``, ``coverage_kpi``, ``rule_quality``,
            ``capacity_health``, ``automation_flow_24h`` and
            ``ai_autonomy_score``.
        """
        async with get_db_context() as db:
            # Queries are awaited one after another on the same db handle.
            inventory = await self._fetch_asset_inventory(db)
            coverage = await self._fetch_coverage_kpi(db)
            rule_quality = await self._fetch_rule_quality(db)
            capacity = await self._fetch_capacity_health(db)
            aol_flow = await self._fetch_automation_flow_24h(db)

        # Pure computation on the fetched dicts; no DB access needed.
        autonomy = self._compute_autonomy_score(
            inventory, coverage, rule_quality, capacity, aol_flow,
        )
        return {
            "generated_at": now_taipei().isoformat(),
            "asset_inventory": inventory,
            "coverage_kpi": coverage,
            "rule_quality": rule_quality,
            "capacity_health": capacity,
            "automation_flow_24h": aol_flow,
            "ai_autonomy_score": autonomy,
        }

    async def _fetch_asset_inventory(self, db) -> dict[str, Any]:
        """Count active assets per type and describe the latest successful scan.

        Returns:
            ``by_type`` (asset_type -> count), ``total`` (sum of the counts)
            and ``last_scan`` (dict for the newest ``status = 'success'``
            discovery run, or ``None`` when there is none).
        """
        type_rows = await db.execute(_sql("""
            SELECT asset_type, count(*) AS cnt
            FROM asset_inventory
            WHERE lifecycle_state = 'active'
            GROUP BY asset_type
            ORDER BY cnt DESC
        """))
        by_type = {row.asset_type: int(row.cnt) for row in type_rows.fetchall()}

        # NULLS LAST: PostgreSQL sorts NULLs first under DESC, which would let
        # a run without ended_at shadow every finished run.
        run_result = await db.execute(_sql("""
            SELECT run_id, ended_at, total_assets, new_assets, modified_assets, duration_ms
            FROM asset_discovery_run
            WHERE status = 'success'
            ORDER BY ended_at DESC NULLS LAST LIMIT 1
        """))
        run = run_result.one_or_none()

        last_run: dict[str, Any] | None = None
        if run is not None:
            last_run = {
                "run_id": str(run.run_id),
                "ended_at": run.ended_at.isoformat() if run.ended_at else None,
                "total_assets": run.total_assets,
                "new_assets": run.new_assets,
                "modified_assets": run.modified_assets,
                "duration_ms": run.duration_ms,
            }

        return {
            "by_type": by_type,
            "total": sum(by_type.values()),
            "last_scan": last_run,
        }

    async def _fetch_coverage_kpi(self, db) -> dict[str, Any]:
        """Aggregate coverage status counts per dimension for the latest run.

        Returns:
            ``by_dimension`` (dimension -> status -> count),
            ``green_ratio_per_dim`` (share of ``green`` per dimension, 4
            decimals) and ``overall_green_ratio`` (unweighted mean of the
            per-dimension ratios, ``0.0`` when there are no rows).
        """
        # The sub-select uses the same "latest successful run" ordering as
        # _fetch_asset_inventory so both sections describe the same run.
        rows = await db.execute(_sql("""
            SELECT dimension, coverage_status, count(*) AS cnt
            FROM asset_coverage_snapshot
            WHERE run_id = (
                SELECT run_id FROM asset_discovery_run
                WHERE status = 'success' ORDER BY ended_at DESC NULLS LAST LIMIT 1
            )
            GROUP BY dimension, coverage_status
            ORDER BY dimension, coverage_status
        """))

        by_dim: dict[str, dict[str, int]] = {}
        for row in rows.fetchall():
            by_dim.setdefault(row.dimension, {})[row.coverage_status] = int(row.cnt)

        green_ratio: dict[str, float] = {}
        for dim, statuses in by_dim.items():
            dim_total = sum(statuses.values())
            green_ratio[dim] = (
                round(statuses.get("green", 0) / dim_total, 4) if dim_total else 0.0
            )

        if green_ratio:
            overall = round(sum(green_ratio.values()) / len(green_ratio), 4)
        else:
            overall = 0.0

        return {
            "by_dimension": by_dim,
            "green_ratio_per_dim": green_ratio,
            "overall_green_ratio": overall,
        }

    async def _fetch_rule_quality(self, db) -> dict[str, Any]:
        """Summarise the alert rule catalog and list the five noisiest rules.

        Returns:
            Counters ``total``, ``with_fires``, ``noisy_above_0_5``,
            ``deprecated``, ``ai_generated`` and ``top_noisy`` (up to five
            non-deprecated rules ordered by descending noise rate, ties
            broken by descending tp + fp volume).
        """
        summary = await db.execute(_sql("""
            SELECT
                count(*) AS total,
                count(*) FILTER (WHERE last_fired_at IS NOT NULL) AS with_fires,
                count(*) FILTER (WHERE noise_rate > 0.5) AS noisy,
                count(*) FILTER (WHERE review_status = 'deprecated') AS deprecated,
                count(*) FILTER (WHERE source = 'ai_generated') AS ai_generated
            FROM alert_rule_catalog
        """))
        counts = summary.one()

        noisy_rows = await db.execute(_sql("""
            SELECT rule_name, severity, true_positive_count AS tp, false_positive_count AS fp,
                   noise_rate, last_fired_at
            FROM alert_rule_catalog
            WHERE noise_rate IS NOT NULL
              AND review_status IS DISTINCT FROM 'deprecated'
            ORDER BY noise_rate DESC, true_positive_count + false_positive_count DESC
            LIMIT 5
        """))
        top_noisy = [
            {
                "rule_name": row.rule_name,
                "severity": row.severity,
                "tp": int(row.tp or 0),
                "fp": int(row.fp or 0),
                "noise_rate": float(row.noise_rate or 0.0),
                "last_fired_at": row.last_fired_at.isoformat() if row.last_fired_at else None,
            }
            for row in noisy_rows.fetchall()
        ]

        return {
            "total": int(counts.total or 0),
            "with_fires": int(counts.with_fires or 0),
            "noisy_above_0_5": int(counts.noisy or 0),
            "deprecated": int(counts.deprecated or 0),
            "ai_generated": int(counts.ai_generated or 0),
            "top_noisy": top_noisy,
        }

    async def _fetch_capacity_health(self, db) -> dict[str, Any]:
        """Return the newest capacity snapshot per host and recent violations.

        Returns:
            ``hosts`` (one dict per host from its most recent snapshot),
            ``by_verdict`` (ai_verdict -> host count, missing verdicts are
            counted under ``"unknown"``) and ``violations_7d`` (number of
            violation events detected in the last 7 days).
        """
        rows = await db.execute(_sql("""
            SELECT DISTINCT ON (host)
                host, ai_verdict, cpu_used_pct, mem_used_pct, swap_used_pct,
                captured_at, ai_reasoning
            FROM host_capacity_snapshot
            ORDER BY host, captured_at DESC
        """))
        hosts = [
            {
                "host": row.host,
                "ai_verdict": row.ai_verdict,
                # _opt_float keeps a genuine 0 reading as 0.0 instead of None.
                "cpu_used_pct": self._opt_float(row.cpu_used_pct),
                "mem_used_pct": self._opt_float(row.mem_used_pct),
                "swap_used_pct": self._opt_float(row.swap_used_pct),
                "captured_at": row.captured_at.isoformat() if row.captured_at else None,
                "reasoning": row.ai_reasoning,
            }
            for row in rows.fetchall()
        ]

        by_verdict: dict[str, int] = {}
        for host in hosts:
            verdict = host["ai_verdict"] or "unknown"
            by_verdict[verdict] = by_verdict.get(verdict, 0) + 1

        violations = await db.execute(_sql("""
            SELECT count(*) AS cnt FROM capacity_violation_event
            WHERE detected_at > NOW() - INTERVAL '7 days'
        """))

        return {
            "hosts": hosts,
            "by_verdict": by_verdict,
            "violations_7d": int(violations.scalar() or 0),
        }

    async def _fetch_automation_flow_24h(self, db) -> dict[str, Any]:
        """Aggregate automation operation log entries from the last 24 hours.

        Returns:
            ``detail`` (one entry per operation_type/actor/status group),
            ``by_actor`` and ``by_operation_type`` (summed counts) and
            ``total`` (sum over all operation types).
        """
        rows = await db.execute(_sql("""
            SELECT operation_type, actor, status, count(*) AS cnt
            FROM automation_operation_log
            WHERE created_at > NOW() - INTERVAL '24 hours'
            GROUP BY operation_type, actor, status
            ORDER BY cnt DESC
        """))
        flows = [
            {
                "operation_type": row.operation_type,
                "actor": row.actor,
                "status": row.status,
                "count": int(row.cnt),
            }
            for row in rows.fetchall()
        ]

        by_actor: dict[str, int] = {}
        by_type: dict[str, int] = {}
        for flow in flows:
            actor, op_type, count = flow["actor"], flow["operation_type"], flow["count"]
            by_actor[actor] = by_actor.get(actor, 0) + count
            by_type[op_type] = by_type.get(op_type, 0) + count

        return {
            "detail": flows,
            "by_actor": by_actor,
            "by_operation_type": by_type,
            "total": sum(by_type.values()),
        }

    def _compute_autonomy_score(
        self,
        inventory: dict[str, Any],
        coverage: dict[str, Any],
        rule_quality: dict[str, Any],
        capacity: dict[str, Any],
        aol_flow: dict[str, Any],
    ) -> dict[str, Any]:
        """Compute the AI-autonomy score (0-100) from five sub-scores of 20.

        Args:
            inventory: Asset inventory section (currently not used in the score).
            coverage: Section providing ``overall_green_ratio``.
            rule_quality: Section providing ``total``, ``noisy_above_0_5``
                and ``ai_generated``.
            capacity: Section providing ``by_verdict``.
            aol_flow: Section providing ``total`` and ``by_operation_type``.

        Returns:
            ``total`` (rounded to one decimal), ``grade`` (``mature`` >= 90,
            ``in_progress`` >= 70, ``starter`` >= 50, otherwise ``initial``),
            ``breakdown`` (the five sub-scores, two decimals) and ``max``.
        """
        # Coverage: overall green ratio scaled to 20 points.
        score_coverage = min(20.0, coverage.get("overall_green_ratio", 0.0) * 20)

        # Rule quality: lose points in proportion to the share of noisy rules.
        # "or 1" avoids dividing by zero when the catalog is empty.
        total_rules = rule_quality.get("total", 1) or 1
        noisy = rule_quality.get("noisy_above_0_5", 0)
        score_rule = max(0.0, 20 - (noisy / total_rules * 20))

        # Capacity: 10 points off per critical host, 3 per warning host.
        by_verdict = capacity.get("by_verdict", {})
        critical = by_verdict.get("critical", 0)
        warning = by_verdict.get("warning", 0)
        deductions = min(20, critical * 10 + warning * 3)
        score_capacity = max(0.0, 20 - deductions)

        # Flow: logarithmic in the 24h operation count, full marks at 100 ops.
        total_ops_24h = aol_flow.get("total", 0)
        if total_ops_24h > 0:
            score_flow = min(20.0, math.log10(total_ops_24h + 1) / math.log10(101) * 20)
        else:
            score_flow = 0.0

        # Diversity: one point per AI-generated rule plus one per distinct
        # operation type (the latter capped at 10).
        ai_rules = rule_quality.get("ai_generated", 0)
        op_types = len(aol_flow.get("by_operation_type", {}))
        score_diversity = min(20.0, ai_rules * 1.0 + min(op_types, 10))

        raw_total = (
            score_coverage + score_rule + score_capacity + score_flow + score_diversity
        )
        # Grade the value that is actually reported so the payload can never
        # show e.g. total 90.0 next to a grade below "mature".
        total = round(raw_total, 1)
        if total >= 90:
            grade = "mature"
        elif total >= 70:
            grade = "in_progress"
        elif total >= 50:
            grade = "starter"
        else:
            grade = "initial"

        return {
            "total": total,
            "grade": grade,
            "breakdown": {
                "asset_coverage": round(score_coverage, 2),
                "rule_quality": round(score_rule, 2),
                "capacity_health": round(score_capacity, 2),
                "automation_flow": round(score_flow, 2),
                "ai_diversity": round(score_diversity, 2),
            },
            "max": 100.0,
        }
# Module-level cache for the lazily created service instance.
_singleton: AiopsKpiService | None = None


def get_aiops_kpi_service() -> AiopsKpiService:
    """Return the shared AiopsKpiService, creating it on the first call."""
    global _singleton
    if _singleton is not None:
        return _singleton
    _singleton = AiopsKpiService()
    return _singleton