588 lines
23 KiB
Python
588 lines
23 KiB
Python
"""
|
||
auto_heal_service.py
|
||
|
||
ADR-013 AIOps 自動修復服務。
|
||
|
||
支援的動作:
|
||
DOCKER_RESTART — 在指定主機重啟 Docker 服務
|
||
WAIT_RETRY — 等待後重試(不做系統操作)
|
||
ALERT_ONLY — 只記錄 / 發 Telegram,不執行
|
||
SSH_CMD — 執行 PlayBook 指定的靜態白名單命令(list 型)
|
||
CODE_FIX — ADR-014: Aider 自動修覆程式碼並推版
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import sqlite3
|
||
import threading
|
||
from dataclasses import dataclass
|
||
from datetime import datetime, timedelta
|
||
from typing import Dict, Any, List, Optional
|
||
|
||
from sqlalchemy import text
|
||
|
||
from services.logger_manager import SystemLogger
|
||
from services.ai_automation_metrics import record_autoheal_action
|
||
from database.manager import get_session
|
||
from utils.ssh_helper import run_ssh_command
|
||
|
||
# Module-level logger, obtained from the project's SystemLogger under the name "AutoHealService".
logger = SystemLogger("AutoHealService").get_logger()
|
||
|
||
# ---- Configuration ----
|
||
# SSH jump-host connection settings; every value can be overridden through the
# corresponding ELEPHANT_ALPHA_* environment variable.
SSH_JUMP_HOST = os.environ.get("ELEPHANT_ALPHA_JUMP_HOST", "192.168.0.110")
SSH_JUMP_USER = os.environ.get("ELEPHANT_ALPHA_JUMP_USER", "wooo")
# Default private key lives in ../config relative to this module.
SSH_KEY_PATH = os.environ.get(
    "ELEPHANT_ALPHA_SSH_KEY_PATH",
    os.path.join(os.path.dirname(__file__), "..", "config", "autoheal_id_ed25519"),
)
SSH_PORT = int(os.environ.get("ELEPHANT_ALPHA_SSH_PORT", "22"))
SSH_CONNECT_TIMEOUT = int(os.environ.get("ELEPHANT_ALPHA_SSH_CONNECT_TIMEOUT", "10"))
SSH_COMMAND_TIMEOUT = int(os.environ.get("ELEPHANT_ALPHA_SSH_COMMAND_TIMEOUT", "60"))

# SQLite database used for cooldown bookkeeping; ":memory:" keeps it process-local.
CACHE_DB_PATH = os.environ.get("ELEPHANT_ALPHA_CACHE_DB", ":memory:")
# Escalation cooldown in minutes (integer parsed from the environment).
ESCALATION_COOLDOWN_MIN = int(os.environ.get("ELEPHANT_ALPHA_ESCALATION_COOLDOWN_MIN", "30"))
|
||
|
||
# ---- Constants ----
|
||
_ALLOWED_ACTION_TYPES = frozenset({"DOCKER_RESTART", "WAIT_RETRY", "ALERT_ONLY", "SSH_CMD", "CODE_FIX"})
|
||
_PROTECTED_CONTAINERS = frozenset({"momo-db", "momo-postgres"})
|
||
_ALLOWED_SSH_HOSTS = frozenset(
|
||
h.strip() for h in os.getenv("ELEPHANT_ALPHA_ALLOWED_SSH_HOSTS", "192.168.0.188").split(",") if h.strip()
|
||
)
|
||
_ALLOWED_SSH_DOCKER_SUBCOMMANDS = frozenset({"ps", "logs", "inspect"})
|
||
_ALLOWED_SSH_READONLY_COMMANDS = frozenset({"df", "free", "uptime"})
|
||
_OFFLINE_PLAYBOOKS = {
|
||
"DNS_FAIL": {
|
||
"id": 0,
|
||
"name": "Offline DNS/DB guard",
|
||
"action_type": "ALERT_ONLY",
|
||
"action_params": {"message": "DB/DNS 異常,依 ADR-011 不重啟 momo-db,轉人工檢查。"},
|
||
"max_retries": 1,
|
||
"cooldown_min": 30,
|
||
},
|
||
"DB_UNREACHABLE": {
|
||
"id": 0,
|
||
"name": "Offline DB guard",
|
||
"action_type": "ALERT_ONLY",
|
||
"action_params": {"message": "DB 無法連線,依 ADR-011 不重啟 momo-db,轉人工檢查。"},
|
||
"max_retries": 1,
|
||
"cooldown_min": 30,
|
||
},
|
||
}
|
||
|
||
# ---- DB / dedup ----
|
||
# Lock guarding every use of the shared SQLite connection below.
_dedup_lock = threading.Lock()
# check_same_thread=False lets the single connection be used from any thread;
# callers serialize access through _dedup_lock.
_dedup_conn = sqlite3.connect(CACHE_DB_PATH, check_same_thread=False)

# Create the cooldown table on import; the connection context manager commits
# when the block completes.
with _dedup_conn:
    _dedup_conn.execute(
        """
        CREATE TABLE IF NOT EXISTS escalation_dedup (
            trigger_type TEXT PRIMARY KEY,
            last_triggered INTEGER NOT NULL
        )
        """
    )
|
||
|
||
|
||
def _store_escalation(trigger_type: str) -> None:
    """Record "now" (epoch seconds) as the last trigger time for *trigger_type*.

    An existing row for the same key is replaced.
    """
    upsert_sql = "INSERT OR REPLACE INTO escalation_dedup (trigger_type, last_triggered) VALUES (?, ?)"
    triggered_at = int(datetime.now().timestamp())
    with _dedup_lock:
        _dedup_conn.execute(upsert_sql, (trigger_type, triggered_at))
        _dedup_conn.commit()
|
||
|
||
|
||
def _load_escalation(trigger_type: str) -> Optional[int]:
    """Return the stored last-trigger epoch for *trigger_type*, or None if absent."""
    select_sql = "SELECT last_triggered FROM escalation_dedup WHERE trigger_type = ?"
    with _dedup_lock:
        cursor = _dedup_conn.execute(select_sql, (trigger_type,))
        record = cursor.fetchone()
    if not record:
        return None
    return record[0]
|
||
|
||
|
||
def _ssh_exec(
    jump_host: str,
    jump_user: str,
    target_host: str,
    target_user: str,
    command: List[str],
    key_path: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Run *command* on *target_host* through the SSH jump host.

    *command* must be an argv list (not a shell string) to avoid shell injection.
    When *key_path* is falsy the module-level SSH_KEY_PATH is used.

    Returns a dict with "success", "exit_code", "stdout", "stderr" and the
    "command" that was passed in. A stderr beginning with "SSH timeout after "
    is shortened to "SSH timeout".
    """
    identity_file = key_path if key_path else SSH_KEY_PATH
    ssh_options = dict(
        host=target_host,
        user=target_user,
        command=command,
        port=SSH_PORT,
        key_path=identity_file,
        connect_timeout=SSH_CONNECT_TIMEOUT,
        command_timeout=SSH_COMMAND_TIMEOUT,
        jump_host=jump_host,
        jump_user=jump_user,
        batch_mode=True,
        server_alive_interval=15,
        server_alive_count_max=3,
        logger=logger,
    )
    outcome = run_ssh_command(**ssh_options)

    response: Dict[str, Any] = {
        "success": outcome.success,
        "exit_code": outcome.returncode,
        "stdout": outcome.stdout,
    }
    # Collapse the helper's detailed timeout text into a short marker.
    if outcome.stderr.startswith("SSH timeout after "):
        response["stderr"] = "SSH timeout"
    else:
        response["stderr"] = outcome.stderr
    response["command"] = command
    return response
