diff --git a/app.py b/app.py
index b31ece5..1a110e5 100644
--- a/app.py
+++ b/app.py
@@ -78,39 +78,8 @@ sys_log = SystemLogger("Web_Server").get_logger()
for _warn in validate_critical_config():
sys_log.warning(_warn)
-# 🚩 V-Opt: 全域資料快取 (用於加速業績分析)
-_SALES_DF_CACHE = {} # 已棄用,保留相容性
-_SALES_PROCESSED_CACHE = {} # V-Opt: 處理後資料快取
-_SALES_CACHE_MAX_ENTRIES = 10 # V-Opt (2026-01-23): 快取最大條目數
-_SALES_CACHE_TTL = 600 # V-Opt (2026-01-23): 快取有效期 10 分鐘
-def _cleanup_sales_cache():
- """清理過期和過多的快取條目"""
- global _SALES_PROCESSED_CACHE
- current_time = time.time()
-
- # 1. 清理過期條目
- expired_keys = [
- k for k, v in _SALES_PROCESSED_CACHE.items()
- if v.get('time') and current_time - v['time'] > _SALES_CACHE_TTL
- ]
- for k in expired_keys:
- del _SALES_PROCESSED_CACHE[k]
-
- # 2. 如果仍超過限制,刪除最舊的條目
- if len(_SALES_PROCESSED_CACHE) > _SALES_CACHE_MAX_ENTRIES:
- sorted_items = sorted(
- [(k, v.get('time', 0)) for k, v in _SALES_PROCESSED_CACHE.items()],
- key=lambda x: x[1]
- )
- # 保留最新的 _SALES_CACHE_MAX_ENTRIES 條
- keys_to_delete = [k for k, _ in sorted_items[:-_SALES_CACHE_MAX_ENTRIES]]
- for k in keys_to_delete:
- del _SALES_PROCESSED_CACHE[k]
-
- if expired_keys or len(_SALES_PROCESSED_CACHE) > _SALES_CACHE_MAX_ENTRIES - 2:
- sys_log.debug(f"[Cache] 清理快取: 移除 {len(expired_keys)} 條過期, 剩餘 {len(_SALES_PROCESSED_CACHE)} 條")
# 🚩 V-New: 商品看板資料快取 (用於加速首頁載入)
_DASHBOARD_DATA_CACHE = {
@@ -317,6 +286,10 @@ app.register_blueprint(system_bp)
csrf.exempt(system_bp) # n8n API 需要豁免 CSRF
sys_log.info("[Blueprint] ✅ 系統管理 Blueprint 已註冊 (CSRF 已豁免)")
+from routes.system_public_routes import system_public_bp
+app.register_blueprint(system_public_bp)
+sys_log.info("[Blueprint] ✅ 公開系統頁面 Blueprint 已註冊")
+
from routes.category_routes import category_bp
app.register_blueprint(category_bp)
sys_log.info("[Blueprint] ✅ 分類 CRUD Blueprint 已註冊")
@@ -650,516 +623,20 @@ def refresh_session():
if session.get('logged_in'):
session.modified = True # 標記 Session 已修改,觸發 Cookie 更新
-@app.route('/health')
-def health_check():
- """健康檢查端點 - 供 Nginx 和 Docker healthcheck 使用"""
- try:
- # 簡單檢查資料庫連線
- from config import DATABASE_TYPE
- return jsonify({
- 'status': 'healthy',
- 'database': DATABASE_TYPE,
- 'version': SYSTEM_VERSION
- }), 200
- except Exception as e:
- return jsonify({
- 'status': 'unhealthy',
- 'error': str(e)
- }), 500
-@app.route('/metrics')
-def prometheus_metrics():
- """Prometheus 指標端點 - 供 Prometheus 抓取監控資料"""
- try:
- from prometheus_client import generate_latest, CONTENT_TYPE_LATEST, Counter, Gauge, CollectorRegistry
- from config import DATABASE_TYPE
-
- # 建立獨立的 registry 以避免重複註冊
- registry = CollectorRegistry()
-
- # 應用程式資訊
- app_info = Gauge('momo_app_info', '應用程式資訊', ['version', 'database_type'], registry=registry)
- app_info.labels(version=SYSTEM_VERSION, database_type=DATABASE_TYPE).set(1)
-
- # 應用程式健康狀態 (1=健康, 0=不健康)
- app_health = Gauge('momo_app_health', '應用程式健康狀態', registry=registry)
-
- # 資料庫連線狀態
- db_status = Gauge('momo_database_up', '資料庫連線狀態', registry=registry)
-
- try:
- db = DatabaseManager()
- with db.engine.connect() as conn:
- conn.execute(text("SELECT 1"))
- db_status.set(1)
- app_health.set(1)
- except Exception:
- db_status.set(0)
- app_health.set(0)
-
- # 資料庫記錄數
- try:
- db = DatabaseManager()
- session = db.get_session()
-
- # 商品數量
- product_count = Gauge('momo_products_total', '商品總數', registry=registry)
- product_count.set(session.query(Product).count())
-
- # 價格記錄數量
- price_record_count = Gauge('momo_price_records_total', '價格記錄總數', registry=registry)
- price_record_count.set(session.query(PriceRecord).count())
-
- # 業績資料筆數
- from database.realtime_sales_models import RealtimeSalesMonthly
- sales_count = Gauge('momo_sales_records_total', '業績資料總數', registry=registry)
- sales_count.set(session.query(RealtimeSalesMonthly).count())
-
- session.close()
- except Exception as e:
- sys_log.warning(f"[Metrics] 無法取得資料庫統計: {e}")
-
- # 返回 Prometheus 格式
- from flask import Response
- return Response(generate_latest(registry), mimetype=CONTENT_TYPE_LATEST)
-
- except ImportError:
- # prometheus_client 未安裝時的備用方案
- metrics_text = """# HELP momo_app_health 應用程式健康狀態
-# TYPE momo_app_health gauge
-momo_app_health 1
-# HELP momo_app_info 應用程式資訊
-# TYPE momo_app_info gauge
-momo_app_info{version="9.4",database_type="postgresql"} 1
-"""
- from flask import Response
- return Response(metrics_text, mimetype='text/plain; charset=utf-8')
- except Exception as e:
- sys_log.error(f"[Metrics] 指標生成錯誤: {e}")
- from flask import Response
- return Response(f"# Error: {e}\n", mimetype='text/plain; charset=utf-8'), 500
-@app.route('/settings')
-def settings():
- """分類設定頁面"""
- categories = load_categories()
- return render_template('settings.html',
- categories=categories,
- public_url=public_url,
- system_version=SYSTEM_VERSION)
-
-@app.route('/system_settings')
-def system_settings_page():
- """系統設定與匯入頁面"""
- return render_template('system_settings.html', system_version=SYSTEM_VERSION)
-
-@app.route('/abc_analysis/detail')
-def abc_analysis_detail():
- """ABC 分析詳細報表頁面"""
- try:
- target_class = request.args.get('class', 'A') # 預設 A 類
- table_name = 'realtime_sales_monthly'
-
- # 1. 生成與主頁面一致的 cache_key
- data_range_months = int(request.args.get('data_range', '0') or '0')
- start_date = request.args.get('start_date', '')
- end_date = request.args.get('end_date', '')
-
- if start_date or end_date:
- cache_key = f"{table_name}_custom_{start_date}_{end_date}"
- else:
- cache_key = f"{table_name}_{data_range_months}m"
-
- # 2. 使用共用篩選函式取得資料
- target_df, cols_map, err = _get_filtered_sales_data(cache_key)
-
- # V-Fix: 如果 cache_key 不存在,嘗試後補使用 table_name 固定鍵值
- if err and table_name in _SALES_PROCESSED_CACHE:
- target_df, cols_map, err = _get_filtered_sales_data(table_name)
-
- if err:
- # V-Fix: 如果自動重載也失敗,則顯示稍後再試,並引導回主頁面
- return f'''
-
-
-
-
- 數據加載中 - WOOO TECH
-
-
-
-
-
-
數據準備中
-
正在自動重新加載數據,請稍後...
-
-
-
-
- ''', 200
-
- # 恢復欄位變數
- col_name = cols_map.get('name')
- col_amount = cols_map.get('amount')
- col_qty = cols_map.get('qty')
- col_category = cols_map.get('category')
- col_brand = cols_map.get('brand')
- col_vendor = cols_map.get('vendor')
- col_price = cols_map.get('price')
- col_cost = cols_map.get('cost')
- col_profit = cols_map.get('profit')
- col_date = cols_map.get('date')
- col_pid = cols_map.get('pid')
- # 3. 執行 ABC 分類
- items = []
- total_revenue = 0
- if col_amount and not target_df.empty:
- # V-Fix: 先針對商品進行聚合,確保 ABC 分析是基於「商品總銷量」而非「單筆訂單」
- agg_rules = {col_amount: 'sum'}
- if col_qty: agg_rules[col_qty] = 'sum'
- if col_cost: agg_rules[col_cost] = 'sum'
- if col_profit: agg_rules[col_profit] = 'sum'
- if col_category: agg_rules[col_category] = 'first'
- if col_vendor: agg_rules[col_vendor] = 'first'
- if col_brand: agg_rules[col_brand] = 'first' # V-New: 加入品牌
- if col_pid: agg_rules[col_pid] = 'first' # V-New: 聚合商品ID
- if col_date: agg_rules['_month_str'] = lambda x: ', '.join(sorted(x.dropna().unique()))
- df_agg = target_df.groupby(col_name).agg(agg_rules).reset_index()
- # 重新計算聚合後的毛利率
- if col_profit:
- df_agg['calculated_margin_rate'] = (df_agg[col_profit] / df_agg[col_amount]) * 100
- elif col_cost:
- df_agg['calculated_margin_rate'] = ((df_agg[col_amount] - df_agg[col_cost]) / df_agg[col_amount]) * 100
- else:
- df_agg['calculated_margin_rate'] = 0.0
- df_agg['calculated_margin_rate'] = df_agg['calculated_margin_rate'].replace([np.inf, -np.inf, np.nan], 0)
- # 執行 ABC 排序與計算
- df_agg = df_agg.sort_values(by=col_amount, ascending=False)
- df_agg['cumulative_revenue'] = df_agg[col_amount].cumsum()
- total_revenue = df_agg[col_amount].sum()
- df_agg['cumulative_pct'] = (df_agg['cumulative_revenue'] / total_revenue) * 100
-
- conditions = [(df_agg['cumulative_pct'] <= 80), (df_agg['cumulative_pct'] <= 95)]
- choices = ['A', 'B']
- df_agg['ABC_Class'] = np.select(conditions, choices, default='C')
-
- # 4. 篩選特定類別
- class_df = df_agg[df_agg['ABC_Class'] == target_class].copy()
-
- # V-New: 計算平均單價與庫存建議
- if col_qty:
- class_df['avg_unit_price'] = (class_df[col_amount] / class_df[col_qty]).fillna(0)
-
- # V-New: 處理動態補貨係數
- custom_factor = request.args.get('factor')
- current_factor = 0.0
-
- if custom_factor:
- try:
- current_factor = float(custom_factor)
- except:
- current_factor = 1.5 if target_class == 'A' else (1.2 if target_class == 'B' else 0.0)
- else:
- current_factor = 1.5 if target_class == 'A' else (1.2 if target_class == 'B' else 0.0)
-
- class_df['suggested_restock'] = (class_df[col_qty] * current_factor).astype(int)
-
- items = class_df.to_dict('records')
- # 準備標題與描述
- class_info = {
- 'A': {'title': 'A 類 - 核心商品', 'desc': '營收佔比前 80% 的主力商品,建議重點備貨與監控。', 'color': 'danger'},
- 'B': {'title': 'B 類 - 次要商品', 'desc': '營收佔比 80%~95% 的輔助商品,維持正常庫存。', 'color': 'warning'},
- 'C': {'title': 'C 類 - 長尾商品', 'desc': '營收佔比最後 5% 的長尾商品,建議評估清倉或縮減 SKU。', 'color': 'success'}
- }
- info = class_info.get(target_class, {'title': f'{target_class} 類', 'desc': '', 'color': 'secondary'})
-
- # 計算 DataTables 預設排序欄位 (銷售金額) 的索引
- # 欄位順序: Rank(0), [PID], Name, [Brand], [Vendor], [Cat], [Margin], [AvgPrice, Qty, Restock], Amount
- sort_col_index = 1 # Rank
- if col_pid: sort_col_index += 1
- sort_col_index += 1 # Name
- if col_brand: sort_col_index += 1
- if col_vendor: sort_col_index += 1
- if col_category: sort_col_index += 1
- if col_cost or col_profit: sort_col_index += 1
- if col_qty: sort_col_index += 3
- # 此時 sort_col_index 即為 Amount 欄位的索引
-
- return render_template('abc_analysis_detail.html',
- items=items,
- info=info,
- target_class=target_class,
- current_factor=current_factor, # V-New: 傳遞當前係數
- total_revenue=total_revenue,
- sort_col_index=sort_col_index, # V-New: 傳遞排序欄位索引
- cols={'name': col_name, 'amount': col_amount, 'qty': col_qty, 'cat': col_category,
- 'vendor': col_vendor, 'brand': col_brand, 'cost': col_cost, 'profit': col_profit, 'date': col_date, 'pid': col_pid},
- # 傳遞當前查詢參數以供匯出連結使用
- query_string=request.query_string.decode())
-
- except Exception as e:
- sys_log.error(f"ABC Detail Error: {e}")
- return f"系統錯誤: {e}"
-
-@app.route('/logs')
-def show_logs():
- return render_template('logs.html')
-
-@app.route('/api/logs')
-def get_logs_api():
- if os.path.exists(LOG_FILE_PATH):
- try:
- with open(LOG_FILE_PATH, 'r', encoding='utf-8') as f:
- return jsonify({"logs": "".join(f.readlines()[-60:])})
- except Exception as e:
- sys_log.error(f"[Web] [Logs] ❌ 日誌 API 讀取異常 | Error: {e}")
- return jsonify({"logs": "讀取日誌異常"})
- return jsonify({"logs": "等待系統啟動中..."})
-
-@app.route('/api/backup', methods=['POST'])
-@login_required
-def trigger_backup():
- """API: 觸發系統完整備份"""
- # Note: [功能] 尚未實作「系統還原」功能 (Restore),需評估安全性後加入
- try:
- sys_log.info("[System] [Backup] 💾 開始執行系統完整備份...")
- backup_dir = os.path.join(BASE_DIR, 'backups')
- if not os.path.exists(backup_dir):
- os.makedirs(backup_dir)
-
- timestamp = datetime.now(TAIPEI_TZ).strftime('%Y%m%d_%H%M')
- zip_filename = f"momo_system_backup_{SYSTEM_VERSION}_{timestamp}.zip"
- zip_filepath = os.path.join(backup_dir, zip_filename)
-
- with zipfile.ZipFile(zip_filepath, 'w', zipfile.ZIP_DEFLATED) as zipf:
- for root, dirs, files in os.walk(BASE_DIR):
- # 排除不必要的目錄
- dirs[:] = [d for d in dirs if d not in ['backups', '__pycache__', 'venv', '.git', '.idea', '.vscode', 'node_modules']]
-
- for file in files:
- if file == zip_filename: continue # 跳過正在寫入的檔案
- if file.endswith('.pyc') or file.endswith('.DS_Store'): continue
-
- file_path = os.path.join(root, file)
- arcname = os.path.relpath(file_path, BASE_DIR)
- zipf.write(file_path, arcname)
-
- sys_log.info(f"[System] [Backup] ✅ 系統備份完成 | File: {zip_filename}")
-
- # V-New: 回傳下載連結
- download_url = url_for('download_backup', filename=zip_filename)
-
- return jsonify({
- "status": "success",
- "message": f"備份成功!\n檔案已儲存為: {zip_filename}\n即將開始下載...",
- "download_url": download_url
- })
- except Exception as e:
- sys_log.error(f"[System] [Backup] ❌ 備份失敗 | Error: {e}")
- return jsonify({"status": "error", "message": str(e)}), 500
-
-@app.route('/api/backup/download/')
-@login_required
-def download_backup(filename):
- """
- API: 下載備份檔案(已加入路徑遍歷防護)
- """
- try:
- backup_dir = os.path.join(BASE_DIR, 'backups')
- # 使用 safe_join 驗證路徑,防止路徑遍歷攻擊
- safe_path = safe_join(backup_dir, filename)
-
- # 確保檔案存在
- if not safe_path.exists():
- sys_log.warning(f"[Security] 備份檔案不存在 | File: {filename}")
- return jsonify({'error': '檔案不存在'}), 404
-
- # 確保是檔案而非目錄
- if not safe_path.is_file():
- sys_log.warning(f"[Security] 嘗試下載非檔案路徑 | Path: {filename}")
- return jsonify({'error': '非法路徑'}), 400
-
- return send_from_directory(backup_dir, safe_path.name, as_attachment=True)
-
- except ValueError as e:
- # safe_join 偵測到路徑遍歷嘗試
- sys_log.error(f"[Security] 路徑遍歷攻擊嘗試被阻擋 | Filename: {filename} | Error: {e}")
- return jsonify({'error': '非法路徑'}), 400
- except Exception as e:
- sys_log.error(f"[System] 下載備份失敗 | Error: {e}")
- return jsonify({'error': '下載失敗'}), 500
# ================= 📊 V-New: 業績分析報表 =================
-def _get_filtered_sales_data(cache_key):
- """
- 🚩 共用函式:從快取讀取資料並根據 request.args 進行篩選
- 回傳: (target_df, cols_map, error_message)
- 參數: cache_key - 快取鍵值 (例如: "realtime_sales_monthly_3m")
- """
- db = DatabaseManager()
-
- # 1. 檢查資料表與快取
- df = None
- cols_map = {}
-
- if cache_key in _SALES_PROCESSED_CACHE:
- cache_data = _SALES_PROCESSED_CACHE[cache_key]
- df = cache_data['df']
- cols_map = cache_data['cols']
- else:
- # 快取不存在時,直接回傳錯誤讓呼叫端顯示 spinner 導回 sales_analysis
- # 不在此發起全表 DB 查詢(748k 行會 hang Gunicorn worker)
- sys_log.warning(f"[Sales Analysis] ⚠️ 快取不存在 ({cache_key}),回傳錯誤讓 UI 導回 sales_analysis")
- return None, {}, f"快取未就緒,請先從業績分析主頁載入資料 (cache_key={cache_key})"
-
- if False: # 保留舊冷快取重載邏輯(已停用,避免全表掃描 hang)
- sys_log.warning(f"[Sales Analysis] ⚠️ 快取不存在 ({cache_key}),試圖重新從資料庫載入...")
- try:
- # V-Fix: 從 cache_key 提取 table_name
- # 格式: realtime_sales_monthly_3m 或 realtime_sales_monthly_custom_2025-01-01_2025-01-31
- if "_custom_" in cache_key:
- table_name = cache_key.split('_custom_')[0] # realtime_sales_monthly
- else:
- # 移除最後的 _Xm 部分
- parts = cache_key.rsplit('_', 1)
- table_name = parts[0] if len(parts) > 1 else 'realtime_sales_monthly'
-
- # 判斷是自訂區間還是標配區間
- if "_custom_" in cache_key:
- # 格式: realtime_sales_monthly_custom_2025-01-01_2025-01-31
- parts = cache_key.split('_custom_')
- dates = parts[1].split('_')
- start_d, end_d = dates[0], dates[1]
- # 呼叫資料庫讀取 (不傳入 view, 會自動處理欄位映射)
- result_df, result_cols = db.get_sales_data(table_name=table_name, start_date=start_d, end_date=end_d)
- else:
- # 格式: realtime_sales_monthly_1m;months=0 表示全時段但上限 12 個月避免全表掃描 hang
- months = int(cache_key.split('_')[-1].replace('m', '') or '12')
- if months == 0:
- months = 12
- result_df, result_cols = db.get_sales_data(table_name=table_name, months=months)
-
- if result_df is not None and not result_df.empty:
- # V-Fix (2026-01-23): 補回所有日期維度欄位供後續篩選 (_dow, _hour, _month_str)
- if '日期' in result_df.columns:
- # 先轉換為 datetime
- result_df['_parsed_date'] = pd.to_datetime(result_df['日期'], errors='coerce')
- result_df['_month_str'] = result_df['_parsed_date'].dt.strftime('%Y-%m')
- result_df['_dow'] = result_df['_parsed_date'].dt.dayofweek
-
- # 小時需要從「時間」欄位提取
- if '時間' in result_df.columns:
- result_df['_hour'] = pd.to_datetime(result_df['時間'], format='%H:%M:%S', errors='coerce').dt.hour
- else:
- result_df['_hour'] = 0 # 如果沒有時間欄位,預設為 0
-
- # 清理臨時欄位
- result_df.drop(columns=['_parsed_date'], inplace=True, errors='ignore')
-
- # 自動存入快取
- _SALES_PROCESSED_CACHE[cache_key] = {'df': result_df, 'cols': result_cols, 'time': time.time()}
- df = result_df
- cols_map = result_cols
- sys_log.info(f"[Sales Analysis] ✅ 快取成功自動重載 | 筆數: {len(df)}")
- else:
- return None, None, "資料庫無可用資料,請確認匯入狀態"
- except Exception as ex:
- sys_log.error(f"[Sales Analysis] 🚨 自動重載失敗: {ex}")
- return None, None, f"快取失效且無法重載: {ex}"
-
- # 恢復欄位變數
- col_name = cols_map.get('name')
- col_category = cols_map.get('category')
- col_brand = cols_map.get('brand')
- col_vendor = cols_map.get('vendor')
- col_activity = cols_map.get('activity')
- col_payment = cols_map.get('payment')
- col_price = cols_map.get('price')
- col_date = cols_map.get('date')
- col_return_qty = cols_map.get('return_qty') # V-New: 取得退貨欄位
-
- # 2. 取得篩選參數
- selected_category = request.args.get('category', 'all')
- selected_brand = request.args.get('brand', 'all')
- selected_vendor = request.args.get('vendor', 'all')
- selected_activity = request.args.get('activity', 'all')
- selected_payment = request.args.get('payment', 'all')
- selected_dow = request.args.get('dow', 'all')
- selected_hour = request.args.get('hour', 'all')
- selected_month = request.args.get('month', 'all')
- keyword = request.args.get('keyword', '').strip()
- min_price = request.args.get('min_price', '')
- max_price = request.args.get('max_price', '')
- min_margin = request.args.get('min_margin', '')
- max_margin = request.args.get('max_margin', '')
-
- # 3. 執行篩選
- target_df = df
-
- # Top N 分類處理 (用於 '其他' 篩選)
- TOP_N_CATS = 12
- top_cats_names = []
- if col_category:
- # 注意:這裡為了效能,簡單重算一次 Top N,或可考慮也快取起來
- cat_group_all = df.groupby(col_category)[cols_map.get('amount')].sum().sort_values(ascending=False)
- if len(cat_group_all) > TOP_N_CATS:
- top_cats_names = cat_group_all.head(TOP_N_CATS).index.tolist()
-
- if selected_category != 'all' and col_category:
- if selected_category == '其他' and top_cats_names:
- target_df = target_df[~target_df[col_category].isin(top_cats_names)]
- else:
- target_df = target_df[target_df[col_category] == selected_category]
-
- if selected_brand != 'all' and col_brand: target_df = target_df[target_df[col_brand] == selected_brand]
- if selected_vendor != 'all' and col_vendor: target_df = target_df[target_df[col_vendor] == selected_vendor]
- if selected_activity != 'all' and col_activity: target_df = target_df[target_df[col_activity] == selected_activity]
- if selected_payment != 'all' and col_payment: target_df = target_df[target_df[col_payment] == selected_payment]
-
- if selected_dow != 'all' and col_date: target_df = target_df[target_df['_dow'] == int(selected_dow)]
- if selected_hour != 'all' and col_date: target_df = target_df[target_df['_hour'] == int(selected_hour)]
- if selected_month != 'all' and col_date: target_df = target_df[target_df['_month_str'] == selected_month]
-
- if keyword: target_df = target_df[target_df[col_name].astype(str).str.contains(keyword, case=False, na=False)]
-
- if col_price:
- if min_price: target_df = target_df[target_df[col_price] >= float(min_price)]
- if max_price: target_df = target_df[target_df[col_price] <= float(max_price)]
-
- if min_margin: target_df = target_df[target_df['calculated_margin_rate'] >= float(min_margin)]
- if max_margin: target_df = target_df[target_df['calculated_margin_rate'] <= float(max_margin)]
-
- return target_df, cols_map, None
# V-Opt: API 層級快取 (減少重複查詢)
@@ -1451,98 +928,6 @@ def prepare_category_summary(df, date_str=None, is_month_view=False, month_start
return category_df.to_dict('records')
# V-New 2026-01-15: 行銷活動業績聚合函數
-def prepare_marketing_summary(df, selected_date=None, is_month_view=False, month_start=None, month_end=None, sort_by='revenue'):
- """
- 準備行銷活動業績貢獻數據
- 支援單日模式和月度模式,並可指定排序維度 (revenue, qty, profit)
- """
- # 決定使用的數據範圍
- if is_month_view and month_start is not None and month_end is not None:
- target_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)]
- elif selected_date is not None:
- target_df = df[df['snapshot_date'] == selected_date]
- else:
- target_df = df
-
- if target_df.empty:
- return {'coupon': [], 'discount': [], 'bonus': [], 'click': []}
-
- cols = target_df.columns.tolist()
- col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
- col_qty = find_col(cols, ['銷售數量', '銷量', '數量', 'Qty'])
- col_profit = find_col(cols, ['毛利', 'Profit', '利潤'])
- col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
-
- if not col_amount:
- return {'coupon': [], 'discount': [], 'bonus': [], 'click': []}
-
- # 定義四種行銷活動欄位
- marketing_cols = {
- 'coupon': '折價券活動名稱', # 折價券活動
- 'discount': '折扣活動名稱', # 折扣活動
- 'bonus': '滿額再折扣活動名稱', # 滿額再折扣
- 'click': '點我再折扣' # 點我再折扣
- }
-
- result = {}
-
- # 確保 sort_by 欄位存在,否則退回 revenue
- actual_sort_key = sort_by if sort_by in ['revenue', 'qty', 'profit'] else 'revenue'
-
- for key, col_name in marketing_cols.items():
- if col_name not in cols:
- result[key] = []
- continue
-
- # 篩選有該行銷活動的記錄
- activity_df = target_df[
- (target_df[col_name].notna()) &
- (target_df[col_name] != '') &
- (target_df[col_name] != '0') &
- (target_df[col_name] != 0)
- ]
-
- if activity_df.empty:
- result[key] = []
- continue
-
- # 聚合計算
- agg_args = {
- 'revenue': (col_amount, 'sum'),
- 'order_count': (col_amount, 'count')
- }
- if col_qty: agg_args['qty'] = (col_qty, 'sum')
- if col_profit: agg_args['profit'] = (col_profit, 'sum')
-
- grouped = activity_df.groupby(col_name).agg(**agg_args).reset_index()
-
- # 若需要手動計算毛利 (金額 - 成本)
- if 'profit' not in agg_args and col_cost:
- cost_agg = activity_df.groupby(col_name)[col_cost].sum().reset_index()
- grouped = grouped.merge(cost_agg, on=col_name)
- grouped['profit'] = grouped['revenue'] - grouped[col_cost]
-
- grouped = grouped.rename(columns={col_name: 'name'})
-
- # 動態排序
- sort_col = actual_sort_key if actual_sort_key in grouped.columns else 'revenue'
- grouped = grouped.sort_values(sort_col, ascending=False).head(15)
-
- # 轉為字典列表
- records = []
- for _, row in grouped.iterrows():
- record = {
- 'name': str(row['name'])[:50],
- 'revenue': float(row['revenue']),
- 'order_count': int(row['order_count'])
- }
- if 'qty' in row: record['qty'] = float(row['qty'])
- if 'profit' in row: record['profit'] = float(row['profit'])
- records.append(record)
-
- result[key] = records
-
- return result
def get_taiwan_holiday(date):
diff --git a/routes/crawler_management_routes.py b/routes/crawler_management_routes.py
index bb9617e..380ac0b 100644
--- a/routes/crawler_management_routes.py
+++ b/routes/crawler_management_routes.py
@@ -24,7 +24,7 @@ CONFIG_PATH = os.path.join(os.path.dirname(__file__), 'data', 'crawler_config.js
@crawler_bp.route('/crawler_management')
def crawler_management_page():
"""爬蟲管理頁面 - 重定向到整合後的 settings 頁面"""
- return redirect(url_for('settings'))
+ return redirect(url_for('system_public.settings'))
@crawler_bp.route('/api/crawlers', methods=['GET'])
def get_crawlers():
diff --git a/routes/sales_routes.py b/routes/sales_routes.py
index 3969847..a6d6d25 100644
--- a/routes/sales_routes.py
+++ b/routes/sales_routes.py
@@ -1413,6 +1413,180 @@ def growth_analysis():
return f"系統錯誤: {e}"
+@sales_bp.route('/abc_analysis/detail')
+def abc_analysis_detail():
+ """ABC 分析詳細報表頁面"""
+ try:
+ target_class = request.args.get('class', 'A') # 預設 A 類
+ table_name = 'realtime_sales_monthly'
+
+ # 1. 生成與主頁面一致的 cache_key
+ data_range_months = int(request.args.get('data_range', '0') or '0')
+ start_date = request.args.get('start_date', '')
+ end_date = request.args.get('end_date', '')
+
+ if start_date or end_date:
+ cache_key = f"{table_name}_custom_{start_date}_{end_date}"
+ else:
+ cache_key = f"{table_name}_{data_range_months}m"
+
+ # 2. 使用共用篩選函式取得資料
+ target_df, cols_map, err = _get_filtered_sales_data(cache_key)
+
+ # V-Fix: 如果 cache_key 不存在,嘗試後補使用 table_name 固定鍵值
+ if err and table_name in _SALES_PROCESSED_CACHE:
+ target_df, cols_map, err = _get_filtered_sales_data(table_name)
+
+ if err:
+ # V-Fix: 如果自動重載也失敗,則顯示稍後再試,並引導回主頁面
+ return f'''
+
+
+
+
+ 數據加載中 - WOOO TECH
+
+
+
+
+
+
數據準備中
+
正在自動重新加載數據,請稍後...
+
+
+
+
+ ''', 200
+
+ # 恢復欄位變數
+ col_name = cols_map.get('name')
+ col_amount = cols_map.get('amount')
+ col_qty = cols_map.get('qty')
+ col_category = cols_map.get('category')
+ col_brand = cols_map.get('brand')
+ col_vendor = cols_map.get('vendor')
+ col_price = cols_map.get('price')
+ col_cost = cols_map.get('cost')
+ col_profit = cols_map.get('profit')
+ col_date = cols_map.get('date')
+ col_pid = cols_map.get('pid')
+
+
+ # 3. 執行 ABC 分類
+ items = []
+ total_revenue = 0
+ if col_amount and not target_df.empty:
+ # V-Fix: 先針對商品進行聚合,確保 ABC 分析是基於「商品總銷量」而非「單筆訂單」
+ agg_rules = {col_amount: 'sum'}
+ if col_qty: agg_rules[col_qty] = 'sum'
+ if col_cost: agg_rules[col_cost] = 'sum'
+ if col_profit: agg_rules[col_profit] = 'sum'
+ if col_category: agg_rules[col_category] = 'first'
+ if col_vendor: agg_rules[col_vendor] = 'first'
+ if col_brand: agg_rules[col_brand] = 'first' # V-New: 加入品牌
+ if col_pid: agg_rules[col_pid] = 'first' # V-New: 聚合商品ID
+ if col_date: agg_rules['_month_str'] = lambda x: ', '.join(sorted(x.dropna().unique()))
+
+ df_agg = target_df.groupby(col_name).agg(agg_rules).reset_index()
+
+ # 重新計算聚合後的毛利率
+ if col_profit:
+ df_agg['calculated_margin_rate'] = (df_agg[col_profit] / df_agg[col_amount]) * 100
+ elif col_cost:
+ df_agg['calculated_margin_rate'] = ((df_agg[col_amount] - df_agg[col_cost]) / df_agg[col_amount]) * 100
+ else:
+ df_agg['calculated_margin_rate'] = 0.0
+ df_agg['calculated_margin_rate'] = df_agg['calculated_margin_rate'].replace([np.inf, -np.inf, np.nan], 0)
+
+ # 執行 ABC 排序與計算
+ df_agg = df_agg.sort_values(by=col_amount, ascending=False)
+ df_agg['cumulative_revenue'] = df_agg[col_amount].cumsum()
+ total_revenue = df_agg[col_amount].sum()
+ df_agg['cumulative_pct'] = (df_agg['cumulative_revenue'] / total_revenue) * 100
+
+ conditions = [(df_agg['cumulative_pct'] <= 80), (df_agg['cumulative_pct'] <= 95)]
+ choices = ['A', 'B']
+ df_agg['ABC_Class'] = np.select(conditions, choices, default='C')
+
+ # 4. 篩選特定類別
+ class_df = df_agg[df_agg['ABC_Class'] == target_class].copy()
+
+ # V-New: 計算平均單價與庫存建議
+ if col_qty:
+ class_df['avg_unit_price'] = (class_df[col_amount] / class_df[col_qty]).fillna(0)
+
+ # V-New: 處理動態補貨係數
+ custom_factor = request.args.get('factor')
+ current_factor = 0.0
+
+ if custom_factor:
+ try:
+ current_factor = float(custom_factor)
+ except:
+ current_factor = 1.5 if target_class == 'A' else (1.2 if target_class == 'B' else 0.0)
+ else:
+ current_factor = 1.5 if target_class == 'A' else (1.2 if target_class == 'B' else 0.0)
+
+ class_df['suggested_restock'] = (class_df[col_qty] * current_factor).astype(int)
+
+ items = class_df.to_dict('records')
+
+ # 準備標題與描述
+ class_info = {
+ 'A': {'title': 'A 類 - 核心商品', 'desc': '營收佔比前 80% 的主力商品,建議重點備貨與監控。', 'color': 'danger'},
+ 'B': {'title': 'B 類 - 次要商品', 'desc': '營收佔比 80%~95% 的輔助商品,維持正常庫存。', 'color': 'warning'},
+ 'C': {'title': 'C 類 - 長尾商品', 'desc': '營收佔比最後 5% 的長尾商品,建議評估清倉或縮減 SKU。', 'color': 'success'}
+ }
+ info = class_info.get(target_class, {'title': f'{target_class} 類', 'desc': '', 'color': 'secondary'})
+
+ # 計算 DataTables 預設排序欄位 (銷售金額) 的索引
+ # 欄位順序: Rank(0), [PID], Name, [Brand], [Vendor], [Cat], [Margin], [AvgPrice, Qty, Restock], Amount
+ sort_col_index = 1 # Rank
+ if col_pid: sort_col_index += 1
+ sort_col_index += 1 # Name
+ if col_brand: sort_col_index += 1
+ if col_vendor: sort_col_index += 1
+ if col_category: sort_col_index += 1
+ if col_cost or col_profit: sort_col_index += 1
+ if col_qty: sort_col_index += 3
+ # 此時 sort_col_index 即為 Amount 欄位的索引
+
+ return render_template('abc_analysis_detail.html',
+ items=items,
+ info=info,
+ target_class=target_class,
+ current_factor=current_factor, # V-New: 傳遞當前係數
+ total_revenue=total_revenue,
+ sort_col_index=sort_col_index, # V-New: 傳遞排序欄位索引
+ cols={'name': col_name, 'amount': col_amount, 'qty': col_qty, 'cat': col_category,
+ 'vendor': col_vendor, 'brand': col_brand, 'cost': col_cost, 'profit': col_profit, 'date': col_date, 'pid': col_pid},
+ # 傳遞當前查詢參數以供匯出連結使用
+ query_string=request.query_string.decode())
+
+ except Exception as e:
+ sys_log.error(f"ABC Detail Error: {e}")
+ return f"系統錯誤: {e}"
+
# ==========================================
# API 路由
# ==========================================
diff --git a/routes/system_public_routes.py b/routes/system_public_routes.py
new file mode 100644
index 0000000..294be92
--- /dev/null
+++ b/routes/system_public_routes.py
@@ -0,0 +1,204 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""公開系統頁面與健康檢查路由。
+
+此 blueprint 無 url_prefix,保留外部監控與既有前端使用的公開 URL。
+"""
+
+import os
+import zipfile
+from datetime import datetime, timezone, timedelta
+
+from flask import Blueprint, Response, jsonify, render_template, send_from_directory, url_for
+from sqlalchemy import text
+
+from auth import login_required
+from config import BASE_DIR, DATABASE_TYPE, SYSTEM_VERSION
+from database.manager import DatabaseManager
+from database.models import Product, PriceRecord
+from services.json_storage import load_categories
+from services.logger_manager import SystemLogger
+from utils.security import safe_join
+
+
+system_public_bp = Blueprint('system_public', __name__)
+
+sys_log = SystemLogger("SystemPublicRoutes").get_logger()
+TAIPEI_TZ = timezone(timedelta(hours=8))
+LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
+public_url = os.getenv('PUBLIC_URL', '服務啟動中...')
+
+
+@system_public_bp.route('/health')
+def health_check():
+ """健康檢查端點 - 供 Nginx 和 Docker healthcheck 使用"""
+ try:
+ return jsonify({
+ 'status': 'healthy',
+ 'database': DATABASE_TYPE,
+ 'version': SYSTEM_VERSION
+ }), 200
+ except Exception as e:
+ return jsonify({
+ 'status': 'unhealthy',
+ 'error': str(e)
+ }), 500
+
+
+@system_public_bp.route('/metrics')
+def prometheus_metrics():
+ """Prometheus 指標端點 - 供 Prometheus 抓取監控資料"""
+ try:
+ from prometheus_client import generate_latest, CONTENT_TYPE_LATEST, Gauge, CollectorRegistry
+
+ registry = CollectorRegistry()
+
+ app_info = Gauge('momo_app_info', '應用程式資訊', ['version', 'database_type'], registry=registry)
+ app_info.labels(version=SYSTEM_VERSION, database_type=DATABASE_TYPE).set(1)
+
+ app_health = Gauge('momo_app_health', '應用程式健康狀態', registry=registry)
+ db_status = Gauge('momo_database_up', '資料庫連線狀態', registry=registry)
+
+ try:
+ db = DatabaseManager()
+ with db.engine.connect() as conn:
+ conn.execute(text("SELECT 1"))
+ db_status.set(1)
+ app_health.set(1)
+ except Exception:
+ db_status.set(0)
+ app_health.set(0)
+
+ try:
+ db = DatabaseManager()
+ session = db.get_session()
+
+ product_count = Gauge('momo_products_total', '商品總數', registry=registry)
+ product_count.set(session.query(Product).count())
+
+ price_record_count = Gauge('momo_price_records_total', '價格記錄總數', registry=registry)
+ price_record_count.set(session.query(PriceRecord).count())
+
+ from database.realtime_sales_models import RealtimeSalesMonthly
+ sales_count = Gauge('momo_sales_records_total', '業績資料總數', registry=registry)
+ sales_count.set(session.query(RealtimeSalesMonthly).count())
+
+ session.close()
+ except Exception as e:
+ sys_log.warning(f"[Metrics] 無法取得資料庫統計: {e}")
+
+ return Response(generate_latest(registry), mimetype=CONTENT_TYPE_LATEST)
+
+ except ImportError:
+ metrics_text = f"""# HELP momo_app_health 應用程式健康狀態
+# TYPE momo_app_health gauge
+momo_app_health 1
+# HELP momo_app_info 應用程式資訊
+# TYPE momo_app_info gauge
+momo_app_info{{version="{SYSTEM_VERSION}",database_type="{DATABASE_TYPE}"}} 1
+"""
+ return Response(metrics_text, mimetype='text/plain; charset=utf-8')
+ except Exception as e:
+ sys_log.error(f"[Metrics] 指標生成錯誤: {e}")
+ return Response(f"# Error: {e}\n", mimetype='text/plain; charset=utf-8'), 500
+
+
+@system_public_bp.route('/settings')
+def settings():
+ """分類設定頁面"""
+ categories = load_categories()
+ return render_template('settings.html',
+ categories=categories,
+ public_url=public_url,
+ system_version=SYSTEM_VERSION)
+
+
+@system_public_bp.route('/system_settings')
+def system_settings_page():
+ """系統設定與匯入頁面"""
+ return render_template('system_settings.html', system_version=SYSTEM_VERSION)
+
+
+@system_public_bp.route('/logs')
+def show_logs():
+ return render_template('logs.html')
+
+
+@system_public_bp.route('/api/logs')
+def get_logs_api():
+ if os.path.exists(LOG_FILE_PATH):
+ try:
+ with open(LOG_FILE_PATH, 'r', encoding='utf-8') as f:
+ return jsonify({"logs": "".join(f.readlines()[-60:])})
+ except Exception as e:
+ sys_log.error(f"[Web] [Logs] ❌ 日誌 API 讀取異常 | Error: {e}")
+ return jsonify({"logs": "讀取日誌異常"})
+ return jsonify({"logs": "等待系統啟動中..."})
+
+
+@system_public_bp.route('/api/backup', methods=['POST'])
+@login_required
+def trigger_backup():
+ """API: 觸發系統完整備份"""
+ try:
+ sys_log.info("[System] [Backup] 💾 開始執行系統完整備份...")
+ backup_dir = os.path.join(BASE_DIR, 'backups')
+ if not os.path.exists(backup_dir):
+ os.makedirs(backup_dir)
+
+ timestamp = datetime.now(TAIPEI_TZ).strftime('%Y%m%d_%H%M')
+ zip_filename = f"momo_system_backup_{SYSTEM_VERSION}_{timestamp}.zip"
+ zip_filepath = os.path.join(backup_dir, zip_filename)
+
+ with zipfile.ZipFile(zip_filepath, 'w', zipfile.ZIP_DEFLATED) as zipf:
+ for root, dirs, files in os.walk(BASE_DIR):
+ dirs[:] = [d for d in dirs if d not in ['backups', '__pycache__', 'venv', '.git', '.idea', '.vscode', 'node_modules']]
+
+ for file in files:
+ if file == zip_filename:
+ continue
+ if file.endswith('.pyc') or file.endswith('.DS_Store'):
+ continue
+
+ file_path = os.path.join(root, file)
+ arcname = os.path.relpath(file_path, BASE_DIR)
+ zipf.write(file_path, arcname)
+
+ sys_log.info(f"[System] [Backup] ✅ 系統備份完成 | File: {zip_filename}")
+
+ download_url = url_for('system_public.download_backup', filename=zip_filename)
+
+ return jsonify({
+ "status": "success",
+ "message": f"備份成功!\n檔案已儲存為: {zip_filename}\n即將開始下載...",
+ "download_url": download_url
+ })
+ except Exception as e:
+ sys_log.error(f"[System] [Backup] ❌ 備份失敗 | Error: {e}")
+ return jsonify({"status": "error", "message": str(e)}), 500
+
+
+@system_public_bp.route('/api/backup/download/')
+@login_required
+def download_backup(filename):
+ """API: 下載備份檔案(已加入路徑遍歷防護)"""
+ try:
+ backup_dir = os.path.join(BASE_DIR, 'backups')
+ safe_path = safe_join(backup_dir, filename)
+
+ if not safe_path.exists():
+ sys_log.warning(f"[Security] 備份檔案不存在 | File: {filename}")
+ return jsonify({'error': '檔案不存在'}), 404
+
+ if not safe_path.is_file():
+ sys_log.warning(f"[Security] 嘗試下載非檔案路徑 | Path: {filename}")
+ return jsonify({'error': '非法路徑'}), 400
+
+ return send_from_directory(backup_dir, safe_path.name, as_attachment=True)
+
+ except ValueError as e:
+ sys_log.error(f"[Security] 路徑遍歷攻擊嘗試被阻擋 | Filename: {filename} | Error: {e}")
+ return jsonify({'error': '非法路徑'}), 400
+ except Exception as e:
+ sys_log.error(f"[System] 下載備份失敗 | Error: {e}")
+ return jsonify({'error': '下載失敗'}), 500