fix(api): serialize startup bootstrap ddl
This commit is contained in:
119
apps/api/tests/test_runtime_bootstrap_guards.py
Normal file
119
apps/api/tests/test_runtime_bootstrap_guards.py
Normal file
@@ -0,0 +1,119 @@
|
||||
"""
|
||||
Runtime bootstrap guard tests.
|
||||
|
||||
這組測試鎖住 production rollout 曾踩到的兩個啟動序問題:
|
||||
- API replicas 同時執行 DB bootstrap DDL 時必須有 advisory lock。
|
||||
- SignalWorker 建立 Redis Stream 背景 task 前,必須先初始化 worker Redis pool。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Awaitable
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class _FakeLockConnection:
|
||||
def __init__(self) -> None:
|
||||
self.statements: list[str] = []
|
||||
|
||||
async def __aenter__(self) -> _FakeLockConnection:
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *_exc: object) -> None:
|
||||
return None
|
||||
|
||||
async def execute(self, statement: object, params: dict[str, str] | None = None) -> None:
|
||||
self.statements.append(str(statement))
|
||||
assert params == {"lock_name": "awoooi:init_db:ddl"}
|
||||
|
||||
|
||||
class _FakeEngine:
|
||||
def __init__(self) -> None:
|
||||
self.lock_conn = _FakeLockConnection()
|
||||
|
||||
def connect(self) -> _FakeLockConnection:
|
||||
return self.lock_conn
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_init_db_serializes_bootstrap_ddl_with_advisory_lock(monkeypatch):
|
||||
from src.db import base as db_base
|
||||
|
||||
fake_engine = _FakeEngine()
|
||||
calls: list[object] = []
|
||||
|
||||
async def fake_run_init_db_ddl(engine: object) -> None:
|
||||
calls.append(engine)
|
||||
|
||||
monkeypatch.setattr(db_base, "get_engine", lambda: fake_engine)
|
||||
monkeypatch.setattr(db_base, "_run_init_db_ddl", fake_run_init_db_ddl)
|
||||
|
||||
await db_base.init_db()
|
||||
|
||||
assert calls == [fake_engine]
|
||||
assert "pg_advisory_lock" in fake_engine.lock_conn.statements[0]
|
||||
assert "pg_advisory_unlock" in fake_engine.lock_conn.statements[-1]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_init_db_releases_bootstrap_lock_when_ddl_fails(monkeypatch):
|
||||
from src.db import base as db_base
|
||||
|
||||
fake_engine = _FakeEngine()
|
||||
|
||||
async def fake_run_init_db_ddl(_engine: object) -> None:
|
||||
raise RuntimeError("ddl failed")
|
||||
|
||||
monkeypatch.setattr(db_base, "get_engine", lambda: fake_engine)
|
||||
monkeypatch.setattr(db_base, "_run_init_db_ddl", fake_run_init_db_ddl)
|
||||
|
||||
with pytest.raises(RuntimeError, match="ddl failed"):
|
||||
await db_base.init_db()
|
||||
|
||||
assert "pg_advisory_lock" in fake_engine.lock_conn.statements[0]
|
||||
assert "pg_advisory_unlock" in fake_engine.lock_conn.statements[-1]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_signal_worker_initializes_worker_redis_pool_before_tasks(monkeypatch):
|
||||
from src.workers import signal_worker
|
||||
|
||||
events: list[str] = []
|
||||
worker = signal_worker.SignalWorker()
|
||||
|
||||
async def fake_ensure_consumer_group() -> None:
|
||||
events.append("consumer_group")
|
||||
|
||||
async def fake_init_worker_redis_pool() -> None:
|
||||
events.append("worker_redis_pool")
|
||||
|
||||
def fake_create_task(coro: Awaitable[Any]) -> object:
|
||||
events.append("task")
|
||||
coro.close()
|
||||
|
||||
class _FakeTask:
|
||||
def cancel(self) -> None:
|
||||
return None
|
||||
|
||||
return _FakeTask()
|
||||
|
||||
monkeypatch.setattr(worker, "_ensure_consumer_group", fake_ensure_consumer_group)
|
||||
monkeypatch.setattr(signal_worker, "init_worker_redis_pool", fake_init_worker_redis_pool)
|
||||
monkeypatch.setattr(signal_worker.asyncio, "create_task", fake_create_task)
|
||||
|
||||
await worker.start()
|
||||
|
||||
assert events == ["consumer_group", "worker_redis_pool", "task", "task"]
|
||||
|
||||
|
||||
def test_api_lifespan_closes_worker_redis_pool_after_signal_worker() -> None:
|
||||
import inspect
|
||||
|
||||
from src import main as api_main
|
||||
|
||||
source = inspect.getsource(api_main.lifespan)
|
||||
assert "close_worker_redis_pool" in source
|
||||
assert source.index("close_signal_worker") < source.index("close_worker_redis_pool")
|
||||
assert source.index("close_worker_redis_pool") < source.index("close_redis_pool")
|
||||
Reference in New Issue
Block a user