"""正式賠率資料擷取 worker。 目標: - 優先使用 The Odds API 的世界盃盤口。 - 沒有金鑰時使用 ESPN scoreboard 作為低階備援,不產生假盤。 - 寫入 PostgreSQL / TimescaleDB 的 odds_history。 - 將最新 ingestion 狀態寫入 Redis,讓前端能顯示資料新鮮度。 """ from __future__ import annotations import asyncio import json import logging import os import re from datetime import datetime, timedelta, timezone from typing import Any from uuid import uuid4 import httpx from redis.asyncio import Redis from sqlalchemy import update from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.future import select from app.db.base import SessionFactory from app.db.models import Bookmaker, Match, MatchStatus, OddsHistory, SmartMoneyFlow, Team, ValueBetRecommendation, Venue logger = logging.getLogger('fifa2026-odds-worker') logging.basicConfig(level=logging.INFO) THE_ODDS_API_KEY = os.environ.get('THE_ODDS_API_KEY', '').strip() THE_ODDS_SPORT_KEY = os.environ.get('THE_ODDS_SPORT_KEY', 'soccer_fifa_world_cup').strip() THE_ODDS_BASE = os.environ.get('THE_ODDS_BASE', 'https://api.the-odds-api.com').rstrip('/') THE_ODDS_REGIONS = os.environ.get('THE_ODDS_REGIONS', 'eu').strip() THE_ODDS_DEFAULT_MARKETS = 'h2h,spreads,totals,btts,draw_no_bet,h2h_3_way,alternate_spreads,alternate_totals,team_totals,alternate_team_totals' THE_ODDS_MARKETS = os.environ.get('THE_ODDS_MARKETS', THE_ODDS_DEFAULT_MARKETS).strip() THE_ODDS_ADDITIONAL_EVENTS_LIMIT = max(0, int(os.environ.get('THE_ODDS_ADDITIONAL_EVENTS_LIMIT', '24'))) ODDS_POLL_INTERVAL_SECONDS = max(30, int(os.environ.get('ODDS_POLL_INTERVAL_SECONDS', '300'))) MATCH_RECONCILE_WINDOW_HOURS = max(1, int(os.environ.get('MATCH_RECONCILE_WINDOW_HOURS', '36'))) ESPN_SCOREBOARD_LOOKBACK_DAYS = max(0, int(os.environ.get('ESPN_SCOREBOARD_LOOKBACK_DAYS', '5'))) ESPN_SCOREBOARD_LOOKAHEAD_DAYS = max(0, int(os.environ.get('ESPN_SCOREBOARD_LOOKAHEAD_DAYS', '2'))) REDIS_URL = os.environ.get('REDIS_URL', 'redis://fifa2026-redis:6379/0') ESPN_SCOREBOARD_URL = 'https://site.api.espn.com/apis/site/v2/sports/soccer/fifa.world/scoreboard' 
TAIWAN_SPORTS_LOTTERY_ENABLED = os.environ.get('TAIWAN_SPORTS_LOTTERY_ENABLED', 'true').lower() not in {'0', 'false', 'no'}
TAIWAN_SPORTS_LOTTERY_WC_URL = os.environ.get(
    'TAIWAN_SPORTS_LOTTERY_WC_URL',
    'https://blob3rd.sportslottery.com.tw/apidata/Pre/WC-Games.zh.json',
)

# Markets requested from the bulk /odds endpoint; every other configured
# market key is requested per event (see fetch_the_odds_api).
FEATURED_MARKET_KEYS = {'h2h', 'spreads', 'totals', 'outrights', 'h2h_lay', 'outrights_lay'}

# Source-specific team spellings (English variants and Traditional Chinese)
# mapped to the canonical team name used by this worker.
TEAM_ALIAS_MAP = {
    'USA': 'United States',
    'USMNT': 'United States',
    'United States of America': 'United States',
    'Korea Republic': 'South Korea',
    'IR Iran': 'Iran',
    'Czechia': 'Czech Republic',
    'Bosnia & Herzegovina': 'Bosnia and Herzegovina',
    'Bosnia-Herzegovina': 'Bosnia and Herzegovina',
    'Curacao': 'Curaçao',
    '阿根廷': 'Argentina',
    '澳洲': 'Australia',
    '奧地利': 'Austria',
    '比利時': 'Belgium',
    '維德角': 'Cape Verde',
    '哥倫比亞': 'Colombia',
    '象牙海岸': 'Côte d’Ivoire',
    '克羅埃西亞': 'Croatia',
    '古拉索': 'Curaçao',
    '捷克': 'Czech Republic',
    '民主剛果': 'DR Congo',
    '厄瓜多': 'Ecuador',
    '埃及': 'Egypt',
    '英格蘭': 'England',
    '法國': 'France',
    '德國': 'Germany',
    '迦納': 'Ghana',
    '伊朗': 'Iran',
    '伊拉克': 'Iraq',
    '日本': 'Japan',
    '約旦': 'Jordan',
    '墨西哥': 'Mexico',
    '荷蘭': 'Netherlands',
    '紐西蘭': 'New Zealand',
    '挪威': 'Norway',
    '巴拿馬': 'Panama',
    '葡萄牙': 'Portugal',
    '沙烏地阿拉伯': 'Saudi Arabia',
    '塞內加爾': 'Senegal',
    '南非': 'South Africa',
    '南韓': 'South Korea',
    '西班牙': 'Spain',
    '瑞典': 'Sweden',
    '突尼西亞': 'Tunisia',
    '土耳其': 'Türkiye',
    '烏拉圭': 'Uruguay',
    '烏茲別克': 'Uzbekistan',
}

