Files
OG T 8d7e34e6bd
Some checks failed
2026 World Cup Quant Platform - Production Deployment / Code Quality, Security Gate & Testing (push) Successful in 3m56s
2026 World Cup Quant Platform - Production Deployment / Deploy to Production VM via Gitea CD (push) Has been cancelled
fix: expose formal odds provider blockers
2026-06-19 00:44:11 +08:00

1063 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""正式賠率資料擷取 worker。
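Goals:
- Prefer World Cup markets from The Odds API.
- When no API key is available, use the ESPN scoreboard as a low-tier fallback; never fabricate odds.
- Write odds into the odds_history table in PostgreSQL / TimescaleDB.
- Write the latest ingestion status to Redis so the frontend can display data freshness.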
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import uuid4
import httpx
from redis.asyncio import Redis
from sqlalchemy import update
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.future import select
from app.db.base import SessionFactory
from app.db.models import Bookmaker, Match, MatchStatus, OddsHistory, SmartMoneyFlow, Team, ValueBetRecommendation, Venue
# Module-level logger shared by every function in this worker.
logger = logging.getLogger('fifa2026-odds-worker')
# Root logging is configured at INFO as an import-time side effect.
logging.basicConfig(level=logging.INFO)
# --- The Odds API settings, each overridable through an environment variable ---
# API key; an empty value or one listed in THE_ODDS_PLACEHOLDER_KEYS is treated as unusable.
THE_ODDS_API_KEY = os.environ.get('THE_ODDS_API_KEY', '').strip()
THE_ODDS_SPORT_KEY = os.environ.get('THE_ODDS_SPORT_KEY', 'soccer_fifa_world_cup').strip()
# Trailing slashes are removed; _canonical_api_base() appends '/v4' when it is missing.
THE_ODDS_BASE = os.environ.get('THE_ODDS_BASE', 'https://api.the-odds-api.com').rstrip('/')
THE_ODDS_REGIONS = os.environ.get('THE_ODDS_REGIONS', 'eu').strip()
# Comma-separated market keys requested when THE_ODDS_MARKETS is not set.
THE_ODDS_DEFAULT_MARKETS = 'h2h,spreads,totals,btts,draw_no_bet,h2h_3_way,alternate_spreads,alternate_totals,team_totals,alternate_team_totals'
THE_ODDS_MARKETS = os.environ.get('THE_ODDS_MARKETS', THE_ODDS_DEFAULT_MARKETS).strip()
# Upper bound on how many events get a per-event "additional markets" request (clamped to >= 0).
# NOTE: int() raises ValueError at import time when any of these variables is not numeric.
THE_ODDS_ADDITIONAL_EVENTS_LIMIT = max(0, int(os.environ.get('THE_ODDS_ADDITIONAL_EVENTS_LIMIT', '24')))
# Seconds between polling rounds; never below 30.
ODDS_POLL_INTERVAL_SECONDS = max(30, int(os.environ.get('ODDS_POLL_INTERVAL_SECONDS', '300')))
# Half-width, in hours, of the kickoff window used to match incoming events to stored matches.
MATCH_RECONCILE_WINDOW_HOURS = max(1, int(os.environ.get('MATCH_RECONCILE_WINDOW_HOURS', '36')))
# Days before / after "now" included in the ESPN scoreboard date range.
ESPN_SCOREBOARD_LOOKBACK_DAYS = max(0, int(os.environ.get('ESPN_SCOREBOARD_LOOKBACK_DAYS', '5')))
ESPN_SCOREBOARD_LOOKAHEAD_DAYS = max(0, int(os.environ.get('ESPN_SCOREBOARD_LOOKAHEAD_DAYS', '2')))
# Redis instance that receives the last-run ingestion status.
REDIS_URL = os.environ.get('REDIS_URL', 'redis://fifa2026-redis:6379/0')
ESPN_SCOREBOARD_URL = 'https://site.api.espn.com/apis/site/v2/sports/soccer/fifa.world/scoreboard'
# The Taiwan Sports Lottery reference feed is enabled unless the variable is '0', 'false' or 'no'.
TAIWAN_SPORTS_LOTTERY_ENABLED = os.environ.get('TAIWAN_SPORTS_LOTTERY_ENABLED', 'true').lower() not in {'0', 'false', 'no'}
TAIWAN_SPORTS_LOTTERY_WC_URL = os.environ.get(
    'TAIWAN_SPORTS_LOTTERY_WC_URL',
    'https://blob3rd.sportslottery.com.tw/apidata/Pre/WC-Games.zh.json',
)
# Lowercased key values that count as "not really configured".
THE_ODDS_PLACEHOLDER_KEYS = {'your_the_odds_api_key', 'changeme', 'placeholder', 'todo'}
# Market keys sent on the bulk odds request; every other configured key is fetched per event.
FEATURED_MARKET_KEYS = {'h2h', 'spreads', 'totals', 'outrights', 'h2h_lay', 'outrights_lay'}
# Maps provider-specific team spellings (English variants and Traditional Chinese names)
# to the canonical team name. Lookups are exact-match on the stripped input,
# see normalize_team_name().
TEAM_ALIAS_MAP = {
    # English spelling variants.
    'USA': 'United States',
    'USMNT': 'United States',
    'United States of America': 'United States',
    'Korea Republic': 'South Korea',
    'IR Iran': 'Iran',
    'Czechia': 'Czech Republic',
    'Bosnia & Herzegovina': 'Bosnia and Herzegovina',
    'Bosnia-Herzegovina': 'Bosnia and Herzegovina',
    'Curacao': 'Curaçao',
    # Traditional Chinese names.
    '阿根廷': 'Argentina',
    '澳洲': 'Australia',
    '奧地利': 'Austria',
    '比利時': 'Belgium',
    '維德角': 'Cape Verde',
    '哥倫比亞': 'Colombia',
    # NOTE(review): the target looks like it lost an apostrophe ("Côte d'Ivoire") — confirm
    # against the team names stored in the database before changing it.
    '象牙海岸': 'Côte dIvoire',
    '克羅埃西亞': 'Croatia',
    '古拉索': 'Curaçao',
    '捷克': 'Czech Republic',
    '民主剛果': 'DR Congo',
    '厄瓜多': 'Ecuador',
    '埃及': 'Egypt',
    '英格蘭': 'England',
    '法國': 'France',
    '德國': 'Germany',
    '迦納': 'Ghana',
    '伊朗': 'Iran',
    '伊拉克': 'Iraq',
    '日本': 'Japan',
    '約旦': 'Jordan',
    '墨西哥': 'Mexico',
    '荷蘭': 'Netherlands',
    '紐西蘭': 'New Zealand',
    '挪威': 'Norway',
    '巴拿馬': 'Panama',
    '葡萄牙': 'Portugal',
    '沙烏地阿拉伯': 'Saudi Arabia',
    '塞內加爾': 'Senegal',
    '南非': 'South Africa',
    '南韓': 'South Korea',
    '西班牙': 'Spain',
    '瑞典': 'Sweden',
    '突尼西亞': 'Tunisia',
    '土耳其': 'Türkiye',
    '烏拉圭': 'Uruguay',
    '烏茲別克': 'Uzbekistan',
}
# Maps a provider market key to the market_type value stored on each odds row.
# Keys missing from this map are stored under their own name (or 'unknown' when blank),
# see process_odds_data().
MARKET_TYPE_MAP = {
    'h2h': '1x2',
    'h2h_3_way': '1x2',
    'spreads': 'asian_handicap',
    'alternate_spreads': 'asian_handicap',
    'totals': 'ou',
    'alternate_totals': 'ou',
    'btts': 'btts',
    'draw_no_bet': 'draw_no_bet',
    'team_totals': 'team_total',
    'alternate_team_totals': 'team_total',
    'correct_score': 'correct_score',
}
# Outcome-label aliases, compared against the lowercased provider outcome name.
# No set may contain the empty string: a blank outcome name would otherwise be
# classified as over / under / yes / no.
DRAW_ALIASES = {'draw', 'tie', '平手', '和局', '平局'}
OVER_ALIASES = {'over', '大', '大球'}
UNDER_ALIASES = {'under', '小', '小球'}
YES_ALIASES = {'yes', '是'}
NO_ALIASES = {'no', '否'}
def _as_dict(value: Any) -> dict[str, Any]:
return value if isinstance(value, dict) else {}
def _as_list(value: Any) -> list[Any]:
return value if isinstance(value, list) else []
def _market_keys(value: str) -> list[str]:
seen: set[str] = set()
keys: list[str] = []
for raw_key in value.split(','):
key = raw_key.strip()
if key and key not in seen:
keys.append(key)
seen.add(key)
return keys
def _featured_markets_param() -> str:
    """Return the configured markets that belong to FEATURED_MARKET_KEYS, comma-joined.

    Falls back to ``'h2h'`` when none of the configured markets is a featured one.
    """
    selected = [key for key in _market_keys(THE_ODDS_MARKETS) if key in FEATURED_MARKET_KEYS]
    if not selected:
        return 'h2h'
    return ','.join(selected)
def _additional_markets_param() -> str:
    """Return the configured markets outside FEATURED_MARKET_KEYS, comma-joined.

    The result is an empty string when every configured market is a featured one.
    """
    configured = _market_keys(THE_ODDS_MARKETS)
    return ','.join(key for key in configured if key not in FEATURED_MARKET_KEYS)
def _canonical_api_base() -> str:
    """Return THE_ODDS_BASE with a single ``/v4`` suffix, adding it only when absent."""
    already_versioned = THE_ODDS_BASE.endswith('/v4')
    return THE_ODDS_BASE if already_versioned else f'{THE_ODDS_BASE}/v4'
def _slug(value: str, *, fallback: str | None = None, max_length: int = 64) -> str:
cleaned = re.sub(r'[^a-z0-9_\-]+', '-', value.lower()).strip('-')
return (cleaned or fallback or str(uuid4()))[:max_length]
def normalize_team_name(raw_name: str | None) -> str:
    """Map a provider team name to its canonical form.

    The stripped name is looked up in TEAM_ALIAS_MAP; names without an alias are returned
    stripped, and a missing or blank name becomes ``'Unknown Team'``.
    """
    cleaned = raw_name.strip() if raw_name else ''
    if cleaned in TEAM_ALIAS_MAP:
        return TEAM_ALIAS_MAP[cleaned]
    return cleaned or 'Unknown Team'
def _team_name_candidates(raw_name: str | None) -> list[str]:
    """Return every spelling that may identify the same team, sorted.

    The set holds the canonical name, the stripped raw name (when not blank) and every
    alias in TEAM_ALIAS_MAP that points at the canonical name.
    """
    canonical = normalize_team_name(raw_name)
    spellings = {alias for alias, target in TEAM_ALIAS_MAP.items() if target == canonical}
    spellings.add(canonical)
    trimmed = (raw_name or '').strip()
    if trimmed:
        spellings.add(trimmed)
    return sorted(spellings)
def _parse_datetime(value: str | None) -> datetime:
if not value:
return datetime.now(timezone.utc)
try:
return datetime.fromisoformat(value.replace('Z', '+00:00'))
except ValueError:
return datetime.now(timezone.utc)
def _parse_score(value: Any) -> int | None:
if value in (None, ''):
return None
try:
return int(float(str(value)))
except (TypeError, ValueError):
return None
def _parse_match_status(event: dict[str, Any]) -> MatchStatus:
    """Derive the match status from the ``status.type`` object of a scoreboard event.

    Returns FINISHED when the type is flagged completed, its state is ``post`` or its
    name is a final marker; IN_PLAY when the state is ``in``; PRE_MATCH otherwise.
    """
    status_block = event.get('status') or {}
    type_block = status_block.get('type') or {}
    phase = str(type_block.get('state') or '').lower()
    label = str(type_block.get('name') or '').lower()
    is_final = bool(type_block.get('completed')) or phase == 'post' or label in {'status_final', 'final'}
    if is_final:
        return MatchStatus.FINISHED
    return MatchStatus.IN_PLAY if phase == 'in' else MatchStatus.PRE_MATCH
def _utc_aware(value: datetime) -> datetime:
if value.tzinfo is None:
return value.replace(tzinfo=timezone.utc)
return value.astimezone(timezone.utc)
def _time_distance_seconds(left: datetime, right: datetime) -> float:
    """Return the absolute gap between two datetimes in seconds, comparing both in UTC."""
    gap = _utc_aware(left) - _utc_aware(right)
    return abs(gap.total_seconds())
def _pick_canonical_match(candidates: list[Match], incoming_match_id: str, kickoff: datetime) -> Match | None:
    """Choose the best stored match among ``candidates``, or None when there are none.

    Preference order: a match whose id differs from the incoming id, then a match whose
    venue is not the ``unknown-stadium`` placeholder, then the kickoff closest to ``kickoff``.
    Ties keep the earliest candidate in the list.
    """
    if not candidates:
        return None

    def preference(match: Match) -> tuple[bool, bool, float]:
        # False sorts before True, so "different id" and "known venue" rank first.
        return (
            match.id == incoming_match_id,
            match.venue_id == 'unknown-stadium',
            _time_distance_seconds(match.match_time_utc, kickoff),
        )

    return min(candidates, key=preference)
async def _find_reconciled_match(
    session: Any,
    *,
    incoming_match_id: str,
    home_team: Team,
    away_team: Team,
    kickoff: datetime,
) -> tuple[Match | None, bool]:
    """Find an existing fixture by teams and kickoff time.

    This prevents a provider event id from creating a second row for a match that is
    already stored. The same home/away orientation is tried first, then the swapped one.

    Returns:
        ``(match, swapped)`` where ``swapped`` is True only when the match was found with
        home and away reversed; ``(None, False)`` when nothing falls inside the window.
    """
    tolerance = timedelta(hours=MATCH_RECONCILE_WINDOW_HOURS)
    earliest = kickoff - tolerance
    latest = kickoff + tolerance
    orientations = (
        (home_team.id, away_team.id, False),
        (away_team.id, home_team.id, True),
    )
    for home_id, away_id, swapped in orientations:
        rows = await session.execute(
            select(Match).where(
                Match.home_team_id == home_id,
                Match.away_team_id == away_id,
                Match.match_time_utc >= earliest,
                Match.match_time_utc <= latest,
            )
        )
        found = _pick_canonical_match(list(rows.scalars().all()), incoming_match_id, kickoff)
        if found:
            return found, swapped
    return None, False
async def _merge_duplicate_match(session: Any, *, source_match_id: str, target_match: Match) -> bool:
    """Fold a duplicate match row, created under a provider id, back into the original fixture.

    Odds history, smart-money flow and value-bet rows are re-pointed at ``target_match``
    before the duplicate is deleted.

    Returns:
        True when a duplicate existed and was merged; False when the ids are identical
        or no row with ``source_match_id`` exists.
    """
    if source_match_id == target_match.id:
        return False
    duplicate = await session.get(Match, source_match_id)
    if duplicate is None:
        return False
    for model in (OddsHistory, SmartMoneyFlow, ValueBetRecommendation):
        await session.execute(
            update(model).where(model.match_id == duplicate.id).values(match_id=target_match.id)
        )
    await session.delete(duplicate)
    logger.info('已合併重複賽事source=%s target=%s', source_match_id, target_match.id)
    return True
def _american_to_decimal(american: str | int | float | None) -> float | None:
if american in (None, '', '0'):
return None
try:
value = float(str(american).replace('+', ''))
except ValueError:
return None
if value > 0:
return round((value / 100.0) + 1.0, 4)
if value < 0:
return round((100.0 / abs(value)) + 1.0, 4)
return None
def _decimal_price(value: Any) -> float | None:
try:
price = float(value)
except (TypeError, ValueError):
return None
if price <= 1.0:
return None
return round(price, 4)
def _fractional_to_decimal(numerator: Any, denominator: Any) -> float | None:
try:
up = float(str(numerator))
down = float(str(denominator))
except (TypeError, ValueError):
return None
if down <= 0:
return None
return round(1.0 + (up / down), 4)
def _line_value(*values: Any) -> float | None:
for value in values:
if value in (None, ''):
continue
try:
return float(value)
except (TypeError, ValueError):
continue
return None
def _taiwan_market_key_and_point(market_name: str | None) -> tuple[str | None, float | None]:
name = (market_name or '').strip()
if name == '不讓分':
return 'h2h', None
total_match = re.search(r'\[總分\]大小\s*([0-9]+(?:\.[0-9]+)?)', name)
if total_match:
return 'totals', float(total_match.group(1))
if name == '正確比數':
return 'correct_score', None
return None, None
def _taiwan_selection_name(choice_name: str | None, home_team: str, away_team: str) -> str | None:
raw = (choice_name or '').strip()
if not raw:
return None
if raw == '和局':
return 'Draw'
if raw.startswith(''):
return 'Over'
if raw.startswith(''):
return 'Under'
if re.match(r'^\d+\s*:\s*\d+$', raw):
return raw.replace(' ', '')
normalized = normalize_team_name(raw)
if normalized in {home_team, away_team}:
return normalized
return raw
def _selection_for_outcome(
    market_key: str,
    outcome_name: str,
    home_team: str,
    away_team: str,
    outcome_description: str | None = None,
) -> str | None:
    """Map a provider outcome to the selection label stored with an odds row.

    Args:
        market_key: Provider market key such as ``h2h`` or ``totals``.
        outcome_name: Outcome label from the provider.
        home_team: Home team name used to recognise team-named outcomes.
        away_team: Away team name used to recognise team-named outcomes.
        outcome_description: Optional extra label; only team-total markets read it.

    Returns:
        ``home`` / ``away`` / ``draw`` / ``over`` / ``under`` / ``yes`` / ``no``, or
        ``<side>_<over|under>`` for team totals. Match-result and draw-no-bet markets
        return None for an unrecognised outcome; every other market falls back to a
        slug of the outcome name.
    """
    label = outcome_name.strip()
    detail = (outcome_description or '').strip()
    label_lower = label.lower()
    label_team = normalize_team_name(label).lower()
    home_key = normalize_team_name(home_team).lower()
    away_key = normalize_team_name(away_team).lower()

    def team_side(team_key: str) -> str | None:
        # Home is tested first, so it wins when both teams normalise to the same name.
        if team_key == home_key:
            return 'home'
        if team_key == away_key:
            return 'away'
        return None

    def over_under(text: str) -> str | None:
        if text in OVER_ALIASES:
            return 'over'
        if text in UNDER_ALIASES:
            return 'under'
        return None

    if market_key in {'h2h', 'h2h_3_way'}:
        return 'draw' if label_lower in DRAW_ALIASES else team_side(label_team)
    if market_key == 'draw_no_bet':
        return team_side(label_team)

    resolved: str | None = None
    if market_key in {'totals', 'alternate_totals'}:
        resolved = over_under(label_lower)
    elif market_key == 'btts':
        if label_lower in YES_ALIASES:
            resolved = 'yes'
        elif label_lower in NO_ALIASES:
            resolved = 'no'
    elif market_key in {'spreads', 'alternate_spreads'}:
        resolved = team_side(label_team)
    elif market_key in {'team_totals', 'alternate_team_totals'}:
        detail_lower = detail.lower()
        detail_team = normalize_team_name(detail).lower()
        # The team may be named in either the description or the outcome name.
        side = None
        if detail_team == home_key or label_team == home_key:
            side = 'home'
        elif detail_team == away_key or label_team == away_key:
            side = 'away'
        elif detail:
            side = _slug(detail, max_length=20)
        direction = over_under(label_lower)
        if direction is None:
            if 'over' in detail_lower:
                direction = 'over'
            elif 'under' in detail_lower:
                direction = 'under'
        if side and direction:
            resolved = f'{side}_{direction}'
    if resolved:
        return resolved
    # Unrecognised outcomes keep a slug of the provider label.
    return _slug(label, max_length=30)
def _merge_event_markets(base_event: dict[str, Any], detail_event: dict[str, Any]) -> None:
if not isinstance(detail_event, dict):
return
bookmaker_map: dict[str, dict[str, Any]] = {}
for bookmaker in _as_list(base_event.get('bookmakers')):
if not isinstance(bookmaker, dict):
continue
key = str(bookmaker.get('key') or bookmaker.get('title') or '')
if key:
bookmaker_map[key] = bookmaker
for detail_bookmaker in _as_list(detail_event.get('bookmakers')):
if not isinstance(detail_bookmaker, dict):
continue
key = str(detail_bookmaker.get('key') or detail_bookmaker.get('title') or '')
if not key:
continue
existing = bookmaker_map.get(key)
if existing is None:
base_event.setdefault('bookmakers', []).append(detail_bookmaker)
bookmaker_map[key] = detail_bookmaker
continue
existing_markets = existing.setdefault('markets', [])
existing_market_keys = {
str(market.get('key') or '')
for market in _as_list(existing_markets)
if isinstance(market, dict)
}
for market in _as_list(detail_bookmaker.get('markets')):
if not isinstance(market, dict):
continue
market_key = str(market.get('key') or '')
if market_key not in existing_market_keys:
existing_markets.append(market)
existing_market_keys.add(market_key)
async def _request_json(client: httpx.AsyncClient, url: str, params: dict[str, Any] | None = None) -> Any:
delay = 1.0
for attempt in range(1, 6):
try:
response = await client.get(url, params=params, timeout=20.0)
if response.status_code == 429 or response.status_code >= 500:
if attempt == 5:
response.raise_for_status()
logger.warning('來源暫時限流或失敗 status=%s attempt=%s url=%s', response.status_code, attempt, url)
await asyncio.sleep(delay)
delay *= 2
continue
response.raise_for_status()
return response.json()
except (httpx.HTTPError, ValueError) as exc:
if attempt == 5:
raise RuntimeError(f'抓取來源失敗:{exc}') from exc
logger.warning('來源請求失敗 attempt=%s url=%s error=%s', attempt, url, exc)
await asyncio.sleep(delay)
delay *= 2
return None
async def fetch_the_odds_api(client: httpx.AsyncClient) -> list[dict[str, Any]]:
    """Fetch World Cup events with odds from The Odds API.

    One bulk request loads the featured markets. When additional markets are configured,
    the first THE_ODDS_ADDITIONAL_EVENTS_LIMIT events each get a per-event request, issued
    concurrently, and the results are merged into the bulk events. A failed per-event
    request is logged and skipped.

    Returns:
        The event list from the bulk request (empty when the payload is not a list),
        enriched in place with any additional markets.
    """
    odds_url = f'{_canonical_api_base()}/sports/{THE_ODDS_SPORT_KEY}/odds'
    featured_markets = _featured_markets_param()
    additional_markets = _additional_markets_param()
    query = {
        'apiKey': THE_ODDS_API_KEY,
        'regions': THE_ODDS_REGIONS,
        'markets': featured_markets,
        'oddsFormat': 'decimal',
        'dateFormat': 'iso',
    }
    logger.info('抓取 The Odds API featured marketssport=%s regions=%s markets=%s', THE_ODDS_SPORT_KEY, THE_ODDS_REGIONS, featured_markets)
    payload = await _request_json(client, odds_url, query)
    events = payload if isinstance(payload, list) else []
    if not (additional_markets and events):
        return events
    detail_events = events[:THE_ODDS_ADDITIONAL_EVENTS_LIMIT]
    logger.info(
        '抓取 The Odds API additional marketsevents=%s markets=%s',
        len(detail_events),
        additional_markets,
    )
    detail_requests = [
        fetch_the_odds_event_markets(client, str(event.get('id')), additional_markets)
        for event in detail_events
        if isinstance(event, dict) and event.get('id')
    ]
    # return_exceptions=True keeps one failing event from discarding the others.
    detail_results = await asyncio.gather(*detail_requests, return_exceptions=True)
    by_event_id = {str(event.get('id')): event for event in events if isinstance(event, dict) and event.get('id')}
    for detail_result in detail_results:
        if isinstance(detail_result, Exception):
            logger.warning('The Odds API additional market 抓取失敗:%s', detail_result)
            continue
        if isinstance(detail_result, dict):
            target_event = by_event_id.get(str(detail_result.get('id') or ''))
            if target_event is not None:
                _merge_event_markets(target_event, detail_result)
    return events
async def fetch_the_odds_event_markets(
    client: httpx.AsyncClient,
    event_id: str,
    markets: str,
) -> dict[str, Any] | None:
    """Fetch the given comma-separated markets for one event from The Odds API.

    Returns the event payload, or None when the response body is not a JSON object.
    """
    event_url = f'{_canonical_api_base()}/sports/{THE_ODDS_SPORT_KEY}/events/{event_id}/odds'
    query = {
        'apiKey': THE_ODDS_API_KEY,
        'regions': THE_ODDS_REGIONS,
        'markets': markets,
        'oddsFormat': 'decimal',
        'dateFormat': 'iso',
    }
    payload = await _request_json(client, event_url, query)
    if isinstance(payload, dict):
        return payload
    return None
async def fetch_espn_scoreboard(client: httpx.AsyncClient) -> list[dict[str, Any]]:
    """Fetch the ESPN World Cup scoreboard and convert it to the worker's event shape.

    The date range runs from ESPN_SCOREBOARD_LOOKBACK_DAYS before now to
    ESPN_SCOREBOARD_LOOKAHEAD_DAYS after now. Each returned event carries team names,
    scores, match status, kickoff time and, when the first odds entry has usable
    moneyline prices, one bookmaker with an ``h2h`` market in decimal odds.

    Returns:
        A list of event dicts; empty when the payload is not a JSON object.
    """
    today = datetime.now(timezone.utc)
    window_start = today - timedelta(days=ESPN_SCOREBOARD_LOOKBACK_DAYS)
    window_end = today + timedelta(days=ESPN_SCOREBOARD_LOOKAHEAD_DAYS)
    query = {'dates': f'{window_start:%Y%m%d}-{window_end:%Y%m%d}'}
    logger.info('同步 ESPN scoreboard 比分備援。dates=%s', query['dates'])
    payload = await _request_json(client, ESPN_SCOREBOARD_URL, params=query)
    if not isinstance(payload, dict):
        logger.warning('ESPN scoreboard 回傳格式不是物件,略過本輪比分同步。')
        return []
    normalized_events: list[dict[str, Any]] = []
    for event in _as_list(payload.get('events')):
        if not isinstance(event, dict):
            continue
        # Only the first well-formed competition of an event is read.
        competition = next(
            (item for item in _as_list(event.get('competitions')) if isinstance(item, dict)),
            {},
        )
        team_names: dict[str, str] = {'home': 'Unknown Team', 'away': 'Unknown Team'}
        team_scores: dict[str, int | None] = {'home': None, 'away': None}
        for competitor in _as_list(competition.get('competitors')):
            if not isinstance(competitor, dict):
                continue
            side = competitor.get('homeAway')
            if side not in ('home', 'away'):
                continue
            team_info = _as_dict(competitor.get('team'))
            team_names[side] = normalize_team_name(team_info.get('name') or team_info.get('displayName'))
            team_scores[side] = _parse_score(competitor.get('score'))
        home_team = team_names['home']
        away_team = team_names['away']
        bookmakers: list[dict[str, Any]] = []
        odds_entries = _as_list(competition.get('odds'))
        if odds_entries:
            primary = _as_dict(odds_entries[0])
            provider_name = _as_dict(primary.get('provider')).get('name', 'ESPN Odds')
            moneyline = _as_dict(primary.get('moneyline'))
            outcomes: list[dict[str, Any]] = []
            for side, label in (('home', home_team), ('draw', 'Draw'), ('away', away_team)):
                quote = _as_dict(moneyline.get(side))
                # Closing odds win over opening odds when both are present.
                american = _as_dict(quote.get('close')).get('odds') or _as_dict(quote.get('open')).get('odds')
                price = _american_to_decimal(american)
                if price:
                    outcomes.append({'name': label, 'price': price})
            if outcomes:
                bookmakers.append(
                    {
                        'key': _slug(provider_name, fallback='espn'),
                        'title': provider_name,
                        'markets': [{'key': 'h2h', 'outcomes': outcomes}],
                    }
                )
        event_id = event.get('id') or _slug(f'{home_team}-{away_team}-{event.get("date", "")}', max_length=64)
        normalized_events.append(
            {
                'id': event_id,
                'home_team': home_team,
                'away_team': away_team,
                'home_score': team_scores['home'],
                'away_score': team_scores['away'],
                'status': _parse_match_status(event),
                'commence_time': event.get('date'),
                'bookmakers': bookmakers,
            }
        )
    return normalized_events
async def fetch_taiwan_sports_lottery_reference(client: httpx.AsyncClient) -> list[dict[str, Any]]:
    """Fetch the public Taiwan Sports Lottery World Cup markets as a reference source.

    This source serves only as a single reference book and for comparison against the
    minimum acceptable odds; it is not equivalent to a multi-bookmaker formal odds provider.

    Returns:
        Event dicts with one bookmaker, ``taiwan-sports-lottery-reference``. Games
        without two known teams or without a recognised market are skipped. The list is
        empty when the source is disabled or the payload is not a list.
    """
    if not TAIWAN_SPORTS_LOTTERY_ENABLED:
        return []
    payload = await _request_json(client, TAIWAN_SPORTS_LOTTERY_WC_URL)
    reference_events: list[dict[str, Any]] = []
    for game in payload if isinstance(payload, list) else []:
        if not isinstance(game, dict):
            continue
        home_team = normalize_team_name(game.get('hn'))
        away_team = normalize_team_name(game.get('an'))
        if 'Unknown Team' in (home_team, away_team):
            continue
        markets: list[dict[str, Any]] = []
        for market in _as_list(game.get('ms')):
            if not isinstance(market, dict):
                continue
            market_key, point = _taiwan_market_key_and_point(str(market.get('name') or ''))
            if not market_key:
                continue
            outcomes: list[dict[str, Any]] = []
            for choice in _as_list(market.get('cs')):
                if not isinstance(choice, dict):
                    continue
                # 'pu' / 'pd' are read as the numerator / denominator of fractional odds.
                price = _fractional_to_decimal(choice.get('pu'), choice.get('pd'))
                if price is None:
                    continue
                selection_name = _taiwan_selection_name(str(choice.get('name') or ''), home_team, away_team)
                if not selection_name:
                    continue
                outcome: dict[str, Any] = {'name': selection_name, 'price': price}
                if point is not None:
                    outcome['point'] = point
                outcomes.append(outcome)
            if outcomes:
                markets.append({'key': market_key, 'outcomes': outcomes})
        if not markets:
            continue
        fallback_event_id = _slug(f'{home_team}-{away_team}-{game.get("kt", "")}')
        reference_bookmaker = {
            'key': 'taiwan-sports-lottery-reference',
            'title': '台灣運彩參考盤',
            'markets': markets,
        }
        reference_events.append(
            {
                'id': f'tsl-{game.get("id") or fallback_event_id}',
                'home_team': home_team,
                'away_team': away_team,
                'status': MatchStatus.PRE_MATCH,
                'commence_time': game.get('kt'),
                'bookmakers': [reference_bookmaker],
            }
        )
    logger.info('台灣運彩世界盃參考盤抓取完成events=%s', len(reference_events))
    return reference_events
def _primary_provider_status(
    *,
    status: str | None = None,
    message: str | None = None,
    event_count: int | None = None,
) -> dict[str, Any]:
    """Build the status payload describing the primary provider, The Odds API.

    When ``status`` is omitted, both status and message are derived from the configured
    key: ``missing_key``, ``placeholder_key`` or ``configured``. ``event_count`` is
    included only when it is given.
    """
    key_is_configured = bool(THE_ODDS_API_KEY)
    key_is_placeholder = THE_ODDS_API_KEY.lower() in THE_ODDS_PLACEHOLDER_KEYS
    if status is None:
        if key_is_configured and not key_is_placeholder:
            status, message = 'configured', '正式 The Odds API key 已設定,等待抓取結果確認。'
        elif key_is_configured:
            status, message = 'placeholder_key', '正式 The Odds API key 仍是 placeholder未接入可用 provider。'
        else:
            status, message = 'missing_key', '正式 The Odds API key 尚未設定。'
    payload: dict[str, Any] = dict(
        provider='the-odds-api',
        status=status,
        api_key_configured=key_is_configured,
        api_key_placeholder=key_is_placeholder,
        sport_key=THE_ODDS_SPORT_KEY,
        regions=THE_ODDS_REGIONS,
        markets=_market_keys(THE_ODDS_MARKETS),
        message=message,
    )
    if event_count is not None:
        payload['event_count'] = event_count
    return payload
async def fetch_source_payload() -> tuple[str, list[dict[str, Any]], dict[str, Any]]:
    """Fetch events from the best available source.

    Sources are tried in order: The Odds API (only with a usable key), the Taiwan
    Sports Lottery reference feed, then the ESPN scoreboard.

    Returns:
        ``(source_name, events, primary_provider_status)``. The status always describes
        The Odds API, including why it was skipped or why it failed.
    """
    request_headers = {
        'User-Agent': 'Mozilla/5.0 FIFA2026QuantBot/1.0',
        'Accept': 'application/json,text/plain,*/*',
        'Referer': 'https://www.sportslottery.com.tw/',
    }
    async with httpx.AsyncClient(headers=request_headers) as client:
        primary_status = _primary_provider_status()
        key_usable = primary_status['api_key_configured'] and not primary_status['api_key_placeholder']
        if not key_usable:
            logger.warning('The Odds API 未可用:%s', primary_status.get('message'))
        else:
            try:
                the_odds_events = await fetch_the_odds_api(client)
                if the_odds_events:
                    ok_status = _primary_provider_status(
                        status='ok',
                        message='正式 The Odds API 已回傳可用賽事與盤口。',
                        event_count=len(the_odds_events),
                    )
                    return 'the-odds-api', the_odds_events, ok_status
                primary_status = _primary_provider_status(
                    status='empty_events',
                    message='正式 The Odds API 已可呼叫,但目前沒有回傳可用世界盃盤口;暫時回退台灣參考盤。',
                    event_count=0,
                )
            except Exception as exc:
                # Any primary-provider failure falls through to the fallback sources.
                primary_status = _primary_provider_status(
                    status='error',
                    message=f'正式 The Odds API 抓取失敗,已回退台灣參考盤:{str(exc)[:220]}',
                    event_count=0,
                )
                logger.warning('The Odds API 抓取失敗,回退台灣運彩參考盤:%s', exc)
        try:
            taiwan_reference_events = await fetch_taiwan_sports_lottery_reference(client)
        except Exception as exc:
            logger.warning('台灣運彩參考盤抓取失敗,本輪僅使用 ESPN 備援:%s', exc)
            taiwan_reference_events = []
        if taiwan_reference_events:
            return 'taiwan-sports-lottery-reference', taiwan_reference_events, primary_status
        espn_events = await fetch_espn_scoreboard(client)
        return 'espn-scoreboard', espn_events, primary_status
async def process_odds_data(data: list[dict[str, Any]]) -> dict[str, int]:
    """Persist one batch of normalized provider events and their odds.

    For every event the teams are looked up or created, the event is reconciled with an
    already stored fixture (same teams, kickoff inside the reconcile window, either
    orientation), the match row is created or updated, and one OddsHistory row is added
    per priced outcome. Everything is committed in a single transaction.

    Args:
        data: Events shaped like the output of the fetch_* functions in this module.

    Returns:
        Counters with the keys ``events``, ``odds_rows``, ``bookmakers``,
        ``reconciled_matches`` and ``deduped_matches``. The same keys are returned for
        an empty batch.

    Raises:
        SQLAlchemyError: re-raised after the session has been rolled back.
    """
    if not data:
        logger.info('本輪來源沒有可處理賽事。')
        # Same key set as the populated result, so callers always see one shape.
        return {
            'events': 0,
            'odds_rows': 0,
            'bookmakers': 0,
            'reconciled_matches': 0,
            'deduped_matches': 0,
        }
    event_count = 0
    odds_count = 0
    reconciled_count = 0
    deduped_count = 0
    bookmaker_ids: set[str] = set()
    async with SessionFactory() as session:
        try:
            for event in data:
                if not isinstance(event, dict):
                    continue
                match_id = str(event.get('id') or '').strip()
                if not match_id:
                    continue
                home_team_name = normalize_team_name(event.get('home_team'))
                away_team_name = normalize_team_name(event.get('away_team'))
                kickoff = _parse_datetime(event.get('commence_time'))
                home_team = await get_or_create_team(session, home_team_name)
                away_team = await get_or_create_team(session, away_team_name)
                # Placeholder venue; created by name when the fixed-id row does not exist.
                venue = await session.get(Venue, 'unknown-stadium')
                if venue is None:
                    venue = await get_or_create_venue(session, '待確認場館', '待確認', '待確認')
                reconciled_match, is_swapped_fixture = await _find_reconciled_match(
                    session,
                    incoming_match_id=match_id,
                    home_team=home_team,
                    away_team=away_team,
                    kickoff=kickoff,
                )
                if reconciled_match and reconciled_match.id != match_id:
                    reconciled_count += 1
                    if await _merge_duplicate_match(session, source_match_id=match_id, target_match=reconciled_match):
                        deduped_count += 1
                match = reconciled_match or await session.get(Match, match_id)
                event_status = event.get('status')
                # Scores and status are written only when the source actually carries them.
                has_result_payload = isinstance(event_status, MatchStatus) or 'home_score' in event or 'away_score' in event
                parsed_status = event_status if isinstance(event_status, MatchStatus) else MatchStatus.PRE_MATCH
                home_score = _parse_score(event.get('home_score'))
                away_score = _parse_score(event.get('away_score'))
                if not match:
                    match = Match(
                        id=match_id,
                        home_team_id=home_team.id,
                        away_team_id=away_team.id,
                        venue_id=venue.id,
                        match_time_utc=kickoff,
                        status=parsed_status,
                        home_score=home_score,
                        away_score=away_score,
                        result_synced_at=datetime.now(timezone.utc),
                    )
                    session.add(match)
                else:
                    # A swapped fixture keeps the stored orientation.
                    if not is_swapped_fixture:
                        match.home_team_id = home_team.id
                        match.away_team_id = away_team.id
                    if venue.id != 'unknown-stadium' or match.venue_id == 'unknown-stadium':
                        match.venue_id = venue.id
                    match.match_time_utc = kickoff
                    if has_result_payload:
                        match.status = parsed_status
                        match.home_score = away_score if is_swapped_fixture else home_score
                        match.away_score = home_score if is_swapped_fixture else away_score
                        match.result_synced_at = datetime.now(timezone.utc)
                # Selections must follow the stored match orientation, exactly like the
                # scores above. For a swapped fixture the provider's home team is the
                # stored away team, so the names are swapped before classifying outcomes.
                if is_swapped_fixture:
                    selection_home_name, selection_away_name = away_team_name, home_team_name
                else:
                    selection_home_name, selection_away_name = home_team_name, away_team_name
                event_count += 1
                recorded_at = datetime.now(timezone.utc)
                for bookmaker_payload in _as_list(event.get('bookmakers')):
                    if not isinstance(bookmaker_payload, dict):
                        continue
                    bookmaker_key = _slug(str(bookmaker_payload.get('key') or bookmaker_payload.get('title') or 'unknown'))
                    bookmaker_name = str(bookmaker_payload.get('title') or bookmaker_key)
                    bookmaker = await get_or_create_bookmaker(session, bookmaker_key, bookmaker_name)
                    bookmaker_ids.add(bookmaker.id)
                    for market in _as_list(bookmaker_payload.get('markets')):
                        if not isinstance(market, dict):
                            continue
                        source_market = str(market.get('key') or '').strip()
                        market_type = MARKET_TYPE_MAP.get(source_market, source_market or 'unknown')
                        for outcome in _as_list(market.get('outcomes')):
                            if not isinstance(outcome, dict):
                                continue
                            price = _decimal_price(outcome.get('price'))
                            if not price:
                                continue
                            # The outcome-level point wins over a market-level one.
                            point = _line_value(outcome.get('point'), market.get('point'))
                            market_line = point if source_market in {'totals', 'alternate_totals', 'team_totals', 'alternate_team_totals'} else None
                            handicap = point if source_market in {'spreads', 'alternate_spreads'} else None
                            source_outcome_name = str(outcome.get('name') or '')[:140]
                            source_outcome_description = str(outcome.get('description') or '')[:140]
                            selection = _selection_for_outcome(
                                source_market,
                                source_outcome_name,
                                selection_home_name,
                                selection_away_name,
                                source_outcome_description,
                            )
                            if not selection:
                                continue
                            session.add(
                                OddsHistory(
                                    match_id=match.id,
                                    bookmaker_id=bookmaker.id,
                                    market_type=market_type,
                                    selection=selection,
                                    market_line=market_line,
                                    handicap=handicap,
                                    decimal_odds=price,
                                    implied_probability=1.0 / price,
                                    source_market_key=source_market[:80],
                                    source_outcome_name=source_outcome_name,
                                    recorded_at=recorded_at,
                                )
                            )
                            odds_count += 1
            await session.commit()
            logger.info(
                '本輪賠率寫入完成events=%s odds_rows=%s bookmakers=%s reconciled=%s deduped=%s',
                event_count,
                odds_count,
                len(bookmaker_ids),
                reconciled_count,
                deduped_count,
            )
            return {
                'events': event_count,
                'odds_rows': odds_count,
                'bookmakers': len(bookmaker_ids),
                'reconciled_matches': reconciled_count,
                'deduped_matches': deduped_count,
            }
        except SQLAlchemyError as exc:
            await session.rollback()
            logger.exception('資料庫寫入賠率失敗:%s', exc)
            raise
async def get_or_create_team(session: Any, name: str) -> Team:
    """Return the stored team matching ``name`` under any known spelling, creating it if needed.

    Among the rows found, one whose own name normalises to the canonical name is
    preferred; otherwise the first row is used. A new team is stored under the canonical
    name and flushed.
    """
    canonical_name = normalize_team_name(name)
    rows = await session.execute(select(Team).where(Team.name.in_(_team_name_candidates(name))))
    found = list(rows.scalars().all())
    exact = next((team for team in found if normalize_team_name(team.name) == canonical_name), None)
    if exact is not None:
        return exact
    if found:
        return found[0]
    created = Team(id=str(uuid4()), name=canonical_name)
    session.add(created)
    await session.flush()
    return created
async def get_or_create_venue(session: Any, name: str, city: str, country: str) -> Venue:
    """Return the venue named ``name``, creating it with a random id and UTC timezone if absent."""
    rows = await session.execute(select(Venue).where(Venue.name == name))
    existing = rows.scalars().first()
    if existing:
        return existing
    created = Venue(id=str(uuid4()), name=name, city=city, country=country, timezone='UTC')
    session.add(created)
    await session.flush()
    return created
async def get_or_create_bookmaker(session: Any, bookmaker_id: str, name: str) -> Bookmaker:
    """Return the bookmaker with ``bookmaker_id``, creating it when absent.

    The name of a newly created bookmaker is cut to 120 characters.
    """
    existing = await session.get(Bookmaker, bookmaker_id)
    if existing:
        return existing
    created = Bookmaker(id=bookmaker_id, name=name[:120])
    session.add(created)
    await session.flush()
    return created
async def publish_status(status: dict[str, Any]) -> None:
    """Publish the latest ingestion status to Redis under ``ingestion:odds:last_run``.

    The key expires after three polling intervals, but never sooner than 900 seconds.
    Failures are logged and swallowed on purpose: a Redis outage must not break ingestion.
    """
    try:
        redis = Redis.from_url(REDIS_URL, decode_responses=True)
        try:
            await redis.set(
                'ingestion:odds:last_run',
                json.dumps(status, ensure_ascii=False),
                ex=max(ODDS_POLL_INTERVAL_SECONDS * 3, 900),
            )
        finally:
            # Close the client even when the write fails, so a failing Redis does not
            # leave one unclosed client behind per polling round.
            await redis.aclose()
    except Exception as exc:  # pragma: no cover - Redis status must not block DB ingestion
        logger.warning('寫入 Redis ingestion 狀態失敗:%s', exc)
async def run_once() -> dict[str, Any]:
    """Run one ingestion round and publish its status.

    Odds are fetched from the best available source and persisted. When that source is
    not the ESPN scoreboard, the scoreboard is processed as well so results stay in
    sync; a failure there is recorded in the status without failing the round.

    Returns:
        The status dict that was published to Redis.
    """
    source, payload, primary_provider = await fetch_source_payload()
    stats = await process_odds_data(payload)
    if source != 'espn-scoreboard':
        try:
            async with httpx.AsyncClient() as client:
                espn_payload = await fetch_espn_scoreboard(client)
            score_stats = await process_odds_data(espn_payload)
            summed_keys = ('events', 'odds_rows', 'bookmakers', 'reconciled_matches', 'deduped_matches')
            combined: dict[str, Any] = {
                key: stats.get(key, 0) + score_stats.get(key, 0) for key in summed_keys
            }
            combined['score_events'] = score_stats.get('events', 0)
            combined['score_status'] = 'ok'
            stats = combined
        except Exception as exc:
            logger.warning('比分來源同步失敗,但不阻斷賠率 ingestion%s', exc)
            stats = dict(stats, score_events=0, score_status='error', score_message=str(exc))
    status: dict[str, Any] = {
        'status': 'ok',
        'source': source,
        'run_at': datetime.now(timezone.utc).isoformat(),
        'primary_provider': primary_provider,
    }
    status.update(stats)
    await publish_status(status)
    return status
async def run_forever() -> None:
    """Run ingestion rounds forever, sleeping ODDS_POLL_INTERVAL_SECONDS between rounds.

    A failed round publishes an error status and is logged; the loop then continues.
    """
    logger.info('啟動賠率 ingestion workerinterval=%ss', ODDS_POLL_INTERVAL_SECONDS)
    while True:
        try:
            round_status = await run_once()
            logger.info('ingestion 狀態:%s', round_status)
        except Exception as exc:
            failed_at = datetime.now(timezone.utc).isoformat()
            provider_state = _primary_provider_status(
                status='unknown',
                message='本輪 ingestion 失敗,未能確認正式 provider 狀態。',
            )
            await publish_status(
                {
                    'status': 'error',
                    'source': 'unknown',
                    'run_at': failed_at,
                    'primary_provider': provider_state,
                    'message': str(exc),
                }
            )
            logger.exception('本輪賠率 ingestion 失敗:%s', exc)
        await asyncio.sleep(ODDS_POLL_INTERVAL_SECONDS)
if __name__ == '__main__':
    # ODDS_WORKER_ONCE=true runs a single round and prints its status; anything else polls forever.
    single_round = os.environ.get('ODDS_WORKER_ONCE') == 'true'
    if not single_round:
        asyncio.run(run_forever())
    else:
        print(asyncio.run(run_once()))