""" 用戶服務模組 提供: - 用戶 CRUD 操作 - 用戶認證 - 登入歷史記錄 """ from datetime import datetime from sqlalchemy.orm import Session from sqlalchemy.exc import IntegrityError from database.user_models import User, LoginHistory from services.password_service import ( hash_password, verify_password, validate_password_complexity, is_password_expired, ) class UserService: """用戶服務類""" def __init__(self, db_session: Session): """ 初始化用戶服務 Args: db_session: SQLAlchemy Session 物件 """ self.db = db_session # ========================================== # 用戶 CRUD 操作 # ========================================== def create_user(self, username, password, role='user', email=None, display_name=None, created_by=None): """ 建立新用戶 Args: username: 帳號名稱 password: 明文密碼 role: 角色 (admin/manager/user) email: 電子郵件 (可選) display_name: 顯示名稱 (可選) created_by: 建立者 ID (可選) Returns: tuple: (user, error_message) - user: User 物件(成功時)或 None(失敗時) - error_message: 錯誤訊息(成功時為 None) """ # 驗證密碼複雜度 is_valid, error = validate_password_complexity(password) if not is_valid: return None, f"密碼不符合要求:{error}" # 驗證角色 if role not in User.ROLES: return None, f"無效的角色:{role}" # 檢查帳號是否已存在 existing = self.get_user_by_username(username) if existing: return None, f"帳號 '{username}' 已存在" # 檢查 email 是否已存在 if email: existing_email = self.db.query(User).filter(User.email == email).first() if existing_email: return None, f"電子郵件 '{email}' 已被使用" try: user = User( username=username, password_hash=hash_password(password), role=role, email=email, display_name=display_name or username, is_active=True, password_changed_at=datetime.now(), created_at=datetime.now(), created_by=created_by ) self.db.add(user) self.db.commit() self.db.refresh(user) # 自動套用角色預設權限(admin 以外的角色) if role != 'admin': try: from services.permission_service import PermissionService PermissionService.init_user_permissions_by_role(user.id, role, created_by) except Exception as perm_err: print(f"⚠️ 初始化用戶權限失敗: {perm_err}") return user, None except IntegrityError as e: self.db.rollback() return None, f"資料庫錯誤:{str(e)}" except Exception as e: self.db.rollback() return None, f"建立用戶失敗:{str(e)}" def get_user_by_id(self, user_id): """ 根據 ID 取得用戶 Args: user_id: 用戶 ID Returns: User 或 None """ return self.db.query(User).filter(User.id == user_id).first() def get_user_by_username(self, username): """ 根據帳號名稱取得用戶 Args: username: 帳號名稱 Returns: User 或 None """ return self.db.query(User).filter(User.username == username).first() def get_all_users(self, include_inactive=False): """ 取得所有用戶 Args: include_inactive: 是否包含停用的用戶 Returns: list[User] """ query = self.db.query(User) if not include_inactive: query = query.filter(User.is_active == True) return query.order_by(User.username).all() def update_user(self, user_id, **kwargs): """ 更新用戶資料 Args: user_id: 用戶 ID **kwargs: 要更新的欄位 Returns: tuple: (success, error_message) """ user = self.get_user_by_id(user_id) if not user: return False, "用戶不存在" allowed_fields = ['email', 'display_name', 'role', 'is_active'] try: for key, value in kwargs.items(): if key in allowed_fields: # 驗證角色 if key == 'role' and value not in User.ROLES: return False, f"無效的角色:{value}" # 檢查 email 唯一性 if key == 'email' and value: existing = self.db.query(User).filter( User.email == value, User.id != user_id ).first() if existing: return False, f"電子郵件 '{value}' 已被使用" setattr(user, key, value) user.updated_at = datetime.now() self.db.commit() return True, None except Exception as e: self.db.rollback() return False, f"更新失敗:{str(e)}" def change_password(self, user_id, old_password, new_password): """ 變更密碼 Args: user_id: 用戶 ID old_password: 舊密碼 new_password: 新密碼 Returns: tuple: (success, error_message) """ user = self.get_user_by_id(user_id) if not user: return False, "用戶不存在" # 驗證舊密碼 if not verify_password(user.password_hash, old_password): return False, "舊密碼不正確" # 驗證新密碼複雜度 is_valid, error = validate_password_complexity(new_password) if not is_valid: return False, f"新密碼不符合要求:{error}" # 檢查新密碼是否與舊密碼相同 if verify_password(user.password_hash, new_password): return False, "新密碼不能與舊密碼相同" try: user.password_hash = hash_password(new_password) user.password_changed_at = datetime.now() user.updated_at = datetime.now() self.db.commit() return True, None except Exception as e: self.db.rollback() return False, f"變更密碼失敗:{str(e)}" def reset_password(self, user_id, new_password, admin_id=None): """ 重設密碼(管理員功能) Args: user_id: 用戶 ID new_password: 新密碼 admin_id: 執行重設的管理員 ID Returns: tuple: (success, error_message) """ user = self.get_user_by_id(user_id) if not user: return False, "用戶不存在" # 驗證新密碼複雜度 is_valid, error = validate_password_complexity(new_password) if not is_valid: return False, f"新密碼不符合要求:{error}" try: user.password_hash = hash_password(new_password) user.password_changed_at = datetime.now() user.updated_at = datetime.now() self.db.commit() print(f"🔐 密碼重設 | User: {user.username} | Admin ID: {admin_id}") return True, None except Exception as e: self.db.rollback() return False, f"重設密碼失敗:{str(e)}" def delete_user(self, user_id): """ 刪除用戶(實際是停用) Args: user_id: 用戶 ID Returns: tuple: (success, error_message) """ user = self.get_user_by_id(user_id) if not user: return False, "用戶不存在" # 不允許刪除最後一個 admin if user.role == User.ROLE_ADMIN: admin_count = self.db.query(User).filter( User.role == User.ROLE_ADMIN, User.is_active == True ).count() if admin_count <= 1: return False, "無法刪除:系統必須保留至少一個管理員帳號" try: user.is_active = False user.updated_at = datetime.now() self.db.commit() return True, None except Exception as e: self.db.rollback() return False, f"刪除用戶失敗:{str(e)}" # ========================================== # 用戶認證 # ========================================== def authenticate(self, username, password): """ 驗證用戶登入 Args: username: 帳號名稱 password: 密碼 Returns: tuple: (user, error_message, need_password_change) - user: User 物件(成功時)或 None(失敗時) - error_message: 錯誤訊息(成功時為 None) - need_password_change: 是否需要變更密碼 """ user = self.get_user_by_username(username) if not user: return None, "帳號或密碼錯誤", False if not user.is_active: return None, "帳號已停用,請聯繫管理員", False if not verify_password(user.password_hash, password): return None, "帳號或密碼錯誤", False # 檢查密碼是否過期 is_expired, days_until = is_password_expired(user.password_changed_at) return user, None, is_expired # ========================================== # 登入歷史記錄 # ========================================== def record_login(self, user_id, username_attempted, ip_address, user_agent, status, failure_reason=None): """ 記錄登入歷史 Args: user_id: 用戶 ID (可為 None,用於記錄失敗的登入嘗試) username_attempted: 嘗試登入的帳號名稱 ip_address: IP 位址 user_agent: User-Agent 字串 status: 狀態 (success/failed/locked) failure_reason: 失敗原因 Returns: LoginHistory 物件 """ try: history = LoginHistory( user_id=user_id, username_attempted=username_attempted, login_time=datetime.now(), ip_address=ip_address, user_agent=user_agent[:256] if user_agent else None, # 限制長度 status=status, failure_reason=failure_reason ) self.db.add(history) self.db.commit() return history except Exception as e: self.db.rollback() print(f"⚠️ 記錄登入歷史失敗: {e}") return None def get_login_history(self, user_id=None, limit=100): """ 取得登入歷史記錄 Args: user_id: 用戶 ID (可選,None 表示取得所有用戶) limit: 最大筆數 Returns: list[LoginHistory] """ query = self.db.query(LoginHistory) if user_id: query = query.filter(LoginHistory.user_id == user_id) return query.order_by(LoginHistory.login_time.desc()).limit(limit).all() def get_recent_failed_logins(self, ip_address=None, minutes=30): """ 取得最近的登入失敗記錄 Args: ip_address: IP 位址 (可選) minutes: 時間範圍(分鐘) Returns: list[LoginHistory] """ from datetime import timedelta since = datetime.now() - timedelta(minutes=minutes) query = self.db.query(LoginHistory).filter( LoginHistory.login_time >= since, LoginHistory.status == LoginHistory.STATUS_FAILED ) if ip_address: query = query.filter(LoginHistory.ip_address == ip_address) return query.order_by(LoginHistory.login_time.desc()).all() def create_initial_admin(db_session, password=None): """ 建立初始管理員帳號 Args: db_session: SQLAlchemy Session password: 初始密碼(若不傳,從 INITIAL_ADMIN_PASSWORD 環境變數讀取) Returns: tuple: (success, message) """ import os if password is None: password = os.getenv('INITIAL_ADMIN_PASSWORD') if not password: raise RuntimeError( "INITIAL_ADMIN_PASSWORD 環境變數未設定,無法建立初始管理員。" "請在 .env 或 Docker 環境設定此值。" ) service = UserService(db_session) # 檢查是否已有管理員 existing_admin = db_session.query(User).filter(User.role == User.ROLE_ADMIN).first() if existing_admin: return False, f"已存在管理員帳號:{existing_admin.username}" # 建立預設管理員 user, error = service.create_user( username='admin', password=password, role=User.ROLE_ADMIN, display_name='系統管理員', ) if error: return False, f"建立管理員失敗:{error}" return True, f"管理員帳號 'admin' 建立成功" print("✅ User service 已載入")