diff --git a/services/ai_call_logger.py b/services/ai_call_logger.py
new file mode 100644
index 0000000..ad5676e
--- /dev/null
+++ b/services/ai_call_logger.py
@@ -0,0 +1,434 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+services/ai_call_logger.py
+統一 LLM 呼叫遙測層 (Operation Ollama-First v5.0 — Phase 1)
+
+依據:
+ - docs/phase0_audit_report_20260503.md (34 個 LLM 呼叫點 / 11.8% 覆蓋率)
+ - docs/phase1_db_design_20260503.md (ai_calls schema)
+ - migrations/024_create_ai_calls_table.sql
+
+設計原則 (憲法級):
+ 1. 非阻塞: DB 寫入跑 daemon thread,主流程不等
+ 2. 失敗安全: DB 例外只 log warning,絕不影響 LLM 主流程
+ 3. PII 保護: meta 不存原始 prompt,只存 prompt_hash[:12]
+ 4. Kill-switch: AI_CALL_LOGGING_ENABLED=false 一鍵關閉
+ 5. 連續失敗 ≥ 10 次自動降級為純 logger.info
+
+主入口:
+ - log_ai_call(...) context manager (推薦)
+ - logged_ai_call(...) decorator (簡單一行 LLM call)
+"""
+
+from __future__ import annotations
+
+import hashlib
+import inspect
+import logging
+import os
+import threading
+import time
+from contextlib import contextmanager
+from functools import wraps
+from typing import Any, Callable, Dict, Optional
+
+logger = logging.getLogger(__name__)
+
+
# ─────────────────────────────────────────────────────────────────────────────
# Cost table (USD per 1M tokens)
# Based on the phase0 audit plus each provider's official pricing; every
# Ollama model is priced at 0.
# ─────────────────────────────────────────────────────────────────────────────
# Maps an exact model name to its per-direction rate: 'in' is applied to input
# tokens and 'out' to output tokens (see _calc_cost).
COST_TABLE: Dict[str, Dict[str, float]] = {
    # Gemini
    'gemini-2.5-flash': {'in': 0.075, 'out': 0.30},
    'gemini-2.5-pro': {'in': 1.25, 'out': 10.0},
    'gemini-2.0-flash': {'in': 0.075, 'out': 0.30},
    'gemini-1.5-flash': {'in': 0.075, 'out': 0.30},
    # NVIDIA NIM (quota-based; the free tier is all 0)
    'meta/llama-3.1-8b-instruct': {'in': 0.0, 'out': 0.0},
    'meta/llama-3.3-70b-instruct': {'in': 0.0, 'out': 0.0},
    'nvidia/llama-3.3-nemotron-super-49b-v1.5': {'in': 0.0, 'out': 0.0},
    'deepseek-ai/deepseek-v3.2': {'in': 0.0, 'out': 0.0},
    # Claude
    'claude-opus-4-7': {'in': 15.0, 'out': 75.0},
    'claude-sonnet-4-6': {'in': 3.0, 'out': 15.0},
    # Self-hosted Ollama (all 0)
    'hermes3:latest': {'in': 0.0, 'out': 0.0},
    'qwen2.5-coder:7b': {'in': 0.0, 'out': 0.0},
    'llama3.1:8b': {'in': 0.0, 'out': 0.0},
    'bge-m3:latest': {'in': 0.0, 'out': 0.0},
}
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# 環境開關 + Kill-switch
+# ─────────────────────────────────────────────────────────────────────────────
+def _is_logging_enabled() -> bool:
+ """環境變數即時讀取 (允許 runtime toggle)"""
+ val = os.environ.get('AI_CALL_LOGGING_ENABLED', 'true').strip().lower()
+ return val not in ('false', '0', 'no', 'off')
+
+
# Consecutive-failure threshold; once reached, logging degrades to plain
# logger.info and the DB is no longer attempted.
_MAX_CONSECUTIVE_FAILURES = 10
# Guards every read and write of _failure_state.
_failure_counter_lock = threading.Lock()
# 'count' is the current run of failed writes; 'killed' is the kill-switch flag.
_failure_state = {'count': 0, 'killed': False}
+
+
def _record_failure() -> None:
    """Count one failed write and trip the kill-switch at the threshold.

    The counter is incremented under the lock. The first time it reaches
    _MAX_CONSECUTIVE_FAILURES the 'killed' flag is set and an error is logged;
    later failures keep counting but do not log again.
    """
    with _failure_counter_lock:
        _failure_state['count'] += 1
        below_threshold = _failure_state['count'] < _MAX_CONSECUTIVE_FAILURES
        if below_threshold or _failure_state['killed']:
            return
        _failure_state['killed'] = True
        logger.error(
            "[AICallLogger] consecutive write failures hit %d — kill-switch ON, "
            "downgrading to logger.info only",
            _MAX_CONSECUTIVE_FAILURES,
        )
+
+
def _record_success() -> None:
    """Reset the consecutive-failure counter after a successful write.

    Only the counter is cleared; the 'killed' flag is left untouched.
    """
    with _failure_counter_lock:
        if _failure_state['count'] <= 0:
            return
        _failure_state['count'] = 0
+
+
def _is_killed() -> bool:
    """Return the current kill-switch flag, read under the state lock."""
    with _failure_counter_lock:
        killed = _failure_state['killed']
    return killed
+
+
def _reset_kill_switch() -> None:
    """Test-only helper: clear the failure counter and the kill-switch flag."""
    with _failure_counter_lock:
        _failure_state.update(count=0, killed=False)
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# 內部狀態容器
+# ─────────────────────────────────────────────────────────────────────────────
+class _CallState:
+ """單次 LLM 呼叫的遙測狀態容器。"""
+
+ __slots__ = (
+ 'caller', 'provider', 'model', 'request_id',
+ 'input_tokens', 'output_tokens',
+ 'duration_ms', 'status', 'fallback_to',
+ 'cost_usd', 'cache_hit', 'rag_hit',
+ 'error', 'meta',
+ )
+
+ def __init__(self, caller: str, provider: str, model: str,
+ request_id: Optional[str], meta: Dict[str, Any]):
+ self.caller = caller
+ self.provider = provider
+ self.model = model
+ self.request_id = request_id
+ self.input_tokens = 0
+ self.output_tokens = 0
+ self.duration_ms: Optional[int] = None
+ self.status: Optional[str] = None
+ self.fallback_to: Optional[str] = None
+ self.cost_usd = 0.0
+ self.cache_hit = False
+ self.rag_hit = False
+ self.error: Optional[str] = None
+ self.meta: Dict[str, Any] = dict(meta) if meta else {}
+
+ # ── caller 操作 API ──────────────────────────────────────────────
+ def set_tokens(self, input: int = 0, output: int = 0) -> None:
+ """設定 token 數。容錯 None / 非整數。"""
+ try:
+ self.input_tokens = int(input or 0)
+ except (TypeError, ValueError):
+ self.input_tokens = 0
+ try:
+ self.output_tokens = int(output or 0)
+ except (TypeError, ValueError):
+ self.output_tokens = 0
+
+ def set_cache_hit(self, hit: bool = True) -> None:
+ self.cache_hit = bool(hit)
+
+ def set_rag_hit(self, hit: bool = True) -> None:
+ self.rag_hit = bool(hit)
+
+ def fallback_to_caller(self, target_caller: str) -> None:
+ """主路徑失敗、觸發下游 caller 接手。下游本身會另寫一筆 ok/error。"""
+ self.fallback_to = (target_caller or '')[:64]
+ self.status = 'fallback'
+
+ # 別名:與設計文 spec 對齊
+ fallback_to_target = fallback_to_caller
+
+ def set_error(self, msg: str) -> None:
+ self.error = (msg or '')[:2000]
+ self.status = 'error'
+
+ def set_status(self, status: str) -> None:
+ self.status = (status or '')[:16]
+
+ def set_prompt_hash(self, prompt: Optional[str]) -> None:
+ """安全地將 prompt 轉成 hash 存入 meta(PII 保護)。"""
+ if prompt:
+ digest = hashlib.sha256(prompt.encode('utf-8', errors='replace')).hexdigest()
+ self.meta['prompt_hash'] = digest[:12]
+
+ def add_meta(self, key: str, value: Any) -> None:
+ if key:
+ self.meta[key] = value
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# 主入口 1: context manager
+# ─────────────────────────────────────────────────────────────────────────────
@contextmanager
def log_ai_call(
    caller: str,
    provider: str,
    model: str,
    request_id: Optional[str] = None,
    meta: Optional[Dict[str, Any]] = None,
):
    """Context manager that records one LLM call as a telemetry row.

    Example:
        with log_ai_call('hermes_analyst', 'gcp_ollama', 'hermes3:latest') as ctx:
            response = ollama.generate(...)
            ctx.set_tokens(input=response['prompt_eval_count'],
                           output=response['eval_count'])
            ctx.set_cache_hit(False)
            # on failure: ctx.set_error('timeout') / ctx.fallback_to_caller('111_ollama')

    Args:
        caller: Identifier of the calling component.
        provider: Provider name.
        model: Model name.
        request_id: Optional correlation id stored with the record.
        meta: Optional extra metadata; copied into the yielded state.

    Yields:
        The ``_CallState`` for this call.

    Behavior:
        - Anything raised inside the ``with`` body is re-raised unchanged.
        - The status defaults to 'ok' when the body finishes without raising
          and without setting a status itself.
        - When the body raises, the status becomes 'error'. This includes
          ``BaseException`` subclasses such as ``asyncio.CancelledError`` and
          ``KeyboardInterrupt``, which would otherwise leave the status unset
          and be written as 'ok'.
        - The write is dispatched through ``_async_write``; a failure to
          dispatch is logged as a warning and never propagated.
    """
    state = _CallState(caller, provider, model, request_id, meta or {})
    start = time.monotonic()

    try:
        yield state
        # No exception: default to 'ok' unless the caller set a status.
        if state.status is None:
            state.status = 'ok'
    except BaseException as e:
        # BaseException on purpose: cancellations and interrupts are aborted
        # calls and must not be recorded as successes.
        state.status = 'error'
        if not state.error:
            state.error = f"{type(e).__name__}: {str(e)[:1500]}"
        raise
    finally:
        state.duration_ms = int((time.monotonic() - start) * 1000)
        try:
            _async_write(state)
        except Exception as exc:  # pragma: no cover — writer thread failed to start
            logger.warning("[AICallLogger] async dispatch failed: %s", exc)
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# 主入口 2: decorator
+# ─────────────────────────────────────────────────────────────────────────────
def logged_ai_call(
    caller: str,
    provider: str,
    model: Optional[str] = None,
    model_extractor: Optional[Callable[[tuple, dict], str]] = None,
):
    """Decorator that wraps a function in ``log_ai_call``.

    Example:
        @logged_ai_call(caller='sales_copy', provider='gcp_ollama',
                        model_extractor=lambda a, kw: kw.get('model', 'llama3.1:8b'))
        def generate_copy(...):
            return ollama.generate(model='llama3.1:8b', ...)

    Args:
        caller: Value recorded as the call's caller.
        provider: Value recorded as the call's provider.
        model: Static model name (use this or ``model_extractor``).
        model_extractor: Callable receiving ``(args, kwargs)`` and returning
            the model name; takes precedence over ``model``. If it raises,
            ``model`` (or 'unknown') is used instead.

    Notes:
        - Token counts are filled in on a best-effort basis from the return
          value via ``_auto_extract_tokens``; use the ``log_ai_call`` context
          manager directly when exact counts matter.
        - Exceptions from the wrapped function are re-raised; ``log_ai_call``
          marks the record as an error.
        - ``async def`` functions are supported: the coroutine is awaited
          inside the logging context so duration and status cover the real
          call rather than just coroutine creation.
    """
    def _resolve_model(args: tuple, kwargs: dict) -> str:
        # A failing extractor must never block the wrapped call.
        try:
            return model_extractor(args, kwargs) if model_extractor else (model or 'unknown')
        except Exception:
            return model or 'unknown'

    def _capture_tokens(ctx: Any, result: Any) -> None:
        # Best-effort: a response of unexpected shape only costs us the counts.
        try:
            _auto_extract_tokens(ctx, result)
        except Exception as exc:
            logger.debug("[AICallLogger] token auto-extract failed: %s", exc)

    def deco(fn: Callable):
        if inspect.iscoroutinefunction(fn):
            @wraps(fn)
            async def async_wrapper(*args, **kwargs):
                with log_ai_call(caller, provider, _resolve_model(args, kwargs)) as ctx:
                    result = await fn(*args, **kwargs)
                    _capture_tokens(ctx, result)
                    return result
            return async_wrapper

        @wraps(fn)
        def wrapper(*args, **kwargs):
            with log_ai_call(caller, provider, _resolve_model(args, kwargs)) as ctx:
                result = fn(*args, **kwargs)
                _capture_tokens(ctx, result)
                return result
        return wrapper
    return deco
+
+
+def _auto_extract_tokens(ctx: _CallState, result: Any) -> None:
+ """從常見 LLM response 形態自動抽 token(best-effort)。"""
+ if result is None:
+ return
+ # dict (Ollama / NIM raw)
+ if isinstance(result, dict):
+ usage = result.get('usage') or {}
+ if usage:
+ ctx.set_tokens(
+ input=usage.get('prompt_tokens') or usage.get('input_tokens') or 0,
+ output=usage.get('completion_tokens') or usage.get('output_tokens') or 0,
+ )
+ return
+ # Ollama: prompt_eval_count / eval_count
+ if 'eval_count' in result or 'prompt_eval_count' in result:
+ ctx.set_tokens(
+ input=result.get('prompt_eval_count', 0),
+ output=result.get('eval_count', 0),
+ )
+ return
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# 異步寫入 (fire-and-forget)
+# ─────────────────────────────────────────────────────────────────────────────
def _async_write(state: _CallState) -> None:
    """Hand the finished call state to a background writer.

    Nothing happens when logging is disabled through AI_CALL_LOGGING_ENABLED.
    When the kill-switch has tripped, the call is emitted as a single
    logger.info line and the database is not touched. Otherwise a daemon
    thread running ``_write_to_db`` is started and this function returns
    immediately.
    """
    if not _is_logging_enabled():
        return

    if not _is_killed():
        writer = threading.Thread(
            target=_write_to_db,
            args=(state,),
            name=f"ai-call-log-{state.caller}",
            daemon=True,
        )
        writer.start()
        return

    # Degraded mode: log only, no DB access.
    logger.info(
        "[AICall|killed] caller=%s provider=%s model=%s status=%s "
        "tokens=%s/%s duration=%sms",
        state.caller, state.provider, state.model, state.status,
        state.input_tokens, state.output_tokens, state.duration_ms,
    )
+
+
def _write_to_db(state: _CallState) -> None:
    """Insert one ai_calls row for *state*; never raises.

    Everything is wrapped in try/except: any failure (import, cost
    calculation, session, INSERT, commit) is counted with _record_failure and
    logged as a warning. A successful commit resets the failure counter via
    _record_success.
    """
    try:
        # Imported lazily inside the function, so an import failure is handled
        # like any other write failure.
        from sqlalchemy import text
        from database.manager import get_session

        cost = _calc_cost(state.model, state.input_tokens, state.output_tokens)
        meta_json = _safe_meta_json(state.meta)

        session = get_session()
        try:
            session.execute(
                text("""
                    INSERT INTO ai_calls (
                        caller, provider, model,
                        input_tokens, output_tokens, duration_ms,
                        status, fallback_to, cost_usd,
                        cache_hit, rag_hit, request_id,
                        error, meta
                    ) VALUES (
                        :caller, :provider, :model,
                        :input_tokens, :output_tokens, :duration_ms,
                        :status, :fallback_to, :cost_usd,
                        :cache_hit, :rag_hit, :request_id,
                        :error, CAST(:meta AS JSONB)
                    )
                """),
                {
                    # Strings are truncated here (64 / 32 / 128 / 16 chars);
                    # presumably these match the column widths — TODO confirm
                    # against the ai_calls migration.
                    'caller': state.caller[:64] if state.caller else 'unknown',
                    'provider': (state.provider or 'unknown')[:32],
                    'model': (state.model or 'unknown')[:128],
                    'input_tokens': int(state.input_tokens or 0),
                    'output_tokens': int(state.output_tokens or 0),
                    'duration_ms': state.duration_ms,
                    # A status that was never set is written as 'ok'.
                    'status': (state.status or 'ok')[:16],
                    'fallback_to': state.fallback_to,
                    'cost_usd': cost,
                    'cache_hit': bool(state.cache_hit),
                    'rag_hit': bool(state.rag_hit),
                    'request_id': state.request_id,
                    'error': state.error,
                    'meta': meta_json,
                },
            )
            session.commit()
            _record_success()
        except Exception:
            # Roll back, then let the outer handler record the failure.
            session.rollback()
            raise
        finally:
            session.close()
    except Exception as e:
        _record_failure()
        logger.warning(
            "[AICallLogger] write failed (caller=%s provider=%s): %s",
            state.caller, state.provider, e,
        )
+
+
+def _calc_cost(model: str, in_tokens: int, out_tokens: int) -> float:
+ """依 COST_TABLE 計算成本;未知 model log warning 並回 0。"""
+ if not model:
+ return 0.0
+ rate = COST_TABLE.get(model)
+ if rate is None:
+ # NIM 配額制走免費 tier,常見 nvidia/* meta/* deepseek-* 視為 0
+ prefix_zero = ('meta/', 'nvidia/', 'deepseek-')
+ if any(model.startswith(p) for p in prefix_zero):
+ return 0.0
+ logger.warning("[AICallLogger] unknown model cost: %s, default 0", model)
+ return 0.0
+ in_t = max(0, int(in_tokens or 0))
+ out_t = max(0, int(out_tokens or 0))
+ cost = (in_t * rate['in'] + out_t * rate['out']) / 1_000_000
+ # NUMERIC(10,6) 上限 9999.999999;極端 case 截斷避免 overflow
+ if cost < 0:
+ return 0.0
+ return round(min(cost, 9999.999999), 6)
+
+
+def _safe_meta_json(meta: Dict[str, Any]) -> str:
+ """meta 序列化為 JSON 字串;失敗時回 '{}'。"""
+ import json
+ if not meta:
+ return '{}'
+ try:
+ return json.dumps(meta, ensure_ascii=False, default=str)
+ except Exception as e:
+ logger.warning("[AICallLogger] meta json dump failed: %s", e)
+ return '{}'
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# 工具:caller 自動推斷(caller 沒給時用)
+# ─────────────────────────────────────────────────────────────────────────────
def infer_caller_from_stack(default: str = 'unknown') -> str:
    """Infer a caller name from the call stack.

    Looks at stack entry 2 (the caller of whoever called this function) and
    returns the last dotted segment of that frame's module name, truncated to
    64 characters.

    Args:
        default: Value returned when the module cannot be determined (stack
            too shallow, no module for the frame, or any error).

    Returns:
        The inferred module name segment, or *default*.
    """
    try:
        # context=0 skips loading source lines for every frame, which a plain
        # inspect.stack() does on each call; the frames returned are the same.
        frame = inspect.stack(context=0)[2]
        module = inspect.getmodule(frame.frame)
        if module and module.__name__:
            return module.__name__.split('.')[-1][:64]
    except Exception:
        # Best-effort lookup: fall through to the default.
        pass
    return default
diff --git a/services/token_report_service.py b/services/token_report_service.py
new file mode 100644
index 0000000..8950dee
--- /dev/null
+++ b/services/token_report_service.py
@@ -0,0 +1,867 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+services/token_report_service.py
+LLM Token 日報服務 (Operation Ollama-First v5.0 — Phase 1 收尾)
+
+依據:
+ - migrations/024_create_ai_calls_table.sql (ai_calls schema + CHECK constraints)
+ - migrations/025_create_mcp_calls_and_budgets.sql (ai_call_budgets 種子資料)
+ - services/ai_call_logger.py (COST_TABLE / provider 白名單)
+ - services/telegram_templates.py (HTML escape 與 send 封裝)
+ - docs/phase0_audit_report_20260503.md (34 LLM 呼叫點清冊)
+ - docs/phase1_db_design_20260503.md (查詢 latency 預估)
+
+設計紀律 (憲法級):
+ 1. 失敗安全: DB 查詢失敗 → 推「⚠️ 報表生成失敗」訊息,不影響其他排程
+ 2. PII 保護: 報表訊息不含 prompt 原文;ai_insights metadata 只存統計 meta(不存 username)
+ 3. 不污染既有 Telegram 流程: 共用 telegram_templates 既有 send 函數,不另開連線
+ 4. ≤ 4096 字元自動截斷: Telegram 單訊息上限保險絲
+
+公開 API:
+ - generate_daily_report(target_date) → str (HTML)
+ - send_daily_report() → dict (sent/failed/errors)
+"""
+
+from __future__ import annotations
+
+import logging
+from datetime import date, datetime, timedelta, timezone
+from decimal import Decimal
+from typing import Any, Dict, List, Optional, Tuple
+
+logger = logging.getLogger(__name__)
+
# Asia/Taipei (UTC+8) handled with a fixed offset (avoids tzdata differences
# between containers; follows the telegram_templates convention).
_TAIPEI_TZ = timezone(timedelta(hours=8))

