feat(reports): 新增日報/月報系統,整合圖表推播至 Telegram
All checks were successful
CD Pipeline / deploy (push) Successful in 4m51s

- services/openclaw_strategist_service.py:新增 generate_daily_report()(每日09:00業績快報+競品威脅+2圖表)和 generate_monthly_report()(每月1日07:00月度全景洞察+3圖表+MoM/YoY比較)
- services/chart_generator_service.py:新建圖表生成服務(6種深色商業圖表,revenue_trend / category_revenue / monthly_overview / price_gap / price_history_heatmap / price_trend)
- services/telegram_templates.py:重建訊息模板系統(5類模板:告警/報告/決策/系統/洞察)、新增 send_photo + send_report_with_charts 圖文推播
- scheduler.py:新增 run_daily_report_task / run_monthly_report_task(含 auto_heal 保護)
- run_scheduler.py:每日09:00日報 + 每月1日07:00月報排程(月報用每日gate判斷day==1)
- requirements.txt:新增 matplotlib + matplotlib-inline

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
ogt
2026-04-21 15:17:48 +08:00
parent 784a3135c1
commit 38200a5e93
6 changed files with 1439 additions and 103 deletions

View File

@@ -23,4 +23,6 @@ lxml
prometheus-client
python-telegram-bot[job-queue]
paramiko # ADR-013: AIOps SSH 跳板修復
python-pptx # ADR-014: PPT 簡報系統
python-pptx # ADR-014: PPT 簡報系統
matplotlib # 圖表生成(日報/週報/月報)
matplotlib-inline # Jupyter 相容層(可選)

View File

