fix(db): 重用 DatabaseManager engine pool
Some checks failed
CD Pipeline / deploy (push) Has been cancelled

This commit is contained in:
OoO
2026-04-30 09:01:17 +08:00
parent 5a569d1e05
commit 9750093abd
4 changed files with 91 additions and 14 deletions

View File

@@ -78,6 +78,9 @@ def sanitize_timestamp(timestamp_str):
return timestamp_str
class DatabaseManager:
_instance_cache = {}
_instance_lock = threading.Lock()
def __init__(self, db_path=None):
"""
初始化資料庫連線。
@@ -86,11 +89,23 @@ class DatabaseManager:
# V-Fix (2026-01-23): 優先使用 config.py 的資料庫設定
from config import DATABASE_PATH, DATABASE_TYPE
effective_db_path = DATABASE_PATH if db_path is None else db_path
cache_key = (DATABASE_TYPE, str(effective_db_path))
# V-Fix (2026-04-30): DatabaseManager 在多 route 內被頻繁直接 new
# 若每次都 create_engine 會不斷新增連線池,最終把 PostgreSQL clients 用光。
with self._instance_lock:
cached = self._instance_cache.get(cache_key)
if cached:
self.engine = cached['engine']
self.Session = cached['Session']
return
if DATABASE_TYPE == 'postgresql':
# PostgreSQL 模式 - 使用 config.py 的連線字串
# 連線池配置以提升穩定性
self.engine = create_engine(
DATABASE_PATH,
effective_db_path,
echo=False,
pool_pre_ping=True, # 自動檢測斷線連線
pool_size=5, # 連線池大小
@@ -104,6 +119,10 @@ class DatabaseManager:
)
ensure_metadata_initialized(self.engine, use_postgres_lock=True)
self.Session = sessionmaker(bind=self.engine)
self._instance_cache[cache_key] = {
'engine': self.engine,
'Session': self.Session,
}
sys_log.info(f"[Database] ✅ 使用 PostgreSQL 資料庫 (連線池已優化)")
# ADR-013: 確保 AIOps 自動修復表存在並植入種子 PlayBook
self._init_autoheal_tables()
@@ -111,20 +130,24 @@ class DatabaseManager:
# SQLite 模式 - 向後相容
if db_path is None:
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
db_path = os.path.join(base_dir, 'data', 'momo_database.db')
effective_db_path = os.path.join(base_dir, 'data', 'momo_database.db')
if str(db_path).startswith('sqlite://'):
sqlite_db_file = make_url(db_path).database
if str(effective_db_path).startswith('sqlite://'):
sqlite_db_file = make_url(effective_db_path).database
if sqlite_db_file:
os.makedirs(os.path.dirname(sqlite_db_file), exist_ok=True)
self.engine = create_engine(db_path, echo=False)
self.engine = create_engine(effective_db_path, echo=False)
else:
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self.engine = create_engine(f'sqlite:///{db_path}', echo=False)
os.makedirs(os.path.dirname(effective_db_path), exist_ok=True)
self.engine = create_engine(f'sqlite:///{effective_db_path}', echo=False)
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
self._instance_cache[cache_key] = {
'engine': self.engine,
'Session': self.Session,
}
self._check_and_fix_schema()
sys_log.info(f"[Database] 使用 SQLite 資料庫: {db_path}")
sys_log.info(f"[Database] 使用 SQLite 資料庫: {effective_db_path}")
def _check_and_fix_schema(self):
"""自動檢查並修復資料庫結構 (僅限 SQLite)"""

View File

@@ -41,6 +41,15 @@ def post_fork(server, worker):
disposed_count = 0
prefixes = ("app", "database.", "routes.", "services.")
def dispose_once(engine, label):
nonlocal disposed_count
engine_id = id(engine)
if engine_id in disposed_ids:
return
disposed_ids.add(engine_id)
if _dispose_engine(engine, label, server):
disposed_count += 1
for module_name, module in list(sys.modules.items()):
if module is None or not module_name.startswith(prefixes):
continue
@@ -60,12 +69,17 @@ def post_fork(server, worker):
candidates.append((engine, f"{module_name}.{attr_name}.engine"))
for engine, label in candidates:
engine_id = id(engine)
if engine_id in disposed_ids:
continue
disposed_ids.add(engine_id)
if _dispose_engine(engine, label, server):
disposed_count += 1
dispose_once(engine, label)
manager_module = sys.modules.get("database.manager")
manager_class = getattr(manager_module, "DatabaseManager", None)
manager_cache = getattr(manager_class, "_instance_cache", {}) if manager_class else {}
for cache_key, cached in list(manager_cache.items()):
if not isinstance(cached, dict):
continue
engine = cached.get("engine")
if isinstance(engine, Engine):
dispose_once(engine, f"database.manager.DatabaseManager._instance_cache[{cache_key!r}]")
server.log.info(
"Worker %s reset %s SQLAlchemy engine pool(s) after preload fork",

View File

@@ -0,0 +1,16 @@
from database.manager import DatabaseManager
def test_database_manager_reuses_engine_for_same_sqlite_path(tmp_path):
DatabaseManager._instance_cache.clear()
db_url = f"sqlite:///{tmp_path / 'cache.db'}"
first = DatabaseManager(db_url)
second = DatabaseManager(db_url)
try:
assert second.engine is first.engine
assert second.Session is first.Session
finally:
DatabaseManager._instance_cache.clear()
first.engine.dispose()

View File

@@ -48,3 +48,27 @@ def test_post_fork_skips_request_bound_local_proxy(monkeypatch):
monkeypatch.setitem(sys.modules, fake_module.__name__, fake_module)
config.post_fork(_Server(), _Worker())
def test_post_fork_disposes_database_manager_instance_cache(monkeypatch):
from sqlalchemy import create_engine
config = _load_gunicorn_config()
fake_module = types.ModuleType("database.manager")
engine = create_engine("sqlite:///:memory:")
before_pool = id(engine.pool)
class FakeDatabaseManager:
_instance_cache = {
("sqlite", "memory"): {
"engine": engine,
"Session": object(),
}
}
fake_module.DatabaseManager = FakeDatabaseManager
monkeypatch.setitem(sys.modules, fake_module.__name__, fake_module)
config.post_fork(_Server(), _Worker())
assert id(engine.pool) != before_pool