diff --git a/docs/AI_INTELLIGENCE_MODULE_SOT.md b/docs/AI_INTELLIGENCE_MODULE_SOT.md index 3319a88..dbcee29 100644 --- a/docs/AI_INTELLIGENCE_MODULE_SOT.md +++ b/docs/AI_INTELLIGENCE_MODULE_SOT.md @@ -341,6 +341,7 @@ LEFT JOIN competitor_prices cp - `services/competitor_intel_repository.py` 是下游頁面、圖表、簡報的共用查詢出口;新增消費端不得各自硬寫不同 match threshold。 - competitor PPT 不可只輸出 matched rows 造成覆蓋率假象;`fetch_competitor_comparison_results()` 必須用 `LEFT JOIN valid_competitor` 保留高營收/高價但尚未有效配對的 MOMO 商品,並帶出 `match_status`、`candidate_count`、`best_match_score` 與 `match_diagnostic`,讓簡報與 AI 文案明確區分「高信心比對」與「待補身份/價格」。 - `services/competitor_identity_revalidator.py` 可對既有 `competitor_prices` legacy row 離線重跑 `identity_v2`:只有新版 matcher 分數 `>= 0.76` 且無 hard veto 才補 `identity_v2` / `legacy_revalidated` tags;預設不刷新 `expires_at`,避免過期價格進入決策。 +- `CompetitorPriceFeeder.run_expired_identity_refresh()` 會優先刷新已通過 `identity_v2` 但 TTL 過期的 PChome row:直接用既有 `competitor_product_id` 批次呼叫 PChome 商品 API,再用新版 matcher 重新驗證名稱/規格/價格 sanity,通過後寫回 `competitor_prices` 與 `competitor_price_history`。這條路徑提升新鮮價格覆蓋率,但不降低 match threshold,也不讓過期價格直接進入決策。 - Dashboard 必須把「待比對」拆成可診斷狀態:`價格過期待刷新`、`舊版配對待重驗`、`低分配對待審`、`身份否決`、`找不到同款`、`抓取異常`、`尚未搜尋`。不可再用單一「待比對」掩蓋資料品質原因。 ### 執行方式 diff --git a/scheduler.py b/scheduler.py index d189578..15579cb 100644 --- a/scheduler.py +++ b/scheduler.py @@ -2208,7 +2208,9 @@ def run_pchome_match_backfill_task(): include_expired=True, write_attempts=True, ) - feeder_result = CompetitorPriceFeeder(engine=engine).run_unmatched_priority(limit=120) + feeder = CompetitorPriceFeeder(engine=engine) + refresh_result = feeder.run_expired_identity_refresh(limit=240) + feeder_result = feeder.run_unmatched_priority(limit=120) pick_result = generate_product_pick_list(engine, limit=50) clear_dashboard_cache() clear_competitor_intel_cache() @@ -2225,6 +2227,11 @@ def run_pchome_match_backfill_task(): "identity_revalidated_expired": revalidation_result.promoted_expired, "identity_revalidation_rejected_low": revalidation_result.rejected_low_score, "identity_revalidation_rejected_veto": revalidation_result.rejected_veto, + "expired_identity_refresh_total": refresh_result.total_skus, + "expired_identity_refresh_matched": refresh_result.matched, + "expired_identity_refresh_no_result": refresh_result.skipped_no_result, + "expired_identity_refresh_low_score": refresh_result.skipped_low_score, + "expired_identity_refresh_errors": refresh_result.errors, "pick_candidates": pick_result.candidates, "pick_written": pick_result.written, "status": "Success", @@ -2232,11 +2239,12 @@ def run_pchome_match_backfill_task(): logging.info( f"[Scheduler] [PChomeBackfill] ✅ 完成 | " f"revalidated={revalidation_result.promoted_fresh}+{revalidation_result.promoted_expired} " + f"refreshed={refresh_result.matched}/{refresh_result.total_skus} " f"matched={feeder_result.matched}/{feeder_result.total_skus} " f"history_written={feeder_result.history_written} " f"pick_written={pick_result.written} " - f"errors={feeder_result.errors} " - f"耗時={feeder_result.duration_sec}s" + f"errors={feeder_result.errors + refresh_result.errors} " + f"耗時={feeder_result.duration_sec + refresh_result.duration_sec}s" ) _save_stats('pchome_match_backfill', stats) diff --git a/services/competitor_price_feeder.py b/services/competitor_price_feeder.py index a2cc327..6e417a9 100644 --- a/services/competitor_price_feeder.py +++ b/services/competitor_price_feeder.py @@ -155,6 +155,11 @@ def _format_match_diagnostics(diagnostics) -> str: ) +def _product_id_key(product_id: str) -> str: + """Normalize PChome IDs for comparing cached IDs with API-returned IDs.""" + return re.sub(r"[^A-Z0-9]", "", str(product_id or "").upper()) + + def _find_best_match_detail( momo_name: str, pchome_products: list, @@ -529,6 +534,59 @@ class CompetitorPriceFeeder: ).fetchall() return [dict(r._mapping) for r in rows] + def _fetch_expired_identity_skus(self, limit: int = 120) -> list: + """ + 取得 identity_v2 已確認、但 PChome 價格快取過期的商品。 + 這些商品不需重新 keyword search,先用既有 PChome product_id 批次刷新價格。 + """ + if self.engine is None: + raise RuntimeError("需要注入 SQLAlchemy engine") + + from sqlalchemy import text + sql = text(""" + WITH latest_momo AS ( + SELECT + p.id AS product_id, + p.i_code AS sku, + p.name, + p.category, + pr.price AS momo_price, + ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC) AS rn + FROM products p + JOIN price_records pr ON pr.product_id = p.id + WHERE p.status = 'ACTIVE' + ) + SELECT + lm.product_id, + lm.sku, + lm.name, + lm.category, + lm.momo_price, + cp.competitor_product_id, + cp.competitor_product_name, + cp.match_score, + cp.expires_at + FROM latest_momo lm + JOIN competitor_prices cp + ON cp.sku = lm.sku + AND cp.source = 'pchome' + AND cp.competitor_product_id IS NOT NULL + AND cp.competitor_product_id <> '' + AND cp.expires_at IS NOT NULL + AND cp.expires_at <= CURRENT_TIMESTAMP + AND COALESCE(cp.match_score, 0) >= :match_score_floor + AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2' + WHERE lm.rn = 1 + ORDER BY cp.expires_at ASC, lm.momo_price DESC NULLS LAST, lm.sku + LIMIT :limit + """) + with self.engine.connect() as conn: + rows = conn.execute( + sql, + {"limit": max(1, min(int(limit), 500)), "match_score_floor": MIN_MATCH_SCORE}, + ).fetchall() + return [dict(r._mapping) for r in rows] + def _upsert_competitor_price( self, sku: str, @@ -834,6 +892,195 @@ class CompetitorPriceFeeder: attempts_written=attempts_written, ) + def _run_known_identity_refresh_items( + self, + skus: list, + source: str = "pchome", + label: str = "已確認身份價格刷新", + ) -> FeederResult: + start = time.time() + + if source != "pchome": + logger.warning(f"[Feeder] 尚未支援 source={source},跳過") + return FeederResult(0, 0, 0, 0, 0, 0.0) + + if not skus: + return FeederResult(0, 0, 0, 0, 0, 0.0) + + from services.pchome_crawler import PChomeCrawler + crawler = PChomeCrawler(timeout=30, delay=RATE_DELAY) + + requested_ids = [ + str(item.get("competitor_product_id") or "").strip() + for item in skus + if str(item.get("competitor_product_id") or "").strip() + ] + ok, message, products = crawler.fetch_product_details(requested_ids, batch_size=20) + product_map = {_product_id_key(product.product_id): product for product in products} if ok else {} + logger.info( + f"[Feeder] {label} product_id 批次查詢 | requested={len(requested_ids)} " + f"returned={len(product_map)} msg={message}" + ) + + matched = 0 + skipped_no = 0 + skipped_low = 0 + errors = 0 + history_written = 0 + attempts_written = 0 + + for item in skus: + sku = item["sku"] + momo_name = item["name"] + momo_product_id = item.get("product_id") + momo_price = item.get("momo_price") + competitor_product_id = str(item.get("competitor_product_id") or "").strip() + search_terms = [f"known_product_id:{competitor_product_id}"] if competitor_product_id else [] + + try: + product = product_map.get(_product_id_key(competitor_product_id)) + if not product: + self._record_match_attempt( + sku, + momo_name, + momo_product_id=momo_product_id, + momo_price=momo_price, + search_terms=search_terms, + candidate_count=0, + attempt_status="refresh_no_result", + error_message=f"PChome product_id not returned: {competitor_product_id}", + source=source, + ) + skipped_no += 1 + attempts_written += 1 + continue + + result = _find_best_match_detail(momo_name, [product], momo_price=momo_price) + if not result: + self._record_match_attempt( + sku, + momo_name, + momo_product_id=momo_product_id, + momo_price=momo_price, + search_terms=search_terms, + candidate_count=1, + attempt_status="refresh_no_match", + source=source, + ) + skipped_no += 1 + attempts_written += 1 + continue + + best_product, score, diagnostics = result + if score < MIN_MATCH_SCORE: + self._record_match_attempt( + sku, + momo_name, + momo_product_id=momo_product_id, + momo_price=momo_price, + search_terms=search_terms, + candidate_count=1, + attempt_status="refresh_low_score", + best_product=best_product, + best_score=score, + error_message=_format_match_diagnostics(diagnostics), + source=source, + ) + skipped_low += 1 + attempts_written += 1 + continue + + tags = _extract_tags(best_product) + tags.extend(getattr(diagnostics, "tags", [])) + for reason in getattr(diagnostics, "reasons", ()) or (): + tags.append(f"match_{reason}") + tags.append("refresh_known_identity") + tags = list(dict.fromkeys(tags)) + + should_write, write_reason = self._should_upsert_competitor_price( + sku, + best_product, + score, + source=source, + ) + if not should_write: + self._record_match_attempt( + sku, + momo_name, + momo_product_id=momo_product_id, + momo_price=momo_price, + search_terms=search_terms, + candidate_count=1, + attempt_status="refresh_needs_review", + best_product=best_product, + best_score=score, + error_message=f"{write_reason}; {_format_match_diagnostics(diagnostics)}", + source=source, + ) + skipped_low += 1 + attempts_written += 1 + continue + + tags.append(write_reason) + self._upsert_competitor_price( + sku, + best_product, + score, + tags, + momo_product_id=momo_product_id, + momo_price=momo_price, + source=source, + ) + self._record_match_attempt( + sku, + momo_name, + momo_product_id=momo_product_id, + momo_price=momo_price, + search_terms=search_terms, + candidate_count=1, + attempt_status="matched", + best_product=best_product, + best_score=score, + source=source, + ) + matched += 1 + history_written += 1 + attempts_written += 1 + except Exception as e: + logger.error(f"[Feeder] {sku} 已確認身份刷新失敗: {e}") + try: + self._record_match_attempt( + sku, + momo_name, + momo_product_id=momo_product_id, + momo_price=momo_price, + search_terms=search_terms, + attempt_status="refresh_error", + error_message=str(e), + source=source, + ) + attempts_written += 1 + except Exception as attempt_error: + logger.warning(f"[Feeder] {sku} 刷新嘗試紀錄寫入失敗: {attempt_error}") + errors += 1 + + duration = round(time.time() - start, 2) + logger.info( + f"[Feeder] {label} 完成 matched={matched}/{len(skus)} " + f"skip_no={skipped_no} skip_low={skipped_low} errors={errors} " + f"history_written={history_written} attempts_written={attempts_written} 耗時={duration}s" + ) + return FeederResult( + total_skus=len(skus), + matched=matched, + skipped_no_result=skipped_no, + skipped_low_score=skipped_low, + errors=errors, + duration_sec=duration, + history_written=history_written, + attempts_written=attempts_written, + ) + def run(self, source: str = "pchome") -> FeederResult: """ 執行一輪競品價格抓取與寫入 @@ -852,6 +1099,16 @@ class CompetitorPriceFeeder: return self._run_sku_items(skus, source=source, label="PChome 競品價格") + def run_expired_identity_refresh(self, limit: int = 120, source: str = "pchome") -> FeederResult: + """刷新已通過 identity_v2、但 PChome 價格快取過期的商品。""" + try: + skus = self._fetch_expired_identity_skus(limit=limit) + except Exception as e: + logger.error(f"[Feeder] 讀取過期 identity_v2 商品失敗: {e}") + return FeederResult(0, 0, 0, 0, 1, 0.0) + + return self._run_known_identity_refresh_items(skus, source=source, label="identity_v2 過期價格刷新") + def run_unmatched_priority(self, limit: int = 80, source: str = "pchome") -> FeederResult: """優先補抓尚未有有效 PChome 配對的高價商品。""" try: diff --git a/tests/test_competitor_match_attempts_persistence.py b/tests/test_competitor_match_attempts_persistence.py index 61ecdd4..ed44890 100644 --- a/tests/test_competitor_match_attempts_persistence.py +++ b/tests/test_competitor_match_attempts_persistence.py @@ -1,5 +1,6 @@ from pathlib import Path import logging +from datetime import datetime ROOT = Path(__file__).resolve().parents[1] @@ -24,6 +25,9 @@ def test_competitor_feeder_persists_all_match_attempt_outcomes(): assert "_should_upsert_competitor_price" in source assert "replace_legacy_unverified" in source assert "identity_v2" in source + assert "_fetch_expired_identity_skus" in source + assert "run_expired_identity_refresh" in source + assert "refresh_known_identity" in source assert "CREATE TABLE IF NOT EXISTS competitor_match_attempts" in migration assert "attempt_status" in migration @@ -47,3 +51,75 @@ def test_competitor_feeder_logs_keyword_parser_fallback(monkeypatch, caplog): assert terms assert "fallback to cleaned product name" in caplog.text + + +def test_competitor_feeder_refreshes_expired_identity_by_known_product_id(monkeypatch): + from services.competitor_price_feeder import CompetitorPriceFeeder + from services.pchome_crawler import PChomeProduct + + requested = [] + product = PChomeProduct( + product_id="DDAB01-1900ABCD", + name="舒特膚 AD 乳液 200ml", + price=899, + original_price=999, + discount=10, + image_url="", + product_url="https://24h.pchome.com.tw/prod/DDAB01-1900ABCD", + stock=50, + store="24h", + rating=4.8, + review_count=12, + is_on_sale=True, + crawled_at=datetime.now(), + ) + + class FakeCrawler: + def __init__(self, *_args, **_kwargs): + pass + + def fetch_product_details(self, product_ids, batch_size=20): + requested.extend(product_ids) + return True, "ok", [product] + + monkeypatch.setattr("services.pchome_crawler.PChomeCrawler", FakeCrawler) + feeder = CompetitorPriceFeeder(engine=object()) + writes = [] + attempts = [] + monkeypatch.setattr( + feeder, + "_should_upsert_competitor_price", + lambda *_args, **_kwargs: (True, "same_or_empty_existing"), + ) + monkeypatch.setattr( + feeder, + "_upsert_competitor_price", + lambda sku, product, score, tags, **kwargs: writes.append({ + "sku": sku, + "product_id": product.product_id, + "score": score, + "tags": tags, + **kwargs, + }), + ) + monkeypatch.setattr( + feeder, + "_record_match_attempt", + lambda *args, **kwargs: attempts.append(kwargs), + ) + + result = feeder._run_known_identity_refresh_items([{ + "sku": "A001", + "name": "舒特膚 AD 乳液 200ml", + "product_id": 1, + "momo_price": 980, + "competitor_product_id": "DDAB01-1900ABCD", + }]) + + assert requested == ["DDAB01-1900ABCD"] + assert result.matched == 1 + assert writes[0]["product_id"] == "DDAB01-1900ABCD" + assert "identity_v2" in writes[0]["tags"] + assert "refresh_known_identity" in writes[0]["tags"] + assert attempts[0]["attempt_status"] == "matched" + assert attempts[0]["search_terms"] == ["known_product_id:DDAB01-1900ABCD"] diff --git a/tests/test_frontend_v2_assets.py b/tests/test_frontend_v2_assets.py index 409be39..eeb757f 100644 --- a/tests/test_frontend_v2_assets.py +++ b/tests/test_frontend_v2_assets.py @@ -359,6 +359,7 @@ def test_ai_product_pick_agent_uses_real_competitor_data_and_dashboard_action(): feeder_source = (ROOT / "services/competitor_price_feeder.py").read_text(encoding="utf-8") agent_source = (ROOT / "services/ai_product_pick_agent.py").read_text(encoding="utf-8") route_source = (ROOT / "routes/ai_routes.py").read_text(encoding="utf-8") + scheduler_source = (ROOT / "scheduler.py").read_text(encoding="utf-8") template = (ROOT / "templates/ai_intelligence.html").read_text(encoding="utf-8") assert "MIN_MATCH_SCORE = 0.76" in feeder_source @@ -369,7 +370,11 @@ def test_ai_product_pick_agent_uses_real_competitor_data_and_dashboard_action(): assert "_search_pchome_candidates" in feeder_source assert "crawler.search_products(keyword, limit=SEARCH_LIMIT)" in feeder_source assert "_fetch_unmatched_priority_skus" in feeder_source + assert "_fetch_expired_identity_skus" in feeder_source + assert "run_expired_identity_refresh" in feeder_source + assert "fetch_product_details(requested_ids" in feeder_source assert "run_unmatched_priority" in feeder_source + assert "run_expired_identity_refresh(limit=240)" in scheduler_source assert "generate_product_pick_list" in agent_source assert "clear_dashboard_cache()" in route_source