1013 lines
40 KiB
Python
1013 lines
40 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
匯出功能路由模組
|
||
包含:各種報表匯出 API (Excel、CSV)
|
||
"""
|
||
|
||
import os
|
||
import io
|
||
import json
|
||
import re
|
||
from datetime import datetime, timezone, timedelta
|
||
from flask import Blueprint, request, send_file, redirect, url_for, flash
|
||
from auth import login_required
|
||
from sqlalchemy import func, desc, text
|
||
import pandas as pd
|
||
import numpy as np
|
||
|
||
from config import BASE_DIR, EXCEL_EXPORT_DIR
|
||
from database.manager import DatabaseManager
|
||
from database.models import Product, PriceRecord
|
||
from services.exporter import Exporter
|
||
from services.logger_manager import SystemLogger
|
||
from utils.momo_url_utils import build_momo_product_url, normalize_momo_product_url
|
||
|
||
# Time zone used for report timestamps: a fixed UTC+8 offset
TAIPEI_TZ = timezone(timedelta(hours=8))

# Module-level logger for the export routes
sys_log = SystemLogger("ExportRoutes").get_logger()

# Blueprint definition; every route in this module is registered on it
export_bp = Blueprint('export', __name__)

# Control characters removed from strings before they are written to worksheet cells
_EXCEL_ILLEGAL_CHAR_RE = re.compile(r'[\x00-\x08\x0B-\x0C\x0E-\x1F]')
|
||
|
||
|
||
# ==========================================
|
||
# 輔助函數 (使用獨立模組,避免循環依賴)
|
||
# ==========================================
|
||
|
||
def _get_consolidated_data():
    """Fetch the consolidated dashboard data set.

    The dashboard module is imported at call time rather than at module load,
    which keeps this module free of an import-time dependency on it.

    Returns:
        Whatever ``routes.dashboard_routes.get_consolidated_data`` returns;
        callers in this module unpack it as a two-item tuple.
    """
    from routes import dashboard_routes

    return dashboard_routes.get_consolidated_data()
|
||
|
||
|
||
def _get_sales_cache():
    """Return the processed sales-analysis cache held by ``services.cache_manager``.

    The import is deferred to call time so the cache module is only loaded
    when an export actually needs it.
    """
    from services import cache_manager

    return cache_manager._SALES_PROCESSED_CACHE
|
||
|
||
|
||
def _sanitize_excel_cell(value):
    """Strip control characters that openpyxl refuses to write into a cell.

    Non-string values are returned unchanged.
    """
    if not isinstance(value, str):
        return value
    return _EXCEL_ILLEGAL_CHAR_RE.sub('', value)
|
||
|
||
|
||
def _sanitize_excel_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Produce an Excel-safe version of *df*.

    Only ``object``-dtype columns are cleaned, so numeric and date columns
    keep their dtype. An empty frame is returned as-is (not copied).
    """
    if df.empty:
        return df

    result = df.copy()
    # Decide which columns hold Python objects before touching any of them.
    object_columns = [name for name in result.columns if result[name].dtype == object]
    for name in object_columns:
        result[name] = result[name].map(_sanitize_excel_cell)
    return result
|
||
|
||
|
||
def _flatten_review_decision_envelope(item):
|
||
"""Flatten the shared review decision envelope into operator-friendly columns."""
|
||
envelope = item.get('decision_envelope') or {}
|
||
guardrails = envelope.get('guardrails') or {}
|
||
recommended_action = envelope.get('recommended_action') or {}
|
||
evidence = envelope.get('evidence') or []
|
||
evidence_parts = []
|
||
if isinstance(evidence, list):
|
||
for row in evidence[:6]:
|
||
if not isinstance(row, dict):
|
||
continue
|
||
metric = row.get('metric') or row.get('type') or 'evidence'
|
||
value = row.get('value')
|
||
basis = row.get('basis') or ''
|
||
text = f"{metric}={value}" if value not in (None, '') else str(metric)
|
||
if basis:
|
||
text = f"{text} ({basis})"
|
||
evidence_parts.append(text)
|
||
|
||
return {
|
||
'決策信封ID': envelope.get('decision_id') or '',
|
||
'決策類型': envelope.get('decision_type') or '',
|
||
'決策等級': envelope.get('severity') or '',
|
||
'決策建議代碼': recommended_action.get('action') or '',
|
||
'決策責任人': recommended_action.get('owner') or '',
|
||
'需人工覆核': '是' if recommended_action.get('requires_hitl') else '否',
|
||
'資料品質': guardrails.get('data_quality') or '',
|
||
'自動執行允許': '是' if guardrails.get('can_auto_execute') else '否',
|
||
'自動執行阻擋原因': guardrails.get('blocked_reason') or '',
|
||
'決策證據摘要': ';'.join(evidence_parts),
|
||
}
|
||
|
||
|
||
# ==========================================
|
||
# 全分類匯出
|
||
# ==========================================
|
||
|
||
@export_bp.route('/api/export/all_categories')
@login_required
def export_all_categories():
    """Handle a download request for the all-categories report.

    Returns:
        The generated report as an attachment, a 404 message when no report
        file was produced, or a 500 message when generation raised.
    """
    try:
        sys_log.info("執行全分類 CSV 數據導出...")

        # The consolidated data is loaded (as for the dashboard) but its result
        # is not passed to the exporter; the call is kept as in the original flow.
        _items, _today_start = _get_consolidated_data()

        report_path = Exporter().generate_all_categories_report()
        resolved_path = os.path.abspath(report_path) if report_path else None

        if resolved_path is None or not os.path.exists(resolved_path):
            return "匯出失敗:資料庫內尚無足夠數據", 404

        sys_log.info(f"報表匯出成功,準備下載: {resolved_path}")
        return send_file(resolved_path, as_attachment=True)
    except Exception as exc:
        sys_log.error(f"[Web] [Export] 全分類報表匯出異常 | Error: {exc}")
        return f"匯出失敗,錯誤詳情:{exc}", 500
