#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Validation helpers.

Provides SQL identifier validation, path-traversal protection and
upload-filename validation.
"""

import re
import os  # noqa: F401 - no longer used below; kept so the module namespace is unchanged
import unicodedata
from pathlib import Path

import pandas as pd

from services.logger_manager import SystemLogger

sys_log = SystemLogger("Validators").get_logger()

# ==========================================
# SQL injection protection
# ==========================================

# Whitelist of known table names.
ALLOWED_TABLES = {
    'products',
    'price_records',
    'promo_products',
    'monthly_summary_analysis',
    'realtime_sales_monthly',
    'realtime_sales_daily',
    'daily_sales_snapshot',
    'festival_products'
}

# Table names: ASCII letters, digits and underscore only.
_TABLE_NAME_RE = re.compile(r'[a-zA-Z0-9_]+')
# Column names: word characters (Unicode-aware) plus the CJK unified range.
_COLUMN_NAME_RE = re.compile(r'[\w\u4e00-\u9fff]+')
# Substrings that disqualify a table name that is not in ALLOWED_TABLES.
_SQL_KEYWORDS = ('SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP',
                 'CREATE', 'ALTER', 'UNION', 'WHERE', 'FROM')


def validate_table_name(table_name):
    """
    Validate a table name before it is interpolated into SQL.

    The value is converted to ``str`` and stripped.  It must be non-empty
    and consist only of ASCII letters, digits and underscores.  Names in
    ``ALLOWED_TABLES`` are accepted directly.  Any other name is logged as
    a security warning and is still accepted unless it contains one of the
    SQL keywords above as a case-insensitive substring.

    NOTE(review): the whitelist is advisory only - a name outside it is
    logged, not rejected.  Confirm whether strict enforcement is wanted.

    Args:
        table_name: Table name to validate.

    Returns:
        str: The stripped, validated table name.

    Raises:
        ValueError: The name is empty, contains a character outside
            ``[a-zA-Z0-9_]``, or is not whitelisted and contains an SQL
            keyword.
    """
    table_name = str(table_name).strip()

    if not table_name:
        raise ValueError("表名不能為空")

    # The name was stripped above, so fullmatch is equivalent to ^...$ here.
    if not _TABLE_NAME_RE.fullmatch(table_name):
        raise ValueError(f"表名包含非法字符: {table_name}")

    if table_name in ALLOWED_TABLES:
        return table_name

    sys_log.warning(f"[Security] 表名不在白名單中: {table_name}")
    upper_name = table_name.upper()
    if any(keyword in upper_name for keyword in _SQL_KEYWORDS):
        raise ValueError(f"表名包含 SQL 關鍵字: {table_name}")

    return table_name


def validate_column_names(column_names):
    """
    Validate column names before they are interpolated into SQL.

    Args:
        column_names: A single column name or an iterable of column names.
            Each item is converted to ``str`` and stripped.

    Returns:
        list: The stripped column names, in input order.

    Raises:
        ValueError: A name is empty or contains a character that is neither
            a word character nor in the range U+4E00-U+9FFF.
    """
    if isinstance(column_names, str):
        column_names = [column_names]

    validated = []
    for col in column_names:
        col = str(col).strip()
        if not _COLUMN_NAME_RE.fullmatch(col):
            raise ValueError(f"欄位名稱包含非法字符: {col}")
        validated.append(col)

    return validated


def safe_read_sql(table_name, columns=None, engine=None, where_clause=None, limit=None, params=None):
    """
    Run a ``SELECT`` with validated identifiers and return a DataFrame.

    Args:
        table_name: Table to read; checked with ``validate_table_name``.
        columns: Column name(s); checked with ``validate_column_names``.
            A falsy value selects ``*``.
        engine: Connection/engine handed to ``pandas.read_sql``.
        where_clause: SQL text appended after ``WHERE``.  It is inserted
            verbatim and is NOT validated here - pass only trusted SQL and
            supply values through ``params`` placeholders.
        limit: Row limit, converted with ``int()``.  A falsy value
            (``None`` or ``0``) adds no ``LIMIT`` clause.
        params: Bound parameters forwarded to ``pandas.read_sql``.

    Returns:
        DataFrame: The query result.

    Raises:
        ValueError: The table or a column name fails validation.
        Exception: Any error raised while building or running the query is
            logged and re-raised unchanged.
    """
    from sqlalchemy import text

    table_name = validate_table_name(table_name)

    if columns:
        columns = validate_column_names(columns)
        col_str = ', '.join(f'"{col}"' for col in columns)
    else:
        col_str = '*'

    try:
        query = f'SELECT {col_str} FROM "{table_name}"'

        if where_clause:
            query += f' WHERE {where_clause}'

        if limit:
            # int() guarantees that only a number reaches the SQL text.
            query += f' LIMIT {int(limit)}'

        return pd.read_sql(text(query), engine, params=params)

    except Exception as e:
        sys_log.error(f"[Security] SQL 查詢失敗: {e}")
        raise


# ==========================================
# Path traversal protection
# ==========================================

def safe_join(base, *paths):
    """
    Join ``paths`` onto ``base`` and refuse results outside ``base``.

    Args:
        base: Base directory; it is resolved before use.
        *paths: Path components to append.

    Returns:
        Path: The resolved path, located at or below the resolved base.

    Raises:
        ValueError: A component contains a backslash or ``..``, or the
            resolved result is not inside the resolved base directory.
    """
    base = Path(base).resolve()

    for path_component in paths:
        path_str = str(path_component)

        if '\\' in path_str:
            sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (Windows 反斜線) | Base: {base} | Requested: {paths}")
            raise ValueError("路徑遍歷偵測: 不允許使用反斜線")

        # Backslashes were rejected above, so a plain substring test is enough.
        if '..' in path_str:
            sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (雙點) | Base: {base} | Requested: {paths}")
            raise ValueError("路徑遍歷偵測: 不允許使用 '..'")

    full_path = (base / Path(*paths)).resolve()

    # Final guard: covers absolute components and symlinks that resolve
    # outside the base directory.
    try:
        full_path.relative_to(base)
    except ValueError:
        sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 | Base: {base} | Requested: {paths}")
        # "from None": do not expose the internal relative_to() error as context.
        raise ValueError("路徑遍歷偵測: 不允許存取基礎目錄外的檔案") from None

    return full_path


# ==========================================
# File upload validation
# ==========================================

ALLOWED_UPLOAD_EXTENSIONS = {'xlsx', 'xls', 'csv'}

ALLOWED_MIME_TYPES = {
    'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    'application/vnd.ms-excel',
    'text/csv',
    'application/octet-stream'
}

# Everything except CJK (U+4E00-U+9FA5), ASCII alphanumerics, whitespace and ()_-.
_UNSAFE_FILENAME_CHARS_RE = re.compile(r'[^\u4e00-\u9fa5a-zA-Z0-9\s\(\)_\-\.]')
_WHITESPACE_RUN_RE = re.compile(r'\s+')


def secure_filename_unicode(filename):
    """
    Sanitise a filename while keeping Chinese characters.

    The name is NFKC-normalised, every character outside the allowed set
    is removed, whitespace runs collapse to one space and the result is
    stripped.

    Args:
        filename: Original filename.

    Returns:
        str: The sanitised filename; may be empty.
    """
    filename = unicodedata.normalize('NFKC', filename)
    safe_chars = _UNSAFE_FILENAME_CHARS_RE.sub('', filename)
    safe_chars = _WHITESPACE_RUN_RE.sub(' ', safe_chars)
    return safe_chars.strip()


def allowed_file(filename):
    """
    Check whether the filename's extension is in the upload whitelist.

    Args:
        filename: Filename to check.

    Returns:
        bool: True when the text after the last dot is (case-insensitively)
        in ``ALLOWED_UPLOAD_EXTENSIONS`` and the part before it is not
        blank; False otherwise.
    """
    if not filename or '.' not in filename:
        return False

    # A dot is known to be present, so rsplit always yields two parts.
    basename, ext = filename.rsplit('.', 1)

    if not basename.strip():
        return False

    return ext.lower() in ALLOWED_UPLOAD_EXTENSIONS


def validate_upload_file(file):
    """
    Validate an uploaded file's name and return a sanitised version.

    Traversal checks run on both the raw filename and its NFKC-normalised
    form, because the sanitised name is built from the normalised form:
    characters such as U+2025 or fullwidth dots/slashes turn into ``..``,
    ``/`` or ``\\`` only after normalisation.  The separator check is the
    same on every platform, since the filename is chosen by the client.

    Args:
        file: Uploaded file object exposing a ``filename`` attribute
            (e.g. an entry of Flask's ``request.files``).

    Returns:
        tuple: ``(is_valid, error_message, safe_filename)``.  On success
        ``error_message`` is None; on failure ``safe_filename`` is None.
    """
    # ``not file.filename`` also covers a filename of None.
    if not file or not file.filename:
        return False, '未選擇檔案', None

    original_filename = file.filename
    candidates = (original_filename, unicodedata.normalize('NFKC', original_filename))

    if any('..' in name for name in candidates):
        sys_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(雙點): {original_filename}")
        return False, '檔案名稱包含非法字元', None

    if any(
        name.startswith(('/', '\\')) or './' in name or '.\\' in name
        for name in candidates
    ):
        sys_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(路徑分隔符): {original_filename}")
        return False, '檔案名稱包含非法字元', None

    safe_name = secure_filename_unicode(original_filename)

    if not safe_name:
        return False, '檔案名稱不合法', None

    if not allowed_file(safe_name):
        # sorted(): set order varies between runs; keep the message stable.
        return False, f'不支援的檔案格式,僅允許: {", ".join(sorted(ALLOWED_UPLOAD_EXTENSIONS))}', None

    return True, None, safe_name