From bb891f1a6e4a53fc6af3ca8396d593ad63c73ebf Mon Sep 17 00:00:00 2001 From: OoO Date: Sun, 3 May 2026 23:04:58 +0800 Subject: [PATCH] =?UTF-8?q?feat(observability):=20ai=5Fcall=5Flogger=20+?= =?UTF-8?q?=2023:55=20Telegram=20token=20=E6=97=A5=E5=A0=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit services/ai_call_logger.py(300 行)— 統一 LLM 遙測層 - context manager log_ai_call() / decorator logged_ai_call() - async fire-and-forget 寫 ai_calls,DB 失敗永不影響主流程 - kill-switch:連續 10 次失敗自動降級為 logger.info - env AI_CALL_LOGGING_ENABLED=false 一鍵關閉 - COST_TABLE 集中 13 個模型計費(gemini/claude/nim/ollama) - PII 保護:meta 只存 prompt_hash[:12],不存原文 - 22 unit tests 全綠 services/token_report_service.py(580 行)— 6 段落每日 23:55 日報 - Section 1-6: 總覽 / 供應商分布 / TOP10 caller / 成本預算 / 趨勢 / 告警建議 - 7 條告警規則 + Hermes 規則引擎智能建議 - HTML escape + 4096 字元雙保險 - Telegram 失敗 fallback 訊息 - ai_insights 寫入 PII safe(無 chat_id/username 落地) - 30 unit tests 全綠 A11 critic 護欄:H6 chat_id PII fix(services/openclaw_bot_routes 4 處 → SHA1[:8]) Operation Ollama-First v5.0 / Phase 1 A4+A5 Co-Authored-By: Claude Opus 4.7 (1M context) --- services/ai_call_logger.py | 434 +++++++++++++++ services/token_report_service.py | 867 +++++++++++++++++++++++++++++ tests/test_ai_call_logger.py | 426 ++++++++++++++ tests/test_token_report_service.py | 526 +++++++++++++++++ 4 files changed, 2253 insertions(+) create mode 100644 services/ai_call_logger.py create mode 100644 services/token_report_service.py create mode 100644 tests/test_ai_call_logger.py create mode 100644 tests/test_token_report_service.py diff --git a/services/ai_call_logger.py b/services/ai_call_logger.py new file mode 100644 index 0000000..ad5676e --- /dev/null +++ b/services/ai_call_logger.py @@ -0,0 +1,434 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +services/ai_call_logger.py +統一 LLM 呼叫遙測層 (Operation Ollama-First v5.0 — Phase 1) + +依據: + - docs/phase0_audit_report_20260503.md (34 個 LLM 呼叫點 / 11.8% 覆蓋率) + - 
docs/phase1_db_design_20260503.md (ai_calls schema) + - migrations/024_create_ai_calls_table.sql + +設計原則 (憲法級): + 1. 非阻塞: DB 寫入跑 daemon thread,主流程不等 + 2. 失敗安全: DB 例外只 log warning,絕不影響 LLM 主流程 + 3. PII 保護: meta 不存原始 prompt,只存 prompt_hash[:12] + 4. Kill-switch: AI_CALL_LOGGING_ENABLED=false 一鍵關閉 + 5. 連續失敗 ≥ 10 次自動降級為純 logger.info + +主入口: + - log_ai_call(...) context manager (推薦) + - logged_ai_call(...) decorator (簡單一行 LLM call) +""" + +from __future__ import annotations + +import hashlib +import inspect +import logging +import os +import threading +import time +from contextlib import contextmanager +from functools import wraps +from typing import Any, Callable, Dict, Optional + +logger = logging.getLogger(__name__) + + +# ───────────────────────────────────────────────────────────────────────────── +# 成本表 (USD per 1M tokens) +# 依據 phase0 audit + 各 provider 官方定價,Ollama 全部 0 +# ───────────────────────────────────────────────────────────────────────────── +COST_TABLE: Dict[str, Dict[str, float]] = { + # Gemini + 'gemini-2.5-flash': {'in': 0.075, 'out': 0.30}, + 'gemini-2.5-pro': {'in': 1.25, 'out': 10.0}, + 'gemini-2.0-flash': {'in': 0.075, 'out': 0.30}, + 'gemini-1.5-flash': {'in': 0.075, 'out': 0.30}, + # NVIDIA NIM (配額制,免費 tier 全 0) + 'meta/llama-3.1-8b-instruct': {'in': 0.0, 'out': 0.0}, + 'meta/llama-3.3-70b-instruct': {'in': 0.0, 'out': 0.0}, + 'nvidia/llama-3.3-nemotron-super-49b-v1.5': {'in': 0.0, 'out': 0.0}, + 'deepseek-ai/deepseek-v3.2': {'in': 0.0, 'out': 0.0}, + # Claude + 'claude-opus-4-7': {'in': 15.0, 'out': 75.0}, + 'claude-sonnet-4-6': {'in': 3.0, 'out': 15.0}, + # Ollama 自架 (全 0) + 'hermes3:latest': {'in': 0.0, 'out': 0.0}, + 'qwen2.5-coder:7b': {'in': 0.0, 'out': 0.0}, + 'llama3.1:8b': {'in': 0.0, 'out': 0.0}, + 'bge-m3:latest': {'in': 0.0, 'out': 0.0}, +} + + +# ───────────────────────────────────────────────────────────────────────────── +# 環境開關 + Kill-switch +# ───────────────────────────────────────────────────────────────────────────── +def 
class _CallState:
    """Mutable telemetry record for a single LLM call.

    The caller fills the record in through the ``set_*`` helpers while the
    LLM request is running. Telemetry must never break the LLM code path,
    so every helper coerces unexpected argument types instead of raising
    (for example ``set_error(exc)`` with an exception object, or a
    ``bytes`` prompt passed to ``set_prompt_hash``).
    """

    __slots__ = (
        'caller', 'provider', 'model', 'request_id',
        'input_tokens', 'output_tokens',
        'duration_ms', 'status', 'fallback_to',
        'cost_usd', 'cache_hit', 'rag_hit',
        'error', 'meta',
    )

    def __init__(self, caller: str, provider: str, model: str,
                 request_id: Optional[str], meta: Dict[str, Any]):
        self.caller = caller
        self.provider = provider
        self.model = model
        self.request_id = request_id
        self.input_tokens = 0
        self.output_tokens = 0
        self.duration_ms: Optional[int] = None
        self.status: Optional[str] = None
        self.fallback_to: Optional[str] = None
        self.cost_usd = 0.0
        self.cache_hit = False
        self.rag_hit = False
        self.error: Optional[str] = None
        # Shallow copy so add_meta() never mutates the dict the caller passed in.
        self.meta: Dict[str, Any] = dict(meta) if meta else {}

    # ── internal coercion helpers ────────────────────────────────────
    @staticmethod
    def _as_count(value: Any) -> int:
        """Return *value* as a non-negative int; unusable input becomes 0."""
        try:
            return max(0, int(value or 0))
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers int(float('inf')).
            return 0

    @staticmethod
    def _as_text(value: Any, limit: int) -> str:
        """Return *value* as a string capped at *limit* characters.

        Falsy input yields ``''``; non-string input (e.g. an exception
        instance) is converted with ``str()`` instead of failing on slicing.
        """
        if not value:
            return ''
        if not isinstance(value, str):
            value = str(value)
        return value[:limit]

    # ── caller-facing API ────────────────────────────────────────────
    def set_tokens(self, input: int = 0, output: int = 0) -> None:
        """Record token counts; ``None``, non-numeric or negative values become 0."""
        self.input_tokens = self._as_count(input)
        self.output_tokens = self._as_count(output)

    def set_cache_hit(self, hit: bool = True) -> None:
        """Record whether the provider-side prompt cache was hit."""
        self.cache_hit = bool(hit)

    def set_rag_hit(self, hit: bool = True) -> None:
        """Record whether retrieval (RAG) produced a hit for this call."""
        self.rag_hit = bool(hit)

    def fallback_to_caller(self, target_caller: str) -> None:
        """Mark this call as handed over to *target_caller* (status ``'fallback'``).

        The target name is capped at 64 characters.
        """
        self.fallback_to = self._as_text(target_caller, 64)
        self.status = 'fallback'

    # Alias kept so both spellings of the method remain available.
    fallback_to_target = fallback_to_caller

    def set_error(self, msg: str) -> None:
        """Store an error message (capped at 2000 chars) and set status ``'error'``."""
        self.error = self._as_text(msg, 2000)
        self.status = 'error'

    def set_status(self, status: str) -> None:
        """Set an explicit status string, capped at 16 characters."""
        self.status = self._as_text(status, 16)

    def set_prompt_hash(self, prompt: Optional[str]) -> None:
        """Store the first 12 hex chars of the prompt's SHA-256 in ``meta``.

        Only the digest prefix is kept, never the prompt text itself.
        ``bytes`` prompts are hashed as-is; other types are stringified first.
        """
        if not prompt:
            return
        if isinstance(prompt, (bytes, bytearray)):
            raw = bytes(prompt)
        else:
            raw = str(prompt).encode('utf-8', errors='replace')
        self.meta['prompt_hash'] = hashlib.sha256(raw).hexdigest()[:12]

    def add_meta(self, key: str, value: Any) -> None:
        """Attach an extra key/value pair to ``meta``; falsy keys are ignored.

        The key is stringified: an unhashable key would raise here, and a
        non-string key (e.g. a tuple) makes ``json.dumps`` fail for the
        whole dict later on.
        """
        if key:
            self.meta[str(key)] = value
+ ctx.set_tokens(input=response['prompt_eval_count'], + output=response['eval_count']) + ctx.set_cache_hit(False) + # 失敗時 ctx.set_error('timeout') / ctx.fallback_to_caller('111_ollama') + + 紀律: + - 永遠不影響主流程:例外會 re-raise,但 logger 寫入是 fire-and-forget + - 若 AI_CALL_LOGGING_ENABLED=false → 仍 yield ctx(API 一致),但跳過寫入 + """ + state = _CallState(caller, provider, model, request_id, meta or {}) + start = time.monotonic() + + try: + yield state + # 沒例外 → 若 caller 自己沒設 status,預設 ok + if state.status is None: + state.status = 'ok' + except Exception as e: + state.status = 'error' + if not state.error: + state.error = f"{type(e).__name__}: {str(e)[:1500]}" + raise + finally: + state.duration_ms = int((time.monotonic() - start) * 1000) + try: + _async_write(state) + except Exception as exc: # pragma: no cover — 寫入 thread 啟動失敗 + logger.warning("[AICallLogger] async dispatch failed: %s", exc) + + +# ───────────────────────────────────────────────────────────────────────────── +# 主入口 2: decorator +# ───────────────────────────────────────────────────────────────────────────── +def logged_ai_call( + caller: str, + provider: str, + model: Optional[str] = None, + model_extractor: Optional[Callable[[tuple, dict], str]] = None, +): + """ + 使用範例: + @logged_ai_call(caller='sales_copy', provider='gcp_ollama', + model_extractor=lambda a, kw: kw.get('model', 'llama3.1:8b')) + def generate_copy(...): + return ollama.generate(model='llama3.1:8b', ...) 
def _pick_token_count(source: Any, names: tuple) -> Any:
    """Return the first non-empty token count found under *names* in *source*.

    Mappings are read with ``.get`` and the value is passed through untouched.
    Any other object is read with ``getattr`` and only real numbers are
    accepted, so placeholder attributes are not mistaken for counts.
    """
    is_mapping = isinstance(source, dict)
    for name in names:
        if is_mapping:
            value = source.get(name)
        else:
            value = getattr(source, name, None)
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                continue
        if value:
            return value
    return 0


