routes/alert_routes.py 的 send_telegram_message() 是 Alertmanager webhook 通用告警出口,原本直接 POST sendMessage 並對每個 chat_id 迴圈。改走 services.event_router.dispatch_sync() 統一入口(event_type=alertmanager_webhook, severity=alert)。行為變化: - 移除手動 chat_id 迴圈,改傳 admin_chat_ids 給 EventRouter(內部仍逐一發送) - 新增 ADR-012 三層分流(L0/L1/L2)+ JSONL queue replay 失敗重送 - parse_mode 參數保留簽名以向後相容,但實際統一走 EventRouter HTML 模板 - caller(Alertmanager webhook handler)的 try/except 結構未動 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
488 lines
17 KiB
Python
488 lines
17 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
系統告警路由模組
|
||
處理 Prometheus Alertmanager 的 Webhook 通知
|
||
並執行自動排查和修復
|
||
"""
|
||
|
||
import hmac
import html
import json
import logging
import os
import re
import subprocess
import time
from datetime import datetime, timezone, timedelta
from functools import wraps

import requests
from flask import Blueprint, request, jsonify, current_app
|
||
|
||
logger = logging.getLogger(__name__)

# Container-name allowlist: letters, digits, '.', '_' and '-', and the first
# character must be alphanumeric (so a name can never look like a CLI option).
# `\Z` rather than `$`: in Python `$` also matches just before a trailing "\n",
# which would let "name\n" slip through a `.match()` check.
CONTAINER_NAME_PATTERN = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9._-]*\Z')


def is_valid_container_name(name: str) -> bool:
    """Return True if *name* is safe to pass to a ``docker`` subprocess call.

    Guards against command/option injection before a container name reaches
    any docker argv. Non-string, empty, or longer-than-128-character values are
    rejected (non-strings used to raise ``TypeError`` from ``len``).
    """
    if not isinstance(name, str) or not name or len(name) > 128:
        return False
    # fullmatch: the *entire* string must satisfy the pattern.
    return CONTAINER_NAME_PATTERN.fullmatch(name) is not None
|
||
|
||
# Request paths that csrf_exempt_for_alerts marks as exempt from CSRF checks.
CSRF_EXEMPT_ROUTES = [
    '/api/alert/webhook',
    '/api/alert/test',
    '/api/alert/fix',
]

# Fixed UTC+8 offset used for every timestamp this module produces.
TAIPEI_TZ = timezone(timedelta(seconds=8 * 60 * 60))
|
||
|
||
# Blueprint that owns the /api/alert/* routes defined in this module.
alert_bp = Blueprint(name='alert', import_name=__name__)


@alert_bp.before_app_request
def csrf_exempt_for_alerts():
    """Flag requests to the alert endpoints as CSRF-exempt.

    Registered via ``before_app_request``, so it runs before every request
    the application handles, not only this blueprint's. It only sets
    ``request.csrf_exempt = True`` for paths in CSRF_EXEMPT_ROUTES; whether
    that flag has any effect depends on the CSRF protection configured
    elsewhere.

    NOTE(review): confirm the CSRF check actually reads this attribute and
    runs after this hook. Also note '/api/alert/test' is exempt here while
    having no auth decorator.
    """
    if request.path not in CSRF_EXEMPT_ROUTES:
        return None
    request.csrf_exempt = True
|
||
|
||
# HTTP Basic-auth credentials that callers of the protected alert endpoints
# must present (checked by check_alert_auth).
ALERT_WEBHOOK_USER = os.environ.get('ALERT_WEBHOOK_USER', 'alertmanager')
ALERT_WEBHOOK_PASSWORD = os.environ.get('ALERT_WEBHOOK_PASSWORD', '')

# There is deliberately no default password: an empty value makes
# check_alert_auth answer 503 to every request (fail-secure), so warn once at
# import time to make the misconfiguration visible.
if ALERT_WEBHOOK_PASSWORD == '':
    logger.warning(
        '[SECURITY] ALERT_WEBHOOK_PASSWORD is not set. '
        'All webhook requests will be rejected. Set this environment variable.'
    )
|
||
|
||
# Telegram settings.
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', '')


def _parse_chat_ids(raw) -> list:
    """Parse the TELEGRAM_CHAT_IDS environment value into a list of chat ids.

    Accepts a JSON array (``'[123, "-100456"]'``) or, for convenience, a
    single JSON number/string (``'123'``), which is wrapped in a list. An
    unset, blank or ``null`` value yields ``[]``.

    Anything else (malformed JSON, objects, booleans, floats) is logged and
    also yields ``[]``, so Telegram is treated as "not configured". The
    previous bare ``json.loads`` raised at import time for e.g. an empty
    value and took the whole module down.
    """
    text = (raw or '').strip()
    if not text:
        return []
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        logger.error('[Alert] TELEGRAM_CHAT_IDS is not valid JSON; Telegram alerts are disabled')
        return []
    if parsed is None:
        return []
    if isinstance(parsed, list):
        return parsed
    # bool is a subclass of int; `true`/`false` are not chat ids.
    if isinstance(parsed, (int, str)) and not isinstance(parsed, bool):
        return [parsed]
    logger.error('[Alert] TELEGRAM_CHAT_IDS must be a JSON array of chat ids; Telegram alerts are disabled')
    return []


TELEGRAM_CHAT_IDS = _parse_chat_ids(os.getenv('TELEGRAM_CHAT_IDS', '[]'))
|
||
|
||
# In-process alert history (oldest first). Kept in memory only, so it is lost
# on restart; alert_webhook trims it to _MAX_ALERT_HISTORY entries.
_MAX_ALERT_HISTORY = 100
_alert_history: list = []
|
||
|
||
# Auto-remediation switch: on unless AUTO_FIX_ENABLED is set to something
# other than "true" (case-insensitive).
AUTO_FIX_ENABLED = os.environ.get('AUTO_FIX_ENABLED', 'true').lower() == 'true'

# Minimum number of seconds between automatic fixes (5 minutes).
AUTO_FIX_COOLDOWN = 5 * 60

# Cooldown bookkeeping: key -> time.time() of the last successful fix.
_last_fix_time = dict()
|
||
|
||
# Allowlist of issue_type values the manual fix endpoint accepts, so arbitrary
# user-supplied strings never reach auto_fix_container.
ALLOWED_ISSUE_TYPES = frozenset([
    'memory',
    'container_memory',
    'cpu',
    'container_cpu',
])
|
||
|
||
|
||
def check_alert_auth(f):
    """Decorator enforcing HTTP Basic auth on the alert endpoints.

    Fail-secure: if ALERT_WEBHOOK_PASSWORD is not configured, every request is
    rejected instead of being let through.

    Responses produced by the wrapper:
        503 - no password configured on the server;
        401 - no Authorization header, or wrong username/password.
    Otherwise the wrapped view is called with its original arguments.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        if not ALERT_WEBHOOK_PASSWORD:
            return jsonify({'error': 'Server misconfiguration: auth not configured'}), 503

        auth = request.authorization
        if not auth:
            return jsonify({'error': 'Missing authentication'}), 401

        # Constant-time comparison, so response latency does not reveal how
        # much of a guessed credential was correct. Compared as bytes because
        # compare_digest rejects non-ASCII str. Both checks always run.
        # username/password can be None for non-Basic schemes (Werkzeug-version
        # dependent), hence the `or ''`.
        user_ok = hmac.compare_digest(
            (auth.username or '').encode('utf-8'),
            ALERT_WEBHOOK_USER.encode('utf-8'),
        )
        password_ok = hmac.compare_digest(
            (auth.password or '').encode('utf-8'),
            ALERT_WEBHOOK_PASSWORD.encode('utf-8'),
        )
        if not (user_ok and password_ok):
            return jsonify({'error': 'Invalid credentials'}), 401

        return f(*args, **kwargs)

    return decorated
