#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Elephant Alpha AI Agent Super Orchestrator

Elephant Alpha (100B, 256K context)
- AI Agent 3.0 autonomous decision-making
- Deep strategic reasoning across all AI agents
- Long-term memory integration
- Autonomous workflow orchestration
- Self-learning and adaptation

Position: Super Orchestrator above Hermes/NemoTron/OpenClaw
"""

import os
import json
import math
import asyncio
from json import JSONDecodeError
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass

from services.logger_manager import SystemLogger
from database.manager import get_session
from sqlalchemy import text
from services.ai_provider import ai_provider_service
from services.elephant_service import elephant_service

# Elephant Alpha Configuration removed - now handled by elephant_service

logger = SystemLogger("ElephantAlphaOrchestrator").get_logger()


@dataclass
class AgentCapability:
    """AI Agent capability definition"""

    name: str
    model: str
    strengths: List[str]
    limitations: List[str]
    cost_per_token: float
    max_context: int


@dataclass
class StrategicDecision:
    """High-level strategic decision from Elephant Alpha"""

    priority: str  # critical/high/medium/low
    agents_required: List[str]
    reasoning: str
    expected_outcome: str
    confidence: float
    execution_plan: List[Dict[str, Any]]
    resource_requirements: Dict[str, Any]


class ElephantAlphaOrchestrator:
    """
    Elephant Alpha Super Orchestrator

    Capabilities:
    - Cross-agent coordination and optimization
    - Strategic long-term planning
    - Autonomous decision making
    - Resource allocation optimization
    - Self-learning integration
    """

    # Priority values the response format asks the model to choose from.
    _VALID_PRIORITIES = ("critical", "high", "medium", "low")

    def __init__(self):
        self.ai_provider = ai_provider_service
        self.elephant = elephant_service

        # Agent Registry
        self.agents = {
            "hermes": AgentCapability(
                name="Hermes Analyst",
                model="hermes3:latest",
                strengths=["price_competition_analysis", "threat_detection", "market_intelligence"],
                limitations=["context_window", "real_time_data"],
                cost_per_token=0.0,
                max_context=8000
            ),
            "nemotron": AgentCapability(
                name="NemoTron Dispatcher",
                model="meta/llama-3.1-8b-instruct",
                strengths=["tool_calling", "action_dispatch", "decision_making"],
                limitations=["strategic_depth", "long_term_planning"],
                cost_per_token=0.0,
                max_context=128000
            ),
            "openclaw": AgentCapability(
                name="OpenClaw Strategist",
                # Ollama-first: OpenClaw strategy/reporting must use the approved
                # GCP-A → GCP-B → 111 cascade; Gemini is emergency fallback only.
                model="qwen2.5-coder:7b",
                strengths=["strategic_planning", "market_analysis", "insight_generation"],
                limitations=["real_time_execution", "direct_actions"],
                cost_per_token=0.0,
                max_context=32000
            )
        }

        # System context for Elephant Alpha. This initial value is refreshed on
        # every coordination call because the prompt embeds the current date.
        self.system_prompt = self._build_system_prompt()

    def _build_system_prompt(self) -> str:
        """Build the system prompt for Elephant Alpha.

        The prompt embeds today's date, so the returned text changes from day
        to day; callers that need an up-to-date prompt should call this again
        instead of caching the result.
        """
        return f"""You are Elephant Alpha, the Super Orchestrator for momo-pro-system e-commerce AI platform.

重要語言規定:
1. 所有文字欄位(strategic_assessment、reasoning、expected_outcome、execution_plan 的 description、risk_factors、contingency_plans)必須使用繁體中文(台灣用語)撰寫。
2. 【禁止翻譯 Agent 名稱】Hermes、NemoTron、OpenClaw、Elephant Alpha 是專有名詞,必須保留英文原名,嚴禁音譯(禁止:赫瑪斯、內莫特朗、開爪等)。
3. reasoning 欄位必須包含具體數字(如:競品價差 X%、SKU 數量 N 個、業績跌幅 X%),嚴禁使用空泛企業用語(如「提升轉化率」、「擴大利潤邊際」等無數據支撐的說法)。
4. expected_outcome 必須說明具體預期指標(如:預計 48h 內恢復 N 個 SKU 競爭力、降低平均價差至 X%)。