|
||
|
||
|
||
# ---- PlayBook ----
|
||
def _find_playbook(error_type: str) -> Optional[Dict[str, Any]]:
    """
    Look up the active playbook for *error_type* in the ``playbooks`` table.

    Returns a dict with "id", "name", "action_type", "action_params",
    "max_retries" and "cooldown_min", or None when no active row matches or
    the lookup fails. Failures are logged as warnings, never raised.

    ``action_params`` is accepted either as JSON text or as an already-decoded
    value (some drivers/column types return JSON columns as Python objects).
    A payload that does not decode to a JSON object is treated like any other
    malformed playbook: the lookup is logged and None is returned.
    """
    session = None
    try:
        session = get_session()
        row = session.execute(
            text(
                "SELECT id, name, action_type, action_params, max_retries, cooldown_min "
                "FROM playbooks WHERE error_type = :et AND is_active = TRUE"
            ),
            {"et": error_type},
        ).fetchone()
        if not row:
            return None

        raw_params = row.action_params
        if isinstance(raw_params, (str, bytes, bytearray)):
            # Text column: decode it, treating an empty string as "no params".
            action_params = json.loads(raw_params) if raw_params else {}
        else:
            # Already decoded by the driver, or NULL.
            action_params = raw_params or {}
        if not isinstance(action_params, dict):
            # Downstream code calls params.get(...); refuse anything else here
            # so the error surfaces in this lookup instead of mid-execution.
            raise ValueError(
                f"action_params must be a JSON object, got {type(action_params).__name__}"
            )

        return {
            "id": row.id,
            "name": row.name,
            "action_type": row.action_type,
            "action_params": action_params,
            "max_retries": row.max_retries,
            "cooldown_min": row.cooldown_min,
        }
    except Exception as exc:
        # Best-effort lookup: the caller falls back to offline playbooks.
        logger.warning("[AutoHeal] playbook lookup failed for %s: %s", error_type, exc)
        return None
    finally:
        if session is not None:
            session.close()