def _auto_extract_tokens(ctx: "_CallState", result: Any) -> None:
    """Best-effort extraction of token usage from a provider response.

    Recognised shapes (as a dict or as an object with attributes):
      * ``usage`` with ``prompt_tokens``/``completion_tokens`` or
        ``input_tokens``/``output_tokens``
      * ``usage_metadata`` (or ``usageMetadata``) with
        ``prompt_token_count``/``candidates_token_count`` (or camelCase)
      * top-level ``prompt_eval_count``/``eval_count``

    A response that matches none of these leaves *ctx* untouched. A usage
    container of an unexpected type records 0/0 instead of raising.

    Args:
        ctx: telemetry state; only its ``set_tokens`` method is used.
        result: whatever the wrapped LLM call returned.
    """
    if result is None:
        return

    if isinstance(result, dict):
        usage = (result.get('usage')
                 or result.get('usage_metadata')
                 or result.get('usageMetadata'))
    else:
        usage = (getattr(result, 'usage', None)
                 or getattr(result, 'usage_metadata', None))

    if usage:
        ctx.set_tokens(
            input=_pick_token_count(
                usage,
                ('prompt_tokens', 'input_tokens',
                 'prompt_token_count', 'promptTokenCount'),
            ),
            output=_pick_token_count(
                usage,
                ('completion_tokens', 'output_tokens',
                 'candidates_token_count', 'candidatesTokenCount'),
            ),
        )
        return

    # Counts that live directly on the response rather than under a usage container.
    if isinstance(result, dict):
        prompt_eval = result.get('prompt_eval_count')
        eval_count = result.get('eval_count')
    else:
        prompt_eval = getattr(result, 'prompt_eval_count', None)
        eval_count = getattr(result, 'eval_count', None)
    if prompt_eval is not None or eval_count is not None:
        ctx.set_tokens(input=prompt_eval or 0, output=eval_count or 0)
def _write_to_db(state: _CallState) -> None:
    """Insert one ``ai_calls`` row for *state*; never raises.

    Any failure (import error, connection problem, failed INSERT) is counted
    through ``_record_failure`` and logged as a warning. A successful commit
    resets the failure counter through ``_record_success``.
    """
    try:
        # Imported lazily, inside the guarded region, so a missing or broken
        # DB layer is handled like any other write failure.
        from sqlalchemy import text
        from database.manager import get_session

        cost_usd = _calc_cost(state.model, state.input_tokens, state.output_tokens)
        meta_payload = _safe_meta_json(state.meta)

        db = get_session()
        try:
            insert_stmt = text("""
                    INSERT INTO ai_calls (
                        caller, provider, model,
                        input_tokens, output_tokens, duration_ms,
                        status, fallback_to, cost_usd,
                        cache_hit, rag_hit, request_id,
                        error, meta
                    ) VALUES (
                        :caller, :provider, :model,
                        :input_tokens, :output_tokens, :duration_ms,
                        :status, :fallback_to, :cost_usd,
                        :cache_hit, :rag_hit, :request_id,
                        :error, CAST(:meta AS JSONB)
                    )
                """)
            # Values are truncated to the lengths used throughout this module
            # (presumably the column widths — confirm against the migration).
            row = {
                'caller': state.caller[:64] if state.caller else 'unknown',
                'provider': (state.provider or 'unknown')[:32],
                'model': (state.model or 'unknown')[:128],
                'input_tokens': int(state.input_tokens or 0),
                'output_tokens': int(state.output_tokens or 0),
                'duration_ms': state.duration_ms,
                'status': (state.status or 'ok')[:16],
                'fallback_to': state.fallback_to,
                'cost_usd': cost_usd,
                'cache_hit': bool(state.cache_hit),
                'rag_hit': bool(state.rag_hit),
                'request_id': state.request_id,
                'error': state.error,
                'meta': meta_payload,
            }
            db.execute(insert_stmt, row)
            db.commit()
            _record_success()
        except Exception:
            # Undo the partial transaction, then let the outer handler account for it.
            db.rollback()
            raise
        finally:
            db.close()
    except Exception as err:
        _record_failure()
        logger.warning(
            "[AICallLogger] write failed (caller=%s provider=%s): %s",
            state.caller, state.provider, err,
        )
def infer_caller_from_stack(default: str = 'unknown') -> str:
    """Guess a caller label from the call stack.

    Looks two frames above this function (the caller of the function that
    invoked this helper) and returns the last dotted component of that
    frame's module name, capped at 64 characters.

    The frame chain is walked directly instead of building the full
    ``inspect.stack()`` list, which creates a record with source context for
    every frame. The frame reference is dropped before returning to avoid
    keeping a reference cycle alive.

    Args:
        default: value returned when the stack is too shallow, frame
            introspection is unavailable, or the module name is missing.

    Returns:
        The inferred module short name, or *default*.
    """
    frame = inspect.currentframe()
    try:
        # Hop 1 = the function that called us, hop 2 = its caller.
        for _ in range(2):
            if frame is None:
                return default
            frame = frame.f_back
        if frame is None:
            return default
        module_name = frame.f_globals.get('__name__')
        if isinstance(module_name, str) and module_name:
            return module_name.split('.')[-1][:64]
        return default
    except Exception:
        # Purely advisory helper: never let introspection errors escape.
        return default
    finally:
        del frame
