Some checks failed
CD Pipeline / deploy (push) Failing after 59s
- 建立 Gitea Actions CD pipeline (.gitea/workflows/cd.yaml) - 部署模式: rsync Python 檔案至 188 → docker restart (volume mount) - Dockerfile/requirements 變動時自動重建 Docker image - 部署通知: Telegram (開始/成功/失敗) - 健康檢查: https://mo.wooo.work/health (最多 5 次重試) - 同步最新 CLAUDE.md / ADR-008 / memory (2026-04-19) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
432 lines
12 KiB
Python
432 lines
12 KiB
Python
"""
|
||
權限服務層
|
||
==========
|
||
提供權限 CRUD 操作、角色模板應用、權限檢查等功能
|
||
"""
|
||
|
||
from datetime import datetime
|
||
from collections import defaultdict
|
||
from database.manager import get_session
|
||
from database.permission_models import (
|
||
Permission, UserPermission,
|
||
PERMISSIONS, ROLE_DEFAULT_PERMISSIONS, ALL_PERMISSION_CODES
|
||
)
|
||
|
||
|
||
class PermissionService:
|
||
"""權限服務類"""
|
||
|
||
# ========================================================================
|
||
# 權限定義查詢
|
||
# ========================================================================
|
||
|
||
@staticmethod
|
||
def get_all_permissions():
|
||
"""取得所有權限定義
|
||
|
||
Returns:
|
||
list: 按分類分組的權限列表
|
||
[
|
||
{
|
||
'category': '首頁/看板',
|
||
'permissions': [
|
||
{'code': 'dashboard.view', 'name': '查看首頁看板', ...},
|
||
...
|
||
]
|
||
},
|
||
...
|
||
]
|
||
"""
|
||
session = get_session()
|
||
try:
|
||
permissions = session.query(Permission).order_by(Permission.sort_order).all()
|
||
|
||
# 按分類分組
|
||
grouped = defaultdict(list)
|
||
for perm in permissions:
|
||
grouped[perm.category].append(perm.to_dict())
|
||
|
||
# 轉換為列表格式,保持分類順序
|
||
category_order = ['首頁/看板', '報表', '活動看板', '廠商缺貨', '匯入', '系統', '其他']
|
||
result = []
|
||
|
||
for category in category_order:
|
||
if category in grouped:
|
||
result.append({
|
||
'category': category,
|
||
'permissions': grouped[category]
|
||
})
|
||
|
||
# 添加未在預定義順序中的分類
|
||
for category in grouped:
|
||
if category not in category_order:
|
||
result.append({
|
||
'category': category,
|
||
'permissions': grouped[category]
|
||
})
|
||
|
||
return result
|
||
finally:
|
||
session.close()
|
||
|
||
@staticmethod
|
||
def get_all_permissions_flat():
|
||
"""取得所有權限定義(扁平列表)
|
||
|
||
Returns:
|
||
list: 權限字典列表
|
||
"""
|
||
session = get_session()
|
||
try:
|
||
permissions = session.query(Permission).order_by(Permission.sort_order).all()
|
||
return [perm.to_dict() for perm in permissions]
|
||
finally:
|
||
session.close()
|
||
|
||
@staticmethod
|
||
def get_permission_by_code(code):
|
||
"""根據代碼取得權限定義
|
||
|
||
Args:
|
||
code: 權限代碼
|
||
|
||
Returns:
|
||
Permission or None
|
||
"""
|
||
session = get_session()
|
||
try:
|
||
return session.query(Permission).filter_by(code=code).first()
|
||
finally:
|
||
session.close()
|
||
|
||
# ========================================================================
|
||
# 用戶權限管理
|
||
# ========================================================================
|
||
|
||
@staticmethod
|
||
def get_user_permissions(user_id):
|
||
"""取得用戶的所有權限代碼
|
||
|
||
Args:
|
||
user_id: 用戶 ID
|
||
|
||
Returns:
|
||
set: 權限代碼集合
|
||
"""
|
||
session = get_session()
|
||
try:
|
||
user_perms = session.query(UserPermission).filter_by(user_id=user_id).all()
|
||
return {up.permission_code for up in user_perms}
|
||
finally:
|
||
session.close()
|
||
|
||
@staticmethod
|
||
def get_user_permissions_detail(user_id):
|
||
"""取得用戶的權限詳細資訊
|
||
|
||
Args:
|
||
user_id: 用戶 ID
|
||
|
||
Returns:
|
||
list: 權限詳細資訊列表
|
||
"""
|
||
user_perm_codes = PermissionService.get_user_permissions(user_id)
|
||
|
||
all_permissions = PermissionService.get_all_permissions()
|
||
|
||
# 標記用戶已有的權限
|
||
for category_data in all_permissions:
|
||
for perm in category_data['permissions']:
|
||
perm['granted'] = perm['code'] in user_perm_codes
|
||
|
||
return all_permissions
|
||
|
||
@staticmethod
|
||
def set_user_permissions(user_id, permission_codes, granted_by=None):
|
||
"""設定用戶的權限(完全覆蓋)
|
||
|
||
Args:
|
||
user_id: 用戶 ID
|
||
permission_codes: 權限代碼列表
|
||
granted_by: 授權者 ID
|
||
|
||
Returns:
|
||
tuple: (success, message)
|
||
"""
|
||
session = get_session()
|
||
try:
|
||
# 驗證權限代碼是否有效
|
||
valid_codes = set(ALL_PERMISSION_CODES)
|
||
invalid_codes = set(permission_codes) - valid_codes
|
||
if invalid_codes:
|
||
return False, f"無效的權限代碼: {', '.join(invalid_codes)}"
|
||
|
||
# 刪除舊權限
|
||
session.query(UserPermission).filter_by(user_id=user_id).delete()
|
||
|
||
# 新增新權限
|
||
now = datetime.utcnow()
|
||
for code in permission_codes:
|
||
user_perm = UserPermission(
|
||
user_id=user_id,
|
||
permission_code=code,
|
||
granted_by=granted_by,
|
||
granted_at=now
|
||
)
|
||
session.add(user_perm)
|
||
|
||
session.commit()
|
||
return True, f"已設定 {len(permission_codes)} 項權限"
|
||
|
||
except Exception as e:
|
||
session.rollback()
|
||
return False, f"設定權限失敗: {str(e)}"
|
||
finally:
|
||
session.close()
|
||
|
||
@staticmethod
|
||
def add_user_permission(user_id, permission_code, granted_by=None):
|
||
"""新增單一用戶權限
|
||
|
||
Args:
|
||
user_id: 用戶 ID
|
||
permission_code: 權限代碼
|
||
granted_by: 授權者 ID
|
||
|
||
Returns:
|
||
tuple: (success, message)
|
||
"""
|
||
session = get_session()
|
||
try:
|
||
# 驗證權限代碼
|
||
if permission_code not in ALL_PERMISSION_CODES:
|
||
return False, f"無效的權限代碼: {permission_code}"
|
||
|
||
# 檢查是否已存在
|
||
existing = session.query(UserPermission).filter_by(
|
||
user_id=user_id,
|
||
permission_code=permission_code
|
||
).first()
|
||
|
||
if existing:
|
||
return True, "權限已存在"
|
||
|
||
user_perm = UserPermission(
|
||
user_id=user_id,
|
||
permission_code=permission_code,
|
||
granted_by=granted_by,
|
||
granted_at=datetime.utcnow()
|
||
)
|
||
session.add(user_perm)
|
||
session.commit()
|
||
|
||
return True, "權限已新增"
|
||
|
||
except Exception as e:
|
||
session.rollback()
|
||
return False, f"新增權限失敗: {str(e)}"
|
||
finally:
|
||
session.close()
|
||
|
||
@staticmethod
|
||
def remove_user_permission(user_id, permission_code):
|
||
"""移除單一用戶權限
|
||
|
||
Args:
|
||
user_id: 用戶 ID
|
||
permission_code: 權限代碼
|
||
|
||
Returns:
|
||
tuple: (success, message)
|
||
"""
|
||
session = get_session()
|
||
try:
|
||
deleted = session.query(UserPermission).filter_by(
|
||
user_id=user_id,
|
||
permission_code=permission_code
|
||
).delete()
|
||
|
||
session.commit()
|
||
|
||
if deleted:
|
||
return True, "權限已移除"
|
||
else:
|
||
return True, "權限不存在"
|
||
|
||
except Exception as e:
|
||
session.rollback()
|
||
return False, f"移除權限失敗: {str(e)}"
|
||
finally:
|
||
session.close()
|
||
|
||
# ========================================================================
|
||
# 角色模板
|
||
# ========================================================================
|
||
|
||
@staticmethod
|
||
def apply_role_template(user_id, role, granted_by=None):
|
||
"""套用角色預設權限模板
|
||
|
||
Args:
|
||
user_id: 用戶 ID
|
||
role: 角色名稱 ('admin', 'manager', 'user')
|
||
granted_by: 授權者 ID
|
||
|
||
Returns:
|
||
tuple: (success, message)
|
||
"""
|
||
if role not in ROLE_DEFAULT_PERMISSIONS:
|
||
return False, f"無效的角色: {role}"
|
||
|
||
permission_codes = ROLE_DEFAULT_PERMISSIONS[role]
|
||
return PermissionService.set_user_permissions(user_id, permission_codes, granted_by)
|
||
|
||
@staticmethod
|
||
def get_role_template(role):
|
||
"""取得角色的預設權限列表
|
||
|
||
Args:
|
||
role: 角色名稱
|
||
|
||
Returns:
|
||
list: 權限代碼列表,如果角色無效則返回空列表
|
||
"""
|
||
return ROLE_DEFAULT_PERMISSIONS.get(role, [])
|
||
|
||
@staticmethod
|
||
def get_all_role_templates():
|
||
"""取得所有角色模板
|
||
|
||
Returns:
|
||
dict: {role: [permission_codes]}
|
||
"""
|
||
return ROLE_DEFAULT_PERMISSIONS.copy()
|
||
|
||
# ========================================================================
|
||
# 權限檢查
|
||
# ========================================================================
|
||
|
||
@staticmethod
|
||
def has_permission(user_id, permission_code):
|
||
"""檢查用戶是否擁有特定權限
|
||
|
||
Args:
|
||
user_id: 用戶 ID
|
||
permission_code: 權限代碼
|
||
|
||
Returns:
|
||
bool: 是否擁有權限
|
||
"""
|
||
session = get_session()
|
||
try:
|
||
exists = session.query(UserPermission).filter_by(
|
||
user_id=user_id,
|
||
permission_code=permission_code
|
||
).first()
|
||
return exists is not None
|
||
finally:
|
||
session.close()
|
||
|
||
@staticmethod
|
||
def has_any_permission(user_id, *permission_codes):
|
||
"""檢查用戶是否擁有任一指定權限
|
||
|
||
Args:
|
||
user_id: 用戶 ID
|
||
*permission_codes: 權限代碼列表
|
||
|
||
Returns:
|
||
bool: 是否擁有任一權限
|
||
"""
|
||
if not permission_codes:
|
||
return False
|
||
|
||
session = get_session()
|
||
try:
|
||
exists = session.query(UserPermission).filter(
|
||
UserPermission.user_id == user_id,
|
||
UserPermission.permission_code.in_(permission_codes)
|
||
).first()
|
||
return exists is not None
|
||
finally:
|
||
session.close()
|
||
|
||
@staticmethod
|
||
def has_all_permissions(user_id, *permission_codes):
|
||
"""檢查用戶是否擁有所有指定權限
|
||
|
||
Args:
|
||
user_id: 用戶 ID
|
||
*permission_codes: 權限代碼列表
|
||
|
||
Returns:
|
||
bool: 是否擁有所有權限
|
||
"""
|
||
if not permission_codes:
|
||
return True
|
||
|
||
user_perms = PermissionService.get_user_permissions(user_id)
|
||
return all(code in user_perms for code in permission_codes)
|
||
|
||
# ========================================================================
|
||
# 用戶創建時的權限初始化
|
||
# ========================================================================
|
||
|
||
@staticmethod
|
||
def init_user_permissions_by_role(user_id, role, granted_by=None):
|
||
"""根據角色初始化用戶權限(用於新建用戶)
|
||
|
||
Args:
|
||
user_id: 用戶 ID
|
||
role: 角色名稱
|
||
granted_by: 授權者 ID
|
||
|
||
Returns:
|
||
tuple: (success, message)
|
||
"""
|
||
return PermissionService.apply_role_template(user_id, role, granted_by)
|
||
|
||
# ========================================================================
|
||
# 快取相關(可選優化)
|
||
# ========================================================================
|
||
|
||
_permission_cache = {}
|
||
|
||
@classmethod
|
||
def get_user_permissions_cached(cls, user_id, use_cache=True):
|
||
"""取得用戶權限(帶快取)
|
||
|
||
注意:修改權限後需要清除快取
|
||
|
||
Args:
|
||
user_id: 用戶 ID
|
||
use_cache: 是否使用快取
|
||
|
||
Returns:
|
||
set: 權限代碼集合
|
||
"""
|
||
cache_key = f"user_{user_id}"
|
||
|
||
if use_cache and cache_key in cls._permission_cache:
|
||
return cls._permission_cache[cache_key]
|
||
|
||
permissions = cls.get_user_permissions(user_id)
|
||
|
||
if use_cache:
|
||
cls._permission_cache[cache_key] = permissions
|
||
|
||
return permissions
|
||
|
||
@classmethod
|
||
def clear_user_permission_cache(cls, user_id=None):
|
||
"""清除用戶權限快取
|
||
|
||
Args:
|
||
user_id: 用戶 ID,如果為 None 則清除所有快取
|
||
"""
|
||
if user_id is None:
|
||
cls._permission_cache.clear()
|
||
else:
|
||
cache_key = f"user_{user_id}"
|
||
cls._permission_cache.pop(cache_key, None)
|