From d276853e54fde7890becee343931fd6cfda44258 Mon Sep 17 00:00:00 2001 From: OoO Date: Tue, 28 Apr 2026 12:49:21 +0800 Subject: [PATCH] fix(post-3.5g): restore _is_authorized fail-closed for callback + message (CRIT-2 + HIGH-3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 從 4349db2~1 撈回 _is_authorized() 並重新套用到 callback 與 message handler。 問題: - CRIT-2 (callback fail-open):原本只擋 group/supergroup 不匹配, private chat 任何人都能觸發 callback 指令(按鈕 menu/await/cmd)。 - HIGH-3 (message short-circuit fail):`if ALLOWED_USERS and _uid not in ALLOWED_USERS` 在 OPENCLAW_ALLOWED_USERS 環境變數未設時 → ALLOWED_USERS 為空 set → `if False and ...` 整段不執行 → 所有 private 訊息都通過。 修法(fail-closed 三檢查): 1. 在頂部 import 區下方還原 `_is_authorized(chat_type, chat_id, user_id)`: - group/supergroup:chat_id 必須等於 ALLOWED_GROUP - private:user_id 必須在 ALLOWED_USERS(空 set → 全拒) - channel / 未知 / 缺欄位 → 拒絕 2. callback handler 替換為 `if not _is_authorized(chat_type, chat_id, cq_from_id)` 並從 cq.get('from') 取 user_id(之前完全沒取)。 3. message handler 替換為統一檢查,未授權回 403 + 靜默(不回 Telegram 避免偵察)。 驗證: - AST parse OK - 模擬測試:999999 私訊 → False;111(在白名單)私訊 → True; 錯誤群組 → False;channel → False;None → False - grep 結果:剩下兩處 `_is_authorized` 呼叫(callback 5195, message 5255), 舊的 `ALLOWED_USERS and _uid not in ALLOWED_USERS` 已移除(只留註解描述歷史)。 Critic findings: CRIT-2 + HIGH-3 Co-Authored-By: Claude Opus 4.7 (1M context) --- routes/openclaw_bot_routes.py | 49 ++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/routes/openclaw_bot_routes.py b/routes/openclaw_bot_routes.py index 3f13f18..ed97a29 100644 --- a/routes/openclaw_bot_routes.py +++ b/routes/openclaw_bot_routes.py @@ -93,6 +93,25 @@ ALLOWED_USERS: set = ( if _allowed_users_raw.strip() else set() ) +# ── fail-closed 統一授權檢查 ─────────────────────────────────── +# 規則(任一滿足即通過,否則一律拒絕): +# 1. group/supergroup 且 chat_id == ALLOWED_GROUP +# 2. private 且 user_id ∈ ALLOWED_USERS(env 未設 → 空 set → 全拒) +# channel / 未知 chat_type / 缺欄位 → 拒絕 +# 修補 C3:callback handler 原本只擋 group/supergroup 不匹配,private 完全放行; +# message handler `if ALLOWED_USERS and ...` 空 set 時整段失效。 +def _is_authorized(chat_type: str, chat_id, user_id) -> bool: + try: + cid = int(chat_id) if chat_id is not None else None + uid = int(user_id) if user_id is not None else None + except (TypeError, ValueError): + return False + if chat_type in ('group', 'supergroup'): + return cid == ALLOWED_GROUP + if chat_type == 'private': + return uid is not None and uid in ALLOWED_USERS + return False + # ── 速率限制(每用戶每分鐘最多 30 次 AI 呼叫)────────────────── import time as _time_mod _rate_tracker: dict = {} # {user_id: [timestamp, ...]} @@ -5169,11 +5188,16 @@ def telegram_webhook(): data = cq.get('data', '') chat_id = cq['message']['chat']['id'] chat_type = cq['message']['chat'].get('type', '') + cq_from_id = (cq.get('from') or {}).get('id') sys_log.info(f'[OpenClawBot] CB: chat={chat_id} type={chat_type} data={data} allowed={ALLOWED_GROUP}') - if chat_type in ('group', 'supergroup') and chat_id != ALLOWED_GROUP: + # fail-closed:未授權一律安靜拒絕(關閉 loading,不回任何訊息避免偵察) + if not _is_authorized(chat_type, chat_id, cq_from_id): + sys_log.warning( + f'[OpenClawBot] CB rejected: chat={chat_id} type={chat_type} user={cq_from_id}' + ) answer_callback(cq_id) - return jsonify({'ok': True}) + return jsonify({'ok': False, 'error': 'forbidden'}), 403 answer_callback(cq_id) send_typing(chat_id) @@ -5226,20 +5250,21 @@ def telegram_webhook(): text_raw = (msg.get('text') or '').strip() msg_id = msg.get('message_id') + # fail-closed 統一授權檢查(覆蓋 group/supergroup/private/channel/unknown) + _uid = (msg.get('from') or {}).get('id') + if not _is_authorized(chat_type, chat_id, _uid): + sys_log.warning( + f'[OpenClawBot] MSG rejected: chat={chat_id} type={chat_type} user={_uid}' + ) + # 靜默拒絕:不回 Telegram 訊息(避免陌生人偵察 bot 存在與白名單機制) + return jsonify({'ok': False, 'error': 'forbidden'}), 403 + if chat_type in ('group', 'supergroup'): - if chat_id != ALLOWED_GROUP: - return jsonify({'ok': True}) # 移除 @mention(不強制要求,但如有則移除) question = text_raw.replace(BOT_USERNAME, '').strip() - elif chat_type == 'private': - # 私訊存取控制 — 只允許白名單用戶 - _uid = msg.get('from', {}).get('id', 0) - if ALLOWED_USERS and _uid not in ALLOWED_USERS: - send_message(chat_id, "⚠️ 此 Bot 僅限授權用戶使用,請聯絡管理員。", msg_id) - return jsonify({'ok': True}) - question = text_raw else: - return jsonify({'ok': True}) + # 已通過授權的 private chat + question = text_raw # ── 圖片訊息:Gemini Vision 商品辨識 ───────────────────── if not question and msg.get('photo'):