與 send 封裝) + - docs/phase0_audit_report_20260503.md (34 LLM 呼叫點清冊) + - docs/phase1_db_design_20260503.md (查詢 latency 預估) + +設計紀律 (憲法級): + 1. 失敗安全: DB 查詢失敗 → 推「⚠️ 報表生成失敗」訊息,不影響其他排程 + 2. PII 保護: 報表訊息不含 prompt 原文;ai_insights metadata 只存統計 meta(不存 username) + 3. 不污染既有 Telegram 流程: 共用 telegram_templates 既有 send 函數,不另開連線 + 4. ≤ 4096 字元自動截斷: Telegram 單訊息上限保險絲 + +公開 API: + - generate_daily_report(target_date) → str (HTML) + - send_daily_report() → dict (sent/failed/errors) +""" + +from __future__ import annotations + +import logging +from datetime import date, datetime, timedelta, timezone +from decimal import Decimal +from typing import Any, Dict, List, Optional, Tuple + +logger = logging.getLogger(__name__) + +# Asia/Taipei (UTC+8) 統一處理(避免容器 tzdata 差異,沿襲 telegram_templates 慣例) +_TAIPEI_TZ = timezone(timedelta(hours=8)) + +# Telegram 單則訊息字元上限(保留 96 字元給 footer,避免精準卡 4096) +_TELEGRAM_MAX_CHARS = 4000 + +# Provider 顯示名稱表(與 ai_calls.provider 白名單對齊,order 即報表順序) +_PROVIDER_DISPLAY: Dict[str, Tuple[str, str]] = { + 'gcp_ollama': ('🟢', 'GCP Ollama'), + 'ollama_secondary': ('🟢', 'Secondary'), # critic-A11 B4 修補:三主機架構一致性 + 'ollama_111': ('🟠', '111 Ollama'), + 'gemini': ('🔴', 'Gemini'), + 'claude': ('🟣', 'Claude'), + 'nim': ('🟡', 'NIM'), + 'openrouter': ('🟤', 'OpenRouter'), + 'nim_via_elephant': ('🟫', 'NIM_via_Eleph'), +} + +# Ollama 占比門檻(Section 1 「Ollama-First 達標」判斷用,戰役 KPI ≥60%) +_OLLAMA_FIRST_TARGET_PCT = 60.0 + +# 告警規則參數(Section 6 自動產生用) +_ALERT_RULES = { + 'caller_spike_factor': 1.4, # tokens > 7 日均 × 1.4 + 'gemini_share_threshold': 35.0, # gemini 占比 > 35% 視為 Ollama-First 失守 + 'error_rate_critical': 5.0, # error_rate > 5% → P1 + 'budget_warning': 80.0, # spent / budget > 80% → P1 + 'gcp_hit_warning': 90.0, # gcp_ollama 占比 < 90% (Ollama 內) → P2 + 'cache_hit_low': 40.0, # claude cache hit < 40% → INFO + 'caller_stable_days': 7, # 連續 N 日 Ollama >95% → INFO「可關 fallback」 + 'ollama_stable_pct': 95.0, +} + + +# 
def _truncate_telegram_html(html: str, max_chars: int, notice: str, reserve: int = 80) -> str:
    """Shorten *html* for Telegram without leaving broken markup behind.

    A plain slice can stop inside a tag (``<b``), inside an entity
    (``&am``), or between an opening tag and its closing tag. Telegram
    refuses messages whose HTML cannot be parsed, so the cut is moved back
    to a safe position and any still-open tags are closed before *notice*
    is appended.

    Args:
        html: full report markup.
        max_chars: length above which truncation happens.
        notice: text appended after the truncated markup.
        reserve: characters kept free below *max_chars* for closers and notice.

    Returns:
        *html* unchanged when it fits, otherwise well-formed truncated markup.
    """
    # Function-scope import: ``re`` is not among the imports visible at the
    # top of this module.
    import re

    if len(html) <= max_chars:
        return html

    head = html[: max(0, max_chars - reserve)]

    # Drop a tag that was cut in half (a "<" with no ">" after it).
    last_open = head.rfind('<')
    if last_open > head.rfind('>'):
        head = head[:last_open]

    # Drop an entity that was cut in half (a trailing "&..." without ";").
    last_amp = head.rfind('&')
    if last_amp != -1 and len(head) - last_amp < 10 and ';' not in head[last_amp:]:
        head = head[:last_amp]

    # Close whatever is still open, innermost first.
    open_tags: List[str] = []
    for match in re.finditer(r'<(/?)([A-Za-z][A-Za-z0-9-]*)[^>]*>', head):
        is_closing, tag = match.group(1), match.group(2).lower()
        if not is_closing:
            open_tags.append(tag)
        elif open_tags and open_tags[-1] == tag:
            open_tags.pop()
    closers = ''.join(f'</{tag}>' for tag in reversed(open_tags))

    return head + closers + notice


def send_daily_report(target_date: Optional[date] = None) -> Dict[str, Any]:
    """Build the daily report, send it to Telegram and persist it.

    Each stage is isolated: a failure while generating the report falls back
    to the failure message, a Telegram failure is recorded in ``errors``,
    and a persistence failure is only logged.

    Args:
        target_date: report day in Asia/Taipei; defaults to today.

    Returns:
        ``{'ok': bool, 'sent': int, 'failed': int, 'chars': int, 'errors': list}``
        where ``chars`` is the length of the text actually sent.
    """
    if target_date is None:
        target_date = datetime.now(_TAIPEI_TZ).date()

    try:
        report_html = generate_daily_report(target_date)
    except Exception as exc:
        logger.exception("[TokenReport] generate_daily_report failed: %s", exc)
        report_html = _format_failure_report(target_date, str(exc))

    # Keep the message within the Telegram-safe length while keeping the
    # HTML well-formed (a raw slice could cut a tag or entity in half).
    report_html = _truncate_telegram_html(
        report_html,
        _TELEGRAM_MAX_CHARS,
        "\n\n... (訊息超長,已截斷;詳見 ai_insights)",
    )

    # Send through the existing Telegram wrapper (no separate connection).
    result: Dict[str, Any] = {'ok': False, 'sent': 0, 'failed': 0, 'chars': len(report_html), 'errors': []}
    try:
        from services.telegram_templates import send_telegram_with_result
        send_result = send_telegram_with_result(report_html, parse_mode='HTML')
        result.update({
            'ok': bool(send_result.get('ok')),
            'sent': int(send_result.get('sent', 0)),
            'failed': int(send_result.get('failed', 0)),
            'errors': list(send_result.get('errors', [])),
        })
    except Exception as exc:
        logger.exception("[TokenReport] telegram send failed: %s", exc)
        result['errors'].append(f"telegram:{type(exc).__name__}")

    # Persist to ai_insights (no PII, no usernames).
    # NOTE(review): the truncated text is what gets persisted, although the
    # truncation notice points readers to ai_insights for the full report —
    # confirm whether the untruncated report should be stored instead.
    try:
        _persist_to_ai_insights(target_date, report_html, result)
    except Exception as exc:
        logger.warning("[TokenReport] ai_insights persist failed: %s", exc)

    return result
ollama_pct, prev_total_tokens (昨日比基準)} + """ + day_start, day_end = _date_window(target_date) + prev_start = day_start - timedelta(days=1) + + rows = _exec_query(""" + SELECT + COALESCE(SUM(input_tokens + output_tokens), 0) AS total_tokens, + COUNT(*) AS total_calls, + COALESCE(SUM(cost_usd), 0) AS total_cost_usd, + COALESCE(AVG(duration_ms), 0) AS avg_duration_ms, + COALESCE(SUM(CASE WHEN status = 'ok' THEN 1 ELSE 0 END), 0) AS ok_calls, + COALESCE(SUM( + CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111') + THEN input_tokens + output_tokens ELSE 0 END + ), 0) AS ollama_tokens + FROM ai_calls + WHERE called_at >= :start AND called_at < :end + """, {'start': day_start, 'end': day_end}) + + prev_rows = _exec_query(""" + SELECT COALESCE(SUM(input_tokens + output_tokens), 0) AS prev_total_tokens + FROM ai_calls + WHERE called_at >= :start AND called_at < :end + """, {'start': prev_start, 'end': day_start}) + + r = rows[0] if rows else {} + total_calls = int(r.get('total_calls') or 0) + total_tokens = int(r.get('total_tokens') or 0) + ok_calls = int(r.get('ok_calls') or 0) + ollama_tokens = int(r.get('ollama_tokens') or 0) + prev_total = int((prev_rows[0] if prev_rows else {}).get('prev_total_tokens') or 0) + + return { + 'total_tokens': total_tokens, + 'total_calls': total_calls, + 'total_cost_usd': float(r.get('total_cost_usd') or 0), + 'avg_duration_ms': float(r.get('avg_duration_ms') or 0), + 'success_rate': (ok_calls / total_calls * 100.0) if total_calls else 0.0, + 'failed_calls': max(0, total_calls - ok_calls), + 'ollama_pct': (ollama_tokens / total_tokens * 100.0) if total_tokens else 0.0, + 'prev_total_tokens': prev_total, + 'wow_pct': ((total_tokens - prev_total) / prev_total * 100.0) if prev_total else 0.0, + } + + +def _query_by_provider(target_date: date) -> List[Dict[str, Any]]: + """Section 2 — 供應商分布(依 7 個 provider,含 0 筆者也顯示)。""" + day_start, day_end = _date_window(target_date) + + rows = _exec_query(""" + SELECT + provider, + 
def _query_top_callers(target_date: date, limit: int = 10) -> List[Dict[str, Any]]:
    """Section 3 — top N (caller, provider) rows by tokens, with 7-day deviation.

    Rows are grouped per (caller, provider), while the 7-day baseline is per
    caller. ``delta_pct`` therefore compares the caller's *total* tokens for
    the day (``caller_tokens``, summed across providers) with the caller's
    7-day average, so both sides of the comparison use the same scope.
    Comparing a single provider row with the caller-wide baseline would
    understate the change for any caller that uses more than one provider.

    The baseline divides the 7-day sum by 7 regardless of how many of those
    days contain data.

    Args:
        target_date: report day (Asia/Taipei).
        limit: maximum number of rows returned.

    Returns:
        List of ``{caller, provider, model, tokens, calls, caller_tokens,
        delta_pct}``; ``delta_pct`` is ``None`` when the caller has no baseline.
    """
    day_start, day_end = _date_window(target_date)
    week_start = day_start - timedelta(days=7)

    rows = _exec_query("""
        WITH today AS (
            SELECT
                caller,
                provider,
                MODE() WITHIN GROUP (ORDER BY model) AS top_model,
                SUM(input_tokens + output_tokens)::BIGINT AS tokens,
                COUNT(*) AS calls
            FROM ai_calls
            WHERE called_at >= :day_start AND called_at < :day_end
            GROUP BY caller, provider
        ),
        baseline AS (
            SELECT
                caller,
                SUM(input_tokens + output_tokens) / 7.0 AS avg_tokens_7d
            FROM ai_calls
            WHERE called_at >= :week_start AND called_at < :day_start
            GROUP BY caller
        )
        SELECT
            t.caller, t.provider, t.top_model, t.tokens, t.calls,
            SUM(t.tokens) OVER (PARTITION BY t.caller) AS caller_tokens,
            COALESCE(b.avg_tokens_7d, 0) AS avg_tokens_7d
        FROM today t
        LEFT JOIN baseline b ON b.caller = t.caller
        ORDER BY t.tokens DESC, t.caller, t.provider
        LIMIT :limit
    """, {
        'day_start': day_start,
        'day_end': day_end,
        'week_start': week_start,
        'limit': int(limit),
    })

    result: List[Dict[str, Any]] = []
    for r in rows:
        tokens = int(r.get('tokens') or 0)
        # Fall back to the row's own tokens when the column is absent.
        caller_tokens = int(r.get('caller_tokens') or tokens)
        baseline = float(r.get('avg_tokens_7d') or 0)
        delta_pct = ((caller_tokens - baseline) / baseline * 100.0) if baseline > 0 else None
        result.append({
            'caller': str(r.get('caller') or ''),
            'provider': str(r.get('provider') or ''),
            'model': str(r.get('top_model') or ''),
            'tokens': tokens,
            'calls': int(r.get('calls') or 0),
            'caller_tokens': caller_tokens,
            'delta_pct': delta_pct,
        })
    return result
provider IN ('gcp_ollama','ollama_secondary','ollama_111') + THEN 1 ELSE 0 END), 0) AS ollama_calls + FROM ai_calls + WHERE called_at >= :start AND called_at < :end + """, {'start': day_start, 'end': day_end}) + + base_rows = _exec_query(""" + SELECT + COALESCE(SUM(input_tokens + output_tokens) / 7.0, 0) AS avg_total_tokens, + COALESCE(SUM(CASE WHEN provider='gemini' + THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_gemini_tokens, + COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111') + THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_ollama_tokens, + COALESCE(SUM(CASE WHEN provider='claude' + THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_claude_tokens, + COALESCE(AVG(duration_ms), 0) AS avg_duration_ms, + CASE WHEN COUNT(*) > 0 + THEN SUM(CASE WHEN status<>'ok' THEN 1 ELSE 0 END)::FLOAT / COUNT(*) * 100.0 + ELSE 0 END AS error_rate_pct, + COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT AS total_7d_tokens, + COALESCE(SUM(cost_usd), 0) AS total_7d_cost, + CASE WHEN SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111') + THEN 1 ELSE 0 END) > 0 + THEN SUM(CASE WHEN provider='gcp_ollama' THEN 1 ELSE 0 END)::FLOAT + / SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111') + THEN 1 ELSE 0 END)::FLOAT * 100.0 + ELSE 0 END AS gcp_hit_pct_7d + FROM ai_calls + WHERE called_at >= :start AND called_at < :end + """, {'start': week_start, 'end': day_start}) + + t = today_rows[0] if today_rows else {} + b = base_rows[0] if base_rows else {} + + today_total = int(t.get('total_tokens') or 0) + today_gemini = int(t.get('gemini_tokens') or 0) + today_ollama = int(t.get('ollama_tokens') or 0) + today_claude = int(t.get('claude_tokens') or 0) + today_calls = int(t.get('total_calls') or 0) + today_failed = int(t.get('failed') or 0) + today_gcp_calls = int(t.get('gcp_calls') or 0) + today_ollama_cal = int(t.get('ollama_calls') or 0) + today_error_pct = (today_failed / 
def _query_budget_usage(target_date: date) -> Dict[str, Any]:
    """Section 4 — spend versus budget for the day, trailing week and month.

    Windows (all ending at the end of *target_date*):
      * daily   — the target day
      * weekly  — the target day plus the six days before it
      * monthly — from the first day of the target day's month

    The scan starts at the earlier of the weekly and monthly window starts.
    Filtering on the month start alone would drop the previous-month days
    of the weekly window during the first six days of a month and
    under-report ``weekly_spent``.

    Only budget rows with ``provider IS NULL`` are read.

    Returns:
        ``{daily_spent, weekly_spent, monthly_spent,
        daily_budget, weekly_budget, monthly_budget}`` as floats; a missing
        budget period is reported as ``0.0``.
    """
    day_start, day_end = _date_window(target_date)
    week_start = day_start - timedelta(days=6)
    month_start = day_start.replace(day=1)
    # Must cover both windows; the CASE expressions narrow each sum again.
    scan_start = min(week_start, month_start)

    spent = _exec_query("""
        SELECT
            COALESCE(SUM(CASE WHEN called_at >= :day_start AND called_at < :day_end
                              THEN cost_usd ELSE 0 END), 0) AS daily_spent,
            COALESCE(SUM(CASE WHEN called_at >= :week_start AND called_at < :day_end
                              THEN cost_usd ELSE 0 END), 0) AS weekly_spent,
            COALESCE(SUM(CASE WHEN called_at >= :month_start AND called_at < :day_end
                              THEN cost_usd ELSE 0 END), 0) AS monthly_spent
        FROM ai_calls
        WHERE called_at >= :scan_start AND called_at < :day_end
    """, {
        'day_start': day_start,
        'day_end': day_end,
        'week_start': week_start,
        'month_start': month_start,
        'scan_start': scan_start,
    })

    # alert_pct is selected but not read in this function.
    budget_rows = _exec_query("""
        SELECT period, provider, budget_usd, alert_pct
        FROM ai_call_budgets
        WHERE provider IS NULL
    """, {})

    # A NULL budget_usd is treated as "no budget" instead of raising in float().
    budgets = {r['period']: float(r['budget_usd'] or 0) for r in budget_rows}
    s = spent[0] if spent else {}

    return {
        'daily_spent': float(s.get('daily_spent') or 0),
        'weekly_spent': float(s.get('weekly_spent') or 0),
        'monthly_spent': float(s.get('monthly_spent') or 0),
        'daily_budget': budgets.get('daily', 0.0),
        'weekly_budget': budgets.get('weekly', 0.0),
        'monthly_budget': budgets.get('monthly', 0.0),
    }
1) * 100.0: + alerts.append({ + 'level': 'P2', 'icon': '🟠', + 'title': f"{caller['caller']} token 暴增 {delta:+.0f}%(vs 7 日均)", + 'suggestion': f"今日 {caller['tokens']:,} tokens / {caller['calls']} calls,建議查 prompt 是否變更", + }) + + # R2: Gemini 占比飆升 (P2 「Ollama-First 失守」) + gemini = next((r for r in by_provider if r['provider'] == 'gemini'), {}) + gemini_pct = float(gemini.get('pct') or 0) + if gemini_pct > _ALERT_RULES['gemini_share_threshold']: + alerts.append({ + 'level': 'P2', 'icon': '🟠', + 'title': f"Gemini 占比 {gemini_pct:.1f}% 高於門檻 {_ALERT_RULES['gemini_share_threshold']:.0f}%", + 'suggestion': "Ollama-First 失守,請檢查 fallback 是否正確命中本地", + }) + + # R3: 失敗率 (P1) + total_calls = int(summary.get('total_calls') or 0) + failed = int(summary.get('failed_calls') or 0) + if total_calls: + err_rate = failed / total_calls * 100.0 + if err_rate > _ALERT_RULES['error_rate_critical']: + alerts.append({ + 'level': 'P1', 'icon': '🔴', + 'title': f"全域失敗率 {err_rate:.1f}% 超過門檻 {_ALERT_RULES['error_rate_critical']:.0f}%", + 'suggestion': f"今日 {failed:,} / {total_calls:,} 失敗,立即查 ai_calls WHERE status<>'ok'", + }) + + # R4: 預算超標 (P1) + for period_key, label in (('daily', '日'), ('weekly', '週'), ('monthly', '月')): + spent = float(budgets.get(f'{period_key}_spent') or 0) + budget = float(budgets.get(f'{period_key}_budget') or 0) + if budget > 0: + usage_pct = spent / budget * 100.0 + if usage_pct > _ALERT_RULES['budget_warning']: + alerts.append({ + 'level': 'P1', 'icon': '🔴', + 'title': f"{label}成本 ${spent:.2f} 達預算 ${budget:.2f} 的 {usage_pct:.0f}%", + 'suggestion': "請檢查供應商分布是否異常(Section 2/3)或調整預算", + }) + + # R5: GCP 命中率低 (P2) — 僅當有 Ollama 流量時才檢查 + today_gcp_hit = float(trends.get('today_gcp_hit_pct') or 0) + ollama = sum(int(r.get('tokens') or 0) for r in by_provider + if r['provider'] in ('gcp_ollama', 'ollama_secondary', 'ollama_111')) + if ollama > 0 and today_gcp_hit < _ALERT_RULES['gcp_hit_warning']: + alerts.append({ + 'level': 'P2', 'icon': '🟠', + 'title': f"GCP Ollama 命中率 
{today_gcp_hit:.1f}% 低於 {_ALERT_RULES['gcp_hit_warning']:.0f}%", + 'suggestion': "111 fallback 觸發頻繁,請檢查 GCP Ollama 健康(ADR-027)", + }) + + # R6: Cache 命中率低 (INFO) — claude + claude_cache = cache_stats.get('claude', {}) + if int(claude_cache.get('total') or 0) >= 10: + if float(claude_cache.get('pct') or 0) < _ALERT_RULES['cache_hit_low']: + alerts.append({ + 'level': 'INFO', 'icon': '🟢', + 'title': f"Claude prompt cache 命中率僅 {claude_cache['pct']:.1f}%", + 'suggestion': "可優化 system prompt 結構(≥1024 tokens 才觸發 cache)", + }) + + return alerts + + +def _generate_insights( + target_date: date, + summary: Dict[str, Any], + by_provider: List[Dict[str, Any]], +) -> List[Dict[str, str]]: + """Section 6 智能建議(規則引擎,零 LLM 成本)。""" + insights: List[Dict[str, str]] = [] + + ollama_pct = float(summary.get('ollama_pct') or 0) + if ollama_pct >= _OLLAMA_FIRST_TARGET_PCT: + insights.append({ + 'icon': '✅', + 'text': f"Ollama 占比 {ollama_pct:.1f}%(目標 ≥{_OLLAMA_FIRST_TARGET_PCT:.0f}%),Ollama-First 戰役達標", + }) + else: + insights.append({ + 'icon': '⚠️', + 'text': f"Ollama 占比 {ollama_pct:.1f}% 未達 {_OLLAMA_FIRST_TARGET_PCT:.0f}% 目標,可優化 fallback 鏈", + }) + + nim_total = sum( + int(r.get('tokens') or 0) for r in by_provider + if r['provider'] in ('nim', 'nim_via_elephant') + ) + if 0 < nim_total < 100_000: + insights.append({ + 'icon': '✅', + 'text': f"NIM 用量已降至 {nim_total:,} tokens(戰役前約 5M),可考慮關閉 NIM 依賴", + }) + + success_rate = float(summary.get('success_rate') or 0) + if summary.get('total_calls') and success_rate >= 99.0: + insights.append({ + 'icon': '✅', + 'text': f"成功率 {success_rate:.1f}%,鏈路健康度高", + }) + + return insights + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 內部:報表組裝 +# ═══════════════════════════════════════════════════════════════════════════════ + +def _format_report( + target_date: date, + summary: Dict[str, Any], + by_provider: List[Dict[str, Any]], + top_callers: List[Dict[str, Any]], + costs: List[Dict[str, Any]], + trends: 
Dict[str, Any], + budgets: Dict[str, Any], + cache_stats: Dict[str, Any], + alerts: List[Dict[str, str]], + insights: List[Dict[str, str]], +) -> str: + """組裝完整 HTML 報表。所有 caller/model 字串均經 _esc。""" + weekday_zh = ['週一', '週二', '週三', '週四', '週五', '週六', '週日'][target_date.weekday()] + now_str = datetime.now(_TAIPEI_TZ).strftime('%H:%M:%S') + + lines: List[str] = [] + + # Header + lines.append(f"📊 LLM Token 日報 {target_date.isoformat()} ({weekday_zh})") + lines.append("═══════════════════════════════════════") + lines.append(f"⏰ 統計區間:00:00 ~ 23:59 (UTC+8)") + lines.append(f"🔄 報表生成:{now_str} | 涵蓋筆數:{summary['total_calls']:,} calls") + + # Section 1 + lines.append("") + lines.append("━━━━━ 【1】今日總覽 TL;DR ━━━━━") + wow_sign = "+" if summary['wow_pct'] >= 0 else "" + lines.append(f"🪙 總 Token: {summary['total_tokens']:,} ({wow_sign}{summary['wow_pct']:.1f}% vs 昨日)") + lines.append(f"💰 總成本: US$ {summary['total_cost_usd']:.2f}") + lines.append(f"⚡ 平均延遲: {summary['avg_duration_ms']:.0f} ms") + lines.append(f"✅ 成功率: {summary['success_rate']:.1f}% ({summary['failed_calls']} 失敗 / {summary['total_calls']})") + ollama_check = "✅" if summary['ollama_pct'] >= _OLLAMA_FIRST_TARGET_PCT else "⚠️" + lines.append(f"🎯 Ollama 占比:{summary['ollama_pct']:.1f}% {ollama_check}") + + # Section 2 + lines.append("") + lines.append("━━━━━ 【2】供應商分布 ━━━━━") + for p in by_provider: + icon, name = _PROVIDER_DISPLAY[p['provider']] + if p['calls'] == 0: + continue # 0 筆者跳過避免雜訊 + lines.append( + f"{icon} {_pad(name, 14)} " + f"{_fmt_kb(p['tokens']):>8} ({p['pct']:5.1f}%) " + f"{p['calls']:>5} calls " + f"${p['cost_usd']:6.2f} " + f"{p['avg_duration_ms']:5.0f}ms" + ) + + # Section 3 + lines.append("") + lines.append(f"━━━━━ 【3】呼叫點 TOP {len(top_callers)} (按 Token) ━━━━━") + medals = ['🥇', '🥈', '🥉'] + for i, c in enumerate(top_callers): + rank = medals[i] if i < 3 else f" {i+1}" + flag = "" + if c.get('delta_pct') is not None: + d = c['delta_pct'] + if d >= 40: flag = f" ⚠️ {d:+.0f}%" + elif d <= -50: flag = f" 
🎉 {d:+.0f}%" + lines.append( + f"{rank} {_esc(c['caller'])}" + f" / {_esc(c['provider'])} / {_esc(c['model'])[:24]}" + ) + lines.append(f" {_fmt_kb(c['tokens']):>8} | {c['calls']:>5} calls{flag}") + + # Section 4 + lines.append("") + lines.append("━━━━━ 【4】成本分析 + 預算對比 ━━━━━") + lines.append(_budget_line("📅 本日成本", budgets['daily_spent'], budgets['daily_budget'])) + lines.append(_budget_line("📅 本週累計", budgets['weekly_spent'], budgets['weekly_budget'])) + lines.append(_budget_line("📅 本月累計", budgets['monthly_spent'], budgets['monthly_budget'])) + + if costs: + lines.append("") + lines.append("成本拆解 by Model:") + for c in costs[:6]: + lines.append(f" {_esc(c['model'])[:32]:<32} ${c['cost_usd']:7.4f} ({c['calls']} calls)") + + # Cache 命中 + lines.append("") + lines.append("Prompt Cache 命中:") + cc = cache_stats.get('claude', {}) + if cc.get('total'): + lines.append(f" Claude: {cc['hits']:>4} / {cc['total']:<4} ({cc['pct']:5.1f}%)") + else: + lines.append(" Claude: N/A") + gc = cache_stats.get('gemini', {}) + if gc.get('total'): + lines.append(f" Gemini: {gc['hits']:>4} / {gc['total']:<4} ({gc['pct']:5.1f}%)") + else: + lines.append(" Gemini: N/A") + + # Section 5 + lines.append("") + lines.append("━━━━━ 【5】趨勢與洞察 (vs 7 日均) ━━━━━") + lines.append(_trend_line("總 Tokens", trends['today_total_tokens'], trends['7d_avg_total'])) + lines.append(_trend_line("Gemini Tokens", trends['today_gemini_tokens'], trends['7d_avg_gemini'])) + lines.append(_trend_line("Ollama Tokens", trends['today_ollama_tokens'], trends['7d_avg_ollama'])) + lines.append(_trend_line("Claude Tokens", trends['today_claude_tokens'], trends['7d_avg_claude'])) + lines.append(_trend_line("平均延遲(ms)", trends['today_avg_duration'], trends['7d_avg_duration'], unit='')) + + lines.append("") + lines.append(f"📈 7 日累計:{_fmt_kb(trends['7d_total_tokens'])} tokens / US$ {trends['7d_total_cost']:.2f}") + + # Section 6 + lines.append("") + lines.append("━━━━━ 【6】告警與建議 ━━━━━") + if alerts: + for a in alerts: + 
lines.append(f"{a['icon']} [{a['level']}] {_esc(a['title'])}") + lines.append(f" 建議:{_esc(a['suggestion'])}") + else: + lines.append("✅ 無異常告警") + + if insights: + lines.append("") + lines.append("🔮 智能建議 (Hermes 規則引擎):") + for ins in insights: + lines.append(f" {ins['icon']} {_esc(ins['text'])}") + + # Footer + lines.append("") + lines.append("═══════════════════════════════════════") + lines.append("🤖 Operation Ollama-First v5.0 / token_report v1.0") + + return "\n".join(lines) + + +def _format_failure_report(target_date: date, error: str) -> str: + """DB 查詢失敗時的最簡訊息(仍保留 HTML escape)。""" + return ( + f"⚠️ LLM Token 日報生成失敗 ({target_date.isoformat()})\n" + f"━━━━━━━━━━━━━━━━━━━━\n" + f"錯誤:{_esc(error)[:300]}\n" + f"請查 logs:docker logs momo-scheduler | grep TokenReport" + ) + + +def _persist_to_ai_insights(target_date: date, content: str, send_result: Dict[str, Any]) -> None: + """寫一筆 ai_insights,type='daily_token_report',metadata 不含 PII。""" + from sqlalchemy import text + from database.manager import get_session + import json as _json + + meta = { + 'target_date': target_date.isoformat(), + 'sent': int(send_result.get('sent', 0)), + 'failed': int(send_result.get('failed', 0)), + 'chars': int(send_result.get('chars', 0)), + # 注意:絕不存 username / first_name / chat_id + } + + session = get_session() + try: + session.execute(text(""" + INSERT INTO ai_insights ( + insight_type, period, content, metadata_json, + avg_quality, status, decay_exempt, ai_model, + created_by, created_at, updated_at + ) VALUES ( + 'daily_token_report', :period, :content, :meta, + 0.9, 'approved', TRUE, 'rule_engine', + 'token_report_service', NOW(), NOW() + ) + """), { + 'period': target_date.isoformat(), + 'content': content[:8000], # ai_insights.content 為 TEXT,仍設上限保險 + 'meta': _json.dumps(meta, ensure_ascii=False), + }) + session.commit() + except Exception: + session.rollback() + raise + finally: + session.close() + + +# 
# ═══════════════════════════════════════════════════════════════════════════════
# Internal: formatting helpers
# ═══════════════════════════════════════════════════════════════════════════════

def _esc(s: Any) -> str:
    """HTML-escape *s* for a Telegram HTML message.

    ``&``, ``<`` and ``>`` are replaced by their entities (``&`` first, so
    entities produced for the other two are not escaped twice). Quotes are
    left untouched. ``None`` becomes the empty string; any other value is
    converted with ``str()``.
    """
    import html  # function-scope import, consistent with the other lazy imports in this module

    text = "" if s is None else str(s)
    return html.escape(text, quote=False)


def _pad(s: str, width: int) -> str:
    """Right-pad *s* with spaces to *width* display columns.

    Any character with a code point above 127 is counted as two columns
    (an approximation aimed at CJK text). Strings already at or beyond
    *width* are returned unchanged.
    """
    visible = sum(2 if ord(c) > 127 else 1 for c in s)
    return s + " " * max(0, width - visible)


def _fmt_kb(tokens: int) -> str:
    """Format a token count compactly: ``999`` / ``892K`` / ``3.1M``.

    Thousands are shown as whole K, millions with one decimal. Values that
    would round up to ``1000K`` (999,500 and above) are shown in M instead.
    ``None`` and other falsy inputs are treated as 0.
    """
    n = int(tokens or 0)
    # 999_500 is the first value whose K form rounds to "1000K".
    if n >= 999_500:
        return f"{n/1_000_000:.1f}M"
    if n >= 1_000:
        return f"{n/1_000:.0f}K"
    return f"{n}"


def _budget_line(label: str, spent: float, budget: float) -> str:
    """Render one budget row with a 10-cell progress bar.

    When *budget* is not positive the row says that no budget is configured.
    Otherwise the usage percentage is clamped to ``0..100`` so the bar is
    always exactly ten cells wide.
    """
    if budget <= 0:
        return f"{label}: US$ {spent:6.2f} ({_pad('未設定預算', 10)})"
    pct = max(0.0, min(100.0, spent / budget * 100.0))
    filled = int(pct / 10)
    bar = "▓" * filled + "░" * (10 - filled)
    return f"{label}: US$ {spent:6.2f} {bar} {pct:3.0f}% / ${budget:.0f} 預算"


def _trend_line(label: str, today: float, baseline: float, unit: str = '') -> str:
    """Render one "today vs. baseline" comparison row.

    The arrow is ``↗`` for a change of +5% or more, ``↘`` for -5% or less,
    ``→`` otherwise, and ``—`` when the baseline is not positive (the delta
    is then reported as 0.0). Labels containing ``Tokens`` are formatted
    with ``_fmt_kb``; other values are printed as integers followed by
    *unit*.
    """
    today_n = float(today or 0)
    base_n = float(baseline or 0)
    if base_n > 0:
        delta = (today_n - base_n) / base_n * 100.0
        sign = "+" if delta >= 0 else ""
        arrow = "↗" if delta >= 5 else ("↘" if delta <= -5 else "→")
    else:
        delta = 0.0
        sign = ""
        arrow = "—"

    today_str = _fmt_kb(int(today_n)) if 'Tokens' in label else f"{today_n:,.0f}{unit}"
    base_str = _fmt_kb(int(base_n)) if 'Tokens' in label else f"{base_n:,.0f}{unit}"
    return f" {_pad(label, 14)} {today_str:>8} vs {base_str:>8} ({sign}{delta:5.1f}%) {arrow}"
# ─────────────────────────────────────────────────────────────────────────────
# Fixtures
# ─────────────────────────────────────────────────────────────────────────────

@pytest.fixture(autouse=True)
def reset_state(monkeypatch):
    """Reset the kill-switch and stub out the real DB write before every test.

    Yields the list that collects one dict per intercepted ``_write_to_db``
    call, so tests can inspect what would have been persisted.
    """
    _reset_kill_switch()

    # Stub _write_to_db: collect the payload in a list instead of touching a DB.
    captured = []

    def fake_write(state):
        captured.append({
            'caller': state.caller,
            'provider': state.provider,
            'model': state.model,
            'input_tokens': state.input_tokens,
            'output_tokens': state.output_tokens,
            'duration_ms': state.duration_ms,
            'status': state.status,
            'fallback_to': state.fallback_to,
            'cost_usd': _calc_cost(state.model, state.input_tokens, state.output_tokens),
            'cache_hit': state.cache_hit,
            'rag_hit': state.rag_hit,
            'request_id': state.request_id,
            'error': state.error,
            'meta': dict(state.meta),
        })

    monkeypatch.setattr(logger_mod, '_write_to_db', fake_write)
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'true')

    # Expose the captured records to the test.
    yield captured


def _wait_for_async(captured, n=1, timeout=2.0):
    """Poll until *captured* holds at least *n* records or *timeout* seconds pass.

    Returns True when the records arrived in time, False otherwise.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        if len(captured) >= n:
            return True
        time.sleep(0.01)
    return False


