Files
ewoooc/database/autoheal_models.py
ogt (aider) c73b430566 services/ai_orchestrator.py
```
import logging
from typing import Any, Dict, Optional

from services.hermes_analyst_service import HermesAnalystService
from services.nemoton_dispatcher_service import NemotronDispatcher
from services.openclaw_strategist_service import OpenClawStrategist
from services.telegram_templates import alert
from database.manager import get_session
from database.autoheal_models import AgentContext, ActionPlan, ActionOutcome

logger = logging.getLogger(__name__)

class AIOrchestrator:
    """
    協調中樞:負責 EventRouter 的 L1/L2 處理、Agent 共享上下文與閉環決策追蹤。
    這是新增的核心模組,將逐步替換硬編碼鏈。
    """

    def __init__(self):
        self.hermes = HermesAnalystService()
        self.nemotron = NemotronDispatcher()
        self.openclaw = OpenClawStrategist()
        self._retry_config = {"max_attempts": 3, "backoff_factor": 1.5}

    async def handle_l1(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
        """
        L1:語意翻譯 + 原因分析(由 Hermes 提供)。
        結果會寫入 agent_context,並可作為 L2 的上下文。
        """
        ctx = await self._get_context(session_id)
        result = await self._call_with_retry(self.hermes.handle_l1, event, session_id)
        await self._save_context(session_id, "hermes", result)
        return result

    async def handle_l2(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
        """
        L2:規劃 + 審核閘。
        輸入包含 L1 分析結果(若可用),產出 ActionPlan 等待批准。
        """
        ctx = await self._get_context(session_id)  # 包含 hermes 分析
        result = await self._call_with_retry(self.nemotron.handle_l2, event, session_id)
        await self._save_action_plan(result)
        # 審核閘由 routes/bot_api_routes 透過 callback 處理
        return result

    async def handle_l3(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
        """
        L3:策略師介入(週報 / 複雜重分析)。
        """
        ctx = await self._get_context(session_id)
        return await self.openclaw.handle_l3(event, ctx)

    async def _call_with_retry(self, func, *args, **kwargs):
        """
        簡易重試機制,避免瞬間網路錯誤導致中斷。
        """
        attempt = 0
        while True:
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                attempt += 1
                if attempt > self._retry_config["max_attempts"]:
                    logger.error(f"[AIOrchestrator] 重試超過上限,最後一次錯誤: {e}")
                    raise
                backoff = self._retry_config["backoff_factor"] ** attempt
                logger.warning(f"[AIOrchestrator] 第 {attempt} 次重試,延遲 {backoff:.1f}s: {e}")
                await asyncio.sleep(backoff)

    async def _get_context(self, session_id: str) -> Dict[str, Any]:
        """
        讀取共享上下文(按 session_id + agent),若不存在則返回空。
        """
        import asyncio
        session = get_session()
        try:
            rows = session.execute(
                "SELECT context_key, context_val FROM agent_context WHERE session_id = :sid",
                {"sid": session_id},
            ).fetchall()
            out: Dict[str, Any] = {}
            for r in rows:
                out[r[0]] = r[1]
            return out
        finally:
            session.close()

    async def _save_context(self, session_id: str, agent: str, payload: Dict[str, Any]) -> None:
        import asyncio
        session = get_session()
        try:
            # 刪除舊 key(保留 TTL 邏輯在應用層)
            session.execute(
                "DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag",
                {"sid": session_id, "ag": agent},
            )
            session.execute(
                """
                INSERT INTO agent_context
                    (session_id, agent_name, context_key, context_val, created_at, ttl_minutes)
                VALUES
                    (:sid, :ag, :ck, :cv, NOW(), 60)
                """,
                {
                    "sid": session_id,
                    "ag": agent,
                    "ck": "latest",
                    "cv": payload,
                },
            )
            session.commit()
            logger.debug(f"[AIOrchestrator] 已保存上下文 session={session_id} agent={agent}")
        except Exception as e:
            session.rollback()
            logger.error(f"[AIOrchestrator] save_context 失敗: {e}")
            raise
        finally:
            session.close()

    async def _save_action_plan(self, plan: Dict[str, Any]) -> None:
        import asyncio
        session = get_session()
        try:
            # 簡化:payload 直接存 JSON 字串
            session.execute(
                """
                INSERT INTO action_plans
                    (session_id, plan_type, sku, payload, status, created_by)
                VALUES
                    (:sid, :pt, :sku, :pl, 'pending', 'nemotron')
                """,
                {
                    "sid": plan.get("session_id"),
                    "pt": plan.get("plan_type"),
                    "sku": plan.get("sku"),
                    "pl": plan,
                },
            )
            session.commit()
            logger.info(f"[AIOrchestrator] 已建立 ActionPlan plan_type={plan.get('plan_type')} sku={plan.get('sku')}")
        except Exception as e:
            session.rollback()
            logger.error(f"[AIOrchestrator] save_action_plan 失敗: {e}")
            raise
        finally:
            session.close()

    async def record_outcome(self, plan_id: int, verdict: str, metrics: Dict[str, Any]) -> None:
        """
        記錄決策後果,並觸發策略權重更新(OpenClaw 學習)。
        """
        import asyncio
        session = get_session()
        try:
            session.execute(
                """
                INSERT INTO action_outcomes
                    (plan_id, metric_type, before_val, after_val, measured_at, verdict)
                VALUES
                    (:pid, :mt, :bv, :av, NOW(), :vc)
                """,
                {
                    "pid": plan_id,
                    "mt": metrics.get("metric_type"),
                    "bv": metrics.get("before_val"),
                    "av": metrics.get("after_val"),
                    "vc": verdict,
                },
            )
            # 簡化:直接呼叫學習服務(可替換為隊列)
            await self.openclaw.absorb_outcome(metrics, verdict)
            session.commit()
            logger.info(f"[AIOrchestrator] 已記錄 outcome plan_id={plan_id} verdict={verdict}")
        except Exception as e:
            session.rollback()
            logger.error(f"[AIOrchestrator] record_outcome 失敗: {e}")
            raise
        finally:
            session.close()
```

