"""安全相關工具:SQL injection 防護、路徑遍歷防護、檔案上傳驗證。 從 app.py + utils/validators.py 整併的單一權威來源。純驗證邏輯,無 Flask 依賴。 舊的 utils/validators.py 已 deprecate,僅保留 re-export 不破壞既有 import。 """ import os import re import unicodedata from pathlib import Path import pandas as pd from utils.logger_manager import SystemLogger _log = SystemLogger("Security").get_logger() # ──────────────────────────────────────────────────────────────────────── # SQL Injection 防護 # ──────────────────────────────────────────────────────────────────────── # 整合 app.py 與 utils/validators.py 的兩份 ALLOWED_TABLES(取聯集,避免破壞既有呼叫者) ALLOWED_TABLES = { 'realtime_sales_monthly', 'realtime_sales_daily', 'daily_sales_snapshot', 'monthly_summary_analysis', 'products', 'price_records', 'promo_products', 'edm_products', 'festival_products', } _SQL_KEYWORDS = ('SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE', 'ALTER', 'UNION', 'WHERE', 'FROM') def validate_table_name(table_name): """驗證資料表名稱,防止 SQL Injection。""" table_name = str(table_name).strip() if not table_name: raise ValueError("表名不能為空") if not re.match(r'^[a-zA-Z0-9_]+$', table_name): raise ValueError(f"表名包含非法字符: {table_name}") if table_name not in ALLOWED_TABLES: _log.warning(f"[Security] 表名不在白名單中: {table_name}") if any(keyword in table_name.upper() for keyword in _SQL_KEYWORDS): raise ValueError(f"表名包含 SQL 關鍵字: {table_name}") return table_name def validate_column_names(column_names): """驗證欄位名稱列表,防止 SQL Injection。""" if isinstance(column_names, str): column_names = [column_names] validated = [] for col in column_names: col = str(col).strip() if not re.match(r'^[\w一-鿿]+$', col): raise ValueError(f"欄位名稱包含非法字符: {col}") validated.append(col) return validated def safe_read_sql(table_name, columns=None, engine=None, where_clause=None, limit=None, params=None): """安全的 SQL 查詢函數,防止 SQL Injection。 Args: table_name: 資料表名稱(必驗證白名單) columns: 欄位列表,None 表示 * engine: SQLAlchemy engine where_clause: WHERE 子句(呼叫端負責安全) limit: 限制筆數(自動轉 int) params: 參數化查詢的參數字典 """ from sqlalchemy import text table_name = validate_table_name(table_name) if columns: columns = validate_column_names(columns) col_str = ', '.join([f'"{col}"' for col in columns]) else: col_str = '*' try: query = f'SELECT {col_str} FROM "{table_name}"' if where_clause: query += f' WHERE {where_clause}' if limit: query += f' LIMIT {int(limit)}' return pd.read_sql(text(query), engine, params=params) except Exception as e: _log.error(f"[Security] SQL 查詢失敗: {e}") raise # ──────────────────────────────────────────────────────────────────────── # 路徑遍歷防護 # ──────────────────────────────────────────────────────────────────────── def safe_join(base, *paths): """安全的路徑拼接,防止路徑遍歷攻擊。""" base = Path(base).resolve() for path_component in paths: path_str = str(path_component) if '\\' in path_str: _log.warning(f"[Security] 偵測到路徑遍歷嘗試 (Windows 反斜線) | Base: {base} | Requested: {paths}") raise ValueError("路徑遍歷偵測: 不允許使用反斜線") if '..' in path_str.replace('\\', '/'): _log.warning(f"[Security] 偵測到路徑遍歷嘗試 (雙點) | Base: {base} | Requested: {paths}") raise ValueError("路徑遍歷偵測: 不允許使用 '..'") full_path = (base / Path(*paths)).resolve() try: full_path.relative_to(base) except ValueError: _log.warning(f"[Security] 偵測到路徑遍歷嘗試 | Base: {base} | Requested: {paths}") raise ValueError("路徑遍歷偵測: 不允許存取基礎目錄外的檔案") return full_path # ──────────────────────────────────────────────────────────────────────── # 檔案上傳安全驗證 # ──────────────────────────────────────────────────────────────────────── ALLOWED_UPLOAD_EXTENSIONS = {'xlsx', 'xls', 'csv'} ALLOWED_MIME_TYPES = { 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # .xlsx 'application/vnd.ms-excel', # .xls 'text/csv', # .csv 'application/octet-stream', # CSV sometimes detected as this } def secure_filename_unicode(filename): """支援中文的安全檔案名稱清理。""" filename = unicodedata.normalize('NFKC', filename) safe_chars = re.sub(r'[^一-龥a-zA-Z0-9\s\(\)_\-\.]', '', filename) safe_chars = re.sub(r'\s+', ' ', safe_chars) return safe_chars.strip() def allowed_file(filename): """檢查檔案副檔名是否在白名單中。""" if not filename or '.' not in filename: return False parts = filename.rsplit('.', 1) if len(parts) != 2: return False basename, ext = parts if not basename or basename.strip() == '': return False return ext.lower() in ALLOWED_UPLOAD_EXTENSIONS def validate_upload_file(file): """完整的檔案上傳驗證(副檔名、檔案名稱清理)。""" if not file or file.filename == '': return False, '未選擇檔案', None original_filename = file.filename if '..' in original_filename: _log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(雙點): {original_filename}") return False, '檔案名稱包含非法字元', None if os.path.sep in original_filename or (os.path.altsep and os.path.altsep in original_filename): if original_filename.startswith(('/', '\\')) or './' in original_filename or '.\\' in original_filename: _log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(路徑分隔符): {original_filename}") return False, '檔案名稱包含非法字元', None safe_name = secure_filename_unicode(original_filename) if not safe_name: return False, '檔案名稱不合法', None if not allowed_file(safe_name): return False, f'不支援的檔案格式,僅允許: {", ".join(ALLOWED_UPLOAD_EXTENSIONS)}', None return True, None, safe_name