From 4561c65fe90c287b7b69b6bfa7cb08da54007af4 Mon Sep 17 00:00:00 2001
From: ogt
Date: Wed, 1 Jul 2026 17:00:53 +0800
Subject: [PATCH] fix(api): cap db pool during prod rollout

---
 apps/api/src/core/config.py                  |  18 +++
 apps/api/src/db/base.py                      | 110 ++++++++++++------
 apps/api/tests/test_config_url_validation.py |   8 ++
 .../tests/test_runtime_bootstrap_guards.py   |  60 +++++++++-
 k8s/awoooi-prod/04-configmap.yaml            |   6 +
 k8s/awoooi-prod/08-deployment-worker.yaml    |   7 +-
 6 files changed, 169 insertions(+), 40 deletions(-)

diff --git a/apps/api/src/core/config.py b/apps/api/src/core/config.py
index 1b47c9f3..2f008d45 100644
--- a/apps/api/src/core/config.py
+++ b/apps/api/src/core/config.py
@@ -273,6 +273,24 @@ class Settings(BaseSettings):
     DATABASE_URL: str = Field(
         description="PostgreSQL connection URL (必填,從 K8s Secret awoooi-secrets → DATABASE_URL 取得)",
     )
+    DATABASE_POOL_SIZE: int = Field(
+        default=1,
+        ge=1,
+        le=10,
+        description="SQLAlchemy asyncpg pool size. Production DB role currently has a very small connection budget.",
+    )
+    DATABASE_MAX_OVERFLOW: int = Field(
+        default=0,
+        ge=0,
+        le=20,
+        description="Extra asyncpg connections above DATABASE_POOL_SIZE. Keep 0 in production until DB role limit is raised.",
+    )
+    DATABASE_POOL_TIMEOUT_SECONDS: float = Field(
+        default=5.0,
+        ge=1.0,
+        le=30.0,
+        description="Seconds to wait for a pooled DB connection before failing visible.",
+    )
 
     # ==========================================================================
     # Redis (192.168.0.188:6380, DB 0 - 與 OpenClaw 共用)
diff --git a/apps/api/src/db/base.py b/apps/api/src/db/base.py
index 85b7f38a..9e63f017 100644
--- a/apps/api/src/db/base.py
+++ b/apps/api/src/db/base.py
@@ -19,7 +19,9 @@ from contextlib import asynccontextmanager
 
 from fastapi import HTTPException
 from sqlalchemy import text
