2165 lines
96 KiB
Python
2165 lines
96 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
NemoTron 行動派發器 (Module 2 — Dispatcher)
|
||
|
||
角色:派發器 (Dispatcher)
|
||
模型:NVIDIA NIM meta/llama-3.1-8b-instruct(Tool Calling 專用)
|
||
輸入:Hermes 分析師輸出的 Top N 威脅清單 (list[PriceThreat]) + 運算足跡
|
||
輸出:呼叫工具 → Telegram 語意化告警 / DB 推薦商品寫入
|
||
|
||
訊息規範:語意化 Emoji 字典 + 倒金字塔結構 + 底部運算足跡區塊
|
||
詳見 docs/AI_INTELLIGENCE_MODULE_SOT.md § 五、六
|
||
|
||
工具清單(扁平化,避免 NIM JSON 截斷):
|
||
trigger_price_alert → Telegram 競價高危險預警
|
||
add_to_recommendation → 寫入前台推薦商品 + Telegram 通知
|
||
flag_for_human_review → Telegram 人工覆核請求
|
||
"""
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import uuid
|
||
from datetime import datetime
|
||
from typing import Optional
|
||
import requests
|
||
from services.mcp_context_service import build_mcp_context
|
||
|
||
from config import HERMES_URL # ADR-008 集中化:禁止硬編碼 IP
|
||
from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# [2026-04-18 台北] Bug-3.1 防線升級:簡體字/異體字黑名單(2次精選) — Claude Opus 4.7
|
||
# 規則:只收「繁體中文環境 100% 不會出現」的字。繁簡通用字(迎、据、准、理、
|
||
# 整、別、始、束、史、點、但、於、原、據、準、理)一律剔除,避免誤報正常繁中。
|
||
_SIMPLIFIED_BLACKLIST = frozenset(
|
||
# 頂級高頻(繁體寫法完全不同)
|
||
'参给当为来国发会说时间过从实现个这话动问题'
|
||
'经济学员务关对应听见设计电脑产东团专义价样种让'
|
||
'议统战爱态头带业无该决积权导档号风险'
|
||
# 商業 / 財務常見
|
||
'选择证龙习惯亏损营运货币场较况负责调优势'
|
||
# 書寫表達
|
||
'认识书写结历观终则'
|
||
# 異體字
|
||
'亊'
|
||
)
|
||
|
||
|
||
def _has_repeated_phrase(text: str, min_len: int = 4, min_count: int = 2) -> bool:
|
||
"""偵測連續中文片段中「min_len 字子串」重複出現 ≥ min_count 次的語意坍塌。
|
||
|
||
範例:
|
||
「当前事亊当前事亊」→ min_len=4, 「当前事亊」重複 2 次 → True
|
||
「價差擴大且銷量下滑」→ 無 4 字重複 → False
|
||
"""
|
||
chinese = ''.join(c for c in text if '\u4e00' <= c <= '\u9fff')
|
||
if len(chinese) < min_len * min_count:
|
||
return False
|
||
for i in range(len(chinese) - min_len + 1):
|
||
phrase = chinese[i:i + min_len]
|
||
if chinese.count(phrase) >= min_count:
|
||
return True
|
||
return False
|
||
|
||
|
||
def _sanitize_text(text: str, fallback: str = "請人工確認", max_len: int = 200) -> str:
|
||
"""防止 LLM 幻覺文字輸出到 Telegram。
|
||
|
||
[2026-04-18 台北] Bug-3.1 三層檢測升級 — Claude Opus 4.7
|
||
L1: 連續 ≥15 字中文且整段無標點 → 幻覺(原規則)
|
||
L2: 命中簡體字/異體字黑名單 → 簡繁污染
|
||
L3: 連續中文片段內 4+ 字子串重複 ≥2 次 → 語意坍塌
|
||
任一層命中 → fallback 替換為「請人工確認」(或呼叫端指定字串)
|
||
"""
|
||
if not text or not isinstance(text, str):
|
||
return fallback
|
||
text = text.strip()[:max_len]
|
||
|
||
# L1: 連續無標點中文區段(原規則)
|
||
if re.search(r'[\u4e00-\u9fff]{15,}', text) and not re.search(r'[,。、?!\s]', text):
|
||
logger.warning(f"[Sanitize L1] 連續無標點中文幻覺,fallback: {text[:40]}")
|
||
return fallback
|
||
|
||
# L2: 簡體字/異體字污染
|
||
hits = [c for c in text if c in _SIMPLIFIED_BLACKLIST]
|
||
if hits:
|
||
logger.warning(f"[Sanitize L2] 簡體/異體字污染 {''.join(sorted(set(hits)))},fallback: {text[:40]}")
|
||
return fallback
|
||
|
||
# L3: 短語重複(語意坍塌)
|
||
if _has_repeated_phrase(text):
|
||
logger.warning(f"[Sanitize L3] 短語重複偵測(語意坍塌),fallback: {text[:40]}")
|
||
return fallback
|
||
|
||
return text or fallback
|
||
|
||
|
||
# ── NVIDIA NIM ──────────────────────────────────────
|
||
NIM_BASE_URL = "https://integrate.api.nvidia.com/v1"
|
||
NIM_MODEL = "meta/llama-3.1-8b-instruct"
|
||
NIM_API_KEY = os.getenv("NVIDIA_API_KEY", "")
|
||
NIM_TIMEOUT = 60 # 秒
|
||
|
||
# ── 每日配額守門 ─────────────────────────────────────
|
||
NIM_DAILY_LIMIT = 80 # 留 20 個給 AWOOOI,100/天免費配額
|
||
_nim_call_count = {"date": "", "count": 0}
|
||
|
||
# ── Operation Ollama-First v5.0 / Phase 3 / A9 ──────────────────
|
||
# GCP Ollama qwen3:14b 灰度切換開關
|
||
# - true → qwen3 主路徑,NIM 降為備援,最後仍兜底 Hermes 規則引擎(ADR-004)
|
||
# - false → 緊急停用 Ollama-first 時才回 NIM-first
|
||
# 模型選擇:A2 web-research 紅綠燈報告 docs/phase0_research_report_20260503.md
|
||
# 原戰役計畫 deepseek-r1:14b 的 Ollama tool_calls chat template 缺對應 jinja
|
||
# (GitHub Issue #10935 未解),改採 qwen3:14b(Ollama 官方 + qwenlm 雙確認 tools 支援)。
|
||
# 統帥 2026-05-03 23:30 指令:「免費優先」— GCP qwen3:14b 已拉
|
||
# 預設 ON:qwen3:14b 主 → NIM 備援 → Hermes 規則引擎兜底(ADR-004)
|
||
# 緊急停用(回 NIM-first):export NEMOTRON_OLLAMA_FIRST=false
|
||
NEMOTRON_OLLAMA_FIRST = os.getenv("NEMOTRON_OLLAMA_FIRST", "true").lower() == "true"
|
||
NEMOTRON_OLLAMA_MODEL = os.getenv("NEMOTRON_OLLAMA_MODEL", "qwen3:14b")
|
||
NEMOTRON_OLLAMA_TIMEOUT = int(os.getenv("NEMOTRON_OLLAMA_TIMEOUT", "180")) # 秒
|
||
|
||
|
||
def _check_nim_quota() -> bool:
|
||
today = datetime.now().strftime("%Y-%m-%d")
|
||
if _nim_call_count["date"] != today:
|
||
_nim_call_count["date"] = today
|
||
_nim_call_count["count"] = 0
|
||
if _nim_call_count["count"] >= NIM_DAILY_LIMIT:
|
||
logger.warning(f"[NIM] 今日配額已達上限 {NIM_DAILY_LIMIT},跳過")
|
||
return False
|
||
_nim_call_count["count"] += 1
|
||
return True
|
||
|
||
# ── 告警去重 (Deduplication) 快取 ────────────────────────
|
||
import time
|
||
_ALERT_CACHE = {} # {sku: timestamp}
|
||
_ALERT_TTL_SEC = 4 * 3600 # 預設 4 小時(防止同商品短時間重複告警)
|
||
|
||
def _is_duplicate_alert(sku: str) -> bool:
|
||
"""檢查是否在 TTL 內已經告警過。若是則回傳 True,否則記錄當下時間並回傳 False"""
|
||
now = time.time()
|
||
last_alert = _ALERT_CACHE.get(sku)
|
||
if last_alert and (now - last_alert) < _ALERT_TTL_SEC:
|
||
return True
|
||
_ALERT_CACHE[sku] = now
|
||
|
||
# 順便清理過期快取,避免記憶體洩漏
|
||
expired = [k for k, v in _ALERT_CACHE.items() if (now - v) >= _ALERT_TTL_SEC]
|
||
for k in expired:
|
||
del _ALERT_CACHE[k]
|
||
|
||
return False
|
||
|
||
def _nim_quota_used() -> int:
|
||
"""回傳今日已使用配額數"""
|
||
today = datetime.now().strftime("%Y-%m-%d")
|
||
if _nim_call_count["date"] != today:
|
||
return 0
|
||
return _nim_call_count["count"]
|
||
|
||
|
||
# ── Tool 定義(扁平化 Schema)────────────────────────
|
||
TOOLS = [
|
||
{
|
||
"type": "function",
|
||
"function": {
|
||
"name": "trigger_price_alert",
|
||
"description": "發送競價高危險預警 Telegram 告警,當商品價格比競品貴且銷量明顯下滑時使用",
|
||
"parameters": {
|
||
"type": "object",
|
||
# [2026-04-18 台北] Bug-1 防線一:移除 momo_price / comp_price —
|
||
# 客觀售價由 Python 從 threat_map 獨裁注入,不讓 LLM 碰
|
||
# 避免 NemoTron 漏吐 → default=0 → Telegram 顯示 $0 幻覺 — Claude Opus 4.7
|
||
"properties": {
|
||
"sku": {"type": "string", "description": "商品 SKU 編號"},
|
||
"name": {"type": "string", "description": "商品名稱"},
|
||
"gap_pct": {"type": "number", "description": "我方與競品的價差百分比(正值代表我貴)"},
|
||
"sales_delta": {"type": "number", "description": "近7天銷量變動百分比(負值代表下滑)"},
|
||
"action": {"type": "string", "description": "建議行動說明"},
|
||
"confidence": {"type": "number", "description": "AI 信心度 0.0~1.0"},
|
||
},
|
||
"required": ["sku", "name", "gap_pct", "sales_delta", "action", "confidence"],
|
||
},
|
||
},
|
||
},
|
||
{
|
||
"type": "function",
|
||
"function": {
|
||
"name": "add_to_recommendation",
|
||
"description": "將價格具競爭力且庫存充足的商品加入前台推薦商品區塊",
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {
|
||
"sku": {"type": "string", "description": "商品 SKU 編號"},
|
||
"name": {"type": "string", "description": "商品名稱"},
|
||
"reason": {"type": "string", "description": "推薦原因(給後台審核人員)"},
|
||
"confidence": {"type": "number", "description": "AI 信心度 0.0~1.0"},
|
||
},
|
||
"required": ["sku", "name", "reason", "confidence"],
|
||
},
|
||
},
|
||
},
|
||
{
|
||
"type": "function",
|
||
"function": {
|
||
"name": "flag_for_human_review",
|
||
"description": "當情況複雜、AI 信心不足,或需要人工決策時,發送 Telegram 請求人工覆核",
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {
|
||
"sku": {"type": "string", "description": "商品 SKU 編號"},
|
||
"name": {"type": "string", "description": "商品名稱"},
|
||
"concern": {"type": "string", "description": "需要人工判斷的疑慮說明(含矛盾數據描述)"},
|
||
"confidence": {"type": "number", "description": "AI 信心度 0.0~1.0"},
|
||
},
|
||
"required": ["sku", "name", "concern", "confidence"],
|
||
},
|
||
},
|
||
},
|
||
{
|
||
"type": "function",
|
||
"function": {
|
||
"name": "route_to_km",
|
||
"description": (
|
||
"將商品競價洞察路由到知識庫(KM)的指定領域分類,"
|
||
"供未來 RAG 查詢與 OpenClaw 週報引用。"
|
||
"適用於:數據有參考價值但不需立即告警的情況。"
|
||
),
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {
|
||
"sku": {"type": "string", "description": "商品 SKU 編號"},
|
||
"name": {"type": "string", "description": "商品名稱"},
|
||
"km_domain": {
|
||
"type": "string",
|
||
"description": (
|
||
"KM 領域分類,必須為以下之一:"
|
||
"price_competition(競價情報)、"
|
||
"sales_anomaly(銷量異常)、"
|
||
"promotion_opportunity(促銷機會)、"
|
||
"market_trend(市場趨勢)"
|
||
),
|
||
},
|
||
"summary": {"type": "string", "description": "此洞察的核心摘要(繁體中文,50 字內)"},
|
||
"confidence": {"type": "number", "description": "AI 信心度 0.0~1.0"},
|
||
},
|
||
"required": ["sku", "name", "km_domain", "summary", "confidence"],
|
||
},
|
||
},
|
||
},
|
||
{
|
||
"type": "function",
|
||
"function": {
|
||
"name": "mark_for_relearn",
|
||
"description": (
|
||
"當新數據與 KM 既有洞察矛盾,或告警方向被推翻時,"
|
||
"將該商品的歷史洞察標記為需重新學習(relearn)。"
|
||
"適用於:此次分析結果與上次截然不同的情況。"
|
||
),
|
||
"parameters": {
|
||
"type": "object",
|
||
"properties": {
|
||
"sku": {"type": "string", "description": "商品 SKU 編號"},
|
||
"name": {"type": "string", "description": "商品名稱"},
|
||
"reason": {"type": "string", "description": "標記原因說明(繁體中文)"},
|
||
},
|
||
"required": ["sku", "name", "reason"],
|
||
},
|
||
},
|
||
},
|
||
]
|
||
|
||
|
||
# ── 金額影響量化(B' 軌:告警必須攜帶可決策的金額數字) ──
|
||
def _compute_business_impact(threat) -> dict:
|
||
"""從 PriceThreat 計算「過去 7 日營收流失」與「建議調價金額」。
|
||
|
||
回傳純 Python 客觀計算結果,由 dispatcher 強制注入告警 — LLM 不得碰觸這些數字。
|
||
|
||
revenue_loss_7d
|
||
= max(0, sales_7d_prev_amount - sales_7d_curr_amount) **僅在 gap_pct > 0 時**
|
||
語意:「我方比競品貴,且過去 7 日銷量金額下滑 → 推估價格因素導致的流失」
|
||
若 gap_pct ≤ 0(我方已便宜或持平)即使銷量下滑亦歸 0,避免把
|
||
季節性/商品壽命終結等非價格因素誤標為「流失」誘導降價(Critic Medium-3 fix)
|
||
|
||
recommended_price
|
||
= round(pchome_price)
|
||
語意:「跟進競品的最低調價金額」;統帥可基於此再依毛利策略加溢價
|
||
gap_pct ≤ 0(我方已便宜或持平)→ recommended_price=None(不需調價)
|
||
"""
|
||
try:
|
||
gap_pct = float(getattr(threat, "gap_pct", 0) or 0)
|
||
except (TypeError, ValueError):
|
||
gap_pct = 0.0
|
||
|
||
revenue_loss_7d = 0.0
|
||
if gap_pct > 0:
|
||
try:
|
||
prev = float(getattr(threat, "sales_7d_prev_amount", 0) or 0)
|
||
curr = float(getattr(threat, "sales_7d_curr_amount", 0) or 0)
|
||
revenue_loss_7d = max(0.0, prev - curr)
|
||
except (TypeError, ValueError):
|
||
revenue_loss_7d = 0.0
|
||
|
||
recommended_price = None
|
||
try:
|
||
pchome = float(getattr(threat, "pchome_price", 0) or 0)
|
||
if pchome > 0 and gap_pct > 0:
|
||
recommended_price = round(pchome)
|
||
except (TypeError, ValueError):
|
||
pass
|
||
|
||
return {
|
||
"revenue_loss_7d": revenue_loss_7d,
|
||
"recommended_price": recommended_price,
|
||
}
|
||
|
||
|
||
# ── 語意化 Emoji 字典 ──────────────────────────────────
|
||
# 身份識別
|
||
HEADER_DISPATCHER = "⚡ NemoTron 派發器"
|
||
# 風險級別
|
||
ICON_CRITICAL = "🚨" # 高危險/立即行動
|
||
ICON_WARNING = "⚠️" # 中風險/人工覆核
|
||
ICON_INSIGHT = "💡" # 低風險/策略建議
|
||
ICON_REPORT = "📊" # 例行報告
|
||
# 業務屬性
|
||
ICON_PRICE = "💰"
|
||
ICON_SALES = "📦"
|
||
ICON_COMPETE = "🏆"
|
||
ICON_AI = "🧠"
|
||
ICON_FOOTPRINT = "⚙️"
|
||
|
||
MATCH_TYPE_LABELS = {
|
||
"exact": "高信心同款",
|
||
"same_product_different_pack": "同商品不同包裝",
|
||
"same_line_variant": "同系列不同款",
|
||
"comparable": "可比但需覆核",
|
||
"no_match": "非同款",
|
||
}
|
||
PRICE_BASIS_LABELS = {
|
||
"total_price": "總價可比",
|
||
"unit_price": "單位價可比",
|
||
"manual_review": "人工覆核後可比",
|
||
"none": "不可比",
|
||
}
|
||
ALERT_TIER_LABELS = {
|
||
"price_alert_exact": "可直接價格告警",
|
||
"unit_price_review": "單位價覆核",
|
||
"identity_review": "身份覆核",
|
||
"suppress": "不告警",
|
||
}
|
||
|
||
|
||
def _threat_match_metadata(threat) -> dict:
|
||
return {
|
||
"match_type": getattr(threat, "match_type", "exact") or "exact",
|
||
"price_basis": getattr(threat, "price_basis", "total_price") or "total_price",
|
||
"alert_tier": getattr(threat, "alert_tier", "price_alert_exact") or "price_alert_exact",
|
||
"match_score": float(getattr(threat, "match_score", 0) or 0),
|
||
"competitor_product_id": getattr(threat, "competitor_product_id", "") or "",
|
||
"competitor_product_name": getattr(threat, "competitor_product_name", "") or "",
|
||
}
|
||
|
||
|
||
def _can_direct_price_alert(threat) -> bool:
|
||
meta = _threat_match_metadata(threat)
|
||
return (
|
||
meta["match_type"] == "exact"
|
||
and meta["price_basis"] == "total_price"
|
||
and meta["alert_tier"] == "price_alert_exact"
|
||
)
|
||
|
||
|
||
def _format_match_evidence_block(
|
||
*,
|
||
match_type: str = "",
|
||
price_basis: str = "",
|
||
alert_tier: str = "",
|
||
match_score: float = 0.0,
|
||
competitor_product_id: str = "",
|
||
competitor_product_name: str = "",
|
||
) -> str:
|
||
match_type = match_type or "exact"
|
||
price_basis = price_basis or "total_price"
|
||
alert_tier = alert_tier or "price_alert_exact"
|
||
lines = [
|
||
f"{ICON_COMPETE} 比對證據:",
|
||
f"• 身份分級:{MATCH_TYPE_LABELS.get(match_type, match_type)}",
|
||
f"• 比價基準:{PRICE_BASIS_LABELS.get(price_basis, price_basis)}",
|
||
f"• 告警路徑:{ALERT_TIER_LABELS.get(alert_tier, alert_tier)}",
|
||
]
|
||
if match_score:
|
||
lines.append(f"• Match score:{match_score:.3f}")
|
||
if competitor_product_id:
|
||
lines.append(f"• PChome ID:{competitor_product_id}")
|
||
if competitor_product_name:
|
||
lines.append(f"• PChome 品名:{str(competitor_product_name)[:68]}")
|
||
return "\n".join(lines) + "\n\n"
|
||
|
||
|
||
def _safe_float(value, default: float = 0.0) -> float:
|
||
try:
|
||
return float(value)
|
||
except (TypeError, ValueError):
|
||
return default
|
||
|
||
|
||
def _price_decision_data_quality(
|
||
momo_price,
|
||
comp_price,
|
||
match_score: float,
|
||
match_type: str,
|
||
price_basis: str,
|
||
alert_tier: str,
|
||
) -> str:
|
||
momo_ok = _safe_float(momo_price) > 0
|
||
comp_ok = _safe_float(comp_price) > 0
|
||
identity_ok = bool(match_type and price_basis and alert_tier)
|
||
score_ok = _safe_float(match_score) > 0
|
||
if momo_ok and comp_ok and identity_ok and score_ok:
|
||
return "complete"
|
||
if momo_ok or comp_ok or identity_ok:
|
||
return "partial"
|
||
return "missing"
|
||
|
||
|
||
def _price_decision_severity(
|
||
*,
|
||
decision_type: str,
|
||
gap_pct,
|
||
revenue_loss_7d: float,
|
||
alert_tier: str,
|
||
) -> str:
|
||
gap = abs(_safe_float(gap_pct))
|
||
loss = _safe_float(revenue_loss_7d)
|
||
if decision_type == "price_alert" and alert_tier == "price_alert_exact":
|
||
if gap >= 15 or loss >= 50000:
|
||
return "P1"
|
||
return "P2"
|
||
if loss >= 50000 or gap >= 20:
|
||
return "P2"
|
||
return "P3"
|
||
|
||
|
||
def _build_price_decision_envelope(
|
||
*,
|
||
decision_type: str,
|
||
sku: str,
|
||
name: str,
|
||
gap_pct,
|
||
sales_delta,
|
||
confidence: float,
|
||
analysis: str,
|
||
momo_price=None,
|
||
comp_price=None,
|
||
revenue_loss_7d: float = 0.0,
|
||
recommended_price: Optional[float] = None,
|
||
match_type: str = "",
|
||
price_basis: str = "",
|
||
alert_tier: str = "",
|
||
match_score: float = 0.0,
|
||
competitor_product_id: str = "",
|
||
competitor_product_name: str = "",
|
||
) -> dict:
|
||
"""建立 12 Agent 共用的價格決策信封;只描述證據,不執行價格或匹配覆寫。"""
|
||
match_type = match_type or "unknown"
|
||
price_basis = price_basis or "manual_review"
|
||
alert_tier = alert_tier or "identity_review"
|
||
match_score_value = _safe_float(match_score)
|
||
confidence_value = max(0.0, min(1.0, _safe_float(confidence, 0.0)))
|
||
gap_value = _safe_float(gap_pct)
|
||
sales_value = _safe_float(sales_delta)
|
||
momo_value = _safe_float(momo_price)
|
||
comp_value = _safe_float(comp_price)
|
||
loss_value = _safe_float(revenue_loss_7d)
|
||
gap_amount = None
|
||
if momo_value > 0 and comp_value > 0:
|
||
gap_amount = round(momo_value - comp_value, 2)
|
||
|
||
data_quality = _price_decision_data_quality(
|
||
momo_price=momo_price,
|
||
comp_price=comp_price,
|
||
match_score=match_score_value,
|
||
match_type=match_type,
|
||
price_basis=price_basis,
|
||
alert_tier=alert_tier,
|
||
)
|
||
severity = _price_decision_severity(
|
||
decision_type=decision_type,
|
||
gap_pct=gap_value,
|
||
revenue_loss_7d=loss_value,
|
||
alert_tier=alert_tier,
|
||
)
|
||
|
||
evidence = [
|
||
{
|
||
"type": "match",
|
||
"metric": "match_score",
|
||
"value": round(match_score_value, 3),
|
||
"basis": f"{match_type}/{price_basis}/{alert_tier}",
|
||
"confidence": round(match_score_value, 3) if match_score_value else None,
|
||
},
|
||
{
|
||
"type": "price",
|
||
"metric": "gap_pct",
|
||
"value": f"{gap_value:+.1f}%",
|
||
"basis": "MOMO latest price + PChome competitor_prices",
|
||
},
|
||
{
|
||
"type": "sales",
|
||
"metric": "sales_7d_delta_pct",
|
||
"value": f"{sales_value:+.1f}%",
|
||
"basis": "daily_sales_snapshot 7d vs previous 7d",
|
||
},
|
||
]
|
||
if loss_value > 0:
|
||
evidence.append({
|
||
"type": "impact",
|
||
"metric": "revenue_loss_7d",
|
||
"value": round(loss_value, 2),
|
||
"basis": "sales_7d_prev_amount - sales_7d_curr_amount",
|
||
})
|
||
|
||
action = "price_follow_review" if decision_type == "price_alert" else "identity_or_price_review"
|
||
blocked_reason = (
|
||
"價格調整需人工覆核;不得自動寫入或覆蓋正式競品價格"
|
||
if decision_type == "price_alert"
|
||
else "身份、包裝、單位價或前台狀態需人工確認"
|
||
)
|
||
risk_reduction = "high" if severity == "P1" else ("medium" if severity == "P2" else "watch")
|
||
|
||
return {
|
||
"decision_id": f"nemotron:{decision_type}:{sku}:{uuid.uuid4().hex[:8]}",
|
||
"source_agent": "nemotron",
|
||
"decision_type": decision_type,
|
||
"severity": severity,
|
||
"subject": {
|
||
"sku": sku,
|
||
"name": name,
|
||
"event_type": "price_competition",
|
||
"momo_price": momo_value if momo_value > 0 else None,
|
||
"competitor_price": comp_value if comp_value > 0 else None,
|
||
"competitor_product_id": competitor_product_id,
|
||
"competitor_product_name": str(competitor_product_name or "")[:120],
|
||
},
|
||
"evidence": evidence,
|
||
"analysis": _sanitize_text(analysis, fallback="請人工確認", max_len=300),
|
||
"recommended_action": {
|
||
"action": action,
|
||
"owner": "營運",
|
||
"requires_hitl": True,
|
||
},
|
||
"expected_impact": {
|
||
"momo_price": momo_value if momo_value > 0 else None,
|
||
"competitor_price": comp_value if comp_value > 0 else None,
|
||
"candidate_gap_pct": round(gap_value, 1),
|
||
"sales_7d_delta_pct": round(sales_value, 1),
|
||
"revenue_loss_7d": round(loss_value, 2),
|
||
"gap_amount": gap_amount,
|
||
"recommended_price": recommended_price,
|
||
"risk_reduction": risk_reduction,
|
||
},
|
||
"confidence": round(confidence_value, 3),
|
||
"guardrails": {
|
||
"can_auto_execute": False,
|
||
"blocked_reason": blocked_reason,
|
||
"data_quality": data_quality,
|
||
"match_type": match_type,
|
||
"price_basis": price_basis,
|
||
"alert_tier": alert_tier,
|
||
},
|
||
"trace": {
|
||
"model": NEMOTRON_OLLAMA_MODEL if NEMOTRON_OLLAMA_FIRST else NIM_MODEL,
|
||
"provider": "nemotron_dispatcher",
|
||
},
|
||
}
|
||
|
||
|
||
# ── tool_calls 解析(NIM 與 qwen3 共用)──────────────────────────
|
||
def _parse_tool_calls_struct(tool_calls: list) -> list:
|
||
"""從 OpenAI 格式的 tool_calls 結構陣列抽出 [{tool, args}] 清單。
|
||
NIM 與 qwen3 (Ollama /api/chat) 兩邊回應對齊 OpenAI schema:
|
||
[{"function": {"name": ..., "arguments": <json-str-or-dict>}, ...}]
|
||
arguments 在 NIM 是 JSON 字串、在 Ollama 通常已是 dict;本 helper 兼容兩者。
|
||
"""
|
||
results = []
|
||
for tc in tool_calls or []:
|
||
fn = tc.get("function", {}) if isinstance(tc, dict) else {}
|
||
if not fn:
|
||
continue
|
||
raw_args = fn.get("arguments", {})
|
||
if isinstance(raw_args, str):
|
||
try:
|
||
args = json.loads(raw_args) if raw_args.strip() else {}
|
||
except json.JSONDecodeError:
|
||
args = {}
|
||
elif isinstance(raw_args, dict):
|
||
args = raw_args
|
||
else:
|
||
args = {}
|
||
name = fn.get("name")
|
||
if name:
|
||
results.append({"tool": name, "args": args})
|
||
return results
|
||
|
||
|
||
def _parse_content_fallback(raw_content: str) -> list:
|
||
"""當模型沒回 tool_calls 結構、把工具呼叫塞進 content 時嘗試解析。
|
||
既有 NIM llama-3.1-8b 偶有此行為(line 537-554 原邏輯);
|
||
qwen3 開 thinking_mode=False 後較少見,但保留同等容錯。
|
||
"""
|
||
if not raw_content or not isinstance(raw_content, str):
|
||
return []
|
||
try:
|
||
parsed = json.loads(raw_content.strip())
|
||
except Exception as parse_err:
|
||
logger.error(f"[ToolCalls] content fallback JSON 解析失敗:{parse_err}")
|
||
return []
|
||
|
||
if not isinstance(parsed, list):
|
||
return []
|
||
|
||
results = []
|
||
for item in parsed:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
name = item.get("name") or (item.get("function", {}) or {}).get("name")
|
||
args = item.get("parameters") or item.get("arguments") or {}
|
||
if isinstance(args, str):
|
||
try:
|
||
args = json.loads(args)
|
||
except json.JSONDecodeError:
|
||
args = {}
|
||
if name:
|
||
results.append({"tool": name, "args": args})
|
||
if results:
|
||
logger.info(f"[ToolCalls] content fallback 解析成功,取得 {len(results)} 個 tool_calls")
|
||
return results
|
||
|
||
|
||
def _build_footprint_json(hermes_stats: Optional[dict], nim_stats: Optional[dict]) -> dict:
|
||
"""
|
||
建立結構化運算足跡 (用於 DB model_footprint JSONB 欄位)
|
||
|
||
Returns:
|
||
{"analyst": {...}, "dispatcher": {...}}
|
||
"""
|
||
result = {}
|
||
if hermes_stats:
|
||
result["analyst"] = {
|
||
"model": "qwen2.5:7b-instruct",
|
||
"host": hermes_stats.get("host", HERMES_URL),
|
||
"host_label": hermes_stats.get("host_label", "未知"),
|
||
"duration_sec": hermes_stats.get("duration_sec", 0),
|
||
"tokens": hermes_stats.get("tokens", 0),
|
||
"cost_usd": 0,
|
||
}
|
||
if nim_stats:
|
||
result["dispatcher"] = {
|
||
"model": NIM_MODEL,
|
||
"platform": "NVIDIA NIM",
|
||
"total_tokens": nim_stats.get("total_tokens", 0),
|
||
"quota_used": nim_stats.get("quota_used", 0),
|
||
"cost_usd": 0,
|
||
}
|
||
return result
|
||
|
||
|
||
def _build_footprint_block(hermes_stats: Optional[dict], nim_stats: Optional[dict]) -> str:
|
||
"""
|
||
建立底部運算足跡區塊(Telegram 顯示用)
|
||
|
||
Args:
|
||
hermes_stats: {"duration_sec": float, "tokens": int}
|
||
nim_stats: {"total_tokens": int, "quota_used": int}
|
||
|
||
Returns:
|
||
分隔線 + ⚙️ 運算足跡 文字區塊
|
||
"""
|
||
lines = ["─────────────────────", f"{ICON_FOOTPRINT} 運算足跡:"]
|
||
|
||
if hermes_stats:
|
||
dur = hermes_stats.get("duration_sec", 0)
|
||
tok = hermes_stats.get("tokens", "?")
|
||
label = hermes_stats.get("host_label", "本地 188")
|
||
lines.append(
|
||
f"• 🔍 分析: Qwen2.5 7B ({label}) | "
|
||
f"耗時: {dur:.1f}s | Tokens: {tok} | $0 成本"
|
||
)
|
||
else:
|
||
lines.append("• 🔍 分析: Qwen2.5 7B (未知主機) | $0 成本")
|
||
|
||
if nim_stats:
|
||
tok = nim_stats.get("total_tokens", "?")
|
||
quota = nim_stats.get("quota_used", "?")
|
||
lines.append(
|
||
f"• ⚡ 決策: NemoTron NIM | "
|
||
f"{tok} Tokens | $0 (配額內 {quota}/{NIM_DAILY_LIMIT})"
|
||
)
|
||
else:
|
||
lines.append(f"• ⚡ 決策: NemoTron NIM | $0 (配額內)")
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
class NemotronDispatcher:
|
||
"""
|
||
NemoTron 行動派發器
|
||
接收 Hermes 輸出 → NIM tool calling 決策 → 語意化 Telegram 告警 / DB 寫入
|
||
"""
|
||
|
||
def __init__(self, notification_manager=None, engine=None):
|
||
"""
|
||
Args:
|
||
notification_manager: NotificationManager 實例(負責 Telegram 推播)
|
||
engine: SQLAlchemy engine(用於寫入推薦商品 DB)
|
||
"""
|
||
self.nm = notification_manager
|
||
self.engine = engine
|
||
|
||
# ──────────────────────────────────────────────
|
||
# NIM Tool Calling
|
||
# ──────────────────────────────────────────────
|
||
def _call_nim(self, threats: list) -> tuple:
|
||
"""
|
||
將 Hermes 威脅清單交給 NIM,取得 tool_calls 決策清單
|
||
|
||
Returns:
|
||
(list of {"tool": str, "args": dict}, dict nim_stats)
|
||
nim_stats: {"total_tokens": int, "quota_used": int}
|
||
"""
|
||
if not NIM_API_KEY:
|
||
logger.warning("[NemotronDispatcher] NVIDIA_API_KEY 未設定,跳過 NIM 呼叫")
|
||
raise RuntimeError("NVIDIA_API_KEY not configured")
|
||
|
||
threat_summary = json.dumps(
|
||
[
|
||
{
|
||
"sku": t.sku,
|
||
"name": t.name,
|
||
"momo_price": t.momo_price,
|
||
"pchome_price": t.pchome_price,
|
||
"gap_pct": t.gap_pct,
|
||
"sales_delta": t.sales_7d_delta_pct,
|
||
"risk": t.risk,
|
||
"action": t.recommended_action,
|
||
"confidence": t.confidence,
|
||
**_threat_match_metadata(t),
|
||
}
|
||
for t in threats
|
||
],
|
||
ensure_ascii=False,
|
||
)
|
||
|
||
# 注入 MCP 市場上下文
|
||
mcp_ctx = build_mcp_context()
|
||
|
||
messages = [
|
||
{
|
||
"role": "system",
|
||
"content": (
|
||
"你是台灣電商競價情報的行動派發器。"
|
||
f"當前市場背景 (MCP):\n{mcp_ctx}\n\n"
|
||
"根據 Hermes 分析師提供的威脅清單,決定對每支商品呼叫哪個工具。\n"
|
||
"路由鐵律(依序判斷,命中即停):\n"
|
||
"1. match_type 不是 exact,或 price_basis 不是 total_price,或 alert_tier 不是 price_alert_exact "
|
||
"→ 不可直接價格告警,呼叫 flag_for_human_review,concern 說明需覆核身份、包裝或單位價。\n"
|
||
"2. gap_pct < 5% 且 sales_delta < -30% → 非價格異常,呼叫 flag_for_human_review,"
|
||
"concern 說明『價差接近 0 但銷量大幅下滑,疑似缺貨/下架/平台流量異常,請人工走查前台』。\n"
|
||
"3. gap_pct ≥ 5% 且 risk=HIGH → trigger_price_alert(填入 momo_price, comp_price)。\n"
|
||
"4. 我方價格低於競品且銷量正成長 → add_to_recommendation。\n"
|
||
"5. confidence < 0.6 或其他複雜情況 → flag_for_human_review。\n"
|
||
"每支商品只呼叫一個工具。\n"
|
||
"【語言鐵律 — 台灣標準正體中文(繁體)】所有文字欄位必須遵守:\n"
|
||
" 1. 嚴禁簡體字(例:不可用「参给当为来国发会说时间过从实现这话动问题」,"
|
||
" 必須用「參給當為來國發會說時間過從實現這話動問題」)\n"
|
||
" 2. 嚴禁異體字(例:不可用「亊」,必須用「事」)\n"
|
||
" 3. 嚴禁短語重複(例:不可輸出「當前事當前事」這種坍塌)\n"
|
||
" 4. 嚴禁無意義字元組合或亂碼\n"
|
||
"若無法產出合理的繁體中文說明,直接輸出「請人工評估議價空間」。"
|
||
),
|
||
},
|
||
{
|
||
"role": "user",
|
||
"content": f"請處理以下 {len(threats)} 筆威脅清單:\n{threat_summary}",
|
||
},
|
||
]
|
||
|
||
# P1-4 修復:NIM API 指數退避 retry(最多 3 次)
|
||
# Phase 1 v5.0: 包 ai_call_logger 追蹤 NIM 配額/tokens/錯誤
|
||
import time as _time
|
||
last_err = None
|
||
with log_ai_call(
|
||
caller='nemotron_dispatch',
|
||
provider='nim',
|
||
model=NIM_MODEL,
|
||
meta={'threat_count': len(threats), 'quota_used': _nim_quota_used()},
|
||
) as _ctx:
|
||
for _attempt in range(3):
|
||
try:
|
||
resp = requests.post(
|
||
f"{NIM_BASE_URL}/chat/completions",
|
||
headers={
|
||
"Authorization": f"Bearer {NIM_API_KEY}",
|
||
"Content-Type": "application/json",
|
||
},
|
||
json={
|
||
"model": NIM_MODEL,
|
||
"messages": messages,
|
||
"tools": TOOLS,
|
||
"tool_choice": "required",
|
||
"max_tokens": 2048,
|
||
},
|
||
timeout=NIM_TIMEOUT,
|
||
)
|
||
resp.raise_for_status()
|
||
break
|
||
except (requests.Timeout, requests.HTTPError) as e:
|
||
last_err = e
|
||
# ADR-004: 429 不重試,立即拋出讓上層啟動 Hermes 規則引擎降級
|
||
if isinstance(e, requests.HTTPError) and e.response is not None \
|
||
and e.response.status_code == 429:
|
||
logger.warning("[NIM] HTTP 429 速率限制,跳出 retry 迴圈")
|
||
_ctx.set_error(f"NIM 429 rate-limited")
|
||
_ctx.fallback_to_caller('hermes_rule_engine')
|
||
raise
|
||
if _attempt < 2:
|
||
_time.sleep(2 ** _attempt)
|
||
logger.warning(f"[NIM] retry {_attempt + 1}/2 after {e}")
|
||
else:
|
||
raise last_err
|
||
|
||
body = resp.json()
|
||
usage = body.get("usage", {})
|
||
# 記錄 token / 成本到 ai_calls 表
|
||
_ctx.set_tokens(
|
||
input=usage.get("prompt_tokens", 0),
|
||
output=usage.get("completion_tokens", 0),
|
||
)
|
||
nim_stats = {
|
||
"total_tokens": usage.get("total_tokens", 0),
|
||
"quota_used": _nim_quota_used(),
|
||
}
|
||
|
||
choices = body.get("choices", [])
|
||
message = choices[0].get("message", {}) if choices else {}
|
||
tool_calls = message.get("tool_calls", []) or []
|
||
|
||
# 共用結構解析(NIM / qwen3 兩邊統一走同一條)
|
||
results = _parse_tool_calls_struct(tool_calls)
|
||
|
||
if not results:
|
||
# llama-3.1-8b-instruct 有時把 tool call 寫進 content 而非 tool_calls 結構
|
||
raw_content = message.get("content", "") or ""
|
||
logger.warning(f"[NIM] 0 tool_calls,嘗試從 content 解析:{raw_content[:120]}")
|
||
results = _parse_content_fallback(raw_content)
|
||
|
||
logger.info(f"[NIM] 收到 {len(results)} 個 tool_calls | tokens={nim_stats['total_tokens']}")
|
||
return results, nim_stats
|
||
|
||
# ──────────────────────────────────────────────
|
||
# GCP Ollama qwen3:14b Tool Calling(Operation Ollama-First v5.0 / Phase 3)
|
||
# ──────────────────────────────────────────────
|
||
def _call_qwen3_dispatch(self, threats: list) -> tuple:
|
||
"""
|
||
將 Hermes 威脅清單交給 GCP Ollama qwen3:14b,取得 tool_calls 決策。
|
||
|
||
Why qwen3:14b(A2 web-research 結論,docs/phase0_research_report_20260503.md):
|
||
- Ollama registry 官方頁 + qwenlm.github.io 雙確認 tools capability 可用
|
||
- 預設可關閉 thinking mode(避免 deepseek-r1 的 30s thinking 延遲)
|
||
- 14B 體積 9.3GB,與 deepseek-r1:14b 同級
|
||
- 與 NIM 一致採 OpenAI 兼容 chat completion + tools schema
|
||
|
||
Returns:
|
||
(list of {"tool": str, "args": dict}, dict ollama_stats)
|
||
ollama_stats: {"total_tokens": int, "host": str, "model": str}
|
||
"""
|
||
from services.ollama_service import (
|
||
get_host_label,
|
||
get_provider_tag,
|
||
mark_unhealthy,
|
||
resolve_ollama_host,
|
||
)
|
||
|
||
threat_summary = json.dumps(
|
||
[
|
||
{
|
||
"sku": t.sku,
|
||
"name": t.name,
|
||
"momo_price": t.momo_price,
|
||
"pchome_price": t.pchome_price,
|
||
"gap_pct": t.gap_pct,
|
||
"sales_delta": t.sales_7d_delta_pct,
|
||
"risk": t.risk,
|
||
"action": t.recommended_action,
|
||
"confidence": t.confidence,
|
||
**_threat_match_metadata(t),
|
||
}
|
||
for t in threats
|
||
],
|
||
ensure_ascii=False,
|
||
)
|
||
|
||
# 注入 MCP 市場上下文(與 NIM 路徑一致)
|
||
mcp_ctx = build_mcp_context()
|
||
|
||
# System prompt 與 NIM 完全一致(避免兩套維護)
|
||
system_prompt = (
|
||
"你是台灣電商競價情報的行動派發器。"
|
||
f"當前市場背景 (MCP):\n{mcp_ctx}\n\n"
|
||
"根據 Hermes 分析師提供的威脅清單,決定對每支商品呼叫哪個工具。\n"
|
||
"路由鐵律(依序判斷,命中即停):\n"
|
||
"1. match_type 不是 exact,或 price_basis 不是 total_price,或 alert_tier 不是 price_alert_exact "
|
||
"→ 不可直接價格告警,呼叫 flag_for_human_review,concern 說明需覆核身份、包裝或單位價。\n"
|
||
"2. gap_pct < 5% 且 sales_delta < -30% → 非價格異常,呼叫 flag_for_human_review,"
|
||
"concern 說明『價差接近 0 但銷量大幅下滑,疑似缺貨/下架/平台流量異常,請人工走查前台』。\n"
|
||
"3. gap_pct ≥ 5% 且 risk=HIGH → trigger_price_alert(填入 momo_price, comp_price)。\n"
|
||
"4. 我方價格低於競品且銷量正成長 → add_to_recommendation。\n"
|
||
"5. confidence < 0.6 或其他複雜情況 → flag_for_human_review。\n"
|
||
"每支商品只呼叫一個工具。\n"
|
||
"【語言鐵律 — 台灣標準正體中文(繁體)】所有文字欄位必須遵守:\n"
|
||
" 1. 嚴禁簡體字、嚴禁異體字(例:不可用「亊」,必須用「事」)\n"
|
||
" 2. 嚴禁短語重複(語意坍塌)、嚴禁無意義字元組合\n"
|
||
"若無法產出合理的繁體中文說明,直接輸出「請人工評估議價空間」。"
|
||
)
|
||
|
||
payload = {
|
||
"model": NEMOTRON_OLLAMA_MODEL,
|
||
"messages": [
|
||
{"role": "system", "content": system_prompt},
|
||
{"role": "user", "content": f"請處理以下 {len(threats)} 筆威脅清單:\n{threat_summary}"},
|
||
],
|
||
"tools": TOOLS, # 重用既有 NIM tools schema
|
||
"stream": False,
|
||
"options": {
|
||
"temperature": 0.2,
|
||
"num_predict": 2048,
|
||
},
|
||
}
|
||
|
||
with log_ai_call(
|
||
caller='nemotron_dispatch',
|
||
provider='gcp_ollama',
|
||
model=NEMOTRON_OLLAMA_MODEL,
|
||
request_id=f"nem-{int(time.time())}",
|
||
meta={
|
||
'flag': 'NEMOTRON_OLLAMA_FIRST',
|
||
'threats_count': len(threats),
|
||
},
|
||
) as ctx:
|
||
attempted_hosts = []
|
||
body = None
|
||
host = None
|
||
last_error = None
|
||
for _attempt in range(3):
|
||
host = resolve_ollama_host().rstrip("/")
|
||
if host in attempted_hosts:
|
||
break
|
||
attempted_hosts.append(host)
|
||
try:
|
||
resp = requests.post(
|
||
f"{host}/api/chat",
|
||
json=payload,
|
||
timeout=NEMOTRON_OLLAMA_TIMEOUT,
|
||
)
|
||
resp.raise_for_status()
|
||
body = resp.json()
|
||
ctx.set_provider(get_provider_tag(host))
|
||
ctx.add_meta('host', host)
|
||
ctx.add_meta('host_label', get_host_label(host))
|
||
ctx.add_meta('attempted_hosts', attempted_hosts)
|
||
break
|
||
except Exception as e:
|
||
last_error = e
|
||
# 連線/HTTP 失敗 → 標記主機 unhealthy,下一輪依序嘗試 GCP-B / 111。
|
||
mark_unhealthy(host)
|
||
logger.warning(
|
||
"[Dispatcher][qwen3] host=%s 呼叫失敗,嘗試下一台: %s",
|
||
host, e,
|
||
)
|
||
if body is None:
|
||
ctx.set_error(
|
||
f"qwen3 call failed after {len(attempted_hosts)} host(s): "
|
||
f"{type(last_error).__name__}: {last_error}"
|
||
)
|
||
ctx.fallback_to_caller('nim')
|
||
raise RuntimeError(last_error or "qwen3 all hosts failed")
|
||
|
||
ctx.set_tokens(
|
||
input=body.get('prompt_eval_count', 0),
|
||
output=body.get('eval_count', 0),
|
||
)
|
||
|
||
msg = body.get('message', {}) if isinstance(body, dict) else {}
|
||
tool_calls = msg.get('tool_calls', []) or []
|
||
|
||
# 走共用 tool_calls 結構解析(與 NIM 同一條 helper)
|
||
results = _parse_tool_calls_struct(tool_calls)
|
||
|
||
if not results:
|
||
# qwen3 沒回 tool_calls → 走既有 content fallback 解析
|
||
raw_content = msg.get('content', '') or ''
|
||
logger.warning(
|
||
f"[Dispatcher][qwen3] 0 tool_calls,嘗試從 content 解析:{raw_content[:120]}"
|
||
)
|
||
results = _parse_content_fallback(raw_content)
|
||
|
||
ollama_stats = {
|
||
"total_tokens": (body.get('prompt_eval_count', 0) or 0)
|
||
+ (body.get('eval_count', 0) or 0),
|
||
"host": host,
|
||
"host_label": get_host_label(host),
|
||
"provider": get_provider_tag(host),
|
||
"model": NEMOTRON_OLLAMA_MODEL,
|
||
}
|
||
|
||
logger.info(
|
||
f"[Dispatcher][qwen3] 收到 {len(results)} 個 tool_calls | "
|
||
f"tokens={ollama_stats['total_tokens']} host={host}"
|
||
)
|
||
return results, ollama_stats
|
||
|
||
# ──────────────────────────────────────────────
|
||
# ADR-004:Hermes 規則引擎降級路由
|
||
# ──────────────────────────────────────────────
|
||
def _hermes_rule_fallback(self, threats: list, hermes_stats: Optional[dict] = None) -> dict:
|
||
"""
|
||
ADR-004 降級模式:NIM HTTP 429 時,改用確定性規則路由 Hermes 威脅清單。
|
||
路由規則與 NIM system prompt 一致,所有 Telegram 告警加 🟡 降級前綴。
|
||
|
||
Rules(依序判斷,命中即停):
|
||
1. gap_pct < 5% 且 sales_delta < -30% → flag_for_human_review(疑似缺貨/流量異常)
|
||
2. gap_pct ≥ 5% 且 risk=HIGH → trigger_price_alert
|
||
3. gap_pct < 0 且 sales_delta > 0 → add_to_recommendation(我方具競爭力)
|
||
4. 其餘 → flag_for_human_review(信心不足/複雜情況)
|
||
"""
|
||
degraded_note = "🟡 [降級模式 ADR-004] NIM 配額耗盡,改用 Hermes 規則引擎決策"
|
||
footprint = degraded_note + "\n" + _build_footprint_block(hermes_stats, None)
|
||
|
||
dispatched, errors = 0, []
|
||
|
||
for t in threats:
|
||
try:
|
||
# B' 軌:每個 threat 預先算金額影響,所有路徑統一注入
|
||
impact = _compute_business_impact(t)
|
||
rl, rp = impact["revenue_loss_7d"], impact["recommended_price"]
|
||
match_meta = _threat_match_metadata(t)
|
||
|
||
if not _can_direct_price_alert(t):
|
||
self._exec_flag_for_human_review(
|
||
sku=t.sku,
|
||
name=t.name,
|
||
concern=(
|
||
"🟡 [規則引擎] 比對證據尚未達直接價格告警門檻;"
|
||
f"match_type={match_meta['match_type']}、"
|
||
f"price_basis={match_meta['price_basis']}、"
|
||
f"alert_tier={match_meta['alert_tier']}。"
|
||
"請先覆核是否為同款、同包裝或需改用單位價。"
|
||
),
|
||
confidence=max(float(getattr(t, "confidence", 0.5) or 0.5), 0.75),
|
||
footprint=footprint,
|
||
momo_price=t.momo_price,
|
||
comp_price=t.pchome_price,
|
||
gap_pct=t.gap_pct,
|
||
sales_delta=t.sales_7d_delta_pct,
|
||
revenue_loss_7d=rl,
|
||
recommended_price=rp,
|
||
**match_meta,
|
||
)
|
||
dispatched += 1
|
||
continue
|
||
|
||
if t.gap_pct < 5 and t.sales_7d_delta_pct < -30:
|
||
# Rule 1:價差微小但銷量大跌 → 非定價問題,人工確認
|
||
self._exec_flag_for_human_review(
|
||
sku=t.sku, name=t.name,
|
||
concern=(
|
||
f"🟡 [規則引擎] 價差僅 {t.gap_pct:+.1f}% 但銷量大跌 "
|
||
f"{t.sales_7d_delta_pct:+.1f}%,疑似缺貨/下架/平台流量異常,"
|
||
"請人工走查前台。"
|
||
),
|
||
confidence=0.80,
|
||
footprint=footprint,
|
||
momo_price=t.momo_price, comp_price=t.pchome_price,
|
||
gap_pct=t.gap_pct, sales_delta=t.sales_7d_delta_pct,
|
||
revenue_loss_7d=rl, recommended_price=rp,
|
||
**_threat_match_metadata(t),
|
||
)
|
||
elif t.gap_pct >= 5 and t.risk == "HIGH":
|
||
# Rule 2:高價差 HIGH 風險 → 競價告警
|
||
self._exec_trigger_price_alert(
|
||
t.sku, t.name,
|
||
t.gap_pct, t.sales_7d_delta_pct,
|
||
f"🟡 [規則引擎] {t.recommended_action}",
|
||
t.confidence,
|
||
momo_price=t.momo_price, comp_price=t.pchome_price,
|
||
footprint=footprint,
|
||
revenue_loss_7d=rl, recommended_price=rp,
|
||
**_threat_match_metadata(t),
|
||
)
|
||
elif t.gap_pct < 0 and t.sales_7d_delta_pct > 0:
|
||
# Rule 3:我方具競爭力 + 銷量正成長 → 推薦
|
||
self._exec_add_to_recommendation(
|
||
t.sku, t.name,
|
||
(
|
||
f"🟡 [規則引擎] 我方比競品便宜 {abs(t.gap_pct):.1f}%,"
|
||
f"銷量正成長 {t.sales_7d_delta_pct:+.1f}%"
|
||
),
|
||
t.confidence,
|
||
footprint=footprint,
|
||
threat=t,
|
||
)
|
||
else:
|
||
# Rule 4:其餘複雜情況 → 人工覆核
|
||
self._exec_flag_for_human_review(
|
||
sku=t.sku, name=t.name,
|
||
concern=(
|
||
f"🟡 [規則引擎] 情況複雜或信心不足(信心 {t.confidence:.0%}),"
|
||
f"建議:{t.recommended_action}"
|
||
),
|
||
confidence=t.confidence,
|
||
footprint=footprint,
|
||
momo_price=t.momo_price, comp_price=t.pchome_price,
|
||
gap_pct=t.gap_pct, sales_delta=t.sales_7d_delta_pct,
|
||
revenue_loss_7d=rl, recommended_price=rp,
|
||
**_threat_match_metadata(t),
|
||
)
|
||
dispatched += 1
|
||
except Exception as e:
|
||
errors.append(f"fallback({t.sku}): {e}")
|
||
logger.error(f"[Dispatcher][ADR-004] Hermes fallback 失敗 {t.sku}: {e}")
|
||
|
||
logger.info(
|
||
f"[Dispatcher][ADR-004] Hermes 規則引擎降級完成 "
|
||
f"dispatched={dispatched} errors={len(errors)}"
|
||
)
|
||
return {"dispatched": dispatched, "skipped": 0, "errors": errors, "nim_stats": {"degraded": True}}
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 語意化訊息格式器
|
||
# ──────────────────────────────────────────────
|
||
@staticmethod
|
||
def _fmt_price_alert(
|
||
sku: str, name: str,
|
||
momo_price, comp_price,
|
||
gap_pct: float, sales_delta: float,
|
||
action: str, confidence: float,
|
||
footprint: str,
|
||
revenue_loss_7d: float = 0.0,
|
||
recommended_price: Optional[float] = None,
|
||
match_type: str = "exact",
|
||
price_basis: str = "total_price",
|
||
alert_tier: str = "price_alert_exact",
|
||
match_score: float = 0.0,
|
||
competitor_product_id: str = "",
|
||
competitor_product_name: str = "",
|
||
) -> str:
|
||
"""
|
||
類別一:緊急告警
|
||
倒金字塔:結論先行 → 核心數據 → 金額影響 → AI 洞察 → 運算足跡
|
||
|
||
[2026-04-18 台北] Bug-3 防線三 UI 物理隔離:
|
||
- 核心問題 = Python 客觀組字(價差 X% / 銷量 Y%),不碰 AI 文字
|
||
- 關鍵數據 = Python 獨裁注入,None/0 降級為 N/A 而非 $0
|
||
- AI 洞察 = action 唯一使用位置;移除假冒「Hermes 分析師研判」標籤
|
||
(action 實為 NemoTron 輸出,非 Hermes) — Claude Opus 4.7
|
||
|
||
[2026-05-02 台北] B' 軌:金額影響量化 — Claude Opus 4.7
|
||
- revenue_loss_7d / recommended_price 純 Python 計算(_compute_business_impact)
|
||
- 解決「告警內容空泛、人類無可批准的具體動作」根因
|
||
"""
|
||
conf_pct = int(confidence * 100)
|
||
|
||
# 客觀數據(None/0 降級避免 $0 幻覺)
|
||
mp_str = f"${momo_price:,.0f}" if momo_price not in (None, 0) else "N/A(資料缺失)"
|
||
cp_str = f"${comp_price:,.0f}" if comp_price not in (None, 0) else "N/A(資料缺失)"
|
||
|
||
# 核心問題:Python 客觀組字,不碰 AI 文字
|
||
core_issue = f"價差 {gap_pct:+.1f}% / 近七天銷量 {sales_delta:+.1f}%"
|
||
|
||
# 金額影響區塊(B' 軌新增)
|
||
impact_lines = []
|
||
if revenue_loss_7d and revenue_loss_7d > 0:
|
||
impact_lines.append(f"📉 過去 7 日營收流失:NT$ {revenue_loss_7d:,.0f}")
|
||
if recommended_price is not None and recommended_price > 0:
|
||
impact_lines.append(
|
||
f"🎯 跟進競品建議價:NT$ {recommended_price:,.0f}"
|
||
f"(毛利策略可再加溢價)"
|
||
)
|
||
impact_block = ("\n".join(impact_lines) + "\n\n") if impact_lines else ""
|
||
match_block = _format_match_evidence_block(
|
||
match_type=match_type,
|
||
price_basis=price_basis,
|
||
alert_tier=alert_tier,
|
||
match_score=match_score,
|
||
competitor_product_id=competitor_product_id,
|
||
competitor_product_name=competitor_product_name,
|
||
)
|
||
|
||
# AI 洞察:唯一允許 LLM 文字進入的欄位
|
||
ai_insight = _sanitize_text(action, fallback="請人工評估議價空間")
|
||
|
||
return (
|
||
f"{ICON_CRITICAL} [{HEADER_DISPATCHER}] 競價高危險預警\n\n"
|
||
f"{ICON_WARNING} 核心問題:[{sku}] {name}\n"
|
||
f"{core_issue}\n\n"
|
||
f"{ICON_REPORT} 關鍵數據:\n"
|
||
f"• 我方價格:{mp_str}\n"
|
||
f"• 競品價格:{cp_str}\n"
|
||
f"• 銷量變化:{sales_delta:+.1f}%\n\n"
|
||
f"{match_block}"
|
||
f"{impact_block}"
|
||
f"{ICON_AI} AI 洞察(信心度 {conf_pct}%):\n"
|
||
f"{ai_insight}\n\n"
|
||
f"{footprint}"
|
||
)
|
||
|
||
@staticmethod
|
||
def _fmt_human_review(
|
||
sku: str, name: str,
|
||
concern: str, footprint: str,
|
||
momo_price: float = None, comp_price: float = None,
|
||
gap_pct: float = None, sales_delta: float = None,
|
||
revenue_loss_7d: float = 0.0,
|
||
recommended_price: Optional[float] = None,
|
||
match_type: str = "",
|
||
price_basis: str = "",
|
||
alert_tier: str = "",
|
||
match_score: float = 0.0,
|
||
competitor_product_id: str = "",
|
||
competitor_product_name: str = "",
|
||
) -> str:
|
||
"""
|
||
類別二:人工覆核
|
||
客觀數據由 Python 注入(防幻覺),AI 診斷隔離在獨立欄位
|
||
B' 軌:補金額影響欄位
|
||
"""
|
||
# 客觀數據快照(100% Python,不經 LLM)
|
||
if momo_price is not None and comp_price is not None:
|
||
data_block = (
|
||
f"{ICON_REPORT} 客觀數據快照:\n"
|
||
f"• 我方價格:${momo_price:,.0f}\n"
|
||
f"• 競品價格:${comp_price:,.0f}(價差 {gap_pct:+.1f}%)\n"
|
||
f"• 七天銷量變化:{sales_delta:+.1f}%\n"
|
||
)
|
||
else:
|
||
data_block = f"{ICON_REPORT} 客觀數據:(無競品比價數據)\n"
|
||
|
||
# 金額影響(B' 軌新增)
|
||
impact_lines = []
|
||
if revenue_loss_7d and revenue_loss_7d > 0:
|
||
impact_lines.append(f"📉 過去 7 日營收流失:NT$ {revenue_loss_7d:,.0f}")
|
||
if recommended_price is not None and recommended_price > 0:
|
||
impact_lines.append(f"🎯 跟進競品建議價:NT$ {recommended_price:,.0f}")
|
||
impact_block = ("\n".join(f"• {l}" for l in impact_lines) + "\n") if impact_lines else ""
|
||
match_block = _format_match_evidence_block(
|
||
match_type=match_type or "unknown",
|
||
price_basis=price_basis or "manual_review",
|
||
alert_tier=alert_tier or "identity_review",
|
||
match_score=match_score,
|
||
competitor_product_id=competitor_product_id,
|
||
competitor_product_name=competitor_product_name,
|
||
)
|
||
|
||
return (
|
||
f"{ICON_WARNING} [{HEADER_DISPATCHER}] 異常波動需人工覆核\n\n"
|
||
f"🔍 待查商品:[{sku}] {name}\n\n"
|
||
f"{data_block}"
|
||
f"{match_block}"
|
||
f"{impact_block}\n"
|
||
f"🧠 AI 診斷:\n"
|
||
f"{concern}\n\n"
|
||
f"👉 建議行動:請營運人員立即進行前台走查。\n\n"
|
||
f"{footprint}"
|
||
)
|
||
|
||
@staticmethod
|
||
def _fmt_recommendation(
|
||
sku: str, name: str,
|
||
reason: str, confidence: float,
|
||
db_written: bool,
|
||
footprint: str,
|
||
) -> str:
|
||
"""
|
||
類別三:策略執行通知
|
||
"""
|
||
conf_pct = int(confidence * 100)
|
||
db_status = "✅ 系統已自動寫入 ai_price_recommendations 推薦表" if db_written \
|
||
else "⚠️ DB 未注入,僅發送通知(表尚未建立)"
|
||
return (
|
||
f"{ICON_INSIGHT} [{HEADER_DISPATCHER}] 潛力商品自動佈署\n\n"
|
||
f"{ICON_COMPETE} 推薦品項:[{sku} {name}] 已自動加入「首頁推薦區塊」\n\n"
|
||
f"{ICON_REPORT} 決策依據:\n"
|
||
f"{reason}\n\n"
|
||
f"{ICON_AI} AI 洞察 (信心度 {conf_pct}%):\n"
|
||
f"具備價格競爭優勢,NemoTron 主動提升曝光量以最大化業績。\n\n"
|
||
f"👉 執行狀態:{db_status}\n\n"
|
||
f"{footprint}"
|
||
)
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 工具實作
|
||
# ──────────────────────────────────────────────
|
||
def _exec_trigger_price_alert(
|
||
self,
|
||
sku: str, name: str,
|
||
gap_pct: float, sales_delta: float, action: str, confidence: float,
|
||
momo_price=None, comp_price=None,
|
||
footprint: str = "",
|
||
revenue_loss_7d: float = 0.0,
|
||
recommended_price: Optional[float] = None,
|
||
match_type: str = "exact",
|
||
price_basis: str = "total_price",
|
||
alert_tier: str = "price_alert_exact",
|
||
match_score: float = 0.0,
|
||
competitor_product_id: str = "",
|
||
competitor_product_name: str = "",
|
||
):
|
||
"""發送語意化競價高危險預警
|
||
|
||
[2026-04-18 台北] Bug-1 防線一 保險:default 改 None,避免 LLM 漏吐
|
||
→ 舊版 default=0 → Telegram 顯示 $0。Layer A Hermes 已根治,這層是第二道屏障
|
||
— Claude Opus 4.7
|
||
|
||
[2026-05-02 台北] B' 軌:revenue_loss_7d / recommended_price 純 Python 注入
|
||
— Claude Opus 4.7
|
||
"""
|
||
msg = self._fmt_price_alert(
|
||
sku, name, momo_price, comp_price,
|
||
gap_pct, sales_delta, action, confidence, footprint,
|
||
revenue_loss_7d=revenue_loss_7d,
|
||
recommended_price=recommended_price,
|
||
match_type=match_type,
|
||
price_basis=price_basis,
|
||
alert_tier=alert_tier,
|
||
match_score=match_score,
|
||
competitor_product_id=competitor_product_id,
|
||
competitor_product_name=competitor_product_name,
|
||
)
|
||
decision_envelope = _build_price_decision_envelope(
|
||
decision_type="price_alert",
|
||
sku=sku,
|
||
name=name,
|
||
gap_pct=gap_pct,
|
||
sales_delta=sales_delta,
|
||
confidence=confidence,
|
||
analysis=action,
|
||
momo_price=momo_price,
|
||
comp_price=comp_price,
|
||
revenue_loss_7d=revenue_loss_7d,
|
||
recommended_price=recommended_price,
|
||
match_type=match_type,
|
||
price_basis=price_basis,
|
||
alert_tier=alert_tier,
|
||
match_score=match_score,
|
||
competitor_product_id=competitor_product_id,
|
||
competitor_product_name=competitor_product_name,
|
||
)
|
||
self._send_telegram(msg, decision_envelope=decision_envelope)
|
||
logger.info(
|
||
f"[Dispatcher] 競價告警 → {sku} gap={gap_pct:.1f}% sales={sales_delta:.1f}% "
|
||
f"loss=${revenue_loss_7d:,.0f} rec_price={recommended_price}"
|
||
)
|
||
# ADR-007 雙寫:沉澱到 ai_insights 供日後 RAG
|
||
self._sink_insight_to_km(
|
||
insight_type="price_alert",
|
||
sku=sku, name=name,
|
||
content=f"[高危險告警] {name} 價差 {gap_pct:+.1f}% / 銷量 {sales_delta:+.1f}%。行動:{action}",
|
||
metadata={"gap_pct": gap_pct, "sales_delta": sales_delta, "confidence": confidence,
|
||
"momo_price": momo_price, "comp_price": comp_price,
|
||
"revenue_loss_7d": revenue_loss_7d,
|
||
"recommended_price": recommended_price,
|
||
"match_type": match_type,
|
||
"price_basis": price_basis,
|
||
"alert_tier": alert_tier,
|
||
"match_score": match_score,
|
||
"competitor_product_id": competitor_product_id,
|
||
"decision_envelope": decision_envelope},
|
||
)
|
||
|
||
def _exec_add_to_recommendation(
|
||
self,
|
||
sku: str, name: str, reason: str, confidence: float,
|
||
footprint: str = "",
|
||
footprint_data: Optional[dict] = None, # 結構化 JSON,寫入 model_footprint 欄位
|
||
threat=None, # PriceThreat 物件,用於寫入完整數據快照
|
||
):
|
||
"""寫入前台推薦商品 DB + 語意化 Telegram 通知"""
|
||
db_written = False
|
||
|
||
if self.engine:
|
||
try:
|
||
from sqlalchemy import text
|
||
footprint_json = json.dumps(footprint_data or {}, ensure_ascii=False)
|
||
with self.engine.begin() as conn:
|
||
conn.execute(text("""
|
||
INSERT INTO ai_price_recommendations
|
||
(sku, name, reason, strategy, confidence,
|
||
momo_price, pchome_price, gap_pct, sales_7d_delta,
|
||
model_footprint, status, created_at, updated_at)
|
||
VALUES
|
||
(:sku, :name, :reason, 'promote', :confidence,
|
||
:momo_price, :pchome_price, :gap_pct, :sales_delta,
|
||
:footprint, 'pending', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
||
ON CONFLICT (sku) DO UPDATE
|
||
SET reason = EXCLUDED.reason,
|
||
confidence = EXCLUDED.confidence,
|
||
momo_price = EXCLUDED.momo_price,
|
||
pchome_price = EXCLUDED.pchome_price,
|
||
gap_pct = EXCLUDED.gap_pct,
|
||
sales_7d_delta = EXCLUDED.sales_7d_delta,
|
||
model_footprint = EXCLUDED.model_footprint,
|
||
status = 'pending',
|
||
updated_at = CURRENT_TIMESTAMP
|
||
"""), {
|
||
"sku": sku,
|
||
"name": name,
|
||
"reason": reason,
|
||
"confidence": confidence,
|
||
"momo_price": getattr(threat, "momo_price", None) if threat else None,
|
||
"pchome_price": getattr(threat, "pchome_price", None) if threat else None,
|
||
"gap_pct": getattr(threat, "gap_pct", None) if threat else None,
|
||
"sales_delta": getattr(threat, "sales_7d_delta_pct", None) if threat else None,
|
||
"footprint": footprint_json,
|
||
})
|
||
db_written = True
|
||
logger.info(f"[Dispatcher] 推薦商品寫入 DB → {sku}")
|
||
except Exception as e:
|
||
logger.error(f"[Dispatcher] DB 寫入失敗 {sku}: {e}")
|
||
|
||
msg = self._fmt_recommendation(
|
||
sku, name, reason, confidence, db_written, footprint,
|
||
)
|
||
self._send_telegram(msg)
|
||
# ADR-007 雙寫
|
||
self._sink_insight_to_km(
|
||
insight_type="recommendation",
|
||
sku=sku, name=name,
|
||
content=f"[推薦商品] {name}。原因:{reason}",
|
||
metadata={"confidence": confidence, "db_written": db_written},
|
||
)
|
||
|
||
def _exec_flag_for_human_review(
|
||
self,
|
||
sku: str, name: str, concern: str, confidence: float,
|
||
footprint: str = "",
|
||
momo_price: float = None, comp_price: float = None,
|
||
gap_pct: float = None, sales_delta: float = None,
|
||
revenue_loss_7d: float = 0.0,
|
||
recommended_price: Optional[float] = None,
|
||
match_type: str = "",
|
||
price_basis: str = "",
|
||
alert_tier: str = "",
|
||
match_score: float = 0.0,
|
||
competitor_product_id: str = "",
|
||
competitor_product_name: str = "",
|
||
):
|
||
"""發送語意化人工覆核請求"""
|
||
concern = _sanitize_text(concern, fallback=f"數據走勢違背常理,疑似缺貨或前台異常。")
|
||
msg = self._fmt_human_review(
|
||
sku, name, concern, footprint,
|
||
momo_price=momo_price, comp_price=comp_price,
|
||
gap_pct=gap_pct, sales_delta=sales_delta,
|
||
revenue_loss_7d=revenue_loss_7d,
|
||
recommended_price=recommended_price,
|
||
match_type=match_type,
|
||
price_basis=price_basis,
|
||
alert_tier=alert_tier,
|
||
match_score=match_score,
|
||
competitor_product_id=competitor_product_id,
|
||
competitor_product_name=competitor_product_name,
|
||
)
|
||
decision_envelope = _build_price_decision_envelope(
|
||
decision_type="human_review",
|
||
sku=sku,
|
||
name=name,
|
||
gap_pct=gap_pct,
|
||
sales_delta=sales_delta,
|
||
confidence=confidence,
|
||
analysis=concern,
|
||
momo_price=momo_price,
|
||
comp_price=comp_price,
|
||
revenue_loss_7d=revenue_loss_7d,
|
||
recommended_price=recommended_price,
|
||
match_type=match_type,
|
||
price_basis=price_basis,
|
||
alert_tier=alert_tier,
|
||
match_score=match_score,
|
||
competitor_product_id=competitor_product_id,
|
||
competitor_product_name=competitor_product_name,
|
||
)
|
||
self._send_telegram(msg, decision_envelope=decision_envelope)
|
||
logger.info(
|
||
f"[Dispatcher] 人工覆核請求 → {sku} loss=${revenue_loss_7d:,.0f}"
|
||
)
|
||
# ADR-007 雙寫
|
||
self._sink_insight_to_km(
|
||
insight_type="human_review",
|
||
sku=sku, name=name,
|
||
content=f"[人工覆核] {name}。疑慮:{concern}",
|
||
metadata={"confidence": confidence, "gap_pct": gap_pct, "sales_delta": sales_delta,
|
||
"momo_price": momo_price, "comp_price": comp_price,
|
||
"revenue_loss_7d": revenue_loss_7d,
|
||
"recommended_price": recommended_price,
|
||
"match_type": match_type,
|
||
"price_basis": price_basis,
|
||
"alert_tier": alert_tier,
|
||
"match_score": match_score,
|
||
"competitor_product_id": competitor_product_id,
|
||
"decision_envelope": decision_envelope},
|
||
)
|
||
|
||
def _exec_route_to_km(
|
||
self,
|
||
sku: str, name: str, km_domain: str, summary: str, confidence: float,
|
||
footprint: str = "",
|
||
threat=None,
|
||
):
|
||
"""
|
||
將洞察路由到 KM 指定領域,sink 到 ai_insights 供 RAG 使用。
|
||
不送 Telegram 告警(靜默操作,僅 log)。
|
||
"""
|
||
_KM_DOMAINS = {"price_competition", "sales_anomaly", "promotion_opportunity", "market_trend"}
|
||
domain = km_domain if km_domain in _KM_DOMAINS else "price_competition"
|
||
summary = _sanitize_text(summary, fallback="競價洞察已歸檔")
|
||
|
||
self._sink_insight_to_km(
|
||
insight_type=f"km_{domain}",
|
||
sku=sku, name=name,
|
||
content=f"[KM 路由 {domain}] {name}:{summary}",
|
||
metadata={
|
||
"km_domain": domain,
|
||
"confidence": confidence,
|
||
"momo_price": getattr(threat, "momo_price", None) if threat else None,
|
||
"pchome_price": getattr(threat, "pchome_price", None) if threat else None,
|
||
"gap_pct": getattr(threat, "gap_pct", None) if threat else None,
|
||
"sales_delta": getattr(threat, "sales_7d_delta_pct", None) if threat else None,
|
||
},
|
||
)
|
||
logger.info(f"[Dispatcher] KM 路由 → {sku} domain={domain} confidence={confidence:.2f}")
|
||
|
||
def _exec_mark_for_relearn(
|
||
self,
|
||
sku: str, name: str, reason: str,
|
||
footprint: str = "",
|
||
):
|
||
"""
|
||
將該 SKU 的既有 ai_insights 標記 status='relearn' + feedback_down+1,
|
||
讓每日去重/品質分數重算批次可感知「此洞察已被推翻」。
|
||
不送 Telegram 告警(靜默操作,僅 log)。
|
||
"""
|
||
reason = _sanitize_text(reason, fallback="新數據與歷史洞察矛盾,需重新學習")
|
||
try:
|
||
from database.manager import DatabaseManager
|
||
db = DatabaseManager()
|
||
with db.get_session() as session:
|
||
from sqlalchemy import text
|
||
result = session.execute(text("""
|
||
UPDATE ai_insights
|
||
SET status = 'relearn',
|
||
feedback_down = COALESCE(feedback_down, 0) + 1,
|
||
updated_at = CURRENT_TIMESTAMP
|
||
WHERE product_sku = :sku
|
||
AND status NOT IN ('relearn', 'archived')
|
||
"""), {"sku": sku})
|
||
session.commit()
|
||
rows = result.rowcount
|
||
logger.info(f"[Dispatcher] mark_for_relearn → {sku} 共更新 {rows} 筆洞察;原因:{reason}")
|
||
except Exception as e:
|
||
logger.warning(f"[Dispatcher] mark_for_relearn DB 更新失敗 ({sku}): {e}")
|
||
|
||
# 同時寫入一筆 relearn 事件到 ai_insights 留存紀錄
|
||
self._sink_insight_to_km(
|
||
insight_type="relearn_event",
|
||
sku=sku, name=name,
|
||
content=f"[重新學習事件] {name}:{reason}",
|
||
metadata={"sku": sku, "trigger": "nemoton_dispatcher"},
|
||
)
|
||
|
||
def _sink_insight_to_km(self, insight_type: str, sku: str, name: str,
|
||
content: str, metadata: dict = None):
|
||
"""
|
||
ADR-007 雙寫:派發後把決策/洞察沉澱到 ai_insights(供日後 RAG/PPT)
|
||
period 以當日 YYYY-MM-DD 作為 cache-aside 鍵,同日同 SKU 同 type 會覆蓋。
|
||
失敗不阻斷主線。
|
||
"""
|
||
try:
|
||
from services.openclaw_learning_service import store_insight
|
||
period = datetime.now().strftime("%Y-%m-%d")
|
||
meta = {"sku": sku, "name": name}
|
||
if metadata:
|
||
meta.update(metadata)
|
||
store_insight(
|
||
insight_type=insight_type,
|
||
content=content,
|
||
period=period,
|
||
product_sku=sku,
|
||
metadata=meta,
|
||
ai_model=NIM_MODEL,
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"[Dispatcher] sink insight 略過 ({insight_type}/{sku}): {e}")
|
||
|
||
def _send_telegram(self, message: str, decision_envelope: Optional[dict] = None):
|
||
"""
|
||
ADR-019 Phase 5: 改走 EventRouter 統一入口
|
||
舊行為(直接呼叫 Telegram Bot API + MarkdownV2 跳脫)已由 EventRouter
|
||
+ telegram_templates.send_telegram_with_result 取代。
|
||
|
||
失敗降級:EventRouter 內建 retry + JSONL queue replay;任何例外不阻斷主線。
|
||
"""
|
||
try:
|
||
from services.event_router import dispatch_sync
|
||
payload = {"raw_message": message}
|
||
event = {
|
||
"id": "",
|
||
"event_type": "nemoton_dispatch_alert",
|
||
"severity": "alert",
|
||
"source": "NemoTron.Dispatcher",
|
||
"title": "NemoTron 派發器告警",
|
||
"summary": message[:400],
|
||
"status": "dispatched",
|
||
"payload": payload,
|
||
}
|
||
if isinstance(decision_envelope, dict) and decision_envelope:
|
||
event["id"] = str(decision_envelope.get("decision_id") or "")[:52]
|
||
event["decision_envelope"] = decision_envelope
|
||
payload["decision_envelope"] = decision_envelope
|
||
if not event["id"]:
|
||
event.pop("id", None)
|
||
dispatch_sync(event=event)
|
||
except Exception as e:
|
||
logger.error(f"[Dispatcher] EventRouter dispatch 失敗: {e}")
|
||
logger.info(f"[Dispatcher] 告警內容(fallback log):{message[:200]}")
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 公開介面
|
||
# ──────────────────────────────────────────────
|
||
async def handle_l2(self, event: dict, ctx: dict) -> dict:
|
||
"""L2 行動規劃介面(給 EventRouter / Telegram NLP 使用)。
|
||
|
||
Contract:
|
||
event: {"message": str, "user_id": int, "chat_id": int, ...}
|
||
ctx: 包含 Hermes handle_l1 結果(在 "latest" key 或 flatten 後)
|
||
Returns:
|
||
{
|
||
"action_plan": [{"action": str, "params": dict}, ...],
|
||
"dispatch_to": "openclaw|direct_response|human_review",
|
||
"metadata": dict,
|
||
}
|
||
|
||
策略:
|
||
本方法不呼叫 NIM(批量 tool calling 專用),改以 L1 intent + 訊息關鍵字
|
||
決定下一步路由。複雜分析 → openclaw;簡單回覆 → direct_response。
|
||
任何例外都回 direct_response 保底。
|
||
"""
|
||
try:
|
||
message = (event or {}).get("message", "") or ""
|
||
event_type = (event or {}).get("event_type", "")
|
||
payload = (event or {}).get("payload") or {}
|
||
task_name = payload.get("task_name") or (event or {}).get("task_name")
|
||
|
||
if event_type == "scheduler_task_failure" and task_name:
|
||
try:
|
||
from services.agent_actions import ALLOWED_RETRY_TASKS
|
||
if task_name in ALLOWED_RETRY_TASKS:
|
||
return {
|
||
"session_id": f"evt:{event_type}:{(event or {}).get('source', 'unknown')}",
|
||
"plan_type": "retry_task",
|
||
"action_plan": [{
|
||
"action": "retry_task",
|
||
"params": {
|
||
"task_name": task_name,
|
||
"max_attempts": 2,
|
||
"backoff_sec": 60,
|
||
},
|
||
}],
|
||
"dispatch_to": "safe_action",
|
||
"auto_execute": True,
|
||
"metadata": {"event_type": event_type, "task_name": task_name},
|
||
}
|
||
except Exception as action_err:
|
||
logger.warning("[NemotronDispatcher.handle_l2] retry_task 規劃跳過: %s", action_err)
|
||
|
||
# ctx 可能是 {"latest": {...}} 或已攤平的 intent 結果
|
||
hermes = {}
|
||
if isinstance(ctx, dict):
|
||
hermes = ctx.get("latest") if isinstance(ctx.get("latest"), dict) else ctx
|
||
|
||
intent = (hermes or {}).get("intent", "unknown")
|
||
complexity = float((hermes or {}).get("complexity_score", 0.0) or 0.0)
|
||
needs_data = bool((hermes or {}).get("requires_data_fetch", False))
|
||
|
||
msg_lower = message.lower()
|
||
report_keywords = ("report", "ppt", "週報", "月報", "日報", "報表", "報告")
|
||
wants_report = any(k in msg_lower for k in report_keywords)
|
||
|
||
if wants_report or complexity >= 0.7 or needs_data or intent in (
|
||
"query_sales", "analyze_competitor", "report"
|
||
):
|
||
dispatch_to = "openclaw"
|
||
else:
|
||
dispatch_to = "direct_response"
|
||
|
||
action_plan = [{
|
||
"action": "strategist_analyze" if dispatch_to == "openclaw" else "reply_simple",
|
||
"params": {"message": message, "intent": intent},
|
||
}]
|
||
return {
|
||
"action_plan": action_plan,
|
||
"dispatch_to": dispatch_to,
|
||
"metadata": {"complexity_score": complexity, "intent": intent},
|
||
}
|
||
except Exception as e:
|
||
logger.warning(f"[NemotronDispatcher.handle_l2] 規劃失敗 fallback direct_response: {e}")
|
||
return {
|
||
"action_plan": [{"action": "reply_simple", "params": {}}],
|
||
"dispatch_to": "direct_response",
|
||
"metadata": {"error": str(e)},
|
||
}
|
||
|
||
def dispatch(self, threats: list, hermes_stats: Optional[dict] = None) -> dict:
|
||
"""
|
||
主入口:接收 Hermes 威脅清單,透過 NIM 決策後執行語意化告警
|
||
|
||
Args:
|
||
threats: list[PriceThreat]
|
||
hermes_stats: {"duration_sec": float, "tokens": int}
|
||
由 HermesAnalystService 傳入,用於運算足跡顯示
|
||
|
||
Returns:
|
||
{"dispatched": int, "skipped": int, "errors": list, "nim_stats": dict}
|
||
|
||
[2026-04-18 台北] Bug-2 防線二 Python 絕對獨裁路由(雙閘門):
|
||
閘門 A:銷量 ≤ -95% 絕對斷崖 → 100% 是缺貨/下架/前台異常,不論價差
|
||
(2026-04-18 傍晚升級:真實案例 sku=7440662 sales=-100% gap=6.1% 被 NemoTron 錯派降價)
|
||
閘門 B:銷量 ≤ -80% 且 |價差| < 5% → 中度斷崖 + 價差微小,定價非主因
|
||
命中任一閘門 → 強制走人工覆核,不進 NIM — Claude Opus 4.7
|
||
"""
|
||
if not threats:
|
||
return {"dispatched": 0, "skipped": 0, "errors": [], "nim_stats": {}}
|
||
|
||
# ── 防線二:Python 絕對獨裁預路由(雙閘門) ──
|
||
forced_review, nim_candidates = [], []
|
||
skipped = 0
|
||
for t in threats:
|
||
if _is_duplicate_alert(t.sku):
|
||
logger.info(f"[Dispatcher] SKU {t.sku} 在 {int(_ALERT_TTL_SEC/3600)}h 內已告警,觸發去重跳過。")
|
||
skipped += 1
|
||
continue
|
||
|
||
gate_a = t.sales_7d_delta_pct <= -95 # 絕對斷崖
|
||
gate_b = t.sales_7d_delta_pct <= -80 and abs(t.gap_pct) < 5 # 中度斷崖 + 微價差
|
||
if gate_a or gate_b:
|
||
forced_review.append(t)
|
||
else:
|
||
nim_candidates.append(t)
|
||
|
||
errors = []
|
||
dispatched = 0
|
||
|
||
if forced_review:
|
||
logger.warning(
|
||
f"[Dispatcher] 防線二攔截 {len(forced_review)} 筆斷崖異常(不進 NIM): "
|
||
+ ", ".join(
|
||
f"{t.sku}(sales={t.sales_7d_delta_pct:+.0f}%, gap={t.gap_pct:+.1f}%)"
|
||
for t in forced_review
|
||
)
|
||
)
|
||
hr_footprint = _build_footprint_block(hermes_stats, None)
|
||
for t in forced_review:
|
||
# 依閘門類型組 concern 文字
|
||
if t.sales_7d_delta_pct <= -95:
|
||
concern_text = (
|
||
f"銷量近乎歸零({t.sales_7d_delta_pct:+.1f}%)— 絕對斷崖警戒。"
|
||
"此等級業績崩潰幾乎 100% 為缺貨、下架、或前台頁面異常,"
|
||
"與定價策略無關,請營運人員立即走查前台。"
|
||
)
|
||
else:
|
||
concern_text = (
|
||
f"銷量斷崖下跌 {t.sales_7d_delta_pct:+.1f}%,但價差僅 "
|
||
f"{t.gap_pct:+.1f}%(絕對值 < 5%),與定價無關。"
|
||
"疑似缺貨、下架、或前台異常,請營運人員立即走查。"
|
||
)
|
||
try:
|
||
impact = _compute_business_impact(t)
|
||
self._exec_flag_for_human_review(
|
||
sku=t.sku,
|
||
name=t.name,
|
||
concern=concern_text,
|
||
confidence=0.99, # Python 規則判定,信心滿格
|
||
footprint=hr_footprint,
|
||
momo_price=t.momo_price,
|
||
comp_price=t.pchome_price,
|
||
gap_pct=t.gap_pct,
|
||
sales_delta=t.sales_7d_delta_pct,
|
||
revenue_loss_7d=impact["revenue_loss_7d"],
|
||
recommended_price=impact["recommended_price"],
|
||
**_threat_match_metadata(t),
|
||
)
|
||
dispatched += 1
|
||
except Exception as e:
|
||
errors.append(f"forced_review({t.sku}): {e}")
|
||
logger.error(f"[Dispatcher] 防線二執行失敗 {t.sku}: {e}")
|
||
|
||
# 若全部被防線二攔截,直接回傳(不浪費 NIM 配額)
|
||
if not nim_candidates:
|
||
logger.info(f"[Dispatcher] 全部 {len(forced_review)} 筆由防線二處理(或去重),不呼叫 NIM")
|
||
return {
|
||
"dispatched": dispatched,
|
||
"skipped": skipped,
|
||
"errors": errors,
|
||
"nim_stats": {},
|
||
}
|
||
|
||
# ── Operation Ollama-First v5.0 / Phase 3 / A9:qwen3 主路徑(feature flag 灰度)──
|
||
# NEMOTRON_OLLAMA_FIRST=false 時不進入此分支,僅作緊急退路。
|
||
# 若 qwen3 成功取得 tool_calls,沿用既有 TOOL_MAP 執行邏輯(共用 footprint/threat 注入)。
|
||
# 若 qwen3 失敗或 0 tool_calls → 不直接降到 Hermes 規則,先嘗試 NIM 備援,再走 ADR-004。
|
||
qwen3_used = False
|
||
qwen3_stats: Optional[dict] = None
|
||
qwen3_tool_calls: Optional[list] = None
|
||
if NEMOTRON_OLLAMA_FIRST:
|
||
try:
|
||
qwen3_tool_calls, qwen3_stats = self._call_qwen3_dispatch(nim_candidates)
|
||
if qwen3_tool_calls:
|
||
qwen3_used = True
|
||
logger.info(
|
||
f"[Dispatcher][qwen3] 主路徑成功 tool_calls={len(qwen3_tool_calls)} "
|
||
f"tokens={qwen3_stats.get('total_tokens', 0)}"
|
||
)
|
||
else:
|
||
logger.warning("[Dispatcher][qwen3] 0 tool_calls,fallback 至 NIM")
|
||
except Exception as e:
|
||
logger.warning(f"[Dispatcher][qwen3] 呼叫失敗 fallback NIM: {e}")
|
||
# log_ai_call 已在 _call_qwen3_dispatch 內標記 status=error + fallback_to=nim
|
||
qwen3_tool_calls = None
|
||
qwen3_stats = None
|
||
|
||
# qwen3 主路徑成功 → 直接進入工具執行區塊(跳過 NIM)
|
||
if qwen3_used:
|
||
tool_calls = qwen3_tool_calls
|
||
# 與既有 NIM 路徑一致的 stats 結構(footprint 顯示用)
|
||
nim_stats = {
|
||
"total_tokens": qwen3_stats.get("total_tokens", 0),
|
||
"quota_used": _nim_quota_used(), # 配額未動用
|
||
"provider": qwen3_stats.get("provider", "gcp_ollama"),
|
||
"model": qwen3_stats.get("model", NEMOTRON_OLLAMA_MODEL),
|
||
"host": qwen3_stats.get("host"),
|
||
"host_label": qwen3_stats.get("host_label"),
|
||
}
|
||
return self._execute_tool_calls(
|
||
tool_calls=tool_calls,
|
||
threats=threats,
|
||
hermes_stats=hermes_stats,
|
||
nim_stats=nim_stats,
|
||
pre_dispatched=dispatched,
|
||
pre_skipped=skipped,
|
||
pre_errors=errors,
|
||
)
|
||
|
||
# ── 進入 NIM 路徑(flag=false 緊急主路徑;flag=true 則為 qwen3 失敗備援)──
|
||
if not NIM_API_KEY:
|
||
logger.warning("[Dispatcher][ADR-004] NVIDIA_API_KEY 未設定,啟動 Hermes 規則引擎降級")
|
||
fb = self._hermes_rule_fallback(nim_candidates, hermes_stats)
|
||
return {
|
||
"dispatched": dispatched + fb["dispatched"],
|
||
"skipped": skipped + fb["skipped"],
|
||
"errors": errors + fb["errors"],
|
||
"nim_stats": fb["nim_stats"],
|
||
}
|
||
|
||
if not _check_nim_quota():
|
||
logger.warning("[Dispatcher][ADR-004] NIM 配額耗盡,啟動 Hermes 規則引擎降級")
|
||
fb = self._hermes_rule_fallback(nim_candidates, hermes_stats)
|
||
return {
|
||
"dispatched": dispatched + fb["dispatched"],
|
||
"skipped": skipped + fb["skipped"],
|
||
"errors": errors + fb["errors"],
|
||
"nim_stats": fb["nim_stats"],
|
||
}
|
||
|
||
try:
|
||
tool_calls, nim_stats = self._call_nim(nim_candidates)
|
||
if not tool_calls:
|
||
logger.warning("[Dispatcher][ADR-004] NIM 0 tool_calls,啟動 Hermes 規則引擎降級")
|
||
fb = self._hermes_rule_fallback(nim_candidates, hermes_stats)
|
||
return {
|
||
"dispatched": dispatched + fb["dispatched"],
|
||
"skipped": skipped + fb["skipped"],
|
||
"errors": errors + fb["errors"],
|
||
"nim_stats": fb["nim_stats"],
|
||
}
|
||
except requests.HTTPError as e:
|
||
if e.response is not None and e.response.status_code == 429:
|
||
logger.warning("[Dispatcher][ADR-004] NIM HTTP 429,啟動 Hermes 規則引擎降級")
|
||
fb = self._hermes_rule_fallback(nim_candidates, hermes_stats)
|
||
return {
|
||
"dispatched": dispatched + fb["dispatched"],
|
||
"skipped": skipped + fb["skipped"],
|
||
"errors": errors + fb["errors"],
|
||
"nim_stats": fb["nim_stats"],
|
||
}
|
||
logger.warning("[Dispatcher][ADR-004] NIM HTTP 錯誤,啟動 Hermes 規則引擎降級: %s", e)
|
||
fb = self._hermes_rule_fallback(nim_candidates, hermes_stats)
|
||
return {
|
||
"dispatched": dispatched + fb["dispatched"],
|
||
"skipped": skipped + fb["skipped"],
|
||
"errors": errors + [str(e)] + fb["errors"],
|
||
"nim_stats": fb["nim_stats"],
|
||
}
|
||
except Exception as e:
|
||
logger.warning("[Dispatcher][ADR-004] NIM 呼叫失敗,啟動 Hermes 規則引擎降級: %s", e)
|
||
fb = self._hermes_rule_fallback(nim_candidates, hermes_stats)
|
||
return {
|
||
"dispatched": dispatched + fb["dispatched"],
|
||
"skipped": skipped + fb["skipped"],
|
||
"errors": errors + [str(e)] + fb["errors"],
|
||
"nim_stats": fb["nim_stats"],
|
||
}
|
||
|
||
return self._execute_tool_calls(
|
||
tool_calls=tool_calls,
|
||
threats=threats,
|
||
hermes_stats=hermes_stats,
|
||
nim_stats=nim_stats,
|
||
pre_dispatched=dispatched,
|
||
pre_skipped=skipped,
|
||
pre_errors=errors,
|
||
)
|
||
|
||
# ──────────────────────────────────────────────
|
||
# tool_calls 執行區塊(NIM 與 qwen3 共用)
|
||
# ──────────────────────────────────────────────
|
||
def _execute_tool_calls(
|
||
self,
|
||
tool_calls: list,
|
||
threats: list,
|
||
hermes_stats: Optional[dict],
|
||
nim_stats: dict,
|
||
pre_dispatched: int = 0,
|
||
pre_skipped: int = 0,
|
||
pre_errors: Optional[list] = None,
|
||
) -> dict:
|
||
"""執行 LLM 回傳的 tool_calls 清單,注入 Python 獨裁的客觀數字 + 金額影響。
|
||
被 NIM 路徑與 qwen3 路徑共用,避免雙路雙維護。
|
||
"""
|
||
errors = list(pre_errors or [])
|
||
dispatched = pre_dispatched
|
||
|
||
footprint_text = _build_footprint_block(hermes_stats, nim_stats)
|
||
footprint_data = _build_footprint_json(hermes_stats, nim_stats)
|
||
|
||
threat_map = {t.sku: t for t in threats}
|
||
|
||
TOOL_MAP = {
|
||
"trigger_price_alert": self._exec_trigger_price_alert,
|
||
"add_to_recommendation": self._exec_add_to_recommendation,
|
||
"flag_for_human_review": self._exec_flag_for_human_review,
|
||
"route_to_km": self._exec_route_to_km,
|
||
"mark_for_relearn": self._exec_mark_for_relearn,
|
||
}
|
||
|
||
for tc in tool_calls:
|
||
tool_name = tc.get("tool")
|
||
args = dict(tc.get("args", {}) or {})
|
||
handler = TOOL_MAP.get(tool_name)
|
||
|
||
if not handler:
|
||
errors.append(f"未知工具: {tool_name}")
|
||
continue
|
||
|
||
args["footprint"] = footprint_text
|
||
|
||
t = threat_map.get(args.get("sku"))
|
||
if tool_name == "trigger_price_alert" and t and not _can_direct_price_alert(t):
|
||
match_meta = _threat_match_metadata(t)
|
||
tool_name = "flag_for_human_review"
|
||
handler = TOOL_MAP[tool_name]
|
||
args = {
|
||
"sku": getattr(t, "sku", args.get("sku")),
|
||
"name": getattr(t, "name", args.get("name")),
|
||
"concern": (
|
||
"比對證據尚未達直接價格告警門檻;"
|
||
f"match_type={match_meta['match_type']}、"
|
||
f"price_basis={match_meta['price_basis']}、"
|
||
f"alert_tier={match_meta['alert_tier']}。"
|
||
"請先覆核是否為同款、同包裝或需改用單位價。"
|
||
),
|
||
"confidence": max(float(getattr(t, "confidence", 0.5) or 0.5), 0.75),
|
||
"footprint": footprint_text,
|
||
}
|
||
if tool_name == "trigger_price_alert" and t:
|
||
args["momo_price"] = getattr(t, "momo_price", None)
|
||
args["comp_price"] = getattr(t, "pchome_price", None)
|
||
args["gap_pct"] = getattr(t, "gap_pct", None)
|
||
args["sales_delta"] = getattr(t, "sales_7d_delta_pct", None)
|
||
impact = _compute_business_impact(t)
|
||
args["revenue_loss_7d"] = impact["revenue_loss_7d"]
|
||
args["recommended_price"] = impact["recommended_price"]
|
||
args.update(_threat_match_metadata(t))
|
||
elif tool_name == "flag_for_human_review" and t:
|
||
args["momo_price"] = getattr(t, "momo_price", None)
|
||
args["comp_price"] = getattr(t, "pchome_price", None)
|
||
args["gap_pct"] = getattr(t, "gap_pct", None)
|
||
args["sales_delta"] = getattr(t, "sales_7d_delta_pct", None)
|
||
impact = _compute_business_impact(t)
|
||
args["revenue_loss_7d"] = impact["revenue_loss_7d"]
|
||
args["recommended_price"] = impact["recommended_price"]
|
||
args.update(_threat_match_metadata(t))
|
||
elif tool_name == "add_to_recommendation":
|
||
args["footprint_data"] = footprint_data
|
||
args["threat"] = t
|
||
elif tool_name == "route_to_km":
|
||
args["threat"] = t
|
||
|
||
try:
|
||
handler(**args)
|
||
dispatched += 1
|
||
except Exception as e:
|
||
errors.append(f"{tool_name}({args.get('sku', '?')}): {e}")
|
||
logger.error(f"[Dispatcher] 工具執行失敗 [{tool_name}]: {e}")
|
||
|
||
skipped = max(0, len(threats) - dispatched)
|
||
# nim_stats 在 qwen3 路徑下會帶 provider='gcp_ollama',log 出處可區辨
|
||
provider = nim_stats.get("provider", "nim") if isinstance(nim_stats, dict) else "nim"
|
||
logger.info(
|
||
f"[Dispatcher] 完成 provider={provider} "
|
||
f"dispatched={dispatched} skipped={skipped} "
|
||
f"errors={len(errors)} tokens={nim_stats.get('total_tokens', 0)}"
|
||
)
|
||
return {
|
||
"dispatched": dispatched,
|
||
"skipped": skipped,
|
||
"errors": errors,
|
||
"nim_stats": nim_stats,
|
||
}
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# CLI 測試(需設 NVIDIA_API_KEY env var)
|
||
# python3 services/nemoton_dispatcher_service.py
|
||
# ─────────────────────────────────────────────
|
||
if __name__ == "__main__":
|
||
import sys
|
||
from dataclasses import dataclass
|
||
|
||
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
|
||
|
||
@dataclass
|
||
class FakeThreat:
|
||
sku: str
|
||
name: str
|
||
category: str
|
||
momo_price: float
|
||
pchome_price: float
|
||
gap_pct: float
|
||
sales_7d_delta_pct: float
|
||
risk: str
|
||
recommended_action: str
|
||
confidence: float
|
||
sales_7d_curr_amount: float = 0.0
|
||
sales_7d_prev_amount: float = 0.0
|
||
|
||
fake_threats = [
|
||
FakeThreat("A003", "舒特膚AD乳液200ml", "美妝保養",
|
||
1200, 980, 22.4, -35.0, "HIGH",
|
||
"建議立即降價至 $1,000 迎戰,或發放 $200 專屬折價券", 0.85,
|
||
sales_7d_curr_amount=78000, sales_7d_prev_amount=120000),
|
||
FakeThreat("A001", "玻尿酸面膜10片裝", "美妝保養",
|
||
320, 280, 14.3, -42.0, "HIGH",
|
||
"建議跟進降價至 $285,配合限時加購活動", 0.78,
|
||
sales_7d_curr_amount=58000, sales_7d_prev_amount=100000),
|
||
FakeThreat("A009", "美白化妝水150ml", "美妝保養",
|
||
420, 350, 20.0, -22.0, "HIGH",
|
||
"價格差距過大,建議優先調降或捆包促銷", 0.45,
|
||
sales_7d_curr_amount=78000, sales_7d_prev_amount=100000),
|
||
]
|
||
|
||
# 模擬 Hermes 運算足跡
|
||
fake_hermes_stats = {"duration_sec": 34.2, "tokens": 512}
|
||
|
||
if not NIM_API_KEY:
|
||
print("⚠️ NVIDIA_API_KEY 未設定,測試訊息格式(不呼叫 NIM)")
|
||
print()
|
||
dispatcher = NemotronDispatcher()
|
||
fake_nim_stats = {"total_tokens": 185, "quota_used": 2}
|
||
footprint = _build_footprint_block(fake_hermes_stats, fake_nim_stats)
|
||
|
||
# 測試三種訊息格式
|
||
print("=== 類別一:緊急告警(含 B' 金額影響) ===")
|
||
print(NemotronDispatcher._fmt_price_alert(
|
||
"A003", "舒特膚AD乳液200ml", 1200, 980,
|
||
22.4, -35.0, "建議立即降價至 $1,000 迎戰,或發放 $200 專屬折價券",
|
||
0.85, footprint,
|
||
revenue_loss_7d=42000.0, # B' 軌驗證:120k - 78k = 42k 流失
|
||
recommended_price=980, # B' 軌驗證:跟進競品價
|
||
))
|
||
print()
|
||
print("=== 類別二:人工覆核 ===")
|
||
# [2026-04-18 台北] CLI 測試修正:_fmt_human_review 用 kwargs,避免位置錯位 — Claude Opus 4.7
|
||
print(NemotronDispatcher._fmt_human_review(
|
||
sku="A001", name="玻尿酸面膜10片裝",
|
||
concern=("銷量斷崖下跌 -100.0%,但價差僅 +2.1%(絕對值 < 5%),與定價無關。"
|
||
"疑似缺貨、下架、或前台異常,請營運人員立即走查。"),
|
||
footprint=footprint,
|
||
momo_price=285, comp_price=280, gap_pct=1.8, sales_delta=-100.0,
|
||
))
|
||
print()
|
||
print("=== 類別三:策略推薦 ===")
|
||
print(NemotronDispatcher._fmt_recommendation(
|
||
"A009", "美白化妝水150ml",
|
||
"我方價格低於市場 20%,近7天銷量回升,具備流量轉換潛力",
|
||
0.82, True, footprint,
|
||
))
|
||
print()
|
||
|
||
# ── 防線二 Bug-2 驗證:Python 絕對獨裁路由 ──
|
||
print("=== 防線二驗證:Python 絕對獨裁預路由 ===")
|
||
validation_threats = [
|
||
FakeThreat("A-EXTREME-1", "斷崖商品(缺貨)", "測試",
|
||
300, 298, 0.7, -100.0, "MED",
|
||
"疑似缺貨", 0.5),
|
||
FakeThreat("A-EXTREME-2", "斷崖商品(負價差)", "測試",
|
||
298, 310, -3.9, -85.0, "MED",
|
||
"疑似前台異常", 0.5),
|
||
# [2026-04-18 台北] 閘門 A 案例:sales=-100% gap 超過 5% 仍要攔(真實 sku 7440662)
|
||
FakeThreat("A-GATE-A", "雪芙蘭小蒼蘭滋養膏 60g", "美妝保養",
|
||
52, 49, 6.1, -100.0, "HIGH",
|
||
"建議跟進降價促銷", 0.95),
|
||
FakeThreat("A-NORMAL-1", "正常降價案例", "測試",
|
||
1200, 980, 22.4, -35.0, "HIGH",
|
||
"建議跟進降價", 0.85),
|
||
]
|
||
forced, nim_list = [], []
|
||
for t in validation_threats:
|
||
gate_a = t.sales_7d_delta_pct <= -95
|
||
gate_b = t.sales_7d_delta_pct <= -80 and abs(t.gap_pct) < 5
|
||
if gate_a or gate_b:
|
||
forced.append(t)
|
||
else:
|
||
nim_list.append(t)
|
||
print(f"強制人工覆核 {len(forced)} 筆(預期 3:閘門 A + 閘門 B × 2): "
|
||
f"{[t.sku for t in forced]}")
|
||
print(f"送入 NIM 決策 {len(nim_list)} 筆(預期 1): "
|
||
f"{[t.sku for t in nim_list]}")
|
||
assert len(forced) == 3, f"防線二分類錯誤:forced 應為 3,實為 {len(forced)}"
|
||
assert len(nim_list) == 1, f"防線二分類錯誤:nim 應為 1,實為 {len(nim_list)}"
|
||
print("✅ 防線二雙閘門分類邏輯正確")
|
||
sys.exit(0)
|
||
|
||
print("=== NemoTron Dispatcher CLI 測試(真實 NIM) ===\n")
|
||
dispatcher = NemotronDispatcher()
|
||
result = dispatcher.dispatch(fake_threats, hermes_stats=fake_hermes_stats)
|
||
print(
|
||
f"\n結果:dispatched={result['dispatched']} "
|
||
f"skipped={result['skipped']} errors={result['errors']}\n"
|
||
f"NIM: {result['nim_stats']}"
|
||
)
|