All checks were successful
CD Pipeline / deploy (push) Successful in 2m37s
- C1: Removed weekly_strategy and meta_analysis from Elephant Alpha's SAFE_ACTIONS (orchestrator prompt and engine _ACTION_ZH) to prevent autonomous generation of scheduled reports - C2: Removed hardcoded 0.85 confidence example from orchestrator prompt and implemented bounding validation (0.0~1.0) in _parse_strategic_decision - C3: Expanded ADR-019 data freshness gate (_ppt_check_data_freshness) to cover /ppt weekly and /ppt strategy routes, proactively warning users of stale data
946 lines
38 KiB
Python
946 lines
38 KiB
Python
"""
|
||
elephant_alpha_autonomous_engine.py
|
||
|
||
AI 3.0 Autonomous Operations:
|
||
- Self-learning from outcomes
|
||
- Predictive decision making
|
||
- Autonomous resource optimization
|
||
- Continuous improvement loop
|
||
|
||
ADR-012 Compliance:
|
||
§③ 單一 audit trail — 所有執行完畢後必發 triaged_alert Telegram
|
||
§⑤ 雙寫強制 — ai_insights (由 orchestrator._log_decision) + Telegram
|
||
ADR-013 Compliance:
|
||
resource_optimization trigger → auto_heal_service.handle_exception
|
||
"""
|
||
|
||
import asyncio
|
||
import inspect
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import sqlite3
|
||
import threading
|
||
from dataclasses import dataclass, asdict
|
||
from datetime import datetime, timedelta
|
||
from enum import Enum
|
||
from typing import Dict, List, Any, Optional
|
||
|
||
from sqlalchemy import text
|
||
from services.logger_manager import SystemLogger
|
||
from services.elephant_alpha_orchestrator import elephant_orchestrator, StrategicDecision
|
||
from database.manager import get_session
|
||
|
||
# Module-wide logger obtained from the project's SystemLogger under the
# name "ElephantAlphaEngine"; the engine class stores it as ``self._log``.
logger = SystemLogger("ElephantAlphaEngine").get_logger()
|
||
|
||
# ---- Configuration ----
|
||
# SSH settings. Every value can be overridden through an ELEPHANT_ALPHA_*
# environment variable; the literals below are the fallbacks.
SSH_JUMP_HOST = os.environ.get("ELEPHANT_ALPHA_JUMP_HOST", "192.168.0.110")
SSH_JUMP_USER = os.environ.get("ELEPHANT_ALPHA_JUMP_USER", "wooo")
SSH_KEY_PATH = os.environ.get(
    "ELEPHANT_ALPHA_SSH_KEY_PATH",
    # Default key lives in ../config relative to this module.
    os.path.join(os.path.dirname(__file__), "..", "config", "autoheal_id_ed25519"),
)
SSH_PORT = int(os.environ.get("ELEPHANT_ALPHA_SSH_PORT", "22"))
SSH_CONNECT_TIMEOUT = int(os.environ.get("ELEPHANT_ALPHA_SSH_CONNECT_TIMEOUT", "10"))
SSH_COMMAND_TIMEOUT = int(os.environ.get("ELEPHANT_ALPHA_SSH_COMMAND_TIMEOUT", "60"))

# Engine tuning.
# SQLite path for the escalation-dedup table; ":memory:" means the table
# lives only as long as the process.
CACHE_DB_PATH = os.environ.get("ELEPHANT_ALPHA_CACHE_DB", ":memory:")
# Minutes a trigger stays quiet after firing / escalating.
ESCALATION_COOLDOWN_MIN = int(os.environ.get("ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN", "30"))
# Minimum decision confidence for autonomous execution (starting value).
CONFIDENCE_THRESHOLD = float(os.environ.get("ELEPHANT_ALPHA_CONFIDENCE_THRESHOLD", "0.7"))
MAX_AUTONOMOUS_DECISIONS_PER_HOUR = int(
    os.environ.get("ELEPHANT_ALPHA_MAX_AUTONOMOUS_DECISIONS_PER_HOUR", "10")
)
|
||
|
||
# ---- Constants ----
|
||
# Closed set of low-level action type names.
# NOTE(review): not referenced by any code visible in this module —
# presumably consumed by importers; confirm before removing.
_ALLOWED_ACTION_TYPES = frozenset({
    "DOCKER_RESTART",
    "WAIT_RETRY",
    "ALERT_ONLY",
    "SSH_CMD",
    "CODE_FIX",
})

# The former empty ``_TRIGGER_TO_DECISION_TYPE = {}`` placeholder was dropped:
# the real mapping is assigned right after ``DecisionType`` is defined, and
# nothing reads the name in between.
|
||
|
||
|
||
class DecisionType(Enum):
    """Categories of decision the engine can make; values are the wire names."""

    # Pricing-related
    PRICE_OPTIMIZATION = "price_optimization"
    # Reaction to an escalating threat signal
    THREAT_RESPONSE = "threat_response"
    MARKET_OPPORTUNITY = "market_opportunity"
    # Operational / infrastructure decisions
    RESOURCE_ALLOCATION = "resource_allocation"
    STRATEGIC_PLANNING = "strategic_planning"
|
||
|
||
|
||
# Trigger name -> decision category.
_TRIGGER_TO_DECISION_TYPE = {
    "price_drop_alert": DecisionType.PRICE_OPTIMIZATION,
    "market_opportunity": DecisionType.MARKET_OPPORTUNITY,
    "threat_escalation": DecisionType.THREAT_RESPONSE,
    "resource_optimization": DecisionType.RESOURCE_ALLOCATION,
    # No dedicated category exists for code exceptions; they are filed
    # under resource allocation so they can still be handled.
    "code_exception": DecisionType.RESOURCE_ALLOCATION,
}
|
||
|
||
# Trigger name -> Traditional-Chinese label shown in notifications.
_TRIGGER_ZH = {
    "price_drop_alert": "價格下滑警報",
    "market_opportunity": "市場機會偵測",
    "threat_escalation": "威脅升級通報",
    "resource_optimization": "資源調配優化",
    "code_exception": "程式碼異常偵測",
    "weekly_insight": "全景電商洞察分析",
}

