349 lines
10 KiB
Python
349 lines
10 KiB
Python
"""Persist lightweight PChome match backfill run status.
|
|
|
|
The PChome backfill endpoint runs in a background thread. A tiny JSON status
|
|
file gives operators progress, last result, and failure context without adding
|
|
new schema or blocking the dashboard request path.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import time
|
|
import uuid
|
|
from copy import deepcopy
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
try:
|
|
from config import BASE_DIR
|
|
except Exception: # pragma: no cover - import fallback for isolated scripts
|
|
BASE_DIR = os.getcwd()
|
|
|
|
|
|
# Maximum number of runs kept in the persisted ``recent_runs`` history.
RECENT_RUN_LIMIT = 8

_DEFAULT_ACTIVE_TTL_SECONDS = 7200


def _env_int(name: str, default: int) -> int:
    """Return environment variable *name* parsed as an integer.

    Falls back to *default* when the variable is unset. A value that does not
    parse as an integer also falls back to *default*, with a ``RuntimeWarning``,
    so a typo in deployment configuration cannot make this module fail at
    import time.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        import warnings

        warnings.warn(
            f"{name}={raw!r} is not an integer; using default {default}",
            RuntimeWarning,
            stacklevel=2,
        )
        return default


# A "running" run whose latest timestamp is older than this many seconds is
# reported as stale instead of active.
ACTIVE_TTL_SECONDS = _env_int("PCHOME_BACKFILL_ACTIVE_TTL_SECONDS", _DEFAULT_ACTIVE_TTL_SECONDS)
|
|
|
|
# Stages of a backfill run, in order. A stage's position in this tuple is what
# `_progress_for_stage` turns into a completion percentage, so reordering or
# adding entries changes the reported progress.
STAGE_ORDER = (
    "queued",
    "refreshing_stale",
    "recovering_stale",
    "revalidating",
    "matching",
    "generating_picks",
    "clearing_cache",
    "completed",
)
|
|
|
|
# Display label for each stage/status key. Lookups elsewhere in this module
# use `STAGE_LABELS.get(stage, stage)`, so an unknown stage falls back to its
# raw name. The trailing comments are English glosses of the labels.
STAGE_LABELS = {
    "idle": "尚未執行",  # not run yet
    "queued": "已排入背景補抓",  # queued for background backfill
    "refreshing_stale": "刷新過期 PChome 價格",  # refreshing stale PChome prices
    "recovering_stale": "搜尋救援過期 PChome identity",  # search-recovering stale PChome identity
    "revalidating": "重新評分近門檻候選",  # re-scoring near-threshold candidates
    "matching": "比對高優先未配對商品",  # matching high-priority unmatched products
    "generating_picks": "重算 AI 挑品清單",  # recomputing the AI picks list
    "clearing_cache": "清除看板與競價快取",  # clearing dashboard and price-competition caches
    "completed": "產線完成",  # pipeline completed
    "failed": "產線失敗",  # pipeline failed
    "stale": "執行狀態逾時",  # run status timed out
}
|
|
|
|
|
|
class PchomeBackfillAlreadyRunning(RuntimeError):
    """Signal that a backfill run is still active, so a new one cannot start.

    The status snapshot handed in by the raiser is kept on ``status`` so that
    callers can report which run is in the way.
    """

    def __init__(self, status: dict[str, Any]):
        # Storing the snapshot and setting the message are independent steps.
        self.status = status
        super().__init__("PChome backfill is already running")
|
|
|
|
|
|
def _status_path() -> str:
    """Resolve the filesystem location of the JSON status file.

    ``PCHOME_BACKFILL_STATUS_PATH`` wins whenever it is set. Otherwise the
    file ``pchome_match_backfill_status.json`` lives in ``DATA_DIR``, which
    itself defaults to the ``data`` folder under ``BASE_DIR``.
    """
    explicit_path = os.getenv("PCHOME_BACKFILL_STATUS_PATH")
    if explicit_path is not None:
        return explicit_path

    data_dir = os.getenv("DATA_DIR")
    if data_dir is None:
        data_dir = os.path.join(str(BASE_DIR), "data")
    return os.path.join(data_dir, "pchome_match_backfill_status.json")
