Files
ewoooc/services/permission_service.py
ogt 1b4f3a7bbe
Some checks failed
CD Pipeline / deploy (push) Failing after 59s
feat: EwoooC 初始化 — 完整專案推版至 Gitea
- 建立 Gitea Actions CD pipeline (.gitea/workflows/cd.yaml)
- 部署模式: rsync Python 檔案至 188 → docker restart (volume mount)
- Dockerfile/requirements 變動時自動重建 Docker image
- 部署通知: Telegram (開始/成功/失敗)
- 健康檢查: https://mo.wooo.work/health (最多 5 次重試)
- 同步最新 CLAUDE.md / ADR-008 / memory (2026-04-19)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-19 01:21:13 +08:00

432 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
權限服務層
==========
提供權限 CRUD 操作、角色模板應用、權限檢查等功能
"""
from datetime import datetime
from collections import defaultdict
from database.manager import get_session
from database.permission_models import (
Permission, UserPermission,
PERMISSIONS, ROLE_DEFAULT_PERMISSIONS, ALL_PERMISSION_CODES
)
class PermissionService:
"""權限服務類"""
# ========================================================================
# 權限定義查詢
# ========================================================================
@staticmethod
def get_all_permissions():
"""取得所有權限定義
Returns:
list: 按分類分組的權限列表
[
{
'category': '首頁/看板',
'permissions': [
{'code': 'dashboard.view', 'name': '查看首頁看板', ...},
...
]
},
...
]
"""
session = get_session()
try:
permissions = session.query(Permission).order_by(Permission.sort_order).all()
# 按分類分組
grouped = defaultdict(list)
for perm in permissions:
grouped[perm.category].append(perm.to_dict())
# 轉換為列表格式,保持分類順序
category_order = ['首頁/看板', '報表', '活動看板', '廠商缺貨', '匯入', '系統', '其他']
result = []
for category in category_order:
if category in grouped:
result.append({
'category': category,
'permissions': grouped[category]
})
# 添加未在預定義順序中的分類
for category in grouped:
if category not in category_order:
result.append({
'category': category,
'permissions': grouped[category]
})
return result
finally:
session.close()
@staticmethod
def get_all_permissions_flat():
"""取得所有權限定義(扁平列表)
Returns:
list: 權限字典列表
"""
session = get_session()
try:
permissions = session.query(Permission).order_by(Permission.sort_order).all()
return [perm.to_dict() for perm in permissions]
finally:
session.close()
@staticmethod
def get_permission_by_code(code):
"""根據代碼取得權限定義
Args:
code: 權限代碼
Returns:
Permission or None
"""
session = get_session()
try:
return session.query(Permission).filter_by(code=code).first()
finally:
session.close()
# ========================================================================
# 用戶權限管理
# ========================================================================
@staticmethod
def get_user_permissions(user_id):
"""取得用戶的所有權限代碼
Args:
user_id: 用戶 ID
Returns:
set: 權限代碼集合
"""
session = get_session()
try:
user_perms = session.query(UserPermission).filter_by(user_id=user_id).all()
return {up.permission_code for up in user_perms}
finally:
session.close()
@staticmethod
def get_user_permissions_detail(user_id):
"""取得用戶的權限詳細資訊
Args:
user_id: 用戶 ID
Returns:
list: 權限詳細資訊列表
"""
user_perm_codes = PermissionService.get_user_permissions(user_id)
all_permissions = PermissionService.get_all_permissions()
# 標記用戶已有的權限
for category_data in all_permissions:
for perm in category_data['permissions']:
perm['granted'] = perm['code'] in user_perm_codes
return all_permissions
@staticmethod
def set_user_permissions(user_id, permission_codes, granted_by=None):
"""設定用戶的權限(完全覆蓋)
Args:
user_id: 用戶 ID
permission_codes: 權限代碼列表
granted_by: 授權者 ID
Returns:
tuple: (success, message)
"""
session = get_session()
try:
# 驗證權限代碼是否有效
valid_codes = set(ALL_PERMISSION_CODES)
invalid_codes = set(permission_codes) - valid_codes
if invalid_codes:
return False, f"無效的權限代碼: {', '.join(invalid_codes)}"
# 刪除舊權限
session.query(UserPermission).filter_by(user_id=user_id).delete()
# 新增新權限
now = datetime.utcnow()
for code in permission_codes:
user_perm = UserPermission(
user_id=user_id,
permission_code=code,
granted_by=granted_by,
granted_at=now
)
session.add(user_perm)
session.commit()
return True, f"已設定 {len(permission_codes)} 項權限"
except Exception as e:
session.rollback()
return False, f"設定權限失敗: {str(e)}"
finally:
session.close()
@staticmethod
def add_user_permission(user_id, permission_code, granted_by=None):
"""新增單一用戶權限
Args:
user_id: 用戶 ID
permission_code: 權限代碼
granted_by: 授權者 ID
Returns:
tuple: (success, message)
"""
session = get_session()
try:
# 驗證權限代碼
if permission_code not in ALL_PERMISSION_CODES:
return False, f"無效的權限代碼: {permission_code}"
# 檢查是否已存在
existing = session.query(UserPermission).filter_by(
user_id=user_id,
permission_code=permission_code
).first()
if existing:
return True, "權限已存在"
user_perm = UserPermission(
user_id=user_id,
permission_code=permission_code,
granted_by=granted_by,
granted_at=datetime.utcnow()
)
session.add(user_perm)
session.commit()
return True, "權限已新增"
except Exception as e:
session.rollback()
return False, f"新增權限失敗: {str(e)}"
finally:
session.close()
@staticmethod
def remove_user_permission(user_id, permission_code):
"""移除單一用戶權限
Args:
user_id: 用戶 ID
permission_code: 權限代碼
Returns:
tuple: (success, message)
"""
session = get_session()
try:
deleted = session.query(UserPermission).filter_by(
user_id=user_id,
permission_code=permission_code
).delete()
session.commit()
if deleted:
return True, "權限已移除"
else:
return True, "權限不存在"
except Exception as e:
session.rollback()
return False, f"移除權限失敗: {str(e)}"
finally:
session.close()
# ========================================================================
# 角色模板
# ========================================================================
@staticmethod
def apply_role_template(user_id, role, granted_by=None):
"""套用角色預設權限模板
Args:
user_id: 用戶 ID
role: 角色名稱 ('admin', 'manager', 'user')
granted_by: 授權者 ID
Returns:
tuple: (success, message)
"""
if role not in ROLE_DEFAULT_PERMISSIONS:
return False, f"無效的角色: {role}"
permission_codes = ROLE_DEFAULT_PERMISSIONS[role]
return PermissionService.set_user_permissions(user_id, permission_codes, granted_by)
@staticmethod
def get_role_template(role):
"""取得角色的預設權限列表
Args:
role: 角色名稱
Returns:
list: 權限代碼列表,如果角色無效則返回空列表
"""
return ROLE_DEFAULT_PERMISSIONS.get(role, [])
@staticmethod
def get_all_role_templates():
"""取得所有角色模板
Returns:
dict: {role: [permission_codes]}
"""
return ROLE_DEFAULT_PERMISSIONS.copy()
# ========================================================================
# 權限檢查
# ========================================================================
@staticmethod
def has_permission(user_id, permission_code):
"""檢查用戶是否擁有特定權限
Args:
user_id: 用戶 ID
permission_code: 權限代碼
Returns:
bool: 是否擁有權限
"""
session = get_session()
try:
exists = session.query(UserPermission).filter_by(
user_id=user_id,
permission_code=permission_code
).first()
return exists is not None
finally:
session.close()
@staticmethod
def has_any_permission(user_id, *permission_codes):
"""檢查用戶是否擁有任一指定權限
Args:
user_id: 用戶 ID
*permission_codes: 權限代碼列表
Returns:
bool: 是否擁有任一權限
"""
if not permission_codes:
return False
session = get_session()
try:
exists = session.query(UserPermission).filter(
UserPermission.user_id == user_id,
UserPermission.permission_code.in_(permission_codes)
).first()
return exists is not None
finally:
session.close()
@staticmethod
def has_all_permissions(user_id, *permission_codes):
"""檢查用戶是否擁有所有指定權限
Args:
user_id: 用戶 ID
*permission_codes: 權限代碼列表
Returns:
bool: 是否擁有所有權限
"""
if not permission_codes:
return True
user_perms = PermissionService.get_user_permissions(user_id)
return all(code in user_perms for code in permission_codes)
# ========================================================================
# 用戶創建時的權限初始化
# ========================================================================
@staticmethod
def init_user_permissions_by_role(user_id, role, granted_by=None):
"""根據角色初始化用戶權限(用於新建用戶)
Args:
user_id: 用戶 ID
role: 角色名稱
granted_by: 授權者 ID
Returns:
tuple: (success, message)
"""
return PermissionService.apply_role_template(user_id, role, granted_by)
# ========================================================================
# 快取相關(可選優化)
# ========================================================================
_permission_cache = {}
@classmethod
def get_user_permissions_cached(cls, user_id, use_cache=True):
"""取得用戶權限(帶快取)
注意:修改權限後需要清除快取
Args:
user_id: 用戶 ID
use_cache: 是否使用快取
Returns:
set: 權限代碼集合
"""
cache_key = f"user_{user_id}"
if use_cache and cache_key in cls._permission_cache:
return cls._permission_cache[cache_key]
permissions = cls.get_user_permissions(user_id)
if use_cache:
cls._permission_cache[cache_key] = permissions
return permissions
@classmethod
def clear_user_permission_cache(cls, user_id=None):
"""清除用戶權限快取
Args:
user_id: 用戶 ID如果為 None 則清除所有快取
"""
if user_id is None:
cls._permission_cache.clear()
else:
cache_key = f"user_{user_id}"
cls._permission_cache.pop(cache_key, None)