feat(aiops): implement ADR-014 Autonomous Code Heal Pipeline
All checks were successful
CD Pipeline / deploy (push) Successful in 1m14s

- Added AiderHealExecutor for SSH remote execution of aider-chat
- Added CODE_FIX action_type to AutoHealService
- Added code_exception trigger to Elephant Alpha engine (Traceback log scanning)
- Added ADR-014 playbook migration script (registers the CODE_FIX playbook)
This commit is contained in:
ogt
2026-04-20 23:13:32 +08:00
parent 4f4e7ef062
commit 3127466a85
4 changed files with 444 additions and 2 deletions

View File

@@ -0,0 +1,11 @@
-- ADR-014: register the Aider code-fix playbook.
-- The row pairs trigger_pattern 'python_exception' with action_type 'CODE_FIX'.
-- ON CONFLICT DO NOTHING skips the insert when it would violate a unique or
-- exclusion constraint on aiops_playbook.
INSERT INTO aiops_playbook
    (name, description, trigger_pattern, action_type, action_params, is_active)
VALUES
    (
        'Auto-fix Python Runtime exceptions using Aider (ADR-014)',
        'When Elephant Alpha detects python_exception with tracebacks in logs, trigger CODE_FIX',
        'python_exception',
        'CODE_FIX',
        CAST('{"max_diff_lines": 50, "require_health_check": true, "auto_revert_on_fail": true}' AS jsonb),
        TRUE
    )
ON CONFLICT DO NOTHING;

View File

@@ -0,0 +1,306 @@
"""
services/aider_heal_executor.py
ADR-014: Autonomous Code Heal Pipeline
透過 SSH 在 110 主機執行 Aider自動修復 momo-pro repo 的程式碼問題,
修復後直接 git push觸發 Gitea CD Pipeline 部署。
安全護欄:
L1 - 檔案白名單(只改 services/ routes/ database/ 內 .py
L2 - diff 限制(>50 行 → 拒絕,不 push
L3 - 每小時最多 5 次 CODE_FIX
L4 - health check 失敗 → 自動 git revert + push
L5 - Telegram 通知每次修復結果(成功/失敗/回滾)
"""
import os
import re
import time
import subprocess
import threading
import requests
from datetime import datetime, timedelta
from typing import Optional
from services.logger_manager import SystemLogger
logger = SystemLogger("AiderHealExecutor").get_logger()
# ── Settings ──────────────────────────────────────────────────────────────────
# SSH destination used by _ssh_run. Overridable through the environment like the
# other settings below; the defaults are the previously hard-coded values.
HEAL_SSH_HOST = os.getenv("HEAL_SSH_HOST", "192.168.0.110")
HEAL_SSH_USER = os.getenv("HEAL_SSH_USER", "wooo")
HEAL_SSH_KEY = os.getenv("DEPLOY_SSH_KEY_PATH", "/root/.ssh/id_deploy")
# Repo checkout on the remote host and the git remote that receives pushes.
REPO_PATH_110 = os.getenv("AIDER_REPO_PATH", "/home/wooo/ewoooc")
GITEA_REMOTE = "origin"
# URL polled after a push to decide whether the deployment is healthy.
HEALTH_CHECK_URL = os.getenv("MOMO_BASE_URL", "https://mo.wooo.work") + "/health"
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
AIDER_MODEL = os.getenv("AIDER_MODEL", "gemini/gemini-2.0-flash")
# Guardrail limits: maximum accepted diff size and maximum fixes per hour.
MAX_DIFF_LINES = int(os.getenv("AIDER_MAX_DIFF_LINES", "50"))
MAX_HOURLY_FIX = int(os.getenv("AIDER_MAX_HOURLY_FIX", "5"))
# Telegram notifications are skipped when either value is empty.
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")
# Paths Aider is allowed to modify (regex): a single .py file directly under
# services/, routes/ or database/. The pattern ends with \Z rather than $
# because $ also matches just before a trailing newline, which would let
# "services/x.py\n" through re.match.
ALLOWED_FILE_PATTERN = re.compile(
    r'^(services|routes|database)/[a-zA-Z0-9_]+\.py\Z'
)
# ── Rate counter (guarded by a lock) ──────────────────────────────────────────
_lock = threading.Lock()
_fix_history: list[datetime] = []