+from sqlalchemy.exc import DBAPIError, TimeoutError as SQLAlchemyTimeoutError
 from sqlalchemy.ext.asyncio import (
+    AsyncConnection,
     AsyncEngine,
     AsyncSession,
     async_sessionmaker,
@@ -87,8 +89,9 @@ def get_engine() -> AsyncEngine:
         _engine = create_async_engine(
             database_url,
             echo=settings.DEBUG,
-            pool_size=10,
-            max_overflow=20,
+            pool_size=settings.DATABASE_POOL_SIZE,
+            max_overflow=settings.DATABASE_MAX_OVERFLOW,
+            pool_timeout=settings.DATABASE_POOL_TIMEOUT_SECONDS,
             pool_pre_ping=True,
         )
     return _engine
@@ -189,6 +192,30 @@ _DB_BOOTSTRAP_LOCK_POLL_SECONDS = 2.0
 _DB_BOOTSTRAP_DDL_WAIT_SECONDS = 120.0
 
 
+def _is_database_connection_budget_error(exc: BaseException) -> bool:
+    """Return True for PostgreSQL role/pool exhaustion without hiding other DB errors."""
+    seen: set[int] = set()
+    current: BaseException | None = exc
+    budget_markers = (
+        "too many connections",
+        "remaining connection slots are reserved",
+        "connection limit",
+        "queuepool limit",
+    )
+
+    while current is not None and id(current) not in seen:
+        seen.add(id(current))
+        message = f"{type(current).__name__}: {current}".lower()
+        if any(marker in message for marker in budget_markers):
+            return True
+        current = (
+            getattr(current, "orig", None)
+            or getattr(current, "__cause__", None)
+            or getattr(current, "__context__", None)
+        )
+    return False
+
+
 async def init_db() -> None:
     """
     Initialize database tables
@@ -197,35 +224,50 @@ async def init_db() -> None:
     """
     engine = get_engine()
 
-    async with engine.connect() as lock_conn:
-        # 2026-05-24 ogt + Codex: 兩個 API replica 同時啟動時,PostgreSQL 會在
-        # ALTER TABLE ... IF NOT EXISTS 上互相等待並 deadlock。整段 bootstrap
-        # DDL 必須序列化。
-        # 2026-06-30 Codex: production rollout 觀察到新 API pod 卡在 init_db()
-        # 超過 startup probe 預算。bootstrap DDL 是冪等補欄流程,不能再用
-        # 無界 pg_advisory_lock 把新版 API route 擋成永久舊版;改為有界等待,
-        # 逾時則 fail-visible skip,讓 serving path 先恢復,再由 verifier 回報。
-        lock_acquired = await _try_acquire_bootstrap_lock(lock_conn)
-        if not lock_acquired:
-            return
+    try:
+        async with engine.connect() as lock_conn:
+            # 2026-05-24 ogt + Codex: 兩個 API replica 同時啟動時,PostgreSQL 會在
+            # ALTER TABLE ... IF NOT EXISTS 上互相等待並 deadlock。整段 bootstrap
+            # DDL 必須序列化。
+            # 2026-06-30 Codex: production rollout 觀察到新 API pod 卡在 init_db()
+            # 超過 startup probe 預算。bootstrap DDL 是冪等補欄流程,不能再用
+            # 無界 pg_advisory_lock 把新版 API route 擋成永久舊版;改為有界等待,
+            # 逾時則 fail-visible skip,讓 serving path 先恢復,再由 verifier 回報。
+            # 2026-07-01 Codex: production DB role has CONNECTION LIMIT 2. Keep
+            # bootstrap on the lock connection so pool_size=1 can still start.
+            lock_acquired = await _try_acquire_bootstrap_lock(lock_conn)
+            if not lock_acquired:
+                return
 
-        try:
             try:
-                await asyncio.wait_for(
-                    _run_init_db_ddl(engine),
-                    timeout=_DB_BOOTSTRAP_DDL_WAIT_SECONDS,
+                try:
+                    await asyncio.wait_for(
+                        _run_init_db_ddl(lock_conn),
+                        timeout=_DB_BOOTSTRAP_DDL_WAIT_SECONDS,
+                    )
+                except TimeoutError:
+                    logger.warning(
+                        "database_bootstrap_ddl_timeout_skipped",
+                        timeout_seconds=_DB_BOOTSTRAP_DDL_WAIT_SECONDS,
+                        lock_name=_DB_BOOTSTRAP_LOCK_NAME,
+                    )
+            finally:
+                await lock_conn.execute(
+                    text("SELECT pg_advisory_unlock(hashtext(:lock_name))"),
+                    {"lock_name": _DB_BOOTSTRAP_LOCK_NAME},
                 )
-            except TimeoutError:
-                logger.warning(
-                    "database_bootstrap_ddl_timeout_skipped",
-                    timeout_seconds=_DB_BOOTSTRAP_DDL_WAIT_SECONDS,
-                    lock_name=_DB_BOOTSTRAP_LOCK_NAME,
-                )
-        finally:
-            await lock_conn.execute(
-                text("SELECT pg_advisory_unlock(hashtext(:lock_name))"),
-                {"lock_name": _DB_BOOTSTRAP_LOCK_NAME},
+                await lock_conn.commit()
+    except (DBAPIError, SQLAlchemyTimeoutError, OSError, RuntimeError) as exc:
+        if _is_database_connection_budget_error(exc):
+            logger.warning(
+                "database_bootstrap_connection_budget_exhausted_skipped",
+                error_type=type(exc).__name__,
+                lock_name=_DB_BOOTSTRAP_LOCK_NAME,
+                pool_size=settings.DATABASE_POOL_SIZE,
+                max_overflow=settings.DATABASE_MAX_OVERFLOW,
             )
+            return
+        raise
 
 
 async def _try_acquire_bootstrap_lock(lock_conn: object) -> bool:
@@ -256,7 +298,7 @@ async def _try_acquire_bootstrap_lock(lock_conn: object) -> bool:
         await asyncio.sleep(min(_DB_BOOTSTRAP_LOCK_POLL_SECONDS, remaining))
 
 
-async def _run_init_db_ddl(engine: AsyncEngine) -> None:
+async def _run_init_db_ddl(conn: AsyncConnection) -> None:
     """
     Run idempotent DB bootstrap DDL while caller holds the bootstrap advisory lock.
     """
@@ -267,15 +309,15 @@ async def _run_init_db_ddl(engine: AsyncEngine) -> None:
     # → table + index 全消失 → 下次重啟同樣問題 → 無限 CrashLoop
     # 修法:每個 table 獨立 transaction;先 DROP INDEX IF EXISTS 清殘留孤兒 index;
     # 捕捉 "already exists" 讓並行 pod 優雅跳過
-    async with engine.connect() as probe_conn:
-        existing = set(await probe_conn.run_sync(
-            lambda c: __import__('sqlalchemy', fromlist=['inspect']).inspect(c).get_table_names()
-        ))
+    existing = set(await conn.run_sync(
+        lambda c: __import__('sqlalchemy', fromlist=['inspect']).inspect(c).get_table_names()
+    ))
+    await conn.commit()
 
     for table in Base.metadata.sorted_tables:
         if table.name not in existing:
             try:
-                async with engine.begin() as conn:
+                async with conn.begin():
                     # 先清殘留孤兒 index(前次 CrashLoop 留下的部分狀態)
                     for index in table.indexes:
                         await conn.execute(text(f'DROP INDEX IF EXISTS "{index.name}"'))
@@ -286,7 +328,7 @@ async def _run_init_db_ddl(engine: AsyncEngine) -> None:
                 else:
                     raise
 
-    async with engine.begin() as conn:
+    async with conn.begin():
         # 2026-04-02 Claude Code: 確保 risklevel enum 包含 'high' 值
         # Phase 23 新增,避免舊 DB 缺少此值導致 InvalidTextRepresentation
         await conn.execute(
diff --git a/apps/api/tests/test_config_url_validation.py b/apps/api/tests/test_config_url_validation.py
index bf416859..947b7b2a 100644
--- a/apps/api/tests/test_config_url_validation.py
+++ b/apps/api/tests/test_config_url_validation.py
@@ -164,3 +164,11 @@ def test_empty_string_fallback_url_accepted():
     """OLLAMA_FALLBACK_URL 預設空字串(未設定),應通過"""
     s = _make_settings(OLLAMA_FALLBACK_URL="")
     assert s.OLLAMA_FALLBACK_URL == ""
+
+
+def test_database_pool_budget_defaults_to_production_safe_values():
+    """Production DB role has a tiny connection limit; defaults must not fan out."""
+    s = _make_settings()
+    assert s.DATABASE_POOL_SIZE == 1
+    assert s.DATABASE_MAX_OVERFLOW == 0
+    assert s.DATABASE_POOL_TIMEOUT_SECONDS == 5.0
diff --git a/apps/api/tests/test_runtime_bootstrap_guards.py b/apps/api/tests/test_runtime_bootstrap_guards.py
index 412101d6..93db3215 100644
--- a/apps/api/tests/test_runtime_bootstrap_guards.py
+++ b/apps/api/tests/test_runtime_bootstrap_guards.py
@@ -33,6 +33,9 @@ class _FakeLockConnection:
     async def __aexit__(self, *_exc: object) -> None:
         return None
 
+    async def commit(self) -> None:
+        self.statements.append("COMMIT")
+
     async def execute(
         self,
         statement: object,
@@ -54,6 +57,37 @@ class _FakeEngine:
         return self.lock_conn
 
 
+class _ConnectionBudgetEngine:
+    def connect(self) -> _FakeLockConnection:
+        raise RuntimeError('too many connections for role "awoooi"')
+
+
+def test_get_engine_uses_database_pool_budget(monkeypatch):
+    from src.db import base as db_base
+
+    captured: dict[str, object] = {}
+    fake_engine = object()
+
+    def fake_create_async_engine(database_url: str, **kwargs: object) -> object:
+        captured["database_url"] = database_url
+        captured.update(kwargs)
+        return fake_engine
+
+    monkeypatch.setattr(db_base, "_engine", None)
+    monkeypatch.setattr(db_base, "_session_factory", None)
+    monkeypatch.setattr(db_base.settings, "DATABASE_URL", "postgresql+asyncpg://u:p@localhost/db")
+    monkeypatch.setattr(db_base.settings, "DATABASE_POOL_SIZE", 1)
+    monkeypatch.setattr(db_base.settings, "DATABASE_MAX_OVERFLOW", 0)
+    monkeypatch.setattr(db_base.settings, "DATABASE_POOL_TIMEOUT_SECONDS", 5.0)
+    monkeypatch.setattr(db_base, "create_async_engine", fake_create_async_engine)
+
+    assert db_base.get_engine() is fake_engine
+    assert captured["database_url"] == "postgresql+asyncpg://u:p@localhost/db"
+    assert captured["pool_size"] == 1
+    assert captured["max_overflow"] == 0
+    assert captured["pool_timeout"] == 5.0
+
+
 @pytest.mark.asyncio
 async def test_init_db_serializes_bootstrap_ddl_with_advisory_lock(monkeypatch):
     from src.db import base as db_base
@@ -69,9 +103,27 @@ async def test_init_db_serializes_bootstrap_ddl_with_advisory_lock(monkeypatch):
 
     await db_base.init_db()
 
-    assert calls == [fake_engine]
+    assert calls == [fake_engine.lock_conn]
     assert "pg_try_advisory_lock" in fake_engine.lock_conn.statements[0]
-    assert "pg_advisory_unlock" in fake_engine.lock_conn.statements[-1]
+    assert any("pg_advisory_unlock" in stmt for stmt in fake_engine.lock_conn.statements)
+    assert "COMMIT" in fake_engine.lock_conn.statements
+
+
+@pytest.mark.asyncio
+async def test_init_db_skips_bootstrap_when_connection_budget_exhausted(monkeypatch):
+    from src.db import base as db_base
+
+    calls: list[object] = []
+
+    async def fake_run_init_db_ddl(engine: object) -> None:
+        calls.append(engine)
+
+    monkeypatch.setattr(db_base, "get_engine", lambda: _ConnectionBudgetEngine())
+    monkeypatch.setattr(db_base, "_run_init_db_ddl", fake_run_init_db_ddl)
+
+    await db_base.init_db()
+
+    assert calls == []
 
 
 @pytest.mark.asyncio
@@ -90,7 +142,7 @@ async def test_init_db_releases_bootstrap_lock_when_ddl_fails(monkeypatch):
         await db_base.init_db()
 
     assert "pg_try_advisory_lock" in fake_engine.lock_conn.statements[0]
-    assert "pg_advisory_unlock" in fake_engine.lock_conn.statements[-1]
+    assert any("pg_advisory_unlock" in stmt for stmt in fake_engine.lock_conn.statements)
 
 
 @pytest.mark.asyncio
@@ -135,7 +187,7 @@ async def test_init_db_releases_bootstrap_lock_when_ddl_times_out(monkeypatch):
     await db_base.init_db()
 
     assert "pg_try_advisory_lock" in fake_engine.lock_conn.statements[0]
-    assert "pg_advisory_unlock" in fake_engine.lock_conn.statements[-1]
+    assert any("pg_advisory_unlock" in stmt for stmt in fake_engine.lock_conn.statements)
 
 
 @pytest.mark.asyncio
diff --git a/k8s/awoooi-prod/04-configmap.yaml b/k8s/awoooi-prod/04-configmap.yaml
index 8a35d2ea..d6246757 100644
--- a/k8s/awoooi-prod/04-configmap.yaml
+++ b/k8s/awoooi-prod/04-configmap.yaml
@@ -42,6 +42,12 @@ data:
   # JSON array 格式 (pydantic-settings 要求)
   # 正式域名優先,本機開發備用
   CORS_ORIGINS: '["https://awoooi.wooo.work","http://localhost:3000","http://localhost:3001"]'
+  # 2026-07-01 Codex: production DB role `awoooi` currently has CONNECTION
+  # LIMIT 2. Keep API/worker pools single-connection until the role budget is
+  # raised and verified, otherwise rollout startup can exhaust PostgreSQL.
+  DATABASE_POOL_SIZE: "1"
+  DATABASE_MAX_OVERFLOW: "0"
+  DATABASE_POOL_TIMEOUT_SECONDS: "5"
 
   # AI 配置 (JSON array 格式 for pydantic-settings)
   # 2026-04-16 ogt + Claude Sonnet 4.6: 修正 fallback 順序 — 本地 GPU Ollama(111) 第一
diff --git a/k8s/awoooi-prod/08-deployment-worker.yaml b/k8s/awoooi-prod/08-deployment-worker.yaml
index 54089b05..7c037564 100644
--- a/k8s/awoooi-prod/08-deployment-worker.yaml
+++ b/k8s/awoooi-prod/08-deployment-worker.yaml
@@ -31,8 +31,11 @@ spec:
   strategy:
     type: RollingUpdate
     rollingUpdate:
-      maxSurge: 1
-      maxUnavailable: 0
+      # 2026-07-01 Codex: DB role `awoooi` is capped at two connections and
+      # worker imports the same API DB stack. Do not run old+new worker pods
+      # during rollout until the DB role/pool budget is raised and verified.
+      maxSurge: 0
+      maxUnavailable: 1
   template:
     metadata:
       labels: