# ================= TODO LIST (待辦事項 - 重開機後請依序執行) ================= # 1. [驗證] 重啟 app.py 後,重新匯入 Excel,確認「自動去重」功能是否生效 (重複匯入應顯示 0 筆新增)。 # 2. [檢查] 前往 /sales_analysis 頁面,確認 '狀態' 欄位是否正確顯示 'F' (目前為原始匯入模式)。 # 3. [決策] 若資料顯示正常,評估是否需要恢復「智慧資料清理」邏輯 (目前程式碼第 1160 行左右已註解)。 # 4. [備份] 確認系統運作正常後,執行系統備份。 # ======================================================================= import os import sys import time import threading import math import json import hashlib import shutil import zipfile import re import io # V-New: 用於 Excel 匯出 import traceback # V-Fix: 用於錯誤追蹤 from datetime import datetime, timedelta, timezone # ================= 🔧 1. 環境與路徑鎖定 ================= BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 確保專案根目錄在 sys.path 的最前面,優先讀取本地模組 sys.path.insert(0, BASE_DIR) # 自動檢核並建立必要目錄 try: for folder in ['database', 'services', 'crawler', 'logs', 'data', 'web/templates', 'web/static']: folder_path = os.path.join(BASE_DIR, folder) if not os.path.exists(folder_path): os.makedirs(folder_path) # 僅針對 Python 套件目錄建立 __init__.py if 'web' not in folder: init_file = os.path.join(folder_path, '__init__.py') if not os.path.exists(init_file): with open(init_file, 'w') as f: pass except OSError as e: print(f"❌ 系統初始化失敗: 無法建立目錄或檔案 (磁碟可能已滿) - {e}") # ================= 🔧 2. 核心模組導入 ================= try: from flask import Flask, render_template, jsonify, request, send_file, redirect, url_for, send_from_directory, flash, session from werkzeug.utils import secure_filename from pyngrok import ngrok, conf import schedule from sqlalchemy import desc, and_, func, text, literal, case from sqlalchemy import inspect # V-New: 用於檢查資料表是否存在 from sqlalchemy.orm import joinedload import pandas as pd # type: ignore from pandas.api.types import is_numeric_dtype # type: ignore import numpy as np # type: ignore # V-Opt: 引入 numpy 進行向量化運算加速 # 導入自定義模組 try: from scheduler import run_momo_task, run_edm_task, run_festival_task, run_auto_import_task, run_whitepage_check, run_competitor_price_feeder_task from database.manager import DatabaseManager from database.models import Base, Product, PriceRecord, MonthlySummaryAnalysis from database.edm_models import PromoProduct except ImportError as e: print(f"❌ 專案內部檔案缺失: {e}\n請檢查 database/ 或 services/ 目錄下的 .py 檔案是否存在。") sys.exit(1) from services.logger_manager import SystemLogger from services.exporter import Exporter # 🚩 導入匯出模組 except ImportError as e: print(f"❌ 關鍵套件導入失敗: {e}") sys.exit(1) # ================= 🔧 3. 系統核心配置 ================= # 從 config.py 匯入必要的設定 from config import EXCEL_EXPORT_DIR, DATABASE_TYPE, validate_critical_config sys_log = SystemLogger("Web_Server").get_logger() # 驗證選用配置,缺少時輸出 warning(非 fatal) for _warn in validate_critical_config(): sys_log.warning(_warn) # 商品看板 cache 單一來源。實際路由已在 routes/dashboard_routes.py。 from services.cache_manager import _DASHBOARD_DATA_CACHE, _DASHBOARD_CACHE_TTL # noqa: E402 # 🚩 檢查磁碟空間 (V9.52 新增) try: total, used, free = shutil.disk_usage(BASE_DIR) if free < 200 * 1024 * 1024: # 小於 200MB sys_log.critical(f"[System] [DISK_CHECK] 🚨 嚴重警告: 磁碟空間極低 | Free: {free // (1024*1024)} MB") elif free < 1024 * 1024 * 1024: # 小於 1GB sys_log.warning(f"[System] [DISK_CHECK] ⚠️ 警告: 磁碟空間不足 1GB | Free: {free // (1024*1024)} MB") except Exception as e: sys_log.error(f"無法檢測磁碟空間: {e}") # 🚩 系統版本定義 (備份與顯示用) # 🚩 2026-04-30 V10.19: AI metrics zero-baseline export SYSTEM_VERSION = "V10.19" # ========================================== # 🔒 SQL Injection 防護函數 # ========================================== # 允許的資料表白名單 # 安全工具:實作已搬至 utils/security.py,此處 re-export 維持向後相容 from utils.security import ( # noqa: E402 ALLOWED_TABLES, validate_table_name, validate_column_names, ) # 安全工具:路徑遍歷 + 檔案上傳驗證 + safe_read_sql 已搬至 utils/security.py from utils.security import ( # noqa: E402 safe_read_sql, safe_join, ALLOWED_UPLOAD_EXTENSIONS, ALLOWED_MIME_TYPES, secure_filename_unicode, allowed_file, validate_upload_file, ) # 🚩 資料庫結構自動修復 (V9.53 新增) — 實作搬至 database/schema_repair.py from database.schema_repair import repair_database_schema # noqa: E402, F401 # 從環境變數讀取 NGROK_AUTH_TOKEN;未設定時禁止使用硬編碼預設值 NGROK_AUTH_TOKEN = os.getenv('NGROK_AUTH_TOKEN', '') if NGROK_AUTH_TOKEN: conf.get_default().auth_token = NGROK_AUTH_TOKEN else: sys_log.warning("[Security] ⚠️ NGROK_AUTH_TOKEN 未設定,已跳過 ngrok auth token 注入") TEMPLATE_DIR = os.path.join(BASE_DIR, 'templates') STATIC_DIR = os.path.join(BASE_DIR, 'web/static') # 檢查關鍵模板是否存在 if not os.path.exists(os.path.join(TEMPLATE_DIR, 'dashboard.html')): sys_log.warning(f"[Web] [Template] ⚠️ 警告: 找不到 dashboard.html | Path: {TEMPLATE_DIR}") app = Flask(__name__, template_folder=TEMPLATE_DIR, static_folder=STATIC_DIR) # ========================================== # 🔒 Flask 安全配置 # ========================================== # 從 config.py 導入 SECRET_KEY from config import SECRET_KEY # 基本配置 app.config['SECRET_KEY'] = SECRET_KEY # Session 安全配置 app.config['SESSION_COOKIE_HTTPONLY'] = True # 防止 JavaScript 存取 cookie(防 XSS) app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # 防止 CSRF 攻擊 app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=24) # Session 有效期 24 小時(延長避免長時間閒置斷線) # 如果使用 HTTPS,啟用 SECURE cookie(本地開發時應設為 False) # 注意:如果您的系統部署在 HTTPS 環境,請將 .env 中的 USE_HTTPS 設為 true USE_HTTPS = os.getenv('USE_HTTPS', 'false').lower() == 'true' if USE_HTTPS: app.config['SESSION_COOKIE_SECURE'] = True sys_log.info("[Security] ✅ HTTPS 模式已啟用,Session cookie 僅透過 HTTPS 傳輸") else: app.config['SESSION_COOKIE_SECURE'] = False sys_log.warning("[Security] ⚠️ HTTP 模式(開發環境),Session cookie 未強制 HTTPS") # 檔案上傳大小限制(10MB) # V-New: 提高檔案上傳大小限制 (從 10MB 提高到 100MB) app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024 sys_log.info("[Security] ✅ Flask 安全配置已載入") sys_log.info(f"[Security] • Session 有效期: 2 小時") sys_log.info(f"[Security] • 檔案上傳限制: 10 MB") sys_log.info(f"[Security] • CSRF 防護: SameSite=Lax") sys_log.info(f"[Security] • XSS 防護: HttpOnly=True") # ========================================== # 🔒 CSRF 防護配置 # ========================================== from flask_wtf.csrf import CSRFProtect csrf = CSRFProtect(app) sys_log.info("[Security] ✅ CSRF 防護已啟用 (Flask-WTF)") # ========================================== # 🔧 Blueprint 註冊 - 廠商缺貨系統 # ========================================== from routes.vendor_routes import vendor_bp app.register_blueprint(vendor_bp) sys_log.info("[Blueprint] ✅ 廠商缺貨系統 Blueprint 已註冊") # ========================================== # 🔧 Blueprint 註冊 - Google Drive 自動匯入 # ========================================== from routes.auto_import_routes import auto_import_bp app.register_blueprint(auto_import_bp) csrf.exempt(auto_import_bp) sys_log.info("[Blueprint] ✅ Google Drive 自動匯入 Blueprint 已註冊 (CSRF 已豁免)") # ========================================== # 🔧 Blueprint 註冊 - 爬蟲管理系統 # ========================================== from routes.crawler_management_routes import crawler_bp app.register_blueprint(crawler_bp) sys_log.info("[Blueprint] ✅ 爬蟲管理系統 Blueprint 已註冊") # ========================================== # 🔧 Blueprint 註冊 - AI 智慧文案系統 # ========================================== from routes.ai_routes import ai_bp app.register_blueprint(ai_bp) csrf.exempt(ai_bp) # ICAIM API 使用內部呼叫,不需要 CSRF sys_log.info("[Blueprint] ✅ AI 智慧文案系統 Blueprint 已註冊") # ========================================== # 🔧 Blueprint 註冊 - CI/CD Dashboard # ========================================== from routes.cicd_routes import cicd_bp app.register_blueprint(cicd_bp) csrf.exempt(cicd_bp) # CI/CD API doesn't need CSRF sys_log.info("[Blueprint] CI/CD Dashboard Blueprint registered") # ========================================== # 🔧 Blueprint 註冊 - Code Review 系統 # ========================================== try: from routes.code_review_routes import code_review_bp app.register_blueprint(code_review_bp) csrf.exempt(code_review_bp) # Code Review API 使用內部認證,不需要 CSRF sys_log.info("[Blueprint] ✅ Code Review 系統 Blueprint 已註冊 (CSRF 已豁免)") except Exception as _e: sys_log.warning(f"[Blueprint] ⚠️ Code Review 系統 Blueprint 註冊失敗: {_e}") # ========================================== # 🔧 Blueprint 註冊 - 趨勢資料系統 # ========================================== from routes.trend_routes import trend_bp app.register_blueprint(trend_bp) sys_log.info("[Blueprint] ✅ 趨勢資料系統 Blueprint 已註冊") # ========================================== # 🔒 Auth 路由註冊 - 登入/登出 # ========================================== from auth import init_auth_routes, login_required init_auth_routes(app) sys_log.info("[Auth] ✅ 登入/登出路由已註冊") # ========================================== # 🔧 Blueprint 註冊 - 用戶管理系統 # ========================================== from routes.user_routes import user_bp app.register_blueprint(user_bp) sys_log.info("[Blueprint] ✅ 用戶管理系統 Blueprint 已註冊") # ========================================== # 🚨 Blueprint 註冊 - 系統告警 # ========================================== from routes.alert_routes import alert_bp app.register_blueprint(alert_bp) csrf.exempt(alert_bp) sys_log.info("[Blueprint] ✅ 系統告警 Blueprint 已註冊 (CSRF 已豁免)") # ========================================== # 系統管理路由 Blueprint # ========================================== from routes.system_routes import system_bp app.register_blueprint(system_bp) csrf.exempt(system_bp) # n8n API 需要豁免 CSRF sys_log.info("[Blueprint] ✅ 系統管理 Blueprint 已註冊 (CSRF 已豁免)") from routes.system_public_routes import system_public_bp app.register_blueprint(system_public_bp) sys_log.info("[Blueprint] ✅ 公開系統頁面 Blueprint 已註冊") from routes.category_routes import category_bp app.register_blueprint(category_bp) sys_log.info("[Blueprint] ✅ 分類 CRUD Blueprint 已註冊") from routes.misc_routes import misc_bp app.register_blueprint(misc_bp) sys_log.info("[Blueprint] ✅ 雜項 Routes Blueprint 已註冊 (/api/test_url, /brand_assets)") # ========================================== # 通知模板管理 Blueprint # ========================================== from routes.notification_routes import notification_bp app.register_blueprint(notification_bp) csrf.exempt(notification_bp) # n8n API 需要豁免 CSRF sys_log.info("[Blueprint] ✅ 通知模板管理 Blueprint 已註冊") # ========================================== # Bot API Blueprint (Clawdbot 整合) # ========================================== from routes.bot_api_routes import bot_api_bp app.register_blueprint(bot_api_bp) csrf.exempt(bot_api_bp) # Bot API 使用 Token 認證,不需要 CSRF sys_log.info("[Blueprint] ✅ Bot API Blueprint 已註冊") # ========================================== # Elephant Alpha AI Agent Super Orchestrator Blueprint # ========================================== try: from routes.elephant_alpha_routes import elephant_alpha_bp app.register_blueprint(elephant_alpha_bp) csrf.exempt(elephant_alpha_bp) # Elephant Alpha API uses internal auth sys_log.info("[Blueprint] Elephant Alpha AI Agent Super Orchestrator Blueprint registered") except Exception as _e: sys_log.warning(f"[Blueprint] Elephant Alpha registration failed: {_e}") sys_log.info("[Blueprint] Elephant Alpha features will be unavailable") # [2026-04-18 台北] OpenClaw Bot Blueprint — 修復 /menu 啞巴 (/bot/telegram/webhook 404) # 原因:routes/openclaw_bot_routes.py 有 5000+ 行完整 telegram bot handler,但 app.py 從未 register # 效果:Telegram 送進來的 update (包含 /menu) 能被正確接收與處理 try: from routes.openclaw_bot_routes import openclaw_bot_bp app.register_blueprint(openclaw_bot_bp) csrf.exempt(openclaw_bot_bp) # Telegram webhook 不需要 CSRF sys_log.info("[Blueprint] ✅ OpenClaw Bot Blueprint 已註冊 (Telegram /menu 復活)") except Exception as _e: sys_log.error(f"[Blueprint] ❌ OpenClaw Bot Blueprint 註冊失敗: {_e}") from routes.api_routes import api_bp app.register_blueprint(api_bp) sys_log.info("[Blueprint] ✅ api_bp 已註冊") from routes.edm_routes import edm_bp app.register_blueprint(edm_bp) sys_log.info("[Blueprint] ✅ edm_bp 已註冊") from routes.sales_routes import sales_bp app.register_blueprint(sales_bp) sys_log.info("[Blueprint] ✅ sales_bp 已註冊") from routes.monthly_routes import monthly_bp app.register_blueprint(monthly_bp) sys_log.info("[Blueprint] ✅ monthly_bp 已註冊") from routes.price_comparison_routes import price_comparison_bp app.register_blueprint(price_comparison_bp) sys_log.info("[Blueprint] ✅ price_comparison_bp 已註冊") from routes.export_routes import export_bp app.register_blueprint(export_bp) sys_log.info("[Blueprint] ✅ export_bp 已註冊") from routes.daily_sales_routes import daily_sales_bp app.register_blueprint(daily_sales_bp) sys_log.info("[Blueprint] ✅ daily_sales_bp 已註冊") from routes.dashboard_routes import dashboard_bp app.register_blueprint(dashboard_bp) sys_log.info("[Blueprint] ✅ dashboard_bp 已註冊") from routes.import_routes import import_bp app.register_blueprint(import_bp) sys_log.info("[Blueprint] ✅ import_bp 已註冊") from routes.pchome_routes import pchome_bp app.register_blueprint(pchome_bp) sys_log.info("[Blueprint] ✅ pchome_bp 已註冊") # V-Fix: 註冊 slugify 函數供模板使用(實作搬至 utils/text_helpers.py) from utils.text_helpers import slugify # noqa: E402 LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = "服務啟動中..." # 🚩 時區設定:台北時間 (UTC+8) TAIPEI_TZ = timezone(timedelta(hours=8)) EXPECTED_METADATA_TABLES = { 'categories', 'products', 'price_records', 'monthly_summary_analysis', 'users', 'login_history', 'permissions', 'user_permissions', 'promo_products', 'trend_records', 'trend_keywords', 'trend_analysis', 'web_search_cache', 'telegram_users', 'ai_generation_history', 'ai_prompt_templates', 'ai_usage_tracking', 'ai_insights', 'agent_context', 'action_plans', 'action_outcomes', 'agent_strategy_weights', 'incidents', 'playbooks', 'heal_logs', 'import_jobs', 'import_config', 'notification_templates', 'ppt_reports', 'vendor_stockout', 'vendor_list', 'vendor_emails', 'email_send_log', 'realtime_sales_monthly', } def verify_metadata_tables(): missing = EXPECTED_METADATA_TABLES - set(Base.metadata.tables.keys()) if missing: raise SystemExit(f"Base.metadata 漏表: {sorted(missing)}") verify_metadata_tables() # ========================================== # 🔧 全域模板變數注入 (Context Processor) # ========================================== from config import METABASE_URL, GRIST_URL @app.context_processor def inject_global_vars(): """注入全域變數到所有模板""" return { 'metabase_url': METABASE_URL, 'grist_url': GRIST_URL, 'datetime_now': datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M:%S'), } sys_log.info("[Template] ✅ 全域模板變數已注入 (metabase_url, grist_url)") # ================= 🛠️ V9.72: 分類設定管理核心 ================= CATEGORIES_JSON_PATH = os.path.join(BASE_DIR, 'data', 'categories.json') # JSON 持久化:實作搬至 services/json_storage.py from services.json_storage import ( # noqa: E402, F401 load_categories, save_categories, load_scheduler_stats, ) # ================= 🛠️ 數據處理核心 (封裝) ================= # 純工具:實作已搬至 utils/text_helpers.py from utils.text_helpers import ( # noqa: E402 get_color_for_string, extract_snapshot_date_from_filename, number_format as _number_format, ) @app.template_filter('number_format') def number_format_filter(value): """Jinja filter wrapper — 實作見 utils.text_helpers.number_format。""" return _number_format(value) # V-Refactor: 將 find_col 移至全域,方便多個函式共用 from utils.df_helpers import find_col # noqa: E402 def get_consolidated_data(): """🚩 統一封裝:獲取全分類去重後的當前數據、昨日對比及差值 (V-Opt: 優化查詢效能 + 快取)""" global _DASHBOARD_DATA_CACHE # V-New: 檢查快取是否有效 now = datetime.now(TAIPEI_TZ) if (_DASHBOARD_DATA_CACHE['consolidated_data'] is not None and _DASHBOARD_DATA_CACHE['consolidated_timestamp'] is not None): cache_age = (now.timestamp() - _DASHBOARD_DATA_CACHE['consolidated_timestamp']) if cache_age < _DASHBOARD_CACHE_TTL: sys_log.debug(f"[Dashboard] [Cache] ✅ 使用快取資料 | 快取年齡: {cache_age:.1f}秒") return _DASHBOARD_DATA_CACHE['consolidated_data'], _DASHBOARD_DATA_CACHE['today_start'] sys_log.debug("[Dashboard] [Cache] 🔄 快取過期或不存在,重新查詢資料庫") db = DatabaseManager() session = db.get_session() today_start = now.replace(hour=0, minute=0, second=0, microsecond=0).replace(tzinfo=None) seven_days_ago = today_start - timedelta(days=7) thirty_days_ago = today_start - timedelta(days=30) try: # Query 1: Get the latest price record for every product. This is our main list of items. latest_price_subq = session.query( func.max(PriceRecord.id).label('max_id') ).group_by(PriceRecord.product_id).subquery() latest_records = session.query(PriceRecord).options( joinedload(PriceRecord.product) ).join(latest_price_subq, PriceRecord.id == latest_price_subq.c.max_id).all() product_ids = [r.product_id for r in latest_records] if not product_ids: session.close() # 提前關閉連線 return [], today_start # Query 2: Get yesterday's closing prices for all products in one go yesterday_prices_subq = session.query( PriceRecord.product_id, func.max(PriceRecord.id).label('max_id') ).filter( PriceRecord.product_id.in_(product_ids), PriceRecord.timestamp < today_start ).group_by(PriceRecord.product_id).subquery() yesterday_prices_q = session.query( PriceRecord.product_id, PriceRecord.price ).join( yesterday_prices_subq, PriceRecord.id == yesterday_prices_subq.c.max_id ) yesterday_prices_map = {pid: price for pid, price in yesterday_prices_q} # Query 3: Get specific historical price points (7 days ago and 30 days ago) # Instead of fetching ALL history, we fetch only the records closest to the target dates. # This is a significant optimization. # Helper to get price map for a specific date (start of day) def get_price_map_before(target_date): subq = session.query( PriceRecord.product_id, func.max(PriceRecord.timestamp).label('max_ts') ).filter( PriceRecord.product_id.in_(product_ids), PriceRecord.timestamp < target_date ).group_by(PriceRecord.product_id).subquery() q = session.query(PriceRecord.product_id, PriceRecord.price).join( subq, and_(PriceRecord.product_id == subq.c.product_id, PriceRecord.timestamp == subq.c.max_ts) ) return {pid: price for pid, price in q} prices_7d_ago_map = get_price_map_before(seven_days_ago + timedelta(days=1)) # Approximate 7 days ago prices_30d_ago_map = get_price_map_before(thirty_days_ago + timedelta(days=1)) # Approximate 30 days ago # Query 4: Get TODAY's records only (for sparkline/intraday change) today_records_q = session.query(PriceRecord).filter( PriceRecord.product_id.in_(product_ids), PriceRecord.timestamp >= today_start ).order_by(PriceRecord.product_id, PriceRecord.timestamp).all() today_map = {} for r in today_records_q: if r.product_id not in today_map: today_map[r.product_id] = [] today_map[r.product_id].append(r) # Final Assembly (in-memory, no more DB queries) unique_items = [] for r in latest_records: pid = r.product_id # 7d/30d stats price_7d = prices_7d_ago_map.get(pid) price_30d = prices_30d_ago_map.get(pid) stats_7d_diff = r.price - price_7d if price_7d is not None else 0 stats_30d_diff = r.price - price_30d if price_30d is not None else 0 # Today's stats today_records = today_map.get(pid, []) today_diff = 0 today_changes = [] if len(today_records) > 1: today_diff = today_records[-1].price - today_records[0].price # Yesterday diff y_price = yesterday_prices_map.get(pid) yesterday_diff = r.price - y_price if y_price is not None else 0 status = "NONE" if yesterday_diff > 0: status = "PRICE_UP" elif yesterday_diff < 0: status = "PRICE_DOWN" # Today's changes details last_p = y_price if y_price is not None else (today_records[0].price if today_records else r.price) for tr in today_records: if tr.price != last_p: diff = tr.price - last_p today_changes.append({ 'time': tr.timestamp.strftime('%H:%M'), 'price': tr.price, 'diff': diff }) last_p = tr.price unique_items.append({ 'record': r, 'stats': {'7d_diff': stats_7d_diff, '30d_diff': stats_30d_diff, '1d_diff': today_diff}, 'yesterday_diff': yesterday_diff, 'today_changes': today_changes, 'status': status }) # V-New: 更新快取 _DASHBOARD_DATA_CACHE['consolidated_data'] = unique_items _DASHBOARD_DATA_CACHE['consolidated_timestamp'] = now.timestamp() _DASHBOARD_DATA_CACHE['today_start'] = today_start sys_log.debug(f"[Dashboard] [Cache] 💾 快取已更新 | 商品數: {len(unique_items)}") return unique_items, today_start finally: session.close() def get_dashboard_stats(): """計算看板統計數據 (供通知使用) — backward-compat wrapper.""" from services.dashboard_service import get_dashboard_stats as _get_dashboard_stats return _get_dashboard_stats() # ================= 🛣️ 4. Flask 路由 ================= # Session 自動續期機制 @app.before_request def refresh_session(): """ 在每次請求時自動刷新 Session,避免長時間閒置後突然斷線 只要用戶有任何操作,Session 就會自動延長 """ if session.get('logged_in'): session.modified = True # 標記 Session 已修改,觸發 Cookie 更新 def verify_unique_routes(): """啟動期防線:同一 URL + method 不得由兩個 endpoint 同時註冊。""" seen = {} for rule in app.url_map.iter_rules(): key = (str(rule), frozenset(rule.methods - {'HEAD', 'OPTIONS'})) if key in seen: raise SystemExit(f"重複路由: {key} 來自 {seen[key]} 與 {rule.endpoint}") seen[key] = rule.endpoint verify_unique_routes() def preprocess_daily_sales_data(df): """前處理當日業績資料:欄位識別、型別轉換""" cols = df.columns.tolist() # 欄位自動識別(使用現有的 find_col 函式) col_amount = find_col(cols, ['銷售金額', '業績', '金額', 'Amount', '總業績']) col_cost = find_col(cols, ['成本', 'Cost', '總成本']) col_profit = find_col(cols, ['毛利', 'Profit']) col_qty = find_col(cols, ['銷售數量', '銷量', 'Qty', '數量']) # 型別轉換 if col_amount: df[col_amount] = pd.to_numeric(df[col_amount], errors='coerce').fillna(0) if col_cost: df[col_cost] = pd.to_numeric(df[col_cost], errors='coerce').fillna(0) if col_profit: df[col_profit] = pd.to_numeric(df[col_profit], errors='coerce').fillna(0) if col_qty: df[col_qty] = pd.to_numeric(df[col_qty], errors='coerce').fillna(0) # 日期轉換 df['snapshot_date'] = pd.to_datetime(df['snapshot_date'], errors='coerce') return df def calculate_daily_kpis(df, date_str): """計算單日 6 個 KPI""" day_df = df[df['snapshot_date'] == date_str] cols = day_df.columns.tolist() col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績']) col_cost = find_col(cols, ['成本', 'Cost', '總成本']) col_profit = find_col(cols, ['毛利', 'Profit']) col_qty = find_col(cols, ['銷售數量', '銷量', '數量']) col_name = find_col(cols, ['商品名稱', '品名', 'Name']) total_revenue = float(day_df[col_amount].sum()) if col_amount else 0 total_cost = float(day_df[col_cost].sum()) if col_cost else 0 gross_margin = float(day_df[col_profit].sum()) if col_profit else (total_revenue - total_cost) total_qty = float(day_df[col_qty].sum()) if col_qty else 0 sku_count = int(day_df[col_name].nunique()) if col_name else 0 avg_price = total_revenue / total_qty if total_qty > 0 else 0 return { 'total_revenue': total_revenue, 'total_cost': total_cost, 'gross_margin': gross_margin, 'total_qty': total_qty, 'sku_count': sku_count, 'avg_price': avg_price } def calculate_dod(df, current_date): """計算 Day-over-Day 變化率""" current = calculate_daily_kpis(df, current_date) prev_date = current_date - timedelta(days=1) if prev_date not in df['snapshot_date'].values: return {k: 0.0 for k in current.keys()} previous = calculate_daily_kpis(df, prev_date) dod = {} for key in current: if previous[key] > 0: dod[key] = ((current[key] - previous[key]) / previous[key]) * 100 else: dod[key] = 0.0 return dod def calculate_wow(df, current_date): """計算 Week-over-Week 變化率""" current = calculate_daily_kpis(df, current_date) prev_week_date = current_date - timedelta(days=7) if prev_week_date not in df['snapshot_date'].values: return {k: 0.0 for k in current.keys()} previous = calculate_daily_kpis(df, prev_week_date) wow = {} for key in current: if previous[key] > 0: wow[key] = ((current[key] - previous[key]) / previous[key]) * 100 else: wow[key] = 0.0 return wow def prepare_daily_charts(df, selected_date, days=30): """準備 4 個圖表的數據(根據選擇的日期)""" # 取選擇日期前 N 天的數據 start_date = selected_date - timedelta(days=days) df_range = df[(df['snapshot_date'] >= start_date) & (df['snapshot_date'] <= selected_date)] # 按日期聚合 cols = df_range.columns.tolist() col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績']) col_cost = find_col(cols, ['成本', '總成本']) col_profit = find_col(cols, ['毛利']) col_qty = find_col(cols, ['銷售數量', '銷量', '數量']) col_name = find_col(cols, ['商品名稱', '品名']) # 日期聚合 agg_dict = {} if col_amount: agg_dict[col_amount] = 'sum' if col_cost: agg_dict[col_cost] = 'sum' if col_profit: agg_dict[col_profit] = 'sum' if col_qty: agg_dict[col_qty] = 'sum' daily_agg = df_range.groupby('snapshot_date').agg(agg_dict).reset_index() # 計算或取得毛利(如果沒有毛利欄位,用業績-成本計算) if col_profit and col_profit in daily_agg.columns: daily_agg['profit'] = daily_agg[col_profit] elif col_amount and col_cost and col_amount in daily_agg.columns and col_cost in daily_agg.columns: daily_agg['profit'] = daily_agg[col_amount] - daily_agg[col_cost] else: daily_agg['profit'] = 0 # 計算客單價 if col_amount and col_qty and col_amount in daily_agg.columns and col_qty in daily_agg.columns: daily_agg['avg_price'] = (daily_agg[col_amount] / daily_agg[col_qty]).fillna(0) else: daily_agg['avg_price'] = 0 # 計算 DoD (Day-over-Day) 變化率 - 多個維度 if col_amount and col_amount in daily_agg.columns: daily_agg['dod_revenue'] = daily_agg[col_amount].pct_change() * 100 if 'profit' in daily_agg.columns: daily_agg['dod_profit'] = daily_agg['profit'].pct_change() * 100 if 'avg_price' in daily_agg.columns: daily_agg['dod_avg_price'] = daily_agg['avg_price'].pct_change() * 100 if col_qty and col_qty in daily_agg.columns: daily_agg['dod_qty'] = daily_agg[col_qty].pct_change() * 100 # 計算 WoW (Week-over-Week) 變化率 - 多個維度 if col_amount and col_amount in daily_agg.columns: daily_agg['wow_revenue'] = daily_agg[col_amount].pct_change(periods=7) * 100 if 'profit' in daily_agg.columns: daily_agg['wow_profit'] = daily_agg['profit'].pct_change(periods=7) * 100 if 'avg_price' in daily_agg.columns: daily_agg['wow_avg_price'] = daily_agg['avg_price'].pct_change(periods=7) * 100 if col_qty and col_qty in daily_agg.columns: daily_agg['wow_qty'] = daily_agg[col_qty].pct_change(periods=7) * 100 # Top 10 商品(選擇的日期,包含廠商) selected_df = df[df['snapshot_date'] == selected_date] top10_labels = [] top10_values = [] if col_name and col_amount: col_vendor = find_col(cols, ['廠商名稱', '廠商', 'Vendor', 'Supplier']) if col_vendor: # 如果有廠商欄位,按商品+廠商聚合 top10_df = selected_df.groupby([col_name, col_vendor])[col_amount].sum().nlargest(10).reset_index() top10_labels = [f"{row[col_name]} ({row[col_vendor]})" for _, row in top10_df.iterrows()] top10_values = top10_df[col_amount].tolist() else: # 沒有廠商欄位,只按商品聚合 top10 = selected_df.groupby(col_name)[col_amount].sum().nlargest(10) top10_labels = top10.index.tolist() top10_values = top10.values.tolist() return { 'labels': daily_agg['snapshot_date'].dt.strftime('%m/%d').tolist() if not daily_agg.empty else [], 'revenue': daily_agg[col_amount].tolist() if col_amount and col_amount in daily_agg.columns and not daily_agg.empty else [], 'cost': daily_agg[col_cost].tolist() if col_cost and col_cost in daily_agg.columns and not daily_agg.empty else [], 'profit': daily_agg['profit'].tolist() if 'profit' in daily_agg.columns and not daily_agg.empty else [], 'qty': daily_agg[col_qty].tolist() if col_qty and col_qty in daily_agg.columns and not daily_agg.empty else [], 'avg_price': daily_agg['avg_price'].tolist() if 'avg_price' in daily_agg.columns and not daily_agg.empty else [], # DoD 多維度 'dod_revenue': daily_agg['dod_revenue'].fillna(0).tolist() if 'dod_revenue' in daily_agg.columns and not daily_agg.empty else [], 'dod_profit': daily_agg['dod_profit'].fillna(0).tolist() if 'dod_profit' in daily_agg.columns and not daily_agg.empty else [], 'dod_avg_price': daily_agg['dod_avg_price'].fillna(0).tolist() if 'dod_avg_price' in daily_agg.columns and not daily_agg.empty else [], 'dod_qty': daily_agg['dod_qty'].fillna(0).tolist() if 'dod_qty' in daily_agg.columns and not daily_agg.empty else [], # WoW 多維度 'wow_revenue': daily_agg['wow_revenue'].fillna(0).tolist() if 'wow_revenue' in daily_agg.columns and not daily_agg.empty else [], 'wow_profit': daily_agg['wow_profit'].fillna(0).tolist() if 'wow_profit' in daily_agg.columns and not daily_agg.empty else [], 'wow_avg_price': daily_agg['wow_avg_price'].fillna(0).tolist() if 'wow_avg_price' in daily_agg.columns and not daily_agg.empty else [], 'wow_qty': daily_agg['wow_qty'].fillna(0).tolist() if 'wow_qty' in daily_agg.columns and not daily_agg.empty else [], 'top10_labels': top10_labels, 'top10_values': top10_values } def prepare_category_summary(df, date_str=None, is_month_view=False, month_start=None, month_end=None): """準備分類聚合列表 (支援單日或月度範圍)""" if is_month_view and month_start is not None and month_end is not None: day_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)] else: day_df = df[df['snapshot_date'] == date_str] cols = day_df.columns.tolist() col_category = find_col(cols, ['館別', '分類', 'Category']) col_vendor = find_col(cols, ['廠商名稱', '廠商', 'Vendor', 'Supplier']) col_amount = find_col(cols, ['銷售金額', '業績', '總業績']) col_cost = find_col(cols, ['成本', '總成本']) col_profit = find_col(cols, ['毛利']) col_qty = find_col(cols, ['銷售數量', '銷量', '數量']) col_name = find_col(cols, ['商品名稱', '品名']) if not col_category or not col_amount: return [] # 分類 + 廠商聚合 agg_dict = {col_amount: 'sum'} if col_cost: agg_dict[col_cost] = 'sum' if col_profit: agg_dict[col_profit] = 'sum' if col_qty: agg_dict[col_qty] = 'sum' if col_name: agg_dict[col_name] = 'nunique' # 如果有廠商欄位,按分類+廠商聚合;否則只按分類聚合 if col_vendor: category_df = day_df.groupby([col_category, col_vendor]).agg(agg_dict).reset_index() else: category_df = day_df.groupby(col_category).agg(agg_dict).reset_index() # 計算毛利(如果資料中沒有毛利欄位,自動計算) if col_profit and col_profit in category_df.columns: # 資料中有毛利欄位,直接使用 pass elif col_amount and col_cost and col_amount in category_df.columns and col_cost in category_df.columns: # 資料中沒有毛利欄位,用 業績 - 成本 計算 category_df['profit_calculated'] = category_df[col_amount] - category_df[col_cost] col_profit = 'profit_calculated' else: col_profit = None # 計算毛利率 if col_profit and col_profit in category_df.columns and col_amount and col_amount in category_df.columns: category_df['margin_rate'] = (category_df[col_profit] / category_df[col_amount] * 100).fillna(0) else: category_df['margin_rate'] = 0 # 計算均價 if col_qty and col_amount: category_df['avg_price'] = (category_df[col_amount] / category_df[col_qty]).fillna(0) else: category_df['avg_price'] = 0 # 重新命名欄位以便模板使用 rename_dict = {col_category: 'category', col_amount: 'revenue'} if col_vendor: rename_dict[col_vendor] = 'vendor' if col_cost: rename_dict[col_cost] = 'cost' if col_profit and col_profit in category_df.columns: rename_dict[col_profit] = 'profit' if col_qty: rename_dict[col_qty] = 'qty' if col_name: rename_dict[col_name] = 'sku_count' category_df = category_df.rename(columns=rename_dict) # 確保 profit 欄位存在,如果不存在則設為 0 if 'profit' not in category_df.columns: category_df['profit'] = 0 # 轉為字典列表 return category_df.to_dict('records') # V-New 2026-01-15: 行銷活動業績聚合函數 def get_taiwan_holiday(date): """判斷是否為台灣國定假日,回傳 (is_holiday, holiday_name)""" year = date.year month = date.month day = date.day # 2026年台灣國定假日(根據人事行政總處公佈) holidays_2026 = { (1, 1): '元旦', # 春節連假 (2/14-2/22,共9天) (2, 14): '春節連假', (2, 15): '小年夜', (2, 16): '除夕', (2, 17): '春節 (初一)', (2, 18): '春節 (初二)', (2, 19): '春節 (初三)', (2, 20): '春節連假', (2, 21): '春節連假', (2, 22): '春節連假', # 和平紀念日 (2/28-3/2,共3天) (2, 28): '和平紀念日', (3, 2): '和平紀念日補假', # 兒童節+清明節 (4/3-4/6,共4天) (4, 3): '兒童節補假', (4, 4): '清明節', (4, 5): '清明節連假', (4, 6): '清明節補假', # 勞動節 (5/1-5/3,共3天) (5, 1): '勞動節', # 端午節 (6/19-6/21,共3天) (6, 19): '端午節', # 中秋節+教師節 (9/25-9/28,共4天) (9, 25): '中秋節', (9, 28): '教師節', # 國慶日 (10/9-10/11,共3天) (10, 9): '國慶日補假', (10, 10): '國慶日', # 光復節 (10/25-10/26,共2天) (10, 25): '臺灣光復節', (10, 26): '光復節補假', # 行憲紀念日 (12/25-12/27,共3天) (12, 25): '行憲紀念日', } # 2027年台灣國定假日(預先計算部分) holidays_2027 = { (1, 1): '元旦', (2, 11): '春節 (除夕)', (2, 12): '春節 (初一)', (2, 13): '春節 (初二)', (2, 14): '春節 (初三)', (2, 15): '春節 (初四)', (2, 16): '春節 (初五)', (2, 17): '春節 (初六)', (2, 28): '和平紀念日', (4, 4): '清明節', (4, 5): '清明節連假', (6, 14): '端午節', (9, 21): '中秋節', (10, 10): '國慶日', (10, 11): '國慶日連假', } holidays = holidays_2026 if year == 2026 else (holidays_2027 if year == 2027 else {}) holiday_name = holidays.get((month, day)) return (True, holiday_name) if holiday_name else (False, None) def prepare_calendar_data(df, selected_month): """準備行事曆數據(豐富版:顯示總業績、毛利、SKU數 + DoD%)""" import calendar # 取得該月份的年月 year = selected_month.year month = selected_month.month # 計算該月第一天和最後一天 first_day = pd.Timestamp(year=year, month=month, day=1) last_day = pd.Timestamp(year=year, month=month, day=calendar.monthrange(year, month)[1]) # 計算行事曆顯示範圍(包含前後月份的日期以填滿週) # 取得該月第一天是星期幾 (0=Monday, 6=Sunday) first_weekday = first_day.weekday() # 計算行事曆起始日(從週一開始) calendar_start = first_day - timedelta(days=first_weekday) # 計算該月最後一天是星期幾 last_weekday = last_day.weekday() # 計算行事曆結束日(到週日結束) calendar_end = last_day + timedelta(days=(6 - last_weekday)) # 取得該月份及前後各一天的所有資料(用於計算 DoD) data_start = first_day - timedelta(days=1) data_end = last_day month_df = df[(df['snapshot_date'] >= data_start) & (df['snapshot_date'] <= data_end)] # 取得欄位 cols = df.columns.tolist() col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績']) col_cost = find_col(cols, ['成本', 'Cost']) col_profit = find_col(cols, ['毛利', 'Profit']) col_qty = find_col(cols, ['銷售數量', '銷量', 'Qty', '數量']) col_name = find_col(cols, ['商品名稱', '品名']) # 為每一天計算 KPI calendar_days = [] current_date = calendar_start while current_date <= calendar_end: # 取得星期(0=週一, 6=週日) weekday = current_date.weekday() weekday_names = ['週一', '週二', '週三', '週四', '週五', '週六', '週日'] # 判斷是否為國定假日 is_holiday, holiday_name = get_taiwan_holiday(current_date) day_data = { 'date': current_date.strftime('%Y-%m-%d'), 'day': current_date.day, 'weekday': weekday_names[weekday], 'is_weekend': weekday >= 5, # 週六或週日 'is_holiday': is_holiday, 'holiday_name': holiday_name, 'is_current_month': current_date.month == month, 'has_data': False, 'revenue': 0, 'profit': 0, 'margin_rate': 0, 'sku_count': 0, 'qty': 0, 'avg_price': 0, 'dod_percent': 0, 'dod_direction': 'neutral' # 'up', 'down', 'neutral' } # 如果該日期在當前月份範圍內,計算 KPI if first_day <= current_date <= last_day: day_df = month_df[month_df['snapshot_date'] == current_date] if not day_df.empty: day_data['has_data'] = True # 計算總業績 if col_amount: day_data['revenue'] = float(day_df[col_amount].sum()) # 計算毛利(優先使用毛利欄位,否則用業績-成本計算) if col_profit: day_data['profit'] = float(day_df[col_profit].sum()) elif col_cost and col_amount: total_cost = float(day_df[col_cost].sum()) day_data['profit'] = day_data['revenue'] - total_cost # 計算毛利率 if day_data['revenue'] > 0: day_data['margin_rate'] = (day_data['profit'] / day_data['revenue']) * 100 # 計算銷量 if col_qty: day_data['qty'] = float(day_df[col_qty].sum()) # 計算客單價(總業績 / 總銷量) if day_data['qty'] > 0: day_data['avg_price'] = day_data['revenue'] / day_data['qty'] # 計算 SKU 數 if col_name: day_data['sku_count'] = int(day_df[col_name].nunique()) # 計算 DoD% prev_date = current_date - timedelta(days=1) prev_df = month_df[month_df['snapshot_date'] == prev_date] if not prev_df.empty and col_amount: prev_revenue = float(prev_df[col_amount].sum()) if prev_revenue > 0: dod = ((day_data['revenue'] - prev_revenue) / prev_revenue) * 100 day_data['dod_percent'] = round(dod, 1) day_data['dod_direction'] = 'up' if dod >= 0 else 'down' calendar_days.append(day_data) current_date += timedelta(days=1) # 組織成週結構(每週 7 天) weeks = [] for i in range(0, len(calendar_days), 7): weeks.append(calendar_days[i:i+7]) # 計算上個月和下個月的年月 prev_month = selected_month - pd.DateOffset(months=1) next_month = selected_month + pd.DateOffset(months=1) return { 'year': year, 'month': month, 'month_name': selected_month.strftime('%Y年%m月'), 'weeks': weeks, 'prev_month': prev_month.strftime('%Y-%m'), 'next_month': next_month.strftime('%Y-%m') } # ================= ⚙️ 5. 服務啟動邏輯 ================= def run_schedule(): """在背景執行緒中運行排程""" sys_log.info("🚀 排程服務已啟動,等待任務...") while True: schedule.run_pending() time.sleep(1) def init_scheduler(): """初始化排程任務(Gunicorn 模式下也會執行)""" schedule.every(1).hours.do(run_momo_task) schedule.every(1).hours.do(run_edm_task) schedule.every(1).hours.do(run_festival_task) sys_log.info(f"📅 已設定每小時執行主站、EDM與購物節爬蟲任務") schedule.every(30).minutes.do(run_auto_import_task) sys_log.info(f"📅 已設定每 30 分鐘執行 Google Drive 自動匯入任務") schedule.every(30).minutes.do(run_whitepage_check) sys_log.info(f"📅 已設定每 30 分鐘執行網頁白頁監控任務") schedule.every(4).hours.do(run_competitor_price_feeder_task) sys_log.info(f"📅 已設定每 4 小時執行 PChome 競品價格抓取任務") # 啟動排程執行緒 scheduler_thread = threading.Thread(target=run_schedule, daemon=True) scheduler_thread.start() sys_log.info("✅ 排程器已在背景執行緒中啟動") # V-New: 在模組載入時自動初始化排程(Gunicorn 模式下也會執行) # 🚩 V-Fix 2026-01-14: 停用自動排程器以避免多個 gunicorn workers 重複執行任務 # 原因:每個 worker 都會啟動排程器,導致 4x 資源消耗(4 workers × 3 爬蟲任務 = 12 Chrome 實例同時運行) # 解決方案:改用獨立的 run_scheduler.py 或透過 Web UI 手動觸發任務 # try: # init_scheduler() # except Exception as e: # sys_log.error(f"❌ 排程器初始化失敗: {e}") sys_log.info("ℹ️ 自動排程器已停用(避免重複執行),請使用 run_scheduler.py 或 Web UI 手動觸發") def start_flask(): sys_log.info("🚀 Web 服務正在啟動於 port 80...") app.run(host='0.0.0.0', port=80, use_reloader=False) def scheduled_job_wrapper(): """執行 MOMO 爬蟲任務並發送通知""" timestamp = datetime.now(TAIPEI_TZ).strftime('%H:%M:%S') sys_log.info(f"⏰ [{timestamp}] 啟動背景抓取執行緒...") def job(): # 1. 執行爬蟲 run_momo_task() # 2. 發送通知 (僅發送今日異動) try: # 重新載入通知模組 import importlib import scheduler import services.notification_manager importlib.reload(scheduler) importlib.reload(services.notification_manager) from services.notification_manager import NotificationManager stats = get_dashboard_stats() # 只要有任何異動數據就發送通知 if any(stats.values()): screenshot_path = scheduler.capture_page_screenshot("http://127.0.0.1/", "momo_dashboard") NotificationManager().send_momo_report(stats, screenshot_path) except Exception as e: sys_log.error(f"[Scheduler] ❌ 發送通知失敗: {e}") threading.Thread(target=job, daemon=True).start() if __name__ == "__main__": banner = f" MOMO 專業數據管理系統 {SYSTEM_VERSION} " sys_log.info(f"{ '='*20} {banner} {'='*20}") # 啟動前先檢查資料庫結構 repair_database_schema() # 使用生產環境域名 public_url = "https://mo.wooo.work" sys_log.info(f"✅ 使用固定網址: {public_url}") # 🚩 V9.7 將公開 URL 寫入設定檔,供其他模組使用 try: url_config_path = os.path.join(BASE_DIR, 'data', 'url_config.json') with open(url_config_path, 'w') as f: json.dump({"public_url": public_url}, f) except Exception as file_err: sys_log.error(f"⚠️ URL 設定檔寫入失敗 (不影響服務運行,可能磁碟已滿): {file_err}") web_server = threading.Thread(target=start_flask) web_server.daemon = True web_server.start() # 排程器已在模組載入時自動初始化(見 init_scheduler() 函式) sys_log.info("ℹ️ 排程器已在全域範圍初始化完成") try: while True: time.sleep(3600) except KeyboardInterrupt: sys_log.info("🔌 Web 服務已關閉") try: ngrok.disconnect(public_url) except Exception as e: sys_log.info(f"ℹ️ Ngrok 關閉時無需額外操作: {e}")