#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/token_report_service.py

Daily LLM token report service (Operation Ollama-First v5.0 — Phase 1 wrap-up).

Based on:
  - migrations/024_create_ai_calls_table.sql (ai_calls schema + CHECK constraints)
  - migrations/025_create_mcp_calls_and_budgets.sql (ai_call_budgets seed data)
  - services/ai_call_logger.py (COST_TABLE / provider whitelist)
  - services/telegram_templates.py (HTML escape and send wrapper)
  - docs/phase0_audit_report_20260503.md (inventory of 34 LLM call sites)
  - docs/phase1_db_design_20260503.md (query latency estimates)

Design rules (constitution level):
  1. Fail-safe: a DB query failure produces a short "report generation failed"
     message instead of raising, so other scheduled jobs are unaffected.
  2. PII protection: the report never contains prompt text; the ai_insights
     metadata stores statistics only (no username).
  3. Do not pollute the existing Telegram flow: reuse the send function of
     telegram_templates, never open a separate connection.
  4. Automatic truncation: a safety fuse keeps each message below Telegram's
     4096-character limit.

Public API:
  - generate_daily_report(target_date) → str (HTML)
  - send_daily_report() → dict (sent/failed/errors)
"""

from __future__ import annotations

import logging
from datetime import date, datetime, timedelta, timezone
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

# Asia/Taipei (UTC+8) handled as a fixed offset (avoids container tzdata
# differences; follows the telegram_templates convention).
_TAIPEI_TZ = timezone(timedelta(hours=8))

# Telegram per-message character ceiling (96 characters are kept in reserve so
# the message never sits exactly on the 4096 limit).
_TELEGRAM_MAX_CHARS = 4000

# Provider display table (aligned with the ai_calls.provider whitelist; the
# insertion order is the order used in the report).
_PROVIDER_DISPLAY: Dict[str, Tuple[str, str]] = {
    'gcp_ollama': ('🟢', 'GCP Ollama'),
    'ollama_secondary': ('🟢', 'Secondary'),  # critic-A11 B4 patch: three-host architecture consistency
    'ollama_111': ('🟠', '111 Ollama'),
    'gemini': ('🔴', 'Gemini'),
    'claude': ('🟣', 'Claude'),
    'nim': ('🟡', 'NIM'),
    'openrouter': ('🟤', 'OpenRouter'),
    'nim_via_elephant': ('🟫', 'NIM_via_Eleph'),
}

# Ollama share threshold (used by Section 1 for the "Ollama-First achieved"
# check; campaign KPI is >= 60%).
_OLLAMA_FIRST_TARGET_PCT = 60.0

# Alert rule parameters (used when Section 6 is generated).
# NOTE(review): 'caller_stable_days' and 'ollama_stable_pct' are not read by
# any code in this module — confirm whether that rule is still planned.
_ALERT_RULES = {
    'caller_spike_factor': 1.4,      # tokens > 7-day average × 1.4
    'gemini_share_threshold': 35.0,  # gemini share > 35% means Ollama-First is lost
    'error_rate_critical': 5.0,      # error_rate > 5% → P1
    'budget_warning': 80.0,          # spent / budget > 80% → P1
    'gcp_hit_warning': 90.0,         # gcp_ollama share < 90% (within Ollama) → P2
    'cache_hit_low': 40.0,           # claude cache hit < 40% → INFO
    'caller_stable_days': 7,         # N consecutive days of Ollama > 95% → INFO "fallback can be disabled"
    'ollama_stable_pct': 95.0,
}


# ═══════════════════════════════════════════════════════════════════════════════
# Public API
# ═══════════════════════════════════════════════════════════════════════════════

def generate_daily_report(target_date: Optional[date] = None) -> str:
    """Build the LLM token daily report for one day (HTML for Telegram parse_mode='HTML').

    Args:
        target_date: Day to report on (Asia/Taipei). Defaults to "today".

    Returns:
        The full HTML report string. If any query raises, a short failure
        message is returned instead (it can still be sent to Telegram).
    """
    if target_date is None:
        target_date = datetime.now(_TAIPEI_TZ).date()

    try:
        summary = _query_summary(target_date)
        by_provider = _query_by_provider(target_date)
        top_callers = _query_top_callers(target_date, limit=10)
        costs = _query_cost_breakdown(target_date)
        trends = _query_trends_vs_7day(target_date)
        budgets = _query_budget_usage(target_date)
        cache_stats = _query_cache_hit_stats(target_date)
    except Exception as exc:
        # Fail-safe boundary: report the failure instead of propagating it.
        logger.exception("[TokenReport] DB query failed: %s", exc)
        return _format_failure_report(target_date, str(exc))

    alerts = _detect_alerts(summary, by_provider, top_callers, trends, budgets, cache_stats)
    insights = _generate_insights(target_date, summary, by_provider)

    return _format_report(
        target_date=target_date,
        summary=summary,
        by_provider=by_provider,
        top_callers=top_callers,
        costs=costs,
        trends=trends,
        budgets=budgets,
        cache_stats=cache_stats,
        alerts=alerts,
        insights=insights,
    )


def _truncate_for_telegram(report_html: str, max_chars: int = _TELEGRAM_MAX_CHARS) -> str:
    """Clamp *report_html* to a Telegram-safe length and append a truncation notice.

    Text that already fits is returned unchanged. Otherwise the text is cut
    80 characters below *max_chars* (room for the notice). If the cut lands in
    the middle of an HTML entity produced by ``_esc`` (e.g. ``&am``), the
    dangling fragment is removed so the message stays valid HTML.
    """
    if len(report_html) <= max_chars:
        return report_html

    truncated = report_html[: max_chars - 80]
    amp = truncated.rfind("&")
    # The longest entity _esc emits is "&amp;" (5 chars), so an unterminated
    # tail shorter than that is a half-cut entity.
    if amp != -1 and ";" not in truncated[amp:] and len(truncated) - amp < 5:
        truncated = truncated[:amp]
    return truncated + "\n\n... (訊息超長,已截斷;詳見 ai_insights)"


def send_daily_report(target_date: Optional[date] = None) -> Dict[str, Any]:
    """Generate the report, send it to Telegram, and persist it to ai_insights.

    Args:
        target_date: Day to report on (Asia/Taipei). Defaults to "today".

    Returns:
        {'ok': bool, 'sent': int, 'failed': int, 'chars': int, 'errors': list}
        Telegram and persistence failures are logged and never raised.
    """
    if target_date is None:
        target_date = datetime.now(_TAIPEI_TZ).date()

    try:
        report_html = generate_daily_report(target_date)
    except Exception as exc:
        logger.exception("[TokenReport] generate_daily_report failed: %s", exc)
        report_html = _format_failure_report(target_date, str(exc))

    # Clamp to the Telegram-safe length (entity-aware, see helper).
    report_html = _truncate_for_telegram(report_html)

    # Send through the existing wrapper (no separate connection).
    result: Dict[str, Any] = {
        'ok': False,
        'sent': 0,
        'failed': 0,
        'chars': len(report_html),
        'errors': [],
    }
    try:
        from services.telegram_templates import send_telegram_with_result
        send_result = send_telegram_with_result(report_html, parse_mode='HTML')
        result.update({
            'ok': bool(send_result.get('ok')),
            'sent': int(send_result.get('sent', 0)),
            'failed': int(send_result.get('failed', 0)),
            'errors': list(send_result.get('errors', [])),
        })
    except Exception as exc:
        logger.exception("[TokenReport] telegram send failed: %s", exc)
        # Only the exception type is recorded, not its message.
        result['errors'].append(f"telegram:{type(exc).__name__}")

    # Persist to ai_insights (no PII / no username); best-effort.
    try:
        _persist_to_ai_insights(target_date, report_html, result)
    except Exception as exc:
        logger.warning("[TokenReport] ai_insights persist failed: %s", exc)

    return result


# ═══════════════════════════════════════════════════════════════════════════════
# Internal: SQL queries
# ═══════════════════════════════════════════════════════════════════════════════

def _date_window(target_date: date) -> Tuple[datetime, datetime]:
    """Return the half-open window [day_start, day_end) as Taipei tz-aware datetimes."""
    day_start = datetime.combine(target_date, datetime.min.time(), tzinfo=_TAIPEI_TZ)
    day_end = day_start + timedelta(days=1)
    return day_start, day_end


def _exec_query(sql: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Run *sql* with bound *params* and return the rows as a list of dicts.

    A session is obtained from ``get_session()`` for each call and always
    closed; exceptions propagate to the caller.
    """
    from sqlalchemy import text
    from database.manager import get_session

    session = get_session()
    try:
        rows = session.execute(text(sql), params).mappings().all()
        return [dict(r) for r in rows]
    finally:
        session.close()