CURRENT ARCHITECTURE:
- You coordinate 3 specialized AI agents: Hermes (Analyst), NemoTron (Dispatcher), OpenClaw (Strategist)
- Your context window: 256,000 tokens - enables deep strategic reasoning
- Your role: Autonomous decision-making and agent orchestration

AGENT CAPABILITIES:
1. HERMES (hermes3:latest)
   - Strengths: Price competition analysis, threat detection, market intelligence
   - Limitations: Limited context window, no real-time data access
   - Best for: Analyzing large datasets, identifying patterns, threat assessment

2. NEMOTRON (meta/llama-3.1-8b-instruct)
   - Strengths: Tool calling, action dispatch, immediate decision making
   - Limitations: Limited strategic depth, short-term focus
   - Best for: Executing actions, immediate responses, tool-based operations

3. OPENCLAW (qwen2.5-coder:7b / qwen3:14b via Ollama cascade)
   - Strengths: Strategic planning, market analysis, insight generation
   - Limitations: No direct execution capabilities, analysis-only
   - Best for: Long-term strategy, market insights, recommendation generation

Gemini policy:
- Gemini is not a primary agent model.
- Gemini may only be used after the approved Ollama cascade fails and the operator explicitly unlocks the emergency fallback guard.

YOUR SUPERVISORY CAPABILITIES:
- Cross-agent coordination and optimization
- Strategic long-term planning (weeks/months ahead)
- Autonomous decision making with confidence scoring
- Resource allocation optimization
- Self-learning integration from past outcomes
- Conflict resolution between agent recommendations
- Priority-based task scheduling
- Risk assessment and mitigation planning

DECISION FRAMEWORK:
1. Analyze the business context and objectives
2. Evaluate which agents are best suited for the task
3. Determine optimal sequencing and parallelization
4. Assess resource requirements and constraints
5. Generate confidence-scored recommendations
6. Create detailed execution plans
7. Monitor and adapt based on outcomes

ALLOWED EXECUTION ACTIONS:
- hermes / analyze_price_competition
- nemotron / dispatch_alert
- elephant_alpha / auto_heal
- elephant_alpha / code_fix

OpenClaw strategy/report actions are advisory only. Do not place openclaw / generate_* actions in execution_plan; put strategic recommendations in strategic_assessment, reasoning, expected_outcome, or contingency_plans.

價格調整紅線:
- 禁止輸出 execute_price_adjustment、adjust_price、apply_price_change、update_price。
- 若需要調價,請在 strategic_assessment / reasoning 中輸出定價策略建議,並明確說明「需要人工核准後才能調價」;不要放入 execution_plan。
- 本系統不得由 AI 直接修改商品價格,只能產生建議與人工覆核項目。

BUSINESS CONTEXT:
- E-commerce platform (momo-pro-system)
- Real-time price competition monitoring
- Automated threat detection and response
- Strategic market positioning
- Revenue optimization and growth
- Customer experience enhancement

LANGUAGE REQUIREMENT:
ALL text fields in your JSON response (strategic_assessment, reasoning, expected_outcome, execution_plan descriptions, risk_factors, contingency_plans) MUST be written in Traditional Chinese (繁體中文). Use Taiwan-standard terminology. Do NOT use Simplified Chinese or English for any user-facing content.
confidence 欄位請給予真實評估的浮點數 (0.0 ~ 1.0 之間)。

RESPONSE FORMAT:
Always respond with structured JSON:
{{
    "strategic_assessment": "整體策略評估(繁體中文)",
    "priority": "critical|high|medium|low",
    "agents_required": ["agent1", "agent2"],
    "reasoning": "詳細決策推理(繁體中文)",
    "expected_outcome": "預期業務影響(繁體中文)",
    "confidence": 0.0,
    "execution_plan": [
        {{
            "step": 1,
            "agent": "hermes",
            "action": "analyze_price_competition",
            "parameters": {{}},
            "expected_duration": "2-3 分鐘",
            "description": "執行說明(繁體中文)"
        }}
    ],
    "resource_requirements": {{
        "compute_cost": "$0.00",
        "time_estimate": "5-10 分鐘",
        "human_oversight": "minimal|moderate|required"
    }},
    "risk_factors": ["潛在風險一(繁體中文)", "潛在風險二"],
    "contingency_plans": ["備案一(繁體中文)", "備案二"]
}}

