Files
ewoooc/services/openclaw_session.py
OoO 14195b65fd
All checks were successful
CD Pipeline / deploy (push) Successful in 2m51s
feat(telegram): ADR-019 Phase 4 - conversation memory + chat_id propagation
ADR-019 Phase 4:新增 services/openclaw_session.py 管理 chat_id 級別的多輪
對話歷史與 carry-over slot。In-memory,30 分鐘 TTL,重啟清空(臨時對話 state
不該污染 ai_insights 永久記憶)。

openclaw_answer 簽章加 chat_id=None 可選參數:
- 傳入時 agent 會看到該 chat 最近 5 輪對話歷史,注入 system prompt
- Ollama / Gemini FC 兩條路徑都會在生成成功後 append_turn 寫回 session
- system prompt 加決策規則:「若歷史顯示用戶剛被問參數,優先用該答案接續執行」

Caller 全部更新傳 chat_id:
- routes/openclaw_bot_routes.py:5479 (handle_cmd 不認識指令 fallback)
- routes/openclaw_bot_routes.py:5916 (webhook NL 路徑)
- routes/openclaw_bot_routes.py:_agent_dispatch_cmd (Phase 3 hook)
- services/telegram_bot_service.py:934 (polling NL fallback)

向下相容:chat_id=None 時行為與舊版完全相同(無 multi-turn 記憶)。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 13:04:18 +08:00

134 lines
4.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""ADR-019 Phase 4OpenClaw 對話 state 管理in-memory
提供 chat_id 級別的多輪對話歷史 + carry-over slot
- history最近 N 輪 (user, bot) pair給 agent 注入 prompt 用
- slots跨訊息 carry-over如「剛問月份 → 用戶答 4 → 接續執行」)
設計取捨:
- in-memory不寫 DBTTL 30 分鐘清掃,重啟後清空 — 對話 state 為臨時資料,
不該污染 ai_insights要長期記憶請走 store_conversation 既有路徑。
- 單進程:本檔不處理跨 worker 一致性。多 worker 部署時 session 可能落在不同
worker但 OpenClaw bot 容器目前單 processpython-telegram-bot polling
尚無此問題。若未來 webhook 要多 worker再考慮搬 Redis。
"""
from __future__ import annotations
from collections import deque
from datetime import datetime, timedelta
from threading import Lock
from typing import Any, Optional
_SESSION_TTL_MIN = 30
_HISTORY_MAX = 5
_sessions: dict[int, dict] = {} # chat_id → {history, slots, updated_at}
_lock = Lock()
def _cleanup_expired_locked(now: datetime) -> None:
"""假設 caller 已持鎖。移除超過 TTL 的 session。"""
cutoff = now - timedelta(minutes=_SESSION_TTL_MIN)
expired = [cid for cid, s in _sessions.items()
if s.get('updated_at', cutoff) < cutoff]
for cid in expired:
_sessions.pop(cid, None)
def _get_or_create_locked(chat_id: int, now: datetime) -> dict:
"""假設 caller 已持鎖。"""
if chat_id not in _sessions:
_sessions[chat_id] = {
'history': deque(maxlen=_HISTORY_MAX),
'slots': {},
'updated_at': now,
}
s = _sessions[chat_id]
s['updated_at'] = now
return s
def append_turn(chat_id: int, user_msg: str, bot_reply: str) -> None:
"""記錄一輪對話。content 會被截斷到 500 字以避免 prompt 爆量。"""
if not chat_id:
return
with _lock:
now = datetime.now()
s = _get_or_create_locked(chat_id, now)
s['history'].append({
'user': (user_msg or '')[:500],
'bot': (bot_reply or '')[:500],
'ts': now.isoformat(timespec='seconds'),
})
_cleanup_expired_locked(now)
def set_slot(chat_id: int, key: str, value: Any) -> None:
"""設定 carry-over slot。"""
if not chat_id:
return
with _lock:
now = datetime.now()
s = _get_or_create_locked(chat_id, now)
s['slots'][key] = value
def get_slot(chat_id: int, key: str, default: Any = None) -> Any:
if not chat_id:
return default
with _lock:
s = _sessions.get(chat_id)
if not s:
return default
return s['slots'].get(key, default)
def pop_slot(chat_id: int, key: str, default: Any = None) -> Any:
if not chat_id:
return default
with _lock:
s = _sessions.get(chat_id)
if not s:
return default
return s['slots'].pop(key, default)
def history_as_prompt(chat_id: int, max_chars: int = 800) -> str:
"""組合歷史對話成 system prompt 用的 context無歷史回空字串。"""
if not chat_id:
return ""
with _lock:
s = _sessions.get(chat_id)
if not s or not s['history']:
return ""
lines = []
for turn in s['history']:
lines.append(f"[{turn['ts']}]\n用戶:{turn['user']}\n小O{turn['bot']}")
text = "\n---\n".join(lines)
if len(text) > max_chars:
text = "...(較舊內容已截斷)\n" + text[-max_chars:]
return text
def session_stats() -> dict:
"""測試/觀察用:回傳目前 session 數量、各 chat 歷史長度。"""
with _lock:
return {
'active_sessions': len(_sessions),
'detail': {
cid: {'history_len': len(s['history']),
'slot_keys': list(s['slots'].keys()),
'updated_at': s['updated_at'].isoformat(timespec='seconds')}
for cid, s in _sessions.items()
}
}
def reset_session(chat_id: int) -> None:
"""測試/手動清除 session。"""
if not chat_id:
return
with _lock:
_sessions.pop(chat_id, None)