Some checks failed
CD Pipeline / deploy (push) Failing after 59s
- 建立 Gitea Actions CD pipeline (.gitea/workflows/cd.yaml) - 部署模式: rsync Python 檔案至 188 → docker restart (volume mount) - Dockerfile/requirements 變動時自動重建 Docker image - 部署通知: Telegram (開始/成功/失敗) - 健康檢查: https://mo.wooo.work/health (最多 5 次重試) - 同步最新 CLAUDE.md / ADR-008 / memory (2026-04-19) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
254 lines
7.0 KiB
Python
254 lines
7.0 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
驗證函數模組
|
||
提供安全驗證、路徑防護、檔案上傳驗證等功能
|
||
"""
|
||
|
||
import re
|
||
import os
|
||
import unicodedata
|
||
from pathlib import Path
|
||
import pandas as pd
|
||
from services.logger_manager import SystemLogger
|
||
|
||
sys_log = SystemLogger("Validators").get_logger()
|
||
|
||
# ==========================================
|
||
# SQL Injection 防護
|
||
# ==========================================
|
||
|
||
# 允許的資料表白名單
|
||
ALLOWED_TABLES = {
|
||
'products', 'price_records', 'promo_products',
|
||
'monthly_summary_analysis', 'realtime_sales_monthly',
|
||
'realtime_sales_daily', 'daily_sales_snapshot',
|
||
'festival_products'
|
||
}
|
||
|
||
|
||
def validate_table_name(table_name):
|
||
"""
|
||
驗證資料表名稱,防止 SQL Injection
|
||
|
||
Args:
|
||
table_name: 要驗證的資料表名稱
|
||
|
||
Returns:
|
||
str: 驗證通過的表名
|
||
|
||
Raises:
|
||
ValueError: 表名不在白名單中
|
||
"""
|
||
table_name = str(table_name).strip()
|
||
|
||
if not table_name:
|
||
raise ValueError("表名不能為空")
|
||
|
||
if not re.match(r'^[a-zA-Z0-9_]+$', table_name):
|
||
raise ValueError(f"表名包含非法字符: {table_name}")
|
||
|
||
if table_name not in ALLOWED_TABLES:
|
||
sys_log.warning(f"[Security] 表名不在白名單中: {table_name}")
|
||
sql_keywords = ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE', 'ALTER', 'UNION', 'WHERE', 'FROM']
|
||
if any(keyword in table_name.upper() for keyword in sql_keywords):
|
||
raise ValueError(f"表名包含 SQL 關鍵字: {table_name}")
|
||
|
||
return table_name
|
||
|
||
|
||
def validate_column_names(column_names):
|
||
"""
|
||
驗證欄位名稱列表,防止 SQL Injection
|
||
|
||
Args:
|
||
column_names: 欄位名稱列表
|
||
|
||
Returns:
|
||
list: 驗證通過的欄位名稱列表
|
||
|
||
Raises:
|
||
ValueError: 欄位名稱包含非法字符
|
||
"""
|
||
if isinstance(column_names, str):
|
||
column_names = [column_names]
|
||
|
||
validated = []
|
||
for col in column_names:
|
||
col = str(col).strip()
|
||
if not re.match(r'^[\w\u4e00-\u9fff]+$', col):
|
||
raise ValueError(f"欄位名稱包含非法字符: {col}")
|
||
validated.append(col)
|
||
|
||
return validated
|
||
|
||
|
||
def safe_read_sql(table_name, columns=None, engine=None, where_clause=None, limit=None, params=None):
|
||
"""
|
||
安全的 SQL 查詢函數,防止 SQL Injection
|
||
|
||
Args:
|
||
table_name: 資料表名稱
|
||
columns: 欄位列表,None 表示 *
|
||
engine: SQLAlchemy engine
|
||
where_clause: WHERE 子句
|
||
limit: 限制筆數
|
||
params: 參數化查詢的參數字典
|
||
|
||
Returns:
|
||
DataFrame: 查詢結果
|
||
"""
|
||
from sqlalchemy import text
|
||
|
||
table_name = validate_table_name(table_name)
|
||
|
||
if columns:
|
||
columns = validate_column_names(columns)
|
||
col_str = ', '.join([f'"{col}"' for col in columns])
|
||
else:
|
||
col_str = '*'
|
||
|
||
try:
|
||
query = f'SELECT {col_str} FROM "{table_name}"'
|
||
if where_clause:
|
||
query += f' WHERE {where_clause}'
|
||
|
||
if limit:
|
||
query += f' LIMIT {int(limit)}'
|
||
|
||
return pd.read_sql(text(query), engine, params=params)
|
||
except Exception as e:
|
||
sys_log.error(f"[Security] SQL 查詢失敗: {e}")
|
||
raise
|
||
|
||
|
||
# ==========================================
|
||
# 路徑遍歷防護
|
||
# ==========================================
|
||
|
||
def safe_join(base, *paths):
|
||
"""
|
||
安全的路徑拼接,防止路徑遍歷攻擊
|
||
|
||
Args:
|
||
base: 基礎目錄(絕對路徑)
|
||
*paths: 子路徑組件
|
||
|
||
Returns:
|
||
Path: 安全的完整路徑
|
||
|
||
Raises:
|
||
ValueError: 偵測到路徑遍歷嘗試
|
||
"""
|
||
base = Path(base).resolve()
|
||
|
||
for path_component in paths:
|
||
path_str = str(path_component)
|
||
|
||
if '\\' in path_str:
|
||
sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (Windows 反斜線) | Base: {base} | Requested: {paths}")
|
||
raise ValueError(f"路徑遍歷偵測: 不允許使用反斜線")
|
||
|
||
if '..' in path_str.replace('\\', '/'):
|
||
sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 (雙點) | Base: {base} | Requested: {paths}")
|
||
raise ValueError(f"路徑遍歷偵測: 不允許使用 '..'")
|
||
|
||
full_path = (base / Path(*paths)).resolve()
|
||
|
||
try:
|
||
full_path.relative_to(base)
|
||
except ValueError:
|
||
sys_log.warning(f"[Security] 偵測到路徑遍歷嘗試 | Base: {base} | Requested: {paths}")
|
||
raise ValueError(f"路徑遍歷偵測: 不允許存取基礎目錄外的檔案")
|
||
|
||
return full_path
|
||
|
||
|
||
# ==========================================
|
||
# 檔案上傳安全驗證
|
||
# ==========================================
|
||
|
||
ALLOWED_UPLOAD_EXTENSIONS = {'xlsx', 'xls', 'csv'}
|
||
ALLOWED_MIME_TYPES = {
|
||
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
|
||
'application/vnd.ms-excel',
|
||
'text/csv',
|
||
'application/octet-stream'
|
||
}
|
||
|
||
|
||
def secure_filename_unicode(filename):
|
||
"""
|
||
支援中文的安全檔案名稱清理
|
||
|
||
Args:
|
||
filename: 原始檔案名稱
|
||
|
||
Returns:
|
||
str: 清理後的安全檔案名稱
|
||
"""
|
||
filename = unicodedata.normalize('NFKC', filename)
|
||
safe_chars = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s\(\)_\-\.]', '', filename)
|
||
safe_chars = re.sub(r'\s+', ' ', safe_chars)
|
||
safe_chars = safe_chars.strip()
|
||
return safe_chars
|
||
|
||
|
||
def allowed_file(filename):
|
||
"""
|
||
檢查檔案副檔名是否在白名單中
|
||
|
||
Args:
|
||
filename: 檔案名稱
|
||
|
||
Returns:
|
||
bool: 是否允許上傳
|
||
"""
|
||
if not filename or '.' not in filename:
|
||
return False
|
||
|
||
parts = filename.rsplit('.', 1)
|
||
if len(parts) != 2:
|
||
return False
|
||
|
||
basename, ext = parts
|
||
|
||
if not basename or basename.strip() == '':
|
||
return False
|
||
|
||
return ext.lower() in ALLOWED_UPLOAD_EXTENSIONS
|
||
|
||
|
||
def validate_upload_file(file):
|
||
"""
|
||
完整的檔案上傳驗證(副檔名、檔案名稱清理)
|
||
|
||
Args:
|
||
file: Flask request.files 物件
|
||
|
||
Returns:
|
||
tuple: (is_valid, error_message, safe_filename)
|
||
"""
|
||
if not file or file.filename == '':
|
||
return False, '未選擇檔案', None
|
||
|
||
original_filename = file.filename
|
||
|
||
if '..' in original_filename:
|
||
sys_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(雙點): {original_filename}")
|
||
return False, '檔案名稱包含非法字元', None
|
||
|
||
if os.path.sep in original_filename or (os.path.altsep and os.path.altsep in original_filename):
|
||
if original_filename.startswith(('/', '\\')) or './' in original_filename or '.\\' in original_filename:
|
||
sys_log.warning(f"[Security] 檔案上傳 - 偵測到路徑遍歷嘗試(路徑分隔符): {original_filename}")
|
||
return False, '檔案名稱包含非法字元', None
|
||
|
||
safe_name = secure_filename_unicode(original_filename)
|
||
if not safe_name:
|
||
return False, '檔案名稱不合法', None
|
||
|
||
if not allowed_file(safe_name):
|
||
return False, f'不支援的檔案格式,僅允許: {", ".join(ALLOWED_UPLOAD_EXTENSIONS)}', None
|
||
|
||
return True, None, safe_name
|