fix: missing sqlalchemy text import and _ssh_exec in ElephantAlpha
All checks were successful
CD Pipeline / deploy (push) Successful in 1m20s

This commit is contained in:
OoO
2026-04-28 12:13:44 +08:00
parent bc7113bc86
commit 3dd73dce03

View File

@@ -26,6 +26,7 @@ from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Any, Optional
from sqlalchemy import text
from services.logger_manager import SystemLogger
from services.elephant_alpha_orchestrator import elephant_orchestrator, StrategicDecision
from database.manager import get_session
@@ -662,7 +663,32 @@ class ElephantAlphaAutonomousEngine:
except Exception as e:
self._log.error("Telegram escalation failed (non-blocking): %s", e)
# ---- Helpers ----
# ---- Resource Optimization ----
def _ssh_exec(self, cmd: str, timeout: int = 60) -> tuple:
"""Execute command via SSH jump host."""
import subprocess
import shlex
full_cmd = [
"ssh",
"-p", str(SSH_PORT),
"-i", SSH_KEY_PATH,
"-o", "StrictHostKeyChecking=no",
"-o", "ConnectTimeout=10",
f"{SSH_JUMP_USER}@{SSH_JUMP_HOST}",
cmd
]
try:
res = subprocess.run(
full_cmd,
capture_output=True,
text=True,
timeout=timeout
)
return res.returncode, res.stdout, res.stderr
except Exception as e:
return -1, "", str(e)
def _is_circuit_open(self) -> bool:
cb = self._circuit_breaker_state
if cb["failures"] >= self._cb_threshold: