1207 lines
48 KiB
Python
1207 lines
48 KiB
Python
# ================= TODO LIST (待辦事項 - 重開機後請依序執行) =================
|
||
# 1. [驗證] 重啟 app.py 後,重新匯入 Excel,確認「自動去重」功能是否生效 (重複匯入應顯示 0 筆新增)。
|
||
# 2. [檢查] 前往 /sales_analysis 頁面,確認 '狀態' 欄位是否正確顯示 'F' (目前為原始匯入模式)。
|
||
# 3. [決策] 若資料顯示正常,評估是否需要恢復「智慧資料清理」邏輯 (目前程式碼第 1160 行左右已註解)。
|
||
# 4. [備份] 確認系統運作正常後,執行系統備份。
|
||
# =======================================================================
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
import threading
|
||
import math
|
||
import json
|
||
import hashlib
|
||
import shutil
|
||
import zipfile
|
||
import re
|
||
import io # V-New: 用於 Excel 匯出
|
||
import traceback # V-Fix: 用於錯誤追蹤
|
||
from datetime import datetime, timedelta, timezone
|
||
|
||
# ================= 🔧 1. 環境與路徑鎖定 =================
|
||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
# 確保專案根目錄在 sys.path 的最前面,優先讀取本地模組
|
||
sys.path.insert(0, BASE_DIR)
|
||
|
||
# 自動檢核並建立必要目錄
|
||
try:
|
||
for folder in ['database', 'services', 'crawler', 'logs', 'data', 'templates', 'web/static']:
|
||
folder_path = os.path.join(BASE_DIR, folder)
|
||
if not os.path.exists(folder_path):
|
||
os.makedirs(folder_path)
|
||
|
||
# 僅針對 Python 套件目錄建立 __init__.py
|
||
if 'web' not in folder:
|
||
init_file = os.path.join(folder_path, '__init__.py')
|
||
if not os.path.exists(init_file):
|
||
with open(init_file, 'w') as f: pass
|
||
except OSError as e:
|
||
print(f"❌ 系統初始化失敗: 無法建立目錄或檔案 (磁碟可能已滿) - {e}")
|
||
|
||
# ================= 🔧 2. 核心模組導入 =================
|
||
try:
|
||
from flask import Flask, render_template, jsonify, request, send_file, redirect, url_for, send_from_directory, flash, session
|
||
from werkzeug.utils import secure_filename
|
||
from pyngrok import ngrok, conf
|
||
import schedule
|
||
from sqlalchemy import desc, and_, func, text, literal, case
|
||
from sqlalchemy import inspect # V-New: 用於檢查資料表是否存在
|
||
from sqlalchemy.orm import joinedload
|
||
import pandas as pd # type: ignore
|
||
from pandas.api.types import is_numeric_dtype # type: ignore
|
||
import numpy as np # type: ignore # V-Opt: 引入 numpy 進行向量化運算加速
|
||
|
||
# 導入自定義模組
|
||
try:
|
||
from scheduler import run_momo_task, run_edm_task, run_festival_task, run_auto_import_task, run_whitepage_check, run_competitor_price_feeder_task
|
||
from database.manager import DatabaseManager
|
||
from database.models import Base, Product, PriceRecord, MonthlySummaryAnalysis
|
||
from database.edm_models import PromoProduct
|
||
except ImportError as e:
|
||
print(f"❌ 專案內部檔案缺失: {e}\n請檢查 database/ 或 services/ 目錄下的 .py 檔案是否存在。")
|
||
sys.exit(1)
|
||
|
||
from services.logger_manager import SystemLogger
|
||
from services.exporter import Exporter # 🚩 導入匯出模組
|
||
except ImportError as e:
|
||
print(f"❌ 關鍵套件導入失敗: {e}")
|
||
sys.exit(1)
|
||
|
||
# ================= 🔧 3. 系統核心配置 =================
|
||
# 從 config.py 匯入必要的設定
|
||
from config import EXCEL_EXPORT_DIR, DATABASE_TYPE, validate_critical_config
|
||
|
||
sys_log = SystemLogger("Web_Server").get_logger()
|
||
|
||
# 驗證選用配置,缺少時輸出 warning(非 fatal)
|
||
for _warn in validate_critical_config():
|
||
sys_log.warning(_warn)
|
||
|
||
|
||
|
||
|
||
# 商品看板 cache 單一來源。實際路由已在 routes/dashboard_routes.py。
|
||
from services.cache_manager import _DASHBOARD_DATA_CACHE, _DASHBOARD_CACHE_TTL # noqa: E402
|
||
|
||
# 🚩 檢查磁碟空間 (V9.52 新增)
|
||
try:
|
||
total, used, free = shutil.disk_usage(BASE_DIR)
|
||
if free < 200 * 1024 * 1024: # 小於 200MB
|
||
sys_log.critical(f"[System] [DISK_CHECK] 🚨 嚴重警告: 磁碟空間極低 | Free: {free // (1024*1024)} MB")
|
||
elif free < 1024 * 1024 * 1024: # 小於 1GB
|
||
sys_log.warning(f"[System] [DISK_CHECK] ⚠️ 警告: 磁碟空間不足 1GB | Free: {free // (1024*1024)} MB")
|
||
except Exception as e:
|
||
sys_log.error(f"無法檢測磁碟空間: {e}")
|
||
|
||
# 🚩 系統版本定義 (備份與顯示用)
|
||
# 🚩 2026-05-01 V10.45: AI product pick list and improved PChome matching
|
||
SYSTEM_VERSION = "V10.45"
|
||
|
||
# ==========================================
|
||
# 🔒 SQL Injection 防護函數
|
||
# ==========================================
|
||
|
||
# 允許的資料表白名單
|
||
# 安全工具:實作已搬至 utils/security.py,此處 re-export 維持向後相容
|
||
from utils.security import ( # noqa: E402
|
||
ALLOWED_TABLES,
|
||
validate_table_name,
|
||
validate_column_names,
|
||
)
|
||
|
||
# 安全工具:路徑遍歷 + 檔案上傳驗證 + safe_read_sql 已搬至 utils/security.py
|
||
from utils.security import ( # noqa: E402
|
||
safe_read_sql,
|
||
safe_join,
|
||
ALLOWED_UPLOAD_EXTENSIONS,
|
||
ALLOWED_MIME_TYPES,
|
||
secure_filename_unicode,
|
||
allowed_file,
|
||
validate_upload_file,
|
||
)
|
||
|
||
# 🚩 資料庫結構自動修復 (V9.53 新增) — 實作搬至 database/schema_repair.py
|
||
from database.schema_repair import repair_database_schema # noqa: E402, F401
|
||
|
||
# 從環境變數讀取 NGROK_AUTH_TOKEN;未設定時禁止使用硬編碼預設值
|
||
NGROK_AUTH_TOKEN = os.getenv('NGROK_AUTH_TOKEN', '')
|
||
if NGROK_AUTH_TOKEN:
|
||
conf.get_default().auth_token = NGROK_AUTH_TOKEN
|
||
else:
|
||
sys_log.warning("[Security] ⚠️ NGROK_AUTH_TOKEN 未設定,已跳過 ngrok auth token 注入")
|
||
|
||
TEMPLATE_DIR = os.path.join(BASE_DIR, 'templates')
|
||
STATIC_DIR = os.path.join(BASE_DIR, 'web/static')
|
||
|
||
# 檢查關鍵模板是否存在
|
||
if not os.path.exists(os.path.join(TEMPLATE_DIR, 'dashboard.html')):
|
||
sys_log.warning(f"[Web] [Template] ⚠️ 警告: 找不到 dashboard.html | Path: {TEMPLATE_DIR}")
|
||
|
||
app = Flask(__name__,
|
||
template_folder=TEMPLATE_DIR,
|
||
static_folder=STATIC_DIR)
|
||
|
||
# ==========================================
|
||
# 🔒 Flask 安全配置
|
||
# ==========================================
|
||
|
||
# 從 config.py 導入 SECRET_KEY
|
||
from config import SECRET_KEY
|
||
|
||
# 基本配置
|
||
app.config['SECRET_KEY'] = SECRET_KEY
|
||
|
||
# Session 安全配置
|
||
app.config['SESSION_COOKIE_HTTPONLY'] = True # 防止 JavaScript 存取 cookie(防 XSS)
|
||
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # 防止 CSRF 攻擊
|
||
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=24) # Session 有效期 24 小時(延長避免長時間閒置斷線)
|
||
|
||
# 如果使用 HTTPS,啟用 SECURE cookie(本地開發時應設為 False)
|
||
# 注意:如果您的系統部署在 HTTPS 環境,請將 .env 中的 USE_HTTPS 設為 true
|
||
USE_HTTPS = os.getenv('USE_HTTPS', 'false').lower() == 'true'
|
||
if USE_HTTPS:
|
||
app.config['SESSION_COOKIE_SECURE'] = True
|
||
sys_log.info("[Security] ✅ HTTPS 模式已啟用,Session cookie 僅透過 HTTPS 傳輸")
|
||
else:
|
||
app.config['SESSION_COOKIE_SECURE'] = False
|
||
sys_log.warning("[Security] ⚠️ HTTP 模式(開發環境),Session cookie 未強制 HTTPS")
|
||
|
||
# 檔案上傳大小限制(10MB)
|
||
# V-New: 提高檔案上傳大小限制 (從 10MB 提高到 100MB)
|
||
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024
|
||
|
||
sys_log.info("[Security] ✅ Flask 安全配置已載入")
|
||
sys_log.info(f"[Security] • Session 有效期: 2 小時")
|
||
sys_log.info(f"[Security] • 檔案上傳限制: 10 MB")
|
||
sys_log.info(f"[Security] • CSRF 防護: SameSite=Lax")
|
||
sys_log.info(f"[Security] • XSS 防護: HttpOnly=True")
|
||
|
||
# ==========================================
|
||
# 🔒 CSRF 防護配置
|
||
# ==========================================
|
||
|
||
from flask_wtf.csrf import CSRFProtect
|
||
|
||
csrf = CSRFProtect(app)
|
||
sys_log.info("[Security] ✅ CSRF 防護已啟用 (Flask-WTF)")
|
||
|
||
# ==========================================
|
||
# 🔧 Blueprint 註冊 - 廠商缺貨系統
|
||
# ==========================================
|
||
from routes.vendor_routes import vendor_bp
|
||
app.register_blueprint(vendor_bp)
|
||
sys_log.info("[Blueprint] ✅ 廠商缺貨系統 Blueprint 已註冊")
|
||
|
||
# ==========================================
|
||
# 🔧 Blueprint 註冊 - Google Drive 自動匯入
|
||
# ==========================================
|
||
from routes.auto_import_routes import auto_import_bp
|
||
app.register_blueprint(auto_import_bp)
|
||
csrf.exempt(auto_import_bp)
|
||
sys_log.info("[Blueprint] ✅ Google Drive 自動匯入 Blueprint 已註冊 (CSRF 已豁免)")
|
||
|
||
# ==========================================
|
||
# 🔧 Blueprint 註冊 - 爬蟲管理系統
|
||
# ==========================================
|
||
from routes.crawler_management_routes import crawler_bp
|
||
app.register_blueprint(crawler_bp)
|
||
sys_log.info("[Blueprint] ✅ 爬蟲管理系統 Blueprint 已註冊")
|
||
|
||
# ==========================================
|
||
# 🔧 Blueprint 註冊 - AI 智慧文案系統
|
||
# ==========================================
|
||
from routes.ai_routes import ai_bp
|
||
app.register_blueprint(ai_bp)
|
||
csrf.exempt(ai_bp) # ICAIM API 使用內部呼叫,不需要 CSRF
|
||
sys_log.info("[Blueprint] ✅ AI 智慧文案系統 Blueprint 已註冊")
|
||
|
||
# ==========================================
|
||
# 🔧 Blueprint 註冊 - CI/CD Dashboard
|
||
# ==========================================
|
||
from routes.cicd_routes import cicd_bp
|
||
app.register_blueprint(cicd_bp)
|
||
csrf.exempt(cicd_bp) # CI/CD API doesn't need CSRF
|
||
sys_log.info("[Blueprint] CI/CD Dashboard Blueprint registered")
|
||
|
||
# ==========================================
|
||
# 🔧 Blueprint 註冊 - Code Review 系統
|
||
# ==========================================
|
||
try:
|
||
from routes.code_review_routes import code_review_bp
|
||
app.register_blueprint(code_review_bp)
|
||
csrf.exempt(code_review_bp) # Code Review API 使用內部認證,不需要 CSRF
|
||
sys_log.info("[Blueprint] ✅ Code Review 系統 Blueprint 已註冊 (CSRF 已豁免)")
|
||
except Exception as _e:
|
||
sys_log.warning(f"[Blueprint] ⚠️ Code Review 系統 Blueprint 註冊失敗: {_e}")
|
||
|
||
# ==========================================
|
||
# 🔧 Blueprint 註冊 - 趨勢資料系統
|
||
# ==========================================
|
||
from routes.trend_routes import trend_bp
|
||
app.register_blueprint(trend_bp)
|
||
sys_log.info("[Blueprint] ✅ 趨勢資料系統 Blueprint 已註冊")
|
||
|
||
# ==========================================
|
||
# 🔒 Auth 路由註冊 - 登入/登出
|
||
# ==========================================
|
||
from auth import init_auth_routes, login_required
|
||
init_auth_routes(app)
|
||
sys_log.info("[Auth] ✅ 登入/登出路由已註冊")
|
||
|
||
# ==========================================
|
||
# 🔧 Blueprint 註冊 - 用戶管理系統
|
||
# ==========================================
|
||
from routes.user_routes import user_bp
|
||
app.register_blueprint(user_bp)
|
||
sys_log.info("[Blueprint] ✅ 用戶管理系統 Blueprint 已註冊")
|
||
|
||
# ==========================================
|
||
# 🚨 Blueprint 註冊 - 系統告警
|
||
# ==========================================
|
||
from routes.alert_routes import alert_bp
|
||
app.register_blueprint(alert_bp)
|
||
csrf.exempt(alert_bp)
|
||
sys_log.info("[Blueprint] ✅ 系統告警 Blueprint 已註冊 (CSRF 已豁免)")
|
||
|
||
# ==========================================
|
||
# 系統管理路由 Blueprint
|
||
# ==========================================
|
||
from routes.system_routes import system_bp
|
||
app.register_blueprint(system_bp)
|
||
csrf.exempt(system_bp) # n8n API 需要豁免 CSRF
|
||
sys_log.info("[Blueprint] ✅ 系統管理 Blueprint 已註冊 (CSRF 已豁免)")
|
||
|
||
from routes.system_public_routes import system_public_bp
|
||
app.register_blueprint(system_public_bp)
|
||
sys_log.info("[Blueprint] ✅ 公開系統頁面 Blueprint 已註冊")
|
||
|
||
from routes.category_routes import category_bp
|
||
app.register_blueprint(category_bp)
|
||
sys_log.info("[Blueprint] ✅ 分類 CRUD Blueprint 已註冊")
|
||
|
||
from routes.misc_routes import misc_bp
|
||
app.register_blueprint(misc_bp)
|
||
sys_log.info("[Blueprint] ✅ 雜項 Routes Blueprint 已註冊 (/api/test_url, /brand_assets)")
|
||
|
||
# ==========================================
|
||
# 通知模板管理 Blueprint
|
||
# ==========================================
|
||
from routes.notification_routes import notification_bp
|
||
app.register_blueprint(notification_bp)
|
||
csrf.exempt(notification_bp) # n8n API 需要豁免 CSRF
|
||
sys_log.info("[Blueprint] ✅ 通知模板管理 Blueprint 已註冊")
|
||
|
||
# ==========================================
|
||
# Bot API Blueprint (Clawdbot 整合)
|
||
# ==========================================
|
||
from routes.bot_api_routes import bot_api_bp
|
||
app.register_blueprint(bot_api_bp)
|
||
csrf.exempt(bot_api_bp) # Bot API 使用 Token 認證,不需要 CSRF
|
||
sys_log.info("[Blueprint] ✅ Bot API Blueprint 已註冊")
|
||
|
||
|
||
# ==========================================
|
||
# Elephant Alpha AI Agent Super Orchestrator Blueprint
|
||
# ==========================================
|
||
try:
|
||
from routes.elephant_alpha_routes import elephant_alpha_bp
|
||
app.register_blueprint(elephant_alpha_bp)
|
||
csrf.exempt(elephant_alpha_bp) # Elephant Alpha API uses internal auth
|
||
sys_log.info("[Blueprint] Elephant Alpha AI Agent Super Orchestrator Blueprint registered")
|
||
except Exception as _e:
|
||
sys_log.warning(f"[Blueprint] Elephant Alpha registration failed: {_e}")
|
||
sys_log.info("[Blueprint] Elephant Alpha features will be unavailable")
|
||
|
||
# [2026-04-18 台北] OpenClaw Bot Blueprint — 修復 /menu 啞巴 (/bot/telegram/webhook 404)
|
||
# 原因:routes/openclaw_bot_routes.py 有 5000+ 行完整 telegram bot handler,但 app.py 從未 register
|
||
# 效果:Telegram 送進來的 update (包含 /menu) 能被正確接收與處理
|
||
try:
|
||
from routes.openclaw_bot_routes import openclaw_bot_bp
|
||
app.register_blueprint(openclaw_bot_bp)
|
||
csrf.exempt(openclaw_bot_bp) # Telegram webhook 不需要 CSRF
|
||
sys_log.info("[Blueprint] ✅ OpenClaw Bot Blueprint 已註冊 (Telegram /menu 復活)")
|
||
except Exception as _e:
|
||
sys_log.error(f"[Blueprint] ❌ OpenClaw Bot Blueprint 註冊失敗: {_e}")
|
||
|
||
from routes.api_routes import api_bp
|
||
app.register_blueprint(api_bp)
|
||
sys_log.info("[Blueprint] ✅ api_bp 已註冊")
|
||
|
||
from routes.edm_routes import edm_bp
|
||
app.register_blueprint(edm_bp)
|
||
sys_log.info("[Blueprint] ✅ edm_bp 已註冊")
|
||
|
||
from routes.sales_routes import sales_bp
|
||
app.register_blueprint(sales_bp)
|
||
sys_log.info("[Blueprint] ✅ sales_bp 已註冊")
|
||
|
||
from routes.monthly_routes import monthly_bp
|
||
app.register_blueprint(monthly_bp)
|
||
sys_log.info("[Blueprint] ✅ monthly_bp 已註冊")
|
||
|
||
from routes.price_comparison_routes import price_comparison_bp
|
||
app.register_blueprint(price_comparison_bp)
|
||
sys_log.info("[Blueprint] ✅ price_comparison_bp 已註冊")
|
||
|
||
from routes.export_routes import export_bp
|
||
app.register_blueprint(export_bp)
|
||
sys_log.info("[Blueprint] ✅ export_bp 已註冊")
|
||
|
||
from routes.daily_sales_routes import daily_sales_bp
|
||
app.register_blueprint(daily_sales_bp)
|
||
sys_log.info("[Blueprint] ✅ daily_sales_bp 已註冊")
|
||
|
||
from routes.dashboard_routes import dashboard_bp
|
||
app.register_blueprint(dashboard_bp)
|
||
sys_log.info("[Blueprint] ✅ dashboard_bp 已註冊")
|
||
|
||
from routes.import_routes import import_bp
|
||
app.register_blueprint(import_bp)
|
||
sys_log.info("[Blueprint] ✅ import_bp 已註冊")
|
||
|
||
from routes.pchome_routes import pchome_bp
|
||
app.register_blueprint(pchome_bp)
|
||
sys_log.info("[Blueprint] ✅ pchome_bp 已註冊")
|
||
|
||
# V-Fix: 註冊 slugify 函數供模板使用(實作搬至 utils/text_helpers.py)
|
||
from utils.text_helpers import slugify # noqa: E402
|
||
|
||
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
|
||
public_url = "服務啟動中..."
|
||
|
||
# 🚩 時區設定:台北時間 (UTC+8)
|
||
TAIPEI_TZ = timezone(timedelta(hours=8))
|
||
|
||
EXPECTED_METADATA_TABLES = {
|
||
'categories', 'products', 'price_records', 'monthly_summary_analysis',
|
||
'users', 'login_history', 'permissions', 'user_permissions',
|
||
'promo_products', 'trend_records', 'trend_keywords', 'trend_analysis',
|
||
'web_search_cache', 'telegram_users',
|
||
'ai_generation_history', 'ai_prompt_templates', 'ai_usage_tracking', 'ai_insights',
|
||
'agent_context', 'action_plans', 'action_outcomes', 'agent_strategy_weights',
|
||
'incidents', 'playbooks', 'heal_logs',
|
||
'import_jobs', 'import_config', 'notification_templates', 'ppt_reports',
|
||
'vendor_stockout', 'vendor_list', 'vendor_emails', 'email_send_log',
|
||
'realtime_sales_monthly',
|
||
}
|
||
|
||
|
||
def verify_metadata_tables():
|
||
missing = EXPECTED_METADATA_TABLES - set(Base.metadata.tables.keys())
|
||
if missing:
|
||
raise SystemExit(f"Base.metadata 漏表: {sorted(missing)}")
|
||
|
||
|
||
verify_metadata_tables()
|
||
|
||
# ==========================================
|
||
# 🔧 全域模板變數注入 (Context Processor)
|
||
# ==========================================
|
||
from config import METABASE_URL, GRIST_URL
|
||
|
||
@app.context_processor
|
||
def inject_global_vars():
|
||
"""注入全域變數到所有模板"""
|
||
return {
|
||
'metabase_url': METABASE_URL,
|
||
'grist_url': GRIST_URL,
|
||
'datetime_now': datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M:%S'),
|
||
}
|
||
|
||
sys_log.info("[Template] ✅ 全域模板變數已注入 (metabase_url, grist_url)")
|
||
|
||
# ================= 🛠️ V9.72: 分類設定管理核心 =================
|
||
CATEGORIES_JSON_PATH = os.path.join(BASE_DIR, 'data', 'categories.json')
|
||
|
||
# JSON 持久化:實作搬至 services/json_storage.py
|
||
from services.json_storage import ( # noqa: E402, F401
|
||
load_categories,
|
||
save_categories,
|
||
load_scheduler_stats,
|
||
)
|
||
|
||
# ================= 🛠️ 數據處理核心 (封裝) =================
|
||
|
||
# 純工具:實作已搬至 utils/text_helpers.py
|
||
from utils.text_helpers import ( # noqa: E402
|
||
get_color_for_string,
|
||
extract_snapshot_date_from_filename,
|
||
number_format as _number_format,
|
||
)
|
||
|
||
|
||
@app.template_filter('number_format')
|
||
def number_format_filter(value):
|
||
"""Jinja filter wrapper — 實作見 utils.text_helpers.number_format。"""
|
||
return _number_format(value)
|
||
|
||
# V-Refactor: 將 find_col 移至全域,方便多個函式共用
|
||
from utils.df_helpers import find_col # noqa: E402
|
||
|
||
def get_consolidated_data():
|
||
"""🚩 統一封裝:獲取全分類去重後的當前數據、昨日對比及差值 (V-Opt: 優化查詢效能 + 快取)"""
|
||
global _DASHBOARD_DATA_CACHE
|
||
|
||
# V-New: 檢查快取是否有效
|
||
now = datetime.now(TAIPEI_TZ)
|
||
if (_DASHBOARD_DATA_CACHE['consolidated_data'] is not None and
|
||
_DASHBOARD_DATA_CACHE['consolidated_timestamp'] is not None):
|
||
cache_age = (now.timestamp() - _DASHBOARD_DATA_CACHE['consolidated_timestamp'])
|
||
if cache_age < _DASHBOARD_CACHE_TTL:
|
||
sys_log.debug(f"[Dashboard] [Cache] ✅ 使用快取資料 | 快取年齡: {cache_age:.1f}秒")
|
||
return _DASHBOARD_DATA_CACHE['consolidated_data'], _DASHBOARD_DATA_CACHE['today_start']
|
||
|
||
sys_log.debug("[Dashboard] [Cache] 🔄 快取過期或不存在,重新查詢資料庫")
|
||
|
||
db = DatabaseManager()
|
||
session = db.get_session()
|
||
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0).replace(tzinfo=None)
|
||
seven_days_ago = today_start - timedelta(days=7)
|
||
thirty_days_ago = today_start - timedelta(days=30)
|
||
|
||
try:
|
||
# Query 1: Get the latest price record for every product. This is our main list of items.
|
||
latest_price_subq = session.query(
|
||
func.max(PriceRecord.id).label('max_id')
|
||
).group_by(PriceRecord.product_id).subquery()
|
||
|
||
latest_records = session.query(PriceRecord).options(
|
||
joinedload(PriceRecord.product)
|
||
).join(latest_price_subq, PriceRecord.id == latest_price_subq.c.max_id).all()
|
||
|
||
product_ids = [r.product_id for r in latest_records]
|
||
if not product_ids:
|
||
session.close() # 提前關閉連線
|
||
return [], today_start
|
||
|
||
# Query 2: Get yesterday's closing prices for all products in one go
|
||
yesterday_prices_subq = session.query(
|
||
PriceRecord.product_id,
|
||
func.max(PriceRecord.id).label('max_id')
|
||
).filter(
|
||
PriceRecord.product_id.in_(product_ids),
|
||
PriceRecord.timestamp < today_start
|
||
).group_by(PriceRecord.product_id).subquery()
|
||
|
||
yesterday_prices_q = session.query(
|
||
PriceRecord.product_id, PriceRecord.price
|
||
).join(
|
||
yesterday_prices_subq,
|
||
PriceRecord.id == yesterday_prices_subq.c.max_id
|
||
)
|
||
yesterday_prices_map = {pid: price for pid, price in yesterday_prices_q}
|
||
|
||
# Query 3: Get specific historical price points (7 days ago and 30 days ago)
|
||
# Instead of fetching ALL history, we fetch only the records closest to the target dates.
|
||
# This is a significant optimization.
|
||
|
||
# Helper to get price map for a specific date (start of day)
|
||
def get_price_map_before(target_date):
|
||
subq = session.query(
|
||
PriceRecord.product_id,
|
||
func.max(PriceRecord.timestamp).label('max_ts')
|
||
).filter(
|
||
PriceRecord.product_id.in_(product_ids),
|
||
PriceRecord.timestamp < target_date
|
||
).group_by(PriceRecord.product_id).subquery()
|
||
|
||
q = session.query(PriceRecord.product_id, PriceRecord.price).join(
|
||
subq,
|
||
and_(PriceRecord.product_id == subq.c.product_id, PriceRecord.timestamp == subq.c.max_ts)
|
||
)
|
||
return {pid: price for pid, price in q}
|
||
|
||
prices_7d_ago_map = get_price_map_before(seven_days_ago + timedelta(days=1)) # Approximate 7 days ago
|
||
prices_30d_ago_map = get_price_map_before(thirty_days_ago + timedelta(days=1)) # Approximate 30 days ago
|
||
|
||
# Query 4: Get TODAY's records only (for sparkline/intraday change)
|
||
today_records_q = session.query(PriceRecord).filter(
|
||
PriceRecord.product_id.in_(product_ids),
|
||
PriceRecord.timestamp >= today_start
|
||
).order_by(PriceRecord.product_id, PriceRecord.timestamp).all()
|
||
|
||
today_map = {}
|
||
for r in today_records_q:
|
||
if r.product_id not in today_map: today_map[r.product_id] = []
|
||
today_map[r.product_id].append(r)
|
||
|
||
# Final Assembly (in-memory, no more DB queries)
|
||
unique_items = []
|
||
for r in latest_records:
|
||
pid = r.product_id
|
||
|
||
# 7d/30d stats
|
||
price_7d = prices_7d_ago_map.get(pid)
|
||
price_30d = prices_30d_ago_map.get(pid)
|
||
|
||
stats_7d_diff = r.price - price_7d if price_7d is not None else 0
|
||
stats_30d_diff = r.price - price_30d if price_30d is not None else 0
|
||
|
||
# Today's stats
|
||
today_records = today_map.get(pid, [])
|
||
today_diff = 0
|
||
today_changes = []
|
||
if len(today_records) > 1:
|
||
today_diff = today_records[-1].price - today_records[0].price
|
||
|
||
# Yesterday diff
|
||
y_price = yesterday_prices_map.get(pid)
|
||
yesterday_diff = r.price - y_price if y_price is not None else 0
|
||
|
||
status = "NONE"
|
||
if yesterday_diff > 0:
|
||
status = "PRICE_UP"
|
||
elif yesterday_diff < 0:
|
||
status = "PRICE_DOWN"
|
||
|
||
# Today's changes details
|
||
last_p = y_price if y_price is not None else (today_records[0].price if today_records else r.price)
|
||
for tr in today_records:
|
||
if tr.price != last_p:
|
||
diff = tr.price - last_p
|
||
today_changes.append({
|
||
'time': tr.timestamp.strftime('%H:%M'),
|
||
'price': tr.price,
|
||
'diff': diff
|
||
})
|
||
last_p = tr.price
|
||
|
||
unique_items.append({
|
||
'record': r,
|
||
'stats': {'7d_diff': stats_7d_diff, '30d_diff': stats_30d_diff, '1d_diff': today_diff},
|
||
'yesterday_diff': yesterday_diff,
|
||
'today_changes': today_changes,
|
||
'status': status
|
||
})
|
||
|
||
# V-New: 更新快取
|
||
_DASHBOARD_DATA_CACHE['consolidated_data'] = unique_items
|
||
_DASHBOARD_DATA_CACHE['consolidated_timestamp'] = now.timestamp()
|
||
_DASHBOARD_DATA_CACHE['today_start'] = today_start
|
||
sys_log.debug(f"[Dashboard] [Cache] 💾 快取已更新 | 商品數: {len(unique_items)}")
|
||
|
||
return unique_items, today_start
|
||
finally:
|
||
session.close()
|
||
|
||
def get_dashboard_stats():
|
||
"""計算看板統計數據 (供通知使用) — backward-compat wrapper."""
|
||
from services.dashboard_service import get_dashboard_stats as _get_dashboard_stats
|
||
return _get_dashboard_stats()
|
||
|
||
# ================= 🛣️ 4. Flask 路由 =================
|
||
|
||
# Session 自動續期機制
|
||
@app.before_request
|
||
def refresh_session():
|
||
"""
|
||
在每次請求時自動刷新 Session,避免長時間閒置後突然斷線
|
||
只要用戶有任何操作,Session 就會自動延長
|
||
"""
|
||
if session.get('logged_in'):
|
||
session.modified = True # 標記 Session 已修改,觸發 Cookie 更新
|
||
|
||
|
||
def verify_unique_routes():
|
||
"""啟動期防線:同一 URL + method 不得由兩個 endpoint 同時註冊。"""
|
||
seen = {}
|
||
for rule in app.url_map.iter_rules():
|
||
key = (str(rule), frozenset(rule.methods - {'HEAD', 'OPTIONS'}))
|
||
if key in seen:
|
||
raise SystemExit(f"重複路由: {key} 來自 {seen[key]} 與 {rule.endpoint}")
|
||
seen[key] = rule.endpoint
|
||
|
||
|
||
verify_unique_routes()
|
||
|
||
|
||
def preprocess_daily_sales_data(df):
|
||
"""前處理當日業績資料:欄位識別、型別轉換"""
|
||
cols = df.columns.tolist()
|
||
|
||
# 欄位自動識別(使用現有的 find_col 函式)
|
||
col_amount = find_col(cols, ['銷售金額', '業績', '金額', 'Amount', '總業績'])
|
||
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
|
||
col_profit = find_col(cols, ['毛利', 'Profit'])
|
||
col_qty = find_col(cols, ['銷售數量', '銷量', 'Qty', '數量'])
|
||
|
||
# 型別轉換
|
||
if col_amount:
|
||
df[col_amount] = pd.to_numeric(df[col_amount], errors='coerce').fillna(0)
|
||
if col_cost:
|
||
df[col_cost] = pd.to_numeric(df[col_cost], errors='coerce').fillna(0)
|
||
if col_profit:
|
||
df[col_profit] = pd.to_numeric(df[col_profit], errors='coerce').fillna(0)
|
||
if col_qty:
|
||
df[col_qty] = pd.to_numeric(df[col_qty], errors='coerce').fillna(0)
|
||
|
||
# 日期轉換
|
||
df['snapshot_date'] = pd.to_datetime(df['snapshot_date'], errors='coerce')
|
||
|
||
return df
|
||
|
||
def calculate_daily_kpis(df, date_str):
|
||
"""計算單日 6 個 KPI"""
|
||
day_df = df[df['snapshot_date'] == date_str]
|
||
cols = day_df.columns.tolist()
|
||
|
||
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
|
||
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
|
||
col_profit = find_col(cols, ['毛利', 'Profit'])
|
||
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
|
||
col_name = find_col(cols, ['商品名稱', '品名', 'Name'])
|
||
|
||
total_revenue = float(day_df[col_amount].sum()) if col_amount else 0
|
||
total_cost = float(day_df[col_cost].sum()) if col_cost else 0
|
||
gross_margin = float(day_df[col_profit].sum()) if col_profit else (total_revenue - total_cost)
|
||
total_qty = float(day_df[col_qty].sum()) if col_qty else 0
|
||
sku_count = int(day_df[col_name].nunique()) if col_name else 0
|
||
avg_price = total_revenue / total_qty if total_qty > 0 else 0
|
||
|
||
return {
|
||
'total_revenue': total_revenue,
|
||
'total_cost': total_cost,
|
||
'gross_margin': gross_margin,
|
||
'total_qty': total_qty,
|
||
'sku_count': sku_count,
|
||
'avg_price': avg_price
|
||
}
|
||
|
||
def calculate_dod(df, current_date):
|
||
"""計算 Day-over-Day 變化率"""
|
||
current = calculate_daily_kpis(df, current_date)
|
||
prev_date = current_date - timedelta(days=1)
|
||
|
||
if prev_date not in df['snapshot_date'].values:
|
||
return {k: 0.0 for k in current.keys()}
|
||
|
||
previous = calculate_daily_kpis(df, prev_date)
|
||
|
||
dod = {}
|
||
for key in current:
|
||
if previous[key] > 0:
|
||
dod[key] = ((current[key] - previous[key]) / previous[key]) * 100
|
||
else:
|
||
dod[key] = 0.0
|
||
return dod
|
||
|
||
def calculate_wow(df, current_date):
|
||
"""計算 Week-over-Week 變化率"""
|
||
current = calculate_daily_kpis(df, current_date)
|
||
prev_week_date = current_date - timedelta(days=7)
|
||
|
||
if prev_week_date not in df['snapshot_date'].values:
|
||
return {k: 0.0 for k in current.keys()}
|
||
|
||
previous = calculate_daily_kpis(df, prev_week_date)
|
||
|
||
wow = {}
|
||
for key in current:
|
||
if previous[key] > 0:
|
||
wow[key] = ((current[key] - previous[key]) / previous[key]) * 100
|
||
else:
|
||
wow[key] = 0.0
|
||
return wow
|
||
|
||
def prepare_daily_charts(df, selected_date, days=30):
|
||
"""準備 4 個圖表的數據(根據選擇的日期)"""
|
||
# 取選擇日期前 N 天的數據
|
||
start_date = selected_date - timedelta(days=days)
|
||
df_range = df[(df['snapshot_date'] >= start_date) & (df['snapshot_date'] <= selected_date)]
|
||
|
||
# 按日期聚合
|
||
cols = df_range.columns.tolist()
|
||
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
|
||
col_cost = find_col(cols, ['成本', '總成本'])
|
||
col_profit = find_col(cols, ['毛利'])
|
||
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
|
||
col_name = find_col(cols, ['商品名稱', '品名'])
|
||
|
||
# 日期聚合
|
||
agg_dict = {}
|
||
if col_amount:
|
||
agg_dict[col_amount] = 'sum'
|
||
if col_cost:
|
||
agg_dict[col_cost] = 'sum'
|
||
if col_profit:
|
||
agg_dict[col_profit] = 'sum'
|
||
if col_qty:
|
||
agg_dict[col_qty] = 'sum'
|
||
|
||
daily_agg = df_range.groupby('snapshot_date').agg(agg_dict).reset_index()
|
||
|
||
# 計算或取得毛利(如果沒有毛利欄位,用業績-成本計算)
|
||
if col_profit and col_profit in daily_agg.columns:
|
||
daily_agg['profit'] = daily_agg[col_profit]
|
||
elif col_amount and col_cost and col_amount in daily_agg.columns and col_cost in daily_agg.columns:
|
||
daily_agg['profit'] = daily_agg[col_amount] - daily_agg[col_cost]
|
||
else:
|
||
daily_agg['profit'] = 0
|
||
|
||
# 計算客單價
|
||
if col_amount and col_qty and col_amount in daily_agg.columns and col_qty in daily_agg.columns:
|
||
daily_agg['avg_price'] = (daily_agg[col_amount] / daily_agg[col_qty]).fillna(0)
|
||
else:
|
||
daily_agg['avg_price'] = 0
|
||
|
||
# 計算 DoD (Day-over-Day) 變化率 - 多個維度
|
||
if col_amount and col_amount in daily_agg.columns:
|
||
daily_agg['dod_revenue'] = daily_agg[col_amount].pct_change() * 100
|
||
if 'profit' in daily_agg.columns:
|
||
daily_agg['dod_profit'] = daily_agg['profit'].pct_change() * 100
|
||
if 'avg_price' in daily_agg.columns:
|
||
daily_agg['dod_avg_price'] = daily_agg['avg_price'].pct_change() * 100
|
||
if col_qty and col_qty in daily_agg.columns:
|
||
daily_agg['dod_qty'] = daily_agg[col_qty].pct_change() * 100
|
||
|
||
# 計算 WoW (Week-over-Week) 變化率 - 多個維度
|
||
if col_amount and col_amount in daily_agg.columns:
|
||
daily_agg['wow_revenue'] = daily_agg[col_amount].pct_change(periods=7) * 100
|
||
if 'profit' in daily_agg.columns:
|
||
daily_agg['wow_profit'] = daily_agg['profit'].pct_change(periods=7) * 100
|
||
if 'avg_price' in daily_agg.columns:
|
||
daily_agg['wow_avg_price'] = daily_agg['avg_price'].pct_change(periods=7) * 100
|
||
if col_qty and col_qty in daily_agg.columns:
|
||
daily_agg['wow_qty'] = daily_agg[col_qty].pct_change(periods=7) * 100
|
||
|
||
# Top 10 商品(選擇的日期,包含廠商)
|
||
selected_df = df[df['snapshot_date'] == selected_date]
|
||
top10_labels = []
|
||
top10_values = []
|
||
|
||
if col_name and col_amount:
|
||
col_vendor = find_col(cols, ['廠商名稱', '廠商', 'Vendor', 'Supplier'])
|
||
|
||
if col_vendor:
|
||
# 如果有廠商欄位,按商品+廠商聚合
|
||
top10_df = selected_df.groupby([col_name, col_vendor])[col_amount].sum().nlargest(10).reset_index()
|
||
top10_labels = [f"{row[col_name]} ({row[col_vendor]})" for _, row in top10_df.iterrows()]
|
||
top10_values = top10_df[col_amount].tolist()
|
||
else:
|
||
# 沒有廠商欄位,只按商品聚合
|
||
top10 = selected_df.groupby(col_name)[col_amount].sum().nlargest(10)
|
||
top10_labels = top10.index.tolist()
|
||
top10_values = top10.values.tolist()
|
||
|
||
return {
|
||
'labels': daily_agg['snapshot_date'].dt.strftime('%m/%d').tolist() if not daily_agg.empty else [],
|
||
'revenue': daily_agg[col_amount].tolist() if col_amount and col_amount in daily_agg.columns and not daily_agg.empty else [],
|
||
'cost': daily_agg[col_cost].tolist() if col_cost and col_cost in daily_agg.columns and not daily_agg.empty else [],
|
||
'profit': daily_agg['profit'].tolist() if 'profit' in daily_agg.columns and not daily_agg.empty else [],
|
||
'qty': daily_agg[col_qty].tolist() if col_qty and col_qty in daily_agg.columns and not daily_agg.empty else [],
|
||
'avg_price': daily_agg['avg_price'].tolist() if 'avg_price' in daily_agg.columns and not daily_agg.empty else [],
|
||
|
||
# DoD 多維度
|
||
'dod_revenue': daily_agg['dod_revenue'].fillna(0).tolist() if 'dod_revenue' in daily_agg.columns and not daily_agg.empty else [],
|
||
'dod_profit': daily_agg['dod_profit'].fillna(0).tolist() if 'dod_profit' in daily_agg.columns and not daily_agg.empty else [],
|
||
'dod_avg_price': daily_agg['dod_avg_price'].fillna(0).tolist() if 'dod_avg_price' in daily_agg.columns and not daily_agg.empty else [],
|
||
'dod_qty': daily_agg['dod_qty'].fillna(0).tolist() if 'dod_qty' in daily_agg.columns and not daily_agg.empty else [],
|
||
|
||
# WoW 多維度
|
||
'wow_revenue': daily_agg['wow_revenue'].fillna(0).tolist() if 'wow_revenue' in daily_agg.columns and not daily_agg.empty else [],
|
||
'wow_profit': daily_agg['wow_profit'].fillna(0).tolist() if 'wow_profit' in daily_agg.columns and not daily_agg.empty else [],
|
||
'wow_avg_price': daily_agg['wow_avg_price'].fillna(0).tolist() if 'wow_avg_price' in daily_agg.columns and not daily_agg.empty else [],
|
||
'wow_qty': daily_agg['wow_qty'].fillna(0).tolist() if 'wow_qty' in daily_agg.columns and not daily_agg.empty else [],
|
||
|
||
'top10_labels': top10_labels,
|
||
'top10_values': top10_values
|
||
}
|
||
|
||
def prepare_category_summary(df, date_str=None, is_month_view=False, month_start=None, month_end=None):
|
||
"""準備分類聚合列表 (支援單日或月度範圍)"""
|
||
if is_month_view and month_start is not None and month_end is not None:
|
||
day_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)]
|
||
else:
|
||
day_df = df[df['snapshot_date'] == date_str]
|
||
cols = day_df.columns.tolist()
|
||
|
||
col_category = find_col(cols, ['館別', '分類', 'Category'])
|
||
col_vendor = find_col(cols, ['廠商名稱', '廠商', 'Vendor', 'Supplier'])
|
||
col_amount = find_col(cols, ['銷售金額', '業績', '總業績'])
|
||
col_cost = find_col(cols, ['成本', '總成本'])
|
||
col_profit = find_col(cols, ['毛利'])
|
||
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
|
||
col_name = find_col(cols, ['商品名稱', '品名'])
|
||
|
||
if not col_category or not col_amount:
|
||
return []
|
||
|
||
# 分類 + 廠商聚合
|
||
agg_dict = {col_amount: 'sum'}
|
||
if col_cost:
|
||
agg_dict[col_cost] = 'sum'
|
||
if col_profit:
|
||
agg_dict[col_profit] = 'sum'
|
||
if col_qty:
|
||
agg_dict[col_qty] = 'sum'
|
||
if col_name:
|
||
agg_dict[col_name] = 'nunique'
|
||
|
||
# 如果有廠商欄位,按分類+廠商聚合;否則只按分類聚合
|
||
if col_vendor:
|
||
category_df = day_df.groupby([col_category, col_vendor]).agg(agg_dict).reset_index()
|
||
else:
|
||
category_df = day_df.groupby(col_category).agg(agg_dict).reset_index()
|
||
|
||
# 計算毛利(如果資料中沒有毛利欄位,自動計算)
|
||
if col_profit and col_profit in category_df.columns:
|
||
# 資料中有毛利欄位,直接使用
|
||
pass
|
||
elif col_amount and col_cost and col_amount in category_df.columns and col_cost in category_df.columns:
|
||
# 資料中沒有毛利欄位,用 業績 - 成本 計算
|
||
category_df['profit_calculated'] = category_df[col_amount] - category_df[col_cost]
|
||
col_profit = 'profit_calculated'
|
||
else:
|
||
col_profit = None
|
||
|
||
# 計算毛利率
|
||
if col_profit and col_profit in category_df.columns and col_amount and col_amount in category_df.columns:
|
||
category_df['margin_rate'] = (category_df[col_profit] / category_df[col_amount] * 100).fillna(0)
|
||
else:
|
||
category_df['margin_rate'] = 0
|
||
|
||
# 計算均價
|
||
if col_qty and col_amount:
|
||
category_df['avg_price'] = (category_df[col_amount] / category_df[col_qty]).fillna(0)
|
||
else:
|
||
category_df['avg_price'] = 0
|
||
|
||
# 重新命名欄位以便模板使用
|
||
rename_dict = {col_category: 'category', col_amount: 'revenue'}
|
||
if col_vendor:
|
||
rename_dict[col_vendor] = 'vendor'
|
||
if col_cost:
|
||
rename_dict[col_cost] = 'cost'
|
||
if col_profit and col_profit in category_df.columns:
|
||
rename_dict[col_profit] = 'profit'
|
||
if col_qty:
|
||
rename_dict[col_qty] = 'qty'
|
||
if col_name:
|
||
rename_dict[col_name] = 'sku_count'
|
||
|
||
category_df = category_df.rename(columns=rename_dict)
|
||
|
||
# 確保 profit 欄位存在,如果不存在則設為 0
|
||
if 'profit' not in category_df.columns:
|
||
category_df['profit'] = 0
|
||
|
||
# 轉為字典列表
|
||
return category_df.to_dict('records')
|
||
|
||
# V-New 2026-01-15: 行銷活動業績聚合函數
|
||
|
||
|
||
def get_taiwan_holiday(date):
|
||
"""判斷是否為台灣國定假日,回傳 (is_holiday, holiday_name)"""
|
||
year = date.year
|
||
month = date.month
|
||
day = date.day
|
||
|
||
# 2026年台灣國定假日(根據人事行政總處公佈)
|
||
holidays_2026 = {
|
||
(1, 1): '元旦',
|
||
# 春節連假 (2/14-2/22,共9天)
|
||
(2, 14): '春節連假',
|
||
(2, 15): '小年夜',
|
||
(2, 16): '除夕',
|
||
(2, 17): '春節 (初一)',
|
||
(2, 18): '春節 (初二)',
|
||
(2, 19): '春節 (初三)',
|
||
(2, 20): '春節連假',
|
||
(2, 21): '春節連假',
|
||
(2, 22): '春節連假',
|
||
# 和平紀念日 (2/28-3/2,共3天)
|
||
(2, 28): '和平紀念日',
|
||
(3, 2): '和平紀念日補假',
|
||
# 兒童節+清明節 (4/3-4/6,共4天)
|
||
(4, 3): '兒童節補假',
|
||
(4, 4): '清明節',
|
||
(4, 5): '清明節連假',
|
||
(4, 6): '清明節補假',
|
||
# 勞動節 (5/1-5/3,共3天)
|
||
(5, 1): '勞動節',
|
||
# 端午節 (6/19-6/21,共3天)
|
||
(6, 19): '端午節',
|
||
# 中秋節+教師節 (9/25-9/28,共4天)
|
||
(9, 25): '中秋節',
|
||
(9, 28): '教師節',
|
||
# 國慶日 (10/9-10/11,共3天)
|
||
(10, 9): '國慶日補假',
|
||
(10, 10): '國慶日',
|
||
# 光復節 (10/25-10/26,共2天)
|
||
(10, 25): '臺灣光復節',
|
||
(10, 26): '光復節補假',
|
||
# 行憲紀念日 (12/25-12/27,共3天)
|
||
(12, 25): '行憲紀念日',
|
||
}
|
||
|
||
# 2027年台灣國定假日(預先計算部分)
|
||
holidays_2027 = {
|
||
(1, 1): '元旦',
|
||
(2, 11): '春節 (除夕)',
|
||
(2, 12): '春節 (初一)',
|
||
(2, 13): '春節 (初二)',
|
||
(2, 14): '春節 (初三)',
|
||
(2, 15): '春節 (初四)',
|
||
(2, 16): '春節 (初五)',
|
||
(2, 17): '春節 (初六)',
|
||
(2, 28): '和平紀念日',
|
||
(4, 4): '清明節',
|
||
(4, 5): '清明節連假',
|
||
(6, 14): '端午節',
|
||
(9, 21): '中秋節',
|
||
(10, 10): '國慶日',
|
||
(10, 11): '國慶日連假',
|
||
}
|
||
|
||
holidays = holidays_2026 if year == 2026 else (holidays_2027 if year == 2027 else {})
|
||
|
||
holiday_name = holidays.get((month, day))
|
||
return (True, holiday_name) if holiday_name else (False, None)
|
||
|
||
def prepare_calendar_data(df, selected_month):
|
||
"""準備行事曆數據(豐富版:顯示總業績、毛利、SKU數 + DoD%)"""
|
||
import calendar
|
||
|
||
# 取得該月份的年月
|
||
year = selected_month.year
|
||
month = selected_month.month
|
||
|
||
# 計算該月第一天和最後一天
|
||
first_day = pd.Timestamp(year=year, month=month, day=1)
|
||
last_day = pd.Timestamp(year=year, month=month, day=calendar.monthrange(year, month)[1])
|
||
|
||
# 計算行事曆顯示範圍(包含前後月份的日期以填滿週)
|
||
# 取得該月第一天是星期幾 (0=Monday, 6=Sunday)
|
||
first_weekday = first_day.weekday()
|
||
|
||
# 計算行事曆起始日(從週一開始)
|
||
calendar_start = first_day - timedelta(days=first_weekday)
|
||
|
||
# 計算該月最後一天是星期幾
|
||
last_weekday = last_day.weekday()
|
||
|
||
# 計算行事曆結束日(到週日結束)
|
||
calendar_end = last_day + timedelta(days=(6 - last_weekday))
|
||
|
||
# 取得該月份及前後各一天的所有資料(用於計算 DoD)
|
||
data_start = first_day - timedelta(days=1)
|
||
data_end = last_day
|
||
month_df = df[(df['snapshot_date'] >= data_start) & (df['snapshot_date'] <= data_end)]
|
||
|
||
# 取得欄位
|
||
cols = df.columns.tolist()
|
||
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
|
||
col_cost = find_col(cols, ['成本', 'Cost'])
|
||
col_profit = find_col(cols, ['毛利', 'Profit'])
|
||
col_qty = find_col(cols, ['銷售數量', '銷量', 'Qty', '數量'])
|
||
col_name = find_col(cols, ['商品名稱', '品名'])
|
||
|
||
# 為每一天計算 KPI
|
||
calendar_days = []
|
||
current_date = calendar_start
|
||
|
||
while current_date <= calendar_end:
|
||
# 取得星期(0=週一, 6=週日)
|
||
weekday = current_date.weekday()
|
||
weekday_names = ['週一', '週二', '週三', '週四', '週五', '週六', '週日']
|
||
|
||
# 判斷是否為國定假日
|
||
is_holiday, holiday_name = get_taiwan_holiday(current_date)
|
||
|
||
day_data = {
|
||
'date': current_date.strftime('%Y-%m-%d'),
|
||
'day': current_date.day,
|
||
'weekday': weekday_names[weekday],
|
||
'is_weekend': weekday >= 5, # 週六或週日
|
||
'is_holiday': is_holiday,
|
||
'holiday_name': holiday_name,
|
||
'is_current_month': current_date.month == month,
|
||
'has_data': False,
|
||
'revenue': 0,
|
||
'profit': 0,
|
||
'margin_rate': 0,
|
||
'sku_count': 0,
|
||
'qty': 0,
|
||
'avg_price': 0,
|
||
'dod_percent': 0,
|
||
'dod_direction': 'neutral' # 'up', 'down', 'neutral'
|
||
}
|
||
|
||
# 如果該日期在當前月份範圍內,計算 KPI
|
||
if first_day <= current_date <= last_day:
|
||
day_df = month_df[month_df['snapshot_date'] == current_date]
|
||
|
||
if not day_df.empty:
|
||
day_data['has_data'] = True
|
||
|
||
# 計算總業績
|
||
if col_amount:
|
||
day_data['revenue'] = float(day_df[col_amount].sum())
|
||
|
||
# 計算毛利(優先使用毛利欄位,否則用業績-成本計算)
|
||
if col_profit:
|
||
day_data['profit'] = float(day_df[col_profit].sum())
|
||
elif col_cost and col_amount:
|
||
total_cost = float(day_df[col_cost].sum())
|
||
day_data['profit'] = day_data['revenue'] - total_cost
|
||
|
||
# 計算毛利率
|
||
if day_data['revenue'] > 0:
|
||
day_data['margin_rate'] = (day_data['profit'] / day_data['revenue']) * 100
|
||
|
||
# 計算銷量
|
||
if col_qty:
|
||
day_data['qty'] = float(day_df[col_qty].sum())
|
||
|
||
# 計算客單價(總業績 / 總銷量)
|
||
if day_data['qty'] > 0:
|
||
day_data['avg_price'] = day_data['revenue'] / day_data['qty']
|
||
|
||
# 計算 SKU 數
|
||
if col_name:
|
||
day_data['sku_count'] = int(day_df[col_name].nunique())
|
||
|
||
# 計算 DoD%
|
||
prev_date = current_date - timedelta(days=1)
|
||
prev_df = month_df[month_df['snapshot_date'] == prev_date]
|
||
|
||
if not prev_df.empty and col_amount:
|
||
prev_revenue = float(prev_df[col_amount].sum())
|
||
if prev_revenue > 0:
|
||
dod = ((day_data['revenue'] - prev_revenue) / prev_revenue) * 100
|
||
day_data['dod_percent'] = round(dod, 1)
|
||
day_data['dod_direction'] = 'up' if dod >= 0 else 'down'
|
||
|
||
calendar_days.append(day_data)
|
||
current_date += timedelta(days=1)
|
||
|
||
# 組織成週結構(每週 7 天)
|
||
weeks = []
|
||
for i in range(0, len(calendar_days), 7):
|
||
weeks.append(calendar_days[i:i+7])
|
||
|
||
# 計算上個月和下個月的年月
|
||
prev_month = selected_month - pd.DateOffset(months=1)
|
||
next_month = selected_month + pd.DateOffset(months=1)
|
||
|
||
return {
|
||
'year': year,
|
||
'month': month,
|
||
'month_name': selected_month.strftime('%Y年%m月'),
|
||
'weeks': weeks,
|
||
'prev_month': prev_month.strftime('%Y-%m'),
|
||
'next_month': next_month.strftime('%Y-%m')
|
||
}
|
||
|
||
# ================= ⚙️ 5. 服務啟動邏輯 =================
|
||
|
||
def run_schedule():
|
||
"""在背景執行緒中運行排程"""
|
||
sys_log.info("🚀 排程服務已啟動,等待任務...")
|
||
while True:
|
||
schedule.run_pending()
|
||
time.sleep(1)
|
||
|
||
def init_scheduler():
|
||
"""初始化排程任務(Gunicorn 模式下也會執行)"""
|
||
schedule.every(1).hours.do(run_momo_task)
|
||
schedule.every(1).hours.do(run_edm_task)
|
||
schedule.every(1).hours.do(run_festival_task)
|
||
sys_log.info(f"📅 已設定每小時執行主站、EDM與購物節爬蟲任務")
|
||
|
||
schedule.every(30).minutes.do(run_auto_import_task)
|
||
sys_log.info(f"📅 已設定每 30 分鐘執行 Google Drive 自動匯入任務")
|
||
|
||
schedule.every(30).minutes.do(run_whitepage_check)
|
||
sys_log.info(f"📅 已設定每 30 分鐘執行網頁白頁監控任務")
|
||
|
||
schedule.every(4).hours.do(run_competitor_price_feeder_task)
|
||
sys_log.info(f"📅 已設定每 4 小時執行 PChome 競品價格抓取任務")
|
||
|
||
# 啟動排程執行緒
|
||
scheduler_thread = threading.Thread(target=run_schedule, daemon=True)
|
||
scheduler_thread.start()
|
||
sys_log.info("✅ 排程器已在背景執行緒中啟動")
|
||
|
||
# V-New: 在模組載入時自動初始化排程(Gunicorn 模式下也會執行)
|
||
# 🚩 V-Fix 2026-01-14: 停用自動排程器以避免多個 gunicorn workers 重複執行任務
|
||
# 原因:每個 worker 都會啟動排程器,導致 4x 資源消耗(4 workers × 3 爬蟲任務 = 12 Chrome 實例同時運行)
|
||
# 解決方案:改用獨立的 run_scheduler.py 或透過 Web UI 手動觸發任務
|
||
# try:
|
||
# init_scheduler()
|
||
# except Exception as e:
|
||
# sys_log.error(f"❌ 排程器初始化失敗: {e}")
|
||
sys_log.info("ℹ️ 自動排程器已停用(避免重複執行),請使用 run_scheduler.py 或 Web UI 手動觸發")
|
||
|
||
def start_flask():
|
||
sys_log.info("🚀 Web 服務正在啟動於 port 80...")
|
||
app.run(host='0.0.0.0', port=80, use_reloader=False)
|
||
|
||
def scheduled_job_wrapper():
|
||
"""執行 MOMO 爬蟲任務並發送通知"""
|
||
timestamp = datetime.now(TAIPEI_TZ).strftime('%H:%M:%S')
|
||
sys_log.info(f"⏰ [{timestamp}] 啟動背景抓取執行緒...")
|
||
|
||
def job():
|
||
# 1. 執行爬蟲
|
||
run_momo_task()
|
||
|
||
# 2. 發送通知 (僅發送今日異動)
|
||
try:
|
||
# 重新載入通知模組
|
||
import importlib
|
||
import scheduler
|
||
import services.notification_manager
|
||
importlib.reload(scheduler)
|
||
importlib.reload(services.notification_manager)
|
||
from services.notification_manager import NotificationManager
|
||
|
||
stats = get_dashboard_stats()
|
||
|
||
# 只要有任何異動數據就發送通知
|
||
if any(stats.values()):
|
||
screenshot_path = scheduler.capture_page_screenshot("http://127.0.0.1/", "momo_dashboard")
|
||
NotificationManager().send_momo_report(stats, screenshot_path)
|
||
except Exception as e:
|
||
sys_log.error(f"[Scheduler] ❌ 發送通知失敗: {e}")
|
||
|
||
threading.Thread(target=job, daemon=True).start()
|
||
|
||
if __name__ == "__main__":
|
||
banner = f" MOMO 專業數據管理系統 {SYSTEM_VERSION} "
|
||
sys_log.info(f"{ '='*20} {banner} {'='*20}")
|
||
|
||
# 啟動前先檢查資料庫結構
|
||
repair_database_schema()
|
||
|
||
# 使用生產環境域名
|
||
public_url = "https://mo.wooo.work"
|
||
sys_log.info(f"✅ 使用固定網址: {public_url}")
|
||
|
||
# 🚩 V9.7 將公開 URL 寫入設定檔,供其他模組使用
|
||
try:
|
||
url_config_path = os.path.join(BASE_DIR, 'data', 'url_config.json')
|
||
with open(url_config_path, 'w') as f:
|
||
json.dump({"public_url": public_url}, f)
|
||
except Exception as file_err:
|
||
sys_log.error(f"⚠️ URL 設定檔寫入失敗 (不影響服務運行,可能磁碟已滿): {file_err}")
|
||
|
||
web_server = threading.Thread(target=start_flask)
|
||
web_server.daemon = True
|
||
web_server.start()
|
||
|
||
# 排程器已在模組載入時自動初始化(見 init_scheduler() 函式)
|
||
sys_log.info("ℹ️ 排程器已在全域範圍初始化完成")
|
||
|
||
try:
|
||
while True:
|
||
time.sleep(3600)
|
||
except KeyboardInterrupt:
|
||
sys_log.info("🔌 Web 服務已關閉")
|
||
try:
|
||
ngrok.disconnect(public_url)
|
||
except Exception as e:
|
||
sys_log.info(f"ℹ️ Ngrok 關閉時無需額外操作: {e}")
|