diff --git a/CONSTITUTION.md b/CONSTITUTION.md index 3b698fb..cd73677 100644 --- a/CONSTITUTION.md +++ b/CONSTITUTION.md @@ -2,7 +2,7 @@ > 本文件定義專案開發的核心準則與不可違反的規範 > **建立日期**: 2026-01-12 -> **當前版本**: V10.44 (Persist PChome competitor price history) +> **當前版本**: V10.45 (AI product pick list and improved PChome matching) > **最後更新**: 2026-05-01 --- diff --git a/app.py b/app.py index df6b906..746c99f 100644 --- a/app.py +++ b/app.py @@ -95,8 +95,8 @@ except Exception as e: sys_log.error(f"無法檢測磁碟空間: {e}") # 🚩 系統版本定義 (備份與顯示用) -# 🚩 2026-05-01 V10.44: Persist PChome competitor price history -SYSTEM_VERSION = "V10.44" +# 🚩 2026-05-01 V10.45: AI product pick list and improved PChome matching +SYSTEM_VERSION = "V10.45" # ========================================== # 🔒 SQL Injection 防護函數 diff --git a/docs/AI_INTELLIGENCE_MODULE_SOT.md b/docs/AI_INTELLIGENCE_MODULE_SOT.md index 065fb5d..a504e6a 100644 --- a/docs/AI_INTELLIGENCE_MODULE_SOT.md +++ b/docs/AI_INTELLIGENCE_MODULE_SOT.md @@ -1,6 +1,6 @@ # MOMO PRO — AI 競價情報模組 Single Source of Truth -> **最後更新**: 2026-04-30 (台北時間) +> **最後更新**: 2026-05-01 (台北時間) > **狀態**: 🟢 四 AI Agent 自動化閉環已落地 — EventRouter / AutoHeal / OpenClaw Memory / ElephantAlpha bridge / Prometheus metrics / Smoke Dashboard / Smoke Trend Management / Telegram Summary / Grafana provisioning / Prometheus scrape / CD Gunicorn 掛載具測試覆蓋 > **適用版本**: V10.22 Legacy 5888 入口清理版 @@ -26,6 +26,16 @@ SQL漏斗(~300筆) 任務: 跨 Agent orchestration、HITL、AutoHeal bridge、受控 log scan ``` +### 1.1 PChome 挑品 Agent(2026-05-01) + +`services/ai_product_pick_agent.py` 新增 PChome 銷售用挑品 Agent: + +- 只讀真實資料表:`products`、`price_records`、`competitor_prices`、`competitor_price_history`,若 `daily_sales_snapshot` 可用則納入近 7 天銷售額與數量。 +- 將 PChome 比 MOMO 有價格優勢、比對信心足夠、且有歷史快照或銷售動能的品項寫入 `ai_price_recommendations`。 +- 寫入策略使用 `strategy='product_pick'`,保留在既有 AI 決策表,不新增假頁面或暫存 JSON。 +- 後台入口:`POST /api/ai/product-picks/generate`,`/ai_intelligence` 可手動產生清單。 +- 配對來源仍以 PChome crawler 真實搜尋結果為準;無競品資料時不生成挑品。 + | 角色 | 模型 | 主機 | 成本 | 
每日限額 | |------|------|------|------|---------| | Hermes 分析師 | hermes3:latest / embedding model | 192.168.0.111:11434 或 188 Ollama | 零 | 無限 | diff --git a/routes/ai_routes.py b/routes/ai_routes.py index db280b6..4345801 100644 --- a/routes/ai_routes.py +++ b/routes/ai_routes.py @@ -1485,6 +1485,8 @@ def api_icaim_dashboard(): WHERE expires_at > NOW() AND source = 'pchome') AS valid_competitor_prices, (SELECT COUNT(*) FROM high_risk) AS high_risk_count, (SELECT COUNT(*) FROM ai_price_recommendations) AS total_ai_recs, + (SELECT COUNT(*) FROM ai_price_recommendations + WHERE strategy = 'product_pick' AND status = 'pending') AS product_pick_count, (SELECT MAX(crawled_at) FROM competitor_prices WHERE source='pchome') AS last_feeder_run """) @@ -1598,6 +1600,11 @@ def api_icaim_dashboard(): 'valid_competitor_prices': int(stats_row.valid_competitor_prices or 0), 'high_risk_count': int(stats_row.high_risk_count or 0), 'total_ai_recs': int(stats_row.total_ai_recs or 0), + 'product_pick_count': int(stats_row.product_pick_count or 0), + 'match_rate': round( + int(stats_row.valid_competitor_prices or 0) / max(int(stats_row.total_skus or 0), 1) * 100, + 1 + ), 'last_feeder_run': last_feeder, }, 'competitors': competitors, @@ -1609,6 +1616,37 @@ def api_icaim_dashboard(): return jsonify({'success': False, 'error': str(e)}), 500 +@ai_bp.route('/api/ai/product-picks/generate', methods=['POST']) +@login_required +def api_generate_product_picks(): + """手動產生 PChome 銷售用 AI 建議挑品清單,結果寫入 DB。""" + try: + from config import DATABASE_PATH + from sqlalchemy import create_engine + from services.ai_product_pick_agent import generate_product_pick_list + + payload = request.get_json(silent=True) or {} + limit = int(payload.get('limit', 30)) + limit = max(5, min(limit, 80)) + + engine = create_engine(DATABASE_PATH) + result = generate_product_pick_list(engine, limit=limit) + + return jsonify({ + 'success': True, + 'message': f'AI 挑品清單已產生:寫入 {result.written} 筆,候選 {result.candidates} 筆', + 'data': 
{ + 'candidates': result.candidates, + 'written': result.written, + 'generated_at': result.generated_at, + 'picks': result.picks[:20], + } + }) + except Exception as e: + logger.error(f"[ProductPickAgent] 產生挑品清單失敗: {e}") + return jsonify({'success': False, 'error': str(e)}), 500 + + @ai_bp.route('/api/ai/icaim/trigger', methods=['POST']) @login_required def api_icaim_trigger(): diff --git a/services/ai_product_pick_agent.py b/services/ai_product_pick_agent.py new file mode 100644 index 0000000..dc97dd3 --- /dev/null +++ b/services/ai_product_pick_agent.py @@ -0,0 +1,335 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +AI 建議挑品 Agent + +以真實 DB 資料建立可操作的 PChome 銷售挑品清單: +- MOMO 最新價格 +- PChome 最新競品價格與商品 ID +- PChome 歷史快照 +- 近 7 天銷售資料(若 daily_sales_snapshot 可用) + +此 Agent 不補假資料;資料不足的欄位只降低分數或略過。 +""" + +import json +import logging +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Dict, List + +logger = logging.getLogger(__name__) + + +@dataclass +class ProductPickResult: + candidates: int + written: int + picks: List[Dict[str, Any]] + generated_at: str + + +def _to_float(value, default=0.0) -> float: + if value is None: + return default + try: + return float(value) + except (TypeError, ValueError): + return default + + +def _load_json_tags(value) -> List[str]: + if not value: + return [] + if isinstance(value, list): + return value + try: + parsed = json.loads(value) + return parsed if isinstance(parsed, list) else [] + except Exception: + return [] + + +def _has_daily_sales_snapshot(conn) -> bool: + from sqlalchemy import text + + try: + if conn.dialect.name == "postgresql": + row = conn.execute(text("SELECT to_regclass('daily_sales_snapshot') AS table_name")).mappings().first() + return bool(row and row.get("table_name")) + row = conn.execute(text(""" + SELECT name FROM sqlite_master + WHERE type='table' AND name='daily_sales_snapshot' + """)).first() + return bool(row) + except Exception: + return False + + +def 
def _candidate_sql(*, with_history: bool, with_sales: bool) -> str:
    """Assemble the candidate SELECT; disabled parts become constant columns."""
    if with_history:
        history_cte = """,
        history_stats AS (
            SELECT
                sku,
                source,
                COUNT(*) AS history_points,
                MIN(price) AS min_pchome_price,
                MAX(price) AS max_pchome_price
            FROM competitor_price_history
            WHERE source = 'pchome'
              AND crawled_at >= CURRENT_TIMESTAMP - INTERVAL '30 days'
            GROUP BY sku, source
        )"""
        history_select = """
            COALESCE(hs.history_points, 0) AS history_points,
            hs.min_pchome_price,
            hs.max_pchome_price"""
        history_join = """
        LEFT JOIN history_stats hs
            ON hs.sku = lm.sku
           AND hs.source = cp.source"""
    else:
        history_cte = ""
        history_select = """
            0 AS history_points,
            NULL AS min_pchome_price,
            NULL AS max_pchome_price"""
        history_join = ""

    if with_sales:
        sales_select = """
            COALESCE(sales.sales_7d, 0) AS sales_7d,
            COALESCE(sales.sales_prev_7d, 0) AS sales_prev_7d,
            COALESCE(sales.qty_7d, 0) AS qty_7d"""
        sales_join = """
        LEFT JOIN (
            SELECT
                "商品ID" AS sku,
                SUM(CASE WHEN snapshot_date >= CURRENT_DATE - 7
                         THEN COALESCE("銷售金額"::numeric, 0) ELSE 0 END) AS sales_7d,
                SUM(CASE WHEN snapshot_date >= CURRENT_DATE - 14
                          AND snapshot_date < CURRENT_DATE - 7
                         THEN COALESCE("銷售金額"::numeric, 0) ELSE 0 END) AS sales_prev_7d,
                SUM(CASE WHEN snapshot_date >= CURRENT_DATE - 7
                         THEN COALESCE("數量"::numeric, 0) ELSE 0 END) AS qty_7d
            FROM daily_sales_snapshot
            GROUP BY "商品ID"
        ) sales ON sales.sku = lm.sku"""
    else:
        sales_select = """
            0 AS sales_7d,
            0 AS sales_prev_7d,
            0 AS qty_7d"""
        sales_join = ""

    return f"""
        WITH latest_momo AS (
            SELECT
                p.id AS product_id,
                p.i_code AS sku,
                p.name,
                p.url,
                p.category,
                pr.price AS momo_price,
                ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC) AS rn
            FROM products p
            JOIN price_records pr ON pr.product_id = p.id
            WHERE p.status = 'ACTIVE'
        ){history_cte}
        SELECT
            lm.product_id,
            lm.sku,
            lm.name,
            lm.url,
            lm.category,
            lm.momo_price,
            cp.price AS pchome_price,
            cp.original_price,
            cp.discount_pct,
            cp.competitor_product_id,
            cp.competitor_product_name,
            cp.match_score,
            cp.tags,
            cp.crawled_at,{history_select},{sales_select}
        FROM latest_momo lm
        JOIN competitor_prices cp
            ON cp.sku = lm.sku
           AND cp.source = 'pchome'
           AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
           AND cp.match_score >= 0.42{history_join}{sales_join}
        WHERE lm.rn = 1
        ORDER BY cp.match_score DESC, cp.crawled_at DESC
        LIMIT :limit
    """


