603 lines
17 KiB
Python
603 lines
17 KiB
Python
"""
|
|
用戶管理路由模組
|
|
|
|
提供:
|
|
- 用戶管理頁面 (Admin only)
|
|
- 用戶 CRUD API
|
|
- 修改密碼頁面
|
|
- 登入歷史查詢
|
|
"""
|
|
|
|
from flask import Blueprint, render_template, request, jsonify, session, redirect, url_for, flash
|
|
from auth import login_required, role_required, admin_required, get_current_user
|
|
from database.manager import get_session
|
|
from services.user_service import UserService
|
|
from services.password_service import get_password_requirements
|
|
from database.user_models import User
|
|
|
|
# Blueprint on which every route in this module is registered.
user_bp = Blueprint('user_bp', __name__)
|
|
|
|
|
|
# ==========================================
# Page routes
# ==========================================
|
|
|
|
@user_bp.route('/user_management')
@admin_required
def user_management():
    """Render the user-management page (administrators only)."""
    template_name = 'user_management.html'
    return render_template(template_name, active_page='user_management')
|
|
|
|
|
|
@user_bp.route('/change_password')
@login_required
def change_password():
    """Render the change-password page for a logged-in user.

    The password requirements returned by ``get_password_requirements()``
    are handed to the template as ``password_requirements``.
    """
    context = {
        'password_requirements': get_password_requirements(),
        'active_page': 'change_password',
    }
    return render_template('change_password.html', **context)
|
|
|
|
|
|
@user_bp.route('/login_history')
@admin_required
def login_history_page():
    """Render the login-history page (administrators only)."""
    template_name = 'login_history.html'
    return render_template(template_name, active_page='login_history')
|
|
|
|
|
|
# ==========================================
# User CRUD API
# ==========================================
|
|
|
|
@user_bp.route('/api/users', methods=['GET'])
@admin_required
def get_users():
    """Return the list of users as JSON (administrators only).

    Query parameters:
        include_inactive: the string ``true`` (case-insensitive) asks the
            service to include inactive users; any other value does not.

    Returns:
        ``{'success': True, 'data': [...]}`` built from each user's
        ``to_dict()``, or a ``success: False`` payload with HTTP 500 when
        an exception is raised.
    """
    raw_flag = request.args.get('include_inactive', 'false')
    include_inactive = raw_flag.lower() == 'true'

    db_session = get_session()
    try:
        user_service = UserService(db_session)
        rows = user_service.get_all_users(include_inactive=include_inactive)
        serialized = [row.to_dict() for row in rows]
        return jsonify({'success': True, 'data': serialized})
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
    finally:
        # Always release the session, whichever branch returned.
        db_session.close()
|
|
|
|
|
|
@user_bp.route('/api/users', methods=['POST'])
@admin_required
def create_user():
    """Create a new user from a JSON request body (administrators only).

    Expected JSON keys: ``username`` and ``password`` (required), plus the
    optional ``role`` (defaults to ``'user'``), ``email`` and
    ``display_name``. Text fields are stripped; empty ``email`` /
    ``display_name`` values are passed to the service as ``None``.

    Returns:
        A JSON payload with ``success`` and ``message`` and, on success,
        the new user's ``to_dict()`` under ``data``. Validation failures
        and errors reported by the service yield HTTP 400; an exception
        raised while creating the user yields HTTP 500.
    """
    data = request.get_json()

    # A JSON array or scalar has no ``.get``; reject it instead of crashing.
    if not data or not isinstance(data, dict):
        return jsonify({'success': False, 'message': '無效的請求資料'}), 400

    # ``dict.get`` only uses its default when the key is absent, so an
    # explicit JSON ``null`` arrives as ``None``. Treat that as empty text
    # and reject any other non-string value rather than calling ``.strip()``
    # on it (which would raise outside the try block below).
    cleaned = {}
    for field in ('username', 'email', 'display_name'):
        raw = data.get(field)
        if raw is None:
            raw = ''
        elif not isinstance(raw, str):
            return jsonify({'success': False, 'message': '無效的請求資料'}), 400
        cleaned[field] = raw.strip()

    username = cleaned['username']
    password = data.get('password', '')
    role = data.get('role', 'user')
    email = cleaned['email'] or None
    display_name = cleaned['display_name'] or None

    if not username:
        return jsonify({'success': False, 'message': '帳號為必填'}), 400

    if not password:
        return jsonify({'success': False, 'message': '密碼為必填'}), 400

    db_session = get_session()
    try:
        service = UserService(db_session)
        current_user = get_current_user()
        # Record which administrator created the account, when known.
        created_by = current_user.get('user_id') if current_user else None

        user, error = service.create_user(
            username=username,
            password=password,
            role=role,
            email=email,
            display_name=display_name,
            created_by=created_by,
        )

        if error:
            return jsonify({'success': False, 'message': error}), 400

        return jsonify({
            'success': True,
            'message': f"用戶 '{username}' 建立成功",
            'data': user.to_dict(),
        })

    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
    finally:
        db_session.close()
