Files
awoooi/apps/api/src/workers/signal_worker.py
Your Name 9b01f1fa46
All checks were successful
CD Pipeline / tests (push) Successful in 5m29s
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / build-and-deploy (push) Successful in 4m9s
CD Pipeline / post-deploy-checks (push) Successful in 1m57s
fix(api): serialize startup bootstrap ddl
2026-05-24 17:10:26 +08:00

653 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Signal Worker - Redis Streams Consumer
=======================================
Phase 6.1: Event Bus Implementation
Phase 15.2: Redis Trace Context Propagation (2026-03-26)
功能:
- XREADGROUP 消費 awoooi:signals stream (STREAM_KEY)
- Signal → Incident 聚合邏輯 (Phase 6.3 實作)
- 失敗重試 (Active Sweeper / XCLAIM) + ACK 機制
- Graceful Shutdown
- **Phase 15.2**: Trace Context 還原 (零斷鏈觀測)
Redis Streams 概念:
- Stream: awoooi:signals (訊息佇列)
- Consumer Group: awoooi_workers (消費者群組)
- Consumer: worker_{hostname} (以主機名稱命名的消費者)
Trace Context 傳遞 (Phase 15.2):
- Producer (webhooks.py) 將 _trace_id、_span_id 寫入 Redis 訊息
- Consumer (此檔案) 還原 Context，並建立子 Span
- 實現 API → Redis → Worker 完整追蹤鏈
統帥鐵律:
- 使用 Consumer Group (XREADGROUP) 確保每則訊息只派發給群組內一個 Consumer；
  Sweeper 重試時可能重複處理 (at-least-once)
