diff --git a/apps/api/src/api/v1/webhooks.py b/apps/api/src/api/v1/webhooks.py
index 86314dc6..ac5fe207 100644
--- a/apps/api/src/api/v1/webhooks.py
+++ b/apps/api/src/api/v1/webhooks.py
@@ -34,6 +34,9 @@ from src.core.logging import get_logger
 
 # Phase 6.1: Event Bus (Redis Streams)
 from src.core.redis_client import get_redis
+
+# Phase 15.2: trace context propagation
+from src.core.telemetry import get_trace_context
 from src.models.approval import (
     ApprovalRequestCreate,
     BlastRadius,
@@ -533,6 +536,8 @@ async def produce_signal_to_stream(signal: SignalPayload) -> str:
     """
     將 Signal 寫入 Redis Stream
 
+    Phase 15.2: also injects the current trace context so the trace is not broken here.
+
     使用 XADD 命令:
     - MAXLEN ~10000: 限制 Stream 長度,自動裁剪舊訊息
     - *: 自動生成 Message ID
@@ -555,6 +560,12 @@ async def produce_signal_to_stream(signal: SignalPayload) -> str:
         "received_at": now_taipei().isoformat(),
     }
 
+    # Phase 15.2: inject the trace context as two flat fields of the message
+    trace_ctx = get_trace_context()
+    if trace_ctx:
+        signal_dict["_trace_id"] = trace_ctx.get("trace_id", "")
+        signal_dict["_span_id"] = trace_ctx.get("span_id", "")
+
     # XADD 寫入 Stream
     message_id = await redis_client.xadd(
         SIGNAL_STREAM_KEY,
@@ -569,6 +580,7 @@ async def produce_signal_to_stream(signal: SignalPayload) -> str:
         source=signal.source,
         alert_name=signal.alert_name,
         severity=signal.severity,
+        trace_id=trace_ctx.get("trace_id") if trace_ctx else None,
     )
 
     return message_id
diff --git a/apps/api/src/core/telemetry.py b/apps/api/src/core/telemetry.py
index 6fdd66dd..82ca8a03 100644
--- a/apps/api/src/core/telemetry.py
+++ b/apps/api/src/core/telemetry.py
@@ -219,3 +219,127 @@ def get_current_trace_id() -> str | None:
         return None
 
     return format(ctx.trace_id, '032x')
+
+
+def get_current_span_id() -> str | None:
+    """
+    Get current span ID
+
+    Returns:
+        Span ID as hex string, or None if no active span
+    """
+    span = trace.get_current_span()
+    if span is None:
+        return None
+
+    ctx = span.get_span_context()
+    if ctx is None or not ctx.is_valid:
+        return None
+
+    return format(ctx.span_id, '016x')
+
+
+# =============================================================================
+# Phase 15.2: Redis Trace Context Propagation
+# =============================================================================
+
+def get_trace_context() -> dict[str, str] | None:
+    """
+    Return the current trace context for injection into a Redis Streams message.
+
+    Phase 15.2: keeps the trace from breaking at the Redis Streams hop.
+
+    Returns:
+        dict with trace_id, span_id, or None if no active span
+
+    Usage (producer side):
+        trace_ctx = get_trace_context()
+        if trace_ctx:
+            payload["_trace_id"] = trace_ctx["trace_id"]
+            payload["_span_id"] = trace_ctx["span_id"]
+        await redis.xadd("stream:signals", payload)
+    """
+    trace_id = get_current_trace_id()
+    span_id = get_current_span_id()
+
+    if not trace_id:
+        return None
+
+    return {
+        "trace_id": trace_id,
+        "span_id": span_id or "",
+    }
+
+
+def restore_trace_context(trace_context: dict[str, str] | None):
+    """
+    Open a "worker_process" CONSUMER span, parented to the producer's span when possible.
+
+    Phase 15.2: worker-side context rebuild.
+
+    The IDs are packed into a W3C ``traceparent`` value and parsed by the propagator.
+    If they are missing or malformed no parent can be rebuilt, so the span is started
+    in the current context instead and tagged with ``trace.restore_failed``.
+
+    Args:
+        trace_context: dict with "trace_id" and "span_id" hex strings, or None
+
+    Returns:
+        Context manager yielding the worker span
+
+    Usage (consumer side):
+        trace_id = data.pop("_trace_id", None)
+        trace_ctx = {"trace_id": trace_id, "span_id": data.pop("_span_id", "")}
+
+        with restore_trace_context(trace_ctx) as span:
+            # work done here is recorded under the producer's trace
+            pass
+    """
+    from contextlib import contextmanager
+
+    from opentelemetry.trace import SpanKind
+    from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
+
+    tracer = get_tracer("awoooi.worker")
+
+    def _extract_parent():
+        """Return (parent context or None, failure reason or None)."""
+        if not trace_context or not trace_context.get("trace_id"):
+            return None, None
+
+        try:
+            # Flags are fixed at "01" (sampled); the producer does not forward them.
+            span_id = trace_context.get("span_id") or ""
+            carrier = {"traceparent": f"00-{trace_context['trace_id']}-{span_id}-01"}
+            extracted = TraceContextTextMapPropagator().extract(carrier=carrier)
+        except Exception as e:
+            return None, str(e)
+
+        # extract() does not raise on a malformed header; it returns a context with no
+        # span in it, which would silently start a new trace. Check explicitly.
+        if not trace.get_current_span(extracted).get_span_context().is_valid:
+            return None, "invalid trace_id/span_id"
+        return extracted, None
+
+    @contextmanager
+    def _context_manager():
+        parent_ctx, failure = _extract_parent()
+        if failure is not None:
+            _logger.warning(f"Trace context restore failed: {failure}, creating new span")
+
+        # One yield, outside any try/except: an exception raised in the caller's block
+        # must propagate rather than be treated as a restore failure (yielding a second
+        # time after a throw makes contextlib raise RuntimeError).
+        with tracer.start_as_current_span(
+            "worker_process",
+            context=parent_ctx,
+            kind=SpanKind.CONSUMER,
+        ) as span:
+            if parent_ctx is not None:
+                span.set_attribute("trace.restored", True)
+                span.set_attribute("trace.parent_trace_id", trace_context["trace_id"])
+            elif failure is not None:
+                span.set_attribute("trace.restore_failed", failure)
+            yield span
+
+    return _context_manager()
diff --git a/apps/api/src/workers/signal_worker.py b/apps/api/src/workers/signal_worker.py
index 236abafd..523c55df 100644
--- a/apps/api/src/workers/signal_worker.py
+++ b/apps/api/src/workers/signal_worker.py
@@ -2,18 +2,25 @@
 Signal Worker - Redis Streams Consumer
 =======================================
 Phase 6.1: Event Bus Implementation
+Phase 15.2: Redis Trace Context Propagation (2026-03-26)
 
 功能:
 - XREADGROUP 消費 stream:awoooi_signals
 - Signal → Incident 聚合邏輯 (Phase 6.3 實作)
 - 失敗重試 + ACK 機制
 - Graceful Shutdown
+- **Phase 15.2**: trace context restore (unbroken end-to-end tracing)
 
 Redis Streams 概念:
 - Stream: stream:awoooi_signals (訊息佇列)
 - Consumer Group: awoooi_workers (消費者群組)
 - Consumer: worker_{hostname} (單一消費者)
 
+Trace context propagation (Phase 15.2):
+- Producer (webhooks.py) writes _trace_id and _span_id into the Redis message
+- Consumer (this file) restores the context and opens a child span
+- Gives one continuous trace across API → Redis → Worker
+
 統帥鐵律:
 - 使用 XREADGROUP 確保訊息只被處理一次
 - 處理完成後必須 XACK
@@ -27,6 +34,7 @@ from typing import Any
 import structlog
 
 from src.core.redis_client import get_redis, get_worker_redis
+from src.core.telemetry import restore_trace_context
 from src.services.incident_engine import get_incident_engine
 
 logger = structlog.get_logger(__name__)
@@ -195,57 +203,97 @@ class SignalWorker:
         3. Incident 建立/更新 (聚合到同一 Incident)
         4. GraphRAG 爆炸半徑分析
         5. 雙層持久化 (Redis + PostgreSQL)
+
+        Phase 15.2: trace context restore
+        - Pops _trace_id and _span_id off the Redis message
+        - Opens a child span that inherits the original trace
         """
         redis_client = get_redis()
 
-        try:
-            logger.info(
-                "signal_received",
-                message_id=message_id,
-                source=data.get("source", "unknown"),
-                alert_name=data.get("alert_name", "unknown"),
-                severity=data.get("severity", "unknown"),
-                namespace=data.get("namespace", "default"),
-                target=data.get("target", "unknown"),
-            )
+        # =================================================================
+        # Phase 15.2: pull out the trace context fields injected by the producer
+        # =================================================================
+        trace_context = None
+        trace_id = data.pop("_trace_id", None)  # pop so the fields do not leak into signal data
+        span_id = data.pop("_span_id", None)
 
-            # Phase 6.3: 使用 IncidentEngine 處理訊號
-            # - 自動聚合相關告警到同一 Incident
-            # - GraphRAG 分析爆炸半徑
-            # - 雙層持久化
-            engine = get_incident_engine()
-            incident = await engine.process_signal(data)
+        if trace_id:
+            trace_context = {
+                "trace_id": trace_id,
+                "span_id": span_id or "",
+            }
+
+        # =================================================================
+        # Process the signal inside the restored trace context
+        # =================================================================
+        with restore_trace_context(trace_context) as span:
+            try:
+                # Span attributes (used for searching in SigNoz)
+                span.set_attribute("messaging.system", "redis_streams")
+                span.set_attribute("messaging.destination", STREAM_KEY)
+                span.set_attribute("messaging.message_id", message_id)
+                span.set_attribute("signal.source", data.get("source", "unknown"))
+                span.set_attribute("signal.alert_name", data.get("alert_name", "unknown"))
+                span.set_attribute("signal.severity", data.get("severity", "unknown"))
 
-            if incident:
                 logger.info(
-                    "signal_processed_by_engine",
+                    "signal_received",
                     message_id=message_id,
-                    incident_id=incident.incident_id,
-                    severity=incident.severity.value,
-                    signal_count=len(incident.signals),
-                    affected_services=incident.affected_services,
-                    persisted_to_pg=incident.persisted_to_pg,
-                )
-            else:
-                logger.warning(
-                    "signal_processing_failed",
-                    message_id=message_id,
-                    signal_data=data,
+                    source=data.get("source", "unknown"),
+                    alert_name=data.get("alert_name", "unknown"),
+                    severity=data.get("severity", "unknown"),
+                    namespace=data.get("namespace", "default"),
+                    target=data.get("target", "unknown"),
+                    trace_restored=trace_context is not None,
                 )
 
-            # ACK: 確認訊息已處理
-            await redis_client.xack(STREAM_KEY, CONSUMER_GROUP, message_id)
+                # Phase 6.3: hand the signal to the IncidentEngine
+                # - aggregates related alerts into the same incident
+                # - GraphRAG blast-radius analysis
+                # - two-tier persistence
+                engine = get_incident_engine()
+                incident = await engine.process_signal(data)
 
-            logger.debug("signal_acked", message_id=message_id)
+                if incident:
+                    # Record the incident on the span
+                    span.set_attribute("incident.id", incident.incident_id)
+                    span.set_attribute("incident.severity", incident.severity.value)
+                    span.set_attribute("incident.signal_count", len(incident.signals))
 
-        except Exception as e:
-            logger.exception(
-                "signal_process_error",
-                message_id=message_id,
-                error=str(e),
-            )
-            # 不 ACK,訊息會留在 Pending List
-            # Phase 6.3 將實作 Pending List 清理機制
+                    logger.info(
+                        "signal_processed_by_engine",
+                        message_id=message_id,
+                        incident_id=incident.incident_id,
+                        severity=incident.severity.value,
+                        signal_count=len(incident.signals),
+                        affected_services=incident.affected_services,
+                        persisted_to_pg=incident.persisted_to_pg,
+                    )
+                else:
+                    span.set_attribute("signal.processing_failed", True)
+                    logger.warning(
+                        "signal_processing_failed",
+                        message_id=message_id,
+                        signal_data=data,
+                    )
+
+                # ACK: confirm the message has been processed
+                await redis_client.xack(STREAM_KEY, CONSUMER_GROUP, message_id)
+                span.set_attribute("messaging.acked", True)
+
+                logger.debug("signal_acked", message_id=message_id)
+
+            except Exception as e:
+                span.set_attribute("error", True)
+                span.set_attribute("error.message", str(e))
+                span.record_exception(e)
+                logger.exception(
+                    "signal_process_error",
+                    message_id=message_id,
+                    error=str(e),
+                )
+                # No ACK here, so the message stays in the pending list
+                # Phase 6.3 will implement pending-list cleanup
 
 
 # =============================================================================