# Source market key -> market_type value stored on odds_history rows.
MARKET_TYPE_MAP = {
    'h2h': '1x2',
    'h2h_3_way': '1x2',
    'spreads': 'asian_handicap',
    'alternate_spreads': 'asian_handicap',
    'totals': 'ou',
    'alternate_totals': 'ou',
    'btts': 'btts',
    'draw_no_bet': 'draw_no_bet',
    'team_totals': 'team_total',
    'alternate_team_totals': 'team_total',
    'correct_score': 'correct_score',
}

# Lower-cased outcome names recognised for each generic selection.
DRAW_ALIASES = {'draw', 'tie', '平手', '和局', '平局'}
OVER_ALIASES = {'over', '大', '大球'}
UNDER_ALIASES = {'under', '小', '小球'}
YES_ALIASES = {'yes', '是'}
NO_ALIASES = {'no', '否'}

# Statuses that are retried with backoff in addition to every 5xx response.
_RETRYABLE_STATUSES = {408, 425, 429}
_REQUEST_ATTEMPTS = 5


def _as_dict(value: Any) -> dict[str, Any]:
    """Return ``value`` if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_list(value: Any) -> list[Any]:
    """Return ``value`` if it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _market_keys(value: str) -> list[str]:
    """Split a comma-separated market list, dropping blanks and duplicates.

    The first occurrence of each key keeps its position.
    """
    stripped = (raw_key.strip() for raw_key in value.split(','))
    return list(dict.fromkeys(key for key in stripped if key))


def _featured_markets_param() -> str:
    """Return the configured featured markets as a comma-separated string.

    Falls back to ``h2h`` when none of the configured markets is featured.
    """
    featured = [key for key in _market_keys(THE_ODDS_MARKETS) if key in FEATURED_MARKET_KEYS]
    return ','.join(featured or ['h2h'])


def _additional_markets_param() -> str:
    """Return the configured non-featured markets as a comma-separated string."""
    additional = [key for key in _market_keys(THE_ODDS_MARKETS) if key not in FEATURED_MARKET_KEYS]
    return ','.join(additional)


def _canonical_api_base() -> str:
    """Return the API base URL, appending ``/v4`` unless already present."""
    if THE_ODDS_BASE.endswith('/v4'):
        return THE_ODDS_BASE
    return f'{THE_ODDS_BASE}/v4'


def _slug(value: str, *, fallback: str | None = None, max_length: int = 64) -> str:
    """Turn ``value`` into a lower-case ``[a-z0-9_-]`` slug of bounded length.

    When nothing survives the clean-up, ``fallback`` is used; when that is
    empty too, a random UUID is used. Callers that need a stable value must
    therefore pass a ``fallback``.
    """
    cleaned = re.sub(r'[^a-z0-9_\-]+', '-', value.lower()).strip('-')
    return (cleaned or fallback or str(uuid4()))[:max_length]


def normalize_team_name(raw_name: str | None) -> str:
    """Map a source team name to its canonical name.

    Unknown names are returned stripped; empty input yields ``'Unknown Team'``.
    Non-string input is coerced with ``str`` because payloads are external JSON.
    """
    value = str(raw_name or '').strip()
    return TEAM_ALIAS_MAP.get(value, value or 'Unknown Team')


def _team_name_candidates(raw_name: str | None) -> list[str]:
    """Return every known spelling of a team, sorted.

    Includes the canonical name, the raw name and all aliases that map to the
    canonical name.
    """
    canonical = normalize_team_name(raw_name)
    candidates = {canonical}
    raw = str(raw_name or '').strip()
    if raw:
        candidates.add(raw)
    candidates.update(alias for alias, target in TEAM_ALIAS_MAP.items() if target == canonical)
    return sorted(candidates)


def _try_parse_datetime(value: Any) -> datetime | None:
    """Parse an ISO-8601 timestamp into an aware UTC datetime.

    Returns ``None`` when the value is missing or not parseable. Non-string
    input is coerced with ``str`` so it cannot raise ``AttributeError``, and
    naive timestamps are assumed to be UTC so callers never mix naive and
    aware datetimes.
    """
    if value in (None, ''):
        return None
    try:
        parsed = datetime.fromisoformat(str(value).strip().replace('Z', '+00:00'))
    except ValueError:
        return None
    return _utc_aware(parsed)


def _parse_datetime(value: str | None) -> datetime:
    """Parse an ISO-8601 timestamp, falling back to the current UTC time."""
    return _try_parse_datetime(value) or datetime.now(timezone.utc)


def _parse_score(value: Any) -> int | None:
    """Convert a score value to ``int``; return ``None`` when not numeric."""
    if value in (None, ''):
        return None
    try:
        return int(float(str(value)))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float('inf')) for a malformed score string.
        return None


def _parse_match_status(event: dict[str, Any]) -> MatchStatus:
    """Derive the match status from the ``status.type`` object of an event."""
    status_type = _as_dict(_as_dict(event.get('status')).get('type'))
    state = str(status_type.get('state') or '').lower()
    completed = bool(status_type.get('completed'))
    name = str(status_type.get('name') or '').lower()
    if completed or state == 'post' or name in {'status_final', 'final'}:
        return MatchStatus.FINISHED
    if state == 'in':
        return MatchStatus.IN_PLAY
    return MatchStatus.PRE_MATCH


def _utc_aware(value: datetime) -> datetime:
    """Return ``value`` in UTC, treating naive datetimes as already UTC."""
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)


def _time_distance_seconds(left: datetime, right: datetime) -> float:
    """Return the absolute distance between two datetimes in seconds."""
    return abs((_utc_aware(left) - _utc_aware(right)).total_seconds())


def _pick_canonical_match(candidates: list[Match], incoming_match_id: str, kickoff: datetime) -> Match | None:
    """Choose which of several candidate fixtures to keep.

    Preference order (``False`` sorts before ``True``): a match whose id
    differs from the incoming id, then a match whose venue is not the
    ``unknown-stadium`` placeholder, then the kickoff closest to ``kickoff``.
    """
    if not candidates:
        return None
    return min(
        candidates,
        key=lambda match: (
            match.id == incoming_match_id,
            match.venue_id == 'unknown-stadium',
            _time_distance_seconds(match.match_time_utc, kickoff),
        ),
    )