def _query_summary(target_date: date) -> Dict[str, Any]:
    """Section 1 — daily overview (single aggregated row).

    Returns:
        Dict with total_tokens, total_calls, total_cost_usd, avg_duration_ms,
        success_rate, failed_calls, ollama_pct, prev_total_tokens (previous
        day, the comparison baseline) and wow_pct (percentage change against
        the previous day; 0.0 when the previous day has no tokens).
    """
    day_start, day_end = _date_window(target_date)
    prev_start = day_start - timedelta(days=1)

    rows = _exec_query("""
        SELECT
            COALESCE(SUM(input_tokens + output_tokens), 0) AS total_tokens,
            COUNT(*) AS total_calls,
            COALESCE(SUM(cost_usd), 0) AS total_cost_usd,
            COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
            COALESCE(SUM(CASE WHEN status = 'ok' THEN 1 ELSE 0 END), 0) AS ok_calls,
            COALESCE(SUM(
                CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                     THEN input_tokens + output_tokens ELSE 0 END
            ), 0) AS ollama_tokens
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
    """, {'start': day_start, 'end': day_end})

    prev_rows = _exec_query("""
        SELECT COALESCE(SUM(input_tokens + output_tokens), 0) AS prev_total_tokens
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
    """, {'start': prev_start, 'end': day_start})

    r = rows[0] if rows else {}
    total_calls = int(r.get('total_calls') or 0)
    total_tokens = int(r.get('total_tokens') or 0)
    ok_calls = int(r.get('ok_calls') or 0)
    ollama_tokens = int(r.get('ollama_tokens') or 0)
    prev_total = int((prev_rows[0] if prev_rows else {}).get('prev_total_tokens') or 0)

    return {
        'total_tokens': total_tokens,
        'total_calls': total_calls,
        'total_cost_usd': float(r.get('total_cost_usd') or 0),
        'avg_duration_ms': float(r.get('avg_duration_ms') or 0),
        'success_rate': (ok_calls / total_calls * 100.0) if total_calls else 0.0,
        'failed_calls': max(0, total_calls - ok_calls),
        'ollama_pct': (ollama_tokens / total_tokens * 100.0) if total_tokens else 0.0,
        'prev_total_tokens': prev_total,
        'wow_pct': ((total_tokens - prev_total) / prev_total * 100.0) if prev_total else 0.0,
    }