def _check_rate_limit() -> bool:
    """Return True when another fix may run, and record the attempt if so.

    Timestamps older than one hour are dropped on every call. When the number
    of remaining timestamps has reached MAX_HOURLY_FIX the call returns False
    and records nothing.
    """
    global _fix_history
    current = datetime.utcnow()
    window_start = current - timedelta(hours=1)
    with _lock:
        recent = [stamp for stamp in _fix_history if stamp > window_start]
        allowed = len(recent) < MAX_HOURLY_FIX
        if allowed:
            # The slot is consumed here, before the fix itself is attempted.
            recent.append(current)
        _fix_history = recent
        return allowed
def _notify_telegram(msg: str):
    """Send a Telegram message on a best-effort basis.

    Does nothing when TELEGRAM_BOT_TOKEN or TELEGRAM_CHAT_ID is empty. The
    request is synchronous with a 5 second timeout. Failures never propagate
    to the caller; they are logged as warnings instead of being dropped
    silently.

    Args:
        msg: Message text. It is sent with parse_mode "HTML", so callers are
            responsible for escaping any dynamic content they embed.
    """
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        return
    try:
        response = requests.post(
            f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage",
            json={"chat_id": TELEGRAM_CHAT_ID, "text": msg, "parse_mode": "HTML"},
            timeout=5,
        )
    except Exception as exc:
        # Log only the exception type: the error text produced by requests can
        # include the request URL, and that URL contains the bot token.
        logger.warning("[AiderHeal] Telegram notification failed: %s",
                       type(exc).__name__)
        return
    if response.status_code != 200:
        logger.warning("[AiderHeal] Telegram notification rejected: HTTP %s",
                       response.status_code)
def _ssh_run(cmd: str, timeout: int = 60) -> tuple[int, str, str]:
    """Run a shell command on the remote heal host over SSH.

    Args:
        cmd: Command string handed to ssh as the remote command.
        timeout: Seconds to wait for the local ssh process before giving up.

    Returns:
        (returncode, stdout, stderr) with both streams stripped. A timeout or
        any other failure to run ssh yields returncode -1, an empty stdout and
        a description of the problem in stderr.
    """
    destination = f"{HEAL_SSH_USER}@{HEAL_SSH_HOST}"
    # NOTE(review): host key checking is disabled here; confirm this is intended.
    ssh_options = ["-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=10"]
    argv = ["ssh", "-i", HEAL_SSH_KEY, *ssh_options, destination, cmd]
    try:
        completed = subprocess.run(
            argv, capture_output=True, text=True, timeout=timeout
        )
        return (
            completed.returncode,
            completed.stdout.strip(),
            completed.stderr.strip(),
        )
    except subprocess.TimeoutExpired:
        return -1, "", f"SSH timeout after {timeout}s"
    except Exception as exc:
        return -1, "", str(exc)
def _health_check(retries: int = 6, interval: int = 10) -> bool:
    """Poll HEALTH_CHECK_URL until it answers HTTP 200.

    Args:
        retries: Maximum number of requests to make.
        interval: Seconds to sleep between two consecutive requests.

    Returns:
        True as soon as one request returns status 200, False when every
        attempt failed. Request errors count as a failed attempt.
    """
    attempt = 0
    while attempt < retries:
        try:
            if requests.get(HEALTH_CHECK_URL, timeout=10).status_code == 200:
                return True
        except Exception:
            pass
        attempt += 1
        # No sleep after the final attempt.
        if attempt < retries:
            time.sleep(interval)
    return False
