Files
ewoooc/services/pchome_backfill_status.py
OoO b9b3a410ff
All checks were successful
CD Pipeline / deploy (push) Successful in 1m7s
V10.526 補過期 identity 搜尋救援入口
2026-06-01 00:51:05 +08:00

349 lines
10 KiB
Python

"""Persist lightweight PChome match backfill run status.
The PChome backfill endpoint runs in a background thread. A tiny JSON status
file gives operators progress, last result, and failure context without adding
new schema or blocking the dashboard request path.
"""
from __future__ import annotations
import json
import os
import time
import uuid
from copy import deepcopy
from datetime import datetime
from typing import Any
try:
from config import BASE_DIR
except Exception: # pragma: no cover - import fallback for isolated scripts
BASE_DIR = os.getcwd()
# Maximum number of entries kept in the persisted "recent_runs" history.
RECENT_RUN_LIMIT = 8
# Seconds a run may stay "running" without an update before it is reported as
# stale. Overridable through the PCHOME_BACKFILL_ACTIVE_TTL_SECONDS env var.
ACTIVE_TTL_SECONDS = int(os.getenv("PCHOME_BACKFILL_ACTIVE_TTL_SECONDS", "7200"))
# Pipeline stages in execution order; a stage's position in this tuple is what
# the progress percentage is derived from.
STAGE_ORDER = (
"queued",
"refreshing_stale",
"recovering_stale",
"revalidating",
"matching",
"generating_picks",
"clearing_cache",
"completed",
)
# Display label for every stage/status value. Besides the ordered stages this
# also covers "idle", "failed" and "stale", which are not part of STAGE_ORDER.
STAGE_LABELS = {
"idle": "尚未執行",
"queued": "已排入背景補抓",
"refreshing_stale": "刷新過期 PChome 價格",
"recovering_stale": "搜尋救援過期 PChome identity",
"revalidating": "重新評分近門檻候選",
"matching": "比對高優先未配對商品",
"generating_picks": "重算 AI 挑品清單",
"clearing_cache": "清除看板與競價快取",
"completed": "產線完成",
"failed": "產線失敗",
"stale": "執行狀態逾時",
}
class PchomeBackfillAlreadyRunning(RuntimeError):
    """Error signalling that a new run was refused because one is still active.

    The status dict handed to the constructor is kept on ``self.status`` so
    that whoever catches the error can inspect the run that is in the way.
    """

    def __init__(self, status: dict[str, Any]):
        # Store the snapshot first, then let RuntimeError record the message.
        self.status = status
        super().__init__("PChome backfill is already running")
def _status_path() -> str:
    """Resolve where the JSON status file lives.

    ``PCHOME_BACKFILL_STATUS_PATH`` is returned as-is when it is set.
    Otherwise the file is placed in ``DATA_DIR`` (or ``<BASE_DIR>/data`` when
    that variable is unset) under a fixed file name.
    """
    explicit_path = os.getenv("PCHOME_BACKFILL_STATUS_PATH")
    if explicit_path is not None:
        return explicit_path
    data_dir = os.getenv("DATA_DIR")
    if data_dir is None:
        data_dir = os.path.join(str(BASE_DIR), "data")
    return os.path.join(data_dir, "pchome_match_backfill_status.json")
def _now_label() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def _run_id() -> str:
return f"pchome-backfill-{datetime.now().strftime('%Y%m%d%H%M%S')}-{uuid.uuid4().hex[:8]}"
def _read_payload() -> dict[str, Any]:
    """Load the persisted status and return it in normalized form.

    This is a best-effort read: a missing file, unreadable or invalid JSON, a
    top-level value that is not an object, or any error raised while
    normalizing all result in the default idle status being returned.
    """
    status_file = _status_path()
    try:
        if os.path.isfile(status_file):
            with open(status_file, "r", encoding="utf-8") as stream:
                loaded = json.load(stream)
            if isinstance(loaded, dict):
                return _normalize_status(loaded)
    except Exception:
        # Deliberately broad: callers must always receive a usable status.
        return _default_status()
    return _default_status()