def _fetch_candidates(conn, limit: int) -> List[Dict[str, Any]]:
    """Load pick candidates: the latest MOMO price joined to a live PChome match.

    The primary query also aggregates 30-day PChome price history and, when
    ``daily_sales_snapshot`` exists, 7-day sales figures. If it fails, a
    reduced query without history and sales is used, returning the same
    columns filled with constants.

    Args:
        conn: SQLAlchemy connection; callers in this module pass one opened
            with ``engine.begin()``.
        limit: Requested pick count. ``max(limit * 6, 100)`` rows are fetched
            so that scoring has a wider pool to choose from.

    Returns:
        One plain dict per candidate row.
    """
    from sqlalchemy import text
    from sqlalchemy.exc import SQLAlchemyError

    params = {"limit": max(limit * 6, 100)}
    primary = text(_candidate_sql(
        with_history=True,
        with_sales=_has_daily_sales_snapshot(conn),
    ))

    try:
        # Run inside a SAVEPOINT: on PostgreSQL a failed statement aborts the
        # enclosing transaction, which would make the fallback query (and all
        # later writes on this connection) fail as well.
        with conn.begin_nested():
            rows = conn.execute(primary, params).mappings().all()
        return [dict(row) for row in rows]
    except SQLAlchemyError as exc:
        logger.warning("[ProductPickAgent] sales-aware query failed, fallback without sales: %s", exc)

    fallback = text(_candidate_sql(with_history=False, with_sales=False))
    return [dict(row) for row in conn.execute(fallback, params).mappings().all()]
