Files
ewoooc/routes/export_routes.py
ogt 1b4f3a7bbe
Some checks failed
CD Pipeline / deploy (push) Failing after 59s
feat: EwoooC 初始化 — 完整專案推版至 Gitea
- 建立 Gitea Actions CD pipeline (.gitea/workflows/cd.yaml)
- 部署模式: rsync Python 檔案至 188 → docker restart (volume mount)
- Dockerfile/requirements 變動時自動重建 Docker image
- 部署通知: Telegram (開始/成功/失敗)
- 健康檢查: https://mo.wooo.work/health (最多 5 次重試)
- 同步最新 CLAUDE.md / ADR-008 / memory (2026-04-19)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-19 01:21:13 +08:00

642 lines
25 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
匯出功能路由模組
包含:各種報表匯出 API (Excel、CSV)
"""
import os
import io
from datetime import datetime, timezone, timedelta
from flask import Blueprint, request, send_file, redirect, url_for, flash
from auth import login_required
from sqlalchemy import func, desc
import pandas as pd
import numpy as np
from config import BASE_DIR, EXCEL_EXPORT_DIR
from database.manager import DatabaseManager
from database.models import Product, PriceRecord
from services.exporter import Exporter
from services.logger_manager import SystemLogger
# Timezone: fixed UTC+8 offset used as Taipei local time by this module
TAIPEI_TZ = timezone(timedelta(hours=8))
# Module logger, obtained from the project's SystemLogger under the name "ExportRoutes"
sys_log = SystemLogger("ExportRoutes").get_logger()
# Blueprint on which every export route in this module is registered
export_bp = Blueprint('export', __name__)
# ==========================================
# 輔助函數 (使用獨立模組,避免循環依賴)
# ==========================================
def _get_consolidated_data():
    """Return the result of ``routes.dashboard_routes.get_consolidated_data()``.

    The import happens at call time rather than at module import time; the
    section banner in this file states this is done to avoid a circular
    dependency. Callers in this module unpack the result as a 2-tuple
    ``(items, today_start)``.
    """
    from routes.dashboard_routes import get_consolidated_data as _load_consolidated

    consolidated = _load_consolidated()
    return consolidated
def _get_sales_cache():
    """Return the ``_SALES_PROCESSED_CACHE`` object from ``services.cache_service``.

    The import is deferred to call time (the section banner in this file
    says the helpers use separate modules to avoid circular dependencies).
    Callers index the returned object by table name.
    """
    from services.cache_service import _SALES_PROCESSED_CACHE as sales_cache

    return sales_cache
# ==========================================
# 全分類匯出
# ==========================================
@export_bp.route('/api/export/all_categories')
@login_required
def export_all_categories():
    """Handle a download request for the all-categories report.

    Returns:
        The file produced by ``Exporter.generate_all_categories_report`` as
        an attachment; a 404 text response when no path is returned or the
        file is missing on disk; a 500 text response on any exception.
    """
    try:
        sys_log.info("執行全分類 CSV 數據導出...")
        # The consolidated data is fetched here as before, although the
        # exporter call below takes no arguments and does not receive it.
        _items, _ = _get_consolidated_data()
        report_path = Exporter().generate_all_categories_report()

        resolved_path = os.path.abspath(report_path) if report_path else None
        if resolved_path is not None and os.path.exists(resolved_path):
            sys_log.info(f"報表匯出成功,準備下載: {resolved_path}")
            return send_file(resolved_path, as_attachment=True)
        return "匯出失敗:資料庫內尚無足夠數據", 404
    except Exception as exc:
        sys_log.error(f"[Web] [Export] 全分類報表匯出異常 | Error: {exc}")
        return f"匯出失敗,錯誤詳情:{exc}", 500
@export_bp.route('/api/export/excel/all')
@login_required
def export_excel_all():
    """Export every consolidated product to an Excel file.

    Returns:
        The generated file as an attachment, or a 500 text response when
        the exporter yields no existing file or an exception is raised.
    """
    try:
        all_items, _ = _get_consolidated_data()
        workbook_path = Exporter().generate_all_products_excel(all_items)
        # Guard clause: bail out unless the exporter produced a real file.
        if not workbook_path or not os.path.exists(workbook_path):
            return "匯出失敗", 500
        return send_file(workbook_path, as_attachment=True)
    except Exception as exc:
        sys_log.error(f"[Web] [Export] Excel 匯出失敗 (All) | Error: {exc}")
        return f"匯出失敗: {exc}", 500
# ==========================================
# 價格變動匯出
# ==========================================
@export_bp.route('/api/export/excel/changes')
@login_required
def export_excel_changes():
    """Export products whose price changed (increases and decreases) to Excel.

    Items are split on the sign of their ``'yesterday_diff'`` value; items
    with a zero difference are left out of both groups.

    Returns:
        The generated file as an attachment, or a 500 text response when
        the exporter yields no existing file or an exception is raised.
    """
    try:
        consolidated, _ = _get_consolidated_data()
        went_up = list(filter(lambda entry: entry['yesterday_diff'] > 0, consolidated))
        went_down = list(filter(lambda entry: entry['yesterday_diff'] < 0, consolidated))

        workbook_path = Exporter().generate_changes_excel(went_up, went_down)
        if not workbook_path or not os.path.exists(workbook_path):
            return "匯出失敗", 500
        return send_file(workbook_path, as_attachment=True)
    except Exception as exc:
        sys_log.error(f"[Web] [Export] Excel 匯出失敗 (Changes) | Error: {exc}")
        return f"匯出失敗: {exc}", 500
@export_bp.route('/api/export/excel/delisted')
@login_required
def export_excel_delisted():
    """Export products delisted today to an Excel file.

    A product is included when its ``status`` is ``'INACTIVE'`` and its
    ``updated_at`` is at or after ``today_start`` (the second value
    returned by ``_get_consolidated_data``). Each product is paired with
    the price of its most recent ``PriceRecord`` (by timestamp), or ``0``
    when it has no record.

    Returns:
        The generated file as an attachment, or a 500 text response when
        the exporter yields no existing file or an exception is raised.
    """
    db = DatabaseManager()
    session = db.get_session()
    try:
        _, today_start = _get_consolidated_data()
        delisted_products = session.query(Product).filter(
            Product.status == 'INACTIVE',
            Product.updated_at >= today_start,  # today_start is kept in Taipei time
        ).all()

        delisted_items = []
        for product in delisted_products:
            # One query per product: fetch the latest record once and reuse
            # it, instead of probing for existence and then querying again.
            last_record = (
                session.query(PriceRecord)
                .filter_by(product_id=product.id)
                .order_by(desc(PriceRecord.timestamp))
                .first()
            )
            delisted_items.append({
                'product': product,
                'last_price': last_record.price if last_record else 0,
            })

        file_path = Exporter().generate_delisted_excel(delisted_items)
        # Same guard as the other Excel handlers: never hand send_file a
        # missing or empty path.
        if file_path and os.path.exists(file_path):
            return send_file(file_path, as_attachment=True)
        return "匯出失敗", 500
    except Exception as e:
        sys_log.error(f"[Web] [Export] Excel 匯出失敗 (Delisted) | Error: {e}")
        return f"匯出失敗: {e}", 500
    finally:
        session.close()
@export_bp.route('/api/export/price_changes')
@login_required
def export_price_changes():
    """Export today's price-change details as an Excel workbook.

    Query parameters:
        type: ``increase``, ``decrease``, ``delisted``, ``active`` or
            ``category``. Any other value (or none) exports every product
            whose latest price differs from its last price before today.
        category: category name; only applied when ``type=category``.

    For every mode except ``delisted`` the "current" price is the record
    with the highest id per product and the "old" price is the record with
    the highest id whose timestamp is before today's midnight (Taipei).
    Products without an old price are skipped.

    Returns:
        The saved ``.xlsx`` file as an attachment, a 404 text response when
        nothing matches, or a 500 text response on any exception.
    """
    import openpyxl
    from openpyxl.styles import Font, Alignment, PatternFill

    filter_type = request.args.get('type', '')
    filter_category = request.args.get('category', '')
    session = None
    try:
        session = DatabaseManager().get_session()
        now_taipei = datetime.now(TAIPEI_TZ)
        # Naive midnight of the current Taipei day; compared against the
        # stored timestamp / updated_at columns below.
        today_start = now_taipei.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None)

        # Each entry is (product, price_record, old_price).
        products = []
        if filter_type == 'delisted':
            today_delisted = session.query(Product).filter(
                Product.status == 'INACTIVE',
                Product.updated_at >= today_start
            ).all()
            for product in today_delisted:
                last_record = session.query(PriceRecord).filter(
                    PriceRecord.product_id == product.id
                ).order_by(PriceRecord.timestamp.desc()).first()
                if last_record:
                    products.append((product, last_record, last_record.price))
        else:
            # Latest record (highest id) per product, joined with its product.
            latest_records_subq = session.query(
                func.max(PriceRecord.id).label('max_id')
            ).group_by(PriceRecord.product_id).subquery()
            query = session.query(PriceRecord, Product).join(
                latest_records_subq,
                PriceRecord.id == latest_records_subq.c.max_id
            ).join(Product, PriceRecord.product_id == Product.id)
            if filter_type == 'category' and filter_category:
                query = query.filter(Product.category == filter_category)

            # Last price recorded before today, per product. No product-id
            # IN (...) filter is needed: every product that has a latest
            # record is exactly every product that has any record, and a
            # huge IN list can exceed the database's bind-parameter limit.
            yesterday_prices_subq = session.query(
                PriceRecord.product_id,
                func.max(PriceRecord.id).label('max_id')
            ).filter(
                PriceRecord.timestamp < today_start
            ).group_by(PriceRecord.product_id).subquery()
            yesterday_prices_q = session.query(
                PriceRecord.product_id, PriceRecord.price
            ).join(
                yesterday_prices_subq,
                PriceRecord.id == yesterday_prices_subq.c.max_id
            )
            yesterday_prices_map = {pid: price for pid, price in yesterday_prices_q}

            # Only the comparison differs between the modes.
            comparisons = {
                'increase': lambda new, old: new > old,
                'decrease': lambda new, old: new < old,
            }
            price_matches = comparisons.get(filter_type, lambda new, old: new != old)
            for record, product in query.all():
                old_price = yesterday_prices_map.get(product.id)
                if old_price is not None and price_matches(record.price, old_price):
                    products.append((product, record, old_price))

        if not products:
            return "無符合條件的商品資料", 404

        # Build the workbook.
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = "價格變動明細"
        headers = ['商品ID', '商品名稱', '分類', '原價格', '現價格', '變動金額', '變動百分比', '更新時間', '商品網址']
        ws.append(headers)

        # Header row styling.
        header_fill = PatternFill(start_color='4472C4', end_color='4472C4', fill_type='solid')
        header_font = Font(bold=True, color='FFFFFF')
        for cell in ws[1]:
            cell.fill = header_fill
            cell.font = header_font
            cell.alignment = Alignment(horizontal='center', vertical='center')

        # Data rows.
        for product, record, old_price in products:
            change = record.price - old_price
            # Avoid dividing by a zero/negative old price.
            change_pct = (change / old_price * 100) if old_price > 0 else 0
            updated_at = record.timestamp.strftime('%Y-%m-%d %H:%M') if record.timestamp else ''
            ws.append([
                product.i_code,
                product.name,
                product.category or '未分類',
                old_price,
                record.price,
                change,
                f"{change_pct:.2f}%",
                updated_at,
                product.url
            ])

        # Column widths for columns A..I.
        for letter, width in zip('ABCDEFGHI', (12, 40, 15, 12, 12, 12, 12, 18, 50)):
            ws.column_dimensions[letter].width = width

        # Save to the export directory. Only known filter names are allowed
        # into the filename; the raw query value is untrusted input and
        # could otherwise inject path separators or other odd characters.
        known_types = {'increase', 'decrease', 'delisted', 'active', 'category'}
        label = filter_type if filter_type in known_types else 'all'
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"價格變動明細_{label}_{timestamp}.xlsx"
        filepath = os.path.join(EXCEL_EXPORT_DIR, filename)
        os.makedirs(EXCEL_EXPORT_DIR, exist_ok=True)
        wb.save(filepath)
        return send_file(filepath, as_attachment=True, download_name=filename)
    except Exception as e:
        sys_log.error(f"[Web] [Export] 異動報表匯出失敗 | Type: {filter_type} | Error: {e}")
        return f"匯出失敗: {e}", 500
    finally:
        # Close the session on every path, including failures.
        if session is not None:
            session.close()
# ==========================================
# 其他匯出功能
# ==========================================
@export_bp.route('/api/export/low_prices')
@login_required
def export_low_prices():
    """Export the historical-low-price report.

    Returns:
        The file produced by ``Exporter.generate_low_price_report`` as an
        attachment; a 404 text response when no existing file is produced;
        a 500 text response on any exception.
    """
    try:
        report_path = Exporter().generate_low_price_report()
        # Guard clause: no path, or path not on disk -> nothing to send.
        if not report_path or not os.path.exists(report_path):
            return "目前無歷史低價商品", 404
        return send_file(report_path, as_attachment=True)
    except Exception as exc:
        sys_log.error(f"[Web] [Export] 低價報表匯出失敗 | Error: {exc}")
        return f"匯出失敗: {exc}", 500
@export_bp.route('/api/export/changes')
@login_required
def export_changes():
    """Export a filtered report selected by the ``type`` query parameter.

    ``type=increase`` / ``type=decrease`` export consolidated items whose
    ``'yesterday_diff'`` is positive / negative. ``type=delisted`` exports
    products with status ``'INACTIVE'`` updated since ``today_start``,
    each paired with its latest recorded price (``0`` when none exists).
    Any other value produces no file.

    Returns:
        The generated file as an attachment, a 404 text response when no
        existing file was produced, or a 500 text response on exception.
    """
    export_kind = request.args.get('type')
    exporter = Exporter()
    report_path = None
    try:
        unique_items, today_start = _get_consolidated_data()

        if export_kind == 'increase':
            rising = [row for row in unique_items if row['yesterday_diff'] > 0]
            report_path = exporter.generate_custom_report(rising, "今日漲價商品")
        elif export_kind == 'decrease':
            falling = [row for row in unique_items if row['yesterday_diff'] < 0]
            report_path = exporter.generate_custom_report(falling, "今日跌價商品")
        elif export_kind == 'delisted':
            session = DatabaseManager().get_session()
            try:
                inactive_today = session.query(Product).filter(
                    Product.status == 'INACTIVE',
                    Product.updated_at >= today_start,  # today_start is kept in Taipei time
                ).all()

                rows = []
                for item in inactive_today:
                    newest = (
                        session.query(PriceRecord)
                        .filter_by(product_id=item.id)
                        .order_by(desc(PriceRecord.timestamp))
                        .first()
                    )
                    last_price = newest.price if newest else 0
                    rows.append({'product': item, 'last_price': last_price})
                report_path = exporter.generate_delisted_report(rows, "今日下架商品")
            finally:
                session.close()

        if not report_path or not os.path.exists(report_path):
            return "無資料可匯出", 404
        return send_file(report_path, as_attachment=True)
    except Exception as exc:
        sys_log.error(f"[Web] [Export] 篩選匯出失敗 | Type: {export_kind} | Error: {exc}")
        return f"匯出失敗: {exc}", 500
@export_bp.route('/api/export/excel/abc')
@login_required
def export_abc_analysis():
    """Export an ABC (cumulative revenue) analysis as an in-memory Excel file.

    Data comes from the ``'realtime_sales_monthly'`` entry of the sales
    cache, whose ``'df'`` is a DataFrame and whose ``'cols'`` maps logical
    names (``name``, ``amount``, ``qty``, ...) to column names.

    Query parameters:
        category, brand, vendor: exact-match filters; ``'all'`` disables
            them. Category ``'其他'`` selects everything outside the top
            12 categories by revenue, when more than 12 exist.
        keyword: case-insensitive literal substring of the product name.
        min_margin, max_margin: bounds on ``calculated_margin_rate``.
        class: keep only rows of this ABC class after classification.
        factor: multiplier for the suggested restock quantity. When it is
            absent or unusable, class A uses 1.5x, class B 1.2x, class C 0.

    Rows are aggregated per product name and sorted by revenue. Cumulative
    share <= 80% is class A, <= 95% is class B, the rest class C.

    Returns:
        The workbook as an attachment; a 400 text response when the cache
        entry is missing; a 404 text response when there is nothing to
        export; a 500 text response on any exception.
    """
    try:
        table_name = 'realtime_sales_monthly'
        sales_cache = _get_sales_cache()
        if table_name not in sales_cache:
            return "請先瀏覽「業績分析」頁面以載入資料與快取。", 400
        cache_data = sales_cache[table_name]
        df = cache_data['df']
        cols_map = cache_data['cols']

        # Resolve logical column names to the actual DataFrame columns.
        col_name = cols_map.get('name')
        col_amount = cols_map.get('amount')
        col_qty = cols_map.get('qty')
        col_category = cols_map.get('category')
        col_brand = cols_map.get('brand')
        col_vendor = cols_map.get('vendor')
        col_cost = cols_map.get('cost')
        col_profit = cols_map.get('profit')
        col_pid = cols_map.get('pid')

        # Request filters. NOTE: min_price / max_price were read by the
        # previous version but never applied; they remain unsupported here.
        selected_category = request.args.get('category', 'all')
        selected_brand = request.args.get('brand', 'all')
        selected_vendor = request.args.get('vendor', 'all')
        keyword = request.args.get('keyword', '').strip()
        min_margin = request.args.get('min_margin', '')
        max_margin = request.args.get('max_margin', '')

        target_df = df.copy()

        # Top-N categories by revenue over the unfiltered data; needed to
        # interpret the synthetic '其他' category.
        TOP_N_CATS = 12
        top_cats_names = []
        if col_category and col_amount:
            cat_group_all = df.groupby(col_category)[col_amount].sum().sort_values(ascending=False)
            if len(cat_group_all) > TOP_N_CATS:
                top_cats_names = cat_group_all.head(TOP_N_CATS).index.tolist()

        if selected_category != 'all' and col_category:
            if selected_category == '其他' and top_cats_names:
                target_df = target_df[~target_df[col_category].isin(top_cats_names)]
            else:
                target_df = target_df[target_df[col_category] == selected_category]
        if selected_brand != 'all' and col_brand:
            target_df = target_df[target_df[col_brand] == selected_brand]
        if selected_vendor != 'all' and col_vendor:
            target_df = target_df[target_df[col_vendor] == selected_vendor]
        if keyword and col_name:
            # regex=False: the keyword is user input; treating it as a
            # pattern makes inputs such as "(" raise and fail the export.
            name_hits = target_df[col_name].astype(str).str.contains(
                keyword, case=False, na=False, regex=False
            )
            target_df = target_df[name_hits]
        if min_margin:
            target_df = target_df[target_df['calculated_margin_rate'] >= float(min_margin)]
        if max_margin:
            target_df = target_df[target_df['calculated_margin_rate'] <= float(max_margin)]

        # Aggregation needs both a name column to group by and an amount
        # column to rank on; without them there is nothing to export.
        if not (col_amount and col_name) or target_df.empty:
            return "無資料可匯出", 404

        agg_rules = {col_amount: 'sum'}
        for summed_col in (col_qty, col_cost, col_profit):
            if summed_col:
                agg_rules[summed_col] = 'sum'
        for carried_col in (col_category, col_vendor, col_brand, col_pid):
            if carried_col:
                agg_rules[carried_col] = 'first'
        df_agg = target_df.groupby(col_name).agg(agg_rules).reset_index()

        # Margin rate per product, recomputed from the aggregated sums.
        if col_profit:
            df_agg['calculated_margin_rate'] = (df_agg[col_profit] / df_agg[col_amount]) * 100
        elif col_cost:
            df_agg['calculated_margin_rate'] = ((df_agg[col_amount] - df_agg[col_cost]) / df_agg[col_amount]) * 100
        else:
            df_agg['calculated_margin_rate'] = 0.0
        df_agg['calculated_margin_rate'] = df_agg['calculated_margin_rate'].replace([np.inf, -np.inf, np.nan], 0)

        # Rank by revenue and classify on the cumulative revenue share.
        target_df = df_agg.sort_values(by=col_amount, ascending=False)
        target_df['cumulative_revenue'] = target_df[col_amount].cumsum()
        total_revenue = target_df[col_amount].sum()
        target_df['cumulative_pct'] = (target_df['cumulative_revenue'] / total_revenue) * 100
        conditions = [(target_df['cumulative_pct'] <= 80), (target_df['cumulative_pct'] <= 95)]
        target_df['ABC_Class'] = np.select(conditions, ['A', 'B'], default='C')

        # Optional restriction to one class. Copy so the column assignments
        # below write to an independent frame rather than a filtered view.
        filter_class = request.args.get('class')
        if filter_class:
            target_df = target_df[target_df['ABC_Class'] == filter_class].copy()

        if col_qty:
            # Average unit price; zero quantities give inf/NaN -> report 0.
            unit_price = target_df[col_amount] / target_df[col_qty]
            target_df['avg_unit_price'] = unit_price.replace([np.inf, -np.inf], np.nan).fillna(0)

            # Suggested restock: a usable custom factor wins, otherwise
            # fall back to the per-class multipliers.
            restock = None
            custom_factor = request.args.get('factor')
            if custom_factor:
                try:
                    restock = (target_df[col_qty] * float(custom_factor)).astype(int)
                except (TypeError, ValueError, OverflowError):
                    restock = None
            if restock is None:
                class_conditions = [(target_df['ABC_Class'] == 'A'), (target_df['ABC_Class'] == 'B')]
                class_choices = [target_df[col_qty] * 1.5, target_df[col_qty] * 1.2]
                restock = np.select(class_conditions, class_choices, default=0).astype(int)
            target_df['suggested_restock'] = restock

        # Output columns in display order; unmapped or absent ones are skipped.
        column_plan = [
            (col_pid, '商品ID'),
            (col_name, '商品名稱'),
            (col_category, '分類'),
            (col_brand, '品牌'),
            (col_vendor, '廠商'),
            ('ABC_Class', 'ABC分類'),
            (col_amount, '銷售金額'),
            (col_qty, '銷售數量'),
            ('avg_unit_price', '平均單價'),
            (col_cost, '成本'),
            (col_profit, '毛利'),
            ('calculated_margin_rate', '毛利率(%)'),
            ('suggested_restock', '建議補貨量'),
        ]
        export_cols = []
        header_map = {}
        for column, header in column_plan:
            if column and column in target_df.columns:
                export_cols.append(column)
                header_map[column] = header
        export_df = target_df[export_cols].rename(columns=header_map)

        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            export_df.to_excel(writer, index=False, sheet_name='ABC分析')
        output.seek(0)

        filename_prefix = f"ABC_Analysis_{filter_class}_" if filter_class else "ABC_Analysis_"
        return send_file(
            output,
            as_attachment=True,
            download_name=f"{filename_prefix}{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx",
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as e:
        sys_log.error(f"ABC Export Error: {e}")
        return f"匯出失敗: {e}", 500
@export_bp.route('/api/export/excel/vendor')
@login_required
def export_vendor_analysis():
    """Export the vendor profitability ranking as an in-memory Excel file.

    Reads the ``'realtime_sales_monthly'`` entry of the sales cache, sums
    amount (and profit / cost when mapped) per vendor, derives a margin
    rate in percent, and sorts vendors by amount, highest first.

    Returns:
        The workbook as an attachment; a redirect to the
        ``'sales_analysis'`` endpoint (with a flashed warning and the
        current query arguments) when the cache entry is missing; a 400
        text response when the vendor or amount column is unmapped; a 500
        text response on any exception.
    """
    try:
        cache_key = 'realtime_sales_monthly'
        sales_cache = _get_sales_cache()

        # Cache miss: send the user back so the data gets reloaded.
        if cache_key not in sales_cache:
            query_args = dict(request.args.items())
            flash('資料快取已失效,請稍候重新載入資料後再匯出。', 'warning')
            return redirect(url_for('sales_analysis', **query_args))

        cached_entry = sales_cache[cache_key]
        sales_frame = cached_entry['df']
        column_names = cached_entry['cols']

        vendor_col = column_names.get('vendor')
        amount_col = column_names.get('amount')
        profit_col = column_names.get('profit')
        cost_col = column_names.get('cost')
        if not (vendor_col and amount_col):
            return "資料缺少必要欄位(廠商、銷售金額)", 400

        # Sum the numeric columns per vendor.
        sum_rules = {amount_col: 'sum'}
        if profit_col:
            sum_rules[profit_col] = 'sum'
        if cost_col:
            sum_rules[cost_col] = 'sum'
        per_vendor = sales_frame.groupby(vendor_col).agg(sum_rules).reset_index()

        # Margin rate: prefer profit, fall back to amount minus cost.
        if profit_col:
            margin = (per_vendor[profit_col] / per_vendor[amount_col]) * 100
        elif cost_col:
            margin = ((per_vendor[amount_col] - per_vendor[cost_col]) / per_vendor[amount_col]) * 100
        else:
            margin = 0
        per_vendor['margin_rate'] = margin
        # Division by a zero amount yields inf/NaN; report those as 0.
        per_vendor['margin_rate'] = per_vendor['margin_rate'].replace([np.inf, -np.inf, np.nan], 0)

        per_vendor = per_vendor.sort_values(by=amount_col, ascending=False)

        # Display headers for the exported sheet.
        display_names = {vendor_col: '廠商', amount_col: '銷售金額', 'margin_rate': '毛利率(%)'}
        if profit_col:
            display_names[profit_col] = '毛利'
        if cost_col:
            display_names[cost_col] = '成本'
        sheet_frame = per_vendor.rename(columns=display_names)

        buffer = io.BytesIO()
        with pd.ExcelWriter(buffer, engine='openpyxl') as writer:
            sheet_frame.to_excel(writer, index=False, sheet_name='廠商分析')
        buffer.seek(0)

        stamp = datetime.now().strftime('%Y%m%d_%H%M')
        return send_file(
            buffer,
            as_attachment=True,
            download_name=f"Vendor_Analysis_{stamp}.xlsx",
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as exc:
        sys_log.error(f"Vendor Export Error: {exc}")
        return f"匯出失敗: {exc}", 500