def _write_payload(payload: dict[str, Any]) -> None:
    """Normalize *payload* and persist it as the JSON status file.

    The document is first written to a temporary sibling file and then moved
    onto the real path with ``os.replace``, so the status file is swapped in
    one step rather than rewritten in place.

    Args:
        payload: Status dict to store; it is passed through
            ``_normalize_status`` before serialization.

    Raises:
        Any exception raised while creating the directory, serializing, or
        writing/replacing the file propagates to the caller. The temporary
        file is removed before the exception is re-raised.
    """
    path = _status_path()
    directory = os.path.dirname(path)
    # pid alone is shared by every thread of the process; the random suffix
    # keeps concurrent writers from scribbling into the same temp file.
    tmp_path = f"{path}.{os.getpid()}.{uuid.uuid4().hex[:8]}.tmp"
    # A bare file name has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    if directory:
        os.makedirs(directory, exist_ok=True)
    try:
        with open(tmp_path, "w", encoding="utf-8") as handle:
            json.dump(
                _normalize_status(payload),
                handle,
                ensure_ascii=False,
                indent=2,
                default=str,
            )
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave an orphaned temp file behind when the write fails.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def _default_status() -> dict[str, Any]:
    """Build a fresh status dict for the idle state with no run recorded."""
    idle = "idle"
    return dict(
        status=idle,
        stage=idle,
        stage_label=STAGE_LABELS[idle],
        progress_pct=0,
        running=False,
        current_run=None,
        recent_runs=[],
        last_result=None,
        last_error=None,
        updated_at=None,
    )
def _age_seconds(value: str | None) -> float | None:
if not value:
return None
try:
parsed = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
return max(0.0, time.time() - parsed.timestamp())
except Exception:
return None
def _progress_for_stage(stage: str) -> int:
    """Translate a stage name into a 0-100 progress percentage.

    ``"failed"`` always maps to 100. A stage listed in ``STAGE_ORDER`` maps to
    its 1-based position as a rounded share of the total; anything else is 0.
    """
    if stage == "failed":
        return 100
    try:
        position = STAGE_ORDER.index(stage)
    except ValueError:
        return 0
    return int(round((position + 1) / len(STAGE_ORDER) * 100))
def _normalize_run(run: Any) -> dict[str, Any] | None:
    """Return a shallow copy of *run* with its core fields filled in.

    Anything that is not a dict yields ``None``. Otherwise ``stage`` (falling
    back to ``status``, then ``"idle"``), ``status``, ``stage_label`` and
    ``progress_pct`` are recomputed, and the optional result/error/message
    keys are added as ``None`` when absent. The input dict is not modified.
    """
    if not isinstance(run, dict):
        return None
    raw_status = run.get("status")
    stage_name = str(run.get("stage") or raw_status or "idle")
    cleaned = dict(run)
    cleaned["stage"] = stage_name
    cleaned["status"] = str(raw_status or "idle")
    cleaned["stage_label"] = STAGE_LABELS.get(stage_name, stage_name)
    # A missing or zero stored percentage is replaced by the stage-derived one.
    stored_pct = cleaned.get("progress_pct")
    cleaned["progress_pct"] = int(stored_pct or _progress_for_stage(stage_name))
    for optional_key in ("result", "pick_result", "last_error", "message"):
        cleaned.setdefault(optional_key, None)
    return cleaned
