from database.manager import DatabaseManager


def test_database_manager_reuses_engine_for_same_sqlite_path(tmp_path):
    """Two managers built from the same SQLite URL share one engine and Session.

    Args:
        tmp_path: pytest fixture providing a per-test temporary directory
            that holds the SQLite file.
    """
    # Start from an empty cache so earlier tests cannot influence the result.
    DatabaseManager._instance_cache.clear()
    db_url = f"sqlite:///{tmp_path / 'cache.db'}"

    first = DatabaseManager(db_url)
    try:
        # Constructed inside the try block so that `first` is still cleaned
        # up if building the second manager raises.
        second = DatabaseManager(db_url)

        assert second.engine is first.engine
        assert second.Session is first.Session
    finally:
        DatabaseManager._instance_cache.clear()
        first.engine.dispose()


def test_database_manager_uses_bounded_postgres_pool(monkeypatch):
    """With config set to PostgreSQL, the engine is created with bounded pool options.

    `create_engine` and `sessionmaker` are replaced by fakes that record
    their arguments, so no real database connection is attempted.

    Args:
        monkeypatch: pytest fixture used to patch config values and the
            factories referenced by `database.manager`.
    """
    import config
    import database.manager as manager

    captured = {}

    class FakeEngine:
        def dispose(self):
            pass

    class FakeSession:
        pass

    def fake_create_engine(url, **kwargs):
        captured["url"] = url
        captured["kwargs"] = kwargs
        return FakeEngine()

    def fake_sessionmaker(bind):
        captured["session_bind"] = bind
        return FakeSession

    DatabaseManager._instance_cache.clear()
    monkeypatch.setattr(config, "DATABASE_TYPE", "postgresql")
    monkeypatch.setattr(config, "DATABASE_PATH", "postgresql://example/db")
    monkeypatch.setattr(manager, "create_engine", fake_create_engine)
    monkeypatch.setattr(manager, "sessionmaker", fake_sessionmaker)
    # NOTE(review): these two hooks presumably talk to the database during
    # construction, which a FakeEngine cannot serve - confirm in manager.py.
    monkeypatch.setattr(
        manager, "ensure_metadata_initialized", lambda *args, **kwargs: None
    )
    monkeypatch.setattr(DatabaseManager, "_init_autoheal_tables", lambda self: None)

    try:
        db = DatabaseManager()

        engine_kwargs = captured["kwargs"]
        connect_args = engine_kwargs["connect_args"]

        assert captured["url"] == "postgresql://example/db"
        assert engine_kwargs["pool_pre_ping"] is True
        assert engine_kwargs["pool_size"] == 2
        assert engine_kwargs["max_overflow"] == 3
        assert engine_kwargs["pool_recycle"] == 1800
        assert engine_kwargs["pool_timeout"] == 30
        assert connect_args["connect_timeout"] == 10
        assert "statement_timeout=60000" in connect_args["options"]
        assert db.engine is captured["session_bind"]
    finally:
        # Clear the cache even when an assertion fails, so a manager built
        # around FakeEngine is never left behind for later tests.
        DatabaseManager._instance_cache.clear()


def test_postgres_schema_repair_adds_action_plan_group_a_columns():
    """The schema-drift repair emits the expected column and index DDL.

    A fake connection records every executed statement as a string; the
    test then checks the combined text for the expected fragments.
    """
    from database.manager import _repair_postgres_schema_drift

    captured = []

    class FakeConnection:
        def execute(self, statement):
            # str() so both plain strings and statement objects are comparable.
            captured.append(str(statement))

    _repair_postgres_schema_drift(FakeConnection())

    ddl = "\n".join(captured)
    assert "ADD COLUMN IF NOT EXISTS action_type VARCHAR(100)" in ddl
    assert "ADD COLUMN IF NOT EXISTS description TEXT" in ddl
    assert "ADD COLUMN IF NOT EXISTS priority INTEGER DEFAULT 3" in ddl
    assert "ADD COLUMN IF NOT EXISTS metadata_json TEXT" in ddl
    assert "CREATE INDEX IF NOT EXISTS idx_action_plans_type" in ddl