refactor(p1-06/07): daily_sales 純函數抽到 services/
All checks were successful
CD Pipeline / deploy (push) Successful in 1m14s

- 新增 utils/df_helpers.py 放共用 find_col(避免 routes/services 雙向依賴)
- 新增 services/daily_sales_service.py 收:
  * get_taiwan_holiday(date)
  * prepare_calendar_data(df, selected_month)
  * prepare_marketing_summary(df, ...)
- routes/daily_sales_routes.py 改為 import service,行數 949 → 713(-236)
- 行為 100% 保留,僅檔案位置搬移
This commit is contained in:
ooo
2026-04-28 15:36:40 +08:00
parent 832030b6de
commit d4ea555030
3 changed files with 267 additions and 242 deletions

View File

@@ -17,6 +17,12 @@ import pandas as pd
from config import BASE_DIR
from database.manager import DatabaseManager
from services.logger_manager import SystemLogger
from utils.df_helpers import find_col
from services.daily_sales_service import (
get_taiwan_holiday,
prepare_calendar_data,
prepare_marketing_summary,
)
# 時區設定
TAIPEI_TZ = timezone(timedelta(hours=8))
@@ -68,15 +74,6 @@ def _is_cache_valid(cache_key):
# 輔助函數
# ==========================================
def find_col(df_cols, keywords):
"""從欄位列表中,根據關鍵字列表找出最匹配的欄位名稱"""
for k in keywords:
for col in df_cols:
if k in str(col):
return col
return None
def validate_table_name(table_name):
"""驗證表名(防止 SQL Injection"""
import re
@@ -360,239 +357,6 @@ def prepare_category_summary(df, date_str=None, is_month_view=False, month_start
return category_df.to_dict('records')
def prepare_marketing_summary(df, selected_date=None, is_month_view=False, month_start=None, month_end=None, sort_by='revenue'):
"""準備行銷活動業績貢獻數據"""
if is_month_view and month_start is not None and month_end is not None:
target_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)]
elif selected_date is not None:
target_df = df[df['snapshot_date'] == selected_date]
else:
target_df = df
if target_df.empty:
return {'coupon': [], 'discount': [], 'bonus': [], 'click': []}
cols = target_df.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量', 'Qty'])
col_profit = find_col(cols, ['毛利', 'Profit', '利潤'])
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
if not col_amount:
return {'coupon': [], 'discount': [], 'bonus': [], 'click': []}
marketing_cols = {
'coupon': '折價券活動名稱',
'discount': '折扣活動名稱',
'bonus': '滿額再折扣活動名稱',
'click': '點我再折扣'
}
result = {}
actual_sort_key = sort_by if sort_by in ['revenue', 'qty', 'profit'] else 'revenue'
for key, col_name in marketing_cols.items():
if col_name not in cols:
result[key] = []
continue
activity_df = target_df[
(target_df[col_name].notna()) &
(target_df[col_name] != '') &
(target_df[col_name] != '0') &
(target_df[col_name] != 0)
]
if activity_df.empty:
result[key] = []
continue
agg_args = {
'revenue': (col_amount, 'sum'),
'order_count': (col_amount, 'count')
}
if col_qty:
agg_args['qty'] = (col_qty, 'sum')
if col_profit:
agg_args['profit'] = (col_profit, 'sum')
grouped = activity_df.groupby(col_name).agg(**agg_args).reset_index()
if 'profit' not in agg_args and col_cost:
cost_agg = activity_df.groupby(col_name)[col_cost].sum().reset_index()
grouped = grouped.merge(cost_agg, on=col_name)
grouped['profit'] = grouped['revenue'] - grouped[col_cost]
grouped = grouped.rename(columns={col_name: 'name'})
sort_col = actual_sort_key if actual_sort_key in grouped.columns else 'revenue'
grouped = grouped.sort_values(sort_col, ascending=False).head(15)
records = []
for _, row in grouped.iterrows():
record = {
'name': str(row['name'])[:50],
'revenue': float(row['revenue']),
'order_count': int(row['order_count'])
}
if 'qty' in row:
record['qty'] = float(row['qty'])
if 'profit' in row:
record['profit'] = float(row['profit'])
records.append(record)
result[key] = records
return result
def get_taiwan_holiday(date):
"""判斷是否為台灣國定假日"""
year = date.year
month = date.month
day = date.day
holidays_2026 = {
(1, 1): '元旦',
(2, 14): '春節連假', (2, 15): '小年夜', (2, 16): '除夕',
(2, 17): '春節 (初一)', (2, 18): '春節 (初二)', (2, 19): '春節 (初三)',
(2, 20): '春節連假', (2, 21): '春節連假', (2, 22): '春節連假',
(2, 28): '和平紀念日', (3, 2): '和平紀念日補假',
(4, 3): '兒童節補假', (4, 4): '清明節', (4, 5): '清明節連假', (4, 6): '清明節補假',
(5, 1): '勞動節',
(6, 19): '端午節',
(9, 25): '中秋節', (9, 28): '教師節',
(10, 9): '國慶日補假', (10, 10): '國慶日',
(10, 25): '臺灣光復節', (10, 26): '光復節補假',
(12, 25): '行憲紀念日',
}
holidays_2027 = {
(1, 1): '元旦',
(2, 11): '春節 (除夕)', (2, 12): '春節 (初一)', (2, 13): '春節 (初二)',
(2, 14): '春節 (初三)', (2, 15): '春節 (初四)', (2, 16): '春節 (初五)', (2, 17): '春節 (初六)',
(2, 28): '和平紀念日',
(4, 4): '清明節', (4, 5): '清明節連假',
(6, 14): '端午節',
(9, 21): '中秋節',
(10, 10): '國慶日', (10, 11): '國慶日連假',
}
holidays = holidays_2026 if year == 2026 else (holidays_2027 if year == 2027 else {})
holiday_name = holidays.get((month, day))
return (True, holiday_name) if holiday_name else (False, None)
def prepare_calendar_data(df, selected_month):
"""準備行事曆數據"""
year = selected_month.year
month = selected_month.month
first_day = pd.Timestamp(year=year, month=month, day=1)
last_day = pd.Timestamp(year=year, month=month, day=calendar.monthrange(year, month)[1])
first_weekday = first_day.weekday()
calendar_start = first_day - timedelta(days=first_weekday)
last_weekday = last_day.weekday()
calendar_end = last_day + timedelta(days=(6 - last_weekday))
data_start = first_day - timedelta(days=1)
data_end = last_day
month_df = df[(df['snapshot_date'] >= data_start) & (df['snapshot_date'] <= data_end)]
cols = df.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_cost = find_col(cols, ['成本', 'Cost'])
col_profit = find_col(cols, ['毛利', 'Profit'])
col_qty = find_col(cols, ['銷售數量', '銷量', 'Qty', '數量'])
col_name = find_col(cols, ['商品名稱', '品名'])
calendar_days = []
current_date = calendar_start
while current_date <= calendar_end:
weekday = current_date.weekday()
weekday_names = ['週一', '週二', '週三', '週四', '週五', '週六', '週日']
is_holiday, holiday_name = get_taiwan_holiday(current_date)
day_data = {
'date': current_date.strftime('%Y-%m-%d'),
'day': current_date.day,
'weekday': weekday_names[weekday],
'is_weekend': weekday >= 5,
'is_holiday': is_holiday,
'holiday_name': holiday_name,
'is_current_month': current_date.month == month,
'has_data': False,
'revenue': 0,
'profit': 0,
'margin_rate': 0,
'sku_count': 0,
'qty': 0,
'avg_price': 0,
'dod_percent': 0,
'dod_direction': 'neutral'
}
if first_day <= current_date <= last_day:
day_df = month_df[month_df['snapshot_date'] == current_date]
if not day_df.empty:
day_data['has_data'] = True
if col_amount:
day_data['revenue'] = float(day_df[col_amount].sum())
if col_profit:
day_data['profit'] = float(day_df[col_profit].sum())
elif col_cost and col_amount:
total_cost = float(day_df[col_cost].sum())
day_data['profit'] = day_data['revenue'] - total_cost
if day_data['revenue'] > 0:
day_data['margin_rate'] = (day_data['profit'] / day_data['revenue']) * 100
if col_qty:
day_data['qty'] = float(day_df[col_qty].sum())
if day_data['qty'] > 0:
day_data['avg_price'] = day_data['revenue'] / day_data['qty']
if col_name:
day_data['sku_count'] = int(day_df[col_name].nunique())
# DoD%
prev_date = current_date - timedelta(days=1)
prev_df = month_df[month_df['snapshot_date'] == prev_date]
if not prev_df.empty and col_amount:
prev_revenue = float(prev_df[col_amount].sum())
if prev_revenue > 0:
dod = ((day_data['revenue'] - prev_revenue) / prev_revenue) * 100
day_data['dod_percent'] = round(dod, 1)
day_data['dod_direction'] = 'up' if dod >= 0 else 'down'
calendar_days.append(day_data)
current_date += timedelta(days=1)
weeks = []
for i in range(0, len(calendar_days), 7):
weeks.append(calendar_days[i:i + 7])
prev_month = selected_month - pd.DateOffset(months=1)
next_month = selected_month + pd.DateOffset(months=1)
return {
'year': year,
'month': month,
'month_name': selected_month.strftime('%Y年%m月'),
'weeks': weeks,
'prev_month': prev_month.strftime('%Y-%m'),
'next_month': next_month.strftime('%Y-%m')
}
# ==========================================
# 頁面路由
# ==========================================

