All checks were successful
CD Pipeline / deploy (push) Successful in 1m6s
發現 utils/validators.py 已存在且完整重複 utils/security.py 的 9 個函數。 不收拾的話會繼續腐爛 — 立刻整併為單一權威來源。 變更: - utils/security.py 增加 safe_read_sql(取自 validators.py 較完整版本,含 limit + params) - utils/security.py ALLOWED_TABLES 取兩份聯集(補上 monthly_summary_analysis, realtime_sales_daily),避免破壞既有呼叫者 - utils/validators.py 改為純 re-export shim(保 from utils.validators import 不破) - app.py 移除原 safe_read_sql 重複定義(35 行),改 import utils.security routes/import_routes.py 不變(它 from utils.validators 走得到 re-export,等下輪統一)。 行數變化: app.py 7,187 → 7,151 (-36)
197 lines
7.4 KiB
Python
197 lines
7.4 KiB
Python
"""安全相關工具:SQL injection 防護、路徑遍歷防護、檔案上傳驗證。
|
||
|
||
從 app.py + utils/validators.py 整併的單一權威來源。純驗證邏輯,無 Flask 依賴。
|
||
舊的 utils/validators.py 已 deprecate,僅保留 re-export 不破壞既有 import。
|
||
"""
|
||
import os
|
||
import re
|
||
import unicodedata
|
||
from pathlib import Path
|
||
|
||
import pandas as pd
|
||
|
||
from utils.logger_manager import SystemLogger
|
||
|
||
_log = SystemLogger("Security").get_logger()
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────────────
|
||
# SQL Injection 防護
|
||
# ────────────────────────────────────────────────────────────────────────
|
||
|
||
# 整合 app.py 與 utils/validators.py 的兩份 ALLOWED_TABLES(取聯集,避免破壞既有呼叫者)
|
||
ALLOWED_TABLES = {
|
||
'realtime_sales_monthly',
|
||
'realtime_sales_daily',
|
||
'daily_sales_snapshot',
|
||
'monthly_summary_analysis',
|
||
'products',
|
||
'price_records',
|
||
'promo_products',
|
||
'edm_products',
|
||
'festival_products',
|
||
}
|
||
|
||
_SQL_KEYWORDS = ('SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE', 'ALTER', 'UNION', 'WHERE', 'FROM')
|
||
|
||
|
||
def validate_table_name(table_name):
|
||
"""驗證資料表名稱,防止 SQL Injection。"""
|
||
table_name = str(table_name).strip()
|
||
|
||
if not table_name:
|
||
raise ValueError("表名不能為空")
|
||
|
||
if not re.match(r'^[a-zA-Z0-9_]+$', table_name):
|
||
raise ValueError(f"表名包含非法字符: {table_name}")
|
||
|
||
if table_name not in ALLOWED_TABLES:
|
||
_log.warning(f"[Security] 表名不在白名單中: {table_name}")
|
||
if any(keyword in table_name.upper() for keyword in _SQL_KEYWORDS):
|
||
raise ValueError(f"表名包含 SQL 關鍵字: {table_name}")
|
||
|
||
return table_name
|
||
|
||
|
||
def validate_column_names(column_names):
|
||
"""驗證欄位名稱列表,防止 SQL Injection。"""
|
||
if isinstance(column_names, str):
|
||
column_names = [column_names]
|
||
|
||
validated = []
|
||
for col in column_names:
|
||
col = str(col).strip()
|
||
if not re.match(r'^[\w一-鿿]+$', col):
|
||
raise ValueError(f"欄位名稱包含非法字符: {col}")
|
||
validated.append(col)
|
||
|
||
return validated
|
||
|
||
|
||
def safe_read_sql(table_name, columns=None, engine=None, where_clause=None, limit=None, params=None):
|
||
"""安全的 SQL 查詢函數,防止 SQL Injection。
|
||
|
||
Args:
|
||
table_name: 資料表名稱(必驗證白名單)
|
||
columns: 欄位列表,None 表示 *
|
||
engine: SQLAlchemy engine
|
||
where_clause: WHERE 子句(呼叫端負責安全)
|
||
limit: 限制筆數(自動轉 int)
|
||
params: 參數化查詢的參數字典
|
||
"""
|
||
from sqlalchemy import text
|
||
|
||
table_name = validate_table_name(table_name)
|
||
|
||
if columns:
|
||
columns = validate_column_names(columns)
|
||
col_str = ', '.join([f'"{col}"' for col in columns])
|
||
else:
|
||
col_str = '*'
|
||
|
||
try:
|
||
query = f'SELECT {col_str} FROM "{table_name}"'
|
||
if where_clause:
|
||
query += f' WHERE {where_clause}'
|
||
if limit:
|
||
query += f' LIMIT {int(limit)}'
|
||
|
||
return pd.read_sql(text(query), engine, params=params)
|
||
except Exception as e:
|
||
_log.error(f"[Security] SQL 查詢失敗: {e}")
|
||
raise
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────────────
|
||
# 路徑遍歷防護
|
||
# ────────────────────────────────────────────────────────────────────────
|
||
|
||
def safe_join(base, *paths):
|
||
"""安全的路徑拼接,防止路徑遍歷攻擊。"""
|
||
base = Path(base).resolve()
|
||
|
||
for path_component in paths:
|
||
path_str = str(path_component)
|
||
|
||
if '\\' in path_str:
|
||
_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (Windows 反斜線) | Base: {base} | Requested: {paths}")
|
||
raise ValueError("路徑遍歷偵測: 不允許使用反斜線")
|
||
|
||
if '..' in path_str.replace('\\', '/'):
|
||
_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (雙點) | Base: {base} | Requested: {paths}")
|
||
raise ValueError("路徑遍歷偵測: 不允許使用 '..'")
|
||
|
||
full_path = (base / Path(*paths)).resolve()
|
||
|
||
try:
|
||
full_path.relative_to(base)
|
||
except ValueError:
|
||
_log.warning(f"[Security] 偵測到路徑遍歷嘗試 | Base: {base} | Requested: {paths}")
|
||
raise ValueError("路徑遍歷偵測: 不允許存取基礎目錄外的檔案")
|
||
|
||
return full_path
|
||
|
||
|
||
# ────────────────────────────────────────────────────────────────────────
|
||
# 檔案上傳安全驗證
|
||
# ────────────────────────────────────────────────────────────────────────
|
||
|
||
ALLOWED_UPLOAD_EXTENSIONS = {'xlsx', 'xls', 'csv'}
|
||
|
||
ALLOWED_MIME_TYPES = {
|
||
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # .xlsx
|
||
'application/vnd.ms-excel', # .xls
|
||
'text/csv', # .csv
|
||
'application/octet-stream', # CSV sometimes detected as this
|
||
}
|
||
|
||
|
||
def secure_filename_unicode(filename):
|
||
"""支援中文的安全檔案名稱清理。"""
|
||
filename = unicodedata.normalize('NFKC', filename)
|
||
safe_chars = re.sub(r'[^一-龥a-zA-Z0-9\s\(\)_\-\.]', '', filename)
|
||
safe_chars = re.sub(r'\s+', ' ', safe_chars)
|
||
return safe_chars.strip()
|
||
|
||
|
||
def allowed_file(filename):
|
||
"""檢查檔案副檔名是否在白名單中。"""
|
||
if not filename or '.' not in filename:
|
||
return False
|
||
|
||
parts = filename.rsplit('.', 1)
|
||
if len(parts) != 2:
|
||
return False
|
||
|
||
basename, ext = parts
|
||
if not basename or basename.strip() == '':
|
||
return False
|
||
|
||
return ext.lower() in ALLOWED_UPLOAD_EXTENSIONS
|
||
|
||
|
||
def validate_upload_file(file):
|
||
"""完整的檔案上傳驗證(副檔名、檔案名稱清理)。"""
|
||
if not file or file.filename == '':
|
||
return False, '未選擇檔案', None
|
||
|
||
original_filename = file.filename
|
||
|
||
if '..' in original_filename:
|
||
_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(雙點): {original_filename}")
|
||
return False, '檔案名稱包含非法字元', None
|
||
|
||
if os.path.sep in original_filename or (os.path.altsep and os.path.altsep in original_filename):
|
||
if original_filename.startswith(('/', '\\')) or './' in original_filename or '.\\' in original_filename:
|
||
_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(路徑分隔符): {original_filename}")
|
||
return False, '檔案名稱包含非法字元', None
|
||
|
||
safe_name = secure_filename_unicode(original_filename)
|
||
if not safe_name:
|
||
return False, '檔案名稱不合法', None
|
||
|
||
if not allowed_file(safe_name):
|
||
return False, f'不支援的檔案格式,僅允許: {", ".join(ALLOWED_UPLOAD_EXTENSIONS)}', None
|
||
|
||
return True, None, safe_name
|