Files
awoooi/apps/api/src/api/v1/webhooks.py
OG T 1fb0c0ca90
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
fix(auto-repair): Bug #5+#6 — SSH binary + affected_services 匹配修正
Bug #5 (webhooks.py): target_resource 現在優先用 component label
  - SentryDown alert 有 labels.component="sentry"
  - 舊邏輯: labels.instance="192.168.0.110:9000" → Playbook affected_services 不匹配
  - 新邏輯: component → pod → instance → alertname

Bug #6 (Dockerfile): python:3.11-slim 無 openssh-client
  - SSH_COMMAND Playbook 執行路徑調用 asyncio.create_subprocess_exec("ssh", ...)
  - image 沒有 ssh binary → 所有 SSH 修復必然失敗
  - 修正: 在 production stage 安裝 openssh-client

服務清單: 補 sentry 主服務到 service-registry.yaml (AUTO 級別)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-09 14:11:50 +08:00

1534 lines
58 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Webhook API - 外部告警接收 (OpenClaw Integration)
==================================================
Phase 5: OpenClaw 實體化升級
CAI-201: AWOOOI 核心大腦 Webhook 入口
戰略 B: 告警風暴收斂與成本控制
Phase 6.1: Event Bus (Redis Streams)
- POST /api/v1/webhooks/signals - 輕量級訊號接收 (直接進 Redis Stream)
Endpoints:
- POST /api/v1/webhooks/alerts - 接收外部系統告警 (含 HMAC 驗證)
流程 (Phase 5: OpenClaw + HMAC 安全):
1. HMAC 簽章驗證 (CISO 要求)
2. 接收告警 (K8s, Prometheus, etc.)
3. 生成告警指紋 (namespace:deployment:alert_type Hash)
4. 查詢 DB 是否有同指紋 pending 或 5 分鐘內的記錄
5. [收斂] 如果有：hit_count +1，跳過 LLM，節省成本
6. [新告警] 如果沒有:觸發 OpenClaw LLM 分析
7. 建立/更新 ApprovalRecord
8. 前端戰情室即時顯示聚合次數
"""
import hashlib
import hmac
from typing import Literal
from fastapi import APIRouter, BackgroundTasks, Header, HTTPException, Request, status
from pydantic import BaseModel, Field
from src.core.config import settings
from src.core.constants import is_cicd_alertname
from src.core.logging import get_logger
from src.core.metrics import record_alert_chain_success
# Phase 15.2: Trace Context (moved to SignalProducerService)
# get_trace_context 已移至 Service 層
from src.models.approval import (
ApprovalRequestCreate,
BlastRadius,
DataImpact,
DryRunCheck,
RiskLevel,
)
from src.models.incident import Incident, IncidentStatus, Severity, Signal
# R4 #129 (2026-04-01 ogt): AlertPayload/AlertResponse 移至 models 層AlertAnalyzer 移至 services 層
# ogt 更新 v1.1 2026-04-01 台北時間: generate_alert_fingerprint 移至 alert_analyzer_service (ADR-024)
# [首席架構師] 移除 generate_alert_fingerprint 直接 import改用 AlertAnalyzer.generate_fingerprint v1.2 2026-04-01 Asia/Taipei
from src.models.webhook import AlertPayload, AlertResponse
from src.services.alert_analyzer_service import AlertAnalyzer
from src.services.approval_db import get_approval_service
# Phase 17 P0: Service 層 (消除 Router 直接存取 Redis)
from src.services.incident_service import get_incident_service
from src.services.auto_repair_service import AutoRepairService
# Phase 5: OpenClaw AI Engine
from src.services.openclaw import get_openclaw
from src.services.signal_producer import SignalData, get_signal_producer
# Phase 5: Telegram Gateway (行動戰情室)
from src.services.telegram_gateway import TelegramGatewayError, get_telegram_gateway
# Phase 18.1.7: K8s 資源名稱正規化 已移至 alert_analyzer_service (R4 #129)
from src.utils.timezone import now_taipei
# Router for the webhook endpoints defined in this module (prefix: /webhooks).
router = APIRouter(prefix="/webhooks", tags=["Webhooks"])
# Module-level logger obtained from the project's logging helper.
logger = get_logger("awoooi.webhooks")
# =============================================================================
# Incident-Approval sync (iron rule from feedback_incident_approval_sync.md)
# =============================================================================
# Risk level (lower-case name) -> incident severity mapping
RISK_TO_SEVERITY = {
    "critical": Severity.P0,
    "high": Severity.P1,
    "medium": Severity.P2,
    "low": Severity.P3,
}
# Incident TTL: 7 days, expressed in seconds
INCIDENT_TTL_SECONDS = 7 * 24 * 60 * 60
async def create_incident_for_approval(
    approval_id: str,
    risk_level: str,
    target_resource: str,
    namespace: str,
    alert_type: str,
    message: str,
    source: str = "alertmanager",
    alertname: str | None = None,
) -> str:
    """
    Create the Incident that mirrors an Approval and persist it.

    One Signal is built from the alert fields and wrapped in a new Incident
    (status INVESTIGATING). The incident is saved to working memory through
    the incident service and then, on a best-effort basis, to episodic
    memory: a failure of the second write is logged as a warning and does
    not abort the call.

    Args:
        approval_id: Approval identifier; parsed with ``UUID(...)`` and
            stored in the incident's ``proposal_ids``.
        risk_level: Risk level name, matched case-insensitively against
            ``RISK_TO_SEVERITY``; unknown names fall back to ``Severity.P2``.
        target_resource: Affected resource; becomes the single entry of
            ``affected_services`` and the ``resource`` label.
        namespace: Namespace recorded in the signal labels.
        alert_type: Used as the signal's ``alert_name`` and as the
            ``alertname`` label when ``alertname`` is not given.
        message: Stored under the ``message`` annotation.
        source: Signal source name.
        alertname: Optional explicit ``alertname`` label value.

    Returns:
        str: The ``incident_id`` of the newly created incident.
    """
    from uuid import UUID

    # Persistence goes through the service layer, not directly from the router.
    svc = get_incident_service()

    # Translate the risk level into an incident severity (default: P2).
    incident_severity = RISK_TO_SEVERITY.get(risk_level.lower(), Severity.P2)

    # The raw alert is represented as a single Signal.
    signal_labels = {
        "namespace": namespace,
        "resource": target_resource,
        "alertname": alertname or alert_type,
    }
    raw_signal = Signal(
        alert_name=alert_type,
        severity=incident_severity,
        source=source,
        fired_at=now_taipei(),
        labels=signal_labels,
        annotations={"message": message},
    )

    new_incident = Incident(
        status=IncidentStatus.INVESTIGATING,
        severity=incident_severity,
        signals=[raw_signal],
        affected_services=[target_resource],
        proposal_ids=[UUID(approval_id)],
    )

    # Working memory first; this write is mandatory.
    await svc.save_to_working_memory(new_incident)

    # Episodic memory second; best effort only. The original note explains
    # that incidents kept solely in working memory expire with their TTL and
    # can no longer be found by later Playbook / KM processing.
    try:
        await svc.save_to_episodic_memory(new_incident)
    except Exception as episodic_err:
        logger.warning(
            "incident_episodic_memory_failed",
            incident_id=new_incident.incident_id,
            error=str(episodic_err),
        )

    logger.info(
        "incident_created_for_approval",
        incident_id=new_incident.incident_id,
        approval_id=approval_id,
        severity=incident_severity.value,
        target=target_resource,
    )
    return new_incident.incident_id
# =============================================================================
# 2026-04-05 ogt: 自動修復背景任務 (ADR-058 閉環)
# =============================================================================
async def _try_auto_repair_background(
    incident_id: str,
    approval_id: str,
    alert_type: str,
    target_resource: str,
    namespace: str,
) -> None:
    """
    Evaluate and, when allowed, execute an automatic repair in the background.

    Flow:
    1. Reload the Incident from working memory.
    2. Ask ``AutoRepairService.evaluate_auto_repair()`` for a decision.
       (The original note says only the P0/P1 severity block is kept inside
       that evaluation; the evaluation itself is not visible here.)
    3. Repairable -> ``execute_auto_repair()``.
    4. Not repairable (or no playbook selected) -> record the block and
       return; nothing is executed.

    Every step is appended to the alert operation log. Any unexpected error
    is logged with its traceback and swallowed, so the background task never
    propagates an exception.

    Args:
        incident_id: Incident to reload and repair.
        approval_id: Related approval, recorded in logs and op-log entries.
        alert_type: Alert type, used in the result log and notification.
        target_resource: Affected resource, used in the result log and
            notification.
        namespace: Namespace, used in the result log and notification.
    """
    from src.repositories.alert_operation_log_repository import get_alert_operation_log_repository

    op_log = get_alert_operation_log_repository()
    try:
        incident_service = get_incident_service()
        incident = await incident_service.get_from_working_memory(incident_id)
        if not incident:
            logger.warning("auto_repair_incident_not_found", incident_id=incident_id)
            return

        repair_service = AutoRepairService()
        decision = await repair_service.evaluate_auto_repair(incident)
        logger.info(
            "auto_repair_decision",
            incident_id=incident_id,
            approval_id=approval_id,
            can_auto_repair=decision.can_auto_repair,
            reason=decision.reason,
            blocked_by=decision.blocked_by,
        )

        if not decision.can_auto_repair:
            # Record that the evaluation blocked the repair.
            await op_log.append(
                "AUTO_REPAIR_TRIGGERED",
                incident_id=incident_id,
                approval_id=approval_id,
                actor="auto_repair",
                action_detail=f"blocked:{decision.blocked_by}",
                success=False,
                error_message=decision.reason,
                context={
                    "blocked_by": decision.blocked_by,
                    "reason": decision.reason,
                    "playbook_id": decision.playbook.playbook_id if decision.playbook else None,
                },
            )
            return

        playbook = decision.playbook
        if playbook is None:
            # A positive decision without a playbook leaves nothing to run.
            # Previously this case crashed with AttributeError while building
            # the op-log context; it is now recorded explicitly as blocked.
            logger.warning(
                "auto_repair_no_playbook",
                incident_id=incident_id,
                approval_id=approval_id,
            )
            await op_log.append(
                "AUTO_REPAIR_TRIGGERED",
                incident_id=incident_id,
                approval_id=approval_id,
                actor="auto_repair",
                action_detail="blocked:no_playbook",
                success=False,
                error_message="Auto repair allowed but no playbook was selected",
                context={
                    "blocked_by": "no_playbook",
                    "reason": decision.reason,
                    "playbook_id": None,
                },
            )
            return

        # Record the trigger, including the Langfuse trace id when the
        # incident carries one (Sprint 5.1 Q10, ADR-062).
        _langfuse_trace_id = getattr(incident, "langfuse_trace_id", None)
        await op_log.append(
            "AUTO_REPAIR_TRIGGERED",
            incident_id=incident_id,
            approval_id=approval_id,
            actor="auto_repair",
            action_detail=playbook.name,
            success=True,
            context={
                "playbook_id": playbook.playbook_id,
                "playbook_name": playbook.name,
                "similarity_score": decision.similarity_score,
                "risk_level": decision.risk_level.value if decision.risk_level else None,
                "langfuse_trace_id": _langfuse_trace_id,
                "langfuse_url": (
                    f"{settings.LANGFUSE_URL}/trace/{_langfuse_trace_id}"
                    if _langfuse_trace_id else None
                ),
            },
        )

        # Execute the repair.
        logger.info(
            "auto_repair_executing",
            incident_id=incident_id,
            playbook_id=playbook.playbook_id,
        )
        result = await repair_service.execute_auto_repair(
            incident=incident,
            playbook=playbook,
            is_cold_start=decision.is_cold_start,
            similarity_score=decision.similarity_score,
        )
        logger.info(
            "auto_repair_result",
            incident_id=incident_id,
            success=result.success if result else False,
        )
        if not result:
            # Nothing to record or announce without a result object.
            return

        # Record the execution outcome.
        await op_log.append(
            "EXECUTION_COMPLETED",
            incident_id=incident_id,
            approval_id=approval_id,
            actor="auto_repair",
            action_detail=f"playbook:{result.playbook_id}",
            success=result.success,
            error_message=result.error,
            context={
                "playbook_id": result.playbook_id,
                "steps_count": len(result.executed_steps),
                "execution_time_ms": result.execution_time_ms,
                "alert_type": alert_type,
                "target_resource": target_resource,
                "namespace": namespace,
            },
        )

        # Announce the outcome on Telegram; a notification failure is only
        # logged and never fails the repair flow.
        try:
            telegram = get_telegram_gateway()
            # NOTE(review): both branches are empty strings in this view of
            # the file; confirm whether status icons were intended here.
            status_icon = "" if result.success else ""
            # Only the first three executed steps are listed.
            steps_summary = "\n".join(f"{s}" for s in result.executed_steps[:3]) or "-"
            await telegram.send_message(
                f"{status_icon} *自動修復{'完成' if result.success else '失敗'}*\n"
                f"資源: `{target_resource}` ({namespace})\n"
                f"告警: {alert_type}\n"
                f"耗時: {result.execution_time_ms}ms\n"
                f"步驟:\n{steps_summary}"
            )
            # Record the Telegram push.
            await op_log.append(
                "TELEGRAM_RESULT_SENT",
                incident_id=incident_id,
                approval_id=approval_id,
                actor="system",
                action_detail="auto_repair_result",
                success=result.success,
                context={"target_resource": target_resource, "namespace": namespace},
            )
        except Exception as tg_err:
            logger.warning("auto_repair_telegram_notify_failed", error=str(tg_err))
    except Exception as e:
        logger.exception(
            "auto_repair_background_failed",
            incident_id=incident_id,
            error=str(e),
        )
# =============================================================================
# Phase 5: Telegram 背景推送任務 (非阻塞)
# =============================================================================
async def _push_to_telegram_background(
    approval_id: str,
    risk_level: str,
    resource_name: str,
    root_cause: str,
    suggested_action: str,
    estimated_downtime: str,
    hit_count: int = 1,
    # v6.0 AI arbitration fields
    primary_responsibility: str = "COLLAB",
    confidence: float = 0.0,
    namespace: str = "default",
    # v7.0 SignOz integration
    signoz_rps: float = 0.0,
    signoz_rps_trend: str = "stable",
    signoz_error_rate: float = 0.0,
    # AI token / cost tracking
    ai_tokens: int = 0,
    ai_cost: float = 0.0,
    signoz_p99_latency: float = 0.0,
    signoz_latency_trend: str = "stable",
    signoz_trace_url: str = "",
    auto_tuning_command: str = "",
    # Name of the AI provider that produced the analysis
    ai_provider: str = "",
    # Incident id forwarded to the approval card
    incident_id: str = "",
) -> None:
    """
    Background task: push a pending-approval card to Telegram.

    Meant to be scheduled via ``BackgroundTasks``. All errors are caught and
    logged here, so a Telegram problem never reaches the caller. When no bot
    token is configured the push is skipped with a debug log.

    ``resource_name`` and ``suggested_action`` are truncated to 50 characters
    and the root cause to 100 characters before being sent. When
    ``hit_count`` is greater than 1 the root cause is prefixed with
    ``[x<hit_count>]``. ``incident_id`` is forwarded as given; callers that
    have no incident pass an empty string.
    """
    try:
        gateway = get_telegram_gateway()

        # Without a bot token there is nothing to push to.
        if not settings.OPENCLAW_TG_BOT_TOKEN:
            logger.debug(
                "telegram_push_skipped",
                reason="Bot token not configured",
                approval_id=approval_id,
            )
            return

        # Converged alerts carry their aggregate count in the root cause text.
        root_cause_text = f"[x{hit_count}] {root_cause}" if hit_count > 1 else root_cause

        card_fields = dict(
            approval_id=approval_id,
            risk_level=risk_level,
            resource_name=resource_name[:50],
            root_cause=root_cause_text[:100],
            suggested_action=suggested_action[:50],
            estimated_downtime=estimated_downtime,
            # v6.0 AI arbitration
            primary_responsibility=primary_responsibility,
            confidence=confidence,
            namespace=namespace,
            # v7.0 SignOz integration
            signoz_rps=signoz_rps,
            signoz_rps_trend=signoz_rps_trend,
            signoz_error_rate=signoz_error_rate,
            signoz_p99_latency=signoz_p99_latency,
            signoz_latency_trend=signoz_latency_trend,
            signoz_trace_url=signoz_trace_url,
            auto_tuning_command=auto_tuning_command,
            # AI token / cost tracking
            ai_tokens=ai_tokens,
            ai_cost=ai_cost,
            ai_provider=ai_provider,
            incident_id=incident_id,
        )
        await gateway.send_approval_card(**card_fields)

        has_signoz_data = signoz_rps > 0 or signoz_error_rate > 0
        logger.info(
            "telegram_push_success",
            approval_id=approval_id,
            risk_level=risk_level,
            hit_count=hit_count,
            primary_responsibility=primary_responsibility,
            confidence=confidence,
            signoz_integrated=has_signoz_data,
            ai_tokens=ai_tokens,
            ai_cost=f"${ai_cost:.6f}",
        )

        # Record the push in the alert operation log; a failure of this
        # bookkeeping step is only a warning.
        try:
            from src.repositories.alert_operation_log_repository import get_alert_operation_log_repository

            operation_log = get_alert_operation_log_repository()
            await operation_log.append(
                "TELEGRAM_SENT",
                approval_id=approval_id,
                actor="system",
                action_detail="approval_card",
                success=True,
                context={
                    "risk_level": risk_level,
                    "resource_name": resource_name,
                    "hit_count": hit_count,
                    "namespace": namespace,
                },
            )
        except Exception as op_log_err:
            logger.warning("alert_op_log_telegram_sent_failed", error=str(op_log_err))
    except TelegramGatewayError as gateway_err:
        logger.warning(
            "telegram_push_failed",
            approval_id=approval_id,
            error=str(gateway_err),
            error_type="TelegramGatewayError",
        )
    except Exception as unexpected_err:
        logger.error(
            "telegram_push_unexpected_error",
            approval_id=approval_id,
            error=str(unexpected_err),
            error_type=type(unexpected_err).__name__,
        )
# =============================================================================
# Phase 5: HMAC Signature Verification (CISO 要求)
# =============================================================================
class HMACVerificationError(Exception):
    """Raised when a webhook request fails HMAC signature verification."""
async def verify_webhook_signature(
    request: Request,
    x_signature_256: str | None = Header(None, alias="X-Signature-256"),
) -> bool:
    """
    Verify the HMAC-SHA256 signature of a webhook request.

    The signature is expected in the ``X-Signature-256`` header in the form
    ``sha256=<hex_digest>`` and is computed over the raw request body with
    ``settings.WEBHOOK_HMAC_SECRET``.

    Fail-closed policy:
    - ``WEBHOOK_HMAC_SECRET`` unset and ``settings.ENVIRONMENT == "prod"``:
      the request is rejected.
    - ``WEBHOOK_HMAC_SECRET`` unset in any other environment: verification
      is skipped with a warning (intended for local testing only).

    Args:
        request: FastAPI request whose raw body is signed.
        x_signature_256: Value of the ``X-Signature-256`` header.

    Returns:
        bool: ``True`` when verification passed or was skipped.

    Raises:
        HMACVerificationError: Secret missing in production, header missing,
            header malformed, or signature mismatch.
    """
    secret = settings.WEBHOOK_HMAC_SECRET
    if not secret:
        # NOTE(review): only the exact value "prod" fails closed; any other
        # environment name skips verification. Confirm the set of values
        # ENVIRONMENT can take.
        if settings.ENVIRONMENT == "prod":
            logger.critical(
                "hmac_secret_missing_in_production",
                environment=settings.ENVIRONMENT,
                message="CRITICAL: HMAC Secret not configured in production!",
            )
            raise HMACVerificationError(
                "Critical: WEBHOOK_HMAC_SECRET missing in production environment"
            )
        logger.warning(
            "hmac_verification_skipped_dev_only",
            environment=settings.ENVIRONMENT,
            reason="WEBHOOK_HMAC_SECRET not configured (dev mode only)",
        )
        return True

    # A signature header is mandatory once a secret is configured.
    if not x_signature_256:
        logger.warning("hmac_signature_missing")
        raise HMACVerificationError("Missing X-Signature-256 header")

    prefix = "sha256="
    if not x_signature_256.startswith(prefix):
        raise HMACVerificationError("Invalid signature format (expected sha256=...)")
    provided_signature = x_signature_256[len(prefix):]

    body = await request.body()
    expected_signature = hmac.new(
        secret.encode(),
        body,
        hashlib.sha256,
    ).hexdigest()

    # Constant-time comparison. Both sides are compared as bytes because
    # compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters, which would turn a malformed header into a
    # server error instead of a clean rejection.
    signatures_match = hmac.compare_digest(
        provided_signature.encode("utf-8", "replace"),
        expected_signature.encode("ascii"),
    )
    if not signatures_match:
        # Only the caller-supplied value is logged; the expected digest is
        # kept out of the logs.
        logger.warning(
            "hmac_verification_failed",
            provided=provided_signature[:16] + "...",
        )
        raise HMACVerificationError("Invalid signature")

    logger.info("hmac_verification_success")
    return True
# generate_alert_fingerprint 已封裝為 AlertAnalyzer.generate_fingerprint (首席架構師 v1.2 2026-04-01 Asia/Taipei)
# Strategy B: sliding time window (5 minutes) used when looking up an existing
# approval with the same fingerprint
DEBOUNCE_WINDOW_MINUTES = 5
# =============================================================================
# Request Models
# =============================================================================
# AlertPayload 和 AlertResponse 已移至 src/models/webhook.py (R4 #129, 2026-04-01 ogt)
# 由 import 區塊頂部的 from src.models.webhook import ... 引入
# =============================================================================
# Phase 6.1: Signal Producer (Redis Streams)
# =============================================================================
# Redis Stream constants
# 2026-03-27 ogt: unified Stream key format (P0 fix)
SIGNAL_STREAM_KEY = "awoooi:signals"
SIGNAL_STREAM_MAXLEN = 10000  # intended cap so the Stream cannot grow without bound
class SignalPayload(BaseModel):
    """
    Phase 6.1: lightweight signal payload.

    Design notes:
    - Plain data only; no heavy computation happens on this model.
    - Meant to be written straight to the Redis Stream so that receiving and
      processing stay decoupled.
    - Accepts several sources: Prometheus, Grafana, K8s events, custom.

    Compared with AlertPayload:
    - SignalPayload: lightweight, goes directly into the Stream.
    - AlertPayload: handled synchronously, including LLM analysis.
    """

    # Required fields use ``default=...`` (Ellipsis) to mark them as mandatory.
    source: str = Field(
        default=...,
        description="訊號來源 (prometheus, grafana, k8s-events, signoz)",
    )
    alert_name: str = Field(
        default=...,
        description="告警名稱 (例如: HighCPUUsage, PodCrashLooping)",
    )
    severity: Literal["info", "warning", "critical"] = Field(
        default="warning",
        description="嚴重度",
    )
    namespace: str = Field(
        default="default",
        description="K8s Namespace",
    )
    target: str = Field(
        default=...,
        description="受影響目標 (Pod, Node, Service 名稱)",
    )
    message: str = Field(
        default="",
        description="訊號描述",
    )
    labels: dict | None = Field(
        default=None,
        description="標籤 (例如: {app: harbor, team: devops})",
    )
    annotations: dict | None = Field(
        default=None,
        description="附加資訊 (例如: {runbook_url: ..., dashboard_url: ...})",
    )
class SignalResponse(BaseModel):
    """Response returned after a signal has been accepted."""

    success: bool = Field(default=..., description="是否成功寫入 Stream")
    message_id: str | None = Field(default=None, description="Redis Stream Message ID")
    stream: str = Field(default=SIGNAL_STREAM_KEY, description="寫入的 Stream 名稱")
async def produce_signal_to_stream(signal: SignalPayload) -> str:
    """
    Hand a signal to the signal producer service for writing to the stream.

    The payload's fields are copied one-to-one into a ``SignalData`` object,
    which is then passed to the producer obtained from
    ``get_signal_producer()``; the router does not access the stream itself.

    Args:
        signal: The validated incoming signal.

    Returns:
        str: The value returned by the producer's ``produce()`` call (the
        Redis Stream message id, per the original documentation).
    """
    producer = get_signal_producer()

    # SignalData mirrors SignalPayload field for field.
    copied_fields = (
        "source",
        "alert_name",
        "severity",
        "namespace",
        "target",
        "message",
        "labels",
        "annotations",
    )
    signal_data = SignalData(**{name: getattr(signal, name) for name in copied_fields})
    return await producer.produce(signal_data)
@router.post(
    "/signals",
    response_model=SignalResponse,
    summary="Phase 6.1: 輕量級訊號接收 (Event Bus)",
    description="接收訊號並直接寫入 Redis Stream完全解耦接收與處理。",
)
async def receive_signal(
    request: Request,
    signal: SignalPayload,
    x_signature_256: str | None = Header(None, alias="X-Signature-256"),
) -> SignalResponse:
    """
    Phase 6.1: Event Bus producer endpoint.

    Steps:
    1. Verify the HMAC signature (same helper as /alerts; it may skip the
       check depending on configuration).
    2. Pass the signal to ``produce_signal_to_stream``.
    3. Return immediately with the resulting message id.

    No processing happens here; per the original note that is the job of the
    SignalWorker consumer.

    Raises:
        HTTPException: 401 when signature verification fails, 500 when the
            signal cannot be produced.
    """
    # Step 1: signature check; a rejection becomes HTTP 401.
    try:
        await verify_webhook_signature(request, x_signature_256)
    except HMACVerificationError as hmac_err:
        logger.warning("signal_hmac_rejected", error=str(hmac_err))
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"HMAC verification failed: {hmac_err!s}",
        ) from hmac_err

    # Steps 2-3: produce and answer; any failure becomes HTTP 500.
    try:
        stream_message_id = await produce_signal_to_stream(signal)
        response = SignalResponse(
            success=True,
            message_id=stream_message_id,
            stream=SIGNAL_STREAM_KEY,
        )
        return response
    except Exception as produce_err:
        logger.exception("signal_produce_error", error=str(produce_err))
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to produce signal: {produce_err!s}",
        ) from produce_err
# AlertAnalyzer 已移至 src/services/alert_analyzer_service.py (R4 #129, 2026-04-01 ogt)
# 由 import 區塊頂部的 from src.services.alert_analyzer_service import ... 引入
# =============================================================================
# Endpoints
# =============================================================================
@router.post(
    "/alerts",
    response_model=AlertResponse,
    summary="接收外部告警 (戰略 B: 告警風暴收斂)",
    description="接收告警並自動收斂重複告警。相同指紋的告警會聚合,避免重複呼叫 LLM 造成成本爆炸。",
)
async def receive_alert(
    request: Request,
    alert: AlertPayload,
    background_tasks: BackgroundTasks,
    x_signature_256: str | None = Header(None, alias="X-Signature-256"),
) -> AlertResponse:
    """
    Receive an external alert and trigger OpenClaw analysis.

    Strategy B flow (alert-storm convergence):
    0. Verify the HMAC signature.
    1. Generate the alert fingerprint.
    2. Look for an existing approval with the same fingerprint inside the
       debounce window.
    3. [Converged] Found: increment hit_count, skip the LLM and skip the
       Telegram push.
    4. [New alert] Not found: run the LLM analysis, falling back to the
       static analyzer when the LLM returns no result.
    5. Create the approval record with the fingerprint and schedule the
       Telegram push as a background task.

    Raises:
        HTTPException: 401 when signature verification fails, 500 when any
            step of the processing fails.
    """
    # ==========================================================================
    # Step 0: HMAC signature verification
    # ==========================================================================
    try:
        await verify_webhook_signature(request, x_signature_256)
    except HMACVerificationError as e:
        logger.warning("webhook_hmac_rejected", error=str(e))
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"HMAC verification failed: {str(e)}",
        ) from e

    # NOTE(review): the id has one-second resolution, so two alerts received
    # within the same second share an alert_id. Confirm whether consumers
    # rely on uniqueness.
    alert_id = f"alert-{now_taipei().strftime('%Y%m%d%H%M%S')}"

    # ==========================================================================
    # Strategy B step 1: generate the alert fingerprint
    # ==========================================================================
    fingerprint = AlertAnalyzer.generate_fingerprint(alert)
    logger.info(
        "webhook_alert_received",
        alert_id=alert_id,
        alert_type=alert.alert_type,
        severity=alert.severity,
        source=alert.source,
        target=alert.target_resource,
        fingerprint=fingerprint,
    )
    try:
        service = get_approval_service()

        # ======================================================================
        # Strategy B step 2: look for an existing record with this fingerprint
        # ======================================================================
        existing_approval = await service.find_by_fingerprint(
            fingerprint=fingerprint,
            debounce_minutes=DEBOUNCE_WINDOW_MINUTES,
        )
        if existing_approval:
            # ==================================================================
            # Strategy B step 3: [converged] skip the LLM, only bump the count
            # ==================================================================
            logger.info(
                "alert_converged_skip_llm",
                alert_id=alert_id,
                fingerprint=fingerprint,
                existing_approval_id=str(existing_approval.id),
                old_hit_count=existing_approval.hit_count,
                message="🛡️ 告警收斂生效!跳過 LLM 分析,節省成本!",
            )
            updated_approval = await service.increment_hit_count(existing_approval.id)
            if updated_approval:
                # Converged alerts do not resend Telegram; only hit_count
                # changes, which avoids flooding the chat.
                logger.info(
                    "alert_converged_telegram_skipped",
                    approval_id=str(updated_approval.id),
                    hit_count=updated_approval.hit_count,
                    reason="Converged alert - Telegram already sent for this fingerprint",
                )
                return AlertResponse(
                    success=True,
                    message=f"🛡️ 告警收斂 (x{updated_approval.hit_count}) - Telegram 已發送,跳過重複通知",
                    alert_id=alert_id,
                    approval_created=False,  # no new card was created
                    approval_id=str(updated_approval.id),
                    risk_level=updated_approval.risk_level.value,
                    suggested_action=updated_approval.action,
                    # Strategy B
                    hit_count=updated_approval.hit_count,
                    converged=True,  # marks this as a converged alert
                )
            # If the increment returned nothing, processing continues below
            # as if the alert were new.

        # ======================================================================
        # Strategy B step 4: [new alert] no matching record, run LLM analysis
        # ======================================================================
        logger.info(
            "alert_new_fingerprint_proceed_llm",
            alert_id=alert_id,
            fingerprint=fingerprint,
            message="新指紋告警,啟動 LLM 分析",
        )
        # Context handed to the LLM.
        alert_context = {
            "alert_type": alert.alert_type,
            "severity": alert.severity,
            "source": alert.source,
            "target_resource": alert.target_resource,
            "namespace": alert.namespace,
            "message": alert.message,
            "metrics": alert.metrics or {},
            "labels": alert.labels or {},
        }
        # OpenClaw analysis; the result tuple also carries SignOz data and
        # token / cost figures.
        openclaw = get_openclaw()
        analysis_result, ai_provider, raw_response, signoz_metrics, signoz_trace_url, ai_tokens, ai_cost = await openclaw.analyze_alert(alert_context)
        if analysis_result:
            # LLM analysis succeeded.
            logger.info(
                "llm_analysis_success",
                alert_id=alert_id,
                provider=ai_provider,
                action_title=analysis_result.action_title,
                risk_level=analysis_result.risk_level.value,
                confidence=analysis_result.confidence,
            )
            # NOTE(review): there is no "high" entry here although
            # RISK_TO_SEVERITY knows "high"; a "high" result falls back to
            # MEDIUM. Confirm whether that is intended.
            risk_mapping = {
                "low": RiskLevel.LOW,
                "medium": RiskLevel.MEDIUM,
                "critical": RiskLevel.CRITICAL,
            }
            risk_level = risk_mapping.get(analysis_result.risk_level.value, RiskLevel.MEDIUM)
            impact_mapping = {
                "NONE": DataImpact.NONE,
                "READ_ONLY": DataImpact.READ_ONLY,
                "WRITE": DataImpact.WRITE,
                "DESTRUCTIVE": DataImpact.DESTRUCTIVE,
            }
            blast = analysis_result.blast_radius
            data_impact = impact_mapping.get(blast.data_impact.value, DataImpact.NONE)
            approval_create = ApprovalRequestCreate(
                action=f"{analysis_result.action_title} | {analysis_result.kubectl_command}",
                description=f"[AI: {ai_provider}] {analysis_result.description}",
                risk_level=risk_level,
                blast_radius=BlastRadius(
                    affected_pods=blast.affected_pods,
                    estimated_downtime=blast.estimated_downtime,
                    # De-duplicated union of both service lists.
                    related_services=list(set(blast.related_services + analysis_result.affected_services)),
                    data_impact=data_impact,
                ),
                dry_run_checks=[
                    DryRunCheck(name="AI 信心度", passed=analysis_result.confidence >= 0.7, message=f"{analysis_result.confidence:.0%}"),
                    DryRunCheck(name="權限驗證", passed=True, message="cluster-admin"),
                    DryRunCheck(name="語法驗證", passed=True, message="kubectl valid"),
                    DryRunCheck(name="偏差分析", passed=True, message=analysis_result.deviation_analysis[:50] if analysis_result.deviation_analysis else "N/A"),
                ],
                requested_by=f"OpenClaw ({ai_provider})",
            )
            suggested_action = analysis_result.kubectl_command
        else:
            # LLM failed: degrade to the static analyzer.
            logger.warning(
                "llm_analysis_failed_fallback_static",
                alert_id=alert_id,
                provider=ai_provider,
            )
            approval_create = AlertAnalyzer.analyze(alert)
            suggested_action = approval_create.action
            ai_provider = "static_analyzer"

        # ======================================================================
        # Step 5: create the approval record with its fingerprint
        # ======================================================================
        approval = await service.create_approval_with_fingerprint(
            request=approval_create,
            fingerprint=fingerprint,
        )
        logger.info(
            "approval_auto_created_with_fingerprint",
            alert_id=alert_id,
            approval_id=str(approval.id),
            fingerprint=fingerprint,
            status=approval.status.value,
            ai_provider=ai_provider,
        )

        # ======================================================================
        # Step 6: push to Telegram through BackgroundTasks (non-blocking)
        # ======================================================================
        # AI arbitration fields (v6.0); getattr() also covers the case where
        # analysis_result is None.
        primary_resp = getattr(analysis_result, "primary_responsibility", "COLLAB")
        ai_confidence = getattr(analysis_result, "confidence", 0.0)

        # SignOz data (v7.0), with neutral defaults when no metrics came back.
        signoz_rps = 0.0
        signoz_rps_trend = "stable"
        signoz_error_rate = 0.0
        signoz_p99_latency = 0.0
        signoz_latency_trend = "stable"
        auto_tuning_cmd = ""
        if signoz_metrics:
            signoz_rps = signoz_metrics.rps
            signoz_rps_trend = signoz_metrics.rps_trend
            signoz_error_rate = signoz_metrics.error_rate
            signoz_p99_latency = signoz_metrics.p99_latency_ms
            signoz_latency_trend = signoz_metrics.latency_trend

        # Tuning command: taken from the first optimization suggestion, which
        # may be either an object or a dict.
        suggestions = getattr(analysis_result, "optimization_suggestions", None) if analysis_result else None
        if suggestions:
            first_suggestion = suggestions[0]
            if hasattr(first_suggestion, "kubectl_or_config"):
                auto_tuning_cmd = first_suggestion.kubectl_or_config
            elif isinstance(first_suggestion, dict):
                auto_tuning_cmd = first_suggestion.get("kubectl_or_config", "")

        background_tasks.add_task(
            _push_to_telegram_background,
            approval_id=str(approval.id),
            risk_level=approval_create.risk_level.value,
            resource_name=alert.target_resource,
            root_cause=analysis_result.description if analysis_result else alert.message,
            suggested_action=suggested_action,
            estimated_downtime=approval_create.blast_radius.estimated_downtime,
            hit_count=1,
            # v6.0 AI arbitration
            primary_responsibility=primary_resp,
            confidence=ai_confidence,
            namespace=alert.namespace,
            # v7.0 SignOz integration
            signoz_rps=signoz_rps,
            signoz_rps_trend=signoz_rps_trend,
            signoz_error_rate=signoz_error_rate,
            signoz_p99_latency=signoz_p99_latency,
            signoz_latency_trend=signoz_latency_trend,
            signoz_trace_url=signoz_trace_url,
            auto_tuning_command=auto_tuning_cmd,
            # AI token / cost tracking
            ai_tokens=ai_tokens,
            ai_cost=ai_cost,
            ai_provider=ai_provider,
            # The /alerts path has no incident yet, so the card's
            # detail / reanalyze / history buttons are not shown.
            incident_id="",
        )
        return AlertResponse(
            success=True,
            message=f"告警已接收OpenClaw ({ai_provider}) 已建立待簽核卡片 (Telegram 背景推送中)",
            alert_id=alert_id,
            approval_created=True,
            approval_id=str(approval.id),
            risk_level=approval_create.risk_level.value,
            suggested_action=suggested_action,
            # Strategy B
            hit_count=1,  # a newly created alert starts at 1
            converged=False,  # not a converged alert
        )
    except Exception as e:
        # logger.exception keeps the traceback, matching receive_signal.
        logger.exception(
            "webhook_alert_processing_failed",
            alert_id=alert_id,
            error=str(e),
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"告警處理失敗: {str(e)}",
        ) from e
# =============================================================================
# Phase 10: Alertmanager 原生格式支援 (內網免 HMAC)
# =============================================================================
class AlertmanagerAlert(BaseModel):
    """A single alert entry inside an Alertmanager webhook payload."""
    status: str # firing, resolved
    # Label and annotation maps default to empty dicts when omitted.
    labels: dict = {}
    annotations: dict = {}
    # Remaining fields are optional and default to None.
    startsAt: str | None = None
    endsAt: str | None = None
    generatorURL: str | None = None
    fingerprint: str | None = None
class AlertmanagerPayload(BaseModel):
    """Alertmanager webhook payload: group-level metadata plus the list of alerts."""
    version: str | None = "4"
    groupKey: str | None = None
    status: str # firing, resolved
    receiver: str | None = None
    # Group-level label/annotation maps; they default to empty dicts and may also be null.
    groupLabels: dict | None = {}
    commonLabels: dict | None = {}
    commonAnnotations: dict | None = {}
    externalURL: str | None = None
    # The only required collection: the individual alerts of this notification.
    alerts: list[AlertmanagerAlert]
def is_internal_ip(client_ip: str) -> bool:
    """
    Return True when ``client_ip`` is a private or loopback address.

    "Private" follows the ``ipaddress`` module's ``is_private`` definition,
    which covers the RFC 1918 ranges (10/8, 172.16/12, 192.168/16) among
    other reserved ranges.

    IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) are judged by the IPv4
    address they embed. Without that step, interpreters whose ``ipaddress``
    module treats the whole ``::ffff:0:0/96`` block as private would classify
    a mapped public address such as ``::ffff:8.8.8.8`` as internal.

    Args:
        client_ip: Textual IPv4 or IPv6 address.

    Returns:
        bool: True for private/loopback addresses; False for public
        addresses and for anything that does not parse as an IP address.
    """
    import ipaddress

    try:
        ip = ipaddress.ip_address(client_ip)
    except ValueError:
        # Not an IP address at all (e.g. "unknown" or an empty string).
        return False

    # Only IPv6Address has ipv4_mapped; it is None for non-mapped addresses.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped

    return ip.is_private or ip.is_loopback
@router.post(
"/alertmanager",
response_model=AlertResponse,
summary="Phase 10: Alertmanager 原生格式 (內網免 HMAC)",
description="接收 Alertmanager Webhook內網來源免 HMAC 驗證,觸發 LLM 分析 + Telegram 通知。",
)
async def alertmanager_webhook(
    request: Request,
    payload: AlertmanagerPayload,
    background_tasks: BackgroundTasks,
) -> AlertResponse:
    """
    Receive an Alertmanager-format webhook and run it through the AWOOOI flow.

    Steps, in order:
        1. Source-IP gate (internal callers skip HMAC, everyone else gets 403).
        2. Pick the first ``firing`` alert of the batch; the rest are ignored.
        3. Append an ``ALERT_RECEIVED`` operation-log entry (best effort).
        4. CI/CD alerts short-circuit to a plain Telegram progress message.
        5. Normalize into an ``AlertPayload`` and compute its fingerprint.
        6. If an approval with the same fingerprint exists, bump its hit
           count and return without calling the LLM.
        7. Otherwise run the LLM analysis, create the approval and incident,
           optionally schedule auto-repair, and schedule the Telegram push.
           When the LLM yields no result, a fallback "OBSERVE" approval is
           created instead.

    Security policy:
        - Sources that ``is_internal_ip`` accepts need no HMAC signature.
        - Any other source is rejected and must use ``/alerts`` with HMAC.
        - ``X-Forwarded-For`` is client-controlled, so it is only honoured
          when the directly connected peer is itself internal/loopback (or
          has no verifiable address), and *every* hop listed in the header
          must be internal. A caller can therefore no longer bypass the gate
          by sending a forged internal address in that header.

    Args:
        request: Incoming request; used for the peer address and headers.
        payload: Parsed Alertmanager webhook body.
        background_tasks: FastAPI background task queue for auto-repair,
            operation logging and Telegram delivery.

    Returns:
        AlertResponse describing what was done (no-op, CI/CD notification,
        converged alert, or newly created approval).

    Raises:
        HTTPException: 403 when the source is not internal; 500 when the
            fingerprint / LLM / approval stage raises.
    """
    # Function-scope stdlib import, used only for the loopback check below.
    import ipaddress

    # ------------------------------------------------------------------
    # Source-IP gate
    # ------------------------------------------------------------------
    peer_ip = request.client.host if request.client else "unknown"
    forwarded_hops = [
        hop.strip()
        for hop in request.headers.get("X-Forwarded-For", "").split(",")
        if hop.strip()
    ]
    # Address used for logging: the originating hop when a proxy chain is
    # present, otherwise the directly connected peer.
    actual_ip = forwarded_hops[0] if forwarded_hops else peer_ip

    try:
        peer_address = ipaddress.ip_address(peer_ip)
    except ValueError:
        # No parseable peer address, so the peer cannot be verified.
        # It is not held against the request; the hop check below
        # still applies.
        peer_is_trusted = True
    else:
        peer_is_trusted = peer_address.is_loopback or is_internal_ip(peer_ip)

    # Without X-Forwarded-For this degenerates to is_internal_ip(peer_ip),
    # i.e. exactly the previous behaviour.
    hops_to_verify = forwarded_hops or [peer_ip]
    is_internal_request = peer_is_trusted and all(
        is_internal_ip(hop) for hop in hops_to_verify
    )

    if not is_internal_request:
        logger.warning(
            "alertmanager_external_rejected",
            client_ip=actual_ip,
            peer_ip=peer_ip,
            reason="External IP must use /alerts with HMAC",
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="External sources must use /alerts endpoint with HMAC signature",
        )

    logger.info(
        "alertmanager_webhook_received",
        client_ip=actual_ip,
        status=payload.status,
        alert_count=len(payload.alerts),
    )

    # Only the first firing alert is processed (alert-storm protection).
    firing_alerts = [a for a in payload.alerts if a.status == "firing"]
    if not firing_alerts:
        return AlertResponse(
            success=True,
            message="No firing alerts to process",
            alert_id="",
            approval_created=False,
        )

    alert = firing_alerts[0]
    alert_id = f"alert-{now_taipei().strftime('%Y%m%d%H%M%S')}"

    # Normalize once so every later lookup is None-safe.
    labels = alert.labels or {}
    annotations = alert.annotations or {}
    alertname = labels.get("alertname", "UnknownAlert")

    # ------------------------------------------------------------------
    # Sprint 5.1 L4-2: ALERT_RECEIVED provenance log + auto_repair flag
    # (ADR-062 Q9). A rule label auto_repair=false forces human review:
    # the auto-repair background task is not scheduled.
    # ------------------------------------------------------------------
    can_auto_repair_by_rule = labels.get("auto_repair", "true").lower() == "true"
    try:
        from src.repositories.alert_operation_log_repository import get_alert_operation_log_repository

        received_log = get_alert_operation_log_repository()
        await received_log.append(
            "ALERT_RECEIVED",
            actor="alertmanager",
            action_detail=f"收到告警: {alertname}",
            context={
                "source": "alertmanager",
                "alert_id": alert_id,
                "alertname": alertname,
                "labels": labels,
                "auto_repair_flag": can_auto_repair_by_rule,
            },
        )
    except Exception as log_err:
        # Provenance logging is best effort; it must never block the alert.
        logger.warning("alert_received_log_failed", error=str(log_err))

    # ------------------------------------------------------------------
    # CI/CD alerts (detection configured in constants.py): skip AI
    # arbitration and send a compact progress message instead.
    # ------------------------------------------------------------------
    if is_cicd_alertname(alertname):
        logger.info(
            "alertmanager_cicd_detected",
            alert_id=alert_id,
            alertname=alertname,
            message="Bypassing AI arbitration for CI/CD alert",
        )
        try:
            telegram = get_telegram_gateway()
            # Derive the CI/CD status fields from the alert labels.
            stage = labels.get("stage", "")
            job_status = "success" if labels.get("severity") == "info" else "running"
            commit_sha = labels.get("commit", "")
            triggered_by = labels.get("triggered_by", "CI")
            workflow_url = annotations.get("workflow_url", "")
            summary = annotations.get("summary", alertname)
            await telegram.send_cicd_progress(
                job_name=summary,
                status=job_status,
                stage=stage,
                commit_sha=commit_sha,
                triggered_by=triggered_by,
                workflow_url=workflow_url,
            )
            record_alert_chain_success("alertmanager")
            return AlertResponse(
                success=True,
                message=f"CI/CD alert processed (simple format): {alertname}",
                alert_id=alert_id,
                approval_created=False,
            )
        except Exception as e:
            logger.error("cicd_telegram_failed", error=str(e), alertname=alertname)
            # A failed CI/CD notification must not fail the webhook.
            return AlertResponse(
                success=True,
                message=f"CI/CD alert logged (telegram failed): {alertname}",
                alert_id=alert_id,
                approval_created=False,
            )

    # ------------------------------------------------------------------
    # Alert normalizer: Alertmanager format -> AWOOOI AlertPayload
    # ------------------------------------------------------------------
    alertname_to_type = {
        "KubePodCrashLooping": "k8s_pod_crash",
        "KubePodNotReady": "k8s_pod_crash",
        "KubeNodeNotReady": "k8s_node_failure",
        "KubeNodeUnreachable": "k8s_node_failure",
        "HighCPUUsage": "high_cpu",
        "HighMemoryUsage": "high_memory",
        "DiskSpaceLow": "disk_full",
        "SSLCertExpiringSoon": "ssl_expiry",
        "TargetDown": "service_404",
    }
    alert_type = alertname_to_type.get(alertname, "custom")

    # Unknown severities collapse to "warning".
    severity_map = {"critical": "critical", "warning": "warning", "info": "info"}
    severity = severity_map.get(
        labels.get("severity", "warning").lower(),
        "warning",
    )

    # Bug #5: prefer the component label (Docker-level alerts, e.g.
    # SentryDown -> "sentry") so the value lines up with a Playbook's
    # affected_services; then pod (K8s alerts), then instance (blackbox
    # probes), and finally the alert name.
    target_resource = (
        labels.get("component")
        or labels.get("pod")
        or labels.get("instance")
        or alertname
    )
    namespace = labels.get("namespace", "default")
    message = annotations.get("summary") or annotations.get("description") or alertname

    normalized_alert = AlertPayload(
        alert_type=alert_type,
        severity=severity,
        source="alertmanager",
        target_resource=target_resource,
        namespace=namespace,
        message=message,
        metrics={},
        labels=labels,
    )

    # ------------------------------------------------------------------
    # Fingerprint + convergence
    # ------------------------------------------------------------------
    fingerprint = AlertAnalyzer.generate_fingerprint(normalized_alert)
    logger.info(
        "alertmanager_normalized",
        alert_id=alert_id,
        alert_type=alert_type,
        severity=severity,
        target=target_resource,
        fingerprint=fingerprint,
    )

    try:
        service = get_approval_service()

        # Look for an existing record with the same fingerprint.
        existing_approval = await service.find_by_fingerprint(
            fingerprint=fingerprint,
            debounce_minutes=DEBOUNCE_WINDOW_MINUTES,
        )
        if existing_approval:
            # Converged alert: skip the LLM.
            logger.info(
                "alertmanager_converged",
                alert_id=alert_id,
                fingerprint=fingerprint,
                existing_id=str(existing_approval.id),
            )
            updated_approval = await service.increment_hit_count(existing_approval.id)
            if updated_approval:
                # Converged alerts only bump hit_count; Telegram is not
                # re-sent so the channel is not flooded. The aggregated
                # count stays visible in the UI.
                logger.info(
                    "alertmanager_converged_telegram_skipped",
                    approval_id=str(updated_approval.id),
                    hit_count=updated_approval.hit_count,
                    reason="Converged alert - Telegram already sent for this fingerprint",
                )
                return AlertResponse(
                    success=True,
                    message=f"🛡️ 告警收斂 (x{updated_approval.hit_count}) - Telegram 已發送,跳過重複通知",
                    alert_id=alert_id,
                    approval_created=False,
                    approval_id=str(updated_approval.id),
                    risk_level=updated_approval.risk_level.value,
                    suggested_action=updated_approval.action,
                    hit_count=updated_approval.hit_count,
                    converged=True,
                )
            # NOTE(review): when increment_hit_count returns a falsy value
            # the alert falls through and is handled as a new one — confirm
            # this is intended.

        # --------------------------------------------------------------
        # New alert: LLM analysis (with token / cost tracking)
        # --------------------------------------------------------------
        alert_context = {
            "alert_type": alert_type,
            "severity": severity,
            "source": "alertmanager",
            "target_resource": target_resource,
            "namespace": namespace,
            "message": message,
            "metrics": {},
            "labels": labels,
        }
        openclaw = get_openclaw()
        (
            analysis_result,
            ai_provider,
            _raw_response,
            signoz_metrics,
            signoz_trace_url,
            ai_tokens,
            ai_cost,
        ) = await openclaw.analyze_alert(alert_context)

        if analysis_result:
            # Map the decision's risk level; unmapped values become MEDIUM.
            risk_mapping = {
                "low": RiskLevel.LOW,
                "medium": RiskLevel.MEDIUM,
                "critical": RiskLevel.CRITICAL,
            }
            risk_level = risk_mapping.get(analysis_result.risk_level.value, RiskLevel.MEDIUM)

            # Extract the blast radius (may be absent).
            blast = analysis_result.blast_radius
            impact_mapping = {
                "NONE": DataImpact.NONE,
                "READ_ONLY": DataImpact.READ_ONLY,
                "WRITE": DataImpact.WRITE,
                "DESTRUCTIVE": DataImpact.DESTRUCTIVE,
            }
            data_impact = impact_mapping.get(blast.data_impact.value, DataImpact.NONE) if blast else DataImpact.NONE

            # Build the approval request (same shape as the /alerts flow).
            approval_create = ApprovalRequestCreate(
                action=f"{analysis_result.action_title} | {analysis_result.kubectl_command}",
                description=f"[AI: {ai_provider}] {analysis_result.description}",
                risk_level=risk_level,
                blast_radius=BlastRadius(
                    affected_pods=blast.affected_pods if blast else 1,
                    estimated_downtime=blast.estimated_downtime if blast else "~30s",
                    related_services=list(set((blast.related_services if blast else []) + analysis_result.affected_services)),
                    data_impact=data_impact,
                ),
                dry_run_checks=[
                    DryRunCheck(name="AI 信心度", passed=analysis_result.confidence >= 0.7, message=f"{analysis_result.confidence:.0%}"),
                    DryRunCheck(name="來源", passed=True, message="alertmanager"),
                ],
                requested_by=f"OpenClaw ({ai_provider})",
            )
            approval = await service.create_approval_with_fingerprint(
                request=approval_create,
                fingerprint=fingerprint,
            )

            # Incident/Approval sync (hard rule: both are created together).
            incident_id = await create_incident_for_approval(
                approval_id=str(approval.id),
                risk_level=risk_level.value,
                target_resource=target_resource,
                namespace=namespace,
                alert_type=alert_type,
                message=message,
                source="alertmanager",
                alertname=alertname,
            )

            # Phase 26: write incident_id back onto the approval so Playbook
            # extraction and KM writes can locate the matching incident.
            try:
                await service.update_incident_id(approval.id, incident_id)
                approval.incident_id = incident_id
            except Exception as link_err:
                logger.warning(
                    "approval_incident_id_update_failed",
                    approval_id=str(approval.id),
                    incident_id=incident_id,
                    error=str(link_err),
                )

            root_cause = analysis_result.description or message
            estimated_downtime = blast.estimated_downtime if blast else "~30s"
            primary_responsibility = analysis_result.primary_responsibility or "COLLAB"
            confidence = analysis_result.confidence

            # Auto-repair evaluation (ADR-058): scheduled right after the
            # incident exists, unless the rule label auto_repair=false
            # forces human review (ADR-062 Q9).
            if can_auto_repair_by_rule:
                background_tasks.add_task(
                    _try_auto_repair_background,
                    incident_id=incident_id,
                    approval_id=str(approval.id),
                    alert_type=alert_type,
                    target_resource=target_resource,
                    namespace=namespace,
                )
            else:
                # auto_repair=false: record GUARDRAIL_BLOCKED instead of
                # scheduling the repair task.
                from src.repositories.alert_operation_log_repository import get_alert_operation_log_repository

                guardrail_log = get_alert_operation_log_repository()
                background_tasks.add_task(
                    guardrail_log.append,
                    "GUARDRAIL_BLOCKED",
                    incident_id=incident_id,
                    approval_id=str(approval.id),
                    actor="prometheus-rule",
                    action_detail=f"Prometheus rule 設定 auto_repair=false強制人工審核: {alertname}",
                    success=False,
                    context={"alertname": alertname, "auto_repair_flag": False},
                )

            # Push to Telegram in the background.
            background_tasks.add_task(
                _push_to_telegram_background,
                approval_id=str(approval.id),
                risk_level=risk_level.value,
                resource_name=target_resource,
                root_cause=root_cause,
                # Prefer the concrete command; fall back to the action enum.
                suggested_action=(analysis_result.kubectl_command or "").strip() or analysis_result.suggested_action.value,
                estimated_downtime=estimated_downtime,
                hit_count=1,
                primary_responsibility=primary_responsibility,
                confidence=confidence,
                namespace=namespace,
                signoz_rps=signoz_metrics.rps if signoz_metrics else 0,
                signoz_rps_trend=signoz_metrics.rps_trend if signoz_metrics else "stable",
                signoz_error_rate=signoz_metrics.error_rate if signoz_metrics else 0,
                signoz_p99_latency=signoz_metrics.p99_latency_ms if signoz_metrics else 0,
                signoz_latency_trend=signoz_metrics.latency_trend if signoz_metrics else "stable",
                signoz_trace_url=signoz_trace_url or "",
                # AI token / cost tracking.
                ai_tokens=ai_tokens,
                ai_cost=ai_cost,
                ai_provider=ai_provider,
                # incident_id enables the detail / re-diagnose / history buttons.
                incident_id=incident_id,
            )
            record_alert_chain_success("alertmanager")
            return AlertResponse(
                success=True,
                message=f"✅ LLM 分析完成 (via {ai_provider})",
                alert_id=alert_id,
                approval_created=True,
                approval_id=str(approval.id),
                risk_level=risk_level.value,
                suggested_action=approval_create.action,
                hit_count=1,
                converged=False,
            )
        else:
            # LLM produced no result: fall back to a default OBSERVE approval.
            fallback_create = ApprovalRequestCreate(
                action="OBSERVE",
                description=f"[LLM Failed] {message}",
                risk_level=RiskLevel.MEDIUM,
                blast_radius=BlastRadius(
                    affected_pods=1,
                    estimated_downtime="unknown",
                    related_services=[],
                    data_impact=DataImpact.NONE,
                ),
                dry_run_checks=[],
                requested_by="OpenClaw (fallback)",
            )
            approval = await service.create_approval_with_fingerprint(
                request=fallback_create,
                fingerprint=fingerprint,
            )

            # Incident/Approval sync (hard rule: required even when the LLM
            # failed).
            fallback_incident_id = await create_incident_for_approval(
                approval_id=str(approval.id),
                risk_level="medium",
                target_resource=target_resource,
                namespace=namespace,
                alert_type=alert_type,
                message=message,
                source="alertmanager",
                alertname=alertname,
            )

            # Keep the fallback path consistent with the success path: link
            # the incident back onto the approval, best effort.
            try:
                await service.update_incident_id(approval.id, fallback_incident_id)
                approval.incident_id = fallback_incident_id
            except Exception as link_err:
                logger.warning(
                    "approval_incident_id_update_failed",
                    approval_id=str(approval.id),
                    incident_id=fallback_incident_id,
                    error=str(link_err),
                )

            background_tasks.add_task(
                _push_to_telegram_background,
                approval_id=str(approval.id),
                risk_level="medium",
                resource_name=target_resource,
                root_cause=message,
                suggested_action="OBSERVE",
                estimated_downtime="unknown",
                hit_count=1,
                primary_responsibility="HUMAN",
                confidence=0.0,
                namespace=namespace,
                incident_id=fallback_incident_id,
            )
            return AlertResponse(
                success=True,
                message="⚠️ LLM 分析失敗,使用預設值",
                alert_id=alert_id,
                approval_created=True,
                approval_id=str(approval.id),
                risk_level="medium",
                suggested_action="OBSERVE",
                hit_count=1,
                converged=False,
            )
    except Exception as e:
        logger.error("alertmanager_error", error=str(e))
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to process alert: {str(e)}",
        ) from e
@router.get(
    "/health",
    summary="Webhook 健康檢查",
)
async def webhook_health() -> dict:
    """Report that the webhook gateway is up and list the alert types it accepts."""
    # Alert type identifiers advertised to callers, in a fixed order.
    alert_types = [
        "k8s_node_failure",
        "k8s_pod_crash",
        "db_connection_timeout",
        "service_404",
        "high_cpu",
        "high_memory",
        "disk_full",
        "ssl_expiry",
        "custom",
    ]
    health_report = dict(
        status="healthy",
        service="AWOOOI Webhook Gateway",
        supported_alert_types=alert_types,
    )
    return health_report