Files
ewoooc/auth.py
OoO 71ea819d06 refactor(routes): 刪除 app.py 首頁重複路由
ADR-017 Phase 3f-1 dashboard sprint;首頁改由 dashboard_bp 接管,並更新 url_for('index') 相容引用。
2026-04-29 21:11:45 +08:00

355 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import time
from flask import render_template, redirect, url_for, request, session, flash
from functools import wraps
from werkzeug.security import check_password_hash
from config import LOGIN_PASSWORD
from datetime import datetime, timedelta
# ==========================================
# 🔓 登入功能開關
# ==========================================
# 設定環境變數 DISABLE_LOGIN=true 可關閉登入驗證
DISABLE_LOGIN = os.getenv('DISABLE_LOGIN', 'false').lower() == 'true'
if DISABLE_LOGIN:
print("⚠️ 警告:登入驗證已關閉 (DISABLE_LOGIN=true)")
# ==========================================
# 🔒 登入失敗追蹤與帳號鎖定機制
# ==========================================
# 登入失敗記錄IP-based
# 格式:{ip: {'count': 失敗次數, 'locked_until': 鎖定到期時間, 'first_attempt': 首次失敗時間}}
LOGIN_ATTEMPTS = {}
# 鎖定設定
MAX_LOGIN_ATTEMPTS = 5 # 最多失敗次數
LOCKOUT_DURATION = 300 # 鎖定時長(秒)= 5 分鐘
ATTEMPT_RESET_TIME = 1800 # 失敗記錄重置時間(秒)= 30 分鐘
def get_client_ip():
"""
取得客戶端 IP 位址(支援代理伺服器)
"""
if request.headers.get('X-Forwarded-For'):
# 如果通過代理,取第一個 IP
return request.headers.get('X-Forwarded-For').split(',')[0].strip()
elif request.headers.get('X-Real-IP'):
return request.headers.get('X-Real-IP')
else:
return request.remote_addr
def is_ip_locked(ip):
"""
檢查 IP 是否被鎖定
Returns:
tuple: (is_locked, remaining_seconds)
"""
if ip not in LOGIN_ATTEMPTS:
return False, 0
attempt_data = LOGIN_ATTEMPTS[ip]
# 檢查是否已鎖定
if 'locked_until' in attempt_data:
locked_until = attempt_data['locked_until']
current_time = time.time()
if current_time < locked_until:
# 仍在鎖定期間
remaining = int(locked_until - current_time)
return True, remaining
else:
# 鎖定期已過,清除記錄
del LOGIN_ATTEMPTS[ip]
return False, 0
return False, 0
def record_login_failure(ip):
"""
記錄登入失敗,並檢查是否需要鎖定
Returns:
bool: 是否已達到鎖定條件
"""
current_time = time.time()
if ip not in LOGIN_ATTEMPTS:
LOGIN_ATTEMPTS[ip] = {
'count': 1,
'first_attempt': current_time
}
return False
attempt_data = LOGIN_ATTEMPTS[ip]
# 檢查是否超過重置時間30分鐘無活動則重置計數
if current_time - attempt_data.get('first_attempt', 0) > ATTEMPT_RESET_TIME:
LOGIN_ATTEMPTS[ip] = {
'count': 1,
'first_attempt': current_time
}
return False
# 增加失敗次數
attempt_data['count'] += 1
# 檢查是否達到鎖定條件
if attempt_data['count'] >= MAX_LOGIN_ATTEMPTS:
attempt_data['locked_until'] = current_time + LOCKOUT_DURATION
return True
return False
def clear_login_attempts(ip):
"""
清除登入失敗記錄(登入成功時調用)
"""
if ip in LOGIN_ATTEMPTS:
del LOGIN_ATTEMPTS[ip]
def validate_password_strength(password):
"""
驗證密碼強度
要求:
- 至少 8 個字元
- 包含英文字母
- 包含數字
Returns:
tuple: (is_valid, error_message)
"""
if not password:
return False, "密碼不能為空"
if len(password) < 8:
return False, "密碼長度至少需要 8 個字元"
has_letter = any(c.isalpha() for c in password)
has_digit = any(c.isdigit() for c in password)
if not has_letter:
return False, "密碼必須包含英文字母"
if not has_digit:
return False, "密碼必須包含數字"
return True, None
# ==========================================
# 權限驗證裝飾器
# ==========================================
def login_required(f):
"""
權限驗證裝飾器:確保使用者必須登入才能存取特定路由。
若環境變數 DISABLE_LOGIN=true則跳過驗證直接放行。
"""
@wraps(f)
def decorated_view(*args, **kwargs):
# 如果登入功能已關閉,直接放行
if DISABLE_LOGIN:
return f(*args, **kwargs)
# 檢查 session 中是否有登入標記
if not session.get('logged_in'):
# 如果未登入,跳轉至登入頁面
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_view
def get_current_user():
"""
取得目前登入的使用者資訊
Returns:
dict: 包含使用者資訊的字典,如果未登入則返回 None
"""
if not session.get('logged_in'):
return None
return {
'logged_in': True,
'login_time': session.get('login_time'),
'client_ip': session.get('client_ip'),
'role': session.get('role', 'admin'), # 預設為 admin向後兼容
'username': session.get('username', 'admin')
}
def role_required(*roles):
"""
角色權限驗證裝飾器:確保使用者具有指定角色之一
Args:
*roles: 允許的角色列表,例如 'admin', 'manager', 'user'
Usage:
@role_required('admin')
def admin_only_page():
...
@role_required('admin', 'manager')
def admin_or_manager_page():
...
若環境變數 DISABLE_LOGIN=true則跳過驗證直接放行。
"""
def decorator(f):
@wraps(f)
def decorated_view(*args, **kwargs):
# 如果登入功能已關閉,直接放行
if DISABLE_LOGIN:
return f(*args, **kwargs)
# 先檢查登入狀態
if not session.get('logged_in'):
return redirect(url_for('login'))
# 取得使用者角色(預設為 admin向後兼容
user_role = session.get('role', 'admin')
# 檢查角色權限
if user_role not in roles:
# 權限不足,返回 403
flash('您沒有權限存取此頁面', 'danger')
return redirect(url_for('dashboard.index'))
return f(*args, **kwargs)
return decorated_view
return decorator
def admin_required(f):
"""
管理員權限驗證裝飾器:只允許 admin 角色存取
這是 role_required('admin') 的便捷寫法
若環境變數 DISABLE_LOGIN=true則跳過驗證直接放行。
"""
@wraps(f)
def decorated_view(*args, **kwargs):
# 如果登入功能已關閉,直接放行
if DISABLE_LOGIN:
return f(*args, **kwargs)
# 先檢查登入狀態
if not session.get('logged_in'):
return redirect(url_for('login'))
# 取得使用者角色(預設為 admin向後兼容
user_role = session.get('role', 'admin')
# 檢查是否為管理員
if user_role != 'admin':
flash('此功能僅限管理員使用', 'danger')
return redirect(url_for('dashboard'))
return f(*args, **kwargs)
return decorated_view
# ==========================================
# 路由初始化
# ==========================================
def init_auth_routes(app):
"""
初始化驗證相關路由:註冊 /login 與 /logout。
"""
@app.route('/login', methods=['GET', 'POST'])
def login():
client_ip = get_client_ip()
if request.method == 'POST':
print(f"🔐 收到登入請求 | IP: {client_ip}")
# 1. 檢查 IP 是否被鎖定
is_locked, remaining_seconds = is_ip_locked(client_ip)
if is_locked:
minutes = remaining_seconds // 60
seconds = remaining_seconds % 60
error = f'登入失敗次數過多,帳號已鎖定。請在 {minutes}{seconds} 秒後再試。'
print(f"⚠️ 登入嘗試被拒絕 | IP: {client_ip} | 原因: 帳號鎖定")
return render_template('login.html', error=error), 429
# 2. 取得輸入的密碼
input_password = request.form.get('password')
if not input_password:
error = '請輸入密碼'
return render_template('login.html', error=error), 400
# 3. 驗證密碼
# 注意LOGIN_PASSWORD 可能是明文或雜湊值
# 首先檢查是否為雜湊值(以 pbkdf2:sha256 開頭)
is_password_valid = False
if LOGIN_PASSWORD.startswith('pbkdf2:sha256:'):
# 使用雜湊比對
is_password_valid = check_password_hash(LOGIN_PASSWORD, input_password)
else:
# 向後兼容:明文比對(僅用於過渡期)
is_password_valid = (input_password == LOGIN_PASSWORD)
if is_password_valid:
print("⚠️ 警告:系統仍在使用明文密碼,請盡快執行密碼雜湊更新")
# 4. 驗證結果處理
if is_password_valid:
# 登入成功
session.permanent = True
session['logged_in'] = True
session['login_time'] = datetime.now().isoformat()
session['client_ip'] = client_ip
# 清除失敗記錄
clear_login_attempts(client_ip)
print(f"✅ 登入成功 | IP: {client_ip}")
return redirect(url_for('dashboard.index'))
else:
# 登入失敗
is_now_locked = record_login_failure(client_ip)
if is_now_locked:
error = f'登入失敗次數過多,帳號已鎖定 {LOCKOUT_DURATION // 60} 分鐘。'
print(f"🔒 帳號已鎖定 | IP: {client_ip} | 原因: 連續 {MAX_LOGIN_ATTEMPTS} 次失敗")
else:
remaining_attempts = MAX_LOGIN_ATTEMPTS - LOGIN_ATTEMPTS.get(client_ip, {}).get('count', 0)
error = f'密碼錯誤,請重新輸入。(剩餘嘗試次數:{remaining_attempts}'
print(f"❌ 登入失敗 | IP: {client_ip} | 剩餘嘗試: {remaining_attempts}")
return render_template('login.html', error=error), 401
# GET 請求:檢查是否被鎖定
is_locked, remaining_seconds = is_ip_locked(client_ip)
if is_locked:
minutes = remaining_seconds // 60
seconds = remaining_seconds % 60
error = f'登入失敗次數過多,帳號已鎖定。請在 {minutes}{seconds} 秒後再試。'
return render_template('login.html', error=error), 429
# 顯示登入頁面
return render_template('login.html', error=None)
@app.route('/logout')
def logout():
"""
登出路由:清除 session 並導回登入頁。
"""
client_ip = get_client_ip()
session.pop('logged_in', None)
session.pop('login_time', None)
session.pop('client_ip', None)
print(f"👋 使用者已登出 | IP: {client_ip}")
return redirect(url_for('login'))
print("✅ Auth 模組已載入(增強安全版本)")