# Per-message character cap for Telegram (leaves 96 characters of headroom for
# a footer instead of sitting exactly on 4096).
_TELEGRAM_MAX_CHARS = 4000

# Provider display table: key -> (icon, label). Aligned with the
# ai_calls.provider whitelist; dict order is the report order.
_PROVIDER_DISPLAY: Dict[str, Tuple[str, str]] = {
    'gcp_ollama': ('🟢', 'GCP Ollama'),
    'ollama_secondary': ('🟢', 'Secondary'),  # critic-A11 B4 fix: keep the three-host architecture consistent
    'ollama_111': ('🟠', '111 Ollama'),
    'gemini': ('🔴', 'Gemini'),
    'claude': ('🟣', 'Claude'),
    'nim': ('🟡', 'NIM'),
    'openrouter': ('🟤', 'OpenRouter'),
    'nim_via_elephant': ('🟫', 'NIM_via_Eleph'),
}

# Ollama share target (used by Section 1 to decide whether "Ollama-First" is
# met; campaign KPI is >= 60%).
_OLLAMA_FIRST_TARGET_PCT = 60.0

# Alert rule parameters (used to generate Section 6 automatically).
_ALERT_RULES = {
    'caller_spike_factor': 1.4,  # tokens > 7-day average x 1.4
    'gemini_share_threshold': 35.0,  # gemini share > 35% counts as Ollama-First breached
    'error_rate_critical': 5.0,  # error_rate > 5% -> P1
    'budget_warning': 80.0,  # spent / budget > 80% -> P1
    'gcp_hit_warning': 90.0,  # gcp_ollama share < 90% (within Ollama) -> P2
    'cache_hit_low': 40.0,  # claude cache hit < 40% -> INFO
    'caller_stable_days': 7,  # Ollama > 95% for N consecutive days -> INFO "fallback can be turned off"
    'ollama_stable_pct': 95.0,
}
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# 公開 API
+# ═══════════════════════════════════════════════════════════════════════════════
+
def generate_daily_report(target_date: Optional[date] = None) -> str:
    """Build the LLM token daily report as HTML for Telegram.

    Args:
        target_date: Day to report on (Asia/Taipei). Defaults to today.

    Returns:
        The formatted HTML report. If any of the database queries raises, the
        error is logged and the output of ``_format_failure_report`` is
        returned instead.
    """
    report_date = target_date if target_date is not None else datetime.now(_TAIPEI_TZ).date()

    try:
        sections = {
            'summary': _query_summary(report_date),
            'by_provider': _query_by_provider(report_date),
            'top_callers': _query_top_callers(report_date, limit=10),
            'costs': _query_cost_breakdown(report_date),
            'trends': _query_trends_vs_7day(report_date),
            'budgets': _query_budget_usage(report_date),
            'cache_stats': _query_cache_hit_stats(report_date),
        }
    except Exception as exc:
        logger.exception("[TokenReport] DB query failed: %s", exc)
        return _format_failure_report(report_date, str(exc))

    # Alerts and insights are derived from the queried data; errors raised
    # here propagate to the caller.
    alerts = _detect_alerts(
        sections['summary'],
        sections['by_provider'],
        sections['top_callers'],
        sections['trends'],
        sections['budgets'],
        sections['cache_stats'],
    )
    insights = _generate_insights(report_date, sections['summary'], sections['by_provider'])

    return _format_report(
        target_date=report_date,
        alerts=alerts,
        insights=insights,
        **sections,
    )