def _normalize_status(payload: dict[str, Any]) -> dict[str, Any]:
    """Merge *payload* onto the default status and derive the top-level fields.

    Only keys that exist in the default status are copied from *payload*.
    When a current run is present its status, stage, label, progress and
    running flag overwrite the top-level values. A run recorded as
    ``"running"`` whose last update (or start) is older than
    ``ACTIVE_TTL_SECONDS`` is rewritten as ``"stale"``; a run whose age cannot
    be determined is treated as still fresh. ``recent_runs`` is normalized
    entry by entry and capped at ``RECENT_RUN_LIMIT``.

    Args:
        payload: Raw status dict, e.g. as loaded from the status file.

    Returns:
        A new, fully populated status dict.
    """
    normalized = _default_status()
    normalized.update({key: value for key, value in payload.items() if key in normalized})
    current = _normalize_run(payload.get("current_run"))
    # ``recent_runs`` may be JSON ``null`` or some other non-list value in a
    # damaged file. Iterating that would raise (or walk a string/dict
    # element by element), so anything that is not a sequence of runs is
    # treated as an empty history instead of invalidating the whole status.
    raw_recent = payload.get("recent_runs")
    if not isinstance(raw_recent, (list, tuple)):
        raw_recent = []
    recent_runs = [
        run for run in (_normalize_run(item) for item in raw_recent) if run
    ][:RECENT_RUN_LIMIT]
    if current:
        age = _age_seconds(current.get("updated_at") or current.get("started_at"))
        is_fresh_running = current.get("status") == "running" and (
            age is None or age <= ACTIVE_TTL_SECONDS
        )
        if current.get("status") == "running" and not is_fresh_running:
            # The run stopped reporting: surface it as stale, not running.
            current["status"] = "stale"
            current["stage"] = "stale"
            current["stage_label"] = STAGE_LABELS["stale"]
            current["running"] = False
            current["progress_pct"] = 100
            current["last_error"] = current.get("last_error") or "active run exceeded ttl"
        else:
            current["running"] = is_fresh_running
        normalized["current_run"] = current
        normalized["status"] = current["status"]
        normalized["stage"] = current["stage"]
        normalized["stage_label"] = current["stage_label"]
        normalized["progress_pct"] = current["progress_pct"]
        normalized["running"] = bool(current.get("running"))
        normalized["last_result"] = current.get("result") or payload.get("last_result")
        normalized["last_error"] = current.get("last_error") or payload.get("last_error")
    normalized["recent_runs"] = recent_runs
    normalized["updated_at"] = payload.get("updated_at") or (
        current.get("updated_at") if current else None
    )
    return normalized
def get_pchome_backfill_status() -> dict[str, Any]:
    """Read the persisted PChome backfill status and return it normalized."""
    snapshot = _read_payload()
    return snapshot
def start_pchome_backfill_run(limit: int, operator: str | None = None) -> dict[str, Any]:
    """Register a new run in the ``queued`` stage and persist it.

    Args:
        limit: Run limit, stored on the run record as an int.
        operator: Who triggered the run; recorded as ``"web"`` when falsy.

    Returns:
        An independent copy of the newly created run record.

    Raises:
        PchomeBackfillAlreadyRunning: if the persisted status reports a run
            that is still running.
    """
    snapshot = _read_payload()
    if snapshot.get("running"):
        raise PchomeBackfillAlreadyRunning(snapshot)
    started = _now_label()
    new_run = dict(
        run_id=_run_id(),
        status="running",
        stage="queued",
        stage_label=STAGE_LABELS["queued"],
        progress_pct=_progress_for_stage("queued"),
        running=True,
        limit=int(limit),
        operator=operator or "web",
        started_at=started,
        updated_at=started,
        finished_at=None,
        message="等待背景執行緒啟動",
        result=None,
        pick_result=None,
        last_error=None,
    )
    # The new run replaces whatever was current; history is kept, but capped.
    snapshot.update(
        current_run=new_run,
        recent_runs=snapshot.get("recent_runs", [])[:RECENT_RUN_LIMIT],
        last_error=None,
        updated_at=started,
    )
    _write_payload(snapshot)
    return deepcopy(new_run)