def _write_pick(conn, pick: Dict[str, Any]) -> None:
    """Upsert one scored pick into ``ai_price_recommendations``.

    The row is keyed by ``sku``. On conflict the existing row is refreshed
    and reset to ``strategy='product_pick'`` / ``status='pending'``.

    Args:
        conn: SQLAlchemy connection used for the write.
        pick: Scored candidate; must contain ``sku``, ``name``, ``reason``,
            ``confidence``, ``momo_price``, ``pchome_price``, ``gap_pct``,
            ``sales_7d_delta`` and ``pick_score``.
    """
    from sqlalchemy import text

    # Provenance stored with the recommendation: which agent produced it,
    # from which tables, and which PChome product it was matched against.
    footprint = {
        "agent": {
            "name": "PChomeProductPickAgent",
            "version": "v1",
            "generated_at": datetime.now().isoformat(timespec="seconds"),
            "inputs": [
                "products",
                "price_records",
                "competitor_prices",
                "competitor_price_history",
                "daily_sales_snapshot",
            ],
            "score": pick["pick_score"],
        },
        "competitor": {
            "source": "pchome",
            "product_id": pick.get("competitor_product_id"),
            "product_name": pick.get("competitor_product_name"),
            "match_score": _to_float(pick.get("match_score")),
        },
    }

    params = {
        "sku": pick["sku"],
        "name": pick["name"],
        "reason": pick["reason"],
        "confidence": pick["confidence"],
        "momo_price": pick["momo_price"],
        "pchome_price": pick["pchome_price"],
        "gap_pct": pick["gap_pct"],
        "sales_7d_delta": pick["sales_7d_delta"],
        "footprint": json.dumps(footprint, ensure_ascii=False),
    }

    # NOTE(review): the conflict target is sku alone, so an existing
    # recommendation of a different strategy for the same SKU is overwritten
    # - confirm that this is intended.
    conn.execute(text("""
        INSERT INTO ai_price_recommendations
            (sku, name, reason, strategy, confidence,
             momo_price, pchome_price, gap_pct, sales_7d_delta,
             model_footprint, status, created_at, updated_at)
        VALUES
            (:sku, :name, :reason, 'product_pick', :confidence,
             :momo_price, :pchome_price, :gap_pct, :sales_7d_delta,
             :footprint, 'pending', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
        ON CONFLICT (sku) DO UPDATE
        SET name = EXCLUDED.name,
            reason = EXCLUDED.reason,
            strategy = 'product_pick',
            confidence = EXCLUDED.confidence,
            momo_price = EXCLUDED.momo_price,
            pchome_price = EXCLUDED.pchome_price,
            gap_pct = EXCLUDED.gap_pct,
            sales_7d_delta = EXCLUDED.sales_7d_delta,
            model_footprint = EXCLUDED.model_footprint,
            status = 'pending',
            updated_at = CURRENT_TIMESTAMP
    """), params)