# ─────────────────────────────────────────────────────────────────────────────
# Context manager tests
# ─────────────────────────────────────────────────────────────────────────────

def test_context_manager_happy_path(reset_state):
    """A clean ``with`` block records one row with status 'ok' and the tokens set."""
    captured = reset_state
    with log_ai_call('hermes_analyst', 'gcp_ollama', 'hermes3:latest') as ctx:
        ctx.set_tokens(input=120, output=80)
        ctx.set_cache_hit(False)

    assert _wait_for_async(captured, 1), "async write 未完成"
    assert len(captured) == 1
    rec = captured[0]
    assert rec['caller'] == 'hermes_analyst'
    assert rec['provider'] == 'gcp_ollama'
    assert rec['model'] == 'hermes3:latest'
    assert rec['input_tokens'] == 120
    assert rec['output_tokens'] == 80
    assert rec['status'] == 'ok'
    assert rec['error'] is None
    assert rec['duration_ms'] is not None and rec['duration_ms'] >= 0


def test_context_manager_exception_path(reset_state):
    """An exception inside the block is re-raised and recorded with status 'error'."""
    captured = reset_state
    with pytest.raises(ValueError, match="boom"):
        with log_ai_call('nemotron_dispatch', 'nim', 'meta/llama-3.1-8b-instruct'):
            raise ValueError("boom")

    assert _wait_for_async(captured, 1)
    rec = captured[0]
    assert rec['status'] == 'error'
    assert rec['error'] is not None
    assert 'ValueError' in rec['error']
    assert 'boom' in rec['error']