services/event_router.py
```
import logging
from typing import Any, Dict, Optional

from services.ai_orchestrator import AIOrchestrator
from services.telegram_templates import alert
from database.manager import get_session

logger = logging.getLogger(__name__)

async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
    """
    L1:語意翻譯 + 原因分析(由 Hermes 提供)。
    """
    orchestrator = AIOrchestrator()
    return await orchestrator.handle_l1(event, session_id)

async def _handle_l2(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
    """
    L2:規劃 + 審核閘。
    產出 ActionPlan 等待批准(Telegram 回調處理)。
    """
    orchestrator = AIOrchestrator()
    return await orchestrator.handle_l2(event, session_id)

async def _handle_l0(event: Dict[str, Any]) -> Dict[str, Any]:
    """L0:直接回傳原始事件(兼容與監控)"""
    return {"status": "ok", "echo": event.get("event_type")}

async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
    """
    事件路由主入口(與 routes/bot_api_routes 兼容)。
    輸出格式與 dispatch_v1 保持一致,以便平滑切換。
    """
    tier = _classify(event)
    session_id = f"evt:{event.get('event_type')}:{event.get('source', 'unknown')}"

    try:
        if tier == "L0":
            result = await _handle_l0(event)
        elif tier == "L1":
            result = await _handle_l1(event, session_id)
        elif tier == "L2":
            result = await _handle_l2(event, session_id)
        else:
            result = await _handle_l0(event)

        # 保留舊版回傳格式
        return {
            "tier": tier,
            "sent": 1,
            "errors": [],
            "latency_ms": 0,
            "payload": result,
        }
    except Exception as e:
        logger.exception(f"[EventRouter] dispatch 失敗: {e}")
        return {
            "tier": tier,
            "sent": 0,
            "errors": [str(e)],
            "latency_ms": 0,
            "payload": None,
        }

def _classify(event: Dict[str, Any]) -> str:
    sev = event.get("severity", "info")
    has_trace = bool(event.get("trace"))
    event_type = event.get("event_type", "")

    if sev in ("info", "success"):
        return "L0"
    if sev == "warning":
        return "L1" if has_trace else "L0"
    if sev == "alert":
        if event_type in {"price_threat", "db_connection_error", "crawler_timeout",
                          "nim_quota_exhausted", "embedding_failure"}:
            return "L2"
        return "L1"
    return "L0"
```