def execute_code_fix(
    error_type: str,
    error_message: str,
    target_file: str,
    context: dict | None = None,
) -> dict:
    """
    Main entry point: run Aider against one file on the remote host and push the fix.

    Flow: whitelist check -> rate-limit check -> sync the remote checkout ->
    run Aider -> diff-size guard -> commit and push -> health check ->
    automatic revert when the health check fails. Telegram notifications are
    sent along the way.

    Args:
        error_type: Error type (e.g. 'ImportError', 'RuntimeError').
        error_message: Full error text (from the container log). Only the
            first 500 characters are forwarded to Aider.
        target_file: File path relative to the repo root
            (e.g. 'services/pchome_crawler.py'). Must fully match
            ALLOWED_FILE_PATTERN.
        context: Optional extra context. Accepted for interface
            compatibility; this function does not read it.

    Returns:
        {
            'success': bool,
            'action': 'CODE_FIX',
            'message': str,
            'commit_sha': str | None,   # set once a push succeeded
            'reverted': bool,           # True when the fix was rolled back
        }
    """
    import html
    import shlex

    def _outcome(success: bool, message: str,
                 commit_sha: str | None = None, reverted: bool = False) -> dict:
        # Single place that builds the result dict returned to the caller.
        return {"success": success, "action": "CODE_FIX",
                "message": message, "commit_sha": commit_sha, "reverted": reverted}

    ts = datetime.now().strftime("%Y/%m/%d %H:%M:%S")

    # L1: file whitelist. fullmatch() is used so that a trailing newline cannot
    # slip past a `$` anchor in the pattern.
    if not isinstance(target_file, str) or not ALLOWED_FILE_PATTERN.fullmatch(target_file):
        reason = f"[AiderHeal] 檔案不在白名單: {target_file}"
        logger.warning(reason)
        return _outcome(False, reason)

    # L3: rate limit
    if not _check_rate_limit():
        reason = f"[AiderHeal] 每小時上限 {MAX_HOURLY_FIX} 次,跳過"
        logger.warning(reason)
        return _outcome(False, reason)

    # Notifications are sent with parse_mode=HTML, so dynamic text is escaped;
    # unescaped '<' or '&' (common in git output) can get the message rejected.
    error_type_html = html.escape(str(error_type))
    # Everything that is not operator configuration is quoted for the remote shell.
    quoted_file = shlex.quote(target_file)

    _notify_telegram(
        f"🔧 <b>AiderHeal 啟動</b>\n"
        f"├ 錯誤類型: <code>{error_type_html}</code>\n"
        f"├ 目標檔案: <code>{target_file}</code>\n"
        f"└ 時間: {ts}"
    )
    logger.info("[AiderHeal] 開始修復: %s -> %s", error_type, target_file)

    # ── Step 1: prepare the repo (on the remote host) ────────────────────────
    # `git stash || true` is grouped in a subshell. Without the grouping the
    # trailing `|| true` applies to the whole && chain and hides failures of
    # cd / fetch / reset, so the rc check below could never fire for them.
    setup_cmds = (
        f"cd {REPO_PATH_110} && "
        f"git fetch {GITEA_REMOTE} main 2>&1 && "
        f"git reset --hard {GITEA_REMOTE}/main 2>&1 && "
        "(git stash 2>&1 || true)"
    )
    rc, out, err = _ssh_run(setup_cmds, timeout=30)
    if rc != 0:
        msg = f"[AiderHeal] git 準備失敗: {err or out}"
        logger.error(msg)
        _notify_telegram(f"❌ AiderHeal 失敗git 準備)\n<code>{html.escape(msg)}</code>")
        return _outcome(False, msg)

    # ── Step 2: build the Aider command ──────────────────────────────────────
    # Truncate the error text; shell safety comes from shlex.quote below, not
    # from stripping characters.
    safe_error = str(error_message or "")[:500]
    instruction = (
        f"Fix the following {error_type} in this file. "
        f"Only fix what is necessary, do not refactor or add features. "
        f"Error: {safe_error}"
    )
    aider_cmd = (
        f"cd {REPO_PATH_110} && "
        f"GEMINI_API_KEY={shlex.quote(GEMINI_API_KEY)} "
        f"aider --model {AIDER_MODEL} "
        f"--yes-always --no-git "
        f"--message {shlex.quote(instruction)} "
        f"{quoted_file} 2>&1"
    )
    logger.info("[AiderHeal] 執行 aider on 110...")
    aider_rc, aider_out, aider_err = _ssh_run(aider_cmd, timeout=180)
    logger.info("[AiderHeal] aider 輸出: %s", (aider_out or aider_err)[:300])
    if aider_rc != 0:
        # Not fatal on its own: the diff check below decides what happens next.
        logger.warning("[AiderHeal] aider exited with code %s", aider_rc)

    # ── Step 3: diff size check (L2 guardrail) ───────────────────────────────
    diff_cmd = f"cd {REPO_PATH_110} && git diff --unified=0 | wc -l"
    _, diff_lines_str, _ = _ssh_run(diff_cmd)
    diff_lines_str = diff_lines_str.strip()
    # An unreadable count is treated as 999 so the change is discarded below.
    diff_lines = int(diff_lines_str) if diff_lines_str.isdigit() else 999
    if diff_lines == 0:
        msg = "[AiderHeal] Aider 未產生任何修改diff=0行可能已自動解決或模型失效"
        logger.warning(msg)
        _notify_telegram(f"⚠️ AiderHeal無修改產生\n<code>{target_file}</code>")
        return _outcome(False, msg)
    if diff_lines > MAX_DIFF_LINES:
        # Change is too large: discard it and escalate to a human.
        _ssh_run(f"cd {REPO_PATH_110} && git checkout -- .", timeout=10)
        msg = (f"[AiderHeal] diff 超出限制 {diff_lines}>{MAX_DIFF_LINES} 行,"
               f"已丟棄,需人工介入")
        logger.warning(msg)
        _notify_telegram(
            f"⚠️ <b>AiderHealdiff 過大,需人工審核</b>\n"
            f"├ 檔案: <code>{target_file}</code>\n"
            f"├ diff: {diff_lines} 行(上限 {MAX_DIFF_LINES}\n"
            f"└ 錯誤: <code>{error_type_html}</code>"
        )
        return _outcome(False, msg)

    # ── Step 4: git commit + push ────────────────────────────────────────────
    fix_msg = (
        f"fix(autoheal): [{error_type}] auto-fix {target_file}\n\n"
        f"Triggered by AiderHealExecutor (ADR-014)\n"
        f"Error: {safe_error[:200]}"
    )
    commit_cmd = (
        f"cd {REPO_PATH_110} && "
        f"git add {quoted_file} && "
        f"git commit -m {shlex.quote(fix_msg)} 2>&1 && "
        f"git push {GITEA_REMOTE} main 2>&1"
    )
    rc3, commit_out, commit_err = _ssh_run(commit_cmd, timeout=30)
    # Read the commit SHA
    sha_cmd = f"cd {REPO_PATH_110} && git rev-parse --short HEAD"
    _, commit_sha, _ = _ssh_run(sha_cmd)
    commit_sha = commit_sha.strip() or "unknown"
    if rc3 != 0:
        msg = f"[AiderHeal] git push 失敗: {commit_err or commit_out}"
        logger.error(msg)
        _notify_telegram(f"❌ AiderHeal git push 失敗\n<code>{html.escape(msg)}</code>")
        return _outcome(False, msg)
    logger.info("[AiderHeal] push 成功commit=%s,等待健康檢查...", commit_sha)
    _notify_telegram(
        f"🚀 <b>AiderHeal push 完成</b>\n"
        f"├ commit: <code>{commit_sha}</code>\n"
        f"├ 檔案: <code>{target_file}</code>\n"
        f"└ 等待健康檢查..."
    )

    # ── Step 5: health check (L4 guardrail) ──────────────────────────────────
    time.sleep(20)  # give the CD deployment time to start
    healthy = _health_check(retries=6, interval=10)
    if healthy:
        msg = f"[AiderHeal] 修復成功並部署完成: {target_file} ({commit_sha})"
        logger.info(msg)
        _notify_telegram(
            f"✅ <b>AiderHeal 修復完成</b>\n"
            f"├ 錯誤: <code>{error_type_html}</code>\n"
            f"├ 檔案: <code>{target_file}</code>\n"
            f"├ commit: <code>{commit_sha}</code>\n"
            f"└ diff: {diff_lines} 行"
        )
        return _outcome(True, msg, commit_sha)

    # ── Step 6: health check failed → automatic revert (L4 guardrail) ────────
    logger.error("[AiderHeal] 健康檢查失敗,執行自動 revert...")
    # If the SHA lookup failed, revert HEAD: the commit above just succeeded,
    # so HEAD is the auto-fix commit.
    revert_target = commit_sha if commit_sha != "unknown" else "HEAD"
    revert_cmd = (
        f"cd {REPO_PATH_110} && "
        f"git revert --no-edit {shlex.quote(revert_target)} 2>&1 && "
        f"git push {GITEA_REMOTE} main 2>&1"
    )
    rc4, rev_out, rev_err = _ssh_run(revert_cmd, timeout=30)
    if rc4 == 0:
        _, revert_sha, _ = _ssh_run(sha_cmd)
        revert_sha = revert_sha.strip()
        msg = f"[AiderHeal] 健康檢查失敗,已自動 revert: {commit_sha} -> {revert_sha}"
        logger.warning(msg)
        _notify_telegram(
            f"🔄 <b>AiderHeal 自動回滾</b>\n"
            f"├ 原 commit: <code>{commit_sha}</code>\n"
            f"├ 回滾 commit: <code>{revert_sha}</code>\n"
            f"└ 需人工排查: <code>{error_type_html}</code> in <code>{target_file}</code>"
        )
    else:
        # stderr is redirected into stdout by the command, so fall back to stdout.
        msg = f"[AiderHeal] revert 失敗!需立即人工介入: {rev_err or rev_out}"
        logger.critical(msg)
        _notify_telegram(
            f"🚨 <b>AiderHeal revert 失敗!請立即人工介入</b>\n<code>{html.escape(msg)}</code>"
        )
    return _outcome(False, msg, commit_sha, rc4 == 0)

