#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
End-to-end integration test for the AI competitive-pricing intelligence module.

Checklist (three gates):
    Gate 1: Feeder semantic extraction — PChome search + fuzzy matching + tag generation
    Gate 2: Hermes contextual awareness — JSON quality after competitor_tags injection
    Gate 3: NemoTron + Telegram — DB write + semantic message layout + compute footprint

Test strategy: an in-memory SQLite DB seeded with realistic fake data, so no
Docker/PostgreSQL is required. Set USE_POSTGRESQL=1 to run against a local
PostgreSQL instead (its tables must already exist via migrations).

External services can be skipped with environment flags: SKIP_NIM,
SKIP_TELEGRAM, SKIP_HERMES, SKIP_FEEDER. Every flag (including USE_POSTGRESQL)
accepts 1/true/yes/on, case-insensitive; anything else counts as "off".

Usage:
    cd /path/to/momo-pro-system
    python3 scripts/e2e_test_ai_pipeline.py

    # Skip real APIs (pure local-logic check)
    SKIP_NIM=1 SKIP_TELEGRAM=1 python3 scripts/e2e_test_ai_pipeline.py

    # Enable Hermes (requires 192.168.0.111 to be reachable)
    SKIP_NIM=1 python3 scripts/e2e_test_ai_pipeline.py
"""
import json
import logging
import os
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timedelta

# ── Make the project root importable no matter where the script is run from ──
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, PROJECT_ROOT)

from dotenv import load_dotenv

load_dotenv(os.path.join(PROJECT_ROOT, ".env"))

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("e2e")

# Values treated as "enabled" for boolean environment flags.
_TRUTHY = frozenset({"1", "true", "yes", "on"})


def _env_flag(name, default="0"):
    """Return True when environment variable *name* holds a truthy value.

    Accepts 1/true/yes/on (case-insensitive, surrounding whitespace ignored),
    so both ``USE_POSTGRESQL=1`` and ``USE_POSTGRESQL=true`` work.
    """
    return os.getenv(name, default).strip().lower() in _TRUTHY


SKIP_NIM = _env_flag("SKIP_NIM")
SKIP_TELEGRAM = _env_flag("SKIP_TELEGRAM")
SKIP_HERMES = _env_flag("SKIP_HERMES")
SKIP_FEEDER = _env_flag("SKIP_FEEDER")
USE_POSTGRESQL = _env_flag("USE_POSTGRESQL")

# PostgreSQL connection settings (read from env; defaults intended to mirror
# docker-compose.yml). POSTGRES_PASSWORD has no default and may be None.
PG_HOST = os.getenv("POSTGRES_HOST", "127.0.0.1")
PG_PORT = os.getenv("POSTGRES_PORT", "5432")
PG_USER = os.getenv("POSTGRES_USER", "momo")
PG_PASSWORD = os.getenv("POSTGRES_PASSWORD")
PG_DB = os.getenv("POSTGRES_DB", "momo_analytics")


# ═══════════════════════════════════════════════════════════
# Test infrastructure: in-memory SQLite + schema + fake data
# ═══════════════════════════════════════════════════════════

def _build_test_engine():
    """Create the SQLAlchemy engine the test runs against.

    - Default: in-memory SQLite; every table the pipeline touches is created
      here, so no external service is needed.
    - USE_POSTGRESQL enabled: connect to the local (Docker) PostgreSQL. Tables
      are expected to exist already (run the migrations first), so none are
      created.

    Returns:
        A SQLAlchemy ``Engine``.
    """
    from urllib.parse import quote

    from sqlalchemy import create_engine, text

    if USE_POSTGRESQL:
        # Percent-encode credentials so '@', ':' or '/' in them cannot corrupt
        # the URL, and omit the password when it is unset instead of embedding
        # the literal string "None".
        auth = quote(PG_USER, safe="")
        if PG_PASSWORD is not None:
            auth += ":" + quote(PG_PASSWORD, safe="")
        dsn = f"postgresql+psycopg2://{auth}@{PG_HOST}:{PG_PORT}/{PG_DB}"
        logger.info(f"[Engine] 使用 PostgreSQL: {PG_HOST}:{PG_PORT}/{PG_DB}")
        # PostgreSQL mode: the schema comes from migrations, return as-is.
        return create_engine(dsn, echo=False)

    # In-memory SQLite: build every table ourselves.
    engine = create_engine("sqlite:///:memory:", echo=False)
    with engine.begin() as conn:
        # products
        conn.execute(text("""
            CREATE TABLE products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                i_code TEXT UNIQUE NOT NULL,
                name TEXT NOT NULL,
                category TEXT,
                status TEXT DEFAULT 'ACTIVE',
                created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                updated_at TEXT DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # price_records
        conn.execute(text("""
            CREATE TABLE price_records (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_id INTEGER NOT NULL,
                price REAL NOT NULL,
                timestamp TEXT DEFAULT CURRENT_TIMESTAMP
            )
        """))
        # daily_sales_snapshot (dynamic table; column names follow the MOMO
        # Excel export headers)
        conn.execute(text("""
            CREATE TABLE daily_sales_snapshot (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                "商品ID" TEXT,
                "商品名稱" TEXT,
                "銷售金額" REAL,
                snapshot_date TEXT
            )
        """))
        # competitor_prices
        conn.execute(text("""
            CREATE TABLE competitor_prices (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sku TEXT NOT NULL,
                source TEXT NOT NULL DEFAULT 'pchome',
                price REAL NOT NULL,
                original_price REAL,
                discount_pct INTEGER,
                competitor_product_id TEXT,
                competitor_product_name TEXT,
                match_score REAL,
                tags TEXT DEFAULT '[]',
                crawled_at TEXT DEFAULT CURRENT_TIMESTAMP,
                expires_at TEXT,
                UNIQUE(sku, source)
            )
        """))
        # ai_price_recommendations
        conn.execute(text("""
            CREATE TABLE ai_price_recommendations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sku TEXT NOT NULL UNIQUE,
                name TEXT NOT NULL,
                reason TEXT,
                strategy TEXT DEFAULT 'promote',
                confidence REAL,
                momo_price REAL,
                pchome_price REAL,
                gap_pct REAL,
                sales_7d_delta REAL,
                model_footprint TEXT DEFAULT '{}',
                status TEXT DEFAULT 'pending',
                reviewed_by TEXT,
                reviewed_at TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                updated_at TEXT DEFAULT CURRENT_TIMESTAMP
            )
        """))
    return engine


