Files
ewoooc/routes/export_routes.py
OoO 8145c227c7
All checks were successful
CD Pipeline / deploy (push) Successful in 1m7s
fix: sanitize excel export values
2026-06-18 14:58:32 +08:00

1013 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
匯出功能路由模組
包含:各種報表匯出 API (Excel、CSV)
"""
import os
import io
import json
import re
from datetime import datetime, timezone, timedelta
from flask import Blueprint, request, send_file, redirect, url_for, flash
from auth import login_required
from sqlalchemy import func, desc, text
import pandas as pd
import numpy as np
from config import BASE_DIR, EXCEL_EXPORT_DIR
from database.manager import DatabaseManager
from database.models import Product, PriceRecord
from services.exporter import Exporter
from services.logger_manager import SystemLogger
from utils.momo_url_utils import build_momo_product_url, normalize_momo_product_url
# Timezone setting: a fixed UTC+8 offset used for Taipei-local timestamps.
TAIPEI_TZ = timezone(timedelta(hours=8))
# Module logger
sys_log = SystemLogger("ExportRoutes").get_logger()
# Blueprint definition
export_bp = Blueprint('export', __name__)
# Matches control characters 0x00-0x08, 0x0B-0x0C and 0x0E-0x1F.
# Tab (0x09), line feed (0x0A) and carriage return (0x0D) are not matched.
_EXCEL_ILLEGAL_CHAR_RE = re.compile(r'[\x00-\x08\x0B-\x0C\x0E-\x1F]')
# ==========================================
# 輔助函數 (使用獨立模組,避免循環依賴)
# ==========================================
def _get_consolidated_data():
    """Return the result of ``routes.dashboard_routes.get_consolidated_data()``.

    The import happens at call time, inside the function, instead of at
    module import time.
    """
    from routes.dashboard_routes import get_consolidated_data as load_consolidated
    consolidated = load_consolidated()
    return consolidated
def _get_sales_cache():
    """Return the ``_SALES_PROCESSED_CACHE`` object from ``services.cache_manager``.

    The import happens at call time, inside the function.
    """
    from services.cache_manager import _SALES_PROCESSED_CACHE as sales_cache
    return sales_cache
def _sanitize_excel_cell(value):
    """Strip characters matched by ``_EXCEL_ILLEGAL_CHAR_RE`` from a string.

    Values that are not ``str`` instances are returned unchanged.
    """
    if not isinstance(value, str):
        return value
    return _EXCEL_ILLEGAL_CHAR_RE.sub('', value)
def _sanitize_excel_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Return an Excel-safe copy of ``df`` without touching numeric/date columns.

    Every cell of each text-bearing column is passed through
    ``_sanitize_excel_cell``. A column counts as text-bearing when its dtype is
    ``object`` or a pandas ``StringDtype``; checking only ``dtype == object``
    would skip columns stored with the dedicated string dtype and leave their
    control characters in place.

    Args:
        df: Frame about to be written to a worksheet.

    Returns:
        ``df`` itself when it is empty, otherwise a sanitized copy.
    """
    if df.empty:
        return df
    cleaned = df.copy()
    for column in cleaned.columns:
        column_dtype = cleaned[column].dtype
        holds_text = (
            pd.api.types.is_object_dtype(column_dtype)
            or isinstance(column_dtype, pd.StringDtype)
        )
        if holds_text:
            cleaned[column] = cleaned[column].map(_sanitize_excel_cell)
    return cleaned
def _flatten_review_decision_envelope(item):
"""Flatten the shared review decision envelope into operator-friendly columns."""
envelope = item.get('decision_envelope') or {}
guardrails = envelope.get('guardrails') or {}
recommended_action = envelope.get('recommended_action') or {}
evidence = envelope.get('evidence') or []
evidence_parts = []
if isinstance(evidence, list):
for row in evidence[:6]:
if not isinstance(row, dict):
continue
metric = row.get('metric') or row.get('type') or 'evidence'
value = row.get('value')
basis = row.get('basis') or ''
text = f"{metric}={value}" if value not in (None, '') else str(metric)
if basis:
text = f"{text} ({basis})"
evidence_parts.append(text)
return {
'決策信封ID': envelope.get('decision_id') or '',
'決策類型': envelope.get('decision_type') or '',
'決策等級': envelope.get('severity') or '',
'決策建議代碼': recommended_action.get('action') or '',
'決策責任人': recommended_action.get('owner') or '',
'需人工覆核': '' if recommended_action.get('requires_hitl') else '',
'資料品質': guardrails.get('data_quality') or '',
'自動執行允許': '' if guardrails.get('can_auto_execute') else '',
'自動執行阻擋原因': guardrails.get('blocked_reason') or '',
'決策證據摘要': ''.join(evidence_parts),
}
# ==========================================
# 全分類匯出
# ==========================================
@export_bp.route('/api/export/all_categories')
@login_required
def export_all_categories():
    """Serve the all-categories report produced by ``Exporter``.

    Responds with the generated file as an attachment, a 404 message when no
    report file is available, or a 500 message carrying the exception text.
    """
    try:
        sys_log.info("執行全分類 CSV 數據導出...")
        # The consolidated data is fetched but not passed to the exporter below.
        _unused_items, _ = _get_consolidated_data()
        report_path = Exporter().generate_all_categories_report()
        if report_path:
            absolute_path = os.path.abspath(report_path)
            if os.path.exists(absolute_path):
                sys_log.info(f"報表匯出成功,準備下載: {absolute_path}")
                return send_file(absolute_path, as_attachment=True)
        return "匯出失敗:資料庫內尚無足夠數據", 404
    except Exception as exc:
        sys_log.error(f"[Web] [Export] 全分類報表匯出異常 | Error: {exc}")
        return f"匯出失敗,錯誤詳情:{exc}", 500