async def _find_reconciled_match(
    session: Any,
    *,
    incoming_match_id: str,
    home_team: Team,
    away_team: Team,
    kickoff: datetime,
) -> tuple[Match | None, bool]:
    """Find an existing fixture by teams and kickoff time.

    Prevents a source-specific (e.g. ESPN) id from creating a second row for
    a match that already exists.

    Returns:
        ``(match, is_swapped)``: ``is_swapped`` is True when the stored
        fixture has home and away reversed relative to the incoming event;
        ``(None, False)`` when nothing lies within the reconcile window.
    """
    window = timedelta(hours=MATCH_RECONCILE_WINDOW_HOURS)
    window_start = kickoff - window
    window_end = kickoff + window

    async def _best_match(home_id: Any, away_id: Any) -> Match | None:
        result = await session.execute(
            select(Match).where(
                Match.home_team_id == home_id,
                Match.away_team_id == away_id,
                Match.match_time_utc >= window_start,
                Match.match_time_utc <= window_end,
            )
        )
        return _pick_canonical_match(list(result.scalars().all()), incoming_match_id, kickoff)

    exact_match = await _best_match(home_team.id, away_team.id)
    if exact_match:
        return exact_match, False
    swapped_match = await _best_match(away_team.id, home_team.id)
    if swapped_match:
        return swapped_match, True
    return None, False


async def _merge_duplicate_match(session: Any, *, source_match_id: str, target_match: Match) -> bool:
    """Fold a duplicate match row back into the original fixture.

    Re-points odds history, smart-money flows and value-bet recommendations
    from the duplicate to ``target_match`` and deletes the duplicate.

    Returns:
        True when a duplicate row existed and was merged, False otherwise.
    """
    if source_match_id == target_match.id:
        return False
    duplicate = await session.get(Match, source_match_id)
    if duplicate is None:
        return False
    for model in (OddsHistory, SmartMoneyFlow, ValueBetRecommendation):
        await session.execute(
            update(model).where(model.match_id == duplicate.id).values(match_id=target_match.id)
        )
    await session.delete(duplicate)
    logger.info('已合併重複賽事:source=%s target=%s', source_match_id, target_match.id)
    return True


def _american_to_decimal(american: str | int | float | None) -> float | None:
    """Convert American (moneyline) odds to decimal odds.

    Returns ``None`` for missing, zero or non-numeric input.
    """
    if american in (None, '', '0'):
        return None
    try:
        value = float(str(american).replace('+', ''))
    except ValueError:
        return None
    if value > 0:
        return round((value / 100.0) + 1.0, 4)
    if value < 0:
        return round((100.0 / abs(value)) + 1.0, 4)
    return None


def _decimal_price(value: Any) -> float | None:
    """Validate a decimal price and round it to four places.

    Returns ``None`` unless the value is a finite number strictly greater
    than 1.0. NaN fails both comparisons and infinity fails the upper bound,
    so neither can reach the implied-probability division.
    """
    try:
        price = float(value)
    except (TypeError, ValueError):
        return None
    if not (1.0 < price < float('inf')):
        return None
    return round(price, 4)


def _fractional_to_decimal(numerator: Any, denominator: Any) -> float | None:
    """Return ``1 + numerator / denominator`` rounded to four places.

    Returns ``None`` when either part is non-numeric or the denominator is
    not positive.
    """
    try:
        up = float(str(numerator))
        down = float(str(denominator))
    except (TypeError, ValueError):
        return None
    if down <= 0:
        return None
    return round(1.0 + (up / down), 4)


def _line_value(*values: Any) -> float | None:
    """Return the first argument that converts to ``float``, else ``None``."""
    for value in values:
        if value in (None, ''):
            continue
        try:
            return float(value)
        except (TypeError, ValueError):
            continue
    return None


def _taiwan_market_key_and_point(market_name: str | None) -> tuple[str | None, float | None]:
    """Map a Taiwan Sports Lottery market name to ``(market_key, point)``.

    Recognises the moneyline, total-goals over/under (with its line) and
    correct-score markets; anything else yields ``(None, None)``.
    """
    name = (market_name or '').strip()
    if name == '不讓分':
        return 'h2h', None
    total_match = re.search(r'\[總分\]大小\s*([0-9]+(?:\.[0-9]+)?)', name)
    if total_match:
        return 'totals', float(total_match.group(1))
    if name == '正確比數':
        return 'correct_score', None
    return None, None


def _taiwan_selection_name(choice_name: str | None, home_team: str, away_team: str) -> str | None:
    """Translate a Taiwan Sports Lottery choice label to a generic outcome name.

    Returns ``None`` for an empty label. Labels that are not recognised are
    returned unchanged.
    """
    raw = (choice_name or '').strip()
    if not raw:
        return None
    if raw == '和局':
        return 'Draw'
    if raw.startswith('大 '):
        return 'Over'
    if raw.startswith('小 '):
        return 'Under'
    if re.match(r'^\d+\s*:\s*\d+$', raw):
        # Score line such as "1 : 0" -> "1:0".
        return raw.replace(' ', '')
    normalized = normalize_team_name(raw)
    if normalized in {home_team, away_team}:
        return normalized
    return raw


