diff --git a/apps/api/src/db/base.py b/apps/api/src/db/base.py index 108aeab0..9bfbbe88 100644 --- a/apps/api/src/db/base.py +++ b/apps/api/src/db/base.py @@ -157,6 +157,9 @@ async def get_db_context(project_id: str | None = None) -> AsyncGenerator[AsyncS # Initialization # ============================================================================= +_DB_BOOTSTRAP_LOCK_NAME = "awoooi:init_db:ddl" + + async def init_db() -> None: """ Initialize database tables @@ -165,6 +168,28 @@ async def init_db() -> None: """ engine = get_engine() + async with engine.connect() as lock_conn: + # 2026-05-24 ogt + Codex: when two API replicas start at the same time, PostgreSQL + # makes them wait on each other in ALTER TABLE ... IF NOT EXISTS and deadlocks. The whole + # bootstrap DDL must be serialized so one CrashLooping pod does not leave a rollout 1/2 ready. + await lock_conn.execute( + text("SELECT pg_advisory_lock(hashtext(:lock_name))"), + {"lock_name": _DB_BOOTSTRAP_LOCK_NAME}, + ) + try: + await _run_init_db_ddl(engine) + finally: + await lock_conn.execute( + text("SELECT pg_advisory_unlock(hashtext(:lock_name))"), + {"lock_name": _DB_BOOTSTRAP_LOCK_NAME}, + ) + + +async def _run_init_db_ddl(engine: AsyncEngine) -> None: + """ + Run idempotent DB bootstrap DDL while caller holds the bootstrap advisory lock. 
+ """ + # 2026-04-15 ogt: 多 replica 並行啟動競爭修復 # 問題:單一大 transaction 裡兩個 pod 同時建 table → 其中一個 CREATE INDEX 失敗 # PostgreSQL 中 transaction 內任何錯誤導致整個 transaction ROLLBACK diff --git a/apps/api/src/main.py b/apps/api/src/main.py index a47687df..ca562d58 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -31,6 +31,7 @@ from fastapi.responses import JSONResponse, Response from prometheus_client import CONTENT_TYPE_LATEST, generate_latest from sentry_sdk.integrations.fastapi import FastApiIntegration from sentry_sdk.integrations.starlette import StarletteIntegration +from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware from src.api.v1 import agents as agents_v1 # Phase 9.5: Agent Teams API from src.api.v1 import ai as ai_v1 @@ -84,7 +85,11 @@ from src.api.v1 import webhooks as webhooks_v1 from src.core.config import settings from src.core.http_client import close_all_http_clients, init_all_http_clients from src.core.logging import get_logger, setup_logging -from src.core.redis_client import close_redis_pool, init_redis_pool +from src.core.redis_client import ( + close_redis_pool, + close_worker_redis_pool, + init_redis_pool, +) from src.core.sse import get_publisher from src.core.telemetry import setup_telemetry, shutdown_telemetry @@ -783,6 +788,7 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: # Phase 6.1: 關閉 Signal Worker (先關閉 Consumer) await close_signal_worker() + await close_worker_redis_pool() await publisher.stop() await close_executor() await close_openclaw() @@ -835,12 +841,8 @@ else: # Middleware # ============================================================================= -# 2026-04-03 ogt: Nginx 反向代理修正 — 讓 FastAPI 信任 X-Forwarded-Proto -# 解決問題: /api/v1/knowledge (無結尾斜線) 307 redirect 產生 http:// Location -# 原因: FastAPI 不知道自己在 HTTPS 後面,redirect 回 http:// -# 效果: 有了此中間件,307 Location 會是 https:// -from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware - +# 2026-04-03 ogt: Nginx reverse-proxy fix — make FastAPI trust 
X-Forwarded-Proto. +# Keeps redirects such as /api/v1/knowledge from producing an http:// Location behind the HTTPS reverse proxy. app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*") # CORS - Strict Whitelist (Iron Law #2) diff --git a/apps/api/src/workers/signal_worker.py b/apps/api/src/workers/signal_worker.py index afb96f45..ad3136cc 100644 --- a/apps/api/src/workers/signal_worker.py +++ b/apps/api/src/workers/signal_worker.py @@ -33,7 +33,7 @@ from typing import Any import structlog -from src.core.redis_client import get_redis, get_worker_redis +from src.core.redis_client import get_redis, get_worker_redis, init_worker_redis_pool from src.core.telemetry import restore_trace_context from src.services.incident_engine import get_incident_engine @@ -131,6 +131,7 @@ class SignalWorker: return await self._ensure_consumer_group() + await init_worker_redis_pool() self._running = True self._task = asyncio.create_task(self._consume_loop()) diff --git a/apps/api/tests/test_runtime_bootstrap_guards.py b/apps/api/tests/test_runtime_bootstrap_guards.py new file mode 100644 index 00000000..9e662c4a --- /dev/null +++ b/apps/api/tests/test_runtime_bootstrap_guards.py @@ -0,0 +1,119 @@ +""" +Runtime bootstrap guard tests. 
+ +These tests pin down two startup-ordering problems that production rollouts have hit: +- API replicas that run the DB bootstrap DDL concurrently must hold an advisory lock. +- SignalWorker must initialize the worker Redis pool before creating its Redis Stream background tasks. +""" + +from __future__ import annotations + +from collections.abc import Awaitable +from typing import Any + +import pytest + + +class _FakeLockConnection: + def __init__(self) -> None: + self.statements: list[str] = [] + + async def __aenter__(self) -> _FakeLockConnection: + return self + + async def __aexit__(self, *_exc: object) -> None: + return None + + async def execute(self, statement: object, params: dict[str, str] | None = None) -> None: + self.statements.append(str(statement)) + assert params == {"lock_name": "awoooi:init_db:ddl"} + + +class _FakeEngine: + def __init__(self) -> None: + self.lock_conn = _FakeLockConnection() + + def connect(self) -> _FakeLockConnection: + return self.lock_conn + + +@pytest.mark.asyncio +async def test_init_db_serializes_bootstrap_ddl_with_advisory_lock(monkeypatch): + from src.db import base as db_base + + fake_engine = _FakeEngine() + calls: list[object] = [] + + async def fake_run_init_db_ddl(engine: object) -> None: + calls.append(engine) + + monkeypatch.setattr(db_base, "get_engine", lambda: fake_engine) + monkeypatch.setattr(db_base, "_run_init_db_ddl", fake_run_init_db_ddl) + + await db_base.init_db() + + assert calls == [fake_engine] + assert "pg_advisory_lock" in fake_engine.lock_conn.statements[0] + assert "pg_advisory_unlock" in fake_engine.lock_conn.statements[-1] + + +@pytest.mark.asyncio +async def test_init_db_releases_bootstrap_lock_when_ddl_fails(monkeypatch): + from src.db import base as db_base + + fake_engine = _FakeEngine() + + async def fake_run_init_db_ddl(_engine: object) -> None: + raise RuntimeError("ddl failed") + + monkeypatch.setattr(db_base, "get_engine", lambda: fake_engine) + monkeypatch.setattr(db_base, "_run_init_db_ddl", fake_run_init_db_ddl) + + with pytest.raises(RuntimeError, match="ddl failed"): + await 
db_base.init_db() + + assert "pg_advisory_lock" in fake_engine.lock_conn.statements[0] + assert "pg_advisory_unlock" in fake_engine.lock_conn.statements[-1] + + +@pytest.mark.asyncio +async def test_signal_worker_initializes_worker_redis_pool_before_tasks(monkeypatch): + from src.workers import signal_worker + + events: list[str] = [] + worker = signal_worker.SignalWorker() + + async def fake_ensure_consumer_group() -> None: + events.append("consumer_group") + + async def fake_init_worker_redis_pool() -> None: + events.append("worker_redis_pool") + + def fake_create_task(coro: Awaitable[Any]) -> object: + events.append("task") + coro.close() + + class _FakeTask: + def cancel(self) -> None: + return None + + return _FakeTask() + + monkeypatch.setattr(worker, "_ensure_consumer_group", fake_ensure_consumer_group) + monkeypatch.setattr(signal_worker, "init_worker_redis_pool", fake_init_worker_redis_pool) + monkeypatch.setattr(signal_worker.asyncio, "create_task", fake_create_task) + + await worker.start() + + assert events == ["consumer_group", "worker_redis_pool", "task", "task"] + + +def test_api_lifespan_closes_worker_redis_pool_after_signal_worker() -> None: + import inspect + + from src import main as api_main + + source = inspect.getsource(api_main.lifespan) + assert "close_worker_redis_pool" in source + assert source.index("close_signal_worker") < source.index("close_worker_redis_pool") + assert source.index("close_worker_redis_pool") < source.index("close_redis_pool")