""" 權限服務層 ========== 提供權限 CRUD 操作、角色模板應用、權限檢查等功能 """ from datetime import datetime from collections import defaultdict from database.manager import get_session from database.permission_models import ( Permission, UserPermission, PERMISSIONS, ROLE_DEFAULT_PERMISSIONS, ALL_PERMISSION_CODES ) class PermissionService: """權限服務類""" # ======================================================================== # 權限定義查詢 # ======================================================================== @staticmethod def get_all_permissions(): """取得所有權限定義 Returns: list: 按分類分組的權限列表 [ { 'category': '首頁/看板', 'permissions': [ {'code': 'dashboard.view', 'name': '查看首頁看板', ...}, ... ] }, ... ] """ session = get_session() try: permissions = session.query(Permission).order_by(Permission.sort_order).all() # 按分類分組 grouped = defaultdict(list) for perm in permissions: grouped[perm.category].append(perm.to_dict()) # 轉換為列表格式,保持分類順序 category_order = ['首頁/看板', '報表', '活動看板', '廠商缺貨', '匯入', '系統', '其他'] result = [] for category in category_order: if category in grouped: result.append({ 'category': category, 'permissions': grouped[category] }) # 添加未在預定義順序中的分類 for category in grouped: if category not in category_order: result.append({ 'category': category, 'permissions': grouped[category] }) return result finally: session.close() @staticmethod def get_all_permissions_flat(): """取得所有權限定義(扁平列表) Returns: list: 權限字典列表 """ session = get_session() try: permissions = session.query(Permission).order_by(Permission.sort_order).all() return [perm.to_dict() for perm in permissions] finally: session.close() @staticmethod def get_permission_by_code(code): """根據代碼取得權限定義 Args: code: 權限代碼 Returns: Permission or None """ session = get_session() try: return session.query(Permission).filter_by(code=code).first() finally: session.close() # ======================================================================== # 用戶權限管理 # ======================================================================== @staticmethod def get_user_permissions(user_id): """取得用戶的所有權限代碼 Args: user_id: 用戶 ID Returns: set: 權限代碼集合 """ session = get_session() try: user_perms = session.query(UserPermission).filter_by(user_id=user_id).all() return {up.permission_code for up in user_perms} finally: session.close() @staticmethod def get_user_permissions_detail(user_id): """取得用戶的權限詳細資訊 Args: user_id: 用戶 ID Returns: list: 權限詳細資訊列表 """ user_perm_codes = PermissionService.get_user_permissions(user_id) all_permissions = PermissionService.get_all_permissions() # 標記用戶已有的權限 for category_data in all_permissions: for perm in category_data['permissions']: perm['granted'] = perm['code'] in user_perm_codes return all_permissions @staticmethod def set_user_permissions(user_id, permission_codes, granted_by=None): """設定用戶的權限(完全覆蓋) Args: user_id: 用戶 ID permission_codes: 權限代碼列表 granted_by: 授權者 ID Returns: tuple: (success, message) """ session = get_session() try: # 驗證權限代碼是否有效 valid_codes = set(ALL_PERMISSION_CODES) invalid_codes = set(permission_codes) - valid_codes if invalid_codes: return False, f"無效的權限代碼: {', '.join(invalid_codes)}" # 刪除舊權限 session.query(UserPermission).filter_by(user_id=user_id).delete() # 新增新權限 now = datetime.utcnow() for code in permission_codes: user_perm = UserPermission( user_id=user_id, permission_code=code, granted_by=granted_by, granted_at=now ) session.add(user_perm) session.commit() return True, f"已設定 {len(permission_codes)} 項權限" except Exception as e: session.rollback() return False, f"設定權限失敗: {str(e)}" finally: session.close() @staticmethod def add_user_permission(user_id, permission_code, granted_by=None): """新增單一用戶權限 Args: user_id: 用戶 ID permission_code: 權限代碼 granted_by: 授權者 ID Returns: tuple: (success, message) """ session = get_session() try: # 驗證權限代碼 if permission_code not in ALL_PERMISSION_CODES: return False, f"無效的權限代碼: {permission_code}" # 檢查是否已存在 existing = session.query(UserPermission).filter_by( user_id=user_id, permission_code=permission_code ).first() if existing: return True, "權限已存在" user_perm = UserPermission( user_id=user_id, permission_code=permission_code, granted_by=granted_by, granted_at=datetime.utcnow() ) session.add(user_perm) session.commit() return True, "權限已新增" except Exception as e: session.rollback() return False, f"新增權限失敗: {str(e)}" finally: session.close() @staticmethod def remove_user_permission(user_id, permission_code): """移除單一用戶權限 Args: user_id: 用戶 ID permission_code: 權限代碼 Returns: tuple: (success, message) """ session = get_session() try: deleted = session.query(UserPermission).filter_by( user_id=user_id, permission_code=permission_code ).delete() session.commit() if deleted: return True, "權限已移除" else: return True, "權限不存在" except Exception as e: session.rollback() return False, f"移除權限失敗: {str(e)}" finally: session.close() # ======================================================================== # 角色模板 # ======================================================================== @staticmethod def apply_role_template(user_id, role, granted_by=None): """套用角色預設權限模板 Args: user_id: 用戶 ID role: 角色名稱 ('admin', 'manager', 'user') granted_by: 授權者 ID Returns: tuple: (success, message) """ if role not in ROLE_DEFAULT_PERMISSIONS: return False, f"無效的角色: {role}" permission_codes = ROLE_DEFAULT_PERMISSIONS[role] return PermissionService.set_user_permissions(user_id, permission_codes, granted_by) @staticmethod def get_role_template(role): """取得角色的預設權限列表 Args: role: 角色名稱 Returns: list: 權限代碼列表,如果角色無效則返回空列表 """ return ROLE_DEFAULT_PERMISSIONS.get(role, []) @staticmethod def get_all_role_templates(): """取得所有角色模板 Returns: dict: {role: [permission_codes]} """ return ROLE_DEFAULT_PERMISSIONS.copy() # ======================================================================== # 權限檢查 # ======================================================================== @staticmethod def has_permission(user_id, permission_code): """檢查用戶是否擁有特定權限 Args: user_id: 用戶 ID permission_code: 權限代碼 Returns: bool: 是否擁有權限 """ session = get_session() try: exists = session.query(UserPermission).filter_by( user_id=user_id, permission_code=permission_code ).first() return exists is not None finally: session.close() @staticmethod def has_any_permission(user_id, *permission_codes): """檢查用戶是否擁有任一指定權限 Args: user_id: 用戶 ID *permission_codes: 權限代碼列表 Returns: bool: 是否擁有任一權限 """ if not permission_codes: return False session = get_session() try: exists = session.query(UserPermission).filter( UserPermission.user_id == user_id, UserPermission.permission_code.in_(permission_codes) ).first() return exists is not None finally: session.close() @staticmethod def has_all_permissions(user_id, *permission_codes): """檢查用戶是否擁有所有指定權限 Args: user_id: 用戶 ID *permission_codes: 權限代碼列表 Returns: bool: 是否擁有所有權限 """ if not permission_codes: return True user_perms = PermissionService.get_user_permissions(user_id) return all(code in user_perms for code in permission_codes) # ======================================================================== # 用戶創建時的權限初始化 # ======================================================================== @staticmethod def init_user_permissions_by_role(user_id, role, granted_by=None): """根據角色初始化用戶權限(用於新建用戶) Args: user_id: 用戶 ID role: 角色名稱 granted_by: 授權者 ID Returns: tuple: (success, message) """ return PermissionService.apply_role_template(user_id, role, granted_by) # ======================================================================== # 快取相關(可選優化) # ======================================================================== _permission_cache = {} @classmethod def get_user_permissions_cached(cls, user_id, use_cache=True): """取得用戶權限(帶快取) 注意:修改權限後需要清除快取 Args: user_id: 用戶 ID use_cache: 是否使用快取 Returns: set: 權限代碼集合 """ cache_key = f"user_{user_id}" if use_cache and cache_key in cls._permission_cache: return cls._permission_cache[cache_key] permissions = cls.get_user_permissions(user_id) if use_cache: cls._permission_cache[cache_key] = permissions return permissions @classmethod def clear_user_permission_cache(cls, user_id=None): """清除用戶權限快取 Args: user_id: 用戶 ID,如果為 None 則清除所有快取 """ if user_id is None: cls._permission_cache.clear() else: cache_key = f"user_{user_id}" cls._permission_cache.pop(cache_key, None)