fix(api): serialize startup bootstrap ddl
All checks were successful
CD Pipeline / tests (push) Successful in 5m29s
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / build-and-deploy (push) Successful in 4m9s
CD Pipeline / post-deploy-checks (push) Successful in 1m57s

This commit is contained in:
Your Name
2026-05-24 17:10:26 +08:00
parent 5b8f14e32e
commit 9b01f1fa46
4 changed files with 155 additions and 8 deletions

View File

@@ -157,6 +157,9 @@ async def get_db_context(project_id: str | None = None) -> AsyncGenerator[AsyncS
# Initialization
# =============================================================================
_DB_BOOTSTRAP_LOCK_NAME = "awoooi:init_db:ddl"
async def init_db() -> None:
"""
Initialize database tables
@@ -165,6 +168,28 @@ async def init_db() -> None:
"""
engine = get_engine()
async with engine.connect() as lock_conn:
# 2026-05-24 ogt + Codex: 兩個 API replica 同時啟動時PostgreSQL 會在
# ALTER TABLE ... IF NOT EXISTS 上互相等待並 deadlock。整段 bootstrap
# DDL 必須序列化,避免 rollout 因一個 pod CrashLoop 變成 1/2 ready。
await lock_conn.execute(
text("SELECT pg_advisory_lock(hashtext(:lock_name))"),
{"lock_name": _DB_BOOTSTRAP_LOCK_NAME},
)
try:
await _run_init_db_ddl(engine)
finally:
await lock_conn.execute(
text("SELECT pg_advisory_unlock(hashtext(:lock_name))"),
{"lock_name": _DB_BOOTSTRAP_LOCK_NAME},
)
async def _run_init_db_ddl(engine: AsyncEngine) -> None:
"""
Run idempotent DB bootstrap DDL while caller holds the bootstrap advisory lock.
"""
# 2026-04-15 ogt: 多 replica 並行啟動競爭修復
# 問題:單一大 transaction 裡兩個 pod 同時建 table → 其中一個 CREATE INDEX 失敗
# PostgreSQL 中 transaction 內任何錯誤導致整個 transaction ROLLBACK

View File

@@ -31,6 +31,7 @@ from fastapi.responses import JSONResponse, Response
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.starlette import StarletteIntegration
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
from src.api.v1 import agents as agents_v1 # Phase 9.5: Agent Teams API
from src.api.v1 import ai as ai_v1
@@ -84,7 +85,11 @@ from src.api.v1 import webhooks as webhooks_v1
from src.core.config import settings
from src.core.http_client import close_all_http_clients, init_all_http_clients
from src.core.logging import get_logger, setup_logging
from src.core.redis_client import close_redis_pool, init_redis_pool
from src.core.redis_client import (
close_redis_pool,
close_worker_redis_pool,
init_redis_pool,
)
from src.core.sse import get_publisher
from src.core.telemetry import setup_telemetry, shutdown_telemetry
@@ -783,6 +788,7 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
# Phase 6.1: 關閉 Signal Worker (先關閉 Consumer)
await close_signal_worker()
await close_worker_redis_pool()
await publisher.stop()
await close_executor()
await close_openclaw()
@@ -835,12 +841,8 @@ else:
# Middleware
# =============================================================================
# 2026-04-03 ogt: Nginx 反向代理修正 — 讓 FastAPI 信任 X-Forwarded-Proto
# 解決問題: /api/v1/knowledge (無結尾斜線) 307 redirect 產生 http:// Location
# 原因: FastAPI 不知道自己在 HTTPS 後面redirect 回 http://
# 效果: 有了此中間件307 Location 會是 https://
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
# 2026-04-03 ogt: Nginx 反向代理修正 — 讓 FastAPI 信任 X-Forwarded-Proto
# 避免 /api/v1/knowledge 等 redirect 在 HTTPS 反向代理後產生 http:// Location
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*")
# CORS - Strict Whitelist (Iron Law #2)

View File

@@ -33,7 +33,7 @@ from typing import Any
import structlog
from src.core.redis_client import get_redis, get_worker_redis
from src.core.redis_client import get_redis, get_worker_redis, init_worker_redis_pool
from src.core.telemetry import restore_trace_context
from src.services.incident_engine import get_incident_engine
@@ -131,6 +131,7 @@ class SignalWorker:
return
await self._ensure_consumer_group()
await init_worker_redis_pool()
self._running = True
self._task = asyncio.create_task(self._consume_loop())

View File

@@ -0,0 +1,119 @@
"""
Runtime bootstrap guard tests.
這組測試鎖住 production rollout 曾踩到的兩個啟動序問題:
- API replicas 同時執行 DB bootstrap DDL 時必須有 advisory lock。
- SignalWorker 建立 Redis Stream 背景 task 前,必須先初始化 worker Redis pool。
"""
from __future__ import annotations
from collections.abc import Awaitable
from typing import Any
import pytest
class _FakeLockConnection:
def __init__(self) -> None:
self.statements: list[str] = []
async def __aenter__(self) -> _FakeLockConnection:
return self
async def __aexit__(self, *_exc: object) -> None:
return None
async def execute(self, statement: object, params: dict[str, str] | None = None) -> None:
self.statements.append(str(statement))
assert params == {"lock_name": "awoooi:init_db:ddl"}
class _FakeEngine:
def __init__(self) -> None:
self.lock_conn = _FakeLockConnection()
def connect(self) -> _FakeLockConnection:
return self.lock_conn
@pytest.mark.asyncio
async def test_init_db_serializes_bootstrap_ddl_with_advisory_lock(monkeypatch):
from src.db import base as db_base
fake_engine = _FakeEngine()
calls: list[object] = []
async def fake_run_init_db_ddl(engine: object) -> None:
calls.append(engine)
monkeypatch.setattr(db_base, "get_engine", lambda: fake_engine)
monkeypatch.setattr(db_base, "_run_init_db_ddl", fake_run_init_db_ddl)
await db_base.init_db()
assert calls == [fake_engine]
assert "pg_advisory_lock" in fake_engine.lock_conn.statements[0]
assert "pg_advisory_unlock" in fake_engine.lock_conn.statements[-1]
@pytest.mark.asyncio
async def test_init_db_releases_bootstrap_lock_when_ddl_fails(monkeypatch):
from src.db import base as db_base
fake_engine = _FakeEngine()
async def fake_run_init_db_ddl(_engine: object) -> None:
raise RuntimeError("ddl failed")
monkeypatch.setattr(db_base, "get_engine", lambda: fake_engine)
monkeypatch.setattr(db_base, "_run_init_db_ddl", fake_run_init_db_ddl)
with pytest.raises(RuntimeError, match="ddl failed"):
await db_base.init_db()
assert "pg_advisory_lock" in fake_engine.lock_conn.statements[0]
assert "pg_advisory_unlock" in fake_engine.lock_conn.statements[-1]
@pytest.mark.asyncio
async def test_signal_worker_initializes_worker_redis_pool_before_tasks(monkeypatch):
from src.workers import signal_worker
events: list[str] = []
worker = signal_worker.SignalWorker()
async def fake_ensure_consumer_group() -> None:
events.append("consumer_group")
async def fake_init_worker_redis_pool() -> None:
events.append("worker_redis_pool")
def fake_create_task(coro: Awaitable[Any]) -> object:
events.append("task")
coro.close()
class _FakeTask:
def cancel(self) -> None:
return None
return _FakeTask()
monkeypatch.setattr(worker, "_ensure_consumer_group", fake_ensure_consumer_group)
monkeypatch.setattr(signal_worker, "init_worker_redis_pool", fake_init_worker_redis_pool)
monkeypatch.setattr(signal_worker.asyncio, "create_task", fake_create_task)
await worker.start()
assert events == ["consumer_group", "worker_redis_pool", "task", "task"]
def test_api_lifespan_closes_worker_redis_pool_after_signal_worker() -> None:
import inspect
from src import main as api_main
source = inspect.getsource(api_main.lifespan)
assert "close_worker_redis_pool" in source
assert source.index("close_signal_worker") < source.index("close_worker_redis_pool")
assert source.index("close_worker_redis_pool") < source.index("close_redis_pool")