51 lines
1.6 KiB
Python
51 lines
1.6 KiB
Python
import asyncio
|
|
|
|
|
|
def test_run_with_timeout_supports_sync_function():
|
|
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
|
|
|
|
result = asyncio.run(ElephantAlphaAutonomousEngine._run_with_timeout(lambda value: value + 1, 41))
|
|
|
|
assert result == 42
|
|
|
|
|
|
def test_execute_step_rejects_unknown_action():
|
|
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
|
|
|
|
engine = ElephantAlphaAutonomousEngine()
|
|
|
|
try:
|
|
asyncio.run(engine._execute_step({"agent": "mystery", "action": "do_anything"}))
|
|
except ValueError as exc:
|
|
assert "Unrecognized step" in str(exc)
|
|
else:
|
|
raise AssertionError("unknown action should fail")
|
|
|
|
|
|
def test_execute_step_routes_code_fix_to_autoheal(monkeypatch):
|
|
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
|
|
|
|
calls = []
|
|
engine = ElephantAlphaAutonomousEngine()
|
|
monkeypatch.setattr(
|
|
engine,
|
|
"_run_auto_heal",
|
|
lambda error_type, context: calls.append((error_type, context)) or {"ok": True},
|
|
)
|
|
|
|
asyncio.run(engine._execute_step({
|
|
"agent": "elephant_alpha",
|
|
"action": "code_fix",
|
|
"parameters": {"target_file": "services/example.py", "error_message": "Traceback"},
|
|
}))
|
|
|
|
assert calls == [("python_exception", {"target_file": "services/example.py", "error_message": "Traceback"})]
|
|
|
|
|
|
def test_autoheal_derives_python_exception_from_traceback():
|
|
from services.auto_heal_service import AutoHealService
|
|
|
|
svc = AutoHealService()
|
|
|
|
assert svc._derive_error_type({"traceback_str": "Traceback (most recent call last):\nNameError"}) == "python_exception"
|