Files
ewoooc/routes/user_routes.py
OoO 2c869edcb1
All checks were successful
CD Pipeline / deploy (push) Successful in 1m0s
統一系統管理頁新版殼層
2026-05-14 00:43:55 +08:00

603 lines
17 KiB
Python

"""
用戶管理路由模組
提供:
- 用戶管理頁面 (Admin only)
- 用戶 CRUD API
- 修改密碼頁面
- 登入歷史查詢
"""
from flask import Blueprint, render_template, request, jsonify, session, redirect, url_for, flash
from auth import login_required, role_required, admin_required, get_current_user
from database.manager import get_session
from services.user_service import UserService
from services.password_service import get_password_requirements
from database.user_models import User
# Blueprint on which every page and API route in this module is registered.
user_bp = Blueprint('user_bp', __name__)
# ==========================================
# 頁面路由
# ==========================================
@user_bp.route('/user_management')
@admin_required
def user_management():
    """Render the user management page (administrators only)."""
    template_name = 'user_management.html'
    return render_template(template_name, active_page='user_management')
@user_bp.route('/change_password')
@login_required
def change_password():
    """Render the change-password page along with the password requirements."""
    template_context = {
        'password_requirements': get_password_requirements(),
        'active_page': 'change_password',
    }
    return render_template('change_password.html', **template_context)
@user_bp.route('/login_history')
@admin_required
def login_history_page():
    """Render the login history page (administrators only)."""
    template_name = 'login_history.html'
    return render_template(template_name, active_page='login_history')
# ==========================================
# 用戶 CRUD API
# ==========================================
@user_bp.route('/api/users', methods=['GET'])
@admin_required
def get_users():
    """Return the list of all users as JSON.

    The ``include_inactive`` query parameter is compared case-insensitively
    with ``'true'`` and the resulting flag is forwarded to
    ``UserService.get_all_users``.
    """
    raw_flag = request.args.get('include_inactive', 'false')
    want_inactive = raw_flag.lower() == 'true'

    db = get_session()
    try:
        all_users = UserService(db).get_all_users(include_inactive=want_inactive)
        serialized = [account.to_dict() for account in all_users]
        return jsonify({'success': True, 'data': serialized})
    except Exception as exc:
        # Any failure is reported as a JSON 500 carrying the exception text.
        failure = {'success': False, 'message': str(exc)}
        return jsonify(failure), 500
    finally:
        db.close()
@user_bp.route('/api/users', methods=['POST'])
@admin_required
def create_user():
    """Create a new user from a JSON request body.

    Expected JSON object keys: ``username`` and ``password`` (required),
    ``role`` (defaults to ``'user'``), ``email`` and ``display_name``
    (optional; blank values are passed to the service as ``None``).

    Returns:
        A JSON response with ``success`` and ``message`` and, on success,
        the created user under ``data``. Status 400 is used for invalid
        input or an error reported by the service, 500 for unexpected
        exceptions.
    """
    # silent=True turns malformed / non-JSON bodies into None so they reach
    # the JSON 400 below instead of being rejected before this handler's check.
    data = request.get_json(silent=True)
    # Arrays, strings and numbers have no .get(); only a non-empty object is valid.
    if not isinstance(data, dict) or not data:
        return jsonify({'success': False, 'message': '無效的請求資料'}), 400

    def _text(key):
        """Return the stripped string stored under *key*, or '' for null/non-string values."""
        value = data.get(key)
        return value.strip() if isinstance(value, str) else ''

    username = _text('username')
    raw_password = data.get('password')
    # Passwords are not stripped; anything that is not a string counts as missing.
    password = raw_password if isinstance(raw_password, str) else ''
    role = data.get('role', 'user')
    email = _text('email') or None
    display_name = _text('display_name') or None

    if not username:
        return jsonify({'success': False, 'message': '帳號為必填'}), 400
    if not password:
        return jsonify({'success': False, 'message': '密碼為必填'}), 400

    db_session = get_session()
    try:
        service = UserService(db_session)
        current_user = get_current_user()
        created_by = current_user.get('user_id') if current_user else None

        user, error = service.create_user(
            username=username,
            password=password,
            role=role,
            email=email,
            display_name=display_name,
            created_by=created_by,
        )
        if error:
            return jsonify({'success': False, 'message': error}), 400

        return jsonify({
            'success': True,
            'message': f"用戶 '{username}' 建立成功",
            'data': user.to_dict(),
        })
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
    finally:
        db_session.close()
