"""Persist lightweight PChome match backfill run status.

The PChome backfill endpoint runs in a background thread. A tiny JSON status
file gives operators progress, last result, and failure context without adding
new schema or blocking the dashboard request path.
"""

from __future__ import annotations

import json
import os
import time
import uuid
from copy import deepcopy
from datetime import datetime
from typing import Any

try:
    from config import BASE_DIR
except Exception:  # pragma: no cover - import fallback for isolated scripts
    BASE_DIR = os.getcwd()

# Maximum number of finished runs kept in the ``recent_runs`` history.
RECENT_RUN_LIMIT = 8
# A "running" run whose last timestamp is older than this is reported as stale.
ACTIVE_TTL_SECONDS = int(os.getenv("PCHOME_BACKFILL_ACTIVE_TTL_SECONDS", "7200"))

# Pipeline stages in execution order; the position drives ``progress_pct``.
STAGE_ORDER = (
    "queued",
    "refreshing_stale",
    "recovering_stale",
    "revalidating",
    "matching",
    "generating_picks",
    "clearing_cache",
    "completed",
)

# Operator-facing label for every stage / terminal state.
STAGE_LABELS = {
    "idle": "尚未執行",
    "queued": "已排入背景補抓",
    "refreshing_stale": "刷新過期 PChome 價格",
    "recovering_stale": "搜尋救援過期 PChome identity",
    "revalidating": "重新評分近門檻候選",
    "matching": "比對高優先未配對商品",
    "generating_picks": "重算 AI 挑品清單",
    "clearing_cache": "清除看板與競價快取",
    "completed": "產線完成",
    "failed": "產線失敗",
    "stale": "執行狀態逾時",
}

_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


class PchomeBackfillAlreadyRunning(RuntimeError):
    """Raised when a fresh backfill run is already active.

    The normalized status snapshot that triggered the refusal is exposed as
    ``self.status`` so callers can report the run that is in progress.
    """

    def __init__(self, status: dict[str, Any]):
        super().__init__("PChome backfill is already running")
        self.status = status


def _status_path() -> str:
    """Return the status file path.

    ``PCHOME_BACKFILL_STATUS_PATH`` wins when set; otherwise the file lives in
    ``DATA_DIR`` (default ``<BASE_DIR>/data``).
    """
    return os.getenv(
        "PCHOME_BACKFILL_STATUS_PATH",
        os.path.join(
            os.getenv("DATA_DIR", os.path.join(str(BASE_DIR), "data")),
            "pchome_match_backfill_status.json",
        ),
    )


def _now_label() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now().strftime(_TIMESTAMP_FORMAT)


def _run_id() -> str:
    """Return a new run id built from a local timestamp and a random suffix."""
    return f"pchome-backfill-{datetime.now().strftime('%Y%m%d%H%M%S')}-{uuid.uuid4().hex[:8]}"


def _read_payload() -> dict[str, Any]:
    """Load and normalize the persisted status.

    Reading is deliberately best-effort: a missing, unreadable, or malformed
    file yields the idle default status instead of raising.
    """
    path = _status_path()
    try:
        if not os.path.isfile(path):
            return _default_status()
        with open(path, "r", encoding="utf-8") as handle:
            payload = json.load(handle)
        if not isinstance(payload, dict):
            return _default_status()
        return _normalize_status(payload)
    except Exception:
        return _default_status()


def _write_payload(payload: dict[str, Any]) -> None:
    """Normalize ``payload`` and write it to the status file.

    The JSON is written to a sibling temp file and then moved over the real
    path with ``os.replace`` so readers never observe a half-written file.

    Raises:
        OSError: when the directory or file cannot be written.
        TypeError / ValueError: when the payload cannot be serialized.
    """
    path = _status_path()
    directory = os.path.dirname(path)
    tmp_path = f"{path}.{os.getpid()}.tmp"
    # Serialize first so a serialization failure never leaves a temp file.
    serialized = json.dumps(
        _normalize_status(payload), ensure_ascii=False, indent=2, default=str
    )
    # A bare filename has no directory part and os.makedirs("") would raise.
    if directory:
        os.makedirs(directory, exist_ok=True)
    try:
        with open(tmp_path, "w", encoding="utf-8") as handle:
            handle.write(serialized)
        os.replace(tmp_path, path)
    except OSError:
        # Do not leave a stray temp file behind when the write/replace fails.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


def _default_status() -> dict[str, Any]:
    """Return a fresh idle status dict containing every top-level key."""
    return {
        "status": "idle",
        "stage": "idle",
        "stage_label": STAGE_LABELS["idle"],
        "progress_pct": 0,
        "running": False,
        "current_run": None,
        "recent_runs": [],
        "last_result": None,
        "last_error": None,
        "updated_at": None,
    }


