Files
ewoooc/utils/validators.py
ogt 1b4f3a7bbe
Some checks failed
CD Pipeline / deploy (push) Failing after 59s
feat: EwoooC 初始化 — 完整專案推版至 Gitea
- 建立 Gitea Actions CD pipeline (.gitea/workflows/cd.yaml)
- 部署模式: rsync Python 檔案至 188 → docker restart (volume mount)
- Dockerfile/requirements 變動時自動重建 Docker image
- 部署通知: Telegram (開始/成功/失敗)
- 健康檢查: https://mo.wooo.work/health (最多 5 次重試)
- 同步最新 CLAUDE.md / ADR-008 / memory (2026-04-19)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-19 01:21:13 +08:00

254 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
驗證函數模組
提供安全驗證、路徑防護、檔案上傳驗證等功能
"""
import re
import os
import unicodedata
from pathlib import Path
import pandas as pd
from services.logger_manager import SystemLogger
sys_log = SystemLogger("Validators").get_logger()
# ==========================================
# SQL Injection 防護
# ==========================================
# Whitelist of table names that validate_table_name() recognises as known.
ALLOWED_TABLES = {
    'products',
    'price_records',
    'promo_products',
    'monthly_summary_analysis',
    'realtime_sales_monthly',
    'realtime_sales_daily',
    'daily_sales_snapshot',
    'festival_products',
}
def validate_table_name(table_name):
    """
    Sanitize a table name before it is interpolated into an SQL statement.

    The value is converted with ``str()`` and stripped. It must then be
    non-empty and consist only of ASCII letters, digits and underscores.
    A name that is missing from ``ALLOWED_TABLES`` is NOT rejected outright:
    a warning is logged and the name is additionally screened for SQL
    keywords appearing anywhere inside it (case-insensitive substring match).

    Args:
        table_name: Candidate table name; any object accepted by ``str()``.

    Returns:
        str: The stripped table name.

    Raises:
        ValueError: The name is empty, contains a character outside
            ``[a-zA-Z0-9_]``, or is not whitelisted and contains an SQL
            keyword as a substring.
    """
    name = str(table_name).strip()

    if name == '':
        raise ValueError("表名不能為空")
    if re.match(r'^[a-zA-Z0-9_]+$', name) is None:
        raise ValueError(f"表名包含非法字符: {name}")

    # Known tables need no further screening.
    if name in ALLOWED_TABLES:
        return name

    # NOTE(review): a whitelist miss only produces a warning; the name is
    # still returned unless it embeds one of the keywords below. Confirm
    # whether the whitelist is meant to be enforced strictly.
    sys_log.warning(f"[Security] 表名不在白名單中: {name}")
    upper_name = name.upper()
    for keyword in ('SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP',
                    'CREATE', 'ALTER', 'UNION', 'WHERE', 'FROM'):
        if keyword in upper_name:
            raise ValueError(f"表名包含 SQL 關鍵字: {name}")
    return name
def validate_column_names(column_names):
    """
    Validate column names before they are interpolated into SQL.

    A single string is treated as a one-element list. Every entry is
    converted with ``str()``, stripped, and must consist solely of word
    characters (``\\w``) or characters in the range U+4E00-U+9FFF.

    Args:
        column_names: One column name, or an iterable of column names.

    Returns:
        list: The stripped column names, in their original order.

    Raises:
        ValueError: An entry is empty or contains a disallowed character.
    """
    if isinstance(column_names, str):
        column_names = [column_names]

    def _checked(raw):
        # Normalise one entry and reject it if any character is disallowed.
        name = str(raw).strip()
        if re.match(r'^[\w\u4e00-\u9fff]+$', name) is None:
            raise ValueError(f"欄位名稱包含非法字符: {name}")
        return name

    return [_checked(entry) for entry in column_names]
def safe_read_sql(table_name, columns=None, engine=None, where_clause=None, limit=None, params=None):
    """
    Build a SELECT statement from validated identifiers and run it.

    The table name and column names go through ``validate_table_name`` and
    ``validate_column_names`` and are emitted inside double quotes. The
    statement is wrapped in ``sqlalchemy.text`` and executed through
    ``pandas.read_sql``.

    Args:
        table_name: Table to read from.
        columns: Column name or list of column names; a falsy value
            selects ``*``.
        engine: Connection/engine object handed to ``pandas.read_sql``.
        where_clause: Text appended after ``WHERE``. It is inserted into the
            statement verbatim, so it must never contain untrusted input;
            pass values through ``params`` as bound parameters instead.
        limit: Row limit, converted with ``int()``. A falsy value
            (``None`` or ``0``) adds no ``LIMIT`` clause.
        params: Bound-parameter values forwarded to ``pandas.read_sql``.

    Returns:
        DataFrame: The query result.

    Raises:
        ValueError: The table or column names fail validation.
        Exception: Any error raised while building or running the query is
            logged and re-raised unchanged.
    """
    from sqlalchemy import text

    table_name = validate_table_name(table_name)

    if columns:
        quoted = [f'"{name}"' for name in validate_column_names(columns)]
        selected = ', '.join(quoted)
    else:
        selected = '*'

    try:
        pieces = [f'SELECT {selected} FROM "{table_name}"']
        if where_clause:
            pieces.append(f' WHERE {where_clause}')
        if limit:
            pieces.append(f' LIMIT {int(limit)}')
        statement = text(''.join(pieces))
        return pd.read_sql(statement, engine, params=params)
    except Exception as exc:
        sys_log.error(f"[Security] SQL 查詢失敗: {exc}")
        raise
# ==========================================
# 路徑遍歷防護
# ==========================================
def safe_join(base, *paths):
    """
    Join path components under a base directory, refusing to escape it.

    Every component is screened first: any backslash or any ``..``
    substring is rejected. The joined path is then resolved and must still
    lie inside the resolved base directory.

    Args:
        base: Base directory; it is resolved to an absolute path.
        *paths: Path components to append below ``base``.

    Returns:
        Path: The resolved path inside ``base``.

    Raises:
        ValueError: A component contains a backslash or ``..``, or the
            resolved result falls outside ``base``.
    """
    base = Path(base).resolve()

    for component in paths:
        piece = str(component)
        if '\\' in piece:
            sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (Windows 反斜線) | Base: {base} | Requested: {paths}")
            raise ValueError("路徑遍歷偵測: 不允許使用反斜線")
        # Backslashes were rejected above, so a plain substring test suffices.
        if '..' in piece:
            sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (雙點) | Base: {base} | Requested: {paths}")
            raise ValueError("路徑遍歷偵測: 不允許使用 '..'")

    requested = Path(*paths)
    full_path = (base / requested).resolve()

    # relative_to() raises ValueError when full_path is not below base,
    # e.g. for an absolute component or a symlink pointing elsewhere.
    try:
        full_path.relative_to(base)
    except ValueError:
        sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 | Base: {base} | Requested: {paths}")
        raise ValueError("路徑遍歷偵測: 不允許存取基礎目錄外的檔案")
    else:
        return full_path
# ==========================================
# 檔案上傳安全驗證
# ==========================================
# File extensions (lower-case, without the dot) accepted for uploads.
ALLOWED_UPLOAD_EXTENSIONS = {
    'xlsx',
    'xls',
    'csv',
}

# MIME types associated with the accepted upload formats.
ALLOWED_MIME_TYPES = {
    'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    'application/vnd.ms-excel',
    'text/csv',
    'application/octet-stream',
}
def secure_filename_unicode(filename):
    """
    Clean a file name while keeping Chinese characters.

    The name is NFKC-normalised, then every character that is not a CJK
    ideograph (U+4E00-U+9FA5), ASCII letter, digit, whitespace, parenthesis,
    underscore, hyphen or dot is removed. Whitespace runs collapse to a
    single space and surrounding whitespace is trimmed. Path separators are
    among the removed characters.

    Args:
        filename: Original file name.

    Returns:
        str: The cleaned file name; may be empty if nothing survives.
    """
    normalized = unicodedata.normalize('NFKC', filename)
    kept = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s\(\)_\-\.]', '', normalized)
    return re.sub(r'\s+', ' ', kept).strip()
def allowed_file(filename):
    """
    Tell whether a file name carries a whitelisted extension.

    The extension is the text after the last dot and is compared
    case-insensitively against ``ALLOWED_UPLOAD_EXTENSIONS``. Names without
    a dot, and names whose part before the last dot is empty or only
    whitespace, are rejected.

    Args:
        filename: File name to check.

    Returns:
        bool: True when the upload is allowed.
    """
    if not filename or '.' not in filename:
        return False

    # Split on the LAST dot so "archive.tar.csv" is judged by "csv".
    stem, _, extension = filename.rpartition('.')
    if not stem.strip():
        return False

    return extension.lower() in ALLOWED_UPLOAD_EXTENSIONS
def validate_upload_file(file):
    """
    Validate an uploaded file's name and produce a sanitised file name.

    Checks, in order: a file with a non-empty name was supplied; the raw
    name contains no ``..``; the raw name does not start with a path
    separator and contains no ``./`` or ``.\\`` sequence; the name is still
    non-empty after ``secure_filename_unicode``; and the cleaned name passes
    ``allowed_file``. Only the file name is inspected, not the content.

    Args:
        file: Uploaded file object exposing a ``filename`` attribute
            (e.g. an entry of Flask ``request.files``).

    Returns:
        tuple: ``(is_valid, error_message, safe_filename)``. On success
        ``error_message`` is None; on failure ``safe_filename`` is None.
    """
    # A missing, empty or None file name all mean nothing was selected.
    if not file or not file.filename:
        return False, '未選擇檔案', None

    original_filename = file.filename

    if '..' in original_filename:
        sys_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試（雙點）: {original_filename}")
        return False, '檔案名稱包含非法字元', None

    # The name comes from the client, which may use either separator
    # regardless of the server's OS, so check both styles explicitly
    # instead of relying on os.path.sep / os.path.altsep.
    if (
        original_filename.startswith(('/', '\\'))
        or './' in original_filename
        or '.\\' in original_filename
    ):
        sys_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試（路徑分隔符）: {original_filename}")
        return False, '檔案名稱包含非法字元', None

    safe_name = secure_filename_unicode(original_filename)
    if not safe_name:
        return False, '檔案名稱不合法', None

    if not allowed_file(safe_name):
        # Sort so the message is stable; set iteration order is not.
        allowed = ", ".join(sorted(ALLOWED_UPLOAD_EXTENSIONS))
        return False, f'不支援的檔案格式，僅允許: {allowed}', None

    return True, None, safe_name