Skip to content

db_connection

zeus.optimizer.batch_size.server.database.db_connection

Managing database connection.

Heavily inspired by https://praciano.com.br/fastapi-and-async-sqlalchemy-20-with-pytest-done-right.html and https://medium.com/@tclaitken/setting-up-a-fastapi-app-with-async-sqlalchemy-2-0-pydantic-v2-e6c540be4308

DatabaseSessionManager

Session manager class.

Source code in zeus/optimizer/batch_size/server/database/db_connection.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class DatabaseSessionManager:
    """Hold an async engine together with the session factory bound to it.

    `close` resets both attributes to ``None``; after that, `close`,
    `connect` and `session` all raise `ZeusBSOServerRuntimeError`.
    """

    def __init__(self, host: str, engine_kwargs: dict[str, Any] | None = None):
        """Build the async engine for `host` and a session factory bound to it.

        Args:
            host: Passed to `create_async_engine` as its first argument.
            engine_kwargs: Extra keyword arguments forwarded to
                `create_async_engine`. `None` means no extra arguments.
        """
        extra_options = {} if engine_kwargs is None else engine_kwargs
        engine = create_async_engine(host, **extra_options)
        self._engine = engine
        self._sessionmaker = async_sessionmaker(bind=engine, autocommit=False)

    async def close(self):
        """Dispose of the engine and drop the engine and session factory.

        Raises:
            ZeusBSOServerRuntimeError: If the engine is already ``None``.
        """
        engine = self._engine
        if engine is None:
            raise ZeusBSOServerRuntimeError("DatabaseSessionManager is not initialized")

        await engine.dispose()
        # Mark the manager as unusable so later calls fail loudly.
        self._engine = self._sessionmaker = None

    @contextlib.asynccontextmanager
    async def connect(self) -> AsyncIterator[AsyncConnection]:
        """Yield a connection obtained from the engine's `begin()` context.

        If the caller's block raises an `Exception`, `rollback()` is awaited
        on the connection and the exception is re-raised.

        Raises:
            ZeusBSOServerRuntimeError: If the engine is ``None``.
        """
        engine = self._engine
        if engine is None:
            raise ZeusBSOServerRuntimeError("DatabaseSessionManager is not initialized")

        async with engine.begin() as conn:
            try:
                yield conn
            except Exception:
                await conn.rollback()
                raise

    @contextlib.asynccontextmanager
    async def session(self) -> AsyncIterator[AsyncSession]:
        """Yield a new session created by the session factory.

        If the caller's block raises an `Exception`, `rollback()` is awaited
        on the session and the exception is re-raised. The session is closed
        on every exit path.

        Raises:
            ZeusBSOServerRuntimeError: If the session factory is ``None``.
        """
        factory = self._sessionmaker
        if factory is None:
            raise ZeusBSOServerRuntimeError("DatabaseSessionManager is not initialized")

        db_session = factory()
        try:
            try:
                yield db_session
            except Exception:
                await db_session.rollback()
                raise
        finally:
            # Runs whether the caller's block succeeded or failed.
            await db_session.close()

__init__

__init__(host, engine_kwargs=None)
Source code in zeus/optimizer/batch_size/server/database/db_connection.py
25
26
27
28
29
30
def __init__(self, host: str, engine_kwargs: dict[str, Any] | None = None):
    """Build the async engine for `host` and a session factory bound to it.

    Args:
        host: Passed to `create_async_engine` as its first argument.
        engine_kwargs: Extra keyword arguments forwarded to
            `create_async_engine`. `None` means no extra arguments.
    """
    extra_options = {} if engine_kwargs is None else engine_kwargs
    engine = create_async_engine(host, **extra_options)
    self._engine = engine
    self._sessionmaker = async_sessionmaker(bind=engine, autocommit=False)

close async

close()

Dispose of the engine and reset the manager to its uninitialized state.

Source code in zeus/optimizer/batch_size/server/database/db_connection.py
32
33
34
35
36
37
38
39
async def close(self):
    """Dispose of the engine and drop the engine and session factory.

    Raises:
        ZeusBSOServerRuntimeError: If the engine is already ``None``.
    """
    engine = self._engine
    if engine is None:
        raise ZeusBSOServerRuntimeError("DatabaseSessionManager is not initialized")

    await engine.dispose()
    # Mark the manager as unusable so later calls fail loudly.
    self._engine = self._sessionmaker = None

connect async

connect()

Yield a database connection from the engine, rolling back if the caller's block raises.

Source code in zeus/optimizer/batch_size/server/database/db_connection.py
41
42
43
44
45
46
47
48
49
50
51
52
@contextlib.asynccontextmanager
async def connect(self) -> AsyncIterator[AsyncConnection]:
    """Yield a connection obtained from the engine's `begin()` context.

    If the caller's block raises an `Exception`, `rollback()` is awaited
    on the connection and the exception is re-raised.

    Raises:
        ZeusBSOServerRuntimeError: If the engine is ``None``.
    """
    engine = self._engine
    if engine is None:
        raise ZeusBSOServerRuntimeError("DatabaseSessionManager is not initialized")

    async with engine.begin() as conn:
        try:
            yield conn
        except Exception:
            await conn.rollback()
            raise

session async

session()

Get session from session maker.

Source code in zeus/optimizer/batch_size/server/database/db_connection.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@contextlib.asynccontextmanager
async def session(self) -> AsyncIterator[AsyncSession]:
    """Yield a new session created by the session factory.

    If the caller's block raises an `Exception`, `rollback()` is awaited
    on the session and the exception is re-raised. The session is closed
    on every exit path.

    Raises:
        ZeusBSOServerRuntimeError: If the session factory is ``None``.
    """
    factory = self._sessionmaker
    if factory is None:
        raise ZeusBSOServerRuntimeError("DatabaseSessionManager is not initialized")

    db_session = factory()
    try:
        try:
            yield db_session
        except Exception:
            await db_session.rollback()
            raise
    finally:
        # Runs whether the caller's block succeeded or failed.
        await db_session.close()

get_db_session async

get_db_session()

Get db session from session manager. Used with fastapi dependency injection.

Source code in zeus/optimizer/batch_size/server/database/db_connection.py
76
77
78
79
async def get_db_session() -> AsyncIterator[AsyncSession]:
    """Yield one session from the module-level `sessionmanager`.

    Intended for use with FastAPI dependency injection. The session is
    obtained through `sessionmanager.session()`, so leaving this generator
    exits that context manager.
    """
    async with sessionmanager.session() as db_session:
        yield db_session