diff --git a/app.py b/app.py index beed05b..a76154b 100644 --- a/app.py +++ b/app.py @@ -148,45 +148,9 @@ from utils.security import ( # noqa: E402 validate_column_names, ) -def safe_read_sql(table_name, columns=None, engine=None, where_clause=None): - """ - 安全的 SQL 查詢函數,防止 SQL Injection - - Args: - table_name: 資料表名稱 - columns: 欄位列表,None 表示 * - engine: SQLAlchemy engine - where_clause: WHERE 子句(僅支持簡單條件) - - Returns: - DataFrame: 查詢結果 - """ - from sqlalchemy import text, MetaData, Table - - # 驗證表名 - table_name = validate_table_name(table_name) - - # 驗證欄位名 - if columns: - columns = validate_column_names(columns) - col_str = ', '.join([f'"{col}"' for col in columns]) - else: - col_str = '*' - - # 使用 SQLAlchemy 的參數化查詢 - # 注意:表名和欄位名不能參數化,所以必須先驗證 - try: - query = f'SELECT {col_str} FROM "{table_name}"' - if where_clause: - query += f' WHERE {where_clause}' - - return pd.read_sql(text(query), engine) - except Exception as e: - sys_log.error(f"[Security] SQL 查詢失敗: {e}") - raise - -# 安全工具:路徑遍歷 + 檔案上傳驗證已搬至 utils/security.py +# 安全工具:路徑遍歷 + 檔案上傳驗證 + safe_read_sql 已搬至 utils/security.py from utils.security import ( # noqa: E402 + safe_read_sql, safe_join, ALLOWED_UPLOAD_EXTENSIONS, ALLOWED_MIME_TYPES, diff --git a/utils/security.py b/utils/security.py index 654caf9..ae6a792 100644 --- a/utils/security.py +++ b/utils/security.py @@ -1,12 +1,15 @@ """安全相關工具:SQL injection 防護、路徑遍歷防護、檔案上傳驗證。 -從 app.py 抽出,純驗證邏輯,無 Flask 依賴。 +從 app.py + utils/validators.py 整併的單一權威來源。純驗證邏輯,無 Flask 依賴。 +舊的 utils/validators.py 已 deprecate,僅保留 re-export 不破壞既有 import。 """ import os import re import unicodedata from pathlib import Path +import pandas as pd + from utils.logger_manager import SystemLogger _log = SystemLogger("Security").get_logger() @@ -16,9 +19,12 @@ _log = SystemLogger("Security").get_logger() # SQL Injection 防護 # ──────────────────────────────────────────────────────────────────────── +# 整合 app.py 與 utils/validators.py 的兩份 ALLOWED_TABLES(取聯集,避免破壞既有呼叫者) ALLOWED_TABLES = { 'realtime_sales_monthly', + 'realtime_sales_daily', 'daily_sales_snapshot', + 'monthly_summary_analysis', 'products', 'price_records', 'promo_products', @@ -62,6 +68,40 @@ def validate_column_names(column_names): return validated +def safe_read_sql(table_name, columns=None, engine=None, where_clause=None, limit=None, params=None): + """安全的 SQL 查詢函數,防止 SQL Injection。 + + Args: + table_name: 資料表名稱(必驗證白名單) + columns: 欄位列表,None 表示 * + engine: SQLAlchemy engine + where_clause: WHERE 子句(呼叫端負責安全) + limit: 限制筆數(自動轉 int) + params: 參數化查詢的參數字典 + """ + from sqlalchemy import text + + table_name = validate_table_name(table_name) + + if columns: + columns = validate_column_names(columns) + col_str = ', '.join([f'"{col}"' for col in columns]) + else: + col_str = '*' + + try: + query = f'SELECT {col_str} FROM "{table_name}"' + if where_clause: + query += f' WHERE {where_clause}' + if limit: + query += f' LIMIT {int(limit)}' + + return pd.read_sql(text(query), engine, params=params) + except Exception as e: + _log.error(f"[Security] SQL 查詢失敗: {e}") + raise + + # ──────────────────────────────────────────────────────────────────────── # 路徑遍歷防護 # ──────────────────────────────────────────────────────────────────────── diff --git a/utils/validators.py b/utils/validators.py index 5d1dd2e..858b7d0 100644 --- a/utils/validators.py +++ b/utils/validators.py @@ -1,253 +1,19 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -驗證函數模組 -提供安全驗證、路徑防護、檔案上傳驗證等功能 +"""DEPRECATED — 完整實作已搬至 utils/security.py。 + +此檔案僅保留為 backward-compat re-export,避免破壞既有 import。 +新程式碼請直接 `from utils.security import ...`。 """ -import re -import os -import unicodedata -from pathlib import Path -import pandas as pd -from services.logger_manager import SystemLogger - -sys_log = SystemLogger("Validators").get_logger() - -# ========================================== -# SQL Injection 防護 -# ========================================== - -# 允許的資料表白名單 -ALLOWED_TABLES = { - 'products', 'price_records', 'promo_products', - 'monthly_summary_analysis', 'realtime_sales_monthly', - 'realtime_sales_daily', 'daily_sales_snapshot', - 'festival_products' -} - - -def validate_table_name(table_name): - """ - 驗證資料表名稱,防止 SQL Injection - - Args: - table_name: 要驗證的資料表名稱 - - Returns: - str: 驗證通過的表名 - - Raises: - ValueError: 表名不在白名單中 - """ - table_name = str(table_name).strip() - - if not table_name: - raise ValueError("表名不能為空") - - if not re.match(r'^[a-zA-Z0-9_]+$', table_name): - raise ValueError(f"表名包含非法字符: {table_name}") - - if table_name not in ALLOWED_TABLES: - sys_log.warning(f"[Security] 表名不在白名單中: {table_name}") - sql_keywords = ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE', 'ALTER', 'UNION', 'WHERE', 'FROM'] - if any(keyword in table_name.upper() for keyword in sql_keywords): - raise ValueError(f"表名包含 SQL 關鍵字: {table_name}") - - return table_name - - -def validate_column_names(column_names): - """ - 驗證欄位名稱列表,防止 SQL Injection - - Args: - column_names: 欄位名稱列表 - - Returns: - list: 驗證通過的欄位名稱列表 - - Raises: - ValueError: 欄位名稱包含非法字符 - """ - if isinstance(column_names, str): - column_names = [column_names] - - validated = [] - for col in column_names: - col = str(col).strip() - if not re.match(r'^[\w\u4e00-\u9fff]+$', col): - raise ValueError(f"欄位名稱包含非法字符: {col}") - validated.append(col) - - return validated - - -def safe_read_sql(table_name, columns=None, engine=None, where_clause=None, limit=None, params=None): - """ - 安全的 SQL 查詢函數,防止 SQL Injection - - Args: - table_name: 資料表名稱 - columns: 欄位列表,None 表示 * - engine: SQLAlchemy engine - where_clause: WHERE 子句 - limit: 限制筆數 - params: 參數化查詢的參數字典 - - Returns: - DataFrame: 查詢結果 - """ - from sqlalchemy import text - - table_name = validate_table_name(table_name) - - if columns: - columns = validate_column_names(columns) - col_str = ', '.join([f'"{col}"' for col in columns]) - else: - col_str = '*' - - try: - query = f'SELECT {col_str} FROM "{table_name}"' - if where_clause: - query += f' WHERE {where_clause}' - - if limit: - query += f' LIMIT {int(limit)}' - - return pd.read_sql(text(query), engine, params=params) - except Exception as e: - sys_log.error(f"[Security] SQL 查詢失敗: {e}") - raise - - -# ========================================== -# 路徑遍歷防護 -# ========================================== - -def safe_join(base, *paths): - """ - 安全的路徑拼接,防止路徑遍歷攻擊 - - Args: - base: 基礎目錄(絕對路徑) - *paths: 子路徑組件 - - Returns: - Path: 安全的完整路徑 - - Raises: - ValueError: 偵測到路徑遍歷嘗試 - """ - base = Path(base).resolve() - - for path_component in paths: - path_str = str(path_component) - - if '\\' in path_str: - sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (Windows 反斜線) | Base: {base} | Requested: {paths}") - raise ValueError(f"路徑遍歷偵測: 不允許使用反斜線") - - if '..' in path_str.replace('\\', '/'): - sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (雙點) | Base: {base} | Requested: {paths}") - raise ValueError(f"路徑遍歷偵測: 不允許使用 '..'") - - full_path = (base / Path(*paths)).resolve() - - try: - full_path.relative_to(base) - except ValueError: - sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 | Base: {base} | Requested: {paths}") - raise ValueError(f"路徑遍歷偵測: 不允許存取基礎目錄外的檔案") - - return full_path - - -# ========================================== -# 檔案上傳安全驗證 -# ========================================== - -ALLOWED_UPLOAD_EXTENSIONS = {'xlsx', 'xls', 'csv'} -ALLOWED_MIME_TYPES = { - 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', - 'application/vnd.ms-excel', - 'text/csv', - 'application/octet-stream' -} - - -def secure_filename_unicode(filename): - """ - 支援中文的安全檔案名稱清理 - - Args: - filename: 原始檔案名稱 - - Returns: - str: 清理後的安全檔案名稱 - """ - filename = unicodedata.normalize('NFKC', filename) - safe_chars = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s\(\)_\-\.]', '', filename) - safe_chars = re.sub(r'\s+', ' ', safe_chars) - safe_chars = safe_chars.strip() - return safe_chars - - -def allowed_file(filename): - """ - 檢查檔案副檔名是否在白名單中 - - Args: - filename: 檔案名稱 - - Returns: - bool: 是否允許上傳 - """ - if not filename or '.' not in filename: - return False - - parts = filename.rsplit('.', 1) - if len(parts) != 2: - return False - - basename, ext = parts - - if not basename or basename.strip() == '': - return False - - return ext.lower() in ALLOWED_UPLOAD_EXTENSIONS - - -def validate_upload_file(file): - """ - 完整的檔案上傳驗證(副檔名、檔案名稱清理) - - Args: - file: Flask request.files 物件 - - Returns: - tuple: (is_valid, error_message, safe_filename) - """ - if not file or file.filename == '': - return False, '未選擇檔案', None - - original_filename = file.filename - - if '..' in original_filename: - sys_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(雙點): {original_filename}") - return False, '檔案名稱包含非法字元', None - - if os.path.sep in original_filename or (os.path.altsep and os.path.altsep in original_filename): - if original_filename.startswith(('/', '\\')) or './' in original_filename or '.\\' in original_filename: - sys_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(路徑分隔符): {original_filename}") - return False, '檔案名稱包含非法字元', None - - safe_name = secure_filename_unicode(original_filename) - if not safe_name: - return False, '檔案名稱不合法', None - - if not allowed_file(safe_name): - return False, f'不支援的檔案格式,僅允許: {", ".join(ALLOWED_UPLOAD_EXTENSIONS)}', None - - return True, None, safe_name +# ruff: noqa: F401, F403 +from utils.security import ( + ALLOWED_TABLES, + ALLOWED_UPLOAD_EXTENSIONS, + ALLOWED_MIME_TYPES, + validate_table_name, + validate_column_names, + safe_read_sql, + safe_join, + secure_filename_unicode, + allowed_file, + validate_upload_file, +)