Files
ewoooc/services/notification_manager.py
ogt 1b4f3a7bbe
Some checks failed
CD Pipeline / deploy (push) Failing after 59s
feat: EwoooC 初始化 — 完整專案推版至 Gitea
- 建立 Gitea Actions CD pipeline (.gitea/workflows/cd.yaml)
- 部署模式: rsync Python 檔案至 188 → docker restart (volume mount)
- Dockerfile/requirements 變動時自動重建 Docker image
- 部署通知: Telegram (開始/成功/失敗)
- 健康檢查: https://mo.wooo.work/health (最多 5 次重試)
- 同步最新 CLAUDE.md / ADR-008 / memory (2026-04-19)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-19 01:21:13 +08:00

387 lines
17 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import os
import json
import requests
import config
import re
import smtplib
from datetime import datetime
from collections import defaultdict
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
def _format_product_changes_message(products, title="WOOO TECH 商品異動通知", public_url=None):
"""
將商品異動列表格式化為易讀的區塊式訊息。
:param products: PromoProduct 或類似結構的物件列表
:param title: 訊息的標題
:return: 一個或多個格式化後的訊息字串列表 (為避免超長,會自動分段)
"""
if not products:
return []
# 1. 按狀態分組
grouped = defaultdict(list)
for p in products:
grouped[p.status_change].append(p)
# V-New: 儀表板連結區塊
dashboard_link = f"{public_url}/edm" if public_url else "https://mo.wooo.work/edm"
header_block = [
f"📊 {title}",
"完整內容請看:",
"=" * 30,
dashboard_link,
"=" * 30,
"",
"✨ 本輪重點摘要:"
]
# 2. 產生摘要
summary_lines = header_block
if grouped.get("NEW"): summary_lines.append(f" - 🟢 新增商品: {len(grouped['NEW'])}")
if grouped.get("PRICE_DOWN"): summary_lines.append(f" - 📉 價格下跌: {len(grouped['PRICE_DOWN'])}")
if grouped.get("PRICE_UP"): summary_lines.append(f" - 📈 價格上漲: {len(grouped['PRICE_UP'])}")
if grouped.get("UPDATE"): summary_lines.append(f" - 資訊更新: {len(grouped['UPDATE'])}")
if grouped.get("DELISTED"): summary_lines.append(f" - 🗑️ 商品下架: {len(grouped['DELISTED'])}")
if grouped.get("SLOT_END"): summary_lines.append(f" - ⏳ 時段結束: {len(grouped['SLOT_END'])}")
summary_lines.append("\n👇 詳細異動清單如下:")
# 3. 產生詳細清單
details_lines = []
# 優先顯示順序
status_order = [
("PRICE_DOWN", "📉 價格下跌"),
("PRICE_UP", "📈 價格上漲"),
("NEW", "🟢 新增商品"),
("UPDATE", " 資訊更新"),
("DELISTED", "🗑️ 商品下架"),
("SLOT_END", "⏳ 時段結束"),
]
for status, header in status_order:
if status in grouped:
items = grouped[status]
details_lines.append(f"\n--- {header} / 共 {len(items)} 件 ---")
for item in items:
# V-New: 價格顯示邏輯優化 (帶出原價)
current_price = item.price if item.price is not None else 0
price_str = f"${current_price:,.0f}"
# 價格變動顯示
if item.status_change in ['PRICE_DOWN', 'PRICE_UP', 'UPDATE'] and item.previous_price:
if item.previous_price != current_price:
arrow = "" if current_price < item.previous_price else ""
price_str = f"${current_price:,.0f} ({arrow} 原 ${item.previous_price:,.0f})"
# 庫存顯示 (僅限 EDM)
qty_str = f" 🔥剩 {item.remain_qty}" if hasattr(item, 'remain_qty') and item.remain_qty is not None else ""
# V-New: 移除商品連結,僅保留名稱與價格
details_lines.append(
f"\n{item.name}{qty_str}\n"
f"💰 {price_str}"
)
# 4. 組合並分割訊息 (避免超長)
# Line/Telegram 字數上限約 4000-5000我們用 3800 作為安全值
MAX_LENGTH = 3800
messages = []
current_message = "\n".join(summary_lines)
for line in details_lines:
if len(current_message) + len(line) + 1 > MAX_LENGTH:
messages.append(current_message)
current_message = "" # 開始一則新訊息
if not current_message: # 如果是新訊息的開頭,加上標題
current_message = f"📄 {title} (續)"
current_message += "\n" + line
if current_message:
messages.append(current_message)
return messages
class NotificationManager:
def __init__(self):
self.logger = logging.getLogger("System")
# Line
self.line_token = getattr(config, 'LINE_CHANNEL_ACCESS_TOKEN', None)
# V-Fix: 同時支援 Group ID 與 User ID
line_ids_config = getattr(config, 'LINE_GROUP_ID', [])
if not isinstance(line_ids_config, list):
line_ids_config = [line_ids_config]
user_id_config = getattr(config, 'LINE_USER_ID', None)
if user_id_config:
line_ids_config.append(user_id_config)
self.line_group_ids = []
for gid in line_ids_config:
if gid:
# V-Fix: 改回僅去除前後空白,避免過度清洗導致誤刪有效符號
raw_id = str(gid)
clean_id = raw_id.strip()
# 增加除錯日誌,顯示原始讀取到的 ID
if len(clean_id) != 33:
self.logger.warning(f"[Notification] [Line] ⚠️ LINE ID 長度異常 | Raw: '{raw_id}' | Clean: '{clean_id}' | Len: {len(clean_id)} | Info: 標準 ID (User/Group) 通常為 33 碼")
self.line_group_ids.append(clean_id)
# Telegram
self.telegram_token = getattr(config, 'TELEGRAM_BOT_TOKEN', None)
# 支援 TELEGRAM_CHAT_IDS (列表) 或舊的 TELEGRAM_CHAT_ID (字串)
chat_ids_config = getattr(config, 'TELEGRAM_CHAT_IDS', getattr(config, 'TELEGRAM_CHAT_ID', []))
if not isinstance(chat_ids_config, list):
# 如果是舊的字串格式,將其轉換為只有一個元素的列表
chat_ids_config = [chat_ids_config]
# 過濾掉空的 ID 並確保所有 ID 都是字串
self.telegram_chat_ids = [str(cid).strip() for cid in chat_ids_config if cid]
# Email
self.email_host = getattr(config, 'EMAIL_HOST', None)
self.email_port = getattr(config, 'EMAIL_PORT', 587)
self.email_user = getattr(config, 'EMAIL_HOST_USER', None)
self.email_pass = getattr(config, 'EMAIL_HOST_PASSWORD', None)
self.email_sender = getattr(config, 'EMAIL_SENDER', None)
self.email_receiver = getattr(config, 'EMAIL_RECEIVER', None)
# V-New: 讀取公開網址 (用於圖片連結)
self.public_url = None
try:
url_config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'url_config.json')
if os.path.exists(url_config_path):
with open(url_config_path, 'r') as f:
self.public_url = json.load(f).get('public_url')
except: pass
def _send_line_messages(self, messages, image_url=None):
# 檢查 LINE 是否啟用
line_enabled = getattr(config, 'LINE_ENABLED', False)
if not line_enabled:
self.logger.info("[Notification] [Line] ⏸️ LINE 通知已停用 (LINE_ENABLED=false)")
return False
if not self.line_token or not self.line_group_ids:
self.logger.warning("[Notification] [Line] ⚠️ LINE 通知未設定 (Token 或 Group ID 缺失)")
return False
headers = {
"Authorization": f"Bearer {self.line_token}",
"Content-Type": "application/json"
}
success = True
for group_id in self.line_group_ids:
for message in messages:
# V-Fix: 確保不發送空訊息,避免 LINE API 回傳 400 錯誤
if not message or not message.strip():
continue
payload = {
"to": group_id,
"messages": [{"type": "text", "text": message}]
}
# V-New: 如果有圖片且是最後一則訊息,附加圖片
if image_url and message == messages[-1]:
payload['messages'].append({
"type": "image",
"originalContentUrl": image_url,
"previewImageUrl": image_url
})
try:
resp = requests.post("https://api.line.me/v2/bot/message/push", headers=headers, json=payload, timeout=10)
if resp.status_code != 200:
self.logger.error(f"[Notification] [Line] ❌ LINE 通知發送失敗 | Target: '{group_id}' | Len: {len(group_id)} | Code: {resp.status_code} | Body: {resp.text}")
# V-New: 增加針對 ID 格式錯誤的明確建議
if resp.status_code == 400 and "to" in resp.text:
self.logger.error("[Notification] [Line] 💡 診斷建議: LINE API 拒絕了此 ID。您的 ID 長度為 32 碼,但標準格式通常為 33 碼 (U/C + 32位Hex)。請檢查 config.py 是否少複製了一碼。")
success = False
except Exception as e:
self.logger.error(f"[Notification] [Line] ❌ LINE 連線錯誤 | Target: {group_id} | Error: {e}")
success = False
if success: self.logger.info("[Notification] [Line] ✅ LINE 通知發送成功")
return success
def _send_telegram_messages(self, messages, image_path=None):
if not self.telegram_token or not self.telegram_chat_ids:
self.logger.warning("[Notification] [Telegram] ⚠️ Telegram 通知未設定 (Token 或 Chat ID 缺失)")
return False
success = True
for chat_id in self.telegram_chat_ids:
for message in messages:
url = f"https://api.telegram.org/bot{self.telegram_token}/sendMessage"
# Telegram 的 Markdown V2 需要對特殊字元進行轉義
safe_message = re.sub(r'([_*\\~`>#\+\-=|{}.!()\[\]])', r'\\\1', message)
payload = {
'chat_id': chat_id,
'text': safe_message,
'parse_mode': 'MarkdownV2',
'disable_web_page_preview': True
}
try:
resp = requests.post(url, json=payload, timeout=10)
if resp.status_code != 200:
self.logger.error(f"[Notification] [Telegram] ❌ Telegram 通知發送失敗 | ChatID: {chat_id} | Code: {resp.status_code} | Body: {resp.text}")
success = False
# V-New: 發送圖片 (僅在最後一則訊息後發送)
# V-Fix: 改用 sendDocument 以保留高解析度,避免 Telegram 自動壓縮
if image_path and message == messages[-1] and os.path.exists(image_path):
try:
with open(image_path, 'rb') as photo:
# 使用 sendDocument 發送完整解析度截圖(不壓縮)
requests.post(f"https://api.telegram.org/bot{self.telegram_token}/sendDocument",
data={'chat_id': chat_id}, files={'document': photo}, timeout=20)
except Exception as e:
self.logger.error(f"[Notification] [Telegram] ❌ Telegram 圖片發送失敗 | Error: {e}")
except Exception as e:
self.logger.error(f"[Notification] [Telegram] ❌ Telegram 連線錯誤 | ChatID: {chat_id} | Error: {e}")
success = False
if success: self.logger.info("[Notification] [Telegram] ✅ Telegram 通知發送成功")
return success
def _send_email_message(self, subject, markdown_content, image_path=None):
if not all([self.email_host, self.email_user, self.email_pass, self.email_sender, self.email_receiver]):
self.logger.warning("[Notification] [Email] ⚠️ Email 通知未完整設定 (Host/User/Pass/Sender/Receiver 缺失)")
return False
msg = MIMEMultipart()
msg['From'] = self.email_sender
msg['To'] = self.email_receiver
msg['Subject'] = subject
html_body = markdown_content.replace('\n', '<br>')
html_body = re.sub(r'\[(.*?)\]\((.*?)\)', r'<a href="\2">\1</a>', html_body)
html_body = re.sub(r'---', r'<hr>', html_body)
msg.attach(MIMEText(f"<html><body>{html_body}</body></html>", 'html', 'utf-8'))
# V-New: 附加圖片
if image_path and os.path.exists(image_path):
try:
with open(image_path, 'rb') as f:
img_data = f.read()
image = MIMEImage(img_data, name=os.path.basename(image_path))
msg.attach(image)
except Exception as e:
self.logger.error(f"[Notification] [Email] ❌ Email 圖片附加失敗 | Error: {e}")
try:
with smtplib.SMTP(self.email_host, self.email_port) as server:
server.starttls()
server.login(self.email_user, self.email_pass)
server.send_message(msg)
self.logger.info("[Notification] [Email] ✅ Email 通知發送成功")
return True
except Exception as e:
self.logger.error(f"[Notification] [Email] ❌ Email 連線或發送錯誤 | Error: {e}")
return False
def send_edm_report(self, products, screenshot_path=None):
if not products:
self.logger.info("[Notification] [EDM] 📢 無 EDM 商品異動,不發送通知。")
return
# V-New: 標題改為 "更新通知"
title = f"🔥 限時搶購 更新通知 ({datetime.now().strftime('%Y-%m-%d %H:%M')})"
messages = _format_product_changes_message(products, title, self.public_url)
if not messages:
self.logger.info("[Notification] [EDM] 📢 格式化後無訊息內容,不發送通知。")
return
# V-New: 準備圖片公開連結 (給 Line 用)
image_url = None
if screenshot_path and self.public_url:
filename = os.path.basename(screenshot_path)
image_url = f"{self.public_url}/static/screenshots/{filename}"
self.logger.info(f"[Notification] [EDM] 📢 準備發送 EDM 通知至所有頻道...")
self._send_line_messages(messages, image_url)
self._send_telegram_messages(messages, screenshot_path)
email_body = "\n\n".join(messages)
self._send_email_message(title, email_body, screenshot_path)
def send_momo_report(self, stats, screenshot_path=None):
"""發送商品看板異動通知"""
if not stats or (stats.get('up', 0) == 0 and stats.get('down', 0) == 0 and stats.get('new', 0) == 0 and stats.get('delisted', 0) == 0):
self.logger.info("[Notification] [MOMO] 📢 無商品異動,不發送通知。")
return
title = f"🛍️ 商品看板 更新通知 ({datetime.now().strftime('%Y-%m-%d %H:%M')})"
dashboard_url = self.public_url if self.public_url else "https://mo.wooo.work"
summary_lines = [
f"📊 {title}",
f"詳細數據請參閱儀表板: {dashboard_url}",
"="*25,
"📈 本日異動摘要:"
]
if stats.get("up", 0) > 0:
summary_lines.append(f" - ▲ 漲價商品: {stats['up']}")
if stats.get("down", 0) > 0:
summary_lines.append(f" - ▼ 跌價商品: {stats['down']}")
if stats.get("new", 0) > 0:
summary_lines.append(f" - 🟢 新上架: {stats['new']}")
if stats.get("delisted", 0) > 0:
summary_lines.append(f" - 🗑️ 已下架: {stats['delisted']}")
message = "\n".join(summary_lines)
messages = [message]
image_url = None
if screenshot_path and self.public_url:
filename = os.path.basename(screenshot_path)
image_url = f"{self.public_url}/static/screenshots/{filename}"
self.logger.info(f"[Notification] [MOMO] 📢 準備發送商品看板通知至所有頻道...")
self._send_line_messages(messages, image_url)
self._send_telegram_messages(messages, screenshot_path)
email_body = "\n\n".join(messages)
self._send_email_message(title, email_body, screenshot_path)
def send_daily_report(self):
msg = f"📊 每日監控報表 ({datetime.now().strftime('%Y-%m-%d')})\n系統運作正常。"
self._send_line_messages([msg])
self._send_telegram_messages([msg])
self._send_email_message(f"📊 每日監控報表 ({datetime.now().strftime('%Y-%m-%d')})", msg)
def send_whitepage_alert(self, url, error_msg):
"""
發送網頁白頁/異常告警
Args:
url: 檢測的網址
error_msg: 錯誤訊息
"""
now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
msg = (
f"🚨 網頁異常告警 ({now_str})\n"
f"{'='*30}\n"
f"❌ 檢測狀態:異常\n"
f"🌐 檢測網址:{url}\n"
f"📌 錯誤訊息:{error_msg}\n"
f"{'='*30}\n"
f"請立即檢查服務狀態!"
)
self.logger.warning(f"[Notification] [Whitepage] 🚨 準備發送白頁告警 | URL: {url} | Error: {error_msg}")
self._send_line_messages([msg])
self._send_telegram_messages([msg])
self._send_email_message(f"🚨 網頁異常告警 - {url}", msg)