Files
ewoooc/services/nemoton_dispatcher_service.py
OoO 35ad29b04d
All checks were successful
CD Pipeline / deploy (push) Successful in 1m6s
V10.582 強化比價通知與身份證據
2026-06-04 14:03:21 +08:00

2165 lines
96 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
NemoTron 行動派發器 (Module 2 — Dispatcher)
角色:派發器 (Dispatcher)
模型:NVIDIA NIM meta/llama-3.1-8b-instruct(Tool Calling 專用)
輸入:Hermes 分析師輸出的 Top N 威脅清單 (list[PriceThreat]) + 運算足跡
輸出:呼叫工具 → Telegram 語意化告警 / DB 推薦商品寫入
訊息規範:語意化 Emoji 字典 + 倒金字塔結構 + 底部運算足跡區塊
詳見 docs/AI_INTELLIGENCE_MODULE_SOT.md § 五、六
工具清單(扁平化,避免 NIM JSON 截斷):
trigger_price_alert → Telegram 競價高危險預警
add_to_recommendation → 寫入前台推薦商品 + Telegram 通知
flag_for_human_review → Telegram 人工覆核請求
"""
import json
import logging
import os
import re
import uuid
from datetime import datetime
from typing import Optional
import requests
from services.mcp_context_service import build_mcp_context
from config import HERMES_URL # ADR-008 集中化:禁止硬編碼 IP
from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1
logger = logging.getLogger(__name__)
# [2026-04-18 Taipei] Bug-3.1 hardening: second curated pass of the
# simplified/variant-character blacklist — Claude Opus 4.7
# Rule: only characters that should never appear in Traditional Chinese text
# are listed. Characters shared by both scripts (e.g. 迎, 准, 理, 整, 始, 束,
# 史, 但, 原) are deliberately excluded so normal Traditional Chinese is not
# flagged.
# frozenset() over a str yields the set of its individual characters, which is
# what _sanitize_text tests each character of the input against.
_SIMPLIFIED_BLACKLIST = frozenset(
    # Top-frequency characters whose Traditional form is written differently
    '参给当为来国发会说时间过从实现个这话动问题'
    '经济学员务关对应听见设计电脑产东团专义价样种让'
    '议统战爱态头带业无该决积权导档号风险'
    # Common in business / finance text
    '选择证龙习惯亏损营运货币场较况负责调优势'
    # Writing / expression
    '认识书写结历观终则'
    # Variant characters
    # NOTE(review): this literal is empty as shown here, so no variant
    # character is actually blacklisted — confirm whether characters were
    # lost from this string.
    ''
)
def _has_repeated_phrase(text: str, min_len: int = 4, min_count: int = 2) -> bool:
"""偵測連續中文片段中「min_len 字子串」重複出現 ≥ min_count 次的語意坍塌。
範例:
「当前事亊当前事亊」→ min_len=4, 「当前事亊」重複 2 次 → True
「價差擴大且銷量下滑」→ 無 4 字重複 → False
"""
chinese = ''.join(c for c in text if '\u4e00' <= c <= '\u9fff')
if len(chinese) < min_len * min_count:
return False
for i in range(len(chinese) - min_len + 1):
phrase = chinese[i:i + min_len]
if chinese.count(phrase) >= min_count:
return True
return False
def _sanitize_text(text: str, fallback: str = "請人工確認", max_len: int = 200) -> str:
    """Screen LLM-produced text before it is shown to a human.

    The input is stripped and truncated to ``max_len`` characters, then run
    through three checks; the first one that fires logs a warning and returns
    ``fallback``:

    * L1 — a run of 15 or more CJK characters while the whole text contains
      none of the punctuation/whitespace characters in the L1 pattern.
    * L2 — any character that belongs to ``_SIMPLIFIED_BLACKLIST``.
    * L3 — ``_has_repeated_phrase`` reports a repeated phrase.

    Args:
        text: Candidate text; a non-string or empty value yields ``fallback``.
        fallback: Replacement returned when the text is rejected or empty.
        max_len: Maximum number of characters kept after stripping.

    Returns:
        The cleaned text, or ``fallback``.
    """
    if not isinstance(text, str) or not text:
        return fallback

    cleaned = text.strip()[:max_len]
    preview = cleaned[:40]

    # L1: long unbroken CJK run with no punctuation anywhere in the text.
    long_run = re.search(r'[\u4e00-\u9fff]{15,}', cleaned)
    if long_run and re.search(r'[,。、?!\s]', cleaned) is None:
        logger.warning(f"[Sanitize L1] 連續無標點中文幻覺fallback: {preview}")
        return fallback

    # L2: simplified / variant character contamination.
    polluted = {ch for ch in cleaned if ch in _SIMPLIFIED_BLACKLIST}
    if polluted:
        logger.warning(f"[Sanitize L2] 簡體/異體字污染 {''.join(sorted(polluted))}fallback: {preview}")
        return fallback

    # L3: repeated phrase.
    if _has_repeated_phrase(cleaned):
        logger.warning(f"[Sanitize L3] 短語重複偵測語意坍塌fallback: {preview}")
        return fallback

    return cleaned if cleaned else fallback
# ── NVIDIA NIM ──────────────────────────────────────
NIM_BASE_URL = "https://integrate.api.nvidia.com/v1"
NIM_MODEL = "meta/llama-3.1-8b-instruct"
NIM_API_KEY = os.getenv("NVIDIA_API_KEY", "")
NIM_TIMEOUT = 60  # seconds
# ── Daily quota guard ─────────────────────────────────────
NIM_DAILY_LIMIT = 80  # per the original note: leaves 20 of the 100/day free quota for AWOOOI
# In-process counter: "date" is the day (YYYY-MM-DD) the count belongs to.
_nim_call_count = {"date": "", "count": 0}
# ── Operation Ollama-First v5.0 / Phase 3 / A9 ──────────────────
# Gradual-rollout switch for GCP Ollama qwen3:14b.
#   - true  → qwen3 is the primary path, NIM is the backup, and the Hermes
#             rule engine is the last resort (ADR-004).
#   - false → emergency opt-out of Ollama-first; go back to NIM-first.
# Model choice follows the A2 web-research report
# docs/phase0_research_report_20260503.md: the originally planned
# deepseek-r1:14b lacked a matching Ollama tool_calls chat template
# (GitHub Issue #10935, unresolved), so qwen3:14b was chosen instead.
# Default is ON. To disable: export NEMOTRON_OLLAMA_FIRST=false
NEMOTRON_OLLAMA_FIRST = os.getenv("NEMOTRON_OLLAMA_FIRST", "true").lower() == "true"
NEMOTRON_OLLAMA_MODEL = os.getenv("NEMOTRON_OLLAMA_MODEL", "qwen3:14b")
NEMOTRON_OLLAMA_TIMEOUT = int(os.getenv("NEMOTRON_OLLAMA_TIMEOUT", "180"))  # seconds
def _check_nim_quota() -> bool:
    """Reserve one NIM call against today's in-process quota.

    The counter in ``_nim_call_count`` is reset whenever the local calendar
    day changes.

    Returns:
        True after incrementing the counter when today's count is below
        ``NIM_DAILY_LIMIT``; False (with a warning logged) once the limit has
        been reached.
    """
    current_day = datetime.now().strftime("%Y-%m-%d")
    if _nim_call_count["date"] != current_day:
        # First call of a new day: start counting from zero again.
        _nim_call_count.update(date=current_day, count=0)

    if _nim_call_count["count"] < NIM_DAILY_LIMIT:
        _nim_call_count["count"] += 1
        return True

    logger.warning(f"[NIM] 今日配額已達上限 {NIM_DAILY_LIMIT},跳過")
    return False
# ── Alert de-duplication cache ────────────────────────
import time
_ALERT_CACHE = {}  # {sku: timestamp of the last recorded alert, from time.time()}
_ALERT_TTL_SEC = 4 * 3600  # 4 hours: suppress repeated alerts for the same SKU within this window
def _is_duplicate_alert(sku: str) -> bool:
    """Tell whether *sku* was already recorded within the de-duplication TTL.

    When the SKU is not a duplicate, the current time is stored for it and
    every cache entry whose age has reached ``_ALERT_TTL_SEC`` is evicted so
    the cache does not grow without bound.

    Args:
        sku: Product SKU used as the cache key.

    Returns:
        True if an entry for *sku* younger than the TTL exists; False
        otherwise (after recording the SKU).
    """
    now = time.time()
    previous = _ALERT_CACHE.get(sku)
    if previous and now - previous < _ALERT_TTL_SEC:
        return True

    _ALERT_CACHE[sku] = now
    # Collect keys first: the dict must not change size while being iterated.
    stale_keys = [
        key for key, stamp in _ALERT_CACHE.items()
        if now - stamp >= _ALERT_TTL_SEC
    ]
    for key in stale_keys:
        _ALERT_CACHE.pop(key)
    return False
def _nim_quota_used() -> int:
    """Return how many NIM calls have been counted for the current local day.

    Read-only: unlike ``_check_nim_quota`` this never resets or increments the
    counter. A counter that belongs to a different day is reported as 0.
    """
    current_day = datetime.now().strftime("%Y-%m-%d")
    same_day = _nim_call_count["date"] == current_day
    return _nim_call_count["count"] if same_day else 0
# ── Tool definitions (flat schema) ────────────────────────
# Function-calling tool specs passed as the "tools" field of both the NIM and
# the Ollama chat requests in this module. Each entry is
# {"type": "function", "function": {name, description, parameters}}.
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "trigger_price_alert",
            "description": "發送競價高危險預警 Telegram 告警,當商品價格比競品貴且銷量明顯下滑時使用",
            "parameters": {
                "type": "object",
                # [2026-04-18 Taipei] Bug-1 defence line 1: momo_price / comp_price were removed —
                # the objective prices are injected by Python from threat_map and never handled
                # by the LLM, so a missing value can no longer default to 0 and show up as a
                # "$0" price in Telegram — Claude Opus 4.7
                "properties": {
                    "sku": {"type": "string", "description": "商品 SKU 編號"},
                    "name": {"type": "string", "description": "商品名稱"},
                    "gap_pct": {"type": "number", "description": "我方與競品的價差百分比(正值代表我貴)"},
                    "sales_delta": {"type": "number", "description": "近7天銷量變動百分比負值代表下滑"},
                    "action": {"type": "string", "description": "建議行動說明"},
                    "confidence": {"type": "number", "description": "AI 信心度 0.0~1.0"},
                },
                "required": ["sku", "name", "gap_pct", "sales_delta", "action", "confidence"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "add_to_recommendation",
            "description": "將價格具競爭力且庫存充足的商品加入前台推薦商品區塊",
            "parameters": {
                "type": "object",
                "properties": {
                    "sku": {"type": "string", "description": "商品 SKU 編號"},
                    "name": {"type": "string", "description": "商品名稱"},
                    "reason": {"type": "string", "description": "推薦原因(給後台審核人員)"},
                    "confidence": {"type": "number", "description": "AI 信心度 0.0~1.0"},
                },
                "required": ["sku", "name", "reason", "confidence"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "flag_for_human_review",
            "description": "當情況複雜、AI 信心不足,或需要人工決策時,發送 Telegram 請求人工覆核",
            "parameters": {
                "type": "object",
                "properties": {
                    "sku": {"type": "string", "description": "商品 SKU 編號"},
                    "name": {"type": "string", "description": "商品名稱"},
                    "concern": {"type": "string", "description": "需要人工判斷的疑慮說明(含矛盾數據描述)"},
                    "confidence": {"type": "number", "description": "AI 信心度 0.0~1.0"},
                },
                "required": ["sku", "name", "concern", "confidence"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "route_to_km",
            "description": (
                "將商品競價洞察路由到知識庫KM的指定領域分類"
                "供未來 RAG 查詢與 OpenClaw 週報引用。"
                "適用於:數據有參考價值但不需立即告警的情況。"
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "sku": {"type": "string", "description": "商品 SKU 編號"},
                    "name": {"type": "string", "description": "商品名稱"},
                    "km_domain": {
                        "type": "string",
                        "description": (
                            "KM 領域分類,必須為以下之一:"
                            "price_competition競價情報"
                            "sales_anomaly銷量異常"
                            "promotion_opportunity促銷機會"
                            "market_trend市場趨勢"
                        ),
                    },
                    "summary": {"type": "string", "description": "此洞察的核心摘要繁體中文50 字內)"},
                    "confidence": {"type": "number", "description": "AI 信心度 0.0~1.0"},
                },
                "required": ["sku", "name", "km_domain", "summary", "confidence"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "mark_for_relearn",
            "description": (
                "當新數據與 KM 既有洞察矛盾,或告警方向被推翻時,"
                "將該商品的歷史洞察標記為需重新學習relearn"
                "適用於:此次分析結果與上次截然不同的情況。"
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "sku": {"type": "string", "description": "商品 SKU 編號"},
                    "name": {"type": "string", "description": "商品名稱"},
                    "reason": {"type": "string", "description": "標記原因說明(繁體中文)"},
                },
                "required": ["sku", "name", "reason"],
            },
        },
    },
]
# ── 金額影響量化B' 軌:告警必須攜帶可決策的金額數字) ──
def _compute_business_impact(threat) -> dict:
"""從 PriceThreat 計算「過去 7 日營收流失」與「建議調價金額」。
回傳純 Python 客觀計算結果,由 dispatcher 強制注入告警 — LLM 不得碰觸這些數字。
revenue_loss_7d
= max(0, sales_7d_prev_amount - sales_7d_curr_amount) **僅在 gap_pct > 0 時**
語意:「我方比競品貴,且過去 7 日銷量金額下滑 → 推估價格因素導致的流失」
若 gap_pct ≤ 0我方已便宜或持平即使銷量下滑亦歸 0避免把
季節性/商品壽命終結等非價格因素誤標為「流失」誘導降價Critic Medium-3 fix
recommended_price
= round(pchome_price)
語意:「跟進競品的最低調價金額」;統帥可基於此再依毛利策略加溢價
gap_pct ≤ 0我方已便宜或持平→ recommended_price=None不需調價
"""
try:
gap_pct = float(getattr(threat, "gap_pct", 0) or 0)
except (TypeError, ValueError):
gap_pct = 0.0
revenue_loss_7d = 0.0
if gap_pct > 0:
try:
prev = float(getattr(threat, "sales_7d_prev_amount", 0) or 0)
curr = float(getattr(threat, "sales_7d_curr_amount", 0) or 0)
revenue_loss_7d = max(0.0, prev - curr)
except (TypeError, ValueError):
revenue_loss_7d = 0.0
recommended_price = None
try:
pchome = float(getattr(threat, "pchome_price", 0) or 0)
if pchome > 0 and gap_pct > 0:
recommended_price = round(pchome)
except (TypeError, ValueError):
pass
return {
"revenue_loss_7d": revenue_loss_7d,
"recommended_price": recommended_price,
}
# ── Semantic emoji dictionary ──────────────────────────────────
# Sender identity
HEADER_DISPATCHER = "⚡ NemoTron 派發器"
# Risk level
ICON_CRITICAL = "🚨"  # high risk / act immediately
ICON_WARNING = "⚠️"  # medium risk / human review
ICON_INSIGHT = "💡"  # low risk / strategy suggestion
ICON_REPORT = "📊"  # routine report
# Business attributes
ICON_PRICE = "💰"
ICON_SALES = "📦"
ICON_COMPETE = "🏆"
ICON_AI = "🧠"
ICON_FOOTPRINT = "⚙️"
# Display labels for the match_type codes carried by a threat.
MATCH_TYPE_LABELS = {
    "exact": "高信心同款",
    "same_product_different_pack": "同商品不同包裝",
    "same_line_variant": "同系列不同款",
    "comparable": "可比但需覆核",
    "no_match": "非同款",
}
# Display labels for the price_basis codes.
PRICE_BASIS_LABELS = {
    "total_price": "總價可比",
    "unit_price": "單位價可比",
    "manual_review": "人工覆核後可比",
    "none": "不可比",
}
# Display labels for the alert_tier codes.
ALERT_TIER_LABELS = {
    "price_alert_exact": "可直接價格告警",
    "unit_price_review": "單位價覆核",
    "identity_review": "身份覆核",
    "suppress": "不告警",
}
def _threat_match_metadata(threat) -> dict:
return {
"match_type": getattr(threat, "match_type", "exact") or "exact",
"price_basis": getattr(threat, "price_basis", "total_price") or "total_price",
"alert_tier": getattr(threat, "alert_tier", "price_alert_exact") or "price_alert_exact",
"match_score": float(getattr(threat, "match_score", 0) or 0),
"competitor_product_id": getattr(threat, "competitor_product_id", "") or "",
"competitor_product_name": getattr(threat, "competitor_product_name", "") or "",
}
def _can_direct_price_alert(threat) -> bool:
    """Return True only when the match evidence allows a direct price alert.

    All three classification fields from ``_threat_match_metadata`` must carry
    their strictest value: ``match_type == "exact"``,
    ``price_basis == "total_price"`` and ``alert_tier == "price_alert_exact"``.
    """
    evidence = _threat_match_metadata(threat)
    required = (
        ("match_type", "exact"),
        ("price_basis", "total_price"),
        ("alert_tier", "price_alert_exact"),
    )
    return all(evidence[field] == expected for field, expected in required)
def _format_match_evidence_block(
    *,
    match_type: str = "",
    price_basis: str = "",
    alert_tier: str = "",
    match_score: float = 0.0,
    competitor_product_id: str = "",
    competitor_product_name: str = "",
) -> str:
    """Render the match-evidence section of a notification message.

    Empty classification fields default to ``"exact"`` / ``"total_price"`` /
    ``"price_alert_exact"``. Each code is shown through its label table and
    falls back to the raw code when no label exists. The score, competitor id
    and competitor name lines are emitted only when the value is truthy; the
    name is cut to 68 characters.

    Returns:
        The lines joined by newlines, followed by one blank line.
    """
    kind = match_type or "exact"
    basis = price_basis or "total_price"
    tier = alert_tier or "price_alert_exact"

    rows = [
        f"{ICON_COMPETE} 比對證據:",
        f"• 身份分級:{MATCH_TYPE_LABELS.get(kind, kind)}",
        f"• 比價基準:{PRICE_BASIS_LABELS.get(basis, basis)}",
        f"• 告警路徑:{ALERT_TIER_LABELS.get(tier, tier)}",
    ]
    # Optional rows are formatted lazily so a falsy value is never formatted.
    if match_score:
        rows.append(f"• Match score{match_score:.3f}")
    if competitor_product_id:
        rows.append(f"• PChome ID{competitor_product_id}")
    if competitor_product_name:
        rows.append(f"• PChome 品名:{str(competitor_product_name)[:68]}")

    return "".join(f"{row}\n" for row in rows) + "\n"
def _safe_float(value, default: float = 0.0) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default
def _price_decision_data_quality(
    momo_price,
    comp_price,
    match_score: float,
    match_type: str,
    price_basis: str,
    alert_tier: str,
) -> str:
    """Grade how complete the evidence behind a price decision is.

    Four signals are checked: our price is positive, the competitor price is
    positive, all three identity fields are non-empty, and the match score is
    positive.

    Returns:
        ``"complete"`` when all four hold; ``"partial"`` when at least one of
        the first three holds (the score alone does not count); otherwise
        ``"missing"``.
    """
    has_momo = _safe_float(momo_price) > 0
    has_comp = _safe_float(comp_price) > 0
    has_identity = bool(match_type and price_basis and alert_tier)
    has_score = _safe_float(match_score) > 0

    if all((has_momo, has_comp, has_identity, has_score)):
        return "complete"
    return "partial" if any((has_momo, has_comp, has_identity)) else "missing"
def _price_decision_severity(
    *,
    decision_type: str,
    gap_pct,
    revenue_loss_7d: float,
    alert_tier: str,
) -> str:
    """Map a decision's price gap and revenue loss to a P1/P2/P3 severity.

    The absolute value of ``gap_pct`` is used. For a direct price alert
    (``decision_type == "price_alert"`` with
    ``alert_tier == "price_alert_exact"``) the result is ``"P1"`` when the gap
    is at least 15 or the loss at least 50000, else ``"P2"``. For every other
    decision it is ``"P2"`` when the loss is at least 50000 or the gap at
    least 20, else ``"P3"``.
    """
    gap_size = abs(_safe_float(gap_pct))
    loss_amount = _safe_float(revenue_loss_7d)
    heavy_loss = loss_amount >= 50000

    is_direct_alert = (
        decision_type == "price_alert" and alert_tier == "price_alert_exact"
    )
    if is_direct_alert:
        return "P1" if (gap_size >= 15 or heavy_loss) else "P2"
    return "P2" if (heavy_loss or gap_size >= 20) else "P3"
def _build_price_decision_envelope(
    *,
    decision_type: str,
    sku: str,
    name: str,
    gap_pct,
    sales_delta,
    confidence: float,
    analysis: str,
    momo_price=None,
    comp_price=None,
    revenue_loss_7d: float = 0.0,
    recommended_price: Optional[float] = None,
    match_type: str = "",
    price_basis: str = "",
    alert_tier: str = "",
    match_score: float = 0.0,
    competitor_product_id: str = "",
    competitor_product_name: str = "",
) -> dict:
    """Build the price-decision envelope dict for one product.

    The envelope only describes evidence and a suggested action; it performs
    no price or match changes itself, and ``guardrails.can_auto_execute`` is
    always False.

    Normalisation applied here:
      * empty ``match_type`` / ``price_basis`` / ``alert_tier`` become
        ``"unknown"`` / ``"manual_review"`` / ``"identity_review"``;
      * numeric inputs go through ``_safe_float``; ``confidence`` is clamped
        to the range 0..1;
      * non-positive prices are reported as None, and ``gap_amount`` is set
        only when both prices are positive;
      * ``analysis`` passes through ``_sanitize_text`` (max 300 characters);
      * the competitor product name is cut to 120 characters.

    Returns:
        Dict with the keys ``decision_id``, ``source_agent``,
        ``decision_type``, ``severity``, ``subject``, ``evidence``,
        ``analysis``, ``recommended_action``, ``expected_impact``,
        ``confidence``, ``guardrails`` and ``trace``.
    """
    # Missing identity fields fall back to the most cautious labels.
    match_type = match_type or "unknown"
    price_basis = price_basis or "manual_review"
    alert_tier = alert_tier or "identity_review"

    score = _safe_float(match_score)
    clamped_confidence = max(0.0, min(1.0, _safe_float(confidence, 0.0)))
    gap = _safe_float(gap_pct)
    sales = _safe_float(sales_delta)
    momo = _safe_float(momo_price)
    comp = _safe_float(comp_price)
    loss = _safe_float(revenue_loss_7d)

    momo_or_none = momo if momo > 0 else None
    comp_or_none = comp if comp > 0 else None
    gap_amount = round(momo - comp, 2) if (momo > 0 and comp > 0) else None

    data_quality = _price_decision_data_quality(
        momo_price=momo_price,
        comp_price=comp_price,
        match_score=score,
        match_type=match_type,
        price_basis=price_basis,
        alert_tier=alert_tier,
    )
    severity = _price_decision_severity(
        decision_type=decision_type,
        gap_pct=gap,
        revenue_loss_7d=loss,
        alert_tier=alert_tier,
    )

    match_evidence = {
        "type": "match",
        "metric": "match_score",
        "value": round(score, 3),
        "basis": f"{match_type}/{price_basis}/{alert_tier}",
        "confidence": round(score, 3) if score else None,
    }
    price_evidence = {
        "type": "price",
        "metric": "gap_pct",
        "value": f"{gap:+.1f}%",
        "basis": "MOMO latest price + PChome competitor_prices",
    }
    sales_evidence = {
        "type": "sales",
        "metric": "sales_7d_delta_pct",
        "value": f"{sales:+.1f}%",
        "basis": "daily_sales_snapshot 7d vs previous 7d",
    }
    evidence = [match_evidence, price_evidence, sales_evidence]
    if loss > 0:
        # The impact entry is included only when there is a loss to report.
        evidence.append({
            "type": "impact",
            "metric": "revenue_loss_7d",
            "value": round(loss, 2),
            "basis": "sales_7d_prev_amount - sales_7d_curr_amount",
        })

    if decision_type == "price_alert":
        action = "price_follow_review"
        blocked_reason = "價格調整需人工覆核;不得自動寫入或覆蓋正式競品價格"
    else:
        action = "identity_or_price_review"
        blocked_reason = "身份、包裝、單位價或前台狀態需人工確認"

    risk_reduction = {"P1": "high", "P2": "medium"}.get(severity, "watch")

    subject = {
        "sku": sku,
        "name": name,
        "event_type": "price_competition",
        "momo_price": momo_or_none,
        "competitor_price": comp_or_none,
        "competitor_product_id": competitor_product_id,
        "competitor_product_name": str(competitor_product_name or "")[:120],
    }
    expected_impact = {
        "momo_price": momo_or_none,
        "competitor_price": comp_or_none,
        "candidate_gap_pct": round(gap, 1),
        "sales_7d_delta_pct": round(sales, 1),
        "revenue_loss_7d": round(loss, 2),
        "gap_amount": gap_amount,
        "recommended_price": recommended_price,
        "risk_reduction": risk_reduction,
    }
    guardrails = {
        "can_auto_execute": False,
        "blocked_reason": blocked_reason,
        "data_quality": data_quality,
        "match_type": match_type,
        "price_basis": price_basis,
        "alert_tier": alert_tier,
    }
    trace = {
        "model": NEMOTRON_OLLAMA_MODEL if NEMOTRON_OLLAMA_FIRST else NIM_MODEL,
        "provider": "nemotron_dispatcher",
    }

    return {
        "decision_id": f"nemotron:{decision_type}:{sku}:{uuid.uuid4().hex[:8]}",
        "source_agent": "nemotron",
        "decision_type": decision_type,
        "severity": severity,
        "subject": subject,
        "evidence": evidence,
        "analysis": _sanitize_text(analysis, fallback="請人工確認", max_len=300),
        "recommended_action": {
            "action": action,
            "owner": "營運",
            "requires_hitl": True,
        },
        "expected_impact": expected_impact,
        "confidence": round(clamped_confidence, 3),
        "guardrails": guardrails,
        "trace": trace,
    }
# ── tool_calls 解析NIM 與 qwen3 共用)──────────────────────────
def _parse_tool_calls_struct(tool_calls: list) -> list:
"""從 OpenAI 格式的 tool_calls 結構陣列抽出 [{tool, args}] 清單。
NIM 與 qwen3 (Ollama /api/chat) 兩邊回應對齊 OpenAI schema
[{"function": {"name": ..., "arguments": <json-str-or-dict>}, ...}]
arguments 在 NIM 是 JSON 字串、在 Ollama 通常已是 dict本 helper 兼容兩者。
"""
results = []
for tc in tool_calls or []:
fn = tc.get("function", {}) if isinstance(tc, dict) else {}
if not fn:
continue
raw_args = fn.get("arguments", {})
if isinstance(raw_args, str):
try:
args = json.loads(raw_args) if raw_args.strip() else {}
except json.JSONDecodeError:
args = {}
elif isinstance(raw_args, dict):
args = raw_args
else:
args = {}
name = fn.get("name")
if name:
results.append({"tool": name, "args": args})
return results
def _parse_content_fallback(raw_content: str) -> list:
    """Recover tool calls that a model wrote into ``content`` as JSON text.

    Used when a chat response carries no structured ``tool_calls``. The
    content must be valid JSON holding either a list of call objects or a
    single call object (treated as a one-element list). For each object:

      * the name is read from ``name``, or from ``function.name`` when
        ``function`` is a dict;
      * the arguments are read from ``parameters`` / ``arguments`` at the top
        level, or from the same keys inside ``function``;
      * string arguments are JSON-decoded; anything that does not end up as a
        dict becomes ``{}``;
      * objects without a name, and non-dict items, are skipped.

    Args:
        raw_content: The message ``content`` string.

    Returns:
        List of ``{"tool": str, "args": dict}``; empty when nothing usable is
        found or the content is not valid JSON (the failure is logged).
    """
    if not raw_content or not isinstance(raw_content, str):
        return []
    try:
        parsed = json.loads(raw_content.strip())
    except (ValueError, RecursionError) as parse_err:
        logger.error(f"[ToolCalls] content fallback JSON 解析失敗:{parse_err}")
        return []

    if isinstance(parsed, dict):
        # A lone call object instead of a list of calls.
        parsed = [parsed]
    if not isinstance(parsed, list):
        return []

    results = []
    for item in parsed:
        if not isinstance(item, dict):
            continue
        fn = item.get("function")
        if not isinstance(fn, dict):
            # Guard: a string/None here must not crash the .get() calls below.
            fn = {}

        name = item.get("name") or fn.get("name")
        if not name:
            continue

        args = (
            item.get("parameters")
            or item.get("arguments")
            or fn.get("parameters")
            or fn.get("arguments")
            or {}
        )
        if isinstance(args, str):
            try:
                args = json.loads(args)
            except json.JSONDecodeError:
                args = {}
        if not isinstance(args, dict):
            args = {}

        results.append({"tool": name, "args": args})

    if results:
        logger.info(f"[ToolCalls] content fallback 解析成功,取得 {len(results)} 個 tool_calls")
    return results
def _build_footprint_json(hermes_stats: Optional[dict], nim_stats: Optional[dict]) -> dict:
    """Build the structured compute footprint.

    Per the original note this is intended for the DB ``model_footprint``
    JSONB column.

    Args:
        hermes_stats: Analyst-side stats; ``host``, ``host_label``,
            ``duration_sec`` and ``tokens`` are read when present.
        nim_stats: Dispatcher-side stats; ``total_tokens`` and ``quota_used``
            are read when present.

    Returns:
        Dict with an ``"analyst"`` section when *hermes_stats* is truthy and a
        ``"dispatcher"`` section when *nim_stats* is truthy; empty if neither.
    """
    footprint = {}

    if hermes_stats:
        analyst_stat = hermes_stats.get
        footprint["analyst"] = dict(
            model="qwen2.5:7b-instruct",
            host=analyst_stat("host", HERMES_URL),
            host_label=analyst_stat("host_label", "未知"),
            duration_sec=analyst_stat("duration_sec", 0),
            tokens=analyst_stat("tokens", 0),
            cost_usd=0,
        )

    if nim_stats:
        dispatcher_stat = nim_stats.get
        footprint["dispatcher"] = dict(
            model=NIM_MODEL,
            platform="NVIDIA NIM",
            total_tokens=dispatcher_stat("total_tokens", 0),
            quota_used=dispatcher_stat("quota_used", 0),
            cost_usd=0,
        )

    return footprint
def _build_footprint_block(hermes_stats: Optional[dict], nim_stats: Optional[dict]) -> str:
    """Render the compute-footprint footer appended to notification text.

    Args:
        hermes_stats: Analyst-side stats; ``duration_sec``, ``tokens`` and
            ``host_label`` are read when present.
        nim_stats: Dispatcher-side stats; ``total_tokens`` and ``quota_used``
            are read when present.

    Returns:
        A separator line, a heading, one analysis line and one decision line,
        joined by newlines. A generic line is used for whichever stats dict is
        missing or empty.
    """
    if hermes_stats:
        seconds = hermes_stats.get("duration_sec", 0)
        analyst_tokens = hermes_stats.get("tokens", "?")
        host_label = hermes_stats.get("host_label", "本地 188")
        analysis_line = (
            f"• 🔍 分析: Qwen2.5 7B ({host_label}) | "
            f"耗時: {seconds:.1f}s | Tokens: {analyst_tokens} | $0 成本"
        )
    else:
        analysis_line = "• 🔍 分析: Qwen2.5 7B (未知主機) | $0 成本"

    if nim_stats:
        used_tokens = nim_stats.get("total_tokens", "?")
        used_quota = nim_stats.get("quota_used", "?")
        decision_line = (
            f"• ⚡ 決策: NemoTron NIM | "
            f"{used_tokens} Tokens | $0 (配額內 {used_quota}/{NIM_DAILY_LIMIT})"
        )
    else:
        decision_line = "• ⚡ 決策: NemoTron NIM | $0 (配額內)"

    return "\n".join((
        "─────────────────────",
        f"{ICON_FOOTPRINT} 運算足跡:",
        analysis_line,
        decision_line,
    ))
class NemotronDispatcher:
"""
NemoTron 行動派發器
接收 Hermes 輸出 → NIM tool calling 決策 → 語意化 Telegram 告警 / DB 寫入
"""
def __init__(self, notification_manager=None, engine=None):
"""
Args:
notification_manager: NotificationManager 實例(負責 Telegram 推播)
engine: SQLAlchemy engine用於寫入推薦商品 DB
"""
self.nm = notification_manager
self.engine = engine
# ──────────────────────────────────────────────
# NIM Tool Calling
# ──────────────────────────────────────────────
def _call_nim(self, threats: list) -> tuple:
    """
    Send the Hermes threat list to NVIDIA NIM and collect its tool-call decisions.

    The threats are serialised to JSON (including the match metadata from
    _threat_match_metadata), wrapped in a system + user prompt, and posted to
    the chat-completions endpoint with TOOLS and tool_choice="required".

    Returns:
        (list of {"tool": str, "args": dict}, dict nim_stats)
        nim_stats: {"total_tokens": int, "quota_used": int}

    Raises:
        RuntimeError: NVIDIA_API_KEY is not configured.
        requests.HTTPError / requests.Timeout: re-raised immediately on HTTP
            429, or after the third failed attempt otherwise.

    NOTE(review): this method reads _nim_quota_used() but does not call
    _check_nim_quota(); presumably the caller reserves quota — verify.
    NOTE(review): the indentation of this block was reconstructed; confirm
    which statements belong inside the `with log_ai_call(...)` block.
    """
    if not NIM_API_KEY:
        logger.warning("[NemotronDispatcher] NVIDIA_API_KEY 未設定,跳過 NIM 呼叫")
        raise RuntimeError("NVIDIA_API_KEY not configured")
    threat_summary = json.dumps(
        [
            {
                "sku": t.sku,
                "name": t.name,
                "momo_price": t.momo_price,
                "pchome_price": t.pchome_price,
                "gap_pct": t.gap_pct,
                "sales_delta": t.sales_7d_delta_pct,
                "risk": t.risk,
                "action": t.recommended_action,
                "confidence": t.confidence,
                **_threat_match_metadata(t),
            }
            for t in threats
        ],
        ensure_ascii=False,
    )
    # Inject the MCP market context into the system prompt.
    mcp_ctx = build_mcp_context()
    # NOTE(review): routing rule 3 below tells the model to fill in
    # momo_price / comp_price, but the trigger_price_alert schema in TOOLS no
    # longer declares those fields — the prompt text looks stale; confirm.
    messages = [
        {
            "role": "system",
            "content": (
                "你是台灣電商競價情報的行動派發器。"
                f"當前市場背景 (MCP)\n{mcp_ctx}\n\n"
                "根據 Hermes 分析師提供的威脅清單,決定對每支商品呼叫哪個工具。\n"
                "路由鐵律(依序判斷,命中即停):\n"
                "1. match_type 不是 exact或 price_basis 不是 total_price或 alert_tier 不是 price_alert_exact "
                "→ 不可直接價格告警,呼叫 flag_for_human_reviewconcern 說明需覆核身份、包裝或單位價。\n"
                "2. gap_pct < 5% 且 sales_delta < -30% → 非價格異常,呼叫 flag_for_human_review"
                "concern 說明『價差接近 0 但銷量大幅下滑,疑似缺貨/下架/平台流量異常,請人工走查前台』。\n"
                "3. gap_pct ≥ 5% 且 risk=HIGH → trigger_price_alert填入 momo_price, comp_price\n"
                "4. 我方價格低於競品且銷量正成長 → add_to_recommendation。\n"
                "5. confidence < 0.6 或其他複雜情況 → flag_for_human_review。\n"
                "每支商品只呼叫一個工具。\n"
                "【語言鐵律 — 台灣標準正體中文(繁體)】所有文字欄位必須遵守:\n"
                " 1. 嚴禁簡體字(例:不可用「参给当为来国发会说时间过从实现这话动问题」,"
                " 必須用「參給當為來國發會說時間過從實現這話動問題」)\n"
                " 2. 嚴禁異體字(例:不可用「亊」,必須用「事」)\n"
                " 3. 嚴禁短語重複(例:不可輸出「當前事當前事」這種坍塌)\n"
                " 4. 嚴禁無意義字元組合或亂碼\n"
                "若無法產出合理的繁體中文說明,直接輸出「請人工評估議價空間」。"
            ),
        },
        {
            "role": "user",
            "content": f"請處理以下 {len(threats)} 筆威脅清單:\n{threat_summary}",
        },
    ]
    # P1-4 fix: retry the NIM API with exponential backoff (at most 3 attempts).
    # Phase 1 v5.0: wrapped in ai_call_logger to track NIM quota / tokens / errors.
    import time as _time
    last_err = None
    with log_ai_call(
        caller='nemotron_dispatch',
        provider='nim',
        model=NIM_MODEL,
        meta={'threat_count': len(threats), 'quota_used': _nim_quota_used()},
    ) as _ctx:
        for _attempt in range(3):
            try:
                resp = requests.post(
                    f"{NIM_BASE_URL}/chat/completions",
                    headers={
                        "Authorization": f"Bearer {NIM_API_KEY}",
                        "Content-Type": "application/json",
                    },
                    json={
                        "model": NIM_MODEL,
                        "messages": messages,
                        "tools": TOOLS,
                        "tool_choice": "required",
                        "max_tokens": 2048,
                    },
                    timeout=NIM_TIMEOUT,
                )
                resp.raise_for_status()
                break
            except (requests.Timeout, requests.HTTPError) as e:
                last_err = e
                # ADR-004: never retry a 429; re-raise at once so the caller
                # can switch to the Hermes rule-engine fallback.
                if isinstance(e, requests.HTTPError) and e.response is not None \
                        and e.response.status_code == 429:
                    logger.warning("[NIM] HTTP 429 速率限制,跳出 retry 迴圈")
                    _ctx.set_error(f"NIM 429 rate-limited")
                    _ctx.fallback_to_caller('hermes_rule_engine')
                    raise
                if _attempt < 2:
                    # Backoff of 1s, then 2s, before the next attempt.
                    _time.sleep(2 ** _attempt)
                    logger.warning(f"[NIM] retry {_attempt + 1}/2 after {e}")
                else:
                    raise last_err
        body = resp.json()
        usage = body.get("usage", {})
        # Record token usage on the ai_call_logger context.
        _ctx.set_tokens(
            input=usage.get("prompt_tokens", 0),
            output=usage.get("completion_tokens", 0),
        )
    nim_stats = {
        "total_tokens": usage.get("total_tokens", 0),
        "quota_used": _nim_quota_used(),
    }
    choices = body.get("choices", [])
    message = choices[0].get("message", {}) if choices else {}
    tool_calls = message.get("tool_calls", []) or []
    # Shared structured parser (same helper as the qwen3 path).
    results = _parse_tool_calls_struct(tool_calls)
    if not results:
        # llama-3.1-8b-instruct sometimes writes the tool call into content
        # instead of the tool_calls structure.
        raw_content = message.get("content", "") or ""
        logger.warning(f"[NIM] 0 tool_calls嘗試從 content 解析:{raw_content[:120]}")
        results = _parse_content_fallback(raw_content)
    logger.info(f"[NIM] 收到 {len(results)} 個 tool_calls | tokens={nim_stats['total_tokens']}")
    return results, nim_stats
# ──────────────────────────────────────────────
# GCP Ollama qwen3:14b Tool CallingOperation Ollama-First v5.0 / Phase 3
# ──────────────────────────────────────────────
def _call_qwen3_dispatch(self, threats: list) -> tuple:
    """
    Send the Hermes threat list to an Ollama host running NEMOTRON_OLLAMA_MODEL
    (default qwen3:14b) and collect its tool-call decisions.

    Why qwen3:14b (per the original note, from
    docs/phase0_research_report_20260503.md):
      - tools capability confirmed by both the Ollama registry page and
        qwenlm.github.io
      - thinking mode can be turned off
      - same size class as deepseek-r1:14b
      - uses the same tools schema as the NIM path

    Up to three attempts are made; each attempt asks resolve_ollama_host() for
    a host, and the loop stops early if that host was already tried. A host
    that fails is passed to mark_unhealthy().

    Returns:
        (list of {"tool": str, "args": dict}, dict ollama_stats)
        ollama_stats: {"total_tokens": int, "host": str, "host_label": ...,
                       "provider": ..., "model": str}

    Raises:
        RuntimeError: no host produced a response.

    NOTE(review): the indentation of this block was reconstructed; confirm
    which statements belong inside the `with log_ai_call(...)` block.
    """
    from services.ollama_service import (
        get_host_label,
        get_provider_tag,
        mark_unhealthy,
        resolve_ollama_host,
    )
    threat_summary = json.dumps(
        [
            {
                "sku": t.sku,
                "name": t.name,
                "momo_price": t.momo_price,
                "pchome_price": t.pchome_price,
                "gap_pct": t.gap_pct,
                "sales_delta": t.sales_7d_delta_pct,
                "risk": t.risk,
                "action": t.recommended_action,
                "confidence": t.confidence,
                **_threat_match_metadata(t),
            }
            for t in threats
        ],
        ensure_ascii=False,
    )
    # Inject the MCP market context (same as the NIM path).
    mcp_ctx = build_mcp_context()
    # The routing rules mirror the NIM system prompt.
    # NOTE(review): the language-rules section here is a shortened version of
    # the one in _call_nim, so the two prompts are not byte-identical.
    system_prompt = (
        "你是台灣電商競價情報的行動派發器。"
        f"當前市場背景 (MCP)\n{mcp_ctx}\n\n"
        "根據 Hermes 分析師提供的威脅清單,決定對每支商品呼叫哪個工具。\n"
        "路由鐵律(依序判斷,命中即停):\n"
        "1. match_type 不是 exact或 price_basis 不是 total_price或 alert_tier 不是 price_alert_exact "
        "→ 不可直接價格告警,呼叫 flag_for_human_reviewconcern 說明需覆核身份、包裝或單位價。\n"
        "2. gap_pct < 5% 且 sales_delta < -30% → 非價格異常,呼叫 flag_for_human_review"
        "concern 說明『價差接近 0 但銷量大幅下滑,疑似缺貨/下架/平台流量異常,請人工走查前台』。\n"
        "3. gap_pct ≥ 5% 且 risk=HIGH → trigger_price_alert填入 momo_price, comp_price\n"
        "4. 我方價格低於競品且銷量正成長 → add_to_recommendation。\n"
        "5. confidence < 0.6 或其他複雜情況 → flag_for_human_review。\n"
        "每支商品只呼叫一個工具。\n"
        "【語言鐵律 — 台灣標準正體中文(繁體)】所有文字欄位必須遵守:\n"
        " 1. 嚴禁簡體字、嚴禁異體字(例:不可用「亊」,必須用「事」)\n"
        " 2. 嚴禁短語重複(語意坍塌)、嚴禁無意義字元組合\n"
        "若無法產出合理的繁體中文說明,直接輸出「請人工評估議價空間」。"
    )
    payload = {
        "model": NEMOTRON_OLLAMA_MODEL,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"請處理以下 {len(threats)} 筆威脅清單:\n{threat_summary}"},
        ],
        "tools": TOOLS,  # reuse the existing NIM tools schema
        "stream": False,
        "options": {
            "temperature": 0.2,
            "num_predict": 2048,
        },
    }
    with log_ai_call(
        caller='nemotron_dispatch',
        provider='gcp_ollama',
        model=NEMOTRON_OLLAMA_MODEL,
        request_id=f"nem-{int(time.time())}",
        meta={
            'flag': 'NEMOTRON_OLLAMA_FIRST',
            'threats_count': len(threats),
        },
    ) as ctx:
        attempted_hosts = []
        body = None
        host = None
        last_error = None
        for _attempt in range(3):
            host = resolve_ollama_host().rstrip("/")
            if host in attempted_hosts:
                # The resolver returned a host that already failed: stop.
                break
            attempted_hosts.append(host)
            try:
                resp = requests.post(
                    f"{host}/api/chat",
                    json=payload,
                    timeout=NEMOTRON_OLLAMA_TIMEOUT,
                )
                resp.raise_for_status()
                body = resp.json()
                ctx.set_provider(get_provider_tag(host))
                ctx.add_meta('host', host)
                ctx.add_meta('host_label', get_host_label(host))
                ctx.add_meta('attempted_hosts', attempted_hosts)
                break
            except Exception as e:
                last_error = e
                # Connection/HTTP failure → mark the host unhealthy so the
                # next resolve_ollama_host() call can pick another one.
                mark_unhealthy(host)
                logger.warning(
                    "[Dispatcher][qwen3] host=%s 呼叫失敗,嘗試下一台: %s",
                    host, e,
                )
        if body is None:
            ctx.set_error(
                f"qwen3 call failed after {len(attempted_hosts)} host(s): "
                f"{type(last_error).__name__}: {last_error}"
            )
            ctx.fallback_to_caller('nim')
            raise RuntimeError(last_error or "qwen3 all hosts failed")
        ctx.set_tokens(
            input=body.get('prompt_eval_count', 0),
            output=body.get('eval_count', 0),
        )
        msg = body.get('message', {}) if isinstance(body, dict) else {}
        tool_calls = msg.get('tool_calls', []) or []
        # Shared structured parser (same helper as the NIM path).
        results = _parse_tool_calls_struct(tool_calls)
        if not results:
            # No structured tool_calls → try parsing them out of content.
            raw_content = msg.get('content', '') or ''
            logger.warning(
                f"[Dispatcher][qwen3] 0 tool_calls嘗試從 content 解析:{raw_content[:120]}"
            )
            results = _parse_content_fallback(raw_content)
        ollama_stats = {
            "total_tokens": (body.get('prompt_eval_count', 0) or 0)
            + (body.get('eval_count', 0) or 0),
            "host": host,
            "host_label": get_host_label(host),
            "provider": get_provider_tag(host),
            "model": NEMOTRON_OLLAMA_MODEL,
        }
        logger.info(
            f"[Dispatcher][qwen3] 收到 {len(results)} 個 tool_calls | "
            f"tokens={ollama_stats['total_tokens']} host={host}"
        )
        return results, ollama_stats
# ──────────────────────────────────────────────
# ADR-004Hermes 規則引擎降級路由
# ──────────────────────────────────────────────
def _hermes_rule_fallback(self, threats: list, hermes_stats: Optional[dict] = None) -> dict:
    """
    ADR-004 degraded mode: route the Hermes threat list with deterministic
    rules instead of an LLM (per the original note, used when NIM returns
    HTTP 429). Every message text produced here carries a 🟡 prefix.

    For each threat, evaluated in order (first match wins):
      0. _can_direct_price_alert(t) is False → flag_for_human_review asking
         for an identity / packaging / unit-price check (confidence is raised
         to at least 0.75)
      1. gap_pct < 5 and sales_delta < -30 → flag_for_human_review
         (suspected stock-out / delisting / traffic anomaly)
      2. gap_pct >= 5 and risk == "HIGH" → trigger_price_alert
      3. gap_pct < 0 and sales_delta > 0 → add_to_recommendation
      4. anything else → flag_for_human_review

    An exception while handling one threat is logged and recorded in
    ``errors``; processing continues with the next threat.

    Returns:
        {"dispatched": int, "skipped": 0, "errors": list[str],
         "nim_stats": {"degraded": True}}
    """
    degraded_note = "🟡 [降級模式 ADR-004] NIM 配額耗盡,改用 Hermes 規則引擎決策"
    footprint = degraded_note + "\n" + _build_footprint_block(hermes_stats, None)
    dispatched, errors = 0, []
    for t in threats:
        try:
            # Track B': compute the monetary impact once per threat so every
            # branch below injects the same figures.
            impact = _compute_business_impact(t)
            rl, rp = impact["revenue_loss_7d"], impact["recommended_price"]
            match_meta = _threat_match_metadata(t)
            if not _can_direct_price_alert(t):
                # Step 0: match evidence is not strong enough for a direct price alert.
                self._exec_flag_for_human_review(
                    sku=t.sku,
                    name=t.name,
                    concern=(
                        "🟡 [規則引擎] 比對證據尚未達直接價格告警門檻;"
                        f"match_type={match_meta['match_type']}"
                        f"price_basis={match_meta['price_basis']}"
                        f"alert_tier={match_meta['alert_tier']}"
                        "請先覆核是否為同款、同包裝或需改用單位價。"
                    ),
                    confidence=max(float(getattr(t, "confidence", 0.5) or 0.5), 0.75),
                    footprint=footprint,
                    momo_price=t.momo_price,
                    comp_price=t.pchome_price,
                    gap_pct=t.gap_pct,
                    sales_delta=t.sales_7d_delta_pct,
                    revenue_loss_7d=rl,
                    recommended_price=rp,
                    **match_meta,
                )
                dispatched += 1
                continue
            if t.gap_pct < 5 and t.sales_7d_delta_pct < -30:
                # Rule 1: tiny price gap but a sharp sales drop → not a
                # pricing problem; ask a human to check.
                self._exec_flag_for_human_review(
                    sku=t.sku, name=t.name,
                    concern=(
                        f"🟡 [規則引擎] 價差僅 {t.gap_pct:+.1f}% 但銷量大跌 "
                        f"{t.sales_7d_delta_pct:+.1f}%,疑似缺貨/下架/平台流量異常,"
                        "請人工走查前台。"
                    ),
                    confidence=0.80,
                    footprint=footprint,
                    momo_price=t.momo_price, comp_price=t.pchome_price,
                    gap_pct=t.gap_pct, sales_delta=t.sales_7d_delta_pct,
                    revenue_loss_7d=rl, recommended_price=rp,
                    **_threat_match_metadata(t),
                )
            elif t.gap_pct >= 5 and t.risk == "HIGH":
                # Rule 2: large gap with HIGH risk → price alert.
                self._exec_trigger_price_alert(
                    t.sku, t.name,
                    t.gap_pct, t.sales_7d_delta_pct,
                    f"🟡 [規則引擎] {t.recommended_action}",
                    t.confidence,
                    momo_price=t.momo_price, comp_price=t.pchome_price,
                    footprint=footprint,
                    revenue_loss_7d=rl, recommended_price=rp,
                    **_threat_match_metadata(t),
                )
            elif t.gap_pct < 0 and t.sales_7d_delta_pct > 0:
                # Rule 3: we are cheaper and sales are growing → recommend.
                self._exec_add_to_recommendation(
                    t.sku, t.name,
                    (
                        f"🟡 [規則引擎] 我方比競品便宜 {abs(t.gap_pct):.1f}%"
                        f"銷量正成長 {t.sales_7d_delta_pct:+.1f}%"
                    ),
                    t.confidence,
                    footprint=footprint,
                    threat=t,
                )
            else:
                # Rule 4: everything else → human review.
                self._exec_flag_for_human_review(
                    sku=t.sku, name=t.name,
                    concern=(
                        f"🟡 [規則引擎] 情況複雜或信心不足(信心 {t.confidence:.0%}"
                        f"建議:{t.recommended_action}"
                    ),
                    confidence=t.confidence,
                    footprint=footprint,
                    momo_price=t.momo_price, comp_price=t.pchome_price,
                    gap_pct=t.gap_pct, sales_delta=t.sales_7d_delta_pct,
                    revenue_loss_7d=rl, recommended_price=rp,
                    **_threat_match_metadata(t),
                )
            dispatched += 1
        except Exception as e:
            # Best effort: one failing threat must not stop the batch.
            errors.append(f"fallback({t.sku}): {e}")
            logger.error(f"[Dispatcher][ADR-004] Hermes fallback 失敗 {t.sku}: {e}")
    logger.info(
        f"[Dispatcher][ADR-004] Hermes 規則引擎降級完成 "
        f"dispatched={dispatched} errors={len(errors)}"
    )
    return {"dispatched": dispatched, "skipped": 0, "errors": errors, "nim_stats": {"degraded": True}}
# ──────────────────────────────────────────────
# 語意化訊息格式器
# ──────────────────────────────────────────────
@staticmethod
def _fmt_price_alert(
    sku: str, name: str,
    momo_price, comp_price,
    gap_pct: float, sales_delta: float,
    action: str, confidence: float,
    footprint: str,
    revenue_loss_7d: float = 0.0,
    recommended_price: Optional[float] = None,
    match_type: str = "exact",
    price_basis: str = "total_price",
    alert_tier: str = "price_alert_exact",
    match_score: float = 0.0,
    competitor_product_id: str = "",
    competitor_product_name: str = "",
) -> str:
    """Format the category-1 "urgent price alert" message.

    The text follows an inverted pyramid: headline, core issue, key figures,
    match evidence, monetary impact, AI insight, compute footprint.

    History:
      * 2026-04-18 (Taipei), Bug-3 defence line 3 (UI isolation): the
        core-issue line and the key figures are assembled by Python from the
        numeric arguments only. ``action`` is the single place where
        LLM-written text enters the message, and it is passed through
        ``_sanitize_text`` first. A price of None/0 degrades to an N/A label
        instead of rendering "$0". -- Claude Opus 4.7
      * 2026-05-02 (Taipei), track B': a monetary-impact block
        (``revenue_loss_7d`` / ``recommended_price``) was added so the alert
        carries a concrete figure to act on. -- Claude Opus 4.7

    Args:
        sku, name: Product identity shown in the core-issue line.
        momo_price, comp_price: Our price and the competitor price; None or 0
            is rendered as a "data missing" label.
        gap_pct, sales_delta: Price gap and 7-day sales change, in percent.
        action: LLM-produced recommendation text (sanitised before use).
        confidence: Confidence as a fraction; shown as a whole percentage.
        footprint: Pre-rendered compute-footprint block appended verbatim.
        revenue_loss_7d: Shown only when positive.
        recommended_price: Shown only when not None and positive.
        match_type, price_basis, alert_tier, match_score,
        competitor_product_id, competitor_product_name: Forwarded unchanged to
            ``_format_match_evidence_block``.

    Returns:
        The complete message text.
    """
    # round(), not int(): 0.57 * 100 evaluates to 56.99999999999999 in binary
    # floating point, and int() would truncate that to a misleading 56%.
    conf_pct = round(confidence * 100)

    # Objective figures: None/0 degrade to a label so "$0" is never shown.
    mp_str = f"${momo_price:,.0f}" if momo_price not in (None, 0) else "N/A資料缺失"
    cp_str = f"${comp_price:,.0f}" if comp_price not in (None, 0) else "N/A資料缺失"

    # Core issue is composed by Python only; no LLM text is involved.
    core_issue = f"價差 {gap_pct:+.1f}% / 近七天銷量 {sales_delta:+.1f}%"

    # Monetary impact block (track B'); omitted entirely when empty.
    impact_lines = []
    if revenue_loss_7d and revenue_loss_7d > 0:
        impact_lines.append(f"📉 過去 7 日營收流失NT$ {revenue_loss_7d:,.0f}")
    if recommended_price is not None and recommended_price > 0:
        impact_lines.append(
            f"🎯 跟進競品建議價NT$ {recommended_price:,.0f}"
            f"(毛利策略可再加溢價)"
        )
    impact_block = ("\n".join(impact_lines) + "\n\n") if impact_lines else ""

    match_block = _format_match_evidence_block(
        match_type=match_type,
        price_basis=price_basis,
        alert_tier=alert_tier,
        match_score=match_score,
        competitor_product_id=competitor_product_id,
        competitor_product_name=competitor_product_name,
    )

    # AI insight: the only field that is allowed to carry LLM text.
    ai_insight = _sanitize_text(action, fallback="請人工評估議價空間")

    return (
        f"{ICON_CRITICAL} [{HEADER_DISPATCHER}] 競價高危險預警\n\n"
        f"{ICON_WARNING} 核心問題:[{sku}] {name}\n"
        f"{core_issue}\n\n"
        f"{ICON_REPORT} 關鍵數據:\n"
        f"• 我方價格:{mp_str}\n"
        f"• 競品價格:{cp_str}\n"
        f"• 銷量變化:{sales_delta:+.1f}%\n\n"
        f"{match_block}"
        f"{impact_block}"
        f"{ICON_AI} AI 洞察(信心度 {conf_pct}%\n"
        f"{ai_insight}\n\n"
        f"{footprint}"
    )
@staticmethod
def _fmt_human_review(
    sku: str, name: str,
    concern: str, footprint: str,
    momo_price: Optional[float] = None, comp_price: Optional[float] = None,
    gap_pct: Optional[float] = None, sales_delta: Optional[float] = None,
    revenue_loss_7d: float = 0.0,
    recommended_price: Optional[float] = None,
    match_type: str = "",
    price_basis: str = "",
    alert_tier: str = "",
    match_score: float = 0.0,
    competitor_product_id: str = "",
    competitor_product_name: str = "",
) -> str:
    """Format the category-2 "human review requested" message.

    The objective data snapshot is injected by Python from the numeric
    arguments; the ``concern`` text is kept in its own "AI diagnosis"
    section. Track B' added the monetary-impact lines.

    Args:
        sku, name: Product identity shown in the header.
        concern: Diagnosis text placed in the AI-diagnosis section as given.
        footprint: Pre-rendered compute-footprint block appended verbatim.
        momo_price, comp_price: When either is None the snapshot is replaced
            by a "no competitor price data" line.
        gap_pct, sales_delta: Percent values for the snapshot. Both default
            to None, so a missing value is rendered as "N/A" rather than
            raising while formatting.
        revenue_loss_7d: Shown only when positive.
        recommended_price: Shown only when not None and positive.
        match_type, price_basis, alert_tier: Empty values fall back to
            "unknown" / "manual_review" / "identity_review" before being
            forwarded to ``_format_match_evidence_block``.
        match_score, competitor_product_id, competitor_product_name:
            Forwarded unchanged to ``_format_match_evidence_block``.

    Returns:
        The complete message text.
    """
    # Objective data snapshot: 100% Python, never routed through an LLM.
    if momo_price is not None and comp_price is not None:
        # A None percentage cannot take the ``:+.1f`` format spec
        # (TypeError), so degrade to a label instead of losing the message.
        gap_str = f"{gap_pct:+.1f}%" if gap_pct is not None else "N/A"
        sales_str = f"{sales_delta:+.1f}%" if sales_delta is not None else "N/A"
        data_block = (
            f"{ICON_REPORT} 客觀數據快照:\n"
            f"• 我方價格:${momo_price:,.0f}\n"
            f"• 競品價格:${comp_price:,.0f}(價差 {gap_str}\n"
            f"• 七天銷量變化:{sales_str}\n"
        )
    else:
        data_block = f"{ICON_REPORT} 客觀數據:(無競品比價數據)\n"

    # Monetary impact (track B'); omitted entirely when empty.
    impact_lines = []
    if revenue_loss_7d and revenue_loss_7d > 0:
        impact_lines.append(f"📉 過去 7 日營收流失NT$ {revenue_loss_7d:,.0f}")
    if recommended_price is not None and recommended_price > 0:
        impact_lines.append(f"🎯 跟進競品建議價NT$ {recommended_price:,.0f}")
    impact_block = ("\n".join(impact_lines) + "\n") if impact_lines else ""

    match_block = _format_match_evidence_block(
        match_type=match_type or "unknown",
        price_basis=price_basis or "manual_review",
        alert_tier=alert_tier or "identity_review",
        match_score=match_score,
        competitor_product_id=competitor_product_id,
        competitor_product_name=competitor_product_name,
    )

    return (
        f"{ICON_WARNING} [{HEADER_DISPATCHER}] 異常波動需人工覆核\n\n"
        f"🔍 待查商品:[{sku}] {name}\n\n"
        f"{data_block}"
        f"{match_block}"
        f"{impact_block}\n"
        f"🧠 AI 診斷:\n"
        f"{concern}\n\n"
        f"👉 建議行動:請營運人員立即進行前台走查。\n\n"
        f"{footprint}"
    )
@staticmethod
def _fmt_recommendation(
    sku: str, name: str,
    reason: str, confidence: float,
    db_written: bool,
    footprint: str,
) -> str:
    """Format the category-3 "strategy executed" notification.

    Args:
        sku, name: Product identity shown in the recommendation line.
        reason: Text placed under the "decision basis" heading as given.
        confidence: Confidence as a fraction; shown as a whole percentage.
        db_written: Selects the execution-status line (row written to
            ``ai_price_recommendations`` vs. notification only).
        footprint: Pre-rendered compute-footprint block appended verbatim.

    Returns:
        The complete message text.

    NOTE(review): the recommendation line always says the item was added to
    the homepage block, even when ``db_written`` is False; only the status
    line reflects the failure. Left unchanged here.
    """
    # round(), not int(): 0.57 * 100 evaluates to 56.99999999999999 in binary
    # floating point, and int() would truncate that to a misleading 56%.
    conf_pct = round(confidence * 100)

    if db_written:
        db_status = "✅ 系統已自動寫入 ai_price_recommendations 推薦表"
    else:
        db_status = "⚠️ DB 未注入,僅發送通知(表尚未建立)"

    return (
        f"{ICON_INSIGHT} [{HEADER_DISPATCHER}] 潛力商品自動佈署\n\n"
        f"{ICON_COMPETE} 推薦品項:[{sku} {name}] 已自動加入「首頁推薦區塊」\n\n"
        f"{ICON_REPORT} 決策依據:\n"
        f"{reason}\n\n"
        f"{ICON_AI} AI 洞察 (信心度 {conf_pct}%)\n"
        f"具備價格競爭優勢NemoTron 主動提升曝光量以最大化業績。\n\n"
        f"👉 執行狀態:{db_status}\n\n"
        f"{footprint}"
    )
# ──────────────────────────────────────────────
# 工具實作
# ──────────────────────────────────────────────
def _exec_trigger_price_alert(
    self,
    sku: str, name: str,
    gap_pct: float, sales_delta: float, action: str, confidence: float,
    momo_price=None, comp_price=None,
    footprint: str = "",
    revenue_loss_7d: float = 0.0,
    recommended_price: Optional[float] = None,
    match_type: str = "exact",
    price_basis: str = "total_price",
    alert_tier: str = "price_alert_exact",
    match_score: float = 0.0,
    competitor_product_id: str = "",
    competitor_product_name: str = "",
):
    """Send the urgent price alert and archive it as an insight.

    Steps, in order: render the message, build the decision envelope, send
    through ``_send_telegram``, log, then sink the decision via
    ``_sink_insight_to_km`` (ADR-007 double write).

    History:
      * 2026-04-18 (Taipei), Bug-1 defence line 1: the price defaults are
        None (previously 0, which showed up as "$0" in Telegram when the LLM
        omitted them). -- Claude Opus 4.7
      * 2026-05-02 (Taipei), track B': ``revenue_loss_7d`` and
        ``recommended_price`` are passed in by the caller rather than taken
        from LLM text. -- Claude Opus 4.7
    """
    # Two keyword groups shared by the formatter and the envelope builder.
    impact_kwargs = {
        "revenue_loss_7d": revenue_loss_7d,
        "recommended_price": recommended_price,
    }
    match_kwargs = {
        "match_type": match_type,
        "price_basis": price_basis,
        "alert_tier": alert_tier,
        "match_score": match_score,
        "competitor_product_id": competitor_product_id,
        "competitor_product_name": competitor_product_name,
    }

    msg = self._fmt_price_alert(
        sku, name, momo_price, comp_price,
        gap_pct, sales_delta, action, confidence, footprint,
        **impact_kwargs,
        **match_kwargs,
    )
    decision_envelope = _build_price_decision_envelope(
        decision_type="price_alert",
        sku=sku,
        name=name,
        gap_pct=gap_pct,
        sales_delta=sales_delta,
        confidence=confidence,
        analysis=action,
        momo_price=momo_price,
        comp_price=comp_price,
        **impact_kwargs,
        **match_kwargs,
    )
    self._send_telegram(msg, decision_envelope=decision_envelope)
    logger.info(
        f"[Dispatcher] 競價告警 → {sku} gap={gap_pct:.1f}% sales={sales_delta:.1f}% "
        f"loss=${revenue_loss_7d:,.0f} rec_price={recommended_price}"
    )

    # ADR-007 double write: archive into ai_insights for later RAG use.
    # The competitor product name is deliberately not part of the metadata.
    km_metadata = {
        "gap_pct": gap_pct,
        "sales_delta": sales_delta,
        "confidence": confidence,
        "momo_price": momo_price,
        "comp_price": comp_price,
        **impact_kwargs,
    }
    km_metadata.update(
        (key, value)
        for key, value in match_kwargs.items()
        if key != "competitor_product_name"
    )
    km_metadata["decision_envelope"] = decision_envelope
    self._sink_insight_to_km(
        insight_type="price_alert",
        sku=sku, name=name,
        content=f"[高危險告警] {name} 價差 {gap_pct:+.1f}% / 銷量 {sales_delta:+.1f}%。行動:{action}",
        metadata=km_metadata,
    )
def _exec_add_to_recommendation(
    self,
    sku: str, name: str, reason: str, confidence: float,
    footprint: str = "",
    footprint_data: Optional[dict] = None,  # structured JSON stored in the model_footprint column
    threat=None,  # PriceThreat object; source of the numeric snapshot columns
):
    """Upsert the recommendation row, notify Telegram, archive the insight.

    The database write only happens when ``self.engine`` is set. A failure
    is logged and reflected in the notification's status line; it never
    prevents the notification or the ADR-007 sink.
    """

    def _snapshot(attr: str):
        # Numeric columns come from the threat object, or NULL without one.
        return getattr(threat, attr, None) if threat else None

    db_written = False
    if self.engine:
        try:
            from sqlalchemy import text

            row = {
                "sku": sku,
                "name": name,
                "reason": reason,
                "confidence": confidence,
                "momo_price": _snapshot("momo_price"),
                "pchome_price": _snapshot("pchome_price"),
                "gap_pct": _snapshot("gap_pct"),
                "sales_delta": _snapshot("sales_7d_delta_pct"),
                "footprint": json.dumps(footprint_data or {}, ensure_ascii=False),
            }
            upsert_sql = text("""
                INSERT INTO ai_price_recommendations
                (sku, name, reason, strategy, confidence,
                momo_price, pchome_price, gap_pct, sales_7d_delta,
                model_footprint, status, created_at, updated_at)
                VALUES
                (:sku, :name, :reason, 'promote', :confidence,
                :momo_price, :pchome_price, :gap_pct, :sales_delta,
                :footprint, 'pending', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
                ON CONFLICT (sku) DO UPDATE
                SET reason = EXCLUDED.reason,
                confidence = EXCLUDED.confidence,
                momo_price = EXCLUDED.momo_price,
                pchome_price = EXCLUDED.pchome_price,
                gap_pct = EXCLUDED.gap_pct,
                sales_7d_delta = EXCLUDED.sales_7d_delta,
                model_footprint = EXCLUDED.model_footprint,
                status = 'pending',
                updated_at = CURRENT_TIMESTAMP
            """)
            with self.engine.begin() as conn:
                conn.execute(upsert_sql, row)
            db_written = True
            logger.info(f"[Dispatcher] 推薦商品寫入 DB → {sku}")
        except Exception as e:
            logger.error(f"[Dispatcher] DB 寫入失敗 {sku}: {e}")

    self._send_telegram(
        self._fmt_recommendation(sku, name, reason, confidence, db_written, footprint)
    )

    # ADR-007 double write.
    self._sink_insight_to_km(
        insight_type="recommendation",
        sku=sku, name=name,
        content=f"[推薦商品] {name}。原因:{reason}",
        metadata={"confidence": confidence, "db_written": db_written},
    )
def _exec_flag_for_human_review(
    self,
    sku: str, name: str, concern: str, confidence: float,
    footprint: str = "",
    momo_price: float = None, comp_price: float = None,
    gap_pct: float = None, sales_delta: float = None,
    revenue_loss_7d: float = 0.0,
    recommended_price: Optional[float] = None,
    match_type: str = "",
    price_basis: str = "",
    alert_tier: str = "",
    match_score: float = 0.0,
    competitor_product_id: str = "",
    competitor_product_name: str = "",
):
    """Send a human-review request and archive it as an insight.

    ``concern`` is sanitised first; the sanitised text is what goes into the
    message, the decision envelope and the archived insight.
    """
    concern = _sanitize_text(concern, fallback="數據走勢違背常理,疑似缺貨或前台異常。")

    # Keyword groups shared by the formatter and the envelope builder.
    snapshot_kwargs = {
        "momo_price": momo_price,
        "comp_price": comp_price,
        "gap_pct": gap_pct,
        "sales_delta": sales_delta,
        "revenue_loss_7d": revenue_loss_7d,
        "recommended_price": recommended_price,
    }
    match_kwargs = {
        "match_type": match_type,
        "price_basis": price_basis,
        "alert_tier": alert_tier,
        "match_score": match_score,
        "competitor_product_id": competitor_product_id,
        "competitor_product_name": competitor_product_name,
    }

    msg = self._fmt_human_review(
        sku, name, concern, footprint,
        **snapshot_kwargs,
        **match_kwargs,
    )
    decision_envelope = _build_price_decision_envelope(
        decision_type="human_review",
        sku=sku,
        name=name,
        confidence=confidence,
        analysis=concern,
        **snapshot_kwargs,
        **match_kwargs,
    )
    self._send_telegram(msg, decision_envelope=decision_envelope)
    logger.info(
        f"[Dispatcher] 人工覆核請求 → {sku} loss=${revenue_loss_7d:,.0f}"
    )

    # ADR-007 double write. The competitor product name is deliberately not
    # part of the archived metadata.
    km_metadata = {
        "confidence": confidence,
        "gap_pct": gap_pct,
        "sales_delta": sales_delta,
        "momo_price": momo_price,
        "comp_price": comp_price,
        "revenue_loss_7d": revenue_loss_7d,
        "recommended_price": recommended_price,
    }
    km_metadata.update(
        (key, value)
        for key, value in match_kwargs.items()
        if key != "competitor_product_name"
    )
    km_metadata["decision_envelope"] = decision_envelope
    self._sink_insight_to_km(
        insight_type="human_review",
        sku=sku, name=name,
        content=f"[人工覆核] {name}。疑慮:{concern}",
        metadata=km_metadata,
    )
def _exec_route_to_km(
    self,
    sku: str, name: str, km_domain: str, summary: str, confidence: float,
    footprint: str = "",
    threat=None,
):
    """Archive an insight under a KM domain without sending any alert.

    The insight is written through ``_sink_insight_to_km`` so it can be
    used for RAG later. This is a silent operation: nothing goes to
    Telegram, only a log line is emitted. A ``km_domain`` outside the known
    set is replaced by "price_competition".
    """
    known_domains = frozenset((
        "price_competition", "sales_anomaly", "promotion_opportunity", "market_trend",
    ))
    domain = "price_competition"
    if km_domain in known_domains:
        domain = km_domain

    summary = _sanitize_text(summary, fallback="競價洞察已歸檔")

    # metadata key -> attribute read from the threat (None without a threat)
    snapshot_fields = (
        ("momo_price", "momo_price"),
        ("pchome_price", "pchome_price"),
        ("gap_pct", "gap_pct"),
        ("sales_delta", "sales_7d_delta_pct"),
    )
    km_metadata = {"km_domain": domain, "confidence": confidence}
    for meta_key, attr in snapshot_fields:
        km_metadata[meta_key] = getattr(threat, attr, None) if threat else None

    self._sink_insight_to_km(
        insight_type=f"km_{domain}",
        sku=sku, name=name,
        content=f"[KM 路由 {domain}] {name}{summary}",
        metadata=km_metadata,
    )
    logger.info(f"[Dispatcher] KM 路由 → {sku} domain={domain} confidence={confidence:.2f}")
def _exec_mark_for_relearn(
    self,
    sku: str, name: str, reason: str,
    footprint: str = "",
):
    """Flag this SKU's existing insights for relearning.

    Rows in ``ai_insights`` for the SKU that are not already 'relearn' or
    'archived' get ``status='relearn'`` and ``feedback_down`` incremented.
    The original note says this lets the daily dedup / quality-score batch
    see that the insight was overturned. A failed update is logged as a
    warning and does not stop the method. A ``relearn_event`` insight is
    then recorded either way. Silent operation: no Telegram alert.
    """
    reason = _sanitize_text(reason, fallback="新數據與歷史洞察矛盾,需重新學習")

    try:
        from database.manager import DatabaseManager

        manager = DatabaseManager()
        with manager.get_session() as session:
            from sqlalchemy import text

            relearn_sql = text("""
                UPDATE ai_insights
                SET status = 'relearn',
                feedback_down = COALESCE(feedback_down, 0) + 1,
                updated_at = CURRENT_TIMESTAMP
                WHERE product_sku = :sku
                AND status NOT IN ('relearn', 'archived')
            """)
            outcome = session.execute(relearn_sql, {"sku": sku})
            session.commit()
            rows = outcome.rowcount
            logger.info(f"[Dispatcher] mark_for_relearn → {sku} 共更新 {rows} 筆洞察;原因:{reason}")
    except Exception as e:
        logger.warning(f"[Dispatcher] mark_for_relearn DB 更新失敗 ({sku}): {e}")

    # Also keep a record of the relearn event itself in ai_insights.
    self._sink_insight_to_km(
        insight_type="relearn_event",
        sku=sku, name=name,
        content=f"[重新學習事件] {name}{reason}",
        metadata={"sku": sku, "trigger": "nemoton_dispatcher"},
    )
def _sink_insight_to_km(self, insight_type: str, sku: str, name: str,
                        content: str, metadata: dict = None):
    """ADR-007 double write: persist a decision/insight via ``store_insight``.

    ``period`` is today's date (YYYY-MM-DD); per the original note it acts
    as the cache-aside key, so the same SKU + type on the same day is
    overwritten. The stored metadata always carries ``sku`` and ``name``,
    with caller-supplied ``metadata`` layered on top. Any failure is logged
    as a warning and swallowed so the main flow is never blocked.
    """
    try:
        from services.openclaw_learning_service import store_insight

        today = datetime.now().strftime("%Y-%m-%d")
        combined = dict(sku=sku, name=name)
        combined.update(metadata or {})
        store_insight(
            insight_type=insight_type,
            content=content,
            period=today,
            product_sku=sku,
            metadata=combined,
            ai_model=NIM_MODEL,
        )
    except Exception as e:
        logger.warning(f"[Dispatcher] sink insight 略過 ({insight_type}/{sku}): {e}")
def _send_telegram(self, message: str, decision_envelope: Optional[dict] = None):
    """Publish an alert through the EventRouter (ADR-019 phase 5).

    The event is handed to ``dispatch_sync``. According to the original
    note this replaces the older direct Telegram Bot API call, and retry /
    queue replay are handled on the EventRouter side. A non-empty
    ``decision_envelope`` dict is attached to both the event and its
    payload, and its ``decision_id`` (at most 52 characters) becomes the
    event id; without a usable id the event carries no ``id`` key. Any
    exception is logged, followed by the first 200 characters of the
    message, and never propagates.
    """
    try:
        from services.event_router import dispatch_sync

        payload = {"raw_message": message}
        fields = {
            "event_type": "nemoton_dispatch_alert",
            "severity": "alert",
            "source": "NemoTron.Dispatcher",
            "title": "NemoTron 派發器告警",
            "summary": message[:400],
            "status": "dispatched",
            "payload": payload,
        }

        event_id = ""
        if isinstance(decision_envelope, dict) and decision_envelope:
            event_id = str(decision_envelope.get("decision_id") or "")[:52]
            fields["decision_envelope"] = decision_envelope
            payload["decision_envelope"] = decision_envelope

        # "id" is included (as the first key) only when there is one.
        event = {"id": event_id, **fields} if event_id else fields
        dispatch_sync(event=event)
    except Exception as e:
        logger.error(f"[Dispatcher] EventRouter dispatch 失敗: {e}")
        logger.info(f"[Dispatcher] 告警內容fallback log{message[:200]}")
# ──────────────────────────────────────────────
# 公開介面
# ──────────────────────────────────────────────
async def handle_l2(self, event: dict, ctx: dict) -> dict:
    """L2 action planning for the EventRouter / Telegram NLP layer.

    Contract:
        event: {"message": str, "user_id": int, "chat_id": int, ...}
        ctx:   the Hermes handle_l1 result, either under a "latest" key or
               already flattened.

    Returns:
        {
            "action_plan": [{"action": str, "params": dict}, ...],
            "dispatch_to": "openclaw|direct_response|human_review",
            "metadata": dict,
        }
        A "scheduler_task_failure" event whose task is listed in
        ``ALLOWED_RETRY_TASKS`` instead gets a "retry_task" plan with
        ``dispatch_to="safe_action"`` plus ``session_id``, ``plan_type``
        and ``auto_execute`` keys.

    Strategy:
        NIM is not called here (it is reserved for batch tool calling). The
        route is chosen from the L1 intent, complexity score, data-fetch
        flag and report keywords in the message: complex analysis goes to
        openclaw, everything else to direct_response. Any exception falls
        back to a minimal direct_response plan.
    """
    try:
        evt = event or {}
        message = evt.get("message", "") or ""
        event_type = evt.get("event_type", "")
        payload = evt.get("payload") or {}
        task_name = payload.get("task_name") or evt.get("task_name")

        if event_type == "scheduler_task_failure" and task_name:
            try:
                from services.agent_actions import ALLOWED_RETRY_TASKS

                if task_name in ALLOWED_RETRY_TASKS:
                    retry_step = {
                        "action": "retry_task",
                        "params": {
                            "task_name": task_name,
                            "max_attempts": 2,
                            "backoff_sec": 60,
                        },
                    }
                    return {
                        "session_id": f"evt:{event_type}:{evt.get('source', 'unknown')}",
                        "plan_type": "retry_task",
                        "action_plan": [retry_step],
                        "dispatch_to": "safe_action",
                        "auto_execute": True,
                        "metadata": {"event_type": event_type, "task_name": task_name},
                    }
            except Exception as action_err:
                logger.warning("[NemotronDispatcher.handle_l2] retry_task 規劃跳過: %s", action_err)

        # ctx is either {"latest": {...}} or the flattened intent result.
        l1_result = {}
        if isinstance(ctx, dict):
            latest = ctx.get("latest")
            l1_result = latest if isinstance(latest, dict) else ctx
        l1_result = l1_result or {}

        intent = l1_result.get("intent", "unknown")
        complexity = float(l1_result.get("complexity_score", 0.0) or 0.0)
        needs_data = bool(l1_result.get("requires_data_fetch", False))

        lowered = message.lower()
        wants_report = False
        for keyword in ("report", "ppt", "週報", "月報", "日報", "報表", "報告"):
            if keyword in lowered:
                wants_report = True
                break

        analytic_intents = ("query_sales", "analyze_competitor", "report")
        needs_strategist = (
            wants_report
            or complexity >= 0.7
            or needs_data
            or intent in analytic_intents
        )
        if needs_strategist:
            dispatch_to, action = "openclaw", "strategist_analyze"
        else:
            dispatch_to, action = "direct_response", "reply_simple"

        return {
            "action_plan": [{
                "action": action,
                "params": {"message": message, "intent": intent},
            }],
            "dispatch_to": dispatch_to,
            "metadata": {"complexity_score": complexity, "intent": intent},
        }
    except Exception as e:
        logger.warning(f"[NemotronDispatcher.handle_l2] 規劃失敗 fallback direct_response: {e}")
        return {
            "action_plan": [{"action": "reply_simple", "params": {}}],
            "dispatch_to": "direct_response",
            "metadata": {"error": str(e)},
        }
def dispatch(self, threats: list, hermes_stats: Optional[dict] = None) -> dict:
    """Main entry point: route Hermes threats to alerts and actions.

    Pipeline:
      1. Drop SKUs that ``_is_duplicate_alert`` reports as already alerted.
      2. Defence line 2 (Python-only pre-routing, two gates): matching
         threats are sent straight to human review and never reach an LLM.
      3. If ``NEMOTRON_OLLAMA_FIRST`` is on, ask qwen3 for tool calls.
      4. Otherwise, or when qwen3 fails / returns nothing, ask NIM.
      5. If NIM is unavailable (no key, quota exhausted, HTTP error, other
         exception, or zero tool calls) degrade to the Hermes rule engine
         (ADR-004).

    Args:
        threats: list[PriceThreat].
        hermes_stats: {"duration_sec": float, "tokens": int}, supplied by
            HermesAnalystService and used for the compute-footprint block.

    Returns:
        {"dispatched": int, "skipped": int, "errors": list, "nim_stats": dict}

    History:
      * 2026-04-18 (Taipei), Bug-2 defence line 2 (two gates):
          - Gate A: sales <= -95%. The original note treats this as an
            absolute cliff (out of stock / delisted / storefront fault)
            regardless of the price gap. It was tightened that evening
            after sku=7440662 (sales=-100%, gap=6.1%) was wrongly routed to
            a price cut.
          - Gate B: sales <= -80% and |gap| < 5%, i.e. a moderate cliff
            with a tiny gap, so pricing is not the main cause.
        Hitting either gate forces human review. -- Claude Opus 4.7
    """
    if not threats:
        return {"dispatched": 0, "skipped": 0, "errors": [], "nim_stats": {}}

    # -- Defence line 2: Python-only pre-routing (two gates) --
    forced_review, nim_candidates = [], []
    skipped = 0
    for t in threats:
        if _is_duplicate_alert(t.sku):
            logger.info(f"[Dispatcher] SKU {t.sku}{int(_ALERT_TTL_SEC/3600)}h 內已告警,觸發去重跳過。")
            skipped += 1
            continue
        gate_a = t.sales_7d_delta_pct <= -95  # absolute cliff
        gate_b = t.sales_7d_delta_pct <= -80 and abs(t.gap_pct) < 5  # moderate cliff + tiny gap
        if gate_a or gate_b:
            forced_review.append(t)
        else:
            nim_candidates.append(t)

    errors = []
    dispatched = 0
    if forced_review:
        logger.warning(
            f"[Dispatcher] 防線二攔截 {len(forced_review)} 筆斷崖異常(不進 NIM: "
            + ", ".join(
                f"{t.sku}(sales={t.sales_7d_delta_pct:+.0f}%, gap={t.gap_pct:+.1f}%)"
                for t in forced_review
            )
        )
        hr_footprint = _build_footprint_block(hermes_stats, None)
        for t in forced_review:
            # The concern text depends on which gate fired.
            if t.sales_7d_delta_pct <= -95:
                concern_text = (
                    f"銷量近乎歸零({t.sales_7d_delta_pct:+.1f}%)— 絕對斷崖警戒。"
                    "此等級業績崩潰幾乎 100% 為缺貨、下架、或前台頁面異常,"
                    "與定價策略無關,請營運人員立即走查前台。"
                )
            else:
                concern_text = (
                    f"銷量斷崖下跌 {t.sales_7d_delta_pct:+.1f}%,但價差僅 "
                    f"{t.gap_pct:+.1f}%(絕對值 < 5%),與定價無關。"
                    "疑似缺貨、下架、或前台異常,請營運人員立即走查。"
                )
            try:
                impact = _compute_business_impact(t)
                self._exec_flag_for_human_review(
                    sku=t.sku,
                    name=t.name,
                    concern=concern_text,
                    confidence=0.99,  # decided by a Python rule, not a model
                    footprint=hr_footprint,
                    momo_price=t.momo_price,
                    comp_price=t.pchome_price,
                    gap_pct=t.gap_pct,
                    sales_delta=t.sales_7d_delta_pct,
                    revenue_loss_7d=impact["revenue_loss_7d"],
                    recommended_price=impact["recommended_price"],
                    **_threat_match_metadata(t),
                )
                dispatched += 1
            except Exception as e:
                errors.append(f"forced_review({t.sku}): {e}")
                logger.error(f"[Dispatcher] 防線二執行失敗 {t.sku}: {e}")

    # Everything was handled by defence line 2 (or deduplicated): return
    # without spending any NIM quota.
    if not nim_candidates:
        logger.info(f"[Dispatcher] 全部 {len(forced_review)} 筆由防線二處理(或去重),不呼叫 NIM")
        return {
            "dispatched": dispatched,
            "skipped": skipped,
            "errors": errors,
            "nim_stats": {},
        }

    def _degrade_to_rules(extra_errors: Optional[list] = None) -> dict:
        # ADR-004: run the Hermes rule engine on the remaining candidates
        # and fold its result into the counters accumulated so far.
        fb = self._hermes_rule_fallback(nim_candidates, hermes_stats)
        return {
            "dispatched": dispatched + fb["dispatched"],
            "skipped": skipped + fb["skipped"],
            "errors": errors + (extra_errors or []) + fb["errors"],
            "nim_stats": fb["nim_stats"],
        }

    # -- Operation Ollama-First v5.0 / Phase 3 / A9: qwen3 primary path --
    # Skipped entirely when NEMOTRON_OLLAMA_FIRST is false. Successful
    # tool calls go through the shared _execute_tool_calls. A failure or an
    # empty result tries NIM first and only then ADR-004.
    qwen3_tool_calls: Optional[list] = None
    qwen3_stats: dict = {}
    if NEMOTRON_OLLAMA_FIRST:
        try:
            qwen3_tool_calls, raw_stats = self._call_qwen3_dispatch(nim_candidates)
        except Exception as e:
            # Per the original note, _call_qwen3_dispatch records the
            # failure itself via log_ai_call (status=error, fallback_to=nim).
            logger.warning(f"[Dispatcher][qwen3] 呼叫失敗 fallback NIM: {e}")
            qwen3_tool_calls = None
        else:
            # Normalise stats so a missing/None stats object cannot crash
            # the success path after the call itself has succeeded.
            qwen3_stats = raw_stats if isinstance(raw_stats, dict) else {}
            if qwen3_tool_calls:
                logger.info(
                    f"[Dispatcher][qwen3] 主路徑成功 tool_calls={len(qwen3_tool_calls)} "
                    f"tokens={qwen3_stats.get('total_tokens', 0)}"
                )
            else:
                logger.warning("[Dispatcher][qwen3] 0 tool_callsfallback 至 NIM")

    # qwen3 produced tool calls: execute them and skip NIM.
    if qwen3_tool_calls:
        # Same stats shape as the NIM path (used by the footprint block).
        nim_stats = {
            "total_tokens": qwen3_stats.get("total_tokens", 0),
            "quota_used": _nim_quota_used(),  # NIM quota is not consumed here
            "provider": qwen3_stats.get("provider", "gcp_ollama"),
            "model": qwen3_stats.get("model", NEMOTRON_OLLAMA_MODEL),
            "host": qwen3_stats.get("host"),
            "host_label": qwen3_stats.get("host_label"),
        }
        return self._execute_tool_calls(
            tool_calls=qwen3_tool_calls,
            threats=threats,
            hermes_stats=hermes_stats,
            nim_stats=nim_stats,
            pre_dispatched=dispatched,
            pre_skipped=skipped,
            pre_errors=errors,
        )

    # -- NIM path: primary when the flag is off, backup after qwen3 fails --
    if not NIM_API_KEY:
        logger.warning("[Dispatcher][ADR-004] NVIDIA_API_KEY 未設定,啟動 Hermes 規則引擎降級")
        return _degrade_to_rules()
    if not _check_nim_quota():
        logger.warning("[Dispatcher][ADR-004] NIM 配額耗盡,啟動 Hermes 規則引擎降級")
        return _degrade_to_rules()

    # Only the NIM call itself is guarded. The fallback must not run inside
    # this try: an exception raised by it would be mistaken for an NIM
    # failure and the fallback would run again, duplicating alerts.
    try:
        tool_calls, nim_stats = self._call_nim(nim_candidates)
    except requests.HTTPError as e:
        if e.response is not None and e.response.status_code == 429:
            # Rate limiting is an expected condition; not recorded as an error.
            logger.warning("[Dispatcher][ADR-004] NIM HTTP 429啟動 Hermes 規則引擎降級")
            return _degrade_to_rules()
        logger.warning("[Dispatcher][ADR-004] NIM HTTP 錯誤,啟動 Hermes 規則引擎降級: %s", e)
        return _degrade_to_rules([str(e)])
    except Exception as e:
        logger.warning("[Dispatcher][ADR-004] NIM 呼叫失敗,啟動 Hermes 規則引擎降級: %s", e)
        return _degrade_to_rules([str(e)])

    if not tool_calls:
        logger.warning("[Dispatcher][ADR-004] NIM 0 tool_calls啟動 Hermes 規則引擎降級")
        return _degrade_to_rules()

    return self._execute_tool_calls(
        tool_calls=tool_calls,
        threats=threats,
        hermes_stats=hermes_stats,
        nim_stats=nim_stats,
        pre_dispatched=dispatched,
        pre_skipped=skipped,
        pre_errors=errors,
    )
# ──────────────────────────────────────────────
# tool_calls 執行區塊NIM 與 qwen3 共用)
# ──────────────────────────────────────────────
def _execute_tool_calls(
    self,
    tool_calls: list,
    threats: list,
    hermes_stats: Optional[dict],
    nim_stats: dict,
    pre_dispatched: int = 0,
    pre_skipped: int = 0,
    pre_errors: Optional[list] = None,
) -> dict:
    """Execute the LLM's tool calls with Python-owned figures injected.

    Shared by the NIM and qwen3 paths so the execution logic exists once.

    For the two alert tools the prices, gap, sales change, monetary impact
    and match metadata are overwritten from the matching threat object, so
    the numbers shown never come from LLM output. A ``trigger_price_alert``
    for a threat that fails ``_can_direct_price_alert`` is rerouted to
    ``flag_for_human_review`` with a generated concern text.

    Args:
        tool_calls: Items shaped like {"tool": str, "args": dict}.
        threats: All threats of this dispatch run; used for the SKU lookup
            and for the skipped count.
        hermes_stats, nim_stats: Used for the footprint block / JSON.
        pre_dispatched: Count already dispatched before the LLM stage.
        pre_skipped: Accepted for interface compatibility but unused here;
            ``skipped`` is derived as ``len(threats) - dispatched``.
        pre_errors: Errors collected before the LLM stage (copied).

    Returns:
        {"dispatched": int, "skipped": int, "errors": list, "nim_stats": dict}
    """
    errors = list(pre_errors or [])
    dispatched = pre_dispatched
    footprint_text = _build_footprint_block(hermes_stats, nim_stats)
    footprint_data = _build_footprint_json(hermes_stats, nim_stats)
    threat_map = {t.sku: t for t in threats}
    TOOL_MAP = {
        "trigger_price_alert": self._exec_trigger_price_alert,
        "add_to_recommendation": self._exec_add_to_recommendation,
        "flag_for_human_review": self._exec_flag_for_human_review,
        "route_to_km": self._exec_route_to_km,
        "mark_for_relearn": self._exec_mark_for_relearn,
    }
    # Tools whose numeric fields are always overwritten from the threat.
    snapshot_tools = ("trigger_price_alert", "flag_for_human_review")

    for tc in tool_calls or []:
        # One malformed entry must not abort the rest of the batch; this
        # matches how a failing handler is isolated further down.
        if not isinstance(tc, dict):
            errors.append(f"invalid tool_call: {tc!r}")
            continue
        tool_name = tc.get("tool")
        raw_args = tc.get("args", {}) or {}
        if not isinstance(raw_args, dict):
            errors.append(f"invalid args for {tool_name}: {raw_args!r}")
            continue
        args = dict(raw_args)

        handler = TOOL_MAP.get(tool_name)
        if not handler:
            errors.append(f"未知工具: {tool_name}")
            continue

        args["footprint"] = footprint_text
        t = threat_map.get(args.get("sku"))

        if tool_name == "trigger_price_alert" and t and not _can_direct_price_alert(t):
            # Match evidence is not good enough for a direct price alert:
            # downgrade to a human review and discard the LLM's arguments.
            match_meta = _threat_match_metadata(t)
            tool_name = "flag_for_human_review"
            handler = TOOL_MAP[tool_name]
            args = {
                "sku": getattr(t, "sku", args.get("sku")),
                "name": getattr(t, "name", args.get("name")),
                "concern": (
                    "比對證據尚未達直接價格告警門檻;"
                    f"match_type={match_meta['match_type']}"
                    f"price_basis={match_meta['price_basis']}"
                    f"alert_tier={match_meta['alert_tier']}"
                    "請先覆核是否為同款、同包裝或需改用單位價。"
                ),
                "confidence": max(float(getattr(t, "confidence", 0.5) or 0.5), 0.75),
                "footprint": footprint_text,
            }

        if tool_name in snapshot_tools and t:
            args["momo_price"] = getattr(t, "momo_price", None)
            args["comp_price"] = getattr(t, "pchome_price", None)
            args["gap_pct"] = getattr(t, "gap_pct", None)
            args["sales_delta"] = getattr(t, "sales_7d_delta_pct", None)
            impact = _compute_business_impact(t)
            args["revenue_loss_7d"] = impact["revenue_loss_7d"]
            args["recommended_price"] = impact["recommended_price"]
            args.update(_threat_match_metadata(t))
        elif tool_name == "add_to_recommendation":
            args["footprint_data"] = footprint_data
            args["threat"] = t
        elif tool_name == "route_to_km":
            args["threat"] = t

        try:
            handler(**args)
            dispatched += 1
        except Exception as e:
            errors.append(f"{tool_name}({args.get('sku', '?')}): {e}")
            logger.error(f"[Dispatcher] 工具執行失敗 [{tool_name}]: {e}")

    skipped = max(0, len(threats) - dispatched)

    # On the qwen3 path nim_stats carries provider='gcp_ollama', so the log
    # shows which backend produced the calls. Both reads go through the same
    # guard so a non-dict nim_stats cannot crash after the tools have run.
    stats_view = nim_stats if isinstance(nim_stats, dict) else {}
    provider = stats_view.get("provider", "nim")
    logger.info(
        f"[Dispatcher] 完成 provider={provider} "
        f"dispatched={dispatched} skipped={skipped} "
        f"errors={len(errors)} tokens={stats_view.get('total_tokens', 0)}"
    )
    return {
        "dispatched": dispatched,
        "skipped": skipped,
        "errors": errors,
        "nim_stats": nim_stats,
    }
# ─────────────────────────────────────────────
# CLI 測試(需設 NVIDIA_API_KEY env var
# python3 services/nemoton_dispatcher_service.py
# ─────────────────────────────────────────────
if __name__ == "__main__":
    # CLI self-test. Without NVIDIA_API_KEY it only prints the three message
    # formats and checks the defence-line-2 gates. With the key set it runs
    # a real dispatch against NIM.
    import sys
    from dataclasses import dataclass

    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")

    @dataclass
    class FakeThreat:
        # Minimal stand-in for PriceThreat with the attributes read here.
        sku: str
        name: str
        category: str
        momo_price: float
        pchome_price: float
        gap_pct: float
        sales_7d_delta_pct: float
        risk: str
        recommended_action: str
        confidence: float
        sales_7d_curr_amount: float = 0.0
        sales_7d_prev_amount: float = 0.0

    fake_threats = [
        FakeThreat(
            "A003", "舒特膚AD乳液200ml", "美妝保養",
            1200, 980, 22.4, -35.0, "HIGH",
            "建議立即降價至 $1,000 迎戰,或發放 $200 專屬折價券", 0.85,
            sales_7d_curr_amount=78000, sales_7d_prev_amount=120000,
        ),
        FakeThreat(
            "A001", "玻尿酸面膜10片裝", "美妝保養",
            320, 280, 14.3, -42.0, "HIGH",
            "建議跟進降價至 $285配合限時加購活動", 0.78,
            sales_7d_curr_amount=58000, sales_7d_prev_amount=100000,
        ),
        FakeThreat(
            "A009", "美白化妝水150ml", "美妝保養",
            420, 350, 20.0, -22.0, "HIGH",
            "價格差距過大,建議優先調降或捆包促銷", 0.45,
            sales_7d_curr_amount=78000, sales_7d_prev_amount=100000,
        ),
    ]

    # Simulated Hermes compute footprint.
    fake_hermes_stats = {"duration_sec": 34.2, "tokens": 512}

    if not NIM_API_KEY:
        print("⚠️ NVIDIA_API_KEY 未設定,測試訊息格式(不呼叫 NIM")
        print()
        dispatcher = NemotronDispatcher()
        fake_nim_stats = {"total_tokens": 185, "quota_used": 2}
        footprint = _build_footprint_block(fake_hermes_stats, fake_nim_stats)

        # The three message formats.
        print("=== 類別一:緊急告警(含 B' 金額影響) ===")
        alert_text = NemotronDispatcher._fmt_price_alert(
            "A003", "舒特膚AD乳液200ml", 1200, 980,
            22.4, -35.0, "建議立即降價至 $1,000 迎戰,或發放 $200 專屬折價券",
            0.85, footprint,
            revenue_loss_7d=42000.0,  # track B' check: 120k - 78k = 42k lost
            recommended_price=980,    # track B' check: follow the competitor price
        )
        print(alert_text)
        print()

        print("=== 類別二:人工覆核 ===")
        # 2026-04-18 (Taipei): keyword arguments are used for
        # _fmt_human_review to avoid positional mix-ups. -- Claude Opus 4.7
        review_text = NemotronDispatcher._fmt_human_review(
            sku="A001", name="玻尿酸面膜10片裝",
            concern=("銷量斷崖下跌 -100.0%,但價差僅 +2.1%(絕對值 < 5%),與定價無關。"
                     "疑似缺貨、下架、或前台異常,請營運人員立即走查。"),
            footprint=footprint,
            momo_price=285, comp_price=280, gap_pct=1.8, sales_delta=-100.0,
        )
        print(review_text)
        print()

        print("=== 類別三:策略推薦 ===")
        recommendation_text = NemotronDispatcher._fmt_recommendation(
            "A009", "美白化妝水150ml",
            "我方價格低於市場 20%近7天銷量回升具備流量轉換潛力",
            0.82, True, footprint,
        )
        print(recommendation_text)
        print()

        # -- Defence line 2 (Bug-2) check: Python-only pre-routing --
        print("=== 防線二驗證Python 絕對獨裁預路由 ===")
        validation_threats = [
            FakeThreat("A-EXTREME-1", "斷崖商品(缺貨)", "測試",
                       300, 298, 0.7, -100.0, "MED",
                       "疑似缺貨", 0.5),
            FakeThreat("A-EXTREME-2", "斷崖商品(負價差)", "測試",
                       298, 310, -3.9, -85.0, "MED",
                       "疑似前台異常", 0.5),
            # 2026-04-18 (Taipei): gate A case. sales=-100% must be caught
            # even though the gap exceeds 5% (real sku 7440662).
            FakeThreat("A-GATE-A", "雪芙蘭小蒼蘭滋養膏 60g", "美妝保養",
                       52, 49, 6.1, -100.0, "HIGH",
                       "建議跟進降價促銷", 0.95),
            FakeThreat("A-NORMAL-1", "正常降價案例", "測試",
                       1200, 980, 22.4, -35.0, "HIGH",
                       "建議跟進降價", 0.85),
        ]

        def _hits_gate(threat) -> bool:
            # Same two thresholds as the dispatch() pre-routing loop.
            absolute_cliff = threat.sales_7d_delta_pct <= -95
            moderate_cliff = threat.sales_7d_delta_pct <= -80 and abs(threat.gap_pct) < 5
            return absolute_cliff or moderate_cliff

        forced = [t for t in validation_threats if _hits_gate(t)]
        nim_list = [t for t in validation_threats if not _hits_gate(t)]

        print(f"強制人工覆核 {len(forced)} 筆(預期 3閘門 A + 閘門 B × 2: "
              f"{[t.sku for t in forced]}")
        print(f"送入 NIM 決策 {len(nim_list)} 筆(預期 1: "
              f"{[t.sku for t in nim_list]}")
        assert len(forced) == 3, f"防線二分類錯誤forced 應為 3實為 {len(forced)}"
        assert len(nim_list) == 1, f"防線二分類錯誤nim 應為 1實為 {len(nim_list)}"
        print("✅ 防線二雙閘門分類邏輯正確")
        sys.exit(0)

    print("=== NemoTron Dispatcher CLI 測試(真實 NIM ===\n")
    dispatcher = NemotronDispatcher()
    result = dispatcher.dispatch(fake_threats, hermes_stats=fake_hermes_stats)
    print(
        f"\n結果dispatched={result['dispatched']} "
        f"skipped={result['skipped']} errors={result['errors']}\n"
        f"NIM: {result['nim_stats']}"
    )