"""ADR-019 Phase 4:OpenClaw 對話 state 管理(in-memory)。 提供 chat_id 級別的多輪對話歷史 + carry-over slot: - history:最近 N 輪 (user, bot) pair,給 agent 注入 prompt 用 - slots:跨訊息 carry-over(如「剛問月份 → 用戶答 4 → 接續執行」) 設計取捨: - in-memory(不寫 DB),TTL 30 分鐘清掃,重啟後清空 — 對話 state 為臨時資料, 不該污染 ai_insights;要長期記憶請走 store_conversation 既有路徑。 - 單進程:本檔不處理跨 worker 一致性。多 worker 部署時 session 可能落在不同 worker,但 OpenClaw bot 容器目前單 process(python-telegram-bot polling), 尚無此問題。若未來 webhook 要多 worker,再考慮搬 Redis。 """ from __future__ import annotations from collections import deque from datetime import datetime, timedelta from threading import Lock from typing import Any, Optional _SESSION_TTL_MIN = 30 _HISTORY_MAX = 5 _sessions: dict[int, dict] = {} # chat_id → {history, slots, updated_at} _lock = Lock() def _cleanup_expired_locked(now: datetime) -> None: """假設 caller 已持鎖。移除超過 TTL 的 session。""" cutoff = now - timedelta(minutes=_SESSION_TTL_MIN) expired = [cid for cid, s in _sessions.items() if s.get('updated_at', cutoff) < cutoff] for cid in expired: _sessions.pop(cid, None) def _get_or_create_locked(chat_id: int, now: datetime) -> dict: """假設 caller 已持鎖。""" if chat_id not in _sessions: _sessions[chat_id] = { 'history': deque(maxlen=_HISTORY_MAX), 'slots': {}, 'updated_at': now, } s = _sessions[chat_id] s['updated_at'] = now return s def append_turn(chat_id: int, user_msg: str, bot_reply: str) -> None: """記錄一輪對話。content 會被截斷到 500 字以避免 prompt 爆量。""" if not chat_id: return with _lock: now = datetime.now() s = _get_or_create_locked(chat_id, now) s['history'].append({ 'user': (user_msg or '')[:500], 'bot': (bot_reply or '')[:500], 'ts': now.isoformat(timespec='seconds'), }) _cleanup_expired_locked(now) def set_slot(chat_id: int, key: str, value: Any) -> None: """設定 carry-over slot。""" if not chat_id: return with _lock: now = datetime.now() s = _get_or_create_locked(chat_id, now) s['slots'][key] = value def get_slot(chat_id: int, key: str, default: Any = None) -> Any: if not chat_id: return default with _lock: s = _sessions.get(chat_id) if not s: return default return s['slots'].get(key, default) def pop_slot(chat_id: int, key: str, default: Any = None) -> Any: if not chat_id: return default with _lock: s = _sessions.get(chat_id) if not s: return default return s['slots'].pop(key, default) def history_as_prompt(chat_id: int, max_chars: int = 800) -> str: """組合歷史對話成 system prompt 用的 context;無歷史回空字串。""" if not chat_id: return "" with _lock: s = _sessions.get(chat_id) if not s or not s['history']: return "" lines = [] for turn in s['history']: lines.append(f"[{turn['ts']}]\n用戶:{turn['user']}\n小O:{turn['bot']}") text = "\n---\n".join(lines) if len(text) > max_chars: text = "...(較舊內容已截斷)\n" + text[-max_chars:] return text def session_stats() -> dict: """測試/觀察用:回傳目前 session 數量、各 chat 歷史長度。""" with _lock: return { 'active_sessions': len(_sessions), 'detail': { cid: {'history_len': len(s['history']), 'slot_keys': list(s['slots'].keys()), 'updated_at': s['updated_at'].isoformat(timespec='seconds')} for cid, s in _sessions.items() } } def reset_session(chat_id: int) -> None: """測試/手動清除 session。""" if not chat_id: return with _lock: _sessions.pop(chat_id, None)