diff --git a/routes/openclaw_bot_routes.py b/routes/openclaw_bot_routes.py index 3f13f18..ed97a29 100644 --- a/routes/openclaw_bot_routes.py +++ b/routes/openclaw_bot_routes.py @@ -93,6 +93,25 @@ ALLOWED_USERS: set = ( if _allowed_users_raw.strip() else set() ) +# ── fail-closed 統一授權檢查 ─────────────────────────────────── +# 規則(任一滿足即通過,否則一律拒絕): +# 1. group/supergroup 且 chat_id == ALLOWED_GROUP +# 2. private 且 user_id ∈ ALLOWED_USERS(env 未設 → 空 set → 全拒) +# channel / 未知 chat_type / 缺欄位 → 拒絕 +# 修補 C3:callback handler 原本只擋 group/supergroup 不匹配,private 完全放行; +# message handler `if ALLOWED_USERS and ...` 空 set 時整段失效。 +def _is_authorized(chat_type: str, chat_id, user_id) -> bool: + try: + cid = int(chat_id) if chat_id is not None else None + uid = int(user_id) if user_id is not None else None + except (TypeError, ValueError): + return False + if chat_type in ('group', 'supergroup'): + return cid == ALLOWED_GROUP + if chat_type == 'private': + return uid is not None and uid in ALLOWED_USERS + return False + # ── 速率限制(每用戶每分鐘最多 30 次 AI 呼叫)────────────────── import time as _time_mod _rate_tracker: dict = {} # {user_id: [timestamp, ...]} @@ -5169,11 +5188,16 @@ def telegram_webhook(): data = cq.get('data', '') chat_id = cq['message']['chat']['id'] chat_type = cq['message']['chat'].get('type', '') + cq_from_id = (cq.get('from') or {}).get('id') sys_log.info(f'[OpenClawBot] CB: chat={chat_id} type={chat_type} data={data} allowed={ALLOWED_GROUP}') - if chat_type in ('group', 'supergroup') and chat_id != ALLOWED_GROUP: + # fail-closed:未授權一律安靜拒絕(關閉 loading,不回任何訊息避免偵察) + if not _is_authorized(chat_type, chat_id, cq_from_id): + sys_log.warning( + f'[OpenClawBot] CB rejected: chat={chat_id} type={chat_type} user={cq_from_id}' + ) answer_callback(cq_id) - return jsonify({'ok': True}) + return jsonify({'ok': False, 'error': 'forbidden'}), 403 answer_callback(cq_id) send_typing(chat_id) @@ -5226,20 +5250,21 @@ def telegram_webhook(): text_raw = (msg.get('text') or '').strip() msg_id = msg.get('message_id') + # fail-closed 統一授權檢查(覆蓋 group/supergroup/private/channel/unknown) + _uid = (msg.get('from') or {}).get('id') + if not _is_authorized(chat_type, chat_id, _uid): + sys_log.warning( + f'[OpenClawBot] MSG rejected: chat={chat_id} type={chat_type} user={_uid}' + ) + # 靜默拒絕:不回 Telegram 訊息(避免陌生人偵察 bot 存在與白名單機制) + return jsonify({'ok': False, 'error': 'forbidden'}), 403 + if chat_type in ('group', 'supergroup'): - if chat_id != ALLOWED_GROUP: - return jsonify({'ok': True}) # 移除 @mention(不強制要求,但如有則移除) question = text_raw.replace(BOT_USERNAME, '').strip() - elif chat_type == 'private': - # 私訊存取控制 — 只允許白名單用戶 - _uid = msg.get('from', {}).get('id', 0) - if ALLOWED_USERS and _uid not in ALLOWED_USERS: - send_message(chat_id, "⚠️ 此 Bot 僅限授權用戶使用,請聯絡管理員。", msg_id) - return jsonify({'ok': True}) - question = text_raw else: - return jsonify({'ok': True}) + # 已通過授權的 private chat + question = text_raw # ── 圖片訊息:Gemini Vision 商品辨識 ───────────────────── if not question and msg.get('photo'):