|
|
|
|
|
|
def _now_label() -> str:
|
|
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def _run_id() -> str:
|
|
return f"pchome-backfill-{datetime.now().strftime('%Y%m%d%H%M%S')}-{uuid.uuid4().hex[:8]}"
|
|
|
|
|
|
def _read_payload() -> dict[str, Any]:
    """Load and normalize the persisted status document.

    Deliberately best-effort: a missing file, a JSON document that is not an
    object, or any error while reading or normalizing yields the default idle
    status instead of an exception.
    """
    location = _status_path()
    try:
        if os.path.isfile(location):
            with open(location, "r", encoding="utf-8") as stream:
                document = json.load(stream)
            if isinstance(document, dict):
                return _normalize_status(document)
        return _default_status()
    except Exception:
        # Readers must never fail; fall back to the idle status.
        return _default_status()
|
|
|
|
|
|
def _write_payload(payload: dict[str, Any]) -> None:
    """Normalize *payload* and persist it to the status file.

    The document is written to a temporary sibling file and then moved over
    the real path with ``os.replace``, so readers never see a half-written
    file.

    Raises:
        OSError: if the directory cannot be created or the file cannot be
            written or replaced.
        Exception: anything raised while normalizing or serializing the
            payload; nothing is written in that case.
    """
    path = _status_path()
    directory = os.path.dirname(path)

    # Serialize first so a normalization/encoding error leaves no temp file.
    document = json.dumps(
        _normalize_status(payload), ensure_ascii=False, indent=2, default=str
    )

    # A bare file name has no directory part; os.makedirs("") would raise.
    if directory:
        os.makedirs(directory, exist_ok=True)

    # The pid alone is shared by every thread of this process, so add a
    # random suffix to keep concurrent writers off each other's temp file.
    tmp_path = f"{path}.{os.getpid()}.{uuid.uuid4().hex[:8]}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as handle:
            handle.write(document)
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave orphaned temp files behind on failure.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
|
|
|
|
|
|
def _default_status() -> dict[str, Any]:
    """Return a fresh idle status document with every top-level key present."""
    return dict(
        status="idle",
        stage="idle",
        stage_label=STAGE_LABELS["idle"],
        progress_pct=0,
        running=False,
        current_run=None,
        recent_runs=[],
        last_result=None,
        last_error=None,
        updated_at=None,
    )
|
|
|
|
|
|
def _age_seconds(value: str | None) -> float | None:
|
|
if not value:
|
|
return None
|
|
try:
|
|
parsed = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
|
|
return max(0.0, time.time() - parsed.timestamp())
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _progress_for_stage(stage: str) -> int:
    """Map a stage name to a completion percentage.

    ``"failed"`` always reports 100. A stage listed in ``STAGE_ORDER`` reports
    its 1-based position as a rounded share of the tuple length. Any other
    stage reports 0.
    """
    if stage == "failed":
        return 100
    try:
        position = STAGE_ORDER.index(stage)
    except ValueError:
        return 0
    completed_steps = position + 1
    return int(round(completed_steps / len(STAGE_ORDER) * 100))
|
|
|
|
|
|
def _normalize_run(run: Any) -> dict[str, Any] | None:
    """Return a shallow, normalized copy of a run record.

    Args:
        run: A run record as stored in the status document.

    Returns:
        ``None`` when *run* is not a dict. Otherwise a copy in which
        ``stage``, ``status``, ``stage_label`` and ``progress_pct`` are always
        populated and the optional ``result``, ``pick_result``, ``last_error``
        and ``message`` keys exist (defaulting to ``None``).
    """
    if not isinstance(run, dict):
        return None

    normalized = dict(run)
    # A record without a stage falls back to its status, then to "idle".
    stage = str(normalized.get("stage") or normalized.get("status") or "idle")
    status = str(normalized.get("status") or "idle")
    normalized["stage"] = stage
    normalized["status"] = status
    normalized["stage_label"] = STAGE_LABELS.get(stage, stage)

    # A missing/zero value is derived from the stage. A malformed value (for
    # example "abc" or NaN from a hand-edited file) is treated the same way
    # rather than raising, because an exception here makes the reader discard
    # the entire status document.
    raw_progress = normalized.get("progress_pct")
    try:
        progress = int(raw_progress) if raw_progress else _progress_for_stage(stage)
    except (TypeError, ValueError, OverflowError):
        progress = _progress_for_stage(stage)
    normalized["progress_pct"] = progress

    for optional_key in ("result", "pick_result", "last_error", "message"):
        normalized.setdefault(optional_key, None)
    return normalized
