653 lines
22 KiB
Python
653 lines
22 KiB
Python
"""
|
||
Signal Worker - Redis Streams Consumer
|
||
=======================================
|
||
Phase 6.1: Event Bus Implementation
|
||
Phase 15.2: Redis Trace Context Propagation (2026-03-26)
|
||
|
||
功能:
|
||
- XREADGROUP 消費 awoooi:signals (STREAM_KEY)
|
||
- Signal → Incident 聚合邏輯 (Phase 6.3 實作)
|
||
- 失敗重試 + ACK 機制
|
||
- Graceful Shutdown
|
||
- **Phase 15.2**: Trace Context 還原 (零斷鏈觀測)
|
||
|
||
Redis Streams 概念:
|
||
- Stream: awoooi:signals (訊息佇列)
|
||
- Consumer Group: awoooi_workers (消費者群組)
|
||
- Consumer: worker_{hostname} (單一消費者)
|
||
|
||
Trace Context 傳遞 (Phase 15.2):
|
||
- Producer (webhooks.py) 寫入 _trace_id, _span_id 到 Redis
|
||
- Consumer (此檔案) 還原 Context,建立子 Span
|
||
- 實現 API → Redis → Worker 完整追蹤鏈
|
||
|
||
統帥鐵律:
|
||
- 使用 XREADGROUP 確保同一 Consumer Group 內每則訊息同時只派送給一個 Consumer (at-least-once:未 ACK 的訊息可能被 Sweeper 重新處理)
|
||
- 處理完成後必須 XACK
|
||
- 失敗訊息進入 Pending List,需定期清理
|
||
"""
|
||
|
||
import asyncio
|
||
import socket
|
||
from typing import Any
|
||
|
||
import structlog
|
||
|
||
from src.core.redis_client import get_redis, get_worker_redis, init_worker_redis_pool
|
||
from src.core.telemetry import restore_trace_context
|
||
from src.services.incident_engine import get_incident_engine
|
||
|
||
logger = structlog.get_logger(__name__)
|
||
|
||
|
||
# =============================================================================
# Constants
# =============================================================================

# 2026-03-27 ogt: unified stream key format (P0 fix)
STREAM_KEY = "awoooi:signals"
CONSUMER_GROUP = "awoooi_workers"
# Consumer name is derived from the host name: one named consumer per host.
CONSUMER_NAME = "worker_" + socket.gethostname()

# Maximum number of entries fetched per XREADGROUP call.
BATCH_SIZE = 10
# XREADGROUP BLOCK timeout in milliseconds (a value of 0 would block forever).
BLOCK_MS = 5_000
# Delivery-count threshold at which the sweeper force-ACKs a pending message.
MAX_RETRIES = 3