+
+
def _truncate_telegram_html(html: str, limit: int) -> str:
    """Shorten *html* to roughly *limit* characters without breaking its markup.

    A plain slice can stop in the middle of a tag or an entity, or leave tags
    unclosed, which would make the HTML invalid. After slicing, this helper
    drops a trailing half-written tag or entity, closes every tag that is
    still open, and appends the truncation notice. Text within *limit* is
    returned unchanged.
    """
    import re  # local import: this module's header does not import re

    if len(html) <= limit:
        return html

    clipped = html[: max(0, limit - 80)]

    # Drop a tag that was cut in half, e.g. "<b" or "<a href=".
    last_lt = clipped.rfind('<')
    if last_lt > clipped.rfind('>'):
        clipped = clipped[:last_lt]

    # Drop an entity that was cut in half, e.g. "&am". Only look near the end
    # so a stray "&" earlier in the text cannot swallow the whole report.
    last_amp = clipped.rfind('&')
    if last_amp != -1 and len(clipped) - last_amp <= 10 and ';' not in clipped[last_amp:]:
        clipped = clipped[:last_amp]

    # Track tags still open at the cut point so they can be closed in order.
    open_tags: List[str] = []
    for match in re.finditer(r'<(/?)([A-Za-z][A-Za-z0-9-]*)[^>]*>', clipped):
        is_closing, tag = bool(match.group(1)), match.group(2).lower()
        if not is_closing:
            open_tags.append(tag)
        elif tag in open_tags:
            # Pop back to (and including) the matching opener.
            while open_tags and open_tags.pop() != tag:
                pass
    closers = ''.join(f'</{tag}>' for tag in reversed(open_tags))

    return clipped + closers + "\n\n... (訊息超長,已截斷;詳見 ai_insights)"


def send_daily_report(target_date: Optional[date] = None) -> Dict[str, Any]:
    """Generate the daily report, send it to Telegram and persist it.

    Args:
        target_date: Day to report on (Asia/Taipei). Defaults to today.

    Returns:
        {'ok': bool, 'sent': int, 'failed': int, 'chars': int, 'errors': list}
        where 'chars' is the length of the (possibly truncated) message.

    Each stage is failure-isolated: a report generation error falls back to
    the failure report, a Telegram error is appended to 'errors', and a
    persistence error is only logged.
    """
    if target_date is None:
        target_date = datetime.now(_TAIPEI_TZ).date()

    try:
        report_html = generate_daily_report(target_date)
    except Exception as exc:
        logger.exception("[TokenReport] generate_daily_report failed: %s", exc)
        report_html = _format_failure_report(target_date, str(exc))

    # Truncate to a Telegram-safe length while keeping the HTML well-formed.
    report_html = _truncate_telegram_html(report_html, _TELEGRAM_MAX_CHARS)

    # Send through the existing Telegram wrapper (no separate connection).
    result: Dict[str, Any] = {'ok': False, 'sent': 0, 'failed': 0, 'chars': len(report_html), 'errors': []}
    try:
        from services.telegram_templates import send_telegram_with_result
        send_result = send_telegram_with_result(report_html, parse_mode='HTML')
        result.update({
            'ok': bool(send_result.get('ok')),
            'sent': int(send_result.get('sent', 0)),
            'failed': int(send_result.get('failed', 0)),
            'errors': list(send_result.get('errors', [])),
        })
    except Exception as exc:
        logger.exception("[TokenReport] telegram send failed: %s", exc)
        result['errors'].append(f"telegram:{type(exc).__name__}")

    # Persist to ai_insights (no PII, no username).
    try:
        _persist_to_ai_insights(target_date, report_html, result)
    except Exception as exc:
        logger.warning("[TokenReport] ai_insights persist failed: %s", exc)

    return result
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# 內部:SQL 查詢
+# ═══════════════════════════════════════════════════════════════════════════════
+
def _date_window(target_date: date) -> Tuple[datetime, datetime]:
    """Return the half-open [start, end) window covering *target_date*.

    Both bounds are timezone-aware datetimes in _TAIPEI_TZ: midnight of the
    target day and midnight of the following day.
    """
    midnight = datetime.combine(target_date, datetime.min.time(), tzinfo=_TAIPEI_TZ)
    next_midnight = midnight + timedelta(days=1)
    return midnight, next_midnight
