#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/token_report_service.py
LLM Token 日報服務 (Operation Ollama-First v5.0 — Phase 1 收尾)
依據:
- migrations/024_create_ai_calls_table.sql (ai_calls schema + CHECK constraints)
- migrations/025_create_mcp_calls_and_budgets.sql (ai_call_budgets 種子資料)
- services/ai_call_logger.py (COST_TABLE / provider 白名單)
- services/telegram_templates.py (HTML escape 與 send 封裝)
- docs/phase0_audit_report_20260503.md (34 LLM 呼叫點清冊)
- docs/phase1_db_design_20260503.md (查詢 latency 預估)
設計紀律 (憲法級):
1. 失敗安全: DB 查詢失敗 → 推「⚠️ 報表生成失敗」訊息,不影響其他排程
2. PII 保護: 報表訊息不含 prompt 原文;ai_insights metadata 只存統計 meta(不存 username)
3. 不污染既有 Telegram 流程: 共用 telegram_templates 既有 send 函數,不另開連線
4. ≤ 4096 字元自動截斷: Telegram 單訊息上限保險絲
公開 API:
- generate_daily_report(target_date=None) → str (HTML)
- send_daily_report(target_date=None) → dict (ok/sent/failed/chars/errors)
"""
from __future__ import annotations
import logging
from datetime import date, datetime, timedelta, timezone
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Asia/Taipei (UTC+8) as a fixed offset (avoids container tzdata differences;
# follows the telegram_templates convention).
_TAIPEI_TZ = timezone(timedelta(hours=8))
# Per-message character cap for Telegram (keeps 96 characters spare for a
# footer so a message never lands exactly on the 4096 limit).
_TELEGRAM_MAX_CHARS = 4000
# Provider display table: key -> (icon, label). Aligned with the
# ai_calls.provider whitelist; insertion order is the report order.
_PROVIDER_DISPLAY: Dict[str, Tuple[str, str]] = {
    'gcp_ollama': ('🟢', 'GCP Ollama'),
    'ollama_secondary': ('🟢', 'Secondary'), # critic-A11 B4 fix: keeps the three-host architecture consistent
    'ollama_111': ('🟠', '111 Ollama'),
    'gemini': ('🔴', 'Gemini'),
    'claude': ('🟣', 'Claude'),
    'nim': ('🟡', 'NIM'),
    'openrouter': ('🟤', 'OpenRouter'),
    'nim_via_elephant': ('🟫', 'NIM_via_Eleph'),
}
# Ollama share threshold used for the "Ollama-First target met" check
# in Section 1 (campaign KPI: >= 60%).
_OLLAMA_FIRST_TARGET_PCT = 60.0
# Alert rule parameters (used when generating Section 6).
# NOTE(review): 'caller_stable_days' and 'ollama_stable_pct' are not read by
# any rule visible in this module — confirm whether a rule is still pending.
_ALERT_RULES = {
    'caller_spike_factor': 1.4, # tokens > 7-day average x 1.4
    'gemini_share_threshold': 35.0, # gemini share > 35% counts as Ollama-First being lost
    'error_rate_critical': 5.0, # error_rate > 5% -> P1
    'budget_warning': 80.0, # spent / budget > 80% -> P1
    'gcp_hit_warning': 90.0, # gcp_ollama share < 90% (within Ollama traffic) -> P2
    'cache_hit_low': 40.0, # claude cache hit < 40% -> INFO
    'caller_stable_days': 7, # N consecutive days with Ollama > 95% -> INFO "fallback can be disabled"
    'ollama_stable_pct': 95.0,
}
# ═══════════════════════════════════════════════════════════════════════════════
# 公開 API
# ═══════════════════════════════════════════════════════════════════════════════
def generate_daily_report(target_date: Optional[date] = None) -> str:
    """Build the LLM token daily report for one day as Telegram HTML text.

    Args:
        target_date: Day to report on (Asia/Taipei calendar). When omitted,
            the current Taipei date is used.

    Returns:
        The full report string. If any of the section queries raises, a
        short failure notice is returned instead so it can still be sent.
    """
    report_day = datetime.now(_TAIPEI_TZ).date() if target_date is None else target_date

    try:
        # Dict literals evaluate top to bottom, so the queries run in the
        # order they are listed here.
        sections = {
            'summary': _query_summary(report_day),
            'by_provider': _query_by_provider(report_day),
            'top_callers': _query_top_callers(report_day, limit=10),
            'costs': _query_cost_breakdown(report_day),
            'trends': _query_trends_vs_7day(report_day),
            'budgets': _query_budget_usage(report_day),
            'cache_stats': _query_cache_hit_stats(report_day),
        }
    except Exception as exc:
        logger.exception("[TokenReport] DB query failed: %s", exc)
        return _format_failure_report(report_day, str(exc))

    alerts = _detect_alerts(
        sections['summary'],
        sections['by_provider'],
        sections['top_callers'],
        sections['trends'],
        sections['budgets'],
        sections['cache_stats'],
    )
    insights = _generate_insights(report_day, sections['summary'], sections['by_provider'])

    return _format_report(
        target_date=report_day,
        alerts=alerts,
        insights=insights,
        **sections,
    )
def send_daily_report(target_date: Optional[date] = None) -> Dict[str, Any]:
    """Generate the daily report, send it to Telegram and record it in ai_insights.

    The Telegram message is cut to a safe length when the report is too long.
    The untruncated report is what gets handed to ``_persist_to_ai_insights``,
    because the truncation notice tells the reader to look there for the rest.

    Args:
        target_date: Day to report on (Asia/Taipei calendar). When omitted,
            the current Taipei date is used.

    Returns:
        ``{'ok': bool, 'sent': int, 'failed': int, 'chars': int, 'errors': list}``
        where ``chars`` is the length of the message actually sent.
    """
    if target_date is None:
        target_date = datetime.now(_TAIPEI_TZ).date()

    try:
        full_report = generate_daily_report(target_date)
    except Exception as exc:
        logger.exception("[TokenReport] generate_daily_report failed: %s", exc)
        full_report = _format_failure_report(target_date, str(exc))

    # Cut to a Telegram-safe length; the full text is kept for persistence.
    message = full_report
    if len(message) > _TELEGRAM_MAX_CHARS:
        head = message[: _TELEGRAM_MAX_CHARS - 80]
        # If the cut landed inside an HTML entity (an '&' close to the end
        # with no terminating ';'), drop the partial entity so the message
        # does not end in a fragment.
        amp_at = head.rfind('&')
        if amp_at != -1 and len(head) - amp_at < 8 and ';' not in head[amp_at:]:
            head = head[:amp_at]
        message = head + "\n\n... (訊息超長,已截斷;詳見 ai_insights)"

    # Send through the existing Telegram wrapper (no separate connection).
    result: Dict[str, Any] = {'ok': False, 'sent': 0, 'failed': 0, 'chars': len(message), 'errors': []}
    try:
        from services.telegram_templates import send_telegram_with_result
        send_result = send_telegram_with_result(message, parse_mode='HTML')
        result.update({
            'ok': bool(send_result.get('ok')),
            'sent': int(send_result.get('sent', 0)),
            'failed': int(send_result.get('failed', 0)),
            'errors': list(send_result.get('errors', [])),
        })
    except Exception as exc:
        logger.exception("[TokenReport] telegram send failed: %s", exc)
        # Only the exception type is recorded, not its message.
        result['errors'].append(f"telegram:{type(exc).__name__}")

    # Persist to ai_insights (no PII, no username). A failure here is logged
    # and does not change the returned send result.
    try:
        _persist_to_ai_insights(target_date, full_report, result)
    except Exception as exc:
        logger.warning("[TokenReport] ai_insights persist failed: %s", exc)
    return result
# ═══════════════════════════════════════════════════════════════════════════════
# 內部:SQL 查詢
# ═══════════════════════════════════════════════════════════════════════════════
def _date_window(target_date: date) -> Tuple[datetime, datetime]:
    """Return the half-open ``[start, end)`` bounds of ``target_date`` in Taipei time.

    Both values are timezone-aware datetimes (UTC+8): ``start`` is midnight at
    the beginning of the day and ``end`` is exactly one day later.
    """
    midnight = datetime.min.time()
    window_open = datetime.combine(target_date, midnight, tzinfo=_TAIPEI_TZ)
    window_close = window_open + timedelta(days=1)
    return window_open, window_close
def _exec_query(sql: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Run one SQL statement and return its rows as plain dicts.

    A session is obtained for this call only and is always closed, even when
    the query raises; exceptions propagate to the caller.
    """
    from sqlalchemy import text
    from database.manager import get_session

    db = get_session()
    try:
        fetched = db.execute(text(sql), params).mappings().all()
        return list(map(dict, fetched))
    finally:
        db.close()