|
||
|
||
|
||
def send_telegram_message(message: str, parse_mode: str = 'HTML') -> bool:
    """Send an alert message to the configured Telegram admin chats.

    ADR-019 Phase 5: delivery goes through the EventRouter unified entry point
    (``services.event_router.dispatch_sync``) to get retry + JSONL queue
    replay. ``parse_mode`` is kept only for signature backward compatibility.
    EventRouter renders with its own HTML template, so other modes are
    ignored (this route has only ever used HTML). The value is still recorded
    in the event payload.

    Returns:
        True if EventRouter reported the event as delivered. False when
        Telegram is not configured, nothing was delivered, or any exception
        occurred (exceptions are logged, never propagated).
    """
    if not (TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_IDS):
        logger.warning("[Alert] Telegram 未設定,無法發送告警")
        return False

    try:
        # Imported lazily at call time. NOTE(review): presumably to avoid an
        # import cycle / import-time cost; an ImportError lands in the except.
        from services.event_router import dispatch_sync

        event = {
            "event_type": "alertmanager_webhook",
            "severity": "alert",
            "source": "Alertmanager.Webhook",
            "title": "Alertmanager 告警",
            # NOTE(review): a hard 400-char cut can split an HTML tag/entity;
            # confirm EventRouter does not render `summary` as Telegram HTML.
            "summary": message[:400],
            "status": "received",
            "payload": {"raw_message": message, "parse_mode": parse_mode},
        }
        outcome = dispatch_sync(event=event, admin_chat_ids=TELEGRAM_CHAT_IDS)

        ok = bool(outcome.get("delivered"))
        if ok:
            logger.info(f"[Alert] EventRouter 告警已派送 sent={outcome.get('sent')}")
        else:
            logger.error(f"[Alert] EventRouter 告警未送達: {outcome.get('errors')}")
        return ok
    except Exception as exc:
        logger.error(f"[Alert] EventRouter dispatch 失敗: {exc}")
        return False
