feat(observability): ai_call_logger + 23:55 Telegram token 日報

services/ai_call_logger.py(434 行)— 統一 LLM 遙測層
- context manager log_ai_call() / decorator logged_ai_call()
- async fire-and-forget 寫 ai_calls,DB 失敗永不影響主流程
- kill-switch:連續 10 次失敗自動降級為 logger.info
- env AI_CALL_LOGGING_ENABLED=false 一鍵關閉
- COST_TABLE 集中 14 個模型計費(gemini/claude/nim/ollama)
- PII 保護:meta 只存 prompt_hash[:12],不存原文
- 22 unit tests 全綠

services/token_report_service.py(867 行)— 6 段落每日 23:55 日報
- Section 1-6: 總覽 / 供應商分布 / TOP10 caller / 成本預算 / 趨勢 / 告警建議
- 7 條告警規則 + Hermes 規則引擎智能建議
- HTML escape + 4096 字元雙保險
- Telegram 失敗 fallback 訊息
- ai_insights 寫入 PII safe(無 chat_id/username 落地)
- 30 unit tests 全綠

A11 critic 護欄:H6 chat_id PII fix(services/openclaw_bot_routes 4 處 → SHA1[:8])

Operation Ollama-First v5.0 / Phase 1 A4+A5

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OoO
2026-05-03 23:04:58 +08:00
parent 4648673423
commit bb891f1a6e
4 changed files with 2253 additions and 0 deletions

434
services/ai_call_logger.py Normal file
View File

