Files
ewoooc/services/elephant_alpha_autonomous_engine.py
OoO 00591c5489 feat(ea-hitl): ADR-021 EA 升級審核 pre-fetch + 競價告警金額影響量化
根治 2026-05-02 統帥反映的三層 EA escalation 訊息空泛問題:

1. _escalate_to_human 對 price_drop_alert / market_opportunity /
   threat_escalation 三類觸發,送 Telegram 前先 await Hermes 取具體
   SKU 清單覆蓋 plan 元流程文字(5s 短超時,失敗 fallback 原 plan)
2. NemoTron 競價告警新增 _compute_business_impact helper:
   過去 7 日營收流失(gap_pct>0 才算)+ 跟進競品建議價,
   dispatch 主路徑 / 防線二 / Hermes rule fallback 三條全部 Python
   獨裁注入,告警含「📉 NT$ X」「🎯 NT$ Y」具體金額
3. 補實 telegram_bot_service.handle_callback 的 momo:eig: prefix
   handler,HITL「🛑 忽略此事件」按鈕首次有對應 audit 寫入

Critic 審查通過(5 項必修全綠):
- Critical-1: user_label HTML escape 防 Telegram username XSS
- High-1: pre-fetch 改 asyncio.wait_for(5s) 防阻塞 escalation
- High-2: 全部行缺金額時 return None 觸發 plan fallback
- Medium-2: 空 event_id callback 拒絕避免 audit 污染
- Medium-3: gap_pct≤0 時 revenue_loss_7d 強制歸 0 不誤導降價

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 00:03:38 +08:00