def _query_by_provider(target_date: date) -> List[Dict[str, Any]]:
    """Section 2 — provider distribution.

    Returns one entry per provider in ``_PROVIDER_DISPLAY`` (in that order),
    including providers with zero calls. Providers present in the table but
    missing from ``_PROVIDER_DISPLAY`` get no entry of their own, yet their
    tokens still count towards the denominator of ``pct``.
    """
    day_start, day_end = _date_window(target_date)
    rows = _exec_query("""
        SELECT
            provider,
            SUM(input_tokens + output_tokens)::BIGINT AS tokens,
            COUNT(*) AS calls,
            COALESCE(SUM(cost_usd), 0) AS cost_usd,
            COALESCE(AVG(duration_ms), 0) AS avg_duration_ms
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
        GROUP BY provider
    """, {'start': day_start, 'end': day_end})

    by_p = {r['provider']: r for r in rows}
    total_tokens = sum(int(r['tokens'] or 0) for r in rows)

    result: List[Dict[str, Any]] = []
    for p_key in _PROVIDER_DISPLAY:
        r = by_p.get(p_key, {})
        tokens = int(r.get('tokens') or 0)
        result.append({
            'provider': p_key,
            'tokens': tokens,
            'pct': (tokens / total_tokens * 100.0) if total_tokens else 0.0,
            'calls': int(r.get('calls') or 0),
            'cost_usd': float(r.get('cost_usd') or 0),
            'avg_duration_ms': float(r.get('avg_duration_ms') or 0),
        })
    return result


def _query_top_callers(target_date: date, limit: int = 10) -> List[Dict[str, Any]]:
    """Section 3 — top N (caller, provider) pairs by tokens, with deviation from the 7-day average.

    The baseline is per caller (all providers combined): the token sum of the
    seven days before *target_date* divided by 7. ``delta_pct`` is ``None``
    when the caller has no baseline.
    """
    day_start, day_end = _date_window(target_date)
    week_start = day_start - timedelta(days=7)

    rows = _exec_query("""
        WITH today AS (
            SELECT
                caller,
                provider,
                MODE() WITHIN GROUP (ORDER BY model) AS top_model,
                SUM(input_tokens + output_tokens)::BIGINT AS tokens,
                COUNT(*) AS calls
            FROM ai_calls
            WHERE called_at >= :day_start AND called_at < :day_end
            GROUP BY caller, provider
        ),
        baseline AS (
            SELECT
                caller,
                SUM(input_tokens + output_tokens) / 7.0 AS avg_tokens_7d
            FROM ai_calls
            WHERE called_at >= :week_start AND called_at < :day_start
            GROUP BY caller
        )
        SELECT
            t.caller, t.provider, t.top_model, t.tokens, t.calls,
            COALESCE(b.avg_tokens_7d, 0) AS avg_tokens_7d
        FROM today t
        LEFT JOIN baseline b ON b.caller = t.caller
        ORDER BY t.tokens DESC
        LIMIT :limit
    """, {
        'day_start': day_start,
        'day_end': day_end,
        'week_start': week_start,
        'limit': int(limit),
    })

    result: List[Dict[str, Any]] = []
    for r in rows:
        tokens = int(r.get('tokens') or 0)
        baseline = float(r.get('avg_tokens_7d') or 0)
        delta_pct = ((tokens - baseline) / baseline * 100.0) if baseline > 0 else None
        result.append({
            'caller': str(r.get('caller') or ''),
            'provider': str(r.get('provider') or ''),
            'model': str(r.get('top_model') or ''),
            'tokens': tokens,
            'calls': int(r.get('calls') or 0),
            'delta_pct': delta_pct,
        })
    return result


def _query_cost_breakdown(target_date: date) -> List[Dict[str, Any]]:
    """Section 4 — cost per (provider, model), largest first; zero-cost rows are excluded (max 12 rows)."""
    day_start, day_end = _date_window(target_date)
    rows = _exec_query("""
        SELECT
            provider,
            model,
            COALESCE(SUM(cost_usd), 0) AS cost_usd,
            COUNT(*) AS calls
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
          AND cost_usd > 0
        GROUP BY provider, model
        ORDER BY cost_usd DESC
        LIMIT 12
    """, {'start': day_start, 'end': day_end})

    return [
        {
            'provider': str(r['provider']),
            'model': str(r['model']),
            'cost_usd': float(r['cost_usd']),
            'calls': int(r['calls']),
        }
        for r in rows
    ]


def _query_trends_vs_7day(target_date: date) -> Dict[str, Any]:
    """Section 5 — compare the target day against the average of the previous 7 days.

    Returns a dict of ``today_*`` values and ``7d_*`` baselines (token totals
    per provider group, average duration, error rate, GCP hit rate, plus the
    7-day token and cost totals).
    """
    day_start, day_end = _date_window(target_date)
    week_start = day_start - timedelta(days=7)

    today_rows = _exec_query("""
        SELECT
            COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT AS total_tokens,
            COALESCE(SUM(CASE WHEN provider='gemini'
                              THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS gemini_tokens,
            COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                              THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS ollama_tokens,
            COALESCE(SUM(CASE WHEN provider='claude'
                              THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS claude_tokens,
            COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
            COALESCE(SUM(CASE WHEN status<>'ok' THEN 1 ELSE 0 END), 0) AS failed,
            COUNT(*) AS total_calls,
            COALESCE(SUM(CASE WHEN provider='gcp_ollama' THEN 1 ELSE 0 END), 0) AS gcp_calls,
            COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                              THEN 1 ELSE 0 END), 0) AS ollama_calls
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
    """, {'start': day_start, 'end': day_end})

    base_rows = _exec_query("""
        SELECT
            COALESCE(SUM(input_tokens + output_tokens) / 7.0, 0) AS avg_total_tokens,
            COALESCE(SUM(CASE WHEN provider='gemini'
                              THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_gemini_tokens,
            COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                              THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_ollama_tokens,
            COALESCE(SUM(CASE WHEN provider='claude'
                              THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_claude_tokens,
            COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
            CASE WHEN COUNT(*) > 0
                 THEN SUM(CASE WHEN status<>'ok' THEN 1 ELSE 0 END)::FLOAT / COUNT(*) * 100.0
                 ELSE 0 END AS error_rate_pct,
            COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT AS total_7d_tokens,
            COALESCE(SUM(cost_usd), 0) AS total_7d_cost,
            CASE WHEN SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                               THEN 1 ELSE 0 END) > 0
                 THEN SUM(CASE WHEN provider='gcp_ollama' THEN 1 ELSE 0 END)::FLOAT
                      / SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                                 THEN 1 ELSE 0 END)::FLOAT * 100.0
                 ELSE 0 END AS gcp_hit_pct_7d
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
    """, {'start': week_start, 'end': day_start})

    t = today_rows[0] if today_rows else {}
    b = base_rows[0] if base_rows else {}

    today_total = int(t.get('total_tokens') or 0)
    today_gemini = int(t.get('gemini_tokens') or 0)
    today_ollama = int(t.get('ollama_tokens') or 0)
    today_claude = int(t.get('claude_tokens') or 0)
    today_calls = int(t.get('total_calls') or 0)
    today_failed = int(t.get('failed') or 0)
    today_gcp_calls = int(t.get('gcp_calls') or 0)
    today_ollama_cal = int(t.get('ollama_calls') or 0)

    today_error_pct = (today_failed / today_calls * 100.0) if today_calls else 0.0
    # GCP hit rate = share of Ollama calls that were served by gcp_ollama.
    today_gcp_hit = (today_gcp_calls / today_ollama_cal * 100.0) if today_ollama_cal else 0.0

    return {
        'today_total_tokens': today_total,
        'today_gemini_tokens': today_gemini,
        'today_ollama_tokens': today_ollama,
        'today_claude_tokens': today_claude,
        'today_avg_duration': float(t.get('avg_duration_ms') or 0),
        'today_error_rate': today_error_pct,
        'today_gcp_hit_pct': today_gcp_hit,
        '7d_avg_total': float(b.get('avg_total_tokens') or 0),
        '7d_avg_gemini': float(b.get('avg_gemini_tokens') or 0),
        '7d_avg_ollama': float(b.get('avg_ollama_tokens') or 0),
        '7d_avg_claude': float(b.get('avg_claude_tokens') or 0),
        '7d_avg_duration': float(b.get('avg_duration_ms') or 0),
        '7d_error_rate': float(b.get('error_rate_pct') or 0),
        '7d_total_tokens': int(b.get('total_7d_tokens') or 0),
        '7d_total_cost': float(b.get('total_7d_cost') or 0),
        '7d_gcp_hit_pct': float(b.get('gcp_hit_pct_7d') or 0),
    }


def _query_budget_usage(target_date: date) -> Dict[str, Any]:
    """Section 4 — spend versus budget (daily / rolling 7-day / month-to-date, all providers).

    Budgets come from the ``ai_call_budgets`` rows whose provider IS NULL; a
    missing period yields a budget of 0.0.
    """
    day_start, day_end = _date_window(target_date)
    week_start = day_start - timedelta(days=6)  # rolling 7 days including the target day
    month_start = day_start.replace(day=1)
    # During the first days of a month the rolling week reaches back into the
    # previous month, so the scan must start at whichever bound is earlier;
    # otherwise weekly_spent silently drops those rows.
    scan_start = min(week_start, month_start)

    spent = _exec_query("""
        SELECT
            COALESCE(SUM(CASE WHEN called_at >= :day_start AND called_at < :day_end
                              THEN cost_usd ELSE 0 END), 0) AS daily_spent,
            COALESCE(SUM(CASE WHEN called_at >= :week_start AND called_at < :day_end
                              THEN cost_usd ELSE 0 END), 0) AS weekly_spent,
            COALESCE(SUM(CASE WHEN called_at >= :month_start AND called_at < :day_end
                              THEN cost_usd ELSE 0 END), 0) AS monthly_spent,
            COUNT(*) FILTER (WHERE called_at >= :month_start) AS month_call_count
        FROM ai_calls
        WHERE called_at >= :scan_start AND called_at < :day_end
    """, {
        'day_start': day_start,
        'day_end': day_end,
        'week_start': week_start,
        'month_start': month_start,
        'scan_start': scan_start,
    })

    budget_rows = _exec_query("""
        SELECT period, provider, budget_usd, alert_pct
        FROM ai_call_budgets
        WHERE provider IS NULL
    """, {})
    budgets = {r['period']: float(r['budget_usd']) for r in budget_rows}

    s = spent[0] if spent else {}
    return {
        'daily_spent': float(s.get('daily_spent') or 0),
        'weekly_spent': float(s.get('weekly_spent') or 0),
        'monthly_spent': float(s.get('monthly_spent') or 0),
        'daily_budget': budgets.get('daily', 0.0),
        'weekly_budget': budgets.get('weekly', 0.0),
        'monthly_budget': budgets.get('monthly', 0.0),
    }


def _query_cache_hit_stats(target_date: date) -> Dict[str, Any]:
    """Section 4 — prompt cache hit statistics for the 'claude' and 'gemini' providers.

    Returns:
        {'claude': {'total', 'hits', 'pct'}, 'gemini': {...}}; both keys are
        always present, with zeros when the provider had no calls.
    """
    day_start, day_end = _date_window(target_date)
    rows = _exec_query("""
        SELECT
            provider,
            COUNT(*) AS total_calls,
            SUM(CASE WHEN cache_hit THEN 1 ELSE 0 END) AS cache_hits
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
          AND provider IN ('claude','gemini')
        GROUP BY provider
    """, {'start': day_start, 'end': day_end})

    by_p = {r['provider']: r for r in rows}
    out: Dict[str, Any] = {}
    for p in ('claude', 'gemini'):
        r = by_p.get(p, {})
        total = int(r.get('total_calls') or 0)
        hits = int(r.get('cache_hits') or 0)
        out[p] = {
            'total': total,
            'hits': hits,
            'pct': (hits / total * 100.0) if total else 0.0,
        }
    return out


# ═══════════════════════════════════════════════════════════════════════════════
# Internal: alert detection (Section 6)
# ═══════════════════════════════════════════════════════════════════════════════

def _detect_alerts(
    summary: Dict[str, Any],
    by_provider: List[Dict[str, Any]],
    top_callers: List[Dict[str, Any]],
    trends: Dict[str, Any],
    budgets: Dict[str, Any],
    cache_stats: Dict[str, Any],
) -> List[Dict[str, str]]:
    """Evaluate the six alert rules (R1–R6) and return [{level, icon, title, suggestion}, ...].

    Titles and suggestions are plain text; the formatter escapes them.
    """
    alerts: List[Dict[str, str]] = []

    # R1: single caller spike (P2)
    spike_factor = _ALERT_RULES['caller_spike_factor']
    for caller in top_callers:
        delta = caller.get('delta_pct')
        if delta is not None and delta >= (spike_factor - 1) * 100.0:
            alerts.append({
                'level': 'P2',
                'icon': '🟠',
                'title': f"{caller['caller']} token 暴增 {delta:+.0f}%(vs 7 日均)",
                'suggestion': f"今日 {caller['tokens']:,} tokens / {caller['calls']} calls,建議查 prompt 是否變更",
            })

    # R2: Gemini share too high (P2, "Ollama-First lost")
    gemini = next((r for r in by_provider if r['provider'] == 'gemini'), {})
    gemini_pct = float(gemini.get('pct') or 0)
    if gemini_pct > _ALERT_RULES['gemini_share_threshold']:
        alerts.append({
            'level': 'P2',
            'icon': '🟠',
            'title': f"Gemini 占比 {gemini_pct:.1f}% 高於門檻 {_ALERT_RULES['gemini_share_threshold']:.0f}%",
            'suggestion': "Ollama-First 失守,請檢查 fallback 是否正確命中本地",
        })

    # R3: failure rate (P1)
    total_calls = int(summary.get('total_calls') or 0)
    failed = int(summary.get('failed_calls') or 0)
    if total_calls:
        err_rate = failed / total_calls * 100.0
        if err_rate > _ALERT_RULES['error_rate_critical']:
            alerts.append({
                'level': 'P1',
                'icon': '🔴',
                'title': f"全域失敗率 {err_rate:.1f}% 超過門檻 {_ALERT_RULES['error_rate_critical']:.0f}%",
                'suggestion': f"今日 {failed:,} / {total_calls:,} 失敗,立即查 ai_calls WHERE status<>'ok'",
            })

    # R4: budget overrun (P1); periods without a configured budget are skipped
    for period_key, label in (('daily', '日'), ('weekly', '週'), ('monthly', '月')):
        spent = float(budgets.get(f'{period_key}_spent') or 0)
        budget = float(budgets.get(f'{period_key}_budget') or 0)
        if budget > 0:
            usage_pct = spent / budget * 100.0
            if usage_pct > _ALERT_RULES['budget_warning']:
                alerts.append({
                    'level': 'P1',
                    'icon': '🔴',
                    'title': f"{label}成本 ${spent:.2f} 達預算 ${budget:.2f} 的 {usage_pct:.0f}%",
                    'suggestion': "請檢查供應商分布是否異常(Section 2/3)或調整預算",
                })

    # R5: low GCP hit rate (P2) — only checked when there is Ollama traffic
    today_gcp_hit = float(trends.get('today_gcp_hit_pct') or 0)
    ollama = sum(
        int(r.get('tokens') or 0)
        for r in by_provider
        if r['provider'] in ('gcp_ollama', 'ollama_secondary', 'ollama_111')
    )
    if ollama > 0 and today_gcp_hit < _ALERT_RULES['gcp_hit_warning']:
        alerts.append({
            'level': 'P2',
            'icon': '🟠',
            'title': f"GCP Ollama 命中率 {today_gcp_hit:.1f}% 低於 {_ALERT_RULES['gcp_hit_warning']:.0f}%",
            'suggestion': "111 fallback 觸發頻繁,請檢查 GCP Ollama 健康(ADR-027)",
        })

    # R6: low cache hit rate (INFO) — claude, needs at least 10 calls
    claude_cache = cache_stats.get('claude', {})
    if int(claude_cache.get('total') or 0) >= 10:
        if float(claude_cache.get('pct') or 0) < _ALERT_RULES['cache_hit_low']:
            alerts.append({
                'level': 'INFO',
                'icon': '🟢',
                'title': f"Claude prompt cache 命中率僅 {claude_cache['pct']:.1f}%",
                'suggestion': "可優化 system prompt 結構(≥1024 tokens 才觸發 cache)",
            })

    return alerts


