Files
ewoooc/app.py
OoO 567f138b2d refactor(routes): 搬遷 sales 實作並刪除 app.py 重複路由
ADR-017 Phase 3f-1 sales sprint;sales_bp 改為真實實作,移除 app.py 7 條 sales duplicate route,保留 /growth_analysis Blueprint 版。
2026-04-29 21:16:55 +08:00

1860 lines
78 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ================= TODO LIST (待辦事項 - 重開機後請依序執行) =================
# 1. [驗證] 重啟 app.py 後,重新匯入 Excel確認「自動去重」功能是否生效 (重複匯入應顯示 0 筆新增)。
# 2. [檢查] 前往 /sales_analysis 頁面,確認 '狀態' 欄位是否正確顯示 'F' (目前為原始匯入模式)。
# 3. [決策] 若資料顯示正常,評估是否需要恢復「智慧資料清理」邏輯 (目前程式碼第 1160 行左右已註解)。
# 4. [備份] 確認系統運作正常後,執行系統備份。
# =======================================================================
import os
import sys
import time
import threading
import math
import json
import hashlib
import shutil
import zipfile
import re
import io # V-New: 用於 Excel 匯出
import traceback # V-Fix: 用於錯誤追蹤
from datetime import datetime, timedelta, timezone
# ================= 🔧 1. 環境與路徑鎖定 =================
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 確保專案根目錄在 sys.path 的最前面,優先讀取本地模組
sys.path.insert(0, BASE_DIR)
# 自動檢核並建立必要目錄
try:
for folder in ['database', 'services', 'crawler', 'logs', 'data', 'web/templates', 'web/static']:
folder_path = os.path.join(BASE_DIR, folder)
if not os.path.exists(folder_path):
os.makedirs(folder_path)
# 僅針對 Python 套件目錄建立 __init__.py
if 'web' not in folder:
init_file = os.path.join(folder_path, '__init__.py')
if not os.path.exists(init_file):
with open(init_file, 'w') as f: pass
except OSError as e:
print(f"❌ 系統初始化失敗: 無法建立目錄或檔案 (磁碟可能已滿) - {e}")
# ================= 🔧 2. 核心模組導入 =================
try:
from flask import Flask, render_template, jsonify, request, send_file, redirect, url_for, send_from_directory, flash, session
from werkzeug.utils import secure_filename
from pyngrok import ngrok, conf
import schedule
from sqlalchemy import desc, and_, func, text, literal, case
from sqlalchemy import inspect # V-New: 用於檢查資料表是否存在
from sqlalchemy.orm import joinedload
import pandas as pd # type: ignore
from pandas.api.types import is_numeric_dtype # type: ignore
import numpy as np # type: ignore # V-Opt: 引入 numpy 進行向量化運算加速
# 導入自定義模組
try:
from scheduler import run_momo_task, run_edm_task, run_festival_task, run_auto_import_task, run_whitepage_check, run_competitor_price_feeder_task
from database.manager import DatabaseManager
from database.models import Base, Product, PriceRecord, MonthlySummaryAnalysis
from database.edm_models import PromoProduct
except ImportError as e:
print(f"❌ 專案內部檔案缺失: {e}\n請檢查 database/ 或 services/ 目錄下的 .py 檔案是否存在。")
sys.exit(1)
from services.logger_manager import SystemLogger
from services.exporter import Exporter # 🚩 導入匯出模組
except ImportError as e:
print(f"❌ 關鍵套件導入失敗: {e}")
sys.exit(1)
# ================= 🔧 3. 系統核心配置 =================
# 從 config.py 匯入必要的設定
from config import EXCEL_EXPORT_DIR, DATABASE_TYPE, validate_critical_config
sys_log = SystemLogger("Web_Server").get_logger()
# 驗證選用配置,缺少時輸出 warning非 fatal
for _warn in validate_critical_config():
sys_log.warning(_warn)
# 🚩 V-Opt: 全域資料快取 (用於加速業績分析)
_SALES_DF_CACHE = {} # 已棄用,保留相容性
_SALES_PROCESSED_CACHE = {} # V-Opt: 處理後資料快取
_SALES_CACHE_MAX_ENTRIES = 10 # V-Opt (2026-01-23): 快取最大條目數
_SALES_CACHE_TTL = 600 # V-Opt (2026-01-23): 快取有效期 10 分鐘
def _cleanup_sales_cache():
"""清理過期和過多的快取條目"""
global _SALES_PROCESSED_CACHE
current_time = time.time()
# 1. 清理過期條目
expired_keys = [
k for k, v in _SALES_PROCESSED_CACHE.items()
if v.get('time') and current_time - v['time'] > _SALES_CACHE_TTL
]
for k in expired_keys:
del _SALES_PROCESSED_CACHE[k]
# 2. 如果仍超過限制,刪除最舊的條目
if len(_SALES_PROCESSED_CACHE) > _SALES_CACHE_MAX_ENTRIES:
sorted_items = sorted(
[(k, v.get('time', 0)) for k, v in _SALES_PROCESSED_CACHE.items()],
key=lambda x: x[1]
)
# 保留最新的 _SALES_CACHE_MAX_ENTRIES 條
keys_to_delete = [k for k, _ in sorted_items[:-_SALES_CACHE_MAX_ENTRIES]]
for k in keys_to_delete:
del _SALES_PROCESSED_CACHE[k]
if expired_keys or len(_SALES_PROCESSED_CACHE) > _SALES_CACHE_MAX_ENTRIES - 2:
sys_log.debug(f"[Cache] 清理快取: 移除 {len(expired_keys)} 條過期, 剩餘 {len(_SALES_PROCESSED_CACHE)}")
# 🚩 V-New: 商品看板資料快取 (用於加速首頁載入)
_DASHBOARD_DATA_CACHE = {
'consolidated_data': None, # get_consolidated_data() 結果
'consolidated_timestamp': None, # 快取時間戳記
'stats_data': None, # 統計資料
'stats_timestamp': None # 統計資料時間戳記
}
_DASHBOARD_CACHE_TTL = 300 # 快取有效期 5 分鐘(秒)
# 🚩 檢查磁碟空間 (V9.52 新增)
try:
total, used, free = shutil.disk_usage(BASE_DIR)
if free < 200 * 1024 * 1024: # 小於 200MB
sys_log.critical(f"[System] [DISK_CHECK] 🚨 嚴重警告: 磁碟空間極低 | Free: {free // (1024*1024)} MB")
elif free < 1024 * 1024 * 1024: # 小於 1GB
sys_log.warning(f"[System] [DISK_CHECK] ⚠️ 警告: 磁碟空間不足 1GB | Free: {free // (1024*1024)} MB")
except Exception as e:
sys_log.error(f"無法檢測磁碟空間: {e}")
# 🚩 系統版本定義 (備份與顯示用)
# 🚩 2026-04-19 V10.3: 技術債清零 — Migration 010/011、retry queue 持久化、
# NemoTron store_insight 雙寫、import 前置欄位防禦、時間衰減 RAG
SYSTEM_VERSION = "V10.3"
# ==========================================
# 🔒 SQL Injection 防護函數
# ==========================================
# 允許的資料表白名單
# 安全工具:實作已搬至 utils/security.py此處 re-export 維持向後相容
from utils.security import ( # noqa: E402
ALLOWED_TABLES,
validate_table_name,
validate_column_names,
)
# 安全工具:路徑遍歷 + 檔案上傳驗證 + safe_read_sql 已搬至 utils/security.py
from utils.security import ( # noqa: E402
safe_read_sql,
safe_join,
ALLOWED_UPLOAD_EXTENSIONS,
ALLOWED_MIME_TYPES,
secure_filename_unicode,
allowed_file,
validate_upload_file,
)
# 🚩 資料庫結構自動修復 (V9.53 新增) — 實作搬至 database/schema_repair.py
from database.schema_repair import repair_database_schema # noqa: E402, F401
# 從環境變數讀取 NGROK_AUTH_TOKEN如果未設定則使用原值但會發出警告
NGROK_AUTH_TOKEN = os.getenv('NGROK_AUTH_TOKEN', '36e27NM5V7sUJ8QxJIAAWCp7sUv_3brtcrBarYvcP3SbvFKhF')
if NGROK_AUTH_TOKEN == '36e27NM5V7sUJ8QxJIAAWCp7sUv_3brtcrBarYvcP3SbvFKhF':
sys_log.warning("[Security] ⚠️ 使用預設 NGROK_AUTH_TOKEN請設定環境變數")
conf.get_default().auth_token = NGROK_AUTH_TOKEN
TEMPLATE_DIR = BASE_DIR # 修正:根據檔案結構,模板位於根目錄
TEMPLATE_DIR_NEW = os.path.join(BASE_DIR, 'templates') # 新模板路徑(模組化)
STATIC_DIR = os.path.join(BASE_DIR, 'web/static')
# 檢查關鍵模板是否存在
if not os.path.exists(os.path.join(BASE_DIR, 'dashboard.html')):
sys_log.warning(f"[Web] [Template] ⚠️ 警告: 找不到 dashboard.html | Path: {TEMPLATE_DIR}")
app = Flask(__name__,
template_folder=TEMPLATE_DIR,
static_folder=STATIC_DIR)
# 設定多路徑模板載入器(同時支援根目錄和 templates/ 目錄)
from jinja2 import FileSystemLoader, ChoiceLoader
app.jinja_loader = ChoiceLoader([
FileSystemLoader(TEMPLATE_DIR_NEW), # templates/ 目錄優先
FileSystemLoader(TEMPLATE_DIR), # 根目錄備用
])
# ==========================================
# 🔒 Flask 安全配置
# ==========================================
# 從 config.py 導入 SECRET_KEY
from config import SECRET_KEY
# 基本配置
app.config['SECRET_KEY'] = SECRET_KEY
# Session 安全配置
app.config['SESSION_COOKIE_HTTPONLY'] = True # 防止 JavaScript 存取 cookie防 XSS
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # 防止 CSRF 攻擊
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=24) # Session 有效期 24 小時(延長避免長時間閒置斷線)
# 如果使用 HTTPS啟用 SECURE cookie本地開發時應設為 False
# 注意:如果您的系統部署在 HTTPS 環境,請將 .env 中的 USE_HTTPS 設為 true
USE_HTTPS = os.getenv('USE_HTTPS', 'false').lower() == 'true'
if USE_HTTPS:
app.config['SESSION_COOKIE_SECURE'] = True
sys_log.info("[Security] ✅ HTTPS 模式已啟用Session cookie 僅透過 HTTPS 傳輸")
else:
app.config['SESSION_COOKIE_SECURE'] = False
sys_log.warning("[Security] ⚠️ HTTP 模式開發環境Session cookie 未強制 HTTPS")
# 檔案上傳大小限制10MB
# V-New: 提高檔案上傳大小限制 (從 10MB 提高到 100MB)
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024
sys_log.info("[Security] ✅ Flask 安全配置已載入")
sys_log.info(f"[Security] • Session 有效期: 2 小時")
sys_log.info(f"[Security] • 檔案上傳限制: 10 MB")
sys_log.info(f"[Security] • CSRF 防護: SameSite=Lax")
sys_log.info(f"[Security] • XSS 防護: HttpOnly=True")
# ==========================================
# 🔒 CSRF 防護配置
# ==========================================
from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect(app)
sys_log.info("[Security] ✅ CSRF 防護已啟用 (Flask-WTF)")
# ==========================================
# 🔧 Blueprint 註冊 - 廠商缺貨系統
# ==========================================
from routes.vendor_routes import vendor_bp
app.register_blueprint(vendor_bp)
sys_log.info("[Blueprint] ✅ 廠商缺貨系統 Blueprint 已註冊")
# ==========================================
# 🔧 Blueprint 註冊 - Google Drive 自動匯入
# ==========================================
from routes.auto_import_routes import auto_import_bp
app.register_blueprint(auto_import_bp)
csrf.exempt(auto_import_bp)
sys_log.info("[Blueprint] ✅ Google Drive 自動匯入 Blueprint 已註冊 (CSRF 已豁免)")
# ==========================================
# 🔧 Blueprint 註冊 - 爬蟲管理系統
# ==========================================
from routes.crawler_management_routes import crawler_bp
app.register_blueprint(crawler_bp)
sys_log.info("[Blueprint] ✅ 爬蟲管理系統 Blueprint 已註冊")
# ==========================================
# 🔧 Blueprint 註冊 - AI 智慧文案系統
# ==========================================
from routes.ai_routes import ai_bp
app.register_blueprint(ai_bp)
csrf.exempt(ai_bp) # ICAIM API 使用內部呼叫,不需要 CSRF
sys_log.info("[Blueprint] ✅ AI 智慧文案系統 Blueprint 已註冊")
# ==========================================
# 🔧 Blueprint 註冊 - CI/CD Dashboard
# ==========================================
from routes.cicd_routes import cicd_bp
app.register_blueprint(cicd_bp)
csrf.exempt(cicd_bp) # CI/CD API doesn't need CSRF
sys_log.info("[Blueprint] CI/CD Dashboard Blueprint registered")
# ==========================================
# 🔧 Blueprint 註冊 - Code Review 系統
# ==========================================
try:
from routes.code_review_routes import code_review_bp
app.register_blueprint(code_review_bp)
csrf.exempt(code_review_bp) # Code Review API 使用內部認證,不需要 CSRF
sys_log.info("[Blueprint] ✅ Code Review 系統 Blueprint 已註冊 (CSRF 已豁免)")
except Exception as _e:
sys_log.warning(f"[Blueprint] ⚠️ Code Review 系統 Blueprint 註冊失敗: {_e}")
# ==========================================
# 🔧 Blueprint 註冊 - 趨勢資料系統
# ==========================================
from routes.trend_routes import trend_bp
app.register_blueprint(trend_bp)
sys_log.info("[Blueprint] ✅ 趨勢資料系統 Blueprint 已註冊")
# ==========================================
# 🔒 Auth 路由註冊 - 登入/登出
# ==========================================
from auth import init_auth_routes, login_required
init_auth_routes(app)
sys_log.info("[Auth] ✅ 登入/登出路由已註冊")
# ==========================================
# 🔧 Blueprint 註冊 - 用戶管理系統
# ==========================================
from routes.user_routes import user_bp
app.register_blueprint(user_bp)
sys_log.info("[Blueprint] ✅ 用戶管理系統 Blueprint 已註冊")
# ==========================================
# 🚨 Blueprint 註冊 - 系統告警
# ==========================================
from routes.alert_routes import alert_bp
app.register_blueprint(alert_bp)
csrf.exempt(alert_bp)
sys_log.info("[Blueprint] ✅ 系統告警 Blueprint 已註冊 (CSRF 已豁免)")
# ==========================================
# 系統管理路由 Blueprint
# ==========================================
from routes.system_routes import system_bp
app.register_blueprint(system_bp)
csrf.exempt(system_bp) # n8n API 需要豁免 CSRF
sys_log.info("[Blueprint] ✅ 系統管理 Blueprint 已註冊 (CSRF 已豁免)")
from routes.category_routes import category_bp
app.register_blueprint(category_bp)
sys_log.info("[Blueprint] ✅ 分類 CRUD Blueprint 已註冊")
from routes.misc_routes import misc_bp
app.register_blueprint(misc_bp)
sys_log.info("[Blueprint] ✅ 雜項 Routes Blueprint 已註冊 (/api/test_url, /brand_assets)")
# ==========================================
# 通知模板管理 Blueprint
# ==========================================
from routes.notification_routes import notification_bp
app.register_blueprint(notification_bp)
csrf.exempt(notification_bp) # n8n API 需要豁免 CSRF
sys_log.info("[Blueprint] ✅ 通知模板管理 Blueprint 已註冊")
# ==========================================
# Bot API Blueprint (Clawdbot 整合)
# ==========================================
from routes.bot_api_routes import bot_api_bp
app.register_blueprint(bot_api_bp)
csrf.exempt(bot_api_bp) # Bot API 使用 Token 認證,不需要 CSRF
sys_log.info("[Blueprint] ✅ Bot API Blueprint 已註冊")
# ==========================================
# Elephant Alpha AI Agent Super Orchestrator Blueprint
# ==========================================
try:
from routes.elephant_alpha_routes import elephant_alpha_bp
app.register_blueprint(elephant_alpha_bp)
csrf.exempt(elephant_alpha_bp) # Elephant Alpha API uses internal auth
sys_log.info("[Blueprint] Elephant Alpha AI Agent Super Orchestrator Blueprint registered")
except Exception as _e:
sys_log.warning(f"[Blueprint] Elephant Alpha registration failed: {_e}")
sys_log.info("[Blueprint] Elephant Alpha features will be unavailable")
# [2026-04-18 台北] OpenClaw Bot Blueprint — 修復 /menu 啞巴 (/bot/telegram/webhook 404)
# 原因routes/openclaw_bot_routes.py 有 5000+ 行完整 telegram bot handler但 app.py 從未 register
# 效果Telegram 送進來的 update (包含 /menu) 能被正確接收與處理
try:
from routes.openclaw_bot_routes import openclaw_bot_bp
app.register_blueprint(openclaw_bot_bp)
csrf.exempt(openclaw_bot_bp) # Telegram webhook 不需要 CSRF
sys_log.info("[Blueprint] ✅ OpenClaw Bot Blueprint 已註冊 (Telegram /menu 復活)")
except Exception as _e:
sys_log.error(f"[Blueprint] ❌ OpenClaw Bot Blueprint 註冊失敗: {_e}")
# P0-12 修復:補齊缺少的 Blueprint 註冊
for _bp_module, _bp_name in [
('routes.api_routes', 'api_bp'),
('routes.edm_routes', 'edm_bp'),
('routes.sales_routes', 'sales_bp'),
('routes.monthly_routes', 'monthly_bp'),
('routes.price_comparison_routes', 'price_comparison_bp'),
('routes.export_routes', 'export_bp'),
('routes.daily_sales_routes', 'daily_sales_bp'),
('routes.dashboard_routes', 'dashboard_bp'),
('routes.import_routes', 'import_bp'),
('routes.pchome_routes', 'pchome_bp'),
]:
try:
import importlib as _il
_mod = _il.import_module(_bp_module)
_bp = getattr(_mod, _bp_name)
app.register_blueprint(_bp)
sys_log.info(f"[Blueprint] ✅ {_bp_name} 已註冊")
except Exception as _e:
sys_log.error(f"[Blueprint] ❌ {_bp_name} 註冊失敗: {_e}")
# V-Fix: 註冊 slugify 函數供模板使用(實作搬至 utils/text_helpers.py
from utils.text_helpers import slugify # noqa: E402
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = "服務啟動中..."
# 🚩 時區設定:台北時間 (UTC+8)
TAIPEI_TZ = timezone(timedelta(hours=8))
EXPECTED_METADATA_TABLES = {
'categories', 'products', 'price_records', 'monthly_summary_analysis',
'users', 'login_history', 'permissions', 'user_permissions',
'promo_products', 'trend_records', 'trend_keywords', 'trend_analysis',
'web_search_cache', 'telegram_users',
'ai_generation_history', 'ai_prompt_templates', 'ai_usage_tracking', 'ai_insights',
'agent_context', 'action_plans', 'action_outcomes', 'agent_strategy_weights',
'incidents', 'playbooks', 'heal_logs',
'import_jobs', 'import_config', 'notification_templates', 'ppt_reports',
'vendor_stockout', 'vendor_list', 'vendor_emails', 'email_send_log',
'realtime_sales_monthly',
}
def verify_metadata_tables():
missing = EXPECTED_METADATA_TABLES - set(Base.metadata.tables.keys())
if missing:
raise SystemExit(f"Base.metadata 漏表: {sorted(missing)}")
verify_metadata_tables()
# ==========================================
# 🔧 全域模板變數注入 (Context Processor)
# ==========================================
from config import METABASE_URL, GRIST_URL
@app.context_processor
def inject_global_vars():
"""注入全域變數到所有模板"""
return {
'metabase_url': METABASE_URL,
'grist_url': GRIST_URL,
'datetime_now': datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M:%S'),
}
sys_log.info("[Template] ✅ 全域模板變數已注入 (metabase_url, grist_url)")
# ================= 🛠️ V9.72: 分類設定管理核心 =================
CATEGORIES_JSON_PATH = os.path.join(BASE_DIR, 'data', 'categories.json')
# JSON 持久化:實作搬至 services/json_storage.py
from services.json_storage import ( # noqa: E402, F401
load_categories,
save_categories,
load_scheduler_stats,
)
# ================= 🛠️ 數據處理核心 (封裝) =================
# 純工具:實作已搬至 utils/text_helpers.py
from utils.text_helpers import ( # noqa: E402
get_color_for_string,
extract_snapshot_date_from_filename,
number_format as _number_format,
)
@app.template_filter('number_format')
def number_format_filter(value):
"""Jinja filter wrapper — 實作見 utils.text_helpers.number_format。"""
return _number_format(value)
# V-Refactor: 將 find_col 移至全域,方便多個函式共用
from utils.df_helpers import find_col # noqa: E402
def get_consolidated_data():
"""🚩 統一封裝:獲取全分類去重後的當前數據、昨日對比及差值 (V-Opt: 優化查詢效能 + 快取)"""
global _DASHBOARD_DATA_CACHE
# V-New: 檢查快取是否有效
now = datetime.now(TAIPEI_TZ)
if (_DASHBOARD_DATA_CACHE['consolidated_data'] is not None and
_DASHBOARD_DATA_CACHE['consolidated_timestamp'] is not None):
cache_age = (now.timestamp() - _DASHBOARD_DATA_CACHE['consolidated_timestamp'])
if cache_age < _DASHBOARD_CACHE_TTL:
sys_log.debug(f"[Dashboard] [Cache] ✅ 使用快取資料 | 快取年齡: {cache_age:.1f}")
return _DASHBOARD_DATA_CACHE['consolidated_data'], _DASHBOARD_DATA_CACHE['today_start']
sys_log.debug("[Dashboard] [Cache] 🔄 快取過期或不存在,重新查詢資料庫")
db = DatabaseManager()
session = db.get_session()
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0).replace(tzinfo=None)
seven_days_ago = today_start - timedelta(days=7)
thirty_days_ago = today_start - timedelta(days=30)
try:
# Query 1: Get the latest price record for every product. This is our main list of items.
latest_price_subq = session.query(
func.max(PriceRecord.id).label('max_id')
).group_by(PriceRecord.product_id).subquery()
latest_records = session.query(PriceRecord).options(
joinedload(PriceRecord.product)
).join(latest_price_subq, PriceRecord.id == latest_price_subq.c.max_id).all()
product_ids = [r.product_id for r in latest_records]
if not product_ids:
session.close() # 提前關閉連線
return [], today_start
# Query 2: Get yesterday's closing prices for all products in one go
yesterday_prices_subq = session.query(
PriceRecord.product_id,
func.max(PriceRecord.id).label('max_id')
).filter(
PriceRecord.product_id.in_(product_ids),
PriceRecord.timestamp < today_start
).group_by(PriceRecord.product_id).subquery()
yesterday_prices_q = session.query(
PriceRecord.product_id, PriceRecord.price
).join(
yesterday_prices_subq,
PriceRecord.id == yesterday_prices_subq.c.max_id
)
yesterday_prices_map = {pid: price for pid, price in yesterday_prices_q}
# Query 3: Get specific historical price points (7 days ago and 30 days ago)
# Instead of fetching ALL history, we fetch only the records closest to the target dates.
# This is a significant optimization.
# Helper to get price map for a specific date (start of day)
def get_price_map_before(target_date):
subq = session.query(
PriceRecord.product_id,
func.max(PriceRecord.timestamp).label('max_ts')
).filter(
PriceRecord.product_id.in_(product_ids),
PriceRecord.timestamp < target_date
).group_by(PriceRecord.product_id).subquery()
q = session.query(PriceRecord.product_id, PriceRecord.price).join(
subq,
and_(PriceRecord.product_id == subq.c.product_id, PriceRecord.timestamp == subq.c.max_ts)
)
return {pid: price for pid, price in q}
prices_7d_ago_map = get_price_map_before(seven_days_ago + timedelta(days=1)) # Approximate 7 days ago
prices_30d_ago_map = get_price_map_before(thirty_days_ago + timedelta(days=1)) # Approximate 30 days ago
# Query 4: Get TODAY's records only (for sparkline/intraday change)
today_records_q = session.query(PriceRecord).filter(
PriceRecord.product_id.in_(product_ids),
PriceRecord.timestamp >= today_start
).order_by(PriceRecord.product_id, PriceRecord.timestamp).all()
today_map = {}
for r in today_records_q:
if r.product_id not in today_map: today_map[r.product_id] = []
today_map[r.product_id].append(r)
# Final Assembly (in-memory, no more DB queries)
unique_items = []
for r in latest_records:
pid = r.product_id
# 7d/30d stats
price_7d = prices_7d_ago_map.get(pid)
price_30d = prices_30d_ago_map.get(pid)
stats_7d_diff = r.price - price_7d if price_7d is not None else 0
stats_30d_diff = r.price - price_30d if price_30d is not None else 0
# Today's stats
today_records = today_map.get(pid, [])
today_diff = 0
today_changes = []
if len(today_records) > 1:
today_diff = today_records[-1].price - today_records[0].price
# Yesterday diff
y_price = yesterday_prices_map.get(pid)
yesterday_diff = r.price - y_price if y_price is not None else 0
status = "NONE"
if yesterday_diff > 0:
status = "PRICE_UP"
elif yesterday_diff < 0:
status = "PRICE_DOWN"
# Today's changes details
last_p = y_price if y_price is not None else (today_records[0].price if today_records else r.price)
for tr in today_records:
if tr.price != last_p:
diff = tr.price - last_p
today_changes.append({
'time': tr.timestamp.strftime('%H:%M'),
'price': tr.price,
'diff': diff
})
last_p = tr.price
unique_items.append({
'record': r,
'stats': {'7d_diff': stats_7d_diff, '30d_diff': stats_30d_diff, '1d_diff': today_diff},
'yesterday_diff': yesterday_diff,
'today_changes': today_changes,
'status': status
})
# V-New: 更新快取
_DASHBOARD_DATA_CACHE['consolidated_data'] = unique_items
_DASHBOARD_DATA_CACHE['consolidated_timestamp'] = now.timestamp()
_DASHBOARD_DATA_CACHE['today_start'] = today_start
sys_log.debug(f"[Dashboard] [Cache] 💾 快取已更新 | 商品數: {len(unique_items)}")
return unique_items, today_start
finally:
session.close()
def get_dashboard_stats():
"""計算看板統計數據 (供通知使用)"""
db = DatabaseManager()
session = db.get_session()
try:
unique_items, today_start = get_consolidated_data()
today_start_db = today_start.replace(tzinfo=None)
# 1. 漲跌
increase_count = sum(1 for item in unique_items if item['yesterday_diff'] > 0)
decrease_count = sum(1 for item in unique_items if item['yesterday_diff'] < 0)
# 2. 今日新增 (使用與 index 路由相同的邏輯)
new_pids_query = session.query(PriceRecord.product_id).group_by(PriceRecord.product_id).having(func.min(PriceRecord.timestamp) >= today_start_db)
new_product_ids = {r[0] for r in new_pids_query.all()}
new_count = len(new_product_ids)
# 3. 今日下架
today_delisted_count = session.query(Product).filter(
Product.status == 'INACTIVE',
Product.updated_at >= today_start_db
).count()
return {'new': new_count, 'up': increase_count, 'down': decrease_count, 'delisted': today_delisted_count}
except Exception as e:
sys_log.error(f"[Stats] ❌ 計算統計失敗: {e}")
return {'new': 0, 'up': 0, 'down': 0, 'delisted': 0}
finally:
session.close()
# ================= 🛣️ 4. Flask 路由 =================
# Session 自動續期機制
@app.before_request
def refresh_session():
"""
在每次請求時自動刷新 Session避免長時間閒置後突然斷線
只要用戶有任何操作Session 就會自動延長
"""
if session.get('logged_in'):
session.modified = True # 標記 Session 已修改,觸發 Cookie 更新
@app.route('/health')
def health_check():
"""健康檢查端點 - 供 Nginx 和 Docker healthcheck 使用"""
try:
# 簡單檢查資料庫連線
from config import DATABASE_TYPE
return jsonify({
'status': 'healthy',
'database': DATABASE_TYPE,
'version': SYSTEM_VERSION
}), 200
except Exception as e:
return jsonify({
'status': 'unhealthy',
'error': str(e)
}), 500
@app.route('/metrics')
def prometheus_metrics():
"""Prometheus 指標端點 - 供 Prometheus 抓取監控資料"""
try:
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST, Counter, Gauge, CollectorRegistry
from config import DATABASE_TYPE
# 建立獨立的 registry 以避免重複註冊
registry = CollectorRegistry()
# 應用程式資訊
app_info = Gauge('momo_app_info', '應用程式資訊', ['version', 'database_type'], registry=registry)
app_info.labels(version=SYSTEM_VERSION, database_type=DATABASE_TYPE).set(1)
# 應用程式健康狀態 (1=健康, 0=不健康)
app_health = Gauge('momo_app_health', '應用程式健康狀態', registry=registry)
# 資料庫連線狀態
db_status = Gauge('momo_database_up', '資料庫連線狀態', registry=registry)
try:
db = DatabaseManager()
with db.engine.connect() as conn:
conn.execute(text("SELECT 1"))
db_status.set(1)
app_health.set(1)
except Exception:
db_status.set(0)
app_health.set(0)
# 資料庫記錄數
try:
db = DatabaseManager()
session = db.get_session()
# 商品數量
product_count = Gauge('momo_products_total', '商品總數', registry=registry)
product_count.set(session.query(Product).count())
# 價格記錄數量
price_record_count = Gauge('momo_price_records_total', '價格記錄總數', registry=registry)
price_record_count.set(session.query(PriceRecord).count())
# 業績資料筆數
from database.realtime_sales_models import RealtimeSalesMonthly
sales_count = Gauge('momo_sales_records_total', '業績資料總數', registry=registry)
sales_count.set(session.query(RealtimeSalesMonthly).count())
session.close()
except Exception as e:
sys_log.warning(f"[Metrics] 無法取得資料庫統計: {e}")
# 返回 Prometheus 格式
from flask import Response
return Response(generate_latest(registry), mimetype=CONTENT_TYPE_LATEST)
except ImportError:
# prometheus_client 未安裝時的備用方案
metrics_text = """# HELP momo_app_health 應用程式健康狀態
# TYPE momo_app_health gauge
momo_app_health 1
# HELP momo_app_info 應用程式資訊
# TYPE momo_app_info gauge
momo_app_info{version="9.4",database_type="postgresql"} 1
"""
from flask import Response
return Response(metrics_text, mimetype='text/plain; charset=utf-8')
except Exception as e:
sys_log.error(f"[Metrics] 指標生成錯誤: {e}")
from flask import Response
return Response(f"# Error: {e}\n", mimetype='text/plain; charset=utf-8'), 500
@app.route('/settings')
def settings():
"""分類設定頁面"""
categories = load_categories()
return render_template('settings.html',
categories=categories,
public_url=public_url,
system_version=SYSTEM_VERSION)
@app.route('/system_settings')
def system_settings_page():
"""系統設定與匯入頁面"""
return render_template('system_settings.html', system_version=SYSTEM_VERSION)
@app.route('/abc_analysis/detail')
def abc_analysis_detail():
"""ABC 分析詳細報表頁面"""
try:
target_class = request.args.get('class', 'A') # 預設 A 類
table_name = 'realtime_sales_monthly'
# 1. 生成與主頁面一致的 cache_key
data_range_months = int(request.args.get('data_range', '0') or '0')
start_date = request.args.get('start_date', '')
end_date = request.args.get('end_date', '')
if start_date or end_date:
cache_key = f"{table_name}_custom_{start_date}_{end_date}"
else:
cache_key = f"{table_name}_{data_range_months}m"
# 2. 使用共用篩選函式取得資料
target_df, cols_map, err = _get_filtered_sales_data(cache_key)
# V-Fix: 如果 cache_key 不存在,嘗試後補使用 table_name 固定鍵值
if err and table_name in _SALES_PROCESSED_CACHE:
target_df, cols_map, err = _get_filtered_sales_data(table_name)
if err:
# V-Fix: 如果自動重載也失敗,則顯示稍後再試,並引導回主頁面
return f'''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>數據加載中 - WOOO TECH</title>
<style>
body {{ font-family: sans-serif; display: flex; align-items: center; justify-content: center; height: 100vh; margin: 0; background: #f8f9fa; }}
.card {{ background: white; padding: 2rem; border-radius: 12px; box-shadow: 0 4px 20px rgba(0,0,0,0.08); text-align: center; }}
.spinner {{ border: 3px solid #f3f3f3; border-top: 3px solid #1e3c72; border-radius: 50%; width: 30px; height: 30px; animation: spin 1s linear infinite; margin: 0 auto 1rem; }}
@keyframes spin {{ 0% {{ transform: rotate(0deg); }} 100% {{ transform: rotate(360deg); }} }}
</style>
</head>
<body>
<div class="card">
<div class="spinner"></div>
<h3>數據準備中</h3>
<p>正在自動重新加載數據,請稍後...</p>
<script>
// 1.5 秒後嘗試重載當前頁面
setTimeout(function() {{
window.location.reload();
}}, 1500);
// 若重試 3 次仍失敗,引導回主頁
let retryCount = parseInt(sessionStorage.getItem('abc_retry') || '0');
if (retryCount > 3) {{
sessionStorage.removeItem('abc_retry');
alert('數據載入過久,請先在業績分析主頁重新整理。');
window.location.href = '/sales_analysis';
}} else {{
sessionStorage.setItem('abc_retry', retryCount + 1);
}}
</script>
</div>
</body>
</html>
''', 200
# 恢復欄位變數
col_name = cols_map.get('name')
col_amount = cols_map.get('amount')
col_qty = cols_map.get('qty')
col_category = cols_map.get('category')
col_brand = cols_map.get('brand')
col_vendor = cols_map.get('vendor')
col_price = cols_map.get('price')
col_cost = cols_map.get('cost')
col_profit = cols_map.get('profit')
col_date = cols_map.get('date')
col_pid = cols_map.get('pid')
# 3. 執行 ABC 分類
items = []
total_revenue = 0
if col_amount and not target_df.empty:
# V-Fix: 先針對商品進行聚合,確保 ABC 分析是基於「商品總銷量」而非「單筆訂單」
agg_rules = {col_amount: 'sum'}
if col_qty: agg_rules[col_qty] = 'sum'
if col_cost: agg_rules[col_cost] = 'sum'
if col_profit: agg_rules[col_profit] = 'sum'
if col_category: agg_rules[col_category] = 'first'
if col_vendor: agg_rules[col_vendor] = 'first'
if col_brand: agg_rules[col_brand] = 'first' # V-New: 加入品牌
if col_pid: agg_rules[col_pid] = 'first' # V-New: 聚合商品ID
if col_date: agg_rules['_month_str'] = lambda x: ', '.join(sorted(x.dropna().unique()))
df_agg = target_df.groupby(col_name).agg(agg_rules).reset_index()
# 重新計算聚合後的毛利率
if col_profit:
df_agg['calculated_margin_rate'] = (df_agg[col_profit] / df_agg[col_amount]) * 100
elif col_cost:
df_agg['calculated_margin_rate'] = ((df_agg[col_amount] - df_agg[col_cost]) / df_agg[col_amount]) * 100
else:
df_agg['calculated_margin_rate'] = 0.0
df_agg['calculated_margin_rate'] = df_agg['calculated_margin_rate'].replace([np.inf, -np.inf, np.nan], 0)
# 執行 ABC 排序與計算
df_agg = df_agg.sort_values(by=col_amount, ascending=False)
df_agg['cumulative_revenue'] = df_agg[col_amount].cumsum()
total_revenue = df_agg[col_amount].sum()
df_agg['cumulative_pct'] = (df_agg['cumulative_revenue'] / total_revenue) * 100
conditions = [(df_agg['cumulative_pct'] <= 80), (df_agg['cumulative_pct'] <= 95)]
choices = ['A', 'B']
df_agg['ABC_Class'] = np.select(conditions, choices, default='C')
# 4. 篩選特定類別
class_df = df_agg[df_agg['ABC_Class'] == target_class].copy()
# V-New: 計算平均單價與庫存建議
if col_qty:
class_df['avg_unit_price'] = (class_df[col_amount] / class_df[col_qty]).fillna(0)
# V-New: 處理動態補貨係數
custom_factor = request.args.get('factor')
current_factor = 0.0
if custom_factor:
try:
current_factor = float(custom_factor)
except:
current_factor = 1.5 if target_class == 'A' else (1.2 if target_class == 'B' else 0.0)
else:
current_factor = 1.5 if target_class == 'A' else (1.2 if target_class == 'B' else 0.0)
class_df['suggested_restock'] = (class_df[col_qty] * current_factor).astype(int)
items = class_df.to_dict('records')
# 準備標題與描述
class_info = {
'A': {'title': 'A 類 - 核心商品', 'desc': '營收佔比前 80% 的主力商品,建議重點備貨與監控。', 'color': 'danger'},
'B': {'title': 'B 類 - 次要商品', 'desc': '營收佔比 80%~95% 的輔助商品,維持正常庫存。', 'color': 'warning'},
'C': {'title': 'C 類 - 長尾商品', 'desc': '營收佔比最後 5% 的長尾商品,建議評估清倉或縮減 SKU。', 'color': 'success'}
}
info = class_info.get(target_class, {'title': f'{target_class}', 'desc': '', 'color': 'secondary'})
# 計算 DataTables 預設排序欄位 (銷售金額) 的索引
# 欄位順序: Rank(0), [PID], Name, [Brand], [Vendor], [Cat], [Margin], [AvgPrice, Qty, Restock], Amount
sort_col_index = 1 # Rank
if col_pid: sort_col_index += 1
sort_col_index += 1 # Name
if col_brand: sort_col_index += 1
if col_vendor: sort_col_index += 1
if col_category: sort_col_index += 1
if col_cost or col_profit: sort_col_index += 1
if col_qty: sort_col_index += 3
# 此時 sort_col_index 即為 Amount 欄位的索引
return render_template('abc_analysis_detail.html',
items=items,
info=info,
target_class=target_class,
current_factor=current_factor, # V-New: 傳遞當前係數
total_revenue=total_revenue,
sort_col_index=sort_col_index, # V-New: 傳遞排序欄位索引
cols={'name': col_name, 'amount': col_amount, 'qty': col_qty, 'cat': col_category,
'vendor': col_vendor, 'brand': col_brand, 'cost': col_cost, 'profit': col_profit, 'date': col_date, 'pid': col_pid},
# 傳遞當前查詢參數以供匯出連結使用
query_string=request.query_string.decode())
except Exception as e:
sys_log.error(f"ABC Detail Error: {e}")
return f"系統錯誤: {e}"
@app.route('/logs')
def show_logs():
return render_template('logs.html')
@app.route('/api/logs')
def get_logs_api():
if os.path.exists(LOG_FILE_PATH):
try:
with open(LOG_FILE_PATH, 'r', encoding='utf-8') as f:
return jsonify({"logs": "".join(f.readlines()[-60:])})
except Exception as e:
sys_log.error(f"[Web] [Logs] ❌ 日誌 API 讀取異常 | Error: {e}")
return jsonify({"logs": "讀取日誌異常"})
return jsonify({"logs": "等待系統啟動中..."})
@app.route('/api/backup', methods=['POST'])
@login_required
def trigger_backup():
"""API: 觸發系統完整備份"""
# Note: [功能] 尚未實作「系統還原」功能 (Restore),需評估安全性後加入
try:
sys_log.info("[System] [Backup] 💾 開始執行系統完整備份...")
backup_dir = os.path.join(BASE_DIR, 'backups')
if not os.path.exists(backup_dir):
os.makedirs(backup_dir)
timestamp = datetime.now(TAIPEI_TZ).strftime('%Y%m%d_%H%M')
zip_filename = f"momo_system_backup_{SYSTEM_VERSION}_{timestamp}.zip"
zip_filepath = os.path.join(backup_dir, zip_filename)
with zipfile.ZipFile(zip_filepath, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(BASE_DIR):
# 排除不必要的目錄
dirs[:] = [d for d in dirs if d not in ['backups', '__pycache__', 'venv', '.git', '.idea', '.vscode', 'node_modules']]
for file in files:
if file == zip_filename: continue # 跳過正在寫入的檔案
if file.endswith('.pyc') or file.endswith('.DS_Store'): continue
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, BASE_DIR)
zipf.write(file_path, arcname)
sys_log.info(f"[System] [Backup] ✅ 系統備份完成 | File: {zip_filename}")
# V-New: 回傳下載連結
download_url = url_for('download_backup', filename=zip_filename)
return jsonify({
"status": "success",
"message": f"備份成功!\n檔案已儲存為: {zip_filename}\n即將開始下載...",
"download_url": download_url
})
except Exception as e:
sys_log.error(f"[System] [Backup] ❌ 備份失敗 | Error: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/api/backup/download/<path:filename>')
@login_required
def download_backup(filename):
"""
API: 下載備份檔案(已加入路徑遍歷防護)
"""
try:
backup_dir = os.path.join(BASE_DIR, 'backups')
# 使用 safe_join 驗證路徑,防止路徑遍歷攻擊
safe_path = safe_join(backup_dir, filename)
# 確保檔案存在
if not safe_path.exists():
sys_log.warning(f"[Security] 備份檔案不存在 | File: {filename}")
return jsonify({'error': '檔案不存在'}), 404
# 確保是檔案而非目錄
if not safe_path.is_file():
sys_log.warning(f"[Security] 嘗試下載非檔案路徑 | Path: {filename}")
return jsonify({'error': '非法路徑'}), 400
return send_from_directory(backup_dir, safe_path.name, as_attachment=True)
except ValueError as e:
# safe_join 偵測到路徑遍歷嘗試
sys_log.error(f"[Security] 路徑遍歷攻擊嘗試被阻擋 | Filename: {filename} | Error: {e}")
return jsonify({'error': '非法路徑'}), 400
except Exception as e:
sys_log.error(f"[System] 下載備份失敗 | Error: {e}")
return jsonify({'error': '下載失敗'}), 500
# ================= 📊 V-New: 業績分析報表 =================
def _get_filtered_sales_data(cache_key):
"""
🚩 共用函式:從快取讀取資料並根據 request.args 進行篩選
回傳: (target_df, cols_map, error_message)
參數: cache_key - 快取鍵值 (例如: "realtime_sales_monthly_3m")
"""
db = DatabaseManager()
# 1. 檢查資料表與快取
df = None
cols_map = {}
if cache_key in _SALES_PROCESSED_CACHE:
cache_data = _SALES_PROCESSED_CACHE[cache_key]
df = cache_data['df']
cols_map = cache_data['cols']
else:
# 快取不存在時,直接回傳錯誤讓呼叫端顯示 spinner 導回 sales_analysis
# 不在此發起全表 DB 查詢748k 行會 hang Gunicorn worker
sys_log.warning(f"[Sales Analysis] ⚠️ 快取不存在 ({cache_key}),回傳錯誤讓 UI 導回 sales_analysis")
return None, {}, f"快取未就緒,請先從業績分析主頁載入資料 (cache_key={cache_key})"
if False: # 保留舊冷快取重載邏輯(已停用,避免全表掃描 hang
sys_log.warning(f"[Sales Analysis] ⚠️ 快取不存在 ({cache_key}),試圖重新從資料庫載入...")
try:
# V-Fix: 從 cache_key 提取 table_name
# 格式: realtime_sales_monthly_3m 或 realtime_sales_monthly_custom_2025-01-01_2025-01-31
if "_custom_" in cache_key:
table_name = cache_key.split('_custom_')[0] # realtime_sales_monthly
else:
# 移除最後的 _Xm 部分
parts = cache_key.rsplit('_', 1)
table_name = parts[0] if len(parts) > 1 else 'realtime_sales_monthly'
# 判斷是自訂區間還是標配區間
if "_custom_" in cache_key:
# 格式: realtime_sales_monthly_custom_2025-01-01_2025-01-31
parts = cache_key.split('_custom_')
dates = parts[1].split('_')
start_d, end_d = dates[0], dates[1]
# 呼叫資料庫讀取 (不傳入 view, 會自動處理欄位映射)
result_df, result_cols = db.get_sales_data(table_name=table_name, start_date=start_d, end_date=end_d)
else:
# 格式: realtime_sales_monthly_1mmonths=0 表示全時段但上限 12 個月避免全表掃描 hang
months = int(cache_key.split('_')[-1].replace('m', '') or '12')
if months == 0:
months = 12
result_df, result_cols = db.get_sales_data(table_name=table_name, months=months)
if result_df is not None and not result_df.empty:
# V-Fix (2026-01-23): 補回所有日期維度欄位供後續篩選 (_dow, _hour, _month_str)
if '日期' in result_df.columns:
# 先轉換為 datetime
result_df['_parsed_date'] = pd.to_datetime(result_df['日期'], errors='coerce')
result_df['_month_str'] = result_df['_parsed_date'].dt.strftime('%Y-%m')
result_df['_dow'] = result_df['_parsed_date'].dt.dayofweek
# 小時需要從「時間」欄位提取
if '時間' in result_df.columns:
result_df['_hour'] = pd.to_datetime(result_df['時間'], format='%H:%M:%S', errors='coerce').dt.hour
else:
result_df['_hour'] = 0 # 如果沒有時間欄位,預設為 0
# 清理臨時欄位
result_df.drop(columns=['_parsed_date'], inplace=True, errors='ignore')
# 自動存入快取
_SALES_PROCESSED_CACHE[cache_key] = {'df': result_df, 'cols': result_cols, 'time': time.time()}
df = result_df
cols_map = result_cols
sys_log.info(f"[Sales Analysis] ✅ 快取成功自動重載 | 筆數: {len(df)}")
else:
return None, None, "資料庫無可用資料,請確認匯入狀態"
except Exception as ex:
sys_log.error(f"[Sales Analysis] 🚨 自動重載失敗: {ex}")
return None, None, f"快取失效且無法重載: {ex}"
# 恢復欄位變數
col_name = cols_map.get('name')
col_category = cols_map.get('category')
col_brand = cols_map.get('brand')
col_vendor = cols_map.get('vendor')
col_activity = cols_map.get('activity')
col_payment = cols_map.get('payment')
col_price = cols_map.get('price')
col_date = cols_map.get('date')
col_return_qty = cols_map.get('return_qty') # V-New: 取得退貨欄位
# 2. 取得篩選參數
selected_category = request.args.get('category', 'all')
selected_brand = request.args.get('brand', 'all')
selected_vendor = request.args.get('vendor', 'all')
selected_activity = request.args.get('activity', 'all')
selected_payment = request.args.get('payment', 'all')
selected_dow = request.args.get('dow', 'all')
selected_hour = request.args.get('hour', 'all')
selected_month = request.args.get('month', 'all')
keyword = request.args.get('keyword', '').strip()
min_price = request.args.get('min_price', '')
max_price = request.args.get('max_price', '')
min_margin = request.args.get('min_margin', '')
max_margin = request.args.get('max_margin', '')
# 3. 執行篩選
target_df = df
# Top N 分類處理 (用於 '其他' 篩選)
TOP_N_CATS = 12
top_cats_names = []
if col_category:
# 注意:這裡為了效能,簡單重算一次 Top N或可考慮也快取起來
cat_group_all = df.groupby(col_category)[cols_map.get('amount')].sum().sort_values(ascending=False)
if len(cat_group_all) > TOP_N_CATS:
top_cats_names = cat_group_all.head(TOP_N_CATS).index.tolist()
if selected_category != 'all' and col_category:
if selected_category == '其他' and top_cats_names:
target_df = target_df[~target_df[col_category].isin(top_cats_names)]
else:
target_df = target_df[target_df[col_category] == selected_category]
if selected_brand != 'all' and col_brand: target_df = target_df[target_df[col_brand] == selected_brand]
if selected_vendor != 'all' and col_vendor: target_df = target_df[target_df[col_vendor] == selected_vendor]
if selected_activity != 'all' and col_activity: target_df = target_df[target_df[col_activity] == selected_activity]
if selected_payment != 'all' and col_payment: target_df = target_df[target_df[col_payment] == selected_payment]
if selected_dow != 'all' and col_date: target_df = target_df[target_df['_dow'] == int(selected_dow)]
if selected_hour != 'all' and col_date: target_df = target_df[target_df['_hour'] == int(selected_hour)]
if selected_month != 'all' and col_date: target_df = target_df[target_df['_month_str'] == selected_month]
if keyword: target_df = target_df[target_df[col_name].astype(str).str.contains(keyword, case=False, na=False)]
if col_price:
if min_price: target_df = target_df[target_df[col_price] >= float(min_price)]
if max_price: target_df = target_df[target_df[col_price] <= float(max_price)]
if min_margin: target_df = target_df[target_df['calculated_margin_rate'] >= float(min_margin)]
if max_margin: target_df = target_df[target_df['calculated_margin_rate'] <= float(max_margin)]
return target_df, cols_map, None
# V-Opt: API 層級快取 (減少重複查詢)
_TABLE_DATA_CACHE = {}
_TABLE_DATA_CACHE_TTL = 60 # 快取 60 秒
# V-Old: 保留舊版本以防需要回滾
# ================= 💎 V-New: Top 3 Highlights 詳細列表 API =================
# ================= 📈 V-New: 年度對比 (Year-over-Year Comparison) =================
# ================= 📈 V-New: 營運成長報表 (Growth Strategy) =================
def preprocess_daily_sales_data(df):
"""前處理當日業績資料:欄位識別、型別轉換"""
cols = df.columns.tolist()
# 欄位自動識別(使用現有的 find_col 函式)
col_amount = find_col(cols, ['銷售金額', '業績', '金額', 'Amount', '總業績'])
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
col_profit = find_col(cols, ['毛利', 'Profit'])
col_qty = find_col(cols, ['銷售數量', '銷量', 'Qty', '數量'])
# 型別轉換
if col_amount:
df[col_amount] = pd.to_numeric(df[col_amount], errors='coerce').fillna(0)
if col_cost:
df[col_cost] = pd.to_numeric(df[col_cost], errors='coerce').fillna(0)
if col_profit:
df[col_profit] = pd.to_numeric(df[col_profit], errors='coerce').fillna(0)
if col_qty:
df[col_qty] = pd.to_numeric(df[col_qty], errors='coerce').fillna(0)
# 日期轉換
df['snapshot_date'] = pd.to_datetime(df['snapshot_date'], errors='coerce')
return df
def calculate_daily_kpis(df, date_str):
"""計算單日 6 個 KPI"""
day_df = df[df['snapshot_date'] == date_str]
cols = day_df.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
col_profit = find_col(cols, ['毛利', 'Profit'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
col_name = find_col(cols, ['商品名稱', '品名', 'Name'])
total_revenue = float(day_df[col_amount].sum()) if col_amount else 0
total_cost = float(day_df[col_cost].sum()) if col_cost else 0
gross_margin = float(day_df[col_profit].sum()) if col_profit else (total_revenue - total_cost)
total_qty = float(day_df[col_qty].sum()) if col_qty else 0
sku_count = int(day_df[col_name].nunique()) if col_name else 0
avg_price = total_revenue / total_qty if total_qty > 0 else 0
return {
'total_revenue': total_revenue,
'total_cost': total_cost,
'gross_margin': gross_margin,
'total_qty': total_qty,
'sku_count': sku_count,
'avg_price': avg_price
}
def calculate_dod(df, current_date):
"""計算 Day-over-Day 變化率"""
current = calculate_daily_kpis(df, current_date)
prev_date = current_date - timedelta(days=1)
if prev_date not in df['snapshot_date'].values:
return {k: 0.0 for k in current.keys()}
previous = calculate_daily_kpis(df, prev_date)
dod = {}
for key in current:
if previous[key] > 0:
dod[key] = ((current[key] - previous[key]) / previous[key]) * 100
else:
dod[key] = 0.0
return dod
def calculate_wow(df, current_date):
"""計算 Week-over-Week 變化率"""
current = calculate_daily_kpis(df, current_date)
prev_week_date = current_date - timedelta(days=7)
if prev_week_date not in df['snapshot_date'].values:
return {k: 0.0 for k in current.keys()}
previous = calculate_daily_kpis(df, prev_week_date)
wow = {}
for key in current:
if previous[key] > 0:
wow[key] = ((current[key] - previous[key]) / previous[key]) * 100
else:
wow[key] = 0.0
return wow
def prepare_daily_charts(df, selected_date, days=30):
"""準備 4 個圖表的數據(根據選擇的日期)"""
# 取選擇日期前 N 天的數據
start_date = selected_date - timedelta(days=days)
df_range = df[(df['snapshot_date'] >= start_date) & (df['snapshot_date'] <= selected_date)]
# 按日期聚合
cols = df_range.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_cost = find_col(cols, ['成本', '總成本'])
col_profit = find_col(cols, ['毛利'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
col_name = find_col(cols, ['商品名稱', '品名'])
# 日期聚合
agg_dict = {}
if col_amount:
agg_dict[col_amount] = 'sum'
if col_cost:
agg_dict[col_cost] = 'sum'
if col_profit:
agg_dict[col_profit] = 'sum'
if col_qty:
agg_dict[col_qty] = 'sum'
daily_agg = df_range.groupby('snapshot_date').agg(agg_dict).reset_index()
# 計算或取得毛利(如果沒有毛利欄位,用業績-成本計算)
if col_profit and col_profit in daily_agg.columns:
daily_agg['profit'] = daily_agg[col_profit]
elif col_amount and col_cost and col_amount in daily_agg.columns and col_cost in daily_agg.columns:
daily_agg['profit'] = daily_agg[col_amount] - daily_agg[col_cost]
else:
daily_agg['profit'] = 0
# 計算客單價
if col_amount and col_qty and col_amount in daily_agg.columns and col_qty in daily_agg.columns:
daily_agg['avg_price'] = (daily_agg[col_amount] / daily_agg[col_qty]).fillna(0)
else:
daily_agg['avg_price'] = 0
# 計算 DoD (Day-over-Day) 變化率 - 多個維度
if col_amount and col_amount in daily_agg.columns:
daily_agg['dod_revenue'] = daily_agg[col_amount].pct_change() * 100
if 'profit' in daily_agg.columns:
daily_agg['dod_profit'] = daily_agg['profit'].pct_change() * 100
if 'avg_price' in daily_agg.columns:
daily_agg['dod_avg_price'] = daily_agg['avg_price'].pct_change() * 100
if col_qty and col_qty in daily_agg.columns:
daily_agg['dod_qty'] = daily_agg[col_qty].pct_change() * 100
# 計算 WoW (Week-over-Week) 變化率 - 多個維度
if col_amount and col_amount in daily_agg.columns:
daily_agg['wow_revenue'] = daily_agg[col_amount].pct_change(periods=7) * 100
if 'profit' in daily_agg.columns:
daily_agg['wow_profit'] = daily_agg['profit'].pct_change(periods=7) * 100
if 'avg_price' in daily_agg.columns:
daily_agg['wow_avg_price'] = daily_agg['avg_price'].pct_change(periods=7) * 100
if col_qty and col_qty in daily_agg.columns:
daily_agg['wow_qty'] = daily_agg[col_qty].pct_change(periods=7) * 100
# Top 10 商品(選擇的日期,包含廠商)
selected_df = df[df['snapshot_date'] == selected_date]
top10_labels = []
top10_values = []
if col_name and col_amount:
col_vendor = find_col(cols, ['廠商名稱', '廠商', 'Vendor', 'Supplier'])
if col_vendor:
# 如果有廠商欄位,按商品+廠商聚合
top10_df = selected_df.groupby([col_name, col_vendor])[col_amount].sum().nlargest(10).reset_index()
top10_labels = [f"{row[col_name]} ({row[col_vendor]})" for _, row in top10_df.iterrows()]
top10_values = top10_df[col_amount].tolist()
else:
# 沒有廠商欄位,只按商品聚合
top10 = selected_df.groupby(col_name)[col_amount].sum().nlargest(10)
top10_labels = top10.index.tolist()
top10_values = top10.values.tolist()
return {
'labels': daily_agg['snapshot_date'].dt.strftime('%m/%d').tolist() if not daily_agg.empty else [],
'revenue': daily_agg[col_amount].tolist() if col_amount and col_amount in daily_agg.columns and not daily_agg.empty else [],
'cost': daily_agg[col_cost].tolist() if col_cost and col_cost in daily_agg.columns and not daily_agg.empty else [],
'profit': daily_agg['profit'].tolist() if 'profit' in daily_agg.columns and not daily_agg.empty else [],
'qty': daily_agg[col_qty].tolist() if col_qty and col_qty in daily_agg.columns and not daily_agg.empty else [],
'avg_price': daily_agg['avg_price'].tolist() if 'avg_price' in daily_agg.columns and not daily_agg.empty else [],
# DoD 多維度
'dod_revenue': daily_agg['dod_revenue'].fillna(0).tolist() if 'dod_revenue' in daily_agg.columns and not daily_agg.empty else [],
'dod_profit': daily_agg['dod_profit'].fillna(0).tolist() if 'dod_profit' in daily_agg.columns and not daily_agg.empty else [],
'dod_avg_price': daily_agg['dod_avg_price'].fillna(0).tolist() if 'dod_avg_price' in daily_agg.columns and not daily_agg.empty else [],
'dod_qty': daily_agg['dod_qty'].fillna(0).tolist() if 'dod_qty' in daily_agg.columns and not daily_agg.empty else [],
# WoW 多維度
'wow_revenue': daily_agg['wow_revenue'].fillna(0).tolist() if 'wow_revenue' in daily_agg.columns and not daily_agg.empty else [],
'wow_profit': daily_agg['wow_profit'].fillna(0).tolist() if 'wow_profit' in daily_agg.columns and not daily_agg.empty else [],
'wow_avg_price': daily_agg['wow_avg_price'].fillna(0).tolist() if 'wow_avg_price' in daily_agg.columns and not daily_agg.empty else [],
'wow_qty': daily_agg['wow_qty'].fillna(0).tolist() if 'wow_qty' in daily_agg.columns and not daily_agg.empty else [],
'top10_labels': top10_labels,
'top10_values': top10_values
}
def prepare_category_summary(df, date_str=None, is_month_view=False, month_start=None, month_end=None):
"""準備分類聚合列表 (支援單日或月度範圍)"""
if is_month_view and month_start is not None and month_end is not None:
day_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)]
else:
day_df = df[df['snapshot_date'] == date_str]
cols = day_df.columns.tolist()
col_category = find_col(cols, ['館別', '分類', 'Category'])
col_vendor = find_col(cols, ['廠商名稱', '廠商', 'Vendor', 'Supplier'])
col_amount = find_col(cols, ['銷售金額', '業績', '總業績'])
col_cost = find_col(cols, ['成本', '總成本'])
col_profit = find_col(cols, ['毛利'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
col_name = find_col(cols, ['商品名稱', '品名'])
if not col_category or not col_amount:
return []
# 分類 + 廠商聚合
agg_dict = {col_amount: 'sum'}
if col_cost:
agg_dict[col_cost] = 'sum'
if col_profit:
agg_dict[col_profit] = 'sum'
if col_qty:
agg_dict[col_qty] = 'sum'
if col_name:
agg_dict[col_name] = 'nunique'
# 如果有廠商欄位,按分類+廠商聚合;否則只按分類聚合
if col_vendor:
category_df = day_df.groupby([col_category, col_vendor]).agg(agg_dict).reset_index()
else:
category_df = day_df.groupby(col_category).agg(agg_dict).reset_index()
# 計算毛利(如果資料中沒有毛利欄位,自動計算)
if col_profit and col_profit in category_df.columns:
# 資料中有毛利欄位,直接使用
pass
elif col_amount and col_cost and col_amount in category_df.columns and col_cost in category_df.columns:
# 資料中沒有毛利欄位,用 業績 - 成本 計算
category_df['profit_calculated'] = category_df[col_amount] - category_df[col_cost]
col_profit = 'profit_calculated'
else:
col_profit = None
# 計算毛利率
if col_profit and col_profit in category_df.columns and col_amount and col_amount in category_df.columns:
category_df['margin_rate'] = (category_df[col_profit] / category_df[col_amount] * 100).fillna(0)
else:
category_df['margin_rate'] = 0
# 計算均價
if col_qty and col_amount:
category_df['avg_price'] = (category_df[col_amount] / category_df[col_qty]).fillna(0)
else:
category_df['avg_price'] = 0
# 重新命名欄位以便模板使用
rename_dict = {col_category: 'category', col_amount: 'revenue'}
if col_vendor:
rename_dict[col_vendor] = 'vendor'
if col_cost:
rename_dict[col_cost] = 'cost'
if col_profit and col_profit in category_df.columns:
rename_dict[col_profit] = 'profit'
if col_qty:
rename_dict[col_qty] = 'qty'
if col_name:
rename_dict[col_name] = 'sku_count'
category_df = category_df.rename(columns=rename_dict)
# 確保 profit 欄位存在,如果不存在則設為 0
if 'profit' not in category_df.columns:
category_df['profit'] = 0
# 轉為字典列表
return category_df.to_dict('records')
# V-New 2026-01-15: 行銷活動業績聚合函數
def prepare_marketing_summary(df, selected_date=None, is_month_view=False, month_start=None, month_end=None, sort_by='revenue'):
"""
準備行銷活動業績貢獻數據
支援單日模式和月度模式,並可指定排序維度 (revenue, qty, profit)
"""
# 決定使用的數據範圍
if is_month_view and month_start is not None and month_end is not None:
target_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)]
elif selected_date is not None:
target_df = df[df['snapshot_date'] == selected_date]
else:
target_df = df
if target_df.empty:
return {'coupon': [], 'discount': [], 'bonus': [], 'click': []}
cols = target_df.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量', 'Qty'])
col_profit = find_col(cols, ['毛利', 'Profit', '利潤'])
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
if not col_amount:
return {'coupon': [], 'discount': [], 'bonus': [], 'click': []}
# 定義四種行銷活動欄位
marketing_cols = {
'coupon': '折價券活動名稱', # 折價券活動
'discount': '折扣活動名稱', # 折扣活動
'bonus': '滿額再折扣活動名稱', # 滿額再折扣
'click': '點我再折扣' # 點我再折扣
}
result = {}
# 確保 sort_by 欄位存在,否則退回 revenue
actual_sort_key = sort_by if sort_by in ['revenue', 'qty', 'profit'] else 'revenue'
for key, col_name in marketing_cols.items():
if col_name not in cols:
result[key] = []
continue
# 篩選有該行銷活動的記錄
activity_df = target_df[
(target_df[col_name].notna()) &
(target_df[col_name] != '') &
(target_df[col_name] != '0') &
(target_df[col_name] != 0)
]
if activity_df.empty:
result[key] = []
continue
# 聚合計算
agg_args = {
'revenue': (col_amount, 'sum'),
'order_count': (col_amount, 'count')
}
if col_qty: agg_args['qty'] = (col_qty, 'sum')
if col_profit: agg_args['profit'] = (col_profit, 'sum')
grouped = activity_df.groupby(col_name).agg(**agg_args).reset_index()
# 若需要手動計算毛利 (金額 - 成本)
if 'profit' not in agg_args and col_cost:
cost_agg = activity_df.groupby(col_name)[col_cost].sum().reset_index()
grouped = grouped.merge(cost_agg, on=col_name)
grouped['profit'] = grouped['revenue'] - grouped[col_cost]
grouped = grouped.rename(columns={col_name: 'name'})
# 動態排序
sort_col = actual_sort_key if actual_sort_key in grouped.columns else 'revenue'
grouped = grouped.sort_values(sort_col, ascending=False).head(15)
# 轉為字典列表
records = []
for _, row in grouped.iterrows():
record = {
'name': str(row['name'])[:50],
'revenue': float(row['revenue']),
'order_count': int(row['order_count'])
}
if 'qty' in row: record['qty'] = float(row['qty'])
if 'profit' in row: record['profit'] = float(row['profit'])
records.append(record)
result[key] = records
return result
def get_taiwan_holiday(date):
"""判斷是否為台灣國定假日,回傳 (is_holiday, holiday_name)"""
year = date.year
month = date.month
day = date.day
# 2026年台灣國定假日根據人事行政總處公佈
holidays_2026 = {
(1, 1): '元旦',
# 春節連假 (2/14-2/22共9天)
(2, 14): '春節連假',
(2, 15): '小年夜',
(2, 16): '除夕',
(2, 17): '春節 (初一)',
(2, 18): '春節 (初二)',
(2, 19): '春節 (初三)',
(2, 20): '春節連假',
(2, 21): '春節連假',
(2, 22): '春節連假',
# 和平紀念日 (2/28-3/2共3天)
(2, 28): '和平紀念日',
(3, 2): '和平紀念日補假',
# 兒童節+清明節 (4/3-4/6共4天)
(4, 3): '兒童節補假',
(4, 4): '清明節',
(4, 5): '清明節連假',
(4, 6): '清明節補假',
# 勞動節 (5/1-5/3共3天)
(5, 1): '勞動節',
# 端午節 (6/19-6/21共3天)
(6, 19): '端午節',
# 中秋節+教師節 (9/25-9/28共4天)
(9, 25): '中秋節',
(9, 28): '教師節',
# 國慶日 (10/9-10/11共3天)
(10, 9): '國慶日補假',
(10, 10): '國慶日',
# 光復節 (10/25-10/26共2天)
(10, 25): '臺灣光復節',
(10, 26): '光復節補假',
# 行憲紀念日 (12/25-12/27共3天)
(12, 25): '行憲紀念日',
}
# 2027年台灣國定假日預先計算部分
holidays_2027 = {
(1, 1): '元旦',
(2, 11): '春節 (除夕)',
(2, 12): '春節 (初一)',
(2, 13): '春節 (初二)',
(2, 14): '春節 (初三)',
(2, 15): '春節 (初四)',
(2, 16): '春節 (初五)',
(2, 17): '春節 (初六)',
(2, 28): '和平紀念日',
(4, 4): '清明節',
(4, 5): '清明節連假',
(6, 14): '端午節',
(9, 21): '中秋節',
(10, 10): '國慶日',
(10, 11): '國慶日連假',
}
holidays = holidays_2026 if year == 2026 else (holidays_2027 if year == 2027 else {})
holiday_name = holidays.get((month, day))
return (True, holiday_name) if holiday_name else (False, None)
def prepare_calendar_data(df, selected_month):
"""準備行事曆數據豐富版顯示總業績、毛利、SKU數 + DoD%"""
import calendar
# 取得該月份的年月
year = selected_month.year
month = selected_month.month
# 計算該月第一天和最後一天
first_day = pd.Timestamp(year=year, month=month, day=1)
last_day = pd.Timestamp(year=year, month=month, day=calendar.monthrange(year, month)[1])
# 計算行事曆顯示範圍(包含前後月份的日期以填滿週)
# 取得該月第一天是星期幾 (0=Monday, 6=Sunday)
first_weekday = first_day.weekday()
# 計算行事曆起始日(從週一開始)
calendar_start = first_day - timedelta(days=first_weekday)
# 計算該月最後一天是星期幾
last_weekday = last_day.weekday()
# 計算行事曆結束日(到週日結束)
calendar_end = last_day + timedelta(days=(6 - last_weekday))
# 取得該月份及前後各一天的所有資料(用於計算 DoD
data_start = first_day - timedelta(days=1)
data_end = last_day
month_df = df[(df['snapshot_date'] >= data_start) & (df['snapshot_date'] <= data_end)]
# 取得欄位
cols = df.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_cost = find_col(cols, ['成本', 'Cost'])
col_profit = find_col(cols, ['毛利', 'Profit'])
col_qty = find_col(cols, ['銷售數量', '銷量', 'Qty', '數量'])
col_name = find_col(cols, ['商品名稱', '品名'])
# 為每一天計算 KPI
calendar_days = []
current_date = calendar_start
while current_date <= calendar_end:
# 取得星期0=週一, 6=週日)
weekday = current_date.weekday()
weekday_names = ['週一', '週二', '週三', '週四', '週五', '週六', '週日']
# 判斷是否為國定假日
is_holiday, holiday_name = get_taiwan_holiday(current_date)
day_data = {
'date': current_date.strftime('%Y-%m-%d'),
'day': current_date.day,
'weekday': weekday_names[weekday],
'is_weekend': weekday >= 5, # 週六或週日
'is_holiday': is_holiday,
'holiday_name': holiday_name,
'is_current_month': current_date.month == month,
'has_data': False,
'revenue': 0,
'profit': 0,
'margin_rate': 0,
'sku_count': 0,
'qty': 0,
'avg_price': 0,
'dod_percent': 0,
'dod_direction': 'neutral' # 'up', 'down', 'neutral'
}
# 如果該日期在當前月份範圍內,計算 KPI
if first_day <= current_date <= last_day:
day_df = month_df[month_df['snapshot_date'] == current_date]
if not day_df.empty:
day_data['has_data'] = True
# 計算總業績
if col_amount:
day_data['revenue'] = float(day_df[col_amount].sum())
# 計算毛利(優先使用毛利欄位,否則用業績-成本計算)
if col_profit:
day_data['profit'] = float(day_df[col_profit].sum())
elif col_cost and col_amount:
total_cost = float(day_df[col_cost].sum())
day_data['profit'] = day_data['revenue'] - total_cost
# 計算毛利率
if day_data['revenue'] > 0:
day_data['margin_rate'] = (day_data['profit'] / day_data['revenue']) * 100
# 計算銷量
if col_qty:
day_data['qty'] = float(day_df[col_qty].sum())
# 計算客單價(總業績 / 總銷量)
if day_data['qty'] > 0:
day_data['avg_price'] = day_data['revenue'] / day_data['qty']
# 計算 SKU 數
if col_name:
day_data['sku_count'] = int(day_df[col_name].nunique())
# 計算 DoD%
prev_date = current_date - timedelta(days=1)
prev_df = month_df[month_df['snapshot_date'] == prev_date]
if not prev_df.empty and col_amount:
prev_revenue = float(prev_df[col_amount].sum())
if prev_revenue > 0:
dod = ((day_data['revenue'] - prev_revenue) / prev_revenue) * 100
day_data['dod_percent'] = round(dod, 1)
day_data['dod_direction'] = 'up' if dod >= 0 else 'down'
calendar_days.append(day_data)
current_date += timedelta(days=1)
# 組織成週結構(每週 7 天)
weeks = []
for i in range(0, len(calendar_days), 7):
weeks.append(calendar_days[i:i+7])
# 計算上個月和下個月的年月
prev_month = selected_month - pd.DateOffset(months=1)
next_month = selected_month + pd.DateOffset(months=1)
return {
'year': year,
'month': month,
'month_name': selected_month.strftime('%Y年%m月'),
'weeks': weeks,
'prev_month': prev_month.strftime('%Y-%m'),
'next_month': next_month.strftime('%Y-%m')
}
# ================= ⚙️ 5. 服務啟動邏輯 =================
def run_schedule():
"""在背景執行緒中運行排程"""
sys_log.info("🚀 排程服務已啟動,等待任務...")
while True:
schedule.run_pending()
time.sleep(1)
def init_scheduler():
"""初始化排程任務Gunicorn 模式下也會執行)"""
schedule.every(1).hours.do(run_momo_task)
schedule.every(1).hours.do(run_edm_task)
schedule.every(1).hours.do(run_festival_task)
sys_log.info(f"📅 已設定每小時執行主站、EDM與購物節爬蟲任務")
schedule.every(30).minutes.do(run_auto_import_task)
sys_log.info(f"📅 已設定每 30 分鐘執行 Google Drive 自動匯入任務")
schedule.every(30).minutes.do(run_whitepage_check)
sys_log.info(f"📅 已設定每 30 分鐘執行網頁白頁監控任務")
schedule.every(4).hours.do(run_competitor_price_feeder_task)
sys_log.info(f"📅 已設定每 4 小時執行 PChome 競品價格抓取任務")
# 啟動排程執行緒
scheduler_thread = threading.Thread(target=run_schedule, daemon=True)
scheduler_thread.start()
sys_log.info("✅ 排程器已在背景執行緒中啟動")
# V-New: 在模組載入時自動初始化排程Gunicorn 模式下也會執行)
# 🚩 V-Fix 2026-01-14: 停用自動排程器以避免多個 gunicorn workers 重複執行任務
# 原因:每個 worker 都會啟動排程器,導致 4x 資源消耗4 workers × 3 爬蟲任務 = 12 Chrome 實例同時運行)
# 解決方案:改用獨立的 run_scheduler.py 或透過 Web UI 手動觸發任務
# try:
# init_scheduler()
# except Exception as e:
# sys_log.error(f"❌ 排程器初始化失敗: {e}")
sys_log.info(" 自動排程器已停用(避免重複執行),請使用 run_scheduler.py 或 Web UI 手動觸發")
def start_flask():
sys_log.info("🚀 Web 服務正在啟動於 port 80...")
app.run(host='0.0.0.0', port=80, use_reloader=False)
def scheduled_job_wrapper():
"""執行 MOMO 爬蟲任務並發送通知"""
timestamp = datetime.now(TAIPEI_TZ).strftime('%H:%M:%S')
sys_log.info(f"⏰ [{timestamp}] 啟動背景抓取執行緒...")
def job():
# 1. 執行爬蟲
run_momo_task()
# 2. 發送通知 (僅發送今日異動)
try:
# 重新載入通知模組
import importlib
import scheduler
import services.notification_manager
importlib.reload(scheduler)
importlib.reload(services.notification_manager)
from services.notification_manager import NotificationManager
stats = get_dashboard_stats()
# 只要有任何異動數據就發送通知
if any(stats.values()):
screenshot_path = scheduler.capture_page_screenshot("http://127.0.0.1/", "momo_dashboard")
NotificationManager().send_momo_report(stats, screenshot_path)
except Exception as e:
sys_log.error(f"[Scheduler] ❌ 發送通知失敗: {e}")
threading.Thread(target=job, daemon=True).start()
if __name__ == "__main__":
banner = f" MOMO 專業數據管理系統 {SYSTEM_VERSION} "
sys_log.info(f"{ '='*20} {banner} {'='*20}")
# 啟動前先檢查資料庫結構
repair_database_schema()
# 使用生產環境域名
public_url = "https://mo.wooo.work"
sys_log.info(f"✅ 使用固定網址: {public_url}")
# 🚩 V9.7 將公開 URL 寫入設定檔,供其他模組使用
try:
url_config_path = os.path.join(BASE_DIR, 'data', 'url_config.json')
with open(url_config_path, 'w') as f:
json.dump({"public_url": public_url}, f)
except Exception as file_err:
sys_log.error(f"⚠️ URL 設定檔寫入失敗 (不影響服務運行,可能磁碟已滿): {file_err}")
web_server = threading.Thread(target=start_flask)
web_server.daemon = True
web_server.start()
# 排程器已在模組載入時自動初始化(見 init_scheduler() 函式)
sys_log.info(" 排程器已在全域範圍初始化完成")
try:
while True:
time.sleep(3600)
except KeyboardInterrupt:
sys_log.info("🔌 Web 服務已關閉")
try:
ngrok.disconnect(public_url)
except Exception as e:
sys_log.info(f" Ngrok 關閉時無需額外操作: {e}")