View File

@@ -27,6 +27,7 @@ ALLOWED_ACTION_TYPES = frozenset({
'WAIT_RETRY',
'ALERT_ONLY',
'SSH_CMD',
'CODE_FIX', # ADR-014: Aider 自動修覆
})
@@ -385,6 +386,30 @@ class AutoHealService:
except Exception as e:
return {"success": False, "action": "SSH_CMD", "message": str(e)}
if action_type == "CODE_FIX":
# ADR-014: 透過 Aider 自動修覆程式碼並推版
target_file = params.get("target_file", "")
error_type = context.get("error_type", "UnknownError")
error_message = context.get("error_message", "")
if not target_file:
return {
"success": False,
"action": "CODE_FIX",
"message": "Playbook CODE_FIX requires action_params.target_file",
}
try:
from services.aider_heal_executor import execute_code_fix
return execute_code_fix(
error_type=error_type,
error_message=error_message,
target_file=target_file,
context=context,
)
except Exception as e:
logger.error("[AutoHeal] CODE_FIX 失敗: %s", e)
return {"success": False, "action": "CODE_FIX",
"message": f"CODE_FIX 例外: {e}"}
return {
"success": False,
"action": action_type,

View File

@@ -57,6 +57,7 @@ _TRIGGER_ZH: Dict[str, str] = {
"resource_optimization": "資源調配優化",
"sales_anomaly": "銷售異常偵測",
"ea_escalation": "EA 升級審核",
"code_exception": "程式碼異常偵測", # ADR-014
}
_AGENT_ZH: Dict[str, str] = {
@@ -119,10 +120,11 @@ class ElephantAlphaAutonomousEngine:
# 各 trigger 的 escalation cooldown分鐘
ESCALATION_COOLDOWN: Dict[str, int] = {
"price_drop_alert": 30, # 同一類型 30 分鐘只發一次
"price_drop_alert": 30,
"market_opportunity": 60,
"threat_escalation": 15,
"resource_optimization": 60,
"code_exception": 5, # ADR-014: 程式錯誤 5 分鐘再檢查一次
}
DEFAULT_COOLDOWN_MIN = 30
@@ -171,7 +173,15 @@ class ElephantAlphaAutonomousEngine:
conditions={"system_load": "high", "queue_size": ">10"},
threshold=0.6,
enabled=True
)
),
AutonomousTrigger(
trigger_type="code_exception", # ADR-014
conditions={"scan_containers": ["momo-pro-system", "momo-scheduler"],
"error_patterns": ["Traceback", "ImportError",
"RuntimeError", "ModuleNotFoundError"]},
threshold=1.0, # 出現即觸發
enabled=True
),
]
async def start_autonomous_monitoring(self):
@@ -223,6 +233,8 @@ class ElephantAlphaAutonomousEngine:
return await self._check_threat_escalation_trigger(trigger)
elif trigger.trigger_type == "resource_optimization":
return await self._check_resource_optimization_trigger(trigger)
elif trigger.trigger_type == "code_exception": # ADR-014
return await self._check_code_exception_trigger(trigger)
return False
# ── Trigger checkers ──────────────────────────────────────────────
@@ -293,6 +305,58 @@ class ElephantAlphaAutonomousEngine:
return (self._get_action_queue_size() > 10
or self._get_system_load_percentage() > 80)
async def _check_code_exception_trigger(self, trigger: AutonomousTrigger) -> bool:
    """ADR-014: scan recent container logs for a Python traceback.

    For every container listed in trigger.conditions["scan_containers"] the
    last five minutes of `docker logs` output are searched for the standard
    traceback header. The "error_patterns" condition is not consulted here.

    When at least one traceback is found, the error text and the target file
    are stored on the trigger as `_temp_error_msg` and `_temp_target_file`
    for the later handling step.

    Args:
        trigger: The code_exception trigger; its `conditions` dict is read.

    Returns:
        True when a traceback was found in any scanned container, else False.
    """
    import asyncio
    import re
    import subprocess

    containers = trigger.conditions.get(
        "scan_containers", ["momo-pro-system", "momo-scheduler"]
    )
    # Matches File "/app/services/xxx.py" or similar project paths.
    frame_re = re.compile(r'File "([^"]*/(services|routes|database)/[^"]+\.py)"')

    error_context = []
    target_file = ""
    matched_context = ""
    for c in containers:
        try:
            # Only scan the last 5 minutes of log output. The blocking call runs
            # in a worker thread so the event loop is not stalled while waiting.
            cmd = ["docker", "logs", "--since", "5m", c]
            result = await asyncio.to_thread(
                subprocess.run, cmd, capture_output=True, text=True, timeout=5
            )
        except Exception as e:
            logger.debug(f"Failed to scan log for {c}: {e}")
            continue

        out = result.stdout + "\n" + result.stderr
        if "Traceback (most recent call last):" not in out:
            continue

        # Only the first traceback of each container is captured.
        lines = out.splitlines()
        start = next(i for i, line in enumerate(lines) if "Traceback" in line)
        err_str = "\n".join(lines[start:start + 15])  # header plus the next lines
        entry = f"[{c}] {err_str}"
        error_context.append(entry)

        if target_file:
            # Keep the first resolved target so file and error text stay paired.
            continue
        frames = frame_re.findall(err_str)
        if not frames:
            continue
        # Tracebacks list the most recent call last, so the last project frame
        # is the one closest to where the exception was raised.
        path = frames[-1][0]
        # Reduce to a repo-relative path.
        if "/app/" in path:
            path = path.split("/app/", 1)[1]
        elif "momo-pro-system/" in path:
            path = path.split("momo-pro-system/", 1)[1]
        target_file = path
        matched_context = entry

    if not error_context:
        return False
    # Stash on the trigger object for the later handling step. Prefer the
    # traceback that produced the target file; otherwise pass everything found.
    trigger._temp_error_msg = matched_context or "\n".join(error_context)
    trigger._temp_target_file = target_file
    return True