# =============================================================================
# ADR-038/039: safety-net configuration (Wave 1)
# =============================================================================
# XCLAIM: a pending message un-ACKed for this long (ms) may be reclaimed.
PENDING_IDLE_MS = 60_000  # 1 minute
# Active Sweeper: seconds between pending-list scans.
SWEEP_INTERVAL_S = 30
# Graceful shutdown: max seconds to wait for the consume loop to finish.
GRACEFUL_SHUTDOWN_TIMEOUT_S = 75  # K8s terminationGracePeriodSeconds: 90
|
||
|
||
|
||
# =============================================================================
|
||
# Signal Worker
|
||
# =============================================================================
|
||
|
||
class SignalWorker:
    """
    Redis Streams signal consumer.

    Responsibilities:
    1. Read signals from the ``STREAM_KEY`` stream (``awoooi:signals``) as a
       member of the ``CONSUMER_GROUP`` consumer group.
    2. Hand each signal to ``get_incident_engine().process_signal`` for
       Incident aggregation (Phase 6.3; per the design this also covers
       working-memory updates and decision triggering - not visible here).
    3. XACK processed messages and periodically reclaim stale pending
       messages (ADR-038 Active Sweeper).

    Usage:
        worker = SignalWorker()
        await worker.start()  # start the consume and sweep loops
        await worker.stop()   # graceful shutdown
    """

    def __init__(self) -> None:
        self._running = False
        # Main XREADGROUP consume-loop task.
        self._task: asyncio.Task | None = None
        # ADR-038: Active Sweeper task (reclaims idle pending messages).
        self._sweeper_task: asyncio.Task | None = None

    async def _ensure_consumer_group(self) -> None:
        """
        Make sure the consumer group exists.

        XGROUP CREATE errors with BUSYGROUP when the group already exists, so
        that specific error is ignored; MKSTREAM creates the stream if it is
        missing. Any other error is re-raised.
        """
        redis_client = get_redis()
        try:
            await redis_client.xgroup_create(
                STREAM_KEY,
                CONSUMER_GROUP,
                id="0",  # a newly created group starts from the beginning of the stream
                mkstream=True,
            )
            logger.info(
                "consumer_group_created",
                stream=STREAM_KEY,
                group=CONSUMER_GROUP,
            )
        except Exception as e:
            if "BUSYGROUP" in str(e):
                logger.debug("consumer_group_exists", group=CONSUMER_GROUP)
            else:
                raise

    async def start(self) -> None:
        """
        Start the consume loop and the Active Sweeper as background tasks.

        Returns once the tasks are scheduled; does not block on consumption.
        Calling it while already running only logs a warning.
        """
        if self._running:
            logger.warning("signal_worker_already_running")
            return

        await self._ensure_consumer_group()
        await init_worker_redis_pool()

        self._running = True
        self._task = asyncio.create_task(self._consume_loop())
        self._sweeper_task = asyncio.create_task(self._sweep_loop())  # ADR-038
        logger.info(
            "signal_worker_started",
            stream=STREAM_KEY,
            group=CONSUMER_GROUP,
            consumer=CONSUMER_NAME,
            sweep_interval_s=SWEEP_INTERVAL_S,
        )

    async def stop(self) -> None:
        """
        Gracefully stop the worker (ADR-038 Wave 1 hardening).

        Cancels the sweeper right away, then waits up to
        ``GRACEFUL_SHUTDOWN_TIMEOUT_S`` seconds (75 s, paired with K8s
        ``terminationGracePeriodSeconds: 90``) for the consume loop to finish
        its current batch and exit. No-op when the worker is not running.
        """
        if not self._running:
            return

        logger.info(
            "signal_worker_stopping",
            timeout_s=GRACEFUL_SHUTDOWN_TIMEOUT_S,
        )
        # Both loops check this flag and exit at their next iteration.
        self._running = False

        # Stop the sweeper.
        if self._sweeper_task:
            self._sweeper_task.cancel()
            try:
                await self._sweeper_task
            except asyncio.CancelledError:
                pass

        # Wait for the main consume loop to finish.
        if self._task:
            try:
                await asyncio.wait_for(
                    self._task,
                    timeout=GRACEFUL_SHUTDOWN_TIMEOUT_S,
                )
            except TimeoutError:
                logger.warning(
                    "signal_worker_stop_timeout",
                    timeout_s=GRACEFUL_SHUTDOWN_TIMEOUT_S,
                )
                # wait_for already cancels the task on timeout; explicit safeguard.
                self._task.cancel()
            except asyncio.CancelledError:
                pass

        # Drop references to finished tasks so a later start() begins cleanly.
        self._task = None
        self._sweeper_task = None

        logger.info("signal_worker_stopped")

    async def _consume_loop(self) -> None:
        """
        Main consume loop.

        Blocks on XREADGROUP (up to ``BLOCK_MS``) for new messages and passes
        each entry to ``_process_signal``, which performs the XACK.

        Iron rule 2026-03-23:
        - Use the worker-dedicated Redis connection (``get_worker_redis``,
          described as having no timeout limit).
        - Never use the API's shared short-timeout connection here.
        """
        redis_client = get_worker_redis()  # worker-dedicated long-lived connection

        while self._running:
            try:
                # ">" reads only never-delivered messages (not the Pending List).
                messages = await redis_client.xreadgroup(
                    groupname=CONSUMER_GROUP,
                    consumername=CONSUMER_NAME,
                    streams={STREAM_KEY: ">"},
                    count=BATCH_SIZE,
                    block=BLOCK_MS,
                )

                if not messages:
                    # Block timeout elapsed with no new messages.
                    continue

                # Shape: [[stream_name, [(id, data), ...]]]
                for _stream_name, entries in messages:
                    for message_id, data in entries:
                        await self._process_signal(message_id, data)

            except asyncio.CancelledError:
                logger.info("signal_worker_cancelled")
                break
            except Exception as e:
                logger.exception("signal_worker_error", error=str(e))
                # Back off so a persistent error does not spin the loop.
                await asyncio.sleep(1.0)

    async def _sweep_loop(self) -> None:
        """
        Active Sweeper: periodically reclaim idle pending messages.

        ADR-038 Wave 1: uses XCLAIM to take over messages left pending by a dead
        or stuck consumer (including this one).

        Every ``SWEEP_INTERVAL_S`` seconds:
        1. XPENDING summary; skip the pass when nothing is pending.
        2. XPENDING range; inspect up to 10 pending entries (ascending IDs).
        3. XCLAIM entries idle for at least ``PENDING_IDLE_MS`` into this consumer.
        4. Reprocess each claimed entry, or force-ACK it once its delivery
           count has reached ``MAX_RETRIES``. Claimed entries that no longer
           exist in the stream are ACKed and skipped.
        """
        redis_client = get_worker_redis()

        while self._running:
            try:
                await asyncio.sleep(SWEEP_INTERVAL_S)

                if not self._running:
                    break

                # 1. Pending summary.
                pending_info = await redis_client.xpending(
                    STREAM_KEY,
                    CONSUMER_GROUP,
                )

                if not pending_info or pending_info.get("pending", 0) == 0:
                    continue  # nothing pending

                pending_count = pending_info.get("pending", 0)
                logger.debug(
                    "sweep_pending_check",
                    pending_count=pending_count,
                )

                # 2. Concrete pending entries (at most 10 per pass).
                pending_messages = await redis_client.xpending_range(
                    STREAM_KEY,
                    CONSUMER_GROUP,
                    min="-",
                    max="+",
                    count=10,
                )

                for msg in pending_messages:
                    message_id = msg.get("message_id")
                    idle_ms = msg.get("time_since_delivered", 0)
                    # Count as reported *before* this XCLAIM (XCLAIM increments it).
                    delivery_count = msg.get("times_delivered", 0)

                    # Only touch entries idle longer than the threshold.
                    if idle_ms < PENDING_IDLE_MS:
                        continue

                    logger.info(
                        "sweep_claiming_message",
                        message_id=message_id,
                        idle_ms=idle_ms,
                        delivery_count=delivery_count,
                    )

                    # 3. Reclaim the entry into this consumer.
                    claimed = await redis_client.xclaim(
                        STREAM_KEY,
                        CONSUMER_GROUP,
                        CONSUMER_NAME,
                        min_idle_time=PENDING_IDLE_MS,
                        message_ids=[message_id],
                    )

                    if not claimed:
                        continue

                    # 4. Handle the reclaimed entries.
                    for claimed_id, claimed_data in claimed:
                        if claimed_id is None or claimed_data is None:
                            # A claimed entry without ID/data no longer exists in
                            # the stream (deleted or trimmed). Nothing can be
                            # reprocessed; ACK the pending ID so it does not stay
                            # stuck in the PEL and never reach _process_signal
                            # with None data.
                            logger.warning(
                                "sweep_ack_missing_message",
                                message_id=message_id,
                                delivery_count=delivery_count,
                            )
                            await redis_client.xack(
                                STREAM_KEY,
                                CONSUMER_GROUP,
                                message_id,
                            )
                            continue

                        if delivery_count >= MAX_RETRIES:
                            # Retry budget exhausted: force-ACK and log it.
                            logger.warning(
                                "sweep_force_ack_max_retries",
                                message_id=claimed_id,
                                delivery_count=delivery_count,
                            )
                            await redis_client.xack(
                                STREAM_KEY,
                                CONSUMER_GROUP,
                                claimed_id,
                            )
                        else:
                            await self._process_signal(claimed_id, claimed_data)

            except asyncio.CancelledError:
                logger.info("sweep_loop_cancelled")
                break
            except Exception as e:
                logger.exception("sweep_loop_error", error=str(e))
                await asyncio.sleep(5.0)  # back off after an error

    async def _process_signal(self, message_id: str, data: dict[str, Any]) -> None:
        """
        Process a single signal and XACK it.

        Phase 6.3: the signal is handed to ``get_incident_engine().process_signal``.
        Per the Phase 6.3 design, that covers fingerprint dedup, aggregation
        (30-minute window + service correlation), Incident create/update,
        GraphRAG blast-radius analysis and dual persistence (Redis + PostgreSQL).

        Phase 15.2: ``_trace_id`` / ``_span_id`` injected by the producer are
        popped from ``data`` and used to open a child span of the original
        trace (zero-gap tracing).

        The message is ACKed whenever processing completes without raising,
        including when the engine returns no incident. If anything raises, the
        message stays un-ACKed in the Pending List for the Active Sweeper.

        Args:
            message_id: Redis stream entry ID.
            data: Stream entry fields. Mutated: the trace fields are removed.
        """
        # NOTE(review): XACK goes through get_redis() (shared API pool), whereas
        # the loops use get_worker_redis(); confirm this is intended given the
        # worker-connection rule in _consume_loop.
        redis_client = get_redis()

        # Phase 15.2: extract the trace context injected by the producer.
        trace_context = None
        trace_id = data.pop("_trace_id", None)  # pop so it does not leak into the signal data
        span_id = data.pop("_span_id", None)

        if trace_id:
            trace_context = {
                "trace_id": trace_id,
                "span_id": span_id or "",
            }

        # Process the signal inside the restored trace context.
        with restore_trace_context(trace_context) as span:
            try:
                # Span attributes (searchable in SignOz).
                span.set_attribute("messaging.system", "redis_streams")
                span.set_attribute("messaging.destination", STREAM_KEY)
                span.set_attribute("messaging.message_id", message_id)
                span.set_attribute("signal.source", data.get("source", "unknown"))
                span.set_attribute("signal.alert_name", data.get("alert_name", "unknown"))
                span.set_attribute("signal.severity", data.get("severity", "unknown"))

                logger.info(
                    "signal_received",
                    message_id=message_id,
                    source=data.get("source", "unknown"),
                    alert_name=data.get("alert_name", "unknown"),
                    severity=data.get("severity", "unknown"),
                    namespace=data.get("namespace", "default"),
                    target=data.get("target", "unknown"),
                    trace_restored=trace_context is not None,
                )

                # Phase 6.3: IncidentEngine aggregates related alerts into one
                # Incident (GraphRAG blast radius, dual persistence).
                engine = get_incident_engine()
                incident = await engine.process_signal(data)

                if incident:
                    span.set_attribute("incident.id", incident.incident_id)
                    span.set_attribute("incident.severity", incident.severity.value)
                    span.set_attribute("incident.signal_count", len(incident.signals))

                    logger.info(
                        "signal_processed_by_engine",
                        message_id=message_id,
                        incident_id=incident.incident_id,
                        severity=incident.severity.value,
                        signal_count=len(incident.signals),
                        affected_services=incident.affected_services,
                        # 2026-04-01 ogt: BrainIncident has no such field (ADR-046 P2-01)
                        persisted_to_pg=getattr(incident, "persisted_to_pg", False),
                    )
                    # Best-effort observation recording: failures are logged and
                    # do not prevent the ACK below.
                    try:
                        from src.services.signal_observation_service import (
                            record_signal_worker_observation,
                        )

                        observation = await record_signal_worker_observation(
                            incident,
                            data,
                            message_id,
                        )
                        span.set_attribute("signal.observation_recorded", True)
                        span.set_attribute(
                            "signal.observation_raw_evidence",
                            bool(observation.get("raw_evidence_created")),
                        )
                    except Exception as exc:
                        span.set_attribute("signal.observation_recorded", False)
                        logger.warning(
                            "signal_observation_record_failed",
                            message_id=message_id,
                            incident_id=incident.incident_id,
                            error=str(exc),
                        )
                else:
                    span.set_attribute("signal.processing_failed", True)
                    logger.warning(
                        "signal_processing_failed",
                        message_id=message_id,
                        signal_data=data,
                    )

                # ACK: the message has been handled.
                await redis_client.xack(STREAM_KEY, CONSUMER_GROUP, message_id)
                span.set_attribute("messaging.acked", True)

                logger.debug("signal_acked", message_id=message_id)

            except Exception as e:
                span.set_attribute("error", True)
                span.set_attribute("error.message", str(e))
                logger.exception(
                    "signal_process_error",
                    message_id=message_id,
                    error=str(e),
                )
                # Not ACKed: the message stays in the Pending List; the Active
                # Sweeper reclaims it after PENDING_IDLE_MS and force-ACKs it
                # once its delivery count reaches MAX_RETRIES.
