Files
ewoooc/routes/api_routes.py
OoO 75390f8495
All checks were successful
CD Pipeline / deploy (push) Successful in 1m19s
收緊 PChome 同款比對門檻
2026-05-19 15:53:09 +08:00

593 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
通用 API 路由模組
包含:任務觸發、通知、歷史查詢、價格變動等 API
"""
import os
import threading
import importlib
from datetime import datetime, timezone, timedelta
import re
from flask import Blueprint, request, jsonify
from sqlalchemy import func, desc, text
from urllib.parse import parse_qs, urlparse
from auth import login_required
from config import BASE_DIR
from database.manager import DatabaseManager
from database.models import Product, PriceRecord
from database.edm_models import PromoProduct
from services.logger_manager import SystemLogger
from utils.momo_url_utils import build_momo_product_url, normalize_momo_product_url
from utils.momo_url_utils import is_probable_momo_icode
# 時區設定
TAIPEI_TZ = timezone(timedelta(hours=8))
# Logger
sys_log = SystemLogger("APIRoutes").get_logger()
# Blueprint 定義
api_bp = Blueprint('api', __name__)
# ==========================================
# 任務觸發 API
# ==========================================
@api_bp.route('/api/run_task', methods=['POST'])
@login_required
def trigger_task():
"""API: 手動觸發 MOMO 爬蟲任務"""
try:
client_ip = request.remote_addr
sys_log.info(f"[Web] [Task] 接收到手動執行請求 | IP: {client_ip}")
# 使用獨立的 task_runner 服務,避免循環依賴
from services.task_runner import run_momo_task_with_notification
run_momo_task_with_notification()
return jsonify({"status": "success", "message": "爬蟲任務已在背景啟動"})
except Exception as e:
sys_log.error(f"[Web] [Task] 手動觸發任務失敗 | Error: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
@api_bp.route('/api/run_edm_task', methods=['POST'])
@login_required
def trigger_edm_task():
"""API: 手動觸發 EDM 爬蟲任務"""
try:
target_lpn = "O1K5FBOqsvN" # 預設活動代碼
sys_log.info(f"[Web] [Task] 接收到手動 EDM 執行請求 | LPN: {target_lpn}")
# 強制重載 scheduler 模組
import scheduler
importlib.reload(scheduler)
# 使用執行緒啟動,避免卡住 Web Server
task_thread = threading.Thread(target=scheduler.run_edm_task, args=(target_lpn,))
task_thread.daemon = True
task_thread.start()
return jsonify({"status": "success", "message": f"EDM 爬蟲任務 (LPN: {target_lpn}) 已在背景啟動,請稍後刷新頁面查看結果"})
except Exception as e:
sys_log.error(f"[Web] [Task] 手動觸發 EDM 任務失敗 | Error: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
@api_bp.route('/api/run_festival_task', methods=['POST'])
@login_required
def trigger_festival_task():
"""API: 手動觸發 1.1 狂歡購物節爬蟲任務"""
try:
target_lpn = "O7ylWfihYUM"
sys_log.info(f"[Web] [Task] 接收到手動 Festival 執行請求 | LPN: {target_lpn}")
# 延遲導入
import scheduler
importlib.reload(scheduler)
# 使用執行緒啟動,避免卡住 Web Server
task_thread = threading.Thread(target=scheduler.run_festival_task, args=(target_lpn,))
task_thread.daemon = True
task_thread.start()
return jsonify({"status": "success", "message": f"Festival 爬蟲任務 (LPN: {target_lpn}) 已在背景啟動,請稍後刷新頁面查看結果"})
except Exception as e:
sys_log.error(f"[Web] [Task] 手動觸發 Festival 任務失敗 | Error: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
@api_bp.route('/api/run_promo_event_task', methods=['POST'])
@login_required
def trigger_promo_event_task():
"""API: 手動觸發促銷活動爬蟲任務支援母親節、520、勞動節等"""
try:
data = request.get_json()
page_type = data.get('page_type', '')
lpn_code = data.get('lpn_code', '')
activity_name = data.get('activity_name', '促銷活動')
if not page_type or not lpn_code:
return jsonify({"status": "error", "message": "缺少必要參數: page_type 和 lpn_code"}), 400
sys_log.info(f"[Web] [Task] 接收到手動促銷活動執行請求 | Type: {page_type} | LPN: {lpn_code} | Name: {activity_name}")
# 延遲導入
import scheduler
importlib.reload(scheduler)
# 使用執行緒啟動,避免卡住 Web Server
task_thread = threading.Thread(target=scheduler.run_promo_event_task, args=(lpn_code, page_type, activity_name))
task_thread.daemon = True
task_thread.start()
return jsonify({"status": "success", "message": f"{activity_name} 爬蟲任務 (LPN: {lpn_code}) 已在背景啟動,請稍後刷新頁面查看結果"})
except Exception as e:
sys_log.error(f"[Web] [Task] 手動觸發促銷活動任務失敗 | Error: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
# ==========================================
# 通知 API
# ==========================================
@api_bp.route('/api/trigger_momo_notification', methods=['POST'])
@login_required
def trigger_momo_notification():
"""API: 手動觸發商品看板通知"""
try:
# 強制重載通知模組
import scheduler
import services.notification_manager
importlib.reload(scheduler)
importlib.reload(services.notification_manager)
from services.notification_manager import NotificationManager
# 從 dashboard_routes 導入 get_dashboard_stats避免循環依賴
from routes.dashboard_routes import get_dashboard_stats
# 1. 取得統計數據
stats = get_dashboard_stats()
# 2. 截取儀表板畫面
dashboard_url = "http://127.0.0.1/"
screenshot_path = scheduler.capture_page_screenshot(dashboard_url, "momo_dashboard")
# 3. 發送通知
notifier = NotificationManager()
sys_log.info(f"[Web] [Notification] 手動觸發 MOMO 通知")
notifier.send_momo_report(stats, screenshot_path)
return jsonify({"status": "success", "message": "已發送商品看板通知"})
except Exception as e:
sys_log.error(f"[Web] [Notification] 手動通知失敗 | Error: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
@api_bp.route('/api/trigger_edm_notification', methods=['POST'])
@login_required
def trigger_edm_notification():
"""API: 手動觸發 EDM 比價通知 (不重爬,僅重發)"""
try:
# 強制重新載入設定與通知模組
import config
import services.notification_manager
import services.edm_notifier
importlib.reload(config)
importlib.reload(services.notification_manager)
importlib.reload(services.edm_notifier)
db = DatabaseManager()
session = db.get_session()
try:
# 找出最新的 batch_id
latest_batch_tuple = session.query(PromoProduct.batch_id).filter(
PromoProduct.page_type == 'edm'
).order_by(desc(PromoProduct.crawled_at)).first()
if not latest_batch_tuple:
return jsonify({"status": "warning", "message": "目前無 EDM 商品資料,請先執行爬蟲"}), 400
latest_batch_id = latest_batch_tuple[0]
# 取得最新批次的所有異動商品
products = session.query(PromoProduct).filter(
PromoProduct.batch_id == latest_batch_id
).all()
if not products:
return jsonify({"status": "info", "message": "最新一輪掃描中無任何商品異動"}), 200
# 嘗試尋找對應的截圖檔案
screenshot_path = None
try:
filename = f"edm_{latest_batch_id}.png"
potential_path = os.path.join(BASE_DIR, 'web/static/screenshots', filename)
if os.path.exists(potential_path):
screenshot_path = potential_path
except Exception:
pass
from services.edm_notifier import EdmNotifier
notifier = EdmNotifier()
sys_log.info(f"[Web] [Notification] 手動觸發 EDM 通知 | Count: {len(products)} | BatchID: {latest_batch_id}")
notifier.send_edm_report(products, screenshot_path)
return jsonify({"status": "success", "message": f"已針對最新批次的 {len(products)} 筆商品異動發送通知"})
finally:
session.close()
except Exception as e:
sys_log.error(f"[Web] [Notification] 手動通知失敗 | Error: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
@api_bp.route('/api/test_notification', methods=['POST'])
@login_required
def test_notification():
"""API: 測試訊息通知功能"""
try:
from services.notification_manager import NotificationManager
import config
import requests
notifier = NotificationManager()
sys_log.info("[Web] [Notification] 執行手動通知發送測試 (Line/Telegram/Email)...")
token = getattr(config, 'LINE_CHANNEL_ACCESS_TOKEN', None)
target_id = getattr(config, 'LINE_GROUP_ID', None)
if token and target_id:
sys_log.info(f"[Web] [Notification] 偵測到 Channel Token: {token[:4]}...{token[-4:]}")
sys_log.info(f"[Web] [Notification] 目標 ID: {target_id}")
# 嘗試直接發送請求
try:
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload = {
"to": target_id,
"messages": [
{
"type": "text",
"text": "這是系統診斷測試訊息 (Messaging API)\n\n連線測試成功!"
}
]
}
sys_log.info("[Web] [Notification] 正在嘗試連線至 Line Messaging API (push)...")
resp = requests.post("https://api.line.me/v2/bot/message/push", headers=headers, json=payload, timeout=10)
sys_log.info(f"[Web] [Notification] Line API 回應 | Code: {resp.status_code}")
sys_log.info(f"[Web] [Notification] Line API 內容 | Body: {resp.text}")
if resp.status_code != 200:
return jsonify({"status": "error", "message": f"Line API 拒絕連線: {resp.status_code} - {resp.text}"}), 400
except Exception as req_err:
sys_log.error(f"[Web] [Notification] 直接連線測試發生異常 | Error: {req_err}")
return jsonify({"status": "error", "message": f"連線異常: {req_err}"}), 500
else:
sys_log.warning("[Web] [Notification] 無法偵測到 Messaging API 設定 (Token 或 Group ID 缺失)")
return jsonify({"status": "error", "message": "設定檔缺少 LINE_CHANNEL_ACCESS_TOKEN 或 LINE_GROUP_ID"}), 400
# 呼叫真實的日報發送邏輯
notifier.send_daily_report()
return jsonify({"status": "success", "message": "當日異動通知已發送 (Line/Telegram/Email)"})
except ImportError:
return jsonify({"status": "error", "message": "找不到 NotificationManager 模組"}), 500
except Exception as e:
sys_log.error(f"[Web] [Notification] 測試通知失敗 | Error: {e}")
return jsonify({"status": "error", "message": f"發送失敗: {str(e)}"}), 500
# ==========================================
# 歷史查詢 API
# ==========================================
PRICE_HISTORY_RANGES = {
'week': {'days': 7, 'label': '近 7 天'},
'month': {'days': 30, 'label': '近 30 天'},
'quarter': {'days': 90, 'label': '近 90 天'},
'year': {'days': 365, 'label': '近 365 天'},
}
def _resolve_history_range():
range_key = request.args.get('range', 'month')
if range_key not in PRICE_HISTORY_RANGES:
range_key = 'month'
return range_key, PRICE_HISTORY_RANGES[range_key]
def _build_price_history_payload(session, product):
range_key, range_meta = _resolve_history_range()
start_date = datetime.now(TAIPEI_TZ) - timedelta(days=range_meta['days'])
records = session.query(PriceRecord).filter(
PriceRecord.product_id == product.id,
PriceRecord.timestamp >= start_date
).order_by(PriceRecord.timestamp).all()
data = [{
't': r.timestamp.strftime('%Y-%m-%d %H:%M'),
'p': r.price
} for r in records]
competitor_data = []
competitor_latest = None
try:
competitor_rows = session.execute(text("""
SELECT
price,
momo_price,
competitor_product_id,
competitor_product_name,
match_score,
crawled_at
FROM competitor_price_history
WHERE sku = :sku
AND source = 'pchome'
AND crawled_at >= :start_date
AND COALESCE(match_score, 0) >= 0.76
AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2'
ORDER BY crawled_at
"""), {
"sku": str(product.i_code),
"start_date": start_date.replace(tzinfo=None),
}).mappings().all()
competitor_data = [{
't': r['crawled_at'].strftime('%Y-%m-%d %H:%M'),
'p': float(r['price']),
'momo_price': float(r['momo_price']) if r['momo_price'] is not None else None,
'product_id': r['competitor_product_id'],
'product_name': r['competitor_product_name'],
'match_score': float(r['match_score']) if r['match_score'] is not None else None,
} for r in competitor_rows]
if competitor_data:
competitor_latest = competitor_data[-1]
except Exception as exc:
sys_log.warning(
f"[Web] [History] PChome 競品歷史資料讀取略過 | ICode: {product.i_code} | Error: {exc}"
)
return {
'range': range_key,
'range_label': range_meta['label'],
'product': {
'id': product.id,
'i_code': product.i_code,
'name': product.name,
},
'data': data,
'series': {
'momo': data,
'pchome': competitor_data,
},
'competitor': competitor_latest,
}
@api_bp.route('/api/history/<int:product_id>')
@login_required
def get_price_history(product_id):
"""API: 取得商品價格歷史,支援 week/month/quarter/year 區間"""
db = DatabaseManager()
session = db.get_session()
try:
product = session.query(Product).filter(Product.id == product_id).first()
if not product:
return jsonify({'data': [], 'message': '找不到商品'}), 404
payload = _build_price_history_payload(session, product)
if request.args.get('format') == 'v2':
return jsonify(payload)
return jsonify(payload['data'])
except Exception as e:
sys_log.error(f"[Web] [History] 獲取歷史價格失敗 | ProductID: {product_id} | Error: {e}")
return jsonify([]), 500
finally:
session.close()
@api_bp.route('/api/history/i-code/<path:i_code>')
@login_required
def get_price_history_by_i_code(i_code):
"""API: 以 MOMO 商品 i_code 取得主商品價格歷史"""
db = DatabaseManager()
session = db.get_session()
try:
product = session.query(Product).filter(Product.i_code == str(i_code)).first()
if not product:
return jsonify({'data': [], 'message': '找不到商品'})
return jsonify(_build_price_history_payload(session, product))
except Exception as e:
sys_log.error(f"[Web] [History] 以 i_code 獲取歷史價格失敗 | ICode: {i_code} | Error: {e}")
return jsonify({'data': []}), 500
finally:
session.close()
@api_bp.route('/api/price_change_details')
@login_required
def get_price_change_details():
"""API: 取得價格變動商品明細 (供彈窗使用)"""
filter_type = request.args.get('type', '')
# 以下參數保留以供未來擴展
# filter_category = request.args.get('category', '')
# filter_product_id = request.args.get('product_id', '')
db = DatabaseManager()
session = db.get_session()
try:
def _safe_product_url(product):
return normalize_momo_product_url(product.url, product.i_code) or build_momo_product_url(product.i_code)
# 取得今日起始時間
now_taipei = datetime.now(TAIPEI_TZ)
today_start = now_taipei.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None)
# 基礎查詢:取得所有商品的最新記錄
latest_records_subq = session.query(
func.max(PriceRecord.id).label('max_id')
).group_by(PriceRecord.product_id).subquery()
query = session.query(PriceRecord, Product).join(
latest_records_subq,
PriceRecord.id == latest_records_subq.c.max_id
).join(Product, PriceRecord.product_id == Product.id)
# 一次性查詢所有商品的「今日之前最後價格」
product_ids = [r[0] for r in session.query(PriceRecord.product_id).join(
latest_records_subq, PriceRecord.id == latest_records_subq.c.max_id
).all()]
yesterday_prices_subq = session.query(
PriceRecord.product_id,
func.max(PriceRecord.id).label('max_id')
).filter(
PriceRecord.product_id.in_(product_ids),
PriceRecord.timestamp < today_start
).group_by(PriceRecord.product_id).subquery()
yesterday_prices_q = session.query(
PriceRecord.product_id, PriceRecord.price
).join(
yesterday_prices_subq,
PriceRecord.id == yesterday_prices_subq.c.max_id
)
yesterday_prices_map = {pid: price for pid, price in yesterday_prices_q}
# 根據 filter_type 進行篩選
products = []
if filter_type == 'increase':
# 漲價商品
for record, product in query.all():
old_price = yesterday_prices_map.get(product.id)
if old_price is not None and record.price > old_price:
products.append({
'product_id': product.i_code,
'name': product.name,
'category': product.category,
'url': _safe_product_url(product),
'image_url': product.image_url or '/static/placeholder.png',
'old_price': old_price,
'current_price': record.price,
'change': record.price - old_price,
'update_time': record.timestamp.strftime('%Y-%m-%d %H:%M')
})
elif filter_type == 'decrease':
# 降價商品
for record, product in query.all():
old_price = yesterday_prices_map.get(product.id)
if old_price is not None and record.price < old_price:
products.append({
'product_id': product.i_code,
'name': product.name,
'category': product.category,
'url': _safe_product_url(product),
'image_url': product.image_url or '/static/placeholder.png',
'old_price': old_price,
'current_price': record.price,
'change': record.price - old_price,
'update_time': record.timestamp.strftime('%Y-%m-%d %H:%M')
})
elif filter_type == 'delisted':
# 下架商品 (今日狀態為 INACTIVE 且今天更新的)
today_delisted = session.query(Product).filter(
Product.status == 'INACTIVE',
Product.updated_at >= today_start
).all()
for product in today_delisted:
last_record = session.query(PriceRecord).filter(
PriceRecord.product_id == product.id
).order_by(PriceRecord.timestamp.desc()).first()
if last_record:
products.append({
'product_id': product.i_code,
'name': product.name,
'category': product.category,
'url': _safe_product_url(product),
'image_url': product.image_url or '/static/placeholder.png',
'last_price': last_record.price,
'update_time': product.updated_at.strftime('%Y-%m-%d %H:%M') if product.updated_at else ''
})
return jsonify({'products': products})
except Exception as e:
sys_log.error(f"[Web] [PriceChange] 獲取價格變動明細失敗 | Error: {e}")
return jsonify({'products': []}), 500
finally:
session.close()
@api_bp.route('/api/track_momo_link', methods=['POST'])
@login_required
def track_momo_link():
"""API: 記錄 MOMO 連結點擊與異常開啟事件,用於診斷自動開啟來源。"""
def _is_blocked_momo_url(url: str) -> bool:
url_l = str(url or '').lower()
if 'ec404.html' in url_l or 'ec404' in url_l:
return True
try:
parsed = urlparse(str(url or ''))
path = (parsed.path or '').lower()
if 'goodsdetail' in path:
query = parse_qs(parsed.query or '')
i_code = (query.get('i_code') or [''])[0]
if i_code:
return not is_probable_momo_icode(i_code)
if not re.search(r'/goodsdetail/[^/]+', path):
return True
except Exception:
pass
return False
payload = request.get_json(silent=True) or {}
url = str(payload.get('url') or '').strip()
effective_url = str(payload.get('effective_url') or '').strip()
if not url:
return jsonify({'status': 'ignored', 'reason': 'missing_url'}), 400
is_blocked = _is_blocked_momo_url(url) or _is_blocked_momo_url(effective_url)
level = "[Web] [MOMO_LINK_TRACK] "
product_id = str(payload.get('product_id', '') or '').strip()
i_code = str(payload.get('i_code', '') or '').strip()
source = str(payload.get('source', '') or 'unknown').strip()
page = str(payload.get('page', '') or '').strip()
label = str(payload.get('label', '') or '').strip()
platform = str(payload.get('platform', '') or 'momo').strip()
product_name = str(payload.get('product_name', '') or '').strip()
referer = request.headers.get('Referer', '')
user_ip = request.remote_addr
if not effective_url:
effective_url = url
msg = (
f"{level}platform={platform} source={source} page={page} "
f"i_code={i_code} product_id={product_id} label={label} "
f"name={product_name} url={url} effective_url={effective_url} ip={user_ip} referer={referer}"
)
if is_blocked:
sys_log.warning(msg + " | status=blocked_link")
else:
sys_log.info(msg + " | status=tracked")
return jsonify({'status': 'ok'})