candidates=len(rows), + written=len(picks), + picks=picks, + generated_at=generated_at, + ) diff --git a/services/competitor_price_feeder.py b/services/competitor_price_feeder.py index a10b656..ac1633f 100644 --- a/services/competitor_price_feeder.py +++ b/services/competitor_price_feeder.py @@ -25,6 +25,7 @@ import json import logging +import re import time from dataclasses import dataclass from datetime import datetime, timedelta, timezone @@ -33,8 +34,9 @@ from typing import Optional logger = logging.getLogger(__name__) # ── 比對參數 ───────────────────────────────────────── -MIN_MATCH_SCORE = 0.45 # 低於此分數不寫入(避免張冠李戴) -SEARCH_LIMIT = 10 # 每個 SKU 搜尋 PChome 前 N 筆 +MIN_MATCH_SCORE = 0.42 # 低於此分數不寫入(避免張冠李戴) +SEARCH_LIMIT = 20 # 每個搜尋詞取 PChome 前 N 筆 +MAX_SEARCH_TERMS = 4 # 每個 MOMO 商品最多嘗試幾組搜尋詞 BATCH_SIZE = 30 # 每批 DB 寫入筆數 RATE_DELAY = 0.8 # 每次 PChome 請求間隔(秒) TTL_HOURS = 6 # competitor_prices 快取有效期 @@ -95,6 +97,58 @@ def _extract_tags(pchome_product) -> list: return tags +def _clean_search_text(value: str) -> str: + value = re.sub(r'[((][^))]*[))]', ' ', value or '') + value = re.sub(r'[【\[].*?[】\]]', ' ', value) + value = re.sub(r'[^\w\u4e00-\u9fff]+', ' ', value) + return re.sub(r'\s+', ' ', value).strip() + + +def _dedupe_terms(terms: list) -> list: + result = [] + seen = set() + for term in terms: + cleaned = _clean_search_text(term) + if len(cleaned) < 2: + continue + key = cleaned.lower() + if key in seen: + continue + seen.add(key) + result.append(cleaned[:36]) + if len(result) >= MAX_SEARCH_TERMS: + break + return result + + +def _build_search_keywords(momo_name: str) -> list: + """ + 用多組真實商品名線索搜尋 PChome,提高命中率,但仍交給相似度門檻把關。 + """ + cleaned = _clean_search_text(momo_name) + terms = [cleaned[:28], cleaned[:18]] + + try: + from services.price_comparison import ProductNameParser, BRAND_ALIASES + parser = ProductNameParser() + parsed = parser.parse(momo_name, "momo", 0, "", "") + if parsed.brand: + brand_terms = BRAND_ALIASES.get(parsed.brand, [parsed.brand]) + brand_label 
def _search_pchome_candidates(crawler, momo_name: str) -> list:
    """Collect PChome candidates for one MOMO product across several search terms.

    Each term from ``_build_search_keywords`` is sent to
    ``crawler.search_products``; results are merged in arrival order and
    de-duplicated by ``product_id``.

    Args:
        crawler: Object exposing ``search_products(keyword, limit=...)`` that
            returns a 3-tuple whose first item is a success flag and whose
            third item is the list of products.
        momo_name: MOMO product name used to derive the search terms.

    Returns:
        Unique products from all successful searches (possibly empty).
    """
    candidates = []
    seen_ids = set()
    for attempt, keyword in enumerate(_build_search_keywords(momo_name)):
        if attempt:
            # Several requests are now issued per product; keep the
            # module-wide per-request interval between consecutive searches.
            time.sleep(RATE_DELAY)
        ok, _, products = crawler.search_products(keyword, limit=SEARCH_LIMIT)
        if not ok or not products:
            continue
        for product in products:
            if product.product_id in seen_ids:
                continue
            seen_ids.add(product.product_id)
            candidates.append(product)
    return candidates