import asyncio
import logging
def test_run_with_timeout_supports_sync_function():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
result = asyncio.run(ElephantAlphaAutonomousEngine._run_with_timeout(lambda value: value + 1, 41))
assert result == 42
def test_execute_step_rejects_unknown_action():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
engine = ElephantAlphaAutonomousEngine()
try:
asyncio.run(engine._execute_step({"agent": "mystery", "action": "do_anything"}))
except ValueError as exc:
assert "Unrecognized step" in str(exc)
else:
raise AssertionError("unknown action should fail")
def test_execute_step_routes_code_fix_to_autoheal(monkeypatch):
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
calls = []
engine = ElephantAlphaAutonomousEngine()
monkeypatch.setattr(
engine,
"_run_auto_heal",
lambda error_type, context: calls.append((error_type, context)) or {"ok": True},
)
asyncio.run(engine._execute_step({
"agent": "elephant_alpha",
"action": "code_fix",
"parameters": {"target_file": "services/example.py", "error_message": "Traceback"},
}))
assert calls == [("python_exception", {"target_file": "services/example.py", "error_message": "Traceback"})]
def test_execute_step_routes_price_adjustment_to_human_review(monkeypatch):
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
calls = []
engine = ElephantAlphaAutonomousEngine()
monkeypatch.setattr(
engine,
"_record_price_adjustment_review",
lambda step: calls.append(step) or {"status": "pending_review", "sku": "SKU-9"},
)
result = asyncio.run(engine._execute_step({
"agent": "nemotron",
"action": "execute_price_adjustment",
"parameters": {"sku": "SKU-9", "recommended_price": 1280},
}))
assert result == {"status": "pending_review", "sku": "SKU-9"}
assert calls[0]["parameters"]["recommended_price"] == 1280
def test_execute_step_skips_legacy_openclaw_strategy_actions():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
engine = ElephantAlphaAutonomousEngine()
for action in {
"generate_market_strategy",
"generate_dynamic_pricing_strategy",
"generate_resource_optimization_strategy",
}:
result = asyncio.run(engine._execute_step({
"agent": "openclaw",
"action": action,
}))
assert result is None
def test_autoheal_derives_python_exception_from_traceback():
from services.auto_heal_service import AutoHealService
svc = AutoHealService()
assert svc._derive_error_type({"traceback_str": "Traceback (most recent call last):\nNameError"}) == "python_exception"
def test_execute_autonomous_decision_logs_short_circuit_telemetry_failure(monkeypatch, caplog):
from services.elephant_alpha_autonomous_engine import (
AutonomousTrigger,
ElephantAlphaAutonomousEngine,
)
import services.ai_call_logger as ai_call_logger
engine = ElephantAlphaAutonomousEngine()
async def _no_hermes_threats(top_n=5):
return None
def _broken_log_ai_call(*args, **kwargs):
raise RuntimeError("ai telemetry unavailable")
monkeypatch.setattr(engine, "_fetch_hermes_threats_summary", _no_hermes_threats)
monkeypatch.setattr(ai_call_logger, "log_ai_call", _broken_log_ai_call)
caplog.set_level(logging.WARNING, logger="services.elephant_alpha_autonomous_engine")
trigger = AutonomousTrigger(
trigger_type="price_drop_alert",
conditions={},
threshold=0.8,
enabled=True,
)
asyncio.run(engine._execute_autonomous_decision(trigger))
assert "EA short-circuit telemetry failed" in caplog.text
def test_competitor_db_evidence_actions_are_concrete():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
actions = ElephantAlphaAutonomousEngine._format_competitor_evidence_actions(
[
{
"sku": "SKU-1",
"name": "測試商品",
"momo_price": 1200,
"competitor_price": 990,
"price_gap_pct": 17.5,
"competitor_product_id": "D123456",
"match_score": 0.84,
"tags": [
"identity_v2",
"match_type_exact",
"price_basis_total_price",
"alert_tier_price_alert_exact",
],
}
],
trigger_type="price_drop_alert",
)
assert actions == [
"[SKU-1] 測試商品|MOMO $1,200 vs PChome $990 (+17.5%)|每件價差 NT$ 210|證據:高信心同款 / 總價可比 / 可直接價格告警 / score 0.84|建議人工確認 PChome identity_v2 後評估跟價或促銷|PChome D123456"
]
def test_execute_autonomous_decision_uses_db_evidence_without_hermes_prefetch(monkeypatch):
import services.elephant_alpha_autonomous_engine as engine_module
from services.elephant_alpha_autonomous_engine import (
AutonomousTrigger,
ElephantAlphaAutonomousEngine,
)
from services.elephant_alpha_orchestrator import StrategicDecision
engine = ElephantAlphaAutonomousEngine()
contexts = []
notified = []
async def _capture_context(context):
contexts.append(context)
return StrategicDecision(
priority="high",
agents_required=["elephant_alpha"],
reasoning="已有 DB 價差實證,允許進入決策流程。",
expected_outcome="生成可稽核行動",
confidence=0.95,
execution_plan=[],
resource_requirements={},
)
async def _fetch_should_not_run(top_n=5):
raise AssertionError("DB evidence should avoid Hermes LLM prefetch")
async def _capture_notify(decision, trigger):
notified.append(trigger.trigger_type)
monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _capture_context)
monkeypatch.setattr(engine, "_fetch_hermes_threats_summary", _fetch_should_not_run)
monkeypatch.setattr(engine, "_notify_telegram_executed", _capture_notify)
monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: None)
trigger = AutonomousTrigger(
trigger_type="price_drop_alert",
conditions={"_db_evidence_actions": ["[SKU-1] DB 實證價差"]},
threshold=0.8,
enabled=True,
)
asyncio.run(engine._execute_autonomous_decision(trigger))
assert contexts[0]["conditions"]["_prefetched_hermes_threats"] == ["[SKU-1] DB 實證價差"]
assert notified == ["price_drop_alert"]
def test_high_confidence_price_decision_skips_execution_plan_and_goes_hitl(monkeypatch):
import services.elephant_alpha_autonomous_engine as engine_module
from services.elephant_alpha_autonomous_engine import (
AutonomousTrigger,
ElephantAlphaAutonomousEngine,
)
from services.elephant_alpha_orchestrator import StrategicDecision
engine = ElephantAlphaAutonomousEngine()
notified = []
stored = []
async def _capture_context(context):
return StrategicDecision(
priority="high",
agents_required=["hermes", "nemotron"],
reasoning="已有 DB 價格比對實證,轉人工覆核。",
expected_outcome="人工確認價格策略",
confidence=0.96,
execution_plan=[
{"agent": "hermes", "action": "analyze_price_competition"},
{"agent": "nemotron", "action": "dispatch_alert", "parameters": {"threats": []}},
],
resource_requirements={},
)
async def _execution_should_not_run(decision):
raise AssertionError("price decisions must not execute long-running autonomous steps")
async def _capture_notify(decision, trigger):
notified.append((trigger.trigger_type, len(decision.execution_plan)))
monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _capture_context)
monkeypatch.setattr(engine, "_execute_decision", _execution_should_not_run)
monkeypatch.setattr(engine, "_notify_telegram_executed", _capture_notify)
monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: stored.append(trigger_type))
trigger = AutonomousTrigger(
trigger_type="price_drop_alert",
conditions={"_db_evidence_actions": ["[SKU-1] DB 實證價差"]},
threshold=0.8,
enabled=True,
)
asyncio.run(engine._execute_autonomous_decision(trigger))
assert notified == [("price_drop_alert", 2)]
assert stored == ["price_drop_alert"]
def test_price_review_decision_envelope_blocks_auto_execution():
from services.elephant_alpha_autonomous_engine import (
AutonomousTrigger,
ElephantAlphaAutonomousEngine,
)
from services.elephant_alpha_orchestrator import StrategicDecision
decision = StrategicDecision(
priority="high",
agents_required=["elephant_alpha"],
reasoning="已有價格比對實證。",
expected_outcome="人工確認價格策略。",
confidence=0.96,
execution_plan=[{"agent": "hermes", "action": "analyze_price_competition"}],
resource_requirements={},
)
trigger = AutonomousTrigger(
trigger_type="price_drop_alert",
conditions={},
threshold=0.8,
enabled=True,
)
envelope = ElephantAlphaAutonomousEngine._build_price_review_decision_envelope(
decision,
trigger,
concrete_actions=["[SKU-1] MOMO $1,200 vs PChome $990"],
)
assert envelope["decision_type"] == "price_decision_review"
assert envelope["source_agent"] == "elephant_alpha"
assert envelope["recommended_action"]["requires_hitl"] is True
assert envelope["guardrails"]["can_auto_execute"] is False
assert envelope["guardrails"]["blocked_reason"] == "price decisions require HITL; execution_plan skipped"
assert envelope["guardrails"]["data_quality"] == "complete"
def test_price_trigger_queries_use_lateral_latest_price_lookup(monkeypatch):
import services.elephant_alpha_autonomous_engine as engine_module
from services.elephant_alpha_autonomous_engine import (
AutonomousTrigger,
ElephantAlphaAutonomousEngine,
)
captured_sql = []
class _FakeResult:
def mappings(self):
return self
def fetchall(self):
return []
class _FakeSession:
def execute(self, statement, params=None):
captured_sql.append(str(statement))
return _FakeResult()
def close(self):
return None
monkeypatch.setattr(engine_module, "get_session", lambda: _FakeSession())
engine = ElephantAlphaAutonomousEngine()
asyncio.run(engine._check_price_drop_trigger(AutonomousTrigger("price_drop_alert", {}, 0.8, True)))
asyncio.run(engine._check_market_opportunity_trigger(AutonomousTrigger("market_opportunity", {}, 0.8, True)))
assert engine._fetch_recent_competitor_evidence_actions(top_n=2) is None
assert len(captured_sql) == 3
assert all("JOIN LATERAL" in sql for sql in captured_sql)
assert all("SELECT DISTINCT ON (product_id)" not in sql for sql in captured_sql)
assert all("latest_momo" not in sql for sql in captured_sql)
def test_escalate_resource_optimization_without_evidence_is_suppressed(monkeypatch):
import services.elephant_alpha_autonomous_engine as engine_module
from services.elephant_alpha_autonomous_engine import (
AutonomousTrigger,
ElephantAlphaAutonomousEngine,
)
from services.elephant_alpha_orchestrator import StrategicDecision
engine = ElephantAlphaAutonomousEngine()
suppressed = []
cooldown = []
def _raise_if_db_opened():
raise AssertionError("no-concrete resource escalation should not write human_review")
monkeypatch.setattr(engine_module, "get_session", _raise_if_db_opened)
monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: cooldown.append(trigger_type))
monkeypatch.setattr(
engine,
"_record_suppressed_escalation",
lambda decision, trigger, reason: suppressed.append((trigger.trigger_type, reason)),
)
decision = StrategicDecision(
priority="medium",
agents_required=["openclaw"],
reasoning="資源調配建議信心不足",
expected_outcome="待人工確認",
confidence=0.60,
execution_plan=[],
resource_requirements={},
)
trigger = AutonomousTrigger(
trigger_type="resource_optimization",
conditions={"_resource_metrics": {"action_queue_size": 14, "system_load_pct": 52.0}},
threshold=0.6,
enabled=True,
)
asyncio.run(engine._escalate_to_human(decision, trigger))
assert cooldown == ["resource_optimization"]
assert suppressed == [("resource_optimization", "no_concrete_evidence")]
def test_escalate_price_alert_without_evidence_is_suppressed(monkeypatch):
import services.elephant_alpha_autonomous_engine as engine_module
from services.elephant_alpha_autonomous_engine import (
AutonomousTrigger,
ElephantAlphaAutonomousEngine,
)
from services.elephant_alpha_orchestrator import StrategicDecision
engine = ElephantAlphaAutonomousEngine()
suppressed = []
cooldown = []
async def _no_concrete_actions(top_n=5):
return None
def _raise_if_db_opened():
raise AssertionError("no-concrete price escalation should not write human_review")
monkeypatch.setattr(engine, "_fetch_hermes_threats_summary", _no_concrete_actions)
monkeypatch.setattr(engine_module, "get_session", _raise_if_db_opened)
monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: cooldown.append(trigger_type))
monkeypatch.setattr(
engine,
"_record_suppressed_escalation",
lambda decision, trigger, reason: suppressed.append((trigger.trigger_type, reason)),
)
decision = StrategicDecision(
priority="medium",
agents_required=["openclaw"],
reasoning="價格調整建議信心不足",
expected_outcome="待人工確認",
confidence=0.62,
execution_plan=[],
resource_requirements={},
)
trigger = AutonomousTrigger(
trigger_type="price_drop_alert",
conditions={},
threshold=0.8,
enabled=True,
)
asyncio.run(engine._escalate_to_human(decision, trigger))
assert cooldown == ["price_drop_alert"]
assert suppressed == [("price_drop_alert", "no_concrete_evidence")]
def test_resource_pressure_classifier_does_not_equate_backlog_with_cpu_load():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({
"action_queue_size": 34,
"high_priority_count": 0,
"human_review_count": 0,
"stale_count": 0,
"system_load_pct": 19.2,
"queue_threshold": 10,
"load_threshold_pct": 80,
"high_priority_threshold": 5,
"stale_threshold": 5,
})
assert metrics["pressure_level"] == "backlog_only"
assert metrics["should_alert"] is False
assert metrics["load_pressure"] is False
assert "system_load=19.2%/80%" in metrics["evidence"]
def test_resource_pressure_classifier_alerts_on_actionable_review_backlog():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({
"action_queue_size": 34,
"high_priority_count": 8,
"human_review_count": 6,
"stale_count": 0,
"system_load_pct": 22.0,
"queue_threshold": 10,
"load_threshold_pct": 80,
"high_priority_threshold": 5,
"stale_threshold": 5,
})
assert metrics["pressure_level"] == "warning"
assert metrics["should_alert"] is True
assert metrics["high_priority_pressure"] is True
assert metrics["load_pressure"] is False
def test_resource_pressure_message_is_measurement_based_not_llm_theatre():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({
"action_queue_size": 34,
"high_priority_count": 8,
"human_review_count": 6,
"stale_count": 5,
"system_load_pct": 22.0,
"queue_threshold": 10,
"load_threshold_pct": 80,
"high_priority_threshold": 5,
"stale_threshold": 5,
"stale_hours": 24,
"top_action_items": [
{
"id": 42,
"status": "pending",
"priority": 1,
"age_hours": 31.5,
"type": "code_review_fix",
"description": "修正高優先錯誤",
}
],
})
msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message(
metrics,
insight_id=123,
previous_limit=10,
new_limit=8,
)
assert "量測指標" in msg
assert "系統處置紀錄" in msg
assert "待處理焦點" in msg
assert "#42 P1 pending code_review_fix" in msg
assert "主機 CPU 未達高負載門檻" in msg
assert "未啟動 Hermes/NemoTron 價格分析" in msg
assert "預期效益" not in msg
assert "48小時" not in msg
assert "48 小時效益預測" in msg
def test_resource_pressure_decision_envelope_is_measurement_based():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({
"action_queue_size": 34,
"high_priority_count": 8,
"human_review_count": 6,
"stale_count": 5,
"system_load_pct": 22.0,
"queue_threshold": 10,
"load_threshold_pct": 80,
"high_priority_threshold": 5,
"stale_threshold": 5,
"stale_hours": 24,
})
envelope = ElephantAlphaAutonomousEngine._build_resource_pressure_decision_envelope(
metrics,
insight_id=1942,
previous_limit=10,
new_limit=8,
)
msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message(
metrics,
insight_id=1942,
previous_limit=10,
new_limit=8,
decision_envelope=envelope,
)
assert envelope["decision_id"] == "ea_resource_pressure_1942"
assert envelope["source_agent"] == "elephant_alpha"
assert envelope["decision_type"] == "resource_optimization"
assert envelope["severity"] == "P2"
assert envelope["guardrails"]["can_auto_execute"] is False
assert envelope["guardrails"]["llm_used"] is False
assert envelope["trace"]["provider"] == "action_plans_cpu_probe"
assert "🧭 決策信封" in msg
assert "資料品質:complete 自動執行:不允許" in msg
assert "不採用 LLM 生成的 48 小時效益預測" in msg
assert "Gemini" not in msg
def test_resource_optimization_bypasses_llm_orchestrator(monkeypatch):
import services.elephant_alpha_autonomous_engine as engine_module
from services.elephant_alpha_autonomous_engine import (
AutonomousTrigger,
ElephantAlphaAutonomousEngine,
)
engine = ElephantAlphaAutonomousEngine()
sent = []
stored = []
async def _boom(*args, **kwargs):
raise AssertionError("resource_optimization must not call LLM orchestrator")
async def _capture_send(*args, **kwargs):
sent.append(args)
monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _boom)
monkeypatch.setattr(engine, "_run_action_plan_hygiene", lambda: {"updated_count": 0})
monkeypatch.setattr(engine, "_record_resource_pressure_insight", lambda *args, **kwargs: 77)
monkeypatch.setattr(engine, "_send_resource_pressure_telegram", _capture_send)
monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: stored.append(trigger_type))
trigger = AutonomousTrigger(
trigger_type="resource_optimization",
conditions={
"_resource_metrics": {
"action_queue_size": 34,
"high_priority_count": 9,
"human_review_count": 6,
"stale_count": 0,
"system_load_pct": 20.0,
"queue_threshold": 10,
"load_threshold_pct": 80,
"high_priority_threshold": 5,
"stale_threshold": 5,
}
},
threshold=0.6,
enabled=True,
)
asyncio.run(engine._execute_autonomous_decision(trigger))
assert stored == ["resource_optimization"]
assert len(sent) == 1
assert engine.max_autonomous_decisions_per_hour == 8
def test_resource_pressure_message_reports_hygiene_result():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
metrics = ElephantAlphaAutonomousEngine._classify_resource_pressure({
"action_queue_size": 9,
"high_priority_count": 0,
"human_review_count": 0,
"stale_count": 0,
"system_load_pct": 20.0,
"queue_threshold": 10,
"load_threshold_pct": 80,
"high_priority_threshold": 5,
"stale_threshold": 5,
})
metrics["pre_hygiene"] = {
"action_queue_size": 100,
"high_priority_count": 47,
"stale_count": 99,
}
metrics["hygiene_result"] = {
"updated_count": 91,
"by_source": {"code_review_fix": 66, "openclaw_recommendation": 25},
}
msg = ElephantAlphaAutonomousEngine._build_resource_pressure_telegram_message(
metrics,
insight_id=124,
previous_limit=10,
new_limit=10,
)
assert "P4 resolved" in msg
assert "清理前 Action queue:100" in msg
assert "已自動關閉過期 action_plans 91 筆" in msg
assert "只改 status/metadata,不刪除資料" in msg
def test_resource_optimization_cannot_use_legacy_autonomous_execution_template(monkeypatch):
from services.elephant_alpha_autonomous_engine import (
AutonomousTrigger,
ElephantAlphaAutonomousEngine,
)
from services.elephant_alpha_orchestrator import StrategicDecision
engine = ElephantAlphaAutonomousEngine()
sent = []
async def _capture_send(message):
sent.append(message)
monkeypatch.setattr("services.telegram_templates._send_telegram_raw", _capture_send)
decision = StrategicDecision(
priority="medium",
agents_required=["elephant_alpha"],
reasoning="resource pressure test",
expected_outcome="old style outcome must not be sent",
confidence=0.8,
execution_plan=[{"agent": "Hermes", "action": "generate_resource_optimization_strategy"}],
resource_requirements={},
)
trigger = AutonomousTrigger(
trigger_type="resource_optimization",
conditions={},
threshold=0.6,
enabled=True,
)
asyncio.run(engine._notify_telegram_executed(decision, trigger))
assert sent == []
def test_human_escalation_decision_envelope_blocks_auto_execution():
from services.elephant_alpha_autonomous_engine import (
AutonomousTrigger,
ElephantAlphaAutonomousEngine,
)
from services.elephant_alpha_orchestrator import StrategicDecision
decision = StrategicDecision(
priority="medium",
agents_required=["elephant_alpha"],
reasoning="需要人工判讀。",
expected_outcome="不自動執行。",
confidence=0.62,
execution_plan=[],
resource_requirements={},
)
trigger = AutonomousTrigger(
trigger_type="code_exception",
conditions={"scan_containers": ["momo-pro-system"]},
threshold=1.0,
enabled=True,
)
envelope = ElephantAlphaAutonomousEngine._build_human_escalation_decision_envelope(
decision,
trigger,
insight_id=77,
concrete_actions=[],
)
assert envelope["decision_id"] == "ea_review_77"
assert envelope["decision_type"] == "ea_escalation"
assert envelope["guardrails"]["can_auto_execute"] is False
assert envelope["guardrails"]["data_quality"] == "partial"
assert envelope["recommended_action"]["requires_hitl"] is True
assert envelope["trace"]["provider"] == "elephant_alpha"
def test_elephant_alpha_openclaw_registry_is_ollama_first():
from services.elephant_alpha_orchestrator import ElephantAlphaOrchestrator
orchestrator = ElephantAlphaOrchestrator()
openclaw = orchestrator.agents["openclaw"]
assert not openclaw.model.startswith("gemini")
assert openclaw.model == "qwen2.5-coder:7b"
assert "Gemini is not a primary agent model" in orchestrator.system_prompt
assert "Ollama cascade" in orchestrator.system_prompt