#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/ai_call_logger.py

Unified LLM call telemetry layer (Operation Ollama-First v5.0 — Phase 1)

Based on:
  - docs/phase0_audit_report_20260503.md  (34 LLM call sites / 11.8% coverage)
  - docs/phase1_db_design_20260503.md     (ai_calls schema)
  - migrations/024_create_ai_calls_table.sql

Design principles (constitution-level):
  1. Non-blocking: DB writes run in a daemon thread; the main flow never waits
  2. Fail-safe: DB exceptions only log a warning and never affect the LLM main flow
  3. PII protection: meta never stores the raw prompt, only prompt_hash[:12]
  4. Kill-switch: AI_CALL_LOGGING_ENABLED=false turns logging off in one step
  5. After >= 10 consecutive failures, degrade automatically to plain logger.info

Main entry points:
  - log_ai_call(...)       context manager (recommended)
  - logged_ai_call(...)    decorator (for a simple one-line LLM call)
"""

from __future__ import annotations

import hashlib
import json
import logging
import os
import threading
import time
from contextlib import contextmanager
from functools import wraps
from typing import Any, Callable, Dict, Optional

logger = logging.getLogger(__name__)

# ─────────────────────────────────────────────────────────────────────────────
# Cost table (USD per 1M tokens)
# Based on the phase0 audit + each provider's official pricing; Ollama is all 0
# ─────────────────────────────────────────────────────────────────────────────
COST_TABLE: Dict[str, Dict[str, float]] = {
    # Gemini
    'gemini-2.5-flash': {'in': 0.075, 'out': 0.30},
    'gemini-2.5-pro': {'in': 1.25, 'out': 10.0},
    'gemini-2.0-flash': {'in': 0.075, 'out': 0.30},
    'gemini-1.5-flash': {'in': 0.075, 'out': 0.30},
    # NVIDIA NIM (quota based; free tier is all 0)
    'meta/llama-3.1-8b-instruct': {'in': 0.0, 'out': 0.0},
    'meta/llama-3.3-70b-instruct': {'in': 0.0, 'out': 0.0},
    'nvidia/llama-3.3-nemotron-super-49b-v1.5': {'in': 0.0, 'out': 0.0},
    'deepseek-ai/deepseek-v3.2': {'in': 0.0, 'out': 0.0},
    # Claude (Anthropic) — 2026-05 market price (USD per 1M tokens)
    'claude-opus-4-7': {'in': 15.0, 'out': 75.0},  # ranked #1 for code, Arena Elo 1548
    'claude-sonnet-4-6': {'in': 3.0, 'out': 15.0},  # agentic balance
    'claude-haiku-4-5': {'in': 0.8, 'out': 4.0},  # lightweight and fast
    # Self-hosted Ollama (all 0; models pulled on GCP in Phase 8/13 are added over time)
    'hermes3:latest': {'in': 0.0, 'out': 0.0},
    'qwen2.5-coder:7b': {'in': 0.0, 'out': 0.0},
    'qwen2.5-coder:32b': {'in': 0.0, 'out': 0.0},  # 2026-05-04 pulled on Primary+Secondary (19GB), AiderHeal 32B
    'qwen2.5:7b-instruct': {'in': 0.0, 'out': 0.0},  # Phase 3 A7 OpenClaw Q&A default
    'qwen3:14b': {'in': 0.0, 'out': 0.0},  # Phase 3 A9 Nemotron + A7 upgrade
    'qwen2-vl:7b': {'in': 0.0, 'out': 0.0},  # Phase 13 PPT vision (not in the Ollama registry yet; minicpm-v used instead)
    'deepseek-r1:14b': {'in': 0.0, 'out': 0.0},  # 2026-05-04 pulled (9GB), reasoning-chain fallback
    'gemma3:4b': {'in': 0.0, 'out': 0.0},  # 2026-05-04 pulled (3.3GB), lightweight tasks
    'minicpm-v:latest': {'in': 0.0, 'out': 0.0},  # Phase 14 PPT vision (5.5GB)
    'llava:latest': {'in': 0.0, 'out': 0.0},  # 2026-05-04 pulled (4.7GB), vision fallback
    'llama3.1:8b': {'in': 0.0, 'out': 0.0},
    'llama3.2:latest': {'in': 0.0, 'out': 0.0},  # 111 final fallback 3B
    'bge-m3:latest': {'in': 0.0, 'out': 0.0},
}


# ─────────────────────────────────────────────────────────────────────────────
# Environment switch + kill-switch
# ─────────────────────────────────────────────────────────────────────────────
def _is_logging_enabled() -> bool:
    """Read AI_CALL_LOGGING_ENABLED on every call so it can be toggled at runtime."""
    val = os.environ.get('AI_CALL_LOGGING_ENABLED', 'true').strip().lower()
    return val not in ('false', '0', 'no', 'off')


# Consecutive-failure threshold; once reached, degrade to plain logger.info and
# stop attempting DB writes.
_MAX_CONSECUTIVE_FAILURES = 10
_failure_counter_lock = threading.Lock()
_failure_state = {'count': 0, 'killed': False}

_AI_CALL_PROVIDER_WHITELIST = frozenset({
    'gcp_ollama',
    'ollama_secondary',
    'ollama_111',
    'ollama_other',
    'gemini',
    'claude',
    'nim',
    'openrouter',
    'nim_via_elephant',
})

# (prefixes, canonical provider) pairs tried in order for values that are not
# an exact whitelist member.
_PROVIDER_PREFIX_RULES = (
    (('gemini',), 'gemini'),
    (('claude', 'anthropic'), 'claude'),
    (('nim', 'nvidia', 'meta/'), 'nim'),
    (('openrouter',), 'openrouter'),
)


def _normalize_provider(provider: str) -> str:
    """Return an ai_calls.provider value that satisfies the DB CHECK.

    `ollama_other` is telemetry-only: it represents an approved Ollama route
    whose concrete host was unknown or failed before a successful host was
    selected. It must not be used as a routing target.

    Matching is case-insensitive, so 'Ollama_111' maps to 'ollama_111' instead
    of collapsing into 'ollama_other'. Non-string input is coerced with str()
    so that a bad provider value can never raise into the LLM main flow.
    Anything that is empty or matches no rule falls back to 'ollama_other'.
    """
    lowered = str(provider or '').strip()[:32].lower()
    if lowered in _AI_CALL_PROVIDER_WHITELIST:
        return lowered
    for prefixes, canonical in _PROVIDER_PREFIX_RULES:
        if lowered.startswith(prefixes):
            return canonical
    # Empty / 'unknown' / 'ollama*' / unrecognised values all share one bucket.
    return 'ollama_other'


def _record_failure() -> None:
    """Count one failed DB write and trip the kill-switch at the threshold."""
    with _failure_counter_lock:
        _failure_state['count'] += 1
        if _failure_state['count'] >= _MAX_CONSECUTIVE_FAILURES and not _failure_state['killed']:
            _failure_state['killed'] = True
            logger.error(
                "[AICallLogger] consecutive write failures hit %d — kill-switch ON, "
                "downgrading to logger.info only",
                _MAX_CONSECUTIVE_FAILURES,
            )


def _record_success() -> None:
    """Reset the consecutive-failure counter (the killed flag is left untouched)."""
    with _failure_counter_lock:
        if _failure_state['count'] > 0:
            _failure_state['count'] = 0


def _is_killed() -> bool:
    """Return True once the kill-switch has been tripped."""
    with _failure_counter_lock:
        return _failure_state['killed']


def _reset_kill_switch() -> None:
    """Test-only helper: reset the kill-switch state."""
    with _failure_counter_lock:
        _failure_state['count'] = 0
        _failure_state['killed'] = False


# ─────────────────────────────────────────────────────────────────────────────
# Internal state container
# ─────────────────────────────────────────────────────────────────────────────
class _CallState:
    """Telemetry state container for a single LLM call."""

    __slots__ = (
        'caller', 'provider', 'model', 'request_id',
        'input_tokens', 'output_tokens', 'duration_ms',
        'status', 'fallback_to', 'cost_usd',
        'cache_hit', 'rag_hit', 'error', 'meta',
    )

    def __init__(self, caller: str, provider: str, model: str,
                 request_id: Optional[str], meta: Dict[str, Any]):
        # Phase 16 (2026-05-04): caller_registry validation (critic-A11 L4 patch).
        # A caller missing from the registry does not raise (strict=False keeps
        # room for extension); per the original note it only logs a warning as
        # a reminder to add an ADR.
        try:
            from services.llm_caller_registry import assert_known_caller
            assert_known_caller(caller, strict=False)
        except ImportError:
            logger.warning(
                "[AICallLogger] caller registry import failed; skipping caller validation (caller=%s)",
                caller,
                exc_info=True,
            )
        self.caller = caller
        self.provider = _normalize_provider(provider)
        self.model = model
        self.request_id = request_id
        self.input_tokens = 0
        self.output_tokens = 0
        self.duration_ms: Optional[int] = None
        self.status: Optional[str] = None
        self.fallback_to: Optional[str] = None
        self.cost_usd = 0.0
        self.cache_hit = False
        self.rag_hit = False
        self.error: Optional[str] = None
        # Copy so later add_meta() calls never mutate the caller's dict.
        self.meta: Dict[str, Any] = dict(meta) if meta else {}

    # ── caller-facing API ────────────────────────────────────────────
    def set_tokens(self, input: int = 0, output: int = 0) -> None:
        """Set token counts; None or non-integer values are stored as 0."""
        try:
            self.input_tokens = int(input or 0)
        except (TypeError, ValueError):
            self.input_tokens = 0
        try:
            self.output_tokens = int(output or 0)
        except (TypeError, ValueError):
            self.output_tokens = 0

    def set_provider(self, provider: str) -> None:
        """Update the actual provider.

        Intended for callers that only learn where the request landed after
        retrying across the three Ollama hosts. Falsy values are ignored.
        """
        if provider:
            self.provider = _normalize_provider(provider)

    def set_model(self, model: str) -> None:
        """Update the actual model (truncated to 128 chars).

        Intended for callers that only learn the final model after a
        host-aware downgrade. Falsy values are ignored.
        """
        if model:
            self.model = model[:128]

    def set_cache_hit(self, hit: bool = True) -> None:
        """Record whether the call was served from cache."""
        self.cache_hit = bool(hit)

    def set_rag_hit(self, hit: bool = True) -> None:
        """Record whether the call had a RAG hit."""
        self.rag_hit = bool(hit)

    def fallback_to_caller(self, target_caller: str) -> None:
        """Mark the primary path as failed with a downstream caller taking over.

        The downstream caller writes its own ok/error record separately.
        """
        self.fallback_to = (target_caller or '')[:64]
        self.status = 'fallback'

    # Alias: matches the name used in the design spec.
    fallback_to_target = fallback_to_caller

    def set_error(self, msg: str) -> None:
        """Store an error message (truncated to 2000 chars) and mark status 'error'."""
        self.error = (msg or '')[:2000]
        self.status = 'error'

    def set_status(self, status: str) -> None:
        """Set the status explicitly (truncated to 16 chars)."""
        self.status = (status or '')[:16]

    def set_prompt_hash(self, prompt: Optional[str]) -> None:
        """Store only the first 12 hex chars of the prompt's SHA-256 in meta (PII protection)."""
        if prompt:
            digest = hashlib.sha256(prompt.encode('utf-8', errors='replace')).hexdigest()
            self.meta['prompt_hash'] = digest[:12]

    def add_meta(self, key: str, value: Any) -> None:
        """Attach an extra key/value to meta; falsy keys are ignored."""
        if key:
            self.meta[key] = value


