#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
General-purpose API route module.

Contains the endpoints for manual task triggering, notifications,
price-history queries, price-change details and MOMO link tracking.
"""

import os
import threading
import importlib
from datetime import datetime, timezone, timedelta
import re
from flask import Blueprint, request, jsonify
from sqlalchemy import func, desc, text
from urllib.parse import parse_qs, urlparse

from auth import login_required
from config import BASE_DIR
from database.manager import DatabaseManager
from database.models import Product, PriceRecord
from database.edm_models import PromoProduct
from services.logger_manager import SystemLogger
from utils.momo_url_utils import build_momo_product_url, normalize_momo_product_url
from utils.momo_url_utils import is_probable_momo_icode

# Time zone used for every "today" / date-range computation (UTC+8).
TAIPEI_TZ = timezone(timedelta(hours=8))

# Logger
sys_log = SystemLogger("APIRoutes").get_logger()

# Blueprint definition
api_bp = Blueprint('api', __name__)


# ==========================================
# Task trigger API
# ==========================================

def _start_scheduler_task(task_name, *args):
    """Reload ``scheduler`` and run ``scheduler.<task_name>(*args)`` on a daemon thread.

    The module is reloaded on every call (as the individual endpoints did
    before) and the task runs on a thread so the web server is not blocked.
    Raises whatever the import/reload raises, or ``AttributeError`` when the
    task function does not exist; callers turn that into a 500 response.
    """
    import scheduler  # deferred import, kept local as in the original endpoints
    importlib.reload(scheduler)

    task_thread = threading.Thread(target=getattr(scheduler, task_name), args=args)
    task_thread.daemon = True
    task_thread.start()


@api_bp.route('/api/run_task', methods=['POST'])
@login_required
def trigger_task():
    """API: manually trigger the MOMO crawler task.

    Returns a JSON ``{"status", "message"}`` body; HTTP 500 on any error.
    """
    try:
        client_ip = request.remote_addr
        sys_log.info(f"[Web] [Task] 接收到手動執行請求 | IP: {client_ip}")

        # Use the standalone task_runner service to avoid a circular import.
        from services.task_runner import run_momo_task_with_notification
        run_momo_task_with_notification()

        return jsonify({"status": "success", "message": "爬蟲任務已在背景啟動"})
    except Exception as e:
        sys_log.error(f"[Web] [Task] 手動觸發任務失敗 | Error: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500


@api_bp.route('/api/run_edm_task', methods=['POST'])
@login_required
def trigger_edm_task():
    """API: manually trigger the EDM crawler task for the default campaign code."""
    try:
        target_lpn = "O1K5FBOqsvN"  # default campaign code
        sys_log.info(f"[Web] [Task] 接收到手動 EDM 執行請求 | LPN: {target_lpn}")

        _start_scheduler_task('run_edm_task', target_lpn)

        return jsonify({
            "status": "success",
            "message": f"EDM 爬蟲任務 (LPN: {target_lpn}) 已在背景啟動,請稍後刷新頁面查看結果",
        })
    except Exception as e:
        sys_log.error(f"[Web] [Task] 手動觸發 EDM 任務失敗 | Error: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500


@api_bp.route('/api/run_festival_task', methods=['POST'])
@login_required
def trigger_festival_task():
    """API: manually trigger the "1.1 shopping festival" crawler task."""
    try:
        target_lpn = "O7ylWfihYUM"
        sys_log.info(f"[Web] [Task] 接收到手動 Festival 執行請求 | LPN: {target_lpn}")

        _start_scheduler_task('run_festival_task', target_lpn)

        return jsonify({
            "status": "success",
            "message": f"Festival 爬蟲任務 (LPN: {target_lpn}) 已在背景啟動,請稍後刷新頁面查看結果",
        })
    except Exception as e:
        sys_log.error(f"[Web] [Task] 手動觸發 Festival 任務失敗 | Error: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500


@api_bp.route('/api/run_promo_event_task', methods=['POST'])
@login_required
def trigger_promo_event_task():
    """API: manually trigger a promo-event crawler task.

    Expects a JSON object with ``page_type`` and ``lpn_code`` (both required)
    and an optional ``activity_name``. Returns HTTP 400 when a required field
    is missing (including when the body is absent or not a JSON object) and
    HTTP 500 on any other error.
    """
    try:
        # silent=True: a missing/invalid JSON body becomes an empty payload so
        # the caller gets the 400 "missing parameters" answer instead of a 500.
        data = request.get_json(silent=True) or {}
        if not isinstance(data, dict):
            data = {}

        page_type = data.get('page_type', '')
        lpn_code = data.get('lpn_code', '')
        activity_name = data.get('activity_name', '促銷活動')

        if not page_type or not lpn_code:
            return jsonify({"status": "error", "message": "缺少必要參數: page_type 和 lpn_code"}), 400

        sys_log.info(
            f"[Web] [Task] 接收到手動促銷活動執行請求 | Type: {page_type} | LPN: {lpn_code} | Name: {activity_name}"
        )

        _start_scheduler_task('run_promo_event_task', lpn_code, page_type, activity_name)

        return jsonify({
            "status": "success",
            "message": f"{activity_name} 爬蟲任務 (LPN: {lpn_code}) 已在背景啟動,請稍後刷新頁面查看結果",
        })
    except Exception as e:
        sys_log.error(f"[Web] [Task] 手動觸發促銷活動任務失敗 | Error: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500


# ==========================================
# Notification API
# ==========================================

@api_bp.route('/api/trigger_momo_notification', methods=['POST'])
@login_required
def trigger_momo_notification():
    """API: manually send the product-dashboard notification."""
    try:
        # Force-reload the scheduler and notification modules.
        import scheduler
        import services.notification_manager
        importlib.reload(scheduler)
        importlib.reload(services.notification_manager)
        from services.notification_manager import NotificationManager

        # Imported here (not at module level) to avoid a circular import.
        from routes.dashboard_routes import get_dashboard_stats

        # 1. Collect the statistics.
        stats = get_dashboard_stats()

        # 2. Capture a screenshot of the dashboard.
        dashboard_url = "http://127.0.0.1/"
        screenshot_path = scheduler.capture_page_screenshot(dashboard_url, "momo_dashboard")

        # 3. Send the notification.
        notifier = NotificationManager()
        sys_log.info(f"[Web] [Notification] 手動觸發 MOMO 通知")
        notifier.send_momo_report(stats, screenshot_path)

        return jsonify({"status": "success", "message": "已發送商品看板通知"})
    except Exception as e:
        sys_log.error(f"[Web] [Notification] 手動通知失敗 | Error: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500


@api_bp.route('/api/trigger_edm_notification', methods=['POST'])
@login_required
def trigger_edm_notification():
    """API: manually re-send the EDM price-comparison notification.

    Does not crawl again; it re-sends the report for the most recently
    crawled ``edm`` batch found in ``PromoProduct``.
    """
    try:
        # Force-reload configuration and notification modules.
        import config
        import services.notification_manager
        import services.edm_notifier
        importlib.reload(config)
        importlib.reload(services.notification_manager)
        importlib.reload(services.edm_notifier)

        db = DatabaseManager()
        session = db.get_session()
        try:
            # Find the batch_id of the most recently crawled EDM row.
            latest_batch_tuple = session.query(PromoProduct.batch_id).filter(
                PromoProduct.page_type == 'edm'
            ).order_by(desc(PromoProduct.crawled_at)).first()

            if not latest_batch_tuple:
                return jsonify({"status": "warning", "message": "目前無 EDM 商品資料,請先執行爬蟲"}), 400

            latest_batch_id = latest_batch_tuple[0]

            # Fetch every product row that belongs to that batch.
            products = session.query(PromoProduct).filter(
                PromoProduct.batch_id == latest_batch_id
            ).all()

            if not products:
                return jsonify({"status": "info", "message": "最新一輪掃描中無任何商品異動"}), 200

            # Best effort: attach the batch screenshot when the file exists.
            screenshot_path = None
            try:
                filename = f"edm_{latest_batch_id}.png"
                potential_path = os.path.join(BASE_DIR, 'web/static/screenshots', filename)
                if os.path.exists(potential_path):
                    screenshot_path = potential_path
            except Exception as path_err:
                # Still best effort (the report goes out without an image),
                # but no longer silent.
                sys_log.warning(f"[Web] [Notification] EDM 截圖路徑解析失敗 | Error: {path_err}")

            from services.edm_notifier import EdmNotifier
            notifier = EdmNotifier()

            sys_log.info(
                f"[Web] [Notification] 手動觸發 EDM 通知 | Count: {len(products)} | BatchID: {latest_batch_id}"
            )
            notifier.send_edm_report(products, screenshot_path)

            return jsonify({
                "status": "success",
                "message": f"已針對最新批次的 {len(products)} 筆商品異動發送通知",
            })
        finally:
            session.close()
    except Exception as e:
        sys_log.error(f"[Web] [Notification] 手動通知失敗 | Error: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500


@api_bp.route('/api/test_notification', methods=['POST'])
@login_required
def test_notification():
    """API: test the message-notification pipeline.

    First pushes a diagnostic text message directly to the LINE Messaging
    API using ``LINE_CHANNEL_ACCESS_TOKEN`` / ``LINE_GROUP_ID`` from
    ``config``; when that succeeds, runs the real daily-report sender.
    Returns HTTP 400 when the LINE settings are missing or LINE answers with
    a non-200 status, and HTTP 500 on connection or other errors.
    """
    try:
        from services.notification_manager import NotificationManager
        import config
        import requests

        notifier = NotificationManager()
        sys_log.info("[Web] [Notification] 執行手動通知發送測試 (Line/Telegram/Email)...")

        token = getattr(config, 'LINE_CHANNEL_ACCESS_TOKEN', None)
        target_id = getattr(config, 'LINE_GROUP_ID', None)

        if not (token and target_id):
            sys_log.warning("[Web] [Notification] 無法偵測到 Messaging API 設定 (Token 或 Group ID 缺失)")
            return jsonify({"status": "error", "message": "設定檔缺少 LINE_CHANNEL_ACCESS_TOKEN 或 LINE_GROUP_ID"}), 400

        sys_log.info(f"[Web] [Notification] 偵測到 Channel Token: {token[:4]}...{token[-4:]}")
        sys_log.info(f"[Web] [Notification] 目標 ID: {target_id}")

        # Try a direct push request first so connectivity problems are
        # reported separately from report-building problems.
        try:
            headers = {
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json"
            }
            payload = {
                "to": target_id,
                "messages": [
                    {
                        "type": "text",
                        "text": "這是系統診斷測試訊息 (Messaging API)\n\n連線測試成功!"
                    }
                ]
            }
            sys_log.info("[Web] [Notification] 正在嘗試連線至 Line Messaging API (push)...")
            resp = requests.post(
                "https://api.line.me/v2/bot/message/push",
                headers=headers,
                json=payload,
                timeout=10,
            )

            sys_log.info(f"[Web] [Notification] Line API 回應 | Code: {resp.status_code}")
            sys_log.info(f"[Web] [Notification] Line API 內容 | Body: {resp.text}")

            if resp.status_code != 200:
                return jsonify({
                    "status": "error",
                    "message": f"Line API 拒絕連線: {resp.status_code} - {resp.text}",
                }), 400
        except Exception as req_err:
            sys_log.error(f"[Web] [Notification] 直接連線測試發生異常 | Error: {req_err}")
            return jsonify({"status": "error", "message": f"連線異常: {req_err}"}), 500

        # Run the real daily-report sending logic.
        notifier.send_daily_report()

        return jsonify({"status": "success", "message": "當日異動通知已發送 (Line/Telegram/Email)"})
    except ImportError:
        return jsonify({"status": "error", "message": "找不到 NotificationManager 模組"}), 500
    except Exception as e:
        sys_log.error(f"[Web] [Notification] 測試通知失敗 | Error: {e}")
        return jsonify({"status": "error", "message": f"發送失敗: {str(e)}"}), 500


# ==========================================
# Price-history query API
# ==========================================

# Supported values of the ``range`` query parameter.
PRICE_HISTORY_RANGES = {
    'week': {'days': 7, 'label': '近 7 天'},
    'month': {'days': 30, 'label': '近 30 天'},
    'quarter': {'days': 90, 'label': '近 90 天'},
    'year': {'days': 365, 'label': '近 365 天'},
}


def _resolve_history_range():
    """Return ``(range_key, range_meta)`` for the request's ``range`` argument.

    Unknown or missing values fall back to ``'month'``.
    """
    range_key = request.args.get('range', 'month')
    if range_key not in PRICE_HISTORY_RANGES:
        range_key = 'month'
    return range_key, PRICE_HISTORY_RANGES[range_key]


def _build_price_history_payload(session, product):
    """Build the price-history payload for ``product``.

    Reads the product's ``PriceRecord`` rows inside the requested range and,
    best effort, the matching PChome rows from ``competitor_price_history``.
    A failure of the competitor query is logged and yields an empty
    ``pchome`` series instead of failing the request.

    Returns a dict with ``range``, ``range_label``, ``product``, ``data``
    (MOMO series, kept for older callers), ``series`` and ``competitor``
    (latest PChome point or ``None``).
    """
    range_key, range_meta = _resolve_history_range()
    start_date = datetime.now(TAIPEI_TZ) - timedelta(days=range_meta['days'])
    # NOTE(review): start_date is timezone-aware here but stripped to naive
    # for the raw SQL below - confirm which form PriceRecord.timestamp expects.

    records = session.query(PriceRecord).filter(
        PriceRecord.product_id == product.id,
        PriceRecord.timestamp >= start_date
    ).order_by(PriceRecord.timestamp).all()

    data = [{
        't': r.timestamp.strftime('%Y-%m-%d %H:%M'),
        'p': r.price
    } for r in records]

    competitor_data = []
    competitor_latest = None
    try:
        competitor_rows = session.execute(text("""
            SELECT price, momo_price, competitor_product_id, competitor_product_name,
                   match_score, crawled_at
            FROM competitor_price_history
            WHERE sku = :sku
              AND source = 'pchome'
              AND crawled_at >= :start_date
              AND COALESCE(match_score, 0) >= 0.76
              AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2'
            ORDER BY crawled_at
        """), {
            "sku": str(product.i_code),
            "start_date": start_date.replace(tzinfo=None),
        }).mappings().all()

        competitor_data = [{
            't': r['crawled_at'].strftime('%Y-%m-%d %H:%M'),
            'p': float(r['price']),
            'momo_price': float(r['momo_price']) if r['momo_price'] is not None else None,
            'product_id': r['competitor_product_id'],
            'product_name': r['competitor_product_name'],
            'match_score': float(r['match_score']) if r['match_score'] is not None else None,
        } for r in competitor_rows]

        if competitor_data:
            # Rows are ordered by crawled_at, so the last one is the newest.
            competitor_latest = competitor_data[-1]
    except Exception as exc:
        sys_log.warning(
            f"[Web] [History] PChome 競品歷史資料讀取略過 | ICode: {product.i_code} | Error: {exc}"
        )

    return {
        'range': range_key,
        'range_label': range_meta['label'],
        'product': {
            'id': product.id,
            'i_code': product.i_code,
            'name': product.name,
        },
        'data': data,
        'series': {
            'momo': data,
            'pchome': competitor_data,
        },
        'competitor': competitor_latest,
    }


# The URL variable is required: the view takes ``product_id`` and Flask only
# passes arguments that are declared in the rule.
@api_bp.route('/api/history/<int:product_id>')
@login_required
def get_price_history(product_id):
    """API: price history of a product; supports week/month/quarter/year ranges.

    With ``?format=v2`` the full payload is returned; otherwise only the
    legacy list of ``{'t', 'p'}`` points. HTTP 404 when the product does not
    exist, HTTP 500 (with an empty list) on errors.
    """
    db = DatabaseManager()
    session = db.get_session()
    try:
        product = session.query(Product).filter(Product.id == product_id).first()
        if not product:
            return jsonify({'data': [], 'message': '找不到商品'}), 404

        payload = _build_price_history_payload(session, product)
        if request.args.get('format') == 'v2':
            return jsonify(payload)
        return jsonify(payload['data'])
    except Exception as e:
        sys_log.error(f"[Web] [History] 獲取歷史價格失敗 | ProductID: {product_id} | Error: {e}")
        return jsonify([]), 500
    finally:
        session.close()


# As above, the rule must declare ``i_code`` for the view to receive it.
@api_bp.route('/api/history/i-code/<i_code>')
@login_required
def get_price_history_by_i_code(i_code):
    """API: price history of the main product identified by its MOMO i_code.

    Always returns the full payload. An unknown i_code yields an empty
    ``data`` list with a message (HTTP 200); errors yield HTTP 500.
    """
    db = DatabaseManager()
    session = db.get_session()
    try:
        product = session.query(Product).filter(Product.i_code == str(i_code)).first()
        if not product:
            return jsonify({'data': [], 'message': '找不到商品'})
        return jsonify(_build_price_history_payload(session, product))
    except Exception as e:
        sys_log.error(f"[Web] [History] 以 i_code 獲取歷史價格失敗 | ICode: {i_code} | Error: {e}")
        return jsonify({'data': []}), 500
    finally:
        session.close()


# ==========================================
# Price-change details API
# ==========================================

def _safe_product_url(product):
    """Return the normalized product URL, or one built from the i_code as fallback."""
    return normalize_momo_product_url(product.url, product.i_code) or build_momo_product_url(product.i_code)


def _product_summary(product):
    """Return the fields shared by every row of the price-change popup."""
    return {
        'product_id': product.i_code,
        'name': product.name,
        'category': product.category,
        'url': _safe_product_url(product),
        'image_url': product.image_url or '/static/placeholder.png',
    }


def _collect_price_changes(session, today_start, rising):
    """Return products whose latest price differs from their last price before today.

    ``rising=True`` keeps price increases, ``rising=False`` keeps decreases.
    Products without any record before ``today_start`` are skipped.
    """
    # Latest record (highest id) of every product.
    latest_ids = session.query(
        func.max(PriceRecord.id).label('max_id')
    ).group_by(PriceRecord.product_id).subquery()

    latest_rows = session.query(PriceRecord, Product).join(
        latest_ids, PriceRecord.id == latest_ids.c.max_id
    ).join(Product, PriceRecord.product_id == Product.id)

    # Last record of every product strictly before today, fetched in one
    # query. No product-id filter is needed: the set of products with a
    # "latest" record is exactly the set of products with any record.
    baseline_ids = session.query(
        PriceRecord.product_id,
        func.max(PriceRecord.id).label('max_id')
    ).filter(
        PriceRecord.timestamp < today_start
    ).group_by(PriceRecord.product_id).subquery()

    baseline_rows = session.query(
        PriceRecord.product_id,
        PriceRecord.price
    ).join(baseline_ids, PriceRecord.id == baseline_ids.c.max_id)
    baseline_prices = {pid: price for pid, price in baseline_rows}

    changes = []
    for record, product in latest_rows.all():
        old_price = baseline_prices.get(product.id)
        if old_price is None:
            continue
        moved = record.price > old_price if rising else record.price < old_price
        if not moved:
            continue
        row = _product_summary(product)
        row.update({
            'old_price': old_price,
            'current_price': record.price,
            'change': record.price - old_price,
            'update_time': record.timestamp.strftime('%Y-%m-%d %H:%M'),
        })
        changes.append(row)
    return changes


def _collect_delisted_today(session, today_start):
    """Return products marked INACTIVE whose ``updated_at`` is today, with their last price."""
    delisted = []
    today_delisted = session.query(Product).filter(
        Product.status == 'INACTIVE',
        Product.updated_at >= today_start
    ).all()

    for product in today_delisted:
        last_record = session.query(PriceRecord).filter(
            PriceRecord.product_id == product.id
        ).order_by(PriceRecord.timestamp.desc()).first()

        # Products that never had a price record are not listed.
        if not last_record:
            continue
        row = _product_summary(product)
        row.update({
            'last_price': last_record.price,
            'update_time': product.updated_at.strftime('%Y-%m-%d %H:%M') if product.updated_at else '',
        })
        delisted.append(row)
    return delisted


@api_bp.route('/api/price_change_details')
@login_required
def get_price_change_details():
    """API: details of products whose price changed (used by the popup).

    The ``type`` query argument selects ``increase``, ``decrease`` or
    ``delisted``; any other value yields an empty list. Returns
    ``{'products': [...]}``; HTTP 500 with an empty list on errors.
    """
    filter_type = request.args.get('type', '')
    # Reserved for future extension:
    # filter_category = request.args.get('category', '')
    # filter_product_id = request.args.get('product_id', '')

    db = DatabaseManager()
    session = db.get_session()
    try:
        # Start of today in Taipei time, as a naive datetime.
        now_taipei = datetime.now(TAIPEI_TZ)
        today_start = now_taipei.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None)

        products = []
        if filter_type in ('increase', 'decrease'):
            products = _collect_price_changes(session, today_start, rising=(filter_type == 'increase'))
        elif filter_type == 'delisted':
            products = _collect_delisted_today(session, today_start)

        return jsonify({'products': products})
    except Exception as e:
        sys_log.error(f"[Web] [PriceChange] 獲取價格變動明細失敗 | Error: {e}")
        return jsonify({'products': []}), 500
    finally:
        session.close()


# ==========================================
# MOMO link tracking API
# ==========================================

def _is_blocked_momo_url(url):
    """Return True when ``url`` looks like a broken or blocked MOMO product link.

    A URL is blocked when it contains ``ec404``, or when its path contains
    ``goodsdetail`` and either the ``i_code`` query value is rejected by
    ``is_probable_momo_icode`` or no ``i_code``/path identifier is present.
    Parsing errors are logged and treated as "not blocked".
    """
    raw = str(url or '')
    # 'ec404' also covers the 'ec404.html' page name.
    if 'ec404' in raw.lower():
        return True

    try:
        parsed = urlparse(raw)
        path = (parsed.path or '').lower()
        if 'goodsdetail' in path:
            query = parse_qs(parsed.query or '')
            i_code = (query.get('i_code') or [''])[0]
            if i_code:
                return not is_probable_momo_icode(i_code)
            if not re.search(r'/goodsdetail/[^/]+', path):
                return True
    except Exception as exc:
        # Best effort: an unparsable URL must not break click tracking.
        sys_log.warning(f"[Web] [MOMO_LINK_TRACK] URL 解析失敗 | Error: {exc}")
    return False


@api_bp.route('/api/track_momo_link', methods=['POST'])
@login_required
def track_momo_link():
    """API: log MOMO link clicks and abnormal opens, to diagnose auto-open sources.

    Expects a JSON object with at least ``url``; returns HTTP 400 with
    ``status='ignored'`` when it is missing, otherwise ``{'status': 'ok'}``.
    Blocked-looking links are logged at warning level, others at info level.
    """
    payload = request.get_json(silent=True) or {}
    if not isinstance(payload, dict):
        # A JSON array/scalar carries no fields; treat it like an empty body.
        payload = {}

    url = str(payload.get('url') or '').strip()
    effective_url = str(payload.get('effective_url') or '').strip()
    if not url:
        return jsonify({'status': 'ignored', 'reason': 'missing_url'}), 400

    is_blocked = _is_blocked_momo_url(url) or _is_blocked_momo_url(effective_url)

    prefix = "[Web] [MOMO_LINK_TRACK] "
    product_id = str(payload.get('product_id', '') or '').strip()
    i_code = str(payload.get('i_code', '') or '').strip()
    source = str(payload.get('source', '') or 'unknown').strip()
    page = str(payload.get('page', '') or '').strip()
    label = str(payload.get('label', '') or '').strip()
    platform = str(payload.get('platform', '') or 'momo').strip()
    product_name = str(payload.get('product_name', '') or '').strip()
    referer = request.headers.get('Referer', '')
    user_ip = request.remote_addr

    if not effective_url:
        effective_url = url

    msg = (
        f"{prefix}platform={platform} source={source} page={page} "
        f"i_code={i_code} product_id={product_id} label={label} "
        f"name={product_name} url={url} effective_url={effective_url} ip={user_ip} referer={referer}"
    )
    # Every field above is client-supplied: keep the entry on a single line
    # so a crafted value cannot forge additional log records.
    msg = msg.replace('\r', '\\r').replace('\n', '\\n')

    if is_blocked:
        sys_log.warning(msg + " | status=blocked_link")
    else:
        sys_log.info(msg + " | status=tracked")

    return jsonify({'status': 'ok'})