Files
ewoooc/utils/security.py
ooo f7a5f8505f
All checks were successful
CD Pipeline / deploy (push) Successful in 1m8s
refactor(p1-01a): app.py 安全工具抽到 utils/security.py
從 app.py 抽出純驗證邏輯 (~180 行) 到 utils/security.py:
- ALLOWED_TABLES 白名單常數
- validate_table_name / validate_column_names (SQL injection 防護)
- safe_join (路徑遍歷防護)
- ALLOWED_UPLOAD_EXTENSIONS / ALLOWED_MIME_TYPES
- secure_filename_unicode / allowed_file / validate_upload_file (上傳驗證)

app.py 保留 from utils.security import * 維持 backward compat,
讓 tests/test_path_traversal.py、tests/test_sql_security.py、
tests/test_file_upload.py 不需修改即可繼續使用 from app import xxx。

行數變化: app.py 7,386 → 7,206 (-180)
2026-04-28 15:42:44 +08:00

157 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""安全相關工具SQL injection 防護、路徑遍歷防護、檔案上傳驗證。
從 app.py 抽出,純驗證邏輯,無 Flask 依賴。
"""
import os
import re
import unicodedata
from pathlib import Path
from utils.logger_manager import SystemLogger
_log = SystemLogger("Security").get_logger()
# ────────────────────────────────────────────────────────────────────────
# SQL Injection 防護
# ────────────────────────────────────────────────────────────────────────
ALLOWED_TABLES = {
'realtime_sales_monthly',
'daily_sales_snapshot',
'products',
'price_records',
'promo_products',
'edm_products',
'festival_products',
}
_SQL_KEYWORDS = ('SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE', 'ALTER', 'UNION', 'WHERE', 'FROM')
def validate_table_name(table_name):
"""驗證資料表名稱,防止 SQL Injection。"""
table_name = str(table_name).strip()
if not table_name:
raise ValueError("表名不能為空")
if not re.match(r'^[a-zA-Z0-9_]+$', table_name):
raise ValueError(f"表名包含非法字符: {table_name}")
if table_name not in ALLOWED_TABLES:
_log.warning(f"[Security] 表名不在白名單中: {table_name}")
if any(keyword in table_name.upper() for keyword in _SQL_KEYWORDS):
raise ValueError(f"表名包含 SQL 關鍵字: {table_name}")
return table_name
def validate_column_names(column_names):
"""驗證欄位名稱列表,防止 SQL Injection。"""
if isinstance(column_names, str):
column_names = [column_names]
validated = []
for col in column_names:
col = str(col).strip()
if not re.match(r'^[\w一-鿿]+$', col):
raise ValueError(f"欄位名稱包含非法字符: {col}")
validated.append(col)
return validated
# ────────────────────────────────────────────────────────────────────────
# 路徑遍歷防護
# ────────────────────────────────────────────────────────────────────────
def safe_join(base, *paths):
"""安全的路徑拼接,防止路徑遍歷攻擊。"""
base = Path(base).resolve()
for path_component in paths:
path_str = str(path_component)
if '\\' in path_str:
_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (Windows 反斜線) | Base: {base} | Requested: {paths}")
raise ValueError("路徑遍歷偵測: 不允許使用反斜線")
if '..' in path_str.replace('\\', '/'):
_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (雙點) | Base: {base} | Requested: {paths}")
raise ValueError("路徑遍歷偵測: 不允許使用 '..'")
full_path = (base / Path(*paths)).resolve()
try:
full_path.relative_to(base)
except ValueError:
_log.warning(f"[Security] 偵測到路徑遍歷嘗試 | Base: {base} | Requested: {paths}")
raise ValueError("路徑遍歷偵測: 不允許存取基礎目錄外的檔案")
return full_path
# ────────────────────────────────────────────────────────────────────────
# 檔案上傳安全驗證
# ────────────────────────────────────────────────────────────────────────
ALLOWED_UPLOAD_EXTENSIONS = {'xlsx', 'xls', 'csv'}
ALLOWED_MIME_TYPES = {
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', # .xlsx
'application/vnd.ms-excel', # .xls
'text/csv', # .csv
'application/octet-stream', # CSV sometimes detected as this
}
def secure_filename_unicode(filename):
"""支援中文的安全檔案名稱清理。"""
filename = unicodedata.normalize('NFKC', filename)
safe_chars = re.sub(r'[^一-龥a-zA-Z0-9\s\(\)_\-\.]', '', filename)
safe_chars = re.sub(r'\s+', ' ', safe_chars)
return safe_chars.strip()
def allowed_file(filename):
"""檢查檔案副檔名是否在白名單中。"""
if not filename or '.' not in filename:
return False
parts = filename.rsplit('.', 1)
if len(parts) != 2:
return False
basename, ext = parts
if not basename or basename.strip() == '':
return False
return ext.lower() in ALLOWED_UPLOAD_EXTENSIONS
def validate_upload_file(file):
"""完整的檔案上傳驗證(副檔名、檔案名稱清理)。"""
if not file or file.filename == '':
return False, '未選擇檔案', None
original_filename = file.filename
if '..' in original_filename:
_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(雙點): {original_filename}")
return False, '檔案名稱包含非法字元', None
if os.path.sep in original_filename or (os.path.altsep and os.path.altsep in original_filename):
if original_filename.startswith(('/', '\\')) or './' in original_filename or '.\\' in original_filename:
_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(路徑分隔符): {original_filename}")
return False, '檔案名稱包含非法字元', None
safe_name = secure_filename_unicode(original_filename)
if not safe_name:
return False, '檔案名稱不合法', None
if not allowed_file(safe_name):
return False, f'不支援的檔案格式,僅允許: {", ".join(ALLOWED_UPLOAD_EXTENSIONS)}', None
return True, None, safe_name