diff --git a/app.py b/app.py index d67deb6..73a9c4f 100644 --- a/app.py +++ b/app.py @@ -3240,6 +3240,13 @@ def import_excel(): if not table_name: table_name = f"import_{int(time.time())}" + # S4 \u5b89\u5168\u4fee\u5fa9\uff1atable_name \u767d\u540d\u55ae\u9a57\u8b49\uff0c\u9632\u6b62 SQL Injection + # \u5141\u8a31\uff1a\u5b57\u6bcd\u3001\u6578\u5b57\u3001\u5e95\u7dda\u3001\u4e2d\u6587\u5b57\uff08\u4e0d\u5141\u8a31\u7a7a\u683c\u3001\u5f15\u865f\u3001\u6ce8\u5165\u7b26\u865f\uff09 + import re as _re_sec + if not _re_sec.match(r'^[\w\u4e00-\u9fff]+$', table_name): + sys_log.error(f"[Web] [Import] \u274c \u975e\u6cd5\u8cc7\u6599\u8868\u540d\u7a31\u88ab\u62d2\u7d55\uff1a{table_name!r}") + return jsonify({'status': 'error', 'message': '\u975e\u6cd5\u7684\u8cc7\u6599\u8868\u540d\u7a31\uff0c\u532f\u5165\u4e2d\u6b62\u3002'}), 400 + db = DatabaseManager() engine = db.engine @@ -3270,20 +3277,31 @@ def import_excel(): unique_dates = temp_dates.dropna().dt.strftime('%Y/%m/%d').unique() if len(unique_dates) > 0: - date_list = "', '".join([str(d) for d in unique_dates]) - filter_clause = f" WHERE 日期 IN ('{date_list}')" - sys_log.info(f"[Web] [Import] 🔍 優化去重:僅讀取 {len(unique_dates)} 個日期相關的現有資料 (範例: {unique_dates[0]})") + # S4 修復:格式驗證日期值(YYYY/MM/DD 格式,防止注入) + safe_dates = [str(d) for d in unique_dates if _re_sec.match(r'^[\d/\-: ]+$', str(d))] + if safe_dates: + filter_clause = ("日期", safe_dates) + sys_log.info(f"[Web] [Import] 🔍 優化去重:僅讀取 {len(safe_dates)} 個日期相關的現有資料 (範例: {safe_dates[0] if safe_dates else 'N/A'})") elif 'snapshot_date' in df.columns: unique_dates = df['snapshot_date'].dropna().unique() if len(unique_dates) > 0: - date_list = "', '".join([str(d) for d in unique_dates]) - filter_clause = f" WHERE snapshot_date IN ('{date_list}')" - sys_log.info(f"[Web] [Import] 🔍 優化去重:僅讀取 {len(unique_dates)} 個快照日期相關的現有資料") - + # S4 修復:date 值來自 DataFrame,仍做格式驗證防止隱式注入 + safe_dates = [str(d) for d in unique_dates if _re_sec.match(r'^[\d/\-: ]+$', str(d))] + if safe_dates: + filter_clause = ("snapshot_dates", safe_dates) + sys_log.info(f"[Web] [Import] 🔍 優化去重:僅讀取 {len(safe_dates)} 個快照日期相關的現有資料") + if filter_clause: - # V-Debug: 顯示實際執行的 SQL (限開發者日誌) - # sys_log.debug(f"[Web] [Import] SQL Filter: SELECT * FROM {table_name}{filter_clause}") - df_existing = pd.read_sql(f"SELECT * FROM {table_name}{filter_clause}", con=engine) + # S4 修復:使用安全的 IN 查詢(table_name 已白名單驗證,dates 格式已驗證) + if isinstance(filter_clause, tuple): + col_name, date_vals = filter_clause + placeholders = ",".join([f"'{d}'" for d in date_vals]) + df_existing = pd.read_sql( + f'SELECT * FROM {table_name} WHERE {col_name} IN ({placeholders})', + con=engine + ) + else: + df_existing = pd.read_sql(f"SELECT * FROM {table_name}{filter_clause}", con=engine) else: # 備用方案:若無日期欄位,仍讀取全表 sys_log.warning(f"[Web] [Import] ⚠️ 無法根據日期過濾,讀取全表進行去重 (可能效能較差)") diff --git a/config.py b/config.py index 452428a..ee10a2c 100644 --- a/config.py +++ b/config.py @@ -50,8 +50,17 @@ else: # ========================================== # 安全設定(從環境變數讀取) # ========================================== -LOGIN_PASSWORD = os.getenv('LOGIN_PASSWORD', '0936223270') # 進入後台的密碼 -SECRET_KEY = os.getenv('SECRET_KEY', 'your_flask_secret_key') +import sys as _sys + +LOGIN_PASSWORD = os.getenv('LOGIN_PASSWORD') +if not LOGIN_PASSWORD: + print("[FATAL] LOGIN_PASSWORD 環境變數未設定,拒絕啟動。請在 .env 或 Docker 環境設定此值。", file=_sys.stderr) + _sys.exit(1) + +SECRET_KEY = os.getenv('SECRET_KEY') +if not SECRET_KEY or SECRET_KEY in ('your_flask_secret_key', 'change_me', ''): + print("[FATAL] SECRET_KEY 環境變數未設定或仍為不安全預設值,拒絕啟動。請執行:python3 -c \"import secrets; print(secrets.token_hex(32))\" 產生安全金鑰。", file=_sys.stderr) + _sys.exit(1) # ========================================== # 通訊模組設定(從環境變數讀取) diff --git a/database/manager.py b/database/manager.py index 3b0cfc0..1fd720a 100644 --- a/database/manager.py +++ b/database/manager.py @@ -275,6 +275,18 @@ class DatabaseManager: if not external_session: session.close() + # S5 安全修復:table_name 白名單,防止 SQL Injection + ALLOWED_SALES_TABLES = frozenset({ + 'realtime_sales_monthly', + 'daily_sales_snapshot', + 'monthly_summary_analysis', + 'vendor_performance', + 'daily_performance', + 'edm_products', + 'festival_products', + 'competitor_prices', + }) + def get_sales_data(self, table_name='realtime_sales_monthly', start_date=None, end_date=None, months=None): """ 從指定的銷售資料表中讀取資料 @@ -290,23 +302,35 @@ class DatabaseManager: """ import pandas as pd from datetime import datetime, timedelta + import re as _re_sec + + # S5 白名單驗證:只允許已知的銷售資料表 + if table_name not in self.ALLOWED_SALES_TABLES: + raise ValueError(f"[Security] 非法的銷售資料表名稱:{table_name!r}。允許的表名:{sorted(self.ALLOWED_SALES_TABLES)}") try: - # 建立日期過濾條件 + # 建立日期過濾條件(使用參數化查詢防止日期注入) date_filter = "" + params = {} if start_date and end_date: - date_filter = f" WHERE \"日期\" BETWEEN '{start_date}' AND '{end_date}'" + # 格式驗證:只允許 YYYY-MM-DD 格式 + if not _re_sec.match(r'^\d{4}-\d{2}-\d{2}$', str(start_date)) or \ + not _re_sec.match(r'^\d{4}-\d{2}-\d{2}$', str(end_date)): + raise ValueError(f"日期格式不合法:start={start_date}, end={end_date}") + date_filter = " WHERE \"日期\" BETWEEN :start_date AND :end_date" + params = {'start_date': str(start_date), 'end_date': str(end_date)} elif months: # 計算 months 個月前的日期 end_dt = datetime.now() start_dt = end_dt - timedelta(days=months * 30) start_date_str = start_dt.strftime('%Y-%m-%d') end_date_str = end_dt.strftime('%Y-%m-%d') - date_filter = f" WHERE \"日期\" BETWEEN '{start_date_str}' AND '{end_date_str}'" + date_filter = " WHERE \"日期\" BETWEEN :start_date AND :end_date" + params = {'start_date': start_date_str, 'end_date': end_date_str} - # 執行查詢 + # 執行查詢(table_name 已白名單驗證,日期已參數化) sql = f"SELECT * FROM {table_name}{date_filter}" - df = pd.read_sql(text(sql), self.engine) + df = pd.read_sql(text(sql), self.engine, params=params if params else None) # V-Fix: 將數值欄位轉換為數字類型 numeric_columns = ['總業績', '數量', '總成本', '退貨數量', '商品單位售價', diff --git a/services/user_service.py b/services/user_service.py index 952eefd..afbb5d2 100644 --- a/services/user_service.py +++ b/services/user_service.py @@ -407,17 +407,25 @@ class UserService: return query.order_by(LoginHistory.login_time.desc()).all() -def create_initial_admin(db_session, password='Wooo@2026!'): +def create_initial_admin(db_session, password=None): """ 建立初始管理員帳號 Args: db_session: SQLAlchemy Session - password: 初始密碼 + password: 初始密碼(若不傳,從 INITIAL_ADMIN_PASSWORD 環境變數讀取) Returns: tuple: (success, message) """ + import os + if password is None: + password = os.getenv('INITIAL_ADMIN_PASSWORD') + if not password: + raise RuntimeError( + "INITIAL_ADMIN_PASSWORD 環境變數未設定,無法建立初始管理員。" + "請在 .env 或 Docker 環境設定此值。" + ) service = UserService(db_session) # 檢查是否已有管理員