|
|
|
|
|
|
@user_bp.route('/api/users/<int:user_id>', methods=['GET'])
@admin_required
def get_user(user_id):
    """Return a single user's data as JSON (administrators only).

    Args:
        user_id: integer id taken from the URL.

    Returns:
        ``{'success': True, 'data': user.to_dict()}`` when the user is
        found, HTTP 404 when the lookup returns nothing, or HTTP 500 when
        an exception is raised.
    """
    db_session = get_session()
    try:
        found = UserService(db_session).get_user_by_id(user_id)
        if found:
            return jsonify({'success': True, 'data': found.to_dict()})
        return jsonify({'success': False, 'message': '用戶不存在'}), 404
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
    finally:
        db_session.close()
|
|
|
|
|
|
@user_bp.route('/api/users/<int:user_id>', methods=['PUT'])
@admin_required
def update_user(user_id):
    """Update selected fields of a user (administrators only).

    Only ``email``, ``display_name``, ``role`` and ``is_active`` are taken
    from the JSON body; any other key is ignored. ``email`` and
    ``display_name`` are stripped, and an empty or ``null`` value is
    forwarded to the service as ``None``.

    Args:
        user_id: integer id taken from the URL.

    Returns:
        A JSON payload with ``success`` and ``message``. Invalid request
        data or a failure reported by the service yields HTTP 400; an
        exception raised while updating yields HTTP 500.
    """
    data = request.get_json()

    # A JSON array or scalar cannot be indexed by field name; reject it.
    if not data or not isinstance(data, dict):
        return jsonify({'success': False, 'message': '無效的請求資料'}), 400

    # Keep only the fields that may be updated.
    update_data = {}
    for field in ('email', 'display_name'):
        if field not in data:
            continue
        raw = data[field]
        # JSON ``null`` is a natural way to clear a field; treat it like an
        # empty string. Any other non-string would make ``.strip()`` raise
        # outside the try block below, so reject it up front.
        if raw is None:
            raw = ''
        elif not isinstance(raw, str):
            return jsonify({'success': False, 'message': '無效的請求資料'}), 400
        update_data[field] = raw.strip() or None

    if 'role' in data:
        update_data['role'] = data['role']
    if 'is_active' in data:
        update_data['is_active'] = data['is_active']

    db_session = get_session()
    try:
        service = UserService(db_session)
        success, error = service.update_user(user_id, **update_data)

        if not success:
            return jsonify({'success': False, 'message': error}), 400

        return jsonify({
            'success': True,
            'message': '用戶資料更新成功',
        })

    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
    finally:
        db_session.close()
|
|
|
|
|
|
@user_bp.route('/api/users/<int:user_id>', methods=['DELETE'])
@admin_required
def delete_user(user_id):
    """Delete (deactivate) a user (administrators only).

    Args:
        user_id: integer id taken from the URL.

    Returns:
        A JSON payload with ``success`` and ``message``. Targeting one's
        own account, or a failure reported by the service, yields HTTP
        400; an exception raised by the service yields HTTP 500.
    """
    # An administrator may not delete their own account.
    actor = get_current_user()
    if actor and actor.get('user_id') == user_id:
        return jsonify({'success': False, 'message': '無法刪除自己的帳號'}), 400

    db_session = get_session()
    try:
        ok, failure = UserService(db_session).delete_user(user_id)
        if ok:
            return jsonify({'success': True, 'message': '用戶已停用'})
        return jsonify({'success': False, 'message': failure}), 400
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
    finally:
        db_session.close()