|
||
|
||
|
||
@export_bp.route('/api/export/excel/all')
@login_required
def export_excel_all():
    """Export every consolidated product to an Excel workbook.

    Returns:
        The workbook as an attachment, or a 500 message when the exporter
        produced no file or raised.
    """
    try:
        all_items, _ = _get_consolidated_data()
        workbook_path = Exporter().generate_all_products_excel(all_items)

        if not workbook_path or not os.path.exists(workbook_path):
            return "匯出失敗", 500
        return send_file(workbook_path, as_attachment=True)
    except Exception as exc:
        sys_log.error(f"[Web] [Export] Excel 匯出失敗 (All) | Error: {exc}")
        return f"匯出失敗: {exc}", 500
|
||
|
||
|
||
# ==========================================
|
||
# 價格變動匯出
|
||
# ==========================================
|
||
|
||
@export_bp.route('/api/export/excel/changes')
@login_required
def export_excel_changes():
    """Export products whose price rose or fell to an Excel workbook.

    Items are split by the sign of their ``yesterday_diff`` value; items with
    a zero difference are left out of both groups.
    """
    try:
        consolidated, _ = _get_consolidated_data()

        rising = []
        falling = []
        for entry in consolidated:
            diff = entry['yesterday_diff']
            if diff > 0:
                rising.append(entry)
            elif diff < 0:
                falling.append(entry)

        workbook_path = Exporter().generate_changes_excel(rising, falling)
        if not workbook_path or not os.path.exists(workbook_path):
            return "匯出失敗", 500
        return send_file(workbook_path, as_attachment=True)
    except Exception as exc:
        sys_log.error(f"[Web] [Export] Excel 匯出失敗 (Changes) | Error: {exc}")
        return f"匯出失敗: {exc}", 500
|
||
|
||
|
||
def _extract_agent_footprint(raw_footprint):
    """Return the ``agent`` section of a stored model footprint as a dict.

    Accepts a dict, a JSON string, or nothing; any other shape (including an
    ``agent`` value that is not an object) yields an empty dict.
    """
    footprint = raw_footprint or {}
    if isinstance(footprint, str):
        try:
            footprint = json.loads(footprint)
        except (ValueError, RecursionError):
            footprint = {}
    if not isinstance(footprint, dict):
        return {}
    agent = footprint.get('agent')
    return agent if isinstance(agent, dict) else {}