|
||
|
||
|
||
def format_alert_message(alert: dict, status: str) -> str:
    """Render one Alertmanager alert as Telegram-HTML text.

    Args:
        alert: One entry of the webhook's ``alerts`` list. Its ``labels`` and
            ``annotations`` mappings are read; missing or null ones are
            treated as empty.
        status: ``'firing'`` renders the "triggered" header; any other value
            renders the "resolved" header.

    Returns:
        The message with surrounding whitespace stripped. Every value taken
        from the alert is HTML-escaped: Telegram's HTML parse mode rejects the
        whole message if it contains a bare ``<``, ``>`` or ``&`` outside a
        tag/entity (e.g. a description like ``"p99 < 200ms"``).

    NOTE(review): assumes EventRouter forwards this text to Telegram as HTML
    without escaping it again; confirm, otherwise entities would be doubled.
    """
    labels = alert.get('labels') or {}
    annotations = alert.get('annotations') or {}

    def esc(value) -> str:
        # quote=False: quotes are legal in Telegram HTML text; only <, >, &
        # need escaping.
        return html.escape(str(value), quote=False)

    severity = labels.get('severity', 'unknown')
    alertname = labels.get('alertname', 'Unknown')
    category = labels.get('category', '')

    # Severity badge
    severity_icons = {
        'critical': '🔴',
        'warning': '🟡',
        'info': '🔵',
    }
    icon = severity_icons.get(severity, '⚪')

    # Firing vs. resolved header
    firing = status == 'firing'
    status_icon = '🚨' if firing else '✅'
    status_text = '觸發' if firing else '恢復'

    now = datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M:%S')

    summary = annotations.get('summary', alertname)
    description = annotations.get('description', '')
    value = annotations.get('value', '')
    container = annotations.get('container', labels.get('name', ''))

    lines = [
        f"{status_icon} <b>系統告警{status_text}</b> {icon}",
        "",
        f"<b>告警名稱:</b>{esc(summary)}",
        # upper() before escaping so entities such as &lt; are not upper-cased.
        f"<b>嚴重程度:</b>{esc(str(severity).upper())}",
        f"<b>類別:</b>{esc(category)}",
    ]
    if container:
        lines.append(f"<b>容器:</b>{esc(container)}")
    if value:
        lines.append(f"<b>當前值:</b>{esc(value)}")
    if description:
        lines.extend(["", "<b>詳細說明:</b>", esc(description)])
    lines.extend(["", f"<b>時間:</b>{now}"])

    return "\n".join(lines).strip()