@user_bp.route('/api/users/<int:user_id>', methods=['GET'])
@admin_required
def get_user(user_id):
    """Return a single user's data as JSON, or a 404 when the user is not found."""
    db = get_session()
    try:
        found = UserService(db).get_user_by_id(user_id)
        if found:
            return jsonify({'success': True, 'data': found.to_dict()})
        return jsonify({'success': False, 'message': '用戶不存在'}), 404
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
    finally:
        db.close()
@user_bp.route('/api/users/<int:user_id>', methods=['PUT'])
@admin_required
def update_user(user_id):
    """Update a user's editable fields from a JSON request body.

    Only ``email``, ``display_name``, ``role`` and ``is_active`` are
    forwarded to ``UserService.update_user``; any other keys are ignored.
    ``email`` and ``display_name`` are stripped, and blank or ``null``
    values are forwarded as ``None``.

    Returns:
        A JSON response with ``success`` and ``message``. Status 400 is used
        for invalid input or a failure reported by the service, 500 for
        unexpected exceptions.
    """
    # silent=True turns malformed / non-JSON bodies into None so they reach
    # the JSON 400 below; the isinstance check rejects arrays and scalars.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not data:
        return jsonify({'success': False, 'message': '無效的請求資料'}), 400

    # Whitelist of updatable fields.
    update_data = {}
    for field in ('email', 'display_name'):
        if field not in data:
            continue
        value = data[field]
        if value is None:
            # JSON null clears the field instead of crashing on .strip().
            update_data[field] = None
        elif isinstance(value, str):
            update_data[field] = value.strip() or None
        else:
            return jsonify({'success': False, 'message': '無效的請求資料'}), 400
    # NOTE(review): unlike delete_user, nothing here stops an admin from
    # deactivating or demoting their own account - confirm whether
    # UserService.update_user guards against that.
    for field in ('role', 'is_active'):
        if field in data:
            update_data[field] = data[field]

    db_session = get_session()
    try:
        service = UserService(db_session)
        success, error = service.update_user(user_id, **update_data)
        if not success:
            return jsonify({'success': False, 'message': error}), 400

        return jsonify({
            'success': True,
            'message': '用戶資料更新成功',
        })
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
    finally:
        db_session.close()
@user_bp.route('/api/users/<int:user_id>', methods=['DELETE'])
@admin_required
def delete_user(user_id):
    """Delete (deactivate) a user through ``UserService.delete_user``.

    The currently logged-in user is not allowed to target their own account.
    """
    acting_user = get_current_user()
    targets_self = bool(acting_user) and acting_user.get('user_id') == user_id
    if targets_self:
        return jsonify({'success': False, 'message': '無法刪除自己的帳號'}), 400

    db = get_session()
    try:
        ok, failure_reason = UserService(db).delete_user(user_id)
        if ok:
            return jsonify({'success': True, 'message': '用戶已停用'})
        return jsonify({'success': False, 'message': failure_reason}), 400
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
    finally:
        db.close()
