Files
ewoooc/utils/security.py
ooo 17cb012be7
All checks were successful
CD Pipeline / deploy (push) Successful in 1m6s
refactor(p1-01c): 整併 utils/security 與 utils/validators 重複實作
發現 utils/validators.py 已存在且完整重複 utils/security.py 的 9 個函數。
不收拾的話會繼續腐爛 — 立刻整併為單一權威來源。

變更:
- utils/security.py 增加 safe_read_sql(取自 validators.py 較完整版本,含 limit + params)
- utils/security.py ALLOWED_TABLES 取兩份聯集(補上 monthly_summary_analysis,
  realtime_sales_daily),避免破壞既有呼叫者
- utils/validators.py 改為純 re-export shim(保 from utils.validators import 不破)
- app.py 移除原 safe_read_sql 重複定義(35 行),改 import utils.security

routes/import_routes.py 不變(它 from utils.validators 走得到 re-export,等下輪統一)。

行數變化: app.py 7,187 → 7,151 (-36)
2026-04-28 15:48:41 +08:00

197 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""安全相關工具SQL injection 防護、路徑遍歷防護、檔案上傳驗證。
從 app.py + utils/validators.py 整併的單一權威來源。純驗證邏輯,無 Flask 依賴。
舊的 utils/validators.py 已 deprecate僅保留 re-export 不破壞既有 import。
"""
import os
import re
import unicodedata
from pathlib import Path
import pandas as pd
from utils.logger_manager import SystemLogger
_log = SystemLogger("Security").get_logger()
# ────────────────────────────────────────────────────────────────────────
# SQL Injection 防護
# ────────────────────────────────────────────────────────────────────────
# 整合 app.py 與 utils/validators.py 的兩份 ALLOWED_TABLES取聯集避免破壞既有呼叫者
ALLOWED_TABLES = {
'realtime_sales_monthly',
'realtime_sales_daily',
'daily_sales_snapshot',
'monthly_summary_analysis',
'products',
'price_records',
'promo_products',
'edm_products',
'festival_products',
}
_SQL_KEYWORDS = ('SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE', 'ALTER', 'UNION', 'WHERE', 'FROM')
def validate_table_name(table_name):
"""驗證資料表名稱,防止 SQL Injection。"""
table_name = str(table_name).strip()
if not table_name:
raise ValueError("表名不能為空")
if not re.match(r'^[a-zA-Z0-9_]+$', table_name):
raise ValueError(f"表名包含非法字符: {table_name}")
if table_name not in ALLOWED_TABLES:
_log.warning(f"[Security] 表名不在白名單中: {table_name}")
if any(keyword in table_name.upper() for keyword in _SQL_KEYWORDS):
raise ValueError(f"表名包含 SQL 關鍵字: {table_name}")
return table_name
def validate_column_names(column_names):
"""驗證欄位名稱列表,防止 SQL Injection。"""
if isinstance(column_names, str):
column_names = [column_names]
validated = []
for col in column_names:
col = str(col).strip()
if not re.match(r'^[\w一-鿿]+$', col):
raise ValueError(f"欄位名稱包含非法字符: {col}")
validated.append(col)
return validated
def safe_read_sql(table_name, columns=None, engine=None, where_clause=None, limit=None, params=None):
"""安全的 SQL 查詢函數,防止 SQL Injection。
Args:
table_name: 資料表名稱(必驗證白名單)
columns: 欄位列表None 表示 *
engine: SQLAlchemy engine
where_clause: WHERE 子句(呼叫端負責安全)
limit: 限制筆數(自動轉 int
params: 參數化查詢的參數字典
"""
from sqlalchemy import text
table_name = validate_table_name(table_name)
if columns:
columns = validate_column_names(columns)
col_str = ', '.join([f'"{col}"' for col in columns])
else:
col_str = '*'
try:
query = f'SELECT {col_str} FROM "{table_name}"'
if where_clause:
query += f' WHERE {where_clause}'
if limit:
query += f' LIMIT {int(limit)}'
return pd.read_sql(text(query), engine, params=params)
except Exception as e:
_log.error(f"[Security] SQL 查詢失敗: {e}")
raise
# ────────────────────────────────────────────────────────────────────────
# 路徑遍歷防護
# ────────────────────────────────────────────────────────────────────────
def safe_join(base, *paths):
"""安全的路徑拼接,防止路徑遍歷攻擊。"""
base = Path(base).resolve()
for path_component in paths:
path_str = str(path_component)
if '\\' in path_str:
_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (Windows 反斜線) | Base: {base} | Requested: {paths}")
raise ValueError("路徑遍歷偵測: 不允許使用反斜線")
if '..' in path_str.replace('\\', '/'):
_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (雙點) | Base: {base} | Requested: {paths}")
raise ValueError("路徑遍歷偵測: 不允許使用 '..'")
full_path = (base / Path(*paths)).resolve()
try:
full_path.relative_to(base)
except ValueError:
_log.warning(f"[Security] 偵測到路徑遍歷嘗試 | Base: {base} | Requested: {paths}")
raise ValueError("路徑遍歷偵測: 不允許存取基礎目錄外的檔案")
return full_path
# ────────────────────────────────────────────────────────────────────────
# 檔案上傳安全驗證
# ────────────────────────────────────────────────────────────────────────
ALLOWED_UPLOAD_EXTENSIONS = {'xlsx', 'xls', 'csv'}
ALLOWED_MIME_TYPES = {
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # .xlsx
'application/vnd.ms-excel', # .xls
'text/csv', # .csv
'application/octet-stream', # CSV sometimes detected as this
}
def secure_filename_unicode(filename):
"""支援中文的安全檔案名稱清理。"""
filename = unicodedata.normalize('NFKC', filename)
safe_chars = re.sub(r'[^一-龥a-zA-Z0-9\s\(\)_\-\.]', '', filename)
safe_chars = re.sub(r'\s+', ' ', safe_chars)
return safe_chars.strip()
def allowed_file(filename):
"""檢查檔案副檔名是否在白名單中。"""
if not filename or '.' not in filename:
return False
parts = filename.rsplit('.', 1)
if len(parts) != 2:
return False
basename, ext = parts
if not basename or basename.strip() == '':
return False
return ext.lower() in ALLOWED_UPLOAD_EXTENSIONS
def validate_upload_file(file):
"""完整的檔案上傳驗證(副檔名、檔案名稱清理)。"""
if not file or file.filename == '':
return False, '未選擇檔案', None
original_filename = file.filename
if '..' in original_filename:
_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(雙點): {original_filename}")
return False, '檔案名稱包含非法字元', None
if os.path.sep in original_filename or (os.path.altsep and os.path.altsep in original_filename):
if original_filename.startswith(('/', '\\')) or './' in original_filename or '.\\' in original_filename:
_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(路徑分隔符): {original_filename}")
return False, '檔案名稱包含非法字元', None
safe_name = secure_filename_unicode(original_filename)
if not safe_name:
return False, '檔案名稱不合法', None
if not allowed_file(safe_name):
return False, f'不支援的檔案格式,僅允許: {", ".join(ALLOWED_UPLOAD_EXTENSIONS)}', None
return True, None, safe_name