|
||
|
||
|
||
def analyze_high_load() -> dict:
    """Analyze current container load using ``docker stats --no-stream``.

    Returns:
        A dict with:
          * ``timestamp``: Taipei-time ISO 8601 string;
          * ``high_cpu_containers`` / ``high_memory_containers``: containers
            above 50% CPU / memory, highest first;
          * ``system_info``: currently always empty;
          * ``recommendations``: up to 3 hints per category;
          * ``error``: only present when docker could not be queried.
        Rows whose percentages cannot be parsed are skipped rather than
        aborting the whole analysis.
    """
    result = {
        'timestamp': datetime.now(TAIPEI_TZ).isoformat(),
        'high_cpu_containers': [],
        'high_memory_containers': [],
        'system_info': {},
        'recommendations': [],
    }

    try:
        # Argument list (no shell) so nothing is interpreted by a shell.
        cmd = ["docker", "stats", "--no-stream", "--format", "{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}"]
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)  # nosec B603

        if proc.returncode != 0:
            # Previously ignored silently, which made "docker unavailable"
            # indistinguishable from "nothing is busy".
            stderr = proc.stderr.strip()
            logger.warning(f"[Alert] docker stats 執行失敗 (exit {proc.returncode}): {stderr}")
            result['error'] = stderr or f"docker stats exited with code {proc.returncode}"
            return result

        for line in proc.stdout.splitlines():
            parts = line.split('\t')
            if len(parts) < 4:
                continue
            name, cpu_text, mem_usage, mem_text = parts[:4]
            try:
                cpu = float(cpu_text.replace('%', ''))
                mem_pct = float(mem_text.replace('%', ''))
            except ValueError:
                # docker can print placeholders such as "--" for containers
                # that are starting/stopping; skip that row only.
                continue

            if cpu > 50:
                result['high_cpu_containers'].append({
                    'name': name,
                    'cpu_percent': cpu,
                    'memory_usage': mem_usage,
                })
            if mem_pct > 50:
                result['high_memory_containers'].append({
                    'name': name,
                    'memory_percent': mem_pct,
                    'memory_usage': mem_usage,
                })

        # Worst offenders first
        result['high_cpu_containers'].sort(key=lambda x: x['cpu_percent'], reverse=True)
        result['high_memory_containers'].sort(key=lambda x: x['memory_percent'], reverse=True)

        for container in result['high_cpu_containers'][:3]:
            result['recommendations'].append(
                f"容器 {container['name']} CPU 使用率 {container['cpu_percent']:.1f}%,建議檢查或重啟"
            )
        for container in result['high_memory_containers'][:3]:
            result['recommendations'].append(
                f"容器 {container['name']} 記憶體使用 {container['memory_usage']},可能有記憶體洩漏"
            )

    except Exception as e:
        logger.error(f"[Alert] 分析高負載失敗: {e}")
        result['error'] = str(e)

    return result
|
||
|
||
|
||
def _docker_restart(container_name: str) -> tuple:
    """Run ``docker restart`` on an already-validated name; return (success, message)."""
    # Argument list, no shell: the name is passed verbatim as one argv entry.
    proc = subprocess.run(["docker", "restart", container_name], capture_output=True, text=True, timeout=60)  # nosec B603
    if proc.returncode == 0:
        return True, f"已重啟容器 {container_name}"
    return False, f"重啟失敗: {proc.stderr}"


def auto_fix_container(container_name: str, issue_type: str) -> dict:
    """Attempt an automatic remediation for a container CPU/memory problem.

    Args:
        container_name: Docker container name. Nothing is run unless it
            passes ``is_valid_container_name``; protected infrastructure
            containers are never touched.
        issue_type: ``'memory'``/``'container_memory'`` restarts the
            container. ``'cpu'``/``'container_cpu'`` reports the top processes
            via ``docker exec ... ps``, falling back to a restart if that
            command fails. Any other value is reported as unsupported.

    Returns:
        ``{'container', 'issue_type', 'action', 'success', 'message'}`` where
        ``action`` is ``None`` (nothing attempted), ``'restart'`` or
        ``'analyze'``. Subprocess errors and timeouts are reported in
        ``message`` rather than raised.
    """
    result = {
        'container': container_name,
        'issue_type': issue_type,
        'action': None,
        'success': False,
        'message': '',
    }

    # Security: validate the name before it can reach any docker argv.
    if not is_valid_container_name(container_name):
        result['message'] = "無效的容器名稱格式"
        return result

    # Critical infrastructure is never restarted automatically.
    protected_containers = frozenset({'momo-postgres', 'momo-prometheus', 'momo-grafana'})
    if container_name in protected_containers:
        result['message'] = "此為受保護的關鍵容器,不自動重啟"
        return result

    # The cooldown is keyed by container only, matching AUTO_FIX_COOLDOWN's
    # intent ("same container not fixed twice within 5 minutes"). Keying by
    # container+issue_type let the 'memory'/'container_memory' aliases, or a
    # CPU alert right after a memory restart, restart the container again.
    now = time.time()
    last_fix = _last_fix_time.get(container_name)
    if last_fix is not None and now - last_fix < AUTO_FIX_COOLDOWN:
        remaining = int(AUTO_FIX_COOLDOWN - (now - last_fix))
        result['message'] = f"冷卻中,{remaining} 秒後可再次修復"
        return result

    try:
        if issue_type in ('container_memory', 'memory'):
            result['action'] = 'restart'
            result['success'], result['message'] = _docker_restart(container_name)

        elif issue_type in ('container_cpu', 'cpu'):
            # Diagnose first: list the top CPU consumers inside the container.
            cmd = ["docker", "exec", container_name, "ps", "aux", "--sort=-%cpu"]
            proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)  # nosec B603
            if proc.returncode == 0:
                top_lines = proc.stdout.strip().split('\n')[:5]  # header + top 4
                result['action'] = 'analyze'
                result['message'] = "容器內 CPU 使用情況:\n" + '\n'.join(top_lines)
                result['success'] = True
            else:
                # Could not inspect the container (e.g. no `ps` in the image):
                # fall back to restarting it.
                result['action'] = 'restart'
                result['success'], result['message'] = _docker_restart(container_name)

        else:
            result['message'] = f"不支援的問題類型: {issue_type}"

    except subprocess.TimeoutExpired:
        result['message'] = "操作超時"
    except Exception as e:
        result['message'] = f"修復錯誤: {str(e)}"

    # Only a successful restart starts the cooldown; a pure analysis does not.
    if result['action'] == 'restart' and result['success']:
        _last_fix_time[container_name] = now

    return result
