1015 lines
41 KiB
Python
1015 lines
41 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
當日業績路由模組
|
||
包含:當日業績看板、分類明細匯出、行銷活動分析
|
||
"""
|
||
|
||
import io
|
||
import calendar
|
||
import hashlib
|
||
import os
|
||
import pickle
|
||
from datetime import datetime, timezone, timedelta
|
||
from urllib.parse import quote
|
||
from flask import Blueprint, request, render_template, send_file, url_for
|
||
from auth import login_required
|
||
from sqlalchemy import inspect, text
|
||
import pandas as pd
|
||
|
||
from config import BASE_DIR, SYSTEM_VERSION
|
||
from database.manager import DatabaseManager
|
||
from services.logger_manager import SystemLogger
|
||
from utils.df_helpers import find_col
|
||
from services.daily_sales_service import (
|
||
get_taiwan_holiday,
|
||
prepare_calendar_data,
|
||
prepare_marketing_summary,
|
||
)
|
||
from services.competitor_intel_repository import build_competitor_intel_payload
|
||
from services.cache_manager import (
|
||
_DAILY_SALES_PROCESSED_CACHE as _SALES_PROCESSED_CACHE,
|
||
_DAILY_SALES_VIEW_CACHE_DIR,
|
||
clear_daily_sales_cache as _clear_daily_sales_cache,
|
||
)
|
||
|
||
# 時區設定
|
||
TAIPEI_TZ = timezone(timedelta(hours=8))
|
||
|
||
# Logger
|
||
sys_log = SystemLogger("DailySalesRoutes").get_logger()
|
||
|
||
# Blueprint 定義
|
||
daily_sales_bp = Blueprint('daily_sales', __name__)
|
||
|
||
_CACHE_EXPIRY_SECONDS = 300 # 5 分鐘緩存過期
|
||
_VIEW_CACHE_EXPIRY_SECONDS = 120
|
||
_SHARED_VIEW_CACHE_EXPIRY_SECONDS = 1800
|
||
_DAILY_SALES_VIEW_CACHE = {}
|
||
_DAILY_SALES_VIEW_CACHE_MAX = 24
|
||
_CATEGORY_TABLE_DEFAULT_LIMIT = 120
|
||
|
||
|
||
def _get_daily_view_cache(cache_key):
|
||
cache_data = _DAILY_SALES_VIEW_CACHE.get(cache_key)
|
||
if not cache_data:
|
||
return None
|
||
elapsed = (datetime.now() - cache_data['timestamp']).total_seconds()
|
||
if elapsed >= _VIEW_CACHE_EXPIRY_SECONDS:
|
||
_DAILY_SALES_VIEW_CACHE.pop(cache_key, None)
|
||
return None
|
||
return cache_data['context']
|
||
|
||
|
||
def _set_daily_view_cache(cache_key, context):
|
||
if len(_DAILY_SALES_VIEW_CACHE) >= _DAILY_SALES_VIEW_CACHE_MAX:
|
||
oldest_key = min(
|
||
_DAILY_SALES_VIEW_CACHE,
|
||
key=lambda key: _DAILY_SALES_VIEW_CACHE[key]['timestamp']
|
||
)
|
||
_DAILY_SALES_VIEW_CACHE.pop(oldest_key, None)
|
||
_DAILY_SALES_VIEW_CACHE[cache_key] = {
|
||
'context': context,
|
||
'timestamp': datetime.now()
|
||
}
|
||
|
||
|
||
def _daily_view_cache_file(cache_key):
|
||
digest = hashlib.md5(cache_key.encode('utf-8'), usedforsecurity=False).hexdigest()
|
||
return _DAILY_SALES_VIEW_CACHE_DIR / f"{digest}.pkl"
|
||
|
||
|
||
def _get_shared_daily_view_cache(cache_key):
|
||
path = _daily_view_cache_file(cache_key)
|
||
if not path.exists():
|
||
return None
|
||
try:
|
||
if (datetime.now().timestamp() - path.stat().st_mtime) >= _SHARED_VIEW_CACHE_EXPIRY_SECONDS:
|
||
path.unlink(missing_ok=True)
|
||
return None
|
||
with open(path, 'rb') as f:
|
||
payload = pickle.load(f)
|
||
if payload.get('version') != SYSTEM_VERSION:
|
||
return None
|
||
return payload.get('context')
|
||
except Exception as exc:
|
||
sys_log.warning(f"[DailySales] 共享 view cache 讀取失敗: {exc}")
|
||
return None
|
||
|
||
|
||
def _set_shared_daily_view_cache(cache_key, context):
|
||
path = _daily_view_cache_file(cache_key)
|
||
tmp_path = path.with_suffix(f".{os.getpid()}.tmp")
|
||
try:
|
||
os.makedirs(path.parent, exist_ok=True)
|
||
with open(tmp_path, 'wb') as f:
|
||
pickle.dump({'version': SYSTEM_VERSION, 'context': context}, f, protocol=pickle.HIGHEST_PROTOCOL)
|
||
os.replace(tmp_path, path)
|
||
except Exception as exc:
|
||
sys_log.warning(f"[DailySales] 共享 view cache 寫入失敗: {exc}")
|
||
try:
|
||
if tmp_path.exists():
|
||
tmp_path.unlink()
|
||
except OSError:
|
||
pass
|
||
|
||
|
||
def _daily_sales_url_with_detail(detail_value):
|
||
args = request.args.to_dict(flat=True)
|
||
if detail_value == 'all':
|
||
args['detail'] = 'all'
|
||
else:
|
||
args.pop('detail', None)
|
||
return url_for('daily_sales.daily_sales', **args)
|
||
|
||
|
||
def clear_daily_sales_cache():
|
||
"""清除當日業績緩存(供匯入服務調用)"""
|
||
_clear_daily_sales_cache()
|
||
_DAILY_SALES_VIEW_CACHE.clear()
|
||
sys_log.info("已清除當日業績緩存")
|
||
|
||
|
||
# === 清除緩存 API 端點 ===
|
||
@daily_sales_bp.route('/api/daily_sales/clear_cache', methods=['POST'])
|
||
@login_required
|
||
def api_clear_daily_sales_cache():
|
||
"""手動清除快取(保留為 ops escape hatch;正常失效靠 DB fingerprint)"""
|
||
cache_size = len(_SALES_PROCESSED_CACHE) + len(_DAILY_SALES_VIEW_CACHE)
|
||
_clear_daily_sales_cache()
|
||
_DAILY_SALES_VIEW_CACHE.clear()
|
||
sys_log.info(f"[API] 已清除當日業績緩存 (原有 {cache_size} 個緩存項目)")
|
||
return {'success': True, 'message': f'已清除 {cache_size} 個緩存項目'}
|
||
|
||
|
||
def _get_data_fingerprint(engine, table_name='daily_sales_snapshot'):
|
||
"""DB 指紋 (max_snapshot_date, row_count):資料一變動指紋就跳號,
|
||
讓 4 worker 的 in-memory cache 在下一次 request 自然失效。"""
|
||
try:
|
||
validate_table_name(table_name)
|
||
if engine.dialect.name == 'postgresql':
|
||
fingerprint_sql = f'SELECT MAX("snapshot_date"::date)::text, COUNT(*) FROM "{table_name}"'
|
||
else:
|
||
fingerprint_sql = f'SELECT CAST(MAX(snapshot_date) AS TEXT), COUNT(*) FROM "{table_name}"'
|
||
with engine.connect() as conn:
|
||
row = conn.execute(text(fingerprint_sql)).fetchone()
|
||
return (row[0], row[1]) if row else (None, 0)
|
||
except Exception as e:
|
||
sys_log.warning(f"[Cache] fingerprint 查詢失敗(保守維持現有 cache): {e}")
|
||
return None
|
||
|
||
|
||
def _is_cache_valid(cache_key, engine=None, table_name='daily_sales_snapshot'):
|
||
if cache_key not in _SALES_PROCESSED_CACHE:
|
||
return False
|
||
cache_data = _SALES_PROCESSED_CACHE[cache_key]
|
||
if 'timestamp' not in cache_data:
|
||
return False
|
||
elapsed = (datetime.now() - cache_data['timestamp']).total_seconds()
|
||
if elapsed >= _CACHE_EXPIRY_SECONDS:
|
||
return False
|
||
if engine is not None:
|
||
current_fp = _get_data_fingerprint(engine, table_name)
|
||
if current_fp is None:
|
||
return True
|
||
if current_fp != cache_data.get('fingerprint'):
|
||
return False
|
||
return True
|
||
|
||
|
||
def _snapshot_date_expr(engine):
|
||
if engine.dialect.name == 'postgresql':
|
||
return '"snapshot_date"::date'
|
||
return 'date("snapshot_date")'
|
||
|
||
|
||
def _snapshot_date_window_clause(engine):
|
||
date_expr = _snapshot_date_expr(engine)
|
||
if engine.dialect.name == 'postgresql':
|
||
return f'{date_expr} >= CAST(:start_date AS date) AND {date_expr} <= CAST(:end_date AS date)'
|
||
return f'{date_expr} >= date(:start_date) AND {date_expr} <= date(:end_date)'
|
||
|
||
|
||
def _date_param(value):
|
||
return pd.to_datetime(value).strftime('%Y-%m-%d')
|
||
|
||
|
||
def _get_available_daily_dates(engine, table_name='daily_sales_snapshot'):
|
||
"""取得可選日期清單,避免為了 date selector 載入整張業績表。"""
|
||
validate_table_name(table_name)
|
||
date_expr = _snapshot_date_expr(engine)
|
||
|
||
query = text(
|
||
f'SELECT DISTINCT {date_expr} AS snapshot_date '
|
||
f'FROM "{table_name}" '
|
||
'WHERE "snapshot_date" IS NOT NULL '
|
||
'ORDER BY snapshot_date DESC'
|
||
)
|
||
with engine.connect() as conn:
|
||
rows = conn.execute(query).fetchall()
|
||
|
||
dates = []
|
||
for row in rows:
|
||
try:
|
||
dates.append(pd.to_datetime(row[0]).normalize())
|
||
except Exception:
|
||
continue
|
||
return dates
|
||
|
||
|
||
def _get_daily_sales_metadata(engine, table_name='daily_sales_snapshot'):
|
||
"""一次取得日期選單與資料指紋,避免首屏每次掃兩輪 daily snapshot。"""
|
||
validate_table_name(table_name)
|
||
if engine.dialect.name != 'postgresql':
|
||
return (
|
||
_get_available_daily_dates(engine, table_name),
|
||
_get_data_fingerprint(engine, table_name),
|
||
)
|
||
|
||
query = text(
|
||
f'SELECT MAX("snapshot_date"::date)::text AS max_snapshot_date, '
|
||
f'COUNT(*) AS row_count, '
|
||
f'ARRAY_AGG(DISTINCT "snapshot_date"::date ORDER BY "snapshot_date"::date DESC) '
|
||
f'FILTER (WHERE "snapshot_date" IS NOT NULL) AS available_dates '
|
||
f'FROM "{table_name}"'
|
||
)
|
||
try:
|
||
with engine.connect() as conn:
|
||
row = conn.execute(query).fetchone()
|
||
if not row:
|
||
return [], (None, 0)
|
||
raw_dates = row[2] or []
|
||
dates = []
|
||
for raw_date in raw_dates:
|
||
try:
|
||
dates.append(pd.to_datetime(raw_date).normalize())
|
||
except Exception:
|
||
continue
|
||
return dates, (row[0], row[1] or 0)
|
||
except Exception as exc:
|
||
sys_log.warning(f"[DailySales] metadata 查詢失敗,改用相容查詢: {exc}")
|
||
return (
|
||
_get_available_daily_dates(engine, table_name),
|
||
_get_data_fingerprint(engine, table_name),
|
||
)
|
||
|
||
|
||
def _read_daily_sales_window(engine, table_name, start_date, end_date):
|
||
"""只讀取畫面需要的日期窗口,降低冷 worker 首頁等待時間。"""
|
||
table_name = validate_table_name(table_name)
|
||
date_expr = _snapshot_date_expr(engine)
|
||
where_clause = _snapshot_date_window_clause(engine)
|
||
query = text(
|
||
f'SELECT * FROM "{table_name}" '
|
||
f'WHERE {where_clause} '
|
||
f'ORDER BY {date_expr} ASC'
|
||
)
|
||
return pd.read_sql(
|
||
query,
|
||
engine,
|
||
params={
|
||
'start_date': _date_param(start_date),
|
||
'end_date': _date_param(end_date),
|
||
},
|
||
)
|
||
|
||
|
||
# ==========================================
|
||
# 輔助函數
|
||
# ==========================================
|
||
|
||
# 共用工具改 import 自 utils(去重,原本檔案內定義已移除)
|
||
from utils.security import validate_table_name, safe_read_sql # noqa: E402, F401
|
||
|
||
|
||
def preprocess_daily_sales_data(df):
|
||
"""前處理當日業績資料:欄位識別、型別轉換"""
|
||
cols = df.columns.tolist()
|
||
|
||
col_amount = find_col(cols, ['銷售金額', '業績', '金額', 'Amount', '總業績'])
|
||
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
|
||
col_profit = find_col(cols, ['毛利', 'Profit'])
|
||
col_qty = find_col(cols, ['銷售數量', '銷量', 'Qty', '數量'])
|
||
|
||
if col_amount:
|
||
df[col_amount] = pd.to_numeric(df[col_amount], errors='coerce').fillna(0)
|
||
if col_cost:
|
||
df[col_cost] = pd.to_numeric(df[col_cost], errors='coerce').fillna(0)
|
||
if col_profit:
|
||
df[col_profit] = pd.to_numeric(df[col_profit], errors='coerce').fillna(0)
|
||
if col_qty:
|
||
df[col_qty] = pd.to_numeric(df[col_qty], errors='coerce').fillna(0)
|
||
|
||
df['snapshot_date'] = pd.to_datetime(df['snapshot_date'], errors='coerce')
|
||
|
||
return df
|
||
|
||
|
||
def calculate_daily_kpis(df, date_str):
|
||
"""計算單日 6 個 KPI"""
|
||
day_df = df[df['snapshot_date'] == date_str]
|
||
cols = day_df.columns.tolist()
|
||
|
||
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
|
||
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
|
||
col_profit = find_col(cols, ['毛利', 'Profit'])
|
||
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
|
||
col_name = find_col(cols, ['商品名稱', '品名', 'Name'])
|
||
|
||
total_revenue = float(day_df[col_amount].sum()) if col_amount else 0
|
||
total_cost = float(day_df[col_cost].sum()) if col_cost else 0
|
||
gross_margin = float(day_df[col_profit].sum()) if col_profit else (total_revenue - total_cost)
|
||
total_qty = float(day_df[col_qty].sum()) if col_qty else 0
|
||
sku_count = int(day_df[col_name].nunique()) if col_name else 0
|
||
avg_price = total_revenue / total_qty if total_qty > 0 else 0
|
||
|
||
return {
|
||
'total_revenue': total_revenue,
|
||
'total_cost': total_cost,
|
||
'gross_margin': gross_margin,
|
||
'total_qty': total_qty,
|
||
'sku_count': sku_count,
|
||
'avg_price': avg_price
|
||
}
|
||
|
||
|
||
def calculate_dod(df, current_date):
|
||
"""計算 Day-over-Day 變化率"""
|
||
current = calculate_daily_kpis(df, current_date)
|
||
prev_date = current_date - timedelta(days=1)
|
||
|
||
if prev_date not in df['snapshot_date'].values:
|
||
return {k: 0.0 for k in current.keys()}
|
||
|
||
previous = calculate_daily_kpis(df, prev_date)
|
||
|
||
dod = {}
|
||
for key in current:
|
||
if previous[key] > 0:
|
||
dod[key] = ((current[key] - previous[key]) / previous[key]) * 100
|
||
else:
|
||
dod[key] = 0.0
|
||
return dod
|
||
|
||
|
||
def calculate_wow(df, current_date):
|
||
"""計算 Week-over-Week 變化率"""
|
||
current = calculate_daily_kpis(df, current_date)
|
||
prev_week_date = current_date - timedelta(days=7)
|
||
|
||
if prev_week_date not in df['snapshot_date'].values:
|
||
return {k: 0.0 for k in current.keys()}
|
||
|
||
previous = calculate_daily_kpis(df, prev_week_date)
|
||
|
||
wow = {}
|
||
for key in current:
|
||
if previous[key] > 0:
|
||
wow[key] = ((current[key] - previous[key]) / previous[key]) * 100
|
||
else:
|
||
wow[key] = 0.0
|
||
return wow
|
||
|
||
|
||
def prepare_daily_charts(df, selected_date, days=30):
|
||
"""準備 4 個圖表的數據(根據選擇的日期)"""
|
||
start_date = selected_date - timedelta(days=days)
|
||
df_range = df[(df['snapshot_date'] >= start_date) & (df['snapshot_date'] <= selected_date)]
|
||
|
||
cols = df_range.columns.tolist()
|
||
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
|
||
col_cost = find_col(cols, ['成本', '總成本'])
|
||
col_profit = find_col(cols, ['毛利'])
|
||
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
|
||
col_name = find_col(cols, ['商品名稱', '品名'])
|
||
|
||
agg_dict = {}
|
||
if col_amount:
|
||
agg_dict[col_amount] = 'sum'
|
||
if col_cost:
|
||
agg_dict[col_cost] = 'sum'
|
||
if col_profit:
|
||
agg_dict[col_profit] = 'sum'
|
||
if col_qty:
|
||
agg_dict[col_qty] = 'sum'
|
||
|
||
daily_agg = df_range.groupby('snapshot_date').agg(agg_dict).reset_index()
|
||
|
||
if col_profit and col_profit in daily_agg.columns:
|
||
daily_agg['profit'] = daily_agg[col_profit]
|
||
elif col_amount and col_cost and col_amount in daily_agg.columns and col_cost in daily_agg.columns:
|
||
daily_agg['profit'] = daily_agg[col_amount] - daily_agg[col_cost]
|
||
else:
|
||
daily_agg['profit'] = 0
|
||
|
||
if col_amount and col_qty and col_amount in daily_agg.columns and col_qty in daily_agg.columns:
|
||
daily_agg['avg_price'] = (daily_agg[col_amount] / daily_agg[col_qty]).fillna(0)
|
||
else:
|
||
daily_agg['avg_price'] = 0
|
||
|
||
if col_amount and col_amount in daily_agg.columns:
|
||
daily_agg['margin_rate'] = (
|
||
daily_agg['profit'] / daily_agg[col_amount].replace(0, pd.NA) * 100
|
||
).replace([float('inf'), float('-inf')], 0).fillna(0)
|
||
else:
|
||
daily_agg['margin_rate'] = 0
|
||
|
||
# DoD
|
||
if col_amount and col_amount in daily_agg.columns:
|
||
daily_agg['dod_revenue'] = daily_agg[col_amount].pct_change() * 100
|
||
if 'profit' in daily_agg.columns:
|
||
daily_agg['dod_profit'] = daily_agg['profit'].pct_change() * 100
|
||
if 'avg_price' in daily_agg.columns:
|
||
daily_agg['dod_avg_price'] = daily_agg['avg_price'].pct_change() * 100
|
||
if col_qty and col_qty in daily_agg.columns:
|
||
daily_agg['dod_qty'] = daily_agg[col_qty].pct_change() * 100
|
||
|
||
# WoW
|
||
if col_amount and col_amount in daily_agg.columns:
|
||
daily_agg['wow_revenue'] = daily_agg[col_amount].pct_change(periods=7) * 100
|
||
if 'profit' in daily_agg.columns:
|
||
daily_agg['wow_profit'] = daily_agg['profit'].pct_change(periods=7) * 100
|
||
if 'avg_price' in daily_agg.columns:
|
||
daily_agg['wow_avg_price'] = daily_agg['avg_price'].pct_change(periods=7) * 100
|
||
if col_qty and col_qty in daily_agg.columns:
|
||
daily_agg['wow_qty'] = daily_agg[col_qty].pct_change(periods=7) * 100
|
||
|
||
# Top 10 商品
|
||
selected_df = df[df['snapshot_date'] == selected_date]
|
||
top10_labels = []
|
||
top10_values = []
|
||
|
||
if col_name and col_amount:
|
||
col_vendor = find_col(cols, ['廠商名稱', '廠商', 'Vendor', 'Supplier'])
|
||
if col_vendor:
|
||
top10_df = selected_df.groupby([col_name, col_vendor])[col_amount].sum().nlargest(10).reset_index()
|
||
top10_labels = [f"{row[col_name]} ({row[col_vendor]})" for _, row in top10_df.iterrows()]
|
||
top10_values = top10_df[col_amount].tolist()
|
||
else:
|
||
top10 = selected_df.groupby(col_name)[col_amount].sum().nlargest(10)
|
||
top10_labels = top10.index.tolist()
|
||
top10_values = top10.values.tolist()
|
||
|
||
return {
|
||
'labels': daily_agg['snapshot_date'].dt.strftime('%m/%d').tolist() if not daily_agg.empty else [],
|
||
'revenue': daily_agg[col_amount].tolist() if col_amount and col_amount in daily_agg.columns and not daily_agg.empty else [],
|
||
'cost': daily_agg[col_cost].tolist() if col_cost and col_cost in daily_agg.columns and not daily_agg.empty else [],
|
||
'profit': daily_agg['profit'].tolist() if 'profit' in daily_agg.columns and not daily_agg.empty else [],
|
||
'margin_rate': daily_agg['margin_rate'].tolist() if 'margin_rate' in daily_agg.columns and not daily_agg.empty else [],
|
||
'qty': daily_agg[col_qty].tolist() if col_qty and col_qty in daily_agg.columns and not daily_agg.empty else [],
|
||
'avg_price': daily_agg['avg_price'].tolist() if 'avg_price' in daily_agg.columns and not daily_agg.empty else [],
|
||
'dod_revenue': daily_agg['dod_revenue'].fillna(0).tolist() if 'dod_revenue' in daily_agg.columns and not daily_agg.empty else [],
|
||
'dod_profit': daily_agg['dod_profit'].fillna(0).tolist() if 'dod_profit' in daily_agg.columns and not daily_agg.empty else [],
|
||
'dod_avg_price': daily_agg['dod_avg_price'].fillna(0).tolist() if 'dod_avg_price' in daily_agg.columns and not daily_agg.empty else [],
|
||
'dod_qty': daily_agg['dod_qty'].fillna(0).tolist() if 'dod_qty' in daily_agg.columns and not daily_agg.empty else [],
|
||
'wow_revenue': daily_agg['wow_revenue'].fillna(0).tolist() if 'wow_revenue' in daily_agg.columns and not daily_agg.empty else [],
|
||
'wow_profit': daily_agg['wow_profit'].fillna(0).tolist() if 'wow_profit' in daily_agg.columns and not daily_agg.empty else [],
|
||
'wow_avg_price': daily_agg['wow_avg_price'].fillna(0).tolist() if 'wow_avg_price' in daily_agg.columns and not daily_agg.empty else [],
|
||
'wow_qty': daily_agg['wow_qty'].fillna(0).tolist() if 'wow_qty' in daily_agg.columns and not daily_agg.empty else [],
|
||
'top10_labels': top10_labels,
|
||
'top10_values': top10_values
|
||
}
|
||
|
||
|
||
def prepare_category_summary(df, date_str=None, is_month_view=False, month_start=None, month_end=None):
|
||
"""準備分類聚合列表 (支援單日或月度範圍)"""
|
||
if is_month_view and month_start is not None and month_end is not None:
|
||
day_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)]
|
||
else:
|
||
day_df = df[df['snapshot_date'] == date_str]
|
||
cols = day_df.columns.tolist()
|
||
|
||
col_category = find_col(cols, ['館別', '分類', 'Category'])
|
||
col_vendor = find_col(cols, ['廠商名稱', '廠商', 'Vendor', 'Supplier'])
|
||
col_amount = find_col(cols, ['銷售金額', '業績', '總業績'])
|
||
col_cost = find_col(cols, ['成本', '總成本'])
|
||
col_profit = find_col(cols, ['毛利'])
|
||
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
|
||
col_name = find_col(cols, ['商品名稱', '品名'])
|
||
|
||
if not col_category or not col_amount:
|
||
return []
|
||
|
||
agg_dict = {col_amount: 'sum'}
|
||
if col_cost:
|
||
agg_dict[col_cost] = 'sum'
|
||
if col_profit:
|
||
agg_dict[col_profit] = 'sum'
|
||
if col_qty:
|
||
agg_dict[col_qty] = 'sum'
|
||
if col_name:
|
||
agg_dict[col_name] = 'nunique'
|
||
|
||
if col_vendor:
|
||
category_df = day_df.groupby([col_category, col_vendor]).agg(agg_dict).reset_index()
|
||
else:
|
||
category_df = day_df.groupby(col_category).agg(agg_dict).reset_index()
|
||
|
||
if col_profit and col_profit in category_df.columns:
|
||
pass
|
||
elif col_amount and col_cost and col_amount in category_df.columns and col_cost in category_df.columns:
|
||
category_df['profit_calculated'] = category_df[col_amount] - category_df[col_cost]
|
||
col_profit = 'profit_calculated'
|
||
else:
|
||
col_profit = None
|
||
|
||
if col_profit and col_profit in category_df.columns and col_amount and col_amount in category_df.columns:
|
||
category_df['margin_rate'] = (category_df[col_profit] / category_df[col_amount] * 100).fillna(0)
|
||
else:
|
||
category_df['margin_rate'] = 0
|
||
|
||
if col_qty and col_amount:
|
||
category_df['avg_price'] = (category_df[col_amount] / category_df[col_qty]).fillna(0)
|
||
else:
|
||
category_df['avg_price'] = 0
|
||
|
||
rename_dict = {col_category: 'category', col_amount: 'revenue'}
|
||
if col_vendor:
|
||
rename_dict[col_vendor] = 'vendor'
|
||
if col_cost:
|
||
rename_dict[col_cost] = 'cost'
|
||
if col_profit and col_profit in category_df.columns:
|
||
rename_dict[col_profit] = 'profit'
|
||
if col_qty:
|
||
rename_dict[col_qty] = 'qty'
|
||
if col_name:
|
||
rename_dict[col_name] = 'sku_count'
|
||
|
||
category_df = category_df.rename(columns=rename_dict)
|
||
|
||
if 'profit' not in category_df.columns:
|
||
category_df['profit'] = 0
|
||
if 'revenue' in category_df.columns:
|
||
category_df = category_df.sort_values(by='revenue', ascending=False)
|
||
|
||
return category_df.to_dict('records')
|
||
|
||
|
||
def prepare_category_chart(category_records, limit=12):
|
||
"""將分類明細壓成圖表需要的前 N 分類,避免前端再做重資料整理。"""
|
||
if not category_records:
|
||
return {
|
||
'labels': [],
|
||
'revenue': [],
|
||
'profit': [],
|
||
'margin_rate': [],
|
||
'qty': [],
|
||
}
|
||
|
||
category_df = pd.DataFrame(category_records)
|
||
if category_df.empty or 'category' not in category_df.columns:
|
||
return {
|
||
'labels': [],
|
||
'revenue': [],
|
||
'profit': [],
|
||
'margin_rate': [],
|
||
'qty': [],
|
||
}
|
||
|
||
for column in ['revenue', 'profit', 'qty']:
|
||
if column not in category_df.columns:
|
||
category_df[column] = 0
|
||
category_df[column] = pd.to_numeric(category_df[column], errors='coerce').fillna(0)
|
||
|
||
grouped = (
|
||
category_df.groupby('category', dropna=False)
|
||
.agg({'revenue': 'sum', 'profit': 'sum', 'qty': 'sum'})
|
||
.reset_index()
|
||
.sort_values(by='revenue', ascending=False)
|
||
.head(limit)
|
||
)
|
||
grouped['margin_rate'] = (
|
||
grouped['profit'] / grouped['revenue'].replace(0, pd.NA) * 100
|
||
).replace([float('inf'), float('-inf')], 0).fillna(0)
|
||
|
||
return {
|
||
'labels': grouped['category'].fillna('未分類').astype(str).tolist(),
|
||
'revenue': grouped['revenue'].tolist(),
|
||
'profit': grouped['profit'].tolist(),
|
||
'margin_rate': grouped['margin_rate'].tolist(),
|
||
'qty': grouped['qty'].tolist(),
|
||
}
|
||
|
||
|
||
# ==========================================
|
||
# 頁面路由
|
||
# ==========================================
|
||
|
||
@daily_sales_bp.route('/daily_sales')
|
||
@login_required
|
||
def daily_sales():
|
||
"""當日業績看板 (Day-over-Day 與 Week-over-Week 分析)"""
|
||
now_taipei = datetime.now(TAIPEI_TZ)
|
||
datetime_now_str = now_taipei.strftime('%Y-%m-%d %H:%M:%S')
|
||
try:
|
||
db = DatabaseManager()
|
||
engine = db.engine
|
||
table_name = 'daily_sales_snapshot'
|
||
|
||
inspector = inspect(engine)
|
||
if table_name not in inspector.get_table_names():
|
||
return render_template('daily_sales.html',
|
||
error="尚未匯入當日業績資料,請先至系統設定頁面匯入 Excel。",
|
||
selected_date=None, available_dates=[], current=None, dod=None, wow=None,
|
||
chart_data=None, categories=None, calendar_data=None, selected_month=None,
|
||
category_chart=None,
|
||
datetime_now=datetime_now_str, active_page='daily_sales')
|
||
|
||
available_dates, current_fingerprint = _get_daily_sales_metadata(engine, table_name)
|
||
if not available_dates:
|
||
return render_template('daily_sales.html',
|
||
error="資料表為空,請先匯入當日業績資料。",
|
||
selected_date=None, available_dates=[], current=None, dod=None, wow=None,
|
||
chart_data=None, categories=None, calendar_data=None, selected_month=None,
|
||
category_chart=None,
|
||
datetime_now=datetime_now_str, active_page='daily_sales')
|
||
|
||
available_dates_str = [d.strftime('%Y-%m-%d') for d in available_dates]
|
||
|
||
selected_date_param = request.args.get('date')
|
||
if selected_date_param:
|
||
selected_date = pd.to_datetime(selected_date_param)
|
||
else:
|
||
selected_date = available_dates[0]
|
||
|
||
selected_month_param = request.args.get('month')
|
||
if selected_month_param:
|
||
selected_month = pd.to_datetime(selected_month_param)
|
||
else:
|
||
selected_month = selected_date
|
||
|
||
is_month_view = not selected_date_param and not request.args.get('month')
|
||
if selected_month_param and not selected_date_param:
|
||
is_month_view = True
|
||
show_all_categories = request.args.get('detail') == 'all'
|
||
|
||
month_start = selected_month.replace(day=1)
|
||
month_end = (month_start + pd.DateOffset(months=1)) - pd.Timedelta(days=1)
|
||
data_start = min(
|
||
selected_date - pd.Timedelta(days=30),
|
||
selected_date - pd.Timedelta(days=7),
|
||
month_start - pd.Timedelta(days=1),
|
||
)
|
||
data_end = max(selected_date, month_end)
|
||
|
||
view_cache_key = "|".join([
|
||
table_name,
|
||
'view',
|
||
selected_date.strftime('%Y-%m-%d'),
|
||
selected_month.strftime('%Y-%m'),
|
||
'month' if is_month_view else 'day',
|
||
'category_all' if show_all_categories else f'category_top{_CATEGORY_TABLE_DEFAULT_LIMIT}',
|
||
str(current_fingerprint),
|
||
])
|
||
cached_context = _get_daily_view_cache(view_cache_key) or _get_shared_daily_view_cache(view_cache_key)
|
||
if cached_context:
|
||
_set_daily_view_cache(view_cache_key, cached_context)
|
||
return render_template('daily_sales.html', **cached_context)
|
||
|
||
cache_key = f"{table_name}_daily_{data_start.strftime('%Y%m%d')}_{data_end.strftime('%Y%m%d')}"
|
||
# TTL + DB fingerprint 雙閘檢查(資料變動即自動失效,跨 worker 強一致)
|
||
if _is_cache_valid(cache_key) and _SALES_PROCESSED_CACHE[cache_key].get('fingerprint') == current_fingerprint:
|
||
df = _SALES_PROCESSED_CACHE[cache_key]['df']
|
||
sys_log.debug(f"使用緩存數據,剩餘有效時間: {_CACHE_EXPIRY_SECONDS - (datetime.now() - _SALES_PROCESSED_CACHE[cache_key]['timestamp']).total_seconds():.0f}秒")
|
||
else:
|
||
df = _read_daily_sales_window(engine, table_name, data_start, data_end)
|
||
if df.empty:
|
||
return render_template('daily_sales.html',
|
||
error="所選日期區間沒有業績資料。",
|
||
selected_date=None, available_dates=available_dates_str, current=None, dod=None, wow=None,
|
||
chart_data=None, categories=None, calendar_data=None, selected_month=None,
|
||
category_chart=None,
|
||
datetime_now=datetime_now_str, active_page='daily_sales')
|
||
|
||
df = preprocess_daily_sales_data(df)
|
||
_SALES_PROCESSED_CACHE[cache_key] = {
|
||
'df': df, 'timestamp': datetime.now(),
|
||
'fingerprint': current_fingerprint,
|
||
}
|
||
sys_log.info(
|
||
f"已載入當日業績窗口 {data_start.strftime('%Y-%m-%d')}~{data_end.strftime('%Y-%m-%d')},共 {len(df)} 筆記錄"
|
||
)
|
||
|
||
current_kpi = calculate_daily_kpis(df, selected_date)
|
||
dod_kpi = calculate_dod(df, selected_date)
|
||
wow_kpi = calculate_wow(df, selected_date)
|
||
|
||
month_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)]
|
||
|
||
cols = month_df.columns.tolist()
|
||
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
|
||
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
|
||
col_profit = find_col(cols, ['毛利', 'Profit'])
|
||
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
|
||
col_name = find_col(cols, ['商品名稱', '品名', 'Name'])
|
||
|
||
month_kpi = {
|
||
'total_revenue': float(month_df[col_amount].sum()) if col_amount else 0,
|
||
'total_cost': float(month_df[col_cost].sum()) if col_cost else 0,
|
||
'gross_margin': float(month_df[col_profit].sum()) if col_profit else 0,
|
||
'total_qty': float(month_df[col_qty].sum()) if col_qty else 0,
|
||
'sku_count': int(month_df[col_name].nunique()) if col_name else 0,
|
||
'days_with_data': int(month_df['snapshot_date'].nunique())
|
||
}
|
||
if not col_profit and col_amount and col_cost:
|
||
month_kpi['gross_margin'] = month_kpi['total_revenue'] - month_kpi['total_cost']
|
||
if month_kpi['total_revenue'] > 0:
|
||
month_kpi['margin_rate'] = month_kpi['gross_margin'] / month_kpi['total_revenue'] * 100
|
||
else:
|
||
month_kpi['margin_rate'] = 0
|
||
if month_kpi['total_qty'] > 0:
|
||
month_kpi['avg_price'] = month_kpi['total_revenue'] / month_kpi['total_qty']
|
||
else:
|
||
month_kpi['avg_price'] = 0
|
||
|
||
chart_data = prepare_daily_charts(df, selected_date, days=30)
|
||
try:
|
||
competitor_intel = build_competitor_intel_payload(engine, days=30)
|
||
except Exception as exc:
|
||
sys_log.warning(f"[DailySales] PChome 競價情報讀取失敗,略過圖表串接: {exc}")
|
||
competitor_intel = None
|
||
category_list = prepare_category_summary(
|
||
df,
|
||
date_str=selected_date,
|
||
is_month_view=is_month_view,
|
||
month_start=month_start if is_month_view else None,
|
||
month_end=month_end if is_month_view else None
|
||
)
|
||
category_chart = prepare_category_chart(category_list, limit=12)
|
||
category_total_count = len(category_list)
|
||
if show_all_categories:
|
||
visible_categories = category_list
|
||
else:
|
||
visible_categories = category_list[:_CATEGORY_TABLE_DEFAULT_LIMIT]
|
||
calendar_data = prepare_calendar_data(df, selected_month)
|
||
marketing_data = prepare_marketing_summary(
|
||
df,
|
||
selected_date=selected_date if not is_month_view else None,
|
||
is_month_view=is_month_view,
|
||
month_start=month_start if is_month_view else None,
|
||
month_end=month_end if is_month_view else None
|
||
)
|
||
|
||
context = {
|
||
'selected_date': selected_date.strftime('%Y-%m-%d') if isinstance(selected_date, pd.Timestamp) else selected_date,
|
||
'available_dates': available_dates_str,
|
||
'current': current_kpi,
|
||
'dod': dod_kpi,
|
||
'wow': wow_kpi,
|
||
'month_kpi': month_kpi,
|
||
'is_month_view': is_month_view,
|
||
'chart_data': chart_data,
|
||
'competitor_intel': competitor_intel,
|
||
'categories': visible_categories,
|
||
'category_chart': category_chart,
|
||
'category_total_count': category_total_count,
|
||
'category_visible_count': len(visible_categories),
|
||
'category_table_limit': _CATEGORY_TABLE_DEFAULT_LIMIT,
|
||
'category_show_all': show_all_categories,
|
||
'category_show_all_url': _daily_sales_url_with_detail('all'),
|
||
'category_limited_url': _daily_sales_url_with_detail(None),
|
||
'calendar_data': calendar_data,
|
||
'marketing_data': marketing_data,
|
||
'selected_month': selected_month.strftime('%Y-%m') if isinstance(selected_month, pd.Timestamp) else selected_month,
|
||
'datetime_now': datetime_now_str,
|
||
'active_page': 'daily_sales',
|
||
}
|
||
_set_daily_view_cache(view_cache_key, context)
|
||
_set_shared_daily_view_cache(view_cache_key, context)
|
||
return render_template('daily_sales.html', **context)
|
||
|
||
except Exception as e:
|
||
sys_log.error(f"[Web] [DailySales] Error: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return render_template('daily_sales.html',
|
||
error=f"系統錯誤: {str(e)}",
|
||
selected_date=None,
|
||
available_dates=[],
|
||
current=None,
|
||
dod=None,
|
||
wow=None,
|
||
month_kpi=None,
|
||
is_month_view=False,
|
||
chart_data=None,
|
||
categories=None,
|
||
category_chart=None,
|
||
calendar_data=None,
|
||
marketing_data=None,
|
||
selected_month=None,
|
||
datetime_now=datetime_now_str,
|
||
active_page='daily_sales')
|
||
|
||
|
||
@daily_sales_bp.route('/daily_sales/export')
|
||
@login_required
|
||
def export_daily_sales_category():
|
||
"""匯出當日業績分類明細為 Excel"""
|
||
try:
|
||
db = DatabaseManager()
|
||
engine = db.engine
|
||
table_name = 'daily_sales_snapshot'
|
||
|
||
inspector = inspect(engine)
|
||
if table_name not in inspector.get_table_names():
|
||
return "資料表不存在", 404
|
||
|
||
cache_key = f'{table_name}_daily'
|
||
if _is_cache_valid(cache_key, engine, table_name):
|
||
df = _SALES_PROCESSED_CACHE[cache_key]['df']
|
||
else:
|
||
df = safe_read_sql(table_name, engine=engine)
|
||
df = preprocess_daily_sales_data(df)
|
||
_SALES_PROCESSED_CACHE[cache_key] = {
|
||
'df': df, 'timestamp': datetime.now(),
|
||
'fingerprint': _get_data_fingerprint(engine, table_name),
|
||
}
|
||
|
||
selected_date = request.args.get('date')
|
||
if not selected_date:
|
||
available_dates = sorted(df['snapshot_date'].unique(), reverse=True)
|
||
if available_dates:
|
||
selected_date = str(available_dates[0])
|
||
else:
|
||
return "無可用日期", 404
|
||
|
||
categories = prepare_category_summary(df, selected_date)
|
||
|
||
if not categories:
|
||
return "無資料可匯出", 404
|
||
|
||
export_df = pd.DataFrame(categories)
|
||
|
||
column_mapping = {
|
||
'category': '分類',
|
||
'vendor': '廠商',
|
||
'revenue': '總業績',
|
||
'cost': '總成本',
|
||
'profit': '毛利',
|
||
'margin_rate': '毛利率(%)',
|
||
'qty': '總銷量',
|
||
'sku_count': 'SKU數',
|
||
'avg_price': '平均單價'
|
||
}
|
||
|
||
export_columns = [col for col in column_mapping.keys() if col in export_df.columns]
|
||
export_df = export_df[export_columns]
|
||
export_df = export_df.rename(columns=column_mapping)
|
||
|
||
for col in export_df.columns:
|
||
if col in ['總業績', '總成本', '毛利', '總銷量', 'SKU數', '平均單價']:
|
||
export_df[col] = export_df[col].apply(lambda x: f"{x:,.0f}" if pd.notna(x) else "0")
|
||
elif col == '毛利率(%)':
|
||
export_df[col] = export_df[col].apply(lambda x: f"{x:.1f}" if pd.notna(x) else "0.0")
|
||
|
||
filename = f"當日業績_分類明細_{selected_date}.xlsx"
|
||
|
||
output = io.BytesIO()
|
||
with pd.ExcelWriter(output, engine='openpyxl') as writer:
|
||
export_df.to_excel(writer, index=False, sheet_name='分類業績明細')
|
||
|
||
worksheet = writer.sheets['分類業績明細']
|
||
for idx, col in enumerate(export_df.columns, 1):
|
||
max_length = max(
|
||
export_df[col].astype(str).apply(len).max(),
|
||
len(col)
|
||
) + 2
|
||
worksheet.column_dimensions[chr(64 + idx)].width = min(max_length, 50)
|
||
|
||
output.seek(0)
|
||
|
||
sys_log.info(f"[Web] [DailySales] Excel 匯出成功: {filename}")
|
||
|
||
return send_file(
|
||
output,
|
||
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
|
||
as_attachment=True,
|
||
download_name=filename
|
||
)
|
||
|
||
except Exception as e:
|
||
sys_log.error(f"[Web] [DailySales] Excel 匯出失敗: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return f"匯出失敗: {str(e)}", 500
|
||
|
||
|
||
@daily_sales_bp.route('/daily_sales/export_marketing')
|
||
@login_required
|
||
def export_marketing_summary_excel():
|
||
"""匯出行銷活動業績明細為 Excel"""
|
||
try:
|
||
db = DatabaseManager()
|
||
engine = db.engine
|
||
table_name = 'daily_sales_snapshot'
|
||
|
||
cache_key = f'{table_name}_daily'
|
||
if _is_cache_valid(cache_key, engine, table_name):
|
||
df = _SALES_PROCESSED_CACHE[cache_key]['df']
|
||
else:
|
||
df = safe_read_sql(table_name, engine=engine)
|
||
df = preprocess_daily_sales_data(df)
|
||
_SALES_PROCESSED_CACHE[cache_key] = {
|
||
'df': df, 'timestamp': datetime.now(),
|
||
'fingerprint': _get_data_fingerprint(engine, table_name),
|
||
}
|
||
|
||
activity_type = request.args.get('type', 'all')
|
||
start_date = request.args.get('start_date')
|
||
end_date = request.args.get('end_date')
|
||
selected_date = request.args.get('date')
|
||
|
||
selected_category = request.args.get('category', 'all')
|
||
selected_brand = request.args.get('brand', 'all')
|
||
selected_vendor = request.args.get('vendor', 'all')
|
||
keyword = request.args.get('keyword', '')
|
||
|
||
if start_date and end_date:
|
||
df = df[(df['snapshot_date'] >= pd.to_datetime(start_date)) &
|
||
(df['snapshot_date'] <= pd.to_datetime(end_date))]
|
||
date_label = f"{start_date}_{end_date}"
|
||
elif selected_date:
|
||
df = df[df['snapshot_date'] == pd.to_datetime(selected_date)]
|
||
date_label = selected_date
|
||
else:
|
||
date_label = "全部"
|
||
|
||
cols = df.columns.tolist()
|
||
col_category = find_col(cols, ['館別', '商品館', '分類', 'Category'])
|
||
col_brand = find_col(cols, ['品牌', 'Brand'])
|
||
col_vendor = find_col(cols, ['廠商名稱', 'Vendor Name', '廠商', '供應商', 'Vendor', 'Supplier'])
|
||
col_name = find_col(cols, ['商品名稱', '品名'])
|
||
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
|
||
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
|
||
|
||
if selected_category != 'all' and col_category:
|
||
df = df[df[col_category] == selected_category]
|
||
if selected_brand != 'all' and col_brand:
|
||
df = df[df[col_brand] == selected_brand]
|
||
if selected_vendor != 'all' and col_vendor:
|
||
df = df[df[col_vendor] == selected_vendor]
|
||
if keyword and col_name:
|
||
df = df[df[col_name].str.contains(keyword, case=False, na=False)]
|
||
|
||
marketing_cols = {
|
||
'coupon': ('折價券活動名稱', '折價券活動'),
|
||
'discount': ('折扣活動名稱', '折扣活動'),
|
||
'bonus': ('滿額再折扣活動名稱', '滿額再折扣'),
|
||
'click': ('點我再折扣', '點我再折扣')
|
||
}
|
||
|
||
output = io.BytesIO()
|
||
with pd.ExcelWriter(output, engine='openpyxl') as writer:
|
||
types_to_export = [activity_type] if activity_type != 'all' else ['coupon', 'discount', 'bonus', 'click']
|
||
|
||
summary_rows = []
|
||
|
||
for t in types_to_export:
|
||
if t not in marketing_cols:
|
||
continue
|
||
col_internal, sheet_label = marketing_cols[t]
|
||
if col_internal not in df.columns:
|
||
continue
|
||
|
||
m_df = df[df[col_internal].notna() & (df[col_internal] != '') & (df[col_internal] != '0') & (df[col_internal] != 0)]
|
||
|
||
if m_df.empty:
|
||
continue
|
||
|
||
grouped = m_df.groupby(col_internal).agg({
|
||
col_amount: 'sum',
|
||
col_qty: 'sum',
|
||
col_name: 'count'
|
||
}).reset_index()
|
||
|
||
grouped.columns = ['活動名稱', '總業績', '總銷量', '項目筆數']
|
||
grouped = grouped.sort_values(by='總業績', ascending=False)
|
||
|
||
grouped.to_excel(writer, sheet_name=sheet_label[:31], index=False)
|
||
|
||
grouped['活動類型'] = sheet_label
|
||
summary_rows.append(grouped)
|
||
|
||
if len(summary_rows) > 1:
|
||
all_m_df = pd.concat(summary_rows).sort_values(by='總業績', ascending=False)
|
||
all_m_df = all_m_df[['活動類型', '活動名稱', '總業績', '總銷量', '項目筆數']]
|
||
all_m_df.to_excel(writer, sheet_name='合併總表', index=False)
|
||
|
||
output.seek(0)
|
||
|
||
filename = f"行銷活動分析_{date_label}.xlsx"
|
||
|
||
return send_file(
|
||
output,
|
||
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
|
||
as_attachment=True,
|
||
download_name=filename,
|
||
conditional=True
|
||
)
|
||
|
||
except Exception as e:
|
||
sys_log.error(f"[Web] [Marketing] Excel 匯出失敗: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return f"匯出失敗: {str(e)}", 500
|