fix(ppt): admin guard for destructive /cache commands (critic Medium-3)
All checks were successful
CD Pipeline / deploy (push) Successful in 2m14s

Critic Medium-3:群組內任意成員可執行破壞性快取指令的問題。

新增機制:
- ADMIN_USER_IDS:新增 OPENCLAW_ADMIN_USER_IDS 環境變數
  逗號分隔的 user_id;未設時退回 ALLOWED_USERS(向後兼容)
- _is_admin(user_id):fail-closed 判定函式
- _CURRENT_USER_ID_CTX:ContextVar 在 webhook 入口(msg + callback)
  set 當前 user_id,避免改 handle_cmd 30+ 處呼叫端簽名

權限模型:
| 指令                              | 權限    | 行為                |
| /cache status                     | 已授權  | 任何已授權用戶可看  |
| /cache cleanup [days]             | 已授權  | 預設乾跑可預覽      |
| /cache flush <type>               | admin   | 拒絕非 admin        |
| /cache cleanup [days] confirm     | admin   | 拒絕非 admin        |
| /cache cleanup [days<1] confirm   | -       | 強制乾跑(防呆)    |

非 admin 嘗試破壞性指令時,回傳清楚錯誤訊息引導設定環境變數。
admin 操作會額外寫 sys_log.warning 留軌跡(含 user_id)。

煙霧測試:
- syntax OK
- _is_admin(None) / _is_admin("abc") / _is_admin(unknown_id) 皆 False
- ContextVar set/get 行為正確

剩餘 Medium 1/2 + Info 類後續再處理(非緊急)。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OoO
2026-05-02 17:35:48 +08:00
parent 3b0b4b3d42
commit 52c06f6861

View File

