Files
ewoooc/routes/daily_sales_routes.py
OoO 1dd4181fab
All checks were successful
CD Pipeline / deploy (push) Successful in 1m17s
V10.585 提升比價覆蓋工作台與每日業績圖表
2026-06-04 19:12:34 +08:00

1015 lines
41 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
當日業績路由模組
包含:當日業績看板、分類明細匯出、行銷活動分析
"""
import io
import calendar
import hashlib
import os
import pickle
from datetime import datetime, timezone, timedelta
from urllib.parse import quote
from flask import Blueprint, request, render_template, send_file, url_for
from auth import login_required
from sqlalchemy import inspect, text
import pandas as pd
from config import BASE_DIR, SYSTEM_VERSION
from database.manager import DatabaseManager
from services.logger_manager import SystemLogger
from utils.df_helpers import find_col
from services.daily_sales_service import (
get_taiwan_holiday,
prepare_calendar_data,
prepare_marketing_summary,
)
from services.competitor_intel_repository import build_competitor_intel_payload
from services.cache_manager import (
_DAILY_SALES_PROCESSED_CACHE as _SALES_PROCESSED_CACHE,
_DAILY_SALES_VIEW_CACHE_DIR,
clear_daily_sales_cache as _clear_daily_sales_cache,
)
# 時區設定
TAIPEI_TZ = timezone(timedelta(hours=8))
# Logger
sys_log = SystemLogger("DailySalesRoutes").get_logger()
# Blueprint 定義
daily_sales_bp = Blueprint('daily_sales', __name__)
_CACHE_EXPIRY_SECONDS = 300 # 5 分鐘緩存過期
_VIEW_CACHE_EXPIRY_SECONDS = 120
_SHARED_VIEW_CACHE_EXPIRY_SECONDS = 1800
_DAILY_SALES_VIEW_CACHE = {}
_DAILY_SALES_VIEW_CACHE_MAX = 24
_CATEGORY_TABLE_DEFAULT_LIMIT = 120
def _get_daily_view_cache(cache_key):
cache_data = _DAILY_SALES_VIEW_CACHE.get(cache_key)
if not cache_data:
return None
elapsed = (datetime.now() - cache_data['timestamp']).total_seconds()
if elapsed >= _VIEW_CACHE_EXPIRY_SECONDS:
_DAILY_SALES_VIEW_CACHE.pop(cache_key, None)
return None
return cache_data['context']
def _set_daily_view_cache(cache_key, context):
if len(_DAILY_SALES_VIEW_CACHE) >= _DAILY_SALES_VIEW_CACHE_MAX:
oldest_key = min(
_DAILY_SALES_VIEW_CACHE,
key=lambda key: _DAILY_SALES_VIEW_CACHE[key]['timestamp']
)
_DAILY_SALES_VIEW_CACHE.pop(oldest_key, None)
_DAILY_SALES_VIEW_CACHE[cache_key] = {
'context': context,
'timestamp': datetime.now()
}
def _daily_view_cache_file(cache_key):
digest = hashlib.md5(cache_key.encode('utf-8'), usedforsecurity=False).hexdigest()
return _DAILY_SALES_VIEW_CACHE_DIR / f"{digest}.pkl"
def _get_shared_daily_view_cache(cache_key):
path = _daily_view_cache_file(cache_key)
if not path.exists():
return None
try:
if (datetime.now().timestamp() - path.stat().st_mtime) >= _SHARED_VIEW_CACHE_EXPIRY_SECONDS:
path.unlink(missing_ok=True)
return None
with open(path, 'rb') as f:
payload = pickle.load(f)
if payload.get('version') != SYSTEM_VERSION:
return None
return payload.get('context')
except Exception as exc:
sys_log.warning(f"[DailySales] 共享 view cache 讀取失敗: {exc}")
return None
def _set_shared_daily_view_cache(cache_key, context):
path = _daily_view_cache_file(cache_key)
tmp_path = path.with_suffix(f".{os.getpid()}.tmp")
try:
os.makedirs(path.parent, exist_ok=True)
with open(tmp_path, 'wb') as f:
pickle.dump({'version': SYSTEM_VERSION, 'context': context}, f, protocol=pickle.HIGHEST_PROTOCOL)
os.replace(tmp_path, path)
except Exception as exc:
sys_log.warning(f"[DailySales] 共享 view cache 寫入失敗: {exc}")
try:
if tmp_path.exists():
tmp_path.unlink()
except OSError:
pass
def _daily_sales_url_with_detail(detail_value):
args = request.args.to_dict(flat=True)
if detail_value == 'all':
args['detail'] = 'all'
else:
args.pop('detail', None)
return url_for('daily_sales.daily_sales', **args)
def clear_daily_sales_cache():
"""清除當日業績緩存(供匯入服務調用)"""
_clear_daily_sales_cache()
_DAILY_SALES_VIEW_CACHE.clear()
sys_log.info("已清除當日業績緩存")
# === 清除緩存 API 端點 ===
@daily_sales_bp.route('/api/daily_sales/clear_cache', methods=['POST'])
@login_required
def api_clear_daily_sales_cache():
"""手動清除快取(保留為 ops escape hatch正常失效靠 DB fingerprint"""
cache_size = len(_SALES_PROCESSED_CACHE) + len(_DAILY_SALES_VIEW_CACHE)
_clear_daily_sales_cache()
_DAILY_SALES_VIEW_CACHE.clear()
sys_log.info(f"[API] 已清除當日業績緩存 (原有 {cache_size} 個緩存項目)")
return {'success': True, 'message': f'已清除 {cache_size} 個緩存項目'}
def _get_data_fingerprint(engine, table_name='daily_sales_snapshot'):
"""DB 指紋 (max_snapshot_date, row_count):資料一變動指紋就跳號,
讓 4 worker 的 in-memory cache 在下一次 request 自然失效。"""
try:
validate_table_name(table_name)
if engine.dialect.name == 'postgresql':
fingerprint_sql = f'SELECT MAX("snapshot_date"::date)::text, COUNT(*) FROM "{table_name}"'
else:
fingerprint_sql = f'SELECT CAST(MAX(snapshot_date) AS TEXT), COUNT(*) FROM "{table_name}"'
with engine.connect() as conn:
row = conn.execute(text(fingerprint_sql)).fetchone()
return (row[0], row[1]) if row else (None, 0)
except Exception as e:
sys_log.warning(f"[Cache] fingerprint 查詢失敗(保守維持現有 cache: {e}")
return None
def _is_cache_valid(cache_key, engine=None, table_name='daily_sales_snapshot'):
if cache_key not in _SALES_PROCESSED_CACHE:
return False
cache_data = _SALES_PROCESSED_CACHE[cache_key]
if 'timestamp' not in cache_data:
return False
elapsed = (datetime.now() - cache_data['timestamp']).total_seconds()
if elapsed >= _CACHE_EXPIRY_SECONDS:
return False
if engine is not None:
current_fp = _get_data_fingerprint(engine, table_name)
if current_fp is None:
return True
if current_fp != cache_data.get('fingerprint'):
return False
return True
def _snapshot_date_expr(engine):
if engine.dialect.name == 'postgresql':
return '"snapshot_date"::date'
return 'date("snapshot_date")'
def _snapshot_date_window_clause(engine):
date_expr = _snapshot_date_expr(engine)
if engine.dialect.name == 'postgresql':
return f'{date_expr} >= CAST(:start_date AS date) AND {date_expr} <= CAST(:end_date AS date)'
return f'{date_expr} >= date(:start_date) AND {date_expr} <= date(:end_date)'
def _date_param(value):
return pd.to_datetime(value).strftime('%Y-%m-%d')
def _get_available_daily_dates(engine, table_name='daily_sales_snapshot'):
"""取得可選日期清單,避免為了 date selector 載入整張業績表。"""
validate_table_name(table_name)
date_expr = _snapshot_date_expr(engine)
query = text(
f'SELECT DISTINCT {date_expr} AS snapshot_date '
f'FROM "{table_name}" '
'WHERE "snapshot_date" IS NOT NULL '
'ORDER BY snapshot_date DESC'
)
with engine.connect() as conn:
rows = conn.execute(query).fetchall()
dates = []
for row in rows:
try:
dates.append(pd.to_datetime(row[0]).normalize())
except Exception:
continue
return dates
def _get_daily_sales_metadata(engine, table_name='daily_sales_snapshot'):
"""一次取得日期選單與資料指紋,避免首屏每次掃兩輪 daily snapshot。"""
validate_table_name(table_name)
if engine.dialect.name != 'postgresql':
return (
_get_available_daily_dates(engine, table_name),
_get_data_fingerprint(engine, table_name),
)
query = text(
f'SELECT MAX("snapshot_date"::date)::text AS max_snapshot_date, '
f'COUNT(*) AS row_count, '
f'ARRAY_AGG(DISTINCT "snapshot_date"::date ORDER BY "snapshot_date"::date DESC) '
f'FILTER (WHERE "snapshot_date" IS NOT NULL) AS available_dates '
f'FROM "{table_name}"'
)
try:
with engine.connect() as conn:
row = conn.execute(query).fetchone()
if not row:
return [], (None, 0)
raw_dates = row[2] or []
dates = []
for raw_date in raw_dates:
try:
dates.append(pd.to_datetime(raw_date).normalize())
except Exception:
continue
return dates, (row[0], row[1] or 0)
except Exception as exc:
sys_log.warning(f"[DailySales] metadata 查詢失敗,改用相容查詢: {exc}")
return (
_get_available_daily_dates(engine, table_name),
_get_data_fingerprint(engine, table_name),
)
def _read_daily_sales_window(engine, table_name, start_date, end_date):
"""只讀取畫面需要的日期窗口,降低冷 worker 首頁等待時間。"""
table_name = validate_table_name(table_name)
date_expr = _snapshot_date_expr(engine)
where_clause = _snapshot_date_window_clause(engine)
query = text(
f'SELECT * FROM "{table_name}" '
f'WHERE {where_clause} '
f'ORDER BY {date_expr} ASC'
)
return pd.read_sql(
query,
engine,
params={
'start_date': _date_param(start_date),
'end_date': _date_param(end_date),
},
)
# ==========================================
# 輔助函數
# ==========================================
# 共用工具改 import 自 utils去重原本檔案內定義已移除
from utils.security import validate_table_name, safe_read_sql # noqa: E402, F401
def preprocess_daily_sales_data(df):
"""前處理當日業績資料:欄位識別、型別轉換"""
cols = df.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', 'Amount', '總業績'])
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
col_profit = find_col(cols, ['毛利', 'Profit'])
col_qty = find_col(cols, ['銷售數量', '銷量', 'Qty', '數量'])
if col_amount:
df[col_amount] = pd.to_numeric(df[col_amount], errors='coerce').fillna(0)
if col_cost:
df[col_cost] = pd.to_numeric(df[col_cost], errors='coerce').fillna(0)
if col_profit:
df[col_profit] = pd.to_numeric(df[col_profit], errors='coerce').fillna(0)
if col_qty:
df[col_qty] = pd.to_numeric(df[col_qty], errors='coerce').fillna(0)
df['snapshot_date'] = pd.to_datetime(df['snapshot_date'], errors='coerce')
return df
def calculate_daily_kpis(df, date_str):
"""計算單日 6 個 KPI"""
day_df = df[df['snapshot_date'] == date_str]
cols = day_df.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
col_profit = find_col(cols, ['毛利', 'Profit'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
col_name = find_col(cols, ['商品名稱', '品名', 'Name'])
total_revenue = float(day_df[col_amount].sum()) if col_amount else 0
total_cost = float(day_df[col_cost].sum()) if col_cost else 0
gross_margin = float(day_df[col_profit].sum()) if col_profit else (total_revenue - total_cost)
total_qty = float(day_df[col_qty].sum()) if col_qty else 0
sku_count = int(day_df[col_name].nunique()) if col_name else 0
avg_price = total_revenue / total_qty if total_qty > 0 else 0
return {
'total_revenue': total_revenue,
'total_cost': total_cost,
'gross_margin': gross_margin,
'total_qty': total_qty,
'sku_count': sku_count,
'avg_price': avg_price
}
def calculate_dod(df, current_date):
"""計算 Day-over-Day 變化率"""
current = calculate_daily_kpis(df, current_date)
prev_date = current_date - timedelta(days=1)
if prev_date not in df['snapshot_date'].values:
return {k: 0.0 for k in current.keys()}
previous = calculate_daily_kpis(df, prev_date)
dod = {}
for key in current:
if previous[key] > 0:
dod[key] = ((current[key] - previous[key]) / previous[key]) * 100
else:
dod[key] = 0.0
return dod
def calculate_wow(df, current_date):
"""計算 Week-over-Week 變化率"""
current = calculate_daily_kpis(df, current_date)
prev_week_date = current_date - timedelta(days=7)
if prev_week_date not in df['snapshot_date'].values:
return {k: 0.0 for k in current.keys()}
previous = calculate_daily_kpis(df, prev_week_date)
wow = {}
for key in current:
if previous[key] > 0:
wow[key] = ((current[key] - previous[key]) / previous[key]) * 100
else:
wow[key] = 0.0
return wow
def prepare_daily_charts(df, selected_date, days=30):
"""準備 4 個圖表的數據(根據選擇的日期)"""
start_date = selected_date - timedelta(days=days)
df_range = df[(df['snapshot_date'] >= start_date) & (df['snapshot_date'] <= selected_date)]
cols = df_range.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_cost = find_col(cols, ['成本', '總成本'])
col_profit = find_col(cols, ['毛利'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
col_name = find_col(cols, ['商品名稱', '品名'])
agg_dict = {}
if col_amount:
agg_dict[col_amount] = 'sum'
if col_cost:
agg_dict[col_cost] = 'sum'
if col_profit:
agg_dict[col_profit] = 'sum'
if col_qty:
agg_dict[col_qty] = 'sum'
daily_agg = df_range.groupby('snapshot_date').agg(agg_dict).reset_index()
if col_profit and col_profit in daily_agg.columns:
daily_agg['profit'] = daily_agg[col_profit]
elif col_amount and col_cost and col_amount in daily_agg.columns and col_cost in daily_agg.columns:
daily_agg['profit'] = daily_agg[col_amount] - daily_agg[col_cost]
else:
daily_agg['profit'] = 0
if col_amount and col_qty and col_amount in daily_agg.columns and col_qty in daily_agg.columns:
daily_agg['avg_price'] = (daily_agg[col_amount] / daily_agg[col_qty]).fillna(0)
else:
daily_agg['avg_price'] = 0
if col_amount and col_amount in daily_agg.columns:
daily_agg['margin_rate'] = (
daily_agg['profit'] / daily_agg[col_amount].replace(0, pd.NA) * 100
).replace([float('inf'), float('-inf')], 0).fillna(0)
else:
daily_agg['margin_rate'] = 0
# DoD
if col_amount and col_amount in daily_agg.columns:
daily_agg['dod_revenue'] = daily_agg[col_amount].pct_change() * 100
if 'profit' in daily_agg.columns:
daily_agg['dod_profit'] = daily_agg['profit'].pct_change() * 100
if 'avg_price' in daily_agg.columns:
daily_agg['dod_avg_price'] = daily_agg['avg_price'].pct_change() * 100
if col_qty and col_qty in daily_agg.columns:
daily_agg['dod_qty'] = daily_agg[col_qty].pct_change() * 100
# WoW
if col_amount and col_amount in daily_agg.columns:
daily_agg['wow_revenue'] = daily_agg[col_amount].pct_change(periods=7) * 100
if 'profit' in daily_agg.columns:
daily_agg['wow_profit'] = daily_agg['profit'].pct_change(periods=7) * 100
if 'avg_price' in daily_agg.columns:
daily_agg['wow_avg_price'] = daily_agg['avg_price'].pct_change(periods=7) * 100
if col_qty and col_qty in daily_agg.columns:
daily_agg['wow_qty'] = daily_agg[col_qty].pct_change(periods=7) * 100
# Top 10 商品
selected_df = df[df['snapshot_date'] == selected_date]
top10_labels = []
top10_values = []
if col_name and col_amount:
col_vendor = find_col(cols, ['廠商名稱', '廠商', 'Vendor', 'Supplier'])
if col_vendor:
top10_df = selected_df.groupby([col_name, col_vendor])[col_amount].sum().nlargest(10).reset_index()
top10_labels = [f"{row[col_name]} ({row[col_vendor]})" for _, row in top10_df.iterrows()]
top10_values = top10_df[col_amount].tolist()
else:
top10 = selected_df.groupby(col_name)[col_amount].sum().nlargest(10)
top10_labels = top10.index.tolist()
top10_values = top10.values.tolist()
return {
'labels': daily_agg['snapshot_date'].dt.strftime('%m/%d').tolist() if not daily_agg.empty else [],
'revenue': daily_agg[col_amount].tolist() if col_amount and col_amount in daily_agg.columns and not daily_agg.empty else [],
'cost': daily_agg[col_cost].tolist() if col_cost and col_cost in daily_agg.columns and not daily_agg.empty else [],
'profit': daily_agg['profit'].tolist() if 'profit' in daily_agg.columns and not daily_agg.empty else [],
'margin_rate': daily_agg['margin_rate'].tolist() if 'margin_rate' in daily_agg.columns and not daily_agg.empty else [],
'qty': daily_agg[col_qty].tolist() if col_qty and col_qty in daily_agg.columns and not daily_agg.empty else [],
'avg_price': daily_agg['avg_price'].tolist() if 'avg_price' in daily_agg.columns and not daily_agg.empty else [],
'dod_revenue': daily_agg['dod_revenue'].fillna(0).tolist() if 'dod_revenue' in daily_agg.columns and not daily_agg.empty else [],
'dod_profit': daily_agg['dod_profit'].fillna(0).tolist() if 'dod_profit' in daily_agg.columns and not daily_agg.empty else [],
'dod_avg_price': daily_agg['dod_avg_price'].fillna(0).tolist() if 'dod_avg_price' in daily_agg.columns and not daily_agg.empty else [],
'dod_qty': daily_agg['dod_qty'].fillna(0).tolist() if 'dod_qty' in daily_agg.columns and not daily_agg.empty else [],
'wow_revenue': daily_agg['wow_revenue'].fillna(0).tolist() if 'wow_revenue' in daily_agg.columns and not daily_agg.empty else [],
'wow_profit': daily_agg['wow_profit'].fillna(0).tolist() if 'wow_profit' in daily_agg.columns and not daily_agg.empty else [],
'wow_avg_price': daily_agg['wow_avg_price'].fillna(0).tolist() if 'wow_avg_price' in daily_agg.columns and not daily_agg.empty else [],
'wow_qty': daily_agg['wow_qty'].fillna(0).tolist() if 'wow_qty' in daily_agg.columns and not daily_agg.empty else [],
'top10_labels': top10_labels,
'top10_values': top10_values
}
def prepare_category_summary(df, date_str=None, is_month_view=False, month_start=None, month_end=None):
"""準備分類聚合列表 (支援單日或月度範圍)"""
if is_month_view and month_start is not None and month_end is not None:
day_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)]
else:
day_df = df[df['snapshot_date'] == date_str]
cols = day_df.columns.tolist()
col_category = find_col(cols, ['館別', '分類', 'Category'])
col_vendor = find_col(cols, ['廠商名稱', '廠商', 'Vendor', 'Supplier'])
col_amount = find_col(cols, ['銷售金額', '業績', '總業績'])
col_cost = find_col(cols, ['成本', '總成本'])
col_profit = find_col(cols, ['毛利'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
col_name = find_col(cols, ['商品名稱', '品名'])
if not col_category or not col_amount:
return []
agg_dict = {col_amount: 'sum'}
if col_cost:
agg_dict[col_cost] = 'sum'
if col_profit:
agg_dict[col_profit] = 'sum'
if col_qty:
agg_dict[col_qty] = 'sum'
if col_name:
agg_dict[col_name] = 'nunique'
if col_vendor:
category_df = day_df.groupby([col_category, col_vendor]).agg(agg_dict).reset_index()
else:
category_df = day_df.groupby(col_category).agg(agg_dict).reset_index()
if col_profit and col_profit in category_df.columns:
pass
elif col_amount and col_cost and col_amount in category_df.columns and col_cost in category_df.columns:
category_df['profit_calculated'] = category_df[col_amount] - category_df[col_cost]
col_profit = 'profit_calculated'
else:
col_profit = None
if col_profit and col_profit in category_df.columns and col_amount and col_amount in category_df.columns:
category_df['margin_rate'] = (category_df[col_profit] / category_df[col_amount] * 100).fillna(0)
else:
category_df['margin_rate'] = 0
if col_qty and col_amount:
category_df['avg_price'] = (category_df[col_amount] / category_df[col_qty]).fillna(0)
else:
category_df['avg_price'] = 0
rename_dict = {col_category: 'category', col_amount: 'revenue'}
if col_vendor:
rename_dict[col_vendor] = 'vendor'
if col_cost:
rename_dict[col_cost] = 'cost'
if col_profit and col_profit in category_df.columns:
rename_dict[col_profit] = 'profit'
if col_qty:
rename_dict[col_qty] = 'qty'
if col_name:
rename_dict[col_name] = 'sku_count'
category_df = category_df.rename(columns=rename_dict)
if 'profit' not in category_df.columns:
category_df['profit'] = 0
if 'revenue' in category_df.columns:
category_df = category_df.sort_values(by='revenue', ascending=False)
return category_df.to_dict('records')
def prepare_category_chart(category_records, limit=12):
"""將分類明細壓成圖表需要的前 N 分類,避免前端再做重資料整理。"""
if not category_records:
return {
'labels': [],
'revenue': [],
'profit': [],
'margin_rate': [],
'qty': [],
}
category_df = pd.DataFrame(category_records)
if category_df.empty or 'category' not in category_df.columns:
return {
'labels': [],
'revenue': [],
'profit': [],
'margin_rate': [],
'qty': [],
}
for column in ['revenue', 'profit', 'qty']:
if column not in category_df.columns:
category_df[column] = 0
category_df[column] = pd.to_numeric(category_df[column], errors='coerce').fillna(0)
grouped = (
category_df.groupby('category', dropna=False)
.agg({'revenue': 'sum', 'profit': 'sum', 'qty': 'sum'})
.reset_index()
.sort_values(by='revenue', ascending=False)
.head(limit)
)
grouped['margin_rate'] = (
grouped['profit'] / grouped['revenue'].replace(0, pd.NA) * 100
).replace([float('inf'), float('-inf')], 0).fillna(0)
return {
'labels': grouped['category'].fillna('未分類').astype(str).tolist(),
'revenue': grouped['revenue'].tolist(),
'profit': grouped['profit'].tolist(),
'margin_rate': grouped['margin_rate'].tolist(),
'qty': grouped['qty'].tolist(),
}
# ==========================================
# 頁面路由
# ==========================================
@daily_sales_bp.route('/daily_sales')
@login_required
def daily_sales():
"""當日業績看板 (Day-over-Day 與 Week-over-Week 分析)"""
now_taipei = datetime.now(TAIPEI_TZ)
datetime_now_str = now_taipei.strftime('%Y-%m-%d %H:%M:%S')
try:
db = DatabaseManager()
engine = db.engine
table_name = 'daily_sales_snapshot'
inspector = inspect(engine)
if table_name not in inspector.get_table_names():
return render_template('daily_sales.html',
error="尚未匯入當日業績資料,請先至系統設定頁面匯入 Excel。",
selected_date=None, available_dates=[], current=None, dod=None, wow=None,
chart_data=None, categories=None, calendar_data=None, selected_month=None,
category_chart=None,
datetime_now=datetime_now_str, active_page='daily_sales')
available_dates, current_fingerprint = _get_daily_sales_metadata(engine, table_name)
if not available_dates:
return render_template('daily_sales.html',
error="資料表為空,請先匯入當日業績資料。",
selected_date=None, available_dates=[], current=None, dod=None, wow=None,
chart_data=None, categories=None, calendar_data=None, selected_month=None,
category_chart=None,
datetime_now=datetime_now_str, active_page='daily_sales')
available_dates_str = [d.strftime('%Y-%m-%d') for d in available_dates]
selected_date_param = request.args.get('date')
if selected_date_param:
selected_date = pd.to_datetime(selected_date_param)
else:
selected_date = available_dates[0]
selected_month_param = request.args.get('month')
if selected_month_param:
selected_month = pd.to_datetime(selected_month_param)
else:
selected_month = selected_date
is_month_view = not selected_date_param and not request.args.get('month')
if selected_month_param and not selected_date_param:
is_month_view = True
show_all_categories = request.args.get('detail') == 'all'
month_start = selected_month.replace(day=1)
month_end = (month_start + pd.DateOffset(months=1)) - pd.Timedelta(days=1)
data_start = min(
selected_date - pd.Timedelta(days=30),
selected_date - pd.Timedelta(days=7),
month_start - pd.Timedelta(days=1),
)
data_end = max(selected_date, month_end)
view_cache_key = "|".join([
table_name,
'view',
selected_date.strftime('%Y-%m-%d'),
selected_month.strftime('%Y-%m'),
'month' if is_month_view else 'day',
'category_all' if show_all_categories else f'category_top{_CATEGORY_TABLE_DEFAULT_LIMIT}',
str(current_fingerprint),
])
cached_context = _get_daily_view_cache(view_cache_key) or _get_shared_daily_view_cache(view_cache_key)
if cached_context:
_set_daily_view_cache(view_cache_key, cached_context)
return render_template('daily_sales.html', **cached_context)
cache_key = f"{table_name}_daily_{data_start.strftime('%Y%m%d')}_{data_end.strftime('%Y%m%d')}"
# TTL + DB fingerprint 雙閘檢查(資料變動即自動失效,跨 worker 強一致)
if _is_cache_valid(cache_key) and _SALES_PROCESSED_CACHE[cache_key].get('fingerprint') == current_fingerprint:
df = _SALES_PROCESSED_CACHE[cache_key]['df']
sys_log.debug(f"使用緩存數據,剩餘有效時間: {_CACHE_EXPIRY_SECONDS - (datetime.now() - _SALES_PROCESSED_CACHE[cache_key]['timestamp']).total_seconds():.0f}")
else:
df = _read_daily_sales_window(engine, table_name, data_start, data_end)
if df.empty:
return render_template('daily_sales.html',
error="所選日期區間沒有業績資料。",
selected_date=None, available_dates=available_dates_str, current=None, dod=None, wow=None,
chart_data=None, categories=None, calendar_data=None, selected_month=None,
category_chart=None,
datetime_now=datetime_now_str, active_page='daily_sales')
df = preprocess_daily_sales_data(df)
_SALES_PROCESSED_CACHE[cache_key] = {
'df': df, 'timestamp': datetime.now(),
'fingerprint': current_fingerprint,
}
sys_log.info(
f"已載入當日業績窗口 {data_start.strftime('%Y-%m-%d')}~{data_end.strftime('%Y-%m-%d')},共 {len(df)} 筆記錄"
)
current_kpi = calculate_daily_kpis(df, selected_date)
dod_kpi = calculate_dod(df, selected_date)
wow_kpi = calculate_wow(df, selected_date)
month_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)]
cols = month_df.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
col_profit = find_col(cols, ['毛利', 'Profit'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
col_name = find_col(cols, ['商品名稱', '品名', 'Name'])
month_kpi = {
'total_revenue': float(month_df[col_amount].sum()) if col_amount else 0,
'total_cost': float(month_df[col_cost].sum()) if col_cost else 0,
'gross_margin': float(month_df[col_profit].sum()) if col_profit else 0,
'total_qty': float(month_df[col_qty].sum()) if col_qty else 0,
'sku_count': int(month_df[col_name].nunique()) if col_name else 0,
'days_with_data': int(month_df['snapshot_date'].nunique())
}
if not col_profit and col_amount and col_cost:
month_kpi['gross_margin'] = month_kpi['total_revenue'] - month_kpi['total_cost']
if month_kpi['total_revenue'] > 0:
month_kpi['margin_rate'] = month_kpi['gross_margin'] / month_kpi['total_revenue'] * 100
else:
month_kpi['margin_rate'] = 0
if month_kpi['total_qty'] > 0:
month_kpi['avg_price'] = month_kpi['total_revenue'] / month_kpi['total_qty']
else:
month_kpi['avg_price'] = 0
chart_data = prepare_daily_charts(df, selected_date, days=30)
try:
competitor_intel = build_competitor_intel_payload(engine, days=30)
except Exception as exc:
sys_log.warning(f"[DailySales] PChome 競價情報讀取失敗,略過圖表串接: {exc}")
competitor_intel = None
category_list = prepare_category_summary(
df,
date_str=selected_date,
is_month_view=is_month_view,
month_start=month_start if is_month_view else None,
month_end=month_end if is_month_view else None
)
category_chart = prepare_category_chart(category_list, limit=12)
category_total_count = len(category_list)
if show_all_categories:
visible_categories = category_list
else:
visible_categories = category_list[:_CATEGORY_TABLE_DEFAULT_LIMIT]
calendar_data = prepare_calendar_data(df, selected_month)
marketing_data = prepare_marketing_summary(
df,
selected_date=selected_date if not is_month_view else None,
is_month_view=is_month_view,
month_start=month_start if is_month_view else None,
month_end=month_end if is_month_view else None
)
context = {
'selected_date': selected_date.strftime('%Y-%m-%d') if isinstance(selected_date, pd.Timestamp) else selected_date,
'available_dates': available_dates_str,
'current': current_kpi,
'dod': dod_kpi,
'wow': wow_kpi,
'month_kpi': month_kpi,
'is_month_view': is_month_view,
'chart_data': chart_data,
'competitor_intel': competitor_intel,
'categories': visible_categories,
'category_chart': category_chart,
'category_total_count': category_total_count,
'category_visible_count': len(visible_categories),
'category_table_limit': _CATEGORY_TABLE_DEFAULT_LIMIT,
'category_show_all': show_all_categories,
'category_show_all_url': _daily_sales_url_with_detail('all'),
'category_limited_url': _daily_sales_url_with_detail(None),
'calendar_data': calendar_data,
'marketing_data': marketing_data,
'selected_month': selected_month.strftime('%Y-%m') if isinstance(selected_month, pd.Timestamp) else selected_month,
'datetime_now': datetime_now_str,
'active_page': 'daily_sales',
}
_set_daily_view_cache(view_cache_key, context)
_set_shared_daily_view_cache(view_cache_key, context)
return render_template('daily_sales.html', **context)
except Exception as e:
sys_log.error(f"[Web] [DailySales] Error: {e}")
import traceback
traceback.print_exc()
return render_template('daily_sales.html',
error=f"系統錯誤: {str(e)}",
selected_date=None,
available_dates=[],
current=None,
dod=None,
wow=None,
month_kpi=None,
is_month_view=False,
chart_data=None,
categories=None,
category_chart=None,
calendar_data=None,
marketing_data=None,
selected_month=None,
datetime_now=datetime_now_str,
active_page='daily_sales')
@daily_sales_bp.route('/daily_sales/export')
@login_required
def export_daily_sales_category():
"""匯出當日業績分類明細為 Excel"""
try:
db = DatabaseManager()
engine = db.engine
table_name = 'daily_sales_snapshot'
inspector = inspect(engine)
if table_name not in inspector.get_table_names():
return "資料表不存在", 404
cache_key = f'{table_name}_daily'
if _is_cache_valid(cache_key, engine, table_name):
df = _SALES_PROCESSED_CACHE[cache_key]['df']
else:
df = safe_read_sql(table_name, engine=engine)
df = preprocess_daily_sales_data(df)
_SALES_PROCESSED_CACHE[cache_key] = {
'df': df, 'timestamp': datetime.now(),
'fingerprint': _get_data_fingerprint(engine, table_name),
}
selected_date = request.args.get('date')
if not selected_date:
available_dates = sorted(df['snapshot_date'].unique(), reverse=True)
if available_dates:
selected_date = str(available_dates[0])
else:
return "無可用日期", 404
categories = prepare_category_summary(df, selected_date)
if not categories:
return "無資料可匯出", 404
export_df = pd.DataFrame(categories)
column_mapping = {
'category': '分類',
'vendor': '廠商',
'revenue': '總業績',
'cost': '總成本',
'profit': '毛利',
'margin_rate': '毛利率(%)',
'qty': '總銷量',
'sku_count': 'SKU數',
'avg_price': '平均單價'
}
export_columns = [col for col in column_mapping.keys() if col in export_df.columns]
export_df = export_df[export_columns]
export_df = export_df.rename(columns=column_mapping)
for col in export_df.columns:
if col in ['總業績', '總成本', '毛利', '總銷量', 'SKU數', '平均單價']:
export_df[col] = export_df[col].apply(lambda x: f"{x:,.0f}" if pd.notna(x) else "0")
elif col == '毛利率(%)':
export_df[col] = export_df[col].apply(lambda x: f"{x:.1f}" if pd.notna(x) else "0.0")
filename = f"當日業績_分類明細_{selected_date}.xlsx"
output = io.BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
export_df.to_excel(writer, index=False, sheet_name='分類業績明細')
worksheet = writer.sheets['分類業績明細']
for idx, col in enumerate(export_df.columns, 1):
max_length = max(
export_df[col].astype(str).apply(len).max(),
len(col)
) + 2
worksheet.column_dimensions[chr(64 + idx)].width = min(max_length, 50)
output.seek(0)
sys_log.info(f"[Web] [DailySales] Excel 匯出成功: {filename}")
return send_file(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=filename
)
except Exception as e:
sys_log.error(f"[Web] [DailySales] Excel 匯出失敗: {e}")
import traceback
traceback.print_exc()
return f"匯出失敗: {str(e)}", 500
@daily_sales_bp.route('/daily_sales/export_marketing')
@login_required
def export_marketing_summary_excel():
"""匯出行銷活動業績明細為 Excel"""
try:
db = DatabaseManager()
engine = db.engine
table_name = 'daily_sales_snapshot'
cache_key = f'{table_name}_daily'
if _is_cache_valid(cache_key, engine, table_name):
df = _SALES_PROCESSED_CACHE[cache_key]['df']
else:
df = safe_read_sql(table_name, engine=engine)
df = preprocess_daily_sales_data(df)
_SALES_PROCESSED_CACHE[cache_key] = {
'df': df, 'timestamp': datetime.now(),
'fingerprint': _get_data_fingerprint(engine, table_name),
}
activity_type = request.args.get('type', 'all')
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
selected_date = request.args.get('date')
selected_category = request.args.get('category', 'all')
selected_brand = request.args.get('brand', 'all')
selected_vendor = request.args.get('vendor', 'all')
keyword = request.args.get('keyword', '')
if start_date and end_date:
df = df[(df['snapshot_date'] >= pd.to_datetime(start_date)) &
(df['snapshot_date'] <= pd.to_datetime(end_date))]
date_label = f"{start_date}_{end_date}"
elif selected_date:
df = df[df['snapshot_date'] == pd.to_datetime(selected_date)]
date_label = selected_date
else:
date_label = "全部"
cols = df.columns.tolist()
col_category = find_col(cols, ['館別', '商品館', '分類', 'Category'])
col_brand = find_col(cols, ['品牌', 'Brand'])
col_vendor = find_col(cols, ['廠商名稱', 'Vendor Name', '廠商', '供應商', 'Vendor', 'Supplier'])
col_name = find_col(cols, ['商品名稱', '品名'])
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
if selected_category != 'all' and col_category:
df = df[df[col_category] == selected_category]
if selected_brand != 'all' and col_brand:
df = df[df[col_brand] == selected_brand]
if selected_vendor != 'all' and col_vendor:
df = df[df[col_vendor] == selected_vendor]
if keyword and col_name:
df = df[df[col_name].str.contains(keyword, case=False, na=False)]
marketing_cols = {
'coupon': ('折價券活動名稱', '折價券活動'),
'discount': ('折扣活動名稱', '折扣活動'),
'bonus': ('滿額再折扣活動名稱', '滿額再折扣'),
'click': ('點我再折扣', '點我再折扣')
}
output = io.BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
types_to_export = [activity_type] if activity_type != 'all' else ['coupon', 'discount', 'bonus', 'click']
summary_rows = []
for t in types_to_export:
if t not in marketing_cols:
continue
col_internal, sheet_label = marketing_cols[t]
if col_internal not in df.columns:
continue
m_df = df[df[col_internal].notna() & (df[col_internal] != '') & (df[col_internal] != '0') & (df[col_internal] != 0)]
if m_df.empty:
continue
grouped = m_df.groupby(col_internal).agg({
col_amount: 'sum',
col_qty: 'sum',
col_name: 'count'
}).reset_index()
grouped.columns = ['活動名稱', '總業績', '總銷量', '項目筆數']
grouped = grouped.sort_values(by='總業績', ascending=False)
grouped.to_excel(writer, sheet_name=sheet_label[:31], index=False)
grouped['活動類型'] = sheet_label
summary_rows.append(grouped)
if len(summary_rows) > 1:
all_m_df = pd.concat(summary_rows).sort_values(by='總業績', ascending=False)
all_m_df = all_m_df[['活動類型', '活動名稱', '總業績', '總銷量', '項目筆數']]
all_m_df.to_excel(writer, sheet_name='合併總表', index=False)
output.seek(0)
filename = f"行銷活動分析_{date_label}.xlsx"
return send_file(
output,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=True,
download_name=filename,
conditional=True
)
except Exception as e:
sys_log.error(f"[Web] [Marketing] Excel 匯出失敗: {e}")
import traceback
traceback.print_exc()
return f"匯出失敗: {str(e)}", 500