# ─────────────────────────────────────────────────────────────────────────────
# Main entry point 1: context manager
# ─────────────────────────────────────────────────────────────────────────────
@contextmanager
def log_ai_call(
    caller: str,
    provider: str,
    model: str,
    request_id: Optional[str] = None,
    meta: Optional[Dict[str, Any]] = None,
):
    """Record telemetry for one LLM call made inside the ``with`` body.

    Example:
        with log_ai_call('hermes_analyst', 'gcp_ollama', 'hermes3:latest') as ctx:
            response = ollama.generate(...)
            ctx.set_tokens(input=response['prompt_eval_count'],
                           output=response['eval_count'])
            ctx.set_cache_hit(False)
            # on failure: ctx.set_error('timeout') / ctx.fallback_to_caller('111_ollama')

    Rules:
      - Never affects the main flow: exceptions are re-raised unchanged, and
        the telemetry write is fire-and-forget.
      - With AI_CALL_LOGGING_ENABLED=false the ctx is still yielded (same API),
        but the write is skipped.

    Args:
        caller: logical caller name stored in ai_calls.caller.
        provider: provider label; normalized to the DB whitelist.
        model: model name used for the call.
        request_id: optional correlation id stored with the record.
        meta: optional extra metadata (copied, never mutated).

    Yields:
        The _CallState for this call, used to report tokens, status, etc.
    """
    state = _CallState(caller, provider, model, request_id, meta or {})
    start = time.monotonic()
    try:
        yield state
        # No exception: default to 'ok' unless the caller set a status itself.
        if state.status is None:
            state.status = 'ok'
    except BaseException as e:
        # BaseException (not just Exception) so KeyboardInterrupt, SystemExit
        # and asyncio.CancelledError are recorded as 'error'; otherwise status
        # stays None and the aborted call is written to the DB as 'ok'.
        state.status = 'error'
        if not state.error:
            state.error = f"{type(e).__name__}: {str(e)[:1500]}"
        raise
    finally:
        state.duration_ms = int((time.monotonic() - start) * 1000)
        try:
            _async_write(state)
        except Exception as exc:  # pragma: no cover — writer thread failed to start
            logger.warning("[AICallLogger] async dispatch failed: %s", exc)