@user_bp.route('/api/users/<int:user_id>/reset_password', methods=['POST'])
@admin_required
def reset_user_password(user_id):
    """Reset a user's password on behalf of an administrator.

    Expects a JSON object with a non-empty string ``new_password``. The
    acting administrator's id (when available) is forwarded to the service
    as ``admin_id``.

    Returns:
        A JSON response with ``success`` and ``message``. Status 400 is used
        for a missing password or a failure reported by the service, 500 for
        unexpected exceptions.
    """
    # silent=True turns malformed / non-JSON bodies into None; non-object
    # bodies (arrays, scalars) have no .get(), so they are treated as missing.
    data = request.get_json(silent=True)
    new_password = data.get('new_password') if isinstance(data, dict) else None
    if not new_password or not isinstance(new_password, str):
        return jsonify({'success': False, 'message': '請提供新密碼'}), 400

    db_session = get_session()
    try:
        service = UserService(db_session)
        current_user = get_current_user()
        admin_id = current_user.get('user_id') if current_user else None

        success, error = service.reset_password(user_id, new_password, admin_id=admin_id)
        if not success:
            return jsonify({'success': False, 'message': error}), 400

        return jsonify({
            'success': True,
            'message': '密碼重設成功',
        })
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
    finally:
        db_session.close()
# ==========================================
# 修改密碼 API
# ==========================================
@user_bp.route('/api/change_password', methods=['POST'])
@login_required
def api_change_password():
    """Change the password of the currently logged-in user.

    Expects a JSON object with non-empty string ``old_password`` and
    ``new_password`` values.

    Returns:
        A JSON response with ``success`` and ``message``. Status 400 is used
        for invalid input or a failure reported by the service, 401 when the
        current user cannot be determined, 500 for unexpected exceptions.
    """
    # silent=True turns malformed / non-JSON bodies into None so they reach
    # the JSON 400 below; the isinstance check rejects arrays and scalars.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not data:
        return jsonify({'success': False, 'message': '無效的請求資料'}), 400

    old_password = data.get('old_password', '')
    new_password = data.get('new_password', '')

    # Non-string values (numbers, lists, ...) are treated the same as missing.
    if not old_password or not isinstance(old_password, str):
        return jsonify({'success': False, 'message': '請輸入舊密碼'}), 400
    if not new_password or not isinstance(new_password, str):
        return jsonify({'success': False, 'message': '請輸入新密碼'}), 400

    current_user = get_current_user()
    if not current_user or not current_user.get('user_id'):
        return jsonify({'success': False, 'message': '無法取得用戶資訊'}), 401
    user_id = current_user['user_id']

    db_session = get_session()
    try:
        service = UserService(db_session)
        success, error = service.change_password(user_id, old_password, new_password)
        if not success:
            return jsonify({'success': False, 'message': error}), 400

        return jsonify({
            'success': True,
            'message': '密碼變更成功',
        })
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
    finally:
        db_session.close()
# ==========================================
# 登入歷史 API
# ==========================================
@user_bp.route('/api/login_history', methods=['GET'])
@admin_required
def get_login_history():
    """Return login history records as JSON (administrators only).

    Query parameters:
        user_id: optional integer forwarded to the service as a filter.
        limit: maximum number of records; defaults to 100. Non-positive
            values fall back to the default and values above 1000 are
            capped, so a request cannot ask for an unbounded result set.
    """
    default_limit = 100
    max_limit = 1000

    user_id = request.args.get('user_id', type=int)
    limit = request.args.get('limit', default_limit, type=int)
    # Unparsable values already fall back to the default; treat zero and
    # negative values the same way, then apply the upper bound.
    if limit < 1:
        limit = default_limit
    limit = min(limit, max_limit)

    db_session = get_session()
    try:
        service = UserService(db_session)
        history = service.get_login_history(user_id=user_id, limit=limit)
        return jsonify({
            'success': True,
            'data': [h.to_dict() for h in history],
        })
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
    finally:
        db_session.close()
