124 lines
4.5 KiB
Python
124 lines
4.5 KiB
Python
# services/ai_orchestrator.py
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Any, Dict, Optional
|
|
|
|
from sqlalchemy import text
|
|
|
|
from services.hermes_analyst_service import HermesAnalystService
|
|
from services.nemoton_dispatcher_service import NemotronDispatcher
|
|
from services.action_plan_dedupe import (
|
|
active_nemotron_action_plan_exists,
|
|
is_nemotron_direct_response_plan,
|
|
)
|
|
from database.manager import get_session
|
|
from database.autoheal_models import AgentContext, ActionPlan
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AIOrchestrator:
|
|
"""
|
|
Coordination hub: handles EventRouter L1/L2, agent shared context, and closed-loop decision tracking.
|
|
Single file <= 100 lines.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.hermes = HermesAnalystService()
|
|
self.nemotron = NemotronDispatcher()
|
|
|
|
async def handle_l1(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
|
|
"""
|
|
L1: semantic translation + reason analysis (provided by Hermes).
|
|
Writes to agent_context and can be used as L2 context.
|
|
"""
|
|
ctx = await self._get_context(session_id)
|
|
result = await self.hermes.handle_l1(event, ctx)
|
|
await self._save_context(session_id, "hermes", result)
|
|
return result
|
|
|
|
async def handle_l2(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
|
|
"""
|
|
L2: planning + review gate.
|
|
Produces an ActionPlan awaiting approval (handled via Telegram callback).
|
|
"""
|
|
ctx = await self._get_context(session_id) # includes hermes analysis
|
|
result = await self.nemotron.handle_l2(event, ctx)
|
|
result.setdefault("session_id", session_id)
|
|
await self._save_action_plan(result)
|
|
# review gate handled by routes/bot_api_routes callback
|
|
return result
|
|
|
|
async def _get_context(self, session_id: str) -> Dict[str, Any]:
|
|
session = get_session()
|
|
try:
|
|
rows = session.execute(
|
|
text("SELECT context_key, context_val FROM agent_context WHERE session_id = :sid"),
|
|
{"sid": session_id},
|
|
).fetchall()
|
|
return {r[0]: r[1] for r in rows}
|
|
finally:
|
|
session.close()
|
|
|
|
async def _save_context(self, session_id: str, agent: str, payload: Dict[str, Any]) -> None:
|
|
session = get_session()
|
|
try:
|
|
session.execute(
|
|
text("DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag"),
|
|
{"sid": session_id, "ag": agent},
|
|
)
|
|
session.execute(
|
|
text("""
|
|
INSERT INTO agent_context
|
|
(session_id, agent_name, context_key, context_val, created_at, ttl_minutes)
|
|
VALUES
|
|
(:sid, :ag, :ck, :cv, NOW(), 60)
|
|
"""),
|
|
{
|
|
"sid": session_id,
|
|
"ag": agent,
|
|
"ck": "latest",
|
|
"cv": json.dumps(payload, ensure_ascii=False),
|
|
},
|
|
)
|
|
session.commit()
|
|
except Exception as e:
|
|
session.rollback()
|
|
logger.warning(f"[AIOrchestrator] save_context failed (non-fatal): {e}")
|
|
finally:
|
|
session.close()
|
|
|
|
async def _save_action_plan(self, plan: Dict[str, Any]) -> None:
|
|
if is_nemotron_direct_response_plan(plan):
|
|
logger.info("[AIOrchestrator] skip direct_response action_plan persistence")
|
|
return
|
|
|
|
session = get_session()
|
|
try:
|
|
payload_json = json.dumps(plan, ensure_ascii=False)
|
|
if active_nemotron_action_plan_exists(session, plan, payload_json):
|
|
logger.info("[AIOrchestrator] skip duplicate nemotron action_plan")
|
|
return
|
|
session.execute(
|
|
text("""
|
|
INSERT INTO action_plans
|
|
(session_id, plan_type, sku, payload, status, created_by)
|
|
VALUES
|
|
(:sid, :pt, :sku, :pl, :status, 'nemotron')
|
|
"""),
|
|
{
|
|
"sid": plan.get("session_id"),
|
|
"pt": plan.get("plan_type"),
|
|
"sku": plan.get("sku"),
|
|
"pl": payload_json,
|
|
"status": "auto_pending" if plan.get("auto_execute") else "pending",
|
|
},
|
|
)
|
|
session.commit()
|
|
except Exception as e:
|
|
session.rollback()
|
|
logger.warning(f"[AIOrchestrator] save_action_plan failed (non-fatal): {e}")
|
|
finally:
|
|
session.close()
|