refactor: unify event routing, orchestration, and agent context handling with consistent naming and closed-loop tracking

This commit is contained in:
ogt (aider)
2026-04-19 22:21:02 +08:00
parent 055eca1cd8
commit f5faf478bb
4 changed files with 38 additions and 38 deletions

View File

@@ -4,14 +4,14 @@ from sqlalchemy.orm import relationship
from database.models import Base
from datetime import datetime, timezone
# Helper for default timestamps
# helper for default timestamps
datetime_now = lambda: datetime.now(timezone.utc)
class AgentContext(Base):
"""
共享上下文表(替代硬編碼鏈),支援多 Agent 存取與 TTL
索引:(session_id, agent_name, context_key) 以加速跨 Agent 查詢。
Shared context table (replaces hardcoded chain), supporting multi-agent access and TTL.
Index: (session_id, agent_name, context_key) for fast cross-agent queries.
"""
__tablename__ = 'agent_context'
@@ -19,7 +19,7 @@ class AgentContext(Base):
session_id = Column(String(64), nullable=False, index=True)
agent_name = Column(String(50), nullable=False, index=True)
context_key = Column(String(100), nullable=False)
context_val = Column(Text) # JSON 字串
context_val = Column(Text) # JSON string
created_at = Column(DateTime, default=datetime_now)
ttl_minutes = Column(Integer, default=60)
@@ -31,7 +31,7 @@ class AgentContext(Base):
class ActionPlan(Base):
"""
行動計畫表NemoTron 輸出,等待審核與執行追蹤)。
Action plan table (NemoTron output, awaiting review/execution tracking).
"""
__tablename__ = 'action_plans'
@@ -39,7 +39,7 @@ class ActionPlan(Base):
session_id = Column(String(64), nullable=True)
plan_type = Column(String(50), nullable=True) # price_adjust / restock / campaign
sku = Column(String(100), nullable=True, index=True)
payload = Column(Text) # JSON 行動內容
payload = Column(Text) # JSON payload
status = Column(String(20), default='pending') # pending/approved/rejected/executed
created_by = Column(String(50)) # nemotron / openclaw
approved_by = Column(String(100), nullable=True) # Telegram user_id
@@ -54,7 +54,7 @@ class ActionPlan(Base):
class ActionOutcome(Base):
"""
行動結果追蹤(閉環學習核心)。
Action outcome tracking (closed-loop learning core).
"""
__tablename__ = 'action_outcomes'
@@ -72,8 +72,8 @@ class ActionOutcome(Base):
class AgentStrategyWeights(Base):
"""
Agent 策略權重OpenClaw 學習累積)。
索引:strategy_key 以便快速更新與查詢。
Agent strategy weights (OpenClaw learning accumulation).
Index: strategy_key for fast updates/query.
"""
__tablename__ = 'agent_strategy_weights'

View File

@@ -12,27 +12,27 @@ from services.decision_tracker import DecisionTracker as DTService
logger = logging.getLogger(__name__)
decision_tracker_service = DTService()
# 模擬 ICAIM 完成回撥:排程 follow_up
# simulate ICAIM completion callback: schedule follow_up
def on_icaim_task_complete(plan_id: int, sku: str):
"""此函數由 ICAIM 排程觸發,調用 DecisionTracker.schedule_follow_up"""
"""Triggered by ICAIM scheduler to schedule follow_up via DecisionTracker."""
asyncio.create_task(decision_tracker_service.schedule_follow_up(plan_id, sku))
# 排程設置(保持原有 schedule 邏輯)
# schedule settings (keep original schedule logic)
def run_icaim_task():
"""模擬 ICAIM 任務執行"""
logger.info("[Scheduler] [ICAIM] 執行 ICAIM 分析任務...")
# ... 執行 ICAIM 分析 ...
"""Simulate ICAIM task execution."""
logger.info("[Scheduler] [ICAIM] executing ICAIM analysis task...")
# ... execute ICAIM analysis ...
plan_id = 123
sku = "sample_sku"
# 任務完成後觸發 follow_up 排程
# after task completes, trigger follow_up schedule
on_icaim_task_complete(plan_id, sku)
logger.info("[Scheduler] [ICAIM] 任務完成,已觸發 follow_up 排程")
logger.info("[Scheduler] [ICAIM] task completed, triggered follow_up schedule")
# 保留原有排程設定
# keep original schedule configuration
schedule.every(6).hours.do(run_icaim_task)
logger.info("📅 已設定:每 6 小時執行 ICAIM 分析任務")
logger.info("📅 scheduled: ICAIM analysis task every 6 hours")
# 啟動排程循環(保持原有主循環)
# start schedule loop (keep original main loop)
if __name__ == "__main__":
logger.info("Scheduler started.")
while True:

View File

@@ -13,8 +13,8 @@ logger = logging.getLogger(__name__)
class AIOrchestrator:
"""
協調中樞:負責 EventRouter L1/L2 處理、Agent 共享上下文與閉環決策追蹤。
設計輕量,單檔不超過 100 行。
Coordination hub: handles EventRouter L1/L2, agent shared context, and closed-loop decision tracking.
Single file <= 100 lines.
"""
def __init__(self):
@@ -23,8 +23,8 @@ class AIOrchestrator:
async def handle_l1(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
"""
L1:語意翻譯 + 原因分析(由 Hermes 提供)。
結果會寫入 agent_context,並可作為 L2 的上下文。
L1: semantic translation + reason analysis (provided by Hermes).
Writes to agent_context and can be used as L2 context.
"""
ctx = await self._get_context(session_id)
result = await self.hermes.handle_l1(event, ctx)
@@ -33,13 +33,13 @@ class AIOrchestrator:
async def handle_l2(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
"""
L2:規劃 + 審核閘。
輸入包含 L1 分析結果(若可用),產出 ActionPlan 等待批准。
L2: planning + review gate.
Produces an ActionPlan awaiting approval (handled via Telegram callback).
"""
ctx = await self._get_context(session_id) # 包含 hermes 分析
ctx = await self._get_context(session_id) # includes hermes analysis
result = await self.nemotron.handle_l2(event, ctx)
await self._save_action_plan(result)
# 審核閘由 routes/bot_api_routes 透過 callback 處理
# review gate handled by routes/bot_api_routes callback
return result
async def _get_context(self, session_id: str) -> Dict[str, Any]:
@@ -77,7 +77,7 @@ class AIOrchestrator:
session.commit()
except Exception as e:
session.rollback()
logger.error(f"[AIOrchestrator] save_context 失敗: {e}")
logger.error(f"[AIOrchestrator] save_context failed: {e}")
raise
finally:
session.close()
@@ -102,7 +102,7 @@ class AIOrchestrator:
session.commit()
except Exception as e:
session.rollback()
logger.error(f"[AIOrchestrator] save_action_plan 失敗: {e}")
logger.error(f"[AIOrchestrator] save_action_plan failed: {e}")
raise
finally:
session.close()

View File

@@ -12,7 +12,7 @@ logger = logging.getLogger(__name__)
async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
"""
L1:語意翻譯 + 原因分析(由 AIOrchestrator 調用 Hermes)。
L1: semantic translation + reason analysis (provided by Hermes).
"""
orchestrator = AIOrchestrator()
return await orchestrator.handle_l1(event, session_id)
@@ -20,22 +20,22 @@ async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
async def _handle_l2(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
"""
L2:規劃 + 審核閘。
產出 ActionPlan 等待批准Telegram 回調處理)。
L2: planning + review gate.
Produces an ActionPlan awaiting approval (handled via Telegram callback).
"""
orchestrator = AIOrchestrator()
return await orchestrator.handle_l2(event, session_id)
async def _handle_l0(event: Dict[str, Any]) -> Dict[str, Any]:
"""L0:直接回傳原始事件(兼容與監控)"""
"""L0: return raw event (for compatibility/monitoring)."""
return {"status": "ok", "echo": event.get("event_type")}
async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
"""
事件路由主入口(與 routes/bot_api_routes 兼容)。
輸出格式與 dispatch_v1 保持一致,以便平滑切換。
Main event routing entry (compatible with routes/bot_api_routes).
Output format matches dispatch_v1 for smooth migration.
"""
tier = _classify(event)
session_id = f"evt:{event.get('event_type')}:{event.get('source', 'unknown')}"
@@ -50,7 +50,7 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
else:
result = await _handle_l0(event)
# 保留舊版回傳格式
# preserve legacy output format
return {
"tier": tier,
"sent": 1,
@@ -59,7 +59,7 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
"payload": result,
}
except Exception as e:
logger.exception(f"[EventRouter] dispatch 失敗: {e}")
logger.exception(f"[EventRouter] dispatch failed: {e}")
return {
"tier": tier,
"sent": 0,