services/telegram_templates.py
```
import json
import logging
from typing import Any, Dict, Optional

from database.manager import get_session
from database.telegram_models import TelegramUser

sys_log = logging.getLogger("TelegramTpl")

# ─── 常數 ────────────────────────────────────────────────

TELEGRAM_BOT_TOKEN_ENV = "TELEGRAM_BOT_TOKEN"
TELEGRAM_CHAT_IDS_ENV  = "TELEGRAM_CHAT_IDS"

# ─── 工具:取得 Token 與 Chat ID(容錯) ─────────────────

def _get_bot_token() -> Optional[str]:
    from dotenv import load_dotenv
    load_dotenv()
    import os
    return os.getenv(TELEGRAM_BOT_TOKEN_ENV)

def _get_chat_ids() -> list:
    token = _get_bot_token()
    if not token:
        sys_log.warning("[TelegramTpl] %s 未設定,跳過 Telegram 通知", TELEGRAM_BOT_TOKEN_ENV)
        return []
    raw = __import__("os").getenv(TELEGRAM_CHAT_IDS_ENV, "[]")
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        sys_log.warning("[TelegramTpl] %s 格式錯誤,應為 JSON 陣列", TELEGRAM_CHAT_IDS_ENV)
        return []

# ─── 原始發送(內部使用) ─────────────────────────────────

def _send_telegram_raw(text: str, chat_ids: Optional[list] = None,
                       reply_markup: Optional[Dict[str, Any]] = None,
                       parse_mode: str = "HTML") -> bool:
    import requests
    token = _get_bot_token()
    if not token:
        return False
    if chat_ids is None:
        chat_ids = _get_chat_ids()
    if not chat_ids:
        chat_ids = [-1003940688311]  # fallback

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = {
        "chat_id": chat_ids[0],
        "text": text,
        "parse_mode": parse_mode,
    }
    if reply_markup:
        payload["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False)
    try:
        r = requests.post(url, json=payload, timeout=10)
        if not r.ok:
            sys_log.warning("[TelegramTpl] sendMessage HTTP %s: %s", r.status_code, r.text[:200])
            return False
        return True
    except Exception as e:
        sys_log.error("[TelegramTpl] send 失敗: %s", e)
        return False

# ─── 公用模板 ─────────────────────────────────────────────

def alert(title: str, content: str, actions: Optional[list] = None) -> str:
    """高危險警報(紅色)"""
    msg = f"<b>🚨 {title}</b>\n\n{content}"
    if actions:
        msg += "\n\n" + "\n".join(f"• {a}" for a in actions)
    return msg

def warning(title: str, summary: str, details: Optional[dict] = None) -> str:
    """中風險警告(橙色)"""
    msg = f"<b>⚠️ {title}</b>\n\n{summary}"
    if details:
        msg += "\n\n<b>細節:</b>\n" + "\n".join(f"• {k}: {v}" for k, v in details.items())
    return msg

def info(title: str, module: str, content: str, time: Optional[Any] = None) -> str:
    """普通信息(藍色)"""
    t_str = f" · {time}" if time else ""
    return f"<b>📊 {title}</b> [{module}]{t_str}\n\n{content}"

def success(title: str, module: str, stats: str = "") -> str:
    """成功通知(綠色)"""
    return f"<b> {title}</b> [{module}]\n{stats}"

def price_decision(
    product_name: str,
    product_sku: str,
    current_price: float,
    suggested_price: float,
    reason: str,
    insight_id: Optional[int] = None,
) -> tuple:
    """
    降價決策通知(含 Inline Keyboard)。
    回傳 (message_text, reply_markup)
    """
    diff = current_price - suggested_price
    if diff > 0:
        action_text = f"降價 ${diff:,.0f}"
    elif diff < 0:
        action_text = f"提價 ${-diff:,.0f}"
    else:
        action_text = "維持"

    message = (
        f"<b>💰 自動降價建議</b>\n"
        f"商品:{product_name} (SKU: {product_sku})\n"
        f"現價:${current_price:,.0f} → 建議:${suggested_price:,.0f}\n"
        f"原因:{reason}\n"
    )
    if insight_id:
        message += f"洞察 ID:{insight_id}\n"

    keyboard = {
        "inline_keyboard": [
            [
                {"text": " 確認執行", "callback_data": f"price_decision:approve:{product_sku}"},
                {"text": " 拒絕", "callback_data": f"price_decision:reject:{product_sku}"},
            ],
            [
                {"text": "📊 查看洞察", "url": f"https://your-dashboard.example/insight/{insight_id}" if insight_id else "#"},
            ],
        ]
    }
    return message, keyboard

def triaged_alert(
    base_event: Dict[str, Any],
    tier_label: str,
    ai_summary: str,
    ai_cause: Optional[str] = None,
    ai_actions: Optional[list] = None,
    ai_executed: Optional[list] = None,
) -> str:
    """
    L1/L2 整合通知(帶 AI 摘要與可執行動作)。
    """
    msg = (
        f"<b> {tier_label} · {base_event.get('event_type', 'alert')}</b>\n"
        f"📌 <code>{base_event.get('title')}</code>\n\n"
    )
    summary = base_event.get("summary", "")
    if summary:
        msg += f"🔍 概要:{summary}\n\n"
    if ai_summary:
        msg += f"🧠 AI 摘要:{ai_summary}\n\n"
    if ai_cause:
        msg += f"💡 可能原因:{ai_cause}\n\n"
    if ai_actions:
        msg += "<b>📋 建議行動:</b>\n" + "\n".join(f"• {a}" for a in ai_actions) + "\n\n"
    if ai_executed:
        msg += "<b> 已執行:</b>\n" + "\n".join(f"• {a}" for a in ai_executed) + "\n\n"

    trace = base_event.get("trace")
    if trace:
        msg += f"<pre>{trace[-500:]}</pre>"

    keyboard = {
        "inline_keyboard": [
            [{"text": "📊 查看详情", "url": f"https://dashboard.example/event/{base_event.get('id')}"}],
            [{"text": "🛑 忽略此事件", "callback_data": f"event_ignore:{base_event.get('id')}"}],
        ]
    }
    return msg, keyboard

def report(title: str, report_type: str, period: str, content_md: str) -> str:
    """策略/週報模板"""
    return (
        f"<b>📊 {title}</b> ({report_type})\n"
        f"期間:{period}\n\n"
        f"{content_md}"
    )

def success(title: str, module: str, stats: str = "") -> str:
    """成功通知(綠色)"""
    return f"<b> {title}</b> [{module}]\n{stats}"

def _send_telegram(msg: str, chat_ids: Optional[list] = None,
                   reply_markup: Optional[Dict[str, Any]] = None) -> bool:
    return _send_telegram_raw(msg, chat_ids=chat_ids, reply_markup=reply_markup)
```

database/autoheal_models.py
```
from sqlalchemy import Column, Integer, String, DateTime, Text, Boolean, ForeignKey, Index
from sqlalchemy.orm import relationship
from database.models import Base
from datetime import datetime

class AgentContext(Base):
    """
    共享上下文表(替代硬編碼鏈),支援多 Agent 存取與 TTL。
    索引:(session_id, agent_name, context_key) 以加速跨 Agent 查詢。
    """
    __tablename__ = 'agent_context'

    id = Column(Integer, primary_key=True, autoincrement=True)
    session_id = Column(String(64), nullable=False, index=True)
    agent_name = Column(String(50), nullable=False, index=True)
    context_key = Column(String(100), nullable=False)
    context_val = Column(Text)  # JSON 字串
    created_at = Column(DateTime, default=datetime.now)
    ttl_minutes = Column(Integer, default=60)

    __table_args__ = (
        Index('idx_agent_context_session_key', 'session_id', 'agent_name', 'context_key'),
        Index('idx_agent_context_session_ttl', 'session_id', 'created_at'),
    )

class ActionPlan(Base):
    """
    行動計畫表(NemoTron 輸出,等待審核與執行追蹤)。
    """
    __tablename__ = 'action_plans'

    id = Column(Integer, primary_key=True, autoincrement=True)
    session_id = Column(String(64), nullable=True)
    plan_type = Column(String(50), nullable=True)       # price_adjust / restock / campaign
    sku = Column(String(100), nullable=True, index=True)
    payload = Column(Text)                              # JSON 行動內容
    status = Column(String(20), default='pending')      # pending/approved/rejected/executed
    created_by = Column(String(50))                     # nemotron / openclaw
    approved_by = Column(String(100), nullable=True)    # Telegram user_id
    created_at = Column(DateTime, default=datetime.now)
    executed_at = Column(DateTime, nullable=True)

    __table_args__ = (
        Index('idx_action_plan_sku_status', 'sku', 'status'),
        Index('idx_action_plan_created', 'created_at'),
    )

class ActionOutcome(Base):
    """
    行動結果追蹤(閉環學習核心)。
    """
    __tablename__ = 'action_outcomes'

    id = Column(Integer, primary_key=True, autoincrement=True)
    plan_id = Column(Integer, ForeignKey('action_plans.id'), nullable=False)
    metric_type = Column(String(50), nullable=True)      # sales_7d / price_rank / conversion
    before_val = Column(Float)
    after_val = Column(Float)
    measured_at = Column(DateTime)
    verdict = Column(String(20))                         # effective / neutral / backfired
    created_at = Column(DateTime, default=datetime.now)

    plan = relationship("ActionPlan", backref="outcomes")

class AgentStrategyWeights(Base):
    """
    Agent 策略權重(OpenClaw 學習累積)。
    索引:strategy_key 以便快速更新與查詢。
    """
    __tablename__ = 'agent_strategy_weights'

    id = Column(Integer, primary_key=True, autoincrement=True)
    strategy_key = Column(String(100), unique=True, nullable=False)  # e.g. price_cut_when_gap_gt_5pct
    weight = Column(Float, default=1.0)
    success_cnt = Column(Integer, default=0)
    fail_cnt = Column(Integer, default=0)
    updated_at = Column(DateTime, default=datetime.now)

    __table_args__ = (
        Index('idx_strategy_key', 'strategy_key'),
    )
```

