Files
ewoooc/services/edm_notifier.py
OoO 477aab3f6f
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
refactor(telegram): migrate edm_notifier text path to EventRouter (ADR-019 Phase 5)
services/edm_notifier.py 的 _send_telegram() 處理 EDM 媒體告警,原本 if/else
分流 sendPhoto / sendMessage。

行為變化:
- 純文字分支(無 image_path):改走 services.event_router.dispatch_sync()
  event_type=edm_media_alert, severity=warning
- 含圖片分支(sendPhoto with multipart file upload):依 ADR-019 任務指示
  保留直連 Telegram API(EventRouter 不支援 file upload,列為 known skip)
- caller 行為不變,失敗仍 logger.error 不阻斷主線

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 13:09:34 +08:00

134 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import json
import logging
import requests
from datetime import datetime, timedelta, timezone
import config
# Fixed UTC+8 offset used to render timestamps in Taipei local time.
TAIPEI_TZ = timezone(offset=timedelta(hours=8))
class EdmNotifier:
    """Send EDM (flash-sale page) update summaries to LINE and Telegram.

    Delivery is best-effort: transport failures (HTTP errors, EventRouter
    errors, unreadable screenshot files) are logged through ``self.logger``
    instead of being raised to the caller.
    """

    # Base URL used when neither config.PUBLIC_URL nor url_config.json
    # provides a usable value.
    DEFAULT_PUBLIC_URL = 'https://mo.wooo.work'

    # Telegram Bot API limits photo captions to 1024 characters.
    TELEGRAM_CAPTION_LIMIT = 1024

    # ``status_change`` values grouped into the three summary counters.
    _NEW_STATUSES = ('NEW',)
    _UPDATE_STATUSES = ('UPDATE', 'PRICE_UP', 'PRICE_DOWN')
    _DELISTED_STATUSES = ('DELISTED', 'SLOT_END')

    def __init__(self):
        """Set up the logger and resolve the public base URL once."""
        self.logger = logging.getLogger("EdmNotifier")
        self.logger.info("[EdmNotifier] 🔄 通知模組已載入 (v2026.01.09-URL-Fix)")
        self.public_url = self._get_public_url()

    def _get_public_url(self):
        """Return the public base URL used to build links in notifications.

        Lookup order:
          1. ``config.PUBLIC_URL`` when it is set to a truthy value.
          2. The ``public_url`` key of ``<config.DATA_DIR>/url_config.json``.
          3. ``DEFAULT_PUBLIC_URL``.

        Any error while reading the JSON file is logged and the default is
        returned.
        """
        configured = getattr(config, 'PUBLIC_URL', None)
        if configured:
            return configured

        try:
            url_config_path = os.path.join(config.DATA_DIR, 'url_config.json')
            if os.path.exists(url_config_path):
                with open(url_config_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                # A key that is present but null/empty must not leak into
                # generated URLs (it would produce links such as "None/edm").
                return data.get('public_url') or self.DEFAULT_PUBLIC_URL
        except Exception as e:
            self.logger.error(f"讀取 Public URL 失敗: {e}")

        return self.DEFAULT_PUBLIC_URL

    @classmethod
    def _count_changes(cls, products):
        """Count products per summary bucket in a single pass.

        Args:
            products: iterable of objects exposing a ``status_change``
                attribute.

        Returns:
            Tuple ``(new_count, update_count, delisted_count)``. Products
            whose status falls in none of the buckets are not counted.
        """
        new_count = update_count = delisted_count = 0
        for product in products:
            status = product.status_change
            if status in cls._NEW_STATUSES:
                new_count += 1
            elif status in cls._UPDATE_STATUSES:
                update_count += 1
            elif status in cls._DELISTED_STATUSES:
                delisted_count += 1
        return new_count, update_count, delisted_count

    @staticmethod
    def _redact(error, secret):
        """Return ``str(error)`` with every occurrence of ``secret`` masked.

        Used before logging exceptions raised for requests whose URL embeds
        a credential.
        """
        message = str(error)
        if secret:
            message = message.replace(str(secret), '***')
        return message

    def send_edm_report(self, products, screenshot_path=None):
        """Build the EDM summary message and push it to the enabled channels.

        Args:
            products: collection of objects exposing ``status_change``.
                Nothing is sent when it is empty or ``None``.
            screenshot_path: optional path of a screenshot to attach.
        """
        if not products:
            return

        # 1. Summary statistics.
        new_count, update_count, delisted_count = self._count_changes(products)

        now_str = datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M')
        edm_url = f"{self.public_url}/edm"

        # 2. Message body (fixed, operator-specified layout).
        message = (
            f"📊 🔥 限時搶購 更新通知 ({now_str})\n"
            f"完整內容請看:\n"
            f"==================================\n"
            f"{edm_url}\n"
            f"==================================\n\n"
            f"✨ 本輪重點摘要:\n"
            f" - 🟢 新增商品: {new_count}\n"
            f" - 資訊更新: {update_count}\n"
            f" - 🗑️ 商品下架: {delisted_count}"
        )

        # 3. LINE: requires LINE_ENABLED plus both credentials.
        line_enabled = getattr(config, 'LINE_ENABLED', False)
        if not line_enabled:
            self.logger.info("[EdmNotifier] ⏸️ LINE 通知已停用 (LINE_ENABLED=false)")
        elif (getattr(config, 'LINE_CHANNEL_ACCESS_TOKEN', None)
              and getattr(config, 'LINE_GROUP_ID', None)):
            self._send_line(message, screenshot_path)
        else:
            # Enabled but not configured: surface it instead of skipping silently.
            self.logger.warning(
                "[EdmNotifier] ⚠️ LINE 通知已啟用但缺少 "
                "LINE_CHANNEL_ACCESS_TOKEN 或 LINE_GROUP_ID,略過發送"
            )

        # 4. Telegram: sent only when both token and chat IDs are configured.
        if (getattr(config, 'TELEGRAM_BOT_TOKEN', None)
                and getattr(config, 'TELEGRAM_CHAT_IDS', None)):
            self._send_telegram(message, screenshot_path)

    def _send_line(self, text, image_path):
        """Push ``text`` (and optionally a screenshot) to the LINE group.

        Args:
            text: message body.
            image_path: optional local screenshot path. It is attached only
                when the file exists and the derived public URL is HTTPS.
        """
        headers = {
            "Authorization": f"Bearer {config.LINE_CHANNEL_ACCESS_TOKEN}",
            "Content-Type": "application/json"
        }
        url = "https://api.line.me/v2/bot/message/push"

        messages = [{"type": "text", "text": text}]

        if image_path and os.path.exists(image_path):
            filename = os.path.basename(image_path)
            # NOTE(review): assumes the screenshot is served from
            # /static/screenshots/ under the public URL - confirm.
            image_url = f"{self.public_url}/static/screenshots/{filename}"
            # LINE only accepts HTTPS image URLs (per the original note).
            if image_url.startswith("https://"):
                messages.append({
                    "type": "image",
                    "originalContentUrl": image_url,
                    "previewImageUrl": image_url
                })

        payload = {"to": config.LINE_GROUP_ID, "messages": messages}
        try:
            response = requests.post(url, headers=headers, json=payload, timeout=10)
            # requests does not raise on 4xx/5xx; report rejected pushes.
            if not response.ok:
                self.logger.error(
                    f"Line 發送失敗: HTTP {response.status_code} {response.text[:200]}"
                )
        except Exception as e:
            self.logger.error(f"Line 發送失敗: {e}")

    def _send_telegram(self, text, image_path):
        """Telegram outlet for EDM / media alerts (ADR-019 Phase 5).

        - Text only (no ``image_path`` or the file does not exist): routed
          through ``services.event_router.dispatch_sync``.
        - With an image: ``sendPhoto`` with a multipart upload is sent
          directly to the Telegram API, once per chat ID. The original note
          states EventRouter does not support file upload, so this path is
          deliberately kept outside it.

        Failures are logged and never raised.
        """
        bot_token = config.TELEGRAM_BOT_TOKEN
        chat_ids = config.TELEGRAM_CHAT_IDS

        # Text-only branch: go through EventRouter.
        if not (image_path and os.path.exists(image_path)):
            try:
                from services.event_router import dispatch_sync
                dispatch_sync(event={
                    "event_type": "edm_media_alert",
                    "severity": "warning",
                    "source": "EDMNotifier",
                    "title": "EDM 媒體告警",
                    "summary": text[:400],
                    "status": "media_alert",
                    "payload": {"raw_message": text},
                }, admin_chat_ids=list(chat_ids) if chat_ids else None)
            except Exception as e:
                self.logger.error(f"Telegram 發送失敗 (EventRouter): {e}")
            return

        # Image branch: direct sendPhoto multipart upload.
        url = f"https://api.telegram.org/bot{bot_token}/sendPhoto"
        # Over-long captions are rejected by the API, so trim up front.
        caption = text[:self.TELEGRAM_CAPTION_LIMIT]
        for chat_id in chat_ids:
            try:
                # Re-open per recipient so each upload starts at offset 0.
                with open(image_path, 'rb') as f:
                    response = requests.post(
                        url,
                        data={'chat_id': chat_id, 'caption': caption},
                        files={'photo': f},
                        timeout=20,
                    )
                if not response.ok:
                    self.logger.error(
                        f"Telegram 發送失敗 (ChatID: {chat_id}): "
                        f"HTTP {response.status_code} {response.text[:200]}"
                    )
            except Exception as e:
                # Exception text may contain the request URL, which embeds
                # the bot token; mask it before it reaches the logs.
                self.logger.error(
                    f"Telegram 發送失敗 (ChatID: {chat_id}): {self._redact(e, bot_token)}"
                )