@export_bp.route('/api/export/excel/all')
@login_required
def export_excel_all():
    """Serve an Excel workbook covering every consolidated product.

    Responds with the file as an attachment, or a 500 message when the
    exporter yields no existing file or an exception is raised.
    """
    try:
        all_items, _ = _get_consolidated_data()
        workbook_path = Exporter().generate_all_products_excel(all_items)
        if not workbook_path or not os.path.exists(workbook_path):
            return "匯出失敗", 500
        return send_file(workbook_path, as_attachment=True)
    except Exception as exc:
        sys_log.error(f"[Web] [Export] Excel 匯出失敗 (All) | Error: {exc}")
        return f"匯出失敗: {exc}", 500
# ==========================================
# 價格變動匯出
# ==========================================
@export_bp.route('/api/export/excel/changes')
@login_required
def export_excel_changes():
    """Serve an Excel workbook of products whose price rose or fell.

    Items are split by the sign of their ``yesterday_diff`` value; items with
    a zero difference are left out of both groups.
    """
    try:
        all_items, _ = _get_consolidated_data()
        went_up = []
        went_down = []
        for entry in all_items:
            if entry['yesterday_diff'] > 0:
                went_up.append(entry)
        for entry in all_items:
            if entry['yesterday_diff'] < 0:
                went_down.append(entry)
        workbook_path = Exporter().generate_changes_excel(went_up, went_down)
        if not workbook_path or not os.path.exists(workbook_path):
            return "匯出失敗", 500
        return send_file(workbook_path, as_attachment=True)
    except Exception as exc:
        sys_log.error(f"[Web] [Export] Excel 匯出失敗 (Changes) | Error: {exc}")
        return f"匯出失敗: {exc}", 500
def _extract_agent_footprint(raw_footprint):
    """Return the ``agent`` mapping of a model footprint, or ``{}`` if unusable.

    ``raw_footprint`` may be a dict, a JSON string, or empty. Anything that
    does not decode to a dict holding a dict under ``'agent'`` yields ``{}``,
    so one malformed row cannot abort the whole export.
    """
    footprint = raw_footprint or {}
    if isinstance(footprint, str):
        try:
            footprint = json.loads(footprint)
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError subclass.
            footprint = {}
    if not isinstance(footprint, dict):
        return {}
    agent = footprint.get('agent')
    return agent if isinstance(agent, dict) else {}