services/watcher_agent.py
```
import logging
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Any

from database.manager import get_session
from services.event_router import dispatch

logger = logging.getLogger(__name__)

class WatcherAgent:
    """
    主動偵測 Agent:定期輪詢銷售快照,檢查異常並觸發 EventRouter。
    設計為輕量、無外部依賴(僅用 PostgreSQL)。
    """

    SALES_DROP_THRESHOLD = 0.20   # 銷售下滑 >20% 觸發
    PRICE_SURGE_THRESHOLD = 0.15  # 競品價格漲幅 >15% 觸發
    CACHE_TTL_MIN = 30            # 輪詻間隔

    def __init__(self):
        self.last_scan: Dict[str, float] = {}

    async def scan(self) -> int:
        """執行一次掃描,回傳觸發的異常數"""
        rows = await self._fetch_sales_snapshot()
        if not rows:
            logger.info("[Watcher] 無銷售快照,跳過掃描")
            return 0

        anomalies = self._detect_anomalies(rows)
        if not anomalies:
            logger.info("[Watcher] 未檢測到異常")
            return 0

        logger.info(f"[Watcher] 檢測到 {len(anomalies)} 筆異常,開始 dispatch")
        triggered = 0
        for an in anomalies:
            if await self._dispatch_anomaly(an):
                triggered += 1
        return triggered

    async def track_outcome(self, plan_id: int) -> None:
        """
        排程回撥:行動執行後由 DecisionTracker 調用,評估效果並更新策略。
        這裡保留接口供未來擴充。
        """
        logger.info(f"[Watcher] 行動效果回撥 plan_id={plan_id}(待實現)")

    # ── 內部方法 ────────────────────────────────────────────────

    async def _fetch_sales_snapshot(self) -> List[Dict[str, Any]]:
        """
        讀取銷售快照。欄位依實際 DB 調整。
        預期欄位:sku, name, category, sales_curr, sales_prev, price_momo, price_pchome, stock_status
        """
        session = get_session()
        try:
            sql = """
                SELECT sku, name, category,
                       COALESCE(sales_curr, 0) AS sales_curr,
                       COALESCE(sales_prev, 0) AS sales_prev,
                       price_momo, price_pchome, stock_status
                FROM daily_sales_snapshot
                WHERE snapshot_date = CURRENT_DATE - INTERVAL '1 day'
                LIMIT 500
            """
            result = session.execute(sql).fetchall()
            return [dict(row._mapping) for row in result]
        except Exception as e:
            logger.error(f"[Watcher] 無法讀取快照: {e}")
            return []
        finally:
            session.close()

    def _detect_anomalies(self, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        anomalies: List[Dict[str, Any]] = []
        for r in rows:
            sku = r["sku"]
            name = r["name"]
            curr = float(r["sales_curr"] or 0)
            prev = float(r["sales_prev"] or 1)
            pchome = r["price_pchome"]
            momo = r["price_momo"]
            stock = r.get("stock_status", "unknown")

            drop_pct = (curr - prev) / prev if prev else 0.0
            price_gap_pct = ((momo - pchome) / pchome * 100) if pchome else 0.0

            reasons: List[str] = []

            # 銷量下滑異常
            if drop_pct <= -self.SALES_DROP_THRESHOLD:
                reasons.append(
                    f"銷量下滑 {drop_pct:+.1%}(閾值 {self.SALES_DROP_THRESHOLD:+.0%})"
                )

            # 競品價格突漲(若我方價格低且差距擴大)
            if price_gap_pct > self.PRICE_SURGE_THRESHOLD:
                reasons.append(
                    f"競品價格突漲 {price_gap_pct:+.1f}% 形成高價差"
                )

            # 庫存危機
            if stock in ("out_of_stock", "low_stock"):
                reasons.append(f"庫存狀態: {stock}")

            if not reasons:
                continue

            anomalies.append({
                "sku": sku,
                "name": name,
                "category": r.get("category", ""),
                "drop_pct": drop_pct,
                "price_gap_pct": price_gap_pct,
                "reasons": reasons,
                "stock": stock,
                "momo_price": momo,
                "pchome_price": pchome,
            })
        return anomalies

    async def _dispatch_anomaly(self, anom: Dict[str, Any]) -> bool:
        """
        依異常類型決定路由:
          - 銷量下滑 + 價差微小 → L1(分析原因)
          - 銷量下滑 + 價差大      → L2(規劃 + 審核)
          - 競品價格突漲          → L2(防範被動)
        """
        drop = anom["drop_pct"]
        gap = anom["price_gap_pct"]
        sku = anom["sku"]
        name = anom["name"]
        session_id = self._ensure_session(sku)

        event = {
            "source": "watcher",
            "event_type": "sales_anomaly",
            "severity": "alert",
            "title": f"銷售異常偵測 — {sku} {name}",
            "summary": "; ".join(anom["reasons"]),
            "payload": {
                "sku": sku,
                "name": name,
                "category": anom["category"],
                "drop_pct": anom["drop_pct"],
                "price_gap_pct": anom["price_gap_pct"],
                "stock": anom["stock"],
                "momo_price": anom["momo_price"],
                "pchome_price": anom["pchome_price"],
                "sales_prev": anom.get("sales_prev"),
                "sales_curr": anom.get("sales_curr"),
            },
            "impact": "銷量下滑可能導致收入損失",
            "status": "open",
        }

        # 決策路由
        if drop <= -self.SALES_DROP_THRESHOLD and abs(gap) < self.PRICE_SURGE_THRESHOLD:
            # 銷量下滑但價差微小 → 檢查是否非價格因素(缺貨/流量)
            event["payload"]["non_price_factor"] = True
            return await self._route_l1(event, session_id)
        else:
            return await self._route_l2(event, session_id)

    async def _route_l1(self, event: Dict[str, Any], session_id: str) -> bool:
        """L1:Hermes 分析下滑原因"""
        try:
            orchestrator = AIOrchestrator()
            result = await orchestrator.handle_l1(event, session_id)
            logger.info(f"[Watcher] L1 dispatch success for {event['payload']['sku']}")
            await self._save_context(session_id, "hermes", {
                "summary": result.get("summary"),
                "probable_cause": result.get("probable_cause"),
                "actions": result.get("actions", []),
            })
            return True
        except Exception as e:
            logger.error(f"[Watcher] L1 dispatch failed: {e}")
            await self._fallback_notify(event)
            return False

    async def _route_l2(self, event: Dict[str, Any], session_id: str) -> bool:
        """L2:NemoTron 規劃 + 審核閘"""
        try:
            orchestrator = AIOrchestrator()
            result = await orchestrator.handle_l2(event, session_id)
            logger.info(f"[Watcher] L2 dispatch success for {event['payload']['sku']}")
            await self._save_context(session_id, "nemotron", {
                "plan": result.get("plan"),
                "actions_taken": result.get("actions_taken", []),
            })
            await self._save_action_plan(event, result.get("plan"))
            return True
        except Exception as e:
            logger.error(f"[Watcher] L2 dispatch failed: {e}")
            await self._fallback_notify(event)
            return False

    async def _fallback_notify(self, event: Dict[str, Any]) -> None:
        """當 AI 失敗時,直接通知並記錄原因"""
        sku = event["payload"]["sku"]
        name = event["payload"]["name"]
        text = (
            f"⚠️ [Watcher Fallback] {sku} {name}\n"
            f"原因:{event['summary']}\n"
            f"建議:立即人工檢查銷售與庫存狀態。"
        )
        await self._notify_telegram(text)

    async def _notify_telegram(self, text: str) -> bool:
        """透過 Telegram 發送訊息"""
        from services.telegram_templates import alert as render_alert
        bot_token = "TELEGRAM_BOT_TOKEN_PLACEHOLDER"  # 實際由環境注入
        if not bot_token:
            logger.warning("[Watcher] TELEGRAM_BOT_TOKEN 未設定")
            return False
        chat_ids = []  # 實際由環境注入
        url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
        payload = {
            "chat_id": chat_ids[0] if chat_ids else -1003940688311,
            "text": render_alert(title="銷售異常通知", content=text),
            "parse_mode": "HTML",
        }
        try:
            r = requests.post(url, json=payload, timeout=10)
            return r.ok
        except Exception as e:
            logger.error(f"[Watcher] Telegram 通知失敗: {e}")
            return False

    def _ensure_session(self, sku: str) -> str:
        """保證 session_id 存在(skuid 作為 session)"""
        return f"session:{sku}"

    async def _save_context(self, session_id: str, agent: str, data: Dict[str, Any]) -> None:
        """寫入 agent_context(共享記憶)"""
        session = get_session()
        try:
            session.execute(
                "DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag",
                {"sid": session_id, "ag": agent},
            )
            session.execute(
                """
                INSERT INTO agent_context
                    (session_id, agent_name, context_key, context_val, created_at, ttl_minutes)
                VALUES
                    (:sid, :ag, :ck, :cv, NOW(), :ttl)
                """,
                {
                    "sid": session_id,
                    "ag": agent,
                    "ck": "latest",
                    "cv": data,
                    "ttl": self.CACHE_TTL_MIN * 2,
                },
            )
            session.commit()
            logger.debug(f"[Watcher] 已保存 context session={session_id} agent={agent}")
        except Exception as e:
            session.rollback()
            logger.warning(f"[Watcher] 寫入 context 失敗: {e}")
        finally:
            session.close()

    async def _save_action_plan(self, event: Dict[str, Any], plan: Optional[Dict[str, Any]]) -> None:
        """將 NemoTron 的 plan 寫入 action_plans"""
        if not plan:
            return
        session = get_session()
        try:
            sku = event["payload"]["sku"]
            session.execute(
                """
                INSERT INTO action_plans
                    (session_id, plan_type, sku, payload, status, created_by)
                VALUES
                    (:sid, :pt, :sku, :pl, 'pending', 'nemotron')
                """,
                {
                    "sid": plan.get("session_id"),
                    "pt": plan.get("plan_type"),
                    "sku": sku,
                    "pl": plan,
                },
            )
            session.commit()
            logger.info(f"[Watcher] 已建立 ActionPlan plan_type={plan.get('plan_type')} sku={plan.get('sku')}")
        except Exception as e:
            session.rollback()
            logger.warning(f"[Watcher] 寫入 action_plan 失敗: {e}")
        finally:
            session.close()
```

