[V10.4-B] Telegram 按鈕安全強化:C2/C3/H4/H6 修復

修復 P9-1 全景盤點所發現的四項高優先問題:

- routes/openclaw_bot_routes.py:
    C3: ALLOWED_USERS/ALLOWED_GROUP 白名單 fail-closed,阻擋非授權 chat
    H4: _seen_update_ids 改用 deque(maxlen=500) LRU 防記憶體洩漏
- services/telegram_bot_service.py:
    C2: 新增 momo:bpa/bpr/eig 三個 callback 分支 + handler 實作
    H6: callback 滑動視窗速率限制(30次/分鐘/用戶)
- services/telegram_templates.py:
    修正 decision_result / ops_action_result ImportError BLOCKER
    新增 _now_taipei_hhmm / _html_escape 輔助函式

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
ogt
2026-04-25 01:42:58 +08:00
parent fcac03379d
commit 3f7fc0aba0
3 changed files with 494 additions and 132 deletions

View File

@@ -24,7 +24,8 @@ import os
import re
import threading
import requests
from datetime import datetime, timezone, timedelta
from collections import deque
from datetime import datetime as _dt, timezone as _tz, timedelta as _td
from flask import Blueprint, request, jsonify
from sqlalchemy import text
@@ -65,14 +66,17 @@ GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', '')
GEMINI_BASE_URL = 'https://generativelanguage.googleapis.com/v1beta/models'
GEMINI_MODEL = 'gemini-2.0-flash'
TAIPEI_TZ = timezone(timedelta(hours=8))
TAIPEI_TZ = _tz(_td(hours=8))
sys_log = SystemLogger("OpenClawBot").get_logger()
openclaw_bot_bp = Blueprint('openclaw_bot', __name__)
# ── Telegram retry 去重 (update_id 快取,最多保留 500 筆) ─────
_seen_update_ids: set = set()
# H4 修補:原本單一 set + set.pop() 隨機刪,可能誤刪剛加入的 update_id。
# 改為 deque(FIFO 插入序) + set(O(1) lookup) 雙結構;滿時從 deque 頭淘汰最舊。
_SEEN_MAX = 500
_seen_update_order: deque = deque(maxlen=_SEEN_MAX)
_seen_update_ids: set = set()
BOT_TOKEN = os.getenv('OPENCLAW_BOT_TOKEN', '8610496165:AAFOlcWV4oRUSC2TI-fYux7JV97fjNzsYR8')
BOT_API_URL = f"https://api.telegram.org/bot{BOT_TOKEN}"
@@ -93,6 +97,25 @@ ALLOWED_USERS: set = (
if _allowed_users_raw.strip() else set()
)
# ── fail-closed 統一授權檢查 ───────────────────────────────────
# 規則(任一滿足即通過,否則一律拒絕):
# 1. group/supergroup 且 chat_id == ALLOWED_GROUP
# 2. private 且 user_id ∈ ALLOWED_USERSenv 未設 → 空 set → 全拒)
# channel / 未知 chat_type / 缺欄位 → 拒絕
# 修補 C3callback handler 原本只擋 group/supergroup 不匹配private 完全放行;
# message handler `if ALLOWED_USERS and ...` 空 set 時整段失效。
def _is_authorized(chat_type: str, chat_id, user_id) -> bool:
try:
cid = int(chat_id) if chat_id is not None else None
uid = int(user_id) if user_id is not None else None
except (TypeError, ValueError):
return False
if chat_type in ('group', 'supergroup'):
return cid == ALLOWED_GROUP
if chat_type == 'private':
return uid is not None and uid in ALLOWED_USERS
return False
# ── 速率限制(每用戶每分鐘最多 30 次 AI 呼叫)──────────────────
import time as _time_mod
_rate_tracker: dict = {} # {user_id: [timestamp, ...]}
@@ -284,7 +307,7 @@ def generate_daily_pdf(date_str: str) -> str:
products = query_top_products(date_str, 20)
vendors = query_top_vendors(date_str, 10)
weekly = query_weekly_trend()
now_str = datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M')
now_str = _dt.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M')
safe_date = date_str.replace('/', '-')
# ── openpyxl Excel主要支援中文秒產生─────────────────
@@ -634,11 +657,11 @@ def query_category_monthly(year: int, month: int, lim: int = 10) -> list:
def query_comparison(date_str):
"""今日 vs 上週同日 vs 上月同日"""
from datetime import datetime as dt
from datetime import datetime as dt, timedelta as td
try:
d = dt.strptime(normalize_date(date_str).replace('/', '-'), '%Y-%m-%d').date()
lw_str = (d - timedelta(days=7)).strftime('%Y/%m/%d')
lm_str = (d - timedelta(days=30)).strftime('%Y/%m/%d')
lw_str = (d - td(days=7)).strftime('%Y/%m/%d')
lm_str = (d - td(days=30)).strftime('%Y/%m/%d')
def _fetch(day_s):
try:
@@ -872,7 +895,7 @@ def query_growth_data() -> dict:
"""
import pandas as pd
from datetime import timezone, timedelta
TAIPEI_TZ = timezone(timedelta(hours=8))
TAIPEI_TZ = _tz(_td(hours=8))
try:
df = pd.read_sql(
text('SELECT "日期", "總業績", "訂單編號", "總成本" FROM realtime_sales_monthly'),
@@ -1220,7 +1243,7 @@ def gen_trend_chart(days=14, data_points=None, title=None) -> str:
ax.set_xticklabels(tick_labels, fontsize=9)
ax.set_ylabel('業績(萬元)', fontsize=12)
today_str = datetime.now(TAIPEI_TZ).strftime('%Y/%m/%d')
today_str = dt.now(TAIPEI_TZ).strftime('%Y/%m/%d')
chart_title = title or f'業績趨勢走勢圖 — 近 {days} 日 (截至 {today_str}'
ax.set_title(chart_title, fontsize=14, fontweight='bold', pad=14)
ax.grid(True, alpha=0.25, linestyle='--', color='#BDBDBD')
@@ -1279,7 +1302,7 @@ def gen_products_chart(date_str, n=10) -> str:
# 取得上週同日資料做顏色比較
try:
d = dt.strptime(date_str.replace('/', '-'), '%Y-%m-%d').date()
lw_str = (d - timedelta(days=7)).strftime('%Y/%m/%d')
lw_str = (d - _td(days=7)).strftime('%Y/%m/%d')
lw_products = query_top_products(lw_str, n * 2)
lw_map = {p.get('id'): p['revenue'] for p in lw_products if p.get('id')}
except Exception:
@@ -1364,10 +1387,10 @@ def analyze_product_strategy(date_str: str, top_n=10) -> list:
if not products:
return []
from datetime import datetime as dt
from datetime import datetime as dt, timedelta as td
try:
d = dt.strptime(normalize_date(date_str).replace('/', '-'), '%Y-%m-%d').date()
lw_str = (d - timedelta(days=7)).strftime('%Y/%m/%d')
lw_str = (d - td(days=7)).strftime('%Y/%m/%d')
lw_products = query_top_products(lw_str, top_n * 2)
lw_map = {p.get('id'): p['revenue'] for p in lw_products if p.get('id')}
except Exception:
@@ -1407,14 +1430,15 @@ def analyze_product_strategy(date_str: str, top_n=10) -> list:
def _analyze_strategy_range(start_str: str, end_str: str, products: list) -> list:
"""區間版策略分析(週/月/季/年用):比對前一等長區間的成長率"""
from datetime import datetime as dt, timedelta as td
if not products:
return []
try:
s = datetime.strptime(start_str.replace('/', '-'), '%Y-%m-%d')
e = datetime.strptime(end_str.replace('/', '-'), '%Y-%m-%d')
s = dt.strptime(start_str.replace('/', '-'), '%Y-%m-%d')
e = dt.strptime(end_str.replace('/', '-'), '%Y-%m-%d')
delta = (e - s).days + 1
prev_end = (s - timedelta(days=1)).strftime('%Y/%m/%d')
prev_start = (s - timedelta(days=delta)).strftime('%Y/%m/%d')
prev_end = (s - td(days=1)).strftime('%Y/%m/%d')
prev_start = (s - td(days=delta)).strftime('%Y/%m/%d')
prev_prods = query_top_products_range(prev_start, prev_end, len(products) * 2)
prev_map = {p.get('id'): p['revenue'] for p in prev_prods if p.get('id')}
except Exception:
@@ -2110,7 +2134,7 @@ def _ppt_ai_analysis(prompt_data: str, report_type: str = '') -> str:
import threading as _thr
_thr.Thread(
target=store_insight,
args=(datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d'),
args=(_dt.now(TAIPEI_TZ).strftime('%Y-%m-%d'),
report_type or 'analysis', result_text),
daemon=True
).start()
@@ -2127,7 +2151,7 @@ def _ppt_ai_analysis(prompt_data: str, report_type: str = '') -> str:
import threading as _thr
_thr.Thread(
target=store_insight,
args=(datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d'),
args=(_dt.now(TAIPEI_TZ).strftime('%Y-%m-%d'),
report_type or 'analysis', result_text),
daemon=True
).start()
@@ -2153,7 +2177,8 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) -
if not check_pptx_available():
raise RuntimeError("python-pptx 未安裝請執行pip install python-pptx")
now = datetime.now(TAIPEI_TZ)
from datetime import datetime as _dt, timedelta as _td
now = _dt.now(TAIPEI_TZ)
# ── MCP 外部情報(所有報告共用)──────────────────────────
try:
@@ -2247,8 +2272,8 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) -
period_label = f'{date_str} 日策略'
elif sub_arg in ('weekly', 'week', '', '週報'):
end_str = latest_date() or now.strftime('%Y/%m/%d')
start_str = (datetime.strptime(end_str.replace('/', '-'), '%Y-%m-%d')
- timedelta(days=6)).strftime('%Y/%m/%d')
start_str = (_dt.strptime(end_str.replace('/', '-'), '%Y-%m-%d')
- _td(days=6)).strftime('%Y/%m/%d')
date_str = f'{start_str}~{end_str}'
period_label = '週策略近7日'
elif sub_arg in ('monthly', 'month', '', '月報') or (
@@ -2266,20 +2291,20 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) -
period_label = f'{yr_s}/{mo_s:02d} 月策略'
elif sub_arg in ('quarterly', 'quarter', 'q', '', '季報'):
end_str = latest_date() or now.strftime('%Y/%m/%d')
start_str = (datetime.strptime(end_str.replace('/', '-'), '%Y-%m-%d')
- timedelta(days=89)).strftime('%Y/%m/%d')
start_str = (_dt.strptime(end_str.replace('/', '-'), '%Y-%m-%d')
- _td(days=89)).strftime('%Y/%m/%d')
date_str = f'{start_str}~{end_str}'
period_label = '季策略近90日'
elif sub_arg in ('half', 'h1', 'h2', '半年', '半年報'):
end_str = latest_date() or now.strftime('%Y/%m/%d')
start_str = (datetime.strptime(end_str.replace('/', '-'), '%Y-%m-%d')
- timedelta(days=179)).strftime('%Y/%m/%d')
start_str = (_dt.strptime(end_str.replace('/', '-'), '%Y-%m-%d')
- _td(days=179)).strftime('%Y/%m/%d')
date_str = f'{start_str}~{end_str}'
period_label = '半年策略近180日'
elif sub_arg in ('yearly', 'year', 'annual', '', '年報'):
end_str = latest_date() or now.strftime('%Y/%m/%d')
start_str = (datetime.strptime(end_str.replace('/', '-'), '%Y-%m-%d')
- timedelta(days=364)).strftime('%Y/%m/%d')
start_str = (_dt.strptime(end_str.replace('/', '-'), '%Y-%m-%d')
- _td(days=364)).strftime('%Y/%m/%d')
date_str = f'{start_str}~{end_str}'
period_label = '年度策略近365日'
else:
@@ -2337,39 +2362,39 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) -
# 決定日期範圍(與 strategy 相同邏輯)
if sub_arg in ('weekly', 'week', ''):
end_d = datetime.strptime(
end_d = _dt.strptime(
(latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d')
start_d = end_d - timedelta(days=6)
start_d = end_d - _td(days=6)
period_label = '週比較近7日'
elif sub_arg in ('monthly', 'month', ''):
end_d = datetime.strptime(
end_d = _dt.strptime(
(latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d')
start_d = end_d.replace(day=1)
period_label = f'{end_d.year}/{end_d.month:02d} 月比較'
elif sub_arg in ('quarterly', 'quarter', 'q', ''):
end_d = datetime.strptime(
end_d = _dt.strptime(
(latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d')
start_d = end_d - timedelta(days=89)
start_d = end_d - _td(days=89)
period_label = '季比較近90日'
elif sub_arg in ('half', '半年'):
end_d = datetime.strptime(
end_d = _dt.strptime(
(latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d')
start_d = end_d - timedelta(days=179)
start_d = end_d - _td(days=179)
period_label = '半年比較近180日'
elif sub_arg in ('yearly', 'year', ''):
end_d = datetime.strptime(
end_d = _dt.strptime(
(latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d')
start_d = end_d - timedelta(days=364)
start_d = end_d - _td(days=364)
period_label = '年度比較近365日'
elif sub_arg and re.match(r'\d{4}[/-]\d{1,2}[/-]\d{1,2}', sub_arg):
# 指定日期(今日/昨日/自訂)
d_str = normalize_date(sub_arg)
start_d = end_d = datetime.strptime(d_str.replace('/', '-'), '%Y-%m-%d')
start_d = end_d = _dt.strptime(d_str.replace('/', '-'), '%Y-%m-%d')
period_label = f'{d_str} 日比較'
else:
# 預設:昨日日報
yd = (now - timedelta(days=1)).strftime('%Y/%m/%d')
start_d = end_d = datetime.strptime(yd.replace('/', '-'), '%Y-%m-%d')
yd = (now - _td(days=1)).strftime('%Y/%m/%d')
start_d = end_d = _dt.strptime(yd.replace('/', '-'), '%Y-%m-%d')
period_label = f'{yd} 日比較'
date_str_for_query = start_d.strftime('%Y/%m/%d')
@@ -2451,7 +2476,7 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) -
session = get_session()
try:
# Find today's generated growth trend report
today = datetime.now()
today = _dt.now()
cached_report = session.query(PPTReport).filter(
PPTReport.report_type == 'growth',
PPTReport.generated_at >= today.replace(hour=0, minute=0, second=0, microsecond=0),
@@ -2496,12 +2521,12 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) -
session = get_session()
try:
# 設定 24 小時後過期
expires_at = datetime.now() + timedelta(hours=24)
expires_at = _dt.now() + _td(hours=24)
# 刪除舊的快取記錄
session.query(PPTReport).filter(
PPTReport.report_type == 'growth',
PPTReport.expires_at <= datetime.now()
PPTReport.expires_at <= _dt.now()
).delete()
# 儲存新的記錄
@@ -2582,12 +2607,12 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) -
session = get_session()
try:
# 設定 24 小時後過期
expires_at = datetime.now() + timedelta(hours=24)
expires_at = _dt.now() + _td(hours=24)
# 刪除舊的快取記錄
session.query(PPTReport).filter(
PPTReport.report_type == 'vendor',
PPTReport.expires_at <= datetime.now()
PPTReport.expires_at <= _dt.now()
).delete()
# 儲存新的記錄
@@ -2621,7 +2646,7 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) -
session = get_session()
try:
# 查找今天是否有生成的 BCG 報告
today = datetime.now()
today = _dt.now()
cached_report = session.query(PPTReport).filter(
PPTReport.report_type == 'bcg',
PPTReport.parameters == f"{yr_b}/{mo_b:02d}",
@@ -2660,12 +2685,12 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) -
session = get_session()
try:
# 設定 24 小時後過期
expires_at = datetime.now() + timedelta(hours=24)
expires_at = _dt.now() + _td(hours=24)
# 刪除舊的快取記錄
session.query(PPTReport).filter(
PPTReport.report_type == 'bcg',
PPTReport.expires_at <= datetime.now()
PPTReport.expires_at <= _dt.now()
).delete()
# 儲存新的記錄
@@ -2887,9 +2912,9 @@ def _handle_excel_import(doc: dict, chat_id: int, reply_to: int):
def send_morning_report():
"""每日 08:30 早報 — P10 升級版TOP15 + PPT導引按鈕"""
try:
now = datetime.now(TAIPEI_TZ)
now = _dt.now(TAIPEI_TZ)
td = now.strftime('%Y/%m/%d')
yd = (now - timedelta(days=1)).strftime('%Y/%m/%d')
yd = (now - _td(days=1)).strftime('%Y/%m/%d')
wdays = ['週一', '週二', '週三', '週四', '週五', '週六', '週日']
sales = query_sales(yd)
top15 = query_top_products(yd, 15)
@@ -2982,7 +3007,7 @@ def send_morning_report():
def send_evening_report():
"""每日 21:00 晚報"""
try:
now = datetime.now(TAIPEI_TZ)
now = _dt.now(TAIPEI_TZ)
td = now.strftime('%Y/%m/%d')
sales = query_sales(td)
weekly = query_weekly_trend()
@@ -3028,7 +3053,7 @@ def send_evening_report():
lines.append("")
else:
# 檢查是否真的沒有資料,避免重複顯示
today_str = datetime.now(TAIPEI_TZ).strftime('%Y/%m/%d')
today_str = _dt.now(TAIPEI_TZ).strftime('%Y/%m/%d')
if latest_date() == today_str:
# 如果最新日期就是今天,但查詢失敗,可能是資料庫問題
lines.append("⚠️ *今日業績資料載入異常*")
@@ -3084,7 +3109,7 @@ def send_evening_report():
def send_weekly_report():
"""每週一 09:00 週報"""
try:
now = datetime.now(TAIPEI_TZ)
now = _dt.now(TAIPEI_TZ)
td = now.strftime('%Y/%m/%d')
weekly = query_weekly_trend()
WEEKDAYS_ZH = ['週一', '週二', '週三', '週四', '週五', '週六', '週日']
@@ -3153,7 +3178,7 @@ def send_weekly_report():
def check_anomalies():
"""每日 9/12/15/18 點異常偵測(整體業績 + 商品級)"""
try:
now = datetime.now(TAIPEI_TZ)
now = _dt.now(TAIPEI_TZ)
td = now.strftime('%Y/%m/%d')
ts = now.strftime('%H:%M')
alerts = []
@@ -3234,7 +3259,7 @@ def send_competitor_report():
if not _PCHOME_AVAILABLE:
return
try:
yesterday = (datetime.now(TAIPEI_TZ).date() - timedelta(days=1)).strftime('%Y/%m/%d')
yesterday = (_dt.now(TAIPEI_TZ).date() - _td(days=1)).strftime('%Y/%m/%d')
sys_log.info(f'[PChome] 每日競品日報開始 {yesterday}')
results = pchome_batch(_db(), top_n=30, date_str=yesterday)
pchome_save(_db(), results)
@@ -3271,7 +3296,7 @@ def send_competitor_report():
def send_daily_excel():
"""每日 08:45 自動發送昨日 Excel 業績報表"""
try:
yesterday = (datetime.now(TAIPEI_TZ).date() - timedelta(days=1)).strftime('%Y/%m/%d')
yesterday = (_dt.now(TAIPEI_TZ).date() - _td(days=1)).strftime('%Y/%m/%d')
sys_log.info(f"[AutoExcel] 開始產生 {yesterday} Excel 報表")
path = generate_daily_pdf(yesterday)
if not path:
@@ -3382,11 +3407,10 @@ def main_menu_keyboard():
def _submenu_sales():
ld = latest_date() or ''
yesterday = ''
current_month = datetime.now(TAIPEI_TZ).strftime('%Y/%m')
current_month = _dt.now(TAIPEI_TZ).strftime('%Y/%m')
if ld:
try:
from datetime import datetime as _dt
yesterday = (_dt.strptime(ld.replace('/', '-'), '%Y-%m-%d') - timedelta(days=1)).strftime('%Y/%m/%d')
yesterday = (_dt.strptime(ld.replace('/', '-'), '%Y-%m-%d') - _td(days=1)).strftime('%Y/%m/%d')
except Exception:
pass
d_label = ld[-5:] if ld else '-'
@@ -3411,8 +3435,7 @@ def _submenu_products():
yesterday = ''
if ld:
try:
from datetime import datetime as _dt
yesterday = (_dt.strptime(ld.replace('/', '-'), '%Y-%m-%d') - timedelta(days=1)).strftime('%Y/%m/%d')
yesterday = (_dt.strptime(ld.replace('/', '-'), '%Y-%m-%d') - _td(days=1)).strftime('%Y/%m/%d')
except Exception:
pass
d_label = ld[-5:] if ld else '-'
@@ -3465,9 +3488,11 @@ def _submenu_category():
"""分類業績鑽取 — 顯示 L1 固定分類按鈕"""
ld = latest_date() or ''
CATS = [
('美妝保養', '💄'), ('保健食品/用品', '💊'), ('母嬰', '👶'),
('食品飲料', '🍱'), ('家電', '🏠'), ('服裝內著', '👕'),
('個人清潔', '🧴'), ('運動用品/器材', '🏃'), ('寵物', '🐾'), ('其他', '📦'),
('美妝保養', '💄'), ('3C家電', '📱'), ('服飾配件', '👕'),
('居家生活', '🏠'), ('母嬰用品', '🍼'), ('生鮮食品', '🥗'),
('圖書文具', '📚'), ('戶外運動', ''), ('餐券票券', '🎫'),
('醫療保健', '💊'), ('美體保健', '💆'), ('寵物用品', '🐕'),
('箱包精品', '👜'), ('車類百貨', '🚗'), ('情趣用品', '❤️'),
]
rows = []
for i in range(0, len(CATS), 2):
@@ -3539,8 +3564,8 @@ def _submenu_market():
def _submenu_competitor():
"""競品日報第二層:所有選項直接產 PPT"""
today = datetime.now(TAIPEI_TZ).date()
yesterday = today - timedelta(days=1)
today = _dt.now(TAIPEI_TZ).date()
yesterday = today - _td(days=1)
td_str = today.strftime('%Y/%m/%d')
yd_str = yesterday.strftime('%Y/%m/%d')
td_label = today.strftime('%m/%d')
@@ -3609,9 +3634,8 @@ _AWAIT_PROMPTS = {
def sales_quick_kb(date_str):
try:
from datetime import datetime as dt
d = dt.strptime(date_str.replace('/', '-'), '%Y-%m-%d').date()
yesterday = (d - timedelta(days=1)).strftime('%Y/%m/%d')
d = _dt.strptime(date_str.replace('/', '-'), '%Y-%m-%d').date()
yesterday = (d - _td(days=1)).strftime('%Y/%m/%d')
return [
[{'text': '⬅️ 昨日業績', 'callback_data': f'cmd:sales:{yesterday}'},
{'text': '🏆 熱銷商品', 'callback_data': f'cmd:top:{date_str}'}],
@@ -3873,14 +3897,14 @@ def query_top_products_range(start_str: str, end_str: str, lim: int = 10) -> lis
def resolve_date(q: str) -> str:
"""從問題文字解析目標日期,回傳 YYYY/MM/DD"""
import re
today = datetime.now(TAIPEI_TZ).date()
today = _dt.now(TAIPEI_TZ).date()
if '昨天' in q or '昨日' in q:
return (today - timedelta(days=1)).strftime('%Y/%m/%d')
return (today - _td(days=1)).strftime('%Y/%m/%d')
if '前天' in q:
return (today - timedelta(days=2)).strftime('%Y/%m/%d')
return (today - _td(days=2)).strftime('%Y/%m/%d')
if '大前天' in q:
return (today - timedelta(days=3)).strftime('%Y/%m/%d')
return (today - _td(days=3)).strftime('%Y/%m/%d')
# 明確日期格式2026/04/15 or 2026-04-15 or 04/15
m = re.search(r'(\d{4})[/-](\d{1,2})[/-](\d{1,2})', q)
@@ -3900,13 +3924,13 @@ def resolve_query_intent(q: str) -> dict:
Function Calling 架構不需要此函數,僅 NIM 備援使用)
"""
import re
today = datetime.now(TAIPEI_TZ).date()
today = _dt.now(TAIPEI_TZ).date()
# ── 月份查詢 ─────────────────────────────────────────────
# 「上個月」「上月」
if any(kw in q for kw in ['上個月', '上月', '上月份']):
first = today.replace(day=1)
last_m = (first - timedelta(days=1))
last_m = (first - _td(days=1))
return {'type': 'month', 'year': last_m.year, 'month': last_m.month}
# 「這個月」「本月」
@@ -3928,12 +3952,12 @@ def resolve_query_intent(q: str) -> dict:
# ── 週查詢 ───────────────────────────────────────────────
if any(kw in q for kw in ['上週', '上个星期', '上星期', '上週']):
mon = today - timedelta(days=today.weekday() + 7)
sun = mon + timedelta(days=6)
mon = today - _td(days=today.weekday() + 7)
sun = mon + _td(days=6)
return {'type': 'range', 'start': mon.strftime('%Y/%m/%d'),
'end': sun.strftime('%Y/%m/%d'), 'label': '上週'}
if any(kw in q for kw in ['這週', '本週', '這周', '本周']):
mon = today - timedelta(days=today.weekday())
mon = today - _td(days=today.weekday())
return {'type': 'range', 'start': mon.strftime('%Y/%m/%d'),
'end': today.strftime('%Y/%m/%d'), 'label': '本週'}
@@ -3941,7 +3965,7 @@ def resolve_query_intent(q: str) -> dict:
m = re.search(r'近(\d+)[天日]', q)
if m:
n = int(m.group(1))
start = (today - timedelta(days=n - 1)).strftime('%Y/%m/%d')
start = (today - _td(days=n - 1)).strftime('%Y/%m/%d')
return {'type': 'range', 'start': start,
'end': today.strftime('%Y/%m/%d'), 'label': f'{n}'}
@@ -4388,9 +4412,9 @@ _FC_TOOLS = [{
def _execute_tool(name: str, args: dict) -> dict:
"""執行 Gemini 指定的工具,回傳結構化結果"""
now = datetime.now(TAIPEI_TZ)
now = _dt.now(TAIPEI_TZ)
today = now.strftime("%Y/%m/%d")
yd = (now - timedelta(days=1)).strftime("%Y/%m/%d")
yd = (now - _td(days=1)).strftime("%Y/%m/%d")
# ── query_sales ───────────────────────────────────────────
if name == "query_sales":
@@ -4419,7 +4443,7 @@ def _execute_tool(name: str, args: dict) -> dict:
elif period == "last_month":
first = now.replace(day=1)
lm = (first - timedelta(days=1))
lm = (first - _td(days=1))
ms = query_monthly_summary(lm.year, lm.month)
result = {"monthly": ms, "year": lm.year, "month": lm.month}
@@ -4490,7 +4514,7 @@ def openclaw_answer(question: str):
Function Calling 架構 — AI 自主決定查什麼、怎麼回答
不再靠 if/else 規則判斷意圖
"""
now = datetime.now(TAIPEI_TZ)
now = _dt.now(TAIPEI_TZ)
today_str = now.strftime("%Y/%m/%d")
# ── 功能說明直接導 help ───────────────────────────────────
@@ -4620,7 +4644,7 @@ def openclaw_answer(question: str):
try:
intent = resolve_query_intent(question)
today = today_str
yd = (now - timedelta(days=1)).strftime("%Y/%m/%d")
yd = (now - _td(days=1)).strftime("%Y/%m/%d")
d = intent.get("date", yd) if intent["type"] == "day" else yd
db_ctx = ""
@@ -4674,7 +4698,7 @@ def openclaw_answer(question: str):
# ── 指令處理 ──────────────────────────────────────────────────
def handle_cmd(cmd, arg, chat_id, reply_to):
ld = latest_date() or datetime.now(TAIPEI_TZ).strftime('%Y/%m/%d')
ld = latest_date() or _dt.now(TAIPEI_TZ).strftime('%Y/%m/%d')
target = normalize_date(arg) if arg else ld
if cmd in ('sales', '業績'):
@@ -4698,30 +4722,30 @@ def handle_cmd(cmd, arg, chat_id, reply_to):
elif cmd in ('trend', '趨勢'):
import calendar as _cal
from datetime import date as _date
now_d = datetime.now(TAIPEI_TZ).date()
now_d = _dt.now(TAIPEI_TZ).date()
sub = arg.lower().strip() if arg else '7'
# 決定查詢區間
if sub in ('', '7', 'week', 'weekly', '', '近7日', '七天'):
ld_str = latest_date() or now_d.strftime('%Y/%m/%d')
end_d = datetime.strptime(ld_str.replace('/', '-'), '%Y-%m-%d').date()
start_d = end_d - timedelta(days=6)
end_d = _dt.strptime(ld_str.replace('/', '-'), '%Y-%m-%d').date()
start_d = end_d - _td(days=6)
period_label = '近7日'
elif sub in ('month', '30', '', '本月', '近30日', '近一月'):
end_d = now_d
start_d = end_d - timedelta(days=29)
start_d = end_d - _td(days=29)
period_label = '近30日'
elif sub in ('quarter', 'q', '', '近季', '近3月', '近三月'):
end_d = now_d
start_d = end_d - timedelta(days=89)
start_d = end_d - _td(days=89)
period_label = '近3個月'
elif sub in ('half', '半年', '近半年', '6月', '六個月'):
end_d = now_d
start_d = end_d - timedelta(days=179)
start_d = end_d - _td(days=179)
period_label = '近半年'
elif sub in ('year', 'yearly', '', '本年', '近年', '近一年'):
end_d = now_d
start_d = end_d - timedelta(days=364)
start_d = end_d - _td(days=364)
period_label = '近一年'
elif re.fullmatch(r'\d{4}/\d{1,2}', sub):
yr_s, mo_s = sub.split('/')
@@ -4744,8 +4768,8 @@ def handle_cmd(cmd, arg, chat_id, reply_to):
period_label = f'{yr}年Q{qn}'
else:
ld_str = latest_date() or now_d.strftime('%Y/%m/%d')
end_d = datetime.strptime(ld_str.replace('/', '-'), '%Y-%m-%d').date()
start_d = end_d - timedelta(days=6)
end_d = _dt.strptime(ld_str.replace('/', '-'), '%Y-%m-%d').date()
start_d = end_d - _td(days=6)
period_label = '近7日'
start_str = start_d.strftime('%Y/%m/%d')
@@ -5035,7 +5059,7 @@ def handle_cmd(cmd, arg, chat_id, reply_to):
yesterday = normalize_date(keyword)
date_label = yesterday
else:
yesterday = (datetime.now(TAIPEI_TZ).date() - timedelta(days=1)).strftime('%Y/%m/%d')
yesterday = (_dt.now(TAIPEI_TZ).date() - _td(days=1)).strftime('%Y/%m/%d')
date_label = f'昨日 ({yesterday[-5:]})'
send_message(chat_id,
f'📊 正在分析 {date_label} TOP30 熱銷商品 vs PChome 比價,預計 30~60 秒...',
@@ -5575,11 +5599,12 @@ def telegram_webhook():
if uid in _seen_update_ids:
sys_log.debug(f"[OpenClawBot] duplicate update_id={uid}, skip")
return jsonify({'ok': True})
# 若 deque 已達 maxlenappend 會自動擠掉最舊 → 同步從 set 移除
if len(_seen_update_order) >= _SEEN_MAX:
evicted = _seen_update_order[0]
_seen_update_ids.discard(evicted)
_seen_update_order.append(uid)
_seen_update_ids.add(uid)
if len(_seen_update_ids) > _SEEN_MAX:
# 清掉最舊的 100 筆set 無序,直接 pop 100 個)
for _ in range(100):
_seen_update_ids.pop()
# ── Callback Query按鈕─────────────────────────────
if 'callback_query' in update:
@@ -5588,11 +5613,16 @@ def telegram_webhook():
data = cq.get('data', '')
chat_id = cq['message']['chat']['id']
chat_type = cq['message']['chat'].get('type', '')
cq_from_id = (cq.get('from') or {}).get('id')
sys_log.info(f'[OpenClawBot] CB: chat={chat_id} type={chat_type} data={data} allowed={ALLOWED_GROUP}')
if chat_type in ('group', 'supergroup') and chat_id != ALLOWED_GROUP:
# fail-closed未授權一律安靜拒絕關閉 loading不回任何訊息避免偵察
if not _is_authorized(chat_type, chat_id, cq_from_id):
sys_log.warning(
f'[OpenClawBot] CB rejected: chat={chat_id} type={chat_type} user={cq_from_id}'
)
answer_callback(cq_id)
return jsonify({'ok': True})
return jsonify({'ok': False, 'error': 'forbidden'}), 403
answer_callback(cq_id)
send_typing(chat_id)
@@ -5645,20 +5675,21 @@ def telegram_webhook():
text_raw = (msg.get('text') or '').strip()
msg_id = msg.get('message_id')
# fail-closed 統一授權檢查(覆蓋 group/supergroup/private/channel/unknown
_uid = (msg.get('from') or {}).get('id')
if not _is_authorized(chat_type, chat_id, _uid):
sys_log.warning(
f'[OpenClawBot] MSG rejected: chat={chat_id} type={chat_type} user={_uid}'
)
# 靜默拒絕:不回 Telegram 訊息(避免陌生人偵察 bot 存在與白名單機制)
return jsonify({'ok': False, 'error': 'forbidden'}), 403
if chat_type in ('group', 'supergroup'):
if chat_id != ALLOWED_GROUP:
return jsonify({'ok': True})
# 移除 @mention不強制要求但如有則移除
question = text_raw.replace(BOT_USERNAME, '').strip()
elif chat_type == 'private':
# 私訊存取控制 — 只允許白名單用戶
_uid = msg.get('from', {}).get('id', 0)
if ALLOWED_USERS and _uid not in ALLOWED_USERS:
send_message(chat_id, "⚠️ 此 Bot 僅限授權用戶使用,請聯絡管理員。", msg_id)
return jsonify({'ok': True})
question = text_raw
else:
return jsonify({'ok': True})
# 已通過授權的 private chat
question = text_raw
# ── 圖片訊息Gemini Vision 商品辨識 ─────────────────────
if not question and msg.get('photo'):

View File

@@ -17,11 +17,31 @@ import os
import json
import asyncio
import logging
import time as _time_mod
from typing import Optional
from datetime import date, timedelta
logger = logging.getLogger(__name__)
# H6 修補callback 速率限制(每 user_id 每分鐘最多 30 次 callback
# 訊息流在 routes/openclaw_bot_routes.py 已有 rate-limit這裡補上 callback 缺口。
_CB_RATE_LIMIT_PER_MIN = 30
_CB_RATE_WINDOW_SEC = 60
_cb_rate_tracker: dict = {} # {user_id: [timestamp, ...]}
def _check_cb_rate_limit(user_id: int) -> bool:
"""回傳 True = 允許False = 超過速率限制"""
if user_id is None:
return True # 無法辨識 user 時不阻擋(例如 inline query
now = _time_mod.time()
window = _cb_rate_tracker.setdefault(user_id, [])
# 清掉 60 秒以前的紀錄
_cb_rate_tracker[user_id] = [t for t in window if now - t < _CB_RATE_WINDOW_SEC]
if len(_cb_rate_tracker[user_id]) >= _CB_RATE_LIMIT_PER_MIN:
return False
_cb_rate_tracker[user_id].append(now)
return True
# 嘗試匯入 telegram 模組
try:
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardMarkup, KeyboardButton
@@ -32,7 +52,11 @@ except ImportError:
logger.warning("python-telegram-bot 套件未安裝Telegram Bot 功能將無法使用")
# 商品分類列表
CATEGORIES = ['美妝', '3C', '服飾', '居家', '母嬰', '電商', '優惠', '生活', '美食', '熱門']
CATEGORIES = [
'美妝保養', '3C家電', '服飾配件', '居家生活', '母嬰用品',
'生鮮食品', '圖書文具', '戶外運動', '餐券票券', '醫療保健',
'美體保健', '寵物用品', '箱包精品', '車類百貨', '情趣用品'
]
class TrendTelegramBot:
@@ -402,6 +426,19 @@ class TrendTelegramBot:
async def handle_callback(self, update: Update, context):
"""處理按鈕回調"""
query = update.callback_query
# H6: per-user rate-limit每分鐘 30 次)。
# 放在 query.answer() 之前;若本檔未來加入授權檢查,應置於授權之後以免讓
# 未授權 user 佔用 rate counter目前本檔無授權層由 python-telegram-bot
# Application 自行處理 token 範圍信任)。
_uid = query.from_user.id if query.from_user else None
if not _check_cb_rate_limit(_uid):
try:
await query.answer("操作太頻繁,請稍後再試", show_alert=False)
except Exception as _e:
logger.debug(f"[handle_callback] rate-limit answer failed: {_e}")
return
await query.answer()
data = query.data
@@ -488,6 +525,17 @@ class TrendTelegramBot:
elif data.startswith("momo:ops:"):
await self._handle_ops_callback(query, data)
# ===== 批次定價決策momo:bpa / bpr:<batch_id>=====
elif data.startswith("momo:bpa:"):
await self._handle_batch_price_decision(query, data.split(":", 2)[-1], "approve")
elif data.startswith("momo:bpr:"):
await self._handle_batch_price_decision(query, data.split(":", 2)[-1], "reject")
# ===== 事件忽略momo:eig:<event_id>=====
elif data.startswith("momo:eig:"):
await self._handle_event_ignore(query, data.split(":", 2)[-1])
# ===== OpenClaw 指令按鈕cmd:<cmd>:<arg>=====
elif data.startswith("cmd:"):
parts = data[4:].split(":", 1)
@@ -579,11 +627,12 @@ class TrendTelegramBot:
[{'text': '📅 每週業績', 'callback_data': 'cmd:trend:week'},
{'text': '📅 每月業績', 'callback_data': 'cmd:history:' + current_month}],
[{'text': '📅 每季業績', 'callback_data': 'cmd:trend:quarter'},
{'text': '📅 近半年', 'callback_data': 'cmd:trend:half'}],
{'text': '📅 近半年', 'callback_data': 'cmd:trend:half'}],
[{'text': '📈 趨勢分析', 'callback_data': 'menu:trend'},
{'text': '🔄 同期比較', 'callback_data': 'cmd:compare:' + today}],
[{'text': '🗂 分類業績', 'callback_data': 'cmd:category:' + today},
{'text': '📅 日期/區間', 'callback_data': 'await:date_range_sales'}],
[{'text': '🗃 月份覽', 'callback_data': 'cmd:history'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
@@ -636,16 +685,30 @@ class TrendTelegramBot:
def _get_reports_submenu(self):
"""簡報報表子選單"""
return [
[{'text': '📄 日報', 'callback_data': 'cmd:ppt:daily'},
{'text': '📊 週', 'callback_data': 'cmd:ppt:weekly'}],
[{'text': '📈 ', 'callback_data': 'cmd:ppt:monthly'},
# ── 定期報告
[{'text': '📄 日', 'callback_data': 'cmd:ppt:daily'},
{'text': '📈 ', 'callback_data': 'cmd:ppt:weekly'}],
[{'text': '📅 月報', 'callback_data': 'cmd:ppt:monthly'},
{'text': '📋 下載報表','callback_data': 'cmd:report'}],
# ── 策略簡報
[{'text': '🧩 策略(日)', 'callback_data': 'cmd:ppt:strategy'},
{'text': '🧩 策略(週)', 'callback_data': 'cmd:ppt:strategy weekly'}],
[{'text': '🧩 策略(月)', 'callback_data': 'cmd:ppt:strategy monthly'},
{'text': '🧩 策略(季)', 'callback_data': 'cmd:ppt:strategy quarterly'}],
[{'text': '🧩 策略(年)', 'callback_data': 'cmd:ppt:strategy yearly'},
{'text': '🎉 促銷效益簡報', 'callback_data': 'await:promo_range'}],
[{'text': '🔍 競品比較', 'callback_data': 'menu:competitor'},
{'text': '📈 成長趨勢報告', 'callback_data': 'cmd:ppt:growth'}],
[{'text': '🏭 廠商業績報告', 'callback_data': 'cmd:ppt:vendor'},
{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
[{'text': '🧩 策略(年)','callback_data': 'cmd:ppt:strategy half'},
{'text': '🧩 策略(年)', 'callback_data': 'cmd:ppt:strategy yearly'}],
# ── 促銷 & 競品
[{'text': '🎉 促銷效益簡報', 'callback_data': 'await:promo_range'},
{'text': '🔍 競品比較', 'callback_data': 'menu:competitor'}],
# ── 新增:成長趨勢 / 廠商 / BCG
[{'text': '📈 成長趨勢報告', 'callback_data': 'cmd:ppt:growth'},
{'text': '🏭 廠商業績報告', 'callback_data': 'cmd:ppt:vendor'}],
[{'text': '🎯 BCG 品牌矩陣', 'callback_data': 'cmd:ppt:bcg'},
{'text': '📅 指定月份廠商', 'callback_data': 'await:date_ppt_vendor'}],
# ── 自訂
[{'text': '📅 指定日期日報', 'callback_data': 'await:date_ppt_daily'},
{'text': '📅 指定月份月報', 'callback_data': 'await:date_ppt_monthly'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_market_submenu(self):
@@ -653,14 +716,14 @@ class TrendTelegramBot:
return [
[{'text': '📰 電商新聞', 'callback_data': 'cmd:news'},
{'text': '🌤 台北天氣', 'callback_data': 'cmd:weather'}],
[{'text': '<EFBFBD> Google熱搜', 'callback_data': 'cmd:trends'},
{'text': '<EFBFBD> Dcard口碑', 'callback_data': 'cmd:dcard'}],
[{'text': '🔥 Google熱搜', 'callback_data': 'cmd:trends'},
{'text': '💬 Dcard口碑', 'callback_data': 'cmd:dcard'}],
[{'text': '💱 台銀匯率', 'callback_data': 'cmd:exchange'},
{'text': '📅 電商節慶', 'callback_data': 'cmd:calendar'}],
[{'text': '▶️ YouTube爆紅商品', 'callback_data': 'cmd:youtube'},
{'text': '🧠 AI學習狀態', 'callback_data': 'cmd:learn'}],
[{'text': '🔍 關鍵字比價', 'callback_data': 'await:search_compare'},
{'text': '<EFBFBD> 圖片比價說明', 'callback_data': 'cmd:photo_search_help'}],
{'text': '📷 圖片比價說明', 'callback_data': 'cmd:photo_search_help'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
@@ -676,11 +739,11 @@ class TrendTelegramBot:
return [
[{'text': f'📊 今日簡報 ({td_label})', 'callback_data': f'cmd:ppt:competitor {td_str}'},
{'text': f'<EFBFBD> 昨日報 ({yd_label})', 'callback_data': f'cmd:ppt:competitor {yd_str}'}],
{'text': f'📊 昨日報 ({yd_label})', 'callback_data': f'cmd:ppt:competitor {yd_str}'}],
[{'text': '📈 本週比較', 'callback_data': 'cmd:ppt:competitor weekly'},
{'text': '📆 本月比較', 'callback_data': 'cmd:ppt:competitor monthly'}],
[{'text': '🗃 本季比較', 'callback_data': 'cmd:ppt:competitor quarterly'},
{'text': '<EFBFBD> 指定日期', 'callback_data': 'await:date_competitor'}],
{'text': '📅 指定日期', 'callback_data': 'await:date_competitor'}],
[{'text': '📄 更多週期 →', 'callback_data': 'menu:competitor_ppt'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
@@ -689,7 +752,7 @@ class TrendTelegramBot:
"""競品 PPT 長週期選單(第三層)— 半年/年;日/週/月/季已在第二層"""
return [
[{'text': '📆 半年比較', 'callback_data': 'cmd:ppt:competitor half'},
{'text': '<EFBFBD> 年比較', 'callback_data': 'cmd:ppt:competitor yearly'}],
{'text': '🗓 年比較', 'callback_data': 'cmd:ppt:competitor yearly'}],
[{'text': '← 返回競品日報', 'callback_data': 'menu:competitor'}],
]
@@ -846,6 +909,130 @@ class TrendTelegramBot:
parse_mode='HTML'
)
async def _handle_batch_price_decision(self, query, batch_id: str, action: str):
"""
批次定價決策 callbackADR-012 C2 修復)
callback_data: momo:bpa:<batch_id> / momo:bpr:<batch_id>
流程:(1) 走 event_router.dispatch 走 L2 audit(2) 寫 KM feedback(3) 編輯訊息
"""
from services.openclaw_learning_service import store_insight
from services.event_router import dispatch as event_dispatch
from datetime import date as date_cls, datetime
user = query.from_user
operator = user.full_name or f"id_{user.id}"
action_label = "批准" if action == "approve" else "拒絕"
# 1) EventRouter dispatch — audit trailL2
try:
await event_dispatch({
"event_type": "batch_price_decision",
"severity": "alert",
"source": "telegram_bot",
"title": f"批次定價 {action_label}",
"summary": f"batch_id={batch_id} by {operator}",
"metadata": {
"batch_id": batch_id,
"decision": action,
"operator": operator,
"operator_tg_id": user.id,
},
})
except Exception as e:
logger.warning(f"[TelegramBot] batch_price event_dispatch failed: {e}")
# 2) KM feedback保守/積極策略訓練數據)
try:
note = "訓練積極定價策略" if action == "approve" else "訓練保守定價策略"
store_insight(
insight_type="batch_price_decision_feedback",
content=f"管理員{action_label}批次定價batch_id={batch_id}{note}",
period=date_cls.today().isoformat(),
metadata={
"decision": action,
"batch_id": batch_id,
"operator": operator,
"operator_tg_id": user.id,
},
)
except Exception as e:
logger.error(f"[TelegramBot] batch_price store_insight failed: {e}")
await query.answer("決策已記錄但 KM 寫入失敗", show_alert=True)
return
# 3) ack + 編輯原訊息加上執行紀錄
await query.answer(f"{action_label}批次決策", show_alert=False)
ts = datetime.now().strftime("%H:%M")
icon = "" if action == "approve" else ""
footer = f"\n\n━━━━━━━━━━━━━━━━━━━━\n{icon}{action_label} by {operator} at {ts}"
try:
await query.edit_message_text(
(query.message.text or "") + footer,
parse_mode='HTML',
)
except Exception as e:
logger.warning(f"[TelegramBot] batch_price edit_message failed: {e}")
async def _handle_event_ignore(self, query, event_id: str):
"""
事件忽略 callbackADR-012 C2 修復)
callback_data: momo:eig:<event_id>
流程:寫 KM + event_router 留痕 + 編輯訊息
"""
from services.openclaw_learning_service import store_insight
from services.event_router import dispatch as event_dispatch
from datetime import date as date_cls, datetime
user = query.from_user
operator = user.full_name or f"id_{user.id}"
# 1) EventRouter — L0 留痕severity=info不再觸發 AI 分析)
try:
await event_dispatch({
"event_type": "event_ignored",
"severity": "info",
"source": "telegram_bot",
"title": f"事件 {event_id} 已忽略",
"summary": f"操作員 {operator} 手動忽略告警事件",
"metadata": {
"event_id": event_id,
"operator": operator,
"operator_tg_id": user.id,
},
})
except Exception as e:
logger.warning(f"[TelegramBot] event_ignore dispatch failed: {e}")
# 2) KM 訓練數據 —— 幫 L1/L2 學習哪類事件不需打擾
try:
store_insight(
insight_type="event_ignore_feedback",
content=f"管理員忽略事件event_id={event_id}),訓練降噪策略",
period=date_cls.today().isoformat(),
metadata={
"event_id": event_id,
"decision": "ignore",
"operator": operator,
"operator_tg_id": user.id,
},
)
except Exception as e:
logger.error(f"[TelegramBot] event_ignore store_insight failed: {e}")
await query.answer("決策已發送但 KM 寫入失敗", show_alert=True)
return
# 3) ack + 編輯訊息
await query.answer("已忽略此事件", show_alert=False)
ts = datetime.now().strftime("%H:%M")
footer = f"\n\n━━━━━━━━━━━━━━━━━━━━\n🛑 已忽略 by {operator} at {ts}"
try:
await query.edit_message_text(
(query.message.text or "") + footer,
parse_mode='HTML',
)
except Exception as e:
logger.warning(f"[TelegramBot] event_ignore edit_message failed: {e}")
async def _show_trend_by_category(self, query, category: str):
"""顯示指定分類的趨勢"""
try:

View File

@@ -283,9 +283,12 @@ def price_decision(product_name: str, product_sku: str,
message += f"🔗 洞察 ID<code>{insight_id}</code>\n"
message += f"━━━━━━━━━━━━━━━━━━━━"
# ADR-012: callback_data 採短 prefixmomo:pa/pr:{insight_id})— 64-byte 安全、與 L2 handler 一致
# 降級:若無 insight_id不應發生但保底以 sku 做後綴 — sku 仍可能超長,呼叫者有責任提供 insight_id
_cb_id = str(insight_id) if insight_id else f"sku_{product_sku}"[:40]
keyboard = {"inline_keyboard": [[
{"text": "✅ 確認執行", "callback_data": f"momo:price_decision:approve:{product_sku}"},
{"text": "❌ 拒絕", "callback_data": f"momo:price_decision:reject:{product_sku}"},
{"text": "✅ 確認執行", "callback_data": f"momo:pa:{_cb_id}"},
{"text": "❌ 拒絕", "callback_data": f"momo:pr:{_cb_id}"},
]]}
return message, keyboard
@@ -303,11 +306,13 @@ def batch_decision_msg(items: List[Dict], batch_id: str) -> tuple:
if len(items) > 6:
lines.append(f"<i>…另有 {len(items)-6} 個 SKU</i>")
lines.append("━━━━━━━━━━━━━━━━━━━━")
# ADR-012: 短 prefix bpa=batch_price_approve / bpr=batch_price_reject預留 64-byte buffer
_bid = str(batch_id)[:48]
keyboard = {"inline_keyboard": [[
{"text": f"✅ 全部確認({len(items)}項)",
"callback_data": f"momo:batch_decision:approve:{batch_id}"},
"callback_data": f"momo:bpa:{_bid}"},
{"text": "❌ 取消",
"callback_data": f"momo:batch_decision:reject:{batch_id}"},
"callback_data": f"momo:bpr:{_bid}"},
]]}
return "\n".join(lines), keyboard
@@ -399,9 +404,11 @@ def triaged_alert(base_event: Dict[str, Any], tier_label: str,
if trace:
lines.append(f"<pre>{trace[-400:]}</pre>")
# ADR-012: eig=event_ignoreevent_id 截斷確保 ≤ 60 bytes留 buffer
_eid = str(event_id)[:52]
keyboard = {"inline_keyboard": [
[{"text": "🛑 忽略此事件",
"callback_data": f"momo:event_ignore:{event_id}"}],
"callback_data": f"momo:eig:{_eid}"}],
]}
return "\n".join(lines), keyboard
@@ -467,3 +474,140 @@ def report(title: str, report_type: str, period: str, content_md: str) -> str:
def _send_telegram(msg: str, chat_ids: Optional[list] = None,
reply_markup: Optional[Dict[str, Any]] = None) -> bool:
return _send_telegram_raw(msg, chat_ids=chat_ids, reply_markup=reply_markup)
# ══════════════════════════════════════════════════════════════════════════════
# 決策回執模板L2 / L3 按鈕點擊後編輯原訊息使用)
# ADR-012 Phase 4審計留痕 — 操作者、時間、action、結果
# ══════════════════════════════════════════════════════════════════════════════
# Asia/TaipeiUTC+8— 容器可能跑 UTC這裡手動偏移避免依賴 tzdata
_TAIPEI_UTC_OFFSET_HOURS = 8
def _now_taipei_hhmm() -> str:
"""回傳 Asia/Taipei 的 HH:MM容器 TZ 不可靠,手動加 8h"""
try:
from datetime import timedelta, timezone
tz = timezone(timedelta(hours=_TAIPEI_UTC_OFFSET_HOURS))
return datetime.now(tz).strftime("%H:%M")
except Exception:
return datetime.now().strftime("%H:%M")
def _html_escape(s: Any) -> str:
"""最小 HTML 轉義Telegram HTML parse_mode 允許 <b><i><code><pre><a>
故只轉 < > & 避免使用者輸入破版)。"""
text = "" if s is None else str(s)
return (text.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;"))
def decision_result(original_text: str, action: str, operator: str,
note: Optional[str] = None, **extras: Any) -> str:
"""L2 單品定價決策回執approve / reject 後編輯原訊息)。
Args:
original_text: 原 Telegram 訊息文字query.message.text
action: "approve" / "reject"
operator: 操作者顯示名稱user.full_name 或 id_<tg_id>
note: 選填補充說明(例:訓練保守策略)
**extras: 前向相容預留(目前忽略)
Returns:
原訊息 + 分隔線 + 稽核區塊HTML parse_mode 安全)
"""
act = (action or "").lower()
if act == "approve":
icon, label = "", "已執行"
elif act == "reject":
icon, label = "", "已拒絕"
else:
icon, label = "", f"已處理({_html_escape(action)}"
ts = _now_taipei_hhmm()
op_esc = _html_escape(operator or "unknown")
base = original_text or ""
lines = [
base,
"",
"━━━━━━━━━━━━━━━━━━━━",
f"{icon} <b>{label}</b> by <b>{op_esc}</b> at {ts}",
]
if note:
lines.append(f"📝 {_html_escape(note)}")
return "\n".join(lines)
def ops_action_result(original_text: str, action: str, operator: str,
result: Optional[Dict[str, Any]] = None,
**extras: Any) -> str:
"""L3 運維決策回執pause1h / pause6h / retry / resume 執行後編輯原訊息)。
Args:
original_text: 原 Telegram 訊息文字
action: "pause1h" / "pause6h" / "retry" / "resume" / 其他
operator: 操作者顯示名稱
result: OPS_ACTIONS 函數回傳 dict常見欄位
status: "ok" / "success" / "error" / "skipped"
error: 錯誤訊息(若 status=error
message / detail: 成功訊息
task_name, duration_min 等
**extras: 前向相容預留
Returns:
原訊息 + 分隔線 + 運維執行結果區塊
"""
action_labels = {
"pause1h": "暫停 1 小時",
"pause6h": "暫停 6 小時",
"retry": "立即重試",
"resume": "恢復執行",
"execute": "執行",
"skip": "略過",
"rollback": "回滾",
}
act_label = action_labels.get((action or "").lower(), _html_escape(action or "未知動作"))
result = result or {}
status = str(result.get("status", "")).lower()
if status in ("ok", "success", "done"):
status_icon, status_text = "", "成功"
elif status in ("error", "failed", "fail"):
status_icon, status_text = "", "失敗"
elif status == "skipped":
status_icon, status_text = "⏭️", "已略過"
else:
status_icon, status_text = "", status or "已提交"
ts = _now_taipei_hhmm()
op_esc = _html_escape(operator or "unknown")
# 擷取結果訊息err > message > detail
detail_msg: Optional[str] = None
for key in ("error", "message", "detail"):
if result.get(key):
detail_msg = str(result[key])[:300]
break
lines = [
original_text or "",
"",
"━━━━━━━━━━━━━━━━━━━━",
f"🛠️ <b>運維動作:</b>{act_label}",
f"👤 操作員:<b>{op_esc}</b> 🕐 {ts}",
f"{status_icon} <b>執行結果:</b>{_html_escape(status_text)}",
]
if detail_msg:
lines.append(f"📋 {_html_escape(detail_msg)}")
# 額外關鍵欄位task_name / duration_min 等)
task_name = result.get("task_name")
if task_name:
lines.append(f"📌 任務:<code>{_html_escape(task_name)}</code>")
duration_min = result.get("duration_min")
if duration_min:
lines.append(f"⏱️ 時長:{_html_escape(duration_min)} 分鐘")
return "\n".join(lines)