def _selection_for_outcome(
    market_key: str,
    outcome_name: str,
    home_team: str,
    away_team: str,
    outcome_description: str | None = None,
) -> str | None:
    """Map a source outcome to the selection stored in ``odds_history``.

    Args:
        market_key: Source market key (``h2h``, ``totals``, ...).
        outcome_name: Outcome label from the source.
        home_team: Home team name of the event.
        away_team: Away team name of the event.
        outcome_description: Optional description; used by team-total markets
            to identify the team or the over/under side.

    Returns:
        ``home``/``away``/``draw``, ``over``/``under``, ``yes``/``no`` or
        ``<side>_<over|under>`` when recognised. ``None`` for unrecognised
        outcomes in ``h2h``/``h2h_3_way``/``draw_no_bet`` and for an empty
        name. Otherwise a slug of the name, falling back to the raw text so
        that the result is always the same for the same input.
    """
    raw = outcome_name.strip()
    description = (outcome_description or '').strip()
    lowered = raw.lower()
    lowered_description = description.lower()
    normalized = normalize_team_name(raw).lower()
    normalized_description = normalize_team_name(description).lower()
    home = normalize_team_name(home_team).lower()
    away = normalize_team_name(away_team).lower()

    if market_key in {'h2h', 'h2h_3_way'}:
        if lowered in DRAW_ALIASES:
            return 'draw'
        if normalized == home:
            return 'home'
        if normalized == away:
            return 'away'
        return None

    if market_key == 'draw_no_bet':
        if normalized == home:
            return 'home'
        if normalized == away:
            return 'away'
        return None

    if market_key in {'totals', 'alternate_totals'}:
        if lowered in OVER_ALIASES:
            return 'over'
        if lowered in UNDER_ALIASES:
            return 'under'

    if market_key == 'btts':
        if lowered in YES_ALIASES:
            return 'yes'
        if lowered in NO_ALIASES:
            return 'no'

    if market_key in {'spreads', 'alternate_spreads'}:
        if normalized == home:
            return 'home'
        if normalized == away:
            return 'away'

    if market_key in {'team_totals', 'alternate_team_totals'}:
        side = None
        if normalized_description == home or normalized == home:
            side = 'home'
        elif normalized_description == away or normalized == away:
            side = 'away'
        elif description:
            # fallback keeps the side stable for non-ASCII descriptions.
            side = _slug(description, fallback=description, max_length=20)
        total_side = None
        if lowered in OVER_ALIASES:
            total_side = 'over'
        elif lowered in UNDER_ALIASES:
            total_side = 'under'
        elif 'over' in lowered_description:
            total_side = 'over'
        elif 'under' in lowered_description:
            total_side = 'under'
        if side and total_side:
            return f'{side}_{total_side}'

    if not raw:
        # An unnamed outcome cannot be identified; the caller skips it.
        return None
    # Without the fallback, names with no ASCII letters or digits would be
    # replaced by a random UUID that differs on every run.
    return _slug(raw, fallback=raw, max_length=30)


def _merge_event_markets(base_event: dict[str, Any], detail_event: dict[str, Any]) -> None:
    """Merge bookmakers and markets of ``detail_event`` into ``base_event``.

    Bookmakers are matched by ``key`` (or ``title``). Unknown bookmakers are
    appended; for known bookmakers only markets whose key is not present yet
    are appended. ``base_event`` is modified in place. A ``bookmakers`` or
    ``markets`` value that is not a list is replaced by a list instead of
    raising.
    """
    if not isinstance(detail_event, dict):
        return

    bookmaker_map: dict[str, dict[str, Any]] = {}
    for bookmaker in _as_list(base_event.get('bookmakers')):
        if not isinstance(bookmaker, dict):
            continue
        key = str(bookmaker.get('key') or bookmaker.get('title') or '')
        if key:
            bookmaker_map[key] = bookmaker

    for detail_bookmaker in _as_list(detail_event.get('bookmakers')):
        if not isinstance(detail_bookmaker, dict):
            continue
        key = str(detail_bookmaker.get('key') or detail_bookmaker.get('title') or '')
        if not key:
            continue

        existing = bookmaker_map.get(key)
        if existing is None:
            bookmakers = base_event.get('bookmakers')
            if not isinstance(bookmakers, list):
                bookmakers = base_event['bookmakers'] = []
            bookmakers.append(detail_bookmaker)
            bookmaker_map[key] = detail_bookmaker
            continue

        existing_markets = existing.get('markets')
        if not isinstance(existing_markets, list):
            existing_markets = existing['markets'] = []
        existing_market_keys = {
            str(market.get('key') or '')
            for market in existing_markets
            if isinstance(market, dict)
        }
        for market in _as_list(detail_bookmaker.get('markets')):
            if not isinstance(market, dict):
                continue
            market_key = str(market.get('key') or '')
            if market_key not in existing_market_keys:
                existing_markets.append(market)
                existing_market_keys.add(market_key)


def _redact_secrets(text: str) -> str:
    """Mask the value of any ``apiKey=`` query parameter found in ``text``."""
    return re.sub(r'(?i)(apikey=)[^&\s\'"]+', r'\1***', text)


async def _request_json(client: httpx.AsyncClient, url: str, params: dict[str, Any] | None = None) -> Any:
    """GET ``url`` and return the decoded JSON body.

    Transport errors, invalid JSON, 5xx and 408/425/429 responses are retried
    with exponential backoff (1s, 2s, 4s, 8s). Any other 4xx response fails
    immediately because repeating the same request cannot succeed.

    Raises:
        RuntimeError: when the request finally fails. The message has API
            keys redacted, because callers log it and publish it to Redis.
    """
    delay = 1.0
    for attempt in range(1, _REQUEST_ATTEMPTS + 1):
        client_error_status: int | None = None
        try:
            response = await client.get(url, params=params, timeout=20.0)
            status = response.status_code
            if status in _RETRYABLE_STATUSES or status >= 500:
                if attempt == _REQUEST_ATTEMPTS:
                    response.raise_for_status()
                logger.warning('來源暫時限流或失敗 status=%s attempt=%s url=%s', status, attempt, url)
            elif 400 <= status < 500:
                client_error_status = status
            else:
                response.raise_for_status()
                return response.json()
        except (httpx.HTTPError, ValueError) as exc:
            # Status-error messages include the request URL with its query
            # string, i.e. the API key.
            reason = _redact_secrets(str(exc))
            if attempt == _REQUEST_ATTEMPTS:
                # "from None": a chained traceback would print the unredacted
                # original message.
                raise RuntimeError(f'抓取來源失敗:{reason}') from None
            logger.warning('來源請求失敗 attempt=%s url=%s error=%s', attempt, url, reason)
        if client_error_status is not None:
            # ``url`` is passed without the query string, so it is safe here.
            raise RuntimeError(f'抓取來源失敗:HTTP {client_error_status} url={url}')
        await asyncio.sleep(delay)
        delay *= 2
    return None