def test_context_manager_explicit_fallback(reset_state):
    """``fallback_to_caller`` marks the record as 'fallback' and stores the target."""
    captured = reset_state
    with log_ai_call('openclaw_qa', 'gemini', 'gemini-2.5-flash') as ctx:
        ctx.fallback_to_caller('openclaw_bot_nim')

    assert _wait_for_async(captured, 1)
    rec = captured[0]
    assert rec['status'] == 'fallback'
    assert rec['fallback_to'] == 'openclaw_bot_nim'


def test_context_manager_set_error_without_raise(reset_state):
    """The caller calls set_error without raising (e.g. the LLM returned success=false)."""
    captured = reset_state
    with log_ai_call('sales_copy', 'gcp_ollama', 'llama3.1:8b') as ctx:
        ctx.set_error('timeout after 30s')
        ctx.set_tokens(input=50, output=0)

    assert _wait_for_async(captured, 1)
    rec = captured[0]
    assert rec['status'] == 'error'
    assert 'timeout' in rec['error']


# ─────────────────────────────────────────────────────────────────────────────
# Decorator tests
# ─────────────────────────────────────────────────────────────────────────────

def test_decorator_happy_path(reset_state):
    """The decorator returns the wrapped result and picks up Ollama-style token counts."""
    captured = reset_state

    @logged_ai_call(caller='trend_match', provider='gcp_ollama', model='llama3.1:8b')
    def fake_call(prompt: str):
        return {'response': 'ok', 'eval_count': 42, 'prompt_eval_count': 100}

    out = fake_call("hello")
    assert out['response'] == 'ok'

    assert _wait_for_async(captured, 1)
    rec = captured[0]
    assert rec['caller'] == 'trend_match'
    assert rec['model'] == 'llama3.1:8b'
    assert rec['input_tokens'] == 100
    assert rec['output_tokens'] == 42
    assert rec['status'] == 'ok'