|
|
|
|
|
|
@user_bp.route('/api/users/<int:user_id>/reset_password', methods=['POST'])
@admin_required
def reset_user_password(user_id):
    """Reset another user's password (administrators only).

    The JSON body must carry a non-empty ``new_password``.

    Args:
        user_id: integer id taken from the URL.

    Returns:
        A JSON payload with ``success`` and ``message``. A missing
        password or a failure reported by the service yields HTTP 400; an
        exception raised by the service yields HTTP 500.
    """
    payload = request.get_json()
    new_password = payload.get('new_password') if payload else None
    if not new_password:
        return jsonify({'success': False, 'message': '請提供新密碼'}), 400

    db_session = get_session()
    try:
        user_service = UserService(db_session)
        actor = get_current_user()
        # Pass along which administrator performed the reset, when known.
        admin_id = actor.get('user_id') if actor else None

        ok, failure = user_service.reset_password(
            user_id, new_password, admin_id=admin_id
        )
        if ok:
            return jsonify({'success': True, 'message': '密碼重設成功'})
        return jsonify({'success': False, 'message': failure}), 400
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
    finally:
        db_session.close()
|
|
|
|
|
|
# ==========================================
# Change-password API
# ==========================================
|
|
|
|
@user_bp.route('/api/change_password', methods=['POST'])
@login_required
def api_change_password():
    """Change the logged-in user's own password.

    The JSON body must carry non-empty ``old_password`` and
    ``new_password`` values.

    Returns:
        A JSON payload with ``success`` and ``message``. Missing input or
        a failure reported by the service yields HTTP 400, an unresolved
        current user yields HTTP 401, and an exception raised by the
        service yields HTTP 500.
    """
    body = request.get_json()
    if not body:
        return jsonify({'success': False, 'message': '無效的請求資料'}), 400

    old_password = body.get('old_password', '')
    new_password = body.get('new_password', '')

    # Check the required fields in order; the first empty one wins.
    required = (
        (old_password, '請輸入舊密碼'),
        (new_password, '請輸入新密碼'),
    )
    for value, prompt in required:
        if not value:
            return jsonify({'success': False, 'message': prompt}), 400

    actor = get_current_user()
    user_id = actor.get('user_id') if actor else None
    if not user_id:
        return jsonify({'success': False, 'message': '無法取得用戶資訊'}), 401

    db_session = get_session()
    try:
        user_service = UserService(db_session)
        ok, failure = user_service.change_password(
            user_id, old_password, new_password
        )
        if ok:
            return jsonify({'success': True, 'message': '密碼變更成功'})
        return jsonify({'success': False, 'message': failure}), 400
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
    finally:
        db_session.close()
|
|
|
|
|
|
# ==========================================
# Login history API
# ==========================================
|
|
|
|
@user_bp.route('/api/login_history', methods=['GET'])
@admin_required
def get_login_history():
    """Return login-history records as JSON (administrators only).

    Query parameters:
        user_id: optional integer forwarded to the service as ``user_id``.
        limit: integer forwarded to the service as ``limit`` (default 100).

    Returns:
        ``{'success': True, 'data': [...]}`` built from each record's
        ``to_dict()``, or a ``success: False`` payload with HTTP 500 when
        an exception is raised.
    """
    query = request.args
    user_id = query.get('user_id', type=int)
    limit = query.get('limit', 100, type=int)

    db_session = get_session()
    try:
        user_service = UserService(db_session)
        records = user_service.get_login_history(user_id=user_id, limit=limit)
        serialized = [record.to_dict() for record in records]
        return jsonify({'success': True, 'data': serialized})
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
    finally:
        db_session.close()
