Files
awoooi/apps/api/src/services/alert_analyzer_service.py
OG T c9c60c3a61
Some checks failed
E2E Health Check / e2e-health (push) Has been cancelled
CD Pipeline / build-and-deploy (push) Has been cancelled
Type Sync Check / check-type-sync (push) Failing after 22s
feat(mcp-integrations): Phase S 架構修復 + MCP 整合基礎建設
Phase S 技術債修復 (首席架構師審查 82→完整):
- S-01: generate_alert_fingerprint 移至 AlertAnalyzer.generate_fingerprint() staticmethod
- S-04: 移除 Pydantic v2 deprecated json_encoders (直接用原生 datetime 序列化)

Sentry MCP 整合 (Phase 23):
- ADR-048: Sentry→OpenClaw AI Triage 架構決策
- sentry_webhook_service.py: parse/analyze/create_incident/build_message Service 層
- config.py: SENTRY_WEBHOOK_SECRET (Fail-Closed HMAC-SHA256)

Playwright MCP 整合 (短期):
- smoke.spec.ts: 5 頁面 E2E smoke test (home/dashboard/incidents/approvals/terminal)
- cd.yaml: E2E Smoke Test 步驟 + Telegram 🎭 Smoke 狀態通知

長期規劃 ADR:
- ADR-049: Figma Code Connect 設計系統同步
- ADR-050: Telegram 互動式 Incident 2.0 (6鍵 Inline Keyboard)
- ADR-051: Context7 依賴升級顧問 (Next.js 14→15, FastAPI 0.115→0.128)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 16:20:57 +08:00

