Files
ewoooc/services/ai_call_logger.py
OoO c329d96dff
All checks were successful
CD Pipeline / deploy (push) Successful in 1m10s
限制 111 fallback context 大小
2026-05-21 12:44:33 +08:00

451 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/ai_call_logger.py
統一 LLM 呼叫遙測層 (Operation Ollama-First v5.0 — Phase 1)
依據:
- docs/phase0_audit_report_20260503.md (34 個 LLM 呼叫點 / 11.8% 覆蓋率)
- docs/phase1_db_design_20260503.md (ai_calls schema)
- migrations/024_create_ai_calls_table.sql
設計原則 (憲法級):
1. 非阻塞: DB 寫入跑 daemon thread，主流程不等
2. 失敗安全: DB 例外只 log warning，絕不影響 LLM 主流程
3. PII 保護: meta 不存原始 prompt，只存 prompt_hash[:12]
4. Kill-switch: AI_CALL_LOGGING_ENABLED=false 一鍵關閉
5. 連續失敗 ≥ 10 次自動降級為純 logger.info
主入口:
- log_ai_call(...) context manager (推薦)
- logged_ai_call(...) decorator (簡單一行 LLM call)
"""
from __future__ import annotations
import hashlib
import logging
import os
import threading
import time
from contextlib import contextmanager
from functools import wraps
from typing import Any, Callable, Dict, Optional
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 成本表 (USD per 1M tokens)
# 依據 phase0 audit + 各 provider 官方定價Ollama 全部 0
# ─────────────────────────────────────────────────────────────────────────────
COST_TABLE: Dict[str, Dict[str, float]] = {
    # Maps model name -> {'in': USD per 1M input tokens, 'out': USD per 1M output tokens}.
    # _calc_cost looks models up by exact name; models missing here are costed at 0.
    # Gemini
    # NOTE(review): the gemini-2.5-flash and gemini-2.0-flash rates below are identical to
    # the gemini-1.5-flash rates — confirm against Google's current published pricing.
    'gemini-2.5-flash': {'in': 0.075, 'out': 0.30},
    'gemini-2.5-pro': {'in': 1.25, 'out': 10.0},
    'gemini-2.0-flash': {'in': 0.075, 'out': 0.30},
    'gemini-1.5-flash': {'in': 0.075, 'out': 0.30},
    # NVIDIA NIM (quota-based; the free tier is all 0)
    'meta/llama-3.1-8b-instruct': {'in': 0.0, 'out': 0.0},
    'meta/llama-3.3-70b-instruct': {'in': 0.0, 'out': 0.0},
    'nvidia/llama-3.3-nemotron-super-49b-v1.5': {'in': 0.0, 'out': 0.0},
    'deepseek-ai/deepseek-v3.2': {'in': 0.0, 'out': 0.0},
    # Claude (Anthropic) — 2026-05 market prices, USD per 1M tokens
    'claude-opus-4-7': {'in': 15.0, 'out': 75.0},  # #1 for code, Arena Elo 1548 (per original note)
    'claude-sonnet-4-6': {'in': 3.0, 'out': 15.0},  # balanced for agentic use
    'claude-haiku-4-5': {'in': 0.8, 'out': 4.0},  # lightweight and fast
    # Self-hosted Ollama (all 0; models added as Phase 8/13 pulls them onto GCP)
    'hermes3:latest': {'in': 0.0, 'out': 0.0},
    'qwen2.5-coder:7b': {'in': 0.0, 'out': 0.0},
    'qwen2.5-coder:32b': {'in': 0.0, 'out': 0.0},  # 2026-05-04: pulled on Primary+Secondary (19GB); AiderHeal 32B
    'qwen2.5:7b-instruct': {'in': 0.0, 'out': 0.0},  # Phase 3 A7: OpenClaw Q&A default
    'qwen3:14b': {'in': 0.0, 'out': 0.0},  # Phase 3 A9: Nemotron + A7 upgrade
    'qwen2-vl:7b': {'in': 0.0, 'out': 0.0},  # Phase 13 PPT vision (not in the Ollama registry yet; minicpm-v used instead)
    'deepseek-r1:14b': {'in': 0.0, 'out': 0.0},  # 2026-05-04: pulled (9GB); reasoning-chain backup
    'gemma3:4b': {'in': 0.0, 'out': 0.0},  # 2026-05-04: pulled (3.3GB); lightweight tasks
    'minicpm-v:latest': {'in': 0.0, 'out': 0.0},  # Phase 14 PPT vision (5.5GB)
    'llava:latest': {'in': 0.0, 'out': 0.0},  # 2026-05-04: pulled (4.7GB); vision backup
    'llama3.1:8b': {'in': 0.0, 'out': 0.0},
    'llama3.2:latest': {'in': 0.0, 'out': 0.0},  # final fallback on host 111 (3B)
    'bge-m3:latest': {'in': 0.0, 'out': 0.0},
}
# ─────────────────────────────────────────────────────────────────────────────
# 環境開關 + Kill-switch
# ─────────────────────────────────────────────────────────────────────────────
def _is_logging_enabled() -> bool:
    """Tell whether AI-call telemetry is switched on.

    AI_CALL_LOGGING_ENABLED is re-read on every call so it can be toggled at
    runtime. Only the values false/0/no/off (case- and whitespace-insensitive)
    disable logging; anything else — including an unset variable — enables it.
    """
    raw_value = os.environ.get('AI_CALL_LOGGING_ENABLED', 'true')
    normalized = raw_value.strip().lower()
    return normalized not in {'false', '0', 'no', 'off'}
# Consecutive-failure threshold; once reached, DB writes stop and calls are only logged.
_MAX_CONSECUTIVE_FAILURES = 10
# Guards _failure_state, which the writer threads update.
_failure_counter_lock = threading.Lock()
# 'count': current run of consecutive write failures; 'killed': True once the threshold tripped.
_failure_state = {'count': 0, 'killed': False}


def _record_failure() -> None:
    """Count one failed DB write; trip the kill-switch (logged once) at the threshold."""
    with _failure_counter_lock:
        state = _failure_state
        state['count'] += 1
        if state['killed'] or state['count'] < _MAX_CONSECUTIVE_FAILURES:
            return
        state['killed'] = True
        logger.error(
            "[AICallLogger] consecutive write failures hit %d — kill-switch ON, "
            "downgrading to logger.info only",
            _MAX_CONSECUTIVE_FAILURES,
        )


def _record_success() -> None:
    """Break the failure streak; an already-tripped kill-switch stays tripped."""
    with _failure_counter_lock:
        if not _failure_state['count'] > 0:
            return
        _failure_state['count'] = 0


def _is_killed() -> bool:
    """Report whether the kill-switch has tripped."""
    with _failure_counter_lock:
        killed = _failure_state['killed']
    return killed


def _reset_kill_switch() -> None:
    """Test helper: clear both the failure streak and the kill-switch."""
    with _failure_counter_lock:
        _failure_state.update(count=0, killed=False)
# ─────────────────────────────────────────────────────────────────────────────
# 內部狀態容器
# ─────────────────────────────────────────────────────────────────────────────
class _CallState:
    """Telemetry record for a single LLM call.

    log_ai_call() creates one and yields it to the caller, which fills it in through
    the set_* / add_meta helpers below. The length caps applied by the setters
    (provider 32, model 128, fallback_to 64, status 16, error 2000 characters)
    presumably mirror the ai_calls column sizes — TODO confirm against the migration.
    """

    __slots__ = (
        'caller', 'provider', 'model', 'request_id',
        'input_tokens', 'output_tokens',
        'duration_ms', 'status', 'fallback_to',
        'cost_usd', 'cache_hit', 'rag_hit',
        'error', 'meta',
    )

    def __init__(self, caller: str, provider: str, model: str,
                 request_id: Optional[str], meta: Dict[str, Any]):
        self._validate_caller(caller)
        self.caller = caller
        self.provider = provider
        self.model = model
        self.request_id = request_id
        self.input_tokens = 0
        self.output_tokens = 0
        # Filled in by log_ai_call when the with-block exits.
        self.duration_ms: Optional[int] = None
        # None until set; log_ai_call turns None into 'ok' when the body finishes cleanly.
        self.status: Optional[str] = None
        self.fallback_to: Optional[str] = None
        # NOTE(review): not read by _write_to_db, which recomputes cost via _calc_cost.
        self.cost_usd = 0.0
        self.cache_hit = False
        self.rag_hit = False
        self.error: Optional[str] = None
        # Copy so later add_meta() calls never mutate the caller's dict.
        self.meta: Dict[str, Any] = dict(meta) if meta else {}

    @staticmethod
    def _validate_caller(caller: str) -> None:
        """Check *caller* against the LLM caller registry without ever raising.

        Phase 16 (2026-05-04): caller_registry validation (critic-A11 L4 fix). It runs
        non-strict, so unknown callers are only warned about and new callers can be
        added (with an ADR) without breaking anything. Every failure is swallowed
        because this runs before log_ai_call() enters its try block: an exception
        here would abort the caller's LLM request, which telemetry must never do.
        """
        try:
            from services.llm_caller_registry import assert_known_caller
            assert_known_caller(caller, strict=False)
        except ImportError:
            logger.warning(
                "[AICallLogger] caller registry import failed; skipping caller validation (caller=%s)",
                caller,
                exc_info=True,
            )
        except Exception:
            logger.warning(
                "[AICallLogger] caller registry validation raised; skipping caller validation (caller=%s)",
                caller,
                exc_info=True,
            )

    # ── Caller-facing API ────────────────────────────────────────────
    @staticmethod
    def _as_count(value: Any) -> int:
        """Convert a token count to int; None, '' and unconvertible values become 0."""
        try:
            return int(value or 0)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(float('inf')) must not escape into the caller's flow.
            return 0

    def set_tokens(self, input: int = 0, output: int = 0) -> None:
        """Record prompt (*input*) and completion (*output*) token counts.

        The parameter names are part of the public keyword API (callers pass
        input=/output=), hence the builtin shadowing.
        """
        self.input_tokens = self._as_count(input)
        self.output_tokens = self._as_count(output)

    def set_provider(self, provider: str) -> None:
        """Override the provider once the real one is known (e.g. after an Ollama host retry).

        Empty values are ignored; the value is truncated to 32 characters.
        """
        if provider:
            self.provider = provider[:32]

    def set_model(self, model: str) -> None:
        """Override the model once the real one is known (e.g. after a host-aware downgrade).

        Empty values are ignored; the value is truncated to 128 characters.
        """
        if model:
            self.model = model[:128]

    def set_cache_hit(self, hit: bool = True) -> None:
        """Flag whether the response came from a cache."""
        self.cache_hit = bool(hit)

    def set_rag_hit(self, hit: bool = True) -> None:
        """Flag whether RAG retrieval contributed to the call."""
        self.rag_hit = bool(hit)

    def fallback_to_caller(self, target_caller: str) -> None:
        """Mark that the primary path failed and *target_caller* took over.

        Sets status 'fallback'; the downstream caller writes its own ok/error record.
        """
        self.fallback_to = (target_caller or '')[:64]
        self.status = 'fallback'

    # Alias matching the name used in the design spec.
    fallback_to_target = fallback_to_caller

    def set_error(self, msg: str) -> None:
        """Record an error message (truncated to 2000 characters) and set status 'error'."""
        self.error = (msg or '')[:2000]
        self.status = 'error'

    def set_status(self, status: str) -> None:
        """Set an explicit status string (truncated to 16 characters)."""
        self.status = (status or '')[:16]

    def set_prompt_hash(self, prompt: Any) -> None:
        """Store the first 12 hex chars of SHA-256(prompt) in meta['prompt_hash'].

        Only the hash is kept, never the raw prompt (PII protection). bytes are
        hashed as-is; other non-str prompts (e.g. chat-message lists) are hashed via
        str() instead of raising, so this helper cannot crash the caller's LLM path.
        Falsy prompts are ignored.
        """
        if not prompt:
            return
        if isinstance(prompt, (bytes, bytearray)):
            data = bytes(prompt)
        else:
            text = prompt if isinstance(prompt, str) else str(prompt)
            # 'replace' keeps lone surrogates from raising UnicodeEncodeError.
            data = text.encode('utf-8', errors='replace')
        self.meta['prompt_hash'] = hashlib.sha256(data).hexdigest()[:12]

    def add_meta(self, key: str, value: Any) -> None:
        """Attach an arbitrary meta entry; empty keys are ignored."""
        if key:
            self.meta[key] = value
# ─────────────────────────────────────────────────────────────────────────────
# 主入口 1: context manager
# ─────────────────────────────────────────────────────────────────────────────
@contextmanager
def log_ai_call(
    caller: str,
    provider: str,
    model: str,
    request_id: Optional[str] = None,
    meta: Optional[Dict[str, Any]] = None,
):
    """Record one LLM call in ai_calls around a with-block.

    Usage:
        with log_ai_call('hermes_analyst', 'gcp_ollama', 'hermes3:latest') as ctx:
            response = ollama.generate(...)
            ctx.set_tokens(input=response['prompt_eval_count'],
                           output=response['eval_count'])
            ctx.set_cache_hit(False)
            # on failure: ctx.set_error('timeout') / ctx.fallback_to_caller('111_ollama')

    Yields:
        The mutable _CallState record for the caller to fill in.

    Contract:
        - Never alters the caller's control flow. Exceptions raised in the body are
          recorded (status 'error') and re-raised unchanged. The DB write is handed
          to _async_write() (fire-and-forget), and a failure to dispatch it is only
          logged.
        - With AI_CALL_LOGGING_ENABLED=false the record is still yielded (same API);
          _async_write() then skips the write.
        - If the body completes without setting a status, the status becomes 'ok'.
    """
    state = _CallState(caller, provider, model, request_id, meta or {})
    start = time.monotonic()
    try:
        yield state
        if state.status is None:
            state.status = 'ok'
    except BaseException as e:
        # BaseException, not Exception: otherwise KeyboardInterrupt / SystemExit leave
        # status None, which _write_to_db stores as 'ok'. We always re-raise below.
        state.status = 'error'
        if not state.error:
            state.error = f"{type(e).__name__}: {str(e)[:1500]}"
        raise
    finally:
        state.duration_ms = int((time.monotonic() - start) * 1000)
        try:
            _async_write(state)
        except Exception as exc:  # pragma: no cover — writer thread failed to start
            logger.warning("[AICallLogger] async dispatch failed: %s", exc)
# ─────────────────────────────────────────────────────────────────────────────
# 主入口 2: decorator
# ─────────────────────────────────────────────────────────────────────────────
def logged_ai_call(
    caller: str,
    provider: str,
    model: Optional[str] = None,
    model_extractor: Optional[Callable[[tuple, dict], str]] = None,
):
    """Decorator that runs each call of the decorated function inside log_ai_call().

    Usage:
        @logged_ai_call(caller='sales_copy', provider='gcp_ollama',
                        model_extractor=lambda a, kw: kw.get('model', 'llama3.1:8b'))
        def generate_copy(...):
            return ollama.generate(model='llama3.1:8b', ...)

    Args:
        caller: value for ai_calls.caller (a whitelisted caller name).
        provider: value for ai_calls.provider.
        model: static model name. It is used when model_extractor is absent, raises,
            or returns an empty value; 'unknown' is used if nothing is available.
        model_extractor: callable(args, kwargs) -> model name, evaluated on every
            call and preferred over *model*.

    Notes:
        - Token counts are filled in best-effort from the return value. Use the
          log_ai_call context manager when exact counts are required.
        - Exceptions from the wrapped function propagate unchanged; log_ai_call
          records them with status 'error'.
    """
    fallback_model = model or 'unknown'

    def _resolve_model(args: tuple, kwargs: dict) -> str:
        # A non-str model name would make _calc_cost raise inside _write_to_db,
        # where the failure counts toward the global kill-switch — so stringify it.
        if model_extractor is None:
            return fallback_model
        try:
            extracted = model_extractor(args, kwargs)
        except Exception:
            logger.debug(
                "[AICallLogger] model_extractor failed; using %s", fallback_model,
                exc_info=True,
            )
            return fallback_model
        return str(extracted) if extracted else fallback_model

    def deco(fn: Callable):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            with log_ai_call(caller, provider, _resolve_model(args, kwargs)) as ctx:
                result = fn(*args, **kwargs)
                # Best-effort token extraction; must never affect the wrapped call.
                try:
                    _auto_extract_tokens(ctx, result)
                except Exception:
                    logger.debug("[AICallLogger] auto token extraction failed", exc_info=True)
                return result
        return wrapper
    return deco
def _auto_extract_tokens(ctx: '_CallState', result: Any) -> None:
    """Best-effort: copy token counts from a common LLM response shape into *ctx*.

    Works on plain dicts and on SDK response objects that expose the same names as
    attributes. Recognized shapes, checked in this order:
      - ``usage`` with prompt_tokens/completion_tokens (OpenAI-style, NIM) or
        input_tokens/output_tokens (Anthropic-style);
      - ``usage_metadata`` with prompt_token_count/candidates_token_count (Gemini SDK);
      - top-level prompt_eval_count/eval_count (Ollama).
    Any other result leaves *ctx* untouched. Exceptions raised while reading the
    result are not caught here; logged_ai_call wraps this call in try/except.
    """
    if result is None:
        return

    def _field(obj: Any, name: str) -> Any:
        # dict responses carry keys; SDK response objects carry attributes.
        if isinstance(obj, dict):
            return obj.get(name)
        return getattr(obj, name, None)

    def _first(obj: Any, *names: str) -> Any:
        # First truthy value among *names*, else 0 (same as `a or b or 0`).
        for name in names:
            value = _field(obj, name)
            if value:
                return value
        return 0

    usage = _field(result, 'usage')
    if usage:
        ctx.set_tokens(
            input=_first(usage, 'prompt_tokens', 'input_tokens'),
            output=_first(usage, 'completion_tokens', 'output_tokens'),
        )
        return

    usage_metadata = _field(result, 'usage_metadata')
    if usage_metadata:
        ctx.set_tokens(
            input=_first(usage_metadata, 'prompt_token_count'),
            output=_first(usage_metadata, 'candidates_token_count'),
        )
        return

    # Ollama keeps the counts at the top level. For dicts, key presence decides;
    # for objects, a non-None attribute does.
    if isinstance(result, dict):
        has_counts = 'eval_count' in result or 'prompt_eval_count' in result
    else:
        has_counts = (
            getattr(result, 'eval_count', None) is not None
            or getattr(result, 'prompt_eval_count', None) is not None
        )
    if has_counts:
        ctx.set_tokens(
            input=_first(result, 'prompt_eval_count'),
            output=_first(result, 'eval_count'),
        )
# ─────────────────────────────────────────────────────────────────────────────
# 異步寫入 (fire-and-forget)
# ─────────────────────────────────────────────────────────────────────────────
def _async_write(state: '_CallState') -> None:
    """Dispatch *state* to a daemon thread running _write_to_db, so the caller never waits.

    - If AI_CALL_LOGGING_ENABLED is off, this returns without doing anything.
    - If the kill-switch has tripped, the record is emitted via logger.info only and
      the DB is not touched.
    """
    if not _is_logging_enabled():
        return
    if not _is_killed():
        worker = threading.Thread(
            target=_write_to_db,
            args=(state,),
            name=f"ai-call-log-{state.caller}",
            daemon=True,
        )
        worker.start()
        return
    # Degraded mode: one log line instead of a DB row.
    logger.info(
        "[AICall|killed] caller=%s provider=%s model=%s status=%s "
        "tokens=%s/%s duration=%sms",
        state.caller, state.provider, state.model, state.status,
        state.input_tokens, state.output_tokens, state.duration_ms,
    )
def _write_to_db(state: '_CallState') -> None:
    """Insert one ai_calls row built from *state*; called from the writer thread.

    Never raises. Any failure (missing driver, session error, SQL error) is counted
    via _record_failure() and logged as a warning. A committed insert calls
    _record_success(), which resets the consecutive-failure streak.
    """
    try:
        from sqlalchemy import text
        from database.manager import get_session

        cost = _calc_cost(state.model, state.input_tokens, state.output_tokens)
        meta_json = _safe_meta_json(state.meta)

        session = get_session()
        try:
            statement = text("""
                INSERT INTO ai_calls (
                    caller, provider, model,
                    input_tokens, output_tokens, duration_ms,
                    status, fallback_to, cost_usd,
                    cache_hit, rag_hit, request_id,
                    error, meta
                ) VALUES (
                    :caller, :provider, :model,
                    :input_tokens, :output_tokens, :duration_ms,
                    :status, :fallback_to, :cost_usd,
                    :cache_hit, :rag_hit, :request_id,
                    :error, CAST(:meta AS JSONB)
                )
            """)
            row = {
                'caller': state.caller[:64] if state.caller else 'unknown',
                'provider': (state.provider or 'unknown')[:32],
                'model': (state.model or 'unknown')[:128],
                'input_tokens': int(state.input_tokens or 0),
                'output_tokens': int(state.output_tokens or 0),
                'duration_ms': state.duration_ms,
                'status': (state.status or 'ok')[:16],
                'fallback_to': state.fallback_to,
                'cost_usd': cost,
                'cache_hit': bool(state.cache_hit),
                'rag_hit': bool(state.rag_hit),
                'request_id': state.request_id,
                'error': state.error,
                'meta': meta_json,
            }
            session.execute(statement, row)
            session.commit()
            _record_success()
        except Exception:
            # Undo the partial transaction; the outer handler counts and logs it.
            session.rollback()
            raise
        finally:
            session.close()
    except Exception as exc:
        _record_failure()
        logger.warning(
            "[AICallLogger] write failed (caller=%s provider=%s): %s",
            state.caller, state.provider, exc,
        )
def _calc_cost(model: str, in_tokens: int, out_tokens: int) -> float:
    """Return the USD cost of one call using COST_TABLE (rates are per 1M tokens).

    Models missing from the table cost 0.0. NIM-style free-tier prefixes
    ('meta/', 'nvidia/', 'deepseek-') return 0.0 silently; any other unknown model
    logs a warning first. The result is capped at 9999.999999 and rounded to 6
    decimals so it fits the NUMERIC(10,6) column.
    """
    if not model:
        return 0.0
    rate = COST_TABLE.get(model)
    if rate is None:
        # NIM is quota-based on the free tier: these prefixes are treated as 0 without a warning.
        if not model.startswith(('meta/', 'nvidia/', 'deepseek-')):
            logger.warning("[AICallLogger] unknown model cost: %s, default 0", model)
        return 0.0
    input_count = max(0, int(in_tokens or 0))
    output_count = max(0, int(out_tokens or 0))
    total = (input_count * rate['in'] + output_count * rate['out']) / 1_000_000
    if total < 0:
        return 0.0
    return round(min(total, 9999.999999), 6)
def _safe_meta_json(meta: Dict[str, Any]) -> str:
    """Serialize *meta* to a JSON string for the ai_calls.meta JSONB column.

    - Empty or None meta returns '{}'.
    - Values json cannot encode natively are stringified (default=str).
    - NaN / Infinity floats are written as the strings 'nan' / 'inf' / '-inf'.
      Python's json would otherwise emit bare NaN/Infinity tokens, which are not
      valid JSON and are rejected by the CAST(... AS JSONB) in the INSERT. That
      fails the whole row and counts toward the kill-switch.
    - Any other serialization failure logs a warning and returns '{}'.
    """
    import json
    import math

    if not meta:
        return '{}'

    def _finite_only(obj: Any, depth: int = 0) -> Any:
        # Replace non-finite floats with their str() form. Containers nested deeper
        # than 32 levels are left as-is; this also stops runaway recursion on
        # self-referencing structures, which json then rejects.
        if isinstance(obj, float) and not math.isfinite(obj):
            return str(obj)
        if depth >= 32:
            return obj
        if isinstance(obj, dict):
            return {key: _finite_only(value, depth + 1) for key, value in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [_finite_only(value, depth + 1) for value in obj]
        return obj

    try:
        return json.dumps(meta, ensure_ascii=False, default=str, allow_nan=False)
    except ValueError:
        # Most likely a NaN/Infinity value; retry below with those values stringified.
        pass
    except Exception as e:
        logger.warning("[AICallLogger] meta json dump failed: %s", e)
        return '{}'
    try:
        return json.dumps(_finite_only(meta), ensure_ascii=False, default=str, allow_nan=False)
    except Exception as e:
        logger.warning("[AICallLogger] meta json dump failed: %s", e)
        return '{}'