import os import time from flask import render_template, redirect, url_for, request, session, flash from functools import wraps from werkzeug.security import check_password_hash from config import LOGIN_PASSWORD from datetime import datetime, timedelta # ========================================== # 🔓 登入功能開關 # ========================================== # 設定環境變數 DISABLE_LOGIN=true 可關閉登入驗證 DISABLE_LOGIN = os.getenv('DISABLE_LOGIN', 'false').lower() == 'true' if DISABLE_LOGIN: print("⚠️ 警告:登入驗證已關閉 (DISABLE_LOGIN=true)") # ========================================== # 🔒 登入失敗追蹤與帳號鎖定機制 # ========================================== # 登入失敗記錄(IP-based) # 格式:{ip: {'count': 失敗次數, 'locked_until': 鎖定到期時間, 'first_attempt': 首次失敗時間}} LOGIN_ATTEMPTS = {} # 鎖定設定 MAX_LOGIN_ATTEMPTS = 5 # 最多失敗次數 LOCKOUT_DURATION = 300 # 鎖定時長(秒)= 5 分鐘 ATTEMPT_RESET_TIME = 1800 # 失敗記錄重置時間(秒)= 30 分鐘 def get_client_ip(): """ 取得客戶端 IP 位址(支援代理伺服器) """ if request.headers.get('X-Forwarded-For'): # 如果通過代理,取第一個 IP return request.headers.get('X-Forwarded-For').split(',')[0].strip() elif request.headers.get('X-Real-IP'): return request.headers.get('X-Real-IP') else: return request.remote_addr def is_ip_locked(ip): """ 檢查 IP 是否被鎖定 Returns: tuple: (is_locked, remaining_seconds) """ if ip not in LOGIN_ATTEMPTS: return False, 0 attempt_data = LOGIN_ATTEMPTS[ip] # 檢查是否已鎖定 if 'locked_until' in attempt_data: locked_until = attempt_data['locked_until'] current_time = time.time() if current_time < locked_until: # 仍在鎖定期間 remaining = int(locked_until - current_time) return True, remaining else: # 鎖定期已過,清除記錄 del LOGIN_ATTEMPTS[ip] return False, 0 return False, 0 def record_login_failure(ip): """ 記錄登入失敗,並檢查是否需要鎖定 Returns: bool: 是否已達到鎖定條件 """ current_time = time.time() if ip not in LOGIN_ATTEMPTS: LOGIN_ATTEMPTS[ip] = { 'count': 1, 'first_attempt': current_time } return False attempt_data = LOGIN_ATTEMPTS[ip] # 檢查是否超過重置時間(30分鐘無活動則重置計數) if current_time - attempt_data.get('first_attempt', 0) > ATTEMPT_RESET_TIME: LOGIN_ATTEMPTS[ip] = { 'count': 1, 'first_attempt': current_time } return False # 增加失敗次數 attempt_data['count'] += 1 # 檢查是否達到鎖定條件 if attempt_data['count'] >= MAX_LOGIN_ATTEMPTS: attempt_data['locked_until'] = current_time + LOCKOUT_DURATION return True return False def clear_login_attempts(ip): """ 清除登入失敗記錄(登入成功時調用) """ if ip in LOGIN_ATTEMPTS: del LOGIN_ATTEMPTS[ip] def validate_password_strength(password): """ 驗證密碼強度 要求: - 至少 8 個字元 - 包含英文字母 - 包含數字 Returns: tuple: (is_valid, error_message) """ if not password: return False, "密碼不能為空" if len(password) < 8: return False, "密碼長度至少需要 8 個字元" has_letter = any(c.isalpha() for c in password) has_digit = any(c.isdigit() for c in password) if not has_letter: return False, "密碼必須包含英文字母" if not has_digit: return False, "密碼必須包含數字" return True, None # ========================================== # 權限驗證裝飾器 # ========================================== def login_required(f): """ 權限驗證裝飾器:確保使用者必須登入才能存取特定路由。 若環境變數 DISABLE_LOGIN=true,則跳過驗證直接放行。 """ @wraps(f) def decorated_view(*args, **kwargs): # 如果登入功能已關閉,直接放行 if DISABLE_LOGIN: return f(*args, **kwargs) # 檢查 session 中是否有登入標記 if not session.get('logged_in'): # 如果未登入,跳轉至登入頁面 return redirect(url_for('login')) return f(*args, **kwargs) return decorated_view def get_current_user(): """ 取得目前登入的使用者資訊 Returns: dict: 包含使用者資訊的字典,如果未登入則返回 None """ if not session.get('logged_in'): return None return { 'logged_in': True, 'login_time': session.get('login_time'), 'client_ip': session.get('client_ip'), 'role': session.get('role', 'admin'), # 預設為 admin(向後兼容) 'username': session.get('username', 'admin') } def role_required(*roles): """ 角色權限驗證裝飾器:確保使用者具有指定角色之一 Args: *roles: 允許的角色列表,例如 'admin', 'manager', 'user' Usage: @role_required('admin') def admin_only_page(): ... @role_required('admin', 'manager') def admin_or_manager_page(): ... 若環境變數 DISABLE_LOGIN=true,則跳過驗證直接放行。 """ def decorator(f): @wraps(f) def decorated_view(*args, **kwargs): # 如果登入功能已關閉,直接放行 if DISABLE_LOGIN: return f(*args, **kwargs) # 先檢查登入狀態 if not session.get('logged_in'): return redirect(url_for('login')) # 取得使用者角色(預設為 admin,向後兼容) user_role = session.get('role', 'admin') # 檢查角色權限 if user_role not in roles: # 權限不足,返回 403 flash('您沒有權限存取此頁面', 'danger') return redirect(url_for('dashboard.index')) return f(*args, **kwargs) return decorated_view return decorator def admin_required(f): """ 管理員權限驗證裝飾器:只允許 admin 角色存取 這是 role_required('admin') 的便捷寫法 若環境變數 DISABLE_LOGIN=true,則跳過驗證直接放行。 """ @wraps(f) def decorated_view(*args, **kwargs): # 如果登入功能已關閉,直接放行 if DISABLE_LOGIN: return f(*args, **kwargs) # 先檢查登入狀態 if not session.get('logged_in'): return redirect(url_for('login')) # 取得使用者角色(預設為 admin,向後兼容) user_role = session.get('role', 'admin') # 檢查是否為管理員 if user_role != 'admin': flash('此功能僅限管理員使用', 'danger') return redirect(url_for('dashboard')) return f(*args, **kwargs) return decorated_view # ========================================== # 路由初始化 # ========================================== def init_auth_routes(app): """ 初始化驗證相關路由:註冊 /login 與 /logout。 """ @app.route('/login', methods=['GET', 'POST']) def login(): client_ip = get_client_ip() if request.method == 'POST': print(f"🔐 收到登入請求 | IP: {client_ip}") # 1. 檢查 IP 是否被鎖定 is_locked, remaining_seconds = is_ip_locked(client_ip) if is_locked: minutes = remaining_seconds // 60 seconds = remaining_seconds % 60 error = f'登入失敗次數過多,帳號已鎖定。請在 {minutes} 分 {seconds} 秒後再試。' print(f"⚠️ 登入嘗試被拒絕 | IP: {client_ip} | 原因: 帳號鎖定") return render_template('login.html', error=error), 429 # 2. 取得輸入的密碼 input_password = request.form.get('password') if not input_password: error = '請輸入密碼' return render_template('login.html', error=error), 400 # 3. 驗證密碼 # 注意:LOGIN_PASSWORD 可能是明文或雜湊值 # 首先檢查是否為雜湊值(以 pbkdf2:sha256 開頭) is_password_valid = False if LOGIN_PASSWORD.startswith('pbkdf2:sha256:'): # 使用雜湊比對 is_password_valid = check_password_hash(LOGIN_PASSWORD, input_password) else: # 向後兼容:明文比對(僅用於過渡期) is_password_valid = (input_password == LOGIN_PASSWORD) if is_password_valid: print("⚠️ 警告:系統仍在使用明文密碼,請盡快執行密碼雜湊更新") # 4. 驗證結果處理 if is_password_valid: # 登入成功 session.permanent = True session['logged_in'] = True session['login_time'] = datetime.now().isoformat() session['client_ip'] = client_ip # 清除失敗記錄 clear_login_attempts(client_ip) print(f"✅ 登入成功 | IP: {client_ip}") return redirect(url_for('dashboard.index')) else: # 登入失敗 is_now_locked = record_login_failure(client_ip) if is_now_locked: error = f'登入失敗次數過多,帳號已鎖定 {LOCKOUT_DURATION // 60} 分鐘。' print(f"🔒 帳號已鎖定 | IP: {client_ip} | 原因: 連續 {MAX_LOGIN_ATTEMPTS} 次失敗") else: remaining_attempts = MAX_LOGIN_ATTEMPTS - LOGIN_ATTEMPTS.get(client_ip, {}).get('count', 0) error = f'密碼錯誤,請重新輸入。(剩餘嘗試次數:{remaining_attempts})' print(f"❌ 登入失敗 | IP: {client_ip} | 剩餘嘗試: {remaining_attempts}") return render_template('login.html', error=error), 401 # GET 請求:檢查是否被鎖定 is_locked, remaining_seconds = is_ip_locked(client_ip) if is_locked: minutes = remaining_seconds // 60 seconds = remaining_seconds % 60 error = f'登入失敗次數過多,帳號已鎖定。請在 {minutes} 分 {seconds} 秒後再試。' return render_template('login.html', error=error), 429 # 顯示登入頁面 return render_template('login.html', error=None) @app.route('/logout') def logout(): """ 登出路由:清除 session 並導回登入頁。 """ client_ip = get_client_ip() session.pop('logged_in', None) session.pop('login_time', None) session.pop('client_ip', None) print(f"👋 使用者已登出 | IP: {client_ip}") return redirect(url_for('login')) print("✅ Auth 模組已載入(增強安全版本)")