@export_bp.route('/api/export/excel/ai-picks')
@login_required
def export_excel_ai_picks():
    """Export the AI product-pick list as an in-memory Excel workbook.

    Reads up to 50 rows from ``ai_price_recommendations`` with strategy
    ``product_pick`` and status ``pending``, joined with ``products`` and the
    most recently crawled qualifying ``pchome`` row per SKU from
    ``competitor_prices``.

    Returns:
        The workbook as an attachment; a 404 message when the query returns no
        rows; a 500 message carrying the exception text on failure.
    """
    db = DatabaseManager()
    session = db.get_session()
    try:
        rows = session.execute(text("""
            WITH valid_competitor AS (
                SELECT DISTINCT ON (cp.sku)
                    cp.sku,
                    cp.competitor_product_id,
                    cp.competitor_product_name,
                    cp.match_score,
                    cp.crawled_at
                FROM competitor_prices cp
                WHERE cp.source = 'pchome'
                  AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
                  AND cp.price IS NOT NULL
                  AND cp.price > 0
                  AND COALESCE(cp.match_score, 0) >= 0.76
                  AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
                ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST
            )
            SELECT
                ROW_NUMBER() OVER (
                    ORDER BY ar.confidence DESC NULLS LAST,
                             ar.gap_pct DESC NULLS LAST,
                             ar.created_at DESC
                ) AS rank,
                ar.sku,
                ar.name,
                p.category,
                ar.momo_price,
                ar.pchome_price,
                ar.gap_pct,
                ar.confidence,
                ar.sales_7d_delta,
                ar.reason,
                ar.model_footprint,
                ar.created_at,
                p.url AS momo_url,
                vc.competitor_product_id,
                vc.competitor_product_name,
                vc.match_score,
                vc.crawled_at
            FROM ai_price_recommendations ar
            LEFT JOIN products p ON p.i_code = ar.sku
            LEFT JOIN valid_competitor vc ON vc.sku = ar.sku
            WHERE ar.strategy = 'product_pick'
              AND ar.status = 'pending'
            ORDER BY ar.confidence DESC NULLS LAST,
                     ar.gap_pct DESC NULLS LAST,
                     ar.created_at DESC
            LIMIT 50
        """)).mappings().all()
        if not rows:
            return "目前沒有 AI 挑品資料可匯出", 404

        time_format = '%Y-%m-%d %H:%M:%S'
        export_rows = []
        for row in rows:
            sku = str(row.get('sku') or '')
            normalized_sku = sku.strip()
            pchome_id = row.get('competitor_product_id') or ''
            # Prefer the stored URL when it normalizes; otherwise build one from the SKU.
            momo_url = (
                normalize_momo_product_url(row.get('momo_url'), normalized_sku)
                or build_momo_product_url(normalized_sku)
            )
            pchome_url = f"https://24h.pchome.com.tw/prod/{str(pchome_id).strip()}" if pchome_id else ''
            agent_footprint = _extract_agent_footprint(row.get('model_footprint'))
            missing_evidence = agent_footprint.get('missing_evidence') or []
            created_at = row.get('created_at')
            crawled_at = row.get('crawled_at')
            export_rows.append({
                'AI排名': int(row.get('rank') or 0),
                'MOMO商品ID': sku,
                'MOMO商品名稱': row.get('name') or '',
                '分類': row.get('category') or '',
                'MOMO價格': float(row.get('momo_price') or 0),
                'PChome價格': float(row.get('pchome_price') or 0),
                '價差百分比': float(row.get('gap_pct') or 0),
                'AI信心百分比': round(float(row.get('confidence') or 0) * 100, 1),
                '機會分數': float(agent_footprint.get('opportunity_score') or 0),
                '證據完整度': float(agent_footprint.get('evidence_quality') or 0),
                '信心分層': agent_footprint.get('confidence_band') or '',
                # NOTE(review): the join separator is empty as shown; confirm
                # whether a delimiter was intended.
                '待補證據': ''.join(str(item) for item in missing_evidence),
                '近7日銷售變化': float(row.get('sales_7d_delta') or 0),
                'PChome商品ID': pchome_id,
                'PChome商品名稱': row.get('competitor_product_name') or '',
                'PChome比對分數': round(float(row.get('match_score') or 0) * 100, 1),
                'AI建議理由': row.get('reason') or '',
                'MOMO商品URL': momo_url,
                'PChome商品URL': pchome_url,
                'AI產生時間': created_at.strftime(time_format) if created_at else '',
                'PChome抓取時間': crawled_at.strftime(time_format) if crawled_at else '',
            })

        wide_headers = {'MOMO商品名稱', 'PChome商品名稱', 'AI建議理由', 'MOMO商品URL', 'PChome商品URL'}
        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            df = _sanitize_excel_dataframe(pd.DataFrame(export_rows))
            df.to_excel(writer, index=False, sheet_name='AI挑品清單')
            worksheet = writer.sheets['AI挑品清單']
            for column_cells in worksheet.columns:
                header = str(column_cells[0].value or '')
                # Width follows header length, clamped to 12..42; long-text columns get 48.
                width = 48 if header in wide_headers else min(max(len(header) + 4, 12), 42)
                worksheet.column_dimensions[column_cells[0].column_letter].width = width
            worksheet.freeze_panes = 'A2'
        # Rewind only after the writer has closed and flushed the workbook.
        output.seek(0)
        filename = f"AI挑品清單_{datetime.now(TAIPEI_TZ).strftime('%Y%m%d_%H%M')}.xlsx"
        sys_log.info(f"[Web] [Export] AI 挑品清單匯出成功 | rows={len(export_rows)}")
        return send_file(
            output,
            as_attachment=True,
            download_name=filename,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as e:
        sys_log.error(f"[Web] [Export] AI 挑品清單匯出失敗 | Error: {e}")
        return f"匯出失敗: {e}", 500
    finally:
        session.close()
@export_bp.route('/api/export/excel/pchome-review')
@login_required
def export_excel_pchome_review():
    """Export the PChome price-comparison review queue as an Excel workbook.

    Keeps the matcher diagnostics and the manual-handling columns.

    Query parameters:
        q / search: free-text search forwarded to the repository query.
        category: category filter; ``all`` (any case) means no filter.
        review_status / status: must be a key of ``REVIEW_STATUS_FILTER_GROUPS``,
            otherwise no status filter is applied.
        limit: maximum number of rows, clamped to 1..2000 (default 500).

    Returns:
        The workbook as an attachment; a 404 message when the queue is empty;
        a 500 message carrying the exception text on failure.
    """
    from services.competitor_intel_repository import (
        REVIEW_STATUS_FILTER_GROUPS,
        fetch_competitor_review_queue_page,
    )
    db = DatabaseManager()
    session = db.get_session()
    try:
        search_query = (request.args.get('q') or request.args.get('search') or '').strip()
        category = (request.args.get('category') or '').strip()
        if category.lower() == 'all':
            category = ''
        status_filter = (request.args.get('review_status') or request.args.get('status') or '').strip()
        if status_filter == 'all' or status_filter not in REVIEW_STATUS_FILTER_GROUPS:
            status_filter = ''
        try:
            limit = int(request.args.get('limit') or 500)
        except (TypeError, ValueError):
            limit = 500
        limit = max(1, min(limit, 2000))

        engine = session.get_bind()
        # Keep the page size constant for the whole loop. Shrinking it on the
        # final page would move that page's offset if the repository derives
        # it from (page, per_page) - presumably it does; confirm - which
        # returns duplicate rows and skips others. Any surplus from the last
        # page is trimmed by the rows[:limit] slice below.
        page_size = min(100, limit)
        rows = []
        page = 1
        while len(rows) < limit:
            payload = fetch_competitor_review_queue_page(
                engine,
                page=page,
                per_page=page_size,
                search_query=search_query,
                category=category,
                status_filter=status_filter,
            )
            batch = payload.get('items') or []
            if not batch:
                break
            rows.extend(batch)
            if len(rows) >= int(payload.get('total') or 0) or len(batch) < page_size:
                break
            page += 1
        if not rows:
            return "目前沒有 PChome 覆核資料可匯出", 404

        export_rows = []
        for idx, item in enumerate(rows[:limit], start=1):
            sku = str(item.get('sku') or '').strip()
            pchome_id = str(item.get('candidate_pc_id') or '').strip()
            unit_comparison = item.get('unit_comparison') or {}
            momo_url = build_momo_product_url(sku) if sku else ''
            pchome_url = f"https://24h.pchome.com.tw/prod/{pchome_id}" if pchome_id else ''
            export_rows.append({
                '覆核序': idx,
                '狀態': item.get('status_label') or '',
                '建議處置': item.get('action_label') or '',
                '診斷原因': item.get('diagnostic_reason_text') or '',
                **_flatten_review_decision_envelope(item),
                'MOMO商品ID': sku,
                'MOMO商品名稱': item.get('name') or '',
                '分類': item.get('category') or '',
                'MOMO價格': float(item.get('momo_price') or 0),
                '候選PChome商品ID': pchome_id,
                '候選PChome商品名稱': item.get('candidate_pc_name') or '',
                '候選PChome價格': float(item.get('candidate_pc_price') or 0),
                'Match分數%': round(float(item.get('best_match_score') or 0) * 100, 1),
                '候選數': int(item.get('candidate_count') or 0),
                '單位價比較': unit_comparison.get('summary') or '',
                '原始診斷': item.get('match_diagnostic') or '',
                '嘗試時間': item.get('attempted_at') or '',
                'MOMO商品URL': momo_url,
                'PChome商品URL': pchome_url,
            })

        wide_headers = {
            'MOMO商品名稱',
            '候選PChome商品名稱',
            '建議處置',
            '決策信封ID',
            '決策建議代碼',
            '診斷原因',
            '自動執行阻擋原因',
            '決策證據摘要',
            '單位價比較',
            '原始診斷',
            'MOMO商品URL',
            'PChome商品URL',
        }
        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            df = _sanitize_excel_dataframe(pd.DataFrame(export_rows))
            df.to_excel(writer, index=False, sheet_name='PChome覆核隊列')
            worksheet = writer.sheets['PChome覆核隊列']
            for column_cells in worksheet.columns:
                header = str(column_cells[0].value or '')
                # Width follows header length, clamped to 12..42; long-text columns get 52.
                width = 52 if header in wide_headers else min(max(len(header) + 4, 12), 42)
                worksheet.column_dimensions[column_cells[0].column_letter].width = width
            worksheet.freeze_panes = 'A2'
        # Rewind only after the writer has closed and flushed the workbook.
        output.seek(0)
        status_label = status_filter or 'all'
        filename = f"PChome比價覆核_{status_label}_{datetime.now(TAIPEI_TZ).strftime('%Y%m%d_%H%M')}.xlsx"
        sys_log.info(
            f"[Web] [Export] PChome 覆核隊列匯出成功 | rows={len(export_rows)} status={status_label}"
        )
        return send_file(
            output,
            as_attachment=True,
            download_name=filename,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as e:
        sys_log.error(f"[Web] [Export] PChome 覆核隊列匯出失敗 | Error: {e}")
        return f"匯出失敗: {e}", 500
    finally:
        session.close()
@export_bp.route('/api/export/excel/delisted')
@login_required
def export_excel_delisted():
    """Export products delisted today as an Excel workbook.

    A product qualifies when its status is ``INACTIVE`` and ``updated_at`` is
    not earlier than the ``today_start`` value from the consolidated data.
    Each product is paired with the price of its most recent ``PriceRecord``
    (0 when it has none).

    Returns:
        The workbook as an attachment, or a 500 message when the exporter
        yields no existing file or an exception is raised.
    """
    db = DatabaseManager()
    session = db.get_session()
    try:
        _, today_start = _get_consolidated_data()
        delisted_products = session.query(Product).filter(
            Product.status == 'INACTIVE',
            Product.updated_at >= today_start  # today_start is kept in Taipei time
        ).all()
        delisted_items = []
        for product in delisted_products:
            # One ordered lookup per product; a missing record means price 0.
            last_record = (
                session.query(PriceRecord)
                .filter_by(product_id=product.id)
                .order_by(desc(PriceRecord.timestamp))
                .first()
            )
            delisted_items.append({
                'product': product,
                'last_price': last_record.price if last_record else 0,
            })
        exporter = Exporter()
        file_path = exporter.generate_delisted_excel(delisted_items)
        # Same guard as the sibling export routes: never hand a missing path to send_file.
        if file_path and os.path.exists(file_path):
            return send_file(file_path, as_attachment=True)
        return "匯出失敗", 500
    except Exception as e:
        sys_log.error(f"[Web] [Export] Excel 匯出失敗 (Delisted) | Error: {e}")
        return f"匯出失敗: {e}", 500
    finally:
        session.close()
@export_bp.route('/api/export/price_changes')
@login_required
def export_price_changes():
    """Export today's price-change details as an Excel workbook.

    Query parameters:
        type: ``increase``, ``decrease``, ``delisted``, ``active`` or
            ``category``. Any other value selects every product whose latest
            price differs from its last price recorded before today.
        category: category name, used only when ``type`` is ``category``.

    The "old" price of a product is the price of its highest-id
    ``PriceRecord`` with a timestamp before today's midnight (Taipei time,
    stored naive). Products without such a record are skipped.

    Returns:
        The workbook as an attachment; a 404 message when nothing matches;
        a 500 message carrying the exception text on failure.
    """
    import openpyxl
    from openpyxl.styles import Font, Alignment, PatternFill
    filter_type = request.args.get('type', '')
    filter_category = request.args.get('category', '')
    try:
        db = DatabaseManager()
        session = db.get_session()
        try:
            now_taipei = datetime.now(TAIPEI_TZ)
            today_start = now_taipei.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None)
            products = []
            if filter_type == 'delisted':
                today_delisted = session.query(Product).filter(
                    Product.status == 'INACTIVE',
                    Product.updated_at >= today_start
                ).all()
                for product in today_delisted:
                    last_record = session.query(PriceRecord).filter(
                        PriceRecord.product_id == product.id
                    ).order_by(PriceRecord.timestamp.desc()).first()
                    if last_record:
                        products.append((product, last_record, last_record.price))
            else:
                # Base query: the latest record (highest id) of every product.
                latest_records_subq = session.query(
                    func.max(PriceRecord.id).label('max_id')
                ).group_by(PriceRecord.product_id).subquery()
                query = session.query(PriceRecord, Product).join(
                    latest_records_subq,
                    PriceRecord.id == latest_records_subq.c.max_id
                ).join(Product, PriceRecord.product_id == Product.id)
                # Last price recorded before today, per product.
                product_ids = [r[0] for r in session.query(PriceRecord.product_id).join(
                    latest_records_subq, PriceRecord.id == latest_records_subq.c.max_id
                ).all()]
                yesterday_prices_subq = session.query(
                    PriceRecord.product_id,
                    func.max(PriceRecord.id).label('max_id')
                ).filter(
                    PriceRecord.product_id.in_(product_ids),
                    PriceRecord.timestamp < today_start
                ).group_by(PriceRecord.product_id).subquery()
                yesterday_prices_q = session.query(
                    PriceRecord.product_id, PriceRecord.price
                ).join(
                    yesterday_prices_subq,
                    PriceRecord.id == yesterday_prices_subq.c.max_id
                )
                yesterday_prices_map = {pid: price for pid, price in yesterday_prices_q}

                if filter_type == 'category' and filter_category:
                    query = query.filter(Product.category == filter_category)
                for record, product in query.all():
                    old_price = yesterday_prices_map.get(product.id)
                    if old_price is None:
                        continue
                    if filter_type == 'increase':
                        matched = record.price > old_price
                    elif filter_type == 'decrease':
                        matched = record.price < old_price
                    else:
                        # 'active', 'category' and the default all mean "price changed".
                        matched = record.price != old_price
                    if matched:
                        products.append((product, record, old_price))
        finally:
            # Release the session on the error path too.
            session.close()
        if not products:
            return "無符合條件的商品資料", 404

        # Build the workbook.
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = "價格變動明細"
        headers = ['商品ID', '商品名稱', '分類', '原價格', '現價格', '變動金額', '變動百分比', '更新時間', '商品網址']
        ws.append(headers)
        header_fill = PatternFill(start_color='4472C4', end_color='4472C4', fill_type='solid')
        header_font = Font(bold=True, color='FFFFFF')
        for cell in ws[1]:
            cell.fill = header_fill
            cell.font = header_font
            cell.alignment = Alignment(horizontal='center', vertical='center')

        for product, record, old_price in products:
            change = record.price - old_price
            change_pct = (change / old_price * 100) if old_price > 0 else 0
            safe_product_url = (
                normalize_momo_product_url(product.url, product.i_code)
                or build_momo_product_url(product.i_code)
            )
            row_values = [
                product.i_code,
                product.name,
                product.category or '未分類',
                old_price,
                record.price,
                change,
                f"{change_pct:.2f}%",
                record.timestamp.strftime('%Y-%m-%d %H:%M'),
                safe_product_url
            ]
            # Strip control characters from text cells, as the other Excel exports do.
            ws.append([_sanitize_excel_cell(value) for value in row_values])

        column_widths = {
            'A': 12, 'B': 40, 'C': 15, 'D': 12, 'E': 12,
            'F': 12, 'G': 12, 'H': 18, 'I': 50,
        }
        for letter, width in column_widths.items():
            ws.column_dimensions[letter].width = width

        # Only known filter names reach the file name: the raw query value is
        # user input and must not become part of a filesystem path.
        known_types = {'increase', 'decrease', 'delisted', 'active', 'category'}
        type_label = filter_type if filter_type in known_types else 'all'
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"價格變動明細_{type_label}_{timestamp}.xlsx"
        filepath = os.path.join(EXCEL_EXPORT_DIR, filename)
        os.makedirs(EXCEL_EXPORT_DIR, exist_ok=True)
        wb.save(filepath)
        return send_file(filepath, as_attachment=True, download_name=filename)
    except Exception as e:
        sys_log.error(f"[Web] [Export] 異動報表匯出失敗 | Type: {filter_type} | Error: {e}")
        return f"匯出失敗: {e}", 500
# ==========================================
# 其他匯出功能
# ==========================================
@export_bp.route('/api/export/low_prices')
@login_required
def export_low_prices():
    """Serve the historical-low-price report produced by ``Exporter``.

    Responds with the file as an attachment, a 404 message when no report
    file exists, or a 500 message carrying the exception text.
    """
    try:
        report_path = Exporter().generate_low_price_report()
        if not report_path or not os.path.exists(report_path):
            return "目前無歷史低價商品", 404
        return send_file(report_path, as_attachment=True)
    except Exception as exc:
        sys_log.error(f"[Web] [Export] 低價報表匯出失敗 | Error: {exc}")
        return f"匯出失敗: {exc}", 500
@export_bp.route('/api/export/changes')
@login_required
def export_changes():
    """Serve a filtered report selected by the ``type`` query parameter.

    ``increase`` and ``decrease`` select consolidated items by the sign of
    ``yesterday_diff``; ``delisted`` selects products that are ``INACTIVE``
    and updated since ``today_start``. Any other value produces no file and
    therefore the 404 response.
    """
    filter_type = request.args.get('type')
    exporter = Exporter()
    file_path = None
    try:
        unique_items, today_start = _get_consolidated_data()
        if filter_type == 'delisted':
            db = DatabaseManager()
            session = db.get_session()
            try:
                inactive_today = session.query(Product).filter(
                    Product.status == 'INACTIVE',
                    Product.updated_at >= today_start  # today_start is kept in Taipei time
                ).all()
                delisted_rows = []
                for product in inactive_today:
                    newest = session.query(PriceRecord).filter_by(product_id=product.id).order_by(
                        desc(PriceRecord.timestamp)).first()
                    delisted_rows.append({
                        'product': product,
                        'last_price': newest.price if newest else 0,
                    })
                file_path = exporter.generate_delisted_report(delisted_rows, "今日下架商品")
            finally:
                session.close()
        elif filter_type == 'increase':
            rising = [entry for entry in unique_items if entry['yesterday_diff'] > 0]
            file_path = exporter.generate_custom_report(rising, "今日漲價商品")
        elif filter_type == 'decrease':
            falling = [entry for entry in unique_items if entry['yesterday_diff'] < 0]
            file_path = exporter.generate_custom_report(falling, "今日跌價商品")

        if not file_path or not os.path.exists(file_path):
            return "無資料可匯出", 404
        return send_file(file_path, as_attachment=True)
    except Exception as exc:
        sys_log.error(f"[Web] [Export] 篩選匯出失敗 | Type: {filter_type} | Error: {exc}")
        return f"匯出失敗: {exc}", 500
def _abc_default_restock(frame, qty_col):
    """Default restock suggestion: 1.5x quantity for class A, 1.2x for class B, else 0."""
    conditions = [frame['ABC_Class'] == 'A', frame['ABC_Class'] == 'B']
    choices = [frame[qty_col] * 1.5, frame[qty_col] * 1.2]
    return np.select(conditions, choices, default=0).astype(int)


@export_bp.route('/api/export/excel/abc')
@login_required
def export_abc_analysis():
    """Export the ABC (cumulative revenue) analysis as an Excel workbook.

    Data comes from the processed sales cache entry ``realtime_sales_monthly``;
    the request fails with 400 when that entry is absent.

    Query parameters:
        category, brand, vendor: exact-match filters; ``all`` disables one.
            Category ``其他`` selects everything outside the top 12 categories
            by revenue, when more than 12 categories exist.
        keyword: case-insensitive literal substring of the product name.
        min_margin, max_margin: bounds on ``calculated_margin_rate``.
        class: keep only rows of that ABC class after classification.
        factor: multiplier for the suggested restock quantity. When missing
            or unusable, the per-class defaults apply.

    Rows are aggregated per product name. Class A covers cumulative revenue
    up to 80%, class B up to 95%, class C the rest.

    Returns:
        The workbook as an attachment; a 404 message when there is nothing to
        export; a 500 message carrying the exception text on failure.
    """
    try:
        table_name = 'realtime_sales_monthly'
        sales_cache = _get_sales_cache()
        if table_name not in sales_cache:
            return "請先瀏覽「業績分析」頁面以載入資料與快取。", 400
        cache_data = sales_cache[table_name]
        df = cache_data['df']
        cols_map = cache_data['cols']

        # Column names as mapped by the cache entry; any of them may be missing.
        col_name = cols_map.get('name')
        col_amount = cols_map.get('amount')
        col_qty = cols_map.get('qty')
        col_category = cols_map.get('category')
        col_brand = cols_map.get('brand')
        col_vendor = cols_map.get('vendor')
        col_cost = cols_map.get('cost')
        col_profit = cols_map.get('profit')
        col_pid = cols_map.get('pid')

        # Request filters.
        # NOTE(review): min_price / max_price were read here but never applied
        # to the data; no price filter is implemented in this export.
        selected_category = request.args.get('category', 'all')
        selected_brand = request.args.get('brand', 'all')
        selected_vendor = request.args.get('vendor', 'all')
        keyword = request.args.get('keyword', '').strip()
        min_margin = request.args.get('min_margin', '')
        max_margin = request.args.get('max_margin', '')
        target_df = df.copy()

        # Recompute the top-N categories so the "其他" bucket can be resolved.
        TOP_N_CATS = 12
        top_cats_names = []
        if col_category and col_amount:
            cat_group_all = df.groupby(col_category)[col_amount].sum().sort_values(ascending=False)
            if len(cat_group_all) > TOP_N_CATS:
                top_cats_names = cat_group_all.head(TOP_N_CATS).index.tolist()

        if selected_category != 'all' and col_category:
            if selected_category == '其他' and top_cats_names:
                target_df = target_df[~target_df[col_category].isin(top_cats_names)]
            else:
                target_df = target_df[target_df[col_category] == selected_category]
        if selected_brand != 'all' and col_brand:
            target_df = target_df[target_df[col_brand] == selected_brand]
        if selected_vendor != 'all' and col_vendor:
            target_df = target_df[target_df[col_vendor] == selected_vendor]
        if keyword and col_name:
            # regex=False: the keyword is user input, so characters such as
            # "(" or "+" must match literally instead of raising a regex error.
            name_matches = target_df[col_name].astype(str).str.contains(
                keyword, case=False, na=False, regex=False
            )
            target_df = target_df[name_matches]
        if min_margin:
            target_df = target_df[target_df['calculated_margin_rate'] >= float(min_margin)]
        if max_margin:
            target_df = target_df[target_df['calculated_margin_rate'] <= float(max_margin)]

        # Aggregation needs both a revenue column and a product-name column.
        if not (col_amount and col_name) or target_df.empty:
            return "無資料可匯出", 404

        agg_rules = {col_amount: 'sum'}
        for summed_col in (col_qty, col_cost, col_profit):
            if summed_col:
                agg_rules[summed_col] = 'sum'
        for label_col in (col_category, col_vendor, col_brand, col_pid):
            if label_col:
                agg_rules[label_col] = 'first'
        df_agg = target_df.groupby(col_name).agg(agg_rules).reset_index()

        # Margin rate, recomputed on the aggregated totals.
        if col_profit:
            df_agg['calculated_margin_rate'] = (df_agg[col_profit] / df_agg[col_amount]) * 100
        elif col_cost:
            df_agg['calculated_margin_rate'] = ((df_agg[col_amount] - df_agg[col_cost]) / df_agg[col_amount]) * 100
        else:
            df_agg['calculated_margin_rate'] = 0.0
        df_agg['calculated_margin_rate'] = df_agg['calculated_margin_rate'].replace([np.inf, -np.inf, np.nan], 0)

        # Sort by revenue and classify by cumulative share.
        target_df = df_agg.sort_values(by=col_amount, ascending=False)
        target_df['cumulative_revenue'] = target_df[col_amount].cumsum()
        total_revenue = target_df[col_amount].sum()
        target_df['cumulative_pct'] = (target_df['cumulative_revenue'] / total_revenue) * 100
        conditions = [(target_df['cumulative_pct'] <= 80), (target_df['cumulative_pct'] <= 95)]
        target_df['ABC_Class'] = np.select(conditions, ['A', 'B'], default='C')

        # Optional restriction to a single class.
        filter_class = request.args.get('class')
        if filter_class:
            # Copy so the column assignments below act on an independent frame.
            target_df = target_df[target_df['ABC_Class'] == filter_class].copy()

        if col_qty:
            # Average unit price; a zero quantity yields inf/NaN, exported as 0.
            target_df['avg_unit_price'] = (
                (target_df[col_amount] / target_df[col_qty])
                .replace([np.inf, -np.inf], np.nan)
                .fillna(0)
            )
            # Suggested restock: custom factor when usable, else per-class defaults.
            custom_factor = request.args.get('factor')
            restock = None
            if custom_factor:
                try:
                    restock = (target_df[col_qty] * float(custom_factor)).astype(int)
                except (TypeError, ValueError, OverflowError):
                    restock = None
            if restock is None:
                restock = _abc_default_restock(target_df, col_qty)
            target_df['suggested_restock'] = restock

        # Export columns in display order: (source column, header). A column is
        # included only when it is mapped and present in the frame.
        column_plan = [
            (col_pid, '商品ID'),
            (col_name, '商品名稱'),
            (col_category, '分類'),
            (col_brand, '品牌'),
            (col_vendor, '廠商'),
            ('ABC_Class', 'ABC分類'),
            (col_amount, '銷售金額'),
            (col_qty, '銷售數量'),
            ('avg_unit_price', '平均單價'),
            (col_cost, '成本'),
            (col_profit, '毛利'),
            ('calculated_margin_rate', '毛利率(%)'),
            ('suggested_restock', '建議補貨量'),
        ]
        export_cols = []
        header_map = {}
        for source_col, header in column_plan:
            if source_col and source_col in target_df.columns:
                export_cols.append(source_col)
                header_map[source_col] = header
        export_df = target_df[export_cols].rename(columns=header_map)
        # Strip control characters from text cells, as the other Excel exports do.
        export_df = _sanitize_excel_dataframe(export_df)

        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            export_df.to_excel(writer, index=False, sheet_name='ABC分析')
        # Rewind only after the writer has closed and flushed the workbook.
        output.seek(0)
        filename_prefix = f"ABC_Analysis_{filter_class}_" if filter_class else "ABC_Analysis_"
        return send_file(
            output,
            as_attachment=True,
            download_name=f"{filename_prefix}{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx",
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as e:
        sys_log.error(f"ABC Export Error: {e}")
        return f"匯出失敗: {e}", 500
@export_bp.route('/api/export/excel/vendor')
@login_required
def export_vendor_analysis():
    """Export the vendor profitability ranking as an Excel workbook.

    Data comes from the processed sales cache entry ``realtime_sales_monthly``.
    When that entry is absent, a warning is flashed and the request is
    redirected to ``sales.sales_analysis`` with the original query parameters.

    Revenue (plus profit and cost when mapped) is summed per vendor, a margin
    rate is derived, and rows are sorted by revenue in descending order.

    Returns:
        The workbook as an attachment; a redirect on cache miss; a 400 message
        when the vendor or amount column is unmapped; a 500 message carrying
        the exception text on failure.
    """
    try:
        table_name = 'realtime_sales_monthly'
        sales_cache = _get_sales_cache()
        if table_name not in sales_cache:
            params = dict(request.args.items())
            flash('資料快取已失效,請稍候重新載入資料後再匯出。', 'warning')
            return redirect(url_for('sales.sales_analysis', **params))
        cache_data = sales_cache[table_name]
        df = cache_data['df']
        cols_map = cache_data['cols']

        col_vendor = cols_map.get('vendor')
        col_amount = cols_map.get('amount')
        col_profit = cols_map.get('profit')
        col_cost = cols_map.get('cost')
        if not col_vendor or not col_amount:
            return "資料缺少必要欄位(廠商、銷售金額)", 400

        # Aggregate per vendor.
        agg_rules = {col_amount: 'sum'}
        if col_profit:
            agg_rules[col_profit] = 'sum'
        if col_cost:
            agg_rules[col_cost] = 'sum'
        vendor_df = df.groupby(col_vendor).agg(agg_rules).reset_index()

        # Margin rate: prefer profit, fall back to (amount - cost), else 0.
        if col_profit:
            vendor_df['margin_rate'] = (vendor_df[col_profit] / vendor_df[col_amount]) * 100
        elif col_cost:
            vendor_df['margin_rate'] = ((vendor_df[col_amount] - vendor_df[col_cost]) / vendor_df[col_amount]) * 100
        else:
            vendor_df['margin_rate'] = 0
        # Division by a zero amount yields inf/NaN; export those as 0.
        vendor_df['margin_rate'] = vendor_df['margin_rate'].replace([np.inf, -np.inf, np.nan], 0)

        vendor_df = vendor_df.sort_values(by=col_amount, ascending=False)

        rename_map = {col_vendor: '廠商', col_amount: '銷售金額', 'margin_rate': '毛利率(%)'}
        if col_profit:
            rename_map[col_profit] = '毛利'
        if col_cost:
            rename_map[col_cost] = '成本'
        export_df = vendor_df.rename(columns=rename_map)
        # Strip control characters from text cells, as the other Excel exports do.
        export_df = _sanitize_excel_dataframe(export_df)

        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            export_df.to_excel(writer, index=False, sheet_name='廠商分析')
        # Rewind only after the writer has closed and flushed the workbook.
        output.seek(0)
        return send_file(
            output,
            as_attachment=True,
            download_name=f"Vendor_Analysis_{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx",
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as e:
        sys_log.error(f"Vendor Export Error: {e}")
        return f"匯出失敗: {e}", 500
@export_bp.route('/api/export/excel/seasonality_detail')
@login_required
def export_seasonality_detail():
    """Export the detail rows behind one cell of the seasonality heatmap.

    Query parameters:
        data_range: number of months used to build the cache key (default 1;
            an unparsable value also falls back to 1).
        start_date, end_date: when either is given, a custom-range cache key
            is used instead of the month-count key.
        target_month: value matched against the ``_month_str`` column (required).
        target_category: value matched against the category column (required).

    If the range-specific cache key yields an error and the base
    ``realtime_sales_monthly`` entry exists in the sales cache, the base entry
    is used instead.

    Returns:
        The workbook as an attachment; a 400 message for missing parameters,
        missing data or an unmapped category column; a 404 message when no
        rows match; a 500 message carrying the exception text on failure.
    """
    try:
        from routes.sales_routes import _get_filtered_sales_data
        table_name = 'realtime_sales_monthly'
        try:
            data_range_months = int(request.args.get('data_range', '1') or '1')
        except (TypeError, ValueError):
            # Same lenient default as an empty value, instead of a 500.
            data_range_months = 1
        start_date = request.args.get('start_date', '')
        end_date = request.args.get('end_date', '')
        if start_date or end_date:
            cache_key = f"{table_name}_custom_{start_date}_{end_date}"
        else:
            cache_key = f"{table_name}_{data_range_months}m"

        target_df, cols_map, err = _get_filtered_sales_data(cache_key)
        if err and table_name in _get_sales_cache():
            target_df, cols_map, err = _get_filtered_sales_data(table_name)
        if err:
            return f"匯出失敗: {err}", 400

        target_month = request.args.get('target_month')
        target_category = request.args.get('target_category')
        if not target_month or not target_category:
            return "缺少必要參數 (month, category)", 400
        col_category = cols_map.get('category')
        if not col_category:
            return "資料缺少分類欄位", 400

        export_df = target_df[
            (target_df['_month_str'] == target_month) &
            (target_df[col_category] == target_category)
        ]
        if export_df.empty:
            return "該月份與分類無資料", 404
        # Strip control characters from text cells, as the other Excel exports do.
        export_df = _sanitize_excel_dataframe(export_df)

        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            export_df.to_excel(writer, index=False, sheet_name='明細')
        # Rewind only after the writer has closed and flushed the workbook.
        output.seek(0)
        filename = f"Seasonality_{target_category}_{target_month}.xlsx"
        return send_file(
            output,
            as_attachment=True,
            download_name=filename,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as e:
        sys_log.error(f"Seasonality Export Error: {e}")
        return f"匯出失敗: {e}", 500