+
+
def _exec_query(sql: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Run *sql* with bound *params* and return the rows as plain dicts.

    A fresh session is obtained for every call and always closed afterwards.
    Exceptions are not handled here; they propagate to the caller.
    """
    from sqlalchemy import text
    from database.manager import get_session

    db = get_session()
    try:
        mapped_rows = session_rows = db.execute(text(sql), params).mappings().all()
        return list(map(dict, mapped_rows))
    finally:
        db.close()
+
+
def _query_summary(target_date: date) -> Dict[str, Any]:
    """Section 1 — daily overview (single aggregated row).

    Returns:
        A dict with total_tokens, total_calls, total_cost_usd,
        avg_duration_ms, success_rate, failed_calls, ollama_pct,
        prev_total_tokens and wow_pct. Every ratio is 0.0 when its
        denominator is zero.

    NOTE(review): despite its name, wow_pct compares against the previous
    day's token total (day-over-day), not the previous week.
    """
    day_start, day_end = _date_window(target_date)
    prev_start = day_start - timedelta(days=1)

    today_rows = _exec_query("""
        SELECT
            COALESCE(SUM(input_tokens + output_tokens), 0) AS total_tokens,
            COUNT(*) AS total_calls,
            COALESCE(SUM(cost_usd), 0) AS total_cost_usd,
            COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
            COALESCE(SUM(CASE WHEN status = 'ok' THEN 1 ELSE 0 END), 0) AS ok_calls,
            COALESCE(SUM(
                CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                THEN input_tokens + output_tokens ELSE 0 END
            ), 0) AS ollama_tokens
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
    """, {'start': day_start, 'end': day_end})

    yesterday_rows = _exec_query("""
        SELECT COALESCE(SUM(input_tokens + output_tokens), 0) AS prev_total_tokens
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
    """, {'start': prev_start, 'end': day_start})

    today = today_rows[0] if today_rows else {}
    yesterday = yesterday_rows[0] if yesterday_rows else {}

    call_count = int(today.get('total_calls') or 0)
    token_count = int(today.get('total_tokens') or 0)
    ok_count = int(today.get('ok_calls') or 0)
    ollama_token_count = int(today.get('ollama_tokens') or 0)
    prev_token_count = int(yesterday.get('prev_total_tokens') or 0)

    success_rate = (ok_count / call_count * 100.0) if call_count else 0.0
    ollama_pct = (ollama_token_count / token_count * 100.0) if token_count else 0.0
    if prev_token_count:
        change_pct = (token_count - prev_token_count) / prev_token_count * 100.0
    else:
        change_pct = 0.0

    return {
        'total_tokens': token_count,
        'total_calls': call_count,
        'total_cost_usd': float(today.get('total_cost_usd') or 0),
        'avg_duration_ms': float(today.get('avg_duration_ms') or 0),
        'success_rate': success_rate,
        'failed_calls': max(0, call_count - ok_count),
        'ollama_pct': ollama_pct,
        'prev_total_tokens': prev_token_count,
        'wow_pct': change_pct,
    }
+
+
def _query_by_provider(target_date: date) -> List[Dict[str, Any]]:
    """Section 2 — provider distribution for the target day.

    Returns one entry per key of _PROVIDER_DISPLAY, in that dict's order;
    providers with no rows are included with zeros. 'pct' is the provider's
    share of all tokens returned by the query.

    NOTE(review): a provider present in ai_calls but missing from
    _PROVIDER_DISPLAY still counts toward the pct denominator, yet gets no
    entry of its own.
    """
    day_start, day_end = _date_window(target_date)

    db_rows = _exec_query("""
        SELECT
            provider,
            SUM(input_tokens + output_tokens)::BIGINT AS tokens,
            COUNT(*) AS calls,
            COALESCE(SUM(cost_usd), 0) AS cost_usd,
            COALESCE(AVG(duration_ms), 0) AS avg_duration_ms
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
        GROUP BY provider
    """, {'start': day_start, 'end': day_end})

    rows_by_provider = {row['provider']: row for row in db_rows}
    grand_total = sum(int(row['tokens'] or 0) for row in db_rows)

    def _entry(provider_key: str) -> Dict[str, Any]:
        row = rows_by_provider.get(provider_key, {})
        provider_tokens = int(row.get('tokens') or 0)
        share = (provider_tokens / grand_total * 100.0) if grand_total else 0.0
        return {
            'provider': provider_key,
            'tokens': provider_tokens,
            'pct': share,
            'calls': int(row.get('calls') or 0),
            'cost_usd': float(row.get('cost_usd') or 0),
            'avg_duration_ms': float(row.get('avg_duration_ms') or 0),
        }

    return [_entry(provider_key) for provider_key in _PROVIDER_DISPLAY]
+
+
def _query_top_callers(target_date: date, limit: int = 10) -> List[Dict[str, Any]]:
    """Section 3 — top N (caller, provider) pairs by tokens, with 7-day deviation.

    Each entry holds caller, provider, model (the most frequent model for the
    pair on the target day), tokens, calls and delta_pct. delta_pct is the
    percentage difference from the caller's baseline (token sum over the
    previous 7 days divided by 7), or None when there is no baseline.

    NOTE(review): today's rows are grouped by caller and provider while the
    baseline is grouped by caller only, so a caller spread across several
    providers is compared per provider against its all-provider baseline —
    confirm this is intended.
    """
    day_start, day_end = _date_window(target_date)
    week_start = day_start - timedelta(days=7)

    query_params = {
        'day_start': day_start,
        'day_end': day_end,
        'week_start': week_start,
        'limit': int(limit),
    }
    db_rows = _exec_query("""
        WITH today AS (
            SELECT
                caller,
                provider,
                MODE() WITHIN GROUP (ORDER BY model) AS top_model,
                SUM(input_tokens + output_tokens)::BIGINT AS tokens,
                COUNT(*) AS calls
            FROM ai_calls
            WHERE called_at >= :day_start AND called_at < :day_end
            GROUP BY caller, provider
        ),
        baseline AS (
            SELECT
                caller,
                SUM(input_tokens + output_tokens) / 7.0 AS avg_tokens_7d
            FROM ai_calls
            WHERE called_at >= :week_start AND called_at < :day_start
            GROUP BY caller
        )
        SELECT
            t.caller, t.provider, t.top_model, t.tokens, t.calls,
            COALESCE(b.avg_tokens_7d, 0) AS avg_tokens_7d
        FROM today t
        LEFT JOIN baseline b ON b.caller = t.caller
        ORDER BY t.tokens DESC
        LIMIT :limit
    """, query_params)

    def _entry(row: Dict[str, Any]) -> Dict[str, Any]:
        today_tokens = int(row.get('tokens') or 0)
        weekly_avg = float(row.get('avg_tokens_7d') or 0)
        if weekly_avg > 0:
            deviation = (today_tokens - weekly_avg) / weekly_avg * 100.0
        else:
            deviation = None
        return {
            'caller': str(row.get('caller') or ''),
            'provider': str(row.get('provider') or ''),
            'model': str(row.get('top_model') or ''),
            'tokens': today_tokens,
            'calls': int(row.get('calls') or 0),
            'delta_pct': deviation,
        }

    return [_entry(row) for row in db_rows]
+
+
def _query_cost_breakdown(target_date: date) -> List[Dict[str, Any]]:
    """Section 4 — cost per (provider, model), highest first.

    Only rows with cost_usd > 0 are aggregated, and at most 12 groups are
    returned. Each entry holds provider, model, cost_usd and calls.
    """
    day_start, day_end = _date_window(target_date)

    db_rows = _exec_query("""
        SELECT
            provider,
            model,
            COALESCE(SUM(cost_usd), 0) AS cost_usd,
            COUNT(*) AS calls
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
          AND cost_usd > 0
        GROUP BY provider, model
        ORDER BY cost_usd DESC
        LIMIT 12
    """, {'start': day_start, 'end': day_end})

    breakdown: List[Dict[str, Any]] = []
    for row in db_rows:
        breakdown.append({
            'provider': str(row['provider']),
            'model': str(row['model']),
            'cost_usd': float(row['cost_usd']),
            'calls': int(row['calls']),
        })
    return breakdown
+
+
def _query_trends_vs_7day(target_date: date) -> Dict[str, Any]:
    """Section 5 — compare the target day with the preceding 7 days.

    Runs one aggregate query over the target day and one over the 7 days
    before it. The result holds 'today_*' values (token totals overall and
    for gemini / ollama / claude, average duration, error rate, GCP hit
    percentage) and the matching '7d_*' baselines. The 7-day token averages
    are the window sums divided by 7.
    """
    day_start, day_end = _date_window(target_date)
    week_start = day_start - timedelta(days=7)

    today_rows = _exec_query("""
        SELECT
            COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT AS total_tokens,
            COALESCE(SUM(CASE WHEN provider='gemini'
                THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS gemini_tokens,
            COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS ollama_tokens,
            COALESCE(SUM(CASE WHEN provider='claude'
                THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS claude_tokens,
            COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
            COALESCE(SUM(CASE WHEN status<>'ok' THEN 1 ELSE 0 END), 0) AS failed,
            COUNT(*) AS total_calls,
            COALESCE(SUM(CASE WHEN provider='gcp_ollama' THEN 1 ELSE 0 END), 0) AS gcp_calls,
            COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                THEN 1 ELSE 0 END), 0) AS ollama_calls
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
    """, {'start': day_start, 'end': day_end})

    base_rows = _exec_query("""
        SELECT
            COALESCE(SUM(input_tokens + output_tokens) / 7.0, 0) AS avg_total_tokens,
            COALESCE(SUM(CASE WHEN provider='gemini'
                THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_gemini_tokens,
            COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_ollama_tokens,
            COALESCE(SUM(CASE WHEN provider='claude'
                THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_claude_tokens,
            COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
            CASE WHEN COUNT(*) > 0
                THEN SUM(CASE WHEN status<>'ok' THEN 1 ELSE 0 END)::FLOAT / COUNT(*) * 100.0
                ELSE 0 END AS error_rate_pct,
            COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT AS total_7d_tokens,
            COALESCE(SUM(cost_usd), 0) AS total_7d_cost,
            CASE WHEN SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                    THEN 1 ELSE 0 END) > 0
                THEN SUM(CASE WHEN provider='gcp_ollama' THEN 1 ELSE 0 END)::FLOAT
                    / SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                        THEN 1 ELSE 0 END)::FLOAT * 100.0
                ELSE 0 END AS gcp_hit_pct_7d
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
    """, {'start': week_start, 'end': day_start})

    today = today_rows[0] if today_rows else {}
    baseline = base_rows[0] if base_rows else {}

    def _int_of(row: Dict[str, Any], key: str) -> int:
        # NULL / missing columns count as 0.
        return int(row.get(key) or 0)

    def _float_of(row: Dict[str, Any], key: str) -> float:
        return float(row.get(key) or 0)

    call_count = _int_of(today, 'total_calls')
    failed_count = _int_of(today, 'failed')
    gcp_call_count = _int_of(today, 'gcp_calls')
    ollama_call_count = _int_of(today, 'ollama_calls')

    error_rate = (failed_count / call_count * 100.0) if call_count else 0.0
    gcp_hit_pct = (gcp_call_count / ollama_call_count * 100.0) if ollama_call_count else 0.0

    return {
        'today_total_tokens': _int_of(today, 'total_tokens'),
        'today_gemini_tokens': _int_of(today, 'gemini_tokens'),
        'today_ollama_tokens': _int_of(today, 'ollama_tokens'),
        'today_claude_tokens': _int_of(today, 'claude_tokens'),
        'today_avg_duration': _float_of(today, 'avg_duration_ms'),
        'today_error_rate': error_rate,
        'today_gcp_hit_pct': gcp_hit_pct,
        '7d_avg_total': _float_of(baseline, 'avg_total_tokens'),
        '7d_avg_gemini': _float_of(baseline, 'avg_gemini_tokens'),
        '7d_avg_ollama': _float_of(baseline, 'avg_ollama_tokens'),
        '7d_avg_claude': _float_of(baseline, 'avg_claude_tokens'),
        '7d_avg_duration': _float_of(baseline, 'avg_duration_ms'),
        '7d_error_rate': _float_of(baseline, 'error_rate_pct'),
        '7d_total_tokens': _int_of(baseline, 'total_7d_tokens'),
        '7d_total_cost': _float_of(baseline, 'total_7d_cost'),
        '7d_gcp_hit_pct': _float_of(baseline, 'gcp_hit_pct_7d'),
    }
+
+
def _query_budget_usage(target_date: date) -> Dict[str, Any]:
    """Section 4 — spend versus budget (daily / weekly / monthly, all providers).

    Windows, all ending at the end of the target day:
      - daily:   the target day
      - weekly:  the target day plus the 6 days before it
      - monthly: from the first day of the target day's month

    The scan starts at the earlier of the weekly and monthly window starts.
    Filtering from the month start alone would drop the previous month's rows
    that the weekly window needs whenever the target day falls in the first
    six days of a month, under-reporting weekly_spent.

    Returns:
        A dict with daily_spent, weekly_spent, monthly_spent, daily_budget,
        weekly_budget and monthly_budget. Budgets come from ai_call_budgets
        rows where provider IS NULL; a missing period or a NULL budget_usd
        yields 0.0.
    """
    day_start, day_end = _date_window(target_date)
    week_start = day_start - timedelta(days=6)
    month_start = day_start.replace(day=1)
    # Earliest bound any of the three windows needs.
    scan_start = min(week_start, month_start)

    spent = _exec_query("""
        SELECT
            COALESCE(SUM(CASE WHEN called_at >= :day_start AND called_at < :day_end
                THEN cost_usd ELSE 0 END), 0) AS daily_spent,
            COALESCE(SUM(CASE WHEN called_at >= :week_start AND called_at < :day_end
                THEN cost_usd ELSE 0 END), 0) AS weekly_spent,
            COALESCE(SUM(CASE WHEN called_at >= :month_start AND called_at < :day_end
                THEN cost_usd ELSE 0 END), 0) AS monthly_spent,
            COUNT(*) FILTER (WHERE called_at >= :month_start) AS month_call_count
        FROM ai_calls
        WHERE called_at >= :scan_start AND called_at < :day_end
    """, {
        'day_start': day_start,
        'day_end': day_end,
        'week_start': week_start,
        'month_start': month_start,
        'scan_start': scan_start,
    })

    budget_rows = _exec_query("""
        SELECT period, provider, budget_usd, alert_pct
        FROM ai_call_budgets
        WHERE provider IS NULL
    """, {})

    # A NULL budget_usd is treated as "no budget" instead of crashing float().
    budgets = {r['period']: float(r['budget_usd'] or 0) for r in budget_rows}
    s = spent[0] if spent else {}

    return {
        'daily_spent': float(s.get('daily_spent') or 0),
        'weekly_spent': float(s.get('weekly_spent') or 0),
        'monthly_spent': float(s.get('monthly_spent') or 0),
        'daily_budget': budgets.get('daily', 0.0),
        'weekly_budget': budgets.get('weekly', 0.0),
        'monthly_budget': budgets.get('monthly', 0.0),
    }
+
+
def _query_cache_hit_stats(target_date: date) -> Dict[str, Any]:
    """Section 4 — cache-hit statistics for the claude and gemini providers.

    Returns a dict keyed by 'claude' and 'gemini'; each value holds total
    (calls), hits (calls with cache_hit set) and pct (hits / total * 100, or
    0.0 when there were no calls). Both keys are always present.
    """
    day_start, day_end = _date_window(target_date)

    db_rows = _exec_query("""
        SELECT
            provider,
            COUNT(*) AS total_calls,
            SUM(CASE WHEN cache_hit THEN 1 ELSE 0 END) AS cache_hits
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
          AND provider IN ('claude','gemini')
        GROUP BY provider
    """, {'start': day_start, 'end': day_end})

    rows_by_provider = {row['provider']: row for row in db_rows}

    def _stats(provider_key: str) -> Dict[str, Any]:
        row = rows_by_provider.get(provider_key, {})
        call_count = int(row.get('total_calls') or 0)
        hit_count = int(row.get('cache_hits') or 0)
        hit_pct = (hit_count / call_count * 100.0) if call_count else 0.0
        return {'total': call_count, 'hits': hit_count, 'pct': hit_pct}

    return {provider_key: _stats(provider_key) for provider_key in ('claude', 'gemini')}
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# 內部:告警偵測(Section 6)
+# ═══════════════════════════════════════════════════════════════════════════════
+
def _detect_alerts(
    summary: Dict[str, Any],
    by_provider: List[Dict[str, Any]],
    top_callers: List[Dict[str, Any]],
    trends: Dict[str, Any],
    budgets: Dict[str, Any],
    cache_stats: Dict[str, Any],
) -> List[Dict[str, str]]:
    """Evaluate the alert rules (R1-R6 below) and return the triggered alerts.

    Thresholds are read from the module-level ``_ALERT_RULES`` mapping.

    Args:
        summary: Daily totals; reads ``total_calls`` and ``failed_calls``.
        by_provider: Per-provider rows; reads ``provider``, ``pct``, ``tokens``.
        top_callers: Top caller rows; reads ``caller``, ``tokens``, ``calls``
            and the optional ``delta_pct``.
        trends: Trend figures; reads ``today_gcp_hit_pct``.
        budgets: Reads ``{daily,weekly,monthly}_spent`` and ``..._budget``.
        cache_stats: Per-provider cache stats; reads ``claude.total`` / ``claude.pct``.

    Returns:
        A list of ``{level, icon, title, suggestion}`` dicts, in rule order.
    """
    alerts: List[Dict[str, str]] = []

    # R1: a single caller's token usage spiked (P2).
    # spike_factor is a multiplier; delta_pct is a percentage change, hence the conversion.
    spike_factor = _ALERT_RULES['caller_spike_factor']
    for caller in top_callers:
        delta = caller.get('delta_pct')
        if delta is not None and delta >= (spike_factor - 1) * 100.0:
            alerts.append({
                'level': 'P2', 'icon': '🟠',
                'title': f"{caller['caller']} token 暴增 {delta:+.0f}%(vs 7 日均)",
                'suggestion': f"今日 {caller['tokens']:,} tokens / {caller['calls']} calls,建議查 prompt 是否變更",
            })

    # R2: Gemini share above threshold (P2, "Ollama-First" regression).
    gemini = next((r for r in by_provider if r['provider'] == 'gemini'), {})
    gemini_pct = float(gemini.get('pct') or 0)
    if gemini_pct > _ALERT_RULES['gemini_share_threshold']:
        alerts.append({
            'level': 'P2', 'icon': '🟠',
            'title': f"Gemini 占比 {gemini_pct:.1f}% 高於門檻 {_ALERT_RULES['gemini_share_threshold']:.0f}%",
            'suggestion': "Ollama-First 失守,請檢查 fallback 是否正確命中本地",
        })

    # R3: global failure rate (P1); skipped when there were no calls.
    total_calls = int(summary.get('total_calls') or 0)
    failed = int(summary.get('failed_calls') or 0)
    if total_calls:
        err_rate = failed / total_calls * 100.0
        if err_rate > _ALERT_RULES['error_rate_critical']:
            alerts.append({
                'level': 'P1', 'icon': '🔴',
                'title': f"全域失敗率 {err_rate:.1f}% 超過門檻 {_ALERT_RULES['error_rate_critical']:.0f}%",
                'suggestion': f"今日 {failed:,} / {total_calls:,} 失敗,立即查 ai_calls WHERE status<>'ok'",
            })

    # R4: budget usage above the warning level (P1); periods without a budget are ignored.
    for period_key, label in (('daily', '日'), ('weekly', '週'), ('monthly', '月')):
        spent = float(budgets.get(f'{period_key}_spent') or 0)
        budget = float(budgets.get(f'{period_key}_budget') or 0)
        if budget > 0:
            usage_pct = spent / budget * 100.0
            if usage_pct > _ALERT_RULES['budget_warning']:
                alerts.append({
                    'level': 'P1', 'icon': '🔴',
                    'title': f"{label}成本 ${spent:.2f} 達預算 ${budget:.2f} 的 {usage_pct:.0f}%",
                    'suggestion': "請檢查供應商分布是否異常(Section 2/3)或調整預算",
                })

    # R5: low GCP hit rate (P2) — only checked when there was any Ollama traffic.
    today_gcp_hit = float(trends.get('today_gcp_hit_pct') or 0)
    ollama = sum(int(r.get('tokens') or 0) for r in by_provider
                 if r['provider'] in ('gcp_ollama', 'ollama_secondary', 'ollama_111'))
    if ollama > 0 and today_gcp_hit < _ALERT_RULES['gcp_hit_warning']:
        alerts.append({
            'level': 'P2', 'icon': '🟠',
            'title': f"GCP Ollama 命中率 {today_gcp_hit:.1f}% 低於 {_ALERT_RULES['gcp_hit_warning']:.0f}%",
            'suggestion': "111 fallback 觸發頻繁,請檢查 GCP Ollama 健康(ADR-027)",
        })

    # R6: low Claude prompt-cache hit rate (INFO) — needs at least 10 calls.
    claude_cache = cache_stats.get('claude', {})
    if int(claude_cache.get('total') or 0) >= 10:
        # Normalise once and reuse for the title: indexing claude_cache['pct']
        # directly raised KeyError/TypeError when 'pct' was missing or None.
        claude_cache_pct = float(claude_cache.get('pct') or 0)
        if claude_cache_pct < _ALERT_RULES['cache_hit_low']:
            alerts.append({
                'level': 'INFO', 'icon': '🟢',
                'title': f"Claude prompt cache 命中率僅 {claude_cache_pct:.1f}%",
                'suggestion': "可優化 system prompt 結構(≥1024 tokens 才觸發 cache)",
            })

    return alerts
+
+
def _generate_insights(
    target_date: date,
    summary: Dict[str, Any],
    by_provider: List[Dict[str, Any]],
) -> List[Dict[str, str]]:
    """Section 6 rule-based insights (no LLM call involved).

    Args:
        target_date: Report date; not read by the current rules.
        summary: Daily totals; reads ``ollama_pct``, ``success_rate``, ``total_calls``.
        by_provider: Per-provider rows; reads ``provider`` and ``tokens``.

    Returns:
        A list of ``{icon, text}`` dicts. The Ollama-share insight is always
        first; the NIM and success-rate insights are appended only when their
        conditions hold.
    """
    ollama_share = float(summary.get('ollama_pct') or 0)
    target_pct = _OLLAMA_FIRST_TARGET_PCT

    # Exactly one Ollama-share line is emitted: target met, or a warning.
    if ollama_share >= target_pct:
        share_insight = {
            'icon': '✅',
            'text': f"Ollama 占比 {ollama_share:.1f}%(目標 ≥{target_pct:.0f}%),Ollama-First 戰役達標",
        }
    else:
        share_insight = {
            'icon': '⚠️',
            'text': f"Ollama 占比 {ollama_share:.1f}% 未達 {target_pct:.0f}% 目標,可優化 fallback 鏈",
        }
    found: List[Dict[str, str]] = [share_insight]

    # NIM usage present but below 100K tokens.
    nim_tokens = 0
    for row in by_provider:
        if row['provider'] in ('nim', 'nim_via_elephant'):
            nim_tokens += int(row.get('tokens') or 0)
    if 0 < nim_tokens < 100_000:
        found.append({
            'icon': '✅',
            'text': f"NIM 用量已降至 {nim_tokens:,} tokens(戰役前約 5M),可考慮關閉 NIM 依賴",
        })

    # Success rate is only reported when there was traffic.
    ok_rate = float(summary.get('success_rate') or 0)
    if summary.get('total_calls') and ok_rate >= 99.0:
        found.append({
            'icon': '✅',
            'text': f"成功率 {ok_rate:.1f}%,鏈路健康度高",
        })

    return found
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# 內部:報表組裝
+# ═══════════════════════════════════════════════════════════════════════════════
+
def _format_report(
    target_date: date,
    summary: Dict[str, Any],
    by_provider: List[Dict[str, Any]],
    top_callers: List[Dict[str, Any]],
    costs: List[Dict[str, Any]],
    trends: Dict[str, Any],
    budgets: Dict[str, Any],
    cache_stats: Dict[str, Any],
    alerts: List[Dict[str, str]],
    insights: List[Dict[str, str]],
) -> str:
    """Assemble the full daily report text (six sections plus header/footer).

    Caller, provider and model strings, alert titles/suggestions and insight
    texts are passed through ``_esc`` before being embedded.

    Args:
        target_date: Date the report covers.
        summary: Daily totals (Section 1).
        by_provider: Per-provider rows (Section 2); rows with 0 calls are skipped.
        top_callers: Top caller rows (Section 3).
        costs: Per-model cost rows (Section 4); only the first 6 are shown.
        trends: Today vs 7-day figures (Section 5).
        budgets: Spent/budget figures per period (Section 4).
        cache_stats: Prompt-cache stats for ``claude`` / ``gemini`` (Section 4).
        alerts: Output of the alert rules (Section 6).
        insights: Rule-engine insights (Section 6).

    Returns:
        The report as a single newline-joined string.
    """

    def _esc_clipped(value: Any, limit: int) -> str:
        # Clip BEFORE escaping: slicing an escaped string can cut an HTML
        # entity in half and leave invalid markup in the message.
        raw = "" if value is None else str(value)
        return _esc(raw[:limit])

    weekday_zh = ['週一', '週二', '週三', '週四', '週五', '週六', '週日'][target_date.weekday()]
    now_str = datetime.now(_TAIPEI_TZ).strftime('%H:%M:%S')

    lines: List[str] = []

    # Header
    lines.append(f"📊 LLM Token 日報 {target_date.isoformat()} ({weekday_zh})")
    lines.append("═══════════════════════════════════════")
    lines.append("⏰ 統計區間:00:00 ~ 23:59 (UTC+8)")
    lines.append(f"🔄 報表生成:{now_str} | 涵蓋筆數:{summary['total_calls']:,} calls")

    # Section 1
    lines.append("")
    lines.append("━━━━━ 【1】今日總覽 TL;DR ━━━━━")
    wow_sign = "+" if summary['wow_pct'] >= 0 else ""
    lines.append(f"🪙 總 Token: {summary['total_tokens']:,} ({wow_sign}{summary['wow_pct']:.1f}% vs 昨日)")
    lines.append(f"💰 總成本: US$ {summary['total_cost_usd']:.2f}")
    lines.append(f"⚡ 平均延遲: {summary['avg_duration_ms']:.0f} ms")
    lines.append(f"✅ 成功率: {summary['success_rate']:.1f}% ({summary['failed_calls']} 失敗 / {summary['total_calls']})")
    ollama_check = "✅" if summary['ollama_pct'] >= _OLLAMA_FIRST_TARGET_PCT else "⚠️"
    lines.append(f"🎯 Ollama 占比:{summary['ollama_pct']:.1f}% {ollama_check}")

    # Section 2
    lines.append("")
    lines.append("━━━━━ 【2】供應商分布 ━━━━━")
    for p in by_provider:
        if p['calls'] == 0:
            continue  # skip providers with no traffic to reduce noise
        try:
            icon, name = _PROVIDER_DISPLAY[p['provider']]
        except KeyError:
            # A provider missing from the display table must not abort the
            # whole report; show its raw (escaped) identifier instead.
            icon, name = "❔", _esc(p['provider'])
        lines.append(
            f"{icon} {_pad(name, 14)} "
            f"{_fmt_kb(p['tokens']):>8} ({p['pct']:5.1f}%) "
            f"{p['calls']:>5} calls "
            f"${p['cost_usd']:6.2f} "
            f"{p['avg_duration_ms']:5.0f}ms"
        )

    # Section 3
    lines.append("")
    lines.append(f"━━━━━ 【3】呼叫點 TOP {len(top_callers)} (按 Token) ━━━━━")
    medals = ['🥇', '🥈', '🥉']
    for i, c in enumerate(top_callers):
        rank = medals[i] if i < 3 else f" {i+1}"
        flag = ""
        if c.get('delta_pct') is not None:
            d = c['delta_pct']
            if d >= 40:
                flag = f" ⚠️ {d:+.0f}%"
            elif d <= -50:
                flag = f" 🎉 {d:+.0f}%"
        lines.append(
            f"{rank} {_esc(c['caller'])}"
            f" / {_esc(c['provider'])} / {_esc_clipped(c['model'], 24)}"
        )
        lines.append(f" {_fmt_kb(c['tokens']):>8} | {c['calls']:>5} calls{flag}")

    # Section 4
    lines.append("")
    lines.append("━━━━━ 【4】成本分析 + 預算對比 ━━━━━")
    lines.append(_budget_line("📅 本日成本", budgets['daily_spent'], budgets['daily_budget']))
    lines.append(_budget_line("📅 本週累計", budgets['weekly_spent'], budgets['weekly_budget']))
    lines.append(_budget_line("📅 本月累計", budgets['monthly_spent'], budgets['monthly_budget']))

    if costs:
        lines.append("")
        lines.append("成本拆解 by Model:")
        for c in costs[:6]:
            lines.append(f" {_esc_clipped(c['model'], 32):<32} ${c['cost_usd']:7.4f} ({c['calls']} calls)")

    # Prompt cache hits
    lines.append("")
    lines.append("Prompt Cache 命中:")
    cc = cache_stats.get('claude', {})
    if cc.get('total'):
        lines.append(f" Claude: {cc['hits']:>4} / {cc['total']:<4} ({cc['pct']:5.1f}%)")
    else:
        lines.append(" Claude: N/A")
    gc = cache_stats.get('gemini', {})
    if gc.get('total'):
        lines.append(f" Gemini: {gc['hits']:>4} / {gc['total']:<4} ({gc['pct']:5.1f}%)")
    else:
        lines.append(" Gemini: N/A")

    # Section 5
    lines.append("")
    lines.append("━━━━━ 【5】趨勢與洞察 (vs 7 日均) ━━━━━")
    lines.append(_trend_line("總 Tokens", trends['today_total_tokens'], trends['7d_avg_total']))
    lines.append(_trend_line("Gemini Tokens", trends['today_gemini_tokens'], trends['7d_avg_gemini']))
    lines.append(_trend_line("Ollama Tokens", trends['today_ollama_tokens'], trends['7d_avg_ollama']))
    lines.append(_trend_line("Claude Tokens", trends['today_claude_tokens'], trends['7d_avg_claude']))
    lines.append(_trend_line("平均延遲(ms)", trends['today_avg_duration'], trends['7d_avg_duration'], unit=''))

    lines.append("")
    lines.append(f"📈 7 日累計:{_fmt_kb(trends['7d_total_tokens'])} tokens / US$ {trends['7d_total_cost']:.2f}")

    # Section 6
    lines.append("")
    lines.append("━━━━━ 【6】告警與建議 ━━━━━")
    if alerts:
        for a in alerts:
            lines.append(f"{a['icon']} [{a['level']}] {_esc(a['title'])}")
            lines.append(f" 建議:{_esc(a['suggestion'])}")
    else:
        lines.append("✅ 無異常告警")

    if insights:
        lines.append("")
        lines.append("🔮 智能建議 (Hermes 規則引擎):")
        for ins in insights:
            lines.append(f" {ins['icon']} {_esc(ins['text'])}")

    # Footer
    lines.append("")
    lines.append("═══════════════════════════════════════")
    lines.append("🤖 Operation Ollama-First v5.0 / token_report v1.0")

    return "\n".join(lines)
+
+
def _format_failure_report(target_date: date, error: str) -> str:
    """Build the minimal message sent when report generation fails.

    Args:
        target_date: Date the failed report was for.
        error: Error description; at most its first 300 characters are shown.

    Returns:
        A short multi-line message with the error text HTML-escaped.
    """
    # Clip first, then escape: slicing an already-escaped string could cut an
    # HTML entity in half and leave invalid markup in the message.
    raw_error = "" if error is None else str(error)
    return (
        f"⚠️ LLM Token 日報生成失敗 ({target_date.isoformat()})\n"
        f"━━━━━━━━━━━━━━━━━━━━\n"
        f"錯誤:{_esc(raw_error[:300])}\n"
        f"請查 logs:docker logs momo-scheduler | grep TokenReport"
    )
+
+
def _persist_to_ai_insights(target_date: date, content: str, send_result: Dict[str, Any]) -> None:
    """Insert one ``ai_insights`` row of type ``daily_token_report``.

    The stored metadata holds only the target date and the ``sent`` /
    ``failed`` / ``chars`` counters from ``send_result``.

    Args:
        target_date: Report date; stored as ``period`` and in the metadata.
        content: Report text; only the first 8000 characters are stored.
        send_result: Delivery result; missing counters default to 0.

    Raises:
        Exception: Any error from execute/commit is re-raised after rollback.
    """
    from sqlalchemy import text
    from database.manager import get_session
    import json as _json

    period = target_date.isoformat()
    # Never store username / first_name / chat_id here.
    metadata = {'target_date': period}
    for counter in ('sent', 'failed', 'chars'):
        metadata[counter] = int(send_result.get(counter, 0))

    insert_sql = """
        INSERT INTO ai_insights (
            insight_type, period, content, metadata_json,
            avg_quality, status, decay_exempt, ai_model,
            created_by, created_at, updated_at
        ) VALUES (
            'daily_token_report', :period, :content, :meta,
            0.9, 'approved', TRUE, 'rule_engine',
            'token_report_service', NOW(), NOW()
        )
    """

    db = get_session()
    try:
        bind_params = {
            'period': period,
            'content': content[:8000],  # column is TEXT; the cap is a safety margin
            'meta': _json.dumps(metadata, ensure_ascii=False),
        }
        db.execute(text(insert_sql), bind_params)
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# 內部:格式化工具
+# ═══════════════════════════════════════════════════════════════════════════════
+
+def _esc(s: Any) -> str:
+ """HTML escape;對齊 telegram_templates._html_escape 行為。"""
+ text = "" if s is None else str(s)
+ return (text.replace("&", "&")
+ .replace("<", "<")
+ .replace(">", ">"))
+
+
+def _pad(s: str, width: int) -> str:
+ """中文寬字元 padding(中文字以 2 寬度計)。"""
+ visible = sum(2 if ord(c) > 127 else 1 for c in s)
+ return s + " " * max(0, width - visible)
+
+
+def _fmt_kb(tokens: int) -> str:
+ """token 數 → 1.2K / 3.4M 顯示。"""
+ n = int(tokens or 0)
+ if n >= 1_000_000:
+ return f"{n/1_000_000:.1f}M"
+ if n >= 1_000:
+ return f"{n/1_000:.0f}K"
+ return f"{n}"
+
+
+def _budget_line(label: str, spent: float, budget: float) -> str:
+ """產出單列預算進度條(10 格條)。"""
+ if budget <= 0:
+ return f"{label}: US$ {spent:6.2f} ({_pad('未設定預算', 10)})"
+ pct = min(100.0, spent / budget * 100.0)
+ filled = int(pct / 10)
+ bar = "▓" * filled + "░" * (10 - filled)
+ return f"{label}: US$ {spent:6.2f} {bar} {pct:3.0f}% / ${budget:.0f} 預算"
+
+
def _trend_line(label: str, today: float, baseline: float, unit: str = '') -> str:
    """Render one "today vs baseline" comparison row.

    The percentage change is computed only when the baseline is positive;
    otherwise it is shown as 0.0 with a dash in place of the arrow. The arrow
    points up from +5 %, down from -5 %, and is flat in between. Labels that
    contain ``'Tokens'`` use the compact token format; other labels use a
    thousands-separated integer followed by ``unit``.
    """
    current = float(today or 0)
    reference = float(baseline or 0)

    # Defaults cover the "no usable baseline" case.
    change_pct, prefix, arrow = 0.0, "", "—"
    if reference > 0:
        change_pct = (current - reference) / reference * 100.0
        prefix = "" if change_pct < 0 else "+"
        if change_pct >= 5:
            arrow = "↗"
        elif change_pct <= -5:
            arrow = "↘"
        else:
            arrow = "→"

    def _render(value: float) -> str:
        if 'Tokens' in label:
            return _fmt_kb(int(value))
        return f"{value:,.0f}{unit}"

    current_text = _render(current)
    reference_text = _render(reference)
    return f" {_pad(label, 14)} {current_text:>8} vs {reference_text:>8} ({prefix}{change_pct:5.1f}%) {arrow}"
diff --git a/tests/test_ai_call_logger.py b/tests/test_ai_call_logger.py
new file mode 100644
index 0000000..c6b2bc0
--- /dev/null
+++ b/tests/test_ai_call_logger.py
@@ -0,0 +1,426 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+tests/test_ai_call_logger.py
+ai_call_logger 單元測試 (Operation Ollama-First v5.0 — Phase 1)
+
+測試紀律 (對應 phase1 spec):
+ - context manager 正常路徑(status='ok')
+ - context manager 例外路徑(status='error',例外仍 re-raise)
+ - decorator 正常路徑 + auto token extract
+ - DB 失敗時主流程不爆
+ - cost 計算正確(gemini-2.5-flash / 未知 model fallback / NIM 免費)
+ - 環境開關 AI_CALL_LOGGING_ENABLED=false 時跳過寫入
+ - kill-switch 連續失敗 ≥ 10 次降級
+ - PII 保護:set_prompt_hash 只存前 12 碼
+"""
+
+import os
+import sys
+import time
+
+import pytest
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+# 隔離 import:避免被 ai_call_logger 內部 lazy import 的 database.manager 拖到
+import services.ai_call_logger as logger_mod
+from services.ai_call_logger import (
+ COST_TABLE,
+ _calc_cost,
+ _CallState,
+ _is_logging_enabled,
+ _reset_kill_switch,
+ log_ai_call,
+ logged_ai_call,
+)
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Fixtures
+# ─────────────────────────────────────────────────────────────────────────────
+
@pytest.fixture(autouse=True)
def reset_state(monkeypatch):
    """Reset the kill-switch and replace the DB writer with an in-memory recorder.

    Applied automatically to every test. Yields the list that collects one
    dict per ``_write_to_db`` call, so tests can inspect what would have been
    written.
    """
    _reset_kill_switch()

    records = []

    def _record_instead_of_write(state):
        # Snapshot the state into a plain dict; no real DB connection is made.
        snapshot = {
            'caller': state.caller,
            'provider': state.provider,
            'model': state.model,
            'input_tokens': state.input_tokens,
            'output_tokens': state.output_tokens,
            'duration_ms': state.duration_ms,
            'status': state.status,
            'fallback_to': state.fallback_to,
            'cost_usd': _calc_cost(state.model, state.input_tokens, state.output_tokens),
            'cache_hit': state.cache_hit,
            'rag_hit': state.rag_hit,
            'request_id': state.request_id,
            'error': state.error,
            'meta': dict(state.meta),
        }
        records.append(snapshot)

    monkeypatch.setattr(logger_mod, '_write_to_db', _record_instead_of_write)
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'true')

    # Hand the recorder to the test body.
    yield records
+
+
+def _wait_for_async(captured, n=1, timeout=2.0):
+ """等待 daemon thread 寫完。"""
+ deadline = time.time() + timeout
+ while time.time() < deadline:
+ if len(captured) >= n:
+ return True
+ time.sleep(0.01)
+ return False
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# context manager 測試
+# ─────────────────────────────────────────────────────────────────────────────
+
def test_context_manager_happy_path(reset_state):
    """A clean ``with`` block produces exactly one record with status 'ok'."""
    with log_ai_call('hermes_analyst', 'gcp_ollama', 'hermes3:latest') as call:
        call.set_tokens(input=120, output=80)
        call.set_cache_hit(False)

    assert _wait_for_async(reset_state, 1), "async write 未完成"
    assert len(reset_state) == 1

    record = reset_state[0]
    expected_fields = {
        'caller': 'hermes_analyst',
        'provider': 'gcp_ollama',
        'model': 'hermes3:latest',
        'input_tokens': 120,
        'output_tokens': 80,
        'status': 'ok',
    }
    for field, expected in expected_fields.items():
        assert record[field] == expected
    assert record['error'] is None
    assert record['duration_ms'] is not None and record['duration_ms'] >= 0
+
+
def test_context_manager_exception_path(reset_state):
    """An exception inside the block is re-raised and recorded as status 'error'."""
    with pytest.raises(ValueError, match="boom"):
        with log_ai_call('nemotron_dispatch', 'nim', 'meta/llama-3.1-8b-instruct'):
            raise ValueError("boom")

    assert _wait_for_async(reset_state, 1)
    record = reset_state[0]
    assert record['status'] == 'error'
    assert record['error'] is not None
    # The recorded error text carries both the exception type and its message.
    for fragment in ('ValueError', 'boom'):
        assert fragment in record['error']
+
+
def test_context_manager_explicit_fallback(reset_state):
    """``fallback_to_caller`` marks the record as 'fallback' with its target."""
    with log_ai_call('openclaw_qa', 'gemini', 'gemini-2.5-flash') as call:
        call.fallback_to_caller('openclaw_bot_nim')

    assert _wait_for_async(reset_state, 1)
    record = reset_state[0]
    assert (record['status'], record['fallback_to']) == ('fallback', 'openclaw_bot_nim')
+
+
def test_context_manager_set_error_without_raise(reset_state):
    """``set_error`` without raising (e.g. the LLM returned success=false) still records 'error'."""
    with log_ai_call('sales_copy', 'gcp_ollama', 'llama3.1:8b') as call:
        call.set_error('timeout after 30s')
        call.set_tokens(input=50, output=0)

    assert _wait_for_async(reset_state, 1)
    record = reset_state[0]
    assert record['status'] == 'error'
    assert 'timeout' in record['error']
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# decorator 測試
+# ─────────────────────────────────────────────────────────────────────────────
+
def test_decorator_happy_path(reset_state):
    """The decorator passes the return value through and extracts Ollama-style token counts."""

    @logged_ai_call(caller='trend_match', provider='gcp_ollama', model='llama3.1:8b')
    def ollama_like(prompt: str):
        return {'response': 'ok', 'eval_count': 42, 'prompt_eval_count': 100}

    reply = ollama_like("hello")
    assert reply['response'] == 'ok'

    assert _wait_for_async(reset_state, 1)
    record = reset_state[0]
    expected_fields = {
        'caller': 'trend_match',
        'model': 'llama3.1:8b',
        'input_tokens': 100,
        'output_tokens': 42,
        'status': 'ok',
    }
    for field, expected in expected_fields.items():
        assert record[field] == expected
+
+
def test_decorator_with_model_extractor(reset_state):
    """``model_extractor`` picks the model from call kwargs; OpenAI-style usage is parsed."""

    def pick_model(args, kw):
        return kw.get('model', 'gemini-2.0-flash')

    @logged_ai_call(
        caller='ppt_gemini',
        provider='gemini',
        model_extractor=pick_model,
    )
    def openai_like(*, model: str, prompt: str):
        return {'usage': {'prompt_tokens': 200, 'completion_tokens': 50}}

    openai_like(model='gemini-2.5-flash', prompt='x')

    assert _wait_for_async(reset_state, 1)
    record = reset_state[0]
    assert record['model'] == 'gemini-2.5-flash'
    assert (record['input_tokens'], record['output_tokens']) == (200, 50)
+
+
def test_decorator_exception_does_reraise(reset_state):
    """An exception in the decorated function propagates and is recorded as 'error'."""

    @logged_ai_call(caller='code_review_hermes', provider='gcp_ollama', model='hermes3:latest')
    def always_fails():
        raise RuntimeError("net down")

    with pytest.raises(RuntimeError, match="net down"):
        always_fails()

    assert _wait_for_async(reset_state, 1)
    first_record = reset_state[0]
    assert first_record['status'] == 'error'
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# DB 失敗不爆主流程
+# ─────────────────────────────────────────────────────────────────────────────
+
def test_db_failure_does_not_break_main_flow(monkeypatch, caplog):
    """A failing DB write must not surface in the caller's flow.

    The writer is replaced by a stand-in that raises internally, records the
    failure and logs a warning. The thread class is replaced by a synchronous
    shim so the stand-in runs inline and its warning can be observed.
    """
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'true')

    class InlineThread:
        """Runs the target in ``start()`` on the calling thread."""

        def __init__(self, target=None, args=(), kwargs=None, **_):
            self._target = target
            self._args = args
            self._kwargs = kwargs or {}

        def start(self):
            self._target(*self._args, **self._kwargs)

    monkeypatch.setattr(logger_mod.threading, 'Thread', InlineThread)

    # Overrides the recorder installed by the autouse fixture.
    def failing_writer(state):
        try:
            raise ImportError("simulated DB unavailable")
        except Exception as exc:
            logger_mod._record_failure()
            logger_mod.logger.warning(
                "[AICallLogger] write failed (caller=%s provider=%s): %s",
                state.caller, state.provider, exc,
            )

    monkeypatch.setattr(logger_mod, '_write_to_db', failing_writer)

    # The with-block itself must complete without raising.
    with caplog.at_level('WARNING'):
        with log_ai_call('hermes_intent', 'gcp_ollama', 'hermes3:latest') as call:
            call.set_tokens(input=10, output=5)

    # At least one "write failed" warning was logged.
    warned = any('write failed' in entry.message for entry in caplog.records)
    assert warned, \
        "預期 _write_to_db 失敗時 log warning"
+
+
def test_async_dispatch_failure_swallowed(monkeypatch):
    """Even if creating the writer thread fails, the caller's flow must not raise."""

    class UnstartableThread:
        def __init__(self, *args, **kwargs):
            raise OSError("can't fork")

    monkeypatch.setattr(logger_mod.threading, 'Thread', UnstartableThread)
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'true')

    # Passing means no exception escapes the with-block.
    with log_ai_call('x', 'y', 'z'):
        pass
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# cost 計算
+# ─────────────────────────────────────────────────────────────────────────────
+
def test_calc_cost_gemini_flash():
    """gemini-2.5-flash: 1M input + 100K output = $0.075 + $0.030 = $0.105."""
    actual = _calc_cost('gemini-2.5-flash', 1_000_000, 100_000)
    assert actual == pytest.approx(0.105, rel=1e-6)
+
+
def test_calc_cost_claude_opus():
    """claude-opus-4-7: 1K input + 1K output = $0.015 + $0.075 = $0.090."""
    in_tokens, out_tokens = 1000, 1000
    want = (in_tokens * 15.0 + out_tokens * 75.0) / 1_000_000
    got = _calc_cost('claude-opus-4-7', in_tokens, out_tokens)
    assert got == pytest.approx(want, rel=1e-6)
+
+
def test_calc_cost_ollama_zero():
    """Ollama models cost nothing regardless of token volume."""
    free_cases = (
        ('hermes3:latest', 100_000, 100_000),
        ('llama3.1:8b', 999_999, 999_999),
    )
    for model, tokens_in, tokens_out in free_cases:
        assert _calc_cost(model, tokens_in, tokens_out) == 0.0
+
+
def test_calc_cost_unknown_model_returns_zero(caplog):
    """An unknown model yields a cost of 0.0 and logs an 'unknown model cost' warning."""
    with caplog.at_level('WARNING'):
        result = _calc_cost('totally-fake-model-xyz', 1_000_000, 1_000_000)
    assert result == 0.0
    messages = [entry.message for entry in caplog.records]
    assert any('unknown model cost' in msg for msg in messages)
+
+
def test_calc_cost_nim_prefix_silent_zero(caplog):
    """A model under the ``nvidia/`` prefix costs 0.0 without an 'unknown model cost' warning."""
    with caplog.at_level('WARNING'):
        result = _calc_cost('nvidia/some-future-model', 1_000_000, 1_000_000)
    assert result == 0.0
    messages = [entry.message for entry in caplog.records]
    assert not any('unknown model cost' in msg for msg in messages)
+
+
def test_calc_cost_negative_or_none_safe():
    """None tokens, an empty model name and negative tokens all yield 0.0."""
    degenerate_inputs = (
        ('gemini-2.5-flash', None, None),
        ('', 100, 100),
        ('gemini-2.5-flash', -1, -5),
    )
    for model, tokens_in, tokens_out in degenerate_inputs:
        assert _calc_cost(model, tokens_in, tokens_out) == 0.0
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# 環境開關
+# ─────────────────────────────────────────────────────────────────────────────
+
def test_logging_disabled_skips_write(monkeypatch):
    """With AI_CALL_LOGGING_ENABLED=false the writer is never invoked."""
    written_states = []

    monkeypatch.setattr(logger_mod, '_write_to_db', lambda state: written_states.append(state))
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'false')

    with log_ai_call('sales_copy', 'gcp_ollama', 'llama3.1:8b') as call:
        call.set_tokens(input=10, output=10)

    # Give a background write (if any were started) time to land.
    time.sleep(0.05)
    assert len(written_states) == 0, "AI_CALL_LOGGING_ENABLED=false 時不應寫入"