@@ -167,6 +167,36 @@ _RATE_WINDOW_SEC = 60
_CALLBACK_SEND_LOCK = threading.Lock()
_CMD_FROM_CALLBACK_CTX = ContextVar('openclaw_cmd_from_callback', default=False)
# critic Medium-3當前請求的 user_id ContextVarwebhook 入口設定handle_cmd 讀取)
# 用 ContextVar 而非改 handle_cmd 簽名 — 避免動到 30+ 處呼叫端。
_CURRENT_USER_ID_CTX = ContextVar('openclaw_current_user_id', default=None)
# 管理員白名單:可執行破壞性指令(/cache flush all / cleanup confirm 等)
# OPENCLAW_ADMIN_USER_IDS 未設 → 退回 ALLOWED_USERS保持向後兼容
# 建議部署時明確設定,避免群組內所有人都能動快取
_admin_users_raw = os.getenv('OPENCLAW_ADMIN_USER_IDS', '')
ADMIN_USER_IDS: set = (
{int(uid.strip()) for uid in _admin_users_raw.split(',') if uid.strip().isdigit()}
if _admin_users_raw.strip() else set()
)
def _is_admin(user_id) -> bool:
"""判定 user_id 是否為管理員。
優先用 ADMIN_USER_IDS明確設定未設時退回 ALLOWED_USERS兼容
回傳 True 才允許執行破壞性指令。
"""
try:
uid = int(user_id) if user_id is not None else None
except (TypeError, ValueError):
return False
if uid is None:
return False
if ADMIN_USER_IDS:
return uid in ADMIN_USER_IDS
# 兼容ADMIN 未設時,退回 ALLOWED_USERS私訊白名單即視為 admin
return bool(ALLOWED_USERS) and uid in ALLOWED_USERS
def _check_rate_limit(user_id: int) -> bool:
"""回傳 True = 允許False = 超過速率限制"""
now = _time_mod.time()
@@ -5412,10 +5442,21 @@ def handle_cmd(cmd, arg, chat_id, reply_to):
send_message(chat_id, fmt_strategy(strat, target), reply_to, kb)
elif cmd in ('cache', '快取'):
# /cache flush [type] 清除指定 PPT 快取(或全部)
# /cache status 顯示目前快取數量
# /cache status 任何已授權用戶可看
# /cache flush <type> admin only — 清除指定/全部 PPT 快取
# /cache cleanup [days] [confirm] admin onlyconfirm 才實刪)— 清磁碟
sub = (arg or '').strip().lower()
# critic Medium-3破壞性操作前先取當前 user_id 並驗 admin
_curr_uid = _CURRENT_USER_ID_CTX.get()
if sub.startswith('flush'):
# admin guard
if not _is_admin(_curr_uid):
send_message(chat_id,
"⛔ `/cache flush` 限管理員執行。\n"
"請聯繫系統管理員,或設定 `OPENCLAW_ADMIN_USER_IDS` 環境變數。",
reply_to, parse_mode='Markdown')
return
parts = sub.split(maxsplit=1)
target_type = parts[1].strip() if len(parts) > 1 else None
try:
@@ -5426,7 +5467,9 @@ def handle_cmd(cmd, arg, chat_id, reply_to):
f"類型:{target_type or '全部'}\n"
f"當前模板版本:{ver}\n"
f"影響筆數:{affected}\n"
f"下次請求將以新模板重新生成。")
f"下次請求將以新模板重新生成。\n"
f"執行者user_id={_curr_uid}")
sys_log.warning(f"[PPT cache flush] admin={_curr_uid} type={target_type or 'ALL'} affected={affected}")
except Exception as e:
msg = f"❌ 快取清除失敗:{e}"
send_message(chat_id, msg, reply_to, parse_mode=None)
@@ -5459,9 +5502,9 @@ def handle_cmd(cmd, arg, chat_id, reply_to):
except Exception as e:
send_message(chat_id, f"❌ 查詢快取失敗:{e}", reply_to, parse_mode=None)
elif sub.startswith('cleanup'):
# /cache cleanup [days] 乾跑(預設安全)— 只統計不刪
# /cache cleanup [days] confirm 真正執行刪除
# 注意days < 1 視為高風險,無條件強制乾跑critic Medium-3 防呆)
# /cache cleanup [days] 乾跑(任何已授權用戶可預覽)
# /cache cleanup [days] confirm admin only — 真正執行刪除
# 注意days < 1 強制乾跑critic Medium-3 防呆)
parts = sub.split()
confirm = 'confirm' in parts or 'real' in parts
days = 7
@@ -5470,9 +5513,18 @@ def handle_cmd(cmd, arg, chat_id, reply_to):
days = int(p)
# 防呆days < 1 強制乾跑
forced_dry = days < 1
# admin guard只有實刪需要 admin乾跑任何人可看
if confirm and not forced_dry and not _is_admin(_curr_uid):
send_message(chat_id,
"⛔ `/cache cleanup ... confirm` 限管理員執行。\n"
"可改用乾跑模式(不加 `confirm`)預覽影響範圍。",
reply_to, parse_mode='Markdown')
return
dry = forced_dry or (not confirm)
try:
stat = cleanup_expired_ppt_cache(days_old=days, dry_run=dry)
if not dry:
sys_log.warning(f"[PPT cleanup] admin={_curr_uid} days={days} stat={stat}")
tag = "(乾跑—未實刪)" if dry else "(已實刪)"
if forced_dry:
tag += " [days<1 強制乾跑]"
@@ -5865,6 +5917,9 @@ def telegram_webhook():
answer_callback(cq_id)
return jsonify({'ok': False, 'error': 'forbidden'})
# critic Medium-3把 user_id 放進 ContextVar 供 handle_cmd 內部 _is_admin 讀
_user_token = _CURRENT_USER_ID_CTX.set(cq_from_id)
answer_callback(cq_id)
send_typing(chat_id)
@@ -6007,6 +6062,9 @@ def telegram_webhook():
# 靜默拒絕:不回 Telegram 訊息(避免陌生人偵察 bot 存在與白名單機制)
return jsonify({'ok': False, 'error': 'forbidden'})
# critic Medium-3把 user_id 放進 ContextVar 供 handle_cmd 內部 _is_admin 讀
_CURRENT_USER_ID_CTX.set(_uid)
if chat_type in ('group', 'supergroup'):
# 移除 @mention不強制要求但如有則移除
question = text_raw.replace(BOT_USERNAME, '').strip()