async def fetch_the_odds_api(client: httpx.AsyncClient) -> list[dict[str, Any]]:
    """Fetch events with featured markets, then merge per-event extra markets.

    Extra markets are requested concurrently for at most
    ``THE_ODDS_ADDITIONAL_EVENTS_LIMIT`` events; a failure for one event is
    logged and does not affect the others.
    """
    url = f'{_canonical_api_base()}/sports/{THE_ODDS_SPORT_KEY}/odds'
    featured_markets = _featured_markets_param()
    additional_markets = _additional_markets_param()
    params = {
        'apiKey': THE_ODDS_API_KEY,
        'regions': THE_ODDS_REGIONS,
        'markets': featured_markets,
        'oddsFormat': 'decimal',
        'dateFormat': 'iso',
    }
    logger.info(
        '抓取 The Odds API featured markets:sport=%s regions=%s markets=%s',
        THE_ODDS_SPORT_KEY,
        THE_ODDS_REGIONS,
        featured_markets,
    )
    payload = await _request_json(client, url, params)
    events = payload if isinstance(payload, list) else []
    if not additional_markets or not events:
        return events

    detail_events = events[:THE_ODDS_ADDITIONAL_EVENTS_LIMIT]
    logger.info(
        '抓取 The Odds API additional markets:events=%s markets=%s',
        len(detail_events),
        additional_markets,
    )
    detail_results = await asyncio.gather(
        *[
            fetch_the_odds_event_markets(client, str(event.get('id')), additional_markets)
            for event in detail_events
            if isinstance(event, dict) and event.get('id')
        ],
        return_exceptions=True,
    )

    by_event_id = {
        str(event.get('id')): event
        for event in events
        if isinstance(event, dict) and event.get('id')
    }
    for detail_result in detail_results:
        if isinstance(detail_result, Exception):
            logger.warning('The Odds API additional market 抓取失敗:%s', detail_result)
            continue
        if not isinstance(detail_result, dict):
            continue
        event_id = str(detail_result.get('id') or '')
        if event_id in by_event_id:
            _merge_event_markets(by_event_id[event_id], detail_result)
    return events


async def fetch_the_odds_event_markets(
    client: httpx.AsyncClient,
    event_id: str,
    markets: str,
) -> dict[str, Any] | None:
    """Fetch the given markets for one event; ``None`` if the body is not an object."""
    url = f'{_canonical_api_base()}/sports/{THE_ODDS_SPORT_KEY}/events/{event_id}/odds'
    params = {
        'apiKey': THE_ODDS_API_KEY,
        'regions': THE_ODDS_REGIONS,
        'markets': markets,
        'oddsFormat': 'decimal',
        'dateFormat': 'iso',
    }
    payload = await _request_json(client, url, params)
    return payload if isinstance(payload, dict) else None


def _espn_moneyline_bookmakers(competition: dict[str, Any], home_team: str, away_team: str) -> list[dict[str, Any]]:
    """Build the bookmaker list from the first odds entry of a competition.

    Uses the closing moneyline and falls back to the opening one. Returns an
    empty list when there is no usable price.
    """
    odds_arr = _as_list(competition.get('odds'))
    if not odds_arr:
        return []
    primary = _as_dict(odds_arr[0])
    # "or" also covers an explicit null name, which would break _slug.
    provider_name = str(_as_dict(primary.get('provider')).get('name') or 'ESPN Odds')
    moneyline = _as_dict(primary.get('moneyline'))

    outcomes: list[dict[str, Any]] = []
    for key, name in (('home', home_team), ('draw', 'Draw'), ('away', away_team)):
        raw = _as_dict(moneyline.get(key))
        price = _american_to_decimal(
            _as_dict(raw.get('close')).get('odds') or _as_dict(raw.get('open')).get('odds')
        )
        if price:
            outcomes.append({'name': name, 'price': price})
    if not outcomes:
        return []
    return [
        {
            'key': _slug(provider_name, fallback='espn'),
            'title': provider_name,
            'markets': [{'key': 'h2h', 'outcomes': outcomes}],
        }
    ]


def _parse_espn_event(event: dict[str, Any]) -> dict[str, Any]:
    """Convert one ESPN scoreboard event into the worker's event shape."""
    competitions = [item for item in _as_list(event.get('competitions')) if isinstance(item, dict)]
    competition = competitions[0] if competitions else {}

    home_team = 'Unknown Team'
    away_team = 'Unknown Team'
    home_score: int | None = None
    away_score: int | None = None
    for competitor in _as_list(competition.get('competitors')):
        if not isinstance(competitor, dict):
            continue
        team_payload = _as_dict(competitor.get('team'))
        team_name = team_payload.get('name') or team_payload.get('displayName')
        side = competitor.get('homeAway')
        if side == 'home':
            home_team = normalize_team_name(team_name)
            home_score = _parse_score(competitor.get('score'))
        elif side == 'away':
            away_team = normalize_team_name(team_name)
            away_score = _parse_score(competitor.get('score'))

    return {
        'id': event.get('id') or _slug(f'{home_team}-{away_team}-{event.get("date", "")}', max_length=64),
        'home_team': home_team,
        'away_team': away_team,
        'home_score': home_score,
        'away_score': away_score,
        'status': _parse_match_status(event),
        'commence_time': event.get('date'),
        'bookmakers': _espn_moneyline_bookmakers(competition, home_team, away_team),
    }