# Agent names stay in English; only a role description may be added
# (transliterating the names is not allowed).
_AGENT_LABEL = {
    "hermes": "Hermes",
    "nemotron": "NemoTron",
    "openclaw": "OpenClaw",
    "elephant_alpha": "Elephant Alpha",
    "scheduler": "Scheduler",
}

# Step action name -> Traditional-Chinese label. Singular and plural
# spellings of the price-update dispatch share one label.
_ACTION_ZH = {
    "analyze_price_competition": "競品價格分析",
    "dispatch_alert": "派送告警通知",
    "dispatch_price_updates": "派送定價更新",
    "dispatch_price_update": "派送定價更新",
    "generate_strategic_analysis": "產出策略分析",
    "generate_market_analysis": "市場分析",
    "generate_pricing_strategy": "定價策略建議",
    "execute_price_adjustment": "價格調整覆核",
    "adjust_price": "調整定價",
    "send_alert": "發送告警",
}

# Step actions that would change a price; the engine records these for
# human review instead of applying them.
_PRICE_ADJUSTMENT_REVIEW_ACTIONS = frozenset((
    "execute_price_adjustment",
    "adjust_price",
    "apply_price_change",
    "update_price",
    "dispatch_price_update",
    "dispatch_price_updates",
))
|
||
|
||
|
||
def _zh_trigger(trigger_type: str) -> str:
    """Return the Chinese label for *trigger_type*, or the key itself if unmapped."""
    try:
        return _TRIGGER_ZH[trigger_type]
    except KeyError:
        return trigger_type
|
||
|
||
|
||
def _zh_step(step: dict) -> str:
    """Render one execution-plan step as ``"[agent] detail"`` for notifications.

    Args:
        step: A plan step; the keys read are ``agent``, ``action`` and
            ``description``. All are optional.

    Returns:
        ``"[<agent label>] <detail>"`` where the detail is the step's
        description when non-empty, otherwise the translated action label
        (or the raw action name when no translation exists).
    """
    # Steps come from model output, so a key may be present with a null
    # value; coerce instead of letting ``None.lower()`` abort the caller's
    # notification.
    raw_agent = step.get("agent")
    agent_name = "?" if raw_agent is None else str(raw_agent)
    agent = _AGENT_LABEL.get(agent_name.lower(), agent_name)

    raw_action = step.get("action")
    action_key = "" if raw_action is None else str(raw_action)
    action = _ACTION_ZH.get(action_key, action_key)

    # Prefer the description (Traditional-Chinese text generated by Gemini);
    # fall back to the translated action label.
    desc = step.get("description") or ""
    detail = desc if desc else action
    return f"[{agent}] {detail}"
|
||
|
||
|
||
@dataclass
class DecisionOutcome:
    """Record of one past decision: what was predicted versus what happened."""

    # Identity
    decision_id: str
    decision_type: DecisionType

    # Prediction vs. reality
    prediction: Dict[str, Any]
    actual_outcome: Dict[str, Any]

    # Scores; the engine's learning step only averages accuracy scores > 0.
    accuracy_score: float
    business_impact: float

    # Time of the record; the learning step uses it for a 24-hour window.
    timestamp: datetime
    lessons_learned: List[str]
|
||
|
||
|
||
class AutonomousTrigger:
    """A named condition the engine polls, plus its firing bookkeeping.

    Attributes:
        trigger_type: Trigger name (e.g. ``"price_drop_alert"``).
        conditions: Free-form condition/config dictionary for the trigger.
        threshold: Numeric threshold configured for the trigger.
        enabled: Disabled triggers are skipped by the engine.
        last_triggered: When the trigger last fired, or ``None``.
        temp_error_msg: Scratch slot for error text captured while
            evaluating the trigger, or ``None``.
        temp_target_file: Scratch slot for the source file implicated by
            that error, or ``None``.
    """

    def __init__(
        self,
        trigger_type: str,
        conditions: Dict[str, Any],
        threshold: float,
        enabled: bool,
    ):
        self.trigger_type = trigger_type
        self.conditions = conditions
        self.threshold = threshold
        self.enabled = enabled
        self.last_triggered: Optional[datetime] = None
        # Plain attributes: the former properties only forwarded to a private
        # field, so reads and writes by callers behave exactly as before.
        self.temp_error_msg: Optional[str] = None
        self.temp_target_file: Optional[str] = None

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(trigger_type={self.trigger_type!r}, "
            f"threshold={self.threshold!r}, enabled={self.enabled!r}, "
            f"last_triggered={self.last_triggered!r})"
        )