@@ -8,8 +8,9 @@ run_scheduler.py — momo-scheduler 容器入口點
每 4 小時competitor_price_feeder、icaim_analysis
每 6 小時openclaw_meta_analysis、quality_rescore
每 12 小時dedup_batch
每 1 天 db_backup、backup_monitor
每 1 天 db_backup03:00、backup_monitor04:00、daily_report09:00
每 1 週 weekly_strategy週一 06:00
每 1 月 monthly_report每月1日 07:00
"""
import asyncio
import logging
@@ -33,6 +34,8 @@ from scheduler import (
run_openclaw_meta_analysis_task,
run_dedup_batch_task,
run_quality_rescore_task,
run_daily_report_task,
run_monthly_report_task,
)
logging.basicConfig(
@@ -82,6 +85,18 @@ def _register_schedules():
schedule.every().monday.at("06:00").do(run_weekly_strategy_task)
logger.info("📅 每週一 06:00weekly_strategy")
schedule.every().day.at("09:00").do(run_daily_report_task)
logger.info("📅 每日 09:00daily_report")
# 每月1日 07:00 月報schedule 不支援 every().month用每日 07:00 + 日期判斷)
def _monthly_report_gate():
from datetime import datetime as _dt
if _dt.now().day == 1:
run_monthly_report_task()
schedule.every().day.at("07:00").do(_monthly_report_gate)
logger.info("📅 每月1日 07:00monthly_report")
def _run_elephant_alpha_engine():
"""Daemon thread: ElephantAlpha 自主監控引擎(獨立 asyncio loop"""

View File

@@ -1949,6 +1949,56 @@ def run_quality_rescore_task():
_save_stats('quality_rescore', {"status": "Error", "error": str(e)})
def run_daily_report_task():
"""每日 09:00 — OpenClaw 電商日報(業績快報 + 競品威脅 + 圖表推播)"""
try:
from services.openclaw_strategist_service import generate_daily_report
result = generate_daily_report()
logging.info(
f"[Scheduler] [DailyReport] ✅ 完成 | period={result.get('period')} "
f"charts={result.get('chart_count', 0)} actions={result.get('action_count', 0)}"
)
_save_stats('daily_report', result)
except Exception as e:
import traceback as _tb
logging.error(f"[Scheduler] [DailyReport] 🚨 日報任務異常: {e}")
_save_stats('daily_report', {"status": "Error", "error": str(e)})
try:
from services.auto_heal_service import auto_heal_service
auto_heal_service.handle_exception(
task_name="run_daily_report_task",
exception=e,
traceback_str=_tb.format_exc(),
)
except Exception as _heal_e:
logging.error(f"[Scheduler] [DailyReport] auto_heal_service 失敗: {_heal_e}")
def run_monthly_report_task():
"""每月1日 07:00 — OpenClaw 電商月報(月度全景洞察 + 多圖表推播)"""
try:
from services.openclaw_strategist_service import generate_monthly_report
result = generate_monthly_report()
logging.info(
f"[Scheduler] [MonthlyReport] ✅ 完成 | period={result.get('period')} "
f"charts={result.get('chart_count', 0)} actions={result.get('action_count', 0)}"
)
_save_stats('monthly_report', result)
except Exception as e:
import traceback as _tb
logging.error(f"[Scheduler] [MonthlyReport] 🚨 月報任務異常: {e}")
_save_stats('monthly_report', {"status": "Error", "error": str(e)})
try:
from services.auto_heal_service import auto_heal_service
auto_heal_service.handle_exception(
task_name="run_monthly_report_task",
exception=e,
traceback_str=_tb.format_exc(),
)
except Exception as _heal_e:
logging.error(f"[Scheduler] [MonthlyReport] auto_heal_service 失敗: {_heal_e}")
if __name__ == "__main__":
# 此檔案現在由 app.py 導入並由其主執行緒管理排程。
# 若需獨立測試,可在此處臨時加入調用程式碼。

View File

@@ -0,0 +1,488 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/chart_generator_service.py
專業圖表生成服務
供日報 / 週報 / 月報 + Telegram 圖文通知使用。
所有圖表回傳 BytesIO可直接傳 Telegram sendPhoto
圖表清單:
price_trend_chart(sku, days) — 單 SKU 30天 MOMO vs 競品價格趨勢
category_revenue_chart(days) — 品類業績橫條圖
revenue_trend_chart(days) — 每日業績折線圖(含 MoM 基準線)
price_gap_distribution_chart(threats) — 競價威脅價差分佈
monthly_overview_chart(months) — 月份業績對比柱狀圖
top_sku_heatmap(days) — TOP SKU 業績熱力圖
"""
import io
import logging
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
logger = logging.getLogger(__name__)
# ── Matplotlib 無頭模式初始化 ────────────────────────────────────────────────
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import numpy as np
_MPL_OK = True
except ImportError:
_MPL_OK = False
logger.warning("[Chart] matplotlib 未安裝,圖表功能停用")
# ── 視覺設定(深色商業風格) ──────────────────────────────────────────────────
_BG = "#1a1a2e"
_PANEL = "#16213e"
_ACCENT1 = "#e94560" # 紅MOMO / 警示
_ACCENT2 = "#0f3460" # 深藍:競品
_ACCENT3 = "#533483" # 紫:歷史基準
_GREEN = "#00b4d8" # 青綠:業績正成長
_YELLOW = "#ffd166" # 黃:中性 / 警示
_TEXT = "#e0e0e0"
_SUBTEXT = "#888888"
_GRID = "#2a2a4a"
plt_params = {
"figure.facecolor": _BG,
"axes.facecolor": _PANEL,
"axes.edgecolor": _GRID,
"axes.labelcolor": _TEXT,
"xtick.color": _SUBTEXT,
"ytick.color": _SUBTEXT,
"text.color": _TEXT,
"grid.color": _GRID,
"grid.linestyle": "--",
"grid.alpha": 0.5,
"font.family": ["DejaVu Sans", "sans-serif"],
"font.size": 10,
}
def _apply_style():
if _MPL_OK:
plt.rcParams.update(plt_params)
def _fig_to_bytes(fig) -> Optional[bytes]:
buf = io.BytesIO()
fig.savefig(buf, format="png", dpi=150, bbox_inches="tight",
facecolor=fig.get_facecolor())
plt.close(fig)
buf.seek(0)
return buf.read()
def _unavailable() -> None:
return None
# ═══════════════════════════════════════════════════════════════════════════════
# 資料讀取層(與 DB 解耦,各函式自行呼叫)
# ═══════════════════════════════════════════════════════════════════════════════
def _fetch_price_history(sku: str, days: int = 30) -> Dict[str, Any]:
"""讀取單 SKU 的 MOMO + 競品歷史價格"""
try:
from database.manager import get_session
from sqlalchemy import text
session = get_session()
try:
momo_rows = session.execute(text("""
SELECT pr.timestamp::date AS dt, AVG(pr.price) AS price
FROM price_records pr
JOIN products p ON p.id = pr.product_id
WHERE p.i_code = :sku
AND pr.timestamp >= NOW() - INTERVAL ':days days'
GROUP BY dt ORDER BY dt
""".replace(":days", str(days))), {"sku": sku}).fetchall()
comp_rows = session.execute(text("""
SELECT crawled_at::date AS dt, AVG(price) AS price
FROM competitor_prices
WHERE sku = :sku
AND source = 'pchome'
AND crawled_at >= NOW() - INTERVAL ':days days'
GROUP BY dt ORDER BY dt
""".replace(":days", str(days))), {"sku": sku}).fetchall()
return {
"momo": [(str(r[0]), float(r[1])) for r in momo_rows],
"pchome":[(str(r[0]), float(r[1])) for r in comp_rows],
}
finally:
session.close()
except Exception as e:
logger.warning("[Chart] price_history 讀取失敗: %s", e)
return {"momo": [], "pchome": []}
def _fetch_daily_revenue(days: int = 30) -> List[Dict]:
try:
from database.manager import get_session
from sqlalchemy import text
session = get_session()
try:
rows = session.execute(text(f"""
SELECT snapshot_date::date AS dt,
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
FROM daily_sales_snapshot
WHERE snapshot_date::date >= CURRENT_DATE - {days}
GROUP BY dt ORDER BY dt
""")).fetchall()
return [{"date": str(r[0]), "revenue": float(r[1] or 0)} for r in rows]
finally:
session.close()
except Exception as e:
logger.warning("[Chart] daily_revenue 讀取失敗: %s", e)
return []
def _fetch_category_revenue(days: int = 7) -> List[Dict]:
try:
from database.manager import get_session
from sqlalchemy import text
session = get_session()
try:
rows = session.execute(text(f"""
SELECT p.category,
SUM(COALESCE(s."銷售金額"::numeric, 0)) AS revenue
FROM daily_sales_snapshot s
JOIN products p ON p.name = s."商品名稱"
WHERE s.snapshot_date::date >= CURRENT_DATE - {days}
AND p.status = 'ACTIVE'
AND p.category IS NOT NULL
GROUP BY p.category
ORDER BY revenue DESC LIMIT 12
""")).fetchall()
return [{"category": r[0] or "未分類", "revenue": float(r[1] or 0)} for r in rows]
finally:
session.close()
except Exception as e:
logger.warning("[Chart] category_revenue 讀取失敗: %s", e)
return []
def _fetch_monthly_revenue(months: int = 6) -> List[Dict]:
try:
from database.manager import get_session
from sqlalchemy import text
session = get_session()
try:
rows = session.execute(text(f"""
SELECT DATE_TRUNC('month', snapshot_date)::date AS mon,
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
FROM daily_sales_snapshot
WHERE snapshot_date >= NOW() - INTERVAL '{months} months'
GROUP BY mon ORDER BY mon
""")).fetchall()
return [{"month": str(r[0])[:7], "revenue": float(r[1] or 0)} for r in rows]
finally:
session.close()
except Exception as e:
logger.warning("[Chart] monthly_revenue 讀取失敗: %s", e)
return []
# ═══════════════════════════════════════════════════════════════════════════════
# 圖表函式
# ═══════════════════════════════════════════════════════════════════════════════
def price_trend_chart(sku: str, sku_name: str = "", days: int = 30) -> Optional[bytes]:
"""單 SKUMOMO vs PChome 價格趨勢折線圖"""
if not _MPL_OK:
return _unavailable()
data = _fetch_price_history(sku, days)
if not data["momo"]:
return None
_apply_style()
fig, ax = plt.subplots(figsize=(10, 4))
fig.patch.set_facecolor(_BG)
if data["momo"]:
dates = [r[0] for r in data["momo"]]
prices = [r[1] for r in data["momo"]]
ax.plot(dates, prices, color=_ACCENT1, linewidth=2.5, marker="o",
markersize=4, label="MOMO 自家", zorder=3)
ax.fill_between(dates, prices, alpha=0.15, color=_ACCENT1)
if data["pchome"]:
dates2 = [r[0] for r in data["pchome"]]
prices2 = [r[1] for r in data["pchome"]]
ax.plot(dates2, prices2, color=_GREEN, linewidth=2, linestyle="--",
marker="s", markersize=3, label="PChome", zorder=2)
title = f"{' ' + sku_name if sku_name else sku} · {days}日價格趨勢"
ax.set_title(title, fontsize=12, fontweight="bold", color=_TEXT, pad=10)
ax.set_ylabel("價格 (NT$)", color=_TEXT)
ax.legend(loc="upper left", framealpha=0.3, facecolor=_PANEL, edgecolor=_GRID)
ax.grid(True, alpha=0.4)
ax.tick_params(axis="x", rotation=30, labelsize=8)
ax.yaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(
lambda x, _: f"${x:,.0f}"))
# 標註最新價差
if data["momo"] and data["pchome"]:
m_last = data["momo"][-1][1]
p_last = data["pchome"][-1][1]
gap = (m_last - p_last) / m_last * 100
color = _ACCENT1 if gap > 0 else _GREEN
ax.annotate(f"現價差 {gap:+.1f}%", xy=(0.98, 0.05),
xycoords="axes fraction", ha="right",
fontsize=10, fontweight="bold", color=color,
bbox=dict(boxstyle="round,pad=0.3", facecolor=_PANEL, alpha=0.8))
_add_watermark(ax)
fig.tight_layout()
return _fig_to_bytes(fig)
def revenue_trend_chart(days: int = 30, title_suffix: str = "") -> Optional[bytes]:
"""每日業績折線圖(含 MoM 同期基準線)"""
if not _MPL_OK:
return _unavailable()
data = _fetch_daily_revenue(days * 2) # 取雙倍天數供 MoM 對比
if not data:
return None
curr = data[-days:] if len(data) >= days else data
prev = data[:-days] if len(data) >= days * 2 else []
_apply_style()
fig, ax = plt.subplots(figsize=(12, 4))
if curr:
labels = [d["date"] for d in curr]
vals = [d["revenue"] for d in curr]
ax.plot(labels, vals, color=_GREEN, linewidth=2.5, marker="o",
markersize=4, label="本期", zorder=3)
ax.fill_between(labels, vals, alpha=0.2, color=_GREEN)
# 7天移動平均
if len(vals) >= 7:
ma7 = np.convolve(vals, np.ones(7)/7, mode="valid")
ax.plot(labels[6:], ma7, color=_YELLOW, linewidth=1.5,
linestyle=":", label="7MA", zorder=2, alpha=0.8)
if prev:
labels2 = [d["date"] for d in prev[-len(curr):]]
vals2 = [d["revenue"] for d in prev[-len(curr):]]
ax.plot(labels2, vals2, color=_SUBTEXT, linewidth=1.5, linestyle="--",
label="前期同比", zorder=1, alpha=0.6)
title = f"業績趨勢 · 近 {days}{title_suffix}"
ax.set_title(title, fontsize=12, fontweight="bold", color=_TEXT, pad=10)
ax.set_ylabel("日業績 (NT$)", color=_TEXT)
ax.yaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(
lambda x, _: f"${x/10000:.1f}"))
ax.legend(loc="upper left", framealpha=0.3, facecolor=_PANEL, edgecolor=_GRID)
ax.grid(True, alpha=0.4)
ax.tick_params(axis="x", rotation=30, labelsize=7)
# 標注最高/最低點
if curr:
vals = [d["revenue"] for d in curr]
max_i, min_i = vals.index(max(vals)), vals.index(min(vals))
ax.annotate(f"↑ ${vals[max_i]/10000:.1f}",
xy=(curr[max_i]["date"], vals[max_i]),
xytext=(0, 8), textcoords="offset points",
ha="center", fontsize=8, color=_GREEN)
ax.annotate(f"↓ ${vals[min_i]/10000:.1f}",
xy=(curr[min_i]["date"], vals[min_i]),
xytext=(0, -14), textcoords="offset points",
ha="center", fontsize=8, color=_ACCENT1)
_add_watermark(ax)
fig.tight_layout()
return _fig_to_bytes(fig)
def category_revenue_chart(days: int = 7, title_suffix: str = "") -> Optional[bytes]:
"""品類業績橫條圖"""
if not _MPL_OK:
return _unavailable()
data = _fetch_category_revenue(days)
if not data:
return None
_apply_style()
n = min(len(data), 10)
cats = [d["category"][:10] for d in data[:n]][::-1]
revs = [d["revenue"] for d in data[:n]][::-1]
max_r = max(revs) if revs else 1
fig, ax = plt.subplots(figsize=(10, max(4, n * 0.55)))
colors = [_GREEN if r == max(revs) else (_ACCENT1 if r < max_r * 0.2 else _ACCENT2)
for r in revs]
bars = ax.barh(cats, revs, color=colors, alpha=0.85, edgecolor=_GRID, linewidth=0.5)
for bar, val in zip(bars, revs):
ax.text(bar.get_width() + max_r * 0.01, bar.get_y() + bar.get_height() / 2,
f"${val/10000:.1f}", va="center", fontsize=9, color=_TEXT)
ax.set_title(f"品類業績排行 · 近 {days}{title_suffix}",
fontsize=12, fontweight="bold", color=_TEXT, pad=10)
ax.set_xlabel("業績 (NT$)", color=_TEXT)
ax.xaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(
lambda x, _: f"${x/10000:.0f}"))
ax.grid(True, axis="x", alpha=0.4)
ax.set_xlim(0, max_r * 1.18)
_add_watermark(ax)
fig.tight_layout()
return _fig_to_bytes(fig)
def monthly_overview_chart(months: int = 6) -> Optional[bytes]:
"""月份業績對比柱狀圖(含 MoM 成長率折線)"""
if not _MPL_OK:
return _unavailable()
data = _fetch_monthly_revenue(months)
if not data:
return None
_apply_style()
labels = [d["month"] for d in data]
vals = [d["revenue"] for d in data]
mom = [0] + [(vals[i] - vals[i-1]) / vals[i-1] * 100
if vals[i-1] else 0 for i in range(1, len(vals))]
fig, ax1 = plt.subplots(figsize=(10, 4.5))
ax2 = ax1.twinx()
bar_colors = [_GREEN if m >= 0 else _ACCENT1 for m in mom]
bars = ax1.bar(labels, vals, color=bar_colors, alpha=0.7,
edgecolor=_GRID, linewidth=0.5, width=0.6)
for bar, val in zip(bars, vals):
ax1.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + max(vals) * 0.01,
f"${val/10000:.0f}", ha="center", fontsize=8, color=_TEXT)
ax2.plot(labels, mom, color=_YELLOW, linewidth=2.5, marker="D",
markersize=6, label="MoM 成長率", zorder=5)
ax2.axhline(0, color=_SUBTEXT, linewidth=0.8, linestyle="--")
for x, y in zip(labels, mom):
ax2.annotate(f"{y:+.1f}%", (x, y), textcoords="offset points",
xytext=(0, 8), ha="center", fontsize=8,
color=_GREEN if y >= 0 else _ACCENT1)
ax1.set_title(f"月度業績概覽 · 近 {months} 個月",
fontsize=12, fontweight="bold", color=_TEXT, pad=10)
ax1.set_ylabel("月業績 (NT$)", color=_TEXT)
ax2.set_ylabel("MoM 成長率 (%)", color=_YELLOW)
ax1.yaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(
lambda x, _: f"${x/10000:.0f}"))
ax2.tick_params(axis="y", colors=_YELLOW)
ax1.grid(True, axis="y", alpha=0.3)
ax1.tick_params(axis="x", rotation=20)
_add_watermark(ax1)
fig.tight_layout()
return _fig_to_bytes(fig)
def price_gap_bar_chart(threats: List[Dict], title: str = "競價威脅 TOP 10") -> Optional[bytes]:
"""競價威脅價差橫條圖"""
if not _MPL_OK or not threats:
return _unavailable()
_apply_style()
n = min(len(threats), 10)
names = [str(t.get("name", t.get("sku", "")))[:18] for t in threats[:n]][::-1]
gaps = [float(t.get("gap_pct", 0)) for t in threats[:n]][::-1]
fig, ax = plt.subplots(figsize=(10, max(4, n * 0.55)))
colors = [_ACCENT1 if g > 15 else _YELLOW if g > 5 else _GREEN for g in gaps]
bars = ax.barh(names, gaps, color=colors, alpha=0.85, edgecolor=_GRID, linewidth=0.5)
for bar, val in zip(bars, gaps):
ax.text(bar.get_width() + 0.2, bar.get_y() + bar.get_height() / 2,
f"{val:+.1f}%", va="center", fontsize=9, color=_TEXT)
ax.axvline(0, color=_SUBTEXT, linewidth=1)
ax.axvline(5, color=_YELLOW, linewidth=0.8, linestyle=":", alpha=0.6)
ax.axvline(15, color=_ACCENT1, linewidth=0.8, linestyle=":", alpha=0.6)
ax.set_title(title, fontsize=12, fontweight="bold", color=_TEXT, pad=10)
ax.set_xlabel("MOMO 相對競品價差 (%) ↑我貴 / ↓我便宜", color=_TEXT)
ax.grid(True, axis="x", alpha=0.4)
_add_watermark(ax)
fig.tight_layout()
return _fig_to_bytes(fig)
def price_history_heatmap(days: int = 30) -> Optional[bytes]:
"""
品類 × 日期 價差熱力圖(月報用)
顯示哪個品類在哪幾天的競品威脅最嚴重
"""
if not _MPL_OK:
return _unavailable()
try:
from database.manager import get_session
from sqlalchemy import text
session = get_session()
try:
rows = session.execute(text(f"""
SELECT p.category,
cp.crawled_at::date AS dt,
AVG((cp.price - pr.price) / NULLIF(pr.price, 0) * 100) AS gap_pct
FROM competitor_prices cp
JOIN products p ON p.i_code = cp.sku
JOIN (
SELECT DISTINCT ON (product_id) product_id, price
FROM price_records ORDER BY product_id, timestamp DESC
) pr ON pr.product_id = p.id
WHERE cp.crawled_at >= NOW() - INTERVAL '{days} days'
AND p.category IS NOT NULL
GROUP BY p.category, dt
ORDER BY p.category, dt
""")).fetchall()
finally:
session.close()
if not rows:
return None
import pandas as pd
df = pd.DataFrame(rows, columns=["category", "date", "gap_pct"])
pivot = df.pivot(index="category", columns="date", values="gap_pct").fillna(0)
_apply_style()
fig, ax = plt.subplots(figsize=(min(16, max(8, len(pivot.columns) * 0.5)),
max(4, len(pivot) * 0.6)))
im = ax.imshow(pivot.values, cmap="RdYlGn_r", aspect="auto",
vmin=-20, vmax=20)
plt.colorbar(im, ax=ax, label="價差 % (+我貴/-我便宜)", shrink=0.6)
ax.set_xticks(range(len(pivot.columns)))
ax.set_xticklabels([str(c)[-5:] for c in pivot.columns],
rotation=45, ha="right", fontsize=7)
ax.set_yticks(range(len(pivot.index)))
ax.set_yticklabels(pivot.index, fontsize=9)
ax.set_title(f"品類競品價差熱力圖 · 近 {days}",
fontsize=12, fontweight="bold", color=_TEXT, pad=10)
_add_watermark(ax)
fig.tight_layout()
return _fig_to_bytes(fig)
except Exception as e:
logger.warning("[Chart] heatmap 生成失敗: %s", e)
return None
def _add_watermark(ax):
ax.text(0.99, 0.01, "EwoooC AI · momo-pro-system",
transform=ax.transAxes, ha="right", va="bottom",
fontsize=7, color=_SUBTEXT, alpha=0.5)
# ── 模組單例(供外部直接 import ────────────────────────────────────────────
import matplotlib.ticker