|
||
|
||
|
||
@alert_bp.route('/api/alert/webhook', methods=['POST'])
@check_alert_auth
def alert_webhook():
    """Receive an Alertmanager webhook notification and react to it.

    For every alert in the payload, send a formatted Telegram notification
    and append a record to the in-memory history. When the alert is firing
    and AUTO_FIX_ENABLED is on, additionally:

    * for warning/critical alerts, run ``analyze_high_load()`` and send its
      recommendations. This happens at most once per request, because
      Alertmanager batches several alerts into one webhook and the analysis
      (``docker stats``) is slow and host-wide.
    * for ``container_cpu`` / ``container_memory`` alerts naming a container,
      call ``auto_fix_container()`` and report the outcome.

    Returns JSON ``{'success', 'processed', 'fix_results'}``. Returns 400 when
    the body is missing or not a JSON object, and 500 on unexpected errors.
    """
    try:
        # silent=True: malformed or non-JSON bodies yield None (-> 400 below)
        # instead of an exception that used to surface as a 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'error': 'No data received'}), 400
        if not isinstance(data, dict):
            return jsonify({'error': 'Invalid payload: expected a JSON object'}), 400

        status = data.get('status', 'unknown')
        alerts = data.get('alerts') or []

        logger.info(f"[Alert] 收到告警 Webhook: status={status}, alerts={len(alerts)}")

        processed_alerts = []
        fix_results = []
        analysis_done = False

        for alert in alerts:
            alert_status = alert.get('status', status)
            labels = alert.get('labels') or {}
            category = labels.get('category', '')
            severity = labels.get('severity', '')
            container_name = (alert.get('annotations') or {}).get('container', labels.get('name', ''))

            # Notify before any remediation work.
            message = format_alert_message(alert, alert_status)
            send_telegram_message(message)

            alert_record = {
                'timestamp': datetime.now(TAIPEI_TZ).isoformat(),
                'status': alert_status,
                'alertname': labels.get('alertname'),
                'severity': severity,
                'category': category,
                'container': container_name,
                'message': message,
            }
            _alert_history.append(alert_record)
            if len(_alert_history) > _MAX_ALERT_HISTORY:
                _alert_history.pop(0)
            processed_alerts.append(alert_record)

            if alert_status != 'firing' or not AUTO_FIX_ENABLED:
                continue

            if severity in ('warning', 'critical') and not analysis_done:
                analysis_done = True
                analysis = analyze_high_load()
                recommendations = analysis.get('recommendations')
                if recommendations:
                    analysis_msg = "📊 <b>系統分析報告</b>\n\n" + ''.join(
                        f"• {html.escape(str(rec), quote=False)}\n" for rec in recommendations[:5]
                    )
                    send_telegram_message(analysis_msg)

            if (isinstance(container_name, str) and container_name
                    and category in ('container_cpu', 'container_memory')):
                fix_result = auto_fix_container(container_name, category)
                fix_results.append(fix_result)

                if fix_result['action']:
                    # The container name and the message (docker stderr / ps
                    # output) may contain <, > or &, which would make Telegram
                    # reject the HTML message.
                    fix_msg = (
                        "🔧 <b>自動修復</b>\n\n"
                        f"容器: {html.escape(str(fix_result['container']), quote=False)}\n"
                        f"動作: {fix_result['action']}\n"
                        f"結果: {'成功 ✅' if fix_result['success'] else '失敗 ❌'}\n"
                        f"說明: {html.escape(str(fix_result['message']), quote=False)}"
                    )
                    send_telegram_message(fix_msg)

        return jsonify({
            'success': True,
            'processed': len(processed_alerts),
            'fix_results': fix_results,
        })

    except Exception as e:
        # logger.exception keeps the traceback for post-mortem debugging.
        logger.exception(f"[Alert] Webhook 處理錯誤: {e}")
        return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@alert_bp.route('/api/alert/history')