View File

@@ -0,0 +1,243 @@
"""日報業務邏輯 — 從 routes/daily_sales_routes.py 抽出的純計算函數。
這些函數不依賴 Flask request context可獨立測試。
"""
import calendar
from datetime import timedelta
import pandas as pd
from utils.df_helpers import find_col
def get_taiwan_holiday(date):
"""判斷是否為台灣國定假日"""
year = date.year
month = date.month
day = date.day
holidays_2026 = {
(1, 1): '元旦',
(2, 14): '春節連假', (2, 15): '小年夜', (2, 16): '除夕',
(2, 17): '春節 (初一)', (2, 18): '春節 (初二)', (2, 19): '春節 (初三)',
(2, 20): '春節連假', (2, 21): '春節連假', (2, 22): '春節連假',
(2, 28): '和平紀念日', (3, 2): '和平紀念日補假',
(4, 3): '兒童節補假', (4, 4): '清明節', (4, 5): '清明節連假', (4, 6): '清明節補假',
(5, 1): '勞動節',
(6, 19): '端午節',
(9, 25): '中秋節', (9, 28): '教師節',
(10, 9): '國慶日補假', (10, 10): '國慶日',
(10, 25): '臺灣光復節', (10, 26): '光復節補假',
(12, 25): '行憲紀念日',
}
holidays_2027 = {
(1, 1): '元旦',
(2, 11): '春節 (除夕)', (2, 12): '春節 (初一)', (2, 13): '春節 (初二)',
(2, 14): '春節 (初三)', (2, 15): '春節 (初四)', (2, 16): '春節 (初五)', (2, 17): '春節 (初六)',
(2, 28): '和平紀念日',
(4, 4): '清明節', (4, 5): '清明節連假',
(6, 14): '端午節',
(9, 21): '中秋節',
(10, 10): '國慶日', (10, 11): '國慶日連假',
}
holidays = holidays_2026 if year == 2026 else (holidays_2027 if year == 2027 else {})
holiday_name = holidays.get((month, day))
return (True, holiday_name) if holiday_name else (False, None)
def prepare_calendar_data(df, selected_month):
"""準備行事曆數據"""
year = selected_month.year
month = selected_month.month
first_day = pd.Timestamp(year=year, month=month, day=1)
last_day = pd.Timestamp(year=year, month=month, day=calendar.monthrange(year, month)[1])
first_weekday = first_day.weekday()
calendar_start = first_day - timedelta(days=first_weekday)
last_weekday = last_day.weekday()
calendar_end = last_day + timedelta(days=(6 - last_weekday))
data_start = first_day - timedelta(days=1)
data_end = last_day
month_df = df[(df['snapshot_date'] >= data_start) & (df['snapshot_date'] <= data_end)]
cols = df.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_cost = find_col(cols, ['成本', 'Cost'])
col_profit = find_col(cols, ['毛利', 'Profit'])
col_qty = find_col(cols, ['銷售數量', '銷量', 'Qty', '數量'])
col_name = find_col(cols, ['商品名稱', '品名'])
calendar_days = []
current_date = calendar_start
while current_date <= calendar_end:
weekday = current_date.weekday()
weekday_names = ['週一', '週二', '週三', '週四', '週五', '週六', '週日']
is_holiday, holiday_name = get_taiwan_holiday(current_date)
day_data = {
'date': current_date.strftime('%Y-%m-%d'),
'day': current_date.day,
'weekday': weekday_names[weekday],
'is_weekend': weekday >= 5,
'is_holiday': is_holiday,
'holiday_name': holiday_name,
'is_current_month': current_date.month == month,
'has_data': False,
'revenue': 0,
'profit': 0,
'margin_rate': 0,
'sku_count': 0,
'qty': 0,
'avg_price': 0,
'dod_percent': 0,
'dod_direction': 'neutral'
}
if first_day <= current_date <= last_day:
day_df = month_df[month_df['snapshot_date'] == current_date]
if not day_df.empty:
day_data['has_data'] = True
if col_amount:
day_data['revenue'] = float(day_df[col_amount].sum())
if col_profit:
day_data['profit'] = float(day_df[col_profit].sum())
elif col_cost and col_amount:
total_cost = float(day_df[col_cost].sum())
day_data['profit'] = day_data['revenue'] - total_cost
if day_data['revenue'] > 0:
day_data['margin_rate'] = (day_data['profit'] / day_data['revenue']) * 100
if col_qty:
day_data['qty'] = float(day_df[col_qty].sum())
if day_data['qty'] > 0:
day_data['avg_price'] = day_data['revenue'] / day_data['qty']
if col_name:
day_data['sku_count'] = int(day_df[col_name].nunique())
# DoD%
prev_date = current_date - timedelta(days=1)
prev_df = month_df[month_df['snapshot_date'] == prev_date]
if not prev_df.empty and col_amount:
prev_revenue = float(prev_df[col_amount].sum())
if prev_revenue > 0:
dod = ((day_data['revenue'] - prev_revenue) / prev_revenue) * 100
day_data['dod_percent'] = round(dod, 1)
day_data['dod_direction'] = 'up' if dod >= 0 else 'down'
calendar_days.append(day_data)
current_date += timedelta(days=1)
weeks = []
for i in range(0, len(calendar_days), 7):
weeks.append(calendar_days[i:i + 7])
prev_month = selected_month - pd.DateOffset(months=1)
next_month = selected_month + pd.DateOffset(months=1)
return {
'year': year,
'month': month,
'month_name': selected_month.strftime('%Y年%m月'),
'weeks': weeks,
'prev_month': prev_month.strftime('%Y-%m'),
'next_month': next_month.strftime('%Y-%m')
}
def prepare_marketing_summary(df, selected_date=None, is_month_view=False, month_start=None, month_end=None, sort_by='revenue'):
"""準備行銷活動業績貢獻數據"""
if is_month_view and month_start is not None and month_end is not None:
target_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)]
elif selected_date is not None:
target_df = df[df['snapshot_date'] == selected_date]
else:
target_df = df
if target_df.empty:
return {'coupon': [], 'discount': [], 'bonus': [], 'click': []}
cols = target_df.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量', 'Qty'])
col_profit = find_col(cols, ['毛利', 'Profit', '利潤'])
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
if not col_amount:
return {'coupon': [], 'discount': [], 'bonus': [], 'click': []}
marketing_cols = {
'coupon': '折價券活動名稱',
'discount': '折扣活動名稱',
'bonus': '滿額再折扣活動名稱',
'click': '點我再折扣'
}
result = {}
actual_sort_key = sort_by if sort_by in ['revenue', 'qty', 'profit'] else 'revenue'
for key, col_name in marketing_cols.items():
if col_name not in cols:
result[key] = []
continue
activity_df = target_df[
(target_df[col_name].notna()) &
(target_df[col_name] != '') &
(target_df[col_name] != '0') &
(target_df[col_name] != 0)
]
if activity_df.empty:
result[key] = []
continue
agg_args = {
'revenue': (col_amount, 'sum'),
'order_count': (col_amount, 'count')
}
if col_qty:
agg_args['qty'] = (col_qty, 'sum')
if col_profit:
agg_args['profit'] = (col_profit, 'sum')
grouped = activity_df.groupby(col_name).agg(**agg_args).reset_index()
if 'profit' not in agg_args and col_cost:
cost_agg = activity_df.groupby(col_name)[col_cost].sum().reset_index()
grouped = grouped.merge(cost_agg, on=col_name)
grouped['profit'] = grouped['revenue'] - grouped[col_cost]
grouped = grouped.rename(columns={col_name: 'name'})
sort_col = actual_sort_key if actual_sort_key in grouped.columns else 'revenue'
grouped = grouped.sort_values(sort_col, ascending=False).head(15)
records = []
for _, row in grouped.iterrows():
record = {
'name': str(row['name'])[:50],
'revenue': float(row['revenue']),
'order_count': int(row['order_count'])
}
if 'qty' in row:
record['qty'] = float(row['qty'])
if 'profit' in row:
record['profit'] = float(row['profit'])
records.append(record)
result[key] = records
return result

18
utils/df_helpers.py Normal file
View File

@@ -0,0 +1,18 @@
"""DataFrame 共用工具 — 跨 routes/services 共享。"""
def find_col(df_cols, keywords):
"""從欄位列表中,根據關鍵字列表找出最匹配的欄位名稱。
Args:
df_cols: DataFrame 的欄位名稱 iterable
keywords: 關鍵字列表,依優先序
Returns:
匹配的欄位名稱字串,找不到則回傳 None
"""
for k in keywords:
for col in df_cols:
if k in str(col):
return col
return None