#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Elephant Alpha AI Agent Super Orchestrator

Elephant Alpha (100B, 256K context) - AI Agent 3.0 autonomous decision-making
- Deep strategic reasoning across all AI agents
- Long-term memory integration
- Autonomous workflow orchestration
- Self-learning and adaptation

Position: Super Orchestrator above Hermes/NemoTron/OpenClaw
"""
|
||
|
||
import os
|
||
import json
|
||
import asyncio
|
||
from json import JSONDecodeError
|
||
from datetime import datetime, timedelta
|
||
from typing import Dict, List, Any, Optional
|
||
from dataclasses import dataclass
|
||
from services.logger_manager import SystemLogger
|
||
from database.manager import get_session
|
||
from sqlalchemy import text
|
||
from services.ai_provider import ai_provider_service
|
||
from services.elephant_service import elephant_service
|
||
|
||
# Elephant Alpha Configuration removed - now handled by elephant_service
|
||
|
||
# Module-level logger, obtained from the project's SystemLogger wrapper under
# the name "ElephantAlphaOrchestrator"; used by the orchestrator class below.
logger = SystemLogger("ElephantAlphaOrchestrator").get_logger()
|
||
|
||
@dataclass
class AgentCapability:
    """Static description of one sub-agent that the orchestrator can coordinate.

    Instances are created in ``ElephantAlphaOrchestrator.__init__`` and stored
    in its ``agents`` registry, keyed by a short agent id.
    """
    # Human-readable display name, e.g. "Hermes Analyst".
    name: str
    # Identifier of the backing model, e.g. "hermes3:latest".
    model: str
    # Snake_case tags naming what the agent is good at.
    strengths: List[str]
    # Snake_case tags naming known weaknesses of the agent.
    limitations: List[str]
    # Cost per token; every registry entry in this file uses 0.0.
    cost_per_token: float
    # Maximum context size (presumably in tokens — TODO confirm the unit).
    max_context: int
|
||
|
||
@dataclass
class StrategicDecision:
    """High-level strategic decision produced by Elephant Alpha.

    Built either from the model's JSON response or from the orchestrator's
    conservative fallback path.
    """
    # One of: critical / high / medium / low.
    priority: str  # critical/high/medium/low
    # Ids of the agents the decision needs, e.g. ["hermes", "elephant_alpha"].
    agents_required: List[str]
    # Free-text justification for the decision.
    reasoning: str
    # Free-text description of the expected business impact.
    expected_outcome: str
    # Confidence score; the orchestrator clamps parsed values to [0.0, 1.0].
    confidence: float
    # Ordered step dicts; the requested response format uses the keys
    # step / agent / action / parameters / expected_duration / description.
    execution_plan: List[Dict[str, Any]]
    # Resource estimate dict; the requested response format uses the keys
    # compute_cost / time_estimate / human_oversight.
    resource_requirements: Dict[str, Any]
|
||
|
||
class ElephantAlphaOrchestrator:
    """
    Elephant Alpha Super Orchestrator

    Capabilities:
    - Cross-agent coordination and optimization
    - Strategic long-term planning
    - Autonomous decision making
    - Resource allocation optimization
    - Self-learning integration

    The public entry point is :meth:`analyze_and_coordinate`. It never raises:
    every failure is converted into a conservative fallback decision with an
    empty execution plan.
    """

    # Priorities accepted from the model; anything else is normalized to "medium".
    _VALID_PRIORITIES = ("critical", "high", "medium", "low")

    def __init__(self):
        self.ai_provider = ai_provider_service
        self.elephant = elephant_service

        # Agent Registry
        self.agents = {
            "hermes": AgentCapability(
                name="Hermes Analyst",
                model="hermes3:latest",
                strengths=["price_competition_analysis", "threat_detection", "market_intelligence"],
                limitations=["context_window", "real_time_data"],
                cost_per_token=0.0,
                max_context=8000
            ),
            "nemotron": AgentCapability(
                name="NemoTron Dispatcher",
                model="meta/llama-3.1-8b-instruct",
                strengths=["tool_calling", "action_dispatch", "decision_making"],
                limitations=["strategic_depth", "long_term_planning"],
                cost_per_token=0.0,
                max_context=128000
            ),
            "openclaw": AgentCapability(
                name="OpenClaw Strategist",
                # Ollama-first: OpenClaw strategy/reporting must use the approved
                # GCP-A → GCP-B → 111 cascade; Gemini is emergency fallback only.
                model="qwen2.5-coder:7b",
                strengths=["strategic_planning", "market_analysis", "insight_generation"],
                limitations=["real_time_execution", "direct_actions"],
                cost_per_token=0.0,
                max_context=32000
            )
        }

        # System context for Elephant Alpha. Kept as an attribute for callers
        # that read it; analyze_and_coordinate() refreshes it on every call.
        self.system_prompt = self._build_system_prompt()

    def _build_system_prompt(self) -> str:
        """Build comprehensive system prompt for Elephant Alpha.

        The prompt embeds today's date, so it must be rebuilt whenever it is
        used rather than cached for the lifetime of the process.
        """
        return f"""You are Elephant Alpha, the Super Orchestrator for momo-pro-system e-commerce AI platform.