def _query_summary(target_date: date) -> Dict[str, Any]:
    """Section 1 — single-row overview of the target day.

    Returns:
        Dict with total_tokens, total_calls, total_cost_usd, avg_duration_ms,
        success_rate, failed_calls, ollama_pct, prev_total_tokens (previous
        day, used as the comparison base) and wow_pct (percentage change
        against the previous day; 0.0 when the previous day had no tokens).
    """
    window_start, window_end = _date_window(target_date)

    today_rows = _exec_query("""
        SELECT
        COALESCE(SUM(input_tokens + output_tokens), 0) AS total_tokens,
        COUNT(*) AS total_calls,
        COALESCE(SUM(cost_usd), 0) AS total_cost_usd,
        COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
        COALESCE(SUM(CASE WHEN status = 'ok' THEN 1 ELSE 0 END), 0) AS ok_calls,
        COALESCE(SUM(
        CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
        THEN input_tokens + output_tokens ELSE 0 END
        ), 0) AS ollama_tokens
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
        """, {'start': window_start, 'end': window_end})

    # Previous day: the 24 hours immediately before the target window.
    yesterday_rows = _exec_query("""
        SELECT COALESCE(SUM(input_tokens + output_tokens), 0) AS prev_total_tokens
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
        """, {'start': window_start - timedelta(days=1), 'end': window_start})

    row = today_rows[0] if today_rows else {}
    prev_row = yesterday_rows[0] if yesterday_rows else {}

    def _as_int(source: Dict[str, Any], key: str) -> int:
        return int(source.get(key) or 0)

    calls = _as_int(row, 'total_calls')
    tokens = _as_int(row, 'total_tokens')
    succeeded = _as_int(row, 'ok_calls')
    ollama = _as_int(row, 'ollama_tokens')
    prev_tokens = _as_int(prev_row, 'prev_total_tokens')

    # Ratios fall back to 0.0 whenever their denominator is zero.
    success_rate = succeeded / calls * 100.0 if calls else 0.0
    ollama_share = ollama / tokens * 100.0 if tokens else 0.0
    day_change = (tokens - prev_tokens) / prev_tokens * 100.0 if prev_tokens else 0.0

    return {
        'total_tokens': tokens,
        'total_calls': calls,
        'total_cost_usd': float(row.get('total_cost_usd') or 0),
        'avg_duration_ms': float(row.get('avg_duration_ms') or 0),
        'success_rate': success_rate,
        'failed_calls': max(0, calls - succeeded),
        'ollama_pct': ollama_share,
        'prev_total_tokens': prev_tokens,
        'wow_pct': day_change,
    }
