Files
awoooi/apps/api/src/services/signoz_client.py
OG T b00f318450 fix(api): correct OTEL gRPC endpoint format and SignOz query table
Root cause analysis:
1. OTEL gRPC endpoint had http:// prefix which is invalid for gRPC
2. SignOz query was targeting wrong table (signoz_metrics.distributed_samples_v4)
3. Should query signoz_traces.distributed_signoz_index_v2 for trace data

Fixes:
- Remove http:// prefix from OTEL_EXPORTER_OTLP_ENDPOINT (gRPC needs host:port)
- Update SignOz client to query traces table instead of metrics table
- Fix timestamp format (nanoseconds for DateTime64(9))
- statusCode: 0=Unset, 1=Ok, 2=Error

This should enable OTEL traces to reach SigNoz and GlobalPulse to show real metrics.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-23 00:41:51 +08:00

448 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
SignOz Client - 全能視力中心 (戰略校正版)
==========================================
統帥鐵律: 嚴禁 Prometheus 碎片化，SignOz 為唯一真相來源
Features:
- ClickHouse 直查 (繞過需認證的 SignOz API)
- Gold Metrics 擷取 (P99 Latency, Error Rate, RPS)
- 動態時間範圍 Trace URL 生成
- 趨勢圖表數據提取 (供 AI 分析)
架構:
- SignOz Query Service: 192.168.0.188:3301 (需認證)
- ClickHouse HTTP API: 192.168.0.188:8123 (直查)
"""
import json
import math
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from urllib.parse import urlencode

import structlog

from src.core.config import settings
from src.core.http_client import get_clickhouse_client
# Module-level logger obtained from structlog, named after this module.
logger = structlog.get_logger(__name__)
# =============================================================================
# SignOz Data Models
# =============================================================================
@dataclass
class GoldMetrics:
    """
    Gold Metrics following the RED method (Rate, Errors, Duration).

    SRE golden signals captured here:
    - RPS (requests per second): traffic volume
    - Error Rate: failed requests as a percentage
    - P99 Latency: 99th-percentile latency in milliseconds
    """

    service_name: str
    namespace: str
    time_range_start: datetime
    time_range_end: datetime
    # --- Rate ---
    rps: float = 0.0
    rps_trend: str = "stable"  # one of: up, down, stable
    # --- Errors ---
    error_rate: float = 0.0  # percentage (0-100)
    error_count: int = 0
    total_requests: int = 0
    # --- Duration ---
    p50_latency_ms: float = 0.0
    p95_latency_ms: float = 0.0
    p99_latency_ms: float = 0.0
    latency_trend: str = "stable"
    # --- Raw data kept for AI analysis ---
    raw_metrics: dict = field(default_factory=dict)

    @staticmethod
    def _trend_icon(trend: str) -> str:
        """Emoji for a trend label; any label other than up/down renders as the flat arrow."""
        return {"up": "📈", "down": "📉"}.get(trend, "➡️")

    def _error_icon(self) -> str:
        """Traffic-light emoji for the error rate: green below 1%, yellow below 5%, red otherwise."""
        if self.error_rate < 1:
            return "🟢"
        if self.error_rate < 5:
            return "🟡"
        return "🔴"

    def to_summary(self) -> str:
        """Render a plain-text, multi-line Gold Metrics summary for AI analysis."""
        lines = [
            f"📊 Gold Metrics ({self.service_name})",
            f"• RPS: {self.rps:.1f} {self._trend_icon(self.rps_trend)}",
            f"• Error Rate: {self._error_icon()} {self.error_rate:.2f}%",
            f"• P99 Latency: {self.p99_latency_ms:.0f}ms {self._trend_icon(self.latency_trend)}",
        ]
        return "\n".join(lines)

    def to_telegram_block(self) -> str:
        """Render the metrics as an HTML fragment for a Telegram alert card."""
        lines = [
            "📊 <b>SignOz 指標</b>",
            f"├ RPS: <code>{self.rps:.1f}</code> {self._trend_icon(self.rps_trend)}",
            f"├ Error: {self._error_icon()} <code>{self.error_rate:.2f}%</code>",
            f"└ P99: <code>{self.p99_latency_ms:.0f}ms</code> {self._trend_icon(self.latency_trend)}",
        ]
        return "\n".join(lines)
@dataclass
class SignOzTraceLink:
    """Dynamic SignOz trace link for one service over a time range."""

    base_url: str
    service_name: str
    start_time: datetime
    end_time: datetime
    namespace: str = "default"  # carried on the link but not used when building the URL

    def generate_url(self) -> str:
        """
        Build a trace URL that carries the time range as query parameters.

        Format: http://host:port/traces?service=xxx&start=<unix ns>&end=<unix ns>

        The query string is URL-encoded, so service names containing spaces,
        "&", "#" and similar characters cannot break or inject parameters.
        A trailing "/" on base_url is dropped so the path never becomes "//traces".
        Naive datetimes are interpreted as local time by ``datetime.timestamp()``.
        """
        start_ns = int(self.start_time.timestamp() * 1_000_000_000)
        end_ns = int(self.end_time.timestamp() * 1_000_000_000)
        query = urlencode({"service": self.service_name, "start": start_ns, "end": end_ns})
        return f"{self.base_url.rstrip('/')}/traces?{query}"
# =============================================================================
# SignOz Client
# =============================================================================
class SignOzClient:
"""
SignOz Client - 直查 ClickHouse (永久架構版)
統帥鐵律: 禁止 subprocess+curl使用 Lifespan 管理的 httpx.AsyncClient
使用 ClickHouse HTTP API 繞過需認證的 SignOz Query Service
"""
def __init__(self):
self.signoz_url = settings.SIGNOZ_URL # http://192.168.0.188:3301
self.clickhouse_url = settings.CLICKHOUSE_URL # http://192.168.0.188:8123
async def close(self) -> None:
"""關閉連線 (由 Lifespan 統一管理,此處為相容性保留)"""
pass # HTTP Client 由 src.core.http_client 管理
# =========================================================================
# ClickHouse Direct Queries (永久架構)
# =========================================================================
async def _query_clickhouse(self, query: str) -> list[dict]:
"""
執行 ClickHouse 查詢 (原生 httpx非 curl)
統帥鐵律:
- 使用 Lifespan 管理的 httpx.AsyncClient
- trust_env=False 防止 HTTP_PROXY 干擾
- < 50ms 延遲目標
ClickHouse HTTP API: POST body = SQL, 加 FORMAT JSONEachRow 到查詢末尾
"""
# 加入 FORMAT JSONEachRow 到查詢末尾
formatted_query = query.strip().rstrip(";") + " FORMAT JSONEachRow"
start_time = time.perf_counter()
try:
# 取得 Lifespan 管理的 Client
client = await get_clickhouse_client()
logger.debug(
"clickhouse_query_start",
base_url=self.clickhouse_url,
query_preview=formatted_query[:80],
)
# 原生 httpx POST 請求
response = await client.post(
"/", # base_url 已設定,只需 path
content=formatted_query,
)
elapsed_ms = (time.perf_counter() - start_time) * 1000
# 檢查 HTTP 狀態
if response.status_code != 200:
logger.warning(
"clickhouse_query_http_error",
status_code=response.status_code,
response_text=response.text[:200],
elapsed_ms=round(elapsed_ms, 2),
)
return []
# 解析 JSONEachRow 格式 (每行一個 JSON 物件)
results = []
for line in response.text.strip().split("\n"):
if line:
try:
results.append(json.loads(line))
except json.JSONDecodeError:
continue
logger.info(
"clickhouse_query_success",
result_count=len(results),
elapsed_ms=round(elapsed_ms, 2),
method="httpx_native", # 🎯 統帥要求: 原生 httpx非 curl
)
return results
except Exception as e:
elapsed_ms = (time.perf_counter() - start_time) * 1000
logger.warning(
"clickhouse_query_failed",
error=str(e),
error_type=type(e).__name__,
query=query[:100],
elapsed_ms=round(elapsed_ms, 2),
)
return []
# =========================================================================
# Gold Metrics Extraction
# =========================================================================
async def get_gold_metrics(
self,
service_name: str,
namespace: str = "default",
time_window_minutes: int = 10,
) -> GoldMetrics:
"""
從 SignOz/ClickHouse 擷取 Gold Metrics
查詢過去 N 分鐘的:
- signoz_calls_total: RPS + Error Count
- signoz_latency.bucket: P50/P95/P99 延遲
Args:
service_name: 服務名稱 (如 api-gateway, harbor-core)
namespace: K8s namespace
time_window_minutes: 時間窗口 (分鐘)
Returns:
GoldMetrics: 黃金指標數據
"""
now = datetime.now(timezone.utc)
start_time = now - timedelta(minutes=time_window_minutes)
end_time = now
# 初始化 metrics
metrics = GoldMetrics(
service_name=service_name,
namespace=namespace,
time_range_start=start_time,
time_range_end=end_time,
)
# =====================================================================
# Query 1: RPS & Error Rate (從 traces 表直接計算)
# =====================================================================
# 使用 signoz_traces.distributed_signoz_index_v2 表
# statusCode: 0=Unset, 1=Ok, 2=Error
# 計算 Unix 納秒時間戳 (ClickHouse DateTime64(9) 格式)
start_ns = int(start_time.timestamp() * 1_000_000_000)
end_ns = int(end_time.timestamp() * 1_000_000_000)
rps_query = f"""
SELECT
count() as total_requests,
countIf(statusCode = 2) as error_count
FROM signoz_traces.distributed_signoz_index_v2
WHERE
timestamp BETWEEN toDateTime64({start_ns}, 9) AND toDateTime64({end_ns}, 9)
AND serviceName LIKE '%{service_name}%'
"""
rps_results = await self._query_clickhouse(rps_query)
if rps_results:
row = rps_results[0]
total = int(row.get("total_requests", 0))
errors = int(row.get("error_count", 0))
metrics.total_requests = total
metrics.error_count = errors
metrics.error_rate = (errors / total * 100) if total > 0 else 0.0
metrics.rps = total / (time_window_minutes * 60)
# =====================================================================
# Query 2: Latency Percentiles (從 traces 表的 durationNano)
# =====================================================================
latency_query = f"""
SELECT
quantile(0.50)(durationNano / 1000000.0) as p50,
quantile(0.95)(durationNano / 1000000.0) as p95,
quantile(0.99)(durationNano / 1000000.0) as p99
FROM signoz_traces.distributed_signoz_index_v2
WHERE
timestamp BETWEEN toDateTime64({start_ns}, 9) AND toDateTime64({end_ns}, 9)
AND serviceName LIKE '%{service_name}%'
"""
latency_results = await self._query_clickhouse(latency_query)
if latency_results:
row = latency_results[0]
metrics.p50_latency_ms = float(row.get("p50", 0) or 0)
metrics.p95_latency_ms = float(row.get("p95", 0) or 0)
metrics.p99_latency_ms = float(row.get("p99", 0) or 0)
# =====================================================================
# Query 3: Trend Analysis (對比前一時間窗)
# =====================================================================
prev_start_ns = int((start_time - timedelta(minutes=time_window_minutes)).timestamp() * 1_000_000_000)
prev_end_ns = start_ns
trend_query = f"""
SELECT count() as prev_requests
FROM signoz_traces.distributed_signoz_index_v2
WHERE
timestamp BETWEEN toDateTime64({prev_start_ns}, 9) AND toDateTime64({prev_end_ns}, 9)
AND serviceName LIKE '%{service_name}%'
"""
trend_results = await self._query_clickhouse(trend_query)
if trend_results:
prev_total = int(trend_results[0].get("prev_requests", 0))
if prev_total > 0:
change_pct = (metrics.total_requests - prev_total) / prev_total * 100
if change_pct > 10:
metrics.rps_trend = "up"
elif change_pct < -10:
metrics.rps_trend = "down"
else:
metrics.rps_trend = "stable"
logger.info(
"signoz_gold_metrics_fetched",
service=service_name,
rps=metrics.rps,
error_rate=metrics.error_rate,
p99_latency=metrics.p99_latency_ms,
)
return metrics
# =========================================================================
# Trace URL Generation
# =========================================================================
def generate_trace_url(
self,
service_name: str,
alert_timestamp: datetime | None = None,
window_minutes: int = 5,
) -> str:
"""
生成動態時間範圍的 SignOz Trace URL
告警發生時間 ± window_minutes
Args:
service_name: 服務名稱
alert_timestamp: 告警發生時間 (預設為現在)
window_minutes: 前後時間窗口 (分鐘)
Returns:
str: SignOz Trace URL with timestamps
"""
if alert_timestamp is None:
alert_timestamp = datetime.now(timezone.utc)
link = SignOzTraceLink(
base_url=self.signoz_url,
service_name=service_name,
start_time=alert_timestamp - timedelta(minutes=window_minutes),
end_time=alert_timestamp + timedelta(minutes=window_minutes),
)
return link.generate_url()
# =========================================================================
# System Metrics (CPU, Memory, Disk)
# =========================================================================
async def get_system_metrics(
self,
_host: str = "192.168.0.188", # Reserved for future host filtering
time_window_minutes: int = 5,
) -> dict:
"""
擷取系統指標 (system.cpu.time, system.disk.io)
用於 High CPU / Disk Full 告警分析
"""
now = datetime.now(timezone.utc)
start_ms = int((now - timedelta(minutes=time_window_minutes)).timestamp() * 1000)
end_ms = int(now.timestamp() * 1000)
cpu_query = f"""
SELECT
avg(value) as cpu_avg,
max(value) as cpu_max
FROM signoz_metrics.distributed_samples_v4
WHERE
metric_name = 'system.cpu.time'
AND unix_milli BETWEEN {start_ms} AND {end_ms}
"""
disk_query = f"""
SELECT
sum(value) as disk_io_bytes
FROM signoz_metrics.distributed_samples_v4
WHERE
metric_name = 'system.disk.io'
AND unix_milli BETWEEN {start_ms} AND {end_ms}
"""
cpu_results = await self._query_clickhouse(cpu_query)
disk_results = await self._query_clickhouse(disk_query)
return {
"cpu": cpu_results[0] if cpu_results else {},
"disk": disk_results[0] if disk_results else {},
"time_range": {
"start": start_ms,
"end": end_ms,
},
}
# =============================================================================
# Singleton
# =============================================================================
_signoz_client: SignOzClient | None = None


def get_signoz_client() -> SignOzClient:
    """Return the shared SignOzClient, creating it lazily on the first call."""
    global _signoz_client
    if _signoz_client is not None:
        return _signoz_client
    _signoz_client = SignOzClient()
    return _signoz_client


async def close_signoz_client() -> None:
    """Close the shared SignOzClient, if one exists, and drop the module reference."""
    global _signoz_client
    client = _signoz_client
    if not client:
        return
    await client.close()
    _signoz_client = None