@@ -0,0 +1,434 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/ai_call_logger.py
統一 LLM 呼叫遙測層 (Operation Ollama-First v5.0 — Phase 1)
依據:
- docs/phase0_audit_report_20260503.md (34 個 LLM 呼叫點 / 11.8% 覆蓋率)
- docs/phase1_db_design_20260503.md (ai_calls schema)
- migrations/024_create_ai_calls_table.sql
設計原則 (憲法級):
1. 非阻塞: DB 寫入跑 daemon thread主流程不等
2. 失敗安全: DB 例外只 log warning絕不影響 LLM 主流程
3. PII 保護: meta 不存原始 prompt只存 prompt_hash[:12]
4. Kill-switch: AI_CALL_LOGGING_ENABLED=false 一鍵關閉
5. 連續失敗 ≥ 10 次自動降級為純 logger.info
主入口:
- log_ai_call(...) context manager (推薦)
- logged_ai_call(...) decorator (簡單一行 LLM call)
"""
from __future__ import annotations
import hashlib
import inspect
import logging
import os
import threading
import time
from contextlib import contextmanager
from functools import wraps
from typing import Any, Callable, Dict, Optional
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 成本表 (USD per 1M tokens)
# 依據 phase0 audit + 各 provider 官方定價Ollama 全部 0
# ─────────────────────────────────────────────────────────────────────────────
COST_TABLE: Dict[str, Dict[str, float]] = {
    # Price table in USD per 1M tokens, keyed by the exact model name.
    # 'in' is the input-token rate, 'out' the output-token rate.
    # NOTE(review): rates are hard-coded — verify against current provider pricing.
    # Gemini
    'gemini-2.5-flash': {'in': 0.075, 'out': 0.30},
    'gemini-2.5-pro': {'in': 1.25, 'out': 10.0},
    'gemini-2.0-flash': {'in': 0.075, 'out': 0.30},
    'gemini-1.5-flash': {'in': 0.075, 'out': 0.30},
    # NVIDIA NIM (quota based; the free tier is priced at 0)
    'meta/llama-3.1-8b-instruct': {'in': 0.0, 'out': 0.0},
    'meta/llama-3.3-70b-instruct': {'in': 0.0, 'out': 0.0},
    'nvidia/llama-3.3-nemotron-super-49b-v1.5': {'in': 0.0, 'out': 0.0},
    'deepseek-ai/deepseek-v3.2': {'in': 0.0, 'out': 0.0},
    # Claude
    'claude-opus-4-7': {'in': 15.0, 'out': 75.0},
    'claude-sonnet-4-6': {'in': 3.0, 'out': 15.0},
    # Self-hosted Ollama (all 0)
    'hermes3:latest': {'in': 0.0, 'out': 0.0},
    'qwen2.5-coder:7b': {'in': 0.0, 'out': 0.0},
    'llama3.1:8b': {'in': 0.0, 'out': 0.0},
    'bge-m3:latest': {'in': 0.0, 'out': 0.0},
}
# ─────────────────────────────────────────────────────────────────────────────
# 環境開關 + Kill-switch
# ─────────────────────────────────────────────────────────────────────────────
def _is_logging_enabled() -> bool:
    """Return whether AI-call logging is currently switched on.

    ``AI_CALL_LOGGING_ENABLED`` is read from the environment on every call,
    so the switch can be flipped while the process is running. Logging is
    enabled unless the trimmed, lower-cased value is one of ``false``,
    ``0``, ``no`` or ``off``; an unset variable counts as enabled.
    """
    raw_value = os.environ.get('AI_CALL_LOGGING_ENABLED', 'true')
    normalized = raw_value.strip().lower()
    disabling_values = ('false', '0', 'no', 'off')
    if normalized in disabling_values:
        return False
    return True
# Consecutive-failure threshold; once reached, the logger degrades to plain logger.info and stops trying the DB.
_MAX_CONSECUTIVE_FAILURES = 10
# Lock taken around every read and write of _failure_state.
_failure_counter_lock = threading.Lock()
# 'count': failures since the last successful write; 'killed': whether the kill-switch has tripped.
_failure_state = {'count': 0, 'killed': False}
def _record_failure() -> None:
    """Count one failed DB write and trip the kill-switch at the threshold.

    The counter is incremented under the lock. The first time it reaches
    ``_MAX_CONSECUTIVE_FAILURES`` the ``killed`` flag is set and a single
    error line is logged; later failures keep counting but do not log again.
    """
    with _failure_counter_lock:
        _failure_state['count'] += 1
        threshold_reached = _failure_state['count'] >= _MAX_CONSECUTIVE_FAILURES
        if _failure_state['killed'] or not threshold_reached:
            return
        _failure_state['killed'] = True
        logger.error(
            "[AICallLogger] consecutive write failures hit %d — kill-switch ON, "
            "downgrading to logger.info only",
            _MAX_CONSECUTIVE_FAILURES,
        )
def _record_success() -> None:
    """Reset the failure counter after a successful DB write.

    Only ``count`` is cleared; the ``killed`` flag is left untouched.
    """
    with _failure_counter_lock:
        if _failure_state['count'] <= 0:
            return
        _failure_state['count'] = 0
def _is_killed() -> bool:
    """Return the current kill-switch flag, read under the state lock."""
    with _failure_counter_lock:
        killed = _failure_state['killed']
    return killed
def _reset_kill_switch() -> None:
    """Test helper: clear both the failure counter and the kill-switch flag."""
    with _failure_counter_lock:
        _failure_state.update(count=0, killed=False)
# ─────────────────────────────────────────────────────────────────────────────
# 內部狀態容器
# ─────────────────────────────────────────────────────────────────────────────
class _CallState:
    """Telemetry state for a single LLM call.

    ``log_ai_call`` creates one instance per call, yields it to the calling
    code and hands it to the writer afterwards. The calling code fills it in
    through the setter methods below. Every setter tolerates bad input
    (wrong type, None, non-finite numbers) so that recording telemetry can
    never raise into the LLM code path.
    """

    __slots__ = (
        'caller', 'provider', 'model', 'request_id',
        'input_tokens', 'output_tokens',
        'duration_ms', 'status', 'fallback_to',
        'cost_usd', 'cache_hit', 'rag_hit',
        'error', 'meta',
    )

    def __init__(self, caller: str, provider: str, model: str,
                 request_id: Optional[str], meta: Dict[str, Any]):
        """Start with zero tokens, no status and a private copy of *meta*."""
        self.caller = caller
        self.provider = provider
        self.model = model
        self.request_id = request_id
        self.input_tokens = 0
        self.output_tokens = 0
        self.duration_ms: Optional[int] = None
        self.status: Optional[str] = None
        self.fallback_to: Optional[str] = None
        self.cost_usd = 0.0
        self.cache_hit = False
        self.rag_hit = False
        self.error: Optional[str] = None
        # Copy so later add_meta() calls never mutate the caller's dict.
        self.meta: Dict[str, Any] = dict(meta) if meta else {}

    # ── internal coercion helpers ────────────────────────────────────
    @staticmethod
    def _to_count(value: Any) -> int:
        """Coerce *value* to a non-negative int; unusable values become 0."""
        try:
            # OverflowError covers int(float('inf')); ValueError covers NaN
            # and non-numeric strings; TypeError covers unrelated objects.
            return max(0, int(value or 0))
        except (TypeError, ValueError, OverflowError):
            return 0

    @staticmethod
    def _to_text(value: Any, limit: int) -> str:
        """Return *value* as a string cut to *limit* chars ('' when falsy)."""
        if not value:
            return ''
        if not isinstance(value, str):
            # Callers commonly pass the exception object itself.
            value = str(value)
        return value[:limit]

    # ── caller-facing API ────────────────────────────────────────────
    def set_tokens(self, input: int = 0, output: int = 0) -> None:
        """Record token counts; None, garbage and negatives are stored as 0."""
        self.input_tokens = self._to_count(input)
        self.output_tokens = self._to_count(output)

    def set_cache_hit(self, hit: bool = True) -> None:
        """Flag whether the call was served from a prompt cache."""
        self.cache_hit = bool(hit)

    def set_rag_hit(self, hit: bool = True) -> None:
        """Flag whether the call hit RAG."""
        self.rag_hit = bool(hit)

    def fallback_to_caller(self, target_caller: str) -> None:
        """Mark the call as handed over to *target_caller* (status 'fallback').

        The target name is cut to 64 characters. Per the original author's
        note, the downstream caller writes its own ok/error record.
        """
        self.fallback_to = self._to_text(target_caller, 64)
        self.status = 'fallback'

    # Alias kept to match the naming used in the design spec.
    fallback_to_target = fallback_to_caller

    def set_error(self, msg: str) -> None:
        """Store an error message (cut to 2000 chars) and set status 'error'.

        Non-string values such as exception objects are converted with str().
        """
        self.error = self._to_text(msg, 2000)
        self.status = 'error'

    def set_status(self, status: str) -> None:
        """Set an explicit status string, cut to 16 characters."""
        self.status = self._to_text(status, 16)

    def set_prompt_hash(self, prompt: Optional[str]) -> None:
        """Store the first 12 hex chars of the prompt's SHA-256 in meta.

        Only the digest prefix is kept, never the prompt text. An empty or
        None prompt leaves meta untouched. Bytes are hashed as-is; anything
        else is stringified and UTF-8 encoded first.
        """
        if not prompt:
            return
        if isinstance(prompt, (bytes, bytearray)):
            raw = bytes(prompt)
        else:
            raw = str(prompt).encode('utf-8', errors='replace')
        self.meta['prompt_hash'] = hashlib.sha256(raw).hexdigest()[:12]

    def add_meta(self, key: str, value: Any) -> None:
        """Attach one extra meta entry; an empty key is ignored."""
        if key:
            self.meta[key] = value
# ─────────────────────────────────────────────────────────────────────────────
# 主入口 1: context manager
# ─────────────────────────────────────────────────────────────────────────────
@contextmanager
def log_ai_call(
    caller: str,
    provider: str,
    model: str,
    request_id: Optional[str] = None,
    meta: Optional[Dict[str, Any]] = None,
):
    """Context manager that records one LLM call.

    Example:
        with log_ai_call('hermes_analyst', 'gcp_ollama', 'hermes3:latest') as ctx:
            response = ollama.generate(...)
            ctx.set_tokens(input=response['prompt_eval_count'],
                           output=response['eval_count'])
            ctx.set_cache_hit(False)
            # on failure: ctx.set_error('timeout') / ctx.fallback_to_caller('111_ollama')

    Behaviour:
        - The yielded object is a ``_CallState``; the body fills it in.
        - If the body finishes without setting a status, status becomes 'ok'.
        - Anything raised by the body is re-raised unchanged after status is
          set to 'error' and, unless the body already set one, an error text
          of the form ``"<ExceptionType>: <message>"`` is stored.
        - The elapsed time is measured with ``time.monotonic()`` and the
          state is always handed to ``_async_write``; a failure to dispatch
          is only logged as a warning.
        - Whether anything is written is decided inside ``_async_write``.
    """
    state = _CallState(caller, provider, model, request_id, meta or {})
    start = time.monotonic()
    try:
        yield state
    except BaseException as e:
        # BaseException, not Exception: KeyboardInterrupt, SystemExit and
        # asyncio.CancelledError would otherwise leave status as None, and
        # the writer would then store the interrupted call as 'ok'.
        state.status = 'error'
        if not state.error:
            state.error = f"{type(e).__name__}: {str(e)[:1500]}"
        raise
    else:
        # No exception: default to 'ok' unless the body chose a status.
        if state.status is None:
            state.status = 'ok'
    finally:
        state.duration_ms = int((time.monotonic() - start) * 1000)
        try:
            _async_write(state)
        except Exception as exc:  # pragma: no cover — writer thread failed to start
            logger.warning("[AICallLogger] async dispatch failed: %s", exc)
# ─────────────────────────────────────────────────────────────────────────────
# 主入口 2: decorator
# ─────────────────────────────────────────────────────────────────────────────
def logged_ai_call(
    caller: str,
    provider: str,
    model: Optional[str] = None,
    model_extractor: Optional[Callable[[tuple, dict], str]] = None,
):
    """Decorator form of ``log_ai_call`` for functions that make one LLM call.

    Example:
        @logged_ai_call(caller='sales_copy', provider='gcp_ollama',
                        model_extractor=lambda a, kw: kw.get('model', 'llama3.1:8b'))
        def generate_copy(...):
            return ollama.generate(model='llama3.1:8b', ...)

    Args:
        caller: value recorded as ai_calls.caller.
        provider: value recorded as ai_calls.provider.
        model: static model name, used when no extractor is given or when
            the extractor fails or returns nothing.
        model_extractor: callable receiving ``(args, kwargs)`` of the
            decorated call and returning the model name; takes precedence
            over ``model``.

    Notes:
        - Both plain functions and ``async def`` coroutine functions are
          supported; for coroutines the logged span covers the awaited call.
        - Token counts are taken from the return value on a best-effort
          basis; use the ``log_ai_call`` context manager when exact counts
          matter.
        - Exceptions from the decorated function propagate unchanged.
    """
    def _resolve_model(args: tuple, kwargs: dict) -> str:
        """Pick the model name, never letting the extractor break the call."""
        fallback = model or 'unknown'
        if model_extractor is None:
            return fallback
        try:
            return model_extractor(args, kwargs) or fallback
        except Exception:
            return fallback

    def _capture_tokens(ctx: Any, result: Any) -> None:
        """Best-effort token extraction; a failure is logged, never raised."""
        try:
            _auto_extract_tokens(ctx, result)
        except Exception as exc:
            logger.debug("[AICallLogger] token auto-extract skipped: %s", exc)

    def deco(fn: Callable):
        if inspect.iscoroutinefunction(fn):
            # A sync wrapper would only time the creation of the coroutine
            # object, so the await has to happen inside the logging context.
            @wraps(fn)
            async def async_wrapper(*args, **kwargs):
                with log_ai_call(caller, provider, _resolve_model(args, kwargs)) as ctx:
                    result = await fn(*args, **kwargs)
                    _capture_tokens(ctx, result)
                    return result
            return async_wrapper

        @wraps(fn)
        def wrapper(*args, **kwargs):
            with log_ai_call(caller, provider, _resolve_model(args, kwargs)) as ctx:
                result = fn(*args, **kwargs)
                _capture_tokens(ctx, result)
                return result
        return wrapper

    return deco
def _auto_extract_tokens(ctx: _CallState, result: Any) -> None:
    """Best-effort: copy token counts from a dict-shaped LLM response.

    Two layouts are recognised, in this order:
      1. a non-empty ``usage`` mapping with ``prompt_tokens`` or
         ``input_tokens`` and ``completion_tokens`` or ``output_tokens``;
      2. top-level ``prompt_eval_count`` / ``eval_count`` keys (Ollama).
    Anything that is not a dict, or a dict with neither layout, leaves
    ``ctx`` untouched.
    """
    if not isinstance(result, dict):
        return

    usage = result.get('usage') or {}
    if usage:
        prompt_side = usage.get('prompt_tokens') or usage.get('input_tokens') or 0
        completion_side = usage.get('completion_tokens') or usage.get('output_tokens') or 0
        ctx.set_tokens(input=prompt_side, output=completion_side)
        return

    # Ollama: prompt_eval_count / eval_count
    if any(key in result for key in ('eval_count', 'prompt_eval_count')):
        ctx.set_tokens(
            input=result.get('prompt_eval_count', 0),
            output=result.get('eval_count', 0),
        )
# ─────────────────────────────────────────────────────────────────────────────
# 異步寫入 (fire-and-forget)
# ─────────────────────────────────────────────────────────────────────────────
def _async_write(state: _CallState) -> None:
    """Dispatch the record to a daemon thread so the caller never waits.

    - Logging disabled via ``AI_CALL_LOGGING_ENABLED`` → nothing happens.
    - Kill-switch tripped → the record is emitted through ``logger.info``
      only and the DB is not touched.
    - Otherwise a daemon thread running ``_write_to_db`` is started.
    """
    if not _is_logging_enabled():
        return

    if not _is_killed():
        writer = threading.Thread(
            target=_write_to_db,
            args=(state,),
            name=f"ai-call-log-{state.caller}",
            daemon=True,
        )
        writer.start()
        return

    # Degraded mode: log line only, no DB access.
    logger.info(
        "[AICall|killed] caller=%s provider=%s model=%s status=%s "
        "tokens=%s/%s duration=%sms",
        state.caller, state.provider, state.model, state.status,
        state.input_tokens, state.output_tokens, state.duration_ms,
    )
def _write_to_db(state: _CallState) -> None:
    """Insert one ``ai_calls`` row; any failure is logged, never raised.

    Cost and the meta JSON are computed first, then a session is opened,
    the row is inserted and committed, and the success is recorded. On any
    error inside the session the transaction is rolled back; the session is
    always closed. Every failure (including import or session errors) is
    counted via ``_record_failure`` and logged as a warning.
    """
    try:
        from sqlalchemy import text
        from database.manager import get_session

        cost_usd = _calc_cost(state.model, state.input_tokens, state.output_tokens)
        meta_payload = _safe_meta_json(state.meta)
        db = get_session()
        try:
            insert_stmt = text("""
                INSERT INTO ai_calls (
                    caller, provider, model,
                    input_tokens, output_tokens, duration_ms,
                    status, fallback_to, cost_usd,
                    cache_hit, rag_hit, request_id,
                    error, meta
                ) VALUES (
                    :caller, :provider, :model,
                    :input_tokens, :output_tokens, :duration_ms,
                    :status, :fallback_to, :cost_usd,
                    :cache_hit, :rag_hit, :request_id,
                    :error, CAST(:meta AS JSONB)
                )
            """)
            # String fields are cut to fixed lengths (64 / 32 / 128 / 16)
            # and missing values get a default before binding.
            row = {
                'caller': state.caller[:64] if state.caller else 'unknown',
                'provider': (state.provider or 'unknown')[:32],
                'model': (state.model or 'unknown')[:128],
                'input_tokens': int(state.input_tokens or 0),
                'output_tokens': int(state.output_tokens or 0),
                'duration_ms': state.duration_ms,
                'status': (state.status or 'ok')[:16],
                'fallback_to': state.fallback_to,
                'cost_usd': cost_usd,
                'cache_hit': bool(state.cache_hit),
                'rag_hit': bool(state.rag_hit),
                'request_id': state.request_id,
                'error': state.error,
                'meta': meta_payload,
            }
            db.execute(insert_stmt, row)
            db.commit()
            _record_success()
        except Exception:
            db.rollback()
            raise
        finally:
            db.close()
    except Exception as e:
        _record_failure()
        logger.warning(
            "[AICallLogger] write failed (caller=%s provider=%s): %s",
            state.caller, state.provider, e,
        )
def _calc_cost(model: str, in_tokens: int, out_tokens: int) -> float:
    """Return the USD cost of a call according to ``COST_TABLE``.

    An empty model name costs 0. A model missing from the table also costs
    0; a warning is logged for it unless the name starts with ``meta/``,
    ``nvidia/`` or ``deepseek-`` (NIM free tier, treated as free). Negative
    or missing token counts are treated as 0. The result is rounded to six
    decimals and capped at 9999.999999 so it fits NUMERIC(10,6).
    """
    if not model:
        return 0.0

    rate = COST_TABLE.get(model)
    if rate is None:
        free_prefixes = ('meta/', 'nvidia/', 'deepseek-')
        if not model.startswith(free_prefixes):
            logger.warning("[AICallLogger] unknown model cost: %s, default 0", model)
        return 0.0

    billable_in = max(0, int(in_tokens or 0))
    billable_out = max(0, int(out_tokens or 0))
    cost = (billable_in * rate['in'] + billable_out * rate['out']) / 1_000_000
    if cost < 0:
        return 0.0
    # Cap at the NUMERIC(10,6) maximum to avoid a DB overflow.
    return round(min(cost, 9999.999999), 6)
def _safe_meta_json(meta: Dict[str, Any]) -> str:
    """Serialize *meta* to a strictly valid JSON string for the JSONB column.

    Returns ``'{}'`` for empty input or when serialization fails. Values
    that ``json`` cannot encode are stringified via ``default=str``.
    Non-finite floats (NaN, ±Infinity) are replaced by ``null``: plain
    ``json.dumps`` would emit them as bare ``NaN``/``Infinity`` tokens,
    which are not valid JSON and would make the JSONB cast fail.
    """
    import json

    if not meta:
        return '{}'
    try:
        return json.dumps(meta, ensure_ascii=False, default=str, allow_nan=False)
    except ValueError:
        # Raised for non-finite floats (and circular references). Round-trip
        # through the lenient encoder and map NaN/Infinity constants to None.
        try:
            lenient = json.dumps(meta, default=str)
            cleaned = json.loads(lenient, parse_constant=lambda _name: None)
            return json.dumps(cleaned, ensure_ascii=False, allow_nan=False)
        except Exception as e:
            logger.warning("[AICallLogger] meta json dump failed: %s", e)
            return '{}'
    except Exception as e:
        logger.warning("[AICallLogger] meta json dump failed: %s", e)
        return '{}'
# ─────────────────────────────────────────────────────────────────────────────
# 工具caller 自動推斷caller 沒給時用)
# ─────────────────────────────────────────────────────────────────────────────
def infer_caller_from_stack(default: str = 'unknown') -> str:
    """Infer a caller name from the call stack.

    Looks two frames above this function (the caller of the function that
    called ``infer_caller_from_stack``) and returns the last dotted
    component of that frame's module name, cut to 64 characters. Returns
    *default* when the stack is too shallow, the module cannot be resolved,
    or anything goes wrong.

    The frames are walked through ``f_back`` rather than ``inspect.stack()``,
    which builds a FrameInfo with source context for every frame on the
    stack just to read one of them.
    """
    frame = inspect.currentframe()
    try:
        # Hop 1: the function that called us; hop 2: that function's caller.
        for _ in range(2):
            if frame is None:
                return default
            frame = frame.f_back
        if frame is None:
            return default
        module = inspect.getmodule(frame)
        if module and module.__name__:
            return module.__name__.split('.')[-1][:64]
    except Exception as exc:
        logger.debug("[AICallLogger] caller inference failed: %s", exc)
    finally:
        # Drop the frame reference promptly to avoid reference cycles.
        del frame
    return default

View File

@@ -0,0 +1,867 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/token_report_service.py
LLM Token 日報服務 (Operation Ollama-First v5.0 — Phase 1 收尾)
依據:
- migrations/024_create_ai_calls_table.sql (ai_calls schema + CHECK constraints)
- migrations/025_create_mcp_calls_and_budgets.sql (ai_call_budgets 種子資料)
- services/ai_call_logger.py (COST_TABLE / provider 白名單)
- services/telegram_templates.py (HTML escape 與 send 封裝)
- docs/phase0_audit_report_20260503.md (34 LLM 呼叫點清冊)
- docs/phase1_db_design_20260503.md (查詢 latency 預估)
設計紀律 (憲法級):
1. 失敗安全: DB 查詢失敗 → 推「⚠️ 報表生成失敗」訊息,不影響其他排程
2. PII 保護: 報表訊息不含 prompt 原文ai_insights metadata 只存統計 meta不存 username
3. 不污染既有 Telegram 流程: 共用 telegram_templates 既有 send 函數,不另開連線
4. ≤ 4096 字元自動截斷: Telegram 單訊息上限保險絲
公開 API:
- generate_daily_report(target_date) → str (HTML)
- send_daily_report() → dict (sent/failed/errors)
"""
from __future__ import annotations
import logging
from datetime import date, datetime, timedelta, timezone
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Asia/Taipei handled as a fixed UTC+8 offset (avoids container tzdata differences; follows the telegram_templates convention).
_TAIPEI_TZ = timezone(timedelta(hours=8))
# Per-message character cap for Telegram (96 below 4096, leaving room for a footer instead of sitting exactly on the limit).
_TELEGRAM_MAX_CHARS = 4000
# Provider display table: key -> (icon, label). Intended to match the ai_calls.provider whitelist; dict order is the report order.
_PROVIDER_DISPLAY: Dict[str, Tuple[str, str]] = {
    'gcp_ollama': ('🟢', 'GCP Ollama'),
    'ollama_secondary': ('🟢', 'Secondary'), # critic-A11 B4 patch: consistency with the three-host architecture
    'ollama_111': ('🟠', '111 Ollama'),
    'gemini': ('🔴', 'Gemini'),
    'claude': ('🟣', 'Claude'),
    'nim': ('🟡', 'NIM'),
    'openrouter': ('🟤', 'OpenRouter'),
    'nim_via_elephant': ('🟫', 'NIM_via_Eleph'),
}
# Ollama share threshold for the Section 1 "Ollama-First target met" check (campaign KPI: >= 60%).
_OLLAMA_FIRST_TARGET_PCT = 60.0
# Alert-rule parameters (used when Section 6 alerts are generated).
# NOTE(review): 'caller_stable_days' and 'ollama_stable_pct' are not read by _detect_alerts; usage elsewhere not confirmed.
_ALERT_RULES = {
    'caller_spike_factor': 1.4, # tokens > 7-day average x 1.4
    'gemini_share_threshold': 35.0, # gemini share > 35% counts as Ollama-First being lost
    'error_rate_critical': 5.0, # error_rate > 5% -> P1
    'budget_warning': 80.0, # spent / budget > 80% -> P1
    'gcp_hit_warning': 90.0, # gcp_ollama share < 90% (within Ollama) -> P2
    'cache_hit_low': 40.0, # claude cache hit < 40% -> INFO
    'caller_stable_days': 7, # N consecutive days with Ollama > 95% -> INFO "fallback can be switched off"
    'ollama_stable_pct': 95.0,
}
# ═══════════════════════════════════════════════════════════════════════════════
# 公開 API
# ═══════════════════════════════════════════════════════════════════════════════
def generate_daily_report(target_date: Optional[date] = None) -> str:
    """Build the daily LLM token report as HTML (Telegram parse_mode='HTML').

    Args:
        target_date: day to report on (Asia/Taipei). Defaults to today.

    Returns:
        The full HTML report. If any of the DB queries raises, the failure
        is logged and the short text from ``_format_failure_report`` is
        returned instead (it can still be sent to Telegram).
    """
    report_date = target_date
    if report_date is None:
        report_date = datetime.now(_TAIPEI_TZ).date()

    try:
        summary = _query_summary(report_date)
        by_provider = _query_by_provider(report_date)
        top_callers = _query_top_callers(report_date, limit=10)
        costs = _query_cost_breakdown(report_date)
        trends = _query_trends_vs_7day(report_date)
        budgets = _query_budget_usage(report_date)
        cache_stats = _query_cache_hit_stats(report_date)
    except Exception as exc:
        logger.exception("[TokenReport] DB query failed: %s", exc)
        return _format_failure_report(report_date, str(exc))

    sections = {
        'target_date': report_date,
        'summary': summary,
        'by_provider': by_provider,
        'top_callers': top_callers,
        'costs': costs,
        'trends': trends,
        'budgets': budgets,
        'cache_stats': cache_stats,
    }
    sections['alerts'] = _detect_alerts(
        summary, by_provider, top_callers, trends, budgets, cache_stats,
    )
    sections['insights'] = _generate_insights(report_date, summary, by_provider)
    return _format_report(**sections)
