feat(telegram): ADR-019 Phase 4 - conversation memory + chat_id propagation
All checks were successful
CD Pipeline / deploy (push) Successful in 2m51s

ADR-019 Phase 4:新增 services/openclaw_session.py 管理 chat_id 級別的多輪
對話歷史與 carry-over slot。In-memory,30 分鐘 TTL,重啟清空(臨時對話 state
不該污染 ai_insights 永久記憶)。

openclaw_answer 簽章加 chat_id=None 可選參數:
- 傳入時 agent 會看到該 chat 最近 5 輪對話歷史,注入 system prompt
- Ollama / Gemini FC 兩條路徑都會在生成成功後 append_turn 寫回 session
- system prompt 加決策規則:「若歷史顯示用戶剛被問參數,優先用該答案接續執行」

Caller 全部更新傳 chat_id:
- routes/openclaw_bot_routes.py:5479 (handle_cmd 不認識指令 fallback)
- routes/openclaw_bot_routes.py:5916 (webhook NL 路徑)
- routes/openclaw_bot_routes.py:_agent_dispatch_cmd (Phase 3 hook)
- services/telegram_bot_service.py:934 (polling NL fallback)

向下相容:chat_id=None 時行為與舊版完全相同(無 multi-turn 記憶)。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OoO
2026-05-02 13:04:18 +08:00
parent 38f4033eb0
commit 14195b65fd
3 changed files with 159 additions and 9 deletions

View File