@export_bp.route('/api/export/excel/ai-picks')
@login_required
def export_excel_ai_picks():
    """Export the AI product-pick list to Excel from ``ai_price_recommendations``.

    Selects up to 50 pending ``product_pick`` recommendations, ordered by
    confidence, price gap and creation time, joined with the product row and
    the most recently crawled qualifying PChome competitor row per SKU.

    Returns:
        An in-memory ``.xlsx`` attachment, a 404 message when there are no
        rows, or a 500 message on failure. The DB session is always closed.
    """
    db = DatabaseManager()
    session = db.get_session()
    try:
        rows = session.execute(text("""
            WITH valid_competitor AS (
                SELECT DISTINCT ON (cp.sku)
                    cp.sku,
                    cp.competitor_product_id,
                    cp.competitor_product_name,
                    cp.match_score,
                    cp.crawled_at
                FROM competitor_prices cp
                WHERE cp.source = 'pchome'
                  AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
                  AND cp.price IS NOT NULL
                  AND cp.price > 0
                  AND COALESCE(cp.match_score, 0) >= 0.76
                  AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
                ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST
            )
            SELECT
                ROW_NUMBER() OVER (
                    ORDER BY ar.confidence DESC NULLS LAST,
                             ar.gap_pct DESC NULLS LAST,
                             ar.created_at DESC
                ) AS rank,
                ar.sku,
                ar.name,
                p.category,
                ar.momo_price,
                ar.pchome_price,
                ar.gap_pct,
                ar.confidence,
                ar.sales_7d_delta,
                ar.reason,
                ar.model_footprint,
                ar.created_at,
                p.url AS momo_url,
                vc.competitor_product_id,
                vc.competitor_product_name,
                vc.match_score,
                vc.crawled_at
            FROM ai_price_recommendations ar
            LEFT JOIN products p ON p.i_code = ar.sku
            LEFT JOIN valid_competitor vc ON vc.sku = ar.sku
            WHERE ar.strategy = 'product_pick'
              AND ar.status = 'pending'
            ORDER BY ar.confidence DESC NULLS LAST,
                     ar.gap_pct DESC NULLS LAST,
                     ar.created_at DESC
            LIMIT 50
        """)).mappings().all()

        if not rows:
            return "目前沒有 AI 挑品資料可匯出", 404

        export_rows = []
        for row in rows:
            sku = str(row.get('sku') or '')
            normalized_sku = sku.strip()
            pchome_id = row.get('competitor_product_id') or ''
            # Prefer the stored product URL; fall back to one built from the SKU.
            momo_url = (
                normalize_momo_product_url(row.get('momo_url'), normalized_sku)
                or build_momo_product_url(normalized_sku)
            )
            pchome_url = f"https://24h.pchome.com.tw/prod/{str(pchome_id).strip()}" if pchome_id else ''

            agent_footprint = _extract_agent_footprint(row.get('model_footprint'))
            missing_evidence = agent_footprint.get('missing_evidence') or []
            if not isinstance(missing_evidence, (list, tuple)):
                # A lone string/scalar is one entry, not a sequence of characters.
                missing_evidence = [missing_evidence]

            created_at = row.get('created_at')
            crawled_at = row.get('crawled_at')
            export_rows.append({
                'AI排名': int(row.get('rank') or 0),
                'MOMO商品ID': sku,
                'MOMO商品名稱': row.get('name') or '',
                '分類': row.get('category') or '',
                'MOMO價格': float(row.get('momo_price') or 0),
                'PChome價格': float(row.get('pchome_price') or 0),
                '價差百分比': float(row.get('gap_pct') or 0),
                'AI信心百分比': round(float(row.get('confidence') or 0) * 100, 1),
                '機會分數': float(agent_footprint.get('opportunity_score') or 0),
                '證據完整度': float(agent_footprint.get('evidence_quality') or 0),
                '信心分層': agent_footprint.get('confidence_band') or '',
                '待補證據': '、'.join(str(entry) for entry in missing_evidence),
                '近7日銷售變化': float(row.get('sales_7d_delta') or 0),
                'PChome商品ID': pchome_id,
                'PChome商品名稱': row.get('competitor_product_name') or '',
                'PChome比對分數': round(float(row.get('match_score') or 0) * 100, 1),
                'AI建議理由': row.get('reason') or '',
                'MOMO商品URL': momo_url,
                'PChome商品URL': pchome_url,
                'AI產生時間': created_at.strftime('%Y-%m-%d %H:%M:%S') if created_at else '',
                'PChome抓取時間': crawled_at.strftime('%Y-%m-%d %H:%M:%S') if crawled_at else '',
            })

        wide_headers = {'MOMO商品名稱', 'PChome商品名稱', 'AI建議理由', 'MOMO商品URL', 'PChome商品URL'}
        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            df = _sanitize_excel_dataframe(pd.DataFrame(export_rows))
            df.to_excel(writer, index=False, sheet_name='AI挑品清單')
            worksheet = writer.sheets['AI挑品清單']
            for column_cells in worksheet.columns:
                header = str(column_cells[0].value or '')
                # Width follows the header length, clamped to 12..42; long-text columns get 48.
                width = 48 if header in wide_headers else min(max(len(header) + 4, 12), 42)
                worksheet.column_dimensions[column_cells[0].column_letter].width = width
            worksheet.freeze_panes = 'A2'  # keep the header row visible while scrolling

        output.seek(0)
        filename = f"AI挑品清單_{datetime.now(TAIPEI_TZ).strftime('%Y%m%d_%H%M')}.xlsx"
        sys_log.info(f"[Web] [Export] AI 挑品清單匯出成功 | rows={len(export_rows)}")
        return send_file(
            output,
            as_attachment=True,
            download_name=filename,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as e:
        sys_log.error(f"[Web] [Export] AI 挑品清單匯出失敗 | Error: {e}")
        return f"匯出失敗: {e}", 500
    finally:
        session.close()
|
||
|
||
|
||
@export_bp.route('/api/export/excel/pchome-review')
@login_required
def export_excel_pchome_review():
    """Export the PChome price-comparison review queue to Excel.

    Keeps the matcher diagnostics and the manual-handling columns.

    Query parameters:
        q / search: free-text search passed to the repository.
        category: category filter; ``all`` (any case) means no filter.
        review_status / status: must be a key of ``REVIEW_STATUS_FILTER_GROUPS``,
            otherwise it is ignored.
        limit: maximum number of rows, clamped to 1..2000 (default 500).

    Returns:
        An in-memory ``.xlsx`` attachment, a 404 message when the queue is
        empty, or a 500 message on failure. The DB session is always closed.
    """
    from services.competitor_intel_repository import (
        REVIEW_STATUS_FILTER_GROUPS,
        fetch_competitor_review_queue_page,
    )

    db = DatabaseManager()
    session = db.get_session()
    try:
        search_query = (request.args.get('q') or request.args.get('search') or '').strip()
        category = (request.args.get('category') or '').strip()
        if category.lower() == 'all':
            category = ''

        status_filter = (request.args.get('review_status') or request.args.get('status') or '').strip()
        if status_filter == 'all' or status_filter not in REVIEW_STATUS_FILTER_GROUPS:
            status_filter = ''

        try:
            limit = int(request.args.get('limit') or 500)
        except (TypeError, ValueError):
            limit = 500
        limit = max(1, min(limit, 2000))

        engine = session.get_bind()
        rows = []
        page = 1
        # The page size must stay constant across requests: shrinking it on the
        # last page while the page number keeps increasing would shift the
        # offset and return rows that were already fetched (assuming the
        # repository paginates by (page - 1) * per_page). Any surplus from the
        # final page is trimmed by the ``rows[:limit]`` slice below.
        page_size = min(100, limit)
        while len(rows) < limit:
            payload = fetch_competitor_review_queue_page(
                engine,
                page=page,
                per_page=page_size,
                search_query=search_query,
                category=category,
                status_filter=status_filter,
            )
            batch = payload.get('items') or []
            if not batch:
                break
            rows.extend(batch)
            # Stop once the reported total is reached or a short page signals the end.
            if len(rows) >= int(payload.get('total') or 0) or len(batch) < page_size:
                break
            page += 1

        if not rows:
            return "目前沒有 PChome 覆核資料可匯出", 404

        export_rows = []
        for idx, item in enumerate(rows[:limit], start=1):
            sku = str(item.get('sku') or '').strip()
            pchome_id = str(item.get('candidate_pc_id') or '').strip()
            unit_comparison = item.get('unit_comparison') or {}
            momo_url = build_momo_product_url(sku) if sku else ''
            pchome_url = f"https://24h.pchome.com.tw/prod/{pchome_id}" if pchome_id else ''
            export_rows.append({
                '覆核序': idx,
                '狀態': item.get('status_label') or '',
                '建議處置': item.get('action_label') or '',
                '診斷原因': item.get('diagnostic_reason_text') or '',
                **_flatten_review_decision_envelope(item),
                'MOMO商品ID': sku,
                'MOMO商品名稱': item.get('name') or '',
                '分類': item.get('category') or '',
                'MOMO價格': float(item.get('momo_price') or 0),
                '候選PChome商品ID': pchome_id,
                '候選PChome商品名稱': item.get('candidate_pc_name') or '',
                '候選PChome價格': float(item.get('candidate_pc_price') or 0),
                'Match分數%': round(float(item.get('best_match_score') or 0) * 100, 1),
                '候選數': int(item.get('candidate_count') or 0),
                '單位價比較': unit_comparison.get('summary') or '',
                '原始診斷': item.get('match_diagnostic') or '',
                '嘗試時間': item.get('attempted_at') or '',
                'MOMO商品URL': momo_url,
                'PChome商品URL': pchome_url,
            })

        wide_headers = {
            'MOMO商品名稱',
            '候選PChome商品名稱',
            '建議處置',
            '決策信封ID',
            '決策建議代碼',
            '診斷原因',
            '自動執行阻擋原因',
            '決策證據摘要',
            '單位價比較',
            '原始診斷',
            'MOMO商品URL',
            'PChome商品URL',
        }
        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            df = _sanitize_excel_dataframe(pd.DataFrame(export_rows))
            df.to_excel(writer, index=False, sheet_name='PChome覆核隊列')
            worksheet = writer.sheets['PChome覆核隊列']
            for column_cells in worksheet.columns:
                header = str(column_cells[0].value or '')
                # Width follows the header length, clamped to 12..42; long-text columns get 52.
                width = 52 if header in wide_headers else min(max(len(header) + 4, 12), 42)
                worksheet.column_dimensions[column_cells[0].column_letter].width = width
            worksheet.freeze_panes = 'A2'  # keep the header row visible while scrolling

        output.seek(0)
        status_label = status_filter or 'all'
        filename = f"PChome比價覆核_{status_label}_{datetime.now(TAIPEI_TZ).strftime('%Y%m%d_%H%M')}.xlsx"
        sys_log.info(
            f"[Web] [Export] PChome 覆核隊列匯出成功 | rows={len(export_rows)} status={status_label}"
        )
        return send_file(
            output,
            as_attachment=True,
            download_name=filename,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as e:
        sys_log.error(f"[Web] [Export] PChome 覆核隊列匯出失敗 | Error: {e}")
        return f"匯出失敗: {e}", 500
    finally:
        session.close()
|
||
|
||
|
||
@export_bp.route('/api/export/excel/delisted')
@login_required
def export_excel_delisted():
    """Export products delisted today to an Excel workbook.

    A product counts as delisted today when its status is ``INACTIVE`` and its
    ``updated_at`` is at or after the start-of-day value returned by the
    consolidated data helper. Each product is paired with the price of its
    most recent price record (0 when it has none).

    Returns:
        The workbook as an attachment, or a 500 message when the exporter
        produced no file or raised. The DB session is always closed.
    """
    db = DatabaseManager()
    session = db.get_session()
    try:
        _, today_start = _get_consolidated_data()
        raw_items = session.query(Product).filter(
            Product.status == 'INACTIVE',
            Product.updated_at >= today_start  # same time basis as the dashboard helper
        ).all()

        delisted_items = []
        for product in raw_items:
            # One query per product: the newest record doubles as the existence check.
            last_record = session.query(PriceRecord).filter_by(product_id=product.id).order_by(
                desc(PriceRecord.timestamp)).first()
            delisted_items.append({
                'product': product,
                'last_price': last_record.price if last_record else 0,
            })

        exporter = Exporter()
        file_path = exporter.generate_delisted_excel(delisted_items)
        # Same guard as the other Excel routes: never hand a missing path to send_file.
        if file_path and os.path.exists(file_path):
            return send_file(file_path, as_attachment=True)
        return "匯出失敗", 500
    except Exception as e:
        sys_log.error(f"[Web] [Export] Excel 匯出失敗 (Delisted) | Error: {e}")
        return f"匯出失敗: {e}", 500
    finally:
        session.close()
|
||
|
||
|
||
@export_bp.route('/api/export/price_changes')
@login_required
def export_price_changes():
    """Export today's price-change details to Excel, with optional filtering.

    Query parameters:
        type: ``increase``, ``decrease``, ``delisted``, ``active`` or
            ``category``; anything else exports every changed product.
        category: category name, used only when ``type=category``.

    A product is "changed" when its latest price record differs from its last
    record before the start of the current day (UTC+8). Products without an
    earlier record are skipped.

    Returns:
        The saved workbook as an attachment, a 404 message when nothing
        matches, or a 500 message on failure. The DB session is always closed.
    """
    import openpyxl
    from openpyxl.styles import Font, Alignment, PatternFill

    filter_type = request.args.get('type', '')
    filter_category = request.args.get('category', '')

    def _is_selected(new_price, old_price):
        # 'active', 'category' and unknown types all mean "any change".
        if filter_type == 'increase':
            return new_price > old_price
        if filter_type == 'decrease':
            return new_price < old_price
        return new_price != old_price

    session = None
    try:
        session = DatabaseManager().get_session()

        now_taipei = datetime.now(TAIPEI_TZ)
        # Naive start-of-day value, compared against the stored timestamps.
        today_start = now_taipei.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None)

        products = []

        if filter_type == 'delisted':
            today_delisted = session.query(Product).filter(
                Product.status == 'INACTIVE',
                Product.updated_at >= today_start
            ).all()
            for product in today_delisted:
                last_record = session.query(PriceRecord).filter(
                    PriceRecord.product_id == product.id
                ).order_by(PriceRecord.timestamp.desc()).first()
                if last_record:
                    products.append((product, last_record, last_record.price))
        else:
            # Latest record per product (highest record id).
            latest_records_subq = session.query(
                func.max(PriceRecord.id).label('max_id')
            ).group_by(PriceRecord.product_id).subquery()

            query = session.query(PriceRecord, Product).join(
                latest_records_subq,
                PriceRecord.id == latest_records_subq.c.max_id
            ).join(Product, PriceRecord.product_id == Product.id)
            if filter_type == 'category' and filter_category:
                query = query.filter(Product.category == filter_category)

            # Last price before today for every product. No IN (...) list is
            # needed: it would name every product that has a record anyway,
            # and the map is only consulted by product id below.
            yesterday_prices_subq = session.query(
                PriceRecord.product_id,
                func.max(PriceRecord.id).label('max_id')
            ).filter(
                PriceRecord.timestamp < today_start
            ).group_by(PriceRecord.product_id).subquery()

            yesterday_prices_q = session.query(
                PriceRecord.product_id, PriceRecord.price
            ).join(
                yesterday_prices_subq,
                PriceRecord.id == yesterday_prices_subq.c.max_id
            )
            yesterday_prices_map = {pid: price for pid, price in yesterday_prices_q}

            for record, product in query.all():
                old_price = yesterday_prices_map.get(product.id)
                if old_price is not None and _is_selected(record.price, old_price):
                    products.append((product, record, old_price))

        if not products:
            return "無符合條件的商品資料", 404

        # Build the workbook
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = "價格變動明細"

        headers = ['商品ID', '商品名稱', '分類', '原價格', '現價格', '變動金額', '變動百分比', '更新時間', '商品網址']
        ws.append(headers)

        header_fill = PatternFill(start_color='4472C4', end_color='4472C4', fill_type='solid')
        header_font = Font(bold=True, color='FFFFFF')
        for cell in ws[1]:
            cell.fill = header_fill
            cell.font = header_font
            cell.alignment = Alignment(horizontal='center', vertical='center')

        for product, record, old_price in products:
            change = record.price - old_price
            change_pct = (change / old_price * 100) if old_price > 0 else 0
            safe_product_url = (
                normalize_momo_product_url(product.url, product.i_code)
                or build_momo_product_url(product.i_code)
            )
            ws.append([
                product.i_code,
                product.name,
                product.category or '未分類',
                old_price,
                record.price,
                change,
                f"{change_pct:.2f}%",
                record.timestamp.strftime('%Y-%m-%d %H:%M') if record.timestamp else '',
                safe_product_url
            ])

        column_widths = {'A': 12, 'B': 40, 'C': 15, 'D': 12, 'E': 12, 'F': 12, 'G': 12, 'H': 18, 'I': 50}
        for letter, width in column_widths.items():
            ws.column_dimensions[letter].width = width

        # Only recognised filter names reach the on-disk filename; the raw
        # query value is user input and could contain path separators.
        known_types = {'increase', 'decrease', 'delisted', 'active', 'category'}
        type_label = filter_type if filter_type in known_types else 'all'
        timestamp = now_taipei.strftime('%Y%m%d_%H%M%S')
        filename = f"價格變動明細_{type_label}_{timestamp}.xlsx"
        filepath = os.path.join(EXCEL_EXPORT_DIR, filename)

        os.makedirs(EXCEL_EXPORT_DIR, exist_ok=True)
        wb.save(filepath)

        return send_file(filepath, as_attachment=True, download_name=filename)

    except Exception as e:
        sys_log.error(f"[Web] [Export] 異動報表匯出失敗 | Type: {filter_type} | Error: {e}")
        return f"匯出失敗: {e}", 500
    finally:
        # Release the session on every path, including query failures.
        if session is not None:
            session.close()
|
||
|
||
|
||
# ==========================================
|
||
# 其他匯出功能
|
||
# ==========================================
|
||
|
||
@export_bp.route('/api/export/low_prices')
@login_required
def export_low_prices():
    """Export the historical-low-price report.

    Returns:
        The report as an attachment, a 404 message when the exporter produced
        no file, or a 500 message when it raised.
    """
    try:
        report_path = Exporter().generate_low_price_report()

        if not report_path or not os.path.exists(report_path):
            return "目前無歷史低價商品", 404
        return send_file(report_path, as_attachment=True)
    except Exception as exc:
        sys_log.error(f"[Web] [Export] 低價報表匯出失敗 | Error: {exc}")
        return f"匯出失敗: {exc}", 500
|
||
|
||
|
||
@export_bp.route('/api/export/changes')
@login_required
def export_changes():
    """Export a filtered report: price increases, decreases, or delisted products.

    The ``type`` query parameter selects the report (``increase``,
    ``decrease`` or ``delisted``). Any other value produces no file and
    therefore the 404 response.
    """
    filter_type = request.args.get('type')
    exporter = Exporter()
    report_path = None

    try:
        unique_items, today_start = _get_consolidated_data()

        if filter_type == 'increase':
            selected = []
            for entry in unique_items:
                if entry['yesterday_diff'] > 0:
                    selected.append(entry)
            report_path = exporter.generate_custom_report(selected, "今日漲價商品")
        elif filter_type == 'decrease':
            selected = []
            for entry in unique_items:
                if entry['yesterday_diff'] < 0:
                    selected.append(entry)
            report_path = exporter.generate_custom_report(selected, "今日跌價商品")
        elif filter_type == 'delisted':
            session = DatabaseManager().get_session()
            try:
                inactive_today = session.query(Product).filter(
                    Product.status == 'INACTIVE',
                    Product.updated_at >= today_start  # same time basis as the dashboard helper
                ).all()

                priced_items = []
                for product in inactive_today:
                    newest = session.query(PriceRecord).filter_by(product_id=product.id).order_by(
                        desc(PriceRecord.timestamp)).first()
                    # Products without any price record are reported with price 0.
                    last_price = newest.price if newest else 0
                    priced_items.append({'product': product, 'last_price': last_price})

                report_path = exporter.generate_delisted_report(priced_items, "今日下架商品")
            finally:
                session.close()

        if not report_path or not os.path.exists(report_path):
            return "無資料可匯出", 404
        return send_file(report_path, as_attachment=True)
    except Exception as exc:
        sys_log.error(f"[Web] [Export] 篩選匯出失敗 | Type: {filter_type} | Error: {exc}")
        return f"匯出失敗: {exc}", 500
|
||
|
||
|
||
def _abc_suggested_restock(frame, qty_col, raw_factor):
    """Compute the suggested restock quantity for every row of *frame*.

    When *raw_factor* parses as a usable number, quantity is multiplied by it;
    otherwise (missing, unparsable, or producing non-integer-convertible
    values) the class defaults apply: A = 1.5x, B = 1.2x, C = 0.
    """
    if raw_factor:
        try:
            return (frame[qty_col] * float(raw_factor)).astype(int)
        except (TypeError, ValueError, OverflowError):
            pass  # fall through to the per-class defaults
    by_class = np.select(
        [frame['ABC_Class'] == 'A', frame['ABC_Class'] == 'B'],
        [frame[qty_col] * 1.5, frame[qty_col] * 1.2],
        default=0,
    )
    return by_class.astype(int)


@export_bp.route('/api/export/excel/abc')
@login_required
def export_abc_analysis():
    """Export the ABC analysis report (Excel) from the cached sales data.

    Query parameters:
        category, brand, vendor: exact-match filters; ``all`` disables each.
            Category ``其他`` selects everything outside the top 12 categories
            by sales amount.
        keyword: case-insensitive literal substring of the product name.
        min_margin, max_margin: bounds on the cached ``calculated_margin_rate``.
        class: keep only rows of this ABC class after classification.
        factor: custom restock multiplier; class defaults are used otherwise.

    Rows are aggregated per product name, sorted by sales amount, and
    classified by cumulative share of revenue (A <= 80%, B <= 95%, else C).

    Returns:
        An in-memory ``.xlsx`` attachment, 400 when the sales cache has not
        been populated, 404 when there is nothing to export, or 500 on failure.
    """
    try:
        table_name = 'realtime_sales_monthly'
        sales_cache = _get_sales_cache()

        if table_name not in sales_cache:
            return "請先瀏覽「業績分析」頁面以載入資料與快取。", 400
        cache_data = sales_cache[table_name]
        df = cache_data['df']
        cols_map = cache_data['cols']

        # Logical role -> physical column name (None when the role is absent)
        col_name = cols_map.get('name')
        col_amount = cols_map.get('amount')
        col_qty = cols_map.get('qty')
        col_category = cols_map.get('category')
        col_brand = cols_map.get('brand')
        col_vendor = cols_map.get('vendor')
        col_cost = cols_map.get('cost')
        col_profit = cols_map.get('profit')
        col_pid = cols_map.get('pid')

        # NOTE(review): the original also read `min_price` / `max_price` but
        # never applied them; which column they should filter is not visible
        # here, so they remain unapplied.
        selected_category = request.args.get('category', 'all')
        selected_brand = request.args.get('brand', 'all')
        selected_vendor = request.args.get('vendor', 'all')
        keyword = request.args.get('keyword', '').strip()
        min_margin = request.args.get('min_margin', '')
        max_margin = request.args.get('max_margin', '')

        target_df = df.copy()

        # Top-N categories over the unfiltered data, needed to resolve "其他"
        TOP_N_CATS = 12
        top_cats_names = []
        if col_category and col_amount:
            cat_group_all = df.groupby(col_category)[col_amount].sum().sort_values(ascending=False)
            if len(cat_group_all) > TOP_N_CATS:
                top_cats_names = cat_group_all.head(TOP_N_CATS).index.tolist()

        if selected_category != 'all' and col_category:
            if selected_category == '其他' and top_cats_names:
                target_df = target_df[~target_df[col_category].isin(top_cats_names)]
            else:
                target_df = target_df[target_df[col_category] == selected_category]

        if selected_brand != 'all' and col_brand:
            target_df = target_df[target_df[col_brand] == selected_brand]
        if selected_vendor != 'all' and col_vendor:
            target_df = target_df[target_df[col_vendor] == selected_vendor]
        if keyword and col_name:
            # regex=False: the keyword is user text, not a pattern; characters
            # such as "(" or "+" must match literally instead of raising.
            target_df = target_df[
                target_df[col_name].astype(str).str.contains(keyword, case=False, na=False, regex=False)
            ]
        if min_margin:
            target_df = target_df[target_df['calculated_margin_rate'] >= float(min_margin)]
        if max_margin:
            target_df = target_df[target_df['calculated_margin_rate'] <= float(max_margin)]

        # Without a name or amount column there is nothing to aggregate.
        if not (col_amount and col_name) or target_df.empty:
            return "無資料可匯出", 404

        agg_rules = {col_amount: 'sum'}
        for numeric_col in (col_qty, col_cost, col_profit):
            if numeric_col:
                agg_rules[numeric_col] = 'sum'
        for label_col in (col_category, col_vendor, col_brand, col_pid):
            if label_col:
                agg_rules[label_col] = 'first'

        df_agg = target_df.groupby(col_name).agg(agg_rules).reset_index()

        # Margin rate, recomputed on the aggregated totals
        if col_profit:
            df_agg['calculated_margin_rate'] = (df_agg[col_profit] / df_agg[col_amount]) * 100
        elif col_cost:
            df_agg['calculated_margin_rate'] = ((df_agg[col_amount] - df_agg[col_cost]) / df_agg[col_amount]) * 100
        else:
            df_agg['calculated_margin_rate'] = 0.0
        df_agg['calculated_margin_rate'] = df_agg['calculated_margin_rate'].replace([np.inf, -np.inf, np.nan], 0)

        # Sort and classify by cumulative revenue share
        target_df = df_agg.sort_values(by=col_amount, ascending=False)
        target_df['cumulative_revenue'] = target_df[col_amount].cumsum()
        total_revenue = target_df[col_amount].sum()
        target_df['cumulative_pct'] = (target_df['cumulative_revenue'] / total_revenue) * 100

        conditions = [(target_df['cumulative_pct'] <= 80), (target_df['cumulative_pct'] <= 95)]
        target_df['ABC_Class'] = np.select(conditions, ['A', 'B'], default='C')

        # Optional export of a single class
        filter_class = request.args.get('class')
        if filter_class:
            # .copy(): new columns are added below; avoid writing into a slice.
            target_df = target_df[target_df['ABC_Class'] == filter_class].copy()

        if col_qty:
            # Zero quantity yields inf/NaN; report those as 0, like the margin rate.
            target_df['avg_unit_price'] = (
                (target_df[col_amount] / target_df[col_qty]).replace([np.inf, -np.inf], np.nan).fillna(0)
            )
            target_df['suggested_restock'] = _abc_suggested_restock(
                target_df, col_qty, request.args.get('factor')
            )

        # Column order and headers of the exported sheet
        export_cols = []
        header_map = {}

        def _add(column, header):
            export_cols.append(column)
            header_map[column] = header

        if col_pid:
            _add(col_pid, '商品ID')
        _add(col_name, '商品名稱')
        if col_category:
            _add(col_category, '分類')
        if col_brand:
            _add(col_brand, '品牌')
        if col_vendor:
            _add(col_vendor, '廠商')
        _add('ABC_Class', 'ABC分類')
        _add(col_amount, '銷售金額')
        if col_qty:
            _add(col_qty, '銷售數量')
        if 'avg_unit_price' in target_df.columns:
            _add('avg_unit_price', '平均單價')
        if col_cost:
            _add(col_cost, '成本')
        if col_profit:
            _add(col_profit, '毛利')
        if 'calculated_margin_rate' in target_df.columns:
            _add('calculated_margin_rate', '毛利率(%)')
        if 'suggested_restock' in target_df.columns:
            _add('suggested_restock', '建議補貨量')

        # Strip control characters from text cells, as the other in-memory exports do.
        export_df = _sanitize_excel_dataframe(target_df[export_cols].rename(columns=header_map))

        output = io.BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            export_df.to_excel(writer, index=False, sheet_name='ABC分析')
        output.seek(0)

        filename_prefix = f"ABC_Analysis_{filter_class}_" if filter_class else "ABC_Analysis_"
        return send_file(
            output,
            as_attachment=True,
            download_name=f"{filename_prefix}{datetime.now(TAIPEI_TZ).strftime('%Y%m%d_%H%M')}.xlsx",
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as e:
        sys_log.error(f"ABC Export Error: {e}")
        return f"匯出失敗: {e}", 500
|
||
|
||
|
||
@export_bp.route('/api/export/excel/vendor')
@login_required
def export_vendor_analysis():
    """Export the vendor profitability ranking (Excel) from the cached sales data.

    Sales are summed per vendor, a margin rate is derived from profit (or from
    cost when no profit column exists), and vendors are listed by sales amount
    in descending order.

    Returns:
        An in-memory ``.xlsx`` attachment; a redirect back to the sales
        analysis page (with a flash message) when the cache is missing; 400
        when the vendor or amount column is unavailable; 500 on failure.
    """
    try:
        table_name = 'realtime_sales_monthly'
        sales_cache = _get_sales_cache()

        if table_name not in sales_cache:
            # Carry the current query string back to the analysis page.
            params = dict(request.args.items())
            flash('資料快取已失效,請稍候重新載入資料後再匯出。', 'warning')
            return redirect(url_for('sales.sales_analysis', **params))

        cached = sales_cache[table_name]
        df = cached['df']
        cols_map = cached['cols']

        col_vendor = cols_map.get('vendor')
        col_amount = cols_map.get('amount')
        col_profit = cols_map.get('profit')
        col_cost = cols_map.get('cost')

        if not (col_vendor and col_amount):
            return "資料缺少必要欄位(廠商、銷售金額)", 400

        # Aggregate per vendor
        agg_rules = {col_amount: 'sum'}
        if col_profit:
            agg_rules[col_profit] = 'sum'
        if col_cost:
            agg_rules[col_cost] = 'sum'
        vendor_df = df.groupby(col_vendor).agg(agg_rules).reset_index()

        # Margin rate: prefer the profit column, fall back to amount minus cost
        if col_profit:
            margin = (vendor_df[col_profit] / vendor_df[col_amount]) * 100
        elif col_cost:
            margin = ((vendor_df[col_amount] - vendor_df[col_cost]) / vendor_df[col_amount]) * 100
        else:
            margin = 0
        vendor_df['margin_rate'] = margin
        # Division by a zero amount gives inf/NaN; report those as 0.
        vendor_df['margin_rate'] = vendor_df['margin_rate'].replace([np.inf, -np.inf, np.nan], 0)

        vendor_df = vendor_df.sort_values(by=col_amount, ascending=False)

        rename_map = {col_vendor: '廠商', col_amount: '銷售金額', 'margin_rate': '毛利率(%)'}
        if col_profit:
            rename_map[col_profit] = '毛利'
        if col_cost:
            rename_map[col_cost] = '成本'
        export_df = vendor_df.rename(columns=rename_map)

        buffer = io.BytesIO()
        with pd.ExcelWriter(buffer, engine='openpyxl') as writer:
            export_df.to_excel(writer, index=False, sheet_name='廠商分析')
        buffer.seek(0)

        stamp = datetime.now().strftime('%Y%m%d_%H%M')
        return send_file(
            buffer,
            as_attachment=True,
            download_name=f"Vendor_Analysis_{stamp}.xlsx",
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as exc:
        sys_log.error(f"Vendor Export Error: {exc}")
        return f"匯出失敗: {exc}", 500
|
||
|
||
|
||
@export_bp.route('/api/export/excel/seasonality_detail')
@login_required
def export_seasonality_detail():
    """Export the detail rows behind one cell of the seasonality heat map.

    Query parameters:
        data_range: number of months used to build the cache key (default 1).
        start_date, end_date: when either is given, a custom-range cache key
            is used instead.
        target_month, target_category: required; select the rows to export.

    Returns:
        An in-memory ``.xlsx`` attachment, 400 for missing data or parameters,
        404 when the selection is empty, or 500 on failure.
    """
    try:
        from services.cache_manager import _SALES_PROCESSED_CACHE
        from routes.sales_routes import _get_filtered_sales_data

        table_name = 'realtime_sales_monthly'
        raw_range = request.args.get('data_range', '1') or '1'
        data_range_months = int(raw_range)
        start_date = request.args.get('start_date', '')
        end_date = request.args.get('end_date', '')

        has_custom_range = bool(start_date or end_date)
        cache_key = (
            f"{table_name}_custom_{start_date}_{end_date}"
            if has_custom_range
            else f"{table_name}_{data_range_months}m"
        )

        target_df, cols_map, err = _get_filtered_sales_data(cache_key)

        # Retry with the base table when the range-specific key failed
        # and the base table is present in the cache.
        if err and table_name in _SALES_PROCESSED_CACHE:
            target_df, cols_map, err = _get_filtered_sales_data(table_name)

        if err:
            return f"匯出失敗: {err}", 400

        target_month = request.args.get('target_month')
        target_category = request.args.get('target_category')
        if not (target_month and target_category):
            return "缺少必要參數 (month, category)", 400

        col_category = cols_map.get('category')
        if not col_category:
            return "資料缺少分類欄位", 400

        month_matches = target_df['_month_str'] == target_month
        category_matches = target_df[col_category] == target_category
        export_df = target_df[month_matches & category_matches]

        if export_df.empty:
            return "該月份與分類無資料", 404

        buffer = io.BytesIO()
        with pd.ExcelWriter(buffer, engine='openpyxl') as writer:
            export_df.to_excel(writer, index=False, sheet_name='明細')
        buffer.seek(0)

        return send_file(
            buffer,
            as_attachment=True,
            download_name=f"Seasonality_{target_category}_{target_month}.xlsx",
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as exc:
        sys_log.error(f"Seasonality Export Error: {exc}")
        return f"匯出失敗: {exc}", 500
|