def send_daily_report(target_date: Optional[date] = None) -> Dict[str, Any]:
"""產報並送 Telegram + 寫 ai_insights。
Returns:
{'ok': bool, 'sent': int, 'failed': int, 'chars': int, 'errors': list}
"""
if target_date is None:
target_date = datetime.now(_TAIPEI_TZ).date()
try:
report_html = generate_daily_report(target_date)
except Exception as exc:
logger.exception("[TokenReport] generate_daily_report failed: %s", exc)
report_html = _format_failure_report(target_date, str(exc))
# 截斷至 Telegram 安全長度HTML tag 簡化處理:超出時加省略尾)
if len(report_html) > _TELEGRAM_MAX_CHARS:
truncated = report_html[: _TELEGRAM_MAX_CHARS - 80]
report_html = truncated + "\n\n... <i>(訊息超長,已截斷;詳見 ai_insights)</i>"
# 送 Telegram用既有封裝不另起連線
result: Dict[str, Any] = {'ok': False, 'sent': 0, 'failed': 0, 'chars': len(report_html), 'errors': []}
try:
from services.telegram_templates import send_telegram_with_result
send_result = send_telegram_with_result(report_html, parse_mode='HTML')
result.update({
'ok': bool(send_result.get('ok')),
'sent': int(send_result.get('sent', 0)),
'failed': int(send_result.get('failed', 0)),
'errors': list(send_result.get('errors', [])),
})
except Exception as exc:
logger.exception("[TokenReport] telegram send failed: %s", exc)
result['errors'].append(f"telegram:{type(exc).__name__}")
# 寫 ai_insights不含 PII / 不存 username
try:
_persist_to_ai_insights(target_date, report_html, result)
except Exception as exc:
logger.warning("[TokenReport] ai_insights persist failed: %s", exc)
return result
# ═══════════════════════════════════════════════════════════════════════════════
# 內部SQL 查詢
# ═══════════════════════════════════════════════════════════════════════════════
def _date_window(target_date: date) -> Tuple[datetime, datetime]:
    """Return the half-open ``[day_start, day_end)`` window for *target_date*.

    Both bounds are timezone-aware datetimes in ``_TAIPEI_TZ``: midnight of
    the given day and midnight of the following day.
    """
    midnight = datetime(
        target_date.year, target_date.month, target_date.day,
        tzinfo=_TAIPEI_TZ,
    )
    return midnight, midnight + timedelta(days=1)
def _exec_query(sql: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Run one SQL statement and return its rows as a list of dicts.

    A fresh session is obtained per call and always closed; exceptions are
    not caught here and propagate to the caller.
    """
    from sqlalchemy import text
    from database.manager import get_session

    session = get_session()
    try:
        result = session.execute(text(sql), params)
        return [dict(row) for row in result.mappings().all()]
    finally:
        session.close()
def _query_summary(target_date: date) -> Dict[str, Any]:
    """Section 1 — single-row overview of the target day.

    Returns:
        {total_tokens, total_calls, total_cost_usd, avg_duration_ms,
         success_rate, failed_calls, ollama_pct, prev_total_tokens, wow_pct}
        ``success_rate`` counts only rows with status 'ok'; ``wow_pct`` is
        the token change relative to the previous day (0.0 when that day
        has no tokens).
    """
    day_start, day_end = _date_window(target_date)

    today_rows = _exec_query("""
        SELECT
            COALESCE(SUM(input_tokens + output_tokens), 0) AS total_tokens,
            COUNT(*) AS total_calls,
            COALESCE(SUM(cost_usd), 0) AS total_cost_usd,
            COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
            COALESCE(SUM(CASE WHEN status = 'ok' THEN 1 ELSE 0 END), 0) AS ok_calls,
            COALESCE(SUM(
                CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                THEN input_tokens + output_tokens ELSE 0 END
            ), 0) AS ollama_tokens
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
    """, {'start': day_start, 'end': day_end})

    yesterday_rows = _exec_query("""
        SELECT COALESCE(SUM(input_tokens + output_tokens), 0) AS prev_total_tokens
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
    """, {'start': day_start - timedelta(days=1), 'end': day_start})

    today = today_rows[0] if today_rows else {}
    yesterday = yesterday_rows[0] if yesterday_rows else {}

    def as_int(row: Dict[str, Any], key: str) -> int:
        return int(row.get(key) or 0)

    def pct(part: float, whole: float) -> float:
        # Percentage with a zero-denominator guard.
        return (part / whole * 100.0) if whole else 0.0

    calls = as_int(today, 'total_calls')
    tokens = as_int(today, 'total_tokens')
    ok_calls = as_int(today, 'ok_calls')
    ollama_tokens = as_int(today, 'ollama_tokens')
    prev_tokens = as_int(yesterday, 'prev_total_tokens')

    return {
        'total_tokens': tokens,
        'total_calls': calls,
        'total_cost_usd': float(today.get('total_cost_usd') or 0),
        'avg_duration_ms': float(today.get('avg_duration_ms') or 0),
        'success_rate': pct(ok_calls, calls),
        'failed_calls': max(0, calls - ok_calls),
        'ollama_pct': pct(ollama_tokens, tokens),
        'prev_total_tokens': prev_tokens,
        'wow_pct': pct(tokens - prev_tokens, prev_tokens),
    }
def _query_by_provider(target_date: date) -> List[Dict[str, Any]]:
    """Section 2 — token / call / cost distribution per provider.

    Returns one entry for every key of ``_PROVIDER_DISPLAY``, in that
    order, zero-filled when the provider has no rows for the day. The
    percentage base is the token total of all rows returned by the query,
    including providers that are not listed in ``_PROVIDER_DISPLAY`` (those
    get no entry of their own).
    """
    day_start, day_end = _date_window(target_date)
    rows = _exec_query("""
        SELECT
            provider,
            SUM(input_tokens + output_tokens)::BIGINT AS tokens,
            COUNT(*) AS calls,
            COALESCE(SUM(cost_usd), 0) AS cost_usd,
            COALESCE(AVG(duration_ms), 0) AS avg_duration_ms
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
        GROUP BY provider
    """, {'start': day_start, 'end': day_end})

    rows_by_provider = {row['provider']: row for row in rows}
    grand_total = sum(int(row['tokens'] or 0) for row in rows)

    def build_entry(provider_key: str) -> Dict[str, Any]:
        row = rows_by_provider.get(provider_key, {})
        tokens = int(row.get('tokens') or 0)
        share = (tokens / grand_total * 100.0) if grand_total else 0.0
        return {
            'provider': provider_key,
            'tokens': tokens,
            'pct': share,
            'calls': int(row.get('calls') or 0),
            'cost_usd': float(row.get('cost_usd') or 0),
            'avg_duration_ms': float(row.get('avg_duration_ms') or 0),
        }

    return [build_entry(provider_key) for provider_key in _PROVIDER_DISPLAY]
def _query_top_callers(target_date: date, limit: int = 10) -> List[Dict[str, Any]]:
    """Section 3 — top N (caller, provider) rows by tokens, with 7-day deviation.

    Rows are grouped per caller *and* provider for display, while the 7-day
    baseline is a per-caller figure. ``delta_pct`` therefore compares the
    caller's total tokens today (all providers, via a window function)
    against that caller's baseline, so both sides use the same granularity;
    comparing a single provider's tokens with the caller-wide baseline
    would understate spikes for callers that use several providers.

    Args:
        target_date: day to report on (Asia/Taipei).
        limit: maximum number of rows returned.

    Returns:
        List of {caller, provider, model, tokens, calls, delta_pct}, ordered
        by row tokens descending. ``delta_pct`` is None when the caller has
        no baseline in the previous 7 days. The baseline is the 7-day token
        sum divided by 7, regardless of how many of those days have data.
    """
    day_start, day_end = _date_window(target_date)
    week_start = day_start - timedelta(days=7)
    rows = _exec_query("""
        WITH today AS (
            SELECT
                caller,
                provider,
                MODE() WITHIN GROUP (ORDER BY model) AS top_model,
                SUM(input_tokens + output_tokens)::BIGINT AS tokens,
                COUNT(*) AS calls
            FROM ai_calls
            WHERE called_at >= :day_start AND called_at < :day_end
            GROUP BY caller, provider
        ),
        baseline AS (
            SELECT
                caller,
                SUM(input_tokens + output_tokens) / 7.0 AS avg_tokens_7d
            FROM ai_calls
            WHERE called_at >= :week_start AND called_at < :day_start
            GROUP BY caller
        )
        SELECT
            t.caller, t.provider, t.top_model, t.tokens, t.calls,
            SUM(t.tokens) OVER (PARTITION BY t.caller) AS caller_tokens,
            COALESCE(b.avg_tokens_7d, 0) AS avg_tokens_7d
        FROM today t
        LEFT JOIN baseline b ON b.caller = t.caller
        ORDER BY t.tokens DESC
        LIMIT :limit
    """, {
        'day_start': day_start,
        'day_end': day_end,
        'week_start': week_start,
        'limit': int(limit),
    })

    result: List[Dict[str, Any]] = []
    for r in rows:
        tokens = int(r.get('tokens') or 0)
        # Caller-wide total for today; falls back to the row's own tokens.
        caller_tokens = int(r.get('caller_tokens') or tokens)
        baseline = float(r.get('avg_tokens_7d') or 0)
        delta_pct = (
            (caller_tokens - baseline) / baseline * 100.0 if baseline > 0 else None
        )
        result.append({
            'caller': str(r.get('caller') or ''),
            'provider': str(r.get('provider') or ''),
            'model': str(r.get('top_model') or ''),
            'tokens': tokens,
            'calls': int(r.get('calls') or 0),
            'delta_pct': delta_pct,
        })
    return result
def _query_cost_breakdown(target_date: date) -> List[Dict[str, Any]]:
    """Section 4 — cost per (provider, model), largest first.

    Only rows with a positive cost are aggregated, and at most 12 groups
    are returned. Each entry is {provider, model, cost_usd, calls}.
    """
    day_start, day_end = _date_window(target_date)
    rows = _exec_query("""
        SELECT
            provider,
            model,
            COALESCE(SUM(cost_usd), 0) AS cost_usd,
            COUNT(*) AS calls
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
            AND cost_usd > 0
        GROUP BY provider, model
        ORDER BY cost_usd DESC
        LIMIT 12
    """, {'start': day_start, 'end': day_end})

    breakdown: List[Dict[str, Any]] = []
    for row in rows:
        breakdown.append({
            'provider': str(row['provider']),
            'model': str(row['model']),
            'cost_usd': float(row['cost_usd']),
            'calls': int(row['calls']),
        })
    return breakdown
def _query_trends_vs_7day(target_date: date) -> Dict[str, Any]:
    """Section 5 — today's figures next to the previous 7 days.

    Two queries are run: one over the target day and one over the 7 days
    before it (target day excluded). The 7-day token averages are the
    window sums divided by 7. "GCP hit" is the share of ``gcp_ollama``
    calls among all Ollama-provider calls.

    Returns:
        Dict with ``today_*`` keys (tokens in total and per gemini / ollama
        / claude, average duration, error rate, GCP hit pct) and ``7d_*``
        keys (the same averages plus 7-day token and cost totals).
    """
    day_start, day_end = _date_window(target_date)
    week_start = day_start - timedelta(days=7)

    today_rows = _exec_query("""
        SELECT
            COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT AS total_tokens,
            COALESCE(SUM(CASE WHEN provider='gemini'
                THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS gemini_tokens,
            COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS ollama_tokens,
            COALESCE(SUM(CASE WHEN provider='claude'
                THEN input_tokens + output_tokens ELSE 0 END), 0)::BIGINT AS claude_tokens,
            COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
            COALESCE(SUM(CASE WHEN status<>'ok' THEN 1 ELSE 0 END), 0) AS failed,
            COUNT(*) AS total_calls,
            COALESCE(SUM(CASE WHEN provider='gcp_ollama' THEN 1 ELSE 0 END), 0) AS gcp_calls,
            COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                THEN 1 ELSE 0 END), 0) AS ollama_calls
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
    """, {'start': day_start, 'end': day_end})

    base_rows = _exec_query("""
        SELECT
            COALESCE(SUM(input_tokens + output_tokens) / 7.0, 0) AS avg_total_tokens,
            COALESCE(SUM(CASE WHEN provider='gemini'
                THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_gemini_tokens,
            COALESCE(SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_ollama_tokens,
            COALESCE(SUM(CASE WHEN provider='claude'
                THEN input_tokens + output_tokens ELSE 0 END) / 7.0, 0) AS avg_claude_tokens,
            COALESCE(AVG(duration_ms), 0) AS avg_duration_ms,
            CASE WHEN COUNT(*) > 0
                THEN SUM(CASE WHEN status<>'ok' THEN 1 ELSE 0 END)::FLOAT / COUNT(*) * 100.0
                ELSE 0 END AS error_rate_pct,
            COALESCE(SUM(input_tokens + output_tokens), 0)::BIGINT AS total_7d_tokens,
            COALESCE(SUM(cost_usd), 0) AS total_7d_cost,
            CASE WHEN SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                    THEN 1 ELSE 0 END) > 0
                THEN SUM(CASE WHEN provider='gcp_ollama' THEN 1 ELSE 0 END)::FLOAT
                    / SUM(CASE WHEN provider IN ('gcp_ollama','ollama_secondary','ollama_111')
                        THEN 1 ELSE 0 END)::FLOAT * 100.0
                ELSE 0 END AS gcp_hit_pct_7d
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
    """, {'start': week_start, 'end': day_start})

    today = today_rows[0] if today_rows else {}
    base = base_rows[0] if base_rows else {}

    def today_int(key: str) -> int:
        return int(today.get(key) or 0)

    def base_float(key: str) -> float:
        return float(base.get(key) or 0)

    call_count = today_int('total_calls')
    failed_count = today_int('failed')
    gcp_call_count = today_int('gcp_calls')
    ollama_call_count = today_int('ollama_calls')

    # Ratios guard against a zero denominator.
    if call_count:
        error_rate = failed_count / call_count * 100.0
    else:
        error_rate = 0.0
    if ollama_call_count:
        gcp_hit = gcp_call_count / ollama_call_count * 100.0
    else:
        gcp_hit = 0.0

    return {
        'today_total_tokens': today_int('total_tokens'),
        'today_gemini_tokens': today_int('gemini_tokens'),
        'today_ollama_tokens': today_int('ollama_tokens'),
        'today_claude_tokens': today_int('claude_tokens'),
        'today_avg_duration': float(today.get('avg_duration_ms') or 0),
        'today_error_rate': error_rate,
        'today_gcp_hit_pct': gcp_hit,
        '7d_avg_total': base_float('avg_total_tokens'),
        '7d_avg_gemini': base_float('avg_gemini_tokens'),
        '7d_avg_ollama': base_float('avg_ollama_tokens'),
        '7d_avg_claude': base_float('avg_claude_tokens'),
        '7d_avg_duration': base_float('avg_duration_ms'),
        '7d_error_rate': base_float('error_rate_pct'),
        '7d_total_tokens': int(base.get('total_7d_tokens') or 0),
        '7d_total_cost': base_float('total_7d_cost'),
        '7d_gcp_hit_pct': base_float('gcp_hit_pct_7d'),
    }
def _query_budget_usage(target_date: date) -> Dict[str, Any]:
    """Section 4 — spend versus budget (daily / weekly / monthly, all providers).

    Windows, all ending at the end of the target day:
      - daily:   the target day;
      - weekly:  the target day plus the 6 days before it;
      - monthly: from the first day of the target day's month.

    The row filter starts at the earlier of the weekly and monthly window
    starts. Filtering from the month start alone would silently drop
    previous-month rows from the weekly window during the first six days of
    a month and under-report ``weekly_spent``.

    Budgets are read from ``ai_call_budgets`` rows whose provider is NULL;
    a period without such a row, or with a NULL amount, reports 0.0.

    Returns:
        {daily_spent, weekly_spent, monthly_spent,
         daily_budget, weekly_budget, monthly_budget}
    """
    day_start, day_end = _date_window(target_date)
    week_start = day_start - timedelta(days=6)
    month_start = day_start.replace(day=1)
    # Lower bound of the scan must cover both the weekly and monthly window.
    window_start = min(week_start, month_start)

    spent = _exec_query("""
        SELECT
            COALESCE(SUM(CASE WHEN called_at >= :day_start AND called_at < :day_end
                THEN cost_usd ELSE 0 END), 0) AS daily_spent,
            COALESCE(SUM(CASE WHEN called_at >= :week_start AND called_at < :day_end
                THEN cost_usd ELSE 0 END), 0) AS weekly_spent,
            COALESCE(SUM(CASE WHEN called_at >= :month_start AND called_at < :day_end
                THEN cost_usd ELSE 0 END), 0) AS monthly_spent,
            COUNT(*) FILTER (WHERE called_at >= :month_start) AS month_call_count
        FROM ai_calls
        WHERE called_at >= :window_start AND called_at < :day_end
    """, {
        'day_start': day_start,
        'day_end': day_end,
        'week_start': week_start,
        'month_start': month_start,
        'window_start': window_start,
    })

    budget_rows = _exec_query("""
        SELECT period, provider, budget_usd, alert_pct
        FROM ai_call_budgets
        WHERE provider IS NULL
    """, {})
    budgets = {
        r['period']: float(r['budget_usd'])
        for r in budget_rows
        if r.get('budget_usd') is not None  # a NULL amount means "no budget"
    }

    s = spent[0] if spent else {}
    return {
        'daily_spent': float(s.get('daily_spent') or 0),
        'weekly_spent': float(s.get('weekly_spent') or 0),
        'monthly_spent': float(s.get('monthly_spent') or 0),
        'daily_budget': budgets.get('daily', 0.0),
        'weekly_budget': budgets.get('weekly', 0.0),
        'monthly_budget': budgets.get('monthly', 0.0),
    }
def _query_cache_hit_stats(target_date: date) -> Dict[str, Any]:
    """Section 4 — prompt-cache hit statistics for claude and gemini.

    Returns:
        {'claude': {total, hits, pct}, 'gemini': {total, hits, pct}};
        a provider without rows for the day reports zeros.
    """
    day_start, day_end = _date_window(target_date)
    rows = _exec_query("""
        SELECT
            provider,
            COUNT(*) AS total_calls,
            SUM(CASE WHEN cache_hit THEN 1 ELSE 0 END) AS cache_hits
        FROM ai_calls
        WHERE called_at >= :start AND called_at < :end
            AND provider IN ('claude','gemini')
        GROUP BY provider
    """, {'start': day_start, 'end': day_end})

    rows_by_provider = {row['provider']: row for row in rows}

    def summarize(row: Dict[str, Any]) -> Dict[str, Any]:
        total_calls = int(row.get('total_calls') or 0)
        hit_count = int(row.get('cache_hits') or 0)
        hit_pct = (hit_count / total_calls * 100.0) if total_calls else 0.0
        return {'total': total_calls, 'hits': hit_count, 'pct': hit_pct}

    return {
        provider: summarize(rows_by_provider.get(provider, {}))
        for provider in ('claude', 'gemini')
    }
# ═══════════════════════════════════════════════════════════════════════════════
# 內部告警偵測Section 6
# ═══════════════════════════════════════════════════════════════════════════════
def _detect_alerts(
    summary: Dict[str, Any],
    by_provider: List[Dict[str, Any]],
    top_callers: List[Dict[str, Any]],
    trends: Dict[str, Any],
    budgets: Dict[str, Any],
    cache_stats: Dict[str, Any],
) -> List[Dict[str, str]]:
    """Evaluate the alert rules and return [{level, icon, title, suggestion}, ...].

    Six rules are implemented here, thresholds coming from ``_ALERT_RULES``:
      R1 (P2)   a top caller's tokens deviate from its 7-day average by at
                least (caller_spike_factor - 1) * 100 percent;
      R2 (P2)   Gemini token share above gemini_share_threshold;
      R3 (P1)   overall failure rate above error_rate_critical;
      R4 (P1)   daily / weekly / monthly spend above budget_warning percent
                of its budget (periods without a budget are skipped);
      R5 (P2)   GCP Ollama hit rate below gcp_hit_warning, checked only
                when there is Ollama traffic;
      R6 (INFO) Claude prompt-cache hit rate below cache_hit_low, checked
                only with at least 10 Claude calls.
    Alerts are appended in rule order.
    """
    alerts: List[Dict[str, str]] = []

    # R1: single caller spike (P2)
    spike_threshold_pct = (_ALERT_RULES['caller_spike_factor'] - 1) * 100.0
    for caller in top_callers:
        delta = caller.get('delta_pct')
        if delta is not None and delta >= spike_threshold_pct:
            alerts.append({
                'level': 'P2', 'icon': '🟠',
                'title': f"{caller['caller']} token 暴增 {delta:+.0f}%vs 7 日均)",
                'suggestion': f"今日 {caller['tokens']:,} tokens / {caller['calls']} calls建議查 prompt 是否變更",
            })

    # R2: Gemini share too high (P2, "Ollama-First lost")
    gemini = next((r for r in by_provider if r['provider'] == 'gemini'), {})
    gemini_pct = float(gemini.get('pct') or 0)
    if gemini_pct > _ALERT_RULES['gemini_share_threshold']:
        alerts.append({
            'level': 'P2', 'icon': '🟠',
            'title': f"Gemini 占比 {gemini_pct:.1f}% 高於門檻 {_ALERT_RULES['gemini_share_threshold']:.0f}%",
            'suggestion': "Ollama-First 失守,請檢查 fallback 是否正確命中本地",
        })

    # R3: failure rate (P1)
    total_calls = int(summary.get('total_calls') or 0)
    failed = int(summary.get('failed_calls') or 0)
    if total_calls:
        err_rate = failed / total_calls * 100.0
        if err_rate > _ALERT_RULES['error_rate_critical']:
            alerts.append({
                'level': 'P1', 'icon': '🔴',
                'title': f"全域失敗率 {err_rate:.1f}% 超過門檻 {_ALERT_RULES['error_rate_critical']:.0f}%",
                'suggestion': f"今日 {failed:,} / {total_calls:,} 失敗,立即查 ai_calls WHERE status<>'ok'",
            })

    # R4: budget exceeded (P1). Each period carries its own label so the
    # three alerts can be told apart, and the usage percentage is bracketed
    # so it does not run into the budget amount.
    for period_key, label in (('daily', '日'), ('weekly', '週'), ('monthly', '月')):
        spent = float(budgets.get(f'{period_key}_spent') or 0)
        budget = float(budgets.get(f'{period_key}_budget') or 0)
        if budget <= 0:
            continue
        usage_pct = spent / budget * 100.0
        if usage_pct > _ALERT_RULES['budget_warning']:
            alerts.append({
                'level': 'P1', 'icon': '🔴',
                'title': f"{label}成本 ${spent:.2f} 達預算 ${budget:.2f}({usage_pct:.0f}%)",
                'suggestion': "請檢查供應商分布是否異常Section 2/3或調整預算",
            })

    # R5: low GCP hit rate (P2) — only when there is Ollama traffic
    today_gcp_hit = float(trends.get('today_gcp_hit_pct') or 0)
    ollama = sum(
        int(r.get('tokens') or 0) for r in by_provider
        if r['provider'] in ('gcp_ollama', 'ollama_secondary', 'ollama_111')
    )
    if ollama > 0 and today_gcp_hit < _ALERT_RULES['gcp_hit_warning']:
        alerts.append({
            'level': 'P2', 'icon': '🟠',
            'title': f"GCP Ollama 命中率 {today_gcp_hit:.1f}% 低於 {_ALERT_RULES['gcp_hit_warning']:.0f}%",
            'suggestion': "111 fallback 觸發頻繁,請檢查 GCP Ollama 健康ADR-027",
        })

    # R6: low cache hit rate (INFO) — claude, needs at least 10 calls
    claude_cache = cache_stats.get('claude', {})
    if int(claude_cache.get('total') or 0) >= 10:
        cache_pct = float(claude_cache.get('pct') or 0)
        if cache_pct < _ALERT_RULES['cache_hit_low']:
            alerts.append({
                'level': 'INFO', 'icon': '🟢',
                'title': f"Claude prompt cache 命中率僅 {cache_pct:.1f}%",
                'suggestion': "可優化 system prompt 結構≥1024 tokens 才觸發 cache",
            })
    return alerts
def _generate_insights(
    target_date: date,
    summary: Dict[str, Any],
    by_provider: List[Dict[str, Any]],
) -> List[Dict[str, str]]:
    """Build the Section 6 suggestion list from fixed rules (no LLM call).

    Args:
        target_date: Report date. Not read by this function.
        summary: Daily summary; reads 'ollama_pct', 'success_rate' and
            'total_calls'.
        by_provider: Per-provider rows; reads 'provider' and 'tokens'.

    Returns:
        A list of ``{'icon': ..., 'text': ...}`` dicts in rule order.
    """
    # NOTE(review): three of the 'icon' values below are empty strings, so
    # those suggestions render without a glyph — confirm that is intended.
    collected: List[Dict[str, str]] = []

    def _add(icon: str, text: str) -> None:
        collected.append({'icon': icon, 'text': text})

    # Rule 1: Ollama share versus the Ollama-First target (always emits one entry).
    share = float(summary.get('ollama_pct') or 0)
    if share >= _OLLAMA_FIRST_TARGET_PCT:
        _add('', f"Ollama 占比 {share:.1f}%(目標 ≥{_OLLAMA_FIRST_TARGET_PCT:.0f}%Ollama-First 戰役達標")
    else:
        _add('⚠️', f"Ollama 占比 {share:.1f}% 未達 {_OLLAMA_FIRST_TARGET_PCT:.0f}% 目標,可優化 fallback 鏈")

    # Rule 2: NIM traffic is non-zero but below 100K tokens.
    nim_providers = ('nim', 'nim_via_elephant')
    nim_tokens = 0
    for row in by_provider:
        if row['provider'] in nim_providers:
            nim_tokens += int(row.get('tokens') or 0)
    if nim_tokens > 0 and nim_tokens < 100_000:
        _add('', f"NIM 用量已降至 {nim_tokens:,} tokens戰役前約 5M可考慮關閉 NIM 依賴")

    # Rule 3: success rate of at least 99%, only when there was any traffic.
    ok_rate = float(summary.get('success_rate') or 0)
    if summary.get('total_calls') and ok_rate >= 99.0:
        _add('', f"成功率 {ok_rate:.1f}%,鏈路健康度高")

    return collected
# ═══════════════════════════════════════════════════════════════════════════════
# 內部:報表組裝
# ═══════════════════════════════════════════════════════════════════════════════
def _format_report(
    target_date: date,
    summary: Dict[str, Any],
    by_provider: List[Dict[str, Any]],
    top_callers: List[Dict[str, Any]],
    costs: List[Dict[str, Any]],
    trends: Dict[str, Any],
    budgets: Dict[str, Any],
    cache_stats: Dict[str, Any],
    alerts: List[Dict[str, str]],
    insights: List[Dict[str, str]],
) -> str:
    """Assemble the complete HTML daily report (header, six sections, footer).

    Caller, provider and model strings, as well as alert and insight text,
    are passed through ``_esc`` before being embedded.

    Args:
        target_date: Date the report covers.
        summary: Daily totals (tokens, calls, cost, latency, success, Ollama share).
        by_provider: Per-provider rows; rows with ``calls == 0`` are omitted.
        top_callers: Rows for the TOP-callers section, already ordered.
        costs: Per-model cost rows; only the first six are shown.
        trends: Today-versus-7-day figures.
        budgets: ``*_spent`` / ``*_budget`` values for daily, weekly, monthly.
        cache_stats: Prompt-cache stats keyed by 'claude' / 'gemini'.
        alerts: Alert dicts with 'icon', 'level', 'title', 'suggestion'.
        insights: Insight dicts with 'icon', 'text'.

    Returns:
        The report as a single newline-joined string.
    """

    def _clip(value: Any, limit: int) -> str:
        # Truncate the RAW text first and escape afterwards. Slicing text
        # that is already escaped can cut an entity such as "&amp;" in half,
        # which yields malformed HTML.
        raw = "" if value is None else str(value)
        return _esc(raw[:limit])

    weekday_zh = ['週一', '週二', '週三', '週四', '週五', '週六', '週日'][target_date.weekday()]
    now_str = datetime.now(_TAIPEI_TZ).strftime('%H:%M:%S')
    lines: List[str] = []

    # Header
    lines.append(f"📊 <b>LLM Token 日報 {target_date.isoformat()} ({weekday_zh})</b>")
    lines.append("═══════════════════════════════════════")
    lines.append("⏰ 統計區間00:00 ~ 23:59 (UTC+8)")
    lines.append(f"🔄 報表生成:{now_str} | 涵蓋筆數:{summary['total_calls']:,} calls")

    # Section 1: daily overview
    lines.append("")
    lines.append("━━━━━ <b>【1】今日總覽 TL;DR</b> ━━━━━")
    wow_sign = "+" if summary['wow_pct'] >= 0 else ""
    lines.append(f"🪙 總 Token <b>{summary['total_tokens']:,}</b> ({wow_sign}{summary['wow_pct']:.1f}% vs 昨日)")
    lines.append(f"💰 總成本: <b>US$ {summary['total_cost_usd']:.2f}</b>")
    lines.append(f"⚡ 平均延遲: {summary['avg_duration_ms']:.0f} ms")
    lines.append(f"✅ 成功率: {summary['success_rate']:.1f}% ({summary['failed_calls']} 失敗 / {summary['total_calls']})")
    # NOTE(review): the "target met" marker is an empty string here — confirm
    # whether a glyph was intended.
    ollama_check = "" if summary['ollama_pct'] >= _OLLAMA_FIRST_TARGET_PCT else "⚠️"
    lines.append(f"🎯 Ollama 占比:{summary['ollama_pct']:.1f}% {ollama_check}")

    # Section 2: provider breakdown
    lines.append("")
    lines.append("━━━━━ <b>【2】供應商分布</b> ━━━━━")
    for p in by_provider:
        if p['calls'] == 0:
            continue  # skip providers with no traffic to reduce noise
        try:
            icon, name = _PROVIDER_DISPLAY[p['provider']]
        except KeyError:
            # A provider missing from the display table must not abort the
            # whole report; show its raw (escaped) key instead.
            icon, name = "▫️", _esc(p['provider'])
        lines.append(
            f"{icon} {_pad(name, 14)} "
            f"{_fmt_kb(p['tokens']):>8} ({p['pct']:5.1f}%) "
            f"{p['calls']:>5} calls "
            f"${p['cost_usd']:6.2f} "
            f"{p['avg_duration_ms']:5.0f}ms"
        )

    # Section 3: top callers by tokens
    lines.append("")
    lines.append(f"━━━━━ <b>【3】呼叫點 TOP {len(top_callers)} (按 Token)</b> ━━━━━")
    medals = ['🥇', '🥈', '🥉']
    for i, c in enumerate(top_callers):
        rank = medals[i] if i < 3 else f" {i+1}"
        flag = ""
        d = c.get('delta_pct')
        if d is not None:
            # Flag large day-over-baseline swings in either direction.
            if d >= 40:
                flag = f" ⚠️ {d:+.0f}%"
            elif d <= -50:
                flag = f" 🎉 {d:+.0f}%"
        lines.append(
            f"{rank} <code>{_esc(c['caller'])}</code>"
            f" / {_esc(c['provider'])} / {_clip(c['model'], 24)}"
        )
        lines.append(f" {_fmt_kb(c['tokens']):>8} | {c['calls']:>5} calls{flag}")

    # Section 4: cost and budget
    lines.append("")
    lines.append("━━━━━ <b>【4】成本分析 + 預算對比</b> ━━━━━")
    lines.append(_budget_line("📅 本日成本", budgets['daily_spent'], budgets['daily_budget']))
    lines.append(_budget_line("📅 本週累計", budgets['weekly_spent'], budgets['weekly_budget']))
    lines.append(_budget_line("📅 本月累計", budgets['monthly_spent'], budgets['monthly_budget']))
    if costs:
        lines.append("")
        lines.append("<b>成本拆解 by Model:</b>")
        for c in costs[:6]:
            lines.append(f" {_clip(c['model'], 32):<32} ${c['cost_usd']:7.4f} ({c['calls']} calls)")

    # Prompt-cache hit rates (one line per provider, "N/A" when no samples)
    lines.append("")
    lines.append("<b>Prompt Cache 命中:</b>")
    for key, label in (('claude', 'Claude'), ('gemini', 'Gemini')):
        stats = cache_stats.get(key, {})
        if stats.get('total'):
            lines.append(f" {label}: {stats['hits']:>4} / {stats['total']:<4} ({stats['pct']:5.1f}%)")
        else:
            lines.append(f" {label}: N/A")

    # Section 5: trends versus the 7-day average
    lines.append("")
    lines.append("━━━━━ <b>【5】趨勢與洞察 (vs 7 日均)</b> ━━━━━")
    lines.append(_trend_line("總 Tokens", trends['today_total_tokens'], trends['7d_avg_total']))
    lines.append(_trend_line("Gemini Tokens", trends['today_gemini_tokens'], trends['7d_avg_gemini']))
    lines.append(_trend_line("Ollama Tokens", trends['today_ollama_tokens'], trends['7d_avg_ollama']))
    lines.append(_trend_line("Claude Tokens", trends['today_claude_tokens'], trends['7d_avg_claude']))
    lines.append(_trend_line("平均延遲(ms)", trends['today_avg_duration'], trends['7d_avg_duration'], unit=''))
    lines.append("")
    lines.append(f"📈 7 日累計:{_fmt_kb(trends['7d_total_tokens'])} tokens / US$ {trends['7d_total_cost']:.2f}")

    # Section 6: alerts and suggestions
    lines.append("")
    lines.append("━━━━━ <b>【6】告警與建議</b> ━━━━━")
    if alerts:
        for a in alerts:
            lines.append(f"{a['icon']} <b>[{a['level']}]</b> {_esc(a['title'])}")
            lines.append(f" 建議:{_esc(a['suggestion'])}")
    else:
        lines.append("✅ 無異常告警")
    if insights:
        lines.append("")
        lines.append("<b>🔮 智能建議 (Hermes 規則引擎)</b>")
        for ins in insights:
            lines.append(f" {ins['icon']} {_esc(ins['text'])}")

    # Footer
    lines.append("")
    lines.append("═══════════════════════════════════════")
    lines.append("🤖 Operation Ollama-First v5.0 / token_report v1.0")
    return "\n".join(lines)
def _format_failure_report(target_date: date, error: str) -> str:
    """Return the minimal HTML message sent when the report cannot be built.

    Args:
        target_date: Date the failed report was meant to cover.
        error: Error text; at most its first 300 characters are shown.

    Returns:
        A short HTML string with the escaped error and a log-lookup hint.
    """
    # Truncate the raw text BEFORE escaping: slicing escaped text could cut an
    # entity such as "&lt;" in half and leave malformed HTML in the message.
    raw = "" if error is None else str(error)
    return (
        f"⚠️ <b>LLM Token 日報生成失敗 ({target_date.isoformat()})</b>\n"
        f"━━━━━━━━━━━━━━━━━━━━\n"
        f"錯誤:<code>{_esc(raw[:300])}</code>\n"
        f"請查 logs<code>docker logs momo-scheduler | grep TokenReport</code>"
    )
def _persist_to_ai_insights(target_date: date, content: str, send_result: Dict[str, Any]) -> None:
    """Insert one ai_insights row (insight_type 'daily_token_report') for this report.

    The stored metadata holds only the target date and the send counters
    ('sent', 'failed', 'chars'); no PII is written.

    Args:
        target_date: Report date; stored as the row's ``period`` (ISO format).
        content: Report text; only the first 8000 characters are stored.
        send_result: Send outcome; reads 'sent', 'failed' and 'chars'
            (each defaults to 0 when missing).

    Raises:
        Exception: Whatever the insert or commit raises, re-raised after the
            session has been rolled back.
    """
    # Imported inside the function rather than at module import time.
    from sqlalchemy import text
    from database.manager import get_session
    import json as _json
    meta = {
        'target_date': target_date.isoformat(),
        'sent': int(send_result.get('sent', 0)),
        'failed': int(send_result.get('failed', 0)),
        'chars': int(send_result.get('chars', 0)),
        # NOTE: never store username / first_name / chat_id here
    }
    session = get_session()
    try:
        session.execute(text("""
INSERT INTO ai_insights (
insight_type, period, content, metadata_json,
avg_quality, status, decay_exempt, ai_model,
created_by, created_at, updated_at
) VALUES (
'daily_token_report', :period, :content, :meta,
0.9, 'approved', TRUE, 'rule_engine',
'token_report_service', NOW(), NOW()
)
"""), {
            'period': target_date.isoformat(),
            'content': content[:8000],  # original note: the column is TEXT; the cap is an extra safeguard
            'meta': _json.dumps(meta, ensure_ascii=False),
        })
        session.commit()
    except Exception:
        # Undo the partial transaction, then let the caller see the failure.
        session.rollback()
        raise
    finally:
        session.close()
# ═══════════════════════════════════════════════════════════════════════════════
# 內部:格式化工具
# ═══════════════════════════════════════════════════════════════════════════════
def _esc(s: Any) -> str:
"""HTML escape對齊 telegram_templates._html_escape 行為。"""
text = "" if s is None else str(s)
return (text.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;"))
def _pad(s: str, width: int) -> str:
"""中文寬字元 padding中文字以 2 寬度計)。"""
visible = sum(2 if ord(c) > 127 else 1 for c in s)
return s + " " * max(0, width - visible)
def _fmt_kb(tokens: int) -> str:
"""token 數 → 1.2K / 3.4M 顯示。"""
n = int(tokens or 0)
if n >= 1_000_000:
return f"{n/1_000_000:.1f}M"
if n >= 1_000:
return f"{n/1_000:.0f}K"
return f"{n}"
def _budget_line(label: str, spent: float, budget: float) -> str:
"""產出單列預算進度條10 格條)。"""
if budget <= 0:
return f"{label} US$ {spent:6.2f} ({_pad('未設定預算', 10)})"
pct = min(100.0, spent / budget * 100.0)
filled = int(pct / 10)
bar = "" * filled + "" * (10 - filled)
return f"{label} US$ {spent:6.2f} {bar} {pct:3.0f}% / ${budget:.0f} 預算"
def _trend_line(label: str, today: float, baseline: float, unit: str = '') -> str:
    """Render one "today vs baseline" comparison row.

    Labels containing ``'Tokens'`` are shown via ``_fmt_kb``; all others as
    thousands-separated whole numbers followed by ``unit``. When the baseline
    is zero or missing the delta is reported as 0.0 with no sign or arrow.
    """
    current = float(today or 0)
    reference = float(baseline or 0)

    # NOTE(review): all three arrow glyphs are empty strings here, so no
    # direction marker is rendered — confirm the intended characters.
    rising, falling, steady = "", "", ""

    delta, sign, arrow = 0.0, "", ""
    if reference > 0:
        delta = (current - reference) / reference * 100.0
        sign = "+" if delta >= 0 else ""
        # A change within ±5% is treated as steady.
        if delta >= 5:
            arrow = rising
        elif delta <= -5:
            arrow = falling
        else:
            arrow = steady

    as_tokens = 'Tokens' in label

    def _show(value: float) -> str:
        if as_tokens:
            return _fmt_kb(int(value))
        return f"{value:,.0f}{unit}"

    today_str = _show(current)
    base_str = _show(reference)
    return f" {_pad(label, 14)} {today_str:>8} vs {base_str:>8} ({sign}{delta:5.1f}%) {arrow}"

View File

@@ -0,0 +1,426 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
tests/test_ai_call_logger.py
ai_call_logger 單元測試 (Operation Ollama-First v5.0 — Phase 1)
測試紀律 (對應 phase1 spec):
- context manager 正常路徑status='ok'
- context manager 例外路徑status='error',例外仍 re-raise
- decorator 正常路徑 + auto token extract
- DB 失敗時主流程不爆
- cost 計算正確gemini-2.5-flash / 未知 model fallback / NIM 免費)
- 環境開關 AI_CALL_LOGGING_ENABLED=false 時跳過寫入
- kill-switch 連續失敗 ≥ 10 次降級
- PII 保護set_prompt_hash 只存前 12 碼
"""
import os
import sys
import time
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 隔離 import避免被 ai_call_logger 內部 lazy import 的 database.manager 拖到
import services.ai_call_logger as logger_mod
from services.ai_call_logger import (
COST_TABLE,
_calc_cost,
_CallState,
_is_logging_enabled,
_reset_kill_switch,
log_ai_call,
logged_ai_call,
)
# ─────────────────────────────────────────────────────────────────────────────
# Fixtures
# ─────────────────────────────────────────────────────────────────────────────
@pytest.fixture(autouse=True)
def reset_state(monkeypatch):
    """Reset the kill switch and swap ``_write_to_db`` for an in-memory recorder.

    Runs before every test. Yields the list that collects one dict per
    attempted write, so tests can inspect what would have been stored.
    """
    _reset_kill_switch()
    records = []

    def fake_write(state):
        # Snapshot the state field by field, keeping the original key order.
        snapshot = {
            field: getattr(state, field)
            for field in (
                'caller', 'provider', 'model',
                'input_tokens', 'output_tokens',
                'duration_ms', 'status', 'fallback_to',
            )
        }
        snapshot['cost_usd'] = _calc_cost(state.model, state.input_tokens, state.output_tokens)
        for field in ('cache_hit', 'rag_hit', 'request_id', 'error'):
            snapshot[field] = getattr(state, field)
        snapshot['meta'] = dict(state.meta)
        records.append(snapshot)

    monkeypatch.setattr(logger_mod, '_write_to_db', fake_write)
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'true')
    # Hand the recorder list to the test.
    yield records
