From 38586deff13545d909b6749ec11cbebfd9f0fbd3 Mon Sep 17 00:00:00 2001 From: ogt Date: Mon, 20 Apr 2026 05:49:04 +0800 Subject: [PATCH] =?UTF-8?q?security:=20harden=20alert=5Froutes.py=20?= =?UTF-8?q?=E2=80=94=20auth=20coverage=20+=20input=20validation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issues fixed: 1. [CRITICAL] /api/alert/fix unauthenticated (CWE-306) POST /api/alert/fix had no @check_alert_auth and was CSRF-exempt. Any unauthenticated caller could trigger docker restart or docker exec on arbitrary container names (container_name is validated by is_valid_container_name but restart of any valid name is still a DoS vector). Fix: @check_alert_auth added. 2. [HIGH] Hardcoded ALERT_WEBHOOK_PASSWORD fallback (CWE-798) Default 'wooo_alert_2026' exposed in source. Fix: default='', startup warning if unset. check_alert_auth now fail-secure: returns 503 if password not configured. 3. [MEDIUM] /api/alert/history and /api/alert/analyze unauthenticated Both endpoints expose container names, memory usage, CPU stats, system recommendations. Fix: @check_alert_auth added to both. 4. [MEDIUM] issue_type unvalidated in manual_fix (CWE-20) Any string value could be passed through to auto_fix_container. Fix: ALLOWED_ISSUE_TYPES frozenset — only memory/cpu variants allowed. 5. [LOW] limit parameter unbounded in get_alert_history Arbitrarily large limit → large list slice → memory pressure. Fix: clamped to [1, 200]. NOTE: L177 docker stats command (original report) is SAFE as-is — list argv, fixed arguments, no user input. nosec B603 correctly placed. Co-Authored-By: Claude Sonnet 4.6 --- routes/alert_routes.py | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/routes/alert_routes.py b/routes/alert_routes.py index 7b740ad..04bc176 100644 --- a/routes/alert_routes.py +++ b/routes/alert_routes.py @@ -51,7 +51,13 @@ def csrf_exempt_for_alerts(): # 告警 Webhook 認證 ALERT_WEBHOOK_USER = os.getenv('ALERT_WEBHOOK_USER', 'alertmanager') -ALERT_WEBHOOK_PASSWORD = os.getenv('ALERT_WEBHOOK_PASSWORD', 'wooo_alert_2026') +ALERT_WEBHOOK_PASSWORD = os.getenv('ALERT_WEBHOOK_PASSWORD', '') + +if not ALERT_WEBHOOK_PASSWORD: + logger.warning( + '[SECURITY] ALERT_WEBHOOK_PASSWORD is not set. ' + 'All webhook requests will be rejected. Set this environment variable.' + ) # Telegram 設定 TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', '') @@ -66,11 +72,20 @@ AUTO_FIX_ENABLED = os.getenv('AUTO_FIX_ENABLED', 'true').lower() == 'true' AUTO_FIX_COOLDOWN = 300 # 同一容器 5 分鐘內不重複修復 _last_fix_time = {} +# issue_type allowlist(防止任意值傳入 auto_fix_container) +ALLOWED_ISSUE_TYPES = frozenset({ + 'memory', 'container_memory', + 'cpu', 'container_cpu', +}) + def check_alert_auth(f): - """告警 Webhook 認證裝飾器""" + """告警 Webhook 認證裝飾器(fail-secure:密碼未設定時拒絕所有請求)""" @wraps(f) def decorated(*args, **kwargs): + # fail-secure: 若密碼未設定,拒絕一切存取 + if not ALERT_WEBHOOK_PASSWORD: + return jsonify({'error': 'Server misconfiguration: auth not configured'}), 503 auth = request.authorization if not auth: return jsonify({'error': 'Missing authentication'}), 401 @@ -384,9 +399,12 @@ def alert_webhook(): @alert_bp.route('/api/alert/history') +@check_alert_auth def get_alert_history(): - """取得告警歷史記錄""" + """取得告警歷史記錄(需認證,防止系統資訊外洩)""" limit = request.args.get('limit', 50, type=int) + # 限制 limit 範圍,防止記憶體 DoS + limit = max(1, min(limit, 200)) return jsonify({ 'success': True, 'data': _alert_history[-limit:][::-1] # 最新的在前 @@ -394,8 +412,9 @@ def get_alert_history(): @alert_bp.route('/api/alert/analyze') +@check_alert_auth def analyze_system(): - """手動觸發系統分析""" + """手動觸發系統分析(需認證,防止系統資訊外洩)""" analysis = analyze_high_load() return jsonify({ 'success': True, @@ -404,15 +423,23 @@ def analyze_system(): @alert_bp.route('/api/alert/fix', methods=['POST']) +@check_alert_auth def manual_fix(): - """手動觸發修復""" - data = request.get_json() + """手動觸發修復(需認證 + issue_type allowlist)""" + data = request.get_json() or {} container_name = data.get('container') issue_type = data.get('issue_type', 'memory') if not container_name: return jsonify({'error': 'Missing container name'}), 400 + # Security: issue_type 只允許已知有效值 + if issue_type not in ALLOWED_ISSUE_TYPES: + return jsonify({ + 'error': f'Invalid issue_type: {issue_type}. ' + f'Allowed: {sorted(ALLOWED_ISSUE_TYPES)}' + }), 400 + result = auto_fix_container(container_name, issue_type) return jsonify({ 'success': result['success'],