fix(post-3.5g): restore _is_authorized fail-closed for callback + message (CRIT-2 + HIGH-3)

從 4349db2~1 撈回 _is_authorized() 並重新套用到 callback 與 message handler。

問題:
- CRIT-2 (callback fail-open):原本只擋 group/supergroup 不匹配,
  private chat 任何人都能觸發 callback 指令(按鈕 menu/await/cmd)。
- HIGH-3 (message short-circuit fail):`if ALLOWED_USERS and _uid not in ALLOWED_USERS`
  在 OPENCLAW_ALLOWED_USERS 環境變數未設時 → ALLOWED_USERS 為空 set →
  `if False and ...` 整段不執行 → 所有 private 訊息都通過。

修法(fail-closed 三檢查):
1. 在頂部 import 區下方還原 `_is_authorized(chat_type, chat_id, user_id)`:
   - group/supergroup:chat_id 必須等於 ALLOWED_GROUP
   - private:user_id 必須在 ALLOWED_USERS(空 set → 全拒)
   - channel / 未知 / 缺欄位 → 拒絕
2. callback handler 替換為 `if not _is_authorized(chat_type, chat_id, cq_from_id)`
   並從 cq.get('from') 取 user_id(之前完全沒取)。
3. message handler 替換為統一檢查,未授權回 403 + 靜默(不回 Telegram 避免偵察)。

驗證:
- AST parse OK
- 模擬測試:999999 私訊 → False;111(在白名單)私訊 → True;
  錯誤群組 → False;channel → False;None → False
- grep 結果:剩下兩處 `_is_authorized` 呼叫(callback 5195, message 5255),
  舊的 `ALLOWED_USERS and _uid not in ALLOWED_USERS` 已移除(只留註解描述歷史)。

Critic findings: CRIT-2 + HIGH-3
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OoO
2026-04-28 12:49:21 +08:00
parent b49b704e82
commit d276853e54

View File

@@ -93,6 +93,25 @@ ALLOWED_USERS: set = (
if _allowed_users_raw.strip() else set()
)
# ── fail-closed 統一授權檢查 ───────────────────────────────────
# 規則(任一滿足即通過,否則一律拒絕):
# 1. group/supergroup 且 chat_id == ALLOWED_GROUP
# 2. private 且 user_id ∈ ALLOWED_USERSenv 未設 → 空 set → 全拒)
# channel / 未知 chat_type / 缺欄位 → 拒絕
# 修補 C3callback handler 原本只擋 group/supergroup 不匹配private 完全放行;
# message handler `if ALLOWED_USERS and ...` 空 set 時整段失效。
def _is_authorized(chat_type: str, chat_id, user_id) -> bool:
try:
cid = int(chat_id) if chat_id is not None else None
uid = int(user_id) if user_id is not None else None
except (TypeError, ValueError):
return False
if chat_type in ('group', 'supergroup'):
return cid == ALLOWED_GROUP
if chat_type == 'private':
return uid is not None and uid in ALLOWED_USERS
return False
# ── 速率限制(每用戶每分鐘最多 30 次 AI 呼叫)──────────────────
import time as _time_mod
_rate_tracker: dict = {} # {user_id: [timestamp, ...]}
@@ -5169,11 +5188,16 @@ def telegram_webhook():
data = cq.get('data', '')
chat_id = cq['message']['chat']['id']
chat_type = cq['message']['chat'].get('type', '')
cq_from_id = (cq.get('from') or {}).get('id')
sys_log.info(f'[OpenClawBot] CB: chat={chat_id} type={chat_type} data={data} allowed={ALLOWED_GROUP}')
if chat_type in ('group', 'supergroup') and chat_id != ALLOWED_GROUP:
# fail-closed未授權一律安靜拒絕關閉 loading不回任何訊息避免偵察
if not _is_authorized(chat_type, chat_id, cq_from_id):
sys_log.warning(
f'[OpenClawBot] CB rejected: chat={chat_id} type={chat_type} user={cq_from_id}'
)
answer_callback(cq_id)
return jsonify({'ok': True})
return jsonify({'ok': False, 'error': 'forbidden'}), 403
answer_callback(cq_id)
send_typing(chat_id)
@@ -5226,20 +5250,21 @@ def telegram_webhook():
text_raw = (msg.get('text') or '').strip()
msg_id = msg.get('message_id')
# fail-closed 統一授權檢查(覆蓋 group/supergroup/private/channel/unknown
_uid = (msg.get('from') or {}).get('id')
if not _is_authorized(chat_type, chat_id, _uid):
sys_log.warning(
f'[OpenClawBot] MSG rejected: chat={chat_id} type={chat_type} user={_uid}'
)
# 靜默拒絕:不回 Telegram 訊息(避免陌生人偵察 bot 存在與白名單機制)
return jsonify({'ok': False, 'error': 'forbidden'}), 403
if chat_type in ('group', 'supergroup'):
if chat_id != ALLOWED_GROUP:
return jsonify({'ok': True})
# 移除 @mention不強制要求但如有則移除
question = text_raw.replace(BOT_USERNAME, '').strip()
elif chat_type == 'private':
# 私訊存取控制 — 只允許白名單用戶
_uid = msg.get('from', {}).get('id', 0)
if ALLOWED_USERS and _uid not in ALLOWED_USERS:
send_message(chat_id, "⚠️ 此 Bot 僅限授權用戶使用,請聯絡管理員。", msg_id)
return jsonify({'ok': True})
question = text_raw
else:
return jsonify({'ok': True})
# 已通過授權的 private chat
question = text_raw
# ── 圖片訊息Gemini Vision 商品辨識 ─────────────────────
if not question and msg.get('photo'):