872 lines
29 KiB
Python
872 lines
29 KiB
Python
"""
|
||
PChome 24h crawler service.

Crawls PChome 24h product data. Supported features:
- Crawling region (store section) pages, e.g. /region/DDAB
- Fetching product details
- Batch product lookups through the product API

API reference:
- Product API: https://ecapi-cdn.pchome.com.tw/cdn/ecshop/prodapi/v2/prod?id=ID1,ID2,...
- Image URL: https://img.pchome.com.tw/cs{Pic.B}
|
||
"""
|
||
|
||
import re
|
||
import time
|
||
import logging
|
||
from typing import List, Dict, Optional, Tuple
|
||
from dataclasses import dataclass, asdict
|
||
from datetime import datetime
|
||
|
||
import requests
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass
class PChomeProduct:
    """Value object describing one PChome product at crawl time."""

    product_id: str            # Product ID (e.g. DDABSD-1900HIE3P)
    name: str                  # Product name
    price: int                 # Selling price
    original_price: int        # List price
    discount: Optional[int]    # Discount in percent
    image_url: str             # Image URL
    product_url: str           # Product page URL
    stock: int                 # Stock quantity
    store: str                 # Store code
    rating: Optional[float]    # Rating
    review_count: int          # Number of reviews
    is_on_sale: bool           # Whether the product is on sale
    crawled_at: datetime       # Time the record was crawled

    def to_dict(self) -> dict:
        """Return the fields as a dict, with ``crawled_at`` as an ISO-8601 string."""
        # Re-using the existing key keeps its position; only the value changes.
        return {**asdict(self), 'crawled_at': self.crawled_at.isoformat()}