# ── Decision execution ────────────────────────────────────────────
async def _execute_autonomous_decision(self, trigger: AutonomousTrigger):
@@ -317,6 +381,12 @@ class ElephantAlphaAutonomousEngine:
trigger.last_triggered = datetime.now()
return
# ADR-014: code_exception → AiderHeal code fix loop
if trigger.trigger_type == "code_exception":
await self._handle_code_exception_via_aider(trigger)
trigger.last_triggered = datetime.now()
return
context = await self._build_trigger_context(trigger)
decision = await elephant_orchestrator.analyze_and_coordinate(context)
@@ -371,6 +441,36 @@ class ElephantAlphaAutonomousEngine:
except Exception as e:
logger.error(f"[ElephantAlpha] AutoHeal handoff failed: {e}")
async def _handle_code_exception_via_aider(self, trigger: AutonomousTrigger):
    """ADR-014: forward a detected code exception to AutoHealService.

    Reads `_temp_target_file` and `_temp_error_msg` from the trigger and calls
    AutoHealService.handle_exception in the default executor with error type
    "python_exception". Nothing is forwarded when no target file is set.
    Exceptions raised during the handoff are logged, not propagated.
    """
    target_file = getattr(trigger, '_temp_target_file', '')
    error_msg = getattr(trigger, '_temp_error_msg', 'Unknown Traceback')

    # Basic filter: a traceback without a resolvable target file is not handled.
    if not target_file:
        logger.warning("[ElephantAlpha] No target file parsed from traceback, skipping CODE_FIX")
        return

    heal_context = {
        "error_type": "Python Traceback",
        "error_message": error_msg,
        "target_file": target_file,
        "source": "elephant_alpha_code_scan",
    }
    try:
        from services.auto_heal_service import AutoHealService
        heal_service = AutoHealService()
        loop = asyncio.get_event_loop()
        # 'python_exception' is expected to have a matching row in the playbook table.
        await loop.run_in_executor(
            None,
            heal_service.handle_exception,
            "python_exception",
            heal_context,
        )
        logger.info(f"[ElephantAlpha] Code exception handed off to AutoHealService for {target_file}")
    except Exception as e:
        logger.error(f"[ElephantAlpha] AutoHeal (CODE_FIX) handoff failed: {e}")
async def _build_trigger_context(self, trigger: AutonomousTrigger) -> Dict[str, Any]:
context = {
"trigger_type": trigger.trigger_type,