Files
2026FIFAWorldCup/platform/backend/app/analytics/fixtures_worker.py
wooo 489baecbe8
Some checks failed
2026 World Cup Quant Platform - Production Deployment / Code Quality, Security Gate & Testing (push) Failing after 2m44s
2026 World Cup Quant Platform - Production Deployment / Deploy to Production VM via Gitea CD (push) Has been skipped
fix: restore backend runtime entrypoints
2026-06-18 13:00:28 +08:00

56 lines
1.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Fixtures ingestion worker placeholder."""
from __future__ import annotations
import asyncio
import json
import logging
import os
from datetime import datetime, timezone
from typing import Any
from redis.asyncio import Redis
LOGGER = logging.getLogger("fifa2026-fixtures-worker")
logging.basicConfig(level=logging.INFO)

# Redis connection URL; overridable through the REDIS_URL environment variable.
REDIS_URL = os.getenv("REDIS_URL", "redis://fifa2026-redis:6379/0")


def _poll_interval_seconds(default: int = 21600, floor: int = 300) -> int:
    """Return the polling interval read from FIXTURES_POLL_INTERVAL_SECONDS.

    An unset, empty, or non-integer value falls back to ``default`` instead of
    raising ``ValueError`` at import time (which would stop the worker before
    it starts). The result is never lower than ``floor``.
    """
    raw = os.getenv("FIXTURES_POLL_INTERVAL_SECONDS", "")
    if not raw.strip():
        # Variable missing or defined as an empty string.
        return max(floor, default)
    try:
        seconds = int(raw)
    except ValueError:
        LOGGER.warning(
            "FIXTURES_POLL_INTERVAL_SECONDS=%r is not an integer; using %ss",
            raw,
            default,
        )
        seconds = default
    return max(floor, seconds)


# Seconds between worker runs; clamped to at least 300.
INTERVAL_SECONDS = _poll_interval_seconds()
# Redis key under which the latest run status is stored.
STATUS_KEY = "ingestion:fixtures:last_run"
async def publish_status(payload: dict[str, Any]) -> None:
    """Write ``payload`` to Redis as JSON under ``STATUS_KEY``.

    The key is stored with an expiry of ``max(INTERVAL_SECONDS * 3, 900)``
    seconds. A client is created from ``REDIS_URL`` for this call and is
    always closed afterwards, whether or not the write succeeds.

    Args:
        payload: JSON-serialisable status mapping to store.
    """
    client = Redis.from_url(REDIS_URL, decode_responses=True)
    try:
        # ensure_ascii=False keeps non-ASCII text readable in the stored value.
        encoded = json.dumps(payload, ensure_ascii=False)
        ttl_seconds = max(INTERVAL_SECONDS * 3, 900)
        await client.set(STATUS_KEY, encoded, ex=ttl_seconds)
    finally:
        await client.aclose()
async def run_once() -> dict[str, Any]:
    """Publish a single "standby" status record and return it.

    The record carries the worker name, the current UTC time in ISO 8601
    form, and a fixed message. It is written via ``publish_status`` and then
    logged at INFO level.

    Returns:
        The status mapping that was published.
    """
    started_at = datetime.now(timezone.utc).isoformat()
    status_record = {
        "status": "standby",
        "worker": "fixtures_worker",
        "run_at": started_at,
        "message": "賽程同步器已啟動;正式來源未驗證前不覆寫既有賽程。",
    }
    await publish_status(status_record)
    LOGGER.info("%s", status_record)
    return status_record
async def run_forever() -> None:
    """Call ``run_once`` repeatedly, sleeping ``INTERVAL_SECONDS`` between runs.

    Any ``Exception`` raised by a run is logged with its traceback and does
    not stop the loop; the next attempt happens after the regular sleep.
    This coroutine never returns on its own.
    """
    LOGGER.info("啟動 fixtures_workerinterval=%ss", INTERVAL_SECONDS)

    async def _attempt_cycle() -> None:
        # One guarded iteration: a failure is logged, never propagated.
        try:
            await run_once()
        except Exception as error:  # pragma: no cover - worker loop must survive transient Redis errors
            LOGGER.exception("fixtures_worker 本輪狀態寫入失敗:%s", error)

    while True:
        await _attempt_cycle()
        await asyncio.sleep(INTERVAL_SECONDS)
if __name__ == "__main__":
    # WORKER_ONCE=true runs a single cycle and prints the published payload;
    # any other value (or no value) starts the endless polling loop.
    if os.getenv("WORKER_ONCE") != "true":
        asyncio.run(run_forever())
    else:
        print(asyncio.run(run_once()))