def _wait_for_async(captured, n=1, timeout=2.0):
"""等待 daemon thread 寫完。"""
deadline = time.time() + timeout
while time.time() < deadline:
if len(captured) >= n:
return True
time.sleep(0.01)
return False
# ─────────────────────────────────────────────────────────────────────────────
# context manager 測試
# ─────────────────────────────────────────────────────────────────────────────
def test_context_manager_happy_path(reset_state):
    """A clean ``with`` block records exactly one 'ok' row with the supplied fields."""
    records = reset_state
    with log_ai_call('hermes_analyst', 'gcp_ollama', 'hermes3:latest') as call:
        call.set_tokens(input=120, output=80)
        call.set_cache_hit(False)

    assert _wait_for_async(records, 1), "async write 未完成"
    assert len(records) == 1
    row = records[0]
    expected = {
        'caller': 'hermes_analyst',
        'provider': 'gcp_ollama',
        'model': 'hermes3:latest',
        'input_tokens': 120,
        'output_tokens': 80,
        'status': 'ok',
    }
    for field, want in expected.items():
        assert row[field] == want
    assert row['error'] is None
    assert row['duration_ms'] is not None and row['duration_ms'] >= 0
def test_context_manager_exception_path(reset_state):
    """An exception inside the block is re-raised and recorded as status 'error'."""
    records = reset_state
    with pytest.raises(ValueError, match="boom"):
        with log_ai_call('nemotron_dispatch', 'nim', 'meta/llama-3.1-8b-instruct'):
            raise ValueError("boom")

    assert _wait_for_async(records, 1)
    row = records[0]
    assert row['status'] == 'error'
    message = row['error']
    assert message is not None
    # The stored error carries both the exception type and its message.
    for fragment in ('ValueError', 'boom'):
        assert fragment in message