+
+
def test_logging_enabled_default_true(monkeypatch):
    """Logging is on when the variable is unset; '0' and 'OFF' disable it; 'true' enables it."""
    monkeypatch.delenv('AI_CALL_LOGGING_ENABLED', raising=False)
    assert _is_logging_enabled() is True

    for raw_value, expected in (('0', False), ('OFF', False), ('true', True)):
        monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', raw_value)
        assert _is_logging_enabled() is expected
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Kill-switch
+# ─────────────────────────────────────────────────────────────────────────────
+
def test_kill_switch_after_consecutive_failures(monkeypatch, caplog):
    """After 10 recorded failures the kill-switch trips and no writer thread is created."""
    _reset_kill_switch()
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'true')

    # Record the failures directly instead of provoking real write errors.
    failures_needed = 10
    for _ in range(failures_needed):
        logger_mod._record_failure()

    assert logger_mod._is_killed() is True

    # Any Thread construction from here on is tracked.
    thread_targets = []

    class SpyThread:
        def __init__(self, *args, **kwargs):
            thread_targets.append(kwargs.get('target'))

        def start(self):
            pass

    monkeypatch.setattr(logger_mod.threading, 'Thread', SpyThread)

    with log_ai_call('x', 'y', 'z'):
        pass

    time.sleep(0.05)
    assert len(thread_targets) == 0, "kill-switch 啟動後不應再開新 thread"