|
||
|
||
|
||
# =============================================================================
|
||
# Singleton
|
||
# =============================================================================
|
||
|
||
_signal_worker: SignalWorker | None = None


async def init_signal_worker() -> SignalWorker:
    """
    Create and start the process-wide SignalWorker singleton.

    Iron rule: call during application lifespan startup. Later calls return
    the existing instance without starting it again.

    Returns:
        The started SignalWorker.

    Raises:
        Whatever ``SignalWorker.start()`` raises. In that case the singleton is
        cleared, so a later call retries instead of returning a worker that
        never started.
    """
    global _signal_worker

    if _signal_worker is not None:
        return _signal_worker

    worker = SignalWorker()
    # Published before awaiting start() (as before) so a concurrent caller
    # reuses this instance rather than creating a second worker.
    _signal_worker = worker
    try:
        await worker.start()
    except BaseException:
        # start() failed or was cancelled: do not cache an un-started worker.
        _signal_worker = None
        raise
    return worker
|
||
|
||
|
||
async def close_signal_worker() -> None:
    """
    Stop the SignalWorker singleton (if any) and clear it.

    Iron rule: call during application lifespan shutdown. Safe to call when
    no worker was initialized.
    """
    global _signal_worker

    current = _signal_worker
    if current is None:
        return

    await current.stop()
    _signal_worker = None