重要語言規定:
1. 所有文字欄位(strategic_assessment、reasoning、expected_outcome、execution_plan 的 description、risk_factors、contingency_plans)必須使用繁體中文(台灣用語)撰寫。
2. 【禁止翻譯 Agent 名稱】Hermes、NemoTron、OpenClaw、Elephant Alpha 是專有名詞,必須保留英文原名,嚴禁音譯(禁止:赫瑪斯、內莫特朗、開爪等)。
3. reasoning 欄位必須包含具體數字(如:競品價差 X%、SKU 數量 N 個、業績跌幅 X%),嚴禁使用空泛企業用語(如「提升轉化率」、「擴大利潤邊際」等無數據支撐的說法)。
4. expected_outcome 必須說明具體預期指標(如:預計 48h 內恢復 N 個 SKU 競爭力、降低平均價差至 X%)。

CURRENT ARCHITECTURE:
- You coordinate 3 specialized AI agents: Hermes (Analyst), NemoTron (Dispatcher), OpenClaw (Strategist)
- Your context window: 256,000 tokens - enables deep strategic reasoning
- Your role: Autonomous decision-making and agent orchestration

AGENT CAPABILITIES:
1. HERMES (hermes3:latest)
- Strengths: Price competition analysis, threat detection, market intelligence
- Limitations: Limited context window, no real-time data access
- Best for: Analyzing large datasets, identifying patterns, threat assessment

2. NEMOTRON (meta/llama-3.1-8b-instruct)
- Strengths: Tool calling, action dispatch, immediate decision making
- Limitations: Limited strategic depth, short-term focus
- Best for: Executing actions, immediate responses, tool-based operations

3. OPENCLAW (qwen2.5-coder:7b / qwen3:14b via Ollama cascade)
- Strengths: Strategic planning, market analysis, insight generation
- Limitations: No direct execution capabilities, analysis-only
- Best for: Long-term strategy, market insights, recommendation generation

Gemini policy:
- Gemini is not a primary agent model.
- Gemini may only be used after the approved Ollama cascade fails and the
operator explicitly unlocks the emergency fallback guard.

YOUR SUPERVISORY CAPABILITIES:
- Cross-agent coordination and optimization
- Strategic long-term planning (weeks/months ahead)
- Autonomous decision making with confidence scoring
- Resource allocation optimization
- Self-learning integration from past outcomes
- Conflict resolution between agent recommendations
- Priority-based task scheduling
- Risk assessment and mitigation planning

DECISION FRAMEWORK:
1. Analyze the business context and objectives
2. Evaluate which agents are best suited for the task
3. Determine optimal sequencing and parallelization
4. Assess resource requirements and constraints
5. Generate confidence-scored recommendations
6. Create detailed execution plans
7. Monitor and adapt based on outcomes

ALLOWED EXECUTION ACTIONS:
- hermes / analyze_price_competition
- nemotron / dispatch_alert
- elephant_alpha / auto_heal
- elephant_alpha / code_fix

OpenClaw strategy/report actions are advisory only. Do not place openclaw / generate_* actions in execution_plan; put strategic recommendations in strategic_assessment, reasoning, expected_outcome, or contingency_plans.

價格調整紅線:
- 禁止輸出 execute_price_adjustment、adjust_price、apply_price_change、update_price。
- 若需要調價,請在 strategic_assessment / reasoning 中輸出定價策略建議,並明確說明「需要人工核准後才能調價」;不要放入 execution_plan。
- 本系統不得由 AI 直接修改商品價格,只能產生建議與人工覆核項目。

