Files
ewoooc/routes/alert_routes.py
OoO ea78d0814b refactor(telegram): migrate alert_routes sender to EventRouter (ADR-019 Phase 5)
routes/alert_routes.py 的 send_telegram_message() 是 Alertmanager webhook 通用告警
出口,原本直接 POST sendMessage 並對每個 chat_id 迴圈。改走 services.event_router.
dispatch_sync() 統一入口(event_type=alertmanager_webhook, severity=alert)。

行為變化:
- 移除手動 chat_id 迴圈,改傳 admin_chat_ids 給 EventRouter(內部仍逐一發送)
- 新增 ADR-012 三層分流(L0/L1/L2)+ JSONL queue replay 失敗重送
- parse_mode 參數保留簽名向後相容,但實際統一走 EventRouter HTML 模板
- caller(Alertmanager webhook handler)的 try/except 結構未動

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 13:09:34 +08:00

488 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
系統告警路由模組
處理 Prometheus Alertmanager 的 Webhook 通知
並執行自動排查和修復
"""
import hmac
import html
import json
import logging
import os
import re
import subprocess
import time
from datetime import datetime, timezone, timedelta
from functools import wraps

import requests
from flask import Blueprint, request, jsonify, current_app
logger = logging.getLogger(__name__)
# Allowed container names: letters, digits, '.', '_' and '-', and the first
# character must be a letter or digit (so a name can never look like a
# command-line option such as "-f").
CONTAINER_NAME_PATTERN = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9._-]*$')


def is_valid_container_name(name: str) -> bool:
    """Return True if *name* is safe to pass to ``docker`` as a container name.

    Rejects non-strings, empty strings, names longer than 128 characters and
    anything not fully matching CONTAINER_NAME_PATTERN (guards against
    command/option injection).

    ``fullmatch`` is used on purpose: with ``match`` the pattern's ``$`` also
    matches just before a trailing newline, which let e.g. ``"web\\n"`` pass.
    """
    if not isinstance(name, str) or not name or len(name) > 128:
        return False
    return CONTAINER_NAME_PATTERN.fullmatch(name) is not None
# Request paths that csrf_exempt_for_alerts() marks as exempt from CSRF checks.
CSRF_EXEMPT_ROUTES = ['/api/alert/webhook', '/api/alert/test', '/api/alert/fix']
# Fixed UTC+08:00 offset (Taipei time) used for the timestamps in this module.
TAIPEI_TZ = timezone(timedelta(minutes=8 * 60))
# Blueprint that owns every /api/alert/* route in this module.
alert_bp = Blueprint('alert', __name__)
@alert_bp.before_app_request
def csrf_exempt_for_alerts():
    """Flag requests whose path is in CSRF_EXEMPT_ROUTES as CSRF-exempt.

    Registered via ``before_app_request``, so it runs for every request the
    application handles, not only for this blueprint's routes.

    NOTE(review): this only sets ``request.csrf_exempt``; whether the CSRF
    layer actually honours that attribute is decided by code not visible
    here -- confirm.
    """
    if request.path not in CSRF_EXEMPT_ROUTES:
        return
    request.csrf_exempt = True
# Webhook authentication (HTTP Basic). The password deliberately has no
# default: while it is empty, check_alert_auth rejects every request.
ALERT_WEBHOOK_USER = os.environ.get('ALERT_WEBHOOK_USER', 'alertmanager')
ALERT_WEBHOOK_PASSWORD = os.environ.get('ALERT_WEBHOOK_PASSWORD', '')
if not ALERT_WEBHOOK_PASSWORD:
    logger.warning(
        '[SECURITY] ALERT_WEBHOOK_PASSWORD is not set. All webhook requests will be rejected. Set this environment variable.'
    )
# Telegram settings
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', '')


def _parse_chat_ids(raw) -> list:
    """Parse the TELEGRAM_CHAT_IDS setting into a list of chat ids.

    The setting is expected to be a JSON array such as ``'[123, -456]'``.
    A single JSON number or string (``'123'``, ``'"@ops"'``) is wrapped in a
    list, because a bare value would otherwise be iterated character by
    character (str) or not be iterable at all (int). Empty / ``null`` means
    no chats.

    Never raises: invalid JSON or an unsupported JSON type is logged and
    treated as "no chats", so a configuration typo disables Telegram alerts
    instead of breaking the import of this module.
    """
    text = (raw or '').strip()
    if not text:
        return []
    try:
        parsed = json.loads(text)
    except ValueError:
        logger.warning('[Alert] TELEGRAM_CHAT_IDS is not valid JSON; Telegram alerts disabled')
        return []
    if parsed is None or parsed == '':
        return []
    if isinstance(parsed, list):
        return parsed
    # bool is a subclass of int but is never a meaningful chat id.
    if isinstance(parsed, (int, str)) and not isinstance(parsed, bool):
        return [parsed]
    logger.warning('[Alert] TELEGRAM_CHAT_IDS must be a JSON list of chat ids; Telegram alerts disabled')
    return []


TELEGRAM_CHAT_IDS = _parse_chat_ids(os.getenv('TELEGRAM_CHAT_IDS', '[]'))
# In-memory alert history (appended in arrival order), capped at
# _MAX_ALERT_HISTORY entries by the webhook handler.
_MAX_ALERT_HISTORY = 100
_alert_history = []
# Auto-fix settings; AUTO_FIX_ENABLED accepts 'true' in any letter case.
AUTO_FIX_ENABLED = os.environ.get('AUTO_FIX_ENABLED', 'true').lower() == 'true'
# Seconds to wait before the same container may be auto-fixed again.
AUTO_FIX_COOLDOWN = 5 * 60
# time.time() of the last successful fix, keyed by the cooldown key.
_last_fix_time = {}
# Allowlist of issue_type values accepted by the manual-fix endpoint, so
# arbitrary strings never reach auto_fix_container.
ALLOWED_ISSUE_TYPES = frozenset(
    ('memory', 'container_memory', 'cpu', 'container_cpu')
)
def check_alert_auth(f):
    """Decorator enforcing HTTP Basic auth on alert endpoints (fail-secure).

    * 503 when ALERT_WEBHOOK_PASSWORD is not configured, so an unconfigured
      deployment never exposes the decorated routes;
    * 401 when credentials are missing or wrong;
    * otherwise the wrapped view is called.

    Credentials are compared with :func:`hmac.compare_digest` so response
    timing does not reveal how much of the username/password was correct.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        # fail-secure: with no password configured, refuse everything.
        if not ALERT_WEBHOOK_PASSWORD:
            return jsonify({'error': 'Server misconfiguration: auth not configured'}), 503
        auth = request.authorization
        if not auth:
            return jsonify({'error': 'Missing authentication'}), 401
        # Non-Basic schemes may leave username/password as None -> ''.
        # Compare UTF-8 bytes: compare_digest rejects non-ASCII str operands.
        user_ok = hmac.compare_digest(
            (auth.username or '').encode('utf-8'),
            ALERT_WEBHOOK_USER.encode('utf-8'),
        )
        password_ok = hmac.compare_digest(
            (auth.password or '').encode('utf-8'),
            ALERT_WEBHOOK_PASSWORD.encode('utf-8'),
        )
        # Both comparisons always run, so a wrong username costs the same
        # time as a wrong password.
        if not (user_ok and password_ok):
            return jsonify({'error': 'Invalid credentials'}), 401
        return f(*args, **kwargs)
    return decorated
