Files
ewoooc/services/auto_heal_service.py
OoO 4f62480bdb
All checks were successful
CD Pipeline / deploy (push) Successful in 56s
串接 AutoHeal 共用 SSH helper
2026-05-12 23:53:32 +08:00

588 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
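auto_heal_service.py
ADR-013 AIOps auto-heal service.
Supported actions:
DOCKER_RESTART — restart a Docker service (container) on the designated host
WAIT_RETRY — wait, then retry (performs no system operation)
ALERT_ONLY — only log / send a Telegram alert; nothing is executed
SSH_CMD — run a static, allowlisted command (list form) specified by the PlayBook
CODE_FIX — ADR-014: Aider automatically fixes the code and pushes a release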
"""
import asyncio
import json
import logging
import os
import re
import sqlite3
import threading
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
from sqlalchemy import text
from services.logger_manager import SystemLogger
from services.ai_automation_metrics import record_autoheal_action
from database.manager import get_session
from utils.ssh_helper import run_ssh_command
# Module-wide logger obtained from the project's SystemLogger wrapper.
logger = SystemLogger("AutoHealService").get_logger()
# ---- Configuration ----
# Every value below is read from the environment once, at import time.
# Host and account passed to _ssh_exec as jump_host / jump_user.
SSH_JUMP_HOST = os.getenv("ELEPHANT_ALPHA_JUMP_HOST", "192.168.0.110")
SSH_JUMP_USER = os.getenv("ELEPHANT_ALPHA_JUMP_USER", "wooo")
# Private key used by _ssh_exec when the caller does not supply one; the
# default resolves to ../config/autoheal_id_ed25519 relative to this file.
SSH_KEY_PATH = os.getenv("ELEPHANT_ALPHA_SSH_KEY_PATH", os.path.join(os.path.dirname(__file__), "..", "config", "autoheal_id_ed25519"))
# SSH port and timeouts forwarded to run_ssh_command
# (presumably seconds — confirm against utils.ssh_helper).
SSH_PORT = int(os.getenv("ELEPHANT_ALPHA_SSH_PORT", "22"))
SSH_CONNECT_TIMEOUT = int(os.getenv("ELEPHANT_ALPHA_SSH_CONNECT_TIMEOUT", "10"))
SSH_COMMAND_TIMEOUT = int(os.getenv("ELEPHANT_ALPHA_SSH_COMMAND_TIMEOUT", "60"))
# SQLite database holding the escalation dedup table. The default ":memory:"
# keeps that state in process memory only, so it is lost on restart.
CACHE_DB_PATH = os.getenv("ELEPHANT_ALPHA_CACHE_DB", ":memory:")
# Default escalation cooldown, in minutes.
ESCALATION_COOLDOWN_MIN = int(os.getenv("ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN", "30"))
# ---- Constants ----
# Playbook action types handle_exception is willing to execute; any other
# value coming from the playbooks table is rejected.
_ALLOWED_ACTION_TYPES = frozenset({"DOCKER_RESTART", "WAIT_RETRY", "ALERT_ONLY", "SSH_CMD", "CODE_FIX"})
# Containers that must never be restarted or targeted by SSH docker
# commands (the rejection message cites ADR-011).
_PROTECTED_CONTAINERS = frozenset({"momo-db", "momo-postgres"})
# Hosts an SSH_CMD playbook may target: comma-separated env value, with
# surrounding whitespace stripped and empty entries dropped.
_ALLOWED_SSH_HOSTS = frozenset(
h.strip() for h in os.getenv("ELEPHANT_ALPHA_ALLOWED_SSH_HOSTS", "192.168.0.188").split(",") if h.strip()
)
# SSH_CMD allowlist: `docker <subcommand>` for the subcommands below, or
# one of the stand-alone commands in the second set.
_ALLOWED_SSH_DOCKER_SUBCOMMANDS = frozenset({"ps", "logs", "inspect"})
_ALLOWED_SSH_READONLY_COMMANDS = frozenset({"df", "free", "uptime"})
# Built-in playbooks used when the database lookup returns nothing for
# these error types. "id": 0 marks them as not backed by a playbooks row.
_OFFLINE_PLAYBOOKS = {
"DNS_FAIL": {
"id": 0,
"name": "Offline DNS/DB guard",
"action_type": "ALERT_ONLY",
"action_params": {"message": "DB/DNS 異常,依 ADR-011 不重啟 momo-db轉人工檢查。"},
"max_retries": 1,
"cooldown_min": 30,
},
"DB_UNREACHABLE": {
"id": 0,
"name": "Offline DB guard",
"action_type": "ALERT_ONLY",
"action_params": {"message": "DB 無法連線,依 ADR-011 不重啟 momo-db轉人工檢查。"},
"max_retries": 1,
"cooldown_min": 30,
},
}
# ---- DB / dedup ----
# One SQLite connection is opened at import time and shared by the whole
# module. check_same_thread=False lets other threads use it; _dedup_lock is
# held around every statement issued by _store_escalation/_load_escalation.
_dedup_lock = threading.Lock()
_dedup_conn = sqlite3.connect(CACHE_DB_PATH, check_same_thread=False)
# One row per cooldown key; last_triggered holds an integer Unix timestamp.
_dedup_conn.execute("""
CREATE TABLE IF NOT EXISTS escalation_dedup (
trigger_type TEXT PRIMARY KEY,
last_triggered INTEGER NOT NULL
)
""")
_dedup_conn.commit()
def _store_escalation(trigger_type: str) -> None:
    """Record "now" as the most recent trigger time for ``trigger_type``.

    The row is upserted (``INSERT OR REPLACE``) with the current Unix
    timestamp truncated to whole seconds, and committed immediately. The
    module-level lock is held for the whole write.
    """
    upsert_sql = "INSERT OR REPLACE INTO escalation_dedup (trigger_type, last_triggered) VALUES (?, ?)"
    with _dedup_lock:
        now_epoch = int(datetime.now().timestamp())
        _dedup_conn.execute(upsert_sql, (trigger_type, now_epoch))
        _dedup_conn.commit()
def _load_escalation(trigger_type: str) -> Optional[int]:
    """Return the stored trigger timestamp for ``trigger_type``.

    Returns:
        The integer Unix timestamp last written by ``_store_escalation``,
        or ``None`` when no row exists for this key.
    """
    select_sql = "SELECT last_triggered FROM escalation_dedup WHERE trigger_type = ?"
    with _dedup_lock:
        cursor = _dedup_conn.execute(select_sql, (trigger_type,))
        record = cursor.fetchone()
    if not record:
        return None
    return record[0]
def _ssh_exec(
    jump_host: str,
    jump_user: str,
    target_host: str,
    target_user: str,
    command: List[str],
    key_path: Optional[str] = None,
) -> Dict[str, Any]:
    """Run ``command`` on ``target_host`` through an SSH jump host.

    Args:
        jump_host: Host passed to the SSH helper as ``jump_host``.
        jump_user: Account used on the jump host.
        target_host: Machine the command is executed on.
        target_user: Account used on the target machine.
        command: Command as an argv list (never a shell string), to avoid
            shell injection.
        key_path: Private key to use; the module-level ``SSH_KEY_PATH`` is
            used when this is ``None`` or empty.

    Returns:
        A dict with ``success``, ``exit_code``, ``stdout``, ``stderr`` and
        ``command``. A stderr starting with ``"SSH timeout after "`` is
        collapsed to the fixed text ``"SSH timeout"``.
    """
    effective_key = key_path if key_path else SSH_KEY_PATH
    ssh_result = run_ssh_command(
        host=target_host,
        user=target_user,
        command=command,
        port=SSH_PORT,
        key_path=effective_key,
        connect_timeout=SSH_CONNECT_TIMEOUT,
        command_timeout=SSH_COMMAND_TIMEOUT,
        jump_host=jump_host,
        jump_user=jump_user,
        batch_mode=True,
        server_alive_interval=15,
        server_alive_count_max=3,
        logger=logger,
    )
    outcome: Dict[str, Any] = {
        "success": ssh_result.success,
        "exit_code": ssh_result.returncode,
        "stdout": ssh_result.stdout,
        "stderr": ssh_result.stderr,
        "command": command,
    }
    # Drop the variable duration suffix so callers see one stable message.
    if outcome["stderr"].startswith("SSH timeout after "):
        outcome["stderr"] = "SSH timeout"
    return outcome
# ---- PlayBook ----
def _find_playbook(error_type: str) -> Optional[Dict[str, Any]]:
    """Look up the active playbook registered for ``error_type``.

    Args:
        error_type: Value matched against ``playbooks.error_type``.

    Returns:
        A dict with ``id``, ``name``, ``action_type``, ``action_params``,
        ``max_retries`` and ``cooldown_min``, or ``None`` when no active
        row matches or the lookup fails. Failures are logged as warnings
        and never raised, so callers can fall back to offline playbooks.
    """
    session = None
    try:
        session = get_session()
        row = session.execute(
            text("SELECT id, name, action_type, action_params, max_retries, cooldown_min FROM playbooks WHERE error_type = :et AND is_active = TRUE"),
            {"et": error_type},
        ).fetchone()
        if not row:
            return None

        raw_params = row.action_params
        if isinstance(raw_params, (str, bytes, bytearray)):
            # Text column: the JSON document still has to be decoded.
            params = json.loads(raw_params) if raw_params else {}
        else:
            # A JSON/JSONB column may already arrive decoded; calling
            # json.loads on a dict would raise TypeError and hide the playbook.
            params = raw_params
        if not isinstance(params, dict):
            # Executors call params.get(...); anything else would crash them.
            if params is not None:
                logger.warning(
                    "[AutoHeal] playbook %s has non-object action_params (%s); ignoring",
                    row.id,
                    type(params).__name__,
                )
            params = {}

        return {
            "id": row.id,
            "name": row.name,
            "action_type": row.action_type,
            "action_params": params,
            "max_retries": row.max_retries,
            "cooldown_min": row.cooldown_min,
        }
    except Exception as exc:
        logger.warning("[AutoHeal] playbook lookup failed for %s: %s", error_type, exc)
        return None
    finally:
        if session is not None:
            session.close()
# ---- Executor ----
@dataclass
class AutoHealResult:
    """Outcome of one auto-heal attempt, as returned by AutoHealService."""

    # True when the action completed successfully.
    success: bool
    # Action that produced this result; None when no playbook matched.
    action: Optional[str]
    # Human-readable outcome: command output, error text or alert message.
    message: str
    # Not set anywhere in this module; presumably filled in by the CODE_FIX
    # executor — TODO confirm against services.aider_heal_executor.
    commit_sha: Optional[str] = None
    reverted: bool = False
class AutoHealService:
    """ADR-013 auto-heal dispatcher.

    Flow: resource_optimization trigger → ``auto_heal_service.handle_exception``.
    An incident row is created, a playbook is selected (database first,
    built-in offline playbooks second), the playbook's action is executed
    and the outcome is written to the heal log, the AI insight table and
    the incident row.
    """

    # A name handed to ``docker restart`` must start with an alphanumeric
    # character, so a value such as "--help" can never be parsed as an option.
    _CONTAINER_NAME_RE = re.compile(r"[a-zA-Z0-9][a-zA-Z0-9._-]*")
    # Characters a remote shell would interpret if the SSH transport joins
    # argv into one command line; no allowlisted command needs them.
    _SHELL_METACHARS = frozenset(";|&`$<>\n\r")
    _DEFAULT_WAIT_MINUTES = 5
    _MAX_WAIT_MINUTES = 30

    def __init__(self):
        self._log = logger

    def handle_exception(
        self,
        error_type: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None,
        **legacy_kwargs: Any,
    ) -> "AutoHealResult":
        """Run the auto-heal flow for one failure.

        Args:
            error_type: Playbook lookup key. When omitted it is taken from
                ``context["error_type"]`` or derived from the error text.
            context: Free-form details (``exception``, ``error_message``,
                ``traceback_str``, ``task_name``, ``target_file`` ...).
                The dict is copied, never mutated.
            **legacy_kwargs: Old-style keyword arguments; non-``None``
                values are merged into the context.

        Returns:
            The result of the executed action, or a failed result when no
            playbook matched, the action type is not allowed, or the
            playbook is still in cooldown.
        """
        context = dict(context or {})
        if legacy_kwargs:
            context.update({k: v for k, v in legacy_kwargs.items() if v is not None})
        error_type = error_type or context.get("error_type") or self._derive_error_type(context)
        # setdefault() would keep an explicit ``error_type: None`` entry.
        if not context.get("error_type"):
            context["error_type"] = error_type
        self._log.info("[AutoHeal] handle_exception: error_type=%s context=%s", error_type, context)

        incident_id = self._ensure_incident(error_type, context)
        if incident_id is not None:
            context["incident_id"] = incident_id

        playbook = _find_playbook(error_type) or self._offline_playbook(error_type)
        if not playbook:
            msg = f"No playbook matched for error_type={error_type}"
            self._log.info("[AutoHeal] %s", msg)
            return self._finish_skipped(
                context,
                error_type,
                "NO_PLAYBOOK",
                AutoHealResult(success=False, action=None, message=msg),
            )

        playbook_id = int(playbook.get("id") or 0)
        if playbook_id > 0:
            context["matched_playbook_id"] = playbook_id
            context["playbook_id"] = playbook_id

        action_type = playbook["action_type"]
        if action_type not in _ALLOWED_ACTION_TYPES:
            msg = f"action_type '{action_type}' is not allowed"
            self._log.warning("[AutoHeal] %s", msg)
            return self._finish_skipped(
                context,
                error_type,
                action_type,
                AutoHealResult(success=False, action=action_type, message=msg),
            )

        # Cooldown guard: skip when the same playbook ran too recently.
        cooldown_min = playbook.get("cooldown_min")
        if cooldown_min is None:
            # A NULL column would make the comparison below raise TypeError.
            cooldown_min = ESCALATION_COOLDOWN_MIN
        last = _load_escalation(self._cooldown_key(playbook))
        if last and (datetime.now().timestamp() - last) / 60 < cooldown_min:
            msg = f"Cooldown active for {action_type}"
            self._log.info("[AutoHeal] %s", msg)
            return self._finish_skipped(
                context,
                error_type,
                action_type,
                AutoHealResult(success=False, action=action_type, message=msg),
            )

        if action_type == "CODE_FIX":
            started_at = datetime.now()
            result = self._handle_code_fix(playbook, context)
            duration_ms = (datetime.now() - started_at).total_seconds() * 1000
            self._alert_and_store(playbook, context, result, duration_ms)
            record_autoheal_action(
                action=result.action or action_type,
                error_type=error_type,
                success=result.success,
                duration_ms=duration_ms,
            )
            return result

        # Generic action execution.
        return self._execute_playbook_action(playbook, context)

    def _finish_skipped(
        self,
        context: Dict[str, Any],
        error_type: str,
        metric_action: str,
        result: "AutoHealResult",
    ) -> "AutoHealResult":
        """Close out a request that was rejected before any action ran.

        Records the metric and moves the incident out of ``healing``, so
        that skipped requests do not leave incident rows stuck forever.
        """
        record_autoheal_action(action=metric_action, error_type=error_type, success=False)
        self._update_incident_status(context, result)
        return result

    def _derive_error_type(self, context: Dict[str, Any]) -> str:
        """Classify a failure from its error text and traceback.

        Matching is case-insensitive substring search; the first rule that
        matches wins, and ``"scheduler_task_failure"`` is the fallback.
        """
        error_text = str(context.get("exception") or context.get("error_message") or "").lower()
        traceback_text = str(context.get("traceback_str") or "").lower()
        combined = f"{error_text}\n{traceback_text}"

        if "could not translate host name" in combined or "temporary failure in name resolution" in combined:
            return "DNS_FAIL"
        db_markers = (
            "db_connection_error",
            "connection refused",
            "could not connect to server",
            "operationalerror",
        )
        if any(marker in combined for marker in db_markers):
            return "DB_UNREACHABLE"
        if "timeout" in combined:
            return "crawler_timeout"
        if "nvidia" in combined and "quota" in combined:
            return "nim_quota_exhausted"
        if "embedding" in combined:
            return "embedding_failure"
        code_markers = (
            "traceback (most recent call last)",
            "nameerror",
            "attributeerror",
            "typeerror",
            "importerror",
            "modulenotfounderror",
        )
        if context.get("target_file") or any(marker in combined for marker in code_markers):
            return "python_exception"
        return "scheduler_task_failure"

    def _offline_playbook(self, error_type: str) -> Optional[Dict[str, Any]]:
        """Fallback when the primary DB is unavailable during DB/DNS incidents."""
        playbook = _OFFLINE_PLAYBOOKS.get(error_type)
        # Return a copy so callers cannot mutate the module-level template.
        return dict(playbook) if playbook else None

    def _cooldown_key(self, playbook: Dict[str, Any]) -> str:
        """Build the dedup-table key used for this playbook's cooldown."""
        playbook_id = playbook.get("id")
        if playbook_id:
            return f"playbook:{playbook_id}:{playbook['action_type']}"
        # Offline playbooks have id 0, so they are keyed by name instead.
        return f"offline:{playbook.get('name', playbook['action_type'])}:{playbook['action_type']}"

    def _ensure_incident(self, error_type: str, context: Dict[str, Any]) -> Optional[int]:
        """Insert an incident row with status ``healing`` and return its id.

        Returns ``None`` (after logging) when the row cannot be created;
        healing continues without an incident in that case.
        """
        session = None
        try:
            from database.autoheal_models import Incident

            session = get_session()
            traceback_text = str(context.get("traceback_str") or "")[:8000] or None
            incident = Incident(
                task_name=str(context.get("task_name") or context.get("source") or "unknown_task"),
                error_type=error_type,
                error_message=str(context.get("exception") or context.get("error_message") or error_type)[:2000],
                traceback_str=traceback_text,
                error_traceback=traceback_text,
                severity=str(context.get("severity") or "medium"),
                status="healing",
                retry_count=0,
            )
            session.add(incident)
            session.flush()
            incident_id = incident.id
            session.commit()
            self._log.info("[AutoHeal] incident created: id=%s error_type=%s", incident_id, error_type)
            return incident_id
        except Exception as exc:
            if session is not None:
                session.rollback()
            self._log.error("[AutoHeal] incident create failed: %s", exc)
            return None
        finally:
            if session is not None:
                session.close()

    def _handle_code_fix(self, playbook: Dict[str, Any], context: Dict[str, Any]) -> "AutoHealResult":
        """Delegate a CODE_FIX playbook to the Aider executor (ADR-014).

        Returns a failed result when ``target_file`` is missing or when the
        executor cannot be imported or raises.
        """
        target_file = context.get("target_file", "")
        if not target_file:
            return AutoHealResult(success=False, action="CODE_FIX", message="target_file missing")
        error_type = context.get("error_type") or "UnknownError"
        # Callers may report the error under either key; the rest of this
        # class reads both, so the executor should see the same text.
        error_message = str(context.get("error_message") or context.get("exception") or "")
        try:
            # Imported lazily and inside the try block so that a missing
            # executor yields a failed result instead of aborting healing.
            from services.aider_heal_executor import execute_code_fix

            return execute_code_fix(
                error_type=error_type,
                error_message=error_message,
                target_file=target_file,
                context=context,
            )
        except Exception as e:
            self._log.exception("[AutoHeal] CODE_FIX failed: %s", e)
            return AutoHealResult(success=False, action="CODE_FIX", message=f"Exception: {e}")

    def _execute_playbook_action(
        self,
        playbook: Dict[str, Any],
        context: Dict[str, Any],
    ) -> "AutoHealResult":
        """Run a non-CODE_FIX action, then persist and record its outcome."""
        action_type = playbook["action_type"]
        # action_params may be None (JSON null) or absent.
        params = playbook.get("action_params") or {}
        started_at = datetime.now()
        if not isinstance(params, dict):
            # Every executor calls params.get(...); fail cleanly instead.
            result = AutoHealResult(
                success=False,
                action=action_type,
                message="action_params must be a JSON object",
            )
        elif action_type == "DOCKER_RESTART":
            result = self._docker_restart(params)
        elif action_type == "WAIT_RETRY":
            result = self._wait_retry(params)
        elif action_type == "ALERT_ONLY":
            result = self._alert_only(params)
        elif action_type == "SSH_CMD":
            result = self._ssh_cmd(params, context)
        else:
            return AutoHealResult(success=False, action=action_type, message="Unhandled action_type")
        duration_ms = (datetime.now() - started_at).total_seconds() * 1000
        self._alert_and_store(playbook, context, result, duration_ms)
        record_autoheal_action(
            action=result.action or action_type,
            error_type=str(context.get("error_type") or "unknown"),
            success=result.success,
            duration_ms=duration_ms,
        )
        return result

    @classmethod
    def _is_safe_container_name(cls, name: Any) -> bool:
        """Return True when ``name`` is a string safe to pass to docker."""
        return isinstance(name, str) and cls._CONTAINER_NAME_RE.fullmatch(name) is not None

    def _docker_restart(self, params: Dict[str, Any]) -> "AutoHealResult":
        """Restart ``params["container"]`` on the Docker host over SSH.

        Protected containers (ADR-011) and names that are not plain
        container identifiers are rejected before anything is executed.
        """
        container = params.get("container")
        if not container:
            return AutoHealResult(success=False, action="DOCKER_RESTART", message="missing container")
        if not self._is_safe_container_name(container):
            return AutoHealResult(success=False, action="DOCKER_RESTART", message=f"unsafe container: {container}")
        if container in _PROTECTED_CONTAINERS:
            return AutoHealResult(
                success=False,
                action="DOCKER_RESTART",
                message=f"protected container blocked by ADR-011: {container}",
            )
        result = _ssh_exec(
            jump_host=SSH_JUMP_HOST,
            jump_user=SSH_JUMP_USER,
            target_host="192.168.0.188",
            target_user="ollama",
            command=["docker", "restart", container],
            key_path=os.getenv("ELEPHANT_ALPHA_SSH_KEY_PATH"),
        )
        if result["success"]:
            msg = f"Container {container} restarted (SSH jump)"
            self._log.info("[AutoHeal] %s", msg)
        else:
            msg = f"Container restart failed: {result.get('stderr','')[:200]}"
            self._log.error("[AutoHeal] %s", msg)
        return AutoHealResult(success=result["success"], action="DOCKER_RESTART", message=msg)

    def _wait_retry(self, params: Dict[str, Any]) -> "AutoHealResult":
        """Report the wait time for a WAIT_RETRY playbook (does not sleep).

        ``wait_minutes`` is clamped to 0..30; a missing or non-numeric
        value falls back to 5.
        """
        try:
            requested = int(params.get("wait_minutes", self._DEFAULT_WAIT_MINUTES))
        except (TypeError, ValueError):
            requested = self._DEFAULT_WAIT_MINUTES
        wait_min = max(0, min(requested, self._MAX_WAIT_MINUTES))
        return AutoHealResult(success=True, action="WAIT_RETRY", message=f"Waiting {wait_min} min")

    def _alert_only(self, params: Dict[str, Any]) -> "AutoHealResult":
        """Return the playbook's alert message; no system action is taken."""
        # str() keeps later slicing of the message safe for non-string values.
        msg = str(params.get("message") or "Alert sent")
        return AutoHealResult(success=True, action="ALERT_ONLY", message=msg)

    def _ssh_cmd(self, params: Dict[str, Any], context: Dict[str, Any]) -> "AutoHealResult":
        """Run an allowlisted read-only command on an allowed host.

        ``params`` must provide ``argv`` (non-empty list) and ``host``;
        ``user`` defaults to, and must be, ``ollama``. ``context`` is
        accepted for signature parity and is not used.
        """
        argv = params.get("argv")
        if not isinstance(argv, list) or not argv:
            return AutoHealResult(success=False, action="SSH_CMD", message="argv must be a non-empty list")
        if not self._is_allowed_ssh_argv(argv):
            return AutoHealResult(success=False, action="SSH_CMD", message=f"argv is not in read-only allowlist: {argv}")
        host = params.get("host", "")
        user = params.get("user", "ollama")
        # The isinstance check also protects the set lookup from unhashable values.
        if not isinstance(host, str) or host not in _ALLOWED_SSH_HOSTS:
            return AutoHealResult(success=False, action="SSH_CMD", message=f"host is not allowed: {host}")
        if user != "ollama":
            return AutoHealResult(success=False, action="SSH_CMD", message=f"user is not allowed: {user}")
        result = _ssh_exec(
            jump_host=SSH_JUMP_HOST,
            jump_user=SSH_JUMP_USER,
            target_host=host,
            target_user=user,
            # Execute exactly the string form that the allowlist validated.
            command=[str(part) for part in argv],
            key_path=os.getenv("ELEPHANT_ALPHA_SSH_KEY_PATH"),
        )
        out = result["stdout"] or result["stderr"]
        return AutoHealResult(success=result["success"], action="SSH_CMD", message=out)

    def _is_allowed_ssh_argv(self, argv: List[Any]) -> bool:
        """Return True when ``argv`` is an allowlisted read-only command.

        Allowed: ``docker ps|logs|inspect`` not naming a protected
        container, and the stand-alone read-only commands. Any argument
        containing a shell metacharacter is rejected, because checking
        ``argv[0]`` alone would let ``["df", ";", "rm", ...]`` through.
        """
        normalized = [str(part) for part in argv]
        if not normalized:
            return False
        if any(ch in self._SHELL_METACHARS for part in normalized for ch in part):
            return False
        if normalized[0] == "docker":
            if len(normalized) < 2 or normalized[1] not in _ALLOWED_SSH_DOCKER_SUBCOMMANDS:
                return False
            return not any(part in _PROTECTED_CONTAINERS for part in normalized[2:])
        return normalized[0] in _ALLOWED_SSH_READONLY_COMMANDS

    def _alert_and_store(
        self,
        playbook: Dict[str, Any],
        context: Dict[str, Any],
        result: "AutoHealResult",
        duration_ms: float = 0.0,
    ) -> None:
        """Start the playbook's cooldown and persist the outcome.

        Writes the heal log and AI insight, then updates the incident row.
        """
        _store_escalation(self._cooldown_key(playbook))
        self._log.info("[AutoHeal] Alert stored for: %s", playbook["action_type"])
        self._write_heal_log(playbook, context, result, duration_ms)
        self._write_ai_insight(playbook, context, result, duration_ms)
        self._update_incident_status(context, result)

    def _write_heal_log(
        self,
        playbook: Dict[str, Any],
        context: Dict[str, Any],
        result: "AutoHealResult",
        duration_ms: float,
    ) -> None:
        """Write heal execution record to heal_logs table (ADR-013 Step 6/7).

        Best effort: skipped when the context has no ``incident_id``;
        import and database errors are logged, never raised.
        """
        incident_id = context.get("incident_id")
        if not incident_id:
            self._log.warning("[AutoHeal] _write_heal_log: incident_id missing in context, skipping DB write")
            return
        session = None
        try:
            # Imported inside the try block so an import failure cannot
            # abort the heal flow.
            from database.autoheal_models import HealLog

            session = get_session()
            raw_id = int(playbook.get("id") or 0)
            # Offline playbooks use id 0 and have no playbooks row to reference.
            playbook_id = raw_id if raw_id > 0 else None
            log_entry = HealLog(
                incident_id=int(incident_id),
                playbook_id=playbook_id,
                action_type=playbook["action_type"],
                action_detail=json.dumps(playbook.get("action_params", {}), ensure_ascii=False),
                result="success" if result.success else "failed",
                result_output=str(result.message)[:2000] if result.message else None,
                duration_ms=duration_ms,
            )
            session.add(log_entry)
            session.commit()
            self._log.info(
                "[AutoHeal] heal_log written: incident_id=%s playbook=%s result=%s duration_ms=%.1f",
                incident_id, playbook.get("id"), log_entry.result, duration_ms,
            )
        except Exception as exc:
            self._log.error("[AutoHeal] _write_heal_log DB error: %s", exc)
            if session is not None:
                session.rollback()
        finally:
            if session is not None:
                session.close()

    def _write_ai_insight(
        self,
        playbook: Dict[str, Any],
        context: Dict[str, Any],
        result: "AutoHealResult",
        duration_ms: float,
    ) -> None:
        """Store the outcome as an ``auto_heal_playbook`` AI insight.

        After the commit, the insight is queued for embedding. Both steps
        are best effort: failures are logged, never raised.
        """
        session = None
        try:
            # Imported inside the try block so an import failure cannot
            # abort the heal flow.
            from database.ai_models import AIInsight

            session = get_session()
            payload = {
                "incident_id": context.get("incident_id"),
                "task_name": context.get("task_name"),
                "error_type": context.get("error_type"),
                "playbook_id": playbook.get("id"),
                "action_type": playbook.get("action_type"),
                "success": result.success,
                "duration_ms": duration_ms,
            }
            insight = AIInsight(
                insight_type="auto_heal_playbook",
                content=(
                    f"task={context.get('task_name', 'unknown')} "
                    f"error_type={context.get('error_type', 'unknown')} "
                    f"action={playbook.get('action_type')} "
                    f"result={'success' if result.success else 'failed'} "
                    f"message={str(result.message)[:500]}"
                ),
                metadata_json=json.dumps(payload, ensure_ascii=False),
                confidence=1.0 if result.success else 0.4,
                created_by="auto_heal_service",
                ai_model="rule_based",
                status="approved",
            )
            session.add(insight)
            session.commit()
            try:
                from services.openclaw_learning_service import enqueue_insight_embedding

                enqueue_insight_embedding(insight.id, "auto_heal_playbook", insight.content)
            except Exception as embed_err:
                self._log.warning("[AutoHeal] embedding queue enqueue failed: %s", embed_err)
        except Exception as exc:
            self._log.error("[AutoHeal] ai_insight write failed: %s", exc)
            if session is not None:
                session.rollback()
        finally:
            if session is not None:
                session.close()

    def _update_incident_status(self, context: Dict[str, Any], result: "AutoHealResult") -> None:
        """Mark the incident ``closed`` on success, ``escalated`` otherwise.

        Also records the matched playbook id when the context has one and
        stamps ``resolved_at`` the first time the incident is closed. Does
        nothing without ``incident_id``; database errors are logged only.
        """
        incident_id = context.get("incident_id")
        if not incident_id:
            return
        session = None
        try:
            session = get_session()
            status = "closed" if result.success else "escalated"
            playbook_id = int(
                context.get("matched_playbook_id")
                or context.get("playbook_id")
                or 0
            ) or None
            session.execute(
                text("""
                    UPDATE incidents
                    SET status = :status,
                    matched_playbook_id = COALESCE(:playbook_id, matched_playbook_id),
                    playbook_id = COALESCE(:playbook_id, playbook_id),
                    resolved_at = CASE
                    WHEN :status = 'closed' THEN COALESCE(resolved_at, NOW())
                    ELSE resolved_at
                    END,
                    updated_at = NOW()
                    WHERE id = :incident_id
                """),
                {
                    "status": status,
                    "playbook_id": playbook_id,
                    "incident_id": int(incident_id),
                },
            )
            session.commit()
        except Exception as exc:
            if session is not None:
                session.rollback()
            self._log.error("[AutoHeal] incident status update failed: %s", exc)
        finally:
            if session is not None:
                session.close()
# Module-level instance, created once at import time.
auto_heal_service = AutoHealService()