def update_pchome_backfill_run(
    run_id: str,
    *,
    stage: str,
    message: str | None = None,
    result: dict[str, Any] | None = None,
    pick_result: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Move run *run_id* to *stage*, mark it running, and persist the change.

    When the stored current run has a different ``run_id`` (or there is none),
    a minimal record for *run_id* replaces it. ``message``, ``result`` and
    ``pick_result`` are only written when they are not ``None``.

    Returns:
        An independent copy of the updated run record.
    """
    snapshot = _read_payload()
    active = _normalize_run(snapshot.get("current_run")) or {}
    if active.get("run_id") != run_id:
        active = {"run_id": run_id, "status": "running", "started_at": _now_label()}
    stamp = _now_label()
    active["status"] = "running"
    active["stage"] = stage
    active["stage_label"] = STAGE_LABELS.get(stage, stage)
    active["progress_pct"] = _progress_for_stage(stage)
    active["running"] = True
    active["updated_at"] = stamp
    optional_fields = (
        ("message", message),
        ("result", result),
        ("pick_result", pick_result),
    )
    for field_name, field_value in optional_fields:
        if field_value is not None:
            active[field_name] = field_value
    snapshot["current_run"] = active
    snapshot["updated_at"] = stamp
    _write_payload(snapshot)
    return deepcopy(active)
def finish_pchome_backfill_run(
    run_id: str,
    *,
    result: dict[str, Any] | None = None,
    pick_result: dict[str, Any] | None = None,
    message: str | None = None,
) -> dict[str, Any]:
    """Record run *run_id* as completed and put it at the head of the history.

    A falsy *message* is replaced by the default completion message.

    Returns:
        An independent copy of the finished run record.
    """
    final_message = message if message else "PChome 補抓完成"
    return _finish_run(
        run_id,
        status_value="completed",
        stage="completed",
        message=final_message,
        result=result,
        pick_result=pick_result,
        error=None,
    )
def fail_pchome_backfill_run(run_id: str, error: str) -> dict[str, Any]:
    """Record run *run_id* as failed with *error* and add it to the history.

    Returns:
        An independent copy of the failed run record.
    """
    outcome = {
        "status_value": "failed",
        "stage": "failed",
        "message": "PChome 補抓失敗",
        "result": None,
        "pick_result": None,
        "error": error,
    }
    return _finish_run(run_id, **outcome)
def _finish_run(
    run_id: str,
    *,
    status_value: str,
    stage: str,
    message: str,
    result: dict[str, Any] | None,
    pick_result: dict[str, Any] | None,
    error: str | None,
) -> dict[str, Any]:
    """Close run *run_id* with a terminal status and persist the outcome.

    The stored current run is reused when its ``run_id`` matches; otherwise a
    minimal record is created for *run_id*. The closed run becomes the first
    entry of ``recent_runs`` (any older entry with the same id is dropped) and
    the list is capped at ``RECENT_RUN_LIMIT``. ``result`` and ``pick_result``
    only overwrite the stored values when they are not ``None``.

    Returns:
        An independent copy of the closed run record.
    """
    snapshot = _read_payload()
    closing = _normalize_run(snapshot.get("current_run")) or {}
    stamp = _now_label()
    if closing.get("run_id") != run_id:
        closing = {"run_id": run_id, "started_at": stamp}
    closing["status"] = status_value
    closing["stage"] = stage
    closing["stage_label"] = STAGE_LABELS.get(stage, stage)
    closing["progress_pct"] = _progress_for_stage(stage)
    closing["running"] = False
    closing["updated_at"] = stamp
    closing["finished_at"] = stamp
    closing["message"] = message
    closing["last_error"] = error
    if result is not None:
        closing["result"] = result
    if pick_result is not None:
        closing["pick_result"] = pick_result
    # Newest first; skip unusable entries and any earlier copy of this run.
    earlier = (_normalize_run(entry) for entry in snapshot.get("recent_runs", []))
    history = [closing] + [
        entry for entry in earlier if entry and entry.get("run_id") != run_id
    ]
    snapshot["current_run"] = closing
    snapshot["recent_runs"] = history[:RECENT_RUN_LIMIT]
    snapshot["last_result"] = closing.get("result")
    snapshot["last_error"] = error
    snapshot["updated_at"] = stamp
    _write_payload(snapshot)
    return deepcopy(closing)