# ─────────────────────────────────────────────────────────────────────────────
# Main entry point 2: decorator
# ─────────────────────────────────────────────────────────────────────────────
def logged_ai_call(
    caller: str,
    provider: str,
    model: Optional[str] = None,
    model_extractor: Optional[Callable[[tuple, dict], str]] = None,
):
    """Decorator form of log_ai_call for functions that make one LLM call.

    Example:
        @logged_ai_call(caller='sales_copy', provider='gcp_ollama',
                        model_extractor=lambda a, kw: kw.get('model', 'llama3.1:8b'))
        def generate_copy(...):
            return ollama.generate(model='llama3.1:8b', ...)

    Args:
        caller: ai_calls.caller whitelist string.
        provider: ai_calls.provider.
        model: static model name (use either this or model_extractor).
        model_extractor: resolves the model name from (args, kwargs); takes
            priority over ``model``. If it raises, ``model`` (or 'unknown')
            is used instead.

    Notes:
      - Token counts are extracted from the return value on a best-effort
        basis only; for exact counts use the log_ai_call context manager.
      - Exceptions are re-raised and the status is marked 'error' automatically.
    """
    def deco(fn: Callable):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            static_model = model or 'unknown'
            if model_extractor is None:
                resolved_model = static_model
            else:
                try:
                    resolved_model = model_extractor(args, kwargs)
                except Exception:
                    # Telemetry must never break the wrapped call; fall back.
                    logger.debug(
                        "[AICallLogger] model_extractor failed; using static model",
                        exc_info=True,
                    )
                    resolved_model = static_model

            with log_ai_call(caller, provider, resolved_model) as ctx:
                result = fn(*args, **kwargs)
                # Best-effort token extraction; failure never affects the main flow.
                try:
                    _auto_extract_tokens(ctx, result)
                except Exception:
                    logger.debug("[AICallLogger] auto token extraction failed", exc_info=True)
                return result
        return wrapper
    return deco