|
|
|
|
|
|
def _normalize_status(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a complete status document derived from *payload*.

    Starts from the default status, copies over the known top-level keys, and
    then lets the current run (when present) drive the top-level ``status``,
    ``stage``, ``stage_label``, ``progress_pct``, ``running``, ``last_result``
    and ``last_error`` fields. A current run that is still marked "running"
    but whose latest timestamp is older than ``ACTIVE_TTL_SECONDS`` is
    reported as stale. *payload* itself is not modified.

    Args:
        payload: Raw status document, e.g. as loaded from the JSON file.

    Returns:
        A new dict containing every key of the default status.
    """
    normalized = dict(_default_status())
    normalized.update({key: value for key, value in payload.items() if key in normalized})
    current = _normalize_run(payload.get("current_run"))

    # `recent_runs` may be present but null (or otherwise not a list) in a
    # hand-edited or corrupted file; treat that as "no history" instead of
    # raising and losing the whole document.
    raw_recent = payload.get("recent_runs")
    if not isinstance(raw_recent, (list, tuple)):
        raw_recent = []
    recent_runs = [
        run for run in (_normalize_run(item) for item in raw_recent) if run
    ][:RECENT_RUN_LIMIT]

    if current:
        age = _age_seconds(current.get("updated_at") or current.get("started_at"))
        is_running = current.get("status") == "running"
        # An unparseable timestamp (age is None) is given the benefit of the
        # doubt and still counts as fresh.
        is_fresh_running = is_running and (age is None or age <= ACTIVE_TTL_SECONDS)
        if is_running and not is_fresh_running:
            current["status"] = "stale"
            current["stage"] = "stale"
            current["stage_label"] = STAGE_LABELS["stale"]
            current["running"] = False
            current["progress_pct"] = 100
            current["last_error"] = current.get("last_error") or "active run exceeded ttl"
        else:
            current["running"] = is_fresh_running

        normalized["current_run"] = current
        normalized["status"] = current["status"]
        normalized["stage"] = current["stage"]
        normalized["stage_label"] = current["stage_label"]
        normalized["progress_pct"] = current["progress_pct"]
        normalized["running"] = bool(current.get("running"))
        normalized["last_result"] = current.get("result") or payload.get("last_result")
        normalized["last_error"] = current.get("last_error") or payload.get("last_error")

    normalized["recent_runs"] = recent_runs
    normalized["updated_at"] = payload.get("updated_at") or (
        current.get("updated_at") if current else None
    )
    return normalized
|
|
|
|
|
|
def get_pchome_backfill_status() -> dict[str, Any]:
    """Fetch the most recently persisted PChome backfill status.

    Returns the normalized status document; when nothing readable has been
    persisted yet, the default idle status is returned instead.
    """
    snapshot = _read_payload()
    return snapshot
|
|
|
|
|
|
def start_pchome_backfill_run(limit: int, operator: str | None = None) -> dict[str, Any]:
    """Register a new backfill run in the ``queued`` stage and persist it.

    Args:
        limit: Item limit for the run; stored as ``int(limit)``.
        operator: Who triggered the run; recorded as ``"web"`` when falsy.

    Returns:
        An independent copy of the newly created run record.

    Raises:
        PchomeBackfillAlreadyRunning: if the persisted status reports a run
            that is still running; the exception carries that status.
    """
    snapshot = _read_payload()
    if snapshot.get("running"):
        raise PchomeBackfillAlreadyRunning(snapshot)

    stamp = _now_label()
    new_run = dict(
        run_id=_run_id(),
        status="running",
        stage="queued",
        stage_label=STAGE_LABELS["queued"],
        progress_pct=_progress_for_stage("queued"),
        running=True,
        limit=int(limit),
        operator=operator or "web",
        started_at=stamp,
        updated_at=stamp,
        finished_at=None,
        message="等待背景執行緒啟動",
        result=None,
        pick_result=None,
        last_error=None,
    )

    history = snapshot.get("recent_runs", [])
    snapshot.update(
        current_run=new_run,
        recent_runs=history[:RECENT_RUN_LIMIT],
        last_error=None,
        updated_at=stamp,
    )
    _write_payload(snapshot)
    return deepcopy(new_run)
|
|
|
|
|
|
def update_pchome_backfill_run(
    run_id: str,
    *,
    stage: str,
    message: str | None = None,
    result: dict[str, Any] | None = None,
    pick_result: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Move a run to *stage*, mark it running, and persist the change.

    When the persisted current run has a different ``run_id`` (or there is
    none), a fresh record for *run_id* replaces it. ``message``, ``result``
    and ``pick_result`` are only overwritten when a non-``None`` value is
    supplied.

    Returns:
        An independent copy of the updated run record.
    """
    snapshot = _read_payload()
    active = _normalize_run(snapshot.get("current_run")) or {}
    if active.get("run_id") != run_id:
        active = {"run_id": run_id, "status": "running", "started_at": _now_label()}

    stamp = _now_label()
    active.update(
        status="running",
        stage=stage,
        stage_label=STAGE_LABELS.get(stage, stage),
        progress_pct=_progress_for_stage(stage),
        running=True,
        updated_at=stamp,
    )

    optional_fields = (
        ("message", message),
        ("result", result),
        ("pick_result", pick_result),
    )
    for field_name, field_value in optional_fields:
        if field_value is not None:
            active[field_name] = field_value

    snapshot["current_run"] = active
    snapshot["updated_at"] = stamp
    _write_payload(snapshot)
    return deepcopy(active)
|
|
|
|
|
|
def finish_pchome_backfill_run(
    run_id: str,
    *,
    result: dict[str, Any] | None = None,
    pick_result: dict[str, Any] | None = None,
    message: str | None = None,
) -> dict[str, Any]:
    """Record a run as completed and push it onto the recent-run history.

    A falsy *message* is replaced by the default completion message.

    Returns:
        An independent copy of the finished run record.
    """
    final_message = message if message else "PChome 補抓完成"
    return _finish_run(
        run_id,
        status_value="completed",
        stage="completed",
        message=final_message,
        result=result,
        pick_result=pick_result,
        error=None,
    )
|
|
|
|
|
|
def fail_pchome_backfill_run(run_id: str, error: str) -> dict[str, Any]:
    """Record a run as failed with *error* and push it onto the recent history.

    Returns:
        An independent copy of the failed run record.
    """
    failure_fields: dict[str, Any] = {
        "status_value": "failed",
        "stage": "failed",
        "message": "PChome 補抓失敗",
        "result": None,
        "pick_result": None,
        "error": error,
    }
    return _finish_run(run_id, **failure_fields)
|
|
|
|
|
|
def _finish_run(
    run_id: str,
    *,
    status_value: str,
    stage: str,
    message: str,
    result: dict[str, Any] | None,
    pick_result: dict[str, Any] | None,
    error: str | None,
) -> dict[str, Any]:
    """Close a run with a terminal status and persist it.

    The persisted current run is reused when its ``run_id`` matches;
    otherwise a minimal record for *run_id* is created. The finished run is
    placed at the front of ``recent_runs`` (any older entry with the same id
    is dropped) and the history is truncated to ``RECENT_RUN_LIMIT``.

    Returns:
        An independent copy of the finished run record.
    """
    snapshot = _read_payload()
    finished = _normalize_run(snapshot.get("current_run")) or {}
    stamp = _now_label()
    if finished.get("run_id") != run_id:
        finished = {"run_id": run_id, "started_at": stamp}

    finished.update(
        status=status_value,
        stage=stage,
        stage_label=STAGE_LABELS.get(stage, stage),
        progress_pct=_progress_for_stage(stage),
        running=False,
        updated_at=stamp,
        finished_at=stamp,
        message=message,
        last_error=error,
    )
    # Only overwrite stored results when the caller supplies new ones.
    for field_name, field_value in (("result", result), ("pick_result", pick_result)):
        if field_value is not None:
            finished[field_name] = field_value

    earlier_runs = [
        entry
        for entry in map(_normalize_run, snapshot.get("recent_runs", []))
        if entry and entry.get("run_id") != run_id
    ]
    history = [finished, *earlier_runs]

    snapshot.update(
        current_run=finished,
        recent_runs=history[:RECENT_RUN_LIMIT],
        last_result=finished.get("result"),
        last_error=error,
        updated_at=stamp,
    )
    _write_payload(snapshot)
    return deepcopy(finished)
|