# services/ai_orchestrator.py import asyncio import json import logging from typing import Any, Dict, Optional from sqlalchemy import text from services.hermes_analyst_service import HermesAnalystService from services.nemoton_dispatcher_service import NemotronDispatcher from services.action_plan_dedupe import ( active_nemotron_action_plan_exists, is_nemotron_direct_response_plan, ) from database.manager import get_session from database.autoheal_models import AgentContext, ActionPlan logger = logging.getLogger(__name__) class AIOrchestrator: """ Coordination hub: handles EventRouter L1/L2, agent shared context, and closed-loop decision tracking. Single file <= 100 lines. """ def __init__(self): self.hermes = HermesAnalystService() self.nemotron = NemotronDispatcher() async def handle_l1(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]: """ L1: semantic translation + reason analysis (provided by Hermes). Writes to agent_context and can be used as L2 context. """ ctx = await self._get_context(session_id) result = await self.hermes.handle_l1(event, ctx) await self._save_context(session_id, "hermes", result) return result async def handle_l2(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]: """ L2: planning + review gate. Produces an ActionPlan awaiting approval (handled via Telegram callback). """ ctx = await self._get_context(session_id) # includes hermes analysis result = await self.nemotron.handle_l2(event, ctx) result.setdefault("session_id", session_id) await self._save_action_plan(result) # review gate handled by routes/bot_api_routes callback return result async def _get_context(self, session_id: str) -> Dict[str, Any]: session = get_session() try: rows = session.execute( text("SELECT context_key, context_val FROM agent_context WHERE session_id = :sid"), {"sid": session_id}, ).fetchall() return {r[0]: r[1] for r in rows} finally: session.close() async def _save_context(self, session_id: str, agent: str, payload: Dict[str, Any]) -> None: session = get_session() try: session.execute( text("DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag"), {"sid": session_id, "ag": agent}, ) session.execute( text(""" INSERT INTO agent_context (session_id, agent_name, context_key, context_val, created_at, ttl_minutes) VALUES (:sid, :ag, :ck, :cv, NOW(), 60) """), { "sid": session_id, "ag": agent, "ck": "latest", "cv": json.dumps(payload, ensure_ascii=False), }, ) session.commit() except Exception as e: session.rollback() logger.warning(f"[AIOrchestrator] save_context failed (non-fatal): {e}") finally: session.close() async def _save_action_plan(self, plan: Dict[str, Any]) -> None: if is_nemotron_direct_response_plan(plan): logger.info("[AIOrchestrator] skip direct_response action_plan persistence") return session = get_session() try: payload_json = json.dumps(plan, ensure_ascii=False) if active_nemotron_action_plan_exists(session, plan, payload_json): logger.info("[AIOrchestrator] skip duplicate nemotron action_plan") return session.execute( text(""" INSERT INTO action_plans (session_id, plan_type, sku, payload, status, created_by) VALUES (:sid, :pt, :sku, :pl, :status, 'nemotron') """), { "sid": plan.get("session_id"), "pt": plan.get("plan_type"), "sku": plan.get("sku"), "pl": payload_json, "status": "auto_pending" if plan.get("auto_execute") else "pending", }, ) session.commit() except Exception as e: session.rollback() logger.warning(f"[AIOrchestrator] save_action_plan failed (non-fatal): {e}") finally: session.close()