def _seed_test_data(engine):
    """Insert the fake fixture data the three gates run against.

    Seeds three monitored products, one MOMO price each, today's and an
    8-days-ago sales snapshot per product (today's being lower), and one
    pre-filled PChome competitor row per product (as if the Feeder had run).

    ``products`` and ``competitor_prices`` inserts use ON CONFLICT DO NOTHING.
    ``price_records`` and ``daily_sales_snapshot`` are plain inserts, so
    re-running against a persistent PostgreSQL DB accumulates rows there.
    """
    from sqlalchemy import text

    # Take a single clock reading so "today" and "week_ago" cannot straddle
    # midnight between two datetime.now() calls.
    now = datetime.now()
    today = now.strftime("%Y-%m-%d")
    # 8 days back so these rows fall strictly before "today" and stand in for
    # the previous week's sales.
    week_ago = (now - timedelta(days=8)).strftime("%Y-%m-%d")
    expires = (now + timedelta(hours=6)).strftime("%Y-%m-%d %H:%M:%S")

    # Three monitored products: (i_code, name, category)
    products = [
        ("A003", "舒特膚AD乳液200ml", "美妝保養"),
        ("A001", "玻尿酸面膜10片裝", "美妝保養"),
        ("A009", "美白化妝水150ml", "美妝保養"),
    ]
    # Latest MOMO price per product: (i_code, price)
    prices = [
        ("A003", 1200.0),
        ("A001", 320.0),
        ("A009", 420.0),
    ]
    # Sales snapshots: (sku, name, amount, snapshot_date). Today's amounts are
    # 22-42% below the older snapshot, satisfying the ">10% drop" funnel.
    sales_data = [
        ("A003", "舒特膚AD乳液200ml", 65000, today),
        ("A001", "玻尿酸面膜10片裝", 58000, today),
        ("A009", "美白化妝水150ml", 78000, today),
        ("A003", "舒特膚AD乳液200ml", 100000, week_ago),
        ("A001", "玻尿酸面膜10片裝", 100000, week_ago),
        ("A009", "美白化妝水150ml", 100000, week_ago),
    ]
    # Pre-filled PChome competitor rows:
    # (sku, price, original_price, discount_pct, competitor id, competitor name,
    #  match_score, tags as JSON text)
    comp_data = [
        ("A003", 980.0, None, None, "DDABSD-A003FAKE", "舒特膚 AD 乳液 200ml", 0.82,
         '["on_sale","discount_10pct"]'),
        ("A001", 280.0, 320.0, 12, "DDABSD-A001FAKE", "玻尿酸保濕面膜10入", 0.76,
         '["discount_10pct"]'),
        ("A009", 350.0, 400.0, 12, "DDABSD-A009FAKE", "美白化妝水 150ml", 0.71,
         '["on_sale"]'),
    ]

    with engine.begin() as conn:
        conn.execute(text("""
            INSERT INTO products (i_code, name, category, status)
            VALUES (:i_code, :name, :cat, 'ACTIVE')
            ON CONFLICT (i_code) DO NOTHING
        """), [{"i_code": code, "name": name, "cat": cat} for code, name, cat in products])

        # product_id is resolved from i_code with a subquery.
        conn.execute(text("""
            INSERT INTO price_records (product_id, price, timestamp)
            SELECT id, :price, CURRENT_TIMESTAMP FROM products WHERE i_code = :i_code LIMIT 1
        """), [{"i_code": code, "price": price} for code, price in prices])

        conn.execute(text("""
            INSERT INTO daily_sales_snapshot ("商品ID", "商品名稱", "銷售金額", snapshot_date)
            VALUES (:sku, :name, :amount, :date)
        """), [
            {"sku": sku, "name": name, "amount": amount, "date": date}
            for sku, name, amount, date in sales_data
        ])

        conn.execute(text("""
            INSERT INTO competitor_prices
                (sku, source, price, original_price, discount_pct,
                 competitor_product_id, competitor_product_name,
                 match_score, tags, crawled_at, expires_at)
            VALUES
                (:sku, 'pchome', :price, :orig, :disc,
                 :comp_id, :comp_name, :score, :tags,
                 CURRENT_TIMESTAMP, :expires)
            ON CONFLICT(sku, source) DO NOTHING
        """), [
            {
                "sku": sku, "price": price, "orig": orig, "disc": disc,
                "comp_id": comp_id, "comp_name": comp_name,
                "score": score, "tags": tags, "expires": expires,
            }
            for sku, price, orig, disc, comp_id, comp_name, score, tags in comp_data
        ])

    logger.info("✅ 測試資料填充完成")


# ═══════════════════════════════════════════════════════════
# Gate 1: Feeder semantic extraction (CLI mode, no DB needed)
# ═══════════════════════════════════════════════════════════

def gate1_feeder():
    """Gate 1: search PChome live for two known SKUs and report match quality.

    For each SKU, searches PChome with the first 20 characters of its name,
    picks the best fuzzy match, extracts tags, and prints the result.

    Returns:
        True if at least one SKU matched with score >= MIN_MATCH_SCORE, or
        if the gate is skipped via SKIP_FEEDER; False otherwise.
    """
    print("\n" + "═" * 60)
    print("【關卡一】Feeder 語意萃取 — PChome 搜尋 + 比對 + tags")
    print("═" * 60)

    if SKIP_FEEDER:
        print("⏭️  SKIP_FEEDER=1,跳過(假設 competitor_prices 已由 seed 填入)")
        return True

    from services.pchome_crawler import PChomeCrawler
    from services.competitor_price_feeder import _find_best_match, _extract_tags, MIN_MATCH_SCORE

    test_skus = [
        {"sku": "A003", "name": "舒特膚AD乳液200ml"},
        {"sku": "A001", "name": "玻尿酸面膜10片裝"},
    ]

    crawler = PChomeCrawler(delay=0.8)
    ok_count = 0

    for item in test_skus:
        keyword = item["name"][:20]
        success, _, products = crawler.search_products(keyword, limit=10)

        if not success or not products:
            print(f"  ⚠️  [{item['sku']}] PChome 搜尋無結果(網路問題或無此商品)")
            continue

        result = _find_best_match(item["name"], products)
        if not result:
            print(f"  ⚠️  [{item['sku']}] 無法找到比對結果")
            continue

        best, score = result
        tags = _extract_tags(best)
        matched = score >= MIN_MATCH_SCORE

        print(f"  {'✅' if matched else '⚠️ 低分'} [{item['sku']}] {item['name'][:25]}")
        print(f"     PChome: {best.name[:40]}")
        print(f"     售價 ${best.price} | 分數 {score:.3f} | 標籤 {tags}")
        if matched:
            ok_count += 1

    print(f"\n  → 關卡一結果:{ok_count}/{len(test_skus)} 支成功比對")
    return ok_count > 0
