feat(ai-ops): Agent Action Ladder 骨幹(ADR-012 Phase 1)+ 週報套模板
All checks were successful
CD Pipeline / deploy (push) Successful in 1m14s

ADR-012 核心設計:
- 4 級信任邊界:L0 直出 / L1 Hermes 觀察 / L2 NemoTron 診斷執行 / L3 OpenClaw HITL
- 通知鏈絕不中斷:每級失敗立即降級,保底 L0 模板 + 🟡 標記
- Audit Trail:每次 dispatch 自動寫 ai_insights (insight_type=agent_action)
- 安全白名單:L2 可呼叫 6 個安全 action(retry/query_km/silence + 3 個既有 NemoTron tool)

新增檔案:
- services/event_router.py — 事件分流入口,按 severity × event_type 分 Tier
- services/agent_actions.py — 安全 action 白名單(Phase 1 stub + 完整介面)
- docs/adr/ADR-012-agent-action-ladder.md — 完整設計 + 分階段計畫

Phase 1 狀態:
- L0 直出完整可用 
- L1 Hermes / L2 NemoTron 為 stub(Phase 2/3 填實作)
- Fallback 降級鏈已完整 
- 靜音檢查(is_silenced)+ Audit Trail 已就緒 

處理既有 TODO:
- services/openclaw_strategist_service.py::_notify_telegram_group()
  改用 telegram_templates.report() 統一週報格式

全景盤點(新 memory):
- reference_telegram_endpoints_map.md — 21 個 Telegram 發送點
- feedback_agent_action_ladder.md — 操作規範
  (+ 既有 ADR-011 跨專案隔離規範一併生效)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
ogt
2026-04-19 12:46:51 +08:00
parent 528a6c0468
commit 0b4f80ee8a
4 changed files with 626 additions and 19 deletions

View File