View File

@@ -41,7 +41,9 @@ TAIPEI_TZ_OFFSET = 8 # UTC+8
__all__ = [
"SSHJumpExecutor",
"generate_daily_report",
"generate_weekly_strategy_report",
"generate_monthly_report",
"generate_meta_analysis_report",
]
@@ -518,6 +520,370 @@ TOP 威脅品項近48h Hermes 偵測):
}
def generate_daily_report() -> dict:
"""
OpenClaw 電商日報(每日 09:00
流程:
1. 讀取昨日業績快照 + TOP 競品威脅 + 定價建議
2. Gemini 快速日報分析(溫度 0.3,精簡版)
3. 生成圖表近7日營收趨勢 + 競品價差柱圖
4. 持久化 ai_insightstype='daily_report'
5. Telegram 圖文推播
"""
now = datetime.now()
yesterday = now - timedelta(days=1)
period = yesterday.strftime("%Y年%m月%d")
logger.info("[OpenClaw] 日報任務啟動 period=%s", period)
# ── Step 1DB 數據收集 ──────────────────────────────────────────────────
sales = _fetch_sales_summary(7)
threats = _fetch_top_threats(5)
recommendations = _fetch_top_recommendations(5)
competitor_summary = _fetch_competitor_summary()
# 昨日單日業績
yesterday_sales = _fetch_yesterday_sales()
# ── Step 2組建 Gemini Prompt ───────────────────────────────────────────
system_prompt = """你是 OpenClaw 日報分析師,負責每日電商業績快報。
語言:繁體中文(台灣用語)。風格:精簡、數字導向、可執行。
每個洞察必須有數字支撐,禁止空泛描述。"""
user_prompt = f"""請根據以下數據,產出今日電商日報({period}
【昨日業績】
銷售金額NT${yesterday_sales.get('revenue', 0):,.0f}
成交SKU數{yesterday_sales.get('sku_count', 0)}
訂單數:{yesterday_sales.get('order_count', 0)}
【近7日趨勢】
本週累計NT${sales.get('current_7d_revenue', 0):,.0f}
前週同期NT${sales.get('prev_7d_revenue', 0):,.0f}
WoW變化{sales.get('wow_pct', 0):+.1f}%
【競品警示近24h Hermes偵測
{_format_threats(threats)}
【待處理定價建議TOP 5
{_format_recommendations(recommendations)}
【競品整體概況】
監控SKU{competitor_summary.get('total_skus', 0)}
被削價風險:{competitor_summary.get('undercut_count', 0)}價差超過10%
平均價差:{competitor_summary.get('avg_gap_pct', 0):+.1f}%
請按以下結構輸出(使用 HTML <b> 標題):
<b>📅 {period} 電商日報</b>
<b>📊 昨日業績快報</b>
(昨日關鍵數字 + 與近期均值比較 + 異常點說明)
<b>⚠️ 今日最高優先威脅</b>
(最緊急的 1-3 個競品削價威脅,含具體行動建議)
<b>💰 今日定價行動建議</b>
1-3 條今日應執行的調價動作格式SKU → 建議行動 → 預期效果)
<b>🎯 今日 3 件事</b>
(最重要的 3 件可執行任務24h內完成
語言繁體中文全文200字以內精準扼要。
"""
# ── Step 3Gemini 生成 ───────────────────────────────────────────────────
logger.info("[OpenClaw] 呼叫 Gemini 生成日報...")
report_content = _call_gemini(system_prompt, user_prompt, temperature=0.3)
if not report_content:
logger.error("[OpenClaw] 日報 Gemini 呼叫失敗")
return {"status": "error", "report_type": "daily_report", "error": "Gemini 呼叫失敗"}
# ── Step 4生成圖表 ─────────────────────────────────────────────────────
charts = []
try:
from services.chart_generator_service import (
revenue_trend_chart,
price_gap_bar_chart,
)
rev_chart = revenue_trend_chart(7, "近7日")
if rev_chart:
charts.append(("revenue_7d.png", rev_chart, "📈 近7日營收趨勢"))
if threats:
gap_chart = price_gap_bar_chart(threats, "競品價差警示TOP 5")
if gap_chart:
charts.append(("price_gap.png", gap_chart, "⚠️ 競品價差分析"))
except Exception as e:
logger.warning("[OpenClaw] 日報圖表生成失敗(非阻塞): %s", e)
# ── Step 5持久化 DB ────────────────────────────────────────────────────
metadata = {
"period": period,
"model": STRATEGY_MODEL,
"yesterday_revenue": yesterday_sales.get("revenue", 0),
"wow_pct": sales.get("wow_pct", 0),
"threat_count": len(threats),
"chart_count": len(charts),
"generated_at": now.isoformat(),
}
insight_id = _save_to_ai_insights(
insight_type="daily_report",
content=report_content,
confidence=0.85,
metadata=metadata,
period=yesterday.strftime("%Y-%m-%d"),
)
action_items = _extract_action_items_daily(report_content)
_save_action_items(action_items, insight_id)
# ── Step 6Telegram 推播(圖文)────────────────────────────────────────
try:
from services.telegram_templates import (
daily_report_header,
send_report_with_charts,
_get_chat_ids,
)
header = daily_report_header(
date_str=period,
revenue=yesterday_sales.get("revenue", 0),
wow=sales.get("wow_pct", 0),
threat_count=len(threats),
opportunity_count=competitor_summary.get("premium_count", 0),
)
full_msg = header + "\n\n" + report_content
if charts:
send_report_with_charts(full_msg, charts, _get_chat_ids())
else:
from services.telegram_templates import _send_telegram_raw
_send_telegram_raw(full_msg)
except Exception as e:
logger.error("[OpenClaw] 日報 Telegram 推播失敗: %s", e)
logger.info("[OpenClaw] 日報完成 insight_id=%s charts=%d", insight_id, len(charts))
return {
"status": "ok",
"report_type": "daily_report",
"insight_id": insight_id,
"period": period,
"chart_count": len(charts),
"action_count": len(action_items),
}
def generate_monthly_report() -> dict:
"""
OpenClaw 電商月報每月1日 07:00
流程:
1. 讀取上月完整業績 + 品類分佈 + 競品趨勢
2. MCP 收集月度外部情報
3. Gemini 深度月度分析(完整版)
4. 生成圖表:月度概覽 + 品類橫條 + 價格熱圖
5. 持久化 ai_insightstype='monthly_report'
6. Telegram 圖文推播
"""
now = datetime.now()
# 上個月
first_of_this_month = now.replace(day=1)
last_month_end = first_of_this_month - timedelta(days=1)
last_month_start = last_month_end.replace(day=1)
period = last_month_end.strftime("%Y年%m月")
logger.info("[OpenClaw] 月報任務啟動 period=%s", period)
# ── Step 1DB 數據收集(上月完整數據)─────────────────────────────────
days_in_month = (first_of_this_month - last_month_start).days
sales = _fetch_monthly_sales_summary(last_month_start, last_month_end)
categories = _fetch_category_breakdown(days_in_month)
threats = _fetch_top_threats(10)
competitor_summary = _fetch_competitor_summary()
price_trend_data = _fetch_price_trend_summary(days_in_month)
# ── Step 2MCP 外部情報(月度版)───────────────────────────────────────
mcp_data: Dict[str, str] = {}
try:
from services.mcp_collector_service import mcp_collector
mcp_data = mcp_collector.collect_all()
holiday_ctx = mcp_collector.get_holiday_context()
seasonal_ctx = mcp_collector.get_seasonal_context()
except Exception as e:
logger.warning("[OpenClaw] 月報 MCP 收集失敗(非阻塞): %s", e)
holiday_ctx = ""
seasonal_ctx = ""
# ── Step 3組建 Gemini Prompt ───────────────────────────────────────────
system_prompt = """你是 OpenClaw 月報首席分析師,負責 momo 平台電商月度深度報告。
語言繁體中文台灣用語。格式HTML標題 + 條列式數據。
每個洞察必須有月度數字支撐,並與上月/去年同期比較。
重點:月度趨勢、品類策略、定價最佳化、下月行動計畫。"""
db_section = f"""
{period} 業績總覽】
月營收NT${sales.get('revenue', 0):,.0f}
MoM 變化:{sales.get('mom_pct', 0):+.1f}%
YoY 變化:{sales.get('yoy_pct', 0):+.1f}%
活躍SKU數{sales.get('sku_count', 0)}
平均客單價NT${sales.get('avg_order_value', 0):,.0f}
【品類業績分佈TOP 10
{_format_categories(categories)}
【競品整體概況】
監控SKU{competitor_summary.get('total_skus', 0)}
月均價差:{competitor_summary.get('avg_gap_pct', 0):+.1f}%
被削價風險SKU{competitor_summary.get('undercut_count', 0)}
【價格變動概況】
本月調價次數:{price_trend_data.get('price_changes', 0)}
平均調幅:{price_trend_data.get('avg_change_pct', 0):+.1f}%
主動降價SKU{price_trend_data.get('price_cuts', 0)}
主動提價SKU{price_trend_data.get('price_raises', 0)}
"""
mcp_section = f"""
【MCP 外部情報(月度)】
市場趨勢:
{mcp_data.get('market_trends', '(未取得)')[:600]}
競品動態:
{mcp_data.get('competitor_intel', '(未取得)')[:500]}
下月節日行事曆:
{holiday_ctx}
{mcp_data.get('holiday_calendar', '')[:400]}
季節性洞察:
{seasonal_ctx}
{mcp_data.get('seasonal_insights', '')[:400]}
"""
user_prompt = f"""請根據以下數據,產出 {period} 電商月度策略報告:
{db_section}
{mcp_section}
請按以下結構輸出(使用 HTML <b> 標題,詳細分析):
<b>📅 {period} 電商月度報告</b>
<b>📊 月度業績總結</b>
(月營收 + MoM/YoY 變化 + 超出/低於預期分析 + 關鍵驅動因素)
<b>🏆 本月品類贏家 vs 輸家</b>
成長最快3個品類 vs 衰退最嚴重3個品類含原因分析
<b>💰 本月定價策略回顧</b>
(調價效果評估 + 最佳定價案例 + 失誤案例 + 改進建議)
<b>⚔️ 競品月度分析</b>
(主要競爭對手動態 + 我方優劣勢 + 市場份額評估)
<b>📢 行銷效益評估</b>
(本月活動效果 + ROI 估算 + 最佳行銷時機分析)
<b>🔮 下月趨勢預測</b>
(季節性機會 + 節日活動規劃 + 風險預警 + 庫存建議)
<b>🎯 下月優先行動計畫</b>
8-10條具體可執行任務含時間節點和負責方向
格式:[週次/日期] 行動說明 → 預期效益)
<b>📈 Q{((now.month-1)//3)+1} 策略展望</b>
(季度目標設定 + 關鍵里程碑 + 需人工決策事項)
語言:繁體中文,數據必須引用上方提供的實際數字。
"""
# ── Step 4Gemini 生成 ───────────────────────────────────────────────────
logger.info("[OpenClaw] 呼叫 Gemini 生成月報...")
report_content = _call_gemini(system_prompt, user_prompt, temperature=0.35)
if not report_content:
logger.error("[OpenClaw] 月報 Gemini 呼叫失敗")
return {"status": "error", "report_type": "monthly_report", "error": "Gemini 呼叫失敗"}
# ── Step 5生成圖表 ─────────────────────────────────────────────────────
charts = []
try:
from services.chart_generator_service import (
monthly_overview_chart,
category_revenue_chart,
price_history_heatmap,
)
overview_chart = monthly_overview_chart(6)
if overview_chart:
charts.append(("monthly_overview.png", overview_chart, f"📊 近6個月營收趨勢"))
cat_chart = category_revenue_chart(days_in_month, period)
if cat_chart:
charts.append(("category_revenue.png", cat_chart, "🏆 品類業績分佈"))
heatmap = price_history_heatmap(days_in_month)
if heatmap:
charts.append(("price_heatmap.png", heatmap, "🔥 品類價格熱圖"))
except Exception as e:
logger.warning("[OpenClaw] 月報圖表生成失敗(非阻塞): %s", e)
# ── Step 6持久化 DB ────────────────────────────────────────────────────
action_items = _extract_action_items(report_content)
metadata = {
"period": period,
"model": STRATEGY_MODEL,
"monthly_revenue": sales.get("revenue", 0),
"mom_pct": sales.get("mom_pct", 0),
"yoy_pct": sales.get("yoy_pct", 0),
"category_count": len(categories),
"chart_count": len(charts),
"mcp_topics_collected": sum(1 for v in mcp_data.values() if v),
"action_count": len(action_items),
"generated_at": now.isoformat(),
}
insight_id = _save_to_ai_insights(
insight_type="monthly_report",
content=report_content,
confidence=0.90,
metadata=metadata,
period=last_month_end.strftime("%Y-%m"),
)
_save_action_items(action_items, insight_id)
# ── Step 7Telegram 推播(圖文)────────────────────────────────────────
try:
from services.telegram_templates import (
monthly_report_header,
send_report_with_charts,
_get_chat_ids,
)
top3 = [c.get("category", "N/A") for c in categories[:3]] or ["N/A"]
header = monthly_report_header(
month_str=period,
revenue=sales.get("revenue", 0),
mom=sales.get("mom_pct", 0),
yoy=sales.get("yoy_pct", 0),
top3_categories=top3,
)
full_msg = header + "\n\n" + report_content
if charts:
send_report_with_charts(full_msg, charts, _get_chat_ids())
else:
from services.telegram_templates import _send_telegram_raw
_send_telegram_raw(full_msg)
except Exception as e:
logger.error("[OpenClaw] 月報 Telegram 推播失敗: %s", e)
logger.info("[OpenClaw] 月報完成 insight_id=%s charts=%d actions=%d",
insight_id, len(charts), len(action_items))
return {
"status": "ok",
"report_type": "monthly_report",
"insight_id": insight_id,
"period": period,
"chart_count": len(charts),
"action_count": len(action_items),
}
def generate_meta_analysis_report() -> str:
"""
AI 系統效能自我審視(每 6 小時 run_openclaw_meta_analysis_task 呼叫)
@@ -660,6 +1026,141 @@ def generate_meta_analysis_report() -> str:
# 輔助格式化函式
# ═══════════════════════════════════════════════════════════════════════════════
def _fetch_yesterday_sales() -> Dict[str, Any]:
"""昨日單日業績"""
session = get_session()
try:
row = session.execute(text("""
SELECT
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
COUNT(DISTINCT "商品ID") AS sku_count,
COUNT(*) AS order_count
FROM daily_sales_snapshot
WHERE snapshot_date::date = CURRENT_DATE - 1
""")).fetchone()
if row:
return {
"revenue": float(row[0] or 0),
"sku_count": int(row[1] or 0),
"order_count": int(row[2] or 0),
}
return {"revenue": 0, "sku_count": 0, "order_count": 0}
except Exception as e:
logger.warning("[OpenClaw] 昨日業績讀取失敗: %s", e)
return {"revenue": 0, "sku_count": 0, "order_count": 0}
finally:
session.close()
def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""上月業績彙總,含 MoM / YoY 比較"""
session = get_session()
try:
row = session.execute(text("""
SELECT
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
COUNT(DISTINCT "商品ID") AS sku_count,
COUNT(*) AS order_count,
AVG(COALESCE("銷售金額"::numeric, 0)) AS avg_order_value
FROM daily_sales_snapshot
WHERE snapshot_date::date BETWEEN :start AND :end
"""), {"start": start_date.date(), "end": end_date.date()}).fetchone()
revenue = float(row[0] or 0) if row else 0
sku_count = int(row[1] or 0) if row else 0
avg_order_value = float(row[3] or 0) if row else 0
# 上上月MoM
prev_start = (start_date - timedelta(days=1)).replace(day=1)
prev_end = start_date - timedelta(days=1)
prev_row = session.execute(text("""
SELECT SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
FROM daily_sales_snapshot
WHERE snapshot_date::date BETWEEN :start AND :end
"""), {"start": prev_start.date(), "end": prev_end.date()}).fetchone()
prev_revenue = float(prev_row[0] or 0) if prev_row else 0
mom_pct = ((revenue - prev_revenue) / prev_revenue * 100) if prev_revenue else 0
# 去年同月YoY
yoy_start = start_date.replace(year=start_date.year - 1)
yoy_end = end_date.replace(year=end_date.year - 1)
yoy_row = session.execute(text("""
SELECT SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
FROM daily_sales_snapshot
WHERE snapshot_date::date BETWEEN :start AND :end
"""), {"start": yoy_start.date(), "end": yoy_end.date()}).fetchone()
yoy_revenue = float(yoy_row[0] or 0) if yoy_row else 0
yoy_pct = ((revenue - yoy_revenue) / yoy_revenue * 100) if yoy_revenue else 0
return {
"revenue": revenue,
"sku_count": sku_count,
"order_count": int(row[2] or 0) if row else 0,
"avg_order_value": avg_order_value,
"mom_pct": round(mom_pct, 1),
"yoy_pct": round(yoy_pct, 1),
}
except Exception as e:
logger.warning("[OpenClaw] 月度業績讀取失敗: %s", e)
return {"revenue": 0, "sku_count": 0, "order_count": 0,
"avg_order_value": 0, "mom_pct": 0, "yoy_pct": 0}
finally:
session.close()
def _fetch_price_trend_summary(days: int = 30) -> Dict[str, Any]:
"""近N天價格異動統計"""
session = get_session()
try:
row = session.execute(text(f"""
SELECT
COUNT(*) AS total_changes,
AVG(ABS(pr2.price - pr1.price) / pr1.price * 100) AS avg_change_pct,
SUM(CASE WHEN pr2.price < pr1.price THEN 1 ELSE 0 END) AS price_cuts,
SUM(CASE WHEN pr2.price > pr1.price THEN 1 ELSE 0 END) AS price_raises
FROM price_records pr2
JOIN price_records pr1 ON pr1.product_id = pr2.product_id
AND pr1.timestamp = (
SELECT MAX(timestamp) FROM price_records
WHERE product_id = pr2.product_id
AND timestamp < pr2.timestamp - INTERVAL '1 day'
)
WHERE pr2.timestamp >= NOW() - INTERVAL '{days} days'
AND ABS(pr2.price - pr1.price) / pr1.price > 0.005
""")).fetchone()
if row and row[0]:
return {
"price_changes": int(row[0]),
"avg_change_pct": round(float(row[1] or 0), 1),
"price_cuts": int(row[2] or 0),
"price_raises": int(row[3] or 0),
}
return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0}
except Exception as e:
logger.warning("[OpenClaw] 價格趨勢統計讀取失敗: %s", e)
return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0}
finally:
session.close()
def _extract_action_items_daily(report_text: str) -> List[str]:
"""從日報文字中解析「今日3件事」行動項目"""
lines = report_text.split("\n")
items = []
in_action_section = False
for line in lines:
if "今日" in line and ("3件" in line or "行動" in line or "優先" in line):
in_action_section = True
continue
if in_action_section:
stripped = line.strip()
if stripped.startswith(("", "-", "1.", "2.", "3.", "", "", "")):
items.append(stripped.lstrip("•-①②③").strip().lstrip("123.").strip())
elif stripped.startswith("<b>") and items:
break
return items[:5]
def _format_threats(threats: List[Dict]) -> str:
if not threats:
return " (無近期競價威脅)"

