1414 lines
62 KiB
Python
1414 lines
62 KiB
Python
"""
|
||
services/telegram_templates.py
|
||
Telegram 訊息模板系統 v2
|
||
|
||
═══ 訊息分類 ═══════════════════════════════════════════════
|
||
🚨 告警類 — price_alert_msg / threat_alert_msg / system_alert_msg
|
||
📊 報告類 — daily_report_msg / weekly_report_msg / monthly_report_msg
|
||
💰 決策類 — price_decision / batch_decision_msg
|
||
🤖 系統類 — deploy_msg / heal_msg / meta_analysis_msg
|
||
💡 洞察類 — triaged_alert / insight_summary_msg
|
||
═══════════════════════════════════════════════════════════
|
||
|
||
規範:
|
||
1. 所有模板使用 HTML parse_mode
|
||
2. 一律繁體中文,Agent 名稱保留英文(Hermes/NemoTron/OpenClaw/EA)
|
||
3. 每則訊息須含:標題行 / 核心數據 / 建議行動(三段式)
|
||
4. 圖文訊息使用 send_photo_with_caption(附說明文字)
|
||
"""
|
||
|
||
import io
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
from html import escape
|
||
from datetime import datetime
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
# Module-level logger used by every helper in this file.
sys_log = logging.getLogger("TelegramTpl")

# Names of the environment variables that hold the bot token and the chat-id list.
TELEGRAM_BOT_TOKEN_ENV = "TELEGRAM_BOT_TOKEN"
TELEGRAM_CHAT_IDS_ENV = "TELEGRAM_CHAT_IDS"
# Matches <br>, <br/> and <br /> (any letter case, optional inner whitespace).
_TELEGRAM_HTML_BR_RE = re.compile(r"<\s*br\s*/?\s*>", re.IGNORECASE)
# Matches any single-line "<...>" span of 1-500 characters, i.e. a candidate tag.
_TELEGRAM_HTML_TAG_RE = re.compile(r"<[^<>\n]{1,500}>")
# Whitelist of tags that are passed through untouched: attribute-less
# b/strong/i/em/u/s/strike/del/code/pre (opening or closing), <a href="...">, and </a>.
_TELEGRAM_ALLOWED_HTML_TAG_RE = re.compile(
    r"(?:</?(?:b|strong|i|em|u|s|strike|del|code|pre)>|<a\s+href=\"[^\"]+\">|</a>)",
    re.IGNORECASE,
)
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
# 基礎工具
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
||
def _get_bot_token() -> Optional[str]:
    """Return the bot token read from the environment, or ``None`` when unset.

    ``load_dotenv()`` is invoked on every call before the variable is read.
    """
    from dotenv import load_dotenv

    load_dotenv()
    token = os.environ.get(TELEGRAM_BOT_TOKEN_ENV)
    return token
|
||
|
||
|
||
def _get_chat_ids() -> list:
    """Return the configured Telegram chat ids as a list.

    The ids are read from the ``TELEGRAM_CHAT_IDS`` environment variable, which
    must contain a JSON array.

    Returns:
        The parsed list, or ``[]`` when the bot token is missing, the value is
        not valid JSON, or the JSON value is not an array. A warning is logged
        in each of those cases.
    """
    if not _get_bot_token():
        sys_log.warning("[TelegramTpl] %s 未設定,跳過 Telegram 通知", TELEGRAM_BOT_TOKEN_ENV)
        return []

    raw = os.getenv(TELEGRAM_CHAT_IDS_ENV, "[]")
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        parsed = None

    # Valid JSON that is not an array (a bare number, object, string, null)
    # would break callers that index or iterate the result, so it is rejected
    # with the same "must be a JSON array" warning as malformed input.
    if not isinstance(parsed, list):
        sys_log.warning("[TelegramTpl] %s 格式錯誤,應為 JSON 陣列", TELEGRAM_CHAT_IDS_ENV)
        return []
    return parsed
|
||
|
||
|
||
def _sanitize_telegram_html(text: str, parse_mode: Optional[str] = "HTML") -> str:
    """Keep only whitelisted Telegram HTML tags; escape every other tag.

    Non-whitelisted tags are turned into literal text so that sendMessage does
    not answer with HTTP 400. When ``parse_mode`` is not ``HTML``
    (case-insensitive) the text is returned unchanged apart from the
    ``str(text or "")`` coercion.
    """
    value = str(text or "")
    if not parse_mode or str(parse_mode).upper() != "HTML":
        return value
    with_newlines = _normalize_telegram_html_linebreaks(value)
    return _TELEGRAM_HTML_TAG_RE.sub(_escape_unsupported_telegram_html_tag, with_newlines)
|
||
|
||
|
||
def _normalize_telegram_html_linebreaks(text: str) -> str:
    """Replace every ``<br>`` variant matched by the module's BR pattern with a newline."""
    source = str(text or "")
    return _TELEGRAM_HTML_BR_RE.sub("\n", source)
|
||
|
||
|
||
def _escape_unsupported_telegram_html_tag(match: re.Match) -> str:
    """Regex substitution callback: keep whitelisted tags, HTML-escape the rest."""
    raw_tag = match.group(0)
    is_whitelisted = _TELEGRAM_ALLOWED_HTML_TAG_RE.fullmatch(raw_tag)
    return raw_tag if is_whitelisted else escape(raw_tag)
