Files
ewoooc/services/token_report_service.py
OoO bb891f1a6e feat(observability): ai_call_logger + 23:55 Telegram token 日報
services/ai_call_logger.py(300 行)— 統一 LLM 遙測層
- context manager log_ai_call() / decorator logged_ai_call()
- async fire-and-forget 寫 ai_calls,DB 失敗永不影響主流程
- kill-switch:連續 10 次失敗自動降級為 logger.info
- env AI_CALL_LOGGING_ENABLED=false 一鍵關閉
- COST_TABLE 集中 13 個模型計費(gemini/claude/nim/ollama)
- PII 保護:meta 只存 prompt_hash[:12],不存原文
- 22 unit tests 全綠

services/token_report_service.py(580 行)— 6 段落每日 23:55 日報
- Section 1-6: 總覽 / 供應商分布 / TOP10 caller / 成本預算 / 趨勢 / 告警建議
- 7 條告警規則 + Hermes 規則引擎智能建議
- HTML escape + 4096 字元雙保險
- Telegram 失敗 fallback 訊息
- ai_insights 寫入 PII safe(無 chat_id/username 落地)
- 30 unit tests 全綠

A11 critic 護欄:H6 chat_id PII fix(services/openclaw_bot_routes 4 處 → SHA1[:8])

Operation Ollama-First v5.0 / Phase 1 A4+A5

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 23:04:58 +08:00