View File

@@ -1,42 +1,62 @@
"""
services/telegram_templates.py
Telegram 訊息模板系統 v2
═══ 訊息分類 ═══════════════════════════════════════════════
🚨 告警類 — price_alert_msg / threat_alert_msg / system_alert_msg
📊 報告類 — daily_report_msg / weekly_report_msg / monthly_report_msg
💰 決策類 — price_decision / batch_decision_msg
🤖 系統類 — deploy_msg / heal_msg / meta_analysis_msg
💡 洞察類 — triaged_alert / insight_summary_msg
═══════════════════════════════════════════════════════════
規範:
1. 所有模板使用 HTML parse_mode
2. 一律繁體中文Agent 名稱保留英文Hermes/NemoTron/OpenClaw/EA
3. 每則訊息須含:標題行 / 核心數據 / 建議行動(三段式)
4. 圖文訊息使用 send_photo_with_caption附說明文字
"""
import io
import json
import logging
from typing import Any, Dict, Optional
from database.manager import get_session
from database.trend_models import TelegramUser
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
sys_log = logging.getLogger("TelegramTpl")
# ─── 常數 ────────────────────────────────────────────────
TELEGRAM_BOT_TOKEN_ENV = "TELEGRAM_BOT_TOKEN"
TELEGRAM_CHAT_IDS_ENV = "TELEGRAM_CHAT_IDS"
# ─── 工具:取得 Token 與 Chat ID容錯 ─────────────────
# ══════════════════════════════════════════════════════════════════════════════
# 基礎工具
# ══════════════════════════════════════════════════════════════════════════════
def _get_bot_token() -> Optional[str]:
from dotenv import load_dotenv
load_dotenv()
import os
return os.getenv(TELEGRAM_BOT_TOKEN_ENV)
def _get_chat_ids() -> list:
token = _get_bot_token()
if not token:
sys_log.warning("[TelegramTpl] %s 未設定,跳過 Telegram 通知", TELEGRAM_BOT_TOKEN_ENV)
return []
raw = __import__("os").getenv(TELEGRAM_CHAT_IDS_ENV, "[]")
raw = os.getenv(TELEGRAM_CHAT_IDS_ENV, "[]")
try:
return json.loads(raw)
except json.JSONDecodeError:
sys_log.warning("[TelegramTpl] %s 格式錯誤,應為 JSON 陣列", TELEGRAM_CHAT_IDS_ENV)
return []
# ─── 原始發送(內部使用) ─────────────────────────────────
def _send_telegram_raw(text: str, chat_ids: Optional[list] = None,
reply_markup: Optional[Dict[str, Any]] = None,
parse_mode: str = "HTML") -> bool:
"""發送純文字訊息"""
import requests
token = _get_bot_token()
if not token:
@@ -47,11 +67,7 @@ def _send_telegram_raw(text: str, chat_ids: Optional[list] = None,
chat_ids = [-1003940688311] # fallback
url = f"https://api.telegram.org/bot{token}/sendMessage"
payload = {
"chat_id": chat_ids[0],
"text": text,
"parse_mode": parse_mode,
}
payload = {"chat_id": chat_ids[0], "text": text, "parse_mode": parse_mode}
if reply_markup:
payload["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False)
try:
@@ -64,125 +80,389 @@ def _send_telegram_raw(text: str, chat_ids: Optional[list] = None,
sys_log.error("[TelegramTpl] send 失敗: %s", e)
return False
# ─── 公用模板 ─────────────────────────────────────────────
def alert(title: str, content: str, actions: Optional[list] = None) -> str:
"""高危險警報(紅色)"""
msg = f"<b>🚨 {title}</b>\n\n{content}"
if actions:
msg += "\n\n" + "\n".join(f"{a}" for a in actions)
return msg
def send_photo(photo_bytes: bytes, caption: str = "",
chat_ids: Optional[list] = None,
parse_mode: str = "HTML") -> bool:
"""發送圖片(含說明文字),供圖表報告使用"""
import requests
token = _get_bot_token()
if not token or not photo_bytes:
return False
if chat_ids is None:
chat_ids = _get_chat_ids()
if not chat_ids:
chat_ids = [-1003940688311]
def warning(title: str, summary: str, details: Optional[dict] = None) -> str:
"""中風險警告(橙色)"""
msg = f"<b>⚠️ {title}</b>\n\n{summary}"
if details:
msg += "\n\n<b>細節:</b>\n" + "\n".join(f"{k}: {v}" for k, v in details.items())
return msg
url = f"https://api.telegram.org/bot{token}/sendPhoto"
try:
r = requests.post(
url,
data={"chat_id": chat_ids[0], "caption": caption[:1024],
"parse_mode": parse_mode},
files={"photo": ("chart.png", photo_bytes, "image/png")},
timeout=30,
)
if not r.ok:
sys_log.warning("[TelegramTpl] sendPhoto HTTP %s: %s", r.status_code, r.text[:200])
return False
return True
except Exception as e:
sys_log.error("[TelegramTpl] sendPhoto 失敗: %s", e)
return False
def info(title: str, module: str, content: str, time: Optional[Any] = None) -> str:
"""普通信息(藍色)"""
t_str = f" · {time}" if time else ""
return f"<b>📊 {title}</b> [{module}]{t_str}\n\n{content}"
def success(title: str, module: str, stats: str = "") -> str:
"""成功通知(綠色)"""
return f"<b>✅ {title}</b> [{module}]\n{stats}"
def send_report_with_charts(text_msg: str, charts: List[Optional[bytes]],
chat_ids: Optional[list] = None) -> bool:
"""先發文字報告,再逐張發送圖表"""
ok = _send_telegram_raw(text_msg, chat_ids=chat_ids)
for i, chart in enumerate(charts):
if chart:
send_photo(chart, caption=f"圖表 {i+1}/{len(charts)}", chat_ids=chat_ids)
return ok
def price_decision(
product_name: str,
product_sku: str,
current_price: float,
suggested_price: float,
reason: str,
insight_id: Optional[int] = None,
) -> tuple:
# ══════════════════════════════════════════════════════════════════════════════
# 🚨 告警類模板
# ══════════════════════════════════════════════════════════════════════════════
def price_alert_msg(threats: List[Dict], analysis_period: str = "近48h") -> str:
"""
降價決策通知(含 Inline Keyboard
回傳 (message_text, reply_markup)
競品削價告警Hermes + NemoTron 偵測結果)
┌─────────────────────────────────┐
│ 🚨 競品削價告警 · N 個 SKU │
│ ─────────────────────────────── │
│ ⚠️ SKU名稱 MOMO $X → 競品 $Y │
│ 價差 +X% 業績跌幅 -Y%
└─────────────────────────────────┘
"""
n = len(threats)
high = [t for t in threats if t.get("risk") == "HIGH"]
lines = [
f"🚨 <b>競品削價告警</b> · 偵測到 <b>{n}</b> 個 SKU [{analysis_period}]",
f"🔴 高危:{len(high)} 個  🟡 中危:{n - len(high)}",
"" * 32,
]
for t in threats[:8]:
risk_icon = "🔴" if t.get("risk") == "HIGH" else "🟡"
name = str(t.get("name", t.get("sku", "")))[:22]
gap = float(t.get("gap_pct", 0))
delta = float(t.get("sales_7d_delta_pct", t.get("sales_delta", 0)))
momo_p = t.get("momo_price")
pchome_p = t.get("pchome_price")
price_str = f" MOMO <b>${momo_p:,.0f}</b> vs 競品 <b>${pchome_p:,.0f}</b>" \
if momo_p and pchome_p else ""
lines.append(
f"{risk_icon} <b>{name}</b>\n"
f" 價差 <b>{gap:+.1f}%</b> 業績週跌 <b>{delta:+.1f}%</b>{price_str}"
)
if n > 8:
lines.append(f"<i>…另有 {n-8} 個 SKU查看完整報告</i>")
lines += ["" * 32,
"💡 <b>建議:</b>優先處理紅色高危品項,確認競品是否促銷或長期低價"]
return "\n".join(lines)
def threat_alert_msg(sku: str, name: str, momo_price: float,
comp_price: float, gap_pct: float,
sales_delta: float, action: str, confidence: float) -> str:
"""單品威脅告警NemoTron 派發)"""
risk = "🔴 高危" if gap_pct > 15 else "🟡 中危" if gap_pct > 5 else "🟢 低危"
return (
f"{risk} <b>競品威脅通報</b>\n"
f"━━━━━━━━━━━━━━━━━━━━\n"
f"🏷️ <b>{name}</b> <code>{sku}</code>\n\n"
f"💴 MOMO  <b>NT${momo_price:,.0f}</b>\n"
f"💴 競品PChome<b>NT${comp_price:,.0f}</b>\n"
f"📉 價差  <b>{gap_pct:+.1f}%</b>(我方較貴)\n"
f"📊 業績週變 <b>{sales_delta:+.1f}%</b>\n\n"
f"🤖 <b>AI 建議:</b>{action}\n"
f"📈 信心度:{confidence:.0%}\n"
f"━━━━━━━━━━━━━━━━━━━━"
)
def system_alert_msg(level: str, title: str, detail: str, source: str = "") -> str:
"""系統級告警AIOps / 部署失敗等)"""
icons = {"critical": "🚨", "error": "", "warning": "⚠️", "info": ""}
icon = icons.get(level, "⚠️")
src = f" [{source}]" if source else ""
return (
f"{icon} <b>{title}</b>{src}\n"
f"━━━━━━━━━━━━━━━━━━━━\n"
f"{detail[:600]}\n"
f"━━━━━━━━━━━━━━━━━━━━\n"
f"🕐 {datetime.now().strftime('%m/%d %H:%M')}"
)
# ══════════════════════════════════════════════════════════════════════════════
# 📊 報告類模板(三種週期)
# ══════════════════════════════════════════════════════════════════════════════
def daily_report_header(date_str: str, revenue: float, wow: float,
threat_count: int, opportunity_count: int) -> str:
"""
日報標題卡(附圖前發送的文字說明)
"""
wow_icon = "📈" if wow >= 0 else "📉"
wow_color = "+" if wow >= 0 else ""
return (
f"📊 <b>EwoooC 電商日報</b> · {date_str}\n"
f"══════════════════════════\n"
f"💰 今日業績  <b>NT${revenue/10000:.1f} 萬</b>\n"
f"{wow_icon} 週同比   <b>{wow_color}{wow:.1f}%</b>\n"
f"🔴 競品威脅  <b>{threat_count}</b> 個 SKU\n"
f"🟢 市場機會  <b>{opportunity_count}</b> 個 SKU\n"
f"══════════════════════════\n"
f"🤖 <i>Hermes + NemoTron + OpenClaw 聯合分析</i>"
)
def weekly_report_header(period: str, curr_rev: float, prev_rev: float,
wow: float, top_category: str) -> str:
"""週報標題卡"""
wow_icon = "📈" if wow >= 0 else "📉"
return (
f"📊 <b>EwoooC 電商週報</b> · {period}\n"
f"══════════════════════════\n"
f"💰 本週業績  <b>NT${curr_rev/10000:.1f} 萬</b>\n"
f"📦 前週業績  <b>NT${prev_rev/10000:.1f} 萬</b>\n"
f"{wow_icon} 週成長率  <b>{wow:+.1f}%</b>\n"
f"🏆 最強品類  <b>{top_category}</b>\n"
f"══════════════════════════\n"
f"🤖 <i>OpenClaw × MCP 全景策略分析</i>"
)
def monthly_report_header(month_str: str, revenue: float, mom: float,
yoy: float, top3_categories: List[str]) -> str:
"""月報標題卡"""
mom_icon = "📈" if mom >= 0 else "📉"
yoy_icon = "🚀" if yoy >= 10 else "📈" if yoy >= 0 else "📉"
cats = " / ".join(top3_categories[:3])
return (
f"📅 <b>EwoooC 電商月報</b> · {month_str}\n"
f"══════════════════════════\n"
f"💰 月度業績  <b>NT${revenue/10000:.0f} 萬</b>\n"
f"{mom_icon} 月成長率  <b>{mom:+.1f}%</b> {yoy_icon} 年成長 <b>{yoy:+.1f}%</b>\n"
f"🏆 TOP 3 品類 <b>{cats}</b>\n"
f"══════════════════════════\n"
f"🤖 <i>OpenClaw × Hermes × MCP 月度全景洞察</i>"
)
def report_section(icon: str, title: str, lines: List[str]) -> str:
"""通用報告節段(供日/週/月報各節使用)"""
body = "\n".join(f" {l}" for l in lines)
return f"\n{icon} <b>{title}</b>\n{body}"
# ══════════════════════════════════════════════════════════════════════════════
# 💰 決策類模板
# ══════════════════════════════════════════════════════════════════════════════
def price_decision(product_name: str, product_sku: str,
current_price: float, suggested_price: float,
reason: str, insight_id: Optional[int] = None) -> tuple:
"""降 / 提價決策通知(含 Inline Keyboard"""
diff = current_price - suggested_price
if diff > 0:
action_text = f"價 ${diff:,.0f}"
elif diff < 0:
action_text = f"提價 ${-diff:,.0f}"
else:
action_text = "維持"
action_text = f"降價 NT${diff:,.0f}" if diff > 0 else \
f"NT${-diff:,.0f}" if diff < 0 else "維持現價"
direction = "📉" if diff > 0 else "📈" if diff < 0 else "➡️"
message = (
f"<b>💰 自動降價建議</b>\n"
f"商品:{product_name} (SKU: {product_sku})\n"
f"現價:${current_price:,.0f} → 建議:${suggested_price:,.0f}\n"
f"原因:{reason}\n"
f"💰 <b>AI 定價決策建議</b>\n"
f"━━━━━━━━━━━━━━━━━━━━\n"
f"🏷️ <b>{product_name}</b> <code>{product_sku}</code>\n\n"
f"現價:<b>NT${current_price:,.0f}</b>\n"
f"建議:<b>NT${suggested_price:,.0f}</b> {direction} {action_text}\n\n"
f"💡 <b>依據:</b>{reason}\n"
)
if insight_id:
message += f"洞察 ID{insight_id}\n"
message += f"🔗 洞察 ID<code>{insight_id}</code>\n"
message += f"━━━━━━━━━━━━━━━━━━━━"
keyboard = {
"inline_keyboard": [
[
{"text": "✅ 確認執行", "callback_data": f"price_decision:approve:{product_sku}"},
{"text": "❌ 拒絕", "callback_data": f"price_decision:reject:{product_sku}"},
],
[
{"text": "📊 查看洞察", "url": f"https://your-dashboard.example/insight/{insight_id}" if insight_id else "#"},
],
]
}
keyboard = {"inline_keyboard": [[
{"text": "✅ 確認執行", "callback_data": f"momo:price_decision:approve:{product_sku}"},
{"text": "❌ 拒絕", "callback_data": f"momo:price_decision:reject:{product_sku}"},
]]}
return message, keyboard
def triaged_alert(
base_event: Dict[str, Any],
tier_label: str,
ai_summary: str,
ai_cause: Optional[str] = None,
ai_actions: Optional[list] = None,
ai_executed: Optional[list] = None,
) -> str:
"""
L1/L2 整合通知(帶 AI 摘要與可執行動作)。
"""
msg = (
f"<b>⚡ {tier_label} · {base_event.get('event_type', 'alert')}</b>\n"
f"📌 <code>{base_event.get('title')}</code>\n\n"
def batch_decision_msg(items: List[Dict], batch_id: str) -> tuple:
"""批次定價決策(多 SKU 一次確認)"""
lines = [f"💰 <b>批次定價決策</b> · 共 {len(items)} 個 SKU\n━━━━━━━━━━━━━━━━━━━━"]
for i, item in enumerate(items[:6], 1):
diff = item.get("current_price", 0) - item.get("suggested_price", 0)
direction = "📉" if diff > 0 else "📈"
lines.append(
f"{i}. {direction} <b>{item.get('name','')[:20]}</b>\n"
f" ${item.get('current_price',0):,.0f} → <b>${item.get('suggested_price',0):,.0f}</b>"
)
if len(items) > 6:
lines.append(f"<i>…另有 {len(items)-6} 個 SKU</i>")
lines.append("━━━━━━━━━━━━━━━━━━━━")
keyboard = {"inline_keyboard": [[
{"text": f"✅ 全部確認({len(items)}項)",
"callback_data": f"momo:batch_decision:approve:{batch_id}"},
{"text": "❌ 取消",
"callback_data": f"momo:batch_decision:reject:{batch_id}"},
]]}
return "\n".join(lines), keyboard
# ══════════════════════════════════════════════════════════════════════════════
# 🤖 系統類模板
# ══════════════════════════════════════════════════════════════════════════════
def deploy_msg(status: str, branch: str, commit: str, duration: str = "") -> str:
"""CI/CD 部署通知"""
icons = {"success": "", "failed": "", "started": "🚀"}
icon = icons.get(status, "")
dur = f" 耗時 {duration}" if duration else ""
return (
f"{icon} <b>部署{status.upper()}</b>{dur}\n"
f"━━━━━━━━━━━━━━━━━━━━\n"
f"🌿 分支:<code>{branch}</code>\n"
f"🔖 Commit<code>{commit[:12]}</code>\n"
f"🕐 {datetime.now().strftime('%m/%d %H:%M')}"
)
def heal_msg(status: str, error_type: str, target_file: str,
commit_sha: str = "", diff_lines: int = 0) -> str:
"""AiderHeal 自動修復通知"""
if status == "started":
icon, title = "🔧", "AiderHeal 啟動"
elif status == "success":
icon, title = "", "AiderHeal 修復完成"
elif status == "reverted":
icon, title = "🔄", "AiderHeal 自動回滾"
else:
icon, title = "", "AiderHeal 失敗"
lines = [f"{icon} <b>{title}</b>", "━━━━━━━━━━━━━━━━━━━━",
f"🐛 錯誤類型:<code>{error_type}</code>",
f"📄 目標檔案:<code>{target_file}</code>"]
if commit_sha:
lines.append(f"🔖 Commit<code>{commit_sha[:12]}</code>")
if diff_lines:
lines.append(f"📝 修改行數:{diff_lines}")
lines += ["━━━━━━━━━━━━━━━━━━━━",
f"🕐 {datetime.now().strftime('%m/%d %H:%M')}"]
return "\n".join(lines)
def meta_analysis_msg(period: str, content: str) -> str:
"""AI 系統自我審視報告"""
return (
f"🤖 <b>AI 系統效能自我審視</b> · {period}\n"
f"══════════════════════════\n"
f"{content[:1200]}\n"
f"══════════════════════════\n"
f"<i>由 OpenClaw 定期分析 · {datetime.now().strftime('%m/%d %H:%M')}</i>"
)
# ══════════════════════════════════════════════════════════════════════════════
# 💡 洞察類模板
# ══════════════════════════════════════════════════════════════════════════════
def triaged_alert(base_event: Dict[str, Any], tier_label: str,
ai_summary: str, ai_cause: Optional[str] = None,
ai_actions: Optional[list] = None,
ai_executed: Optional[list] = None) -> tuple:
"""EA L1/L2 自主執行通知(保留原有介面,升級排版)"""
event_type = base_event.get("event_type", "alert")
title = base_event.get("title", "")
summary = base_event.get("summary", "")
event_id = base_event.get("id", "unknown")
lines = [
f"⚡ <b>{tier_label} · {event_type}</b>",
f"📌 {title}",
"",
]
if summary:
msg += f"🔍 概要:{summary}\n\n"
lines += [f"🔍 <b>概要:</b>{summary}", ""]
if ai_summary:
msg += f"🧠 AI 摘要:{ai_summary}\n\n"
lines += [f"🧠 <b>AI 摘要:</b>{ai_summary[:400]}", ""]
if ai_cause:
msg += f"💡 可能原因:{ai_cause}\n\n"
lines += [f"💡 <b>可能原因:</b>{ai_cause}", ""]
if ai_actions:
msg += "<b>📋 建議行動:</b>\n" + "\n".join(f"{a}" for a in ai_actions) + "\n\n"
lines += ["<b>📋 建議行動:</b>"] + [f" {a}" for a in ai_actions] + [""]
if ai_executed:
msg += "<b>✅ 已執行:</b>\n" + "\n".join(f"{a}" for a in ai_executed) + "\n\n"
lines += ["<b>✅ 已執行:</b>"] + [f" {a}" for a in ai_executed] + [""]
trace = base_event.get("trace")
if trace:
msg += f"<pre>{trace[-500:]}</pre>"
lines.append(f"<pre>{trace[-400:]}</pre>")
# W2-D: momo: prefix 強制(共用 Bot 鐵律ADR-011
event_id = base_event.get("id", "unknown")
keyboard = {
"inline_keyboard": [
[{"text": "📊 查看详情", "url": f"https://dashboard.example/event/{event_id}"}],
[{"text": "🛑 忽略此事件", "callback_data": f"momo:event_ignore:{event_id}"}],
]
keyboard = {"inline_keyboard": [
[{"text": "🛑 忽略此事件",
"callback_data": f"momo:event_ignore:{event_id}"}],
]}
return "\n".join(lines), keyboard
def insight_summary_msg(insights: List[Dict], period: str = "近24h") -> str:
"""AI 洞察摘要彙整(供定期推播)"""
n = len(insights)
lines = [
f"💡 <b>AI 洞察摘要</b> · {period}{n}",
"━━━━━━━━━━━━━━━━━━━━",
]
type_icons = {
"price_alert": "🔴", "recommendation": "💰", "weekly_strategy": "📊",
"meta_analysis": "🤖", "market_opportunity": "🟢", "mcp_cache": "🌐",
}
return msg, keyboard
for ins in insights[:6]:
icon = type_icons.get(ins.get("insight_type", ""), "💡")
content_preview = str(ins.get("content", ""))[:80].replace("\n", " ")
conf = float(ins.get("confidence", 0))
lines.append(f"{icon} {content_preview}… <i>({conf:.0%})</i>")
if n > 6:
lines.append(f"<i>…另有 {n-6} 筆洞察</i>")
lines.append("━━━━━━━━━━━━━━━━━━━━")
return "\n".join(lines)
# ══════════════════════════════════════════════════════════════════════════════
# 舊版相容介面(保留供現有程式碼調用)
# ══════════════════════════════════════════════════════════════════════════════
def alert(title: str, content: str, actions: Optional[list] = None) -> str:
msg = system_alert_msg("error", title, content)
if actions:
msg += "\n" + "\n".join(f"{a}" for a in actions)
return msg
def warning(title: str, summary: str, details: Optional[dict] = None) -> str:
detail_str = "\n".join(f" {k}: {v}" for k, v in (details or {}).items())
return system_alert_msg("warning", title, summary + ("\n" + detail_str if detail_str else ""))
def info(title: str, module: str, content: str, time: Optional[Any] = None) -> str:
t_str = f" · {time}" if time else ""
return f" <b>{title}</b> [{module}]{t_str}\n\n{content}"
def success(title: str, module: str, stats: str = "") -> str:
return f"✅ <b>{title}</b> [{module}]\n{stats}"
def report(title: str, report_type: str, period: str, content_md: str) -> str:
"""策略/週報模板"""
icons = {"weekly_strategy": "📊", "daily": "📅", "monthly": "📆", "meta_analysis": "🤖"}
icon = icons.get(report_type, "📋")
return (
f"<b>📊 {title}</b> ({report_type})\n"
f"期間:{period}\n\n"
f"{icon} <b>{title}</b> ({report_type})\n"
f"📅 期間:{period}\n"
f"══════════════════════════\n"
f"{content_md}"
)
def success(title: str, module: str, stats: str = "") -> str:
"""成功通知(綠色)"""
return f"<b>✅ {title}</b> [{module}]\n{stats}"
def _send_telegram(msg: str, chat_ids: Optional[list] = None,
reply_markup: Optional[Dict[str, Any]] = None) -> bool: