"""個人投注弱點分析(Betting Leaks)引擎。 將使用者歷史注單做群組化彙總,找出長期導致虧損的下注模式。 """ from __future__ import annotations from dataclasses import dataclass from typing import Any def _safe_float(value: Any, default: float | None = None) -> float | None: try: return float(value) except (TypeError, ValueError): return default def _safe_int(value: Any, default: int | None = None) -> int | None: try: return int(value) except (TypeError, ValueError): return default def _to_bool(value: Any, default: bool = False) -> bool: if isinstance(value, bool): return value if isinstance(value, str): return value.strip().lower() in {'1', 'true', 't', 'yes', 'y'} if isinstance(value, (int, float)): return value not in {0} return default def _odds_bucket(odds: float | None, step: float = 0.5) -> str: if odds is None or odds <= 0: return 'N/A' if odds <= 1: return '1.00-1.50' bucket_start = ((odds - 1) // step) * step + 1 bucket_end = bucket_start + step return f'{bucket_start:.2f}-{bucket_end:.2f}' def _calculate_pnl(stake: float, is_win: bool, closing_odds: float | None, recommended_odds: float | None) -> float: """依下注結果與收盤賠率計算實際 P/L。""" effective_odds = closing_odds if effective_odds is None or effective_odds <= 1: effective_odds = recommended_odds if effective_odds is None or effective_odds <= 1 or stake <= 0: return 0.0 if is_win: return stake * (effective_odds - 1) return -stake def _calculate_clv(recommended_odds: float | None, closing_odds: float | None) -> float | None: if recommended_odds is None or closing_odds is None: return None if recommended_odds <= 0 or closing_odds <= 0: return None return (recommended_odds / closing_odds - 1) * 100 @dataclass(frozen=True) class LeakageCluster: market_type: str bet_type: str odds_bucket: str match_stage: str bet_count: int total_stake: float closed_count: int win_count: int total_pnl: float avg_clv_percent: float roi_percent: float hit_rate_percent: float status: str def as_dict(self) -> dict[str, Any]: return { 'market_type': self.market_type, 'bet_type': self.bet_type, 'odds_bucket': self.odds_bucket, 'match_stage': self.match_stage, 'bet_count': self.bet_count, 'total_stake': self.total_stake, 'closed_count': self.closed_count, 'win_count': self.win_count, 'total_pnl': self.total_pnl, 'avg_clv_percent': self.avg_clv_percent, 'roi_percent': self.roi_percent, 'hit_rate_percent': self.hit_rate_percent, 'status': self.status, } @dataclass(frozen=True) class HardTruth: title: str message: str cluster: dict[str, Any] def analyze_user_leaks(user_bets: list[dict[str, Any]]) -> dict[str, Any]: """分析使用者注單中的高頻虧損模式,回傳風險群組與漏點警告。""" raw_bets = user_bets if isinstance(user_bets, list) else [] grouped: dict[tuple[str, str, str, str], dict[str, Any]] = {} for raw in raw_bets: if not isinstance(raw, dict): continue market_type = str(raw.get('market_type', 'unknown')).strip() or 'unknown' is_single = raw.get('parlay_type') in (None, 'single', '', 'single_bet') bet_type = 'single' if is_single else 'parlay' odds = _safe_float(raw.get('odds')) stake = _safe_float(raw.get('stake')) if stake is None or stake <= 0: continue match_stage = str(raw.get('match_stage', raw.get('stage', 'unknown'))).strip() or 'unknown' odds_band = _odds_bucket(odds) key = (market_type, bet_type, odds_band, match_stage) entry = grouped.setdefault( key, { 'bet_count': 0, 'total_stake': 0.0, 'closed_count': 0, 'win_count': 0, 'total_pnl': 0.0, 'clv_values': [] as list[float], }, ) entry['bet_count'] += 1 entry['total_stake'] += stake is_settled = _to_bool(raw.get('is_settled'), default=False) if not is_settled: continue is_win = _to_bool(raw.get('is_win')) if is_win: entry['win_count'] += 1 entry['closed_count'] += 1 closing_odds = _safe_float(raw.get('closing_odds')) recommended_odds = odds or _safe_float(raw.get('recommended_odds')) pnl = _calculate_pnl( stake=stake, is_win=is_win, closing_odds=closing_odds, recommended_odds=recommended_odds, ) entry['total_pnl'] += pnl clv = _calculate_clv(recommended_odds, closing_odds) if clv is not None: entry['clv_values'].append(clv) total_bets = sum(v['bet_count'] for v in grouped.values()) settled_bets = sum(v['closed_count'] for v in grouped.values()) total_stake = sum(v['total_stake'] for v in grouped.values()) total_pnl = sum(v['total_pnl'] for v in grouped.values()) total_win = sum(v['win_count'] for v in grouped.values()) clusters: list[LeakageCluster] = [] hard_truths: list[HardTruth] = [] for (market_type, bet_type, odds_bucket, match_stage), row in grouped.items(): bet_count = int(row['bet_count']) closed_count = int(row['closed_count']) total_stake_group = float(row['total_stake']) total_pnl_group = float(row['total_pnl']) roi = (total_pnl_group / total_stake_group * 100) if total_stake_group > 0 else 0.0 win_rate = (row['win_count'] / closed_count * 100) if closed_count > 0 else 0.0 avg_clv = (sum(row['clv_values']) / len(row['clv_values'])) if row['clv_values'] else 0.0 status = 'OK' if bet_count > 20 and roi < -10: status = 'CRITICAL_LEAK' hard_truths.append( HardTruth( title='嚴重漏財點', message=( f'{match_stage} / {bet_type} / {market_type} / {odds_bucket} 的下注次數 {bet_count} 場,' f'ROI {roi:.2f}%,請先降低此區塊投注比例。' ), cluster={ 'market_type': market_type, 'bet_type': bet_type, 'odds_bucket': odds_bucket, 'match_stage': match_stage, }, ).__dict__, ) clusters.append( LeakageCluster( market_type=market_type, bet_type=bet_type, odds_bucket=odds_bucket, match_stage=match_stage, bet_count=bet_count, total_stake=round(total_stake_group, 2), closed_count=closed_count, win_count=row['win_count'], total_pnl=round(total_pnl_group, 2), avg_clv_percent=round(avg_clv, 4), roi_percent=round(roi, 4), hit_rate_percent=round(win_rate, 2), status=status, ), ) clusters.sort(key=lambda c: c.roi_percent) overall_roi = (total_pnl / total_stake * 100) if total_stake > 0 else 0.0 overall_hit_rate = (total_win / settled_bets * 100) if settled_bets > 0 else 0.0 return { 'total_bet_count': total_bets, 'settled_bet_count': settled_bets, 'total_stake': round(total_stake, 2), 'total_pnl': round(total_pnl, 2), 'overall_roi_percent': round(overall_roi, 4), 'overall_hit_rate_percent': round(overall_hit_rate, 2), 'clusters': [c.as_dict() for c in clusters], 'hard_truths': [h.__dict__ for h in hard_truths], }