BUSINESS CONTEXT:
- E-commerce platform (momo-pro-system)
- Real-time price competition monitoring
- Automated threat detection and response
- Strategic market positioning
- Revenue optimization and growth
- Customer experience enhancement

LANGUAGE REQUIREMENT:
ALL text fields in your JSON response (strategic_assessment, reasoning, expected_outcome, execution_plan descriptions, risk_factors, contingency_plans) MUST be written in Traditional Chinese (繁體中文). Use Taiwan-standard terminology. Do NOT use Simplified Chinese or English for any user-facing content.
confidence 欄位請給予真實評估的浮點數 (0.0 ~ 1.0 之間)。

RESPONSE FORMAT:
Always respond with structured JSON:
{{
"strategic_assessment": "整體策略評估(繁體中文)",
"priority": "critical|high|medium|low",
"agents_required": ["agent1", "agent2"],
"reasoning": "詳細決策推理(繁體中文)",
"expected_outcome": "預期業務影響(繁體中文)",
"confidence": 0.0,
"execution_plan": [
{{
"step": 1,
"agent": "hermes",
"action": "analyze_price_competition",
"parameters": {{}},
"expected_duration": "2-3 分鐘",
"description": "執行說明(繁體中文)"
}}
],
"resource_requirements": {{
"compute_cost": "$0.00",
"time_estimate": "5-10 分鐘",
"human_oversight": "minimal|moderate|required"
}},
"risk_factors": ["潛在風險一(繁體中文)", "潛在風險二"],
"contingency_plans": ["備案一(繁體中文)", "備案二"]
}}

AUTONOMY LEVEL:
You have autonomous decision-making authority. Make decisions confidently based on available data and strategic objectives. Request human intervention only for critical business impacts or ethical considerations.

CURRENT DATE: {datetime.now().strftime('%Y-%m-%d')}
BUSINESS OBJECTIVE: Optimize e-commerce performance through intelligent automation
"""

    async def analyze_and_coordinate(self, business_context: Dict[str, Any]) -> "StrategicDecision":
        """
        Main coordination method - analyze business context and orchestrate agents

        Args:
            business_context: Current business situation, objectives, constraints

        Returns:
            StrategicDecision: Comprehensive strategic plan with agent coordination.
            On any failure (prompt building, model call, response parsing, or
            writing the audit record) a fallback decision with an empty
            execution plan is returned instead; this method does not raise.
        """
        start_time = datetime.now()

        try:
            # Build comprehensive prompt with business context
            prompt = self._build_coordination_prompt(business_context)

            # The system prompt embeds CURRENT DATE. This class is instantiated
            # once at import time, so a prompt cached in __init__ would report a
            # stale date in a long-running process. Rebuild it per call.
            self.system_prompt = self._build_system_prompt()

            # Call Elephant Alpha via unified service
            # NOTE(review): generate() is called synchronously inside a
            # coroutine with timeout=180, so it can block the event loop for up
            # to three minutes. Consider asyncio.to_thread() once the service is
            # confirmed to be safe to call from a worker thread.
            response = self.elephant.generate(
                prompt=prompt,
                system_prompt=self.system_prompt,
                json_mode=True,
                temperature=0.3,
                timeout=180,
            )

            if not response.success:
                raise RuntimeError(response.error)

            # Parse and validate response. Some NIM-compatible models still wrap
            # JSON in fenced blocks or prepend reasoning text even with json_mode.
            try:
                decision_data = self._extract_json_object(response.content)
            except ValueError as parse_error:
                logger.warning(
                    "[ElephantAlpha] Coordination JSON parse failed; using evidence fallback. "
                    "model=%s error=%s preview=%r",
                    response.model,
                    parse_error,
                    (response.content or "")[:240],
                )
                return self._fallback_decision(
                    business_context,
                    reason="Elephant Alpha 回應不是可解析 JSON,已改用實證 fallback。",
                )
            decision = self._parse_strategic_decision(decision_data)

            # Log decision for learning. A failure here deliberately falls
            # through to the fallback below, so a decision that could not be
            # recorded is never handed to the caller.
            await self._log_decision(decision, business_context, start_time)

            return decision

        except Exception as e:
            # logger.exception keeps the traceback, which the message alone loses.
            logger.exception("[ElephantAlpha] Coordination failed: %s", e)
            # Fallback to conservative decision
            return self._fallback_decision(
                business_context,
                reason=f"Elephant Alpha 協調失敗:{type(e).__name__}",
            )

    @staticmethod
    def _extract_json_object(raw: str) -> Dict[str, Any]:
        """Extract one JSON object from tolerant LLM output.

        Accepts, in order: an already-parsed dict, a plain JSON document, a
        JSON document wrapped in a Markdown code fence, or free text that
        contains a JSON object somewhere inside it.

        Raises:
            ValueError: if the input is empty, if it is valid JSON whose root
                is not an object, or if no JSON object can be found.
        """
        # Some providers hand back an already-decoded payload in JSON mode.
        if isinstance(raw, dict):
            return raw

        candidate = (raw or "").strip()
        if not candidate:
            raise ValueError("empty response")

        # Strip a surrounding Markdown code fence (```json ... ```).
        if candidate.startswith("```"):
            fence_lines = candidate.splitlines()
            if fence_lines and fence_lines[0].strip().startswith("```"):
                fence_lines = fence_lines[1:]
            if fence_lines and fence_lines[-1].strip().startswith("```"):
                fence_lines = fence_lines[:-1]
            candidate = "\n".join(fence_lines).strip()

        try:
            parsed = json.loads(candidate)
        except JSONDecodeError:
            pass  # Not a clean document; fall through to the embedded-object scan.
        else:
            if isinstance(parsed, dict):
                return parsed
            raise ValueError("JSON root is not an object")

        # Scan for the first "{" from which a complete object can be decoded.
        decoder = json.JSONDecoder()
        start = candidate.find("{")
        while start != -1:
            try:
                parsed, _end = decoder.raw_decode(candidate, start)
            except JSONDecodeError:
                parsed = None
            if isinstance(parsed, dict):
                return parsed
            start = candidate.find("{", start + 1)

        raise ValueError("no JSON object found")

    @staticmethod
    def _to_prompt_json(payload: Any) -> str:
        """Render *payload* as indented JSON for inclusion in a prompt.

        ``ensure_ascii=False`` keeps Chinese text readable instead of expanding
        it to ``\\uXXXX`` escapes. ``default=str`` stops values that are not
        JSON-native (datetime, Decimal, ...) from raising and aborting the
        whole coordination round.
        """
        return json.dumps(payload, indent=2, ensure_ascii=False, default=str)

    def _build_coordination_prompt(self, context: Dict[str, Any]) -> str:
        """Build detailed coordination prompt for Elephant Alpha.

        Combines the caller's business context with recent insight metrics,
        per-agent status, and the pending action count read from the database.
        """

        # Get recent performance data for context
        recent_performance = self._get_recent_performance_metrics()

        # Get current agent statuses
        agent_status = self._get_agent_status()

        prompt = f"""