def test_context_manager_explicit_fallback(reset_state):
    """``fallback_to_caller`` marks the row as 'fallback' and stores the target caller."""
    records = reset_state
    with log_ai_call('openclaw_qa', 'gemini', 'gemini-2.5-flash') as call:
        call.fallback_to_caller('openclaw_bot_nim')

    assert _wait_for_async(records, 1)
    row = records[0]
    assert (row['status'], row['fallback_to']) == ('fallback', 'openclaw_bot_nim')
def test_context_manager_set_error_without_raise(reset_state):
    """``set_error`` without raising (e.g. the LLM reported failure) still records 'error'."""
    records = reset_state
    with log_ai_call('sales_copy', 'gcp_ollama', 'llama3.1:8b') as call:
        call.set_error('timeout after 30s')
        call.set_tokens(input=50, output=0)

    assert _wait_for_async(records, 1)
    row = records[0]
    assert row['status'] == 'error'
    assert 'timeout' in row['error']
# ─────────────────────────────────────────────────────────────────────────────
# decorator 測試
# ─────────────────────────────────────────────────────────────────────────────
def test_decorator_happy_path(reset_state):
    """The decorator passes the result through and picks token counts from Ollama-style keys."""
    records = reset_state

    @logged_ai_call(caller='trend_match', provider='gcp_ollama', model='llama3.1:8b')
    def fake_call(prompt: str):
        return {'response': 'ok', 'eval_count': 42, 'prompt_eval_count': 100}

    result = fake_call("hello")
    assert result['response'] == 'ok'

    assert _wait_for_async(records, 1)
    row = records[0]
    expected = {
        'caller': 'trend_match',
        'model': 'llama3.1:8b',
        'input_tokens': 100,
        'output_tokens': 42,
        'status': 'ok',
    }
    for field, want in expected.items():
        assert row[field] == want
def test_decorator_with_model_extractor(reset_state):
    """``model_extractor`` supplies the model from call kwargs; tokens come from a 'usage' dict."""
    records = reset_state

    def pick_model(args, kw):
        return kw.get('model', 'gemini-2.0-flash')

    @logged_ai_call(
        caller='ppt_gemini',
        provider='gemini',
        model_extractor=pick_model,
    )
    def fake_call(*, model: str, prompt: str):
        return {'usage': {'prompt_tokens': 200, 'completion_tokens': 50}}

    fake_call(model='gemini-2.5-flash', prompt='x')

    assert _wait_for_async(records, 1)
    row = records[0]
    assert row['model'] == 'gemini-2.5-flash'
    assert (row['input_tokens'], row['output_tokens']) == (200, 50)
def test_decorator_exception_does_reraise(reset_state):
    """An exception from the decorated function propagates and is recorded as 'error'."""
    records = reset_state

    @logged_ai_call(caller='code_review_hermes', provider='gcp_ollama', model='hermes3:latest')
    def fake_call():
        raise RuntimeError("net down")

    with pytest.raises(RuntimeError, match="net down"):
        fake_call()

    assert _wait_for_async(records, 1)
    first = records[0]
    assert first['status'] == 'error'
# ─────────────────────────────────────────────────────────────────────────────
# DB 失敗不爆主流程
# ─────────────────────────────────────────────────────────────────────────────
def test_db_failure_does_not_break_main_flow(monkeypatch, caplog):
    """A failing write must not raise into the ``with log_ai_call(...)`` block.

    ``threading.Thread`` is replaced by a synchronous stand-in so the write
    runs inline, and ``_write_to_db`` is replaced by a stub that raises
    ``ImportError``, catches it, calls ``_record_failure`` and logs a warning.
    Note that the real ``_write_to_db`` is NOT exercised here. The test
    asserts that the warning was logged and that nothing propagated.
    """
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'true')

    # Swap the daemon thread for a synchronous call so the write is observable inline.
    class SyncThread:
        def __init__(self, target=None, args=(), kwargs=None, **_):
            self._target = target
            self._args = args
            self._kwargs = kwargs or {}

        def start(self):
            self._target(*self._args, **self._kwargs)

    monkeypatch.setattr(logger_mod.threading, 'Thread', SyncThread)

    # The autouse fixture already stubbed _write_to_db; override it with a version that fails.
    def real_write_that_fails(state):
        try:
            raise ImportError("simulated DB unavailable")
        except Exception as e:
            logger_mod._record_failure()
            logger_mod.logger.warning(
                "[AICallLogger] write failed (caller=%s provider=%s): %s",
                state.caller, state.provider, e,
            )

    monkeypatch.setattr(logger_mod, '_write_to_db', real_write_that_fails)
    # The main flow must not raise.
    with caplog.at_level('WARNING'):
        with log_ai_call('hermes_intent', 'gcp_ollama', 'hermes3:latest') as ctx:
            ctx.set_tokens(input=10, output=5)
    # At least one "[AICallLogger] write failed" warning must be present (the stub caught the error).
    assert any('write failed' in r.message for r in caplog.records), \
        "預期 _write_to_db 失敗時 log warning"
def test_async_dispatch_failure_swallowed(monkeypatch):
    """If constructing the writer thread itself fails, the main flow still must not raise.

    ``threading.Thread`` is replaced by a class whose constructor raises
    ``OSError``; the test passes as long as the ``with`` block exits cleanly.
    """
    class BadThread:
        def __init__(self, *a, **kw):
            raise OSError("can't fork")

    monkeypatch.setattr(logger_mod.threading, 'Thread', BadThread)
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'true')
    # Must not raise.
    with log_ai_call('x', 'y', 'z'):
        pass
# ─────────────────────────────────────────────────────────────────────────────
# cost 計算
# ─────────────────────────────────────────────────────────────────────────────
def test_calc_cost_gemini_flash():
    """gemini-2.5-flash: 1M input + 100K output = $0.075 + $0.030 = $0.105."""
    assert _calc_cost('gemini-2.5-flash', 1_000_000, 100_000) == pytest.approx(0.105, rel=1e-6)
def test_calc_cost_claude_opus():
    """claude-opus-4-7: 1K input + 1K output at $15 / $75 per 1M tokens = $0.015 + $0.075 = $0.090."""
    expected_usd = (1000 * 15.0 + 1000 * 75.0) / 1_000_000
    actual_usd = _calc_cost('claude-opus-4-7', 1000, 1000)
    assert actual_usd == pytest.approx(expected_usd, rel=1e-6)
def test_calc_cost_ollama_zero():
    """Local Ollama models cost nothing regardless of token volume."""
    cases = (
        ('hermes3:latest', 100_000, 100_000),
        ('llama3.1:8b', 999_999, 999_999),
    )
    for model, tokens_in, tokens_out in cases:
        assert _calc_cost(model, tokens_in, tokens_out) == 0.0
def test_calc_cost_unknown_model_returns_zero(caplog):
    """An unknown model yields cost 0.0 and logs an 'unknown model cost' warning."""
    with caplog.at_level('WARNING'):
        cost = _calc_cost('totally-fake-model-xyz', 1_000_000, 1_000_000)
    assert cost == 0.0
    messages = [record.message for record in caplog.records]
    assert any('unknown model cost' in text for text in messages)
def test_calc_cost_nim_prefix_silent_zero(caplog):
    """A ``nvidia/``-prefixed model costs 0.0 without triggering the unknown-model warning."""
    with caplog.at_level('WARNING'):
        cost = _calc_cost('nvidia/some-future-model', 1_000_000, 1_000_000)
    assert cost == 0.0
    messages = [record.message for record in caplog.records]
    assert not any('unknown model cost' in text for text in messages)
def test_calc_cost_negative_or_none_safe():
    """``None`` tokens, an empty model name and negative tokens all yield 0.0."""
    degenerate_inputs = (
        ('gemini-2.5-flash', None, None),
        ('', 100, 100),
        ('gemini-2.5-flash', -1, -5),
    )
    for model, tokens_in, tokens_out in degenerate_inputs:
        assert _calc_cost(model, tokens_in, tokens_out) == 0.0
# ─────────────────────────────────────────────────────────────────────────────
# 環境開關
# ─────────────────────────────────────────────────────────────────────────────
def test_logging_disabled_skips_write(monkeypatch):
    """With AI_CALL_LOGGING_ENABLED=false no write is attempted at all."""
    attempted = []

    def fake_write(state):
        attempted.append(state)

    monkeypatch.setattr(logger_mod, '_write_to_db', fake_write)
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'false')

    with log_ai_call('sales_copy', 'gcp_ollama', 'llama3.1:8b') as call:
        call.set_tokens(input=10, output=10)

    # Give any (unexpected) background writer a moment to run.
    time.sleep(0.05)
    assert len(attempted) == 0, "AI_CALL_LOGGING_ENABLED=false 時不應寫入"
def test_logging_enabled_default_true(monkeypatch):
    """Logging is on when the variable is unset; '0' and 'OFF' disable it, 'true' enables it."""
    monkeypatch.delenv('AI_CALL_LOGGING_ENABLED', raising=False)
    assert _is_logging_enabled() is True

    for raw_value, expected in (('0', False), ('OFF', False), ('true', True)):
        monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', raw_value)
        assert _is_logging_enabled() is expected
