Files
ewoooc/tests/test_elephant_alpha_engine.py
OG T 0904a60237
All checks were successful
CD Pipeline / deploy (push) Successful in 1m29s
fix(scheduler): quiet cold-start noise gates
2026-05-06 00:31:30 +08:00

85 lines
2.7 KiB
Python

import asyncio
def test_run_with_timeout_supports_sync_function():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
result = asyncio.run(ElephantAlphaAutonomousEngine._run_with_timeout(lambda value: value + 1, 41))
assert result == 42
def test_execute_step_rejects_unknown_action():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
engine = ElephantAlphaAutonomousEngine()
try:
asyncio.run(engine._execute_step({"agent": "mystery", "action": "do_anything"}))
except ValueError as exc:
assert "Unrecognized step" in str(exc)
else:
raise AssertionError("unknown action should fail")
def test_execute_step_routes_code_fix_to_autoheal(monkeypatch):
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
calls = []
engine = ElephantAlphaAutonomousEngine()
monkeypatch.setattr(
engine,
"_run_auto_heal",
lambda error_type, context: calls.append((error_type, context)) or {"ok": True},
)
asyncio.run(engine._execute_step({
"agent": "elephant_alpha",
"action": "code_fix",
"parameters": {"target_file": "services/example.py", "error_message": "Traceback"},
}))
assert calls == [("python_exception", {"target_file": "services/example.py", "error_message": "Traceback"})]
def test_execute_step_routes_price_adjustment_to_human_review(monkeypatch):
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
calls = []
engine = ElephantAlphaAutonomousEngine()
monkeypatch.setattr(
engine,
"_record_price_adjustment_review",
lambda step: calls.append(step) or {"status": "pending_review", "sku": "SKU-9"},
)
result = asyncio.run(engine._execute_step({
"agent": "nemotron",
"action": "execute_price_adjustment",
"parameters": {"sku": "SKU-9", "recommended_price": 1280},
}))
assert result == {"status": "pending_review", "sku": "SKU-9"}
assert calls[0]["parameters"]["recommended_price"] == 1280
def test_execute_step_skips_legacy_openclaw_resource_strategy():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
engine = ElephantAlphaAutonomousEngine()
result = asyncio.run(engine._execute_step({
"agent": "openclaw",
"action": "generate_resource_optimization_strategy",
}))
assert result is None
def test_autoheal_derives_python_exception_from_traceback():
from services.auto_heal_service import AutoHealService
svc = AutoHealService()
assert svc._derive_error_type({"traceback_str": "Traceback (most recent call last):\nNameError"}) == "python_exception"