def _query_by_provider(target_date: date) -> List[Dict[str, Any]]:
    """Section 2 — usage per provider for the target day.

    Returns one entry for every key of ``_PROVIDER_DISPLAY``, in that order,
    including providers with no calls (all-zero entry). ``pct`` is the
    provider's share of the tokens summed over every row the query returned.
    """
    window_start, window_end = _date_window(target_date)
    usage_rows = _exec_query("""
        SELECT
        provider,
        SUM(input_tokens + output_tokens)::BIGINT AS tokens,
        COUNT(*) AS calls,
        COALESCE(SUM(cost_usd), 0) AS cost_usd,
        COALESCE(AVG(duration_ms), 0) AS avg_duration_ms
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
        GROUP BY provider
        """, {'start': window_start, 'end': window_end})

    row_for: Dict[str, Dict[str, Any]] = {}
    grand_total = 0
    for row in usage_rows:
        row_for[row['provider']] = row
        grand_total += int(row['tokens'] or 0)

    def _entry(provider_key: str) -> Dict[str, Any]:
        row = row_for.get(provider_key, {})
        used = int(row.get('tokens') or 0)
        return {
            'provider': provider_key,
            'tokens': used,
            'pct': used / grand_total * 100.0 if grand_total else 0.0,
            'calls': int(row.get('calls') or 0),
            'cost_usd': float(row.get('cost_usd') or 0),
            'avg_duration_ms': float(row.get('avg_duration_ms') or 0),
        }

    return [_entry(provider_key) for provider_key in _PROVIDER_DISPLAY]
def _query_top_callers(target_date: date, limit: int = 10) -> List[Dict[str, Any]]:
    """Section 3 — top N (caller, provider) rows by tokens, with deviation from the 7-day average.

    ``delta_pct`` is the percentage difference between the row's tokens today
    and the caller's average daily tokens over the preceding 7 days; it is
    ``None`` when that baseline is zero or missing.

    NOTE(review): today's rows are grouped by caller AND provider, while the
    baseline is grouped by caller only — confirm that comparing a
    per-provider figure against a per-caller baseline is intended.
    """
    window_start, window_end = _date_window(target_date)
    ranked = _exec_query("""
        WITH today AS (
        SELECT
        caller,
        provider,
        MODE() WITHIN GROUP (ORDER BY model) AS top_model,
        SUM(input_tokens + output_tokens)::BIGINT AS tokens,
        COUNT(*) AS calls
        FROM ai_calls
        WHERE called_at >= :day_start AND called_at < :day_end
        GROUP BY caller, provider
        ),
        baseline AS (
        SELECT
        caller,
        SUM(input_tokens + output_tokens) / 7.0 AS avg_tokens_7d
        FROM ai_calls
        WHERE called_at >= :week_start AND called_at < :day_start
        GROUP BY caller
        )
        SELECT
        t.caller, t.provider, t.top_model, t.tokens, t.calls,
        COALESCE(b.avg_tokens_7d, 0) AS avg_tokens_7d
        FROM today t
        LEFT JOIN baseline b ON b.caller = t.caller
        ORDER BY t.tokens DESC
        LIMIT :limit
        """, {
        'day_start': window_start,
        'day_end': window_end,
        'week_start': window_start - timedelta(days=7),
        'limit': int(limit),
    })

    def _shape(row: Dict[str, Any]) -> Dict[str, Any]:
        used = int(row.get('tokens') or 0)
        weekly_avg = float(row.get('avg_tokens_7d') or 0)
        deviation = (used - weekly_avg) / weekly_avg * 100.0 if weekly_avg > 0 else None
        return {
            'caller': str(row.get('caller') or ''),
            'provider': str(row.get('provider') or ''),
            'model': str(row.get('top_model') or ''),
            'tokens': used,
            'calls': int(row.get('calls') or 0),
            'delta_pct': deviation,
        }

    return [_shape(row) for row in ranked]
def _query_cost_breakdown(target_date: date) -> List[Dict[str, Any]]:
    """Section 4 — cost per (provider, model), largest first.

    Only calls with a positive cost are counted, and at most 12 rows are
    returned.
    """
    window_start, window_end = _date_window(target_date)
    cost_rows = _exec_query("""
        SELECT
        provider,
        model,
        COALESCE(SUM(cost_usd), 0) AS cost_usd,
        COUNT(*) AS calls
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
        AND cost_usd > 0
        GROUP BY provider, model
        ORDER BY cost_usd DESC
        LIMIT 12
        """, {'start': window_start, 'end': window_end})

    breakdown: List[Dict[str, Any]] = []
    for row in cost_rows:
        breakdown.append({
            'provider': str(row['provider']),
            'model': str(row['model']),
            'cost_usd': float(row['cost_usd']),
            'calls': int(row['calls']),
        })
    return breakdown
def _query_trends_vs_7day(target_date: date) -> Dict[str, Any]:
    """Section 5 — today's figures next to the preceding 7-day baseline.

    The baseline window is the 7 days ending at the start of the target day.
    Token averages divide the 7-day sum by 7.0. Today's error rate and GCP
    hit rate are computed here from call counts and are 0.0 when their
    denominator is zero.
    """
    window_start, window_end = _date_window(target_date)
    baseline_start = window_start - timedelta(days=7)

    today_rows = _exec_query("""
        SELECT
        COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT AS total_tokens,
        COALESCE(SUM(CASE WHEN provider='gemini'
        THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS gemini_tokens,
        COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
        THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS ollama_tokens,
        COALESCE(SUM(CASE WHEN provider='claude'
        THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS claude_tokens,
        COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
        COALESCE(SUM(CASE WHEN status<>'ok' THEN 1 ELSE 0 END), 0) AS failed,
        COUNT(*) AS total_calls,
        COALESCE(SUM(CASE WHEN provider='gcp_ollama' THEN 1 ELSE 0 END), 0) AS gcp_calls,
        COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
        THEN 1 ELSE 0 END), 0) AS ollama_calls
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
        """, {'start': window_start, 'end': window_end})

    base_rows = _exec_query("""
        SELECT
        COALESCE(SUM(input_tokens + output_tokens) / 7.0, 0) AS avg_total_tokens,
        COALESCE(SUM(CASE WHEN provider='gemini'
        THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_gemini_tokens,
        COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
        THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_ollama_tokens,
        COALESCE(SUM(CASE WHEN provider='claude'
        THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_claude_tokens,
        COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
        CASE WHEN COUNT(*) > 0
        THEN SUM(CASE WHEN status<>'ok' THEN 1 ELSE 0 END)::FLOAT / COUNT(*) * 100.0
        ELSE 0 END AS error_rate_pct,
        COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT AS total_7d_tokens,
        COALESCE(SUM(cost_usd), 0) AS total_7d_cost,
        CASE WHEN SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
        THEN 1 ELSE 0 END) > 0
        THEN SUM(CASE WHEN provider='gcp_ollama' THEN 1 ELSE 0 END)::FLOAT
        / SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
        THEN 1 ELSE 0 END)::FLOAT * 100.0
        ELSE 0 END AS gcp_hit_pct_7d
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
        """, {'start': baseline_start, 'end': window_start})

    now_row = today_rows[0] if today_rows else {}
    base_row = base_rows[0] if base_rows else {}

    def _today_int(key: str) -> int:
        return int(now_row.get(key) or 0)

    def _base_float(key: str) -> float:
        return float(base_row.get(key) or 0)

    calls_today = _today_int('total_calls')
    failed_today = _today_int('failed')
    gcp_calls_today = _today_int('gcp_calls')
    ollama_calls_today = _today_int('ollama_calls')

    error_rate_today = failed_today / calls_today * 100.0 if calls_today else 0.0
    # Share of today's Ollama calls that were served by gcp_ollama.
    gcp_hit_today = gcp_calls_today / ollama_calls_today * 100.0 if ollama_calls_today else 0.0

    return {
        'today_total_tokens': _today_int('total_tokens'),
        'today_gemini_tokens': _today_int('gemini_tokens'),
        'today_ollama_tokens': _today_int('ollama_tokens'),
        'today_claude_tokens': _today_int('claude_tokens'),
        'today_avg_duration': float(now_row.get('avg_duration_ms') or 0),
        'today_error_rate': error_rate_today,
        'today_gcp_hit_pct': gcp_hit_today,
        '7d_avg_total': _base_float('avg_total_tokens'),
        '7d_avg_gemini': _base_float('avg_gemini_tokens'),
        '7d_avg_ollama': _base_float('avg_ollama_tokens'),
        '7d_avg_claude': _base_float('avg_claude_tokens'),
        '7d_avg_duration': _base_float('avg_duration_ms'),
        '7d_error_rate': _base_float('error_rate_pct'),
        '7d_total_tokens': int(base_row.get('total_7d_tokens') or 0),
        '7d_total_cost': _base_float('total_7d_cost'),
        '7d_gcp_hit_pct': _base_float('gcp_hit_pct_7d'),
    }
def _query_budget_usage(target_date: date) -> Dict[str, Any]:
    """Section 4 — spend versus budget for the day, week and month (all providers combined).

    The weekly window is the target day plus the 6 days before it; the
    monthly window starts on the first day of the target day's month. Budgets
    come from ``ai_call_budgets`` rows whose provider is NULL; a period with
    no such row reports a budget of 0.0.
    """
    window_start, window_end = _date_window(target_date)
    query_params = {
        'day_start': window_start,
        'day_end': window_end,
        'week_start': window_start - timedelta(days=6),
        'month_start': window_start.replace(day=1),
    }
    spend_rows = _exec_query("""
        SELECT
        COALESCE(SUM(CASE WHEN called_at >= :day_start AND called_at < :day_end
        THEN cost_usd ELSE 0 END), 0) AS daily_spent,
        COALESCE(SUM(CASE WHEN called_at >= :week_start AND called_at < :day_end
        THEN cost_usd ELSE 0 END), 0) AS weekly_spent,
        COALESCE(SUM(CASE WHEN called_at >= :month_start AND called_at < :day_end
        THEN cost_usd ELSE 0 END), 0) AS monthly_spent,
        COUNT(*) FILTER (WHERE called_at >= :month_start) AS month_call_count
        FROM ai_calls
        WHERE called_at >= :month_start AND called_at < :day_end
        """, query_params)

    limit_rows = _exec_query("""
        SELECT period, provider, budget_usd, alert_pct
        FROM ai_call_budgets
        WHERE provider IS NULL
        """, {})

    # If a period appears more than once, the last row returned wins.
    limit_for: Dict[str, float] = {}
    for row in limit_rows:
        limit_for[row['period']] = float(row['budget_usd'])

    spend = spend_rows[0] if spend_rows else {}
    return {
        'daily_spent': float(spend.get('daily_spent') or 0),
        'weekly_spent': float(spend.get('weekly_spent') or 0),
        'monthly_spent': float(spend.get('monthly_spent') or 0),
        'daily_budget': limit_for.get('daily', 0.0),
        'weekly_budget': limit_for.get('weekly', 0.0),
        'monthly_budget': limit_for.get('monthly', 0.0),
    }