def send_telegram_message(message: str, parse_mode: str = 'HTML') -> bool:
    """Deliver *message* to the configured admin chats through EventRouter.

    ADR-019 Phase 5: delivery goes through
    ``services.event_router.dispatch_sync`` (per the ADR, this adds retry and
    JSONL queue replay). ``parse_mode`` is kept only for backward
    compatibility: it is forwarded in the event payload, but EventRouter
    renders its own HTML template, so non-HTML values are effectively
    ignored.

    Returns:
        True only when the router reports ``delivered``; False when Telegram
        is not configured, the router reports a failure, or any exception
        occurs (the error is logged).
    """
    configured = bool(TELEGRAM_BOT_TOKEN) and bool(TELEGRAM_CHAT_IDS)
    if not configured:
        logger.warning("[Alert] Telegram 未設定,無法發送告警")
        return False
    try:
        # Imported lazily, at call time.
        from services.event_router import dispatch_sync
        event = {
            "event_type": "alertmanager_webhook",
            "severity": "alert",
            "source": "Alertmanager.Webhook",
            "title": "Alertmanager 告警",
            "summary": message[:400],
            "status": "received",
            "payload": {"raw_message": message, "parse_mode": parse_mode},
        }
        outcome = dispatch_sync(event=event, admin_chat_ids=TELEGRAM_CHAT_IDS)
        ok = bool(outcome.get("delivered"))
        if ok:
            logger.info(f"[Alert] EventRouter 告警已派送 sent={outcome.get('sent')}")
        else:
            logger.error(f"[Alert] EventRouter 告警未送達: {outcome.get('errors')}")
        return ok
    except Exception as exc:
        logger.error(f"[Alert] EventRouter dispatch 失敗: {exc}")
        return False