+
+
def test_record_success_resets_failure_counter():
    """One recorded success clears the consecutive-failure counter."""
    _reset_kill_switch()
    failure_count = 5
    for _ in range(failure_count):
        logger_mod._record_failure()
    assert logger_mod._failure_state['count'] == failure_count

    logger_mod._record_success()
    assert logger_mod._failure_state['count'] == 0
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# PII 保護
+# ─────────────────────────────────────────────────────────────────────────────
+
def test_set_prompt_hash_truncates_to_12():
    """``set_prompt_hash`` stores a 12-character value that is not the raw prompt."""
    call_state = _CallState('a', 'b', 'c', None, {})
    call_state.set_prompt_hash('Hello world some sensitive PII content here')

    assert 'prompt_hash' in call_state.meta
    stored = call_state.meta['prompt_hash']
    assert len(stored) == 12
    # The stored value must not contain the original text.
    assert 'Hello' not in stored
+
+
def test_meta_does_not_leak_raw_prompt_into_call_state():
    """The prompt enters ``meta`` only via ``set_prompt_hash``; no raw 'prompt' key appears."""
    with log_ai_call('x', 'y', 'z', meta={'temperature': 0.3}) as call:
        call.set_prompt_hash("super secret user prompt 123")
        recorded_meta = call.meta
        assert 'prompt_hash' in recorded_meta
        assert recorded_meta['temperature'] == 0.3
        # No 'prompt' key unless the caller adds one itself.
        assert 'prompt' not in recorded_meta
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# 雜項:cost table 鍵值完整性
+# ─────────────────────────────────────────────────────────────────────────────
+
def test_cost_table_contains_critical_models():
    """Every model on the critical list must have an entry in COST_TABLE."""
    required_models = (
        'gemini-2.5-flash',
        'gemini-2.0-flash',
        'meta/llama-3.1-8b-instruct',
        'hermes3:latest',
        'qwen2.5-coder:7b',
        'llama3.1:8b',
        'bge-m3:latest',
    )
    for model_name in required_models:
        assert model_name in COST_TABLE, f"COST_TABLE missing {model_name}"