def _query_cache_hit_stats(target_date: date) -> Dict[str, Any]:
    """Section 4 — prompt-cache hit counts for the claude and gemini providers.

    Returns:
        ``{'claude': {...}, 'gemini': {...}}`` where each value holds
        ``total`` (calls), ``hits`` (calls with cache_hit true) and ``pct``
        (0.0 when there were no calls).
    """
    window_start, window_end = _date_window(target_date)
    stat_rows = _exec_query("""
        SELECT
        provider,
        COUNT(*) AS total_calls,
        SUM(CASE WHEN cache_hit THEN 1 ELSE 0 END) AS cache_hits
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
        AND provider IN ('claude','gemini')
        GROUP BY provider
        """, {'start': window_start, 'end': window_end})

    row_for = {row['provider']: row for row in stat_rows}

    def _stats(provider_key: str) -> Dict[str, Any]:
        row = row_for.get(provider_key, {})
        calls = int(row.get('total_calls') or 0)
        cached = int(row.get('cache_hits') or 0)
        return {
            'total': calls,
            'hits': cached,
            'pct': cached / calls * 100.0 if calls else 0.0,
        }

    return {provider_key: _stats(provider_key) for provider_key in ('claude', 'gemini')}
# ═══════════════════════════════════════════════════════════════════════════════
# 內部:告警偵測(Section 6)
# ═══════════════════════════════════════════════════════════════════════════════
def _detect_alerts(
    summary: Dict[str, Any],
    by_provider: List[Dict[str, Any]],
    top_callers: List[Dict[str, Any]],
    trends: Dict[str, Any],
    budgets: Dict[str, Any],
    cache_stats: Dict[str, Any],
) -> List[Dict[str, str]]:
    """Evaluate the alert rules (R1–R6) and return the alerts that fired.

    Each alert is a dict with ``level``, ``icon``, ``title`` and
    ``suggestion``. Thresholds are read from ``_ALERT_RULES``. Alerts are
    returned in rule order.
    """
    alerts: List[Dict[str, str]] = []

    def _raise(level: str, icon: str, title: str, suggestion: str) -> None:
        alerts.append({'level': level, 'icon': icon, 'title': title, 'suggestion': suggestion})

    # R1: a single caller's tokens spiked against its 7-day average (P2).
    spike_floor_pct = (_ALERT_RULES['caller_spike_factor'] - 1) * 100.0
    for caller in top_callers:
        delta = caller.get('delta_pct')
        if delta is None or not delta >= spike_floor_pct:
            continue
        _raise(
            'P2', '🟠',
            f"{caller['caller']} token 暴增 {delta:+.0f}%(vs 7 日均)",
            f"今日 {caller['tokens']:,} tokens / {caller['calls']} calls,建議查 prompt 是否變更",
        )

    # R2: Gemini share above threshold (P2, "Ollama-First lost").
    gemini: Dict[str, Any] = {}
    for entry in by_provider:
        if entry['provider'] == 'gemini':
            gemini = entry
            break
    gemini_pct = float(gemini.get('pct') or 0)
    if gemini_pct > _ALERT_RULES['gemini_share_threshold']:
        _raise(
            'P2', '🟠',
            f"Gemini 占比 {gemini_pct:.1f}% 高於門檻 {_ALERT_RULES['gemini_share_threshold']:.0f}%",
            "Ollama-First 失守,請檢查 fallback 是否正確命中本地",
        )

    # R3: overall failure rate (P1); skipped when there were no calls.
    total_calls = int(summary.get('total_calls') or 0)
    failed = int(summary.get('failed_calls') or 0)
    err_rate = failed / total_calls * 100.0 if total_calls else 0.0
    if total_calls and err_rate > _ALERT_RULES['error_rate_critical']:
        _raise(
            'P1', '🔴',
            f"全域失敗率 {err_rate:.1f}% 超過門檻 {_ALERT_RULES['error_rate_critical']:.0f}%",
            f"今日 {failed:,} / {total_calls:,} 失敗,立即查 ai_calls WHERE status<>'ok'",
        )

    # R4: budget usage above the warning level (P1); periods without a
    # positive budget are skipped.
    for period_key, label in (('daily', '日'), ('weekly', '週'), ('monthly', '月')):
        spent = float(budgets.get(f'{period_key}_spent') or 0)
        budget = float(budgets.get(f'{period_key}_budget') or 0)
        if not budget > 0:
            continue
        usage_pct = spent / budget * 100.0
        if usage_pct > _ALERT_RULES['budget_warning']:
            _raise(
                'P1', '🔴',
                f"{label}成本 ${spent:.2f} 達預算 ${budget:.2f} 的 {usage_pct:.0f}%",
                "請檢查供應商分布是否異常(Section 2/3)或調整預算",
            )

    # R5: low GCP hit rate (P2) — only checked when there is Ollama traffic.
    today_gcp_hit = float(trends.get('today_gcp_hit_pct') or 0)
    ollama = 0
    for entry in by_provider:
        if entry['provider'] in ('gcp_ollama', 'ollama_secondary', 'ollama_111'):
            ollama += int(entry.get('tokens') or 0)
    if ollama > 0 and today_gcp_hit < _ALERT_RULES['gcp_hit_warning']:
        _raise(
            'P2', '🟠',
            f"GCP Ollama 命中率 {today_gcp_hit:.1f}% 低於 {_ALERT_RULES['gcp_hit_warning']:.0f}%",
            "111 fallback 觸發頻繁,請檢查 GCP Ollama 健康(ADR-027)",
        )

    # R6: low Claude prompt-cache hit rate (INFO); needs at least 10 calls.
    claude_cache = cache_stats.get('claude', {})
    enough_calls = int(claude_cache.get('total') or 0) >= 10
    if enough_calls and float(claude_cache.get('pct') or 0) < _ALERT_RULES['cache_hit_low']:
        _raise(
            'INFO', '🟢',
            f"Claude prompt cache 命中率僅 {claude_cache['pct']:.1f}%",
            "可優化 system prompt 結構(≥1024 tokens 才觸發 cache)",
        )

    return alerts
