Files
ewoooc/services/agent_actions.py
OoO 4c59b74ced
All checks were successful
CD Pipeline / deploy (push) Successful in 1m11s
feat: schedule growth momo backfill
2026-06-19 00:18:53 +08:00

363 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Agent Action 白名單ADR-012 Phase 1 骨幹)
L2 NemoTron 可安全呼叫的動作集合。嚴格限制:
- 只能寫 ai_insights 和發 Telegram
- 不可動 prod 資料表 / 容器 / 外部系統
- 所有 action 必須 dual-write 審計軌跡
EventRouter 只會執行本檔 SAFE_ACTIONS所有動作必須保守、可審計、可回放。
"""
from __future__ import annotations
import threading
import time
from datetime import datetime, timedelta
from typing import Any
from services.logger_manager import SystemLogger
sys_log = SystemLogger("AgentAction").get_logger()
# ─── Module-level state (in memory only; cleared on container restart) ───
# Silence table: event_key → time at which the silence expires
_silence_table: dict[str, datetime] = {}
# Pause table: task_name → time at which the pause expires (Phase 4 L3 HITL Ops)
_paused_tasks: dict[str, datetime] = {}
# Retry state: task_name → {attempts, last_ts, last_error} (used for exponential backoff)
_retry_state: dict[str, dict] = {}
# Guards _retry_state, which is also touched from the retry timer / worker threads
_retry_lock = threading.Lock()
def _audit(action: str, params: dict, result: dict, latency_ms: float) -> int | None:
    """Record one action invocation in ai_insights (ADR-007 dual-write audit).

    Best-effort: any failure while importing or calling ``store_insight`` is
    logged and swallowed, so auditing never raises into the calling action.

    Args:
        action: Name of the action being audited.
        params: Parameters the action was invoked with.
        result: Result dict of the action; its ``status`` value (default
            ``"unknown"``) is echoed into the insight content.
        latency_ms: Elapsed time of the action in milliseconds.

    Returns:
        Whatever ``store_insight`` returns (presumably the new insight id —
        confirm against openclaw_learning_service), or ``None`` on failure.
    """
    try:
        # Function-scope import: an import failure is caught below and
        # treated like any other audit failure.
        from services.openclaw_learning_service import store_insight

        # Read the clock once so ``period`` and ``ts`` cannot disagree when
        # the call straddles midnight.
        now = datetime.now()
        status = result.get("status", "unknown")
        return store_insight(
            insight_type="agent_action",
            content=f"action={action} result={status}",
            period=now.strftime("%Y-%m-%d"),
            metadata={
                "action": action,
                "params": params,
                "result": result,
                "latency_ms": latency_ms,
                "ts": now.isoformat(),
            },
        )
    except Exception as e:
        sys_log.error(f"[AgentAction] audit 失敗 action={action}: {e}")
        return None
def _store_action_memory(
    insight_type: str,
    content: str,
    *,
    product_sku: str | None = None,
    metadata: dict | None = None,
    status: str = "approved",
) -> int | None:
    """Persist the concrete outcome of an L2 action as an OpenClaw insight.

    The insight is stamped with today's date as its period and attributed to
    ``agent_actions`` with a fixed confidence of 0.8. There is no error
    handling here: a failure of the import or of ``store_insight`` propagates
    to the caller.

    Args:
        insight_type: Category under which the insight is stored.
        content: Human-readable text of the insight.
        product_sku: Optional SKU the insight relates to.
        metadata: Optional extra data; a falsy value is stored as ``{}``.
        status: Status assigned to the stored insight.

    Returns:
        The value returned by ``store_insight``.
    """
    from services.openclaw_learning_service import store_insight

    today = datetime.now().strftime("%Y-%m-%d")
    extra = metadata if metadata else {}
    author = "agent_actions"
    return store_insight(
        insight_type=insight_type,
        content=content,
        period=today,
        product_sku=product_sku,
        metadata=extra,
        ai_model=author,
        confidence=0.8,
        created_by=author,
        status=status,
    )
# Scheduler task names that the retry / pause / force-retry actions accept.
# Any other name is rejected by the whitelist checks in those actions.
# NOTE(review): the previous literal listed "run_icaim_analysis_task" twice.
# A set drops the duplicate, so membership is unchanged, but the second entry
# may have been meant to be a different task — confirm.
ALLOWED_RETRY_TASKS = {
    "run_auto_import_task",
    "run_momo_task",
    "run_edm_task",
    "run_competitor_price_feeder_task",
    "run_backup_monitor_task",
    "run_pchome_match_backfill_task",
    "run_pchome_growth_momo_backfill_task",
    "run_icaim_analysis_task",
    "run_festival_task",
    "run_whitepage_check",
    "run_db_backup_task",
    "run_promo_event_task",
}
def _try_load_task(task_name: str):
    """Fetch ``task_name`` from the ``scheduler`` module, importing it on demand.

    Returns:
        The attribute named ``task_name`` on the ``scheduler`` module, or
        ``None`` when the module has no such attribute or when importing /
        looking it up raises. A failure is logged as a warning; per the log
        message it is expected in containers other than the scheduler one.
    """
    try:
        from importlib import import_module

        scheduler_mod = import_module("scheduler")
        task = getattr(scheduler_mod, task_name, None)
    except Exception as exc:
        sys_log.warning(f"[AgentAction] import scheduler 失敗(預期於非 scheduler 容器): {exc}")
        return None
    return task
def _run_task_in_thread(task_name: str, attempt: int):
    """Run one scheduler task and audit how it ended.

    Used as the target of the timer / thread started by the retry actions.
    On success the task's retry state is cleared; on failure the (truncated)
    error text is stored in the retry state as ``last_error``. Either way an
    audit entry is written. Exceptions raised by the task are caught and
    logged, never re-raised.

    Args:
        task_name: Name of the task function to look up and call.
        attempt: Attempt number, used only for logging and auditing.
    """
    task = _try_load_task(task_name)
    if task is None:
        sys_log.error(f"[AgentAction] _run_task_in_thread: task {task_name} 不存在")
        return

    started = time.time()

    def elapsed_ms() -> float:
        # Milliseconds since the task run started.
        return (time.time() - started) * 1000

    sys_log.info(f"[AgentAction] 🔁 重試執行 {task_name} (attempt {attempt})")
    try:
        task()
        # Success: forget any accumulated backoff state for this task.
        with _retry_lock:
            _retry_state.pop(task_name, None)
        _audit(
            "retry_task_completed",
            {"task_name": task_name, "attempt": attempt},
            {"status": "success"},
            elapsed_ms(),
        )
        sys_log.info(f"[AgentAction] ✅ 重試成功 {task_name}")
    except Exception as exc:
        # Cap the stored / audited error text at 300 characters.
        message = str(exc)[:300]
        sys_log.error(f"[AgentAction] ❌ 重試失敗 {task_name} attempt={attempt}: {message}")
        with _retry_lock:
            state = _retry_state.get(task_name, {})
            state["last_error"] = message
            _retry_state[task_name] = state
        _audit(
            "retry_task_failed",
            {"task_name": task_name, "attempt": attempt},
            {"status": "error", "error": message},
            elapsed_ms(),
        )
# =====================================================================
# 🔁 retry_task — safe retry (exponential backoff; delayed execution via threading.Timer)
# =====================================================================
def retry_task(task_name: str, max_attempts: int = 3, backoff_sec: int = 60) -> dict:
    """Schedule a delayed, exponentially backed-off retry of a whitelisted task.

    Every outcome is written to the audit trail before returning.

    Args:
        task_name: Name of the scheduler task to retry; must be in
            ``ALLOWED_RETRY_TASKS``.
        max_attempts: Maximum number of scheduled attempts before the retry
            is reported as exhausted.
        backoff_sec: Base delay in seconds; attempt ``k`` is delayed by
            ``backoff_sec * 2 ** (k - 1)``.

    Returns:
        A dict whose ``status`` is one of:

        - ``"rejected"``: ``task_name`` is not whitelisted.
        - ``"skipped"``: the task is currently paused.
        - ``"deferred"``: the task function cannot be loaded in this process.
        - ``"exhausted"``: ``max_attempts`` was already reached; the retry
          state is cleared, so a later call starts again from attempt 1.
        - ``"scheduled"``: a daemon ``threading.Timer`` was started.
    """
    t0 = time.time()

    if task_name not in ALLOWED_RETRY_TASKS:
        result = {"status": "rejected", "reason": f"task '{task_name}' not in whitelist"}
        _audit("retry_task", {"task_name": task_name}, result, (time.time() - t0) * 1000)
        sys_log.warning(f"[AgentAction] retry_task 拒絕:{task_name} 不在白名單")
        return result

    if is_task_paused(task_name):
        result = {"status": "skipped", "reason": "task is paused"}
        _audit("retry_task", {"task_name": task_name}, result, (time.time() - t0) * 1000)
        return result

    # Make sure the task can be loaded here; otherwise defer to the scheduler.
    if _try_load_task(task_name) is None:
        result = {"status": "deferred", "task_name": task_name,
                  "note": "非 scheduler 容器無法重試,將由下次排程自然執行"}
        _audit("retry_task", {"task_name": task_name}, result, (time.time() - t0) * 1000)
        return result

    # Only the bookkeeping runs under the lock. The audit write and logging
    # happen after it is released, so a slow audit cannot stall worker
    # threads that need the same lock to record their outcome.
    with _retry_lock:
        state = _retry_state.get(task_name, {"attempts": 0})
        attempts = state.get("attempts", 0) + 1
        exhausted = attempts > max_attempts
        if exhausted:
            _retry_state.pop(task_name, None)
        else:
            # With the defaults: 60s / 120s / 240s ...
            delay = backoff_sec * (2 ** (attempts - 1))
            state.update({"attempts": attempts, "last_ts": time.time()})
            _retry_state[task_name] = state

    if exhausted:
        result = {"status": "exhausted", "task_name": task_name, "attempts": attempts - 1}
        _audit("retry_task", {"task_name": task_name}, result, (time.time() - t0) * 1000)
        sys_log.warning(f"[AgentAction] {task_name} 已達最大重試次數 {max_attempts}")
        return result

    # Daemon timer: a pending retry does not keep the process alive.
    timer = threading.Timer(delay, _run_task_in_thread, args=[task_name, attempts])
    timer.daemon = True
    timer.start()

    result = {"status": "scheduled", "task_name": task_name,
              "attempt": attempts, "delay_sec": delay, "max_attempts": max_attempts}
    _audit("retry_task", {"task_name": task_name, "delay_sec": delay, "attempt": attempts},
           result, (time.time() - t0) * 1000)
    sys_log.info(f"[AgentAction] 🔁 {task_name} 已排定 {delay}s 後重試 (attempt {attempts}/{max_attempts})")
    return result
# =====================================================================
# 🔍 query_km — RAG 查詢歷史同類事件
# =====================================================================
def query_km(query: str, insight_type: str | None = None, limit: int = 5) -> dict:
    """Look up similar past events through ``build_rag_context``.

    Args:
        query: Free-text query forwarded to ``build_rag_context``.
        insight_type: Optional insight-type filter forwarded with the query.
        limit: Recorded in the audit entry only.
            NOTE(review): it is not passed to ``build_rag_context`` —
            confirm whether that is intended.

    Returns:
        On success ``{"status": "ok", "query", "context_preview",
        "has_results"}``, where the preview is the first 500 characters of
        the context. On any exception ``{"status": "error", "error"}`` with
        the error text cut to 200 characters. The call is audited either way.
    """
    started = time.time()
    try:
        from services.openclaw_learning_service import build_rag_context

        context = build_rag_context(query=query, insight_type=insight_type)
        preview = (context or "")[:500]
        found = bool(context and context.strip())
        result = {
            "status": "ok",
            "query": query,
            "context_preview": preview,
            "has_results": found,
        }
    except Exception as exc:
        result = {"status": "error", "error": str(exc)[:200]}
        sys_log.error(f"[AgentAction] query_km 失敗: {exc}")

    audit_params = {"query": query, "insight_type": insight_type, "limit": limit}
    _audit("query_km", audit_params, result, (time.time() - started) * 1000)
    return result
# =====================================================================
# 🔕 silence_alert — 靜音抑制(避免告警風暴)
# =====================================================================
def silence_alert(event_key: str, duration_min: int = 60) -> dict:
    """Silence one event key for a number of minutes.

    The expiry time is stored in the module-level silence table, which
    ``is_silenced`` consults. Suggested key format is
    ``"<source>:<event_type>"``, for example
    ``"Scheduler.AutoImport:db_connection_error"``.

    Args:
        event_key: Key identifying the event to silence.
        duration_min: Length of the silence window in minutes.

    Returns:
        ``{"status": "silenced", "event_key", "until"}`` where ``until`` is
        the ISO-formatted expiry time.
    """
    started = time.time()
    expires_at = datetime.now() + timedelta(minutes=duration_min)
    _silence_table[event_key] = expires_at

    result = {
        "status": "silenced",
        "event_key": event_key,
        "until": expires_at.isoformat(),
    }
    audit_params = {"event_key": event_key, "duration_min": duration_min}
    _audit("silence_alert", audit_params, result, (time.time() - started) * 1000)
    sys_log.info(f"[AgentAction] silence_alert: {event_key} → 靜音至 {expires_at.strftime('%H:%M')}")
    return result
def is_silenced(event_key: str) -> bool:
    """Tell whether ``event_key`` is inside an active silence window.

    An expired entry is removed from the silence table as a side effect.

    Returns:
        ``True`` while the stored expiry time lies in the future, ``False``
        when there is no entry or the entry has expired.
    """
    expires_at = _silence_table.get(event_key)
    if expires_at is None:
        return False
    if datetime.now() < expires_at:
        return True
    # Window is over: drop the stale entry.
    _silence_table.pop(event_key, None)
    return False
# =====================================================================
# 🏷️ Wrappers for the three existing NemoTron tools (so event_router can call them uniformly)
# =====================================================================
def flag_for_human_review(sku: str, concern: str) -> dict:
    """Escalate a SKU to L3 human review.

    Stores a ``human_review`` insight with status ``pending`` and audits the
    action.

    Args:
        sku: SKU that needs a human to look at it.
        concern: Description of what needs review.

    Returns:
        ``{"status": "pending_review", "sku", "concern", "insight_id"}``.
    """
    started = time.time()
    insight_id = _store_action_memory(
        "human_review",
        f"SKU={sku} 需要人工覆核:{concern}",
        product_sku=sku,
        metadata={"sku": sku, "concern": concern, "source": "flag_for_human_review"},
        status="pending",
    )
    result = {
        "status": "pending_review",
        "sku": sku,
        "concern": concern,
        "insight_id": insight_id,
    }
    audit_params = {"sku": sku, "concern": concern}
    _audit("flag_for_human_review", audit_params, result, (time.time() - started) * 1000)
    return result
def route_to_km(sku: str, domain: str, summary: str) -> dict:
    """Archive a NemoTron/Hermes judgement as a retrievable KM entry.

    Stores a ``km_entry`` insight (default status) and audits the action.

    Args:
        sku: SKU the knowledge entry relates to.
        domain: Knowledge domain, used as a bracketed prefix of the content.
        summary: Text of the knowledge entry.

    Returns:
        ``{"status": "archived", "sku", "domain", "insight_id"}``.
    """
    t0 = time.time()
    insight_id = _store_action_memory(
        "km_entry",
        # Separate the SKU from the summary; without a delimiter the two run
        # together and the SKU cannot be told apart from the summary text.
        f"[{domain}] SKU={sku}:{summary}",
        product_sku=sku,
        metadata={"sku": sku, "domain": domain, "summary": summary, "source": "route_to_km"},
    )
    result = {"status": "archived", "sku": sku, "domain": domain, "insight_id": insight_id}
    _audit("route_to_km", {"sku": sku, "domain": domain}, result, (time.time() - t0) * 1000)
    return result
def mark_for_relearn(sku: str, reason: str) -> dict:
    """Mark a SKU as needing to be re-learned.

    Stores a ``relearn_marker`` insight with status ``pending`` and audits
    the action.

    Args:
        sku: SKU to mark.
        reason: Why the SKU should be re-learned.

    Returns:
        ``{"status": "marked", "sku", "reason", "insight_id"}``.
    """
    started = time.time()
    insight_id = _store_action_memory(
        "relearn_marker",
        f"SKU={sku} 需要重新學習:{reason}",
        product_sku=sku,
        metadata={"sku": sku, "reason": reason, "source": "mark_for_relearn"},
        status="pending",
    )
    result = {
        "status": "marked",
        "sku": sku,
        "reason": reason,
        "insight_id": insight_id,
    }
    audit_params = {"sku": sku, "reason": reason}
    _audit("mark_for_relearn", audit_params, result, (time.time() - started) * 1000)
    return result
# =====================================================================
# 🛑 L3 OPS Actions (triggered only by the Telegram HITL callback; not part of SAFE_ACTIONS)
# =====================================================================
def pause_task(task_name: str, duration_min: int = 60, operator: str = "unknown") -> dict:
    """Pause a whitelisted scheduler task for a number of minutes.

    Only the expiry time is recorded here; the scheduler is expected to call
    ``is_task_paused`` before running a task (that caller is not in this
    file).

    Args:
        task_name: Task to pause; must be in ``ALLOWED_RETRY_TASKS``.
        duration_min: Length of the pause in minutes.
        operator: Who requested the pause, recorded in the audit entry.

    Returns:
        ``{"status": "rejected", "reason"}`` for a non-whitelisted task,
        otherwise ``{"status": "paused", "task_name", "until", "operator"}``.
    """
    started = time.time()

    if task_name not in ALLOWED_RETRY_TASKS:
        rejection = {"status": "rejected", "reason": "task not in whitelist"}
        _audit(
            "pause_task",
            {"task_name": task_name, "operator": operator},
            rejection,
            (time.time() - started) * 1000,
        )
        return rejection

    expires_at = datetime.now() + timedelta(minutes=duration_min)
    _paused_tasks[task_name] = expires_at

    result = {
        "status": "paused",
        "task_name": task_name,
        "until": expires_at.isoformat(),
        "operator": operator,
    }
    _audit(
        "pause_task",
        {"task_name": task_name, "duration_min": duration_min, "operator": operator},
        result,
        (time.time() - started) * 1000,
    )
    sys_log.info(f"[AgentAction] ⏸️ {task_name} 已暫停至 {expires_at.strftime('%H:%M')} (by {operator})")
    return result
def resume_task(task_name: str, operator: str = "unknown") -> dict:
    """Lift the pause on a task immediately.

    Args:
        task_name: Task whose pause entry should be removed.
        operator: Who requested the resume, recorded in the audit entry.

    Returns:
        ``{"status", "task_name", "operator"}`` where ``status`` is
        ``"resumed"`` if a pause entry existed and ``"not_paused"`` otherwise.
    """
    started = time.time()
    previous = _paused_tasks.pop(task_name, None)
    if previous:
        status = "resumed"
    else:
        status = "not_paused"

    result = {"status": status, "task_name": task_name, "operator": operator}
    audit_params = {"task_name": task_name, "operator": operator}
    _audit("resume_task", audit_params, result, (time.time() - started) * 1000)
    sys_log.info(f"[AgentAction] ▶️ {task_name} 恢復 (by {operator})")
    return result
def is_task_paused(task_name: str) -> bool:
    """Tell whether ``task_name`` is inside an active pause window.

    Meant to be checked before a task is started. An expired entry is
    removed from the pause table as a side effect.

    Returns:
        ``True`` while the stored expiry time lies in the future, ``False``
        when there is no entry or the entry has expired.
    """
    expires_at = _paused_tasks.get(task_name)
    if expires_at is None:
        return False
    if datetime.now() < expires_at:
        return True
    # Pause is over: drop the stale entry.
    _paused_tasks.pop(task_name, None)
    return False
def force_retry_now(task_name: str, operator: str = "unknown") -> dict:
    """HITL action: run a whitelisted task right away in a daemon thread.

    No backoff delay is applied and the retry attempt counter is not
    incremented; the run is reported as attempt ``0``.

    Args:
        task_name: Task to run; must be in ``ALLOWED_RETRY_TASKS``.
        operator: Who requested the run, recorded in the audit entry.

    Returns:
        A dict whose ``status`` is ``"rejected"`` (not whitelisted),
        ``"deferred"`` (task function cannot be loaded in this process) or
        ``"started"`` (thread launched).
    """
    started = time.time()

    def audited(result: dict) -> dict:
        # Every exit path writes the same-shaped audit entry, then returns.
        _audit(
            "force_retry_now",
            {"task_name": task_name, "operator": operator},
            result,
            (time.time() - started) * 1000,
        )
        return result

    if task_name not in ALLOWED_RETRY_TASKS:
        return audited({"status": "rejected", "reason": "task not in whitelist"})

    if _try_load_task(task_name) is None:
        return audited({"status": "deferred", "note": "非 scheduler 容器"})

    # Run immediately, with no delay.
    worker = threading.Thread(target=_run_task_in_thread, args=(task_name, 0), daemon=True)
    worker.start()

    outcome = audited({"status": "started", "task_name": task_name, "operator": operator})
    sys_log.info(f"[AgentAction] ⚡ {task_name} 強制立即重試 (by {operator})")
    return outcome
# L2 whitelist (NemoTron may call these autonomously; mostly reads, few writes).
# Maps the action name to the function that implements it.
SAFE_ACTIONS: dict[str, Any] = {
    "retry_task": retry_task,
    "query_km": query_km,
    "silence_alert": silence_alert,
    "flag_for_human_review": flag_for_human_review,
    "route_to_km": route_to_km,
    "mark_for_relearn": mark_for_relearn,
}
# L3 whitelist (callable only from the Telegram HITL callback; state-changing actions).
# Maps the action name to the function that implements it.
OPS_ACTIONS: dict[str, Any] = {
    "pause_task": pause_task,
    "resume_task": resume_task,
    "force_retry_now": force_retry_now,
}