|
||
|
||
|
||
# ---- Executor ----
|
||
@dataclass
class AutoHealResult:
    """Outcome of a single auto-heal attempt."""

    # Whether the action was carried out successfully.
    success: bool
    # Action type that was attempted (e.g. "DOCKER_RESTART"); None when no
    # playbook matched, so consumers should handle a missing value.
    action: Optional[str]
    # Human-readable result or failure reason.
    message: str
    # NOTE(review): presumably filled by the CODE_FIX executor with the pushed
    # commit — confirm against services.aider_heal_executor.
    commit_sha: Optional[str] = None
    # NOTE(review): presumably True when a CODE_FIX change was rolled back — confirm.
    reverted: bool = False
|
||
|
||
|
||
class AutoHealService:
    """
    ADR-013 auto-heal service.

    ADR-013: resource_optimization trigger → auto_heal_service.handle_exception

    ``handle_exception`` is the entry point: it records an incident, resolves a
    playbook (database first, built-in offline playbooks second), enforces the
    per-playbook cooldown and then runs the playbook's action.
    """

    # Container names must start with an alphanumeric character; this also
    # rejects values such as "--help" that docker would parse as options.
    _CONTAINER_NAME_RE = re.compile(r"[a-zA-Z0-9][a-zA-Z0-9_.-]*")
    # Target of DOCKER_RESTART and the only account SSH_CMD may use.
    _DOCKER_HOST = "192.168.0.188"
    _SSH_USER = "ollama"
    # WAIT_RETRY default and upper bound, in minutes.
    _DEFAULT_WAIT_MINUTES = 5
    _MAX_WAIT_MINUTES = 30

    def __init__(self):
        self._log = logger

    # ------------------------------------------------------------------ entry

    def handle_exception(
        self,
        error_type: Optional[str] = None,
        context: Optional[Dict[str, Any]] = None,
        **legacy_kwargs: Any,
    ) -> "AutoHealResult":
        """
        Handle one failure: create an incident, pick a playbook and execute it.

        Args:
            error_type: Playbook lookup key. When omitted it is taken from
                ``context["error_type"]`` or derived from the error text.
            context: Free-form details (exception text, traceback, task name,
                target file, ...). The dict is copied, never mutated.
            **legacy_kwargs: Extra keyword arguments merged into the context;
                None values are ignored.

        Returns:
            The AutoHealResult of the executed action, or a failed result when
            no playbook matches, the action type is not allowed, or the
            playbook is still cooling down.
        """
        context = dict(context or {})
        if legacy_kwargs:
            context.update({k: v for k, v in legacy_kwargs.items() if v is not None})

        error_type = error_type or context.get("error_type") or self._derive_error_type(context)
        # Fill in the resolved type when the context has none (including an
        # explicit None); a caller-supplied value is left untouched.
        if not context.get("error_type"):
            context["error_type"] = error_type
        self._log.info("[AutoHeal] handle_exception: error_type=%s context=%s", error_type, context)

        incident_id = self._ensure_incident(error_type, context)
        if incident_id is not None:
            context["incident_id"] = incident_id

        # NOTE(review): the early returns below leave the incident created
        # above in status "healing" — confirm whether it should be escalated.
        playbook = _find_playbook(error_type) or self._offline_playbook(error_type)
        if not playbook:
            msg = f"No playbook matched for error_type={error_type}"
            self._log.info("[AutoHeal] %s", msg)
            record_autoheal_action(action="NO_PLAYBOOK", error_type=error_type, success=False)
            return AutoHealResult(success=False, action=None, message=msg)

        playbook_id = int(playbook.get("id") or 0)
        if playbook_id > 0:
            context["matched_playbook_id"] = playbook_id
            context["playbook_id"] = playbook_id

        action_type = playbook["action_type"]
        if action_type not in _ALLOWED_ACTION_TYPES:
            msg = f"action_type '{action_type}' is not allowed"
            self._log.warning("[AutoHeal] %s", msg)
            record_autoheal_action(action=action_type, error_type=error_type, success=False)
            return AutoHealResult(success=False, action=action_type, message=msg)

        # Cooldown guard: skip when the same playbook ran too recently.
        last = _load_escalation(self._cooldown_key(playbook))
        if last and (datetime.now().timestamp() - last) / 60 < self._cooldown_minutes(playbook):
            msg = f"Cooldown active for {action_type}"
            self._log.info("[AutoHeal] %s", msg)
            record_autoheal_action(action=action_type, error_type=error_type, success=False)
            return AutoHealResult(success=False, action=action_type, message=msg)

        if action_type == "CODE_FIX":
            started_at = datetime.now()
            result = self._handle_code_fix(playbook, context)
            duration_ms = (datetime.now() - started_at).total_seconds() * 1000
            self._alert_and_store(playbook, context, result, duration_ms)
            record_autoheal_action(
                action=result.action or action_type,
                error_type=error_type,
                success=result.success,
                duration_ms=duration_ms,
            )
            return result

        # Every other action type goes through the generic executor.
        return self._execute_playbook_action(playbook, context)

    # ---------------------------------------------------------------- helpers

    def _derive_error_type(self, context: Dict[str, Any]) -> str:
        """Classify the failure from the exception / traceback text in *context*.

        Checks run in a fixed order and the first match wins; the fallback is
        "scheduler_task_failure".
        """
        error_text = str(context.get("exception") or context.get("error_message") or "").lower()
        traceback_text = str(context.get("traceback_str") or "").lower()
        combined = f"{error_text}\n{traceback_text}"

        def mentions(*needles: str) -> bool:
            return any(needle in combined for needle in needles)

        if mentions("could not translate host name", "temporary failure in name resolution"):
            return "DNS_FAIL"
        if mentions(
            "db_connection_error",
            "connection refused",
            "could not connect to server",
            "operationalerror",
        ):
            return "DB_UNREACHABLE"
        if mentions("timeout"):
            return "crawler_timeout"
        if "nvidia" in combined and "quota" in combined:
            return "nim_quota_exhausted"
        if mentions("embedding"):
            return "embedding_failure"
        if context.get("target_file") or mentions(
            "traceback (most recent call last)",
            "nameerror",
            "attributeerror",
            "typeerror",
            "importerror",
            "modulenotfounderror",
        ):
            return "python_exception"
        return "scheduler_task_failure"

    def _offline_playbook(self, error_type: str) -> Optional[Dict[str, Any]]:
        """Fallback when the primary DB is unavailable during DB/DNS incidents."""
        playbook = _OFFLINE_PLAYBOOKS.get(error_type)
        # Shallow copy so callers cannot replace keys of the shared template.
        return dict(playbook) if playbook else None

    def _cooldown_key(self, playbook: Dict[str, Any]) -> str:
        """Return the dedup-table key under which this playbook's cooldown is tracked."""
        playbook_id = playbook.get("id")
        if playbook_id:
            return f"playbook:{playbook_id}:{playbook['action_type']}"
        # Offline playbooks all share id 0, so key them by name instead.
        return f"offline:{playbook.get('name', playbook['action_type'])}:{playbook['action_type']}"

    def _cooldown_minutes(self, playbook: Dict[str, Any]) -> float:
        """Return the playbook's cooldown in minutes.

        A missing, NULL or non-numeric ``cooldown_min`` falls back to
        ESCALATION_COOLDOWN_MIN instead of raising inside the cooldown guard.
        """
        raw = playbook.get("cooldown_min")
        try:
            return float(raw)
        except (TypeError, ValueError):
            self._log.warning(
                "[AutoHeal] invalid cooldown_min=%r, using default %s", raw, ESCALATION_COOLDOWN_MIN
            )
            return float(ESCALATION_COOLDOWN_MIN)

    @staticmethod
    def _coerce_wait_minutes(raw: Any) -> int:
        """Convert a playbook's ``wait_minutes`` into an int clamped to [0, 30].

        Missing or non-numeric values fall back to 5.
        """
        try:
            minutes = int(raw)
        except (TypeError, ValueError, OverflowError):
            minutes = AutoHealService._DEFAULT_WAIT_MINUTES
        return max(0, min(minutes, AutoHealService._MAX_WAIT_MINUTES))

    @staticmethod
    def _is_safe_container_name(name: Any) -> bool:
        """Return True when *name* is a string made only of container-name characters.

        The name must start with an alphanumeric character, so values beginning
        with "-" cannot be passed to docker as options.
        """
        return isinstance(name, str) and AutoHealService._CONTAINER_NAME_RE.fullmatch(name) is not None

    def _ensure_incident(self, error_type: str, context: Dict[str, Any]) -> Optional[int]:
        """Insert an Incident row with status "healing"; return its id or None on failure."""
        session = None
        try:
            from database.autoheal_models import Incident

            # Computed once; an empty traceback is stored as NULL.
            traceback_value = str(context.get("traceback_str") or "")[:8000] or None

            session = get_session()
            incident = Incident(
                task_name=str(context.get("task_name") or context.get("source") or "unknown_task"),
                error_type=error_type,
                error_message=str(context.get("exception") or context.get("error_message") or error_type)[:2000],
                traceback_str=traceback_value,
                error_traceback=traceback_value,
                severity=str(context.get("severity") or "medium"),
                status="healing",
                retry_count=0,
            )
            session.add(incident)
            # Flush so the primary key is assigned before the commit.
            session.flush()
            incident_id = incident.id
            session.commit()
            self._log.info("[AutoHeal] incident created: id=%s error_type=%s", incident_id, error_type)
            return incident_id
        except Exception as exc:
            if session is not None:
                session.rollback()
            self._log.error("[AutoHeal] incident create failed: %s", exc)
            return None
        finally:
            if session is not None:
                session.close()

    # ---------------------------------------------------------------- actions

    def _handle_code_fix(self, playbook: Dict[str, Any], context: Dict[str, Any]) -> "AutoHealResult":
        """Run the CODE_FIX action through ``execute_code_fix``.

        Fails fast when the context has no ``target_file``; any exception from
        the executor is logged and converted into a failed result.
        """
        from services.aider_heal_executor import execute_code_fix

        target_file = context.get("target_file", "")
        error_type = context.get("error_type", "UnknownError")
        error_message = context.get("error_message", "")
        if not target_file:
            return AutoHealResult(success=False, action="CODE_FIX", message="target_file missing")
        try:
            return execute_code_fix(
                error_type=error_type,
                error_message=error_message,
                target_file=target_file,
                context=context,
            )
        except Exception as e:
            self._log.exception("[AutoHeal] CODE_FIX failed: %s", e)
            return AutoHealResult(success=False, action="CODE_FIX", message=f"Exception: {e}")

    def _execute_playbook_action(
        self,
        playbook: Dict[str, Any],
        context: Dict[str, Any],
    ) -> "AutoHealResult":
        """Dispatch a non-CODE_FIX playbook, then persist and record the outcome."""
        action_type = playbook["action_type"]
        # Tolerate a missing or None action_params.
        params = playbook.get("action_params") or {}

        started_at = datetime.now()
        if not isinstance(params, dict):
            # The handlers call params.get(...); report instead of crashing.
            result = AutoHealResult(
                success=False,
                action=action_type,
                message=f"action_params must be a mapping, got {type(params).__name__}",
            )
        elif action_type == "DOCKER_RESTART":
            result = self._docker_restart(params)
        elif action_type == "WAIT_RETRY":
            result = self._wait_retry(params)
        elif action_type == "ALERT_ONLY":
            result = self._alert_only(params)
        elif action_type == "SSH_CMD":
            result = self._ssh_cmd(params, context)
        else:
            return AutoHealResult(success=False, action=action_type, message="Unhandled action_type")

        duration_ms = (datetime.now() - started_at).total_seconds() * 1000
        self._alert_and_store(playbook, context, result, duration_ms)
        record_autoheal_action(
            action=result.action or action_type,
            error_type=str(context.get("error_type") or "unknown"),
            success=result.success,
            duration_ms=duration_ms,
        )
        return result

    def _docker_restart(self, params: Dict[str, Any]) -> "AutoHealResult":
        """Restart ``params["container"]`` on the Docker host via the SSH jump host.

        Unsafe names and containers listed in _PROTECTED_CONTAINERS are refused
        before any command is sent.
        """
        container = params.get("container")
        if not container:
            return AutoHealResult(success=False, action="DOCKER_RESTART", message="missing container")
        if not self._is_safe_container_name(container):
            return AutoHealResult(success=False, action="DOCKER_RESTART", message=f"unsafe container: {container}")
        if container in _PROTECTED_CONTAINERS:
            return AutoHealResult(
                success=False,
                action="DOCKER_RESTART",
                message=f"protected container blocked by ADR-011: {container}",
            )

        result = _ssh_exec(
            jump_host=SSH_JUMP_HOST,
            jump_user=SSH_JUMP_USER,
            target_host=self._DOCKER_HOST,
            target_user=self._SSH_USER,
            command=["docker", "restart", container],
            key_path=os.getenv("ELEPHANT_ALPHA_SSH_KEY_PATH"),
        )
        if result["success"]:
            msg = f"Container {container} restarted (SSH jump)"
            self._log.info("[AutoHeal] %s", msg)
        else:
            msg = f"Container restart failed: {result.get('stderr','')[:200]}"
            self._log.error("[AutoHeal] %s", msg)
        return AutoHealResult(success=result["success"], action="DOCKER_RESTART", message=msg)

    def _wait_retry(self, params: Dict[str, Any]) -> "AutoHealResult":
        """Report the wait time; this method itself does not sleep."""
        wait_min = self._coerce_wait_minutes(params.get("wait_minutes", self._DEFAULT_WAIT_MINUTES))
        return AutoHealResult(success=True, action="WAIT_RETRY", message=f"Waiting {wait_min} min")

    def _alert_only(self, params: Dict[str, Any]) -> "AutoHealResult":
        """Return a successful result that carries the playbook's alert message."""
        msg = params.get("message", "Alert sent")
        return AutoHealResult(success=True, action="ALERT_ONLY", message=msg)

    def _ssh_cmd(self, params: Dict[str, Any], context: Dict[str, Any]) -> "AutoHealResult":
        """Run an allowlisted argv on an allowlisted host as the fixed SSH user."""
        argv = params.get("argv")
        if not isinstance(argv, list) or not argv:
            return AutoHealResult(success=False, action="SSH_CMD", message="argv must be a non-empty list")
        # Normalize once so that exactly what was validated is executed.
        command = [str(part) for part in argv]
        if not self._is_allowed_ssh_argv(command):
            return AutoHealResult(success=False, action="SSH_CMD", message=f"argv is not in read-only allowlist: {argv}")
        host = params.get("host", "")
        user = params.get("user", self._SSH_USER)
        if host not in _ALLOWED_SSH_HOSTS:
            return AutoHealResult(success=False, action="SSH_CMD", message=f"host is not allowed: {host}")
        if user != self._SSH_USER:
            return AutoHealResult(success=False, action="SSH_CMD", message=f"user is not allowed: {user}")
        result = _ssh_exec(
            jump_host=SSH_JUMP_HOST,
            jump_user=SSH_JUMP_USER,
            target_host=host,
            target_user=user,
            command=command,
            key_path=os.getenv("ELEPHANT_ALPHA_SSH_KEY_PATH"),
        )
        # Prefer stdout; fall back to stderr when stdout is empty.
        out = result["stdout"] or result["stderr"]
        return AutoHealResult(success=result["success"], action="SSH_CMD", message=out)

    def _is_allowed_ssh_argv(self, argv: List[Any]) -> bool:
        """Return True when *argv* is an allowlisted read-only command.

        ``docker`` is limited to the allowlisted subcommands and may not name a
        protected container; otherwise the first word must be an allowlisted
        read-only command.
        """
        normalized = [str(part) for part in argv]
        if not normalized:
            return False
        if normalized[0] == "docker":
            if len(normalized) < 2 or normalized[1] not in _ALLOWED_SSH_DOCKER_SUBCOMMANDS:
                return False
            return not any(part in _PROTECTED_CONTAINERS for part in normalized[2:])
        return normalized[0] in _ALLOWED_SSH_READONLY_COMMANDS

    # ------------------------------------------------------------ persistence

    def _alert_and_store(
        self,
        playbook: Dict[str, Any],
        context: Dict[str, Any],
        result: "AutoHealResult",
        duration_ms: float = 0.0,
    ) -> None:
        """Start the cooldown and persist the outcome (heal log, insight, incident status)."""
        _store_escalation(self._cooldown_key(playbook))
        self._log.info("[AutoHeal] Alert stored for: %s", playbook["action_type"])
        self._write_heal_log(playbook, context, result, duration_ms)
        self._write_ai_insight(playbook, context, result, duration_ms)
        self._update_incident_status(context, result)

    def _write_heal_log(
        self,
        playbook: Dict[str, Any],
        context: Dict[str, Any],
        result: "AutoHealResult",
        duration_ms: float,
    ) -> None:
        """Write heal execution record to heal_logs table (ADR-013 Step 6/7).

        Best effort: skipped when the context has no incident id, and any
        failure (including a failed model import) is logged, not raised.
        """
        incident_id = context.get("incident_id")
        if not incident_id:
            self._log.warning("[AutoHeal] _write_heal_log: incident_id missing in context, skipping DB write")
            return
        session = None
        try:
            # Imported inside the try so an import failure cannot abort the
            # remaining persistence steps in _alert_and_store.
            from database.autoheal_models import HealLog

            session = get_session()
            # Offline playbooks use id 0, which is stored as NULL.
            raw_id = int(playbook.get("id") or 0)
            playbook_id = raw_id if raw_id > 0 else None
            outcome = "success" if result.success else "failed"
            log_entry = HealLog(
                incident_id=int(incident_id),
                playbook_id=playbook_id,
                action_type=playbook["action_type"],
                action_detail=json.dumps(playbook.get("action_params", {}), ensure_ascii=False),
                result=outcome,
                result_output=result.message[:2000] if result.message else None,
                duration_ms=duration_ms,
            )
            session.add(log_entry)
            session.commit()
            self._log.info(
                "[AutoHeal] heal_log written: incident_id=%s playbook=%s result=%s duration_ms=%.1f",
                incident_id, playbook.get("id"), outcome, duration_ms,
            )
        except Exception as exc:
            self._log.error("[AutoHeal] _write_heal_log DB error: %s", exc)
            if session is not None:
                session.rollback()
        finally:
            if session is not None:
                session.close()

    def _write_ai_insight(
        self,
        playbook: Dict[str, Any],
        context: Dict[str, Any],
        result: "AutoHealResult",
        duration_ms: float,
    ) -> None:
        """Store an AIInsight summarizing the run and queue it for embedding.

        Best effort: failures are logged, not raised.
        """
        session = None
        try:
            # Imported inside the try so an import failure cannot abort the
            # remaining persistence steps in _alert_and_store.
            from database.ai_models import AIInsight

            session = get_session()
            outcome = "success" if result.success else "failed"
            payload = {
                "incident_id": context.get("incident_id"),
                "task_name": context.get("task_name"),
                "error_type": context.get("error_type"),
                "playbook_id": playbook.get("id"),
                "action_type": playbook.get("action_type"),
                "success": result.success,
                "duration_ms": duration_ms,
            }
            insight = AIInsight(
                insight_type="auto_heal_playbook",
                content=(
                    f"task={context.get('task_name', 'unknown')} "
                    f"error_type={context.get('error_type', 'unknown')} "
                    f"action={playbook.get('action_type')} "
                    f"result={outcome} "
                    f"message={str(result.message)[:500]}"
                ),
                metadata_json=json.dumps(payload, ensure_ascii=False),
                confidence=1.0 if result.success else 0.4,
                created_by="auto_heal_service",
                ai_model="rule_based",
                status="approved",
            )
            session.add(insight)
            session.commit()
            try:
                from services.openclaw_learning_service import enqueue_insight_embedding

                enqueue_insight_embedding(insight.id, "auto_heal_playbook", insight.content)
            except Exception as embed_err:
                # The insight is already committed; only the enqueue failed.
                self._log.warning("[AutoHeal] embedding queue enqueue failed: %s", embed_err)
        except Exception as exc:
            self._log.error("[AutoHeal] ai_insight write failed: %s", exc)
            if session is not None:
                session.rollback()
        finally:
            if session is not None:
                session.close()

    def _update_incident_status(self, context: Dict[str, Any], result: "AutoHealResult") -> None:
        """Mark the incident "closed" on success or "escalated" on failure.

        No-op when the context has no incident id. Failures are logged, not raised.
        """
        incident_id = context.get("incident_id")
        if not incident_id:
            return

        session = None
        try:
            session = get_session()
            status = "closed" if result.success else "escalated"
            # 0 / missing becomes None so COALESCE keeps the stored value.
            playbook_id = int(
                context.get("matched_playbook_id")
                or context.get("playbook_id")
                or 0
            ) or None
            session.execute(
                text("""
                    UPDATE incidents
                    SET status = :status,
                        matched_playbook_id = COALESCE(:playbook_id, matched_playbook_id),
                        playbook_id = COALESCE(:playbook_id, playbook_id),
                        resolved_at = CASE
                            WHEN :status = 'closed' THEN COALESCE(resolved_at, NOW())
                            ELSE resolved_at
                        END,
                        updated_at = NOW()
                    WHERE id = :incident_id
                """),
                {
                    "status": status,
                    "playbook_id": playbook_id,
                    "incident_id": int(incident_id),
                },
            )
            session.commit()
        except Exception as exc:
            if session is not None:
                session.rollback()
            self._log.error("[AutoHeal] incident status update failed: %s", exc)
        finally:
            if session is not None:
                session.close()
|
||
|
||
|
||
# Module-level service instance, created once at import time.
auto_heal_service: AutoHealService = AutoHealService()
|