強化商品比價身份分級與告警路由
All checks were successful
CD Pipeline / deploy (push) Successful in 1m9s

This commit is contained in:
OoO
2026-05-21 14:03:51 +08:00
committed by AiderHeal Bot
parent c329d96dff
commit 78b6f156ba
11 changed files with 644 additions and 46 deletions

View File

@@ -323,7 +323,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.364"
SYSTEM_VERSION = "V10.365"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -13,6 +13,7 @@
## 📅 詳細更新日誌 (考古存檔)
### 2026-05-21瀏覽器測試守門與 PChome 熱路徑優化
- **V10.365 專業比價分級連動**: MOMO/PChome matcher 新增 `match_type``price_basis``alert_tier` 與 evidence flags將「高信心同款 / 同商品不同包裝 / 同系列不同款 / 可比但需覆核 / 非同款」寫入 diagnostics 與 tagsfeeder、競價情報 repository、Hermes payload、NemoTron 派發與 Telegram 告警格式同步讀取同一份分級。NemoTron 也新增硬閘門:非 `exact + total_price + price_alert_exact` 的項目即使模型回傳 price alert也會改走人工覆核避免不同包裝或同系列不同款被直接建議降價。
- **V10.364 111 context cap**: 111 fallback 即使降到 `llama3.2:latest`Ollama 仍可能用 131k context 啟動 runner導致 3B 模型也吃到 10GB+;新增 `OLLAMA_111_NUM_CTX=4096`,落到 111 時強制縮 context並把 `llama3.2:latest` 加入零成本模型表,避免觀測台 unknown model warning。
- **V10.363 Dashing Diva variant-safe search**: PChome/MOMO matcher 針對 Dashing Diva 美甲片補「商品頁目錄有 30片/盒、MOMO 標題省略片數」的安全豁免,只限同品牌、同美甲片線、同具名款式錨點;搜尋詞也優先帶出 `月影柔霧``銀絲柔彩` 等款式名,降低同系列不同款式互撞。
- **V10.362 111 fallback shrink-to-3B**: 111 Mac 實測 `hermes3` / `qwen2.5-coder` 雖是 7B/8B但 large context runner 仍會佔用 6-10GB RSS 並推高 swap111 fallback 改為所有 7B+、vision 與 long-context 文字生成都降級到 `llama3.2:latest``ai_calls.model` 也會記錄實際降級模型並把原請求模型放入 `meta.requested_model`

View File

