354 lines
11 KiB
Python
354 lines
11 KiB
Python
import os
|
||
import time
|
||
from flask import render_template, redirect, url_for, request, session, flash
|
||
from functools import wraps
|
||
from werkzeug.security import check_password_hash
|
||
from config import LOGIN_PASSWORD
|
||
from datetime import datetime, timedelta
|
||
|
||
# ==========================================
|
||
# 🔓 登入功能開關
|
||
# ==========================================
|
||
# 設定環境變數 DISABLE_LOGIN=true 可關閉登入驗證
|
||
DISABLE_LOGIN = os.getenv('DISABLE_LOGIN', 'false').lower() == 'true'
|
||
|
||
if DISABLE_LOGIN:
|
||
print("⚠️ 警告:登入驗證已關閉 (DISABLE_LOGIN=true)")
|
||
|
||
# ==========================================
|
||
# 🔒 登入失敗追蹤與帳號鎖定機制
|
||
# ==========================================
|
||
|
||
# 登入失敗記錄(IP-based)
|
||
# 格式:{ip: {'count': 失敗次數, 'locked_until': 鎖定到期時間, 'first_attempt': 首次失敗時間}}
|
||
LOGIN_ATTEMPTS = {}
|
||
|
||
# 鎖定設定
|
||
MAX_LOGIN_ATTEMPTS = 5 # 最多失敗次數
|
||
LOCKOUT_DURATION = 300 # 鎖定時長(秒)= 5 分鐘
|
||
ATTEMPT_RESET_TIME = 1800 # 失敗記錄重置時間(秒)= 30 分鐘
|
||
|
||
def get_client_ip():
|
||
"""
|
||
取得客戶端 IP 位址(支援代理伺服器)
|
||
"""
|
||
if request.headers.get('X-Forwarded-For'):
|
||
# 如果通過代理,取第一個 IP
|
||
return request.headers.get('X-Forwarded-For').split(',')[0].strip()
|
||
elif request.headers.get('X-Real-IP'):
|
||
return request.headers.get('X-Real-IP')
|
||
else:
|
||
return request.remote_addr
|
||
|
||
def is_ip_locked(ip):
|
||
"""
|
||
檢查 IP 是否被鎖定
|
||
|
||
Returns:
|
||
tuple: (is_locked, remaining_seconds)
|
||
"""
|
||
if ip not in LOGIN_ATTEMPTS:
|
||
return False, 0
|
||
|
||
attempt_data = LOGIN_ATTEMPTS[ip]
|
||
|
||
# 檢查是否已鎖定
|
||
if 'locked_until' in attempt_data:
|
||
locked_until = attempt_data['locked_until']
|
||
current_time = time.time()
|
||
|
||
if current_time < locked_until:
|
||
# 仍在鎖定期間
|
||
remaining = int(locked_until - current_time)
|
||
return True, remaining
|
||
else:
|
||
# 鎖定期已過,清除記錄
|
||
del LOGIN_ATTEMPTS[ip]
|
||
return False, 0
|
||
|
||
return False, 0
|
||
|
||
def record_login_failure(ip):
|
||
"""
|
||
記錄登入失敗,並檢查是否需要鎖定
|
||
|
||
Returns:
|
||
bool: 是否已達到鎖定條件
|
||
"""
|
||
current_time = time.time()
|
||
|
||
if ip not in LOGIN_ATTEMPTS:
|
||
LOGIN_ATTEMPTS[ip] = {
|
||
'count': 1,
|
||
'first_attempt': current_time
|
||
}
|
||
return False
|
||
|
||
attempt_data = LOGIN_ATTEMPTS[ip]
|
||
|
||
# 檢查是否超過重置時間(30分鐘無活動則重置計數)
|
||
if current_time - attempt_data.get('first_attempt', 0) > ATTEMPT_RESET_TIME:
|
||
LOGIN_ATTEMPTS[ip] = {
|
||
'count': 1,
|
||
'first_attempt': current_time
|
||
}
|
||
return False
|
||
|
||
# 增加失敗次數
|
||
attempt_data['count'] += 1
|
||
|
||
# 檢查是否達到鎖定條件
|
||
if attempt_data['count'] >= MAX_LOGIN_ATTEMPTS:
|
||
attempt_data['locked_until'] = current_time + LOCKOUT_DURATION
|
||
return True
|
||
|
||
return False
|
||
|
||
def clear_login_attempts(ip):
|
||
"""
|
||
清除登入失敗記錄(登入成功時調用)
|
||
"""
|
||
if ip in LOGIN_ATTEMPTS:
|
||
del LOGIN_ATTEMPTS[ip]
|
||
|
||
def validate_password_strength(password):
|
||
"""
|
||
驗證密碼強度
|
||
|
||
要求:
|
||
- 至少 8 個字元
|
||
- 包含英文字母
|
||
- 包含數字
|
||
|
||
Returns:
|
||
tuple: (is_valid, error_message)
|
||
"""
|
||
if not password:
|
||
return False, "密碼不能為空"
|
||
|
||
if len(password) < 8:
|
||
return False, "密碼長度至少需要 8 個字元"
|
||
|
||
has_letter = any(c.isalpha() for c in password)
|
||
has_digit = any(c.isdigit() for c in password)
|
||
|
||
if not has_letter:
|
||
return False, "密碼必須包含英文字母"
|
||
|
||
if not has_digit:
|
||
return False, "密碼必須包含數字"
|
||
|
||
return True, None
|
||
|
||
# ==========================================
|
||
# 權限驗證裝飾器
|
||
# ==========================================
|
||
|
||
def login_required(f):
|
||
"""
|
||
權限驗證裝飾器:確保使用者必須登入才能存取特定路由。
|
||
|
||
若環境變數 DISABLE_LOGIN=true,則跳過驗證直接放行。
|
||
"""
|
||
@wraps(f)
|
||
def decorated_view(*args, **kwargs):
|
||
# 如果登入功能已關閉,直接放行
|
||
if DISABLE_LOGIN:
|
||
return f(*args, **kwargs)
|
||
|
||
# 檢查 session 中是否有登入標記
|
||
if not session.get('logged_in'):
|
||
# 如果未登入,跳轉至登入頁面
|
||
return redirect(url_for('login'))
|
||
return f(*args, **kwargs)
|
||
return decorated_view
|
||
|
||
|
||
def get_current_user():
|
||
"""
|
||
取得目前登入的使用者資訊
|
||
|
||
Returns:
|
||
dict: 包含使用者資訊的字典,如果未登入則返回 None
|
||
"""
|
||
if not session.get('logged_in'):
|
||
return None
|
||
|
||
return {
|
||
'logged_in': True,
|
||
'login_time': session.get('login_time'),
|
||
'client_ip': session.get('client_ip'),
|
||
'role': session.get('role', 'admin'), # 預設為 admin(向後兼容)
|
||
'username': session.get('username', 'admin')
|
||
}
|
||
|
||
|
||
def role_required(*roles):
|
||
"""
|
||
角色權限驗證裝飾器:確保使用者具有指定角色之一
|
||
|
||
Args:
|
||
*roles: 允許的角色列表,例如 'admin', 'manager', 'user'
|
||
|
||
Usage:
|
||
@role_required('admin')
|
||
def admin_only_page():
|
||
...
|
||
|
||
@role_required('admin', 'manager')
|
||
def admin_or_manager_page():
|
||
...
|
||
|
||
若環境變數 DISABLE_LOGIN=true,則跳過驗證直接放行。
|
||
"""
|
||
def decorator(f):
|
||
@wraps(f)
|
||
def decorated_view(*args, **kwargs):
|
||
# 如果登入功能已關閉,直接放行
|
||
if DISABLE_LOGIN:
|
||
return f(*args, **kwargs)
|
||
|
||
# 先檢查登入狀態
|
||
if not session.get('logged_in'):
|
||
return redirect(url_for('login'))
|
||
|
||
# 取得使用者角色(預設為 admin,向後兼容)
|
||
user_role = session.get('role', 'admin')
|
||
|
||
# 檢查角色權限
|
||
if user_role not in roles:
|
||
# 權限不足,返回 403
|
||
flash('您沒有權限存取此頁面', 'danger')
|
||
return redirect(url_for('index'))
|
||
|
||
return f(*args, **kwargs)
|
||
return decorated_view
|
||
return decorator
|
||
|
||
|
||
def admin_required(f):
|
||
"""
|
||
管理員權限驗證裝飾器:只允許 admin 角色存取
|
||
|
||
這是 role_required('admin') 的便捷寫法
|
||
|
||
若環境變數 DISABLE_LOGIN=true,則跳過驗證直接放行。
|
||
"""
|
||
@wraps(f)
|
||
def decorated_view(*args, **kwargs):
|
||
# 如果登入功能已關閉,直接放行
|
||
if DISABLE_LOGIN:
|
||
return f(*args, **kwargs)
|
||
|
||
# 先檢查登入狀態
|
||
if not session.get('logged_in'):
|
||
return redirect(url_for('login'))
|
||
|
||
# 取得使用者角色(預設為 admin,向後兼容)
|
||
user_role = session.get('role', 'admin')
|
||
|
||
# 檢查是否為管理員
|
||
if user_role != 'admin':
|
||
flash('此功能僅限管理員使用', 'danger')
|
||
return redirect(url_for('dashboard'))
|
||
|
||
return f(*args, **kwargs)
|
||
return decorated_view
|
||
|
||
# ==========================================
|
||
# 路由初始化
|
||
# ==========================================
|
||
|
||
def init_auth_routes(app):
|
||
"""
|
||
初始化驗證相關路由:註冊 /login 與 /logout。
|
||
"""
|
||
|
||
@app.route('/login', methods=['GET', 'POST'])
|
||
def login():
|
||
client_ip = get_client_ip()
|
||
|
||
if request.method == 'POST':
|
||
print(f"🔐 收到登入請求 | IP: {client_ip}")
|
||
|
||
# 1. 檢查 IP 是否被鎖定
|
||
is_locked, remaining_seconds = is_ip_locked(client_ip)
|
||
if is_locked:
|
||
minutes = remaining_seconds // 60
|
||
seconds = remaining_seconds % 60
|
||
error = f'登入失敗次數過多,帳號已鎖定。請在 {minutes} 分 {seconds} 秒後再試。'
|
||
print(f"⚠️ 登入嘗試被拒絕 | IP: {client_ip} | 原因: 帳號鎖定")
|
||
return render_template('login.html', error=error), 429
|
||
|
||
# 2. 取得輸入的密碼
|
||
input_password = request.form.get('password')
|
||
|
||
if not input_password:
|
||
error = '請輸入密碼'
|
||
return render_template('login.html', error=error), 400
|
||
|
||
# 3. 驗證密碼
|
||
# 注意:LOGIN_PASSWORD 可能是明文或雜湊值
|
||
# 首先檢查是否為雜湊值(以 pbkdf2:sha256 開頭)
|
||
is_password_valid = False
|
||
|
||
if LOGIN_PASSWORD.startswith('pbkdf2:sha256:'):
|
||
# 使用雜湊比對
|
||
is_password_valid = check_password_hash(LOGIN_PASSWORD, input_password)
|
||
else:
|
||
# 向後兼容:明文比對(僅用於過渡期)
|
||
is_password_valid = (input_password == LOGIN_PASSWORD)
|
||
if is_password_valid:
|
||
print("⚠️ 警告:系統仍在使用明文密碼,請盡快執行密碼雜湊更新")
|
||
|
||
# 4. 驗證結果處理
|
||
if is_password_valid:
|
||
# 登入成功
|
||
session.permanent = True
|
||
session['logged_in'] = True
|
||
session['login_time'] = datetime.now().isoformat()
|
||
session['client_ip'] = client_ip
|
||
|
||
# 清除失敗記錄
|
||
clear_login_attempts(client_ip)
|
||
|
||
print(f"✅ 登入成功 | IP: {client_ip}")
|
||
return redirect(url_for('index'))
|
||
else:
|
||
# 登入失敗
|
||
is_now_locked = record_login_failure(client_ip)
|
||
|
||
if is_now_locked:
|
||
error = f'登入失敗次數過多,帳號已鎖定 {LOCKOUT_DURATION // 60} 分鐘。'
|
||
print(f"🔒 帳號已鎖定 | IP: {client_ip} | 原因: 連續 {MAX_LOGIN_ATTEMPTS} 次失敗")
|
||
else:
|
||
remaining_attempts = MAX_LOGIN_ATTEMPTS - LOGIN_ATTEMPTS.get(client_ip, {}).get('count', 0)
|
||
error = f'密碼錯誤,請重新輸入。(剩餘嘗試次數:{remaining_attempts})'
|
||
print(f"❌ 登入失敗 | IP: {client_ip} | 剩餘嘗試: {remaining_attempts}")
|
||
|
||
return render_template('login.html', error=error), 401
|
||
|
||
# GET 請求:檢查是否被鎖定
|
||
is_locked, remaining_seconds = is_ip_locked(client_ip)
|
||
if is_locked:
|
||
minutes = remaining_seconds // 60
|
||
seconds = remaining_seconds % 60
|
||
error = f'登入失敗次數過多,帳號已鎖定。請在 {minutes} 分 {seconds} 秒後再試。'
|
||
return render_template('login.html', error=error), 429
|
||
|
||
# 顯示登入頁面
|
||
return render_template('login.html', error=None)
|
||
|
||
@app.route('/logout')
|
||
def logout():
|
||
"""
|
||
登出路由:清除 session 並導回登入頁。
|
||
"""
|
||
client_ip = get_client_ip()
|
||
session.pop('logged_in', None)
|
||
session.pop('login_time', None)
|
||
session.pop('client_ip', None)
|
||
print(f"👋 使用者已登出 | IP: {client_ip}")
|
||
return redirect(url_for('login'))
|
||
|
||
print("✅ Auth 模組已載入(增強安全版本)") |