def format_alert_message(alert: dict, status: str) -> str:
    """Build the Telegram (HTML parse mode) text for one Alertmanager alert.

    Args:
        alert: One entry of the webhook's ``alerts`` list. Its ``labels``
            (severity, alertname, category, name) and ``annotations``
            (summary, description, value, container) are read when present.
        status: ``'firing'`` renders as triggered; anything else as resolved.

    Returns:
        The formatted message, with surrounding whitespace stripped.

    Every value taken from the alert payload is HTML-escaped: a raw ``<``,
    ``>`` or ``&`` (e.g. "usage > 90%") would otherwise make Telegram reject
    the whole message as malformed HTML.
    """
    # Tolerate explicit nulls as well as missing keys.
    labels = alert.get('labels') or {}
    annotations = alert.get('annotations') or {}

    def esc(value) -> str:
        # Telegram's HTML mode only requires <, > and & to be escaped.
        return html.escape(str(value), quote=False)

    severity = labels.get('severity', 'unknown')
    alertname = labels.get('alertname', 'Unknown')
    category = esc(labels.get('category', ''))
    # Severity icons
    severity_icons = {
        'critical': '🔴',
        'warning': '🟡',
        'info': '🔵'
    }
    icon = severity_icons.get(severity, '')
    # Status icon / text
    status_icon = '🚨' if status == 'firing' else ''
    status_text = '觸發' if status == 'firing' else '恢復'
    now = datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M:%S')
    # Details (optional fields are only rendered when non-empty)
    summary = esc(annotations.get('summary', alertname))
    description = annotations.get('description', '')
    value = annotations.get('value', '')
    container = annotations.get('container', labels.get('name', ''))
    severity_text = esc(str(severity).upper())
    message = f"""
{status_icon} <b>系統告警{status_text}</b> {icon}
<b>告警名稱:</b>{summary}
<b>嚴重程度:</b>{severity_text}
<b>類別:</b>{category}
"""
    if container:
        message += f"<b>容器:</b>{esc(container)}\n"
    if value:
        message += f"<b>當前值:</b>{esc(value)}\n"
    if description:
        message += f"\n<b>詳細說明:</b>\n{esc(description)}\n"
    message += f"\n<b>時間:</b>{now}"
    return message.strip()