def _generate_insights(
    target_date: date,
    summary: Dict[str, Any],
    by_provider: List[Dict[str, Any]],
) -> List[Dict[str, str]]:
    """Produce the Section 6 suggestions from fixed rules (no LLM call).

    Always emits one Ollama-share line (met / not met against
    ``_OLLAMA_FIRST_TARGET_PCT``), then optionally a NIM-usage line and a
    success-rate line. ``target_date`` is accepted but not used here.

    Returns:
        List of ``{'icon': ..., 'text': ...}`` dicts.
    """
    insights: List[Dict[str, str]] = []

    # Ollama share versus the campaign target.
    ollama_pct = float(summary.get('ollama_pct') or 0)
    if ollama_pct >= _OLLAMA_FIRST_TARGET_PCT:
        icon = '✅'
        text = f"Ollama 占比 {ollama_pct:.1f}%(目標 ≥{_OLLAMA_FIRST_TARGET_PCT:.0f}%),Ollama-First 戰役達標"
    else:
        icon = '⚠️'
        text = f"Ollama 占比 {ollama_pct:.1f}% 未達 {_OLLAMA_FIRST_TARGET_PCT:.0f}% 目標,可優化 fallback 鏈"
    insights.append({'icon': icon, 'text': text})

    # NIM usage: reported only when it is non-zero and below 100,000 tokens.
    nim_total = 0
    for entry in by_provider:
        if entry['provider'] in ('nim', 'nim_via_elephant'):
            nim_total += int(entry.get('tokens') or 0)
    if 0 < nim_total < 100_000:
        insights.append({
            'icon': '✅',
            'text': f"NIM 用量已降至 {nim_total:,} tokens(戰役前約 5M),可考慮關閉 NIM 依賴",
        })

    # Success rate: reported only when there were calls and it is >= 99%.
    success_rate = float(summary.get('success_rate') or 0)
    if summary.get('total_calls') and success_rate >= 99.0:
        insights.append({
            'icon': '✅',
            'text': f"成功率 {success_rate:.1f}%,鏈路健康度高",
        })

    return insights