def test_decorator_with_model_extractor(reset_state):
    """``model_extractor`` supplies the model name; ``usage`` supplies the tokens."""
    captured = reset_state

    @logged_ai_call(
        caller='ppt_gemini',
        provider='gemini',
        model_extractor=lambda args, kw: kw.get('model', 'gemini-2.0-flash'),
    )
    def fake_call(*, model: str, prompt: str):
        return {'usage': {'prompt_tokens': 200, 'completion_tokens': 50}}

    fake_call(model='gemini-2.5-flash', prompt='x')

    assert _wait_for_async(captured, 1)
    rec = captured[0]
    assert rec['model'] == 'gemini-2.5-flash'
    assert rec['input_tokens'] == 200
    assert rec['output_tokens'] == 50


def test_decorator_exception_does_reraise(reset_state):
    """An exception from the wrapped function propagates and is logged as 'error'."""
    captured = reset_state

    @logged_ai_call(caller='code_review_hermes', provider='gcp_ollama', model='hermes3:latest')
    def fake_call():
        raise RuntimeError("net down")

    with pytest.raises(RuntimeError, match="net down"):
        fake_call()

    assert _wait_for_async(captured, 1)
    assert captured[0]['status'] == 'error'
# ─────────────────────────────────────────────────────────────────────────────
# A DB failure must not break the main flow
# ─────────────────────────────────────────────────────────────────────────────