def _generate_insights(
    target_date: date,
    summary: Dict[str, Any],
    by_provider: List[Dict[str, Any]],
) -> List[Dict[str, str]]:
    """Section 6 suggestions (rule engine, zero LLM cost).

    *target_date* is accepted for interface symmetry and is not read.
    Returns a list of {'icon', 'text'} dicts.
    """
    insights: List[Dict[str, str]] = []

    ollama_pct = float(summary.get('ollama_pct') or 0)
    if ollama_pct >= _OLLAMA_FIRST_TARGET_PCT:
        insights.append({
            'icon': '✅',
            'text': f"Ollama 占比 {ollama_pct:.1f}%(目標 ≥{_OLLAMA_FIRST_TARGET_PCT:.0f}%),Ollama-First 戰役達標",
        })
    else:
        insights.append({
            'icon': '⚠️',
            'text': f"Ollama 占比 {ollama_pct:.1f}% 未達 {_OLLAMA_FIRST_TARGET_PCT:.0f}% 目標,可優化 fallback 鏈",
        })

    nim_total = sum(
        int(r.get('tokens') or 0)
        for r in by_provider
        if r['provider'] in ('nim', 'nim_via_elephant')
    )
    if 0 < nim_total < 100_000:
        insights.append({
            'icon': '✅',
            'text': f"NIM 用量已降至 {nim_total:,} tokens(戰役前約 5M),可考慮關閉 NIM 依賴",
        })

    success_rate = float(summary.get('success_rate') or 0)
    if summary.get('total_calls') and success_rate >= 99.0:
        insights.append({
            'icon': '✅',
            'text': f"成功率 {success_rate:.1f}%,鏈路健康度高",
        })

    return insights


# ═══════════════════════════════════════════════════════════════════════════════
# Internal: report assembly
# ═══════════════════════════════════════════════════════════════════════════════