async def fetch_espn_scoreboard(client: httpx.AsyncClient) -> list[dict[str, Any]]:
    """Fetch scores, statuses and moneyline odds from the ESPN scoreboard.

    The date range spans the configured look-back and look-ahead days around
    today (UTC). Returns an empty list when the response is not an object.
    """
    now = datetime.now(timezone.utc)
    start = now - timedelta(days=ESPN_SCOREBOARD_LOOKBACK_DAYS)
    end = now + timedelta(days=ESPN_SCOREBOARD_LOOKAHEAD_DAYS)
    params = {'dates': f'{start:%Y%m%d}-{end:%Y%m%d}'}
    logger.info(
        '未設定 THE_ODDS_API_KEY,使用 ESPN scoreboard 作為低階備援。dates=%s',
        params['dates'],
    )
    payload = await _request_json(client, ESPN_SCOREBOARD_URL, params=params)
    if not isinstance(payload, dict):
        logger.warning('ESPN scoreboard 回傳格式不是物件,略過本輪比分同步。')
        return []
    return [
        _parse_espn_event(event)
        for event in _as_list(payload.get('events'))
        if isinstance(event, dict)
    ]


def _parse_taiwan_market(market: dict[str, Any], home_team: str, away_team: str) -> dict[str, Any] | None:
    """Convert one Taiwan Sports Lottery market; ``None`` if unsupported or empty."""
    market_key, point = _taiwan_market_key_and_point(str(market.get('name') or ''))
    if not market_key:
        return None
    outcomes: list[dict[str, Any]] = []
    for choice in _as_list(market.get('cs')):
        if not isinstance(choice, dict):
            continue
        price = _fractional_to_decimal(choice.get('pu'), choice.get('pd'))
        if price is None:
            continue
        selection_name = _taiwan_selection_name(str(choice.get('name') or ''), home_team, away_team)
        if not selection_name:
            continue
        outcome: dict[str, Any] = {'name': selection_name, 'price': price}
        if point is not None:
            outcome['point'] = point
        outcomes.append(outcome)
    if not outcomes:
        return None
    return {'key': market_key, 'outcomes': outcomes}


async def fetch_taiwan_sports_lottery_reference(client: httpx.AsyncClient) -> list[dict[str, Any]]:
    """Fetch Taiwan Sports Lottery's public World Cup markets as a reference line.

    This source currently serves only as a single reference book and for
    minimum-acceptable-odds comparison; it is not equivalent to a
    multi-bookmaker odds provider.

    The events deliberately carry no ``status`` or score keys: this feed has
    no result information, and including a status would make
    ``process_odds_data`` overwrite stored results with pre-match values.
    """
    if not TAIWAN_SPORTS_LOTTERY_ENABLED:
        return []
    payload = await _request_json(client, TAIWAN_SPORTS_LOTTERY_WC_URL)
    games = payload if isinstance(payload, list) else []

    parsed: list[dict[str, Any]] = []
    for game in games:
        if not isinstance(game, dict):
            continue
        home_team = normalize_team_name(game.get('hn'))
        away_team = normalize_team_name(game.get('an'))
        if home_team == 'Unknown Team' or away_team == 'Unknown Team':
            continue

        markets = [
            parsed_market
            for parsed_market in (
                _parse_taiwan_market(market, home_team, away_team)
                for market in _as_list(game.get('ms'))
                if isinstance(market, dict)
            )
            if parsed_market is not None
        ]
        if not markets:
            continue

        fallback_event_id = _slug(f'{home_team}-{away_team}-{game.get("kt", "")}')
        parsed.append(
            {
                'id': f'tsl-{game.get("id") or fallback_event_id}',
                'home_team': home_team,
                'away_team': away_team,
                'commence_time': game.get('kt'),
                'bookmakers': [
                    {
                        'key': 'taiwan-sports-lottery-reference',
                        'title': '台灣運彩參考盤',
                        'markets': markets,
                    }
                ],
            }
        )
    logger.info('台灣運彩世界盃參考盤抓取完成:events=%s', len(parsed))
    return parsed


async def fetch_source_payload() -> tuple[str, list[dict[str, Any]]]:
    """Fetch events from the best available source.

    Order: The Odds API when a real key is configured, then the Taiwan Sports
    Lottery reference, then the ESPN scoreboard.

    Returns:
        ``(source_name, events)``.
    """
    async with httpx.AsyncClient(
        headers={
            'User-Agent': 'Mozilla/5.0 FIFA2026QuantBot/1.0',
            'Accept': 'application/json,text/plain,*/*',
            'Referer': 'https://www.sportslottery.com.tw/',
        }
    ) as client:
        if THE_ODDS_API_KEY and THE_ODDS_API_KEY != 'your_the_odds_api_key':
            return 'the-odds-api', await fetch_the_odds_api(client)
        try:
            taiwan_reference_events = await fetch_taiwan_sports_lottery_reference(client)
        except Exception as exc:
            logger.warning('台灣運彩參考盤抓取失敗,本輪僅使用 ESPN 備援:%s', exc)
            taiwan_reference_events = []
        if taiwan_reference_events:
            return 'taiwan-sports-lottery-reference', taiwan_reference_events
        return 'espn-scoreboard', await fetch_espn_scoreboard(client)


