381 lines
17 KiB
Python
381 lines
17 KiB
Python
import logging
|
||
import os
|
||
import json
|
||
import requests
|
||
import config
|
||
import re
|
||
import smtplib
|
||
from datetime import datetime
|
||
from collections import defaultdict
|
||
from email.mime.text import MIMEText
|
||
from email.mime.multipart import MIMEMultipart
|
||
from email.mime.image import MIMEImage
|
||
from services.telegram_templates import send_telegram_with_result
|
||
|
||
def _format_product_changes_message(products, title="WOOO TECH 商品異動通知", public_url=None):
|
||
"""
|
||
將商品異動列表格式化為易讀的區塊式訊息。
|
||
|
||
:param products: PromoProduct 或類似結構的物件列表
|
||
:param title: 訊息的標題
|
||
:return: 一個或多個格式化後的訊息字串列表 (為避免超長,會自動分段)
|
||
"""
|
||
if not products:
|
||
return []
|
||
|
||
# 1. 按狀態分組
|
||
grouped = defaultdict(list)
|
||
for p in products:
|
||
grouped[p.status_change].append(p)
|
||
|
||
# V-New: 儀表板連結區塊
|
||
dashboard_link = f"{public_url}/edm" if public_url else "https://mo.wooo.work/edm"
|
||
header_block = [
|
||
f"📊 {title}",
|
||
"完整內容請看:",
|
||
"=" * 30,
|
||
dashboard_link,
|
||
"=" * 30,
|
||
"",
|
||
"✨ 本輪重點摘要:"
|
||
]
|
||
|
||
# 2. 產生摘要
|
||
summary_lines = header_block
|
||
if grouped.get("NEW"): summary_lines.append(f" - 🟢 新增商品: {len(grouped['NEW'])} 件")
|
||
if grouped.get("PRICE_DOWN"): summary_lines.append(f" - 📉 價格下跌: {len(grouped['PRICE_DOWN'])} 件")
|
||
if grouped.get("PRICE_UP"): summary_lines.append(f" - 📈 價格上漲: {len(grouped['PRICE_UP'])} 件")
|
||
if grouped.get("UPDATE"): summary_lines.append(f" - ℹ️ 資訊更新: {len(grouped['UPDATE'])} 件")
|
||
if grouped.get("DELISTED"): summary_lines.append(f" - 🗑️ 商品下架: {len(grouped['DELISTED'])} 件")
|
||
if grouped.get("SLOT_END"): summary_lines.append(f" - ⏳ 時段結束: {len(grouped['SLOT_END'])} 件")
|
||
|
||
summary_lines.append("\n👇 詳細異動清單如下:")
|
||
|
||
# 3. 產生詳細清單
|
||
details_lines = []
|
||
|
||
# 優先顯示順序
|
||
status_order = [
|
||
("PRICE_DOWN", "📉 價格下跌"),
|
||
("PRICE_UP", "📈 價格上漲"),
|
||
("NEW", "🟢 新增商品"),
|
||
("UPDATE", "ℹ️ 資訊更新"),
|
||
("DELISTED", "🗑️ 商品下架"),
|
||
("SLOT_END", "⏳ 時段結束"),
|
||
]
|
||
|
||
for status, header in status_order:
|
||
if status in grouped:
|
||
items = grouped[status]
|
||
details_lines.append(f"\n--- {header} / 共 {len(items)} 件 ---")
|
||
for item in items:
|
||
# V-New: 價格顯示邏輯優化 (帶出原價)
|
||
current_price = item.price if item.price is not None else 0
|
||
price_str = f"${current_price:,.0f}"
|
||
|
||
# 價格變動顯示
|
||
if item.status_change in ['PRICE_DOWN', 'PRICE_UP', 'UPDATE'] and item.previous_price:
|
||
if item.previous_price != current_price:
|
||
arrow = "▼" if current_price < item.previous_price else "▲"
|
||
price_str = f"${current_price:,.0f} ({arrow} 原 ${item.previous_price:,.0f})"
|
||
|
||
# 庫存顯示 (僅限 EDM)
|
||
qty_str = f" 🔥剩 {item.remain_qty} 組" if hasattr(item, 'remain_qty') and item.remain_qty is not None else ""
|
||
|
||
# V-New: 移除商品連結,僅保留名稱與價格
|
||
details_lines.append(
|
||
f"\n{item.name}{qty_str}\n"
|
||
f"💰 {price_str}"
|
||
)
|
||
|
||
# 4. 組合並分割訊息 (避免超長)
|
||
# Line/Telegram 字數上限約 4000-5000,我們用 3800 作為安全值
|
||
MAX_LENGTH = 3800
|
||
|
||
messages = []
|
||
current_message = "\n".join(summary_lines)
|
||
|
||
for line in details_lines:
|
||
if len(current_message) + len(line) + 1 > MAX_LENGTH:
|
||
messages.append(current_message)
|
||
current_message = "" # 開始一則新訊息
|
||
|
||
if not current_message: # 如果是新訊息的開頭,加上標題
|
||
current_message = f"📄 {title} (續)"
|
||
|
||
current_message += "\n" + line
|
||
|
||
if current_message:
|
||
messages.append(current_message)
|
||
|
||
return messages
|
||
|
||
class NotificationManager:
|
||
def __init__(self):
|
||
self.logger = logging.getLogger("System")
|
||
# Line
|
||
self.line_token = getattr(config, 'LINE_CHANNEL_ACCESS_TOKEN', None)
|
||
|
||
# V-Fix: 同時支援 Group ID 與 User ID
|
||
line_ids_config = getattr(config, 'LINE_GROUP_ID', [])
|
||
if not isinstance(line_ids_config, list):
|
||
line_ids_config = [line_ids_config]
|
||
|
||
user_id_config = getattr(config, 'LINE_USER_ID', None)
|
||
if user_id_config:
|
||
line_ids_config.append(user_id_config)
|
||
|
||
self.line_group_ids = []
|
||
for gid in line_ids_config:
|
||
if gid:
|
||
# V-Fix: 改回僅去除前後空白,避免過度清洗導致誤刪有效符號
|
||
raw_id = str(gid)
|
||
clean_id = raw_id.strip()
|
||
|
||
# 增加除錯日誌,顯示原始讀取到的 ID
|
||
if len(clean_id) != 33:
|
||
self.logger.warning(f"[Notification] [Line] ⚠️ LINE ID 長度異常 | Raw: '{raw_id}' | Clean: '{clean_id}' | Len: {len(clean_id)} | Info: 標準 ID (User/Group) 通常為 33 碼")
|
||
self.line_group_ids.append(clean_id)
|
||
|
||
# Telegram
|
||
self.telegram_token = getattr(config, 'TELEGRAM_BOT_TOKEN', None)
|
||
# 支援 TELEGRAM_CHAT_IDS (列表) 或舊的 TELEGRAM_CHAT_ID (字串)
|
||
chat_ids_config = getattr(config, 'TELEGRAM_CHAT_IDS', getattr(config, 'TELEGRAM_CHAT_ID', []))
|
||
if not isinstance(chat_ids_config, list):
|
||
# 如果是舊的字串格式,將其轉換為只有一個元素的列表
|
||
chat_ids_config = [chat_ids_config]
|
||
# 過濾掉空的 ID 並確保所有 ID 都是字串
|
||
self.telegram_chat_ids = [str(cid).strip() for cid in chat_ids_config if cid]
|
||
# Email
|
||
self.email_host = getattr(config, 'EMAIL_HOST', None)
|
||
self.email_port = getattr(config, 'EMAIL_PORT', 587)
|
||
self.email_user = getattr(config, 'EMAIL_HOST_USER', None)
|
||
self.email_pass = getattr(config, 'EMAIL_HOST_PASSWORD', None)
|
||
self.email_sender = getattr(config, 'EMAIL_SENDER', None)
|
||
self.email_receiver = getattr(config, 'EMAIL_RECEIVER', None)
|
||
|
||
# V-New: 讀取公開網址 (用於圖片連結)
|
||
self.public_url = None
|
||
try:
|
||
url_config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'url_config.json')
|
||
if os.path.exists(url_config_path):
|
||
with open(url_config_path, 'r') as f:
|
||
self.public_url = json.load(f).get('public_url')
|
||
except Exception:
|
||
self.logger.exception("[Notification] 讀取 public_url 設定失敗")
|
||
|
||
def _send_line_messages(self, messages, image_url=None):
|
||
# 檢查 LINE 是否啟用
|
||
line_enabled = getattr(config, 'LINE_ENABLED', False)
|
||
if not line_enabled:
|
||
self.logger.info("[Notification] [Line] ⏸️ LINE 通知已停用 (LINE_ENABLED=false)")
|
||
return False
|
||
|
||
if not self.line_token or not self.line_group_ids:
|
||
self.logger.warning("[Notification] [Line] ⚠️ LINE 通知未設定 (Token 或 Group ID 缺失)")
|
||
return False
|
||
|
||
headers = {
|
||
"Authorization": f"Bearer {self.line_token}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
success = True
|
||
for group_id in self.line_group_ids:
|
||
for message in messages:
|
||
# V-Fix: 確保不發送空訊息,避免 LINE API 回傳 400 錯誤
|
||
if not message or not message.strip():
|
||
continue
|
||
|
||
payload = {
|
||
"to": group_id,
|
||
"messages": [{"type": "text", "text": message}]
|
||
}
|
||
|
||
# V-New: 如果有圖片且是最後一則訊息,附加圖片
|
||
if image_url and message == messages[-1]:
|
||
payload['messages'].append({
|
||
"type": "image",
|
||
"originalContentUrl": image_url,
|
||
"previewImageUrl": image_url
|
||
})
|
||
|
||
try:
|
||
resp = requests.post("https://api.line.me/v2/bot/message/push", headers=headers, json=payload, timeout=10)
|
||
if resp.status_code != 200:
|
||
self.logger.error(f"[Notification] [Line] ❌ LINE 通知發送失敗 | Target: '{group_id}' | Len: {len(group_id)} | Code: {resp.status_code} | Body: {resp.text}")
|
||
# V-New: 增加針對 ID 格式錯誤的明確建議
|
||
if resp.status_code == 400 and "to" in resp.text:
|
||
self.logger.error("[Notification] [Line] 💡 診斷建議: LINE API 拒絕了此 ID。您的 ID 長度為 32 碼,但標準格式通常為 33 碼 (U/C + 32位Hex)。請檢查 config.py 是否少複製了一碼。")
|
||
success = False
|
||
except Exception as e:
|
||
self.logger.error(f"[Notification] [Line] ❌ LINE 連線錯誤 | Target: {group_id} | Error: {e}")
|
||
success = False
|
||
|
||
if success: self.logger.info("[Notification] [Line] ✅ LINE 通知發送成功")
|
||
return success
|
||
|
||
def _send_telegram_messages(self, messages, image_path=None):
|
||
if not self.telegram_token or not self.telegram_chat_ids:
|
||
self.logger.warning("[Notification] [Telegram] ⚠️ Telegram 通知未設定 (Token 或 Chat ID 缺失)")
|
||
return False
|
||
|
||
success = True
|
||
for message in messages:
|
||
result = send_telegram_with_result(message, chat_ids=self.telegram_chat_ids, parse_mode=None)
|
||
if result["failed"] > 0 or result["sent"] == 0:
|
||
self.logger.error(
|
||
"[Notification] [Telegram] ❌ Telegram 通知發送失敗 | Sent: %s | Failed: %s | Errors: %s",
|
||
result["sent"], result["failed"], result["errors"]
|
||
)
|
||
success = False
|
||
|
||
if image_path and os.path.exists(image_path):
|
||
for chat_id in self.telegram_chat_ids:
|
||
try:
|
||
with open(image_path, 'rb') as photo:
|
||
requests.post(
|
||
f"https://api.telegram.org/bot{self.telegram_token}/sendDocument",
|
||
data={'chat_id': chat_id},
|
||
files={'document': photo},
|
||
timeout=20
|
||
)
|
||
except Exception as e:
|
||
self.logger.error(f"[Notification] [Telegram] ❌ Telegram 圖片發送失敗 | ChatID: {chat_id} | Error: {e}")
|
||
success = False
|
||
|
||
if success: self.logger.info("[Notification] [Telegram] ✅ Telegram 通知發送成功")
|
||
return success
|
||
|
||
def _send_email_message(self, subject, markdown_content, image_path=None):
|
||
if not all([self.email_host, self.email_user, self.email_pass, self.email_sender, self.email_receiver]):
|
||
self.logger.warning("[Notification] [Email] ⚠️ Email 通知未完整設定 (Host/User/Pass/Sender/Receiver 缺失)")
|
||
return False
|
||
|
||
msg = MIMEMultipart()
|
||
msg['From'] = self.email_sender
|
||
msg['To'] = self.email_receiver
|
||
msg['Subject'] = subject
|
||
|
||
html_body = markdown_content.replace('\n', '<br>')
|
||
html_body = re.sub(r'\[(.*?)\]\((.*?)\)', r'<a href="\2">\1</a>', html_body)
|
||
html_body = re.sub(r'---', r'<hr>', html_body)
|
||
|
||
msg.attach(MIMEText(f"<html><body>{html_body}</body></html>", 'html', 'utf-8'))
|
||
|
||
# V-New: 附加圖片
|
||
if image_path and os.path.exists(image_path):
|
||
try:
|
||
with open(image_path, 'rb') as f:
|
||
img_data = f.read()
|
||
image = MIMEImage(img_data, name=os.path.basename(image_path))
|
||
msg.attach(image)
|
||
except Exception as e:
|
||
self.logger.error(f"[Notification] [Email] ❌ Email 圖片附加失敗 | Error: {e}")
|
||
|
||
try:
|
||
with smtplib.SMTP(self.email_host, self.email_port) as server:
|
||
server.starttls()
|
||
server.login(self.email_user, self.email_pass)
|
||
server.send_message(msg)
|
||
self.logger.info("[Notification] [Email] ✅ Email 通知發送成功")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"[Notification] [Email] ❌ Email 連線或發送錯誤 | Error: {e}")
|
||
return False
|
||
|
||
def send_edm_report(self, products, screenshot_path=None):
|
||
if not products:
|
||
self.logger.info("[Notification] [EDM] 📢 無 EDM 商品異動,不發送通知。")
|
||
return
|
||
|
||
# V-New: 標題改為 "更新通知"
|
||
title = f"🔥 限時搶購 更新通知 ({datetime.now().strftime('%Y-%m-%d %H:%M')})"
|
||
messages = _format_product_changes_message(products, title, self.public_url)
|
||
|
||
if not messages:
|
||
self.logger.info("[Notification] [EDM] 📢 格式化後無訊息內容,不發送通知。")
|
||
return
|
||
|
||
# V-New: 準備圖片公開連結 (給 Line 用)
|
||
image_url = None
|
||
if screenshot_path and self.public_url:
|
||
filename = os.path.basename(screenshot_path)
|
||
image_url = f"{self.public_url}/static/screenshots/{filename}"
|
||
|
||
self.logger.info(f"[Notification] [EDM] 📢 準備發送 EDM 通知至所有頻道...")
|
||
self._send_line_messages(messages, image_url)
|
||
self._send_telegram_messages(messages, screenshot_path)
|
||
|
||
email_body = "\n\n".join(messages)
|
||
self._send_email_message(title, email_body, screenshot_path)
|
||
|
||
def send_momo_report(self, stats, screenshot_path=None):
|
||
"""發送商品看板異動通知"""
|
||
if not stats or (stats.get('up', 0) == 0 and stats.get('down', 0) == 0 and stats.get('new', 0) == 0 and stats.get('delisted', 0) == 0):
|
||
self.logger.info("[Notification] [MOMO] 📢 無商品異動,不發送通知。")
|
||
return
|
||
|
||
title = f"🛍️ 商品看板 更新通知 ({datetime.now().strftime('%Y-%m-%d %H:%M')})"
|
||
|
||
dashboard_url = self.public_url if self.public_url else "https://mo.wooo.work"
|
||
|
||
summary_lines = [
|
||
f"📊 {title}",
|
||
f"詳細數據請參閱儀表板: {dashboard_url}",
|
||
"="*25,
|
||
"📈 本日異動摘要:"
|
||
]
|
||
if stats.get("up", 0) > 0:
|
||
summary_lines.append(f" - ▲ 漲價商品: {stats['up']} 件")
|
||
if stats.get("down", 0) > 0:
|
||
summary_lines.append(f" - ▼ 跌價商品: {stats['down']} 件")
|
||
if stats.get("new", 0) > 0:
|
||
summary_lines.append(f" - 🟢 新上架: {stats['new']} 件")
|
||
if stats.get("delisted", 0) > 0:
|
||
summary_lines.append(f" - 🗑️ 已下架: {stats['delisted']} 件")
|
||
|
||
message = "\n".join(summary_lines)
|
||
messages = [message]
|
||
|
||
image_url = None
|
||
if screenshot_path and self.public_url:
|
||
filename = os.path.basename(screenshot_path)
|
||
image_url = f"{self.public_url}/static/screenshots/{filename}"
|
||
|
||
self.logger.info(f"[Notification] [MOMO] 📢 準備發送商品看板通知至所有頻道...")
|
||
self._send_line_messages(messages, image_url)
|
||
self._send_telegram_messages(messages, screenshot_path)
|
||
|
||
email_body = "\n\n".join(messages)
|
||
self._send_email_message(title, email_body, screenshot_path)
|
||
|
||
def send_daily_report(self):
|
||
msg = f"📊 每日監控報表 ({datetime.now().strftime('%Y-%m-%d')})\n系統運作正常。"
|
||
self._send_line_messages([msg])
|
||
self._send_telegram_messages([msg])
|
||
self._send_email_message(f"📊 每日監控報表 ({datetime.now().strftime('%Y-%m-%d')})", msg)
|
||
|
||
def send_whitepage_alert(self, url, error_msg):
|
||
"""
|
||
發送網頁白頁/異常告警
|
||
|
||
Args:
|
||
url: 檢測的網址
|
||
error_msg: 錯誤訊息
|
||
"""
|
||
now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
msg = (
|
||
f"🚨 網頁異常告警 ({now_str})\n"
|
||
f"{'='*30}\n"
|
||
f"❌ 檢測狀態:異常\n"
|
||
f"🌐 檢測網址:{url}\n"
|
||
f"📌 錯誤訊息:{error_msg}\n"
|
||
f"{'='*30}\n"
|
||
f"請立即檢查服務狀態!"
|
||
)
|
||
self.logger.warning(f"[Notification] [Whitepage] 🚨 準備發送白頁告警 | URL: {url} | Error: {error_msg}")
|
||
self._send_line_messages([msg])
|
||
self._send_telegram_messages([msg])
|
||
self._send_email_message(f"🚨 網頁異常告警 - {url}", msg)
|