Files
ewoooc/services/telegram_templates.py
ogt 3b14368d4e
All checks were successful
CD Pipeline / deploy (push) Successful in 1m4s
fix: harden alerts and backup deployment guard
2026-06-26 17:52:06 +08:00

1414 lines
62 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
services/telegram_templates.py
Telegram 訊息模板系統 v2
═══ 訊息分類 ═══════════════════════════════════════════════
🚨 告警類 — price_alert_msg / threat_alert_msg / system_alert_msg
📊 報告類 — daily_report_msg / weekly_report_msg / monthly_report_msg
💰 決策類 — price_decision / batch_decision_msg
🤖 系統類 — deploy_msg / heal_msg / meta_analysis_msg
💡 洞察類 — triaged_alert / insight_summary_msg
═══════════════════════════════════════════════════════════
規範:
1. 所有模板使用 HTML parse_mode
2. 一律繁體中文；Agent 名稱保留英文（Hermes/NemoTron/OpenClaw/EA）
3. 每則訊息須含:標題行 / 核心數據 / 建議行動(三段式)
4. 圖文訊息使用 send_photo_with_caption（附說明文字）
"""
import io
import json
import logging
import os
import re
from html import escape
from datetime import datetime
from typing import Any, Dict, List, Optional
# Module logger; every record from this file is emitted under the "TelegramTpl" name.
sys_log = logging.getLogger("TelegramTpl")
# Names of the environment variables read for the bot token and the JSON chat-id list.
TELEGRAM_BOT_TOKEN_ENV = "TELEGRAM_BOT_TOKEN"
TELEGRAM_CHAT_IDS_ENV = "TELEGRAM_CHAT_IDS"
# Matches <br>, <br/>, < br / > and similar, in any letter case.
_TELEGRAM_HTML_BR_RE = re.compile(r"<\s*br\s*/?\s*>", re.IGNORECASE)
# Matches any single-line "<...>" span of 1-500 characters that contains no nested angle bracket.
_TELEGRAM_HTML_TAG_RE = re.compile(r"<[^<>\n]{1,500}>")
# Whitelist of tags left untouched by the sanitiser: simple formatting tags without
# attributes, an <a> whose only attribute is a double-quoted href, and </a>.
_TELEGRAM_ALLOWED_HTML_TAG_RE = re.compile(
    r"(?:</?(?:b|strong|i|em|u|s|strike|del|code|pre)>|<a\s+href=\"[^\"]+\">|</a>)",
    re.IGNORECASE,
)
# ══════════════════════════════════════════════════════════════════════════════
# 基礎工具
# ══════════════════════════════════════════════════════════════════════════════
def _get_bot_token() -> Optional[str]:
    """Return the bot token from the environment, or ``None`` when it is unset.

    ``load_dotenv()`` is invoked on every call before the lookup, so values
    from a ``.env`` file are picked up as well.
    """
    from dotenv import load_dotenv

    load_dotenv()
    token = os.environ.get(TELEGRAM_BOT_TOKEN_ENV)
    return token
def _get_chat_ids() -> list:
    """Return the configured Telegram chat ids, or ``[]`` when none are usable.

    The ids are read from the ``TELEGRAM_CHAT_IDS`` environment variable, which
    must hold a JSON array. An empty list is returned (with a warning) when the
    bot token is missing, when the value is not valid JSON, or when it is valid
    JSON of another shape (number, string, object, ``null``). The last case
    used to be returned unchanged and made callers fail on ``chat_ids[0]``.
    """
    token = _get_bot_token()
    if not token:
        sys_log.warning("[TelegramTpl] %s 未設定,跳過 Telegram 通知", TELEGRAM_BOT_TOKEN_ENV)
        return []
    raw = os.getenv(TELEGRAM_CHAT_IDS_ENV, "[]")
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        parsed = None
    # Anything other than a JSON array is a configuration error, same as bad JSON.
    if not isinstance(parsed, list):
        sys_log.warning("[TelegramTpl] %s 格式錯誤,應為 JSON 陣列", TELEGRAM_CHAT_IDS_ENV)
        return []
    return parsed
def _sanitize_telegram_html(text: str, parse_mode: Optional[str] = "HTML") -> str:
    """Keep only whitelisted Telegram HTML tags; escape every other tag as text.

    When ``parse_mode`` is not ``"HTML"`` (case-insensitive) the text is only
    coerced to ``str`` and returned untouched. In HTML mode ``<br>`` variants
    become newlines first, then each ``<...>`` span is passed through the
    whitelist check.
    """
    raw = str(text or "")
    html_mode = bool(parse_mode) and str(parse_mode).upper() == "HTML"
    if not html_mode:
        return raw
    with_newlines = _normalize_telegram_html_linebreaks(raw)
    return _TELEGRAM_HTML_TAG_RE.sub(_escape_unsupported_telegram_html_tag, with_newlines)
def _normalize_telegram_html_linebreaks(text: str) -> str:
    """Replace every ``<br>`` variant with a newline; falsy input becomes ``""``."""
    source = str(text or "")
    return _TELEGRAM_HTML_BR_RE.sub("\n", source)
def _escape_unsupported_telegram_html_tag(match: re.Match) -> str:
    """``re.sub`` callback: pass whitelisted tags through, HTML-escape the rest."""
    candidate = match.group(0)
    is_allowed = _TELEGRAM_ALLOWED_HTML_TAG_RE.fullmatch(candidate) is not None
    return candidate if is_allowed else escape(candidate)
def _callback_payload_utf8(value: Any, max_bytes: int = 52) -> str:
"""Clamp callback payload by UTF-8 bytes without splitting multibyte chars."""
text = str(value or "unknown").strip() or "unknown"
encoded = text.encode("utf-8")
if len(encoded) <= max_bytes:
return text
clipped = encoded[:max_bytes]
while clipped:
try:
decoded = clipped.decode("utf-8").strip()
return decoded or "unknown"
except UnicodeDecodeError:
clipped = clipped[:-1]
return "unknown"
def _send_telegram_raw(text: str, chat_ids: Optional[list] = None,
                       reply_markup: Optional[Dict[str, Any]] = None,
                       parse_mode: Optional[str] = "HTML") -> bool:
    """Send one text message via ``sendMessage`` to the first chat id only.

    Args:
        text: Message body; sanitised for Telegram HTML when ``parse_mode`` is HTML.
        chat_ids: Target ids; ``None`` means "read them from the environment".
        reply_markup: Optional keyboard structure, sent JSON-encoded.
        parse_mode: Telegram parse mode; omitted from the request when falsy.

    Returns:
        ``True`` on an OK HTTP response, ``False`` when the token is missing,
        the response is not OK, or the request raises.
    """
    import requests

    token = _get_bot_token()
    if not token:
        return False

    targets = _get_chat_ids() if chat_ids is None else chat_ids
    if not targets:
        targets = [-1003940688311]  # hard-coded fallback chat

    payload = {
        "chat_id": targets[0],
        "text": _sanitize_telegram_html(text, parse_mode),
    }
    if parse_mode:
        payload["parse_mode"] = parse_mode
    if reply_markup:
        payload["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False)

    endpoint = f"https://api.telegram.org/bot{token}/sendMessage"
    try:
        response = requests.post(endpoint, json=payload, timeout=10)
    except Exception as exc:
        sys_log.error("[TelegramTpl] send 失敗: %s", exc)
        return False
    if response.ok:
        return True
    sys_log.warning("[TelegramTpl] sendMessage HTTP %s: %s", response.status_code, response.text[:200])
    return False
def send_telegram_with_result(text: str, chat_ids: Optional[list] = None,
                              reply_markup: Optional[Dict[str, Any]] = None,
                              parse_mode: Optional[str] = "HTML") -> Dict[str, Any]:
    """Send a message to every chat id and return a per-delivery summary.

    Returns:
        A dict with ``ok`` (at least one delivery and no failures), ``sent``,
        ``failed``, ``chat_ids`` (the targets used) and ``errors`` (one
        ``"<chat_id>:<reason>"`` entry per failed delivery). A missing token
        yields ``ok=False`` with ``errors == ["token_missing"]``.
    """
    token = _get_bot_token()
    if not token:
        return {"ok": False, "sent": 0, "failed": 0, "chat_ids": [], "errors": ["token_missing"]}

    targets = _get_chat_ids() if chat_ids is None else chat_ids
    if not targets:
        targets = [-1003940688311]

    import requests

    endpoint = f"https://api.telegram.org/bot{token}/sendMessage"
    delivered = 0
    errors: List[str] = []
    for chat_id in targets:
        payload = {"chat_id": chat_id, "text": _sanitize_telegram_html(text, parse_mode)}
        if parse_mode:
            payload["parse_mode"] = parse_mode
        if reply_markup:
            payload["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False)
        try:
            response = requests.post(endpoint, json=payload, timeout=10)
        except Exception as exc:
            errors.append(f"{chat_id}:{type(exc).__name__}")
            sys_log.error("[TelegramTpl] send chat=%s 失敗: %s", chat_id, exc)
            continue
        if response.ok:
            delivered += 1
            continue
        errors.append(f"{chat_id}:HTTP {response.status_code}")
        sys_log.warning("[TelegramTpl] sendMessage chat=%s HTTP %s: %s",
                        chat_id, response.status_code, response.text[:200])

    # Every failed delivery appends exactly one entry, so the list length is the failure count.
    failures = len(errors)
    return {
        "ok": delivered > 0 and failures == 0,
        "sent": delivered,
        "failed": failures,
        "chat_ids": list(targets),
        "errors": errors,
    }
def send_photo(photo_bytes: bytes, caption: str = "",
               chat_ids: Optional[list] = None,
               parse_mode: str = "HTML") -> bool:
    """Upload a PNG with a caption via ``sendPhoto`` to the first chat id only.

    Returns ``False`` when the token or the image bytes are missing, when the
    HTTP response is not OK, or when the request raises; ``True`` otherwise.
    """
    import requests

    token = _get_bot_token()
    if not (token and photo_bytes):
        return False

    targets = _get_chat_ids() if chat_ids is None else chat_ids
    if not targets:
        targets = [-1003940688311]

    endpoint = f"https://api.telegram.org/bot{token}/sendPhoto"
    try:
        # The caption is cut to 1024 characters after sanitising.
        # NOTE(review): the cut can land inside an escaped entity or a tag - confirm acceptable.
        form = {
            "chat_id": targets[0],
            "caption": _sanitize_telegram_html(caption, parse_mode)[:1024],
            "parse_mode": parse_mode,
        }
        attachment = {"photo": ("chart.png", photo_bytes, "image/png")}
        response = requests.post(endpoint, data=form, files=attachment, timeout=30)
        if response.ok:
            return True
        sys_log.warning("[TelegramTpl] sendPhoto HTTP %s: %s", response.status_code, response.text[:200])
        return False
    except Exception as exc:
        sys_log.error("[TelegramTpl] sendPhoto 失敗: %s", exc)
        return False
def send_report_with_charts(text_msg: str, charts: List[Optional[bytes]],
                            chat_ids: Optional[list] = None) -> bool:
    """Send the text report first, then each non-empty chart as a photo.

    The return value reflects the text delivery only; photo results are ignored.
    Captions number charts by their position in ``charts``, counting empty slots.
    """
    text_ok = _send_telegram_raw(text_msg, chat_ids=chat_ids)
    total = len(charts)
    for position, image in enumerate(charts, start=1):
        if not image:
            continue
        send_photo(image, caption=f"圖表 {position}/{total}", chat_ids=chat_ids)
    return text_ok
# ══════════════════════════════════════════════════════════════════════════════
# 🚨 告警類模板
# ══════════════════════════════════════════════════════════════════════════════
def price_alert_msg(threats: List[Dict], analysis_period: str = "近48h") -> str:
    """Build the competitor price-cut alert (Hermes + NemoTron detection results).

    Layout: a header with the SKU count and the high/medium split, up to eight
    per-SKU entries (name, price gap, weekly sales change, optional prices),
    an overflow note, and a closing recommendation.

    Args:
        threats: One dict per SKU. Keys read: ``risk``, ``name``, ``sku``,
            ``gap_pct``, ``sales_7d_delta_pct`` (falling back to
            ``sales_delta``), ``momo_price``, ``pchome_price``.
        analysis_period: Label shown in the header.

    Returns:
        The message as Telegram HTML.
    """
    n = len(threats)
    high_count = sum(1 for t in threats if t.get("risk") == "HIGH")
    # The separator literal was an empty string (a no-op when multiplied); use a visible rule.
    rule = "─" * 32
    lines = [
        f"🚨 <b>競品削價告警</b> · 偵測到 <b>{n}</b> 個 SKU [{escape(str(analysis_period))}]",
        f"🔴 高危:{high_count} 個  🟡 中危:{n - high_count}",
        rule,
    ]
    for t in threats[:8]:
        risk_icon = "🔴" if t.get("risk") == "HIGH" else "🟡"
        # Truncate first, then escape, so an entity is never cut in half.
        name = escape(str(t.get("name") or t.get("sku") or "")[:22])
        # ``or 0`` tolerates keys that are present but None.
        gap = float(t.get("gap_pct") or 0)
        raw_delta = t.get("sales_7d_delta_pct")
        if raw_delta is None:
            raw_delta = t.get("sales_delta")
        delta = float(raw_delta or 0)
        momo_p = t.get("momo_price")
        pchome_p = t.get("pchome_price")
        if momo_p and pchome_p:
            price_str = f" MOMO <b>${momo_p:,.0f}</b> vs 競品 <b>${pchome_p:,.0f}</b>"
        else:
            price_str = ""
        lines.append(
            f"{risk_icon} <b>{name}</b>\n"
            f" 價差 <b>{gap:+.1f}%</b> 業績週跌 <b>{delta:+.1f}%</b>{price_str}"
        )
    if n > 8:
        lines.append(f"<i>…另有 {n-8} 個 SKU查看完整報告</i>")
    lines += [rule,
              "💡 <b>建議:</b>優先處理紅色高危品項,確認競品是否促銷或長期低價"]
    return "\n".join(lines)
def threat_alert_msg(sku: str, name: str, momo_price: float,
                     comp_price: float, gap_pct: float,
                     sales_delta: float, action: str, confidence: float) -> str:
    """Build the single-SKU competitor threat alert (dispatched by NemoTron).

    The risk badge is derived from ``gap_pct``: above 15 is high, above 5 is
    medium, anything else is low. ``name``, ``sku`` and ``action`` are
    HTML-escaped so that characters such as ``<`` or ``&`` in product data
    cannot break Telegram's HTML parsing.

    Returns:
        The message as Telegram HTML.
    """
    if gap_pct > 15:
        risk = "🔴 高危"
    elif gap_pct > 5:
        risk = "🟡 中危"
    else:
        risk = "🟢 低危"
    safe_name = escape(str(name or ""))
    safe_sku = escape(str(sku or ""))
    safe_action = escape(str(action or ""))
    return (
        f"{risk} <b>競品威脅通報</b>\n"
        f"━━━━━━━━━━━━━━━━━━━━\n"
        f"🏷️ <b>{safe_name}</b> <code>{safe_sku}</code>\n\n"
        f"💴 MOMO  <b>NT${momo_price:,.0f}</b>\n"
        f"💴 競品PChome<b>NT${comp_price:,.0f}</b>\n"
        f"📉 價差  <b>{gap_pct:+.1f}%</b>(我方較貴)\n"
        f"📊 業績週變 <b>{sales_delta:+.1f}%</b>\n\n"
        f"🤖 <b>AI 建議:</b>{safe_action}\n"
        f"📈 信心度:{confidence:.0%}\n"
        f"━━━━━━━━━━━━━━━━━━━━"
    )
def system_alert_msg(level: str, title: str, detail: str, source: str = "") -> str:
    """Build a system-level alert (AIOps, deployment failure and similar).

    Args:
        level: ``critical`` / ``error`` / ``warning`` / ``info``; any other
            value falls back to the warning icon.
        title: Headline, rendered in bold.
        detail: Body text; only the first 600 characters are kept.
        source: Optional origin label, appended to the headline in brackets.

    Returns:
        The message as Telegram HTML, ending with a ``MM/DD HH:MM`` local timestamp.
    """
    # The "error" and "info" icons were empty strings, which made those alerts
    # start with a bare space and carry no severity marker.
    icons = {"critical": "🚨", "error": "❌", "warning": "⚠️", "info": "ℹ️"}
    icon = icons.get(level, "⚠️")
    src = f" [{source}]" if source else ""
    return (
        f"{icon} <b>{title}</b>{src}\n"
        f"━━━━━━━━━━━━━━━━━━━━\n"
        f"{detail[:600]}\n"
        f"━━━━━━━━━━━━━━━━━━━━\n"
        f"🕐 {datetime.now().strftime('%m/%d %H:%M')}"
    )
# ══════════════════════════════════════════════════════════════════════════════
# 📊 報告類模板(三種週期)
# ══════════════════════════════════════════════════════════════════════════════
def daily_report_header(date_str: str, revenue: float, wow: float,
                        threat_count: int, opportunity_count: int) -> str:
    """Build the daily-report title card (the text sent ahead of the charts).

    Revenue is shown in units of 10,000 with one decimal; the week-over-week
    figure gets an explicit ``+`` when it is zero or positive.
    """
    if wow >= 0:
        trend_icon, sign = "📈", "+"
    else:
        trend_icon, sign = "📉", ""
    rows = [
        f"📊 <b>EwoooC 電商日報</b> · {date_str}",
        "══════════════════════════",
        f"💰 今日業績  <b>NT${revenue/10000:.1f} 萬</b>",
        f"{trend_icon} 週同比   <b>{sign}{wow:.1f}%</b>",
        f"🔴 競品威脅  <b>{threat_count}</b> 個 SKU",
        f"🟢 市場機會  <b>{opportunity_count}</b> 個 SKU",
        "══════════════════════════",
        "🤖 <i>Hermes + NemoTron + OpenClaw 聯合分析</i>",
    ]
    return "\n".join(rows)
def weekly_report_header(period: str, curr_rev: float, prev_rev: float,
                         wow: float, top_category: str) -> str:
    """Build the weekly-report title card.

    Both revenues are shown in units of 10,000 with one decimal; growth is
    rendered with an explicit sign.
    """
    trend_icon = "📉" if not wow >= 0 else "📈"
    rows = (
        f"📊 <b>EwoooC 電商週報</b> · {period}",
        "══════════════════════════",
        f"💰 本週業績  <b>NT${curr_rev/10000:.1f} 萬</b>",
        f"📦 前週業績  <b>NT${prev_rev/10000:.1f} 萬</b>",
        f"{trend_icon} 週成長率  <b>{wow:+.1f}%</b>",
        f"🏆 最強品類  <b>{top_category}</b>",
        "══════════════════════════",
        "🤖 <i>OpenClaw × MCP 全景策略分析</i>",
    )
    return "\n".join(rows)
def monthly_report_header(month_str: str, revenue: float, mom: float,
                          yoy: float, top3_categories: List[str]) -> str:
    """Build the monthly-report title card.

    Revenue is shown in units of 10,000 without decimals. Only the first three
    category names are listed. Year-over-year growth of 10 or more gets the
    rocket icon.
    """
    mom_icon = "📈" if mom >= 0 else "📉"
    if yoy >= 10:
        yoy_icon = "🚀"
    elif yoy >= 0:
        yoy_icon = "📈"
    else:
        yoy_icon = "📉"
    cats = " / ".join(top3_categories[:3])
    rows = [
        f"📅 <b>EwoooC 電商月報</b> · {month_str}",
        "══════════════════════════",
        f"💰 月度業績  <b>NT${revenue/10000:.0f} 萬</b>",
        f"{mom_icon} 月成長率  <b>{mom:+.1f}%</b> {yoy_icon} 年成長 <b>{yoy:+.1f}%</b>",
        f"🏆 TOP 3 品類 <b>{cats}</b>",
        "══════════════════════════",
        "🤖 <i>OpenClaw × Hermes × MCP 月度全景洞察</i>",
    ]
    return "\n".join(rows)
def report_section(icon: str, title: str, lines: List[str]) -> str:
"""通用報告節段(供日/週/月報各節使用)"""
body = "\n".join(f" {l}" for l in lines)
return f"\n{icon} <b>{title}</b>\n{body}"
# ══════════════════════════════════════════════════════════════════════════════
# 💰 決策類模板
# ══════════════════════════════════════════════════════════════════════════════
def price_decision(product_name: str, product_sku: str,
                   current_price: float, suggested_price: float,
                   reason: str, insight_id: Optional[int] = None,
                   report_url: Optional[str] = None) -> tuple:
    """Build a price cut / raise decision notice with an inline keyboard.

    Args:
        product_name: Product name (HTML-escaped).
        product_sku: SKU (HTML-escaped).
        current_price: Current price.
        suggested_price: Suggested price; lower means "cut", higher means "raise".
        reason: Rationale text; ``<br>`` variants become newlines, then it is escaped.
        insight_id: Insight id used in the callback payload when truthy.
        report_url: Optional link to the analysis report.

    Returns:
        ``(message_html, keyboard)`` where ``keyboard`` holds one row with a
        confirm button (``momo:pa:<id>``) and a reject button (``momo:pr:<id>``).
    """
    diff = current_price - suggested_price
    if diff > 0:
        direction, action_text = "📉", f"降價 NT${diff:,.0f}"
    elif diff < 0:
        direction, action_text = "📈", f"提價 NT${-diff:,.0f}"
    else:
        direction, action_text = "➡️", "維持現價"
    safe_name = escape(str(product_name or ""))
    safe_sku = escape(str(product_sku or ""))
    safe_reason = escape(_normalize_telegram_html_linebreaks(str(reason or "")))
    message = (
        f"💰 <b>AI 定價決策建議</b>\n"
        f"━━━━━━━━━━━━━━━━━━━━\n"
        f"🏷️ <b>{safe_name}</b> <code>{safe_sku}</code>\n\n"
        f"現價:<b>NT${current_price:,.0f}</b>\n"
        f"建議:<b>NT${suggested_price:,.0f}</b> {direction} {action_text}\n\n"
        f"💡 <b>依據:</b>{safe_reason}\n"
    )
    if insight_id:
        message += f"🔗 洞察 ID<code>{insight_id}</code>\n"
    if report_url:
        message += f"📎 <a href=\"{escape(str(report_url), quote=True)}\">查看分析報表</a>\n"
    message += f"━━━━━━━━━━━━━━━━━━━━"
    # ADR-012: callback_data uses the short prefix momo:pa / momo:pr plus the
    # insight id, which stays within the 64-byte limit and matches the L2 handler.
    # Fallback (should not happen): without an insight id the SKU is used as the
    # suffix. It is clamped by UTF-8 bytes, not characters, so a long or
    # non-ASCII SKU can no longer push the payload past the limit.
    if insight_id:
        _cb_id = str(insight_id)
    else:
        _cb_id = _callback_payload_utf8(f"sku_{product_sku}", 40)
    keyboard = {"inline_keyboard": [[
        {"text": "✅ 確認執行", "callback_data": f"momo:pa:{_cb_id}"},
        {"text": "❌ 拒絕", "callback_data": f"momo:pr:{_cb_id}"},
    ]]}
    return message, keyboard
def batch_decision_msg(items: List[Dict], batch_id: str) -> tuple:
    """Build a batch pricing decision (several SKUs confirmed in one tap).

    Up to six items are listed; the rest are summarised in an overflow note.

    Args:
        items: One dict per SKU. Keys read: ``name``, ``current_price``,
            ``suggested_price``. Missing or ``None`` values are treated as
            an empty name / a price of 0.
        batch_id: Batch identifier embedded in the callback payload.

    Returns:
        ``(message_html, keyboard)`` where ``keyboard`` holds one row with an
        approve-all button (``momo:bpa:<id>``) and a cancel button
        (``momo:bpr:<id>``).
    """
    lines = [f"💰 <b>批次定價決策</b> · 共 {len(items)} 個 SKU\n━━━━━━━━━━━━━━━━━━━━"]
    for i, item in enumerate(items[:6], 1):
        current = item.get("current_price") or 0
        suggested = item.get("suggested_price") or 0
        diff = current - suggested
        # An unchanged price gets a neutral arrow instead of the "raise" icon.
        direction = "📉" if diff > 0 else "📈" if diff < 0 else "➡️"
        # Truncate first, then escape, so an entity is never cut in half.
        name = escape(str(item.get("name") or "")[:20])
        lines.append(
            f"{i}. {direction} <b>{name}</b>\n"
            f" ${current:,.0f} → <b>${suggested:,.0f}</b>"
        )
    if len(items) > 6:
        lines.append(f"<i>…另有 {len(items)-6} 個 SKU</i>")
    lines.append("━━━━━━━━━━━━━━━━━━━━")
    # ADR-012: short prefixes bpa = batch_price_approve / bpr = batch_price_reject.
    # The id is clamped to 48 UTF-8 bytes (not characters) so prefix + id always
    # fits the 64-byte callback_data limit; "ignore" drops a split trailing character.
    _bid = str(batch_id).encode("utf-8")[:48].decode("utf-8", errors="ignore")
    keyboard = {"inline_keyboard": [[
        {"text": f"✅ 全部確認({len(items)}項)",
         "callback_data": f"momo:bpa:{_bid}"},
        {"text": "❌ 取消",
         "callback_data": f"momo:bpr:{_bid}"},
    ]]}
    return "\n".join(lines), keyboard
# ══════════════════════════════════════════════════════════════════════════════
# 🧠 Phase 11 RAG 反饋v5.0 護欄 #1強制晉升門檻 — Stage 4 人工驗收)
# ══════════════════════════════════════════════════════════════════════════════
def rag_feedback_keyboard(rag_query_log_id: int) -> dict:
    """Build the thumbs-up / thumbs-down feedback keyboard shown after a RAG hit.

    callback_data layout (short prefix, in line with ADR-012):
        rag_fb:{log_id}:5  -> thumbs up,   feedback_score = 5
        rag_fb:{log_id}:1  -> thumbs down, feedback_score = 1

    A falsy ``rag_query_log_id`` is encoded as ``0``.

    Example (caller holding a RAG result)::

        from services.telegram_templates import rag_feedback_keyboard
        send_message(chat_id, rag.synthesize(),
                     keyboard=rag_feedback_keyboard(rag.log_id))
    """
    log_id = 0
    if rag_query_log_id:
        log_id = int(rag_query_log_id)
    choices = (("👍 有用", 5), ("👎 沒用", 1))
    row = [
        {"text": label, "callback_data": f"rag_fb:{log_id}:{score}"}
        for label, score in choices
    ]
    return {"inline_keyboard": [row]}
def promotion_review_keyboard(episode_id: int) -> dict:
    """Build the human-review keyboard for promoting a distillation-pool episode.

    callback_data:
        pg_ok:{episode_id}  -> approve
        pg_no:{episode_id}  -> reject

    A falsy ``episode_id`` is encoded as ``0``.
    """
    eid = 0
    if episode_id:
        eid = int(episode_id)
    approve = {"text": "✅ 通過晉升", "callback_data": f"pg_ok:{eid}"}
    reject = {"text": "🚫 駁回", "callback_data": f"pg_no:{eid}"}
    return {"inline_keyboard": [[approve, reject]]}
# ══════════════════════════════════════════════════════════════════════════════
# 🤖 系統類模板
# ══════════════════════════════════════════════════════════════════════════════
def deploy_msg(status: str, branch: str, commit: str, duration: str = "") -> str:
    """Build a CI/CD deployment notice.

    Args:
        status: ``success`` / ``failed`` / ``started``; shown upper-cased in the headline.
        branch: Branch name.
        commit: Commit hash; only the first 12 characters are shown.
        duration: Optional elapsed-time label appended to the headline.

    Returns:
        The message as Telegram HTML, ending with a ``MM/DD HH:MM`` local timestamp.
    """
    # The success / failed / fallback icons were empty strings, so the headline
    # started with a bare space and gave no visual status cue.
    icons = {"success": "✅", "failed": "❌", "started": "🚀"}
    icon = icons.get(status, "ℹ️")
    dur = f" 耗時 {duration}" if duration else ""
    return (
        f"{icon} <b>部署{status.upper()}</b>{dur}\n"
        f"━━━━━━━━━━━━━━━━━━━━\n"
        f"🌿 分支:<code>{branch}</code>\n"
        f"🔖 Commit<code>{commit[:12]}</code>\n"
        f"🕐 {datetime.now().strftime('%m/%d %H:%M')}"
    )
def heal_msg(status: str, error_type: str, target_file: str,
             commit_sha: str = "", diff_lines: int = 0) -> str:
    """Build an AiderHeal auto-repair notice.

    Args:
        status: ``started`` / ``success`` / ``reverted``; anything else is
            reported as a failure.
        error_type: Error class or label being repaired.
        target_file: File the repair targets.
        commit_sha: Optional commit hash; the first 12 characters are shown.
        diff_lines: Optional number of changed lines; shown when non-zero.

    Returns:
        The message as Telegram HTML, ending with a ``MM/DD HH:MM`` local timestamp.
    """
    # The success and failure icons were empty strings, leaving those headlines
    # with no status marker at all.
    headings = {
        "started": ("🔧", "AiderHeal 啟動"),
        "success": ("✅", "AiderHeal 修復完成"),
        "reverted": ("🔄", "AiderHeal 自動回滾"),
    }
    icon, title = headings.get(status, ("❌", "AiderHeal 失敗"))
    lines = [f"{icon} <b>{title}</b>", "━━━━━━━━━━━━━━━━━━━━",
             f"🐛 錯誤類型:<code>{error_type}</code>",
             f"📄 目標檔案:<code>{target_file}</code>"]
    if commit_sha:
        lines.append(f"🔖 Commit<code>{commit_sha[:12]}</code>")
    if diff_lines:
        lines.append(f"📝 修改行數:{diff_lines}")
    lines += ["━━━━━━━━━━━━━━━━━━━━",
              f"🕐 {datetime.now().strftime('%m/%d %H:%M')}"]
    return "\n".join(lines)
def meta_analysis_msg(period: str, content: str) -> str:
    """Build the AI system self-review report.

    Only the first 1200 characters of ``content`` are kept; the footer carries
    a ``MM/DD HH:MM`` local timestamp.
    """
    stamp = datetime.now().strftime('%m/%d %H:%M')
    body = content[:1200]
    rows = [
        f"🤖 <b>AI 系統效能自我審視</b> · {period}",
        "══════════════════════════",
        f"{body}",
        "══════════════════════════",
        f"<i>由 OpenClaw 定期分析 · {stamp}</i>",
    ]
    return "\n".join(rows)
# ══════════════════════════════════════════════════════════════════════════════
# 💡 洞察類模板
# ══════════════════════════════════════════════════════════════════════════════
def _short_text(value: Any, limit: int = 120) -> str:
text = str(value or "").strip()
if len(text) <= limit:
return text
return text[: max(0, limit - 1)].rstrip() + ""
def _split_action_parts(action: Any) -> List[str]:
return [
part.strip()
for part in re.split(r"\s*[|]\s*", str(action or ""))
if part and part.strip()
]
def _parse_ea_action(action: Any) -> Dict[str, Any]:
    """Parse one pipe-delimited EA action string into a field dict.

    The first segment becomes ``title``; when it looks like ``[SKU] name`` it
    is additionally split into ``sku`` and ``name``. Each later segment is
    classified by the first matching rule below (order matters), and anything
    unmatched is collected under ``notes``.

    Returns:
        A dict that always has ``raw``, ``title`` and ``notes``, plus whichever
        of ``sku``, ``name``, ``comparison``, ``momo_price``, ``pchome_price``,
        ``gap_pct``, ``gap_pct_text``, ``pchome_id``, ``action``, ``evidence``,
        ``impact`` and ``impact_amount`` were found.
    """
    parts = _split_action_parts(action)
    item: Dict[str, Any] = {
        "raw": str(action or ""),
        "title": parts[0] if parts else str(action or ""),
        "notes": [],
    }
    # "[SKU] product name" heading.
    title_match = re.match(r"^\[([^\]]+)\]\s*(.+)$", item["title"])
    if title_match:
        item["sku"] = title_match.group(1).strip()
        item["name"] = title_match.group(2).strip()
    for part in parts[1:]:
        if "MOMO" in part and "PChome" in part:
            # Price comparison segment: prices are kept as the matched digit
            # strings (commas included); the percentage is parsed to float.
            item["comparison"] = part
            momo = re.search(r"MOMO\s*\$([0-9,]+)", part)
            pchome = re.search(r"PChome\s*\$([0-9,]+)", part)
            pct = re.search(r"\(([+-]?\d+(?:\.\d+)?)%\)", part)
            if momo:
                item["momo_price"] = momo.group(1)
            if pchome:
                item["pchome_price"] = pchome.group(1)
            if pct:
                item["gap_pct"] = float(pct.group(1))
                item["gap_pct_text"] = f"{float(pct.group(1)):+.1f}%"
        elif part.startswith("PChome "):
            # "PChome <id>" segment without a MOMO price.
            item["pchome_id"] = part.replace("PChome", "", 1).strip()
        elif part.startswith("建議"):
            item["action"] = part
        elif part.startswith("證據"):
            item["evidence"] = part.replace("證據", "", 1).strip(" :")
        elif "NT$" in part or "流失" in part or "價差" in part or "價格優勢" in part:
            item["impact"] = part
            amount = re.search(r"NT\$\s*([0-9,]+)", part)
            if amount:
                try:
                    item["impact_amount"] = int(amount.group(1).replace(",", ""))
                except ValueError:
                    # A match made only of commas cannot be converted; leave the amount unset.
                    pass
        else:
            item["notes"].append(part)
    return item
def _format_ea_risk_summary(actions: List[Dict[str, Any]]) -> List[str]:
count = len(actions)
gap_values = [a["gap_pct"] for a in actions if isinstance(a.get("gap_pct"), (int, float))]
amounts = [a["impact_amount"] for a in actions if isinstance(a.get("impact_amount"), int)]
lines = [f"• 待審 SKU<b>{count}</b> 件"]
if gap_values:
low, high = min(gap_values), max(gap_values)
if low == high:
lines.append(f"• 價差幅度:<b>{low:+.1f}%</b>")
else:
lines.append(f"• 價差範圍:<b>{low:+.1f}%{high:+.1f}%</b>")
if amounts:
lines.append(f"• 最大單件價差:<b>NT$ {max(amounts):,}</b>")
lines.append("• 核心判斷:先確認同款 identity_v2再決定跟價、促銷或曝光")
return lines
def _is_ea_sku_action(item: Dict[str, Any]) -> bool:
return bool(
item.get("sku")
or item.get("comparison")
or item.get("momo_price")
or item.get("pchome_price")
)
def _format_ea_action_card(item: Dict[str, Any], index: int) -> List[str]:
    """Render one parsed SKU action as a multi-line card.

    The first line is the bold, numbered heading. Price, gap/impact, evidence,
    action and PChome-id lines follow only when the matching field is present.
    All field values are HTML-escaped.
    """
    sku = escape(str(item.get("sku") or ""))
    name = escape(_short_text(item.get("name") or item.get("title") or "", 58))
    if sku:
        card = [f"<b>{index}. [{sku}] {name}</b>"]
    else:
        card = [f"<b>{index}. {name}</b>"]

    momo_price, pchome_price = item.get("momo_price"), item.get("pchome_price")
    if momo_price or pchome_price:
        momo_text = f"${momo_price}" if momo_price else ""
        pchome_text = f"${pchome_price}" if pchome_price else ""
        card.append(f" MOMO<b>{momo_text}</b> PChome<b>{pchome_text}</b>")

    gap_text, impact = item.get("gap_pct_text"), item.get("impact")
    impact_html = escape(str(impact or ""))
    if gap_text:
        gap_line = f" 價差:<b>{escape(str(gap_text))}</b>"
        # Append the impact after the gap only when there is something to show.
        card.append(f"{gap_line} {impact_html}" if impact_html else gap_line)
    elif impact:
        card.append(f" 影響:{impact_html}")

    evidence = item.get("evidence")
    if evidence:
        card.append(f" 證據:{escape(_short_text(evidence, 96))}")

    # Drop the leading label from the stored action text before showing it.
    action = str(item.get("action") or "").replace("建議", "", 1).strip(" :")
    if action:
        card.append(f" 動作:{escape(_short_text(action, 86))}")

    pchome_id = item.get("pchome_id")
    if pchome_id:
        card.append(f" PChome<code>{escape(_short_text(pchome_id, 40))}</code>")
    return card
def _format_ea_generic_action(item: Dict[str, Any], index: int) -> str:
    """Render a non-SKU action as ``"<index>. <text>"``, shortened to 140 chars and escaped."""
    source = item.get("raw") or item.get("title") or ""
    shown = escape(_short_text(source, 140))
    return f"{index}. {shown}"
def _format_ea_escalation_alert(
    *,
    base_event: Dict[str, Any],
    tier_label: str,
    ai_summary: str,
    ai_cause: Optional[str],
    ai_actions: Optional[list],
) -> str:
    """Build the EA escalation alert shown for human review.

    Sections, in order: heading (tier, title, event type), decision status
    (event summary plus up to three ``|``-separated cause parts), the decision
    envelope when the event carries one, the AI background summary, and then
    one of three endings depending on the parsed actions:

    * SKU actions present: risk summary, up to five SKU cards, SKU guidance.
    * only generic actions: up to five numbered items, generic guidance.
    * no actions: short generic guidance.

    Args:
        base_event: Event dict; ``event_type``, ``title``, ``summary`` and
            ``decision_envelope`` (or ``decision``) are read.
        tier_label: Escalation tier label shown in the first line.
        ai_summary: Background summary, shortened to 280 characters.
        ai_cause: Optional cause text; split on ``|``.
        ai_actions: Optional list of action strings, parsed with ``_parse_ea_action``.

    Returns:
        The message as Telegram HTML; values taken from the event, summary,
        cause and actions are HTML-escaped.
    """
    event_type = escape(str(base_event.get("event_type", "ea_escalation")))
    title = escape(str(base_event.get("title", "EA 升級審核")))
    summary = escape(str(base_event.get("summary", "")))
    cause_parts = [
        escape(part.strip())
        for part in str(ai_cause or "").split("|")
        if part and part.strip()
    ]
    parsed_actions = [_parse_ea_action(action) for action in (ai_actions or [])]
    # Partition the parsed actions: SKU/price items get cards, the rest a plain list.
    sku_actions = [item for item in parsed_actions if _is_ea_sku_action(item)]
    generic_actions = [item for item in parsed_actions if not _is_ea_sku_action(item)]
    shown_actions = sku_actions[:5]
    hidden_count = max(0, len(sku_actions) - len(shown_actions))
    decision_envelope = base_event.get("decision_envelope") or base_event.get("decision")
    lines = [
        f"⚡ <b>{escape(str(tier_label))}</b>",
        f"📌 <b>{title}</b>",
        f"<code>{event_type}</code>",
        "━━━━━━━━━━━━━━━━━━━━",
        "🧭 <b>決策狀態</b>",
    ]
    if summary:
        lines.append(f"{summary}")
    for part in cause_parts[:3]:
        lines.append(f"{part}")
    if isinstance(decision_envelope, dict) and decision_envelope:
        lines += ["", *_format_decision_envelope(decision_envelope)]
    if ai_summary:
        lines += [
            "",
            "🧠 <b>背景摘要</b>",
            f"{escape(_short_text(ai_summary, 280))}",
        ]
    if sku_actions:
        lines += [
            "",
            "📊 <b>風險摘要</b>",
            *_format_ea_risk_summary(sku_actions),
            "",
            "📋 <b>TOP 待審 SKU</b>",
        ]
        for idx, item in enumerate(shown_actions, start=1):
            # Blank line between cards, but not before the first one.
            if idx > 1:
                lines.append("")
            lines.extend(_format_ea_action_card(item, idx))
        if hidden_count:
            lines.append(f"\n<i>另有 {hidden_count} 件,請至觀測台查看完整清單。</i>")
        lines += [
            "",
            "✅ <b>建議處置</b>",
            "• 先人工確認 PChome identity_v2 與規格一致",
            "• 同款:評估跟價、組合促銷或加強 PChome 價格優勢曝光",
            "• 非同款:標記待審,避免進入自動調價或簡報決策",
        ]
    elif generic_actions:
        lines += [
            "",
            "📋 <b>待確認事項</b>",
            *[
                _format_ea_generic_action(item, idx)
                for idx, item in enumerate(generic_actions[:5], start=1)
            ],
            "",
            "✅ <b>建議處置</b>",
            "• 先確認資料來源、最近錯誤紀錄與觀測台狀態",
            "• 補齊可審核證據後再批准執行",
            "• 未取得實證前,不執行自動調價、修復或策略派發",
        ]
    else:
        lines += [
            "",
            "✅ <b>建議處置</b>",
            "• 先確認資料來源與最近錯誤紀錄",
            "• 補齊可審核證據後再批准執行",
        ]
    return "\n".join(lines)
def _numeric_value(value: Any) -> Optional[float]:
if value is None or value == "":
return None
try:
text = str(value).strip().replace(",", "")
text = text.replace("NT$", "").replace("$", "").replace("%", "")
return float(text)
except (TypeError, ValueError):
return None
def _format_money_value(value: Any) -> str:
    """Format a positive amount as ``"NT$ 1,234"`` (or with two decimals).

    Returns ``""`` when the value cannot be parsed, is zero or negative, or is
    not finite. ``_numeric_value`` will parse ``"nan"`` / ``"inf"`` and pass
    float NaN through, and ``int()`` raises on those, so they are rejected here
    instead of crashing the whole message.
    """
    import math

    number = _numeric_value(value)
    if number is None or not math.isfinite(number) or number <= 0:
        return ""
    if number == int(number):
        # Whole amounts are shown without a decimal part.
        return f"NT$ {int(number):,}"
    return f"NT$ {number:,.2f}"
def _format_percent_value(value: Any) -> str:
    """Format a value as a signed one-decimal percentage.

    Values that cannot be parsed as a number are returned as stripped,
    HTML-escaped text; falsy values yield ``""``.
    """
    number = _numeric_value(value)
    if number is not None:
        return f"{number:+.1f}%"
    fallback = str(value or "").strip()
    if not fallback:
        return ""
    return escape(fallback)
def _evidence_items(envelope: Dict[str, Any]) -> List[Dict[str, Any]]:
raw_items = envelope.get("evidence") if isinstance(envelope.get("evidence"), list) else []
return [item for item in raw_items if isinstance(item, dict)]
def _find_evidence(envelope: Dict[str, Any], metric: str) -> Optional[Dict[str, Any]]:
    """Return the first evidence entry whose ``metric`` (or else ``type``) equals ``metric``.

    Returns ``None`` when no entry matches.
    """
    matches = (
        entry
        for entry in _evidence_items(envelope)
        if str(entry.get("metric") or entry.get("type") or "") == metric
    )
    return next(matches, None)
def _is_price_decision_envelope(envelope: Dict[str, Any]) -> bool:
    """Decide whether an envelope should be rendered with the price/competitor layout.

    True when the decision type is one of the known price types, when the
    subject names a competitor product or price, or when the evidence holds a
    ``candidate_gap_pct`` / ``unit_price_gap_pct`` entry.
    """
    price_types = {"price_alert", "pchome_match_review", "competitor_price_review"}
    if str(envelope.get("decision_type") or "").lower() in price_types:
        return True

    subject = envelope.get("subject")
    if not isinstance(subject, dict):
        subject = {}
    for field in ("competitor_product_id", "competitor_price", "pchome_price"):
        if subject.get(field):
            return True

    for metric in ("candidate_gap_pct", "unit_price_gap_pct"):
        if _find_evidence(envelope, metric):
            return True
    return False
def _action_label(action_code: str) -> str:
labels = {
"price_follow_review": "確認是否跟價或改用促銷防守",
"review_accept_identity": "人工確認同款後採納 identity",
"review_catalog_comparable": "依型錄證據覆核可比性",
"unit_price_required": "改用單位價覆核,不寫總價型價差",
"identity_or_price_review": "先確認身份,再判斷價格處置",
"verify_or_reject_identity": "確認候選是否同款;非同款即駁回",
"compare_existing_identity": "比較既有正式 identity 與新候選",
"refresh_or_compare_identity": "刷新過期 identity 後再覆核",
"needs_research": "補搜尋或補證據後再判斷",
"human_review": "人工覆核",
}
return labels.get(action_code or "", action_code or "人工覆核")
# Display labels for the three parts of a price-match path. Each table is
# looked up with the raw code, and the code itself is shown when it is missing.

# Keyed by the guardrail "match_type" code.
_PRICE_MATCH_TYPE_LABELS = {
    "exact": "高信心同款",
    "same_product_different_pack": "同商品不同包裝",
    "same_line_variant": "同系列不同款",
    "comparable": "可比但需覆核",
    "no_match": "非同款",
}
# Keyed by the guardrail "price_basis" code.
_PRICE_BASIS_LABELS = {
    "total_price": "總價可比",
    "unit_price": "單位價可比",
    "manual_review": "人工覆核後可比",
    "none": "不可比",
}
# Keyed by the guardrail "alert_tier" code.
_PRICE_ALERT_TIER_LABELS = {
    "price_alert_exact": "可直接價格告警",
    "unit_price_review": "單位價覆核",
    "identity_review": "身份覆核",
    "suppress": "壓制告警",
}
def _price_match_path(envelope: Dict[str, Any]) -> tuple[str, str, str]:
    """Return ``(match_type, price_basis, alert_tier)`` for an envelope.

    Values come from ``guardrails`` first. When any of the three is missing,
    the ``basis`` string of the ``match_score`` evidence entry is split on
    ``/`` and, if it has at least three parts, used to fill the gaps in that
    order. Missing values are returned as ``""``.
    """
    guardrails = envelope.get("guardrails")
    if not isinstance(guardrails, dict):
        guardrails = {}
    found = [
        str(guardrails.get(field) or "")
        for field in ("match_type", "price_basis", "alert_tier")
    ]
    if all(found):
        return found[0], found[1], found[2]

    score_entry = _find_evidence(envelope, "match_score") or {}
    basis = str(score_entry.get("basis") or "")
    segments = [segment.strip() for segment in basis.split("/") if segment.strip()]
    if len(segments) < 3:
        return found[0], found[1], found[2]
    # Guardrail values win; the evidence basis only fills what is missing.
    return found[0] or segments[0], found[1] or segments[1], found[2] or segments[2]
def _price_notification_guidance(match_type: str, price_basis: str, alert_tier: str) -> tuple[str, str]:
if match_type == "exact" and price_basis == "total_price" and alert_tier == "price_alert_exact":
return "直接價格威脅", "可用總價比較;先確認庫存與促銷期,再人工決定跟價或促銷防守。"
if price_basis == "unit_price" or alert_tier == "unit_price_review":
return "單位價覆核", "先換算單位價與入數,禁止用總價直接判定價格威脅。"
if alert_tier == "identity_review" or price_basis == "manual_review":
return "身份覆核", "先確認同款、規格、組合與前台狀態,人工採納後才可寫入正式價差。"
if alert_tier == "suppress" or match_type == "no_match" or price_basis == "none":
return "壓制告警", "目前不可作為價格威脅;保留診斷紀錄,避免誤報。"
return "可比性待判讀", "依比對證據人工覆核,未確認前不自動調價、不覆蓋正式 identity。"
def _format_price_decision_envelope(envelope: Dict[str, Any]) -> List[str]:
    """Render a price / competitor decision envelope as a readable brief.

    Sections, each emitted only when it has content: envelope header (type,
    severity, confidence, data quality, auto-execute flag, blocked reason),
    notification tier, subject, price evidence, match evidence, difference
    highlights, the human next step (always present), and a trace line.

    Args:
        envelope: Decision envelope dict. ``subject``, ``expected_impact``,
            ``guardrails`` and ``recommended_action`` are used only when they
            are dicts; ``evidence`` is read through ``_find_evidence``.

    Returns:
        Lines of Telegram HTML, terminated by one empty string.
    """
    severity = escape(str(envelope.get("severity") or "info"))
    decision_type = escape(str(envelope.get("decision_type") or "price_review"))
    confidence = _numeric_value(envelope.get("confidence"))
    # Non-dict sub-sections are treated as empty so later lookups never fail.
    subject = envelope.get("subject") if isinstance(envelope.get("subject"), dict) else {}
    expected = envelope.get("expected_impact") if isinstance(envelope.get("expected_impact"), dict) else {}
    guardrails = envelope.get("guardrails") if isinstance(envelope.get("guardrails"), dict) else {}
    recommended_action = envelope.get("recommended_action") if isinstance(envelope.get("recommended_action"), dict) else {}
    data_quality = escape(str(guardrails.get("data_quality") or envelope.get("data_quality") or "unknown"))
    can_auto_execute = bool(guardrails.get("can_auto_execute", False))
    blocked_reason = escape(str(guardrails.get("blocked_reason") or ""))
    confidence_text = f" 信心度:<b>{confidence:.0%}</b>" if confidence is not None else ""
    lines = [
        "🧭 <b>決策信封</b>",
        f"• 類型:<code>{decision_type}</code> 嚴重度:<b>{severity}</b>{confidence_text}",
        f"• 資料品質:<code>{data_quality}</code> 自動執行:<b>{'允許' if can_auto_execute else '不允許'}</b>",
    ]
    if blocked_reason:
        lines.append(f"• 邊界:{blocked_reason}")
    # --- Notification tier: match type / price basis / alert tier -------------
    match_type, price_basis, alert_tier = _price_match_path(envelope)
    if match_type or price_basis or alert_tier:
        guidance_title, guidance_text = _price_notification_guidance(match_type, price_basis, alert_tier)
        # Unknown codes are shown as-is; missing parts are dropped from the path.
        path_labels = [
            _PRICE_MATCH_TYPE_LABELS.get(match_type, match_type) if match_type else "",
            _PRICE_BASIS_LABELS.get(price_basis, price_basis) if price_basis else "",
            _PRICE_ALERT_TIER_LABELS.get(alert_tier, alert_tier) if alert_tier else "",
        ]
        lines += [
            "",
            "🚦 <b>通知分級</b>",
            f"• 判讀:<b>{escape(guidance_title)}</b>",
            f"• 路徑:{escape(' / '.join(part for part in path_labels if part))}",
            f"• 邊界:{escape(guidance_text)}",
        ]
    # --- Subject: our SKU and the competitor candidate ------------------------
    sku = escape(str(subject.get("sku") or ""))
    name = escape(_short_text(subject.get("name") or "", 96))
    competitor_id = escape(str(subject.get("competitor_product_id") or subject.get("pchome_id") or ""))
    competitor_name = escape(_short_text(subject.get("competitor_product_name") or subject.get("pchome_name") or "", 96))
    target_lines = []
    if sku:
        target_lines.append(f"• SKU<code>{sku}</code>")
    if name:
        target_lines.append(f"• MOMO{name}")
    if competitor_id:
        target_lines.append(f"• PChome<code>{competitor_id}</code>")
    if competitor_name:
        target_lines.append(f"• 候選:{competitor_name}")
    if target_lines:
        lines += ["", "🎯 <b>標的</b>", *target_lines]
    # --- Price evidence: first truthy source wins for each price --------------
    momo_price = (
        subject.get("momo_price")
        or expected.get("momo_price")
        or (_find_evidence(envelope, "momo_price") or {}).get("value")
    )
    competitor_price = (
        subject.get("competitor_price")
        or subject.get("pchome_price")
        or expected.get("competitor_price")
        or expected.get("candidate_price")
        or expected.get("pchome_price")
        or (_find_evidence(envelope, "pchome_price") or {}).get("value")
    )
    # An explicit 0 in expected_impact is kept; only None falls back to evidence.
    gap_pct = (
        expected.get("candidate_gap_pct")
        if expected.get("candidate_gap_pct") is not None
        else (_find_evidence(envelope, "candidate_gap_pct") or {}).get("value")
    )
    gap_amount = expected.get("gap_amount")
    price_lines = []
    momo_price_text = _format_money_value(momo_price)
    competitor_price_text = _format_money_value(competitor_price)
    if momo_price_text or competitor_price_text:
        # NOTE(review): `x or ''` is a no-op for strings; a placeholder for a
        # missing price may have been intended here - confirm.
        price_lines.append(
            f"• MOMO<b>{momo_price_text or ''}</b> PChome<b>{competitor_price_text or ''}</b>"
        )
    gap_text = _format_percent_value(gap_pct)
    gap_amount_text = _format_money_value(gap_amount)
    if gap_text or gap_amount_text:
        detail = []
        if gap_text:
            detail.append(f"<b>{gap_text}</b>")
        if gap_amount_text:
            detail.append(gap_amount_text)
        price_lines.append(f"• 價差:{' / '.join(detail)}(正值代表 MOMO 較貴)")
    unit_insight = expected.get("unit_price_insight")
    if isinstance(unit_insight, dict) and unit_insight:
        unit_summary = escape(_short_text(unit_insight.get("summary") or "", 120))
        if unit_summary:
            price_lines.append(f"• 單位價:{unit_summary}")
    if price_lines:
        lines += ["", "📊 <b>價格證據</b>", *price_lines]
    # --- Match evidence: score, diagnostic reasons, existing-match conflict ---
    evidence_lines = []
    match_evidence = _find_evidence(envelope, "match_score")
    if match_evidence:
        score = match_evidence.get("value")
        basis = escape(str(match_evidence.get("basis") or ""))
        score_text = escape(str(score if score is not None else ""))
        evidence_lines.append(f"• Match<code>{score_text}</code>" + (f" {basis}" if basis else ""))
    reason_evidence = _find_evidence(envelope, "reasons")
    if reason_evidence:
        evidence_lines.append(f"• 診斷:{escape(_short_text(reason_evidence.get('value') or '', 130))}")
    conflict = expected.get("existing_match_conflict")
    if isinstance(conflict, dict) and conflict:
        incoming = escape(str(conflict.get("incoming_product_id") or "unknown"))
        existing = escape(str(conflict.get("existing_product_id") or "unknown"))
        delta_value = _numeric_value(conflict.get("score_delta"))
        delta = f"{delta_value:+.3f}" if delta_value is not None else ""
        evidence_lines.append(f"• 既有保護:新候選 <code>{incoming}</code> vs 既有 <code>{existing}</code>" + (f" delta {delta}" if delta else ""))
    if evidence_lines:
        lines += ["", "🧩 <b>比對證據</b>", *evidence_lines]
    # --- Difference highlights: at most the first three rows are considered ---
    difference_highlights = envelope.get("difference_highlights")
    if isinstance(difference_highlights, list) and difference_highlights:
        diff_lines = []
        for row in difference_highlights[:3]:
            if not isinstance(row, dict):
                continue
            dimension = escape(str(row.get("dimension") or row.get("label") or "差異"))
            left = escape(_short_text(row.get("left") or row.get("momo") or "", 42))
            right = escape(_short_text(row.get("right") or row.get("pchome") or "", 42))
            if left or right:
                diff_lines.append(f"{dimension}MOMO {left or ''} / PChome {right or ''}")
            else:
                # No side-by-side values: fall back to a free-text note, if any.
                note = escape(_short_text(row.get("note") or row.get("summary") or "", 84))
                if note:
                    diff_lines.append(f"{dimension}{note}")
        if diff_lines:
            lines += ["", "⚖️ <b>差異提醒</b>", *diff_lines]
    # --- Human next step: always emitted; HITL defaults to required -----------
    action_code = str(recommended_action.get("action") or "human_review")
    owner = escape(str(recommended_action.get("owner") or "未指定"))
    requires_hitl = bool(recommended_action.get("requires_hitl", True))
    # NOTE(review): _action_label returns unknown codes unchanged and the value
    # is not escaped on the first line below - confirm codes are trusted.
    lines += [
        "",
        "✅ <b>人工下一步</b>",
        f"{_action_label(action_code)}",
        f"• 動作:<code>{escape(action_code)}</code> 負責:<b>{owner}</b> HITL<b>{'需要' if requires_hitl else '不需要'}</b>",
    ]
    # --- Trace: selected keys rendered as "key=value" pairs -------------------
    trace = envelope.get("trace")
    if isinstance(trace, dict):
        trace_parts = []
        for key in ("ai_call_id", "insight_id", "action_plan_id", "source", "attempted_at", "model", "provider"):
            if trace.get(key) is not None:
                trace_parts.append(f"{key}={trace[key]}")
        if trace_parts:
            lines += ["", f"<code>{escape(' | '.join(trace_parts))}</code>"]
    return lines + [""]
def _format_decision_envelope(envelope: Dict[str, Any]) -> List[str]:
    """Convert the decision envelope shared by the agents into Telegram HTML lines.

    Price-decision envelopes are delegated to ``_format_price_decision_envelope``.
    Every other envelope gets the generic layout: header, subject, up to three
    evidence rows, recommended action, expected impact and a trace footer.
    Every interpolated value is HTML-escaped.

    Args:
        envelope: Decision envelope. Anything that is not a non-empty dict
            yields an empty list.

    Returns:
        The message lines, terminated by an empty string that acts as a
        spacer for whatever the caller appends next; ``[]`` when there is
        nothing to render.
    """
    if not isinstance(envelope, dict) or not envelope:
        return []
    if _is_price_decision_envelope(envelope):
        return _format_price_decision_envelope(envelope)

    def _percent(raw: Any) -> str:
        """Format *raw* as a whole percentage; '' when missing or non-numeric."""
        if raw is None:
            return ""
        try:
            return f"{float(raw):.0%}"
        except (TypeError, ValueError):
            return ""

    guardrails = envelope.get("guardrails")
    if not isinstance(guardrails, dict):
        guardrails = {}

    severity = escape(str(envelope.get("severity") or "info"))
    decision_type = escape(str(envelope.get("decision_type") or "general"))
    data_quality = escape(str(guardrails.get("data_quality") or envelope.get("data_quality") or "unknown"))
    can_auto_execute = bool(guardrails.get("can_auto_execute", False))
    blocked_reason = escape(str(guardrails.get("blocked_reason") or ""))
    confidence_pct = _percent(envelope.get("confidence"))
    confidence_text = f" 信心度:<b>{confidence_pct}</b>" if confidence_pct else ""

    lines = [
        "🧭 <b>決策信封</b>",
        f"• 類型:<code>{decision_type}</code> 嚴重度:<b>{severity}</b>{confidence_text}",
        f"• 資料品質:<code>{data_quality}</code> 自動執行:<b>{'允許' if can_auto_execute else '不允許'}</b>",
    ]
    if blocked_reason:
        lines.append(f"• 邊界:{blocked_reason}")

    # --- Subject: which product / competitor candidate the decision is about.
    subject = envelope.get("subject")
    if isinstance(subject, dict) and subject:
        sku = escape(str(subject.get("sku") or ""))
        name = escape(_short_text(subject.get("name") or "", 96))
        competitor_id = escape(str(subject.get("competitor_product_id") or ""))
        competitor_name = escape(_short_text(subject.get("competitor_product_name") or "", 96))
        subject_lines = []
        if sku:
            subject_lines.append(f"• SKU<code>{sku}</code>")
        if name:
            subject_lines.append(f"• 商品:{name}")
        if competitor_id:
            subject_lines.append(f"• PChome<code>{competitor_id}</code>")
        if competitor_name:
            subject_lines.append(f"• 候選:{competitor_name}")
        if subject_lines:
            lines += ["", "<b>標的</b>", *subject_lines]

    # --- Evidence: at most three rows to keep the message short.
    evidence_items = envelope.get("evidence")
    if isinstance(evidence_items, list) and evidence_items:
        lines += ["", "<b>證據</b>"]
        for item in evidence_items[:3]:
            if not isinstance(item, dict):
                # Truncate BEFORE escaping: slicing escaped text can cut an
                # entity such as "&amp;" in half and leave a dangling "&".
                lines.append(escape(str(item)[:180]))
                continue
            metric = escape(str(item.get("metric") or item.get("type") or "evidence"))
            value = escape(str(item.get("value") if item.get("value") is not None else ""))
            basis = escape(str(item.get("basis") or ""))
            freshness = escape(str(item.get("freshness") or ""))
            item_pct = _percent(item.get("confidence"))
            confidence_suffix = f" / {item_pct}" if item_pct else ""
            detail = " / ".join(part for part in (value, basis, freshness) if part)
            lines.append(f"• <code>{metric}</code>{confidence_suffix}" + detail)

    # --- Recommended action; human-in-the-loop is assumed unless stated otherwise.
    recommended_action = envelope.get("recommended_action")
    if isinstance(recommended_action, dict):
        action = escape(str(recommended_action.get("action") or "human_review"))
        owner = escape(str(recommended_action.get("owner") or "未指定"))
        deadline = escape(str(recommended_action.get("deadline") or ""))
        requires_hitl = bool(recommended_action.get("requires_hitl", True))
        lines += [
            "",
            "<b>建議行動</b>",
            f"• 動作:<code>{action}</code> 負責:<b>{owner}</b>",
            f"• HITL<b>{'需要' if requires_hitl else '不需要'}</b>" + (f" 期限:{deadline}" if deadline else ""),
        ]

    # --- Expected impact: only the known keys, in a fixed order.
    expected_impact = envelope.get("expected_impact")
    if isinstance(expected_impact, dict) and expected_impact:
        impact_parts = [
            f"{escape(key)}={escape(str(expected_impact[key]))}"
            for key in ("revenue_loss_7d", "gap_amount", "cost_usd", "risk_reduction")
            if expected_impact.get(key) is not None
        ]
        if impact_parts:
            lines += ["", "<b>預期影響</b>", " / ".join(impact_parts[:4])]

    # --- Trace footer: identifiers for audit, escaped as a single string.
    trace = envelope.get("trace")
    if isinstance(trace, dict):
        trace_parts = [
            f"{key}={trace[key]}"
            for key in ("ai_call_id", "insight_id", "action_plan_id", "model", "provider")
            if trace.get(key) is not None
        ]
        if trace_parts:
            lines += ["", f"<code>{escape(' | '.join(trace_parts))}</code>"]

    return lines + [""]
def triaged_alert(base_event: Dict[str, Any], tier_label: str,
                  ai_summary: str, ai_cause: Optional[str] = None,
                  ai_actions: Optional[list] = None,
                  ai_executed: Optional[list] = None) -> tuple:
    """Build the EA L1/L2 autonomous-execution notification (original interface kept).

    Args:
        base_event: Event dict. Reads ``event_type``, ``title``, ``summary``,
            ``id``, ``decision_envelope`` / ``decision`` and ``trace``.
        tier_label: Tier text shown in the headline.
        ai_summary: AI summary; only the first 400 characters are shown.
        ai_cause: Optional probable-cause text.
        ai_actions: Optional list of suggested actions.
        ai_executed: Optional list of actions that were already executed.

    Returns:
        ``(message, keyboard)``: the HTML message text and an inline keyboard
        dict holding a single "ignore this event" button.
    """
    event_type = base_event.get("event_type", "alert")
    decision_envelope = base_event.get("decision_envelope") or base_event.get("decision")

    # Fall back to the envelope's decision id when the event carries no id.
    event_id = base_event.get("id")
    if not event_id and isinstance(decision_envelope, dict):
        event_id = decision_envelope.get("decision_id")

    if event_type == "ea_escalation":
        # The escalation formatter receives the raw values.
        message = _format_ea_escalation_alert(
            base_event=base_event,
            tier_label=tier_label,
            ai_summary=str(ai_summary or ""),
            ai_cause=ai_cause,
            ai_actions=ai_actions,
        )
    else:
        title = escape(str(base_event.get("title", "")))
        summary = escape(str(base_event.get("summary", "")))
        # Truncate BEFORE escaping so an entity like "&lt;" is never cut in half.
        safe_ai_summary = escape(str(ai_summary or "")[:400])
        safe_ai_cause = escape(str(ai_cause)) if ai_cause else None
        safe_actions = [escape(str(a)) for a in (ai_actions or [])]
        safe_executed = [escape(str(a)) for a in (ai_executed or [])]

        lines = [
            # Both values are data, not markup, so they are escaped like the rest.
            f"⚡ <b>{escape(str(tier_label))} · {escape(str(event_type))}</b>",
            f"📌 {title}",
            "",
        ]
        if summary:
            lines += [f"🔍 <b>概要:</b>{summary}", ""]
        if safe_ai_summary:
            lines += [f"🧠 <b>AI 摘要:</b>{safe_ai_summary}", ""]
        if safe_ai_cause:
            lines += [f"💡 <b>可能原因:</b>{safe_ai_cause}", ""]
        if isinstance(decision_envelope, dict):
            lines += _format_decision_envelope(decision_envelope)
        if safe_actions:
            lines += ["<b>📋 建議行動:</b>", *safe_actions, ""]
        if safe_executed:
            lines += ["<b>✅ 已執行:</b>", *safe_executed, ""]
        trace = base_event.get("trace")
        if trace:
            # Tracebacks routinely contain "<module>" and similar text, which is
            # invalid markup in HTML parse_mode unless escaped. str() also keeps
            # a non-string trace from raising on the slice.
            lines.append(f"<pre>{escape(str(trace)[-400:])}</pre>")
        message = "\n".join(lines)

    # ADR-012: eig = event_ignore. callback_data must stay within Telegram's
    # 64-byte limit: the 9-byte "momo:eig:" prefix plus at most 52 payload bytes.
    _eid = _callback_payload_utf8(event_id, max_bytes=52)
    keyboard = {"inline_keyboard": [
        [{"text": "🛑 忽略此事件",
          "callback_data": f"momo:eig:{_eid}"}],
    ]}
    return message, keyboard
def insight_summary_msg(insights: List[Dict], period: str = "近24h") -> str:
    """Build the periodic AI-insight digest (HTML parse_mode).

    Shows at most six insights, each as an icon, an 80-character preview of
    its content and its confidence, followed by a note when more exist.

    Args:
        insights: Insight dicts; reads ``insight_type``, ``content`` and
            ``confidence``. ``None`` is treated as an empty list and entries
            that are not dicts are skipped.
        period: Period label shown in the headline.

    Returns:
        The HTML message text.
    """
    insights = insights or []
    n = len(insights)
    lines = [
        f"💡 <b>AI 洞察摘要</b> · {escape(str(period))}{n}",
        "━━━━━━━━━━━━━━━━━━━━",
    ]
    type_icons = {
        "price_alert": "🔴", "recommendation": "💰", "weekly_strategy": "📊",
        "meta_analysis": "🤖", "market_opportunity": "🟢", "mcp_cache": "🌐",
    }
    for ins in insights[:6]:
        if not isinstance(ins, dict):
            continue
        icon = type_icons.get(ins.get("insight_type", ""), "💡")
        # Truncate first, escape afterwards: the content is free text and
        # would otherwise be parsed as (possibly invalid) HTML by Telegram.
        raw_preview = str(ins.get("content", ""))[:80].replace("\n", " ")
        content_preview = escape(raw_preview)
        # A missing, None or non-numeric confidence is shown as 0%.
        try:
            conf = float(ins.get("confidence") or 0)
        except (TypeError, ValueError):
            conf = 0.0
        lines.append(f"{icon} {content_preview}… <i>({conf:.0%})</i>")
    if n > 6:
        lines.append(f"<i>…另有 {n-6} 筆洞察</i>")
    lines.append("━━━━━━━━━━━━━━━━━━━━")
    return "\n".join(lines)
# ══════════════════════════════════════════════════════════════════════════════
# 舊版相容介面(保留供現有程式碼調用)
# ══════════════════════════════════════════════════════════════════════════════
def alert(title: str, content: str, actions: Optional[list] = None) -> str:
    """Legacy helper: an "error"-level system alert with optional action lines.

    Args:
        title: Alert title, forwarded to ``system_alert_msg``.
        content: Alert body, forwarded to ``system_alert_msg``.
        actions: Optional items appended one per line after the alert text.

    Returns:
        The alert text, followed by the action lines when any are given.
    """
    message = system_alert_msg("error", title, content)
    if not actions:
        return message
    action_block = "\n".join(f"{item}" for item in actions)
    return message + "\n" + action_block
def warning(title: str, summary: str, details: Optional[dict] = None) -> str:
    """Legacy helper: a "warning"-level system alert with optional key/value details.

    Args:
        title: Alert title, forwarded to ``system_alert_msg``.
        summary: Summary text placed first in the alert body.
        details: Optional mapping rendered as one ``key: value`` row per entry
            below the summary.

    Returns:
        Whatever ``system_alert_msg`` returns for the assembled body.
    """
    rows = [f" {key}: {value}" for key, value in (details or {}).items()]
    detail_str = "\n".join(rows)
    body = summary
    if detail_str:
        body = summary + "\n" + detail_str
    return system_alert_msg("warning", title, body)
def info(title: str, module: str, content: str, time: Optional[Any] = None) -> str:
    """Legacy helper: an informational message with a bold title and module tag.

    Args:
        title: Text shown in bold.
        module: Module name shown in square brackets.
        content: Message body, placed after a blank line.
        time: Optional timestamp appended to the headline; omitted when falsy.

    Returns:
        The formatted message text.
    """
    suffix = ""
    if time:
        suffix = f" · {time}"
    headline = f" <b>{title}</b> [{module}]{suffix}"
    return headline + "\n\n" + f"{content}"
def success(title: str, module: str, stats: str = "") -> str:
    """Legacy helper: a success message with a bold title, module tag and stats line.

    Args:
        title: Text shown in bold.
        module: Module name shown in square brackets.
        stats: Optional statistics text placed on the second line.

    Returns:
        The formatted two-line message text.
    """
    headline = f"✅ <b>{title}</b> [{module}]"
    return "\n".join((headline, f"{stats}"))
def report(title: str, report_type: str, period: str, content_md: str) -> str:
    """Legacy helper: a report message with an icon chosen by report type.

    Args:
        title: Report title shown in bold.
        report_type: Type key; selects the icon and is shown in parentheses.
            Unknown types get the generic clipboard icon.
        period: Period text shown on the second line.
        content_md: Report body, appended unchanged after a divider line.

    Returns:
        The formatted report text.
    """
    icon = {
        "weekly_strategy": "📊",
        "daily": "📅",
        "monthly": "📆",
        "meta_analysis": "🤖",
    }.get(report_type, "📋")
    sections = [
        f"{icon} <b>{title}</b> ({report_type})",
        f"📅 期間:{period}",
        "══════════════════════════",
        f"{content_md}",
    ]
    return "\n".join(sections)
def _send_telegram(msg: str, chat_ids: Optional[list] = None,
                   reply_markup: Optional[Dict[str, Any]] = None) -> bool:
    """Legacy entry point that forwards unchanged to ``_send_telegram_raw``.

    Args:
        msg: Message text to send.
        chat_ids: Optional list of target chat ids, passed through as-is.
        reply_markup: Optional reply-markup dict, passed through as-is.

    Returns:
        Whatever ``_send_telegram_raw`` returns.
    """
    outcome = _send_telegram_raw(
        msg,
        chat_ids=chat_ids,
        reply_markup=reply_markup,
    )
    return outcome
# ══════════════════════════════════════════════════════════════════════════════
# LLM token daily-report template (Operation Ollama-First v5.0 — Phase 1 wrap-up)
# Counterpart of services/token_report_service.py
# ══════════════════════════════════════════════════════════════════════════════
# Safety fuse for Telegram's per-message limit (sendMessage text is capped at
# 4096 characters).
_DAILY_TOKEN_REPORT_MAX_CHARS = 4096


def _cut_token_report_html(html_text: str, limit: int) -> str:
    """Cut *html_text* to at most *limit* characters, then repair the markup.

    A tag or entity sliced in half at the cut is dropped, and closing tags are
    appended for every tag still open. The result can therefore be slightly
    longer than *limit*; the caller enforces the final length.
    """
    piece = html_text[: max(limit, 0)]
    # A "<" after the last ">" means the cut landed inside a tag.
    last_open = piece.rfind("<")
    if last_open > piece.rfind(">"):
        piece = piece[:last_open]
    # An "&" in the last few characters with no ";" is a half-cut entity.
    last_amp = piece.rfind("&", max(len(piece) - 10, 0))
    if last_amp != -1 and ";" not in piece[last_amp:]:
        piece = piece[:last_amp]
    # Track open tags so unclosed ones can be closed in reverse order.
    open_tags: List[str] = []
    for match in re.finditer(r"<(/?)([A-Za-z][A-Za-z0-9-]*)[^<>]*>", piece):
        tag = match.group(2).lower()
        if not match.group(1):
            open_tags.append(tag)
        elif tag in open_tags:
            # Pop through to the matching opener.
            while open_tags.pop() != tag:
                pass
    return piece + "".join(f"</{tag}>" for tag in reversed(open_tags))


def daily_token_report(report_html: str,
                       footer_url: Optional[str] = None) -> str:
    """Wrap the LLM token daily report for Telegram (HTML parse_mode).

    This function only appends the footer link and truncates to the Telegram
    limit; per the original author's note the report body comes from
    ``services/token_report_service.generate_daily_report()`` already
    HTML-escaped. When truncation is needed the cut never leaves a half tag,
    a half entity or an unclosed tag behind, because Telegram rejects
    malformed HTML.

    Args:
        report_html: Already-escaped HTML report string.
        footer_url: Optional admin-console link; escaped automatically.

    Returns:
        An HTML string of at most ``_DAILY_TOKEN_REPORT_MAX_CHARS`` characters.
    """
    body = report_html or ""
    if footer_url:
        body = f"{body}\n📎 <a href=\"{escape(footer_url)}\">詳細日誌</a>"
    if len(body) <= _DAILY_TOKEN_REPORT_MAX_CHARS:
        return body

    notice = "\n\n... <i>(訊息過長已截斷;完整內容存於 ai_insights)</i>"
    # Too long: keep 80 characters of headroom for the notice and closing tags.
    cut = _DAILY_TOKEN_REPORT_MAX_CHARS - 80
    result = _cut_token_report_html(body, cut) + notice
    # Deeply nested markup can need more closing tags than the headroom
    # allows; shrink the cut until the final text fits.
    while len(result) > _DAILY_TOKEN_REPORT_MAX_CHARS and cut > 0:
        cut -= len(result) - _DAILY_TOKEN_REPORT_MAX_CHARS
        result = _cut_token_report_html(body, cut) + notice
    return result
# ══════════════════════════════════════════════════════════════════════════════
# Decision receipt templates (used to edit the original message after an L2 / L3 button click)
# ADR-012 Phase 4: audit trail — operator, time, action, result
# ══════════════════════════════════════════════════════════════════════════════
# Asia/Taipei is UTC+8. The container may run in UTC, so the offset is applied
# by hand here to avoid depending on tzdata.
_TAIPEI_UTC_OFFSET_HOURS = 8


def _now_taipei_hhmm() -> str:
    """Return the current time as ``HH:MM`` at the fixed Taipei UTC offset.

    The container's own time zone is not trusted, so the offset constant is
    applied explicitly. If building the offset-aware time fails for any
    reason, the local clock is used instead.
    """
    clock_format = "%H:%M"
    try:
        from datetime import timedelta, timezone

        offset = timedelta(hours=_TAIPEI_UTC_OFFSET_HOURS)
        return datetime.now(timezone(offset)).strftime(clock_format)
    except Exception:
        return datetime.now().strftime(clock_format)
def _html_escape(s: Any) -> str:
    """Minimal HTML escaping for Telegram HTML parse_mode.

    Only ``&``, ``<`` and ``>`` are replaced, so user input cannot break the
    message layout; quotes are left untouched. ``None`` becomes an empty
    string and any other value is converted with ``str()`` first.
    """
    if s is None:
        return ""
    # quote=False limits the replacement to &, < and >.
    return escape(str(s), quote=False)
def decision_result(original_text: str, action: str, operator: str,
                    note: Optional[str] = None, **extras: Any) -> str:
    """Build the L2 single-item pricing receipt shown after approve / reject.

    Args:
        original_text: Text of the original Telegram message.
        action: ``"approve"`` or ``"reject"`` (case-insensitive); any other
            value is shown as a generic "handled" label.
        operator: Display name of the operator; ``"unknown"`` when empty.
        note: Optional extra remark, added as a final line.
        **extras: Reserved for forward compatibility; currently ignored.

    Returns:
        The original text, a divider and the audit line (plus the note line
        when given), joined with newlines.
    """
    outcomes = {
        "approve": ("", "已執行"),
        "reject": ("", "已拒絕"),
    }
    normalized = (action or "").lower()
    if normalized in outcomes:
        icon, label = outcomes[normalized]
    else:
        icon, label = "", f"已處理({_html_escape(action)}"

    stamp = _now_taipei_hhmm()
    operator_html = _html_escape(operator or "unknown")

    receipt = [
        original_text or "",
        "",
        "━━━━━━━━━━━━━━━━━━━━",
        f"{icon} <b>{label}</b> by <b>{operator_html}</b> at {stamp}",
    ]
    if note:
        receipt.append(f"📝 {_html_escape(note)}")
    return "\n".join(receipt)
def ops_action_result(original_text: str, action: str, operator: str,
                      result: Optional[Dict[str, Any]] = None,
                      **extras: Any) -> str:
    """Build the L3 operations receipt shown after an ops action has run.

    Args:
        original_text: Text of the original Telegram message.
        action: Action key such as ``"pause1h"``, ``"pause6h"``, ``"retry"``
            or ``"resume"``; unknown keys are shown escaped as-is.
        operator: Display name of the operator; ``"unknown"`` when empty.
        result: Optional result dict. Reads ``status``, the first non-empty
            of ``error`` / ``message`` / ``detail``, ``task_name`` and
            ``duration_min``.
        **extras: Reserved for forward compatibility; currently ignored.

    Returns:
        The original text, a divider and the execution-result block, joined
        with newlines.
    """
    known_actions = {
        "pause1h": "暫停 1 小時",
        "pause6h": "暫停 6 小時",
        "retry": "立即重試",
        "resume": "恢復執行",
        "execute": "執行",
        "skip": "略過",
        "rollback": "回滾",
    }
    action_key = (action or "").lower()
    act_label = known_actions.get(action_key, _html_escape(action or "未知動作"))

    result = result or {}
    status = str(result.get("status", "")).lower()
    if status in ("ok", "success", "done"):
        status_icon, status_text = "", "成功"
    elif status in ("error", "failed", "fail"):
        status_icon, status_text = "", "失敗"
    elif status == "skipped":
        status_icon, status_text = "⏭️", "已略過"
    else:
        # Unrecognised status: show it verbatim, or "submitted" when empty.
        status_icon, status_text = "", status or "已提交"

    stamp = _now_taipei_hhmm()
    operator_html = _html_escape(operator or "unknown")

    # Result message, by priority: error > message > detail (max 300 chars).
    detail_msg: Optional[str] = next(
        (str(result[key])[:300] for key in ("error", "message", "detail") if result.get(key)),
        None,
    )

    receipt = [
        original_text or "",
        "",
        "━━━━━━━━━━━━━━━━━━━━",
        f"🛠️ <b>運維動作:</b>{act_label}",
        f"👤 操作員:<b>{operator_html}</b> 🕐 {stamp}",
        f"{status_icon} <b>執行結果:</b>{_html_escape(status_text)}",
    ]
    if detail_msg:
        receipt.append(f"📋 {_html_escape(detail_msg)}")

    # Extra key fields (task_name / duration_min), shown only when truthy.
    task_name = result.get("task_name")
    if task_name:
        receipt.append(f"📌 任務:<code>{_html_escape(task_name)}</code>")
    duration_min = result.get("duration_min")
    if duration_min:
        receipt.append(f"⏱️ 時長:{_html_escape(duration_min)} 分鐘")
    return "\n".join(receipt)