From 52c06f6861228d8e9bd78c35db851b5705eb04fa Mon Sep 17 00:00:00 2001 From: OoO Date: Sat, 2 May 2026 17:35:48 +0800 Subject: [PATCH] fix(ppt): admin guard for destructive /cache commands (critic Medium-3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Critic Medium-3:群組內任意成員可執行破壞性快取指令的問題。 新增機制: - ADMIN_USER_IDS:新增 OPENCLAW_ADMIN_USER_IDS 環境變數 逗號分隔的 user_id;未設時退回 ALLOWED_USERS(向後兼容) - _is_admin(user_id):fail-closed 判定函式 - _CURRENT_USER_ID_CTX:ContextVar 在 webhook 入口(msg + callback) set 當前 user_id,避免改 handle_cmd 30+ 處呼叫端簽名 權限模型: | 指令 | 權限 | 行為 | | /cache status | 已授權 | 任何已授權用戶可看 | | /cache cleanup [days] | 已授權 | 預設乾跑可預覽 | | /cache flush | admin | 拒絕非 admin | | /cache cleanup [days] confirm | admin | 拒絕非 admin | | /cache cleanup [days<1] confirm | - | 強制乾跑(防呆) | 非 admin 嘗試破壞性指令時,回傳清楚錯誤訊息引導設定環境變數。 admin 操作會額外寫 sys_log.warning 留軌跡(含 user_id)。 煙霧測試: - syntax OK - _is_admin(None) / _is_admin("abc") / _is_admin(unknown_id) 皆 False - ContextVar set/get 行為正確 剩餘 Medium 1/2 + Info 類後續再處理(非緊急)。 Co-Authored-By: Claude Opus 4.7 (1M context) --- routes/openclaw_bot_routes.py | 70 ++++++++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 6 deletions(-) diff --git a/routes/openclaw_bot_routes.py b/routes/openclaw_bot_routes.py index b68dbea..3e97e9f 100644 --- a/routes/openclaw_bot_routes.py +++ b/routes/openclaw_bot_routes.py @@ -167,6 +167,36 @@ _RATE_WINDOW_SEC = 60 _CALLBACK_SEND_LOCK = threading.Lock() _CMD_FROM_CALLBACK_CTX = ContextVar('openclaw_cmd_from_callback', default=False) +# critic Medium-3:當前請求的 user_id ContextVar(webhook 入口設定,handle_cmd 讀取) +# 用 ContextVar 而非改 handle_cmd 簽名 — 避免動到 30+ 處呼叫端。 +_CURRENT_USER_ID_CTX = ContextVar('openclaw_current_user_id', default=None) + +# 管理員白名單:可執行破壞性指令(/cache flush all / cleanup confirm 等) +# OPENCLAW_ADMIN_USER_IDS 未設 → 退回 ALLOWED_USERS(保持向後兼容) +# 建議部署時明確設定,避免群組內所有人都能動快取 +_admin_users_raw = os.getenv('OPENCLAW_ADMIN_USER_IDS', '') +ADMIN_USER_IDS: set = ( + {int(uid.strip()) for uid in _admin_users_raw.split(',') if uid.strip().isdigit()} + if _admin_users_raw.strip() else set() +) + + +def _is_admin(user_id) -> bool: + """判定 user_id 是否為管理員。 + 優先用 ADMIN_USER_IDS(明確設定);未設時退回 ALLOWED_USERS(兼容)。 + 回傳 True 才允許執行破壞性指令。 + """ + try: + uid = int(user_id) if user_id is not None else None + except (TypeError, ValueError): + return False + if uid is None: + return False + if ADMIN_USER_IDS: + return uid in ADMIN_USER_IDS + # 兼容:ADMIN 未設時,退回 ALLOWED_USERS(私訊白名單即視為 admin) + return bool(ALLOWED_USERS) and uid in ALLOWED_USERS + def _check_rate_limit(user_id: int) -> bool: """回傳 True = 允許,False = 超過速率限制""" now = _time_mod.time() @@ -5412,10 +5442,21 @@ def handle_cmd(cmd, arg, chat_id, reply_to): send_message(chat_id, fmt_strategy(strat, target), reply_to, kb) elif cmd in ('cache', '快取'): - # /cache flush [type] 清除指定 PPT 快取(或全部) - # /cache status 顯示目前快取數量 + # /cache status 任何已授權用戶可看 + # /cache flush admin only — 清除指定/全部 PPT 快取 + # /cache cleanup [days] [confirm] admin only(confirm 才實刪)— 清磁碟 sub = (arg or '').strip().lower() + # critic Medium-3:破壞性操作前先取當前 user_id 並驗 admin + _curr_uid = _CURRENT_USER_ID_CTX.get() + if sub.startswith('flush'): + # admin guard + if not _is_admin(_curr_uid): + send_message(chat_id, + "⛔ `/cache flush` 限管理員執行。\n" + "請聯繫系統管理員,或設定 `OPENCLAW_ADMIN_USER_IDS` 環境變數。", + reply_to, parse_mode='Markdown') + return parts = sub.split(maxsplit=1) target_type = parts[1].strip() if len(parts) > 1 else None try: @@ -5426,7 +5467,9 @@ def handle_cmd(cmd, arg, chat_id, reply_to): f"類型:{target_type or '全部'}\n" f"當前模板版本:{ver}\n" f"影響筆數:{affected}\n" - f"下次請求將以新模板重新生成。") + f"下次請求將以新模板重新生成。\n" + f"執行者:user_id={_curr_uid}") + sys_log.warning(f"[PPT cache flush] admin={_curr_uid} type={target_type or 'ALL'} affected={affected}") except Exception as e: msg = f"❌ 快取清除失敗:{e}" send_message(chat_id, msg, reply_to, parse_mode=None) @@ -5459,9 +5502,9 @@ def handle_cmd(cmd, arg, chat_id, reply_to): except Exception as e: send_message(chat_id, f"❌ 查詢快取失敗:{e}", reply_to, parse_mode=None) elif sub.startswith('cleanup'): - # /cache cleanup [days] 乾跑(預設安全)— 只統計不刪 - # /cache cleanup [days] confirm 真正執行刪除 - # 注意:days < 1 視為高風險,無條件強制乾跑(critic Medium-3 防呆) + # /cache cleanup [days] 乾跑(任何已授權用戶可預覽) + # /cache cleanup [days] confirm admin only — 真正執行刪除 + # 注意:days < 1 強制乾跑(critic Medium-3 防呆) parts = sub.split() confirm = 'confirm' in parts or 'real' in parts days = 7 @@ -5470,9 +5513,18 @@ def handle_cmd(cmd, arg, chat_id, reply_to): days = int(p) # 防呆:days < 1 強制乾跑 forced_dry = days < 1 + # admin guard:只有實刪需要 admin(乾跑任何人可看) + if confirm and not forced_dry and not _is_admin(_curr_uid): + send_message(chat_id, + "⛔ `/cache cleanup ... confirm` 限管理員執行。\n" + "可改用乾跑模式(不加 `confirm`)預覽影響範圍。", + reply_to, parse_mode='Markdown') + return dry = forced_dry or (not confirm) try: stat = cleanup_expired_ppt_cache(days_old=days, dry_run=dry) + if not dry: + sys_log.warning(f"[PPT cleanup] admin={_curr_uid} days={days} stat={stat}") tag = "(乾跑—未實刪)" if dry else "(已實刪)" if forced_dry: tag += " [days<1 強制乾跑]" @@ -5865,6 +5917,9 @@ def telegram_webhook(): answer_callback(cq_id) return jsonify({'ok': False, 'error': 'forbidden'}) + # critic Medium-3:把 user_id 放進 ContextVar 供 handle_cmd 內部 _is_admin 讀 + _user_token = _CURRENT_USER_ID_CTX.set(cq_from_id) + answer_callback(cq_id) send_typing(chat_id) @@ -6007,6 +6062,9 @@ def telegram_webhook(): # 靜默拒絕:不回 Telegram 訊息(避免陌生人偵察 bot 存在與白名單機制) return jsonify({'ok': False, 'error': 'forbidden'}) + # critic Medium-3:把 user_id 放進 ContextVar 供 handle_cmd 內部 _is_admin 讀 + _CURRENT_USER_ID_CTX.set(_uid) + if chat_type in ('group', 'supergroup'): # 移除 @mention(不強制要求,但如有則移除) question = text_raw.replace(BOT_USERNAME, '').strip()