services/decision_tracker.py
```
import logging
from datetime import datetime, timedelta
from typing import Dict, Any

from database.manager import get_session
from services.openclaw_learning_service import store_insight

logger = logging.getLogger(__name__)

class DecisionTracker:
    """
    閉環學習與效果追蹤:
      - 為每條 ActionPlan 排定 outcome 量測(7天後)
      - 量測後記錄 verdict,並觸發 OpenClaw 學習與策略權重更新
    """

    OUTCOME_WINDOW_DAYS = 7

    async def schedule_follow_up(self, plan_id: int, sku: str, metric: str = "sales_7d") -> None:
        """排程在 window 後回來量測"""
        logger.info(f"[DecisionTracker] 排程 outcome 追蹤 plan_id={plan_id} sku={sku} metric={metric}")

    async def measure_and_learn(self, plan_id: int) -> None:
        """
        量測 ActionPlan 的效果並回饋學習。
        由 scheduled job 每隔一定時間呼叫。
        """
        session = get_session()
        try:
            plan = session.query(ActionPlan).get(plan_id)
            if not plan or plan.status not in ("approved", "executed"):
                return

            before_val, after_val, metric_type = self._measure_outcome(plan)
            verdict = self._judge_verdict(before_val, after_val)

            await self._record_outcome(plan_id, metric_type, before_val, after_val, verdict)

            metrics = {
                "metric_type": metric_type,
                "before_val": before_val,
                "after_val": after_val,
            }
            await store_insight(
                insight_type="auto_heal_playbook",
                period=datetime.now().strftime("%Y-%m-%d"),
                content=f"[效果追蹤] plan_id={plan_id} sku={plan.sku} before={before_val} after={after_val} verdict={verdict}",
                metadata={"verdict": verdict, "plan_type": plan.plan_type},
                ai_model="auto_heal_engine_v1",
            )
            await self._update_strategy_weights(metrics, verdict)
        except Exception as e:
            logger.error(f"[DecisionTracker] measure_and_learn 失敗: {e}")
        finally:
            session.close()

    def _measure_outcome(self, plan: ActionPlan) -> tuple:
        """
        模擬量測:實際應用中連接銷售/庫存系統。
        返回 (before, after, metric_type)
        """
        if plan.plan_type == "price_adjust":
            return 100.0, 130.0, "sales_7d"
        return 0.0, 0.0, "unknown"

    def _judge_verdict(self, before: float, after: float) -> str:
        if after <= 0:
            return "neutral"
        ratio = (after - before) / before
        if ratio >= 0.2:
            return "effective"
        if ratio <= -0.1:
            return "backfired"
        return "neutral"

    async def _record_outcome(self, plan_id: int, metric_type: str,
                              before_val: float, after_val: float, verdict: str) -> None:
        session = get_session()
        try:
            session.execute(
                """
                INSERT INTO action_outcomes
                    (plan_id, metric_type, before_val, after_val, measured_at, verdict)
                VALUES
                    (:pid, :mt, :bv, :av, NOW(), :vc)
                """,
                {
                    "pid": plan_id,
                    "mt": metric_type,
                    "bv": before_val,
                    "av": after_val,
                    "vc": verdict,
                },
            )
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"[DecisionTracker] 記錄 outcome 失敗: {e}")
            raise
        finally:
            session.close()

    async def _update_strategy_weights(self, metrics: Dict[str, Any], verdict: str) -> None:
        """
        根據 outcome 更新策略權重(OpenClaw 學習)。
        簡化:effective +1,backfired -1。
        """
        session = get_session()
        try:
            key = f"{metrics.get('metric_type')}_{metrics.get('plan_type', 'default')}"
            if verdict == "effective":
                session.execute(
                    """
                    UPDATE agent_strategy_weights
                       SET success_cnt = success_cnt + 1,
                           weight = weight + 0.1,
                           updated_at = NOW()
                     WHERE strategy_key = :k
                    """,
                    {"k": key},
                )
            elif verdict == "backfired":
                session.execute(
                    """
                    UPDATE agent_strategy_weights
                       SET fail_cnt = fail_cnt + 1,
                           weight = GREATEST(weight - 0.2, 0.0),
                           updated_at = NOW()
                     WHERE strategy_key = :k
                    """,
                    {"k": key},
                )
            # neutral 不更新權重
            session.commit()
        except Exception as e:
            session.rollback()
            logger.warning(f"[DecisionTracker] 更新策略權重失敗: {e}")
        finally:
            session.close()
```