def analyze_high_load(cpu_threshold: float = 50.0, mem_threshold: float = 50.0) -> dict:
    """Report which containers use a lot of CPU or memory.

    Runs ``docker stats --no-stream`` once and collects every container whose
    CPU% exceeds ``cpu_threshold`` or whose memory% exceeds ``mem_threshold``
    (both default to 50, the previously hard-coded limit).

    Returns:
        dict with ``timestamp`` (ISO-8601, UTC+8), ``high_cpu_containers`` and
        ``high_memory_containers`` (each sorted descending by usage),
        ``system_info`` (currently always empty), ``recommendations``
        (hints for the top 3 of each list) and, when the scan failed,
        ``error``.
    """
    result = {
        'timestamp': datetime.now(TAIPEI_TZ).isoformat(),
        'high_cpu_containers': [],
        'high_memory_containers': [],
        'system_info': {},
        'recommendations': []
    }
    try:
        # List argv (no shell) avoids shell injection.
        cmd = ["docker", "stats", "--no-stream", "--format", "{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}"]
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)  # nosec B603
        if proc.returncode != 0:
            # Surface the failure instead of returning an empty, error-free
            # report that looks exactly like "nothing is under load".
            error = (proc.stderr or '').strip() or f"docker stats exited with code {proc.returncode}"
            logger.error(f"[Alert] 分析高負載失敗: {error}")
            result['error'] = error
            return result
        for line in proc.stdout.splitlines():
            parts = line.split('\t')
            if len(parts) < 4:
                continue
            name, cpu_text, mem_usage, mem_text = parts[:4]
            try:
                cpu = float(cpu_text.replace('%', ''))
                mem_pct = float(mem_text.replace('%', ''))
            except ValueError:
                # Skip rows with non-numeric percentages (docker may print a
                # placeholder such as "--" when it cannot read a container's
                # stats) instead of aborting the scan for every container.
                continue
            if cpu > cpu_threshold:
                result['high_cpu_containers'].append({
                    'name': name,
                    'cpu_percent': cpu,
                    'memory_usage': mem_usage
                })
            if mem_pct > mem_threshold:
                result['high_memory_containers'].append({
                    'name': name,
                    'memory_percent': mem_pct,
                    'memory_usage': mem_usage
                })
        # Heaviest consumers first
        result['high_cpu_containers'].sort(key=lambda x: x['cpu_percent'], reverse=True)
        result['high_memory_containers'].sort(key=lambda x: x['memory_percent'], reverse=True)
        # Recommendations for the top 3 of each category
        for container in result['high_cpu_containers'][:3]:
            result['recommendations'].append(
                f"容器 {container['name']} CPU 使用率 {container['cpu_percent']:.1f}%,建議檢查或重啟"
            )
        for container in result['high_memory_containers'][:3]:
            result['recommendations'].append(
                f"容器 {container['name']} 記憶體使用 {container['memory_usage']},可能有記憶體洩漏"
            )
    except Exception as e:
        logger.error(f"[Alert] 分析高負載失敗: {e}")
        result['error'] = str(e)
    return result
def auto_fix_container(container_name: str, issue_type: str) -> dict:
    """Attempt an automatic remediation for one container.

    * Memory issues (``'memory'`` / ``'container_memory'``): restart it.
    * CPU issues (``'cpu'`` / ``'container_cpu'``): capture the top of
      ``ps aux --sort=-%cpu`` inside the container; only if that command
      fails is the container restarted.

    Safety rails: the name must pass :func:`is_valid_container_name`,
    protected containers are never touched, and a successful action starts
    an ``AUTO_FIX_COOLDOWN``-second cooldown per container and issue class.

    Returns:
        dict with ``container``, ``issue_type``, ``action`` (``'restart'``,
        ``'analyze'`` or None), ``success`` and a human-readable ``message``.
    """
    result = {
        'container': container_name,
        'issue_type': issue_type,
        'action': None,
        'success': False,
        'message': ''
    }
    # Security: validate the name before it reaches any docker command.
    if not is_valid_container_name(container_name):
        result['message'] = "無效的容器名稱格式"
        return result
    # Aliases trigger the same action, so they must share one cooldown key;
    # keying on the raw issue_type let e.g. a manual 'memory' fix bypass the
    # cooldown of a webhook-driven 'container_memory' restart.
    if issue_type in ('container_memory', 'memory'):
        issue_class = 'memory'
    elif issue_type in ('container_cpu', 'cpu'):
        issue_class = 'cpu'
    else:
        result['message'] = f"不支援的問題類型: {issue_type}"
        return result
    key = f"{container_name}_{issue_class}"
    now = time.time()
    last_fix = _last_fix_time.get(key)
    if last_fix is not None:
        elapsed = now - last_fix
        if elapsed < AUTO_FIX_COOLDOWN:
            result['message'] = f"冷卻中,{int(AUTO_FIX_COOLDOWN - elapsed)} 秒後可再次修復"
            return result

    def restart():
        # List argv + validated name: no shell is involved.
        return subprocess.run(["docker", "restart", container_name],
                              capture_output=True, text=True, timeout=60)  # nosec B603

    try:
        # Critical infrastructure containers are never restarted automatically.
        protected_containers = ['momo-postgres', 'momo-prometheus', 'momo-grafana']
        if container_name in protected_containers:
            result['message'] = "此為受保護的關鍵容器,不自動重啟"
            return result
        if issue_class == 'memory':
            result['action'] = 'restart'
            proc = restart()
            if proc.returncode == 0:
                result['success'] = True
                result['message'] = f"已重啟容器 {container_name}"
            else:
                result['message'] = f"重啟失敗: {proc.stderr}"
        else:
            # CPU: inspect the busiest processes first.
            cmd = ["docker", "exec", container_name, "ps", "aux", "--sort=-%cpu"]
            proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)  # nosec B603
            if proc.returncode == 0:
                top_lines = proc.stdout.strip().split('\n')[:5]
                result['action'] = 'analyze'
                result['message'] = "容器內 CPU 使用情況:\n" + '\n'.join(top_lines)
                result['success'] = True
            else:
                # Analysis impossible -> fall back to a restart.
                result['action'] = 'restart'
                proc = restart()
                result['success'] = proc.returncode == 0
                result['message'] = (f"已重啟容器 {container_name}" if result['success']
                                     else f"重啟失敗: {proc.stderr}")
        # Only successful actions start the cooldown; failures may be retried.
        if result['success']:
            _last_fix_time[key] = now
    except subprocess.TimeoutExpired:
        result['message'] = "操作超時"
    except Exception as e:
        result['message'] = f"修復錯誤: {str(e)}"
    return result