|
|
|
|
|
|
@user_bp.route('/api/my_login_history', methods=['GET'])
@login_required
def get_my_login_history():
    """Return the logged-in user's own login-history records as JSON.

    Query parameters:
        limit: integer forwarded to the service as ``limit`` (default 50).

    Returns:
        ``{'success': True, 'data': [...]}`` built from each record's
        ``to_dict()``; HTTP 401 when the current user cannot be resolved;
        HTTP 500 when an exception is raised.
    """
    actor = get_current_user()
    user_id = actor.get('user_id') if actor else None
    if not user_id:
        return jsonify({'success': False, 'message': '無法取得用戶資訊'}), 401

    limit = request.args.get('limit', 50, type=int)

    db_session = get_session()
    try:
        user_service = UserService(db_session)
        records = user_service.get_login_history(user_id=user_id, limit=limit)
        serialized = [record.to_dict() for record in records]
        return jsonify({'success': True, 'data': serialized})
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
    finally:
        db_session.close()
|
|
|
|
|
|
# ==========================================
# Role definition API
# ==========================================
|
|
|
|
@user_bp.route('/api/roles', methods=['GET'])
@login_required
def get_roles():
    """Return the role definitions as JSON.

    The payload exposes ``User.ROLES`` under ``roles`` and
    ``User.ROLE_LABELS`` under ``labels``.
    """
    role_info = {
        'roles': User.ROLES,
        'labels': User.ROLE_LABELS,
    }
    return jsonify({'success': True, 'data': role_info})
|
|
|
|
|
|
# ==========================================
# Password requirements API
# ==========================================
|
|
|
|
@user_bp.route('/api/password_requirements', methods=['GET'])
def get_password_reqs():
    """Return the password-complexity requirements as JSON.

    No authentication decorator is applied to this route.
    """
    return jsonify({'success': True, 'data': get_password_requirements()})
|
|
|
|
|
|
# ==========================================
# Permission management API
# ==========================================
|
|
|
|
@user_bp.route('/api/permissions', methods=['GET'])
@admin_required
def get_all_permissions():
    """Return all permission definitions, grouped by category (administrators only).

    Returns:
        ``{'success': True, 'data': ...}`` with the value returned by
        ``PermissionService.get_all_permissions()``, or a
        ``success: False`` payload with HTTP 500 when that call raises.
    """
    # Imported at call time, as in the rest of this module's permission routes.
    from services.permission_service import PermissionService

    try:
        grouped = PermissionService.get_all_permissions()
        return jsonify({'success': True, 'data': grouped})
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
|
|
|
|
|
|
@user_bp.route('/api/permissions/flat', methods=['GET'])
@admin_required
def get_all_permissions_flat():
    """Return all permission definitions as a flat list (administrators only).

    Returns:
        ``{'success': True, 'data': ...}`` with the value returned by
        ``PermissionService.get_all_permissions_flat()``, or a
        ``success: False`` payload with HTTP 500 when that call raises.
    """
    # Imported at call time, as in the rest of this module's permission routes.
    from services.permission_service import PermissionService

    try:
        flat = PermissionService.get_all_permissions_flat()
        return jsonify({'success': True, 'data': flat})
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
|
|
|
|
|
|
@user_bp.route('/api/users/<int:user_id>/permissions', methods=['GET'])
@admin_required
def get_user_permissions(user_id):
    """Return a user's permission details as JSON (administrators only).

    Args:
        user_id: integer id taken from the URL.

    Returns:
        ``{'success': True, 'data': {'user': ..., 'permissions': ...}}``
        when the user exists, HTTP 404 when the lookup returns nothing, or
        HTTP 500 when an exception is raised.
    """
    from services.permission_service import PermissionService

    db_session = get_session()
    try:
        # Make sure the user exists before fetching permission details.
        target = UserService(db_session).get_user_by_id(user_id)
        if not target:
            return jsonify({'success': False, 'message': '用戶不存在'}), 404

        detail = PermissionService.get_user_permissions_detail(user_id)
        body = {
            'user': target.to_dict(),
            'permissions': detail,
        }
        return jsonify({'success': True, 'data': body})
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
    finally:
        db_session.close()