services/openclaw_learning_service.py
```
import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional

from database.manager import get_session
from database.autoheal_models import AIInsight

sys_log = logging.getLogger(__name__)

def build_rag_context_by_date(start_date: str, end_date: str) -> str:
    """
    依日期區間拉取 ai_insights,用於週報 RAG。
    """
    session = get_session()
    try:
        rows = session.execute(
            "SELECT insight_type, period, content FROM ai_insights "
            "WHERE DATE(created_at) BETWEEN :s AND :e "
            "ORDER BY created_at ASC",
            {"s": start_date, "e": end_date},
        ).fetchall()
        if not rows:
            return ""
        parts = [f"[{r[1]}] {r[0]}: {r[2]}" for r in rows]
        return "\n\n---\n\n".join(parts)
    except Exception as e:
        sys_log.error(f"[OCLearn] build_rag_context_by_date 失敗: {e}")
        return ""
    finally:
        session.close()

def store_insight(
    insight_type: str,
    content: str,
    period: Optional[str] = None,
    product_sku: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    ai_model: Optional[str] = None,
) -> Optional[int]:
    """
    雙寫:寫入 ai_insights + 排程 embedding(由 embedding_retry_queue 供 worker 處理)。
    """
    session = get_session()
    try:
        meta_str = json.dumps(metadata, ensure_ascii=False) if metadata else None
        rec = AIInsight(
            insight_type=insight_type,
            period=period,
            product_sku=product_sku,
            content=content,
            metadata_json=meta_str,
            created_at=datetime.now(),
            updated_at=datetime.now(),
        )
        session.add(rec)
        session.commit()
        session.refresh(rec)

        # 排程 embedding(持久化,由 background worker 消费)
        _enqueue_embedding_for_insight(rec, ai_model or "bge-m3")

        return rec.id
    except Exception as e:
        session.rollback()
        sys_log.error(f"[OCLearn] store_insight 失敗: {e}")
        return None
    finally:
        session.close()

def _enqueue_embedding_for_insight(insight: AIInsight, model: str) -> bool:
    """
    將洞察文本推入 embedding_retry_queue,供 background worker 批量向量化。
    """
    session = get_session()
    try:
        session.execute(
            """
            INSERT INTO embedding_retry_queue
                (target_table, target_id, text_content, model, status, created_at)
            VALUES
                (:t, :i, :txt, :m, 'pending', :now)
            """,
            {
                "t": "ai_insights",
                "i": insight.id,
                "txt": f"{insight.insight_type} ({insight.period or ''}): {insight.content}",
                "m": model,
                "now": datetime.now(),
            },
        )
        session.commit()
        return True
    except Exception as e:
        session.rollback()
        sys_log.warning(f"[OCLearn] enqueue embedding 失敗: {e}")
        return False
    finally:
        session.close()
```

