Files
awoooi/apps/api/src/services/langfuse_client.py
2026-03-26 09:37:09 +08:00

332 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Langfuse LLMOps Client - Phase 15.1 + 15.3
==========================================
LLM 呼叫追蹤、成本監控、Prompt 版本管理
Phase 15.1 (2026-03-26): 基礎整合
Phase 15.3 (2026-03-26): Deep Linking (Langfuse ↔ SignOz)
端點: http://192.168.0.110:3100 (DevOps 金庫)
Features:
- 自動追蹤所有 LLM 呼叫 (Ollama/Gemini/Claude)
- 成本估算與監控
- Prompt 版本管理
- **Phase 15.3**: OTEL Trace 整合 (SignOz Deep Link)
Usage:
from src.services.langfuse_client import get_langfuse, langfuse_trace_async
# 方法 1: Context Manager (自動整合 OTEL trace_id)
async with langfuse_trace_async("openclaw_decision") as trace:
result = await call_llm(prompt)
trace.generation(
name="ollama_call",
model="qwen2.5:7b-instruct",
input=prompt,
output=result,
)
# trace.metadata 自動包含 signoz_trace_url
# 方法 2: 裝飾器
@langfuse_observe(name="analyze_incident")
async def analyze_incident(incident_id: str):
...
"""
from collections.abc import Callable
from contextlib import asynccontextmanager
from functools import wraps
from typing import Any
import structlog
from src.core.config import settings
logger = structlog.get_logger(__name__)
# Module-wide Langfuse client. Starts as None and is assigned by
# get_langfuse() the first time a client is constructed.
_langfuse_client = None
def get_langfuse():
    """Return the shared Langfuse client, constructing it on first use.

    Returns:
        The module-wide Langfuse client, or ``None`` when Langfuse is
        disabled in settings, when either API key is missing, or when
        constructing the client raised an exception.
    """
    global _langfuse_client

    if not settings.LANGFUSE_ENABLED:
        return None

    has_both_keys = settings.LANGFUSE_PUBLIC_KEY and settings.LANGFUSE_SECRET_KEY
    if not has_both_keys:
        logger.warning(
            "langfuse_not_configured",
            message="Langfuse enabled but keys not set",
        )
        return None

    # A client built by an earlier call is reused as-is.
    if _langfuse_client is not None:
        return _langfuse_client

    try:
        # Imported inside the try so that a missing langfuse package is
        # logged and reported as None instead of propagating.
        from langfuse import Langfuse

        _langfuse_client = Langfuse(
            public_key=settings.LANGFUSE_PUBLIC_KEY,
            secret_key=settings.LANGFUSE_SECRET_KEY,
            host=settings.LANGFUSE_URL,
        )
        logger.info(
            "langfuse_initialized",
            host=settings.LANGFUSE_URL,
        )
    except Exception as exc:
        logger.error(
            "langfuse_init_failed",
            error=str(exc),
        )
        return None

    return _langfuse_client
class LangfuseTraceContext:
"""
Langfuse Trace Context for tracking LLM calls
Phase 15.3: 自動整合 OTEL trace_id實現 Langfuse ↔ SignOz Deep Linking
"""
def __init__(self, name: str, metadata: dict[str, Any] | None = None):
self.name = name
self.metadata = metadata or {}
self.trace = None
self._client = get_langfuse()
self._otel_trace_id: str | None = None
self._langfuse_trace_id: str | None = None
def __enter__(self):
if self._client:
try:
# Phase 15.3: 取得當前 OTEL trace_id 並注入 metadata
from src.core.deep_linking import DeepLinking
from src.core.telemetry import get_current_trace_id
self._otel_trace_id = get_current_trace_id()
# 建立含 SignOz Deep Link 的 metadata
enriched_metadata = {**self.metadata}
if self._otel_trace_id:
enriched_metadata["otel_trace_id"] = self._otel_trace_id
enriched_metadata["signoz_trace_url"] = DeepLinking.signoz_trace_url(
self._otel_trace_id
)
self.trace = self._client.trace(
name=self.name,
metadata=enriched_metadata,
)
# 記錄 Langfuse trace_id 供反向連結
if self.trace:
self._langfuse_trace_id = self.trace.id
logger.debug(
"langfuse_trace_started",
name=self.name,
otel_trace_id=self._otel_trace_id,
langfuse_trace_id=self._langfuse_trace_id,
)
except Exception as e:
logger.warning("langfuse_trace_start_failed", error=str(e))
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# Langfuse auto-flushes, no explicit close needed
pass
@property
def otel_trace_id(self) -> str | None:
"""取得關聯的 OTEL trace_id"""
return self._otel_trace_id
@property
def langfuse_trace_id(self) -> str | None:
"""取得 Langfuse trace_id"""
return self._langfuse_trace_id
def generation(
self,
name: str,
model: str,
input: str | dict[str, Any],
output: str | dict[str, Any] | None = None,
usage: dict[str, int] | None = None,
metadata: dict[str, Any] | None = None,
):
"""
記錄一次 LLM generation
Args:
name: Generation 名稱 (e.g., "ollama_call", "gemini_fallback")
model: 模型名稱 (e.g., "qwen2.5:7b-instruct", "gemini-1.5-flash")
input: 輸入 prompt
output: 輸出結果
usage: Token 使用量 {"input": x, "output": y}
metadata: 額外 metadata
"""
if not self.trace:
return None
try:
gen = self.trace.generation(
name=name,
model=model,
input=input,
output=output,
usage=usage,
metadata=metadata or {},
)
return gen
except Exception as e:
logger.warning(
"langfuse_generation_failed",
error=str(e),
name=name,
model=model,
)
return None
def span(self, name: str, metadata: dict[str, Any] | None = None):
"""
記錄一個 span (非 LLM 操作)
Args:
name: Span 名稱
metadata: 額外 metadata
"""
if not self.trace:
return None
try:
return self.trace.span(name=name, metadata=metadata or {})
except Exception as e:
logger.warning("langfuse_span_failed", error=str(e), name=name)
return None
def score(
self,
name: str,
value: float,
comment: str | None = None,
):
"""
記錄評分 (用於 Prompt 品質追蹤)
Args:
name: 評分名稱 (e.g., "response_quality", "format_compliance")
value: 分數 (0.0 - 1.0)
comment: 評論
"""
if not self.trace:
return
try:
self.trace.score(
name=name,
value=value,
comment=comment,
)
except Exception as e:
logger.warning(
"langfuse_score_failed",
error=str(e),
name=name,
)
def langfuse_trace(name: str, metadata: dict[str, Any] | None = None):
    """Create a trace context for use in a ``with`` block.

    Args:
        name: Name given to the Langfuse trace.
        metadata: Optional metadata attached to the trace.

    Returns:
        A new ``LangfuseTraceContext`` built from the two arguments.

    Usage:
        with langfuse_trace("openclaw_decision") as trace:
            result = await call_llm(prompt)
            trace.generation(name="ollama", model="qwen2.5:7b-instruct", ...)
    """
    context = LangfuseTraceContext(name=name, metadata=metadata)
    return context
@asynccontextmanager
async def langfuse_trace_async(name: str, metadata: dict[str, Any] | None = None):
    """Async counterpart of ``langfuse_trace`` for use with ``async with``.

    Args:
        name: Name given to the Langfuse trace.
        metadata: Optional metadata attached to the trace.

    Yields:
        The entered ``LangfuseTraceContext``.

    Usage:
        async with langfuse_trace_async("openclaw_decision") as trace:
            result = await call_llm(prompt)
    """
    # Drive the context with a real ``with`` statement so that, when the body
    # raises, __exit__ receives the actual exception details instead of a
    # hard-coded (None, None, None).
    with LangfuseTraceContext(name=name, metadata=metadata) as ctx:
        yield ctx
def langfuse_observe(
name: str | None = None,
metadata: dict[str, Any] | None = None,
):
"""
Langfuse 裝飾器 - 自動追蹤函數執行
Usage:
@langfuse_observe(name="analyze_incident")
async def analyze_incident(incident_id: str):
...
"""
def decorator(func: Callable):
trace_name = name or func.__name__
@wraps(func)
async def async_wrapper(*args, **kwargs):
async with langfuse_trace_async(trace_name, metadata) as trace:
# Inject trace into kwargs if function accepts it
if "langfuse_trace" in func.__code__.co_varnames:
kwargs["langfuse_trace"] = trace
return await func(*args, **kwargs)
@wraps(func)
def sync_wrapper(*args, **kwargs):
with langfuse_trace(trace_name, metadata) as trace:
if "langfuse_trace" in func.__code__.co_varnames:
kwargs["langfuse_trace"] = trace
return func(*args, **kwargs)
# Return appropriate wrapper based on function type
import asyncio
if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper
return decorator
def flush_langfuse():
    """Manually flush the Langfuse client.

    Per the original note this is normally unnecessary because the client
    flushes on its own; it is meant for tests or for making sure data has
    been sent. Does nothing when no client is available, and logs a warning
    instead of raising when the flush fails.
    """
    langfuse = get_langfuse()
    if not langfuse:
        return

    try:
        langfuse.flush()
        logger.debug("langfuse_flushed")
    except Exception as exc:
        logger.warning("langfuse_flush_failed", error=str(exc))