1046 lines
43 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
elephant_alpha_autonomous_engine.py
AI 3.0 Autonomous Operations:
- Self-learning from outcomes
- Predictive decision making
- Autonomous resource optimization
- Continuous improvement loop
ADR-012 Compliance:
§③ 單一 audit trail — 所有基行完畢後必發 triaged_alert Telegram
§⑤ 雙寫強制 — ai_insights (由 orchestrator._log_decision) + Telegram
ADR-013 Compliance:
resource_optimization trigger → auto_heal_service.handle_exception
"""
import asyncio
import inspect
import json
import logging
import os
import re
import sqlite3
import threading
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Any, Optional
from sqlalchemy import text
from services.logger_manager import SystemLogger
from services.elephant_alpha_orchestrator import elephant_orchestrator, StrategicDecision
from database.manager import get_session
logger = SystemLogger("ElephantAlphaEngine").get_logger()
# ---- Configuration ----
SSH_JUMP_HOST = os.getenv("ELEPHANT_ALPHA_JUMP_HOST", "192.168.0.110")
SSH_JUMP_USER = os.getenv("ELEPHANT_ALPHA_JUMP_USER", "wooo")
SSH_KEY_PATH = os.getenv("ELEPHANT_ALPHA_SSH_KEY_PATH", os.path.join(os.path.dirname(__file__), "..", "config", "autoheal_id_ed25519"))
SSH_PORT = int(os.getenv("ELEPHANT_ALPHA_SSH_PORT", "22"))
SSH_CONNECT_TIMEOUT = int(os.getenv("ELEPHANT_ALPHA_SSH_CONNECT_TIMEOUT", "10"))
SSH_COMMAND_TIMEOUT = int(os.getenv("ELEPHANT_ALPHA_SSH_COMMAND_TIMEOUT", "60"))
CACHE_DB_PATH = os.getenv("ELEPHANT_ALPHA_CACHE_DB", ":memory:")
ESCALATION_COOLDOWN_MIN = int(os.getenv("ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN", "30"))
CONFIDENCE_THRESHOLD = float(os.getenv("ELEPHANT_ALPHA_CONFIDENCE_THRESHOLD", "0.7"))
MAX_AUTONOMOUS_DECISIONS_PER_HOUR = int(os.getenv("ELEPHANT_ALPHA_MAX_AUTONOMOUS_DECISIONS_PER_HOUR", "10"))
# ---- Constants ----
_ALLOWED_ACTION_TYPES = frozenset({
"DOCKER_RESTART",
"WAIT_RETRY",
"ALERT_ONLY",
"SSH_CMD",
"CODE_FIX",
})
_TRIGGER_TO_DECISION_TYPE = {}
class DecisionType(Enum):
    """Category of an autonomous decision.

    Used as ``DecisionOutcome.decision_type`` and as the values of
    ``_TRIGGER_TO_DECISION_TYPE``.
    """

    PRICE_OPTIMIZATION = "price_optimization"
    THREAT_RESPONSE = "threat_response"
    MARKET_OPPORTUNITY = "market_opportunity"
    RESOURCE_ALLOCATION = "resource_allocation"
    STRATEGIC_PLANNING = "strategic_planning"


# Classifies each AutonomousTrigger.trigger_type as a DecisionType.
# NOTE(review): the "weekly_insight" trigger type has no entry — confirm that
# this is intended.
_TRIGGER_TO_DECISION_TYPE = {
    "price_drop_alert": DecisionType.PRICE_OPTIMIZATION,
    "market_opportunity": DecisionType.MARKET_OPPORTUNITY,
    "threat_escalation": DecisionType.THREAT_RESPONSE,
    "resource_optimization": DecisionType.RESOURCE_ALLOCATION,
    "code_exception": DecisionType.RESOURCE_ALLOCATION,  # mapped for handling
}
_TRIGGER_ZH = {
"price_drop_alert": "價格下滑警報",
"market_opportunity": "市場機會偵測",
"threat_escalation": "威脅升級通報",
"resource_optimization": "資源調配優化",
"code_exception": "程式碼異常偵測",
"weekly_insight": "全景電商洞察分析",
}
# Agent 名稱保留英文,僅補上角色說明(禁止音譯)
_AGENT_LABEL = {
"hermes": "Hermes",
"nemotron": "NemoTron",
"openclaw": "OpenClaw",
"elephant_alpha": "Elephant Alpha",
"scheduler": "Scheduler",
}
_ACTION_ZH = {
"analyze_price_competition": "競品價格分析",
"dispatch_alert": "派送告警通知",
"dispatch_price_updates": "派送定價更新",
"dispatch_price_update": "派送定價更新",
"generate_strategic_analysis": "產出策略分析",
"generate_market_analysis": "市場分析",
"generate_pricing_strategy": "定價策略建議",
"execute_price_adjustment": "價格調整覆核",
"adjust_price": "調整定價",
"send_alert": "發送告警",
}
_PRICE_ADJUSTMENT_REVIEW_ACTIONS = frozenset({
"execute_price_adjustment",
"adjust_price",
"apply_price_change",
"update_price",
"dispatch_price_update",
"dispatch_price_updates",
})
# A' 軌價格相關觸發類型HITL 前需 pre-fetch Hermes 具體威脅清單
# 取代 Gemini plan 階段的元流程文字(「步驟 1:[OpenClaw] 生成策略」這類)
_PRICE_RELATED_TRIGGERS = frozenset({
"price_drop_alert",
"market_opportunity",
"threat_escalation",
})
def _zh_trigger(trigger_type: str) -> str:
return _TRIGGER_ZH.get(trigger_type, trigger_type)
def _zh_step(step: dict) -> str:
agent_key = step.get("agent", "?").lower()
agent = _AGENT_LABEL.get(agent_key, step.get("agent", "?"))
action = _ACTION_ZH.get(step.get("action", ""), step.get("action", ""))
desc = step.get("description", "")
# 優先用 descriptionGemini 生成的繁中說明),其次用 action 中文對照
detail = desc if desc else action
return f"[{agent}] {detail}"
@dataclass
class DecisionOutcome:
    """A decision's prediction paired with its observed result.

    Instances are held in ``ElephantAlphaAutonomousEngine.decision_history``.
    There, ``_continuous_learning`` averages ``accuracy_score`` over the last
    24 hours (by ``timestamp``) to tune the confidence threshold.
    """

    decision_id: str
    decision_type: DecisionType
    prediction: Dict[str, Any]
    actual_outcome: Dict[str, Any]
    # Scores <= 0 are skipped when _continuous_learning averages accuracy.
    accuracy_score: float
    business_impact: float
    timestamp: datetime
    lessons_learned: List[str]
class AutonomousTrigger:
    """A condition evaluated by the autonomous engine on every monitoring cycle.

    Attributes:
        trigger_type: Key selecting the checker that evaluates this trigger
            (e.g. ``"price_drop_alert"``).
        conditions: Checker-specific parameters (free-form dict).
        threshold: Configured threshold; forwarded in the decision context.
        enabled: Disabled triggers are skipped entirely.
        last_triggered: When the trigger last fired; drives its cooldown.
        temp_error_msg: Scratch data a checker (the code_exception scan)
            attaches for the decision context built right after it fires.
        temp_target_file: Source file associated with ``temp_error_msg``.
    """

    def __init__(
        self,
        trigger_type: str,
        conditions: Dict[str, Any],
        threshold: float,
        enabled: bool,
    ):
        self.trigger_type = trigger_type
        self.conditions = conditions
        self.threshold = threshold
        self.enabled = enabled
        self.last_triggered: Optional[datetime] = None
        # Plain attributes: the former properties were pure pass-throughs.
        self.temp_error_msg: Optional[str] = None
        self.temp_target_file: Optional[str] = None

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(trigger_type={self.trigger_type!r}, "
            f"enabled={self.enabled!r}, threshold={self.threshold!r}, "
            f"last_triggered={self.last_triggered!r})"
        )
class ElephantAlphaAutonomousEngine:
    """
    Elephant Alpha autonomous decision engine.

    Every monitoring cycle it evaluates the enabled triggers. When one fires,
    it asks the orchestrator for a StrategicDecision and either executes the
    plan (confidence at/above the required threshold) or escalates it for
    human review.

    Features:
    - Escalation dedup stored in SQLite (in-memory unless CACHE_DB_PATH is a file)
    - SSH key permissions tightened to 0600 at start-up
    - Timeouts on external calls (``_run_with_timeout``)
    - Circuit breaker that pauses trigger checks after repeated failures
    """

    # Trigger types whose autonomous execution needs a fixed, stricter
    # confidence instead of the adaptive ``confidence_threshold``.
    _STRICT_CONFIDENCE_TRIGGERS = frozenset({"price_drop_alert", "market_opportunity"})
    _STRICT_CONFIDENCE = 0.85
    # Host/user whose Docker logs the code_exception trigger scans over SSH.
    _LOG_SCAN_HOST = "192.168.0.188"
    _LOG_SCAN_USER = "ollama"
    # Captures the first services/routes/database source file in a traceback block.
    _TRACEBACK_FILE_RE = re.compile(r'File "([^"]*/(services|routes|database)/[^"]+\.py)"')

    def __init__(self):
        self._log = logger
        self._init_cache_db()
        self._init_ssh_key_once()
        self.decision_history: List[DecisionOutcome] = []
        self.triggers: List[AutonomousTrigger] = []
        self.confidence_threshold = CONFIDENCE_THRESHOLD
        # NOTE(review): adjusted by _optimize_resources and reported in the
        # decision context, but not enforced as a rate limit anywhere in this
        # class — confirm whether a cap on autonomous executions is intended.
        self.max_autonomous_decisions_per_hour = MAX_AUTONOMOUS_DECISIONS_PER_HOUR
        self._initialize_triggers()
        self._circuit_breaker_state = {"failures": 0, "last_failure": None}
        self._cb_threshold = 5
        self._cb_reset_after = timedelta(minutes=5)

    # ---- DB ----
    def _init_cache_db(self) -> None:
        """Open the SQLite connection backing escalation dedup and create its table."""
        self._db_lock = threading.Lock()
        # check_same_thread=False lets threads other than the creator use the
        # connection; reads/writes below are guarded by self._db_lock.
        self._conn = sqlite3.connect(CACHE_DB_PATH, check_same_thread=False)
        self._conn.execute("""
            CREATE TABLE IF NOT EXISTS escalation_dedup (
                trigger_type TEXT PRIMARY KEY,
                last_triggered INTEGER NOT NULL
            )
        """)
        self._conn.commit()

    def _store_escalation(self, trigger_type: str) -> None:
        """Record "now" (epoch seconds) as the last escalation time for *trigger_type*."""
        with self._db_lock:
            self._conn.execute(
                "INSERT OR REPLACE INTO escalation_dedup (trigger_type, last_triggered) VALUES (?, ?)",
                (trigger_type, int(datetime.now().timestamp())),
            )
            self._conn.commit()

    def _load_escalation(self, trigger_type: str) -> Optional[int]:
        """Return the last escalation epoch seconds for *trigger_type*, or None."""
        with self._db_lock:
            row = self._conn.execute(
                "SELECT last_triggered FROM escalation_dedup WHERE trigger_type = ?",
                (trigger_type,),
            ).fetchone()
        return row[0] if row else None

    # ---- SSH ----
    def _init_ssh_key_once(self) -> None:
        """Restrict the SSH private key to mode 0600; only warn on failure."""
        key_path = os.path.expanduser(SSH_KEY_PATH)
        if not os.path.exists(key_path):
            self._log.warning("SSH key not found at %s; some operations may fail.", key_path)
            return
        try:
            os.chmod(key_path, 0o600)
        except Exception as e:
            self._log.warning("Failed to secure SSH key permissions: %s", e)

    # ---- Triggers ----
    def _initialize_triggers(self) -> None:
        """Install the built-in trigger set."""
        self.triggers = [
            AutonomousTrigger(
                trigger_type="price_drop_alert",
                conditions={"competitor_price_drop_pct": 15, "sales_velocity": "decreasing"},
                threshold=0.8,
                enabled=True,
            ),
            AutonomousTrigger(
                trigger_type="market_opportunity",
                conditions={"competitor_price_premium": ">5%", "our_stock": "available"},
                threshold=0.7,
                enabled=True,
            ),
            AutonomousTrigger(
                trigger_type="threat_escalation",
                conditions={"threat_score": 0.9, "trend": "worsening"},
                threshold=0.85,
                enabled=True,
            ),
            AutonomousTrigger(
                trigger_type="resource_optimization",
                conditions={"system_load": "high", "queue_size": ">10"},
                threshold=0.6,
                enabled=True,
            ),
            AutonomousTrigger(
                trigger_type="code_exception",
                conditions={
                    "scan_containers": ["momo-pro-system", "momo-scheduler"],
                    "error_patterns": ["Traceback", "ImportError", "RuntimeError", "ModuleNotFoundError"],
                },
                threshold=1.0,
                enabled=True,
            ),
            AutonomousTrigger(
                trigger_type="weekly_insight",
                conditions={"min_new_insights": 5, "cooldown_hours": 6},
                threshold=0.7,
                # weekly_strategy is sent solely by run_scheduler.py on Mondays
                # 06:00; EA no longer fires it every 6h (avoids 35+ duplicates/week).
                enabled=False,
            ),
        ]

    # ---- Main loop ----
    async def start_autonomous_monitoring(self) -> None:
        """Run the monitoring loop forever (60s cadence, 30s back-off after errors)."""
        self._log.info("Starting autonomous monitoring engine")
        while True:
            try:
                await self._check_triggers()
                await self._continuous_learning()
                await self._optimize_resources()
                await asyncio.sleep(60)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                self._log.exception("Autonomous monitoring error: %s", e)
                await asyncio.sleep(30)

    # ---- Trigger evaluation ----
    async def _check_triggers(self) -> None:
        """Evaluate each enabled trigger outside its cooldown and act on hits.

        A failure while handling one trigger is logged and does not stop the
        remaining triggers from being checked in this cycle.
        """
        if self._is_circuit_open():
            self._log.warning("Circuit breaker open; skipping trigger checks")
            return
        for trigger in self.triggers:
            if not trigger.enabled:
                continue
            cooldown_min = self._get_cooldown(trigger.trigger_type)
            last = trigger.last_triggered
            if last and (datetime.now() - last).total_seconds() / 60 < cooldown_min:
                continue
            try:
                if await self._evaluate_trigger(trigger):
                    await self._execute_autonomous_decision(trigger)
                    trigger.last_triggered = datetime.now()
            except asyncio.CancelledError:
                raise
            except Exception as e:
                self._log.exception(
                    "Trigger handling failed for %s: %s", trigger.trigger_type, e
                )

    def _get_cooldown(self, trigger_type: str) -> int:
        """Alias of ``_get_cooldown_min``."""
        return self._get_cooldown_min(trigger_type)

    def _get_cooldown_min(self, trigger_type: str) -> int:
        """Cooldown in minutes for *trigger_type* (currently the same for all types)."""
        return ESCALATION_COOLDOWN_MIN

    async def _evaluate_trigger(self, trigger: AutonomousTrigger) -> bool:
        """Dispatch *trigger* to its checker; unknown types and errors yield False."""
        checkers = {
            "price_drop_alert": self._check_price_drop_trigger,
            "market_opportunity": self._check_market_opportunity_trigger,
            "threat_escalation": self._check_threat_escalation_trigger,
            "resource_optimization": self._check_resource_optimization_trigger,
            "code_exception": self._check_code_exception_trigger,
            "weekly_insight": self._check_weekly_insight_trigger,
        }
        checker = checkers.get(trigger.trigger_type)
        if checker is None:
            return False
        try:
            return await checker(trigger)
        except Exception as e:
            self._log.exception("Trigger evaluation error: %s", e)
        return False

    # ---- Individual trigger checkers ----
    async def _check_price_drop_trigger(self, trigger: AutonomousTrigger) -> bool:
        """True when >= 3 products have a fresh (<= 2h) competitor price >15% below ours."""
        session = get_session()
        try:
            rows = session.execute(
                text("""
                    SELECT p.i_code AS sku, p.name, p.category,
                           cp.price AS competitor_price, pr.price AS momo_price,
                           ((pr.price - cp.price) / pr.price * 100) AS price_gap_pct
                    FROM products p
                    JOIN (
                        SELECT DISTINCT ON (product_id) product_id, price
                        FROM price_records
                        ORDER BY product_id, timestamp DESC
                    ) pr ON pr.product_id = p.id
                    JOIN competitor_prices cp ON cp.sku = p.i_code
                    WHERE cp.expires_at > NOW()
                      AND cp.price < pr.price * 0.85
                      AND cp.crawled_at >= NOW() - INTERVAL '2 hours'
                    LIMIT 10
                """)
            ).fetchall()
            return len(rows) >= 3
        finally:
            session.close()

    async def _check_market_opportunity_trigger(self, trigger: AutonomousTrigger) -> bool:
        """True when any product has a fresh (<= 1h) competitor price >5% above ours."""
        session = get_session()
        try:
            rows = session.execute(
                text("""
                    SELECT p.i_code AS sku
                    FROM products p
                    JOIN (
                        SELECT DISTINCT ON (product_id) product_id, price
                        FROM price_records
                        ORDER BY product_id, timestamp DESC
                    ) pr ON pr.product_id = p.id
                    JOIN competitor_prices cp ON cp.sku = p.i_code
                    WHERE cp.expires_at > NOW()
                      AND cp.price > pr.price * 1.05
                      AND cp.crawled_at >= NOW() - INTERVAL '1 hour'
                    LIMIT 5
                """)
            ).fetchall()
            return bool(rows)
        finally:
            session.close()

    async def _check_threat_escalation_trigger(self, trigger: AutonomousTrigger) -> bool:
        """True when >= 2 high-confidence "worsening" price alerts appeared in 30 min."""
        session = get_session()
        try:
            rows = session.execute(
                text("""
                    SELECT product_sku AS sku, confidence, content, created_at
                    FROM ai_insights
                    WHERE insight_type = 'price_alert'
                      AND confidence >= 0.9
                      AND created_at >= NOW() - INTERVAL '30 minutes'
                      AND metadata_json LIKE '%worsening%'
                    LIMIT 5
                """)
            ).fetchall()
            return len(rows) >= 2
        finally:
            session.close()

    async def _check_resource_optimization_trigger(self, trigger: AutonomousTrigger) -> bool:
        """True when more than 10 action plans are pending or load exceeds 80%."""
        return (self._get_action_queue_size() > 10
                or self._get_system_load_percentage() > 80)

    async def _check_code_exception_trigger(self, trigger: AutonomousTrigger) -> bool:
        """Scan the last 5 minutes of container logs (over SSH) for Python tracebacks.

        For every container whose logs contain a traceback, the block starting
        at the first line mentioning "Traceback" (up to 15 lines) is collected
        into ``trigger.temp_error_msg``. The last services/routes/database file
        matched in those blocks, with any ``/app/`` or ``momo-pro-system/``
        prefix stripped, goes to ``trigger.temp_target_file``.

        Returns:
            True if any traceback was found.
        """
        # Drop scratch data from an earlier scan so it cannot leak into a later
        # decision context.
        trigger.temp_error_msg = None
        trigger.temp_target_file = None
        containers = trigger.conditions.get("scan_containers", ["momo-pro-system", "momo-scheduler"])
        # NOTE(review): conditions["error_patterns"] is not consulted; detection
        # keys only on the "Traceback (most recent call last):" header.
        key_path = os.path.expanduser(SSH_KEY_PATH)
        if not os.path.exists(key_path):
            self._log.warning("SSH key missing for code exception scan: %s", key_path)
            return False
        error_context: List[str] = []
        target_file = ""
        for c in containers:
            try:
                # _ssh_exec blocks in subprocess.run (up to its timeout); run it
                # in a worker thread so the event loop is not stalled meanwhile.
                rc, out, err = await asyncio.to_thread(
                    self._ssh_exec,
                    self._LOG_SCAN_HOST,
                    self._LOG_SCAN_USER,
                    ["docker", "logs", "--since", "5m", c],
                )
                if rc != 0:
                    self._log.debug("Failed to fetch logs for %s via SSH", c)
                    continue
                combined = out + "\n" + err
                if "Traceback (most recent call last):" not in combined:
                    continue
                lines = combined.splitlines()
                for i, line in enumerate(lines):
                    if "Traceback" not in line:
                        continue
                    blk_str = "\n".join(lines[i:i + 15])
                    m = self._TRACEBACK_FILE_RE.search(blk_str)
                    if m:
                        tf = m.group(1)
                        if "/app/" in tf:
                            tf = tf.split("/app/")[1]
                        elif "momo-pro-system/" in tf:
                            tf = tf.split("momo-pro-system/")[1]
                        target_file = tf
                    error_context.append(f"[{c}] {blk_str}")
                    break
            except Exception as e:
                self._log.debug("Error scanning container %s: %s", c, e)
        if error_context:
            trigger.temp_error_msg = "\n".join(error_context)
            trigger.temp_target_file = target_file
            return True
        return False

    async def _check_weekly_insight_trigger(self, trigger: AutonomousTrigger) -> bool:
        """
        Fire when >= ``min_new_insights`` (default 5) new non-OpenClaw
        ai_insights accumulated in the last 6 hours, to start an OpenClaw
        panoramic analysis. The trigger cooldown prevents repeated firing.
        """
        min_new = trigger.conditions.get("min_new_insights", 5)
        session = get_session()
        try:
            row = session.execute(text("""
                SELECT COUNT(*) FROM ai_insights
                WHERE created_at >= NOW() - INTERVAL '6 hours'
                  AND insight_type IN ('price_alert', 'recommendation', 'relearn_event')
                  AND created_by != 'openclaw'
            """)).fetchone()
            count = int(row[0]) if row else 0
            return count >= min_new
        except Exception as e:
            self._log.debug("weekly_insight trigger check failed: %s", e)
            return False
        finally:
            session.close()

    # ---- Decision execution ----
    async def _build_trigger_context(self, trigger: AutonomousTrigger) -> Dict[str, Any]:
        """Build the business context passed to Elephant Alpha."""
        context = {
            "trigger_type": trigger.trigger_type,
            "trigger_label": _zh_trigger(trigger.trigger_type),
            "conditions": trigger.conditions,
            "threshold": trigger.threshold,
            "enabled": trigger.enabled,
            "last_triggered": trigger.last_triggered.isoformat() if trigger.last_triggered else None,
            "generated_at": datetime.now().isoformat(),
            "system_state": {
                "action_queue_size": self._safe_metric(self._get_action_queue_size, default=0),
                "system_load_pct": self._safe_metric(self._get_system_load_percentage, default=0.0),
                "circuit_breaker_failures": self._circuit_breaker_state.get("failures", 0),
                "confidence_threshold": self.confidence_threshold,
                "max_autonomous_decisions_per_hour": self.max_autonomous_decisions_per_hour,
            },
            "objectives": [
                "維持 MOMO 商品監控與業績分析服務可用性",
                "降低重複告警與人工排查成本",
                "在低風險範圍內產生可稽核的自動化行動建議",
            ],
            "constraints": [
                "不得使用 docker compose down 或 --remove-orphans",
                "不得操作 momo-db 容器生命週期",
                "P1/P2 行動需保留 audit trail 與 Telegram 稽核通知",
            ],
        }
        if trigger.temp_error_msg:
            context["error_context"] = trigger.temp_error_msg[:4000]
        if trigger.temp_target_file:
            context["target_file"] = trigger.temp_target_file
        return context

    @staticmethod
    def _safe_metric(fn, default):
        """Return ``fn()``, or *default* if it raises."""
        try:
            return fn()
        except Exception:
            return default

    async def _execute_autonomous_decision(self, trigger: AutonomousTrigger) -> None:
        """Ask the orchestrator for a decision, then execute or escalate it.

        Decisions at or above the required confidence are executed, recorded
        in the dedup table and announced on Telegram. Anything below is
        escalated for human review. Orchestrator and execution failures count
        toward the circuit breaker.
        """
        context = await self._build_trigger_context(trigger)
        try:
            decision = await self._run_with_timeout(
                elephant_orchestrator.analyze_and_coordinate,
                context,
                timeout=SSH_COMMAND_TIMEOUT,
            )
        except Exception as e:
            self._log.exception("Orchestrator analysis failed: %s", e)
            self._handle_failure(trigger, "analysis_failure")
            return
        if trigger.trigger_type in self._STRICT_CONFIDENCE_TRIGGERS:
            required_confidence = self._STRICT_CONFIDENCE
        else:
            required_confidence = self.confidence_threshold
        if decision.confidence >= required_confidence:
            try:
                await self._run_with_timeout(
                    self._execute_decision,
                    decision,
                    timeout=SSH_COMMAND_TIMEOUT,
                )
                self._store_escalation(trigger.trigger_type)
                self._log.info("Autonomous decision executed: %s", trigger.trigger_type)
                await self._notify_telegram_executed(decision, trigger)
                self._circuit_reset()
            except Exception as e:
                self._log.exception("Decision execution failed: %s", e)
                self._handle_failure(trigger, "execution_failure")
        else:
            self._log.warning("Low confidence decision; escalating: %s", trigger.trigger_type)
            await self._escalate_to_human(decision, trigger)

    async def _execute_decision(self, decision: StrategicDecision) -> None:
        """Run every step of the execution plan in order; re-raise the first failure."""
        for step in decision.execution_plan:
            try:
                await self._execute_step(step)
                self._log.info("Step done: %s -> %s", step.get("agent"), step.get("action"))
            except Exception as e:
                self._log.error("Step failed: %s", e)
                raise

    async def _execute_step(self, step: Dict[str, Any]) -> None:
        """Route one plan step to its handler.

        Raises:
            ValueError: if no handler recognises the agent/action pair.
        """
        agent_type = step.get("agent", "").lower()
        action = step.get("action", "")
        params = step.get("parameters") or step.get("params") or {}
        if agent_type == "hermes" and action == "analyze_price_competition":
            return await self._run_with_timeout(
                self._hermes_analyze,
                timeout=SSH_COMMAND_TIMEOUT,
            )
        if agent_type == "nemotron" and action == "dispatch_alert":
            raw = params.get("threats", [])
            threats = [self._parse_threat(t) for t in raw if isinstance(t, dict)]
            if threats:
                return await self._run_with_timeout(
                    self._dispatch_alerts,
                    threats,
                    timeout=SSH_COMMAND_TIMEOUT,
                )
            return
        # NOTE: the openclaw weekly_strategy / meta_analysis / strategic_analysis /
        # market_analysis / pricing_strategy dispatch was removed. weekly_strategy
        # is sent solely by run_scheduler.py on Mondays 06:00 (prevents EA's 6h
        # re-triggering, 35+/week). If the orchestrator (Gemini) still returns
        # these actions they fall through to the ValueError below, which
        # _execute_decision re-raises and the caller counts toward the breaker.
        if action in {"auto_heal", "resource_optimization", "optimize_resources", "handle_exception"}:
            return await self._run_with_timeout(
                self._run_auto_heal,
                "scheduler_task_failure",
                params,
                timeout=SSH_COMMAND_TIMEOUT,
            )
        if action in {"code_fix", "fix_code_exception", "handle_code_exception"}:
            return await self._run_with_timeout(
                self._run_auto_heal,
                "python_exception",
                params,
                timeout=SSH_COMMAND_TIMEOUT,
            )
        if action in _PRICE_ADJUSTMENT_REVIEW_ACTIONS:
            return await self._run_with_timeout(
                self._record_price_adjustment_review,
                step,
                timeout=SSH_COMMAND_TIMEOUT,
            )
        raise ValueError(f"Unrecognized step: agent={agent_type} action={action}")

    # ---- Sub-services ----
    async def _hermes_analyze(self) -> Any:
        """Run a full Hermes analysis bounded by SSH_COMMAND_TIMEOUT."""
        from services.hermes_analyst_service import HermesAnalystService
        return await self._run_with_timeout(HermesAnalystService().run, timeout=SSH_COMMAND_TIMEOUT)

    async def _fetch_hermes_threats_summary(self, top_n: int = 5) -> Optional[List[str]]:
        """Track A': pre-fetch a concrete Hermes threat list before a HITL escalation.

        Replaces meta-process text such as "Step 1: [OpenClaw] generate
        strategy" with actionable lines of the form
        ``[SKU] name | MOMO $X vs PChome $Y (gap%) | 7-day loss NT$ Z | follow NT$ W``.

        Best effort: returns None (the caller then falls back to the
        execution_plan text) on timeout, error or zero threats. Failures never
        block the escalation flow.

        Critic High-1: a 5 s timeout keeps the escalation from stalling. A full
        Hermes run can take 30-60 s, and the HITL message should go out quickly.
        Critic High-2: if no row carries a loss or recommended-price figure,
        return None so the plan text is used instead.
        """
        # 5 s timeout: Hermes measures < 10 s when warm but 30 s+ on a cold
        # start. HITL message latency must stay under ~10 s, so falling back to
        # the plan text is preferable to waiting.
        # NOTE(review): if HermesAnalystService.run is synchronous it keeps
        # running in its worker thread after this timeout fires.
        try:
            result = await asyncio.wait_for(self._hermes_analyze(), timeout=5)
        except asyncio.TimeoutError:
            self._log.warning("Pre-fetch Hermes 5s timeout; falling back to plan text")
            return None
        except Exception as e:
            self._log.warning("Pre-fetch Hermes threats failed (non-blocking): %s", e)
            return None
        threats = getattr(result, "threats", None) or []
        if not threats:
            self._log.info("Pre-fetch Hermes returned 0 threats; falling back to plan text")
            return None
        # Lazy import kept on purpose to avoid a circular dependency between the
        # two services (the NemoTron dispatcher also needs Hermes' PriceThreat).
        try:
            from services.nemoton_dispatcher_service import _compute_business_impact as compute_impact
        except Exception as imp_err:
            self._log.error("import _compute_business_impact failed: %s", imp_err)
            compute_impact = None
        lines: List[str] = []
        any_concrete = False  # Critic High-2: at least one row must carry a figure
        for t in threats[:top_n]:
            sku = getattr(t, "sku", "?")
            name = str(getattr(t, "name", "") or "")[:24]
            momo = float(getattr(t, "momo_price", 0) or 0)
            pchome = float(getattr(t, "pchome_price", 0) or 0)
            gap_pct = float(getattr(t, "gap_pct", 0) or 0)
            impact = (compute_impact(t) if compute_impact else None) or {}
            loss = impact.get("revenue_loss_7d", 0.0) or 0.0
            rec_price = impact.get("recommended_price")
            parts = [
                f"[{sku}] {name}",
                f"MOMO ${momo:,.0f} vs PChome ${pchome:,.0f} ({gap_pct:+.1f}%)",
            ]
            if loss > 0:
                parts.append(f"近 7 日流失 NT$ {loss:,.0f}")
                any_concrete = True
            if rec_price is not None and rec_price > 0:
                parts.append(f"建議跟進 NT$ {rec_price:,.0f}")
                any_concrete = True
            # Explicit separator: joining with "" glued SKU/name, prices and
            # amounts into one unreadable run.
            lines.append(" | ".join(parts))
        if not any_concrete:
            # Critic High-2: bare "MOMO $X vs PChome $Y" rows are vaguer than
            # the plan text; return None to trigger the plan fallback.
            self._log.info("Pre-fetch threats lacked impact figures on all rows; falling back")
            return None
        self._log.info("Pre-fetch Hermes threats produced %d concrete actions", len(lines))
        return lines

    async def _dispatch_alerts(self, threats: List[Any]) -> Any:
        """Send *threats* through the NemoTron dispatcher."""
        from services.nemoton_dispatcher_service import NemotronDispatcher
        return await self._run_with_timeout(
            NemotronDispatcher().dispatch,
            threats,
            timeout=SSH_COMMAND_TIMEOUT,
        )

    async def _generate_strategy_report(self) -> Any:
        """Tripwire: always raises; weekly reports are owned by run_scheduler.py."""
        # Defence in depth: even if a future caller appears, fail immediately so
        # EA cannot bypass dedupe and resend weekly_strategy. The sole owner of
        # the weekly_strategy path is run_scheduler.py (Mondays 06:00).
        raise RuntimeError(
            "EA autonomous engine no longer generates weekly/meta reports — "
            "owned by run_scheduler.py"
        )

    async def _generate_meta_report(self) -> Any:
        """Tripwire: always raises; meta_analysis is no longer triggered by EA."""
        raise RuntimeError(
            "EA autonomous engine no longer generates weekly/meta reports — "
            "owned by run_scheduler.py"
        )

    def _run_auto_heal(self, error_type: str, context: Dict[str, Any]) -> Any:
        """Forward to ``auto_heal_service.handle_exception`` with source/error_type defaults."""
        from services.auto_heal_service import auto_heal_service
        payload = dict(context or {})
        payload.setdefault("source", "ElephantAlphaAutonomousEngine")
        payload.setdefault("error_type", error_type)
        return auto_heal_service.handle_exception(error_type=error_type, context=payload)

    def _record_price_adjustment_review(self, step: Dict[str, Any]) -> Dict[str, Any]:
        """
        Price changes are business-critical. Elephant Alpha may recommend them,
        but this system records the proposal for HITL review instead of
        applying it: a pending ``human_review`` row is inserted into
        ai_insights and its embedding is enqueued (best effort).

        Returns:
            ``{"status": "pending_review", "insight_id", "sku", "action"}``.
        Raises:
            Any database error, after rolling back the session.
        """
        params = step.get("parameters") or step.get("params") or {}
        sku = (
            params.get("sku")
            or params.get("product_sku")
            or params.get("i_code")
            or params.get("item_id")
            or "unknown"
        )
        action = step.get("action", "price_adjustment")
        # Separator after {action} so the action name is not glued to "商品".
        content = (
            f"[Elephant Alpha 價格調整覆核] AI 建議執行 {action},"
            f"商品 {sku} 已攔截直接執行並轉入人工審核。"
        )
        session = get_session()
        try:
            row = session.execute(
                text("""
                    INSERT INTO ai_insights
                    (insight_type, content, confidence, created_by, status, metadata_json)
                    VALUES (:type, :content, :confidence, :created_by, :status, :metadata)
                    RETURNING id
                """),
                {
                    "type": "human_review",
                    "content": content,
                    "confidence": 0.8,
                    "created_by": "elephant_alpha",
                    "status": "pending",
                    "metadata": json.dumps({
                        "source": "price_adjustment_review",
                        "step": step,
                        "sku": sku,
                        "reason": "price_adjustment_requires_human_approval",
                    }, ensure_ascii=False),
                },
            ).fetchone()
            session.commit()
            insight_id = row[0] if row else None
            if insight_id:
                try:
                    from services.openclaw_learning_service import enqueue_insight_embedding
                    enqueue_insight_embedding(insight_id, "human_review", content)
                except Exception as embed_err:
                    self._log.warning("Embedding enqueue failed for price adjustment review: %s", embed_err)
            self._log.warning("Price adjustment intercepted for HITL review: action=%s sku=%s", action, sku)
            return {"status": "pending_review", "insight_id": insight_id, "sku": sku, "action": action}
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    # ---- Notification ----
    async def _notify_telegram_executed(
        self,
        decision: StrategicDecision,
        trigger: AutonomousTrigger,
    ) -> None:
        """Send the audit message for an autonomously executed decision (best effort)."""
        try:
            import html
            from services.telegram_templates import _send_telegram_raw

            trigger_zh = _zh_trigger(trigger.trigger_type)
            steps = [_zh_step(s) for s in (decision.execution_plan or [])[:5]] or ["(無執行步驟)"]
            # LLM-generated text is interpolated into a message that uses <b>
            # tags (presumably sent in HTML parse mode). Escape it so a stray
            # "<" or "&" cannot make Telegram reject the whole message.
            steps_text = "\n".join(html.escape(s, quote=False) for s in steps)
            # Reasoning must carry data; a near-empty summary is replaced with a
            # "details missing" marker.
            reasoning = (decision.reasoning or "").strip()
            if len(reasoning) < 30:
                reasoning = "(AI 推理未提供足夠細節)"
            # Truncate before escaping so an entity is never cut in half.
            reasoning = html.escape(reasoning[:400], quote=False)
            expected = html.escape((decision.expected_outcome or "").strip(), quote=False)
            msg = (
                f"<b>⚡ 🐘 Elephant Alpha · 自主執行 · {trigger.trigger_type}</b>\n"
                f"📌 <b>{trigger_zh}</b>\n\n"
                f"🔍 <b>預期效益:</b>{expected}\n\n"
                f"🧠 <b>決策依據:</b>{reasoning}\n\n"
                f"✅ <b>已執行(信心 {decision.confidence:.0%})</b>\n{steps_text}"
            )
            await self._run_with_timeout(_send_telegram_raw, msg, timeout=10)
            self._log.info("Telegram audit sent: %s", trigger.trigger_type)
        except Exception as e:
            self._log.error("Telegram audit failed (non-blocking): %s", e)

    async def _escalate_to_human(self, decision: StrategicDecision, trigger: AutonomousTrigger) -> None:
        """Record a low-confidence decision for review and send a HITL Telegram alert.

        A ``human_review`` row is always written to ai_insights (plus a
        best-effort embedding enqueue). The Telegram message is sent at most
        once per cooldown window per trigger type, tracked in the SQLite dedup
        table. For price-related triggers, a concrete Hermes threat list
        replaces the generic plan steps when available. All failures are
        logged and never raised.
        """
        self._log.warning("Escalating to human: %s", trigger.trigger_type)
        session = get_session()
        try:
            row = session.execute(
                text("""
                    INSERT INTO ai_insights
                    (insight_type, content, confidence, created_by, status, metadata_json)
                    VALUES (:type, :content, :conf, :by, :status, :meta)
                    RETURNING id
                """),
                {
                    "type": "human_review",
                    "content": (
                        f"[Elephant Alpha 升級審核] {trigger.trigger_type} "
                        f"信心度僅 {decision.confidence:.2f},建議人工介入。"
                    ),
                    "conf": decision.confidence,
                    "by": "elephant_alpha",
                    "status": "pending",
                    # default=str keeps non-JSON-native fields (e.g. datetimes,
                    # if StrategicDecision has any) from aborting the audit write.
                    "meta": json.dumps(
                        {
                            "decision": asdict(decision),
                            "trigger": trigger.trigger_type,
                            "reason": "low_confidence",
                        },
                        ensure_ascii=False,
                        default=str,
                    ),
                },
            ).fetchone()
            session.commit()
            if row:
                try:
                    from services.openclaw_learning_service import enqueue_insight_embedding
                    enqueue_insight_embedding(
                        row[0],
                        "human_review",
                        f"[Elephant Alpha 升級審核] {trigger.trigger_type} 信心度僅 {decision.confidence:.2f}",
                    )
                except Exception as embed_err:
                    self._log.warning("Embedding enqueue failed for human_review: %s", embed_err)
        except Exception as e:
            self._log.error("DB escalation write failed: %s", e)
            session.rollback()
        finally:
            session.close()

        dedup_ts = self._load_escalation(trigger.trigger_type)
        cooldown_min = self._get_cooldown_min(trigger.trigger_type)
        if dedup_ts and (datetime.now().timestamp() - dedup_ts) / 60 < cooldown_min:
            return  # Telegram already sent for this trigger within the cooldown.
        self._store_escalation(trigger.trigger_type)

        # Track A': for price triggers, pre-fetch Hermes' concrete threat list to
        # replace meta-process text like "Step 1: [OpenClaw] generate strategy".
        concrete_actions: Optional[List[str]] = None
        if trigger.trigger_type in _PRICE_RELATED_TRIGGERS:
            try:
                concrete_actions = await self._fetch_hermes_threats_summary(top_n=5)
            except Exception as e:
                self._log.warning("Pre-fetch threats raised (non-blocking): %s", e)
                concrete_actions = None
        try:
            from services.telegram_templates import triaged_alert, _send_telegram_raw

            # Built inside the try so a malformed LLM plan cannot abort the send
            # path with an unlogged exception.
            if concrete_actions:
                ai_actions_payload = concrete_actions
            else:
                ai_actions_payload = [
                    f"步驟 {s.get('step', i + 1)}:{_zh_step(s)}"
                    for i, s in enumerate((decision.execution_plan or [])[:3])
                ] or ["無具體執行計畫"]
            agents_text = ", ".join(
                _AGENT_LABEL.get(str(a).lower(), str(a))
                for a in (decision.agents_required or [])
            )
            msg, keyboard = triaged_alert(
                base_event={
                    "event_type": "ea_escalation",
                    "title": f"🐘 EA 升級審核 · {_zh_trigger(trigger.trigger_type)}",
                    "summary": (
                        f"自主決策信心度 {decision.confidence:.2f} 低於門檻,需人工批准"
                    ),
                    "id": f"ea_review_{int(datetime.now().timestamp())}",
                },
                tier_label="🐘 Elephant Alpha · L3 HITL",
                ai_summary=(decision.reasoning or "")[:300],
                ai_cause=(
                    f"觸發類型:{_zh_trigger(trigger.trigger_type)} | "
                    f"信心度:{decision.confidence:.2f} | "
                    f"參與模組:{agents_text}"
                ),
                ai_actions=ai_actions_payload,
            )
            await self._run_with_timeout(_send_telegram_raw, msg, timeout=10, reply_markup=keyboard)
            self._log.info(
                "Human escalation Telegram sent: %s (concrete=%s)",
                trigger.trigger_type, bool(concrete_actions),
            )
        except Exception as e:
            self._log.error("Telegram escalation failed (non-blocking): %s", e)

    # ---- Resource Optimization ----
    def _ssh_exec(self, host: str, user: str, cmd: Any, timeout: int = 120) -> tuple:
        """Execute *cmd* on ``user@host`` through the SSH jump host.

        Blocking (subprocess.run); async callers should use asyncio.to_thread.

        Returns:
            ``(returncode, stdout, stderr)``. A launch failure or timeout is
            reported as ``(-1, "", str(error))`` instead of being raised.
        """
        import subprocess
        remote_cmd = [str(part) for part in cmd] if isinstance(cmd, list) else [str(cmd)]
        full_cmd = [
            "ssh",
            "-p", str(SSH_PORT),
            "-i", os.path.expanduser(SSH_KEY_PATH),
            # SECURITY(review): host-key checking is disabled, so neither the
            # jump nor the target host is authenticated (MITM risk). Consider a
            # pinned known_hosts file with StrictHostKeyChecking=yes.
            "-o", "StrictHostKeyChecking=no",
            "-o", f"ConnectTimeout={SSH_CONNECT_TIMEOUT}",
            "-J", f"{SSH_JUMP_USER}@{SSH_JUMP_HOST}",
            f"{user}@{host}",
            "--",
            *remote_cmd,
        ]
        try:
            res = subprocess.run(
                full_cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
            )
            return res.returncode, res.stdout, res.stderr
        except Exception as e:
            return -1, "", str(e)

    def _is_circuit_open(self) -> bool:
        """True while >= _cb_threshold failures are recorded and _cb_reset_after
        has not yet elapsed since the last one (the breaker then resets)."""
        cb = self._circuit_breaker_state
        if cb["failures"] >= self._cb_threshold:
            if not cb["last_failure"]:
                cb["last_failure"] = datetime.now()
            if datetime.now() - cb["last_failure"] > self._cb_reset_after:
                cb["failures"] = 0
                cb["last_failure"] = None
                return False
            return True
        return False

    def _circuit_reset(self) -> None:
        """Clear the failure count after a successful execution."""
        self._circuit_breaker_state["failures"] = 0
        self._circuit_breaker_state["last_failure"] = None

    def _handle_failure(self, trigger: AutonomousTrigger, reason: str) -> None:
        """Count a failure toward the breaker and mark the trigger as escalated."""
        self._circuit_breaker_state["failures"] += 1
        self._circuit_breaker_state["last_failure"] = datetime.now()
        self._log.warning("Failure reason=%s trigger=%s", reason, trigger.trigger_type)
        self._store_escalation(trigger.trigger_type)

    def _get_action_queue_size(self) -> int:
        """Number of action_plans rows with status 'pending'."""
        session = get_session()
        try:
            row = session.execute(
                text("SELECT COUNT(*) AS count FROM action_plans WHERE status = 'pending'")
            ).fetchone()
            # Index positionally: on a SQLAlchemy Row, ``row.count`` resolves to
            # the tuple ``count()`` method, not the column labelled "count".
            return int(row[0]) if row else 0
        finally:
            session.close()

    def _get_system_load_percentage(self) -> float:
        """CPU percentage via psutil; without psutil, 5% per pending plan capped at 90."""
        try:
            import psutil
            return float(psutil.cpu_percent(interval=0.1))
        except ImportError:
            return min(90.0, float(self._get_action_queue_size() * 5.0))

    @staticmethod
    async def _run_with_timeout(coro, *args, timeout: int = 30, **kwargs):
        """Await *coro* (async function, or sync function run in a thread) with a timeout.

        Raises:
            TimeoutError: if the call does not finish within *timeout* seconds.
        """
        try:
            if inspect.iscoroutinefunction(coro):
                awaitable = coro(*args, **kwargs)
            else:
                async def _call_sync():
                    result = await asyncio.to_thread(coro, *args, **kwargs)
                    if inspect.isawaitable(result):
                        return await result
                    return result
                awaitable = _call_sync()
            return await asyncio.wait_for(awaitable, timeout=timeout)
        except asyncio.TimeoutError as exc:
            raise TimeoutError(f"Operation timed out after {timeout}s") from exc

    @staticmethod
    def _parse_threat(raw: Any) -> Dict[str, Any]:
        """Pass dict threats through unchanged; anything else becomes a blank placeholder."""
        if isinstance(raw, dict):
            return raw
        return {"sku": "", "name": "", "confidence": 0.5}

    # ---- Internal workers ----
    async def _continuous_learning(self) -> None:
        """Tune confidence_threshold from the last 24 h of decision outcomes.

        With >= 5 recent outcomes: mean positive accuracy > 0.8 raises the
        threshold by 0.05 (max 0.9); < 0.6 lowers it by 0.05 (min 0.5).
        """
        recent = [
            o for o in self.decision_history
            if o.timestamp >= datetime.now() - timedelta(hours=24)
        ]
        if len(recent) >= 5:
            acc = [o.accuracy_score for o in recent if o.accuracy_score > 0]
            if acc:
                avg = sum(acc) / len(acc)
                if avg > 0.8:
                    self.confidence_threshold = min(0.9, self.confidence_threshold + 0.05)
                elif avg < 0.6:
                    self.confidence_threshold = max(0.5, self.confidence_threshold - 0.05)
                self._log.info(
                    "Learning: accuracy=%.2f threshold=%.2f",
                    avg,
                    self.confidence_threshold,
                )

    async def _optimize_resources(self) -> None:
        """Shrink the hourly decision budget to 5 above 90% load; grow it to 15
        when load < 50% and fewer than 5 plans are pending."""
        q = self._get_action_queue_size()
        load = self._get_system_load_percentage()
        if load > 90:
            self.max_autonomous_decisions_per_hour = 5
        elif load < 50 and q < 5:
            self.max_autonomous_decisions_per_hour = 15
# Module-level singleton. Constructing it at import time opens the SQLite
# escalation-dedup connection and, if the key file exists, chmods the SSH key.
autonomous_engine = ElephantAlphaAutonomousEngine()

# Public API of this module.
__all__ = [
    "ElephantAlphaAutonomousEngine",
    "autonomous_engine",
    "AutonomousTrigger",
    "DecisionType",
    "DecisionOutcome",
]