Files
ewoooc/app.py
OoO fb9c4ad1b5
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
refactor(openclaw): 抽出 Telegram API helper
2026-04-30 14:24:45 +08:00

1207 lines
48 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ================= TODO LIST (待辦事項 - 重開機後請依序執行) =================
# 1. [驗證] 重啟 app.py 後,重新匯入 Excel確認「自動去重」功能是否生效 (重複匯入應顯示 0 筆新增)。
# 2. [檢查] 前往 /sales_analysis 頁面,確認 '狀態' 欄位是否正確顯示 'F' (目前為原始匯入模式)。
# 3. [決策] 若資料顯示正常,評估是否需要恢復「智慧資料清理」邏輯 (目前程式碼第 1160 行左右已註解)。
# 4. [備份] 確認系統運作正常後,執行系統備份。
# =======================================================================
import os
import sys
import time
import threading
import math
import json
import hashlib
import shutil
import zipfile
import re
import io # V-New: 用於 Excel 匯出
import traceback # V-Fix: 用於錯誤追蹤
from datetime import datetime, timedelta, timezone
# ================= 🔧 1. 環境與路徑鎖定 =================
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 確保專案根目錄在 sys.path 的最前面,優先讀取本地模組
sys.path.insert(0, BASE_DIR)
# 自動檢核並建立必要目錄
try:
for folder in ['database', 'services', 'crawler', 'logs', 'data', 'templates', 'web/static']:
folder_path = os.path.join(BASE_DIR, folder)
if not os.path.exists(folder_path):
os.makedirs(folder_path)
# 僅針對 Python 套件目錄建立 __init__.py
if 'web' not in folder:
init_file = os.path.join(folder_path, '__init__.py')
if not os.path.exists(init_file):
with open(init_file, 'w') as f: pass
except OSError as e:
print(f"❌ 系統初始化失敗: 無法建立目錄或檔案 (磁碟可能已滿) - {e}")
# ================= 🔧 2. 核心模組導入 =================
try:
from flask import Flask, render_template, jsonify, request, send_file, redirect, url_for, send_from_directory, flash, session
from werkzeug.utils import secure_filename
from pyngrok import ngrok, conf
import schedule
from sqlalchemy import desc, and_, func, text, literal, case
from sqlalchemy import inspect # V-New: 用於檢查資料表是否存在
from sqlalchemy.orm import joinedload
import pandas as pd # type: ignore
from pandas.api.types import is_numeric_dtype # type: ignore
import numpy as np # type: ignore # V-Opt: 引入 numpy 進行向量化運算加速
# 導入自定義模組
try:
from scheduler import run_momo_task, run_edm_task, run_festival_task, run_auto_import_task, run_whitepage_check, run_competitor_price_feeder_task
from database.manager import DatabaseManager
from database.models import Base, Product, PriceRecord, MonthlySummaryAnalysis
from database.edm_models import PromoProduct
except ImportError as e:
print(f"❌ 專案內部檔案缺失: {e}\n請檢查 database/ 或 services/ 目錄下的 .py 檔案是否存在。")
sys.exit(1)
from services.logger_manager import SystemLogger
from services.exporter import Exporter # 🚩 導入匯出模組
except ImportError as e:
print(f"❌ 關鍵套件導入失敗: {e}")
sys.exit(1)
# ================= 🔧 3. 系統核心配置 =================
# 從 config.py 匯入必要的設定
from config import EXCEL_EXPORT_DIR, DATABASE_TYPE, validate_critical_config
sys_log = SystemLogger("Web_Server").get_logger()
# 驗證選用配置,缺少時輸出 warning非 fatal
for _warn in validate_critical_config():
sys_log.warning(_warn)
# 商品看板 cache 單一來源。實際路由已在 routes/dashboard_routes.py。
from services.cache_manager import _DASHBOARD_DATA_CACHE, _DASHBOARD_CACHE_TTL # noqa: E402
# 🚩 檢查磁碟空間 (V9.52 新增)
try:
total, used, free = shutil.disk_usage(BASE_DIR)
if free < 200 * 1024 * 1024: # 小於 200MB
sys_log.critical(f"[System] [DISK_CHECK] 🚨 嚴重警告: 磁碟空間極低 | Free: {free // (1024*1024)} MB")
elif free < 1024 * 1024 * 1024: # 小於 1GB
sys_log.warning(f"[System] [DISK_CHECK] ⚠️ 警告: 磁碟空間不足 1GB | Free: {free // (1024*1024)} MB")
except Exception as e:
sys_log.error(f"無法檢測磁碟空間: {e}")
# 🚩 系統版本定義 (備份與顯示用)
# 🚩 2026-04-30 V10.23: OpenClaw Telegram API helper extraction
SYSTEM_VERSION = "V10.23"
# ==========================================
# 🔒 SQL Injection 防護函數
# ==========================================
# 允許的資料表白名單
# 安全工具:實作已搬至 utils/security.py此處 re-export 維持向後相容
from utils.security import ( # noqa: E402
ALLOWED_TABLES,
validate_table_name,
validate_column_names,
)
# 安全工具:路徑遍歷 + 檔案上傳驗證 + safe_read_sql 已搬至 utils/security.py
from utils.security import ( # noqa: E402
safe_read_sql,
safe_join,
ALLOWED_UPLOAD_EXTENSIONS,
ALLOWED_MIME_TYPES,
secure_filename_unicode,
allowed_file,
validate_upload_file,
)
# 🚩 資料庫結構自動修復 (V9.53 新增) — 實作搬至 database/schema_repair.py
from database.schema_repair import repair_database_schema # noqa: E402, F401
# 從環境變數讀取 NGROK_AUTH_TOKEN未設定時禁止使用硬編碼預設值
NGROK_AUTH_TOKEN = os.getenv('NGROK_AUTH_TOKEN', '')
if NGROK_AUTH_TOKEN:
conf.get_default().auth_token = NGROK_AUTH_TOKEN
else:
sys_log.warning("[Security] ⚠️ NGROK_AUTH_TOKEN 未設定,已跳過 ngrok auth token 注入")
TEMPLATE_DIR = os.path.join(BASE_DIR, 'templates')
STATIC_DIR = os.path.join(BASE_DIR, 'web/static')
# 檢查關鍵模板是否存在
if not os.path.exists(os.path.join(TEMPLATE_DIR, 'dashboard.html')):
sys_log.warning(f"[Web] [Template] ⚠️ 警告: 找不到 dashboard.html | Path: {TEMPLATE_DIR}")
app = Flask(__name__,
template_folder=TEMPLATE_DIR,
static_folder=STATIC_DIR)
# ==========================================
# 🔒 Flask 安全配置
# ==========================================
# 從 config.py 導入 SECRET_KEY
from config import SECRET_KEY
# 基本配置
app.config['SECRET_KEY'] = SECRET_KEY
# Session 安全配置
app.config['SESSION_COOKIE_HTTPONLY'] = True # 防止 JavaScript 存取 cookie防 XSS
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # 防止 CSRF 攻擊
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=24) # Session 有效期 24 小時(延長避免長時間閒置斷線)
# 如果使用 HTTPS啟用 SECURE cookie本地開發時應設為 False
# 注意:如果您的系統部署在 HTTPS 環境,請將 .env 中的 USE_HTTPS 設為 true
USE_HTTPS = os.getenv('USE_HTTPS', 'false').lower() == 'true'
if USE_HTTPS:
app.config['SESSION_COOKIE_SECURE'] = True
sys_log.info("[Security] ✅ HTTPS 模式已啟用Session cookie 僅透過 HTTPS 傳輸")
else:
app.config['SESSION_COOKIE_SECURE'] = False
sys_log.warning("[Security] ⚠️ HTTP 模式開發環境Session cookie 未強制 HTTPS")
# 檔案上傳大小限制10MB
# V-New: 提高檔案上傳大小限制 (從 10MB 提高到 100MB)
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024
sys_log.info("[Security] ✅ Flask 安全配置已載入")
sys_log.info(f"[Security] • Session 有效期: 2 小時")
sys_log.info(f"[Security] • 檔案上傳限制: 10 MB")
sys_log.info(f"[Security] • CSRF 防護: SameSite=Lax")
sys_log.info(f"[Security] • XSS 防護: HttpOnly=True")
# ==========================================
# 🔒 CSRF 防護配置
# ==========================================
from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect(app)
sys_log.info("[Security] ✅ CSRF 防護已啟用 (Flask-WTF)")
# ==========================================
# 🔧 Blueprint 註冊 - 廠商缺貨系統
# ==========================================
from routes.vendor_routes import vendor_bp
app.register_blueprint(vendor_bp)
sys_log.info("[Blueprint] ✅ 廠商缺貨系統 Blueprint 已註冊")
# ==========================================
# 🔧 Blueprint 註冊 - Google Drive 自動匯入
# ==========================================
from routes.auto_import_routes import auto_import_bp
app.register_blueprint(auto_import_bp)
csrf.exempt(auto_import_bp)
sys_log.info("[Blueprint] ✅ Google Drive 自動匯入 Blueprint 已註冊 (CSRF 已豁免)")
# ==========================================
# 🔧 Blueprint 註冊 - 爬蟲管理系統
# ==========================================
from routes.crawler_management_routes import crawler_bp
app.register_blueprint(crawler_bp)
sys_log.info("[Blueprint] ✅ 爬蟲管理系統 Blueprint 已註冊")
# ==========================================
# 🔧 Blueprint 註冊 - AI 智慧文案系統
# ==========================================
from routes.ai_routes import ai_bp
app.register_blueprint(ai_bp)
csrf.exempt(ai_bp) # ICAIM API 使用內部呼叫,不需要 CSRF
sys_log.info("[Blueprint] ✅ AI 智慧文案系統 Blueprint 已註冊")
# ==========================================
# 🔧 Blueprint 註冊 - CI/CD Dashboard
# ==========================================
from routes.cicd_routes import cicd_bp
app.register_blueprint(cicd_bp)
csrf.exempt(cicd_bp) # CI/CD API doesn't need CSRF
sys_log.info("[Blueprint] CI/CD Dashboard Blueprint registered")
# ==========================================
# 🔧 Blueprint 註冊 - Code Review 系統
# ==========================================
try:
from routes.code_review_routes import code_review_bp
app.register_blueprint(code_review_bp)
csrf.exempt(code_review_bp) # Code Review API 使用內部認證,不需要 CSRF
sys_log.info("[Blueprint] ✅ Code Review 系統 Blueprint 已註冊 (CSRF 已豁免)")
except Exception as _e:
sys_log.warning(f"[Blueprint] ⚠️ Code Review 系統 Blueprint 註冊失敗: {_e}")
# ==========================================
# 🔧 Blueprint 註冊 - 趨勢資料系統
# ==========================================
from routes.trend_routes import trend_bp
app.register_blueprint(trend_bp)
sys_log.info("[Blueprint] ✅ 趨勢資料系統 Blueprint 已註冊")
# ==========================================
# 🔒 Auth 路由註冊 - 登入/登出
# ==========================================
from auth import init_auth_routes, login_required
init_auth_routes(app)
sys_log.info("[Auth] ✅ 登入/登出路由已註冊")
# ==========================================
# 🔧 Blueprint 註冊 - 用戶管理系統
# ==========================================
from routes.user_routes import user_bp
app.register_blueprint(user_bp)
sys_log.info("[Blueprint] ✅ 用戶管理系統 Blueprint 已註冊")
# ==========================================
# 🚨 Blueprint 註冊 - 系統告警
# ==========================================
from routes.alert_routes import alert_bp
app.register_blueprint(alert_bp)
csrf.exempt(alert_bp)
sys_log.info("[Blueprint] ✅ 系統告警 Blueprint 已註冊 (CSRF 已豁免)")
# ==========================================
# 系統管理路由 Blueprint
# ==========================================
from routes.system_routes import system_bp
app.register_blueprint(system_bp)
csrf.exempt(system_bp) # n8n API 需要豁免 CSRF
sys_log.info("[Blueprint] ✅ 系統管理 Blueprint 已註冊 (CSRF 已豁免)")
from routes.system_public_routes import system_public_bp
app.register_blueprint(system_public_bp)
sys_log.info("[Blueprint] ✅ 公開系統頁面 Blueprint 已註冊")
from routes.category_routes import category_bp
app.register_blueprint(category_bp)
sys_log.info("[Blueprint] ✅ 分類 CRUD Blueprint 已註冊")
from routes.misc_routes import misc_bp
app.register_blueprint(misc_bp)
sys_log.info("[Blueprint] ✅ 雜項 Routes Blueprint 已註冊 (/api/test_url, /brand_assets)")
# ==========================================
# 通知模板管理 Blueprint
# ==========================================
from routes.notification_routes import notification_bp
app.register_blueprint(notification_bp)
csrf.exempt(notification_bp) # n8n API 需要豁免 CSRF
sys_log.info("[Blueprint] ✅ 通知模板管理 Blueprint 已註冊")
# ==========================================
# Bot API Blueprint (Clawdbot 整合)
# ==========================================
from routes.bot_api_routes import bot_api_bp
app.register_blueprint(bot_api_bp)
csrf.exempt(bot_api_bp) # Bot API 使用 Token 認證,不需要 CSRF
sys_log.info("[Blueprint] ✅ Bot API Blueprint 已註冊")
# ==========================================
# Elephant Alpha AI Agent Super Orchestrator Blueprint
# ==========================================
try:
from routes.elephant_alpha_routes import elephant_alpha_bp
app.register_blueprint(elephant_alpha_bp)
csrf.exempt(elephant_alpha_bp) # Elephant Alpha API uses internal auth
sys_log.info("[Blueprint] Elephant Alpha AI Agent Super Orchestrator Blueprint registered")
except Exception as _e:
sys_log.warning(f"[Blueprint] Elephant Alpha registration failed: {_e}")
sys_log.info("[Blueprint] Elephant Alpha features will be unavailable")
# [2026-04-18 台北] OpenClaw Bot Blueprint — 修復 /menu 啞巴 (/bot/telegram/webhook 404)
# 原因routes/openclaw_bot_routes.py 有 5000+ 行完整 telegram bot handler但 app.py 從未 register
# 效果Telegram 送進來的 update (包含 /menu) 能被正確接收與處理
try:
from routes.openclaw_bot_routes import openclaw_bot_bp
app.register_blueprint(openclaw_bot_bp)
csrf.exempt(openclaw_bot_bp) # Telegram webhook 不需要 CSRF
sys_log.info("[Blueprint] ✅ OpenClaw Bot Blueprint 已註冊 (Telegram /menu 復活)")
except Exception as _e:
sys_log.error(f"[Blueprint] ❌ OpenClaw Bot Blueprint 註冊失敗: {_e}")
from routes.api_routes import api_bp
app.register_blueprint(api_bp)
sys_log.info("[Blueprint] ✅ api_bp 已註冊")
from routes.edm_routes import edm_bp
app.register_blueprint(edm_bp)
sys_log.info("[Blueprint] ✅ edm_bp 已註冊")
from routes.sales_routes import sales_bp
app.register_blueprint(sales_bp)
sys_log.info("[Blueprint] ✅ sales_bp 已註冊")
from routes.monthly_routes import monthly_bp
app.register_blueprint(monthly_bp)
sys_log.info("[Blueprint] ✅ monthly_bp 已註冊")
from routes.price_comparison_routes import price_comparison_bp
app.register_blueprint(price_comparison_bp)
sys_log.info("[Blueprint] ✅ price_comparison_bp 已註冊")
from routes.export_routes import export_bp
app.register_blueprint(export_bp)
sys_log.info("[Blueprint] ✅ export_bp 已註冊")
from routes.daily_sales_routes import daily_sales_bp
app.register_blueprint(daily_sales_bp)
sys_log.info("[Blueprint] ✅ daily_sales_bp 已註冊")
from routes.dashboard_routes import dashboard_bp
app.register_blueprint(dashboard_bp)
sys_log.info("[Blueprint] ✅ dashboard_bp 已註冊")
from routes.import_routes import import_bp
app.register_blueprint(import_bp)
sys_log.info("[Blueprint] ✅ import_bp 已註冊")
from routes.pchome_routes import pchome_bp
app.register_blueprint(pchome_bp)
sys_log.info("[Blueprint] ✅ pchome_bp 已註冊")
# V-Fix: 註冊 slugify 函數供模板使用(實作搬至 utils/text_helpers.py
from utils.text_helpers import slugify # noqa: E402
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = "服務啟動中..."
# 🚩 時區設定:台北時間 (UTC+8)
TAIPEI_TZ = timezone(timedelta(hours=8))
EXPECTED_METADATA_TABLES = {
'categories', 'products', 'price_records', 'monthly_summary_analysis',
'users', 'login_history', 'permissions', 'user_permissions',
'promo_products', 'trend_records', 'trend_keywords', 'trend_analysis',
'web_search_cache', 'telegram_users',
'ai_generation_history', 'ai_prompt_templates', 'ai_usage_tracking', 'ai_insights',
'agent_context', 'action_plans', 'action_outcomes', 'agent_strategy_weights',
'incidents', 'playbooks', 'heal_logs',
'import_jobs', 'import_config', 'notification_templates', 'ppt_reports',
'vendor_stockout', 'vendor_list', 'vendor_emails', 'email_send_log',
'realtime_sales_monthly',
}
def verify_metadata_tables():
missing = EXPECTED_METADATA_TABLES - set(Base.metadata.tables.keys())
if missing:
raise SystemExit(f"Base.metadata 漏表: {sorted(missing)}")
verify_metadata_tables()
# ==========================================
# 🔧 全域模板變數注入 (Context Processor)
# ==========================================
from config import METABASE_URL, GRIST_URL
@app.context_processor
def inject_global_vars():
"""注入全域變數到所有模板"""
return {
'metabase_url': METABASE_URL,
'grist_url': GRIST_URL,
'datetime_now': datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M:%S'),
}
sys_log.info("[Template] ✅ 全域模板變數已注入 (metabase_url, grist_url)")
# ================= 🛠️ V9.72: 分類設定管理核心 =================
CATEGORIES_JSON_PATH = os.path.join(BASE_DIR, 'data', 'categories.json')
# JSON 持久化:實作搬至 services/json_storage.py
from services.json_storage import ( # noqa: E402, F401
load_categories,
save_categories,
load_scheduler_stats,
)
# ================= 🛠️ 數據處理核心 (封裝) =================
# 純工具:實作已搬至 utils/text_helpers.py
from utils.text_helpers import ( # noqa: E402
get_color_for_string,
extract_snapshot_date_from_filename,
number_format as _number_format,
)
@app.template_filter('number_format')
def number_format_filter(value):
"""Jinja filter wrapper — 實作見 utils.text_helpers.number_format。"""
return _number_format(value)
# V-Refactor: 將 find_col 移至全域,方便多個函式共用
from utils.df_helpers import find_col # noqa: E402
def get_consolidated_data():
"""🚩 統一封裝:獲取全分類去重後的當前數據、昨日對比及差值 (V-Opt: 優化查詢效能 + 快取)"""
global _DASHBOARD_DATA_CACHE
# V-New: 檢查快取是否有效
now = datetime.now(TAIPEI_TZ)
if (_DASHBOARD_DATA_CACHE['consolidated_data'] is not None and
_DASHBOARD_DATA_CACHE['consolidated_timestamp'] is not None):
cache_age = (now.timestamp() - _DASHBOARD_DATA_CACHE['consolidated_timestamp'])
if cache_age < _DASHBOARD_CACHE_TTL:
sys_log.debug(f"[Dashboard] [Cache] ✅ 使用快取資料 | 快取年齡: {cache_age:.1f}")
return _DASHBOARD_DATA_CACHE['consolidated_data'], _DASHBOARD_DATA_CACHE['today_start']
sys_log.debug("[Dashboard] [Cache] 🔄 快取過期或不存在,重新查詢資料庫")
db = DatabaseManager()
session = db.get_session()
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0).replace(tzinfo=None)
seven_days_ago = today_start - timedelta(days=7)
thirty_days_ago = today_start - timedelta(days=30)
try:
# Query 1: Get the latest price record for every product. This is our main list of items.
latest_price_subq = session.query(
func.max(PriceRecord.id).label('max_id')
).group_by(PriceRecord.product_id).subquery()
latest_records = session.query(PriceRecord).options(
joinedload(PriceRecord.product)
).join(latest_price_subq, PriceRecord.id == latest_price_subq.c.max_id).all()
product_ids = [r.product_id for r in latest_records]
if not product_ids:
session.close() # 提前關閉連線
return [], today_start
# Query 2: Get yesterday's closing prices for all products in one go
yesterday_prices_subq = session.query(
PriceRecord.product_id,
func.max(PriceRecord.id).label('max_id')
).filter(
PriceRecord.product_id.in_(product_ids),
PriceRecord.timestamp < today_start
).group_by(PriceRecord.product_id).subquery()
yesterday_prices_q = session.query(
PriceRecord.product_id, PriceRecord.price
).join(
yesterday_prices_subq,
PriceRecord.id == yesterday_prices_subq.c.max_id
)
yesterday_prices_map = {pid: price for pid, price in yesterday_prices_q}
# Query 3: Get specific historical price points (7 days ago and 30 days ago)
# Instead of fetching ALL history, we fetch only the records closest to the target dates.
# This is a significant optimization.
# Helper to get price map for a specific date (start of day)
def get_price_map_before(target_date):
subq = session.query(
PriceRecord.product_id,
func.max(PriceRecord.timestamp).label('max_ts')
).filter(
PriceRecord.product_id.in_(product_ids),
PriceRecord.timestamp < target_date
).group_by(PriceRecord.product_id).subquery()
q = session.query(PriceRecord.product_id, PriceRecord.price).join(
subq,
and_(PriceRecord.product_id == subq.c.product_id, PriceRecord.timestamp == subq.c.max_ts)
)
return {pid: price for pid, price in q}
prices_7d_ago_map = get_price_map_before(seven_days_ago + timedelta(days=1)) # Approximate 7 days ago
prices_30d_ago_map = get_price_map_before(thirty_days_ago + timedelta(days=1)) # Approximate 30 days ago
# Query 4: Get TODAY's records only (for sparkline/intraday change)
today_records_q = session.query(PriceRecord).filter(
PriceRecord.product_id.in_(product_ids),
PriceRecord.timestamp >= today_start
).order_by(PriceRecord.product_id, PriceRecord.timestamp).all()
today_map = {}
for r in today_records_q:
if r.product_id not in today_map: today_map[r.product_id] = []
today_map[r.product_id].append(r)
# Final Assembly (in-memory, no more DB queries)
unique_items = []
for r in latest_records:
pid = r.product_id
# 7d/30d stats
price_7d = prices_7d_ago_map.get(pid)
price_30d = prices_30d_ago_map.get(pid)
stats_7d_diff = r.price - price_7d if price_7d is not None else 0
stats_30d_diff = r.price - price_30d if price_30d is not None else 0
# Today's stats
today_records = today_map.get(pid, [])
today_diff = 0
today_changes = []
if len(today_records) > 1:
today_diff = today_records[-1].price - today_records[0].price
# Yesterday diff
y_price = yesterday_prices_map.get(pid)
yesterday_diff = r.price - y_price if y_price is not None else 0
status = "NONE"
if yesterday_diff > 0:
status = "PRICE_UP"
elif yesterday_diff < 0:
status = "PRICE_DOWN"
# Today's changes details
last_p = y_price if y_price is not None else (today_records[0].price if today_records else r.price)
for tr in today_records:
if tr.price != last_p:
diff = tr.price - last_p
today_changes.append({
'time': tr.timestamp.strftime('%H:%M'),
'price': tr.price,
'diff': diff
})
last_p = tr.price
unique_items.append({
'record': r,
'stats': {'7d_diff': stats_7d_diff, '30d_diff': stats_30d_diff, '1d_diff': today_diff},
'yesterday_diff': yesterday_diff,
'today_changes': today_changes,
'status': status
})
# V-New: 更新快取
_DASHBOARD_DATA_CACHE['consolidated_data'] = unique_items
_DASHBOARD_DATA_CACHE['consolidated_timestamp'] = now.timestamp()
_DASHBOARD_DATA_CACHE['today_start'] = today_start
sys_log.debug(f"[Dashboard] [Cache] 💾 快取已更新 | 商品數: {len(unique_items)}")
return unique_items, today_start
finally:
session.close()
def get_dashboard_stats():
"""計算看板統計數據 (供通知使用) — backward-compat wrapper."""
from services.dashboard_service import get_dashboard_stats as _get_dashboard_stats
return _get_dashboard_stats()
# ================= 🛣️ 4. Flask 路由 =================
# Session 自動續期機制
@app.before_request
def refresh_session():
"""
在每次請求時自動刷新 Session避免長時間閒置後突然斷線
只要用戶有任何操作Session 就會自動延長
"""
if session.get('logged_in'):
session.modified = True # 標記 Session 已修改,觸發 Cookie 更新
def verify_unique_routes():
"""啟動期防線:同一 URL + method 不得由兩個 endpoint 同時註冊。"""
seen = {}
for rule in app.url_map.iter_rules():
key = (str(rule), frozenset(rule.methods - {'HEAD', 'OPTIONS'}))
if key in seen:
raise SystemExit(f"重複路由: {key} 來自 {seen[key]}{rule.endpoint}")
seen[key] = rule.endpoint
verify_unique_routes()
def preprocess_daily_sales_data(df):
"""前處理當日業績資料:欄位識別、型別轉換"""
cols = df.columns.tolist()
# 欄位自動識別(使用現有的 find_col 函式)
col_amount = find_col(cols, ['銷售金額', '業績', '金額', 'Amount', '總業績'])
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
col_profit = find_col(cols, ['毛利', 'Profit'])
col_qty = find_col(cols, ['銷售數量', '銷量', 'Qty', '數量'])
# 型別轉換
if col_amount:
df[col_amount] = pd.to_numeric(df[col_amount], errors='coerce').fillna(0)
if col_cost:
df[col_cost] = pd.to_numeric(df[col_cost], errors='coerce').fillna(0)
if col_profit:
df[col_profit] = pd.to_numeric(df[col_profit], errors='coerce').fillna(0)
if col_qty:
df[col_qty] = pd.to_numeric(df[col_qty], errors='coerce').fillna(0)
# 日期轉換
df['snapshot_date'] = pd.to_datetime(df['snapshot_date'], errors='coerce')
return df
def calculate_daily_kpis(df, date_str):
"""計算單日 6 個 KPI"""
day_df = df[df['snapshot_date'] == date_str]
cols = day_df.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_cost = find_col(cols, ['成本', 'Cost', '總成本'])
col_profit = find_col(cols, ['毛利', 'Profit'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
col_name = find_col(cols, ['商品名稱', '品名', 'Name'])
total_revenue = float(day_df[col_amount].sum()) if col_amount else 0
total_cost = float(day_df[col_cost].sum()) if col_cost else 0
gross_margin = float(day_df[col_profit].sum()) if col_profit else (total_revenue - total_cost)
total_qty = float(day_df[col_qty].sum()) if col_qty else 0
sku_count = int(day_df[col_name].nunique()) if col_name else 0
avg_price = total_revenue / total_qty if total_qty > 0 else 0
return {
'total_revenue': total_revenue,
'total_cost': total_cost,
'gross_margin': gross_margin,
'total_qty': total_qty,
'sku_count': sku_count,
'avg_price': avg_price
}
def calculate_dod(df, current_date):
"""計算 Day-over-Day 變化率"""
current = calculate_daily_kpis(df, current_date)
prev_date = current_date - timedelta(days=1)
if prev_date not in df['snapshot_date'].values:
return {k: 0.0 for k in current.keys()}
previous = calculate_daily_kpis(df, prev_date)
dod = {}
for key in current:
if previous[key] > 0:
dod[key] = ((current[key] - previous[key]) / previous[key]) * 100
else:
dod[key] = 0.0
return dod
def calculate_wow(df, current_date):
"""計算 Week-over-Week 變化率"""
current = calculate_daily_kpis(df, current_date)
prev_week_date = current_date - timedelta(days=7)
if prev_week_date not in df['snapshot_date'].values:
return {k: 0.0 for k in current.keys()}
previous = calculate_daily_kpis(df, prev_week_date)
wow = {}
for key in current:
if previous[key] > 0:
wow[key] = ((current[key] - previous[key]) / previous[key]) * 100
else:
wow[key] = 0.0
return wow
def prepare_daily_charts(df, selected_date, days=30):
"""準備 4 個圖表的數據(根據選擇的日期)"""
# 取選擇日期前 N 天的數據
start_date = selected_date - timedelta(days=days)
df_range = df[(df['snapshot_date'] >= start_date) & (df['snapshot_date'] <= selected_date)]
# 按日期聚合
cols = df_range.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_cost = find_col(cols, ['成本', '總成本'])
col_profit = find_col(cols, ['毛利'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
col_name = find_col(cols, ['商品名稱', '品名'])
# 日期聚合
agg_dict = {}
if col_amount:
agg_dict[col_amount] = 'sum'
if col_cost:
agg_dict[col_cost] = 'sum'
if col_profit:
agg_dict[col_profit] = 'sum'
if col_qty:
agg_dict[col_qty] = 'sum'
daily_agg = df_range.groupby('snapshot_date').agg(agg_dict).reset_index()
# 計算或取得毛利(如果沒有毛利欄位,用業績-成本計算)
if col_profit and col_profit in daily_agg.columns:
daily_agg['profit'] = daily_agg[col_profit]
elif col_amount and col_cost and col_amount in daily_agg.columns and col_cost in daily_agg.columns:
daily_agg['profit'] = daily_agg[col_amount] - daily_agg[col_cost]
else:
daily_agg['profit'] = 0
# 計算客單價
if col_amount and col_qty and col_amount in daily_agg.columns and col_qty in daily_agg.columns:
daily_agg['avg_price'] = (daily_agg[col_amount] / daily_agg[col_qty]).fillna(0)
else:
daily_agg['avg_price'] = 0
# 計算 DoD (Day-over-Day) 變化率 - 多個維度
if col_amount and col_amount in daily_agg.columns:
daily_agg['dod_revenue'] = daily_agg[col_amount].pct_change() * 100
if 'profit' in daily_agg.columns:
daily_agg['dod_profit'] = daily_agg['profit'].pct_change() * 100
if 'avg_price' in daily_agg.columns:
daily_agg['dod_avg_price'] = daily_agg['avg_price'].pct_change() * 100
if col_qty and col_qty in daily_agg.columns:
daily_agg['dod_qty'] = daily_agg[col_qty].pct_change() * 100
# 計算 WoW (Week-over-Week) 變化率 - 多個維度
if col_amount and col_amount in daily_agg.columns:
daily_agg['wow_revenue'] = daily_agg[col_amount].pct_change(periods=7) * 100
if 'profit' in daily_agg.columns:
daily_agg['wow_profit'] = daily_agg['profit'].pct_change(periods=7) * 100
if 'avg_price' in daily_agg.columns:
daily_agg['wow_avg_price'] = daily_agg['avg_price'].pct_change(periods=7) * 100
if col_qty and col_qty in daily_agg.columns:
daily_agg['wow_qty'] = daily_agg[col_qty].pct_change(periods=7) * 100
# Top 10 商品(選擇的日期,包含廠商)
selected_df = df[df['snapshot_date'] == selected_date]
top10_labels = []
top10_values = []
if col_name and col_amount:
col_vendor = find_col(cols, ['廠商名稱', '廠商', 'Vendor', 'Supplier'])
if col_vendor:
# 如果有廠商欄位,按商品+廠商聚合
top10_df = selected_df.groupby([col_name, col_vendor])[col_amount].sum().nlargest(10).reset_index()
top10_labels = [f"{row[col_name]} ({row[col_vendor]})" for _, row in top10_df.iterrows()]
top10_values = top10_df[col_amount].tolist()
else:
# 沒有廠商欄位,只按商品聚合
top10 = selected_df.groupby(col_name)[col_amount].sum().nlargest(10)
top10_labels = top10.index.tolist()
top10_values = top10.values.tolist()
return {
'labels': daily_agg['snapshot_date'].dt.strftime('%m/%d').tolist() if not daily_agg.empty else [],
'revenue': daily_agg[col_amount].tolist() if col_amount and col_amount in daily_agg.columns and not daily_agg.empty else [],
'cost': daily_agg[col_cost].tolist() if col_cost and col_cost in daily_agg.columns and not daily_agg.empty else [],
'profit': daily_agg['profit'].tolist() if 'profit' in daily_agg.columns and not daily_agg.empty else [],
'qty': daily_agg[col_qty].tolist() if col_qty and col_qty in daily_agg.columns and not daily_agg.empty else [],
'avg_price': daily_agg['avg_price'].tolist() if 'avg_price' in daily_agg.columns and not daily_agg.empty else [],
# DoD 多維度
'dod_revenue': daily_agg['dod_revenue'].fillna(0).tolist() if 'dod_revenue' in daily_agg.columns and not daily_agg.empty else [],
'dod_profit': daily_agg['dod_profit'].fillna(0).tolist() if 'dod_profit' in daily_agg.columns and not daily_agg.empty else [],
'dod_avg_price': daily_agg['dod_avg_price'].fillna(0).tolist() if 'dod_avg_price' in daily_agg.columns and not daily_agg.empty else [],
'dod_qty': daily_agg['dod_qty'].fillna(0).tolist() if 'dod_qty' in daily_agg.columns and not daily_agg.empty else [],
# WoW 多維度
'wow_revenue': daily_agg['wow_revenue'].fillna(0).tolist() if 'wow_revenue' in daily_agg.columns and not daily_agg.empty else [],
'wow_profit': daily_agg['wow_profit'].fillna(0).tolist() if 'wow_profit' in daily_agg.columns and not daily_agg.empty else [],
'wow_avg_price': daily_agg['wow_avg_price'].fillna(0).tolist() if 'wow_avg_price' in daily_agg.columns and not daily_agg.empty else [],
'wow_qty': daily_agg['wow_qty'].fillna(0).tolist() if 'wow_qty' in daily_agg.columns and not daily_agg.empty else [],
'top10_labels': top10_labels,
'top10_values': top10_values
}
def prepare_category_summary(df, date_str=None, is_month_view=False, month_start=None, month_end=None):
"""準備分類聚合列表 (支援單日或月度範圍)"""
if is_month_view and month_start is not None and month_end is not None:
day_df = df[(df['snapshot_date'] >= month_start) & (df['snapshot_date'] <= month_end)]
else:
day_df = df[df['snapshot_date'] == date_str]
cols = day_df.columns.tolist()
col_category = find_col(cols, ['館別', '分類', 'Category'])
col_vendor = find_col(cols, ['廠商名稱', '廠商', 'Vendor', 'Supplier'])
col_amount = find_col(cols, ['銷售金額', '業績', '總業績'])
col_cost = find_col(cols, ['成本', '總成本'])
col_profit = find_col(cols, ['毛利'])
col_qty = find_col(cols, ['銷售數量', '銷量', '數量'])
col_name = find_col(cols, ['商品名稱', '品名'])
if not col_category or not col_amount:
return []
# 分類 + 廠商聚合
agg_dict = {col_amount: 'sum'}
if col_cost:
agg_dict[col_cost] = 'sum'
if col_profit:
agg_dict[col_profit] = 'sum'
if col_qty:
agg_dict[col_qty] = 'sum'
if col_name:
agg_dict[col_name] = 'nunique'
# 如果有廠商欄位,按分類+廠商聚合;否則只按分類聚合
if col_vendor:
category_df = day_df.groupby([col_category, col_vendor]).agg(agg_dict).reset_index()
else:
category_df = day_df.groupby(col_category).agg(agg_dict).reset_index()
# 計算毛利(如果資料中沒有毛利欄位,自動計算)
if col_profit and col_profit in category_df.columns:
# 資料中有毛利欄位,直接使用
pass
elif col_amount and col_cost and col_amount in category_df.columns and col_cost in category_df.columns:
# 資料中沒有毛利欄位,用 業績 - 成本 計算
category_df['profit_calculated'] = category_df[col_amount] - category_df[col_cost]
col_profit = 'profit_calculated'
else:
col_profit = None
# 計算毛利率
if col_profit and col_profit in category_df.columns and col_amount and col_amount in category_df.columns:
category_df['margin_rate'] = (category_df[col_profit] / category_df[col_amount] * 100).fillna(0)
else:
category_df['margin_rate'] = 0
# 計算均價
if col_qty and col_amount:
category_df['avg_price'] = (category_df[col_amount] / category_df[col_qty]).fillna(0)
else:
category_df['avg_price'] = 0
# 重新命名欄位以便模板使用
rename_dict = {col_category: 'category', col_amount: 'revenue'}
if col_vendor:
rename_dict[col_vendor] = 'vendor'
if col_cost:
rename_dict[col_cost] = 'cost'
if col_profit and col_profit in category_df.columns:
rename_dict[col_profit] = 'profit'
if col_qty:
rename_dict[col_qty] = 'qty'
if col_name:
rename_dict[col_name] = 'sku_count'
category_df = category_df.rename(columns=rename_dict)
# 確保 profit 欄位存在,如果不存在則設為 0
if 'profit' not in category_df.columns:
category_df['profit'] = 0
# 轉為字典列表
return category_df.to_dict('records')
# V-New 2026-01-15: 行銷活動業績聚合函數
def get_taiwan_holiday(date):
"""判斷是否為台灣國定假日,回傳 (is_holiday, holiday_name)"""
year = date.year
month = date.month
day = date.day
# 2026年台灣國定假日根據人事行政總處公佈
holidays_2026 = {
(1, 1): '元旦',
# 春節連假 (2/14-2/22共9天)
(2, 14): '春節連假',
(2, 15): '小年夜',
(2, 16): '除夕',
(2, 17): '春節 (初一)',
(2, 18): '春節 (初二)',
(2, 19): '春節 (初三)',
(2, 20): '春節連假',
(2, 21): '春節連假',
(2, 22): '春節連假',
# 和平紀念日 (2/28-3/2共3天)
(2, 28): '和平紀念日',
(3, 2): '和平紀念日補假',
# 兒童節+清明節 (4/3-4/6共4天)
(4, 3): '兒童節補假',
(4, 4): '清明節',
(4, 5): '清明節連假',
(4, 6): '清明節補假',
# 勞動節 (5/1-5/3共3天)
(5, 1): '勞動節',
# 端午節 (6/19-6/21共3天)
(6, 19): '端午節',
# 中秋節+教師節 (9/25-9/28共4天)
(9, 25): '中秋節',
(9, 28): '教師節',
# 國慶日 (10/9-10/11共3天)
(10, 9): '國慶日補假',
(10, 10): '國慶日',
# 光復節 (10/25-10/26共2天)
(10, 25): '臺灣光復節',
(10, 26): '光復節補假',
# 行憲紀念日 (12/25-12/27共3天)
(12, 25): '行憲紀念日',
}
# 2027年台灣國定假日預先計算部分
holidays_2027 = {
(1, 1): '元旦',
(2, 11): '春節 (除夕)',
(2, 12): '春節 (初一)',
(2, 13): '春節 (初二)',
(2, 14): '春節 (初三)',
(2, 15): '春節 (初四)',
(2, 16): '春節 (初五)',
(2, 17): '春節 (初六)',
(2, 28): '和平紀念日',
(4, 4): '清明節',
(4, 5): '清明節連假',
(6, 14): '端午節',
(9, 21): '中秋節',
(10, 10): '國慶日',
(10, 11): '國慶日連假',
}
holidays = holidays_2026 if year == 2026 else (holidays_2027 if year == 2027 else {})
holiday_name = holidays.get((month, day))
return (True, holiday_name) if holiday_name else (False, None)
def prepare_calendar_data(df, selected_month):
"""準備行事曆數據豐富版顯示總業績、毛利、SKU數 + DoD%"""
import calendar
# 取得該月份的年月
year = selected_month.year
month = selected_month.month
# 計算該月第一天和最後一天
first_day = pd.Timestamp(year=year, month=month, day=1)
last_day = pd.Timestamp(year=year, month=month, day=calendar.monthrange(year, month)[1])
# 計算行事曆顯示範圍(包含前後月份的日期以填滿週)
# 取得該月第一天是星期幾 (0=Monday, 6=Sunday)
first_weekday = first_day.weekday()
# 計算行事曆起始日(從週一開始)
calendar_start = first_day - timedelta(days=first_weekday)
# 計算該月最後一天是星期幾
last_weekday = last_day.weekday()
# 計算行事曆結束日(到週日結束)
calendar_end = last_day + timedelta(days=(6 - last_weekday))
# 取得該月份及前後各一天的所有資料(用於計算 DoD
data_start = first_day - timedelta(days=1)
data_end = last_day
month_df = df[(df['snapshot_date'] >= data_start) & (df['snapshot_date'] <= data_end)]
# 取得欄位
cols = df.columns.tolist()
col_amount = find_col(cols, ['銷售金額', '業績', '金額', '總業績'])
col_cost = find_col(cols, ['成本', 'Cost'])
col_profit = find_col(cols, ['毛利', 'Profit'])
col_qty = find_col(cols, ['銷售數量', '銷量', 'Qty', '數量'])
col_name = find_col(cols, ['商品名稱', '品名'])
# 為每一天計算 KPI
calendar_days = []
current_date = calendar_start
while current_date <= calendar_end:
# 取得星期0=週一, 6=週日)
weekday = current_date.weekday()
weekday_names = ['週一', '週二', '週三', '週四', '週五', '週六', '週日']
# 判斷是否為國定假日
is_holiday, holiday_name = get_taiwan_holiday(current_date)
day_data = {
'date': current_date.strftime('%Y-%m-%d'),
'day': current_date.day,
'weekday': weekday_names[weekday],
'is_weekend': weekday >= 5, # 週六或週日
'is_holiday': is_holiday,
'holiday_name': holiday_name,
'is_current_month': current_date.month == month,
'has_data': False,
'revenue': 0,
'profit': 0,
'margin_rate': 0,
'sku_count': 0,
'qty': 0,
'avg_price': 0,
'dod_percent': 0,
'dod_direction': 'neutral' # 'up', 'down', 'neutral'
}
# 如果該日期在當前月份範圍內,計算 KPI
if first_day <= current_date <= last_day:
day_df = month_df[month_df['snapshot_date'] == current_date]
if not day_df.empty:
day_data['has_data'] = True
# 計算總業績
if col_amount:
day_data['revenue'] = float(day_df[col_amount].sum())
# 計算毛利(優先使用毛利欄位,否則用業績-成本計算)
if col_profit:
day_data['profit'] = float(day_df[col_profit].sum())
elif col_cost and col_amount:
total_cost = float(day_df[col_cost].sum())
day_data['profit'] = day_data['revenue'] - total_cost
# 計算毛利率
if day_data['revenue'] > 0:
day_data['margin_rate'] = (day_data['profit'] / day_data['revenue']) * 100
# 計算銷量
if col_qty:
day_data['qty'] = float(day_df[col_qty].sum())
# 計算客單價(總業績 / 總銷量)
if day_data['qty'] > 0:
day_data['avg_price'] = day_data['revenue'] / day_data['qty']
# 計算 SKU 數
if col_name:
day_data['sku_count'] = int(day_df[col_name].nunique())
# 計算 DoD%
prev_date = current_date - timedelta(days=1)
prev_df = month_df[month_df['snapshot_date'] == prev_date]
if not prev_df.empty and col_amount:
prev_revenue = float(prev_df[col_amount].sum())
if prev_revenue > 0:
dod = ((day_data['revenue'] - prev_revenue) / prev_revenue) * 100
day_data['dod_percent'] = round(dod, 1)
day_data['dod_direction'] = 'up' if dod >= 0 else 'down'
calendar_days.append(day_data)
current_date += timedelta(days=1)
# 組織成週結構(每週 7 天)
weeks = []
for i in range(0, len(calendar_days), 7):
weeks.append(calendar_days[i:i+7])
# 計算上個月和下個月的年月
prev_month = selected_month - pd.DateOffset(months=1)
next_month = selected_month + pd.DateOffset(months=1)
return {
'year': year,
'month': month,
'month_name': selected_month.strftime('%Y年%m月'),
'weeks': weeks,
'prev_month': prev_month.strftime('%Y-%m'),
'next_month': next_month.strftime('%Y-%m')
}
# ================= ⚙️ 5. 服務啟動邏輯 =================
def run_schedule():
"""在背景執行緒中運行排程"""
sys_log.info("🚀 排程服務已啟動,等待任務...")
while True:
schedule.run_pending()
time.sleep(1)
def init_scheduler():
"""初始化排程任務Gunicorn 模式下也會執行)"""
schedule.every(1).hours.do(run_momo_task)
schedule.every(1).hours.do(run_edm_task)
schedule.every(1).hours.do(run_festival_task)
sys_log.info(f"📅 已設定每小時執行主站、EDM與購物節爬蟲任務")
schedule.every(30).minutes.do(run_auto_import_task)
sys_log.info(f"📅 已設定每 30 分鐘執行 Google Drive 自動匯入任務")
schedule.every(30).minutes.do(run_whitepage_check)
sys_log.info(f"📅 已設定每 30 分鐘執行網頁白頁監控任務")
schedule.every(4).hours.do(run_competitor_price_feeder_task)
sys_log.info(f"📅 已設定每 4 小時執行 PChome 競品價格抓取任務")
# 啟動排程執行緒
scheduler_thread = threading.Thread(target=run_schedule, daemon=True)
scheduler_thread.start()
sys_log.info("✅ 排程器已在背景執行緒中啟動")
# V-New: 在模組載入時自動初始化排程Gunicorn 模式下也會執行)
# 🚩 V-Fix 2026-01-14: 停用自動排程器以避免多個 gunicorn workers 重複執行任務
# 原因:每個 worker 都會啟動排程器,導致 4x 資源消耗4 workers × 3 爬蟲任務 = 12 Chrome 實例同時運行)
# 解決方案:改用獨立的 run_scheduler.py 或透過 Web UI 手動觸發任務
# try:
# init_scheduler()
# except Exception as e:
# sys_log.error(f"❌ 排程器初始化失敗: {e}")
sys_log.info(" 自動排程器已停用(避免重複執行),請使用 run_scheduler.py 或 Web UI 手動觸發")
def start_flask():
sys_log.info("🚀 Web 服務正在啟動於 port 80...")
app.run(host='0.0.0.0', port=80, use_reloader=False)
def scheduled_job_wrapper():
"""執行 MOMO 爬蟲任務並發送通知"""
timestamp = datetime.now(TAIPEI_TZ).strftime('%H:%M:%S')
sys_log.info(f"⏰ [{timestamp}] 啟動背景抓取執行緒...")
def job():
# 1. 執行爬蟲
run_momo_task()
# 2. 發送通知 (僅發送今日異動)
try:
# 重新載入通知模組
import importlib
import scheduler
import services.notification_manager
importlib.reload(scheduler)
importlib.reload(services.notification_manager)
from services.notification_manager import NotificationManager
stats = get_dashboard_stats()
# 只要有任何異動數據就發送通知
if any(stats.values()):
screenshot_path = scheduler.capture_page_screenshot("http://127.0.0.1/", "momo_dashboard")
NotificationManager().send_momo_report(stats, screenshot_path)
except Exception as e:
sys_log.error(f"[Scheduler] ❌ 發送通知失敗: {e}")
threading.Thread(target=job, daemon=True).start()
if __name__ == "__main__":
banner = f" MOMO 專業數據管理系統 {SYSTEM_VERSION} "
sys_log.info(f"{ '='*20} {banner} {'='*20}")
# 啟動前先檢查資料庫結構
repair_database_schema()
# 使用生產環境域名
public_url = "https://mo.wooo.work"
sys_log.info(f"✅ 使用固定網址: {public_url}")
# 🚩 V9.7 將公開 URL 寫入設定檔,供其他模組使用
try:
url_config_path = os.path.join(BASE_DIR, 'data', 'url_config.json')
with open(url_config_path, 'w') as f:
json.dump({"public_url": public_url}, f)
except Exception as file_err:
sys_log.error(f"⚠️ URL 設定檔寫入失敗 (不影響服務運行,可能磁碟已滿): {file_err}")
web_server = threading.Thread(target=start_flask)
web_server.daemon = True
web_server.start()
# 排程器已在模組載入時自動初始化(見 init_scheduler() 函式)
sys_log.info(" 排程器已在全域範圍初始化完成")
try:
while True:
time.sleep(3600)
except KeyboardInterrupt:
sys_log.info("🔌 Web 服務已關閉")
try:
ngrok.disconnect(public_url)
except Exception as e:
sys_log.info(f" Ngrok 關閉時無需額外操作: {e}")