@alert_bp.route('/api/alert/webhook', methods=['POST'])
@check_alert_auth
def alert_webhook():
    """Receive an Alertmanager webhook, notify Telegram and attempt auto-fixes.

    For each alert in the payload:
      1. the formatted alert is sent to Telegram;
      2. a record is appended to the in-memory history (the oldest entry is
         dropped beyond ``_MAX_ALERT_HISTORY``);
      3. if the alert is firing and ``AUTO_FIX_ENABLED``:
         - for warning/critical severity, a system analysis report is sent,
           computed at most once per webhook request;
         - for ``container_cpu`` / ``container_memory`` alerts naming a
           container, :func:`auto_fix_container` runs and any attempted
           action is reported.

    Returns:
        JSON ``{'success': True, 'processed': int, 'fix_results': list}``;
        400 if the body is missing or not a JSON object; 500 on unexpected
        errors.
    """
    try:
        # silent=True: a malformed or non-JSON body yields None (-> 400)
        # instead of raising and being reported as a 500 below.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data received'}), 400
        status = data.get('status', 'unknown')
        alerts = data.get('alerts') or []
        if not isinstance(alerts, list):
            alerts = []
        logger.info(f"[Alert] 收到告警 Webhook: status={status}, alerts={len(alerts)}")
        processed_alerts = []
        fix_results = []
        # `docker stats` is slow; analysing once per request (not per alert)
        # keeps large batches within Alertmanager's webhook timeout and avoids
        # sending the same report several times.
        analysis_done = False
        for alert in alerts:
            if not isinstance(alert, dict):
                continue
            current_status = alert.get('status', status)
            labels = alert.get('labels') or {}
            annotations = alert.get('annotations') or {}
            category = labels.get('category', '')
            severity = labels.get('severity', '')
            container_name = annotations.get('container', labels.get('name', ''))
            # Format and send the Telegram notification.
            message = format_alert_message(alert, current_status)
            send_telegram_message(message)
            # Record in the bounded history.
            alert_record = {
                'timestamp': datetime.now(TAIPEI_TZ).isoformat(),
                'status': current_status,
                'alertname': labels.get('alertname'),
                'severity': severity,
                'category': category,
                'container': container_name,
                'message': message
            }
            _alert_history.append(alert_record)
            if len(_alert_history) > _MAX_ALERT_HISTORY:
                _alert_history.pop(0)
            processed_alerts.append(alert_record)
            if current_status != 'firing' or not AUTO_FIX_ENABLED:
                continue
            if severity in ('warning', 'critical') and not analysis_done:
                analysis_done = True
                analysis = analyze_high_load()
                recommendations = analysis.get('recommendations')
                if recommendations:
                    body = ''.join(
                        f"{html.escape(str(rec), quote=False)}\n"
                        for rec in recommendations[:5]
                    )
                    send_telegram_message("📊 <b>系統分析報告</b>\n\n" + body)
            if container_name and category in ('container_cpu', 'container_memory'):
                fix_result = auto_fix_container(container_name, category)
                fix_results.append(fix_result)
                if fix_result['action']:
                    outcome = '成功 ✅' if fix_result['success'] else '失敗 ❌'
                    # The message can embed `ps aux` output or docker stderr,
                    # which may contain '<' / '&' (e.g. "<defunct>").
                    detail = html.escape(str(fix_result['message']), quote=False)
                    container_text = html.escape(str(fix_result['container']), quote=False)
                    fix_msg = (
                        "🔧 <b>自動修復</b>\n\n"
                        f"容器: {container_text}\n"
                        f"動作: {fix_result['action']}\n"
                        f"結果: {outcome}\n"
                        f"說明: {detail}"
                    )
                    send_telegram_message(fix_msg)
        return jsonify({
            'success': True,
            'processed': len(processed_alerts),
            'fix_results': fix_results
        })
    except Exception as e:
        logger.exception(f"[Alert] Webhook 處理錯誤: {e}")
        return jsonify({'error': str(e)}), 500