@user_bp.route('/api/my_login_history', methods=['GET'])
@login_required
def get_my_login_history():
    """Return the login history of the currently logged-in user as JSON.

    Query parameters:
        limit: maximum number of records; defaults to 50. Non-positive
            values fall back to the default and values above 1000 are
            capped, so a request cannot ask for an unbounded result set.

    Responds with 401 when the current user cannot be determined.
    """
    default_limit = 50
    max_limit = 1000

    current_user = get_current_user()
    if not current_user or not current_user.get('user_id'):
        return jsonify({'success': False, 'message': '無法取得用戶資訊'}), 401
    user_id = current_user['user_id']

    limit = request.args.get('limit', default_limit, type=int)
    # Unparsable values already fall back to the default; treat zero and
    # negative values the same way, then apply the upper bound.
    if limit < 1:
        limit = default_limit
    limit = min(limit, max_limit)

    db_session = get_session()
    try:
        service = UserService(db_session)
        history = service.get_login_history(user_id=user_id, limit=limit)
        return jsonify({
            'success': True,
            'data': [h.to_dict() for h in history],
        })
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
    finally:
        db_session.close()
# ==========================================
# 角色定義 API
# ==========================================
@user_bp.route('/api/roles', methods=['GET'])
@login_required
def get_roles():
    """Return the role definitions and role labels declared on ``User``."""
    role_info = {
        'roles': User.ROLES,
        'labels': User.ROLE_LABELS,
    }
    return jsonify({'success': True, 'data': role_info})
# ==========================================
# 密碼要求 API
# ==========================================
@user_bp.route('/api/password_requirements', methods=['GET'])
def get_password_reqs():
    """Return the password complexity requirements as JSON.

    This route carries no login decorator.
    """
    return jsonify({'success': True, 'data': get_password_requirements()})
# ==========================================
# 權限管理 API
# ==========================================
@user_bp.route('/api/permissions', methods=['GET'])
@admin_required
def get_all_permissions():
    """Return all permission definitions (the grouped-by-category form)."""
    from services.permission_service import PermissionService

    try:
        grouped = PermissionService.get_all_permissions()
        return jsonify({'success': True, 'data': grouped})
    except Exception as exc:
        failure = {'success': False, 'message': str(exc)}
        return jsonify(failure), 500
@user_bp.route('/api/permissions/flat', methods=['GET'])
@admin_required
def get_all_permissions_flat():
    """Return all permission definitions (the flat-list form)."""
    from services.permission_service import PermissionService

    try:
        flat_list = PermissionService.get_all_permissions_flat()
        return jsonify({'success': True, 'data': flat_list})
    except Exception as exc:
        failure = {'success': False, 'message': str(exc)}
        return jsonify(failure), 500
@user_bp.route('/api/users/<int:user_id>/permissions', methods=['GET'])
@admin_required
def get_user_permissions(user_id):
    """Return a user's data together with their detailed permissions.

    Responds with 404 when the user cannot be found and with 500 (carrying
    the exception text) when anything raises.
    """
    from services.permission_service import PermissionService

    db = get_session()
    try:
        # Make sure the target user exists before looking up permissions.
        target = UserService(db).get_user_by_id(user_id)
        if not target:
            return jsonify({'success': False, 'message': '用戶不存在'}), 404

        detail = PermissionService.get_user_permissions_detail(user_id)
        body = {
            'success': True,
            'data': {
                'user': target.to_dict(),
                'permissions': detail,
            },
        }
        return jsonify(body)
    except Exception as exc:
        failure = {'success': False, 'message': str(exc)}
        return jsonify(failure), 500
    finally:
        db.close()
