Files
ewoooc/services/daily_sales_service.py
ooo d4ea555030
All checks were successful
CD Pipeline / deploy (push) Successful in 1m14s
refactor(p1-06/07): daily_sales 純函數抽到 services/
- 新增 utils/df_helpers.py 放共用 find_col(避免 routes/services 雙向依賴)
- 新增 services/daily_sales_service.py 收:
  * get_taiwan_holiday(date)
  * prepare_calendar_data(df, selected_month)
  * prepare_marketing_summary(df, ...)
- routes/daily_sales_routes.py 改為 import service,行數 949 → 713(-236)
- 行為 100% 保留,僅檔案位置搬移
2026-04-28 15:37:07 +08:00

244 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""日報業務邏輯 — 從 routes/daily_sales_routes.py 抽出的純計算函數。
這些函數不依賴 Flask request context,可獨立測試。
"""
import calendar
from datetime import timedelta
import pandas as pd
from utils.df_helpers import find_col
def get_taiwan_holiday(date):
    """Report whether *date* is listed as a Taiwan national holiday.

    Only 2026 and 2027 have a holiday table; any other year is always
    reported as "not a holiday".

    Args:
        date: Any object exposing ``year``, ``month`` and ``day`` attributes.

    Returns:
        ``(True, holiday_name)`` when the date is listed, otherwise
        ``(False, None)``.
    """
    # NOTE(review): the 2027 lunar-calendar entries (Spring Festival,
    # Dragon Boat, Mid-Autumn) look inconsistent with the lunar calendar --
    # confirm against the official government work calendar before relying
    # on them.
    holidays_by_year = {
        2026: {
            (1, 1): '元旦',
            (2, 14): '春節連假', (2, 15): '小年夜', (2, 16): '除夕',
            (2, 17): '春節 (初一)', (2, 18): '春節 (初二)', (2, 19): '春節 (初三)',
            (2, 20): '春節連假', (2, 21): '春節連假', (2, 22): '春節連假',
            (2, 28): '和平紀念日', (3, 2): '和平紀念日補假',
            (4, 3): '兒童節補假', (4, 4): '清明節', (4, 5): '清明節連假', (4, 6): '清明節補假',
            (5, 1): '勞動節',
            (6, 19): '端午節',
            (9, 25): '中秋節', (9, 28): '教師節',
            (10, 9): '國慶日補假', (10, 10): '國慶日',
            (10, 25): '臺灣光復節', (10, 26): '光復節補假',
            (12, 25): '行憲紀念日',
        },
        2027: {
            (1, 1): '元旦',
            (2, 11): '春節 (除夕)', (2, 12): '春節 (初一)', (2, 13): '春節 (初二)',
            (2, 14): '春節 (初三)', (2, 15): '春節 (初四)', (2, 16): '春節 (初五)', (2, 17): '春節 (初六)',
            (2, 28): '和平紀念日',
            (4, 4): '清明節', (4, 5): '清明節連假',
            (6, 14): '端午節',
            (9, 21): '中秋節',
            (10, 10): '國慶日', (10, 11): '國慶日連假',
        },
    }

    # Years without a table fall back to an empty mapping.
    year_table = holidays_by_year.get(date.year, {})
    label = year_table.get((date.month, date.day))
    if not label:
        return False, None
    return True, label
def prepare_calendar_data(df, selected_month):
    """Build the week-by-week calendar grid for one month of daily sales.

    The grid runs from the Monday on or before the 1st to the Sunday on or
    after the last day of the month. Days inside the month that have rows in
    *df* are filled with revenue, profit, margin rate, quantity, average
    price, distinct product count and day-over-day revenue change; every
    other cell keeps its zeroed defaults.

    Args:
        df: DataFrame with a ``snapshot_date`` column that is compared
            against ``pd.Timestamp`` values (presumably datetime64 --
            confirm with the caller). Metric columns are located by name
            through ``find_col``.
        selected_month: Object providing ``year``, ``month`` and
            ``strftime`` and supporting ``pd.DateOffset`` arithmetic.

    Returns:
        dict with ``year``, ``month``, ``month_name``, ``weeks`` (a list of
        7-cell lists of per-day dicts), ``prev_month`` and ``next_month``
        (both formatted as ``YYYY-MM``).
    """
    weekday_labels = ('週一', '週二', '週三', '週四', '週五', '週六', '週日')
    one_day = timedelta(days=1)

    year, month = selected_month.year, selected_month.month
    month_first = pd.Timestamp(year=year, month=month, day=1)
    month_last = pd.Timestamp(year=year, month=month, day=calendar.monthrange(year, month)[1])

    # Pad the grid out to whole Monday-to-Sunday weeks.
    grid_first = month_first - timedelta(days=month_first.weekday())
    grid_last = month_last + timedelta(days=6 - month_last.weekday())

    # Keep the day before the 1st too, so the 1st can get a day-over-day figure.
    snapshot = df['snapshot_date']
    window = df[(snapshot >= month_first - one_day) & (snapshot <= month_last)]

    columns = df.columns.tolist()
    col_amount = find_col(columns, ['銷售金額', '業績', '金額', '總業績'])
    col_cost = find_col(columns, ['成本', 'Cost'])
    col_profit = find_col(columns, ['毛利', 'Profit'])
    col_qty = find_col(columns, ['銷售數量', '銷量', 'Qty', '數量'])
    col_name = find_col(columns, ['商品名稱', '品名'])

    def column_total(frame, column):
        """Sum one column of *frame* and return it as a plain float."""
        return float(frame[column].sum())

    cells = []
    for offset in range((grid_last - grid_first).days + 1):
        cursor = grid_first + timedelta(days=offset)
        weekday_index = cursor.weekday()
        holiday_flag, holiday_label = get_taiwan_holiday(cursor)

        cell = {
            'date': cursor.strftime('%Y-%m-%d'),
            'day': cursor.day,
            'weekday': weekday_labels[weekday_index],
            'is_weekend': weekday_index >= 5,
            'is_holiday': holiday_flag,
            'holiday_name': holiday_label,
            'is_current_month': cursor.month == month,
            'has_data': False,
            'revenue': 0,
            'profit': 0,
            'margin_rate': 0,
            'sku_count': 0,
            'qty': 0,
            'avg_price': 0,
            'dod_percent': 0,
            'dod_direction': 'neutral'
        }
        # Register the cell first; it is filled in place below when data exists.
        cells.append(cell)

        # Padding days from the neighbouring months never carry metrics.
        if not (month_first <= cursor <= month_last):
            continue
        rows = window[window['snapshot_date'] == cursor]
        if rows.empty:
            continue

        cell['has_data'] = True
        if col_amount:
            cell['revenue'] = column_total(rows, col_amount)
        revenue = cell['revenue']

        # Prefer an explicit profit column; otherwise derive it from cost.
        if col_profit:
            cell['profit'] = column_total(rows, col_profit)
        elif col_cost and col_amount:
            cell['profit'] = revenue - column_total(rows, col_cost)

        if revenue > 0:
            cell['margin_rate'] = (cell['profit'] / revenue) * 100

        if col_qty:
            cell['qty'] = column_total(rows, col_qty)
        if cell['qty'] > 0:
            cell['avg_price'] = revenue / cell['qty']

        if col_name:
            cell['sku_count'] = int(rows[col_name].nunique())

        # Day-over-day revenue change against the previous calendar day.
        if not col_amount:
            continue
        previous_rows = window[window['snapshot_date'] == cursor - one_day]
        if previous_rows.empty:
            continue
        previous_revenue = column_total(previous_rows, col_amount)
        if previous_revenue > 0:
            change = ((revenue - previous_revenue) / previous_revenue) * 100
            cell['dod_percent'] = round(change, 1)
            cell['dod_direction'] = 'up' if change >= 0 else 'down'

    # Slice the flat day list into rows of seven for rendering.
    weeks = [cells[start:start + 7] for start in range(0, len(cells), 7)]

    month_step = pd.DateOffset(months=1)
    return {
        'year': year,
        'month': month,
        'month_name': selected_month.strftime('%Y年%m月'),
        'weeks': weeks,
        'prev_month': (selected_month - month_step).strftime('%Y-%m'),
        'next_month': (selected_month + month_step).strftime('%Y-%m')
    }
def prepare_marketing_summary(df, selected_date=None, is_month_view=False, month_start=None, month_end=None, sort_by='revenue'):
    """Summarise sales contribution per marketing activity.

    Rows are first narrowed to a date scope, then grouped by each of the four
    marketing-activity name columns. For every activity type the top 15
    activities (by the chosen sort key, descending) are returned.

    Args:
        df: DataFrame with a ``snapshot_date`` column; metric columns are
            located by name through ``find_col``.
        selected_date: Single day to report on; used when the month range
            does not apply.
        is_month_view: When true and both month bounds are given, the
            inclusive range ``month_start``..``month_end`` is used instead.
        month_start: Inclusive lower bound of the month range.
        month_end: Inclusive upper bound of the month range.
        sort_by: ``'revenue'``, ``'qty'`` or ``'profit'``; anything else, or
            a key with no matching aggregated column, falls back to
            ``'revenue'``.

    Returns:
        dict with keys ``coupon``, ``discount``, ``bonus`` and ``click``.
        Each value is a list of records holding ``name`` (truncated to 50
        characters), ``revenue`` and ``order_count``, plus ``qty`` and
        ``profit`` when those could be computed. All four lists are empty
        when the scope has no rows or no amount column is found.
    """
    activity_columns = {
        'coupon': '折價券活動名稱',
        'discount': '折扣活動名稱',
        'bonus': '滿額再折扣活動名稱',
        'click': '點我再折扣'
    }

    # Narrow to the requested date scope; with no scope given, use everything.
    if is_month_view and month_start is not None and month_end is not None:
        snapshot = df['snapshot_date']
        scoped = df[(snapshot >= month_start) & (snapshot <= month_end)]
    elif selected_date is not None:
        scoped = df[df['snapshot_date'] == selected_date]
    else:
        scoped = df

    if scoped.empty:
        return {kind: [] for kind in activity_columns}

    columns = scoped.columns.tolist()
    col_amount = find_col(columns, ['銷售金額', '業績', '金額', '總業績'])
    col_qty = find_col(columns, ['銷售數量', '銷量', '數量', 'Qty'])
    col_profit = find_col(columns, ['毛利', 'Profit', '利潤'])
    col_cost = find_col(columns, ['成本', 'Cost', '總成本'])

    # Without an amount column there is nothing to aggregate.
    if not col_amount:
        return {kind: [] for kind in activity_columns}

    ranking_key = sort_by if sort_by in ('revenue', 'qty', 'profit') else 'revenue'

    # The aggregation spec is the same for every activity type, so build it once.
    named_aggs = {
        'revenue': (col_amount, 'sum'),
        'order_count': (col_amount, 'count')
    }
    if col_qty:
        named_aggs['qty'] = (col_qty, 'sum')
    if col_profit:
        named_aggs['profit'] = (col_profit, 'sum')
    # Profit is derived from cost only when no profit column was aggregated.
    derive_profit = bool(not col_profit and col_cost)

    def as_record(row):
        """Convert one aggregated row into a plain dict of native values."""
        entry = {
            'name': str(row['name'])[:50],
            'revenue': float(row['revenue']),
            'order_count': int(row['order_count'])
        }
        for optional in ('qty', 'profit'):
            if optional in row:
                entry[optional] = float(row[optional])
        return entry

    summary = {}
    for kind, label_col in activity_columns.items():
        if label_col not in columns:
            summary[kind] = []
            continue

        # Keep only rows that actually carry an activity name.
        labels = scoped[label_col]
        tagged = scoped[
            labels.notna() & (labels != '') & (labels != '0') & (labels != 0)
        ]
        if tagged.empty:
            summary[kind] = []
            continue

        totals = tagged.groupby(label_col).agg(**named_aggs).reset_index()

        if derive_profit:
            cost_totals = tagged.groupby(label_col)[col_cost].sum().reset_index()
            totals = totals.merge(cost_totals, on=label_col)
            totals['profit'] = totals['revenue'] - totals[col_cost]

        totals = totals.rename(columns={label_col: 'name'})

        order_column = ranking_key if ranking_key in totals.columns else 'revenue'
        top_rows = totals.sort_values(order_column, ascending=False).head(15)

        summary[kind] = [as_record(row) for _, row in top_rows.iterrows()]

    return summary