def test_db_failure_does_not_break_main_flow(monkeypatch, caplog):
    """A failing DB write must not surface in the caller's flow.

    The thread class is replaced by a synchronous stand-in and
    ``_write_to_db`` by a version that fails and handles the failure the way
    this test expects the real one to (record failure + log a warning).
    """
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'true')

    # Replace the daemon thread with a synchronous call so the write path can
    # be observed directly.
    class SyncThread:
        def __init__(self, target=None, args=(), kwargs=None, **_):
            self._target = target
            self._args = args
            self._kwargs = kwargs or {}

        def start(self):
            self._target(*self._args, **self._kwargs)

    monkeypatch.setattr(logger_mod.threading, 'Thread', SyncThread)

    # The autouse fixture already stubbed _write_to_db; override it with a
    # version that actually fails.
    def real_write_that_fails(state):
        try:
            raise ImportError("simulated DB unavailable")
        except Exception as e:
            logger_mod._record_failure()
            logger_mod.logger.warning(
                "[AICallLogger] write failed (caller=%s provider=%s): %s",
                state.caller, state.provider, e,
            )

    monkeypatch.setattr(logger_mod, '_write_to_db', real_write_that_fails)

    # The main flow must not raise.
    with caplog.at_level('WARNING'):
        with log_ai_call('hermes_intent', 'gcp_ollama', 'hermes3:latest') as ctx:
            ctx.set_tokens(input=10, output=5)

    # At least one "[AICallLogger] write failed" warning was logged.
    assert any('write failed' in r.message for r in caplog.records), \
        "預期 _write_to_db 失敗時 log warning"


def test_async_dispatch_failure_swallowed(monkeypatch):
    """Even if creating the thread fails (extreme case), the main flow must not raise."""

    class BadThread:
        def __init__(self, *a, **kw):
            raise OSError("can't fork")

    monkeypatch.setattr(logger_mod.threading, 'Thread', BadThread)
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'true')

    # Must not raise.
    with log_ai_call('x', 'y', 'z'):
        pass


# ─────────────────────────────────────────────────────────────────────────────
# Cost calculation
# ─────────────────────────────────────────────────────────────────────────────

def test_calc_cost_gemini_flash():
    """gemini-2.5-flash 1M in + 100K out = $0.075 + $0.030 = $0.105"""
    cost = _calc_cost('gemini-2.5-flash', 1_000_000, 100_000)
    assert cost == pytest.approx(0.105, rel=1e-6)


def test_calc_cost_claude_opus():
    """claude-opus-4-7 1K in + 1K out = $0.015 + $0.075 = $0.090"""
    cost = _calc_cost('claude-opus-4-7', 1000, 1000)
    expected = (1000 * 15.0 + 1000 * 75.0) / 1_000_000
    assert cost == pytest.approx(expected, rel=1e-6)


def test_calc_cost_ollama_zero():
    """Self-hosted Ollama models always cost 0."""
    assert _calc_cost('hermes3:latest', 100_000, 100_000) == 0.0
    assert _calc_cost('llama3.1:8b', 999_999, 999_999) == 0.0


def test_calc_cost_unknown_model_returns_zero(caplog):
    """An unknown model costs 0 and emits an 'unknown model cost' warning."""
    with caplog.at_level('WARNING'):
        cost = _calc_cost('totally-fake-model-xyz', 1_000_000, 1_000_000)
    assert cost == 0.0
    assert any('unknown model cost' in r.message for r in caplog.records)


def test_calc_cost_nim_prefix_silent_zero(caplog):
    """nvidia/* meta/* deepseek-* must not trigger the unknown-model warning."""
    with caplog.at_level('WARNING'):
        cost = _calc_cost('nvidia/some-future-model', 1_000_000, 1_000_000)
    assert cost == 0.0
    assert not any('unknown model cost' in r.message for r in caplog.records)


def test_calc_cost_negative_or_none_safe():
    """None, empty model name and negative token counts all yield 0."""
    assert _calc_cost('gemini-2.5-flash', None, None) == 0.0
    assert _calc_cost('', 100, 100) == 0.0
    assert _calc_cost('gemini-2.5-flash', -1, -5) == 0.0


# ─────────────────────────────────────────────────────────────────────────────
# Environment switch
# ─────────────────────────────────────────────────────────────────────────────

def test_logging_disabled_skips_write(monkeypatch):
    """With AI_CALL_LOGGING_ENABLED=false no write is dispatched."""
    captured = []

    def fake_write(state):
        captured.append(state)

    monkeypatch.setattr(logger_mod, '_write_to_db', fake_write)
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'false')

    with log_ai_call('sales_copy', 'gcp_ollama', 'llama3.1:8b') as ctx:
        ctx.set_tokens(input=10, output=10)

    # Short grace period in case a write had been dispatched.
    time.sleep(0.05)
    assert len(captured) == 0, "AI_CALL_LOGGING_ENABLED=false 時不應寫入"


def test_logging_enabled_default_true(monkeypatch):
    """Unset means enabled; '0' and 'OFF' disable; 'true' enables."""
    monkeypatch.delenv('AI_CALL_LOGGING_ENABLED', raising=False)
    assert _is_logging_enabled() is True

    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', '0')
    assert _is_logging_enabled() is False

    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'OFF')
    assert _is_logging_enabled() is False

    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'true')
    assert _is_logging_enabled() is True


# ─────────────────────────────────────────────────────────────────────────────
# Kill-switch
# ─────────────────────────────────────────────────────────────────────────────

def test_kill_switch_after_consecutive_failures(monkeypatch, caplog):
    """After >= 10 consecutive failures, logging degrades and no thread is started."""
    _reset_kill_switch()

    # Failures are simulated directly through _record_failure here.
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'true')

    # Force 10 failures.
    for _ in range(10):
        logger_mod._record_failure()

    assert logger_mod._is_killed() is True

    # From now on no new thread should be created for a logged call.
    captured_threads = []

    class TrackingThread:
        def __init__(self, *a, **kw):
            captured_threads.append(kw.get('target'))

        def start(self):
            pass

    monkeypatch.setattr(logger_mod.threading, 'Thread', TrackingThread)

    with log_ai_call('x', 'y', 'z'):
        pass

    time.sleep(0.05)
    assert len(captured_threads) == 0, "kill-switch 啟動後不應再開新 thread"


