From 38200a5e934cd29836e7d3bdd96a859b72a0a373 Mon Sep 17 00:00:00 2001 From: ogt Date: Tue, 21 Apr 2026 15:17:48 +0800 Subject: [PATCH] =?UTF-8?q?feat(reports):=20=E6=96=B0=E5=A2=9E=E6=97=A5?= =?UTF-8?q?=E5=A0=B1/=E6=9C=88=E5=A0=B1=E7=B3=BB=E7=B5=B1=EF=BC=8C?= =?UTF-8?q?=E6=95=B4=E5=90=88=E5=9C=96=E8=A1=A8=E6=8E=A8=E6=92=AD=E8=87=B3?= =?UTF-8?q?=20Telegram?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - services/openclaw_strategist_service.py:新增 generate_daily_report()(每日09:00業績快報+競品威脅+2圖表)和 generate_monthly_report()(每月1日07:00月度全景洞察+3圖表+MoM/YoY比較) - services/chart_generator_service.py:新建圖表生成服務(6種深色商業圖表,revenue_trend / category_revenue / monthly_overview / price_gap / price_history_heatmap / price_trend) - services/telegram_templates.py:重建訊息模板系統(5類模板:告警/報告/決策/系統/洞察)、新增 send_photo + send_report_with_charts 圖文推播 - scheduler.py:新增 run_daily_report_task / run_monthly_report_task(含 auto_heal 保護) - run_scheduler.py:每日09:00日報 + 每月1日07:00月報排程(月報用每日gate判斷day==1) - requirements.txt:新增 matplotlib + matplotlib-inline Co-Authored-By: Claude Sonnet 4.6 --- requirements.txt | 4 +- run_scheduler.py | 17 +- scheduler.py | 50 +++ services/chart_generator_service.py | 488 +++++++++++++++++++++++ services/openclaw_strategist_service.py | 501 ++++++++++++++++++++++++ services/telegram_templates.py | 482 ++++++++++++++++++----- 6 files changed, 1439 insertions(+), 103 deletions(-) create mode 100644 services/chart_generator_service.py diff --git a/requirements.txt b/requirements.txt index 1c3b003..ec5fa6f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,4 +23,6 @@ lxml prometheus-client python-telegram-bot[job-queue] paramiko # ADR-013: AIOps SSH 跳板修復 -python-pptx # ADR-014: PPT 簡報系統 \ No newline at end of file +python-pptx # ADR-014: PPT 簡報系統 +matplotlib # 圖表生成(日報/週報/月報) +matplotlib-inline # Jupyter 相容層(可選) \ No newline at end of file diff --git a/run_scheduler.py b/run_scheduler.py index 65c2dd1..e88c0c1 100644 --- a/run_scheduler.py +++ b/run_scheduler.py @@ -8,8 +8,9 @@ run_scheduler.py — momo-scheduler 容器入口點 每 4 小時:competitor_price_feeder、icaim_analysis 每 6 小時:openclaw_meta_analysis、quality_rescore 每 12 小時:dedup_batch - 每 1 天 :db_backup、backup_monitor + 每 1 天 :db_backup(03:00)、backup_monitor(04:00)、daily_report(09:00) 每 1 週 :weekly_strategy(週一 06:00) + 每 1 月 :monthly_report(每月1日 07:00) """ import asyncio import logging @@ -33,6 +34,8 @@ from scheduler import ( run_openclaw_meta_analysis_task, run_dedup_batch_task, run_quality_rescore_task, + run_daily_report_task, + run_monthly_report_task, ) logging.basicConfig( @@ -82,6 +85,18 @@ def _register_schedules(): schedule.every().monday.at("06:00").do(run_weekly_strategy_task) logger.info("📅 每週一 06:00:weekly_strategy") + schedule.every().day.at("09:00").do(run_daily_report_task) + logger.info("📅 每日 09:00:daily_report") + + # 每月1日 07:00 月報(schedule 不支援 every().month,用每日 07:00 + 日期判斷) + def _monthly_report_gate(): + from datetime import datetime as _dt + if _dt.now().day == 1: + run_monthly_report_task() + + schedule.every().day.at("07:00").do(_monthly_report_gate) + logger.info("📅 每月1日 07:00:monthly_report") + def _run_elephant_alpha_engine(): """Daemon thread: ElephantAlpha 自主監控引擎(獨立 asyncio loop)""" diff --git a/scheduler.py b/scheduler.py index c1219f2..a7c5838 100644 --- a/scheduler.py +++ b/scheduler.py @@ -1949,6 +1949,56 @@ def run_quality_rescore_task(): _save_stats('quality_rescore', {"status": "Error", "error": str(e)}) +def run_daily_report_task(): + """每日 09:00 — OpenClaw 電商日報(業績快報 + 競品威脅 + 圖表推播)""" + try: + from services.openclaw_strategist_service import generate_daily_report + result = generate_daily_report() + logging.info( + f"[Scheduler] [DailyReport] ✅ 完成 | period={result.get('period')} " + f"charts={result.get('chart_count', 0)} actions={result.get('action_count', 0)}" + ) + _save_stats('daily_report', result) + except Exception as e: + import traceback as _tb + logging.error(f"[Scheduler] [DailyReport] 🚨 日報任務異常: {e}") + _save_stats('daily_report', {"status": "Error", "error": str(e)}) + try: + from services.auto_heal_service import auto_heal_service + auto_heal_service.handle_exception( + task_name="run_daily_report_task", + exception=e, + traceback_str=_tb.format_exc(), + ) + except Exception as _heal_e: + logging.error(f"[Scheduler] [DailyReport] auto_heal_service 失敗: {_heal_e}") + + +def run_monthly_report_task(): + """每月1日 07:00 — OpenClaw 電商月報(月度全景洞察 + 多圖表推播)""" + try: + from services.openclaw_strategist_service import generate_monthly_report + result = generate_monthly_report() + logging.info( + f"[Scheduler] [MonthlyReport] ✅ 完成 | period={result.get('period')} " + f"charts={result.get('chart_count', 0)} actions={result.get('action_count', 0)}" + ) + _save_stats('monthly_report', result) + except Exception as e: + import traceback as _tb + logging.error(f"[Scheduler] [MonthlyReport] 🚨 月報任務異常: {e}") + _save_stats('monthly_report', {"status": "Error", "error": str(e)}) + try: + from services.auto_heal_service import auto_heal_service + auto_heal_service.handle_exception( + task_name="run_monthly_report_task", + exception=e, + traceback_str=_tb.format_exc(), + ) + except Exception as _heal_e: + logging.error(f"[Scheduler] [MonthlyReport] auto_heal_service 失敗: {_heal_e}") + + if __name__ == "__main__": # 此檔案現在由 app.py 導入並由其主執行緒管理排程。 # 若需獨立測試,可在此處臨時加入調用程式碼。 diff --git a/services/chart_generator_service.py b/services/chart_generator_service.py new file mode 100644 index 0000000..8416012 --- /dev/null +++ b/services/chart_generator_service.py @@ -0,0 +1,488 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +services/chart_generator_service.py +專業圖表生成服務 + +供日報 / 週報 / 月報 + Telegram 圖文通知使用。 +所有圖表回傳 BytesIO(可直接傳 Telegram sendPhoto)。 + +圖表清單: + price_trend_chart(sku, days) — 單 SKU 30天 MOMO vs 競品價格趨勢 + category_revenue_chart(days) — 品類業績橫條圖 + revenue_trend_chart(days) — 每日業績折線圖(含 MoM 基準線) + price_gap_distribution_chart(threats) — 競價威脅價差分佈 + monthly_overview_chart(months) — 月份業績對比柱狀圖 + top_sku_heatmap(days) — TOP SKU 業績熱力圖 +""" + +import io +import logging +from datetime import datetime, timedelta +from typing import List, Optional, Dict, Any + +logger = logging.getLogger(__name__) + +# ── Matplotlib 無頭模式初始化 ──────────────────────────────────────────────── +try: + import matplotlib + matplotlib.use("Agg") + import matplotlib.pyplot as plt + import matplotlib.dates as mdates + import numpy as np + _MPL_OK = True +except ImportError: + _MPL_OK = False + logger.warning("[Chart] matplotlib 未安裝,圖表功能停用") + +# ── 視覺設定(深色商業風格) ────────────────────────────────────────────────── +_BG = "#1a1a2e" +_PANEL = "#16213e" +_ACCENT1 = "#e94560" # 紅:MOMO / 警示 +_ACCENT2 = "#0f3460" # 深藍:競品 +_ACCENT3 = "#533483" # 紫:歷史基準 +_GREEN = "#00b4d8" # 青綠:業績正成長 +_YELLOW = "#ffd166" # 黃:中性 / 警示 +_TEXT = "#e0e0e0" +_SUBTEXT = "#888888" +_GRID = "#2a2a4a" + +plt_params = { + "figure.facecolor": _BG, + "axes.facecolor": _PANEL, + "axes.edgecolor": _GRID, + "axes.labelcolor": _TEXT, + "xtick.color": _SUBTEXT, + "ytick.color": _SUBTEXT, + "text.color": _TEXT, + "grid.color": _GRID, + "grid.linestyle": "--", + "grid.alpha": 0.5, + "font.family": ["DejaVu Sans", "sans-serif"], + "font.size": 10, +} + + +def _apply_style(): + if _MPL_OK: + plt.rcParams.update(plt_params) + + +def _fig_to_bytes(fig) -> Optional[bytes]: + buf = io.BytesIO() + fig.savefig(buf, format="png", dpi=150, bbox_inches="tight", + facecolor=fig.get_facecolor()) + plt.close(fig) + buf.seek(0) + return buf.read() + + +def _unavailable() -> None: + return None + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 資料讀取層(與 DB 解耦,各函式自行呼叫) +# ═══════════════════════════════════════════════════════════════════════════════ + +def _fetch_price_history(sku: str, days: int = 30) -> Dict[str, Any]: + """讀取單 SKU 的 MOMO + 競品歷史價格""" + try: + from database.manager import get_session + from sqlalchemy import text + session = get_session() + try: + momo_rows = session.execute(text(""" + SELECT pr.timestamp::date AS dt, AVG(pr.price) AS price + FROM price_records pr + JOIN products p ON p.id = pr.product_id + WHERE p.i_code = :sku + AND pr.timestamp >= NOW() - INTERVAL ':days days' + GROUP BY dt ORDER BY dt + """.replace(":days", str(days))), {"sku": sku}).fetchall() + + comp_rows = session.execute(text(""" + SELECT crawled_at::date AS dt, AVG(price) AS price + FROM competitor_prices + WHERE sku = :sku + AND source = 'pchome' + AND crawled_at >= NOW() - INTERVAL ':days days' + GROUP BY dt ORDER BY dt + """.replace(":days", str(days))), {"sku": sku}).fetchall() + + return { + "momo": [(str(r[0]), float(r[1])) for r in momo_rows], + "pchome":[(str(r[0]), float(r[1])) for r in comp_rows], + } + finally: + session.close() + except Exception as e: + logger.warning("[Chart] price_history 讀取失敗: %s", e) + return {"momo": [], "pchome": []} + + +def _fetch_daily_revenue(days: int = 30) -> List[Dict]: + try: + from database.manager import get_session + from sqlalchemy import text + session = get_session() + try: + rows = session.execute(text(f""" + SELECT snapshot_date::date AS dt, + SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue + FROM daily_sales_snapshot + WHERE snapshot_date::date >= CURRENT_DATE - {days} + GROUP BY dt ORDER BY dt + """)).fetchall() + return [{"date": str(r[0]), "revenue": float(r[1] or 0)} for r in rows] + finally: + session.close() + except Exception as e: + logger.warning("[Chart] daily_revenue 讀取失敗: %s", e) + return [] + + +def _fetch_category_revenue(days: int = 7) -> List[Dict]: + try: + from database.manager import get_session + from sqlalchemy import text + session = get_session() + try: + rows = session.execute(text(f""" + SELECT p.category, + SUM(COALESCE(s."銷售金額"::numeric, 0)) AS revenue + FROM daily_sales_snapshot s + JOIN products p ON p.name = s."商品名稱" + WHERE s.snapshot_date::date >= CURRENT_DATE - {days} + AND p.status = 'ACTIVE' + AND p.category IS NOT NULL + GROUP BY p.category + ORDER BY revenue DESC LIMIT 12 + """)).fetchall() + return [{"category": r[0] or "未分類", "revenue": float(r[1] or 0)} for r in rows] + finally: + session.close() + except Exception as e: + logger.warning("[Chart] category_revenue 讀取失敗: %s", e) + return [] + + +def _fetch_monthly_revenue(months: int = 6) -> List[Dict]: + try: + from database.manager import get_session + from sqlalchemy import text + session = get_session() + try: + rows = session.execute(text(f""" + SELECT DATE_TRUNC('month', snapshot_date)::date AS mon, + SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue + FROM daily_sales_snapshot + WHERE snapshot_date >= NOW() - INTERVAL '{months} months' + GROUP BY mon ORDER BY mon + """)).fetchall() + return [{"month": str(r[0])[:7], "revenue": float(r[1] or 0)} for r in rows] + finally: + session.close() + except Exception as e: + logger.warning("[Chart] monthly_revenue 讀取失敗: %s", e) + return [] + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 圖表函式 +# ═══════════════════════════════════════════════════════════════════════════════ + +def price_trend_chart(sku: str, sku_name: str = "", days: int = 30) -> Optional[bytes]: + """單 SKU:MOMO vs PChome 價格趨勢折線圖""" + if not _MPL_OK: + return _unavailable() + data = _fetch_price_history(sku, days) + if not data["momo"]: + return None + + _apply_style() + fig, ax = plt.subplots(figsize=(10, 4)) + fig.patch.set_facecolor(_BG) + + if data["momo"]: + dates = [r[0] for r in data["momo"]] + prices = [r[1] for r in data["momo"]] + ax.plot(dates, prices, color=_ACCENT1, linewidth=2.5, marker="o", + markersize=4, label="MOMO 自家", zorder=3) + ax.fill_between(dates, prices, alpha=0.15, color=_ACCENT1) + + if data["pchome"]: + dates2 = [r[0] for r in data["pchome"]] + prices2 = [r[1] for r in data["pchome"]] + ax.plot(dates2, prices2, color=_GREEN, linewidth=2, linestyle="--", + marker="s", markersize=3, label="PChome", zorder=2) + + title = f"{' ' + sku_name if sku_name else sku} · {days}日價格趨勢" + ax.set_title(title, fontsize=12, fontweight="bold", color=_TEXT, pad=10) + ax.set_ylabel("價格 (NT$)", color=_TEXT) + ax.legend(loc="upper left", framealpha=0.3, facecolor=_PANEL, edgecolor=_GRID) + ax.grid(True, alpha=0.4) + ax.tick_params(axis="x", rotation=30, labelsize=8) + ax.yaxis.set_major_formatter(matplotlib.ticker.FuncFormatter( + lambda x, _: f"${x:,.0f}")) + + # 標註最新價差 + if data["momo"] and data["pchome"]: + m_last = data["momo"][-1][1] + p_last = data["pchome"][-1][1] + gap = (m_last - p_last) / m_last * 100 + color = _ACCENT1 if gap > 0 else _GREEN + ax.annotate(f"現價差 {gap:+.1f}%", xy=(0.98, 0.05), + xycoords="axes fraction", ha="right", + fontsize=10, fontweight="bold", color=color, + bbox=dict(boxstyle="round,pad=0.3", facecolor=_PANEL, alpha=0.8)) + + _add_watermark(ax) + fig.tight_layout() + return _fig_to_bytes(fig) + + +def revenue_trend_chart(days: int = 30, title_suffix: str = "") -> Optional[bytes]: + """每日業績折線圖(含 MoM 同期基準線)""" + if not _MPL_OK: + return _unavailable() + data = _fetch_daily_revenue(days * 2) # 取雙倍天數供 MoM 對比 + if not data: + return None + + curr = data[-days:] if len(data) >= days else data + prev = data[:-days] if len(data) >= days * 2 else [] + + _apply_style() + fig, ax = plt.subplots(figsize=(12, 4)) + + if curr: + labels = [d["date"] for d in curr] + vals = [d["revenue"] for d in curr] + ax.plot(labels, vals, color=_GREEN, linewidth=2.5, marker="o", + markersize=4, label="本期", zorder=3) + ax.fill_between(labels, vals, alpha=0.2, color=_GREEN) + + # 7天移動平均 + if len(vals) >= 7: + ma7 = np.convolve(vals, np.ones(7)/7, mode="valid") + ax.plot(labels[6:], ma7, color=_YELLOW, linewidth=1.5, + linestyle=":", label="7MA", zorder=2, alpha=0.8) + + if prev: + labels2 = [d["date"] for d in prev[-len(curr):]] + vals2 = [d["revenue"] for d in prev[-len(curr):]] + ax.plot(labels2, vals2, color=_SUBTEXT, linewidth=1.5, linestyle="--", + label="前期同比", zorder=1, alpha=0.6) + + title = f"業績趨勢 · 近 {days} 天{title_suffix}" + ax.set_title(title, fontsize=12, fontweight="bold", color=_TEXT, pad=10) + ax.set_ylabel("日業績 (NT$)", color=_TEXT) + ax.yaxis.set_major_formatter(matplotlib.ticker.FuncFormatter( + lambda x, _: f"${x/10000:.1f}萬")) + ax.legend(loc="upper left", framealpha=0.3, facecolor=_PANEL, edgecolor=_GRID) + ax.grid(True, alpha=0.4) + ax.tick_params(axis="x", rotation=30, labelsize=7) + + # 標注最高/最低點 + if curr: + vals = [d["revenue"] for d in curr] + max_i, min_i = vals.index(max(vals)), vals.index(min(vals)) + ax.annotate(f"↑ ${vals[max_i]/10000:.1f}萬", + xy=(curr[max_i]["date"], vals[max_i]), + xytext=(0, 8), textcoords="offset points", + ha="center", fontsize=8, color=_GREEN) + ax.annotate(f"↓ ${vals[min_i]/10000:.1f}萬", + xy=(curr[min_i]["date"], vals[min_i]), + xytext=(0, -14), textcoords="offset points", + ha="center", fontsize=8, color=_ACCENT1) + + _add_watermark(ax) + fig.tight_layout() + return _fig_to_bytes(fig) + + +def category_revenue_chart(days: int = 7, title_suffix: str = "") -> Optional[bytes]: + """品類業績橫條圖""" + if not _MPL_OK: + return _unavailable() + data = _fetch_category_revenue(days) + if not data: + return None + + _apply_style() + n = min(len(data), 10) + cats = [d["category"][:10] for d in data[:n]][::-1] + revs = [d["revenue"] for d in data[:n]][::-1] + max_r = max(revs) if revs else 1 + + fig, ax = plt.subplots(figsize=(10, max(4, n * 0.55))) + + colors = [_GREEN if r == max(revs) else (_ACCENT1 if r < max_r * 0.2 else _ACCENT2) + for r in revs] + bars = ax.barh(cats, revs, color=colors, alpha=0.85, edgecolor=_GRID, linewidth=0.5) + + for bar, val in zip(bars, revs): + ax.text(bar.get_width() + max_r * 0.01, bar.get_y() + bar.get_height() / 2, + f"${val/10000:.1f}萬", va="center", fontsize=9, color=_TEXT) + + ax.set_title(f"品類業績排行 · 近 {days} 天{title_suffix}", + fontsize=12, fontweight="bold", color=_TEXT, pad=10) + ax.set_xlabel("業績 (NT$)", color=_TEXT) + ax.xaxis.set_major_formatter(matplotlib.ticker.FuncFormatter( + lambda x, _: f"${x/10000:.0f}萬")) + ax.grid(True, axis="x", alpha=0.4) + ax.set_xlim(0, max_r * 1.18) + + _add_watermark(ax) + fig.tight_layout() + return _fig_to_bytes(fig) + + +def monthly_overview_chart(months: int = 6) -> Optional[bytes]: + """月份業績對比柱狀圖(含 MoM 成長率折線)""" + if not _MPL_OK: + return _unavailable() + data = _fetch_monthly_revenue(months) + if not data: + return None + + _apply_style() + labels = [d["month"] for d in data] + vals = [d["revenue"] for d in data] + mom = [0] + [(vals[i] - vals[i-1]) / vals[i-1] * 100 + if vals[i-1] else 0 for i in range(1, len(vals))] + + fig, ax1 = plt.subplots(figsize=(10, 4.5)) + ax2 = ax1.twinx() + + bar_colors = [_GREEN if m >= 0 else _ACCENT1 for m in mom] + bars = ax1.bar(labels, vals, color=bar_colors, alpha=0.7, + edgecolor=_GRID, linewidth=0.5, width=0.6) + + for bar, val in zip(bars, vals): + ax1.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + max(vals) * 0.01, + f"${val/10000:.0f}萬", ha="center", fontsize=8, color=_TEXT) + + ax2.plot(labels, mom, color=_YELLOW, linewidth=2.5, marker="D", + markersize=6, label="MoM 成長率", zorder=5) + ax2.axhline(0, color=_SUBTEXT, linewidth=0.8, linestyle="--") + for x, y in zip(labels, mom): + ax2.annotate(f"{y:+.1f}%", (x, y), textcoords="offset points", + xytext=(0, 8), ha="center", fontsize=8, + color=_GREEN if y >= 0 else _ACCENT1) + + ax1.set_title(f"月度業績概覽 · 近 {months} 個月", + fontsize=12, fontweight="bold", color=_TEXT, pad=10) + ax1.set_ylabel("月業績 (NT$)", color=_TEXT) + ax2.set_ylabel("MoM 成長率 (%)", color=_YELLOW) + ax1.yaxis.set_major_formatter(matplotlib.ticker.FuncFormatter( + lambda x, _: f"${x/10000:.0f}萬")) + ax2.tick_params(axis="y", colors=_YELLOW) + ax1.grid(True, axis="y", alpha=0.3) + ax1.tick_params(axis="x", rotation=20) + + _add_watermark(ax1) + fig.tight_layout() + return _fig_to_bytes(fig) + + +def price_gap_bar_chart(threats: List[Dict], title: str = "競價威脅 TOP 10") -> Optional[bytes]: + """競價威脅價差橫條圖""" + if not _MPL_OK or not threats: + return _unavailable() + + _apply_style() + n = min(len(threats), 10) + names = [str(t.get("name", t.get("sku", "")))[:18] for t in threats[:n]][::-1] + gaps = [float(t.get("gap_pct", 0)) for t in threats[:n]][::-1] + + fig, ax = plt.subplots(figsize=(10, max(4, n * 0.55))) + colors = [_ACCENT1 if g > 15 else _YELLOW if g > 5 else _GREEN for g in gaps] + bars = ax.barh(names, gaps, color=colors, alpha=0.85, edgecolor=_GRID, linewidth=0.5) + + for bar, val in zip(bars, gaps): + ax.text(bar.get_width() + 0.2, bar.get_y() + bar.get_height() / 2, + f"{val:+.1f}%", va="center", fontsize=9, color=_TEXT) + + ax.axvline(0, color=_SUBTEXT, linewidth=1) + ax.axvline(5, color=_YELLOW, linewidth=0.8, linestyle=":", alpha=0.6) + ax.axvline(15, color=_ACCENT1, linewidth=0.8, linestyle=":", alpha=0.6) + ax.set_title(title, fontsize=12, fontweight="bold", color=_TEXT, pad=10) + ax.set_xlabel("MOMO 相對競品價差 (%) ↑我貴 / ↓我便宜", color=_TEXT) + ax.grid(True, axis="x", alpha=0.4) + + _add_watermark(ax) + fig.tight_layout() + return _fig_to_bytes(fig) + + +def price_history_heatmap(days: int = 30) -> Optional[bytes]: + """ + 品類 × 日期 價差熱力圖(月報用) + 顯示哪個品類在哪幾天的競品威脅最嚴重 + """ + if not _MPL_OK: + return _unavailable() + try: + from database.manager import get_session + from sqlalchemy import text + session = get_session() + try: + rows = session.execute(text(f""" + SELECT p.category, + cp.crawled_at::date AS dt, + AVG((cp.price - pr.price) / NULLIF(pr.price, 0) * 100) AS gap_pct + FROM competitor_prices cp + JOIN products p ON p.i_code = cp.sku + JOIN ( + SELECT DISTINCT ON (product_id) product_id, price + FROM price_records ORDER BY product_id, timestamp DESC + ) pr ON pr.product_id = p.id + WHERE cp.crawled_at >= NOW() - INTERVAL '{days} days' + AND p.category IS NOT NULL + GROUP BY p.category, dt + ORDER BY p.category, dt + """)).fetchall() + finally: + session.close() + + if not rows: + return None + + import pandas as pd + df = pd.DataFrame(rows, columns=["category", "date", "gap_pct"]) + pivot = df.pivot(index="category", columns="date", values="gap_pct").fillna(0) + + _apply_style() + fig, ax = plt.subplots(figsize=(min(16, max(8, len(pivot.columns) * 0.5)), + max(4, len(pivot) * 0.6))) + + im = ax.imshow(pivot.values, cmap="RdYlGn_r", aspect="auto", + vmin=-20, vmax=20) + plt.colorbar(im, ax=ax, label="價差 % (+我貴/-我便宜)", shrink=0.6) + + ax.set_xticks(range(len(pivot.columns))) + ax.set_xticklabels([str(c)[-5:] for c in pivot.columns], + rotation=45, ha="right", fontsize=7) + ax.set_yticks(range(len(pivot.index))) + ax.set_yticklabels(pivot.index, fontsize=9) + ax.set_title(f"品類競品價差熱力圖 · 近 {days} 天", + fontsize=12, fontweight="bold", color=_TEXT, pad=10) + + _add_watermark(ax) + fig.tight_layout() + return _fig_to_bytes(fig) + except Exception as e: + logger.warning("[Chart] heatmap 生成失敗: %s", e) + return None + + +def _add_watermark(ax): + ax.text(0.99, 0.01, "EwoooC AI · momo-pro-system", + transform=ax.transAxes, ha="right", va="bottom", + fontsize=7, color=_SUBTEXT, alpha=0.5) + + +# ── 模組單例(供外部直接 import) ──────────────────────────────────────────── +import matplotlib.ticker diff --git a/services/openclaw_strategist_service.py b/services/openclaw_strategist_service.py index 1eedab1..3d90b41 100644 --- a/services/openclaw_strategist_service.py +++ b/services/openclaw_strategist_service.py @@ -41,7 +41,9 @@ TAIPEI_TZ_OFFSET = 8 # UTC+8 __all__ = [ "SSHJumpExecutor", + "generate_daily_report", "generate_weekly_strategy_report", + "generate_monthly_report", "generate_meta_analysis_report", ] @@ -518,6 +520,370 @@ TOP 威脅品項(近48h Hermes 偵測): } +def generate_daily_report() -> dict: + """ + OpenClaw 電商日報(每日 09:00) + + 流程: + 1. 讀取昨日業績快照 + TOP 競品威脅 + 定價建議 + 2. Gemini 快速日報分析(溫度 0.3,精簡版) + 3. 生成圖表:近7日營收趨勢 + 競品價差柱圖 + 4. 持久化 ai_insights(type='daily_report') + 5. Telegram 圖文推播 + """ + now = datetime.now() + yesterday = now - timedelta(days=1) + period = yesterday.strftime("%Y年%m月%d日") + logger.info("[OpenClaw] 日報任務啟動 period=%s", period) + + # ── Step 1:DB 數據收集 ────────────────────────────────────────────────── + sales = _fetch_sales_summary(7) + threats = _fetch_top_threats(5) + recommendations = _fetch_top_recommendations(5) + competitor_summary = _fetch_competitor_summary() + + # 昨日單日業績 + yesterday_sales = _fetch_yesterday_sales() + + # ── Step 2:組建 Gemini Prompt ─────────────────────────────────────────── + system_prompt = """你是 OpenClaw 日報分析師,負責每日電商業績快報。 +語言:繁體中文(台灣用語)。風格:精簡、數字導向、可執行。 +每個洞察必須有數字支撐,禁止空泛描述。""" + + user_prompt = f"""請根據以下數據,產出今日電商日報({period}): + +【昨日業績】 + 銷售金額:NT${yesterday_sales.get('revenue', 0):,.0f} + 成交SKU數:{yesterday_sales.get('sku_count', 0)} 個 + 訂單數:{yesterday_sales.get('order_count', 0)} 筆 + +【近7日趨勢】 + 本週累計:NT${sales.get('current_7d_revenue', 0):,.0f} + 前週同期:NT${sales.get('prev_7d_revenue', 0):,.0f} + WoW變化:{sales.get('wow_pct', 0):+.1f}% + +【競品警示(近24h Hermes偵測)】 +{_format_threats(threats)} + +【待處理定價建議(TOP 5)】 +{_format_recommendations(recommendations)} + +【競品整體概況】 + 監控SKU:{competitor_summary.get('total_skus', 0)} 個 + 被削價風險:{competitor_summary.get('undercut_count', 0)} 個(價差超過10%) + 平均價差:{competitor_summary.get('avg_gap_pct', 0):+.1f}% + +請按以下結構輸出(使用 HTML 標題): + +📅 {period} 電商日報 + +📊 昨日業績快報 +(昨日關鍵數字 + 與近期均值比較 + 異常點說明) + +⚠️ 今日最高優先威脅 +(最緊急的 1-3 個競品削價威脅,含具體行動建議) + +💰 今日定價行動建議 +(1-3 條今日應執行的調價動作,格式:SKU → 建議行動 → 預期效果) + +🎯 今日 3 件事 +(最重要的 3 件可執行任務,24h內完成) + +語言:繁體中文,全文200字以內,精準扼要。 +""" + + # ── Step 3:Gemini 生成 ─────────────────────────────────────────────────── + logger.info("[OpenClaw] 呼叫 Gemini 生成日報...") + report_content = _call_gemini(system_prompt, user_prompt, temperature=0.3) + + if not report_content: + logger.error("[OpenClaw] 日報 Gemini 呼叫失敗") + return {"status": "error", "report_type": "daily_report", "error": "Gemini 呼叫失敗"} + + # ── Step 4:生成圖表 ───────────────────────────────────────────────────── + charts = [] + try: + from services.chart_generator_service import ( + revenue_trend_chart, + price_gap_bar_chart, + ) + rev_chart = revenue_trend_chart(7, "近7日") + if rev_chart: + charts.append(("revenue_7d.png", rev_chart, "📈 近7日營收趨勢")) + + if threats: + gap_chart = price_gap_bar_chart(threats, "競品價差警示(TOP 5)") + if gap_chart: + charts.append(("price_gap.png", gap_chart, "⚠️ 競品價差分析")) + except Exception as e: + logger.warning("[OpenClaw] 日報圖表生成失敗(非阻塞): %s", e) + + # ── Step 5:持久化 DB ──────────────────────────────────────────────────── + metadata = { + "period": period, + "model": STRATEGY_MODEL, + "yesterday_revenue": yesterday_sales.get("revenue", 0), + "wow_pct": sales.get("wow_pct", 0), + "threat_count": len(threats), + "chart_count": len(charts), + "generated_at": now.isoformat(), + } + insight_id = _save_to_ai_insights( + insight_type="daily_report", + content=report_content, + confidence=0.85, + metadata=metadata, + period=yesterday.strftime("%Y-%m-%d"), + ) + action_items = _extract_action_items_daily(report_content) + _save_action_items(action_items, insight_id) + + # ── Step 6:Telegram 推播(圖文)──────────────────────────────────────── + try: + from services.telegram_templates import ( + daily_report_header, + send_report_with_charts, + _get_chat_ids, + ) + header = daily_report_header( + date_str=period, + revenue=yesterday_sales.get("revenue", 0), + wow=sales.get("wow_pct", 0), + threat_count=len(threats), + opportunity_count=competitor_summary.get("premium_count", 0), + ) + full_msg = header + "\n\n" + report_content + if charts: + send_report_with_charts(full_msg, charts, _get_chat_ids()) + else: + from services.telegram_templates import _send_telegram_raw + _send_telegram_raw(full_msg) + except Exception as e: + logger.error("[OpenClaw] 日報 Telegram 推播失敗: %s", e) + + logger.info("[OpenClaw] 日報完成 insight_id=%s charts=%d", insight_id, len(charts)) + return { + "status": "ok", + "report_type": "daily_report", + "insight_id": insight_id, + "period": period, + "chart_count": len(charts), + "action_count": len(action_items), + } + + +def generate_monthly_report() -> dict: + """ + OpenClaw 電商月報(每月1日 07:00) + + 流程: + 1. 讀取上月完整業績 + 品類分佈 + 競品趨勢 + 2. MCP 收集月度外部情報 + 3. Gemini 深度月度分析(完整版) + 4. 生成圖表:月度概覽 + 品類橫條 + 價格熱圖 + 5. 持久化 ai_insights(type='monthly_report') + 6. Telegram 圖文推播 + """ + now = datetime.now() + # 上個月 + first_of_this_month = now.replace(day=1) + last_month_end = first_of_this_month - timedelta(days=1) + last_month_start = last_month_end.replace(day=1) + period = last_month_end.strftime("%Y年%m月") + logger.info("[OpenClaw] 月報任務啟動 period=%s", period) + + # ── Step 1:DB 數據收集(上月完整數據)───────────────────────────────── + days_in_month = (first_of_this_month - last_month_start).days + sales = _fetch_monthly_sales_summary(last_month_start, last_month_end) + categories = _fetch_category_breakdown(days_in_month) + threats = _fetch_top_threats(10) + competitor_summary = _fetch_competitor_summary() + price_trend_data = _fetch_price_trend_summary(days_in_month) + + # ── Step 2:MCP 外部情報(月度版)─────────────────────────────────────── + mcp_data: Dict[str, str] = {} + try: + from services.mcp_collector_service import mcp_collector + mcp_data = mcp_collector.collect_all() + holiday_ctx = mcp_collector.get_holiday_context() + seasonal_ctx = mcp_collector.get_seasonal_context() + except Exception as e: + logger.warning("[OpenClaw] 月報 MCP 收集失敗(非阻塞): %s", e) + holiday_ctx = "" + seasonal_ctx = "" + + # ── Step 3:組建 Gemini Prompt ─────────────────────────────────────────── + system_prompt = """你是 OpenClaw 月報首席分析師,負責 momo 平台電商月度深度報告。 +語言:繁體中文(台灣用語)。格式:HTML標題 + 條列式數據。 +每個洞察必須有月度數字支撐,並與上月/去年同期比較。 +重點:月度趨勢、品類策略、定價最佳化、下月行動計畫。""" + + db_section = f""" +【{period} 業績總覽】 + 月營收:NT${sales.get('revenue', 0):,.0f} + MoM 變化:{sales.get('mom_pct', 0):+.1f}% + YoY 變化:{sales.get('yoy_pct', 0):+.1f}% + 活躍SKU數:{sales.get('sku_count', 0)} 個 + 平均客單價:NT${sales.get('avg_order_value', 0):,.0f} + +【品類業績分佈(TOP 10)】 +{_format_categories(categories)} + +【競品整體概況】 + 監控SKU:{competitor_summary.get('total_skus', 0)} 個 + 月均價差:{competitor_summary.get('avg_gap_pct', 0):+.1f}% + 被削價風險SKU:{competitor_summary.get('undercut_count', 0)} 個 + +【價格變動概況】 + 本月調價次數:{price_trend_data.get('price_changes', 0)} 次 + 平均調幅:{price_trend_data.get('avg_change_pct', 0):+.1f}% + 主動降價SKU:{price_trend_data.get('price_cuts', 0)} 個 + 主動提價SKU:{price_trend_data.get('price_raises', 0)} 個 +""" + + mcp_section = f""" +【MCP 外部情報(月度)】 +市場趨勢: +{mcp_data.get('market_trends', '(未取得)')[:600]} + +競品動態: +{mcp_data.get('competitor_intel', '(未取得)')[:500]} + +下月節日行事曆: +{holiday_ctx} +{mcp_data.get('holiday_calendar', '')[:400]} + +季節性洞察: +{seasonal_ctx} +{mcp_data.get('seasonal_insights', '')[:400]} +""" + + user_prompt = f"""請根據以下數據,產出 {period} 電商月度策略報告: + +{db_section} +{mcp_section} + +請按以下結構輸出(使用 HTML 標題,詳細分析): + +📅 {period} 電商月度報告 + +📊 月度業績總結 +(月營收 + MoM/YoY 變化 + 超出/低於預期分析 + 關鍵驅動因素) + +🏆 本月品類贏家 vs 輸家 +(成長最快3個品類 vs 衰退最嚴重3個品類,含原因分析) + +💰 本月定價策略回顧 +(調價效果評估 + 最佳定價案例 + 失誤案例 + 改進建議) + +⚔️ 競品月度分析 +(主要競爭對手動態 + 我方優劣勢 + 市場份額評估) + +📢 行銷效益評估 +(本月活動效果 + ROI 估算 + 最佳行銷時機分析) + +🔮 下月趨勢預測 +(季節性機會 + 節日活動規劃 + 風險預警 + 庫存建議) + +🎯 下月優先行動計畫 +(8-10條具體可執行任務,含時間節點和負責方向, + 格式:[週次/日期] 行動說明 → 預期效益) + +📈 Q{((now.month-1)//3)+1} 策略展望 +(季度目標設定 + 關鍵里程碑 + 需人工決策事項) + +語言:繁體中文,數據必須引用上方提供的實際數字。 +""" + + # ── Step 4:Gemini 生成 ─────────────────────────────────────────────────── + logger.info("[OpenClaw] 呼叫 Gemini 生成月報...") + report_content = _call_gemini(system_prompt, user_prompt, temperature=0.35) + + if not report_content: + logger.error("[OpenClaw] 月報 Gemini 呼叫失敗") + return {"status": "error", "report_type": "monthly_report", "error": "Gemini 呼叫失敗"} + + # ── Step 5:生成圖表 ───────────────────────────────────────────────────── + charts = [] + try: + from services.chart_generator_service import ( + monthly_overview_chart, + category_revenue_chart, + price_history_heatmap, + ) + overview_chart = monthly_overview_chart(6) + if overview_chart: + charts.append(("monthly_overview.png", overview_chart, f"📊 近6個月營收趨勢")) + + cat_chart = category_revenue_chart(days_in_month, period) + if cat_chart: + charts.append(("category_revenue.png", cat_chart, "🏆 品類業績分佈")) + + heatmap = price_history_heatmap(days_in_month) + if heatmap: + charts.append(("price_heatmap.png", heatmap, "🔥 品類價格熱圖")) + except Exception as e: + logger.warning("[OpenClaw] 月報圖表生成失敗(非阻塞): %s", e) + + # ── Step 6:持久化 DB ──────────────────────────────────────────────────── + action_items = _extract_action_items(report_content) + metadata = { + "period": period, + "model": STRATEGY_MODEL, + "monthly_revenue": sales.get("revenue", 0), + "mom_pct": sales.get("mom_pct", 0), + "yoy_pct": sales.get("yoy_pct", 0), + "category_count": len(categories), + "chart_count": len(charts), + "mcp_topics_collected": sum(1 for v in mcp_data.values() if v), + "action_count": len(action_items), + "generated_at": now.isoformat(), + } + insight_id = _save_to_ai_insights( + insight_type="monthly_report", + content=report_content, + confidence=0.90, + metadata=metadata, + period=last_month_end.strftime("%Y-%m"), + ) + _save_action_items(action_items, insight_id) + + # ── Step 7:Telegram 推播(圖文)──────────────────────────────────────── + try: + from services.telegram_templates import ( + monthly_report_header, + send_report_with_charts, + _get_chat_ids, + ) + top3 = [c.get("category", "N/A") for c in categories[:3]] or ["N/A"] + header = monthly_report_header( + month_str=period, + revenue=sales.get("revenue", 0), + mom=sales.get("mom_pct", 0), + yoy=sales.get("yoy_pct", 0), + top3_categories=top3, + ) + full_msg = header + "\n\n" + report_content + if charts: + send_report_with_charts(full_msg, charts, _get_chat_ids()) + else: + from services.telegram_templates import _send_telegram_raw + _send_telegram_raw(full_msg) + except Exception as e: + logger.error("[OpenClaw] 月報 Telegram 推播失敗: %s", e) + + logger.info("[OpenClaw] 月報完成 insight_id=%s charts=%d actions=%d", + insight_id, len(charts), len(action_items)) + return { + "status": "ok", + "report_type": "monthly_report", + "insight_id": insight_id, + "period": period, + "chart_count": len(charts), + "action_count": len(action_items), + } + + def generate_meta_analysis_report() -> str: """ AI 系統效能自我審視(每 6 小時 run_openclaw_meta_analysis_task 呼叫) @@ -660,6 +1026,141 @@ def generate_meta_analysis_report() -> str: # 輔助格式化函式 # ═══════════════════════════════════════════════════════════════════════════════ +def _fetch_yesterday_sales() -> Dict[str, Any]: + """昨日單日業績""" + session = get_session() + try: + row = session.execute(text(""" + SELECT + SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue, + COUNT(DISTINCT "商品ID") AS sku_count, + COUNT(*) AS order_count + FROM daily_sales_snapshot + WHERE snapshot_date::date = CURRENT_DATE - 1 + """)).fetchone() + if row: + return { + "revenue": float(row[0] or 0), + "sku_count": int(row[1] or 0), + "order_count": int(row[2] or 0), + } + return {"revenue": 0, "sku_count": 0, "order_count": 0} + except Exception as e: + logger.warning("[OpenClaw] 昨日業績讀取失敗: %s", e) + return {"revenue": 0, "sku_count": 0, "order_count": 0} + finally: + session.close() + + +def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Dict[str, Any]: + """上月業績彙總,含 MoM / YoY 比較""" + session = get_session() + try: + row = session.execute(text(""" + SELECT + SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue, + COUNT(DISTINCT "商品ID") AS sku_count, + COUNT(*) AS order_count, + AVG(COALESCE("銷售金額"::numeric, 0)) AS avg_order_value + FROM daily_sales_snapshot + WHERE snapshot_date::date BETWEEN :start AND :end + """), {"start": start_date.date(), "end": end_date.date()}).fetchone() + + revenue = float(row[0] or 0) if row else 0 + sku_count = int(row[1] or 0) if row else 0 + avg_order_value = float(row[3] or 0) if row else 0 + + # 上上月(MoM) + prev_start = (start_date - timedelta(days=1)).replace(day=1) + prev_end = start_date - timedelta(days=1) + prev_row = session.execute(text(""" + SELECT SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue + FROM daily_sales_snapshot + WHERE snapshot_date::date BETWEEN :start AND :end + """), {"start": prev_start.date(), "end": prev_end.date()}).fetchone() + prev_revenue = float(prev_row[0] or 0) if prev_row else 0 + mom_pct = ((revenue - prev_revenue) / prev_revenue * 100) if prev_revenue else 0 + + # 去年同月(YoY) + yoy_start = start_date.replace(year=start_date.year - 1) + yoy_end = end_date.replace(year=end_date.year - 1) + yoy_row = session.execute(text(""" + SELECT SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue + FROM daily_sales_snapshot + WHERE snapshot_date::date BETWEEN :start AND :end + """), {"start": yoy_start.date(), "end": yoy_end.date()}).fetchone() + yoy_revenue = float(yoy_row[0] or 0) if yoy_row else 0 + yoy_pct = ((revenue - yoy_revenue) / yoy_revenue * 100) if yoy_revenue else 0 + + return { + "revenue": revenue, + "sku_count": sku_count, + "order_count": int(row[2] or 0) if row else 0, + "avg_order_value": avg_order_value, + "mom_pct": round(mom_pct, 1), + "yoy_pct": round(yoy_pct, 1), + } + except Exception as e: + logger.warning("[OpenClaw] 月度業績讀取失敗: %s", e) + return {"revenue": 0, "sku_count": 0, "order_count": 0, + "avg_order_value": 0, "mom_pct": 0, "yoy_pct": 0} + finally: + session.close() + + +def _fetch_price_trend_summary(days: int = 30) -> Dict[str, Any]: + """近N天價格異動統計""" + session = get_session() + try: + row = session.execute(text(f""" + SELECT + COUNT(*) AS total_changes, + AVG(ABS(pr2.price - pr1.price) / pr1.price * 100) AS avg_change_pct, + SUM(CASE WHEN pr2.price < pr1.price THEN 1 ELSE 0 END) AS price_cuts, + SUM(CASE WHEN pr2.price > pr1.price THEN 1 ELSE 0 END) AS price_raises + FROM price_records pr2 + JOIN price_records pr1 ON pr1.product_id = pr2.product_id + AND pr1.timestamp = ( + SELECT MAX(timestamp) FROM price_records + WHERE product_id = pr2.product_id + AND timestamp < pr2.timestamp - INTERVAL '1 day' + ) + WHERE pr2.timestamp >= NOW() - INTERVAL '{days} days' + AND ABS(pr2.price - pr1.price) / pr1.price > 0.005 + """)).fetchone() + if row and row[0]: + return { + "price_changes": int(row[0]), + "avg_change_pct": round(float(row[1] or 0), 1), + "price_cuts": int(row[2] or 0), + "price_raises": int(row[3] or 0), + } + return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0} + except Exception as e: + logger.warning("[OpenClaw] 價格趨勢統計讀取失敗: %s", e) + return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0} + finally: + session.close() + + +def _extract_action_items_daily(report_text: str) -> List[str]: + """從日報文字中解析「今日3件事」行動項目""" + lines = report_text.split("\n") + items = [] + in_action_section = False + for line in lines: + if "今日" in line and ("3件" in line or "行動" in line or "優先" in line): + in_action_section = True + continue + if in_action_section: + stripped = line.strip() + if stripped.startswith(("•", "-", "1.", "2.", "3.", "①", "②", "③")): + items.append(stripped.lstrip("•-①②③").strip().lstrip("123.").strip()) + elif stripped.startswith("") and items: + break + return items[:5] + + def _format_threats(threats: List[Dict]) -> str: if not threats: return " (無近期競價威脅)" diff --git a/services/telegram_templates.py b/services/telegram_templates.py index e4ca3b4..170c9ec 100644 --- a/services/telegram_templates.py +++ b/services/telegram_templates.py @@ -1,42 +1,62 @@ +""" +services/telegram_templates.py +Telegram 訊息模板系統 v2 + +═══ 訊息分類 ═══════════════════════════════════════════════ + 🚨 告警類 — price_alert_msg / threat_alert_msg / system_alert_msg + 📊 報告類 — daily_report_msg / weekly_report_msg / monthly_report_msg + 💰 決策類 — price_decision / batch_decision_msg + 🤖 系統類 — deploy_msg / heal_msg / meta_analysis_msg + 💡 洞察類 — triaged_alert / insight_summary_msg +═══════════════════════════════════════════════════════════ + +規範: + 1. 所有模板使用 HTML parse_mode + 2. 一律繁體中文,Agent 名稱保留英文(Hermes/NemoTron/OpenClaw/EA) + 3. 每則訊息須含:標題行 / 核心數據 / 建議行動(三段式) + 4. 圖文訊息使用 send_photo_with_caption(附說明文字) +""" + +import io import json import logging -from typing import Any, Dict, Optional - -from database.manager import get_session -from database.trend_models import TelegramUser +import os +from datetime import datetime +from typing import Any, Dict, List, Optional sys_log = logging.getLogger("TelegramTpl") -# ─── 常數 ──────────────────────────────────────────────── - TELEGRAM_BOT_TOKEN_ENV = "TELEGRAM_BOT_TOKEN" TELEGRAM_CHAT_IDS_ENV = "TELEGRAM_CHAT_IDS" -# ─── 工具:取得 Token 與 Chat ID(容錯) ───────────────── + +# ══════════════════════════════════════════════════════════════════════════════ +# 基礎工具 +# ══════════════════════════════════════════════════════════════════════════════ def _get_bot_token() -> Optional[str]: from dotenv import load_dotenv load_dotenv() - import os return os.getenv(TELEGRAM_BOT_TOKEN_ENV) + def _get_chat_ids() -> list: token = _get_bot_token() if not token: sys_log.warning("[TelegramTpl] %s 未設定,跳過 Telegram 通知", TELEGRAM_BOT_TOKEN_ENV) return [] - raw = __import__("os").getenv(TELEGRAM_CHAT_IDS_ENV, "[]") + raw = os.getenv(TELEGRAM_CHAT_IDS_ENV, "[]") try: return json.loads(raw) except json.JSONDecodeError: sys_log.warning("[TelegramTpl] %s 格式錯誤,應為 JSON 陣列", TELEGRAM_CHAT_IDS_ENV) return [] -# ─── 原始發送(內部使用) ───────────────────────────────── def _send_telegram_raw(text: str, chat_ids: Optional[list] = None, reply_markup: Optional[Dict[str, Any]] = None, parse_mode: str = "HTML") -> bool: + """發送純文字訊息""" import requests token = _get_bot_token() if not token: @@ -47,11 +67,7 @@ def _send_telegram_raw(text: str, chat_ids: Optional[list] = None, chat_ids = [-1003940688311] # fallback url = f"https://api.telegram.org/bot{token}/sendMessage" - payload = { - "chat_id": chat_ids[0], - "text": text, - "parse_mode": parse_mode, - } + payload = {"chat_id": chat_ids[0], "text": text, "parse_mode": parse_mode} if reply_markup: payload["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False) try: @@ -64,125 +80,389 @@ def _send_telegram_raw(text: str, chat_ids: Optional[list] = None, sys_log.error("[TelegramTpl] send 失敗: %s", e) return False -# ─── 公用模板 ───────────────────────────────────────────── -def alert(title: str, content: str, actions: Optional[list] = None) -> str: - """高危險警報(紅色)""" - msg = f"🚨 {title}\n\n{content}" - if actions: - msg += "\n\n" + "\n".join(f"• {a}" for a in actions) - return msg +def send_photo(photo_bytes: bytes, caption: str = "", + chat_ids: Optional[list] = None, + parse_mode: str = "HTML") -> bool: + """發送圖片(含說明文字),供圖表報告使用""" + import requests + token = _get_bot_token() + if not token or not photo_bytes: + return False + if chat_ids is None: + chat_ids = _get_chat_ids() + if not chat_ids: + chat_ids = [-1003940688311] -def warning(title: str, summary: str, details: Optional[dict] = None) -> str: - """中風險警告(橙色)""" - msg = f"⚠️ {title}\n\n{summary}" - if details: - msg += "\n\n細節:\n" + "\n".join(f"• {k}: {v}" for k, v in details.items()) - return msg + url = f"https://api.telegram.org/bot{token}/sendPhoto" + try: + r = requests.post( + url, + data={"chat_id": chat_ids[0], "caption": caption[:1024], + "parse_mode": parse_mode}, + files={"photo": ("chart.png", photo_bytes, "image/png")}, + timeout=30, + ) + if not r.ok: + sys_log.warning("[TelegramTpl] sendPhoto HTTP %s: %s", r.status_code, r.text[:200]) + return False + return True + except Exception as e: + sys_log.error("[TelegramTpl] sendPhoto 失敗: %s", e) + return False -def info(title: str, module: str, content: str, time: Optional[Any] = None) -> str: - """普通信息(藍色)""" - t_str = f" · {time}" if time else "" - return f"📊 {title} [{module}]{t_str}\n\n{content}" -def success(title: str, module: str, stats: str = "") -> str: - """成功通知(綠色)""" - return f"✅ {title} [{module}]\n{stats}" +def send_report_with_charts(text_msg: str, charts: List[Optional[bytes]], + chat_ids: Optional[list] = None) -> bool: + """先發文字報告,再逐張發送圖表""" + ok = _send_telegram_raw(text_msg, chat_ids=chat_ids) + for i, chart in enumerate(charts): + if chart: + send_photo(chart, caption=f"圖表 {i+1}/{len(charts)}", chat_ids=chat_ids) + return ok -def price_decision( - product_name: str, - product_sku: str, - current_price: float, - suggested_price: float, - reason: str, - insight_id: Optional[int] = None, -) -> tuple: + +# ══════════════════════════════════════════════════════════════════════════════ +# 🚨 告警類模板 +# ══════════════════════════════════════════════════════════════════════════════ + +def price_alert_msg(threats: List[Dict], analysis_period: str = "近48h") -> str: """ - 降價決策通知(含 Inline Keyboard)。 - 回傳 (message_text, reply_markup) + 競品削價告警(Hermes + NemoTron 偵測結果) + ┌─────────────────────────────────┐ + │ 🚨 競品削價告警 · N 個 SKU │ + │ ─────────────────────────────── │ + │ ⚠️ SKU名稱 MOMO $X → 競品 $Y │ + │ 價差 +X% 業績跌幅 -Y% │ + └─────────────────────────────────┘ """ + n = len(threats) + high = [t for t in threats if t.get("risk") == "HIGH"] + lines = [ + f"🚨 競品削價告警 · 偵測到 {n} 個 SKU [{analysis_period}]", + f"🔴 高危:{len(high)} 個  🟡 中危:{n - len(high)} 個", + "─" * 32, + ] + for t in threats[:8]: + risk_icon = "🔴" if t.get("risk") == "HIGH" else "🟡" + name = str(t.get("name", t.get("sku", "")))[:22] + gap = float(t.get("gap_pct", 0)) + delta = float(t.get("sales_7d_delta_pct", t.get("sales_delta", 0))) + momo_p = t.get("momo_price") + pchome_p = t.get("pchome_price") + price_str = f" MOMO ${momo_p:,.0f} vs 競品 ${pchome_p:,.0f}" \ + if momo_p and pchome_p else "" + lines.append( + f"{risk_icon} {name}\n" + f" 價差 {gap:+.1f}% 業績週跌 {delta:+.1f}%{price_str}" + ) + if n > 8: + lines.append(f"…另有 {n-8} 個 SKU(查看完整報告)") + lines += ["─" * 32, + "💡 建議:優先處理紅色高危品項,確認競品是否促銷或長期低價"] + return "\n".join(lines) + + +def threat_alert_msg(sku: str, name: str, momo_price: float, + comp_price: float, gap_pct: float, + sales_delta: float, action: str, confidence: float) -> str: + """單品威脅告警(NemoTron 派發)""" + risk = "🔴 高危" if gap_pct > 15 else "🟡 中危" if gap_pct > 5 else "🟢 低危" + return ( + f"{risk} 競品威脅通報\n" + f"━━━━━━━━━━━━━━━━━━━━\n" + f"🏷️ {name} {sku}\n\n" + f"💴 MOMO  NT${momo_price:,.0f}\n" + f"💴 競品(PChome)NT${comp_price:,.0f}\n" + f"📉 價差  {gap_pct:+.1f}%(我方較貴)\n" + f"📊 業績週變 {sales_delta:+.1f}%\n\n" + f"🤖 AI 建議:{action}\n" + f"📈 信心度:{confidence:.0%}\n" + f"━━━━━━━━━━━━━━━━━━━━" + ) + + +def system_alert_msg(level: str, title: str, detail: str, source: str = "") -> str: + """系統級告警(AIOps / 部署失敗等)""" + icons = {"critical": "🚨", "error": "❌", "warning": "⚠️", "info": "ℹ️"} + icon = icons.get(level, "⚠️") + src = f" [{source}]" if source else "" + return ( + f"{icon} {title}{src}\n" + f"━━━━━━━━━━━━━━━━━━━━\n" + f"{detail[:600]}\n" + f"━━━━━━━━━━━━━━━━━━━━\n" + f"🕐 {datetime.now().strftime('%m/%d %H:%M')}" + ) + + +# ══════════════════════════════════════════════════════════════════════════════ +# 📊 報告類模板(三種週期) +# ══════════════════════════════════════════════════════════════════════════════ + +def daily_report_header(date_str: str, revenue: float, wow: float, + threat_count: int, opportunity_count: int) -> str: + """ + 日報標題卡(附圖前發送的文字說明) + """ + wow_icon = "📈" if wow >= 0 else "📉" + wow_color = "+" if wow >= 0 else "" + return ( + f"📊 EwoooC 電商日報 · {date_str}\n" + f"══════════════════════════\n" + f"💰 今日業績  NT${revenue/10000:.1f} 萬\n" + f"{wow_icon} 週同比   {wow_color}{wow:.1f}%\n" + f"🔴 競品威脅  {threat_count} 個 SKU\n" + f"🟢 市場機會  {opportunity_count} 個 SKU\n" + f"══════════════════════════\n" + f"🤖 Hermes + NemoTron + OpenClaw 聯合分析" + ) + + +def weekly_report_header(period: str, curr_rev: float, prev_rev: float, + wow: float, top_category: str) -> str: + """週報標題卡""" + wow_icon = "📈" if wow >= 0 else "📉" + return ( + f"📊 EwoooC 電商週報 · {period}\n" + f"══════════════════════════\n" + f"💰 本週業績  NT${curr_rev/10000:.1f} 萬\n" + f"📦 前週業績  NT${prev_rev/10000:.1f} 萬\n" + f"{wow_icon} 週成長率  {wow:+.1f}%\n" + f"🏆 最強品類  {top_category}\n" + f"══════════════════════════\n" + f"🤖 OpenClaw × MCP 全景策略分析" + ) + + +def monthly_report_header(month_str: str, revenue: float, mom: float, + yoy: float, top3_categories: List[str]) -> str: + """月報標題卡""" + mom_icon = "📈" if mom >= 0 else "📉" + yoy_icon = "🚀" if yoy >= 10 else "📈" if yoy >= 0 else "📉" + cats = " / ".join(top3_categories[:3]) + return ( + f"📅 EwoooC 電商月報 · {month_str}\n" + f"══════════════════════════\n" + f"💰 月度業績  NT${revenue/10000:.0f} 萬\n" + f"{mom_icon} 月成長率  {mom:+.1f}% {yoy_icon} 年成長 {yoy:+.1f}%\n" + f"🏆 TOP 3 品類 {cats}\n" + f"══════════════════════════\n" + f"🤖 OpenClaw × Hermes × MCP 月度全景洞察" + ) + + +def report_section(icon: str, title: str, lines: List[str]) -> str: + """通用報告節段(供日/週/月報各節使用)""" + body = "\n".join(f" {l}" for l in lines) + return f"\n{icon} {title}\n{body}" + + +# ══════════════════════════════════════════════════════════════════════════════ +# 💰 決策類模板 +# ══════════════════════════════════════════════════════════════════════════════ + +def price_decision(product_name: str, product_sku: str, + current_price: float, suggested_price: float, + reason: str, insight_id: Optional[int] = None) -> tuple: + """降 / 提價決策通知(含 Inline Keyboard)""" diff = current_price - suggested_price - if diff > 0: - action_text = f"降價 ${diff:,.0f}" - elif diff < 0: - action_text = f"提價 ${-diff:,.0f}" - else: - action_text = "維持" + action_text = f"降價 NT${diff:,.0f}" if diff > 0 else \ + f"提價 NT${-diff:,.0f}" if diff < 0 else "維持現價" + direction = "📉" if diff > 0 else "📈" if diff < 0 else "➡️" message = ( - f"💰 自動降價建議\n" - f"商品:{product_name} (SKU: {product_sku})\n" - f"現價:${current_price:,.0f} → 建議:${suggested_price:,.0f}\n" - f"原因:{reason}\n" + f"💰 AI 定價決策建議\n" + f"━━━━━━━━━━━━━━━━━━━━\n" + f"🏷️ {product_name} {product_sku}\n\n" + f"現價:NT${current_price:,.0f}\n" + f"建議:NT${suggested_price:,.0f} {direction} {action_text}\n\n" + f"💡 依據:{reason}\n" ) if insight_id: - message += f"洞察 ID:{insight_id}\n" + message += f"🔗 洞察 ID:{insight_id}\n" + message += f"━━━━━━━━━━━━━━━━━━━━" - keyboard = { - "inline_keyboard": [ - [ - {"text": "✅ 確認執行", "callback_data": f"price_decision:approve:{product_sku}"}, - {"text": "❌ 拒絕", "callback_data": f"price_decision:reject:{product_sku}"}, - ], - [ - {"text": "📊 查看洞察", "url": f"https://your-dashboard.example/insight/{insight_id}" if insight_id else "#"}, - ], - ] - } + keyboard = {"inline_keyboard": [[ + {"text": "✅ 確認執行", "callback_data": f"momo:price_decision:approve:{product_sku}"}, + {"text": "❌ 拒絕", "callback_data": f"momo:price_decision:reject:{product_sku}"}, + ]]} return message, keyboard -def triaged_alert( - base_event: Dict[str, Any], - tier_label: str, - ai_summary: str, - ai_cause: Optional[str] = None, - ai_actions: Optional[list] = None, - ai_executed: Optional[list] = None, -) -> str: - """ - L1/L2 整合通知(帶 AI 摘要與可執行動作)。 - """ - msg = ( - f"⚡ {tier_label} · {base_event.get('event_type', 'alert')}\n" - f"📌 {base_event.get('title')}\n\n" + +def batch_decision_msg(items: List[Dict], batch_id: str) -> tuple: + """批次定價決策(多 SKU 一次確認)""" + lines = [f"💰 批次定價決策 · 共 {len(items)} 個 SKU\n━━━━━━━━━━━━━━━━━━━━"] + for i, item in enumerate(items[:6], 1): + diff = item.get("current_price", 0) - item.get("suggested_price", 0) + direction = "📉" if diff > 0 else "📈" + lines.append( + f"{i}. {direction} {item.get('name','')[:20]}\n" + f" ${item.get('current_price',0):,.0f} → ${item.get('suggested_price',0):,.0f}" + ) + if len(items) > 6: + lines.append(f"…另有 {len(items)-6} 個 SKU") + lines.append("━━━━━━━━━━━━━━━━━━━━") + keyboard = {"inline_keyboard": [[ + {"text": f"✅ 全部確認({len(items)}項)", + "callback_data": f"momo:batch_decision:approve:{batch_id}"}, + {"text": "❌ 取消", + "callback_data": f"momo:batch_decision:reject:{batch_id}"}, + ]]} + return "\n".join(lines), keyboard + + +# ══════════════════════════════════════════════════════════════════════════════ +# 🤖 系統類模板 +# ══════════════════════════════════════════════════════════════════════════════ + +def deploy_msg(status: str, branch: str, commit: str, duration: str = "") -> str: + """CI/CD 部署通知""" + icons = {"success": "✅", "failed": "❌", "started": "🚀"} + icon = icons.get(status, "ℹ️") + dur = f" 耗時 {duration}" if duration else "" + return ( + f"{icon} 部署{status.upper()}{dur}\n" + f"━━━━━━━━━━━━━━━━━━━━\n" + f"🌿 分支:{branch}\n" + f"🔖 Commit:{commit[:12]}\n" + f"🕐 {datetime.now().strftime('%m/%d %H:%M')}" ) + + +def heal_msg(status: str, error_type: str, target_file: str, + commit_sha: str = "", diff_lines: int = 0) -> str: + """AiderHeal 自動修復通知""" + if status == "started": + icon, title = "🔧", "AiderHeal 啟動" + elif status == "success": + icon, title = "✅", "AiderHeal 修復完成" + elif status == "reverted": + icon, title = "🔄", "AiderHeal 自動回滾" + else: + icon, title = "❌", "AiderHeal 失敗" + + lines = [f"{icon} {title}", "━━━━━━━━━━━━━━━━━━━━", + f"🐛 錯誤類型:{error_type}", + f"📄 目標檔案:{target_file}"] + if commit_sha: + lines.append(f"🔖 Commit:{commit_sha[:12]}") + if diff_lines: + lines.append(f"📝 修改行數:{diff_lines} 行") + lines += ["━━━━━━━━━━━━━━━━━━━━", + f"🕐 {datetime.now().strftime('%m/%d %H:%M')}"] + return "\n".join(lines) + + +def meta_analysis_msg(period: str, content: str) -> str: + """AI 系統自我審視報告""" + return ( + f"🤖 AI 系統效能自我審視 · {period}\n" + f"══════════════════════════\n" + f"{content[:1200]}\n" + f"══════════════════════════\n" + f"由 OpenClaw 定期分析 · {datetime.now().strftime('%m/%d %H:%M')}" + ) + + +# ══════════════════════════════════════════════════════════════════════════════ +# 💡 洞察類模板 +# ══════════════════════════════════════════════════════════════════════════════ + +def triaged_alert(base_event: Dict[str, Any], tier_label: str, + ai_summary: str, ai_cause: Optional[str] = None, + ai_actions: Optional[list] = None, + ai_executed: Optional[list] = None) -> tuple: + """EA L1/L2 自主執行通知(保留原有介面,升級排版)""" + event_type = base_event.get("event_type", "alert") + title = base_event.get("title", "") summary = base_event.get("summary", "") + event_id = base_event.get("id", "unknown") + + lines = [ + f"⚡ {tier_label} · {event_type}", + f"📌 {title}", + "", + ] if summary: - msg += f"🔍 概要:{summary}\n\n" + lines += [f"🔍 概要:{summary}", ""] if ai_summary: - msg += f"🧠 AI 摘要:{ai_summary}\n\n" + lines += [f"🧠 AI 摘要:{ai_summary[:400]}", ""] if ai_cause: - msg += f"💡 可能原因:{ai_cause}\n\n" + lines += [f"💡 可能原因:{ai_cause}", ""] if ai_actions: - msg += "📋 建議行動:\n" + "\n".join(f"• {a}" for a in ai_actions) + "\n\n" + lines += ["📋 建議行動:"] + [f" • {a}" for a in ai_actions] + [""] if ai_executed: - msg += "✅ 已執行:\n" + "\n".join(f"• {a}" for a in ai_executed) + "\n\n" + lines += ["✅ 已執行:"] + [f" • {a}" for a in ai_executed] + [""] trace = base_event.get("trace") if trace: - msg += f"
{trace[-500:]}
" + lines.append(f"
{trace[-400:]}
") - # W2-D: momo: prefix 強制(共用 Bot 鐵律,ADR-011) - event_id = base_event.get("id", "unknown") - keyboard = { - "inline_keyboard": [ - [{"text": "📊 查看详情", "url": f"https://dashboard.example/event/{event_id}"}], - [{"text": "🛑 忽略此事件", "callback_data": f"momo:event_ignore:{event_id}"}], - ] + keyboard = {"inline_keyboard": [ + [{"text": "🛑 忽略此事件", + "callback_data": f"momo:event_ignore:{event_id}"}], + ]} + return "\n".join(lines), keyboard + + +def insight_summary_msg(insights: List[Dict], period: str = "近24h") -> str: + """AI 洞察摘要彙整(供定期推播)""" + n = len(insights) + lines = [ + f"💡 AI 洞察摘要 · {period} 共 {n} 筆", + "━━━━━━━━━━━━━━━━━━━━", + ] + type_icons = { + "price_alert": "🔴", "recommendation": "💰", "weekly_strategy": "📊", + "meta_analysis": "🤖", "market_opportunity": "🟢", "mcp_cache": "🌐", } - return msg, keyboard + for ins in insights[:6]: + icon = type_icons.get(ins.get("insight_type", ""), "💡") + content_preview = str(ins.get("content", ""))[:80].replace("\n", " ") + conf = float(ins.get("confidence", 0)) + lines.append(f"{icon} {content_preview}… ({conf:.0%})") + if n > 6: + lines.append(f"…另有 {n-6} 筆洞察") + lines.append("━━━━━━━━━━━━━━━━━━━━") + return "\n".join(lines) + + +# ══════════════════════════════════════════════════════════════════════════════ +# 舊版相容介面(保留供現有程式碼調用) +# ══════════════════════════════════════════════════════════════════════════════ + +def alert(title: str, content: str, actions: Optional[list] = None) -> str: + msg = system_alert_msg("error", title, content) + if actions: + msg += "\n" + "\n".join(f" • {a}" for a in actions) + return msg + + +def warning(title: str, summary: str, details: Optional[dict] = None) -> str: + detail_str = "\n".join(f" {k}: {v}" for k, v in (details or {}).items()) + return system_alert_msg("warning", title, summary + ("\n" + detail_str if detail_str else "")) + + +def info(title: str, module: str, content: str, time: Optional[Any] = None) -> str: + t_str = f" · {time}" if time else "" + return f"ℹ️ {title} [{module}]{t_str}\n\n{content}" + + +def success(title: str, module: str, stats: str = "") -> str: + return f"✅ {title} [{module}]\n{stats}" + def report(title: str, report_type: str, period: str, content_md: str) -> str: - """策略/週報模板""" + icons = {"weekly_strategy": "📊", "daily": "📅", "monthly": "📆", "meta_analysis": "🤖"} + icon = icons.get(report_type, "📋") return ( - f"📊 {title} ({report_type})\n" - f"期間:{period}\n\n" + f"{icon} {title} ({report_type})\n" + f"📅 期間:{period}\n" + f"══════════════════════════\n" f"{content_md}" ) -def success(title: str, module: str, stats: str = "") -> str: - """成功通知(綠色)""" - return f"✅ {title} [{module}]\n{stats}" def _send_telegram(msg: str, chat_ids: Optional[list] = None, reply_markup: Optional[Dict[str, Any]] = None) -> bool: