Files
ewoooc/services/user_service.py
ogt 1b4f3a7bbe
Some checks failed
CD Pipeline / deploy (push) Failing after 59s
feat: EwoooC 初始化 — 完整專案推版至 Gitea
- 建立 Gitea Actions CD pipeline (.gitea/workflows/cd.yaml)
- 部署模式: rsync Python 檔案至 188 → docker restart (volume mount)
- Dockerfile/requirements 變動時自動重建 Docker image
- 部署通知: Telegram (開始/成功/失敗)
- 健康檢查: https://mo.wooo.work/health (最多 5 次重試)
- 同步最新 CLAUDE.md / ADR-008 / memory (2026-04-19)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-19 01:21:13 +08:00

443 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
用戶服務模組
提供:
- 用戶 CRUD 操作
- 用戶認證
- 登入歷史記錄
"""
from datetime import datetime
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from database.user_models import User, LoginHistory
from services.password_service import (
hash_password,
verify_password,
validate_password_complexity,
is_password_expired,
)
class UserService:
"""用戶服務類"""
def __init__(self, db_session: Session):
"""
初始化用戶服務
Args:
db_session: SQLAlchemy Session 物件
"""
self.db = db_session
# ==========================================
# 用戶 CRUD 操作
# ==========================================
def create_user(self, username, password, role='user', email=None, display_name=None, created_by=None):
"""
建立新用戶
Args:
username: 帳號名稱
password: 明文密碼
role: 角色 (admin/manager/user)
email: 電子郵件 (可選)
display_name: 顯示名稱 (可選)
created_by: 建立者 ID (可選)
Returns:
tuple: (user, error_message)
- user: User 物件(成功時)或 None失敗時
- error_message: 錯誤訊息(成功時為 None
"""
# 驗證密碼複雜度
is_valid, error = validate_password_complexity(password)
if not is_valid:
return None, f"密碼不符合要求:{error}"
# 驗證角色
if role not in User.ROLES:
return None, f"無效的角色:{role}"
# 檢查帳號是否已存在
existing = self.get_user_by_username(username)
if existing:
return None, f"帳號 '{username}' 已存在"
# 檢查 email 是否已存在
if email:
existing_email = self.db.query(User).filter(User.email == email).first()
if existing_email:
return None, f"電子郵件 '{email}' 已被使用"
try:
user = User(
username=username,
password_hash=hash_password(password),
role=role,
email=email,
display_name=display_name or username,
is_active=True,
password_changed_at=datetime.now(),
created_at=datetime.now(),
created_by=created_by
)
self.db.add(user)
self.db.commit()
self.db.refresh(user)
# 自動套用角色預設權限admin 以外的角色)
if role != 'admin':
try:
from services.permission_service import PermissionService
PermissionService.init_user_permissions_by_role(user.id, role, created_by)
except Exception as perm_err:
print(f"⚠️ 初始化用戶權限失敗: {perm_err}")
return user, None
except IntegrityError as e:
self.db.rollback()
return None, f"資料庫錯誤:{str(e)}"
except Exception as e:
self.db.rollback()
return None, f"建立用戶失敗:{str(e)}"
def get_user_by_id(self, user_id):
"""
根據 ID 取得用戶
Args:
user_id: 用戶 ID
Returns:
User 或 None
"""
return self.db.query(User).filter(User.id == user_id).first()
def get_user_by_username(self, username):
"""
根據帳號名稱取得用戶
Args:
username: 帳號名稱
Returns:
User 或 None
"""
return self.db.query(User).filter(User.username == username).first()
def get_all_users(self, include_inactive=False):
"""
取得所有用戶
Args:
include_inactive: 是否包含停用的用戶
Returns:
list[User]
"""
query = self.db.query(User)
if not include_inactive:
query = query.filter(User.is_active == True)
return query.order_by(User.username).all()
def update_user(self, user_id, **kwargs):
"""
更新用戶資料
Args:
user_id: 用戶 ID
**kwargs: 要更新的欄位
Returns:
tuple: (success, error_message)
"""
user = self.get_user_by_id(user_id)
if not user:
return False, "用戶不存在"
allowed_fields = ['email', 'display_name', 'role', 'is_active']
try:
for key, value in kwargs.items():
if key in allowed_fields:
# 驗證角色
if key == 'role' and value not in User.ROLES:
return False, f"無效的角色:{value}"
# 檢查 email 唯一性
if key == 'email' and value:
existing = self.db.query(User).filter(
User.email == value,
User.id != user_id
).first()
if existing:
return False, f"電子郵件 '{value}' 已被使用"
setattr(user, key, value)
user.updated_at = datetime.now()
self.db.commit()
return True, None
except Exception as e:
self.db.rollback()
return False, f"更新失敗:{str(e)}"
def change_password(self, user_id, old_password, new_password):
"""
變更密碼
Args:
user_id: 用戶 ID
old_password: 舊密碼
new_password: 新密碼
Returns:
tuple: (success, error_message)
"""
user = self.get_user_by_id(user_id)
if not user:
return False, "用戶不存在"
# 驗證舊密碼
if not verify_password(user.password_hash, old_password):
return False, "舊密碼不正確"
# 驗證新密碼複雜度
is_valid, error = validate_password_complexity(new_password)
if not is_valid:
return False, f"新密碼不符合要求:{error}"
# 檢查新密碼是否與舊密碼相同
if verify_password(user.password_hash, new_password):
return False, "新密碼不能與舊密碼相同"
try:
user.password_hash = hash_password(new_password)
user.password_changed_at = datetime.now()
user.updated_at = datetime.now()
self.db.commit()
return True, None
except Exception as e:
self.db.rollback()
return False, f"變更密碼失敗:{str(e)}"
def reset_password(self, user_id, new_password, admin_id=None):
"""
重設密碼(管理員功能)
Args:
user_id: 用戶 ID
new_password: 新密碼
admin_id: 執行重設的管理員 ID
Returns:
tuple: (success, error_message)
"""
user = self.get_user_by_id(user_id)
if not user:
return False, "用戶不存在"
# 驗證新密碼複雜度
is_valid, error = validate_password_complexity(new_password)
if not is_valid:
return False, f"新密碼不符合要求:{error}"
try:
user.password_hash = hash_password(new_password)
user.password_changed_at = datetime.now()
user.updated_at = datetime.now()
self.db.commit()
print(f"🔐 密碼重設 | User: {user.username} | Admin ID: {admin_id}")
return True, None
except Exception as e:
self.db.rollback()
return False, f"重設密碼失敗:{str(e)}"
def delete_user(self, user_id):
"""
刪除用戶(實際是停用)
Args:
user_id: 用戶 ID
Returns:
tuple: (success, error_message)
"""
user = self.get_user_by_id(user_id)
if not user:
return False, "用戶不存在"
# 不允許刪除最後一個 admin
if user.role == User.ROLE_ADMIN:
admin_count = self.db.query(User).filter(
User.role == User.ROLE_ADMIN,
User.is_active == True
).count()
if admin_count <= 1:
return False, "無法刪除:系統必須保留至少一個管理員帳號"
try:
user.is_active = False
user.updated_at = datetime.now()
self.db.commit()
return True, None
except Exception as e:
self.db.rollback()
return False, f"刪除用戶失敗:{str(e)}"
# ==========================================
# 用戶認證
# ==========================================
def authenticate(self, username, password):
"""
驗證用戶登入
Args:
username: 帳號名稱
password: 密碼
Returns:
tuple: (user, error_message, need_password_change)
- user: User 物件(成功時)或 None失敗時
- error_message: 錯誤訊息(成功時為 None
- need_password_change: 是否需要變更密碼
"""
user = self.get_user_by_username(username)
if not user:
return None, "帳號或密碼錯誤", False
if not user.is_active:
return None, "帳號已停用,請聯繫管理員", False
if not verify_password(user.password_hash, password):
return None, "帳號或密碼錯誤", False
# 檢查密碼是否過期
is_expired, days_until = is_password_expired(user.password_changed_at)
return user, None, is_expired
# ==========================================
# 登入歷史記錄
# ==========================================
def record_login(self, user_id, username_attempted, ip_address, user_agent, status, failure_reason=None):
"""
記錄登入歷史
Args:
user_id: 用戶 ID (可為 None用於記錄失敗的登入嘗試)
username_attempted: 嘗試登入的帳號名稱
ip_address: IP 位址
user_agent: User-Agent 字串
status: 狀態 (success/failed/locked)
failure_reason: 失敗原因
Returns:
LoginHistory 物件
"""
try:
history = LoginHistory(
user_id=user_id,
username_attempted=username_attempted,
login_time=datetime.now(),
ip_address=ip_address,
user_agent=user_agent[:256] if user_agent else None, # 限制長度
status=status,
failure_reason=failure_reason
)
self.db.add(history)
self.db.commit()
return history
except Exception as e:
self.db.rollback()
print(f"⚠️ 記錄登入歷史失敗: {e}")
return None
def get_login_history(self, user_id=None, limit=100):
"""
取得登入歷史記錄
Args:
user_id: 用戶 ID (可選None 表示取得所有用戶)
limit: 最大筆數
Returns:
list[LoginHistory]
"""
query = self.db.query(LoginHistory)
if user_id:
query = query.filter(LoginHistory.user_id == user_id)
return query.order_by(LoginHistory.login_time.desc()).limit(limit).all()
def get_recent_failed_logins(self, ip_address=None, minutes=30):
"""
取得最近的登入失敗記錄
Args:
ip_address: IP 位址 (可選)
minutes: 時間範圍(分鐘)
Returns:
list[LoginHistory]
"""
from datetime import timedelta
since = datetime.now() - timedelta(minutes=minutes)
query = self.db.query(LoginHistory).filter(
LoginHistory.login_time >= since,
LoginHistory.status == LoginHistory.STATUS_FAILED
)
if ip_address:
query = query.filter(LoginHistory.ip_address == ip_address)
return query.order_by(LoginHistory.login_time.desc()).all()
def create_initial_admin(db_session, password='Wooo@2026!'):
"""
建立初始管理員帳號
Args:
db_session: SQLAlchemy Session
password: 初始密碼
Returns:
tuple: (success, message)
"""
service = UserService(db_session)
# 檢查是否已有管理員
existing_admin = db_session.query(User).filter(User.role == User.ROLE_ADMIN).first()
if existing_admin:
return False, f"已存在管理員帳號:{existing_admin.username}"
# 建立預設管理員
user, error = service.create_user(
username='admin',
password=password,
role=User.ROLE_ADMIN,
display_name='系統管理員',
)
if error:
return False, f"建立管理員失敗:{error}"
return True, f"管理員帳號 'admin' 建立成功"
print("✅ User service 已載入")