- 處理完成後必須 XACK
- 失敗訊息留在 Pending List，由 Active Sweeper 定期以 XCLAIM 回收；超過 MAX_RETRIES 則強制 ACK
"""
import asyncio
import socket
from typing import Any
import structlog
from src.core.redis_client import get_redis, get_worker_redis, init_worker_redis_pool
from src.core.telemetry import restore_trace_context
from src.services.incident_engine import get_incident_engine
logger = structlog.get_logger(__name__)

# =============================================================================
# Stream / consumer-group identity
# =============================================================================

# 2026-03-27 ogt: unified stream key format (P0 fix).
STREAM_KEY = "awoooi:signals"
CONSUMER_GROUP = "awoooi_workers"
# Consumer name is derived from the host name of this process.
CONSUMER_NAME = "worker_" + socket.gethostname()

# =============================================================================
# XREADGROUP tuning
# =============================================================================

BATCH_SIZE = 10  # max entries fetched per XREADGROUP call
BLOCK_MS = 5_000  # XREADGROUP BLOCK in milliseconds (0 would block indefinitely)
MAX_RETRIES = 3  # delivery count at which the sweeper force-ACKs a message

# =============================================================================
# ADR-038/039: safety-net settings (Wave 1)
# =============================================================================

# XCLAIM threshold: a pending message un-ACKed for this long (1 minute)
# may be reclaimed by a worker.
PENDING_IDLE_MS = 60 * 1_000
# Active Sweeper scan interval, in seconds.
SWEEP_INTERVAL_S = 30
# Maximum graceful-shutdown wait, in seconds; kept below the K8s
# terminationGracePeriodSeconds of 90.
GRACEFUL_SHUTDOWN_TIMEOUT_S = 75
# =============================================================================
# Signal Worker
# =============================================================================
class SignalWorker:
    """
    Redis Streams signal consumer.

    Responsibilities:
    1. Read signals from ``STREAM_KEY`` through the ``CONSUMER_GROUP``
       consumer group (XREADGROUP).
    2. Hand each signal to the IncidentEngine, which aggregates signals into
       incidents (Phase 6.3).
    3. Record a signal-worker observation for every incident produced.
    4. XACK processed messages; an Active Sweeper (ADR-038) reclaims messages
       left idle in the Pending Entries List.

    Usage:
        worker = SignalWorker()
        await worker.start()  # spawn the consume and sweep loops
        await worker.stop()   # graceful shutdown
    """

    def __init__(self) -> None:
        # Polled by both background loops; cleared by stop().
        self._running = False
        # XREADGROUP consume loop task.
        self._task: asyncio.Task | None = None
        # ADR-038 Active Sweeper task (XPENDING / XCLAIM).
        self._sweeper_task: asyncio.Task | None = None

    async def _ensure_consumer_group(self) -> None:
        """
        Create the consumer group if it does not exist yet.

        XGROUP CREATE fails with a BUSYGROUP error when the group already
        exists, so that error is treated as success. MKSTREAM also creates the
        stream itself when it is missing.

        Raises:
            Exception: any Redis error other than BUSYGROUP.
        """
        redis_client = get_redis()
        try:
            await redis_client.xgroup_create(
                STREAM_KEY,
                CONSUMER_GROUP,
                id="0",  # a newly created group starts from the beginning of the stream
                mkstream=True,
            )
            logger.info(
                "consumer_group_created",
                stream=STREAM_KEY,
                group=CONSUMER_GROUP,
            )
        except Exception as e:
            if "BUSYGROUP" in str(e):
                # Group already exists: nothing to do.
                logger.debug("consumer_group_exists", group=CONSUMER_GROUP)
            else:
                raise

    async def start(self) -> None:
        """
        Start the consume and sweep loops as background tasks.

        Returns as soon as the tasks are scheduled; it does not wait for them.
        Calling it on an already-running worker only logs a warning.
        """
        if self._running:
            logger.warning("signal_worker_already_running")
            return

        await self._ensure_consumer_group()
        await init_worker_redis_pool()

        self._running = True
        self._task = asyncio.create_task(self._consume_loop())
        self._sweeper_task = asyncio.create_task(self._sweep_loop())  # ADR-038

        logger.info(
            "signal_worker_started",
            stream=STREAM_KEY,
            group=CONSUMER_GROUP,
            consumer=CONSUMER_NAME,
            sweep_interval_s=SWEEP_INTERVAL_S,
        )

    async def stop(self) -> None:
        """
        Graceful shutdown (ADR-038 Wave 1 hardening).

        Cancels the sweeper, then waits up to ``GRACEFUL_SHUTDOWN_TIMEOUT_S``
        (75 s, paired with K8s terminationGracePeriodSeconds: 90) for the
        consume loop to finish its current batch before cancelling it.
        No-op when the worker is not running.
        """
        if not self._running:
            return

        logger.info(
            "signal_worker_stopping",
            timeout_s=GRACEFUL_SHUTDOWN_TIMEOUT_S,
        )
        # Both loops poll this flag and exit at their next check.
        self._running = False

        # Stop the sweeper first.
        if self._sweeper_task:
            self._sweeper_task.cancel()
            try:
                await self._sweeper_task
            except asyncio.CancelledError:
                pass
            self._sweeper_task = None

        # Let the consume loop finish its in-flight batch, bounded by the timeout.
        if self._task:
            try:
                await asyncio.wait_for(
                    self._task,
                    timeout=GRACEFUL_SHUTDOWN_TIMEOUT_S,
                )
            except TimeoutError:
                logger.warning(
                    "signal_worker_stop_timeout",
                    timeout_s=GRACEFUL_SHUTDOWN_TIMEOUT_S,
                )
                self._task.cancel()
            except asyncio.CancelledError:
                pass
            self._task = None

        logger.info("signal_worker_stopped")

    async def _consume_loop(self) -> None:
        """
        Main consume loop: XREADGROUP new messages and process them one by one.

        Successful processing XACKs each message inside ``_process_signal``.

        Rule (2026-03-23): use the worker-dedicated Redis connection (no
        timeout limit); never use the API's shared short-timeout connection
        for these blocking reads.
        """
        redis_client = get_worker_redis()  # worker-dedicated long-lived connection
        while self._running:
            try:
                # ">" = only messages never delivered to this group; the Pending
                # Entries List is handled by _sweep_loop.
                messages = await redis_client.xreadgroup(
                    groupname=CONSUMER_GROUP,
                    consumername=CONSUMER_NAME,
                    streams={STREAM_KEY: ">"},
                    count=BATCH_SIZE,
                    block=BLOCK_MS,
                )
                if not messages:
                    # BLOCK_MS elapsed without new messages.
                    continue

                # messages: [[stream_name, [(id, data), ...]], ...]
                for _stream_name, entries in messages:
                    for message_id, data in entries:
                        await self._process_signal(message_id, data)

            except asyncio.CancelledError:
                logger.info("signal_worker_cancelled")
                break
            except Exception as e:
                logger.exception("signal_worker_error", error=str(e))
                # Back off instead of spinning on a persistent error.
                await asyncio.sleep(1.0)

    async def _sweep_loop(self) -> None:
        """
        Active Sweeper: periodically reclaim idle pending messages (ADR-038 Wave 1).

        Every ``SWEEP_INTERVAL_S`` seconds:
        1. XPENDING summary - skip the round when nothing is pending.
        2. XPENDING range - inspect up to 10 pending entries.
        3. XCLAIM each entry idle for at least ``PENDING_IDLE_MS`` into this
           consumer (covers workers that died or got stuck).
        4. Re-process the claimed message, or force-ACK it once its delivery
           count has reached ``MAX_RETRIES``. A claimed entry that comes back
           without field data (its stream entry was deleted or trimmed) is
           ACKed and dropped, since there is nothing left to process.
        """
        redis_client = get_worker_redis()
        while self._running:
            try:
                await asyncio.sleep(SWEEP_INTERVAL_S)
                if not self._running:
                    break

                # 1. Pending summary.
                pending_info = await redis_client.xpending(
                    STREAM_KEY,
                    CONSUMER_GROUP,
                )
                if not pending_info or pending_info.get("pending", 0) == 0:
                    continue  # nothing pending

                pending_count = pending_info.get("pending", 0)
                logger.debug(
                    "sweep_pending_check",
                    pending_count=pending_count,
                )

                # 2. Concrete pending entries (at most 10 per round).
                pending_messages = await redis_client.xpending_range(
                    STREAM_KEY,
                    CONSUMER_GROUP,
                    min="-",
                    max="+",
                    count=10,
                )

                for msg in pending_messages:
                    message_id = msg.get("message_id")
                    idle_ms = msg.get("time_since_delivered", 0)
                    delivery_count = msg.get("times_delivered", 0)

                    # Only reclaim messages idle beyond the threshold.
                    if idle_ms < PENDING_IDLE_MS:
                        continue

                    logger.info(
                        "sweep_claiming_message",
                        message_id=message_id,
                        idle_ms=idle_ms,
                        delivery_count=delivery_count,
                    )

                    # 3. XCLAIM into this consumer (min_idle_time guards races
                    #    with other workers claiming the same entry).
                    claimed = await redis_client.xclaim(
                        STREAM_KEY,
                        CONSUMER_GROUP,
                        CONSUMER_NAME,
                        min_idle_time=PENDING_IDLE_MS,
                        message_ids=[message_id],
                    )
                    if not claimed:
                        continue

                    # 4. Handle the reclaimed message(s).
                    for claimed_id, claimed_data in claimed:
                        entry_id = claimed_id if claimed_id is not None else message_id

                        if claimed_data is None:
                            # Depending on Redis / client versions, a pending entry whose
                            # stream entry was deleted or trimmed is claimed without data.
                            # Passing None to _process_signal would raise before its error
                            # handling and abort this whole round; ACK it to clear the PEL.
                            logger.warning(
                                "sweep_ack_missing_entry",
                                message_id=entry_id,
                                delivery_count=delivery_count,
                            )
                            await redis_client.xack(
                                STREAM_KEY,
                                CONSUMER_GROUP,
                                entry_id,
                            )
                            continue

                        if delivery_count >= MAX_RETRIES:
                            # Retry budget exhausted: force-ACK and log.
                            logger.warning(
                                "sweep_force_ack_max_retries",
                                message_id=claimed_id,
                                delivery_count=delivery_count,
                            )
                            await redis_client.xack(
                                STREAM_KEY,
                                CONSUMER_GROUP,
                                claimed_id,
                            )
                        else:
                            await self._process_signal(claimed_id, claimed_data)

            except asyncio.CancelledError:
                logger.info("sweep_loop_cancelled")
                break
            except Exception as e:
                logger.exception("sweep_loop_error", error=str(e))
                await asyncio.sleep(5.0)  # back off after an error

    async def _process_signal(self, message_id: str, data: dict[str, Any]) -> None:
        """
        Process a single signal and XACK it.

        Phase 6.3 core flow, delegated to ``engine.process_signal``: per the
        original design this covers fingerprint de-duplication, aggregation
        into incidents (30-minute window + service correlation), GraphRAG
        blast-radius analysis and persistence.

        Phase 15.2: the producer's trace fields (``_trace_id`` / ``_span_id``)
        are popped from ``data`` and used to open a child span, keeping the
        API -> Redis -> Worker trace connected.

        The message is XACKed whenever processing completes without raising -
        including when the engine returns no incident. If an exception is
        raised, the message stays un-ACKed in the Pending Entries List for
        ``_sweep_loop`` to reclaim.

        Args:
            message_id: Redis stream entry ID.
            data: Stream entry fields; mutated (trace fields are removed).
        """
        redis_client = get_redis()

        # Phase 15.2: extract the trace context injected by the producer.
        # pop() keeps these transport fields out of the signal data.
        trace_context = None
        trace_id = data.pop("_trace_id", None)
        span_id = data.pop("_span_id", None)
        if trace_id:
            trace_context = {
                "trace_id": trace_id,
                "span_id": span_id or "",
            }

        # Process the signal inside the restored trace context.
        with restore_trace_context(trace_context) as span:
            try:
                # Span attributes (used for SigNoz search).
                span.set_attribute("messaging.system", "redis_streams")
                span.set_attribute("messaging.destination", STREAM_KEY)
                span.set_attribute("messaging.message_id", message_id)
                span.set_attribute("signal.source", data.get("source", "unknown"))
                span.set_attribute("signal.alert_name", data.get("alert_name", "unknown"))
                span.set_attribute("signal.severity", data.get("severity", "unknown"))

                logger.info(
                    "signal_received",
                    message_id=message_id,
                    source=data.get("source", "unknown"),
                    alert_name=data.get("alert_name", "unknown"),
                    severity=data.get("severity", "unknown"),
                    namespace=data.get("namespace", "default"),
                    target=data.get("target", "unknown"),
                    trace_restored=trace_context is not None,
                )

                # Phase 6.3: IncidentEngine aggregates related alerts into one
                # incident, runs the GraphRAG analysis and persists the result.
                engine = get_incident_engine()
                incident = await engine.process_signal(data)

                if incident:
                    # Record the incident on the span.
                    span.set_attribute("incident.id", incident.incident_id)
                    span.set_attribute("incident.severity", incident.severity.value)
                    span.set_attribute("incident.signal_count", len(incident.signals))

                    logger.info(
                        "signal_processed_by_engine",
                        message_id=message_id,
                        incident_id=incident.incident_id,
                        severity=incident.severity.value,
                        signal_count=len(incident.signals),
                        affected_services=incident.affected_services,
                        # 2026-04-01 ogt: BrainIncident has no such field (ADR-046 P2-01)
                        persisted_to_pg=getattr(incident, "persisted_to_pg", False),
                    )

                    try:
                        from src.services.signal_observation_service import (
                            record_signal_worker_observation,
                        )

                        observation = await record_signal_worker_observation(
                            incident,
                            data,
                            message_id,
                        )
                        span.set_attribute("signal.observation_recorded", True)
                        span.set_attribute(
                            "signal.observation_raw_evidence",
                            bool(observation.get("raw_evidence_created")),
                        )
                    except Exception as exc:
                        # Best effort: an observation failure does not block the ACK.
                        span.set_attribute("signal.observation_recorded", False)
                        logger.warning(
                            "signal_observation_record_failed",
                            message_id=message_id,
                            incident_id=incident.incident_id,
                            error=str(exc),
                        )
                else:
                    span.set_attribute("signal.processing_failed", True)
                    logger.warning(
                        "signal_processing_failed",
                        message_id=message_id,
                        signal_data=data,
                    )

                # ACK: the message is done (also when no incident was produced).
                await redis_client.xack(STREAM_KEY, CONSUMER_GROUP, message_id)
                span.set_attribute("messaging.acked", True)
                logger.debug("signal_acked", message_id=message_id)

            except Exception as e:
                span.set_attribute("error", True)
                span.set_attribute("error.message", str(e))
                logger.exception(
                    "signal_process_error",
                    message_id=message_id,
                    error=str(e),
                )
                # Not ACKed: the message stays in the Pending Entries List and
                # _sweep_loop reclaims it once idle for PENDING_IDLE_MS.
# =============================================================================
# Singleton
# =============================================================================
# Process-wide worker instance managed by init/close_signal_worker().
_signal_worker: SignalWorker | None = None


async def init_signal_worker() -> SignalWorker:
    """
    Create and start the process-wide Signal Worker (idempotent).

    Rule: call this from the application lifespan on startup.

    The instance is published before ``start()`` so a concurrent caller gets
    the same worker. If ``start()`` fails (e.g. Redis is unreachable while
    creating the consumer group), the singleton is reset so a later call
    retries instead of returning a never-started worker.

    Returns:
        The process-wide SignalWorker.
    """
    global _signal_worker
    if _signal_worker is not None:
        return _signal_worker

    worker = SignalWorker()
    _signal_worker = worker
    try:
        await worker.start()
    except BaseException:
        # Do not cache a worker whose loops never started.
        _signal_worker = None
        raise
    return worker


async def close_signal_worker() -> None:
    """
    Stop the process-wide Signal Worker, if any, and clear the singleton.

    Rule: call this from the application lifespan on shutdown. The singleton
    is cleared even if ``stop()`` raises.
    """
    global _signal_worker
    if _signal_worker is None:
        return
    try:
        await _signal_worker.stop()
    finally:
        _signal_worker = None


def get_signal_worker() -> SignalWorker:
    """
    Return the process-wide Signal Worker.

    Raises:
        RuntimeError: if init_signal_worker() has not been called.
    """
    if _signal_worker is None:
        raise RuntimeError(
            "Signal worker not initialized. Call init_signal_worker() first."
        )
    return _signal_worker
# =============================================================================
# Standalone Entry Point (for K8s Worker Deployment)
# =============================================================================
async def _write_health_files() -> None:
    """
    Create the marker files checked by the K8s probes.

    Touches ``/tmp/worker-healthy`` and then ``/tmp/worker-ready``
    (presumably liveness and readiness respectively - confirm against the
    Deployment manifest), then logs ``health_files_written``.
    """
    from pathlib import Path

    for marker in ("/tmp/worker-healthy", "/tmp/worker-ready"):
        Path(marker).touch()
    logger.info("health_files_written")
async def _heartbeat_loop(shutdown_event: asyncio.Event) -> None:
"""
心跳循環:定期更新健康檢查文件的時間戳
長期解決方案 (2026-03-28):
- 每 30 秒 touch 健康文件,確保 mtime 更新
- K8s liveness probe 可檢查 mtime 是否在 60 秒內
- 防止 Worker 卡住但健康文件仍存在的假陽性
@author Claude Code (首席架構師)
@version 1.0.0
@date 2026-03-28 (台北時間)
"""
from pathlib import Path
HEARTBEAT_INTERVAL = 30 # 秒
while not shutdown_event.is_set():
try:
Path("/tmp/worker-healthy").touch()
logger.debug("heartbeat_tick")
except Exception as e:
logger.warning("heartbeat_error", error=str(e))
# 等待下次心跳或收到關閉信號
try:
await asyncio.wait_for(
shutdown_event.wait(),
timeout=HEARTBEAT_INTERVAL
)
break # 收到關閉信號
except TimeoutError:
continue # 超時,繼續下次心跳
async def _main() -> None:
    """
    Standalone worker entry point.

    Run directly by the K8s Deployment:
        python -m src.workers.signal_worker

    Lifecycle:
    1. Initialise Redis (API pool + worker-dedicated pool) and PostgreSQL.
       A PostgreSQL failure is logged and tolerated (incidents won't persist).
    2. Register MCP providers / tool registry (best effort).
    3. Start the signal worker, and only then write the K8s probe files.
    4. Wait for SIGTERM/SIGINT, stop the worker, close the pools, remove the
       probe files and exit with status 0.
    """
    import signal
    import sys
    from pathlib import Path

    # Initialize settings first (loads env vars)
    from src.core.config import settings  # noqa: F401
    from src.core.redis_client import (
        close_redis_pool,
        close_worker_redis_pool,
        init_redis_pool,
        init_worker_redis_pool,
    )
    from src.db.base import close_db, init_db

    logger.info(
        "signal_worker_standalone_starting",
        environment=settings.ENVIRONMENT,
        # Log only the part after "@" so credentials are not written to logs.
        redis_url=settings.REDIS_URL.split("@")[-1] if settings.REDIS_URL else "N/A",
        database_url=settings.DATABASE_URL.split("@")[-1] if settings.DATABASE_URL else "N/A",
    )

    # Initialize Redis (API pool + worker-dedicated pool without timeout limits).
    await init_redis_pool()
    await init_worker_redis_pool()

    # Initialize PostgreSQL (episodic memory) - ensures the incidents table exists.
    try:
        await init_db()
        logger.info("postgresql_initialized", status="ok")
    except Exception as e:
        logger.error(
            "postgresql_init_failed",
            error=str(e),
            message="Episodic Memory (DB) will be unavailable - incidents won't persist",
        )

    try:
        from src.plugins.mcp.providers import register_all_providers
        from src.services.mcp_tool_registry import init_mcp_tool_registry

        register_all_providers()
        await init_mcp_tool_registry()
        logger.info("signal_worker_mcp_runtime_initialized")
    except Exception as e:
        logger.warning("signal_worker_mcp_runtime_init_failed", error=str(e))

    # Start consuming first, then advertise health/readiness, so a failed
    # startup never leaves probe files claiming the worker is up.
    worker = await init_signal_worker()
    await _write_health_files()

    # Graceful shutdown wiring.
    shutdown_event = asyncio.Event()

    def _shutdown_handler(signum: int, frame: object = None) -> None:
        logger.info("shutdown_signal_received", signal=signum)
        shutdown_event.set()

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            # Runs the callback inside the event loop and wakes the selector
            # immediately; a plain signal.signal() handler does not.
            loop.add_signal_handler(sig, _shutdown_handler, int(sig))
        except (NotImplementedError, RuntimeError):
            # Event loops without add_signal_handler support (e.g. Windows).
            signal.signal(sig, _shutdown_handler)

    # Heartbeat keeps the liveness file's mtime fresh.
    heartbeat_task = asyncio.create_task(_heartbeat_loop(shutdown_event))

    await shutdown_event.wait()

    # Stop the heartbeat.
    heartbeat_task.cancel()
    try:
        await heartbeat_task
    except asyncio.CancelledError:
        pass

    logger.info("signal_worker_shutting_down")
    try:
        await worker.stop()
        await close_worker_redis_pool()  # worker-dedicated connections
        await close_redis_pool()
        await close_db()  # PostgreSQL pool
    finally:
        # Always remove the probe files, even if a shutdown step failed.
        Path("/tmp/worker-healthy").unlink(missing_ok=True)
        Path("/tmp/worker-ready").unlink(missing_ok=True)

    logger.info("signal_worker_shutdown_complete")
    sys.exit(0)


if __name__ == "__main__":
    asyncio.run(_main())