Files
ewoooc/services/elephant_alpha_autonomous_engine.py
ogt ba86f98514
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
feat: integrate Elephant Alpha ecosystem with full ADR-012/013 compliance
- Add ElephantService, AutonomousEngine, Orchestrator, DecisionRouter (EA 4-file stack)
- Fix 10 bugs: URL typo, SQL schema mismatches (price_records JOIN), enum mapping,
  metadata_json, NemoTron PriceThreat dispatch, async/await mismatch, broken imports
- Wire ADR-012 Agent Action Ladder: EventRouter L2 → EA first + AIOrch fallback;
  all decisions dual-write DB + triaged_alert Telegram; momo: callback prefix
- Wire ADR-013 AutoHeal: resource_optimization trigger → AutoHealService
- Add W3 guards: connection cache 300s TTL, $5/hr cost hard limit
- Add W4 persistence: routing decisions + agent performance snapshots → ai_insights
- Add Migration 015: confidence + created_by columns on ai_insights
- Fix run_scheduler.py broken imports (DecisionTracker service didn't exist)
- Fix verify_elephant_integration.py: check_status() → check_connection()

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-20 04:28:26 +08:00

653 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Elephant Alpha Autonomous Decision Engine
AI 3.0 Autonomous Operations:
- Self-learning from outcomes
- Predictive decision making
- Autonomous resource optimization
- Continuous improvement loop
ADR-012 Compliance:
§③ 單一 audit trail — 所有執行完畢後必發 triaged_alert Telegram
§⑤ 雙寫強制 — ai_insights (由 orchestrator._log_decision) + Telegram
ADR-013 Compliance:
resource_optimization trigger → auto_heal_service.handle_exception
"""
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import numpy as np
from services.logger_manager import SystemLogger
from services.elephant_alpha_orchestrator import elephant_orchestrator, StrategicDecision
from database.manager import get_session
from sqlalchemy import text
logger = SystemLogger("ElephantAlphaEngine").get_logger()
class DecisionType(Enum):
    """Kinds of strategic decision recorded by the autonomous engine."""

    PRICE_OPTIMIZATION = "price_optimization"
    THREAT_RESPONSE = "threat_response"
    MARKET_OPPORTUNITY = "market_opportunity"
    RESOURCE_ALLOCATION = "resource_allocation"
    STRATEGIC_PLANNING = "strategic_planning"


# Explicit trigger_type -> DecisionType lookup (B3 fix): trigger names differ
# from the enum member names, so an ``.upper()``-based enum lookup would raise
# ValueError. Defined once, after the enum it refers to.
_TRIGGER_TO_DECISION_TYPE: Dict[str, DecisionType] = {
    "price_drop_alert": DecisionType.PRICE_OPTIMIZATION,
    "market_opportunity": DecisionType.MARKET_OPPORTUNITY,
    "threat_escalation": DecisionType.THREAT_RESPONSE,
    "resource_optimization": DecisionType.RESOURCE_ALLOCATION,
}
@dataclass
class DecisionOutcome:
    """One autonomous decision together with the data needed to score it later."""

    # Identifier of the decision this outcome belongs to.
    decision_id: str
    # Category of the decision.
    decision_type: DecisionType
    # What the engine predicted / planned when it decided.
    prediction: Dict[str, Any]
    # What was actually observed afterwards.
    actual_outcome: Dict[str, Any]
    # Score comparing prediction and actual outcome.
    accuracy_score: float
    # Measured business effect of the decision.
    business_impact: float
    # When the outcome was recorded.
    timestamp: datetime
    # Free-form notes derived from the outcome.
    lessons_learned: List[str]
@dataclass
class AutonomousTrigger:
    """Configuration and state of one autonomous decision trigger."""

    # Name identifying which trigger this is.
    trigger_type: str
    # Descriptive condition values attached to the trigger.
    conditions: Dict[str, Any]
    # Numeric threshold associated with the trigger.
    threshold: float
    # Whether the trigger is active.
    enabled: bool
    # When the trigger last fired; None until it fires.
    last_triggered: Optional[datetime] = None
class ElephantAlphaAutonomousEngine:
    """
    Elephant Alpha Autonomous Decision Engine

    ADR-012: every execution result is reported via a triaged_alert
    Telegram message (audit trail).
    ADR-013: the resource_optimization trigger is handed to auto_heal_service.
    """

    def __init__(self):
        """Initialise learning parameters, hourly guard rails and the trigger list."""
        # In-memory cache of the most recent 100 outcomes; the durable copy
        # lives in the database.
        self.decision_history: List[DecisionOutcome] = []
        self.triggers: List[AutonomousTrigger] = []

        # Learning parameters.
        self.learning_rate = 0.1
        self.confidence_threshold = 0.7

        # Hourly decision-count guard.
        self.max_autonomous_decisions_per_hour = 10
        self.decision_count_hour = 0
        self.last_hour_reset = datetime.now()

        # W3-B guard rail 3: hourly spend cap.
        self.hourly_cost_usd = 0.0
        self.max_hourly_cost_usd = 5.0  # hard limit of $5 per hour
        self._cost_per_ea_call = 0.002  # roughly $0.002 per call (~1K tokens)

        self._initialize_triggers()
def _initialize_triggers(self):
    """Populate ``self.triggers`` with the four built-in, enabled triggers."""
    # (trigger_type, conditions, threshold) for each default trigger.
    defaults = (
        (
            "price_drop_alert",
            {"competitor_price_drop_pct": 15, "sales_velocity": "decreasing"},
            0.8,
        ),
        (
            "market_opportunity",
            {"competitor_price_premium": ">5%", "our_stock": "available"},
            0.7,
        ),
        (
            "threat_escalation",
            {"threat_score": 0.9, "trend": "worsening"},
            0.85,
        ),
        (
            "resource_optimization",
            {"system_load": "high", "queue_size": ">10"},
            0.6,
        ),
    )
    self.triggers = [
        AutonomousTrigger(
            trigger_type=name,
            conditions=conditions,
            threshold=cutoff,
            enabled=True,
        )
        for name, conditions, cutoff in defaults
    ]
async def start_autonomous_monitoring(self):
    """Run the monitoring loop forever.

    Each cycle resets the hourly counters when due, checks the triggers,
    runs the learning step and the resource tuning step, then sleeps for
    60 seconds. Any exception raised during a cycle is logged and followed
    by a 300-second back-off before the next cycle.
    """
    cycle_pause_s = 60
    error_backoff_s = 300

    logger.info("[ElephantAlpha] Starting autonomous monitoring engine")
    while True:
        try:
            self._reset_hourly_counter_if_needed()
            await self._check_triggers()
            await self._continuous_learning()
            await self._optimize_resources()
            await asyncio.sleep(cycle_pause_s)
        except Exception as exc:
            logger.error(f"[ElephantAlpha] Autonomous monitoring error: {exc}")
            await asyncio.sleep(error_backoff_s)
async def _check_triggers(self):
"""Check all autonomous triggers"""
if self.decision_count_hour >= self.max_autonomous_decisions_per_hour:
logger.warning("[ElephantAlpha] Hourly decision limit reached, skipping cycle")
return
for trigger in self.triggers:
if not trigger.enabled:
continue
if await self._evaluate_trigger(trigger):
await self._execute_autonomous_decision(trigger)
async def _evaluate_trigger(self, trigger: AutonomousTrigger) -> bool:
    """Run the checker that matches ``trigger.trigger_type``.

    Returns the checker's result, or False when the trigger type has no
    checker.
    """
    checkers = {
        "price_drop_alert": self._check_price_drop_trigger,
        "market_opportunity": self._check_market_opportunity_trigger,
        "threat_escalation": self._check_threat_escalation_trigger,
        "resource_optimization": self._check_resource_optimization_trigger,
    }
    checker = checkers.get(trigger.trigger_type)
    if checker is None:
        return False
    return await checker(trigger)
# ── Trigger checkers ──────────────────────────────────────────────
async def _check_price_drop_trigger(self, trigger: AutonomousTrigger) -> bool:
    """Return True when at least three recent competitor undercuts are found.

    The query joins each product's latest ``price_records`` row with
    unexpired ``competitor_prices`` rows crawled within the last two hours
    and keeps rows where the competitor price is below 85% of our price.
    ``trigger`` is not read by this checker.
    """
    db = get_session()
    try:
        undercuts = db.execute(text("""
            SELECT
            p.i_code AS sku,
            cp.price AS competitor_price,
            pr.price AS momo_price,
            ((pr.price - cp.price) / pr.price * 100) AS price_gap_pct
            FROM products p
            JOIN (
            SELECT DISTINCT ON (product_id) product_id, price
            FROM price_records
            ORDER BY product_id, timestamp DESC
            ) pr ON pr.product_id = p.id
            JOIN competitor_prices cp ON cp.sku = p.i_code
            WHERE cp.expires_at > NOW()
            AND cp.price < pr.price * 0.85
            AND cp.crawled_at >= NOW() - INTERVAL '2 hours'
            LIMIT 10
        """)).fetchall()
    finally:
        db.close()
    return len(undercuts) >= 3
async def _check_market_opportunity_trigger(self, trigger: AutonomousTrigger) -> bool:
    """Return True when any product is currently priced well below a competitor.

    The query joins each product's latest ``price_records`` row with
    unexpired ``competitor_prices`` rows crawled within the last hour and
    keeps rows where the competitor price exceeds ours by more than 5%.
    ``trigger`` is not read by this checker.
    """
    db = get_session()
    try:
        candidates = db.execute(text("""
            SELECT p.i_code AS sku
            FROM products p
            JOIN (
            SELECT DISTINCT ON (product_id) product_id, price
            FROM price_records
            ORDER BY product_id, timestamp DESC
            ) pr ON pr.product_id = p.id
            JOIN competitor_prices cp ON cp.sku = p.i_code
            WHERE cp.expires_at > NOW()
            AND cp.price > pr.price * 1.05
            AND cp.crawled_at >= NOW() - INTERVAL '1 hour'
            LIMIT 5
        """)).fetchall()
    finally:
        db.close()
    return bool(candidates)
async def _check_threat_escalation_trigger(self, trigger: AutonomousTrigger) -> bool:
    """Return True when at least two recent high-confidence worsening alerts exist.

    Looks in ``ai_insights`` for ``price_alert`` rows from the last 30
    minutes with confidence of 0.9 or more whose ``metadata_json`` contains
    the text "worsening". ``trigger`` is not read by this checker.
    """
    db = get_session()
    try:
        alerts = db.execute(text("""
            SELECT product_sku
            FROM ai_insights
            WHERE insight_type = 'price_alert'
            AND confidence >= 0.9
            AND created_at >= NOW() - INTERVAL '30 minutes'
            AND metadata_json LIKE '%worsening%'
            LIMIT 3
        """)).fetchall()
    finally:
        db.close()
    return len(alerts) >= 2
async def _check_resource_optimization_trigger(self, trigger: AutonomousTrigger) -> bool:
    """Return True when the pending queue exceeds 10 or system load exceeds 80.

    The queue size is checked first; the load is only read when the queue
    is not over its limit. ``trigger`` is not read by this checker.
    """
    if self._get_action_queue_size() > 10:
        return True
    return self._get_system_load_percentage() > 80
# ── Decision execution ────────────────────────────────────────────
async def _execute_autonomous_decision(self, trigger: AutonomousTrigger):
    """Turn a fired trigger into an executed decision or a human escalation.

    Flow:
      1. ADR-013: ``resource_optimization`` is handed to the AutoHeal path.
         This makes no Elephant Alpha call, so it is handled *before* the
         cost guard and is not blocked once the hourly budget is used up.
      2. W3-B cost guard: skip the trigger if one more Elephant Alpha call
         would push hourly spend past ``max_hourly_cost_usd``.
      3. Ask the orchestrator for a decision. The call is charged before it
         is awaited, so a call that raises still counts against the budget.
      4. Guard rail 1: price triggers need confidence >= 0.85; other
         triggers use ``self.confidence_threshold``.
      5. ADR-012 §⑤: a confident decision is executed and a Telegram audit
         message is sent; otherwise the decision is escalated to a human.
    """
    # ADR-013: resource pressure goes to the AIOps auto-heal loop.
    if trigger.trigger_type == "resource_optimization":
        await self._handle_resource_via_autoheal(trigger)
        return

    # W3-B: hourly spend guard for Elephant Alpha calls.
    if self.hourly_cost_usd + self._cost_per_ea_call > self.max_hourly_cost_usd:
        logger.warning(
            f"[ElephantAlpha] Hourly cost limit ${self.max_hourly_cost_usd} reached, "
            f"skipping trigger: {trigger.trigger_type}"
        )
        return

    context = await self._build_trigger_context(trigger)

    # Charge up front: a failed or interrupted call may still have been
    # billed, and a hard limit should err on the conservative side.
    self.hourly_cost_usd += self._cost_per_ea_call
    decision = await elephant_orchestrator.analyze_and_coordinate(context)

    # Guard rail 1: price actions require a confidence of at least 0.85.
    price_triggers = {"price_drop_alert", "market_opportunity"}
    effective_threshold = (
        0.85 if trigger.trigger_type in price_triggers else self.confidence_threshold
    )

    if decision.confidence >= effective_threshold:
        await self._execute_decision(decision, trigger)
        trigger.last_triggered = datetime.now()
        self.decision_count_hour += 1
        logger.info(f"[ElephantAlpha] Autonomous decision executed: {trigger.trigger_type}")
        # W2-B: ADR-012 §⑤ — mandatory Telegram audit trail after execution.
        await self._notify_telegram_executed(decision, trigger)
    else:
        # W2-A: ADR-012 §⑤ — escalate to a human, with mandatory Telegram.
        await self._escalate_to_human(decision, trigger)
async def _handle_resource_via_autoheal(self, trigger: AutonomousTrigger):
    """ADR-013: hand resource pressure to ``AutoHealService.handle_exception``.

    The handler is run in the default executor so it does not run on the
    event loop thread. Any failure, including a failed import of the
    service, is logged and swallowed; this handoff is best-effort.
    ``trigger`` is not read.
    """
    try:
        from services.auto_heal_service import AutoHealService

        heal_service = AutoHealService()
        details = {
            "queue_size": self._get_action_queue_size(),
            "system_load": self._get_system_load_percentage(),
            "source": "elephant_alpha_autonomous_engine",
        }
        # get_running_loop() is the documented way to reach the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            heal_service.handle_exception,
            "resource_pressure",
            details,
        )
        logger.info("[ElephantAlpha] Resource optimization handed off to AutoHealService")
    except Exception as e:
        logger.error(f"[ElephantAlpha] AutoHeal handoff failed: {e}")
async def _build_trigger_context(self, trigger: AutonomousTrigger) -> Dict[str, Any]:
    """Assemble the context dict passed to the orchestrator for ``trigger``.

    Starts from the common fields and merges in the trigger-specific
    context when a builder exists for the trigger type.
    """
    context = {
        "trigger_type": trigger.trigger_type,
        "urgency": "high",
        "autonomous_mode": True,
        "timestamp": datetime.now().isoformat()
    }
    builders = {
        "price_drop_alert": self._get_price_drop_context,
        "market_opportunity": self._get_market_opportunity_context,
        "threat_escalation": self._get_threat_escalation_context,
    }
    builder = builders.get(trigger.trigger_type)
    if builder is not None:
        context.update(await builder())
    return context
# ── Context builders ──────────────────────────────────────────────
async def _get_price_drop_context(self) -> Dict[str, Any]:
    """Collect up to ten products a competitor currently undercuts by over 15%.

    Rows are ordered by price gap, largest first. Returns a dict with the
    product list under ``affected_products`` plus fixed ``decision_type``
    and ``business_impact`` labels.
    """
    db = get_session()
    try:
        undercuts = db.execute(text("""
            SELECT
            p.i_code AS sku, p.name, p.category,
            cp.price AS competitor_price, cp.source AS competitor_name,
            pr.price AS momo_price,
            ((pr.price - cp.price) / pr.price * 100) AS price_gap_pct
            FROM products p
            JOIN (
            SELECT DISTINCT ON (product_id) product_id, price
            FROM price_records
            ORDER BY product_id, timestamp DESC
            ) pr ON pr.product_id = p.id
            JOIN competitor_prices cp ON cp.sku = p.i_code
            WHERE cp.expires_at > NOW()
            AND cp.price < pr.price * 0.85
            AND cp.crawled_at >= NOW() - INTERVAL '2 hours'
            ORDER BY price_gap_pct DESC
            LIMIT 10
        """)).fetchall()

        affected = []
        for row in undercuts:
            affected.append({
                "sku": row.sku,
                "name": row.name,
                "category": row.category,
                "current_price": float(row.momo_price),
                "competitor_price": float(row.competitor_price),
                "price_gap_pct": float(row.price_gap_pct),
                "competitor": row.competitor_name,
            })

        return {
            "affected_products": affected,
            "decision_type": "price_optimization",
            "business_impact": "revenue_protection"
        }
    finally:
        db.close()
async def _get_market_opportunity_context(self) -> Dict[str, Any]:
    """Collect up to five products priced more than 5% below a competitor.

    Returns a dict with the product list under ``opportunity_products``
    plus fixed ``decision_type`` and ``business_impact`` labels.
    """
    db = get_session()
    try:
        candidates = db.execute(text("""
            SELECT
            p.i_code AS sku, p.name, p.category,
            pr.price AS momo_price, cp.price AS competitor_price,
            cp.source AS competitor_name
            FROM products p
            JOIN (
            SELECT DISTINCT ON (product_id) product_id, price
            FROM price_records
            ORDER BY product_id, timestamp DESC
            ) pr ON pr.product_id = p.id
            JOIN competitor_prices cp ON cp.sku = p.i_code
            WHERE cp.expires_at > NOW()
            AND cp.price > pr.price * 1.05
            AND cp.crawled_at >= NOW() - INTERVAL '1 hour'
            LIMIT 5
        """)).fetchall()

        opportunities = []
        for row in candidates:
            opportunities.append({
                "sku": row.sku,
                "name": row.name,
                "category": row.category,
                "current_price": float(row.momo_price),
                "competitor_price": float(row.competitor_price),
                "competitor": row.competitor_name,
            })

        return {
            "opportunity_products": opportunities,
            "decision_type": "market_opportunity",
            "business_impact": "revenue_growth"
        }
    finally:
        db.close()
async def _get_threat_escalation_context(self) -> Dict[str, Any]:
    """Collect the five most recent high-confidence price alerts.

    Reads ``ai_insights`` rows of type ``price_alert`` from the last 30
    minutes with confidence of 0.9 or more. The query orders by
    ``created_at`` descending so that ``LIMIT 5`` selects the newest
    alerts; without an ORDER BY the database may return any five matching
    rows.

    Returns a dict with the alerts under ``threat_insights`` plus fixed
    ``decision_type`` and ``business_impact`` labels.
    """
    session = get_session()
    try:
        rows = session.execute(text("""
            SELECT product_sku AS sku, confidence, content, created_at
            FROM ai_insights
            WHERE insight_type = 'price_alert'
            AND confidence >= 0.9
            AND created_at >= NOW() - INTERVAL '30 minutes'
            ORDER BY created_at DESC
            LIMIT 5
        """)).fetchall()
        return {
            "threat_insights": [
                {"sku": r.sku,
                 "confidence": float(r.confidence) if r.confidence else 0.0,
                 "content": r.content,
                 "created_at": r.created_at.isoformat()}
                for r in rows
            ],
            "decision_type": "threat_response",
            "business_impact": "risk_mitigation"
        }
    finally:
        session.close()
# ── Execution & persistence ───────────────────────────────────────
async def _execute_decision(self, decision: StrategicDecision, trigger: AutonomousTrigger):
    """Run every step of the decision's plan and persist the outcome (B5).

    A failing step is logged and does not stop the remaining steps. The
    number of failed steps is stored in the persisted payload so that a
    plan whose steps failed can be told apart from a clean run; the row's
    ``status`` value is left as "executed". A failed DB write is logged and
    rolled back without raising. The Telegram notification is sent by the
    caller, not here.
    """
    failed_steps = 0
    for step in decision.execution_plan:
        try:
            await self._execute_step(step)
            logger.info(
                f"[ElephantAlpha] Step done: {step.get('agent')}.{step.get('action')}"
            )
        except Exception as e:
            failed_steps += 1
            logger.error(f"[ElephantAlpha] Step failed: {e}")

    decision_type = _TRIGGER_TO_DECISION_TYPE.get(
        trigger.trigger_type, DecisionType.STRATEGIC_PLANNING
    )
    # One clock reading so the id and the timestamp describe the same instant.
    recorded_at = datetime.now()
    decision_snapshot = asdict(decision)
    outcome = DecisionOutcome(
        decision_id=f"{trigger.trigger_type}_{recorded_at.timestamp()}",
        decision_type=decision_type,
        prediction=decision_snapshot,
        actual_outcome={},
        accuracy_score=0.0,
        business_impact=0.0,
        timestamp=recorded_at,
        lessons_learned=[]
    )

    # B5 fix: persist to action_plans (DB).
    session = get_session()
    try:
        session.execute(text("""
            INSERT INTO action_plans
            (session_id, plan_type, sku, payload, status, created_by)
            VALUES (:sid, :pt, :sku, :pl, :status, :by)
        """), {
            "sid": outcome.decision_id,
            "pt": "elephant_alpha_decision",
            "sku": None,
            "pl": json.dumps({
                # Values are stringified so non-JSON types cannot break the dump.
                "decision": {k: str(v) for k, v in decision_snapshot.items()},
                "trigger": trigger.trigger_type,
                "confidence": decision.confidence,
                "steps_total": len(decision.execution_plan),
                "steps_failed": failed_steps,
            }),
            "status": "executed",
            "by": "elephant_alpha",
        })
        session.commit()
    except Exception as e:
        logger.error(f"[ElephantAlpha] DB persist failed: {e}")
        session.rollback()
    finally:
        session.close()

    # Keep only the 100 most recent outcomes in memory.
    self.decision_history.append(outcome)
    if len(self.decision_history) > 100:
        self.decision_history = self.decision_history[-100:]
async def _execute_step(self, step: Dict[str, Any]):
"""Execute individual step. NemoTron gets PriceThreat list (B6 FIX)."""
from services.hermes_analyst_service import HermesAnalystService, PriceThreat
from services.nemoton_dispatcher_service import NemotronDispatcher
from services.openclaw_strategist_service import generate_weekly_strategy_report
agent_type = step.get("agent", "").lower()
action = step.get("action", "")
params = step.get("parameters", {})
logger.info(f"[ElephantAlpha] Execute: {agent_type}{action}")
if agent_type == "hermes" and action == "analyze_price_competition":
return HermesAnalystService().run()
elif agent_type == "nemotron" and action == "dispatch_alert":
raw_threats = params.get("threats", [])
threats = [
PriceThreat(
sku=t.get("sku", ""), name=t.get("name", ""),
category=t.get("category", ""),
momo_price=float(t.get("momo_price", 0)),
pchome_price=float(t.get("competitor_price", 0)),
gap_pct=float(t.get("price_gap_pct", 0)),
sales_7d_delta_pct=float(t.get("sales_7d_delta_pct", 0)),
risk=t.get("risk", "MED"),
recommended_action=t.get("recommended_action", ""),
confidence=float(t.get("confidence", 0.5)),
)
for t in raw_threats if isinstance(t, dict)
]
if threats:
NemotronDispatcher().dispatch(threats)
elif agent_type == "openclaw" and action == "generate_strategic_analysis":
return generate_weekly_strategy_report()
# ── Telegram notifications (ADR-012 §⑤) ─────────────────────────
async def _notify_telegram_executed(
    self, decision: StrategicDecision, trigger: AutonomousTrigger
):
    """W2-B: ADR-012 §⑤ — send the mandatory Telegram audit message after an autonomous run.

    Builds a ``triaged_alert`` message listing up to five plan steps and
    sends it without a keyboard. Failures are logged and never raised.
    """
    try:
        from services.telegram_templates import triaged_alert, _send_telegram_raw

        event = {
            "event_type": trigger.trigger_type,
            "title": f"🐘 EA 自主執行完畢 · {trigger.trigger_type}",
            "summary": decision.expected_outcome or "EA 完成自主決策",
            "id": f"ea_{trigger.trigger_type}_{int(datetime.now().timestamp())}",
        }
        reasoning_excerpt = (decision.reasoning or "")[:300]
        executed_steps = []
        for plan_step in decision.execution_plan[:5]:
            executed_steps.append(
                f"[{plan_step.get('agent', '?')}] {plan_step.get('action', '?')}"
            )
        if not executed_steps:
            executed_steps = ["(無具體執行計畫)"]

        # The returned keyboard is not used for the audit message.
        msg, _keyboard = triaged_alert(
            base_event=event,
            tier_label="🐘 Elephant Alpha · 自主執行",
            ai_summary=reasoning_excerpt,
            ai_executed=executed_steps,
        )
        _send_telegram_raw(msg)
        logger.info(f"[ElephantAlpha] Telegram audit sent for: {trigger.trigger_type}")
    except Exception as exc:
        logger.error(f"[ElephantAlpha] Telegram audit failed (non-blocking): {exc}")
async def _escalate_to_human(self, decision: StrategicDecision, trigger: AutonomousTrigger):
    """W2-A: ADR-012 §⑤ — dual-write a low-confidence decision for human review.

    1. Insert a ``human_review`` row into ``ai_insights``.
    2. Send a ``triaged_alert`` Telegram message with its keyboard.

    The two writes are independent: a failure in either one is logged and
    does not stop the other, and nothing is raised to the caller.

    The decision is serialised with ``default=str`` so values that JSON
    cannot encode natively do not make ``json.dumps`` raise, which would
    silently drop the DB half of the dual write.
    """
    logger.warning(
        f"[ElephantAlpha] Escalating to human: {trigger.trigger_type} "
        f"confidence={decision.confidence:.2f}"
    )

    # 1. DB write
    session = get_session()
    try:
        session.execute(text("""
            INSERT INTO ai_insights
            (insight_type, content, confidence, created_by, status, metadata_json)
            VALUES (:type, :content, :conf, :by, :status, :meta)
        """), {
            "type": "human_review",
            "content": (
                f"[Elephant Alpha 升級審核] {trigger.trigger_type} "
                f"信心度僅 {decision.confidence:.2f},建議人工介入。"
            ),
            "conf": decision.confidence,
            "by": "elephant_alpha",
            "status": "pending",
            "meta": json.dumps(
                {
                    "decision": asdict(decision),
                    "trigger": trigger.trigger_type,
                    "reason": "low_confidence"
                },
                # Deliberate: stringify anything JSON cannot encode natively.
                default=str,
            )
        })
        session.commit()
    except Exception as e:
        logger.error(f"[ElephantAlpha] DB escalation write failed: {e}")
        session.rollback()
    finally:
        session.close()

    # 2. Telegram (mandatory under ADR-012 §⑤)
    try:
        from services.telegram_templates import triaged_alert, _send_telegram_raw

        msg, keyboard = triaged_alert(
            base_event={
                "event_type": "ea_escalation",
                "title": f"🐘 EA 升級審核 · {trigger.trigger_type}",
                "summary": (
                    f"自主決策信心度 {decision.confidence:.2f} 低於門檻,需人工批准"
                ),
                "id": f"ea_review_{int(datetime.now().timestamp())}",
            },
            tier_label="🐘 Elephant Alpha · L3 HITL",
            ai_summary=(decision.reasoning or "")[:300],
            ai_cause=(
                f"觸發器:{trigger.trigger_type} | "
                f"信心度:{decision.confidence:.2f} | "
                # Separator added so this label matches the two before it.
                f"需求 Agent:{', '.join(decision.agents_required)}"
            ),
            ai_actions=[
                f"Step {s.get('step', i+1)}: [{s.get('agent', '?')}] {s.get('action', '?')}"
                for i, s in enumerate(decision.execution_plan[:3])
            ] or ["無具體執行計畫"],
        )
        _send_telegram_raw(msg, reply_markup=keyboard)
        logger.info(
            f"[ElephantAlpha] Human escalation Telegram sent: {trigger.trigger_type}"
        )
    except Exception as e:
        logger.error(f"[ElephantAlpha] Telegram escalation failed (non-blocking): {e}")
# ── Learning & resource management ───────────────────────────────
async def _continuous_learning(self):
recent_outcomes = [
o for o in self.decision_history
if o.timestamp >= datetime.now() - timedelta(hours=24)
]
if len(recent_outcomes) >= 5:
accuracy_scores = [o.accuracy_score for o in recent_outcomes if o.accuracy_score > 0]
if accuracy_scores:
avg_accuracy = np.mean(accuracy_scores)
if avg_accuracy > 0.8:
self.confidence_threshold = min(0.9, self.confidence_threshold + 0.05)
elif avg_accuracy < 0.6:
self.confidence_threshold = max(0.5, self.confidence_threshold - 0.05)
logger.info(
f"[ElephantAlpha] Learning: accuracy={avg_accuracy:.2f}, "
f"threshold={self.confidence_threshold:.2f}"
)
async def _optimize_resources(self):
queue_size = self._get_action_queue_size()
system_load = self._get_system_load_percentage()
if system_load > 90:
self.max_autonomous_decisions_per_hour = 5
elif system_load < 50 and queue_size < 5:
self.max_autonomous_decisions_per_hour = 15
def _get_action_queue_size(self) -> int:
    """Return the number of ``action_plans`` rows whose status is 'pending'.

    The count is read with ``scalar()`` instead of ``row.count``. On
    SQLAlchemy 1.4+/2.0 a result ``Row`` is tuple-like and ``row.count`` is
    the tuple method ``count``, not the column aliased ``count``, so
    comparing the returned value with an int would raise TypeError.
    """
    session = get_session()
    try:
        pending = session.execute(text(
            "SELECT COUNT(*) as count FROM action_plans WHERE status = 'pending'"
        )).scalar()
        # scalar() yields None when there is no row at all.
        return int(pending or 0)
    finally:
        session.close()
def _get_system_load_percentage(self) -> float:
return 45.0
def _reset_hourly_counter_if_needed(self):
if datetime.now().hour != self.last_hour_reset.hour:
self.decision_count_hour = 0
self.hourly_cost_usd = 0.0 # W3-B: 同步重置費用計數
self.last_hour_reset = datetime.now()
# Global autonomous engine instance, created when this module is imported.
# The constructor only sets up in-memory state and the default triggers.
autonomous_engine = ElephantAlphaAutonomousEngine()