refactor(routes): 遷移公開系統與 ABC 路由

ADR-017 Phase 3f-1 system sprint;新增無 prefix system_public_bp,保留公開 URL 與 backup CSRF;ABC detail 併入 sales_bp。
This commit is contained in:
OoO
2026-04-29 21:22:29 +08:00
parent 567f138b2d
commit d51d8031f5
4 changed files with 383 additions and 620 deletions

623
app.py
View File

@@ -78,39 +78,8 @@ sys_log = SystemLogger("Web_Server").get_logger()
for _warn in validate_critical_config():
sys_log.warning(_warn)
# 🚩 V-Opt: 全域資料快取 (用於加速業績分析)
_SALES_DF_CACHE = {} # 已棄用,保留相容性
_SALES_PROCESSED_CACHE = {} # V-Opt: 處理後資料快取
_SALES_CACHE_MAX_ENTRIES = 10 # V-Opt (2026-01-23): 快取最大條目數
_SALES_CACHE_TTL = 600 # V-Opt (2026-01-23): 快取有效期 10 分鐘
def _cleanup_sales_cache():
"""清理過期和過多的快取條目"""
global _SALES_PROCESSED_CACHE
current_time = time.time()
# 1. 清理過期條目
expired_keys = [
k for k, v in _SALES_PROCESSED_CACHE.items()
if v.get('time') and current_time - v['time'] > _SALES_CACHE_TTL
]
for k in expired_keys:
del _SALES_PROCESSED_CACHE[k]
# 2. 如果仍超過限制,刪除最舊的條目
if len(_SALES_PROCESSED_CACHE) > _SALES_CACHE_MAX_ENTRIES:
sorted_items = sorted(
[(k, v.get('time', 0)) for k, v in _SALES_PROCESSED_CACHE.items()],
key=lambda x: x[1]
)
# 保留最新的 _SALES_CACHE_MAX_ENTRIES 條
keys_to_delete = [k for k, _ in sorted_items[:-_SALES_CACHE_MAX_ENTRIES]]
for k in keys_to_delete:
del _SALES_PROCESSED_CACHE[k]
if expired_keys or len(_SALES_PROCESSED_CACHE) > _SALES_CACHE_MAX_ENTRIES - 2:
sys_log.debug(f"[Cache] 清理快取: 移除 {len(expired_keys)} 條過期, 剩餘 {len(_SALES_PROCESSED_CACHE)}")
# 🚩 V-New: 商品看板資料快取 (用於加速首頁載入)
_DASHBOARD_DATA_CACHE = {
@@ -317,6 +286,10 @@ app.register_blueprint(system_bp)
csrf.exempt(system_bp) # n8n API 需要豁免 CSRF
sys_log.info("[Blueprint] ✅ 系統管理 Blueprint 已註冊 (CSRF 已豁免)")
from routes.system_public_routes import system_public_bp
app.register_blueprint(system_public_bp)
sys_log.info("[Blueprint] ✅ 公開系統頁面 Blueprint 已註冊")
from routes.category_routes import category_bp
app.register_blueprint(category_bp)
sys_log.info("[Blueprint] ✅ 分類 CRUD Blueprint 已註冊")
@@ -650,516 +623,20 @@ def refresh_session():
if session.get('logged_in'):
session.modified = True # 標記 Session 已修改,觸發 Cookie 更新
@app.route('/health')
def health_check():
"""健康檢查端點 - 供 Nginx 和 Docker healthcheck 使用"""
try:
# 簡單檢查資料庫連線
from config import DATABASE_TYPE
return jsonify({
'status': 'healthy',
'database': DATABASE_TYPE,
'version': SYSTEM_VERSION
}), 200
except Exception as e:
return jsonify({
'status': 'unhealthy',
'error': str(e)
}), 500
@app.route('/metrics')
def prometheus_metrics():
"""Prometheus 指標端點 - 供 Prometheus 抓取監控資料"""
try:
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST, Counter, Gauge, CollectorRegistry
from config import DATABASE_TYPE
# 建立獨立的 registry 以避免重複註冊
registry = CollectorRegistry()
# 應用程式資訊
app_info = Gauge('momo_app_info', '應用程式資訊', ['version', 'database_type'], registry=registry)
app_info.labels(version=SYSTEM_VERSION, database_type=DATABASE_TYPE).set(1)
# 應用程式健康狀態 (1=健康, 0=不健康)
app_health = Gauge('momo_app_health', '應用程式健康狀態', registry=registry)
# 資料庫連線狀態
db_status = Gauge('momo_database_up', '資料庫連線狀態', registry=registry)
try:
db = DatabaseManager()
with db.engine.connect() as conn:
conn.execute(text("SELECT 1"))
db_status.set(1)
app_health.set(1)
except Exception:
db_status.set(0)
app_health.set(0)
# 資料庫記錄數
try:
db = DatabaseManager()
session = db.get_session()
# 商品數量
product_count = Gauge('momo_products_total', '商品總數', registry=registry)
product_count.set(session.query(Product).count())
# 價格記錄數量
price_record_count = Gauge('momo_price_records_total', '價格記錄總數', registry=registry)
price_record_count.set(session.query(PriceRecord).count())
# 業績資料筆數
from database.realtime_sales_models import RealtimeSalesMonthly
sales_count = Gauge('momo_sales_records_total', '業績資料總數', registry=registry)
sales_count.set(session.query(RealtimeSalesMonthly).count())
session.close()
except Exception as e:
sys_log.warning(f"[Metrics] 無法取得資料庫統計: {e}")
# 返回 Prometheus 格式
from flask import Response
return Response(generate_latest(registry), mimetype=CONTENT_TYPE_LATEST)
except ImportError:
# prometheus_client 未安裝時的備用方案
metrics_text = """# HELP momo_app_health 應用程式健康狀態
# TYPE momo_app_health gauge
momo_app_health 1
# HELP momo_app_info 應用程式資訊
# TYPE momo_app_info gauge
momo_app_info{version="9.4",database_type="postgresql"} 1
"""
from flask import Response
return Response(metrics_text, mimetype='text/plain; charset=utf-8')
except Exception as e:
sys_log.error(f"[Metrics] 指標生成錯誤: {e}")
from flask import Response
return Response(f"# Error: {e}\n", mimetype='text/plain; charset=utf-8'), 500
@app.route('/settings')
def settings():
"""分類設定頁面"""
categories = load_categories()
return render_template('settings.html',
categories=categories,
public_url=public_url,
system_version=SYSTEM_VERSION)
@app.route('/system_settings')
def system_settings_page():
"""系統設定與匯入頁面"""
return render_template('system_settings.html', system_version=SYSTEM_VERSION)
@app.route('/abc_analysis/detail')
def abc_analysis_detail():
"""ABC 分析詳細報表頁面"""
try:
target_class = request.args.get('class', 'A') # 預設 A 類
table_name = 'realtime_sales_monthly'
# 1. 生成與主頁面一致的 cache_key
data_range_months = int(request.args.get('data_range', '0') or '0')
start_date = request.args.get('start_date', '')
end_date = request.args.get('end_date', '')
if start_date or end_date:
cache_key = f"{table_name}_custom_{start_date}_{end_date}"
else:
cache_key = f"{table_name}_{data_range_months}m"
# 2. 使用共用篩選函式取得資料
target_df, cols_map, err = _get_filtered_sales_data(cache_key)
# V-Fix: 如果 cache_key 不存在,嘗試後補使用 table_name 固定鍵值
if err and table_name in _SALES_PROCESSED_CACHE:
target_df, cols_map, err = _get_filtered_sales_data(table_name)
if err:
# V-Fix: 如果自動重載也失敗,則顯示稍後再試,並引導回主頁面
return f'''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>數據加載中 - WOOO TECH</title>
<style>
body {{ font-family: sans-serif; display: flex; align-items: center; justify-content: center; height: 100vh; margin: 0; background: #f8f9fa; }}
.card {{ background: white; padding: 2rem; border-radius: 12px; box-shadow: 0 4px 20px rgba(0,0,0,0.08); text-align: center; }}
.spinner {{ border: 3px solid #f3f3f3; border-top: 3px solid #1e3c72; border-radius: 50%; width: 30px; height: 30px; animation: spin 1s linear infinite; margin: 0 auto 1rem; }}
@keyframes spin {{ 0% {{ transform: rotate(0deg); }} 100% {{ transform: rotate(360deg); }} }}
</style>
</head>
<body>
<div class="card">
<div class="spinner"></div>
<h3>數據準備中</h3>
<p>正在自動重新加載數據,請稍後...</p>
<script>
// 1.5 秒後嘗試重載當前頁面
setTimeout(function() {{
window.location.reload();
}}, 1500);
// 若重試 3 次仍失敗,引導回主頁
let retryCount = parseInt(sessionStorage.getItem('abc_retry') || '0');
if (retryCount > 3) {{
sessionStorage.removeItem('abc_retry');
alert('數據載入過久,請先在業績分析主頁重新整理。');
window.location.href = '/sales_analysis';
}} else {{
sessionStorage.setItem('abc_retry', retryCount + 1);
}}
</script>
</div>
</body>
</html>
''', 200
# 恢復欄位變數
col_name = cols_map.get('name')
col_amount = cols_map.get('amount')
col_qty = cols_map.get('qty')
col_category = cols_map.get('category')
col_brand = cols_map.get('brand')
col_vendor = cols_map.get('vendor')
col_price = cols_map.get('price')
col_cost = cols_map.get('cost')
col_profit = cols_map.get('profit')
col_date = cols_map.get('date')
col_pid = cols_map.get('pid')
# 3. 執行 ABC 分類
items = []
total_revenue = 0
if col_amount and not target_df.empty:
# V-Fix: 先針對商品進行聚合,確保 ABC 分析是基於「商品總銷量」而非「單筆訂單」
agg_rules = {col_amount: 'sum'}
if col_qty: agg_rules[col_qty] = 'sum'
if col_cost: agg_rules[col_cost] = 'sum'
if col_profit: agg_rules[col_profit] = 'sum'
if col_category: agg_rules[col_category] = 'first'
if col_vendor: agg_rules[col_vendor] = 'first'
if col_brand: agg_rules[col_brand] = 'first' # V-New: 加入品牌
if col_pid: agg_rules[col_pid] = 'first' # V-New: 聚合商品ID
if col_date: agg_rules['_month_str'] = lambda x: ', '.join(sorted(x.dropna().unique()))
df_agg = target_df.groupby(col_name).agg(agg_rules).reset_index()
# 重新計算聚合後的毛利率
if col_profit:
df_agg['calculated_margin_rate'] = (df_agg[col_profit] / df_agg[col_amount]) * 100
elif col_cost:
df_agg['calculated_margin_rate'] = ((df_agg[col_amount] - df_agg[col_cost]) / df_agg[col_amount]) * 100
else:
df_agg['calculated_margin_rate'] = 0.0
df_agg['calculated_margin_rate'] = df_agg['calculated_margin_rate'].replace([np.inf, -np.inf, np.nan], 0)
# 執行 ABC 排序與計算
df_agg = df_agg.sort_values(by=col_amount, ascending=False)
df_agg['cumulative_revenue'] = df_agg[col_amount].cumsum()
total_revenue = df_agg[col_amount].sum()
df_agg['cumulative_pct'] = (df_agg['cumulative_revenue'] / total_revenue) * 100
conditions = [(df_agg['cumulative_pct'] <= 80), (df_agg['cumulative_pct'] <= 95)]
choices = ['A', 'B']
df_agg['ABC_Class'] = np.select(conditions, choices, default='C')
# 4. 篩選特定類別
class_df = df_agg[df_agg['ABC_Class'] == target_class].copy()
# V-New: 計算平均單價與庫存建議
if col_qty:
class_df['avg_unit_price'] = (class_df[col_amount] / class_df[col_qty]).fillna(0)
# V-New: 處理動態補貨係數
custom_factor = request.args.get('factor')
current_factor = 0.0
if custom_factor:
try:
current_factor = float(custom_factor)
except:
current_factor = 1.5 if target_class == 'A' else (1.2 if target_class == 'B' else 0.0)
else:
current_factor = 1.5 if target_class == 'A' else (1.2 if target_class == 'B' else 0.0)
class_df['suggested_restock'] = (class_df[col_qty] * current_factor).astype(int)
items = class_df.to_dict('records')
# 準備標題與描述
class_info = {
'A': {'title': 'A 類 - 核心商品', 'desc': '營收佔比前 80% 的主力商品,建議重點備貨與監控。', 'color': 'danger'},
'B': {'title': 'B 類 - 次要商品', 'desc': '營收佔比 80%~95% 的輔助商品,維持正常庫存。', 'color': 'warning'},
'C': {'title': 'C 類 - 長尾商品', 'desc': '營收佔比最後 5% 的長尾商品,建議評估清倉或縮減 SKU。', 'color': 'success'}
}
info = class_info.get(target_class, {'title': f'{target_class}', 'desc': '', 'color': 'secondary'})
# 計算 DataTables 預設排序欄位 (銷售金額) 的索引
# 欄位順序: Rank(0), [PID], Name, [Brand], [Vendor], [Cat], [Margin], [AvgPrice, Qty, Restock], Amount
sort_col_index = 1 # Rank
if col_pid: sort_col_index += 1
sort_col_index += 1 # Name
if col_brand: sort_col_index += 1
if col_vendor: sort_col_index += 1
if col_category: sort_col_index += 1
if col_cost or col_profit: sort_col_index += 1
if col_qty: sort_col_index += 3
# 此時 sort_col_index 即為 Amount 欄位的索引
return render_template('abc_analysis_detail.html',
items=items,
info=info,
target_class=target_class,
current_factor=current_factor, # V-New: 傳遞當前係數
total_revenue=total_revenue,
sort_col_index=sort_col_index, # V-New: 傳遞排序欄位索引
cols={'name': col_name, 'amount': col_amount, 'qty': col_qty, 'cat': col_category,
'vendor': col_vendor, 'brand': col_brand, 'cost': col_cost, 'profit': col_profit, 'date': col_date, 'pid': col_pid},
# 傳遞當前查詢參數以供匯出連結使用
query_string=request.query_string.decode())
except Exception as e:
sys_log.error(f"ABC Detail Error: {e}")
return f"系統錯誤: {e}"
@app.route('/logs')
def show_logs():
return render_template('logs.html')
@app.route('/api/logs')
def get_logs_api():
if os.path.exists(LOG_FILE_PATH):
try:
with open(LOG_FILE_PATH, 'r', encoding='utf-8') as f:
return jsonify({"logs": "".join(f.readlines()[-60:])})
except Exception as e:
sys_log.error(f"[Web] [Logs] ❌ 日誌 API 讀取異常 | Error: {e}")
return jsonify({"logs": "讀取日誌異常"})
return jsonify({"logs": "等待系統啟動中..."})
@app.route('/api/backup', methods=['POST'])
@login_required
def trigger_backup():
"""API: 觸發系統完整備份"""
# Note: [功能] 尚未實作「系統還原」功能 (Restore),需評估安全性後加入
try:
sys_log.info("[System] [Backup] 💾 開始執行系統完整備份...")
backup_dir = os.path.join(BASE_DIR, 'backups')
if not os.path.exists(backup_dir):
os.makedirs(backup_dir)
timestamp = datetime.now(TAIPEI_TZ).strftime('%Y%m%d_%H%M')
zip_filename = f"momo_system_backup_{SYSTEM_VERSION}_{timestamp}.zip"
zip_filepath = os.path.join(backup_dir, zip_filename)
with zipfile.ZipFile(zip_filepath, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(BASE_DIR):
# 排除不必要的目錄
dirs[:] = [d for d in dirs if d not in ['backups', '__pycache__', 'venv', '.git', '.idea', '.vscode', 'node_modules']]
for file in files:
if file == zip_filename: continue # 跳過正在寫入的檔案
if file.endswith('.pyc') or file.endswith('.DS_Store'): continue
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, BASE_DIR)
zipf.write(file_path, arcname)
sys_log.info(f"[System] [Backup] ✅ 系統備份完成 | File: {zip_filename}")
# V-New: 回傳下載連結
download_url = url_for('download_backup', filename=zip_filename)
return jsonify({
"status": "success",
"message": f"備份成功!\n檔案已儲存為: {zip_filename}\n即將開始下載...",
"download_url": download_url
})
except Exception as e:
sys_log.error(f"[System] [Backup] ❌ 備份失敗 | Error: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/api/backup/download/<path:filename>')
@login_required
def download_backup(filename):
"""
API: 下載備份檔案(已加入路徑遍歷防護)
"""
try:
backup_dir = os.path.join(BASE_DIR, 'backups')
# 使用 safe_join 驗證路徑,防止路徑遍歷攻擊
safe_path = safe_join(backup_dir, filename)
# 確保檔案存在
if not safe_path.exists():
sys_log.warning(f"[Security] 備份檔案不存在 | File: {filename}")
return jsonify({'error': '檔案不存在'}), 404
# 確保是檔案而非目錄
if not safe_path.is_file():
sys_log.warning(f"[Security] 嘗試下載非檔案路徑 | Path: {filename}")
return jsonify({'error': '非法路徑'}), 400
return send_from_directory(backup_dir, safe_path.name, as_attachment=True)
except ValueError as e:
# safe_join 偵測到路徑遍歷嘗試
sys_log.error(f"[Security] 路徑遍歷攻擊嘗試被阻擋 | Filename: {filename} | Error: {e}")
return jsonify({'error': '非法路徑'}), 400
except Exception as e:
sys_log.error(f"[System] 下載備份失敗 | Error: {e}")
return jsonify({'error': '下載失敗'}), 500
# ================= 📊 V-New: 業績分析報表 =================
def _get_filtered_sales_data(cache_key):
"""
🚩 共用函式:從快取讀取資料並根據 request.args 進行篩選
回傳: (target_df, cols_map, error_message)
參數: cache_key - 快取鍵值 (例如: "realtime_sales_monthly_3m")
"""
db = DatabaseManager()
# 1. 檢查資料表與快取
df = None
cols_map = {}
if cache_key in _SALES_PROCESSED_CACHE:
cache_data = _SALES_PROCESSED_CACHE[cache_key]
df = cache_data['df']
cols_map = cache_data['cols']
else:
# 快取不存在時,直接回傳錯誤讓呼叫端顯示 spinner 導回 sales_analysis
# 不在此發起全表 DB 查詢748k 行會 hang Gunicorn worker
sys_log.warning(f"[Sales Analysis] ⚠️ 快取不存在 ({cache_key}),回傳錯誤讓 UI 導回 sales_analysis")
return None, {}, f"快取未就緒,請先從業績分析主頁載入資料 (cache_key={cache_key})"
if False: # 保留舊冷快取重載邏輯(已停用,避免全表掃描 hang
sys_log.warning(f"[Sales Analysis] ⚠️ 快取不存在 ({cache_key}),試圖重新從資料庫載入...")
try:
# V-Fix: 從 cache_key 提取 table_name
# 格式: realtime_sales_monthly_3m 或 realtime_sales_monthly_custom_2025-01-01_2025-01-31
if "_custom_" in cache_key:
table_name = cache_key.split('_custom_')[0] # realtime_sales_monthly
else:
# 移除最後的 _Xm 部分
parts = cache_key.rsplit('_', 1)
table_name = parts[0] if len(parts) > 1 else 'realtime_sales_monthly'
# 判斷是自訂區間還是標配區間
if "_custom_" in cache_key:
# 格式: realtime_sales_monthly_custom_2025-01-01_2025-01-31
parts = cache_key.split('_custom_')
dates = parts[1].split('_')
start_d, end_d = dates[0], dates[1]
# 呼叫資料庫讀取 (不傳入 view, 會自動處理欄位映射)
result_df, result_cols = db.get_sales_data(table_name=table_name, start_date=start_d, end_date=end_d)
else:
# 格式: realtime_sales_monthly_1mmonths=0 表示全時段但上限 12 個月避免全表掃描 hang
months = int(cache_key.split('_')[-1].replace('m', '') or '12')
if months == 0:
months = 12
result_df, result_cols = db.get_sales_data(table_name=table_name, months=months)
if result_df is not None and not result_df.empty:
# V-Fix (2026-01-23): 補回所有日期維度欄位供後續篩選 (_dow, _hour, _month_str)
if '日期' in result_df.columns:
# 先轉換為 datetime
result_df['_parsed_date'] = pd.to_datetime(result_df['日期'], errors='coerce')
result_df['_month_str'] = result_df['_parsed_date'].dt.strftime('%Y-%m')
result_df['_dow'] = result_df['_parsed_date'].dt.dayofweek
# 小時需要從「時間」欄位提取
if '時間' in result_df.columns:
result_df['_hour'] = pd.to_datetime(result_df['時間'], format='%H:%M:%S', errors='coerce').dt.hour
else:
result_df['_hour'] = 0 # 如果沒有時間欄位,預設為 0
# 清理臨時欄位
result_df.drop(columns=['_parsed_date'], inplace=True, errors='ignore')
# 自動存入快取
_SALES_PROCESSED_CACHE[cache_key] = {'df': result_df, 'cols': result_cols, 'time': time.time()}
df = result_df
cols_map = result_cols
sys_log.info(f"[Sales Analysis] ✅ 快取成功自動重載 | 筆數: {len(df)}")
else:
return None, None, "資料庫無可用資料,請確認匯入狀態"
except Exception as ex:
sys_log.error(f"[Sales Analysis] 🚨 自動重載失敗: {ex}")
return None, None, f"快取失效且無法重載: {ex}"
# 恢復欄位變數
col_name = cols_map.get('name')
col_category = cols_map.get('category')
col_brand = cols_map.get('brand')
col_vendor = cols_map.get('vendor')
col_activity = cols_map.get('activity')
col_payment = cols_map.get('payment')
col_price = cols_map.get('price')
col_date = cols_map.get('date')
col_return_qty = cols_map.get('return_qty') # V-New: 取得退貨欄位
# 2. 取得篩選參數
selected_category = request.args.get('category', 'all')
selected_brand = request.args.get('brand', 'all')
selected_vendor = request.args.get('vendor', 'all')
selected_activity = request.args.get('activity', 'all')
selected_payment = request.args.get('payment', 'all')
selected_dow = request.args.get('dow', 'all')
selected_hour = request.args.get('hour', 'all')
selected_month = request.args.get('month', 'all')
keyword = request.args.get('keyword', '').strip()
min_price = request.args.get('min_price', '')
max_price = request.args.get('max_price', '')
min_margin = request.args.get('min_margin', '')
max_margin = request.args.get('max_margin', '')
# 3. 執行篩選
target_df = df
# Top N 分類處理 (用於 '其他' 篩選)
TOP_N_CATS = 12
top_cats_names = []
if col_category:
# 注意:這裡為了效能,簡單重算一次 Top N或可考慮也快取起來
cat_group_all = df.groupby(col_category)[cols_map.get('amount')].sum().sort_values(ascending=False)
if len(cat_group_all) > TOP_N_CATS:
top_cats_names = cat_group_all.head(TOP_N_CATS).index.tolist()
if selected_category != 'all' and col_category:
if selected_category == '其他' and top_cats_names:
target_df = target_df[~target_df[col_category].isin(top_cats_names)]
else:
target_df = target_df[target_df[col_category] == selected_category]
if selected_brand != 'all' and col_brand: target_df = target_df[target_df[col_brand] == selected_brand]
if selected_vendor != 'all' and col_vendor: target_df = target_df[target_df[col_vendor] == selected_vendor]
if selected_activity != 'all' and col_activity: target_df = target_df[target_df[col_activity] == selected_activity]
if selected_payment != 'all' and col_payment: target_df = target_df[target_df[col_payment] == selected_payment]
if selected_dow != 'all' and col_date: target_df = target_df[target_df['_dow'] == int(selected_dow)]
if selected_hour != 'all' and col_date: target_df = target_df[target_df['_hour'] == int(selected_hour)]
if selected_month != 'all' and col_date: target_df = target_df[target_df['_month_str'] == selected_month]
if keyword: target_df = target_df[target_df[col_name].astype(str).str.contains(keyword, case=False, na=False)]
if col_price:
if min_price: target_df = target_df[target_df[col_price] >= float(min_price)]
if max_price: target_df = target_df[target_df[col_price] <= float(max_price)]
if min_margin: target_df = target_df[target_df['calculated_margin_rate'] >= float(min_margin)]
if max_margin: target_df = target_df[target_df['calculated_margin_rate'] <= float(max_margin)]
return target_df, cols_map, None
# V-Opt: API 層級快取 (減少重複查詢)
@@ -1451,98 +928,6 @@ def prepare_category_summary(df, date_str=None, is_month_view=False, month_start
return category_df.to_dict('records')
# V-New 2026-01-15: 行銷活動業績聚合函數
def prepare_marketing_summary(df, selected_date=None, is_month_view=False, month_start=None, month_end=None, sort_by='revenue'):
"""
準備行銷活動業績貢獻數據
支援單日模式和月度模式,並可指定排序維度 (revenue, qty, profit)
"""
# 決定使用的數據範圍
if is_month_view and month_start is not None and month_end is not None:
target_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)]
elif selected_date is not None:
target_df = df[df['snapshot_date'] == selected_date]
else:
target_df = df
if target_df.empty:
return {'coupon': [], 'discount': [], 'bonus': [], 'click': []}
cols = target_df.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量', 'Qty'])
col_profit = find_col(cols, ['毛利', 'Profit', '利潤'])
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
if not col_amount:
return {'coupon': [], 'discount': [], 'bonus': [], 'click': []}
# 定義四種行銷活動欄位
marketing_cols = {
'coupon': '折價券活動名稱', # 折價券活動
'discount': '折扣活動名稱', # 折扣活動
'bonus': '滿額再折扣活動名稱', # 滿額再折扣
'click': '點我再折扣' # 點我再折扣
}
result = {}
# 確保 sort_by 欄位存在,否則退回 revenue
actual_sort_key = sort_by if sort_by in ['revenue', 'qty', 'profit'] else 'revenue'
for key, col_name in marketing_cols.items():
if col_name not in cols:
result[key] = []
continue
# 篩選有該行銷活動的記錄
activity_df = target_df[
(target_df[col_name].notna()) &
(target_df[col_name] != '') &
(target_df[col_name] != '0') &
(target_df[col_name] != 0)
]
if activity_df.empty:
result[key] = []
continue
# 聚合計算
agg_args = {
'revenue': (col_amount, 'sum'),
'order_count': (col_amount, 'count')
}
if col_qty: agg_args['qty'] = (col_qty, 'sum')
if col_profit: agg_args['profit'] = (col_profit, 'sum')
grouped = activity_df.groupby(col_name).agg(**agg_args).reset_index()
# 若需要手動計算毛利 (金額 - 成本)
if 'profit' not in agg_args and col_cost:
cost_agg = activity_df.groupby(col_name)[col_cost].sum().reset_index()
grouped = grouped.merge(cost_agg, on=col_name)
grouped['profit'] = grouped['revenue'] - grouped[col_cost]
grouped = grouped.rename(columns={col_name: 'name'})
# 動態排序
sort_col = actual_sort_key if actual_sort_key in grouped.columns else 'revenue'
grouped = grouped.sort_values(sort_col, ascending=False).head(15)
# 轉為字典列表
records = []
for _, row in grouped.iterrows():
record = {
'name': str(row['name'])[:50],
'revenue': float(row['revenue']),
'order_count': int(row['order_count'])
}
if 'qty' in row: record['qty'] = float(row['qty'])
if 'profit' in row: record['profit'] = float(row['profit'])
records.append(record)
result[key] = records
return result
def get_taiwan_holiday(date):

View File

@@ -24,7 +24,7 @@ CONFIG_PATH = os.path.join(os.path.dirname(__file__), 'data', 'crawler_config.js
@crawler_bp.route('/crawler_management')
def crawler_management_page():
"""爬蟲管理頁面 - 重定向到整合後的 settings 頁面"""
return redirect(url_for('settings'))
return redirect(url_for('system_public.settings'))
@crawler_bp.route('/api/crawlers', methods=['GET'])
def get_crawlers():

View File

@@ -1413,6 +1413,180 @@ def growth_analysis():
return f"系統錯誤: {e}"
@sales_bp.route('/abc_analysis/detail')
def abc_analysis_detail():
"""ABC 分析詳細報表頁面"""
try:
target_class = request.args.get('class', 'A') # 預設 A 類
table_name = 'realtime_sales_monthly'
# 1. 生成與主頁面一致的 cache_key
data_range_months = int(request.args.get('data_range', '0') or '0')
start_date = request.args.get('start_date', '')
end_date = request.args.get('end_date', '')
if start_date or end_date:
cache_key = f"{table_name}_custom_{start_date}_{end_date}"
else:
cache_key = f"{table_name}_{data_range_months}m"
# 2. 使用共用篩選函式取得資料
target_df, cols_map, err = _get_filtered_sales_data(cache_key)
# V-Fix: 如果 cache_key 不存在,嘗試後補使用 table_name 固定鍵值
if err and table_name in _SALES_PROCESSED_CACHE:
target_df, cols_map, err = _get_filtered_sales_data(table_name)
if err:
# V-Fix: 如果自動重載也失敗,則顯示稍後再試,並引導回主頁面
return f'''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>數據加載中 - WOOO TECH</title>
<style>
body {{ font-family: sans-serif; display: flex; align-items: center; justify-content: center; height: 100vh; margin: 0; background: #f8f9fa; }}
.card {{ background: white; padding: 2rem; border-radius: 12px; box-shadow: 0 4px 20px rgba(0,0,0,0.08); text-align: center; }}
.spinner {{ border: 3px solid #f3f3f3; border-top: 3px solid #1e3c72; border-radius: 50%; width: 30px; height: 30px; animation: spin 1s linear infinite; margin: 0 auto 1rem; }}
@keyframes spin {{ 0% {{ transform: rotate(0deg); }} 100% {{ transform: rotate(360deg); }} }}
</style>
</head>
<body>
<div class="card">
<div class="spinner"></div>
<h3>數據準備中</h3>
<p>正在自動重新加載數據,請稍後...</p>
<script>
// 1.5 秒後嘗試重載當前頁面
setTimeout(function() {{
window.location.reload();
}}, 1500);
// 若重試 3 次仍失敗,引導回主頁
let retryCount = parseInt(sessionStorage.getItem('abc_retry') || '0');
if (retryCount > 3) {{
sessionStorage.removeItem('abc_retry');
alert('數據載入過久,請先在業績分析主頁重新整理。');
window.location.href = '/sales_analysis';
}} else {{
sessionStorage.setItem('abc_retry', retryCount + 1);
}}
</script>
</div>
</body>
</html>
''', 200
# 恢復欄位變數
col_name = cols_map.get('name')
col_amount = cols_map.get('amount')
col_qty = cols_map.get('qty')
col_category = cols_map.get('category')
col_brand = cols_map.get('brand')
col_vendor = cols_map.get('vendor')
col_price = cols_map.get('price')
col_cost = cols_map.get('cost')
col_profit = cols_map.get('profit')
col_date = cols_map.get('date')
col_pid = cols_map.get('pid')
# 3. 執行 ABC 分類
items = []
total_revenue = 0
if col_amount and not target_df.empty:
# V-Fix: 先針對商品進行聚合,確保 ABC 分析是基於「商品總銷量」而非「單筆訂單」
agg_rules = {col_amount: 'sum'}
if col_qty: agg_rules[col_qty] = 'sum'
if col_cost: agg_rules[col_cost] = 'sum'
if col_profit: agg_rules[col_profit] = 'sum'
if col_category: agg_rules[col_category] = 'first'
if col_vendor: agg_rules[col_vendor] = 'first'
if col_brand: agg_rules[col_brand] = 'first' # V-New: 加入品牌
if col_pid: agg_rules[col_pid] = 'first' # V-New: 聚合商品ID
if col_date: agg_rules['_month_str'] = lambda x: ', '.join(sorted(x.dropna().unique()))
df_agg = target_df.groupby(col_name).agg(agg_rules).reset_index()
# 重新計算聚合後的毛利率
if col_profit:
df_agg['calculated_margin_rate'] = (df_agg[col_profit] / df_agg[col_amount]) * 100
elif col_cost:
df_agg['calculated_margin_rate'] = ((df_agg[col_amount] - df_agg[col_cost]) / df_agg[col_amount]) * 100
else:
df_agg['calculated_margin_rate'] = 0.0
df_agg['calculated_margin_rate'] = df_agg['calculated_margin_rate'].replace([np.inf, -np.inf, np.nan], 0)
# 執行 ABC 排序與計算
df_agg = df_agg.sort_values(by=col_amount, ascending=False)
df_agg['cumulative_revenue'] = df_agg[col_amount].cumsum()
total_revenue = df_agg[col_amount].sum()
df_agg['cumulative_pct'] = (df_agg['cumulative_revenue'] / total_revenue) * 100
conditions = [(df_agg['cumulative_pct'] <= 80), (df_agg['cumulative_pct'] <= 95)]
choices = ['A', 'B']
df_agg['ABC_Class'] = np.select(conditions, choices, default='C')
# 4. 篩選特定類別
class_df = df_agg[df_agg['ABC_Class'] == target_class].copy()
# V-New: 計算平均單價與庫存建議
if col_qty:
class_df['avg_unit_price'] = (class_df[col_amount] / class_df[col_qty]).fillna(0)
# V-New: 處理動態補貨係數
custom_factor = request.args.get('factor')
current_factor = 0.0
if custom_factor:
try:
current_factor = float(custom_factor)
except:
current_factor = 1.5 if target_class == 'A' else (1.2 if target_class == 'B' else 0.0)
else:
current_factor = 1.5 if target_class == 'A' else (1.2 if target_class == 'B' else 0.0)
class_df['suggested_restock'] = (class_df[col_qty] * current_factor).astype(int)
items = class_df.to_dict('records')
# 準備標題與描述
class_info = {
'A': {'title': 'A 類 - 核心商品', 'desc': '營收佔比前 80% 的主力商品,建議重點備貨與監控。', 'color': 'danger'},
'B': {'title': 'B 類 - 次要商品', 'desc': '營收佔比 80%~95% 的輔助商品,維持正常庫存。', 'color': 'warning'},
'C': {'title': 'C 類 - 長尾商品', 'desc': '營收佔比最後 5% 的長尾商品,建議評估清倉或縮減 SKU。', 'color': 'success'}
}
info = class_info.get(target_class, {'title': f'{target_class}', 'desc': '', 'color': 'secondary'})
# 計算 DataTables 預設排序欄位 (銷售金額) 的索引
# 欄位順序: Rank(0), [PID], Name, [Brand], [Vendor], [Cat], [Margin], [AvgPrice, Qty, Restock], Amount
sort_col_index = 1 # Rank
if col_pid: sort_col_index += 1
sort_col_index += 1 # Name
if col_brand: sort_col_index += 1
if col_vendor: sort_col_index += 1
if col_category: sort_col_index += 1
if col_cost or col_profit: sort_col_index += 1
if col_qty: sort_col_index += 3
# 此時 sort_col_index 即為 Amount 欄位的索引
return render_template('abc_analysis_detail.html',
items=items,
info=info,
target_class=target_class,
current_factor=current_factor, # V-New: 傳遞當前係數
total_revenue=total_revenue,
sort_col_index=sort_col_index, # V-New: 傳遞排序欄位索引
cols={'name': col_name, 'amount': col_amount, 'qty': col_qty, 'cat': col_category,
'vendor': col_vendor, 'brand': col_brand, 'cost': col_cost, 'profit': col_profit, 'date': col_date, 'pid': col_pid},
# 傳遞當前查詢參數以供匯出連結使用
query_string=request.query_string.decode())
except Exception as e:
sys_log.error(f"ABC Detail Error: {e}")
return f"系統錯誤: {e}"
# ==========================================
# API 路由
# ==========================================

View File

@@ -0,0 +1,204 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""公開系統頁面與健康檢查路由。
此 blueprint 無 url_prefix保留外部監控與既有前端使用的公開 URL。
"""
import os
import zipfile
from datetime import datetime, timezone, timedelta
from flask import Blueprint, Response, jsonify, render_template, send_from_directory, url_for
from sqlalchemy import text
from auth import login_required
from config import BASE_DIR, DATABASE_TYPE, SYSTEM_VERSION
from database.manager import DatabaseManager
from database.models import Product, PriceRecord
from services.json_storage import load_categories
from services.logger_manager import SystemLogger
from utils.security import safe_join
system_public_bp = Blueprint('system_public', __name__)
sys_log = SystemLogger("SystemPublicRoutes").get_logger()
TAIPEI_TZ = timezone(timedelta(hours=8))
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = os.getenv('PUBLIC_URL', '服務啟動中...')
@system_public_bp.route('/health')
def health_check():
"""健康檢查端點 - 供 Nginx 和 Docker healthcheck 使用"""
try:
return jsonify({
'status': 'healthy',
'database': DATABASE_TYPE,
'version': SYSTEM_VERSION
}), 200
except Exception as e:
return jsonify({
'status': 'unhealthy',
'error': str(e)
}), 500
@system_public_bp.route('/metrics')
def prometheus_metrics():
"""Prometheus 指標端點 - 供 Prometheus 抓取監控資料"""
try:
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST, Gauge, CollectorRegistry
registry = CollectorRegistry()
app_info = Gauge('momo_app_info', '應用程式資訊', ['version', 'database_type'], registry=registry)
app_info.labels(version=SYSTEM_VERSION, database_type=DATABASE_TYPE).set(1)
app_health = Gauge('momo_app_health', '應用程式健康狀態', registry=registry)
db_status = Gauge('momo_database_up', '資料庫連線狀態', registry=registry)
try:
db = DatabaseManager()
with db.engine.connect() as conn:
conn.execute(text("SELECT 1"))
db_status.set(1)
app_health.set(1)
except Exception:
db_status.set(0)
app_health.set(0)
try:
db = DatabaseManager()
session = db.get_session()
product_count = Gauge('momo_products_total', '商品總數', registry=registry)
product_count.set(session.query(Product).count())
price_record_count = Gauge('momo_price_records_total', '價格記錄總數', registry=registry)
price_record_count.set(session.query(PriceRecord).count())
from database.realtime_sales_models import RealtimeSalesMonthly
sales_count = Gauge('momo_sales_records_total', '業績資料總數', registry=registry)
sales_count.set(session.query(RealtimeSalesMonthly).count())
session.close()
except Exception as e:
sys_log.warning(f"[Metrics] 無法取得資料庫統計: {e}")
return Response(generate_latest(registry), mimetype=CONTENT_TYPE_LATEST)
except ImportError:
metrics_text = f"""# HELP momo_app_health 應用程式健康狀態
# TYPE momo_app_health gauge
momo_app_health 1
# HELP momo_app_info 應用程式資訊
# TYPE momo_app_info gauge
momo_app_info{{version="{SYSTEM_VERSION}",database_type="{DATABASE_TYPE}"}} 1
"""
return Response(metrics_text, mimetype='text/plain; charset=utf-8')
except Exception as e:
sys_log.error(f"[Metrics] 指標生成錯誤: {e}")
return Response(f"# Error: {e}\n", mimetype='text/plain; charset=utf-8'), 500
@system_public_bp.route('/settings')
def settings():
"""分類設定頁面"""
categories = load_categories()
return render_template('settings.html',
categories=categories,
public_url=public_url,
system_version=SYSTEM_VERSION)
@system_public_bp.route('/system_settings')
def system_settings_page():
"""系統設定與匯入頁面"""
return render_template('system_settings.html', system_version=SYSTEM_VERSION)
@system_public_bp.route('/logs')
def show_logs():
return render_template('logs.html')
@system_public_bp.route('/api/logs')
def get_logs_api():
if os.path.exists(LOG_FILE_PATH):
try:
with open(LOG_FILE_PATH, 'r', encoding='utf-8') as f:
return jsonify({"logs": "".join(f.readlines()[-60:])})
except Exception as e:
sys_log.error(f"[Web] [Logs] ❌ 日誌 API 讀取異常 | Error: {e}")
return jsonify({"logs": "讀取日誌異常"})
return jsonify({"logs": "等待系統啟動中..."})
@system_public_bp.route('/api/backup', methods=['POST'])
@login_required
def trigger_backup():
"""API: 觸發系統完整備份"""
try:
sys_log.info("[System] [Backup] 💾 開始執行系統完整備份...")
backup_dir = os.path.join(BASE_DIR, 'backups')
if not os.path.exists(backup_dir):
os.makedirs(backup_dir)
timestamp = datetime.now(TAIPEI_TZ).strftime('%Y%m%d_%H%M')
zip_filename = f"momo_system_backup_{SYSTEM_VERSION}_{timestamp}.zip"
zip_filepath = os.path.join(backup_dir, zip_filename)
with zipfile.ZipFile(zip_filepath, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(BASE_DIR):
dirs[:] = [d for d in dirs if d not in ['backups', '__pycache__', 'venv', '.git', '.idea', '.vscode', 'node_modules']]
for file in files:
if file == zip_filename:
continue
if file.endswith('.pyc') or file.endswith('.DS_Store'):
continue
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, BASE_DIR)
zipf.write(file_path, arcname)
sys_log.info(f"[System] [Backup] ✅ 系統備份完成 | File: {zip_filename}")
download_url = url_for('system_public.download_backup', filename=zip_filename)
return jsonify({
"status": "success",
"message": f"備份成功!\n檔案已儲存為: {zip_filename}\n即將開始下載...",
"download_url": download_url
})
except Exception as e:
sys_log.error(f"[System] [Backup] ❌ 備份失敗 | Error: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
@system_public_bp.route('/api/backup/download/<path:filename>')
@login_required
def download_backup(filename):
"""API: 下載備份檔案(已加入路徑遍歷防護)"""
try:
backup_dir = os.path.join(BASE_DIR, 'backups')
safe_path = safe_join(backup_dir, filename)
if not safe_path.exists():
sys_log.warning(f"[Security] 備份檔案不存在 | File: {filename}")
return jsonify({'error': '檔案不存在'}), 404
if not safe_path.is_file():
sys_log.warning(f"[Security] 嘗試下載非檔案路徑 | Path: {filename}")
return jsonify({'error': '非法路徑'}), 400
return send_from_directory(backup_dir, safe_path.name, as_attachment=True)
except ValueError as e:
sys_log.error(f"[Security] 路徑遍歷攻擊嘗試被阻擋 | Filename: {filename} | Error: {e}")
return jsonify({'error': '非法路徑'}), 400
except Exception as e:
sys_log.error(f"[System] 下載備份失敗 | Error: {e}")
return jsonify({'error': '下載失敗'}), 500