|
||
|
||
|
||
def get_signal_worker() -> SignalWorker:
    """
    Return the initialized SignalWorker singleton.

    Raises:
        RuntimeError: if ``init_signal_worker()`` has not been called yet.
    """
    worker = _signal_worker
    if worker is not None:
        return worker
    raise RuntimeError(
        "Signal worker not initialized. Call init_signal_worker() first."
    )
|
||
|
||
|
||
# =============================================================================
|
||
# Standalone Entry Point (for K8s Worker Deployment)
|
||
# =============================================================================
|
||
|
||
async def _write_health_files() -> None:
    """
    Create the marker files checked by the K8s probes.

    Touches ``/tmp/worker-healthy`` and then ``/tmp/worker-ready``.
    """
    from pathlib import Path

    for marker in ("/tmp/worker-healthy", "/tmp/worker-ready"):
        Path(marker).touch()
    logger.info("health_files_written")
|
||
|
||
|
||
async def _heartbeat_loop(shutdown_event: asyncio.Event) -> None:
|
||
"""
|
||
心跳循環:定期更新健康檢查文件的時間戳
|
||
|
||
長期解決方案 (2026-03-28):
|
||
- 每 30 秒 touch 健康文件,確保 mtime 更新
|
||
- K8s liveness probe 可檢查 mtime 是否在 60 秒內
|
||
- 防止 Worker 卡住但健康文件仍存在的假陽性
|
||
|
||
@author Claude Code (首席架構師)
|
||
@version 1.0.0
|
||
@date 2026-03-28 (台北時間)
|
||
"""
|
||
from pathlib import Path
|
||
|
||
HEARTBEAT_INTERVAL = 30 # 秒
|
||
|
||
while not shutdown_event.is_set():
|
||
try:
|
||
Path("/tmp/worker-healthy").touch()
|
||
logger.debug("heartbeat_tick")
|
||
except Exception as e:
|
||
logger.warning("heartbeat_error", error=str(e))
|
||
|
||
# 等待下次心跳或收到關閉信號
|
||
try:
|
||
await asyncio.wait_for(
|
||
shutdown_event.wait(),
|
||
timeout=HEARTBEAT_INTERVAL
|
||
)
|
||
break # 收到關閉信號
|
||
except TimeoutError:
|
||
continue # 超時,繼續下次心跳
|
||
|
||
|
||
async def _main() -> None:
    """
    Standalone worker entry point.

    Used by the K8s Deployment to run the worker directly:
        python -m src.workers.signal_worker

    Startup:
    - initialize both Redis pools;
    - initialize PostgreSQL and the MCP runtime (both best-effort);
    - write the health files and start the SignalWorker;
    - wait for SIGTERM/SIGINT.

    Shutdown:
    - stop the heartbeat and the worker;
    - close the pools and the DB;
    - remove the health files and exit with status 0.

    Shutdown signals are registered with ``loop.add_signal_handler`` so the
    callback runs inside the event loop and may safely set the asyncio Event.
    ``signal.signal`` is used only where the loop does not support it.
    """
    import signal
    import sys
    from pathlib import Path

    # Initialize settings first (loads env vars)
    from src.core.config import settings  # noqa: F401
    from src.core.redis_client import (
        close_redis_pool,
        close_worker_redis_pool,
        init_redis_pool,
        init_worker_redis_pool,
    )
    from src.db.base import close_db, init_db

    logger.info(
        "signal_worker_standalone_starting",
        environment=settings.ENVIRONMENT,
        # Log only the part after the last '@' so credentials are not emitted.
        redis_url=settings.REDIS_URL.split("@")[-1] if settings.REDIS_URL else "N/A",
        database_url=settings.DATABASE_URL.split("@")[-1] if settings.DATABASE_URL else "N/A",
    )

    # Redis: shared API pool + worker-dedicated long-lived pool.
    await init_redis_pool()
    await init_worker_redis_pool()

    # PostgreSQL (Episodic Memory) - makes sure the incidents table exists.
    # Best-effort: the worker keeps running without DB persistence on failure.
    try:
        await init_db()
        logger.info("postgresql_initialized", status="ok")
    except Exception as e:
        logger.error(
            "postgresql_init_failed",
            error=str(e),
            message="Episodic Memory (DB) will be unavailable - incidents won't persist",
        )

    # MCP providers / tool registry: best-effort as well.
    try:
        from src.plugins.mcp.providers import register_all_providers
        from src.services.mcp_tool_registry import init_mcp_tool_registry

        register_all_providers()
        await init_mcp_tool_registry()
        logger.info("signal_worker_mcp_runtime_initialized")
    except Exception as e:
        logger.warning("signal_worker_mcp_runtime_init_failed", error=str(e))

    # Health files for the K8s probes.
    await _write_health_files()

    worker = await init_signal_worker()

    # Graceful shutdown wiring.
    shutdown_event = asyncio.Event()

    def _shutdown_handler(signum: int, frame: object | None = None) -> None:
        logger.info("shutdown_signal_received", signal=signum)
        shutdown_event.set()

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            # Runs the callback in the event loop (allowed to touch asyncio
            # objects) and wakes the loop immediately. int() keeps the logged
            # value identical to what signal.signal would pass.
            loop.add_signal_handler(sig, _shutdown_handler, int(sig), None)
        except NotImplementedError:
            # Event loops without signal-handler support: fall back.
            signal.signal(sig, _shutdown_handler)

    # Heartbeat keeps the liveness file's mtime fresh.
    heartbeat_task = asyncio.create_task(_heartbeat_loop(shutdown_event))

    await shutdown_event.wait()

    # Stop the heartbeat.
    heartbeat_task.cancel()
    try:
        await heartbeat_task
    except asyncio.CancelledError:
        pass

    logger.info("signal_worker_shutting_down")
    await worker.stop()
    await close_worker_redis_pool()  # worker-dedicated connections
    await close_redis_pool()
    await close_db()  # PostgreSQL pool

    # Remove health files.
    Path("/tmp/worker-healthy").unlink(missing_ok=True)
    Path("/tmp/worker-ready").unlink(missing_ok=True)

    logger.info("signal_worker_shutdown_complete")
    sys.exit(0)
|
||
|
||
|
||
if __name__ == "__main__":
    # Standalone execution (K8s worker Deployment): run _main until shutdown.
    asyncio.run(_main())
|