Files
ewoooc/services/elephant_alpha_orchestrator.py
OoO e3da4ffbb3
All checks were successful
CD Pipeline / deploy (push) Successful in 1m6s
阻止 Gemini 成為推薦主路徑
2026-05-21 16:18:35 +08:00

454 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Elephant Alpha AI Agent Super Orchestrator
Elephant Alpha (100B, 256K context) - AI Agent 3.0 autonomous decision-making
- Deep strategic reasoning across all AI agents
- Long-term memory integration
- Autonomous workflow orchestration
- Self-learning and adaptation
Position: Super Orchestrator above Hermes/NemoTron/OpenClaw
"""
import os
import json
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from services.logger_manager import SystemLogger
from database.manager import get_session
from sqlalchemy import text
from services.ai_provider import ai_provider_service
from services.elephant_service import elephant_service
# Elephant Alpha Configuration removed - now handled by elephant_service
logger = SystemLogger("ElephantAlphaOrchestrator").get_logger()
@dataclass
class AgentCapability:
"""AI Agent capability definition"""
name: str
model: str
strengths: List[str]
limitations: List[str]
cost_per_token: float
max_context: int
@dataclass
class StrategicDecision:
"""High-level strategic decision from Elephant Alpha"""
priority: str # critical/high/medium/low
agents_required: List[str]
reasoning: str
expected_outcome: str
confidence: float
execution_plan: List[Dict[str, Any]]
resource_requirements: Dict[str, Any]
class ElephantAlphaOrchestrator:
"""
Elephant Alpha Super Orchestrator
Capabilities:
- Cross-agent coordination and optimization
- Strategic long-term planning
- Autonomous decision making
- Resource allocation optimization
- Self-learning integration
"""
def __init__(self):
self.ai_provider = ai_provider_service
self.elephant = elephant_service
# Agent Registry
self.agents = {
"hermes": AgentCapability(
name="Hermes Analyst",
model="hermes3:latest",
strengths=["price_competition_analysis", "threat_detection", "market_intelligence"],
limitations=["context_window", "real_time_data"],
cost_per_token=0.0,
max_context=8000
),
"nemotron": AgentCapability(
name="NemoTron Dispatcher",
model="meta/llama-3.1-8b-instruct",
strengths=["tool_calling", "action_dispatch", "decision_making"],
limitations=["strategic_depth", "long_term_planning"],
cost_per_token=0.0,
max_context=128000
),
"openclaw": AgentCapability(
name="OpenClaw Strategist",
# Ollama-first: OpenClaw strategy/reporting must use the approved
# GCP-A → GCP-B → 111 cascade; Gemini is emergency fallback only.
model="qwen2.5-coder:7b",
strengths=["strategic_planning", "market_analysis", "insight_generation"],
limitations=["real_time_execution", "direct_actions"],
cost_per_token=0.0,
max_context=32000
)
}
# System context for Elephant Alpha
self.system_prompt = self._build_system_prompt()
def _build_system_prompt(self) -> str:
"""Build comprehensive system prompt for Elephant Alpha"""
return f"""You are Elephant Alpha, the Super Orchestrator for momo-pro-system e-commerce AI platform.
重要語言規定:
1. 所有文字欄位strategic_assessment、reasoning、expected_outcome、execution_plan 的 description、risk_factors、contingency_plans必須使用繁體中文台灣用語撰寫。
2. 【禁止翻譯 Agent 名稱】Hermes、NemoTron、OpenClaw、Elephant Alpha 是專有名詞,必須保留英文原名,嚴禁音譯(禁止:赫瑪斯、內莫特朗、開爪等)。
3. reasoning 欄位必須包含具體數字(如:競品價差 X%、SKU 數量 N 個、業績跌幅 X%),嚴禁使用空泛企業用語(如「提升轉化率」、「擴大利潤邊際」等無數據支撐的說法)。
4. expected_outcome 必須說明具體預期指標(如:預計 48h 內恢復 N 個 SKU 競爭力、降低平均價差至 X%)。
CURRENT ARCHITECTURE:
- You coordinate 3 specialized AI agents: Hermes (Analyst), NemoTron (Dispatcher), OpenClaw (Strategist)
- Your context window: 256,000 tokens - enables deep strategic reasoning
- Your role: Autonomous decision-making and agent orchestration
AGENT CAPABILITIES:
1. HERMES (hermes3:latest)
- Strengths: Price competition analysis, threat detection, market intelligence
- Limitations: Limited context window, no real-time data access
- Best for: Analyzing large datasets, identifying patterns, threat assessment
2. NEMOTRON (meta/llama-3.1-8b-instruct)
- Strengths: Tool calling, action dispatch, immediate decision making
- Limitations: Limited strategic depth, short-term focus
- Best for: Executing actions, immediate responses, tool-based operations
3. OPENCLAW (qwen2.5-coder:7b / qwen3:14b via Ollama cascade)
- Strengths: Strategic planning, market analysis, insight generation
- Limitations: No direct execution capabilities, analysis-only
- Best for: Long-term strategy, market insights, recommendation generation
Gemini policy:
- Gemini is not a primary agent model.
- Gemini may only be used after the approved Ollama cascade fails and the
operator explicitly unlocks the emergency fallback guard.
YOUR SUPERVISORY CAPABILITIES:
- Cross-agent coordination and optimization
- Strategic long-term planning (weeks/months ahead)
- Autonomous decision making with confidence scoring
- Resource allocation optimization
- Self-learning integration from past outcomes
- Conflict resolution between agent recommendations
- Priority-based task scheduling
- Risk assessment and mitigation planning
DECISION FRAMEWORK:
1. Analyze the business context and objectives
2. Evaluate which agents are best suited for the task
3. Determine optimal sequencing and parallelization
4. Assess resource requirements and constraints
5. Generate confidence-scored recommendations
6. Create detailed execution plans
7. Monitor and adapt based on outcomes
ALLOWED EXECUTION ACTIONS:
- hermes / analyze_price_competition
- nemotron / dispatch_alert
- elephant_alpha / auto_heal
- elephant_alpha / code_fix
OpenClaw strategy/report actions are advisory only. Do not place openclaw / generate_* actions in execution_plan; put strategic recommendations in strategic_assessment, reasoning, expected_outcome, or contingency_plans.
價格調整紅線:
- 禁止輸出 execute_price_adjustment、adjust_price、apply_price_change、update_price。
- 若需要調價,請在 strategic_assessment / reasoning 中輸出定價策略建議,並明確說明「需要人工核准後才能調價」;不要放入 execution_plan。
- 本系統不得由 AI 直接修改商品價格,只能產生建議與人工覆核項目。
BUSINESS CONTEXT:
- E-commerce platform (momo-pro-system)
- Real-time price competition monitoring
- Automated threat detection and response
- Strategic market positioning
- Revenue optimization and growth
- Customer experience enhancement
LANGUAGE REQUIREMENT:
ALL text fields in your JSON response (strategic_assessment, reasoning, expected_outcome, execution_plan descriptions, risk_factors, contingency_plans) MUST be written in Traditional Chinese (繁體中文). Use Taiwan-standard terminology. Do NOT use Simplified Chinese or English for any user-facing content.
confidence 欄位請給予真實評估的浮點數 (0.0 ~ 1.0 之間)。
RESPONSE FORMAT:
Always respond with structured JSON:
{{
"strategic_assessment": "整體策略評估(繁體中文)",
"priority": "critical|high|medium|low",
"agents_required": ["agent1", "agent2"],
"reasoning": "詳細決策推理(繁體中文)",
"expected_outcome": "預期業務影響(繁體中文)",
"confidence": 0.0,
"execution_plan": [
{{
"step": 1,
"agent": "hermes",
"action": "analyze_price_competition",
"parameters": {{}},
"expected_duration": "2-3 分鐘",
"description": "執行說明(繁體中文)"
}}
],
"resource_requirements": {{
"compute_cost": "$0.00",
"time_estimate": "5-10 分鐘",
"human_oversight": "minimal|moderate|required"
}},
"risk_factors": ["潛在風險一(繁體中文)", "潛在風險二"],
"contingency_plans": ["備案一(繁體中文)", "備案二"]
}}
AUTONOMY LEVEL:
You have autonomous decision-making authority. Make decisions confidently based on available data and strategic objectives. Request human intervention only for critical business impacts or ethical considerations.
CURRENT DATE: {datetime.now().strftime('%Y-%m-%d')}
BUSINESS OBJECTIVE: Optimize e-commerce performance through intelligent automation
"""
async def analyze_and_coordinate(self, business_context: Dict[str, Any]) -> StrategicDecision:
"""
Main coordination method - analyze business context and orchestrate agents
Args:
business_context: Current business situation, objectives, constraints
Returns:
StrategicDecision: Comprehensive strategic plan with agent coordination
"""
start_time = datetime.now()
try:
# Build comprehensive prompt with business context
prompt = self._build_coordination_prompt(business_context)
# Call Elephant Alpha via unified service
response = self.elephant.generate(
prompt=prompt,
system_prompt=self.system_prompt,
json_mode=True,
temperature=0.3,
timeout=180
)
if not response.success:
raise RuntimeError(response.error)
# Parse and validate response
decision_data = json.loads(response.content)
decision = self._parse_strategic_decision(decision_data)
# Log decision for learning
await self._log_decision(decision, business_context, start_time)
return decision
except Exception as e:
logger.error(f"[ElephantAlpha] Coordination failed: {e}")
# Fallback to conservative decision
return self._fallback_decision(business_context)
def _build_coordination_prompt(self, context: Dict[str, Any]) -> str:
"""Build detailed coordination prompt for Elephant Alpha"""
# Get recent performance data for context
recent_performance = self._get_recent_performance_metrics()
# Get current agent statuses
agent_status = self._get_agent_status()
prompt = f"""
BUSINESS CONTEXT:
{json.dumps(context, indent=2)}
RECENT PERFORMANCE METRICS:
{json.dumps(recent_performance, indent=2)}
CURRENT AGENT STATUS:
{json.dumps(agent_status, indent=2)}
CURRENT SITUATION:
- Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- System Load: {self._get_system_load()}
- Pending Actions: {self._get_pending_actions_count()}
REQUIRED DECISION:
Based on the current business context and system state, determine the optimal strategy and agent coordination. Consider:
1. Immediate priorities vs long-term objectives
2. Resource constraints and optimization opportunities
3. Risk factors and mitigation strategies
4. Agent capabilities and current workload
5. Historical performance and learning outcomes
Provide your strategic decision in the specified JSON format.
重要:所有 JSON 文字欄位必須使用繁體中文台灣用語。Hermes、NemoTron、OpenClaw、Elephant Alpha 保留英文原名勿翻譯。reasoning 必須引用上方數據中的具體數字。
"""
return prompt
# _call_elephant_alpha removed - logic moved to elephant_service
def _parse_strategic_decision(self, decision_data: Dict[str, Any]) -> StrategicDecision:
"""Parse Elephant Alpha response into StrategicDecision object"""
conf = float(decision_data.get("confidence", 0.5))
if conf > 1.0: conf = 1.0
if conf < 0.0: conf = 0.0
return StrategicDecision(
priority=decision_data.get("priority", "medium"),
agents_required=decision_data.get("agents_required", []),
reasoning=decision_data.get("reasoning", ""),
expected_outcome=decision_data.get("expected_outcome", ""),
confidence=conf,
execution_plan=decision_data.get("execution_plan", []),
resource_requirements=decision_data.get("resource_requirements", {})
)
def _get_recent_performance_metrics(self) -> Dict[str, Any]:
"""Get recent performance metrics for context"""
session = get_session()
try:
# Get recent AI insights performance
rows = session.execute(text("""
SELECT
insight_type,
COUNT(*) as count,
AVG(confidence) as avg_confidence,
MAX(created_at) as last_activity
FROM ai_insights
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY insight_type
""")).fetchall()
metrics = {}
for row in rows:
metrics[row.insight_type] = {
"count": row.count,
"avg_confidence": float(row.avg_confidence) if row.avg_confidence else 0.0,
"last_activity": row.last_activity.isoformat() if row.last_activity else None
}
return metrics
finally:
session.close()
def _get_agent_status(self) -> Dict[str, Any]:
"""Get current status of all agents"""
return {
"hermes": {
"status": "active",
"last_analysis": self._get_last_analysis_time("hermes"),
"queue_size": 0
},
"nemotron": {
"status": "active",
"last_dispatch": self._get_last_analysis_time("nemotron"),
"queue_size": 0
},
"openclaw": {
"status": "active",
"last_strategy": self._get_last_analysis_time("openclaw"),
"queue_size": 0
}
}
def _get_last_analysis_time(self, agent: str) -> str:
"""Get last activity time for agent"""
session = get_session()
try:
row = session.execute(text("""
SELECT MAX(created_at) as last_time
FROM ai_insights
WHERE created_by = :agent
"""), {"agent": agent}).fetchone()
return row.last_time.isoformat() if row and row.last_time else None
finally:
session.close()
def _get_system_load(self) -> str:
"""Get current system load"""
return "normal" # Placeholder - could integrate with monitoring
def _get_pending_actions_count(self) -> int:
"""Get count of pending actions"""
session = get_session()
try:
row = session.execute(text("""
SELECT COUNT(*) as count
FROM action_plans
WHERE status = 'pending'
""")).fetchone()
return row.count if row else 0
finally:
session.close()
async def _log_decision(self, decision: StrategicDecision, context: Dict[str, Any], start_time: datetime):
"""Log decision for learning and audit"""
session = get_session()
try:
# B4b FIX: metadata → metadata_json; confidence/created_by added by migration 015
row = session.execute(text("""
INSERT INTO ai_insights
(insight_type, content, confidence, created_by, status, metadata_json)
VALUES
(:type, :content, :confidence, :created_by, :status, :metadata)
RETURNING id
"""), {
"type": "elephant_alpha_decision",
"content": json.dumps({
"decision": decision.__dict__,
"context": context,
"duration_seconds": (datetime.now() - start_time).total_seconds()
}),
"confidence": decision.confidence,
"created_by": "elephant_alpha",
"status": "executed",
"metadata": json.dumps({"agents_required": decision.agents_required})
}).fetchone()
session.commit()
if row:
try:
from services.openclaw_learning_service import enqueue_insight_embedding
enqueue_insight_embedding(row[0], "elephant_alpha_decision", json.dumps({
"decision": decision.__dict__,
"context": context,
}, ensure_ascii=False))
except Exception as embed_err:
logger.warning("[ElephantAlpha] embedding queue enqueue failed: %s", embed_err)
finally:
session.close()
def _fallback_decision(self, context: Dict[str, Any]) -> StrategicDecision:
"""Fallback decision if Elephant Alpha fails"""
return StrategicDecision(
priority="medium",
agents_required=["openclaw"],
reasoning="Elephant Alpha unavailable, using conservative OpenClaw strategy",
expected_outcome="Basic strategic analysis",
confidence=0.6,
execution_plan=[{
"step": 1,
"agent": "openclaw",
"action": "generate_market_analysis",
"parameters": context,
"expected_duration": "2-3 minutes"
}],
resource_requirements={"compute_cost": "$0.00", "time_estimate": "5 minutes"}
)
# Singleton instance
elephant_orchestrator = ElephantAlphaOrchestrator()