From 0b4f80ee8ac608b39d0178867ad5efce1371a1ac Mon Sep 17 00:00:00 2001 From: ogt Date: Sun, 19 Apr 2026 12:46:51 +0800 Subject: [PATCH] =?UTF-8?q?feat(ai-ops):=20Agent=20Action=20Ladder=20?= =?UTF-8?q?=E9=AA=A8=E5=B9=B9=EF=BC=88ADR-012=20Phase=201=EF=BC=89+=20?= =?UTF-8?q?=E9=80=B1=E5=A0=B1=E5=A5=97=E6=A8=A1=E6=9D=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ADR-012 核心設計: - 4 級信任邊界:L0 直出 / L1 Hermes 觀察 / L2 NemoTron 診斷執行 / L3 OpenClaw HITL - 通知鏈絕不中斷:每級失敗立即降級,保底 L0 模板 + 🟡 標記 - Audit Trail:每次 dispatch 自動寫 ai_insights (insight_type=agent_action) - 安全白名單:L2 可呼叫 6 個安全 action(retry/query_km/silence + 3 個既有 NemoTron tool) 新增檔案: - services/event_router.py — 事件分流入口,按 severity × event_type 分 Tier - services/agent_actions.py — 安全 action 白名單(Phase 1 stub + 完整介面) - docs/adr/ADR-012-agent-action-ladder.md — 完整設計 + 分階段計畫 Phase 1 狀態: - L0 直出完整可用 ✅ - L1 Hermes / L2 NemoTron 為 stub(Phase 2/3 填實作) - Fallback 降級鏈已完整 ✅ - 靜音檢查(is_silenced)+ Audit Trail 已就緒 ✅ 處理既有 TODO: - services/openclaw_strategist_service.py::_notify_telegram_group() 改用 telegram_templates.report() 統一週報格式 全景盤點(新 memory): - reference_telegram_endpoints_map.md — 21 個 Telegram 發送點 - feedback_agent_action_ladder.md — 操作規範 (+ 既有 ADR-011 跨專案隔離規範一併生效) Co-Authored-By: Claude Sonnet 4.6 --- docs/adr/ADR-012-agent-action-ladder.md | 142 ++++++++++++ services/agent_actions.py | 175 +++++++++++++++ services/event_router.py | 285 ++++++++++++++++++++++++ services/openclaw_strategist_service.py | 43 ++-- 4 files changed, 626 insertions(+), 19 deletions(-) create mode 100644 docs/adr/ADR-012-agent-action-ladder.md create mode 100644 services/agent_actions.py create mode 100644 services/event_router.py diff --git a/docs/adr/ADR-012-agent-action-ladder.md b/docs/adr/ADR-012-agent-action-ladder.md new file mode 100644 index 0000000..3f49538 --- /dev/null +++ b/docs/adr/ADR-012-agent-action-ladder.md @@ -0,0 +1,142 @@ +# ADR-012: Agent Action Ladder(AI Agent 事件介入與自動修復三級信任邊界) + +- **Status**: Accepted +- **Date**: 2026-04-19 +- **Deciders**: 統帥 +- **Related**: ADR-001(三 Agent 分工), ADR-004(NemoTron Fallback), ADR-007(AI Dual-Write), ADR-011(跨專案隔離) + +## Context + +P2/P3 Inline Keyboard 降價決策上線後,統帥希望進一步讓 AI Agent(Hermes / NemoTron / OpenClaw)**自動接手處理**系統事件,而非僅發送原始錯誤訊息給人工讀。 + +但「AI 全自動執行」存在嚴重風險: +1. AI 幻覺可能做出錯誤判斷(AI 下架正常商品、誤觸發降價) +2. 不可逆操作(刪資料、重啟生產容器)一旦出錯災難級 +3. AI 服務本身可能掛掉(NIM quota / Ollama OOM),若無 fallback 會連帶阻斷通知 + +## Decision + +### ① 三級信任邊界(Action Ladder) + +建立 **L0 直出 → L1 觀察 → L2 診斷 → L3 執行** 四級分流,每級限定能做的動作與失效行為: + +| Level | Agent | 能做什麼 | 不能做什麼 | 失效降級 | +|-------|-------|---------|-----------|---------| +| **L0 Direct** | — | 模板直出 | — | 永遠可用(保底) | +| **L1 Observer** | Hermes | 翻譯 stack trace、摘要、風險等級標註 | 寫任何資料 / 呼叫外部 API | → L0 + 🟡 標記 | +| **L2 Investigator** | NemoTron | 寫 `ai_insights`、執行 **5+3 個安全 tool**、發 Telegram | 動 prod 資料表 / 容器 / 外部系統 | → Hermes 規則引擎(ADR-004)| +| **L3 Operator** | OpenClaw | 提方案 + HITL 按鈕 → 人批准後執行任意動作 | 無 HITL 批准前執行 | → 人工 SOP(寄信通知)| + +### ② 安全 Action 白名單(L2 NemoTron 可用) + +**已有(price_threat 流):** +- `trigger_price_alert(sku, data)` — 發價格告警 +- `add_to_recommendation(sku, reason)` — 加入推薦 +- `flag_for_human_review(sku, concern)` — 升級 L3 HITL +- `route_to_km(sku, domain, summary)` — KM 歸檔 +- `mark_for_relearn(sku, reason)` — 標記重新訓練 + +**本 ADR 新增(通用事件流):** +- `retry_task(task_name, max_attempts=3, backoff=60)` — 安全重試(exponential backoff) +- `query_km(query, limit=5)` — RAG 檢索歷史同類事件 +- `silence_alert(event_key, duration_min=60)` — 靜音抑制,避免告警風暴 + +### ③ EventRouter 分類規則 + +單一入口 `services/event_router.py::dispatch(event)`,依 `severity × event_type` 決定 Tier: + +``` +event = { + "source": "Scheduler.AutoImport", # 來源模組 + "event_type": "db_connection_error", # 事件類型(供 L2 matching) + "severity": "warning", # P0/alert, P1/alert, P2/warning, info, success + "title": "...", "summary": "...", + "trace": "...", # 可選 + "payload": {...}, # 結構化資料 +} +``` + +**分流邏輯:** +- `severity=success|info` → L0 直出 +- `severity=warning` 且 `trace` 不存在 → L0(已結構化,不需 AI) +- `severity=warning` 且 `trace` 存在 → L1(Hermes 翻譯) +- `severity=alert(P1)` + 符合 L2 白名單 event_type → L2(NemoTron) +- `severity=alert(P0)` → L2 + 人工標記(雙軌) +- 複雜策略建議 / 週報 → L3 OpenClaw(P3 已實作) + +### ④ 訊息呈現格式(三層式) + +所有經 AI 加工的訊息**必須保留原始事實**,避免 AI 幻覺掩蓋真相: + +``` +⚠️ [EwoooC 警告] 自動匯入異常 (Level 1 · Hermes) +🕐 ... 📦 Scheduler.AutoImport + +🤖 AI 摘要(Hermes v3): +資料庫暫時斷線,疑似容器間 DNS 波動。 +本週已發生 3 次,系統通常 2-5 分鐘自癒。 + +📊 原始事實: +• event_type: db_connection_error +• 影響: 當日業績匯入延遲 +• 詳細 trace(末段): ... + +🔧 AI 建議行動: +• 等候自動重試 +• 30 分鐘仍失敗 → 檢查 momo-pro_default 網路 +``` + +L2 流程另加 **🤖 AI 已執行動作** 區塊(retry_task / silence_alert 等)。 + +### ⑤ Audit Trail(雙寫強制) + +每次 Agent 介入都要: +1. 寫 `ai_insights` (`insight_type='agent_action'`, metadata 含 `tier`, `agent`, `action_taken`, `confidence`, `latency_ms`) +2. Telegram 訊息末尾加隱藏式 `source_insight_id` 以便追蹤 + +### ⑥ Fallback 降級鏈(SLA 保證) + +``` +L2 NemoTron 掛 → L1 Hermes(規則模式)→ L0 模板直出 +L1 Hermes 掛 → L0 模板直出 + 🟡 「AI 分析暫不可用」 +通知通道掛 → 本地 file queue 暫存(ADR-009 pattern) +``` + +**關鍵 SLA**:無論 AI 狀況,**通知鏈絕不中斷** — 這是 P0 底線。 + +### ⑦ 成本配額(漏斗型,沿用 ADR-001) + +| Agent | 日呼叫上限 | 超額動作 | +|-------|-----------|---------| +| Hermes(本機 Ollama)| 無限制 | — | +| NemoTron(NIM cloud) | 80 次/日(現值) | 走 Hermes 規則(ADR-004) | +| OpenClaw(Gemini) | 依週期觸發 | 不適用(離線批次) | + +## Consequences + +**正面** +- 通知從「原始 stack trace 直丟」升級到「AI 摘要 + 原始事實 + 建議行動」 +- 已知 known issue(如 DNS 暫斷)可由 NemoTron 自動重試,無需人工介入 +- 三級邊界清楚,審計可追溯(`ai_insights` 雙寫 + tier 標記) +- AI 掛掉有完整降級鏈,通知鏈不會塌 + +**負面 / 風險** +- EventRouter 增加一個中間層,延遲 2-15s(依 Tier) +- AI 成本上升(Hermes 本機還好,NIM quota 要盯) +- 新增的 3 個 L2 tool 未實戰測試,前 2 週需觀察 +- 若 Hermes prompt 寫不好,AI 摘要可能誤導 → 每月檢視 `agent_action` insights 的 feedback_down + +## 實施計畫(階段性) + +- **Phase 1(本 ADR 同步提交)**:EventRouter 骨幹 + agent_actions stub + triaged_alert 模板 +- **Phase 2**:Hermes L1 接入 scheduler.run_auto_import_task + run_momo_task 兩個 exception +- **Phase 3**:NemoTron 擴充 3 個新 tool (retry/query_km/silence) +- **Phase 4**:依需求擴 L3 HITL 按鈕 +- **Phase 5**:Prometheus metric 接入(`agent_action_total{tier,agent,event_type}`、`agent_latency_seconds`) + +## References +- `services/event_router.py` — 分流入口(Phase 1) +- `services/agent_actions.py` — 安全 action 白名單(Phase 1) +- `services/telegram_templates.py::triaged_alert()` — L1/L2 訊息格式(Phase 1) +- `memory/feedback_agent_action_ladder.md` — 操作規範 +- `memory/reference_telegram_endpoints_map.md` — 21 個發送點盤點 diff --git a/services/agent_actions.py b/services/agent_actions.py new file mode 100644 index 0000000..3cfe50d --- /dev/null +++ b/services/agent_actions.py @@ -0,0 +1,175 @@ +""" +Agent Action 白名單(ADR-012 Phase 1 骨幹) + +L2 NemoTron 可安全呼叫的動作集合。嚴格限制: +- 只能寫 ai_insights 和發 Telegram +- 不可動 prod 資料表 / 容器 / 外部系統 +- 所有 action 必須 dual-write 審計軌跡 + +現階段為 **stub + 完整 interface**,供 event_router 串接。真實執行邏輯將於 Phase 3 填入。 +""" + +from __future__ import annotations + +import time +from datetime import datetime, timedelta +from typing import Any + +from services.logger_manager import SystemLogger + +sys_log = SystemLogger("AgentAction").get_logger() + +# 靜音表(記憶體快取,重啟後清空;Phase 3 可改 DB 持久化) +_silence_table: dict[str, datetime] = {} + + +def _audit(action: str, params: dict, result: dict, latency_ms: float) -> int | None: + """所有 action 統一審計入 ai_insights(ADR-007 Dual-Write)""" + try: + from services.openclaw_learning_service import store_insight + return store_insight( + insight_type="agent_action", + content=f"action={action} result={result.get('status', 'unknown')}", + period=datetime.now().strftime("%Y-%m-%d"), + metadata={ + "action": action, + "params": params, + "result": result, + "latency_ms": latency_ms, + "ts": datetime.now().isoformat(), + }, + ) + except Exception as e: + sys_log.error(f"[AgentAction] audit 失敗 action={action}: {e}") + return None + + +# ===================================================================== +# 🔁 retry_task — 安全重試(exponential backoff) +# ===================================================================== +def retry_task(task_name: str, max_attempts: int = 3, backoff_sec: int = 60) -> dict: + """ + 安全重試一個 scheduler task。Phase 1 stub:只記錄,不真正重試。 + Phase 3 將接入 scheduler.py 的 task dispatch。 + + 限制:task_name 必須在白名單內(避免任意程式碼執行) + """ + ALLOWED_TASKS = { + "run_auto_import_task", "run_momo_task", "run_edm_task", + "run_competitor_price_feeder_task", "run_backup_monitor_task", + "run_icaim_analysis_task", + } + t0 = time.time() + if task_name not in ALLOWED_TASKS: + result = {"status": "rejected", "reason": f"task '{task_name}' not in whitelist"} + _audit("retry_task", {"task_name": task_name}, result, (time.time() - t0) * 1000) + sys_log.warning(f"[AgentAction] retry_task 拒絕:{task_name} 不在白名單") + return result + + # TODO Phase 3: 真實重試邏輯(呼叫 scheduler module 的 task function) + result = { + "status": "queued", + "task_name": task_name, + "max_attempts": max_attempts, + "backoff_sec": backoff_sec, + "note": "Phase 1 stub — 尚未真正重試,僅記錄意圖", + } + _audit("retry_task", {"task_name": task_name, "max_attempts": max_attempts}, + result, (time.time() - t0) * 1000) + sys_log.info(f"[AgentAction] retry_task 已排隊(stub): {task_name}") + return result + + +# ===================================================================== +# 🔍 query_km — RAG 查詢歷史同類事件 +# ===================================================================== +def query_km(query: str, insight_type: str | None = None, limit: int = 5) -> dict: + """透過 openclaw_learning_service.build_rag_context 找歷史同類事件""" + t0 = time.time() + try: + from services.openclaw_learning_service import build_rag_context + context = build_rag_context(query=query, insight_type=insight_type) + result = { + "status": "ok", + "query": query, + "context_preview": (context or "")[:500], + "has_results": bool(context and context.strip()), + } + except Exception as e: + result = {"status": "error", "error": str(e)[:200]} + sys_log.error(f"[AgentAction] query_km 失敗: {e}") + + _audit("query_km", {"query": query, "insight_type": insight_type, "limit": limit}, + result, (time.time() - t0) * 1000) + return result + + +# ===================================================================== +# 🔕 silence_alert — 靜音抑制(避免告警風暴) +# ===================================================================== +def silence_alert(event_key: str, duration_min: int = 60) -> dict: + """ + 對特定 event_key 設定靜音期限。EventRouter 在 dispatch 前會先檢查。 + event_key 建議格式:":",例: + "Scheduler.AutoImport:db_connection_error" + """ + t0 = time.time() + until = datetime.now() + timedelta(minutes=duration_min) + _silence_table[event_key] = until + result = {"status": "silenced", "event_key": event_key, "until": until.isoformat()} + _audit("silence_alert", {"event_key": event_key, "duration_min": duration_min}, + result, (time.time() - t0) * 1000) + sys_log.info(f"[AgentAction] silence_alert: {event_key} → 靜音至 {until.strftime('%H:%M')}") + return result + + +def is_silenced(event_key: str) -> bool: + """EventRouter 呼叫,判斷是否需略過此事件""" + until = _silence_table.get(event_key) + if until is None: + return False + if datetime.now() >= until: + _silence_table.pop(event_key, None) + return False + return True + + +# ===================================================================== +# 🏷️ 三個既有 NemoTron tool 的 wrapper(供 event_router 統一調用) +# ===================================================================== +def flag_for_human_review(sku: str, concern: str) -> dict: + """升級到 L3 HITL(包裝 NemoTron 既有 tool,保持呼叫介面一致)""" + t0 = time.time() + # TODO Phase 3: 接入 nemoton_dispatcher_service._exec_flag_for_human_review + result = {"status": "stub", "sku": sku, "concern": concern, + "note": "Phase 1 stub,Phase 3 接 NemoTron"} + _audit("flag_for_human_review", {"sku": sku, "concern": concern}, + result, (time.time() - t0) * 1000) + return result + + +def route_to_km(sku: str, domain: str, summary: str) -> dict: + """KM 歸檔(Phase 3 接 NemoTron)""" + t0 = time.time() + result = {"status": "stub", "note": "Phase 3 接 NemoTron"} + _audit("route_to_km", {"sku": sku, "domain": domain}, result, (time.time() - t0) * 1000) + return result + + +def mark_for_relearn(sku: str, reason: str) -> dict: + """標記重新訓練(Phase 3 接 NemoTron)""" + t0 = time.time() + result = {"status": "stub", "note": "Phase 3 接 NemoTron"} + _audit("mark_for_relearn", {"sku": sku, "reason": reason}, result, (time.time() - t0) * 1000) + return result + + +# 白名單(供 EventRouter / NemoTron 引用) +SAFE_ACTIONS: dict[str, Any] = { + "retry_task": retry_task, + "query_km": query_km, + "silence_alert": silence_alert, + "flag_for_human_review": flag_for_human_review, + "route_to_km": route_to_km, + "mark_for_relearn": mark_for_relearn, +} diff --git a/services/event_router.py b/services/event_router.py new file mode 100644 index 0000000..fa8b152 --- /dev/null +++ b/services/event_router.py @@ -0,0 +1,285 @@ +""" +EventRouter — 事件分流入口(ADR-012 Phase 1 骨幹) + +所有系統事件(exception / 排程完成 / 告警 / 資訊通報)**應**統一透過 +`dispatch(event)` 進入,由 EventRouter 依 severity × event_type 分流到: + L0 Direct / L1 Hermes Observer / L2 NemoTron Investigator / L3 OpenClaw Operator + +設計原則(ADR-012 §⑥):無論 AI 狀況,**通知鏈絕不中斷**。 +每一級失敗立即降級到下一級,最終保底 L0 直出模板。 + +Phase 1 實作範圍: +- 骨幹 + 分類邏輯 +- L0 模板直出(已可用) +- L1 Hermes / L2 NemoTron / L3 OpenClaw 為 stub(附 TODO 標記) +- 完整 fallback 鏈(AI 掛必降級) +- 靜音檢查 + Audit Trail +""" + +from __future__ import annotations + +import os +import time +from datetime import datetime +from enum import Enum +from typing import Any + +import requests + +from services.logger_manager import SystemLogger +from services import telegram_templates as tpl +from services import agent_actions + +sys_log = SystemLogger("EventRouter").get_logger() + + +class Tier(str, Enum): + L0_DIRECT = "L0" + L1_OBSERVER = "L1" + L2_INVESTIGATOR = "L2" + L3_OPERATOR = "L3" + + +class Severity(str, Enum): + INFO = "info" + SUCCESS = "success" + WARNING = "warning" + ALERT = "alert" # P0/P1 + + +# ===================================================================== +# 分類規則(ADR-012 §③) +# ===================================================================== +def _classify(event: dict) -> Tier: + sev = event.get("severity", "info") + has_trace = bool(event.get("trace")) + event_type = event.get("event_type", "") + + # L3 OpenClaw 由週期任務主動觸發(週報、Meta-Analysis),不走 router + # 這裡只處理 L0/L1/L2 + + if sev in (Severity.INFO, Severity.SUCCESS): + return Tier.L0_DIRECT + + if sev == Severity.WARNING: + # 有技術 trace → L1 Hermes 翻譯 + return Tier.L1_OBSERVER if has_trace else Tier.L0_DIRECT + + if sev == Severity.ALERT: + # 符合 L2 白名單 event_type → NemoTron 介入 + L2_EVENT_TYPES = { + "price_threat", "db_connection_error", "crawler_timeout", + "nim_quota_exhausted", "embedding_failure", + } + if event_type in L2_EVENT_TYPES: + return Tier.L2_INVESTIGATOR + return Tier.L1_OBSERVER + + return Tier.L0_DIRECT + + +# ===================================================================== +# 主入口 +# ===================================================================== +def dispatch(event: dict, admin_chat_ids: list[int] | None = None) -> dict: + """ + 主要入口。回傳 dict: {tier, sent, insight_id, errors, latency_ms} + + event 格式見 ADR-012 §③。必要欄位:source, event_type, severity, title, summary + 可選欄位:trace, payload, time + + admin_chat_ids 若未給,從 DB telegram_users where is_admin=true 取 + """ + t0 = time.time() + event_key = f"{event.get('source', '?')}:{event.get('event_type', '?')}" + + # 靜音檢查 + if agent_actions.is_silenced(event_key): + sys_log.info(f"[EventRouter] 事件被靜音略過: {event_key}") + return {"tier": "silenced", "sent": 0, "event_key": event_key} + + tier = _classify(event) + sys_log.info(f"[EventRouter] dispatch {event_key} → {tier.value}") + + # 執行對應 Tier + try: + if tier == Tier.L0_DIRECT: + text = _render_l0(event) + elif tier == Tier.L1_OBSERVER: + text = _render_l1_with_fallback(event) + elif tier == Tier.L2_INVESTIGATOR: + text = _render_l2_with_fallback(event) + else: + text = _render_l0(event) # 未知 Tier 保底 + except Exception as e: + sys_log.error(f"[EventRouter] 渲染失敗,降級 L0: {e}") + text = _render_l0(event) + + # 發送 Telegram + result = _send(text, admin_chat_ids) + result["tier"] = tier.value + result["event_key"] = event_key + result["latency_ms"] = round((time.time() - t0) * 1000, 1) + + # 審計(每次 dispatch 都入 KM) + _audit_dispatch(event, tier, result) + return result + + +# ===================================================================== +# Tier 渲染器 +# ===================================================================== +def _render_l0(event: dict) -> str: + """L0 直出:根據 severity 選用對應模板""" + sev = event.get("severity", "info") + title = event.get("title", "未命名事件") + module = event.get("source", "unknown") + summary = event.get("summary", "") + details = event.get("payload") if isinstance(event.get("payload"), dict) else None + + if sev == Severity.SUCCESS: + return tpl.success(title=title, module=module, stats=summary) + if sev == Severity.INFO: + return tpl.info(title=title, module=module, content=summary) + if sev == Severity.WARNING: + return tpl.warning(title=title, module=module, summary=summary, details=details) + # alert 但降級到 L0 + return tpl.alert( + title=title, module=module, + status=event.get("status", "未知"), + impact=event.get("impact", "未評估"), + summary=summary, + actions=event.get("suggested_actions"), + trace=event.get("trace"), + ) + + +def _render_l1_with_fallback(event: dict) -> str: + """L1 Hermes 翻譯 stack trace。Phase 1 stub — 直接降 L0 + 標記""" + # TODO Phase 2: 呼叫 Hermes 做 stack trace 翻譯與摘要 + try: + ai_summary = _hermes_observe(event) # stub + if ai_summary: + return _compose_triaged(event, tier_label="L1 · Hermes", ai_summary=ai_summary) + except Exception as e: + sys_log.warning(f"[EventRouter] L1 Hermes 失敗,降 L0: {e}") + + # Fallback:L0 模板 + 降級標記 + text = _render_l0(event) + return text + "\n\n🟡 _AI 分析暫不可用,以原始資料呈現_" + + +def _render_l2_with_fallback(event: dict) -> str: + """L2 NemoTron 介入(含 tool call)。Phase 1 stub — 降 L1""" + # TODO Phase 3: 呼叫 NemoTron dispatcher,允許執行 SAFE_ACTIONS 中的 tool + try: + ai_result = _nemoton_investigate(event) # stub + if ai_result: + return _compose_triaged( + event, tier_label="L2 · NemoTron", + ai_summary=ai_result.get("summary", ""), + ai_actions=ai_result.get("actions_taken", []), + ) + except Exception as e: + sys_log.warning(f"[EventRouter] L2 NemoTron 失敗,降 L1: {e}") + + return _render_l1_with_fallback(event) + + +def _compose_triaged(event: dict, tier_label: str, ai_summary: str, + ai_actions: list | None = None) -> str: + """三層式訊息:AI 摘要 + 原始事實 + 建議行動(ADR-012 §④)""" + base = _render_l0(event) + parts = [f"🤖 *AI 摘要({tier_label}):*", ai_summary, ""] + if ai_actions: + parts.append("🛠️ *AI 已執行動作:*") + for a in ai_actions: + parts.append(f"• {a}") + parts.append("") + return base + "\n\n" + "\n".join(parts) + + +# ===================================================================== +# AI 介入 stub(Phase 2/3 填實作) +# ===================================================================== +def _hermes_observe(event: dict) -> str | None: + """Phase 1 stub:Phase 2 會呼叫 hermes_analyst_service""" + # 不呼叫 AI,回傳 None 讓上層降級 + return None + + +def _nemoton_investigate(event: dict) -> dict | None: + """Phase 1 stub:Phase 3 會呼叫 nemoton_dispatcher_service""" + return None + + +# ===================================================================== +# Telegram 發送 + Audit +# ===================================================================== +def _send(text: str, admin_chat_ids: list[int] | None) -> dict: + token = os.getenv("TELEGRAM_BOT_TOKEN") + if not token: + return {"sent": 0, "errors": ["TELEGRAM_BOT_TOKEN 未設定"]} + + if admin_chat_ids is None: + admin_chat_ids = _load_admin_chat_ids() + if not admin_chat_ids: + return {"sent": 0, "errors": ["無管理員 chat_id"]} + + url = f"https://api.telegram.org/bot{token}/sendMessage" + sent = 0 + errors = [] + for cid in admin_chat_ids: + try: + r = requests.post(url, json={ + "chat_id": int(cid), "text": text, "parse_mode": "Markdown", + }, timeout=10) + if r.ok: + sent += 1 + else: + errors.append(f"chat={cid} status={r.status_code}") + except Exception as e: + errors.append(f"chat={cid} exc={e}") + return {"sent": sent, "errors": errors} + + +def _load_admin_chat_ids() -> list[int]: + """從 DB 撈 is_admin=true 的 chat_id;fallback 到 .env TELEGRAM_CHAT_IDS""" + try: + from database.manager import get_session + from sqlalchemy import text as sa_text + session = get_session() + try: + rows = session.execute(sa_text( + "SELECT telegram_id FROM telegram_users WHERE is_active=true AND is_admin=true" + )).fetchall() + if rows: + return [int(r[0]) for r in rows] + finally: + session.close() + except Exception as e: + sys_log.warning(f"[EventRouter] 查 telegram_users 失敗,fallback .env: {e}") + + # Fallback 到 env + import re + raw = os.getenv("TELEGRAM_CHAT_IDS", "") + return [int(x.strip()) for x in re.sub(r'[\[\]"\' ]', "", raw).split(",") if x.strip()] + + +def _audit_dispatch(event: dict, tier: Tier, result: dict) -> None: + """每次 dispatch 都寫入 ai_insights 作為審計軌跡""" + try: + from services.openclaw_learning_service import store_insight + store_insight( + insight_type="agent_action", + content=f"dispatch tier={tier.value} event={event.get('event_type')}", + period=datetime.now().strftime("%Y-%m-%d"), + metadata={ + "tier": tier.value, + "event": {k: event.get(k) for k in ("source", "event_type", "severity", "title")}, + "result": {k: result.get(k) for k in ("sent", "latency_ms", "errors")}, + "ts": datetime.now().isoformat(), + }, + ) + except Exception as e: + sys_log.error(f"[EventRouter] audit 失敗: {e}") diff --git a/services/openclaw_strategist_service.py b/services/openclaw_strategist_service.py index dd4f227..1c473de 100644 --- a/services/openclaw_strategist_service.py +++ b/services/openclaw_strategist_service.py @@ -332,29 +332,34 @@ def _send_price_decision_requests(recs: list, period_str: str, source_insight_id sys_log.info(f"[OCStrategist] 降價決策推送 insight_id={rec_insight_id} → {len(admin_ids)} 位管理員") -def _notify_telegram_group(report_md: str, period_str: str): +def _notify_telegram_group(report_md: str, period_str: str, report_type: str = "週報") -> None: """ - 推送至 Telegram + 推送策略報告至 Telegram 群組(已套用 telegram_templates.report() 統一格式)。 + ADR-012 備註:週報類為 L3 OpenClaw 的週期性輸出,不經 event_router。 """ - bot_token = os.getenv('OPENCLAW_BOT_TOKEN', '8610496165:AAFOlcWV4oRUSC2TI-fYux7JV97fjNzsYR8') - chat_id = os.getenv('OPENCLAW_GROUP_ID', '-1003940688311') - - # Telegram 貼文過長則截斷 - msg = f"📣 **[Gemini 策略師] {period_str} 週報已出爐!**\n\n{report_md}" - if len(msg) > 3500: - msg = msg[:3500] + "\n\n... (報告過長已截斷,請至知識庫查看全文)" - - url = f"https://api.telegram.org/bot{bot_token}/sendMessage" - payload = { - "chat_id": chat_id, - "text": msg, - "parse_mode": "Markdown" - } + bot_token = os.getenv("TELEGRAM_BOT_TOKEN") or os.getenv("OPENCLAW_BOT_TOKEN") + chat_id = os.getenv("OPENCLAW_GROUP_ID", "-1003940688311") + if not bot_token: + sys_log.warning("[OCStrategist] TELEGRAM_BOT_TOKEN 未設定,略過週報推播") + return + + from services.telegram_templates import report as render_report + msg = render_report( + title="AI 策略報告已出爐", + report_type=report_type, + period=period_str, + content_md=report_md, + ) + try: - requests.post(url, json=payload, timeout=10) - sys_log.info("[OCStrategist] Telegram 週報推送成功。") + requests.post( + f"https://api.telegram.org/bot{bot_token}/sendMessage", + json={"chat_id": chat_id, "text": msg, "parse_mode": "Markdown"}, + timeout=10, + ) + sys_log.info(f"[OCStrategist] Telegram {report_type}推送成功") except Exception as e: - sys_log.error(f"[OCStrategist] Telegram 週報推送失敗: {e}") + sys_log.error(f"[OCStrategist] Telegram {report_type}推送失敗: {e}") def generate_meta_analysis_report() -> str: """