feat(api): Phase 15.2 Redis Trace Context 傳遞

實現 Redis Streams 跨服務追蹤零斷鏈:
- telemetry.py: 新增 get_trace_context() + restore_trace_context()
- webhooks.py: Producer 注入 _trace_id, _span_id 到 Redis
- signal_worker.py: Consumer 還原 Trace Context 建立子 Span

架構: API → Redis Streams → Worker 完整追蹤鏈
格式: W3C Trace Context (traceparent)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-26 00:40:20 +08:00
parent 1ac8965a7a
commit 0d31ccb911
3 changed files with 217 additions and 40 deletions

View File

@@ -34,6 +34,9 @@ from src.core.logging import get_logger
# Phase 6.1: Event Bus (Redis Streams)
from src.core.redis_client import get_redis
# Phase 15.2: Trace Context 傳遞
from src.core.telemetry import get_trace_context
from src.models.approval import (
ApprovalRequestCreate,
BlastRadius,
@@ -533,6 +536,8 @@ async def produce_signal_to_stream(signal: SignalPayload) -> str:
"""
將 Signal 寫入 Redis Stream
Phase 15.2: 注入 Trace Context 解決斷鏈問題
使用 XADD 命令:
- MAXLEN ~10000: 限制 Stream 長度,自動裁剪舊訊息
- *: 自動生成 Message ID
@@ -555,6 +560,12 @@ async def produce_signal_to_stream(signal: SignalPayload) -> str:
"received_at": now_taipei().isoformat(),
}
# Phase 15.2: 注入 Trace Context
trace_ctx = get_trace_context()
if trace_ctx:
signal_dict["_trace_id"] = trace_ctx.get("trace_id", "")
signal_dict["_span_id"] = trace_ctx.get("span_id", "")
# XADD 寫入 Stream
message_id = await redis_client.xadd(
SIGNAL_STREAM_KEY,
@@ -569,6 +580,7 @@ async def produce_signal_to_stream(signal: SignalPayload) -> str:
source=signal.source,
alert_name=signal.alert_name,
severity=signal.severity,
trace_id=trace_ctx.get("trace_id") if trace_ctx else None,
)
return message_id

View File

@@ -219,3 +219,121 @@ def get_current_trace_id() -> str | None:
return None
return format(ctx.trace_id, '032x')
def get_current_span_id() -> str | None:
"""
Get current span ID
Returns:
Span ID as hex string, or None if no active span
"""
span = trace.get_current_span()
if span is None:
return None
ctx = span.get_span_context()
if ctx is None or not ctx.is_valid:
return None
return format(ctx.span_id, '016x')
# =============================================================================
# Phase 15.2: Redis Trace Context Propagation
# =============================================================================
def get_trace_context() -> dict[str, str] | None:
"""
取得當前 Trace Context 用於 Redis Streams 注入
Phase 15.2: 解決 Redis Streams Trace 斷鏈問題
Returns:
dict with trace_id, span_id, or None if no active span
Usage (寫入 Redis):
payload = {**signal.dict(), "_trace_context": get_trace_context()}
await redis.xadd("stream:signals", payload)
"""
trace_id = get_current_trace_id()
span_id = get_current_span_id()
if not trace_id:
return None
return {
"trace_id": trace_id,
"span_id": span_id or "",
}
def restore_trace_context(trace_context: dict[str, str] | None):
"""
從 Redis 訊息還原 Trace Context 並建立新 Span
Phase 15.2: Worker 端 Context 重建
Args:
trace_context: 從 Redis 訊息取得的 _trace_context
Returns:
Context manager for the restored span
Usage (讀取 Redis):
message = await redis.xreadgroup(...)
trace_ctx = message.get("_trace_context")
with restore_trace_context(trace_ctx) as span:
# 處理邏輯,此處的 span 會繼承原始 trace_id
pass
"""
from contextlib import contextmanager
from opentelemetry.trace import SpanKind
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
tracer = get_tracer("awoooi.worker")
@contextmanager
def _context_manager():
if not trace_context or not trace_context.get("trace_id"):
# 沒有 trace context建立新的 span
with tracer.start_as_current_span(
"worker_process",
kind=SpanKind.CONSUMER,
) as span:
yield span
return
# 有 trace context嘗試還原
try:
# 使用 W3C Trace Context 格式建立 carrier
carrier = {
"traceparent": f"00-{trace_context['trace_id']}-{trace_context.get('span_id', '0' * 16)}-01"
}
# 從 carrier 提取 context
propagator = TraceContextTextMapPropagator()
ctx = propagator.extract(carrier=carrier)
# 建立子 span 繼承原始 trace
with tracer.start_as_current_span(
"worker_process",
context=ctx,
kind=SpanKind.CONSUMER,
) as span:
span.set_attribute("trace.restored", True)
span.set_attribute("trace.parent_trace_id", trace_context["trace_id"])
yield span
except Exception as e:
_logger.warning(f"Trace context restore failed: {e}, creating new span")
with tracer.start_as_current_span(
"worker_process",
kind=SpanKind.CONSUMER,
) as span:
span.set_attribute("trace.restore_failed", str(e))
yield span
return _context_manager()

View File

@@ -2,18 +2,25 @@
Signal Worker - Redis Streams Consumer
=======================================
Phase 6.1: Event Bus Implementation
Phase 15.2: Redis Trace Context Propagation (2026-03-26)
功能:
- XREADGROUP 消費 stream:awoooi_signals
- Signal → Incident 聚合邏輯 (Phase 6.3 實作)
- 失敗重試 + ACK 機制
- Graceful Shutdown
- **Phase 15.2**: Trace Context 還原 (零斷鏈觀測)
Redis Streams 概念:
- Stream: stream:awoooi_signals (訊息佇列)
- Consumer Group: awoooi_workers (消費者群組)
- Consumer: worker_{hostname} (單一消費者)
Trace Context 傳遞 (Phase 15.2):
- Producer (webhooks.py) 寫入 _trace_id, _span_id 到 Redis
- Consumer (此檔案) 還原 Context建立子 Span
- 實現 API → Redis → Worker 完整追蹤鏈
統帥鐵律:
- 使用 XREADGROUP 確保訊息只被處理一次
- 處理完成後必須 XACK
@@ -27,6 +34,7 @@ from typing import Any
import structlog
from src.core.redis_client import get_redis, get_worker_redis
from src.core.telemetry import restore_trace_context
from src.services.incident_engine import get_incident_engine
logger = structlog.get_logger(__name__)
@@ -195,57 +203,96 @@ class SignalWorker:
3. Incident 建立/更新 (聚合到同一 Incident)
4. GraphRAG 爆炸半徑分析
5. 雙層持久化 (Redis + PostgreSQL)
Phase 15.2: Trace Context 還原
- 從 Redis 訊息提取 _trace_id, _span_id
- 建立子 Span 繼承原始 Trace實現零斷鏈觀測
"""
redis_client = get_redis()
try:
logger.info(
"signal_received",
message_id=message_id,
source=data.get("source", "unknown"),
alert_name=data.get("alert_name", "unknown"),
severity=data.get("severity", "unknown"),
namespace=data.get("namespace", "default"),
target=data.get("target", "unknown"),
)
# =================================================================
# Phase 15.2: 提取 Trace Context (從 Producer 注入的欄位)
# =================================================================
trace_context = None
trace_id = data.pop("_trace_id", None) # pop 避免污染 signal data
span_id = data.pop("_span_id", None)
# Phase 6.3: 使用 IncidentEngine 處理訊號
# - 自動聚合相關告警到同一 Incident
# - GraphRAG 分析爆炸半徑
# - 雙層持久化
engine = get_incident_engine()
incident = await engine.process_signal(data)
if trace_id:
trace_context = {
"trace_id": trace_id,
"span_id": span_id or "",
}
# =================================================================
# 在還原的 Trace Context 中處理訊號
# =================================================================
with restore_trace_context(trace_context) as span:
try:
# 設置 Span 屬性 (用於 SignOz 搜尋)
span.set_attribute("messaging.system", "redis_streams")
span.set_attribute("messaging.destination", STREAM_KEY)
span.set_attribute("messaging.message_id", message_id)
span.set_attribute("signal.source", data.get("source", "unknown"))
span.set_attribute("signal.alert_name", data.get("alert_name", "unknown"))
span.set_attribute("signal.severity", data.get("severity", "unknown"))
if incident:
logger.info(
"signal_processed_by_engine",
"signal_received",
message_id=message_id,
incident_id=incident.incident_id,
severity=incident.severity.value,
signal_count=len(incident.signals),
affected_services=incident.affected_services,
persisted_to_pg=incident.persisted_to_pg,
)
else:
logger.warning(
"signal_processing_failed",
message_id=message_id,
signal_data=data,
source=data.get("source", "unknown"),
alert_name=data.get("alert_name", "unknown"),
severity=data.get("severity", "unknown"),
namespace=data.get("namespace", "default"),
target=data.get("target", "unknown"),
trace_restored=trace_context is not None,
)
# ACK: 確認訊息已處理
await redis_client.xack(STREAM_KEY, CONSUMER_GROUP, message_id)
# Phase 6.3: 使用 IncidentEngine 處理訊號
# - 自動聚合相關告警到同一 Incident
# - GraphRAG 分析爆炸半徑
# - 雙層持久化
engine = get_incident_engine()
incident = await engine.process_signal(data)
logger.debug("signal_acked", message_id=message_id)
if incident:
# 記錄 Incident 到 Span
span.set_attribute("incident.id", incident.incident_id)
span.set_attribute("incident.severity", incident.severity.value)
span.set_attribute("incident.signal_count", len(incident.signals))
except Exception as e:
logger.exception(
"signal_process_error",
message_id=message_id,
error=str(e),
)
# 不 ACK訊息會留在 Pending List
# Phase 6.3 將實作 Pending List 清理機制
logger.info(
"signal_processed_by_engine",
message_id=message_id,
incident_id=incident.incident_id,
severity=incident.severity.value,
signal_count=len(incident.signals),
affected_services=incident.affected_services,
persisted_to_pg=incident.persisted_to_pg,
)
else:
span.set_attribute("signal.processing_failed", True)
logger.warning(
"signal_processing_failed",
message_id=message_id,
signal_data=data,
)
# ACK: 確認訊息已處理
await redis_client.xack(STREAM_KEY, CONSUMER_GROUP, message_id)
span.set_attribute("messaging.acked", True)
logger.debug("signal_acked", message_id=message_id)
except Exception as e:
span.set_attribute("error", True)
span.set_attribute("error.message", str(e))
logger.exception(
"signal_process_error",
message_id=message_id,
error=str(e),
)
# 不 ACK訊息會留在 Pending List
# Phase 6.3 將實作 Pending List 清理機制
# =============================================================================