def _age_seconds(value: str | None) -> float | None:
    """Return the non-negative age in seconds of a ``_now_label`` timestamp.

    Returns ``None`` when ``value`` is empty or cannot be parsed.
    """
    if not value:
        return None
    try:
        parsed = datetime.strptime(value, _TIMESTAMP_FORMAT)
        return max(0.0, time.time() - parsed.timestamp())
    except Exception:
        return None


def _progress_for_stage(stage: str) -> int:
    """Map a stage name to a percentage.

    ``failed`` counts as 100, unknown stages as 0, and known stages are scaled
    by their position in ``STAGE_ORDER``.
    """
    if stage == "failed":
        return 100
    if stage not in STAGE_ORDER:
        return 0
    return int(round((STAGE_ORDER.index(stage) + 1) / len(STAGE_ORDER) * 100))


def _coerce_progress(value: Any, stage: str) -> int:
    """Return ``value`` as an int, or the stage-derived percentage.

    Falsy values (missing, ``None``, ``0``) and values ``int()`` rejects fall
    back to ``_progress_for_stage(stage)`` so one corrupt field cannot make the
    whole status unreadable.
    """
    fallback = _progress_for_stage(stage)
    if not value:
        return fallback
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return fallback


def _normalize_run(run: Any) -> dict[str, Any] | None:
    """Return a copy of ``run`` with the standard run keys filled in.

    Returns ``None`` when ``run`` is not a dict. ``stage`` defaults to the
    run's ``status`` and then to ``"idle"``; ``stage_label`` is always derived
    from the stage; optional keys default to ``None``.
    """
    if not isinstance(run, dict):
        return None
    normalized = dict(run)
    stage = str(normalized.get("stage") or normalized.get("status") or "idle")
    status = str(normalized.get("status") or "idle")
    normalized["stage"] = stage
    normalized["status"] = status
    normalized["stage_label"] = STAGE_LABELS.get(stage, stage)
    normalized["progress_pct"] = _coerce_progress(normalized.get("progress_pct"), stage)
    for optional_key in ("result", "pick_result", "last_error", "message"):
        normalized.setdefault(optional_key, None)
    return normalized