@@ -11,6 +11,7 @@
from __future__ import annotations
import os
import json
import pickle
import time
from datetime import date, datetime, timedelta
@@ -100,6 +101,26 @@ MATCH_DIAGNOSTIC_REASON_LABELS = {
"unit_comparable": "需單位價",
"price_ratio_extreme": "價差極端",
"price_ratio_wide": "價差過大",
"catalog_count_omission": "目錄入數待確認",
}
MATCH_TYPE_LABELS = {
"exact": "高信心同款",
"same_product_different_pack": "同商品不同包裝",
"same_line_variant": "同系列不同款",
"comparable": "可比但需覆核",
"no_match": "非同款",
}
PRICE_BASIS_LABELS = {
"total_price": "總價可比",
"unit_price": "單位價可比",
"manual_review": "人工覆核後可比",
"none": "不可比",
}
ALERT_TIER_LABELS = {
"price_alert_exact": "可直接價格告警",
"unit_price_review": "單位價覆核",
"identity_review": "身份覆核",
"suppress": "不告警",
}
COMPETITOR_INTEL_CACHE_TTL_SECONDS = int(os.getenv("COMPETITOR_INTEL_CACHE_TTL_SECONDS", "1800"))
_BASE_DIR = Path(__file__).resolve().parents[1]
@@ -135,6 +156,41 @@ def _attempt_action_label(status: Any) -> str:
return ATTEMPT_ACTION_LABELS.get(str(status or ""), "人工確認比對證據")
def _parse_json_payload(value: Any) -> dict[str, Any]:
if isinstance(value, dict):
return value
if not value:
return {}
if isinstance(value, str):
try:
payload = json.loads(value)
return payload if isinstance(payload, dict) else {}
except Exception:
return {}
return {}
def _parse_tag_list(value: Any) -> list[str]:
if isinstance(value, list):
return [str(item) for item in value if item]
if isinstance(value, str):
try:
payload = json.loads(value)
if isinstance(payload, list):
return [str(item) for item in payload if item]
except Exception:
return []
return []
def _tag_suffix(tags: list[str], prefix: str) -> str:
marker = f"{prefix}_"
for tag in tags:
if tag.startswith(marker):
return tag.removeprefix(marker)
return ""
def _empty_manual_review_summary() -> dict[str, Any]:
return {
"total": 0,
@@ -196,6 +252,16 @@ def _format_competitor_review_item(row: dict[str, Any]) -> dict[str, Any]:
item = dict(row)
unit_comparison = _build_unit_comparison_for_attempt(item)
match_diagnostic = item.get("error_message") or ""
diagnostic_payload = _parse_json_payload(item.get("match_diagnostic_json"))
tags = _parse_tag_list(item.get("tags"))
match_type = (
diagnostic_payload.get("match_type")
or _tag_suffix(tags, "match_type")
or ("same_product_different_pack" if item.get("attempt_status") in UNIT_COMPARABLE_STATUSES else "")
)
price_basis = diagnostic_payload.get("price_basis") or _tag_suffix(tags, "price_basis") or ""
alert_tier = diagnostic_payload.get("alert_tier") or _tag_suffix(tags, "alert_tier") or ""
evidence_flags = diagnostic_payload.get("evidence_flags") or []
diagnostic_reasons = _extract_match_diagnostic_reasons(match_diagnostic)
return {
"sku": str(item.get("sku") or ""),
@@ -211,6 +277,13 @@ def _format_competitor_review_item(row: dict[str, Any]) -> dict[str, Any]:
"candidate_pc_price": _num(item.get("best_competitor_price")),
"best_match_score": _num(item.get("best_match_score")),
"match_diagnostic": match_diagnostic,
"match_type": match_type,
"match_type_label": MATCH_TYPE_LABELS.get(match_type, match_type or "待判讀"),
"price_basis": price_basis,
"price_basis_label": PRICE_BASIS_LABELS.get(price_basis, price_basis or "待判讀"),
"alert_tier": alert_tier,
"alert_tier_label": ALERT_TIER_LABELS.get(alert_tier, alert_tier or "待判讀"),
"evidence_flags": list(evidence_flags) if isinstance(evidence_flags, list) else [],
"diagnostic_reasons": diagnostic_reasons,
"diagnostic_reason_text": "".join(reason["label"] for reason in diagnostic_reasons),
"attempted_at": _date_label(item.get("attempted_at")),
@@ -589,6 +662,9 @@ def _fetch_top_competitor_risks_uncached(engine, limit: int = 10) -> list[dict]:
cp.competitor_product_id,
cp.competitor_product_name,
cp.match_score,
cp.tags,
cp.match_diagnostic_json,
cp.comparison_mode,
cp.crawled_at
FROM competitor_prices cp
WHERE cp.source = 'pchome'
@@ -608,6 +684,9 @@ def _fetch_top_competitor_risks_uncached(engine, limit: int = 10) -> list[dict]:
vc.competitor_product_id,
vc.competitor_product_name,
vc.match_score,
vc.tags,
vc.match_diagnostic_json,
vc.comparison_mode,
vc.crawled_at,
(latest_price.momo_price - vc.pchome_price) AS gap_amount,
((latest_price.momo_price - vc.pchome_price) / vc.pchome_price * 100) AS gap_pct
@@ -631,6 +710,11 @@ def _fetch_top_competitor_risks_uncached(engine, limit: int = 10) -> list[dict]:
result = []
for row in rows:
diagnostic_payload = _parse_json_payload(row.get("match_diagnostic_json"))
tags = _parse_tag_list(row.get("tags"))
match_type = diagnostic_payload.get("match_type") or _tag_suffix(tags, "match_type")
price_basis = diagnostic_payload.get("price_basis") or _tag_suffix(tags, "price_basis")
alert_tier = diagnostic_payload.get("alert_tier") or _tag_suffix(tags, "alert_tier")
result.append({
"sku": str(row.get("sku") or ""),
"name": row.get("name") or "",
@@ -642,6 +726,12 @@ def _fetch_top_competitor_risks_uncached(engine, limit: int = 10) -> list[dict]:
"match_score": _num(row.get("match_score")),
"pchome_id": row.get("competitor_product_id"),
"pchome_name": row.get("competitor_product_name") or "",
"match_type": match_type,
"match_type_label": MATCH_TYPE_LABELS.get(match_type, match_type or "待判讀"),
"price_basis": price_basis,
"price_basis_label": PRICE_BASIS_LABELS.get(price_basis, price_basis or "待判讀"),
"alert_tier": alert_tier,
"alert_tier_label": ALERT_TIER_LABELS.get(alert_tier, alert_tier or "待判讀"),
"crawled_at": _date_label(row.get("crawled_at")),
})
return result
@@ -752,6 +842,7 @@ def _review_queue_cte_and_filter(
cma.best_competitor_product_name,
cma.best_competitor_price,
cma.best_match_score,
cma.match_diagnostic_json,
cma.error_message,
cma.attempted_at
FROM competitor_match_attempts cma
@@ -770,6 +861,7 @@ def _review_queue_cte_and_filter(
la.best_competitor_product_name,
la.best_competitor_price,
la.best_match_score,
la.match_diagnostic_json,
la.error_message,
la.attempted_at,
CASE
@@ -900,6 +992,7 @@ def _fetch_competitor_review_queue_uncached(engine, limit: int = 12) -> list[dic
cma.best_competitor_product_name,
cma.best_competitor_price,
cma.best_match_score,
cma.match_diagnostic_json,
cma.error_message,
cma.attempted_at
FROM competitor_match_attempts cma
@@ -917,6 +1010,7 @@ def _fetch_competitor_review_queue_uncached(engine, limit: int = 12) -> list[dic
la.best_competitor_product_name,
la.best_competitor_price,
la.best_match_score,
la.match_diagnostic_json,
la.error_message,
la.attempted_at
FROM latest_momo lm
@@ -997,6 +1091,7 @@ def fetch_competitor_comparison_results(
NULL AS best_competitor_product_name,
NULL AS best_competitor_price,
NULL AS best_match_score,
NULL AS match_diagnostic_json,
NULL AS error_message,
NULL AS attempted_at
WHERE FALSE
@@ -1049,6 +1144,7 @@ def fetch_competitor_comparison_results(
cma.best_competitor_product_name,
cma.best_competitor_price,
cma.best_match_score,
cma.match_diagnostic_json,
cma.error_message,
cma.attempted_at
FROM competitor_match_attempts cma
@@ -1079,6 +1175,9 @@ def fetch_competitor_comparison_results(
cph.competitor_product_id,
cph.competitor_product_name,
cph.match_score,
cph.tags,
cph.match_diagnostic_json,
cph.comparison_mode,
cph.crawled_at,
'competitor_price_history' AS competitor_source
FROM competitor_price_history cph
@@ -1095,6 +1194,9 @@ def fetch_competitor_comparison_results(
cp.competitor_product_id,
cp.competitor_product_name,
cp.match_score,
cp.tags,
cp.match_diagnostic_json,
cp.comparison_mode,
cp.crawled_at,
'competitor_prices' AS competitor_source
FROM competitor_prices cp
@@ -1132,6 +1234,9 @@ def fetch_competitor_comparison_results(
vc.competitor_product_id,
vc.competitor_product_name,
vc.match_score,
vc.tags,
vc.match_diagnostic_json,
vc.comparison_mode,
vc.crawled_at AS competitor_crawled_at,
vc.competitor_source,
la.attempt_status,
@@ -1140,6 +1245,7 @@ def fetch_competitor_comparison_results(
la.best_competitor_product_name,
la.best_competitor_price,
la.best_match_score,
la.match_diagnostic_json AS attempt_match_diagnostic_json,
la.error_message,
la.attempted_at,
{sales_select}
@@ -1162,6 +1268,13 @@ def fetch_competitor_comparison_results(
pchome_id = row.get("competitor_product_id")
found = bool(row.get("pchome_price"))
match_status = "matched" if found else (row.get("attempt_status") or "no_valid_match")
diagnostic_payload = _parse_json_payload(
row.get("match_diagnostic_json") or row.get("attempt_match_diagnostic_json")
)
tags = _parse_tag_list(row.get("tags"))
match_type = diagnostic_payload.get("match_type") or _tag_suffix(tags, "match_type")
price_basis = diagnostic_payload.get("price_basis") or _tag_suffix(tags, "price_basis")
alert_tier = diagnostic_payload.get("alert_tier") or _tag_suffix(tags, "alert_tier")
unit_comparison = _build_unit_comparison_for_attempt({
"attempt_status": match_status,
"name": row.get("name") or "",
@@ -1189,6 +1302,12 @@ def fetch_competitor_comparison_results(
"match_status": match_status,
"match_status_label": _attempt_status_label(match_status),
"action_label": _attempt_action_label(match_status),
"match_type": match_type,
"match_type_label": MATCH_TYPE_LABELS.get(match_type, match_type or "待判讀"),
"price_basis": price_basis,
"price_basis_label": PRICE_BASIS_LABELS.get(price_basis, price_basis or "待判讀"),
"alert_tier": alert_tier,
"alert_tier_label": ALERT_TIER_LABELS.get(alert_tier, alert_tier or "待判讀"),
"candidate_count": int(row.get("candidate_count") or 0),
"best_match_score": _num(row.get("best_match_score")),
"match_diagnostic": row.get("error_message") or "",

View File

@@ -139,6 +139,17 @@ def _extract_tags(pchome_product) -> list:
return tags
def _extend_match_tags(tags: list, diagnostics, extra: list[str] = None) -> list:
"""Attach matcher evidence tags in one place so all feeder lanes agree."""
tags = list(tags or [])
tags.extend(getattr(diagnostics, "tags", []) or [])
for reason in getattr(diagnostics, "reasons", ()) or ():
tags.append(f"match_{reason}")
if extra:
tags.extend(extra)
return list(dict.fromkeys(tag for tag in tags if tag))
def _clean_search_text(value: str) -> str:
value = re.sub(r'[()]', ' ', value or '')
value = re.sub(r'[【】\[\]]', ' ', value)
@@ -246,6 +257,9 @@ def _format_match_diagnostics(diagnostics) -> str:
f"seq={diagnostics.sequence_score}; type={diagnostics.type_score}; "
f"penalty={diagnostics.price_penalty}; veto={diagnostics.hard_veto}; "
f"mode={getattr(diagnostics, 'comparison_mode', 'exact_identity')}; "
f"match_type={getattr(diagnostics, 'match_type', '')}; "
f"price_basis={getattr(diagnostics, 'price_basis', '')}; "
f"alert_tier={getattr(diagnostics, 'alert_tier', '')}; "
f"reasons={reasons}"
)
@@ -264,6 +278,10 @@ def _match_diagnostics_payload(diagnostics) -> dict:
"price_penalty": getattr(diagnostics, "price_penalty", None),
"hard_veto": bool(getattr(diagnostics, "hard_veto", False)),
"comparison_mode": getattr(diagnostics, "comparison_mode", "exact_identity"),
"match_type": getattr(diagnostics, "match_type", None),
"price_basis": getattr(diagnostics, "price_basis", None),
"alert_tier": getattr(diagnostics, "alert_tier", None),
"evidence_flags": list(getattr(diagnostics, "evidence_flags", ()) or ()),
"reasons": list(getattr(diagnostics, "reasons", ()) or ()),
}
@@ -1331,10 +1349,7 @@ class CompetitorPriceFeeder:
if manual_accept_override:
score = max(score, MIN_MATCH_SCORE)
tags = _extract_tags(best_product)
tags.extend(getattr(diagnostics, "tags", []))
for reason in getattr(diagnostics, "reasons", ()) or ():
tags.append(f"match_{reason}")
tags = _extend_match_tags(_extract_tags(best_product), diagnostics)
if manual_accept_override:
tags.extend(["manual_review", "manual_accept"])
tags = [tag for tag in tags if tag != "identity_veto"]
@@ -1511,12 +1526,11 @@ class CompetitorPriceFeeder:
continue
if score >= MIN_MATCH_SCORE and not getattr(diagnostics, "hard_veto", False):
tags = _extract_tags(best_product)
tags.extend(getattr(diagnostics, "tags", []))
for reason in getattr(diagnostics, "reasons", ()) or ():
tags.append(f"match_{reason}")
tags.extend(["refresh_known_identity", "fresh_search_recovery", "missing_known_product_id"])
tags = list(dict.fromkeys(tags))
tags = _extend_match_tags(
_extract_tags(best_product),
diagnostics,
["refresh_known_identity", "fresh_search_recovery", "missing_known_product_id"],
)
should_write, write_reason = self._should_upsert_competitor_price(
sku,
@@ -1640,14 +1654,10 @@ class CompetitorPriceFeeder:
best_product, score, diagnostics = recovered_product, recovered_score, recovered_diagnostics
if score >= MIN_MATCH_SCORE:
tags = _extract_tags(best_product)
tags.extend(getattr(diagnostics, "tags", []))
for reason in getattr(diagnostics, "reasons", ()) or ():
tags.append(f"match_{reason}")
tags.append("refresh_known_identity")
extras = ["refresh_known_identity"]
if recovery_terms:
tags.append("fresh_search_recovery")
tags = list(dict.fromkeys(tags))
extras.append("fresh_search_recovery")
tags = _extend_match_tags(_extract_tags(best_product), diagnostics, extras)
should_write, write_reason = self._should_upsert_competitor_price(
sku,
@@ -1724,12 +1734,7 @@ class CompetitorPriceFeeder:
attempts_written += 1
continue
tags = _extract_tags(best_product)
tags.extend(getattr(diagnostics, "tags", []))
for reason in getattr(diagnostics, "reasons", ()) or ():
tags.append(f"match_{reason}")
tags.append("refresh_known_identity")
tags = list(dict.fromkeys(tags))
tags = _extend_match_tags(_extract_tags(best_product), diagnostics, ["refresh_known_identity"])
should_write, write_reason = self._should_upsert_competitor_price(
sku,

View File

@@ -21,6 +21,7 @@ import uuid
from dataclasses import dataclass
from typing import Optional
import requests
from sqlalchemy import text
from services.mcp_context_service import build_mcp_context
from services.ollama_service import OllamaService, get_host_label, get_provider_tag
@@ -36,6 +37,39 @@ HERMES_KEEP_ALIVE = "24h" # ADR-012保持模型熱駐留避免被別模
TOP_N = 20 # 輸出前 N 個威脅,控制 NemoTron 每次消耗配額
def _parse_json_payload(value) -> dict:
if isinstance(value, dict):
return value
if isinstance(value, str) and value.strip():
try:
payload = json.loads(value)
return payload if isinstance(payload, dict) else {}
except Exception:
return {}
return {}
def _parse_tag_list(value) -> list[str]:
if isinstance(value, list):
return [str(item) for item in value if item]
if isinstance(value, str) and value.strip():
try:
payload = json.loads(value)
if isinstance(payload, list):
return [str(item) for item in payload if item]
except Exception:
return []
return []
def _tag_suffix(tags: list[str], prefix: str) -> str:
marker = f"{prefix}_"
for tag in tags:
if str(tag).startswith(marker):
return str(tag).removeprefix(marker)
return ""
@dataclass
class PriceThreat:
sku: str
@@ -50,6 +84,13 @@ class PriceThreat:
confidence: float
sales_7d_curr_amount: float = 0.0 # 過去 7 日營收金額NT$),供下游金額影響量化
sales_7d_prev_amount: float = 0.0 # 前 7 日營收金額NT$),供「可挽回營收」估算
match_type: str = "exact"
price_basis: str = "total_price"
alert_tier: str = "price_alert_exact"
match_score: float = 0.0
competitor_product_id: str = ""
competitor_product_name: str = ""
competitor_tags: tuple[str, ...] = ()
@dataclass
@@ -86,7 +127,10 @@ class HermesAnalystService:
4. confidence 根據數據確定性給分0.0~1.0
5. 【防幻覺鐵律】絕對禁止捏造輸入資料中未提供的數據(如折扣%、促銷活動、隱藏優惠)。
只能基於 gap_pct、sales_delta、competitor_tags 等已提供欄位做推論。
6. 【非價格異常路由】若 gap_pct 絕對值 < 5% 但 sales_delta < -30%
6. 【身份證據鐵律】match_type / price_basis / alert_tier 是系統比對結論,不得改寫。
- 只有 alert_tier="price_alert_exact" 且 match_type="exact" 且 price_basis="total_price" 可當成直接價格威脅。
- 其他情況只能建議「人工覆核身份/包裝/單位價」,不可建議直接降價。
7. 【非價格異常路由】若 gap_pct 絕對值 < 5% 但 sales_delta < -30%
- 判定為「非價格因素異常」(高機率:缺貨、下架、平台流量異常、頁面問題)
- risk 設為 MEDrecommended_action 必須寫「價差接近零但業績異常下滑,建議立即人工走查前台頁面(確認是否缺貨/下架/頁面異常)」
- confidence 設為 0.5(因缺乏確切原因)"""
@@ -391,8 +435,12 @@ class HermesAnalystService:
rs.sales_7d_curr,
rs.sales_7d_prev,
cp.price AS pchome_price,
cp.competitor_product_id,
cp.competitor_product_name,
cp.tags AS competitor_tags,
cp.match_score AS competitor_match_score
cp.match_score AS competitor_match_score,
cp.match_diagnostic_json,
cp.comparison_mode
FROM latest_momo_price lmp
JOIN recent_sales rs ON rs.product_name = lmp.name
LEFT JOIN competitor_prices cp
@@ -450,14 +498,13 @@ class HermesAnalystService:
delta_pct = round((sales_curr - sales_prev) / sales_prev * 100, 1) if sales_prev else 0
gap_pct = round((momo_price - pchome_price) / pchome_price * 100, 1)
# 競品語意標籤JSONB 從 DB 來,可能是 list 或 JSON 字串)
raw_tags = c.get("competitor_tags") or []
if isinstance(raw_tags, str):
try:
import json as _json
raw_tags = _json.loads(raw_tags)
except Exception:
raw_tags = []
# 競品語意標籤與 matcher 診斷JSONB 從 DB 來,可能是 dict/list 或 JSON 字串)
raw_tags = _parse_tag_list(c.get("competitor_tags"))
diagnostic_payload = _parse_json_payload(c.get("match_diagnostic_json"))
match_type = diagnostic_payload.get("match_type") or _tag_suffix(raw_tags, "match_type") or "exact"
price_basis = diagnostic_payload.get("price_basis") or _tag_suffix(raw_tags, "price_basis") or "total_price"
alert_tier = diagnostic_payload.get("alert_tier") or _tag_suffix(raw_tags, "alert_tier") or "price_alert_exact"
evidence_flags = diagnostic_payload.get("evidence_flags") or []
item = {
"sku": c["sku"],
@@ -467,12 +514,20 @@ class HermesAnalystService:
"pchome": pchome_price,
"gap_pct": gap_pct, # Python 預算好Hermes 只做分類
"sales_delta": delta_pct,
"match_type": match_type,
"price_basis": price_basis,
"alert_tier": alert_tier,
"match_score": round(float(c.get("competitor_match_score") or 0), 3),
"pchome_id": c.get("competitor_product_id") or "",
"pchome_name": (c.get("competitor_product_name") or "")[:48],
# 絕對營收金額(不傳給 Hermes 推理,只在 Python 端保留供下游金額影響量化)
"_sales_curr": sales_curr,
"_sales_prev": sales_prev,
}
if raw_tags:
item["competitor_tags"] = raw_tags # 語意情境給 Hermes 加分
if evidence_flags:
item["match_evidence"] = evidence_flags
items.append(item)
@@ -489,6 +544,9 @@ class HermesAnalystService:
prompt = (
f"【市場外部情報 (MCP)】\n{mcp_ctx}\n\n"
f"分析以下 {len(items_for_llm)} 支商品的競價威脅,回傳前 {TOP_N} 個最高風險商品。\n\n"
f"注意match_type / price_basis / alert_tier 是比對系統的硬證據;"
f"只有 alert_tier=price_alert_exact 的 exact 同款可判定直接價格威脅,"
f"其他一律只能建議人工覆核身份、包裝或單位價。\n\n"
f"資料:{json.dumps(items_for_llm, ensure_ascii=False)}\n\n"
f"輸出格式JSON 陣列,每筆含):\n"
f'[{{"sku": string, "name": string, "category": string, '
@@ -613,6 +671,13 @@ class HermesAnalystService:
# 絕對營收金額:純 Python truth供下游金額影響量化B' 軌)
sales_7d_curr_amount=float(ground.get("_sales_curr", 0) or 0),
sales_7d_prev_amount=float(ground.get("_sales_prev", 0) or 0),
match_type=str(ground.get("match_type") or "exact"),
price_basis=str(ground.get("price_basis") or "total_price"),
alert_tier=str(ground.get("alert_tier") or "price_alert_exact"),
match_score=float(ground.get("match_score") or 0),
competitor_product_id=str(ground.get("pchome_id") or ""),
competitor_product_name=str(ground.get("pchome_name") or ""),
competitor_tags=tuple(ground.get("competitor_tags") or ()),
))
hermes_stats = getattr(self, "_last_stats", {})

View File

@@ -589,18 +589,30 @@ class MatchDiagnostics:
hard_veto: bool
reasons: tuple[str, ...]
comparison_mode: str = "exact_identity"
match_type: str = "exact"
price_basis: str = "total_price"
alert_tier: str = "price_alert_exact"
evidence_flags: tuple[str, ...] = ()
@property
def tags(self) -> list[str]:
tags: list[str] = ["identity_v2"]
if self.comparison_mode:
tags.append(f"comparison_{self.comparison_mode}")
if self.match_type:
tags.append(f"match_type_{self.match_type}")
if self.price_basis:
tags.append(f"price_basis_{self.price_basis}")
if self.alert_tier:
tags.append(f"alert_tier_{self.alert_tier}")
if self.brand_score >= 0.95:
tags.append("brand_match")
if self.spec_score >= 0.85:
tags.append("spec_match")
if self.hard_veto:
tags.append("identity_veto")
for flag in self.evidence_flags:
tags.append(f"evidence_{flag}")
return tags
@@ -1333,6 +1345,108 @@ def _has_model_line_conflict(left: ProductIdentity, right: ProductIdentity) -> b
return not bool(left_tokens & right_tokens)
def _dedupe_tuple(values: Iterable[str]) -> tuple[str, ...]:
result: list[str] = []
seen: set[str] = set()
for value in values:
if not value or value in seen:
continue
seen.add(value)
result.append(value)
return tuple(result)
def _build_evidence_flags(
*,
brand_score: float,
token_score: float,
spec_score: float,
sequence_score: float,
type_score: float,
shared_anchor: str,
shared_models: set[str],
reasons: Iterable[str],
catalog_count_omission: bool,
) -> tuple[str, ...]:
reason_set = set(reasons)
flags: list[str] = []
if brand_score >= 0.95:
flags.append("brand")
if spec_score >= 0.85:
flags.append("spec")
if token_score >= 0.72:
flags.append("tokens")
if sequence_score >= 0.70:
flags.append("name_sequence")
if type_score >= 0.95:
flags.append("product_type")
if shared_anchor:
flags.append("identity_anchor")
if shared_models:
flags.append("model_token")
if catalog_count_omission:
flags.append("catalog_count_omission")
for reason in (
"unit_comparable",
"variant_option_conflict",
"variant_descriptor_conflict",
"count_conflict",
"bundle_offer_conflict",
"multi_component_conflict",
"refill_pack_conflict",
"price_ratio_extreme",
"price_ratio_wide",
):
if reason in reason_set:
flags.append(reason)
return _dedupe_tuple(flags)
def _classify_match_quality(
*,
score: float,
brand_score: float,
token_score: float,
spec_score: float,
sequence_score: float,
type_score: float,
hard_veto: bool,
comparison_mode: str,
reasons: Iterable[str],
shared_anchor: str,
shared_models: set[str],
catalog_count_omission: bool,
) -> tuple[str, str, str]:
"""Map raw matcher scores into operator-facing price comparison lanes."""
reason_set = set(reasons)
if comparison_mode == "unit_comparable":
return "same_product_different_pack", "unit_price", "unit_price_review"
if hard_veto or comparison_mode == "not_comparable":
variant_conflict = bool(reason_set & {"variant_option_conflict", "variant_descriptor_conflict"})
same_line_signal = bool(shared_anchor and brand_score >= 0.95 and type_score >= 0.55)
if variant_conflict and same_line_signal:
return "same_line_variant", "manual_review", "suppress"
return "no_match", "none", "suppress"
direct_spec_evidence = spec_score >= 0.85 or bool(shared_models)
strong_identity_evidence = (
brand_score >= 0.95
and type_score >= 0.55
and score >= 0.86
and (direct_spec_evidence or (shared_anchor and token_score >= 0.62 and sequence_score >= 0.58))
)
if strong_identity_evidence and not catalog_count_omission:
return "exact", "total_price", "price_alert_exact"
if score >= 0.76:
if catalog_count_omission:
return "same_product_different_pack", "manual_review", "unit_price_review"
return "comparable", "manual_review", "identity_review"
return "no_match", "none", "suppress"
def score_marketplace_match(
momo_name: str,
competitor_name: str,
@@ -1382,6 +1496,9 @@ def score_marketplace_match(
if chinese_name_score < 0.16:
reasons.append("product_line_conflict")
shared_anchor = _shared_identity_anchor(left, right)
catalog_count_omission = _allow_catalog_count_omission(left, right)
if catalog_count_omission:
reasons.append("catalog_count_omission")
variant_descriptor_conflict = _has_variant_descriptor_conflict(left, right, shared_anchor)
variant_option_conflict = _has_explicit_variant_option_conflict(left, right, shared_anchor)
if variant_option_conflict:
@@ -1581,6 +1698,32 @@ def score_marketplace_match(
if hard_veto:
score = min(score, 0.74 if comparison_mode == "unit_comparable" else 0.32)
score = max(0.0, min(1.0, score))
reason_tuple = _dedupe_tuple(reasons)
match_type, price_basis, alert_tier = _classify_match_quality(
score=score,
brand_score=brand_score,
token_score=token_score,
spec_score=spec_score,
sequence_score=sequence_score,
type_score=type_score,
hard_veto=hard_veto,
comparison_mode=comparison_mode,
reasons=reason_tuple,
shared_anchor=shared_anchor,
shared_models=shared_models,
catalog_count_omission=catalog_count_omission,
)
evidence_flags = _build_evidence_flags(
brand_score=brand_score,
token_score=token_score,
spec_score=spec_score,
sequence_score=sequence_score,
type_score=type_score,
shared_anchor=shared_anchor,
shared_models=shared_models,
reasons=reason_tuple,
catalog_count_omission=catalog_count_omission,
)
return MatchDiagnostics(
score=round(score, 3),
@@ -1591,8 +1734,12 @@ def score_marketplace_match(
type_score=round(type_score, 3),
price_penalty=round(price_penalty, 3),
hard_veto=hard_veto,
reasons=tuple(reasons),
reasons=reason_tuple,
comparison_mode=comparison_mode,
match_type=match_type,
price_basis=price_basis,
alert_tier=alert_tier,
evidence_flags=evidence_flags,
)

View File

@@ -334,6 +334,73 @@ ICON_COMPETE = "🏆"
ICON_AI = "🧠"
ICON_FOOTPRINT = "⚙️"
MATCH_TYPE_LABELS = {
"exact": "高信心同款",
"same_product_different_pack": "同商品不同包裝",
"same_line_variant": "同系列不同款",
"comparable": "可比但需覆核",
"no_match": "非同款",
}
PRICE_BASIS_LABELS = {
"total_price": "總價可比",
"unit_price": "單位價可比",
"manual_review": "人工覆核後可比",
"none": "不可比",
}
ALERT_TIER_LABELS = {
"price_alert_exact": "可直接價格告警",
"unit_price_review": "單位價覆核",
"identity_review": "身份覆核",
"suppress": "不告警",
}
def _threat_match_metadata(threat) -> dict:
return {
"match_type": getattr(threat, "match_type", "exact") or "exact",
"price_basis": getattr(threat, "price_basis", "total_price") or "total_price",
"alert_tier": getattr(threat, "alert_tier", "price_alert_exact") or "price_alert_exact",
"match_score": float(getattr(threat, "match_score", 0) or 0),
"competitor_product_id": getattr(threat, "competitor_product_id", "") or "",
"competitor_product_name": getattr(threat, "competitor_product_name", "") or "",
}
def _can_direct_price_alert(threat) -> bool:
meta = _threat_match_metadata(threat)
return (
meta["match_type"] == "exact"
and meta["price_basis"] == "total_price"
and meta["alert_tier"] == "price_alert_exact"
)
def _format_match_evidence_block(
*,
match_type: str = "",
price_basis: str = "",
alert_tier: str = "",
match_score: float = 0.0,
competitor_product_id: str = "",
competitor_product_name: str = "",
) -> str:
match_type = match_type or "exact"
price_basis = price_basis or "total_price"
alert_tier = alert_tier or "price_alert_exact"
lines = [
f"{ICON_COMPETE} 比對證據:",
f"• 身份分級:{MATCH_TYPE_LABELS.get(match_type, match_type)}",
f"• 比價基準:{PRICE_BASIS_LABELS.get(price_basis, price_basis)}",
f"• 告警路徑:{ALERT_TIER_LABELS.get(alert_tier, alert_tier)}",
]
if match_score:
lines.append(f"• Match score{match_score:.3f}")
if competitor_product_id:
lines.append(f"• PChome ID{competitor_product_id}")
if competitor_product_name:
lines.append(f"• PChome 品名:{str(competitor_product_name)[:68]}")
return "\n".join(lines) + "\n\n"
# ── tool_calls 解析NIM 與 qwen3 共用)──────────────────────────
def _parse_tool_calls_struct(tool_calls: list) -> list:
@@ -504,6 +571,7 @@ class NemotronDispatcher:
"risk": t.risk,
"action": t.recommended_action,
"confidence": t.confidence,
**_threat_match_metadata(t),
}
for t in threats
],
@@ -521,11 +589,13 @@ class NemotronDispatcher:
f"當前市場背景 (MCP)\n{mcp_ctx}\n\n"
"根據 Hermes 分析師提供的威脅清單,決定對每支商品呼叫哪個工具。\n"
"路由鐵律(依序判斷,命中即停):\n"
"1. gap_pct < 5% 且 sales_delta < -30% → 非價格異常,呼叫 flag_for_human_review"
"1. match_type 不是 exact或 price_basis 不是 total_price或 alert_tier 不是 price_alert_exact "
"→ 不可直接價格告警,呼叫 flag_for_human_reviewconcern 說明需覆核身份、包裝或單位價。\n"
"2. gap_pct < 5% 且 sales_delta < -30% → 非價格異常,呼叫 flag_for_human_review"
"concern 說明『價差接近 0 但銷量大幅下滑,疑似缺貨/下架/平台流量異常,請人工走查前台』。\n"
"2. gap_pct ≥ 5% 且 risk=HIGH → trigger_price_alert填入 momo_price, comp_price\n"
"3. 我方價格低於競品且銷量正成長 → add_to_recommendation。\n"
"4. confidence < 0.6 或其他複雜情況 → flag_for_human_review。\n"
"3. gap_pct ≥ 5% 且 risk=HIGH → trigger_price_alert填入 momo_price, comp_price\n"
"4. 我方價格低於競品且銷量正成長 → add_to_recommendation。\n"
"5. confidence < 0.6 或其他複雜情況 → flag_for_human_review。\n"
"每支商品只呼叫一個工具。\n"
"【語言鐵律 — 台灣標準正體中文(繁體)】所有文字欄位必須遵守:\n"
" 1. 嚴禁簡體字(例:不可用「参给当为来国发会说时间过从实现这话动问题」,"
@@ -650,6 +720,7 @@ class NemotronDispatcher:
"risk": t.risk,
"action": t.recommended_action,
"confidence": t.confidence,
**_threat_match_metadata(t),
}
for t in threats
],
@@ -665,11 +736,13 @@ class NemotronDispatcher:
f"當前市場背景 (MCP)\n{mcp_ctx}\n\n"
"根據 Hermes 分析師提供的威脅清單,決定對每支商品呼叫哪個工具。\n"
"路由鐵律(依序判斷,命中即停):\n"
"1. gap_pct < 5% 且 sales_delta < -30% → 非價格異常,呼叫 flag_for_human_review"
"1. match_type 不是 exact或 price_basis 不是 total_price或 alert_tier 不是 price_alert_exact "
"→ 不可直接價格告警,呼叫 flag_for_human_reviewconcern 說明需覆核身份、包裝或單位價。\n"
"2. gap_pct < 5% 且 sales_delta < -30% → 非價格異常,呼叫 flag_for_human_review"
"concern 說明『價差接近 0 但銷量大幅下滑,疑似缺貨/下架/平台流量異常,請人工走查前台』。\n"
"2. gap_pct ≥ 5% 且 risk=HIGH → trigger_price_alert填入 momo_price, comp_price\n"
"3. 我方價格低於競品且銷量正成長 → add_to_recommendation。\n"
"4. confidence < 0.6 或其他複雜情況 → flag_for_human_review。\n"
"3. gap_pct ≥ 5% 且 risk=HIGH → trigger_price_alert填入 momo_price, comp_price\n"
"4. 我方價格低於競品且銷量正成長 → add_to_recommendation。\n"
"5. confidence < 0.6 或其他複雜情況 → flag_for_human_review。\n"
"每支商品只呼叫一個工具。\n"
"【語言鐵律 — 台灣標準正體中文(繁體)】所有文字欄位必須遵守:\n"
" 1. 嚴禁簡體字、嚴禁異體字(例:不可用「亊」,必須用「事」)\n"
@@ -797,6 +870,31 @@ class NemotronDispatcher:
# B' 軌:每個 threat 預先算金額影響,所有路徑統一注入
impact = _compute_business_impact(t)
rl, rp = impact["revenue_loss_7d"], impact["recommended_price"]
match_meta = _threat_match_metadata(t)
if not _can_direct_price_alert(t):
self._exec_flag_for_human_review(
sku=t.sku,
name=t.name,
concern=(
"🟡 [規則引擎] 比對證據尚未達直接價格告警門檻;"
f"match_type={match_meta['match_type']}"
f"price_basis={match_meta['price_basis']}"
f"alert_tier={match_meta['alert_tier']}"
"請先覆核是否為同款、同包裝或需改用單位價。"
),
confidence=max(float(getattr(t, "confidence", 0.5) or 0.5), 0.75),
footprint=footprint,
momo_price=t.momo_price,
comp_price=t.pchome_price,
gap_pct=t.gap_pct,
sales_delta=t.sales_7d_delta_pct,
revenue_loss_7d=rl,
recommended_price=rp,
**match_meta,
)
dispatched += 1
continue
if t.gap_pct < 5 and t.sales_7d_delta_pct < -30:
# Rule 1價差微小但銷量大跌 → 非定價問題,人工確認
@@ -812,6 +910,7 @@ class NemotronDispatcher:
momo_price=t.momo_price, comp_price=t.pchome_price,
gap_pct=t.gap_pct, sales_delta=t.sales_7d_delta_pct,
revenue_loss_7d=rl, recommended_price=rp,
**_threat_match_metadata(t),
)
elif t.gap_pct >= 5 and t.risk == "HIGH":
# Rule 2高價差 HIGH 風險 → 競價告警
@@ -823,6 +922,7 @@ class NemotronDispatcher:
momo_price=t.momo_price, comp_price=t.pchome_price,
footprint=footprint,
revenue_loss_7d=rl, recommended_price=rp,
**_threat_match_metadata(t),
)
elif t.gap_pct < 0 and t.sales_7d_delta_pct > 0:
# Rule 3我方具競爭力 + 銷量正成長 → 推薦
@@ -849,6 +949,7 @@ class NemotronDispatcher:
momo_price=t.momo_price, comp_price=t.pchome_price,
gap_pct=t.gap_pct, sales_delta=t.sales_7d_delta_pct,
revenue_loss_7d=rl, recommended_price=rp,
**_threat_match_metadata(t),
)
dispatched += 1
except Exception as e:
@@ -873,6 +974,12 @@ class NemotronDispatcher:
footprint: str,
revenue_loss_7d: float = 0.0,
recommended_price: Optional[float] = None,
match_type: str = "exact",
price_basis: str = "total_price",
alert_tier: str = "price_alert_exact",
match_score: float = 0.0,
competitor_product_id: str = "",
competitor_product_name: str = "",
) -> str:
"""
類別一:緊急告警
@@ -907,6 +1014,14 @@ class NemotronDispatcher:
f"(毛利策略可再加溢價)"
)
impact_block = ("\n".join(impact_lines) + "\n\n") if impact_lines else ""
match_block = _format_match_evidence_block(
match_type=match_type,
price_basis=price_basis,
alert_tier=alert_tier,
match_score=match_score,
competitor_product_id=competitor_product_id,
competitor_product_name=competitor_product_name,
)
# AI 洞察:唯一允許 LLM 文字進入的欄位
ai_insight = _sanitize_text(action, fallback="請人工評估議價空間")
@@ -919,6 +1034,7 @@ class NemotronDispatcher:
f"• 我方價格:{mp_str}\n"
f"• 競品價格:{cp_str}\n"
f"• 銷量變化:{sales_delta:+.1f}%\n\n"
f"{match_block}"
f"{impact_block}"
f"{ICON_AI} AI 洞察(信心度 {conf_pct}%\n"
f"{ai_insight}\n\n"
@@ -933,6 +1049,12 @@ class NemotronDispatcher:
gap_pct: float = None, sales_delta: float = None,
revenue_loss_7d: float = 0.0,
recommended_price: Optional[float] = None,
match_type: str = "",
price_basis: str = "",
alert_tier: str = "",
match_score: float = 0.0,
competitor_product_id: str = "",
competitor_product_name: str = "",
) -> str:
"""
類別二:人工覆核
@@ -957,11 +1079,20 @@ class NemotronDispatcher:
if recommended_price is not None and recommended_price > 0:
impact_lines.append(f"🎯 跟進競品建議價NT$ {recommended_price:,.0f}")
impact_block = ("\n".join(f"{l}" for l in impact_lines) + "\n") if impact_lines else ""
match_block = _format_match_evidence_block(
match_type=match_type or "unknown",
price_basis=price_basis or "manual_review",
alert_tier=alert_tier or "identity_review",
match_score=match_score,
competitor_product_id=competitor_product_id,
competitor_product_name=competitor_product_name,
)
return (
f"{ICON_WARNING} [{HEADER_DISPATCHER}] 異常波動需人工覆核\n\n"
f"🔍 待查商品:[{sku}] {name}\n\n"
f"{data_block}"
f"{match_block}"
f"{impact_block}\n"
f"🧠 AI 診斷:\n"
f"{concern}\n\n"
@@ -1004,6 +1135,12 @@ class NemotronDispatcher:
footprint: str = "",
revenue_loss_7d: float = 0.0,
recommended_price: Optional[float] = None,
match_type: str = "exact",
price_basis: str = "total_price",
alert_tier: str = "price_alert_exact",
match_score: float = 0.0,
competitor_product_id: str = "",
competitor_product_name: str = "",
):
"""發送語意化競價高危險預警
@@ -1019,6 +1156,12 @@ class NemotronDispatcher:
gap_pct, sales_delta, action, confidence, footprint,
revenue_loss_7d=revenue_loss_7d,
recommended_price=recommended_price,
match_type=match_type,
price_basis=price_basis,
alert_tier=alert_tier,
match_score=match_score,
competitor_product_id=competitor_product_id,
competitor_product_name=competitor_product_name,
)
self._send_telegram(msg)
logger.info(
@@ -1033,7 +1176,12 @@ class NemotronDispatcher:
metadata={"gap_pct": gap_pct, "sales_delta": sales_delta, "confidence": confidence,
"momo_price": momo_price, "comp_price": comp_price,
"revenue_loss_7d": revenue_loss_7d,
"recommended_price": recommended_price},
"recommended_price": recommended_price,
"match_type": match_type,
"price_basis": price_basis,
"alert_tier": alert_tier,
"match_score": match_score,
"competitor_product_id": competitor_product_id},
)
def _exec_add_to_recommendation(
@@ -1106,6 +1254,12 @@ class NemotronDispatcher:
gap_pct: float = None, sales_delta: float = None,
revenue_loss_7d: float = 0.0,
recommended_price: Optional[float] = None,
match_type: str = "",
price_basis: str = "",
alert_tier: str = "",
match_score: float = 0.0,
competitor_product_id: str = "",
competitor_product_name: str = "",
):
"""發送語意化人工覆核請求"""
concern = _sanitize_text(concern, fallback=f"數據走勢違背常理,疑似缺貨或前台異常。")
@@ -1115,6 +1269,12 @@ class NemotronDispatcher:
gap_pct=gap_pct, sales_delta=sales_delta,
revenue_loss_7d=revenue_loss_7d,
recommended_price=recommended_price,
match_type=match_type,
price_basis=price_basis,
alert_tier=alert_tier,
match_score=match_score,
competitor_product_id=competitor_product_id,
competitor_product_name=competitor_product_name,
)
self._send_telegram(msg)
logger.info(
@@ -1128,7 +1288,12 @@ class NemotronDispatcher:
metadata={"confidence": confidence, "gap_pct": gap_pct, "sales_delta": sales_delta,
"momo_price": momo_price, "comp_price": comp_price,
"revenue_loss_7d": revenue_loss_7d,
"recommended_price": recommended_price},
"recommended_price": recommended_price,
"match_type": match_type,
"price_basis": price_basis,
"alert_tier": alert_tier,
"match_score": match_score,
"competitor_product_id": competitor_product_id},
)
def _exec_route_to_km(
@@ -1408,6 +1573,7 @@ class NemotronDispatcher:
sales_delta=t.sales_7d_delta_pct,
revenue_loss_7d=impact["revenue_loss_7d"],
recommended_price=impact["recommended_price"],
**_threat_match_metadata(t),
)
dispatched += 1
except Exception as e:
@@ -1584,6 +1750,23 @@ class NemotronDispatcher:
args["footprint"] = footprint_text
t = threat_map.get(args.get("sku"))
if tool_name == "trigger_price_alert" and t and not _can_direct_price_alert(t):
match_meta = _threat_match_metadata(t)
tool_name = "flag_for_human_review"
handler = TOOL_MAP[tool_name]
args = {
"sku": getattr(t, "sku", args.get("sku")),
"name": getattr(t, "name", args.get("name")),
"concern": (
"比對證據尚未達直接價格告警門檻;"
f"match_type={match_meta['match_type']}"
f"price_basis={match_meta['price_basis']}"
f"alert_tier={match_meta['alert_tier']}"
"請先覆核是否為同款、同包裝或需改用單位價。"
),
"confidence": max(float(getattr(t, "confidence", 0.5) or 0.5), 0.75),
"footprint": footprint_text,
}
if tool_name == "trigger_price_alert" and t:
args["momo_price"] = getattr(t, "momo_price", None)
args["comp_price"] = getattr(t, "pchome_price", None)
@@ -1592,6 +1775,7 @@ class NemotronDispatcher:
impact = _compute_business_impact(t)
args["revenue_loss_7d"] = impact["revenue_loss_7d"]
args["recommended_price"] = impact["recommended_price"]
args.update(_threat_match_metadata(t))
elif tool_name == "flag_for_human_review" and t:
args["momo_price"] = getattr(t, "momo_price", None)
args["comp_price"] = getattr(t, "pchome_price", None)
@@ -1600,6 +1784,7 @@ class NemotronDispatcher:
impact = _compute_business_impact(t)
args["revenue_loss_7d"] = impact["revenue_loss_7d"]
args["recommended_price"] = impact["recommended_price"]
args.update(_threat_match_metadata(t))
elif tool_name == "add_to_recommendation":
args["footprint_data"] = footprint_data
args["threat"] = t

View File

@@ -95,6 +95,29 @@ def test_competitor_feeder_persists_all_match_attempt_outcomes():
assert "idx_comp_match_attempts_sku_source_time" in migration
def test_match_diagnostics_payload_carries_professional_match_lanes():
from services.competitor_price_feeder import _match_diagnostics_payload, _extend_match_tags
from services.marketplace_product_matcher import score_marketplace_match
diagnostics = score_marketplace_match(
"理膚寶水 B5 全面修復霜 40ml x2 超值組",
"理膚寶水 全面修復霜 B5 40ml",
momo_price=1199,
competitor_price=679,
)
payload = _match_diagnostics_payload(diagnostics)
tags = _extend_match_tags([], diagnostics)
assert payload["match_type"] == "same_product_different_pack"
assert payload["price_basis"] == "unit_price"
assert payload["alert_tier"] == "unit_price_review"
assert "unit_comparable" in payload["evidence_flags"]
assert "match_type_same_product_different_pack" in tags
assert "price_basis_unit_price" in tags
assert "alert_tier_unit_price_review" in tags
def test_competitor_match_review_service_closes_human_review_loop():
service_source = (ROOT / "services/competitor_match_review_service.py").read_text(encoding="utf-8")
migration = (ROOT / "migrations/039_create_competitor_match_reviews.sql").read_text(encoding="utf-8")

View File

@@ -113,7 +113,15 @@ def test_hermes_batch_analyze_uses_ollama_service_and_logs_secondary(monkeypatch
'pchome_price': 100,
'sales_7d_prev': 1000,
'sales_7d_curr': 700,
'competitor_tags': [],
'competitor_tags': [
'identity_v2',
'match_type_same_product_different_pack',
'price_basis_unit_price',
'alert_tier_unit_price_review',
],
'competitor_match_score': 0.74,
'competitor_product_id': 'PCH-UNIT',
'competitor_product_name': '測試商品 2入組',
}]
svc = hermes_mod.HermesAnalystService()
@@ -121,6 +129,9 @@ def test_hermes_batch_analyze_uses_ollama_service_and_logs_secondary(monkeypatch
assert raw_threats[0]['sku'] == 'A1'
assert items[0]['gap_pct'] == 20.0
assert items[0]['match_type'] == 'same_product_different_pack'
assert items[0]['price_basis'] == 'unit_price'
assert items[0]['alert_tier'] == 'unit_price_review'
call_kwargs = fake_service.instances[0].generate_calls[0]
assert call_kwargs['system_prompt'] == svc.SYSTEM_PROMPT
assert call_kwargs['keep_alive'] == hermes_mod.HERMES_KEEP_ALIVE

View File

@@ -15,6 +15,10 @@ def test_marketplace_matcher_accepts_same_product_identity():
assert diagnostics.hard_veto is False
assert "brand_match" in diagnostics.tags
assert "spec_match" in diagnostics.tags
assert diagnostics.match_type == "exact"
assert diagnostics.price_basis == "total_price"
assert diagnostics.alert_tier == "price_alert_exact"
assert "match_type_exact" in diagnostics.tags
def test_marketplace_matcher_rejects_brand_conflict_even_when_volume_matches():
@@ -105,10 +109,29 @@ def test_marketplace_matcher_marks_bundle_single_as_unit_comparable_not_exact():
assert diagnostics.score < 0.76
assert diagnostics.hard_veto is True
assert diagnostics.comparison_mode == "unit_comparable"
assert diagnostics.match_type == "same_product_different_pack"
assert diagnostics.price_basis == "unit_price"
assert diagnostics.alert_tier == "unit_price_review"
assert "unit_comparable" in diagnostics.reasons
assert "comparison_unit_comparable" in diagnostics.tags
def test_marketplace_matcher_suppresses_same_line_variant_price_alert():
from services.marketplace_product_matcher import score_marketplace_match
diagnostics = score_marketplace_match(
"【Dashing Diva】時尚潮流美甲片 月影柔霧",
"Dashing Diva 時尚潮流美甲片 銀絲柔彩 30片",
momo_price=399,
competitor_price=399,
)
assert diagnostics.match_type == "no_match"
assert diagnostics.price_basis == "none"
assert diagnostics.alert_tier == "suppress"
assert "variant_descriptor_conflict" in diagnostics.reasons
def test_unit_price_comparison_builds_normalized_evidence():
from services.marketplace_product_matcher import build_unit_price_comparison

View File

@@ -56,6 +56,25 @@ def test_dispatch_falls_back_to_hermes_rules_without_nim_api_key(monkeypatch):
assert calls[0]["kind"] == "price_alert"
def test_dispatch_routes_non_exact_match_to_human_review(monkeypatch):
import services.nemoton_dispatcher_service as module
monkeypatch.setattr(module, "NIM_API_KEY", "")
dispatcher = module.NemotronDispatcher()
calls = _patch_execution_methods(monkeypatch, dispatcher)
threat = FakeThreat("SKU-UNIT", "測試品 2入組")
threat.match_type = "same_product_different_pack"
threat.price_basis = "unit_price"
threat.alert_tier = "unit_price_review"
threat.match_score = 0.74
result = dispatcher.dispatch([threat], hermes_stats={"duration_sec": 1})
assert result["dispatched"] == 1
assert calls[0]["kind"] == "human_review"
assert "unit_price_review" in calls[0]["kwargs"]["concern"]
def test_dispatch_falls_back_to_hermes_rules_on_nim_timeout(monkeypatch):
import requests
import services.nemoton_dispatcher_service as module