def _format_report(
    target_date: date,
    summary: Dict[str, Any],
    by_provider: List[Dict[str, Any]],
    top_callers: List[Dict[str, Any]],
    costs: List[Dict[str, Any]],
    trends: Dict[str, Any],
    budgets: Dict[str, Any],
    cache_stats: Dict[str, Any],
    alerts: List[Dict[str, str]],
    insights: List[Dict[str, str]],
) -> str:
    """Assemble the full HTML report.

    Every caller/provider/model string and every alert/insight text goes
    through ``_esc``. Length limits are applied *before* escaping so an
    entity is never cut in half.
    """
    weekday_zh = ['週一', '週二', '週三', '週四', '週五', '週六', '週日'][target_date.weekday()]
    now_str = datetime.now(_TAIPEI_TZ).strftime('%H:%M:%S')

    lines: List[str] = []

    # Header
    lines.append(f"📊 LLM Token 日報 {target_date.isoformat()} ({weekday_zh})")
    lines.append("═══════════════════════════════════════")
    lines.append("⏰ 統計區間:00:00 ~ 23:59 (UTC+8)")
    lines.append(f"🔄 報表生成:{now_str} | 涵蓋筆數:{summary['total_calls']:,} calls")

    # Section 1
    lines.append("")
    lines.append("━━━━━ 【1】今日總覽 TL;DR ━━━━━")
    wow_sign = "+" if summary['wow_pct'] >= 0 else ""
    lines.append(f"🪙 總 Token: {summary['total_tokens']:,} ({wow_sign}{summary['wow_pct']:.1f}% vs 昨日)")
    lines.append(f"💰 總成本: US$ {summary['total_cost_usd']:.2f}")
    lines.append(f"⚡ 平均延遲: {summary['avg_duration_ms']:.0f} ms")
    lines.append(f"✅ 成功率: {summary['success_rate']:.1f}% ({summary['failed_calls']} 失敗 / {summary['total_calls']})")
    ollama_check = "✅" if summary['ollama_pct'] >= _OLLAMA_FIRST_TARGET_PCT else "⚠️"
    lines.append(f"🎯 Ollama 占比:{summary['ollama_pct']:.1f}% {ollama_check}")

    # Section 2
    lines.append("")
    lines.append("━━━━━ 【2】供應商分布 ━━━━━")
    for p in by_provider:
        icon, name = _PROVIDER_DISPLAY[p['provider']]
        if p['calls'] == 0:
            continue  # providers with zero calls are skipped to reduce noise
        lines.append(
            f"{icon} {_pad(name, 14)} "
            f"{_fmt_kb(p['tokens']):>8} ({p['pct']:5.1f}%) "
            f"{p['calls']:>5} calls "
            f"${p['cost_usd']:6.2f} "
            f"{p['avg_duration_ms']:5.0f}ms"
        )

    # Section 3
    lines.append("")
    lines.append(f"━━━━━ 【3】呼叫點 TOP {len(top_callers)} (按 Token) ━━━━━")
    medals = ['🥇', '🥈', '🥉']
    for i, c in enumerate(top_callers):
        rank = medals[i] if i < 3 else f" {i+1}"
        flag = ""
        if c.get('delta_pct') is not None:
            d = c['delta_pct']
            if d >= 40:
                flag = f" ⚠️ {d:+.0f}%"
            elif d <= -50:
                flag = f" 🎉 {d:+.0f}%"
        lines.append(
            f"{rank} {_esc(c['caller'])}"
            f" / {_esc(c['provider'])} / {_esc(str(c['model'])[:24])}"
        )
        lines.append(f" {_fmt_kb(c['tokens']):>8} | {c['calls']:>5} calls{flag}")

    # Section 4
    lines.append("")
    lines.append("━━━━━ 【4】成本分析 + 預算對比 ━━━━━")
    lines.append(_budget_line("📅 本日成本", budgets['daily_spent'], budgets['daily_budget']))
    lines.append(_budget_line("📅 本週累計", budgets['weekly_spent'], budgets['weekly_budget']))
    lines.append(_budget_line("📅 本月累計", budgets['monthly_spent'], budgets['monthly_budget']))
    if costs:
        lines.append("")
        lines.append("成本拆解 by Model:")
        for c in costs[:6]:
            lines.append(f" {_esc(str(c['model'])[:32]):<32} ${c['cost_usd']:7.4f} ({c['calls']} calls)")

    # Cache hits
    lines.append("")
    lines.append("Prompt Cache 命中:")
    cc = cache_stats.get('claude', {})
    if cc.get('total'):
        lines.append(f" Claude: {cc['hits']:>4} / {cc['total']:<4} ({cc['pct']:5.1f}%)")
    else:
        lines.append(" Claude: N/A")
    gc = cache_stats.get('gemini', {})
    if gc.get('total'):
        lines.append(f" Gemini: {gc['hits']:>4} / {gc['total']:<4} ({gc['pct']:5.1f}%)")
    else:
        lines.append(" Gemini: N/A")

    # Section 5
    lines.append("")
    lines.append("━━━━━ 【5】趨勢與洞察 (vs 7 日均) ━━━━━")
    lines.append(_trend_line("總 Tokens", trends['today_total_tokens'], trends['7d_avg_total']))
    lines.append(_trend_line("Gemini Tokens", trends['today_gemini_tokens'], trends['7d_avg_gemini']))
    lines.append(_trend_line("Ollama Tokens", trends['today_ollama_tokens'], trends['7d_avg_ollama']))
    lines.append(_trend_line("Claude Tokens", trends['today_claude_tokens'], trends['7d_avg_claude']))
    lines.append(_trend_line("平均延遲(ms)", trends['today_avg_duration'], trends['7d_avg_duration'], unit=''))
    lines.append("")
    lines.append(f"📈 7 日累計:{_fmt_kb(trends['7d_total_tokens'])} tokens / US$ {trends['7d_total_cost']:.2f}")

    # Section 6
    lines.append("")
    lines.append("━━━━━ 【6】告警與建議 ━━━━━")
    if alerts:
        for a in alerts:
            lines.append(f"{a['icon']} [{a['level']}] {_esc(a['title'])}")
            lines.append(f" 建議:{_esc(a['suggestion'])}")
    else:
        lines.append("✅ 無異常告警")

    if insights:
        lines.append("")
        lines.append("🔮 智能建議 (Hermes 規則引擎):")
        for ins in insights:
            lines.append(f" {ins['icon']} {_esc(ins['text'])}")

    # Footer
    lines.append("")
    lines.append("═══════════════════════════════════════")
    lines.append("🤖 Operation Ollama-First v5.0 / token_report v1.0")

    return "\n".join(lines)