|
||
|
||
|
||
class ElephantAlphaAutonomousEngine:
    """
    Elephant Alpha autonomous decision engine.

    Polls a fixed list of triggers. When one fires, the orchestrator is asked
    for a decision; the decision's plan is executed when its confidence is
    high enough, otherwise it is escalated for human review.

    Features:
    - Escalation dedup kept in SQLite (``CACHE_DB_PATH``; with the default
      ``:memory:`` it lasts only as long as the process)
    - SSH key permissions tightened to 0600 at start-up
    - Timeouts around external calls
    - Circuit breaker for repeated failures
    """

    def __init__(self):
        self._log = logger
        self._init_cache_db()
        self._init_ssh_key_once()
        # NOTE(review): nothing in this class appends to decision_history, so
        # _continuous_learning only has data if an outside caller adds it.
        self.decision_history: List[DecisionOutcome] = []
        self.triggers: List[AutonomousTrigger] = []
        self.confidence_threshold = CONFIDENCE_THRESHOLD
        self.max_autonomous_decisions_per_hour = MAX_AUTONOMOUS_DECISIONS_PER_HOUR
        self._initialize_triggers()
        self._circuit_breaker_state = {"failures": 0, "last_failure": None}
        self._cb_threshold = 5
        self._cb_reset_after = timedelta(minutes=5)

    # ---- DB ----
    def _init_cache_db(self) -> None:
        """Open the dedup SQLite database and create its table if missing."""
        self._db_lock = threading.Lock()
        # One shared connection; _store_escalation / _load_escalation take
        # _db_lock around every use.
        self._conn = sqlite3.connect(CACHE_DB_PATH, check_same_thread=False)
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS escalation_dedup (
                trigger_type TEXT PRIMARY KEY,
                last_triggered INTEGER NOT NULL
            )
        """)
        self._conn.commit()

    def _store_escalation(self, trigger_type: str) -> None:
        """Upsert "now" (epoch seconds) as the last escalation of *trigger_type*."""
        with self._db_lock:
            self._conn.execute(
                "INSERT OR REPLACE INTO escalation_dedup (trigger_type, last_triggered) VALUES (?, ?)",
                (trigger_type, int(datetime.now().timestamp())),
            )
            self._conn.commit()

    def _load_escalation(self, trigger_type: str) -> Optional[int]:
        """Return the stored epoch-seconds timestamp for *trigger_type*, or None."""
        with self._db_lock:
            row = self._conn.execute(
                "SELECT last_triggered FROM escalation_dedup WHERE trigger_type = ?",
                (trigger_type,),
            ).fetchone()
        return row[0] if row else None

    # ---- SSH ----
    def _init_ssh_key_once(self) -> None:
        """Restrict the SSH key file to mode 0600; only warn if it is missing."""
        key_path = os.path.expanduser(SSH_KEY_PATH)
        if not os.path.exists(key_path):
            self._log.warning("SSH key not found at %s; some operations may fail.", key_path)
            return
        try:
            os.chmod(key_path, 0o600)
        except Exception as e:
            # Best effort: e.g. a read-only mount must not stop the engine.
            self._log.warning("Failed to secure SSH key permissions: %s", e)

    # ---- Triggers ----
    def _initialize_triggers(self) -> None:
        """(Re)build the built-in trigger list."""
        self.triggers = [
            AutonomousTrigger(
                trigger_type="price_drop_alert",
                conditions={"competitor_price_drop_pct": 15, "sales_velocity": "decreasing"},
                threshold=0.8,
                enabled=True,
            ),
            AutonomousTrigger(
                trigger_type="market_opportunity",
                conditions={"competitor_price_premium": ">5%", "our_stock": "available"},
                threshold=0.7,
                enabled=True,
            ),
            AutonomousTrigger(
                trigger_type="threat_escalation",
                conditions={"threat_score": 0.9, "trend": "worsening"},
                threshold=0.85,
                enabled=True,
            ),
            AutonomousTrigger(
                trigger_type="resource_optimization",
                conditions={"system_load": "high", "queue_size": ">10"},
                threshold=0.6,
                enabled=True,
            ),
            AutonomousTrigger(
                trigger_type="code_exception",
                conditions={
                    "scan_containers": ["momo-pro-system", "momo-scheduler"],
                    "error_patterns": ["Traceback", "ImportError", "RuntimeError", "ModuleNotFoundError"],
                },
                threshold=1.0,
                enabled=True,
            ),
            AutonomousTrigger(
                trigger_type="weekly_insight",
                conditions={"min_new_insights": 5, "cooldown_hours": 6},
                threshold=0.7,
                # weekly_strategy is sent once by run_scheduler.py on Mondays
                # at 06:00; EA no longer fires it every 6h (prevents 35+
                # duplicates per week).
                enabled=False,
            ),
        ]

    # ---- Main loop ----
    async def start_autonomous_monitoring(self) -> None:
        """Run the monitoring loop until the task is cancelled.

        Each pass checks triggers, runs the learning step and the resource
        step, then sleeps 60 s. Any other exception is logged and the loop
        resumes after 30 s.
        """
        self._log.info("Starting autonomous monitoring engine")
        while True:
            try:
                await self._check_triggers()
                await self._continuous_learning()
                await self._optimize_resources()
                await asyncio.sleep(60)
            except asyncio.CancelledError:
                # Cancellation must propagate so the task can actually stop.
                raise
            except Exception as e:
                self._log.exception("Autonomous monitoring error: %s", e)
                await asyncio.sleep(30)

    # ---- Trigger evaluation ----
    async def _check_triggers(self) -> None:
        """Evaluate every enabled trigger that is out of cooldown; act on hits."""
        if self._is_circuit_open():
            self._log.warning("Circuit breaker open; skipping trigger checks")
            return

        for trigger in self.triggers:
            if not trigger.enabled:
                continue
            cooldown_min = self._get_cooldown(trigger.trigger_type)
            last = trigger.last_triggered
            if last and (datetime.now() - last).total_seconds() / 60 < cooldown_min:
                continue
            if await self._evaluate_trigger(trigger):
                await self._execute_autonomous_decision(trigger)
                trigger.last_triggered = datetime.now()

    def _get_cooldown(self, trigger_type: str) -> int:
        """Alias of :meth:`_get_cooldown_min`."""
        return self._get_cooldown_min(trigger_type)

    def _get_cooldown_min(self, trigger_type: str) -> int:
        """Return the cooldown in minutes (currently the same for every trigger)."""
        return ESCALATION_COOLDOWN_MIN

    async def _evaluate_trigger(self, trigger: AutonomousTrigger) -> bool:
        """Dispatch to the checker for the trigger's type.

        Returns False for unknown types and when the checker raises (the
        exception is logged, not propagated).
        """
        try:
            if trigger.trigger_type == "price_drop_alert":
                return await self._check_price_drop_trigger(trigger)
            if trigger.trigger_type == "market_opportunity":
                return await self._check_market_opportunity_trigger(trigger)
            if trigger.trigger_type == "threat_escalation":
                return await self._check_threat_escalation_trigger(trigger)
            if trigger.trigger_type == "resource_optimization":
                return await self._check_resource_optimization_trigger(trigger)
            if trigger.trigger_type == "code_exception":
                return await self._check_code_exception_trigger(trigger)
            if trigger.trigger_type == "weekly_insight":
                return await self._check_weekly_insight_trigger(trigger)
        except Exception as e:
            self._log.exception("Trigger evaluation error: %s", e)
        return False

    # ---- Individual trigger checkers ----
    async def _check_price_drop_trigger(self, trigger: AutonomousTrigger) -> bool:
        """True when at least 3 products are undercut by more than 15%.

        A product counts when a non-expired competitor price crawled within
        the last 2 hours is below 85% of the product's latest recorded price.
        The query fetches at most 10 rows and uses PostgreSQL ``DISTINCT ON``.
        *trigger* is not consulted.
        """
        session = get_session()
        try:
            rows = session.execute(
                text("""
                    SELECT p.i_code AS sku, p.name, p.category,
                    cp.price AS competitor_price, pr.price AS momo_price,
                    ((pr.price - cp.price) / pr.price * 100) AS price_gap_pct
                    FROM products p
                    JOIN (
                    SELECT DISTINCT ON (product_id) product_id, price
                    FROM price_records
                    ORDER BY product_id, timestamp DESC
                    ) pr ON pr.product_id = p.id
                    JOIN competitor_prices cp ON cp.sku = p.i_code
                    WHERE cp.expires_at > NOW()
                    AND cp.price < pr.price * 0.85
                    AND cp.crawled_at >= NOW() - INTERVAL '2 hours'
                    LIMIT 10
                """)
            ).fetchall()
            return len(rows) >= 3
        finally:
            session.close()

    async def _check_market_opportunity_trigger(self, trigger: AutonomousTrigger) -> bool:
        """True when any competitor price is more than 5% above ours.

        Considers non-expired competitor prices crawled within the last hour,
        compared with each product's latest recorded price. *trigger* is not
        consulted.
        """
        session = get_session()
        try:
            rows = session.execute(
                text("""
                    SELECT p.i_code AS sku
                    FROM products p
                    JOIN (
                    SELECT DISTINCT ON (product_id) product_id, price
                    FROM price_records
                    ORDER BY product_id, timestamp DESC
                    ) pr ON pr.product_id = p.id
                    JOIN competitor_prices cp ON cp.sku = p.i_code
                    WHERE cp.expires_at > NOW()
                    AND cp.price > pr.price * 1.05
                    AND cp.crawled_at >= NOW() - INTERVAL '1 hour'
                    LIMIT 5
                """)
            ).fetchall()
            return bool(rows)
        finally:
            session.close()

    async def _check_threat_escalation_trigger(self, trigger: AutonomousTrigger) -> bool:
        """True when at least 2 recent high-confidence "worsening" alerts exist.

        Counts ``price_alert`` insights from the last 30 minutes with
        confidence >= 0.9 whose metadata text contains ``worsening``.
        *trigger* is not consulted.
        """
        session = get_session()
        try:
            rows = session.execute(
                text("""
                    SELECT product_sku AS sku, confidence, content, created_at
                    FROM ai_insights
                    WHERE insight_type = 'price_alert'
                    AND confidence >= 0.9
                    AND created_at >= NOW() - INTERVAL '30 minutes'
                    AND metadata_json LIKE '%worsening%'
                    LIMIT 5
                """)
            ).fetchall()
            return len(rows) >= 2
        finally:
            session.close()

    async def _check_resource_optimization_trigger(self, trigger: AutonomousTrigger) -> bool:
        """True when more than 10 action plans are pending or load exceeds 80%."""
        return (self._get_action_queue_size() > 10
                or self._get_system_load_percentage() > 80)

    @staticmethod
    def _relative_source_path(path: str) -> str:
        """Strip the deployment prefix from a traceback file path.

        Everything up to and including the FIRST ``/app/`` is removed; failing
        that, everything up to the first ``momo-pro-system/``. Paths with
        neither marker are returned unchanged.
        """
        for marker in ("/app/", "momo-pro-system/"):
            if marker in path:
                # maxsplit=1 keeps the whole remainder even when the marker
                # occurs again later in the path.
                return path.split(marker, 1)[1]
        return path

    @staticmethod
    def _extract_first_traceback(log_text: str) -> Optional[tuple]:
        """Pull the first traceback out of *log_text*.

        Returns:
            ``None`` when the text has no ``Traceback (most recent call
            last):`` header. Otherwise ``(block, target_file)``: *block* is
            the first line containing "Traceback" plus up to 14 following
            lines; *target_file* is the relative path of the first file under
            services/, routes/ or database/ named in the block, or ``""``.
        """
        if "Traceback (most recent call last):" not in log_text:
            return None
        lines = log_text.splitlines()
        for i, line in enumerate(lines):
            if "Traceback" not in line:
                continue
            blk_str = "\n".join(lines[i:i + 15])
            target = ""
            m = re.search(r'File "([^"]*/(services|routes|database)/[^"]+\.py)"', blk_str)
            if m:
                target = ElephantAlphaAutonomousEngine._relative_source_path(m.group(1))
            return blk_str, target
        return None

    async def _check_code_exception_trigger(self, trigger: AutonomousTrigger) -> bool:
        """Scan the last 5 minutes of container logs for Python tracebacks.

        On a hit, the collected traceback text and the implicated source file
        are stored on *trigger* (``temp_error_msg`` / ``temp_target_file``).

        Returns:
            True when at least one container log contained a traceback;
            False otherwise, including when the SSH key file is missing.
        """
        containers = trigger.conditions.get("scan_containers", ["momo-pro-system", "momo-scheduler"])
        # NOTE(review): conditions["error_patterns"] is configured but not
        # consulted; only the standard traceback header is matched.

        key_path = os.path.expanduser(SSH_KEY_PATH)
        if not os.path.exists(key_path):
            self._log.warning("SSH key missing for code exception scan: %s", key_path)
            return False

        error_context: List[str] = []
        target_file = ""

        for c in containers:
            try:
                # _ssh_exec blocks on a subprocess; run it in a worker thread
                # so the event loop is not stalled while it waits.
                rc, out, err = await asyncio.to_thread(
                    self._ssh_exec,
                    "192.168.0.188",
                    "ollama",
                    ["docker", "logs", "--since", "5m", c],
                )
                if rc != 0:
                    self._log.debug("Failed to fetch logs for %s via SSH", c)
                    continue
                found = self._extract_first_traceback(out + "\n" + err)
                if found is None:
                    continue
                blk_str, tf = found
                if tf:
                    # A later container without a matching file keeps the
                    # earlier target.
                    target_file = tf
                error_context.append(f"[{c}] {blk_str}")
            except Exception as e:
                self._log.debug("Error scanning container %s: %s", c, e)

        if error_context:
            trigger.temp_error_msg = "\n".join(error_context)
            trigger.temp_target_file = target_file
            return True
        return False

    async def _check_weekly_insight_trigger(self, trigger: AutonomousTrigger) -> bool:
        """
        True when enough new ai_insights rows accumulated in the last 6 hours.

        Counts ``price_alert`` / ``recommendation`` / ``relearn_event`` rows
        not created by ``openclaw`` and compares the count with the trigger's
        ``min_new_insights`` condition (default 5). Query errors are logged at
        debug level and yield False. Repeat firing is limited by the caller's
        cooldown.
        """
        min_new = trigger.conditions.get("min_new_insights", 5)
        session = get_session()
        try:
            row = session.execute(text("""
                SELECT COUNT(*) FROM ai_insights
                WHERE created_at >= NOW() - INTERVAL '6 hours'
                AND insight_type IN ('price_alert', 'recommendation', 'relearn_event')
                AND created_by != 'openclaw'
            """)).fetchone()
            count = int(row[0]) if row else 0
            return count >= min_new
        except Exception as e:
            self._log.debug("weekly_insight trigger check failed: %s", e)
            return False
        finally:
            session.close()

    # ---- Decision execution ----
    async def _build_trigger_context(self, trigger: AutonomousTrigger) -> Dict[str, Any]:
        """Build the business context passed to Elephant Alpha.

        Includes the trigger's configuration, a system-state snapshot, fixed
        objectives and constraints, and, when the trigger carries them, the
        captured error text (first 4000 characters) and target file.
        """
        context = {
            "trigger_type": trigger.trigger_type,
            "trigger_label": _zh_trigger(trigger.trigger_type),
            "conditions": trigger.conditions,
            "threshold": trigger.threshold,
            "enabled": trigger.enabled,
            "last_triggered": trigger.last_triggered.isoformat() if trigger.last_triggered else None,
            "generated_at": datetime.now().isoformat(),
            "system_state": {
                "action_queue_size": self._safe_metric(self._get_action_queue_size, default=0),
                "system_load_pct": self._safe_metric(self._get_system_load_percentage, default=0.0),
                "circuit_breaker_failures": self._circuit_breaker_state.get("failures", 0),
                "confidence_threshold": self.confidence_threshold,
                "max_autonomous_decisions_per_hour": self.max_autonomous_decisions_per_hour,
            },
            "objectives": [
                "維持 MOMO 商品監控與業績分析服務可用性",
                "降低重複告警與人工排查成本",
                "在低風險範圍內產生可稽核的自動化行動建議",
            ],
            "constraints": [
                "不得使用 docker compose down 或 --remove-orphans",
                "不得操作 momo-db 容器生命週期",
                "P1/P2 行動需保留 audit trail 與 Telegram 稽核通知",
            ],
        }

        if trigger.temp_error_msg:
            context["error_context"] = trigger.temp_error_msg[:4000]
        if trigger.temp_target_file:
            context["target_file"] = trigger.temp_target_file
        return context

    @staticmethod
    def _safe_metric(fn, default):
        """Call *fn* and return its result, or *default* if it raises."""
        try:
            return fn()
        except Exception:
            return default

    async def _execute_autonomous_decision(self, trigger: AutonomousTrigger) -> None:
        """Ask the orchestrator for a decision, then execute or escalate it.

        Analysis and execution failures are logged and counted against the
        circuit breaker; neither is re-raised.
        """
        context = await self._build_trigger_context(trigger)
        try:
            decision = await self._run_with_timeout(
                elephant_orchestrator.analyze_and_coordinate,
                context,
                timeout=SSH_COMMAND_TIMEOUT,
            )
        except Exception as e:
            self._log.exception("Orchestrator analysis failed: %s", e)
            self._handle_failure(trigger, "analysis_failure")
            return

        # The two price-related triggers use a fixed 0.85 bar; every other
        # trigger uses the adaptive threshold.
        if decision.confidence >= (0.85 if trigger.trigger_type in {"price_drop_alert", "market_opportunity"} else self.confidence_threshold):
            try:
                await self._run_with_timeout(
                    self._execute_decision,
                    decision,
                    timeout=SSH_COMMAND_TIMEOUT,
                )
                self._store_escalation(trigger.trigger_type)
                self._log.info("Autonomous decision executed: %s", trigger.trigger_type)
                await self._notify_telegram_executed(decision, trigger)
                self._circuit_reset()
            except Exception as e:
                self._log.exception("Decision execution failed: %s", e)
                self._handle_failure(trigger, "execution_failure")
        else:
            self._log.warning("Low confidence decision; escalating: %s", trigger.trigger_type)
            await self._escalate_to_human(decision, trigger)

    async def _execute_decision(self, decision: StrategicDecision) -> None:
        """Run the plan's steps in order; the first failure is logged and re-raised."""
        for step in decision.execution_plan:
            try:
                await self._execute_step(step)
                self._log.info("Step done: %s -> %s", step.get("agent"), step.get("action"))
            except Exception as e:
                self._log.error("Step failed: %s", e)
                raise

    async def _execute_step(self, step: Dict[str, Any]) -> None:
        """Route one plan step to its handler.

        Raises:
            ValueError: If no handler matches the step's agent/action.
        """
        # Steps come from model output; a null agent is treated as empty.
        agent_type = str(step.get("agent") or "").lower()
        action = step.get("action", "")
        params = step.get("parameters") or step.get("params") or {}

        if agent_type == "hermes" and action == "analyze_price_competition":
            return await self._run_with_timeout(
                self._hermes_analyze,
                timeout=SSH_COMMAND_TIMEOUT,
            )

        if agent_type == "nemotron" and action == "dispatch_alert":
            raw = params.get("threats", [])
            threats = [self._parse_threat(t) for t in raw if isinstance(t, dict)]
            if threats:
                return await self._run_with_timeout(
                    self._dispatch_alerts,
                    threats,
                    timeout=SSH_COMMAND_TIMEOUT,
                )
            return

        # NOTE: dispatch of the openclaw weekly_strategy / meta_analysis /
        # strategic_analysis / market_analysis / pricing_strategy actions was
        # removed. weekly_strategy is sent once by run_scheduler.py on Mondays
        # at 06:00 (prevents EA's 6h trigger from sending 35+ duplicates per
        # week). If the orchestrator still returns one of these actions it
        # reaches the ValueError below, propagates through _execute_decision,
        # and is counted against the circuit breaker.

        if action in {"auto_heal", "resource_optimization", "optimize_resources", "handle_exception"}:
            return await self._run_with_timeout(
                self._run_auto_heal,
                "scheduler_task_failure",
                params,
                timeout=SSH_COMMAND_TIMEOUT,
            )

        if action in {"code_fix", "fix_code_exception", "handle_code_exception"}:
            return await self._run_with_timeout(
                self._run_auto_heal,
                "python_exception",
                params,
                timeout=SSH_COMMAND_TIMEOUT,
            )

        if action in _PRICE_ADJUSTMENT_REVIEW_ACTIONS:
            return await self._run_with_timeout(
                self._record_price_adjustment_review,
                step,
                timeout=SSH_COMMAND_TIMEOUT,
            )

        raise ValueError(f"Unrecognized step: agent={agent_type} action={action}")

    # ---- Sub-services ----
    async def _hermes_analyze(self) -> Any:
        """Run the Hermes analyst service under the command timeout."""
        from services.hermes_analyst_service import HermesAnalystService
        return await self._run_with_timeout(HermesAnalystService().run, timeout=SSH_COMMAND_TIMEOUT)

    async def _dispatch_alerts(self, threats: List[Any]) -> Any:
        """Hand *threats* to the Nemotron dispatcher under the command timeout."""
        from services.nemoton_dispatcher_service import NemotronDispatcher
        return await self._run_with_timeout(
            NemotronDispatcher().dispatch,
            threats,
            timeout=SSH_COMMAND_TIMEOUT,
        )

    async def _generate_strategy_report(self) -> Any:
        """Always raise: weekly strategy reports are owned by run_scheduler.py."""
        # Defense in depth: even if a caller is added later, this method fails
        # immediately so EA cannot bypass dedupe and resend weekly_strategy.
        # Sole owner of the weekly_strategy path: run_scheduler.py, Mondays 06:00.
        raise RuntimeError(
            "EA autonomous engine no longer generates weekly/meta reports — "
            "owned by run_scheduler.py"
        )

    async def _generate_meta_report(self) -> Any:
        """Always raise: meta_analysis is no longer triggered by EA."""
        # Same reasoning as _generate_strategy_report.
        raise RuntimeError(
            "EA autonomous engine no longer generates weekly/meta reports — "
            "owned by run_scheduler.py"
        )

    def _run_auto_heal(self, error_type: str, context: Dict[str, Any]) -> Any:
        """Forward an incident to ``auto_heal_service.handle_exception``.

        A copy of *context* is sent; ``source`` and ``error_type`` are filled
        in only when the caller did not provide them.
        """
        from services.auto_heal_service import auto_heal_service
        payload = dict(context or {})
        payload.setdefault("source", "ElephantAlphaAutonomousEngine")
        payload.setdefault("error_type", error_type)
        return auto_heal_service.handle_exception(error_type=error_type, context=payload)

    def _record_price_adjustment_review(self, step: Dict[str, Any]) -> Dict[str, Any]:
        """
        Record a proposed price change for human review instead of applying it.

        Price changes are business-critical. Elephant Alpha may recommend
        them, but this system stores the proposal as a pending
        ``human_review`` insight for HITL review.

        Returns:
            ``{"status": "pending_review", "insight_id", "sku", "action"}``.

        Raises:
            Exception: Database errors are re-raised after a rollback.
        """
        params = step.get("parameters") or step.get("params") or {}
        # The SKU may arrive under several keys; the first truthy one wins.
        sku = (
            params.get("sku")
            or params.get("product_sku")
            or params.get("i_code")
            or params.get("item_id")
            or "unknown"
        )
        action = step.get("action", "price_adjustment")
        content = (
            f"[Elephant Alpha 價格調整覆核] AI 建議執行 {action},"
            f"商品 {sku} 已攔截直接執行並轉入人工審核。"
        )

        session = get_session()
        try:
            row = session.execute(
                text("""
                    INSERT INTO ai_insights
                    (insight_type, content, confidence, created_by, status, metadata_json)
                    VALUES (:type, :content, :confidence, :created_by, :status, :metadata)
                    RETURNING id
                """),
                {
                    "type": "human_review",
                    "content": content,
                    "confidence": 0.8,
                    "created_by": "elephant_alpha",
                    "status": "pending",
                    "metadata": json.dumps({
                        "source": "price_adjustment_review",
                        "step": step,
                        "sku": sku,
                        "reason": "price_adjustment_requires_human_approval",
                    }, ensure_ascii=False),
                },
            ).fetchone()
            session.commit()
            insight_id = row[0] if row else None
            if insight_id:
                try:
                    from services.openclaw_learning_service import enqueue_insight_embedding
                    enqueue_insight_embedding(insight_id, "human_review", content)
                except Exception as embed_err:
                    # Embedding is secondary; the review record is already committed.
                    self._log.warning("Embedding enqueue failed for price adjustment review: %s", embed_err)
            self._log.warning("Price adjustment intercepted for HITL review: action=%s sku=%s", action, sku)
            return {"status": "pending_review", "insight_id": insight_id, "sku": sku, "action": action}
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    # ---- Notification ----
    async def _notify_telegram_executed(
        self,
        decision: StrategicDecision,
        trigger: AutonomousTrigger,
    ) -> None:
        """Send the post-execution audit message to Telegram (never raises)."""
        try:
            from services.telegram_templates import _send_telegram_raw

            trigger_zh = _zh_trigger(trigger.trigger_type)
            steps = [_zh_step(s) for s in decision.execution_plan[:5]] or ["(無執行步驟)"]
            steps_text = "\n".join(f" • {s}" for s in steps)

            # Reasoning is expected to carry data; anything shorter than 30
            # characters is replaced with a placeholder.
            reasoning = (decision.reasoning or "").strip()
            if len(reasoning) < 30:
                reasoning = "(AI 推理未提供足夠細節)"

            # NOTE(review): expected_outcome, reasoning and step text are
            # interpolated unescaped into an HTML-tagged message — confirm the
            # parse mode of _send_telegram_raw and whether escaping is needed.
            msg = (
                f"<b>⚡ 🐘 Elephant Alpha · 自主執行 · {trigger.trigger_type}</b>\n"
                f"📌 <b>{trigger_zh}</b>\n\n"
                f"🔍 <b>預期效益:</b>{(decision.expected_outcome or '').strip()}\n\n"
                f"🧠 <b>決策依據:</b>{reasoning[:400]}\n\n"
                f"✅ <b>已執行(信心 {decision.confidence:.0%}):</b>\n{steps_text}"
            )
            await self._run_with_timeout(_send_telegram_raw, msg, timeout=10)
            self._log.info("Telegram audit sent: %s", trigger.trigger_type)
        except Exception as e:
            self._log.error("Telegram audit failed (non-blocking): %s", e)

    async def _escalate_to_human(self, decision: StrategicDecision, trigger: AutonomousTrigger) -> None:
        """Record a low-confidence decision for human review and notify once.

        A pending ``human_review`` insight is always written (write errors are
        logged, not raised). The Telegram notification is sent only when no
        escalation for this trigger type is stored within the cooldown window.
        """
        self._log.warning("Escalating to human: %s", trigger.trigger_type)
        session = get_session()
        try:
            row = session.execute(
                text("""
                    INSERT INTO ai_insights
                    (insight_type, content, confidence, created_by, status, metadata_json)
                    VALUES (:type, :content, :conf, :by, :status, :meta)
                    RETURNING id
                """),
                {
                    "type": "human_review",
                    "content": (
                        f"[Elephant Alpha 升級審核] {trigger.trigger_type} "
                        f"信心度僅 {decision.confidence:.2f},建議人工介入。"
                    ),
                    "conf": decision.confidence,
                    "by": "elephant_alpha",
                    "status": "pending",
                    # default=str: StrategicDecision is defined elsewhere and
                    # may hold values json cannot encode natively; stringify
                    # them rather than lose the whole review record.
                    "meta": json.dumps({
                        "decision": asdict(decision),
                        "trigger": trigger.trigger_type,
                        "reason": "low_confidence"
                    }, default=str),
                },
            ).fetchone()
            session.commit()
            if row:
                try:
                    from services.openclaw_learning_service import enqueue_insight_embedding
                    enqueue_insight_embedding(
                        row[0],
                        "human_review",
                        f"[Elephant Alpha 升級審核] {trigger.trigger_type} 信心度僅 {decision.confidence:.2f}",
                    )
                except Exception as embed_err:
                    self._log.warning("Embedding enqueue failed for human_review: %s", embed_err)
        except Exception as e:
            self._log.error("DB escalation write failed: %s", e)
            session.rollback()
        finally:
            session.close()

        dedup_ts = self._load_escalation(trigger.trigger_type)
        cooldown_min = self._get_cooldown_min(trigger.trigger_type)
        if not dedup_ts or (datetime.now().timestamp() - dedup_ts) / 60 >= cooldown_min:
            # Stamp first: a failed send is not retried until the cooldown passes.
            self._store_escalation(trigger.trigger_type)
            try:
                from services.telegram_templates import triaged_alert, _send_telegram_raw
                msg, keyboard = triaged_alert(
                    base_event={
                        "event_type": "ea_escalation",
                        "title": f"🐘 EA 升級審核 · {_zh_trigger(trigger.trigger_type)}",
                        "summary": (
                            f"自主決策信心度 {decision.confidence:.2f} 低於門檻,需人工批准"
                        ),
                        "id": f"ea_review_{int(datetime.now().timestamp())}",
                    },
                    tier_label="🐘 Elephant Alpha · L3 HITL",
                    ai_summary=(decision.reasoning or "")[:300],
                    ai_cause=(
                        f"觸發類型:{_zh_trigger(trigger.trigger_type)} | "
                        f"信心度:{decision.confidence:.2f} | "
                        f"參與模組:{', '.join(_AGENT_LABEL.get(a.lower(), a) for a in decision.agents_required)}"
                    ),
                    ai_actions=[
                        f"步驟 {s.get('step', i+1)}:{_zh_step(s)}"
                        for i, s in enumerate(decision.execution_plan[:3])
                    ] or ["無具體執行計畫"],
                )
                await self._run_with_timeout(_send_telegram_raw, msg, timeout=10, reply_markup=keyboard)
                self._log.info("Human escalation Telegram sent: %s", trigger.trigger_type)
            except Exception as e:
                self._log.error("Telegram escalation failed (non-blocking): %s", e)

    # ---- Resource Optimization ----
    def _ssh_exec(self, host: str, user: str, cmd: Any, timeout: int = 120) -> tuple:
        """Execute a command on *host* over SSH via the configured jump host.

        Args:
            host: Target host.
            user: Login user on the target host.
            cmd: A list of arguments, or a single value used as one argument.
            timeout: Seconds before the local ``ssh`` process is abandoned.

        Returns:
            ``(returncode, stdout, stderr)``. Any local failure, including a
            timeout, is reported as ``(-1, "", <error text>)``; this method
            does not raise.
        """
        import subprocess
        remote_cmd = [str(part) for part in cmd] if isinstance(cmd, list) else [str(cmd)]
        full_cmd = [
            "ssh",
            "-p", str(SSH_PORT),
            # Expand "~" the same way the key checks elsewhere in this class do.
            "-i", os.path.expanduser(SSH_KEY_PATH),
            # NOTE(review): host-key checking is disabled.
            "-o", "StrictHostKeyChecking=no",
            "-o", f"ConnectTimeout={SSH_CONNECT_TIMEOUT}",
            "-J", f"{SSH_JUMP_USER}@{SSH_JUMP_HOST}",
            f"{user}@{host}",
            "--",
            *remote_cmd,
        ]
        try:
            res = subprocess.run(
                full_cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
            )
            return res.returncode, res.stdout, res.stderr
        except Exception as e:
            return -1, "", str(e)

    def _is_circuit_open(self) -> bool:
        """Report whether the breaker is open, closing it once the reset window passes.

        The breaker is open when the failure count has reached the threshold
        and the last failure is no older than the reset interval. When the
        interval has elapsed, the counters are cleared and False is returned.
        """
        cb = self._circuit_breaker_state
        if cb["failures"] >= self._cb_threshold:
            if not cb["last_failure"]:
                cb["last_failure"] = datetime.now()
            if datetime.now() - cb["last_failure"] > self._cb_reset_after:
                cb["failures"] = 0
                cb["last_failure"] = None
                return False
            return True
        return False

    def _circuit_reset(self) -> None:
        """Clear the breaker's failure count and timestamp."""
        self._circuit_breaker_state["failures"] = 0
        self._circuit_breaker_state["last_failure"] = None

    def _handle_failure(self, trigger: AutonomousTrigger, reason: str) -> None:
        """Count a failure against the breaker and stamp the trigger's escalation time."""
        self._circuit_breaker_state["failures"] += 1
        self._circuit_breaker_state["last_failure"] = datetime.now()
        self._log.warning("Failure reason=%s trigger=%s", reason, trigger.trigger_type)
        self._store_escalation(trigger.trigger_type)

    def _get_action_queue_size(self) -> int:
        """Return the number of ``action_plans`` rows with status ``pending``."""
        session = get_session()
        try:
            row = session.execute(
                text("SELECT COUNT(*) AS count FROM action_plans WHERE status = 'pending'")
            ).fetchone()
            # Index by position: on SQLAlchemy 1.4+/2.x ``row.count`` is the
            # Sequence method, not the column labelled "count".
            return int(row[0]) if row else 0
        finally:
            session.close()

    def _get_system_load_percentage(self) -> float:
        """Return CPU utilisation in percent.

        Uses ``psutil`` when it can be imported; otherwise estimates the load
        as 5% per pending action, capped at 90%.
        """
        try:
            import psutil
            return float(psutil.cpu_percent(interval=0.1))
        except ImportError:
            return min(90.0, float(self._get_action_queue_size() * 5.0))

    @staticmethod
    async def _run_with_timeout(coro, *args, timeout: int = 30, **kwargs):
        """Call *coro* with the given arguments, limited to *timeout* seconds.

        Coroutine functions are awaited directly. Any other callable runs in
        a worker thread; if it returns an awaitable, that is awaited too.

        Raises:
            TimeoutError: When the limit is exceeded. A callable already
                running in a worker thread is not interrupted by this.
        """
        try:
            if inspect.iscoroutinefunction(coro):
                awaitable = coro(*args, **kwargs)
            else:
                async def _call_sync():
                    result = await asyncio.to_thread(coro, *args, **kwargs)
                    if inspect.isawaitable(result):
                        return await result
                    return result
                awaitable = _call_sync()
            return await asyncio.wait_for(awaitable, timeout=timeout)
        except asyncio.TimeoutError as exc:
            raise TimeoutError(f"Operation timed out after {timeout}s") from exc

    @staticmethod
    def _parse_threat(raw: Any) -> Dict[str, Any]:
        """Return *raw* unchanged when it is a dict, else a blank threat record."""
        if isinstance(raw, dict):
            return raw
        return {"sku": "", "name": "", "confidence": 0.5}

    # ---- Internal workers ----
    async def _continuous_learning(self) -> None:
        """Nudge the confidence threshold from the last 24 hours of outcomes.

        Needs at least 5 recent outcomes. The mean of their positive accuracy
        scores raises the threshold by 0.05 (capped at 0.9) when above 0.8 and
        lowers it by 0.05 (floored at 0.5) when below 0.6.
        """
        recent = [
            o for o in self.decision_history
            if o.timestamp >= datetime.now() - timedelta(hours=24)
        ]
        if len(recent) >= 5:
            acc = [o.accuracy_score for o in recent if o.accuracy_score > 0]
            if acc:
                avg = sum(acc) / len(acc)
                if avg > 0.8:
                    self.confidence_threshold = min(0.9, self.confidence_threshold + 0.05)
                elif avg < 0.6:
                    self.confidence_threshold = max(0.5, self.confidence_threshold - 0.05)
                self._log.info(
                    "Learning: accuracy=%.2f threshold=%.2f",
                    avg,
                    self.confidence_threshold,
                )

    async def _optimize_resources(self) -> None:
        """Adjust the hourly decision budget to the current load.

        Above 90% load the budget becomes 5; below 50% load with fewer than 5
        pending actions it becomes 15; otherwise it is left unchanged.

        NOTE(review): in this class the budget is only reported in the trigger
        context — nothing here enforces it.
        """
        q = self._get_action_queue_size()
        load = self._get_system_load_percentage()
        if load > 90:
            self.max_autonomous_decisions_per_hour = 5
        elif load < 50 and q < 5:
            self.max_autonomous_decisions_per_hour = 15
|
||
|
||
# Singleton instance
|
||
# NOTE: constructed at import time, so importing this module runs
# __init__ (cache DB setup, SSH key permission check, trigger registration).
autonomous_engine = ElephantAlphaAutonomousEngine()
|
||
|
||
# Public API of this module: the engine class and its singleton first,
# then the supporting types.
__all__ = [
    "ElephantAlphaAutonomousEngine", "autonomous_engine",
    "AutonomousTrigger", "DecisionType", "DecisionOutcome",
]
|