Files
ewoooc/routes/sales_routes.py
2026-06-25 18:05:48 +08:00

3291 lines
151 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
業績分析路由模組
包含:業績分析儀表板、成長分析、各種 API 路由
注意:此模組非常複雜,包含大量的數據處理邏輯
為避免循環依賴,部分函數使用延遲導入
"""
import hashlib
import io
import math
import os
import pickle
import time
import traceback
from datetime import datetime, timezone, timedelta
from flask import Blueprint, request, render_template, jsonify, send_file, redirect, url_for
from auth import login_required
from sqlalchemy import inspect, text
import pandas as pd
import numpy as np
from config import BASE_DIR, DATABASE_TYPE, SYSTEM_VERSION
from database.manager import DatabaseManager
from services.logger_manager import SystemLogger
from services.daily_sales_service import prepare_marketing_summary
from services.cache_manager import (
_SALES_PROCESSED_CACHE,
_SALES_OPTIONS_CACHE,
_SALES_ANALYSIS_RESULT_CACHE,
_SALES_ANALYSIS_PAGE_CACHE_DIR,
set_sales_processed_cache,
)
from utils.text_helpers import get_color_for_string
# 時區設定
TAIPEI_TZ = timezone(timedelta(hours=8))
# Logger
sys_log = SystemLogger("SalesRoutes").get_logger()
# Blueprint 定義
sales_bp = Blueprint('sales', __name__)
_TABLE_DATA_CACHE = {}
_TABLE_DATA_CACHE_TTL = 60
_SALES_PREVIEW_CACHE_TTL = 600
_SALES_OPTIONS_CACHE_TTL = 1800
_SALES_PAGE_CONTEXT_CACHE_TTL = 180
_SALES_PAGE_CONTEXT_CACHE_MAX = 24
_SALES_SHARED_PAGE_CONTEXT_CACHE_TTL = 1800
# ==========================================
# 輔助函數
# ==========================================
# 共用工具改 import 自 utils去重原各 routes 各自定義已移除)
from utils.df_helpers import find_col # noqa: E402, F401
from utils.security import validate_table_name, safe_read_sql # noqa: E402, F401
def _get_timed_cache(store, key, ttl_seconds):
entry = store.get(key)
if not entry:
return None
if time.time() - entry.get('time', 0) >= ttl_seconds:
store.pop(key, None)
return None
return entry.get('data')
def _set_timed_cache(store, key, data, max_entries=64):
if len(store) >= max_entries:
oldest_key = min(store, key=lambda k: store[k].get('time', 0))
store.pop(oldest_key, None)
store[key] = {'data': data, 'time': time.time()}
def _sales_analysis_args_fingerprint(table_name, cache_key, processed_time=None):
args = tuple(
(key, tuple(values))
for key, values in sorted(request.args.lists())
)
raw = repr((table_name, cache_key, processed_time or 0, args)).encode('utf-8')
return hashlib.md5(raw).hexdigest()
def _get_sales_page_context_cache(cache_key):
return _get_timed_cache(
_SALES_ANALYSIS_RESULT_CACHE,
cache_key,
_SALES_PAGE_CONTEXT_CACHE_TTL,
)
def _set_sales_page_context_cache(cache_key, context):
_set_timed_cache(
_SALES_ANALYSIS_RESULT_CACHE,
cache_key,
context,
max_entries=_SALES_PAGE_CONTEXT_CACHE_MAX,
)
def _sales_page_context_cache_file(cache_key):
return _SALES_ANALYSIS_PAGE_CACHE_DIR / f"{cache_key}.pkl"
def _get_sales_shared_page_context_cache(cache_key):
path = _sales_page_context_cache_file(cache_key)
if not path.exists():
return None
try:
if time.time() - path.stat().st_mtime >= _SALES_SHARED_PAGE_CONTEXT_CACHE_TTL:
path.unlink(missing_ok=True)
return None
with open(path, 'rb') as f:
payload = pickle.load(f)
if payload.get('version') != SYSTEM_VERSION:
return None
return payload.get('context')
except Exception as exc:
sys_log.warning(f"[Sales Analysis] 共享頁面快取讀取失敗: {exc}")
return None
def _set_sales_shared_page_context_cache(cache_key, context):
path = _sales_page_context_cache_file(cache_key)
tmp_path = path.with_suffix(f".{os.getpid()}.tmp")
try:
os.makedirs(path.parent, exist_ok=True)
with open(tmp_path, 'wb') as f:
pickle.dump({'version': SYSTEM_VERSION, 'context': context}, f, protocol=pickle.HIGHEST_PROTOCOL)
os.replace(tmp_path, path)
except Exception as exc:
sys_log.warning(f"[Sales Analysis] 共享頁面快取寫入失敗: {exc}")
try:
if tmp_path.exists():
tmp_path.unlink()
except OSError:
pass
def _format_sales_data_range(min_date, max_date):
if not min_date or not max_date:
return ''
try:
if isinstance(min_date, str):
min_date_obj = datetime.strptime(min_date.split()[0], '%Y-%m-%d')
max_date_obj = datetime.strptime(max_date.split()[0], '%Y-%m-%d')
else:
min_date_obj = min_date
max_date_obj = max_date
return f"{min_date_obj.year}{min_date_obj.month}月 ~ {max_date_obj.year}{max_date_obj.month}"
except Exception:
return f"{min_date} ~ {max_date}"
def _fetch_sales_data_range(engine, table_name):
validate_table_name(table_name)
cache_key = f"sales_analysis:data_range:{table_name}"
cached = _get_timed_cache(_SALES_OPTIONS_CACHE, cache_key, _SALES_PREVIEW_CACHE_TTL)
if cached is not None:
return cached
try:
date_query = text(f'SELECT MIN("日期") as min_date, MAX("日期") as max_date FROM "{table_name}"')
with engine.connect() as conn:
result = conn.execute(date_query).fetchone()
db_data_range = _format_sales_data_range(result[0], result[1]) if result else ''
except Exception as e:
sys_log.warning(f"[Sales Analysis] 無法取得資料期間範圍: {e}")
db_data_range = ''
_set_timed_cache(_SALES_OPTIONS_CACHE, cache_key, db_data_range)
return db_data_range
def _preview_sales_filter_options(engine, table_name):
"""首次進入業績分析時的輕量下拉資料,避免每次載入都掃預覽資料。"""
validate_table_name(table_name)
cache_key = f"sales_analysis:preview_options:{table_name}"
cached = _get_timed_cache(_SALES_OPTIONS_CACHE, cache_key, _SALES_PREVIEW_CACHE_TTL)
if cached:
return cached
options = {
'categories': [],
'brands': [],
'vendors': [],
'activities': [],
'payments': [],
'months': [],
}
preview_df = pd.read_sql(f'SELECT * FROM "{table_name}" LIMIT 1000', engine)
if preview_df.empty:
_set_timed_cache(_SALES_OPTIONS_CACHE, cache_key, options)
return options
cols = preview_df.columns.tolist()
def find_preview_col(keywords):
for k in keywords:
for col in cols:
if k in str(col):
return col
return None
col_category = find_preview_col(['館別', '商品館', '分類', 'Category'])
col_brand = find_preview_col(['品牌', 'Brand'])
col_vendor = find_preview_col(['廠商名稱', 'Vendor Name', '廠商', '供應商', 'Vendor', 'Supplier'])
col_activity = find_preview_col(['折扣活動名稱', '折價券活動名稱', '滿額再折扣活動名稱', '活動', 'Activity', 'Campaign'])
col_payment = find_preview_col(['付款', 'Payment', 'Pay'])
col_date_part = find_preview_col(['日期', '交易日期', 'Date', 'Day'])
def clean_values(col):
if not col:
return []
return sorted([
x for x in preview_df[col].dropna().astype(str).unique().tolist()
if x and x.strip()
])
options['categories'] = clean_values(col_category)
options['brands'] = clean_values(col_brand)
options['vendors'] = clean_values(col_vendor)
options['activities'] = clean_values(col_activity)
options['payments'] = clean_values(col_payment)
if col_date_part:
try:
with engine.connect() as conn:
result = conn.execute(text(f"""
SELECT DISTINCT replace(substr("{col_date_part}", 1, 7), '/', '-') as month
FROM "{table_name}"
WHERE "{col_date_part}" IS NOT NULL AND "{col_date_part}" != ''
ORDER BY month
""")).fetchall()
options['months'] = [row[0] for row in result if row[0] and '-' in str(row[0])]
sys_log.info(f"[Sales Analysis] 預覽模式從「{col_date_part}」欄位提取到 {len(options['months'])} 個月份")
except Exception as e:
sys_log.warning(f"[Sales Analysis] 無法從「{col_date_part}」欄位提取月份: {e}")
_set_timed_cache(_SALES_OPTIONS_CACHE, cache_key, options)
return options
def _get_sales_filter_options(engine, table_name, cols_map):
"""完整篩選選項共用快取;資料匯入後 clear_sales_cache() 會一起清掉。"""
validate_table_name(table_name)
col_category = cols_map.get('category')
col_brand = cols_map.get('brand')
col_vendor = cols_map.get('vendor')
col_activity = cols_map.get('activity')
col_payment = cols_map.get('payment')
col_date = cols_map.get('date')
option_cols = (col_category, col_brand, col_vendor, col_activity, col_payment, col_date)
digest = hashlib.md5(repr(option_cols).encode('utf-8')).hexdigest()
cache_key = f"sales_analysis:filter_options:{table_name}:{digest}"
cached = _get_timed_cache(_SALES_OPTIONS_CACHE, cache_key, _SALES_OPTIONS_CACHE_TTL)
if cached:
return cached
options = {
'categories': [],
'brands': [],
'vendors': [],
'activities': [],
'payments': [],
'months': [],
}
def read_distinct(conn, col):
if not col:
return []
sql = f'SELECT DISTINCT "{col}" FROM "{table_name}" WHERE "{col}" IS NOT NULL AND "{col}" <> \'\' ORDER BY "{col}"'
result = conn.execute(text(sql)).fetchall()
return [str(row[0]) for row in result if row[0]]
with engine.connect() as conn:
options['categories'] = read_distinct(conn, col_category)
options['brands'] = read_distinct(conn, col_brand)
options['vendors'] = read_distinct(conn, col_vendor)
options['activities'] = read_distinct(conn, col_activity)
options['payments'] = read_distinct(conn, col_payment)
if col_date:
date_fields = ['日期', '訂單日期', '時間']
for field in date_fields:
try:
result = conn.execute(text(f"""
SELECT DISTINCT replace(substr("{field}", 1, 7), '/', '-') as month
FROM "{table_name}"
WHERE "{field}" IS NOT NULL AND "{field}" != ''
ORDER BY month
""")).fetchall()
months = [row[0] for row in result if row[0] and '-' in str(row[0])]
if months:
options['months'] = months
sys_log.info(f"[Sales Analysis] 從欄位 {field} 提取到 {len(months)} 個月份: {months}")
break
except Exception as ex:
sys_log.warning(f"[Sales Analysis] 從欄位 {field} 提取月份失敗: {ex}")
_set_timed_cache(_SALES_OPTIONS_CACHE, cache_key, options)
return options
def _growth_empty_payload(now_taipei=None):
now_taipei = now_taipei or datetime.now(TAIPEI_TZ)
return (
{
'labels': [],
'revenue': [],
'profit': [],
'orders': [],
'aov': [],
'mom': [],
'yoy': [],
'margin_rate': []
},
{
'ytd_revenue': 0,
'ytd_growth': 0,
'current_year': now_taipei.year,
'recent_aov': 0,
'total_orders': 0
}
)
def _growth_number(value):
try:
number = float(value or 0)
except (TypeError, ValueError):
return 0.0
return number if math.isfinite(number) else 0.0
def _growth_month_iter(start_month, end_month):
year = start_month.year
month = start_month.month
while (year, month) <= (end_month.year, end_month.month):
yield datetime(year, month, 1).date()
month += 1
if month > 12:
year += 1
month = 1
def _build_growth_chart_data(monthly_rows):
monthly_values = {}
for row in monthly_rows:
month_raw = row.get('month_start')
month_dt = pd.to_datetime(month_raw, errors='coerce')
if pd.isna(month_dt):
continue
month_key = month_dt.date().replace(day=1)
monthly_values[month_key] = {
'amount': _growth_number(row.get('amount')),
'profit': _growth_number(row.get('profit')),
'orders': int(_growth_number(row.get('orders') or row.get('volume'))),
}
if not monthly_values:
return None
months = list(_growth_month_iter(min(monthly_values), max(monthly_values)))
revenue = []
profit = []
orders = []
aov = []
margin_rate = []
mom = []
yoy = []
for index, month_key in enumerate(months):
item = monthly_values.get(month_key, {'amount': 0, 'profit': 0, 'orders': 0})
amount = item['amount']
profit_value = item['profit']
order_count = item['orders']
revenue.append(amount)
profit.append(profit_value)
orders.append(order_count)
aov.append(round(amount / order_count, 0) if order_count > 0 else 0)
margin_rate.append(round((profit_value / amount) * 100, 1) if amount > 0 else 0)
prev_amount = revenue[index - 1] if index >= 1 else 0
last_year_amount = revenue[index - 12] if index >= 12 else 0
mom.append(round(((amount - prev_amount) / prev_amount) * 100, 2) if prev_amount > 0 else 0)
yoy.append(round(((amount - last_year_amount) / last_year_amount) * 100, 2) if last_year_amount > 0 else 0)
return {
'labels': [month.strftime('%Y-%m') for month in months],
'revenue': revenue,
'profit': profit,
'orders': orders,
'aov': aov,
'mom': mom,
'yoy': yoy,
'margin_rate': margin_rate
}
def _attach_growth_competitor_intel(engine, chart_data):
"""把 MOMO 低價壓力對齊 growth_analysis 月份序列。"""
if not chart_data:
return chart_data
enriched = dict(chart_data)
try:
from services.competitor_intel_repository import (
fetch_competitor_coverage,
fetch_competitor_monthly_pressure,
)
pressure = fetch_competitor_monthly_pressure(engine, months=max(len(enriched.get('labels') or []), 12))
by_month = {
label: {
'avg_gap_pct': gap,
'risk_count': risk,
'match_count': match_count,
}
for label, gap, risk, match_count in zip(
pressure.get('labels', []),
pressure.get('avg_gap_pct', []),
pressure.get('risk_count', []),
pressure.get('match_count', []),
)
}
labels = enriched.get('labels') or []
enriched['competitor_gap_pct'] = [by_month.get(label, {}).get('avg_gap_pct', 0) for label in labels]
enriched['competitor_risk_count'] = [by_month.get(label, {}).get('risk_count', 0) for label in labels]
enriched['competitor_match_count'] = [by_month.get(label, {}).get('match_count', 0) for label in labels]
enriched['competitor_coverage'] = fetch_competitor_coverage(engine)
except Exception as exc:
sys_log.warning(f"[GrowthAnalysis] PChome 競價情報串接失敗,略過: {exc}")
enriched['competitor_gap_pct'] = [0 for _ in enriched.get('labels', [])]
enriched['competitor_risk_count'] = [0 for _ in enriched.get('labels', [])]
enriched['competitor_match_count'] = [0 for _ in enriched.get('labels', [])]
enriched['competitor_coverage'] = {}
return enriched
def _build_growth_kpi(kpi_row):
if not kpi_row:
return None
max_dt = pd.to_datetime(kpi_row.get('max_date'), errors='coerce')
current_year = int(kpi_row.get('current_year') or (max_dt.year if not pd.isna(max_dt) else datetime.now(TAIPEI_TZ).year))
ytd_revenue = _growth_number(kpi_row.get('ytd_revenue'))
last_ytd_revenue = _growth_number(kpi_row.get('last_ytd_revenue'))
recent_revenue = _growth_number(kpi_row.get('recent_revenue'))
recent_orders = int(_growth_number(kpi_row.get('recent_orders')))
return {
'ytd_revenue': ytd_revenue,
'ytd_growth': ((ytd_revenue - last_ytd_revenue) / last_ytd_revenue) * 100 if last_ytd_revenue > 0 else 0,
'current_year': current_year,
'recent_aov': recent_revenue / recent_orders if recent_orders > 0 else 0,
'total_orders': int(_growth_number(kpi_row.get('total_orders')))
}
def _growth_pg_number_expr(column_name):
return f'COALESCE(NULLIF(regexp_replace("{column_name}"::text, \'[^0-9.-]\', \'\', \'g\'), \'\')::numeric, 0)'
def _fetch_growth_latest_detail_month(engine, table_name='realtime_sales_monthly'):
"""回傳業績明細表最新月份;摘要表落後時需改走明細聚合。"""
try:
if not inspect(engine).has_table(table_name):
return None
table_name = validate_table_name(table_name)
table_ref = f'"{table_name}"'
if engine.dialect.name == 'postgresql':
latest_sql = text(f"""
SELECT date_trunc('month', MAX("日期"::date))::date AS latest_month
FROM {table_ref}
WHERE "日期" IS NOT NULL
""")
else:
latest_sql = text(f"""
SELECT date(MAX("日期"), 'start of month') AS latest_month
FROM {table_ref}
WHERE "日期" IS NOT NULL
""")
with engine.connect() as conn:
latest = conn.execute(latest_sql).scalar()
latest_dt = pd.to_datetime(latest, errors='coerce')
if pd.isna(latest_dt):
return None
return latest_dt.date().replace(day=1)
except Exception as exc:
sys_log.warning(f"[GrowthAnalysis] 無法取得明細最新月份,摘要新鮮度檢查略過: {exc}")
return None
def _get_growth_source_fingerprint(engine, table_name='realtime_sales_monthly'):
"""成長分析快取指紋;匯入新資料後同一 worker 下一次 request 會重算。"""
try:
if not inspect(engine).has_table(table_name):
return None
table_name = validate_table_name(table_name)
if engine.dialect.name == 'postgresql':
from services.cache_service import (
get_growth_source_fingerprint_cache,
set_growth_source_fingerprint_cache,
)
cache_key = f"{engine.url}|{table_name}"
cached_fingerprint = get_growth_source_fingerprint_cache(cache_key)
if cached_fingerprint is not None:
return cached_fingerprint
else:
cache_key = None
table_ref = f'"{table_name}"'
if engine.dialect.name == 'postgresql':
fingerprint_sql = text(f"""
SELECT MAX("日期"::date)::text, COUNT(*)
FROM {table_ref}
WHERE "日期" IS NOT NULL
""")
else:
fingerprint_sql = text(f"""
SELECT CAST(MAX("日期") AS TEXT), COUNT(*)
FROM {table_ref}
WHERE "日期" IS NOT NULL
""")
with engine.connect() as conn:
row = conn.execute(fingerprint_sql).fetchone()
fingerprint = (row[0], row[1]) if row else None
if cache_key is not None:
set_growth_source_fingerprint_cache(cache_key, fingerprint)
return fingerprint
except Exception as exc:
sys_log.warning(f"[GrowthAnalysis] 快取指紋查詢失敗,保守沿用 TTL: {exc}")
return None
def _fetch_growth_payload_summary(engine):
"""優先使用月結摘要表,避免成長頁冷 worker 掃 70+ 萬列原始明細。"""
table_name = 'monthly_summary_analysis'
if not inspect(engine).has_table(table_name):
return None
table_ref = f'"{table_name}"'
if engine.dialect.name == 'postgresql':
summary_sql = text(f"""
SELECT
make_date("year"::int, "month"::int, 1) AS month_start,
SUM(COALESCE(sales_amt_curr, 0)) AS amount,
SUM(COALESCE(profit_amt_curr, 0)) AS profit,
SUM(COALESCE(sales_vol_curr, 0)) AS volume
FROM {table_ref}
WHERE "year" IS NOT NULL
AND "month" IS NOT NULL
GROUP BY 1
ORDER BY 1
""")
else:
summary_sql = text(f"""
SELECT
printf('%04d-%02d-01', "year", "month") AS month_start,
SUM(COALESCE(sales_amt_curr, 0)) AS amount,
SUM(COALESCE(profit_amt_curr, 0)) AS profit,
SUM(COALESCE(sales_vol_curr, 0)) AS volume
FROM {table_ref}
WHERE "year" IS NOT NULL
AND "month" IS NOT NULL
GROUP BY 1
ORDER BY 1
""")
with engine.connect() as conn:
rows = conn.execute(summary_sql).mappings().all()
chart_data = _build_growth_chart_data(rows)
if not chart_data or not chart_data['labels']:
return None
latest_label = chart_data['labels'][-1]
summary_latest_month = pd.to_datetime(f"{latest_label}-01", errors='coerce')
detail_latest_month = _fetch_growth_latest_detail_month(engine)
if detail_latest_month and not pd.isna(summary_latest_month):
if summary_latest_month.date().replace(day=1) < detail_latest_month:
sys_log.info(
"[GrowthAnalysis] 月結摘要落後明細資料,改走 realtime_sales_monthly 聚合 | "
f"summary={latest_label}, detail={detail_latest_month.strftime('%Y-%m')}"
)
return None
current_year = int(latest_label[:4])
latest_month = int(latest_label[5:7])
last_year = current_year - 1
ytd_revenue = 0.0
last_ytd_revenue = 0.0
for label, revenue in zip(chart_data['labels'], chart_data['revenue']):
year = int(label[:4])
month = int(label[5:7])
if year == current_year and month <= latest_month:
ytd_revenue += _growth_number(revenue)
elif year == last_year and month <= latest_month:
last_ytd_revenue += _growth_number(revenue)
latest_revenue = _growth_number(chart_data['revenue'][-1])
latest_volume = int(_growth_number(chart_data['orders'][-1]))
kpi = {
'ytd_revenue': ytd_revenue,
'ytd_growth': ((ytd_revenue - last_ytd_revenue) / last_ytd_revenue) * 100 if last_ytd_revenue > 0 else 0,
'current_year': current_year,
'recent_aov': latest_revenue / latest_volume if latest_volume > 0 else 0,
'total_orders': int(sum(_growth_number(value) for value in chart_data['orders']))
}
return chart_data, kpi
def _fetch_growth_payload_sql(engine, table_name):
"""用 DB 聚合取代全表 pandas 載入,降低成長分析冷啟動 TTFB。"""
table_name = validate_table_name(table_name)
table_ref = f'"{table_name}"'
if engine.dialect.name == 'postgresql':
date_expr = '"日期"::date'
amount_expr = _growth_pg_number_expr('總業績')
cost_expr = _growth_pg_number_expr('總成本')
volume_expr = _growth_pg_number_expr('數量')
monthly_sql = text(f"""
SELECT
date_trunc('month', {date_expr})::date AS month_start,
SUM({amount_expr}) AS amount,
SUM({amount_expr} - {cost_expr}) AS profit,
SUM({volume_expr}) AS orders
FROM {table_ref}
WHERE "日期" IS NOT NULL
GROUP BY 1
ORDER BY 1
""")
kpi_sql = text(f"""
WITH bounds AS (
SELECT MAX({date_expr}) AS max_date
FROM {table_ref}
WHERE "日期" IS NOT NULL
)
SELECT
EXTRACT(YEAR FROM b.max_date)::int AS current_year,
b.max_date::text AS max_date,
COALESCE(SUM(CASE
WHEN {date_expr} >= date_trunc('year', b.max_date)::date
AND {date_expr} <= b.max_date
THEN {amount_expr} ELSE 0 END), 0) AS ytd_revenue,
COALESCE(SUM(CASE
WHEN {date_expr} >= (date_trunc('year', b.max_date) - interval '1 year')::date
AND {date_expr} <= (b.max_date - interval '1 year')::date
THEN {amount_expr} ELSE 0 END), 0) AS last_ytd_revenue,
COALESCE(SUM(CASE
WHEN {date_expr} >= (b.max_date - interval '30 days')::date
AND {date_expr} <= b.max_date
THEN {amount_expr} ELSE 0 END), 0) AS recent_revenue,
COALESCE(SUM(CASE
WHEN {date_expr} >= (b.max_date - interval '30 days')::date
AND {date_expr} <= b.max_date
THEN {volume_expr} ELSE 0 END), 0) AS recent_orders,
COALESCE(SUM({volume_expr}), 0) AS total_orders
FROM {table_ref}
CROSS JOIN bounds b
WHERE "日期" IS NOT NULL
GROUP BY b.max_date
""")
else:
monthly_sql = text(f"""
SELECT
date("日期", 'start of month') AS month_start,
SUM(COALESCE(CAST("總業績" AS REAL), 0)) AS amount,
SUM(COALESCE(CAST("總業績" AS REAL), 0) - COALESCE(CAST("總成本" AS REAL), 0)) AS profit,
SUM(COALESCE(CAST("數量" AS REAL), 0)) AS orders
FROM {table_ref}
WHERE "日期" IS NOT NULL
GROUP BY 1
ORDER BY 1
""")
kpi_sql = text(f"""
WITH bounds AS (
SELECT date(MAX("日期")) AS max_date
FROM {table_ref}
WHERE "日期" IS NOT NULL
)
SELECT
CAST(strftime('%Y', b.max_date) AS INTEGER) AS current_year,
b.max_date AS max_date,
COALESCE(SUM(CASE
WHEN date("日期") >= date(b.max_date, 'start of year')
AND date("日期") <= b.max_date
THEN COALESCE(CAST("總業績" AS REAL), 0) ELSE 0 END), 0) AS ytd_revenue,
COALESCE(SUM(CASE
WHEN date("日期") >= date(b.max_date, 'start of year', '-1 year')
AND date("日期") <= date(b.max_date, '-1 year')
THEN COALESCE(CAST("總業績" AS REAL), 0) ELSE 0 END), 0) AS last_ytd_revenue,
COALESCE(SUM(CASE
WHEN date("日期") >= date(b.max_date, '-30 days')
AND date("日期") <= b.max_date
THEN COALESCE(CAST("總業績" AS REAL), 0) ELSE 0 END), 0) AS recent_revenue,
COALESCE(SUM(CASE
WHEN date("日期") >= date(b.max_date, '-30 days')
AND date("日期") <= b.max_date
THEN COALESCE(CAST("數量" AS REAL), 0) ELSE 0 END), 0) AS recent_orders,
COALESCE(SUM(COALESCE(CAST("數量" AS REAL), 0)), 0) AS total_orders
FROM {table_ref}
CROSS JOIN bounds b
WHERE "日期" IS NOT NULL
GROUP BY b.max_date
""")
with engine.connect() as conn:
monthly_rows = conn.execute(monthly_sql).mappings().all()
kpi_row = conn.execute(kpi_sql).mappings().first()
chart_data = _build_growth_chart_data(monthly_rows)
kpi = _build_growth_kpi(kpi_row)
if not chart_data or not kpi:
return None
return chart_data, kpi
def _fetch_growth_payload_pandas(engine, table_name):
req_cols = ['日期', '總業績', '訂單編號', '總成本', '數量']
df = safe_read_sql(table_name, columns=req_cols, engine=engine)
if df.empty:
return None
df['dt'] = pd.to_datetime(df['日期'], errors='coerce')
df = df.dropna(subset=['dt'])
if df.empty:
return None
df['amount'] = pd.to_numeric(df['總業績'], errors='coerce').fillna(0)
df['cost'] = pd.to_numeric(df['總成本'], errors='coerce').fillna(0)
df['volume'] = pd.to_numeric(df['數量'], errors='coerce').fillna(0)
df['profit'] = df['amount'] - df['cost']
monthly_stats = df.set_index('dt').resample('MS').agg({
'amount': 'sum',
'profit': 'sum',
'volume': 'sum'
}).rename(columns={'volume': 'orders'})
monthly_rows = [
{
'month_start': month_start,
'amount': row['amount'],
'profit': row['profit'],
'orders': row['orders'],
}
for month_start, row in monthly_stats.iterrows()
]
chart_data = _build_growth_chart_data(monthly_rows)
current_year = df['dt'].max().year
last_year = current_year - 1
ytd_mask = df['dt'].dt.year == current_year
last_ytd_mask = (df['dt'].dt.year == last_year) & (df['dt'].dt.dayofyear <= df['dt'].max().dayofyear)
last_month_mask = df['dt'] >= (df['dt'].max() - pd.Timedelta(days=30))
ytd_revenue = float(df.loc[ytd_mask, 'amount'].sum())
last_ytd_revenue = float(df.loc[last_ytd_mask, 'amount'].sum())
recent_revenue = float(df.loc[last_month_mask, 'amount'].sum())
recent_orders = int(df.loc[last_month_mask, 'volume'].sum())
kpi = {
'ytd_revenue': ytd_revenue,
'ytd_growth': ((ytd_revenue - last_ytd_revenue) / last_ytd_revenue) * 100 if last_ytd_revenue > 0 else 0,
'current_year': current_year,
'recent_aov': recent_revenue / recent_orders if recent_orders > 0 else 0,
'total_orders': int(monthly_stats['orders'].sum())
}
if not chart_data:
return None
return chart_data, kpi
def _get_filtered_sales_data(cache_key):
"""
🚩 共用函式:從快取讀取資料並根據 request.args 進行篩選
回傳: (target_df, cols_map, error_message)
參數: cache_key - 快取鍵值 (例如: "realtime_sales_monthly_3m")
"""
db = DatabaseManager()
# 1. 檢查資料表與快取
df = None
cols_map = {}
if cache_key in _SALES_PROCESSED_CACHE:
cache_data = _SALES_PROCESSED_CACHE[cache_key]
df = cache_data['df']
cols_map = cache_data['cols']
else:
# 快取不存在時,直接回傳錯誤讓呼叫端顯示 spinner 導回 sales_analysis
# 不在此發起全表 DB 查詢748k 行會 hang Gunicorn worker
sys_log.warning(f"[Sales Analysis] ⚠️ 快取不存在 ({cache_key}),回傳錯誤讓 UI 導回 sales_analysis")
return None, {}, f"快取未就緒,請先從業績分析主頁載入資料 (cache_key={cache_key})"
if False: # 保留舊冷快取重載邏輯(已停用,避免全表掃描 hang
sys_log.warning(f"[Sales Analysis] ⚠️ 快取不存在 ({cache_key}),試圖重新從資料庫載入...")
try:
# V-Fix: 從 cache_key 提取 table_name
# 格式: realtime_sales_monthly_3m 或 realtime_sales_monthly_custom_2025-01-01_2025-01-31
if "_custom_" in cache_key:
table_name = cache_key.split('_custom_')[0] # realtime_sales_monthly
else:
# 移除最後的 _Xm 部分
parts = cache_key.rsplit('_', 1)
table_name = parts[0] if len(parts) > 1 else 'realtime_sales_monthly'
# 判斷是自訂區間還是標配區間
if "_custom_" in cache_key:
# 格式: realtime_sales_monthly_custom_2025-01-01_2025-01-31
parts = cache_key.split('_custom_')
dates = parts[1].split('_')
start_d, end_d = dates[0], dates[1]
# 呼叫資料庫讀取 (不傳入 view, 會自動處理欄位映射)
result_df, result_cols = db.get_sales_data(table_name=table_name, start_date=start_d, end_date=end_d)
else:
# 格式: realtime_sales_monthly_1mmonths=0 表示全時段但上限 12 個月避免全表掃描 hang
months = int(cache_key.split('_')[-1].replace('m', '') or '12')
if months == 0:
months = 12
result_df, result_cols = db.get_sales_data(table_name=table_name, months=months)
if result_df is not None and not result_df.empty:
# V-Fix (2026-01-23): 補回所有日期維度欄位供後續篩選 (_dow, _hour, _month_str)
if '日期' in result_df.columns:
# 先轉換為 datetime
result_df['_parsed_date'] = pd.to_datetime(result_df['日期'], errors='coerce')
result_df['_month_str'] = result_df['_parsed_date'].dt.strftime('%Y-%m')
result_df['_dow'] = result_df['_parsed_date'].dt.dayofweek
# 小時需要從「時間」欄位提取
if '時間' in result_df.columns:
result_df['_hour'] = pd.to_datetime(result_df['時間'], format='%H:%M:%S', errors='coerce').dt.hour
else:
result_df['_hour'] = 0 # 如果沒有時間欄位,預設為 0
# 清理臨時欄位
result_df.drop(columns=['_parsed_date'], inplace=True, errors='ignore')
# 自動存入快取
_SALES_PROCESSED_CACHE[cache_key] = {'df': result_df, 'cols': result_cols, 'time': time.time()}
df = result_df
cols_map = result_cols
sys_log.info(f"[Sales Analysis] ✅ 快取成功自動重載 | 筆數: {len(df)}")
else:
return None, None, "資料庫無可用資料,請確認匯入狀態"
except Exception as ex:
sys_log.error(f"[Sales Analysis] 🚨 自動重載失敗: {ex}")
return None, None, f"快取失效且無法重載: {ex}"
# 恢復欄位變數
col_name = cols_map.get('name')
col_category = cols_map.get('category')
col_brand = cols_map.get('brand')
col_vendor = cols_map.get('vendor')
col_activity = cols_map.get('activity')
col_payment = cols_map.get('payment')
col_price = cols_map.get('price')
col_date = cols_map.get('date')
col_return_qty = cols_map.get('return_qty') # V-New: 取得退貨欄位
# 2. 取得篩選參數
selected_category = request.args.get('category', 'all')
selected_brand = request.args.get('brand', 'all')
selected_vendor = request.args.get('vendor', 'all')
selected_activity = request.args.get('activity', 'all')
selected_payment = request.args.get('payment', 'all')
selected_dow = request.args.get('dow', 'all')
selected_hour = request.args.get('hour', 'all')
selected_month = request.args.get('month', 'all')
keyword = request.args.get('keyword', '').strip()
min_price = request.args.get('min_price', '')
max_price = request.args.get('max_price', '')
min_margin = request.args.get('min_margin', '')
max_margin = request.args.get('max_margin', '')
# 3. 執行篩選
target_df = df
# Top N 分類處理 (用於 '其他' 篩選)
TOP_N_CATS = 12
top_cats_names = []
if col_category:
# 注意:這裡為了效能,簡單重算一次 Top N或可考慮也快取起來
cat_group_all = df.groupby(col_category)[cols_map.get('amount')].sum().sort_values(ascending=False)
if len(cat_group_all) > TOP_N_CATS:
top_cats_names = cat_group_all.head(TOP_N_CATS).index.tolist()
if selected_category != 'all' and col_category:
if selected_category == '其他' and top_cats_names:
target_df = target_df[~target_df[col_category].isin(top_cats_names)]
else:
target_df = target_df[target_df[col_category] == selected_category]
if selected_brand != 'all' and col_brand: target_df = target_df[target_df[col_brand] == selected_brand]
if selected_vendor != 'all' and col_vendor: target_df = target_df[target_df[col_vendor] == selected_vendor]
if selected_activity != 'all' and col_activity: target_df = target_df[target_df[col_activity] == selected_activity]
if selected_payment != 'all' and col_payment: target_df = target_df[target_df[col_payment] == selected_payment]
if selected_dow != 'all' and col_date: target_df = target_df[target_df['_dow'] == int(selected_dow)]
if selected_hour != 'all' and col_date: target_df = target_df[target_df['_hour'] == int(selected_hour)]
if selected_month != 'all' and col_date: target_df = target_df[target_df['_month_str'] == selected_month]
if keyword: target_df = target_df[target_df[col_name].astype(str).str.contains(keyword, case=False, na=False)]
if col_price:
if min_price: target_df = target_df[target_df[col_price] >= float(min_price)]
if max_price: target_df = target_df[target_df[col_price] <= float(max_price)]
if min_margin: target_df = target_df[target_df['calculated_margin_rate'] >= float(min_margin)]
if max_margin: target_df = target_df[target_df['calculated_margin_rate'] <= float(max_margin)]
return target_df, cols_map, None
# ==========================================
# 頁面路由
# ==========================================
@sales_bp.route('/sales_analysis')
@login_required
def sales_analysis():
"""業績分析報表頁面"""
try:
db = DatabaseManager()
table_name = 'realtime_sales_monthly'
# 1. 檢查資料表是否存在
inspector = inspect(db.engine)
if not inspector.has_table(table_name):
return render_template('sales_analysis.html',
error="尚未匯入「即時業績(全月)」資料,請先至設定頁面匯入 Excel。",
table_name=table_name,
selected_metric='amount',
no_filter=False,
data_range_months=0,
start_date='',
end_date='',
total_records=0,
active_page='sales',
db_data_range='')
# V-Opt: 資料期間在同一批匯入後穩定,避免每次首屏都查 MIN/MAX。
db_data_range = _fetch_sales_data_range(db.engine, table_name)
# V-New: 取得篩選參數
data_range_param = request.args.get('data_range', '') # 不再設預設值
start_date = request.args.get('start_date', '')
end_date = request.args.get('end_date', '')
# V-New: 按需載入 - 如果沒有任何篩選條件,顯示引導頁面
if not data_range_param and not start_date and not end_date:
sys_log.info("[Sales Analysis] 👋 首次進入頁面,等待用戶選擇篩選條件")
preview_cache_key = "sales_analysis:page_context:" + _sales_analysis_args_fingerprint(
table_name,
'preview',
SYSTEM_VERSION,
)
cached_preview_context = (
_get_sales_page_context_cache(preview_cache_key)
or _get_sales_shared_page_context_cache(preview_cache_key)
)
if cached_preview_context:
_set_sales_page_context_cache(preview_cache_key, cached_preview_context)
return render_template('sales_analysis.html', **cached_preview_context)
preview_options = _preview_sales_filter_options(db.engine, table_name)
preview_categories = preview_options['categories']
preview_brands = preview_options['brands']
preview_vendors = preview_options['vendors']
preview_activities = preview_options['activities']
preview_payments = preview_options['payments']
preview_months = preview_options['months']
# 傳遞必要的變數以避免模板錯誤
selected_metric = request.args.get('metric', 'amount')
# 建立空的數據結構
empty_data = {'labels': [], 'chart_values': [], 'values': [], 'metric_label': ''}
preview_context = {
'no_filter': True,
'table_name': table_name,
'selected_metric': selected_metric,
'total_records': 0,
'items': [],
'kpi': {'revenue': 0, 'qty': 0, 'count': 0, 'cost': 0, 'gross_margin': 0, 'gross_margin_rate': 0, 'avg_price': 0},
'insights': {},
'abc_stats': {},
'vendor_stats': [],
'seasonality_data': {'datasets': [], 'yLabels': [], 'xLabels': []},
'bar_data': empty_data,
'cat_data': empty_data,
'category_data': empty_data,
'price_dist_data': empty_data,
'scatter_data': [],
'bcg_data': [],
'dow_data': empty_data,
'hourly_data': empty_data,
'monthly_data': empty_data,
'weekly_data': empty_data,
'heatmap_data': [],
'treemap_data': [],
'cols': {'name': True, 'amount': True, 'qty': True, 'cat': True, 'date': True,
'cost': True, 'profit': True, 'vendor': True, 'brand': True,
'return_qty': True, 'pid': True},
'all_categories': preview_categories,
'all_brands': preview_brands,
'all_vendors': preview_vendors,
'all_activities': preview_activities,
'all_payments': preview_payments,
'all_months': preview_months,
'selected_category': 'all',
'selected_brand': 'all',
'selected_vendor': 'all',
'selected_activity': 'all',
'selected_payment': 'all',
'selected_dow': 'all',
'selected_hour': 'all',
'selected_month': 'all',
'keyword': '',
'min_price': '',
'max_price': '',
'min_margin': '',
'max_margin': '',
'data_range_months': 0,
'start_date': '',
'end_date': '',
'db_data_range': db_data_range,
'active_page': 'sales',
'marketing_data': None,
}
_set_sales_page_context_cache(preview_cache_key, preview_context)
_set_sales_shared_page_context_cache(preview_cache_key, preview_context)
return render_template('sales_analysis.html', **preview_context)
# 解析 data_range_months有篩選時才處理
data_range_months = int(data_range_param or '0')
# V-New: 如果有自訂日期區間,則優先使用
if start_date or end_date:
cache_key = f"{table_name}_custom_{start_date}_{end_date}"
else:
cache_key = f"{table_name}_{data_range_months}m"
page_cache_key = "sales_analysis:page_context:" + _sales_analysis_args_fingerprint(
table_name,
cache_key,
SYSTEM_VERSION,
)
cached_context = (
_get_sales_page_context_cache(page_cache_key)
or _get_sales_shared_page_context_cache(page_cache_key)
)
if cached_context:
_set_sales_page_context_cache(page_cache_key, cached_context)
return render_template('sales_analysis.html', **cached_context)
# 2. 讀取與處理資料 (V-Opt: 使用二級快取機制 Raw -> Processed)
df = None
cols_map = {}
# A. 優先檢查是否已有處理好的快取 (最快)
if cache_key in _SALES_PROCESSED_CACHE:
cache_data = _SALES_PROCESSED_CACHE[cache_key]
df = cache_data['df']
cols_map = cache_data['cols']
# 恢復欄位變數
col_name = cols_map.get('name')
col_date = cols_map.get('date')
col_amount = cols_map.get('amount')
col_qty = cols_map.get('qty')
col_category = cols_map.get('category')
col_brand = cols_map.get('brand')
col_vendor = cols_map.get('vendor')
col_activity = cols_map.get('activity')
col_payment = cols_map.get('payment')
col_pid = cols_map.get('pid') # V-New: 取得 PID 欄位
col_price = cols_map.get('price')
col_cost = cols_map.get('cost')
col_profit = cols_map.get('profit')
col_return_qty = cols_map.get('return_qty')
cached_pie_data = cache_data.get('pie_data', {'labels': [], 'chart_values': []}) # V-Opt: 讀取圓餅圖快取
else:
# B. 若無處理後快取,則從 Raw Cache 或 DB 讀取並處理
# V-Opt: 加入日期範圍篩選以減少記憶體使用
# (data_range_months 已在上方定義)
# 先讀取小樣本以識別日期欄位
sample_df = pd.read_sql(f"SELECT * FROM {table_name} LIMIT 100", db.engine)
if sample_df.empty:
return render_template('sales_analysis.html',
error="資料表為空,請重新匯入。",
table_name=table_name,
selected_metric=request.args.get('metric', 'amount'),
no_filter=False,
data_range_months=data_range_months,
start_date=start_date,
end_date=end_date,
total_records=0,
db_data_range=db_data_range,
active_page='sales',
marketing_data=None)
# 自動識別日期欄位V-Fix: 優先匹配「日期」,因為「訂單日期」可能是固定文字)
sample_cols = sample_df.columns.tolist()
date_col_name = None
for col in sample_cols:
if any(keyword in str(col) for keyword in ['日期', '交易日期', 'Date', '訂單時間', '成立時間', '下單時間', '購買時間', '時間', 'Time', 'Created']):
date_col_name = col
break
# 根據是否有日期欄位決定查詢方式
if date_col_name:
from datetime import datetime, timedelta, timezone
TAIPEI_TZ = timezone(timedelta(hours=8))
# V-New: 優先處理自訂日期區間
if start_date or end_date:
# V-Fix: 處理日期格式轉換 (2025-01-01 -> 2025/01/01)
start_date_slash = start_date.replace('-', '/') if start_date else ''
end_date_slash = end_date.replace('-', '/') if end_date else ''
# 有自訂日期區間 - 使用 BETWEEN 或單邊範圍
if start_date and end_date:
sql_query = f"SELECT * FROM {table_name} WHERE \"{date_col_name}\" BETWEEN '{start_date_slash}' AND '{end_date_slash}'"
sys_log.info(f"[Sales Analysis] 📅 使用自訂日期範圍: {start_date} ~ {end_date} (DB格式: {start_date_slash} ~ {end_date_slash})")
elif start_date:
sql_query = f"SELECT * FROM {table_name} WHERE \"{date_col_name}\" >= '{start_date_slash}'"
sys_log.info(f"[Sales Analysis] 📅 使用自訂開始日期: >= {start_date} (DB格式: {start_date_slash})")
else: # only end_date
sql_query = f"SELECT * FROM {table_name} WHERE \"{date_col_name}\" <= '{end_date_slash}'"
sys_log.info(f"[Sales Analysis] 📅 使用自訂結束日期: <= {end_date} (DB格式: {end_date_slash})")
elif data_range_months > 0:
# 使用相對日期範圍最近N個月
# V-Fix: 使用斜線格式以匹配資料庫格式
cutoff_date = (datetime.now(TAIPEI_TZ) - timedelta(days=data_range_months * 30)).strftime('%Y/%m/%d')
sql_query = f"SELECT * FROM {table_name} WHERE \"{date_col_name}\" >= '{cutoff_date}'"
sys_log.info(f"[Sales Analysis] 📊 使用日期範圍篩選: 最近 {data_range_months} 個月 (>= {cutoff_date})")
else:
# data_range_months == 0載入全部資料
sql_query = f"SELECT * FROM {table_name}"
sys_log.info(f"[Sales Analysis] 📊 載入全部資料(用戶選擇)")
else:
# 無日期欄位 - 載入全部
sql_query = f"SELECT * FROM {table_name}"
sys_log.info(f"[Sales Analysis] ⚠️ 未找到日期欄位,載入全部資料")
# V-Opt (2026-01-23): 優先使用 PostgreSQL 聚合視圖 (mv_sales_summary)
# 聚合視圖已預先計算:資料量 -20%, 大小 -73%, 欄位類型已轉換
from sqlalchemy import text as sql_text
from config import DATABASE_TYPE
use_materialized_view = False
if DATABASE_TYPE == 'postgresql':
# 檢查聚合視圖是否存在
try:
with db.engine.connect() as conn:
check_mv = conn.execute(sql_text(
"SELECT EXISTS (SELECT 1 FROM pg_matviews WHERE matviewname = 'mv_sales_summary')"
)).fetchone()
use_materialized_view = check_mv[0] if check_mv else False
except:
use_materialized_view = False
if use_materialized_view:
# 使用聚合視圖 - 欄位已標準化為英文
sys_log.info(f"[Sales Analysis] 📊 使用 PostgreSQL 聚合視圖 (mv_sales_summary)")
# 構建日期篩選條件
mv_where = ""
if start_date or end_date:
if start_date and end_date:
mv_where = f"WHERE sale_date BETWEEN '{start_date}' AND '{end_date}'"
elif start_date:
mv_where = f"WHERE sale_date >= '{start_date}'"
else:
mv_where = f"WHERE sale_date <= '{end_date}'"
elif data_range_months > 0:
from datetime import datetime, timedelta, timezone
TAIPEI_TZ = timezone(timedelta(hours=8))
cutoff = (datetime.now(TAIPEI_TZ) - timedelta(days=data_range_months * 30)).strftime('%Y-%m-%d')
mv_where = f"WHERE sale_date >= '{cutoff}'"
mv_query = f"""
SELECT
sale_date as "日期",
product_id as "商品ID",
product_name as "商品名稱",
category as "商品館",
brand as "品牌",
vendor_name as "廠商名稱",
payment as "付款",
total_revenue as "總業績",
total_qty as "數量",
total_cost as "總成本",
order_count
FROM mv_sales_summary
{mv_where}
"""
df = pd.read_sql(mv_query, db.engine)
sys_log.info(f"[Sales Analysis] 📊 聚合視圖載入完成: {len(df):,} 筆記錄")
else:
# 原始邏輯:使用原始表
sys_log.info(f"[Sales Analysis] 📊 使用原始表載入...")
df = pd.read_sql(sql_query, db.engine)
sys_log.info(f"[Sales Analysis] 📊 載入完成: {len(df):,} 筆記錄")
# 聚合模式標記
is_aggregated_mode = use_materialized_view
# V-Opt: 不再快取完整 DataFrame 到 _SALES_DF_CACHE (避免記憶體累積)
# 改用輕量級處理後快取 (_SALES_PROCESSED_CACHE)
if df.empty:
return render_template('sales_analysis.html',
error="資料表為空,請重新匯入。",
table_name=table_name,
selected_metric=request.args.get('metric', 'amount'),
no_filter=False,
data_range_months=data_range_months,
start_date=start_date,
end_date=end_date,
total_records=0,
db_data_range=db_data_range,
active_page='sales',
marketing_data=None)
# 3. 自動識別關鍵欄位 (模糊比對)
cols = df.columns.tolist()
def find_col(keywords):
# V-Opt: 改為優先遍歷關鍵字,確保優先匹配更精確的名稱 (例如 '廠商名稱' 優於 '廠商')
for k in keywords:
for col in cols:
if k in str(col): return col
return None
col_name = find_col(['商品名稱', '品名', 'Name', 'Product'])
col_pid = find_col(['商品ID', 'Product ID', 'ID', 'i_code', 'Item Code']) # V-New: 偵測商品ID欄位
# V-Fix: 優先匹配「日期」欄位(「訂單日期」是固定文字,不是實際日期)
col_date_part = find_col(['日期', '交易日期', 'Date', 'Day'])
col_time_part = find_col(['訂單時間', '成立時間', '下單時間', '購買時間', '時間', 'Time', 'Created'])
col_brand = find_col(['品牌', 'Brand']) # V-New: 品牌欄位
col_vendor = find_col(['廠商名稱', 'Vendor Name', '廠商', '供應商', 'Vendor', 'Supplier']) # V-Opt: 優先抓取名稱
col_activity = find_col(['活動', '折扣', 'Activity', 'Campaign', 'Promotion', '專案']) # V-New: 活動欄位
col_payment = find_col(['付款方式', 'Payment', 'Pay']) # V-New: 付款方式欄位
col_price = find_col(['單價', 'Price', '價格', 'Avg Price']) # V-New: 嘗試尋找單價欄位
col_cost = find_col(['成本', 'Cost', '進價', 'Cost Price', 'Wholesale']) # V-New: 成本欄位
col_profit = find_col(['毛利', 'Profit', '利潤']) # V-New: 直接尋找毛利欄位 (若有)
col_return_qty = find_col(['退貨數量', 'Return Qty', '退貨']) # V-New: 退貨欄位
col_amount = find_col(['銷售金額', '業績', '金額', 'Amount', 'Sales', 'Total'])
col_qty = find_col(['銷售數量', '銷量', '數量', 'Qty', 'Quantity'])
col_category = find_col(['館別', '分類', 'Category'])
if not col_name or not col_amount:
return render_template('sales_analysis.html',
error=f"無法自動識別關鍵欄位 (需包含 '名稱''金額')。偵測到的欄位: {cols}",
table_name=table_name,
selected_metric=request.args.get('metric', 'amount'),
no_filter=False,
data_range_months=data_range_months,
start_date=start_date,
end_date=end_date,
total_records=0,
db_data_range=db_data_range,
active_page='sales',
marketing_data=None)
# 4. 資料處理 (Heavy Lifting - 只在快取建立時執行一次)
# 確保金額與數量是數字
df[col_amount] = pd.to_numeric(df[col_amount], errors='coerce').fillna(0)
if col_qty:
df[col_qty] = pd.to_numeric(df[col_qty], errors='coerce').fillna(0)
if col_cost:
df[col_cost] = pd.to_numeric(df[col_cost], errors='coerce').fillna(0)
if col_profit:
df[col_profit] = pd.to_numeric(df[col_profit], errors='coerce').fillna(0)
if col_return_qty:
df[col_return_qty] = pd.to_numeric(df[col_return_qty], errors='coerce').fillna(0)
# V-Fix: 智慧日期時間合併邏輯 (聚合模式下跳過)
col_date = None
if not is_aggregated_mode:
if col_date_part and col_time_part:
# 兩者都有,嘗試合併
try:
df['combined_dt'] = pd.to_datetime(df[col_date_part].astype(str) + ' ' + df[col_time_part].astype(str), errors='coerce')
col_date = 'combined_dt'
except:
# 合併失敗,退回使用時間欄位 (假設包含日期) 或日期欄位
col_date = col_time_part or col_date_part
elif col_time_part:
# 只有時間欄位 (可能包含日期)
df[col_time_part] = pd.to_datetime(df[col_time_part], errors='coerce')
col_date = col_time_part
elif col_date_part:
# 只有日期欄位
df[col_date_part] = pd.to_datetime(df[col_date_part], errors='coerce')
col_date = col_date_part
# V-New: 若無明確單價欄位,則自動計算 (金額 / 數量)
if not col_price and col_amount and col_qty:
col_price = 'calculated_price'
# V-Opt: 使用 numpy 向量化運算加速 (取代 apply)
df[col_price] = np.where(df[col_qty] > 0, df[col_amount] / df[col_qty], 0)
if col_price:
df[col_price] = pd.to_numeric(df[col_price], errors='coerce').fillna(0)
# V-New: 預先計算毛利率 (Margin Rate) 用於篩選
# 邏輯: (毛利 / 金額) * 100
col_margin_rate = 'calculated_margin_rate'
with np.errstate(divide='ignore', invalid='ignore'):
if col_profit:
df[col_margin_rate] = (df[col_profit] / df[col_amount]) * 100
elif col_cost:
df[col_margin_rate] = ((df[col_amount] - df[col_cost]) / df[col_amount]) * 100
else:
df[col_margin_rate] = 0.0
# 處理無限大與 NaN (轉為 0)
df[col_margin_rate] = df[col_margin_rate].replace([np.inf, -np.inf, np.nan], 0)
# === V-Opt: 效能優化預計算 (V9.98) ===
# 1. 日期維度 (加速篩選與聚合,避免重複呼叫 .dt 存取器)
# 聚合模式下跳過日期維度計算
if col_date and not is_aggregated_mode:
df['_dow'] = df[col_date].dt.dayofweek
df['_hour'] = df[col_date].dt.hour
df['_week'] = df[col_date].dt.strftime('%G-W%V')
df['_month_str'] = df[col_date].dt.strftime('%Y-%m') # V-New: 月份維度 (YYYY-MM)
# 2. 毛利額 (加速 Top 3 分析,避免 runtime 計算)
if col_profit:
df['calculated_profit'] = df[col_profit]
elif col_cost:
df['calculated_profit'] = df[col_amount] - df[col_cost]
else:
df['calculated_profit'] = 0.0
# 3. 全站分類圓餅圖 (已移至下方使用 target_df 計算)
# 建立/更新處理後快取
cache_entry = {
'df': df,
'cols': {
'name': col_name, 'date': col_date, 'amount': col_amount,
'qty': col_qty, 'category': col_category, 'brand': col_brand,
'vendor': col_vendor, 'activity': col_activity, 'payment': col_payment,
'price': col_price, 'cost': col_cost, 'profit': col_profit,
'return_qty': col_return_qty,
'pid': col_pid # V-New: 儲存商品ID欄位
},
'pid': col_pid, # V-New: 儲存商品ID欄位
'time': time.time()
}
set_sales_processed_cache(cache_key, cache_entry, aliases=(table_name,))
# 🚩 V-Opt: 使用共用篩選函式
target_df, cols_map, err = _get_filtered_sales_data(cache_key)
if err:
# V-Fix: 若快取失效,重新導向自己以觸發重新讀取(保留所有查詢參數)
params = {k: v for k, v in request.args.items()}
return redirect(url_for('sales.sales_analysis', **params))
# 重新取得變數 (因為 _get_filtered_sales_data 內部使用了 cols_map)
col_name = cols_map.get('name')
col_amount = cols_map.get('amount')
col_qty = cols_map.get('qty')
col_category = cols_map.get('category')
col_brand = cols_map.get('brand')
col_vendor = cols_map.get('vendor')
col_activity = cols_map.get('activity')
col_payment = cols_map.get('payment')
col_price = cols_map.get('price')
col_cost = cols_map.get('cost')
col_profit = cols_map.get('profit')
col_return_qty = cols_map.get('return_qty')
col_date = cols_map.get('date')
col_pid = cols_map.get('pid')
all_categories = []
all_brands = []
all_vendors = []
all_activities = []
all_payments = []
all_months = []
try:
filter_options = _get_sales_filter_options(db.engine, table_name, cols_map)
all_categories = filter_options['categories']
all_brands = filter_options['brands']
all_vendors = filter_options['vendors']
all_activities = filter_options['activities']
all_payments = filter_options['payments']
all_months = filter_options['months']
except Exception as e:
sys_log.warning(f"[Sales Analysis] 從數據庫查詢下拉選項失敗: {e}")
# 如果查詢失敗,回退到從快取讀取
if cache_key in _SALES_PROCESSED_CACHE:
original_df = _SALES_PROCESSED_CACHE[cache_key]['df']
elif table_name in _SALES_PROCESSED_CACHE:
original_df = _SALES_PROCESSED_CACHE[table_name]['df']
else:
original_df = pd.DataFrame()
if not original_df.empty:
all_categories = sorted(original_df[col_category].dropna().astype(str).unique().tolist()) if col_category else []
all_brands = sorted(original_df[col_brand].dropna().astype(str).unique().tolist()) if col_brand else []
all_vendors = sorted(original_df[col_vendor].dropna().astype(str).unique().tolist()) if col_vendor else []
all_activities = sorted(original_df[col_activity].dropna().astype(str).unique().tolist()) if col_activity else []
all_payments = sorted(original_df[col_payment].dropna().astype(str).unique().tolist()) if col_payment else []
all_months = sorted(original_df['_month_str'].dropna().unique().tolist()) if col_date and '_month_str' in original_df.columns else []
# 取得前端參數供模板回填
selected_category = request.args.get('category', 'all')
selected_metric = request.args.get('metric', 'amount')
selected_brand = request.args.get('brand', 'all')
selected_vendor = request.args.get('vendor', 'all')
selected_activity = request.args.get('activity', 'all')
selected_payment = request.args.get('payment', 'all')
selected_dow = request.args.get('dow', 'all')
selected_hour = request.args.get('hour', 'all')
selected_month = request.args.get('month', 'all')
keyword = request.args.get('keyword', '').strip()
min_price = request.args.get('min_price', '')
max_price = request.args.get('max_price', '')
min_margin = request.args.get('min_margin', '')
max_margin = request.args.get('max_margin', '')
# 決定排序欄位
sort_col = col_amount
if selected_metric == 'qty' and col_qty:
sort_col = col_qty
target_df = target_df.sort_values(by=sort_col, ascending=False)
# 📊 KPI 計算 (針對篩選後的資料)
total_revenue = float(target_df[col_amount].sum())
total_qty = float(target_df[col_qty].sum()) if col_qty else 0
total_count = int(len(target_df)) # 訂單筆數
# V-Fix 2026-01-15: SKU 數應計算唯一商品數,而非記錄筆數
sku_count = int(target_df[col_name].nunique()) if col_name else total_count
# V-New: 成本與毛利計算
total_cost = float(target_df[col_cost].sum()) if col_cost else 0
if col_profit:
gross_margin = float(target_df[col_profit].sum())
else:
gross_margin = total_revenue - total_cost
gross_margin_rate = (gross_margin / total_revenue * 100) if total_revenue > 0 else 0
avg_price = total_revenue / total_qty if total_qty > 0 else 0
# 📊 V-New: 商業洞察 (Top 3 Analysis)
insights = {
'rev_cats': [], 'rev_prods': [],
'margin_cats': [], 'margin_prods': [],
'qty_cats': [], 'qty_prods': []
}
# Helper function to get top 3
# Helper function to get top 3
def get_top_3(groupby_col, metric_col, is_margin=False, is_qty=False):
if not groupby_col or not metric_col: return []
# V-Opt: 直接使用 target_df 與預計算欄位,避免 copy() 與 assign()
target_metric = metric_col
if is_margin:
target_metric = 'calculated_profit'
try:
# 直接聚合並取前3名
# V-Fix 2026-01-15: 若 groupby_col 是 list (例如 [PID, Name]),結果 index 會是 MultiIndex
grouped = target_df.groupby(groupby_col)[target_metric].sum()
def get_name(k):
# 如果是 Tuple (MultiIndex),通常最後一個是 Name取之
return str(k[-1]) if isinstance(k, tuple) else str(k)
return [{'name': get_name(k), 'value': float(v)} for k, v in grouped.nlargest(3).items() if v > 0]
except Exception:
return []
insights['rev_cats'] = get_top_3(col_category, col_amount)
# V-Fix: 商品聚合改用 [PID, Name] 避免同名不同ID商品被合併
product_groupby = [col_pid, col_name] if col_pid else col_name
insights['rev_prods'] = get_top_3(product_groupby, col_amount)
insights['qty_cats'] = get_top_3(col_category, col_qty, is_qty=True)
insights['qty_prods'] = get_top_3(product_groupby, col_qty, is_qty=True)
if col_cost or col_profit:
insights['margin_cats'] = get_top_3(col_category, col_amount, is_margin=True)
insights['margin_prods'] = get_top_3(product_groupby, col_amount, is_margin=True)
# 📊 V-Opt: 改為橫向長條圖數據 (Top 20)
top_chart = target_df.head(20)
bar_data = {
'labels': [str(n)[:20] + '...' if len(str(n)) > 20 else str(n) for n in top_chart[col_name]], # 稍微放寬長度限制
'chart_values': [float(x) for x in top_chart[sort_col]],
'metric_label': '銷售金額 ($)' if selected_metric == 'amount' else '銷售數量'
}
# 📋 V-Opt: 列表資料改為 AJAX 載入,這裡只傳空列表以加快初始渲染
table_items = []
# 準備類別圓餅圖資料
# V-Fix: 使用 target_df (篩選後資料) 動態計算
cat_data = {'labels': [], 'chart_values': []}
if col_category and not target_df.empty:
cat_group_all = target_df.groupby(col_category)[col_amount].sum().sort_values(ascending=False)
TOP_N_CATS = 12
if len(cat_group_all) > TOP_N_CATS:
top_cats = cat_group_all.head(TOP_N_CATS)
other_val = cat_group_all.iloc[TOP_N_CATS:].sum()
cat_data['labels'] = [str(x) for x in top_cats.index.tolist()] + ['其他']
cat_data['chart_values'] = [float(x) for x in top_cats.tolist()] + [float(other_val)]
else:
cat_data['labels'] = [str(x) for x in cat_group_all.index.tolist()]
cat_data['chart_values'] = [float(x) for x in cat_group_all.tolist()]
# 📊 V-New: 價格帶分析 (Price Range Analysis)
price_dist_data = {'labels': [], 'chart_values': []}
if col_price and not target_df.empty:
# 定義價格區間 (0-500, 500-1000, 1000-2000, 2000-5000, 5000-10000, 10000+)
bins = [0, 500, 1000, 2000, 5000, 10000, float('inf')]
labels = ['0-499', '500-999', '1,000-1,999', '2,000-4,999', '5,000-9,999', '10,000+']
# V-Opt: 使用 pd.cut 進行分組,但不修改 target_df (避免污染快取)
# right=False 表示包含左邊界,例如 500 在 500-999 這一組
price_bins = pd.cut(target_df[col_price], bins=bins, labels=labels, right=False)
# 統計各區間的「銷售金額」貢獻 (直接使用外部 Series 進行 groupby)
range_group = target_df.groupby(price_bins, observed=False)[col_amount].sum()
price_dist_data['labels'] = labels
price_dist_data['chart_values'] = [float(range_group.get(l, 0)) for l in labels]
# 📊 V-New: 價格 vs 銷量 散佈圖 (Scatter Plot)
scatter_data = []
if col_price and col_qty and not target_df.empty:
# 取前 300 筆主要商品,避免圖表過於密集導致瀏覽器卡頓
scatter_source = target_df.head(300)
for _, row in scatter_source.iterrows():
# V-Fix (2026-01-23): 處理 NaN 值
price_val = row[col_price] if pd.notna(row[col_price]) else 0
qty_val = row[col_qty] if pd.notna(row[col_qty]) else 0
amt_val = row[col_amount] if pd.notna(row[col_amount]) else 0
scatter_data.append({
'x': float(price_val),
'y': float(qty_val),
'name': str(row[col_name]) if pd.notna(row[col_name]) else '',
'amt': float(amt_val) # 用於 tooltip 顯示金額
})
# 📊 V-New: BCG 矩陣分析 (BCG Matrix)
# X軸: 銷量 (Qty), Y軸: 毛利率 (Margin %)
bcg_data = {'datasets': [], 'thresholds': {'x': 0, 'y': 0}}
# V-Fix: 確保 calculated_margin_rate 欄位存在
if col_qty and (col_cost or col_profit) and not target_df.empty and 'calculated_margin_rate' in target_df.columns:
# 1. 計算閾值 (使用中位數,避免極端值影響)
# 過濾掉銷量為 0 的商品,避免干擾閾值計算
active_products = target_df[target_df[col_qty] > 0]
if not active_products.empty and 'calculated_margin_rate' in active_products.columns:
median_qty = active_products[col_qty].median()
median_margin = active_products['calculated_margin_rate'].median()
# 若中位數為 0 (例如大部分商品沒銷量),則給一個預設值以利顯示
if median_qty == 0: median_qty = 1
bcg_data['thresholds'] = {'x': float(median_qty), 'y': float(median_margin)}
# 2. 分類商品 (四象限)
# Stars (明星): High Qty, High Margin
stars = active_products[(active_products[col_qty] >= median_qty) & (active_products['calculated_margin_rate'] >= median_margin)]
# Cows (金牛): High Qty, Low Margin
cows = active_products[(active_products[col_qty] >= median_qty) & (active_products['calculated_margin_rate'] < median_margin)]
# Questions (問題): Low Qty, High Margin
questions = active_products[(active_products[col_qty] < median_qty) & (active_products['calculated_margin_rate'] >= median_margin)]
# Dogs (瘦狗): Low Qty, Low Margin
dogs = active_products[(active_products[col_qty] < median_qty) & (active_products['calculated_margin_rate'] < median_margin)]
def format_bcg_points(df_segment):
# 限制點數,避免前端卡頓 (各象限最多 100 點)
return [{'x': float(row[col_qty]), 'y': float(row['calculated_margin_rate']), 'name': str(row[col_name]), 'amt': float(row[col_amount])} for _, row in df_segment.head(100).iterrows()]
bcg_data['datasets'] = [
{'label': '明星商品 (Stars)', 'data': format_bcg_points(stars), 'backgroundColor': 'rgba(255, 206, 86, 0.8)', 'borderColor': 'rgba(255, 206, 86, 1)'}, # Yellow
{'label': '金牛商品 (Cows)', 'data': format_bcg_points(cows), 'backgroundColor': 'rgba(75, 192, 192, 0.8)', 'borderColor': 'rgba(75, 192, 192, 1)'}, # Green
{'label': '問題商品 (Questions)', 'data': format_bcg_points(questions), 'backgroundColor': 'rgba(54, 162, 235, 0.8)', 'borderColor': 'rgba(54, 162, 235, 1)'}, # Blue
{'label': '瘦狗商品 (Dogs)', 'data': format_bcg_points(dogs), 'backgroundColor': 'rgba(201, 203, 207, 0.8)', 'borderColor': 'rgba(201, 203, 207, 1)'} # Grey
]
# 📊 V-New: 時間維度分析 (Time Analysis)
dow_data = {'labels': ['週一', '週二', '週三', '週四', '週五', '週六', '週日'], 'chart_values': [0]*7}
hourly_data = {'labels': [f"{i:02d}:00" for i in range(24)], 'chart_values': [0]*24}
weekly_data = {'labels': [], 'chart_values': []} # V-New: 每週趨勢
monthly_data = {'labels': [], 'chart_values': []} # V-New: 每月趨勢
heatmap_data = [] # V-New: 多維度熱力圖 (Day x Hour)
treemap_data = [] # V-New: 板塊圖數據
if col_date:
# 過濾掉日期無效的資料
# V-Opt: 使用預計算欄位進行分組,速度更快
if not target_df.empty:
# 1. 星期分析 (Day of Week)
dow_group = target_df.groupby('_dow')[col_amount].sum()
for day, val in dow_group.items():
if not np.isnan(day):
dow_data['chart_values'][int(day)] = float(val)
# 2. 小時分析 (Hourly)
hour_group = target_df.groupby('_hour')[col_amount].sum()
for hour, val in hour_group.items():
if not np.isnan(hour):
hourly_data['chart_values'][int(hour)] = float(val)
# 3. 每月趨勢 (Monthly Trend) - V-New
month_group = target_df.groupby('_month_str')[col_amount].sum().sort_index()
monthly_data['labels'] = month_group.index.tolist()
# V-Fix (2026-01-23): 處理 NaN 值避免 JSON 序列化失敗
monthly_data['chart_values'] = [float(x) if not np.isnan(x) else 0 for x in month_group.tolist()]
# 3. 每週趨勢 (Weekly Trend) - V-New
week_group = target_df.groupby('_week')[col_amount].sum().sort_index()
# V-Opt: 解除 12 週限制,顯示完整年度趨勢 (因應一年份數據需求)
weekly_data['labels'] = week_group.index.tolist()
# V-Fix (2026-01-23): 處理 NaN 值避免 JSON 序列化失敗
weekly_data['chart_values'] = [float(x) if not np.isnan(x) else 0 for x in week_group.tolist()]
# 4. 多維度熱力圖 (Day x Hour) - V-Fix: 確保數據完整性
dh_group = target_df.groupby(['_dow', '_hour'])[col_amount].sum()
# V-Opt: 正規化氣泡大小 (Normalize Bubble Size) 以提升可讀性
max_val = dh_group.max() if not dh_group.empty else 1
for (day, hour), val in dh_group.items():
# V-Fix (2026-01-23): 處理 NaN 值
if np.isnan(val):
val = 0
# 將數值映射到 3~25px 的半徑範圍,確保視覺可辨識
radius = 3 + (math.sqrt(val) / math.sqrt(max_val)) * 22 if val > 0 else 0
heatmap_data.append({
'x': int(hour), # X軸: 小時 (0-23)
'y': int(day), # Y軸: 星期 (0-6)
'r': float(radius) if not np.isnan(radius) else 0, # V-Adj: 正規化後半徑
'v': float(val) # 實際數值 (用於 Tooltip)
})
# 📊 V-New: 板塊圖 (Treemap) 數據準備
# 結構: Root -> Category -> Product (Top 5 per cat)
if col_category and col_name and col_amount and not target_df.empty:
# V-Opt: 優化聚合邏輯,先聚合再篩選,避免在迴圈中重複過濾大表
# 1. 先聚合 Category + Product (大幅減少資料量)
cat_prod_group = target_df.groupby([col_category, col_name])[col_amount].sum().reset_index()
# 2. 找出前 10 大分類
top_cats = cat_prod_group.groupby(col_category)[col_amount].sum().nlargest(10).index.tolist()
# 3. 針對前 10 大分類,各取前 5 大商品
for cat in top_cats:
if not cat: continue
# 在縮減後的資料中篩選,速度極快
cat_subset = cat_prod_group[cat_prod_group[col_category] == cat]
top_prods = cat_subset.nlargest(5, col_amount)
for _, row in top_prods.iterrows():
# V-Fix (2026-01-23): 處理 NaN 值
amount_val = row[col_amount]
if pd.isna(amount_val):
amount_val = 0
treemap_data.append({
'category': str(cat),
'product': str(row[col_name]) if pd.notna(row[col_name]) else '',
'value': float(amount_val),
'color': get_color_for_string(str(cat)) # V-Fix: 增加顏色參數,確保與分類顏色一致且清晰
})
# 📊 V-New: ABC 分析 (Pareto Analysis) - TODO #8
# A類: 累積營收 0-80% (核心商品)
# B類: 累積營收 80-95% (次要商品)
# C類: 累積營收 95-100% (長尾商品)
abc_stats = {'A': {'count': 0, 'revenue': 0, 'pct_rev': 0, 'pct_sku': 0},
'B': {'count': 0, 'revenue': 0, 'pct_rev': 0, 'pct_sku': 0},
'C': {'count': 0, 'revenue': 0, 'pct_rev': 0, 'pct_sku': 0}}
if not target_df.empty and col_amount:
# 使用 numpy 加速累積計算
sorted_rev = target_df[col_amount].values # 已在上方排序過
cumsum_rev = np.cumsum(sorted_rev)
total_rev_abc = cumsum_rev[-1] if len(cumsum_rev) > 0 else 0
if total_rev_abc > 0:
pct_cumsum = cumsum_rev / total_rev_abc * 100
# 找出分界點索引
idx_a = np.searchsorted(pct_cumsum, 80)
idx_b = np.searchsorted(pct_cumsum, 95)
# A類: 0 ~ idx_a
count_a = idx_a + 1
rev_a = cumsum_rev[idx_a] if idx_a < len(cumsum_rev) else total_rev_abc
# B類: idx_a+1 ~ idx_b
count_b = max(0, idx_b - idx_a)
rev_b = (cumsum_rev[idx_b] - cumsum_rev[idx_a]) if idx_b < len(cumsum_rev) else (total_rev_abc - cumsum_rev[idx_a])
# C類: idx_b+1 ~ end
count_c = max(0, len(cumsum_rev) - 1 - idx_b)
rev_c = total_rev_abc - cumsum_rev[idx_b] if idx_b < len(cumsum_rev) else 0
abc_stats['A'] = {'count': int(count_a), 'revenue': float(rev_a), 'pct_rev': float(rev_a/total_rev_abc*100), 'pct_sku': float(count_a/total_count*100)}
abc_stats['B'] = {'count': int(count_b), 'revenue': float(rev_b), 'pct_rev': float(rev_b/total_rev_abc*100), 'pct_sku': float(count_b/total_count*100)}
abc_stats['C'] = {'count': int(count_c), 'revenue': float(rev_c), 'pct_rev': float(rev_c/total_rev_abc*100), 'pct_sku': float(count_c/total_count*100)}
# 📊 V-New: 廠商獲利能力排行 (Vendor Profitability) - TODO #9
vendor_stats = []
if col_vendor and col_amount and not target_df.empty:
# Group by vendor
agg_dict = {col_amount: 'sum', col_name: 'nunique'} # nunique 計算不重複商品數 (SKU)
if col_qty: agg_dict[col_qty] = 'sum' # V-New: 累加銷量
if col_profit:
agg_dict[col_profit] = 'sum'
elif col_cost:
agg_dict[col_cost] = 'sum'
# 使用 groupby 聚合
vendor_group = target_df.groupby(col_vendor).agg(agg_dict).reset_index()
# 計算毛利與毛利率
if col_profit:
vendor_group['total_profit'] = vendor_group[col_profit]
elif col_cost:
vendor_group['total_profit'] = vendor_group[col_amount] - vendor_group[col_cost]
else:
vendor_group['total_profit'] = 0
# 計算營收佔比 (Share %)
total_vendor_revenue = vendor_group[col_amount].sum()
if total_vendor_revenue > 0:
vendor_group['revenue_share'] = (vendor_group[col_amount] / total_vendor_revenue * 100)
else:
vendor_group['revenue_share'] = 0.0
# 避免除以零
vendor_group['margin_rate'] = np.where(vendor_group[col_amount] > 0, (vendor_group['total_profit'] / vendor_group[col_amount] * 100), 0)
# 計算平均客單價 (ASP)
if col_qty:
vendor_group['asp'] = np.where(vendor_group[col_qty] > 0, vendor_group[col_amount] / vendor_group[col_qty], 0)
# 排序:預設按總業績降序
vendor_group = vendor_group.sort_values(by=col_amount, ascending=False)
# 格式化輸出 (Top 100)
for _, row in vendor_group.head(100).iterrows():
vendor_stats.append({
'name': str(row[col_vendor]),
'revenue': float(row[col_amount]),
'share': float(row['revenue_share']), # V-New
'qty': float(row[col_qty]) if col_qty else 0, # V-New
'asp': float(row.get('asp', 0)), # V-New
'profit': float(row['total_profit']),
'margin_rate': float(row['margin_rate']),
'sku_count': int(row[col_name])
})
# 📊 V-New: 淡旺季熱力圖 (Seasonality Analysis) - TODO #10
seasonality_data = None
if col_date and col_category and col_amount and not target_df.empty:
# 1. 取得前 10 大分類 (避免圖表過大)
# 使用 target_df (受篩選影響),這樣可以看特定品牌下的分類季節性
top_cats_season = target_df.groupby(col_category)[col_amount].sum().nlargest(10).index.tolist()
# 2. 聚合數據 (Month x Category)
season_group = target_df[target_df[col_category].isin(top_cats_season)].groupby(['_month_str', col_category])[col_amount].sum().reset_index()
# 3. 轉換為 Bubble Chart 格式
# X軸: 月份 (需解析 _month_str 取得順序)
# Y軸: 分類 (使用 top_cats_season 的索引)
# 取得所有月份並排序
all_months_sorted = sorted(target_df['_month_str'].unique())
month_map = {m: i for i, m in enumerate(all_months_sorted)}
cat_map = {c: i for i, c in enumerate(top_cats_season)}
points = []
max_val_season = season_group[col_amount].max() if not season_group.empty else 1
for _, row in season_group.iterrows():
m_str = row['_month_str']
cat = row[col_category]
val = row[col_amount]
if m_str in month_map and cat in cat_map:
# 正規化大小 (3~25px)
radius = 3 + (math.sqrt(val) / math.sqrt(max_val_season)) * 25 if val > 0 else 0
points.append({
'x': month_map[m_str],
'y': cat_map[cat],
'r': radius,
'v': float(val),
'm': m_str,
'c': cat
})
seasonality_data = {
'datasets': [{
'label': '淡旺季熱點',
'data': points,
# 顏色將在前端動態生成
}],
'yLabels': top_cats_season,
'xLabels': all_months_sorted
}
# 📊 V-New 2026-01-15: 行銷活動業績貢獻 (Marketing Campaign Contribution)
marketing_data = None
if not target_df.empty:
marketing_data = prepare_marketing_summary(target_df, sort_by=selected_metric)
context = {
'marketing_data': marketing_data,
'items': table_items,
'kpi': {
'revenue': total_revenue,
'qty': total_qty,
'count': total_count,
'sku_count': sku_count,
'cost': total_cost,
'gross_margin': gross_margin,
'gross_margin_rate': gross_margin_rate,
'avg_price': avg_price,
},
'insights': insights,
'abc_stats': abc_stats,
'vendor_stats': vendor_stats,
'seasonality_data': seasonality_data,
'bar_data': bar_data,
'cat_data': cat_data,
'category_data': cat_data,
'price_dist_data': price_dist_data,
'scatter_data': scatter_data,
'bcg_data': bcg_data,
'dow_data': dow_data,
'hourly_data': hourly_data,
'monthly_data': monthly_data,
'weekly_data': weekly_data,
'heatmap_data': heatmap_data,
'treemap_data': treemap_data,
'all_categories': all_categories,
'all_brands': all_brands,
'all_vendors': all_vendors,
'all_activities': all_activities,
'all_payments': all_payments,
'all_months': all_months,
'selected_category': selected_category,
'selected_brand': selected_brand,
'selected_vendor': selected_vendor,
'selected_activity': selected_activity,
'selected_payment': selected_payment,
'selected_dow': selected_dow,
'selected_hour': selected_hour,
'selected_month': selected_month,
'selected_metric': selected_metric,
'keyword': keyword,
'min_price': min_price,
'max_price': max_price,
'min_margin': min_margin,
'max_margin': max_margin,
'cols': {
'name': col_name,
'amount': col_amount,
'qty': col_qty,
'cat': col_category,
'date': col_date,
'cost': col_cost,
'profit': col_profit,
'vendor': col_vendor,
'brand': col_brand,
'return_qty': col_return_qty,
'pid': col_pid,
},
'table_name': table_name,
'data_range_months': data_range_months,
'start_date': start_date,
'end_date': end_date,
'total_records': len(df),
'active_page': 'sales',
'db_data_range': db_data_range,
}
_set_sales_page_context_cache(page_cache_key, context)
_set_sales_shared_page_context_cache(page_cache_key, context)
return render_template('sales_analysis.html', **context)
except Exception as e:
sys_log.error(f"Sales Analysis Error: {e}")
import traceback
traceback.print_exc()
# 提供完整的變數以避免模板錯誤
return render_template('sales_analysis.html',
error=f"系統發生錯誤: {str(e)}",
marketing_data=None,
insights=None,
abc_stats=None,
vendor_stats=None,
seasonality_data=None,
scatter_data=None,
bcg_data=None,
dow_data=None,
hourly_data=None,
monthly_data=None,
weekly_data=None,
heatmap_data=None,
treemap_data=None,
all_categories=[],
all_brands=[], all_vendors=[], all_activities=[], all_payments=[],
all_months=[],
selected_category='all',
selected_brand='all', selected_vendor='all',
selected_activity='all', selected_payment='all',
selected_dow='all', selected_hour='all',
selected_month='all',
selected_metric=request.args.get('metric', 'amount'),
keyword='', min_price='', max_price='',
min_margin='', max_margin='',
cols={},
table_name='realtime_sales_monthly',
no_filter=False,
data_range_months=int(request.args.get('data_range', '0') or '0'),
start_date=request.args.get('start_date', ''),
end_date=request.args.get('end_date', ''),
total_records=0,
active_page='sales',
db_data_range='')
@sales_bp.route('/growth_analysis')
@login_required
def growth_analysis():
"""營運成長策略報表 (MoM, YoY, AOV, YTD) - 含快取優化"""
from services.cache_service import (
get_growth_cache, set_growth_cache, is_growth_cache_valid
)
def _render_growth_empty(message=None):
now_taipei = datetime.now(TAIPEI_TZ)
empty_chart_data, empty_kpi = _growth_empty_payload(now_taipei)
return render_template('growth_analysis.html',
chart_data=empty_chart_data,
kpi=empty_kpi,
datetime_now=now_taipei.strftime('%Y-%m-%d %H:%M:%S'),
active_page='growth',
cache_hit=False,
cache_age=0,
is_empty_state=True,
empty_message=message)
try:
start_time = time.time()
db = DatabaseManager()
table_name = 'realtime_sales_monthly'
inspector = inspect(db.engine)
if not inspector.has_table(table_name):
return _render_growth_empty("尚未匯入業績資料,請先完成數據匯入後再查看成長分析。")
source_fingerprint = _get_growth_source_fingerprint(db.engine, table_name)
# 檢查快取
if is_growth_cache_valid(source_fingerprint):
cache = get_growth_cache()
cache_age = int((datetime.now(TAIPEI_TZ) - cache['timestamp']).total_seconds())
sys_log.debug(f"[GrowthAnalysis] [Cache] 使用快取 | 快取年齡: {cache_age}")
now_taipei = datetime.now(TAIPEI_TZ)
chart_data = _attach_growth_competitor_intel(db.engine, cache['chart_data'])
return render_template('growth_analysis.html',
chart_data=chart_data,
kpi=cache['kpi'],
datetime_now=now_taipei.strftime('%Y-%m-%d %H:%M:%S'),
cache_hit=True,
active_page='growth',
cache_age=cache_age)
# 快取失效,重新計算
sys_log.debug("[GrowthAnalysis] [Cache] 快取失效,重新計算數據...")
try:
payload = _fetch_growth_payload_summary(db.engine)
except Exception as summary_error:
sys_log.warning(f"[GrowthAnalysis] 月結摘要聚合失敗,改走明細聚合: {summary_error}")
payload = None
if not payload:
try:
payload = _fetch_growth_payload_sql(db.engine, table_name)
except Exception as sql_error:
sys_log.warning(f"[GrowthAnalysis] SQL 聚合失敗,回退 pandas 路徑: {sql_error}")
payload = _fetch_growth_payload_pandas(db.engine, table_name)
if not payload:
return _render_growth_empty("業績資料目前沒有可分析的紀錄,請確認匯入檔案是否包含有效銷售資料。")
chart_data, kpi = payload
# 儲存快取
set_growth_cache(chart_data, kpi, source_fingerprint)
chart_data = _attach_growth_competitor_intel(db.engine, chart_data)
elapsed = time.time() - start_time
sys_log.debug(f"[GrowthAnalysis] [Cache] 數據計算完成 | 耗時: {elapsed:.3f}")
now_taipei = datetime.now(TAIPEI_TZ)
return render_template('growth_analysis.html',
chart_data=chart_data,
kpi=kpi,
datetime_now=now_taipei.strftime('%Y-%m-%d %H:%M:%S'),
active_page='growth',
cache_hit=False)
except Exception as e:
sys_log.error(f"Growth Analysis Error: {e}")
return f"系統錯誤: {e}"
@sales_bp.route('/abc_analysis/detail')
def abc_analysis_detail():
"""ABC 分析詳細報表頁面"""
try:
target_class = request.args.get('class', 'A') # 預設 A 類
table_name = 'realtime_sales_monthly'
# 1. 生成與主頁面一致的 cache_key
data_range_months = int(request.args.get('data_range', '0') or '0')
start_date = request.args.get('start_date', '')
end_date = request.args.get('end_date', '')
if start_date or end_date:
cache_key = f"{table_name}_custom_{start_date}_{end_date}"
else:
cache_key = f"{table_name}_{data_range_months}m"
# 2. 使用共用篩選函式取得資料
target_df, cols_map, err = _get_filtered_sales_data(cache_key)
# V-Fix: 如果 cache_key 不存在,嘗試後補使用 table_name 固定鍵值
if err and table_name in _SALES_PROCESSED_CACHE:
target_df, cols_map, err = _get_filtered_sales_data(table_name)
if err:
return render_template(
'abc_analysis_detail.html',
loading_state=True,
info={'title': '數據準備中', 'desc': '業績分析快取正在重新載入。'},
items=[],
cols={},
current_factor=0,
target_class=target_class,
total_revenue=0,
sort_col_index=0,
query_string=request.query_string.decode(),
active_page='sales',
), 200
# 恢復欄位變數
col_name = cols_map.get('name')
col_amount = cols_map.get('amount')
col_qty = cols_map.get('qty')
col_category = cols_map.get('category')
col_brand = cols_map.get('brand')
col_vendor = cols_map.get('vendor')
col_price = cols_map.get('price')
col_cost = cols_map.get('cost')
col_profit = cols_map.get('profit')
col_date = cols_map.get('date')
col_pid = cols_map.get('pid')
# 3. 執行 ABC 分類
items = []
total_revenue = 0
current_factor = 0.0
if col_amount and not target_df.empty:
# V-Fix: 先針對商品進行聚合,確保 ABC 分析是基於「商品總銷量」而非「單筆訂單」
agg_rules = {col_amount: 'sum'}
if col_qty: agg_rules[col_qty] = 'sum'
if col_cost: agg_rules[col_cost] = 'sum'
if col_profit: agg_rules[col_profit] = 'sum'
if col_category: agg_rules[col_category] = 'first'
if col_vendor: agg_rules[col_vendor] = 'first'
if col_brand: agg_rules[col_brand] = 'first' # V-New: 加入品牌
if col_pid: agg_rules[col_pid] = 'first' # V-New: 聚合商品ID
if col_date: agg_rules['_month_str'] = lambda x: ', '.join(sorted(x.dropna().unique()))
df_agg = target_df.groupby(col_name).agg(agg_rules).reset_index()
# 重新計算聚合後的毛利率
if col_profit:
df_agg['calculated_margin_rate'] = (df_agg[col_profit] / df_agg[col_amount]) * 100
elif col_cost:
df_agg['calculated_margin_rate'] = ((df_agg[col_amount] - df_agg[col_cost]) / df_agg[col_amount]) * 100
else:
df_agg['calculated_margin_rate'] = 0.0
df_agg['calculated_margin_rate'] = df_agg['calculated_margin_rate'].replace([np.inf, -np.inf, np.nan], 0)
# 執行 ABC 排序與計算
df_agg = df_agg.sort_values(by=col_amount, ascending=False)
df_agg['cumulative_revenue'] = df_agg[col_amount].cumsum()
total_revenue = df_agg[col_amount].sum()
df_agg['cumulative_pct'] = (df_agg['cumulative_revenue'] / total_revenue) * 100
conditions = [(df_agg['cumulative_pct'] <= 80), (df_agg['cumulative_pct'] <= 95)]
choices = ['A', 'B']
df_agg['ABC_Class'] = np.select(conditions, choices, default='C')
# 4. 篩選特定類別
class_df = df_agg[df_agg['ABC_Class'] == target_class].copy()
# V-New: 計算平均單價與庫存建議
if col_qty:
class_df['avg_unit_price'] = (class_df[col_amount] / class_df[col_qty]).fillna(0)
# V-New: 處理動態補貨係數
custom_factor = request.args.get('factor')
current_factor = 0.0
if custom_factor:
try:
current_factor = float(custom_factor)
except:
current_factor = 1.5 if target_class == 'A' else (1.2 if target_class == 'B' else 0.0)
else:
current_factor = 1.5 if target_class == 'A' else (1.2 if target_class == 'B' else 0.0)
class_df['suggested_restock'] = (class_df[col_qty] * current_factor).astype(int)
items = class_df.to_dict('records')
# 準備標題與描述
class_info = {
'A': {'title': 'A 類 - 核心商品', 'desc': '營收佔比前 80% 的主力商品,建議重點備貨與監控。', 'color': 'danger'},
'B': {'title': 'B 類 - 次要商品', 'desc': '營收佔比 80%~95% 的輔助商品,維持正常庫存。', 'color': 'warning'},
'C': {'title': 'C 類 - 長尾商品', 'desc': '營收佔比最後 5% 的長尾商品,建議評估清倉或縮減 SKU。', 'color': 'success'}
}
info = class_info.get(target_class, {'title': f'{target_class}', 'desc': '', 'color': 'secondary'})
# 計算 DataTables 預設排序欄位 (銷售金額) 的索引
# 欄位順序: Rank(0), [PID], Name, [Brand], [Vendor], [Cat], [Margin], [AvgPrice, Qty, Restock], Amount
sort_col_index = 1 # Rank
if col_pid: sort_col_index += 1
sort_col_index += 1 # Name
if col_brand: sort_col_index += 1
if col_vendor: sort_col_index += 1
if col_category: sort_col_index += 1
if col_cost or col_profit: sort_col_index += 1
if col_qty: sort_col_index += 3
# 此時 sort_col_index 即為 Amount 欄位的索引
return render_template('abc_analysis_detail.html',
items=items,
info=info,
target_class=target_class,
current_factor=current_factor, # V-New: 傳遞當前係數
total_revenue=total_revenue,
sort_col_index=sort_col_index, # V-New: 傳遞排序欄位索引
cols={'name': col_name, 'amount': col_amount, 'qty': col_qty, 'cat': col_category,
'vendor': col_vendor, 'brand': col_brand, 'cost': col_cost, 'profit': col_profit, 'date': col_date, 'pid': col_pid},
# 傳遞當前查詢參數以供匯出連結使用
query_string=request.query_string.decode(),
active_page='sales')
except Exception as e:
sys_log.error(f"ABC Detail Error: {e}")
return f"系統錯誤: {e}"
# ==========================================
# API 路由
# ==========================================
@sales_bp.route('/api/sales_analysis/table_data')
@login_required
def api_sales_table_data():
"""API: 取得業績分析的詳細列表資料 (Server-side AJAX) - 使用 SQL 聚合優化"""
try:
import hashlib
from datetime import datetime, timedelta, timezone
TAIPEI_TZ = timezone(timedelta(hours=8))
# V-Opt: 產生查詢快取 key (根據所有篩選條件)
cache_params = request.args.to_dict()
cache_key = hashlib.md5(str(sorted(cache_params.items())).encode(), usedforsecurity=False).hexdigest()
# V-Opt: 檢查快取
if cache_key in _TABLE_DATA_CACHE:
cached = _TABLE_DATA_CACHE[cache_key]
if time.time() - cached['time'] < _TABLE_DATA_CACHE_TTL:
sys_log.debug(f"[API] Table Data: 使用快取 (key={cache_key[:8]})")
return jsonify(cached['data'])
table_name = 'realtime_sales_monthly'
data_range_months = int(request.args.get('data_range', '1') or '1')
start_date = request.args.get('start_date', '') # V-New: 自訂開始日期
end_date = request.args.get('end_date', '') # V-New: 自訂結束日期
# V-Fix: 取得所有篩選參數
category_filter = request.args.get('category', 'all')
brand_filter = request.args.get('brand', 'all') # V-Fix: 品牌篩選
vendor_filter = request.args.get('vendor', 'all') # V-Fix: 廠商篩選
activity_filter = request.args.get('activity', 'all') # V-Fix: 活動篩選
payment_filter = request.args.get('payment', 'all') # V-Fix: 付款方式篩選
month_filter = request.args.get('month', 'all')
dow_filter = request.args.get('dow', 'all') # 星期篩選
hour_filter = request.args.get('hour', 'all') # 小時篩選
min_price_str = request.args.get('min_price', '')
max_price_str = request.args.get('max_price', '')
min_margin_str = request.args.get('min_margin', '')
max_margin_str = request.args.get('max_margin', '')
keyword = request.args.get('keyword', '').strip()
db = DatabaseManager()
# V-Fix: 從快取讀取欄位名稱對應,以支援不同的資料庫欄位名稱
if start_date or end_date:
cache_key = f"{table_name}_custom_{start_date}_{end_date}"
else:
cache_key = f"{table_name}_{data_range_months}m"
# 嘗試從快取讀取欄位名稱
cols_map = {}
if cache_key in _SALES_PROCESSED_CACHE:
cols_map = _SALES_PROCESSED_CACHE[cache_key].get('cols', {})
elif table_name in _SALES_PROCESSED_CACHE: # V-Fix: 也嘗試使用固定 key
cols_map = _SALES_PROCESSED_CACHE[table_name].get('cols', {})
# 取得實際欄位名稱(如果快取中沒有,使用預設名稱)
# V-Fix (2026-01-23): 使用 or 確保不會得到 None 值
col_name = cols_map.get('name') or '商品名稱'
col_pid = cols_map.get('pid') or '商品ID'
col_brand = cols_map.get('brand') or '品牌'
col_vendor = cols_map.get('vendor') or '廠商名稱'
col_category = cols_map.get('category') or '商品館'
col_amount = cols_map.get('amount') or '總業績'
col_qty = cols_map.get('qty') or '數量'
col_cost = cols_map.get('cost') or '總成本'
col_profit = cols_map.get('profit') or '毛利'
col_return_qty = cols_map.get('return_qty') or '退貨數量'
# V-Opt: 使用純 SQL 聚合查詢,避免載入完整資料集
# 建立日期篩選條件
date_filter = ""
# V-New: 優先處理自訂日期區間
if start_date or end_date:
# V-Fix: 處理日期格式轉換 (2025-01-01 -> 2025/01/01)
start_date_slash = start_date.replace('-', '/') if start_date else ''
end_date_slash = end_date.replace('-', '/') if end_date else ''
# V-Fix: 只使用「日期」欄位(「訂單日期」欄位是固定文字「訂單日期」,不是實際日期)
if start_date and end_date:
date_filter = f"""AND ("日期" BETWEEN '{start_date_slash}' AND '{end_date_slash}')"""
elif start_date:
date_filter = f"""AND ("日期" >= '{start_date_slash}')"""
else: # only end_date
date_filter = f"""AND ("日期" <= '{end_date_slash}')"""
elif data_range_months > 0:
# V-Fix: 使用斜線格式以匹配資料庫格式
cutoff_date = (datetime.now(TAIPEI_TZ) - timedelta(days=data_range_months * 30)).strftime('%Y/%m/%d')
# V-Fix: 只使用「日期」欄位進行篩選(「訂單日期」是固定文字,不是實際日期)
date_filter = f"""AND ("日期" >= '{cutoff_date}')"""
# V-Fix: 建立其他篩選條件
additional_filters = []
# 分類篩選
if category_filter and category_filter != 'all' and col_category:
additional_filters.append(f""""{col_category}" = '{category_filter}'""")
# V-Fix: 品牌篩選
if brand_filter and brand_filter != 'all' and col_brand:
additional_filters.append(f""""{col_brand}" = '{brand_filter}'""")
# V-Fix: 廠商篩選
if vendor_filter and vendor_filter != 'all' and col_vendor:
additional_filters.append(f""""{col_vendor}" = '{vendor_filter}'""")
# V-Fix: 活動篩選
col_activity = cols_map.get('activity')
if activity_filter and activity_filter != 'all' and col_activity:
additional_filters.append(f""""{col_activity}" = '{activity_filter}'""")
# V-Fix: 付款方式篩選
col_payment = cols_map.get('payment')
if payment_filter and payment_filter != 'all' and col_payment:
additional_filters.append(f""""{col_payment}" = '{payment_filter}'""")
# 月份篩選
if month_filter and month_filter != 'all':
# V-Fix: 月份格式例如 "2025-01",但資料庫可能使用斜線格式 "2025/01"
# 只使用「日期」欄位(「訂單日期」是固定文字,「時間」只包含時間)
month_filter_slash = month_filter.replace('-', '/') # "2025-01" -> "2025/01"
# 同時匹配橫線和斜線格式
additional_filters.append(f"""("日期" LIKE '{month_filter}%' OR "日期" LIKE '{month_filter_slash}%')""")
# 星期篩選 (需要從日期計算)
if dow_filter and dow_filter != 'all':
# V-Fix (2026-01-23): 支援 PostgreSQL 和 SQLite 兩種資料庫
# Pandas dt.dayofweek: 0=Monday, 6=Sunday
pandas_dow = int(dow_filter)
if DATABASE_TYPE == 'postgresql':
# PostgreSQL: EXTRACT(DOW FROM date) 0=Sunday, 6=Saturday
# Pandas 0(Mon) -> PostgreSQL 1(Mon), Pandas 6(Sun) -> PostgreSQL 0(Sun)
pg_dow = (pandas_dow + 1) % 7
# 日期格式可能是 2025/01/01需要轉換為 YYYY-MM-DD
additional_filters.append(f"""EXTRACT(DOW FROM TO_DATE(REPLACE("日期", '/', '-'), 'YYYY-MM-DD')) = {pg_dow}""")
else:
# SQLite: strftime('%w', date) 0=Sunday, 6=Saturday
sqlite_dow = str((pandas_dow + 1) % 7)
additional_filters.append(f"""strftime('%w', replace("日期", '/', '-')) = '{sqlite_dow}'""")
# 小時篩選 (需要從時間欄位提取)
if hour_filter and hour_filter != 'all':
# V-Fix (2026-01-23): 支援 PostgreSQL 和 SQLite 兩種資料庫
hour_val = int(hour_filter)
if DATABASE_TYPE == 'postgresql':
# PostgreSQL: 使用 SUBSTRING 或 CAST
additional_filters.append(f"""CAST(SUBSTRING("時間" FROM 1 FOR 2) AS INTEGER) = {hour_val}""")
else:
# SQLite: 使用 substr
additional_filters.append(f"""CAST(substr("時間", 1, 2) AS INTEGER) = {hour_val}""")
# 關鍵字篩選
if keyword:
keyword_escaped = keyword.replace("'", "''") # SQL 注入防護
keyword_conditions = []
if col_name:
keyword_conditions.append(f""""{col_name}" LIKE '%{keyword_escaped}%'""")
if col_pid:
keyword_conditions.append(f""""{col_pid}" LIKE '%{keyword_escaped}%'""")
if col_brand:
keyword_conditions.append(f""""{col_brand}" LIKE '%{keyword_escaped}%'""")
if col_vendor:
keyword_conditions.append(f""""{col_vendor}" LIKE '%{keyword_escaped}%'""")
if keyword_conditions:
additional_filters.append(f"({' OR '.join(keyword_conditions)})")
# V-New: 價格區間篩選 (Price Range)
if (min_price_str or max_price_str) and col_qty and col_amount:
# 假設單價 = 總業績 / 數量 (防止除以零)
price_cal_sql = f'CAST("{col_amount}" AS FLOAT) / NULLIF("{col_qty}", 0)'
if min_price_str:
additional_filters.append(f"{price_cal_sql} >= {float(min_price_str)}")
if max_price_str:
additional_filters.append(f"{price_cal_sql} <= {float(max_price_str)}")
# V-New: 毛利率區間篩選 (Margin Range)
if (min_margin_str or max_margin_str) and col_amount:
# 計算毛利額 SQL
if col_profit:
profit_cal_sql = f'"{col_profit}"'
elif col_cost:
profit_cal_sql = f'("{col_amount}" - "{col_cost}")'
else:
profit_cal_sql = "0"
# 計算毛利率 SQL: (毛利 / 業績) * 100
margin_cal_sql = f'({profit_cal_sql} * 100.0 / NULLIF("{col_amount}", 0))'
if min_margin_str:
additional_filters.append(f"{margin_cal_sql} >= {float(min_margin_str)}")
if max_margin_str:
additional_filters.append(f"{margin_cal_sql} <= {float(max_margin_str)}")
# 組合所有篩選條件
all_filters = date_filter
if additional_filters:
all_filters += " AND " + " AND ".join(additional_filters)
# SQL 聚合查詢 - 直接在資料庫層級完成聚合
# V-Fix: 使用動態欄位名稱
group_by_cols = []
if col_pid: group_by_cols.append(f'"{col_pid}"')
if col_name: group_by_cols.append(f'"{col_name}"')
if col_brand: group_by_cols.append(f'"{col_brand}"')
if col_vendor: group_by_cols.append(f'"{col_vendor}"')
if col_category: group_by_cols.append(f'"{col_category}"')
group_by_clause = ', '.join(group_by_cols) if group_by_cols else '"商品ID"'
sql_query = f"""
SELECT
{f'"{col_pid}" as product_id' if col_pid else "'未知' as product_id"},
{f'"{col_name}" as name' if col_name else "'未知' as name"},
{f'"{col_brand}" as brand' if col_brand else "'' as brand"},
{f'"{col_vendor}" as vendor' if col_vendor else "'' as vendor"},
{f'"{col_category}" as category' if col_category else "'' as category"},
{f'SUM(CAST("{col_amount}" AS REAL)) as amount' if col_amount else '0 as amount'},
{f'SUM(CAST("{col_qty}" AS REAL)) as qty' if col_qty else '0 as qty'},
{f'SUM(CAST("{col_cost}" AS REAL)) as cost' if col_cost else '0 as cost'},
{f'SUM(CAST("{col_return_qty}" AS REAL)) as return_qty' if col_return_qty else '0 as return_qty'},
COUNT(*) as order_count
FROM {table_name}
WHERE 1=1 {all_filters}
GROUP BY {group_by_clause}
ORDER BY amount DESC
LIMIT 300
"""
df_agg = pd.read_sql(sql_query, db.engine)
sys_log.info(f"[API] Table Data: SQL聚合查詢返回 {len(df_agg)} 筆商品 (篩選: category={category_filter}, month={month_filter}, dow={dow_filter}, hour={hour_filter}, keyword={keyword})")
if df_agg.empty:
return jsonify({'data': []})
# 計算衍生欄位
df_agg['margin_rate'] = ((df_agg['amount'] - df_agg['cost']) / df_agg['amount'] * 100).fillna(0)
df_agg['margin_rate'] = df_agg['margin_rate'].replace([np.inf, -np.inf], 0)
df_agg['avg_price'] = (df_agg['amount'] / df_agg['qty']).fillna(0)
df_agg['return_rate'] = (df_agg['return_qty'] / df_agg['qty'] * 100).fillna(0)
# V-Fix: 應用價格區間篩選 (在計算欄位後才能篩選)
if min_price_str:
try:
min_price = float(min_price_str)
df_agg = df_agg[df_agg['avg_price'] >= min_price]
except ValueError:
pass
if max_price_str:
try:
max_price = float(max_price_str)
df_agg = df_agg[df_agg['avg_price'] <= max_price]
except ValueError:
pass
# V-Fix: 應用毛利區間篩選 (在計算欄位後才能篩選)
if min_margin_str:
try:
min_margin = float(min_margin_str)
df_agg = df_agg[df_agg['margin_rate'] >= min_margin]
except ValueError:
pass
if max_margin_str:
try:
max_margin = float(max_margin_str)
df_agg = df_agg[df_agg['margin_rate'] <= max_margin]
except ValueError:
pass
# 重新排序並限制到 300 筆 (減少前端渲染負擔)
df_agg = df_agg.sort_values('amount', ascending=False).head(300)
# V-Opt: 使用向量化操作取代逐列迴圈
df_agg['rank'] = range(1, len(df_agg) + 1)
df_agg['month_str'] = '' # SQL聚合模式不需要月份字串
# 重新命名欄位以符合前端格式
result_df = df_agg.rename(columns={
'product_id': 'product_id',
'name': 'name',
'brand': 'brand',
'vendor': 'vendor',
'category': 'category',
'margin_rate': 'margin_rate',
'avg_price': 'avg_price',
'return_rate': 'return_rate',
'qty': 'qty',
'amount': 'amount'
})
# 選擇需要的欄位並轉換為字典列表
columns = ['rank', 'product_id', 'name', 'brand', 'vendor', 'category',
'margin_rate', 'month_str', 'avg_price', 'return_rate', 'qty', 'amount']
# V-Fix (2026-01-23): 確保所有數值欄位無 NaN/Infinity避免 JSON 序列化失敗
numeric_cols = ['margin_rate', 'avg_price', 'return_rate', 'qty', 'amount']
for col in numeric_cols:
if col in result_df.columns:
result_df[col] = result_df[col].replace([np.inf, -np.inf], 0).fillna(0)
# V-Fix (2026-01-23): 確保字串欄位無 None避免 JSON 序列化失敗
string_cols = ['product_id', 'name', 'brand', 'vendor', 'category', 'month_str']
for col in string_cols:
if col in result_df.columns:
result_df[col] = result_df[col].fillna('').astype(str)
data = result_df[columns].to_dict('records')
response_data = {'data': data}
# V-Opt: 儲存到快取
_TABLE_DATA_CACHE[cache_key] = {'data': response_data, 'time': time.time()}
# V-Opt: 清理過期快取 (保留最近 50 個)
if len(_TABLE_DATA_CACHE) > 50:
sorted_keys = sorted(_TABLE_DATA_CACHE.keys(),
key=lambda k: _TABLE_DATA_CACHE[k]['time'])
for old_key in sorted_keys[:-50]:
del _TABLE_DATA_CACHE[old_key]
return jsonify(response_data)
except Exception as e:
sys_log.error(f"[API] Table Data Error: {e}")
import traceback
traceback.print_exc()
return jsonify({'error': str(e)}), 500
@sales_bp.route('/api/sales_analysis/table_data_pandas')
@login_required
def api_sales_table_data_pandas():
"""API: 取得業績分析的詳細列表資料 (使用 pandas 聚合 - 舊版本)"""
try:
table_name = 'realtime_sales_monthly'
data_range_months = int(request.args.get('data_range', '1'))
cache_key = f"{table_name}_{data_range_months}m"
target_df, cols_map, err = _get_filtered_sales_data(cache_key)
if err or target_df is None:
sys_log.warning(f"[API] Table Data: 快取不存在 ({cache_key}),返回空資料")
return jsonify({'data': []})
if target_df.empty:
return jsonify({'data': []})
col_name = cols_map.get('name')
col_amount = cols_map.get('amount')
col_qty = cols_map.get('qty')
col_cost = cols_map.get('cost')
col_profit = cols_map.get('profit')
col_category = cols_map.get('category')
col_vendor = cols_map.get('vendor')
col_date = cols_map.get('date')
col_brand = cols_map.get('brand')
col_return_qty = cols_map.get('return_qty')
selected_metric = request.args.get('metric', 'amount')
# 執行聚合 (V-Opt: 多維度聚合,增加精確度)
agg_rules = {col_amount: 'sum'}
if col_qty: agg_rules[col_qty] = 'sum'
if col_cost: agg_rules[col_cost] = 'sum'
if col_profit: agg_rules[col_profit] = 'sum'
if col_return_qty: agg_rules[col_return_qty] = 'sum'
if col_date: agg_rules['_month_str'] = lambda x: ', '.join(sorted(x.dropna().unique()))
# Group By 鍵值:商品名稱 + 品牌 + 廠商 + 分類 (確保唯一性)
group_cols = [col_name]
if col_brand: group_cols.append(col_brand)
if col_vendor: group_cols.append(col_vendor)
if col_category: group_cols.append(col_category)
df_agg = target_df.groupby(group_cols).agg(agg_rules).reset_index()
# 計算毛利率
if col_profit:
df_agg['agg_margin_rate'] = (df_agg[col_profit] / df_agg[col_amount]) * 100
elif col_cost:
df_agg['agg_margin_rate'] = ((df_agg[col_amount] - df_agg[col_cost]) / df_agg[col_amount]) * 100
else:
df_agg['agg_margin_rate'] = 0.0
df_agg['agg_margin_rate'] = df_agg['agg_margin_rate'].replace([np.inf, -np.inf, np.nan], 0)
# V-New: 計算平均單價與退貨率
if col_qty:
df_agg['avg_price'] = (df_agg[col_amount] / df_agg[col_qty]).fillna(0)
if col_return_qty:
df_agg['return_rate'] = (df_agg[col_return_qty] / df_agg[col_qty] * 100).fillna(0)
# 排序
sort_col_agg = col_amount
if selected_metric == 'qty' and col_qty:
sort_col_agg = col_qty
df_agg = df_agg.sort_values(by=sort_col_agg, ascending=False).head(1000) # 限制前 1000 筆
# 轉換為 DataTables 需要的格式
data = []
for i, row in enumerate(df_agg.to_dict('records')):
data.append({
'rank': i + 1,
'name': row.get(col_name, ''),
'brand': row.get(col_brand, ''),
'vendor': row.get(col_vendor, ''),
'category': row.get(col_category, ''),
'margin_rate': row.get('agg_margin_rate', 0),
'month_str': row.get('_month_str', ''),
'avg_price': row.get('avg_price', 0),
'return_rate': row.get('return_rate', 0),
'qty': row.get(col_qty, 0),
'amount': row.get(col_amount, 0)
})
return jsonify({'data': data})
except Exception as e:
sys_log.error(f"Table Data API Error: {e}")
return jsonify({'error': str(e)}), 500
@sales_bp.route('/api/sales_analysis/top_detail')
@login_required
def api_sales_top_detail():
"""API: 取得 Top N 詳細列表(業績貢獻王/獲利金雞母/人氣引流款)"""
try:
from datetime import datetime, timedelta, timezone
TAIPEI_TZ = timezone(timedelta(hours=8))
table_name = 'realtime_sales_monthly'
data_range_months = int(request.args.get('data_range', '1') or '1')
start_date = request.args.get('start_date', '') # V-New: 自訂開始日期
end_date = request.args.get('end_date', '') # V-New: 自訂結束日期
top_type = request.args.get('type', 'revenue') # revenue/margin/quantity
metric = request.args.get('metric', 'amount') # amount/profit/qty
view_type = request.args.get('view', 'product') # product/category
db = DatabaseManager()
# V-Fix: 從快取讀取欄位名稱對應,以支援不同的資料庫欄位名稱
if start_date or end_date:
cache_key = f"{table_name}_custom_{start_date}_{end_date}"
else:
cache_key = f"{table_name}_{data_range_months}m"
# 嘗試從快取讀取欄位名稱
cols_map = {}
if cache_key in _SALES_PROCESSED_CACHE:
cols_map = _SALES_PROCESSED_CACHE[cache_key].get('cols', {})
elif table_name in _SALES_PROCESSED_CACHE: # V-Fix: 也嘗試使用固定 key
cols_map = _SALES_PROCESSED_CACHE[table_name].get('cols', {})
# 取得實際欄位名稱(如果快取中沒有,使用預設名稱)
# V-Fix (2026-01-23): 使用 or 確保不會得到 None 值
col_name = cols_map.get('name') or '商品名稱'
col_brand = cols_map.get('brand') or '品牌'
col_vendor = cols_map.get('vendor') or '廠商名稱'
col_category = cols_map.get('category') or '商品館'
col_amount = cols_map.get('amount') or '總業績'
col_qty = cols_map.get('qty') or '數量'
col_cost = cols_map.get('cost') or '總成本'
col_profit = cols_map.get('profit') # 可以為 None
col_activity = cols_map.get('activity') or '活動名稱'
col_payment = cols_map.get('payment') or '付款方式'
# 建立日期篩選條件
date_filter = ""
# V-New: 優先處理自訂日期區間
if start_date or end_date:
# V-Fix: 處理日期格式轉換 (2025-01-01 -> 2025/01/01)
start_date_slash = start_date.replace('-', '/') if start_date else ''
end_date_slash = end_date.replace('-', '/') if end_date else ''
if start_date and end_date:
date_filter = f"""AND "日期" BETWEEN '{start_date_slash}' AND '{end_date_slash}'"""
elif start_date:
date_filter = f"""AND "日期" >= '{start_date_slash}'"""
else: # only end_date
date_filter = f"""AND "日期" <= '{end_date_slash}'"""
elif data_range_months > 0:
cutoff_date = (datetime.now(TAIPEI_TZ) - timedelta(days=data_range_months * 30)).strftime('%Y/%m/%d')
date_filter = f"""AND "日期" >= '{cutoff_date}'"""
# V-Fix: 補上其他所有篩選條件 (與 get_sales_table_data 一致)
category_filter = request.args.get('category', 'all')
brand_filter = request.args.get('brand', 'all')
vendor_filter = request.args.get('vendor', 'all')
activity_filter = request.args.get('activity', 'all')
payment_filter = request.args.get('payment', 'all')
month_filter = request.args.get('month', 'all')
dow_filter = request.args.get('dow', 'all')
hour_filter = request.args.get('hour', 'all')
min_price_str = request.args.get('min_price', '')
max_price_str = request.args.get('max_price', '')
min_margin_str = request.args.get('min_margin', '')
max_margin_str = request.args.get('max_margin', '')
keyword = request.args.get('keyword', '').strip()
additional_filters = []
if category_filter and category_filter != 'all':
additional_filters.append(f""""{col_category}" = '{category_filter}'""")
if brand_filter and brand_filter != 'all':
additional_filters.append(f""""{col_brand}" = '{brand_filter}'""")
if vendor_filter and vendor_filter != 'all':
additional_filters.append(f""""{col_vendor}" = '{vendor_filter}'""")
if activity_filter and activity_filter != 'all':
additional_filters.append(f""""{col_activity}" = '{activity_filter}'""")
if payment_filter and payment_filter != 'all':
additional_filters.append(f""""{col_payment}" = '{payment_filter}'""")
# 時間維度
if month_filter and month_filter != 'all':
month_filter_slash = month_filter.replace('-', '/')
# 使用「日期」欄位 (這似乎是系統內部固定欄位,不需 dynamic map除非資料表結構也變了)
# 假設 "日期" 是固定欄位
additional_filters.append(f"""("日期" LIKE '{month_filter}%' OR "日期" LIKE '{month_filter_slash}%')""")
if dow_filter and dow_filter != 'all':
# V-Fix (2026-01-23): 支援 PostgreSQL 和 SQLite (top_detail API)
pandas_dow = int(dow_filter)
if DATABASE_TYPE == 'postgresql':
pg_dow = (pandas_dow + 1) % 7
additional_filters.append(f"""EXTRACT(DOW FROM TO_DATE(REPLACE("日期", '/', '-'), 'YYYY-MM-DD')) = {pg_dow}""")
else:
sqlite_dow = str((pandas_dow + 1) % 7)
additional_filters.append(f"""strftime('%w', replace("日期", '/', '-')) = '{sqlite_dow}'""")
if hour_filter and hour_filter != 'all':
# V-Fix (2026-01-23): 支援 PostgreSQL 和 SQLite (top_detail API)
hour_val = int(hour_filter)
if DATABASE_TYPE == 'postgresql':
additional_filters.append(f"""CAST(SUBSTRING("時間" FROM 1 FOR 2) AS INTEGER) = {hour_val}""")
else:
additional_filters.append(f"""CAST(substr("時間", 1, 2) AS INTEGER) = {hour_val}""")
# 關鍵字
if keyword:
keyword_escaped = keyword.replace("'", "''")
k_conds = []
for col in [col_name, cols_map.get("pid", "商品ID"), col_brand, col_vendor]:
k_conds.append(f""""{col}" LIKE '%{keyword_escaped}%'""")
additional_filters.append(f"({' OR '.join(k_conds)})")
if (min_price_str or max_price_str):
price_sql = f'CAST("{col_amount}" AS FLOAT) / NULLIF("{col_qty}", 0)'
if min_price_str: additional_filters.append(f"{price_sql} >= {float(min_price_str)}")
if max_price_str: additional_filters.append(f"{price_sql} <= {float(max_price_str)}")
if (min_margin_str or max_margin_str):
if col_profit:
profit_sql = f'"{col_profit}"'
else:
profit_sql = f'("{col_amount}" - "{col_cost}")'
margin_sql = f'({profit_sql} * 100.0 / NULLIF("{col_amount}", 0))'
if min_margin_str: additional_filters.append(f"{margin_sql} >= {float(min_margin_str)}")
if max_margin_str: additional_filters.append(f"{margin_sql} <= {float(max_margin_str)}")
if additional_filters:
date_filter += " AND " + " AND ".join(additional_filters)
# V-New: 準備利潤計算 SQL 片段 (SELECT 子句使用 SUM 聚合)
if col_profit:
profit_select_sql = f'SUM(CAST("{col_profit}" AS REAL))'
else:
profit_select_sql = f'SUM(CAST("{col_amount}" AS REAL)) - SUM(CAST("{col_cost}" AS REAL))'
# 根據檢視類型和指標建立 SQL 查詢
if view_type == 'category':
# 分類排行
if metric == 'qty':
sql_query = f"""
SELECT
"{col_category}" as name,
SUM(CAST("{col_qty}" AS REAL)) as value
FROM {table_name}
WHERE "{col_category}" IS NOT NULL {date_filter}
GROUP BY "{col_category}"
ORDER BY value DESC
LIMIT 50
"""
elif metric == 'profit':
sql_query = f"""
SELECT
"{col_category}" as name,
{profit_select_sql} as value,
CASE
WHEN SUM(CAST("{col_amount}" AS REAL)) > 0
THEN (({profit_select_sql}) / SUM(CAST("{col_amount}" AS REAL))) * 100
ELSE 0
END as margin_rate
FROM {table_name}
WHERE "{col_category}" IS NOT NULL {date_filter}
GROUP BY "{col_category}"
ORDER BY value DESC
LIMIT 50
"""
else: # amount
sql_query = f"""
SELECT
"{col_category}" as name,
SUM(CAST("{col_amount}" AS REAL)) as value
FROM {table_name}
WHERE "{col_category}" IS NOT NULL {date_filter}
GROUP BY "{col_category}"
ORDER BY value DESC
LIMIT 50
"""
else:
# 商品排行包含商品ID
pid_col_sql = f'"{cols_map.get("pid", "商品ID")}"' # 商品ID 欄位
if metric == 'qty':
sql_query = f"""
SELECT
{pid_col_sql} as product_id,
"{col_name}" as name,
"{col_brand}" as brand,
"{col_vendor}" as vendor,
"{col_category}" as category,
SUM(CAST("{col_qty}" AS REAL)) as value
FROM {table_name}
WHERE "{col_name}" IS NOT NULL {date_filter}
GROUP BY {pid_col_sql}, "{col_name}", "{col_brand}", "{col_vendor}", "{col_category}"
ORDER BY value DESC
LIMIT 100
"""
elif metric == 'profit':
sql_query = f"""
SELECT
{pid_col_sql} as product_id,
"{col_name}" as name,
"{col_brand}" as brand,
"{col_vendor}" as vendor,
"{col_category}" as category,
{profit_select_sql} as value,
CASE
WHEN SUM(CAST("{col_amount}" AS REAL)) > 0
THEN (({profit_select_sql}) / SUM(CAST("{col_amount}" AS REAL))) * 100
ELSE 0
END as margin_rate
FROM {table_name}
WHERE "{col_name}" IS NOT NULL {date_filter}
GROUP BY {pid_col_sql}, "{col_name}", "{col_brand}", "{col_vendor}", "{col_category}"
ORDER BY value DESC
LIMIT 100
"""
else: # amount
sql_query = f"""
SELECT
{pid_col_sql} as product_id,
"{col_name}" as name,
"{col_brand}" as brand,
"{col_vendor}" as vendor,
"{col_category}" as category,
SUM(CAST("{col_amount}" AS REAL)) as value
FROM {table_name}
WHERE "{col_name}" IS NOT NULL {date_filter}
GROUP BY {pid_col_sql}, "{col_name}", "{col_brand}", "{col_vendor}", "{col_category}"
ORDER BY value DESC
LIMIT 100
"""
# 執行查詢
df = pd.read_sql(sql_query, db.engine)
sys_log.info(f"[API] Top Detail: {top_type}/{view_type} 返回 {len(df)} 筆資料")
if df.empty:
return jsonify({'items': []})
# V-Fix (2026-01-23): 確保數值欄位無 NaN/Infinity避免 JSON 序列化失敗
numeric_cols = ['value', 'margin_rate']
for col in numeric_cols:
if col in df.columns:
df[col] = df[col].replace([np.inf, -np.inf], 0).fillna(0)
# V-Fix (2026-01-23): 確保字串欄位無 None
string_cols = ['product_id', 'name', 'brand', 'vendor', 'category']
for col in string_cols:
if col in df.columns:
df[col] = df[col].fillna('').astype(str)
# 轉換為 JSON
items = df.to_dict('records')
return jsonify({'items': items})
except Exception as e:
sys_log.error(f"[API] Top Detail Error: {e}")
import traceback
traceback.print_exc()
return jsonify({'error': str(e)}), 500
@sales_bp.route('/api/sales_analysis/export_top_detail')
@login_required
def api_export_top_detail():
"""API: 匯出 Top N 詳細列表為 Excel"""
try:
from datetime import datetime, timedelta, timezone
import io
TAIPEI_TZ = timezone(timedelta(hours=8))
table_name = 'realtime_sales_monthly'
data_range_months = int(request.args.get('data_range', '1') or '1')
start_date = request.args.get('start_date', '') # V-New: 自訂開始日期
end_date = request.args.get('end_date', '') # V-New: 自訂結束日期
top_type = request.args.get('type', 'revenue')
metric = request.args.get('metric', 'amount')
view_type = request.args.get('view', 'product')
db = DatabaseManager()
# V-Fix: 從快取讀取欄位名稱對應,以支援不同的資料庫欄位名稱
if start_date or end_date:
cache_key = f"{table_name}_custom_{start_date}_{end_date}"
else:
cache_key = f"{table_name}_{data_range_months}m"
# 嘗試從快取讀取欄位名稱
cols_map = {}
if cache_key in _SALES_PROCESSED_CACHE:
cols_map = _SALES_PROCESSED_CACHE[cache_key].get('cols', {})
elif table_name in _SALES_PROCESSED_CACHE: # V-Fix: 也嘗試使用固定 key
cols_map = _SALES_PROCESSED_CACHE[table_name].get('cols', {})
# 取得實際欄位名稱(如果快取中沒有,使用預設名稱)
# V-Fix (2026-01-23): 使用 or 確保不會得到 None 值
col_name = cols_map.get('name') or '商品名稱'
col_brand = cols_map.get('brand') or '品牌'
col_vendor = cols_map.get('vendor') or '廠商名稱'
col_category = cols_map.get('category') or '商品館'
col_amount = cols_map.get('amount') or '總業績'
col_qty = cols_map.get('qty') or '數量'
col_cost = cols_map.get('cost') or '總成本'
col_profit = cols_map.get('profit') # 可以為 None
col_activity = cols_map.get('activity') or '活動名稱'
col_payment = cols_map.get('payment') or '付款方式'
# 建立日期篩選條件
date_filter = ""
# V-New: 優先處理自訂日期區間
if start_date or end_date:
# V-Fix: 處理日期格式轉換 (2025-01-01 -> 2025/01/01)
start_date_slash = start_date.replace('-', '/') if start_date else ''
end_date_slash = end_date.replace('-', '/') if end_date else ''
if start_date and end_date:
date_filter = f"""AND "日期" BETWEEN '{start_date_slash}' AND '{end_date_slash}'"""
elif start_date:
date_filter = f"""AND "日期" >= '{start_date_slash}'"""
else: # only end_date
date_filter = f"""AND "日期" <= '{end_date_slash}'"""
elif data_range_months > 0:
cutoff_date = (datetime.now(TAIPEI_TZ) - timedelta(days=data_range_months * 30)).strftime('%Y/%m/%d')
date_filter = f"""AND "日期" >= '{cutoff_date}'"""
# V-Fix: 補上其他所有篩選條件 (與 get_top_detail 一致)
category_filter = request.args.get('category', 'all')
brand_filter = request.args.get('brand', 'all')
vendor_filter = request.args.get('vendor', 'all')
activity_filter = request.args.get('activity', 'all')
payment_filter = request.args.get('payment', 'all')
month_filter = request.args.get('month', 'all')
dow_filter = request.args.get('dow', 'all')
hour_filter = request.args.get('hour', 'all')
min_price_str = request.args.get('min_price', '')
max_price_str = request.args.get('max_price', '')
min_margin_str = request.args.get('min_margin', '')
max_margin_str = request.args.get('max_margin', '')
keyword = request.args.get('keyword', '').strip()
additional_filters = []
if category_filter and category_filter != 'all':
additional_filters.append(f""""{col_category}" = '{category_filter}'""")
if brand_filter and brand_filter != 'all':
additional_filters.append(f""""{col_brand}" = '{brand_filter}'""")
if vendor_filter and vendor_filter != 'all':
additional_filters.append(f""""{col_vendor}" = '{vendor_filter}'""")
if activity_filter and activity_filter != 'all':
additional_filters.append(f""""{col_activity}" = '{activity_filter}'""")
if payment_filter and payment_filter != 'all':
additional_filters.append(f""""{col_payment}" = '{payment_filter}'""")
if month_filter and month_filter != 'all':
month_filter_slash = month_filter.replace('-', '/')
additional_filters.append(f"""("日期" LIKE '{month_filter}%' OR "日期" LIKE '{month_filter_slash}%')""")
if dow_filter and dow_filter != 'all':
# V-Fix (2026-01-23): 支援 PostgreSQL 和 SQLite (export API)
pandas_dow = int(dow_filter)
if DATABASE_TYPE == 'postgresql':
pg_dow = (pandas_dow + 1) % 7
additional_filters.append(f"""EXTRACT(DOW FROM TO_DATE(REPLACE("日期", '/', '-'), 'YYYY-MM-DD')) = {pg_dow}""")
else:
sqlite_dow = str((pandas_dow + 1) % 7)
additional_filters.append(f"""strftime('%w', replace("日期", '/', '-')) = '{sqlite_dow}'""")
if hour_filter and hour_filter != 'all':
# V-Fix (2026-01-23): 支援 PostgreSQL 和 SQLite (export API)
hour_val = int(hour_filter)
if DATABASE_TYPE == 'postgresql':
additional_filters.append(f"""CAST(SUBSTRING("時間" FROM 1 FOR 2) AS INTEGER) = {hour_val}""")
else:
additional_filters.append(f"""CAST(substr("時間", 1, 2) AS INTEGER) = {hour_val}""")
if keyword:
keyword_escaped = keyword.replace("'", "''")
k_conds = []
for col in [col_name, cols_map.get("pid", "商品ID"), col_brand, col_vendor]:
k_conds.append(f""""{col}" LIKE '%{keyword_escaped}%'""")
additional_filters.append(f"({' OR '.join(k_conds)})")
if (min_price_str or max_price_str):
price_sql = f'CAST("{col_amount}" AS FLOAT) / NULLIF("{col_qty}", 0)'
if min_price_str: additional_filters.append(f"{price_sql} >= {float(min_price_str)}")
if max_price_str: additional_filters.append(f"{price_sql} <= {float(max_price_str)}")
if (min_margin_str or max_margin_str):
if col_profit:
profit_sql = f'"{col_profit}"'
else:
profit_sql = f'("{col_amount}" - "{col_cost}")'
margin_sql = f'({profit_sql} * 100.0 / NULLIF("{col_amount}", 0))'
if min_margin_str: additional_filters.append(f"{margin_sql} >= {float(min_margin_str)}")
if max_margin_str: additional_filters.append(f"{margin_sql} <= {float(max_margin_str)}")
if additional_filters:
date_filter += " AND " + " AND ".join(additional_filters)
# V-New: 準備利潤計算 SQL 片段 (SELECT 子句使用 SUM 聚合)
if col_profit:
profit_select_sql = f'SUM(CAST("{col_profit}" AS REAL))'
else:
profit_select_sql = f'SUM(CAST("{col_amount}" AS REAL)) - SUM(CAST("{col_cost}" AS REAL))'
# 根據檢視類型和指標建立 SQL 查詢(與上面相同)
if view_type == 'category':
if metric == 'qty':
sql_query = f"""
SELECT
"{col_category}" as 分類名稱,
SUM(CAST("{col_qty}" AS REAL)) as 銷售數量
FROM {table_name}
WHERE "{col_category}" IS NOT NULL {date_filter}
GROUP BY "{col_category}"
ORDER BY 銷售數量 DESC
LIMIT 50
"""
elif metric == 'profit':
sql_query = f"""
SELECT
"{col_category}" as 分類名稱,
{profit_select_sql} as 毛利金額,
CASE
WHEN SUM(CAST("{col_amount}" AS REAL)) > 0
THEN (({profit_select_sql}) / SUM(CAST("{col_amount}" AS REAL))) * 100
ELSE 0
END as 毛利率
FROM {table_name}
WHERE "{col_category}" IS NOT NULL {date_filter}
GROUP BY "{col_category}"
ORDER BY 毛利金額 DESC
LIMIT 50
"""
else: # amount
sql_query = f"""
SELECT
"{col_category}" as 分類名稱,
SUM(CAST("{col_amount}" AS REAL)) as 銷售金額
FROM {table_name}
WHERE "{col_category}" IS NOT NULL {date_filter}
GROUP BY "{col_category}"
ORDER BY 銷售金額 DESC
LIMIT 50
"""
else:
# 商品排行包含商品ID
if metric == 'qty':
sql_query = f"""
SELECT
"{cols_map.get("pid", "商品ID")}" as 商品ID,
"{col_name}" as 商品名稱,
"{col_brand}" as 品牌,
"{col_vendor}" as 廠商名稱,
"{col_category}" as 分類名稱,
SUM(CAST("{col_qty}" AS REAL)) as 銷售數量
FROM {table_name}
WHERE "{col_name}" IS NOT NULL {date_filter}
GROUP BY "{cols_map.get("pid", "商品ID")}", "{col_name}", "{col_brand}", "{col_vendor}", "{col_category}"
ORDER BY 銷售數量 DESC
LIMIT 100
"""
elif metric == 'profit':
sql_query = f"""
SELECT
"{cols_map.get("pid", "商品ID")}" as 商品ID,
"{col_name}" as 商品名稱,
"{col_brand}" as 品牌,
"{col_vendor}" as 廠商名稱,
"{col_category}" as 分類名稱,
{profit_select_sql} as 毛利金額,
CASE
WHEN SUM(CAST("{col_amount}" AS REAL)) > 0
THEN (({profit_select_sql}) / SUM(CAST("{col_amount}" AS REAL))) * 100
ELSE 0
END as 毛利率
FROM {table_name}
WHERE "{col_name}" IS NOT NULL {date_filter}
GROUP BY "{cols_map.get("pid", "商品ID")}", "{col_name}", "{col_brand}", "{col_vendor}", "{col_category}"
ORDER BY 毛利金額 DESC
LIMIT 100
"""
else: # amount
sql_query = f"""
SELECT
"{cols_map.get("pid", "商品ID")}" as 商品ID,
"{col_name}" as 商品名稱,
"{col_brand}" as 品牌,
"{col_vendor}" as 廠商名稱,
"{col_category}" as 分類名稱,
SUM(CAST("{col_amount}" AS REAL)) as 銷售金額
FROM {table_name}
WHERE "{col_name}" IS NOT NULL {date_filter}
GROUP BY "{cols_map.get("pid", "商品ID")}", "{col_name}", "{col_brand}", "{col_vendor}", "{col_category}"
ORDER BY 銷售金額 DESC
LIMIT 100
"""
# 執行查詢並匯出
df = pd.read_sql(sql_query, db.engine)
if df.empty:
return "無資料可匯出", 400
# 生成 Excel
output = io.BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
df.to_excel(writer, index=False, sheet_name='Top排行')
output.seek(0)
# 生成檔案名稱
type_names = {'revenue': '業績貢獻王', 'margin': '獲利金雞母', 'quantity': '人氣引流款'}
view_names = {'product': '商品排行', 'category': '分類排行'}
filename = f"{type_names.get(top_type, '排行')}_{view_names.get(view_type, '')}_{datetime.now(TAIPEI_TZ).strftime('%Y%m%d_%H%M')}.xlsx"
sys_log.info(f"[Export] Top Detail: {filename} ({len(df)} 筆)")
return send_file(
output,
as_attachment=True,
download_name=filename,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
)
except Exception as e:
sys_log.error(f"[Export] Top Detail Error: {e}")
return f"匯出失敗: {e}", 500
@sales_bp.route('/api/sales_analysis/yoy_comparison')
@login_required
def api_yoy_comparison():
"""
API: 年度對比分析 (YoY Comparison)
參數:
year1: 基準年 (例如 2024)
year2: 對比年 (例如 2025)
month: 月份 (可選1-12不帶則為全年)
metric: 指標 (revenue/qty/profit)
回傳:
JSON with year1 total, year2 total, growth rate, and monthly breakdown
"""
try:
from datetime import datetime, timedelta, timezone
TAIPEI_TZ = timezone(timedelta(hours=8))
table_name = 'realtime_sales_monthly'
year1 = request.args.get('year1', '2024')
year2 = request.args.get('year2', '2025')
month = request.args.get('month', '') # 可選1-12
metric = request.args.get('metric', 'revenue') # revenue/qty/profit
db = DatabaseManager()
# 欄位名稱
col_amount = '總業績'
col_qty = '數量'
col_cost = '總成本'
col_date = '日期'
# 根據指標決定聚合欄位
if metric == 'qty':
agg_sql = f'SUM(CAST("{col_qty}" AS REAL))'
metric_label = '銷售數量'
elif metric == 'profit':
agg_sql = f'SUM(CAST("{col_amount}" AS REAL)) - SUM(CAST("{col_cost}" AS REAL))'
metric_label = '毛利金額'
else: # revenue
agg_sql = f'SUM(CAST("{col_amount}" AS REAL))'
metric_label = '銷售金額'
# 建立年度篩選條件
# 日期格式為 2025/01/01 或 2025-01-01
def build_year_filter(year, month_filter=''):
if month_filter:
month_str = month_filter.zfill(2)
return f"""("{col_date}" LIKE '{year}/{month_str}%' OR "{col_date}" LIKE '{year}-{month_str}%')"""
else:
return f"""("{col_date}" LIKE '{year}/%' OR "{col_date}" LIKE '{year}-%')"""
year1_filter = build_year_filter(year1, month)
year2_filter = build_year_filter(year2, month)
# 查詢年度總計
sql_year1 = f"""
SELECT {agg_sql} as total
FROM {table_name}
WHERE {year1_filter}
"""
sql_year2 = f"""
SELECT {agg_sql} as total
FROM {table_name}
WHERE {year2_filter}
"""
# V-Fix: SQLAlchemy 2.0 需要使用 text() 包裹 SQL 字串
from sqlalchemy import text
result_year1 = pd.read_sql(text(sql_year1), db.engine)
result_year2 = pd.read_sql(text(sql_year2), db.engine)
total_year1 = float(result_year1['total'].iloc[0] or 0)
total_year2 = float(result_year2['total'].iloc[0] or 0)
# 計算成長率
if total_year1 > 0:
growth_rate = ((total_year2 - total_year1) / total_year1) * 100
else:
growth_rate = 0 if total_year2 == 0 else 100
# 月度明細 (如果沒有指定月份,則查詢 12 個月的明細)
monthly_breakdown = []
if not month:
for m in range(1, 13):
m_str = str(m).zfill(2)
y1_filter = build_year_filter(year1, m_str)
y2_filter = build_year_filter(year2, m_str)
sql_m1 = f"SELECT {agg_sql} as total FROM {table_name} WHERE {y1_filter}"
sql_m2 = f"SELECT {agg_sql} as total FROM {table_name} WHERE {y2_filter}"
# V-Fix: SQLAlchemy 2.0 需要使用 text()
r1 = pd.read_sql(text(sql_m1), db.engine)
r2 = pd.read_sql(text(sql_m2), db.engine)
v1 = float(r1['total'].iloc[0] or 0)
v2 = float(r2['total'].iloc[0] or 0)
m_growth = ((v2 - v1) / v1 * 100) if v1 > 0 else (0 if v2 == 0 else 100)
monthly_breakdown.append({
'month': m,
'month_label': f'{m}',
'year1_value': v1,
'year2_value': v2,
'growth_rate': round(m_growth, 2)
})
response = {
'year1': {
'label': f'{year1}' + (f'{month}' if month else ''),
'total': total_year1
},
'year2': {
'label': f'{year2}' + (f'{month}' if month else ''),
'total': total_year2
},
'growth_rate': round(growth_rate, 2),
'metric': metric,
'metric_label': metric_label,
'monthly_breakdown': monthly_breakdown
}
sys_log.info(f"[YoY] {year1} vs {year2}: {total_year1:,.0f} -> {total_year2:,.0f} ({growth_rate:+.1f}%)")
return jsonify(response)
except Exception as e:
sys_log.error(f"[YoY] Error: {e}")
traceback.print_exc()
return jsonify({'error': str(e)}), 500