diff --git a/routes/daily_sales_routes.py b/routes/daily_sales_routes.py index 35f433d..929f1ed 100644 --- a/routes/daily_sales_routes.py +++ b/routes/daily_sales_routes.py @@ -17,6 +17,12 @@ import pandas as pd from config import BASE_DIR from database.manager import DatabaseManager from services.logger_manager import SystemLogger +from utils.df_helpers import find_col +from services.daily_sales_service import ( + get_taiwan_holiday, + prepare_calendar_data, + prepare_marketing_summary, +) # 時區設定 TAIPEI_TZ = timezone(timedelta(hours=8)) @@ -68,15 +74,6 @@ def _is_cache_valid(cache_key): # 輔助函數 # ========================================== -def find_col(df_cols, keywords): - """從欄位列表中,根據關鍵字列表找出最匹配的欄位名稱""" - for k in keywords: - for col in df_cols: - if k in str(col): - return col - return None - - def validate_table_name(table_name): """驗證表名(防止 SQL Injection)""" import re @@ -360,239 +357,6 @@ def prepare_category_summary(df, date_str=None, is_month_view=False, month_start return category_df.to_dict('records') -def prepare_marketing_summary(df, selected_date=None, is_month_view=False, month_start=None, month_end=None, sort_by='revenue'): - """準備行銷活動業績貢獻數據""" - if is_month_view and month_start is not None and month_end is not None: - target_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)] - elif selected_date is not None: - target_df = df[df['snapshot_date'] == selected_date] - else: - target_df = df - - if target_df.empty: - return {'coupon': [], 'discount': [], 'bonus': [], 'click': []} - - cols = target_df.columns.tolist() - col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績']) - col_qty = find_col(cols, ['銷售數量', '銷量', '數量', 'Qty']) - col_profit = find_col(cols, ['毛利', 'Profit', '利潤']) - col_cost = find_col(cols, ['成本', 'Cost', '總成本']) - - if not col_amount: - return {'coupon': [], 'discount': [], 'bonus': [], 'click': []} - - marketing_cols = { - 'coupon': '折價券活動名稱', - 'discount': '折扣活動名稱', - 'bonus': '滿額再折扣活動名稱', - 'click': '點我再折扣' - } - - result = {} - actual_sort_key = sort_by if sort_by in ['revenue', 'qty', 'profit'] else 'revenue' - - for key, col_name in marketing_cols.items(): - if col_name not in cols: - result[key] = [] - continue - - activity_df = target_df[ - (target_df[col_name].notna()) & - (target_df[col_name] != '') & - (target_df[col_name] != '0') & - (target_df[col_name] != 0) - ] - - if activity_df.empty: - result[key] = [] - continue - - agg_args = { - 'revenue': (col_amount, 'sum'), - 'order_count': (col_amount, 'count') - } - if col_qty: - agg_args['qty'] = (col_qty, 'sum') - if col_profit: - agg_args['profit'] = (col_profit, 'sum') - - grouped = activity_df.groupby(col_name).agg(**agg_args).reset_index() - - if 'profit' not in agg_args and col_cost: - cost_agg = activity_df.groupby(col_name)[col_cost].sum().reset_index() - grouped = grouped.merge(cost_agg, on=col_name) - grouped['profit'] = grouped['revenue'] - grouped[col_cost] - - grouped = grouped.rename(columns={col_name: 'name'}) - - sort_col = actual_sort_key if actual_sort_key in grouped.columns else 'revenue' - grouped = grouped.sort_values(sort_col, ascending=False).head(15) - - records = [] - for _, row in grouped.iterrows(): - record = { - 'name': str(row['name'])[:50], - 'revenue': float(row['revenue']), - 'order_count': int(row['order_count']) - } - if 'qty' in row: - record['qty'] = float(row['qty']) - if 'profit' in row: - record['profit'] = float(row['profit']) - records.append(record) - - result[key] = records - - return result - - -def get_taiwan_holiday(date): - """判斷是否為台灣國定假日""" - year = date.year - month = date.month - day = date.day - - holidays_2026 = { - (1, 1): '元旦', - (2, 14): '春節連假', (2, 15): '小年夜', (2, 16): '除夕', - (2, 17): '春節 (初一)', (2, 18): '春節 (初二)', (2, 19): '春節 (初三)', - (2, 20): '春節連假', (2, 21): '春節連假', (2, 22): '春節連假', - (2, 28): '和平紀念日', (3, 2): '和平紀念日補假', - (4, 3): '兒童節補假', (4, 4): '清明節', (4, 5): '清明節連假', (4, 6): '清明節補假', - (5, 1): '勞動節', - (6, 19): '端午節', - (9, 25): '中秋節', (9, 28): '教師節', - (10, 9): '國慶日補假', (10, 10): '國慶日', - (10, 25): '臺灣光復節', (10, 26): '光復節補假', - (12, 25): '行憲紀念日', - } - - holidays_2027 = { - (1, 1): '元旦', - (2, 11): '春節 (除夕)', (2, 12): '春節 (初一)', (2, 13): '春節 (初二)', - (2, 14): '春節 (初三)', (2, 15): '春節 (初四)', (2, 16): '春節 (初五)', (2, 17): '春節 (初六)', - (2, 28): '和平紀念日', - (4, 4): '清明節', (4, 5): '清明節連假', - (6, 14): '端午節', - (9, 21): '中秋節', - (10, 10): '國慶日', (10, 11): '國慶日連假', - } - - holidays = holidays_2026 if year == 2026 else (holidays_2027 if year == 2027 else {}) - holiday_name = holidays.get((month, day)) - return (True, holiday_name) if holiday_name else (False, None) - - -def prepare_calendar_data(df, selected_month): - """準備行事曆數據""" - year = selected_month.year - month = selected_month.month - - first_day = pd.Timestamp(year=year, month=month, day=1) - last_day = pd.Timestamp(year=year, month=month, day=calendar.monthrange(year, month)[1]) - - first_weekday = first_day.weekday() - calendar_start = first_day - timedelta(days=first_weekday) - last_weekday = last_day.weekday() - calendar_end = last_day + timedelta(days=(6 - last_weekday)) - - data_start = first_day - timedelta(days=1) - data_end = last_day - month_df = df[(df['snapshot_date'] >= data_start) & (df['snapshot_date'] <= data_end)] - - cols = df.columns.tolist() - col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績']) - col_cost = find_col(cols, ['成本', 'Cost']) - col_profit = find_col(cols, ['毛利', 'Profit']) - col_qty = find_col(cols, ['銷售數量', '銷量', 'Qty', '數量']) - col_name = find_col(cols, ['商品名稱', '品名']) - - calendar_days = [] - current_date = calendar_start - - while current_date <= calendar_end: - weekday = current_date.weekday() - weekday_names = ['週一', '週二', '週三', '週四', '週五', '週六', '週日'] - - is_holiday, holiday_name = get_taiwan_holiday(current_date) - - day_data = { - 'date': current_date.strftime('%Y-%m-%d'), - 'day': current_date.day, - 'weekday': weekday_names[weekday], - 'is_weekend': weekday >= 5, - 'is_holiday': is_holiday, - 'holiday_name': holiday_name, - 'is_current_month': current_date.month == month, - 'has_data': False, - 'revenue': 0, - 'profit': 0, - 'margin_rate': 0, - 'sku_count': 0, - 'qty': 0, - 'avg_price': 0, - 'dod_percent': 0, - 'dod_direction': 'neutral' - } - - if first_day <= current_date <= last_day: - day_df = month_df[month_df['snapshot_date'] == current_date] - - if not day_df.empty: - day_data['has_data'] = True - - if col_amount: - day_data['revenue'] = float(day_df[col_amount].sum()) - - if col_profit: - day_data['profit'] = float(day_df[col_profit].sum()) - elif col_cost and col_amount: - total_cost = float(day_df[col_cost].sum()) - day_data['profit'] = day_data['revenue'] - total_cost - - if day_data['revenue'] > 0: - day_data['margin_rate'] = (day_data['profit'] / day_data['revenue']) * 100 - - if col_qty: - day_data['qty'] = float(day_df[col_qty].sum()) - - if day_data['qty'] > 0: - day_data['avg_price'] = day_data['revenue'] / day_data['qty'] - - if col_name: - day_data['sku_count'] = int(day_df[col_name].nunique()) - - # DoD% - prev_date = current_date - timedelta(days=1) - prev_df = month_df[month_df['snapshot_date'] == prev_date] - - if not prev_df.empty and col_amount: - prev_revenue = float(prev_df[col_amount].sum()) - if prev_revenue > 0: - dod = ((day_data['revenue'] - prev_revenue) / prev_revenue) * 100 - day_data['dod_percent'] = round(dod, 1) - day_data['dod_direction'] = 'up' if dod >= 0 else 'down' - - calendar_days.append(day_data) - current_date += timedelta(days=1) - - weeks = [] - for i in range(0, len(calendar_days), 7): - weeks.append(calendar_days[i:i + 7]) - - prev_month = selected_month - pd.DateOffset(months=1) - next_month = selected_month + pd.DateOffset(months=1) - - return { - 'year': year, - 'month': month, - 'month_name': selected_month.strftime('%Y年%m月'), - 'weeks': weeks, - 'prev_month': prev_month.strftime('%Y-%m'), - 'next_month': next_month.strftime('%Y-%m') - } - - # ========================================== # 頁面路由 # ========================================== diff --git a/services/daily_sales_service.py b/services/daily_sales_service.py new file mode 100644 index 0000000..570b63b --- /dev/null +++ b/services/daily_sales_service.py @@ -0,0 +1,243 @@ +"""日報業務邏輯 — 從 routes/daily_sales_routes.py 抽出的純計算函數。 + +這些函數不依賴 Flask request context,可獨立測試。 +""" +import calendar +from datetime import timedelta + +import pandas as pd + +from utils.df_helpers import find_col + + +def get_taiwan_holiday(date): + """判斷是否為台灣國定假日""" + year = date.year + month = date.month + day = date.day + + holidays_2026 = { + (1, 1): '元旦', + (2, 14): '春節連假', (2, 15): '小年夜', (2, 16): '除夕', + (2, 17): '春節 (初一)', (2, 18): '春節 (初二)', (2, 19): '春節 (初三)', + (2, 20): '春節連假', (2, 21): '春節連假', (2, 22): '春節連假', + (2, 28): '和平紀念日', (3, 2): '和平紀念日補假', + (4, 3): '兒童節補假', (4, 4): '清明節', (4, 5): '清明節連假', (4, 6): '清明節補假', + (5, 1): '勞動節', + (6, 19): '端午節', + (9, 25): '中秋節', (9, 28): '教師節', + (10, 9): '國慶日補假', (10, 10): '國慶日', + (10, 25): '臺灣光復節', (10, 26): '光復節補假', + (12, 25): '行憲紀念日', + } + + holidays_2027 = { + (1, 1): '元旦', + (2, 11): '春節 (除夕)', (2, 12): '春節 (初一)', (2, 13): '春節 (初二)', + (2, 14): '春節 (初三)', (2, 15): '春節 (初四)', (2, 16): '春節 (初五)', (2, 17): '春節 (初六)', + (2, 28): '和平紀念日', + (4, 4): '清明節', (4, 5): '清明節連假', + (6, 14): '端午節', + (9, 21): '中秋節', + (10, 10): '國慶日', (10, 11): '國慶日連假', + } + + holidays = holidays_2026 if year == 2026 else (holidays_2027 if year == 2027 else {}) + holiday_name = holidays.get((month, day)) + return (True, holiday_name) if holiday_name else (False, None) + + +def prepare_calendar_data(df, selected_month): + """準備行事曆數據""" + year = selected_month.year + month = selected_month.month + + first_day = pd.Timestamp(year=year, month=month, day=1) + last_day = pd.Timestamp(year=year, month=month, day=calendar.monthrange(year, month)[1]) + + first_weekday = first_day.weekday() + calendar_start = first_day - timedelta(days=first_weekday) + last_weekday = last_day.weekday() + calendar_end = last_day + timedelta(days=(6 - last_weekday)) + + data_start = first_day - timedelta(days=1) + data_end = last_day + month_df = df[(df['snapshot_date'] >= data_start) & (df['snapshot_date'] <= data_end)] + + cols = df.columns.tolist() + col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績']) + col_cost = find_col(cols, ['成本', 'Cost']) + col_profit = find_col(cols, ['毛利', 'Profit']) + col_qty = find_col(cols, ['銷售數量', '銷量', 'Qty', '數量']) + col_name = find_col(cols, ['商品名稱', '品名']) + + calendar_days = [] + current_date = calendar_start + + while current_date <= calendar_end: + weekday = current_date.weekday() + weekday_names = ['週一', '週二', '週三', '週四', '週五', '週六', '週日'] + + is_holiday, holiday_name = get_taiwan_holiday(current_date) + + day_data = { + 'date': current_date.strftime('%Y-%m-%d'), + 'day': current_date.day, + 'weekday': weekday_names[weekday], + 'is_weekend': weekday >= 5, + 'is_holiday': is_holiday, + 'holiday_name': holiday_name, + 'is_current_month': current_date.month == month, + 'has_data': False, + 'revenue': 0, + 'profit': 0, + 'margin_rate': 0, + 'sku_count': 0, + 'qty': 0, + 'avg_price': 0, + 'dod_percent': 0, + 'dod_direction': 'neutral' + } + + if first_day <= current_date <= last_day: + day_df = month_df[month_df['snapshot_date'] == current_date] + + if not day_df.empty: + day_data['has_data'] = True + + if col_amount: + day_data['revenue'] = float(day_df[col_amount].sum()) + + if col_profit: + day_data['profit'] = float(day_df[col_profit].sum()) + elif col_cost and col_amount: + total_cost = float(day_df[col_cost].sum()) + day_data['profit'] = day_data['revenue'] - total_cost + + if day_data['revenue'] > 0: + day_data['margin_rate'] = (day_data['profit'] / day_data['revenue']) * 100 + + if col_qty: + day_data['qty'] = float(day_df[col_qty].sum()) + + if day_data['qty'] > 0: + day_data['avg_price'] = day_data['revenue'] / day_data['qty'] + + if col_name: + day_data['sku_count'] = int(day_df[col_name].nunique()) + + # DoD% + prev_date = current_date - timedelta(days=1) + prev_df = month_df[month_df['snapshot_date'] == prev_date] + + if not prev_df.empty and col_amount: + prev_revenue = float(prev_df[col_amount].sum()) + if prev_revenue > 0: + dod = ((day_data['revenue'] - prev_revenue) / prev_revenue) * 100 + day_data['dod_percent'] = round(dod, 1) + day_data['dod_direction'] = 'up' if dod >= 0 else 'down' + + calendar_days.append(day_data) + current_date += timedelta(days=1) + + weeks = [] + for i in range(0, len(calendar_days), 7): + weeks.append(calendar_days[i:i + 7]) + + prev_month = selected_month - pd.DateOffset(months=1) + next_month = selected_month + pd.DateOffset(months=1) + + return { + 'year': year, + 'month': month, + 'month_name': selected_month.strftime('%Y年%m月'), + 'weeks': weeks, + 'prev_month': prev_month.strftime('%Y-%m'), + 'next_month': next_month.strftime('%Y-%m') + } + + +def prepare_marketing_summary(df, selected_date=None, is_month_view=False, month_start=None, month_end=None, sort_by='revenue'): + """準備行銷活動業績貢獻數據""" + if is_month_view and month_start is not None and month_end is not None: + target_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)] + elif selected_date is not None: + target_df = df[df['snapshot_date'] == selected_date] + else: + target_df = df + + if target_df.empty: + return {'coupon': [], 'discount': [], 'bonus': [], 'click': []} + + cols = target_df.columns.tolist() + col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績']) + col_qty = find_col(cols, ['銷售數量', '銷量', '數量', 'Qty']) + col_profit = find_col(cols, ['毛利', 'Profit', '利潤']) + col_cost = find_col(cols, ['成本', 'Cost', '總成本']) + + if not col_amount: + return {'coupon': [], 'discount': [], 'bonus': [], 'click': []} + + marketing_cols = { + 'coupon': '折價券活動名稱', + 'discount': '折扣活動名稱', + 'bonus': '滿額再折扣活動名稱', + 'click': '點我再折扣' + } + + result = {} + actual_sort_key = sort_by if sort_by in ['revenue', 'qty', 'profit'] else 'revenue' + + for key, col_name in marketing_cols.items(): + if col_name not in cols: + result[key] = [] + continue + + activity_df = target_df[ + (target_df[col_name].notna()) & + (target_df[col_name] != '') & + (target_df[col_name] != '0') & + (target_df[col_name] != 0) + ] + + if activity_df.empty: + result[key] = [] + continue + + agg_args = { + 'revenue': (col_amount, 'sum'), + 'order_count': (col_amount, 'count') + } + if col_qty: + agg_args['qty'] = (col_qty, 'sum') + if col_profit: + agg_args['profit'] = (col_profit, 'sum') + + grouped = activity_df.groupby(col_name).agg(**agg_args).reset_index() + + if 'profit' not in agg_args and col_cost: + cost_agg = activity_df.groupby(col_name)[col_cost].sum().reset_index() + grouped = grouped.merge(cost_agg, on=col_name) + grouped['profit'] = grouped['revenue'] - grouped[col_cost] + + grouped = grouped.rename(columns={col_name: 'name'}) + + sort_col = actual_sort_key if actual_sort_key in grouped.columns else 'revenue' + grouped = grouped.sort_values(sort_col, ascending=False).head(15) + + records = [] + for _, row in grouped.iterrows(): + record = { + 'name': str(row['name'])[:50], + 'revenue': float(row['revenue']), + 'order_count': int(row['order_count']) + } + if 'qty' in row: + record['qty'] = float(row['qty']) + if 'profit' in row: + record['profit'] = float(row['profit']) + records.append(record) + + result[key] = records + + return result diff --git a/utils/df_helpers.py b/utils/df_helpers.py new file mode 100644 index 0000000..a605bf3 --- /dev/null +++ b/utils/df_helpers.py @@ -0,0 +1,18 @@ +"""DataFrame 共用工具 — 跨 routes/services 共享。""" + + +def find_col(df_cols, keywords): + """從欄位列表中,根據關鍵字列表找出最匹配的欄位名稱。 + + Args: + df_cols: DataFrame 的欄位名稱 iterable + keywords: 關鍵字列表,依優先序 + + Returns: + 匹配的欄位名稱字串,找不到則回傳 None + """ + for k in keywords: + for col in df_cols: + if k in str(col): + return col + return None