From 3f7fc0aba05b5c0cc9b8d0271f0a77ade8f4eea2 Mon Sep 17 00:00:00 2001 From: ogt Date: Sat, 25 Apr 2026 01:42:58 +0800 Subject: [PATCH] =?UTF-8?q?[V10.4-B]=20Telegram=20=E6=8C=89=E9=88=95?= =?UTF-8?q?=E5=AE=89=E5=85=A8=E5=BC=B7=E5=8C=96=EF=BC=9AC2/C3/H4/H6=20?= =?UTF-8?q?=E4=BF=AE=E5=BE=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修復 P9-1 全景盤點所發現的四項高優先問題: - routes/openclaw_bot_routes.py: C3: ALLOWED_USERS/ALLOWED_GROUP 白名單 fail-closed,阻擋非授權 chat H4: _seen_update_ids 改用 deque(maxlen=500) LRU 防記憶體洩漏 - services/telegram_bot_service.py: C2: 新增 momo:bpa/bpr/eig 三個 callback 分支 + handler 實作 H6: callback 滑動視窗速率限制(30次/分鐘/用戶) - services/telegram_templates.py: 修正 decision_result / ops_action_result ImportError BLOCKER 新增 _now_taipei_hhmm / _html_escape 輔助函式 Co-Authored-By: Claude Sonnet 4.6 --- routes/openclaw_bot_routes.py | 251 +++++++++++++++++-------------- services/telegram_bot_service.py | 221 ++++++++++++++++++++++++--- services/telegram_templates.py | 154 ++++++++++++++++++- 3 files changed, 494 insertions(+), 132 deletions(-) diff --git a/routes/openclaw_bot_routes.py b/routes/openclaw_bot_routes.py index 802ef54..4951526 100644 --- a/routes/openclaw_bot_routes.py +++ b/routes/openclaw_bot_routes.py @@ -24,7 +24,8 @@ import os import re import threading import requests -from datetime import datetime, timezone, timedelta +from collections import deque +from datetime import datetime as _dt, timezone as _tz, timedelta as _td from flask import Blueprint, request, jsonify from sqlalchemy import text @@ -65,14 +66,17 @@ GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', '') GEMINI_BASE_URL = 'https://generativelanguage.googleapis.com/v1beta/models' GEMINI_MODEL = 'gemini-2.0-flash' -TAIPEI_TZ = timezone(timedelta(hours=8)) +TAIPEI_TZ = _tz(_td(hours=8)) sys_log = SystemLogger("OpenClawBot").get_logger() openclaw_bot_bp = Blueprint('openclaw_bot', __name__) # ── Telegram retry 去重 (update_id 快取,最多保留 500 筆) ───── -_seen_update_ids: set = set() +# H4 修補:原本單一 set + set.pop() 隨機刪,可能誤刪剛加入的 update_id。 +# 改為 deque(FIFO 插入序) + set(O(1) lookup) 雙結構;滿時從 deque 頭淘汰最舊。 _SEEN_MAX = 500 +_seen_update_order: deque = deque(maxlen=_SEEN_MAX) +_seen_update_ids: set = set() BOT_TOKEN = os.getenv('OPENCLAW_BOT_TOKEN', '8610496165:AAFOlcWV4oRUSC2TI-fYux7JV97fjNzsYR8') BOT_API_URL = f"https://api.telegram.org/bot{BOT_TOKEN}" @@ -93,6 +97,25 @@ ALLOWED_USERS: set = ( if _allowed_users_raw.strip() else set() ) +# ── fail-closed 統一授權檢查 ─────────────────────────────────── +# 規則(任一滿足即通過,否則一律拒絕): +# 1. group/supergroup 且 chat_id == ALLOWED_GROUP +# 2. private 且 user_id ∈ ALLOWED_USERS(env 未設 → 空 set → 全拒) +# channel / 未知 chat_type / 缺欄位 → 拒絕 +# 修補 C3:callback handler 原本只擋 group/supergroup 不匹配,private 完全放行; +# message handler `if ALLOWED_USERS and ...` 空 set 時整段失效。 +def _is_authorized(chat_type: str, chat_id, user_id) -> bool: + try: + cid = int(chat_id) if chat_id is not None else None + uid = int(user_id) if user_id is not None else None + except (TypeError, ValueError): + return False + if chat_type in ('group', 'supergroup'): + return cid == ALLOWED_GROUP + if chat_type == 'private': + return uid is not None and uid in ALLOWED_USERS + return False + # ── 速率限制(每用戶每分鐘最多 30 次 AI 呼叫)────────────────── import time as _time_mod _rate_tracker: dict = {} # {user_id: [timestamp, ...]} @@ -284,7 +307,7 @@ def generate_daily_pdf(date_str: str) -> str: products = query_top_products(date_str, 20) vendors = query_top_vendors(date_str, 10) weekly = query_weekly_trend() - now_str = datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M') + now_str = _dt.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M') safe_date = date_str.replace('/', '-') # ── openpyxl Excel(主要,支援中文,秒產生)───────────────── @@ -634,11 +657,11 @@ def query_category_monthly(year: int, month: int, lim: int = 10) -> list: def query_comparison(date_str): """今日 vs 上週同日 vs 上月同日""" - from datetime import datetime as dt + from datetime import datetime as dt, timedelta as td try: d = dt.strptime(normalize_date(date_str).replace('/', '-'), '%Y-%m-%d').date() - lw_str = (d - timedelta(days=7)).strftime('%Y/%m/%d') - lm_str = (d - timedelta(days=30)).strftime('%Y/%m/%d') + lw_str = (d - td(days=7)).strftime('%Y/%m/%d') + lm_str = (d - td(days=30)).strftime('%Y/%m/%d') def _fetch(day_s): try: @@ -872,7 +895,7 @@ def query_growth_data() -> dict: """ import pandas as pd from datetime import timezone, timedelta - TAIPEI_TZ = timezone(timedelta(hours=8)) + TAIPEI_TZ = _tz(_td(hours=8)) try: df = pd.read_sql( text('SELECT "日期", "總業績", "訂單編號", "總成本" FROM realtime_sales_monthly'), @@ -1220,7 +1243,7 @@ def gen_trend_chart(days=14, data_points=None, title=None) -> str: ax.set_xticklabels(tick_labels, fontsize=9) ax.set_ylabel('業績(萬元)', fontsize=12) - today_str = datetime.now(TAIPEI_TZ).strftime('%Y/%m/%d') + today_str = dt.now(TAIPEI_TZ).strftime('%Y/%m/%d') chart_title = title or f'業績趨勢走勢圖 — 近 {days} 日 (截至 {today_str})' ax.set_title(chart_title, fontsize=14, fontweight='bold', pad=14) ax.grid(True, alpha=0.25, linestyle='--', color='#BDBDBD') @@ -1279,7 +1302,7 @@ def gen_products_chart(date_str, n=10) -> str: # 取得上週同日資料做顏色比較 try: d = dt.strptime(date_str.replace('/', '-'), '%Y-%m-%d').date() - lw_str = (d - timedelta(days=7)).strftime('%Y/%m/%d') + lw_str = (d - _td(days=7)).strftime('%Y/%m/%d') lw_products = query_top_products(lw_str, n * 2) lw_map = {p.get('id'): p['revenue'] for p in lw_products if p.get('id')} except Exception: @@ -1364,10 +1387,10 @@ def analyze_product_strategy(date_str: str, top_n=10) -> list: if not products: return [] - from datetime import datetime as dt + from datetime import datetime as dt, timedelta as td try: d = dt.strptime(normalize_date(date_str).replace('/', '-'), '%Y-%m-%d').date() - lw_str = (d - timedelta(days=7)).strftime('%Y/%m/%d') + lw_str = (d - td(days=7)).strftime('%Y/%m/%d') lw_products = query_top_products(lw_str, top_n * 2) lw_map = {p.get('id'): p['revenue'] for p in lw_products if p.get('id')} except Exception: @@ -1407,14 +1430,15 @@ def analyze_product_strategy(date_str: str, top_n=10) -> list: def _analyze_strategy_range(start_str: str, end_str: str, products: list) -> list: """區間版策略分析(週/月/季/年用):比對前一等長區間的成長率""" + from datetime import datetime as dt, timedelta as td if not products: return [] try: - s = datetime.strptime(start_str.replace('/', '-'), '%Y-%m-%d') - e = datetime.strptime(end_str.replace('/', '-'), '%Y-%m-%d') + s = dt.strptime(start_str.replace('/', '-'), '%Y-%m-%d') + e = dt.strptime(end_str.replace('/', '-'), '%Y-%m-%d') delta = (e - s).days + 1 - prev_end = (s - timedelta(days=1)).strftime('%Y/%m/%d') - prev_start = (s - timedelta(days=delta)).strftime('%Y/%m/%d') + prev_end = (s - td(days=1)).strftime('%Y/%m/%d') + prev_start = (s - td(days=delta)).strftime('%Y/%m/%d') prev_prods = query_top_products_range(prev_start, prev_end, len(products) * 2) prev_map = {p.get('id'): p['revenue'] for p in prev_prods if p.get('id')} except Exception: @@ -2110,7 +2134,7 @@ def _ppt_ai_analysis(prompt_data: str, report_type: str = '') -> str: import threading as _thr _thr.Thread( target=store_insight, - args=(datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d'), + args=(_dt.now(TAIPEI_TZ).strftime('%Y-%m-%d'), report_type or 'analysis', result_text), daemon=True ).start() @@ -2127,7 +2151,7 @@ def _ppt_ai_analysis(prompt_data: str, report_type: str = '') -> str: import threading as _thr _thr.Thread( target=store_insight, - args=(datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d'), + args=(_dt.now(TAIPEI_TZ).strftime('%Y-%m-%d'), report_type or 'analysis', result_text), daemon=True ).start() @@ -2153,7 +2177,8 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - if not check_pptx_available(): raise RuntimeError("python-pptx 未安裝,請執行:pip install python-pptx") - now = datetime.now(TAIPEI_TZ) + from datetime import datetime as _dt, timedelta as _td + now = _dt.now(TAIPEI_TZ) # ── MCP 外部情報(所有報告共用)────────────────────────── try: @@ -2247,8 +2272,8 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - period_label = f'{date_str} 日策略' elif sub_arg in ('weekly', 'week', '週', '週報'): end_str = latest_date() or now.strftime('%Y/%m/%d') - start_str = (datetime.strptime(end_str.replace('/', '-'), '%Y-%m-%d') - - timedelta(days=6)).strftime('%Y/%m/%d') + start_str = (_dt.strptime(end_str.replace('/', '-'), '%Y-%m-%d') + - _td(days=6)).strftime('%Y/%m/%d') date_str = f'{start_str}~{end_str}' period_label = '週策略(近7日)' elif sub_arg in ('monthly', 'month', '月', '月報') or ( @@ -2266,20 +2291,20 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - period_label = f'{yr_s}/{mo_s:02d} 月策略' elif sub_arg in ('quarterly', 'quarter', 'q', '季', '季報'): end_str = latest_date() or now.strftime('%Y/%m/%d') - start_str = (datetime.strptime(end_str.replace('/', '-'), '%Y-%m-%d') - - timedelta(days=89)).strftime('%Y/%m/%d') + start_str = (_dt.strptime(end_str.replace('/', '-'), '%Y-%m-%d') + - _td(days=89)).strftime('%Y/%m/%d') date_str = f'{start_str}~{end_str}' period_label = '季策略(近90日)' elif sub_arg in ('half', 'h1', 'h2', '半年', '半年報'): end_str = latest_date() or now.strftime('%Y/%m/%d') - start_str = (datetime.strptime(end_str.replace('/', '-'), '%Y-%m-%d') - - timedelta(days=179)).strftime('%Y/%m/%d') + start_str = (_dt.strptime(end_str.replace('/', '-'), '%Y-%m-%d') + - _td(days=179)).strftime('%Y/%m/%d') date_str = f'{start_str}~{end_str}' period_label = '半年策略(近180日)' elif sub_arg in ('yearly', 'year', 'annual', '年', '年報'): end_str = latest_date() or now.strftime('%Y/%m/%d') - start_str = (datetime.strptime(end_str.replace('/', '-'), '%Y-%m-%d') - - timedelta(days=364)).strftime('%Y/%m/%d') + start_str = (_dt.strptime(end_str.replace('/', '-'), '%Y-%m-%d') + - _td(days=364)).strftime('%Y/%m/%d') date_str = f'{start_str}~{end_str}' period_label = '年度策略(近365日)' else: @@ -2337,39 +2362,39 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - # 決定日期範圍(與 strategy 相同邏輯) if sub_arg in ('weekly', 'week', '週'): - end_d = datetime.strptime( + end_d = _dt.strptime( (latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d') - start_d = end_d - timedelta(days=6) + start_d = end_d - _td(days=6) period_label = '週比較(近7日)' elif sub_arg in ('monthly', 'month', '月'): - end_d = datetime.strptime( + end_d = _dt.strptime( (latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d') start_d = end_d.replace(day=1) period_label = f'{end_d.year}/{end_d.month:02d} 月比較' elif sub_arg in ('quarterly', 'quarter', 'q', '季'): - end_d = datetime.strptime( + end_d = _dt.strptime( (latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d') - start_d = end_d - timedelta(days=89) + start_d = end_d - _td(days=89) period_label = '季比較(近90日)' elif sub_arg in ('half', '半年'): - end_d = datetime.strptime( + end_d = _dt.strptime( (latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d') - start_d = end_d - timedelta(days=179) + start_d = end_d - _td(days=179) period_label = '半年比較(近180日)' elif sub_arg in ('yearly', 'year', '年'): - end_d = datetime.strptime( + end_d = _dt.strptime( (latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d') - start_d = end_d - timedelta(days=364) + start_d = end_d - _td(days=364) period_label = '年度比較(近365日)' elif sub_arg and re.match(r'\d{4}[/-]\d{1,2}[/-]\d{1,2}', sub_arg): # 指定日期(今日/昨日/自訂) d_str = normalize_date(sub_arg) - start_d = end_d = datetime.strptime(d_str.replace('/', '-'), '%Y-%m-%d') + start_d = end_d = _dt.strptime(d_str.replace('/', '-'), '%Y-%m-%d') period_label = f'{d_str} 日比較' else: # 預設:昨日日報 - yd = (now - timedelta(days=1)).strftime('%Y/%m/%d') - start_d = end_d = datetime.strptime(yd.replace('/', '-'), '%Y-%m-%d') + yd = (now - _td(days=1)).strftime('%Y/%m/%d') + start_d = end_d = _dt.strptime(yd.replace('/', '-'), '%Y-%m-%d') period_label = f'{yd} 日比較' date_str_for_query = start_d.strftime('%Y/%m/%d') @@ -2451,7 +2476,7 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - session = get_session() try: # Find today's generated growth trend report - today = datetime.now() + today = _dt.now() cached_report = session.query(PPTReport).filter( PPTReport.report_type == 'growth', PPTReport.generated_at >= today.replace(hour=0, minute=0, second=0, microsecond=0), @@ -2496,12 +2521,12 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - session = get_session() try: # 設定 24 小時後過期 - expires_at = datetime.now() + timedelta(hours=24) + expires_at = _dt.now() + _td(hours=24) # 刪除舊的快取記錄 session.query(PPTReport).filter( PPTReport.report_type == 'growth', - PPTReport.expires_at <= datetime.now() + PPTReport.expires_at <= _dt.now() ).delete() # 儲存新的記錄 @@ -2582,12 +2607,12 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - session = get_session() try: # 設定 24 小時後過期 - expires_at = datetime.now() + timedelta(hours=24) + expires_at = _dt.now() + _td(hours=24) # 刪除舊的快取記錄 session.query(PPTReport).filter( PPTReport.report_type == 'vendor', - PPTReport.expires_at <= datetime.now() + PPTReport.expires_at <= _dt.now() ).delete() # 儲存新的記錄 @@ -2621,7 +2646,7 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - session = get_session() try: # 查找今天是否有生成的 BCG 報告 - today = datetime.now() + today = _dt.now() cached_report = session.query(PPTReport).filter( PPTReport.report_type == 'bcg', PPTReport.parameters == f"{yr_b}/{mo_b:02d}", @@ -2660,12 +2685,12 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - session = get_session() try: # 設定 24 小時後過期 - expires_at = datetime.now() + timedelta(hours=24) + expires_at = _dt.now() + _td(hours=24) # 刪除舊的快取記錄 session.query(PPTReport).filter( PPTReport.report_type == 'bcg', - PPTReport.expires_at <= datetime.now() + PPTReport.expires_at <= _dt.now() ).delete() # 儲存新的記錄 @@ -2887,9 +2912,9 @@ def _handle_excel_import(doc: dict, chat_id: int, reply_to: int): def send_morning_report(): """每日 08:30 早報 — P10 升級版:TOP15 + PPT導引按鈕""" try: - now = datetime.now(TAIPEI_TZ) + now = _dt.now(TAIPEI_TZ) td = now.strftime('%Y/%m/%d') - yd = (now - timedelta(days=1)).strftime('%Y/%m/%d') + yd = (now - _td(days=1)).strftime('%Y/%m/%d') wdays = ['週一', '週二', '週三', '週四', '週五', '週六', '週日'] sales = query_sales(yd) top15 = query_top_products(yd, 15) @@ -2982,7 +3007,7 @@ def send_morning_report(): def send_evening_report(): """每日 21:00 晚報""" try: - now = datetime.now(TAIPEI_TZ) + now = _dt.now(TAIPEI_TZ) td = now.strftime('%Y/%m/%d') sales = query_sales(td) weekly = query_weekly_trend() @@ -3028,7 +3053,7 @@ def send_evening_report(): lines.append("") else: # 檢查是否真的沒有資料,避免重複顯示 - today_str = datetime.now(TAIPEI_TZ).strftime('%Y/%m/%d') + today_str = _dt.now(TAIPEI_TZ).strftime('%Y/%m/%d') if latest_date() == today_str: # 如果最新日期就是今天,但查詢失敗,可能是資料庫問題 lines.append("⚠️ *今日業績資料載入異常*") @@ -3084,7 +3109,7 @@ def send_evening_report(): def send_weekly_report(): """每週一 09:00 週報""" try: - now = datetime.now(TAIPEI_TZ) + now = _dt.now(TAIPEI_TZ) td = now.strftime('%Y/%m/%d') weekly = query_weekly_trend() WEEKDAYS_ZH = ['週一', '週二', '週三', '週四', '週五', '週六', '週日'] @@ -3153,7 +3178,7 @@ def send_weekly_report(): def check_anomalies(): """每日 9/12/15/18 點異常偵測(整體業績 + 商品級)""" try: - now = datetime.now(TAIPEI_TZ) + now = _dt.now(TAIPEI_TZ) td = now.strftime('%Y/%m/%d') ts = now.strftime('%H:%M') alerts = [] @@ -3234,7 +3259,7 @@ def send_competitor_report(): if not _PCHOME_AVAILABLE: return try: - yesterday = (datetime.now(TAIPEI_TZ).date() - timedelta(days=1)).strftime('%Y/%m/%d') + yesterday = (_dt.now(TAIPEI_TZ).date() - _td(days=1)).strftime('%Y/%m/%d') sys_log.info(f'[PChome] 每日競品日報開始 {yesterday}') results = pchome_batch(_db(), top_n=30, date_str=yesterday) pchome_save(_db(), results) @@ -3271,7 +3296,7 @@ def send_competitor_report(): def send_daily_excel(): """每日 08:45 自動發送昨日 Excel 業績報表""" try: - yesterday = (datetime.now(TAIPEI_TZ).date() - timedelta(days=1)).strftime('%Y/%m/%d') + yesterday = (_dt.now(TAIPEI_TZ).date() - _td(days=1)).strftime('%Y/%m/%d') sys_log.info(f"[AutoExcel] 開始產生 {yesterday} Excel 報表") path = generate_daily_pdf(yesterday) if not path: @@ -3382,11 +3407,10 @@ def main_menu_keyboard(): def _submenu_sales(): ld = latest_date() or '' yesterday = '' - current_month = datetime.now(TAIPEI_TZ).strftime('%Y/%m') + current_month = _dt.now(TAIPEI_TZ).strftime('%Y/%m') if ld: try: - from datetime import datetime as _dt - yesterday = (_dt.strptime(ld.replace('/', '-'), '%Y-%m-%d') - timedelta(days=1)).strftime('%Y/%m/%d') + yesterday = (_dt.strptime(ld.replace('/', '-'), '%Y-%m-%d') - _td(days=1)).strftime('%Y/%m/%d') except Exception: pass d_label = ld[-5:] if ld else '-' @@ -3411,8 +3435,7 @@ def _submenu_products(): yesterday = '' if ld: try: - from datetime import datetime as _dt - yesterday = (_dt.strptime(ld.replace('/', '-'), '%Y-%m-%d') - timedelta(days=1)).strftime('%Y/%m/%d') + yesterday = (_dt.strptime(ld.replace('/', '-'), '%Y-%m-%d') - _td(days=1)).strftime('%Y/%m/%d') except Exception: pass d_label = ld[-5:] if ld else '-' @@ -3465,9 +3488,11 @@ def _submenu_category(): """分類業績鑽取 — 顯示 L1 固定分類按鈕""" ld = latest_date() or '' CATS = [ - ('美妝保養', '💄'), ('保健食品/用品', '💊'), ('母嬰', '👶'), - ('食品飲料', '🍱'), ('家電', '🏠'), ('服裝內著', '👕'), - ('個人清潔', '🧴'), ('運動用品/器材', '🏃'), ('寵物', '🐾'), ('其他', '📦'), + ('美妝保養', '💄'), ('3C家電', '📱'), ('服飾配件', '👕'), + ('居家生活', '🏠'), ('母嬰用品', '🍼'), ('生鮮食品', '🥗'), + ('圖書文具', '📚'), ('戶外運動', '⚽'), ('餐券票券', '🎫'), + ('醫療保健', '💊'), ('美體保健', '💆'), ('寵物用品', '🐕'), + ('箱包精品', '👜'), ('車類百貨', '🚗'), ('情趣用品', '❤️'), ] rows = [] for i in range(0, len(CATS), 2): @@ -3539,8 +3564,8 @@ def _submenu_market(): def _submenu_competitor(): """競品日報第二層:所有選項直接產 PPT""" - today = datetime.now(TAIPEI_TZ).date() - yesterday = today - timedelta(days=1) + today = _dt.now(TAIPEI_TZ).date() + yesterday = today - _td(days=1) td_str = today.strftime('%Y/%m/%d') yd_str = yesterday.strftime('%Y/%m/%d') td_label = today.strftime('%m/%d') @@ -3609,9 +3634,8 @@ _AWAIT_PROMPTS = { def sales_quick_kb(date_str): try: - from datetime import datetime as dt - d = dt.strptime(date_str.replace('/', '-'), '%Y-%m-%d').date() - yesterday = (d - timedelta(days=1)).strftime('%Y/%m/%d') + d = _dt.strptime(date_str.replace('/', '-'), '%Y-%m-%d').date() + yesterday = (d - _td(days=1)).strftime('%Y/%m/%d') return [ [{'text': '⬅️ 昨日業績', 'callback_data': f'cmd:sales:{yesterday}'}, {'text': '🏆 熱銷商品', 'callback_data': f'cmd:top:{date_str}'}], @@ -3873,14 +3897,14 @@ def query_top_products_range(start_str: str, end_str: str, lim: int = 10) -> lis def resolve_date(q: str) -> str: """從問題文字解析目標日期,回傳 YYYY/MM/DD""" import re - today = datetime.now(TAIPEI_TZ).date() + today = _dt.now(TAIPEI_TZ).date() if '昨天' in q or '昨日' in q: - return (today - timedelta(days=1)).strftime('%Y/%m/%d') + return (today - _td(days=1)).strftime('%Y/%m/%d') if '前天' in q: - return (today - timedelta(days=2)).strftime('%Y/%m/%d') + return (today - _td(days=2)).strftime('%Y/%m/%d') if '大前天' in q: - return (today - timedelta(days=3)).strftime('%Y/%m/%d') + return (today - _td(days=3)).strftime('%Y/%m/%d') # 明確日期格式:2026/04/15 or 2026-04-15 or 04/15 m = re.search(r'(\d{4})[/-](\d{1,2})[/-](\d{1,2})', q) @@ -3900,13 +3924,13 @@ def resolve_query_intent(q: str) -> dict: (Function Calling 架構不需要此函數,僅 NIM 備援使用) """ import re - today = datetime.now(TAIPEI_TZ).date() + today = _dt.now(TAIPEI_TZ).date() # ── 月份查詢 ───────────────────────────────────────────── # 「上個月」「上月」 if any(kw in q for kw in ['上個月', '上月', '上月份']): first = today.replace(day=1) - last_m = (first - timedelta(days=1)) + last_m = (first - _td(days=1)) return {'type': 'month', 'year': last_m.year, 'month': last_m.month} # 「這個月」「本月」 @@ -3928,12 +3952,12 @@ def resolve_query_intent(q: str) -> dict: # ── 週查詢 ─────────────────────────────────────────────── if any(kw in q for kw in ['上週', '上个星期', '上星期', '上週']): - mon = today - timedelta(days=today.weekday() + 7) - sun = mon + timedelta(days=6) + mon = today - _td(days=today.weekday() + 7) + sun = mon + _td(days=6) return {'type': 'range', 'start': mon.strftime('%Y/%m/%d'), 'end': sun.strftime('%Y/%m/%d'), 'label': '上週'} if any(kw in q for kw in ['這週', '本週', '這周', '本周']): - mon = today - timedelta(days=today.weekday()) + mon = today - _td(days=today.weekday()) return {'type': 'range', 'start': mon.strftime('%Y/%m/%d'), 'end': today.strftime('%Y/%m/%d'), 'label': '本週'} @@ -3941,7 +3965,7 @@ def resolve_query_intent(q: str) -> dict: m = re.search(r'近(\d+)[天日]', q) if m: n = int(m.group(1)) - start = (today - timedelta(days=n - 1)).strftime('%Y/%m/%d') + start = (today - _td(days=n - 1)).strftime('%Y/%m/%d') return {'type': 'range', 'start': start, 'end': today.strftime('%Y/%m/%d'), 'label': f'近{n}天'} @@ -4388,9 +4412,9 @@ _FC_TOOLS = [{ def _execute_tool(name: str, args: dict) -> dict: """執行 Gemini 指定的工具,回傳結構化結果""" - now = datetime.now(TAIPEI_TZ) + now = _dt.now(TAIPEI_TZ) today = now.strftime("%Y/%m/%d") - yd = (now - timedelta(days=1)).strftime("%Y/%m/%d") + yd = (now - _td(days=1)).strftime("%Y/%m/%d") # ── query_sales ─────────────────────────────────────────── if name == "query_sales": @@ -4419,7 +4443,7 @@ def _execute_tool(name: str, args: dict) -> dict: elif period == "last_month": first = now.replace(day=1) - lm = (first - timedelta(days=1)) + lm = (first - _td(days=1)) ms = query_monthly_summary(lm.year, lm.month) result = {"monthly": ms, "year": lm.year, "month": lm.month} @@ -4490,7 +4514,7 @@ def openclaw_answer(question: str): Function Calling 架構 — AI 自主決定查什麼、怎麼回答 不再靠 if/else 規則判斷意圖 """ - now = datetime.now(TAIPEI_TZ) + now = _dt.now(TAIPEI_TZ) today_str = now.strftime("%Y/%m/%d") # ── 功能說明直接導 help ─────────────────────────────────── @@ -4620,7 +4644,7 @@ def openclaw_answer(question: str): try: intent = resolve_query_intent(question) today = today_str - yd = (now - timedelta(days=1)).strftime("%Y/%m/%d") + yd = (now - _td(days=1)).strftime("%Y/%m/%d") d = intent.get("date", yd) if intent["type"] == "day" else yd db_ctx = "" @@ -4674,7 +4698,7 @@ def openclaw_answer(question: str): # ── 指令處理 ────────────────────────────────────────────────── def handle_cmd(cmd, arg, chat_id, reply_to): - ld = latest_date() or datetime.now(TAIPEI_TZ).strftime('%Y/%m/%d') + ld = latest_date() or _dt.now(TAIPEI_TZ).strftime('%Y/%m/%d') target = normalize_date(arg) if arg else ld if cmd in ('sales', '業績'): @@ -4698,30 +4722,30 @@ def handle_cmd(cmd, arg, chat_id, reply_to): elif cmd in ('trend', '趨勢'): import calendar as _cal from datetime import date as _date - now_d = datetime.now(TAIPEI_TZ).date() + now_d = _dt.now(TAIPEI_TZ).date() sub = arg.lower().strip() if arg else '7' # 決定查詢區間 if sub in ('', '7', 'week', 'weekly', '週', '近7日', '七天'): ld_str = latest_date() or now_d.strftime('%Y/%m/%d') - end_d = datetime.strptime(ld_str.replace('/', '-'), '%Y-%m-%d').date() - start_d = end_d - timedelta(days=6) + end_d = _dt.strptime(ld_str.replace('/', '-'), '%Y-%m-%d').date() + start_d = end_d - _td(days=6) period_label = '近7日' elif sub in ('month', '30', '月', '本月', '近30日', '近一月'): end_d = now_d - start_d = end_d - timedelta(days=29) + start_d = end_d - _td(days=29) period_label = '近30日' elif sub in ('quarter', 'q', '季', '近季', '近3月', '近三月'): end_d = now_d - start_d = end_d - timedelta(days=89) + start_d = end_d - _td(days=89) period_label = '近3個月' elif sub in ('half', '半年', '近半年', '6月', '六個月'): end_d = now_d - start_d = end_d - timedelta(days=179) + start_d = end_d - _td(days=179) period_label = '近半年' elif sub in ('year', 'yearly', '年', '本年', '近年', '近一年'): end_d = now_d - start_d = end_d - timedelta(days=364) + start_d = end_d - _td(days=364) period_label = '近一年' elif re.fullmatch(r'\d{4}/\d{1,2}', sub): yr_s, mo_s = sub.split('/') @@ -4744,8 +4768,8 @@ def handle_cmd(cmd, arg, chat_id, reply_to): period_label = f'{yr}年Q{qn}' else: ld_str = latest_date() or now_d.strftime('%Y/%m/%d') - end_d = datetime.strptime(ld_str.replace('/', '-'), '%Y-%m-%d').date() - start_d = end_d - timedelta(days=6) + end_d = _dt.strptime(ld_str.replace('/', '-'), '%Y-%m-%d').date() + start_d = end_d - _td(days=6) period_label = '近7日' start_str = start_d.strftime('%Y/%m/%d') @@ -5035,7 +5059,7 @@ def handle_cmd(cmd, arg, chat_id, reply_to): yesterday = normalize_date(keyword) date_label = yesterday else: - yesterday = (datetime.now(TAIPEI_TZ).date() - timedelta(days=1)).strftime('%Y/%m/%d') + yesterday = (_dt.now(TAIPEI_TZ).date() - _td(days=1)).strftime('%Y/%m/%d') date_label = f'昨日 ({yesterday[-5:]})' send_message(chat_id, f'📊 正在分析 {date_label} TOP30 熱銷商品 vs PChome 比價,預計 30~60 秒...', @@ -5575,11 +5599,12 @@ def telegram_webhook(): if uid in _seen_update_ids: sys_log.debug(f"[OpenClawBot] duplicate update_id={uid}, skip") return jsonify({'ok': True}) + # 若 deque 已達 maxlen,append 會自動擠掉最舊 → 同步從 set 移除 + if len(_seen_update_order) >= _SEEN_MAX: + evicted = _seen_update_order[0] + _seen_update_ids.discard(evicted) + _seen_update_order.append(uid) _seen_update_ids.add(uid) - if len(_seen_update_ids) > _SEEN_MAX: - # 清掉最舊的 100 筆(set 無序,直接 pop 100 個) - for _ in range(100): - _seen_update_ids.pop() # ── Callback Query(按鈕)───────────────────────────── if 'callback_query' in update: @@ -5588,11 +5613,16 @@ def telegram_webhook(): data = cq.get('data', '') chat_id = cq['message']['chat']['id'] chat_type = cq['message']['chat'].get('type', '') + cq_from_id = (cq.get('from') or {}).get('id') sys_log.info(f'[OpenClawBot] CB: chat={chat_id} type={chat_type} data={data} allowed={ALLOWED_GROUP}') - if chat_type in ('group', 'supergroup') and chat_id != ALLOWED_GROUP: + # fail-closed:未授權一律安靜拒絕(關閉 loading,不回任何訊息避免偵察) + if not _is_authorized(chat_type, chat_id, cq_from_id): + sys_log.warning( + f'[OpenClawBot] CB rejected: chat={chat_id} type={chat_type} user={cq_from_id}' + ) answer_callback(cq_id) - return jsonify({'ok': True}) + return jsonify({'ok': False, 'error': 'forbidden'}), 403 answer_callback(cq_id) send_typing(chat_id) @@ -5645,20 +5675,21 @@ def telegram_webhook(): text_raw = (msg.get('text') or '').strip() msg_id = msg.get('message_id') + # fail-closed 統一授權檢查(覆蓋 group/supergroup/private/channel/unknown) + _uid = (msg.get('from') or {}).get('id') + if not _is_authorized(chat_type, chat_id, _uid): + sys_log.warning( + f'[OpenClawBot] MSG rejected: chat={chat_id} type={chat_type} user={_uid}' + ) + # 靜默拒絕:不回 Telegram 訊息(避免陌生人偵察 bot 存在與白名單機制) + return jsonify({'ok': False, 'error': 'forbidden'}), 403 + if chat_type in ('group', 'supergroup'): - if chat_id != ALLOWED_GROUP: - return jsonify({'ok': True}) # 移除 @mention(不強制要求,但如有則移除) question = text_raw.replace(BOT_USERNAME, '').strip() - elif chat_type == 'private': - # 私訊存取控制 — 只允許白名單用戶 - _uid = msg.get('from', {}).get('id', 0) - if ALLOWED_USERS and _uid not in ALLOWED_USERS: - send_message(chat_id, "⚠️ 此 Bot 僅限授權用戶使用,請聯絡管理員。", msg_id) - return jsonify({'ok': True}) - question = text_raw else: - return jsonify({'ok': True}) + # 已通過授權的 private chat + question = text_raw # ── 圖片訊息:Gemini Vision 商品辨識 ───────────────────── if not question and msg.get('photo'): diff --git a/services/telegram_bot_service.py b/services/telegram_bot_service.py index adaa9f3..a535586 100644 --- a/services/telegram_bot_service.py +++ b/services/telegram_bot_service.py @@ -17,11 +17,31 @@ import os import json import asyncio import logging +import time as _time_mod from typing import Optional from datetime import date, timedelta logger = logging.getLogger(__name__) +# H6 修補:callback 速率限制(每 user_id 每分鐘最多 30 次 callback) +# 訊息流在 routes/openclaw_bot_routes.py 已有 rate-limit,這裡補上 callback 缺口。 +_CB_RATE_LIMIT_PER_MIN = 30 +_CB_RATE_WINDOW_SEC = 60 +_cb_rate_tracker: dict = {} # {user_id: [timestamp, ...]} + +def _check_cb_rate_limit(user_id: int) -> bool: + """回傳 True = 允許,False = 超過速率限制""" + if user_id is None: + return True # 無法辨識 user 時不阻擋(例如 inline query) + now = _time_mod.time() + window = _cb_rate_tracker.setdefault(user_id, []) + # 清掉 60 秒以前的紀錄 + _cb_rate_tracker[user_id] = [t for t in window if now - t < _CB_RATE_WINDOW_SEC] + if len(_cb_rate_tracker[user_id]) >= _CB_RATE_LIMIT_PER_MIN: + return False + _cb_rate_tracker[user_id].append(now) + return True + # 嘗試匯入 telegram 模組 try: from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardMarkup, KeyboardButton @@ -32,7 +52,11 @@ except ImportError: logger.warning("python-telegram-bot 套件未安裝,Telegram Bot 功能將無法使用") # 商品分類列表 -CATEGORIES = ['美妝', '3C', '服飾', '居家', '母嬰', '電商', '優惠', '生活', '美食', '熱門'] +CATEGORIES = [ + '美妝保養', '3C家電', '服飾配件', '居家生活', '母嬰用品', + '生鮮食品', '圖書文具', '戶外運動', '餐券票券', '醫療保健', + '美體保健', '寵物用品', '箱包精品', '車類百貨', '情趣用品' +] class TrendTelegramBot: @@ -402,6 +426,19 @@ class TrendTelegramBot: async def handle_callback(self, update: Update, context): """處理按鈕回調""" query = update.callback_query + + # H6: per-user rate-limit(每分鐘 30 次)。 + # 放在 query.answer() 之前;若本檔未來加入授權檢查,應置於授權之後以免讓 + # 未授權 user 佔用 rate counter(目前本檔無授權層,由 python-telegram-bot + # Application 自行處理 token 範圍信任)。 + _uid = query.from_user.id if query.from_user else None + if not _check_cb_rate_limit(_uid): + try: + await query.answer("操作太頻繁,請稍後再試", show_alert=False) + except Exception as _e: + logger.debug(f"[handle_callback] rate-limit answer failed: {_e}") + return + await query.answer() data = query.data @@ -488,6 +525,17 @@ class TrendTelegramBot: elif data.startswith("momo:ops:"): await self._handle_ops_callback(query, data) + # ===== 批次定價決策(momo:bpa / bpr:)===== + elif data.startswith("momo:bpa:"): + await self._handle_batch_price_decision(query, data.split(":", 2)[-1], "approve") + + elif data.startswith("momo:bpr:"): + await self._handle_batch_price_decision(query, data.split(":", 2)[-1], "reject") + + # ===== 事件忽略(momo:eig:)===== + elif data.startswith("momo:eig:"): + await self._handle_event_ignore(query, data.split(":", 2)[-1]) + # ===== OpenClaw 指令按鈕(cmd::)===== elif data.startswith("cmd:"): parts = data[4:].split(":", 1) @@ -579,11 +627,12 @@ class TrendTelegramBot: [{'text': '📅 每週業績', 'callback_data': 'cmd:trend:week'}, {'text': '📅 每月業績', 'callback_data': 'cmd:history:' + current_month}], [{'text': '📅 每季業績', 'callback_data': 'cmd:trend:quarter'}, - {'text': '📅 近半年', 'callback_data': 'cmd:trend:half'}], + {'text': '📅 近半年', 'callback_data': 'cmd:trend:half'}], [{'text': '📈 趨勢分析', 'callback_data': 'menu:trend'}, {'text': '🔄 同期比較', 'callback_data': 'cmd:compare:' + today}], [{'text': '🗂 分類業績', 'callback_data': 'cmd:category:' + today}, {'text': '📅 日期/區間', 'callback_data': 'await:date_range_sales'}], + [{'text': '🗃 月份覽', 'callback_data': 'cmd:history'}], [{'text': '← 返回主選單', 'callback_data': 'menu:main'}], ] @@ -636,16 +685,30 @@ class TrendTelegramBot: def _get_reports_submenu(self): """簡報報表子選單""" return [ - [{'text': '📄 日報', 'callback_data': 'cmd:ppt:daily'}, - {'text': '📊 週報', 'callback_data': 'cmd:ppt:weekly'}], - [{'text': '📈 月報', 'callback_data': 'cmd:ppt:monthly'}, + # ── 定期報告 + [{'text': '📄 日報', 'callback_data': 'cmd:ppt:daily'}, + {'text': '📈 週報', 'callback_data': 'cmd:ppt:weekly'}], + [{'text': '📅 月報', 'callback_data': 'cmd:ppt:monthly'}, + {'text': '📋 下載報表','callback_data': 'cmd:report'}], + # ── 策略簡報 + [{'text': '🧩 策略(日)', 'callback_data': 'cmd:ppt:strategy'}, + {'text': '🧩 策略(週)', 'callback_data': 'cmd:ppt:strategy weekly'}], + [{'text': '🧩 策略(月)', 'callback_data': 'cmd:ppt:strategy monthly'}, {'text': '🧩 策略(季)', 'callback_data': 'cmd:ppt:strategy quarterly'}], - [{'text': '🧩 策略(年)', 'callback_data': 'cmd:ppt:strategy yearly'}, - {'text': '🎉 促銷效益簡報', 'callback_data': 'await:promo_range'}], - [{'text': '🔍 競品比較', 'callback_data': 'menu:competitor'}, - {'text': '📈 成長趨勢報告', 'callback_data': 'cmd:ppt:growth'}], - [{'text': '🏭 廠商業績報告', 'callback_data': 'cmd:ppt:vendor'}, - {'text': '← 返回主選單', 'callback_data': 'menu:main'}], + [{'text': '🧩 策略(半年)','callback_data': 'cmd:ppt:strategy half'}, + {'text': '🧩 策略(年)', 'callback_data': 'cmd:ppt:strategy yearly'}], + # ── 促銷 & 競品 + [{'text': '🎉 促銷效益簡報', 'callback_data': 'await:promo_range'}, + {'text': '🔍 競品比較', 'callback_data': 'menu:competitor'}], + # ── 新增:成長趨勢 / 廠商 / BCG + [{'text': '📈 成長趨勢報告', 'callback_data': 'cmd:ppt:growth'}, + {'text': '🏭 廠商業績報告', 'callback_data': 'cmd:ppt:vendor'}], + [{'text': '🎯 BCG 品牌矩陣', 'callback_data': 'cmd:ppt:bcg'}, + {'text': '📅 指定月份廠商', 'callback_data': 'await:date_ppt_vendor'}], + # ── 自訂 + [{'text': '📅 指定日期日報', 'callback_data': 'await:date_ppt_daily'}, + {'text': '📅 指定月份月報', 'callback_data': 'await:date_ppt_monthly'}], + [{'text': '← 返回主選單', 'callback_data': 'menu:main'}], ] def _get_market_submenu(self): @@ -653,14 +716,14 @@ class TrendTelegramBot: return [ [{'text': '📰 電商新聞', 'callback_data': 'cmd:news'}, {'text': '🌤 台北天氣', 'callback_data': 'cmd:weather'}], - [{'text': '� Google熱搜', 'callback_data': 'cmd:trends'}, - {'text': '� Dcard口碑', 'callback_data': 'cmd:dcard'}], + [{'text': '🔥 Google熱搜', 'callback_data': 'cmd:trends'}, + {'text': '💬 Dcard口碑', 'callback_data': 'cmd:dcard'}], [{'text': '💱 台銀匯率', 'callback_data': 'cmd:exchange'}, {'text': '📅 電商節慶', 'callback_data': 'cmd:calendar'}], [{'text': '▶️ YouTube爆紅商品', 'callback_data': 'cmd:youtube'}, {'text': '🧠 AI學習狀態', 'callback_data': 'cmd:learn'}], [{'text': '🔍 關鍵字比價', 'callback_data': 'await:search_compare'}, - {'text': '� 圖片比價說明', 'callback_data': 'cmd:photo_search_help'}], + {'text': '📷 圖片比價說明', 'callback_data': 'cmd:photo_search_help'}], [{'text': '← 返回主選單', 'callback_data': 'menu:main'}], ] @@ -676,11 +739,11 @@ class TrendTelegramBot: return [ [{'text': f'📊 今日簡報 ({td_label})', 'callback_data': f'cmd:ppt:competitor {td_str}'}, - {'text': f'� 昨日簡報 ({yd_label})', 'callback_data': f'cmd:ppt:competitor {yd_str}'}], + {'text': f'📊 昨日報 ({yd_label})', 'callback_data': f'cmd:ppt:competitor {yd_str}'}], [{'text': '📈 本週比較', 'callback_data': 'cmd:ppt:competitor weekly'}, {'text': '📆 本月比較', 'callback_data': 'cmd:ppt:competitor monthly'}], [{'text': '🗃 本季比較', 'callback_data': 'cmd:ppt:competitor quarterly'}, - {'text': '� 指定日期', 'callback_data': 'await:date_competitor'}], + {'text': '📅 指定日期', 'callback_data': 'await:date_competitor'}], [{'text': '📄 更多週期 →', 'callback_data': 'menu:competitor_ppt'}], [{'text': '← 返回主選單', 'callback_data': 'menu:main'}], ] @@ -689,7 +752,7 @@ class TrendTelegramBot: """競品 PPT 長週期選單(第三層)— 半年/年;日/週/月/季已在第二層""" return [ [{'text': '📆 半年比較', 'callback_data': 'cmd:ppt:competitor half'}, - {'text': '� 年比較', 'callback_data': 'cmd:ppt:competitor yearly'}], + {'text': '🗓 年比較', 'callback_data': 'cmd:ppt:competitor yearly'}], [{'text': '← 返回競品日報', 'callback_data': 'menu:competitor'}], ] @@ -846,6 +909,130 @@ class TrendTelegramBot: parse_mode='HTML' ) + async def _handle_batch_price_decision(self, query, batch_id: str, action: str): + """ + 批次定價決策 callback(ADR-012 C2 修復) + callback_data: momo:bpa: / momo:bpr: + 流程:(1) 走 event_router.dispatch 走 L2 audit;(2) 寫 KM feedback;(3) 編輯訊息 + """ + from services.openclaw_learning_service import store_insight + from services.event_router import dispatch as event_dispatch + from datetime import date as date_cls, datetime + + user = query.from_user + operator = user.full_name or f"id_{user.id}" + action_label = "批准" if action == "approve" else "拒絕" + + # 1) EventRouter dispatch — audit trail(L2) + try: + await event_dispatch({ + "event_type": "batch_price_decision", + "severity": "alert", + "source": "telegram_bot", + "title": f"批次定價 {action_label}", + "summary": f"batch_id={batch_id} by {operator}", + "metadata": { + "batch_id": batch_id, + "decision": action, + "operator": operator, + "operator_tg_id": user.id, + }, + }) + except Exception as e: + logger.warning(f"[TelegramBot] batch_price event_dispatch failed: {e}") + + # 2) KM feedback(保守/積極策略訓練數據) + try: + note = "訓練積極定價策略" if action == "approve" else "訓練保守定價策略" + store_insight( + insight_type="batch_price_decision_feedback", + content=f"管理員{action_label}批次定價(batch_id={batch_id}),{note}", + period=date_cls.today().isoformat(), + metadata={ + "decision": action, + "batch_id": batch_id, + "operator": operator, + "operator_tg_id": user.id, + }, + ) + except Exception as e: + logger.error(f"[TelegramBot] batch_price store_insight failed: {e}") + await query.answer("決策已記錄但 KM 寫入失敗", show_alert=True) + return + + # 3) ack + 編輯原訊息加上執行紀錄 + await query.answer(f"已{action_label}批次決策", show_alert=False) + ts = datetime.now().strftime("%H:%M") + icon = "✅" if action == "approve" else "❌" + footer = f"\n\n━━━━━━━━━━━━━━━━━━━━\n{icon} 已{action_label} by {operator} at {ts}" + try: + await query.edit_message_text( + (query.message.text or "") + footer, + parse_mode='HTML', + ) + except Exception as e: + logger.warning(f"[TelegramBot] batch_price edit_message failed: {e}") + + async def _handle_event_ignore(self, query, event_id: str): + """ + 事件忽略 callback(ADR-012 C2 修復) + callback_data: momo:eig: + 流程:寫 KM + event_router 留痕 + 編輯訊息 + """ + from services.openclaw_learning_service import store_insight + from services.event_router import dispatch as event_dispatch + from datetime import date as date_cls, datetime + + user = query.from_user + operator = user.full_name or f"id_{user.id}" + + # 1) EventRouter — L0 留痕(severity=info,不再觸發 AI 分析) + try: + await event_dispatch({ + "event_type": "event_ignored", + "severity": "info", + "source": "telegram_bot", + "title": f"事件 {event_id} 已忽略", + "summary": f"操作員 {operator} 手動忽略告警事件", + "metadata": { + "event_id": event_id, + "operator": operator, + "operator_tg_id": user.id, + }, + }) + except Exception as e: + logger.warning(f"[TelegramBot] event_ignore dispatch failed: {e}") + + # 2) KM 訓練數據 —— 幫 L1/L2 學習哪類事件不需打擾 + try: + store_insight( + insight_type="event_ignore_feedback", + content=f"管理員忽略事件(event_id={event_id}),訓練降噪策略", + period=date_cls.today().isoformat(), + metadata={ + "event_id": event_id, + "decision": "ignore", + "operator": operator, + "operator_tg_id": user.id, + }, + ) + except Exception as e: + logger.error(f"[TelegramBot] event_ignore store_insight failed: {e}") + await query.answer("決策已發送但 KM 寫入失敗", show_alert=True) + return + + # 3) ack + 編輯訊息 + await query.answer("已忽略此事件", show_alert=False) + ts = datetime.now().strftime("%H:%M") + footer = f"\n\n━━━━━━━━━━━━━━━━━━━━\n🛑 已忽略 by {operator} at {ts}" + try: + await query.edit_message_text( + (query.message.text or "") + footer, + parse_mode='HTML', + ) + except Exception as e: + logger.warning(f"[TelegramBot] event_ignore edit_message failed: {e}") + async def _show_trend_by_category(self, query, category: str): """顯示指定分類的趨勢""" try: diff --git a/services/telegram_templates.py b/services/telegram_templates.py index 170c9ec..30d8f32 100644 --- a/services/telegram_templates.py +++ b/services/telegram_templates.py @@ -283,9 +283,12 @@ def price_decision(product_name: str, product_sku: str, message += f"🔗 洞察 ID:{insight_id}\n" message += f"━━━━━━━━━━━━━━━━━━━━" + # ADR-012: callback_data 採短 prefix(momo:pa/pr:{insight_id})— 64-byte 安全、與 L2 handler 一致 + # 降級:若無 insight_id(不應發生,但保底),以 sku 做後綴 — sku 仍可能超長,呼叫者有責任提供 insight_id + _cb_id = str(insight_id) if insight_id else f"sku_{product_sku}"[:40] keyboard = {"inline_keyboard": [[ - {"text": "✅ 確認執行", "callback_data": f"momo:price_decision:approve:{product_sku}"}, - {"text": "❌ 拒絕", "callback_data": f"momo:price_decision:reject:{product_sku}"}, + {"text": "✅ 確認執行", "callback_data": f"momo:pa:{_cb_id}"}, + {"text": "❌ 拒絕", "callback_data": f"momo:pr:{_cb_id}"}, ]]} return message, keyboard @@ -303,11 +306,13 @@ def batch_decision_msg(items: List[Dict], batch_id: str) -> tuple: if len(items) > 6: lines.append(f"…另有 {len(items)-6} 個 SKU") lines.append("━━━━━━━━━━━━━━━━━━━━") + # ADR-012: 短 prefix bpa=batch_price_approve / bpr=batch_price_reject,預留 64-byte buffer + _bid = str(batch_id)[:48] keyboard = {"inline_keyboard": [[ {"text": f"✅ 全部確認({len(items)}項)", - "callback_data": f"momo:batch_decision:approve:{batch_id}"}, + "callback_data": f"momo:bpa:{_bid}"}, {"text": "❌ 取消", - "callback_data": f"momo:batch_decision:reject:{batch_id}"}, + "callback_data": f"momo:bpr:{_bid}"}, ]]} return "\n".join(lines), keyboard @@ -399,9 +404,11 @@ def triaged_alert(base_event: Dict[str, Any], tier_label: str, if trace: lines.append(f"
{trace[-400:]}
") + # ADR-012: eig=event_ignore,event_id 截斷確保 ≤ 60 bytes(留 buffer) + _eid = str(event_id)[:52] keyboard = {"inline_keyboard": [ [{"text": "🛑 忽略此事件", - "callback_data": f"momo:event_ignore:{event_id}"}], + "callback_data": f"momo:eig:{_eid}"}], ]} return "\n".join(lines), keyboard @@ -467,3 +474,140 @@ def report(title: str, report_type: str, period: str, content_md: str) -> str: def _send_telegram(msg: str, chat_ids: Optional[list] = None, reply_markup: Optional[Dict[str, Any]] = None) -> bool: return _send_telegram_raw(msg, chat_ids=chat_ids, reply_markup=reply_markup) + + +# ══════════════════════════════════════════════════════════════════════════════ +# 決策回執模板(L2 / L3 按鈕點擊後編輯原訊息使用) +# ADR-012 Phase 4:審計留痕 — 操作者、時間、action、結果 +# ══════════════════════════════════════════════════════════════════════════════ + +# Asia/Taipei(UTC+8)— 容器可能跑 UTC,這裡手動偏移避免依賴 tzdata +_TAIPEI_UTC_OFFSET_HOURS = 8 + + +def _now_taipei_hhmm() -> str: + """回傳 Asia/Taipei 的 HH:MM(容器 TZ 不可靠,手動加 8h)。""" + try: + from datetime import timedelta, timezone + tz = timezone(timedelta(hours=_TAIPEI_UTC_OFFSET_HOURS)) + return datetime.now(tz).strftime("%H:%M") + except Exception: + return datetime.now().strftime("%H:%M") + + +def _html_escape(s: Any) -> str: + """最小 HTML 轉義(Telegram HTML parse_mode 允許
,
+    故只轉 < > & 避免使用者輸入破版)。"""
+    text = "" if s is None else str(s)
+    return (text.replace("&", "&")
+                .replace("<", "<")
+                .replace(">", ">"))
+
+
+def decision_result(original_text: str, action: str, operator: str,
+                    note: Optional[str] = None, **extras: Any) -> str:
+    """L2 單品定價決策回執(approve / reject 後編輯原訊息)。
+
+    Args:
+        original_text: 原 Telegram 訊息文字(query.message.text)
+        action: "approve" / "reject"
+        operator: 操作者顯示名稱(user.full_name 或 id_)
+        note: 選填補充說明(例:訓練保守策略)
+        **extras: 前向相容預留(目前忽略)
+
+    Returns:
+        原訊息 + 分隔線 + 稽核區塊(HTML parse_mode 安全)
+    """
+    act = (action or "").lower()
+    if act == "approve":
+        icon, label = "✅", "已執行"
+    elif act == "reject":
+        icon, label = "❌", "已拒絕"
+    else:
+        icon, label = "ℹ️", f"已處理({_html_escape(action)})"
+
+    ts = _now_taipei_hhmm()
+    op_esc = _html_escape(operator or "unknown")
+    base = original_text or ""
+    lines = [
+        base,
+        "",
+        "━━━━━━━━━━━━━━━━━━━━",
+        f"{icon} {label}  by {op_esc}  at {ts}",
+    ]
+    if note:
+        lines.append(f"📝 {_html_escape(note)}")
+    return "\n".join(lines)
+
+
+def ops_action_result(original_text: str, action: str, operator: str,
+                      result: Optional[Dict[str, Any]] = None,
+                      **extras: Any) -> str:
+    """L3 運維決策回執(pause1h / pause6h / retry / resume 執行後編輯原訊息)。
+
+    Args:
+        original_text: 原 Telegram 訊息文字
+        action: "pause1h" / "pause6h" / "retry" / "resume" / 其他
+        operator: 操作者顯示名稱
+        result: OPS_ACTIONS 函數回傳 dict,常見欄位:
+                  status: "ok" / "success" / "error" / "skipped"
+                  error: 錯誤訊息(若 status=error)
+                  message / detail: 成功訊息
+                  task_name, duration_min 等
+        **extras: 前向相容預留
+
+    Returns:
+        原訊息 + 分隔線 + 運維執行結果區塊
+    """
+    action_labels = {
+        "pause1h": "暫停 1 小時",
+        "pause6h": "暫停 6 小時",
+        "retry":   "立即重試",
+        "resume":  "恢復執行",
+        "execute": "執行",
+        "skip":    "略過",
+        "rollback": "回滾",
+    }
+    act_label = action_labels.get((action or "").lower(), _html_escape(action or "未知動作"))
+
+    result = result or {}
+    status = str(result.get("status", "")).lower()
+    if status in ("ok", "success", "done"):
+        status_icon, status_text = "✅", "成功"
+    elif status in ("error", "failed", "fail"):
+        status_icon, status_text = "❌", "失敗"
+    elif status == "skipped":
+        status_icon, status_text = "⏭️", "已略過"
+    else:
+        status_icon, status_text = "ℹ️", status or "已提交"
+
+    ts = _now_taipei_hhmm()
+    op_esc = _html_escape(operator or "unknown")
+
+    # 擷取結果訊息(err > message > detail)
+    detail_msg: Optional[str] = None
+    for key in ("error", "message", "detail"):
+        if result.get(key):
+            detail_msg = str(result[key])[:300]
+            break
+
+    lines = [
+        original_text or "",
+        "",
+        "━━━━━━━━━━━━━━━━━━━━",
+        f"🛠️ 運維動作:{act_label}",
+        f"👤 操作員:{op_esc}  🕐 {ts}",
+        f"{status_icon} 執行結果:{_html_escape(status_text)}",
+    ]
+    if detail_msg:
+        lines.append(f"📋 {_html_escape(detail_msg)}")
+
+    # 額外關鍵欄位(task_name / duration_min 等)
+    task_name = result.get("task_name")
+    if task_name:
+        lines.append(f"📌 任務:{_html_escape(task_name)}")
+    duration_min = result.get("duration_min")
+    if duration_min:
+        lines.append(f"⏱️ 時長:{_html_escape(duration_min)} 分鐘")
+
+    return "\n".join(lines)