def _odds_fields_for_bookmaker(
    bookmaker_payload: dict[str, Any],
    home_team_name: str,
    away_team_name: str,
) -> list[dict[str, Any]]:
    """Return the ``OddsHistory`` column values for one bookmaker payload.

    One dict per usable outcome; outcomes with an invalid price or without a
    recognisable selection are skipped. Match id, bookmaker id and
    ``recorded_at`` are added by the caller.
    """
    rows: list[dict[str, Any]] = []
    for market in _as_list(bookmaker_payload.get('markets')):
        if not isinstance(market, dict):
            continue
        source_market = str(market.get('key') or '').strip()
        market_type = MARKET_TYPE_MAP.get(source_market, source_market or 'unknown')
        is_total = source_market in {'totals', 'alternate_totals', 'team_totals', 'alternate_team_totals'}
        is_spread = source_market in {'spreads', 'alternate_spreads'}
        for outcome in _as_list(market.get('outcomes')):
            if not isinstance(outcome, dict):
                continue
            price = _decimal_price(outcome.get('price'))
            if price is None:
                continue
            point = _line_value(outcome.get('point'), market.get('point'))
            source_outcome_name = str(outcome.get('name') or '')[:140]
            source_outcome_description = str(outcome.get('description') or '')[:140]
            selection = _selection_for_outcome(
                source_market,
                source_outcome_name,
                home_team_name,
                away_team_name,
                source_outcome_description,
            )
            if not selection:
                continue
            rows.append(
                {
                    'market_type': market_type,
                    'selection': selection,
                    'market_line': point if is_total else None,
                    'handicap': point if is_spread else None,
                    'decimal_odds': price,
                    'implied_probability': 1.0 / price,
                    'source_market_key': source_market[:80],
                    'source_outcome_name': source_outcome_name,
                }
            )
    return rows


async def process_odds_data(data: list[dict[str, Any]]) -> dict[str, int]:
    """Upsert matches and append odds rows for the given events.

    For every event: resolve the teams, reconcile the event with an existing
    fixture (merging a duplicate row if one exists), create or update the
    match, then add one ``OddsHistory`` row per usable outcome. Everything is
    committed in a single transaction.

    Returns:
        Counters: ``events``, ``odds_rows``, ``bookmakers``,
        ``reconciled_matches`` and ``deduped_matches``.

    Raises:
        SQLAlchemyError: re-raised after the transaction is rolled back.
    """
    if not data:
        logger.info('本輪來源沒有可處理賽事。')
        return {
            'events': 0,
            'odds_rows': 0,
            'bookmakers': 0,
            'reconciled_matches': 0,
            'deduped_matches': 0,
        }

    event_count = 0
    odds_count = 0
    reconciled_count = 0
    deduped_count = 0
    bookmaker_ids: set[str] = set()
    # Resolved on first use and reused for every event of this run.
    placeholder_venue: Venue | None = None

    async with SessionFactory() as session:
        try:
            for event in data:
                if not isinstance(event, dict):
                    continue
                match_id = str(event.get('id') or '').strip()
                if not match_id:
                    continue

                home_team_name = normalize_team_name(event.get('home_team'))
                away_team_name = normalize_team_name(event.get('away_team'))
                # None means the source gave no usable kickoff; "now" is then
                # used only for reconciliation and for brand-new matches.
                parsed_kickoff = _try_parse_datetime(event.get('commence_time'))
                kickoff = parsed_kickoff or datetime.now(timezone.utc)

                home_team = await get_or_create_team(session, home_team_name)
                away_team = await get_or_create_team(session, away_team_name)
                if placeholder_venue is None:
                    placeholder_venue = await session.get(Venue, 'unknown-stadium')
                    if placeholder_venue is None:
                        placeholder_venue = await get_or_create_venue(session, '待確認場館', '待確認', '待確認')
                venue = placeholder_venue

                reconciled_match, is_swapped_fixture = await _find_reconciled_match(
                    session,
                    incoming_match_id=match_id,
                    home_team=home_team,
                    away_team=away_team,
                    kickoff=kickoff,
                )
                if reconciled_match and reconciled_match.id != match_id:
                    reconciled_count += 1
                    if await _merge_duplicate_match(session, source_match_id=match_id, target_match=reconciled_match):
                        deduped_count += 1

                match = reconciled_match or await session.get(Match, match_id)
                event_status = event.get('status')
                has_result_payload = (
                    isinstance(event_status, MatchStatus)
                    or 'home_score' in event
                    or 'away_score' in event
                )
                parsed_status = event_status if isinstance(event_status, MatchStatus) else MatchStatus.PRE_MATCH
                home_score = _parse_score(event.get('home_score'))
                away_score = _parse_score(event.get('away_score'))

                if not match:
                    match = Match(
                        id=match_id,
                        home_team_id=home_team.id,
                        away_team_id=away_team.id,
                        venue_id=venue.id,
                        match_time_utc=kickoff,
                        status=parsed_status,
                        home_score=home_score,
                        away_score=away_score,
                        result_synced_at=datetime.now(timezone.utc),
                    )
                    session.add(match)
                else:
                    if not is_swapped_fixture:
                        match.home_team_id = home_team.id
                        match.away_team_id = away_team.id
                    # This worker only ever knows a placeholder venue, so it
                    # must never replace a real venue on an existing match.
                    if match.venue_id == 'unknown-stadium':
                        match.venue_id = venue.id
                    # Keep the stored kickoff when the source supplied none.
                    if parsed_kickoff is not None:
                        match.match_time_utc = parsed_kickoff
                    if has_result_payload:
                        match.status = parsed_status
                        match.home_score = away_score if is_swapped_fixture else home_score
                        match.away_score = home_score if is_swapped_fixture else away_score
                        match.result_synced_at = datetime.now(timezone.utc)

                event_count += 1
                recorded_at = datetime.now(timezone.utc)

                for bookmaker_payload in _as_list(event.get('bookmakers')):
                    if not isinstance(bookmaker_payload, dict):
                        continue
                    bookmaker_key = _slug(
                        str(bookmaker_payload.get('key') or bookmaker_payload.get('title') or 'unknown'),
                        # Stable id even when the key has no ASCII characters.
                        fallback='unknown',
                    )
                    bookmaker_name = str(bookmaker_payload.get('title') or bookmaker_key)
                    bookmaker = await get_or_create_bookmaker(session, bookmaker_key, bookmaker_name)
                    bookmaker_ids.add(bookmaker.id)

                    for fields in _odds_fields_for_bookmaker(bookmaker_payload, home_team_name, away_team_name):
                        session.add(
                            OddsHistory(
                                match_id=match.id,
                                bookmaker_id=bookmaker.id,
                                recorded_at=recorded_at,
                                **fields,
                            )
                        )
                        odds_count += 1

            await session.commit()
            logger.info(
                '本輪賠率寫入完成:events=%s odds_rows=%s bookmakers=%s reconciled=%s deduped=%s',
                event_count,
                odds_count,
                len(bookmaker_ids),
                reconciled_count,
                deduped_count,
            )
            return {
                'events': event_count,
                'odds_rows': odds_count,
                'bookmakers': len(bookmaker_ids),
                'reconciled_matches': reconciled_count,
                'deduped_matches': deduped_count,
            }
        except SQLAlchemyError as exc:
            await session.rollback()
            logger.exception('資料庫寫入賠率失敗:%s', exc)
            raise


