refactor(p1-01c): 整併 utils/security 與 utils/validators 重複實作
All checks were successful
CD Pipeline / deploy (push) Successful in 1m6s

發現 utils/validators.py 已存在且完整重複 utils/security.py 的 9 個函數。
不收拾的話會繼續腐爛 — 立刻整併為單一權威來源。

變更:
- utils/security.py 增加 safe_read_sql(取自 validators.py 較完整版本,含 limit + params)
- utils/security.py ALLOWED_TABLES 取兩份聯集(補上 monthly_summary_analysis,
  realtime_sales_daily),避免破壞既有呼叫者
- utils/validators.py 改為純 re-export shim(保 from utils.validators import 不破)
- app.py 移除原 safe_read_sql 重複定義(35 行),改 import utils.security

routes/import_routes.py 不變(它 from utils.validators 走得到 re-export,等下輪統一)。

行數變化: app.py 7,187 → 7,151 (-36)
This commit is contained in:
ooo
2026-04-28 15:48:41 +08:00
parent 0a3f6cb22d
commit 17cb012be7
3 changed files with 60 additions and 290 deletions

40
app.py
View File

@@ -148,45 +148,9 @@ from utils.security import ( # noqa: E402
validate_column_names,
)
def safe_read_sql(table_name, columns=None, engine=None, where_clause=None):
    """Read rows from a validated table into a pandas object via ``pd.read_sql``.

    The table name and column names cannot be bound as SQL parameters, so
    they are passed through ``validate_table_name`` / ``validate_column_names``
    and double-quoted before being interpolated into the statement.

    Args:
        table_name: Name of the table to read; checked by ``validate_table_name``.
        columns: Column name or list of column names; a falsy value selects ``*``.
        engine: Connection/engine handed unchanged to ``pd.read_sql``.
        where_clause: Optional text appended verbatim after ``WHERE``.
            WARNING: this text is NOT validated or escaped here. It must be
            built by trusted code only and never contain user input.

    Returns:
        The result of ``pd.read_sql`` for the generated statement.

    Raises:
        Exception: Whatever the validators or ``pd.read_sql`` raise. Query
            failures are logged and then re-raised unchanged.
    """
    from sqlalchemy import text

    # Identifiers are validated first because they cannot be parameterised.
    table_name = validate_table_name(table_name)

    if columns:
        columns = validate_column_names(columns)
        col_str = ', '.join(f'"{col}"' for col in columns)
    else:
        col_str = '*'

    query = f'SELECT {col_str} FROM "{table_name}"'
    if where_clause:
        query += f' WHERE {where_clause}'

    # Only the database round-trip is guarded, so the log line really means
    # "the query failed" rather than "something above went wrong".
    try:
        return pd.read_sql(text(query), engine)
    except Exception as e:
        sys_log.error(f"[Security] SQL 查詢失敗: {e}")
        raise
