refactor(routes): 刪除 app.py import monthly 重複路由

ADR-017 Phase 3f-1 import/monthly sprint
This commit is contained in:
OoO
2026-04-29 21:06:00 +08:00
parent 512f93c6b1
commit 1f88c2817b

757
app.py
View File

@@ -1712,763 +1712,6 @@ def download_backup(filename):
sys_log.error(f"[System] 下載備份失敗 | Error: {e}")
return jsonify({'error': '下載失敗'}), 500
@app.route('/api/import_excel', methods=['POST'])
@login_required
def import_excel():
"""
API: 匯入 Excel/CSV 並自動建表
已加入檔案上傳安全驗證 (副檔名白名單、檔案名稱清理)
"""
try:
# 1. 檢查是否有上傳檔案
if 'file' not in request.files:
return jsonify({'status': 'error', 'message': '未上傳檔案'}), 400
file = request.files['file']
# 2. 使用安全驗證函數
is_valid, error_msg, safe_name = validate_upload_file(file)
if not is_valid:
sys_log.warning(f"[Security] 檔案上傳驗證失敗 | Filename: {file.filename} | Error: {error_msg}")
return jsonify({'status': 'error', 'message': error_msg}), 400
sys_log.info(f"[Web] [Import] 檔案上傳驗證通過 | Original: {file.filename} | Safe: {safe_name}")
# 3. 根據副檔名讀取檔案
df = None
filename_lower = safe_name.lower()
if filename_lower.endswith(('.xlsx', '.xls')):
try:
df = pd.read_excel(file, engine='openpyxl', dtype=str)
except Exception as e:
return jsonify({'status': 'error', 'message': f'Excel 讀取失敗: {str(e)}'}), 500
elif filename_lower.endswith('.csv'):
try:
# V-New: 嘗試用多種編碼讀取 CSV
try:
df = pd.read_csv(file, dtype=str)
except UnicodeDecodeError:
file.seek(0) # 重置文件指針
df = pd.read_csv(file, encoding='big5', dtype=str)
except Exception as e:
return jsonify({'status': 'error', 'message': f'CSV 讀取失敗: {str(e)}'}), 500
else:
# 理論上不會到這裡,因為 validate_upload_file 已經檢查過
return jsonify({'status': 'error', 'message': '不支援的檔案格式'}), 400
if df is None:
return jsonify({'status': 'error', 'message': '無法讀取檔案內容'}), 500
# V-New: 增加日誌以確認目前為原始匯入模式 (提醒使用者已略過清理)
sys_log.info("[Web] [Import] ⚠️ 偵測到原始匯入模式 (Raw Import Mode) - 已略過智慧清理")
# V-Fix: 1. 先標準化欄位名稱,確保後續關鍵字比對準確
# df.columns = [str(c).strip().replace(' ', '_').replace('-', '_').replace('(', '').replace(')', '').replace('/', '_') for c in df.columns]
# V-Fix: 2. 執行智慧資料清理 (v3 保守模式 - 解決 'F' 被強制轉 0 的問題)
# sys_log.info("[Web] [Import] 執行智慧資料清理程序 (v3 保守模式)...")
# 定義必須是數值的欄位關鍵字 (這些欄位必須是數字,髒資料轉 0 以免影響計算)
# numeric_keywords = ['序號', '數量', '單價', '金額', '成本', '毛利', '售價', '應收', '營收',
# 'Quantity', 'Qty', 'Price', 'Amount', 'Cost', 'Profit', 'Sales', 'Revenue']
# for col in df.columns:
# # 判斷是否為強制數值欄位
# is_force_numeric = any(k in col for k in numeric_keywords)
# if df[col].dtype == 'object':
# if is_force_numeric:
# # 策略 A: 強制數值欄位 -> 激進清理 (保留數字,其餘轉 0)
# # 先移除千分位逗號等非數值字符
# cleaned_series = df[col].astype(str).str.replace(r'[^\d.-]', '', regex=True)
# converted_series = pd.to_numeric(cleaned_series, errors='coerce')
# df[col] = converted_series.fillna(0)
# sys_log.info(f"[Web] [Import] 強制清理數值欄位 '{col}' (髒資料已轉為 0)")
# else:
# # 策略 B: 一般欄位 -> 保守檢查 (保留 'F' 等文字)
# # 直接嘗試轉換,不移除文字
# converted_series = pd.to_numeric(df[col], errors='coerce')
# # 檢查有多少值變成了 NaN (原本不是 NaN/空字串,但轉換後變成 NaN 的)
# original_valid_mask = df[col].notna() & (df[col].astype(str).str.strip() != '')
# converted_valid_mask = converted_series.notna()
# loss_count = (original_valid_mask & ~converted_valid_mask).sum()
# if loss_count == 0:
# # 如果沒有資料損失 (代表全是數字或空值),才轉換
# df[col] = converted_series
# else:
# # 有資料損失 (例如包含 'F'),保留為文字
# sys_log.info(f"[Web] [Import] 欄位 '{col}' 保留為文字 (含 {loss_count} 筆非數值資料,如 'F')")
# 識別檔案類型
is_daily_sales = '即時業績' in file.filename and '當日' in file.filename
is_sales_report = '即時業績' in file.filename and '全月' in file.filename
if is_daily_sales:
table_name = 'daily_sales_snapshot'
# V-New: 智慧匯入 - 根據 Excel 內的日期欄位自動拆分 snapshot_date
date_col = None
for possible_col in ['日期', '訂單日期', '交易日期', 'Date']:
if possible_col in df.columns:
date_col = possible_col
break
if date_col:
# 使用 Excel 內的日期欄位作為 snapshot_date
sys_log.info(f"[Web] [Import] 使用 Excel 內的「{date_col}」欄位作為快照日期")
# 將日期欄位轉換為標準格式 YYYY-MM-DD
df['snapshot_date'] = pd.to_datetime(df[date_col], errors='coerce').dt.strftime('%Y-%m-%d')
# 移除無效日期的資料
invalid_count = df['snapshot_date'].isna().sum()
if invalid_count > 0:
sys_log.warning(f"[Web] [Import] 發現 {invalid_count} 筆無效日期資料,已移除")
df = df.dropna(subset=['snapshot_date'])
unique_dates = df['snapshot_date'].nunique()
sys_log.info(f"[Web] [Import] 識別為當日業績報表,包含 {unique_dates} 個不同日期")
else:
# 備用方案:從檔名提取日期
snapshot_date = extract_snapshot_date_from_filename(file.filename)
if not snapshot_date:
return jsonify({'status': 'error', 'message': '無法從檔名提取日期,且 Excel 中無日期欄位'}), 400
df['snapshot_date'] = snapshot_date
sys_log.info(f"[Web] [Import] Excel 無日期欄位,使用檔名日期: {snapshot_date}")
elif is_sales_report:
table_name = 'realtime_sales_monthly'
else:
filename_no_ext = os.path.splitext(file.filename)[0]
table_name = re.sub(r'[^\w\u4e00-\u9fff]+', '_', filename_no_ext).strip('_')
if not table_name: table_name = f"import_{int(time.time())}"
# S4 \u5b89\u5168\u4fee\u5fa9\uff1atable_name \u767d\u540d\u55ae\u9a57\u8b49\uff0c\u9632\u6b62 SQL Injection
# \u5141\u8a31\uff1a\u5b57\u6bcd\u3001\u6578\u5b57\u3001\u5e95\u7dda\u3001\u4e2d\u6587\u5b57\uff08\u4e0d\u5141\u8a31\u7a7a\u683c\u3001\u5f15\u865f\u3001\u6ce8\u5165\u7b26\u865f\uff09
import re as _re_sec
if not _re_sec.match(r'^[\w\u4e00-\u9fff]+$', table_name):
sys_log.error(f"[Web] [Import] \u274c \u975e\u6cd5\u8cc7\u6599\u8868\u540d\u7a31\u88ab\u62d2\u7d55\uff1a{table_name!r}")
return jsonify({'status': 'error', 'message': '\u975e\u6cd5\u7684\u8cc7\u6599\u8868\u540d\u7a31\uff0c\u532f\u5165\u4e2d\u6b62\u3002'}), 400
db = DatabaseManager()
engine = db.engine
# V-Debug: 顯示實際寫入的資料庫路徑
sys_log.info(f"[Web] [Import] 正在寫入資料庫: {engine.url}")
if table_name in ['realtime_sales_monthly', 'daily_sales_snapshot']:
try:
# V-Fix: 實作自動去重邏輯 (Deduplication)
# 1. 檢查資料表是否存在
inspector = inspect(engine)
if not inspector.has_table(table_name):
sys_log.info(f"[Web] [Import] 資料表不存在,建立新表: {table_name}")
df.to_sql(table_name, con=engine, if_exists='replace', index=False)
rows_imported = len(df)
message = f'匯入成功!已建立新資料表並寫入 {rows_imported} 筆資料。'
else:
sys_log.info(f"[Web] [Import] 資料表已存在,執行自動去重 (Deduplication)...")
# 2. 讀取現有資料(優化:僅讀取相關日期的資料以進行去重)
try:
# 嘗試根據 incoming df 的日期範圍來過濾現有資料
filter_clause = ""
if '日期' in df.columns:
# V-Fix: 確保日期格式與資料庫一致 (YYYY/MM/DD) 以便 SQL IN 查詢能正確比對
# 有時 Pandas 會將其轉換為 datetime 或 2024-01-01 格式
temp_dates = pd.to_datetime(df['日期'], errors='coerce')
unique_dates = temp_dates.dropna().dt.strftime('%Y/%m/%d').unique()
if len(unique_dates) > 0:
# S4 修復格式驗證日期值YYYY/MM/DD 格式,防止注入)
safe_dates = [str(d) for d in unique_dates if _re_sec.match(r'^[\d/\-: ]+$', str(d))]
if safe_dates:
filter_clause = ("日期", safe_dates)
sys_log.info(f"[Web] [Import] 🔍 優化去重:僅讀取 {len(safe_dates)} 個日期相關的現有資料 (範例: {safe_dates[0] if safe_dates else 'N/A'})")
elif 'snapshot_date' in df.columns:
unique_dates = df['snapshot_date'].dropna().unique()
if len(unique_dates) > 0:
# S4 修復date 值來自 DataFrame仍做格式驗證防止隱式注入
safe_dates = [str(d) for d in unique_dates if _re_sec.match(r'^[\d/\-: ]+$', str(d))]
if safe_dates:
filter_clause = ("snapshot_dates", safe_dates)
sys_log.info(f"[Web] [Import] 🔍 優化去重:僅讀取 {len(safe_dates)} 個快照日期相關的現有資料")
if filter_clause:
# S4 修復:使用安全的 IN 查詢table_name 已白名單驗證dates 格式已驗證)
if isinstance(filter_clause, tuple):
col_name, date_vals = filter_clause
placeholders = ",".join([f"'{d}'" for d in date_vals])
df_existing = pd.read_sql(
f'SELECT * FROM {table_name} WHERE {col_name} IN ({placeholders})',
con=engine
)
else:
df_existing = pd.read_sql(f"SELECT * FROM {table_name}{filter_clause}", con=engine)
else:
# 備用方案:若無日期欄位,仍讀取全表
sys_log.warning(f"[Web] [Import] ⚠️ 無法根據日期過濾,讀取全表進行去重 (可能效能較差)")
df_existing = safe_read_sql(table_name, engine=engine)
except Exception as e:
sys_log.warning(f"[Web] [Import] ⚠️ 讀取舊資料失敗 ({e}),略過去重直接累加。")
df_existing = pd.DataFrame()
rows_to_write = df
if not df_existing.empty:
# 3. 執行比對 (找出共有欄位)
common_cols = list(set(df.columns) & set(df_existing.columns))
# 針對 daily_sales_snapshot 使用特定去重鍵
if table_name == 'daily_sales_snapshot':
# 優先使用 snapshot_date + 訂單編號
if 'snapshot_date' in common_cols and '訂單編號' in common_cols:
common_cols = ['snapshot_date', '訂單編號']
sys_log.info(f"[Web] [Import] 使用去重鍵: snapshot_date + 訂單編號")
elif 'snapshot_date' in common_cols:
# 備用方案:使用所有共有欄位
sys_log.info(f"[Web] [Import] 使用全欄位去重 (共 {len(common_cols)} 個欄位)")
if common_cols:
# 轉換為字串以確保比對準確 (處理 NaN 與型別差異)
# V-Fix: 加強去重邏輯,處理 '100.0' vs '100' 的問題
def normalize_series(s):
return s.astype(str).str.strip().str.replace(r'\.0$', '', regex=True)
df_str = df[common_cols].apply(normalize_series).fillna('')
existing_str = df_existing[common_cols].apply(normalize_series).fillna('')
# 移除 df_existing 中的重複項 (優化 merge 效能)
existing_str = existing_str.drop_duplicates()
# 使用 merge 找出 df 中已存在的資料
merged = df_str.merge(existing_str, on=common_cols, how='left', indicator=True)
# 只保留 'left_only' 的資料 (即新資料)
rows_to_write = df[merged['_merge'] == 'left_only']
duplicates_count = len(df) - len(rows_to_write)
sys_log.info(f"[Web] [Import] 🔍 自動去重: 發現 {duplicates_count} 筆重複資料,已忽略。")
# 4. 寫入新資料
if not rows_to_write.empty:
rows_to_write.to_sql(table_name, con=engine, if_exists='append', index=False)
rows_imported = len(rows_to_write)
message = f'匯入成功!已去重並新增 {rows_imported} 筆資料。'
else:
rows_imported = 0
message = '匯入完成,但所有資料皆已存在 (重複),無新增數據。'
# V-Fix: 無條件清除快取,確保行事曆能夠顯示最新資料
# 原問題:只有 rows_imported > 0 時才清除快取,導致匯入後行事曆不更新
if table_name in _SALES_DF_CACHE:
del _SALES_DF_CACHE[table_name]
sys_log.info(f"[Web] [Cache] 🧹 已清除資料表快取: {table_name}")
# V-Opt: 清除所有相關的處理後快取(包含不同 data_range 的快取)
cache_keys_to_delete = [key for key in _SALES_PROCESSED_CACHE.keys() if key.startswith(table_name)]
for cache_key in cache_keys_to_delete:
del _SALES_PROCESSED_CACHE[cache_key]
sys_log.info(f"[Web] [Cache] 🧹 已清除處理後快取: {cache_key}")
return jsonify({'status': 'success', 'message': message, 'rows': rows_imported, 'table': table_name})
except Exception as de:
sys_log.error(f"[Web] [Import] 業績報表匯入去重或寫入時發生錯誤: {de}")
return jsonify({'status': 'error', 'message': f'業績報表匯入失敗: {de}'}), 500
else:
# 對於非業績報表,維持覆蓋邏輯
sys_log.info(f"[Web] [Import] 使用覆蓋模式 (replace)寫入資料表: {table_name}")
df.to_sql(table_name, con=engine, if_exists='replace', index=False)
if table_name in _SALES_DF_CACHE:
del _SALES_DF_CACHE[table_name]
sys_log.info(f"[Web] [Cache] 🧹 已清除資料表快取: {table_name}")
# V-Opt: 清除所有相關的處理後快取
cache_keys_to_delete = [key for key in _SALES_PROCESSED_CACHE.keys() if key.startswith(table_name)]
for cache_key in cache_keys_to_delete:
del _SALES_PROCESSED_CACHE[cache_key]
sys_log.info(f"[Web] [Cache] 🧹 已清除處理後快取: {cache_key}")
return jsonify({'status': 'success', 'message': f'通用匯入成功!資料已覆蓋至 {table_name}', 'rows': len(df), 'table': table_name})
except Exception as e:
sys_log.error(f"[Web] [Import] ❌ 檔案匯入發生嚴重錯誤 | Error: {str(e)}")
return jsonify({'status': 'error', 'message': f'檔案匯入失敗: {str(e)}'}), 500
@app.route('/api/import/monthly_summary', methods=['POST'])
def import_monthly_summary():
"""API: 匯入月份總表數據分析"""
try:
if 'file' not in request.files:
return jsonify({'status': 'error', 'message': '未上傳檔案'}), 400
file = request.files['file']
is_valid, error_msg, safe_name = validate_upload_file(file)
if not is_valid:
sys_log.warning(f"[Security] 月份總表上傳驗證失敗: {error_msg}")
return jsonify({'status': 'error', 'message': error_msg}), 400
# 讀取 Excel
try:
df = pd.read_excel(file, engine='openpyxl')
except Exception as e:
return jsonify({'status': 'error', 'message': f'Excel 讀取失敗: {str(e)}'}), 500
if df.empty:
return jsonify({'status': 'error', 'message': '檔案內容為空'}), 400
# 欄位對照表 (對應 Excel 繁體中文標題與資料庫英文欄位)
mapping = {
'': 'year', '': 'month', '商品部': 'department', '3C百貨': 'category_3c',
'處別': 'division', '科別': 'section', '區ID': 'area_id', '區名稱': 'area_name',
'商品_PM': 'pm_name', '品牌名稱_合併': 'brand_name', '廠商編號': 'vendor_id',
'廠商名稱': 'vendor_name', '借採轉': 'trade_type', '件單價': 'unit_price',
'銷售額_本月': 'sales_amt_curr', '銷售額_上月': 'sales_amt_prev', '銷售額_去年同期': 'sales_amt_yoa',
'毛1額_本月': 'profit_amt_curr', '毛1額_上月': 'profit_amt_prev', '毛1額_去年同期': 'profit_amt_yoa',
'折扣金額_本月': 'discount_amt_curr', '折扣金額_上月': 'discount_amt_prev', '折扣金額_去年同期': 'discount_amt_yoa',
'折價券_本月': 'coupon_amt_curr', '折價券_上月': 'coupon_amt_prev', '折價券_去年同期': 'coupon_amt_yoa',
'其他行銷活動_本月': 'other_mkt_curr', '其他行銷活動_上月': 'other_mkt_prev', '其他行銷活動_去年同期': 'other_mkt_yoa',
'點我折_本月': 'spot_disc_curr', '點我折_上月': 'spot_disc_prev', '點我折_去年同期': 'spot_disc_yoa',
'點數折抵_本月': 'point_disc_curr', '點數折抵_上月': 'point_disc_prev', '點數折抵_去年同期': 'point_disc_yoa',
'銷售量_本月': 'sales_vol_curr', '銷售量_上月': 'sales_vol_prev', '銷售量_去年同期': 'sales_vol_yoa',
'轉換率': 'conv_rate', '瀏覽數_本月': 'views_curr', '瀏覽數_上月': 'views_prev', '瀏覽數_去年同期': 'views_yoa'
}
# 檢查必備欄位 (寬鬆檢查:只要有 mapping 中的欄位就匯入)
current_cols = df.columns.tolist()
import_mapping = {k: v for k, v in mapping.items() if k in current_cols}
if len(import_mapping) < 5: # 至少要有幾個維度
return jsonify({'status': 'error', 'message': '檔案欄位不符,請確認是否為正確的月份業績總表'}), 400
# 重新命名與清理資料
target_df = df[list(import_mapping.keys())].rename(columns=import_mapping)
# 轉換數值欄位,填補 NaN
numeric_cols = [v for k, v in import_mapping.items() if v not in [
'department', 'category_3c', 'division', 'section', 'area_id', 'area_name',
'pm_name', 'brand_name', 'vendor_name', 'trade_type'
]]
for col in numeric_cols:
target_df[col] = pd.to_numeric(target_df[col], errors='coerce').fillna(0)
# 寫入資料庫 - 優化效能版本 (Phase 9 Optimization)
db = DatabaseManager()
engine = db.engine
try:
# 取得要匯入的年月份,用於先行刪除重複資料
years_months = target_df[['year', 'month']].drop_duplicates()
with engine.begin() as conn:
# 1. 刪除該月份舊資料 (Transaction 開始)
for _, row in years_months.iterrows():
conn.execute(text("DELETE FROM monthly_summary_analysis WHERE year = :y AND month = :m"),
{'y': int(row['year']), 'm': int(row['month'])})
# 2. 批量寫入 (使用 multi 方法加速SQLite chunksize 建議 2000 避免參數過多)
# 比照 realtime_sales_monthly 的優化方式
target_df.to_sql('monthly_summary_analysis',
con=conn,
if_exists='append',
index=False,
chunksize=2000,
method='multi')
except Exception as e:
sys_log.error(f"[Web] [Import] 匯入資料庫失敗: {e}")
raise e
sys_log.info(f"[Web] [Import] 🚀 月份總表資料匯入成功 | 筆數: {len(target_df)}")
return jsonify({
'status': 'success',
'message': f'成功匯入 {len(target_df)} 筆分析數據。',
'rows': len(target_df)
})
except Exception as e:
sys_log.error(f"[Web] [Import] ❌ 月份總表匯入嚴重失敗: {str(e)}")
return jsonify({'status': 'error', 'message': f'匯入失敗: {str(e)}'}), 500
@app.route('/monthly_summary_analysis')
def monthly_summary_analysis_page():
"""月份總表數據分析展示頁 (Phase 9)"""
return render_template('monthly_summary_analysis.html',
datetime_now=datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M:%S'),
system_version=SYSTEM_VERSION)
@app.route('/api/monthly_summary_data')
def get_monthly_summary_data():
"""API: 取得月份總表數據與分析指標 (Phase 9)"""
year = request.args.get('year', type=int)
month = request.args.get('month', type=int)
division = request.args.get('division')
pm_name = request.args.get('pm_name')
brand_name = request.args.get('brand_name')
vendor_name = request.args.get('vendor')
area_name = request.args.get('area_name')
trade_type = request.args.get('trade_type')
limit = request.args.get('limit', default=1000, type=int)
# DEBUG LOGGING
import logging
debug_logger = logging.getLogger('app')
debug_logger.info(f"🔍 [API Debug] Request Args: {request.args}")
db = DatabaseManager()
session = db.get_session()
try:
# 基礎查詢
query = session.query(MonthlySummaryAnalysis)
# 套用過濾
if year: query = query.filter(MonthlySummaryAnalysis.year == year)
if month: query = query.filter(MonthlySummaryAnalysis.month == month)
if division: query = query.filter(MonthlySummaryAnalysis.division == division)
if pm_name: query = query.filter(MonthlySummaryAnalysis.pm_name == pm_name)
if brand_name: query = query.filter(MonthlySummaryAnalysis.brand_name == brand_name)
if vendor_name: query = query.filter(MonthlySummaryAnalysis.vendor_name == vendor_name)
if area_name:
if ',' in area_name:
query = query.filter(MonthlySummaryAnalysis.area_name.in_(area_name.split(',')))
else:
query = query.filter(MonthlySummaryAnalysis.area_name == area_name)
if trade_type: query = query.filter(MonthlySummaryAnalysis.trade_type == trade_type)
# 取得統計數據 (KPIs)
kpi_query = session.query(
func.sum(MonthlySummaryAnalysis.sales_amt_curr).label('total_sales'),
func.sum(MonthlySummaryAnalysis.sales_amt_prev).label('total_sales_prev'),
func.sum(MonthlySummaryAnalysis.sales_amt_yoa).label('total_sales_yoa'),
func.sum(MonthlySummaryAnalysis.profit_amt_curr).label('total_profit'),
func.sum(MonthlySummaryAnalysis.sales_vol_curr).label('total_vol'),
func.sum(MonthlySummaryAnalysis.views_curr).label('total_views')
)
# 同樣套用過濾到 KPI
if year: kpi_query = kpi_query.filter(MonthlySummaryAnalysis.year == year)
if month: kpi_query = kpi_query.filter(MonthlySummaryAnalysis.month == month)
if division: kpi_query = kpi_query.filter(MonthlySummaryAnalysis.division == division)
if pm_name: kpi_query = kpi_query.filter(MonthlySummaryAnalysis.pm_name == pm_name)
if brand_name: kpi_query = kpi_query.filter(MonthlySummaryAnalysis.brand_name == brand_name)
if vendor_name: kpi_query = kpi_query.filter(MonthlySummaryAnalysis.vendor_name == vendor_name)
if area_name:
if ',' in area_name:
kpi_query = kpi_query.filter(MonthlySummaryAnalysis.area_name.in_(area_name.split(',')))
else:
kpi_query = kpi_query.filter(MonthlySummaryAnalysis.area_name == area_name)
if trade_type: kpi_query = kpi_query.filter(MonthlySummaryAnalysis.trade_type == trade_type)
kpi_res = kpi_query.one()
# 取得總筆數與月數
total_rows = session.query(func.count(MonthlySummaryAnalysis.id))
total_months_query = session.query(MonthlySummaryAnalysis.year, MonthlySummaryAnalysis.month).distinct()
if year:
total_rows = total_rows.filter(MonthlySummaryAnalysis.year == year)
total_months_query = total_months_query.filter(MonthlySummaryAnalysis.year == year)
if month:
total_rows = total_rows.filter(MonthlySummaryAnalysis.month == month)
total_rows = total_rows.scalar()
total_months = total_months_query.count()
# 取得趨勢數據 (按月加總)
trend_query = session.query(
MonthlySummaryAnalysis.year,
MonthlySummaryAnalysis.month,
func.sum(MonthlySummaryAnalysis.sales_amt_curr).label('sales')
).group_by(MonthlySummaryAnalysis.year, MonthlySummaryAnalysis.month).order_by(MonthlySummaryAnalysis.year, MonthlySummaryAnalysis.month)
if division: trend_query = trend_query.filter(MonthlySummaryAnalysis.division == division)
if pm_name: trend_query = trend_query.filter(MonthlySummaryAnalysis.pm_name == pm_name)
if brand_name: trend_query = trend_query.filter(MonthlySummaryAnalysis.brand_name == brand_name)
if vendor_name: trend_query = trend_query.filter(MonthlySummaryAnalysis.vendor_name == vendor_name)
if area_name:
if ',' in area_name:
trend_query = trend_query.filter(MonthlySummaryAnalysis.area_name.in_(area_name.split(',')))
else:
trend_query = trend_query.filter(MonthlySummaryAnalysis.area_name == area_name)
if trade_type: trend_query = trend_query.filter(MonthlySummaryAnalysis.trade_type == trade_type)
# 取得排行榜 (Top 10 Brands)
rank_query = session.query(
MonthlySummaryAnalysis.brand_name,
func.sum(MonthlySummaryAnalysis.sales_amt_curr).label('sales')
).group_by(MonthlySummaryAnalysis.brand_name)
if year: rank_query = rank_query.filter(MonthlySummaryAnalysis.year == year)
if month: rank_query = rank_query.filter(MonthlySummaryAnalysis.month == month)
if division: rank_query = rank_query.filter(MonthlySummaryAnalysis.division == division)
if pm_name: rank_query = rank_query.filter(MonthlySummaryAnalysis.pm_name == pm_name)
if brand_name: rank_query = rank_query.filter(MonthlySummaryAnalysis.brand_name == brand_name)
if vendor_name: rank_query = rank_query.filter(MonthlySummaryAnalysis.vendor_name == vendor_name)
if area_name:
if ',' in area_name:
rank_query = rank_query.filter(MonthlySummaryAnalysis.area_name.in_(area_name.split(',')))
else:
rank_query = rank_query.filter(MonthlySummaryAnalysis.area_name == area_name)
if trade_type: rank_query = rank_query.filter(MonthlySummaryAnalysis.trade_type == trade_type)
rank_query = rank_query.order_by(desc('sales')).limit(10)
# 取得明細資料
rows_query = query.order_by(
MonthlySummaryAnalysis.year.desc(),
MonthlySummaryAnalysis.month.desc(),
MonthlySummaryAnalysis.sales_amt_curr.desc()
).limit(limit)
# --- 📊 V-New: 進階分析子查詢 (Phase 17) ---
def apply_filters(q, ignore_year=False):
if year and not ignore_year: q = q.filter(MonthlySummaryAnalysis.year == year)
if month: q = q.filter(MonthlySummaryAnalysis.month == month)
if division: q = q.filter(MonthlySummaryAnalysis.division == division)
if pm_name: q = q.filter(MonthlySummaryAnalysis.pm_name == pm_name)
if brand_name: q = q.filter(MonthlySummaryAnalysis.brand_name == brand_name)
if vendor_name: q = q.filter(MonthlySummaryAnalysis.vendor_name == vendor_name)
if area_name:
if ',' in area_name:
q = q.filter(MonthlySummaryAnalysis.area_name.in_(area_name.split(',')))
else:
q = q.filter(MonthlySummaryAnalysis.area_name == area_name)
if trade_type: q = q.filter(MonthlySummaryAnalysis.trade_type == trade_type)
return q
# 廠商排行
# 廠商排行 (Top 20, 分年度)
# 廠商排行 (Top 20, 分年度)
vendor_rank_q = session.query(
MonthlySummaryAnalysis.vendor_name,
func.sum(MonthlySummaryAnalysis.sales_amt_curr).label('sales'),
func.sum(case((MonthlySummaryAnalysis.year == 2024, MonthlySummaryAnalysis.sales_amt_curr), else_=0)).label('sales_2024'),
func.sum(case((MonthlySummaryAnalysis.year == 2025, MonthlySummaryAnalysis.sales_amt_curr), else_=0)).label('sales_2025'),
func.sum(MonthlySummaryAnalysis.profit_amt_curr).label('profit'),
func.sum(case((MonthlySummaryAnalysis.year == 2024, MonthlySummaryAnalysis.profit_amt_curr), else_=0)).label('profit_2024'),
func.sum(case((MonthlySummaryAnalysis.year == 2025, MonthlySummaryAnalysis.profit_amt_curr), else_=0)).label('profit_2025'),
).group_by(MonthlySummaryAnalysis.vendor_name)
vendor_rank_q = apply_filters(vendor_rank_q, ignore_year=True)
vendor_rank_q = vendor_rank_q.order_by(desc('sales')).limit(20)
# 分類分佈 (按 Division, Top 12, 分年度)
div_dist_q = session.query(
MonthlySummaryAnalysis.division,
func.sum(MonthlySummaryAnalysis.sales_amt_curr).label('sales'),
func.sum(case((MonthlySummaryAnalysis.year == 2024, MonthlySummaryAnalysis.sales_amt_curr), else_=0)).label('sales_2024'),
func.sum(case((MonthlySummaryAnalysis.year == 2025, MonthlySummaryAnalysis.sales_amt_curr), else_=0)).label('sales_2025')
).group_by(MonthlySummaryAnalysis.division)
div_dist_q = apply_filters(div_dist_q, ignore_year=True)
div_dist_q = div_dist_q.order_by(desc('sales')).limit(12)
# 價格帶貢獻 (分年度)
price_cont_q = session.query(
case(
(MonthlySummaryAnalysis.unit_price < 500, '0-499'),
(MonthlySummaryAnalysis.unit_price < 1000, '500-999'),
(MonthlySummaryAnalysis.unit_price < 2000, '1,000-1,999'),
(MonthlySummaryAnalysis.unit_price < 5000, '2,000-4,999'),
(MonthlySummaryAnalysis.unit_price < 10000, '5,000-9,999'),
else_='10,000+'
).label('price_range'),
func.sum(MonthlySummaryAnalysis.sales_amt_curr).label('sales'),
func.sum(case((MonthlySummaryAnalysis.year == 2024, MonthlySummaryAnalysis.sales_amt_curr), else_=0)).label('sales_2024'),
func.sum(case((MonthlySummaryAnalysis.year == 2025, MonthlySummaryAnalysis.sales_amt_curr), else_=0)).label('sales_2025')
).group_by('price_range')
price_cont_q = apply_filters(price_cont_q, ignore_year=True)
# BCG 矩陣 (品牌 x 區域)
bcg_q = session.query(
MonthlySummaryAnalysis.brand_name,
MonthlySummaryAnalysis.area_name,
func.sum(MonthlySummaryAnalysis.sales_vol_curr).label('vol'),
func.sum(MonthlySummaryAnalysis.sales_amt_curr).label('sales'),
func.sum(MonthlySummaryAnalysis.profit_amt_curr).label('profit')
).group_by(MonthlySummaryAnalysis.brand_name, MonthlySummaryAnalysis.area_name)\
.having(func.sum(MonthlySummaryAnalysis.sales_amt_curr) > 0)
bcg_q = apply_filters(bcg_q)
bcg_q = bcg_q.order_by(desc('sales')).limit(100)
# 熱力圖 (月份 x 分類)
# 熱力圖 (月份 x 分類)
# 為了保持一致性,這裡我們應該只取 Top 12 的 Division
# 先取得 Top 12 Division 的名稱列表
top_12_divs = [r.division for r in div_dist_q.all()]
heatmap_q = session.query(
MonthlySummaryAnalysis.year,
MonthlySummaryAnalysis.month,
MonthlySummaryAnalysis.division,
func.sum(MonthlySummaryAnalysis.sales_amt_curr).label('sales')
).filter(MonthlySummaryAnalysis.division.in_(top_12_divs))\
.group_by(MonthlySummaryAnalysis.year, MonthlySummaryAnalysis.month, MonthlySummaryAnalysis.division)\
.order_by(MonthlySummaryAnalysis.year, MonthlySummaryAnalysis.month)
heatmap_q = apply_filters(heatmap_q, ignore_year=True)
# Highlights (Top 3)
def get_highlights_q(metric_col):
q = session.query(MonthlySummaryAnalysis.brand_name, func.sum(metric_col).label('val'))
q = apply_filters(q)
q = q.group_by(MonthlySummaryAnalysis.brand_name).order_by(desc('val')).limit(3)
return q
rev_top_q = get_highlights_q(MonthlySummaryAnalysis.sales_amt_curr)
profit_top_q = get_highlights_q(MonthlySummaryAnalysis.profit_amt_curr)
vol_top_q = get_highlights_q(MonthlySummaryAnalysis.sales_vol_curr)
# 區域排行
area_rank_q = session.query(
MonthlySummaryAnalysis.area_name,
func.sum(MonthlySummaryAnalysis.sales_amt_curr).label('sales'),
func.sum(case((MonthlySummaryAnalysis.year == 2024, MonthlySummaryAnalysis.sales_amt_curr), else_=0)).label('sales_2024'),
func.sum(case((MonthlySummaryAnalysis.year == 2025, MonthlySummaryAnalysis.sales_amt_curr), else_=0)).label('sales_2025')
).group_by(MonthlySummaryAnalysis.area_name)
area_rank_q = apply_filters(area_rank_q, ignore_year=True)
area_rank_q = area_rank_q.order_by(desc('sales'))
# 年度對比趨勢 (需要包含本期與去年同期)
# 年度對比趨勢 (需要包含本期與去年同期)
yoy_trend_q = session.query(
MonthlySummaryAnalysis.year,
MonthlySummaryAnalysis.month,
func.sum(MonthlySummaryAnalysis.sales_amt_curr).label('sales_curr'),
func.sum(MonthlySummaryAnalysis.sales_amt_yoa).label('sales_yoa')
)
yoy_trend_q = apply_filters(yoy_trend_q)
yoy_trend_q = yoy_trend_q.group_by(MonthlySummaryAnalysis.year, MonthlySummaryAnalysis.month).order_by(MonthlySummaryAnalysis.year, MonthlySummaryAnalysis.month)
rows = []
for r in rows_query.all():
rows.append({
'year': r.year,
'month': r.month,
'division': r.division,
'pm_name': r.pm_name,
'area_name': r.area_name,
'brand_name': r.brand_name,
'vendor_name': r.vendor_name,
'trade_type': r.trade_type,
'sales_amt_curr': r.sales_amt_curr,
'sales_amt_yoa': r.sales_amt_yoa,
'sales_vol_curr': r.sales_vol_curr,
'profit_amt_curr': r.profit_amt_curr,
'views_curr': r.views_curr
})
# 取得不重複的維度列表
years_list = [r[0] for r in session.query(MonthlySummaryAnalysis.year).distinct().all()]
months_list = [r[0] for r in session.query(MonthlySummaryAnalysis.month).distinct().all()]
divisions_list = [r[0] for r in session.query(MonthlySummaryAnalysis.division).distinct().all() if r[0]]
pms_list = [r[0] for r in session.query(MonthlySummaryAnalysis.pm_name).distinct().all() if r[0]]
areas_list = [r[0] for r in session.query(MonthlySummaryAnalysis.area_name).distinct().all() if r[0]]
vendors_list = [r[0] for r in session.query(MonthlySummaryAnalysis.vendor_name).distinct().all() if r[0]]
trades_list = [r[0] for r in session.query(MonthlySummaryAnalysis.trade_type).distinct().all() if r[0]]
# DEBUG LOGGING FOR RESULTS
debug_logger.info(f"🔍 [API Debug] Result Counts: Area={len(area_rank_q.all())}, Vendor={len(vendor_rank_q.all())}, Div={len(div_dist_q.all())}, Price={len(price_cont_q.all())}")
return jsonify({
'status': 'success',
'total_rows': total_rows,
'total_months': total_months,
'kpis': {
'sales': int(kpi_res.total_sales or 0),
'sales_prev': int(kpi_res.total_sales_prev or 0),
'sales_yoa': int(kpi_res.total_sales_yoa or 0),
'profit': int(kpi_res.total_profit or 0),
'vol': int(kpi_res.total_vol or 0),
'views': int(kpi_res.total_views or 0),
'margin': round((kpi_res.total_profit / kpi_res.total_sales * 100), 2) if kpi_res.total_sales and kpi_res.total_profit else 0
},
'trend': [{'date': f"{r.year}/{r.month}", 'sales': int(r.sales or 0)} for r in trend_query.all()],
'yoy_trend': [{'date': f"{r.year}/{r.month}", 'curr': int(r.sales_curr or 0), 'yoa': int(r.sales_yoa or 0)} for r in yoy_trend_q.all()],
'rankings': [{'brand': r.brand_name, 'sales': int(r.sales or 0)} for r in rank_query.all()],
'rankings': [{'brand': r.brand_name, 'sales': int(r.sales or 0)} for r in rank_query.all()],
'area_ranking': [
{
'name': r.area_name,
'sales': int(r.sales or 0),
'sales_2024': int(r.sales_2024 or 0),
'sales_2025': int(r.sales_2025 or 0)
}
for r in area_rank_q.all()
],
'vendor_ranking': [
{
'name': r.vendor_name,
'sales': int(r.sales or 0),
'sales_2024': int(r.sales_2024 or 0),
'sales_2025': int(r.sales_2025 or 0),
'profit': int(r.profit or 0),
'profit_2024': int(r.profit_2024 or 0),
'profit_2025': int(r.profit_2025 or 0),
'margin': round((r.profit/r.sales*100), 2) if r.sales and r.profit else 0
}
for r in vendor_rank_q.all()
],
'division_dist': [
{
'name': r.division,
'value': int(r.sales or 0),
'sales_2024': int(r.sales_2024 or 0),
'sales_2025': int(r.sales_2025 or 0)
}
for r in div_dist_q.all()
],
'price_contribution': [
{
'range': r.price_range,
'sales': int(r.sales or 0),
'sales_2024': int(r.sales_2024 or 0),
'sales_2025': int(r.sales_2025 or 0)
}
for r in price_cont_q.all()
],
'bcg_data': [
{'name': f"{r.brand_name}-{r.area_name}", 'qty': int(r.vol or 0), 'margin': round((r.profit/r.sales*100), 2) if r.sales and r.profit else 0, 'sales': int(r.sales or 0)}
for r in bcg_q.all()
],
'heatmap_data': [
{'month': f"{r.year}-{r.month:02d}", 'category': r.division, 'sales': int(r.sales or 0)}
for r in heatmap_q.all()
],
'highlights': {
'rev_top': [{'name': r.brand_name, 'value': int(r.val or 0)} for r in rev_top_q.all()],
'profit_top': [{'name': r.brand_name, 'value': int(r.val or 0)} for r in profit_top_q.all()],
'vol_top': [{'name': r.brand_name, 'value': int(r.val or 0)} for r in vol_top_q.all()]
},
'filters': {
'years': sorted(years_list, reverse=True),
'months': sorted(months_list),
'divisions': sorted(divisions_list),
'pms': sorted(pms_list),
'areas': sorted(areas_list),
'vendors': sorted(vendors_list),
'trades': sorted(trades_list)
},
'rows': rows
})
except Exception as e:
sys_log.error(f"取得月份總表數據失敗: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
finally:
session.close()
# ================= 📊 V-New: 業績分析報表 =================