From bda4edd23beb0e5bc6fe7b26ec2d8d82eceb5d38 Mon Sep 17 00:00:00 2001 From: ogt Date: Sun, 19 Apr 2026 13:26:51 +0800 Subject: [PATCH] =?UTF-8?q?feat(ai-ops):=20ADR-012=20Phase=202/3/4=20?= =?UTF-8?q?=E5=AE=8C=E6=95=B4=E5=AF=A6=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 2 — Hermes L1 Observer 真實接入: - services/event_router.py::_hermes_observe() 呼叫 hermes3:latest @192.168.0.111:11434/api/generate,做 stack trace 翻譯 - 輸出 JSON {summary, probable_cause, actions},容錯 markdown fence - scheduler.py run_auto_import_task / run_momo_task 兩個 outer except 改走 event_router.dispatch(),帶完整 trace Phase 3 — NemoTron L2 Investigator 規則式實作: - event_router._L2_RULES: event_type → [(action, params)] 規則表 • db_connection_error → query_km + retry_task(60s backoff) • crawler_timeout → silence_alert(30min) + retry_task(300s) • nim_quota_exhausted → silence_alert(720min) • embedding_failure → silence_alert(10min) - agent_actions.retry_task 真實實作: threading.Timer + exponential backoff (60→120→240s) + _retry_state 追蹤 + ALLOWED_RETRY_TASKS 白名單 + 非 scheduler 容器回 'deferred' Phase 4 — L3 HITL Ops 擴充: - agent_actions: pause_task / resume_task / force_retry_now / is_task_paused - OPS_ACTIONS 白名單與 SAFE_ACTIONS 嚴格分離(L2 不可呼叫 L3) - telegram_templates.ops_action_request(): 4 按鈕 inline keyboard (暫停1h / 暫停6h / 立即重試 / 解除暫停) - telegram_bot_service._handle_ops_callback(): 接 momo:ops:{action}:{task_name} - scheduler.py run_momo_task + run_auto_import_task 開頭加 is_task_paused() 檢查(Phase 4 暫停機制生效) 安全邊界(ADR-012 §①): - L1 Hermes 只讀 → 失敗降 L0 + 🟡 標記 - L2 NemoTron 只碰 ai_insights + 發 Telegram + SAFE_ACTIONS - L3 OpenClaw 任意動作必經 HITL inline keyboard 批准 - 不做容器重啟按鈕(需 docker socket,風險過高) Co-Authored-By: Claude Sonnet 4.6 --- scheduler.py | 70 +++++++++-- services/agent_actions.py | 195 +++++++++++++++++++++++++++---- services/event_router.py | 128 +++++++++++++++++++- services/telegram_bot_service.py | 52 +++++++++ services/telegram_templates.py | 70 +++++++++++ 5 files changed, 472 
insertions(+), 43 deletions(-) diff --git a/scheduler.py b/scheduler.py index 753a86e..53b8794 100644 --- a/scheduler.py +++ b/scheduler.py @@ -200,6 +200,15 @@ def managed_scraper_resources(window_size='1920,5000', debug=False, timeout=45, def run_momo_task(): """V8.1 邏輯:處理所有分類並存入資料庫""" + # ADR-012 Phase 4: HITL 暫停檢查 + try: + from services.agent_actions import is_task_paused + if is_task_paused("run_momo_task"): + logging.info("[Crawler] [MOMO] ⏸️ 任務被 HITL 暫停中,本次跳過") + return + except Exception: + pass # agent_actions 未就緒時不阻塞排程 + try: # V-New: 每次執行任務時,動態從 JSON 檔案重新讀取分類 # 這解決了「修改設定需重啟」的問題,也避免了重啟造成的系統崩潰 @@ -452,9 +461,26 @@ def run_momo_task(): logging.error(f"[Crawler] [MOMO] ❌ 發送通知失敗 | Error: {e}") except Exception as e: + import traceback as _tb logging.error(f"[Crawler] [MOMO] 🚨 任務中斷 | Error: {e}") stats = { "status": "Failed", "error": str(e) } _save_stats('momo_task', stats) + # ADR-012 Phase 2: 走 EventRouter(Hermes L1 翻譯 + 三層式訊息) + try: + from services.event_router import dispatch as _dispatch + _dispatch({ + "source": "Scheduler.MOMOCrawler", + "event_type": "crawler_timeout", + "severity": "alert", + "title": "MOMO 爬蟲任務中斷", + "status": "任務失敗", + "impact": "P1 - 熱銷商品監控中斷", + "summary": str(e)[:200], + "trace": _tb.format_exc(), + "payload": {"task_name": "run_momo_task"}, + }) + except Exception as _router_e: + logging.error(f"[Crawler] [MOMO] event_router 失敗: {_router_e}") finally: logging.info("[Crawler] [MOMO] 🏁 所有類別爬取結束") @@ -1391,6 +1417,15 @@ def run_auto_import_task(): V-New: 自動從 Google Drive 匯入當日業績 每半小時檢查一次 Google Drive 是否有新的 Excel 檔案 """ + # ADR-012 Phase 4: HITL 暫停檢查 + try: + from services.agent_actions import is_task_paused + if is_task_paused("run_auto_import_task"): + logging.info("[Scheduler] [AutoImport] ⏸️ 任務被 HITL 暫停中,本次跳過") + return + except Exception: + pass + try: from services.import_service import import_service from services.notification_manager import NotificationManager @@ -1507,26 +1542,37 @@ def run_auto_import_task(): 
_save_stats('auto_import_task', stats) except Exception as e: + import traceback as _tb logging.error(f"[Scheduler] [AutoImport] 🚨 自動匯入任務異常 | Error: {e}") stats = {"status": "Failed", "error": str(e)} _save_stats('auto_import_task', stats) - # V-New: 任務異常時也發送通知 + # ADR-012 Phase 2: 改走 EventRouter(Hermes L1 翻譯 + 降級鏈) + # LINE 通道保留(event_router 只處理 Telegram) + try: + from services.event_router import dispatch as _dispatch + _dispatch({ + "source": "Scheduler.AutoImport", + "event_type": "db_connection_error" if "translate host" in str(e).lower() or "operational" in str(e).lower() else "import_failure", + "severity": "alert", + "title": "當日業績自動匯入異常", + "status": "匯入失敗", + "impact": "P1 - 當日業績未更新", + "summary": str(e)[:200], + "trace": _tb.format_exc(), + "payload": {"task_name": "run_auto_import_task"}, + }) + except Exception as _router_e: + logging.error(f"[Scheduler] [AutoImport] event_router 失敗: {_router_e}") + + # LINE 通知保留(獨立通道,不經 event_router) try: from services.notification_manager import NotificationManager now_str = datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M') - message = ( - f"🚨 當日業績自動匯入異常 ({now_str})\n" - f"{'='*30}\n" - f"❌ 系統錯誤:{str(e)}\n" - f"{'='*30}\n" - f"請聯絡系統管理員" - ) - notifier = NotificationManager() - notifier._send_line_messages([message]) - notifier._send_telegram_messages([message]) + message = f"🚨 當日業績自動匯入異常 ({now_str})\n系統錯誤:{str(e)[:200]}" + NotificationManager()._send_line_messages([message]) except Exception as notify_error: - logging.error(f"[Scheduler] [AutoImport] ❌ 發送異常通知時發生錯誤 | Error: {notify_error}") + logging.error(f"[Scheduler] [AutoImport] ❌ LINE 通知失敗 | Error: {notify_error}") def run_competitor_price_feeder_task(): """ diff --git a/services/agent_actions.py b/services/agent_actions.py index 3cfe50d..96df359 100644 --- a/services/agent_actions.py +++ b/services/agent_actions.py @@ -11,6 +11,7 @@ L2 NemoTron 可安全呼叫的動作集合。嚴格限制: from __future__ import annotations +import threading import time from datetime import datetime, 
timedelta from typing import Any @@ -19,8 +20,14 @@ from services.logger_manager import SystemLogger sys_log = SystemLogger("AgentAction").get_logger() -# 靜音表(記憶體快取,重啟後清空;Phase 3 可改 DB 持久化) +# ─── Module-level 狀態(記憶體,container restart 清空)───────────────── +# 靜音表:event_key → 靜音到期時間 _silence_table: dict[str, datetime] = {} +# 暫停表:task_name → 暫停到期時間(Phase 4 L3 HITL Ops) +_paused_tasks: dict[str, datetime] = {} +# Retry 狀態:task_name → {attempts, last_ts, last_error}(指數退避用) +_retry_state: dict[str, dict] = {} +_retry_lock = threading.Lock() def _audit(action: str, params: dict, result: dict, latency_ms: float) -> int | None: @@ -44,39 +51,99 @@ def _audit(action: str, params: dict, result: dict, latency_ms: float) -> int | return None +ALLOWED_RETRY_TASKS = { + "run_auto_import_task", "run_momo_task", "run_edm_task", + "run_competitor_price_feeder_task", "run_backup_monitor_task", + "run_icaim_analysis_task", "run_festival_task", "run_whitepage_check", + "run_icaim_analysis_task", "run_db_backup_task", +} + + +def _try_load_task(task_name: str): + """Lazy-import scheduler module 取 task function。非 scheduler 容器會失敗。""" + try: + import importlib + mod = importlib.import_module("scheduler") + return getattr(mod, task_name, None) + except Exception as e: + sys_log.warning(f"[AgentAction] import scheduler 失敗(預期於非 scheduler 容器): {e}") + return None + + +def _run_task_in_thread(task_name: str, attempt: int): + """在獨立 thread 執行 task,寫審計""" + func = _try_load_task(task_name) + if func is None: + sys_log.error(f"[AgentAction] _run_task_in_thread: task {task_name} 不存在") + return + t0 = time.time() + sys_log.info(f"[AgentAction] 🔁 重試執行 {task_name} (attempt {attempt})") + try: + func() + with _retry_lock: + _retry_state.pop(task_name, None) + _audit("retry_task_completed", + {"task_name": task_name, "attempt": attempt}, + {"status": "success"}, (time.time() - t0) * 1000) + sys_log.info(f"[AgentAction] ✅ 重試成功 {task_name}") + except Exception as e: + err = str(e)[:300] + 
sys_log.error(f"[AgentAction] ❌ 重試失敗 {task_name} attempt={attempt}: {err}") + with _retry_lock: + st = _retry_state.get(task_name, {}) + st["last_error"] = err + _retry_state[task_name] = st + _audit("retry_task_failed", + {"task_name": task_name, "attempt": attempt}, + {"status": "error", "error": err}, (time.time() - t0) * 1000) + + # ===================================================================== -# 🔁 retry_task — 安全重試(exponential backoff) +# 🔁 retry_task — 安全重試(exponential backoff,threading.Timer 延遲執行) # ===================================================================== def retry_task(task_name: str, max_attempts: int = 3, backoff_sec: int = 60) -> dict: - """ - 安全重試一個 scheduler task。Phase 1 stub:只記錄,不真正重試。 - Phase 3 將接入 scheduler.py 的 task dispatch。 - - 限制:task_name 必須在白名單內(避免任意程式碼執行) - """ - ALLOWED_TASKS = { - "run_auto_import_task", "run_momo_task", "run_edm_task", - "run_competitor_price_feeder_task", "run_backup_monitor_task", - "run_icaim_analysis_task", - } t0 = time.time() - if task_name not in ALLOWED_TASKS: + if task_name not in ALLOWED_RETRY_TASKS: result = {"status": "rejected", "reason": f"task '{task_name}' not in whitelist"} _audit("retry_task", {"task_name": task_name}, result, (time.time() - t0) * 1000) sys_log.warning(f"[AgentAction] retry_task 拒絕:{task_name} 不在白名單") return result - # TODO Phase 3: 真實重試邏輯(呼叫 scheduler module 的 task function) - result = { - "status": "queued", - "task_name": task_name, - "max_attempts": max_attempts, - "backoff_sec": backoff_sec, - "note": "Phase 1 stub — 尚未真正重試,僅記錄意圖", - } - _audit("retry_task", {"task_name": task_name, "max_attempts": max_attempts}, + if is_task_paused(task_name): + result = {"status": "skipped", "reason": "task is paused"} + _audit("retry_task", {"task_name": task_name}, result, (time.time() - t0) * 1000) + return result + + # 確認能載入 task(否則 deferred) + if _try_load_task(task_name) is None: + result = {"status": "deferred", "task_name": task_name, + "note": "非 scheduler 
容器無法重試,將由下次排程自然執行"} + _audit("retry_task", {"task_name": task_name}, result, (time.time() - t0) * 1000) + return result + + # Exponential backoff:已有 retry 中的就延長,最多 max_attempts + with _retry_lock: + st = _retry_state.get(task_name, {"attempts": 0}) + attempts = st.get("attempts", 0) + 1 + if attempts > max_attempts: + result = {"status": "exhausted", "task_name": task_name, "attempts": attempts - 1} + _audit("retry_task", {"task_name": task_name}, result, (time.time() - t0) * 1000) + _retry_state.pop(task_name, None) + sys_log.warning(f"[AgentAction] {task_name} 已達最大重試次數 {max_attempts}") + return result + delay = backoff_sec * (2 ** (attempts - 1)) # 60s / 120s / 240s ... + st.update({"attempts": attempts, "last_ts": time.time()}) + _retry_state[task_name] = st + + timer = threading.Timer(delay, _run_task_in_thread, args=[task_name, attempts]) + timer.daemon = True + timer.start() + + result = {"status": "scheduled", "task_name": task_name, + "attempt": attempts, "delay_sec": delay, "max_attempts": max_attempts} + _audit("retry_task", {"task_name": task_name, "delay_sec": delay, "attempt": attempts}, result, (time.time() - t0) * 1000) - sys_log.info(f"[AgentAction] retry_task 已排隊(stub): {task_name}") + sys_log.info(f"[AgentAction] 🔁 {task_name} 已排定 {delay}s 後重試 (attempt {attempts}/{max_attempts})") return result @@ -164,7 +231,78 @@ def mark_for_relearn(sku: str, reason: str) -> dict: return result -# 白名單(供 EventRouter / NemoTron 引用) +# ===================================================================== +# 🛑 L3 OPS Actions(只由 Telegram HITL Callback 觸發,不進 SAFE_ACTIONS) +# ===================================================================== +def pause_task(task_name: str, duration_min: int = 60, operator: str = "unknown") -> dict: + """暫停 scheduler 某個 task 指定分鐘。run_scheduler 須在執行前呼叫 is_task_paused 檢查。""" + t0 = time.time() + if task_name not in ALLOWED_RETRY_TASKS: + result = {"status": "rejected", "reason": "task not in whitelist"} + _audit("pause_task", 
{"task_name": task_name, "operator": operator}, + result, (time.time() - t0) * 1000) + return result + + until = datetime.now() + timedelta(minutes=duration_min) + _paused_tasks[task_name] = until + result = {"status": "paused", "task_name": task_name, + "until": until.isoformat(), "operator": operator} + _audit("pause_task", + {"task_name": task_name, "duration_min": duration_min, "operator": operator}, + result, (time.time() - t0) * 1000) + sys_log.info(f"[AgentAction] ⏸️ {task_name} 已暫停至 {until.strftime('%H:%M')} (by {operator})") + return result + + +def resume_task(task_name: str, operator: str = "unknown") -> dict: + """立即解除 task 暫停""" + t0 = time.time() + had = _paused_tasks.pop(task_name, None) + result = {"status": "resumed" if had else "not_paused", + "task_name": task_name, "operator": operator} + _audit("resume_task", {"task_name": task_name, "operator": operator}, + result, (time.time() - t0) * 1000) + sys_log.info(f"[AgentAction] ▶️ {task_name} 恢復 (by {operator})") + return result + + +def is_task_paused(task_name: str) -> bool: + """run_scheduler 每個 task 啟動前呼叫:true 則跳過本次""" + until = _paused_tasks.get(task_name) + if until is None: + return False + if datetime.now() >= until: + _paused_tasks.pop(task_name, None) + return False + return True + + +def force_retry_now(task_name: str, operator: str = "unknown") -> dict: + """HITL:立即強制重試,繞過 backoff,不計入 attempts""" + t0 = time.time() + if task_name not in ALLOWED_RETRY_TASKS: + result = {"status": "rejected", "reason": "task not in whitelist"} + _audit("force_retry_now", {"task_name": task_name, "operator": operator}, + result, (time.time() - t0) * 1000) + return result + + if _try_load_task(task_name) is None: + result = {"status": "deferred", "note": "非 scheduler 容器"} + _audit("force_retry_now", {"task_name": task_name, "operator": operator}, + result, (time.time() - t0) * 1000) + return result + + # 立即執行(不延遲) + t = threading.Thread(target=_run_task_in_thread, args=[task_name, 0], daemon=True) + 
t.start() + result = {"status": "started", "task_name": task_name, "operator": operator} + _audit("force_retry_now", {"task_name": task_name, "operator": operator}, + result, (time.time() - t0) * 1000) + sys_log.info(f"[AgentAction] ⚡ {task_name} 強制立即重試 (by {operator})") + return result + + +# L2 白名單(NemoTron 可自主呼叫,讀多寫少) SAFE_ACTIONS: dict[str, Any] = { "retry_task": retry_task, "query_km": query_km, @@ -173,3 +311,10 @@ SAFE_ACTIONS: dict[str, Any] = { "route_to_km": route_to_km, "mark_for_relearn": mark_for_relearn, } + +# L3 白名單(僅 Telegram HITL callback 可呼叫,狀態變更類) +OPS_ACTIONS: dict[str, Any] = { + "pause_task": pause_task, + "resume_task": resume_task, + "force_retry_now": force_retry_now, +} diff --git a/services/event_router.py b/services/event_router.py index fa8b152..bc10611 100644 --- a/services/event_router.py +++ b/services/event_router.py @@ -200,17 +200,133 @@ def _compose_triaged(event: dict, tier_label: str, ai_summary: str, # ===================================================================== -# AI 介入 stub(Phase 2/3 填實作) +# L1 Hermes Observer(Phase 2 實作) # ===================================================================== +_HERMES_URL = os.getenv("HERMES_URL", "http://192.168.0.111:11434") +_HERMES_MODEL = os.getenv("HERMES_MODEL", "hermes3:latest") +_HERMES_TIMEOUT = int(os.getenv("HERMES_TIMEOUT", "30")) + +_HERMES_OBSERVE_PROMPT = """你是一個 SRE 助手,任務是把技術錯誤翻譯成人類可理解的摘要。 + +請根據以下事件產出**繁體中文**分析,嚴格以下列 JSON 格式輸出(不要加 markdown 代碼塊、不要加說明): +{"summary": "一句話技術根因(中文,<60 字)", "probable_cause": "最可能的原因(中文,<40 字)", "actions": ["建議動作1", "建議動作2"]} + +限制: +- summary 翻譯英文錯誤為中文,去除技術 jargon +- probable_cause 推測根因(基於 stack trace 和事件類型) +- actions 最多 3 個,具體可執行 +- 若資訊不足,summary 填 "資訊不足"、actions 填 ["請檢查原始 trace"] +""" + + def _hermes_observe(event: dict) -> str | None: - """Phase 1 stub:Phase 2 會呼叫 hermes_analyst_service""" - # 不呼叫 AI,回傳 None 讓上層降級 - return None + """呼叫 Hermes(Ollama)翻譯 stack trace 為人類摘要。失敗回 None 讓上層降級。""" + try: + user_prompt = ( + 
f"事件類型:{event.get('event_type', 'unknown')}\n" + f"來源模組:{event.get('source', 'unknown')}\n" + f"標題:{event.get('title', '')}\n" + f"簡述:{event.get('summary', '')}\n" + f"技術 trace:\n{(event.get('trace') or '')[-800:]}" + ) + resp = requests.post( + f"{_HERMES_URL}/api/generate", + json={ + "model": _HERMES_MODEL, + "system": _HERMES_OBSERVE_PROMPT, + "prompt": user_prompt, + "stream": False, + "options": {"temperature": 0.1, "num_predict": 300}, + }, + timeout=_HERMES_TIMEOUT, + ) + if not resp.ok: + sys_log.warning(f"[EventRouter.L1] Hermes HTTP {resp.status_code}") + return None + + raw = (resp.json().get("response") or "").strip() + # 容錯:Hermes 可能多出 markdown fence + if "```" in raw: + parts = raw.split("```") + for p in parts: + if p.strip().startswith("{"): + raw = p.strip() + break + + import json as _json + parsed = _json.loads(raw) + summary = parsed.get("summary", "").strip() + cause = parsed.get("probable_cause", "").strip() + actions = parsed.get("actions", []) or [] + + out = [summary] + if cause: + out.append(f"\n*可能根因:* {cause}") + if actions: + out.append("\n*建議動作:*") + for a in actions[:3]: + out.append(f"• {a}") + return "\n".join(out) + except Exception as e: + sys_log.warning(f"[EventRouter.L1] Hermes 呼叫失敗,降級:{type(e).__name__}: {str(e)[:120]}") + return None + + +# ===================================================================== +# L2 NemoTron Investigator(Phase 3 規則式實作,不呼 NIM) +# ===================================================================== +# event_type → [(action_name, params)] 規則表 +_L2_RULES: dict[str, list[tuple[str, dict]]] = { + "db_connection_error": [ + ("query_km", {"query": "DNS resolve 失敗 momo-postgres"}), + ("retry_task", {"task_name": "", "backoff_sec": 60}), + ], + "crawler_timeout": [ + ("silence_alert", {"duration_min": 30}), + ("retry_task", {"task_name": "", "backoff_sec": 300}), + ], + "nim_quota_exhausted": [ + ("silence_alert", {"duration_min": 720}), # 12 小時,等 quota 重置 + ], + "embedding_failure": [ + 
("silence_alert", {"duration_min": 10}), # 已有 retry queue 處理 + ], +} def _nemoton_investigate(event: dict) -> dict | None: - """Phase 1 stub:Phase 3 會呼叫 nemoton_dispatcher_service""" - return None + """ + Phase 3 規則式 L2:根據 event_type 查 _L2_RULES,執行對應 safe actions。 + Phase 5+ 可改接 NIM 讓 LLM 決定 action。 + """ + event_type = event.get("event_type", "") + rules = _L2_RULES.get(event_type) + if not rules: + return None + + actions_taken = [] + for action_name, params in rules: + action_fn = agent_actions.SAFE_ACTIONS.get(action_name) + if not action_fn: + continue + # 動態參數: 代入事件本身的欄位 + p = dict(params) + if p.get("task_name") == "": + p["task_name"] = event.get("payload", {}).get("task_name", "") or event.get("source", "").split(".")[-1] + if action_name == "silence_alert" and "event_key" not in p: + p["event_key"] = f"{event.get('source', '?')}:{event_type}" + + try: + result = action_fn(**p) + status = result.get("status", "unknown") + actions_taken.append(f"{action_name} → {status}") + except Exception as e: + actions_taken.append(f"{action_name} → error: {str(e)[:80]}") + sys_log.error(f"[EventRouter.L2] action {action_name} 例外: {e}") + + # 產一句 summary 說明決策邏輯 + summary = f"依規則 _L2_RULES[{event_type}] 執行 {len(actions_taken)} 個安全動作" + return {"summary": summary, "actions_taken": actions_taken} # ===================================================================== diff --git a/services/telegram_bot_service.py b/services/telegram_bot_service.py index 71e5b78..ba46786 100644 --- a/services/telegram_bot_service.py +++ b/services/telegram_bot_service.py @@ -468,6 +468,10 @@ class TrendTelegramBot: elif data.startswith("momo:pr:") or data.startswith("pr:"): await self._handle_price_reject(query, data.split(":")[-1]) + # ===== L3 運維決策按鈕(momo:ops::)===== + elif data.startswith("momo:ops:"): + await self._handle_ops_callback(query, data) + async def _handle_price_approve(self, query, insight_id_str: str): """批准降價:寫 KM feedback + 移除按鈕""" from services.openclaw_learning_service 
import store_insight @@ -535,6 +539,54 @@ class TrendTelegramBot: parse_mode='Markdown' ) + async def _handle_ops_callback(self, query, data: str): + """ + L3 運維決策 callback(ADR-012 Phase 4) + callback_data 格式:momo:ops:: + actions: pause1h / pause6h / retry / resume + """ + from services.agent_actions import OPS_ACTIONS + from services.telegram_templates import ops_action_result + + try: + _, _, action, task_name = data.split(":", 3) + except ValueError: + await query.answer("無效的 ops callback 格式", show_alert=True) + return + + user = query.from_user + operator = user.full_name or f"id_{user.id}" + + # action → OPS_ACTIONS 呼叫對應 + action_map = { + "pause1h": ("pause_task", {"task_name": task_name, "duration_min": 60}), + "pause6h": ("pause_task", {"task_name": task_name, "duration_min": 360}), + "retry": ("force_retry_now", {"task_name": task_name}), + "resume": ("resume_task", {"task_name": task_name}), + } + mapped = action_map.get(action) + if mapped is None: + await query.answer(f"未知 ops action: {action}", show_alert=True) + return + + fn_name, params = mapped + fn = OPS_ACTIONS.get(fn_name) + if fn is None: + await query.answer(f"OPS_ACTIONS 未定義 {fn_name}", show_alert=True) + return + + params["operator"] = operator + try: + result = fn(**params) + except Exception as e: + result = {"status": "error", "error": str(e)[:200]} + logger.error(f"[TelegramBot] ops {action} 例外:{e}") + + await query.edit_message_text( + ops_action_result(query.message.text or "", action, operator, result), + parse_mode='Markdown' + ) + async def _show_trend_by_category(self, query, category: str): """顯示指定分類的趨勢""" try: diff --git a/services/telegram_templates.py b/services/telegram_templates.py index 1a9a3cc..cae2810 100644 --- a/services/telegram_templates.py +++ b/services/telegram_templates.py @@ -199,6 +199,76 @@ def price_decision( # ===================================================================== # 🛠️ 決策結果回執(管理員按下按鈕後,訊息會被 edit 成這個) # 
===================================================================== +# ===================================================================== +# 🛠️ L3 Ops Action Request — 附 pause/force_retry 按鈕的運維決策訊息 +# ===================================================================== +def ops_action_request( + task_name: str, + title: str, + reason: str, + context: dict | None = None, + time: datetime | None = None, +) -> tuple[str, dict]: + """ + L3 HITL 運維決策訊息。NemoTron 多次重試失敗 / 告警未消 → 送此訊息請管理員裁決。 + 回傳 (text, inline_keyboard) — callback_data 用 momo:ops:: 格式 + """ + out = [ + f"🛠️ *[{PROJECT_TAG} 運維決策] {_escape_md(title)}*", + f"🕐 {_ts(time)} 📦 {_escape_md(task_name)}", + DIV, + f"💬 {_escape_md(reason)}", + ] + if context: + out.append("") + for k, v in context.items(): + out.append(f"• *{_escape_md(k)}*:{_escape_md(v)}") + out += ["", "👉 *請選擇動作:*"] + + keyboard = { + "inline_keyboard": [ + [ + {"text": "⏸️ 暫停 1h", "callback_data": f"{CB_PREFIX}ops:pause1h:{task_name}"}, + {"text": "⏸️ 暫停 6h", "callback_data": f"{CB_PREFIX}ops:pause6h:{task_name}"}, + ], + [ + {"text": "⚡ 立即重試", "callback_data": f"{CB_PREFIX}ops:retry:{task_name}"}, + {"text": "▶️ 解除暫停", "callback_data": f"{CB_PREFIX}ops:resume:{task_name}"}, + ], + ] + } + return _clip("\n".join(out)), keyboard + + +def ops_action_result( + original_text: str, + action: str, # pause1h / pause6h / retry / resume + operator: str, + result: dict, +) -> str: + """Ops callback 執行完,edit 原訊息顯示結果""" + emoji_map = {"pause1h": "⏸️", "pause6h": "⏸️", "retry": "⚡", "resume": "▶️"} + label_map = { + "pause1h": "已暫停 1 小時", "pause6h": "已暫停 6 小時", + "retry": "已立即重試", "resume": "已解除暫停", + } + emoji = emoji_map.get(action, "🛠️") + label = label_map.get(action, action) + status = result.get("status", "unknown") + footer = [ + "", + DIV, + f"{emoji} *{label}*(狀態:{_escape_md(status)})", + f"👤 操作人:{_escape_md(operator)}", + f"🕐 {_ts()}", + ] + if status == "rejected": + footer.append(f"⚠️ 拒絕原因:{_escape_md(result.get('reason', ''))}") + elif 
status == "deferred": + footer.append(f"ℹ️ {_escape_md(result.get('note', ''))}") + return _clip(original_text + "\n".join(footer)) + + def decision_result( original_text: str, decision: str, # "approve" or "reject"