def _normalize_status(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a complete status dict derived from ``payload``.

    Unknown top-level keys are dropped. When a current run exists its fields
    are mirrored to the top level, and a ``running`` run whose last timestamp
    is older than ``ACTIVE_TTL_SECONDS`` is rewritten as ``stale``. A run with
    no parseable timestamp is treated as still fresh.
    """
    normalized = _default_status()
    normalized.update({key: value for key, value in payload.items() if key in normalized})

    current = _normalize_run(payload.get("current_run"))

    # Tolerate ``"recent_runs": null`` or any non-list value in the file.
    raw_recent = payload.get("recent_runs")
    if not isinstance(raw_recent, (list, tuple)):
        raw_recent = []
    recent_runs = [run for run in map(_normalize_run, raw_recent) if run][:RECENT_RUN_LIMIT]

    # Always store the normalized run so a non-dict value cannot leak through.
    normalized["current_run"] = current
    if current:
        age = _age_seconds(current.get("updated_at") or current.get("started_at"))
        is_fresh_running = current.get("status") == "running" and (
            age is None or age <= ACTIVE_TTL_SECONDS
        )
        if current.get("status") == "running" and not is_fresh_running:
            current["status"] = "stale"
            current["stage"] = "stale"
            current["stage_label"] = STAGE_LABELS["stale"]
            current["running"] = False
            current["progress_pct"] = 100
            current["last_error"] = current.get("last_error") or "active run exceeded ttl"
        else:
            current["running"] = is_fresh_running
        normalized["status"] = current["status"]
        normalized["stage"] = current["stage"]
        normalized["stage_label"] = current["stage_label"]
        normalized["progress_pct"] = current["progress_pct"]
        normalized["running"] = bool(current.get("running"))
        normalized["last_result"] = current.get("result") or payload.get("last_result")
        normalized["last_error"] = current.get("last_error") or payload.get("last_error")

    normalized["recent_runs"] = recent_runs
    normalized["updated_at"] = payload.get("updated_at") or (
        current.get("updated_at") if current else None
    )
    return normalized


def get_pchome_backfill_status() -> dict[str, Any]:
    """Return the latest persisted PChome backfill status."""
    return _read_payload()


def start_pchome_backfill_run(limit: int, operator: str | None = None) -> dict[str, Any]:
    """Create a fresh running status or raise when another run is active.

    Args:
        limit: Item limit for the run; stored as ``int(limit)``.
        operator: Who triggered the run; defaults to ``"web"`` when falsy.

    Returns:
        A deep copy of the newly persisted run record.

    Raises:
        PchomeBackfillAlreadyRunning: when the persisted status reports a run
            that is still running and not yet stale.
    """
    status = _read_payload()
    if status.get("running"):
        raise PchomeBackfillAlreadyRunning(status)

    now = _now_label()
    run = {
        "run_id": _run_id(),
        "status": "running",
        "stage": "queued",
        "stage_label": STAGE_LABELS["queued"],
        "progress_pct": _progress_for_stage("queued"),
        "running": True,
        "limit": int(limit),
        "operator": operator or "web",
        "started_at": now,
        "updated_at": now,
        "finished_at": None,
        "message": "等待背景執行緒啟動",
        "result": None,
        "pick_result": None,
        "last_error": None,
    }
    status["current_run"] = run
    status["recent_runs"] = status.get("recent_runs", [])[:RECENT_RUN_LIMIT]
    status["last_error"] = None
    status["updated_at"] = now
    _write_payload(status)
    return deepcopy(run)


def update_pchome_backfill_run(
    run_id: str,
    *,
    stage: str,
    message: str | None = None,
    result: dict[str, Any] | None = None,
    pick_result: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Update an active run stage.

    If the persisted current run has a different ``run_id`` (or none), a
    minimal running record for ``run_id`` replaces it. ``message``, ``result``
    and ``pick_result`` are only overwritten when a value is supplied.

    Returns:
        A deep copy of the updated run record.
    """
    status = _read_payload()
    current = _normalize_run(status.get("current_run")) or {}
    # One timestamp for the whole update, as in ``_finish_run``.
    now = _now_label()
    if current.get("run_id") != run_id:
        current = {"run_id": run_id, "status": "running", "started_at": now}
    current.update(
        {
            "status": "running",
            "stage": stage,
            "stage_label": STAGE_LABELS.get(stage, stage),
            "progress_pct": _progress_for_stage(stage),
            "running": True,
            "updated_at": now,
        }
    )
    if message is not None:
        current["message"] = message
    if result is not None:
        current["result"] = result
    if pick_result is not None:
        current["pick_result"] = pick_result
    status["current_run"] = current
    status["updated_at"] = now
    _write_payload(status)
    return deepcopy(current)


def finish_pchome_backfill_run(
    run_id: str,
    *,
    result: dict[str, Any] | None = None,
    pick_result: dict[str, Any] | None = None,
    message: str | None = None,
) -> dict[str, Any]:
    """Mark a run completed and append it to recent history."""
    return _finish_run(
        run_id,
        status_value="completed",
        stage="completed",
        message=message or "PChome 補抓完成",
        result=result,
        pick_result=pick_result,
        error=None,
    )


def fail_pchome_backfill_run(run_id: str, error: str) -> dict[str, Any]:
    """Mark a run failed and append it to recent history."""
    return _finish_run(
        run_id,
        status_value="failed",
        stage="failed",
        message="PChome 補抓失敗",
        result=None,
        pick_result=None,
        error=error,
    )


def _finish_run(
    run_id: str,
    *,
    status_value: str,
    stage: str,
    message: str,
    result: dict[str, Any] | None,
    pick_result: dict[str, Any] | None,
    error: str | None,
) -> dict[str, Any]:
    """Persist a terminal state for ``run_id`` and push it onto the history.

    If the persisted current run has a different ``run_id``, a minimal record
    for ``run_id`` is used instead. The finished run goes to the front of
    ``recent_runs``; older entries with the same ``run_id`` are dropped and the
    list is capped at ``RECENT_RUN_LIMIT``.

    Returns:
        A deep copy of the finished run record.
    """
    status = _read_payload()
    current = _normalize_run(status.get("current_run")) or {}
    now = _now_label()
    if current.get("run_id") != run_id:
        current = {"run_id": run_id, "started_at": now}
    current.update(
        {
            "status": status_value,
            "stage": stage,
            "stage_label": STAGE_LABELS.get(stage, stage),
            "progress_pct": _progress_for_stage(stage),
            "running": False,
            "updated_at": now,
            "finished_at": now,
            "message": message,
            "last_error": error,
        }
    )
    if result is not None:
        current["result"] = result
    if pick_result is not None:
        current["pick_result"] = pick_result

    recent_runs = [current]
    for run in status.get("recent_runs", []):
        normalized_run = _normalize_run(run)
        if normalized_run and normalized_run.get("run_id") != run_id:
            recent_runs.append(normalized_run)

    status["current_run"] = current
    status["recent_runs"] = recent_runs[:RECENT_RUN_LIMIT]
    status["last_result"] = current.get("result")
    status["last_error"] = error
    status["updated_at"] = now
    _write_payload(status)
    return deepcopy(current)