""" Agent Action 白名單(ADR-012 Phase 1 骨幹) L2 NemoTron 可安全呼叫的動作集合。嚴格限制: - 只能寫 ai_insights 和發 Telegram - 不可動 prod 資料表 / 容器 / 外部系統 - 所有 action 必須 dual-write 審計軌跡 EventRouter 只會執行本檔 SAFE_ACTIONS;所有動作必須保守、可審計、可回放。 """ from __future__ import annotations import threading import time from datetime import datetime, timedelta from typing import Any from services.logger_manager import SystemLogger sys_log = SystemLogger("AgentAction").get_logger() # ─── Module-level 狀態(記憶體,container restart 清空)───────────────── # 靜音表:event_key → 靜音到期時間 _silence_table: dict[str, datetime] = {} # 暫停表:task_name → 暫停到期時間(Phase 4 L3 HITL Ops) _paused_tasks: dict[str, datetime] = {} # Retry 狀態:task_name → {attempts, last_ts, last_error}(指數退避用) _retry_state: dict[str, dict] = {} _retry_lock = threading.Lock() def _audit(action: str, params: dict, result: dict, latency_ms: float) -> int | None: """所有 action 統一審計入 ai_insights(ADR-007 Dual-Write)""" try: from services.openclaw_learning_service import store_insight return store_insight( insight_type="agent_action", content=f"action={action} result={result.get('status', 'unknown')}", period=datetime.now().strftime("%Y-%m-%d"), metadata={ "action": action, "params": params, "result": result, "latency_ms": latency_ms, "ts": datetime.now().isoformat(), }, ) except Exception as e: sys_log.error(f"[AgentAction] audit 失敗 action={action}: {e}") return None def _store_action_memory( insight_type: str, content: str, *, product_sku: str | None = None, metadata: dict | None = None, status: str = "approved", ) -> int | None: """Write a concrete L2 action outcome into OpenClaw memory.""" from services.openclaw_learning_service import store_insight return store_insight( insight_type=insight_type, content=content, period=datetime.now().strftime("%Y-%m-%d"), product_sku=product_sku, metadata=metadata or {}, ai_model="agent_actions", confidence=0.8, created_by="agent_actions", status=status, ) ALLOWED_RETRY_TASKS = { "run_auto_import_task", "run_momo_task", "run_edm_task", "run_competitor_price_feeder_task", "run_backup_monitor_task", "run_pchome_match_backfill_task", "run_pchome_growth_momo_backfill_task", "run_icaim_analysis_task", "run_festival_task", "run_whitepage_check", "run_icaim_analysis_task", "run_db_backup_task", "run_promo_event_task", } def _try_load_task(task_name: str): """Lazy-import scheduler module 取 task function。非 scheduler 容器會失敗。""" try: import importlib mod = importlib.import_module("scheduler") return getattr(mod, task_name, None) except Exception as e: sys_log.warning(f"[AgentAction] import scheduler 失敗(預期於非 scheduler 容器): {e}") return None def _run_task_in_thread(task_name: str, attempt: int): """在獨立 thread 執行 task,寫審計""" func = _try_load_task(task_name) if func is None: sys_log.error(f"[AgentAction] _run_task_in_thread: task {task_name} 不存在") return t0 = time.time() sys_log.info(f"[AgentAction] 🔁 重試執行 {task_name} (attempt {attempt})") try: func() with _retry_lock: _retry_state.pop(task_name, None) _audit("retry_task_completed", {"task_name": task_name, "attempt": attempt}, {"status": "success"}, (time.time() - t0) * 1000) sys_log.info(f"[AgentAction] ✅ 重試成功 {task_name}") except Exception as e: err = str(e)[:300] sys_log.error(f"[AgentAction] ❌ 重試失敗 {task_name} attempt={attempt}: {err}") with _retry_lock: st = _retry_state.get(task_name, {}) st["last_error"] = err _retry_state[task_name] = st _audit("retry_task_failed", {"task_name": task_name, "attempt": attempt}, {"status": "error", "error": err}, (time.time() - t0) * 1000) # ===================================================================== # 🔁 retry_task — 安全重試(exponential backoff,threading.Timer 延遲執行) # ===================================================================== def retry_task(task_name: str, max_attempts: int = 3, backoff_sec: int = 60) -> dict: t0 = time.time() if task_name not in ALLOWED_RETRY_TASKS: result = {"status": "rejected", "reason": f"task '{task_name}' not in whitelist"} _audit("retry_task", {"task_name": task_name}, result, (time.time() - t0) * 1000) sys_log.warning(f"[AgentAction] retry_task 拒絕:{task_name} 不在白名單") return result if is_task_paused(task_name): result = {"status": "skipped", "reason": "task is paused"} _audit("retry_task", {"task_name": task_name}, result, (time.time() - t0) * 1000) return result # 確認能載入 task(否則 deferred) if _try_load_task(task_name) is None: result = {"status": "deferred", "task_name": task_name, "note": "非 scheduler 容器無法重試,將由下次排程自然執行"} _audit("retry_task", {"task_name": task_name}, result, (time.time() - t0) * 1000) return result # Exponential backoff:已有 retry 中的就延長,最多 max_attempts with _retry_lock: st = _retry_state.get(task_name, {"attempts": 0}) attempts = st.get("attempts", 0) + 1 if attempts > max_attempts: result = {"status": "exhausted", "task_name": task_name, "attempts": attempts - 1} _audit("retry_task", {"task_name": task_name}, result, (time.time() - t0) * 1000) _retry_state.pop(task_name, None) sys_log.warning(f"[AgentAction] {task_name} 已達最大重試次數 {max_attempts}") return result delay = backoff_sec * (2 ** (attempts - 1)) # 60s / 120s / 240s ... st.update({"attempts": attempts, "last_ts": time.time()}) _retry_state[task_name] = st timer = threading.Timer(delay, _run_task_in_thread, args=[task_name, attempts]) timer.daemon = True timer.start() result = {"status": "scheduled", "task_name": task_name, "attempt": attempts, "delay_sec": delay, "max_attempts": max_attempts} _audit("retry_task", {"task_name": task_name, "delay_sec": delay, "attempt": attempts}, result, (time.time() - t0) * 1000) sys_log.info(f"[AgentAction] 🔁 {task_name} 已排定 {delay}s 後重試 (attempt {attempts}/{max_attempts})") return result # ===================================================================== # 🔍 query_km — RAG 查詢歷史同類事件 # ===================================================================== def query_km(query: str, insight_type: str | None = None, limit: int = 5) -> dict: """透過 openclaw_learning_service.build_rag_context 找歷史同類事件""" t0 = time.time() try: from services.openclaw_learning_service import build_rag_context context = build_rag_context(query=query, insight_type=insight_type) result = { "status": "ok", "query": query, "context_preview": (context or "")[:500], "has_results": bool(context and context.strip()), } except Exception as e: result = {"status": "error", "error": str(e)[:200]} sys_log.error(f"[AgentAction] query_km 失敗: {e}") _audit("query_km", {"query": query, "insight_type": insight_type, "limit": limit}, result, (time.time() - t0) * 1000) return result # ===================================================================== # 🔕 silence_alert — 靜音抑制(避免告警風暴) # ===================================================================== def silence_alert(event_key: str, duration_min: int = 60) -> dict: """ 對特定 event_key 設定靜音期限。EventRouter 在 dispatch 前會先檢查。 event_key 建議格式:":",例: "Scheduler.AutoImport:db_connection_error" """ t0 = time.time() until = datetime.now() + timedelta(minutes=duration_min) _silence_table[event_key] = until result = {"status": "silenced", "event_key": event_key, "until": until.isoformat()} _audit("silence_alert", {"event_key": event_key, "duration_min": duration_min}, result, (time.time() - t0) * 1000) sys_log.info(f"[AgentAction] silence_alert: {event_key} → 靜音至 {until.strftime('%H:%M')}") return result def is_silenced(event_key: str) -> bool: """EventRouter 呼叫,判斷是否需略過此事件""" until = _silence_table.get(event_key) if until is None: return False if datetime.now() >= until: _silence_table.pop(event_key, None) return False return True # ===================================================================== # 🏷️ 三個既有 NemoTron tool 的 wrapper(供 event_router 統一調用) # ===================================================================== def flag_for_human_review(sku: str, concern: str) -> dict: """升級到 L3 HITL:寫入 human_review 記憶,等待人工後續處理。""" t0 = time.time() insight_id = _store_action_memory( "human_review", f"SKU={sku} 需要人工覆核:{concern}", product_sku=sku, metadata={"sku": sku, "concern": concern, "source": "flag_for_human_review"}, status="pending", ) result = {"status": "pending_review", "sku": sku, "concern": concern, "insight_id": insight_id} _audit("flag_for_human_review", {"sku": sku, "concern": concern}, result, (time.time() - t0) * 1000) return result def route_to_km(sku: str, domain: str, summary: str) -> dict: """KM 歸檔:將 NemoTron/Hermes 判斷沉澱為可檢索知識。""" t0 = time.time() insight_id = _store_action_memory( "km_entry", f"[{domain}] SKU={sku}:{summary}", product_sku=sku, metadata={"sku": sku, "domain": domain, "summary": summary, "source": "route_to_km"}, ) result = {"status": "archived", "sku": sku, "domain": domain, "insight_id": insight_id} _audit("route_to_km", {"sku": sku, "domain": domain}, result, (time.time() - t0) * 1000) return result def mark_for_relearn(sku: str, reason: str) -> dict: """標記重新訓練:寫入 relearn_marker 供 OpenClaw/品質批次使用。""" t0 = time.time() insight_id = _store_action_memory( "relearn_marker", f"SKU={sku} 需要重新學習:{reason}", product_sku=sku, metadata={"sku": sku, "reason": reason, "source": "mark_for_relearn"}, status="pending", ) result = {"status": "marked", "sku": sku, "reason": reason, "insight_id": insight_id} _audit("mark_for_relearn", {"sku": sku, "reason": reason}, result, (time.time() - t0) * 1000) return result # ===================================================================== # 🛑 L3 OPS Actions(只由 Telegram HITL Callback 觸發,不進 SAFE_ACTIONS) # ===================================================================== def pause_task(task_name: str, duration_min: int = 60, operator: str = "unknown") -> dict: """暫停 scheduler 某個 task 指定分鐘。run_scheduler 須在執行前呼叫 is_task_paused 檢查。""" t0 = time.time() if task_name not in ALLOWED_RETRY_TASKS: result = {"status": "rejected", "reason": "task not in whitelist"} _audit("pause_task", {"task_name": task_name, "operator": operator}, result, (time.time() - t0) * 1000) return result until = datetime.now() + timedelta(minutes=duration_min) _paused_tasks[task_name] = until result = {"status": "paused", "task_name": task_name, "until": until.isoformat(), "operator": operator} _audit("pause_task", {"task_name": task_name, "duration_min": duration_min, "operator": operator}, result, (time.time() - t0) * 1000) sys_log.info(f"[AgentAction] ⏸️ {task_name} 已暫停至 {until.strftime('%H:%M')} (by {operator})") return result def resume_task(task_name: str, operator: str = "unknown") -> dict: """立即解除 task 暫停""" t0 = time.time() had = _paused_tasks.pop(task_name, None) result = {"status": "resumed" if had else "not_paused", "task_name": task_name, "operator": operator} _audit("resume_task", {"task_name": task_name, "operator": operator}, result, (time.time() - t0) * 1000) sys_log.info(f"[AgentAction] ▶️ {task_name} 恢復 (by {operator})") return result def is_task_paused(task_name: str) -> bool: """run_scheduler 每個 task 啟動前呼叫:true 則跳過本次""" until = _paused_tasks.get(task_name) if until is None: return False if datetime.now() >= until: _paused_tasks.pop(task_name, None) return False return True def force_retry_now(task_name: str, operator: str = "unknown") -> dict: """HITL:立即強制重試,繞過 backoff,不計入 attempts""" t0 = time.time() if task_name not in ALLOWED_RETRY_TASKS: result = {"status": "rejected", "reason": "task not in whitelist"} _audit("force_retry_now", {"task_name": task_name, "operator": operator}, result, (time.time() - t0) * 1000) return result if _try_load_task(task_name) is None: result = {"status": "deferred", "note": "非 scheduler 容器"} _audit("force_retry_now", {"task_name": task_name, "operator": operator}, result, (time.time() - t0) * 1000) return result # 立即執行(不延遲) t = threading.Thread(target=_run_task_in_thread, args=[task_name, 0], daemon=True) t.start() result = {"status": "started", "task_name": task_name, "operator": operator} _audit("force_retry_now", {"task_name": task_name, "operator": operator}, result, (time.time() - t0) * 1000) sys_log.info(f"[AgentAction] ⚡ {task_name} 強制立即重試 (by {operator})") return result # L2 白名單(NemoTron 可自主呼叫,讀多寫少) SAFE_ACTIONS: dict[str, Any] = { "retry_task": retry_task, "query_km": query_km, "silence_alert": silence_alert, "flag_for_human_review": flag_for_human_review, "route_to_km": route_to_km, "mark_for_relearn": mark_for_relearn, } # L3 白名單(僅 Telegram HITL callback 可呼叫,狀態變更類) OPS_ACTIONS: dict[str, Any] = { "pause_task": pause_task, "resume_task": resume_task, "force_retry_now": force_retry_now, }