# ═══════════════════════════════════════════════════════════════════════════════
# 內部:報表組裝
# ═══════════════════════════════════════════════════════════════════════════════
def _format_report(
    target_date: date,
    summary: Dict[str, Any],
    by_provider: List[Dict[str, Any]],
    top_callers: List[Dict[str, Any]],
    costs: List[Dict[str, Any]],
    trends: Dict[str, Any],
    budgets: Dict[str, Any],
    cache_stats: Dict[str, Any],
    alerts: List[Dict[str, str]],
    insights: List[Dict[str, str]],
) -> str:
    """Assemble the complete report text from the queried sections.

    Caller, provider and model names, alert titles/suggestions and insight
    texts all pass through ``_esc``. Model names are truncated BEFORE
    escaping, so a cut can never land inside an HTML entity and column
    padding is computed on the visible text.

    Args:
        target_date: Day the report covers.
        summary: Output of ``_query_summary``.
        by_provider: Output of ``_query_by_provider``.
        top_callers: Output of ``_query_top_callers``.
        costs: Output of ``_query_cost_breakdown`` (only the first 6 shown).
        trends: Output of ``_query_trends_vs_7day``.
        budgets: Output of ``_query_budget_usage``.
        cache_stats: Output of ``_query_cache_hit_stats``.
        alerts: Output of ``_detect_alerts``.
        insights: Output of ``_generate_insights``.

    Returns:
        The report as a single newline-joined string.
    """
    weekday_zh = ['週一', '週二', '週三', '週四', '週五', '週六', '週日'][target_date.weekday()]
    now_str = datetime.now(_TAIPEI_TZ).strftime('%H:%M:%S')
    lines: List[str] = []

    # Header
    lines.append(f"📊 LLM Token 日報 {target_date.isoformat()} ({weekday_zh})")
    lines.append("═══════════════════════════════════════")
    lines.append("⏰ 統計區間:00:00 ~ 23:59 (UTC+8)")
    lines.append(f"🔄 報表生成:{now_str} | 涵蓋筆數:{summary['total_calls']:,} calls")

    # Section 1 — overview
    lines.append("")
    lines.append("━━━━━ 【1】今日總覽 TL;DR ━━━━━")
    wow_sign = "+" if summary['wow_pct'] >= 0 else ""
    lines.append(f"🪙 總 Token: {summary['total_tokens']:,} ({wow_sign}{summary['wow_pct']:.1f}% vs 昨日)")
    lines.append(f"💰 總成本: US$ {summary['total_cost_usd']:.2f}")
    lines.append(f"⚡ 平均延遲: {summary['avg_duration_ms']:.0f} ms")
    lines.append(f"✅ 成功率: {summary['success_rate']:.1f}% ({summary['failed_calls']} 失敗 / {summary['total_calls']})")
    ollama_check = "✅" if summary['ollama_pct'] >= _OLLAMA_FIRST_TARGET_PCT else "⚠️"
    lines.append(f"🎯 Ollama 占比:{summary['ollama_pct']:.1f}% {ollama_check}")

    # Section 2 — provider distribution
    lines.append("")
    lines.append("━━━━━ 【2】供應商分布 ━━━━━")
    for p in by_provider:
        icon, name = _PROVIDER_DISPLAY[p['provider']]
        if p['calls'] == 0:
            continue  # providers with no calls are skipped to reduce noise
        lines.append(
            f"{icon} {_pad(name, 14)} "
            f"{_fmt_kb(p['tokens']):>8} ({p['pct']:5.1f}%) "
            f"{p['calls']:>5} calls "
            f"${p['cost_usd']:6.2f} "
            f"{p['avg_duration_ms']:5.0f}ms"
        )

    # Section 3 — top callers
    lines.append("")
    lines.append(f"━━━━━ 【3】呼叫點 TOP {len(top_callers)} (按 Token) ━━━━━")
    medals = ['🥇', '🥈', '🥉']
    for i, c in enumerate(top_callers):
        rank = medals[i] if i < 3 else f" {i+1}"
        flag = ""
        d = c.get('delta_pct')
        if d is not None:
            if d >= 40:
                flag = f" ⚠️ {d:+.0f}%"
            elif d <= -50:
                flag = f" 🎉 {d:+.0f}%"
        # Truncate the raw model name first, then escape it.
        model_text = _esc(str(c['model'])[:24])
        lines.append(
            f"{rank} {_esc(c['caller'])}"
            f" / {_esc(c['provider'])} / {model_text}"
        )
        lines.append(f" {_fmt_kb(c['tokens']):>8} | {c['calls']:>5} calls{flag}")

    # Section 4 — cost and budget
    lines.append("")
    lines.append("━━━━━ 【4】成本分析 + 預算對比 ━━━━━")
    lines.append(_budget_line("📅 本日成本", budgets['daily_spent'], budgets['daily_budget']))
    lines.append(_budget_line("📅 本週累計", budgets['weekly_spent'], budgets['weekly_budget']))
    lines.append(_budget_line("📅 本月累計", budgets['monthly_spent'], budgets['monthly_budget']))
    if costs:
        lines.append("")
        lines.append("成本拆解 by Model:")
        for c in costs[:6]:
            # Truncate and pad on the raw text, then escape, so the visible
            # column stays 32 characters wide.
            model_cell = _esc(f"{str(c['model'])[:32]:<32}")
            lines.append(f" {model_cell} ${c['cost_usd']:7.4f} ({c['calls']} calls)")

    # Prompt-cache hits
    lines.append("")
    lines.append("Prompt Cache 命中:")
    cc = cache_stats.get('claude', {})
    if cc.get('total'):
        lines.append(f" Claude: {cc['hits']:>4} / {cc['total']:<4} ({cc['pct']:5.1f}%)")
    else:
        lines.append(" Claude: N/A")
    gc = cache_stats.get('gemini', {})
    if gc.get('total'):
        lines.append(f" Gemini: {gc['hits']:>4} / {gc['total']:<4} ({gc['pct']:5.1f}%)")
    else:
        lines.append(" Gemini: N/A")

    # Section 5 — trends against the 7-day average
    lines.append("")
    lines.append("━━━━━ 【5】趨勢與洞察 (vs 7 日均) ━━━━━")
    lines.append(_trend_line("總 Tokens", trends['today_total_tokens'], trends['7d_avg_total']))
    lines.append(_trend_line("Gemini Tokens", trends['today_gemini_tokens'], trends['7d_avg_gemini']))
    lines.append(_trend_line("Ollama Tokens", trends['today_ollama_tokens'], trends['7d_avg_ollama']))
    lines.append(_trend_line("Claude Tokens", trends['today_claude_tokens'], trends['7d_avg_claude']))
    lines.append(_trend_line("平均延遲(ms)", trends['today_avg_duration'], trends['7d_avg_duration'], unit=''))
    lines.append("")
    lines.append(f"📈 7 日累計:{_fmt_kb(trends['7d_total_tokens'])} tokens / US$ {trends['7d_total_cost']:.2f}")

    # Section 6 — alerts and suggestions
    lines.append("")
    lines.append("━━━━━ 【6】告警與建議 ━━━━━")
    if alerts:
        for a in alerts:
            lines.append(f"{a['icon']} [{a['level']}] {_esc(a['title'])}")
            lines.append(f" 建議:{_esc(a['suggestion'])}")
    else:
        lines.append("✅ 無異常告警")
    if insights:
        lines.append("")
        lines.append("🔮 智能建議 (Hermes 規則引擎):")
        for ins in insights:
            lines.append(f" {ins['icon']} {_esc(ins['text'])}")

    # Footer
    lines.append("")
    lines.append("═══════════════════════════════════════")
    lines.append("🤖 Operation Ollama-First v5.0 / token_report v1.0")
    return "\n".join(lines)