AUTONOMY LEVEL:
You have autonomous decision-making authority. Make decisions confidently based on available data and strategic objectives. Request human intervention only for critical business impacts or ethical considerations.

CURRENT DATE: {datetime.now().strftime('%Y-%m-%d')}
BUSINESS OBJECTIVE: Optimize e-commerce performance through intelligent automation
"""

    async def analyze_and_coordinate(self, business_context: Dict[str, Any]) -> StrategicDecision:
        """
        Main coordination method - analyze business context and orchestrate agents

        Any failure (model error, unparseable response, or a failure while
        logging the decision) is caught here and replaced by the conservative
        decision from ``_fallback_decision``; this method does not raise.

        Args:
            business_context: Current business situation, objectives, constraints

        Returns:
            StrategicDecision: Comprehensive strategic plan with agent coordination
        """
        start_time = datetime.now()

        try:
            # Rebuild the system prompt per call: it embeds CURRENT DATE, and the
            # module-level singleton may outlive the day it was created on.
            self.system_prompt = self._build_system_prompt()

            # Build comprehensive prompt with business context
            prompt = self._build_coordination_prompt(business_context)

            # Call Elephant Alpha via unified service.
            # NOTE(review): generate() is invoked synchronously (no await) with a
            # 180s timeout from inside a coroutine — confirm whether it should be
            # moved off the event loop.
            response = self.elephant.generate(
                prompt=prompt,
                system_prompt=self.system_prompt,
                json_mode=True,
                temperature=0.3,
                timeout=180
            )

            if not response.success:
                raise RuntimeError(response.error)

            # Parse and validate response. Some NIM-compatible models still wrap
            # JSON in fenced blocks or prepend reasoning text even with json_mode.
            try:
                decision_data = self._extract_json_object(response.content)
            except ValueError as parse_error:
                logger.warning(
                    "[ElephantAlpha] Coordination JSON parse failed; using evidence fallback. "
                    "model=%s error=%s preview=%r",
                    response.model,
                    parse_error,
                    (response.content or "")[:240],
                )
                return self._fallback_decision(
                    business_context,
                    reason="Elephant Alpha 回應不是可解析 JSON,已改用實證 fallback。",
                )

            decision = self._parse_strategic_decision(decision_data)

            # Log decision for learning
            await self._log_decision(decision, business_context, start_time)

            return decision

        except Exception as e:
            logger.error("[ElephantAlpha] Coordination failed: %s", e)
            # Fallback to conservative decision
            return self._fallback_decision(
                business_context,
                reason=f"Elephant Alpha 協調失敗:{type(e).__name__}",
            )

    @staticmethod
    def _extract_json_object(raw: str) -> Dict[str, Any]:
        """Extract one JSON object from tolerant LLM output.

        Handles three shapes: a bare JSON document, a document wrapped in a
        Markdown code fence, and a JSON object embedded in surrounding text (the
        first ``{`` that starts a decodable object wins).

        Args:
            raw: Raw model output; ``None`` and empty strings are rejected.

        Returns:
            The decoded JSON object as a dict.

        Raises:
            ValueError: If the input is empty, if the whole text is valid JSON
                whose root is not an object, or if no object can be decoded.
        """
        text_value = (raw or "").strip()
        if not text_value:
            raise ValueError("empty response")

        # Drop a leading/trailing Markdown fence line (e.g. ```json ... ```).
        if text_value.startswith("```"):
            lines = text_value.splitlines()
            if lines and lines[0].strip().startswith("```"):
                lines = lines[1:]
            if lines and lines[-1].strip().startswith("```"):
                lines = lines[:-1]
            text_value = "\n".join(lines).strip()

        try:
            parsed = json.loads(text_value)
        except JSONDecodeError:
            # Not a clean JSON document; fall through to the embedded-object scan.
            pass
        else:
            if isinstance(parsed, dict):
                return parsed
            raise ValueError("JSON root is not an object")

        # Try decoding at every "{" in turn. Passing the offset to raw_decode
        # avoids copying the tail of the string for each candidate position.
        decoder = json.JSONDecoder()
        idx = text_value.find("{")
        while idx != -1:
            try:
                parsed, _end = decoder.raw_decode(text_value, idx)
            except JSONDecodeError:
                parsed = None
            if isinstance(parsed, dict):
                return parsed
            idx = text_value.find("{", idx + 1)

        raise ValueError("no JSON object found")

    @staticmethod
    def _dump_for_prompt(payload: Any) -> str:
        """Serialize ``payload`` as indented JSON for inclusion in an LLM prompt.

        Non-ASCII text is kept readable instead of being escaped as ``\\uXXXX``,
        and values json cannot encode natively are rendered with ``str()`` so
        that an unusual context value does not abort prompt construction.
        """
        return json.dumps(payload, indent=2, ensure_ascii=False, default=str)

    def _build_coordination_prompt(self, context: Dict[str, Any]) -> str:
        """Build detailed coordination prompt for Elephant Alpha.

        Combines the caller's business context with recent insight metrics,
        per-agent last-activity times, the system load and the pending action
        count. The metric helpers query the database.
        """
        # Get recent performance data for context
        recent_performance = self._get_recent_performance_metrics()

        # Get current agent statuses
        agent_status = self._get_agent_status()

        prompt = f"""
BUSINESS CONTEXT:
{self._dump_for_prompt(context)}

RECENT PERFORMANCE METRICS:
{self._dump_for_prompt(recent_performance)}

CURRENT AGENT STATUS:
{self._dump_for_prompt(agent_status)}

CURRENT SITUATION:
- Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- System Load: {self._get_system_load()}
- Pending Actions: {self._get_pending_actions_count()}

REQUIRED DECISION:
Based on the current business context and system state, determine the optimal strategy and agent coordination.

Consider:
1. Immediate priorities vs long-term objectives
2. Resource constraints and optimization opportunities
3. Risk factors and mitigation strategies
4. Agent capabilities and current workload
5. Historical performance and learning outcomes

Provide your strategic decision in the specified JSON format.

重要:所有 JSON 文字欄位必須使用繁體中文(台灣用語)。Hermes、NemoTron、OpenClaw、Elephant Alpha 保留英文原名勿翻譯。reasoning 必須引用上方數據中的具體數字。
"""

        return prompt

    # _call_elephant_alpha removed - logic moved to elephant_service

    @staticmethod
    def _coerce_confidence(value: Any, default: float = 0.5) -> float:
        """Convert a model-supplied confidence into a float within [0.0, 1.0].

        Args:
            value: Whatever the model put in the ``confidence`` field.
            default: Value used when ``value`` is missing, non-numeric or NaN.

        Returns:
            The clamped confidence, or ``default`` when it cannot be interpreted.
        """
        try:
            conf = float(value)
        except (TypeError, ValueError):
            # e.g. JSON null or free text instead of a number
            return default
        if math.isnan(conf):
            # NaN compares false against both bounds, so clamping would keep it
            return default
        return min(1.0, max(0.0, conf))

    def _parse_strategic_decision(self, decision_data: Dict[str, Any]) -> StrategicDecision:
        """Parse Elephant Alpha response into StrategicDecision object.

        Missing, null or wrongly-typed fields fall back to neutral defaults
        (priority "medium", confidence 0.5, empty text/list/dict) so that a
        partially malformed response still yields a usable decision.
        """
        raw_priority = decision_data.get("priority")
        priority = str(raw_priority).strip().lower() if raw_priority is not None else ""
        if priority not in self._VALID_PRIORITIES:
            priority = "medium"

        agents_required = decision_data.get("agents_required")
        if not isinstance(agents_required, list):
            agents_required = []

        execution_plan = decision_data.get("execution_plan")
        if not isinstance(execution_plan, list):
            execution_plan = []

        resource_requirements = decision_data.get("resource_requirements")
        if not isinstance(resource_requirements, dict):
            resource_requirements = {}

        return StrategicDecision(
            priority=priority,
            agents_required=agents_required,
            reasoning=decision_data.get("reasoning") or "",
            expected_outcome=decision_data.get("expected_outcome") or "",
            confidence=self._coerce_confidence(decision_data.get("confidence", 0.5)),
            execution_plan=execution_plan,
            resource_requirements=resource_requirements
        )

    def _get_recent_performance_metrics(self) -> Dict[str, Any]:
        """Get recent performance metrics for context.

        Returns:
            A dict keyed by insight type; each value holds the row count, the
            average confidence (0.0 when unavailable) and the ISO timestamp of
            the latest row, for ``ai_insights`` rows of the last 7 days.
        """
        session = get_session()
        try:
            # Get recent AI insights performance.
            # The count column is aliased to a name that cannot be confused with
            # the sequence ``count`` method of tuple-like result rows.
            rows = session.execute(text("""
                SELECT
                    insight_type,
                    COUNT(*) as insight_count,
                    AVG(confidence) as avg_confidence,
                    MAX(created_at) as last_activity
                FROM ai_insights
                WHERE created_at >= NOW() - INTERVAL '7 days'
                GROUP BY insight_type
            """)).fetchall()

            return {
                row.insight_type: {
                    "count": row.insight_count,
                    "avg_confidence": float(row.avg_confidence) if row.avg_confidence else 0.0,
                    "last_activity": row.last_activity.isoformat() if row.last_activity else None
                }
                for row in rows
            }
        finally:
            session.close()

    def _get_agent_status(self) -> Dict[str, Any]:
        """Get current status of all agents.

        Status and queue size are fixed values here; only the last-activity
        timestamps are read from the database.
        """
        return {
            "hermes": {
                "status": "active",
                "last_analysis": self._get_last_analysis_time("hermes"),
                "queue_size": 0
            },
            "nemotron": {
                "status": "active",
                "last_dispatch": self._get_last_analysis_time("nemotron"),
                "queue_size": 0
            },
            "openclaw": {
                "status": "active",
                "last_strategy": self._get_last_analysis_time("openclaw"),
                "queue_size": 0
            }
        }

    def _get_last_analysis_time(self, agent: str) -> Optional[str]:
        """Get last activity time for agent.

        Returns:
            ISO-formatted timestamp of the newest ``ai_insights`` row created by
            ``agent``, or ``None`` when the agent has no rows.
        """
        session = get_session()
        try:
            row = session.execute(text("""
                SELECT MAX(created_at) as last_time
                FROM ai_insights
                WHERE created_by = :agent
            """), {"agent": agent}).fetchone()

            return row.last_time.isoformat() if row and row.last_time else None
        finally:
            session.close()

    def _get_system_load(self) -> str:
        """Get current system load"""
        return "normal"  # Placeholder - could integrate with monitoring

    def _get_pending_actions_count(self) -> int:
        """Get count of pending actions (``action_plans`` rows with status 'pending')."""
        session = get_session()
        try:
            # scalar() yields the first column of the first row, or None
            pending = session.execute(text("""
                SELECT COUNT(*) as pending_count
                FROM action_plans
                WHERE status = 'pending'
            """)).scalar()

            return pending or 0
        finally:
            session.close()

    async def _log_decision(self, decision: StrategicDecision, context: Dict[str, Any], start_time: datetime) -> None:
        """Log decision for learning and audit.

        Inserts the decision into ``ai_insights`` and then tries to enqueue it
        for embedding. Database errors propagate to the caller; a failure to
        enqueue the embedding is only logged as a warning.

        Args:
            decision: The parsed decision to persist.
            context: The business context the decision was made for.
            start_time: When coordination started, used to compute the duration.
        """
        decision_payload = {
            "decision": decision.__dict__,
            "context": context,
        }

        session = get_session()
        try:
            # B4b FIX: metadata → metadata_json; confidence/created_by added by migration 015
            row = session.execute(text("""
                INSERT INTO ai_insights
                (insight_type, content, confidence, created_by, status, metadata_json)
                VALUES (:type, :content, :confidence, :created_by, :status, :metadata)
                RETURNING id
            """), {
                "type": "elephant_alpha_decision",
                # default=str keeps values json cannot encode natively (e.g.
                # datetimes in the context) from aborting the audit record.
                "content": json.dumps({
                    **decision_payload,
                    "duration_seconds": (datetime.now() - start_time).total_seconds()
                }, default=str),
                "confidence": decision.confidence,
                "created_by": "elephant_alpha",
                "status": "executed",
                "metadata": json.dumps({"agents_required": decision.agents_required}, default=str)
            }).fetchone()

            session.commit()

            if row:
                try:
                    # Imported lazily, inside the guard, so a problem with the
                    # learning service cannot fail decision logging.
                    from services.openclaw_learning_service import enqueue_insight_embedding
                    enqueue_insight_embedding(
                        row[0],
                        "elephant_alpha_decision",
                        json.dumps(decision_payload, ensure_ascii=False, default=str),
                    )
                except Exception as embed_err:
                    logger.warning("[ElephantAlpha] embedding queue enqueue failed: %s", embed_err)
        finally:
            session.close()

    @staticmethod
    def _context_concrete_actions(context: Dict[str, Any]) -> List[str]:
        """Return up to five concrete evidence strings carried in the context.

        Looks in ``context["conditions"]`` for ``_prefetched_hermes_threats``
        first and ``_db_evidence_actions`` second, and returns the first of the
        two that contains at least one non-blank entry. Returns an empty list
        when the context has no usable evidence.
        """
        conditions = context.get("conditions") if isinstance(context, dict) else {}
        if not isinstance(conditions, dict):
            return []

        for key in ("_prefetched_hermes_threats", "_db_evidence_actions"):
            actions = conditions.get(key)
            if not isinstance(actions, list):
                continue
            stripped = (str(action).strip() for action in actions)
            cleaned = [entry for entry in stripped if entry]
            if cleaned:
                return cleaned[:5]

        return []

    def _fallback_decision(self, context: Dict[str, Any], *, reason: str = "Elephant Alpha unavailable") -> StrategicDecision:
        """Fallback decision if Elephant Alpha fails.

        Never produces an execution plan. When the context carries concrete
        DB/Hermes evidence the decision is high priority; otherwise it is medium
        priority. Human oversight is required in both cases.

        Args:
            context: The business context of the failed coordination attempt.
            reason: Human-readable explanation, prefixed to the reasoning text.

        Returns:
            A conservative StrategicDecision with an empty execution plan.
        """
        concrete_actions = self._context_concrete_actions(context)
        # Tolerate a missing or non-dict context: this is the error path and
        # must not raise itself.
        raw_trigger = context.get("trigger_type") if isinstance(context, dict) else None
        trigger_type = str(raw_trigger or "unknown")

        if concrete_actions:
            return StrategicDecision(
                priority="high",
                agents_required=["hermes", "elephant_alpha"],
                reasoning=(
                    f"{reason} 已保留 {len(concrete_actions)} 筆 DB/Hermes 價格比對實證;"
                    "僅送人工覆核,不執行自動調價。"
                ),
                expected_outcome="產生可稽核的人工覆核告警,避免使用無法解析的 LLM 推論文字。",
                confidence=0.74,
                execution_plan=[],
                resource_requirements={
                    "compute_cost": "$0.00",
                    "time_estimate": "人工覆核",
                    "human_oversight": "required",
                },
            )

        return StrategicDecision(
            priority="medium",
            agents_required=["elephant_alpha"],
            reasoning=(
                f"{reason} trigger={trigger_type},且沒有可稽核的 DB/Hermes 實證;"
                "不產生策略型行動計畫,避免把推測當成事實。"
            ),
            expected_outcome="不執行自動動作;需要先確認資料來源或等待下一輪具體實證。",
            confidence=0.6,
            execution_plan=[],
            resource_requirements={
                "compute_cost": "$0.00",
                "time_estimate": "等待實證",
                "human_oversight": "required",
            },
        )


# Singleton instance
elephant_orchestrator = ElephantAlphaOrchestrator()