# ─────────────────────────────────────────────────────────────────────────────
# Kill-switch
# ─────────────────────────────────────────────────────────────────────────────
def test_kill_switch_after_consecutive_failures(monkeypatch, caplog):
    """After 10 recorded failures the kill switch trips and no writer thread is created.

    Failures are injected by calling ``_record_failure`` directly; the test
    then replaces ``threading.Thread`` with a tracker and asserts it is never
    constructed during a subsequent ``log_ai_call``.
    """
    _reset_kill_switch()
    # Original note: the real _write_to_db catches its exception and calls
    # _record_failure; that step is simulated directly here.
    monkeypatch.setenv('AI_CALL_LOGGING_ENABLED', 'true')
    # Force 10 failures.
    for _ in range(10):
        logger_mod._record_failure()
    assert logger_mod._is_killed() is True
    # From now on a call should not start a new thread.
    captured_threads = []

    class TrackingThread:
        def __init__(self, *a, **kw):
            captured_threads.append(kw.get('target'))

        def start(self):
            pass

    monkeypatch.setattr(logger_mod.threading, 'Thread', TrackingThread)
    with log_ai_call('x', 'y', 'z'):
        pass
    time.sleep(0.05)
    assert len(captured_threads) == 0, "kill-switch 啟動後不應再開新 thread"
def test_record_success_resets_failure_counter():
    """A single recorded success clears the consecutive-failure counter."""
    _reset_kill_switch()
    failures = 5
    for _ in range(failures):
        logger_mod._record_failure()
    assert logger_mod._failure_state['count'] == 5

    logger_mod._record_success()
    assert logger_mod._failure_state['count'] == 0
# ─────────────────────────────────────────────────────────────────────────────
# PII 保護
# ─────────────────────────────────────────────────────────────────────────────
def test_set_prompt_hash_truncates_to_12():
    """``set_prompt_hash`` stores a 12-character value that does not contain the raw prompt."""
    call_state = _CallState('a', 'b', 'c', None, {})
    call_state.set_prompt_hash('Hello world some sensitive PII content here')

    assert 'prompt_hash' in call_state.meta
    stored = call_state.meta['prompt_hash']
    assert len(stored) == 12
    # The stored value must not be the original text.
    assert 'Hello' not in stored
def test_meta_does_not_leak_raw_prompt_into_call_state():
    """After ``set_prompt_hash`` the meta holds 'prompt_hash' and caller meta, but no 'prompt' key."""
    with log_ai_call('x', 'y', 'z', meta={'temperature': 0.3}) as ctx:
        ctx.set_prompt_hash("super secret user prompt 123")
        assert 'prompt_hash' in ctx.meta
        assert ctx.meta['temperature'] == 0.3
        # meta should contain no 'prompt' key (unless the caller adds one itself)
        assert 'prompt' not in ctx.meta
# ─────────────────────────────────────────────────────────────────────────────
# 雜項cost table 鍵值完整性
# ─────────────────────────────────────────────────────────────────────────────
def test_cost_table_contains_critical_models():
    """Every model on the critical list must have an entry in COST_TABLE."""
    critical_models = (
        'gemini-2.5-flash',
        'gemini-2.0-flash',
        'meta/llama-3.1-8b-instruct',
        'hermes3:latest',
        'qwen2.5-coder:7b',
        'llama3.1:8b',
        'bge-m3:latest',
    )
    for model_name in critical_models:
        assert model_name in COST_TABLE, f"COST_TABLE missing {model_name}"

View File

