Files
ELM/src/elm/services/DatabaseService.py
T
2026-05-21 11:32:08 +02:00

38 lines
1.1 KiB
Python

import logging
from beanie import init_beanie
from pymongo import AsyncMongoClient
from pymongo.asynchronous.collection import AsyncCollection
from elm.types import User, Transaction, Ticket
from elm.types.ConfigurationTypes import DatabaseConfiguration
# Name the logger after the last dotted component of the module path only
# (the package prefix is dropped), and set this logger's own threshold to DEBUG.
logger = logging.getLogger(__name__.rpartition(".")[2])
logger.setLevel(logging.DEBUG)
class DuplicationError(Exception):
    """Exception type this module exposes for duplication failures.

    NOTE(review): nothing in the visible code raises it, so the exact
    triggering condition (presumably a duplicate record) should be confirmed
    against the call sites.
    """
class NoDatabaseConnectionError(Exception):
    """Exception type this module exposes for a missing database connection.

    NOTE(review): nothing in the visible code raises it; presumably callers
    use it when the service is used before a connection exists. Confirm
    against the call sites.
    """
class DatabaseService:
    """Hold the MongoDB handles and register the document models with Beanie.

    A new instance has no client, database or collection; ``initialize`` must
    be awaited to create them and to run ``init_beanie`` for the ``User``,
    ``Transaction`` and ``Ticket`` document models.
    """

    def __init__(self, db_config: DatabaseConfiguration) -> None:
        """Store the configuration and start with every handle unset.

        Args:
            db_config: Configuration object whose ``database_address`` and
                ``database_name`` attributes are read by ``initialize``.
        """
        self._db_config = db_config
        # All three handles stay ``None`` until ``initialize`` succeeds.
        self._client = None
        self._database = None
        self._users = None

    async def initialize(self) -> None:
        """Create the client, select the database and initialise Beanie.

        Does nothing when a client is already present. If ``init_beanie``
        fails, the handles are reset to ``None`` and the client is closed, so
        the instance is never left half-initialised and the call can be
        retried.

        Raises:
            Exception: Any error raised by ``init_beanie`` is re-raised
                unchanged after the rollback.
        """
        if self._client is not None:
            return

        config = self._db_config
        client = AsyncMongoClient(config.database_address)
        database = client[config.database_name]

        # The handles are published before the await, exactly as before, so
        # the ``_client is None`` guard above is already closed while
        # ``init_beanie`` is running.
        self._client = client
        self._database = database
        self._users: AsyncCollection = database["users"]

        try:
            await init_beanie(
                database=database,
                document_models=[User, Transaction, Ticket],
            )
        except Exception:
            # Roll back first so the state is clean even if closing fails;
            # otherwise a later ``initialize`` would silently do nothing.
            self._client = None
            self._database = None
            self._users = None
            await client.close()
            raise