BUSINESS CONTEXT:
{self._to_prompt_json(context)}

RECENT PERFORMANCE METRICS:
{self._to_prompt_json(recent_performance)}

CURRENT AGENT STATUS:
{self._to_prompt_json(agent_status)}

CURRENT SITUATION:
- Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- System Load: {self._get_system_load()}
- Pending Actions: {self._get_pending_actions_count()}

REQUIRED DECISION:
Based on the current business context and system state, determine the optimal strategy and agent coordination. Consider:
1. Immediate priorities vs long-term objectives
2. Resource constraints and optimization opportunities
3. Risk factors and mitigation strategies
4. Agent capabilities and current workload
5. Historical performance and learning outcomes

Provide your strategic decision in the specified JSON format.

重要:所有 JSON 文字欄位必須使用繁體中文(台灣用語)。Hermes、NemoTron、OpenClaw、Elephant Alpha 保留英文原名勿翻譯。reasoning 必須引用上方數據中的具體數字。
"""
        return prompt

    # _call_elephant_alpha removed - logic moved to elephant_service

    @staticmethod
    def _normalize_confidence(value: Any, default: float = 0.5) -> float:
        """Coerce a model-supplied confidence into a float within [0.0, 1.0].

        Missing, non-numeric, and NaN values yield *default* instead of raising,
        so one malformed field does not discard an otherwise usable decision.
        """
        try:
            confidence = float(value)
        except (TypeError, ValueError):
            return default
        if confidence != confidence:  # NaN is the only value unequal to itself
            return default
        return min(1.0, max(0.0, confidence))

    def _parse_strategic_decision(self, decision_data: Dict[str, Any]) -> "StrategicDecision":
        """Parse Elephant Alpha response into StrategicDecision object.

        Every field is defensively normalized: the model may omit a key, send
        ``null``, or send the wrong JSON type.
        """
        priority = str(decision_data.get("priority") or "medium").strip().lower()
        if priority not in self._VALID_PRIORITIES:
            priority = "medium"

        agents = decision_data.get("agents_required")
        if isinstance(agents, str):
            agents = [agents]
        elif not isinstance(agents, list):
            agents = []

        plan = decision_data.get("execution_plan")
        # Keep only well-formed step objects.
        plan = [step for step in plan if isinstance(step, dict)] if isinstance(plan, list) else []

        resources = decision_data.get("resource_requirements")
        if not isinstance(resources, dict):
            resources = {}

        reasoning = decision_data.get("reasoning")
        expected_outcome = decision_data.get("expected_outcome")

        return StrategicDecision(
            priority=priority,
            agents_required=agents,
            reasoning="" if reasoning is None else str(reasoning),
            expected_outcome="" if expected_outcome is None else str(expected_outcome),
            confidence=self._normalize_confidence(decision_data.get("confidence", 0.5)),
            execution_plan=plan,
            resource_requirements=resources,
        )

    def _get_recent_performance_metrics(self) -> Dict[str, Any]:
        """Get recent performance metrics for context.

        Returns a dict keyed by insight type with the row count, average
        confidence, and most recent timestamp over the last seven days.
        """
        session = get_session()
        try:
            # Get recent AI insights performance.
            # The count column must not be aliased "count": attribute access
            # on a SQLAlchemy Row resolves that name to the tuple-style
            # count() method rather than to the column value.
            rows = session.execute(text("""
                SELECT
                    insight_type,
                    COUNT(*) as insight_count,
                    AVG(confidence) as avg_confidence,
                    MAX(created_at) as last_activity
                FROM ai_insights
                WHERE created_at >= NOW() - INTERVAL '7 days'
                GROUP BY insight_type
            """)).fetchall()

            metrics = {}
            for row in rows:
                metrics[row.insight_type] = {
                    "count": int(row.insight_count or 0),
                    "avg_confidence": float(row.avg_confidence) if row.avg_confidence else 0.0,
                    "last_activity": row.last_activity.isoformat() if row.last_activity else None
                }

            return metrics

        finally:
            session.close()

    def _get_agent_status(self) -> Dict[str, Any]:
        """Get current status of all agents.

        ``status`` and ``queue_size`` are fixed placeholder values; only the
        last-activity timestamps are read from the database.
        """
        return {
            "hermes": {
                "status": "active",
                "last_analysis": self._get_last_analysis_time("hermes"),
                "queue_size": 0
            },
            "nemotron": {
                "status": "active",
                "last_dispatch": self._get_last_analysis_time("nemotron"),
                "queue_size": 0
            },
            "openclaw": {
                "status": "active",
                "last_strategy": self._get_last_analysis_time("openclaw"),
                "queue_size": 0
            }
        }

    def _get_last_analysis_time(self, agent: str) -> Optional[str]:
        """Get last activity time for agent.

        Returns the ISO-8601 timestamp of the newest ``ai_insights`` row
        created by *agent*, or ``None`` when there is no such row.
        """
        session = get_session()
        try:
            row = session.execute(text("""
                SELECT MAX(created_at) as last_time
                FROM ai_insights
                WHERE created_by = :agent
            """), {"agent": agent}).fetchone()

            return row.last_time.isoformat() if row and row.last_time else None

        finally:
            session.close()

    def _get_system_load(self) -> str:
        """Get current system load"""
        return "normal"  # Placeholder - could integrate with monitoring

    def _get_pending_actions_count(self) -> int:
        """Get count of pending actions.

        Returns the number of ``action_plans`` rows whose status is 'pending'.
        """
        session = get_session()
        try:
            # scalar() returns the single COUNT(*) value directly, avoiding
            # attribute access on a Row, where "count" names a method.
            pending = session.execute(text("""
                SELECT COUNT(*) as pending_count
                FROM action_plans
                WHERE status = 'pending'
            """)).scalar()

            return int(pending or 0)

        finally:
            session.close()

    async def _log_decision(self, decision: "StrategicDecision", context: Dict[str, Any], start_time: datetime):
        """Log decision for learning and audit.

        Inserts one ``ai_insights`` row, then tries to enqueue it for
        embedding. A failed enqueue is only logged; a failed insert propagates
        to the caller.
        """
        session = get_session()
        try:
            # B4b FIX: metadata → metadata_json; confidence/created_by added by migration 015
            row = session.execute(text("""
                INSERT INTO ai_insights
                (insight_type, content, confidence, created_by, status, metadata_json)
                VALUES
                (:type, :content, :confidence, :created_by, :status, :metadata)
                RETURNING id
            """), {
                "type": "elephant_alpha_decision",
                # default=str: the context may carry values that are not
                # JSON-native; the audit record must not fail because of them.
                "content": json.dumps({
                    "decision": vars(decision),
                    "context": context,
                    "duration_seconds": (datetime.now() - start_time).total_seconds()
                }, default=str),
                "confidence": decision.confidence,
                "created_by": "elephant_alpha",
                "status": "executed",
                "metadata": json.dumps({"agents_required": decision.agents_required}, default=str)
            }).fetchone()
            session.commit()
            if row:
                try:
                    from services.openclaw_learning_service import enqueue_insight_embedding
                    enqueue_insight_embedding(row[0], "elephant_alpha_decision", json.dumps({
                        "decision": vars(decision),
                        "context": context,
                    }, ensure_ascii=False, default=str))
                except Exception as embed_err:
                    # Best effort: the decision row is already committed.
                    logger.warning("[ElephantAlpha] embedding queue enqueue failed: %s", embed_err)

        finally:
            session.close()

    @staticmethod
    def _context_concrete_actions(context: Dict[str, Any]) -> List[str]:
        """Return up to five evidence strings carried in ``context["conditions"]``.

        The keys ``_prefetched_hermes_threats`` and ``_db_evidence_actions``
        are checked in that order; the first one holding a non-empty list
        wins. ``None`` entries and blank strings are dropped. Returns an empty
        list when no usable evidence is present.
        """
        conditions = context.get("conditions") if isinstance(context, dict) else {}
        if not isinstance(conditions, dict):
            return []
        for key in ("_prefetched_hermes_threats", "_db_evidence_actions"):
            actions = conditions.get(key)
            if not isinstance(actions, list):
                continue
            # Skip None explicitly: str(None) would otherwise be kept as "None".
            stripped = (str(action).strip() for action in actions if action is not None)
            cleaned = [entry for entry in stripped if entry]
            if cleaned:
                return cleaned[:5]
        return []

    def _fallback_decision(self, context: Dict[str, Any], *, reason: str = "Elephant Alpha unavailable") -> "StrategicDecision":
        """Fallback decision if Elephant Alpha fails.

        Always returns a decision with an empty execution plan and
        ``human_oversight`` set to "required". Priority is "high" when the
        context carries concrete DB/Hermes evidence and "medium" otherwise.
        """
        concrete_actions = self._context_concrete_actions(context)
        # Tolerate a missing or non-dict context: this runs inside an error
        # handler and must not raise itself.
        context_map = context if isinstance(context, dict) else {}
        trigger_type = str(context_map.get("trigger_type") or "unknown")
        if concrete_actions:
            return StrategicDecision(
                priority="high",
                agents_required=["hermes", "elephant_alpha"],
                reasoning=(
                    f"{reason} 已保留 {len(concrete_actions)} 筆 DB/Hermes 價格比對實證;"
                    "僅送人工覆核,不執行自動調價。"
                ),
                expected_outcome="產生可稽核的人工覆核告警,避免使用無法解析的 LLM 推論文字。",
                confidence=0.74,
                execution_plan=[],
                resource_requirements={
                    "compute_cost": "$0.00",
                    "time_estimate": "人工覆核",
                    "human_oversight": "required",
                },
            )

        return StrategicDecision(
            priority="medium",
            agents_required=["elephant_alpha"],
            reasoning=(
                f"{reason} trigger={trigger_type},且沒有可稽核的 DB/Hermes 實證;"
                "不產生策略型行動計畫,避免把推測當成事實。"
            ),
            expected_outcome="不執行自動動作;需要先確認資料來源或等待下一輪具體實證。",
            confidence=0.6,
            execution_plan=[],
            resource_requirements={
                "compute_cost": "$0.00",
                "time_estimate": "等待實證",
                "human_oversight": "required",
            },
        )
|
||
|
||
# Singleton instance
|
||
# Shared module-level instance. It is constructed at import time, so importing
# this module runs ElephantAlphaOrchestrator.__init__ (agent registry and
# system prompt construction).
elephant_orchestrator = ElephantAlphaOrchestrator()
|