"""
services/telegram_templates.py
Telegram 訊息模板系統 v2
═══ 訊息分類 ═══════════════════════════════════════════════
🚨 告警類 — price_alert_msg / threat_alert_msg / system_alert_msg
📊 報告類 — daily_report_msg / weekly_report_msg / monthly_report_msg
💰 決策類 — price_decision / batch_decision_msg
🤖 系統類 — deploy_msg / heal_msg / meta_analysis_msg
💡 洞察類 — triaged_alert / insight_summary_msg
═══════════════════════════════════════════════════════════
規範:
1. 所有模板使用 HTML parse_mode
2. 一律繁體中文,Agent 名稱保留英文(Hermes/NemoTron/OpenClaw/EA)
3. 每則訊息須含:標題行 / 核心數據 / 建議行動(三段式)
4. 圖文訊息使用 send_photo_with_caption(附說明文字)
"""
import io
import json
import logging
import os
import re
from html import escape
from datetime import datetime
from typing import Any, Dict, List, Optional
# Module logger shared by every sender/template helper in this file.
sys_log = logging.getLogger("TelegramTpl")

# Names of the environment variables that hold the bot credentials/targets.
TELEGRAM_BOT_TOKEN_ENV = "TELEGRAM_BOT_TOKEN"
TELEGRAM_CHAT_IDS_ENV = "TELEGRAM_CHAT_IDS"

# Any spelling of <br>, <br/>, <BR /> ... (converted to "\n" before sending).
_TELEGRAM_HTML_BR_RE = re.compile(r"<\s*br\s*/?\s*>", re.IGNORECASE)
# Anything that looks like a tag: "<", 1-500 chars without <, > or newline, ">".
_TELEGRAM_HTML_TAG_RE = re.compile(r"<[^<>\n]{1,500}>")
# Whitelist of tags that are passed through unescaped: the simple formatting
# tags (opening or closing) plus <a href="..."> / </a>.
# The previous pattern started with "(?:?" which is not a valid regular
# expression (re.error: nothing to repeat) and broke the module at import.
_TELEGRAM_ALLOWED_HTML_TAG_RE = re.compile(
    r'(?:</?(?:b|strong|i|em|u|s|strike|del|code|pre)>|<a\s+href="[^"<>]*">|</a>)',
    re.IGNORECASE,
)
# ══════════════════════════════════════════════════════════════════════════════
# 基礎工具
# ══════════════════════════════════════════════════════════════════════════════
def _get_bot_token() -> Optional[str]:
    """Return the bot token from the environment, or ``None`` when unset.

    ``load_dotenv()`` is invoked on every call before the lookup.
    """
    from dotenv import load_dotenv

    load_dotenv()
    bot_token = os.getenv(TELEGRAM_BOT_TOKEN_ENV)
    return bot_token
def _get_chat_ids() -> list:
    """Return the configured Telegram chat IDs.

    The ``TELEGRAM_CHAT_IDS`` environment variable must contain a JSON array.

    Returns:
        The decoded list, or an empty list when the bot token is missing,
        the value is not valid JSON, or the JSON is not an array.
    """
    token = _get_bot_token()
    if not token:
        sys_log.warning("[TelegramTpl] %s 未設定,跳過 Telegram 通知", TELEGRAM_BOT_TOKEN_ENV)
        return []
    raw = os.getenv(TELEGRAM_CHAT_IDS_ENV, "[]")
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        sys_log.warning("[TelegramTpl] %s 格式錯誤,應為 JSON 陣列", TELEGRAM_CHAT_IDS_ENV)
        return []
    # Valid JSON that is not an array (e.g. `123` or `{}`) used to be returned
    # as-is and crashed callers that index or iterate the result.
    if not isinstance(parsed, list):
        sys_log.warning("[TelegramTpl] %s 格式錯誤,應為 JSON 陣列", TELEGRAM_CHAT_IDS_ENV)
        return []
    return parsed
def _sanitize_telegram_html(text: str, parse_mode: Optional[str] = "HTML") -> str:
    """Keep whitelisted HTML tags and escape every other tag-like span.

    Only active when ``parse_mode`` equals "HTML" (case-insensitive); for any
    other mode the input is returned as a plain string without changes.
    ``<br>`` variants are converted to newlines before tags are inspected.
    """
    raw = str(text or "")
    html_mode = bool(parse_mode) and str(parse_mode).upper() == "HTML"
    if not html_mode:
        return raw
    normalized = _normalize_telegram_html_linebreaks(raw)
    return _TELEGRAM_HTML_TAG_RE.sub(_escape_unsupported_telegram_html_tag, normalized)
def _normalize_telegram_html_linebreaks(text: str) -> str:
    """Replace every ``<br>`` variant in ``text`` with a newline character."""
    source = str(text or "")
    return _TELEGRAM_HTML_BR_RE.sub("\n", source)
def _escape_unsupported_telegram_html_tag(match: re.Match) -> str:
    """``re.sub`` callback: pass whitelisted tags through, HTML-escape the rest."""
    candidate = match.group(0)
    is_allowed = _TELEGRAM_ALLOWED_HTML_TAG_RE.fullmatch(candidate) is not None
    return candidate if is_allowed else escape(candidate)
def _callback_payload_utf8(value: Any, max_bytes: int = 52) -> str:
    """Clamp a callback payload to ``max_bytes`` UTF-8 bytes.

    Falsy or whitespace-only input becomes ``"unknown"``. When the encoded
    text is too long it is cut at the byte limit; a multibyte character that
    would be split by the cut is dropped entirely, and the result is stripped
    again (falling back to ``"unknown"`` if nothing is left).
    """
    payload = str(value or "unknown").strip() or "unknown"
    raw = payload.encode("utf-8")
    if len(raw) <= max_bytes:
        return payload
    # The source is valid UTF-8, so the only undecodable bytes after slicing
    # are the tail of a truncated character; "ignore" discards exactly those.
    shortened = raw[:max_bytes].decode("utf-8", errors="ignore").strip()
    return shortened or "unknown"