═══════════════════════════════════════════════════════════ # 關卡二:Hermes 情境認知 # ═══════════════════════════════════════════════════════════ def gate2_hermes(engine): print("\n" + "═"*60) print("【關卡二】Hermes 情境認知 — competitor_tags 注入後 JSON 品質") print("═"*60) if SKIP_HERMES: print("⏭️ SKIP_HERMES=1,跳過") return _mock_threats() from services.hermes_analyst_service import HermesAnalystService # 用 SQLite 版的 fetch_candidates(需調整 SQL) # SQLite 不支援 PostgreSQL 特有語法,改為帶資料的直接查詢 service = HermesAnalystService(engine=engine) from sqlalchemy import text with engine.connect() as conn: rows = conn.execute(text(""" SELECT p.i_code AS sku, p.name, p.category, pr.price AS momo_price, (SELECT SUM(CAST("銷售金額" AS REAL)) FROM daily_sales_snapshot WHERE "商品ID" = p.i_code AND date(snapshot_date) = date('now')) AS sales_7d_curr, (SELECT SUM(CAST("銷售金額" AS REAL)) FROM daily_sales_snapshot WHERE "商品ID" = p.i_code AND date(snapshot_date) < date('now')) AS sales_7d_prev, cp.price AS pchome_price, cp.tags AS competitor_tags FROM products p JOIN price_records pr ON pr.product_id = p.id LEFT JOIN competitor_prices cp ON cp.sku = p.i_code AND cp.source = 'pchome' WHERE p.status = 'ACTIVE' """)).fetchall() candidates = [dict(r._mapping) for r in rows] logger.info(f"[關卡二] 候選商品 {len(candidates)} 筆") if not candidates: print(" ❌ 沒有候選商品,測試資料可能有誤") return [] # 顯示 candidates 驗證 for c in candidates: tags = c.get("competitor_tags") or "無" print(f" ✅ {c['sku']} | MOMO ${c['momo_price']} | PChome ${c.get('pchome_price','?')} " f"| tags={tags}") # 呼叫 Hermes start = time.time() raw = service._batch_analyze(candidates) duration = round(time.time() - start, 2) if not raw: print(" ⚠️ Hermes 回傳空結果(Ollama 未啟動或逾時)") return _mock_threats() print(f"\n ✅ Hermes 推理完成 ({duration}s),回傳 {len(raw)} 筆") for item in raw[:3]: tags_mention = "competitor_tags" in str(item.get("recommended_action", "")) print(f" [{item['sku']}] {item.get('risk','?')} | " f"confidence={item.get('confidence','?')} | " 
f"action={item.get('recommended_action','')[:50]}") if tags_mention: print(" 🎯 AI 洞察中包含競品標籤資訊") from services.hermes_analyst_service import PriceThreat threats = [ PriceThreat( sku=t["sku"], name=t["name"], category=t.get("category", ""), momo_price=t.get("momo_price", 0), pchome_price=t.get("pchome_price", 0), gap_pct=t.get("gap_pct", 0), sales_7d_delta_pct=t.get("sales_7d_delta_pct", 0), risk=t.get("risk", "LOW"), recommended_action=t.get("recommended_action", ""), confidence=t.get("confidence", 0.5), ) for t in raw ] return threats, {"duration_sec": duration, "tokens": 512} def _mock_threats(): """Hermes 跳過時的假威脅清單""" from services.hermes_analyst_service import PriceThreat threats = [ PriceThreat("A003", "舒特膚AD乳液200ml", "美妝保養", 1200, 980, 22.4, -35.0, "HIGH", "建議立即降價至 $1,000(競品有 on_sale 促銷標籤)", 0.85), PriceThreat("A001", "玻尿酸面膜10片裝", "美妝保養", 320, 280, 14.3, -42.0, "HIGH", "跟進降價至 $285(競品有 discount_10pct)", 0.78), PriceThreat("A009", "美白化妝水150ml", "美妝保養", 420, 350, 20.0, -22.0, "HIGH", "價格差距大,建議促銷或捆包", 0.45), ] return threats, {"duration_sec": 0, "tokens": 0} # ═══════════════════════════════════════════════════════════ # 關卡三:NemoTron + DB 寫入 + Telegram # ═══════════════════════════════════════════════════════════ def gate3_dispatcher(engine, threats, hermes_stats): print("\n" + "═"*60) print("【關卡三】NemoTron 決策 + DB 寫入 + Telegram 排版") print("═"*60) from services.nemoton_dispatcher_service import NemotronDispatcher # Mock Telegram:不真實發送,只印出訊息 class MockNotificationManager: def _send_telegram_messages(self, msgs): for msg in msgs: print("\n 📩 [Telegram 預覽]") print(" " + "\n ".join(msg.split("\n"))) nm = None if not SKIP_TELEGRAM else MockNotificationManager() if SKIP_NIM: print(" ⏭️ SKIP_NIM=1,使用 Fallback 路徑直接派發 HIGH 威脅") # 手動模擬 dispatch fallback 路徑 dispatcher = NemotronDispatcher(notification_manager=nm, engine=engine) from services.nemoton_dispatcher_service import ( _build_footprint_block, _build_footprint_json, NIM_DAILY_LIMIT ) fake_nim_stats = {"total_tokens": 
185, "quota_used": 1} fp_text = _build_footprint_block(hermes_stats, fake_nim_stats) fp_data = _build_footprint_json(hermes_stats, fake_nim_stats) dispatched = 0 for t in threats: if t.risk == "HIGH": dispatcher._exec_trigger_price_alert( t.sku, t.name, t.gap_pct, t.sales_7d_delta_pct, t.recommended_action, t.confidence, momo_price=t.momo_price, comp_price=t.pchome_price, footprint=fp_text, ) dispatched += 1 # 對 A009 測試 add_to_recommendation if t.sku == "A009": dispatcher._exec_add_to_recommendation( t.sku, t.name, "我方價格低於市場,近7天流量上升,建議提升曝光", t.confidence, footprint=fp_text, footprint_data=fp_data, threat=t, ) result = {"dispatched": dispatched, "skipped": 0, "errors": [], "nim_stats": fake_nim_stats} else: dispatcher = NemotronDispatcher(notification_manager=nm, engine=engine) result = dispatcher.dispatch(threats, hermes_stats=hermes_stats) print(f"\n → dispatched={result['dispatched']} | " f"skipped={result['skipped']} | errors={result['errors']}") # 驗證 DB 寫入 _verify_db(engine) return result def _verify_db(engine): """驗證 DB 寫入是否正確(關鍵:model_footprint 必須是真實 JSON,不是 {})""" from sqlalchemy import text print("\n 🔍 DB 寫入驗證:") with engine.connect() as conn: recs = conn.execute(text( "SELECT sku, name, confidence, model_footprint, status FROM ai_price_recommendations" )).fetchall() if not recs: print(" ⚠️ ai_price_recommendations 無資料(add_to_recommendation 未被觸發)") return all_ok = True for r in recs: fp = r._mapping.get("model_footprint", "{}") try: fp_dict = json.loads(fp) if isinstance(fp, str) else fp has_analyst = "analyst" in fp_dict has_dispatcher = "dispatcher" in fp_dict fp_ok = has_analyst or has_dispatcher except Exception: fp_ok = False status = "✅" if fp_ok else "❌ model_footprint 為空 {}" print(f" {status} [{r._mapping['sku']}] " f"confidence={r._mapping['confidence']} " f"status={r._mapping['status']} " f"footprint_keys={list(json.loads(fp).keys()) if fp_ok else '[]'}") if not fp_ok: all_ok = False if all_ok: print(" ✅ 所有推薦記錄的 model_footprint 均已正確寫入 DB") else: 
print(" ❌ 部分記錄 model_footprint 為空,請檢查 footprint_data 傳遞路徑") # ═══════════════════════════════════════════════════════════ # 主程式 # ═══════════════════════════════════════════════════════════ def main(): print("=" * 60) print("🚀 AI 競價情報模組 E2E 整合測試") print(f" 模式:NIM={'跳過' if SKIP_NIM else '真實'} | " f"Hermes={'跳過' if SKIP_HERMES else '真實'} | " f"Telegram={'Mock' if SKIP_TELEGRAM else '真實'} | " f"Feeder={'跳過' if SKIP_FEEDER else '真實'}") print("=" * 60) # 建立測試 DB engine = _build_test_engine() _seed_test_data(engine) # 關卡一 gate1_feeder() # 關卡二 result2 = gate2_hermes(engine) if isinstance(result2, tuple): threats, hermes_stats = result2 else: threats, hermes_stats = result2, {"duration_sec": 0, "tokens": 0} # 關卡三 gate3_dispatcher(engine, threats, hermes_stats) print("\n" + "═"*60) print("✅ E2E 測試完成") print("═"*60) print() print("下一步:") print(" 1. 啟動 Docker Desktop → docker compose -f docker-compose.yml up -d postgres") print(" 2. psql -U momo -h 127.0.0.1 -d momo_analytics -f migrations/003_ai_price_recommendations.sql") print(" 3. psql -U momo -h 127.0.0.1 -d momo_analytics -f migrations/004_competitor_prices.sql") print(" 4. USE_POSTGRESQL=true python3 scripts/e2e_test_ai_pipeline.py") if __name__ == "__main__": main()