All checks were successful
CD Pipeline / deploy (push) Successful in 2m51s
ADR-019 Phase 4:新增 services/openclaw_session.py 管理 chat_id 級別的多輪 對話歷史與 carry-over slot。In-memory,30 分鐘 TTL,重啟清空(臨時對話 state 不該污染 ai_insights 永久記憶)。 openclaw_answer 簽章加 chat_id=None 可選參數: - 傳入時 agent 會看到該 chat 最近 5 輪對話歷史,注入 system prompt - Ollama / Gemini FC 兩條路徑都會在生成成功後 append_turn 寫回 session - system prompt 加決策規則:「若歷史顯示用戶剛被問參數,優先用該答案接續執行」 Caller 全部更新傳 chat_id: - routes/openclaw_bot_routes.py:5479 (handle_cmd 不認識指令 fallback) - routes/openclaw_bot_routes.py:5916 (webhook NL 路徑) - routes/openclaw_bot_routes.py:_agent_dispatch_cmd (Phase 3 hook) - services/telegram_bot_service.py:934 (polling NL fallback) 向下相容:chat_id=None 時行為與舊版完全相同(無 multi-turn 記憶)。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
134 lines
4.2 KiB
Python
134 lines
4.2 KiB
Python
"""ADR-019 Phase 4:OpenClaw 對話 state 管理(in-memory)。
|
||
|
||
提供 chat_id 級別的多輪對話歷史 + carry-over slot:
|
||
- history:最近 N 輪 (user, bot) pair,給 agent 注入 prompt 用
|
||
- slots:跨訊息 carry-over(如「剛問月份 → 用戶答 4 → 接續執行」)
|
||
|
||
設計取捨:
|
||
- in-memory(不寫 DB),TTL 30 分鐘清掃,重啟後清空 — 對話 state 為臨時資料,
|
||
不該污染 ai_insights;要長期記憶請走 store_conversation 既有路徑。
|
||
- 單進程:本檔不處理跨 worker 一致性。多 worker 部署時 session 可能落在不同
|
||
worker,但 OpenClaw bot 容器目前單 process(python-telegram-bot polling),
|
||
尚無此問題。若未來 webhook 要多 worker,再考慮搬 Redis。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from collections import deque
|
||
from datetime import datetime, timedelta
|
||
from threading import Lock
|
||
from typing import Any, Optional
|
||
|
||
|
||
_SESSION_TTL_MIN = 30
|
||
_HISTORY_MAX = 5
|
||
|
||
_sessions: dict[int, dict] = {} # chat_id → {history, slots, updated_at}
|
||
_lock = Lock()
|
||
|
||
|
||
def _cleanup_expired_locked(now: datetime) -> None:
|
||
"""假設 caller 已持鎖。移除超過 TTL 的 session。"""
|
||
cutoff = now - timedelta(minutes=_SESSION_TTL_MIN)
|
||
expired = [cid for cid, s in _sessions.items()
|
||
if s.get('updated_at', cutoff) < cutoff]
|
||
for cid in expired:
|
||
_sessions.pop(cid, None)
|
||
|
||
|
||
def _get_or_create_locked(chat_id: int, now: datetime) -> dict:
|
||
"""假設 caller 已持鎖。"""
|
||
if chat_id not in _sessions:
|
||
_sessions[chat_id] = {
|
||
'history': deque(maxlen=_HISTORY_MAX),
|
||
'slots': {},
|
||
'updated_at': now,
|
||
}
|
||
s = _sessions[chat_id]
|
||
s['updated_at'] = now
|
||
return s
|
||
|
||
|
||
def append_turn(chat_id: int, user_msg: str, bot_reply: str) -> None:
|
||
"""記錄一輪對話。content 會被截斷到 500 字以避免 prompt 爆量。"""
|
||
if not chat_id:
|
||
return
|
||
with _lock:
|
||
now = datetime.now()
|
||
s = _get_or_create_locked(chat_id, now)
|
||
s['history'].append({
|
||
'user': (user_msg or '')[:500],
|
||
'bot': (bot_reply or '')[:500],
|
||
'ts': now.isoformat(timespec='seconds'),
|
||
})
|
||
_cleanup_expired_locked(now)
|
||
|
||
|
||
def set_slot(chat_id: int, key: str, value: Any) -> None:
|
||
"""設定 carry-over slot。"""
|
||
if not chat_id:
|
||
return
|
||
with _lock:
|
||
now = datetime.now()
|
||
s = _get_or_create_locked(chat_id, now)
|
||
s['slots'][key] = value
|
||
|
||
|
||
def get_slot(chat_id: int, key: str, default: Any = None) -> Any:
|
||
if not chat_id:
|
||
return default
|
||
with _lock:
|
||
s = _sessions.get(chat_id)
|
||
if not s:
|
||
return default
|
||
return s['slots'].get(key, default)
|
||
|
||
|
||
def pop_slot(chat_id: int, key: str, default: Any = None) -> Any:
|
||
if not chat_id:
|
||
return default
|
||
with _lock:
|
||
s = _sessions.get(chat_id)
|
||
if not s:
|
||
return default
|
||
return s['slots'].pop(key, default)
|
||
|
||
|
||
def history_as_prompt(chat_id: int, max_chars: int = 800) -> str:
|
||
"""組合歷史對話成 system prompt 用的 context;無歷史回空字串。"""
|
||
if not chat_id:
|
||
return ""
|
||
with _lock:
|
||
s = _sessions.get(chat_id)
|
||
if not s or not s['history']:
|
||
return ""
|
||
lines = []
|
||
for turn in s['history']:
|
||
lines.append(f"[{turn['ts']}]\n用戶:{turn['user']}\n小O:{turn['bot']}")
|
||
text = "\n---\n".join(lines)
|
||
if len(text) > max_chars:
|
||
text = "...(較舊內容已截斷)\n" + text[-max_chars:]
|
||
return text
|
||
|
||
|
||
def session_stats() -> dict:
|
||
"""測試/觀察用:回傳目前 session 數量、各 chat 歷史長度。"""
|
||
with _lock:
|
||
return {
|
||
'active_sessions': len(_sessions),
|
||
'detail': {
|
||
cid: {'history_len': len(s['history']),
|
||
'slot_keys': list(s['slots'].keys()),
|
||
'updated_at': s['updated_at'].isoformat(timespec='seconds')}
|
||
for cid, s in _sessions.items()
|
||
}
|
||
}
|
||
|
||
|
||
def reset_session(chat_id: int) -> None:
|
||
"""測試/手動清除 session。"""
|
||
if not chat_id:
|
||
return
|
||
with _lock:
|
||
_sessions.pop(chat_id, None)
|