|
||
|
||
|
||
class PChomeCrawler:
    """Crawler for PChome 24h product data.

    Scrapes region pages for product IDs, resolves IDs to product details
    through the product API in batches, and runs keyword searches. All HTTP
    traffic goes through a single ``requests.Session`` with a minimum delay
    between requests and bounded retries for transient failures.
    """

    # Base URLs
    BASE_URL = 'https://24h.pchome.com.tw'
    API_URL = 'https://ecapi-cdn.pchome.com.tw/cdn/ecshop/prodapi/v2/prod'
    IMAGE_BASE_URL = 'https://img.pchome.com.tw/cs'

    # Default request headers
    DEFAULT_HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'application/json, text/html,application/xhtml+xml',
        'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8',
        'Referer': 'https://24h.pchome.com.tw/',
    }

    # Regular expression matching product IDs, with or without the dash
    PRODUCT_ID_PATTERN = re.compile(r'[A-Z]{4}[A-Z0-9]{2}-?[A-Z0-9]{8,10}')

    def __init__(
        self,
        timeout: int = 30,
        delay: float = 1.0,
        max_retries: int = 2,
        retry_backoff: float = 0.8,
    ):
        """
        Initialise the crawler.

        Args:
            timeout: Request timeout in seconds.
            delay: Minimum interval between requests in seconds, to avoid
                hammering the site.
            max_retries: Number of retries for transient errors (clamped to >= 0).
            retry_backoff: Base number of seconds for exponential backoff
                (clamped to >= 0).
        """
        self.timeout = timeout
        self.delay = delay
        self.max_retries = max(0, int(max_retries))
        self.retry_backoff = max(0.0, float(retry_backoff))
        self.session = requests.Session()
        self.session.headers.update(self.DEFAULT_HEADERS)
        self._last_request_time = 0

    def _rate_limit(self):
        """Sleep as needed so consecutive requests are at least ``delay`` apart."""
        elapsed = time.time() - self._last_request_time
        wait = self.delay - elapsed
        if wait > 0:
            # Clamp to ``delay``: a backwards wall-clock jump makes ``elapsed``
            # negative and would otherwise stall for an arbitrarily long time.
            time.sleep(min(wait, self.delay))
        self._last_request_time = time.time()

    def _get_with_retry(self, url: str, **kwargs) -> requests.Response:
        """GET with polite rate limiting and bounded retry for transient failures.

        Args:
            url: URL to fetch.
            **kwargs: Passed through to ``session.get``.

        Returns:
            The response of the first attempt that is neither a retryable
            status nor an HTTP error.

        Raises:
            requests.RequestException: When attempts are exhausted, or
                immediately for an HTTP error with a non-retryable status.
        """
        retryable_statuses = {429, 500, 502, 503, 504}
        last_error = None
        for attempt in range(self.max_retries + 1):
            self._rate_limit()
            try:
                response = self.session.get(url, **kwargs)
                status_code = getattr(response, "status_code", 200)
                if (
                    status_code in retryable_statuses
                    and attempt < self.max_retries
                ):
                    # Remember the failure and fall through to the backoff sleep.
                    last_error = requests.HTTPError(
                        f"HTTP {status_code} for {url}",
                        response=response,
                    )
                else:
                    response.raise_for_status()
                    return response
            except (requests.Timeout, requests.ConnectionError, requests.HTTPError) as exc:
                last_error = exc
                response = getattr(exc, "response", None)
                status_code = getattr(response, "status_code", None)
                if (
                    attempt >= self.max_retries
                    or (
                        isinstance(exc, requests.HTTPError)
                        and status_code not in retryable_statuses
                    )
                ):
                    raise

            # Exponential backoff: retry_backoff * 1, 2, 4, ...
            sleep_sec = self.retry_backoff * (2 ** attempt)
            if sleep_sec > 0:
                time.sleep(sleep_sec)

        if last_error:
            raise last_error
        raise requests.RequestException(f"GET failed: {url}")

    def _normalize_product_id(self, product_id: str) -> str:
        """
        Normalise a product ID to the dashed form.

        Args:
            product_id: Raw product ID.

        Returns:
            The stripped ID. If it contains no dash and is longer than six
            characters, a dash is inserted after the sixth character.
        """
        product_id = product_id.strip()

        # Already dashed: return as is.
        if '-' in product_id:
            return product_id

        # Insert the dash after the sixth character.
        if len(product_id) > 6:
            return f"{product_id[:6]}-{product_id[6:]}"

        return product_id

    def _extract_product_ids_from_html(self, html: str) -> List[str]:
        """
        Extract product IDs from page HTML.

        Args:
            html: Page HTML content.

        Returns:
            De-duplicated, normalised product IDs in order of first
            appearance (empty list for empty input).
        """
        if not html:
            return []
        normalized = (
            self._normalize_product_id(pid)
            for pid in self.PRODUCT_ID_PATTERN.findall(html)
        )
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # result is deterministic (a plain set would shuffle the IDs).
        return list(dict.fromkeys(normalized))

    def fetch_region_page(self, region_code: str) -> Tuple[bool, str, List[str]]:
        """
        Fetch a region page and collect the product IDs found in it.

        Args:
            region_code: Region code (e.g. DDAB).

        Returns:
            (success flag, message, list of product IDs)
        """
        url = f"{self.BASE_URL}/region/{region_code}"

        try:
            response = self._get_with_retry(url, timeout=self.timeout)
        except requests.RequestException as e:
            logger.error(f"爬取 {url} 失敗: {e}")
            return False, f"請求失敗: {str(e)}", []

        product_ids = self._extract_product_ids_from_html(response.text)
        logger.info(f"從 {url} 取得 {len(product_ids)} 個商品 ID")
        return True, f"成功取得 {len(product_ids)} 個商品", product_ids

    def fetch_product_details(self, product_ids: List[str], batch_size: int = 20) -> Tuple[bool, str, List[PChomeProduct]]:
        """
        Fetch product details in batches.

        Args:
            product_ids: Product IDs to look up.
            batch_size: Number of IDs per API call (values below 1 are
                treated as 1).

        Returns:
            (success flag, message, list of products). The flag is True when
            at least one product was parsed.
        """
        if not product_ids:
            return False, "沒有提供商品 ID", []

        # A zero or negative step would make range() raise ValueError.
        batch_size = max(1, int(batch_size or 1))

        all_products: List[PChomeProduct] = []
        failed_count = 0

        for start in range(0, len(product_ids), batch_size):
            batch = product_ids[start:start + batch_size]
            batch_no = start // batch_size + 1

            try:
                response = self._get_with_retry(
                    self.API_URL,
                    params={'id': ','.join(batch)},
                    timeout=self.timeout,
                )
                # A non-JSON body raises ValueError (a RequestException
                # subclass only on newer requests releases); treat it as a
                # failed batch instead of aborting the whole run.
                data = response.json()
            except (requests.RequestException, ValueError) as e:
                logger.error(f"API 請求失敗 (批次 {batch_no}): {e}")
                failed_count += len(batch)
                continue

            crawled_at = datetime.now()

            if isinstance(data, dict):
                product_entries = data.items()
                product_count = len(data)
            elif isinstance(data, list):
                product_entries = [
                    ((item or {}).get('Id') or f'index_{idx}', item)
                    for idx, item in enumerate(data)
                    if isinstance(item, dict)
                ]
                product_count = len(product_entries)
                # Non-dict list items cannot be parsed; count them as failures.
                failed_count += max(0, len(data) - product_count)
            else:
                logger.warning(
                    "PChome 商品 API 回傳格式異常 (批次 %s): %s",
                    batch_no,
                    type(data).__name__,
                )
                failed_count += len(batch)
                continue

            for prod_key, prod_data in product_entries:
                try:
                    product = self._parse_product_data(prod_data, crawled_at)
                    if product:
                        all_products.append(product)
                except Exception as e:
                    logger.warning(f"解析商品 {prod_key} 失敗: {e}")
                    failed_count += 1

            logger.info(f"批次 {batch_no}: 取得 {product_count} 個商品資料")

        message = f"成功取得 {len(all_products)} 個商品資料"
        if failed_count > 0:
            message += f",{failed_count} 個失敗"

        return len(all_products) > 0, message, all_products

    def _parse_product_data(self, data: dict, crawled_at: datetime) -> Optional[PChomeProduct]:
        """
        Parse one product entry returned by the product API.

        Args:
            data: Product entry from the API.
            crawled_at: Crawl timestamp to stamp on the record.

        Returns:
            A PChomeProduct, or None when the price is missing/zero or the
            entry cannot be parsed.
        """
        try:
            product_id = data.get('Id', '')
            # Drop a trailing "-000" suffix.
            if product_id.endswith('-000'):
                product_id = product_id[:-4]

            # [2026-04-18] Defensive check: a missing Price.P used to be
            # silently turned into 0 and written downstream as a $0 price.
            # Return None instead so the caller skips the record.
            price_info = data.get('Price', {})
            if isinstance(price_info, dict):
                price = price_info.get('P')
                if price is None or price == 0:
                    logger.warning(
                        f"[PChome] Id={data.get('Id', '?')} 價格欄位 Price.P 缺失或為 0,"
                        f"疑似 API 格式變更或商品下架,跳過此筆"
                    )
                    return None
                # `or` also covers an explicit null/0 list price, which would
                # otherwise be stored as the original price.
                original_price = price_info.get('M') or price
            else:
                price = price_info
                if not price:
                    logger.warning(
                        f"[PChome] Id={data.get('Id', '?')} 價格欄位為非 dict 且為空,跳過"
                    )
                    return None
                original_price = price

            # Discount percentage, only when the list price is higher.
            discount = None
            if original_price and original_price > price:
                discount = round((1 - price / original_price) * 100)

            # Image URL: Pic may be a dict holding the path under 'B', or the path itself.
            pic_info = data.get('Pic', {})
            if isinstance(pic_info, dict):
                pic_path = pic_info.get('B', '')
            else:
                pic_path = pic_info or ''

            image_url = f"{self.IMAGE_BASE_URL}{pic_path}" if pic_path else ''

            return PChomeProduct(
                product_id=product_id,
                name=data.get('Name', ''),
                price=price,
                original_price=original_price,
                discount=discount,
                image_url=image_url,
                product_url=f"{self.BASE_URL}/prod/{product_id}",
                stock=data.get('Qty', 0),
                store=data.get('Store', ''),
                rating=data.get('RatingValue'),
                review_count=data.get('ReviewCount', 0),
                is_on_sale=data.get('isOnSale', False),
                crawled_at=crawled_at,
            )

        except Exception as e:
            # Best effort: one malformed entry must not abort the batch.
            logger.error(f"解析商品資料失敗: {e}")
            return None

    def crawl_region(self, region_code: str) -> Tuple[bool, str, List[PChomeProduct]]:
        """
        Crawl a region page end to end (product IDs, then details).

        Args:
            region_code: Region code.

        Returns:
            (success flag, message, list of products)
        """
        # Step 1: collect product IDs from the region page.
        success, message, product_ids = self.fetch_region_page(region_code)
        if not success:
            return False, message, []

        if not product_ids:
            return False, "頁面中沒有找到商品", []

        # Step 2: resolve the IDs to product details.
        return self.fetch_product_details(product_ids)

    def search_products(
        self,
        keyword: str,
        limit: int = 50,
        max_pages: Optional[int] = None,
        sort: str = "rnk/dc",
    ) -> Tuple[bool, str, List[PChomeProduct]]:
        """
        Search products through the search API.

        Args:
            keyword: Search keyword.
            limit: Maximum number of products to return (at least 1).
            max_pages: Maximum number of result pages to scan; by default
                derived from ``limit`` and capped at 3.
            sort: Sort key passed to the search API; defaults to ``rnk/dc``.

        Returns:
            (success flag, message, list of products)
        """
        search_url = "https://ecshweb.pchome.com.tw/search/v4.3/all/results"
        limit = max(1, int(limit or 1))
        page_cap = max_pages if max_pages is not None else min(3, max(1, (limit // 20) + 1))
        page_cap = max(1, int(page_cap or 1))

        try:
            product_ids = []
            seen_ids = set()
            pages_scanned = 0
            for page in range(1, page_cap + 1):
                params = {
                    'q': keyword,
                    'page': page,
                    'sort': sort,
                    'cateid': '24h',
                }
                response = self._get_with_retry(search_url, params=params, timeout=self.timeout)
                pages_scanned += 1

                data = response.json()
                # Tolerate a payload that is not the expected dict.
                prods = data.get('Prods') if isinstance(data, dict) else None
                if not prods:
                    break

                for item in prods:
                    if not isinstance(item, dict):
                        continue
                    product_id = item.get('Id', '')
                    if not product_id or product_id in seen_ids:
                        continue
                    seen_ids.add(product_id)
                    product_ids.append(product_id)
                    if len(product_ids) >= limit:
                        break
                if len(product_ids) >= limit:
                    break

            if not product_ids:
                return False, "沒有找到符合的商品", []

            # Resolve the collected IDs to product details.
            success, message, products = self.fetch_product_details(product_ids[:limit])
            if success:
                message = f"{message};搜尋頁數 {pages_scanned};排序 {sort}"
            return success, message, products

        except (requests.RequestException, ValueError) as e:
            # ValueError covers a non-JSON search response on requests
            # releases where the JSON error is not a RequestException.
            logger.error(f"搜尋失敗: {e}")
            return False, f"搜尋失敗: {str(e)}", []
|
||
|
||
|
||
# Module-wide crawler instance, created lazily by get_crawler().
_crawler_instance = None


def get_crawler() -> PChomeCrawler:
    """Return the shared PChomeCrawler, creating it on first use."""
    global _crawler_instance
    if _crawler_instance is not None:
        return _crawler_instance
    _crawler_instance = PChomeCrawler()
    return _crawler_instance
|
||
|
||
|
||
# Convenience wrappers
def crawl_pchome_region(region_code: str) -> Tuple[bool, str, List[dict]]:
    """
    Crawl a PChome region page with the shared crawler.

    Args:
        region_code: Region code (e.g. DDAB).

    Returns:
        (success flag, message, list of product dicts)
    """
    ok, msg, items = get_crawler().crawl_region(region_code)
    as_dicts = [item.to_dict() for item in items]
    return ok, msg, as_dicts
|
||
|
||
|
||
def search_pchome_products(keyword: str, limit: int = 50) -> Tuple[bool, str, List[dict]]:
    """
    Search PChome products with the shared crawler.

    Args:
        keyword: Search keyword.
        limit: Maximum number of products to return.

    Returns:
        (success flag, message, list of product dicts)
    """
    ok, msg, items = get_crawler().search_products(keyword, limit)
    as_dicts = [item.to_dict() for item in items]
    return ok, msg, as_dicts
|
||
|
||
|
||
def get_pchome_bestsellers(category: str, limit: int = 5) -> Tuple[bool, str, List[dict]]:
    """
    Fetch products for a category keyword in a compact format.

    Args:
        category: Category keyword (e.g. '面膜', '乳液', '精華液').
        limit: Maximum number of products to return.

    Returns:
        (success flag, message, list of compact product dicts with the keys
        name, price, original_price, discount, url, image)
    """
    # NOTE(review): no sort argument is passed here, so the ordering is
    # whatever search_products uses by default; confirm that this matches the
    # intended "best seller" ranking.
    ok, msg, items = get_crawler().search_products(category, limit=limit)
    if not (ok and items):
        return ok, msg, []

    # Reduce each product to the compact format.
    compact = [
        {
            'name': item.name,
            'price': item.price,
            'original_price': item.original_price,
            'discount': item.discount,
            'url': item.product_url,
            'image': item.image_url,
        }
        for item in items[:limit]
    ]
    return True, f"成功取得 {len(compact)} 個熱銷商品", compact
|
||
|
||
|
||
if __name__ == '__main__':
    # Manual smoke test: crawl one region, then run one keyword search.
    logging.basicConfig(level=logging.INFO)

    print("=== PChome 爬蟲測試 ===\n")

    # Region crawl
    print("[1] 測試館別爬取 (DDAB - 美妝保養)")
    success, msg, products = crawl_pchome_region('DDAB')
    print(f"結果: {msg}")
    if products:
        print("範例商品:")
        for p in products[:3]:
            short_name = p['name'][:30]
            print(f" - {short_name}... ${p['price']} (原價 ${p['original_price']})")

    separator = "\n" + "=" * 50 + "\n"
    print(separator)

    # Keyword search
    print("[2] 測試搜尋 (關鍵字: iPhone)")
    success, msg, products = search_pchome_products('iPhone', limit=5)
    print(f"結果: {msg}")
    if products:
        print("搜尋結果:")
        for p in products[:3]:
            short_name = p['name'][:30]
            print(f" - {short_name}... ${p['price']}")
|
||
|
||
|
||
# =============================================================================
# High-level competitor comparison helpers (used by openclaw_bot_routes)
# =============================================================================
|
||
|
||
def search_pchome(keyword: str, limit: int = 10) -> List[dict]:
    """
    Search PChome products (simplified variant that returns a plain list).

    Args:
        keyword: Search keyword.
        limit: Maximum number of products to return.

    Returns:
        [{'name', 'price', 'url', 'in_stock'}, ...]; an empty list when the
        search fails.
    """
    ok, _, products = search_pchome_products(keyword, limit=limit)
    if not ok:
        return []
    return [
        {
            'name': p.get('name', ''),
            'price': p.get('price', 0),
            'url': p.get('product_url', ''),
            # `or 0` guards against a stock value of None, which would make
            # the comparison raise TypeError and abort the whole search.
            'in_stock': (p.get('stock') or 0) > 0,
        }
        for p in products
    ]
|
||
|
||
|
||
def find_best_match(keyword: str, momo_price: float) -> Optional[dict]:
    """
    Search PChome for ``keyword`` and return the best-matching product.

    Each of up to five search results is scored with
    ``score_marketplace_match``; the highest-scoring one is returned when its
    score is at least 0.76. If importing or running the matcher raises, the
    result whose price is closest to ``momo_price`` is returned instead.

    Returns:
        The chosen result dict with 'price_diff' added (plus 'match_score'
        and 'match_reasons' when the matcher was used), or None when there
        are no results or no result scores high enough.
    """
    candidates = search_pchome(keyword, limit=5)
    if not candidates:
        return None

    try:
        from services.marketplace_product_matcher import score_marketplace_match

        winner = None
        top_score = 0.0
        winner_diag = None
        for cand in candidates:
            diag = score_marketplace_match(
                keyword,
                cand.get('name', ''),
                momo_price=momo_price,
                competitor_price=cand.get('price'),
            )
            # Strictly greater: on a tie the earlier result is kept.
            if diag.score > top_score:
                winner, top_score, winner_diag = cand, diag.score, diag

        if not winner or top_score < 0.76:
            return None

        winner['match_score'] = top_score
        winner['match_reasons'] = list(getattr(winner_diag, 'reasons', ()) or ())
    except Exception:
        logger.warning("[PChome] identity matcher unavailable, fallback to price distance", exc_info=True)
        winner = min(candidates, key=lambda r: abs(r['price'] - momo_price))

    winner['price_diff'] = winner['price'] - momo_price
    return winner
|
||
|
||
|
||
def compare_product(
    momo_name: str,
    momo_price: float,
    momo_icode: str = '',
) -> dict:
    """
    Compare one momo product against its best PChome match.

    Returns:
        {
            'momo_name', 'momo_price', 'momo_icode',
            'found': bool,
            'pc_name', 'pc_price', 'pc_url',
            'price_diff': pc_price - momo_price (positive means PChome is
                          more expensive, i.e. momo has the advantage),
            'price_diff_pct': price_diff as a percentage of momo_price
        }
        'match_score' is added when a match is found. Any error is logged and
        the not-found defaults are returned.
    """
    outcome: dict = {
        'momo_name': momo_name,
        'momo_price': momo_price,
        'momo_icode': momo_icode,
        'found': False,
        'pc_name': '',
        'pc_price': 0,
        'pc_url': '',
        'price_diff': 0,
        'price_diff_pct': 0.0,
    }
    try:
        hit = find_best_match(momo_name, momo_price)
        if hit:
            pchome_price = float(hit.get('price', 0))
            gap = pchome_price - momo_price
            # Avoid dividing by a zero momo price.
            if momo_price:
                gap_pct = gap / momo_price * 100
            else:
                gap_pct = 0
            outcome['found'] = True
            outcome['pc_name'] = hit.get('name', '')
            outcome['pc_price'] = pchome_price
            outcome['pc_url'] = hit.get('url', '')
            outcome['price_diff'] = gap
            outcome['price_diff_pct'] = gap_pct
            outcome['match_score'] = hit.get('match_score', 0)
    except Exception as e:
        logger.warning("[PChome] compare_product error: %s", e)
    return outcome
|
||
|
||
|
||
def batch_compare_top(
    db,
    top_n: int = 30,
    date_str: str = '',
) -> List[dict]:
    """
    Compare momo's top-N products by revenue against PChome.

    Args:
        db: Database handle used via ``db.connect()`` — presumably a
            SQLAlchemy engine; confirm against the caller.
        top_n: Number of top-revenue momo products to compare.
        date_str: Date string in 'YYYY/MM/DD' form; when empty, no date
            filter is applied to daily_sales.

    Returns:
        [compare_product() result, ...]; rows without a positive latest momo
        price are skipped. Errors are logged and whatever was collected so
        far is returned.
    """
    compared: List[dict] = []
    try:
        from sqlalchemy import text as _text

        bind: dict = {'limit': top_n}
        where_clause = ''
        if date_str:
            # Only this fixed clause is interpolated; the value is bound.
            where_clause = "WHERE DATE(s.date) = DATE(:date_str)"
            bind['date_str'] = date_str.replace('/', '-')

        sql = f"""
            SELECT p.name, p.i_code,
                   COALESCE(SUM(s.revenue), 0) AS total_rev,
                   (
                       SELECT pr.price
                       FROM price_records pr
                       WHERE pr.product_id = p.id
                       ORDER BY pr.timestamp DESC, pr.id DESC
                       LIMIT 1
                   ) AS momo_price
            FROM products p
            JOIN daily_sales s ON p.id = s.product_id
            {where_clause}
            GROUP BY p.id, p.name, p.i_code
            ORDER BY total_rev DESC
            LIMIT :limit
        """
        with db.connect() as conn:
            rows = conn.execute(_text(sql), bind).fetchall()

        for row in rows:
            name = row[0]
            icode = row[1]
            rev = float(row[2] or 0)
            momo_price = float(row[3] or 0)
            if momo_price <= 0:
                logger.warning("[PChome] skip %s because latest momo price is missing; total_rev=%s", icode, rev)
                continue
            try:
                compared.append(compare_product(name, momo_price, icode))
                time.sleep(0.4)  # throttle between comparisons
            except Exception as e:
                logger.warning("[PChome] batch item error: %s", e)
    except Exception as e:
        logger.error("[PChome] batch_compare_top error: %s", e)
    return compared
|
||
|
||
|
||
def save_matches(db, results: List[dict]) -> None:
    """
    Upsert comparison results into the pchome_matches table.

    The table is created first if it does not exist. Only results with a
    truthy 'found' are written, keyed on momo_icode. Errors are logged, not
    raised.

    Args:
        db: Database handle used via ``db.begin()`` — presumably a SQLAlchemy
            engine; confirm against the caller.
        results: compare_product() result dicts.
    """
    if not results:
        return
    try:
        from sqlalchemy import text as _text

        ensure_tables(db)
        # On conflict the momo columns are refreshed too: price_diff and
        # price_diff_pct are computed from the new momo_price, so keeping the
        # old momo_name / momo_price would leave the row inconsistent.
        upsert = _text("""
            INSERT INTO pchome_matches
                (momo_icode, momo_name, momo_price,
                 pc_name, pc_price, pc_url,
                 price_diff, price_diff_pct, matched_at)
            VALUES
                (:icode, :mname, :mprice,
                 :pcname, :pcprice, :pcurl,
                 :diff, :pct, NOW())
            ON CONFLICT (momo_icode) DO UPDATE SET
                momo_name = EXCLUDED.momo_name,
                momo_price = EXCLUDED.momo_price,
                pc_name = EXCLUDED.pc_name,
                pc_price = EXCLUDED.pc_price,
                pc_url = EXCLUDED.pc_url,
                price_diff = EXCLUDED.price_diff,
                price_diff_pct = EXCLUDED.price_diff_pct,
                matched_at = NOW()
        """)
        with db.begin() as conn:
            for r in results:
                if not r.get('found'):
                    continue
                conn.execute(upsert, {
                    'icode': r.get('momo_icode', ''),
                    'mname': r.get('momo_name', ''),
                    'mprice': r.get('momo_price', 0),
                    'pcname': r.get('pc_name', ''),
                    'pcprice': r.get('pc_price', 0),
                    'pcurl': r.get('pc_url', ''),
                    'diff': r.get('price_diff', 0),
                    'pct': r.get('price_diff_pct', 0),
                })
    except Exception as e:
        logger.warning("[PChome] save_matches error: %s", e)
|
||
|
||
|
||
def ensure_tables(db) -> None:
    """Create the pchome_matches table if it does not exist; errors are logged."""
    ddl = """
        CREATE TABLE IF NOT EXISTS pchome_matches (
            id SERIAL PRIMARY KEY,
            momo_icode VARCHAR(64) UNIQUE,
            momo_name TEXT,
            momo_price NUMERIC(12,2),
            pc_name TEXT,
            pc_price NUMERIC(12,2),
            pc_url TEXT,
            price_diff NUMERIC(12,2),
            price_diff_pct NUMERIC(8,2),
            matched_at TIMESTAMP DEFAULT NOW()
        )
    """
    try:
        from sqlalchemy import text as _text

        with db.begin() as conn:
            conn.execute(_text(ddl))
    except Exception as e:
        logger.warning("[PChome] ensure_tables error: %s", e)
|
||
|
||
|
||
def fmt_compare_msg(results: List[dict], keyword: str = '') -> str:
    """
    Format a per-product comparison message (Telegram Markdown).

    Only the first five results are rendered. A price difference above 10 is
    reported as momo being cheaper, below -10 as PChome being cheaper, and
    anything in between as level.
    """
    if not results:
        return f"⚠️ 找不到「{keyword}」的 PChome 比價資料"

    parts = [f"🔍 *momo vs PChome 比價|{keyword}*\n"]
    for row in results[:5]:
        title = row.get('momo_name', '')[:28]
        momo_price = row.get('momo_price', 0)

        if not row.get('found'):
            parts.append(f"• {title}\n momo `NT${momo_price:,.0f}` / PChome _未找到_\n")
            continue

        pchome_price = row.get('pc_price', 0)
        gap = row.get('price_diff', 0)
        gap_pct = row.get('price_diff_pct', 0)
        link = row.get('pc_url', '')

        if -10 <= gap <= 10:
            badge = "➖"
            remark = "價差 <NT$10,持平"
        elif gap > 10:
            # PChome is more expensive, so momo has the advantage.
            badge = "✅"
            remark = f"momo 便宜 NT${abs(gap):,.0f}({abs(gap_pct):.1f}%)"
        else:
            # momo is more expensive.
            badge = "⚠️"
            remark = f"PChome 便宜 NT${abs(gap):,.0f}({abs(gap_pct):.1f}%)"

        entry = (
            f"{badge} *{title}*\n"
            f" momo `NT${momo_price:,.0f}` / "
            f"[PChome `NT${pchome_price:,.0f}`]({link})\n"
            f" {remark}\n"
        )
        parts.append(entry)

    return "\n".join(parts)
|
||
|
||
|
||
def fmt_daily_report(results: List[dict], date_str: str = '') -> str:
    """
    Format the daily competitor report (Telegram Markdown).

    Summarises how many items were scanned and matched, how many have a price
    difference above 10 (momo cheaper) or below -10 (PChome cheaper), and the
    average percentage difference, then lists up to five items of each kind
    in input order.
    """
    matched = [row for row in results if row.get('found')]
    # Positive difference: PChome is more expensive, momo has the advantage.
    momo_cheaper = [row for row in matched if row.get('price_diff', 0) > 10]
    # Negative difference: momo is more expensive.
    pchome_cheaper = [row for row in matched if row.get('price_diff', 0) < -10]

    if matched:
        avg_pct = sum(row.get('price_diff_pct', 0) for row in matched) / len(matched)
    else:
        avg_pct = 0

    label = date_str or datetime.now().strftime('%Y/%m/%d')
    out = [
        f"📊 *競品比價日報|{label}*\n",
        f"🔢 掃描 `{len(results)}` 件 | 比對成功 `{len(matched)}` 件",
        f"✅ momo 具優勢 `{len(momo_cheaper)}` 件 | ⚠️ 需注意 `{len(pchome_cheaper)}` 件",
        f"📈 平均價差 `{avg_pct:+.1f}%`(正=PChome貴=momo有優勢)\n",
    ]

    # (rows, section heading, bullet icon, word placed before the amount)
    sections = (
        (momo_cheaper, "🏆 *momo 優勢商品(TOP5)*", "✅", "省"),
        (pchome_cheaper, "⚠️ *需注意商品(PChome 更便宜 TOP5)*", "⚠️", "差"),
    )
    for rows, heading, icon, word in sections:
        if not rows:
            continue
        out.append(heading)
        for row in rows[:5]:
            out.append(
                f" {icon} {row['momo_name'][:20]} "
                f"momo `NT${row['momo_price']:,.0f}` vs PC `NT${row['pc_price']:,.0f}`"
                f" {word} NT${abs(row['price_diff']):,.0f}"
            )
        out.append("")

    out.append("_資料來源:PChome 24h 即時爬取_")
    return "\n".join(out)
|