|
|
|
|
|
|
@user_bp.route('/api/users/<int:user_id>/permissions', methods=['PUT'])
@admin_required
def update_user_permissions(user_id):
    """Replace a user's permissions (administrators only).

    The JSON body must contain a ``permissions`` list of permission codes.
    Requests that target the caller's own account, or a user whose role is
    ``'admin'``, are refused.

    Args:
        user_id: integer id taken from the URL.

    Returns:
        A JSON payload with ``success`` and the service's ``message``.
        Invalid input, a refused target, or a service-reported failure
        yields HTTP 400; an unknown user yields HTTP 404; an exception
        yields HTTP 500.
    """
    from services.permission_service import PermissionService

    body = request.get_json()
    if not body or 'permissions' not in body:
        return jsonify({'success': False, 'message': '請提供權限列表'}), 400

    permission_codes = body.get('permissions', [])
    if not isinstance(permission_codes, list):
        return jsonify({'success': False, 'message': '權限列表格式錯誤'}), 400

    # Callers may not modify their own permissions.
    actor = get_current_user()
    actor_id = actor.get('user_id') if actor else None
    if actor_id == user_id:
        return jsonify({'success': False, 'message': '無法修改自己的權限'}), 400

    db_session = get_session()
    try:
        # Make sure the target user exists.
        target = UserService(db_session).get_user_by_id(user_id)
        if not target:
            return jsonify({'success': False, 'message': '用戶不存在'}), 404

        # Permissions of other administrators may not be modified.
        if target.role == 'admin':
            return jsonify({'success': False, 'message': '無法修改管理員的權限'}), 400

        ok, message = PermissionService.set_user_permissions(
            user_id=user_id,
            permission_codes=permission_codes,
            granted_by=actor_id,
        )
        if not ok:
            return jsonify({'success': False, 'message': message}), 400

        # Clear the permission cache for this user after a successful update.
        PermissionService.clear_user_permission_cache(user_id)
        return jsonify({'success': True, 'message': message})
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
    finally:
        db_session.close()
|
|
|
|
|
|
@user_bp.route('/api/users/<int:user_id>/apply_role_template', methods=['POST'])
@admin_required
def apply_role_template(user_id):
    """Apply a role's default permission template to a user (administrators only).

    The JSON body must contain a ``role`` key. Requests that target the
    caller's own account, or a user whose role is ``'admin'``, are refused.

    Args:
        user_id: integer id taken from the URL.

    Returns:
        A JSON payload with ``success`` and ``message``. Invalid input, a
        refused target, or a service-reported failure yields HTTP 400; an
        unknown user yields HTTP 404; an exception yields HTTP 500.
    """
    from services.permission_service import PermissionService

    body = request.get_json()
    if not body or 'role' not in body:
        return jsonify({'success': False, 'message': '請提供角色名稱'}), 400

    role = body.get('role')

    # Callers may not modify their own permissions.
    actor = get_current_user()
    actor_id = actor.get('user_id') if actor else None
    if actor_id == user_id:
        return jsonify({'success': False, 'message': '無法修改自己的權限'}), 400

    db_session = get_session()
    try:
        # Make sure the target user exists.
        target = UserService(db_session).get_user_by_id(user_id)
        if not target:
            return jsonify({'success': False, 'message': '用戶不存在'}), 404

        # Permissions of other administrators may not be modified.
        if target.role == 'admin':
            return jsonify({'success': False, 'message': '無法修改管理員的權限'}), 400

        ok, message = PermissionService.apply_role_template(
            user_id=user_id,
            role=role,
            granted_by=actor_id,
        )
        if not ok:
            return jsonify({'success': False, 'message': message}), 400

        # Clear the permission cache for this user after a successful update.
        PermissionService.clear_user_permission_cache(user_id)
        # The success message is built here; the service's message is only
        # returned on failure.
        return jsonify({'success': True, 'message': f"已套用 {role} 角色模板"})
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
    finally:
        db_session.close()
|
|
|
|
|
|
@user_bp.route('/api/role_templates', methods=['GET'])
@admin_required
def get_role_templates():
    """Return every role's default permission template (administrators only).

    Returns:
        ``{'success': True, 'data': ...}`` with the value returned by
        ``PermissionService.get_all_role_templates()``, or a
        ``success: False`` payload with HTTP 500 when that call raises.
    """
    from services.permission_service import PermissionService

    try:
        role_templates = PermissionService.get_all_role_templates()
        return jsonify({'success': True, 'data': role_templates})
    except Exception as exc:
        return jsonify({'success': False, 'message': str(exc)}), 500
|
|
|
|
|
|
print("✅ User routes 已載入")
|