async def get_or_create_team(session: Any, name: str) -> Team:
    """Return the team stored under any known spelling of ``name``.

    Prefers a row whose name normalises to the canonical name, then any row
    matching a candidate spelling; creates and flushes a new team otherwise.
    """
    canonical_name = normalize_team_name(name)
    result = await session.execute(select(Team).where(Team.name.in_(_team_name_candidates(name))))
    teams = list(result.scalars().all())
    for team in teams:
        if normalize_team_name(team.name) == canonical_name:
            return team
    if teams:
        return teams[0]
    team = Team(id=str(uuid4()), name=canonical_name)
    session.add(team)
    await session.flush()
    return team


async def get_or_create_venue(session: Any, name: str, city: str, country: str) -> Venue:
    """Return the venue with this name, creating it (timezone ``UTC``) if absent."""
    result = await session.execute(select(Venue).where(Venue.name == name))
    venue = result.scalars().first()
    if not venue:
        venue = Venue(id=str(uuid4()), name=name, city=city, country=country, timezone='UTC')
        session.add(venue)
        await session.flush()
    return venue


async def get_or_create_bookmaker(session: Any, bookmaker_id: str, name: str) -> Bookmaker:
    """Return the bookmaker with this id, creating it if absent.

    The stored name is truncated to 120 characters.
    """
    bookmaker = await session.get(Bookmaker, bookmaker_id)
    if not bookmaker:
        bookmaker = Bookmaker(id=bookmaker_id, name=name[:120])
        session.add(bookmaker)
        await session.flush()
    return bookmaker


async def publish_status(status: dict[str, Any]) -> None:
    """Write the ingestion status to Redis under ``ingestion:odds:last_run``.

    The key expires after three poll intervals (at least 900 seconds).
    Failures are logged and swallowed: status reporting must never block
    database ingestion.
    """
    try:
        redis = Redis.from_url(REDIS_URL, decode_responses=True)
        try:
            await redis.set(
                'ingestion:odds:last_run',
                json.dumps(status, ensure_ascii=False),
                ex=max(ODDS_POLL_INTERVAL_SECONDS * 3, 900),
            )
        finally:
            # Close the connection even when SET fails.
            await redis.aclose()
    except Exception as exc:  # pragma: no cover - Redis status must not block DB ingestion
        logger.warning('寫入 Redis ingestion 狀態失敗:%s', exc)


async def run_once() -> dict[str, Any]:
    """Run one ingestion cycle and publish its status.

    When the odds source is not the ESPN scoreboard, the scoreboard is
    processed afterwards to sync scores; a failure there is recorded in the
    status (``score_status='error'``) without failing the cycle.
    """
    source, payload = await fetch_source_payload()
    stats = await process_odds_data(payload)
    if source != 'espn-scoreboard':
        try:
            async with httpx.AsyncClient() as client:
                espn_payload = await fetch_espn_scoreboard(client)
            score_stats = await process_odds_data(espn_payload)
            summed_keys = ('events', 'odds_rows', 'bookmakers', 'reconciled_matches', 'deduped_matches')
            stats = {
                **{key: stats.get(key, 0) + score_stats.get(key, 0) for key in summed_keys},
                'score_events': score_stats.get('events', 0),
                'score_status': 'ok',
            }
        except Exception as exc:
            logger.warning('比分來源同步失敗,但不阻斷賠率 ingestion:%s', exc)
            stats = {
                **stats,
                'score_events': 0,
                'score_status': 'error',
                'score_message': str(exc),
            }
    status = {
        'status': 'ok',
        'source': source,
        'run_at': datetime.now(timezone.utc).isoformat(),
        **stats,
    }
    await publish_status(status)
    return status


async def run_forever() -> None:
    """Run ingestion cycles forever, sleeping the poll interval in between.

    A failing cycle publishes an error status and is logged; the loop goes on.
    """
    logger.info('啟動賠率 ingestion worker,interval=%ss', ODDS_POLL_INTERVAL_SECONDS)
    while True:
        try:
            status = await run_once()
            logger.info('ingestion 狀態:%s', status)
        except Exception as exc:
            error_status = {
                'status': 'error',
                'source': 'unknown',
                'run_at': datetime.now(timezone.utc).isoformat(),
                'message': str(exc),
            }
            await publish_status(error_status)
            logger.exception('本輪賠率 ingestion 失敗:%s', exc)
        await asyncio.sleep(ODDS_POLL_INTERVAL_SECONDS)


if __name__ == '__main__':
    if os.environ.get('ODDS_WORKER_ONCE') == 'true':
        print(asyncio.run(run_once()))
    else:
        asyncio.run(run_forever())