def _auto_extract_tokens(ctx: _CallState, result: Any) -> None:
    """Best-effort extraction of token counts from common LLM response shapes.

    Only dict results are inspected. A truthy 'usage' mapping is read first
    (prompt_tokens/input_tokens and completion_tokens/output_tokens);
    otherwise the Ollama keys prompt_eval_count / eval_count are used.
    """
    if not isinstance(result, dict):
        return

    usage = result.get('usage') or {}
    if usage:
        ctx.set_tokens(
            input=usage.get('prompt_tokens') or usage.get('input_tokens') or 0,
            output=usage.get('completion_tokens') or usage.get('output_tokens') or 0,
        )
        return

    # Ollama: prompt_eval_count / eval_count
    if 'eval_count' in result or 'prompt_eval_count' in result:
        ctx.set_tokens(
            input=result.get('prompt_eval_count', 0),
            output=result.get('eval_count', 0),
        )


# ─────────────────────────────────────────────────────────────────────────────
# Asynchronous write (fire-and-forget)
# ─────────────────────────────────────────────────────────────────────────────
def _async_write(state: _CallState) -> None:
    """Hand the record to a daemon thread so the main flow is not blocked.

    With AI_CALL_LOGGING_ENABLED=false the record is skipped entirely.
    Once the kill-switch has tripped, the record is only emitted via
    logger.info and the DB is not touched.
    """
    if not _is_logging_enabled():
        return
    if _is_killed():
        # Degraded mode: log only, no DB access.
        logger.info(
            "[AICall|killed] caller=%s provider=%s model=%s status=%s "
            "tokens=%s/%s duration=%sms",
            state.caller, state.provider, state.model, state.status,
            state.input_tokens, state.output_tokens, state.duration_ms,
        )
        return
    threading.Thread(
        target=_write_to_db,
        args=(state,),
        name=f"ai-call-log-{state.caller}",
        daemon=True,
    ).start()


