Files
ewoooc/services/auto_heal_service.py
ogt 237d3af76f
Some checks failed
CD Pipeline / deploy (push) Failing after 2m59s
fix: Phase 2 P0 全清零 — 14 項安全與功能修復完成
P0-06: google_drive_service.py — pickle.load() 改 JSON token(消除 RCE 風險)
P0-07: bot_api_routes.py:30 — BOT_API_TOKEN 移除硬編碼預設值 clawdbot_momo_2026
P0-08: auto_import_index.html — showAlert innerHTML 改 createTextNode(XSS 修復)
P0-09: abc_analysis_detail.html + dashboard.html + daily_sales.html — Jinja2 | e 轉義
P0-10: openclaw_bot_routes.py:2634 — vendor PPT 補 return ppt_path(廠商報告恢復)
P0-11: telegram_bot_service.py:177-214 — cmd_start/cmd_help 補 try/except
P0-12: app.py:689-712 — 10 個 Blueprint 補齊 register(消滅 404 路由)
P0-13: auto_heal_service.py — 實作 _write_heal_log(),AIOps 稽核閉環補完
P0-14: monitoring/prometheus.yml — 取消 alert_rules comment;新增 alert_rules.yml

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 21:11:52 +08:00

355 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
auto_heal_service.py
ADR-013 AIOps 自動修復服務。
支援的動作:
DOCKER_RESTART — 在指定主機重啟 Docker 服務
WAIT_RETRY — 等待後重試(不做系統操作)
ALERT_ONLY — 只記錄 / 發 Telegram,不執行
SSH_CMD — 執行 PlayBook 指定的靜態白名單命令(list 型)
CODE_FIX — ADR-014: Aider 自動修復程式碼並推版
"""
import asyncio
import json
import logging
import os
import re
import sqlite3
import threading
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
from sqlalchemy import text
from services.logger_manager import SystemLogger
from database.manager import get_session
# Module-wide logger, obtained through the project's SystemLogger wrapper.
logger = SystemLogger("AutoHealService").get_logger()
# ---- Configuration ----
# Every setting below is read once, at import time, from an ELEPHANT_ALPHA_*
# environment variable; the second argument is the fallback used when the
# variable is unset. The int(...) conversions raise ValueError at import time
# if the variable holds a non-numeric string.
# Jump (bastion) host and login user that _ssh_exec passes to `ssh -J`.
SSH_JUMP_HOST = os.getenv("ELEPHANT_ALPHA_JUMP_HOST", "192.168.0.110")
SSH_JUMP_USER = os.getenv("ELEPHANT_ALPHA_JUMP_USER", "wooo")
# Private key handed to `ssh -i`; defaults to ../config/autoheal_id_ed25519
# relative to this file's directory.
SSH_KEY_PATH = os.getenv("ELEPHANT_ALPHA_SSH_KEY_PATH", os.path.join(os.path.dirname(__file__), "..", "config", "autoheal_id_ed25519"))
# Port passed to `ssh -p`.
SSH_PORT = int(os.getenv("ELEPHANT_ALPHA_SSH_PORT", "22"))
# Seconds for ssh's ConnectTimeout option.
SSH_CONNECT_TIMEOUT = int(os.getenv("ELEPHANT_ALPHA_SSH_CONNECT_TIMEOUT", "10"))
# Seconds subprocess.run waits for the whole ssh invocation before timing out.
SSH_COMMAND_TIMEOUT = int(os.getenv("ELEPHANT_ALPHA_SSH_COMMAND_TIMEOUT", "60"))
# SQLite database used for the dedup table below. The default ":memory:" is an
# in-memory database, so the recorded timestamps do not survive a restart.
CACHE_DB_PATH = os.getenv("ELEPHANT_ALPHA_CACHE_DB", ":memory:")
# NOTE(review): not referenced anywhere else in the visible part of this file;
# the cooldown guard reads the playbook's own cooldown_min instead. Confirm
# whether this was meant to be a fallback.
ESCALATION_COOLDOWN_MIN = int(os.getenv("ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN", "30"))
# ---- Constants ----
# Action types AutoHealService.handle_exception accepts; a playbook carrying
# any other action_type is rejected before dispatch.
_ALLOWED_ACTION_TYPES = frozenset({"DOCKER_RESTART", "WAIT_RETRY", "ALERT_ONLY", "SSH_CMD", "CODE_FIX"})
# ---- DB / dedup ----
# A single connection is shared by the whole module. check_same_thread=False
# disables sqlite3's same-thread check, so every access in this file is made
# while holding _dedup_lock.
_dedup_lock = threading.Lock()
_dedup_conn = sqlite3.connect(CACHE_DB_PATH, check_same_thread=False)
# One row per trigger_type; last_triggered is written by _store_escalation as
# integer epoch seconds.
_dedup_conn.execute("""
CREATE TABLE IF NOT EXISTS escalation_dedup (
trigger_type TEXT PRIMARY KEY,
last_triggered INTEGER NOT NULL
)
""")
_dedup_conn.commit()
def _store_escalation(trigger_type: str) -> None:
    """Stamp ``trigger_type`` with the current time in the dedup table.

    The row for ``trigger_type`` is inserted or replaced, so the table holds
    only the most recent timestamp (integer epoch seconds) per trigger type.
    The write and commit happen while holding ``_dedup_lock``.
    """
    upsert_sql = (
        "INSERT OR REPLACE INTO escalation_dedup (trigger_type, last_triggered) VALUES (?, ?)"
    )
    with _dedup_lock:
        now_epoch = int(datetime.now().timestamp())
        _dedup_conn.execute(upsert_sql, (trigger_type, now_epoch))
        _dedup_conn.commit()
def _load_escalation(trigger_type: str) -> Optional[int]:
    """Look up when ``trigger_type`` was last recorded.

    Returns:
        The stored ``last_triggered`` value (epoch seconds as written by
        ``_store_escalation``), or ``None`` when no row exists.
    """
    select_sql = "SELECT last_triggered FROM escalation_dedup WHERE trigger_type = ?"
    with _dedup_lock:
        cursor = _dedup_conn.execute(select_sql, (trigger_type,))
        record = cursor.fetchone()
    if not record:
        return None
    return record[0]
# ---- SSH helper ----
def _ensure_key_permissions(key_path: str) -> None:
    """Best-effort: force mode 0600 on the SSH private key at ``key_path``.

    A missing key file or a failing chmod is only logged as a warning; this
    function never raises for those conditions.
    """
    if os.path.exists(key_path):
        try:
            os.chmod(key_path, 0o600)
        except Exception as chmod_err:
            logger.warning("Failed to secure SSH key: %s", chmod_err)
    else:
        logger.warning("SSH key not found: %s", key_path)
def _ssh_exec(
    jump_host: str,
    jump_user: str,
    target_host: str,
    target_user: str,
    command: List[str],
    key_path: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Execute command on target_host via SSH jump host.

    ``command`` must be an argv-style list. ssh concatenates everything after
    the destination with spaces and hands the result to the remote user's
    shell, so passing a list to ``subprocess.run`` protects only the *local*
    side. Each element is therefore shell-quoted here and sent as one remote
    command string, which keeps arguments containing spaces or shell
    metacharacters intact on the remote side.

    Args:
        jump_host: Host given to ``ssh -J``.
        jump_user: Login user on the jump host.
        target_host: Final destination host.
        target_user: Login user on the destination host.
        command: Non-empty argv list to run remotely.
        key_path: Private key for ``ssh -i``; falls back to ``SSH_KEY_PATH``.

    Returns:
        Dict with keys ``success`` (bool), ``exit_code`` (int, -1 when ssh was
        not run or did not finish), ``stdout``, ``stderr`` and ``command``
        (the argv as passed in). Invalid arguments, timeouts and spawn errors
        are reported through this dict rather than raised.
    """
    import shlex
    import subprocess

    def _failed(reason: str) -> Dict[str, Any]:
        # Same shape as a real execution result so callers need no special case.
        return {"success": False, "exit_code": -1, "stdout": "", "stderr": reason, "command": command}

    # A bare string would be splatted character by character below, and an
    # empty argv would open an interactive session; reject both up front.
    if isinstance(command, (str, bytes)) or not command:
        return _failed("command must be a non-empty argv list")

    # An empty value yields a malformed destination, and a leading "-" would be
    # parsed by ssh as an option (e.g. -oProxyCommand=...).
    endpoint_parts = (
        ("jump_host", jump_host),
        ("jump_user", jump_user),
        ("target_host", target_host),
        ("target_user", target_user),
    )
    for label, value in endpoint_parts:
        if (
            not isinstance(value, str)
            or not value
            or value.startswith("-")
            or any(ch.isspace() for ch in value)
        ):
            return _failed(f"invalid {label}")

    remote_command = " ".join(shlex.quote(str(arg)) for arg in command)

    safe_key = key_path or SSH_KEY_PATH
    _ensure_key_permissions(safe_key)
    full_cmd = [
        "ssh",
        "-p", str(SSH_PORT),
        "-i", safe_key,
        # NOTE(review): host-key verification is disabled, which leaves the
        # connection open to MITM. Kept as-is to avoid breaking existing
        # deployments; consider "accept-new" plus a managed known_hosts file.
        "-o", "StrictHostKeyChecking=no",
        "-o", "BatchMode=yes",
        "-o", f"ConnectTimeout={SSH_CONNECT_TIMEOUT}",
        "-o", "ServerAliveInterval=15",
        "-o", "ServerAliveCountMax=3",
        "-J", f"{jump_user}@{jump_host}",
        f"{target_user}@{target_host}",
        "--",
        remote_command,
    ]
    try:
        result = subprocess.run(
            full_cmd,
            shell=False,
            capture_output=True,
            text=True,
            timeout=SSH_COMMAND_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        return _failed("SSH timeout")
    except Exception as e:
        logger.warning("SSH exec error: %s", e)
        return _failed(str(e))
    return {
        "success": result.returncode == 0,
        "exit_code": result.returncode,
        "stdout": result.stdout.strip(),
        "stderr": result.stderr.strip(),
        "command": command,
    }
# ---- PlayBook ----
def _find_playbook(error_type: str) -> Optional[Dict[str, Any]]:
    """Fetch the active playbook registered for ``error_type``.

    Args:
        error_type: Value matched against ``playbooks.error_type``.

    Returns:
        A dict with keys ``id``, ``name``, ``action_type``, ``action_params``
        (always a JSON-decoded value, ``{}`` when the column is empty),
        ``max_retries`` and ``cooldown_min``; ``None`` when no active playbook
        matches. If several rows match, whichever the database returns first
        is used (the query has no ORDER BY).

    Raises:
        Whatever ``get_session()`` / the query raise, and
        ``json.JSONDecodeError`` when ``action_params`` holds malformed JSON.
    """
    # Acquire the session before entering ``try``: if get_session() itself
    # fails there is nothing to close, and the ``finally`` below would
    # otherwise raise UnboundLocalError and hide the real error.
    session = get_session()
    try:
        row = session.execute(
            text("SELECT id, name, action_type, action_params, max_retries, cooldown_min FROM playbooks WHERE error_type = :et AND is_active = TRUE"),
            {"et": error_type},
        ).fetchone()
        if row is None:
            return None

        raw_params = row.action_params
        if isinstance(raw_params, (str, bytes, bytearray)):
            action_params = json.loads(raw_params) if raw_params else {}
        else:
            # Some drivers / JSON column types hand back an already-decoded
            # object; json.loads() on it would raise TypeError.
            action_params = raw_params or {}

        return {
            "id": row.id,
            "name": row.name,
            "action_type": row.action_type,
            "action_params": action_params,
            "max_retries": row.max_retries,
            "cooldown_min": row.cooldown_min,
        }
    finally:
        session.close()
# ---- Executor ----
@dataclass
class AutoHealResult:
    """Outcome of a single auto-heal attempt, as returned by AutoHealService."""

    # True when the selected action reported success.
    success: bool
    # Action type that was attempted (e.g. "DOCKER_RESTART"); None when no
    # playbook matched, which is how handle_exception reports that case.
    action: Optional[str]
    # Human-readable summary or captured command output.
    message: str
    # Never set inside this module; presumably filled in by the CODE_FIX
    # executor when it pushes a commit — TODO confirm.
    commit_sha: Optional[str] = None
    # Never set inside this module; presumably marks a rolled-back code fix —
    # TODO confirm.
    reverted: bool = False
class AutoHealService:
    """
    ADR-013: resource_optimization trigger → auto_heal_service.handle_exception

    Looks up the playbook registered for an error type and runs its action.
    Non-CODE_FIX actions are subject to a per-action-type cooldown and are
    recorded in the dedup table and the heal_logs table after execution.
    """

    # Leading alphanumeric, then alphanumerics / "_" / "." / "-". This rejects
    # shell metacharacters and, unlike a plain character whitelist, also
    # rejects names starting with "-" that docker would parse as options.
    _CONTAINER_NAME_RE = re.compile(r"[a-zA-Z0-9][a-zA-Z0-9_.-]*")
    # Conservative shape for an ssh host or user: no whitespace, no "@", and
    # no leading "-" (which ssh would treat as an option).
    _SSH_TOKEN_RE = re.compile(r"[A-Za-z0-9_][A-Za-z0-9_.:-]*")
    # Fixed destination used by DOCKER_RESTART.
    _DOCKER_TARGET_HOST = "192.168.0.188"
    # Login user for DOCKER_RESTART and the default for SSH_CMD.
    _DEFAULT_TARGET_USER = "ollama"
    _DEFAULT_WAIT_MIN = 5
    _MAX_WAIT_MIN = 30

    def __init__(self):
        self._log = logger

    # ---- pure validation helpers ----
    @staticmethod
    def _is_safe_container_name(name: Any) -> bool:
        """Return True when ``name`` is a string matching _CONTAINER_NAME_RE."""
        return isinstance(name, str) and bool(
            AutoHealService._CONTAINER_NAME_RE.fullmatch(name)
        )

    @staticmethod
    def _is_safe_ssh_token(value: Any) -> bool:
        """Return True when ``value`` is a string matching _SSH_TOKEN_RE."""
        return isinstance(value, str) and bool(
            AutoHealService._SSH_TOKEN_RE.fullmatch(value)
        )

    @staticmethod
    def _clamp_wait_minutes(raw: Any) -> int:
        """Coerce ``raw`` to whole minutes within [0, _MAX_WAIT_MIN].

        Values that cannot be converted to int fall back to _DEFAULT_WAIT_MIN.
        """
        try:
            minutes = int(raw)
        except (TypeError, ValueError, OverflowError):
            minutes = AutoHealService._DEFAULT_WAIT_MIN
        return max(0, min(minutes, AutoHealService._MAX_WAIT_MIN))

    # ---- entry point ----
    def handle_exception(
        self,
        error_type: str,
        context: Optional[Dict[str, Any]] = None,
    ) -> "AutoHealResult":
        """Find the playbook for ``error_type`` and run its action.

        Args:
            error_type: Key used to look up the playbook.
            context: Optional extra data; ``incident_id`` is needed for the
                heal log, and ``target_file`` / ``error_message`` are used by
                CODE_FIX.

        Returns:
            AutoHealResult describing what was done. ``success`` is False when
            no playbook matched (``action`` is None), the action type is not
            allowed, or the cooldown is still active.
        """
        context = context or {}
        self._log.info("[AutoHeal] handle_exception: error_type=%s context=%s", error_type, context)
        playbook = _find_playbook(error_type)
        if not playbook:
            msg = f"No playbook matched for error_type={error_type}"
            self._log.info("[AutoHeal] %s", msg)
            return AutoHealResult(success=False, action=None, message=msg)

        action_type = playbook["action_type"]
        if action_type not in _ALLOWED_ACTION_TYPES:
            msg = f"action_type '{action_type}' is not allowed"
            self._log.warning("[AutoHeal] %s", msg)
            return AutoHealResult(success=False, action=action_type, message=msg)

        if action_type == "CODE_FIX":
            # CODE_FIX is dispatched before the cooldown guard and does not go
            # through _alert_and_store. Pass the real error_type along so the
            # executor is not told "UnknownError" when context lacks the key.
            return self._handle_code_fix(playbook, context, error_type=error_type)

        # cooldown guard
        if self._cooldown_active(playbook):
            msg = f"Cooldown active for {action_type}"
            self._log.info("[AutoHeal] %s", msg)
            return AutoHealResult(success=False, action=action_type, message=msg)

        # generic action execution
        return self._execute_playbook_action(playbook, context)

    def _cooldown_active(self, playbook: Dict[str, Any]) -> bool:
        """Return True while the playbook's action type is inside its cooldown.

        The cooldown is tracked per action type (not per error type), matching
        what _alert_and_store records.
        """
        last = _load_escalation(playbook["action_type"])
        if last is None:
            return False
        cooldown_min = playbook.get("cooldown_min")
        if cooldown_min is None:
            # A NULL cooldown_min would make the comparison below raise
            # TypeError; fall back to the module-level default instead.
            cooldown_min = ESCALATION_COOLDOWN_MIN
        elapsed_min = (datetime.now().timestamp() - last) / 60
        return elapsed_min < cooldown_min

    def _handle_code_fix(
        self,
        playbook: Dict[str, Any],
        context: Dict[str, Any],
        error_type: Optional[str] = None,
    ) -> "AutoHealResult":
        """Delegate a CODE_FIX playbook to the Aider executor (ADR-014).

        Args:
            playbook: The matched playbook (not otherwise used here).
            context: Must contain ``target_file``; ``error_type`` and
                ``error_message`` are forwarded when present.
            error_type: Fallback used when ``context`` has no ``error_type``.

        Returns:
            Whatever ``execute_code_fix`` returns (annotated here as
            AutoHealResult), or a failed AutoHealResult when ``target_file``
            is missing or the executor cannot be imported or raises.
        """
        target_file = context.get("target_file", "")
        if "error_type" in context:
            reported_type = context["error_type"]
        else:
            reported_type = error_type or "UnknownError"
        error_message = context.get("error_message", "")
        if not target_file:
            return AutoHealResult(success=False, action="CODE_FIX", message="target_file missing")
        try:
            # Imported lazily, inside the try, so a broken or missing executor
            # module is reported as a failed heal instead of propagating.
            from services.aider_heal_executor import execute_code_fix

            return execute_code_fix(
                error_type=reported_type,
                error_message=error_message,
                target_file=target_file,
                context=context,
            )
        except Exception as e:
            self._log.exception("[AutoHeal] CODE_FIX failed: %s", e)
            return AutoHealResult(success=False, action="CODE_FIX", message=f"Exception: {e}")

    def _execute_playbook_action(
        self,
        playbook: Dict[str, Any],
        context: Dict[str, Any],
    ) -> "AutoHealResult":
        """Run the playbook's non-CODE_FIX action, then record the outcome.

        The cooldown timestamp and heal log are written whether the action
        succeeded or failed; an unhandled action type returns early without
        recording anything.
        """
        action_type = playbook["action_type"]
        params = playbook.get("action_params") or {}
        started_at = datetime.now()
        if action_type == "DOCKER_RESTART":
            result = self._docker_restart(params)
        elif action_type == "WAIT_RETRY":
            result = self._wait_retry(params)
        elif action_type == "ALERT_ONLY":
            result = self._alert_only(params)
        elif action_type == "SSH_CMD":
            result = self._ssh_cmd(params, context)
        else:
            return AutoHealResult(success=False, action=action_type, message="Unhandled action_type")
        duration_ms = (datetime.now() - started_at).total_seconds() * 1000
        self._alert_and_store(playbook, context, result, duration_ms)
        return result

    # ---- actions ----
    def _docker_restart(self, params: Dict[str, Any]) -> "AutoHealResult":
        """Restart ``params["container"]`` on the Docker host via the jump host."""
        container = params.get("container")
        if not container:
            return AutoHealResult(success=False, action="DOCKER_RESTART", message="missing container")
        if not self._is_safe_container_name(container):
            return AutoHealResult(success=False, action="DOCKER_RESTART", message=f"unsafe container: {container}")
        result = _ssh_exec(
            jump_host=SSH_JUMP_HOST,
            jump_user=SSH_JUMP_USER,
            target_host=self._DOCKER_TARGET_HOST,
            target_user=self._DEFAULT_TARGET_USER,
            command=["docker", "restart", container],
            key_path=os.getenv("ELEPHANT_ALPHA_SSH_KEY_PATH"),
        )
        if result["success"]:
            msg = f"Container {container} restarted (SSH jump)"
            self._log.info("[AutoHeal] %s", msg)
        else:
            msg = f"Container restart failed: {result.get('stderr','')[:200]}"
            self._log.error("[AutoHeal] %s", msg)
        return AutoHealResult(success=result["success"], action="DOCKER_RESTART", message=msg)

    def _wait_retry(self, params: Dict[str, Any]) -> "AutoHealResult":
        """Report the wait interval for a retry; this method does not sleep."""
        wait_min = self._clamp_wait_minutes(params.get("wait_minutes", self._DEFAULT_WAIT_MIN))
        return AutoHealResult(success=True, action="WAIT_RETRY", message=f"Waiting {wait_min} min")

    def _alert_only(self, params: Dict[str, Any]) -> "AutoHealResult":
        """Return the playbook's alert message without touching any system."""
        msg = params.get("message", "Alert sent")
        return AutoHealResult(success=True, action="ALERT_ONLY", message=msg)

    def _ssh_cmd(self, params: Dict[str, Any], context: Dict[str, Any]) -> "AutoHealResult":
        """Run the playbook's ``argv`` on ``params["host"]`` via the jump host.

        Args:
            params: Needs ``argv`` (non-empty list of strings) and ``host``;
                ``user`` defaults to _DEFAULT_TARGET_USER.
            context: Unused here; kept for a uniform action signature.

        Returns:
            AutoHealResult whose message is the remote stdout, or stderr when
            stdout is empty. Invalid parameters are rejected before any ssh
            process is spawned.
        """
        argv = params.get("argv")
        if not isinstance(argv, list) or not argv:
            return AutoHealResult(success=False, action="SSH_CMD", message="argv must be a non-empty list")
        if not all(isinstance(arg, str) for arg in argv):
            return AutoHealResult(success=False, action="SSH_CMD", message="argv items must be strings")

        host = params.get("host", "")
        user = params.get("user", self._DEFAULT_TARGET_USER)
        # Enforce a minimal local check so an empty host or an option-like
        # value ("-o...") never reaches the ssh command line.
        if not self._is_safe_ssh_token(host):
            return AutoHealResult(success=False, action="SSH_CMD", message=f"invalid host: {host!r}")
        if not self._is_safe_ssh_token(user):
            return AutoHealResult(success=False, action="SSH_CMD", message=f"invalid user: {user!r}")

        try:
            from services.elephant_alpha_orchestrator import elephant_orchestrator
            # validate host/user via orchestrator context (lightweight)
            elephant_orchestrator._validate_host_port(host)
        except Exception as exc:
            # NOTE(review): the orchestrator check stays advisory because its
            # contract is not visible from this file, but a failure is now
            # logged instead of silently discarded. Make it blocking once the
            # validator's behavior for plain hostnames is confirmed.
            self._log.warning(
                "[AutoHeal] SSH_CMD orchestrator host validation failed for host=%s: %s",
                host, exc,
            )

        result = _ssh_exec(
            jump_host=SSH_JUMP_HOST,
            jump_user=SSH_JUMP_USER,
            target_host=host,
            target_user=user,
            command=argv,
            key_path=os.getenv("ELEPHANT_ALPHA_SSH_KEY_PATH"),
        )
        out = result["stdout"] or result["stderr"]
        return AutoHealResult(success=result["success"], action="SSH_CMD", message=out)

    # ---- bookkeeping ----
    def _alert_and_store(
        self,
        playbook: Dict[str, Any],
        context: Dict[str, Any],
        result: "AutoHealResult",
        duration_ms: float = 0.0,
    ) -> None:
        """Start the cooldown for this action type and write the heal log."""
        _store_escalation(playbook["action_type"])
        self._log.info("[AutoHeal] Alert stored for: %s", playbook["action_type"])
        self._write_heal_log(playbook, context, result, duration_ms)

    def _write_heal_log(
        self,
        playbook: Dict[str, Any],
        context: Dict[str, Any],
        result: "AutoHealResult",
        duration_ms: float,
    ) -> None:
        """Write heal execution record to heal_logs table (ADR-013 Step 6/7).

        Best-effort: the heal action has already run when this is called, so
        any failure here is logged and swallowed rather than allowed to mask
        the action's result. Nothing is written when ``context`` carries no
        ``incident_id``.
        """
        incident_id = context.get("incident_id")
        if not incident_id:
            self._log.warning("[AutoHeal] _write_heal_log: incident_id missing in context, skipping DB write")
            return
        try:
            from database.autoheal_models import HealLog
            session = get_session()
        except Exception as exc:
            self._log.error("[AutoHeal] _write_heal_log DB error: %s", exc)
            return
        try:
            # Keep the outcome in a local: reading it back from the ORM object
            # after commit could trigger a refresh query.
            outcome = "success" if result.success else "failed"
            log_entry = HealLog(
                incident_id=int(incident_id),
                playbook_id=int(playbook["id"]),
                action_type=playbook["action_type"],
                action_detail=json.dumps(playbook.get("action_params", {}), ensure_ascii=False),
                result=outcome,
                result_output=result.message[:2000] if result.message else None,
                duration_ms=duration_ms,
            )
            session.add(log_entry)
            session.commit()
            self._log.info(
                "[AutoHeal] heal_log written: incident_id=%s playbook=%s result=%s duration_ms=%.1f",
                incident_id, playbook["id"], outcome, duration_ms,
            )
        except Exception as exc:
            self._log.error("[AutoHeal] _write_heal_log DB error: %s", exc)
            try:
                session.rollback()
            except Exception as rollback_exc:
                self._log.error("[AutoHeal] _write_heal_log rollback failed: %s", rollback_exc)
        finally:
            session.close()