database/autoheal_models.py
```
from sqlalchemy import Column, Integer, String, DateTime, Text, Boolean, ForeignKey, Index, Float
from sqlalchemy.orm import relationship
from database.models import Base
from datetime import datetime

class AgentContext(Base):
    """
    共享上下文表(替代硬編碼鏈),支援多 Agent 存取與 TTL。
    索引:(session_id, agent_name, context_key) 以加速跨 Agent 查詢。
    """
    __tablename__ = 'agent_context'

    id = Column(Integer, primary_key=True, autoincrement=True)
    session_id = Column(String(64), nullable=False, index=True)
    agent_name = Column(String(50), nullable=False, index=True)
    context_key = Column(String(100), nullable=False)
    context_val = Column(Text)  # JSON 字串
    created_at = Column(DateTime, default=datetime.now)
    ttl_minutes = Column(Integer, default=60)

    __table_args__ = (
        Index('idx_agent_context_session_key', 'session_id', 'agent_name', 'context_key'),
        Index('idx_agent_context_session_ttl', 'session_id', 'created_at'),
    )

class ActionPlan(Base):
    """
    行動計畫表(NemoTron 輸出,等待審核與執行追蹤)。
    """
    __tablename__ = 'action_plans'

    id = Column(Integer, primary_key=True, autoincrement=True)
    session_id = Column(String(64), nullable=True)
    plan_type = Column(String(50), nullable=True)       # price_adjust / restock / campaign
    sku = Column(String(100), nullable=True, index=True)
    payload = Column(Text)                              # JSON 行動內容
    status = Column(String(20), default='pending')      # pending/approved/rejected/executed
    created_by = Column(String(50))                     # nemotron / openclaw
    approved_by = Column(String(100), nullable=True)    # Telegram user_id
    created_at = Column(DateTime, default=datetime.now)
    executed_at = Column(DateTime, nullable=True)

    __table_args__ = (
        Index('idx_action_plan_sku_status', 'sku', 'status'),
        Index('idx_action_plan_created', 'created_at'),
    )

class ActionOutcome(Base):
    """
    行動結果追蹤(閉環學習核心)。
    """
    __tablename__ = 'action_outcomes'

    id = Column(Integer, primary_key=True, autoincrement=True)
    plan_id = Column(Integer, ForeignKey('action_plans.id'), nullable=False)
    metric_type = Column(String(50), nullable=True)      # sales_7d / price_rank / conversion
    before_val = Column(Float)
    after_val = Column(Float)
    measured_at = Column(DateTime)
    verdict = Column(String(20))                         # effective / neutral / backfired
    created_at = Column(DateTime, default=datetime.now)

    plan = relationship("ActionPlan", backref="outcomes")

class AgentStrategyWeights(Base):
    """
    Agent 策略權重(OpenClaw 學習累積)。
    索引:strategy_key 以便快速更新與查詢。
    """
    __tablename__ = 'agent_strategy_weights'

    id = Column(Integer, primary_key=True, autoincrement=True)
    strategy_key = Column(String(100), unique=True, nullable=False)  # e.g. price_cut_when_gap_gt_5pct
    weight = Column(Float, default=1.0)
    success_cnt = Column(Integer, default=0)
    fail_cnt = Column(Integer, default=0)
    updated_at = Column(DateTime, default=datetime.now)

    __table_args__ = (
        Index('idx_strategy_key', 'strategy_key'),
    )
```

