feat(worker): Wave 1 Signal Worker XCLAIM + Graceful Shutdown

ADR-038/039 Wave 1 強化:
- 新增 Active Sweeper: XPENDING + XCLAIM 回收閒置訊息
- PENDING_IDLE_MS: 60秒無ACK則可被回收
- SWEEP_INTERVAL_S: 每30秒掃描一次
- Graceful Shutdown: 75秒超時 (搭配 K8s 90秒)
- 超過 MAX_RETRIES 的訊息強制 ACK

K8s Worker Deployment:
- 新增 terminationGracePeriodSeconds: 90

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-29 15:53:05 +08:00
parent bf06737eed
commit 39396dc57a
2 changed files with 150 additions and 9 deletions

View File

@@ -56,6 +56,16 @@ BLOCK_MS = 5000
# 失敗重試上限
MAX_RETRIES = 3
# =============================================================================
# ADR-038/039: 安全網配置 (Wave 1)
# =============================================================================
# XCLAIM: 閒置訊息回收閾值(毫秒)
PENDING_IDLE_MS = 60_000 # 1 分鐘無 ACK 則可被其他 Worker 回收
# Active Sweeper: 掃描間隔(秒)
SWEEP_INTERVAL_S = 30
# Graceful Shutdown: 最大等待時間(秒)
GRACEFUL_SHUTDOWN_TIMEOUT_S = 75 # K8s terminationGracePeriodSeconds: 90
# =============================================================================
# Signal Worker
@@ -80,6 +90,7 @@ class SignalWorker:
def __init__(self) -> None:
self._running = False
self._task: asyncio.Task | None = None
self._sweeper_task: asyncio.Task | None = None # ADR-038: Active Sweeper
async def _ensure_consumer_group(self) -> None:
"""
@@ -123,30 +134,51 @@ class SignalWorker:
self._running = True
self._task = asyncio.create_task(self._consume_loop())
self._sweeper_task = asyncio.create_task(self._sweep_loop()) # ADR-038
logger.info(
"signal_worker_started",
stream=STREAM_KEY,
group=CONSUMER_GROUP,
consumer=CONSUMER_NAME,
sweep_interval_s=SWEEP_INTERVAL_S,
)
async def stop(self) -> None:
"""
優雅關閉
優雅關閉 (ADR-038 Wave 1 強化)
等待當前處理完成後停止。
超時時間: 75 秒(搭配 K8s terminationGracePeriodSeconds: 90
"""
if not self._running:
return
logger.info(
"signal_worker_stopping",
timeout_s=GRACEFUL_SHUTDOWN_TIMEOUT_S,
)
self._running = False
# 停止 Sweeper
if self._sweeper_task:
self._sweeper_task.cancel()
try:
await self._sweeper_task
except asyncio.CancelledError:
pass
# 等待主消費循環完成
if self._task:
try:
# 給予 5 秒完成當前處理
await asyncio.wait_for(self._task, timeout=5.0)
await asyncio.wait_for(
self._task,
timeout=GRACEFUL_SHUTDOWN_TIMEOUT_S,
)
except TimeoutError:
logger.warning("signal_worker_stop_timeout")
logger.warning(
"signal_worker_stop_timeout",
timeout_s=GRACEFUL_SHUTDOWN_TIMEOUT_S,
)
self._task.cancel()
except asyncio.CancelledError:
pass
@@ -194,6 +226,105 @@ class SignalWorker:
# 避免無限快速重試
await asyncio.sleep(1.0)
async def _sweep_loop(self) -> None:
"""
Active Sweeper: 定期掃描並回收閒置的 Pending 訊息
ADR-038 Wave 1: 使用 XCLAIM 回收其他 Worker 死亡或卡住的訊息
流程:
1. XPENDING 取得 Pending List 摘要
2. XPENDING 取得具體閒置訊息 (idle > PENDING_IDLE_MS)
3. XCLAIM 回收訊息到本 Consumer
4. 重新處理或強制 ACK (超過 MAX_RETRIES)
"""
redis_client = get_worker_redis()
while self._running:
try:
# 等待下次掃描
await asyncio.sleep(SWEEP_INTERVAL_S)
if not self._running:
break
# 1. 取得 Pending 摘要
pending_info = await redis_client.xpending(
STREAM_KEY,
CONSUMER_GROUP,
)
if not pending_info or pending_info.get("pending", 0) == 0:
continue # 沒有 Pending 訊息
pending_count = pending_info.get("pending", 0)
logger.debug(
"sweep_pending_check",
pending_count=pending_count,
)
# 2. 取得具體的 Pending 訊息(最多 10 條)
pending_messages = await redis_client.xpending_range(
STREAM_KEY,
CONSUMER_GROUP,
min="-",
max="+",
count=10,
)
for msg in pending_messages:
message_id = msg.get("message_id")
idle_ms = msg.get("time_since_delivered", 0)
delivery_count = msg.get("times_delivered", 0)
# 只處理閒置超過閾值的訊息
if idle_ms < PENDING_IDLE_MS:
continue
logger.info(
"sweep_claiming_message",
message_id=message_id,
idle_ms=idle_ms,
delivery_count=delivery_count,
)
# 3. XCLAIM 回收訊息
claimed = await redis_client.xclaim(
STREAM_KEY,
CONSUMER_GROUP,
CONSUMER_NAME,
min_idle_time=PENDING_IDLE_MS,
message_ids=[message_id],
)
if not claimed:
continue
# 4. 處理回收的訊息
for claimed_id, claimed_data in claimed:
if delivery_count >= MAX_RETRIES:
# 超過重試上限,強制 ACK 並記錄
logger.warning(
"sweep_force_ack_max_retries",
message_id=claimed_id,
delivery_count=delivery_count,
)
await redis_client.xack(
STREAM_KEY,
CONSUMER_GROUP,
claimed_id,
)
else:
# 重新處理
await self._process_signal(claimed_id, claimed_data)
except asyncio.CancelledError:
logger.info("sweep_loop_cancelled")
break
except Exception as e:
logger.exception("sweep_loop_error", error=str(e))
await asyncio.sleep(5.0) # 錯誤後等待
async def _process_signal(self, message_id: str, data: dict[str, Any]) -> None:
"""
處理單一訊號

View File

@@ -1,8 +1,10 @@
# AWOOOI Signal Worker Deployment
# 負責人: CTO
# 版本: v1.1
# 版本: v1.3
# 日期: 2026-03-22
# 更新: 2026-03-28 - 新增 startupProbe + revisionHistoryLimit:3 (Phase K0.5/K0.7)
# 更新: 2026-03-28 v1.1 - 新增 startupProbe + revisionHistoryLimit:3 (Phase K0.5/K0.7)
# 更新: 2026-03-28 v1.2 - liveness 改為 mtime 檢查 (心跳機制長期方案)
# 更新: 2026-03-29 v1.3 - ADR-038/039 terminationGracePeriodSeconds:90 (Graceful Shutdown)
#
# Phase 6.5: Redis Streams 消費者
# 職責: 消費 awoooi:signals 串流,觸發 Incident Engine
@@ -39,6 +41,8 @@ spec:
environment: prod
component: signal-processor
spec:
# ADR-038/039: Graceful Shutdown (Worker 需要 75 秒完成清理)
terminationGracePeriodSeconds: 90
containers:
- name: worker
# 映像標籤由 CI/CD 動態注入 (格式: {sha}-{run_id})
@@ -68,12 +72,18 @@ spec:
limits:
cpu: "500m"
memory: "512Mi"
# Worker 健康檢查 (檔案探針)
# Worker 健康檢查 (mtime 時間戳檢查 - 長期解決方案)
# 心跳機制Worker 每 30 秒 touch 健康文件
# 探針檢查:文件 mtime 必須在 60 秒內
livenessProbe:
exec:
command:
- cat
- /tmp/worker-healthy
- /bin/sh
- -c
- |
[ -f /tmp/worker-healthy ] && \
[ "$(find /tmp/worker-healthy -mmin -1 2>/dev/null)" ] && exit 0
exit 1
initialDelaySeconds: 30
periodSeconds: 15
timeoutSeconds: 5