# 安全工具:路徑遍歷 + 檔案上傳驗證已搬至 utils/security.py
# 安全工具:路徑遍歷 + 檔案上傳驗證 + safe_read_sql 已搬至 utils/security.py
from utils.security import ( # noqa: E402
safe_read_sql,
safe_join,
ALLOWED_UPLOAD_EXTENSIONS,
ALLOWED_MIME_TYPES,

View File

@@ -1,12 +1,15 @@
"""安全相關工具:SQL injection 防護、路徑遍歷防護、檔案上傳驗證。
從 app.py 抽出,純驗證邏輯,無 Flask 依賴。
從 app.py + utils/validators.py 整併的單一權威來源。純驗證邏輯,無 Flask 依賴。
舊的 utils/validators.py 已 deprecate,僅保留 re-export,以免破壞既有 import。
"""
import os
import re
import unicodedata
from pathlib import Path
import pandas as pd
from utils.logger_manager import SystemLogger
_log = SystemLogger("Security").get_logger()
@@ -16,9 +19,12 @@ _log = SystemLogger("Security").get_logger()
# SQL Injection 防護
# ────────────────────────────────────────────────────────────────────────
# 整合 app.py 與 utils/validators.py 的兩份 ALLOWED_TABLES,取聯集以避免破壞既有呼叫者
ALLOWED_TABLES = {
'realtime_sales_monthly',
'realtime_sales_daily',
'daily_sales_snapshot',
'monthly_summary_analysis',
'products',
'price_records',
'promo_products',
@@ -62,6 +68,40 @@ def validate_column_names(column_names):
return validated
def safe_read_sql(table_name, columns=None, engine=None, where_clause=None, limit=None, params=None):
    """Run a ``SELECT`` against a validated table and return ``pd.read_sql``'s result.

    Args:
        table_name: Table to read; passed through ``validate_table_name``.
        columns: Column name or list of names, passed through
            ``validate_column_names``; a falsy value selects ``*``.
        engine: Connection/engine handed unchanged to ``pd.read_sql``.
        where_clause: Text appended verbatim after ``WHERE``. It is not
            validated here, so the caller is responsible for its safety;
            use ``:name`` placeholders together with ``params`` for values.
        limit: Row cap, converted with ``int()``. Any falsy value
            (``None``, ``0``, ``""``) means "no LIMIT clause".
        params: Bind parameters forwarded to ``pd.read_sql``.

    Raises:
        Exception: Anything raised by the validators, by ``int(limit)`` or
            by ``pd.read_sql``. Failures inside the query step are logged
            and re-raised unchanged.
    """
    from sqlalchemy import text

    checked_table = validate_table_name(table_name)

    if columns:
        checked_columns = validate_column_names(columns)
        projection = ', '.join(f'"{name}"' for name in checked_columns)
    else:
        projection = '*'

    try:
        # Assemble the statement from its optional clauses, then join once.
        pieces = [f'SELECT {projection} FROM "{checked_table}"']
        if where_clause:
            pieces.append(f' WHERE {where_clause}')
        if limit:
            pieces.append(f' LIMIT {int(limit)}')
        statement = text(''.join(pieces))
        return pd.read_sql(statement, engine, params=params)
    except Exception as e:
        _log.error(f"[Security] SQL 查詢失敗: {e}")
        raise
# ────────────────────────────────────────────────────────────────────────
# 路徑遍歷防護
# ────────────────────────────────────────────────────────────────────────

View File

@@ -1,253 +1,19 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
驗證函數模組
提供安全驗證、路徑防護、檔案上傳驗證等功能
"""DEPRECATED — 完整實作已搬至 utils/security.py。
此檔案僅保留為 backward-compat re-export,避免破壞既有 import。
新程式碼請直接 `from utils.security import ...`。
"""
import re
import os
import unicodedata
from pathlib import Path
import pandas as pd
from services.logger_manager import SystemLogger
sys_log = SystemLogger("Validators").get_logger()
# ==========================================
# SQL Injection 防護
# ==========================================
# Whitelist of known table names consulted by validate_table_name.
# A name missing from this set is not rejected outright: validate_table_name
# logs a warning and then only refuses it if it contains an SQL keyword.
ALLOWED_TABLES = {
'products', 'price_records', 'promo_products',
'monthly_summary_analysis', 'realtime_sales_monthly',
'realtime_sales_daily', 'daily_sales_snapshot',
'festival_products'
}
def validate_table_name(table_name):
    """Check that a table name is safe to interpolate into SQL.

    The value is converted to ``str`` and stripped, then must be non-empty
    and consist only of ASCII letters, digits and underscores. A name found
    in ``ALLOWED_TABLES`` is accepted immediately. A name outside the
    whitelist is logged as a warning and is rejected only when it contains
    one of the SQL keywords listed below (case-insensitive substring match);
    otherwise it is still returned.

    Args:
        table_name: Candidate table name (any object; ``str()`` is applied).

    Returns:
        str: The stripped table name.

    Raises:
        ValueError: If the name is empty, contains a character outside
            ``[a-zA-Z0-9_]``, or is non-whitelisted and contains an SQL keyword.
    """
    candidate = str(table_name).strip()

    if candidate == '':
        raise ValueError("表名不能為空")

    if re.match(r'^[a-zA-Z0-9_]+$', candidate) is None:
        raise ValueError(f"表名包含非法字符: {candidate}")

    if candidate in ALLOWED_TABLES:
        return candidate

    # Unknown table: tolerated for backward compatibility, but logged and
    # screened for embedded SQL keywords.
    sys_log.warning(f"[Security] 表名不在白名單中: {candidate}")
    upper_name = candidate.upper()
    for keyword in ('SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE', 'ALTER', 'UNION', 'WHERE', 'FROM'):
        if keyword in upper_name:
            raise ValueError(f"表名包含 SQL 關鍵字: {candidate}")

    return candidate
def validate_column_names(column_names):
    """Validate column names before they are interpolated into SQL.

    Args:
        column_names: A single name (``str``) or an iterable of names. Each
            item is converted with ``str()`` and stripped of surrounding
            whitespace.

    Returns:
        list: The stripped names, in input order.

    Raises:
        ValueError: On the first name that is empty or contains anything
            other than word characters / CJK ideographs.
    """
    names = [column_names] if isinstance(column_names, str) else column_names

    def _clean(raw):
        # Normalise one entry and reject it if it is not a plain identifier.
        name = str(raw).strip()
        if re.match(r'^[\w\u4e00-\u9fff]+$', name) is None:
            raise ValueError(f"欄位名稱包含非法字符: {name}")
        return name

    return [_clean(raw) for raw in names]
def safe_read_sql(table_name, columns=None, engine=None, where_clause=None, limit=None, params=None):
    """Build and execute a ``SELECT`` on a validated table via ``pd.read_sql``.

    Args:
        table_name: Table to read; passed through ``validate_table_name``.
        columns: Column name or list of names, passed through
            ``validate_column_names``; a falsy value selects ``*``.
        engine: Connection/engine handed unchanged to ``pd.read_sql``.
        where_clause: Text appended verbatim after ``WHERE``; it is not
            validated here, so callers must never place untrusted input in it.
        limit: Row cap, converted with ``int()``; a falsy value adds no
            ``LIMIT`` clause.
        params: Bind parameters forwarded to ``pd.read_sql``.

    Returns:
        The result of ``pd.read_sql`` for the generated statement.

    Raises:
        Exception: Anything raised by the validators, ``int(limit)`` or
            ``pd.read_sql``; query-step failures are logged and re-raised.
    """
    from sqlalchemy import text

    checked_table = validate_table_name(table_name)

    projection = '*'
    if columns:
        quoted = [f'"{name}"' for name in validate_column_names(columns)]
        projection = ', '.join(quoted)

    try:
        where_part = f' WHERE {where_clause}' if where_clause else ''
        limit_part = f' LIMIT {int(limit)}' if limit else ''
        statement = f'SELECT {projection} FROM "{checked_table}"' + where_part + limit_part
        return pd.read_sql(text(statement), engine, params=params)
    except Exception as e:
        sys_log.error(f"[Security] SQL 查詢失敗: {e}")
        raise
# ==========================================
# 路徑遍歷防護
# ==========================================
def safe_join(base, *paths):
    """Join path components under ``base`` and refuse anything that escapes it.

    Args:
        base: Base directory; it is resolved before use.
        *paths: Sub-path components to append to ``base``.

    Returns:
        Path: The resolved path, guaranteed to be ``base`` or below it.

    Raises:
        ValueError: If any component contains a backslash or ``..``, or if
            the resolved result is not located under the resolved base.
    """
    root = Path(base).resolve()

    # Cheap textual screening first: reject obviously suspicious components
    # before touching the filesystem-aware resolution below.
    for component in paths:
        fragment = str(component)
        if '\\' in fragment:
            sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (Windows 反斜線) | Base: {root} | Requested: {paths}")
            raise ValueError("路徑遍歷偵測: 不允許使用反斜線")
        if '..' in fragment:
            sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (雙點) | Base: {root} | Requested: {paths}")
            raise ValueError("路徑遍歷偵測: 不允許使用 '..'")

    candidate = (root / Path(*paths)).resolve()

    # relative_to raises ValueError when candidate is not inside root
    # (for example when an absolute component replaced the base).
    try:
        candidate.relative_to(root)
    except ValueError:
        sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 | Base: {root} | Requested: {paths}")
        raise ValueError("路徑遍歷偵測: 不允許存取基礎目錄外的檔案")
    else:
        return candidate
# ==========================================
# 檔案上傳安全驗證
# ==========================================
# Lower-case file extensions (without the dot) that allowed_file accepts.
ALLOWED_UPLOAD_EXTENSIONS = {'xlsx', 'xls', 'csv'}
# MIME types regarded as acceptable for uploads. validate_upload_file as
# shown here does not consult this set.
# NOTE(review): 'application/octet-stream' is a generic type; presumably it is
# listed because some clients send it for spreadsheet files — confirm.
ALLOWED_MIME_TYPES = {
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
'application/vnd.ms-excel',
'text/csv',
'application/octet-stream'
}
def secure_filename_unicode(filename):
    """Sanitise a file name while keeping Chinese characters.

    The name is NFKC-normalised, every character other than CJK ideographs
    (U+4E00-U+9FA5), ASCII letters, digits, whitespace, parentheses,
    underscore, hyphen and dot is dropped, runs of whitespace collapse to a
    single space, and surrounding whitespace is trimmed.

    Args:
        filename: Original file name.

    Returns:
        str: The cleaned file name (possibly empty).
    """
    normalized = unicodedata.normalize('NFKC', filename)
    kept = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s\(\)_\-\.]', '', normalized)
    return re.sub(r'\s+', ' ', kept).strip()
def allowed_file(filename):
    """Tell whether a file name carries a whitelisted extension.

    Args:
        filename: File name to inspect.

    Returns:
        bool: ``True`` only when the name contains a dot, the part before
        the last dot is not blank, and the lower-cased part after it is in
        ``ALLOWED_UPLOAD_EXTENSIONS``.
    """
    if not filename or '.' not in filename:
        return False

    # Split on the last dot; a dot is guaranteed to exist at this point.
    stem, _, suffix = filename.rpartition('.')
    if not stem.strip():
        return False

    return suffix.lower() in ALLOWED_UPLOAD_EXTENSIONS
def validate_upload_file(file):
    """Validate an uploaded file's name and return a sanitised version of it.

    Only ``file.filename`` is read; the file content is not inspected.

    Args:
        file: Upload object exposing a ``filename`` attribute (for example
            an entry of Flask's ``request.files``).

    Returns:
        tuple: ``(is_valid, error_message, safe_filename)``. On success
        ``error_message`` is ``None``; on failure ``safe_filename`` is ``None``.
    """
    # A missing object, an empty name and a None name all mean "nothing was
    # selected". Checking truthiness avoids a TypeError on `'..' in None`.
    if not file or not file.filename:
        return False, '未選擇檔案', None

    original_filename = file.filename

    if '..' in original_filename:
        sys_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(雙點): {original_filename}")
        return False, '檔案名稱包含非法字元', None

    # Names containing a platform path separator are rejected only when they
    # look like a rooted or dot-relative path; other separators are removed
    # by secure_filename_unicode below.
    has_separator = os.path.sep in original_filename or (
        os.path.altsep and os.path.altsep in original_filename
    )
    if has_separator and (
        original_filename.startswith(('/', '\\'))
        or './' in original_filename
        or '.\\' in original_filename
    ):
        sys_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(路徑分隔符): {original_filename}")
        return False, '檔案名稱包含非法字元', None

    safe_name = secure_filename_unicode(original_filename)
    if not safe_name:
        return False, '檔案名稱不合法', None

    if not allowed_file(safe_name):
        # Sort so the message is identical on every run (set order is not).
        allowed = ", ".join(sorted(ALLOWED_UPLOAD_EXTENSIONS))
        return False, f'不支援的檔案格式,僅允許: {allowed}', None

    return True, None, safe_name
# ruff: noqa: F401, F403
from utils.security import (
ALLOWED_TABLES,
ALLOWED_UPLOAD_EXTENSIONS,
ALLOWED_MIME_TYPES,
validate_table_name,
validate_column_names,
safe_read_sql,
safe_join,
secure_filename_unicode,
allowed_file,
validate_upload_file,
)