@@ -4303,13 +4303,20 @@ def _execute_tool(name: str, args: dict) -> dict:
return {"error": f"unknown tool: {name}"}
def openclaw_answer(question: str):
def openclaw_answer(question: str, chat_id: int = None):
"""
Function Calling 架構 — AI 自主決定查什麼、怎麼回答
不再靠 if/else 規則判斷意圖
ADR-019 Phase 4可選 chat_id 啟用對話 state。傳入後 agent 會看到該 chat 的
最近對話歷史,並把本輪 (question, answer) 寫回 session 供下輪使用。
chat_id 為 None 時行為與舊版完全相同(無 multi-turn 記憶)。
"""
from services import openclaw_session
now = datetime.now(TAIPEI_TZ)
today_str = now.strftime("%Y/%m/%d")
history_ctx = openclaw_session.history_as_prompt(chat_id) if chat_id else ""
# ── 功能說明直接導 help ───────────────────────────────────
if _is_help_question(question):
@@ -4363,13 +4370,16 @@ def openclaw_answer(question: str):
sys_prompt = (
f"你是 OpenClaw小O電商智能助理。今天{today}\n"
+ (f"【最近對話】\n{history_ctx}\n\n" if history_ctx else "")
+ (f"【業績資料】{db_ctx}\n" if db_ctx else "")
+ (f"【市場情報】{mcp_ctx[:400]}\n" if mcp_ctx else "")
+ "請用繁體中文直接回答不要開場白300字以內。"
)
resp = ollama_service.generate(question, system_prompt=sys_prompt, timeout=180)
if resp.success and resp.content:
if chat_id:
openclaw_session.append_turn(chat_id, question, resp.content)
if _LEARNING_ENABLED:
import threading as _thr
_thr.Thread(target=store_conversation,
@@ -4390,7 +4400,8 @@ def openclaw_answer(question: str):
sys_msg = (
f"你是 OpenClaw小O服務「小龍蝦」電商業務團隊的 AI 助理。\n"
f"今天是 {today_str}\n"
"你有四個工具可以使用:\n"
+ (f"\n【最近對話歷史】\n{history_ctx}\n" if history_ctx else "")
+ "你有四個工具可以使用:\n"
"1. query_sales — 查自家業績資料庫\n"
"2. get_market_intel — 取得外部市場情報(新聞/熱搜/PTT口碑/匯率/天氣/節慶)\n"
"3. get_knowledge — 查歷史分析知識庫\n"
@@ -4400,7 +4411,8 @@ def openclaw_answer(question: str):
"- 若 data_freshness_warning 顯示當月無資料,禁止編造『業績為零』,"
"改主動問用戶是否要改看上月(並附 latest_data_date\n"
"- 工具結果回來後用繁體中文自然回答,不要開場白,不要多餘客套話\n"
"- 同時呼叫多個工具時請平行送出"
"- 同時呼叫多個工具時請平行送出\n"
"- 若【最近對話歷史】顯示用戶剛被詢問某參數(如月份),優先用該答案接續執行"
)
# 第一輪:讓 Gemini 判斷需要呼叫哪些工具
@@ -4428,6 +4440,8 @@ def openclaw_answer(question: str):
if not tool_calls:
text = "".join(p.get("text", "") for p in parts if "text" in p).strip()
if text:
if chat_id:
openclaw_session.append_turn(chat_id, question, text)
if _LEARNING_ENABLED:
import threading as _thr
_thr.Thread(target=store_conversation,
@@ -4476,6 +4490,8 @@ def openclaw_answer(question: str):
if final:
sys_log.info(f"[FC] done tools={used_sources} reply={len(final)}chars")
if chat_id:
openclaw_session.append_turn(chat_id, question, final)
if _LEARNING_ENABLED:
import threading as _thr
_thr.Thread(target=store_conversation,
@@ -4568,7 +4584,7 @@ def _agent_dispatch_cmd(cmd, arg, chat_id, reply_to) -> bool:
nl_question = _CMD_TO_NL[cmd](arg)
try:
sys_log.info(f"[AgentDispatch] cmd:{cmd}:{arg or ''} → NL: {nl_question}")
txt, kb = openclaw_answer(nl_question)
txt, kb = openclaw_answer(nl_question, chat_id=chat_id)
send_message(chat_id, txt, reply_to, keyboard=kb)
return True
except Exception as e:
@@ -5460,7 +5476,7 @@ def handle_cmd(cmd, arg, chat_id, reply_to):
else:
# 不認識的指令 → 自然語言
txt, kb = openclaw_answer(cmd + (' ' + arg if arg else ''))
txt, kb = openclaw_answer(cmd + (' ' + arg if arg else ''), chat_id=chat_id)
send_message(chat_id, txt, reply_to, kb)
@@ -5897,7 +5913,7 @@ def telegram_webhook():
if question.startswith('/') or cmd in KNOWN:
handle_cmd(cmd, arg, chat_id, msg_id)
else:
txt, kb = openclaw_answer(question)
txt, kb = openclaw_answer(question, chat_id=chat_id)
send_message(chat_id, txt, msg_id, kb)
sys_log.info(f"[OpenClawBot] → replied chat={chat_id}")

View File

@@ -0,0 +1,133 @@
"""ADR-019 Phase 4OpenClaw 對話 state 管理in-memory
提供 chat_id 級別的多輪對話歷史 + carry-over slot
- history最近 N 輪 (user, bot) pair給 agent 注入 prompt 用
- slots跨訊息 carry-over如「剛問月份 → 用戶答 4 → 接續執行」)
設計取捨:
- in-memory不寫 DBTTL 30 分鐘清掃,重啟後清空 — 對話 state 為臨時資料,
不該污染 ai_insights要長期記憶請走 store_conversation 既有路徑。
- 單進程:本檔不處理跨 worker 一致性。多 worker 部署時 session 可能落在不同
worker但 OpenClaw bot 容器目前單 processpython-telegram-bot polling
尚無此問題。若未來 webhook 要多 worker再考慮搬 Redis。
"""
from __future__ import annotations
from collections import deque
from datetime import datetime, timedelta
from threading import Lock
from typing import Any, Optional
_SESSION_TTL_MIN = 30
_HISTORY_MAX = 5
_sessions: dict[int, dict] = {} # chat_id → {history, slots, updated_at}
_lock = Lock()
def _cleanup_expired_locked(now: datetime) -> None:
"""假設 caller 已持鎖。移除超過 TTL 的 session。"""
cutoff = now - timedelta(minutes=_SESSION_TTL_MIN)
expired = [cid for cid, s in _sessions.items()
if s.get('updated_at', cutoff) < cutoff]
for cid in expired:
_sessions.pop(cid, None)
def _get_or_create_locked(chat_id: int, now: datetime) -> dict:
"""假設 caller 已持鎖。"""
if chat_id not in _sessions:
_sessions[chat_id] = {
'history': deque(maxlen=_HISTORY_MAX),
'slots': {},
'updated_at': now,
}
s = _sessions[chat_id]
s['updated_at'] = now
return s
def append_turn(chat_id: int, user_msg: str, bot_reply: str) -> None:
"""記錄一輪對話。content 會被截斷到 500 字以避免 prompt 爆量。"""
if not chat_id:
return
with _lock:
now = datetime.now()
s = _get_or_create_locked(chat_id, now)
s['history'].append({
'user': (user_msg or '')[:500],
'bot': (bot_reply or '')[:500],
'ts': now.isoformat(timespec='seconds'),
})
_cleanup_expired_locked(now)
def set_slot(chat_id: int, key: str, value: Any) -> None:
"""設定 carry-over slot。"""
if not chat_id:
return
with _lock:
now = datetime.now()
s = _get_or_create_locked(chat_id, now)
s['slots'][key] = value
def get_slot(chat_id: int, key: str, default: Any = None) -> Any:
if not chat_id:
return default
with _lock:
s = _sessions.get(chat_id)
if not s:
return default
return s['slots'].get(key, default)
def pop_slot(chat_id: int, key: str, default: Any = None) -> Any:
if not chat_id:
return default
with _lock:
s = _sessions.get(chat_id)
if not s:
return default
return s['slots'].pop(key, default)
def history_as_prompt(chat_id: int, max_chars: int = 800) -> str:
"""組合歷史對話成 system prompt 用的 context無歷史回空字串。"""
if not chat_id:
return ""
with _lock:
s = _sessions.get(chat_id)
if not s or not s['history']:
return ""
lines = []
for turn in s['history']:
lines.append(f"[{turn['ts']}]\n用戶:{turn['user']}\n小O{turn['bot']}")
text = "\n---\n".join(lines)
if len(text) > max_chars:
text = "...(較舊內容已截斷)\n" + text[-max_chars:]
return text
def session_stats() -> dict:
"""測試/觀察用:回傳目前 session 數量、各 chat 歷史長度。"""
with _lock:
return {
'active_sessions': len(_sessions),
'detail': {
cid: {'history_len': len(s['history']),
'slot_keys': list(s['slots'].keys()),
'updated_at': s['updated_at'].isoformat(timespec='seconds')}
for cid, s in _sessions.items()
}
}
def reset_session(chat_id: int) -> None:
"""測試/手動清除 session。"""
if not chat_id:
return
with _lock:
_sessions.pop(chat_id, None)

View File

@@ -931,8 +931,9 @@ class TrendTelegramBot:
await update.message.reply_chat_action(action='typing')
try:
from routes.openclaw_bot_routes import openclaw_answer
txt, kb = openclaw_answer(text)
_chat_id = getattr(update.effective_chat, 'id', None)
txt, kb = openclaw_answer(text, chat_id=_chat_id)
reply_markup = None
if kb:
inline_kb = []