@@ -0,0 +1,526 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
tests/test_token_report_service.py
LLM Token 日報服務單元測試 (Operation Ollama-First v5.0 — Phase 1 收尾)
測試紀律:
- 不真連 DBmock _exec_query 返回固定資料
- 不真連 Telegrammock send_telegram_with_result
- 不真寫 ai_insightsmock _persist_to_ai_insights
- 7 個告警規則各自獨立觸發測試
- HTML escape 驗證caller 名含 < / & 不破版)
- 訊息字數 ≤ 4096 驗證
"""
from __future__ import annotations
import os
import sys
from datetime import date, datetime, timedelta, timezone
from typing import Any, Dict, List
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import services.token_report_service as svc
# ─────────────────────────────────────────────────────────────────────────────
# 共用 fixtures
# ─────────────────────────────────────────────────────────────────────────────
TARGET_DATE = date(2026, 5, 3)
def _make_summary(**overrides) -> Dict[str, Any]:
base = {
'total_tokens': 3_142_891,
'total_calls': 2_847,
'total_cost_usd': 0.36,
'avg_duration_ms': 1847.0,
'success_rate': 98.7,
'failed_calls': 37,
'ollama_pct': 64.3,
'prev_total_tokens': 2_905_000,
'wow_pct': 8.2,
}
base.update(overrides)
return base
def _make_by_provider(**overrides) -> List[Dict[str, Any]]:
"""7 個 provider 的預設配置,可用 overrides={'gemini': {'pct': 50}} 覆寫"""
defaults = {
'gcp_ollama': {'tokens': 2_021_000, 'pct': 64.3, 'calls': 2103, 'cost_usd': 0.0, 'avg_duration_ms': 1200},
'ollama_111': {'tokens': 12_000, 'pct': 0.4, 'calls': 18, 'cost_usd': 0.0, 'avg_duration_ms': 2400},
'gemini': {'tokens': 892_000, 'pct': 28.4, 'calls': 589, 'cost_usd': 0.31, 'avg_duration_ms': 2100},
'claude': {'tokens': 178_000, 'pct': 5.7, 'calls': 98, 'cost_usd': 0.04, 'avg_duration_ms': 3200},
'nim': {'tokens': 28_000, 'pct': 0.9, 'calls': 24, 'cost_usd': 0.0, 'avg_duration_ms': 1800},
'openrouter': {'tokens': 12_000, 'pct': 0.4, 'calls': 15, 'cost_usd': 0.01, 'avg_duration_ms': 2900},
'nim_via_elephant': {'tokens': 27_000, 'pct': 0.9, 'calls': 12, 'cost_usd': 0.0, 'avg_duration_ms': 3100},
}
for k, v in (overrides or {}).items():
defaults.setdefault(k, {}).update(v)
return [{'provider': k, **v} for k, v in defaults.items()]
def _make_top_callers() -> List[Dict[str, Any]]:
return [
{'caller': 'km_embedding_worker', 'provider': 'gcp_ollama',
'model': 'bge-m3:latest', 'tokens': 892_000, 'calls': 1247, 'delta_pct': 5.0},
{'caller': 'hermes_analyst', 'provider': 'gcp_ollama',
'model': 'hermes3:latest', 'tokens': 482_000, 'calls': 72, 'delta_pct': -2.0},
{'caller': 'code_review_hermes', 'provider': 'claude',
'model': 'claude-opus-4-7', 'tokens': 158_000, 'calls': 8, 'delta_pct': 42.0},
]
def _make_trends() -> Dict[str, Any]:
return {
'today_total_tokens': 3_142_000,
'today_gemini_tokens': 892_000,
'today_ollama_tokens': 2_033_000,
'today_claude_tokens': 178_000,
'today_avg_duration': 1847.0,
'today_error_rate': 1.3,
'today_gcp_hit_pct': 99.6,
'7d_avg_total': 2_905_000,
'7d_avg_gemini': 948_000,
'7d_avg_ollama': 1_712_000,
'7d_avg_claude': 165_000,
'7d_avg_duration': 1920.0,
'7d_error_rate': 1.8,
'7d_total_tokens': 18_832_000,
'7d_total_cost': 11.84,
'7d_gcp_hit_pct_7d': 98.9,
'7d_gcp_hit_pct': 98.9,
}
def _make_budgets(**overrides) -> Dict[str, Any]:
base = {
'daily_spent': 0.36,
'weekly_spent': 1.92,
'monthly_spent': 5.84,
'daily_budget': 1.00,
'weekly_budget': 5.00,
'monthly_budget': 20.00,
}
base.update(overrides)
return base
def _make_cache_stats(**overrides) -> Dict[str, Any]:
base = {
'claude': {'total': 98, 'hits': 62, 'pct': 63.3},
'gemini': {'total': 0, 'hits': 0, 'pct': 0.0},
}
base.update(overrides)
return base
# ─────────────────────────────────────────────────────────────────────────────
# 1. 報表組裝測試 — generate_daily_report 路徑
# ─────────────────────────────────────────────────────────────────────────────
class TestReportFormat:
    """Checks on ``svc._format_report`` / ``svc._format_failure_report`` output."""

    @staticmethod
    def _render(**overrides):
        """Call ``svc._format_report`` with healthy defaults, replacing any given keyword."""
        kwargs = dict(
            target_date=TARGET_DATE,
            summary=_make_summary(),
            by_provider=_make_by_provider(),
            top_callers=_make_top_callers(),
            costs=[],
            trends=_make_trends(),
            budgets=_make_budgets(),
            cache_stats=_make_cache_stats(),
            alerts=[],
            insights=[],
        )
        kwargs.update(overrides)
        return svc._format_report(**kwargs)

    def test_format_report_contains_all_six_sections(self):
        """All six section headings must appear in the report."""
        out = self._render(
            costs=[{'provider': 'gemini', 'model': 'gemini-2.5-flash', 'cost_usd': 0.26, 'calls': 50}],
            insights=[{'icon': '', 'text': 'Ollama-First 達標'}],
        )
        headings = (
            '【1】今日總覽',
            '【2】供應商分布',
            '【3】呼叫點 TOP',
            '【4】成本分析',
            '【5】趨勢與洞察',
            '【6】告警與建議',
        )
        for heading in headings:
            assert heading in out

    def test_format_report_under_telegram_limit(self):
        """A full report (10 callers, 12 cost rows, 5 alerts) stays under 6000 raw characters."""
        many_callers = _make_top_callers() * 4  # 12 rows
        many_costs = [
            {'provider': 'p', 'model': f'model-{i}', 'cost_usd': 0.01, 'calls': 1}
            for i in range(12)
        ]
        many_alerts = [
            {'level': 'P1', 'icon': '🔴', 'title': 'X' * 80, 'suggestion': 'Y' * 80}
            for _ in range(5)
        ]
        out = self._render(
            top_callers=many_callers[:10],
            costs=many_costs,
            alerts=many_alerts,
        )
        # Original note: send_daily_report truncates to 4000 characters
        # (HTML-safe); this test only confirms the raw length is manageable.
        assert len(out) < 6000, f"原始報表 {len(out)} 字元,可能需縮減欄位寬度"

    def test_format_report_html_escape_caller_name(self):
        """A caller name containing ``<script>`` must be escaped, never emitted raw."""
        hostile = [{
            'caller': 'evil<script>',
            'provider': 'gcp_ollama',
            'model': 'a&b<c>',
            'tokens': 100,
            'calls': 1,
            'delta_pct': None,
        }]
        out = self._render(top_callers=hostile)
        assert '<script>' not in out, "caller 含 <script> 必須被 escape"
        assert '&lt;script&gt;' in out
        assert '&amp;' in out

    def test_failure_report_html_safe(self):
        """The fallback message keeps its heading and escapes HTML in the error text."""
        out = svc._format_failure_report(TARGET_DATE, 'DB error: <a href="x">x</a>')
        assert '日報生成失敗' in out
        assert '&lt;a href' in out  # "<" has been escaped
# ─────────────────────────────────────────────────────────────────────────────
# 2. 告警規則測試 — _detect_alerts 7 條規則
# ─────────────────────────────────────────────────────────────────────────────
class TestAlertRules:
    """One test per alert rule in ``svc._detect_alerts``, plus a boundary and a healthy-state check.

    Thresholds quoted in the docstrings come from the original test notes;
    the actual values live in ``svc._ALERT_RULES``.
    """

    @staticmethod
    def _alerts_for(summary=None, by_provider=None, top_callers=None,
                    trends=None, budgets=None, cache_stats=None):
        """Call ``svc._detect_alerts``, filling each omitted argument with its healthy fixture."""
        return svc._detect_alerts(
            _make_summary() if summary is None else summary,
            _make_by_provider() if by_provider is None else by_provider,
            [] if top_callers is None else top_callers,
            _make_trends() if trends is None else trends,
            _make_budgets() if budgets is None else budgets,
            _make_cache_stats() if cache_stats is None else cache_stats,
        )

    def test_rule1_caller_token_spike(self):
        """R1: a single caller spiking by +40% or more raises a P2 alert."""
        spiking = [{'caller': 'code_review_hermes', 'provider': 'claude',
                    'model': 'claude-opus-4-7', 'tokens': 158_000,
                    'calls': 8, 'delta_pct': 42.0}]
        fired = self._alerts_for(top_callers=spiking)
        assert any('暴增' in a['title'] and a['level'] == 'P2' for a in fired), \
            f"R1 未觸發alerts={fired}"

    def test_rule2_gemini_share_too_high(self):
        """R2: a Gemini share above the threshold (35% per the original note) raises an alert."""
        providers = _make_by_provider(gemini={'pct': 50.0})
        fired = self._alerts_for(by_provider=providers)
        assert any('Gemini 占比' in a['title'] for a in fired), \
            f"R2 未觸發alerts={fired}"

    def test_rule3_error_rate_critical(self):
        """R3: a global failure rate above the threshold (5% per the original note) raises a P1 alert."""
        failing = _make_summary(failed_calls=300, total_calls=2000)  # 15%
        fired = self._alerts_for(summary=failing)
        matches = [a for a in fired if a['level'] == 'P1' and '失敗率' in a['title']]
        assert matches, f"R3 未觸發alerts={fired}"

    def test_rule4_budget_overrun(self):
        """R4: monthly cost past the budget warning level (80% per the original note) raises a P1 alert."""
        overspent = _make_budgets(monthly_spent=18.0, monthly_budget=20.0)  # 90%
        fired = self._alerts_for(budgets=overspent)
        assert any('月成本' in a['title'] and a['level'] == 'P1' for a in fired), \
            f"R4 未觸發alerts={fired}"

    def test_rule5_gcp_hit_low(self):
        """R5: a low GCP Ollama hit rate (below 90% per the original note) alerts when Ollama traffic exists."""
        degraded = {**_make_trends(), 'today_gcp_hit_pct': 70.0}
        fired = self._alerts_for(trends=degraded)
        assert any('GCP Ollama 命中率' in a['title'] for a in fired), \
            f"R5 未觸發alerts={fired}"

    def test_rule6_claude_cache_low(self):
        """R6: a low Claude cache hit rate (below 40% per the original note) alerts once there are >= 10 samples."""
        cold_cache = _make_cache_stats(claude={'total': 100, 'hits': 20, 'pct': 20.0})
        fired = self._alerts_for(cache_stats=cold_cache)
        assert any('Claude prompt cache' in a['title'] for a in fired), \
            f"R6 未觸發alerts={fired}"

    def test_rule6_claude_cache_low_skipped_when_few_calls(self):
        """R6 boundary: with fewer than 10 samples no cache alert is raised."""
        tiny_sample = _make_cache_stats(claude={'total': 5, 'hits': 0, 'pct': 0.0})
        fired = self._alerts_for(cache_stats=tiny_sample)
        cache_alerts = [a for a in fired if 'Claude prompt cache' in a['title']]
        assert not cache_alerts, "樣本不足時不應告警"

    def test_no_alerts_when_healthy(self):
        """Healthy fixtures must produce no P1/P2 alerts."""
        calm_callers = _make_top_callers()[:2]  # leaves out the +42% spike row
        fired = self._alerts_for(top_callers=calm_callers)
        critical = [a for a in fired if a['level'] in ('P1', 'P2')]
        assert not critical, f"健康狀態不應有 P1/P2 告警;得到:{critical}"
# ─────────────────────────────────────────────────────────────────────────────
# 3. 智能建議測試 — _generate_insights
# ─────────────────────────────────────────────────────────────────────────────
class TestInsights:
    """Checks on the rule-based suggestions from ``svc._generate_insights``."""

    @staticmethod
    def _texts(summary, by_provider):
        """Return just the 'text' of every insight generated for the given inputs."""
        insights = svc._generate_insights(TARGET_DATE, summary, by_provider)
        return [entry['text'] for entry in insights]

    def test_ollama_first_target_met(self):
        """An Ollama share at or above the target yields a "target met" insight."""
        texts = self._texts(_make_summary(ollama_pct=64.3), _make_by_provider())
        assert any('達標' in text for text in texts)

    def test_ollama_first_target_missed(self):
        """An Ollama share below the target yields a "target missed" insight."""
        texts = self._texts(_make_summary(ollama_pct=45.0), _make_by_provider())
        assert any('未達' in text for text in texts)

    def test_nim_low_usage_suggestion(self):
        """NIM usage below 100K tokens yields the NIM usage suggestion."""
        providers = _make_by_provider(
            nim={'tokens': 5000},
            nim_via_elephant={'tokens': 5000},
        )
        texts = self._texts(_make_summary(), providers)
        assert any('NIM 用量' in text for text in texts)
# ─────────────────────────────────────────────────────────────────────────────
# 4. SQL 查詢測試 — mock _exec_query 驗證 SQL 結構正確
# ─────────────────────────────────────────────────────────────────────────────
class TestQueriesViaMock:
    """Drive the query helpers with a stubbed ``_exec_query`` and inspect calls and results."""

    def test_query_summary_calls_two_windows(self, monkeypatch):
        """``_query_summary`` should issue two queries (today, then the previous day)."""
        today_row = {'total_tokens': 100_000, 'total_calls': 50,
                     'total_cost_usd': 0.5, 'avg_duration_ms': 1500,
                     'ok_calls': 49, 'ollama_tokens': 70_000}
        previous_row = {'prev_total_tokens': 90_000}
        captured: List[Dict] = []

        def fake_exec(sql, params):
            first_line = sql.strip().split('\n')[0]
            captured.append({'sql_head': first_line, 'params': dict(params)})
            # The aggregate query is recognised by COUNT(*); any other
            # query is answered with the previous-window row.
            if 'COUNT(*)' not in sql:
                return [dict(previous_row)]
            return [dict(today_row)]

        def roughly(value):
            # Same 1% relative tolerance for every derived percentage.
            return pytest.approx(value, rel=0.01)

        monkeypatch.setattr(svc, '_exec_query', fake_exec)
        result = svc._query_summary(TARGET_DATE)

        assert len(captured) == 2
        first_call, second_call = captured[0], captured[1]
        # The second window must end exactly where the first one starts.
        assert second_call['params']['end'] == first_call['params']['start']
        assert result['total_tokens'] == 100_000
        assert result['ollama_pct'] == roughly(70.0)
        assert result['success_rate'] == roughly(98.0)
        assert result['failed_calls'] == 1
        assert result['wow_pct'] == roughly(11.11)

    def test_query_by_provider_returns_all_eight_providers(self, monkeypatch):
        """Eight provider rows are expected even when only one provider has data.

        Providers without data should appear as zero placeholders. Per the
        original note (critic-A11 B4), adding ollama_secondary raised the
        expected count from 7 to 8.
        """
        def fake_exec(sql, params):
            only_row = {'provider': 'gcp_ollama', 'tokens': 1000, 'calls': 5,
                        'cost_usd': 0.0, 'avg_duration_ms': 1000}
            return [only_row]

        monkeypatch.setattr(svc, '_exec_query', fake_exec)
        result = svc._query_by_provider(TARGET_DATE)

        def row_for(name):
            # First row whose provider matches; StopIteration if absent.
            return next(filter(lambda row: row['provider'] == name, result))

        assert len(result) == 8
        assert row_for('gcp_ollama')['tokens'] == 1000
        # Providers with no data must be reported with 0 tokens.
        assert row_for('ollama_secondary')['tokens'] == 0
        assert row_for('gemini')['tokens'] == 0

    def test_query_top_callers_orders_by_tokens(self, monkeypatch):
        """Rows keep the stubbed order and ``delta_pct`` is derived from the 7-day average."""
        stub_rows = [
            {'caller': 'a', 'provider': 'gcp_ollama', 'top_model': 'm1',
             'tokens': 500, 'calls': 5, 'avg_tokens_7d': 400},
            {'caller': 'b', 'provider': 'gemini', 'top_model': 'm2',
             'tokens': 200, 'calls': 2, 'avg_tokens_7d': 0},
        ]

        def fake_exec(sql, params):
            return [dict(row) for row in stub_rows]

        monkeypatch.setattr(svc, '_exec_query', fake_exec)
        result = svc._query_top_callers(TARGET_DATE, limit=10)

        assert len(result) == 2
        leader, runner_up = result[0], result[1]
        assert leader['caller'] == 'a'
        # (500 - 400) / 400 = 25%
        assert leader['delta_pct'] == pytest.approx(25.0, rel=0.01)
        # A zero baseline must give None rather than dividing by zero.
        assert runner_up['delta_pct'] is None

    def test_query_cost_breakdown_filters_zero_cost(self, monkeypatch):
        """The cost-breakdown SQL must filter on ``cost_usd > 0``.

        Per the original note, zero-cost models such as Ollama should not
        appear in the breakdown.
        """
        seen_sql = []

        def fake_exec(sql, params):
            seen_sql.append(sql)
            return []

        monkeypatch.setattr(svc, '_exec_query', fake_exec)
        svc._query_cost_breakdown(TARGET_DATE)

        first_statement = seen_sql[0]
        assert 'cost_usd > 0' in first_statement
# ─────────────────────────────────────────────────────────────────────────────
# 5. send_daily_report 整合 — mock 整條鏈
# ─────────────────────────────────────────────────────────────────────────────
class TestSendDailyReport:
    """End-to-end checks of ``send_daily_report`` / ``generate_daily_report`` with collaborators stubbed."""

    @staticmethod
    def _patch_sender(monkeypatch, sender):
        """Replace ``telegram_templates.send_telegram_with_result`` with *sender*."""
        import services.telegram_templates as tg
        monkeypatch.setattr(tg, 'send_telegram_with_result', sender)

    def test_send_happy_path(self, monkeypatch):
        """Generate, send and persist must each be invoked on the happy path."""
        def fake_generate(d):
            return '<b>OK</b>'

        sent_calls = []

        def fake_send(text, **kwargs):
            sent_calls.append({'text': text, 'kwargs': kwargs})
            return {'ok': True, 'sent': 1, 'failed': 0,
                    'chat_ids': [-1], 'errors': []}

        persist_calls = []

        def fake_persist(d, c, r):
            persist_calls.append((d, c, r))

        monkeypatch.setattr(svc, 'generate_daily_report', fake_generate)
        self._patch_sender(monkeypatch, fake_send)
        monkeypatch.setattr(svc, '_persist_to_ai_insights', fake_persist)

        result = svc.send_daily_report(TARGET_DATE)

        assert result['ok'] is True
        assert result['sent'] == 1
        assert len(sent_calls) == 1
        only_send = sent_calls[0]
        assert only_send['kwargs'].get('parse_mode') == 'HTML'
        assert len(persist_calls) == 1
        assert persist_calls[0][0] == TARGET_DATE

    def test_send_truncates_oversized_message(self, monkeypatch):
        """A 5000-character report must be cut to the Telegram limit with a truncation marker."""
        big = 'X' * 5000

        def fake_generate(d):
            return big

        captured_text = []

        def fake_send(text, **kwargs):
            captured_text.append(text)
            return {'ok': True, 'sent': 1, 'failed': 0,
                    'chat_ids': [], 'errors': []}

        def skip_persist(*a, **k):
            return None

        monkeypatch.setattr(svc, 'generate_daily_report', fake_generate)
        self._patch_sender(monkeypatch, fake_send)
        monkeypatch.setattr(svc, '_persist_to_ai_insights', skip_persist)

        svc.send_daily_report(TARGET_DATE)

        assert len(captured_text) == 1
        delivered = captured_text[0]
        assert len(delivered) <= svc._TELEGRAM_MAX_CHARS
        assert '截斷' in delivered

    def test_send_resilient_to_telegram_failure(self, monkeypatch):
        """A Telegram exception must be reported in the result dict, not raised."""
        def fake_generate(d):
            return 'msg'

        def boom(text, **kwargs):
            raise RuntimeError("network down")

        def skip_persist(*a, **k):
            return None

        monkeypatch.setattr(svc, 'generate_daily_report', fake_generate)
        self._patch_sender(monkeypatch, boom)
        monkeypatch.setattr(svc, '_persist_to_ai_insights', skip_persist)

        result = svc.send_daily_report(TARGET_DATE)

        assert result['ok'] is False
        assert any(map(lambda err: 'telegram' in err, result['errors']))

    def test_generate_returns_failure_msg_when_db_dies(self, monkeypatch):
        """On a DB error ``generate_daily_report`` must return a fallback string instead of raising."""
        def boom(*a, **kw):
            raise RuntimeError("DB connection refused")

        monkeypatch.setattr(svc, '_query_summary', boom)

        out = svc.generate_daily_report(TARGET_DATE)

        assert '日報生成失敗' in out
        # Per the original note, the fallback wraps the escaped error text
        # in <code>.
        assert '<code>' in out
# ─────────────────────────────────────────────────────────────────────────────
# 6. telegram_templates.daily_token_report 包裝測試
# ─────────────────────────────────────────────────────────────────────────────
class TestTelegramTemplate:
    """Checks on the ``telegram_templates.daily_token_report`` wrapper."""

    def test_daily_token_report_appends_footer(self):
        """Both the body and the footer URL must appear in the rendered text."""
        from services.telegram_templates import daily_token_report

        rendered = daily_token_report("body", footer_url="http://x/y")
        for fragment in ('body', 'http://x/y'):
            assert fragment in rendered

    def test_daily_token_report_truncates_to_4096(self):
        """A 5000-character body must be cut to at most 4096 characters and marked as truncated."""
        from services.telegram_templates import daily_token_report

        rendered = daily_token_report('A' * 5000)
        assert len(rendered) <= 4096
        assert '截斷' in rendered

    def test_daily_token_report_escapes_footer_url(self):
        """Special characters in ``footer_url`` must not reach the output unescaped."""
        from services.telegram_templates import daily_token_report

        rendered = daily_token_report("body",
                                      footer_url="http://x/?a=1&b=<2>")
        # The raw angle-bracket fragment must be gone ...
        assert '<2>' not in rendered
        # ... and at least one escaped entity must be present.
        escaped_forms = ('&amp;', '&lt;2&gt;')
        assert any(form in rendered for form in escaped_forms)
# ─────────────────────────────────────────────────────────────────────────────
# 7. 格式化工具測試
# ─────────────────────────────────────────────────────────────────────────────
class TestFormatHelpers:
    """Checks on the small formatting helpers of the report service."""

    def test_fmt_kb(self):
        """``_fmt_kb`` renders raw counts, rounded K values and one-decimal M values."""
        assert svc._fmt_kb(0) == '0'
        assert svc._fmt_kb(500) == '500'
        assert svc._fmt_kb(1500) == '2K'  # 1.5K is rounded to 2K
        assert svc._fmt_kb(2_021_000) == '2.0M'

    def test_esc_handles_none(self):
        """``_esc`` maps None to an empty string and HTML-escapes ``<``, ``>`` and ``&``."""
        assert svc._esc(None) == ''
        assert svc._esc('<a>') == '&lt;a&gt;'
        assert svc._esc('a&b') == 'a&amp;b'

    def test_budget_line_zero_budget(self):
        """A zero budget must be reported as '未設定預算' (budget not set)."""
        line = svc._budget_line("📅 本日", 0.5, 0.0)
        assert '未設定預算' in line

    def test_trend_line_handles_zero_baseline(self):
        """With a zero baseline the trend line must show the em-dash placeholder."""
        line = svc._trend_line("X", 100.0, 0.0)
        # An empty-string membership check is always true and would verify
        # nothing; assert on the actual placeholder character instead.
        assert '—' in line