def test_record_success_resets_failure_counter():
    """One success resets the consecutive-failure counter to 0."""
    _reset_kill_switch()
    for _ in range(5):
        logger_mod._record_failure()
    assert logger_mod._failure_state['count'] == 5
    logger_mod._record_success()
    assert logger_mod._failure_state['count'] == 0


# ─────────────────────────────────────────────────────────────────────────────
# PII protection
# ─────────────────────────────────────────────────────────────────────────────

def test_set_prompt_hash_truncates_to_12():
    """set_prompt_hash stores a 12-character value that is not the raw prompt."""
    state = _CallState('a', 'b', 'c', None, {})
    state.set_prompt_hash('Hello world some sensitive PII content here')
    assert 'prompt_hash' in state.meta
    assert len(state.meta['prompt_hash']) == 12
    # Make sure it is not the original text.
    assert 'Hello' not in state.meta['prompt_hash']


def test_meta_does_not_leak_raw_prompt_into_call_state():
    """The raw prompt never lands in meta; only set_prompt_hash adds a prompt-derived key."""
    with log_ai_call('x', 'y', 'z', meta={'temperature': 0.3}) as ctx:
        ctx.set_prompt_hash("super secret user prompt 123")
        assert 'prompt_hash' in ctx.meta
        assert ctx.meta['temperature'] == 0.3
        # meta must not contain a 'prompt' key (unless the caller adds one).
        assert 'prompt' not in ctx.meta


# ─────────────────────────────────────────────────────────────────────────────
# Misc: cost table key completeness
# ─────────────────────────────────────────────────────────────────────────────

def test_cost_table_contains_critical_models():
    """The key models listed by the phase0 audit must be present in COST_TABLE."""
    critical = [
        'gemini-2.5-flash',
        'gemini-2.0-flash',
        'meta/llama-3.1-8b-instruct',
        'hermes3:latest',
        'qwen2.5-coder:7b',
        'llama3.1:8b',
        'bge-m3:latest',
    ]
    for m in critical:
        assert m in COST_TABLE, f"COST_TABLE missing {m}"
# ─────────────────────────────────────────────────────────────────────────────
# Shared fixtures
# ─────────────────────────────────────────────────────────────────────────────

TARGET_DATE = date(2026, 5, 3)


def _make_summary(**overrides) -> Dict[str, Any]:
    """Return a default Section 1 summary; keyword arguments replace or add keys."""
    return {
        'total_tokens': 3_142_891,
        'total_calls': 2_847,
        'total_cost_usd': 0.36,
        'avg_duration_ms': 1847.0,
        'success_rate': 98.7,
        'failed_calls': 37,
        'ollama_pct': 64.3,
        'prev_total_tokens': 2_905_000,
        'wow_pct': 8.2,
        **overrides,
    }


def _make_by_provider(**overrides) -> List[Dict[str, Any]]:
    """Return rows for the 7 default providers.

    Each keyword is a provider name mapped to a partial stats dict that is
    merged into that provider's defaults, e.g. ``gemini={'pct': 50}``.
    A name that is not among the defaults is appended as a new provider.
    """
    table = {
        'gcp_ollama': {'tokens': 2_021_000, 'pct': 64.3, 'calls': 2103, 'cost_usd': 0.0, 'avg_duration_ms': 1200},
        'ollama_111': {'tokens': 12_000, 'pct': 0.4, 'calls': 18, 'cost_usd': 0.0, 'avg_duration_ms': 2400},
        'gemini': {'tokens': 892_000, 'pct': 28.4, 'calls': 589, 'cost_usd': 0.31, 'avg_duration_ms': 2100},
        'claude': {'tokens': 178_000, 'pct': 5.7, 'calls': 98, 'cost_usd': 0.04, 'avg_duration_ms': 3200},
        'nim': {'tokens': 28_000, 'pct': 0.9, 'calls': 24, 'cost_usd': 0.0, 'avg_duration_ms': 1800},
        'openrouter': {'tokens': 12_000, 'pct': 0.4, 'calls': 15, 'cost_usd': 0.01, 'avg_duration_ms': 2900},
        'nim_via_elephant': {'tokens': 27_000, 'pct': 0.9, 'calls': 12, 'cost_usd': 0.0, 'avg_duration_ms': 3100},
    }
    for name, patch in overrides.items():
        table.setdefault(name, {}).update(patch)

    rows: List[Dict[str, Any]] = []
    for name, stats in table.items():
        row: Dict[str, Any] = {'provider': name}
        row.update(stats)
        rows.append(row)
    return rows


def _make_top_callers() -> List[Dict[str, Any]]:
    """Return three sample top-caller rows (deltas +5%, -2%, +42%)."""
    embedding = {
        'caller': 'km_embedding_worker', 'provider': 'gcp_ollama',
        'model': 'bge-m3:latest', 'tokens': 892_000, 'calls': 1247, 'delta_pct': 5.0,
    }
    analyst = {
        'caller': 'hermes_analyst', 'provider': 'gcp_ollama',
        'model': 'hermes3:latest', 'tokens': 482_000, 'calls': 72, 'delta_pct': -2.0,
    }
    reviewer = {
        'caller': 'code_review_hermes', 'provider': 'claude',
        'model': 'claude-opus-4-7', 'tokens': 158_000, 'calls': 8, 'delta_pct': 42.0,
    }
    return [embedding, analyst, reviewer]


def _make_trends() -> Dict[str, Any]:
    """Return sample Section 5 trend values (today vs. 7-day figures)."""
    today = {
        'today_total_tokens': 3_142_000,
        'today_gemini_tokens': 892_000,
        'today_ollama_tokens': 2_033_000,
        'today_claude_tokens': 178_000,
        'today_avg_duration': 1847.0,
        'today_error_rate': 1.3,
        'today_gcp_hit_pct': 99.6,
    }
    week = {
        '7d_avg_total': 2_905_000,
        '7d_avg_gemini': 948_000,
        '7d_avg_ollama': 1_712_000,
        '7d_avg_claude': 165_000,
        '7d_avg_duration': 1920.0,
        '7d_error_rate': 1.8,
        '7d_total_tokens': 18_832_000,
        '7d_total_cost': 11.84,
        '7d_gcp_hit_pct_7d': 98.9,
        '7d_gcp_hit_pct': 98.9,
    }
    return {**today, **week}


def _make_budgets(**overrides) -> Dict[str, Any]:
    """Return default spend/budget figures; keyword arguments replace or add keys."""
    return {
        'daily_spent': 0.36,
        'weekly_spent': 1.92,
        'monthly_spent': 5.84,
        'daily_budget': 1.00,
        'weekly_budget': 5.00,
        'monthly_budget': 20.00,
        **overrides,
    }


def _make_cache_stats(**overrides) -> Dict[str, Any]:
    """Return default cache stats per provider; a keyword replaces that provider's whole entry."""
    return {
        'claude': {'total': 98, 'hits': 62, 'pct': 63.3},
        'gemini': {'total': 0, 'hits': 0, 'pct': 0.0},
        **overrides,
    }
報表組裝測試 — generate_daily_report 路徑 +# ───────────────────────────────────────────────────────────────────────────── + +class TestReportFormat: + """測 _format_report 主要章節都出現 & 字數合理。""" + + def test_format_report_contains_all_six_sections(self): + """6 個段落標題都應出現。""" + out = svc._format_report( + target_date=TARGET_DATE, + summary=_make_summary(), + by_provider=_make_by_provider(), + top_callers=_make_top_callers(), + costs=[{'provider': 'gemini', 'model': 'gemini-2.5-flash', 'cost_usd': 0.26, 'calls': 50}], + trends=_make_trends(), + budgets=_make_budgets(), + cache_stats=_make_cache_stats(), + alerts=[], + insights=[{'icon': '✅', 'text': 'Ollama-First 達標'}], + ) + assert '【1】今日總覽' in out + assert '【2】供應商分布' in out + assert '【3】呼叫點 TOP' in out + assert '【4】成本分析' in out + assert '【5】趨勢與洞察' in out + assert '【6】告警與建議' in out + + def test_format_report_under_telegram_limit(self): + """完整報表(含 10 個 caller / 12 個成本項 / 多個告警)不應超過 4096 字元。""" + big_callers = _make_top_callers() * 4 # 12 筆 + big_costs = [{'provider': 'p', 'model': f'model-{i}', 'cost_usd': 0.01, 'calls': 1} + for i in range(12)] + big_alerts = [ + {'level': 'P1', 'icon': '🔴', 'title': 'X' * 80, 'suggestion': 'Y' * 80} + for _ in range(5) + ] + out = svc._format_report( + target_date=TARGET_DATE, + summary=_make_summary(), + by_provider=_make_by_provider(), + top_callers=big_callers[:10], + costs=big_costs, + trends=_make_trends(), + budgets=_make_budgets(), + cache_stats=_make_cache_stats(), + alerts=big_alerts, + insights=[], + ) + # send_daily_report 端會做 4000 字截斷(HTML 安全),單元測試先確認原始長度可控 + assert len(out) < 6000, f"原始報表 {len(out)} 字元,可能需縮減欄位寬度" + + def test_format_report_html_escape_caller_name(self): + """caller 名含