@user_bp.route('/api/users/<int:user_id>/permissions', methods=['PUT'])
@admin_required
def update_user_permissions(user_id):
    """Replace a user's permissions with the list given in the request body.

    Expects a JSON object with a ``permissions`` list. The request is
    rejected when the target is the acting user or has the ``admin`` role.
    On success the user's permission cache is cleared.

    Returns:
        A JSON response with ``success`` and ``message``. Status 400 is used
        for invalid input, a disallowed target or a failure reported by the
        service, 404 when the user does not exist, 500 for unexpected
        exceptions.
    """
    from services.permission_service import PermissionService

    # silent=True turns malformed / non-JSON bodies into None; requiring a
    # dict keeps arrays and scalars from reaching the key lookup below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'permissions' not in data:
        return jsonify({'success': False, 'message': '請提供權限列表'}), 400

    permission_codes = data['permissions']
    if not isinstance(permission_codes, list):
        return jsonify({'success': False, 'message': '權限列表格式錯誤'}), 400

    # Users may not modify their own permissions.
    current_user = get_current_user()
    if current_user and current_user.get('user_id') == user_id:
        return jsonify({'success': False, 'message': '無法修改自己的權限'}), 400

    db_session = get_session()
    try:
        # Make sure the target user exists.
        service = UserService(db_session)
        user = service.get_user_by_id(user_id)
        if not user:
            return jsonify({'success': False, 'message': '用戶不存在'}), 404

        # Permissions of other administrators may not be modified.
        if user.role == 'admin':
            return jsonify({'success': False, 'message': '無法修改管理員的權限'}), 400

        granted_by = current_user.get('user_id') if current_user else None
        success, message = PermissionService.set_user_permissions(
            user_id=user_id,
            permission_codes=permission_codes,
            granted_by=granted_by,
        )
        if not success:
            return jsonify({'success': False, 'message': message}), 400

        # Drop the cached permissions now that they have been replaced.
        PermissionService.clear_user_permission_cache(user_id)
        return jsonify({'success': True, 'message': message})
    except Exception as e:
        return jsonify({
            'success': False,
            'message': str(e),
        }), 500
    finally:
        db_session.close()
@user_bp.route('/api/users/<int:user_id>/apply_role_template', methods=['POST'])
@admin_required
def apply_role_template(user_id):
    """Apply a role's default permission template to a user.

    Expects a JSON object with a ``role`` key. The request is rejected when
    the target is the acting user or has the ``admin`` role. On success the
    user's permission cache is cleared.

    Returns:
        A JSON response with ``success`` and ``message``. Status 400 is used
        for invalid input, a disallowed target or a failure reported by the
        service, 404 when the user does not exist, 500 for unexpected
        exceptions.
    """
    from services.permission_service import PermissionService

    # silent=True turns malformed / non-JSON bodies into None; requiring a
    # dict keeps arrays and scalars from reaching the key lookup below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'role' not in data:
        return jsonify({'success': False, 'message': '請提供角色名稱'}), 400

    role = data['role']

    # Users may not modify their own permissions.
    current_user = get_current_user()
    if current_user and current_user.get('user_id') == user_id:
        return jsonify({'success': False, 'message': '無法修改自己的權限'}), 400

    db_session = get_session()
    try:
        # Make sure the target user exists.
        service = UserService(db_session)
        user = service.get_user_by_id(user_id)
        if not user:
            return jsonify({'success': False, 'message': '用戶不存在'}), 404

        # Permissions of other administrators may not be modified.
        if user.role == 'admin':
            return jsonify({'success': False, 'message': '無法修改管理員的權限'}), 400

        granted_by = current_user.get('user_id') if current_user else None
        success, message = PermissionService.apply_role_template(
            user_id=user_id,
            role=role,
            granted_by=granted_by,
        )
        if not success:
            return jsonify({'success': False, 'message': message}), 400

        # Drop the cached permissions now that the template has been applied.
        PermissionService.clear_user_permission_cache(user_id)
        return jsonify({
            'success': True,
            'message': f"已套用 {role} 角色模板",
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'message': str(e),
        }), 500
    finally:
        db_session.close()
@user_bp.route('/api/role_templates', methods=['GET'])
@admin_required
def get_role_templates():
    """Return the default permission templates of all roles as JSON."""
    from services.permission_service import PermissionService

    try:
        role_templates = PermissionService.get_all_role_templates()
        return jsonify({'success': True, 'data': role_templates})
    except Exception as exc:
        failure = {'success': False, 'message': str(exc)}
        return jsonify(failure), 500
# Import-time side effect: prints a confirmation once this module has been executed.
print("✅ User routes 已載入")