@check_alert_auth
def get_alert_history():
    """Return recent alert records, newest first (auth required: they reveal system details).

    Query parameter ``limit`` (default 50; non-integers fall back to 50) is
    clamped to the range 1..200.
    """
    requested = request.args.get('limit', 50, type=int)
    # Clamp so an absurd limit cannot be used to force large responses.
    limit = min(max(requested, 1), 200)

    newest_first = _alert_history[-limit:]  # slicing copies the list
    newest_first.reverse()
    return jsonify({'success': True, 'data': newest_first})
|
||
|
||
|
||
@alert_bp.route('/api/alert/analyze')
@check_alert_auth
def analyze_system():
    """Run analyze_high_load() on demand and return its report.

    Requires auth because the report exposes container names and resource
    usage.
    """
    report = analyze_high_load()
    return jsonify(success=True, data=report)
|
||
|
||
|
||
@alert_bp.route('/api/alert/fix', methods=['POST'])
@check_alert_auth
def manual_fix():
    """Manually trigger auto_fix_container for one container (auth + allowlist).

    Expects a JSON object ``{"container": str, "issue_type": str}``.
    ``issue_type`` defaults to ``'memory'`` and must be in ALLOWED_ISSUE_TYPES.

    Returns:
        400 for a missing/invalid ``container`` or ``issue_type``. Otherwise
        ``{'success': bool, 'data': <auto_fix_container result>}``.
    """
    # silent=True + isinstance: malformed, non-JSON or non-object bodies are
    # treated as empty (-> 400 below) instead of raising an HTTP error or an
    # AttributeError that surfaced as a 500.
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}

    container_name = data.get('container')
    issue_type = data.get('issue_type', 'memory')

    if not container_name:
        return jsonify({'error': 'Missing container name'}), 400
    if not isinstance(container_name, str):
        return jsonify({'error': 'Invalid container name'}), 400

    # Security: issue_type must be a known value. The isinstance check also
    # keeps unhashable JSON values (lists/objects) from raising TypeError in
    # the frozenset membership test.
    if not isinstance(issue_type, str) or issue_type not in ALLOWED_ISSUE_TYPES:
        return jsonify({
            'error': f'Invalid issue_type: {issue_type}. '
                     f'Allowed: {sorted(ALLOWED_ISSUE_TYPES)}'
        }), 400

    result = auto_fix_container(container_name, issue_type)
    return jsonify({
        'success': result['success'],
        'data': result,
    })
|
||
|
||
|
||
@alert_bp.route('/api/alert/test', methods=['POST'])
def test_alert():
    """Send a test notification to check the Telegram alert path end to end.

    Always responds 200; ``success`` mirrors send_telegram_message's result.

    NOTE(review): this route has no @check_alert_auth and is listed in
    CSRF_EXEMPT_ROUTES, so any client that can reach it can push messages to
    the admin chats. Consider requiring auth if no anonymous caller relies on
    it.
    """
    sent_at = datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M:%S')
    message = """
🧪 <b>測試告警</b>

這是一則測試告警訊息。
如果您收到此訊息,表示告警系統正常運作。

<b>時間:</b>{}
""".format(sent_at)

    ok = send_telegram_message(message)
    outcome = '測試訊息已發送' if ok else '發送失敗,請檢查 Telegram 設定'
    return jsonify({'success': ok, 'message': outcome})
|
||
|
||
|
||
@alert_bp.route('/api/alert/status')
def alert_status():
    """Report alert-subsystem configuration and in-memory runtime counters.

    NOTE(review): unauthenticated, yet ``last_fix_times`` reveals container
    names and fix timestamps. Consider @check_alert_auth if no anonymous
    caller depends on this route.
    """
    status_data = {
        'telegram_configured': bool(TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_IDS),
        'auto_fix_enabled': AUTO_FIX_ENABLED,
        'auto_fix_cooldown': AUTO_FIX_COOLDOWN,
        'alert_history_count': len(_alert_history),
        'last_fix_times': _last_fix_time,
    }
    return jsonify({'success': True, 'data': status_data})
|