3291 lines
151 KiB
Python
3291 lines
151 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
業績分析路由模組
|
||
包含:業績分析儀表板、成長分析、各種 API 路由
|
||
|
||
注意:此模組非常複雜,包含大量的數據處理邏輯
|
||
為避免循環依賴,部分函數使用延遲導入
|
||
"""
|
||
|
||
import hashlib
|
||
import io
|
||
import math
|
||
import os
|
||
import pickle
|
||
import time
|
||
import traceback
|
||
from datetime import datetime, timezone, timedelta
|
||
from flask import Blueprint, request, render_template, jsonify, send_file, redirect, url_for
|
||
from auth import login_required
|
||
from sqlalchemy import inspect, text
|
||
import pandas as pd
|
||
import numpy as np
|
||
|
||
from config import BASE_DIR, DATABASE_TYPE, SYSTEM_VERSION
|
||
from database.manager import DatabaseManager
|
||
from services.logger_manager import SystemLogger
|
||
from services.daily_sales_service import prepare_marketing_summary
|
||
from services.cache_manager import (
|
||
_SALES_PROCESSED_CACHE,
|
||
_SALES_OPTIONS_CACHE,
|
||
_SALES_ANALYSIS_RESULT_CACHE,
|
||
_SALES_ANALYSIS_PAGE_CACHE_DIR,
|
||
set_sales_processed_cache,
|
||
)
|
||
from utils.text_helpers import get_color_for_string
|
||
|
||
# 時區設定
|
||
TAIPEI_TZ = timezone(timedelta(hours=8))
|
||
|
||
# Logger
|
||
sys_log = SystemLogger("SalesRoutes").get_logger()
|
||
|
||
# Blueprint 定義
|
||
sales_bp = Blueprint('sales', __name__)
|
||
|
||
_TABLE_DATA_CACHE = {}
|
||
_TABLE_DATA_CACHE_TTL = 60
|
||
_SALES_PREVIEW_CACHE_TTL = 600
|
||
_SALES_OPTIONS_CACHE_TTL = 1800
|
||
_SALES_PAGE_CONTEXT_CACHE_TTL = 180
|
||
_SALES_PAGE_CONTEXT_CACHE_MAX = 24
|
||
_SALES_SHARED_PAGE_CONTEXT_CACHE_TTL = 1800
|
||
|
||
|
||
# ==========================================
|
||
# 輔助函數
|
||
# ==========================================
|
||
|
||
# 共用工具改 import 自 utils(去重,原各 routes 各自定義已移除)
|
||
from utils.df_helpers import find_col # noqa: E402, F401
|
||
from utils.security import validate_table_name, safe_read_sql # noqa: E402, F401
|
||
|
||
|
||
def _get_timed_cache(store, key, ttl_seconds):
|
||
entry = store.get(key)
|
||
if not entry:
|
||
return None
|
||
if time.time() - entry.get('time', 0) >= ttl_seconds:
|
||
store.pop(key, None)
|
||
return None
|
||
return entry.get('data')
|
||
|
||
|
||
def _set_timed_cache(store, key, data, max_entries=64):
|
||
if len(store) >= max_entries:
|
||
oldest_key = min(store, key=lambda k: store[k].get('time', 0))
|
||
store.pop(oldest_key, None)
|
||
store[key] = {'data': data, 'time': time.time()}
|
||
|
||
|
||
def _sales_analysis_args_fingerprint(table_name, cache_key, processed_time=None):
|
||
args = tuple(
|
||
(key, tuple(values))
|
||
for key, values in sorted(request.args.lists())
|
||
)
|
||
raw = repr((table_name, cache_key, processed_time or 0, args)).encode('utf-8')
|
||
return hashlib.md5(raw).hexdigest()
|
||
|
||
|
||
def _get_sales_page_context_cache(cache_key):
|
||
return _get_timed_cache(
|
||
_SALES_ANALYSIS_RESULT_CACHE,
|
||
cache_key,
|
||
_SALES_PAGE_CONTEXT_CACHE_TTL,
|
||
)
|
||
|
||
|
||
def _set_sales_page_context_cache(cache_key, context):
|
||
_set_timed_cache(
|
||
_SALES_ANALYSIS_RESULT_CACHE,
|
||
cache_key,
|
||
context,
|
||
max_entries=_SALES_PAGE_CONTEXT_CACHE_MAX,
|
||
)
|
||
|
||
|
||
def _sales_page_context_cache_file(cache_key):
|
||
return _SALES_ANALYSIS_PAGE_CACHE_DIR / f"{cache_key}.pkl"
|
||
|
||
|
||
def _get_sales_shared_page_context_cache(cache_key):
|
||
path = _sales_page_context_cache_file(cache_key)
|
||
if not path.exists():
|
||
return None
|
||
try:
|
||
if time.time() - path.stat().st_mtime >= _SALES_SHARED_PAGE_CONTEXT_CACHE_TTL:
|
||
path.unlink(missing_ok=True)
|
||
return None
|
||
with open(path, 'rb') as f:
|
||
payload = pickle.load(f)
|
||
if payload.get('version') != SYSTEM_VERSION:
|
||
return None
|
||
return payload.get('context')
|
||
except Exception as exc:
|
||
sys_log.warning(f"[Sales Analysis] 共享頁面快取讀取失敗: {exc}")
|
||
return None
|
||
|
||
|
||
def _set_sales_shared_page_context_cache(cache_key, context):
|
||
path = _sales_page_context_cache_file(cache_key)
|
||
tmp_path = path.with_suffix(f".{os.getpid()}.tmp")
|
||
try:
|
||
os.makedirs(path.parent, exist_ok=True)
|
||
with open(tmp_path, 'wb') as f:
|
||
pickle.dump({'version': SYSTEM_VERSION, 'context': context}, f, protocol=pickle.HIGHEST_PROTOCOL)
|
||
os.replace(tmp_path, path)
|
||
except Exception as exc:
|
||
sys_log.warning(f"[Sales Analysis] 共享頁面快取寫入失敗: {exc}")
|
||
try:
|
||
if tmp_path.exists():
|
||
tmp_path.unlink()
|
||
except OSError:
|
||
pass
|
||
|
||
|
||
def _format_sales_data_range(min_date, max_date):
|
||
if not min_date or not max_date:
|
||
return ''
|
||
try:
|
||
if isinstance(min_date, str):
|
||
min_date_obj = datetime.strptime(min_date.split()[0], '%Y-%m-%d')
|
||
max_date_obj = datetime.strptime(max_date.split()[0], '%Y-%m-%d')
|
||
else:
|
||
min_date_obj = min_date
|
||
max_date_obj = max_date
|
||
return f"{min_date_obj.year}年{min_date_obj.month}月 ~ {max_date_obj.year}年{max_date_obj.month}月"
|
||
except Exception:
|
||
return f"{min_date} ~ {max_date}"
|
||
|
||
|
||
def _fetch_sales_data_range(engine, table_name):
|
||
validate_table_name(table_name)
|
||
cache_key = f"sales_analysis:data_range:{table_name}"
|
||
cached = _get_timed_cache(_SALES_OPTIONS_CACHE, cache_key, _SALES_PREVIEW_CACHE_TTL)
|
||
if cached is not None:
|
||
return cached
|
||
|
||
try:
|
||
date_query = text(f'SELECT MIN("日期") as min_date, MAX("日期") as max_date FROM "{table_name}"')
|
||
with engine.connect() as conn:
|
||
result = conn.execute(date_query).fetchone()
|
||
db_data_range = _format_sales_data_range(result[0], result[1]) if result else ''
|
||
except Exception as e:
|
||
sys_log.warning(f"[Sales Analysis] 無法取得資料期間範圍: {e}")
|
||
db_data_range = ''
|
||
|
||
_set_timed_cache(_SALES_OPTIONS_CACHE, cache_key, db_data_range)
|
||
return db_data_range
|
||
|
||
|
||
def _preview_sales_filter_options(engine, table_name):
|
||
"""首次進入業績分析時的輕量下拉資料,避免每次載入都掃預覽資料。"""
|
||
validate_table_name(table_name)
|
||
cache_key = f"sales_analysis:preview_options:{table_name}"
|
||
cached = _get_timed_cache(_SALES_OPTIONS_CACHE, cache_key, _SALES_PREVIEW_CACHE_TTL)
|
||
if cached:
|
||
return cached
|
||
|
||
options = {
|
||
'categories': [],
|
||
'brands': [],
|
||
'vendors': [],
|
||
'activities': [],
|
||
'payments': [],
|
||
'months': [],
|
||
}
|
||
preview_df = pd.read_sql(f'SELECT * FROM "{table_name}" LIMIT 1000', engine)
|
||
if preview_df.empty:
|
||
_set_timed_cache(_SALES_OPTIONS_CACHE, cache_key, options)
|
||
return options
|
||
|
||
cols = preview_df.columns.tolist()
|
||
|
||
def find_preview_col(keywords):
|
||
for k in keywords:
|
||
for col in cols:
|
||
if k in str(col):
|
||
return col
|
||
return None
|
||
|
||
col_category = find_preview_col(['館別', '商品館', '分類', 'Category'])
|
||
col_brand = find_preview_col(['品牌', 'Brand'])
|
||
col_vendor = find_preview_col(['廠商名稱', 'Vendor Name', '廠商', '供應商', 'Vendor', 'Supplier'])
|
||
col_activity = find_preview_col(['折扣活動名稱', '折價券活動名稱', '滿額再折扣活動名稱', '活動', 'Activity', 'Campaign'])
|
||
col_payment = find_preview_col(['付款', 'Payment', 'Pay'])
|
||
col_date_part = find_preview_col(['日期', '交易日期', 'Date', 'Day'])
|
||
|
||
def clean_values(col):
|
||
if not col:
|
||
return []
|
||
return sorted([
|
||
x for x in preview_df[col].dropna().astype(str).unique().tolist()
|
||
if x and x.strip()
|
||
])
|
||
|
||
options['categories'] = clean_values(col_category)
|
||
options['brands'] = clean_values(col_brand)
|
||
options['vendors'] = clean_values(col_vendor)
|
||
options['activities'] = clean_values(col_activity)
|
||
options['payments'] = clean_values(col_payment)
|
||
|
||
if col_date_part:
|
||
try:
|
||
with engine.connect() as conn:
|
||
result = conn.execute(text(f"""
|
||
SELECT DISTINCT replace(substr("{col_date_part}", 1, 7), '/', '-') as month
|
||
FROM "{table_name}"
|
||
WHERE "{col_date_part}" IS NOT NULL AND "{col_date_part}" != ''
|
||
ORDER BY month
|
||
""")).fetchall()
|
||
options['months'] = [row[0] for row in result if row[0] and '-' in str(row[0])]
|
||
sys_log.info(f"[Sales Analysis] 預覽模式從「{col_date_part}」欄位提取到 {len(options['months'])} 個月份")
|
||
except Exception as e:
|
||
sys_log.warning(f"[Sales Analysis] 無法從「{col_date_part}」欄位提取月份: {e}")
|
||
|
||
_set_timed_cache(_SALES_OPTIONS_CACHE, cache_key, options)
|
||
return options
|
||
|
||
|
||
def _get_sales_filter_options(engine, table_name, cols_map):
|
||
"""完整篩選選項共用快取;資料匯入後 clear_sales_cache() 會一起清掉。"""
|
||
validate_table_name(table_name)
|
||
col_category = cols_map.get('category')
|
||
col_brand = cols_map.get('brand')
|
||
col_vendor = cols_map.get('vendor')
|
||
col_activity = cols_map.get('activity')
|
||
col_payment = cols_map.get('payment')
|
||
col_date = cols_map.get('date')
|
||
option_cols = (col_category, col_brand, col_vendor, col_activity, col_payment, col_date)
|
||
digest = hashlib.md5(repr(option_cols).encode('utf-8')).hexdigest()
|
||
cache_key = f"sales_analysis:filter_options:{table_name}:{digest}"
|
||
cached = _get_timed_cache(_SALES_OPTIONS_CACHE, cache_key, _SALES_OPTIONS_CACHE_TTL)
|
||
if cached:
|
||
return cached
|
||
|
||
options = {
|
||
'categories': [],
|
||
'brands': [],
|
||
'vendors': [],
|
||
'activities': [],
|
||
'payments': [],
|
||
'months': [],
|
||
}
|
||
|
||
def read_distinct(conn, col):
|
||
if not col:
|
||
return []
|
||
sql = f'SELECT DISTINCT "{col}" FROM "{table_name}" WHERE "{col}" IS NOT NULL AND "{col}" <> \'\' ORDER BY "{col}"'
|
||
result = conn.execute(text(sql)).fetchall()
|
||
return [str(row[0]) for row in result if row[0]]
|
||
|
||
with engine.connect() as conn:
|
||
options['categories'] = read_distinct(conn, col_category)
|
||
options['brands'] = read_distinct(conn, col_brand)
|
||
options['vendors'] = read_distinct(conn, col_vendor)
|
||
options['activities'] = read_distinct(conn, col_activity)
|
||
options['payments'] = read_distinct(conn, col_payment)
|
||
|
||
if col_date:
|
||
date_fields = ['日期', '訂單日期', '時間']
|
||
for field in date_fields:
|
||
try:
|
||
result = conn.execute(text(f"""
|
||
SELECT DISTINCT replace(substr("{field}", 1, 7), '/', '-') as month
|
||
FROM "{table_name}"
|
||
WHERE "{field}" IS NOT NULL AND "{field}" != ''
|
||
ORDER BY month
|
||
""")).fetchall()
|
||
months = [row[0] for row in result if row[0] and '-' in str(row[0])]
|
||
if months:
|
||
options['months'] = months
|
||
sys_log.info(f"[Sales Analysis] 從欄位 {field} 提取到 {len(months)} 個月份: {months}")
|
||
break
|
||
except Exception as ex:
|
||
sys_log.warning(f"[Sales Analysis] 從欄位 {field} 提取月份失敗: {ex}")
|
||
|
||
_set_timed_cache(_SALES_OPTIONS_CACHE, cache_key, options)
|
||
return options
|
||
|
||
|
||
def _growth_empty_payload(now_taipei=None):
|
||
now_taipei = now_taipei or datetime.now(TAIPEI_TZ)
|
||
return (
|
||
{
|
||
'labels': [],
|
||
'revenue': [],
|
||
'profit': [],
|
||
'orders': [],
|
||
'aov': [],
|
||
'mom': [],
|
||
'yoy': [],
|
||
'margin_rate': []
|
||
},
|
||
{
|
||
'ytd_revenue': 0,
|
||
'ytd_growth': 0,
|
||
'current_year': now_taipei.year,
|
||
'recent_aov': 0,
|
||
'total_orders': 0
|
||
}
|
||
)
|
||
|
||
|
||
def _growth_number(value):
|
||
try:
|
||
number = float(value or 0)
|
||
except (TypeError, ValueError):
|
||
return 0.0
|
||
return number if math.isfinite(number) else 0.0
|
||
|
||
|
||
def _growth_month_iter(start_month, end_month):
|
||
year = start_month.year
|
||
month = start_month.month
|
||
while (year, month) <= (end_month.year, end_month.month):
|
||
yield datetime(year, month, 1).date()
|
||
month += 1
|
||
if month > 12:
|
||
year += 1
|
||
month = 1
|
||
|
||
|
||
def _build_growth_chart_data(monthly_rows):
|
||
monthly_values = {}
|
||
for row in monthly_rows:
|
||
month_raw = row.get('month_start')
|
||
month_dt = pd.to_datetime(month_raw, errors='coerce')
|
||
if pd.isna(month_dt):
|
||
continue
|
||
month_key = month_dt.date().replace(day=1)
|
||
monthly_values[month_key] = {
|
||
'amount': _growth_number(row.get('amount')),
|
||
'profit': _growth_number(row.get('profit')),
|
||
'orders': int(_growth_number(row.get('orders') or row.get('volume'))),
|
||
}
|
||
|
||
if not monthly_values:
|
||
return None
|
||
|
||
months = list(_growth_month_iter(min(monthly_values), max(monthly_values)))
|
||
revenue = []
|
||
profit = []
|
||
orders = []
|
||
aov = []
|
||
margin_rate = []
|
||
mom = []
|
||
yoy = []
|
||
|
||
for index, month_key in enumerate(months):
|
||
item = monthly_values.get(month_key, {'amount': 0, 'profit': 0, 'orders': 0})
|
||
amount = item['amount']
|
||
profit_value = item['profit']
|
||
order_count = item['orders']
|
||
|
||
revenue.append(amount)
|
||
profit.append(profit_value)
|
||
orders.append(order_count)
|
||
aov.append(round(amount / order_count, 0) if order_count > 0 else 0)
|
||
margin_rate.append(round((profit_value / amount) * 100, 1) if amount > 0 else 0)
|
||
|
||
prev_amount = revenue[index - 1] if index >= 1 else 0
|
||
last_year_amount = revenue[index - 12] if index >= 12 else 0
|
||
mom.append(round(((amount - prev_amount) / prev_amount) * 100, 2) if prev_amount > 0 else 0)
|
||
yoy.append(round(((amount - last_year_amount) / last_year_amount) * 100, 2) if last_year_amount > 0 else 0)
|
||
|
||
return {
|
||
'labels': [month.strftime('%Y-%m') for month in months],
|
||
'revenue': revenue,
|
||
'profit': profit,
|
||
'orders': orders,
|
||
'aov': aov,
|
||
'mom': mom,
|
||
'yoy': yoy,
|
||
'margin_rate': margin_rate
|
||
}
|
||
|
||
|
||
def _attach_growth_competitor_intel(engine, chart_data):
|
||
"""把 MOMO 低價壓力對齊 growth_analysis 月份序列。"""
|
||
if not chart_data:
|
||
return chart_data
|
||
enriched = dict(chart_data)
|
||
try:
|
||
from services.competitor_intel_repository import (
|
||
fetch_competitor_coverage,
|
||
fetch_competitor_monthly_pressure,
|
||
)
|
||
pressure = fetch_competitor_monthly_pressure(engine, months=max(len(enriched.get('labels') or []), 12))
|
||
by_month = {
|
||
label: {
|
||
'avg_gap_pct': gap,
|
||
'risk_count': risk,
|
||
'match_count': match_count,
|
||
}
|
||
for label, gap, risk, match_count in zip(
|
||
pressure.get('labels', []),
|
||
pressure.get('avg_gap_pct', []),
|
||
pressure.get('risk_count', []),
|
||
pressure.get('match_count', []),
|
||
)
|
||
}
|
||
labels = enriched.get('labels') or []
|
||
enriched['competitor_gap_pct'] = [by_month.get(label, {}).get('avg_gap_pct', 0) for label in labels]
|
||
enriched['competitor_risk_count'] = [by_month.get(label, {}).get('risk_count', 0) for label in labels]
|
||
enriched['competitor_match_count'] = [by_month.get(label, {}).get('match_count', 0) for label in labels]
|
||
enriched['competitor_coverage'] = fetch_competitor_coverage(engine)
|
||
except Exception as exc:
|
||
sys_log.warning(f"[GrowthAnalysis] PChome 競價情報串接失敗,略過: {exc}")
|
||
enriched['competitor_gap_pct'] = [0 for _ in enriched.get('labels', [])]
|
||
enriched['competitor_risk_count'] = [0 for _ in enriched.get('labels', [])]
|
||
enriched['competitor_match_count'] = [0 for _ in enriched.get('labels', [])]
|
||
enriched['competitor_coverage'] = {}
|
||
return enriched
|
||
|
||
|
||
def _build_growth_kpi(kpi_row):
|
||
if not kpi_row:
|
||
return None
|
||
|
||
max_dt = pd.to_datetime(kpi_row.get('max_date'), errors='coerce')
|
||
current_year = int(kpi_row.get('current_year') or (max_dt.year if not pd.isna(max_dt) else datetime.now(TAIPEI_TZ).year))
|
||
ytd_revenue = _growth_number(kpi_row.get('ytd_revenue'))
|
||
last_ytd_revenue = _growth_number(kpi_row.get('last_ytd_revenue'))
|
||
recent_revenue = _growth_number(kpi_row.get('recent_revenue'))
|
||
recent_orders = int(_growth_number(kpi_row.get('recent_orders')))
|
||
|
||
return {
|
||
'ytd_revenue': ytd_revenue,
|
||
'ytd_growth': ((ytd_revenue - last_ytd_revenue) / last_ytd_revenue) * 100 if last_ytd_revenue > 0 else 0,
|
||
'current_year': current_year,
|
||
'recent_aov': recent_revenue / recent_orders if recent_orders > 0 else 0,
|
||
'total_orders': int(_growth_number(kpi_row.get('total_orders')))
|
||
}
|
||
|
||
|
||
def _growth_pg_number_expr(column_name):
|
||
return f'COALESCE(NULLIF(regexp_replace("{column_name}"::text, \'[^0-9.-]\', \'\', \'g\'), \'\')::numeric, 0)'
|
||
|
||
|
||
def _fetch_growth_latest_detail_month(engine, table_name='realtime_sales_monthly'):
|
||
"""回傳業績明細表最新月份;摘要表落後時需改走明細聚合。"""
|
||
try:
|
||
if not inspect(engine).has_table(table_name):
|
||
return None
|
||
table_name = validate_table_name(table_name)
|
||
table_ref = f'"{table_name}"'
|
||
if engine.dialect.name == 'postgresql':
|
||
latest_sql = text(f"""
|
||
SELECT date_trunc('month', MAX("日期"::date))::date AS latest_month
|
||
FROM {table_ref}
|
||
WHERE "日期" IS NOT NULL
|
||
""")
|
||
else:
|
||
latest_sql = text(f"""
|
||
SELECT date(MAX("日期"), 'start of month') AS latest_month
|
||
FROM {table_ref}
|
||
WHERE "日期" IS NOT NULL
|
||
""")
|
||
with engine.connect() as conn:
|
||
latest = conn.execute(latest_sql).scalar()
|
||
latest_dt = pd.to_datetime(latest, errors='coerce')
|
||
if pd.isna(latest_dt):
|
||
return None
|
||
return latest_dt.date().replace(day=1)
|
||
except Exception as exc:
|
||
sys_log.warning(f"[GrowthAnalysis] 無法取得明細最新月份,摘要新鮮度檢查略過: {exc}")
|
||
return None
|
||
|
||
|
||
def _get_growth_source_fingerprint(engine, table_name='realtime_sales_monthly'):
|
||
"""成長分析快取指紋;匯入新資料後同一 worker 下一次 request 會重算。"""
|
||
try:
|
||
if not inspect(engine).has_table(table_name):
|
||
return None
|
||
table_name = validate_table_name(table_name)
|
||
if engine.dialect.name == 'postgresql':
|
||
from services.cache_service import (
|
||
get_growth_source_fingerprint_cache,
|
||
set_growth_source_fingerprint_cache,
|
||
)
|
||
cache_key = f"{engine.url}|{table_name}"
|
||
cached_fingerprint = get_growth_source_fingerprint_cache(cache_key)
|
||
if cached_fingerprint is not None:
|
||
return cached_fingerprint
|
||
else:
|
||
cache_key = None
|
||
|
||
table_ref = f'"{table_name}"'
|
||
if engine.dialect.name == 'postgresql':
|
||
fingerprint_sql = text(f"""
|
||
SELECT MAX("日期"::date)::text, COUNT(*)
|
||
FROM {table_ref}
|
||
WHERE "日期" IS NOT NULL
|
||
""")
|
||
else:
|
||
fingerprint_sql = text(f"""
|
||
SELECT CAST(MAX("日期") AS TEXT), COUNT(*)
|
||
FROM {table_ref}
|
||
WHERE "日期" IS NOT NULL
|
||
""")
|
||
with engine.connect() as conn:
|
||
row = conn.execute(fingerprint_sql).fetchone()
|
||
fingerprint = (row[0], row[1]) if row else None
|
||
if cache_key is not None:
|
||
set_growth_source_fingerprint_cache(cache_key, fingerprint)
|
||
return fingerprint
|
||
except Exception as exc:
|
||
sys_log.warning(f"[GrowthAnalysis] 快取指紋查詢失敗,保守沿用 TTL: {exc}")
|
||
return None
|
||
|
||
|
||
def _fetch_growth_payload_summary(engine):
|
||
"""優先使用月結摘要表,避免成長頁冷 worker 掃 70+ 萬列原始明細。"""
|
||
table_name = 'monthly_summary_analysis'
|
||
if not inspect(engine).has_table(table_name):
|
||
return None
|
||
|
||
table_ref = f'"{table_name}"'
|
||
if engine.dialect.name == 'postgresql':
|
||
summary_sql = text(f"""
|
||
SELECT
|
||
make_date("year"::int, "month"::int, 1) AS month_start,
|
||
SUM(COALESCE(sales_amt_curr, 0)) AS amount,
|
||
SUM(COALESCE(profit_amt_curr, 0)) AS profit,
|
||
SUM(COALESCE(sales_vol_curr, 0)) AS volume
|
||
FROM {table_ref}
|
||
WHERE "year" IS NOT NULL
|
||
AND "month" IS NOT NULL
|
||
GROUP BY 1
|
||
ORDER BY 1
|
||
""")
|
||
else:
|
||
summary_sql = text(f"""
|
||
SELECT
|
||
printf('%04d-%02d-01', "year", "month") AS month_start,
|
||
SUM(COALESCE(sales_amt_curr, 0)) AS amount,
|
||
SUM(COALESCE(profit_amt_curr, 0)) AS profit,
|
||
SUM(COALESCE(sales_vol_curr, 0)) AS volume
|
||
FROM {table_ref}
|
||
WHERE "year" IS NOT NULL
|
||
AND "month" IS NOT NULL
|
||
GROUP BY 1
|
||
ORDER BY 1
|
||
""")
|
||
|
||
with engine.connect() as conn:
|
||
rows = conn.execute(summary_sql).mappings().all()
|
||
|
||
chart_data = _build_growth_chart_data(rows)
|
||
if not chart_data or not chart_data['labels']:
|
||
return None
|
||
|
||
latest_label = chart_data['labels'][-1]
|
||
summary_latest_month = pd.to_datetime(f"{latest_label}-01", errors='coerce')
|
||
detail_latest_month = _fetch_growth_latest_detail_month(engine)
|
||
if detail_latest_month and not pd.isna(summary_latest_month):
|
||
if summary_latest_month.date().replace(day=1) < detail_latest_month:
|
||
sys_log.info(
|
||
"[GrowthAnalysis] 月結摘要落後明細資料,改走 realtime_sales_monthly 聚合 | "
|
||
f"summary={latest_label}, detail={detail_latest_month.strftime('%Y-%m')}"
|
||
)
|
||
return None
|
||
|
||
current_year = int(latest_label[:4])
|
||
latest_month = int(latest_label[5:7])
|
||
last_year = current_year - 1
|
||
|
||
ytd_revenue = 0.0
|
||
last_ytd_revenue = 0.0
|
||
for label, revenue in zip(chart_data['labels'], chart_data['revenue']):
|
||
year = int(label[:4])
|
||
month = int(label[5:7])
|
||
if year == current_year and month <= latest_month:
|
||
ytd_revenue += _growth_number(revenue)
|
||
elif year == last_year and month <= latest_month:
|
||
last_ytd_revenue += _growth_number(revenue)
|
||
|
||
latest_revenue = _growth_number(chart_data['revenue'][-1])
|
||
latest_volume = int(_growth_number(chart_data['orders'][-1]))
|
||
kpi = {
|
||
'ytd_revenue': ytd_revenue,
|
||
'ytd_growth': ((ytd_revenue - last_ytd_revenue) / last_ytd_revenue) * 100 if last_ytd_revenue > 0 else 0,
|
||
'current_year': current_year,
|
||
'recent_aov': latest_revenue / latest_volume if latest_volume > 0 else 0,
|
||
'total_orders': int(sum(_growth_number(value) for value in chart_data['orders']))
|
||
}
|
||
|
||
return chart_data, kpi
|
||
|
||
|
||
def _fetch_growth_payload_sql(engine, table_name):
|
||
"""用 DB 聚合取代全表 pandas 載入,降低成長分析冷啟動 TTFB。"""
|
||
table_name = validate_table_name(table_name)
|
||
table_ref = f'"{table_name}"'
|
||
|
||
if engine.dialect.name == 'postgresql':
|
||
date_expr = '"日期"::date'
|
||
amount_expr = _growth_pg_number_expr('總業績')
|
||
cost_expr = _growth_pg_number_expr('總成本')
|
||
volume_expr = _growth_pg_number_expr('數量')
|
||
monthly_sql = text(f"""
|
||
SELECT
|
||
date_trunc('month', {date_expr})::date AS month_start,
|
||
SUM({amount_expr}) AS amount,
|
||
SUM({amount_expr} - {cost_expr}) AS profit,
|
||
SUM({volume_expr}) AS orders
|
||
FROM {table_ref}
|
||
WHERE "日期" IS NOT NULL
|
||
GROUP BY 1
|
||
ORDER BY 1
|
||
""")
|
||
kpi_sql = text(f"""
|
||
WITH bounds AS (
|
||
SELECT MAX({date_expr}) AS max_date
|
||
FROM {table_ref}
|
||
WHERE "日期" IS NOT NULL
|
||
)
|
||
SELECT
|
||
EXTRACT(YEAR FROM b.max_date)::int AS current_year,
|
||
b.max_date::text AS max_date,
|
||
COALESCE(SUM(CASE
|
||
WHEN {date_expr} >= date_trunc('year', b.max_date)::date
|
||
AND {date_expr} <= b.max_date
|
||
THEN {amount_expr} ELSE 0 END), 0) AS ytd_revenue,
|
||
COALESCE(SUM(CASE
|
||
WHEN {date_expr} >= (date_trunc('year', b.max_date) - interval '1 year')::date
|
||
AND {date_expr} <= (b.max_date - interval '1 year')::date
|
||
THEN {amount_expr} ELSE 0 END), 0) AS last_ytd_revenue,
|
||
COALESCE(SUM(CASE
|
||
WHEN {date_expr} >= (b.max_date - interval '30 days')::date
|
||
AND {date_expr} <= b.max_date
|
||
THEN {amount_expr} ELSE 0 END), 0) AS recent_revenue,
|
||
COALESCE(SUM(CASE
|
||
WHEN {date_expr} >= (b.max_date - interval '30 days')::date
|
||
AND {date_expr} <= b.max_date
|
||
THEN {volume_expr} ELSE 0 END), 0) AS recent_orders,
|
||
COALESCE(SUM({volume_expr}), 0) AS total_orders
|
||
FROM {table_ref}
|
||
CROSS JOIN bounds b
|
||
WHERE "日期" IS NOT NULL
|
||
GROUP BY b.max_date
|
||
""")
|
||
else:
|
||
monthly_sql = text(f"""
|
||
SELECT
|
||
date("日期", 'start of month') AS month_start,
|
||
SUM(COALESCE(CAST("總業績" AS REAL), 0)) AS amount,
|
||
SUM(COALESCE(CAST("總業績" AS REAL), 0) - COALESCE(CAST("總成本" AS REAL), 0)) AS profit,
|
||
SUM(COALESCE(CAST("數量" AS REAL), 0)) AS orders
|
||
FROM {table_ref}
|
||
WHERE "日期" IS NOT NULL
|
||
GROUP BY 1
|
||
ORDER BY 1
|
||
""")
|
||
kpi_sql = text(f"""
|
||
WITH bounds AS (
|
||
SELECT date(MAX("日期")) AS max_date
|
||
FROM {table_ref}
|
||
WHERE "日期" IS NOT NULL
|
||
)
|
||
SELECT
|
||
CAST(strftime('%Y', b.max_date) AS INTEGER) AS current_year,
|
||
b.max_date AS max_date,
|
||
COALESCE(SUM(CASE
|
||
WHEN date("日期") >= date(b.max_date, 'start of year')
|
||
AND date("日期") <= b.max_date
|
||
THEN COALESCE(CAST("總業績" AS REAL), 0) ELSE 0 END), 0) AS ytd_revenue,
|
||
COALESCE(SUM(CASE
|
||
WHEN date("日期") >= date(b.max_date, 'start of year', '-1 year')
|
||
AND date("日期") <= date(b.max_date, '-1 year')
|
||
THEN COALESCE(CAST("總業績" AS REAL), 0) ELSE 0 END), 0) AS last_ytd_revenue,
|
||
COALESCE(SUM(CASE
|
||
WHEN date("日期") >= date(b.max_date, '-30 days')
|
||
AND date("日期") <= b.max_date
|
||
THEN COALESCE(CAST("總業績" AS REAL), 0) ELSE 0 END), 0) AS recent_revenue,
|
||
COALESCE(SUM(CASE
|
||
WHEN date("日期") >= date(b.max_date, '-30 days')
|
||
AND date("日期") <= b.max_date
|
||
THEN COALESCE(CAST("數量" AS REAL), 0) ELSE 0 END), 0) AS recent_orders,
|
||
COALESCE(SUM(COALESCE(CAST("數量" AS REAL), 0)), 0) AS total_orders
|
||
FROM {table_ref}
|
||
CROSS JOIN bounds b
|
||
WHERE "日期" IS NOT NULL
|
||
GROUP BY b.max_date
|
||
""")
|
||
|
||
with engine.connect() as conn:
|
||
monthly_rows = conn.execute(monthly_sql).mappings().all()
|
||
kpi_row = conn.execute(kpi_sql).mappings().first()
|
||
|
||
chart_data = _build_growth_chart_data(monthly_rows)
|
||
kpi = _build_growth_kpi(kpi_row)
|
||
if not chart_data or not kpi:
|
||
return None
|
||
return chart_data, kpi
|
||
|
||
|
||
def _fetch_growth_payload_pandas(engine, table_name):
|
||
req_cols = ['日期', '總業績', '訂單編號', '總成本', '數量']
|
||
df = safe_read_sql(table_name, columns=req_cols, engine=engine)
|
||
|
||
if df.empty:
|
||
return None
|
||
|
||
df['dt'] = pd.to_datetime(df['日期'], errors='coerce')
|
||
df = df.dropna(subset=['dt'])
|
||
if df.empty:
|
||
return None
|
||
|
||
df['amount'] = pd.to_numeric(df['總業績'], errors='coerce').fillna(0)
|
||
df['cost'] = pd.to_numeric(df['總成本'], errors='coerce').fillna(0)
|
||
df['volume'] = pd.to_numeric(df['數量'], errors='coerce').fillna(0)
|
||
df['profit'] = df['amount'] - df['cost']
|
||
|
||
monthly_stats = df.set_index('dt').resample('MS').agg({
|
||
'amount': 'sum',
|
||
'profit': 'sum',
|
||
'volume': 'sum'
|
||
}).rename(columns={'volume': 'orders'})
|
||
|
||
monthly_rows = [
|
||
{
|
||
'month_start': month_start,
|
||
'amount': row['amount'],
|
||
'profit': row['profit'],
|
||
'orders': row['orders'],
|
||
}
|
||
for month_start, row in monthly_stats.iterrows()
|
||
]
|
||
chart_data = _build_growth_chart_data(monthly_rows)
|
||
|
||
current_year = df['dt'].max().year
|
||
last_year = current_year - 1
|
||
ytd_mask = df['dt'].dt.year == current_year
|
||
last_ytd_mask = (df['dt'].dt.year == last_year) & (df['dt'].dt.dayofyear <= df['dt'].max().dayofyear)
|
||
last_month_mask = df['dt'] >= (df['dt'].max() - pd.Timedelta(days=30))
|
||
|
||
ytd_revenue = float(df.loc[ytd_mask, 'amount'].sum())
|
||
last_ytd_revenue = float(df.loc[last_ytd_mask, 'amount'].sum())
|
||
recent_revenue = float(df.loc[last_month_mask, 'amount'].sum())
|
||
recent_orders = int(df.loc[last_month_mask, 'volume'].sum())
|
||
|
||
kpi = {
|
||
'ytd_revenue': ytd_revenue,
|
||
'ytd_growth': ((ytd_revenue - last_ytd_revenue) / last_ytd_revenue) * 100 if last_ytd_revenue > 0 else 0,
|
||
'current_year': current_year,
|
||
'recent_aov': recent_revenue / recent_orders if recent_orders > 0 else 0,
|
||
'total_orders': int(monthly_stats['orders'].sum())
|
||
}
|
||
|
||
if not chart_data:
|
||
return None
|
||
return chart_data, kpi
|
||
|
||
|
||
def _get_filtered_sales_data(cache_key):
|
||
"""
|
||
🚩 共用函式:從快取讀取資料並根據 request.args 進行篩選
|
||
回傳: (target_df, cols_map, error_message)
|
||
參數: cache_key - 快取鍵值 (例如: "realtime_sales_monthly_3m")
|
||
"""
|
||
db = DatabaseManager()
|
||
|
||
# 1. 檢查資料表與快取
|
||
df = None
|
||
cols_map = {}
|
||
|
||
if cache_key in _SALES_PROCESSED_CACHE:
|
||
cache_data = _SALES_PROCESSED_CACHE[cache_key]
|
||
df = cache_data['df']
|
||
cols_map = cache_data['cols']
|
||
else:
|
||
# 快取不存在時,直接回傳錯誤讓呼叫端顯示 spinner 導回 sales_analysis
|
||
# 不在此發起全表 DB 查詢(748k 行會 hang Gunicorn worker)
|
||
sys_log.warning(f"[Sales Analysis] ⚠️ 快取不存在 ({cache_key}),回傳錯誤讓 UI 導回 sales_analysis")
|
||
return None, {}, f"快取未就緒,請先從業績分析主頁載入資料 (cache_key={cache_key})"
|
||
|
||
if False: # 保留舊冷快取重載邏輯(已停用,避免全表掃描 hang)
|
||
sys_log.warning(f"[Sales Analysis] ⚠️ 快取不存在 ({cache_key}),試圖重新從資料庫載入...")
|
||
try:
|
||
# V-Fix: 從 cache_key 提取 table_name
|
||
# 格式: realtime_sales_monthly_3m 或 realtime_sales_monthly_custom_2025-01-01_2025-01-31
|
||
if "_custom_" in cache_key:
|
||
table_name = cache_key.split('_custom_')[0] # realtime_sales_monthly
|
||
else:
|
||
# 移除最後的 _Xm 部分
|
||
parts = cache_key.rsplit('_', 1)
|
||
table_name = parts[0] if len(parts) > 1 else 'realtime_sales_monthly'
|
||
|
||
# 判斷是自訂區間還是標配區間
|
||
if "_custom_" in cache_key:
|
||
# 格式: realtime_sales_monthly_custom_2025-01-01_2025-01-31
|
||
parts = cache_key.split('_custom_')
|
||
dates = parts[1].split('_')
|
||
start_d, end_d = dates[0], dates[1]
|
||
# 呼叫資料庫讀取 (不傳入 view, 會自動處理欄位映射)
|
||
result_df, result_cols = db.get_sales_data(table_name=table_name, start_date=start_d, end_date=end_d)
|
||
else:
|
||
# 格式: realtime_sales_monthly_1m;months=0 表示全時段但上限 12 個月避免全表掃描 hang
|
||
months = int(cache_key.split('_')[-1].replace('m', '') or '12')
|
||
if months == 0:
|
||
months = 12
|
||
result_df, result_cols = db.get_sales_data(table_name=table_name, months=months)
|
||
|
||
if result_df is not None and not result_df.empty:
|
||
# V-Fix (2026-01-23): 補回所有日期維度欄位供後續篩選 (_dow, _hour, _month_str)
|
||
if '日期' in result_df.columns:
|
||
# 先轉換為 datetime
|
||
result_df['_parsed_date'] = pd.to_datetime(result_df['日期'], errors='coerce')
|
||
result_df['_month_str'] = result_df['_parsed_date'].dt.strftime('%Y-%m')
|
||
result_df['_dow'] = result_df['_parsed_date'].dt.dayofweek
|
||
|
||
# 小時需要從「時間」欄位提取
|
||
if '時間' in result_df.columns:
|
||
result_df['_hour'] = pd.to_datetime(result_df['時間'], format='%H:%M:%S', errors='coerce').dt.hour
|
||
else:
|
||
result_df['_hour'] = 0 # 如果沒有時間欄位,預設為 0
|
||
|
||
# 清理臨時欄位
|
||
result_df.drop(columns=['_parsed_date'], inplace=True, errors='ignore')
|
||
|
||
# 自動存入快取
|
||
_SALES_PROCESSED_CACHE[cache_key] = {'df': result_df, 'cols': result_cols, 'time': time.time()}
|
||
df = result_df
|
||
cols_map = result_cols
|
||
sys_log.info(f"[Sales Analysis] ✅ 快取成功自動重載 | 筆數: {len(df)}")
|
||
else:
|
||
return None, None, "資料庫無可用資料,請確認匯入狀態"
|
||
except Exception as ex:
|
||
sys_log.error(f"[Sales Analysis] 🚨 自動重載失敗: {ex}")
|
||
return None, None, f"快取失效且無法重載: {ex}"
|
||
|
||
# 恢復欄位變數
|
||
col_name = cols_map.get('name')
|
||
col_category = cols_map.get('category')
|
||
col_brand = cols_map.get('brand')
|
||
col_vendor = cols_map.get('vendor')
|
||
col_activity = cols_map.get('activity')
|
||
col_payment = cols_map.get('payment')
|
||
col_price = cols_map.get('price')
|
||
col_date = cols_map.get('date')
|
||
col_return_qty = cols_map.get('return_qty') # V-New: 取得退貨欄位
|
||
|
||
# 2. 取得篩選參數
|
||
selected_category = request.args.get('category', 'all')
|
||
selected_brand = request.args.get('brand', 'all')
|
||
selected_vendor = request.args.get('vendor', 'all')
|
||
selected_activity = request.args.get('activity', 'all')
|
||
selected_payment = request.args.get('payment', 'all')
|
||
selected_dow = request.args.get('dow', 'all')
|
||
selected_hour = request.args.get('hour', 'all')
|
||
selected_month = request.args.get('month', 'all')
|
||
keyword = request.args.get('keyword', '').strip()
|
||
min_price = request.args.get('min_price', '')
|
||
max_price = request.args.get('max_price', '')
|
||
min_margin = request.args.get('min_margin', '')
|
||
max_margin = request.args.get('max_margin', '')
|
||
|
||
# 3. 執行篩選
|
||
target_df = df
|
||
|
||
# Top N 分類處理 (用於 '其他' 篩選)
|
||
TOP_N_CATS = 12
|
||
top_cats_names = []
|
||
if col_category:
|
||
# 注意:這裡為了效能,簡單重算一次 Top N,或可考慮也快取起來
|
||
cat_group_all = df.groupby(col_category)[cols_map.get('amount')].sum().sort_values(ascending=False)
|
||
if len(cat_group_all) > TOP_N_CATS:
|
||
top_cats_names = cat_group_all.head(TOP_N_CATS).index.tolist()
|
||
|
||
if selected_category != 'all' and col_category:
|
||
if selected_category == '其他' and top_cats_names:
|
||
target_df = target_df[~target_df[col_category].isin(top_cats_names)]
|
||
else:
|
||
target_df = target_df[target_df[col_category] == selected_category]
|
||
|
||
if selected_brand != 'all' and col_brand: target_df = target_df[target_df[col_brand] == selected_brand]
|
||
if selected_vendor != 'all' and col_vendor: target_df = target_df[target_df[col_vendor] == selected_vendor]
|
||
if selected_activity != 'all' and col_activity: target_df = target_df[target_df[col_activity] == selected_activity]
|
||
if selected_payment != 'all' and col_payment: target_df = target_df[target_df[col_payment] == selected_payment]
|
||
|
||
if selected_dow != 'all' and col_date: target_df = target_df[target_df['_dow'] == int(selected_dow)]
|
||
if selected_hour != 'all' and col_date: target_df = target_df[target_df['_hour'] == int(selected_hour)]
|
||
if selected_month != 'all' and col_date: target_df = target_df[target_df['_month_str'] == selected_month]
|
||
|
||
if keyword: target_df = target_df[target_df[col_name].astype(str).str.contains(keyword, case=False, na=False)]
|
||
|
||
if col_price:
|
||
if min_price: target_df = target_df[target_df[col_price] >= float(min_price)]
|
||
if max_price: target_df = target_df[target_df[col_price] <= float(max_price)]
|
||
|
||
if min_margin: target_df = target_df[target_df['calculated_margin_rate'] >= float(min_margin)]
|
||
if max_margin: target_df = target_df[target_df['calculated_margin_rate'] <= float(max_margin)]
|
||
|
||
return target_df, cols_map, None
|
||
|
||
|
||
# ==========================================
|
||
# 頁面路由
|
||
# ==========================================
|
||
|
||
@sales_bp.route('/sales_analysis')
|
||
@login_required
|
||
def sales_analysis():
|
||
"""業績分析報表頁面"""
|
||
try:
|
||
db = DatabaseManager()
|
||
table_name = 'realtime_sales_monthly'
|
||
|
||
# 1. 檢查資料表是否存在
|
||
inspector = inspect(db.engine)
|
||
if not inspector.has_table(table_name):
|
||
return render_template('sales_analysis.html',
|
||
error="尚未匯入「即時業績(全月)」資料,請先至設定頁面匯入 Excel。",
|
||
table_name=table_name,
|
||
selected_metric='amount',
|
||
no_filter=False,
|
||
data_range_months=0,
|
||
start_date='',
|
||
end_date='',
|
||
total_records=0,
|
||
active_page='sales',
|
||
db_data_range='')
|
||
|
||
# V-Opt: 資料期間在同一批匯入後穩定,避免每次首屏都查 MIN/MAX。
|
||
db_data_range = _fetch_sales_data_range(db.engine, table_name)
|
||
|
||
# V-New: 取得篩選參數
|
||
data_range_param = request.args.get('data_range', '') # 不再設預設值
|
||
start_date = request.args.get('start_date', '')
|
||
end_date = request.args.get('end_date', '')
|
||
|
||
# V-New: 按需載入 - 如果沒有任何篩選條件,顯示引導頁面
|
||
if not data_range_param and not start_date and not end_date:
|
||
sys_log.info("[Sales Analysis] 👋 首次進入頁面,等待用戶選擇篩選條件")
|
||
preview_cache_key = "sales_analysis:page_context:" + _sales_analysis_args_fingerprint(
|
||
table_name,
|
||
'preview',
|
||
SYSTEM_VERSION,
|
||
)
|
||
cached_preview_context = (
|
||
_get_sales_page_context_cache(preview_cache_key)
|
||
or _get_sales_shared_page_context_cache(preview_cache_key)
|
||
)
|
||
if cached_preview_context:
|
||
_set_sales_page_context_cache(preview_cache_key, cached_preview_context)
|
||
return render_template('sales_analysis.html', **cached_preview_context)
|
||
|
||
preview_options = _preview_sales_filter_options(db.engine, table_name)
|
||
preview_categories = preview_options['categories']
|
||
preview_brands = preview_options['brands']
|
||
preview_vendors = preview_options['vendors']
|
||
preview_activities = preview_options['activities']
|
||
preview_payments = preview_options['payments']
|
||
preview_months = preview_options['months']
|
||
|
||
# 傳遞必要的變數以避免模板錯誤
|
||
selected_metric = request.args.get('metric', 'amount')
|
||
# 建立空的數據結構
|
||
empty_data = {'labels': [], 'chart_values': [], 'values': [], 'metric_label': ''}
|
||
preview_context = {
|
||
'no_filter': True,
|
||
'table_name': table_name,
|
||
'selected_metric': selected_metric,
|
||
'total_records': 0,
|
||
'items': [],
|
||
'kpi': {'revenue': 0, 'qty': 0, 'count': 0, 'cost': 0, 'gross_margin': 0, 'gross_margin_rate': 0, 'avg_price': 0},
|
||
'insights': {},
|
||
'abc_stats': {},
|
||
'vendor_stats': [],
|
||
'seasonality_data': {'datasets': [], 'yLabels': [], 'xLabels': []},
|
||
'bar_data': empty_data,
|
||
'cat_data': empty_data,
|
||
'category_data': empty_data,
|
||
'price_dist_data': empty_data,
|
||
'scatter_data': [],
|
||
'bcg_data': [],
|
||
'dow_data': empty_data,
|
||
'hourly_data': empty_data,
|
||
'monthly_data': empty_data,
|
||
'weekly_data': empty_data,
|
||
'heatmap_data': [],
|
||
'treemap_data': [],
|
||
'cols': {'name': True, 'amount': True, 'qty': True, 'cat': True, 'date': True,
|
||
'cost': True, 'profit': True, 'vendor': True, 'brand': True,
|
||
'return_qty': True, 'pid': True},
|
||
'all_categories': preview_categories,
|
||
'all_brands': preview_brands,
|
||
'all_vendors': preview_vendors,
|
||
'all_activities': preview_activities,
|
||
'all_payments': preview_payments,
|
||
'all_months': preview_months,
|
||
'selected_category': 'all',
|
||
'selected_brand': 'all',
|
||
'selected_vendor': 'all',
|
||
'selected_activity': 'all',
|
||
'selected_payment': 'all',
|
||
'selected_dow': 'all',
|
||
'selected_hour': 'all',
|
||
'selected_month': 'all',
|
||
'keyword': '',
|
||
'min_price': '',
|
||
'max_price': '',
|
||
'min_margin': '',
|
||
'max_margin': '',
|
||
'data_range_months': 0,
|
||
'start_date': '',
|
||
'end_date': '',
|
||
'db_data_range': db_data_range,
|
||
'active_page': 'sales',
|
||
'marketing_data': None,
|
||
}
|
||
_set_sales_page_context_cache(preview_cache_key, preview_context)
|
||
_set_sales_shared_page_context_cache(preview_cache_key, preview_context)
|
||
return render_template('sales_analysis.html', **preview_context)
|
||
|
||
# 解析 data_range_months(有篩選時才處理)
|
||
data_range_months = int(data_range_param or '0')
|
||
|
||
# V-New: 如果有自訂日期區間,則優先使用
|
||
if start_date or end_date:
|
||
cache_key = f"{table_name}_custom_{start_date}_{end_date}"
|
||
else:
|
||
cache_key = f"{table_name}_{data_range_months}m"
|
||
|
||
page_cache_key = "sales_analysis:page_context:" + _sales_analysis_args_fingerprint(
|
||
table_name,
|
||
cache_key,
|
||
SYSTEM_VERSION,
|
||
)
|
||
cached_context = (
|
||
_get_sales_page_context_cache(page_cache_key)
|
||
or _get_sales_shared_page_context_cache(page_cache_key)
|
||
)
|
||
if cached_context:
|
||
_set_sales_page_context_cache(page_cache_key, cached_context)
|
||
return render_template('sales_analysis.html', **cached_context)
|
||
|
||
# 2. 讀取與處理資料 (V-Opt: 使用二級快取機制 Raw -> Processed)
|
||
df = None
|
||
cols_map = {}
|
||
|
||
# A. 優先檢查是否已有處理好的快取 (最快)
|
||
if cache_key in _SALES_PROCESSED_CACHE:
|
||
cache_data = _SALES_PROCESSED_CACHE[cache_key]
|
||
df = cache_data['df']
|
||
cols_map = cache_data['cols']
|
||
|
||
# 恢復欄位變數
|
||
col_name = cols_map.get('name')
|
||
col_date = cols_map.get('date')
|
||
col_amount = cols_map.get('amount')
|
||
col_qty = cols_map.get('qty')
|
||
col_category = cols_map.get('category')
|
||
col_brand = cols_map.get('brand')
|
||
col_vendor = cols_map.get('vendor')
|
||
col_activity = cols_map.get('activity')
|
||
col_payment = cols_map.get('payment')
|
||
col_pid = cols_map.get('pid') # V-New: 取得 PID 欄位
|
||
col_price = cols_map.get('price')
|
||
col_cost = cols_map.get('cost')
|
||
col_profit = cols_map.get('profit')
|
||
col_return_qty = cols_map.get('return_qty')
|
||
|
||
cached_pie_data = cache_data.get('pie_data', {'labels': [], 'chart_values': []}) # V-Opt: 讀取圓餅圖快取
|
||
else:
|
||
# B. 若無處理後快取,則從 Raw Cache 或 DB 讀取並處理
|
||
# V-Opt: 加入日期範圍篩選以減少記憶體使用
|
||
# (data_range_months 已在上方定義)
|
||
|
||
# 先讀取小樣本以識別日期欄位
|
||
sample_df = pd.read_sql(f"SELECT * FROM {table_name} LIMIT 100", db.engine)
|
||
if sample_df.empty:
|
||
return render_template('sales_analysis.html',
|
||
error="資料表為空,請重新匯入。",
|
||
table_name=table_name,
|
||
selected_metric=request.args.get('metric', 'amount'),
|
||
no_filter=False,
|
||
data_range_months=data_range_months,
|
||
start_date=start_date,
|
||
end_date=end_date,
|
||
total_records=0,
|
||
db_data_range=db_data_range,
|
||
active_page='sales',
|
||
marketing_data=None)
|
||
|
||
# 自動識別日期欄位(V-Fix: 優先匹配「日期」,因為「訂單日期」可能是固定文字)
|
||
sample_cols = sample_df.columns.tolist()
|
||
date_col_name = None
|
||
for col in sample_cols:
|
||
if any(keyword in str(col) for keyword in ['日期', '交易日期', 'Date', '訂單時間', '成立時間', '下單時間', '購買時間', '時間', 'Time', 'Created']):
|
||
date_col_name = col
|
||
break
|
||
|
||
# 根據是否有日期欄位決定查詢方式
|
||
if date_col_name:
|
||
from datetime import datetime, timedelta, timezone
|
||
TAIPEI_TZ = timezone(timedelta(hours=8))
|
||
|
||
# V-New: 優先處理自訂日期區間
|
||
if start_date or end_date:
|
||
# V-Fix: 處理日期格式轉換 (2025-01-01 -> 2025/01/01)
|
||
start_date_slash = start_date.replace('-', '/') if start_date else ''
|
||
end_date_slash = end_date.replace('-', '/') if end_date else ''
|
||
|
||
# 有自訂日期區間 - 使用 BETWEEN 或單邊範圍
|
||
if start_date and end_date:
|
||
sql_query = f"SELECT * FROM {table_name} WHERE \"{date_col_name}\" BETWEEN '{start_date_slash}' AND '{end_date_slash}'"
|
||
sys_log.info(f"[Sales Analysis] 📅 使用自訂日期範圍: {start_date} ~ {end_date} (DB格式: {start_date_slash} ~ {end_date_slash})")
|
||
elif start_date:
|
||
sql_query = f"SELECT * FROM {table_name} WHERE \"{date_col_name}\" >= '{start_date_slash}'"
|
||
sys_log.info(f"[Sales Analysis] 📅 使用自訂開始日期: >= {start_date} (DB格式: {start_date_slash})")
|
||
else: # only end_date
|
||
sql_query = f"SELECT * FROM {table_name} WHERE \"{date_col_name}\" <= '{end_date_slash}'"
|
||
sys_log.info(f"[Sales Analysis] 📅 使用自訂結束日期: <= {end_date} (DB格式: {end_date_slash})")
|
||
elif data_range_months > 0:
|
||
# 使用相對日期範圍(最近N個月)
|
||
# V-Fix: 使用斜線格式以匹配資料庫格式
|
||
cutoff_date = (datetime.now(TAIPEI_TZ) - timedelta(days=data_range_months * 30)).strftime('%Y/%m/%d')
|
||
sql_query = f"SELECT * FROM {table_name} WHERE \"{date_col_name}\" >= '{cutoff_date}'"
|
||
sys_log.info(f"[Sales Analysis] 📊 使用日期範圍篩選: 最近 {data_range_months} 個月 (>= {cutoff_date})")
|
||
else:
|
||
# data_range_months == 0,載入全部資料
|
||
sql_query = f"SELECT * FROM {table_name}"
|
||
sys_log.info(f"[Sales Analysis] 📊 載入全部資料(用戶選擇)")
|
||
else:
|
||
# 無日期欄位 - 載入全部
|
||
sql_query = f"SELECT * FROM {table_name}"
|
||
sys_log.info(f"[Sales Analysis] ⚠️ 未找到日期欄位,載入全部資料")
|
||
|
||
# V-Opt (2026-01-23): 優先使用 PostgreSQL 聚合視圖 (mv_sales_summary)
|
||
# 聚合視圖已預先計算:資料量 -20%, 大小 -73%, 欄位類型已轉換
|
||
from sqlalchemy import text as sql_text
|
||
from config import DATABASE_TYPE
|
||
|
||
use_materialized_view = False
|
||
if DATABASE_TYPE == 'postgresql':
|
||
# 檢查聚合視圖是否存在
|
||
try:
|
||
with db.engine.connect() as conn:
|
||
check_mv = conn.execute(sql_text(
|
||
"SELECT EXISTS (SELECT 1 FROM pg_matviews WHERE matviewname = 'mv_sales_summary')"
|
||
)).fetchone()
|
||
use_materialized_view = check_mv[0] if check_mv else False
|
||
except:
|
||
use_materialized_view = False
|
||
|
||
if use_materialized_view:
|
||
# 使用聚合視圖 - 欄位已標準化為英文
|
||
sys_log.info(f"[Sales Analysis] 📊 使用 PostgreSQL 聚合視圖 (mv_sales_summary)")
|
||
|
||
# 構建日期篩選條件
|
||
mv_where = ""
|
||
if start_date or end_date:
|
||
if start_date and end_date:
|
||
mv_where = f"WHERE sale_date BETWEEN '{start_date}' AND '{end_date}'"
|
||
elif start_date:
|
||
mv_where = f"WHERE sale_date >= '{start_date}'"
|
||
else:
|
||
mv_where = f"WHERE sale_date <= '{end_date}'"
|
||
elif data_range_months > 0:
|
||
from datetime import datetime, timedelta, timezone
|
||
TAIPEI_TZ = timezone(timedelta(hours=8))
|
||
cutoff = (datetime.now(TAIPEI_TZ) - timedelta(days=data_range_months * 30)).strftime('%Y-%m-%d')
|
||
mv_where = f"WHERE sale_date >= '{cutoff}'"
|
||
|
||
mv_query = f"""
|
||
SELECT
|
||
sale_date as "日期",
|
||
product_id as "商品ID",
|
||
product_name as "商品名稱",
|
||
category as "商品館",
|
||
brand as "品牌",
|
||
vendor_name as "廠商名稱",
|
||
payment as "付款",
|
||
total_revenue as "總業績",
|
||
total_qty as "數量",
|
||
total_cost as "總成本",
|
||
order_count
|
||
FROM mv_sales_summary
|
||
{mv_where}
|
||
"""
|
||
df = pd.read_sql(mv_query, db.engine)
|
||
sys_log.info(f"[Sales Analysis] 📊 聚合視圖載入完成: {len(df):,} 筆記錄")
|
||
else:
|
||
# 原始邏輯:使用原始表
|
||
sys_log.info(f"[Sales Analysis] 📊 使用原始表載入...")
|
||
df = pd.read_sql(sql_query, db.engine)
|
||
sys_log.info(f"[Sales Analysis] 📊 載入完成: {len(df):,} 筆記錄")
|
||
|
||
# 聚合模式標記
|
||
is_aggregated_mode = use_materialized_view
|
||
|
||
# V-Opt: 不再快取完整 DataFrame 到 _SALES_DF_CACHE (避免記憶體累積)
|
||
# 改用輕量級處理後快取 (_SALES_PROCESSED_CACHE)
|
||
|
||
if df.empty:
|
||
return render_template('sales_analysis.html',
|
||
error="資料表為空,請重新匯入。",
|
||
table_name=table_name,
|
||
selected_metric=request.args.get('metric', 'amount'),
|
||
no_filter=False,
|
||
data_range_months=data_range_months,
|
||
start_date=start_date,
|
||
end_date=end_date,
|
||
total_records=0,
|
||
db_data_range=db_data_range,
|
||
active_page='sales',
|
||
marketing_data=None)
|
||
|
||
# 3. 自動識別關鍵欄位 (模糊比對)
|
||
cols = df.columns.tolist()
|
||
def find_col(keywords):
|
||
# V-Opt: 改為優先遍歷關鍵字,確保優先匹配更精確的名稱 (例如 '廠商名稱' 優於 '廠商')
|
||
for k in keywords:
|
||
for col in cols:
|
||
if k in str(col): return col
|
||
return None
|
||
|
||
col_name = find_col(['商品名稱', '品名', 'Name', 'Product'])
|
||
col_pid = find_col(['商品ID', 'Product ID', 'ID', 'i_code', 'Item Code']) # V-New: 偵測商品ID欄位
|
||
|
||
# V-Fix: 優先匹配「日期」欄位(「訂單日期」是固定文字,不是實際日期)
|
||
col_date_part = find_col(['日期', '交易日期', 'Date', 'Day'])
|
||
col_time_part = find_col(['訂單時間', '成立時間', '下單時間', '購買時間', '時間', 'Time', 'Created'])
|
||
|
||
col_brand = find_col(['品牌', 'Brand']) # V-New: 品牌欄位
|
||
col_vendor = find_col(['廠商名稱', 'Vendor Name', '廠商', '供應商', 'Vendor', 'Supplier']) # V-Opt: 優先抓取名稱
|
||
col_activity = find_col(['活動', '折扣', 'Activity', 'Campaign', 'Promotion', '專案']) # V-New: 活動欄位
|
||
col_payment = find_col(['付款方式', 'Payment', 'Pay']) # V-New: 付款方式欄位
|
||
col_price = find_col(['單價', 'Price', '價格', 'Avg Price']) # V-New: 嘗試尋找單價欄位
|
||
col_cost = find_col(['成本', 'Cost', '進價', 'Cost Price', 'Wholesale']) # V-New: 成本欄位
|
||
col_profit = find_col(['毛利', 'Profit', '利潤']) # V-New: 直接尋找毛利欄位 (若有)
|
||
col_return_qty = find_col(['退貨數量', 'Return Qty', '退貨']) # V-New: 退貨欄位
|
||
col_amount = find_col(['銷售金額', '業績', '金額', 'Amount', 'Sales', 'Total'])
|
||
col_qty = find_col(['銷售數量', '銷量', '數量', 'Qty', 'Quantity'])
|
||
col_category = find_col(['館別', '分類', 'Category'])
|
||
|
||
if not col_name or not col_amount:
|
||
return render_template('sales_analysis.html',
|
||
error=f"無法自動識別關鍵欄位 (需包含 '名稱' 與 '金額')。偵測到的欄位: {cols}",
|
||
table_name=table_name,
|
||
selected_metric=request.args.get('metric', 'amount'),
|
||
no_filter=False,
|
||
data_range_months=data_range_months,
|
||
start_date=start_date,
|
||
end_date=end_date,
|
||
total_records=0,
|
||
db_data_range=db_data_range,
|
||
active_page='sales',
|
||
marketing_data=None)
|
||
|
||
# 4. 資料處理 (Heavy Lifting - 只在快取建立時執行一次)
|
||
# 確保金額與數量是數字
|
||
df[col_amount] = pd.to_numeric(df[col_amount], errors='coerce').fillna(0)
|
||
if col_qty:
|
||
df[col_qty] = pd.to_numeric(df[col_qty], errors='coerce').fillna(0)
|
||
|
||
if col_cost:
|
||
df[col_cost] = pd.to_numeric(df[col_cost], errors='coerce').fillna(0)
|
||
|
||
if col_profit:
|
||
df[col_profit] = pd.to_numeric(df[col_profit], errors='coerce').fillna(0)
|
||
|
||
if col_return_qty:
|
||
df[col_return_qty] = pd.to_numeric(df[col_return_qty], errors='coerce').fillna(0)
|
||
|
||
# V-Fix: 智慧日期時間合併邏輯 (聚合模式下跳過)
|
||
col_date = None
|
||
if not is_aggregated_mode:
|
||
if col_date_part and col_time_part:
|
||
# 兩者都有,嘗試合併
|
||
try:
|
||
df['combined_dt'] = pd.to_datetime(df[col_date_part].astype(str) + ' ' + df[col_time_part].astype(str), errors='coerce')
|
||
col_date = 'combined_dt'
|
||
except:
|
||
# 合併失敗,退回使用時間欄位 (假設包含日期) 或日期欄位
|
||
col_date = col_time_part or col_date_part
|
||
elif col_time_part:
|
||
# 只有時間欄位 (可能包含日期)
|
||
df[col_time_part] = pd.to_datetime(df[col_time_part], errors='coerce')
|
||
col_date = col_time_part
|
||
elif col_date_part:
|
||
# 只有日期欄位
|
||
df[col_date_part] = pd.to_datetime(df[col_date_part], errors='coerce')
|
||
col_date = col_date_part
|
||
|
||
# V-New: 若無明確單價欄位,則自動計算 (金額 / 數量)
|
||
if not col_price and col_amount and col_qty:
|
||
col_price = 'calculated_price'
|
||
# V-Opt: 使用 numpy 向量化運算加速 (取代 apply)
|
||
df[col_price] = np.where(df[col_qty] > 0, df[col_amount] / df[col_qty], 0)
|
||
|
||
if col_price:
|
||
df[col_price] = pd.to_numeric(df[col_price], errors='coerce').fillna(0)
|
||
|
||
# V-New: 預先計算毛利率 (Margin Rate) 用於篩選
|
||
# 邏輯: (毛利 / 金額) * 100
|
||
col_margin_rate = 'calculated_margin_rate'
|
||
with np.errstate(divide='ignore', invalid='ignore'):
|
||
if col_profit:
|
||
df[col_margin_rate] = (df[col_profit] / df[col_amount]) * 100
|
||
elif col_cost:
|
||
df[col_margin_rate] = ((df[col_amount] - df[col_cost]) / df[col_amount]) * 100
|
||
else:
|
||
df[col_margin_rate] = 0.0
|
||
# 處理無限大與 NaN (轉為 0)
|
||
df[col_margin_rate] = df[col_margin_rate].replace([np.inf, -np.inf, np.nan], 0)
|
||
|
||
# === V-Opt: 效能優化預計算 (V9.98) ===
|
||
# 1. 日期維度 (加速篩選與聚合,避免重複呼叫 .dt 存取器)
|
||
# 聚合模式下跳過日期維度計算
|
||
if col_date and not is_aggregated_mode:
|
||
df['_dow'] = df[col_date].dt.dayofweek
|
||
df['_hour'] = df[col_date].dt.hour
|
||
df['_week'] = df[col_date].dt.strftime('%G-W%V')
|
||
df['_month_str'] = df[col_date].dt.strftime('%Y-%m') # V-New: 月份維度 (YYYY-MM)
|
||
|
||
# 2. 毛利額 (加速 Top 3 分析,避免 runtime 計算)
|
||
if col_profit:
|
||
df['calculated_profit'] = df[col_profit]
|
||
elif col_cost:
|
||
df['calculated_profit'] = df[col_amount] - df[col_cost]
|
||
else:
|
||
df['calculated_profit'] = 0.0
|
||
|
||
# 3. 全站分類圓餅圖 (已移至下方使用 target_df 計算)
|
||
|
||
|
||
# 建立/更新處理後快取
|
||
cache_entry = {
|
||
'df': df,
|
||
'cols': {
|
||
'name': col_name, 'date': col_date, 'amount': col_amount,
|
||
'qty': col_qty, 'category': col_category, 'brand': col_brand,
|
||
'vendor': col_vendor, 'activity': col_activity, 'payment': col_payment,
|
||
'price': col_price, 'cost': col_cost, 'profit': col_profit,
|
||
'return_qty': col_return_qty,
|
||
'pid': col_pid # V-New: 儲存商品ID欄位
|
||
},
|
||
'pid': col_pid, # V-New: 儲存商品ID欄位
|
||
'time': time.time()
|
||
}
|
||
set_sales_processed_cache(cache_key, cache_entry, aliases=(table_name,))
|
||
|
||
# 🚩 V-Opt: 使用共用篩選函式
|
||
target_df, cols_map, err = _get_filtered_sales_data(cache_key)
|
||
if err:
|
||
# V-Fix: 若快取失效,重新導向自己以觸發重新讀取(保留所有查詢參數)
|
||
params = {k: v for k, v in request.args.items()}
|
||
return redirect(url_for('sales.sales_analysis', **params))
|
||
|
||
# 重新取得變數 (因為 _get_filtered_sales_data 內部使用了 cols_map)
|
||
col_name = cols_map.get('name')
|
||
col_amount = cols_map.get('amount')
|
||
col_qty = cols_map.get('qty')
|
||
col_category = cols_map.get('category')
|
||
col_brand = cols_map.get('brand')
|
||
col_vendor = cols_map.get('vendor')
|
||
col_activity = cols_map.get('activity')
|
||
col_payment = cols_map.get('payment')
|
||
col_price = cols_map.get('price')
|
||
col_cost = cols_map.get('cost')
|
||
col_profit = cols_map.get('profit')
|
||
col_return_qty = cols_map.get('return_qty')
|
||
col_date = cols_map.get('date')
|
||
col_pid = cols_map.get('pid')
|
||
|
||
all_categories = []
|
||
all_brands = []
|
||
all_vendors = []
|
||
all_activities = []
|
||
all_payments = []
|
||
all_months = []
|
||
|
||
try:
|
||
filter_options = _get_sales_filter_options(db.engine, table_name, cols_map)
|
||
all_categories = filter_options['categories']
|
||
all_brands = filter_options['brands']
|
||
all_vendors = filter_options['vendors']
|
||
all_activities = filter_options['activities']
|
||
all_payments = filter_options['payments']
|
||
all_months = filter_options['months']
|
||
except Exception as e:
|
||
sys_log.warning(f"[Sales Analysis] 從數據庫查詢下拉選項失敗: {e}")
|
||
# 如果查詢失敗,回退到從快取讀取
|
||
if cache_key in _SALES_PROCESSED_CACHE:
|
||
original_df = _SALES_PROCESSED_CACHE[cache_key]['df']
|
||
elif table_name in _SALES_PROCESSED_CACHE:
|
||
original_df = _SALES_PROCESSED_CACHE[table_name]['df']
|
||
else:
|
||
original_df = pd.DataFrame()
|
||
|
||
if not original_df.empty:
|
||
all_categories = sorted(original_df[col_category].dropna().astype(str).unique().tolist()) if col_category else []
|
||
all_brands = sorted(original_df[col_brand].dropna().astype(str).unique().tolist()) if col_brand else []
|
||
all_vendors = sorted(original_df[col_vendor].dropna().astype(str).unique().tolist()) if col_vendor else []
|
||
all_activities = sorted(original_df[col_activity].dropna().astype(str).unique().tolist()) if col_activity else []
|
||
all_payments = sorted(original_df[col_payment].dropna().astype(str).unique().tolist()) if col_payment else []
|
||
all_months = sorted(original_df['_month_str'].dropna().unique().tolist()) if col_date and '_month_str' in original_df.columns else []
|
||
|
||
# 取得前端參數供模板回填
|
||
selected_category = request.args.get('category', 'all')
|
||
selected_metric = request.args.get('metric', 'amount')
|
||
selected_brand = request.args.get('brand', 'all')
|
||
selected_vendor = request.args.get('vendor', 'all')
|
||
selected_activity = request.args.get('activity', 'all')
|
||
selected_payment = request.args.get('payment', 'all')
|
||
selected_dow = request.args.get('dow', 'all')
|
||
selected_hour = request.args.get('hour', 'all')
|
||
selected_month = request.args.get('month', 'all')
|
||
keyword = request.args.get('keyword', '').strip()
|
||
min_price = request.args.get('min_price', '')
|
||
max_price = request.args.get('max_price', '')
|
||
min_margin = request.args.get('min_margin', '')
|
||
max_margin = request.args.get('max_margin', '')
|
||
|
||
# 決定排序欄位
|
||
sort_col = col_amount
|
||
if selected_metric == 'qty' and col_qty:
|
||
sort_col = col_qty
|
||
|
||
target_df = target_df.sort_values(by=sort_col, ascending=False)
|
||
|
||
# 📊 KPI 計算 (針對篩選後的資料)
|
||
total_revenue = float(target_df[col_amount].sum())
|
||
total_qty = float(target_df[col_qty].sum()) if col_qty else 0
|
||
total_count = int(len(target_df)) # 訂單筆數
|
||
# V-Fix 2026-01-15: SKU 數應計算唯一商品數,而非記錄筆數
|
||
sku_count = int(target_df[col_name].nunique()) if col_name else total_count
|
||
|
||
# V-New: 成本與毛利計算
|
||
total_cost = float(target_df[col_cost].sum()) if col_cost else 0
|
||
if col_profit:
|
||
gross_margin = float(target_df[col_profit].sum())
|
||
else:
|
||
gross_margin = total_revenue - total_cost
|
||
|
||
gross_margin_rate = (gross_margin / total_revenue * 100) if total_revenue > 0 else 0
|
||
|
||
avg_price = total_revenue / total_qty if total_qty > 0 else 0
|
||
|
||
# 📊 V-New: 商業洞察 (Top 3 Analysis)
|
||
insights = {
|
||
'rev_cats': [], 'rev_prods': [],
|
||
'margin_cats': [], 'margin_prods': [],
|
||
'qty_cats': [], 'qty_prods': []
|
||
}
|
||
|
||
# Helper function to get top 3
|
||
# Helper function to get top 3
|
||
def get_top_3(groupby_col, metric_col, is_margin=False, is_qty=False):
|
||
if not groupby_col or not metric_col: return []
|
||
|
||
# V-Opt: 直接使用 target_df 與預計算欄位,避免 copy() 與 assign()
|
||
target_metric = metric_col
|
||
if is_margin:
|
||
target_metric = 'calculated_profit'
|
||
|
||
try:
|
||
# 直接聚合並取前3名
|
||
# V-Fix 2026-01-15: 若 groupby_col 是 list (例如 [PID, Name]),結果 index 會是 MultiIndex
|
||
grouped = target_df.groupby(groupby_col)[target_metric].sum()
|
||
|
||
def get_name(k):
|
||
# 如果是 Tuple (MultiIndex),通常最後一個是 Name,取之
|
||
return str(k[-1]) if isinstance(k, tuple) else str(k)
|
||
|
||
return [{'name': get_name(k), 'value': float(v)} for k, v in grouped.nlargest(3).items() if v > 0]
|
||
except Exception:
|
||
return []
|
||
|
||
insights['rev_cats'] = get_top_3(col_category, col_amount)
|
||
# V-Fix: 商品聚合改用 [PID, Name] 避免同名不同ID商品被合併
|
||
product_groupby = [col_pid, col_name] if col_pid else col_name
|
||
insights['rev_prods'] = get_top_3(product_groupby, col_amount)
|
||
insights['qty_cats'] = get_top_3(col_category, col_qty, is_qty=True)
|
||
insights['qty_prods'] = get_top_3(product_groupby, col_qty, is_qty=True)
|
||
|
||
if col_cost or col_profit:
|
||
insights['margin_cats'] = get_top_3(col_category, col_amount, is_margin=True)
|
||
insights['margin_prods'] = get_top_3(product_groupby, col_amount, is_margin=True)
|
||
|
||
# 📊 V-Opt: 改為橫向長條圖數據 (Top 20)
|
||
top_chart = target_df.head(20)
|
||
bar_data = {
|
||
'labels': [str(n)[:20] + '...' if len(str(n)) > 20 else str(n) for n in top_chart[col_name]], # 稍微放寬長度限制
|
||
'chart_values': [float(x) for x in top_chart[sort_col]],
|
||
'metric_label': '銷售金額 ($)' if selected_metric == 'amount' else '銷售數量'
|
||
}
|
||
|
||
# 📋 V-Opt: 列表資料改為 AJAX 載入,這裡只傳空列表以加快初始渲染
|
||
table_items = []
|
||
|
||
# 準備類別圓餅圖資料
|
||
# V-Fix: 使用 target_df (篩選後資料) 動態計算
|
||
cat_data = {'labels': [], 'chart_values': []}
|
||
if col_category and not target_df.empty:
|
||
cat_group_all = target_df.groupby(col_category)[col_amount].sum().sort_values(ascending=False)
|
||
TOP_N_CATS = 12
|
||
if len(cat_group_all) > TOP_N_CATS:
|
||
top_cats = cat_group_all.head(TOP_N_CATS)
|
||
other_val = cat_group_all.iloc[TOP_N_CATS:].sum()
|
||
cat_data['labels'] = [str(x) for x in top_cats.index.tolist()] + ['其他']
|
||
cat_data['chart_values'] = [float(x) for x in top_cats.tolist()] + [float(other_val)]
|
||
else:
|
||
cat_data['labels'] = [str(x) for x in cat_group_all.index.tolist()]
|
||
cat_data['chart_values'] = [float(x) for x in cat_group_all.tolist()]
|
||
|
||
# 📊 V-New: 價格帶分析 (Price Range Analysis)
|
||
price_dist_data = {'labels': [], 'chart_values': []}
|
||
if col_price and not target_df.empty:
|
||
# 定義價格區間 (0-500, 500-1000, 1000-2000, 2000-5000, 5000-10000, 10000+)
|
||
bins = [0, 500, 1000, 2000, 5000, 10000, float('inf')]
|
||
labels = ['0-499', '500-999', '1,000-1,999', '2,000-4,999', '5,000-9,999', '10,000+']
|
||
|
||
# V-Opt: 使用 pd.cut 進行分組,但不修改 target_df (避免污染快取)
|
||
# right=False 表示包含左邊界,例如 500 在 500-999 這一組
|
||
price_bins = pd.cut(target_df[col_price], bins=bins, labels=labels, right=False)
|
||
|
||
# 統計各區間的「銷售金額」貢獻 (直接使用外部 Series 進行 groupby)
|
||
range_group = target_df.groupby(price_bins, observed=False)[col_amount].sum()
|
||
|
||
price_dist_data['labels'] = labels
|
||
price_dist_data['chart_values'] = [float(range_group.get(l, 0)) for l in labels]
|
||
|
||
# 📊 V-New: 價格 vs 銷量 散佈圖 (Scatter Plot)
|
||
scatter_data = []
|
||
if col_price and col_qty and not target_df.empty:
|
||
# 取前 300 筆主要商品,避免圖表過於密集導致瀏覽器卡頓
|
||
scatter_source = target_df.head(300)
|
||
for _, row in scatter_source.iterrows():
|
||
# V-Fix (2026-01-23): 處理 NaN 值
|
||
price_val = row[col_price] if pd.notna(row[col_price]) else 0
|
||
qty_val = row[col_qty] if pd.notna(row[col_qty]) else 0
|
||
amt_val = row[col_amount] if pd.notna(row[col_amount]) else 0
|
||
scatter_data.append({
|
||
'x': float(price_val),
|
||
'y': float(qty_val),
|
||
'name': str(row[col_name]) if pd.notna(row[col_name]) else '',
|
||
'amt': float(amt_val) # 用於 tooltip 顯示金額
|
||
})
|
||
|
||
# 📊 V-New: BCG 矩陣分析 (BCG Matrix)
|
||
# X軸: 銷量 (Qty), Y軸: 毛利率 (Margin %)
|
||
bcg_data = {'datasets': [], 'thresholds': {'x': 0, 'y': 0}}
|
||
# V-Fix: 確保 calculated_margin_rate 欄位存在
|
||
if col_qty and (col_cost or col_profit) and not target_df.empty and 'calculated_margin_rate' in target_df.columns:
|
||
# 1. 計算閾值 (使用中位數,避免極端值影響)
|
||
# 過濾掉銷量為 0 的商品,避免干擾閾值計算
|
||
active_products = target_df[target_df[col_qty] > 0]
|
||
if not active_products.empty and 'calculated_margin_rate' in active_products.columns:
|
||
median_qty = active_products[col_qty].median()
|
||
median_margin = active_products['calculated_margin_rate'].median()
|
||
|
||
# 若中位數為 0 (例如大部分商品沒銷量),則給一個預設值以利顯示
|
||
if median_qty == 0: median_qty = 1
|
||
|
||
bcg_data['thresholds'] = {'x': float(median_qty), 'y': float(median_margin)}
|
||
|
||
# 2. 分類商品 (四象限)
|
||
# Stars (明星): High Qty, High Margin
|
||
stars = active_products[(active_products[col_qty] >= median_qty) & (active_products['calculated_margin_rate'] >= median_margin)]
|
||
# Cows (金牛): High Qty, Low Margin
|
||
cows = active_products[(active_products[col_qty] >= median_qty) & (active_products['calculated_margin_rate'] < median_margin)]
|
||
# Questions (問題): Low Qty, High Margin
|
||
questions = active_products[(active_products[col_qty] < median_qty) & (active_products['calculated_margin_rate'] >= median_margin)]
|
||
# Dogs (瘦狗): Low Qty, Low Margin
|
||
dogs = active_products[(active_products[col_qty] < median_qty) & (active_products['calculated_margin_rate'] < median_margin)]
|
||
|
||
def format_bcg_points(df_segment):
|
||
# 限制點數,避免前端卡頓 (各象限最多 100 點)
|
||
return [{'x': float(row[col_qty]), 'y': float(row['calculated_margin_rate']), 'name': str(row[col_name]), 'amt': float(row[col_amount])} for _, row in df_segment.head(100).iterrows()]
|
||
|
||
bcg_data['datasets'] = [
|
||
{'label': '明星商品 (Stars)', 'data': format_bcg_points(stars), 'backgroundColor': 'rgba(255, 206, 86, 0.8)', 'borderColor': 'rgba(255, 206, 86, 1)'}, # Yellow
|
||
{'label': '金牛商品 (Cows)', 'data': format_bcg_points(cows), 'backgroundColor': 'rgba(75, 192, 192, 0.8)', 'borderColor': 'rgba(75, 192, 192, 1)'}, # Green
|
||
{'label': '問題商品 (Questions)', 'data': format_bcg_points(questions), 'backgroundColor': 'rgba(54, 162, 235, 0.8)', 'borderColor': 'rgba(54, 162, 235, 1)'}, # Blue
|
||
{'label': '瘦狗商品 (Dogs)', 'data': format_bcg_points(dogs), 'backgroundColor': 'rgba(201, 203, 207, 0.8)', 'borderColor': 'rgba(201, 203, 207, 1)'} # Grey
|
||
]
|
||
|
||
# 📊 V-New: 時間維度分析 (Time Analysis)
|
||
dow_data = {'labels': ['週一', '週二', '週三', '週四', '週五', '週六', '週日'], 'chart_values': [0]*7}
|
||
hourly_data = {'labels': [f"{i:02d}:00" for i in range(24)], 'chart_values': [0]*24}
|
||
weekly_data = {'labels': [], 'chart_values': []} # V-New: 每週趨勢
|
||
monthly_data = {'labels': [], 'chart_values': []} # V-New: 每月趨勢
|
||
heatmap_data = [] # V-New: 多維度熱力圖 (Day x Hour)
|
||
treemap_data = [] # V-New: 板塊圖數據
|
||
|
||
if col_date:
|
||
# 過濾掉日期無效的資料
|
||
# V-Opt: 使用預計算欄位進行分組,速度更快
|
||
if not target_df.empty:
|
||
# 1. 星期分析 (Day of Week)
|
||
dow_group = target_df.groupby('_dow')[col_amount].sum()
|
||
for day, val in dow_group.items():
|
||
if not np.isnan(day):
|
||
dow_data['chart_values'][int(day)] = float(val)
|
||
|
||
# 2. 小時分析 (Hourly)
|
||
hour_group = target_df.groupby('_hour')[col_amount].sum()
|
||
for hour, val in hour_group.items():
|
||
if not np.isnan(hour):
|
||
hourly_data['chart_values'][int(hour)] = float(val)
|
||
|
||
# 3. 每月趨勢 (Monthly Trend) - V-New
|
||
month_group = target_df.groupby('_month_str')[col_amount].sum().sort_index()
|
||
monthly_data['labels'] = month_group.index.tolist()
|
||
# V-Fix (2026-01-23): 處理 NaN 值避免 JSON 序列化失敗
|
||
monthly_data['chart_values'] = [float(x) if not np.isnan(x) else 0 for x in month_group.tolist()]
|
||
|
||
# 3. 每週趨勢 (Weekly Trend) - V-New
|
||
week_group = target_df.groupby('_week')[col_amount].sum().sort_index()
|
||
# V-Opt: 解除 12 週限制,顯示完整年度趨勢 (因應一年份數據需求)
|
||
weekly_data['labels'] = week_group.index.tolist()
|
||
# V-Fix (2026-01-23): 處理 NaN 值避免 JSON 序列化失敗
|
||
weekly_data['chart_values'] = [float(x) if not np.isnan(x) else 0 for x in week_group.tolist()]
|
||
|
||
# 4. 多維度熱力圖 (Day x Hour) - V-Fix: 確保數據完整性
|
||
dh_group = target_df.groupby(['_dow', '_hour'])[col_amount].sum()
|
||
# V-Opt: 正規化氣泡大小 (Normalize Bubble Size) 以提升可讀性
|
||
max_val = dh_group.max() if not dh_group.empty else 1
|
||
|
||
for (day, hour), val in dh_group.items():
|
||
# V-Fix (2026-01-23): 處理 NaN 值
|
||
if np.isnan(val):
|
||
val = 0
|
||
# 將數值映射到 3~25px 的半徑範圍,確保視覺可辨識
|
||
radius = 3 + (math.sqrt(val) / math.sqrt(max_val)) * 22 if val > 0 else 0
|
||
heatmap_data.append({
|
||
'x': int(hour), # X軸: 小時 (0-23)
|
||
'y': int(day), # Y軸: 星期 (0-6)
|
||
'r': float(radius) if not np.isnan(radius) else 0, # V-Adj: 正規化後半徑
|
||
'v': float(val) # 實際數值 (用於 Tooltip)
|
||
})
|
||
|
||
# 📊 V-New: 板塊圖 (Treemap) 數據準備
|
||
# 結構: Root -> Category -> Product (Top 5 per cat)
|
||
if col_category and col_name and col_amount and not target_df.empty:
|
||
# V-Opt: 優化聚合邏輯,先聚合再篩選,避免在迴圈中重複過濾大表
|
||
# 1. 先聚合 Category + Product (大幅減少資料量)
|
||
cat_prod_group = target_df.groupby([col_category, col_name])[col_amount].sum().reset_index()
|
||
|
||
# 2. 找出前 10 大分類
|
||
top_cats = cat_prod_group.groupby(col_category)[col_amount].sum().nlargest(10).index.tolist()
|
||
|
||
# 3. 針對前 10 大分類,各取前 5 大商品
|
||
for cat in top_cats:
|
||
if not cat: continue
|
||
# 在縮減後的資料中篩選,速度極快
|
||
cat_subset = cat_prod_group[cat_prod_group[col_category] == cat]
|
||
top_prods = cat_subset.nlargest(5, col_amount)
|
||
|
||
for _, row in top_prods.iterrows():
|
||
# V-Fix (2026-01-23): 處理 NaN 值
|
||
amount_val = row[col_amount]
|
||
if pd.isna(amount_val):
|
||
amount_val = 0
|
||
treemap_data.append({
|
||
'category': str(cat),
|
||
'product': str(row[col_name]) if pd.notna(row[col_name]) else '',
|
||
'value': float(amount_val),
|
||
'color': get_color_for_string(str(cat)) # V-Fix: 增加顏色參數,確保與分類顏色一致且清晰
|
||
})
|
||
|
||
# 📊 V-New: ABC 分析 (Pareto Analysis) - TODO #8
|
||
# A類: 累積營收 0-80% (核心商品)
|
||
# B類: 累積營收 80-95% (次要商品)
|
||
# C類: 累積營收 95-100% (長尾商品)
|
||
abc_stats = {'A': {'count': 0, 'revenue': 0, 'pct_rev': 0, 'pct_sku': 0},
|
||
'B': {'count': 0, 'revenue': 0, 'pct_rev': 0, 'pct_sku': 0},
|
||
'C': {'count': 0, 'revenue': 0, 'pct_rev': 0, 'pct_sku': 0}}
|
||
|
||
if not target_df.empty and col_amount:
|
||
# 使用 numpy 加速累積計算
|
||
sorted_rev = target_df[col_amount].values # 已在上方排序過
|
||
cumsum_rev = np.cumsum(sorted_rev)
|
||
total_rev_abc = cumsum_rev[-1] if len(cumsum_rev) > 0 else 0
|
||
|
||
if total_rev_abc > 0:
|
||
pct_cumsum = cumsum_rev / total_rev_abc * 100
|
||
|
||
# 找出分界點索引
|
||
idx_a = np.searchsorted(pct_cumsum, 80)
|
||
idx_b = np.searchsorted(pct_cumsum, 95)
|
||
|
||
# A類: 0 ~ idx_a
|
||
count_a = idx_a + 1
|
||
rev_a = cumsum_rev[idx_a] if idx_a < len(cumsum_rev) else total_rev_abc
|
||
|
||
# B類: idx_a+1 ~ idx_b
|
||
count_b = max(0, idx_b - idx_a)
|
||
rev_b = (cumsum_rev[idx_b] - cumsum_rev[idx_a]) if idx_b < len(cumsum_rev) else (total_rev_abc - cumsum_rev[idx_a])
|
||
|
||
# C類: idx_b+1 ~ end
|
||
count_c = max(0, len(cumsum_rev) - 1 - idx_b)
|
||
rev_c = total_rev_abc - cumsum_rev[idx_b] if idx_b < len(cumsum_rev) else 0
|
||
|
||
abc_stats['A'] = {'count': int(count_a), 'revenue': float(rev_a), 'pct_rev': float(rev_a/total_rev_abc*100), 'pct_sku': float(count_a/total_count*100)}
|
||
abc_stats['B'] = {'count': int(count_b), 'revenue': float(rev_b), 'pct_rev': float(rev_b/total_rev_abc*100), 'pct_sku': float(count_b/total_count*100)}
|
||
abc_stats['C'] = {'count': int(count_c), 'revenue': float(rev_c), 'pct_rev': float(rev_c/total_rev_abc*100), 'pct_sku': float(count_c/total_count*100)}
|
||
|
||
# 📊 V-New: 廠商獲利能力排行 (Vendor Profitability) - TODO #9
|
||
vendor_stats = []
|
||
if col_vendor and col_amount and not target_df.empty:
|
||
# Group by vendor
|
||
agg_dict = {col_amount: 'sum', col_name: 'nunique'} # nunique 計算不重複商品數 (SKU)
|
||
if col_qty: agg_dict[col_qty] = 'sum' # V-New: 累加銷量
|
||
if col_profit:
|
||
agg_dict[col_profit] = 'sum'
|
||
elif col_cost:
|
||
agg_dict[col_cost] = 'sum'
|
||
|
||
# 使用 groupby 聚合
|
||
vendor_group = target_df.groupby(col_vendor).agg(agg_dict).reset_index()
|
||
|
||
# 計算毛利與毛利率
|
||
if col_profit:
|
||
vendor_group['total_profit'] = vendor_group[col_profit]
|
||
elif col_cost:
|
||
vendor_group['total_profit'] = vendor_group[col_amount] - vendor_group[col_cost]
|
||
else:
|
||
vendor_group['total_profit'] = 0
|
||
|
||
# 計算營收佔比 (Share %)
|
||
total_vendor_revenue = vendor_group[col_amount].sum()
|
||
if total_vendor_revenue > 0:
|
||
vendor_group['revenue_share'] = (vendor_group[col_amount] / total_vendor_revenue * 100)
|
||
else:
|
||
vendor_group['revenue_share'] = 0.0
|
||
|
||
# 避免除以零
|
||
vendor_group['margin_rate'] = np.where(vendor_group[col_amount] > 0, (vendor_group['total_profit'] / vendor_group[col_amount] * 100), 0)
|
||
|
||
# 計算平均客單價 (ASP)
|
||
if col_qty:
|
||
vendor_group['asp'] = np.where(vendor_group[col_qty] > 0, vendor_group[col_amount] / vendor_group[col_qty], 0)
|
||
|
||
# 排序:預設按總業績降序
|
||
vendor_group = vendor_group.sort_values(by=col_amount, ascending=False)
|
||
|
||
# 格式化輸出 (Top 100)
|
||
for _, row in vendor_group.head(100).iterrows():
|
||
vendor_stats.append({
|
||
'name': str(row[col_vendor]),
|
||
'revenue': float(row[col_amount]),
|
||
'share': float(row['revenue_share']), # V-New
|
||
'qty': float(row[col_qty]) if col_qty else 0, # V-New
|
||
'asp': float(row.get('asp', 0)), # V-New
|
||
'profit': float(row['total_profit']),
|
||
'margin_rate': float(row['margin_rate']),
|
||
'sku_count': int(row[col_name])
|
||
})
|
||
|
||
# 📊 V-New: 淡旺季熱力圖 (Seasonality Analysis) - TODO #10
|
||
seasonality_data = None
|
||
if col_date and col_category and col_amount and not target_df.empty:
|
||
# 1. 取得前 10 大分類 (避免圖表過大)
|
||
# 使用 target_df (受篩選影響),這樣可以看特定品牌下的分類季節性
|
||
top_cats_season = target_df.groupby(col_category)[col_amount].sum().nlargest(10).index.tolist()
|
||
|
||
# 2. 聚合數據 (Month x Category)
|
||
season_group = target_df[target_df[col_category].isin(top_cats_season)].groupby(['_month_str', col_category])[col_amount].sum().reset_index()
|
||
|
||
# 3. 轉換為 Bubble Chart 格式
|
||
# X軸: 月份 (需解析 _month_str 取得順序)
|
||
# Y軸: 分類 (使用 top_cats_season 的索引)
|
||
|
||
# 取得所有月份並排序
|
||
all_months_sorted = sorted(target_df['_month_str'].unique())
|
||
month_map = {m: i for i, m in enumerate(all_months_sorted)}
|
||
cat_map = {c: i for i, c in enumerate(top_cats_season)}
|
||
|
||
points = []
|
||
max_val_season = season_group[col_amount].max() if not season_group.empty else 1
|
||
|
||
for _, row in season_group.iterrows():
|
||
m_str = row['_month_str']
|
||
cat = row[col_category]
|
||
val = row[col_amount]
|
||
|
||
if m_str in month_map and cat in cat_map:
|
||
# 正規化大小 (3~25px)
|
||
radius = 3 + (math.sqrt(val) / math.sqrt(max_val_season)) * 25 if val > 0 else 0
|
||
points.append({
|
||
'x': month_map[m_str],
|
||
'y': cat_map[cat],
|
||
'r': radius,
|
||
'v': float(val),
|
||
'm': m_str,
|
||
'c': cat
|
||
})
|
||
|
||
seasonality_data = {
|
||
'datasets': [{
|
||
'label': '淡旺季熱點',
|
||
'data': points,
|
||
# 顏色將在前端動態生成
|
||
}],
|
||
'yLabels': top_cats_season,
|
||
'xLabels': all_months_sorted
|
||
}
|
||
|
||
# 📊 V-New 2026-01-15: 行銷活動業績貢獻 (Marketing Campaign Contribution)
|
||
marketing_data = None
|
||
if not target_df.empty:
|
||
marketing_data = prepare_marketing_summary(target_df, sort_by=selected_metric)
|
||
|
||
context = {
|
||
'marketing_data': marketing_data,
|
||
'items': table_items,
|
||
'kpi': {
|
||
'revenue': total_revenue,
|
||
'qty': total_qty,
|
||
'count': total_count,
|
||
'sku_count': sku_count,
|
||
'cost': total_cost,
|
||
'gross_margin': gross_margin,
|
||
'gross_margin_rate': gross_margin_rate,
|
||
'avg_price': avg_price,
|
||
},
|
||
'insights': insights,
|
||
'abc_stats': abc_stats,
|
||
'vendor_stats': vendor_stats,
|
||
'seasonality_data': seasonality_data,
|
||
'bar_data': bar_data,
|
||
'cat_data': cat_data,
|
||
'category_data': cat_data,
|
||
'price_dist_data': price_dist_data,
|
||
'scatter_data': scatter_data,
|
||
'bcg_data': bcg_data,
|
||
'dow_data': dow_data,
|
||
'hourly_data': hourly_data,
|
||
'monthly_data': monthly_data,
|
||
'weekly_data': weekly_data,
|
||
'heatmap_data': heatmap_data,
|
||
'treemap_data': treemap_data,
|
||
'all_categories': all_categories,
|
||
'all_brands': all_brands,
|
||
'all_vendors': all_vendors,
|
||
'all_activities': all_activities,
|
||
'all_payments': all_payments,
|
||
'all_months': all_months,
|
||
'selected_category': selected_category,
|
||
'selected_brand': selected_brand,
|
||
'selected_vendor': selected_vendor,
|
||
'selected_activity': selected_activity,
|
||
'selected_payment': selected_payment,
|
||
'selected_dow': selected_dow,
|
||
'selected_hour': selected_hour,
|
||
'selected_month': selected_month,
|
||
'selected_metric': selected_metric,
|
||
'keyword': keyword,
|
||
'min_price': min_price,
|
||
'max_price': max_price,
|
||
'min_margin': min_margin,
|
||
'max_margin': max_margin,
|
||
'cols': {
|
||
'name': col_name,
|
||
'amount': col_amount,
|
||
'qty': col_qty,
|
||
'cat': col_category,
|
||
'date': col_date,
|
||
'cost': col_cost,
|
||
'profit': col_profit,
|
||
'vendor': col_vendor,
|
||
'brand': col_brand,
|
||
'return_qty': col_return_qty,
|
||
'pid': col_pid,
|
||
},
|
||
'table_name': table_name,
|
||
'data_range_months': data_range_months,
|
||
'start_date': start_date,
|
||
'end_date': end_date,
|
||
'total_records': len(df),
|
||
'active_page': 'sales',
|
||
'db_data_range': db_data_range,
|
||
}
|
||
_set_sales_page_context_cache(page_cache_key, context)
|
||
_set_sales_shared_page_context_cache(page_cache_key, context)
|
||
return render_template('sales_analysis.html', **context)
|
||
|
||
except Exception as e:
|
||
sys_log.error(f"Sales Analysis Error: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
# 提供完整的變數以避免模板錯誤
|
||
return render_template('sales_analysis.html',
|
||
error=f"系統發生錯誤: {str(e)}",
|
||
marketing_data=None,
|
||
insights=None,
|
||
abc_stats=None,
|
||
vendor_stats=None,
|
||
seasonality_data=None,
|
||
scatter_data=None,
|
||
bcg_data=None,
|
||
dow_data=None,
|
||
hourly_data=None,
|
||
monthly_data=None,
|
||
weekly_data=None,
|
||
heatmap_data=None,
|
||
treemap_data=None,
|
||
all_categories=[],
|
||
all_brands=[], all_vendors=[], all_activities=[], all_payments=[],
|
||
all_months=[],
|
||
selected_category='all',
|
||
selected_brand='all', selected_vendor='all',
|
||
selected_activity='all', selected_payment='all',
|
||
selected_dow='all', selected_hour='all',
|
||
selected_month='all',
|
||
selected_metric=request.args.get('metric', 'amount'),
|
||
keyword='', min_price='', max_price='',
|
||
min_margin='', max_margin='',
|
||
cols={},
|
||
table_name='realtime_sales_monthly',
|
||
no_filter=False,
|
||
data_range_months=int(request.args.get('data_range', '0') or '0'),
|
||
start_date=request.args.get('start_date', ''),
|
||
end_date=request.args.get('end_date', ''),
|
||
total_records=0,
|
||
active_page='sales',
|
||
db_data_range='')
|
||
|
||
|
||
@sales_bp.route('/growth_analysis')
|
||
@login_required
|
||
def growth_analysis():
|
||
"""營運成長策略報表 (MoM, YoY, AOV, YTD) - 含快取優化"""
|
||
from services.cache_service import (
|
||
get_growth_cache, set_growth_cache, is_growth_cache_valid
|
||
)
|
||
|
||
def _render_growth_empty(message=None):
|
||
now_taipei = datetime.now(TAIPEI_TZ)
|
||
empty_chart_data, empty_kpi = _growth_empty_payload(now_taipei)
|
||
return render_template('growth_analysis.html',
|
||
chart_data=empty_chart_data,
|
||
kpi=empty_kpi,
|
||
datetime_now=now_taipei.strftime('%Y-%m-%d %H:%M:%S'),
|
||
active_page='growth',
|
||
cache_hit=False,
|
||
cache_age=0,
|
||
is_empty_state=True,
|
||
empty_message=message)
|
||
|
||
try:
|
||
start_time = time.time()
|
||
|
||
db = DatabaseManager()
|
||
table_name = 'realtime_sales_monthly'
|
||
inspector = inspect(db.engine)
|
||
if not inspector.has_table(table_name):
|
||
return _render_growth_empty("尚未匯入業績資料,請先完成數據匯入後再查看成長分析。")
|
||
|
||
source_fingerprint = _get_growth_source_fingerprint(db.engine, table_name)
|
||
|
||
# 檢查快取
|
||
if is_growth_cache_valid(source_fingerprint):
|
||
cache = get_growth_cache()
|
||
cache_age = int((datetime.now(TAIPEI_TZ) - cache['timestamp']).total_seconds())
|
||
sys_log.debug(f"[GrowthAnalysis] [Cache] 使用快取 | 快取年齡: {cache_age}秒")
|
||
|
||
now_taipei = datetime.now(TAIPEI_TZ)
|
||
chart_data = _attach_growth_competitor_intel(db.engine, cache['chart_data'])
|
||
return render_template('growth_analysis.html',
|
||
chart_data=chart_data,
|
||
kpi=cache['kpi'],
|
||
datetime_now=now_taipei.strftime('%Y-%m-%d %H:%M:%S'),
|
||
cache_hit=True,
|
||
active_page='growth',
|
||
cache_age=cache_age)
|
||
|
||
# 快取失效,重新計算
|
||
sys_log.debug("[GrowthAnalysis] [Cache] 快取失效,重新計算數據...")
|
||
|
||
try:
|
||
payload = _fetch_growth_payload_summary(db.engine)
|
||
except Exception as summary_error:
|
||
sys_log.warning(f"[GrowthAnalysis] 月結摘要聚合失敗,改走明細聚合: {summary_error}")
|
||
payload = None
|
||
|
||
if not payload:
|
||
try:
|
||
payload = _fetch_growth_payload_sql(db.engine, table_name)
|
||
except Exception as sql_error:
|
||
sys_log.warning(f"[GrowthAnalysis] SQL 聚合失敗,回退 pandas 路徑: {sql_error}")
|
||
payload = _fetch_growth_payload_pandas(db.engine, table_name)
|
||
|
||
if not payload:
|
||
return _render_growth_empty("業績資料目前沒有可分析的紀錄,請確認匯入檔案是否包含有效銷售資料。")
|
||
chart_data, kpi = payload
|
||
|
||
# 儲存快取
|
||
set_growth_cache(chart_data, kpi, source_fingerprint)
|
||
chart_data = _attach_growth_competitor_intel(db.engine, chart_data)
|
||
|
||
elapsed = time.time() - start_time
|
||
sys_log.debug(f"[GrowthAnalysis] [Cache] 數據計算完成 | 耗時: {elapsed:.3f}秒")
|
||
|
||
now_taipei = datetime.now(TAIPEI_TZ)
|
||
return render_template('growth_analysis.html',
|
||
chart_data=chart_data,
|
||
kpi=kpi,
|
||
datetime_now=now_taipei.strftime('%Y-%m-%d %H:%M:%S'),
|
||
active_page='growth',
|
||
cache_hit=False)
|
||
|
||
except Exception as e:
|
||
sys_log.error(f"Growth Analysis Error: {e}")
|
||
return f"系統錯誤: {e}"
|
||
|
||
|
||
@sales_bp.route('/abc_analysis/detail')
|
||
def abc_analysis_detail():
|
||
"""ABC 分析詳細報表頁面"""
|
||
try:
|
||
target_class = request.args.get('class', 'A') # 預設 A 類
|
||
table_name = 'realtime_sales_monthly'
|
||
|
||
# 1. 生成與主頁面一致的 cache_key
|
||
data_range_months = int(request.args.get('data_range', '0') or '0')
|
||
start_date = request.args.get('start_date', '')
|
||
end_date = request.args.get('end_date', '')
|
||
|
||
if start_date or end_date:
|
||
cache_key = f"{table_name}_custom_{start_date}_{end_date}"
|
||
else:
|
||
cache_key = f"{table_name}_{data_range_months}m"
|
||
|
||
# 2. 使用共用篩選函式取得資料
|
||
target_df, cols_map, err = _get_filtered_sales_data(cache_key)
|
||
|
||
# V-Fix: 如果 cache_key 不存在,嘗試後補使用 table_name 固定鍵值
|
||
if err and table_name in _SALES_PROCESSED_CACHE:
|
||
target_df, cols_map, err = _get_filtered_sales_data(table_name)
|
||
|
||
if err:
|
||
return render_template(
|
||
'abc_analysis_detail.html',
|
||
loading_state=True,
|
||
info={'title': '數據準備中', 'desc': '業績分析快取正在重新載入。'},
|
||
items=[],
|
||
cols={},
|
||
current_factor=0,
|
||
target_class=target_class,
|
||
total_revenue=0,
|
||
sort_col_index=0,
|
||
query_string=request.query_string.decode(),
|
||
active_page='sales',
|
||
), 200
|
||
|
||
# 恢復欄位變數
|
||
col_name = cols_map.get('name')
|
||
col_amount = cols_map.get('amount')
|
||
col_qty = cols_map.get('qty')
|
||
col_category = cols_map.get('category')
|
||
col_brand = cols_map.get('brand')
|
||
col_vendor = cols_map.get('vendor')
|
||
col_price = cols_map.get('price')
|
||
col_cost = cols_map.get('cost')
|
||
col_profit = cols_map.get('profit')
|
||
col_date = cols_map.get('date')
|
||
col_pid = cols_map.get('pid')
|
||
|
||
|
||
# 3. 執行 ABC 分類
|
||
items = []
|
||
total_revenue = 0
|
||
current_factor = 0.0
|
||
if col_amount and not target_df.empty:
|
||
# V-Fix: 先針對商品進行聚合,確保 ABC 分析是基於「商品總銷量」而非「單筆訂單」
|
||
agg_rules = {col_amount: 'sum'}
|
||
if col_qty: agg_rules[col_qty] = 'sum'
|
||
if col_cost: agg_rules[col_cost] = 'sum'
|
||
if col_profit: agg_rules[col_profit] = 'sum'
|
||
if col_category: agg_rules[col_category] = 'first'
|
||
if col_vendor: agg_rules[col_vendor] = 'first'
|
||
if col_brand: agg_rules[col_brand] = 'first' # V-New: 加入品牌
|
||
if col_pid: agg_rules[col_pid] = 'first' # V-New: 聚合商品ID
|
||
if col_date: agg_rules['_month_str'] = lambda x: ', '.join(sorted(x.dropna().unique()))
|
||
|
||
df_agg = target_df.groupby(col_name).agg(agg_rules).reset_index()
|
||
|
||
# 重新計算聚合後的毛利率
|
||
if col_profit:
|
||
df_agg['calculated_margin_rate'] = (df_agg[col_profit] / df_agg[col_amount]) * 100
|
||
elif col_cost:
|
||
df_agg['calculated_margin_rate'] = ((df_agg[col_amount] - df_agg[col_cost]) / df_agg[col_amount]) * 100
|
||
else:
|
||
df_agg['calculated_margin_rate'] = 0.0
|
||
df_agg['calculated_margin_rate'] = df_agg['calculated_margin_rate'].replace([np.inf, -np.inf, np.nan], 0)
|
||
|
||
# 執行 ABC 排序與計算
|
||
df_agg = df_agg.sort_values(by=col_amount, ascending=False)
|
||
df_agg['cumulative_revenue'] = df_agg[col_amount].cumsum()
|
||
total_revenue = df_agg[col_amount].sum()
|
||
df_agg['cumulative_pct'] = (df_agg['cumulative_revenue'] / total_revenue) * 100
|
||
|
||
conditions = [(df_agg['cumulative_pct'] <= 80), (df_agg['cumulative_pct'] <= 95)]
|
||
choices = ['A', 'B']
|
||
df_agg['ABC_Class'] = np.select(conditions, choices, default='C')
|
||
|
||
# 4. 篩選特定類別
|
||
class_df = df_agg[df_agg['ABC_Class'] == target_class].copy()
|
||
|
||
# V-New: 計算平均單價與庫存建議
|
||
if col_qty:
|
||
class_df['avg_unit_price'] = (class_df[col_amount] / class_df[col_qty]).fillna(0)
|
||
|
||
# V-New: 處理動態補貨係數
|
||
custom_factor = request.args.get('factor')
|
||
current_factor = 0.0
|
||
|
||
if custom_factor:
|
||
try:
|
||
current_factor = float(custom_factor)
|
||
except:
|
||
current_factor = 1.5 if target_class == 'A' else (1.2 if target_class == 'B' else 0.0)
|
||
else:
|
||
current_factor = 1.5 if target_class == 'A' else (1.2 if target_class == 'B' else 0.0)
|
||
|
||
class_df['suggested_restock'] = (class_df[col_qty] * current_factor).astype(int)
|
||
|
||
items = class_df.to_dict('records')
|
||
|
||
# 準備標題與描述
|
||
class_info = {
|
||
'A': {'title': 'A 類 - 核心商品', 'desc': '營收佔比前 80% 的主力商品,建議重點備貨與監控。', 'color': 'danger'},
|
||
'B': {'title': 'B 類 - 次要商品', 'desc': '營收佔比 80%~95% 的輔助商品,維持正常庫存。', 'color': 'warning'},
|
||
'C': {'title': 'C 類 - 長尾商品', 'desc': '營收佔比最後 5% 的長尾商品,建議評估清倉或縮減 SKU。', 'color': 'success'}
|
||
}
|
||
info = class_info.get(target_class, {'title': f'{target_class} 類', 'desc': '', 'color': 'secondary'})
|
||
|
||
# 計算 DataTables 預設排序欄位 (銷售金額) 的索引
|
||
# 欄位順序: Rank(0), [PID], Name, [Brand], [Vendor], [Cat], [Margin], [AvgPrice, Qty, Restock], Amount
|
||
sort_col_index = 1 # Rank
|
||
if col_pid: sort_col_index += 1
|
||
sort_col_index += 1 # Name
|
||
if col_brand: sort_col_index += 1
|
||
if col_vendor: sort_col_index += 1
|
||
if col_category: sort_col_index += 1
|
||
if col_cost or col_profit: sort_col_index += 1
|
||
if col_qty: sort_col_index += 3
|
||
# 此時 sort_col_index 即為 Amount 欄位的索引
|
||
|
||
return render_template('abc_analysis_detail.html',
|
||
items=items,
|
||
info=info,
|
||
target_class=target_class,
|
||
current_factor=current_factor, # V-New: 傳遞當前係數
|
||
total_revenue=total_revenue,
|
||
sort_col_index=sort_col_index, # V-New: 傳遞排序欄位索引
|
||
cols={'name': col_name, 'amount': col_amount, 'qty': col_qty, 'cat': col_category,
|
||
'vendor': col_vendor, 'brand': col_brand, 'cost': col_cost, 'profit': col_profit, 'date': col_date, 'pid': col_pid},
|
||
# 傳遞當前查詢參數以供匯出連結使用
|
||
query_string=request.query_string.decode(),
|
||
active_page='sales')
|
||
|
||
except Exception as e:
|
||
sys_log.error(f"ABC Detail Error: {e}")
|
||
return f"系統錯誤: {e}"
|
||
|
||
# ==========================================
|
||
# API 路由
|
||
# ==========================================
|
||
|
||
@sales_bp.route('/api/sales_analysis/table_data')
|
||
@login_required
|
||
def api_sales_table_data():
|
||
"""API: 取得業績分析的詳細列表資料 (Server-side AJAX) - 使用 SQL 聚合優化"""
|
||
try:
|
||
import hashlib
|
||
from datetime import datetime, timedelta, timezone
|
||
TAIPEI_TZ = timezone(timedelta(hours=8))
|
||
|
||
# V-Opt: 產生查詢快取 key (根據所有篩選條件)
|
||
cache_params = request.args.to_dict()
|
||
cache_key = hashlib.md5(str(sorted(cache_params.items())).encode(), usedforsecurity=False).hexdigest()
|
||
|
||
# V-Opt: 檢查快取
|
||
if cache_key in _TABLE_DATA_CACHE:
|
||
cached = _TABLE_DATA_CACHE[cache_key]
|
||
if time.time() - cached['time'] < _TABLE_DATA_CACHE_TTL:
|
||
sys_log.debug(f"[API] Table Data: 使用快取 (key={cache_key[:8]})")
|
||
return jsonify(cached['data'])
|
||
|
||
table_name = 'realtime_sales_monthly'
|
||
data_range_months = int(request.args.get('data_range', '1') or '1')
|
||
start_date = request.args.get('start_date', '') # V-New: 自訂開始日期
|
||
end_date = request.args.get('end_date', '') # V-New: 自訂結束日期
|
||
|
||
# V-Fix: 取得所有篩選參數
|
||
category_filter = request.args.get('category', 'all')
|
||
brand_filter = request.args.get('brand', 'all') # V-Fix: 品牌篩選
|
||
vendor_filter = request.args.get('vendor', 'all') # V-Fix: 廠商篩選
|
||
activity_filter = request.args.get('activity', 'all') # V-Fix: 活動篩選
|
||
payment_filter = request.args.get('payment', 'all') # V-Fix: 付款方式篩選
|
||
month_filter = request.args.get('month', 'all')
|
||
dow_filter = request.args.get('dow', 'all') # 星期篩選
|
||
hour_filter = request.args.get('hour', 'all') # 小時篩選
|
||
min_price_str = request.args.get('min_price', '')
|
||
max_price_str = request.args.get('max_price', '')
|
||
min_margin_str = request.args.get('min_margin', '')
|
||
max_margin_str = request.args.get('max_margin', '')
|
||
keyword = request.args.get('keyword', '').strip()
|
||
|
||
db = DatabaseManager()
|
||
|
||
# V-Fix: 從快取讀取欄位名稱對應,以支援不同的資料庫欄位名稱
|
||
if start_date or end_date:
|
||
cache_key = f"{table_name}_custom_{start_date}_{end_date}"
|
||
else:
|
||
cache_key = f"{table_name}_{data_range_months}m"
|
||
|
||
# 嘗試從快取讀取欄位名稱
|
||
cols_map = {}
|
||
if cache_key in _SALES_PROCESSED_CACHE:
|
||
cols_map = _SALES_PROCESSED_CACHE[cache_key].get('cols', {})
|
||
elif table_name in _SALES_PROCESSED_CACHE: # V-Fix: 也嘗試使用固定 key
|
||
cols_map = _SALES_PROCESSED_CACHE[table_name].get('cols', {})
|
||
|
||
# 取得實際欄位名稱(如果快取中沒有,使用預設名稱)
|
||
# V-Fix (2026-01-23): 使用 or 確保不會得到 None 值
|
||
col_name = cols_map.get('name') or '商品名稱'
|
||
col_pid = cols_map.get('pid') or '商品ID'
|
||
col_brand = cols_map.get('brand') or '品牌'
|
||
col_vendor = cols_map.get('vendor') or '廠商名稱'
|
||
col_category = cols_map.get('category') or '商品館'
|
||
col_amount = cols_map.get('amount') or '總業績'
|
||
col_qty = cols_map.get('qty') or '數量'
|
||
col_cost = cols_map.get('cost') or '總成本'
|
||
col_profit = cols_map.get('profit') or '毛利'
|
||
col_return_qty = cols_map.get('return_qty') or '退貨數量'
|
||
|
||
# V-Opt: 使用純 SQL 聚合查詢,避免載入完整資料集
|
||
# 建立日期篩選條件
|
||
date_filter = ""
|
||
# V-New: 優先處理自訂日期區間
|
||
if start_date or end_date:
|
||
# V-Fix: 處理日期格式轉換 (2025-01-01 -> 2025/01/01)
|
||
start_date_slash = start_date.replace('-', '/') if start_date else ''
|
||
end_date_slash = end_date.replace('-', '/') if end_date else ''
|
||
|
||
# V-Fix: 只使用「日期」欄位(「訂單日期」欄位是固定文字「訂單日期」,不是實際日期)
|
||
if start_date and end_date:
|
||
date_filter = f"""AND ("日期" BETWEEN '{start_date_slash}' AND '{end_date_slash}')"""
|
||
elif start_date:
|
||
date_filter = f"""AND ("日期" >= '{start_date_slash}')"""
|
||
else: # only end_date
|
||
date_filter = f"""AND ("日期" <= '{end_date_slash}')"""
|
||
elif data_range_months > 0:
|
||
# V-Fix: 使用斜線格式以匹配資料庫格式
|
||
cutoff_date = (datetime.now(TAIPEI_TZ) - timedelta(days=data_range_months * 30)).strftime('%Y/%m/%d')
|
||
# V-Fix: 只使用「日期」欄位進行篩選(「訂單日期」是固定文字,不是實際日期)
|
||
date_filter = f"""AND ("日期" >= '{cutoff_date}')"""
|
||
|
||
# V-Fix: 建立其他篩選條件
|
||
additional_filters = []
|
||
|
||
# 分類篩選
|
||
if category_filter and category_filter != 'all' and col_category:
|
||
additional_filters.append(f""""{col_category}" = '{category_filter}'""")
|
||
|
||
# V-Fix: 品牌篩選
|
||
if brand_filter and brand_filter != 'all' and col_brand:
|
||
additional_filters.append(f""""{col_brand}" = '{brand_filter}'""")
|
||
|
||
# V-Fix: 廠商篩選
|
||
if vendor_filter and vendor_filter != 'all' and col_vendor:
|
||
additional_filters.append(f""""{col_vendor}" = '{vendor_filter}'""")
|
||
|
||
# V-Fix: 活動篩選
|
||
col_activity = cols_map.get('activity')
|
||
if activity_filter and activity_filter != 'all' and col_activity:
|
||
additional_filters.append(f""""{col_activity}" = '{activity_filter}'""")
|
||
|
||
# V-Fix: 付款方式篩選
|
||
col_payment = cols_map.get('payment')
|
||
if payment_filter and payment_filter != 'all' and col_payment:
|
||
additional_filters.append(f""""{col_payment}" = '{payment_filter}'""")
|
||
|
||
# 月份篩選
|
||
if month_filter and month_filter != 'all':
|
||
# V-Fix: 月份格式例如 "2025-01",但資料庫可能使用斜線格式 "2025/01"
|
||
# 只使用「日期」欄位(「訂單日期」是固定文字,「時間」只包含時間)
|
||
month_filter_slash = month_filter.replace('-', '/') # "2025-01" -> "2025/01"
|
||
# 同時匹配橫線和斜線格式
|
||
additional_filters.append(f"""("日期" LIKE '{month_filter}%' OR "日期" LIKE '{month_filter_slash}%')""")
|
||
|
||
# 星期篩選 (需要從日期計算)
|
||
if dow_filter and dow_filter != 'all':
|
||
# V-Fix (2026-01-23): 支援 PostgreSQL 和 SQLite 兩種資料庫
|
||
# Pandas dt.dayofweek: 0=Monday, 6=Sunday
|
||
pandas_dow = int(dow_filter)
|
||
if DATABASE_TYPE == 'postgresql':
|
||
# PostgreSQL: EXTRACT(DOW FROM date) 0=Sunday, 6=Saturday
|
||
# Pandas 0(Mon) -> PostgreSQL 1(Mon), Pandas 6(Sun) -> PostgreSQL 0(Sun)
|
||
pg_dow = (pandas_dow + 1) % 7
|
||
# 日期格式可能是 2025/01/01,需要轉換為 YYYY-MM-DD
|
||
additional_filters.append(f"""EXTRACT(DOW FROM TO_DATE(REPLACE("日期", '/', '-'), 'YYYY-MM-DD')) = {pg_dow}""")
|
||
else:
|
||
# SQLite: strftime('%w', date) 0=Sunday, 6=Saturday
|
||
sqlite_dow = str((pandas_dow + 1) % 7)
|
||
additional_filters.append(f"""strftime('%w', replace("日期", '/', '-')) = '{sqlite_dow}'""")
|
||
|
||
# 小時篩選 (需要從時間欄位提取)
|
||
if hour_filter and hour_filter != 'all':
|
||
# V-Fix (2026-01-23): 支援 PostgreSQL 和 SQLite 兩種資料庫
|
||
hour_val = int(hour_filter)
|
||
if DATABASE_TYPE == 'postgresql':
|
||
# PostgreSQL: 使用 SUBSTRING 或 CAST
|
||
additional_filters.append(f"""CAST(SUBSTRING("時間" FROM 1 FOR 2) AS INTEGER) = {hour_val}""")
|
||
else:
|
||
# SQLite: 使用 substr
|
||
additional_filters.append(f"""CAST(substr("時間", 1, 2) AS INTEGER) = {hour_val}""")
|
||
|
||
# 關鍵字篩選
|
||
if keyword:
|
||
keyword_escaped = keyword.replace("'", "''") # SQL 注入防護
|
||
keyword_conditions = []
|
||
if col_name:
|
||
keyword_conditions.append(f""""{col_name}" LIKE '%{keyword_escaped}%'""")
|
||
if col_pid:
|
||
keyword_conditions.append(f""""{col_pid}" LIKE '%{keyword_escaped}%'""")
|
||
if col_brand:
|
||
keyword_conditions.append(f""""{col_brand}" LIKE '%{keyword_escaped}%'""")
|
||
if col_vendor:
|
||
keyword_conditions.append(f""""{col_vendor}" LIKE '%{keyword_escaped}%'""")
|
||
if keyword_conditions:
|
||
additional_filters.append(f"({' OR '.join(keyword_conditions)})")
|
||
|
||
# V-New: 價格區間篩選 (Price Range)
|
||
if (min_price_str or max_price_str) and col_qty and col_amount:
|
||
# 假設單價 = 總業績 / 數量 (防止除以零)
|
||
price_cal_sql = f'CAST("{col_amount}" AS FLOAT) / NULLIF("{col_qty}", 0)'
|
||
if min_price_str:
|
||
additional_filters.append(f"{price_cal_sql} >= {float(min_price_str)}")
|
||
if max_price_str:
|
||
additional_filters.append(f"{price_cal_sql} <= {float(max_price_str)}")
|
||
|
||
# V-New: 毛利率區間篩選 (Margin Range)
|
||
if (min_margin_str or max_margin_str) and col_amount:
|
||
# 計算毛利額 SQL
|
||
if col_profit:
|
||
profit_cal_sql = f'"{col_profit}"'
|
||
elif col_cost:
|
||
profit_cal_sql = f'("{col_amount}" - "{col_cost}")'
|
||
else:
|
||
profit_cal_sql = "0"
|
||
|
||
# 計算毛利率 SQL: (毛利 / 業績) * 100
|
||
margin_cal_sql = f'({profit_cal_sql} * 100.0 / NULLIF("{col_amount}", 0))'
|
||
|
||
if min_margin_str:
|
||
additional_filters.append(f"{margin_cal_sql} >= {float(min_margin_str)}")
|
||
if max_margin_str:
|
||
additional_filters.append(f"{margin_cal_sql} <= {float(max_margin_str)}")
|
||
|
||
# 組合所有篩選條件
|
||
all_filters = date_filter
|
||
if additional_filters:
|
||
all_filters += " AND " + " AND ".join(additional_filters)
|
||
|
||
# SQL 聚合查詢 - 直接在資料庫層級完成聚合
|
||
# V-Fix: 使用動態欄位名稱
|
||
group_by_cols = []
|
||
if col_pid: group_by_cols.append(f'"{col_pid}"')
|
||
if col_name: group_by_cols.append(f'"{col_name}"')
|
||
if col_brand: group_by_cols.append(f'"{col_brand}"')
|
||
if col_vendor: group_by_cols.append(f'"{col_vendor}"')
|
||
if col_category: group_by_cols.append(f'"{col_category}"')
|
||
group_by_clause = ', '.join(group_by_cols) if group_by_cols else '"商品ID"'
|
||
|
||
sql_query = f"""
|
||
SELECT
|
||
{f'"{col_pid}" as product_id' if col_pid else "'未知' as product_id"},
|
||
{f'"{col_name}" as name' if col_name else "'未知' as name"},
|
||
{f'"{col_brand}" as brand' if col_brand else "'' as brand"},
|
||
{f'"{col_vendor}" as vendor' if col_vendor else "'' as vendor"},
|
||
{f'"{col_category}" as category' if col_category else "'' as category"},
|
||
{f'SUM(CAST("{col_amount}" AS REAL)) as amount' if col_amount else '0 as amount'},
|
||
{f'SUM(CAST("{col_qty}" AS REAL)) as qty' if col_qty else '0 as qty'},
|
||
{f'SUM(CAST("{col_cost}" AS REAL)) as cost' if col_cost else '0 as cost'},
|
||
{f'SUM(CAST("{col_return_qty}" AS REAL)) as return_qty' if col_return_qty else '0 as return_qty'},
|
||
COUNT(*) as order_count
|
||
FROM {table_name}
|
||
WHERE 1=1 {all_filters}
|
||
GROUP BY {group_by_clause}
|
||
ORDER BY amount DESC
|
||
LIMIT 300
|
||
"""
|
||
|
||
df_agg = pd.read_sql(sql_query, db.engine)
|
||
sys_log.info(f"[API] Table Data: SQL聚合查詢返回 {len(df_agg)} 筆商品 (篩選: category={category_filter}, month={month_filter}, dow={dow_filter}, hour={hour_filter}, keyword={keyword})")
|
||
|
||
if df_agg.empty:
|
||
return jsonify({'data': []})
|
||
|
||
# 計算衍生欄位
|
||
df_agg['margin_rate'] = ((df_agg['amount'] - df_agg['cost']) / df_agg['amount'] * 100).fillna(0)
|
||
df_agg['margin_rate'] = df_agg['margin_rate'].replace([np.inf, -np.inf], 0)
|
||
df_agg['avg_price'] = (df_agg['amount'] / df_agg['qty']).fillna(0)
|
||
df_agg['return_rate'] = (df_agg['return_qty'] / df_agg['qty'] * 100).fillna(0)
|
||
|
||
# V-Fix: 應用價格區間篩選 (在計算欄位後才能篩選)
|
||
if min_price_str:
|
||
try:
|
||
min_price = float(min_price_str)
|
||
df_agg = df_agg[df_agg['avg_price'] >= min_price]
|
||
except ValueError:
|
||
pass
|
||
|
||
if max_price_str:
|
||
try:
|
||
max_price = float(max_price_str)
|
||
df_agg = df_agg[df_agg['avg_price'] <= max_price]
|
||
except ValueError:
|
||
pass
|
||
|
||
# V-Fix: 應用毛利區間篩選 (在計算欄位後才能篩選)
|
||
if min_margin_str:
|
||
try:
|
||
min_margin = float(min_margin_str)
|
||
df_agg = df_agg[df_agg['margin_rate'] >= min_margin]
|
||
except ValueError:
|
||
pass
|
||
|
||
if max_margin_str:
|
||
try:
|
||
max_margin = float(max_margin_str)
|
||
df_agg = df_agg[df_agg['margin_rate'] <= max_margin]
|
||
except ValueError:
|
||
pass
|
||
|
||
# 重新排序並限制到 300 筆 (減少前端渲染負擔)
|
||
df_agg = df_agg.sort_values('amount', ascending=False).head(300)
|
||
|
||
# V-Opt: 使用向量化操作取代逐列迴圈
|
||
df_agg['rank'] = range(1, len(df_agg) + 1)
|
||
df_agg['month_str'] = '' # SQL聚合模式不需要月份字串
|
||
|
||
# 重新命名欄位以符合前端格式
|
||
result_df = df_agg.rename(columns={
|
||
'product_id': 'product_id',
|
||
'name': 'name',
|
||
'brand': 'brand',
|
||
'vendor': 'vendor',
|
||
'category': 'category',
|
||
'margin_rate': 'margin_rate',
|
||
'avg_price': 'avg_price',
|
||
'return_rate': 'return_rate',
|
||
'qty': 'qty',
|
||
'amount': 'amount'
|
||
})
|
||
|
||
# 選擇需要的欄位並轉換為字典列表
|
||
columns = ['rank', 'product_id', 'name', 'brand', 'vendor', 'category',
|
||
'margin_rate', 'month_str', 'avg_price', 'return_rate', 'qty', 'amount']
|
||
|
||
# V-Fix (2026-01-23): 確保所有數值欄位無 NaN/Infinity,避免 JSON 序列化失敗
|
||
numeric_cols = ['margin_rate', 'avg_price', 'return_rate', 'qty', 'amount']
|
||
for col in numeric_cols:
|
||
if col in result_df.columns:
|
||
result_df[col] = result_df[col].replace([np.inf, -np.inf], 0).fillna(0)
|
||
|
||
# V-Fix (2026-01-23): 確保字串欄位無 None,避免 JSON 序列化失敗
|
||
string_cols = ['product_id', 'name', 'brand', 'vendor', 'category', 'month_str']
|
||
for col in string_cols:
|
||
if col in result_df.columns:
|
||
result_df[col] = result_df[col].fillna('').astype(str)
|
||
|
||
data = result_df[columns].to_dict('records')
|
||
|
||
response_data = {'data': data}
|
||
|
||
# V-Opt: 儲存到快取
|
||
_TABLE_DATA_CACHE[cache_key] = {'data': response_data, 'time': time.time()}
|
||
|
||
# V-Opt: 清理過期快取 (保留最近 50 個)
|
||
if len(_TABLE_DATA_CACHE) > 50:
|
||
sorted_keys = sorted(_TABLE_DATA_CACHE.keys(),
|
||
key=lambda k: _TABLE_DATA_CACHE[k]['time'])
|
||
for old_key in sorted_keys[:-50]:
|
||
del _TABLE_DATA_CACHE[old_key]
|
||
|
||
return jsonify(response_data)
|
||
|
||
except Exception as e:
|
||
sys_log.error(f"[API] Table Data Error: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@sales_bp.route('/api/sales_analysis/table_data_pandas')
|
||
@login_required
|
||
def api_sales_table_data_pandas():
|
||
"""API: 取得業績分析的詳細列表資料 (使用 pandas 聚合 - 舊版本)"""
|
||
try:
|
||
table_name = 'realtime_sales_monthly'
|
||
data_range_months = int(request.args.get('data_range', '1'))
|
||
cache_key = f"{table_name}_{data_range_months}m"
|
||
target_df, cols_map, err = _get_filtered_sales_data(cache_key)
|
||
|
||
if err or target_df is None:
|
||
sys_log.warning(f"[API] Table Data: 快取不存在 ({cache_key}),返回空資料")
|
||
return jsonify({'data': []})
|
||
|
||
if target_df.empty:
|
||
return jsonify({'data': []})
|
||
|
||
col_name = cols_map.get('name')
|
||
col_amount = cols_map.get('amount')
|
||
col_qty = cols_map.get('qty')
|
||
col_cost = cols_map.get('cost')
|
||
col_profit = cols_map.get('profit')
|
||
col_category = cols_map.get('category')
|
||
col_vendor = cols_map.get('vendor')
|
||
col_date = cols_map.get('date')
|
||
col_brand = cols_map.get('brand')
|
||
col_return_qty = cols_map.get('return_qty')
|
||
|
||
selected_metric = request.args.get('metric', 'amount')
|
||
|
||
# 執行聚合 (V-Opt: 多維度聚合,增加精確度)
|
||
agg_rules = {col_amount: 'sum'}
|
||
if col_qty: agg_rules[col_qty] = 'sum'
|
||
if col_cost: agg_rules[col_cost] = 'sum'
|
||
if col_profit: agg_rules[col_profit] = 'sum'
|
||
if col_return_qty: agg_rules[col_return_qty] = 'sum'
|
||
if col_date: agg_rules['_month_str'] = lambda x: ', '.join(sorted(x.dropna().unique()))
|
||
|
||
# Group By 鍵值:商品名稱 + 品牌 + 廠商 + 分類 (確保唯一性)
|
||
group_cols = [col_name]
|
||
if col_brand: group_cols.append(col_brand)
|
||
if col_vendor: group_cols.append(col_vendor)
|
||
if col_category: group_cols.append(col_category)
|
||
|
||
df_agg = target_df.groupby(group_cols).agg(agg_rules).reset_index()
|
||
|
||
# 計算毛利率
|
||
if col_profit:
|
||
df_agg['agg_margin_rate'] = (df_agg[col_profit] / df_agg[col_amount]) * 100
|
||
elif col_cost:
|
||
df_agg['agg_margin_rate'] = ((df_agg[col_amount] - df_agg[col_cost]) / df_agg[col_amount]) * 100
|
||
else:
|
||
df_agg['agg_margin_rate'] = 0.0
|
||
df_agg['agg_margin_rate'] = df_agg['agg_margin_rate'].replace([np.inf, -np.inf, np.nan], 0)
|
||
|
||
# V-New: 計算平均單價與退貨率
|
||
if col_qty:
|
||
df_agg['avg_price'] = (df_agg[col_amount] / df_agg[col_qty]).fillna(0)
|
||
if col_return_qty:
|
||
df_agg['return_rate'] = (df_agg[col_return_qty] / df_agg[col_qty] * 100).fillna(0)
|
||
|
||
# 排序
|
||
sort_col_agg = col_amount
|
||
if selected_metric == 'qty' and col_qty:
|
||
sort_col_agg = col_qty
|
||
|
||
df_agg = df_agg.sort_values(by=sort_col_agg, ascending=False).head(1000) # 限制前 1000 筆
|
||
|
||
# 轉換為 DataTables 需要的格式
|
||
data = []
|
||
for i, row in enumerate(df_agg.to_dict('records')):
|
||
data.append({
|
||
'rank': i + 1,
|
||
'name': row.get(col_name, ''),
|
||
'brand': row.get(col_brand, ''),
|
||
'vendor': row.get(col_vendor, ''),
|
||
'category': row.get(col_category, ''),
|
||
'margin_rate': row.get('agg_margin_rate', 0),
|
||
'month_str': row.get('_month_str', ''),
|
||
'avg_price': row.get('avg_price', 0),
|
||
'return_rate': row.get('return_rate', 0),
|
||
'qty': row.get(col_qty, 0),
|
||
'amount': row.get(col_amount, 0)
|
||
})
|
||
|
||
return jsonify({'data': data})
|
||
|
||
except Exception as e:
|
||
sys_log.error(f"Table Data API Error: {e}")
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@sales_bp.route('/api/sales_analysis/top_detail')
|
||
@login_required
|
||
def api_sales_top_detail():
|
||
"""API: 取得 Top N 詳細列表(業績貢獻王/獲利金雞母/人氣引流款)"""
|
||
try:
|
||
from datetime import datetime, timedelta, timezone
|
||
TAIPEI_TZ = timezone(timedelta(hours=8))
|
||
|
||
table_name = 'realtime_sales_monthly'
|
||
data_range_months = int(request.args.get('data_range', '1') or '1')
|
||
start_date = request.args.get('start_date', '') # V-New: 自訂開始日期
|
||
end_date = request.args.get('end_date', '') # V-New: 自訂結束日期
|
||
top_type = request.args.get('type', 'revenue') # revenue/margin/quantity
|
||
metric = request.args.get('metric', 'amount') # amount/profit/qty
|
||
view_type = request.args.get('view', 'product') # product/category
|
||
|
||
db = DatabaseManager()
|
||
|
||
# V-Fix: 從快取讀取欄位名稱對應,以支援不同的資料庫欄位名稱
|
||
if start_date or end_date:
|
||
cache_key = f"{table_name}_custom_{start_date}_{end_date}"
|
||
else:
|
||
cache_key = f"{table_name}_{data_range_months}m"
|
||
|
||
# 嘗試從快取讀取欄位名稱
|
||
cols_map = {}
|
||
if cache_key in _SALES_PROCESSED_CACHE:
|
||
cols_map = _SALES_PROCESSED_CACHE[cache_key].get('cols', {})
|
||
elif table_name in _SALES_PROCESSED_CACHE: # V-Fix: 也嘗試使用固定 key
|
||
cols_map = _SALES_PROCESSED_CACHE[table_name].get('cols', {})
|
||
|
||
# 取得實際欄位名稱(如果快取中沒有,使用預設名稱)
|
||
# V-Fix (2026-01-23): 使用 or 確保不會得到 None 值
|
||
col_name = cols_map.get('name') or '商品名稱'
|
||
col_brand = cols_map.get('brand') or '品牌'
|
||
col_vendor = cols_map.get('vendor') or '廠商名稱'
|
||
col_category = cols_map.get('category') or '商品館'
|
||
col_amount = cols_map.get('amount') or '總業績'
|
||
col_qty = cols_map.get('qty') or '數量'
|
||
col_cost = cols_map.get('cost') or '總成本'
|
||
col_profit = cols_map.get('profit') # 可以為 None
|
||
col_activity = cols_map.get('activity') or '活動名稱'
|
||
col_payment = cols_map.get('payment') or '付款方式'
|
||
|
||
# 建立日期篩選條件
|
||
date_filter = ""
|
||
# V-New: 優先處理自訂日期區間
|
||
if start_date or end_date:
|
||
# V-Fix: 處理日期格式轉換 (2025-01-01 -> 2025/01/01)
|
||
start_date_slash = start_date.replace('-', '/') if start_date else ''
|
||
end_date_slash = end_date.replace('-', '/') if end_date else ''
|
||
|
||
if start_date and end_date:
|
||
date_filter = f"""AND "日期" BETWEEN '{start_date_slash}' AND '{end_date_slash}'"""
|
||
elif start_date:
|
||
date_filter = f"""AND "日期" >= '{start_date_slash}'"""
|
||
else: # only end_date
|
||
date_filter = f"""AND "日期" <= '{end_date_slash}'"""
|
||
elif data_range_months > 0:
|
||
cutoff_date = (datetime.now(TAIPEI_TZ) - timedelta(days=data_range_months * 30)).strftime('%Y/%m/%d')
|
||
date_filter = f"""AND "日期" >= '{cutoff_date}'"""
|
||
|
||
# V-Fix: 補上其他所有篩選條件 (與 get_sales_table_data 一致)
|
||
category_filter = request.args.get('category', 'all')
|
||
brand_filter = request.args.get('brand', 'all')
|
||
vendor_filter = request.args.get('vendor', 'all')
|
||
activity_filter = request.args.get('activity', 'all')
|
||
payment_filter = request.args.get('payment', 'all')
|
||
month_filter = request.args.get('month', 'all')
|
||
dow_filter = request.args.get('dow', 'all')
|
||
hour_filter = request.args.get('hour', 'all')
|
||
min_price_str = request.args.get('min_price', '')
|
||
max_price_str = request.args.get('max_price', '')
|
||
min_margin_str = request.args.get('min_margin', '')
|
||
max_margin_str = request.args.get('max_margin', '')
|
||
keyword = request.args.get('keyword', '').strip()
|
||
|
||
additional_filters = []
|
||
|
||
if category_filter and category_filter != 'all':
|
||
additional_filters.append(f""""{col_category}" = '{category_filter}'""")
|
||
if brand_filter and brand_filter != 'all':
|
||
additional_filters.append(f""""{col_brand}" = '{brand_filter}'""")
|
||
if vendor_filter and vendor_filter != 'all':
|
||
additional_filters.append(f""""{col_vendor}" = '{vendor_filter}'""")
|
||
if activity_filter and activity_filter != 'all':
|
||
additional_filters.append(f""""{col_activity}" = '{activity_filter}'""")
|
||
if payment_filter and payment_filter != 'all':
|
||
additional_filters.append(f""""{col_payment}" = '{payment_filter}'""")
|
||
|
||
# 時間維度
|
||
if month_filter and month_filter != 'all':
|
||
month_filter_slash = month_filter.replace('-', '/')
|
||
# 使用「日期」欄位 (這似乎是系統內部固定欄位,不需 dynamic map,除非資料表結構也變了)
|
||
# 假設 "日期" 是固定欄位
|
||
additional_filters.append(f"""("日期" LIKE '{month_filter}%' OR "日期" LIKE '{month_filter_slash}%')""")
|
||
|
||
if dow_filter and dow_filter != 'all':
|
||
# V-Fix (2026-01-23): 支援 PostgreSQL 和 SQLite (top_detail API)
|
||
pandas_dow = int(dow_filter)
|
||
if DATABASE_TYPE == 'postgresql':
|
||
pg_dow = (pandas_dow + 1) % 7
|
||
additional_filters.append(f"""EXTRACT(DOW FROM TO_DATE(REPLACE("日期", '/', '-'), 'YYYY-MM-DD')) = {pg_dow}""")
|
||
else:
|
||
sqlite_dow = str((pandas_dow + 1) % 7)
|
||
additional_filters.append(f"""strftime('%w', replace("日期", '/', '-')) = '{sqlite_dow}'""")
|
||
|
||
if hour_filter and hour_filter != 'all':
|
||
# V-Fix (2026-01-23): 支援 PostgreSQL 和 SQLite (top_detail API)
|
||
hour_val = int(hour_filter)
|
||
if DATABASE_TYPE == 'postgresql':
|
||
additional_filters.append(f"""CAST(SUBSTRING("時間" FROM 1 FOR 2) AS INTEGER) = {hour_val}""")
|
||
else:
|
||
additional_filters.append(f"""CAST(substr("時間", 1, 2) AS INTEGER) = {hour_val}""")
|
||
|
||
# 關鍵字
|
||
if keyword:
|
||
keyword_escaped = keyword.replace("'", "''")
|
||
k_conds = []
|
||
for col in [col_name, cols_map.get("pid", "商品ID"), col_brand, col_vendor]:
|
||
k_conds.append(f""""{col}" LIKE '%{keyword_escaped}%'""")
|
||
additional_filters.append(f"({' OR '.join(k_conds)})")
|
||
|
||
if (min_price_str or max_price_str):
|
||
price_sql = f'CAST("{col_amount}" AS FLOAT) / NULLIF("{col_qty}", 0)'
|
||
if min_price_str: additional_filters.append(f"{price_sql} >= {float(min_price_str)}")
|
||
if max_price_str: additional_filters.append(f"{price_sql} <= {float(max_price_str)}")
|
||
|
||
if (min_margin_str or max_margin_str):
|
||
if col_profit:
|
||
profit_sql = f'"{col_profit}"'
|
||
else:
|
||
profit_sql = f'("{col_amount}" - "{col_cost}")'
|
||
|
||
margin_sql = f'({profit_sql} * 100.0 / NULLIF("{col_amount}", 0))'
|
||
if min_margin_str: additional_filters.append(f"{margin_sql} >= {float(min_margin_str)}")
|
||
if max_margin_str: additional_filters.append(f"{margin_sql} <= {float(max_margin_str)}")
|
||
|
||
if additional_filters:
|
||
date_filter += " AND " + " AND ".join(additional_filters)
|
||
|
||
# V-New: 準備利潤計算 SQL 片段 (SELECT 子句使用 SUM 聚合)
|
||
if col_profit:
|
||
profit_select_sql = f'SUM(CAST("{col_profit}" AS REAL))'
|
||
else:
|
||
profit_select_sql = f'SUM(CAST("{col_amount}" AS REAL)) - SUM(CAST("{col_cost}" AS REAL))'
|
||
|
||
# 根據檢視類型和指標建立 SQL 查詢
|
||
if view_type == 'category':
|
||
# 分類排行
|
||
if metric == 'qty':
|
||
sql_query = f"""
|
||
SELECT
|
||
"{col_category}" as name,
|
||
SUM(CAST("{col_qty}" AS REAL)) as value
|
||
FROM {table_name}
|
||
WHERE "{col_category}" IS NOT NULL {date_filter}
|
||
GROUP BY "{col_category}"
|
||
ORDER BY value DESC
|
||
LIMIT 50
|
||
"""
|
||
elif metric == 'profit':
|
||
sql_query = f"""
|
||
SELECT
|
||
"{col_category}" as name,
|
||
{profit_select_sql} as value,
|
||
CASE
|
||
WHEN SUM(CAST("{col_amount}" AS REAL)) > 0
|
||
THEN (({profit_select_sql}) / SUM(CAST("{col_amount}" AS REAL))) * 100
|
||
ELSE 0
|
||
END as margin_rate
|
||
FROM {table_name}
|
||
WHERE "{col_category}" IS NOT NULL {date_filter}
|
||
GROUP BY "{col_category}"
|
||
ORDER BY value DESC
|
||
LIMIT 50
|
||
"""
|
||
else: # amount
|
||
sql_query = f"""
|
||
SELECT
|
||
"{col_category}" as name,
|
||
SUM(CAST("{col_amount}" AS REAL)) as value
|
||
FROM {table_name}
|
||
WHERE "{col_category}" IS NOT NULL {date_filter}
|
||
GROUP BY "{col_category}"
|
||
ORDER BY value DESC
|
||
LIMIT 50
|
||
"""
|
||
else:
|
||
# 商品排行(包含商品ID)
|
||
pid_col_sql = f'"{cols_map.get("pid", "商品ID")}"' # 商品ID 欄位
|
||
if metric == 'qty':
|
||
sql_query = f"""
|
||
SELECT
|
||
{pid_col_sql} as product_id,
|
||
"{col_name}" as name,
|
||
"{col_brand}" as brand,
|
||
"{col_vendor}" as vendor,
|
||
"{col_category}" as category,
|
||
SUM(CAST("{col_qty}" AS REAL)) as value
|
||
FROM {table_name}
|
||
WHERE "{col_name}" IS NOT NULL {date_filter}
|
||
GROUP BY {pid_col_sql}, "{col_name}", "{col_brand}", "{col_vendor}", "{col_category}"
|
||
ORDER BY value DESC
|
||
LIMIT 100
|
||
"""
|
||
elif metric == 'profit':
|
||
sql_query = f"""
|
||
SELECT
|
||
{pid_col_sql} as product_id,
|
||
"{col_name}" as name,
|
||
"{col_brand}" as brand,
|
||
"{col_vendor}" as vendor,
|
||
"{col_category}" as category,
|
||
{profit_select_sql} as value,
|
||
CASE
|
||
WHEN SUM(CAST("{col_amount}" AS REAL)) > 0
|
||
THEN (({profit_select_sql}) / SUM(CAST("{col_amount}" AS REAL))) * 100
|
||
ELSE 0
|
||
END as margin_rate
|
||
FROM {table_name}
|
||
WHERE "{col_name}" IS NOT NULL {date_filter}
|
||
GROUP BY {pid_col_sql}, "{col_name}", "{col_brand}", "{col_vendor}", "{col_category}"
|
||
ORDER BY value DESC
|
||
LIMIT 100
|
||
"""
|
||
else: # amount
|
||
sql_query = f"""
|
||
SELECT
|
||
{pid_col_sql} as product_id,
|
||
"{col_name}" as name,
|
||
"{col_brand}" as brand,
|
||
"{col_vendor}" as vendor,
|
||
"{col_category}" as category,
|
||
SUM(CAST("{col_amount}" AS REAL)) as value
|
||
FROM {table_name}
|
||
WHERE "{col_name}" IS NOT NULL {date_filter}
|
||
GROUP BY {pid_col_sql}, "{col_name}", "{col_brand}", "{col_vendor}", "{col_category}"
|
||
ORDER BY value DESC
|
||
LIMIT 100
|
||
"""
|
||
|
||
# 執行查詢
|
||
df = pd.read_sql(sql_query, db.engine)
|
||
sys_log.info(f"[API] Top Detail: {top_type}/{view_type} 返回 {len(df)} 筆資料")
|
||
|
||
if df.empty:
|
||
return jsonify({'items': []})
|
||
|
||
# V-Fix (2026-01-23): 確保數值欄位無 NaN/Infinity,避免 JSON 序列化失敗
|
||
numeric_cols = ['value', 'margin_rate']
|
||
for col in numeric_cols:
|
||
if col in df.columns:
|
||
df[col] = df[col].replace([np.inf, -np.inf], 0).fillna(0)
|
||
|
||
# V-Fix (2026-01-23): 確保字串欄位無 None
|
||
string_cols = ['product_id', 'name', 'brand', 'vendor', 'category']
|
||
for col in string_cols:
|
||
if col in df.columns:
|
||
df[col] = df[col].fillna('').astype(str)
|
||
|
||
# 轉換為 JSON
|
||
items = df.to_dict('records')
|
||
return jsonify({'items': items})
|
||
|
||
except Exception as e:
|
||
sys_log.error(f"[API] Top Detail Error: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@sales_bp.route('/api/sales_analysis/export_top_detail')
|
||
@login_required
|
||
def api_export_top_detail():
|
||
"""API: 匯出 Top N 詳細列表為 Excel"""
|
||
try:
|
||
from datetime import datetime, timedelta, timezone
|
||
import io
|
||
TAIPEI_TZ = timezone(timedelta(hours=8))
|
||
|
||
table_name = 'realtime_sales_monthly'
|
||
data_range_months = int(request.args.get('data_range', '1') or '1')
|
||
start_date = request.args.get('start_date', '') # V-New: 自訂開始日期
|
||
end_date = request.args.get('end_date', '') # V-New: 自訂結束日期
|
||
top_type = request.args.get('type', 'revenue')
|
||
metric = request.args.get('metric', 'amount')
|
||
view_type = request.args.get('view', 'product')
|
||
|
||
db = DatabaseManager()
|
||
|
||
# V-Fix: 從快取讀取欄位名稱對應,以支援不同的資料庫欄位名稱
|
||
if start_date or end_date:
|
||
cache_key = f"{table_name}_custom_{start_date}_{end_date}"
|
||
else:
|
||
cache_key = f"{table_name}_{data_range_months}m"
|
||
|
||
# 嘗試從快取讀取欄位名稱
|
||
cols_map = {}
|
||
if cache_key in _SALES_PROCESSED_CACHE:
|
||
cols_map = _SALES_PROCESSED_CACHE[cache_key].get('cols', {})
|
||
elif table_name in _SALES_PROCESSED_CACHE: # V-Fix: 也嘗試使用固定 key
|
||
cols_map = _SALES_PROCESSED_CACHE[table_name].get('cols', {})
|
||
|
||
# 取得實際欄位名稱(如果快取中沒有,使用預設名稱)
|
||
# V-Fix (2026-01-23): 使用 or 確保不會得到 None 值
|
||
col_name = cols_map.get('name') or '商品名稱'
|
||
col_brand = cols_map.get('brand') or '品牌'
|
||
col_vendor = cols_map.get('vendor') or '廠商名稱'
|
||
col_category = cols_map.get('category') or '商品館'
|
||
col_amount = cols_map.get('amount') or '總業績'
|
||
col_qty = cols_map.get('qty') or '數量'
|
||
col_cost = cols_map.get('cost') or '總成本'
|
||
col_profit = cols_map.get('profit') # 可以為 None
|
||
col_activity = cols_map.get('activity') or '活動名稱'
|
||
col_payment = cols_map.get('payment') or '付款方式'
|
||
|
||
# 建立日期篩選條件
|
||
date_filter = ""
|
||
# V-New: 優先處理自訂日期區間
|
||
if start_date or end_date:
|
||
# V-Fix: 處理日期格式轉換 (2025-01-01 -> 2025/01/01)
|
||
start_date_slash = start_date.replace('-', '/') if start_date else ''
|
||
end_date_slash = end_date.replace('-', '/') if end_date else ''
|
||
|
||
if start_date and end_date:
|
||
date_filter = f"""AND "日期" BETWEEN '{start_date_slash}' AND '{end_date_slash}'"""
|
||
elif start_date:
|
||
date_filter = f"""AND "日期" >= '{start_date_slash}'"""
|
||
else: # only end_date
|
||
date_filter = f"""AND "日期" <= '{end_date_slash}'"""
|
||
elif data_range_months > 0:
|
||
cutoff_date = (datetime.now(TAIPEI_TZ) - timedelta(days=data_range_months * 30)).strftime('%Y/%m/%d')
|
||
date_filter = f"""AND "日期" >= '{cutoff_date}'"""
|
||
|
||
# V-Fix: 補上其他所有篩選條件 (與 get_top_detail 一致)
|
||
category_filter = request.args.get('category', 'all')
|
||
brand_filter = request.args.get('brand', 'all')
|
||
vendor_filter = request.args.get('vendor', 'all')
|
||
activity_filter = request.args.get('activity', 'all')
|
||
payment_filter = request.args.get('payment', 'all')
|
||
month_filter = request.args.get('month', 'all')
|
||
dow_filter = request.args.get('dow', 'all')
|
||
hour_filter = request.args.get('hour', 'all')
|
||
min_price_str = request.args.get('min_price', '')
|
||
max_price_str = request.args.get('max_price', '')
|
||
min_margin_str = request.args.get('min_margin', '')
|
||
max_margin_str = request.args.get('max_margin', '')
|
||
keyword = request.args.get('keyword', '').strip()
|
||
|
||
additional_filters = []
|
||
|
||
if category_filter and category_filter != 'all':
|
||
additional_filters.append(f""""{col_category}" = '{category_filter}'""")
|
||
if brand_filter and brand_filter != 'all':
|
||
additional_filters.append(f""""{col_brand}" = '{brand_filter}'""")
|
||
if vendor_filter and vendor_filter != 'all':
|
||
additional_filters.append(f""""{col_vendor}" = '{vendor_filter}'""")
|
||
if activity_filter and activity_filter != 'all':
|
||
additional_filters.append(f""""{col_activity}" = '{activity_filter}'""")
|
||
if payment_filter and payment_filter != 'all':
|
||
additional_filters.append(f""""{col_payment}" = '{payment_filter}'""")
|
||
|
||
if month_filter and month_filter != 'all':
|
||
month_filter_slash = month_filter.replace('-', '/')
|
||
additional_filters.append(f"""("日期" LIKE '{month_filter}%' OR "日期" LIKE '{month_filter_slash}%')""")
|
||
|
||
if dow_filter and dow_filter != 'all':
|
||
# V-Fix (2026-01-23): 支援 PostgreSQL 和 SQLite (export API)
|
||
pandas_dow = int(dow_filter)
|
||
if DATABASE_TYPE == 'postgresql':
|
||
pg_dow = (pandas_dow + 1) % 7
|
||
additional_filters.append(f"""EXTRACT(DOW FROM TO_DATE(REPLACE("日期", '/', '-'), 'YYYY-MM-DD')) = {pg_dow}""")
|
||
else:
|
||
sqlite_dow = str((pandas_dow + 1) % 7)
|
||
additional_filters.append(f"""strftime('%w', replace("日期", '/', '-')) = '{sqlite_dow}'""")
|
||
|
||
if hour_filter and hour_filter != 'all':
|
||
# V-Fix (2026-01-23): 支援 PostgreSQL 和 SQLite (export API)
|
||
hour_val = int(hour_filter)
|
||
if DATABASE_TYPE == 'postgresql':
|
||
additional_filters.append(f"""CAST(SUBSTRING("時間" FROM 1 FOR 2) AS INTEGER) = {hour_val}""")
|
||
else:
|
||
additional_filters.append(f"""CAST(substr("時間", 1, 2) AS INTEGER) = {hour_val}""")
|
||
|
||
if keyword:
|
||
keyword_escaped = keyword.replace("'", "''")
|
||
k_conds = []
|
||
for col in [col_name, cols_map.get("pid", "商品ID"), col_brand, col_vendor]:
|
||
k_conds.append(f""""{col}" LIKE '%{keyword_escaped}%'""")
|
||
additional_filters.append(f"({' OR '.join(k_conds)})")
|
||
|
||
if (min_price_str or max_price_str):
|
||
price_sql = f'CAST("{col_amount}" AS FLOAT) / NULLIF("{col_qty}", 0)'
|
||
if min_price_str: additional_filters.append(f"{price_sql} >= {float(min_price_str)}")
|
||
if max_price_str: additional_filters.append(f"{price_sql} <= {float(max_price_str)}")
|
||
|
||
if (min_margin_str or max_margin_str):
|
||
if col_profit:
|
||
profit_sql = f'"{col_profit}"'
|
||
else:
|
||
profit_sql = f'("{col_amount}" - "{col_cost}")'
|
||
|
||
margin_sql = f'({profit_sql} * 100.0 / NULLIF("{col_amount}", 0))'
|
||
if min_margin_str: additional_filters.append(f"{margin_sql} >= {float(min_margin_str)}")
|
||
if max_margin_str: additional_filters.append(f"{margin_sql} <= {float(max_margin_str)}")
|
||
|
||
if additional_filters:
|
||
date_filter += " AND " + " AND ".join(additional_filters)
|
||
|
||
# V-New: 準備利潤計算 SQL 片段 (SELECT 子句使用 SUM 聚合)
|
||
if col_profit:
|
||
profit_select_sql = f'SUM(CAST("{col_profit}" AS REAL))'
|
||
else:
|
||
profit_select_sql = f'SUM(CAST("{col_amount}" AS REAL)) - SUM(CAST("{col_cost}" AS REAL))'
|
||
|
||
# 根據檢視類型和指標建立 SQL 查詢(與上面相同)
|
||
if view_type == 'category':
|
||
if metric == 'qty':
|
||
sql_query = f"""
|
||
SELECT
|
||
"{col_category}" as 分類名稱,
|
||
SUM(CAST("{col_qty}" AS REAL)) as 銷售數量
|
||
FROM {table_name}
|
||
WHERE "{col_category}" IS NOT NULL {date_filter}
|
||
GROUP BY "{col_category}"
|
||
ORDER BY 銷售數量 DESC
|
||
LIMIT 50
|
||
"""
|
||
elif metric == 'profit':
|
||
sql_query = f"""
|
||
SELECT
|
||
"{col_category}" as 分類名稱,
|
||
{profit_select_sql} as 毛利金額,
|
||
CASE
|
||
WHEN SUM(CAST("{col_amount}" AS REAL)) > 0
|
||
THEN (({profit_select_sql}) / SUM(CAST("{col_amount}" AS REAL))) * 100
|
||
ELSE 0
|
||
END as 毛利率
|
||
FROM {table_name}
|
||
WHERE "{col_category}" IS NOT NULL {date_filter}
|
||
GROUP BY "{col_category}"
|
||
ORDER BY 毛利金額 DESC
|
||
LIMIT 50
|
||
"""
|
||
else: # amount
|
||
sql_query = f"""
|
||
SELECT
|
||
"{col_category}" as 分類名稱,
|
||
SUM(CAST("{col_amount}" AS REAL)) as 銷售金額
|
||
FROM {table_name}
|
||
WHERE "{col_category}" IS NOT NULL {date_filter}
|
||
GROUP BY "{col_category}"
|
||
ORDER BY 銷售金額 DESC
|
||
LIMIT 50
|
||
"""
|
||
else:
|
||
# 商品排行(包含商品ID)
|
||
if metric == 'qty':
|
||
sql_query = f"""
|
||
SELECT
|
||
"{cols_map.get("pid", "商品ID")}" as 商品ID,
|
||
"{col_name}" as 商品名稱,
|
||
"{col_brand}" as 品牌,
|
||
"{col_vendor}" as 廠商名稱,
|
||
"{col_category}" as 分類名稱,
|
||
SUM(CAST("{col_qty}" AS REAL)) as 銷售數量
|
||
FROM {table_name}
|
||
WHERE "{col_name}" IS NOT NULL {date_filter}
|
||
GROUP BY "{cols_map.get("pid", "商品ID")}", "{col_name}", "{col_brand}", "{col_vendor}", "{col_category}"
|
||
ORDER BY 銷售數量 DESC
|
||
LIMIT 100
|
||
"""
|
||
elif metric == 'profit':
|
||
sql_query = f"""
|
||
SELECT
|
||
"{cols_map.get("pid", "商品ID")}" as 商品ID,
|
||
"{col_name}" as 商品名稱,
|
||
"{col_brand}" as 品牌,
|
||
"{col_vendor}" as 廠商名稱,
|
||
"{col_category}" as 分類名稱,
|
||
{profit_select_sql} as 毛利金額,
|
||
CASE
|
||
WHEN SUM(CAST("{col_amount}" AS REAL)) > 0
|
||
THEN (({profit_select_sql}) / SUM(CAST("{col_amount}" AS REAL))) * 100
|
||
ELSE 0
|
||
END as 毛利率
|
||
FROM {table_name}
|
||
WHERE "{col_name}" IS NOT NULL {date_filter}
|
||
GROUP BY "{cols_map.get("pid", "商品ID")}", "{col_name}", "{col_brand}", "{col_vendor}", "{col_category}"
|
||
ORDER BY 毛利金額 DESC
|
||
LIMIT 100
|
||
"""
|
||
else: # amount
|
||
sql_query = f"""
|
||
SELECT
|
||
"{cols_map.get("pid", "商品ID")}" as 商品ID,
|
||
"{col_name}" as 商品名稱,
|
||
"{col_brand}" as 品牌,
|
||
"{col_vendor}" as 廠商名稱,
|
||
"{col_category}" as 分類名稱,
|
||
SUM(CAST("{col_amount}" AS REAL)) as 銷售金額
|
||
FROM {table_name}
|
||
WHERE "{col_name}" IS NOT NULL {date_filter}
|
||
GROUP BY "{cols_map.get("pid", "商品ID")}", "{col_name}", "{col_brand}", "{col_vendor}", "{col_category}"
|
||
ORDER BY 銷售金額 DESC
|
||
LIMIT 100
|
||
"""
|
||
|
||
# 執行查詢並匯出
|
||
df = pd.read_sql(sql_query, db.engine)
|
||
|
||
if df.empty:
|
||
return "無資料可匯出", 400
|
||
|
||
# 生成 Excel
|
||
output = io.BytesIO()
|
||
with pd.ExcelWriter(output, engine='openpyxl') as writer:
|
||
df.to_excel(writer, index=False, sheet_name='Top排行')
|
||
output.seek(0)
|
||
|
||
# 生成檔案名稱
|
||
type_names = {'revenue': '業績貢獻王', 'margin': '獲利金雞母', 'quantity': '人氣引流款'}
|
||
view_names = {'product': '商品排行', 'category': '分類排行'}
|
||
filename = f"{type_names.get(top_type, '排行')}_{view_names.get(view_type, '')}_{datetime.now(TAIPEI_TZ).strftime('%Y%m%d_%H%M')}.xlsx"
|
||
|
||
sys_log.info(f"[Export] Top Detail: {filename} ({len(df)} 筆)")
|
||
|
||
return send_file(
|
||
output,
|
||
as_attachment=True,
|
||
download_name=filename,
|
||
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
|
||
)
|
||
|
||
except Exception as e:
|
||
sys_log.error(f"[Export] Top Detail Error: {e}")
|
||
return f"匯出失敗: {e}", 500
|
||
|
||
|
||
@sales_bp.route('/api/sales_analysis/yoy_comparison')
|
||
@login_required
|
||
def api_yoy_comparison():
|
||
"""
|
||
API: 年度對比分析 (YoY Comparison)
|
||
|
||
參數:
|
||
year1: 基準年 (例如 2024)
|
||
year2: 對比年 (例如 2025)
|
||
month: 月份 (可選,1-12,不帶則為全年)
|
||
metric: 指標 (revenue/qty/profit)
|
||
|
||
回傳:
|
||
JSON with year1 total, year2 total, growth rate, and monthly breakdown
|
||
"""
|
||
try:
|
||
from datetime import datetime, timedelta, timezone
|
||
TAIPEI_TZ = timezone(timedelta(hours=8))
|
||
|
||
table_name = 'realtime_sales_monthly'
|
||
year1 = request.args.get('year1', '2024')
|
||
year2 = request.args.get('year2', '2025')
|
||
month = request.args.get('month', '') # 可選,1-12
|
||
metric = request.args.get('metric', 'revenue') # revenue/qty/profit
|
||
|
||
db = DatabaseManager()
|
||
|
||
# 欄位名稱
|
||
col_amount = '總業績'
|
||
col_qty = '數量'
|
||
col_cost = '總成本'
|
||
col_date = '日期'
|
||
|
||
# 根據指標決定聚合欄位
|
||
if metric == 'qty':
|
||
agg_sql = f'SUM(CAST("{col_qty}" AS REAL))'
|
||
metric_label = '銷售數量'
|
||
elif metric == 'profit':
|
||
agg_sql = f'SUM(CAST("{col_amount}" AS REAL)) - SUM(CAST("{col_cost}" AS REAL))'
|
||
metric_label = '毛利金額'
|
||
else: # revenue
|
||
agg_sql = f'SUM(CAST("{col_amount}" AS REAL))'
|
||
metric_label = '銷售金額'
|
||
|
||
# 建立年度篩選條件
|
||
# 日期格式為 2025/01/01 或 2025-01-01
|
||
def build_year_filter(year, month_filter=''):
|
||
if month_filter:
|
||
month_str = month_filter.zfill(2)
|
||
return f"""("{col_date}" LIKE '{year}/{month_str}%' OR "{col_date}" LIKE '{year}-{month_str}%')"""
|
||
else:
|
||
return f"""("{col_date}" LIKE '{year}/%' OR "{col_date}" LIKE '{year}-%')"""
|
||
|
||
year1_filter = build_year_filter(year1, month)
|
||
year2_filter = build_year_filter(year2, month)
|
||
|
||
# 查詢年度總計
|
||
sql_year1 = f"""
|
||
SELECT {agg_sql} as total
|
||
FROM {table_name}
|
||
WHERE {year1_filter}
|
||
"""
|
||
sql_year2 = f"""
|
||
SELECT {agg_sql} as total
|
||
FROM {table_name}
|
||
WHERE {year2_filter}
|
||
"""
|
||
|
||
# V-Fix: SQLAlchemy 2.0 需要使用 text() 包裹 SQL 字串
|
||
from sqlalchemy import text
|
||
result_year1 = pd.read_sql(text(sql_year1), db.engine)
|
||
result_year2 = pd.read_sql(text(sql_year2), db.engine)
|
||
|
||
total_year1 = float(result_year1['total'].iloc[0] or 0)
|
||
total_year2 = float(result_year2['total'].iloc[0] or 0)
|
||
|
||
# 計算成長率
|
||
if total_year1 > 0:
|
||
growth_rate = ((total_year2 - total_year1) / total_year1) * 100
|
||
else:
|
||
growth_rate = 0 if total_year2 == 0 else 100
|
||
|
||
# 月度明細 (如果沒有指定月份,則查詢 12 個月的明細)
|
||
monthly_breakdown = []
|
||
if not month:
|
||
for m in range(1, 13):
|
||
m_str = str(m).zfill(2)
|
||
y1_filter = build_year_filter(year1, m_str)
|
||
y2_filter = build_year_filter(year2, m_str)
|
||
|
||
sql_m1 = f"SELECT {agg_sql} as total FROM {table_name} WHERE {y1_filter}"
|
||
sql_m2 = f"SELECT {agg_sql} as total FROM {table_name} WHERE {y2_filter}"
|
||
|
||
# V-Fix: SQLAlchemy 2.0 需要使用 text()
|
||
r1 = pd.read_sql(text(sql_m1), db.engine)
|
||
r2 = pd.read_sql(text(sql_m2), db.engine)
|
||
|
||
v1 = float(r1['total'].iloc[0] or 0)
|
||
v2 = float(r2['total'].iloc[0] or 0)
|
||
|
||
m_growth = ((v2 - v1) / v1 * 100) if v1 > 0 else (0 if v2 == 0 else 100)
|
||
|
||
monthly_breakdown.append({
|
||
'month': m,
|
||
'month_label': f'{m}月',
|
||
'year1_value': v1,
|
||
'year2_value': v2,
|
||
'growth_rate': round(m_growth, 2)
|
||
})
|
||
|
||
response = {
|
||
'year1': {
|
||
'label': f'{year1}年' + (f'{month}月' if month else ''),
|
||
'total': total_year1
|
||
},
|
||
'year2': {
|
||
'label': f'{year2}年' + (f'{month}月' if month else ''),
|
||
'total': total_year2
|
||
},
|
||
'growth_rate': round(growth_rate, 2),
|
||
'metric': metric,
|
||
'metric_label': metric_label,
|
||
'monthly_breakdown': monthly_breakdown
|
||
}
|
||
|
||
sys_log.info(f"[YoY] {year1} vs {year2}: {total_year1:,.0f} -> {total_year2:,.0f} ({growth_rate:+.1f}%)")
|
||
|
||
return jsonify(response)
|
||
|
||
except Exception as e:
|
||
sys_log.error(f"[YoY] Error: {e}")
|
||
traceback.print_exc()
|
||
return jsonify({'error': str(e)}), 500
|