def _write_to_db(state: _CallState) -> None:
    """Insert one ai_calls row; every failure is caught and only logged.

    Project imports are done lazily inside the try block so that a missing
    dependency is handled like any other write failure. Each failure is
    counted toward the kill-switch; a successful commit resets the counter.
    """
    try:
        from sqlalchemy import text
        from database.manager import get_session

        cost = _calc_cost(state.model, state.input_tokens, state.output_tokens)
        meta_json = _safe_meta_json(state.meta)

        session = get_session()
        try:
            session.execute(
                text("""
                    INSERT INTO ai_calls (
                        caller, provider, model,
                        input_tokens, output_tokens, duration_ms,
                        status, fallback_to, cost_usd,
                        cache_hit, rag_hit, request_id,
                        error, meta
                    ) VALUES (
                        :caller, :provider, :model,
                        :input_tokens, :output_tokens, :duration_ms,
                        :status, :fallback_to, :cost_usd,
                        :cache_hit, :rag_hit, :request_id,
                        :error, CAST(:meta AS JSONB)
                    )
                """),
                {
                    'caller': state.caller[:64] if state.caller else 'unknown',
                    'provider': _normalize_provider(state.provider),
                    'model': (state.model or 'unknown')[:128],
                    'input_tokens': int(state.input_tokens or 0),
                    'output_tokens': int(state.output_tokens or 0),
                    'duration_ms': state.duration_ms,
                    'status': (state.status or 'ok')[:16],
                    'fallback_to': state.fallback_to,
                    'cost_usd': cost,
                    'cache_hit': bool(state.cache_hit),
                    'rag_hit': bool(state.rag_hit),
                    'request_id': state.request_id,
                    'error': state.error,
                    'meta': meta_json,
                },
            )
            session.commit()
            _record_success()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
    except Exception as e:
        _record_failure()
        logger.warning(
            "[AICallLogger] write failed (caller=%s provider=%s): %s",
            state.caller, state.provider, e,
        )


def _calc_cost(model: str, in_tokens: int, out_tokens: int) -> float:
    """Compute the USD cost of a call from COST_TABLE.

    Returns 0.0 for an empty model name. A model missing from the table also
    yields 0.0; a warning is logged unless the name starts with one of the
    NIM free-tier prefixes (meta/, nvidia/, deepseek-). Negative token counts
    are clamped to 0 and the result is capped at 9999.999999.
    """
    if not model:
        return 0.0
    rate = COST_TABLE.get(model)
    if rate is None:
        # NIM is quota based on the free tier: treat the common nvidia/*,
        # meta/* and deepseek-* names as zero cost without warning.
        if not model.startswith(('meta/', 'nvidia/', 'deepseek-')):
            logger.warning("[AICallLogger] unknown model cost: %s, default 0", model)
        return 0.0

    in_t = max(0, int(in_tokens or 0))
    out_t = max(0, int(out_tokens or 0))
    cost = (in_t * rate['in'] + out_t * rate['out']) / 1_000_000
    if cost < 0:
        return 0.0
    # NUMERIC(10,6) tops out at 9999.999999; clamp extreme cases to avoid overflow.
    return round(min(cost, 9999.999999), 6)


def _safe_meta_json(meta: Dict[str, Any]) -> str:
    """Serialize meta to a JSON string; return '{}' when empty or on failure.

    allow_nan=False is deliberate: by default json.dumps emits the bare
    tokens NaN / Infinity, which are not valid JSON and would make the
    CAST(:meta AS JSONB) in the insert fail. Such a payload is dropped to
    '{}' here so the rest of the record can still be written.
    """
    if not meta:
        return '{}'
    try:
        return json.dumps(meta, ensure_ascii=False, default=str, allow_nan=False)
    except Exception as e:
        logger.warning("[AICallLogger] meta json dump failed: %s", e)
        return '{}'