security: harden alert_routes.py — auth coverage + input validation
All checks were successful
CD Pipeline / deploy (push) Successful in 1m19s
All checks were successful
CD Pipeline / deploy (push) Successful in 1m19s
Issues fixed: 1. [CRITICAL] /api/alert/fix unauthenticated (CWE-306) POST /api/alert/fix had no @check_alert_auth and was CSRF-exempt. Any unauthenticated caller could trigger docker restart or docker exec on arbitrary container names (container_name is validated by is_valid_container_name but restart of any valid name is still a DoS vector). Fix: @check_alert_auth added. 2. [HIGH] Hardcoded ALERT_WEBHOOK_PASSWORD fallback (CWE-798) Default 'wooo_alert_2026' exposed in source. Fix: default='', startup warning if unset. check_alert_auth now fail-secure: returns 503 if password not configured. 3. [MEDIUM] /api/alert/history and /api/alert/analyze unauthenticated Both endpoints expose container names, memory usage, CPU stats, system recommendations. Fix: @check_alert_auth added to both. 4. [MEDIUM] issue_type unvalidated in manual_fix (CWE-20) Any string value could be passed through to auto_fix_container. Fix: ALLOWED_ISSUE_TYPES frozenset — only memory/cpu variants allowed. 5. [LOW] limit parameter unbounded in get_alert_history Arbitrarily large limit → large list slice → memory pressure. Fix: clamped to [1, 200]. NOTE: L177 docker stats command (original report) is SAFE as-is — list argv, fixed arguments, no user input. nosec B603 correctly placed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -51,7 +51,13 @@ def csrf_exempt_for_alerts():
|
||||
|
||||
# 告警 Webhook 認證
|
||||
ALERT_WEBHOOK_USER = os.getenv('ALERT_WEBHOOK_USER', 'alertmanager')
|
||||
ALERT_WEBHOOK_PASSWORD = os.getenv('ALERT_WEBHOOK_PASSWORD', 'wooo_alert_2026')
|
||||
ALERT_WEBHOOK_PASSWORD = os.getenv('ALERT_WEBHOOK_PASSWORD', '')
|
||||
|
||||
if not ALERT_WEBHOOK_PASSWORD:
|
||||
logger.warning(
|
||||
'[SECURITY] ALERT_WEBHOOK_PASSWORD is not set. '
|
||||
'All webhook requests will be rejected. Set this environment variable.'
|
||||
)
|
||||
|
||||
# Telegram 設定
|
||||
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', '')
|
||||
@@ -66,11 +72,20 @@ AUTO_FIX_ENABLED = os.getenv('AUTO_FIX_ENABLED', 'true').lower() == 'true'
|
||||
AUTO_FIX_COOLDOWN = 300 # 同一容器 5 分鐘內不重複修復
|
||||
_last_fix_time = {}
|
||||
|
||||
# issue_type allowlist(防止任意值傳入 auto_fix_container)
|
||||
ALLOWED_ISSUE_TYPES = frozenset({
|
||||
'memory', 'container_memory',
|
||||
'cpu', 'container_cpu',
|
||||
})
|
||||
|
||||
|
||||
def check_alert_auth(f):
|
||||
"""告警 Webhook 認證裝飾器"""
|
||||
"""告警 Webhook 認證裝飾器(fail-secure:密碼未設定時拒絕所有請求)"""
|
||||
@wraps(f)
|
||||
def decorated(*args, **kwargs):
|
||||
# fail-secure: 若密碼未設定,拒絕一切存取
|
||||
if not ALERT_WEBHOOK_PASSWORD:
|
||||
return jsonify({'error': 'Server misconfiguration: auth not configured'}), 503
|
||||
auth = request.authorization
|
||||
if not auth:
|
||||
return jsonify({'error': 'Missing authentication'}), 401
|
||||
@@ -384,9 +399,12 @@ def alert_webhook():
|
||||
|
||||
|
||||
@alert_bp.route('/api/alert/history')
|
||||
@check_alert_auth
|
||||
def get_alert_history():
|
||||
"""取得告警歷史記錄"""
|
||||
"""取得告警歷史記錄(需認證,防止系統資訊外洩)"""
|
||||
limit = request.args.get('limit', 50, type=int)
|
||||
# 限制 limit 範圍,防止記憶體 DoS
|
||||
limit = max(1, min(limit, 200))
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'data': _alert_history[-limit:][::-1] # 最新的在前
|
||||
@@ -394,8 +412,9 @@ def get_alert_history():
|
||||
|
||||
|
||||
@alert_bp.route('/api/alert/analyze')
|
||||
@check_alert_auth
|
||||
def analyze_system():
|
||||
"""手動觸發系統分析"""
|
||||
"""手動觸發系統分析(需認證,防止系統資訊外洩)"""
|
||||
analysis = analyze_high_load()
|
||||
return jsonify({
|
||||
'success': True,
|
||||
@@ -404,15 +423,23 @@ def analyze_system():
|
||||
|
||||
|
||||
@alert_bp.route('/api/alert/fix', methods=['POST'])
|
||||
@check_alert_auth
|
||||
def manual_fix():
|
||||
"""手動觸發修復"""
|
||||
data = request.get_json()
|
||||
"""手動觸發修復(需認證 + issue_type allowlist)"""
|
||||
data = request.get_json() or {}
|
||||
container_name = data.get('container')
|
||||
issue_type = data.get('issue_type', 'memory')
|
||||
|
||||
if not container_name:
|
||||
return jsonify({'error': 'Missing container name'}), 400
|
||||
|
||||
# Security: issue_type 只允許已知有效值
|
||||
if issue_type not in ALLOWED_ISSUE_TYPES:
|
||||
return jsonify({
|
||||
'error': f'Invalid issue_type: {issue_type}. '
|
||||
f'Allowed: {sorted(ALLOWED_ISSUE_TYPES)}'
|
||||
}), 400
|
||||
|
||||
result = auto_fix_container(container_name, issue_type)
|
||||
return jsonify({
|
||||
'success': result['success'],
|
||||
|
||||
Reference in New Issue
Block a user