|
||
|
||
|
||
def _callback_payload_utf8(value: Any, max_bytes: int = 52) -> str:
|
||
"""Clamp callback payload by UTF-8 bytes without splitting multibyte chars."""
|
||
text = str(value or "unknown").strip() or "unknown"
|
||
encoded = text.encode("utf-8")
|
||
if len(encoded) <= max_bytes:
|
||
return text
|
||
|
||
clipped = encoded[:max_bytes]
|
||
while clipped:
|
||
try:
|
||
decoded = clipped.decode("utf-8").strip()
|
||
return decoded or "unknown"
|
||
except UnicodeDecodeError:
|
||
clipped = clipped[:-1]
|
||
return "unknown"
|
||
|
||
|
||
def _send_telegram_raw(text: str, chat_ids: Optional[list] = None,
                       reply_markup: Optional[Dict[str, Any]] = None,
                       parse_mode: Optional[str] = "HTML") -> bool:
    """Send a plain text message to the first chat id.

    Args:
        text: Message body; passed through ``_sanitize_telegram_html``.
        chat_ids: Target chats. ``None`` loads the configured ids; an empty
            result falls back to the hard-coded chat id. Only the first
            entry is used.
        reply_markup: Optional keyboard dict, sent JSON-serialised.
        parse_mode: Telegram parse mode; omitted from the request when falsy.

    Returns:
        ``True`` when the HTTP response is OK; ``False`` when the token is
        missing, the response is not OK, or the request raises.
    """
    import requests

    token = _get_bot_token()
    if not token:
        return False
    if chat_ids is None:
        chat_ids = _get_chat_ids()
    if not chat_ids:
        chat_ids = [-1003940688311]  # fallback

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = {"chat_id": chat_ids[0], "text": _sanitize_telegram_html(text, parse_mode)}
    if parse_mode:
        payload["parse_mode"] = parse_mode
    if reply_markup:
        payload["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False)
    try:
        r = requests.post(url, json=payload, timeout=10)
        if not r.ok:
            sys_log.warning("[TelegramTpl] sendMessage HTTP %s: %s", r.status_code, r.text[:200])
            return False
        return True
    except Exception as e:
        # The request URL embeds the bot token and HTTP client errors may quote
        # that URL, so the token is masked before the message reaches the log.
        sys_log.error("[TelegramTpl] send 失敗: %s", str(e).replace(token, "***"))
        return False
|
||
|
||
|
||
def send_telegram_with_result(text: str, chat_ids: Optional[list] = None,
                              reply_markup: Optional[Dict[str, Any]] = None,
                              parse_mode: Optional[str] = "HTML") -> Dict[str, Any]:
    """Send a message to every chat id and return a detailed result.

    The result is intended for EventRouter / AIOps consumers.

    Args:
        text: Message body; passed through ``_sanitize_telegram_html``.
        chat_ids: Target chats. ``None`` loads the configured ids; an empty
            result falls back to the hard-coded chat id.
        reply_markup: Optional keyboard dict, sent JSON-serialised.
        parse_mode: Telegram parse mode; omitted from the request when falsy.

    Returns:
        A dict with ``ok`` (at least one send succeeded and none failed),
        ``sent``, ``failed``, ``chat_ids`` and ``errors`` (one short entry per
        failed chat). A missing token yields ``errors == ["token_missing"]``.
    """
    token = _get_bot_token()
    if not token:
        return {"ok": False, "sent": 0, "failed": 0, "chat_ids": [], "errors": ["token_missing"]}

    if chat_ids is None:
        chat_ids = _get_chat_ids()
    if not chat_ids:
        chat_ids = [-1003940688311]

    import requests

    url = f"https://api.telegram.org/bot{token}/sendMessage"

    # Everything except chat_id is identical for each recipient: build it once.
    shared_fields: Dict[str, Any] = {"text": _sanitize_telegram_html(text, parse_mode)}
    if parse_mode:
        shared_fields["parse_mode"] = parse_mode
    if reply_markup:
        shared_fields["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False)

    sent = 0
    failed = 0
    errors: List[str] = []

    for chat_id in chat_ids:
        payload = {"chat_id": chat_id, **shared_fields}
        try:
            response = requests.post(url, json=payload, timeout=10)
            if response.ok:
                sent += 1
            else:
                failed += 1
                errors.append(f"{chat_id}:HTTP {response.status_code}")
                sys_log.warning("[TelegramTpl] sendMessage chat=%s HTTP %s: %s",
                                chat_id, response.status_code, response.text[:200])
        except Exception as e:
            failed += 1
            errors.append(f"{chat_id}:{type(e).__name__}")
            # The request URL embeds the bot token and HTTP client errors may
            # quote that URL, so the token is masked before logging.
            sys_log.error("[TelegramTpl] send chat=%s 失敗: %s",
                          chat_id, str(e).replace(token, "***"))

    return {
        "ok": sent > 0 and failed == 0,
        "sent": sent,
        "failed": failed,
        "chat_ids": list(chat_ids),
        "errors": errors,
    }
|
||
|
||
|
||
def send_photo(photo_bytes: bytes, caption: str = "",
               chat_ids: Optional[list] = None,
               parse_mode: str = "HTML") -> bool:
    """Send an image with a caption to the first chat id (used for chart reports).

    Args:
        photo_bytes: Image content, uploaded as ``chart.png`` / ``image/png``.
        caption: Caption text; sanitised, then cut to 1024 characters.
        chat_ids: Target chats. ``None`` loads the configured ids; an empty
            result falls back to the hard-coded chat id. Only the first
            entry is used.
        parse_mode: Telegram parse mode sent with the caption.

    Returns:
        ``True`` when the HTTP response is OK; ``False`` when the token or the
        image is missing, the response is not OK, or the request raises.
    """
    import requests

    token = _get_bot_token()
    if not token or not photo_bytes:
        return False
    if chat_ids is None:
        chat_ids = _get_chat_ids()
    if not chat_ids:
        chat_ids = [-1003940688311]

    url = f"https://api.telegram.org/bot{token}/sendPhoto"
    # NOTE(review): the cut happens after sanitising, so a very long caption
    # could be sliced inside a tag or entity — confirm captions stay short.
    safe_caption = _sanitize_telegram_html(caption, parse_mode)[:1024]
    try:
        r = requests.post(
            url,
            data={"chat_id": chat_ids[0], "caption": safe_caption,
                  "parse_mode": parse_mode},
            files={"photo": ("chart.png", photo_bytes, "image/png")},
            timeout=30,
        )
        if not r.ok:
            sys_log.warning("[TelegramTpl] sendPhoto HTTP %s: %s", r.status_code, r.text[:200])
            return False
        return True
    except Exception as e:
        # The request URL embeds the bot token and HTTP client errors may quote
        # that URL, so the token is masked before the message reaches the log.
        sys_log.error("[TelegramTpl] sendPhoto 失敗: %s", str(e).replace(token, "***"))
        return False
|
||
|
||
|
||
def send_report_with_charts(text_msg: str, charts: List[Optional[bytes]],
                            chat_ids: Optional[list] = None) -> bool:
    """Send the text report first, then each non-empty chart as a photo.

    The return value reflects only the text message; photo results are not
    inspected.
    """
    text_ok = _send_telegram_raw(text_msg, chat_ids=chat_ids)
    for position, chart in enumerate(charts, start=1):
        if not chart:
            continue
        send_photo(chart, caption=f"圖表 {position}/{len(charts)}", chat_ids=chat_ids)
    return text_ok
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
# 🚨 告警類模板
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
||
def price_alert_msg(threats: List[Dict], analysis_period: str = "近48h") -> str:
    """Build the competitor price-cut alert (Hermes + NemoTron detection results).

    Layout: a title line with the SKU count and analysis period, a high/medium
    risk tally, then one two-line entry per SKU (name; price gap, weekly sales
    change and, when both prices are present, MOMO vs competitor price),
    followed by a recommendation line. At most 8 SKUs are listed; the rest are
    summarised in a single "more" line.

    Args:
        threats: Dicts read via ``risk``, ``name``/``sku``, ``gap_pct``,
            ``sales_7d_delta_pct``/``sales_delta``, ``momo_price`` and
            ``pchome_price``.
        analysis_period: Label shown in square brackets on the title line.

    Returns:
        The message as Telegram HTML text.
    """
    n = len(threats)
    high = [t for t in threats if t.get("risk") == "HIGH"]
    lines = [
        f"🚨 <b>競品削價告警</b> · 偵測到 <b>{n}</b> 個 SKU [{analysis_period}]",
        f"🔴 高危:{len(high)} 個 🟡 中危:{n - len(high)} 個",
        "─" * 32,
    ]
    for t in threats[:8]:
        risk_icon = "🔴" if t.get("risk") == "HIGH" else "🟡"
        # Names are free text: escape them so "<" or "&" cannot break the HTML.
        name = escape(str(t.get("name", t.get("sku", "")))[:22])
        # A key that is present but None is treated like 0 instead of crashing float().
        gap = float(t.get("gap_pct") or 0)
        raw_delta = t.get("sales_7d_delta_pct")
        if raw_delta is None:
            raw_delta = t.get("sales_delta")
        delta = float(raw_delta or 0)
        momo_p = t.get("momo_price")
        pchome_p = t.get("pchome_price")
        price_str = f" MOMO <b>${momo_p:,.0f}</b> vs 競品 <b>${pchome_p:,.0f}</b>" \
            if momo_p and pchome_p else ""
        lines.append(
            f"{risk_icon} <b>{name}</b>\n"
            f" 價差 <b>{gap:+.1f}%</b> 業績週跌 <b>{delta:+.1f}%</b>{price_str}"
        )
    if n > 8:
        lines.append(f"<i>…另有 {n-8} 個 SKU(查看完整報告)</i>")
    lines += ["─" * 32,
              "💡 <b>建議:</b>優先處理紅色高危品項,確認競品是否促銷或長期低價"]
    return "\n".join(lines)
|
||
|
||
|
||
def threat_alert_msg(sku: str, name: str, momo_price: float,
                     comp_price: float, gap_pct: float,
                     sales_delta: float, action: str, confidence: float) -> str:
    """Build a single-product threat alert (dispatched by NemoTron).

    The risk label is derived from ``gap_pct``: above 15 is high, above 5 is
    medium, anything else is low. ``name``, ``sku`` and ``action`` are
    HTML-escaped because they are free text placed inside markup.

    Returns:
        The message as Telegram HTML text.
    """
    if gap_pct > 15:
        risk = "🔴 高危"
    elif gap_pct > 5:
        risk = "🟡 中危"
    else:
        risk = "🟢 低危"
    safe_name = escape(str(name))
    safe_sku = escape(str(sku))
    safe_action = escape(str(action))
    return (
        f"{risk} <b>競品威脅通報</b>\n"
        f"━━━━━━━━━━━━━━━━━━━━\n"
        f"🏷️ <b>{safe_name}</b> <code>{safe_sku}</code>\n\n"
        f"💴 MOMO <b>NT${momo_price:,.0f}</b>\n"
        f"💴 競品(PChome)<b>NT${comp_price:,.0f}</b>\n"
        f"📉 價差 <b>{gap_pct:+.1f}%</b>(我方較貴)\n"
        f"📊 業績週變 <b>{sales_delta:+.1f}%</b>\n\n"
        f"🤖 <b>AI 建議:</b>{safe_action}\n"
        f"📈 信心度:{confidence:.0%}\n"
        f"━━━━━━━━━━━━━━━━━━━━"
    )
|
||
|
||
|
||
def system_alert_msg(level: str, title: str, detail: str, source: str = "") -> str:
    """Build a system-level alert (AIOps, deployment failures and similar).

    ``level`` selects the icon (unknown levels use the warning icon), ``detail``
    is cut to 600 characters, and the current local time is appended.
    """
    icon = {"critical": "🚨", "error": "❌", "warning": "⚠️", "info": "ℹ️"}.get(level, "⚠️")
    header = f"{icon} <b>{title}</b>"
    if source:
        header += f" [{source}]"
    rule = "━━━━━━━━━━━━━━━━━━━━"
    body = f"{detail[:600]}"
    stamp = datetime.now().strftime('%m/%d %H:%M')
    return "\n".join([header, rule, body, rule, f"🕐 {stamp}"])
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
# 📊 報告類模板(三種週期)
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
||
def daily_report_header(date_str: str, revenue: float, wow: float,
                        threat_count: int, opportunity_count: int) -> str:
    """Build the daily report title card (the text sent ahead of the charts)."""
    if wow >= 0:
        trend_icon, sign = "📈", "+"
    else:
        trend_icon, sign = "📉", ""
    rule = "══════════════════════════"
    rows = [
        f"📊 <b>EwoooC 電商日報</b> · {date_str}",
        rule,
        f"💰 今日業績 <b>NT${revenue/10000:.1f} 萬</b>",
        f"{trend_icon} 週同比 <b>{sign}{wow:.1f}%</b>",
        f"🔴 競品威脅 <b>{threat_count}</b> 個 SKU",
        f"🟢 市場機會 <b>{opportunity_count}</b> 個 SKU",
        rule,
        "🤖 <i>Hermes + NemoTron + OpenClaw 聯合分析</i>",
    ]
    return "\n".join(rows)
|
||
|
||
|
||
def weekly_report_header(period: str, curr_rev: float, prev_rev: float,
                         wow: float, top_category: str) -> str:
    """Build the weekly report title card."""
    if wow >= 0:
        trend_icon = "📈"
    else:
        trend_icon = "📉"
    rule = "══════════════════════════"
    rows = [
        f"📊 <b>EwoooC 電商週報</b> · {period}",
        rule,
        f"💰 本週業績 <b>NT${curr_rev/10000:.1f} 萬</b>",
        f"📦 前週業績 <b>NT${prev_rev/10000:.1f} 萬</b>",
        f"{trend_icon} 週成長率 <b>{wow:+.1f}%</b>",
        f"🏆 最強品類 <b>{top_category}</b>",
        rule,
        "🤖 <i>OpenClaw × MCP 全景策略分析</i>",
    ]
    return "\n".join(rows)
|
||
|
||
|
||
def monthly_report_header(month_str: str, revenue: float, mom: float,
                          yoy: float, top3_categories: List[str]) -> str:
    """Build the monthly report title card.

    The year-over-year icon is a rocket at 10 or above, an up-chart at 0 or
    above, and a down-chart otherwise. Only the first three categories are
    shown.
    """
    mom_icon = "📈" if mom >= 0 else "📉"
    if yoy >= 10:
        yoy_icon = "🚀"
    elif yoy >= 0:
        yoy_icon = "📈"
    else:
        yoy_icon = "📉"
    leading = top3_categories[:3]
    cats = " / ".join(leading)
    rule = "══════════════════════════"
    rows = [
        f"📅 <b>EwoooC 電商月報</b> · {month_str}",
        rule,
        f"💰 月度業績 <b>NT${revenue/10000:.0f} 萬</b>",
        f"{mom_icon} 月成長率 <b>{mom:+.1f}%</b> {yoy_icon} 年成長 <b>{yoy:+.1f}%</b>",
        f"🏆 TOP 3 品類 <b>{cats}</b>",
        rule,
        "🤖 <i>OpenClaw × Hermes × MCP 月度全景洞察</i>",
    ]
    return "\n".join(rows)
|
||
|
||
|
||
def report_section(icon: str, title: str, lines: List[str]) -> str:
    """Build a generic report section (shared by daily, weekly and monthly reports).

    The result starts with a newline, then the bold heading, then each entry on
    its own line prefixed by one space.
    """
    indented = [f" {entry}" for entry in lines]
    heading = f"{icon} <b>{title}</b>"
    return "\n" + heading + "\n" + "\n".join(indented)
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
# 💰 決策類模板
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
||
def price_decision(product_name: str, product_sku: str,
                   current_price: float, suggested_price: float,
                   reason: str, insight_id: Optional[int] = None,
                   report_url: Optional[str] = None) -> tuple:
    """Build a price decrease / increase decision notice with an inline keyboard.

    Args:
        product_name: Product name (HTML-escaped).
        product_sku: Product SKU (HTML-escaped).
        current_price: Current price.
        suggested_price: Suggested price; the difference selects the wording
            (decrease / increase / keep) and the arrow icon.
        reason: Rationale text; ``<br>`` variants become newlines, then the
            text is HTML-escaped.
        insight_id: Insight id, used as the callback suffix when truthy.
        report_url: Optional link to the analysis report.

    Returns:
        ``(message, keyboard)`` where ``keyboard`` is an ``inline_keyboard``
        dict with an approve and a reject button.
    """
    diff = current_price - suggested_price
    if diff > 0:
        direction, action_text = "📉", f"降價 NT${diff:,.0f}"
    elif diff < 0:
        direction, action_text = "📈", f"提價 NT${-diff:,.0f}"
    else:
        direction, action_text = "➡️", "維持現價"
    safe_name = escape(str(product_name or ""))
    safe_sku = escape(str(product_sku or ""))
    safe_reason = escape(_normalize_telegram_html_linebreaks(str(reason or "")))

    message = (
        f"💰 <b>AI 定價決策建議</b>\n"
        f"━━━━━━━━━━━━━━━━━━━━\n"
        f"🏷️ <b>{safe_name}</b> <code>{safe_sku}</code>\n\n"
        f"現價:<b>NT${current_price:,.0f}</b>\n"
        f"建議:<b>NT${suggested_price:,.0f}</b> {direction} {action_text}\n\n"
        f"💡 <b>依據:</b>{safe_reason}\n"
    )
    if insight_id:
        message += f"🔗 洞察 ID:<code>{insight_id}</code>\n"
    if report_url:
        message += f"📎 <a href=\"{escape(str(report_url), quote=True)}\">查看分析報表</a>\n"
    message += "━━━━━━━━━━━━━━━━━━━━"

    # ADR-012: callback_data uses the short prefix momo:pa / momo:pr followed by
    # the insight id, which stays within 64 bytes and matches the L2 handler.
    # Fallback when insight_id is missing (callers are expected to supply it):
    # use the SKU as the suffix. The 40-character cut is kept, and the result
    # is additionally clamped by UTF-8 bytes so a multi-byte SKU cannot push
    # callback_data past the 64-byte limit.
    if insight_id:
        _cb_id = str(insight_id)
    else:
        _cb_id = _callback_payload_utf8(f"sku_{product_sku}"[:40])
    keyboard = {"inline_keyboard": [[
        {"text": "✅ 確認執行", "callback_data": f"momo:pa:{_cb_id}"},
        {"text": "❌ 拒絕", "callback_data": f"momo:pr:{_cb_id}"},
    ]]}
    return message, keyboard
|
||
|
||
|
||
def batch_decision_msg(items: List[Dict], batch_id: str) -> tuple:
    """Build a batch pricing decision (several SKUs confirmed in one step).

    At most 6 items are listed; the rest are summarised in one line. Item names
    are HTML-escaped, missing or ``None`` names and prices are treated as empty
    / 0, and an unchanged price shows a neutral arrow.

    Args:
        items: Dicts read via ``name``, ``current_price`` and ``suggested_price``.
        batch_id: Batch identifier used as the callback suffix.

    Returns:
        ``(message, keyboard)`` where ``keyboard`` is an ``inline_keyboard``
        dict with a confirm-all and a cancel button.
    """
    total = len(items)
    lines = [f"💰 <b>批次定價決策</b> · 共 {total} 個 SKU\n━━━━━━━━━━━━━━━━━━━━"]
    for i, item in enumerate(items[:6], 1):
        current = item.get("current_price") or 0
        suggested = item.get("suggested_price") or 0
        diff = current - suggested
        direction = "📉" if diff > 0 else "📈" if diff < 0 else "➡️"
        name = escape(str(item.get("name") or "")[:20])
        lines.append(
            f"{i}. {direction} <b>{name}</b>\n"
            f" ${current:,.0f} → <b>${suggested:,.0f}</b>"
        )
    if total > 6:
        lines.append(f"<i>…另有 {total-6} 個 SKU</i>")
    lines.append("━━━━━━━━━━━━━━━━━━━━")
    # ADR-012: short prefixes bpa = batch_price_approve, bpr = batch_price_reject,
    # leaving headroom under the 64-byte callback_data limit.
    # NOTE(review): the cut is by characters, so this assumes ASCII batch ids — confirm.
    _bid = str(batch_id)[:48]
    keyboard = {"inline_keyboard": [[
        {"text": f"✅ 全部確認({total}項)",
         "callback_data": f"momo:bpa:{_bid}"},
        {"text": "❌ 取消",
         "callback_data": f"momo:bpr:{_bid}"},
    ]]}
    return "\n".join(lines), keyboard
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
# 🧠 Phase 11 RAG 反饋(v5.0 護欄 #1:強制晉升門檻 — Stage 4 人工驗收)
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
||
def rag_feedback_keyboard(rag_query_log_id: int) -> dict:
    """Build the thumbs-up / thumbs-down feedback keyboard shown after a RAG hit.

    callback_data layout (callback prefix ``rag_fb:``, short-prefix style per
    ADR-012, within 64 bytes):
        rag_fb:{log_id}:5  -> thumbs up   (feedback_score=5)
        rag_fb:{log_id}:1  -> thumbs down (feedback_score=1)

    A falsy id is encoded as 0.

    Usage example (after the caller obtains a RAGResult):
        from services.telegram_templates import rag_feedback_keyboard
        send_message(chat_id, rag.synthesize(),
                     keyboard=rag_feedback_keyboard(rag.log_id))
    """
    log_id = 0
    if rag_query_log_id:
        log_id = int(rag_query_log_id)
    buttons = [
        {'text': label, 'callback_data': f'rag_fb:{log_id}:{score}'}
        for label, score in (('👍 有用', 5), ('👎 沒用', 1))
    ]
    return {'inline_keyboard': [buttons]}
|
||
|
||
|
||
def promotion_review_keyboard(episode_id: int) -> dict:
    """Build the manual-review keyboard for high-weight promotions (PromotionGate stage 4).

    callback_data:
        pg_ok:{episode_id}  -> approve -> write to ai_insights
        pg_no:{episode_id}  -> reject  -> rejected_human

    A falsy id is encoded as 0.
    """
    target = 0
    if episode_id:
        target = int(episode_id)
    approve = {'text': '✅ 通過晉升', 'callback_data': f'pg_ok:{target}'}
    reject = {'text': '🚫 駁回', 'callback_data': f'pg_no:{target}'}
    return {'inline_keyboard': [[approve, reject]]}
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
# 🤖 系統類模板
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
||
def deploy_msg(status: str, branch: str, commit: str, duration: str = "") -> str:
    """Build a CI/CD deployment notice.

    ``status`` selects the icon (unknown values use the info icon) and is shown
    upper-cased; ``commit`` is cut to 12 characters; the current local time is
    appended.
    """
    icon = {"success": "✅", "failed": "❌", "started": "🚀"}.get(status, "ℹ️")
    header = f"{icon} <b>部署{status.upper()}</b>"
    if duration:
        header += f" 耗時 {duration}"
    rows = [
        header,
        "━━━━━━━━━━━━━━━━━━━━",
        f"🌿 分支:<code>{branch}</code>",
        f"🔖 Commit:<code>{commit[:12]}</code>",
        f"🕐 {datetime.now().strftime('%m/%d %H:%M')}",
    ]
    return "\n".join(rows)
|
||
|
||
|
||
def heal_msg(status: str, error_type: str, target_file: str,
             commit_sha: str = "", diff_lines: int = 0) -> str:
    """Build an AiderHeal auto-repair notice.

    ``status`` selects the icon and title (``started``, ``success``,
    ``reverted``; anything else is reported as a failure). ``error_type``,
    ``target_file`` and the commit hash are HTML-escaped because they are free
    text placed inside ``<code>``. The commit line and the changed-line count
    are only included when truthy; the current local time is appended.
    """
    if status == "started":
        icon, title = "🔧", "AiderHeal 啟動"
    elif status == "success":
        icon, title = "✅", "AiderHeal 修復完成"
    elif status == "reverted":
        icon, title = "🔄", "AiderHeal 自動回滾"
    else:
        icon, title = "❌", "AiderHeal 失敗"

    lines = [f"{icon} <b>{title}</b>", "━━━━━━━━━━━━━━━━━━━━",
             f"🐛 錯誤類型:<code>{escape(str(error_type))}</code>",
             f"📄 目標檔案:<code>{escape(str(target_file))}</code>"]
    if commit_sha:
        lines.append(f"🔖 Commit:<code>{escape(str(commit_sha)[:12])}</code>")
    if diff_lines:
        lines.append(f"📝 修改行數:{diff_lines} 行")
    lines += ["━━━━━━━━━━━━━━━━━━━━",
              f"🕐 {datetime.now().strftime('%m/%d %H:%M')}"]
    return "\n".join(lines)
|
||
|
||
|
||
def meta_analysis_msg(period: str, content: str) -> str:
    """Build the AI system self-review report.

    ``content`` is cut to 1200 characters; the footer carries the current local
    time.
    """
    rule = "══════════════════════════"
    rows = [
        f"🤖 <b>AI 系統效能自我審視</b> · {period}",
        rule,
        f"{content[:1200]}",
        rule,
        f"<i>由 OpenClaw 定期分析 · {datetime.now().strftime('%m/%d %H:%M')}</i>",
    ]
    return "\n".join(rows)
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
# 💡 洞察類模板
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
||
def _short_text(value: Any, limit: int = 120) -> str:
|
||
text = str(value or "").strip()
|
||
if len(text) <= limit:
|
||
return text
|
||
return text[: max(0, limit - 1)].rstrip() + "…"
|
||
|
||
|
||
def _split_action_parts(action: Any) -> List[str]:
|
||
return [
|
||
part.strip()
|
||
for part in re.split(r"\s*[||]\s*", str(action or ""))
|
||
if part and part.strip()
|
||
]
|
||
|
||
|
||
def _parse_ea_action(action: Any) -> Dict[str, Any]:
    """Parse one pipe-separated EA action string into a field dict.

    The first part becomes ``title`` (and ``sku`` / ``name`` when it has the
    form ``[SKU] name``). Each later part is classified by the first rule that
    matches: a MOMO/PChome price comparison, a PChome id, a suggestion, an
    evidence note, an impact statement, or otherwise a free-form note.
    """
    raw_text = str(action or "")
    segments = _split_action_parts(action)
    parsed: Dict[str, Any] = {
        "raw": raw_text,
        "title": segments[0] if segments else raw_text,
        "notes": [],
    }
    bracketed = re.match(r"^\[([^\]]+)\]\s*(.+)$", parsed["title"])
    if bracketed:
        parsed["sku"] = bracketed.group(1).strip()
        parsed["name"] = bracketed.group(2).strip()

    for segment in segments[1:]:
        # Price comparison: mentions both marketplaces.
        if "MOMO" in segment and "PChome" in segment:
            parsed["comparison"] = segment
            momo_hit = re.search(r"MOMO\s*\$([0-9,]+)", segment)
            pchome_hit = re.search(r"PChome\s*\$([0-9,]+)", segment)
            pct_hit = re.search(r"\(([+-]?\d+(?:\.\d+)?)%\)", segment)
            if momo_hit:
                parsed["momo_price"] = momo_hit.group(1)
            if pchome_hit:
                parsed["pchome_price"] = pchome_hit.group(1)
            if pct_hit:
                gap = float(pct_hit.group(1))
                parsed["gap_pct"] = gap
                parsed["gap_pct_text"] = f"{gap:+.1f}%"
            continue

        if segment.startswith("PChome "):
            parsed["pchome_id"] = segment.replace("PChome", "", 1).strip()
            continue

        if segment.startswith("建議"):
            parsed["action"] = segment
            continue

        if segment.startswith("證據"):
            parsed["evidence"] = segment.replace("證據", "", 1).strip(" ::")
            continue

        if any(marker in segment for marker in ("NT$", "流失", "價差", "價格優勢")):
            parsed["impact"] = segment
            amount_hit = re.search(r"NT\$\s*([0-9,]+)", segment)
            if amount_hit:
                try:
                    parsed["impact_amount"] = int(amount_hit.group(1).replace(",", ""))
                except ValueError:
                    # A match made only of commas has no digits; leave the amount unset.
                    pass
            continue

        parsed["notes"].append(segment)
    return parsed
|
||
|
||
|
||
def _format_ea_risk_summary(actions: List[Dict[str, Any]]) -> List[str]:
|
||
count = len(actions)
|
||
gap_values = [a["gap_pct"] for a in actions if isinstance(a.get("gap_pct"), (int, float))]
|
||
amounts = [a["impact_amount"] for a in actions if isinstance(a.get("impact_amount"), int)]
|
||
lines = [f"• 待審 SKU:<b>{count}</b> 件"]
|
||
if gap_values:
|
||
low, high = min(gap_values), max(gap_values)
|
||
if low == high:
|
||
lines.append(f"• 價差幅度:<b>{low:+.1f}%</b>")
|
||
else:
|
||
lines.append(f"• 價差範圍:<b>{low:+.1f}%~{high:+.1f}%</b>")
|
||
if amounts:
|
||
lines.append(f"• 最大單件價差:<b>NT$ {max(amounts):,}</b>")
|
||
lines.append("• 核心判斷:先確認同款 identity_v2,再決定跟價、促銷或曝光")
|
||
return lines
|
||
|
||
|
||
def _is_ea_sku_action(item: Dict[str, Any]) -> bool:
|
||
return bool(
|
||
item.get("sku")
|
||
or item.get("comparison")
|
||
or item.get("momo_price")
|
||
or item.get("pchome_price")
|
||
)
|
||
|
||
|
||
def _format_ea_action_card(item: Dict[str, Any], index: int) -> List[str]:
    """Render one parsed SKU action as a multi-line card.

    The heading is always present; price, gap/impact, evidence, action and
    PChome-id lines are added only when the matching field is set.
    """
    safe_sku = escape(str(item.get("sku") or ""))
    safe_name = escape(_short_text(item.get("name") or item.get("title") or "", 58))
    if safe_sku:
        card = [f"<b>{index}. [{safe_sku}] {safe_name}</b>"]
    else:
        card = [f"<b>{index}. {safe_name}</b>"]

    momo_price = item.get("momo_price")
    pchome_price = item.get("pchome_price")
    if momo_price or pchome_price:
        # A missing side is shown as a dash.
        momo_text = f"${momo_price}" if momo_price else "—"
        pchome_text = f"${pchome_price}" if pchome_price else "—"
        card.append(f" MOMO:<b>{momo_text}</b> PChome:<b>{pchome_text}</b>")

    gap_text = item.get("gap_pct_text")
    impact = item.get("impact")
    if gap_text or impact:
        impact_text = escape(str(impact or ""))
        if not gap_text:
            card.append(f" 影響:{impact_text}")
        elif impact_text:
            card.append(f" 價差:<b>{escape(str(gap_text))}</b> {impact_text}")
        else:
            card.append(f" 價差:<b>{escape(str(gap_text))}</b>")

    evidence = item.get("evidence")
    if evidence:
        card.append(f" 證據:{escape(_short_text(evidence, 96))}")

    # Drop the leading suggestion keyword and any separator left behind it.
    action_text = str(item.get("action") or "").replace("建議", "", 1).strip(" ::")
    if action_text:
        card.append(f" 動作:{escape(_short_text(action_text, 86))}")

    if item.get("pchome_id"):
        card.append(f" PChome:<code>{escape(_short_text(item['pchome_id'], 40))}</code>")
    return card
|
||
|
||
|
||
def _format_ea_generic_action(item: Dict[str, Any], index: int) -> str:
    """Render a non-SKU action as one numbered, escaped line of at most 140 characters."""
    body = item.get("raw") or item.get("title") or ""
    shortened = _short_text(body, 140)
    return f"{index}. {escape(shortened)}"
|
||
|
||
|
||
def _format_ea_escalation_alert(
    *,
    base_event: Dict[str, Any],
    tier_label: str,
    ai_summary: str,
    ai_cause: Optional[str],
    ai_actions: Optional[list],
) -> str:
    """Build the EA escalation alert message.

    Sections, in order: header (tier, title, event type), decision status
    (event summary plus up to three ``|``-separated causes), the decision
    envelope when the event carries a non-empty dict under
    ``decision_envelope`` or ``decision``, the AI background summary, and then
    one of three bodies: SKU cards (up to five) with a risk summary, a list of
    generic actions (up to five), or a bare recommendation block.
    """
    safe_type = escape(str(base_event.get("event_type", "ea_escalation")))
    safe_title = escape(str(base_event.get("title", "EA 升級審核")))
    safe_summary = escape(str(base_event.get("summary", "")))

    causes: List[str] = []
    for fragment in str(ai_cause or "").split("|"):
        fragment = fragment.strip()
        if fragment:
            causes.append(escape(fragment))

    # Partition the parsed actions into SKU-level and generic ones.
    sku_items: List[Dict[str, Any]] = []
    other_items: List[Dict[str, Any]] = []
    for raw_action in (ai_actions or []):
        parsed = _parse_ea_action(raw_action)
        if _is_ea_sku_action(parsed):
            sku_items.append(parsed)
        else:
            other_items.append(parsed)
    visible = sku_items[:5]
    overflow = max(0, len(sku_items) - len(visible))
    envelope = base_event.get("decision_envelope") or base_event.get("decision")

    out: List[str] = [
        f"⚡ <b>{escape(str(tier_label))}</b>",
        f"📌 <b>{safe_title}</b>",
        f"<code>{safe_type}</code>",
        "━━━━━━━━━━━━━━━━━━━━",
        "🧭 <b>決策狀態</b>",
    ]
    if safe_summary:
        out.append(f"• {safe_summary}")
    out.extend(f"• {cause}" for cause in causes[:3])

    if isinstance(envelope, dict) and envelope:
        envelope_lines = _format_decision_envelope(envelope)
        out.append("")
        out.extend(envelope_lines)

    if ai_summary:
        out.append("")
        out.append("🧠 <b>背景摘要</b>")
        out.append(f"• {escape(_short_text(ai_summary, 280))}")

    if sku_items:
        risk_lines = _format_ea_risk_summary(sku_items)
        out += ["", "📊 <b>風險摘要</b>"]
        out += risk_lines
        out += ["", "📋 <b>TOP 待審 SKU</b>"]
        for position, card in enumerate(visible, start=1):
            if position > 1:
                # Blank line between consecutive cards.
                out.append("")
            out.extend(_format_ea_action_card(card, position))
        if overflow:
            out.append(f"\n<i>另有 {overflow} 件,請至觀測台查看完整清單。</i>")
        out += [
            "",
            "✅ <b>建議處置</b>",
            "• 先人工確認 PChome identity_v2 與規格一致",
            "• 同款:評估跟價、組合促銷或加強 PChome 價格優勢曝光",
            "• 非同款:標記待審,避免進入自動調價或簡報決策",
        ]
    elif other_items:
        generic_lines = [
            _format_ea_generic_action(entry, position)
            for position, entry in enumerate(other_items[:5], start=1)
        ]
        out += ["", "📋 <b>待確認事項</b>"]
        out += generic_lines
        out += [
            "",
            "✅ <b>建議處置</b>",
            "• 先確認資料來源、最近錯誤紀錄與觀測台狀態",
            "• 補齊可審核證據後再批准執行",
            "• 未取得實證前,不執行自動調價、修復或策略派發",
        ]
    else:
        out += [
            "",
            "✅ <b>建議處置</b>",
            "• 先確認資料來源與最近錯誤紀錄",
            "• 補齊可審核證據後再批准執行",
        ]

    return "\n".join(out)
|
||
|
||
|
||
def _numeric_value(value: Any) -> Optional[float]:
|
||
if value is None or value == "":
|
||
return None
|
||
try:
|
||
text = str(value).strip().replace(",", "")
|
||
text = text.replace("NT$", "").replace("$", "").replace("%", "")
|
||
return float(text)
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
|
||
def _format_money_value(value: Any) -> str:
    """Format a positive amount as ``NT$ 1,234`` (or with two decimals when fractional).

    Returns the empty string when the value is not numeric or is not positive.
    """
    amount = _numeric_value(value)
    if amount is None or amount <= 0:
        return ""
    whole = int(amount)
    if amount == whole:
        return f"NT$ {whole:,}"
    return f"NT$ {amount:,.2f}"
|
||
|
||
|
||
def _format_percent_value(value: Any) -> str:
    """Format a numeric value as a signed one-decimal percentage.

    Non-numeric input is returned as stripped, HTML-escaped text (empty when
    there is nothing to show).
    """
    number = _numeric_value(value)
    if number is not None:
        return f"{number:+.1f}%"
    fallback = str(value or "").strip()
    if not fallback:
        return ""
    return escape(fallback)
|
||
|
||
|
||
def _evidence_items(envelope: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||
raw_items = envelope.get("evidence") if isinstance(envelope.get("evidence"), list) else []
|
||
return [item for item in raw_items if isinstance(item, dict)]
|
||
|
||
|
||
def _find_evidence(envelope: Dict[str, Any], metric: str) -> Optional[Dict[str, Any]]:
    """Return the first evidence entry whose ``metric`` (or, failing that, ``type``) equals ``metric``."""
    matches = (
        entry
        for entry in _evidence_items(envelope)
        if str(entry.get("metric") or entry.get("type") or "") == metric
    )
    return next(matches, None)
|
||
|
||
|
||
def _is_price_decision_envelope(envelope: Dict[str, Any]) -> bool:
    """Return True when the envelope describes a price / competitor decision.

    That is the case when ``decision_type`` is one of the known price review
    types, when the subject carries a competitor product id or price, or when
    a gap-percentage evidence entry is present.
    """
    kind = str(envelope.get("decision_type") or "").lower()
    if kind in {"price_alert", "pchome_match_review", "competitor_price_review"}:
        return True

    subject = envelope.get("subject")
    if not isinstance(subject, dict):
        subject = {}
    subject_keys = ("competitor_product_id", "competitor_price", "pchome_price")
    if any(subject.get(key) for key in subject_keys):
        return True

    gap_metrics = ("candidate_gap_pct", "unit_price_gap_pct")
    return any(_find_evidence(envelope, metric) for metric in gap_metrics)
|
||
|
||
|
||
def _action_label(action_code: str) -> str:
|
||
labels = {
|
||
"price_follow_review": "確認是否跟價或改用促銷防守",
|
||
"review_accept_identity": "人工確認同款後採納 identity",
|
||
"review_catalog_comparable": "依型錄證據覆核可比性",
|
||
"unit_price_required": "改用單位價覆核,不寫總價型價差",
|
||
"identity_or_price_review": "先確認身份,再判斷價格處置",
|
||
"verify_or_reject_identity": "確認候選是否同款;非同款即駁回",
|
||
"compare_existing_identity": "比較既有正式 identity 與新候選",
|
||
"refresh_or_compare_identity": "刷新過期 identity 後再覆核",
|
||
"needs_research": "補搜尋或補證據後再判斷",
|
||
"human_review": "人工覆核",
|
||
}
|
||
return labels.get(action_code or "", action_code or "人工覆核")
|
||
|
||
|
||
# Display labels for the match_type code of a price decision.
_PRICE_MATCH_TYPE_LABELS = {
    "exact": "高信心同款",
    "same_product_different_pack": "同商品不同包裝",
    "same_line_variant": "同系列不同款",
    "comparable": "可比但需覆核",
    "no_match": "非同款",
}
# Display labels for the price_basis code (what kind of price is comparable).
_PRICE_BASIS_LABELS = {
    "total_price": "總價可比",
    "unit_price": "單位價可比",
    "manual_review": "人工覆核後可比",
    "none": "不可比",
}
# Display labels for the alert_tier code (how the notification is graded).
_PRICE_ALERT_TIER_LABELS = {
    "price_alert_exact": "可直接價格告警",
    "unit_price_review": "單位價覆核",
    "identity_review": "身份覆核",
    "suppress": "壓制告警",
}
|
||
|
||
|
||
def _price_match_path(envelope: Dict[str, Any]) -> tuple[str, str, str]:
    """Return ``(match_type, price_basis, alert_tier)`` for a decision envelope.

    Values come from ``guardrails`` first. When any of the three is missing,
    the ``basis`` text of the ``match_score`` evidence entry is split on ``/``
    and, if it has at least three parts, those fill in the missing values
    positionally.
    """
    guardrails = envelope.get("guardrails")
    if not isinstance(guardrails, dict):
        guardrails = {}
    match_type, price_basis, alert_tier = (
        str(guardrails.get(field) or "")
        for field in ("match_type", "price_basis", "alert_tier")
    )
    if not (match_type and price_basis and alert_tier):
        score_entry = _find_evidence(envelope, "match_score") or {}
        basis_text = str(score_entry.get("basis") or "")
        segments = [piece.strip() for piece in basis_text.split("/") if piece.strip()]
        if len(segments) >= 3:
            return (
                match_type or segments[0],
                price_basis or segments[1],
                alert_tier or segments[2],
            )
    return match_type, price_basis, alert_tier
|
||
|
||
|
||
def _price_notification_guidance(match_type: str, price_basis: str, alert_tier: str) -> tuple[str, str]:
|
||
if match_type == "exact" and price_basis == "total_price" and alert_tier == "price_alert_exact":
|
||
return "直接價格威脅", "可用總價比較;先確認庫存與促銷期,再人工決定跟價或促銷防守。"
|
||
if price_basis == "unit_price" or alert_tier == "unit_price_review":
|
||
return "單位價覆核", "先換算單位價與入數,禁止用總價直接判定價格威脅。"
|
||
if alert_tier == "identity_review" or price_basis == "manual_review":
|
||
return "身份覆核", "先確認同款、規格、組合與前台狀態,人工採納後才可寫入正式價差。"
|
||
if alert_tier == "suppress" or match_type == "no_match" or price_basis == "none":
|
||
return "壓制告警", "目前不可作為價格威脅;保留診斷紀錄,避免誤報。"
|
||
return "可比性待判讀", "依比對證據人工覆核,未確認前不自動調價、不覆蓋正式 identity。"
|
||
|
||
|
||
def _format_price_decision_envelope(envelope: Dict[str, Any]) -> List[str]:
    """Lay out a price / competitor decision envelope as a readable Telegram brief.

    Sections are emitted in a fixed order, and every optional section is
    skipped when it has nothing to show: header, notification tier, target,
    price evidence, match evidence, difference highlights, next human step,
    trace.

    Args:
        envelope: Decision envelope dict. Sub-sections that are not dicts are
            treated as empty.

    Returns:
        Message lines for HTML parse_mode, terminated by one empty string.
    """

    def _section(key: str) -> Dict[str, Any]:
        # Sub-section accessor: anything that is not a dict counts as empty.
        value = envelope.get(key)
        return value if isinstance(value, dict) else {}

    subject = _section("subject")
    expected = _section("expected_impact")
    guardrails = _section("guardrails")
    recommended_action = _section("recommended_action")

    # ── Header ────────────────────────────────────────────────────────────
    severity = escape(str(envelope.get("severity") or "info"))
    decision_type = escape(str(envelope.get("decision_type") or "price_review"))
    confidence = _numeric_value(envelope.get("confidence"))
    data_quality = escape(str(guardrails.get("data_quality") or envelope.get("data_quality") or "unknown"))
    auto_label = "允許" if bool(guardrails.get("can_auto_execute", False)) else "不允許"
    blocked_reason = escape(str(guardrails.get("blocked_reason") or ""))
    confidence_text = "" if confidence is None else f" 信心度:<b>{confidence:.0%}</b>"

    lines: List[str] = [
        "🧭 <b>決策信封</b>",
        f"• 類型:<code>{decision_type}</code> 嚴重度:<b>{severity}</b>{confidence_text}",
        f"• 資料品質:<code>{data_quality}</code> 自動執行:<b>{auto_label}</b>",
    ]
    if blocked_reason:
        lines.append(f"• 邊界:{blocked_reason}")

    # ── Notification tier ─────────────────────────────────────────────────
    match_type, price_basis, alert_tier = _price_match_path(envelope)
    if any((match_type, price_basis, alert_tier)):
        guidance_title, guidance_text = _price_notification_guidance(match_type, price_basis, alert_tier)
        coded = (
            (_PRICE_MATCH_TYPE_LABELS, match_type),
            (_PRICE_BASIS_LABELS, price_basis),
            (_PRICE_ALERT_TIER_LABELS, alert_tier),
        )
        # Unknown codes are shown verbatim; empty codes are left out of the path.
        labels = [table.get(code, code) for table, code in coded if code]
        path_text = " / ".join(label for label in labels if label)
        lines.extend([
            "",
            "🚦 <b>通知分級</b>",
            f"• 判讀:<b>{escape(guidance_title)}</b>",
            f"• 路徑:{escape(path_text)}",
            f"• 邊界:{escape(guidance_text)}",
        ])

    # ── Target ────────────────────────────────────────────────────────────
    sku = escape(str(subject.get("sku") or ""))
    name = escape(_short_text(subject.get("name") or "", 96))
    competitor_id = escape(str(subject.get("competitor_product_id") or subject.get("pchome_id") or ""))
    competitor_name = escape(
        _short_text(subject.get("competitor_product_name") or subject.get("pchome_name") or "", 96)
    )
    target_candidates = (
        (sku, f"• SKU:<code>{sku}</code>"),
        (name, f"• MOMO:{name}"),
        (competitor_id, f"• PChome:<code>{competitor_id}</code>"),
        (competitor_name, f"• 候選:{competitor_name}"),
    )
    target_lines = [text for present, text in target_candidates if present]
    if target_lines:
        lines.extend(["", "🎯 <b>標的</b>", *target_lines])

    # ── Price evidence ────────────────────────────────────────────────────
    # Each price is the first truthy candidate; the evidence list is the last resort.
    momo_price = subject.get("momo_price") or expected.get("momo_price")
    if not momo_price:
        momo_price = (_find_evidence(envelope, "momo_price") or {}).get("value")

    competitor_sources = (
        (subject, "competitor_price"),
        (subject, "pchome_price"),
        (expected, "competitor_price"),
        (expected, "candidate_price"),
        (expected, "pchome_price"),
    )
    for holder, field in competitor_sources:
        competitor_price = holder.get(field)
        if competitor_price:
            break
    else:
        competitor_price = (_find_evidence(envelope, "pchome_price") or {}).get("value")

    # A gap of exactly 0 is meaningful, so only None triggers the evidence fallback.
    gap_pct = expected.get("candidate_gap_pct")
    if gap_pct is None:
        gap_pct = (_find_evidence(envelope, "candidate_gap_pct") or {}).get("value")

    price_lines: List[str] = []
    momo_price_text = _format_money_value(momo_price)
    competitor_price_text = _format_money_value(competitor_price)
    if momo_price_text or competitor_price_text:
        price_lines.append(
            f"• MOMO:<b>{momo_price_text or '—'}</b> PChome:<b>{competitor_price_text or '—'}</b>"
        )
    gap_text = _format_percent_value(gap_pct)
    gap_amount_text = _format_money_value(expected.get("gap_amount"))
    if gap_text or gap_amount_text:
        gap_parts = ([f"<b>{gap_text}</b>"] if gap_text else []) + ([gap_amount_text] if gap_amount_text else [])
        price_lines.append(f"• 價差:{' / '.join(gap_parts)}(正值代表 MOMO 較貴)")

    unit_insight = expected.get("unit_price_insight")
    if isinstance(unit_insight, dict) and unit_insight:
        unit_summary = escape(_short_text(unit_insight.get("summary") or "", 120))
        if unit_summary:
            price_lines.append(f"• 單位價:{unit_summary}")
    if price_lines:
        lines.extend(["", "📊 <b>價格證據</b>", *price_lines])

    # ── Match evidence ────────────────────────────────────────────────────
    evidence_lines: List[str] = []
    match_evidence = _find_evidence(envelope, "match_score")
    if match_evidence:
        raw_score = match_evidence.get("value")
        basis = escape(str(match_evidence.get("basis") or ""))
        score_text = escape(str(raw_score if raw_score is not None else ""))
        basis_suffix = f" {basis}" if basis else ""
        evidence_lines.append(f"• Match:<code>{score_text}</code>" + basis_suffix)
    reason_evidence = _find_evidence(envelope, "reasons")
    if reason_evidence:
        evidence_lines.append(f"• 診斷:{escape(_short_text(reason_evidence.get('value') or '', 130))}")
    conflict = expected.get("existing_match_conflict")
    if isinstance(conflict, dict) and conflict:
        incoming = escape(str(conflict.get("incoming_product_id") or "unknown"))
        existing = escape(str(conflict.get("existing_product_id") or "unknown"))
        delta_value = _numeric_value(conflict.get("score_delta"))
        delta = "" if delta_value is None else f"{delta_value:+.3f}"
        delta_suffix = f" delta {delta}" if delta else ""
        evidence_lines.append(
            f"• 既有保護:新候選 <code>{incoming}</code> vs 既有 <code>{existing}</code>" + delta_suffix
        )
    if evidence_lines:
        lines.extend(["", "🧩 <b>比對證據</b>", *evidence_lines])

    # ── Difference highlights (first three rows only) ─────────────────────
    difference_highlights = envelope.get("difference_highlights")
    if isinstance(difference_highlights, list) and difference_highlights:
        diff_lines: List[str] = []
        for row in difference_highlights[:3]:
            if not isinstance(row, dict):
                continue
            dimension = escape(str(row.get("dimension") or row.get("label") or "差異"))
            left = escape(_short_text(row.get("left") or row.get("momo") or "", 42))
            right = escape(_short_text(row.get("right") or row.get("pchome") or "", 42))
            if left or right:
                diff_lines.append(f"• {dimension}:MOMO {left or '—'} / PChome {right or '—'}")
                continue
            note = escape(_short_text(row.get("note") or row.get("summary") or "", 84))
            if note:
                diff_lines.append(f"• {dimension}:{note}")
        if diff_lines:
            lines.extend(["", "⚖️ <b>差異提醒</b>", *diff_lines])

    # ── Next human step (always present) ──────────────────────────────────
    action_code = str(recommended_action.get("action") or "human_review")
    owner = escape(str(recommended_action.get("owner") or "未指定"))
    hitl_label = "需要" if bool(recommended_action.get("requires_hitl", True)) else "不需要"
    lines.extend([
        "",
        "✅ <b>人工下一步</b>",
        f"• {_action_label(action_code)}",
        f"• 動作:<code>{escape(action_code)}</code> 負責:<b>{owner}</b> HITL:<b>{hitl_label}</b>",
    ])

    # ── Trace ─────────────────────────────────────────────────────────────
    trace = envelope.get("trace")
    if isinstance(trace, dict):
        trace_keys = ("ai_call_id", "insight_id", "action_plan_id", "source", "attempted_at", "model", "provider")
        trace_parts = [f"{key}={trace[key]}" for key in trace_keys if trace.get(key) is not None]
        if trace_parts:
            lines.extend(["", f"<code>{escape(' | '.join(trace_parts))}</code>"])

    return lines + [""]
|
||
|
||
|
||
def _format_decision_envelope(envelope: Dict[str, Any]) -> List[str]:
    """Turn the shared agent decision envelope into a reviewable Telegram block.

    Price / competitor envelopes (as detected by ``_is_price_decision_envelope``)
    are delegated to ``_format_price_decision_envelope``. Everything else is
    rendered generically: header, subject, up to three evidence items,
    recommended action, expected impact and trace.

    Args:
        envelope: Decision envelope. A non-dict or empty value yields ``[]``.

    Returns:
        Message lines for HTML parse_mode, terminated by one empty string, or
        an empty list when there is nothing to render.
    """
    if not isinstance(envelope, dict) or not envelope:
        return []
    if _is_price_decision_envelope(envelope):
        return _format_price_decision_envelope(envelope)

    severity = escape(str(envelope.get("severity") or "info"))
    decision_type = escape(str(envelope.get("decision_type") or "general"))
    confidence = envelope.get("confidence")
    guardrails = envelope.get("guardrails") if isinstance(envelope.get("guardrails"), dict) else {}
    data_quality = escape(str(guardrails.get("data_quality") or envelope.get("data_quality") or "unknown"))
    can_auto_execute = bool(guardrails.get("can_auto_execute", False))
    blocked_reason = escape(str(guardrails.get("blocked_reason") or ""))

    # A confidence value that cannot be read as a number is simply omitted.
    confidence_text = ""
    try:
        if confidence is not None:
            confidence_text = f" 信心度:<b>{float(confidence):.0%}</b>"
    except (TypeError, ValueError):
        confidence_text = ""

    lines = [
        "🧭 <b>決策信封</b>",
        f"• 類型:<code>{decision_type}</code> 嚴重度:<b>{severity}</b>{confidence_text}",
        f"• 資料品質:<code>{data_quality}</code> 自動執行:<b>{'允許' if can_auto_execute else '不允許'}</b>",
    ]
    if blocked_reason:
        lines.append(f"• 邊界:{blocked_reason}")

    subject = envelope.get("subject") if isinstance(envelope.get("subject"), dict) else {}
    if subject:
        sku = escape(str(subject.get("sku") or ""))
        name = escape(_short_text(subject.get("name") or "", 96))
        competitor_id = escape(str(subject.get("competitor_product_id") or ""))
        competitor_name = escape(_short_text(subject.get("competitor_product_name") or "", 96))
        subject_lines = []
        if sku:
            subject_lines.append(f"• SKU:<code>{sku}</code>")
        if name:
            subject_lines.append(f"• 商品:{name}")
        if competitor_id:
            subject_lines.append(f"• PChome:<code>{competitor_id}</code>")
        if competitor_name:
            subject_lines.append(f"• 候選:{competitor_name}")
        if subject_lines:
            lines += ["", "<b>標的</b>", *subject_lines]

    evidence_items = envelope.get("evidence") if isinstance(envelope.get("evidence"), list) else []
    if evidence_items:
        lines += ["", "<b>證據</b>"]
        for item in evidence_items[:3]:
            if not isinstance(item, dict):
                # Truncate BEFORE escaping: slicing escaped text could cut an
                # entity such as "&lt;" in half and corrupt the HTML message.
                lines.append(f"• {escape(str(item)[:180])}")
                continue
            metric = escape(str(item.get("metric") or item.get("type") or "evidence"))
            value = escape(str(item.get("value") if item.get("value") is not None else ""))
            basis = escape(str(item.get("basis") or ""))
            freshness = escape(str(item.get("freshness") or ""))
            item_confidence = item.get("confidence")
            confidence_suffix = ""
            try:
                if item_confidence is not None:
                    confidence_suffix = f" / {float(item_confidence):.0%}"
            except (TypeError, ValueError):
                confidence_suffix = ""
            detail = " / ".join(part for part in (value, basis, freshness) if part)
            lines.append(f"• <code>{metric}</code>{confidence_suffix}" + (f":{detail}" if detail else ""))

    recommended_action = envelope.get("recommended_action")
    if isinstance(recommended_action, dict):
        action = escape(str(recommended_action.get("action") or "human_review"))
        owner = escape(str(recommended_action.get("owner") or "未指定"))
        deadline = escape(str(recommended_action.get("deadline") or ""))
        requires_hitl = bool(recommended_action.get("requires_hitl", True))
        lines += [
            "",
            "<b>建議行動</b>",
            f"• 動作:<code>{action}</code> 負責:<b>{owner}</b>",
            f"• HITL:<b>{'需要' if requires_hitl else '不需要'}</b>" + (f" 期限:{deadline}" if deadline else ""),
        ]

    expected_impact = envelope.get("expected_impact")
    if isinstance(expected_impact, dict) and expected_impact:
        impact_keys = ("revenue_loss_7d", "gap_amount", "cost_usd", "risk_reduction")
        impact_parts = [
            f"{escape(key)}={escape(str(expected_impact[key]))}"
            for key in impact_keys
            if key in expected_impact and expected_impact[key] is not None
        ]
        if impact_parts:
            lines += ["", "<b>預期影響</b>", "• " + " / ".join(impact_parts)]

    trace = envelope.get("trace")
    if isinstance(trace, dict):
        trace_keys = ("ai_call_id", "insight_id", "action_plan_id", "model", "provider")
        trace_parts = [f"{key}={trace[key]}" for key in trace_keys if trace.get(key) is not None]
        if trace_parts:
            lines += ["", f"<code>{escape(' | '.join(trace_parts))}</code>"]

    return lines + [""]
|
||
|
||
|
||
def triaged_alert(base_event: Dict[str, Any], tier_label: str,
                  ai_summary: str, ai_cause: Optional[str] = None,
                  ai_actions: Optional[list] = None,
                  ai_executed: Optional[list] = None) -> tuple:
    """Build the EA L1/L2 autonomous-execution notice plus its inline keyboard.

    ``ea_escalation`` events are delegated to ``_format_ea_escalation_alert``.
    For every other event type the message is assembled here, and all
    caller-supplied text (tier label, event type, title, summaries, actions,
    trace) is HTML-escaped so it cannot break Telegram's HTML parse_mode.

    Args:
        base_event: Event dict; reads ``event_type``, ``title``, ``summary``,
            ``id``, ``decision_envelope`` / ``decision`` and ``trace``.
        tier_label: Tier text shown in the headline.
        ai_summary: AI summary; only the first 400 characters are shown.
        ai_cause: Optional probable-cause text.
        ai_actions: Optional list of suggested actions.
        ai_executed: Optional list of actions already executed.

    Returns:
        ``(message, keyboard)`` where ``keyboard`` is an ``inline_keyboard``
        dict holding a single "ignore this event" button.
    """
    event_type = base_event.get("event_type", "alert")
    event_id = base_event.get("id")
    decision_envelope = base_event.get("decision_envelope") or base_event.get("decision")
    # Fall back to the decision id so the ignore button still has a payload.
    if not event_id and isinstance(decision_envelope, dict):
        event_id = decision_envelope.get("decision_id")

    if event_type == "ea_escalation":
        message = _format_ea_escalation_alert(
            base_event=base_event,
            tier_label=tier_label,
            ai_summary=str(ai_summary or ""),
            ai_cause=ai_cause,
            ai_actions=ai_actions,
        )
    else:
        title = escape(str(base_event.get("title", "")))
        summary = escape(str(base_event.get("summary", "")))
        # Truncate before escaping so an entity is never cut in half.
        safe_ai_summary = escape(str(ai_summary or "")[:400])
        safe_ai_cause = escape(str(ai_cause or "")) if ai_cause else None
        safe_actions = [escape(str(a)) for a in (ai_actions or [])]
        safe_executed = [escape(str(a)) for a in (ai_executed or [])]

        lines = [
            f"⚡ <b>{escape(str(tier_label))} · {escape(str(event_type))}</b>",
            f"📌 {title}",
            "",
        ]
        if summary:
            lines += [f"🔍 <b>概要:</b>{summary}", ""]
        if safe_ai_summary:
            lines += [f"🧠 <b>AI 摘要:</b>{safe_ai_summary}", ""]
        if safe_ai_cause:
            lines += [f"💡 <b>可能原因:</b>{safe_ai_cause}", ""]
        if isinstance(decision_envelope, dict):
            lines += _format_decision_envelope(decision_envelope)
        if safe_actions:
            lines += ["<b>📋 建議行動:</b>"] + [f" • {a}" for a in safe_actions] + [""]
        if safe_executed:
            lines += ["<b>✅ 已執行:</b>"] + [f" • {a}" for a in safe_executed] + [""]

        trace = base_event.get("trace")
        if trace:
            # Tracebacks routinely contain "<module>" and similar; unescaped they
            # are rejected by HTML parse_mode. str() also tolerates non-string traces.
            lines.append(f"<pre>{escape(str(trace)[-400:])}</pre>")
        message = "\n".join(lines)

    # ADR-012: eig=event_ignore; callback_data must stay under Telegram's 64-byte limit.
    _eid = _callback_payload_utf8(event_id, max_bytes=52)
    keyboard = {"inline_keyboard": [
        [{"text": "🛑 忽略此事件",
          "callback_data": f"momo:eig:{_eid}"}],
    ]}
    return message, keyboard
|
||
|
||
|
||
def insight_summary_msg(insights: List[Dict], period: str = "近24h") -> str:
    """Summarise AI insights for the periodic push message.

    Shows at most six insights, one line each, with an icon chosen by
    ``insight_type``, an 80-character content preview and the confidence as a
    percentage. Preview and period are HTML-escaped, and a missing or
    non-numeric confidence is shown as 0% instead of raising.

    Args:
        insights: Insight dicts; reads ``insight_type``, ``content`` and
            ``confidence``. ``None`` is treated as an empty list.
        period: Period label shown in the headline.

    Returns:
        The message text for HTML parse_mode.
    """
    items = insights or []
    n = len(items)
    lines = [
        f"💡 <b>AI 洞察摘要</b> · {escape(str(period))} 共 {n} 筆",
        "━━━━━━━━━━━━━━━━━━━━",
    ]
    type_icons = {
        "price_alert": "🔴", "recommendation": "💰", "weekly_strategy": "📊",
        "meta_analysis": "🤖", "market_opportunity": "🟢", "mcp_cache": "🌐",
    }
    for ins in items[:6]:
        icon = type_icons.get(ins.get("insight_type", ""), "💡")
        # Truncate first, then escape, so the preview stays valid HTML.
        content_preview = escape(str(ins.get("content", ""))[:80].replace("\n", " "))
        try:
            conf = float(ins.get("confidence") or 0)
        except (TypeError, ValueError):
            conf = 0.0
        lines.append(f"{icon} {content_preview}… <i>({conf:.0%})</i>")
    if n > 6:
        lines.append(f"<i>…另有 {n-6} 筆洞察</i>")
    lines.append("━━━━━━━━━━━━━━━━━━━━")
    return "\n".join(lines)
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
# 舊版相容介面(保留供現有程式碼調用)
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
||
def alert(title: str, content: str, actions: Optional[list] = None) -> str:
    """Legacy error alert: ``system_alert_msg("error", ...)`` plus optional action bullets."""
    body = system_alert_msg("error", title, content)
    if not actions:
        return body
    bullet_block = "\n".join(f" • {item}" for item in actions)
    return body + "\n" + bullet_block
|
||
|
||
|
||
def warning(title: str, summary: str, details: Optional[dict] = None) -> str:
    """Legacy warning: ``system_alert_msg("warning", ...)`` with ``key: value`` detail lines appended."""
    detail_rows = [f" {key}: {value}" for key, value in (details or {}).items()]
    detail_block = "\n".join(detail_rows)
    text = summary
    if detail_block:
        text = summary + ("\n" + detail_block)
    return system_alert_msg("warning", title, text)
|
||
|
||
|
||
def info(title: str, module: str, content: str, time: Optional[Any] = None) -> str:
    """Legacy info message: bold title, module tag, optional time, then the content."""
    header = f"ℹ️ <b>{title}</b> [{module}]"
    if time:
        header = f"{header} · {time}"
    return f"{header}\n\n{content}"
|
||
|
||
|
||
def success(title: str, module: str, stats: str = "") -> str:
    """Legacy success message: bold title and module tag, with the stats on the next line."""
    headline = f"✅ <b>{title}</b> [{module}]"
    return "\n".join((headline, f"{stats}"))
|
||
|
||
|
||
def report(title: str, report_type: str, period: str, content_md: str) -> str:
    """Legacy report wrapper: icon + title line, period line, divider, then the content."""
    if report_type == "weekly_strategy":
        icon = "📊"
    elif report_type == "daily":
        icon = "📅"
    elif report_type == "monthly":
        icon = "📆"
    elif report_type == "meta_analysis":
        icon = "🤖"
    else:
        # Unknown report types get the generic clipboard icon.
        icon = "📋"
    parts = (
        f"{icon} <b>{title}</b> ({report_type})",
        f"📅 期間:{period}",
        "══════════════════════════",
        f"{content_md}",
    )
    return "\n".join(parts)
|
||
|
||
|
||
def _send_telegram(msg: str, chat_ids: Optional[list] = None,
                   reply_markup: Optional[Dict[str, Any]] = None) -> bool:
    """Legacy alias that forwards unchanged to ``_send_telegram_raw``."""
    forwarded = {"chat_ids": chat_ids, "reply_markup": reply_markup}
    return _send_telegram_raw(msg, **forwarded)
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
# LLM Token 日報模板(Operation Ollama-First v5.0 — Phase 1 收尾)
|
||
# 對應 services/token_report_service.py
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
||
# Telegram 單訊息上限(保險絲,sendMessage HTML 上限為 4096 字元)
|
||
_DAILY_TOKEN_REPORT_MAX_CHARS = 4096


def daily_token_report(report_html: str,
                       footer_url: Optional[str] = None) -> str:
    """Wrap the LLM token daily report for Telegram (HTML parse_mode).

    This function only appends the footer link and enforces the length cap;
    the report body is expected to arrive already HTML-escaped.

    When the text is too long it is cut so that the result is still valid
    HTML: a tag or entity split by the cut is dropped, and any tags left open
    are closed before the truncation notice is appended.

    Args:
        report_html: Already-escaped HTML report text.
        footer_url: Optional admin link; escaped here before being embedded.

    Returns:
        An HTML string of at most ``_DAILY_TOKEN_REPORT_MAX_CHARS`` characters.
    """
    body = report_html or ""
    if footer_url:
        body = f"{body}\n📎 <a href=\"{escape(footer_url)}\">詳細日誌</a>"

    if len(body) <= _DAILY_TOKEN_REPORT_MAX_CHARS:
        return body

    notice = "\n\n... <i>(訊息過長已截斷;完整內容存於 ai_insights)</i>"
    # Keep 80 characters of headroom for the notice and any closing tags.
    cut = _DAILY_TOKEN_REPORT_MAX_CHARS - 80
    while cut > 0:
        head = body[:cut]
        # Drop a tag or entity that the cut split in half.
        head = re.sub(r"<[^<>]*\Z", "", head)
        head = re.sub(r"&#?[0-9A-Za-z]{0,10}\Z", "", head)

        # Track which tags are still open at the cut point.
        open_tags: List[str] = []
        for is_closing, tag in re.findall(r"<(/?)([A-Za-z][A-Za-z0-9-]*)[^<>]*>", head):
            tag = tag.lower()
            if not is_closing:
                open_tags.append(tag)
            elif tag in open_tags:
                while open_tags.pop() != tag:
                    pass
        closers = "".join(f"</{tag}>" for tag in reversed(open_tags))

        candidate = head + closers + notice
        overflow = len(candidate) - _DAILY_TOKEN_REPORT_MAX_CHARS
        if overflow <= 0:
            return candidate
        # Only reachable with unusually deep nesting: cut earlier and retry.
        cut -= overflow

    return notice.lstrip()
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
# 決策回執模板(L2 / L3 按鈕點擊後編輯原訊息使用)
|
||
# ADR-012 Phase 4:審計留痕 — 操作者、時間、action、結果
|
||
# ══════════════════════════════════════════════════════════════════════════════
|
||
|
||
# Asia/Taipei(UTC+8)— 容器可能跑 UTC,這裡手動偏移避免依賴 tzdata
|
||
_TAIPEI_UTC_OFFSET_HOURS = 8
|
||
|
||
|
||
def _now_taipei_hhmm() -> str:
|
||
"""回傳 Asia/Taipei 的 HH:MM(容器 TZ 不可靠,手動加 8h)。"""
|
||
try:
|
||
from datetime import timedelta, timezone
|
||
tz = timezone(timedelta(hours=_TAIPEI_UTC_OFFSET_HOURS))
|
||
return datetime.now(tz).strftime("%H:%M")
|
||
except Exception:
|
||
return datetime.now().strftime("%H:%M")
|
||
|
||
|
||
def _html_escape(s: Any) -> str:
|
||
"""最小 HTML 轉義(Telegram HTML parse_mode 允許 <b><i><code><pre><a>,
|
||
故只轉 < > & 避免使用者輸入破版)。"""
|
||
text = "" if s is None else str(s)
|
||
return (text.replace("&", "&")
|
||
.replace("<", "<")
|
||
.replace(">", ">"))
|
||
|
||
|
||
def decision_result(original_text: str, action: str, operator: str,
                    note: Optional[str] = None, **extras: Any) -> str:
    """Build the L2 pricing-decision receipt appended to the original message.

    Args:
        original_text: Text of the original Telegram message.
        action: "approve" or "reject" (case-insensitive); any other value is
            shown as a generic "handled" label carrying the escaped action.
        operator: Display name of the operator; "unknown" when empty.
        note: Optional extra remark, added as its own line.
        **extras: Accepted for forward compatibility and currently ignored.

    Returns:
        The original text, a blank line, a divider and the audit line(s).
    """
    known_verdicts = {
        "approve": ("✅", "已執行"),
        "reject": ("❌", "已拒絕"),
    }
    normalized = (action or "").lower()
    if normalized in known_verdicts:
        icon, label = known_verdicts[normalized]
    else:
        icon, label = "ℹ️", f"已處理({_html_escape(action)})"

    ts = _now_taipei_hhmm()
    operator_html = _html_escape(operator or "unknown")

    receipt = [
        original_text or "",
        "",
        "━━━━━━━━━━━━━━━━━━━━",
        f"{icon} <b>{label}</b> by <b>{operator_html}</b> at {ts}",
    ]
    if note:
        receipt.append(f"📝 {_html_escape(note)}")
    return "\n".join(receipt)
|
||
|
||
|
||
def ops_action_result(original_text: str, action: str, operator: str,
                      result: Optional[Dict[str, Any]] = None,
                      **extras: Any) -> str:
    """Build the L3 ops-decision receipt appended to the original message.

    Args:
        original_text: Text of the original Telegram message.
        action: Action code such as "pause1h", "pause6h", "retry" or "resume";
            codes without a known label are shown escaped as given.
        operator: Display name of the operator; "unknown" when empty.
        result: Result dict of the executed action. Fields read here:
            ``status``; ``error`` / ``message`` / ``detail`` (the first
            non-empty one is shown, capped at 300 characters); ``task_name``;
            ``duration_min``.
        **extras: Accepted for forward compatibility and currently ignored.

    Returns:
        The original text, a blank line, a divider and the execution-result lines.
    """
    action_labels = {
        "pause1h": "暫停 1 小時",
        "pause6h": "暫停 6 小時",
        "retry": "立即重試",
        "resume": "恢復執行",
        "execute": "執行",
        "skip": "略過",
        "rollback": "回滾",
    }
    action_key = (action or "").lower()
    if action_key in action_labels:
        act_label = action_labels[action_key]
    else:
        act_label = _html_escape(action or "未知動作")

    outcome = result or {}
    status = str(outcome.get("status", "")).lower()
    status_table = (
        (("ok", "success", "done"), "✅", "成功"),
        (("error", "failed", "fail"), "❌", "失敗"),
        (("skipped",), "⏭️", "已略過"),
    )
    # Unrecognised statuses are shown verbatim; an empty one reads "submitted".
    status_icon, status_text = "ℹ️", status or "已提交"
    for aliases, icon, text in status_table:
        if status in aliases:
            status_icon, status_text = icon, text
            break

    ts = _now_taipei_hhmm()
    op_esc = _html_escape(operator or "unknown")

    # Result message, in priority order: error > message > detail.
    detail_msg: Optional[str] = next(
        (str(outcome[key])[:300] for key in ("error", "message", "detail") if outcome.get(key)),
        None,
    )

    receipt = [
        original_text or "",
        "",
        "━━━━━━━━━━━━━━━━━━━━",
        f"🛠️ <b>運維動作:</b>{act_label}",
        f"👤 操作員:<b>{op_esc}</b> 🕐 {ts}",
        f"{status_icon} <b>執行結果:</b>{_html_escape(status_text)}",
    ]
    if detail_msg:
        receipt.append(f"📋 {_html_escape(detail_msg)}")

    # Extra key fields, shown only when present.
    task_name = outcome.get("task_name")
    if task_name:
        receipt.append(f"📌 任務:<code>{_html_escape(task_name)}</code>")
    duration_min = outcome.get("duration_min")
    if duration_min:
        receipt.append(f"⏱️ 時長:{_html_escape(duration_min)} 分鐘")

    return "\n".join(receipt)
|