def _format_failure_report(target_date: date, error: str) -> str:
    """Minimal message used when the DB queries fail.

    The error text is limited to 300 characters and then HTML-escaped (in that
    order, so no entity is cut in half).
    """
    return (
        f"⚠️ LLM Token 日報生成失敗 ({target_date.isoformat()})\n"
        f"━━━━━━━━━━━━━━━━━━━━\n"
        f"錯誤:{_esc(str(error)[:300])}\n"
        f"請查 logs:docker logs momo-scheduler | grep TokenReport"
    )


def _persist_to_ai_insights(target_date: date, content: str, send_result: Dict[str, Any]) -> None:
    """Insert one ai_insights row with insight_type='daily_token_report'; metadata holds no PII.

    Commits on success; rolls back and re-raises on any error. The session is
    always closed.
    """
    from sqlalchemy import text
    from database.manager import get_session
    import json as _json

    meta = {
        'target_date': target_date.isoformat(),
        'sent': int(send_result.get('sent', 0)),
        'failed': int(send_result.get('failed', 0)),
        'chars': int(send_result.get('chars', 0)),
        # NOTE: never store username / first_name / chat_id here
    }

    session = get_session()
    try:
        session.execute(text("""
            INSERT INTO ai_insights (
                insight_type, period, content, metadata_json,
                avg_quality, status, decay_exempt, ai_model,
                created_by, created_at, updated_at
            ) VALUES (
                'daily_token_report', :period, :content, :meta,
                0.9, 'approved', TRUE, 'rule_engine',
                'token_report_service', NOW(), NOW()
            )
        """), {
            'period': target_date.isoformat(),
            'content': content[:8000],  # ai_insights.content is TEXT; a cap is still applied as a safety fuse
            'meta': _json.dumps(meta, ensure_ascii=False),
        })
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


# ═══════════════════════════════════════════════════════════════════════════════
# Internal: formatting helpers
# ═══════════════════════════════════════════════════════════════════════════════

def _esc(s: Any) -> str:
    """HTML-escape *s* for Telegram parse_mode='HTML' (``&``, ``<``, ``>``); None becomes ''.

    ``&`` is replaced first so the ampersands introduced by the other two
    replacements are not escaped again.
    """
    text = "" if s is None else str(s)
    return (text.replace("&", "&amp;")
                .replace("<", "&lt;")
                .replace(">", "&gt;"))


def _pad(s: str, width: int) -> str:
    """Right-pad *s* with spaces to *width* display columns (non-ASCII characters count as 2)."""
    visible = sum(2 if ord(c) > 127 else 1 for c in s)
    return s + " " * max(0, width - visible)


def _fmt_kb(tokens: int) -> str:
    """Format a token count compactly: '999', '12K' (no decimals) or '3.4M' (one decimal)."""
    n = int(tokens or 0)
    if n >= 1_000_000:
        return f"{n/1_000_000:.1f}M"
    if n >= 1_000:
        return f"{n/1_000:.0f}K"
    return f"{n}"


def _budget_line(label: str, spent: float, budget: float) -> str:
    """Render one budget row with a 10-cell progress bar.

    The displayed percentage (and therefore the bar) is capped at 100%. A
    non-positive budget renders the "no budget configured" variant.
    """
    if budget <= 0:
        return f"{label}: US$ {spent:6.2f} ({_pad('未設定預算', 10)})"
    pct = min(100.0, spent / budget * 100.0)
    filled = int(pct / 10)
    bar = "▓" * filled + "░" * (10 - filled)
    return f"{label}: US$ {spent:6.2f} {bar} {pct:3.0f}% / ${budget:.0f} 預算"


def _trend_line(label: str, today: float, baseline: float, unit: str = '') -> str:
    """Render one trend comparison row (today vs baseline).

    Labels containing 'Tokens' are formatted with ``_fmt_kb``; other values
    are printed as integers followed by *unit*. Without a positive baseline
    the delta is shown as 0.0% with a dash instead of an arrow.
    """
    today_n = float(today or 0)
    base_n = float(baseline or 0)
    if base_n > 0:
        delta = (today_n - base_n) / base_n * 100.0
        sign = "+" if delta >= 0 else ""
        # A change within ±5% is shown as flat.
        arrow = "↗" if delta >= 5 else ("↘" if delta <= -5 else "→")
    else:
        delta = 0.0
        sign = ""
        arrow = "—"
    today_str = _fmt_kb(int(today_n)) if 'Tokens' in label else f"{today_n:,.0f}{unit}"
    base_str = _fmt_kb(int(base_n)) if 'Tokens' in label else f"{base_n:,.0f}{unit}"
    return f" {_pad(label, 14)} {today_str:>8} vs {base_str:>8} ({sign}{delta:5.1f}%) {arrow}"