@@ -0,0 +1,142 @@
# ADR-012: Agent Action LadderAI Agent 事件介入與自動修復三級信任邊界)
- **Status**: Accepted
- **Date**: 2026-04-19
- **Deciders**: 統帥
- **Related**: ADR-001三 Agent 分工), ADR-004NemoTron Fallback, ADR-007AI Dual-Write, ADR-011跨專案隔離
## Context
P2/P3 Inline Keyboard 降價決策上線後,統帥希望進一步讓 AI AgentHermes / NemoTron / OpenClaw**自動接手處理**系統事件,而非僅發送原始錯誤訊息給人工讀。
但「AI 全自動執行」存在嚴重風險:
1. AI 幻覺可能做出錯誤判斷AI 下架正常商品、誤觸發降價)
2. 不可逆操作(刪資料、重啟生產容器)一旦出錯災難級
3. AI 服務本身可能掛掉NIM quota / Ollama OOM若無 fallback 會連帶阻斷通知
## Decision
### ① 三級信任邊界Action Ladder
建立 **L0 直出 → L1 觀察 → L2 診斷 → L3 執行** 四級分流,每級限定能做的動作與失效行為:
| Level | Agent | 能做什麼 | 不能做什麼 | 失效降級 |
|-------|-------|---------|-----------|---------|
| **L0 Direct** | — | 模板直出 | — | 永遠可用(保底) |
| **L1 Observer** | Hermes | 翻譯 stack trace、摘要、風險等級標註 | 寫任何資料 / 呼叫外部 API | → L0 + 🟡 標記 |
| **L2 Investigator** | NemoTron | 寫 `ai_insights`、執行 **5+3 個安全 tool**、發 Telegram | 動 prod 資料表 / 容器 / 外部系統 | → Hermes 規則引擎ADR-004|
| **L3 Operator** | OpenClaw | 提方案 + HITL 按鈕 → 人批准後執行任意動作 | 無 HITL 批准前執行 | → 人工 SOP寄信通知|
### ② 安全 Action 白名單L2 NemoTron 可用)
**已有price_threat 流):**
- `trigger_price_alert(sku, data)` — 發價格告警
- `add_to_recommendation(sku, reason)` — 加入推薦
- `flag_for_human_review(sku, concern)` — 升級 L3 HITL
- `route_to_km(sku, domain, summary)` — KM 歸檔
- `mark_for_relearn(sku, reason)` — 標記重新訓練
**本 ADR 新增(通用事件流):**
- `retry_task(task_name, max_attempts=3, backoff=60)` — 安全重試exponential backoff
- `query_km(query, limit=5)` — RAG 檢索歷史同類事件
- `silence_alert(event_key, duration_min=60)` — 靜音抑制,避免告警風暴
### ③ EventRouter 分類規則
單一入口 `services/event_router.py::dispatch(event)`,依 `severity × event_type` 決定 Tier
```
event = {
"source": "Scheduler.AutoImport", # 來源模組
"event_type": "db_connection_error", # 事件類型(供 L2 matching
"severity": "warning", # P0/alert, P1/alert, P2/warning, info, success
"title": "...", "summary": "...",
"trace": "...", # 可選
"payload": {...}, # 結構化資料
}
```
**分流邏輯:**
- `severity=success|info` → L0 直出
- `severity=warning``trace` 不存在 → L0已結構化不需 AI
- `severity=warning``trace` 存在 → L1Hermes 翻譯)
- `severity=alert(P1)` + 符合 L2 白名單 event_type → L2NemoTron
- `severity=alert(P0)` → L2 + 人工標記(雙軌)
- 複雜策略建議 / 週報 → L3 OpenClawP3 已實作)
### ④ 訊息呈現格式(三層式)
所有經 AI 加工的訊息**必須保留原始事實**,避免 AI 幻覺掩蓋真相:
```
⚠️ [EwoooC 警告] 自動匯入異常 (Level 1 · Hermes)
🕐 ... 📦 Scheduler.AutoImport
🤖 AI 摘要Hermes v3:
資料庫暫時斷線,疑似容器間 DNS 波動。
本週已發生 3 次,系統通常 2-5 分鐘自癒。
📊 原始事實:
• event_type: db_connection_error
• 影響: 當日業績匯入延遲
• 詳細 trace末段: ...
🔧 AI 建議行動:
• 等候自動重試
• 30 分鐘仍失敗 → 檢查 momo-pro_default 網路
```
L2 流程另加 **🤖 AI 已執行動作** 區塊retry_task / silence_alert 等)。
### ⑤ Audit Trail雙寫強制
每次 Agent 介入都要:
1.`ai_insights` (`insight_type='agent_action'`, metadata 含 `tier`, `agent`, `action_taken`, `confidence`, `latency_ms`)
2. Telegram 訊息末尾加隱藏式 `source_insight_id` 以便追蹤
### ⑥ Fallback 降級鏈SLA 保證)
```
L2 NemoTron 掛 → L1 Hermes規則模式→ L0 模板直出
L1 Hermes 掛 → L0 模板直出 + 🟡 「AI 分析暫不可用」
通知通道掛 → 本地 file queue 暫存ADR-009 pattern
```
**關鍵 SLA**:無論 AI 狀況,**通知鏈絕不中斷** — 這是 P0 底線。
### ⑦ 成本配額(漏斗型,沿用 ADR-001
| Agent | 日呼叫上限 | 超額動作 |
|-------|-----------|---------|
| Hermes本機 Ollama| 無限制 | — |
| NemoTronNIM cloud | 80 次/日(現值) | 走 Hermes 規則ADR-004 |
| OpenClawGemini | 依週期觸發 | 不適用(離線批次) |
## Consequences
**正面**
- 通知從「原始 stack trace 直丟」升級到「AI 摘要 + 原始事實 + 建議行動」
- 已知 known issue如 DNS 暫斷)可由 NemoTron 自動重試,無需人工介入
- 三級邊界清楚,審計可追溯(`ai_insights` 雙寫 + tier 標記)
- AI 掛掉有完整降級鏈,通知鏈不會塌
**負面 / 風險**
- EventRouter 增加一個中間層,延遲 2-15s依 Tier
- AI 成本上升Hermes 本機還好NIM quota 要盯)
- 新增的 3 個 L2 tool 未實戰測試,前 2 週需觀察
- 若 Hermes prompt 寫不好AI 摘要可能誤導 → 每月檢視 `agent_action` insights 的 feedback_down
## 實施計畫(階段性)
- **Phase 1本 ADR 同步提交)**EventRouter 骨幹 + agent_actions stub + triaged_alert 模板
- **Phase 2**Hermes L1 接入 scheduler.run_auto_import_task + run_momo_task 兩個 exception
- **Phase 3**NemoTron 擴充 3 個新 tool (retry/query_km/silence)
- **Phase 4**:依需求擴 L3 HITL 按鈕
- **Phase 5**Prometheus metric 接入(`agent_action_total{tier,agent,event_type}``agent_latency_seconds`
## References
- `services/event_router.py` — 分流入口Phase 1
- `services/agent_actions.py` — 安全 action 白名單Phase 1
- `services/telegram_templates.py::triaged_alert()` — L1/L2 訊息格式Phase 1
- `memory/feedback_agent_action_ladder.md` — 操作規範
- `memory/reference_telegram_endpoints_map.md` — 21 個發送點盤點

175
services/agent_actions.py Normal file
View File

@@ -0,0 +1,175 @@
"""
Agent Action 白名單ADR-012 Phase 1 骨幹)
L2 NemoTron 可安全呼叫的動作集合。嚴格限制:
- 只能寫 ai_insights 和發 Telegram
- 不可動 prod 資料表 / 容器 / 外部系統
- 所有 action 必須 dual-write 審計軌跡
現階段為 **stub + 完整 interface**,供 event_router 串接。真實執行邏輯將於 Phase 3 填入。
"""
from __future__ import annotations
import time
from datetime import datetime, timedelta
from typing import Any
from services.logger_manager import SystemLogger
sys_log = SystemLogger("AgentAction").get_logger()
# 靜音表記憶體快取重啟後清空Phase 3 可改 DB 持久化)
_silence_table: dict[str, datetime] = {}
def _audit(action: str, params: dict, result: dict, latency_ms: float) -> int | None:
"""所有 action 統一審計入 ai_insightsADR-007 Dual-Write"""
try:
from services.openclaw_learning_service import store_insight
return store_insight(
insight_type="agent_action",
content=f"action={action} result={result.get('status', 'unknown')}",
period=datetime.now().strftime("%Y-%m-%d"),
metadata={
"action": action,
"params": params,
"result": result,
"latency_ms": latency_ms,
"ts": datetime.now().isoformat(),
},
)
except Exception as e:
sys_log.error(f"[AgentAction] audit 失敗 action={action}: {e}")
return None
# =====================================================================
# 🔁 retry_task — 安全重試exponential backoff
# =====================================================================
def retry_task(task_name: str, max_attempts: int = 3, backoff_sec: int = 60) -> dict:
"""
安全重試一個 scheduler task。Phase 1 stub只記錄不真正重試。
Phase 3 將接入 scheduler.py 的 task dispatch。
限制task_name 必須在白名單內(避免任意程式碼執行)
"""
ALLOWED_TASKS = {
"run_auto_import_task", "run_momo_task", "run_edm_task",
"run_competitor_price_feeder_task", "run_backup_monitor_task",
"run_icaim_analysis_task",
}
t0 = time.time()
if task_name not in ALLOWED_TASKS:
result = {"status": "rejected", "reason": f"task '{task_name}' not in whitelist"}
_audit("retry_task", {"task_name": task_name}, result, (time.time() - t0) * 1000)
sys_log.warning(f"[AgentAction] retry_task 拒絕:{task_name} 不在白名單")
return result
# TODO Phase 3: 真實重試邏輯(呼叫 scheduler module 的 task function
result = {
"status": "queued",
"task_name": task_name,
"max_attempts": max_attempts,
"backoff_sec": backoff_sec,
"note": "Phase 1 stub — 尚未真正重試,僅記錄意圖",
}
_audit("retry_task", {"task_name": task_name, "max_attempts": max_attempts},
result, (time.time() - t0) * 1000)
sys_log.info(f"[AgentAction] retry_task 已排隊stub: {task_name}")
return result
# =====================================================================
# 🔍 query_km — RAG 查詢歷史同類事件
# =====================================================================
def query_km(query: str, insight_type: str | None = None, limit: int = 5) -> dict:
"""透過 openclaw_learning_service.build_rag_context 找歷史同類事件"""
t0 = time.time()
try:
from services.openclaw_learning_service import build_rag_context
context = build_rag_context(query=query, insight_type=insight_type)
result = {
"status": "ok",
"query": query,
"context_preview": (context or "")[:500],
"has_results": bool(context and context.strip()),
}
except Exception as e:
result = {"status": "error", "error": str(e)[:200]}
sys_log.error(f"[AgentAction] query_km 失敗: {e}")
_audit("query_km", {"query": query, "insight_type": insight_type, "limit": limit},
result, (time.time() - t0) * 1000)
return result
# =====================================================================
# 🔕 silence_alert — 靜音抑制(避免告警風暴)
# =====================================================================
def silence_alert(event_key: str, duration_min: int = 60) -> dict:
"""
對特定 event_key 設定靜音期限。EventRouter 在 dispatch 前會先檢查。
event_key 建議格式:"<source>:<event_type>",例:
"Scheduler.AutoImport:db_connection_error"
"""
t0 = time.time()
until = datetime.now() + timedelta(minutes=duration_min)
_silence_table[event_key] = until
result = {"status": "silenced", "event_key": event_key, "until": until.isoformat()}
_audit("silence_alert", {"event_key": event_key, "duration_min": duration_min},
result, (time.time() - t0) * 1000)
sys_log.info(f"[AgentAction] silence_alert: {event_key} → 靜音至 {until.strftime('%H:%M')}")
return result
def is_silenced(event_key: str) -> bool:
"""EventRouter 呼叫,判斷是否需略過此事件"""
until = _silence_table.get(event_key)
if until is None:
return False
if datetime.now() >= until:
_silence_table.pop(event_key, None)
return False
return True
# =====================================================================
# 🏷️ 三個既有 NemoTron tool 的 wrapper供 event_router 統一調用)
# =====================================================================
def flag_for_human_review(sku: str, concern: str) -> dict:
"""升級到 L3 HITL包裝 NemoTron 既有 tool保持呼叫介面一致"""
t0 = time.time()
# TODO Phase 3: 接入 nemoton_dispatcher_service._exec_flag_for_human_review
result = {"status": "stub", "sku": sku, "concern": concern,
"note": "Phase 1 stubPhase 3 接 NemoTron"}
_audit("flag_for_human_review", {"sku": sku, "concern": concern},
result, (time.time() - t0) * 1000)
return result
def route_to_km(sku: str, domain: str, summary: str) -> dict:
"""KM 歸檔Phase 3 接 NemoTron"""
t0 = time.time()
result = {"status": "stub", "note": "Phase 3 接 NemoTron"}
_audit("route_to_km", {"sku": sku, "domain": domain}, result, (time.time() - t0) * 1000)
return result
def mark_for_relearn(sku: str, reason: str) -> dict:
"""標記重新訓練Phase 3 接 NemoTron"""
t0 = time.time()
result = {"status": "stub", "note": "Phase 3 接 NemoTron"}
_audit("mark_for_relearn", {"sku": sku, "reason": reason}, result, (time.time() - t0) * 1000)
return result
# 白名單(供 EventRouter / NemoTron 引用)
SAFE_ACTIONS: dict[str, Any] = {
"retry_task": retry_task,
"query_km": query_km,
"silence_alert": silence_alert,
"flag_for_human_review": flag_for_human_review,
"route_to_km": route_to_km,
"mark_for_relearn": mark_for_relearn,
}

285
services/event_router.py Normal file
View File

@@ -0,0 +1,285 @@
"""
EventRouter — 事件分流入口ADR-012 Phase 1 骨幹)
所有系統事件exception / 排程完成 / 告警 / 資訊通報)**應**統一透過
`dispatch(event)` 進入,由 EventRouter 依 severity × event_type 分流到:
L0 Direct / L1 Hermes Observer / L2 NemoTron Investigator / L3 OpenClaw Operator
設計原則ADR-012 §⑥):無論 AI 狀況,**通知鏈絕不中斷**。
每一級失敗立即降級到下一級,最終保底 L0 直出模板。
Phase 1 實作範圍:
- 骨幹 + 分類邏輯
- L0 模板直出(已可用)
- L1 Hermes / L2 NemoTron / L3 OpenClaw 為 stub附 TODO 標記)
- 完整 fallback 鏈AI 掛必降級)
- 靜音檢查 + Audit Trail
"""
from __future__ import annotations
import os
import time
from datetime import datetime
from enum import Enum
from typing import Any
import requests
from services.logger_manager import SystemLogger
from services import telegram_templates as tpl
from services import agent_actions
sys_log = SystemLogger("EventRouter").get_logger()
class Tier(str, Enum):
L0_DIRECT = "L0"
L1_OBSERVER = "L1"
L2_INVESTIGATOR = "L2"
L3_OPERATOR = "L3"
class Severity(str, Enum):
INFO = "info"
SUCCESS = "success"
WARNING = "warning"
ALERT = "alert" # P0/P1
# =====================================================================
# 分類規則ADR-012 §③)
# =====================================================================
def _classify(event: dict) -> Tier:
sev = event.get("severity", "info")
has_trace = bool(event.get("trace"))
event_type = event.get("event_type", "")
# L3 OpenClaw 由週期任務主動觸發週報、Meta-Analysis不走 router
# 這裡只處理 L0/L1/L2
if sev in (Severity.INFO, Severity.SUCCESS):
return Tier.L0_DIRECT
if sev == Severity.WARNING:
# 有技術 trace → L1 Hermes 翻譯
return Tier.L1_OBSERVER if has_trace else Tier.L0_DIRECT
if sev == Severity.ALERT:
# 符合 L2 白名單 event_type → NemoTron 介入
L2_EVENT_TYPES = {
"price_threat", "db_connection_error", "crawler_timeout",
"nim_quota_exhausted", "embedding_failure",
}
if event_type in L2_EVENT_TYPES:
return Tier.L2_INVESTIGATOR
return Tier.L1_OBSERVER
return Tier.L0_DIRECT
# =====================================================================
# 主入口
# =====================================================================
def dispatch(event: dict, admin_chat_ids: list[int] | None = None) -> dict:
"""
主要入口。回傳 dict: {tier, sent, insight_id, errors, latency_ms}
event 格式見 ADR-012 §③。必要欄位source, event_type, severity, title, summary
可選欄位trace, payload, time
admin_chat_ids 若未給,從 DB telegram_users where is_admin=true 取
"""
t0 = time.time()
event_key = f"{event.get('source', '?')}:{event.get('event_type', '?')}"
# 靜音檢查
if agent_actions.is_silenced(event_key):
sys_log.info(f"[EventRouter] 事件被靜音略過: {event_key}")
return {"tier": "silenced", "sent": 0, "event_key": event_key}
tier = _classify(event)
sys_log.info(f"[EventRouter] dispatch {event_key}{tier.value}")
# 執行對應 Tier
try:
if tier == Tier.L0_DIRECT:
text = _render_l0(event)
elif tier == Tier.L1_OBSERVER:
text = _render_l1_with_fallback(event)
elif tier == Tier.L2_INVESTIGATOR:
text = _render_l2_with_fallback(event)
else:
text = _render_l0(event) # 未知 Tier 保底
except Exception as e:
sys_log.error(f"[EventRouter] 渲染失敗,降級 L0: {e}")
text = _render_l0(event)
# 發送 Telegram
result = _send(text, admin_chat_ids)
result["tier"] = tier.value
result["event_key"] = event_key
result["latency_ms"] = round((time.time() - t0) * 1000, 1)
# 審計(每次 dispatch 都入 KM
_audit_dispatch(event, tier, result)
return result
# =====================================================================
# Tier 渲染器
# =====================================================================
def _render_l0(event: dict) -> str:
"""L0 直出:根據 severity 選用對應模板"""
sev = event.get("severity", "info")
title = event.get("title", "未命名事件")
module = event.get("source", "unknown")
summary = event.get("summary", "")
details = event.get("payload") if isinstance(event.get("payload"), dict) else None
if sev == Severity.SUCCESS:
return tpl.success(title=title, module=module, stats=summary)
if sev == Severity.INFO:
return tpl.info(title=title, module=module, content=summary)
if sev == Severity.WARNING:
return tpl.warning(title=title, module=module, summary=summary, details=details)
# alert 但降級到 L0
return tpl.alert(
title=title, module=module,
status=event.get("status", "未知"),
impact=event.get("impact", "未評估"),
summary=summary,
actions=event.get("suggested_actions"),
trace=event.get("trace"),
)
def _render_l1_with_fallback(event: dict) -> str:
"""L1 Hermes 翻譯 stack trace。Phase 1 stub — 直接降 L0 + 標記"""
# TODO Phase 2: 呼叫 Hermes 做 stack trace 翻譯與摘要
try:
ai_summary = _hermes_observe(event) # stub
if ai_summary:
return _compose_triaged(event, tier_label="L1 · Hermes", ai_summary=ai_summary)
except Exception as e:
sys_log.warning(f"[EventRouter] L1 Hermes 失敗,降 L0: {e}")
# FallbackL0 模板 + 降級標記
text = _render_l0(event)
return text + "\n\n🟡 _AI 分析暫不可用以原始資料呈現_"
def _render_l2_with_fallback(event: dict) -> str:
"""L2 NemoTron 介入(含 tool call。Phase 1 stub — 降 L1"""
# TODO Phase 3: 呼叫 NemoTron dispatcher允許執行 SAFE_ACTIONS 中的 tool
try:
ai_result = _nemoton_investigate(event) # stub
if ai_result:
return _compose_triaged(
event, tier_label="L2 · NemoTron",
ai_summary=ai_result.get("summary", ""),
ai_actions=ai_result.get("actions_taken", []),
)
except Exception as e:
sys_log.warning(f"[EventRouter] L2 NemoTron 失敗,降 L1: {e}")
return _render_l1_with_fallback(event)
def _compose_triaged(event: dict, tier_label: str, ai_summary: str,
ai_actions: list | None = None) -> str:
"""三層式訊息AI 摘要 + 原始事實 + 建議行動ADR-012 §④)"""
base = _render_l0(event)
parts = [f"🤖 *AI 摘要({tier_label}*", ai_summary, ""]
if ai_actions:
parts.append("🛠️ *AI 已執行動作:*")
for a in ai_actions:
parts.append(f"{a}")
parts.append("")
return base + "\n\n" + "\n".join(parts)
# =====================================================================
# AI 介入 stubPhase 2/3 填實作)
# =====================================================================
def _hermes_observe(event: dict) -> str | None:
"""Phase 1 stubPhase 2 會呼叫 hermes_analyst_service"""
# 不呼叫 AI回傳 None 讓上層降級
return None
def _nemoton_investigate(event: dict) -> dict | None:
"""Phase 1 stubPhase 3 會呼叫 nemoton_dispatcher_service"""
return None
# =====================================================================
# Telegram 發送 + Audit
# =====================================================================
def _send(text: str, admin_chat_ids: list[int] | None) -> dict:
token = os.getenv("TELEGRAM_BOT_TOKEN")
if not token:
return {"sent": 0, "errors": ["TELEGRAM_BOT_TOKEN 未設定"]}
if admin_chat_ids is None:
admin_chat_ids = _load_admin_chat_ids()
if not admin_chat_ids:
return {"sent": 0, "errors": ["無管理員 chat_id"]}
url = f"https://api.telegram.org/bot{token}/sendMessage"
sent = 0
errors = []
for cid in admin_chat_ids:
try:
r = requests.post(url, json={
"chat_id": int(cid), "text": text, "parse_mode": "Markdown",
}, timeout=10)
if r.ok:
sent += 1
else:
errors.append(f"chat={cid} status={r.status_code}")
except Exception as e:
errors.append(f"chat={cid} exc={e}")
return {"sent": sent, "errors": errors}
def _load_admin_chat_ids() -> list[int]:
"""從 DB 撈 is_admin=true 的 chat_idfallback 到 .env TELEGRAM_CHAT_IDS"""
try:
from database.manager import get_session
from sqlalchemy import text as sa_text
session = get_session()
try:
rows = session.execute(sa_text(
"SELECT telegram_id FROM telegram_users WHERE is_active=true AND is_admin=true"
)).fetchall()
if rows:
return [int(r[0]) for r in rows]
finally:
session.close()
except Exception as e:
sys_log.warning(f"[EventRouter] 查 telegram_users 失敗fallback .env: {e}")
# Fallback 到 env
import re
raw = os.getenv("TELEGRAM_CHAT_IDS", "")
return [int(x.strip()) for x in re.sub(r'[\[\]"\' ]', "", raw).split(",") if x.strip()]
def _audit_dispatch(event: dict, tier: Tier, result: dict) -> None:
"""每次 dispatch 都寫入 ai_insights 作為審計軌跡"""
try:
from services.openclaw_learning_service import store_insight
store_insight(
insight_type="agent_action",
content=f"dispatch tier={tier.value} event={event.get('event_type')}",
period=datetime.now().strftime("%Y-%m-%d"),
metadata={
"tier": tier.value,
"event": {k: event.get(k) for k in ("source", "event_type", "severity", "title")},
"result": {k: result.get(k) for k in ("sent", "latency_ms", "errors")},
"ts": datetime.now().isoformat(),
},
)
except Exception as e:
sys_log.error(f"[EventRouter] audit 失敗: {e}")

View File

@@ -332,29 +332,34 @@ def _send_price_decision_requests(recs: list, period_str: str, source_insight_id
sys_log.info(f"[OCStrategist] 降價決策推送 insight_id={rec_insight_id}{len(admin_ids)} 位管理員")
def _notify_telegram_group(report_md: str, period_str: str):
def _notify_telegram_group(report_md: str, period_str: str, report_type: str = "週報") -> None:
"""
推送至 Telegram
推送策略報告至 Telegram 群組(已套用 telegram_templates.report() 統一格式)。
ADR-012 備註:週報類為 L3 OpenClaw 的週期性輸出,不經 event_router。
"""
bot_token = os.getenv('OPENCLAW_BOT_TOKEN', '8610496165:AAFOlcWV4oRUSC2TI-fYux7JV97fjNzsYR8')
chat_id = os.getenv('OPENCLAW_GROUP_ID', '-1003940688311')
# Telegram 貼文過長則截斷
msg = f"📣 **[Gemini 策略師] {period_str} 週報已出爐!**\n\n{report_md}"
if len(msg) > 3500:
msg = msg[:3500] + "\n\n... (報告過長已截斷,請至知識庫查看全文)"
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
payload = {
"chat_id": chat_id,
"text": msg,
"parse_mode": "Markdown"
}
bot_token = os.getenv("TELEGRAM_BOT_TOKEN") or os.getenv("OPENCLAW_BOT_TOKEN")
chat_id = os.getenv("OPENCLAW_GROUP_ID", "-1003940688311")
if not bot_token:
sys_log.warning("[OCStrategist] TELEGRAM_BOT_TOKEN 未設定,略過週報推播")
return
from services.telegram_templates import report as render_report
msg = render_report(
title="AI 策略報告已出爐",
report_type=report_type,
period=period_str,
content_md=report_md,
)
try:
requests.post(url, json=payload, timeout=10)
sys_log.info("[OCStrategist] Telegram 週報推送成功。")
requests.post(
f"https://api.telegram.org/bot{bot_token}/sendMessage",
json={"chat_id": chat_id, "text": msg, "parse_mode": "Markdown"},
timeout=10,
)
sys_log.info(f"[OCStrategist] Telegram {report_type}推送成功")
except Exception as e:
sys_log.error(f"[OCStrategist] Telegram 週報推送失敗: {e}")
sys_log.error(f"[OCStrategist] Telegram {report_type}推送失敗: {e}")
def generate_meta_analysis_report() -> str:
"""