202 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Alert Analyzer Service - 告警分析大腦
======================================
從 api/v1/webhooks.py 抽取至 services 層 (ADR-024 四層架構R4 #129)
職責:
- 根據告警類型、嚴重度、相關指標,判定風險等級
- 計算爆炸半徑 (Blast Radius)
- 組裝 ApprovalRequestCreate
設計原則:
- 純業務邏輯層,不存取 Redis/DB
- 依賴 K8s 資源名稱正規化工具 (ADR-016)
- 可獨立測試 (無外部依賴)
版本: v1.0
建立: 2026-04-01 (台北時區)
建立者: Claude Code (R4 Router 瘦身 #129)
"""
import hashlib
from src.models.approval import (
ApprovalRequestCreate,
BlastRadius,
DataImpact,
DryRunCheck,
RiskLevel,
)
from src.models.webhook import AlertPayload
from src.utils.k8s_naming import normalize_resource_name
class AlertAnalyzer:
    """
    Alert analyzer - the core "brain" of AWOOOI.

    From an alert's type, severity and attached metrics this class derives
    a risk level, a blast radius and a suggested remediation action, and
    packs them into an ApprovalRequestCreate.

    Moved from: api/v1/webhooks.py (ADR-024 R4 #129, 2026-04-01 ogt)
    """

    # Alert type -> baseline risk level (unknown types default to MEDIUM
    # at lookup time in analyze()).
    RISK_MAPPING: dict[str, RiskLevel] = {
        "k8s_node_failure": RiskLevel.CRITICAL,
        "k8s_pod_crash": RiskLevel.MEDIUM,
        "db_connection_timeout": RiskLevel.CRITICAL,
        "service_404": RiskLevel.MEDIUM,
        "high_cpu": RiskLevel.MEDIUM,
        "high_memory": RiskLevel.MEDIUM,
        "disk_full": RiskLevel.CRITICAL,
        "ssl_expiry": RiskLevel.LOW,
        "custom": RiskLevel.MEDIUM,
    }

    # Alert type -> suggested action template. Templates are rendered with
    # str.format() using the `resource` and `namespace` keyword arguments.
    # NOTE(review): `kubectl scale --replicas` takes an absolute count;
    # confirm that "+2" in the high_cpu template does what is intended.
    ACTION_MAPPING: dict[str, str] = {
        "k8s_node_failure": "kubectl drain {resource} --ignore-daemonsets",
        "k8s_pod_crash": "kubectl delete pod {resource} -n {namespace}",
        "db_connection_timeout": "重啟資料庫連線池並檢查網路",
        "service_404": "kubectl rollout restart deployment/{resource} -n {namespace}",
        "high_cpu": "kubectl scale deployment/{resource} --replicas=+2 -n {namespace}",
        "high_memory": "kubectl delete pod {resource} -n {namespace} (記憶體洩漏清理)",
        "disk_full": "清理 /var/log 與 /tmp 目錄",
        "ssl_expiry": "更新 SSL 憑證",
        "custom": "人工分析處置",
    }

    # Alert type -> static blast-radius estimate (pod count, downtime text,
    # list of related services).
    BLAST_RADIUS_MAPPING: dict[str, dict] = {
        "k8s_node_failure": {"pods": 10, "downtime": "~5 min", "services": ["all-on-node"]},
        "k8s_pod_crash": {"pods": 1, "downtime": "~30s", "services": []},
        "db_connection_timeout": {"pods": 0, "downtime": "~2 min", "services": ["api", "auth"]},
        "service_404": {"pods": 3, "downtime": "~1 min", "services": []},
        "high_cpu": {"pods": 0, "downtime": "0", "services": []},
        "high_memory": {"pods": 1, "downtime": "~30s", "services": []},
        "disk_full": {"pods": 0, "downtime": "~5 min", "services": ["logging"]},
        "ssl_expiry": {"pods": 0, "downtime": "0", "services": ["https"]},
        "custom": {"pods": 0, "downtime": "unknown", "services": []},
    }

    @staticmethod
    def _build_dry_run_checks(alert: AlertPayload) -> list[DryRunCheck]:
        """Assemble the dry-run check entries shown on the approval card."""
        # NOTE(review): these three entries are created with passed=True
        # unconditionally; nothing is actually verified in this method.
        checks = [
            DryRunCheck(name="權限驗證", passed=True, message="cluster-admin"),
            DryRunCheck(name="語法驗證", passed=True, message=None),
            DryRunCheck(name="告警來源驗證", passed=True, message=alert.source),
        ]

        metrics = alert.metrics
        if not metrics:
            return checks

        # When metrics are present, add a baseline-deviation entry, but only
        # if the reported sigma deviation exceeds 2 in absolute value.
        cpu_percent = metrics.get("cpu_percent", 0)
        sigma_deviation = metrics.get("sigma_deviation", 0)
        if sigma_deviation and abs(sigma_deviation) > 2:
            checks.append(
                DryRunCheck(
                    name="基準線偏差分析",
                    passed=True,
                    message=f"CPU: {cpu_percent:.0f}% (σ: {sigma_deviation:+.1f})",
                )
            )
        return checks

    @staticmethod
    def _build_description(alert: AlertPayload) -> str:
        """Render the card description: alert type, message and any metrics."""
        summary = f"[{alert.alert_type}] {alert.message}"
        if alert.metrics:
            rendered = ", ".join(
                f"{key}={value}" for key, value in alert.metrics.items()
            )
            summary += f" | 指標: {rendered}"
        return summary

    @classmethod
    def analyze(cls, alert: AlertPayload) -> ApprovalRequestCreate:
        """
        Analyze an alert and produce an ApprovalRequestCreate.

        Phase 18.1.7: integrates K8s resource-name normalization (ADR-016).

        Args:
            alert: The incoming alert payload.

        Returns:
            An ApprovalRequestCreate used to create a pending approval card.
        """
        alert_type = alert.alert_type

        # Phase 18.1.7: normalize the resource name first, falling back to
        # the alert's own values when the normalizer yields nothing.
        name_info = normalize_resource_name(alert.target_resource, alert.namespace)
        resource_name = name_info.normalized or alert.target_resource
        namespace_name = name_info.namespace or alert.namespace

        # 1. Risk level: a "critical" severity always escalates to CRITICAL,
        #    otherwise the per-type baseline applies.
        baseline = cls.RISK_MAPPING.get(alert_type, RiskLevel.MEDIUM)
        risk_level = RiskLevel.CRITICAL if alert.severity == "critical" else baseline

        # 2. Suggested action, rendered with the normalized names.
        action = cls.ACTION_MAPPING.get(alert_type, "人工分析處置").format(
            resource=resource_name,
            namespace=namespace_name,
        )

        # 3. Blast radius, with a neutral estimate for unknown alert types.
        unknown_blast = {"pods": 0, "downtime": "unknown", "services": []}
        blast = cls.BLAST_RADIUS_MAPPING.get(alert_type, unknown_blast)

        # These two alert types are flagged as affecting writes.
        write_affecting = ("db_connection_timeout", "disk_full")
        data_impact = DataImpact.WRITE if alert_type in write_affecting else DataImpact.NONE

        # 4. Dry-run check entries.
        dry_run_checks = cls._build_dry_run_checks(alert)

        # 5. Human-readable description.
        description = cls._build_description(alert)

        # 6. Assemble the request. The related-services list is a new list:
        #    the mapped services followed by the alert's original
        #    (un-normalized) target resource.
        radius = BlastRadius(
            affected_pods=blast["pods"],
            estimated_downtime=blast["downtime"],
            related_services=[*blast["services"], alert.target_resource],
            data_impact=data_impact,
        )
        return ApprovalRequestCreate(
            action=action,
            description=description,
            risk_level=risk_level,
            blast_radius=radius,
            dry_run_checks=dry_run_checks,
            requested_by="OpenClaw",
        )

    # [Chief architect] generate_alert_fingerprint wrapped as a staticmethod,
    # v1.2 2026-04-01 Asia/Taipei
    @staticmethod
    def generate_fingerprint(alert: AlertPayload) -> str:
        """
        Generate a fingerprint for an alert (truncated SHA-256 hash).

        The hashed source string is
        ``namespace:deployment:alert_type:target_resource``. Alerts with the
        same pattern (same location, same type) yield the same fingerprint,
        which is used to recognise and aggregate duplicate alerts.

        Args:
            alert: The alert payload to fingerprint.

        Returns:
            The first 32 hexadecimal characters of the SHA-256 digest.
        """
        # Take the deployment from the labels ("deployment", else "app");
        # when that yields nothing, use target_resource instead.
        labels = alert.labels
        from_labels = labels.get("deployment", labels.get("app", "")) if labels else ""
        deployment = from_labels or alert.target_resource

        source = f"{alert.namespace}:{deployment}:{alert.alert_type}:{alert.target_resource}"

        digest = hashlib.sha256(source.encode()).hexdigest()
        return digest[:32]