def _format_failure_report(target_date: date, error: str) -> str:
    """Return the minimal notice sent when the report could not be generated.

    The error text is cut to 300 characters BEFORE it is HTML-escaped, so the
    cut can never land in the middle of an HTML entity.

    Args:
        target_date: Day the failed report was for.
        error: Error description to include (escaped before use).
    """
    error_text = _esc(str(error)[:300])
    return (
        f"⚠️ LLM Token 日報生成失敗 ({target_date.isoformat()})\n"
        f"━━━━━━━━━━━━━━━━━━━━\n"
        f"錯誤:{error_text}\n"
        f"請查 logs:docker logs momo-scheduler | grep TokenReport"
    )
def _persist_to_ai_insights(target_date: date, content: str, send_result: Dict[str, Any]) -> None:
    """Insert one ``daily_token_report`` row into ai_insights.

    The stored metadata holds only the target date and the sent / failed /
    chars counters from ``send_result`` — never username, first_name or
    chat_id. ``content`` is capped at 8000 characters. On any error the
    transaction is rolled back and the exception re-raised; the session is
    always closed.
    """
    from sqlalchemy import text
    from database.manager import get_session
    import json as _json

    meta: Dict[str, Any] = {'target_date': target_date.isoformat()}
    for counter in ('sent', 'failed', 'chars'):
        meta[counter] = int(send_result.get(counter, 0))

    db = get_session()
    try:
        insert_stmt = text("""
            INSERT INTO ai_insights (
            insight_type, period, content, metadata_json,
            avg_quality, status, decay_exempt, ai_model,
            created_by, created_at, updated_at
            ) VALUES (
            'daily_token_report', :period, :content, :meta,
            0.9, 'approved', TRUE, 'rule_engine',
            'token_report_service', NOW(), NOW()
            )
            """)
        row_values = {
            'period': target_date.isoformat(),
            # The column is TEXT; the cap is only a safety limit.
            'content': content[:8000],
            'meta': _json.dumps(meta, ensure_ascii=False),
        }
        db.execute(insert_stmt, row_values)
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()
# ═══════════════════════════════════════════════════════════════════════════════
# 內部:格式化工具
# ═══════════════════════════════════════════════════════════════════════════════
def _esc(s: Any) -> str:
"""HTML escape;對齊 telegram_templates._html_escape 行為。"""
text = "" if s is None else str(s)
return (text.replace("&", "&")
.replace("<", "<")
.replace(">", ">"))
def _pad(s: str, width: int) -> str:
"""中文寬字元 padding(中文字以 2 寬度計)。"""
visible = sum(2 if ord(c) > 127 else 1 for c in s)
return s + " " * max(0, width - visible)
def _fmt_kb(tokens: int) -> str:
"""token 數 → 1.2K / 3.4M 顯示。"""
n = int(tokens or 0)
if n >= 1_000_000:
return f"{n/1_000_000:.1f}M"
if n >= 1_000:
return f"{n/1_000:.0f}K"
return f"{n}"
def _budget_line(label: str, spent: float, budget: float) -> str:
"""產出單列預算進度條(10 格條)。"""
if budget <= 0:
return f"{label}: US$ {spent:6.2f} ({_pad('未設定預算', 10)})"
pct = min(100.0, spent / budget * 100.0)
filled = int(pct / 10)
bar = "▓" * filled + "░" * (10 - filled)
return f"{label}: US$ {spent:6.2f} {bar} {pct:3.0f}% / ${budget:.0f} 預算"
def _trend_line(label: str, today: float, baseline: float, unit: str = '') -> str:
    """Render one "today vs baseline" comparison row.

    With a positive baseline the row shows the signed percentage change and
    an arrow (up at +5% or more, down at -5% or less, flat otherwise). With
    no positive baseline the change is shown as 0.0 with a dash. Values are
    formatted with ``_fmt_kb`` when ``label`` contains 'Tokens', otherwise
    as a grouped integer followed by ``unit``.
    """
    current = float(today or 0)
    reference = float(baseline or 0)

    delta, sign, arrow = 0.0, "", "—"
    if reference > 0:
        delta = (current - reference) / reference * 100.0
        sign = "+" if delta >= 0 else ""
        if delta >= 5:
            arrow = "↗"
        elif delta <= -5:
            arrow = "↘"
        else:
            arrow = "→"

    if 'Tokens' in label:
        today_str = _fmt_kb(int(current))
        base_str = _fmt_kb(int(reference))
    else:
        today_str = f"{current:,.0f}{unit}"
        base_str = f"{reference:,.0f}{unit}"

    return f" {_pad(label, 14)} {today_str:>8} vs {base_str:>8} ({sign}{delta:5.1f}%) {arrow}"