From a3ace326c8424e6becd74e5d419627faacfa1cb8 Mon Sep 17 00:00:00 2001 From: OoO Date: Mon, 15 Jun 2026 21:34:48 +0800 Subject: [PATCH] =?UTF-8?q?V10.609=20=E8=87=AA=E5=8B=95=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E5=A4=96=E9=83=A8=E5=A0=B1=E5=83=B9=E8=B3=87=E6=96=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.py | 2 +- docs/AI_INTELLIGENCE_MODULE_SOT.md | 5 +- .../current_execution_queue_20260524.md | 9 +- run_scheduler.py | 6 +- scheduler.py | 55 +++ services/external_market_offer_service.py | 371 ++++++++++++++++++ tests/test_external_market_offer_service.py | 150 +++++++ 7 files changed, 593 insertions(+), 5 deletions(-) diff --git a/config.py b/config.py index b100472..1f98c1d 100644 --- a/config.py +++ b/config.py @@ -402,7 +402,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '') # ========================================== # 系統版本與路徑 # ========================================== -SYSTEM_VERSION = "V10.608" +SYSTEM_VERSION = "V10.609" LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = PUBLIC_URL # 用於模板顯示 diff --git a/docs/AI_INTELLIGENCE_MODULE_SOT.md b/docs/AI_INTELLIGENCE_MODULE_SOT.md index fc68197..8d38ef4 100644 --- a/docs/AI_INTELLIGENCE_MODULE_SOT.md +++ b/docs/AI_INTELLIGENCE_MODULE_SOT.md @@ -1,8 +1,8 @@ # PChome 業績成長自動化作戰系統 — AI 競價情報模組 Single Source of Truth > **最後更新**: 2026-06-15 (台北時間) -> **狀態**: 🟢 四 AI Agent 自動化閉環已落地;LLM 路由紅線升級為 Ollama-first 三主機級聯;PChome 後台業績匯入韌性已補強;產品定位正名為「PChome 業績成長自動化作戰系統」;外部市場來源正規化層與 CSV 預檢已建立 -> **適用版本**: V10.608 +> **狀態**: 🟢 四 AI Agent 自動化閉環已落地;LLM 路由紅線升級為 Ollama-first 三主機級聯;PChome 後台業績匯入韌性已補強;產品定位正名為「PChome 業績成長自動化作戰系統」;外部市場來源正規化層、自動同步與 CSV 備援預檢已建立 +> **適用版本**: V10.609 --- @@ -55,6 +55,7 @@ - 蝦皮與酷澎暫停接入,不進作戰清單、不發告警;後續只可透過 official API / provider API / manual CSV 進 `external_offers` 類正規化層,並清楚標示資料品質。 - V10.607 新增 `external_market_sources` / `external_offers` 正規化層與 `/api/ai/pchome-growth/source-contract` 只讀 API。MOMO 先以既有比價快取橋接進來源狀態;蝦皮與酷澎只保留 official API、provider API、manual CSV contract,預設暫停且不進告警。 - V10.608 新增 `/api/ai/pchome-growth/external-offers/csv-dry-run` 與 AI 情報頁「外部報價預檢」。CSV 預檢只讀、不寫 DB;逐列回報「可使用」「需人工確認」「不能使用」,並支援中文表頭,避免格式小錯造成整批匯入失敗。 +- V10.609 明確把外部報價主路徑改為自動化:`run_external_offer_sync_task` 每 4 小時將已確認同款的既有比價快取同步進 `external_offers`。CSV 只保留為 API / crawler / provider 失敗時的備援預檢入口,不是日常營運主流程。 ## 零之一、12 Agent 決策信封(2026-05-24) diff --git a/docs/memory/current_execution_queue_20260524.md b/docs/memory/current_execution_queue_20260524.md index dbefd69..09934fd 100644 --- a/docs/memory/current_execution_queue_20260524.md +++ b/docs/memory/current_execution_queue_20260524.md @@ -198,4 +198,11 @@ - 新增 `/api/ai/pchome-growth/external-offers/csv-dry-run`,接受 CSV 檔案或貼上的 CSV 文字,只做預檢、不寫 DB。 - AI 情報頁新增「外部報價預檢」區塊,顯示可使用、待確認、不能使用;用字保持白話,不顯示工程欄位給一般使用者。 - 預檢支援中文表頭,例如「資料來源、外部商品ID、商品名稱、售價、資料時間、取得方式、PChome商品ID、同款狀態、資料可信度」。 -- 下一步:在 dry-run 通過後新增人工批准寫入器,先寫 `external_offers`,再串回 PChome 成長作戰清單。 +- CSV 預檢是備援入口,不是日常主流程。 + +## 12. 2026-06-15 V10.609 外部報價自動同步 + +- 新增 `sync_legacy_momo_reference_offers()`,自動把已確認同款的既有比價快取同步進 `external_offers`。 +- 新增 `run_external_offer_sync_task`,每 4 小時自動執行;排在 competitor feeder 後,同步 MOMO 外部價格參考資料層。 +- CSV 保留為 API / crawler / provider 故障時的救援預檢;日常目標是自動抓、自動同步、自動進作戰清單。 +- 下一步:讓 PChome 成長作戰清單優先讀 `external_offers`,再 fallback 舊 `competitor_prices`,逐步把舊表降為 bridge。 diff --git a/run_scheduler.py b/run_scheduler.py index c3c1339..340a5b8 100644 --- a/run_scheduler.py +++ b/run_scheduler.py @@ -5,7 +5,7 @@ run_scheduler.py — momo-scheduler 容器入口點 排程任務清單(對齊 app.py init_scheduler + scheduler.py 全任務): 每 30 分鐘:auto_import、whitepage_check 每 1 小時:momo、edm、festival - 每 4 小時:competitor_price_feeder、icaim_analysis + 每 4 小時:competitor_price_feeder、external_offer_sync、icaim_analysis 每 6 小時:quality_rescore、action_plan_hygiene 每 12 小時:dedup_batch 每 10 分鐘:ppt_auto_generation_catchup(補跑被長任務卡過的定期簡報) @@ -33,6 +33,7 @@ from scheduler import ( run_auto_import_task, run_whitepage_check, run_competitor_price_feeder_task, + run_external_offer_sync_task, run_pchome_match_backfill_task, run_icaim_analysis_task, run_weekly_strategy_task, @@ -175,6 +176,9 @@ def _register_schedules(): schedule.every(4).hours.do(run_competitor_price_feeder_task) logger.info("📅 每 4 小時:competitor_price_feeder") + schedule.every(4).hours.do(run_external_offer_sync_task) + logger.info("📅 每 4 小時:external_offer_sync(自動同步 MOMO 外部價格參考)") + schedule.every(4).hours.do(run_icaim_analysis_task) logger.info("📅 每 4 小時:icaim_analysis") diff --git a/scheduler.py b/scheduler.py index 57fe895..121a91d 100644 --- a/scheduler.py +++ b/scheduler.py @@ -2257,6 +2257,61 @@ def run_competitor_price_feeder_task(): logging.error(f"[Scheduler] [Feeder] event_router 失敗: {_router_e}") +def run_external_offer_sync_task(): + """ + 外部報價正規化同步任務(每 4 小時執行一次) + 將已確認同款的既有比價快取自動同步到 external_offers,讓 PChome 成長作戰清單 + 能吃共同資料層。CSV 僅保留備援,不是日常主流程。 + """ + try: + from config import DATABASE_PATH + from sqlalchemy import create_engine + from services.external_market_offer_service import sync_legacy_momo_reference_offers + + now_str = datetime.now(TAIPEI_TZ).strftime('%Y-%m-%d %H:%M') + limit = int(os.getenv("EXTERNAL_OFFER_SYNC_LIMIT", "1000")) + logging.info(f"[Scheduler] [ExternalOfferSync] 🚀 啟動外部報價自動同步 | {now_str}") + + engine = create_engine(DATABASE_PATH) + try: + result = sync_legacy_momo_reference_offers(engine, limit=limit, dry_run=False) + finally: + engine.dispose() + + stats = { + "status": "Success" if result.get("success") else "Skipped", + "candidate_count": result.get("candidate_count", 0), + "written_count": result.get("written_count", 0), + "source_code": result.get("source_code", "momo_reference"), + "message": result.get("message"), + } + logging.info( + "[Scheduler] [ExternalOfferSync] ✅ 完成 | candidates=%s written=%s status=%s", + stats["candidate_count"], + stats["written_count"], + result.get("status"), + ) + _save_stats('external_offer_sync', stats) + + except Exception as e: + import traceback as _tb + logging.error(f"[Scheduler] [ExternalOfferSync] 🚨 任務異常 | Error: {e}") + _save_stats('external_offer_sync', {"status": "Failed", "error": str(e)}) + try: + from services.event_router import notify_failure + notify_failure( + task_name="run_external_offer_sync_task", + error=e, + source="Scheduler.ExternalOfferSync", + event_type="external_offer_sync_failure", + priority="P2", + title="外部報價自動同步異常", + trace=_tb.format_exc(), + ) + except Exception as _router_e: + logging.error(f"[Scheduler] [ExternalOfferSync] event_router 失敗: {_router_e}") + + def run_pchome_match_backfill_task(): """ PChome 待比對商品補抓任務(每日執行) diff --git a/services/external_market_offer_service.py b/services/external_market_offer_service.py index 7fbfc88..3067d09 100644 --- a/services/external_market_offer_service.py +++ b/services/external_market_offer_service.py @@ -211,6 +211,15 @@ def _has_table(conn, table_name: str) -> bool: return False +def _quality_score_from_match(value: Any) -> float: + score = _to_float(value) + if score is None: + return 0.0 + if 0 <= score <= 1: + score *= 100 + return max(0.0, min(100.0, score)) + + def normalize_external_offer_payload(payload: dict[str, Any]) -> tuple[ExternalOfferPayload | None, list[str]]: """把 official API / provider API / manual CSV 的資料轉成同一份欄位。""" errors: list[str] = [] @@ -276,6 +285,368 @@ def normalize_external_offer_payload(payload: dict[str, Any]) -> tuple[ExternalO return record, [] +def _ensure_external_market_source_seeds(conn) -> None: + if not _has_table(conn, "external_market_sources"): + return + + for source in SOURCE_CONTRACTS: + payload = { + "code": source["code"], + "display_name": source["display_name"], + "platform_code": source["platform_code"], + "source_kind": source["source_kind"], + "status": source["status_code"], + "enabled": source["status_code"] == "active", + "write_enabled": False, + "allowed_input_methods_json": json.dumps(source["input_methods"], ensure_ascii=False), + "quality_policy_json": json.dumps({ + "minimum_match_status": "verified", + "minimum_quality_score": 76, + }, ensure_ascii=False), + "plain_note": source["plain_note"], + } + if conn.dialect.name == "postgresql": + conn.execute(text(""" + INSERT INTO external_market_sources ( + code, display_name, platform_code, source_kind, status, enabled, + write_enabled, allowed_input_methods_json, quality_policy_json, plain_note + ) + VALUES ( + :code, :display_name, :platform_code, :source_kind, :status, :enabled, + :write_enabled, :allowed_input_methods_json, :quality_policy_json, :plain_note + ) + ON CONFLICT (code) DO UPDATE SET + display_name = EXCLUDED.display_name, + platform_code = EXCLUDED.platform_code, + source_kind = EXCLUDED.source_kind, + status = EXCLUDED.status, + enabled = EXCLUDED.enabled, + allowed_input_methods_json = EXCLUDED.allowed_input_methods_json, + quality_policy_json = EXCLUDED.quality_policy_json, + plain_note = EXCLUDED.plain_note, + updated_at = NOW() + """), payload) + else: + conn.execute(text(""" + INSERT INTO external_market_sources ( + code, display_name, platform_code, source_kind, status, enabled, + write_enabled, allowed_input_methods_json, quality_policy_json, plain_note + ) + VALUES ( + :code, :display_name, :platform_code, :source_kind, :status, :enabled, + :write_enabled, :allowed_input_methods_json, :quality_policy_json, :plain_note + ) + ON CONFLICT (code) DO UPDATE SET + display_name = excluded.display_name, + platform_code = excluded.platform_code, + source_kind = excluded.source_kind, + status = excluded.status, + enabled = excluded.enabled, + allowed_input_methods_json = excluded.allowed_input_methods_json, + quality_policy_json = excluded.quality_policy_json, + plain_note = excluded.plain_note, + updated_at = CURRENT_TIMESTAMP + """), payload) + + +def _fetch_legacy_momo_reference_rows(conn, limit: int) -> list[dict[str, Any]]: + if not all(_has_table(conn, table) for table in {"competitor_prices", "products", "price_records"}): + return [] + + if conn.dialect.name == "postgresql": + sql = """ + WITH valid_cp AS ( + SELECT DISTINCT ON (cp.sku, cp.competitor_product_id) + cp.sku AS momo_sku, + cp.competitor_product_id AS pchome_product_id, + cp.competitor_product_name AS pchome_product_name, + cp.match_score, + cp.tags::text AS tags, + cp.crawled_at, + cp.expires_at + FROM competitor_prices cp + WHERE cp.source = 'pchome' + AND cp.sku IS NOT NULL + AND cp.competitor_product_id IS NOT NULL + AND cp.price IS NOT NULL + AND cp.price > 0 + AND COALESCE(cp.match_score, 0) >= 0.76 + AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP) + AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2' + ORDER BY cp.sku, cp.competitor_product_id, cp.crawled_at DESC NULLS LAST + ) + SELECT + vc.*, + lm.momo_name, + lm.product_url, + lm.image_url, + lm.momo_price, + lm.momo_price_at + FROM valid_cp vc + JOIN LATERAL ( + SELECT + p.name AS momo_name, + p.url AS product_url, + p.image_url AS image_url, + pr.price AS momo_price, + pr.timestamp AS momo_price_at + FROM products p + JOIN price_records pr ON pr.product_id = p.id + WHERE p.i_code = vc.momo_sku + AND p.status = 'ACTIVE' + AND pr.price IS NOT NULL + AND pr.price > 0 + ORDER BY pr.timestamp DESC, pr.id DESC + LIMIT 1 + ) lm ON TRUE + ORDER BY COALESCE(lm.momo_price_at, vc.crawled_at) DESC NULLS LAST + LIMIT :limit + """ + else: + sql = """ + WITH latest_cp AS ( + SELECT + cp.sku AS momo_sku, + cp.competitor_product_id AS pchome_product_id, + cp.competitor_product_name AS pchome_product_name, + cp.match_score, + cp.tags AS tags, + cp.crawled_at, + cp.expires_at, + ROW_NUMBER() OVER ( + PARTITION BY cp.sku, cp.competitor_product_id + ORDER BY cp.crawled_at DESC + ) AS rn + FROM competitor_prices cp + WHERE cp.source = 'pchome' + AND cp.sku IS NOT NULL + AND cp.competitor_product_id IS NOT NULL + AND cp.price IS NOT NULL + AND cp.price > 0 + AND COALESCE(cp.match_score, 0) >= 0.76 + AND COALESCE(cp.tags, '') LIKE '%identity_v2%' + ), + latest_momo AS ( + SELECT + p.i_code AS momo_sku, + p.name AS momo_name, + p.url AS product_url, + p.image_url AS image_url, + pr.price AS momo_price, + pr.timestamp AS momo_price_at, + ROW_NUMBER() OVER ( + PARTITION BY p.i_code + ORDER BY pr.timestamp DESC, pr.id DESC + ) AS rn + FROM products p + JOIN price_records pr ON pr.product_id = p.id + WHERE p.status = 'ACTIVE' + AND pr.price IS NOT NULL + AND pr.price > 0 + ) + SELECT + cp.momo_sku, + cp.pchome_product_id, + cp.pchome_product_name, + cp.match_score, + cp.tags, + cp.crawled_at, + cp.expires_at, + lm.momo_name, + lm.product_url, + lm.image_url, + lm.momo_price, + lm.momo_price_at + FROM latest_cp cp + JOIN latest_momo lm ON lm.momo_sku = cp.momo_sku AND lm.rn = 1 + WHERE cp.rn = 1 + ORDER BY COALESCE(lm.momo_price_at, cp.crawled_at) DESC + LIMIT :limit + """ + + return [dict(row) for row in conn.execute(text(sql), {"limit": limit}).mappings().all()] + + +def _legacy_row_to_external_offer(row: dict[str, Any]) -> dict[str, Any]: + momo_sku = str(row.get("momo_sku") or "").strip() + pchome_product_id = str(row.get("pchome_product_id") or "").strip() + observed_at = row.get("momo_price_at") or row.get("crawled_at") + title = str(row.get("momo_name") or row.get("pchome_product_name") or momo_sku).strip() + quality_score = _quality_score_from_match(row.get("match_score")) + notes = [ + "由已確認同款的舊比價快取自動同步", + "MOMO 最新價格作為外部參考價", + ] + return { + "source_code": "momo_reference", + "platform_code": "momo", + "source_product_id": momo_sku, + "source_offer_key": f"momo_reference:{momo_sku}:{pchome_product_id}", + "title": title or momo_sku, + "brand": None, + "category_text": None, + "product_url": row.get("product_url"), + "image_url": row.get("image_url"), + "price": _to_float(row.get("momo_price")), + "original_price": None, + "currency": "TWD", + "stock_status": None, + "sold_count": None, + "rating": None, + "review_count": None, + "observed_at": observed_at, + "expires_at": row.get("expires_at"), + "ingestion_method": "legacy_competitor_cache", + "connector_key": "competitor_prices", + "pchome_product_id": pchome_product_id, + "momo_sku": momo_sku, + "match_status": "verified", + "quality_score": quality_score, + "data_quality_status": "verified" if quality_score >= 76 else "needs_review", + "quality_notes_json": json.dumps(notes, ensure_ascii=False), + "raw_payload_json": json.dumps({ + "legacy_source": "competitor_prices", + "match_score": str(row.get("match_score") or ""), + "tags": row.get("tags"), + "crawled_at": str(row.get("crawled_at") or ""), + "momo_price_at": str(row.get("momo_price_at") or ""), + }, ensure_ascii=False), + } + + +def _upsert_external_offer(conn, payload: dict[str, Any]) -> None: + if conn.dialect.name == "postgresql": + sql = """ + INSERT INTO external_offers ( + source_code, platform_code, source_product_id, source_offer_key, title, + brand, category_text, product_url, image_url, price, original_price, + currency, stock_status, sold_count, rating, review_count, observed_at, + expires_at, ingestion_method, connector_key, pchome_product_id, momo_sku, + match_status, quality_score, data_quality_status, quality_notes_json, + raw_payload_json + ) + VALUES ( + :source_code, :platform_code, :source_product_id, :source_offer_key, :title, + :brand, :category_text, :product_url, :image_url, :price, :original_price, + :currency, :stock_status, :sold_count, :rating, :review_count, :observed_at, + :expires_at, :ingestion_method, :connector_key, :pchome_product_id, :momo_sku, + :match_status, :quality_score, :data_quality_status, :quality_notes_json, + :raw_payload_json + ) + ON CONFLICT ON CONSTRAINT uq_external_offer_source_product_observed DO UPDATE SET + source_offer_key = EXCLUDED.source_offer_key, + title = EXCLUDED.title, + product_url = EXCLUDED.product_url, + image_url = EXCLUDED.image_url, + price = EXCLUDED.price, + original_price = EXCLUDED.original_price, + expires_at = EXCLUDED.expires_at, + connector_key = EXCLUDED.connector_key, + pchome_product_id = EXCLUDED.pchome_product_id, + momo_sku = EXCLUDED.momo_sku, + match_status = EXCLUDED.match_status, + quality_score = EXCLUDED.quality_score, + data_quality_status = EXCLUDED.data_quality_status, + quality_notes_json = EXCLUDED.quality_notes_json, + raw_payload_json = EXCLUDED.raw_payload_json, + updated_at = NOW() + """ + else: + sql = """ + INSERT INTO external_offers ( + source_code, platform_code, source_product_id, source_offer_key, title, + brand, category_text, product_url, image_url, price, original_price, + currency, stock_status, sold_count, rating, review_count, observed_at, + expires_at, ingestion_method, connector_key, pchome_product_id, momo_sku, + match_status, quality_score, data_quality_status, quality_notes_json, + raw_payload_json + ) + VALUES ( + :source_code, :platform_code, :source_product_id, :source_offer_key, :title, + :brand, :category_text, :product_url, :image_url, :price, :original_price, + :currency, :stock_status, :sold_count, :rating, :review_count, :observed_at, + :expires_at, :ingestion_method, :connector_key, :pchome_product_id, :momo_sku, + :match_status, :quality_score, :data_quality_status, :quality_notes_json, + :raw_payload_json + ) + ON CONFLICT (source_code, source_product_id, observed_at, ingestion_method) DO UPDATE SET + source_offer_key = excluded.source_offer_key, + title = excluded.title, + product_url = excluded.product_url, + image_url = excluded.image_url, + price = excluded.price, + original_price = excluded.original_price, + expires_at = excluded.expires_at, + connector_key = excluded.connector_key, + pchome_product_id = excluded.pchome_product_id, + momo_sku = excluded.momo_sku, + match_status = excluded.match_status, + quality_score = excluded.quality_score, + data_quality_status = excluded.data_quality_status, + quality_notes_json = excluded.quality_notes_json, + raw_payload_json = excluded.raw_payload_json, + updated_at = CURRENT_TIMESTAMP + """ + conn.execute(text(sql), payload) + + +def sync_legacy_momo_reference_offers(engine, *, limit: int = 500, dry_run: bool = False) -> dict[str, Any]: + """把既有已確認同款的比價快取自動同步到 external_offers。""" + limit = max(1, min(int(limit or 500), 5000)) + generated_at = datetime.now().isoformat(timespec="seconds") + required_tables = {"external_market_sources", "external_offers", "competitor_prices", "products", "price_records"} + + with engine.begin() as conn: + missing_tables = sorted(table for table in required_tables if not _has_table(conn, table)) + if missing_tables: + return { + "success": False, + "status": "skipped", + "generated_at": generated_at, + "candidate_count": 0, + "written_count": 0, + "dry_run": dry_run, + "message": "外部報價同步暫時無法執行,缺少必要資料表。", + "missing_tables": missing_tables, + } + + _ensure_external_market_source_seeds(conn) + rows = _fetch_legacy_momo_reference_rows(conn, limit) + offers = [_legacy_row_to_external_offer(row) for row in rows] + offers = [ + offer for offer in offers + if offer["source_product_id"] and offer["pchome_product_id"] and offer["price"] + ] + + if not dry_run: + for offer in offers: + _upsert_external_offer(conn, offer) + + return { + "success": True, + "status": "dry_run" if dry_run else "synced", + "generated_at": generated_at, + "candidate_count": len(rows), + "written_count": 0 if dry_run else len(offers), + "dry_run": dry_run, + "source_code": "momo_reference", + "message": ( + "已完成 MOMO 外部價格參考自動同步。" + if not dry_run + else "已完成 MOMO 外部價格參考同步預檢,尚未寫入資料。" + ), + "sample_rows": [ + { + "momo_sku": offer["momo_sku"], + "pchome_product_id": offer["pchome_product_id"], + "price": offer["price"], + "quality_score": offer["quality_score"], + } + for offer in offers[:5] + ], + } + + def _normalize_header(header: str) -> str: cleaned = str(header or "").strip().replace("\ufeff", "") for canonical, aliases in CSV_HEADER_ALIASES.items(): diff --git a/tests/test_external_market_offer_service.py b/tests/test_external_market_offer_service.py index 10aba98..ceaac6a 100644 --- a/tests/test_external_market_offer_service.py +++ b/tests/test_external_market_offer_service.py @@ -84,6 +84,144 @@ def test_external_offer_csv_dry_run_reports_empty_file_plainly(): assert payload["summary"]["blocked_count"] == 0 +def _seed_external_offer_sync_tables(engine): + with engine.begin() as conn: + conn.execute(text(""" + CREATE TABLE external_market_sources ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code TEXT UNIQUE NOT NULL, + display_name TEXT NOT NULL, + platform_code TEXT NOT NULL, + source_kind TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'paused', + enabled BOOLEAN NOT NULL DEFAULT 0, + write_enabled BOOLEAN NOT NULL DEFAULT 0, + allowed_input_methods_json TEXT, + quality_policy_json TEXT, + plain_note TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT DEFAULT CURRENT_TIMESTAMP + ) + """)) + conn.execute(text(""" + CREATE TABLE external_offers ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + source_code TEXT NOT NULL, + platform_code TEXT NOT NULL, + source_product_id TEXT NOT NULL, + source_offer_key TEXT NOT NULL, + title TEXT NOT NULL, + brand TEXT, + category_text TEXT, + product_url TEXT, + image_url TEXT, + price REAL, + original_price REAL, + currency TEXT NOT NULL DEFAULT 'TWD', + stock_status TEXT, + sold_count INTEGER, + rating REAL, + review_count INTEGER, + observed_at TEXT NOT NULL, + expires_at TEXT, + ingestion_method TEXT NOT NULL, + connector_key TEXT, + pchome_product_id TEXT, + momo_sku TEXT, + match_status TEXT NOT NULL DEFAULT 'unmatched', + quality_score REAL NOT NULL DEFAULT 0, + data_quality_status TEXT NOT NULL DEFAULT 'needs_review', + quality_notes_json TEXT, + raw_payload_json TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT DEFAULT CURRENT_TIMESTAMP, + UNIQUE (source_code, source_product_id, observed_at, ingestion_method) + ) + """)) + conn.execute(text( + "CREATE TABLE competitor_prices (" + "sku TEXT, source TEXT, competitor_product_id TEXT, competitor_product_name TEXT, " + "price REAL, match_score REAL, tags TEXT, crawled_at TEXT, expires_at TEXT)" + )) + conn.execute(text( + "CREATE TABLE products (" + "id INTEGER PRIMARY KEY, i_code TEXT, name TEXT, url TEXT, image_url TEXT, status TEXT)" + )) + conn.execute(text( + "CREATE TABLE price_records (" + "id INTEGER PRIMARY KEY, product_id INTEGER, price REAL, timestamp TEXT)" + )) + conn.execute(text(""" + INSERT INTO competitor_prices + (sku, source, competitor_product_id, competitor_product_name, + price, match_score, tags, crawled_at, expires_at) + VALUES + ('MOMO-1', 'pchome', 'PCH-1', 'PChome 商品', + 1000, 0.91, '["identity_v2"]', '2026-06-15 09:30:00', NULL), + ('MOMO-2', 'pchome', 'PCH-2', '低信心商品', + 600, 0.60, '["identity_v2"]', '2026-06-15 09:30:00', NULL) + """)) + conn.execute(text( + "INSERT INTO products (id, i_code, name, url, image_url, status) " + "VALUES (1, 'MOMO-1', 'MOMO 同款商品', 'https://momo.test/1', 'https://img.test/1.jpg', 'ACTIVE')" + )) + conn.execute(text( + "INSERT INTO price_records (id, product_id, price, timestamp) " + "VALUES (1, 1, 900, '2026-06-15 10:00:00')" + )) + + +def test_sync_legacy_momo_reference_offers_writes_verified_cache_to_external_offers(): + from services.external_market_offer_service import sync_legacy_momo_reference_offers + + engine = create_engine("sqlite:///:memory:") + _seed_external_offer_sync_tables(engine) + + payload = sync_legacy_momo_reference_offers(engine, limit=20) + + assert payload["success"] is True + assert payload["status"] == "synced" + assert payload["candidate_count"] == 1 + assert payload["written_count"] == 1 + with engine.connect() as conn: + rows = conn.execute(text(""" + SELECT source_code, platform_code, source_product_id, title, price, + pchome_product_id, momo_sku, match_status, quality_score, + data_quality_status, ingestion_method + FROM external_offers + """)).mappings().all() + + assert len(rows) == 1 + row = dict(rows[0]) + assert row["source_code"] == "momo_reference" + assert row["platform_code"] == "momo" + assert row["source_product_id"] == "MOMO-1" + assert row["pchome_product_id"] == "PCH-1" + assert row["title"] == "MOMO 同款商品" + assert row["price"] == 900 + assert row["match_status"] == "verified" + assert row["quality_score"] == 91 + assert row["data_quality_status"] == "verified" + assert row["ingestion_method"] == "legacy_competitor_cache" + + +def test_sync_legacy_momo_reference_offers_dry_run_does_not_write(): + from services.external_market_offer_service import sync_legacy_momo_reference_offers + + engine = create_engine("sqlite:///:memory:") + _seed_external_offer_sync_tables(engine) + + payload = sync_legacy_momo_reference_offers(engine, limit=20, dry_run=True) + + assert payload["success"] is True + assert payload["status"] == "dry_run" + assert payload["candidate_count"] == 1 + assert payload["written_count"] == 0 + with engine.connect() as conn: + count = conn.execute(text("SELECT COUNT(*) FROM external_offers")).scalar() + assert count == 0 + + def test_external_source_readiness_uses_legacy_momo_reference_cache(): from services.external_market_offer_service import build_external_source_readiness @@ -134,3 +272,15 @@ def test_external_offer_csv_dry_run_route_is_registered_as_post_only(): assert "@ai_bp.route('/api/ai/pchome-growth/external-offers/csv-dry-run', methods=['POST'])" in route_source assert "dry_run_external_offer_csv" in route_source + + +def test_external_offer_sync_is_registered_in_scheduler(): + from pathlib import Path + + scheduler_source = Path("scheduler.py").read_text(encoding="utf-8") + run_scheduler_source = Path("run_scheduler.py").read_text(encoding="utf-8") + + assert "def run_external_offer_sync_task" in scheduler_source + assert "sync_legacy_momo_reference_offers" in scheduler_source + assert "run_external_offer_sync_task" in run_scheduler_source + assert "external_offer_sync" in run_scheduler_source