Files
ewoooc/services/hermes_analyst_service.py
OoO b73dc6df3f
All checks were successful
CD Pipeline / deploy (push) Successful in 1m5s
V10.415 protect Hermes fallback routing
2026-05-24 14:25:22 +08:00

759 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Hermes 3 競價情報分析服務 (Module 2)
角色:分析師 (Analyst)
模型hermes3:latest @ OllamaService 三主機級聯GCP-A → GCP-B → 111
輸入SQL 漏斗篩選後的候選商品(~300筆
輸出Top N 威脅清單(結構化 JSON→ 交給 NemoTron dispatcher
架構位置:
SQL漏斗 → [本服務] → NemotronDispatcher → Telegram 告警
"""
import asyncio
import json
import logging
import os
import re
import time
import uuid
from dataclasses import dataclass
from typing import Optional
import requests
from sqlalchemy import text
from services.mcp_context_service import build_mcp_context
from services.ollama_service import OllamaService, get_host_label, get_provider_tag
from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1
from services.rag_service import rag_service, is_rag_enabled # Phase 11: RAG-first 快取
logger = logging.getLogger(__name__)
from config import HERMES_TIMEOUT
HERMES_MODEL = "hermes3:latest"
HERMES_KEEP_ALIVE = os.getenv("HERMES_KEEP_ALIVE", "5m")
HERMES_ALLOW_111_FALLBACK = os.getenv("HERMES_ALLOW_111_FALLBACK", "false").strip().lower() in (
"1", "true", "yes", "on",
)
TOP_N = 20 # 輸出前 N 個威脅,控制 NemoTron 每次消耗配額
def _parse_json_payload(value) -> dict:
if isinstance(value, dict):
return value
if isinstance(value, str) and value.strip():
try:
payload = json.loads(value)
return payload if isinstance(payload, dict) else {}
except Exception:
return {}
return {}
def _parse_tag_list(value) -> list[str]:
if isinstance(value, list):
return [str(item) for item in value if item]
if isinstance(value, str) and value.strip():
try:
payload = json.loads(value)
if isinstance(payload, list):
return [str(item) for item in payload if item]
except Exception:
return []
return []
def _tag_suffix(tags: list[str], prefix: str) -> str:
marker = f"{prefix}_"
for tag in tags:
if str(tag).startswith(marker):
return str(tag).removeprefix(marker)
return ""
@dataclass
class PriceThreat:
sku: str
name: str
category: str
momo_price: float
pchome_price: float
gap_pct: float # 正值代表我貴
sales_7d_delta_pct: float
risk: str # HIGH / MED / LOW
recommended_action: str
confidence: float
sales_7d_curr_amount: float = 0.0 # 過去 7 日營收金額NT$),供下游金額影響量化
sales_7d_prev_amount: float = 0.0 # 前 7 日營收金額NT$),供「可挽回營收」估算
match_type: str = "exact"
price_basis: str = "total_price"
alert_tier: str = "price_alert_exact"
match_score: float = 0.0
competitor_product_id: str = ""
competitor_product_name: str = ""
competitor_tags: tuple[str, ...] = ()
@dataclass
class AnalysisResult:
success: bool
threats: list
total_candidates: int
analysis_duration_sec: float
hermes_tokens: int = 0 # Ollama eval_count供 footprint 顯示)
error: Optional[str] = None
class HermesAnalystService:
"""
競價情報分析師
負責從 DB 撈候選商品、交給 Hermes 3 分析、回傳結構化威脅清單
"""
SYSTEM_PROMPT = """你是一位台灣電商競價情報分析師。
規則:
1. 輸出「只能」是有效的 JSON 陣列,禁止任何前言或解釋文字
2. recommended_action 必須使用「台灣標準正體中文(繁體)」。
【語言鐵律 — 2026-04-18 台北強化】
a. 嚴禁簡體字(例:不可用「参给当为来国发会说时间过从实现这话动问题」,
必須用「參給當為來國發會說時間過從實現這話動問題」)
b. 嚴禁異體字(例:不可用「亊」,必須用「事」)
c. 嚴禁短語重複(例:不可輸出「當前事當前事」這種坍塌)
d. 嚴禁無意義字元組合或亂碼
e. 若無法產出合理的繁體中文說明,直接輸出「建議人工評估」
3. 風險等級判定:
- HIGH價差 > 15% 且 7天銷量跌幅 > 20%
- MED價差 > 10% 或 7天銷量跌幅 > 15%
- LOW其他
4. confidence 根據數據確定性給分0.0~1.0
5. 【防幻覺鐵律】絕對禁止捏造輸入資料中未提供的數據(如折扣%、促銷活動、隱藏優惠)。
只能基於 gap_pct、sales_delta、competitor_tags 等已提供欄位做推論。
6. 【身份證據鐵律】match_type / price_basis / alert_tier 是系統比對結論,不得改寫。
- 只有 alert_tier="price_alert_exact" 且 match_type="exact" 且 price_basis="total_price" 可當成直接價格威脅。
- 其他情況只能建議「人工覆核身份/包裝/單位價」,不可建議直接降價。
7. 【非價格異常路由】若 gap_pct 絕對值 < 5% 但 sales_delta < -30%
- 判定為「非價格因素異常」(高機率:缺貨、下架、平台流量異常、頁面問題)
- risk 設為 MEDrecommended_action 必須寫「價差接近零但業績異常下滑,建議立即人工走查前台頁面(確認是否缺貨/下架/頁面異常)」
- confidence 設為 0.5(因缺乏確切原因)"""
def __init__(self, engine=None):
self.engine = engine # SQLAlchemy engine可外部注入
# ──────────────────────────────────────────────
# Phase 11: RAG-first Q&A 介面Operation Ollama-First v5.0
# ──────────────────────────────────────────────
def analyze(self, query: str, request_id: Optional[str] = None) -> dict:
"""RAG-first Q&A 入口feature flag RAG_ENABLED 預設 OFF
流程:
1. RAG_ENABLED=true 時先查 ai_insightscosine >= 0.85 直接回)
2. 命中 → 不呼 LLM{"source": "rag", "content": ..., "rag_log_id": ...}
3. 未命中或 flag OFF → fallback 到 _call_hermes_intent既有 LLM 路徑)
- LLM 結果同時 enqueue learning_episodes 給 PromotionGate 蒸餾
- 回 {"source": "llm", "content": ..., "intent": ...}
Args:
query: 使用者問題(繁中)
request_id: 與 ai_calls.request_id 串鏈None 自動產生
Returns:
dict 一律含 'source' / 'content' 兩個欄位caller 不必自己分支判斷。
"""
rid = request_id or f"hermes-{uuid.uuid4().hex[:8]}"
# ── Step 1: RAG-firstfeature flag 預設 OFF → 立即跳過)──
if is_rag_enabled():
rag = rag_service.query(
text=query,
caller='hermes_analyst',
threshold=0.85,
request_id=rid,
mark_saved_call=True,
)
if rag.has_high_confidence:
logger.info(
"[Hermes.analyze] RAG hit request_id=%s top_score=%.3f hits=%d",
rid, rag.hits[0].get('score', 0), len(rag.hits),
)
return {
'source': 'rag',
'content': rag.synthesize(),
'rag_log_id': rag.log_id,
'request_id': rid,
}
# ── Step 2: LLM fallback既有 _call_hermes_intent 路徑)──
intent_result = self._call_hermes_intent(query) or self._rule_based_intent(query)
content = intent_result.get('preliminary_answer') or ''
# ── Step 3: enqueue learning_episodes給 PromotionGate 蒸餾;失敗不影響主流程)──
if content:
try:
from services.learning_pipeline import learning_pipeline
learning_pipeline.enqueue(
episode_type='llm_response',
raw_content=content,
source_table=None, # Hermes 此處未取 ai_calls.id保留 None 讓 source check 通過
source_id=None,
)
except Exception as exc:
logger.debug("[Hermes.analyze] learning_pipeline enqueue 跳過: %s", exc)
return {
'source': 'llm',
'content': content,
'intent': intent_result.get('intent'),
'metadata': intent_result.get('metadata', {}),
'request_id': rid,
}
# ──────────────────────────────────────────────
# L1 意圖分析介面(給 EventRouter / Telegram NLP 使用)
# ──────────────────────────────────────────────
async def handle_l1(self, event: dict, ctx: dict) -> dict:
"""L1 語意理解:輸入 event含 message輸出 intent / complexity_score / requires_data_fetch。
Contract:
event: {"message": str, "user_id": int, "chat_id": int, ...}
ctx: 既有 agent_context可為空 dict
Returns:
{
"intent": str, # greeting / help / query_sales / analyze_competitor / report / unknown
"confidence": float,
"complexity_score": float, # 0.0~1.0
"requires_data_fetch": bool,
"preliminary_answer": str,
"metadata": dict,
}
實作策略:
1) 先用 Hermes3 LLM 做意圖分類timeout 短、temp 低)
2) Ollama 掛掉時降級為關鍵字規則引擎,保證永遠回傳結構化結果
3) 任何例外都被 graceful 吞掉,不會讓 handle_message 整個炸
"""
message = (event or {}).get("message", "") or ""
llm_result = None
try:
llm_result = await asyncio.get_running_loop().run_in_executor(
None, self._call_hermes_intent, message
)
except Exception as e:
logger.warning(
f"[Hermes.handle_l1] run_in_executor 例外,降級規則引擎"
f"{type(e).__name__}: {e}",
exc_info=True,
)
if llm_result:
return llm_result
return self._rule_based_intent(message)
def _call_hermes_intent(self, message: str) -> Optional[dict]:
"""呼叫 Hermes3 做輕量意圖分類(非批量分析)。失敗回 None 由上層降級。"""
if not message.strip():
return self._rule_based_intent(message)
system = (
"你是一位繁體中文意圖分類器。讀取使用者訊息,輸出單行 JSON"
"嚴禁任何解釋文字與 markdown 標記。欄位:\n"
'{"intent": "greeting|help|query_sales|analyze_competitor|'
'report|product_info|unknown", "confidence": 0.0~1.0, '
'"complexity_score": 0.0~1.0, "requires_data_fetch": true|false, '
'"preliminary_answer": "若為 greeting/help 請回一句繁中問候,否則空字串"}\n'
"規則greeting/help 類 complexity_score<=0.3;涉及數據、報告、日期、"
"品牌、競品對比者 complexity_score>=0.7 且 requires_data_fetch=true。"
)
prompt = f"使用者訊息:{message}\n輸出 JSON"
# Phase 1 v5.0: 包 ai_call_logger 追蹤 Hermes 意圖分類 token / fallback
with log_ai_call(
caller='hermes_intent',
provider='gcp_ollama',
model=HERMES_MODEL,
meta={'route': 'ollama_first'},
) as _ctx:
try:
ollama = OllamaService(model=HERMES_MODEL)
resp = ollama.generate(
prompt=prompt,
model=HERMES_MODEL,
system_prompt=system,
temperature=0.1,
timeout=HERMES_TIMEOUT,
keep_alive=HERMES_KEEP_ALIVE, # ADR-012避免冷啟動 timeout
allow_111_fallback=HERMES_ALLOW_111_FALLBACK,
)
_ctx.set_provider(get_provider_tag(resp.host or ''))
_ctx.set_model(resp.model or HERMES_MODEL)
_ctx.set_tokens(
input=resp.input_tokens,
output=resp.output_tokens,
)
_ctx.add_meta('host', resp.host)
_ctx.add_meta('host_label', get_host_label(resp.host or ''))
if resp.model and resp.model != HERMES_MODEL:
_ctx.add_meta('requested_model', HERMES_MODEL)
if not resp.success:
raise RuntimeError(resp.error or "ollama generate failed")
raw = (resp.content or "").strip()
if raw.startswith("```"):
raw = re.sub(r"^```(?:json)?\s*", "", raw, flags=re.MULTILINE)
raw = re.sub(r"\s*```\s*$", "", raw.strip(), flags=re.MULTILINE).strip()
data = json.loads(raw)
return {
"intent": data.get("intent", "unknown"),
"confidence": float(data.get("confidence", 0.5)),
"complexity_score": float(data.get("complexity_score", 0.5)),
"requires_data_fetch": bool(data.get("requires_data_fetch", False)),
"preliminary_answer": data.get("preliminary_answer", "") or "",
"metadata": {"source": "hermes_llm"},
}
except Exception as e:
# NOTE: 修補 commit 00591c5 殘留的孤立 f-string原 logger.warning 被誤刪)
logger.warning(
f"[Hermes.intent] Ollama 連線失敗,降級規則引擎"
f"model={HERMES_MODEL} error={type(e).__name__}: {e}"
)
_ctx.set_error(f"{type(e).__name__}: {e}")
_ctx.fallback_to_caller('hermes_rule_engine')
return None
def _rule_based_intent(self, message: str) -> dict:
"""Ollama 掛掉時的規則引擎 fallback — 永遠返回結構化結果。"""
msg = (message or "").lower().strip()
if not msg:
return {
"intent": "unknown", "confidence": 0.3, "complexity_score": 0.1,
"requires_data_fetch": False, "preliminary_answer": "",
"metadata": {"source": "rule_empty"},
}
greetings = ("hi", "hello", "哈囉", "你好", "", "早安", "午安", "晚安")
helps = ("help", "幫助", "說明", "怎麼用", "功能")
data_keywords = (
"業績", "銷量", "銷售", "營收", "排名", "威脅", "競品", "報表",
"週報", "月報", "日報", "2024", "2025", "2026",
"ppt", "report", "compare", "vs", "分析", "趨勢",
"品牌", "廠商", "商品", "加碼", "投資", "策略", "建議",
"市場", "機會", "成長", "預測", "比較", "推薦", "最佳",
)
if any(g in msg for g in greetings):
return {
"intent": "greeting", "confidence": 0.9, "complexity_score": 0.1,
"requires_data_fetch": False,
"preliminary_answer": "您好!我是 MOMO Pro 智能助理,請問需要什麼協助?",
"metadata": {"source": "rule"},
}
if any(h in msg for h in helps):
return {
"intent": "help", "confidence": 0.85, "complexity_score": 0.2,
"requires_data_fetch": False,
"preliminary_answer": "我可以協助您查詢業績、商品、競品情報與產出報告。",
"metadata": {"source": "rule"},
}
if any(k in msg for k in data_keywords):
return {
"intent": "query_sales", "confidence": 0.7, "complexity_score": 0.8,
"requires_data_fetch": True, "preliminary_answer": "",
"metadata": {"source": "rule"},
}
return {
"intent": "unknown", "confidence": 0.4, "complexity_score": 0.5,
"requires_data_fetch": False, "preliminary_answer": "",
"metadata": {"source": "rule"},
}
# ──────────────────────────────────────────────
# Step 1SQL 漏斗 — 從 226萬筆壓縮到 ~300 筆候選
# ──────────────────────────────────────────────
def _validate_snapshot_columns(self) -> tuple:
"""
校驗 daily_sales_snapshot 欄位,回傳 (商品名稱欄位, 銷售金額欄位)
如果找不到,則 raise ValueError 避免 SQL 靜默失敗
"""
with self.engine.connect() as conn:
res = conn.execute(text("SELECT * FROM daily_sales_snapshot LIMIT 0"))
cols = list(res.keys())
def find_col(keywords):
for kw in keywords:
for col in cols:
if kw in col:
return col
return None
name_col = find_col(['商品名稱', '品名', 'Name', 'Product'])
sales_col = find_col(['銷售金額', '業績', '金額', 'Amount', 'Sales', 'Total', '總業績'])
if not name_col or not sales_col:
raise ValueError(f"daily_sales_snapshot 缺少必要欄位!目前欄位: {cols}")
return name_col, sales_col
def fetch_candidates(self) -> list:
"""
競價威脅候選漏斗v2 — 直接 LEFT JOIN competitor_prices
條件近7天銷量下滑 > 10% 且 MOMO 自家有歷史價格紀錄
擴充LEFT JOIN competitor_pricesPChome 快取TTL 6h
帶回 pchome_price + competitor_tags 供 Hermes 情境分析
無競品資料的商品仍回傳pchome_price=NULL → _batch_analyze 跳過
"""
if self.engine is None:
raise RuntimeError("需要注入 SQLAlchemy engine")
name_col, sales_col = self._validate_snapshot_columns()
# 注意products.i_codeMOMO商品頁URL碼純數字
# daily_sales_snapshot.商品ID訂單目錄碼英數格式是不同 ID 系統,
# 無法直接 JOIN。改以商品名稱商品名稱 = p.name做橋接。
sql_str = f"""
WITH latest_momo_price AS (
SELECT
p.i_code AS sku,
p.name,
p.category,
pr.price AS momo_price,
ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC) AS rn
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.status = 'ACTIVE'
),
recent_sales AS (
SELECT
"{name_col}" AS product_name,
SUM(CASE WHEN snapshot_date::date >= CURRENT_DATE - 7
THEN COALESCE("{sales_col}"::numeric, 0) ELSE 0 END) AS sales_7d_curr,
SUM(CASE WHEN snapshot_date::date >= CURRENT_DATE - 14
AND snapshot_date::date < CURRENT_DATE - 7
THEN COALESCE("{sales_col}"::numeric, 0) ELSE 0 END) AS sales_7d_prev
FROM daily_sales_snapshot
GROUP BY "{name_col}"
)
SELECT
lmp.sku,
lmp.name,
lmp.category,
lmp.momo_price,
rs.sales_7d_curr,
rs.sales_7d_prev,
cp.price AS pchome_price,
cp.competitor_product_id,
cp.competitor_product_name,
cp.tags AS competitor_tags,
cp.match_score AS competitor_match_score,
cp.match_diagnostic_json,
cp.comparison_mode
FROM latest_momo_price lmp
JOIN recent_sales rs ON rs.product_name = lmp.name
LEFT JOIN competitor_prices cp
ON cp.sku = lmp.sku
AND cp.source = 'pchome'
AND cp.expires_at > NOW()
AND cp.match_score >= 0.76
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
AND (
cp.match_diagnostic_json @> '{{"match_type":"exact","price_basis":"total_price","alert_tier":"price_alert_exact"}}'::jsonb
OR (
cp.match_diagnostic_json IS NULL
AND COALESCE(cp.tags, '[]'::jsonb) ? 'match_type_exact'
AND COALESCE(cp.tags, '[]'::jsonb) ? 'price_basis_total_price'
AND COALESCE(cp.tags, '[]'::jsonb) ? 'alert_tier_price_alert_exact'
)
)
WHERE lmp.rn = 1
AND rs.sales_7d_prev > 0
AND (rs.sales_7d_curr - rs.sales_7d_prev) / rs.sales_7d_prev < -0.10
ORDER BY (rs.sales_7d_curr - rs.sales_7d_prev) / rs.sales_7d_prev ASC
LIMIT 300
"""
sql = text(sql_str)
with self.engine.connect() as conn:
rows = conn.execute(sql).fetchall()
return [dict(row._mapping) for row in rows]
# ──────────────────────────────────────────────
# Step 2批量注入 Hermes 3 分析
# ──────────────────────────────────────────────
def _batch_analyze(self, candidates: list, pchome_prices: dict = None) -> tuple:
"""
將候選商品 + PChome 比價資料打包交 Hermes 分析
Args:
candidates: SQL 漏斗結果 (list of dict)
v2 起 candidates 已內含 pchome_price + competitor_tags
pchome_prices: 舊式外部注入 {sku: price}(向下相容,優先度低於 candidates 內建值)
Returns:
tuple(raw_threats, items)
- raw_threats: Hermes LLM 輸出的 JSON 陣列(含 risk/action/confidence 等分類結果)
- items: Python 計算的客觀數據 source of truthsku/momo/pchome/gap_pct/sales_delta
[2026-04-18 台北] Bug-1 根治 Layer A同時回傳 items 作為客觀數據的真理來源,
防止 run() 從 LLM 輸出讀 momo_price/pchome_price 時因 LLM 漏吐 → default=0 → $0 幻覺
— Claude Opus 4.7
"""
external = pchome_prices or {}
items = []
for c in candidates:
# 優先使用 DB 快取的競品價格fallback 到外部注入
pchome_price = c.get("pchome_price") or external.get(c["sku"])
if not pchome_price:
continue
sales_prev = float(c.get("sales_7d_prev") or 0)
sales_curr = float(c.get("sales_7d_curr") or 0)
momo_price = float(c["momo_price"])
pchome_price = float(pchome_price)
delta_pct = round((sales_curr - sales_prev) / sales_prev * 100, 1) if sales_prev else 0
gap_pct = round((momo_price - pchome_price) / pchome_price * 100, 1)
# 競品語意標籤與 matcher 診斷JSONB 從 DB 來,可能是 dict/list 或 JSON 字串)
raw_tags = _parse_tag_list(c.get("competitor_tags"))
diagnostic_payload = _parse_json_payload(c.get("match_diagnostic_json"))
match_type = diagnostic_payload.get("match_type") or _tag_suffix(raw_tags, "match_type") or "exact"
price_basis = diagnostic_payload.get("price_basis") or _tag_suffix(raw_tags, "price_basis") or "total_price"
alert_tier = diagnostic_payload.get("alert_tier") or _tag_suffix(raw_tags, "alert_tier") or "price_alert_exact"
evidence_flags = diagnostic_payload.get("evidence_flags") or []
item = {
"sku": c["sku"],
"name": c["name"][:30], # 截斷避免 token 爆炸
"category": c.get("category", ""),
"momo": c["momo_price"],
"pchome": pchome_price,
"gap_pct": gap_pct, # Python 預算好Hermes 只做分類
"sales_delta": delta_pct,
"match_type": match_type,
"price_basis": price_basis,
"alert_tier": alert_tier,
"match_score": round(float(c.get("competitor_match_score") or 0), 3),
"pchome_id": c.get("competitor_product_id") or "",
"pchome_name": (c.get("competitor_product_name") or "")[:48],
# 絕對營收金額(不傳給 Hermes 推理,只在 Python 端保留供下游金額影響量化)
"_sales_curr": sales_curr,
"_sales_prev": sales_prev,
}
if raw_tags:
item["competitor_tags"] = raw_tags # 語意情境給 Hermes 加分
if evidence_flags:
item["match_evidence"] = evidence_flags
items.append(item)
if not items:
return [], []
mcp_ctx = build_mcp_context(topics=["market_trends", "holiday_calendar", "seasonal_insights"])
# 餵給 LLM 的版本:剝除底線開頭的 Python-only 欄位(避免 token 爆炸 + 防 LLM 嘗試引用)
items_for_llm = [
{k: v for k, v in item.items() if not k.startswith("_")}
for item in items
]
prompt = (
f"【市場外部情報 (MCP)】\n{mcp_ctx}\n\n"
f"分析以下 {len(items_for_llm)} 支商品的競價威脅,回傳前 {TOP_N} 個最高風險商品。\n\n"
f"注意match_type / price_basis / alert_tier 是比對系統的硬證據;"
f"只有 alert_tier=price_alert_exact 的 exact 同款可判定直接價格威脅,"
f"其他一律只能建議人工覆核身份、包裝或單位價。\n\n"
f"資料:{json.dumps(items_for_llm, ensure_ascii=False)}\n\n"
f"輸出格式JSON 陣列,每筆含):\n"
f'[{{"sku": string, "name": string, "category": string, '
f'"momo_price": number, "pchome_price": number, '
f'"gap_pct": number, "sales_7d_delta_pct": number, '
f'"risk": "HIGH|MED|LOW", "recommended_action": string, "confidence": number}}]'
)
# Phase 1 v5.0: 包 ai_call_logger 追蹤 Hermes 競價分析 token / fallback
with log_ai_call(
caller='hermes_analyst',
provider='gcp_ollama',
model=HERMES_MODEL,
meta={
'route': 'ollama_first',
'item_count': len(items),
'top_n': TOP_N,
},
) as _ctx:
try:
ollama = OllamaService(model=HERMES_MODEL)
resp = ollama.generate(
prompt=prompt,
model=HERMES_MODEL,
system_prompt=self.SYSTEM_PROMPT,
temperature=0.1,
timeout=HERMES_TIMEOUT,
keep_alive=HERMES_KEEP_ALIVE,
allow_111_fallback=HERMES_ALLOW_111_FALLBACK,
)
_ctx.set_provider(get_provider_tag(resp.host or ''))
_ctx.set_model(resp.model or HERMES_MODEL)
_ctx.set_tokens(input=resp.input_tokens, output=resp.output_tokens)
_ctx.add_meta('host', resp.host)
_ctx.add_meta('host_label', get_host_label(resp.host or ''))
if resp.model and resp.model != HERMES_MODEL:
_ctx.add_meta('requested_model', HERMES_MODEL)
if not resp.success:
raise RuntimeError(resp.error or "ollama generate failed")
except Exception as e:
_ctx.set_error(f"{type(e).__name__}: {e}")
raise
raw = (resp.content or "").strip()
duration_sec = round(resp.total_duration or 0, 1)
eval_tokens_raw = resp.output_tokens
logger.info(
f"[Hermes] 推理耗時 {duration_sec}s"
f"輸入 {len(items)}tokens={eval_tokens_raw},回應長度 {len(raw)}"
)
# 儲存統計供 footprint 使用(掛在 instance 上供 run() 讀取)
self._last_stats = {
"duration_sec": duration_sec,
"tokens": eval_tokens_raw,
"host": resp.host,
"host_label": get_host_label(resp.host or '')
}
# P0-1 修復:剝除 Hermes 可能輸出的 markdown code fence
if raw.startswith("```"):
raw = re.sub(r"^```(?:json)?\s*", "", raw, flags=re.MULTILINE)
raw = re.sub(r"\s*```\s*$", "", raw.strip(), flags=re.MULTILINE)
raw = raw.strip()
logger.debug("[Hermes] 已剝除 markdown code fence")
return json.loads(raw), items
# ──────────────────────────────────────────────
# 公開介面
# ──────────────────────────────────────────────
def run(self, pchome_prices: dict = None) -> AnalysisResult:
"""
執行完整競價情報分析流程
Args:
pchome_prices: 舊式外部注入 {sku: price}(可選,向下相容)
v2 起競品價格由 competitor_prices DB 表提供fetch_candidates LEFT JOIN
若 DB 快取缺漏,此參數可作 fallback 補充
Returns:
AnalysisResult
"""
start = time.time()
try:
candidates = self.fetch_candidates()
logger.info(f"[漏斗] 候選商品 {len(candidates)}")
if not candidates:
return AnalysisResult(
success=True, threats=[], total_candidates=0,
analysis_duration_sec=time.time() - start,
)
raw_threats, ground_items = self._batch_analyze(candidates, pchome_prices or {})
# [2026-04-18 台北] Bug-1 根治 Layer A
# 客觀數據momo/pchome/gap_pct/sales_delta從 Python items 讀,不從 LLM 輸出讀
# LLM 只保留分類結果risk / recommended_action / confidence
# 避免 Hermes 漏吐欄位 → default=0 → Telegram $0 幻覺 — Claude Opus 4.7
items_by_sku = {i["sku"]: i for i in ground_items}
threats = []
for t in raw_threats:
sku = t.get("sku")
ground = items_by_sku.get(sku)
if not ground:
logger.warning(
f"[Hermes] LLM 回吐未知 SKU={sku},不在 items 清單,防幻覺跳過"
)
continue
threats.append(PriceThreat(
sku=sku,
name=ground["name"], # Python truth
category=ground.get("category", ""), # Python truth
momo_price=float(ground["momo"]), # Python truth根治 $0
pchome_price=float(ground["pchome"]), # Python truth根治 $0
gap_pct=float(ground["gap_pct"]), # Python truth
sales_7d_delta_pct=float(ground["sales_delta"]), # Python truth
risk=t.get("risk", "LOW"), # LLM 分類
recommended_action=t.get("recommended_action", ""), # LLM 洞察
confidence=float(t.get("confidence", 0.5)), # LLM 信心度
# 絕對營收金額:純 Python truth供下游金額影響量化B' 軌)
sales_7d_curr_amount=float(ground.get("_sales_curr", 0) or 0),
sales_7d_prev_amount=float(ground.get("_sales_prev", 0) or 0),
match_type=str(ground.get("match_type") or "exact"),
price_basis=str(ground.get("price_basis") or "total_price"),
alert_tier=str(ground.get("alert_tier") or "price_alert_exact"),
match_score=float(ground.get("match_score") or 0),
competitor_product_id=str(ground.get("pchome_id") or ""),
competitor_product_name=str(ground.get("pchome_name") or ""),
competitor_tags=tuple(ground.get("competitor_tags") or ()),
))
hermes_stats = getattr(self, "_last_stats", {})
return AnalysisResult(
success=True,
threats=threats,
total_candidates=len(candidates),
analysis_duration_sec=round(time.time() - start, 2),
hermes_tokens=hermes_stats.get("tokens", 0),
)
except requests.Timeout:
return AnalysisResult(
success=False, threats=[], total_candidates=0,
analysis_duration_sec=time.time() - start,
error="Hermes 推理超時(>120s候選數量可能過多",
)
except json.JSONDecodeError as e:
return AnalysisResult(
success=False, threats=[], total_candidates=0,
analysis_duration_sec=time.time() - start,
error=f"Hermes JSON 解析失敗:{e}",
)
except Exception as e:
logger.exception("[HermesAnalyst] 分析失敗")
return AnalysisResult(
success=False, threats=[], total_candidates=0,
analysis_duration_sec=time.time() - start,
error=str(e),
)
# ─────────────────────────────────────────────
# CLI 快速測試(不依賴 DB
# python3 services/hermes_analyst_service.py
# ─────────────────────────────────────────────
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
service = HermesAnalystService()
fake_candidates = [
{"sku": "A001", "name": "玻尿酸面膜10片裝", "category": "美妝保養",
"momo_price": 320, "sales_7d_curr": 58000, "sales_7d_prev": 100000},
{"sku": "A003", "name": "舒特膚AD乳液200ml", "category": "美妝保養",
"momo_price": 1200, "sales_7d_curr": 65000, "sales_7d_prev": 100000},
{"sku": "A005", "name": "玻尿酸精華液30ml", "category": "美妝保養",
"momo_price": 890, "sales_7d_curr": 72000, "sales_7d_prev": 100000},
{"sku": "A007", "name": "眼霜15ml", "category": "美妝保養",
"momo_price": 680, "sales_7d_curr": 82000, "sales_7d_prev": 100000},
{"sku": "A009", "name": "美白化妝水150ml", "category": "美妝保養",
"momo_price": 420, "sales_7d_curr": 78000, "sales_7d_prev": 100000},
]
fake_pchome = {"A001": 280, "A003": 980, "A005": 760, "A007": 590, "A009": 350}
print("=== AI分析師 競價分析 CLI 測試 ===\n")
raw = service._batch_analyze(fake_candidates, fake_pchome)
for t in raw:
icon = {"HIGH": "🔴", "MED": "🟡", "LOW": "🟢"}.get(t.get("risk", ""), "")
print(f"{icon} [{t['sku']}] {t['name']}")
print(f" MOMO ${t['momo_price']} vs PChome ${t['pchome_price']} → 價差 {t['gap_pct']:.1f}%")
print(f" 銷量 {t['sales_7d_delta_pct']}% | 建議:{t['recommended_action']}\n")