#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Hermes 3 競價情報分析服務 (Module 2) 角色:分析師 (Analyst) 模型:hermes3:latest @ OllamaService 三主機級聯(GCP-A → GCP-B → 111) 輸入:SQL 漏斗篩選後的候選商品(~300筆) 輸出:Top N 威脅清單(結構化 JSON)→ 交給 NemoTron dispatcher 架構位置: SQL漏斗 → [本服務] → NemotronDispatcher → Telegram 告警 """ import asyncio import json import logging import os import re import time import uuid from dataclasses import dataclass from typing import Optional import requests from sqlalchemy import text from services.mcp_context_service import build_mcp_context from services.ollama_service import OllamaService, get_host_label, get_provider_tag from services.ai_call_logger import log_ai_call # Operation Ollama-First v5.0 P1 from services.rag_service import rag_service, is_rag_enabled # Phase 11: RAG-first 快取 logger = logging.getLogger(__name__) from config import HERMES_TIMEOUT HERMES_MODEL = "hermes3:latest" HERMES_KEEP_ALIVE = os.getenv("HERMES_KEEP_ALIVE", "5m") HERMES_ALLOW_111_FALLBACK = os.getenv("HERMES_ALLOW_111_FALLBACK", "false").strip().lower() in ( "1", "true", "yes", "on", ) TOP_N = 20 # 輸出前 N 個威脅,控制 NemoTron 每次消耗配額 def _parse_json_payload(value) -> dict: if isinstance(value, dict): return value if isinstance(value, str) and value.strip(): try: payload = json.loads(value) return payload if isinstance(payload, dict) else {} except Exception: return {} return {} def _parse_tag_list(value) -> list[str]: if isinstance(value, list): return [str(item) for item in value if item] if isinstance(value, str) and value.strip(): try: payload = json.loads(value) if isinstance(payload, list): return [str(item) for item in payload if item] except Exception: return [] return [] def _tag_suffix(tags: list[str], prefix: str) -> str: marker = f"{prefix}_" for tag in tags: if str(tag).startswith(marker): return str(tag).removeprefix(marker) return "" @dataclass class PriceThreat: sku: str name: str category: str momo_price: float pchome_price: float gap_pct: float # 正值代表我貴 sales_7d_delta_pct: float risk: str # HIGH / MED / LOW recommended_action: str confidence: float sales_7d_curr_amount: float = 0.0 # 過去 7 日營收金額(NT$),供下游金額影響量化 sales_7d_prev_amount: float = 0.0 # 前 7 日營收金額(NT$),供「可挽回營收」估算 match_type: str = "exact" price_basis: str = "total_price" alert_tier: str = "price_alert_exact" match_score: float = 0.0 competitor_product_id: str = "" competitor_product_name: str = "" competitor_tags: tuple[str, ...] = () @dataclass class AnalysisResult: success: bool threats: list total_candidates: int analysis_duration_sec: float hermes_tokens: int = 0 # Ollama eval_count(供 footprint 顯示) error: Optional[str] = None class HermesAnalystService: """ 競價情報分析師 負責從 DB 撈候選商品、交給 Hermes 3 分析、回傳結構化威脅清單 """ SYSTEM_PROMPT = """你是一位台灣電商競價情報分析師。 規則: 1. 輸出「只能」是有效的 JSON 陣列,禁止任何前言或解釋文字 2. recommended_action 必須使用「台灣標準正體中文(繁體)」。 【語言鐵律 — 2026-04-18 台北強化】 a. 嚴禁簡體字(例:不可用「参给当为来国发会说时间过从实现这话动问题」, 必須用「參給當為來國發會說時間過從實現這話動問題」) b. 嚴禁異體字(例:不可用「亊」,必須用「事」) c. 嚴禁短語重複(例:不可輸出「當前事當前事」這種坍塌) d. 嚴禁無意義字元組合或亂碼 e. 若無法產出合理的繁體中文說明,直接輸出「建議人工評估」 3. 風險等級判定: - HIGH:價差 > 15% 且 7天銷量跌幅 > 20% - MED:價差 > 10% 或 7天銷量跌幅 > 15% - LOW:其他 4. confidence 根據數據確定性給分(0.0~1.0) 5. 【防幻覺鐵律】絕對禁止捏造輸入資料中未提供的數據(如折扣%、促銷活動、隱藏優惠)。 只能基於 gap_pct、sales_delta、competitor_tags 等已提供欄位做推論。 6. 【身份證據鐵律】match_type / price_basis / alert_tier 是系統比對結論,不得改寫。 - 只有 alert_tier="price_alert_exact" 且 match_type="exact" 且 price_basis="total_price" 可當成直接價格威脅。 - 其他情況只能建議「人工覆核身份/包裝/單位價」,不可建議直接降價。 7. 【非價格異常路由】若 gap_pct 絕對值 < 5% 但 sales_delta < -30%: - 判定為「非價格因素異常」(高機率:缺貨、下架、平台流量異常、頁面問題) - risk 設為 MED,recommended_action 必須寫「價差接近零但業績異常下滑,建議立即人工走查前台頁面(確認是否缺貨/下架/頁面異常)」 - confidence 設為 0.5(因缺乏確切原因)""" def __init__(self, engine=None): self.engine = engine # SQLAlchemy engine,可外部注入 # ────────────────────────────────────────────── # Phase 11: RAG-first Q&A 介面(Operation Ollama-First v5.0) # ────────────────────────────────────────────── def analyze(self, query: str, request_id: Optional[str] = None) -> dict: """RAG-first Q&A 入口(feature flag RAG_ENABLED 預設 OFF)。 流程: 1. RAG_ENABLED=true 時先查 ai_insights(cosine >= 0.85 直接回) 2. 命中 → 不呼 LLM,回 {"source": "rag", "content": ..., "rag_log_id": ...} 3. 未命中或 flag OFF → fallback 到 _call_hermes_intent(既有 LLM 路徑) - LLM 結果同時 enqueue learning_episodes 給 PromotionGate 蒸餾 - 回 {"source": "llm", "content": ..., "intent": ...} Args: query: 使用者問題(繁中) request_id: 與 ai_calls.request_id 串鏈;None 自動產生 Returns: dict 一律含 'source' / 'content' 兩個欄位,caller 不必自己分支判斷。 """ rid = request_id or f"hermes-{uuid.uuid4().hex[:8]}" # ── Step 1: RAG-first(feature flag 預設 OFF → 立即跳過)── if is_rag_enabled(): rag = rag_service.query( text=query, caller='hermes_analyst', threshold=0.85, request_id=rid, mark_saved_call=True, ) if rag.has_high_confidence: logger.info( "[Hermes.analyze] RAG hit request_id=%s top_score=%.3f hits=%d", rid, rag.hits[0].get('score', 0), len(rag.hits), ) return { 'source': 'rag', 'content': rag.synthesize(), 'rag_log_id': rag.log_id, 'request_id': rid, } # ── Step 2: LLM fallback(既有 _call_hermes_intent 路徑)── intent_result = self._call_hermes_intent(query) or self._rule_based_intent(query) content = intent_result.get('preliminary_answer') or '' # ── Step 3: enqueue learning_episodes(給 PromotionGate 蒸餾;失敗不影響主流程)── if content: try: from services.learning_pipeline import learning_pipeline learning_pipeline.enqueue( episode_type='llm_response', raw_content=content, source_table=None, # Hermes 此處未取 ai_calls.id;保留 None 讓 source check 通過 source_id=None, ) except Exception as exc: logger.debug("[Hermes.analyze] learning_pipeline enqueue 跳過: %s", exc) return { 'source': 'llm', 'content': content, 'intent': intent_result.get('intent'), 'metadata': intent_result.get('metadata', {}), 'request_id': rid, } # ────────────────────────────────────────────── # L1 意圖分析介面(給 EventRouter / Telegram NLP 使用) # ────────────────────────────────────────────── async def handle_l1(self, event: dict, ctx: dict) -> dict: """L1 語意理解:輸入 event(含 message),輸出 intent / complexity_score / requires_data_fetch。 Contract: event: {"message": str, "user_id": int, "chat_id": int, ...} ctx: 既有 agent_context(可為空 dict) Returns: { "intent": str, # greeting / help / query_sales / analyze_competitor / report / unknown "confidence": float, "complexity_score": float, # 0.0~1.0 "requires_data_fetch": bool, "preliminary_answer": str, "metadata": dict, } 實作策略: 1) 先用 Hermes3 LLM 做意圖分類(timeout 短、temp 低) 2) Ollama 掛掉時降級為關鍵字規則引擎,保證永遠回傳結構化結果 3) 任何例外都被 graceful 吞掉,不會讓 handle_message 整個炸 """ message = (event or {}).get("message", "") or "" llm_result = None try: llm_result = await asyncio.get_running_loop().run_in_executor( None, self._call_hermes_intent, message ) except Exception as e: logger.warning( f"[Hermes.handle_l1] run_in_executor 例外,降級規則引擎" f"({type(e).__name__}: {e})", exc_info=True, ) if llm_result: return llm_result return self._rule_based_intent(message) def _call_hermes_intent(self, message: str) -> Optional[dict]: """呼叫 Hermes3 做輕量意圖分類(非批量分析)。失敗回 None 由上層降級。""" if not message.strip(): return self._rule_based_intent(message) system = ( "你是一位繁體中文意圖分類器。讀取使用者訊息,輸出單行 JSON," "嚴禁任何解釋文字與 markdown 標記。欄位:\n" '{"intent": "greeting|help|query_sales|analyze_competitor|' 'report|product_info|unknown", "confidence": 0.0~1.0, ' '"complexity_score": 0.0~1.0, "requires_data_fetch": true|false, ' '"preliminary_answer": "若為 greeting/help 請回一句繁中問候,否則空字串"}\n' "規則:greeting/help 類 complexity_score<=0.3;涉及數據、報告、日期、" "品牌、競品對比者 complexity_score>=0.7 且 requires_data_fetch=true。" ) prompt = f"使用者訊息:{message}\n輸出 JSON:" # Phase 1 v5.0: 包 ai_call_logger 追蹤 Hermes 意圖分類 token / fallback with log_ai_call( caller='hermes_intent', provider='gcp_ollama', model=HERMES_MODEL, meta={'route': 'ollama_first'}, ) as _ctx: try: ollama = OllamaService(model=HERMES_MODEL) resp = ollama.generate( prompt=prompt, model=HERMES_MODEL, system_prompt=system, temperature=0.1, timeout=HERMES_TIMEOUT, keep_alive=HERMES_KEEP_ALIVE, # ADR-012:避免冷啟動 timeout allow_111_fallback=HERMES_ALLOW_111_FALLBACK, ) _ctx.set_provider(get_provider_tag(resp.host or '')) _ctx.set_model(resp.model or HERMES_MODEL) _ctx.set_tokens( input=resp.input_tokens, output=resp.output_tokens, ) _ctx.add_meta('host', resp.host) _ctx.add_meta('host_label', get_host_label(resp.host or '')) if resp.model and resp.model != HERMES_MODEL: _ctx.add_meta('requested_model', HERMES_MODEL) if not resp.success: raise RuntimeError(resp.error or "ollama generate failed") raw = (resp.content or "").strip() if raw.startswith("```"): raw = re.sub(r"^```(?:json)?\s*", "", raw, flags=re.MULTILINE) raw = re.sub(r"\s*```\s*$", "", raw.strip(), flags=re.MULTILINE).strip() data = json.loads(raw) return { "intent": data.get("intent", "unknown"), "confidence": float(data.get("confidence", 0.5)), "complexity_score": float(data.get("complexity_score", 0.5)), "requires_data_fetch": bool(data.get("requires_data_fetch", False)), "preliminary_answer": data.get("preliminary_answer", "") or "", "metadata": {"source": "hermes_llm"}, } except Exception as e: # NOTE: 修補 commit 00591c5 殘留的孤立 f-string(原 logger.warning 被誤刪) logger.warning( f"[Hermes.intent] Ollama 連線失敗,降級規則引擎" f"(model={HERMES_MODEL} error={type(e).__name__}: {e})" ) _ctx.set_error(f"{type(e).__name__}: {e}") _ctx.fallback_to_caller('hermes_rule_engine') return None def _rule_based_intent(self, message: str) -> dict: """Ollama 掛掉時的規則引擎 fallback — 永遠返回結構化結果。""" msg = (message or "").lower().strip() if not msg: return { "intent": "unknown", "confidence": 0.3, "complexity_score": 0.1, "requires_data_fetch": False, "preliminary_answer": "", "metadata": {"source": "rule_empty"}, } greetings = ("hi", "hello", "哈囉", "你好", "嗨", "早安", "午安", "晚安") helps = ("help", "幫助", "說明", "怎麼用", "功能") data_keywords = ( "業績", "銷量", "銷售", "營收", "排名", "威脅", "競品", "報表", "週報", "月報", "日報", "2024", "2025", "2026", "ppt", "report", "compare", "vs", "分析", "趨勢", "品牌", "廠商", "商品", "加碼", "投資", "策略", "建議", "市場", "機會", "成長", "預測", "比較", "推薦", "最佳", ) if any(g in msg for g in greetings): return { "intent": "greeting", "confidence": 0.9, "complexity_score": 0.1, "requires_data_fetch": False, "preliminary_answer": "您好!我是 MOMO Pro 智能助理,請問需要什麼協助?", "metadata": {"source": "rule"}, } if any(h in msg for h in helps): return { "intent": "help", "confidence": 0.85, "complexity_score": 0.2, "requires_data_fetch": False, "preliminary_answer": "我可以協助您查詢業績、商品、競品情報與產出報告。", "metadata": {"source": "rule"}, } if any(k in msg for k in data_keywords): return { "intent": "query_sales", "confidence": 0.7, "complexity_score": 0.8, "requires_data_fetch": True, "preliminary_answer": "", "metadata": {"source": "rule"}, } return { "intent": "unknown", "confidence": 0.4, "complexity_score": 0.5, "requires_data_fetch": False, "preliminary_answer": "", "metadata": {"source": "rule"}, } # ────────────────────────────────────────────── # Step 1:SQL 漏斗 — 從 226萬筆壓縮到 ~300 筆候選 # ────────────────────────────────────────────── def _validate_snapshot_columns(self) -> tuple: """ 校驗 daily_sales_snapshot 欄位,回傳 (商品名稱欄位, 銷售金額欄位) 如果找不到,則 raise ValueError 避免 SQL 靜默失敗 """ with self.engine.connect() as conn: res = conn.execute(text("SELECT * FROM daily_sales_snapshot LIMIT 0")) cols = list(res.keys()) def find_col(keywords): for kw in keywords: for col in cols: if kw in col: return col return None name_col = find_col(['商品名稱', '品名', 'Name', 'Product']) sales_col = find_col(['銷售金額', '業績', '金額', 'Amount', 'Sales', 'Total', '總業績']) if not name_col or not sales_col: raise ValueError(f"daily_sales_snapshot 缺少必要欄位!目前欄位: {cols}") return name_col, sales_col def fetch_candidates(self) -> list: """ 競價威脅候選漏斗(v2 — 直接 LEFT JOIN competitor_prices) 條件:近7天銷量下滑 > 10% 且 MOMO 自家有歷史價格紀錄 擴充:LEFT JOIN competitor_prices(PChome 快取,預設 TTL 48h) 帶回 pchome_price + competitor_tags 供 Hermes 情境分析 無競品資料的商品仍回傳,pchome_price=NULL → _batch_analyze 跳過 """ if self.engine is None: raise RuntimeError("需要注入 SQLAlchemy engine") name_col, sales_col = self._validate_snapshot_columns() # 注意:products.i_code(MOMO商品頁URL碼,純數字)與 # daily_sales_snapshot.商品ID(訂單目錄碼,英數格式)是不同 ID 系統, # 無法直接 JOIN。改以商品名稱(商品名稱 = p.name)做橋接。 sql_str = f""" WITH latest_momo_price AS ( SELECT p.i_code AS sku, p.name, p.category, pr.price AS momo_price, ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC) AS rn FROM products p JOIN price_records pr ON pr.product_id = p.id WHERE p.status = 'ACTIVE' ), recent_sales AS ( SELECT "{name_col}" AS product_name, SUM(CASE WHEN snapshot_date::date >= CURRENT_DATE - 7 THEN COALESCE("{sales_col}"::numeric, 0) ELSE 0 END) AS sales_7d_curr, SUM(CASE WHEN snapshot_date::date >= CURRENT_DATE - 14 AND snapshot_date::date < CURRENT_DATE - 7 THEN COALESCE("{sales_col}"::numeric, 0) ELSE 0 END) AS sales_7d_prev FROM daily_sales_snapshot GROUP BY "{name_col}" ) SELECT lmp.sku, lmp.name, lmp.category, lmp.momo_price, rs.sales_7d_curr, rs.sales_7d_prev, cp.price AS pchome_price, cp.competitor_product_id, cp.competitor_product_name, cp.tags AS competitor_tags, cp.match_score AS competitor_match_score, cp.match_diagnostic_json, cp.comparison_mode FROM latest_momo_price lmp JOIN recent_sales rs ON rs.product_name = lmp.name LEFT JOIN competitor_prices cp ON cp.sku = lmp.sku AND cp.source = 'pchome' AND cp.expires_at > NOW() AND cp.match_score >= 0.76 AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2' AND ( cp.match_diagnostic_json @> '{{"match_type":"exact","price_basis":"total_price","alert_tier":"price_alert_exact"}}'::jsonb OR ( cp.match_diagnostic_json IS NULL AND COALESCE(cp.tags, '[]'::jsonb) ? 'match_type_exact' AND COALESCE(cp.tags, '[]'::jsonb) ? 'price_basis_total_price' AND COALESCE(cp.tags, '[]'::jsonb) ? 'alert_tier_price_alert_exact' ) ) WHERE lmp.rn = 1 AND rs.sales_7d_prev > 0 AND (rs.sales_7d_curr - rs.sales_7d_prev) / rs.sales_7d_prev < -0.10 ORDER BY (rs.sales_7d_curr - rs.sales_7d_prev) / rs.sales_7d_prev ASC LIMIT 300 """ sql = text(sql_str) with self.engine.connect() as conn: rows = conn.execute(sql).fetchall() return [dict(row._mapping) for row in rows] # ────────────────────────────────────────────── # Step 2:批量注入 Hermes 3 分析 # ────────────────────────────────────────────── def _batch_analyze(self, candidates: list, pchome_prices: dict = None) -> tuple: """ 將候選商品 + PChome 比價資料打包交 Hermes 分析 Args: candidates: SQL 漏斗結果 (list of dict) v2 起 candidates 已內含 pchome_price + competitor_tags pchome_prices: 舊式外部注入 {sku: price}(向下相容,優先度低於 candidates 內建值) Returns: tuple(raw_threats, items) - raw_threats: Hermes LLM 輸出的 JSON 陣列(含 risk/action/confidence 等分類結果) - items: Python 計算的客觀數據 source of truth(sku/momo/pchome/gap_pct/sales_delta) [2026-04-18 台北] Bug-1 根治 Layer A:同時回傳 items 作為客觀數據的真理來源, 防止 run() 從 LLM 輸出讀 momo_price/pchome_price 時因 LLM 漏吐 → default=0 → $0 幻覺 — Claude Opus 4.7 """ external = pchome_prices or {} items = [] for c in candidates: # 優先使用 DB 快取的競品價格;fallback 到外部注入 pchome_price = c.get("pchome_price") or external.get(c["sku"]) if not pchome_price: continue sales_prev = float(c.get("sales_7d_prev") or 0) sales_curr = float(c.get("sales_7d_curr") or 0) momo_price = float(c["momo_price"]) pchome_price = float(pchome_price) delta_pct = round((sales_curr - sales_prev) / sales_prev * 100, 1) if sales_prev else 0 gap_pct = round((momo_price - pchome_price) / pchome_price * 100, 1) # 競品語意標籤與 matcher 診斷(JSONB 從 DB 來,可能是 dict/list 或 JSON 字串) raw_tags = _parse_tag_list(c.get("competitor_tags")) diagnostic_payload = _parse_json_payload(c.get("match_diagnostic_json")) match_type = diagnostic_payload.get("match_type") or _tag_suffix(raw_tags, "match_type") or "exact" price_basis = diagnostic_payload.get("price_basis") or _tag_suffix(raw_tags, "price_basis") or "total_price" alert_tier = diagnostic_payload.get("alert_tier") or _tag_suffix(raw_tags, "alert_tier") or "price_alert_exact" evidence_flags = diagnostic_payload.get("evidence_flags") or [] item = { "sku": c["sku"], "name": c["name"][:30], # 截斷避免 token 爆炸 "category": c.get("category", ""), "momo": c["momo_price"], "pchome": pchome_price, "gap_pct": gap_pct, # Python 預算好,Hermes 只做分類 "sales_delta": delta_pct, "match_type": match_type, "price_basis": price_basis, "alert_tier": alert_tier, "match_score": round(float(c.get("competitor_match_score") or 0), 3), "pchome_id": c.get("competitor_product_id") or "", "pchome_name": (c.get("competitor_product_name") or "")[:48], # 絕對營收金額(不傳給 Hermes 推理,只在 Python 端保留供下游金額影響量化) "_sales_curr": sales_curr, "_sales_prev": sales_prev, } if raw_tags: item["competitor_tags"] = raw_tags # 語意情境給 Hermes 加分 if evidence_flags: item["match_evidence"] = evidence_flags items.append(item) if not items: return [], [] mcp_ctx = build_mcp_context(topics=["market_trends", "holiday_calendar", "seasonal_insights"]) # 餵給 LLM 的版本:剝除底線開頭的 Python-only 欄位(避免 token 爆炸 + 防 LLM 嘗試引用) items_for_llm = [ {k: v for k, v in item.items() if not k.startswith("_")} for item in items ] prompt = ( f"【市場外部情報 (MCP)】\n{mcp_ctx}\n\n" f"分析以下 {len(items_for_llm)} 支商品的競價威脅,回傳前 {TOP_N} 個最高風險商品。\n\n" f"注意:match_type / price_basis / alert_tier 是比對系統的硬證據;" f"只有 alert_tier=price_alert_exact 的 exact 同款可判定直接價格威脅," f"其他一律只能建議人工覆核身份、包裝或單位價。\n\n" f"資料:{json.dumps(items_for_llm, ensure_ascii=False)}\n\n" f"輸出格式(JSON 陣列,每筆含):\n" f'[{{"sku": string, "name": string, "category": string, ' f'"momo_price": number, "pchome_price": number, ' f'"gap_pct": number, "sales_7d_delta_pct": number, ' f'"risk": "HIGH|MED|LOW", "recommended_action": string, "confidence": number}}]' ) # Phase 1 v5.0: 包 ai_call_logger 追蹤 Hermes 競價分析 token / fallback with log_ai_call( caller='hermes_analyst', provider='gcp_ollama', model=HERMES_MODEL, meta={ 'route': 'ollama_first', 'item_count': len(items), 'top_n': TOP_N, }, ) as _ctx: try: ollama = OllamaService(model=HERMES_MODEL) resp = ollama.generate( prompt=prompt, model=HERMES_MODEL, system_prompt=self.SYSTEM_PROMPT, temperature=0.1, timeout=HERMES_TIMEOUT, keep_alive=HERMES_KEEP_ALIVE, allow_111_fallback=HERMES_ALLOW_111_FALLBACK, ) _ctx.set_provider(get_provider_tag(resp.host or '')) _ctx.set_model(resp.model or HERMES_MODEL) _ctx.set_tokens(input=resp.input_tokens, output=resp.output_tokens) _ctx.add_meta('host', resp.host) _ctx.add_meta('host_label', get_host_label(resp.host or '')) if resp.model and resp.model != HERMES_MODEL: _ctx.add_meta('requested_model', HERMES_MODEL) if not resp.success: raise RuntimeError(resp.error or "ollama generate failed") except Exception as e: _ctx.set_error(f"{type(e).__name__}: {e}") raise raw = (resp.content or "").strip() duration_sec = round(resp.total_duration or 0, 1) eval_tokens_raw = resp.output_tokens logger.info( f"[Hermes] 推理耗時 {duration_sec}s," f"輸入 {len(items)} 筆,tokens={eval_tokens_raw},回應長度 {len(raw)}" ) # 儲存統計供 footprint 使用(掛在 instance 上供 run() 讀取) self._last_stats = { "duration_sec": duration_sec, "tokens": eval_tokens_raw, "host": resp.host, "host_label": get_host_label(resp.host or '') } # P0-1 修復:剝除 Hermes 可能輸出的 markdown code fence if raw.startswith("```"): raw = re.sub(r"^```(?:json)?\s*", "", raw, flags=re.MULTILINE) raw = re.sub(r"\s*```\s*$", "", raw.strip(), flags=re.MULTILINE) raw = raw.strip() logger.debug("[Hermes] 已剝除 markdown code fence") return json.loads(raw), items # ────────────────────────────────────────────── # 公開介面 # ────────────────────────────────────────────── def run(self, pchome_prices: dict = None) -> AnalysisResult: """ 執行完整競價情報分析流程 Args: pchome_prices: 舊式外部注入 {sku: price}(可選,向下相容) v2 起競品價格由 competitor_prices DB 表提供(fetch_candidates LEFT JOIN) 若 DB 快取缺漏,此參數可作 fallback 補充 Returns: AnalysisResult """ start = time.time() try: candidates = self.fetch_candidates() logger.info(f"[漏斗] 候選商品 {len(candidates)} 筆") if not candidates: return AnalysisResult( success=True, threats=[], total_candidates=0, analysis_duration_sec=time.time() - start, ) raw_threats, ground_items = self._batch_analyze(candidates, pchome_prices or {}) # [2026-04-18 台北] Bug-1 根治 Layer A: # 客觀數據(momo/pchome/gap_pct/sales_delta)從 Python items 讀,不從 LLM 輸出讀 # LLM 只保留分類結果(risk / recommended_action / confidence) # 避免 Hermes 漏吐欄位 → default=0 → Telegram $0 幻覺 — Claude Opus 4.7 items_by_sku = {i["sku"]: i for i in ground_items} threats = [] for t in raw_threats: sku = t.get("sku") ground = items_by_sku.get(sku) if not ground: logger.warning( f"[Hermes] LLM 回吐未知 SKU={sku},不在 items 清單,防幻覺跳過" ) continue threats.append(PriceThreat( sku=sku, name=ground["name"], # Python truth category=ground.get("category", ""), # Python truth momo_price=float(ground["momo"]), # Python truth(根治 $0) pchome_price=float(ground["pchome"]), # Python truth(根治 $0) gap_pct=float(ground["gap_pct"]), # Python truth sales_7d_delta_pct=float(ground["sales_delta"]), # Python truth risk=t.get("risk", "LOW"), # LLM 分類 recommended_action=t.get("recommended_action", ""), # LLM 洞察 confidence=float(t.get("confidence", 0.5)), # LLM 信心度 # 絕對營收金額:純 Python truth,供下游金額影響量化(B' 軌) sales_7d_curr_amount=float(ground.get("_sales_curr", 0) or 0), sales_7d_prev_amount=float(ground.get("_sales_prev", 0) or 0), match_type=str(ground.get("match_type") or "exact"), price_basis=str(ground.get("price_basis") or "total_price"), alert_tier=str(ground.get("alert_tier") or "price_alert_exact"), match_score=float(ground.get("match_score") or 0), competitor_product_id=str(ground.get("pchome_id") or ""), competitor_product_name=str(ground.get("pchome_name") or ""), competitor_tags=tuple(ground.get("competitor_tags") or ()), )) hermes_stats = getattr(self, "_last_stats", {}) return AnalysisResult( success=True, threats=threats, total_candidates=len(candidates), analysis_duration_sec=round(time.time() - start, 2), hermes_tokens=hermes_stats.get("tokens", 0), ) except requests.Timeout: return AnalysisResult( success=False, threats=[], total_candidates=0, analysis_duration_sec=time.time() - start, error="Hermes 推理超時(>120s),候選數量可能過多", ) except json.JSONDecodeError as e: return AnalysisResult( success=False, threats=[], total_candidates=0, analysis_duration_sec=time.time() - start, error=f"Hermes JSON 解析失敗:{e}", ) except Exception as e: logger.exception("[HermesAnalyst] 分析失敗") return AnalysisResult( success=False, threats=[], total_candidates=0, analysis_duration_sec=time.time() - start, error=str(e), ) # ───────────────────────────────────────────── # CLI 快速測試(不依賴 DB) # python3 services/hermes_analyst_service.py # ───────────────────────────────────────────── if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") service = HermesAnalystService() fake_candidates = [ {"sku": "A001", "name": "玻尿酸面膜10片裝", "category": "美妝保養", "momo_price": 320, "sales_7d_curr": 58000, "sales_7d_prev": 100000}, {"sku": "A003", "name": "舒特膚AD乳液200ml", "category": "美妝保養", "momo_price": 1200, "sales_7d_curr": 65000, "sales_7d_prev": 100000}, {"sku": "A005", "name": "玻尿酸精華液30ml", "category": "美妝保養", "momo_price": 890, "sales_7d_curr": 72000, "sales_7d_prev": 100000}, {"sku": "A007", "name": "眼霜15ml", "category": "美妝保養", "momo_price": 680, "sales_7d_curr": 82000, "sales_7d_prev": 100000}, {"sku": "A009", "name": "美白化妝水150ml", "category": "美妝保養", "momo_price": 420, "sales_7d_curr": 78000, "sales_7d_prev": 100000}, ] fake_pchome = {"A001": 280, "A003": 980, "A005": 760, "A007": 590, "A009": 350} print("=== AI分析師 競價分析 CLI 測試 ===\n") raw = service._batch_analyze(fake_candidates, fake_pchome) for t in raw: icon = {"HIGH": "🔴", "MED": "🟡", "LOW": "🟢"}.get(t.get("risk", ""), "⚪") print(f"{icon} [{t['sku']}] {t['name']}") print(f" MOMO ${t['momo_price']} vs PChome ${t['pchome_price']} → 價差 {t['gap_pct']:.1f}%") print(f" 銷量 {t['sales_7d_delta_pct']}% | 建議:{t['recommended_action']}\n")