Files
ewoooc/services/ai_orchestrator.py
OoO 5e2186a808
All checks were successful
CD Pipeline / deploy (push) Successful in 1m0s
防止 action_plans 重複回長
2026-05-19 21:19:41 +08:00

124 lines
4.5 KiB
Python

# services/ai_orchestrator.py
import asyncio
import json
import logging
from typing import Any, Dict, Optional
from sqlalchemy import text
from services.hermes_analyst_service import HermesAnalystService
from services.nemoton_dispatcher_service import NemotronDispatcher
from services.action_plan_dedupe import (
active_nemotron_action_plan_exists,
is_nemotron_direct_response_plan,
)
from database.manager import get_session
from database.autoheal_models import AgentContext, ActionPlan
logger = logging.getLogger(__name__)
class AIOrchestrator:
"""
Coordination hub: handles EventRouter L1/L2, agent shared context, and closed-loop decision tracking.
Single file <= 100 lines.
"""
def __init__(self):
self.hermes = HermesAnalystService()
self.nemotron = NemotronDispatcher()
async def handle_l1(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
"""
L1: semantic translation + reason analysis (provided by Hermes).
Writes to agent_context and can be used as L2 context.
"""
ctx = await self._get_context(session_id)
result = await self.hermes.handle_l1(event, ctx)
await self._save_context(session_id, "hermes", result)
return result
async def handle_l2(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
"""
L2: planning + review gate.
Produces an ActionPlan awaiting approval (handled via Telegram callback).
"""
ctx = await self._get_context(session_id) # includes hermes analysis
result = await self.nemotron.handle_l2(event, ctx)
result.setdefault("session_id", session_id)
await self._save_action_plan(result)
# review gate handled by routes/bot_api_routes callback
return result
async def _get_context(self, session_id: str) -> Dict[str, Any]:
session = get_session()
try:
rows = session.execute(
text("SELECT context_key, context_val FROM agent_context WHERE session_id = :sid"),
{"sid": session_id},
).fetchall()
return {r[0]: r[1] for r in rows}
finally:
session.close()
async def _save_context(self, session_id: str, agent: str, payload: Dict[str, Any]) -> None:
session = get_session()
try:
session.execute(
text("DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag"),
{"sid": session_id, "ag": agent},
)
session.execute(
text("""
INSERT INTO agent_context
(session_id, agent_name, context_key, context_val, created_at, ttl_minutes)
VALUES
(:sid, :ag, :ck, :cv, NOW(), 60)
"""),
{
"sid": session_id,
"ag": agent,
"ck": "latest",
"cv": json.dumps(payload, ensure_ascii=False),
},
)
session.commit()
except Exception as e:
session.rollback()
logger.warning(f"[AIOrchestrator] save_context failed (non-fatal): {e}")
finally:
session.close()
async def _save_action_plan(self, plan: Dict[str, Any]) -> None:
if is_nemotron_direct_response_plan(plan):
logger.info("[AIOrchestrator] skip direct_response action_plan persistence")
return
session = get_session()
try:
payload_json = json.dumps(plan, ensure_ascii=False)
if active_nemotron_action_plan_exists(session, plan, payload_json):
logger.info("[AIOrchestrator] skip duplicate nemotron action_plan")
return
session.execute(
text("""
INSERT INTO action_plans
(session_id, plan_type, sku, payload, status, created_by)
VALUES
(:sid, :pt, :sku, :pl, :status, 'nemotron')
"""),
{
"sid": plan.get("session_id"),
"pt": plan.get("plan_type"),
"sku": plan.get("sku"),
"pl": payload_json,
"status": "auto_pending" if plan.get("auto_execute") else "pending",
},
)
session.commit()
except Exception as e:
session.rollback()
logger.warning(f"[AIOrchestrator] save_action_plan failed (non-fatal): {e}")
finally:
session.close()