868 lines
38 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/token_report_service.py
LLM Token 日報服務 (Operation Ollama-First v5.0 — Phase 1 收尾)
依據:
- migrations/024_create_ai_calls_table.sql (ai_calls schema + CHECK constraints)
- migrations/025_create_mcp_calls_and_budgets.sql (ai_call_budgets 種子資料)
- services/ai_call_logger.py (COST_TABLE / provider 白名單)
- services/telegram_templates.py (HTML escape 與 send 封裝)
- docs/phase0_audit_report_20260503.md (34 LLM 呼叫點清冊)
- docs/phase1_db_design_20260503.md (查詢 latency 預估)
設計紀律 (憲法級):
1. 失敗安全: DB 查詢失敗 → 推「⚠️ 報表生成失敗」訊息,不影響其他排程
2. PII 保護: 報表訊息不含 prompt 原文ai_insights metadata 只存統計 meta不存 username
3. 不污染既有 Telegram 流程: 共用 telegram_templates 既有 send 函數,不另開連線
4. ≤ 4096 字元自動截斷: Telegram 單訊息上限保險絲
公開 API:
- generate_daily_report(target_date) → str (HTML)
- send_daily_report() → dict (sent/failed/errors)
"""
from __future__ import annotations
import logging
from datetime import date, datetime, timedelta, timezone
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Asia/Taipei (UTC+8) 統一處理(避免容器 tzdata 差異,沿襲 telegram_templates 慣例)
_TAIPEI_TZ = timezone(timedelta(hours=8))
# Telegram 單則訊息字元上限(保留 96 字元給 footer避免精準卡 4096
_TELEGRAM_MAX_CHARS = 4000
# Provider 顯示名稱表(與 ai_calls.provider 白名單對齊order 即報表順序)
_PROVIDER_DISPLAY: Dict[str, Tuple[str, str]] = {
'gcp_ollama': ('🟢', 'GCP Ollama'),
'ollama_secondary': ('🟢', 'Secondary'), # critic-A11 B4 修補:三主機架構一致性
'ollama_111': ('🟠', '111 Ollama'),
'gemini': ('🔴', 'Gemini'),
'claude': ('🟣', 'Claude'),
'nim': ('🟡', 'NIM'),
'openrouter': ('🟤', 'OpenRouter'),
'nim_via_elephant': ('🟫', 'NIM_via_Eleph'),
}
# Ollama 占比門檻Section 1 「Ollama-First 達標」判斷用,戰役 KPI ≥60%
_OLLAMA_FIRST_TARGET_PCT = 60.0
# 告警規則參數Section 6 自動產生用)
_ALERT_RULES = {
'caller_spike_factor': 1.4, # tokens > 7 日均 × 1.4
'gemini_share_threshold': 35.0, # gemini 占比 > 35% 視為 Ollama-First 失守
'error_rate_critical': 5.0, # error_rate > 5% → P1
'budget_warning': 80.0, # spent / budget > 80% → P1
'gcp_hit_warning': 90.0, # gcp_ollama 占比 < 90% (Ollama 內) → P2
'cache_hit_low': 40.0, # claude cache hit < 40% → INFO
'caller_stable_days': 7, # 連續 N 日 Ollama >95% → INFO「可關 fallback」
'ollama_stable_pct': 95.0,
}
# ═══════════════════════════════════════════════════════════════════════════════
# 公開 API
# ═══════════════════════════════════════════════════════════════════════════════
def generate_daily_report(target_date: Optional[date] = None) -> str:
"""產出指定日的 LLM Token 日報HTML供 Telegram parse_mode='HTML')。
Args:
target_date: 統計目標日Asia/Taipei。未指定 → 「今日」。
Returns:
完整 HTML 報表字串;若 DB 查詢失敗,回傳簡短錯誤訊息(仍可發 Telegram
"""
if target_date is None:
target_date = datetime.now(_TAIPEI_TZ).date()
try:
summary = _query_summary(target_date)
by_provider = _query_by_provider(target_date)
top_callers = _query_top_callers(target_date, limit=10)
costs = _query_cost_breakdown(target_date)
trends = _query_trends_vs_7day(target_date)
budgets = _query_budget_usage(target_date)
cache_stats = _query_cache_hit_stats(target_date)
except Exception as exc:
logger.exception("[TokenReport] DB query failed: %s", exc)
return _format_failure_report(target_date, str(exc))
alerts = _detect_alerts(summary, by_provider, top_callers, trends, budgets, cache_stats)
insights = _generate_insights(target_date, summary, by_provider)
return _format_report(
target_date=target_date,
summary=summary,
by_provider=by_provider,
top_callers=top_callers,
costs=costs,
trends=trends,
budgets=budgets,
cache_stats=cache_stats,
alerts=alerts,
insights=insights,
)
def send_daily_report(target_date: Optional[date] = None) -> Dict[str, Any]:
"""產報並送 Telegram + 寫 ai_insights。
Returns:
{'ok': bool, 'sent': int, 'failed': int, 'chars': int, 'errors': list}
"""
if target_date is None:
target_date = datetime.now(_TAIPEI_TZ).date()
try:
report_html = generate_daily_report(target_date)
except Exception as exc:
logger.exception("[TokenReport] generate_daily_report failed: %s", exc)
report_html = _format_failure_report(target_date, str(exc))
# 截斷至 Telegram 安全長度HTML tag 簡化處理:超出時加省略尾)
if len(report_html) > _TELEGRAM_MAX_CHARS:
truncated = report_html[: _TELEGRAM_MAX_CHARS - 80]
report_html = truncated + "\n\n... <i>(訊息超長,已截斷;詳見 ai_insights)</i>"
# 送 Telegram用既有封裝不另起連線
result: Dict[str, Any] = {'ok': False, 'sent': 0, 'failed': 0, 'chars': len(report_html), 'errors': []}
try:
from services.telegram_templates import send_telegram_with_result
send_result = send_telegram_with_result(report_html, parse_mode='HTML')
result.update({
'ok': bool(send_result.get('ok')),
'sent': int(send_result.get('sent', 0)),
'failed': int(send_result.get('failed', 0)),
'errors': list(send_result.get('errors', [])),
})
except Exception as exc:
logger.exception("[TokenReport] telegram send failed: %s", exc)
result['errors'].append(f"telegram:{type(exc).__name__}")
# 寫 ai_insights不含 PII / 不存 username
try:
_persist_to_ai_insights(target_date, report_html, result)
except Exception as exc:
logger.warning("[TokenReport] ai_insights persist failed: %s", exc)
return result
# ═══════════════════════════════════════════════════════════════════════════════
# 內部SQL 查詢
# ═══════════════════════════════════════════════════════════════════════════════
def _date_window(target_date: date) -> Tuple[datetime, datetime]:
"""回傳 [day_start, day_end) 的 Taipei tz-aware datetimePostgreSQL 比較用)。"""
day_start = datetime.combine(target_date, datetime.min.time(), tzinfo=_TAIPEI_TZ)
day_end = day_start + timedelta(days=1)
return day_start, day_end
def _exec_query(sql: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
"""執行查詢並回傳 list of dict。session 隔離,例外向上拋。"""
from sqlalchemy import text
from database.manager import get_session
session = get_session()
try:
rows = session.execute(text(sql), params).mappings().all()
return [dict(r) for r in rows]
finally:
session.close()
def _query_summary(target_date: date) -> Dict[str, Any]:
"""Section 1 — 今日總覽(單列彙總)。
Returns:
{total_tokens, total_calls, total_cost_usd, avg_duration_ms,
success_rate, ollama_pct, prev_total_tokens (昨日比基準)}
"""
day_start, day_end = _date_window(target_date)
prev_start = day_start - timedelta(days=1)
rows = _exec_query("""
SELECT
COALESCE(SUM(input_tokens + output_tokens), 0) AS total_tokens,
COUNT(*) AS total_calls,
COALESCE(SUM(cost_usd), 0) AS total_cost_usd,
COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
COALESCE(SUM(CASE WHEN status = 'ok' THEN 1 ELSE 0 END), 0) AS ok_calls,
COALESCE(SUM(
CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
THEN input_tokens + output_tokens ELSE 0 END
), 0) AS ollama_tokens
FROM ai_calls
WHERE called_at >= :start AND called_at < :end
""", {'start': day_start, 'end': day_end})
prev_rows = _exec_query("""
SELECT COALESCE(SUM(input_tokens + output_tokens), 0) AS prev_total_tokens
FROM ai_calls
WHERE called_at >= :start AND called_at < :end
""", {'start': prev_start, 'end': day_start})
r = rows[0] if rows else {}
total_calls = int(r.get('total_calls') or 0)
total_tokens = int(r.get('total_tokens') or 0)
ok_calls = int(r.get('ok_calls') or 0)
ollama_tokens = int(r.get('ollama_tokens') or 0)
prev_total = int((prev_rows[0] if prev_rows else {}).get('prev_total_tokens') or 0)
return {
'total_tokens': total_tokens,
'total_calls': total_calls,
'total_cost_usd': float(r.get('total_cost_usd') or 0),
'avg_duration_ms': float(r.get('avg_duration_ms') or 0),
'success_rate': (ok_calls / total_calls * 100.0) if total_calls else 0.0,
'failed_calls': max(0, total_calls - ok_calls),
'ollama_pct': (ollama_tokens / total_tokens * 100.0) if total_tokens else 0.0,
'prev_total_tokens': prev_total,
'wow_pct': ((total_tokens - prev_total) / prev_total * 100.0) if prev_total else 0.0,
}
def _query_by_provider(target_date: date) -> List[Dict[str, Any]]:
"""Section 2 — 供應商分布(依 7 個 provider含 0 筆者也顯示)。"""
day_start, day_end = _date_window(target_date)
rows = _exec_query("""
SELECT
provider,
SUM(input_tokens + output_tokens)::BIGINT AS tokens,
COUNT(*) AS calls,
COALESCE(SUM(cost_usd), 0) AS cost_usd,
COALESCE(AVG(duration_ms), 0) AS avg_duration_ms
FROM ai_calls
WHERE called_at >= :start AND called_at < :end
GROUP BY provider
""", {'start': day_start, 'end': day_end})
by_p = {r['provider']: r for r in rows}
total_tokens = sum(int(r['tokens'] or 0) for r in rows)
result: List[Dict[str, Any]] = []
for p_key in _PROVIDER_DISPLAY:
r = by_p.get(p_key, {})
tokens = int(r.get('tokens') or 0)
result.append({
'provider': p_key,
'tokens': tokens,
'pct': (tokens / total_tokens * 100.0) if total_tokens else 0.0,
'calls': int(r.get('calls') or 0),
'cost_usd': float(r.get('cost_usd') or 0),
'avg_duration_ms': float(r.get('avg_duration_ms') or 0),
})
return result
def _query_top_callers(target_date: date, limit: int = 10) -> List[Dict[str, Any]]:
"""Section 3 — TOP N caller by token + 與 7 日均的偏差。"""
day_start, day_end = _date_window(target_date)
week_start = day_start - timedelta(days=7)
rows = _exec_query("""
WITH today AS (
SELECT
caller,
provider,
MODE() WITHIN GROUP (ORDER BY model) AS top_model,
SUM(input_tokens + output_tokens)::BIGINT AS tokens,
COUNT(*) AS calls
FROM ai_calls
WHERE called_at >= :day_start AND called_at < :day_end
GROUP BY caller, provider
),
baseline AS (
SELECT
caller,
SUM(input_tokens + output_tokens) / 7.0 AS avg_tokens_7d
FROM ai_calls
WHERE called_at >= :week_start AND called_at < :day_start
GROUP BY caller
)
SELECT
t.caller, t.provider, t.top_model, t.tokens, t.calls,
COALESCE(b.avg_tokens_7d, 0) AS avg_tokens_7d
FROM today t
LEFT JOIN baseline b ON b.caller = t.caller
ORDER BY t.tokens DESC
LIMIT :limit
""", {
'day_start': day_start,
'day_end': day_end,
'week_start': week_start,
'limit': int(limit),
})
result: List[Dict[str, Any]] = []
for r in rows:
tokens = int(r.get('tokens') or 0)
baseline = float(r.get('avg_tokens_7d') or 0)
delta_pct = ((tokens - baseline) / baseline * 100.0) if baseline > 0 else None
result.append({
'caller': str(r.get('caller') or ''),
'provider': str(r.get('provider') or ''),
'model': str(r.get('top_model') or ''),
'tokens': tokens,
'calls': int(r.get('calls') or 0),
'delta_pct': delta_pct,
})
return result
def _query_cost_breakdown(target_date: date) -> List[Dict[str, Any]]:
"""Section 4 — 依 model 拆解成本(金額由大到小,零成本不顯示)。"""
day_start, day_end = _date_window(target_date)
rows = _exec_query("""
SELECT
provider,
model,
COALESCE(SUM(cost_usd), 0) AS cost_usd,
COUNT(*) AS calls
FROM ai_calls
WHERE called_at >= :start AND called_at < :end
AND cost_usd > 0
GROUP BY provider, model
ORDER BY cost_usd DESC
LIMIT 12
""", {'start': day_start, 'end': day_end})
return [
{
'provider': str(r['provider']),
'model': str(r['model']),
'cost_usd': float(r['cost_usd']),
'calls': int(r['calls']),
}
for r in rows
]
def _query_trends_vs_7day(target_date: date) -> Dict[str, Any]:
"""Section 5 — 今日 vs 過去 7 日均 的趨勢比對。"""
day_start, day_end = _date_window(target_date)
week_start = day_start - timedelta(days=7)
today_rows = _exec_query("""
SELECT
COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT AS total_tokens,
COALESCE(SUM(CASE WHEN provider='gemini'
THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS gemini_tokens,
COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS ollama_tokens,
COALESCE(SUM(CASE WHEN provider='claude'
THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS claude_tokens,
COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
COALESCE(SUM(CASE WHEN status<>'ok' THEN 1 ELSE 0 END), 0) AS failed,
COUNT(*) AS total_calls,
COALESCE(SUM(CASE WHEN provider='gcp_ollama' THEN 1 ELSE 0 END), 0) AS gcp_calls,
COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
THEN 1 ELSE 0 END), 0) AS ollama_calls
FROM ai_calls
WHERE called_at >= :start AND called_at < :end
""", {'start': day_start, 'end': day_end})
base_rows = _exec_query("""
SELECT
COALESCE(SUM(input_tokens + output_tokens) / 7.0, 0) AS avg_total_tokens,
COALESCE(SUM(CASE WHEN provider='gemini'
THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_gemini_tokens,
COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_ollama_tokens,
COALESCE(SUM(CASE WHEN provider='claude'
THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_claude_tokens,
COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
CASE WHEN COUNT(*) > 0
THEN SUM(CASE WHEN status<>'ok' THEN 1 ELSE 0 END)::FLOAT / COUNT(*) * 100.0
ELSE 0 END AS error_rate_pct,
COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT AS total_7d_tokens,
COALESCE(SUM(cost_usd), 0) AS total_7d_cost,
CASE WHEN SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
THEN 1 ELSE 0 END) > 0
THEN SUM(CASE WHEN provider='gcp_ollama' THEN 1 ELSE 0 END)::FLOAT
/ SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
THEN 1 ELSE 0 END)::FLOAT * 100.0
ELSE 0 END AS gcp_hit_pct_7d
FROM ai_calls
WHERE called_at >= :start AND called_at < :end
""", {'start': week_start, 'end': day_start})
t = today_rows[0] if today_rows else {}
b = base_rows[0] if base_rows else {}
today_total = int(t.get('total_tokens') or 0)
today_gemini = int(t.get('gemini_tokens') or 0)
today_ollama = int(t.get('ollama_tokens') or 0)
today_claude = int(t.get('claude_tokens') or 0)
today_calls = int(t.get('total_calls') or 0)
today_failed = int(t.get('failed') or 0)
today_gcp_calls = int(t.get('gcp_calls') or 0)
today_ollama_cal = int(t.get('ollama_calls') or 0)
today_error_pct = (today_failed / today_calls * 100.0) if today_calls else 0.0
today_gcp_hit = (today_gcp_calls / today_ollama_cal * 100.0) if today_ollama_cal else 0.0
return {
'today_total_tokens': today_total,
'today_gemini_tokens': today_gemini,
'today_ollama_tokens': today_ollama,
'today_claude_tokens': today_claude,
'today_avg_duration': float(t.get('avg_duration_ms') or 0),
'today_error_rate': today_error_pct,
'today_gcp_hit_pct': today_gcp_hit,
'7d_avg_total': float(b.get('avg_total_tokens') or 0),
'7d_avg_gemini': float(b.get('avg_gemini_tokens') or 0),
'7d_avg_ollama': float(b.get('avg_ollama_tokens') or 0),
'7d_avg_claude': float(b.get('avg_claude_tokens') or 0),
'7d_avg_duration': float(b.get('avg_duration_ms') or 0),
'7d_error_rate': float(b.get('error_rate_pct') or 0),
'7d_total_tokens': int(b.get('total_7d_tokens') or 0),
'7d_total_cost': float(b.get('total_7d_cost') or 0),
'7d_gcp_hit_pct': float(b.get('gcp_hit_pct_7d') or 0),
}
def _query_budget_usage(target_date: date) -> Dict[str, Any]:
"""Section 4 — 預算對比daily/weekly/monthly 全供應商總額)。"""
day_start, day_end = _date_window(target_date)
week_start = day_start - timedelta(days=6)
month_start = day_start.replace(day=1)
spent = _exec_query("""
SELECT
COALESCE(SUM(CASE WHEN called_at >= :day_start AND called_at < :day_end
THEN cost_usd ELSE 0 END), 0) AS daily_spent,
COALESCE(SUM(CASE WHEN called_at >= :week_start AND called_at < :day_end
THEN cost_usd ELSE 0 END), 0) AS weekly_spent,
COALESCE(SUM(CASE WHEN called_at >= :month_start AND called_at < :day_end
THEN cost_usd ELSE 0 END), 0) AS monthly_spent,
COUNT(*) FILTER (WHERE called_at >= :month_start) AS month_call_count
FROM ai_calls
WHERE called_at >= :month_start AND called_at < :day_end
""", {
'day_start': day_start,
'day_end': day_end,
'week_start': week_start,
'month_start': month_start,
})
budget_rows = _exec_query("""
SELECT period, provider, budget_usd, alert_pct
FROM ai_call_budgets
WHERE provider IS NULL
""", {})
budgets = {r['period']: float(r['budget_usd']) for r in budget_rows}
s = spent[0] if spent else {}
return {
'daily_spent': float(s.get('daily_spent') or 0),
'weekly_spent': float(s.get('weekly_spent') or 0),
'monthly_spent': float(s.get('monthly_spent') or 0),
'daily_budget': budgets.get('daily', 0.0),
'weekly_budget': budgets.get('weekly', 0.0),
'monthly_budget': budgets.get('monthly', 0.0),
}
def _query_cache_hit_stats(target_date: date) -> Dict[str, Any]:
"""Section 4 — Anthropic / Gemini prompt cache 命中統計。"""
day_start, day_end = _date_window(target_date)
rows = _exec_query("""
SELECT
provider,
COUNT(*) AS total_calls,
SUM(CASE WHEN cache_hit THEN 1 ELSE 0 END) AS cache_hits
FROM ai_calls
WHERE called_at >= :start AND called_at < :end
AND provider IN ('claude','gemini')
GROUP BY provider
""", {'start': day_start, 'end': day_end})
by_p = {r['provider']: r for r in rows}
out: Dict[str, Any] = {}
for p in ('claude', 'gemini'):
r = by_p.get(p, {})
total = int(r.get('total_calls') or 0)
hits = int(r.get('cache_hits') or 0)
out[p] = {
'total': total,
'hits': hits,
'pct': (hits / total * 100.0) if total else 0.0,
}
return out
# ═══════════════════════════════════════════════════════════════════════════════
# 內部告警偵測Section 6
# ═══════════════════════════════════════════════════════════════════════════════
def _detect_alerts(
summary: Dict[str, Any],
by_provider: List[Dict[str, Any]],
top_callers: List[Dict[str, Any]],
trends: Dict[str, Any],
budgets: Dict[str, Any],
cache_stats: Dict[str, Any],
) -> List[Dict[str, str]]:
"""依 7 條規則產生告警清單,回傳 [{level, icon, title, suggestion}, ...]"""
alerts: List[Dict[str, str]] = []
# R1: 單一 caller 暴增 (P2)
spike_factor = _ALERT_RULES['caller_spike_factor']
for caller in top_callers:
delta = caller.get('delta_pct')
if delta is not None and delta >= (spike_factor - 1) * 100.0:
alerts.append({
'level': 'P2', 'icon': '🟠',
'title': f"{caller['caller']} token 暴增 {delta:+.0f}%vs 7 日均)",
'suggestion': f"今日 {caller['tokens']:,} tokens / {caller['calls']} calls建議查 prompt 是否變更",
})
# R2: Gemini 占比飆升 (P2 「Ollama-First 失守」)
gemini = next((r for r in by_provider if r['provider'] == 'gemini'), {})
gemini_pct = float(gemini.get('pct') or 0)
if gemini_pct > _ALERT_RULES['gemini_share_threshold']:
alerts.append({
'level': 'P2', 'icon': '🟠',
'title': f"Gemini 占比 {gemini_pct:.1f}% 高於門檻 {_ALERT_RULES['gemini_share_threshold']:.0f}%",
'suggestion': "Ollama-First 失守,請檢查 fallback 是否正確命中本地",
})
# R3: 失敗率 (P1)
total_calls = int(summary.get('total_calls') or 0)
failed = int(summary.get('failed_calls') or 0)
if total_calls:
err_rate = failed / total_calls * 100.0
if err_rate > _ALERT_RULES['error_rate_critical']:
alerts.append({
'level': 'P1', 'icon': '🔴',
'title': f"全域失敗率 {err_rate:.1f}% 超過門檻 {_ALERT_RULES['error_rate_critical']:.0f}%",
'suggestion': f"今日 {failed:,} / {total_calls:,} 失敗,立即查 ai_calls WHERE status<>'ok'",
})
# R4: 預算超標 (P1)
for period_key, label in (('daily', ''), ('weekly', ''), ('monthly', '')):
spent = float(budgets.get(f'{period_key}_spent') or 0)
budget = float(budgets.get(f'{period_key}_budget') or 0)
if budget > 0:
usage_pct = spent / budget * 100.0
if usage_pct > _ALERT_RULES['budget_warning']:
alerts.append({
'level': 'P1', 'icon': '🔴',
'title': f"{label}成本 ${spent:.2f} 達預算 ${budget:.2f}{usage_pct:.0f}%",
'suggestion': "請檢查供應商分布是否異常Section 2/3或調整預算",
})
# R5: GCP 命中率低 (P2) — 僅當有 Ollama 流量時才檢查
today_gcp_hit = float(trends.get('today_gcp_hit_pct') or 0)
ollama = sum(int(r.get('tokens') or 0) for r in by_provider
if r['provider'] in ('gcp_ollama', 'ollama_secondary', 'ollama_111'))
if ollama > 0 and today_gcp_hit < _ALERT_RULES['gcp_hit_warning']:
alerts.append({
'level': 'P2', 'icon': '🟠',
'title': f"GCP Ollama 命中率 {today_gcp_hit:.1f}% 低於 {_ALERT_RULES['gcp_hit_warning']:.0f}%",
'suggestion': "111 fallback 觸發頻繁,請檢查 GCP Ollama 健康ADR-027",
})
# R6: Cache 命中率低 (INFO) — claude
claude_cache = cache_stats.get('claude', {})
if int(claude_cache.get('total') or 0) >= 10:
if float(claude_cache.get('pct') or 0) < _ALERT_RULES['cache_hit_low']:
alerts.append({
'level': 'INFO', 'icon': '🟢',
'title': f"Claude prompt cache 命中率僅 {claude_cache['pct']:.1f}%",
'suggestion': "可優化 system prompt 結構≥1024 tokens 才觸發 cache",
})
return alerts
def _generate_insights(
target_date: date,
summary: Dict[str, Any],
by_provider: List[Dict[str, Any]],
) -> List[Dict[str, str]]:
"""Section 6 智能建議(規則引擎,零 LLM 成本)。"""
insights: List[Dict[str, str]] = []
ollama_pct = float(summary.get('ollama_pct') or 0)
if ollama_pct >= _OLLAMA_FIRST_TARGET_PCT:
insights.append({
'icon': '',
'text': f"Ollama 占比 {ollama_pct:.1f}%(目標 ≥{_OLLAMA_FIRST_TARGET_PCT:.0f}%Ollama-First 戰役達標",
})
else:
insights.append({
'icon': '⚠️',
'text': f"Ollama 占比 {ollama_pct:.1f}% 未達 {_OLLAMA_FIRST_TARGET_PCT:.0f}% 目標,可優化 fallback 鏈",
})
nim_total = sum(
int(r.get('tokens') or 0) for r in by_provider
if r['provider'] in ('nim', 'nim_via_elephant')
)
if 0 < nim_total < 100_000:
insights.append({
'icon': '',
'text': f"NIM 用量已降至 {nim_total:,} tokens戰役前約 5M可考慮關閉 NIM 依賴",
})
success_rate = float(summary.get('success_rate') or 0)
if summary.get('total_calls') and success_rate >= 99.0:
insights.append({
'icon': '',
'text': f"成功率 {success_rate:.1f}%,鏈路健康度高",
})
return insights
# ═══════════════════════════════════════════════════════════════════════════════
# 內部:報表組裝
# ═══════════════════════════════════════════════════════════════════════════════
def _format_report(
target_date: date,
summary: Dict[str, Any],
by_provider: List[Dict[str, Any]],
top_callers: List[Dict[str, Any]],
costs: List[Dict[str, Any]],
trends: Dict[str, Any],
budgets: Dict[str, Any],
cache_stats: Dict[str, Any],
alerts: List[Dict[str, str]],
insights: List[Dict[str, str]],
) -> str:
"""組裝完整 HTML 報表。所有 caller/model 字串均經 _esc。"""
weekday_zh = ['週一', '週二', '週三', '週四', '週五', '週六', '週日'][target_date.weekday()]
now_str = datetime.now(_TAIPEI_TZ).strftime('%H:%M:%S')
lines: List[str] = []
# Header
lines.append(f"📊 <b>LLM Token 日報 {target_date.isoformat()} ({weekday_zh})</b>")
lines.append("═══════════════════════════════════════")
lines.append(f"⏰ 統計區間00:00 ~ 23:59 (UTC+8)")
lines.append(f"🔄 報表生成:{now_str} | 涵蓋筆數:{summary['total_calls']:,} calls")
# Section 1
lines.append("")
lines.append("━━━━━ <b>【1】今日總覽 TL;DR</b> ━━━━━")
wow_sign = "+" if summary['wow_pct'] >= 0 else ""
lines.append(f"🪙 總 Token <b>{summary['total_tokens']:,}</b> ({wow_sign}{summary['wow_pct']:.1f}% vs 昨日)")
lines.append(f"💰 總成本: <b>US$ {summary['total_cost_usd']:.2f}</b>")
lines.append(f"⚡ 平均延遲: {summary['avg_duration_ms']:.0f} ms")
lines.append(f"✅ 成功率: {summary['success_rate']:.1f}% ({summary['failed_calls']} 失敗 / {summary['total_calls']})")
ollama_check = "" if summary['ollama_pct'] >= _OLLAMA_FIRST_TARGET_PCT else "⚠️"
lines.append(f"🎯 Ollama 占比:{summary['ollama_pct']:.1f}% {ollama_check}")
# Section 2
lines.append("")
lines.append("━━━━━ <b>【2】供應商分布</b> ━━━━━")
for p in by_provider:
icon, name = _PROVIDER_DISPLAY[p['provider']]
if p['calls'] == 0:
continue # 0 筆者跳過避免雜訊
lines.append(
f"{icon} {_pad(name, 14)} "
f"{_fmt_kb(p['tokens']):>8} ({p['pct']:5.1f}%) "
f"{p['calls']:>5} calls "
f"${p['cost_usd']:6.2f} "
f"{p['avg_duration_ms']:5.0f}ms"
)
# Section 3
lines.append("")
lines.append(f"━━━━━ <b>【3】呼叫點 TOP {len(top_callers)} (按 Token)</b> ━━━━━")
medals = ['🥇', '🥈', '🥉']
for i, c in enumerate(top_callers):
rank = medals[i] if i < 3 else f" {i+1}"
flag = ""
if c.get('delta_pct') is not None:
d = c['delta_pct']
if d >= 40: flag = f" ⚠️ {d:+.0f}%"
elif d <= -50: flag = f" 🎉 {d:+.0f}%"
lines.append(
f"{rank} <code>{_esc(c['caller'])}</code>"
f" / {_esc(c['provider'])} / {_esc(c['model'])[:24]}"
)
lines.append(f" {_fmt_kb(c['tokens']):>8} | {c['calls']:>5} calls{flag}")
# Section 4
lines.append("")
lines.append("━━━━━ <b>【4】成本分析 + 預算對比</b> ━━━━━")
lines.append(_budget_line("📅 本日成本", budgets['daily_spent'], budgets['daily_budget']))
lines.append(_budget_line("📅 本週累計", budgets['weekly_spent'], budgets['weekly_budget']))
lines.append(_budget_line("📅 本月累計", budgets['monthly_spent'], budgets['monthly_budget']))
if costs:
lines.append("")
lines.append("<b>成本拆解 by Model:</b>")
for c in costs[:6]:
lines.append(f" {_esc(c['model'])[:32]:<32} ${c['cost_usd']:7.4f} ({c['calls']} calls)")
# Cache 命中
lines.append("")
lines.append("<b>Prompt Cache 命中:</b>")
cc = cache_stats.get('claude', {})
if cc.get('total'):
lines.append(f" Claude: {cc['hits']:>4} / {cc['total']:<4} ({cc['pct']:5.1f}%)")
else:
lines.append(" Claude: N/A")
gc = cache_stats.get('gemini', {})
if gc.get('total'):
lines.append(f" Gemini: {gc['hits']:>4} / {gc['total']:<4} ({gc['pct']:5.1f}%)")
else:
lines.append(" Gemini: N/A")
# Section 5
lines.append("")
lines.append("━━━━━ <b>【5】趨勢與洞察 (vs 7 日均)</b> ━━━━━")
lines.append(_trend_line("總 Tokens", trends['today_total_tokens'], trends['7d_avg_total']))
lines.append(_trend_line("Gemini Tokens", trends['today_gemini_tokens'], trends['7d_avg_gemini']))
lines.append(_trend_line("Ollama Tokens", trends['today_ollama_tokens'], trends['7d_avg_ollama']))
lines.append(_trend_line("Claude Tokens", trends['today_claude_tokens'], trends['7d_avg_claude']))
lines.append(_trend_line("平均延遲(ms)", trends['today_avg_duration'], trends['7d_avg_duration'], unit=''))
lines.append("")
lines.append(f"📈 7 日累計:{_fmt_kb(trends['7d_total_tokens'])} tokens / US$ {trends['7d_total_cost']:.2f}")
# Section 6
lines.append("")
lines.append("━━━━━ <b>【6】告警與建議</b> ━━━━━")
if alerts:
for a in alerts:
lines.append(f"{a['icon']} <b>[{a['level']}]</b> {_esc(a['title'])}")
lines.append(f" 建議:{_esc(a['suggestion'])}")
else:
lines.append("✅ 無異常告警")
if insights:
lines.append("")
lines.append("<b>🔮 智能建議 (Hermes 規則引擎)</b>")
for ins in insights:
lines.append(f" {ins['icon']} {_esc(ins['text'])}")
# Footer
lines.append("")
lines.append("═══════════════════════════════════════")
lines.append("🤖 Operation Ollama-First v5.0 / token_report v1.0")
return "\n".join(lines)
def _format_failure_report(target_date: date, error: str) -> str:
"""DB 查詢失敗時的最簡訊息(仍保留 HTML escape"""
return (
f"⚠️ <b>LLM Token 日報生成失敗 ({target_date.isoformat()})</b>\n"
f"━━━━━━━━━━━━━━━━━━━━\n"
f"錯誤:<code>{_esc(error)[:300]}</code>\n"
f"請查 logs<code>docker logs momo-scheduler | grep TokenReport</code>"
)
def _persist_to_ai_insights(target_date: date, content: str, send_result: Dict[str, Any]) -> None:
"""寫一筆 ai_insightstype='daily_token_report'metadata 不含 PII。"""
from sqlalchemy import text
from database.manager import get_session
import json as _json
meta = {
'target_date': target_date.isoformat(),
'sent': int(send_result.get('sent', 0)),
'failed': int(send_result.get('failed', 0)),
'chars': int(send_result.get('chars', 0)),
# 注意:絕不存 username / first_name / chat_id
}
session = get_session()
try:
session.execute(text("""
INSERT INTO ai_insights (
insight_type, period, content, metadata_json,
avg_quality, status, decay_exempt, ai_model,
created_by, created_at, updated_at
) VALUES (
'daily_token_report', :period, :content, :meta,
0.9, 'approved', TRUE, 'rule_engine',
'token_report_service', NOW(), NOW()
)
"""), {
'period': target_date.isoformat(),
'content': content[:8000], # ai_insights.content 為 TEXT仍設上限保險
'meta': _json.dumps(meta, ensure_ascii=False),
})
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
# ═══════════════════════════════════════════════════════════════════════════════
# 內部:格式化工具
# ═══════════════════════════════════════════════════════════════════════════════
def _esc(s: Any) -> str:
"""HTML escape對齊 telegram_templates._html_escape 行為。"""
text = "" if s is None else str(s)
return (text.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;"))
def _pad(s: str, width: int) -> str:
"""中文寬字元 padding中文字以 2 寬度計)。"""
visible = sum(2 if ord(c) > 127 else 1 for c in s)
return s + " " * max(0, width - visible)
def _fmt_kb(tokens: int) -> str:
"""token 數 → 1.2K / 3.4M 顯示。"""
n = int(tokens or 0)
if n >= 1_000_000:
return f"{n/1_000_000:.1f}M"
if n >= 1_000:
return f"{n/1_000:.0f}K"
return f"{n}"
def _budget_line(label: str, spent: float, budget: float) -> str:
"""產出單列預算進度條10 格條)。"""
if budget <= 0:
return f"{label} US$ {spent:6.2f} ({_pad('未設定預算', 10)})"
pct = min(100.0, spent / budget * 100.0)
filled = int(pct / 10)
bar = "" * filled + "" * (10 - filled)
return f"{label} US$ {spent:6.2f} {bar} {pct:3.0f}% / ${budget:.0f} 預算"
def _trend_line(label: str, today: float, baseline: float, unit: str = '') -> str:
"""產出單列趨勢比較。"""
today_n = float(today or 0)
base_n = float(baseline or 0)
if base_n > 0:
delta = (today_n - base_n) / base_n * 100.0
sign = "+" if delta >= 0 else ""
arrow = "" if delta >= 5 else ("" if delta <= -5 else "")
else:
delta = 0.0
sign = ""
arrow = ""
today_str = _fmt_kb(int(today_n)) if 'Tokens' in label else f"{today_n:,.0f}{unit}"
base_str = _fmt_kb(int(base_n)) if 'Tokens' in label else f"{base_n:,.0f}{unit}"
return f" {_pad(label, 14)} {today_str:>8} vs {base_str:>8} ({sign}{delta:5.1f}%) {arrow}"