ezgg-lan-manager/src/ez_lan_manager/services/DatabaseService.py
2025-05-13 07:10:24 +02:00

790 lines
37 KiB
Python

import logging
from datetime import date, datetime
from typing import Optional
from decimal import Decimal
import aiomysql
from src.ez_lan_manager.types.CateringOrder import CateringOrder
from src.ez_lan_manager.types.CateringMenuItem import CateringMenuItem, CateringMenuItemCategory
from src.ez_lan_manager.types.CateringOrder import CateringMenuItemsWithAmount, CateringOrderStatus
from src.ez_lan_manager.types.ConfigurationTypes import DatabaseConfiguration
from src.ez_lan_manager.types.News import News
from src.ez_lan_manager.types.Seat import Seat
from src.ez_lan_manager.types.Ticket import Ticket
from src.ez_lan_manager.types.Transaction import Transaction
from src.ez_lan_manager.types.User import User
logger = logging.getLogger(__name__.split(".")[-1])
class DuplicationError(Exception):
pass
class NoDatabaseConnectionError(Exception):
pass
class DatabaseService:
MAX_CONNECTION_RETRIES = 5
def __init__(self, database_config: DatabaseConfiguration) -> None:
self._database_config = database_config
self._connection_pool: Optional[aiomysql.Pool] = None
async def is_healthy(self) -> bool:
try:
async with self._connection_pool.acquire() as conn:
async with conn.cursor() as _:
return True
except aiomysql.OperationalError:
return False
except Exception as e:
logger.error(f"Failed to acquire a connection: {e}")
return False
async def init_db_pool(self) -> bool:
logger.info(
f"Connecting to database '{self._database_config.db_name}' on "
f"{self._database_config.db_user}@{self._database_config.db_host}:{self._database_config.db_port}"
)
try:
self._connection_pool = await aiomysql.create_pool(
host=self._database_config.db_host,
port=self._database_config.db_port,
user=self._database_config.db_user,
password=self._database_config.db_password,
db=self._database_config.db_name,
minsize=1,
maxsize=40
)
except aiomysql.OperationalError:
return False
return True
@staticmethod
def _map_db_result_to_user(data: tuple) -> User:
return User(
user_id=data[0],
user_name=data[1],
user_mail=data[2],
user_password=data[3],
user_first_name=data[4],
user_last_name=data[5],
user_birth_day=data[6],
is_active=bool(data[7]),
is_team_member=bool(data[8]),
is_admin=bool(data[9]),
created_at=data[10],
last_updated_at=data[11]
)
async def get_user_by_name(self, user_name: str) -> Optional[User]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
await cursor.execute("SELECT * FROM users WHERE user_name=%s", (user_name,))
result = await cursor.fetchone()
if not result:
return
return self._map_db_result_to_user(result)
async def get_user_by_id(self, user_id: int) -> Optional[User]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
await cursor.execute("SELECT * FROM users WHERE user_id=%s", (user_id,))
result = await cursor.fetchone()
if not result:
return
return self._map_db_result_to_user(result)
async def get_user_by_mail(self, user_mail: str) -> Optional[User]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
await cursor.execute("SELECT * FROM users WHERE user_mail=%s", (user_mail.lower(),))
result = await cursor.fetchone()
if not result:
return
return self._map_db_result_to_user(result)
async def create_user(self, user_name: str, user_mail: str, password_hash: str) -> User:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"INSERT INTO users (user_name, user_mail, user_password) "
"VALUES (%s, %s, %s)", (user_name, user_mail.lower(), password_hash)
)
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.create_user(user_name, user_mail, password_hash)
except aiomysql.IntegrityError as e:
logger.warning(f"Aborted duplication entry: {e}")
raise DuplicationError
return await self.get_user_by_name(user_name)
async def update_user(self, user: User) -> User:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"UPDATE users SET user_name=%s, user_mail=%s, user_password=%s, user_first_name=%s, "
"user_last_name=%s, user_birth_date=%s, is_active=%s, is_team_member=%s, is_admin=%s "
"WHERE (user_id=%s)",
(user.user_name, user.user_mail.lower(), user.user_password,
user.user_first_name, user.user_last_name, user.user_birth_day,
user.is_active, user.is_team_member, user.is_admin,
user.user_id)
)
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.update_user(user)
except aiomysql.IntegrityError as e:
logger.warning(f"Aborted duplication entry: {e}")
raise DuplicationError
return user
async def add_transaction(self, transaction: Transaction) -> Optional[Transaction]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"INSERT INTO transactions (user_id, value, is_debit, transaction_date, transaction_reference) "
"VALUES (%s, %s, %s, %s, %s)",
(transaction.user_id, transaction.value, transaction.is_debit, transaction.transaction_date,
transaction.reference)
)
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.add_transaction(transaction)
except Exception as e:
logger.warning(f"Error adding Transaction: {e}")
return
return transaction
async def get_all_transactions_for_user(self, user_id: int) -> list[Transaction]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
transactions = []
try:
await cursor.execute("SELECT * FROM transactions WHERE user_id=%s", (user_id,))
await conn.commit()
result = await cursor.fetchall()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.get_all_transactions_for_user(user_id)
except aiomysql.Error as e:
logger.error(f"Error getting all transactions for user: {e}")
return []
for transaction_raw in result:
transactions.append(Transaction(
user_id=user_id,
value=Decimal(transaction_raw[2]),
is_debit=bool(transaction_raw[3]),
transaction_date=transaction_raw[4],
reference=transaction_raw[5]
))
return transactions
async def add_news(self, news: News) -> None:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"INSERT INTO news (news_content, news_title, news_subtitle, news_author, news_date) "
"VALUES (%s, %s, %s, %s, %s)",
(news.content, news.title, news.subtitle, news.author.user_id, news.news_date)
)
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.add_news(news)
except Exception as e:
logger.warning(f"Error adding Transaction: {e}")
async def get_news(self, dt_start: date, dt_end: date) -> list[News]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
results = []
try:
await cursor.execute(
"SELECT * FROM news INNER JOIN users ON news.news_author = users.user_id WHERE news_date"
" BETWEEN %s AND %s;",
(dt_start, dt_end))
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.get_news(dt_start, dt_end)
except Exception as e:
logger.warning(f"Error fetching news: {e}")
return []
for news_raw in await cursor.fetchall():
user = self._map_db_result_to_user(news_raw[6:])
results.append(News(
news_id=news_raw[0],
title=news_raw[2],
subtitle=news_raw[3],
author=user,
content=news_raw[1],
news_date=news_raw[5]
))
return results
async def update_news(self, news: News) -> None:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"""
UPDATE news
SET news_content = %s,
news_title = %s,
news_subtitle = %s,
news_author = %s,
news_date = %s
WHERE news_id = %s
""",
(news.content, news.title, news.subtitle, news.author.user_id, news.news_date, news.news_id)
)
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.update_news(news)
except Exception as e:
logger.warning(f"Error updating news: {e}")
async def remove_news(self, news_id: int) -> None:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"DELETE FROM news WHERE news_id = %s",
(news_id,)
)
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.remove_news(news_id)
except Exception as e:
logger.warning(f"Error removing news with ID {news_id}: {e}")
async def get_tickets(self) -> list[Ticket]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
results = []
try:
await cursor.execute("SELECT * FROM tickets INNER JOIN users ON tickets.user = users.user_id;", ())
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.get_tickets()
except Exception as e:
logger.warning(f"Error fetching tickets: {e}")
return []
for ticket_raw in await cursor.fetchall():
user = self._map_db_result_to_user(ticket_raw[3:])
results.append(Ticket(
ticket_id=ticket_raw[0],
category=ticket_raw[1],
purchase_date=ticket_raw[3],
owner=user
))
return results
async def get_ticket_for_user(self, user_id: int) -> Optional[Ticket]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"SELECT * FROM tickets INNER JOIN users ON tickets.user = users.user_id WHERE user_id=%s;",
(user_id,))
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.get_ticket_for_user(user_id)
except Exception as e:
logger.warning(f"Error fetching ticket for user: {e}")
return
result = await cursor.fetchone()
if not result:
return
user = self._map_db_result_to_user(result[3:])
return Ticket(
ticket_id=result[0],
category=result[1],
purchase_date=result[3],
owner=user
)
async def generate_ticket_for_user(self, user_id: int, category: str) -> Optional[Ticket]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute("INSERT INTO tickets (ticket_category, user) VALUES (%s, %s)",
(category, user_id))
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.generate_ticket_for_user(user_id, category)
except Exception as e:
logger.warning(f"Error generating ticket for user: {e}")
return
return await self.get_ticket_for_user(user_id)
async def change_ticket_owner(self, ticket_id: int, new_owner_id: int) -> bool:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute("UPDATE tickets SET user = %s WHERE ticket_id = %s;",
(new_owner_id, ticket_id))
affected_rows = cursor.rowcount
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.change_ticket_owner(ticket_id, new_owner_id)
except Exception as e:
logger.warning(f"Error transferring ticket to user: {e}")
return False
return affected_rows > 0
async def delete_ticket(self, ticket_id: int) -> bool:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute("DELETE FROM tickets WHERE ticket_id = %s;", (ticket_id,))
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.change_ticket_owner(ticket_id)
except Exception as e:
logger.warning(f"Error deleting ticket: {e}")
return False
return True
async def generate_fresh_seats_table(self, seats: list[tuple[str, str]]) -> None:
""" WARNING: THIS WILL DELETE ALL EXISTING DATA! DO NOT USE ON PRODUCTION DATABASE! """
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute("TRUNCATE seats;")
for seat in seats:
await cursor.execute("INSERT INTO seats (seat_id, seat_category) VALUES (%s, %s);",
(seat[0], seat[1]))
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.generate_fresh_seats_table(seats)
except Exception as e:
logger.warning(f"Error generating fresh seats table: {e}")
return
async def get_seating_info(self) -> list[Seat]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
results = []
try:
await cursor.execute(
"SELECT seats.*, users.* FROM seats LEFT JOIN users ON seats.user = users.user_id;")
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.get_seating_info()
except Exception as e:
logger.warning(f"Error getting seats table: {e}")
return results
for seat_raw in await cursor.fetchall():
if seat_raw[3] is None: # Empty seat
results.append(Seat(seat_raw[0], bool(seat_raw[1]), seat_raw[2], None))
else:
user = self._map_db_result_to_user(seat_raw[4:])
results.append(Seat(seat_raw[0], bool(seat_raw[1]), seat_raw[2], user))
return results
async def seat_user(self, seat_id: str, user_id: int) -> bool:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute("UPDATE seats SET user = %s WHERE seat_id = %s;", (user_id, seat_id))
affected_rows = cursor.rowcount
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.seat_user(seat_id, user_id)
except Exception as e:
logger.warning(f"Error seating user: {e}")
return False
return affected_rows > 0
async def get_menu_items(self) -> list[CateringMenuItem]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
results = []
try:
await cursor.execute("SELECT * FROM catering_menu_items;")
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.get_menu_items()
except Exception as e:
logger.warning(f"Error fetching menu items: {e}")
return results
for menu_item_raw in await cursor.fetchall():
results.append(CateringMenuItem(
item_id=menu_item_raw[0],
name=menu_item_raw[1],
additional_info=menu_item_raw[2],
price=Decimal(menu_item_raw[3]),
category=CateringMenuItemCategory(menu_item_raw[4]),
is_disabled=bool(menu_item_raw[5])
))
return results
async def get_menu_item(self, menu_item_id) -> Optional[CateringMenuItem]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute("SELECT * FROM catering_menu_items WHERE catering_menu_item_id = %s;",
(menu_item_id,))
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.get_menu_item(menu_item_id)
except Exception as e:
logger.warning(f"Error fetching menu items: {e}")
return
raw_data = await cursor.fetchone()
if raw_data is None:
return
return CateringMenuItem(
item_id=raw_data[0],
name=raw_data[1],
additional_info=raw_data[2],
price=Decimal(raw_data[3]),
category=CateringMenuItemCategory(raw_data[4]),
is_disabled=bool(raw_data[5])
)
async def add_menu_item(self, name: str, info: str, price: Decimal, category: CateringMenuItemCategory,
is_disabled: bool = False) -> Optional[CateringMenuItem]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"INSERT INTO catering_menu_items (name, additional_info, price, category, is_disabled) VALUES "
"(%s, %s, %s, %s, %s);",
(name, info, price, category.value, is_disabled)
)
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.add_menu_item(name, info, price, category, is_disabled)
except Exception as e:
logger.warning(f"Error adding menu item: {e}")
return
return CateringMenuItem(
item_id=cursor.lastrowid,
name=name,
additional_info=info,
price=price,
category=category,
is_disabled=is_disabled
)
async def delete_menu_item(self, menu_item_id: int) -> bool:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute("DELETE FROM catering_menu_items WHERE catering_menu_item_id = %s;",
(menu_item_id,))
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.delete_menu_item(menu_item_id)
except Exception as e:
logger.warning(f"Error deleting menu item: {e}")
return False
return cursor.affected_rows > 0
async def update_menu_item(self, updated_item: CateringMenuItem) -> bool:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"UPDATE catering_menu_items SET name = %s, additional_info = %s, price = %s, category = %s, "
"is_disabled = %s WHERE catering_menu_item_id = %s;",
(updated_item.name, updated_item.additional_info, updated_item.price,
updated_item.category.value, updated_item.is_disabled, updated_item.item_id)
)
affected_rows = cursor.rowcount
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.update_menu_item(updated_item)
except Exception as e:
logger.warning(f"Error updating menu item: {e}")
return False
return affected_rows > 0
async def add_new_order(self, menu_items: CateringMenuItemsWithAmount, user_id: int, is_delivery: bool) -> Optional[CateringOrder]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
now = datetime.now()
try:
await cursor.execute(
"INSERT INTO orders (status, user, is_delivery, order_date) VALUES (%s, %s, %s, %s);",
(CateringOrderStatus.RECEIVED.value, user_id, is_delivery, now)
)
order_id = cursor.lastrowid
for menu_item, quantity in menu_items.items():
await cursor.execute(
"INSERT INTO order_catering_menu_item (order_id, catering_menu_item_id, quantity) VALUES "
"(%s, %s, %s);",
(order_id, menu_item.item_id, quantity)
)
await conn.commit()
return CateringOrder(
order_id=order_id,
order_date=now,
status=CateringOrderStatus.RECEIVED,
items=menu_items,
customer=await self.get_user_by_id(user_id),
is_delivery=is_delivery
)
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.add_new_order(menu_items, user_id, is_delivery)
except Exception as e:
logger.warning(f"Error placing order: {e}")
return
async def change_order_status(self, order_id: int, status: CateringOrderStatus) -> bool:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"UPDATE orders SET status = %s WHERE order_id = %s;",
(status.value, order_id)
)
affected_rows = cursor.rowcount
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.change_order_status(order_id, status)
except Exception as e:
logger.warning(f"Error updating menu item: {e}")
return False
return affected_rows > 0
async def get_orders(self, user_id: Optional[int] = None, status: Optional[CateringOrderStatus] = None) -> list[CateringOrder]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
fetched_orders = []
query = "SELECT * FROM orders LEFT JOIN users ON orders.user = users.user_id"
if user_id is not None and status is None:
query += f" WHERE user = {user_id};"
elif status is not None and user_id is None:
query += f" WHERE status = '{status.value}';"
elif status is not None and user_id is not None:
query += f" WHERE user = {user_id} AND status = '{status.value}';"
else:
query += ";"
try:
await cursor.execute(query)
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.get_orders(user_id, status)
except Exception as e:
logger.warning(f"Error getting orders: {e}")
return fetched_orders
for raw_order in await cursor.fetchall():
fetched_orders.append(
CateringOrder(
order_id=raw_order[0],
status=CateringOrderStatus(raw_order[1]),
customer=self._map_db_result_to_user(raw_order[5:]),
items=await self.get_menu_items_for_order(raw_order[0]),
is_delivery=bool(raw_order[4]),
order_date=raw_order[3],
)
)
return fetched_orders
async def get_menu_items_for_order(self, order_id: int) -> CateringMenuItemsWithAmount:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
result = {}
try:
await cursor.execute(
"SELECT * FROM order_catering_menu_item "
"LEFT JOIN catering_menu_items ON order_catering_menu_item.catering_menu_item_id = catering_menu_items.catering_menu_item_id "
"WHERE order_id = %s;",
(order_id,)
)
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.get_menu_items_for_order(order_id)
except Exception as e:
logger.warning(f"Error getting order items: {e}")
return result
for order_catering_menu_item_raw in await cursor.fetchall():
result[CateringMenuItem(
item_id=order_catering_menu_item_raw[1],
name=order_catering_menu_item_raw[4],
additional_info=order_catering_menu_item_raw[5],
price=Decimal(order_catering_menu_item_raw[6]),
category=CateringMenuItemCategory(order_catering_menu_item_raw[7]),
is_disabled=bool(order_catering_menu_item_raw[8])
)] = order_catering_menu_item_raw[2]
return result
async def set_user_profile_picture(self, user_id: int, picture_data: bytes) -> None:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"INSERT INTO user_profile_picture (user_id, picture) VALUES (%s, %s) ON DUPLICATE KEY UPDATE picture = VALUES(picture)",
(user_id, picture_data)
)
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.set_user_profile_picture(user_id, picture_data)
except Exception as e:
logger.warning(f"Error setting user profile picture: {e}")
async def get_user_profile_picture(self, user_id: int) -> Optional[bytes]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute("SELECT (picture) FROM user_profile_picture WHERE user_id = %s", (user_id,))
await conn.commit()
r = await cursor.fetchone()
if r is None:
return
return r[0]
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.get_user_profile_picture(user_id)
except Exception as e:
logger.warning(f"Error setting user profile picture: {e}")
return None
async def get_all_users(self) -> list[User]:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
results = []
try:
await cursor.execute("SELECT * FROM users;")
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.get_all_users()
except Exception as e:
logger.warning(f"Error getting all users: {e}")
return results
for user_raw in await cursor.fetchall():
results.append(self._map_db_result_to_user(user_raw))
return results
async def remove_profile_picture(self, user_id: int):
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"DELETE FROM user_profile_picture WHERE user_id = %s",
user_id
)
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.remove_profile_picture(user_id)
except Exception as e:
logger.warning(f"Error deleting user profile picture: {e}")