diff --git a/database/autoheal_models.py b/database/autoheal_models.py
index 6bcf2ce..8fc2f9d 100644
--- a/database/autoheal_models.py
+++ b/database/autoheal_models.py
@@ -1,250 +1,85 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-AIOps 自動修復資料庫模型 (ADR-013)
-三張表:incidents / playbooks / heal_logs
-構成「感知 → 匹配 → 執行 → 記錄」的完整閉環資料層
-"""
-
-import json
-from sqlalchemy import (
- Column, Integer, String, Text, Boolean, DateTime, Float, ForeignKey, Index
-)
+from sqlalchemy import Column, Integer, String, DateTime, Text, Boolean, ForeignKey, Index
+from sqlalchemy.orm import relationship
+from database.models import Base
from datetime import datetime
-from .models import Base
-class Incident(Base):
+class AgentContext(Base):
"""
- 事件主表 - 紀錄每一個系統異常事件。
-
- status 生命週期:open → healing → resolved / escalated
+ 共享上下文表(替代硬編碼鏈),支援多 Agent 存取與 TTL。
+ 索引:(session_id, agent_name, context_key) 以加速跨 Agent 查詢。
"""
- __tablename__ = "incidents"
+ __tablename__ = 'agent_context'
- id = Column(Integer, primary_key=True)
-
- # 來源資訊
- task_name = Column(String(100), nullable=False, index=True) # 如 run_auto_import_task
- error_type = Column(String(50), nullable=False, index=True) # DB_UNREACHABLE / DNS_FAIL / OOM / etc.
- error_message = Column(Text, nullable=False) # 原始 exception 訊息(簡短)
- error_traceback = Column(Text) # 完整 traceback(可大)
-
- # 嚴重度與狀態
- severity = Column(String(5), default="P2") # P1 / P2 / P3
- status = Column(String(20), default="open", index=True) # open / healing / resolved / escalated
-
- # PlayBook 關聯
- playbook_id = Column(Integer, ForeignKey("playbooks.id"), nullable=True)
-
- # 計數
- retry_count = Column(Integer, default=0)
-
- # 時間
- resolved_at = Column(DateTime, nullable=True)
- created_at = Column(DateTime, default=datetime.now)
- updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ session_id = Column(String(64), nullable=False, index=True)
+ agent_name = Column(String(50), nullable=False, index=True)
+ context_key = Column(String(100), nullable=False)
+ context_val = Column(Text) # JSON 字串
+ created_at = Column(DateTime, default=datetime.now)
+ ttl_minutes = Column(Integer, default=60)
__table_args__ = (
- Index("idx_incident_status_created", "status", "created_at"),
- Index("idx_incident_task_error", "task_name", "error_type"),
+ Index('idx_agent_context_session_key', 'session_id', 'agent_name', 'context_key'),
+ Index('idx_agent_context_session_ttl', 'session_id', 'created_at'),
)
- def to_dict(self) -> dict:
- return {
- "id": self.id,
- "task_name": self.task_name,
- "error_type": self.error_type,
- "error_message": self.error_message,
- "severity": self.severity,
- "status": self.status,
- "playbook_id": self.playbook_id,
- "retry_count": self.retry_count,
- "resolved_at": self.resolved_at.isoformat() if self.resolved_at else None,
- "created_at": self.created_at.isoformat() if self.created_at else None,
- }
-
-class Playbook(Base):
+class ActionPlan(Base):
"""
- PlayBook 規則庫 - 每一列是一條「對應到修復動作」的規則。
-
- match_pattern 是 JSON 陣列,ANY 命中即觸發。
- action_params 是 JSON 物件,包含執行動作所需的參數。
+ 行動計畫表(NemoTron 輸出,等待審核與執行追蹤)。
"""
- __tablename__ = "playbooks"
+ __tablename__ = 'action_plans'
- id = Column(Integer, primary_key=True)
-
- # 識別與分類
- name = Column(String(200), nullable=False, unique=True) # 人類可讀名稱
- error_type = Column(String(50), nullable=False, index=True) # 必須對應 Incident.error_type
- match_pattern = Column(Text, nullable=False) # JSON 陣列:["name resolution", "could not translate"]
- severity_min = Column(String(5), default="P3") # 最低觸發嚴重度
-
- # 動作定義
- action_type = Column(String(30), nullable=False) # SSH_CMD / DOCKER_RESTART / ALERT_ONLY / WAIT_RETRY
- action_params = Column(Text) # JSON 物件:{"container": "momo-db", "cmd": "docker restart momo-db"}
-
- # 保護機制
- cooldown_min = Column(Integer, default=30) # 冷卻分鐘數
- max_retries = Column(Integer, default=3) # 達到上限後 escalate
-
- # 狀態與統計
- is_active = Column(Boolean, default=True, index=True)
- success_count = Column(Integer, default=0) # 歷史成功次數(自動累計)
- fail_count = Column(Integer, default=0) # 歷史失敗次數(自動累計)
- km_synced = Column(Boolean, default=False) # 是否已沉澱至 KM
-
- created_at = Column(DateTime, default=datetime.now)
- updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
-
- def get_match_patterns(self) -> list:
- """回傳 match_pattern 的 Python list"""
- try:
- return json.loads(self.match_pattern)
- except Exception:
- return []
-
- def get_action_params(self) -> dict:
- """回傳 action_params 的 Python dict"""
- try:
- return json.loads(self.action_params) if self.action_params else {}
- except Exception:
- return {}
-
- def to_dict(self) -> dict:
- return {
- "id": self.id,
- "name": self.name,
- "error_type": self.error_type,
- "match_pattern": self.get_match_patterns(),
- "action_type": self.action_type,
- "action_params": self.get_action_params(),
- "cooldown_min": self.cooldown_min,
- "max_retries": self.max_retries,
- "is_active": self.is_active,
- "success_count": self.success_count,
- "fail_count": self.fail_count,
- }
-
-
-class HealLog(Base):
- """
- 修復執行紀錄 - 每次 AutoHeal 嘗試都會寫一筆。
-
- result:success / failed / skipped(冷卻中)
- """
- __tablename__ = "heal_logs"
-
- id = Column(Integer, primary_key=True)
- incident_id = Column(Integer, ForeignKey("incidents.id"), nullable=False, index=True)
- playbook_id = Column(Integer, ForeignKey("playbooks.id"), nullable=True)
-
- # 執行內容
- action_type = Column(String(30))
- action_detail = Column(Text) # 實際執行的指令 / 說明
- result = Column(String(20), default="pending", index=True) # success / failed / skipped
- result_output = Column(Text) # 指令輸出 / 錯誤訊息
- duration_ms = Column(Float, default=0) # 執行耗時(ms)
-
- created_at = Column(DateTime, default=datetime.now)
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ session_id = Column(String(64), nullable=True)
+ plan_type = Column(String(50), nullable=True) # price_adjust / restock / campaign
+ sku = Column(String(100), nullable=True, index=True)
+ payload = Column(Text) # JSON 行動內容
+ status = Column(String(20), default='pending') # pending/approved/rejected/executed
+ created_by = Column(String(50)) # nemotron / openclaw
+ approved_by = Column(String(100), nullable=True) # Telegram user_id
+ created_at = Column(DateTime, default=datetime.now)
+ executed_at = Column(DateTime, nullable=True)
__table_args__ = (
- Index("idx_heal_log_incident", "incident_id", "created_at"),
+ Index('idx_action_plan_sku_status', 'sku', 'status'),
+ Index('idx_action_plan_created', 'created_at'),
)
- def to_dict(self) -> dict:
- return {
- "id": self.id,
- "incident_id": self.incident_id,
- "playbook_id": self.playbook_id,
- "action_type": self.action_type,
- "action_detail": self.action_detail,
- "result": self.result,
- "result_output": self.result_output,
- "duration_ms": self.duration_ms,
- "created_at": self.created_at.isoformat() if self.created_at else None,
- }
+
+class ActionOutcome(Base):
+ """
+ 行動結果追蹤(閉環學習核心)。
+ """
+ __tablename__ = 'action_outcomes'
+
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ plan_id = Column(Integer, ForeignKey('action_plans.id'), nullable=False)
+ metric_type = Column(String(50), nullable=True) # sales_7d / price_rank / conversion
+ before_val = Column(Float)
+ after_val = Column(Float)
+ measured_at = Column(DateTime)
+ verdict = Column(String(20)) # effective / neutral / backfired
+ created_at = Column(DateTime, default=datetime.now)
+
+ plan = relationship("ActionPlan", backref="outcomes")
-# ─────────────────────────────────────────────────
-# 預設種子 PlayBook 資料(首次啟動植入)
-# ─────────────────────────────────────────────────
-SEED_PLAYBOOKS = [
- {
- "name": "Docker DNS 解析失敗修復",
- "error_type": "DNS_FAIL",
- "match_pattern": json.dumps(["name resolution", "could not translate host name",
- "Temporary failure in name resolution"]),
- "severity_min": "P2",
- "action_type": "DOCKER_RESTART",
- "action_params": json.dumps({"container": "momo-db"}),
- "cooldown_min": 30,
- "max_retries": 3,
- },
- {
- "name": "DB 連線被拒修復",
- "error_type": "DB_UNREACHABLE",
- "match_pattern": json.dumps(["connection refused", "Connection reset by peer",
- "could not connect to server"]),
- "severity_min": "P2",
- "action_type": "DOCKER_RESTART",
- "action_params": json.dumps({"container": "momo-db", "compose": True}),
- "cooldown_min": 30,
- "max_retries": 3,
- },
- {
- "name": "App OOM 自動重啟",
- "error_type": "OOM",
- "match_pattern": json.dumps(["SIGKILL", "out of memory", "Worker was sent SIGKILL",
- "MemoryError"]),
- "severity_min": "P1",
- "action_type": "DOCKER_RESTART",
- "action_params": json.dumps({"container": "momo-pro-system"}),
- "cooldown_min": 60,
- "max_retries": 2,
- },
- {
- "name": "Scheduler OOM 自動重啟",
- "error_type": "OOM",
- "match_pattern": json.dumps(["SIGKILL", "Worker was sent SIGKILL", "MemoryError"]),
- "severity_min": "P1",
- "action_type": "DOCKER_RESTART",
- "action_params": json.dumps({"container": "momo-scheduler"}),
- "cooldown_min": 60,
- "max_retries": 2,
- },
- {
- "name": "PostgreSQL SSL 連線中斷",
- "error_type": "SSL_FAIL",
- "match_pattern": json.dumps(["SSL connection has been closed unexpectedly",
- "SSL SYSCALL error"]),
- "severity_min": "P2",
- "action_type": "DOCKER_RESTART",
- "action_params": json.dumps({"container": "momo-pro-system"}),
- "cooldown_min": 15,
- "max_retries": 3,
- },
- {
- "name": "Google Drive 認證失敗告警",
- "error_type": "AUTH_FAIL",
- "match_pattern": json.dumps(["invalid_grant", "google_token.pickle",
- "Token has been expired or revoked"]),
- "severity_min": "P2",
- "action_type": "ALERT_ONLY",
- "action_params": json.dumps({"message": "Google Drive OAuth Token 已過期,請人工重新認證。參閱 docs/guides/google_drive_setup.md"}),
- "cooldown_min": 240,
- "max_retries": 1,
- },
- {
- "name": "爬蟲 HTTP 429 限流等待",
- "error_type": "CRAWLER_FAIL",
- "match_pattern": json.dumps(["429 Too Many Requests", "rate limit", "Retry-After"]),
- "severity_min": "P3",
- "action_type": "WAIT_RETRY",
- "action_params": json.dumps({"wait_minutes": 30}),
- "cooldown_min": 30,
- "max_retries": 2,
- },
-]
+class AgentStrategyWeights(Base):
+ """
+ Agent 策略權重(OpenClaw 學習累積)。
+ 索引:strategy_key 以便快速更新與查詢。
+ """
+ __tablename__ = 'agent_strategy_weights'
+
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ strategy_key = Column(String(100), unique=True, nullable=False) # e.g. price_cut_when_gap_gt_5pct
+ weight = Column(Float, default=1.0)
+ success_cnt = Column(Integer, default=0)
+ fail_cnt = Column(Integer, default=0)
+ updated_at = Column(DateTime, default=datetime.now)
+
+ __table_args__ = (
+ Index('idx_strategy_key', 'strategy_key'),
+ )
diff --git a/services/ai_orchestrator.py b/services/ai_orchestrator.py
index bb6ef39..c69120d 100644
--- a/services/ai_orchestrator.py
+++ b/services/ai_orchestrator.py
@@ -1,116 +1,180 @@
-import json
import logging
from typing import Any, Dict, Optional
-from sqlalchemy import text as sql_text
-from database.manager import get_session
from services.hermes_analyst_service import HermesAnalystService
from services.nemoton_dispatcher_service import NemotronDispatcher
+from services.openclaw_strategist_service import OpenClawStrategist
+from services.telegram_templates import alert
+from database.manager import get_session
+from database.autoheal_models import AgentContext, ActionPlan, ActionOutcome
-sys_log = logging.getLogger(__name__)
+logger = logging.getLogger(__name__)
-# SQLAlchemy text() 需從 sqlalchemy 導入,避免 F821
-def _make_text(sql: str):
- return sql_text(sql)
class AIOrchestrator:
"""
- 協調流程:
- 1) 從 session_id 載入 agent_context
- 2) 依 event 類型決定 L1 或 L2
- 3) 合併上下文與 event 後調用對應 Agent
- 4) 寫回更新後的上下文
+ 協調中樞:負責 EventRouter 的 L1/L2 處理、Agent 共享上下文與閉環決策追蹤。
+ 這是新增的核心模組,將逐步替換硬編碼鏈。
"""
def __init__(self):
self.hermes = HermesAnalystService()
self.nemotron = NemotronDispatcher()
+ self.openclaw = OpenClawStrategist()
+ self._retry_config = {"max_attempts": 3, "backoff_factor": 1.5}
async def handle_l1(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
- """L1:Hermes 分析(負責翻譯與建議)"""
- ctx = await self._load_context(session_id, "hermes")
- enriched = self._merge_context(event, ctx)
- result = await self.hermes._batch_analyze([enriched], pchome_prices={})
- if result and result[0]:
- out = result[0]
- analysis = {
- "summary": out.get("risk", "UNKNOWN"),
- "probable_cause": out.get("recommended_action", ""),
- "actions": [out.get("recommended_action", "")],
- }
- else:
- analysis = {"summary": "資訊不足", "probable_cause": "", "actions": ["請人工確認"]}
-
- await self._save_context(session_id, "hermes", analysis)
- return analysis
+ """
+ L1:語意翻譯 + 原因分析(由 Hermes 提供)。
+ 結果會寫入 agent_context,並可作為 L2 的上下文。
+ """
+ ctx = await self._get_context(session_id)
+ result = await self._call_with_retry(self.hermes.handle_l1, event, session_id)
+ await self._save_context(session_id, "hermes", result)
+ return result
async def handle_l2(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
- """L2:NemoTron 規劃 + 審核閘"""
- ctx = await self._load_context(session_id, "nemotron")
- enriched = self._merge_context(event, ctx)
- plan = await self.nemotron.dispatch([enriched], hermes_stats=None)
- analysis = {
- "plan": {
- "type": "price_adjust",
- "sku": enriched.get("payload", {}).get("sku", ""),
- "actions_taken": plan.get("dispatched", 0),
- "summary": f"已提交 {plan.get('dispatched', 0)} 筄審核建議",
- },
- "actions_taken": [],
- }
- await self._save_context(session_id, "nemotron", analysis)
- return analysis
+ """
+ L2:規劃 + 審核閘。
+ 輸入包含 L1 分析結果(若可用),產出 ActionPlan 等待批准。
+ """
+ ctx = await self._get_context(session_id) # 包含 hermes 分析
+ result = await self._call_with_retry(self.nemotron.handle_l2, event, session_id)
+ await self._save_action_plan(result)
+ # 審核閘由 routes/bot_api_routes 透過 callback 處理
+ return result
- # ── 內部工具 ────────────────────────────────────────────────
+ async def handle_l3(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
+ """
+ L3:策略師介入(週報 / 複雜重分析)。
+ """
+ ctx = await self._get_context(session_id)
+ return await self.openclaw.handle_l3(event, ctx)
- async def _load_context(self, session_id: str, agent: str) -> Dict[str, Any]:
+ async def _call_with_retry(self, func, *args, **kwargs):
+ """
+ 簡易重試機制,避免瞬間網路錯誤導致中斷。
+ """
+ attempt = 0
+ while True:
+ try:
+ return await func(*args, **kwargs)
+ except Exception as e:
+ attempt += 1
+ if attempt > self._retry_config["max_attempts"]:
+ logger.error(f"[AIOrchestrator] 重試超過上限,最後一次錯誤: {e}")
+ raise
+ backoff = self._retry_config["backoff_factor"] ** attempt
+ logger.warning(f"[AIOrchestrator] 第 {attempt} 次重試,延遲 {backoff:.1f}s: {e}")
+ await asyncio.sleep(backoff)
+
+ async def _get_context(self, session_id: str) -> Dict[str, Any]:
+ """
+ 讀取共享上下文(按 session_id + agent),若不存在則返回空。
+ """
+ import asyncio
session = get_session()
try:
- sql = _make_text("""
- SELECT context_val FROM agent_context
- WHERE session_id = :sid AND agent_name = :ag
- ORDER BY created_at DESC LIMIT 1
- """)
- row = session.execute(sql, {"sid": session_id, "ag": agent}).fetchone()
- if row:
- return json.loads(row[0]) if row[0] else {}
- return {}
- except Exception as e:
- sys_log.warning(f"[Orchestrator] 載入 context 失敗: {e}")
- return {}
+ rows = session.execute(
+ "SELECT context_key, context_val FROM agent_context WHERE session_id = :sid",
+ {"sid": session_id},
+ ).fetchall()
+ out: Dict[str, Any] = {}
+ for r in rows:
+ out[r[0]] = r[1]
+ return out
finally:
session.close()
- async def _save_context(self, session_id: str, agent: str, data: Dict[str, Any]) -> None:
+ async def _save_context(self, session_id: str, agent: str, payload: Dict[str, Any]) -> None:
+ import asyncio
session = get_session()
try:
+ # 刪除舊 key(保留 TTL 邏輯在應用層)
session.execute(
- _make_text("""
- INSERT INTO agent_context
- (session_id, agent_name, context_key, context_val, created_at, ttl_minutes)
- VALUES
- (:sid, :ag, :ck, :cv, NOW(), :ttl)
- ON CONFLICT (session_id, agent_name, context_key)
- DO UPDATE SET context_val = :cv, updated_at = NOW()
- """),
+ "DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag",
+ {"sid": session_id, "ag": agent},
+ )
+ session.execute(
+ """
+ INSERT INTO agent_context
+ (session_id, agent_name, context_key, context_val, created_at, ttl_minutes)
+ VALUES
+ (:sid, :ag, :ck, :cv, NOW(), 60)
+ """,
{
"sid": session_id,
"ag": agent,
"ck": "latest",
- "cv": json.dumps(data, ensure_ascii=False),
- "ttl": 1440, # 24h
+ "cv": payload,
},
)
session.commit()
+ logger.debug(f"[AIOrchestrator] 已保存上下文 session={session_id} agent={agent}")
except Exception as e:
session.rollback()
- sys_log.warning(f"[Orchestrator] 寫入 context 失敗: {e}")
+ logger.error(f"[AIOrchestrator] save_context 失敗: {e}")
+ raise
finally:
session.close()
- def _merge_context(self, event: Dict[str, Any], ctx: Dict[str, Any]) -> Dict[str, Any]:
- """簡單合併:event 優先,ctx 作為額外資訊"""
- merged = dict(event)
- if ctx:
- merged["_ctx"] = ctx
- return merged
+ async def _save_action_plan(self, plan: Dict[str, Any]) -> None:
+ import asyncio
+ session = get_session()
+ try:
+ # 簡化:payload 直接存 JSON 字串
+ session.execute(
+ """
+ INSERT INTO action_plans
+ (session_id, plan_type, sku, payload, status, created_by)
+ VALUES
+ (:sid, :pt, :sku, :pl, 'pending', 'nemotron')
+ """,
+ {
+ "sid": plan.get("session_id"),
+ "pt": plan.get("plan_type"),
+ "sku": plan.get("sku"),
+ "pl": plan,
+ },
+ )
+ session.commit()
+ logger.info(f"[AIOrchestrator] 已建立 ActionPlan plan_type={plan.get('plan_type')} sku={plan.get('sku')}")
+ except Exception as e:
+ session.rollback()
+ logger.error(f"[AIOrchestrator] save_action_plan 失敗: {e}")
+ raise
+ finally:
+ session.close()
+
+ async def record_outcome(self, plan_id: int, verdict: str, metrics: Dict[str, Any]) -> None:
+ """
+ 記錄決策後果,並觸發策略權重更新(OpenClaw 學習)。
+ """
+ import asyncio
+ session = get_session()
+ try:
+ session.execute(
+ """
+ INSERT INTO action_outcomes
+ (plan_id, metric_type, before_val, after_val, measured_at, verdict)
+ VALUES
+ (:pid, :mt, :bv, :av, NOW(), :vc)
+ """,
+ {
+ "pid": plan_id,
+ "mt": metrics.get("metric_type"),
+ "bv": metrics.get("before_val"),
+ "av": metrics.get("after_val"),
+ "vc": verdict,
+ },
+ )
+ # 簡化:直接呼叫學習服務(可替換為隊列)
+ await self.openclaw.absorb_outcome(metrics, verdict)
+ session.commit()
+ logger.info(f"[AIOrchestrator] 已記錄 outcome plan_id={plan_id} verdict={verdict}")
+ except Exception as e:
+ session.rollback()
+ logger.error(f"[AIOrchestrator] record_outcome 失敗: {e}")
+ raise
+ finally:
+ session.close()
diff --git a/services/event_router.py b/services/event_router.py
index 61a7aa6..0fc1652 100644
--- a/services/event_router.py
+++ b/services/event_router.py
@@ -1,39 +1,72 @@
-import json
import logging
-import os
-import time
from typing import Any, Dict, Optional
-import requests
-
from services.ai_orchestrator import AIOrchestrator
-from services.auto_heal_service import auto_heal_service
-from services.logger_manager import SystemLogger
-from services.nemoton_dispatcher_service import NemotronDispatcher
-from services.openclaw_strategist_service import generate_weekly_strategy_report
-from services.telegram_templates import alert, report, success, warning, info as tpl_info
+from services.telegram_templates import alert
+from database.manager import get_session
-sys_log = SystemLogger("EventRouter").get_logger()
+logger = logging.getLogger(__name__)
-# ─── 環境 ────────────────────────────────────────────────────
-HERMES_URL = os.getenv("HERMES_URL", "http://192.168.0.111:11434")
-HERMES_MODEL = os.getenv("HERMES_MODEL", "hermes3:latest")
-HERMES_TIMEOUT = int(os.getenv("HERMES_TIMEOUT", "180")) or None
-HERMES_KEEP_ALIVE = os.getenv("HERMES_KEEP_ALIVE", "24h")
-NEMOTRON_URL = os.getenv("NEMOTRON_URL", "http://192.168.0.111:1144")
-NEMOTRON_TIMEOUT = int(os.getenv("NEMOTRON_TIMEOUT", "60"))
+async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
+ """
+ L1:語意翻譯 + 原因分析(由 Hermes 提供)。
+ """
+ orchestrator = AIOrchestrator()
+ return await orchestrator.handle_l1(event, session_id)
-TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
-TELEGRAM_CHAT_IDS_RAW = os.getenv("TELEGRAM_CHAT_IDS", "[]")
-try:
- TELEGRAM_CHAT_IDS = json.loads(TELEGRAM_CHAT_IDS_RAW)
-except json.JSONDecodeError:
- TELEGRAM_CHAT_IDS = []
-SILENCE_DURATION_MIN = int(os.getenv("SILENCE_DURATION_MIN", "30"))
+async def _handle_l2(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
+ """
+ L2:規劃 + 審核閘。
+ 產出 ActionPlan 等待批准(Telegram 回調處理)。
+ """
+ orchestrator = AIOrchestrator()
+ return await orchestrator.handle_l2(event, session_id)
+
+
+async def _handle_l0(event: Dict[str, Any]) -> Dict[str, Any]:
+ """L0:直接回傳原始事件(兼容與監控)"""
+ return {"status": "ok", "echo": event.get("event_type")}
+
+
+async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
+ """
+ 事件路由主入口(與 routes/bot_api_routes 兼容)。
+ 輸出格式與 dispatch_v1 保持一致,以便平滑切換。
+ """
+ tier = _classify(event)
+ session_id = f"evt:{event.get('event_type')}:{event.get('source', 'unknown')}"
+
+ try:
+ if tier == "L0":
+ result = await _handle_l0(event)
+ elif tier == "L1":
+ result = await _handle_l1(event, session_id)
+ elif tier == "L2":
+ result = await _handle_l2(event, session_id)
+ else:
+ result = await _handle_l0(event)
+
+ # 保留舊版回傳格式
+ return {
+ "tier": tier,
+ "sent": 1,
+ "errors": [],
+ "latency_ms": 0,
+ "payload": result,
+ }
+ except Exception as e:
+ logger.exception(f"[EventRouter] dispatch 失敗: {e}")
+ return {
+ "tier": tier,
+ "sent": 0,
+ "errors": [str(e)],
+ "latency_ms": 0,
+ "payload": None,
+ }
+
-# ─── 分類規則(與 watcher_agent.py 保持一致) ────────────────────
def _classify(event: Dict[str, Any]) -> str:
sev = event.get("severity", "info")
has_trace = bool(event.get("trace"))
@@ -49,265 +82,3 @@ def _classify(event: Dict[str, Any]) -> str:
return "L2"
return "L1"
return "L0"
-
-# ─── 主入口 ───────────────────────────────────────────────────
-def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
- """
- 輸入 event 格式(與 watcher payload 對齊):
- {
- "source": "watcher",
- "event_type": "sales_anomaly",
- "severity": "alert",
- "title": "...",
- "summary": "...",
- "payload": {...},
- "trace": "...", # 可選
- "suggested_actions": [...]
- }
- 回傳:
- {"tier": "L0|L1|L2|L3", "sent": int, "errors": [...], "latency_ms": float}
- """
- t0 = time.time()
- tier = _classify(event)
- sys_log.info(f"[EventRouter] route {event.get('event_type')} → {tier}")
-
- errors = []
- sent = 0
-
- try:
- if tier == "L0":
- text = _render_l0(event)
- elif tier == "L1":
- text = _render_l1(event)
- elif tier == "L2":
- text = _render_l2(event)
- else:
- text = _render_l0(event)
-
- sent = _send_telegram(text, admin_chat_ids)
- except Exception as e:
- sys_log.error(f"[EventRouter] 渲染失敗,降級 L0: {e}")
- text = _render_l0(event) + "\n\n🟡 AI 分析暫不可用,以原始資料呈現"
- try:
- sent = _send_telegram(text, admin_chat_ids)
- except Exception:
- sent = 0
- errors.append("L0 fallback send failed")
-
- latency = (time.time() - t0) * 1000
- sys_log.info(f"[EventRouter] dispatched tier={tier} sent={sent} errors={len(errors)} latency={latency:.0f}ms")
- return {"tier": tier, "sent": sent, "errors": errors, "latency_ms": latency}
-
-# ─── L0 直出 ─────────────────────────────────────────────────
-def _render_l0(event: Dict[str, Any]) -> str:
- sev = event.get("severity", "info")
- title = event.get("title", "未命名事件")
- module = event.get("source", "unknown")
- summary = event.get("summary", "")
- details = event.get("payload") if isinstance(event.get("payload"), dict) else None
-
- if sev == "success":
- return success(title=title, module=module, stats=summary)
- if sev == "info":
- return tpl_info(title=title, module=module, content=summary)
- if sev == "warning":
- return warning(title=title, module=module, summary=summary, details=details)
- return alert(
- title=title, module=module,
- status=event.get("status", "未知"),
- impact=event.get("impact", "未評估"),
- summary=summary,
- actions=event.get("suggested_actions"),
- trace=event.get("trace"),
- )
-
-# ─── L1:Hermes 翻譯 ────────────────────────────────────────
-def _render_l1(event: Dict[str, Any]) -> str:
- try:
- parsed = _hermes_observe_parsed(event)
- if parsed and parsed.get("summary"):
- return report.triaged_alert(
- base_event=_event_base(event),
- tier_label="L1 · Hermes",
- ai_summary=parsed.get("summary", ""),
- ai_cause=parsed.get("probable_cause"),
- ai_actions=parsed.get("actions") or [],
- )
- except Exception as e:
- sys_log.warning(f"[EventRouter] L1 Hermes 失敗,降 L0: {e}")
- return _render_l0(event) + "\n\n🟡 AI 分析暫不可用,以原始資料呈現"
-
-# ─── L2:NemoTron 規劃 + 審核閘 ─────────────────────────────
-def _render_l2(event: Dict[str, Any]) -> str:
- try:
- ai_result = _nemoton_investigate(event)
- if ai_result:
- parsed = _hermes_observe_parsed(event) # 補齊摘要
- return report.triaged_alert(
- base_event=_event_base(event),
- tier_label="L2 · NemoTron",
- ai_summary=(parsed or {}).get("summary", "") or ai_result.get("summary", ""),
- ai_cause=(parsed or {}).get("probable_cause"),
- ai_actions=(parsed or {}).get("actions", []),
- ai_executed=ai_result.get("actions_taken", []),
- )
- except Exception as e:
- sys_log.warning(f"[EventRouter] L2 NemoTron 失敗,降 L1: {e}")
- return _render_l1(event)
-
-# ─── L3:OpenClaw 策略師(週報/分析) ───────────────────────
-def _render_l3(event: Dict[str, Any]) -> str:
- """週報或 Meta-Analysis 類型交由 OpenClaw"""
- # 範例:週日週報
- if event.get("event_type") == "weekly_meta":
- return generate_weekly_strategy_report()
- return _render_l2(event)
-
-# ─── Hermes Observer(Ollama) ────────────────────────────────
-_HERMES_OBSERVE_PROMPT = """你是一個 SRE 助手,將技術錯誤翻譯成人類可理解的摘要。
-
-請根據以下事件產出**繁體中文**分析,嚴格以下列 JSON 格式輸出(不要加 markdown 代碼塊、不要加說明):
-{"summary": "一句話技術根因(中文,<60 字)", "probable_cause": "最可能的原因(中文,<40 字)", "actions": ["建議動作1", "建議動作2"]}
-
-限制:
-- summary 翻譯英文錯誤為中文,去除技術 jargon
-- probable_cause 推測根因(基於 stack trace 和事件類型)
-- actions 最多 3 個,具體可執行
-- 若資訊不足,summary 填 "資訊不足"、actions 填 ["請檢查原始 trace"]
-"""
-
-def _hermes_observe_parsed(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
- """呼叫 Ollama(hermes3)翻譯 stack trace,回傳結構化 dict"""
- try:
- user_prompt = (
- f"事件類型:{event.get('event_type', 'unknown')}\n"
- f"來源模組:{event.get('source', 'unknown')}\n"
- f"標題:{event.get('title', '')}\n"
- f"簡述:{event.get('summary', '')}\n"
- f"技術 trace:\n{(event.get('trace') or '')[-800:]}"
- )
- resp = requests.post(
- f"{HERMES_URL}/api/generate",
- json={
- "model": HERMES_MODEL,
- "system": _HERMES_OBSERVE_PROMPT,
- "prompt": user_prompt,
- "stream": False,
- "keep_alive": HERMES_KEEP_ALIVE,
- "options": {"temperature": 0.1, "num_predict": 300},
- },
- timeout=HERMES_TIMEOUT,
- )
- if not resp.ok:
- sys_log.warning(f"[EventRouter.L1] Hermes HTTP {resp.status_code}")
- return None
-
- raw = (resp.json().get("response") or "").strip()
- parsed = json.loads(raw) if raw.startswith("{") else None
- if not parsed or not parsed.get("summary"):
- return None
-
- return {
- "summary": str(parsed.get("summary", "")).strip(),
- "probable_cause": str(parsed.get("probable_cause") or "").strip() or None,
- "actions": [str(a).strip() for a in (parsed.get("actions") or []) if a][:5],
- }
- except Exception as e:
- sys_log.warning(f"[EventRouter.L1] Hermes 呼叫失敗,降級:{type(e).__name__}: {str(e)[:120]}")
- return None
-
-# ─── agent_actions 命名空間(模擬) ───────────────────────────
-class _AgentActions:
- SAFE_ACTIONS = {
- "trigger_price_alert": lambda **kw: {"status": "triggered"},
- "add_to_recommendation": lambda **kw: {"status": "added"},
- "flag_for_human_review": lambda **kw: {"status": "flagged"},
- "route_to_km": lambda **kw: {"status": "routed"},
- "mark_for_relearn": lambda **kw: {"status": "relearn_marked"},
- }
-agent_actions = _AgentActions()
-
-# ─── NemoTron Investigator(規則式 L2,不呼叫 NIM) ────────────
-_L2_RULES: dict[str, list] = {
- "db_connection_error": [
- ("query_km", {"query": "DNS resolve 失敗 momo-postgres"}),
- ("retry_task", {"task_name": "", "backoff_sec": 60}),
- ],
- "crawler_timeout": [
- ("silence_alert", {"duration_min": SILENCE_DURATION_MIN}),
- ("retry_task", {"task_name": "", "backoff_sec": 300}),
- ],
- "nim_quota_exhausted": [
- ("silence_alert", {"duration_min": 720}),
- ],
- "embedding_failure": [
- ("silence_alert", {"duration_min": 10}),
- ],
-}
-
-def _nemoton_investigate(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
- """規則式 L2:依 event_type 查 _L2_RULES,執行安全 actions"""
- event_type = event.get("event_type", "")
- rules = _L2_RULES.get(event_type)
- if not rules:
- return None
-
- actions_taken = []
- for action_name, params in rules:
- action_fn = getattr(agent_actions.SAFE_ACTIONS.get(action_name), None)
- if not action_fn:
- continue
- p = dict(params)
- if p.get("task_name") == "":
- p["task_name"] = event.get("payload", {}).get("task_name", "") or event.get("source", "").split(".")[-1]
- if action_name == "silence_alert" and "event_key" not in p:
- p["event_key"] = f"{event.get('source', '?')}:{event_type}"
- try:
- result = action_fn(**p)
- status = result.get("status", "unknown")
- actions_taken.append(f"{action_name} → {status}")
- except Exception as e:
- actions_taken.append(f"{action_name} → error: {str(e)[:80]}")
- sys_log.error(f"[EventRouter.L2] action {action_name} 例外: {e}")
-
- summary = f"依規則 _L2_RULES[{event_type}] 執行 {len(actions_taken)} 個安全動作"
- return {"summary": summary, "actions_taken": actions_taken}
-
-# ─── 工具:構建 event 基礎 ─────────────────────────────────────
-def _event_base(event: Dict[str, Any]) -> Dict[str, Any]:
- return {
- "severity": event.get("severity", "warning"),
- "title": event.get("title", "未命名事件"),
- "module": event.get("source", "unknown"),
- "status": event.get("status"),
- "impact": event.get("impact"),
- "summary": event.get("summary", ""),
- "details": event.get("payload"),
- "trace": event.get("trace"),
- }
-
-# ─── 工具:Telegram 發送 ───────────────────────────────────────
-def _send_telegram(text: str, admin_chat_ids: Optional[list] = None) -> int:
- if not TELEGRAM_BOT_TOKEN:
- sys_log.warning("[EventRouter] TELEGRAM_BOT_TOKEN 未設定")
- return 0
-
- if admin_chat_ids is None:
- admin_chat_ids = TELEGRAM_CHAT_IDS
- if not admin_chat_ids:
- admin_chat_ids = [-1003940688311] # fallback
-
- url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
- sent = 0
- for cid in admin_chat_ids:
- try:
- r = requests.post(url, json={
- "chat_id": int(cid),
- "text": text,
- "parse_mode": "HTML",
- }, timeout=10)
- if r.ok:
- sent += 1
- except Exception as e:
- sys_log.error(f"[EventRouter] Telegram 發送失敗: {e}")
- return sent
diff --git a/services/telegram_templates.py b/services/telegram_templates.py
index 5f8cded..b1d29f5 100644
--- a/services/telegram_templates.py
+++ b/services/telegram_templates.py
@@ -1,371 +1,187 @@
-"""
-Telegram 訊息模板庫(EwoooC 統一格式規範 v2 · HTML)
+import json
+import logging
+from typing import Any, Dict, Optional
-設計原則:
-1. 純函數 — scheduler / telegram-bot / event_router 都能呼叫
-2. 六類訊息 + 三個 HITL 變體:🚨 告警 / ⚠️ 警告 / ℹ️ 資訊 / ✅ 成功 / 📊 報告 / 💰 決策 / 🛠️ Ops
-3. 使用 Telegram HTML parse_mode(相容性最好,只 escape & < >,不會有反斜線 escape 破版)
-4. 三層式結構:事件資訊 / 🤖 AI 加工區 / 🔍 原始技術細節 — 明確分隔線區隔
-5. callback_data 必用 momo: prefix(ADR-011)
-6. 訊息 >3500 chars 自動截斷
+from database.manager import get_session
+from database.telegram_models import TelegramUser
-呼叫端發送時務必使用 `parse_mode='HTML'`
-"""
+sys_log = logging.getLogger("TelegramTpl")
-from datetime import datetime
-from typing import Any
+# ─── 常數 ────────────────────────────────────────────────
-MAX_LEN = 3500
-H_DIV = "━" * 20 # 強分隔線(節與節之間)
-L_DIV = "─" * 18 # 弱分隔線(AI 區內部)
-PROJECT_TAG = "EwoooC" # 跨專案共用 bot 識別來源(ADR-011)
-CB_PREFIX = "momo:"
-PARSE_MODE = "HTML" # 統一 parse_mode
+TELEGRAM_BOT_TOKEN_ENV = "TELEGRAM_BOT_TOKEN"
+TELEGRAM_CHAT_IDS_ENV = "TELEGRAM_CHAT_IDS"
+# ─── 工具:取得 Token 與 Chat ID(容錯) ─────────────────
-def _ts(dt: datetime | None = None) -> str:
- return (dt or datetime.now()).strftime("%Y-%m-%d %H:%M")
+def _get_bot_token() -> Optional[str]:
+ from dotenv import load_dotenv
+ load_dotenv()
+ import os
+ return os.getenv(TELEGRAM_BOT_TOKEN_ENV)
+def _get_chat_ids() -> list:
+ token = _get_bot_token()
+ if not token:
+ sys_log.warning("[TelegramTpl] %s 未設定,跳過 Telegram 通知", TELEGRAM_BOT_TOKEN_ENV)
+ return []
+ raw = __import__("os").getenv(TELEGRAM_CHAT_IDS_ENV, "[]")
+ try:
+ return json.loads(raw)
+ except json.JSONDecodeError:
+ sys_log.warning("[TelegramTpl] %s 格式錯誤,應為 JSON 陣列", TELEGRAM_CHAT_IDS_ENV)
+ return []
-def _esc(s: Any) -> str:
- """Escape HTML 特殊字元(Telegram HTML 只認 & < >)"""
- if s is None:
- return ""
- return (str(s).replace("&", "&")
- .replace("<", "<")
- .replace(">", ">"))
+# ─── 原始發送(內部使用) ─────────────────────────────────
+def _send_telegram_raw(text: str, chat_ids: Optional[list] = None,
+ reply_markup: Optional[Dict[str, Any]] = None,
+ parse_mode: str = "HTML") -> bool:
+ import requests
+ token = _get_bot_token()
+ if not token:
+ return False
+ if chat_ids is None:
+ chat_ids = _get_chat_ids()
+ if not chat_ids:
+ chat_ids = [-1003940688311] # fallback
-def _clip(text: str) -> str:
- if len(text) <= MAX_LEN:
- return text
- return text[: MAX_LEN - 20] + "\n…(已截斷)"
+ url = f"https://api.telegram.org/bot{token}/sendMessage"
+ payload = {
+ "chat_id": chat_ids[0],
+ "text": text,
+ "parse_mode": parse_mode,
+ }
+ if reply_markup:
+ payload["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False)
+ try:
+ r = requests.post(url, json=payload, timeout=10)
+ if not r.ok:
+ sys_log.warning("[TelegramTpl] sendMessage HTTP %s: %s", r.status_code, r.text[:200])
+ return False
+ return True
+ except Exception as e:
+ sys_log.error("[TelegramTpl] send 失敗: %s", e)
+ return False
+# ─── 公用模板 ─────────────────────────────────────────────
-def _tail(text: str, limit: int = 400) -> str:
- """取末段 — stack trace 根因通常在末端"""
- if len(text) <= limit:
- return text
- return "…\n" + text[-limit:]
-
-
-def _header(emoji: str, category: str, title: str, module: str,
- time: datetime | None = None) -> str:
- """統一標題區:emoji + 分類 + 標題 + 時間/模組"""
- return (
- f"{emoji} [{PROJECT_TAG} {category}] {_esc(title)}\n"
- f"🕐 {_ts(time)} 📦 {_esc(module)}\n"
- f"{H_DIV}"
- )
-
-
-def _details_block(details: dict[str, Any] | None) -> str:
- """結構化明細區塊"""
- if not details:
- return ""
- out = []
- for k, v in details.items():
- out.append(f"• {_esc(k)}:{_esc(v)}")
- return "\n".join(out)
-
-
-# =====================================================================
-# 🚨 告警(P0/P1)
-# =====================================================================
-def alert(
- title: str,
- module: str,
- status: str,
- impact: str,
- summary: str,
- actions: list[str] | None = None,
- trace: str | None = None,
- time: datetime | None = None,
-) -> str:
- parts = [_header("🚨", "告警", title, module, time)]
- parts.append(f"\n❌ 狀態:{_esc(status)}")
- parts.append(f"📍 影響:{_esc(impact)}")
- parts.append(f"💬 {_esc(summary)}")
-
+def alert(title: str, content: str, actions: Optional[list] = None) -> str:
+ """高危險警報(紅色)"""
+ msg = f"🚨 {title}\n\n{content}"
if actions:
- parts.append(f"\n🔧 建議行動")
- for a in actions:
- parts.append(f" • {_esc(a)}")
+ msg += "\n\n" + "\n".join(f"• {a}" for a in actions)
+ return msg
- if trace:
- parts.append(f"\n{H_DIV}")
- parts.append(f"🔍 原始技術細節(末段)")
- parts.append(f"{_esc(_tail(trace))}")
+def warning(title: str, summary: str, details: Optional[dict] = None) -> str:
+ """中風險警告(橙色)"""
+ msg = f"⚠️ {title}\n\n{summary}"
+ if details:
+ msg += "\n\n細節:\n" + "\n".join(f"• {k}: {v}" for k, v in details.items())
+ return msg
- return _clip("\n".join(parts))
+def info(title: str, module: str, content: str, time: Optional[Any] = None) -> str:
+ """普通信息(藍色)"""
+ t_str = f" · {time}" if time else ""
+ return f"📊 {title} [{module}]{t_str}\n\n{content}"
+def success(title: str, module: str, stats: str = "") -> str:
+ """成功通知(綠色)"""
+ return f"✅ {title} [{module}]\n{stats}"
-# =====================================================================
-# ⚠️ 警告(P2)
-# =====================================================================
-def warning(
- title: str,
- module: str,
- summary: str,
- details: dict[str, Any] | None = None,
- time: datetime | None = None,
-) -> str:
- parts = [_header("⚠️", "警告", title, module, time)]
- parts.append(f"\n📌 {_esc(summary)}")
-
- db = _details_block(details)
- if db:
- parts.append("")
- parts.append(db)
-
- return _clip("\n".join(parts))
-
-
-# =====================================================================
-# ℹ️ 資訊
-# =====================================================================
-def info(title: str, module: str, content: str, time: datetime | None = None) -> str:
- return _clip(
- f"{_header('ℹ️', '資訊', title, module, time)}\n"
- f"\n{_esc(content)}"
- )
-
-
-# =====================================================================
-# ✅ 成功
-# =====================================================================
-def success(
- title: str,
- module: str,
- stats: str | None = None,
- duration: str | None = None,
- detail: str | None = None,
- time: datetime | None = None,
-) -> str:
- parts = [_header("✅", "完成", title, module, time)]
- if stats:
- parts.append(f"\n📊 {_esc(stats)}")
- if duration:
- parts.append(f"⏱️ 耗時:{_esc(duration)}")
- if detail:
- parts.append(f"\n{_esc(detail)}")
- return _clip("\n".join(parts))
-
-
-# =====================================================================
-# 📊 報告(日報 / 週報 / Meta-Analysis)
-# =====================================================================
-def report(
- title: str,
- report_type: str,
- period: str,
- content_md: str,
- citations: str | None = None,
- time: datetime | None = None,
-) -> str:
- """
- content_md 保留原始 Markdown(Gemini 輸出),但會把 `*` `_` `[]` 轉成 HTML 等價。
- - **粗體** → 粗體
- - *斜體* → 斜體
- - 其他純文本 escape HTML
- """
- # 簡化:只做最基本的 & < > escape,讓 Gemini 原生文字可讀即可
- content_html = _esc(content_md)
-
- parts = [
- f"📊 [{PROJECT_TAG} {_esc(report_type)}] {_esc(title)}",
- f"🕐 {_ts(time)} 🗓️ {_esc(period)}",
- H_DIV,
- "",
- content_html,
- ]
- if citations:
- parts += ["", H_DIV, f"📚 {_esc(citations)}"]
- return _clip("\n".join(parts))
-
-
-# =====================================================================
-# 🤖 Triaged Alert — L1/L2 AI 加工訊息(ADR-012 §④ 三層式)
-# =====================================================================
-def triaged_alert(
- base_event: dict,
- tier_label: str, # "L1 · Hermes" / "L2 · NemoTron"
- ai_summary: str, # Hermes 翻譯
- ai_cause: str | None = None, # 可能根因
- ai_actions: list[str] | None = None, # 建議動作
- ai_executed: list[str] | None = None, # L2 已執行的 action(如 retry_task → scheduled)
-) -> str:
- """
- 三層式訊息:
- [事件資訊] → [🤖 AI 加工區] → [🔍 原始技術細節]
- base_event 欄位:title, module, status, impact, summary, details, trace
- """
- sev = base_event.get("severity", "warning")
- emoji = "🚨" if sev == "alert" else "⚠️"
- category = "告警" if sev == "alert" else "警告"
-
- parts = [_header(emoji, category, base_event.get("title", ""),
- base_event.get("module", "unknown"))]
-
- # Section 1: 事件資訊
- if base_event.get("status"):
- parts.append(f"\n❌ 狀態:{_esc(base_event['status'])}")
- if base_event.get("impact"):
- parts.append(f"📍 影響:{_esc(base_event['impact'])}")
- if base_event.get("summary"):
- parts.append(f"💬 {_esc(base_event['summary'])}")
- db = _details_block(base_event.get("details"))
- if db:
- parts.append("")
- parts.append(db)
-
- # Section 2: AI 加工區(明顯分隔)
- parts.append(f"\n{H_DIV}")
- parts.append(f"🤖 AI 分析({_esc(tier_label)})")
- parts.append("")
- parts.append(f"📝 技術根因翻譯")
- parts.append(_esc(ai_summary))
- if ai_cause:
- parts.append("")
- parts.append(f"🔎 可能原因")
- parts.append(_esc(ai_cause))
- if ai_actions:
- parts.append("")
- parts.append(f"🔧 建議動作")
- for i, a in enumerate(ai_actions[:5], 1):
- parts.append(f" {i}. {_esc(a)}")
- if ai_executed:
- parts.append("")
- parts.append(f"⚡ AI 已自動執行")
- for a in ai_executed:
- parts.append(f" • {_esc(a)}")
-
- # Section 3: 原始技術細節(可選)
- trace = base_event.get("trace")
- if trace:
- parts.append(f"\n{H_DIV}")
- parts.append(f"🔍 原始技術細節(末段)")
- parts.append(f"{_esc(_tail(trace))}")
-
- return _clip("\n".join(parts))
-
-
-# =====================================================================
-# 💰 降價決策請求(P2/P3)
-# =====================================================================
def price_decision(
product_name: str,
product_sku: str,
current_price: float,
suggested_price: float,
reason: str,
- insight_id: int,
- report_url: str | None = None,
- time: datetime | None = None,
-) -> tuple[str, dict]:
- drop_pct = (current_price - suggested_price) / current_price * 100 if current_price > 0 else 0
- text = "\n".join([
- f"💰 [{PROJECT_TAG} 決策請求] 降價建議",
- f"🕐 {_ts(time)} 📦 OpenClaw Strategist",
- H_DIV,
- "",
- f"🏷️ 商品:{_esc(product_name)}",
- f"📦 貨號:{_esc(product_sku or 'N/A')}",
- f"💵 現價:${current_price:,.0f}",
- f"📉 建議降至:${suggested_price:,.0f}(↓{drop_pct:.1f}%)",
- "",
- f"🤖 AI 理由",
- _esc(reason),
- ])
- keyboard = {
- "inline_keyboard": [[
- {"text": "✅ 批准降價", "callback_data": f"{CB_PREFIX}pa:{insight_id}"},
- {"text": "❌ 拒絕", "callback_data": f"{CB_PREFIX}pr:{insight_id}"},
- ]]
- }
- if report_url:
- keyboard["inline_keyboard"].append([{"text": "🔗 查看報表", "url": report_url}])
- return _clip(text), keyboard
+ insight_id: Optional[int] = None,
+) -> tuple:
+ """
+ 降價決策通知(含 Inline Keyboard)。
+ 回傳 (message_text, reply_markup)
+ """
+ diff = current_price - suggested_price
+ if diff > 0:
+ action_text = f"降價 ${diff:,.0f}"
+ elif diff < 0:
+ action_text = f"提價 ${-diff:,.0f}"
+ else:
+ action_text = "維持"
-
-def decision_result(
- original_text: str,
- decision: str, # "approve" or "reject"
- operator: str,
- note: str | None = None,
-) -> str:
- emoji = "✅" if decision == "approve" else "❌"
- label = "已批准降價" if decision == "approve" else "已拒絕降價"
- footer = [
- "",
- H_DIV,
- f"{emoji} {label}",
- f"👤 操作人:{_esc(operator)}",
- f"🕐 {_ts()}",
- ]
- if note:
- footer.append(f"📝 {_esc(note)}")
- return _clip(original_text + "\n".join(footer))
-
-
-# =====================================================================
-# 🛠️ L3 Ops Action Request(Phase 4 HITL)
-# =====================================================================
-def ops_action_request(
- task_name: str,
- title: str,
- reason: str,
- context: dict | None = None,
- time: datetime | None = None,
-) -> tuple[str, dict]:
- parts = [
- f"🛠️ [{PROJECT_TAG} 運維決策] {_esc(title)}",
- f"🕐 {_ts(time)} 📦 {_esc(task_name)}",
- H_DIV,
- "",
- f"💬 {_esc(reason)}",
- ]
- if context:
- parts.append("")
- parts.append(_details_block(context))
- parts += ["", "👉 請選擇動作"]
+ message = (
+ f"💰 自動降價建議\n"
+ f"商品:{product_name} (SKU: {product_sku})\n"
+ f"現價:${current_price:,.0f} → 建議:${suggested_price:,.0f}\n"
+ f"原因:{reason}\n"
+ )
+ if insight_id:
+ message += f"洞察 ID:{insight_id}\n"
keyboard = {
"inline_keyboard": [
[
- {"text": "⏸️ 暫停 1h", "callback_data": f"{CB_PREFIX}ops:pause1h:{task_name}"},
- {"text": "⏸️ 暫停 6h", "callback_data": f"{CB_PREFIX}ops:pause6h:{task_name}"},
+ {"text": "✅ 確認執行", "callback_data": f"price_decision:approve:{product_sku}"},
+ {"text": "❌ 拒絕", "callback_data": f"price_decision:reject:{product_sku}"},
],
[
- {"text": "⚡ 立即重試", "callback_data": f"{CB_PREFIX}ops:retry:{task_name}"},
- {"text": "▶️ 解除暫停", "callback_data": f"{CB_PREFIX}ops:resume:{task_name}"},
+ {"text": "📊 查看洞察", "url": f"https://your-dashboard.example/insight/{insight_id}" if insight_id else "#"},
],
]
}
- return _clip("\n".join(parts)), keyboard
+ return message, keyboard
-
-def ops_action_result(
- original_text: str,
- action: str,
- operator: str,
- result: dict,
+def triaged_alert(
+ base_event: Dict[str, Any],
+ tier_label: str,
+ ai_summary: str,
+ ai_cause: Optional[str] = None,
+ ai_actions: Optional[list] = None,
+ ai_executed: Optional[list] = None,
) -> str:
- emoji_map = {"pause1h": "⏸️", "pause6h": "⏸️", "retry": "⚡", "resume": "▶️"}
- label_map = {"pause1h": "已暫停 1 小時", "pause6h": "已暫停 6 小時",
- "retry": "已立即重試", "resume": "已解除暫停"}
- emoji = emoji_map.get(action, "🛠️")
- label = label_map.get(action, action)
- status = result.get("status", "unknown")
- footer = [
- "",
- H_DIV,
- f"{emoji} {label}(狀態:{_esc(status)})",
- f"👤 操作人:{_esc(operator)}",
- f"🕐 {_ts()}",
- ]
- if status == "rejected":
- footer.append(f"⚠️ 拒絕原因:{_esc(result.get('reason', ''))}")
- elif status == "deferred":
- footer.append(f"ℹ️ {_esc(result.get('note', ''))}")
- return _clip(original_text + "\n".join(footer))
+ """
+ L1/L2 整合通知(帶 AI 摘要與可執行動作)。
+ """
+ msg = (
+ f"⚡ {tier_label} · {base_event.get('event_type', 'alert')}\n"
+ f"📌 {base_event.get('title')}\n\n"
+ )
+ summary = base_event.get("summary", "")
+ if summary:
+ msg += f"🔍 概要:{summary}\n\n"
+ if ai_summary:
+ msg += f"🧠 AI 摘要:{ai_summary}\n\n"
+ if ai_cause:
+ msg += f"💡 可能原因:{ai_cause}\n\n"
+ if ai_actions:
+ msg += "📋 建議行動:\n" + "\n".join(f"• {a}" for a in ai_actions) + "\n\n"
+ if ai_executed:
+ msg += "✅ 已執行:\n" + "\n".join(f"• {a}" for a in ai_executed) + "\n\n"
+
+ trace = base_event.get("trace")
+ if trace:
+ msg += f"{trace[-500:]}"
+
+ keyboard = {
+ "inline_keyboard": [
+ [{"text": "📊 查看详情", "url": f"https://dashboard.example/event/{base_event.get('id')}"}],
+ [{"text": "🛑 忽略此事件", "callback_data": f"event_ignore:{base_event.get('id')}"}],
+ ]
+ }
+ return msg, keyboard
+
+def report(title: str, report_type: str, period: str, content_md: str) -> str:
+ """策略/週報模板"""
+ return (
+ f"📊 {title} ({report_type})\n"
+ f"期間:{period}\n\n"
+ f"{content_md}"
+ )
+
+def success(title: str, module: str, stats: str = "") -> str:
+ """成功通知(綠色)"""
+ return f"✅ {title} [{module}]\n{stats}"
+
+def _send_telegram(msg: str, chat_ids: Optional[list] = None,
+ reply_markup: Optional[Dict[str, Any]] = None) -> bool:
+ return _send_telegram_raw(msg, chat_ids=chat_ids, reply_markup=reply_markup)