diff --git a/tests/test_token_report_service.py b/tests/test_token_report_service.py
new file mode 100644
index 0000000..e8ff7f2
--- /dev/null
+++ b/tests/test_token_report_service.py
@@ -0,0 +1,526 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+tests/test_token_report_service.py
+LLM Token 日報服務單元測試 (Operation Ollama-First v5.0 — Phase 1 收尾)
+
+測試紀律:
+ - 不真連 DB:mock _exec_query 返回固定資料
+ - 不真連 Telegram:mock send_telegram_with_result
+ - 不真寫 ai_insights:mock _persist_to_ai_insights
+ - 7 個告警規則各自獨立觸發測試
+ - HTML escape 驗證(caller 名含 < / & 不破版)
+ - 訊息字數 ≤ 4096 驗證
+"""
+
+from __future__ import annotations
+
+import os
+import sys
+from datetime import date, datetime, timedelta, timezone
+from typing import Any, Dict, List
+
+import pytest
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import services.token_report_service as svc
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# 共用 fixtures
+# ─────────────────────────────────────────────────────────────────────────────
+
+TARGET_DATE = date(2026, 5, 3)
+
+
+def _make_summary(**overrides) -> Dict[str, Any]:
+ base = {
+ 'total_tokens': 3_142_891,
+ 'total_calls': 2_847,
+ 'total_cost_usd': 0.36,
+ 'avg_duration_ms': 1847.0,
+ 'success_rate': 98.7,
+ 'failed_calls': 37,
+ 'ollama_pct': 64.3,
+ 'prev_total_tokens': 2_905_000,
+ 'wow_pct': 8.2,
+ }
+ base.update(overrides)
+ return base
+
+
+def _make_by_provider(**overrides) -> List[Dict[str, Any]]:
+ """7 個 provider 的預設配置,可用 overrides={'gemini': {'pct': 50}} 覆寫"""
+ defaults = {
+ 'gcp_ollama': {'tokens': 2_021_000, 'pct': 64.3, 'calls': 2103, 'cost_usd': 0.0, 'avg_duration_ms': 1200},
+ 'ollama_111': {'tokens': 12_000, 'pct': 0.4, 'calls': 18, 'cost_usd': 0.0, 'avg_duration_ms': 2400},
+ 'gemini': {'tokens': 892_000, 'pct': 28.4, 'calls': 589, 'cost_usd': 0.31, 'avg_duration_ms': 2100},
+ 'claude': {'tokens': 178_000, 'pct': 5.7, 'calls': 98, 'cost_usd': 0.04, 'avg_duration_ms': 3200},
+ 'nim': {'tokens': 28_000, 'pct': 0.9, 'calls': 24, 'cost_usd': 0.0, 'avg_duration_ms': 1800},
+ 'openrouter': {'tokens': 12_000, 'pct': 0.4, 'calls': 15, 'cost_usd': 0.01, 'avg_duration_ms': 2900},
+ 'nim_via_elephant': {'tokens': 27_000, 'pct': 0.9, 'calls': 12, 'cost_usd': 0.0, 'avg_duration_ms': 3100},
+ }
+ for k, v in (overrides or {}).items():
+ defaults.setdefault(k, {}).update(v)
+ return [{'provider': k, **v} for k, v in defaults.items()]
+
+
+def _make_top_callers() -> List[Dict[str, Any]]:
+ return [
+ {'caller': 'km_embedding_worker', 'provider': 'gcp_ollama',
+ 'model': 'bge-m3:latest', 'tokens': 892_000, 'calls': 1247, 'delta_pct': 5.0},
+ {'caller': 'hermes_analyst', 'provider': 'gcp_ollama',
+ 'model': 'hermes3:latest', 'tokens': 482_000, 'calls': 72, 'delta_pct': -2.0},
+ {'caller': 'code_review_hermes', 'provider': 'claude',
+ 'model': 'claude-opus-4-7', 'tokens': 158_000, 'calls': 8, 'delta_pct': 42.0},
+ ]
+
+
+def _make_trends() -> Dict[str, Any]:
+ return {
+ 'today_total_tokens': 3_142_000,
+ 'today_gemini_tokens': 892_000,
+ 'today_ollama_tokens': 2_033_000,
+ 'today_claude_tokens': 178_000,
+ 'today_avg_duration': 1847.0,
+ 'today_error_rate': 1.3,
+ 'today_gcp_hit_pct': 99.6,
+ '7d_avg_total': 2_905_000,
+ '7d_avg_gemini': 948_000,
+ '7d_avg_ollama': 1_712_000,
+ '7d_avg_claude': 165_000,
+ '7d_avg_duration': 1920.0,
+ '7d_error_rate': 1.8,
+ '7d_total_tokens': 18_832_000,
+ '7d_total_cost': 11.84,
+ '7d_gcp_hit_pct_7d': 98.9,
+ '7d_gcp_hit_pct': 98.9,
+ }
+
+
+def _make_budgets(**overrides) -> Dict[str, Any]:
+ base = {
+ 'daily_spent': 0.36,
+ 'weekly_spent': 1.92,
+ 'monthly_spent': 5.84,
+ 'daily_budget': 1.00,
+ 'weekly_budget': 5.00,
+ 'monthly_budget': 20.00,
+ }
+ base.update(overrides)
+ return base
+
+
+def _make_cache_stats(**overrides) -> Dict[str, Any]:
+ base = {
+ 'claude': {'total': 98, 'hits': 62, 'pct': 63.3},
+ 'gemini': {'total': 0, 'hits': 0, 'pct': 0.0},
+ }
+ base.update(overrides)
+ return base
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# 1. 報表組裝測試 — generate_daily_report 路徑
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestReportFormat:
+ """測 _format_report 主要章節都出現 & 字數合理。"""
+
+ def test_format_report_contains_all_six_sections(self):
+ """6 個段落標題都應出現。"""
+ out = svc._format_report(
+ target_date=TARGET_DATE,
+ summary=_make_summary(),
+ by_provider=_make_by_provider(),
+ top_callers=_make_top_callers(),
+ costs=[{'provider': 'gemini', 'model': 'gemini-2.5-flash', 'cost_usd': 0.26, 'calls': 50}],
+ trends=_make_trends(),
+ budgets=_make_budgets(),
+ cache_stats=_make_cache_stats(),
+ alerts=[],
+ insights=[{'icon': '✅', 'text': 'Ollama-First 達標'}],
+ )
+ assert '【1】今日總覽' in out
+ assert '【2】供應商分布' in out
+ assert '【3】呼叫點 TOP' in out
+ assert '【4】成本分析' in out
+ assert '【5】趨勢與洞察' in out
+ assert '【6】告警與建議' in out
+
+ def test_format_report_under_telegram_limit(self):
+ """完整報表(含 10 個 caller / 12 個成本項 / 多個告警)不應超過 4096 字元。"""
+ big_callers = _make_top_callers() * 4 # 12 筆
+ big_costs = [{'provider': 'p', 'model': f'model-{i}', 'cost_usd': 0.01, 'calls': 1}
+ for i in range(12)]
+ big_alerts = [
+ {'level': 'P1', 'icon': '🔴', 'title': 'X' * 80, 'suggestion': 'Y' * 80}
+ for _ in range(5)
+ ]
+ out = svc._format_report(
+ target_date=TARGET_DATE,
+ summary=_make_summary(),
+ by_provider=_make_by_provider(),
+ top_callers=big_callers[:10],
+ costs=big_costs,
+ trends=_make_trends(),
+ budgets=_make_budgets(),
+ cache_stats=_make_cache_stats(),
+ alerts=big_alerts,
+ insights=[],
+ )
+ # send_daily_report 端會做 4000 字截斷(HTML 安全),單元測試先確認原始長度可控
+ assert len(out) < 6000, f"原始報表 {len(out)} 字元,可能需縮減欄位寬度"
+
+ def test_format_report_html_escape_caller_name(self):
+ """caller 名含