@alert_bp.route('/api/alert/history')
@check_alert_auth
def get_alert_history():
    """Return recent alert records, newest first (auth required).

    Query parameter ``limit`` (int, default 50; non-integers fall back to
    the default) is clamped to 1..200 to bound the response size.
    """
    requested = request.args.get('limit', 50, type=int)
    limit = min(200, max(1, requested))
    newest_first = list(reversed(_alert_history[-limit:]))
    return jsonify({'success': True, 'data': newest_first})
@alert_bp.route('/api/alert/analyze')
@check_alert_auth
def analyze_system():
    """Run analyze_high_load() on demand and return its report (auth required)."""
    report = analyze_high_load()
    response = {'success': True, 'data': report}
    return jsonify(response)
@alert_bp.route('/api/alert/fix', methods=['POST'])
@check_alert_auth
def manual_fix():
    """Manually trigger auto_fix_container (auth + issue_type allowlist).

    JSON body: ``{"container": str, "issue_type": str = "memory"}``.

    Returns:
        400 when the container is missing or not a string, or when
        issue_type is not a string in ALLOWED_ISSUE_TYPES; otherwise
        ``{'success': bool, 'data': <fix result dict>}``.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        # A JSON array/scalar body carries no fields; treat it as empty.
        data = {}
    container_name = data.get('container')
    issue_type = data.get('issue_type', 'memory')
    if not container_name:
        return jsonify({'error': 'Missing container name'}), 400
    if not isinstance(container_name, str):
        # Reject non-string names up front with a clear 400.
        return jsonify({'error': 'Invalid container name'}), 400
    # Security: only known issue types are accepted. The isinstance check
    # also keeps unhashable values (lists/dicts) away from the frozenset
    # membership test, which would raise TypeError (-> 500).
    if not isinstance(issue_type, str) or issue_type not in ALLOWED_ISSUE_TYPES:
        return jsonify({
            'error': f'Invalid issue_type: {issue_type}. '
                     f'Allowed: {sorted(ALLOWED_ISSUE_TYPES)}'
        }), 400
    result = auto_fix_container(container_name, issue_type)
    return jsonify({
        'success': result['success'],
        'data': result
    })
@alert_bp.route('/api/alert/test', methods=['POST'])
@check_alert_auth
def test_alert():
    """Send a test notification through send_telegram_message().

    Protected by the same Basic auth as the other alert endpoints: this
    route is CSRF-exempt, so without auth anyone able to reach it could
    flood the admin Telegram chats.

    Returns:
        JSON ``{'success': bool, 'message': str}``.
    """
    message = """
🧪 <b>測試告警</b>
這是一則測試告警訊息。
如果您收到此訊息,表示告警系統正常運作。
<b>時間:</b>{}
""".format(datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M:%S'))
    success = send_telegram_message(message)
    return jsonify({
        'success': success,
        'message': '測試訊息已發送' if success else '發送失敗,請檢查 Telegram 設定'
    })
@alert_bp.route('/api/alert/status')
def alert_status():
    """Report the alerting subsystem's configuration and runtime counters.

    NOTE(review): this endpoint is unauthenticated, yet ``last_fix_times``
    exposes container names; confirm that is intended.
    """
    status = {
        'telegram_configured': bool(TELEGRAM_BOT_TOKEN) and bool(TELEGRAM_CHAT_IDS),
        'auto_fix_enabled': AUTO_FIX_ENABLED,
        'auto_fix_cooldown': AUTO_FIX_COOLDOWN,
        'alert_history_count': len(_alert_history),
        'last_fix_times': _last_fix_time,
    }
    return jsonify({'success': True, 'data': status})