Files
ewoooc/services/aider_heal_executor.py
ogt bf5f0d256a fix(aiops): resolve ADR-014 logical bugs
- Fixed target_file context passing in auto_heal_service
- Fixed docker log scanning inside momo-scheduler using SSHJumpExecutor
- Fixed AiderHealExecutor SSH key path
2026-04-20 23:25:49 +08:00

410 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
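services/aider_heal_executor.py
ADR-014: Autonomous Code Heal Pipeline

Runs Aider over SSH on the 110 host to automatically fix code problems in the
momo-pro repo, then `git push`es the fix directly, which triggers the Gitea CD
pipeline to deploy it.

Guardrails:
  L1 - File whitelist: only .py files directly under services/, routes/ or database/
  L2 - Diff limit: more than 50 changed lines (default; AIDER_MAX_DIFF_LINES) → reject, do not push
  L3 - At most 5 CODE_FIX runs per hour (default; AIDER_MAX_HOURLY_FIX)
  L4 - Health check fails → automatic git revert + push
  L5 - Telegram notification for every fix result (success / failure / rollback)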
"""
import html
import os
import re
import shlex
import subprocess
import threading
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, Any, List

import requests

from services.logger_manager import SystemLogger
# Module-wide logger, obtained from the project's SystemLogger helper under the
# component name "AiderHealExecutor".
logger = SystemLogger("AiderHealExecutor").get_logger()
# ── Configuration ─────────────────────────────────────────────────────────────
# Heal host reached over SSH: it holds the repo checkout and runs Aider.
HEAL_SSH_HOST: str = os.getenv("HEAL_SSH_HOST", "192.168.0.110")
HEAL_SSH_USER: str = os.getenv("HEAL_SSH_USER", "wooo")
# Default private key: ../config/autoheal_id_ed25519 relative to this module;
# overridable through DEPLOY_SSH_KEY_PATH.
HEAL_SSH_KEY_DEFAULT = os.path.normpath(
    os.path.join(os.path.dirname(__file__), "..", "config", "autoheal_id_ed25519")
)
HEAL_SSH_KEY = os.getenv("DEPLOY_SSH_KEY_PATH", HEAL_SSH_KEY_DEFAULT)
HEAL_SSH_PORT: int = int(os.getenv("HEAL_SSH_PORT", "22"))
# Path of the repo checkout on the heal host (a remote path, not a local one).
REPO_PATH_110: str = os.getenv("AIDER_REPO_PATH", "/home/wooo/ewoooc")
GITEA_REMOTE: str = "origin"
HEALTH_CHECK_URL: str = (
    os.getenv("MOMO_BASE_URL", "https://mo.wooo.work").rstrip("/") + "/health"
)
OLLAMA_API_BASE: str = os.getenv("OLLAMA_API_BASE", "http://192.168.0.111:11434")
AIDER_MODEL: str = os.getenv("AIDER_MODEL", "ollama/qwen2.5-coder:7b")
# Guardrail L2: maximum added+deleted lines accepted from one Aider run.
MAX_DIFF_LINES: int = int(os.getenv("AIDER_MAX_DIFF_LINES", "50"))
# Guardrail L3: maximum CODE_FIX attempts per hour.
MAX_HOURLY_FIX: int = int(os.getenv("AIDER_MAX_HOURLY_FIX", "5"))
TELEGRAM_BOT_TOKEN: str = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID: str = os.getenv("TELEGRAM_CHAT_ID", "")
# Guardrail L1: repo-relative paths Aider may modify. Only a single .py file
# directly under services/, routes/ or database/ is accepted (no subdirectories).
# `\Z` instead of `$`: with re.match, `$` also matches before a trailing "\n",
# which let "services/x.py\n" through the whitelist.
ALLOWED_FILE_PATTERN = re.compile(
    r"^(services|routes|database)/[a-zA-Z0-9_]+\.py\Z"
)
# ── Rate-limit state (guarded by _lock) ──────────────────────────────────────
_lock: threading.Lock = threading.Lock()
# Monotonic timestamps of recorded CODE_FIX attempts.
_fix_history: List[float] = []
# Monotonic reference point for an hourly rate-limit window.
_last_host_reset: float = time.monotonic()
def _enforce_rate_limit() -> bool:
    """Reserve a CODE_FIX slot if fewer than MAX_HOURLY_FIX ran in the last hour.

    Uses a sliding 3600-second window over the monotonic timestamps stored in
    ``_fix_history``, so wall-clock jumps do not affect it. A fixed window
    that is cleared once per hour would allow up to 2 * MAX_HOURLY_FIX runs
    within one rolling hour: a burst at the end of one window plus another
    at the start of the next.

    Returns:
        True if a slot was reserved (the attempt is recorded), False if the
        hourly limit has already been reached.
    """
    now = time.monotonic()
    window_start = now - 3600.0
    with _lock:
        # Prune expired entries in place so the shared module-level list
        # object is kept.
        _fix_history[:] = [t for t in _fix_history if t > window_start]
        if len(_fix_history) >= MAX_HOURLY_FIX:
            return False
        _fix_history.append(now)
        return True
def _ssh_exec(
    cmd: str,
    cwd: Optional[str] = None,
    timeout: int = 60,
    check: bool = True,
) -> tuple[int, str, str]:
    """Run a shell command on the heal host over SSH.

    ssh is invoked with an argument list (no local shell), and ``cmd`` is
    passed as one argv element. It therefore reaches the remote shell
    verbatim: ``&&``, pipes, ``$VAR`` and quoting produced by
    ``shlex.quote`` are all interpreted remotely, exactly as written.
    Callers must ``shlex.quote`` untrusted values themselves.

    Backslash-escaping ``"``, backticks and ``$`` before ``shlex.quote``
    would be wrong here. The backslashes would survive to the remote side
    and corrupt commands such as ``awk '{print $1}'``, ``PATH=...:$PATH`` or
    any quoted string containing ``'``.

    Args:
        cmd: Command line for the remote shell.
        cwd: Optional directory on the *remote* host to ``cd`` into before
            running ``cmd``. It is never used as a local working directory,
            because the remote path usually does not exist locally.
        timeout: Seconds before the local ssh process is killed.
        check: Accepted for backward compatibility and ignored. Failures are
            always reported through the return code, never raised.

    Returns:
        ``(returncode, stdout, stderr)`` with surrounding whitespace stripped.
        ``returncode`` is -1 if ssh could not be started or timed out.
    """
    # Run cmd in a subshell so a failed `cd` prevents it from running at all.
    remote_cmd = f"cd {shlex.quote(cwd)} && ( {cmd}\n)" if cwd else cmd
    argv = [
        "ssh",
        "-p", str(HEAL_SSH_PORT),
        "-i", HEAL_SSH_KEY,
        # NOTE(review): host-key checking is disabled; consider pinning the
        # heal host's key in known_hosts instead.
        "-o", "StrictHostKeyChecking=no",
        "-o", "ConnectTimeout=10",
        f"{HEAL_SSH_USER}@{HEAL_SSH_HOST}",
        remote_cmd,
    ]
    try:
        result = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return -1, "", f"SSH timeout after {timeout}s"
    except Exception as e:  # boundary: callers expect a tuple, never an exception
        return -1, "", str(e)
    return result.returncode, result.stdout.strip(), result.stderr.strip()
def _http_get_json(url: str, timeout: int = 10) -> Optional[Dict[str, Any]]:
    """GET ``url`` and return its JSON body when it is a JSON object.

    This is a best-effort probe used by the health check. Any of the
    following yields None instead of raising, so callers can always call
    ``.get`` on a non-None result:

    - a transport error
    - a non-200 status
    - an unparsable body
    - non-object JSON (list, string, number)
    """
    try:
        resp = requests.get(url, timeout=timeout)
        if resp.status_code != 200:
            return None
        payload = resp.json()
    except Exception:
        # Any failure just means "not healthy yet"; the caller keeps polling.
        return None
    return payload if isinstance(payload, dict) else None
def _wait_for_health(
url: str,
timeout_seconds: int = 120,
interval_seconds: int = 10,
) -> bool:
"""
持续轮询健康检查,直到成功或超时。
"""
deadline = time.monotonic() + timeout_seconds
while time.monotonic() < deadline:
data = _http_get_json(url)
if data and data.get("status") == "ok":
return True
time.sleep(interval_seconds)
return False
def _notify_telegram(message_html: str) -> None:
    """Send a best-effort Telegram message; never raises.

    The call is synchronous (up to ~5 s per HTTP request), not
    fire-and-forget. It does nothing unless both TELEGRAM_BOT_TOKEN and
    TELEGRAM_CHAT_ID are set.

    If Telegram rejects the request with HTTP 400 (typically markup it cannot
    parse, such as an unescaped ``<`` inside error text), the same text is
    re-sent without ``parse_mode`` so the alert still gets through.
    """
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        return
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    payload: Dict[str, Any] = {
        "chat_id": TELEGRAM_CHAT_ID,
        # Bot API sendMessage text limit is 4096 characters; longer messages
        # are rejected outright.
        "text": message_html[:4096],
        "parse_mode": "HTML",
    }
    try:
        resp = requests.post(url, json=payload, timeout=5)
        if resp.status_code == 400:
            # Retry as plain text so a markup problem does not drop the alert.
            del payload["parse_mode"]
            resp = requests.post(url, json=payload, timeout=5)
        if not resp.ok:
            logger.warning("event=telegram_failed status=%s", resp.status_code)
    except Exception as exc:  # best-effort: notifications must never break healing
        # Log only the exception type: request errors embed the URL, which
        # contains the bot token.
        logger.warning("event=telegram_failed error=%s", type(exc).__name__)
def _git_cmd(
    repo_path: str,
    args: List[str],
    timeout: int = 30,
    check: bool = True,
) -> tuple[int, str, str]:
    """Run ``git <args>`` inside ``repo_path`` on the heal host.

    ``repo_path`` is a path on the remote host. The command itself ``cd``s
    into it. It is deliberately NOT passed as ``cwd``: in the original
    ``_ssh_exec`` that value is used as the *local* subprocess working
    directory, which fails whenever the remote path does not exist locally.

    Args:
        repo_path: Repository directory on the heal host.
        args: git arguments; each one is shell-quoted.
        timeout: Seconds before the ssh call is abandoned.
        check: Forwarded to ``_ssh_exec``.

    Returns:
        ``(returncode, stdout, stderr)`` from ``_ssh_exec``.
    """
    git_line = " ".join(["git", *(shlex.quote(a) for a in args)])
    return _ssh_exec(
        f"cd {shlex.quote(repo_path)} && {git_line}",
        timeout=timeout,
        check=check,
    )
# Serializes heal runs. Every run resets and edits the same working tree on the
# heal host, so two concurrent runs would clobber each other's changes.
_heal_run_lock = threading.Lock()


def _heal_result(
    success: bool,
    message: str,
    commit_sha: Optional[str] = None,
    reverted: bool = False,
) -> Dict[str, Any]:
    """Build the result dict returned by execute_code_fix."""
    return {
        "success": success,
        "action": "CODE_FIX",
        "message": message,
        "commit_sha": commit_sha,
        "reverted": reverted,
    }


def _sum_numstat(numstat_output: str) -> Optional[int]:
    """Sum added+deleted lines from ``git diff --numstat`` output.

    Empty output (no changes) yields 0. Returns None when a line cannot be
    parsed, so the caller can fail closed. An example is a binary file,
    which numstat reports as ``-<TAB>-<TAB>path``.
    """
    total = 0
    for line in numstat_output.splitlines():
        if not line.strip():
            continue
        added, _, rest = line.partition("\t")
        deleted, _, _path = rest.partition("\t")
        if not (added.isdigit() and deleted.isdigit()):
            return None
        total += int(added) + int(deleted)
    return total


def _discard_worktree_changes(repo_q: str) -> None:
    """Drop uncommitted edits to tracked files in the heal-host checkout."""
    _ssh_exec(f"cd {repo_q} && git checkout -- . 2>&1", timeout=10)


def execute_code_fix(
    error_type: str,
    error_message: str,
    target_file: str,
    context: Optional[dict] = None,
) -> Dict[str, Any]:
    """Run Aider on ``target_file`` on the heal host and push the fix.

    This is the main entry point of the ADR-014 heal pipeline. Guardrails are
    applied in order:

    1. File whitelist (L1).
    2. One run at a time.
    3. Hourly rate limit (L3).
    4. Diff-size limit (L2).
    5. Post-push health check with automatic revert (L4).

    A Telegram notification (L5) is sent at each stage.

    Args:
        error_type: Short error label. Used in the Aider prompt, the commit
            message and notifications.
        error_message: Error text. Truncated to 500 characters; ``"`` and
            backticks are replaced by ``'`` and ``$`` is removed before use.
        target_file: Repo-relative path. Must fully match
            ``ALLOWED_FILE_PATTERN``.
        context: Accepted for interface compatibility; currently unused.

    Returns:
        A dict with these keys:

        - ``success``
        - ``action``: always ``"CODE_FIX"``
        - ``message``
        - ``commit_sha``: set once a fix commit has been pushed
        - ``reverted``: True only if the automatic revert was pushed
          successfully
    """
    ts = datetime.now().strftime("%Y/%m/%d %H:%M:%S")

    # L1: file whitelist. fullmatch, so a trailing "\n" cannot sneak past `$`.
    if not isinstance(target_file, str) or not ALLOWED_FILE_PATTERN.fullmatch(target_file):
        reason = f"[AiderHeal] 文件不在白名单: {target_file}"
        logger.warning("event=heal_reject reason=%s file=%s", reason, target_file)
        return _heal_result(False, reason)

    # One run at a time. This is checked before the rate limit so a skipped
    # run does not consume an hourly slot.
    if not _heal_run_lock.acquire(blocking=False):
        reason = "[AiderHeal] 已有修复任务进行中,跳过"
        logger.warning("event=heal_busy file=%s", target_file)
        return _heal_result(False, reason)
    try:
        # L3: rate limit.
        if not _enforce_rate_limit():
            reason = f"[AiderHeal] 每小时上限 {MAX_HOURLY_FIX} 次,跳过"
            logger.warning("event=rate_limit file=%s", target_file)
            return _heal_result(False, reason)
        return _run_heal(error_type, error_message, target_file, ts)
    finally:
        _heal_run_lock.release()


def _run_heal(
    error_type: str,
    error_message: str,
    target_file: str,
    ts: str,
) -> Dict[str, Any]:
    """Execute the heal steps for a whitelisted, rate-limited request."""
    esc = html.escape  # dynamic text goes into parse_mode=HTML messages
    repo_q = shlex.quote(REPO_PATH_110)
    file_q = shlex.quote(target_file)
    error_type_html = esc(str(error_type))
    target_html = esc(target_file)

    _notify_telegram(
        f"🔧 <b>AiderHeal 启动</b>\n"
        f"├ 错误类型: <code>{error_type_html}</code>\n"
        f"├ 目标文件: <code>{target_html}</code>\n"
        f"└ 时间: {ts}"
    )
    logger.info("event=heal_start error_type=%s file=%s", error_type, target_file)

    # ── Step 1: reset the heal-host checkout to origin/main ───────────────────
    # There is no trailing `|| true`: a failed fetch/reset must abort rather
    # than let Aider run on a stale tree. (The former `git stash` was a no-op
    # right after `reset --hard`.)
    setup_cmd = (
        f"cd {repo_q} && "
        f"git fetch {GITEA_REMOTE} main 2>&1 && "
        f"git reset --hard {GITEA_REMOTE}/main 2>&1"
    )
    rc, out, err = _ssh_exec(setup_cmd, timeout=30)
    if rc != 0:
        msg = f"[AiderHeal] git 准备失败: {err or out}"
        logger.error("event=setup_failed error=%s", msg)
        _notify_telegram(f"❌ AiderHeal 失败git 准备)\n<code>{esc(msg)}</code>")
        return _heal_result(False, msg)

    # ── Step 2: run Aider ─────────────────────────────────────────────────────
    safe_error = (
        str(error_message or "")[:500]
        .replace('"', "'")
        .replace("`", "'")
        .replace("$", "")
    )
    instruction = (
        f"Fix the following {error_type} in this file. "
        f"Only fix what is necessary, do not refactor or add features. "
        f"Error: {safe_error}"
    )
    aider_ssh_timeout = 180
    # Every interpolated value is shell-quoted. error_type is caller-supplied
    # and was previously spliced into a double-quoted remote string.
    # `$PATH` stays unquoted on purpose so the remote shell expands it.
    # The remote `timeout` stops Aider before the local ssh timeout fires.
    # Otherwise a killed ssh session could leave Aider editing files while the
    # later steps run. Assumes GNU coreutils `timeout` on the heal host.
    aider_cmd = (
        f"cd {repo_q} && "
        f"PATH=/home/wooo/.local/bin:$PATH "
        f"OLLAMA_API_BASE={shlex.quote(OLLAMA_API_BASE)} "
        f"timeout {aider_ssh_timeout - 10} "
        f"aider --model {shlex.quote(AIDER_MODEL)} "
        f"--yes-always --no-git "
        f"--message {shlex.quote(instruction)} "
        f"{file_q} 2>&1"
    )
    logger.info("event=aider_exec file=%s", target_file)
    rc, aider_out, aider_err = _ssh_exec(aider_cmd, timeout=aider_ssh_timeout)
    logger.debug("event=aider_output snippet=%s", (aider_out or aider_err)[:300])
    if rc != 0:
        # Fail closed: never commit the output of a crashed or timed-out run.
        _discard_worktree_changes(repo_q)
        msg = f"[AiderHeal] Aider 执行失败 (rc={rc}): {(aider_err or aider_out)[-300:]}"
        logger.error("event=aider_failed rc=%s file=%s", rc, target_file)
        _notify_telegram(f"❌ AiderHeal 失败 (Aider rc={rc})\n<code>{esc(msg)}</code>")
        return _heal_result(False, msg)

    # ── Step 3: diff-size guard (L2) ──────────────────────────────────────────
    # Summed in Python rather than with a remote awk pipeline, which needs
    # literal `$1`/`$2`; a failing pipeline used to be read as "0 lines
    # changed". All tracked files are counted, not only target_file.
    rc, numstat_out, numstat_err = _ssh_exec(
        f"cd {repo_q} && git diff --numstat HEAD", timeout=10
    )
    diff_lines = _sum_numstat(numstat_out) if rc == 0 else None
    if diff_lines is None:
        _discard_worktree_changes(repo_q)
        msg = f"[AiderHeal] diff 评估失败,已丢弃: {numstat_err or numstat_out}"
        logger.error("event=diff_eval_failed file=%s", target_file)
        _notify_telegram(f"❌ AiderHeal 失败(diff 评估)\n<code>{esc(msg)}</code>")
        return _heal_result(False, msg)
    if diff_lines == 0:
        msg = "[AiderHeal] Aider 未产生任何修改diff=0可能已自动解决或模型失效"
        logger.warning("event=no_diff file=%s", target_file)
        _notify_telegram(f"⚠️ AiderHeal无修改产生\n<code>{target_html}</code>")
        return _heal_result(False, msg)
    if diff_lines > MAX_DIFF_LINES:
        # Change is too large: discard it and alert.
        _discard_worktree_changes(repo_q)
        msg = (
            f"[AiderHeal] diff 超出限制 {diff_lines} > {MAX_DIFF_LINES} 行,"
            f"已丢弃,需人工介入"
        )
        logger.warning("event=diff_too_large file=%s diff_lines=%d", target_file, diff_lines)
        _notify_telegram(
            f"⚠️ <b>AiderHealdiff 过大,需人工审核</b>\n"
            f"├ 文件: <code>{target_html}</code>\n"
            f"├ diff: {diff_lines} 行(上限 {MAX_DIFF_LINES}\n"
            f"└ 错误: <code>{error_type_html}</code>"
        )
        return _heal_result(False, msg)

    # ── Step 4: commit target_file and push ───────────────────────────────────
    fix_msg = (
        f"fix(autoheal): [{error_type}] auto-fix {target_file}\n\n"
        f"Triggered by AiderHealExecutor (ADR-014)\n"
        f"Error: {safe_error[:200]}"
    )
    # `rev-parse HEAD` runs last, so on success the final stdout line is
    # exactly the pushed fix commit. HEAD:main pushes correctly even if the
    # checkout is not on a local `main` branch.
    rc, commit_out, commit_err = _ssh_exec(
        f"cd {repo_q} && "
        f"git add -- {file_q} && "
        f"git commit -q -m {shlex.quote(fix_msg)} 2>&1 && "
        f"git push -q {GITEA_REMOTE} HEAD:main 2>&1 && "
        f"git rev-parse HEAD",
        timeout=30,
    )
    if rc != 0:
        msg = f"[AiderHeal] git push 失败: {commit_err or commit_out}"
        logger.error("event=push_failed error=%s", msg)
        _notify_telegram(f"❌ AiderHeal git push 失败\n<code>{esc(msg)}</code>")
        return _heal_result(False, msg)
    commit_sha = (commit_out.splitlines()[-1].strip() if commit_out else "") or "unknown"

    logger.info("event=push_ok commit=%s", commit_sha)
    _notify_telegram(
        f"🚀 <b>AiderHeal push 完成</b>\n"
        f"├ commit: <code>{esc(commit_sha)}</code>\n"
        f"├ 文件: <code>{target_html}</code>\n"
        f"└ 等待健康检查..."
    )

    # ── Step 5: health check (L4) ─────────────────────────────────────────────
    # NOTE(review): nothing ties the health response to commit_sha, so the
    # previous deployment may still pass this check before CD rolls out the fix.
    time.sleep(10)  # give the deployment a start-up buffer
    healthy = _wait_for_health(HEALTH_CHECK_URL, timeout_seconds=120, interval_seconds=10)
    if healthy:
        msg = f"[AiderHeal] 修复成功并部署完成: {target_file} ({commit_sha})"
        logger.info("event=heal_success commit=%s file=%s", commit_sha, target_file)
        _notify_telegram(
            f"✅ <b>AiderHeal 修复完成</b>\n"
            f"├ 错误: <code>{error_type_html}</code>\n"
            f"├ 文件: <code>{target_html}</code>\n"
            f"├ commit: <code>{esc(commit_sha)}</code>\n"
            f"└ diff: {diff_lines}"
        )
        return _heal_result(True, msg, commit_sha)

    # ── Step 6: health check failed → automatic revert (L4) ───────────────────
    logger.error("event=health_check_failed commit=%s", commit_sha)
    if commit_sha == "unknown":
        rc, revert_out, revert_err = -1, "", "fix commit SHA unknown, cannot revert"
    else:
        rc, revert_out, revert_err = _ssh_exec(
            f"cd {repo_q} && "
            f"git revert --no-edit {shlex.quote(commit_sha)} 2>&1 && "
            f"git push -q {GITEA_REMOTE} HEAD:main 2>&1 && "
            f"git rev-parse HEAD",
            timeout=30,
        )
    # Success is judged by exit status. Scanning the output for "error" would
    # miss `fatal: ...`, rejected pushes and ssh timeouts.
    reverted = rc == 0
    if reverted:
        revert_sha = (revert_out.splitlines()[-1].strip() if revert_out else "") or "unknown"
        msg = (
            f"[AiderHeal] 健康检查失败,已自动回滚: "
            f"{commit_sha} → {revert_sha}"
        )
        logger.warning("event=reverted commit=%s to=%s", commit_sha, revert_sha)
        _notify_telegram(
            f"🔄 <b>AiderHeal 自动回滚</b>\n"
            f"├ 原 commit: <code>{esc(commit_sha)}</code>\n"
            f"├ 回滚 commit: <code>{esc(revert_sha)}</code>\n"
            f"└ 需人工排查: <code>{error_type_html}</code> in <code>{target_html}</code>"
        )
    else:
        # Commands use 2>&1, so the failure detail is usually in stdout.
        detail = revert_err or revert_out
        msg = f"[AiderHeal] 回滚失败!需立即人工介入: {detail}"
        logger.critical("event=revert_failed commit=%s error=%s", commit_sha, detail)
        _notify_telegram(
            f"🚨 <b>AiderHeal 回滚失败!请立即人工介入</b>\n<code>{esc(msg)}</code>"
        )
    return _heal_result(False, msg, commit_sha, reverted=reverted)