493 lines
20 KiB
Python
493 lines
20 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
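"""
services/chart_generator_service.py
Professional chart generation service.

Used by the daily / weekly / monthly reports and by Telegram photo
notifications. Every chart function returns the rendered PNG as ``bytes``
(the functions are annotated ``Optional[bytes]``; ``None`` means no chart
could be produced), intended for Telegram sendPhoto.

Charts:
  price_trend_chart(sku, sku_name, days) — single-SKU MOMO vs competitor price trend
  category_revenue_chart(days)           — revenue by category, horizontal bars
  revenue_trend_chart(days)              — daily revenue line chart (with previous-period baseline)
  price_gap_bar_chart(threats)           — price gap of the top competitive threats
  monthly_overview_chart(months)         — month-by-month revenue bars with MoM growth
  price_history_heatmap(days)            — category x date price-gap heat map
"""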
|
||
|
||
import io
|
||
import logging
|
||
from datetime import datetime, timedelta
|
||
from typing import List, Optional, Dict, Any
|
||
|
||
logger = logging.getLogger(__name__)

# ── Headless Matplotlib initialisation ───────────────────────────────────────
# Every optional plotting dependency is imported inside this single guard so
# that a missing package only disables charting (``_MPL_OK = False``) instead
# of breaking the import of this module. ``matplotlib.ticker`` is included
# because the chart functions reference ``matplotlib.ticker.FuncFormatter``.
try:
    import matplotlib
    matplotlib.use("Agg")  # non-interactive backend, chosen before pyplot is imported
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates
    import matplotlib.ticker
    import numpy as np
    _MPL_OK = True
except ImportError:
    _MPL_OK = False
    logger.warning("[Chart] matplotlib 未安裝,圖表功能停用")
|
||
|
||
# ── Visual settings (dark business theme) ────────────────────────────────────
_BG = "#1a1a2e"
_PANEL = "#16213e"
_ACCENT1 = "#e94560"   # red: MOMO / alert
_ACCENT2 = "#0f3460"   # dark blue: competitor
_ACCENT3 = "#533483"   # purple: historical baseline
_GREEN = "#00b4d8"     # cyan-green: positive revenue growth
_YELLOW = "#ffd166"    # yellow: neutral / warning
_TEXT = "#e0e0e0"
_SUBTEXT = "#888888"
_GRID = "#2a2a4a"

# The chart titles and labels in this module are Traditional Chinese, but
# DejaVu Sans carries no CJK glyphs, so they would render as empty boxes.
# Any of these CJK-capable families that is actually installed is placed in
# front of the original fallback chain.
# NOTE(review): the names below are the common family names on Linux / Windows /
# macOS; extend the tuple if the deployment image ships a different CJK font.
_CJK_FONT_CANDIDATES = (
    "Noto Sans CJK TC",
    "Noto Sans CJK JP",
    "Noto Sans TC",
    "Microsoft JhengHei",
    "PingFang TC",
    "Heiti TC",
    "WenQuanYi Zen Hei",
    "Arial Unicode MS",
)

try:
    from matplotlib import font_manager as _font_manager
    _installed_fonts = {font.name for font in _font_manager.fontManager.ttflist}
except Exception:  # font discovery is best-effort; keep the original defaults
    _installed_fonts = set()

# Only installed families are listed, which avoids "findfont" warnings for
# fonts that are absent on this machine.
_FONT_FAMILY = [name for name in _CJK_FONT_CANDIDATES if name in _installed_fonts]
_FONT_FAMILY += ["DejaVu Sans", "sans-serif"]

# rcParams applied by ``_apply_style`` before each chart is drawn.
plt_params = {
    "figure.facecolor": _BG,
    "axes.facecolor": _PANEL,
    "axes.edgecolor": _GRID,
    "axes.labelcolor": _TEXT,
    "xtick.color": _SUBTEXT,
    "ytick.color": _SUBTEXT,
    "text.color": _TEXT,
    "grid.color": _GRID,
    "grid.linestyle": "--",
    "grid.alpha": 0.5,
    "font.family": _FONT_FAMILY,
    "font.size": 10,
    # Many CJK fonts lack U+2212; draw negative ticks with an ASCII hyphen instead.
    "axes.unicode_minus": False,
}
|
||
|
||
|
||
def _apply_style():
    """Load the module's dark-theme ``plt_params`` into Matplotlib's rcParams.

    Does nothing when Matplotlib could not be imported.
    """
    if not _MPL_OK:
        return
    plt.rcParams.update(plt_params)
|
||
|
||
|
||
def _fig_to_bytes(fig) -> Optional[bytes]:
    """Render *fig* to PNG and return the encoded image.

    The figure is closed with ``plt.close`` in a ``finally`` clause, so it is
    released even when ``savefig`` raises.

    Args:
        fig: Matplotlib figure to serialise.

    Returns:
        The PNG data (150 dpi, tight bounding box, background taken from the
        figure's own face colour).
    """
    buf = io.BytesIO()
    try:
        fig.savefig(buf, format="png", dpi=150, bbox_inches="tight",
                    facecolor=fig.get_facecolor())
    finally:
        plt.close(fig)
    # getvalue() returns the whole buffer regardless of the stream position.
    return buf.getvalue()
|
||
|
||
|
||
def _unavailable() -> None:
    """Return the "no chart" result used when charting cannot proceed."""
    return
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# 資料讀取層(與 DB 解耦,各函式自行呼叫)
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
|
||
def _fetch_price_history(sku: str, days: int = 30) -> Dict[str, Any]:
    """Read the daily average MOMO and PChome prices of one SKU.

    The look-back window is sent as a bind parameter rather than being
    spliced into the SQL text. Days whose average price is NULL are skipped,
    so one bad row no longer discards both series.

    Args:
        sku: Product code; matched against ``products.i_code`` for MOMO prices
            and ``competitor_price_history.sku`` for PChome prices.
        days: Look-back window in days.

    Returns:
        ``{"momo": [...], "pchome": [...]}`` where each list holds
        ``(date_string, price)`` tuples ordered by date. Both lists are empty
        when the query fails; the failure is logged as a warning.
    """
    try:
        from database.manager import get_session
        from sqlalchemy import text

        params = {"sku": sku, "days": int(days)}
        session = get_session()
        try:
            momo_rows = session.execute(text("""
                SELECT pr.timestamp::date AS dt, AVG(pr.price) AS price
                FROM price_records pr
                JOIN products p ON p.id = pr.product_id
                WHERE p.i_code = :sku
                  AND pr.timestamp >= NOW() - CAST(:days AS integer) * INTERVAL '1 day'
                GROUP BY dt ORDER BY dt
            """), params).fetchall()

            # Only PChome rows that pass the match-score threshold and carry
            # the 'identity_v2' tag are used.
            comp_rows = session.execute(text("""
                SELECT crawled_at::date AS dt, AVG(price) AS price
                FROM competitor_price_history
                WHERE sku = :sku
                  AND source = 'pchome'
                  AND crawled_at >= NOW() - CAST(:days AS integer) * INTERVAL '1 day'
                  AND COALESCE(match_score, 0) >= 0.76
                  AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2'
                GROUP BY dt ORDER BY dt
            """), params).fetchall()

            return {
                "momo": [(str(r[0]), float(r[1])) for r in momo_rows if r[1] is not None],
                "pchome": [(str(r[0]), float(r[1])) for r in comp_rows if r[1] is not None],
            }
        finally:
            session.close()
    except Exception as e:
        logger.warning("[Chart] price_history 讀取失敗: %s", e)
        return {"momo": [], "pchome": []}
|
||
|
||
|
||
def _fetch_daily_revenue(days: int = 30) -> List[Dict]:
    """Read total revenue per day from ``daily_sales_snapshot``.

    The window size is passed as a bind parameter instead of being formatted
    into the SQL string.

    Args:
        days: Number of days to look back from ``CURRENT_DATE``.

    Returns:
        A list of ``{"date": str, "revenue": float}`` dicts ordered by date,
        or an empty list when the query fails (the failure is logged).
    """
    try:
        from database.manager import get_session
        from sqlalchemy import text

        session = get_session()
        try:
            rows = session.execute(text("""
                SELECT snapshot_date::date AS dt,
                       SUM(COALESCE("總業績"::numeric, 0)) AS revenue
                FROM daily_sales_snapshot
                WHERE snapshot_date::date >= CURRENT_DATE - CAST(:days AS integer)
                GROUP BY dt ORDER BY dt
            """), {"days": int(days)}).fetchall()
            return [{"date": str(r[0]), "revenue": float(r[1] or 0)} for r in rows]
        finally:
            session.close()
    except Exception as e:
        logger.warning("[Chart] daily_revenue 讀取失敗: %s", e)
        return []
|
||
|
||
|
||
def _fetch_category_revenue(days: int = 7) -> List[Dict]:
    """Read revenue per product category, highest first (at most 12 rows).

    Snapshot rows are joined to ``products`` by product name; only products
    with status ``'ACTIVE'`` and a non-NULL category are counted. The window
    size is passed as a bind parameter instead of being formatted into the
    SQL string.

    Args:
        days: Number of days to look back from ``CURRENT_DATE``.

    Returns:
        A list of ``{"category": str, "revenue": float}`` dicts sorted by
        revenue descending, or an empty list when the query fails (the
        failure is logged).
    """
    try:
        from database.manager import get_session
        from sqlalchemy import text

        session = get_session()
        try:
            rows = session.execute(text("""
                SELECT p.category,
                       SUM(COALESCE(s."總業績"::numeric, 0)) AS revenue
                FROM daily_sales_snapshot s
                JOIN products p ON p.name = s."商品名稱"
                WHERE s.snapshot_date::date >= CURRENT_DATE - CAST(:days AS integer)
                  AND p.status = 'ACTIVE'
                  AND p.category IS NOT NULL
                GROUP BY p.category
                ORDER BY revenue DESC LIMIT 12
            """), {"days": int(days)}).fetchall()
            # An empty category string is shown under the "未分類" label.
            return [{"category": r[0] or "未分類", "revenue": float(r[1] or 0)} for r in rows]
        finally:
            session.close()
    except Exception as e:
        logger.warning("[Chart] category_revenue 讀取失敗: %s", e)
        return []
|
||
|
||
|
||
def _fetch_monthly_revenue(months: int = 6) -> List[Dict]:
    """Read total revenue per calendar month for the latest *months* months.

    The window starts on the first day of the month ``months - 1`` months
    before the current one. Starting at ``NOW() - N months`` instead would
    cut the oldest month in half, shrinking its bar and inflating the first
    month-over-month figure. The window is passed as a bind parameter rather
    than being formatted into the SQL string.

    Args:
        months: Number of calendar months to return, counting the current
            (still running) month.

    Returns:
        A list of ``{"month": "YYYY-MM", "revenue": float}`` dicts ordered by
        month, or an empty list when the query fails (the failure is logged).
    """
    try:
        from database.manager import get_session
        from sqlalchemy import text

        # The current month already counts as one, hence the "- 1".
        months_back = max(int(months) - 1, 0)
        session = get_session()
        try:
            rows = session.execute(text("""
                SELECT DATE_TRUNC('month', snapshot_date)::date AS mon,
                       SUM(COALESCE("總業績"::numeric, 0)) AS revenue
                FROM daily_sales_snapshot
                WHERE snapshot_date >= DATE_TRUNC('month', NOW())
                                       - CAST(:months_back AS integer) * INTERVAL '1 month'
                GROUP BY mon ORDER BY mon
            """), {"months_back": months_back}).fetchall()
            # str(date)[:7] keeps the "YYYY-MM" prefix.
            return [{"month": str(r[0])[:7], "revenue": float(r[1] or 0)} for r in rows]
        finally:
            session.close()
    except Exception as e:
        logger.warning("[Chart] monthly_revenue 讀取失敗: %s", e)
        return []
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
# 圖表函式
|
||
# ═══════════════════════════════════════════════════════════════════════════════
|
||
|
||
def price_trend_chart(sku: str, sku_name: str = "", days: int = 30) -> Optional[bytes]:
    """Draw the MOMO vs PChome price trend of a single SKU as a line chart.

    Both series share one chronologically sorted day axis. Plotting the two
    lists of date strings independently lets Matplotlib order the categories
    by first appearance, so a day present only in the PChome series would be
    appended after the last MOMO day and the dashed line would zig-zag.

    Args:
        sku: Product code passed to ``_fetch_price_history``.
        sku_name: Optional display name for the title; ``sku`` is used when
            it is empty.
        days: Look-back window in days.

    Returns:
        PNG bytes, or ``None`` when Matplotlib is unavailable or no MOMO
        price rows exist for the SKU.
    """
    if not _MPL_OK:
        return _unavailable()
    data = _fetch_price_history(sku, days)
    momo, pchome = data["momo"], data["pchome"]
    if not momo:
        return None

    # Day labels are str() of a SQL date (presumably "YYYY-MM-DD"), so a plain
    # sort of the union is chronological.
    all_days = sorted({day for day, _ in momo} | {day for day, _ in pchome})
    slot = {day: i for i, day in enumerate(all_days)}

    _apply_style()
    fig, ax = plt.subplots(figsize=(10, 4))
    try:
        fig.patch.set_facecolor(_BG)

        momo_x = [slot[day] for day, _ in momo]
        momo_prices = [price for _, price in momo]
        ax.plot(momo_x, momo_prices, color=_ACCENT1, linewidth=2.5, marker="o",
                markersize=4, label="MOMO 自家", zorder=3)
        ax.fill_between(momo_x, momo_prices, alpha=0.15, color=_ACCENT1)

        if pchome:
            pchome_x = [slot[day] for day, _ in pchome]
            pchome_prices = [price for _, price in pchome]
            ax.plot(pchome_x, pchome_prices, color=_GREEN, linewidth=2, linestyle="--",
                    marker="s", markersize=3, label="PChome", zorder=2)

        ax.set_xticks(list(range(len(all_days))))
        ax.set_xticklabels(all_days)

        title = f"{sku_name or sku} · {days}日價格趨勢"
        ax.set_title(title, fontsize=12, fontweight="bold", color=_TEXT, pad=10)
        ax.set_ylabel("價格 (NT$)", color=_TEXT)
        ax.legend(loc="upper left", framealpha=0.3, facecolor=_PANEL, edgecolor=_GRID)
        ax.grid(True, alpha=0.4)
        ax.tick_params(axis="x", rotation=30, labelsize=8)
        ax.yaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(
            lambda x, _: f"${x:,.0f}"))

        # Annotate the latest price gap. Skipped when the latest MOMO price is
        # 0, which would otherwise divide by zero.
        m_last = momo[-1][1]
        if pchome and m_last:
            p_last = pchome[-1][1]
            gap = (m_last - p_last) / m_last * 100
            color = _ACCENT1 if gap > 0 else _GREEN  # red when MOMO is the pricier one
            ax.annotate(f"現價差 {gap:+.1f}%", xy=(0.98, 0.05),
                        xycoords="axes fraction", ha="right",
                        fontsize=10, fontweight="bold", color=color,
                        bbox=dict(boxstyle="round,pad=0.3", facecolor=_PANEL, alpha=0.8))

        _add_watermark(ax)
        fig.tight_layout()
    except Exception:
        # Do not leave a half-built figure registered in pyplot.
        plt.close(fig)
        raise
    return _fig_to_bytes(fig)
|
||
|
||
|
||
def revenue_trend_chart(days: int = 30, title_suffix: str = "") -> Optional[bytes]:
    """Draw the daily revenue line chart with a previous-period baseline.

    The chart shows the current period, its 7-day moving average and, when
    twice the window is available, the previous period. The previous period
    is drawn on the x positions of the current period so the two lines
    overlay day for day. Plotting it against its own date strings would
    register those dates as new categories and place the baseline next to
    the current line instead of on top of it.

    Args:
        days: Length of the current period in days.
        title_suffix: Text appended to the chart title.

    Returns:
        PNG bytes, or ``None`` when Matplotlib is unavailable or there is no
        revenue data.
    """
    if not _MPL_OK:
        return _unavailable()
    data = _fetch_daily_revenue(days * 2)  # double window so a previous period exists
    if not data:
        return None

    curr = data[-days:] if len(data) >= days else data
    prev = data[:-days] if len(data) >= days * 2 else []

    labels = [d["date"] for d in curr]
    vals = [d["revenue"] for d in curr]

    _apply_style()
    fig, ax = plt.subplots(figsize=(12, 4))
    try:
        if vals:
            ax.plot(labels, vals, color=_GREEN, linewidth=2.5, marker="o",
                    markersize=4, label="本期", zorder=3)
            ax.fill_between(labels, vals, alpha=0.2, color=_GREEN)

            # 7-day moving average; "valid" mode yields len(vals) - 6 points,
            # so it lines up with labels[6:].
            if len(vals) >= 7:
                ma7 = np.convolve(vals, np.ones(7) / 7, mode="valid")
                ax.plot(labels[6:], ma7, color=_YELLOW, linewidth=1.5,
                        linestyle=":", label="7MA", zorder=2, alpha=0.8)

        if prev and labels:
            prev_vals = [d["revenue"] for d in prev[-len(curr):]]
            # Reuse the current period's x positions so the periods overlay.
            ax.plot(labels[-len(prev_vals):], prev_vals, color=_SUBTEXT,
                    linewidth=1.5, linestyle="--",
                    label="前期同比", zorder=1, alpha=0.6)

        title = f"業績趨勢 · 近 {days} 天{title_suffix}"
        ax.set_title(title, fontsize=12, fontweight="bold", color=_TEXT, pad=10)
        ax.set_ylabel("日業績 (NT$)", color=_TEXT)
        ax.yaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(
            lambda x, _: f"${x/10000:.1f}萬"))
        ax.legend(loc="upper left", framealpha=0.3, facecolor=_PANEL, edgecolor=_GRID)
        ax.grid(True, alpha=0.4)
        ax.tick_params(axis="x", rotation=30, labelsize=7)

        # Mark the highest and lowest day of the current period.
        if vals:
            max_i, min_i = vals.index(max(vals)), vals.index(min(vals))
            ax.annotate(f"↑ ${vals[max_i]/10000:.1f}萬",
                        xy=(labels[max_i], vals[max_i]),
                        xytext=(0, 8), textcoords="offset points",
                        ha="center", fontsize=8, color=_GREEN)
            ax.annotate(f"↓ ${vals[min_i]/10000:.1f}萬",
                        xy=(labels[min_i], vals[min_i]),
                        xytext=(0, -14), textcoords="offset points",
                        ha="center", fontsize=8, color=_ACCENT1)

        _add_watermark(ax)
        fig.tight_layout()
    except Exception:
        # Do not leave a half-built figure registered in pyplot.
        plt.close(fig)
        raise
    return _fig_to_bytes(fig)
|
||
|
||
|
||
def category_revenue_chart(days: int = 7, title_suffix: str = "") -> Optional[bytes]:
    """Draw the top (at most 10) categories by revenue as horizontal bars.

    Bars are placed on numeric rows and labelled afterwards. Category names
    are cut to 10 characters, so using the shortened names as categorical
    coordinates would stack two categories with the same prefix on one row.

    Args:
        days: Look-back window in days, passed to ``_fetch_category_revenue``.
        title_suffix: Text appended to the chart title.

    Returns:
        PNG bytes, or ``None`` when Matplotlib is unavailable or there is no
        category data.
    """
    if not _MPL_OK:
        return _unavailable()
    data = _fetch_category_revenue(days)
    if not data:
        return None

    _apply_style()
    top = data[:10]
    n = len(top)
    # barh draws the first row at the bottom; reversing puts the first entry
    # of ``data`` on top.
    cats = [str(d["category"])[:10] for d in top][::-1]
    revs = [d["revenue"] for d in top][::-1]
    best = max(revs)
    # Positive scale for offsets and axis limits even when every revenue is 0.
    scale = best if best > 0 else 1
    rows = list(range(n))

    fig, ax = plt.subplots(figsize=(10, max(4, n * 0.55)))
    try:
        # Best category in green, anything below 20 % of it in red.
        colors = [_GREEN if r == best else (_ACCENT1 if r < scale * 0.2 else _ACCENT2)
                  for r in revs]
        bars = ax.barh(rows, revs, color=colors, alpha=0.85, edgecolor=_GRID, linewidth=0.5)
        ax.set_yticks(rows)
        ax.set_yticklabels(cats)

        for bar, val in zip(bars, revs):
            ax.text(bar.get_width() + scale * 0.01, bar.get_y() + bar.get_height() / 2,
                    f"${val/10000:.1f}萬", va="center", fontsize=9, color=_TEXT)

        ax.set_title(f"品類業績排行 · 近 {days} 天{title_suffix}",
                     fontsize=12, fontweight="bold", color=_TEXT, pad=10)
        ax.set_xlabel("業績 (NT$)", color=_TEXT)
        ax.xaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(
            lambda x, _: f"${x/10000:.0f}萬"))
        ax.grid(True, axis="x", alpha=0.4)
        ax.set_xlim(0, scale * 1.18)  # head-room for the value labels

        _add_watermark(ax)
        fig.tight_layout()
    except Exception:
        # Do not leave a half-built figure registered in pyplot.
        plt.close(fig)
        raise
    return _fig_to_bytes(fig)
|
||
|
||
|
||
def monthly_overview_chart(months: int = 6) -> Optional[bytes]:
    """Draw monthly revenue bars with a month-over-month growth line.

    Revenue is drawn as bars on the left axis and MoM growth (in percent) as
    a line on a twin right axis. The first month has no predecessor and is
    given a growth of 0, as is any month whose predecessor had zero revenue.

    Args:
        months: Number of months requested from ``_fetch_monthly_revenue``.

    Returns:
        PNG bytes, or ``None`` when Matplotlib is unavailable or there is no
        monthly data.
    """
    if not _MPL_OK:
        return _unavailable()
    monthly = _fetch_monthly_revenue(months)
    if not monthly:
        return None

    _apply_style()
    labels = []
    vals = []
    for entry in monthly:
        labels.append(entry["month"])
        vals.append(entry["revenue"])

    # Growth of each month relative to the one before it.
    mom = [0]
    for previous, current in zip(vals, vals[1:]):
        if previous:
            mom.append((current - previous) / previous * 100)
        else:
            mom.append(0)

    fig, ax1 = plt.subplots(figsize=(10, 4.5))
    ax2 = ax1.twinx()

    # Green bar for a month that grew (or held), red for one that shrank.
    bar_colors = []
    for growth in mom:
        bar_colors.append(_GREEN if growth >= 0 else _ACCENT1)
    bars = ax1.bar(labels, vals, color=bar_colors, alpha=0.7,
                   edgecolor=_GRID, linewidth=0.5, width=0.6)

    label_gap = max(vals) * 0.01
    for bar, val in zip(bars, vals):
        x_mid = bar.get_x() + bar.get_width() / 2
        ax1.text(x_mid, bar.get_height() + label_gap,
                 f"${val/10000:.0f}萬", ha="center", fontsize=8, color=_TEXT)

    ax2.plot(labels, mom, color=_YELLOW, linewidth=2.5, marker="D",
             markersize=6, label="MoM 成長率", zorder=5)
    ax2.axhline(0, color=_SUBTEXT, linewidth=0.8, linestyle="--")
    for month, growth in zip(labels, mom):
        tint = _GREEN if growth >= 0 else _ACCENT1
        ax2.annotate(f"{growth:+.1f}%", (month, growth), textcoords="offset points",
                     xytext=(0, 8), ha="center", fontsize=8, color=tint)

    ax1.set_title(f"月度業績概覽 · 近 {months} 個月",
                  fontsize=12, fontweight="bold", color=_TEXT, pad=10)
    ax1.set_ylabel("月業績 (NT$)", color=_TEXT)
    ax2.set_ylabel("MoM 成長率 (%)", color=_YELLOW)
    ax1.yaxis.set_major_formatter(matplotlib.ticker.FuncFormatter(
        lambda x, _: f"${x/10000:.0f}萬"))
    ax2.tick_params(axis="y", colors=_YELLOW)
    ax1.grid(True, axis="y", alpha=0.3)
    ax1.tick_params(axis="x", rotation=20)

    _add_watermark(ax1)
    fig.tight_layout()
    return _fig_to_bytes(fig)
|
||
|
||
|
||
def price_gap_bar_chart(threats: List[Dict], title: str = "競價威脅 TOP 10") -> Optional[bytes]:
    """Draw the price gap of the first (at most 10) threats as horizontal bars.

    Changes from the earlier behaviour: a missing or ``None`` ``gap_pct``
    counts as 0 instead of raising, a missing or empty ``name`` falls back to
    ``sku``, bars sit on numeric rows so shortened labels cannot collide, and
    the value label of a negative bar is drawn to its left.

    Args:
        threats: Dicts read through the keys ``name`` (or ``sku``) and
            ``gap_pct`` (percent). The first entry ends up at the top.
        title: Chart title.

    Returns:
        PNG bytes, or ``None`` when Matplotlib is unavailable or ``threats``
        is empty.
    """
    if not _MPL_OK or not threats:
        return _unavailable()

    _apply_style()
    top = threats[:10]
    n = len(top)
    # barh draws the first row at the bottom; reversing keeps input order top-down.
    names = [str(t.get("name") or t.get("sku") or "")[:18] for t in top][::-1]
    gaps = [float(t.get("gap_pct") or 0) for t in top][::-1]
    rows = list(range(n))

    fig, ax = plt.subplots(figsize=(10, max(4, n * 0.55)))
    try:
        # Red above 15 %, yellow above 5 %, green otherwise.
        colors = [_ACCENT1 if g > 15 else _YELLOW if g > 5 else _GREEN for g in gaps]
        bars = ax.barh(rows, gaps, color=colors, alpha=0.85, edgecolor=_GRID, linewidth=0.5)
        ax.set_yticks(rows)
        ax.set_yticklabels(names)

        for bar, val in zip(bars, gaps):
            # Put the label beyond the bar's tip on whichever side it points.
            if val >= 0:
                x_text, align = val + 0.2, "left"
            else:
                x_text, align = val - 0.2, "right"
            ax.text(x_text, bar.get_y() + bar.get_height() / 2,
                    f"{val:+.1f}%", va="center", ha=align, fontsize=9, color=_TEXT)

        ax.axvline(0, color=_SUBTEXT, linewidth=1)
        ax.axvline(5, color=_YELLOW, linewidth=0.8, linestyle=":", alpha=0.6)
        ax.axvline(15, color=_ACCENT1, linewidth=0.8, linestyle=":", alpha=0.6)
        ax.set_title(title, fontsize=12, fontweight="bold", color=_TEXT, pad=10)
        ax.set_xlabel("MOMO 相對競品價差 (%) ↑我貴 / ↓我便宜", color=_TEXT)
        ax.grid(True, axis="x", alpha=0.4)

        _add_watermark(ax)
        fig.tight_layout()
    except Exception:
        # Do not leave a half-built figure registered in pyplot.
        plt.close(fig)
        raise
    return _fig_to_bytes(fig)
|
||
|
||
|
||
def price_history_heatmap(days: int = 30) -> Optional[bytes]:
    """Draw a category x date heat map of the price gap (for the monthly report).

    The gap is ``(MOMO - competitor) / MOMO * 100``, so a positive (red) cell
    means MOMO is the more expensive side. This matches the colour-bar label
    and the gap shown by ``price_trend_chart``. The earlier query subtracted
    the other way round, which coloured competitor threats green.

    Each competitor price is compared with the product's latest
    ``price_records`` price, not the MOMO price on that day. Cells without
    data are filled with 0.

    Args:
        days: Look-back window in days (sent as a bind parameter).

    Returns:
        PNG bytes, or ``None`` when Matplotlib is unavailable, no rows match,
        or anything fails (the failure is logged as a warning).
    """
    if not _MPL_OK:
        return _unavailable()
    fig = None
    try:
        from database.manager import get_session
        from sqlalchemy import text

        session = get_session()
        try:
            rows = session.execute(text("""
                SELECT p.category,
                       cp.crawled_at::date AS dt,
                       AVG((pr.price - cp.price) / NULLIF(pr.price, 0) * 100) AS gap_pct
                FROM competitor_price_history cp
                JOIN products p ON p.i_code = cp.sku
                JOIN (
                    SELECT DISTINCT ON (product_id) product_id, price
                    FROM price_records ORDER BY product_id, timestamp DESC
                ) pr ON pr.product_id = p.id
                WHERE cp.crawled_at >= NOW() - CAST(:days AS integer) * INTERVAL '1 day'
                  AND p.category IS NOT NULL
                  AND COALESCE(cp.match_score, 0) >= 0.76
                  AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
                GROUP BY p.category, dt
                ORDER BY p.category, dt
            """), {"days": int(days)}).fetchall()
        finally:
            session.close()

        if not rows:
            return None

        import pandas as pd

        # Convert to plain floats up front: the driver may hand back Decimal
        # or None, which would give imshow an object-dtype matrix.
        records = [
            (r[0], r[1], float(r[2]) if r[2] is not None else float("nan"))
            for r in rows
        ]
        df = pd.DataFrame(records, columns=["category", "date", "gap_pct"])
        pivot = df.pivot(index="category", columns="date", values="gap_pct").fillna(0)

        _apply_style()
        fig, ax = plt.subplots(figsize=(min(16, max(8, len(pivot.columns) * 0.5)),
                                        max(4, len(pivot) * 0.6)))

        # Reversed RdYlGn: high (MOMO pricier) is red; colour range fixed to +/-20 %.
        im = ax.imshow(pivot.to_numpy(dtype=float), cmap="RdYlGn_r", aspect="auto",
                       vmin=-20, vmax=20)
        plt.colorbar(im, ax=ax, label="價差 % (+我貴/-我便宜)", shrink=0.6)

        ax.set_xticks(range(len(pivot.columns)))
        # Last five characters of the date, i.e. "MM-DD" for an ISO date.
        ax.set_xticklabels([str(c)[-5:] for c in pivot.columns],
                           rotation=45, ha="right", fontsize=7)
        ax.set_yticks(range(len(pivot.index)))
        ax.set_yticklabels(pivot.index, fontsize=9)
        ax.set_title(f"品類競品價差熱力圖 · 近 {days} 天",
                     fontsize=12, fontweight="bold", color=_TEXT, pad=10)

        _add_watermark(ax)
        fig.tight_layout()
        return _fig_to_bytes(fig)
    except Exception as e:
        # Release a half-built figure before reporting the failure.
        if fig is not None:
            plt.close(fig)
        logger.warning("[Chart] heatmap 生成失敗: %s", e)
        return None
|
||
|
||
|
||
def _add_watermark(ax):
    """Write the small product signature into the bottom-right corner of *ax*."""
    signature_style = dict(
        transform=ax.transAxes,  # coordinates below are fractions of the axes
        ha="right",
        va="bottom",
        fontsize=7,
        color=_SUBTEXT,
        alpha=0.5,
    )
    ax.text(0.99, 0.01, "EwoooC AI · momo-pro-system", **signature_style)
|
||
|
||
|
||
# ── Late import of matplotlib.ticker ─────────────────────────────────────────
# The chart functions call ``matplotlib.ticker.FuncFormatter``. The import is
# guarded so that a missing Matplotlib does not make this whole module fail
# to import; the chart functions already return None when ``_MPL_OK`` is False.
try:
    import matplotlib.ticker  # noqa: E402
except ImportError:
    pass