From d4865983d8da1eb3c3a1a6d0042eb43f3a7638d8 Mon Sep 17 00:00:00 2001 From: OoO Date: Wed, 29 Apr 2026 23:14:09 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A3=9C=E5=BC=B7=20ElephantAlpha=20=E5=9F=B7?= =?UTF-8?q?=E8=A1=8C=E8=88=87=E9=80=9A=E7=9F=A5=E9=96=89=E7=92=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/auto_heal_service.py | 10 ++++ services/elephant_alpha_autonomous_engine.py | 50 ++++++++++++++++---- tests/test_elephant_alpha_engine.py | 50 ++++++++++++++++++++ 3 files changed, 101 insertions(+), 9 deletions(-) create mode 100644 tests/test_elephant_alpha_engine.py diff --git a/services/auto_heal_service.py b/services/auto_heal_service.py index 2872ca6..8ed9d2a 100644 --- a/services/auto_heal_service.py +++ b/services/auto_heal_service.py @@ -273,6 +273,16 @@ class AutoHealService: return "nim_quota_exhausted" if "embedding" in combined: return "embedding_failure" + if ( + context.get("target_file") + or "traceback (most recent call last)" in combined + or "nameerror" in combined + or "attributeerror" in combined + or "typeerror" in combined + or "importerror" in combined + or "modulenotfounderror" in combined + ): + return "python_exception" return "scheduler_task_failure" def _offline_playbook(self, error_type: str) -> Optional[Dict[str, Any]]: diff --git a/services/elephant_alpha_autonomous_engine.py b/services/elephant_alpha_autonomous_engine.py index 83bb0b5..4397109 100644 --- a/services/elephant_alpha_autonomous_engine.py +++ b/services/elephant_alpha_autonomous_engine.py @@ -15,6 +15,7 @@ ADR-013 Compliance: """ import asyncio +import inspect import json import logging import os @@ -591,7 +592,23 @@ class ElephantAlphaAutonomousEngine: timeout=SSH_COMMAND_TIMEOUT, ) - self._log.warning("Unrecognized step: agent=%s action=%s", agent_type, action) + if action in {"auto_heal", "resource_optimization", "optimize_resources", "handle_exception"}: + return await self._run_with_timeout( + self._run_auto_heal, + "scheduler_task_failure", + params, + timeout=SSH_COMMAND_TIMEOUT, + ) + + if action in {"code_fix", "fix_code_exception", "handle_code_exception"}: + return await self._run_with_timeout( + self._run_auto_heal, + "python_exception", + params, + timeout=SSH_COMMAND_TIMEOUT, + ) + + raise ValueError(f"Unrecognized step: agent={agent_type} action={action}") # ---- Sub-services ---- async def _hermes_analyze(self) -> Any: @@ -614,6 +631,13 @@ class ElephantAlphaAutonomousEngine: from services.openclaw_strategist_service import generate_meta_analysis_report return await self._run_with_timeout(generate_meta_analysis_report, timeout=SSH_COMMAND_TIMEOUT) + def _run_auto_heal(self, error_type: str, context: Dict[str, Any]) -> Any: + from services.auto_heal_service import auto_heal_service + payload = dict(context or {}) + payload.setdefault("source", "ElephantAlphaAutonomousEngine") + payload.setdefault("error_type", error_type) + return auto_heal_service.handle_exception(error_type=error_type, context=payload) + # ---- Notification ---- async def _notify_telegram_executed( self, @@ -715,27 +739,26 @@ class ElephantAlphaAutonomousEngine: for i, s in enumerate(decision.execution_plan[:3]) ] or ["無具體執行計畫"], ) - await self._run_with_timeout(_send_telegram_raw, msg, timeout=10, keyboard=keyboard) + await self._run_with_timeout(_send_telegram_raw, msg, timeout=10, reply_markup=keyboard) self._log.info("Human escalation Telegram sent: %s", trigger.trigger_type) except Exception as e: self._log.error("Telegram escalation failed (non-blocking): %s", e) # ---- Resource Optimization ---- - def _ssh_exec(self, host: str, user: str, cmd: str, timeout: int = 120) -> tuple: + def _ssh_exec(self, host: str, user: str, cmd: Any, timeout: int = 120) -> tuple: """Execute command via SSH jump host to target host.""" import subprocess - import shlex - - # 這裡實作 SSH 跳板或直接連線邏輯,根據 ADR-013 應透過跳板機 - # 簡化版:直接從容器透過 SSH 連向目標 (188) + remote_cmd = [str(part) for part in cmd] if isinstance(cmd, list) else [str(cmd)] full_cmd = [ "ssh", "-p", str(SSH_PORT), "-i", SSH_KEY_PATH, "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=10", + "-J", f"{SSH_JUMP_USER}@{SSH_JUMP_HOST}", f"{user}@{host}", - cmd + "--", + *remote_cmd, ] try: res = subprocess.run( @@ -786,7 +809,16 @@ class ElephantAlphaAutonomousEngine: @staticmethod async def _run_with_timeout(coro, *args, timeout: int = 30, **kwargs): try: - return await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout) + if inspect.iscoroutinefunction(coro): + awaitable = coro(*args, **kwargs) + else: + async def _call_sync(): + result = await asyncio.to_thread(coro, *args, **kwargs) + if inspect.isawaitable(result): + return await result + return result + awaitable = _call_sync() + return await asyncio.wait_for(awaitable, timeout=timeout) except asyncio.TimeoutError: raise TimeoutError(f"Operation timed out after {timeout}s") diff --git a/tests/test_elephant_alpha_engine.py b/tests/test_elephant_alpha_engine.py new file mode 100644 index 0000000..745af0e --- /dev/null +++ b/tests/test_elephant_alpha_engine.py @@ -0,0 +1,50 @@ +import asyncio + + +def test_run_with_timeout_supports_sync_function(): + from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine + + result = asyncio.run(ElephantAlphaAutonomousEngine._run_with_timeout(lambda value: value + 1, 41)) + + assert result == 42 + + +def test_execute_step_rejects_unknown_action(): + from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine + + engine = ElephantAlphaAutonomousEngine() + + try: + asyncio.run(engine._execute_step({"agent": "mystery", "action": "do_anything"})) + except ValueError as exc: + assert "Unrecognized step" in str(exc) + else: + raise AssertionError("unknown action should fail") + + +def test_execute_step_routes_code_fix_to_autoheal(monkeypatch): + from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine + + calls = [] + engine = ElephantAlphaAutonomousEngine() + monkeypatch.setattr( + engine, + "_run_auto_heal", + lambda error_type, context: calls.append((error_type, context)) or {"ok": True}, + ) + + asyncio.run(engine._execute_step({ + "agent": "elephant_alpha", + "action": "code_fix", + "parameters": {"target_file": "services/example.py", "error_message": "Traceback"}, + })) + + assert calls == [("python_exception", {"target_file": "services/example.py", "error_message": "Traceback"})] + + +def test_autoheal_derives_python_exception_from_traceback(): + from services.auto_heal_service import AutoHealService + + svc = AutoHealService() + + assert svc._derive_error_type({"traceback_str": "Traceback (most recent call last):\nNameError"}) == "python_exception"