services/openclaw_strategist_service.py
```
import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional

from database.manager import get_session
from services.logger_manager import SystemLogger
from services.openclaw_learning_service import build_rag_context_by_date, store_insight

sys_log = SystemLogger("OCStrategist").get_logger()

class OpenClawStrategist:
    """
    策略師(週報 / 複雜重分析)
    與 OpenClaw 學習服務(RAG + 效果回饋)整合。
    """

    def __init__(self):
        pass

    async def handle_l3(self, event: Dict[str, Any], ctx: Dict[str, Any]) -> Dict[str, Any]:
        """
        L3:策略師介入(週報 / 複雜重分析)。
        依 event_type 決行動:
          - weekly_meta: 生成週報並評估上周 ActionPlan 效果
          - meta_analysis: 執行 Meta 分析(策略權重更新)
        """
        event_type = event.get("event_type", "weekly_meta")
        if event_type == "weekly_meta":
            return await self._weekly_meta_report(event)
        return await self._meta_analysis(event)

    async def _weekly_meta_report(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """
        週報:
          1) RAG 撈取上週洞察
          2) Gemini 生成策略報告
          3) 評估 ActionPlan 效果(DecisionTracker 已排程)
          4) 回傳報告並寫入 insight(供 RAG 與人類審閱)
        """
        start_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
        end_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
        rag_context = build_rag_context_by_date(start_date, end_date)

        # 模擬 Gemini 生成(實際應用調用 Gemini API)
        report = self._mock_gemini_weekly_report(rag_context, start_date, end_date)

        # 寫入 insight(雙寫)
        await store_insight(
            insight_type="weekly_meta",
            content=report,
            period=f"{start_date}~{end_date}",
            metadata={"start": start_date, "end": end_date},
        )
        return {"report": report, "period": f"{start_date}~{end_date}"}

    async def _meta_analysis(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """
        Meta 分析:評估策略權重與效果,產生優化建議。
        """
        analysis = "Meta 分析:建議提升高成功率策略權重,降低低效策略影響。"
        await store_insight(
            insight_type="meta_analysis",
            content=analysis,
            period="meta",
            metadata={},
        )
        return {"analysis": analysis}

    def _mock_gemini_weekly_report(self, rag_context: str, start: str, end: str) -> str:
        """
        模擬 Gemini 生成的週報(實際應用替換為 Gemini API 呼叫)。
        """
        return (
            f"# 【EwoooC 每周策略報告】 ({start} ~ {end})\n\n"
            f"## 一、上週業績總結\n"
            f"{rag_context if rag_context else '(無資料)'}\n\n"
            f"## 二、關鍵洞察\n"
            f"- 高危險商品已通過人審核並執行降價。\n"
            f"- 部分策略成效顯著,建議提升權重。\n\n"
            f"## 三、下週行動計畫\n"
            f"- 繼續監控價格競爭与銷量異常。\n"
            f"- 優化低效策略,並擴大高成效策略覆蓋。\n\n"
            f"## 四、決策效果回顧\n"
            f"- 近期 ActionPlan 有效率:68%(目標 75%)。\n"
            f"- 建議:加強模型訓練,縮短人審介入週期。\n\n"
            f"--\n"
            f"生成時間:{datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
            f"策略模型:OpenClaw Meta-Analysis v1"
        )
```
2026-04-19 21:33:43 +08:00

86 lines
3.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy import Column, Integer, String, DateTime, Text, Boolean, ForeignKey, Index
from sqlalchemy.orm import relationship
from database.models import Base
from datetime import datetime
class AgentContext(Base):
"""
共享上下文表(替代硬編碼鏈),支援多 Agent 存取與 TTL。
索引:(session_id, agent_name, context_key) 以加速跨 Agent 查詢。
"""
__tablename__ = 'agent_context'
id = Column(Integer, primary_key=True, autoincrement=True)
session_id = Column(String(64), nullable=False, index=True)
agent_name = Column(String(50), nullable=False, index=True)
context_key = Column(String(100), nullable=False)
context_val = Column(Text) # JSON 字串
created_at = Column(DateTime, default=datetime.now)
ttl_minutes = Column(Integer, default=60)
__table_args__ = (
Index('idx_agent_context_session_key', 'session_id', 'agent_name', 'context_key'),
Index('idx_agent_context_session_ttl', 'session_id', 'created_at'),
)
class ActionPlan(Base):
"""
行動計畫表NemoTron 輸出,等待審核與執行追蹤)。
"""
__tablename__ = 'action_plans'
id = Column(Integer, primary_key=True, autoincrement=True)
session_id = Column(String(64), nullable=True)
plan_type = Column(String(50), nullable=True) # price_adjust / restock / campaign
sku = Column(String(100), nullable=True, index=True)
payload = Column(Text) # JSON 行動內容
status = Column(String(20), default='pending') # pending/approved/rejected/executed
created_by = Column(String(50)) # nemotron / openclaw
approved_by = Column(String(100), nullable=True) # Telegram user_id
created_at = Column(DateTime, default=datetime.now)
executed_at = Column(DateTime, nullable=True)
__table_args__ = (
Index('idx_action_plan_sku_status', 'sku', 'status'),
Index('idx_action_plan_created', 'created_at'),
)
class ActionOutcome(Base):
"""
行動結果追蹤(閉環學習核心)。
"""
__tablename__ = 'action_outcomes'
id = Column(Integer, primary_key=True, autoincrement=True)
plan_id = Column(Integer, ForeignKey('action_plans.id'), nullable=False)
metric_type = Column(String(50), nullable=True) # sales_7d / price_rank / conversion
before_val = Column(Float)
after_val = Column(Float)
measured_at = Column(DateTime)
verdict = Column(String(20)) # effective / neutral / backfired
created_at = Column(DateTime, default=datetime.now)
plan = relationship("ActionPlan", backref="outcomes")
class AgentStrategyWeights(Base):
"""
Agent 策略權重OpenClaw 學習累積)。
索引strategy_key 以便快速更新與查詢。
"""
__tablename__ = 'agent_strategy_weights'
id = Column(Integer, primary_key=True, autoincrement=True)
strategy_key = Column(String(100), unique=True, nullable=False) # e.g. price_cut_when_gap_gt_5pct
weight = Column(Float, default=1.0)
success_cnt = Column(Integer, default=0)
fail_cnt = Column(Integer, default=0)
updated_at = Column(DateTime, default=datetime.now)
__table_args__ = (
Index('idx_strategy_key', 'strategy_key'),
)