補強 ElephantAlpha 執行與通知閉環
All checks were successful
CD Pipeline / deploy (push) Successful in 1m10s

This commit is contained in:
OoO
2026-04-29 23:14:09 +08:00
parent 78eebfbcfc
commit d4865983d8
3 changed files with 101 additions and 9 deletions

View File

@@ -273,6 +273,16 @@ class AutoHealService:
return "nim_quota_exhausted"
if "embedding" in combined:
return "embedding_failure"
if (
context.get("target_file")
or "traceback (most recent call last)" in combined
or "nameerror" in combined
or "attributeerror" in combined
or "typeerror" in combined
or "importerror" in combined
or "modulenotfounderror" in combined
):
return "python_exception"
return "scheduler_task_failure"
def _offline_playbook(self, error_type: str) -> Optional[Dict[str, Any]]:

View File

@@ -15,6 +15,7 @@ ADR-013 Compliance:
"""
import asyncio
import inspect
import json
import logging
import os
@@ -591,7 +592,23 @@ class ElephantAlphaAutonomousEngine:
timeout=SSH_COMMAND_TIMEOUT,
)
self._log.warning("Unrecognized step: agent=%s action=%s", agent_type, action)
if action in {"auto_heal", "resource_optimization", "optimize_resources", "handle_exception"}:
return await self._run_with_timeout(
self._run_auto_heal,
"scheduler_task_failure",
params,
timeout=SSH_COMMAND_TIMEOUT,
)
if action in {"code_fix", "fix_code_exception", "handle_code_exception"}:
return await self._run_with_timeout(
self._run_auto_heal,
"python_exception",
params,
timeout=SSH_COMMAND_TIMEOUT,
)
raise ValueError(f"Unrecognized step: agent={agent_type} action={action}")
# ---- Sub-services ----
async def _hermes_analyze(self) -> Any:
@@ -614,6 +631,13 @@ class ElephantAlphaAutonomousEngine:
from services.openclaw_strategist_service import generate_meta_analysis_report
return await self._run_with_timeout(generate_meta_analysis_report, timeout=SSH_COMMAND_TIMEOUT)
def _run_auto_heal(self, error_type: str, context: Dict[str, Any]) -> Any:
from services.auto_heal_service import auto_heal_service
payload = dict(context or {})
payload.setdefault("source", "ElephantAlphaAutonomousEngine")
payload.setdefault("error_type", error_type)
return auto_heal_service.handle_exception(error_type=error_type, context=payload)
# ---- Notification ----
async def _notify_telegram_executed(
self,
@@ -715,27 +739,26 @@ class ElephantAlphaAutonomousEngine:
for i, s in enumerate(decision.execution_plan[:3])
] or ["無具體執行計畫"],
)
await self._run_with_timeout(_send_telegram_raw, msg, timeout=10, keyboard=keyboard)
await self._run_with_timeout(_send_telegram_raw, msg, timeout=10, reply_markup=keyboard)
self._log.info("Human escalation Telegram sent: %s", trigger.trigger_type)
except Exception as e:
self._log.error("Telegram escalation failed (non-blocking): %s", e)
# ---- Resource Optimization ----
def _ssh_exec(self, host: str, user: str, cmd: str, timeout: int = 120) -> tuple:
def _ssh_exec(self, host: str, user: str, cmd: Any, timeout: int = 120) -> tuple:
"""Execute command via SSH jump host to target host."""
import subprocess
import shlex
# 這裡實作 SSH 跳板或直接連線邏輯,根據 ADR-013 應透過跳板機
# 簡化版:直接從容器透過 SSH 連向目標 (188)
remote_cmd = [str(part) for part in cmd] if isinstance(cmd, list) else [str(cmd)]
full_cmd = [
"ssh",
"-p", str(SSH_PORT),
"-i", SSH_KEY_PATH,
"-o", "StrictHostKeyChecking=no",
"-o", "ConnectTimeout=10",
"-J", f"{SSH_JUMP_USER}@{SSH_JUMP_HOST}",
f"{user}@{host}",
cmd
"--",
*remote_cmd,
]
try:
res = subprocess.run(
@@ -786,7 +809,16 @@ class ElephantAlphaAutonomousEngine:
@staticmethod
async def _run_with_timeout(coro, *args, timeout: int = 30, **kwargs):
try:
return await asyncio.wait_for(coro(*args, **kwargs), timeout=timeout)
if inspect.iscoroutinefunction(coro):
awaitable = coro(*args, **kwargs)
else:
async def _call_sync():
result = await asyncio.to_thread(coro, *args, **kwargs)
if inspect.isawaitable(result):
return await result
return result
awaitable = _call_sync()
return await asyncio.wait_for(awaitable, timeout=timeout)
except asyncio.TimeoutError:
raise TimeoutError(f"Operation timed out after {timeout}s")

View File

@@ -0,0 +1,50 @@
import asyncio
def test_run_with_timeout_supports_sync_function():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
result = asyncio.run(ElephantAlphaAutonomousEngine._run_with_timeout(lambda value: value + 1, 41))
assert result == 42
def test_execute_step_rejects_unknown_action():
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
engine = ElephantAlphaAutonomousEngine()
try:
asyncio.run(engine._execute_step({"agent": "mystery", "action": "do_anything"}))
except ValueError as exc:
assert "Unrecognized step" in str(exc)
else:
raise AssertionError("unknown action should fail")
def test_execute_step_routes_code_fix_to_autoheal(monkeypatch):
from services.elephant_alpha_autonomous_engine import ElephantAlphaAutonomousEngine
calls = []
engine = ElephantAlphaAutonomousEngine()
monkeypatch.setattr(
engine,
"_run_auto_heal",
lambda error_type, context: calls.append((error_type, context)) or {"ok": True},
)
asyncio.run(engine._execute_step({
"agent": "elephant_alpha",
"action": "code_fix",
"parameters": {"target_file": "services/example.py", "error_message": "Traceback"},
}))
assert calls == [("python_exception", {"target_file": "services/example.py", "error_message": "Traceback"})]
def test_autoheal_derives_python_exception_from_traceback():
from services.auto_heal_service import AutoHealService
svc = AutoHealService()
assert svc._derive_error_type({"traceback_str": "Traceback (most recent call last):\nNameError"}) == "python_exception"