355 lines
16 KiB
Python
355 lines
16 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
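Import routes module.

Provides the Excel/CSV import API and the monthly-summary import API.

Note: helpers that live in other project modules (services.cache_manager,
utils.validators) are imported lazily inside the helper functions below
instead of at module import time.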
|
||
"""
|
||
|
||
import os
|
||
import time
|
||
import re
|
||
from datetime import datetime, timezone, timedelta
|
||
from flask import Blueprint, request, jsonify
|
||
from auth import login_required
|
||
from sqlalchemy import text, inspect
|
||
import pandas as pd
|
||
|
||
from config import BASE_DIR
|
||
from database.manager import DatabaseManager
|
||
from database.models import MonthlySummaryAnalysis
|
||
from services.logger_manager import SystemLogger
|
||
from utils.text_helpers import extract_snapshot_date_from_filename as _extract_snapshot_date_from_filename
|
||
from utils.validators import validate_upload_file
|
||
|
||
# Time zone: fixed UTC+8 offset.
TAIPEI_TZ = timezone(offset=timedelta(hours=8))

# Module-level logger for the import routes.
sys_log = SystemLogger("ImportRoutes").get_logger()

# Blueprint that groups the import endpoints defined in this module.
import_bp = Blueprint(name='import', import_name=__name__)
|
||
|
||
|
||
# ==========================================
|
||
# 輔助函數 (使用獨立模組,避免循環依賴)
|
||
# ==========================================
|
||
|
||
def _get_cache_refs():
    """Fetch the cache objects and the cache-clearing helper from cache_manager.

    The import is performed at call time rather than at module import time.

    Returns:
        tuple: ``(_SALES_DF_CACHE, _SALES_PROCESSED_CACHE,
        clear_sales_cache_for_table)`` exactly as exported by
        ``services.cache_manager``.
    """
    from services.cache_manager import _SALES_DF_CACHE as df_cache
    from services.cache_manager import _SALES_PROCESSED_CACHE as processed_cache
    from services.cache_manager import clear_sales_cache_for_table as clear_for_table

    return df_cache, processed_cache, clear_for_table
|
||
|
||
|
||
def _safe_read_sql(table_name, engine):
    """Read a table by delegating to ``utils.validators.safe_read_sql``.

    Args:
        table_name: Name of the table to read; passed through unchanged.
        engine: Database engine, forwarded as the ``engine`` keyword argument.

    Returns:
        Whatever ``safe_read_sql`` returns for the given table and engine.
    """
    from utils.validators import safe_read_sql as read_table

    result = read_table(table_name, engine=engine)
    return result
|
||
|
||
|
||
# ==========================================
|
||
# 匯入 API
|
||
# ==========================================
|
||
|
||
def _normalize_dedup_key(series):
    """Render one column as comparable text for duplicate detection.

    Missing values (NaN / None) are turned into '' *before* the string cast so
    that an empty cell in the uploaded file and a NULL read back from the
    database compare equal (a plain ``astype(str)`` would yield 'nan' for one
    and 'None' for the other). Surrounding whitespace and a trailing '.0' are
    removed as well.
    """
    filled = series.astype(object).where(series.notna(), '')
    return filled.astype(str).str.strip().str.replace(r'\.0$', '', regex=True)


def _select_new_rows(df, df_existing, key_cols):
    """Return the rows of ``df`` whose key columns do not occur in ``df_existing``.

    Args:
        df: Incoming rows (any index, possibly with gaps).
        df_existing: Rows already stored in the target table.
        key_cols: Column names, present in both frames, that identify a row.

    Returns:
        The subset of ``df`` (original columns and index) not yet stored.
    """
    incoming_keys = df[key_cols].apply(_normalize_dedup_key)
    existing_keys = df_existing[key_cols].apply(_normalize_dedup_key).drop_duplicates()

    # existing_keys is unique, so the left merge keeps exactly one output row
    # per incoming row, in the incoming order.
    merged = incoming_keys.merge(existing_keys, on=key_cols, how='left', indicator=True)

    # The merge result carries a fresh RangeIndex while ``df`` may have gaps in
    # its index (rows dropped earlier), so the mask must be applied by
    # position, not by label.
    is_new = (merged['_merge'] == 'left_only').to_numpy()
    return df[is_new]


@import_bp.route('/api/import_excel', methods=['POST'])
@login_required
def import_excel():
    """
    API: import an Excel/CSV upload and create the target table automatically.

    The upload is validated with ``validate_upload_file`` and read with every
    column as text. The original filename selects the target table:

    * contains '即時業績' and '當日' -> ``daily_sales_snapshot``; a
      ``snapshot_date`` column is derived from a date column in the file or,
      failing that, from the filename.
    * contains '即時業績' and '全月' -> ``realtime_sales_monthly``.
    * anything else -> a table named after the sanitized filename, written in
      replace mode.

    For the two sales tables, rows already present in the table are skipped
    and only new rows are appended.

    Returns:
        A JSON response with ``status`` and ``message`` (plus ``rows`` and
        ``table`` on success); HTTP 400 for invalid uploads, HTTP 500 for
        read or database failures.
    """
    try:
        # 1. Make sure a file was uploaded.
        if 'file' not in request.files:
            return jsonify({'status': 'error', 'message': '未上傳檔案'}), 400

        file = request.files['file']

        # 2. Run the upload through the security validator.
        is_valid, error_msg, safe_name = validate_upload_file(file)
        if not is_valid:
            sys_log.warning(f"[Security] 檔案上傳驗證失敗 | Filename: {file.filename} | Error: {error_msg}")
            return jsonify({'status': 'error', 'message': error_msg}), 400

        sys_log.info(f"[Web] [Import] 檔案上傳驗證通過 | Original: {file.filename} | Safe: {safe_name}")

        # 3. Read the file according to its extension.
        df = None
        filename_lower = safe_name.lower()

        if filename_lower.endswith(('.xlsx', '.xls')):
            # openpyxl cannot read legacy .xls workbooks; for those let pandas
            # pick the engine itself.
            excel_engine = 'openpyxl' if filename_lower.endswith('.xlsx') else None
            try:
                df = pd.read_excel(file, engine=excel_engine, dtype=str)
            except Exception as e:
                return jsonify({'status': 'error', 'message': f'Excel 讀取失敗: {str(e)}'}), 500
        elif filename_lower.endswith('.csv'):
            try:
                try:
                    df = pd.read_csv(file, dtype=str)
                except UnicodeDecodeError:
                    # Retry from the start of the stream with Big5 encoding.
                    file.seek(0)
                    df = pd.read_csv(file, encoding='big5', dtype=str)
            except Exception as e:
                return jsonify({'status': 'error', 'message': f'CSV 讀取失敗: {str(e)}'}), 500
        else:
            return jsonify({'status': 'error', 'message': '不支援的檔案格式'}), 400

        if df is None:
            return jsonify({'status': 'error', 'message': '無法讀取檔案內容'}), 500

        sys_log.info("[Web] [Import] 偵測到原始匯入模式 (Raw Import Mode) - 已略過智慧清理")

        # Identify the report type from the original (unsanitized) filename.
        is_daily_sales = '即時業績' in file.filename and '當日' in file.filename
        is_sales_report = '即時業績' in file.filename and '全月' in file.filename

        if is_daily_sales:
            table_name = 'daily_sales_snapshot'

            # Derive snapshot_date from the first recognized date column.
            date_col = None
            for possible_col in ['日期', '訂單日期', '交易日期', 'Date']:
                if possible_col in df.columns:
                    date_col = possible_col
                    break

            if date_col:
                sys_log.info(f"[Web] [Import] 使用 Excel 內的「{date_col}」欄位作為快照日期")
                df['snapshot_date'] = pd.to_datetime(df[date_col], errors='coerce').dt.strftime('%Y-%m-%d')

                # Unparseable dates come back as missing values; drop those rows.
                invalid_count = df['snapshot_date'].isna().sum()
                if invalid_count > 0:
                    sys_log.warning(f"[Web] [Import] 發現 {invalid_count} 筆無效日期資料,已移除")
                    df = df.dropna(subset=['snapshot_date'])

                unique_dates = df['snapshot_date'].nunique()
                sys_log.info(f"[Web] [Import] 識別為當日業績報表,包含 {unique_dates} 個不同日期")
            else:
                # No date column in the file: fall back to the filename.
                snapshot_date = _extract_snapshot_date_from_filename(file.filename)
                if not snapshot_date:
                    return jsonify({'status': 'error', 'message': '無法從檔名提取日期,且 Excel 中無日期欄位'}), 400
                df['snapshot_date'] = snapshot_date
                sys_log.info(f"[Web] [Import] Excel 無日期欄位,使用檔名日期: {snapshot_date}")
        elif is_sales_report:
            table_name = 'realtime_sales_monthly'
        else:
            # Generic import: build a table name from the filename, collapsing
            # every run of characters outside \w / CJK into one underscore.
            filename_no_ext = os.path.splitext(file.filename)[0]
            table_name = re.sub(r'[^\w\u4e00-\u9fff]+', '_', filename_no_ext).strip('_')

            if not table_name:
                table_name = f"import_{int(time.time())}"

        db = DatabaseManager()
        engine = db.engine

        # Render the URL with the password masked so credentials never reach
        # the log.
        sys_log.info(f"[Web] [Import] 正在寫入資料庫: {engine.url.render_as_string(hide_password=True)}")

        # Only the cache-clearing helper is needed here.
        _, _, clear_sales_cache_for_table = _get_cache_refs()

        if table_name in ['realtime_sales_monthly', 'daily_sales_snapshot']:
            try:
                inspector = inspect(engine)
                if not inspector.has_table(table_name):
                    sys_log.info(f"[Web] [Import] 資料表不存在,建立新表: {table_name}")
                    df.to_sql(table_name, con=engine, if_exists='replace', index=False)
                    rows_imported = len(df)
                    message = f'匯入成功!已建立新資料表並寫入 {rows_imported} 筆資料。'
                else:
                    sys_log.info("[Web] [Import] 資料表已存在,執行自動去重 (Deduplication)...")

                    # Load existing rows, restricted to the dates in the upload
                    # when possible.
                    try:
                        unique_dates = []

                        if '日期' in df.columns:
                            temp_dates = pd.to_datetime(df['日期'], errors='coerce')
                            # NOTE(review): the filter only matches stored rows
                            # whose '日期' text is already in YYYY/MM/DD form —
                            # confirm against the actual report format.
                            unique_dates = temp_dates.dropna().dt.strftime('%Y/%m/%d').unique()
                            if len(unique_dates) > 0:
                                sys_log.info(f"[Web] [Import] 優化去重:僅讀取 {len(unique_dates)} 個日期相關的現有資料")
                        elif 'snapshot_date' in df.columns:
                            unique_dates = df['snapshot_date'].dropna().unique()
                            if len(unique_dates) > 0:
                                sys_log.info(f"[Web] [Import] 優化去重:僅讀取 {len(unique_dates)} 個快照日期相關的現有資料")

                        if len(unique_dates) > 0:
                            # Date values are bound as parameters; table and
                            # column names come from fixed literals above.
                            params = {f"d{i}": str(d) for i, d in enumerate(unique_dates)}
                            param_names = ", ".join(f":d{i}" for i in range(len(unique_dates)))
                            filter_col = "日期" if '日期' in df.columns else "snapshot_date"
                            sql = text(f"SELECT * FROM {table_name} WHERE {filter_col} IN ({param_names})")
                            df_existing = pd.read_sql(sql, con=engine, params=params)
                        else:
                            sys_log.warning("[Web] [Import] 無法根據日期過濾,讀取全表進行去重")
                            df_existing = pd.read_sql(f"SELECT * FROM {table_name}", con=engine)

                    except Exception as e:
                        # Best effort: if existing rows cannot be read, append
                        # without deduplication.
                        sys_log.warning(f"[Web] [Import] 讀取舊資料失敗 ({e}),略過去重直接累加。")
                        df_existing = pd.DataFrame()

                    rows_to_write = df

                    if not df_existing.empty:
                        common_cols = list(set(df.columns) & set(df_existing.columns))

                        # Daily snapshots are keyed by snapshot date + order
                        # number when both columns are available.
                        if table_name == 'daily_sales_snapshot':
                            if 'snapshot_date' in common_cols and '訂單編號' in common_cols:
                                common_cols = ['snapshot_date', '訂單編號']
                                sys_log.info("[Web] [Import] 使用去重鍵: snapshot_date + 訂單編號")

                        if common_cols:
                            rows_to_write = _select_new_rows(df, df_existing, common_cols)

                            duplicates_count = len(df) - len(rows_to_write)
                            sys_log.info(f"[Web] [Import] 自動去重: 發現 {duplicates_count} 筆重複資料,已忽略。")

                    # Append only the rows that are not stored yet.
                    if not rows_to_write.empty:
                        rows_to_write.to_sql(table_name, con=engine, if_exists='append', index=False)
                        rows_imported = len(rows_to_write)
                        message = f'匯入成功!已去重並新增 {rows_imported} 筆資料。'
                    else:
                        rows_imported = 0
                        message = '匯入完成,但所有資料皆已存在 (重複),無新增數據。'

                clear_sales_cache_for_table(table_name)
                sys_log.info(f"[Web] [Cache] 已清除業績分析快取: {table_name}")

                return jsonify({'status': 'success', 'message': message, 'rows': rows_imported, 'table': table_name})

            except Exception as de:
                sys_log.error(f"[Web] [Import] 業績報表匯入去重或寫入時發生錯誤: {de}")
                return jsonify({'status': 'error', 'message': f'業績報表匯入失敗: {de}'}), 500
        else:
            # Non-sales files keep the overwrite behaviour.
            # NOTE(review): the table name is derived from the uploaded
            # filename and written with if_exists='replace', so an upload can
            # overwrite any existing table of the same name — consider a
            # prefix or a deny-list for application tables.
            sys_log.info(f"[Web] [Import] 使用覆蓋模式 (replace)寫入資料表: {table_name}")
            df.to_sql(table_name, con=engine, if_exists='replace', index=False)

            clear_sales_cache_for_table(table_name)
            sys_log.info(f"[Web] [Cache] 已清除業績分析快取: {table_name}")

            return jsonify({
                'status': 'success',
                'message': f'通用匯入成功!資料已覆蓋至 {table_name}。',
                'rows': len(df),
                'table': table_name
            })

    except Exception as e:
        sys_log.error(f"[Web] [Import] 檔案匯入發生嚴重錯誤 | Error: {str(e)}")
        return jsonify({'status': 'error', 'message': f'檔案匯入失敗: {str(e)}'}), 500
|
||
|
||
|
||
# Spreadsheet header -> monthly_summary_analysis column name.
_MONTHLY_SUMMARY_COLUMN_MAP = {
    '年': 'year', '月': 'month', '商品部': 'department', '3C百貨': 'category_3c',
    '處別': 'division', '科別': 'section', '區ID': 'area_id', '區名稱': 'area_name',
    '商品_PM': 'pm_name', '品牌名稱_合併': 'brand_name', '廠商編號': 'vendor_id',
    '廠商名稱': 'vendor_name', '借採轉': 'trade_type', '件單價': 'unit_price',
    '銷售額_本月': 'sales_amt_curr', '銷售額_上月': 'sales_amt_prev', '銷售額_去年同期': 'sales_amt_yoa',
    '毛1額_本月': 'profit_amt_curr', '毛1額_上月': 'profit_amt_prev', '毛1額_去年同期': 'profit_amt_yoa',
    '折扣金額_本月': 'discount_amt_curr', '折扣金額_上月': 'discount_amt_prev', '折扣金額_去年同期': 'discount_amt_yoa',
    '折價券_本月': 'coupon_amt_curr', '折價券_上月': 'coupon_amt_prev', '折價券_去年同期': 'coupon_amt_yoa',
    '其他行銷活動_本月': 'other_mkt_curr', '其他行銷活動_上月': 'other_mkt_prev', '其他行銷活動_去年同期': 'other_mkt_yoa',
    '點我折_本月': 'spot_disc_curr', '點我折_上月': 'spot_disc_prev', '點我折_去年同期': 'spot_disc_yoa',
    '點數折抵_本月': 'point_disc_curr', '點數折抵_上月': 'point_disc_prev', '點數折抵_去年同期': 'point_disc_yoa',
    '銷售量_本月': 'sales_vol_curr', '銷售量_上月': 'sales_vol_prev', '銷售量_去年同期': 'sales_vol_yoa',
    '轉換率': 'conv_rate', '瀏覽數_本月': 'views_curr', '瀏覽數_上月': 'views_prev', '瀏覽數_去年同期': 'views_yoa',
}

# Target columns kept as text; every other mapped column is coerced to a number.
# NOTE(review): 'vendor_id' is not listed here, so vendor ids are coerced to
# numbers (non-numeric ids become 0) — confirm that this is intended.
_MONTHLY_SUMMARY_TEXT_COLUMNS = frozenset({
    'department', 'category_3c', 'division', 'section', 'area_id', 'area_name',
    'pm_name', 'brand_name', 'vendor_name', 'trade_type',
})


@import_bp.route('/api/import/monthly_summary', methods=['POST'])
@login_required
def import_monthly_summary():
    """API: import the monthly summary workbook into monthly_summary_analysis.

    The uploaded workbook is validated, its recognized columns are renamed
    through the header map, and non-text columns are coerced to numbers
    (unparseable values become 0). For every (year, month) pair in the file
    the existing rows are deleted and the new rows appended inside a single
    transaction.

    Returns:
        A JSON response with ``status`` and ``message`` (plus ``rows`` on
        success); HTTP 400 for a missing/invalid/empty/unrecognized file,
        HTTP 500 for read or database failures.
    """
    try:
        if 'file' not in request.files:
            return jsonify({'status': 'error', 'message': '未上傳檔案'}), 400

        file = request.files['file']
        is_valid, error_msg, safe_name = validate_upload_file(file)
        if not is_valid:
            sys_log.warning(f"[Security] 月份總表上傳驗證失敗: {error_msg}")
            return jsonify({'status': 'error', 'message': error_msg}), 400

        # Read the workbook. openpyxl cannot read legacy .xls files, so let
        # pandas pick the engine for those.
        excel_engine = None if safe_name.lower().endswith('.xls') else 'openpyxl'
        try:
            df = pd.read_excel(file, engine=excel_engine)
        except Exception as e:
            return jsonify({'status': 'error', 'message': f'Excel 讀取失敗: {str(e)}'}), 500

        if df.empty:
            return jsonify({'status': 'error', 'message': '檔案內容為空'}), 400

        # Keep only the headers that are actually present in the file.
        current_cols = df.columns.tolist()
        import_mapping = {
            header: column
            for header, column in _MONTHLY_SUMMARY_COLUMN_MAP.items()
            if header in current_cols
        }

        # Require a minimum of recognized columns, and always the year/month
        # pair: the delete-then-insert step below is keyed on it.
        has_period = '年' in import_mapping and '月' in import_mapping
        if len(import_mapping) < 5 or not has_period:
            return jsonify({'status': 'error', 'message': '檔案欄位不符,請確認是否為正確的月份業績總表'}), 400

        # Rename to the database column names.
        target_df = df[list(import_mapping.keys())].rename(columns=import_mapping)

        # Coerce every non-text column to a number; unparseable cells become 0.
        numeric_cols = [
            column for column in import_mapping.values()
            if column not in _MONTHLY_SUMMARY_TEXT_COLUMNS
        ]
        for col in numeric_cols:
            target_df[col] = pd.to_numeric(target_df[col], errors='coerce').fillna(0)

        # Write to the database.
        db = DatabaseManager()
        engine = db.engine

        try:
            years_months = target_df[['year', 'month']].drop_duplicates()

            # One transaction: a failure rolls back both the deletes and the
            # inserts.
            with engine.begin() as conn:
                # Remove the rows previously stored for each period in the file.
                for year, month in years_months.itertuples(index=False, name=None):
                    conn.execute(
                        text("DELETE FROM monthly_summary_analysis WHERE year = :y AND month = :m"),
                        {'y': int(year), 'm': int(month)}
                    )

                # Bulk insert.
                # NOTE(review): method='multi' binds chunksize * column-count
                # parameters per statement; some databases cap the number of
                # bound parameters — verify against the deployed backend.
                target_df.to_sql(
                    'monthly_summary_analysis',
                    con=conn,
                    if_exists='append',
                    index=False,
                    chunksize=2000,
                    method='multi'
                )

        except Exception as e:
            sys_log.error(f"[Web] [Import] 匯入資料庫失敗: {e}")
            # Bare raise keeps the original traceback for the outer handler.
            raise

        sys_log.info(f"[Web] [Import] 月份總表資料匯入成功 | 筆數: {len(target_df)}")
        return jsonify({
            'status': 'success',
            'message': f'成功匯入 {len(target_df)} 筆分析數據。',
            'rows': len(target_df)
        })

    except Exception as e:
        sys_log.error(f"[Web] [Import] 月份總表匯入嚴重失敗: {str(e)}")
        return jsonify({'status': 'error', 'message': f'匯入失敗: {str(e)}'}), 500
|