def _send_telegram_raw(text: str, chat_ids: Optional[list] = None,
                       reply_markup: Optional[Dict[str, Any]] = None,
                       parse_mode: Optional[str] = "HTML") -> bool:
    """Send one text message via ``sendMessage`` to the first target chat.

    Args:
        text: Message body; sanitised according to ``parse_mode``.
        chat_ids: Target chats. ``None`` loads them from the environment; an
            empty result falls back to the hard-coded default chat. Only the
            first entry is used.
        reply_markup: Optional keyboard dict, sent JSON-encoded.
        parse_mode: Telegram parse mode; omitted from the request when falsy.

    Returns:
        ``True`` on an OK HTTP response; ``False`` when the token is missing,
        the response is not OK, or the request raised.
    """
    import requests

    token = _get_bot_token()
    if not token:
        return False

    targets = _get_chat_ids() if chat_ids is None else chat_ids
    if not targets:
        targets = [-1003940688311]  # fallback

    endpoint = f"https://api.telegram.org/bot{token}/sendMessage"
    body = {"chat_id": targets[0], "text": _sanitize_telegram_html(text, parse_mode)}
    if parse_mode:
        body["parse_mode"] = parse_mode
    if reply_markup:
        body["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False)

    try:
        response = requests.post(endpoint, json=body, timeout=10)
        if response.ok:
            return True
        sys_log.warning("[TelegramTpl] sendMessage HTTP %s: %s", response.status_code, response.text[:200])
        return False
    except Exception as exc:
        # Best-effort notification: log and report failure instead of raising.
        sys_log.error("[TelegramTpl] send 失敗: %s", exc)
        return False
def send_telegram_with_result(text: str, chat_ids: Optional[list] = None,
                              reply_markup: Optional[Dict[str, Any]] = None,
                              parse_mode: Optional[str] = "HTML") -> Dict[str, Any]:
    """Send a message to every target chat and return a per-call summary.

    Args:
        text: Message body; sanitised according to ``parse_mode``.
        chat_ids: Target chats. ``None`` loads them from the environment; an
            empty result falls back to the hard-coded default chat.
        reply_markup: Optional keyboard dict, sent JSON-encoded.
        parse_mode: Telegram parse mode; omitted from the request when falsy.

    Returns:
        Dict with ``ok`` (at least one send and no failure), ``sent``,
        ``failed``, ``chat_ids`` and ``errors`` (one short string per failed
        chat, or ``["token_missing"]`` when no token is configured).
    """
    token = _get_bot_token()
    if not token:
        return {"ok": False, "sent": 0, "failed": 0, "chat_ids": [], "errors": ["token_missing"]}

    targets = _get_chat_ids() if chat_ids is None else chat_ids
    if not targets:
        targets = [-1003940688311]

    import requests

    endpoint = f"https://api.telegram.org/bot{token}/sendMessage"

    # Everything except chat_id is identical for all recipients.
    shared_fields: Dict[str, Any] = {"text": _sanitize_telegram_html(text, parse_mode)}
    if parse_mode:
        shared_fields["parse_mode"] = parse_mode
    if reply_markup:
        shared_fields["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False)

    delivered = 0
    problems: List[str] = []
    for chat_id in targets:
        body = {"chat_id": chat_id, **shared_fields}
        try:
            response = requests.post(endpoint, json=body, timeout=10)
        except Exception as exc:
            problems.append(f"{chat_id}:{type(exc).__name__}")
            sys_log.error("[TelegramTpl] send chat=%s 失敗: %s", chat_id, exc)
            continue
        if response.ok:
            delivered += 1
            continue
        problems.append(f"{chat_id}:HTTP {response.status_code}")
        sys_log.warning("[TelegramTpl] sendMessage chat=%s HTTP %s: %s",
                        chat_id, response.status_code, response.text[:200])

    failures = len(problems)
    return {
        "ok": delivered > 0 and failures == 0,
        "sent": delivered,
        "failed": failures,
        "chat_ids": list(targets),
        "errors": problems,
    }
def send_photo(photo_bytes: bytes, caption: str = "",
               chat_ids: Optional[list] = None,
               parse_mode: str = "HTML") -> bool:
    """Upload one PNG via ``sendPhoto`` to the first target chat.

    Args:
        photo_bytes: Raw image data; an empty value aborts the send.
        caption: Caption text, sanitised and cut to 1024 characters.
        chat_ids: Target chats. ``None`` loads them from the environment; an
            empty result falls back to the hard-coded default chat. Only the
            first entry is used.
        parse_mode: Parse mode forwarded with the caption.

    Returns:
        ``True`` on an OK HTTP response, otherwise ``False``.
    """
    import requests

    token = _get_bot_token()
    if not (token and photo_bytes):
        return False

    targets = _get_chat_ids() if chat_ids is None else chat_ids
    if not targets:
        targets = [-1003940688311]

    endpoint = f"https://api.telegram.org/bot{token}/sendPhoto"
    try:
        form_fields = {
            "chat_id": targets[0],
            "caption": _sanitize_telegram_html(caption, parse_mode)[:1024],
            "parse_mode": parse_mode,
        }
        attachment = {"photo": ("chart.png", photo_bytes, "image/png")}
        response = requests.post(endpoint, data=form_fields, files=attachment, timeout=30)
        if response.ok:
            return True
        sys_log.warning("[TelegramTpl] sendPhoto HTTP %s: %s", response.status_code, response.text[:200])
        return False
    except Exception as exc:
        # Best-effort notification: log and report failure instead of raising.
        sys_log.error("[TelegramTpl] sendPhoto 失敗: %s", exc)
        return False
def send_report_with_charts(text_msg: str, charts: List[Optional[bytes]],
                            chat_ids: Optional[list] = None) -> bool:
    """Send the text report first, then each non-empty chart as a photo.

    Returns the result of the text send only; chart upload results are not
    reflected in the return value.
    """
    text_delivered = _send_telegram_raw(text_msg, chat_ids=chat_ids)
    for position, chart in enumerate(charts, start=1):
        if not chart:
            continue
        send_photo(chart, caption=f"圖表 {position}/{len(charts)}", chat_ids=chat_ids)
    return text_delivered
# ══════════════════════════════════════════════════════════════════════════════
# 🚨 告警類模板
# ══════════════════════════════════════════════════════════════════════════════
def price_alert_msg(threats: List[Dict], analysis_period: str = "近48h") -> str:
    """Build the competitor price-cut alert listing up to eight SKUs.

    Args:
        threats: Threat dicts. Keys read: ``risk`` ("HIGH" is flagged red,
            anything else yellow), ``name`` / ``sku``, ``gap_pct``,
            ``sales_7d_delta_pct`` (falling back to ``sales_delta``),
            ``momo_price`` and ``pchome_price``.
        analysis_period: Label shown in the header line.

    Returns:
        The multi-line message text.

    Missing, ``None`` or non-numeric metrics no longer raise: percentages
    fall back to 0 and the price comparison is omitted unless both prices
    are usable numbers.
    """

    def _to_float(value: Any) -> Optional[float]:
        # Tolerate None / malformed values coming from upstream records.
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    total = len(threats)
    high_count = sum(1 for threat in threats if threat.get("risk") == "HIGH")
    lines = [
        f"🚨 競品削價告警 · 偵測到 {total} 個 SKU [{analysis_period}]",
        f"🔴 高危:{high_count} 個 🟡 中危:{total - high_count} 個",
        "─" * 32,
    ]
    for threat in threats[:8]:
        risk_icon = "🔴" if threat.get("risk") == "HIGH" else "🟡"
        # Prefer the name, then the SKU; a None name must not render as "None".
        name = str(threat.get("name") or threat.get("sku") or "")[:22]
        gap = _to_float(threat.get("gap_pct")) or 0.0
        delta_raw = threat.get("sales_7d_delta_pct")
        if delta_raw is None:
            delta_raw = threat.get("sales_delta")
        delta = _to_float(delta_raw) or 0.0
        momo_price = _to_float(threat.get("momo_price"))
        pchome_price = _to_float(threat.get("pchome_price"))
        price_str = (
            f" MOMO ${momo_price:,.0f} vs 競品 ${pchome_price:,.0f}"
            if momo_price and pchome_price else ""
        )
        lines.append(
            f"{risk_icon} {name}\n"
            f" 價差 {gap:+.1f}% 業績週跌 {delta:+.1f}%{price_str}"
        )
    if total > 8:
        lines.append(f"…另有 {total - 8} 個 SKU(查看完整報告)")
    lines += ["─" * 32,
              "💡 建議:優先處理紅色高危品項,確認競品是否促銷或長期低價"]
    return "\n".join(lines)
def threat_alert_msg(sku: str, name: str, momo_price: float,
                     comp_price: float, gap_pct: float,
                     sales_delta: float, action: str, confidence: float) -> str:
    """Build the single-SKU competitor threat notice.

    The risk badge is derived from ``gap_pct``: above 15 is high, above 5 is
    medium, anything else is low. ``confidence`` is rendered as a percentage.
    """
    if gap_pct > 15:
        risk = "🔴 高危"
    elif gap_pct > 5:
        risk = "🟡 中危"
    else:
        risk = "🟢 低危"

    rule = "━━━━━━━━━━━━━━━━━━━━"
    rows = [
        f"{risk} 競品威脅通報",
        rule,
        f"🏷️ {name} {sku}",
        "",
        f"💴 MOMO NT${momo_price:,.0f}",
        f"💴 競品(PChome)NT${comp_price:,.0f}",
        f"📉 價差 {gap_pct:+.1f}%(我方較貴)",
        f"📊 業績週變 {sales_delta:+.1f}%",
        "",
        f"🤖 AI 建議:{action}",
        f"📈 信心度:{confidence:.0%}",
        rule,
    ]
    return "\n".join(rows)
def system_alert_msg(level: str, title: str, detail: str, source: str = "") -> str:
    """Build a system-level alert message.

    ``level`` picks the icon (unknown levels use the warning icon),
    ``detail`` is cut to 600 characters, and the current local time is
    appended as the last line.
    """
    level_icons = {"critical": "🚨", "error": "❌", "warning": "⚠️", "info": "ℹ️"}
    icon = level_icons.get(level, "⚠️")
    origin = f" [{source}]" if source else ""
    rule = "━━━━━━━━━━━━━━━━━━━━"
    stamp = datetime.now().strftime('%m/%d %H:%M')
    rows = [
        f"{icon} {title}{origin}",
        rule,
        f"{detail[:600]}",
        rule,
        f"🕐 {stamp}",
    ]
    return "\n".join(rows)
# ══════════════════════════════════════════════════════════════════════════════
# 📊 報告類模板(三種週期)
# ══════════════════════════════════════════════════════════════════════════════
def daily_report_header(date_str: str, revenue: float, wow: float,
                        threat_count: int, opportunity_count: int) -> str:
    """Build the daily report title card (text sent ahead of the charts).

    ``revenue`` is shown in units of 10,000 with one decimal; ``wow`` is
    prefixed with "+" when it is non-negative.
    """
    if wow >= 0:
        trend_icon, sign = "📈", "+"
    else:
        trend_icon, sign = "📉", ""
    rule = "══════════════════════════"
    rows = [
        f"📊 EwoooC 電商日報 · {date_str}",
        rule,
        f"💰 今日業績 NT${revenue/10000:.1f} 萬",
        f"{trend_icon} 週同比 {sign}{wow:.1f}%",
        f"🔴 競品威脅 {threat_count} 個 SKU",
        f"🟢 市場機會 {opportunity_count} 個 SKU",
        rule,
        "🤖 Hermes + NemoTron + OpenClaw 聯合分析",
    ]
    return "\n".join(rows)
def weekly_report_header(period: str, curr_rev: float, prev_rev: float,
                         wow: float, top_category: str) -> str:
    """Build the weekly report title card.

    Both revenue figures are shown in units of 10,000 with one decimal; the
    growth rate is always rendered with an explicit sign.
    """
    trend_icon = "📈" if wow >= 0 else "📉"
    rule = "══════════════════════════"
    rows = [
        f"📊 EwoooC 電商週報 · {period}",
        rule,
        f"💰 本週業績 NT${curr_rev/10000:.1f} 萬",
        f"📦 前週業績 NT${prev_rev/10000:.1f} 萬",
        f"{trend_icon} 週成長率 {wow:+.1f}%",
        f"🏆 最強品類 {top_category}",
        rule,
        "🤖 OpenClaw × MCP 全景策略分析",
    ]
    return "\n".join(rows)
def monthly_report_header(month_str: str, revenue: float, mom: float,
                          yoy: float, top3_categories: List[str]) -> str:
    """Build the monthly report title card.

    Revenue is shown in units of 10,000 without decimals. Year-over-year
    growth of 10 or more gets the rocket icon. Only the first three
    categories are listed.
    """
    mom_icon = "📈" if mom >= 0 else "📉"
    if yoy >= 10:
        yoy_icon = "🚀"
    elif yoy >= 0:
        yoy_icon = "📈"
    else:
        yoy_icon = "📉"
    leading_categories = " / ".join(top3_categories[:3])
    rule = "══════════════════════════"
    rows = [
        f"📅 EwoooC 電商月報 · {month_str}",
        rule,
        f"💰 月度業績 NT${revenue/10000:.0f} 萬",
        f"{mom_icon} 月成長率 {mom:+.1f}% {yoy_icon} 年成長 {yoy:+.1f}%",
        f"🏆 TOP 3 品類 {leading_categories}",
        rule,
        "🤖 OpenClaw × Hermes × MCP 月度全景洞察",
    ]
    return "\n".join(rows)
def report_section(icon: str, title: str, lines: List[str]) -> str:
    """Build a generic report section: a heading row plus indented body rows.

    The result starts with a newline so sections can be concatenated.
    """
    indented_rows = [f" {row}" for row in lines]
    heading = f"\n{icon} {title}\n"
    return heading + "\n".join(indented_rows)
# ══════════════════════════════════════════════════════════════════════════════
# 💰 決策類模板
# ══════════════════════════════════════════════════════════════════════════════
def price_decision(product_name: str, product_sku: str,
                   current_price: float, suggested_price: float,
                   reason: str, insight_id: Optional[int] = None,
                   report_url: Optional[str] = None) -> tuple:
    """Build a price-change decision notice together with its inline keyboard.

    Args:
        product_name: Product display name (HTML-escaped).
        product_sku: Product SKU (HTML-escaped).
        current_price: Current selling price.
        suggested_price: Suggested price; lower means a cut, higher a raise.
        reason: Rationale text; ``<br>`` variants become newlines, then the
            text is HTML-escaped.
        insight_id: Insight identifier used for the callback payload.
        report_url: Optional URL of the analysis report; rendered as a link.

    Returns:
        ``(message, keyboard)`` where ``keyboard`` is an ``inline_keyboard``
        dict holding a confirm and a reject button.
    """
    diff = current_price - suggested_price
    if diff > 0:
        direction, action_text = "📉", f"降價 NT${diff:,.0f}"
    elif diff < 0:
        direction, action_text = "📈", f"提價 NT${-diff:,.0f}"
    else:
        direction, action_text = "➡️", "維持現價"

    safe_name = escape(str(product_name or ""))
    safe_sku = escape(str(product_sku or ""))
    safe_reason = escape(_normalize_telegram_html_linebreaks(str(reason or "")))
    message = (
        f"💰 AI 定價決策建議\n"
        f"━━━━━━━━━━━━━━━━━━━━\n"
        f"🏷️ {safe_name} {safe_sku}\n\n"
        f"現價:NT${current_price:,.0f}\n"
        f"建議:NT${suggested_price:,.0f} {direction} {action_text}\n\n"
        f"💡 依據:{safe_reason}\n"
    )
    if insight_id:
        message += f"🔗 洞察 ID:{insight_id}\n"
    if report_url:
        # The URL used to be accepted but never emitted; render it as a link.
        # quote=True keeps a stray '"' from terminating the href attribute.
        safe_url = escape(str(report_url), quote=True)
        message += f'📎 <a href="{safe_url}">查看分析報表</a>\n'
    message += "━━━━━━━━━━━━━━━━━━━━"

    # ADR-012: callback_data uses the short prefix momo:pa / momo:pr followed
    # by the insight id, which keeps it inside Telegram's 64-byte limit.
    # Fallback without an insight id: suffix with the SKU, clamped by UTF-8
    # BYTES (a character-count slice lets multi-byte SKUs overflow the limit).
    if insight_id:
        callback_id = str(insight_id)
    else:
        callback_id = _callback_payload_utf8(f"sku_{product_sku}", 40)
    keyboard = {"inline_keyboard": [[
        {"text": "✅ 確認執行", "callback_data": f"momo:pa:{callback_id}"},
        {"text": "❌ 拒絕", "callback_data": f"momo:pr:{callback_id}"},
    ]]}
    return message, keyboard
def batch_decision_msg(items: List[Dict], batch_id: str) -> tuple:
    """Build the batch pricing decision message (confirm many SKUs at once).

    Args:
        items: Decision dicts; keys read are ``name``, ``current_price`` and
            ``suggested_price``. Only the first six are listed.
        batch_id: Batch identifier embedded in the button callback payloads.

    Returns:
        ``(message, keyboard)`` where ``keyboard`` is an ``inline_keyboard``
        dict holding a confirm-all and a cancel button.
    """
    lines = [f"💰 批次定價決策 · 共 {len(items)} 個 SKU\n━━━━━━━━━━━━━━━━━━━━"]
    for i, item in enumerate(items[:6], 1):
        # Missing / None prices are treated as 0 instead of raising.
        current = float(item.get("current_price") or 0)
        suggested = float(item.get("suggested_price") or 0)
        diff = current - suggested
        # Same three-way direction as price_decision (no change is not a raise).
        direction = "📉" if diff > 0 else "📈" if diff < 0 else "➡️"
        # None-safe and HTML-escaped, consistent with price_decision.
        name = escape(str(item.get("name") or "")[:20])
        lines.append(
            f"{i}. {direction} {name}\n"
            f" ${current:,.0f} → ${suggested:,.0f}"
        )
    if len(items) > 6:
        lines.append(f"…另有 {len(items)-6} 個 SKU")
    lines.append("━━━━━━━━━━━━━━━━━━━━")
    # ADR-012: short prefixes bpa = batch_price_approve / bpr = batch_price_reject.
    # Telegram limits callback_data to 64 BYTES, so clamp the id by encoded
    # length; a partially cut multi-byte character is dropped.
    clamped_id = str(batch_id).encode("utf-8")[:48].decode("utf-8", errors="ignore")
    keyboard = {"inline_keyboard": [[
        {"text": f"✅ 全部確認({len(items)}項)",
         "callback_data": f"momo:bpa:{clamped_id}"},
        {"text": "❌ 取消",
         "callback_data": f"momo:bpr:{clamped_id}"},
    ]]}
    return "\n".join(lines), keyboard
# ══════════════════════════════════════════════════════════════════════════════
# 🧠 Phase 11 RAG 反饋(v5.0 護欄 #1:強制晉升門檻 — Stage 4 人工驗收)
# ══════════════════════════════════════════════════════════════════════════════
def rag_feedback_keyboard(rag_query_log_id: int) -> dict:
    """Build the thumbs-up / thumbs-down keyboard for a RAG answer.

    Callback payloads use the ``rag_fb:`` prefix:
        rag_fb:{log_id}:5  -> thumbs up   (score 5)
        rag_fb:{log_id}:1  -> thumbs down (score 1)

    A falsy ``rag_query_log_id`` is encoded as 0.
    """
    log_id = int(rag_query_log_id) if rag_query_log_id else 0
    thumbs_up = {'text': '👍 有用', 'callback_data': f'rag_fb:{log_id}:5'}
    thumbs_down = {'text': '👎 沒用', 'callback_data': f'rag_fb:{log_id}:1'}
    return {'inline_keyboard': [[thumbs_up, thumbs_down]]}
def promotion_review_keyboard(episode_id: int) -> dict:
    """Build the approve / reject keyboard for a promotion review.

    Callback payloads:
        pg_ok:{episode_id}  -> approve
        pg_no:{episode_id}  -> reject

    A falsy ``episode_id`` is encoded as 0.
    """
    episode = int(episode_id) if episode_id else 0
    approve = {'text': '✅ 通過晉升', 'callback_data': f'pg_ok:{episode}'}
    reject = {'text': '🚫 駁回', 'callback_data': f'pg_no:{episode}'}
    return {'inline_keyboard': [[approve, reject]]}
# ══════════════════════════════════════════════════════════════════════════════
# 🤖 系統類模板
# ══════════════════════════════════════════════════════════════════════════════
def deploy_msg(status: str, branch: str, commit: str, duration: str = "") -> str:
    """Build the CI/CD deployment notice.

    ``status`` picks the icon (unknown values use the info icon) and is shown
    upper-cased; ``commit`` is cut to 12 characters; the current local time
    is appended as the last line.
    """
    status_icons = {"success": "✅", "failed": "❌", "started": "🚀"}
    icon = status_icons.get(status, "ℹ️")
    elapsed = f" 耗時 {duration}" if duration else ""
    stamp = datetime.now().strftime('%m/%d %H:%M')
    rows = [
        f"{icon} 部署{status.upper()}{elapsed}",
        "━━━━━━━━━━━━━━━━━━━━",
        f"🌿 分支:{branch}",
        f"🔖 Commit:{commit[:12]}",
        f"🕐 {stamp}",
    ]
    return "\n".join(rows)
def heal_msg(status: str, error_type: str, target_file: str,
             commit_sha: str = "", diff_lines: int = 0) -> str:
    """Build the AiderHeal auto-repair notice.

    ``status`` selects the heading ("started", "success", "reverted"; any
    other value is reported as a failure). The commit (cut to 12 characters)
    and changed-line count rows are only added when provided.
    """
    headings = {
        "started": ("🔧", "AiderHeal 啟動"),
        "success": ("✅", "AiderHeal 修復完成"),
        "reverted": ("🔄", "AiderHeal 自動回滾"),
    }
    icon, title = headings.get(status, ("❌", "AiderHeal 失敗"))
    rule = "━━━━━━━━━━━━━━━━━━━━"

    rows = [
        f"{icon} {title}",
        rule,
        f"🐛 錯誤類型:{error_type}",
        f"📄 目標檔案:{target_file}",
    ]
    if commit_sha:
        rows.append(f"🔖 Commit:{commit_sha[:12]}")
    if diff_lines:
        rows.append(f"📝 修改行數:{diff_lines} 行")
    rows.append(rule)
    rows.append(f"🕐 {datetime.now().strftime('%m/%d %H:%M')}")
    return "\n".join(rows)
def meta_analysis_msg(period: str, content: str) -> str:
    """Build the AI system self-review report.

    ``content`` is cut to 1200 characters; the footer carries the current
    local time.
    """
    rule = "══════════════════════════"
    stamp = datetime.now().strftime('%m/%d %H:%M')
    rows = [
        f"🤖 AI 系統效能自我審視 · {period}",
        rule,
        f"{content[:1200]}",
        rule,
        f"由 OpenClaw 定期分析 · {stamp}",
    ]
    return "\n".join(rows)
# ══════════════════════════════════════════════════════════════════════════════
# 💡 洞察類模板
# ══════════════════════════════════════════════════════════════════════════════
def _short_text(value: Any, limit: int = 120) -> str:
    """Return ``value`` as stripped text, ellipsised when longer than ``limit``.

    Over-long text is cut to ``limit - 1`` characters (never below zero),
    right-stripped, and suffixed with "…".
    """
    cleaned = str(value or "").strip()
    if len(cleaned) > limit:
        keep = max(0, limit - 1)
        return cleaned[:keep].rstrip() + "…"
    return cleaned
def _split_action_parts(action: Any) -> List[str]:
    """Split an action string on pipe separators into non-empty, stripped parts.

    Both the ASCII pipe ``|`` and the full-width pipe (U+FF5C) are treated as
    separators. The previous character class listed the ASCII pipe twice, so
    full-width separators were never split.
    """
    pieces = re.split(r"\s*[|\uFF5C]\s*", str(action or ""))
    return [piece.strip() for piece in pieces if piece and piece.strip()]
def _parse_ea_action(action: Any) -> Dict[str, Any]:
    """Parse one pipe-delimited EA action string into a structured dict.

    The first part is the title; a ``[SKU] name`` title is additionally split
    into ``sku`` and ``name``. Every following part is classified by content:

    * contains both "MOMO" and "PChome" -> ``comparison`` plus the extracted
      ``momo_price`` / ``pchome_price`` strings and ``gap_pct`` /
      ``gap_pct_text``
    * starts with "PChome "             -> ``pchome_id``
    * starts with "建議"                 -> ``action``
    * starts with "證據"                 -> ``evidence`` (label removed)
    * mentions an NT$ amount or a gap   -> ``impact`` (+ ``impact_amount``)
    * anything else                     -> appended to ``notes``

    Returns:
        Dict that always has ``raw``, ``title`` and ``notes``; the other keys
        appear only when the matching part is present.
    """
    raw = str(action or "")
    parts = _split_action_parts(action)
    item: Dict[str, Any] = {
        "raw": raw,
        "title": parts[0] if parts else raw,
        "notes": [],
    }
    title_match = re.match(r"^\[([^\]]+)\]\s*(.+)$", item["title"])
    if title_match:
        item["sku"] = title_match.group(1).strip()
        item["name"] = title_match.group(2).strip()
    for part in parts[1:]:
        if "MOMO" in part and "PChome" in part:
            item["comparison"] = part
            momo = re.search(r"MOMO\s*\$([0-9,]+)", part)
            pchome = re.search(r"PChome\s*\$([0-9,]+)", part)
            pct = re.search(r"\(([+-]?\d+(?:\.\d+)?)%\)", part)
            if momo:
                item["momo_price"] = momo.group(1)
            if pchome:
                item["pchome_price"] = pchome.group(1)
            if pct:
                gap_value = float(pct.group(1))
                item["gap_pct"] = gap_value
                item["gap_pct_text"] = f"{gap_value:+.1f}%"
        elif part.startswith("PChome "):
            item["pchome_id"] = part.replace("PChome", "", 1).strip()
        elif part.startswith("建議"):
            item["action"] = part
        elif part.startswith("證據"):
            # Strip spaces plus ASCII and full-width (U+FF1A) colons after the
            # label; the old strip set listed the ASCII colon twice, leaving a
            # full-width colon in front of the evidence text.
            item["evidence"] = part.replace("證據", "", 1).strip(" :\uFF1A")
        elif "NT$" in part or "流失" in part or "價差" in part or "價格優勢" in part:
            item["impact"] = part
            amount = re.search(r"NT\$\s*([0-9,]+)", part)
            if amount:
                try:
                    item["impact_amount"] = int(amount.group(1).replace(",", ""))
                except ValueError:
                    # A match made only of commas has no digits; skip the amount.
                    pass
        else:
            item["notes"].append(part)
    return item
def _format_ea_risk_summary(actions: List[Dict[str, Any]]) -> List[str]:
    """Summarise parsed SKU actions as bullet lines.

    Emits the item count, the gap percentage (single value or min-max range)
    when any numeric ``gap_pct`` exists, the largest integer
    ``impact_amount`` when any exists, and a fixed closing guidance line.
    """
    gaps = [entry["gap_pct"] for entry in actions
            if isinstance(entry.get("gap_pct"), (int, float))]
    impacts = [entry["impact_amount"] for entry in actions
               if isinstance(entry.get("impact_amount"), int)]

    summary = [f"• 待審 SKU:{len(actions)} 件"]
    if gaps:
        smallest, largest = min(gaps), max(gaps)
        if smallest != largest:
            summary.append(f"• 價差範圍:{smallest:+.1f}%~{largest:+.1f}%")
        else:
            summary.append(f"• 價差幅度:{smallest:+.1f}%")
    if impacts:
        summary.append(f"• 最大單件價差:NT$ {max(impacts):,}")
    summary.append("• 核心判斷:先確認同款 identity_v2,再決定跟價、促銷或曝光")
    return summary
def _is_ea_sku_action(item: Dict[str, Any]) -> bool:
    """Return True when the parsed action carries any SKU or price field."""
    sku_markers = ("sku", "comparison", "momo_price", "pchome_price")
    return any(item.get(marker) for marker in sku_markers)
def _format_ea_action_card(item: Dict[str, Any], index: int) -> List[str]:
    """Render one parsed SKU action as a numbered multi-line card.

    Args:
        item: Dict produced by ``_parse_ea_action``.
        index: 1-based position shown in the heading.

    Returns:
        The heading line followed by the optional price, gap/impact,
        evidence, action and PChome-id lines. All dynamic text is
        HTML-escaped.
    """
    sku = escape(str(item.get("sku") or ""))
    name = escape(_short_text(item.get("name") or item.get("title") or "", 58))
    heading = f"{index}. [{sku}] {name}" if sku else f"{index}. {name}"
    lines = [heading]

    momo_price = item.get("momo_price")
    pchome_price = item.get("pchome_price")
    if momo_price or pchome_price:
        momo_text = f"${momo_price}" if momo_price else "—"
        pchome_text = f"${pchome_price}" if pchome_price else "—"
        lines.append(f" MOMO:{momo_text} PChome:{pchome_text}")

    gap_text = item.get("gap_pct_text")
    impact = item.get("impact")
    if gap_text or impact:
        impact_text = escape(str(impact or ""))
        if gap_text and impact_text:
            lines.append(f" 價差:{escape(str(gap_text))} {impact_text}")
        elif gap_text:
            lines.append(f" 價差:{escape(str(gap_text))}")
        else:
            lines.append(f" 影響:{impact_text}")

    evidence = item.get("evidence")
    if evidence:
        lines.append(f" 證據:{escape(_short_text(evidence, 96))}")

    # Drop the leading label and any ASCII / full-width (U+FF1A) colon after
    # it; the old strip set listed the ASCII colon twice, so a full-width
    # colon stayed in front of the action text.
    action = str(item.get("action") or "").replace("建議", "", 1).strip(" :\uFF1A")
    if action:
        lines.append(f" 動作:{escape(_short_text(action, 86))}")
    if item.get("pchome_id"):
        lines.append(f" PChome:{escape(_short_text(item['pchome_id'], 40))}")
    return lines
def _format_ea_generic_action(item: Dict[str, Any], index: int) -> str:
    """Render a non-SKU action as one numbered, escaped line (max 140 chars)."""
    source_text = item.get("raw") or item.get("title") or ""
    shortened = _short_text(source_text, 140)
    return f"{index}. {escape(shortened)}"
def _format_ea_escalation_alert(
    *,
    base_event: Dict[str, Any],
    tier_label: str,
    ai_summary: str,
    ai_cause: Optional[str],
    ai_actions: Optional[list],
) -> str:
    """Render an EA escalation event as a multi-section review message.

    Sections, in order: header (tier, title, event type), decision status
    (summary plus up to three "|"-separated causes), the decision envelope
    when the event carries one, the AI background summary, and then one of:
    SKU risk summary with up to five action cards, a generic to-do list, or a
    minimal handling checklist. All dynamic text is HTML-escaped.
    """
    safe_event_type = escape(str(base_event.get("event_type", "ea_escalation")))
    safe_title = escape(str(base_event.get("title", "EA 升級審核")))
    safe_summary = escape(str(base_event.get("summary", "")))

    causes: List[str] = []
    for fragment in str(ai_cause or "").split("|"):
        if fragment and fragment.strip():
            causes.append(escape(fragment.strip()))

    # Split parsed actions into SKU-bearing ones and everything else.
    sku_items: List[Dict[str, Any]] = []
    other_items: List[Dict[str, Any]] = []
    for raw_action in (ai_actions or []):
        parsed = _parse_ea_action(raw_action)
        bucket = sku_items if _is_ea_sku_action(parsed) else other_items
        bucket.append(parsed)

    visible_items = sku_items[:5]
    overflow = max(0, len(sku_items) - len(visible_items))
    envelope = base_event.get("decision_envelope") or base_event.get("decision")

    out = [
        f"⚡ {escape(str(tier_label))}",
        f"📌 {safe_title}",
        f"{safe_event_type}",
        "━━━━━━━━━━━━━━━━━━━━",
        "🧭 決策狀態",
    ]
    if safe_summary:
        out.append(f"• {safe_summary}")
    out.extend(f"• {cause}" for cause in causes[:3])

    if isinstance(envelope, dict) and envelope:
        out.append("")
        out.extend(_format_decision_envelope(envelope))

    if ai_summary:
        out.extend([
            "",
            "🧠 背景摘要",
            f"• {escape(_short_text(ai_summary, 280))}",
        ])

    if sku_items:
        out.extend(["", "📊 風險摘要"])
        out.extend(_format_ea_risk_summary(sku_items))
        out.extend(["", "📋 TOP 待審 SKU"])
        for position, entry in enumerate(visible_items, start=1):
            if position > 1:
                out.append("")  # blank line between consecutive cards
            out.extend(_format_ea_action_card(entry, position))
        if overflow:
            out.append(f"\n另有 {overflow} 件,請至觀測台查看完整清單。")
        out.extend([
            "",
            "✅ 建議處置",
            "• 先人工確認 PChome identity_v2 與規格一致",
            "• 同款:評估跟價、組合促銷或加強 PChome 價格優勢曝光",
            "• 非同款:標記待審,避免進入自動調價或簡報決策",
        ])
    elif other_items:
        out.extend(["", "📋 待確認事項"])
        out.extend(
            _format_ea_generic_action(entry, position)
            for position, entry in enumerate(other_items[:5], start=1)
        )
        out.extend([
            "",
            "✅ 建議處置",
            "• 先確認資料來源、最近錯誤紀錄與觀測台狀態",
            "• 補齊可審核證據後再批准執行",
            "• 未取得實證前,不執行自動調價、修復或策略派發",
        ])
    else:
        out.extend([
            "",
            "✅ 建議處置",
            "• 先確認資料來源與最近錯誤紀錄",
            "• 補齊可審核證據後再批准執行",
        ])
    return "\n".join(out)
def _numeric_value(value: Any) -> Optional[float]:
    """Best-effort conversion of a loosely formatted number to ``float``.

    Thousands separators and the "NT$", "$" and "%" decorations are removed
    before parsing.

    Returns:
        The parsed float, or ``None`` when ``value`` is ``None``, empty, not
        parseable, or not finite (NaN / infinity). Non-finite values used to
        be returned and made downstream integer formatting raise.
    """
    import math

    if value is None or value == "":
        return None
    try:
        text = str(value).strip().replace(",", "")
        text = text.replace("NT$", "").replace("$", "").replace("%", "")
        number = float(text)
    except (TypeError, ValueError):
        return None
    return number if math.isfinite(number) else None
def _format_money_value(value: Any) -> str:
    """Format a positive amount as ``NT$ 1,234`` (or with two decimals).

    Returns an empty string when the value is not numeric or is not positive.
    """
    amount = _numeric_value(value)
    if amount is None or amount <= 0:
        return ""
    whole = int(amount)
    if amount == whole:
        return f"NT$ {whole:,}"
    return f"NT$ {amount:,.2f}"
def _format_percent_value(value: Any) -> str:
    """Format a number as a signed one-decimal percentage.

    Non-numeric input is returned as stripped, HTML-escaped text (an empty
    string when there is nothing to show).
    """
    number = _numeric_value(value)
    if number is not None:
        return f"{number:+.1f}%"
    fallback = str(value or "").strip()
    if not fallback:
        return ""
    return escape(fallback)
def _evidence_items(envelope: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the dict entries of ``envelope["evidence"]``.

    Yields an empty list when the field is missing or is not a list;
    non-dict entries inside the list are dropped.
    """
    evidence = envelope.get("evidence")
    if not isinstance(evidence, list):
        return []
    return [entry for entry in evidence if isinstance(entry, dict)]
def _find_evidence(envelope: Dict[str, Any], metric: str) -> Optional[Dict[str, Any]]:
    """Return the first evidence entry whose ``metric`` (or ``type``) equals ``metric``.

    Returns ``None`` when no entry matches.
    """
    matching = (
        entry
        for entry in _evidence_items(envelope)
        if str(entry.get("metric") or entry.get("type") or "") == metric
    )
    return next(matching, None)
def _is_price_decision_envelope(envelope: Dict[str, Any]) -> bool:
    """Decide whether an envelope should use the price/competitor layout.

    True when the decision type is one of the known price review types, when
    the subject carries a competitor id or price, or when the evidence holds
    a candidate / unit-price gap metric.
    """
    kind = str(envelope.get("decision_type") or "").lower()
    if kind in {"price_alert", "pchome_match_review", "competitor_price_review"}:
        return True

    subject = envelope.get("subject")
    if not isinstance(subject, dict):
        subject = {}
    for field in ("competitor_product_id", "competitor_price", "pchome_price"):
        if subject.get(field):
            return True

    gap_metrics = ("candidate_gap_pct", "unit_price_gap_pct")
    return any(_find_evidence(envelope, metric) for metric in gap_metrics)
def _action_label(action_code: str) -> str:
    """Translate a recommended-action code into its reviewer-facing label.

    Unknown codes are returned unchanged; an empty code maps to the generic
    manual-review label.
    """
    labels = {
        "price_follow_review": "確認是否跟價或改用促銷防守",
        "review_accept_identity": "人工確認同款後採納 identity",
        "review_catalog_comparable": "依型錄證據覆核可比性",
        "unit_price_required": "改用單位價覆核,不寫總價型價差",
        "identity_or_price_review": "先確認身份,再判斷價格處置",
        "verify_or_reject_identity": "確認候選是否同款;非同款即駁回",
        "compare_existing_identity": "比較既有正式 identity 與新候選",
        "refresh_or_compare_identity": "刷新過期 identity 後再覆核",
        "needs_research": "補搜尋或補證據後再判斷",
        "human_review": "人工覆核",
    }
    code = action_code or ""
    fallback = code or "人工覆核"
    return labels.get(code, fallback)
# Display labels for the guardrail `match_type` codes.
_PRICE_MATCH_TYPE_LABELS = {
    "exact": "高信心同款",
    "same_product_different_pack": "同商品不同包裝",
    "same_line_variant": "同系列不同款",
    "comparable": "可比但需覆核",
    "no_match": "非同款",
}

# Display labels for the guardrail `price_basis` codes.
_PRICE_BASIS_LABELS = {
    "total_price": "總價可比",
    "unit_price": "單位價可比",
    "manual_review": "人工覆核後可比",
    "none": "不可比",
}

# Display labels for the guardrail `alert_tier` codes.
_PRICE_ALERT_TIER_LABELS = {
    "price_alert_exact": "可直接價格告警",
    "unit_price_review": "單位價覆核",
    "identity_review": "身份覆核",
    "suppress": "壓制告警",
}
def _price_match_path(envelope: Dict[str, Any]) -> tuple[str, str, str]:
    """Resolve ``(match_type, price_basis, alert_tier)`` for an envelope.

    Values come from ``guardrails`` first. Any value still missing is filled
    from the "/"-separated ``basis`` string of the ``match_score`` evidence,
    provided that string has at least three non-empty segments.
    """
    guardrails = envelope.get("guardrails")
    if not isinstance(guardrails, dict):
        guardrails = {}

    match_type = str(guardrails.get("match_type") or "")
    price_basis = str(guardrails.get("price_basis") or "")
    alert_tier = str(guardrails.get("alert_tier") or "")
    declared = (match_type, price_basis, alert_tier)
    if all(declared):
        return declared

    score_evidence = _find_evidence(envelope, "match_score") or {}
    basis_text = str(score_evidence.get("basis") or "")
    segments = [segment.strip() for segment in basis_text.split("/") if segment.strip()]
    if len(segments) < 3:
        return declared
    return (
        match_type or segments[0],
        price_basis or segments[1],
        alert_tier or segments[2],
    )
def _price_notification_guidance(match_type: str, price_basis: str, alert_tier: str) -> tuple[str, str]:
    """Map a match path onto a ``(headline, boundary text)`` pair.

    Rules are checked in order: exact total-price alert, unit-price review,
    identity review, suppression, and finally a generic fallback.
    """
    is_direct_threat = (
        match_type == "exact"
        and price_basis == "total_price"
        and alert_tier == "price_alert_exact"
    )
    if is_direct_threat:
        return "直接價格威脅", "可用總價比較;先確認庫存與促銷期,再人工決定跟價或促銷防守。"

    needs_unit_price = price_basis == "unit_price" or alert_tier == "unit_price_review"
    if needs_unit_price:
        return "單位價覆核", "先換算單位價與入數,禁止用總價直接判定價格威脅。"

    needs_identity = alert_tier == "identity_review" or price_basis == "manual_review"
    if needs_identity:
        return "身份覆核", "先確認同款、規格、組合與前台狀態,人工採納後才可寫入正式價差。"

    is_suppressed = alert_tier == "suppress" or match_type == "no_match" or price_basis == "none"
    if is_suppressed:
        return "壓制告警", "目前不可作為價格威脅;保留診斷紀錄,避免誤報。"

    return "可比性待判讀", "依比對證據人工覆核,未確認前不自動調價、不覆蓋正式 identity。"
def _format_price_decision_envelope(envelope: Dict[str, Any]) -> List[str]:
    """Render a price / competitor decision envelope as a readable brief.

    Sections are emitted in this order and skipped when they have no content:
    envelope header, notification grading, target, price evidence, match
    evidence, difference highlights, manual next step, trace. The returned
    list always ends with an empty string. All dynamic text is HTML-escaped.
    """

    def _section(key: str) -> Dict[str, Any]:
        # Nested sections are only used when they really are dicts.
        candidate = envelope.get(key)
        return candidate if isinstance(candidate, dict) else {}

    def _evidence_value(metric: str) -> Any:
        return (_find_evidence(envelope, metric) or {}).get("value")

    subject = _section("subject")
    expected = _section("expected_impact")
    guardrails = _section("guardrails")
    recommended_action = _section("recommended_action")

    # --- Envelope header -------------------------------------------------
    severity = escape(str(envelope.get("severity") or "info"))
    decision_type = escape(str(envelope.get("decision_type") or "price_review"))
    confidence = _numeric_value(envelope.get("confidence"))
    data_quality = escape(str(guardrails.get("data_quality") or envelope.get("data_quality") or "unknown"))
    can_auto_execute = bool(guardrails.get("can_auto_execute", False))
    blocked_reason = escape(str(guardrails.get("blocked_reason") or ""))
    confidence_text = f" 信心度:{confidence:.0%}" if confidence is not None else ""

    out = [
        "🧭 決策信封",
        f"• 類型:{decision_type} 嚴重度:{severity}{confidence_text}",
        f"• 資料品質:{data_quality} 自動執行:{'允許' if can_auto_execute else '不允許'}",
    ]
    if blocked_reason:
        out.append(f"• 邊界:{blocked_reason}")

    # --- Notification grading -------------------------------------------
    match_type, price_basis, alert_tier = _price_match_path(envelope)
    if match_type or price_basis or alert_tier:
        guidance_title, guidance_text = _price_notification_guidance(match_type, price_basis, alert_tier)
        path_labels = [
            _PRICE_MATCH_TYPE_LABELS.get(match_type, match_type) if match_type else "",
            _PRICE_BASIS_LABELS.get(price_basis, price_basis) if price_basis else "",
            _PRICE_ALERT_TIER_LABELS.get(alert_tier, alert_tier) if alert_tier else "",
        ]
        path_text = " / ".join(label for label in path_labels if label)
        out.extend([
            "",
            "🚦 通知分級",
            f"• 判讀:{escape(guidance_title)}",
            f"• 路徑:{escape(path_text)}",
            f"• 邊界:{escape(guidance_text)}",
        ])

    # --- Target -----------------------------------------------------------
    sku = escape(str(subject.get("sku") or ""))
    name = escape(_short_text(subject.get("name") or "", 96))
    competitor_id = escape(str(subject.get("competitor_product_id") or subject.get("pchome_id") or ""))
    competitor_name = escape(_short_text(subject.get("competitor_product_name") or subject.get("pchome_name") or "", 96))
    target_rows = []
    if sku:
        target_rows.append(f"• SKU:{sku}")
    if name:
        target_rows.append(f"• MOMO:{name}")
    if competitor_id:
        target_rows.append(f"• PChome:{competitor_id}")
    if competitor_name:
        target_rows.append(f"• 候選:{competitor_name}")
    if target_rows:
        out.extend(["", "🎯 標的"])
        out.extend(target_rows)

    # --- Price evidence ---------------------------------------------------
    # Each price takes the first truthy source: subject, expected impact,
    # then the evidence list.
    momo_price = (
        subject.get("momo_price")
        or expected.get("momo_price")
        or _evidence_value("momo_price")
    )
    competitor_price = (
        subject.get("competitor_price")
        or subject.get("pchome_price")
        or expected.get("competitor_price")
        or expected.get("candidate_price")
        or expected.get("pchome_price")
        or _evidence_value("pchome_price")
    )
    gap_pct = expected.get("candidate_gap_pct")
    if gap_pct is None:
        gap_pct = _evidence_value("candidate_gap_pct")
    gap_amount = expected.get("gap_amount")

    price_rows = []
    momo_price_text = _format_money_value(momo_price)
    competitor_price_text = _format_money_value(competitor_price)
    if momo_price_text or competitor_price_text:
        price_rows.append(
            f"• MOMO:{momo_price_text or '—'} PChome:{competitor_price_text or '—'}"
        )
    gap_text = _format_percent_value(gap_pct)
    gap_amount_text = _format_money_value(gap_amount)
    if gap_text or gap_amount_text:
        gap_details = [piece for piece in (gap_text, gap_amount_text) if piece]
        price_rows.append(f"• 價差:{' / '.join(gap_details)}(正值代表 MOMO 較貴)")
    unit_insight = expected.get("unit_price_insight")
    if isinstance(unit_insight, dict) and unit_insight:
        unit_summary = escape(_short_text(unit_insight.get("summary") or "", 120))
        if unit_summary:
            price_rows.append(f"• 單位價:{unit_summary}")
    if price_rows:
        out.extend(["", "📊 價格證據"])
        out.extend(price_rows)

    # --- Match evidence ---------------------------------------------------
    evidence_rows = []
    match_evidence = _find_evidence(envelope, "match_score")
    if match_evidence:
        score = match_evidence.get("value")
        basis = escape(str(match_evidence.get("basis") or ""))
        score_text = escape(str(score if score is not None else ""))
        basis_suffix = f" {basis}" if basis else ""
        evidence_rows.append(f"• Match:{score_text}" + basis_suffix)
    reason_evidence = _find_evidence(envelope, "reasons")
    if reason_evidence:
        evidence_rows.append(f"• 診斷:{escape(_short_text(reason_evidence.get('value') or '', 130))}")
    conflict = expected.get("existing_match_conflict")
    if isinstance(conflict, dict) and conflict:
        incoming = escape(str(conflict.get("incoming_product_id") or "unknown"))
        existing = escape(str(conflict.get("existing_product_id") or "unknown"))
        delta_value = _numeric_value(conflict.get("score_delta"))
        delta = f"{delta_value:+.3f}" if delta_value is not None else ""
        delta_suffix = f" delta {delta}" if delta else ""
        evidence_rows.append(f"• 既有保護:新候選 {incoming} vs 既有 {existing}" + delta_suffix)
    if evidence_rows:
        out.extend(["", "🧩 比對證據"])
        out.extend(evidence_rows)

    # --- Difference highlights (first three rows only) -------------------
    difference_highlights = envelope.get("difference_highlights")
    if isinstance(difference_highlights, list) and difference_highlights:
        diff_rows = []
        for row in difference_highlights[:3]:
            if not isinstance(row, dict):
                continue
            dimension = escape(str(row.get("dimension") or row.get("label") or "差異"))
            left = escape(_short_text(row.get("left") or row.get("momo") or "", 42))
            right = escape(_short_text(row.get("right") or row.get("pchome") or "", 42))
            if left or right:
                diff_rows.append(f"• {dimension}:MOMO {left or '—'} / PChome {right or '—'}")
                continue
            note = escape(_short_text(row.get("note") or row.get("summary") or "", 84))
            if note:
                diff_rows.append(f"• {dimension}:{note}")
        if diff_rows:
            out.extend(["", "⚖️ 差異提醒"])
            out.extend(diff_rows)

    # --- Manual next step -------------------------------------------------
    action_code = str(recommended_action.get("action") or "human_review")
    owner = escape(str(recommended_action.get("owner") or "未指定"))
    requires_hitl = bool(recommended_action.get("requires_hitl", True))
    out.extend([
        "",
        "✅ 人工下一步",
        f"• {_action_label(action_code)}",
        f"• 動作:{escape(action_code)} 負責:{owner} HITL:{'需要' if requires_hitl else '不需要'}",
    ])

    # --- Trace ------------------------------------------------------------
    trace = envelope.get("trace")
    if isinstance(trace, dict):
        trace_keys = ("ai_call_id", "insight_id", "action_plan_id", "source", "attempted_at", "model", "provider")
        trace_parts = [f"{key}={trace[key]}" for key in trace_keys if trace.get(key) is not None]
        if trace_parts:
            out.extend(["", f"{escape(' | '.join(trace_parts))}"])

    out.append("")
    return out
def _format_decision_envelope(envelope: Dict[str, Any]) -> List[str]:
"""將 12 Agent 共用決策信封轉成可審核的 Telegram 區塊。"""
if not isinstance(envelope, dict) or not envelope:
return []
if _is_price_decision_envelope(envelope):
return _format_price_decision_envelope(envelope)
severity = escape(str(envelope.get("severity") or "info"))
decision_type = escape(str(envelope.get("decision_type") or "general"))
confidence = envelope.get("confidence")
guardrails = envelope.get("guardrails") if isinstance(envelope.get("guardrails"), dict) else {}
data_quality = escape(str(guardrails.get("data_quality") or envelope.get("data_quality") or "unknown"))
can_auto_execute = bool(guardrails.get("can_auto_execute", False))
blocked_reason = escape(str(guardrails.get("blocked_reason") or ""))
confidence_text = ""
try:
if confidence is not None:
confidence_text = f" 信心度:{float(confidence):.0%}"
except (TypeError, ValueError):
confidence_text = ""
lines = [
"🧭 決策信封",
f"• 類型:{decision_type} 嚴重度:{severity}{confidence_text}",
f"• 資料品質:{data_quality} 自動執行:{'允許' if can_auto_execute else '不允許'}",
]
if blocked_reason:
lines.append(f"• 邊界:{blocked_reason}")
subject = envelope.get("subject") if isinstance(envelope.get("subject"), dict) else {}
if subject:
sku = escape(str(subject.get("sku") or ""))
name = escape(_short_text(subject.get("name") or "", 96))
competitor_id = escape(str(subject.get("competitor_product_id") or ""))
competitor_name = escape(_short_text(subject.get("competitor_product_name") or "", 96))
subject_lines = []
if sku:
subject_lines.append(f"• SKU:{sku}")
if name:
subject_lines.append(f"• 商品:{name}")
if competitor_id:
subject_lines.append(f"• PChome:{competitor_id}")
if competitor_name:
subject_lines.append(f"• 候選:{competitor_name}")
if subject_lines:
lines += ["", "標的", *subject_lines]
evidence_items = envelope.get("evidence") if isinstance(envelope.get("evidence"), list) else []
if evidence_items:
lines += ["", "證據"]
for item in evidence_items[:3]:
if not isinstance(item, dict):
lines.append(f"• {escape(str(item))[:180]}")
continue
metric = escape(str(item.get("metric") or item.get("type") or "evidence"))
value = escape(str(item.get("value") if item.get("value") is not None else ""))
basis = escape(str(item.get("basis") or ""))
freshness = escape(str(item.get("freshness") or ""))
item_confidence = item.get("confidence")
confidence_suffix = ""
try:
if item_confidence is not None:
confidence_suffix = f" / {float(item_confidence):.0%}"
except (TypeError, ValueError):
confidence_suffix = ""
detail = " / ".join(part for part in (value, basis, freshness) if part)
lines.append(f"• {metric}{confidence_suffix}" + (f":{detail}" if detail else ""))
recommended_action = envelope.get("recommended_action")
if isinstance(recommended_action, dict):
action = escape(str(recommended_action.get("action") or "human_review"))
owner = escape(str(recommended_action.get("owner") or "未指定"))
deadline = escape(str(recommended_action.get("deadline") or ""))
requires_hitl = bool(recommended_action.get("requires_hitl", True))
lines += [
"",
"建議行動",
f"• 動作:{action} 負責:{owner}",
f"• HITL:{'需要' if requires_hitl else '不需要'}" + (f" 期限:{deadline}" if deadline else ""),
]
expected_impact = envelope.get("expected_impact")
if isinstance(expected_impact, dict) and expected_impact:
impact_parts = []
for key in ("revenue_loss_7d", "gap_amount", "cost_usd", "risk_reduction"):
if key in expected_impact and expected_impact[key] is not None:
impact_parts.append(f"{escape(key)}={escape(str(expected_impact[key]))}")
if impact_parts:
lines += ["", "預期影響", "• " + " / ".join(impact_parts[:4])]
trace = envelope.get("trace")
if isinstance(trace, dict):
trace_parts = []
for key in ("ai_call_id", "insight_id", "action_plan_id", "model", "provider"):
if trace.get(key) is not None:
trace_parts.append(f"{key}={trace[key]}")
if trace_parts:
lines += ["", f"{escape(' | '.join(trace_parts))}"]
return lines + [""]
def triaged_alert(base_event: Dict[str, Any], tier_label: str,
                  ai_summary: str, ai_cause: Optional[str] = None,
                  ai_actions: Optional[list] = None,
                  ai_executed: Optional[list] = None) -> tuple:
    """Build the EA L1/L2 autonomous-execution notification.

    Events whose ``event_type`` is ``"ea_escalation"`` are rendered by
    ``_format_ea_escalation_alert``; every other event uses the generic
    layout built here, with all event-supplied text HTML-escaped.

    Args:
        base_event: Event dict. Keys read: ``event_type``, ``title``,
            ``summary``, ``id``, ``decision_envelope`` / ``decision`` and
            ``trace``.
        tier_label: Tier label shown in the header line (emitted as given).
        ai_summary: AI-generated summary; at most 400 characters are shown.
        ai_cause: Optional probable cause.
        ai_actions: Optional list of suggested actions.
        ai_executed: Optional list of actions that were already executed.

    Returns:
        ``(message, keyboard)`` where ``keyboard`` is an inline keyboard
        holding a single "ignore this event" button.
    """
    event_type = base_event.get("event_type", "alert")
    decision_envelope = base_event.get("decision_envelope") or base_event.get("decision")

    # Fall back to the decision id when the event itself carries no id.
    event_id = base_event.get("id")
    if not event_id and isinstance(decision_envelope, dict):
        event_id = decision_envelope.get("decision_id")

    if event_type == "ea_escalation":
        message = _format_ea_escalation_alert(
            base_event=base_event,
            tier_label=tier_label,
            ai_summary=str(ai_summary or ""),
            ai_cause=ai_cause,
            ai_actions=ai_actions,
        )
    else:
        title = escape(str(base_event.get("title", "")))
        summary = escape(str(base_event.get("summary", "")))
        # Truncate before escaping so an HTML entity is never cut in half.
        safe_ai_summary = escape(str(ai_summary or "")[:400])
        safe_ai_cause = escape(str(ai_cause)) if ai_cause else None
        safe_actions = [escape(str(a)) for a in (ai_actions or [])]
        safe_executed = [escape(str(a)) for a in (ai_executed or [])]

        lines = [
            f"⚡ {tier_label} · {escape(str(event_type))}",
            f"📌 {title}",
            "",
        ]
        if summary:
            lines += [f"🔍 概要:{summary}", ""]
        if safe_ai_summary:
            lines += [f"🧠 AI 摘要:{safe_ai_summary}", ""]
        if safe_ai_cause:
            lines += [f"💡 可能原因:{safe_ai_cause}", ""]
        if isinstance(decision_envelope, dict):
            lines += _format_decision_envelope(decision_envelope)
        if safe_actions:
            lines += ["📋 建議行動:"] + [f" • {a}" for a in safe_actions] + [""]
        if safe_executed:
            lines += ["✅ 已執行:"] + [f" • {a}" for a in safe_executed] + [""]

        trace = base_event.get("trace")
        if trace:
            # Only the tail of the trace is shown. It is coerced to str and
            # escaped so raw "<" / "&" cannot break HTML parse_mode, then
            # wrapped in <pre> for monospace rendering.
            lines.append(f"<pre>{escape(str(trace)[-400:])}</pre>")
        message = "\n".join(lines)

    # ADR-012: eig = event_ignore; callback_data must stay below Telegram's
    # 64-byte limit, so the id payload is capped at 52 bytes.
    _eid = _callback_payload_utf8(event_id, max_bytes=52)
    keyboard = {"inline_keyboard": [
        [{"text": "🛑 忽略此事件",
          "callback_data": f"momo:eig:{_eid}"}],
    ]}
    return message, keyboard
def insight_summary_msg(insights: List[Dict], period: str = "近24h") -> str:
    """Build the periodic AI-insight digest message.

    At most six insights are listed; any remainder is summarised in a single
    "N more" line.

    Args:
        insights: Insight dicts. Keys read: ``insight_type``, ``content`` and
            ``confidence``. ``None`` is treated as an empty list.
        period: Label of the covered period, shown in the header line.

    Returns:
        The digest as a newline-joined string.
    """
    insights = insights or []
    n = len(insights)
    lines = [
        f"💡 AI 洞察摘要 · {period} 共 {n} 筆",
        "━━━━━━━━━━━━━━━━━━━━",
    ]
    type_icons = {
        "price_alert": "🔴", "recommendation": "💰", "weekly_strategy": "📊",
        "meta_analysis": "🤖", "market_opportunity": "🟢", "mcp_cache": "🌐",
    }
    for ins in insights[:6]:
        icon = type_icons.get(ins.get("insight_type", ""), "💡")
        # Truncate first, then escape: the preview is free text and must not
        # inject (or leave half-open) markup in an HTML parse_mode message.
        content_preview = escape(str(ins.get("content", ""))[:80].replace("\n", " "))
        # A missing, None or non-numeric confidence must not abort the digest.
        try:
            conf = float(ins.get("confidence") or 0)
        except (TypeError, ValueError):
            conf = 0.0
        lines.append(f"{icon} {content_preview}… ({conf:.0%})")
    if n > 6:
        lines.append(f"…另有 {n-6} 筆洞察")
    lines.append("━━━━━━━━━━━━━━━━━━━━")
    return "\n".join(lines)
# ══════════════════════════════════════════════════════════════════════════════
# 舊版相容介面(保留供現有程式碼調用)
# ══════════════════════════════════════════════════════════════════════════════
def alert(title: str, content: str, actions: Optional[list] = None) -> str:
    """Legacy helper: error-level system alert with an optional action list.

    The alert body comes from ``system_alert_msg("error", ...)``; each entry
    of ``actions`` is appended underneath as its own bullet line.
    """
    rendered = system_alert_msg("error", title, content)
    if not actions:
        return rendered
    bullets = [f" • {step}" for step in actions]
    return "\n".join([rendered, *bullets])
def warning(title: str, summary: str, details: Optional[dict] = None) -> str:
    """Legacy helper: warning-level system alert with optional key/value details.

    Each ``details`` entry becomes a ``key: value`` line appended below
    ``summary`` before the text is handed to ``system_alert_msg``.
    """
    rows = [f" {key}: {value}" for key, value in (details or {}).items()]
    detail_block = "\n".join(rows)
    suffix = "\n" + detail_block if detail_block else ""
    return system_alert_msg("warning", title, summary + suffix)
def info(title: str, module: str, content: str, time: Optional[Any] = None) -> str:
    """Legacy helper: informational message.

    The header line holds the title, the module in brackets and, when
    ``time`` is truthy, a trailing timestamp; ``content`` follows after a
    blank line.
    """
    header = f"ℹ️ {title} [{module}]"
    if time:
        header += f" · {time}"
    return f"{header}\n\n{content}"
def success(title: str, module: str, stats: str = "") -> str:
    """Legacy helper: success message with the module in brackets and a stats line."""
    headline = f"✅ {title} [{module}]"
    return f"{headline}\n{stats}"
def report(title: str, report_type: str, period: str, content_md: str) -> str:
    """Legacy helper: report message.

    Layout: icon + title + report type, the period line, a rule line, then
    ``content_md`` verbatim. Unknown report types use the generic 📋 icon.
    """
    icon_by_type = {
        "weekly_strategy": "📊",
        "daily": "📅",
        "monthly": "📆",
        "meta_analysis": "🤖",
    }
    icon = icon_by_type.get(report_type, "📋")
    sections = [
        f"{icon} {title} ({report_type})",
        f"📅 期間:{period}",
        "══════════════════════════",
        f"{content_md}",
    ]
    return "\n".join(sections)
def _send_telegram(msg: str, chat_ids: Optional[list] = None,
                   reply_markup: Optional[Dict[str, Any]] = None) -> bool:
    """Legacy-compatible entry point that forwards unchanged to ``_send_telegram_raw``."""
    forwarded = {"chat_ids": chat_ids, "reply_markup": reply_markup}
    return _send_telegram_raw(msg, **forwarded)
# ══════════════════════════════════════════════════════════════════════════════
# LLM Token 日報模板(Operation Ollama-First v5.0 — Phase 1 收尾)
# 對應 services/token_report_service.py
# ══════════════════════════════════════════════════════════════════════════════
# Fuse for a single Telegram message (sendMessage text is capped at 4096 characters).
_DAILY_TOKEN_REPORT_MAX_CHARS = 4096


def daily_token_report(report_html: str,
                       footer_url: Optional[str] = None) -> str:
    """Wrap the daily LLM token report for Telegram (HTML parse_mode).

    This function only appends the footer link and enforces the length cap.
    The report body is produced elsewhere and, per the original docstring,
    is already HTML-escaped (by
    ``services/token_report_service.generate_daily_report()``).

    Args:
        report_html: Already-escaped HTML report text.
        footer_url: Optional admin-console URL; escaped here and emitted as
            the target of the footer link.

    Returns:
        An HTML string of at most ``_DAILY_TOKEN_REPORT_MAX_CHARS`` characters.
    """
    body = report_html or ""
    footer = ""
    if footer_url:
        # quote=True because the URL is placed inside an attribute value.
        footer = f'\n📎 <a href="{escape(str(footer_url), quote=True)}">詳細日誌</a>'

    if len(body) + len(footer) <= _DAILY_TOKEN_REPORT_MAX_CHARS:
        return body + footer

    # Too long: truncate the report only, reserving 80 characters for the
    # trailing notice. The footer is kept whole so its <a> tag is never cut
    # in half (unbalanced markup is rejected by Telegram).
    notice = "\n\n... (訊息過長已截斷;完整內容存於 ai_insights)"
    budget = _DAILY_TOKEN_REPORT_MAX_CHARS - 80 - len(footer)
    if budget < 0:
        # The footer alone does not fit; drop it rather than exceed the cap.
        footer = ""
        budget = _DAILY_TOKEN_REPORT_MAX_CHARS - 80
    return body[:budget] + notice + footer
# ══════════════════════════════════════════════════════════════════════════════
# 決策回執模板(L2 / L3 按鈕點擊後編輯原訊息使用)
# ADR-012 Phase 4:審計留痕 — 操作者、時間、action、結果
# ══════════════════════════════════════════════════════════════════════════════
# Asia/Taipei(UTC+8)— 容器可能跑 UTC,這裡手動偏移避免依賴 tzdata
_TAIPEI_UTC_OFFSET_HOURS = 8
def _now_taipei_hhmm() -> str:
"""回傳 Asia/Taipei 的 HH:MM(容器 TZ 不可靠,手動加 8h)。"""
try:
from datetime import timedelta, timezone
tz = timezone(timedelta(hours=_TAIPEI_UTC_OFFSET_HOURS))
return datetime.now(tz).strftime("%H:%M")
except Exception:
return datetime.now().strftime("%H:%M")
def _html_escape(s: Any) -> str:
"""最小 HTML 轉義(Telegram HTML parse_mode 允許 ,
故只轉 < > & 避免使用者輸入破版)。"""
text = "" if s is None else str(s)
return (text.replace("&", "&")
.replace("<", "<")
.replace(">", ">"))
def decision_result(original_text: str, action: str, operator: str,
                    note: Optional[str] = None, **extras: Any) -> str:
    """Receipt for an L2 single-item pricing decision.

    Used to edit the original message after an approve / reject click.

    Args:
        original_text: Text of the original Telegram message.
        action: ``"approve"`` or ``"reject"`` (case-insensitive); any other
            value is reported as a generic "handled" entry.
        operator: Display name of the operator; ``"unknown"`` when empty.
        note: Optional extra remark appended as its own line.
        **extras: Reserved for forward compatibility (currently ignored).

    Returns:
        The original text, a separator line and the audit line(s).
    """
    outcomes = {
        "approve": ("✅", "已執行"),
        "reject": ("❌", "已拒絕"),
    }
    normalized = (action or "").lower()
    if normalized in outcomes:
        icon, label = outcomes[normalized]
    else:
        icon = "ℹ️"
        label = f"已處理({_html_escape(action)})"

    stamp = _now_taipei_hhmm()
    who = _html_escape(operator or "unknown")

    parts = [
        original_text or "",
        "",
        "━━━━━━━━━━━━━━━━━━━━",
        f"{icon} {label} by {who} at {stamp}",
    ]
    if note:
        parts.append(f"📝 {_html_escape(note)}")
    return "\n".join(parts)
def ops_action_result(original_text: str, action: str, operator: str,
                      result: Optional[Dict[str, Any]] = None,
                      **extras: Any) -> str:
    """Receipt for an L3 operations decision.

    Used to edit the original message after an ops action has run.

    Args:
        original_text: Text of the original Telegram message.
        action: Action code such as ``"pause1h"``, ``"pause6h"``,
            ``"retry"`` or ``"resume"``; unknown codes are shown escaped
            as-is.
        operator: Display name of the operator; ``"unknown"`` when empty.
        result: Result dict of the executed action. Keys read here:
            ``status``, ``error`` / ``message`` / ``detail`` (the first
            truthy one wins), ``task_name`` and ``duration_min``.
        **extras: Reserved for forward compatibility (currently ignored).

    Returns:
        The original text, a separator line and the execution-result block.
    """
    known_actions = {
        "pause1h": "暫停 1 小時",
        "pause6h": "暫停 6 小時",
        "retry": "立即重試",
        "resume": "恢復執行",
        "execute": "執行",
        "skip": "略過",
        "rollback": "回滾",
    }
    action_key = (action or "").lower()
    if action_key in known_actions:
        act_label = known_actions[action_key]
    else:
        act_label = _html_escape(action or "未知動作")

    result = result or {}

    # Map every accepted status spelling onto its (icon, text) pair.
    outcome_by_status = {}
    for spelling in ("ok", "success", "done"):
        outcome_by_status[spelling] = ("✅", "成功")
    for spelling in ("error", "failed", "fail"):
        outcome_by_status[spelling] = ("❌", "失敗")
    outcome_by_status["skipped"] = ("⏭️", "已略過")

    status = str(result.get("status", "")).lower()
    if status in outcome_by_status:
        status_icon, status_text = outcome_by_status[status]
    else:
        # Unknown statuses are echoed; an empty one reads as "submitted".
        status_icon, status_text = "ℹ️", status or "已提交"

    stamp = _now_taipei_hhmm()
    who = _html_escape(operator or "unknown")

    # Result message priority: error > message > detail, capped at 300 chars.
    detail_msg: Optional[str] = next(
        (str(result[key])[:300] for key in ("error", "message", "detail") if result.get(key)),
        None,
    )

    out = [
        original_text or "",
        "",
        "━━━━━━━━━━━━━━━━━━━━",
        f"🛠️ 運維動作:{act_label}",
        f"👤 操作員:{who} 🕐 {stamp}",
        f"{status_icon} 執行結果:{_html_escape(status_text)}",
    ]
    if detail_msg:
        out.append(f"📋 {_html_escape(detail_msg)}")

    # Extra key fields, shown only when present and truthy.
    task_name = result.get("task_name")
    if task_name:
        out.append(f"📌 任務:{_html_escape(task_name)}")
    duration_min = result.get("duration_min")
    if duration_min:
        out.append(f"⏱️ 時長:{_html_escape(duration_min)} 分鐘")
    return "\n".join(out)