ezgg-lan-manager/src/ez_lan_manager/services/DatabaseService.py
tcprod a419ee8885 Replace float with Decimal for price calculations
Fix Decimal precision issue

Fix Decimal precision issue

Fix Decimal precision issue

Fix old prices for tickets

Fix Decimal precision issue
2025-02-07 23:20:57 +01:00

791 lines
37 KiB
Python

import logging
from datetime import date, datetime
from typing import Optional
from decimal import Decimal
import aiomysql
from src.ez_lan_manager.types.CateringOrder import CateringOrder
from src.ez_lan_manager.types.CateringMenuItem import CateringMenuItem, CateringMenuItemCategory
from src.ez_lan_manager.types.CateringOrder import CateringMenuItemsWithAmount, CateringOrderStatus
from src.ez_lan_manager.types.ConfigurationTypes import DatabaseConfiguration
from src.ez_lan_manager.types.News import News
from src.ez_lan_manager.types.Seat import Seat
from src.ez_lan_manager.types.Ticket import Ticket
from src.ez_lan_manager.types.Transaction import Transaction
from src.ez_lan_manager.types.User import User
logger = logging.getLogger(__name__.split(".")[-1])
class DuplicationError(Exception):
    """Signals that a database write was aborted by an integrity (duplicate entry) error."""
class NoDatabaseConnectionError(Exception):
    """Signals that the connection pool could not be (re-)initialised."""
class DatabaseService:
MAX_CONNECTION_RETRIES = 5
def __init__(self, database_config: DatabaseConfiguration) -> None:
    """Remember the database configuration without opening any connection.

    :param database_config: Connection settings later read by ``init_db_pool``.
    """
    # The pool stays None until init_db_pool() assigns a real aiomysql pool.
    self._connection_pool: Optional[aiomysql.Pool] = None
    self._database_config = database_config
async def is_healthy(self) -> bool:
    """Report whether a pooled connection and a cursor can currently be obtained.

    :return: True when both context managers were entered and left without
             an exception, False otherwise.
    """
    try:
        # Merely entering and leaving both context managers is the health probe.
        async with self._connection_pool.acquire() as connection, connection.cursor():
            pass
    except aiomysql.OperationalError:
        return False
    except Exception as error:
        # Any other failure is logged before reporting unhealthy.
        logger.error(f"Failed to acquire a connection: {error}")
        return False
    return True
async def init_db_pool(self) -> bool:
    """Create the aiomysql connection pool from the stored configuration.

    :return: True when the pool was created and stored on the instance,
             False when pool creation raised ``aiomysql.OperationalError``.
    """
    logger.info(
        f"Connecting to database '{self._database_config.db_name}' on "
        f"{self._database_config.db_user}@{self._database_config.db_host}:{self._database_config.db_port}"
    )
    pool_options = {
        "host": self._database_config.db_host,
        "port": self._database_config.db_port,
        "user": self._database_config.db_user,
        "password": self._database_config.db_password,
        "db": self._database_config.db_name,
        "minsize": 1,
        "maxsize": 40,
    }
    try:
        new_pool = await aiomysql.create_pool(**pool_options)
    except aiomysql.OperationalError:
        return False
    # Only replace the stored pool once creation has succeeded.
    self._connection_pool = new_pool
    return True
@staticmethod
def _map_db_result_to_user(data: tuple) -> User:
    """Build a ``User`` from a positional row.

    :param data: Row whose first twelve positions hold the user fields in the
                 order used below (id, name, mail, password, first name,
                 last name, birthday, three flags, created, last updated).
    :return: The populated ``User``.
    """
    # Positions 7-9 carry the flag columns; they are coerced to real booleans.
    active, team_member, admin = (bool(data[position]) for position in (7, 8, 9))
    return User(
        user_id=data[0],
        user_name=data[1],
        user_mail=data[2],
        user_password=data[3],
        user_first_name=data[4],
        user_last_name=data[5],
        user_birth_day=data[6],
        is_active=active,
        is_team_member=team_member,
        is_admin=admin,
        created_at=data[10],
        last_updated_at=data[11],
    )
async def get_user_by_name(self, user_name: str) -> Optional[User]:
    """Fetch the user whose ``user_name`` column equals the given name.

    :param user_name: Name to search for.
    :return: The mapped ``User``, or None when the query yields no row.
    """
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        await cur.execute("SELECT * FROM users WHERE user_name=%s", (user_name,))
        row = await cur.fetchone()
        # A falsy fetch result means there is no matching user.
        return self._map_db_result_to_user(row) if row else None
async def get_user_by_id(self, user_id: int) -> Optional[User]:
    """Fetch the user whose ``user_id`` column equals the given id.

    :param user_id: Id to search for.
    :return: The mapped ``User``, or None when the query yields no row.
    """
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        await cur.execute("SELECT * FROM users WHERE user_id=%s", (user_id,))
        row = await cur.fetchone()
        # A falsy fetch result means there is no matching user.
        return self._map_db_result_to_user(row) if row else None
async def get_user_by_mail(self, user_mail: str) -> Optional[User]:
    """Fetch the user whose ``user_mail`` column equals the lower-cased address.

    :param user_mail: Mail address to search for; it is lower-cased before the lookup.
    :return: The mapped ``User``, or None when the query yields no row.
    """
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        await cur.execute("SELECT * FROM users WHERE user_mail=%s", (user_mail.lower(),))
        row = await cur.fetchone()
        # A falsy fetch result means there is no matching user.
        return self._map_db_result_to_user(row) if row else None
async def create_user(self, user_name: str, user_mail: str, password_hash: str) -> User:
    """Insert a new user row and return the user as read back by name.

    :param user_name: Name of the new user.
    :param user_mail: Mail address; stored lower-cased.
    :param password_hash: Value stored in the ``user_password`` column.
    :return: Result of ``get_user_by_name(user_name)`` after the insert.
    :raises DuplicationError: When the insert raises ``aiomysql.IntegrityError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    insert_sql = (
        "INSERT INTO users (user_name, user_mail, user_password) "
        "VALUES (%s, %s, %s)"
    )
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute(insert_sql, (user_name, user_mail.lower(), password_hash))
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.create_user(user_name, user_mail, password_hash)
        except aiomysql.IntegrityError as error:
            logger.warning(f"Aborted duplication entry: {error}")
            raise DuplicationError
        return await self.get_user_by_name(user_name)
async def update_user(self, user: User) -> User:
    """Write all editable fields of ``user`` to the row with the same ``user_id``.

    :param user: User carrying the new values; the mail address is stored lower-cased.
    :return: The very ``user`` object that was passed in.
    :raises DuplicationError: When the update raises ``aiomysql.IntegrityError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    update_sql = (
        "UPDATE users SET user_name=%s, user_mail=%s, user_password=%s, user_first_name=%s, "
        "user_last_name=%s, user_birth_date=%s, is_active=%s, is_team_member=%s, is_admin=%s "
        "WHERE (user_id=%s)"
    )
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            values = (
                user.user_name, user.user_mail.lower(), user.user_password,
                user.user_first_name, user.user_last_name, user.user_birth_day,
                user.is_active, user.is_team_member, user.is_admin,
                user.user_id,
            )
            await cur.execute(update_sql, values)
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.update_user(user)
        except aiomysql.IntegrityError as error:
            logger.warning(f"Aborted duplication entry: {error}")
            raise DuplicationError
        return user
async def add_transaction(self, transaction: Transaction) -> Optional[Transaction]:
    """Persist a transaction row.

    :param transaction: Transaction whose fields are inserted.
    :return: The same ``transaction`` on success, None when the insert failed
             with any error other than ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    insert_sql = (
        "INSERT INTO transactions (user_id, value, is_debit, transaction_date, transaction_reference) "
        "VALUES (%s, %s, %s, %s, %s)"
    )
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            values = (
                transaction.user_id,
                transaction.value,
                transaction.is_debit,
                transaction.transaction_date,
                transaction.reference,
            )
            await cur.execute(insert_sql, values)
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.add_transaction(transaction)
        except Exception as error:
            # Best effort: failures are logged and reported as None.
            logger.warning(f"Error adding Transaction: {error}")
            return None
        return transaction
async def get_all_transactions_for_user(self, user_id: int) -> list[Transaction]:
    """Load every transaction row stored for ``user_id``.

    :param user_id: Id used in the WHERE clause and copied into every result.
    :return: One ``Transaction`` per row; an empty list when the query raised
             an ``aiomysql.Error``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute("SELECT * FROM transactions WHERE user_id=%s", (user_id,))
            await connection.commit()
            rows = await cur.fetchall()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.get_all_transactions_for_user(user_id)
        except aiomysql.Error as error:
            logger.error(f"Error getting all transactions for user: {error}")
            return []
        # Row positions 2-5 hold value, debit flag, date and reference.
        return [
            Transaction(
                user_id=user_id,
                value=Decimal(row[2]),
                is_debit=bool(row[3]),
                transaction_date=row[4],
                reference=row[5],
            )
            for row in rows
        ]
async def add_news(self, news: News) -> None:
    """Insert a news entry.

    Failures other than ``aiomysql.InterfaceError`` are logged and swallowed
    (best effort), so callers receive no error signal for them.

    :param news: News item to store; ``news.author.user_id`` is written as the author.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as conn:
        async with conn.cursor(aiomysql.Cursor) as cursor:
            try:
                await cursor.execute(
                    "INSERT INTO news (news_content, news_title, news_subtitle, news_author, news_date) "
                    "VALUES (%s, %s, %s, %s, %s)",
                    (news.content, news.title, news.subtitle, news.author.user_id, news.news_date)
                )
                await conn.commit()
            except aiomysql.InterfaceError:
                # Rebuild the pool once and retry the whole operation.
                pool_init_result = await self.init_db_pool()
                if not pool_init_result:
                    raise NoDatabaseConnectionError
                return await self.add_news(news)
            except Exception as e:
                # The message previously said "Transaction" (copy-paste slip).
                logger.warning(f"Error adding news: {e}")
async def get_news(self, dt_start: date, dt_end: date) -> list[News]:
    """Load all news whose ``news_date`` lies between ``dt_start`` and ``dt_end``.

    Each news row is joined with its author's user row.

    :param dt_start: Lower bound passed to SQL ``BETWEEN``.
    :param dt_end: Upper bound passed to SQL ``BETWEEN``.
    :return: The matching news; an empty list when the query failed with any
             error other than ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as conn:
        async with conn.cursor(aiomysql.Cursor) as cursor:
            results = []
            try:
                await cursor.execute(
                    "SELECT * FROM news INNER JOIN users ON news.news_author = users.user_id WHERE news_date"
                    " BETWEEN %s AND %s;",
                    (dt_start, dt_end))
                await conn.commit()
            except aiomysql.InterfaceError:
                pool_init_result = await self.init_db_pool()
                if not pool_init_result:
                    # Diagnostic goes through the logger instead of a stray print().
                    logger.error(f"Could not re-initialise connection pool: {self._connection_pool}")
                    raise NoDatabaseConnectionError
                return await self.get_news(dt_start, dt_end)
            except Exception as e:
                logger.warning(f"Error fetching news: {e}")
                return []
            for news_raw in await cursor.fetchall():
                # Positions 0-5 are the news columns; the joined user columns start at 6.
                user = self._map_db_result_to_user(news_raw[6:])
                results.append(News(
                    news_id=news_raw[0],
                    title=news_raw[2],
                    subtitle=news_raw[3],
                    author=user,
                    content=news_raw[1],
                    news_date=news_raw[5]
                ))
            return results
async def update_news(self, news: News) -> None:
    """Overwrite the stored fields of the news row identified by ``news.news_id``.

    Failures other than ``aiomysql.InterfaceError`` are logged and swallowed.

    :param news: News item carrying the new values.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute(
                """
UPDATE news
SET news_content = %s,
news_title = %s,
news_subtitle = %s,
news_author = %s,
news_date = %s
WHERE news_id = %s
""",
                (news.content, news.title, news.subtitle, news.author.user_id, news.news_date, news.news_id),
            )
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.update_news(news)
        except Exception as error:
            logger.warning(f"Error updating news: {error}")
async def remove_news(self, news_id: int) -> None:
    """Delete the news row with the given id.

    Failures other than ``aiomysql.InterfaceError`` are logged and swallowed.

    :param news_id: Id of the news row to delete.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute("DELETE FROM news WHERE news_id = %s", (news_id,))
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.remove_news(news_id)
        except Exception as error:
            logger.warning(f"Error removing news with ID {news_id}: {error}")
async def get_tickets(self) -> list[Ticket]:
    """Load all tickets joined with their owning users.

    :return: One ``Ticket`` per joined row; an empty list when the query failed
             with any error other than ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute("SELECT * FROM tickets INNER JOIN users ON tickets.user = users.user_id;", ())
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.get_tickets()
        except Exception as error:
            logger.warning(f"Error fetching tickets: {error}")
            return []
        tickets = []
        for row in await cur.fetchall():
            # NOTE(review): the user slice and purchase_date both start at index 3,
            # so one of them looks off by one - confirm against the tickets schema.
            owner = self._map_db_result_to_user(row[3:])
            tickets.append(Ticket(ticket_id=row[0], category=row[1], purchase_date=row[3], owner=owner))
        return tickets
async def get_ticket_for_user(self, user_id: int) -> Optional[Ticket]:
    """Load the first ticket row joined to the given user.

    :param user_id: Id of the ticket owner.
    :return: The ``Ticket``, or None when no row matches or the query failed
             with any error other than ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute(
                "SELECT * FROM tickets INNER JOIN users ON tickets.user = users.user_id WHERE user_id=%s;",
                (user_id,),
            )
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.get_ticket_for_user(user_id)
        except Exception as error:
            logger.warning(f"Error fetching ticket for user: {error}")
            return None
        row = await cur.fetchone()
        if not row:
            return None
        # NOTE(review): the user slice and purchase_date both start at index 3,
        # so one of them looks off by one - confirm against the tickets schema.
        owner = self._map_db_result_to_user(row[3:])
        return Ticket(ticket_id=row[0], category=row[1], purchase_date=row[3], owner=owner)
async def generate_ticket_for_user(self, user_id: int, category: str) -> Optional[Ticket]:
    """Insert a ticket of ``category`` for the user and read it back.

    :param user_id: Id of the ticket owner.
    :param category: Value stored in ``ticket_category``.
    :return: Result of ``get_ticket_for_user(user_id)``; None when the insert
             failed with any error other than ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    insert_sql = "INSERT INTO tickets (ticket_category, user) VALUES (%s, %s)"
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute(insert_sql, (category, user_id))
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.generate_ticket_for_user(user_id, category)
        except Exception as error:
            logger.warning(f"Error generating ticket for user: {error}")
            return None
        return await self.get_ticket_for_user(user_id)
async def change_ticket_owner(self, ticket_id: int, new_owner_id: int) -> bool:
    """Reassign a ticket to another user.

    :param ticket_id: Id of the ticket to update.
    :param new_owner_id: User id written to the ticket's ``user`` column.
    :return: True when the UPDATE reported at least one affected row; False
             otherwise or when it failed with any error other than
             ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    update_sql = "UPDATE tickets SET user = %s WHERE ticket_id = %s;"
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute(update_sql, (new_owner_id, ticket_id))
            # Capture the row count before committing.
            changed = cur.rowcount
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.change_ticket_owner(ticket_id, new_owner_id)
        except Exception as error:
            logger.warning(f"Error transferring ticket to user: {error}")
            return False
        return changed > 0
async def delete_ticket(self, ticket_id: int) -> bool:
    """Delete the ticket with the given id.

    :param ticket_id: Id of the ticket row to delete.
    :return: True when the DELETE and commit completed without error (even if
             no row matched); False when they failed with any error other
             than ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as conn:
        async with conn.cursor(aiomysql.Cursor) as cursor:
            try:
                await cursor.execute("DELETE FROM tickets WHERE ticket_id = %s;", (ticket_id,))
                await conn.commit()
            except aiomysql.InterfaceError:
                pool_init_result = await self.init_db_pool()
                if not pool_init_result:
                    raise NoDatabaseConnectionError
                # Retry the delete itself. The previous code called
                # change_ticket_owner() here with a missing argument.
                return await self.delete_ticket(ticket_id)
            except Exception as e:
                logger.warning(f"Error deleting ticket: {e}")
                return False
            return True
async def generate_fresh_seats_table(self, seats: list[tuple[str, str]]) -> None:
    """ WARNING: THIS WILL DELETE ALL EXISTING DATA! DO NOT USE ON PRODUCTION DATABASE!

    Truncates the ``seats`` table and inserts one row per entry of ``seats``.
    Failures other than ``aiomysql.InterfaceError`` are logged and swallowed.

    :param seats: Entries whose first two items are used as (seat_id, seat_category).
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    insert_sql = "INSERT INTO seats (seat_id, seat_category) VALUES (%s, %s);"
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute("TRUNCATE seats;")
            for entry in seats:
                await cur.execute(insert_sql, (entry[0], entry[1]))
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.generate_fresh_seats_table(seats)
        except Exception as error:
            logger.warning(f"Error generating fresh seats table: {error}")
            return None
async def get_seating_info(self) -> list[Seat]:
    """Load every seat together with its occupant, if any.

    :return: One ``Seat`` per row; the user argument is None for rows whose
             fourth column is NULL. An empty list when the query failed with
             any error other than ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        seats = []
        try:
            await cur.execute(
                "SELECT seats.*, users.* FROM seats LEFT JOIN users ON seats.user = users.user_id;")
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.get_seating_info()
        except Exception as error:
            logger.warning(f"Error getting seats table: {error}")
            return seats
        for row in await cur.fetchall():
            # Index 3 is the seat's user reference; None marks an empty seat.
            occupant = None if row[3] is None else self._map_db_result_to_user(row[4:])
            seats.append(Seat(row[0], bool(row[1]), row[2], occupant))
        return seats
async def seat_user(self, seat_id: str, user_id: int) -> bool:
    """Assign a user to a seat.

    :param seat_id: Id of the seat row to update.
    :param user_id: User id written to the seat's ``user`` column.
    :return: True when the UPDATE reported at least one affected row; False
             otherwise or when it failed with any error other than
             ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    update_sql = "UPDATE seats SET user = %s WHERE seat_id = %s;"
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute(update_sql, (user_id, seat_id))
            # Capture the row count before committing.
            changed = cur.rowcount
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.seat_user(seat_id, user_id)
        except Exception as error:
            logger.warning(f"Error seating user: {error}")
            return False
        return changed > 0
async def get_menu_items(self) -> list[CateringMenuItem]:
    """Load every row of ``catering_menu_items``.

    :return: One ``CateringMenuItem`` per row; an empty list when the query
             failed with any error other than ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute("SELECT * FROM catering_menu_items;")
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.get_menu_items()
        except Exception as error:
            logger.warning(f"Error fetching menu items: {error}")
            return []
        # Row positions 0-5: id, name, additional info, price, category, disabled flag.
        return [
            CateringMenuItem(
                item_id=row[0],
                name=row[1],
                additional_info=row[2],
                price=Decimal(row[3]),
                category=CateringMenuItemCategory(row[4]),
                is_disabled=bool(row[5]),
            )
            for row in await cur.fetchall()
        ]
async def get_menu_item(self, menu_item_id) -> Optional[CateringMenuItem]:
    """Load a single menu item by id.

    :param menu_item_id: Value matched against ``catering_menu_item_id``.
    :return: The ``CateringMenuItem``, or None when no row matches or the query
             failed with any error other than ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    select_sql = "SELECT * FROM catering_menu_items WHERE catering_menu_item_id = %s;"
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute(select_sql, (menu_item_id,))
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.get_menu_item(menu_item_id)
        except Exception as error:
            logger.warning(f"Error fetching menu items: {error}")
            return None
        row = await cur.fetchone()
        if row is None:
            return None
        # Row positions 0-5: id, name, additional info, price, category, disabled flag.
        return CateringMenuItem(
            item_id=row[0],
            name=row[1],
            additional_info=row[2],
            price=Decimal(row[3]),
            category=CateringMenuItemCategory(row[4]),
            is_disabled=bool(row[5]),
        )
async def add_menu_item(self, name: str, info: str, price: Decimal, category: CateringMenuItemCategory,
                        is_disabled: bool = False) -> Optional[CateringMenuItem]:
    """Insert a new catering menu item.

    :param name: Item name.
    :param info: Stored as ``additional_info``.
    :param price: Item price.
    :param category: Category; its ``value`` is what gets stored.
    :param is_disabled: Disabled flag, False by default.
    :return: A ``CateringMenuItem`` built from the arguments and the cursor's
             ``lastrowid``; None when the insert failed with any error other
             than ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    insert_sql = (
        "INSERT INTO catering_menu_items (name, additional_info, price, category, is_disabled) VALUES "
        "(%s, %s, %s, %s, %s);"
    )
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute(insert_sql, (name, info, price, category.value, is_disabled))
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.add_menu_item(name, info, price, category, is_disabled)
        except Exception as error:
            logger.warning(f"Error adding menu item: {error}")
            return None
        return CateringMenuItem(
            item_id=cur.lastrowid,
            name=name,
            additional_info=info,
            price=price,
            category=category,
            is_disabled=is_disabled,
        )
async def delete_menu_item(self, menu_item_id: int) -> bool:
    """Delete a catering menu item by id.

    :param menu_item_id: Value matched against ``catering_menu_item_id``.
    :return: True when the DELETE reported at least one affected row; False
             otherwise or when it failed with any error other than
             ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as conn:
        async with conn.cursor(aiomysql.Cursor) as cursor:
            try:
                await cursor.execute("DELETE FROM catering_menu_items WHERE catering_menu_item_id = %s;",
                                     (menu_item_id,))
                # Cursors expose the affected-row count as ``rowcount``; they have no
                # ``affected_rows`` attribute. Read it before committing, as the
                # sibling update methods do.
                affected_rows = cursor.rowcount
                await conn.commit()
            except aiomysql.InterfaceError:
                pool_init_result = await self.init_db_pool()
                if not pool_init_result:
                    raise NoDatabaseConnectionError
                return await self.delete_menu_item(menu_item_id)
            except Exception as e:
                logger.warning(f"Error deleting menu item: {e}")
                return False
            return affected_rows > 0
async def update_menu_item(self, updated_item: CateringMenuItem) -> bool:
    """Overwrite the stored fields of the menu item identified by ``updated_item.item_id``.

    :param updated_item: Item carrying the new values.
    :return: True when the UPDATE reported at least one affected row; False
             otherwise or when it failed with any error other than
             ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    update_sql = (
        "UPDATE catering_menu_items SET name = %s, additional_info = %s, price = %s, category = %s, "
        "is_disabled = %s WHERE catering_menu_item_id = %s;"
    )
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            values = (
                updated_item.name,
                updated_item.additional_info,
                updated_item.price,
                updated_item.category.value,
                updated_item.is_disabled,
                updated_item.item_id,
            )
            await cur.execute(update_sql, values)
            # Capture the row count before committing.
            changed = cur.rowcount
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.update_menu_item(updated_item)
        except Exception as error:
            logger.warning(f"Error updating menu item: {error}")
            return False
        return changed > 0
async def add_new_order(self, menu_items: CateringMenuItemsWithAmount, user_id: int, is_delivery: bool) -> Optional[CateringOrder]:
    """Store a new order with status RECEIVED plus one row per ordered menu item.

    :param menu_items: Mapping of menu item to ordered quantity.
    :param user_id: Id of the ordering user.
    :param is_delivery: Delivery flag stored with the order.
    :return: The created ``CateringOrder``; None when anything failed with an
             error other than ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    order_sql = "INSERT INTO orders (status, user, is_delivery, order_date) VALUES (%s, %s, %s, %s);"
    item_sql = (
        "INSERT INTO order_catering_menu_item (order_id, catering_menu_item_id, quantity) VALUES "
        "(%s, %s, %s);"
    )
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        placed_at = datetime.now()
        try:
            await cur.execute(order_sql, (CateringOrderStatus.RECEIVED.value, user_id, is_delivery, placed_at))
            # The id generated for the order row links the item rows to it.
            new_order_id = cur.lastrowid
            for item, amount in menu_items.items():
                await cur.execute(item_sql, (new_order_id, item.item_id, amount))
            await connection.commit()
            customer = await self.get_user_by_id(user_id)
            return CateringOrder(
                order_id=new_order_id,
                order_date=placed_at,
                status=CateringOrderStatus.RECEIVED,
                items=menu_items,
                customer=customer,
                is_delivery=is_delivery,
            )
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.add_new_order(menu_items, user_id, is_delivery)
        except Exception as error:
            logger.warning(f"Error placing order: {error}")
            return None
async def change_order_status(self, order_id: int, status: CateringOrderStatus) -> bool:
    """Set the status of an order.

    :param order_id: Id of the order row to update.
    :param status: New status; its ``value`` is what gets stored.
    :return: True when the UPDATE reported at least one affected row; False
             otherwise or when it failed with any error other than
             ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as conn:
        async with conn.cursor(aiomysql.Cursor) as cursor:
            try:
                await cursor.execute(
                    "UPDATE orders SET status = %s WHERE order_id = %s;",
                    (status.value, order_id)
                )
                # Capture the row count before committing.
                affected_rows = cursor.rowcount
                await conn.commit()
            except aiomysql.InterfaceError:
                pool_init_result = await self.init_db_pool()
                if not pool_init_result:
                    raise NoDatabaseConnectionError
                return await self.change_order_status(order_id, status)
            except Exception as e:
                # The message previously said "menu item" (copy-paste slip).
                logger.warning(f"Error updating order status: {e}")
                return False
            return affected_rows > 0
async def get_orders(self, user_id: Optional[int] = None, status: Optional[CateringOrderStatus] = None) -> list[CateringOrder]:
    """Load orders joined with their customers, optionally filtered.

    :param user_id: When given, only orders of this user are returned.
    :param status: When given, only orders with this status are returned.
    :return: One ``CateringOrder`` per row, each with its items loaded through
             ``get_menu_items_for_order``; an empty list when the query failed
             with any error other than ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    # Filter values are bound as parameters instead of being interpolated into
    # the SQL text, so the driver does the quoting and escaping.
    conditions = []
    params = []
    if user_id is not None:
        conditions.append("user = %s")
        params.append(user_id)
    if status is not None:
        conditions.append("status = %s")
        params.append(status.value)
    query = "SELECT * FROM orders LEFT JOIN users ON orders.user = users.user_id"
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
    query += ";"
    async with self._connection_pool.acquire() as conn:
        async with conn.cursor(aiomysql.Cursor) as cursor:
            fetched_orders = []
            try:
                # Pass None when unfiltered so no %-formatting is applied to the query.
                await cursor.execute(query, tuple(params) or None)
                await conn.commit()
            except aiomysql.InterfaceError:
                pool_init_result = await self.init_db_pool()
                if not pool_init_result:
                    raise NoDatabaseConnectionError
                return await self.get_orders(user_id, status)
            except Exception as e:
                logger.warning(f"Error getting orders: {e}")
                return fetched_orders
            for raw_order in await cursor.fetchall():
                # Positions 0-4 are the order columns; the joined user columns start at 5.
                fetched_orders.append(
                    CateringOrder(
                        order_id=raw_order[0],
                        status=CateringOrderStatus(raw_order[1]),
                        customer=self._map_db_result_to_user(raw_order[5:]),
                        items=await self.get_menu_items_for_order(raw_order[0]),
                        is_delivery=bool(raw_order[4]),
                        order_date=raw_order[3],
                    )
                )
            return fetched_orders
async def get_menu_items_for_order(self, order_id: int) -> CateringMenuItemsWithAmount:
    """Load the menu items of one order together with their quantities.

    :param order_id: Id of the order whose items are requested.
    :return: Mapping of ``CateringMenuItem`` to the value of row position 2
             (the quantity); empty when the query failed with any error
             other than ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    select_sql = (
        "SELECT * FROM order_catering_menu_item "
        "LEFT JOIN catering_menu_items ON order_catering_menu_item.catering_menu_item_id = catering_menu_items.catering_menu_item_id "
        "WHERE order_id = %s;"
    )
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        items_with_amount = {}
        try:
            await cur.execute(select_sql, (order_id,))
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.get_menu_items_for_order(order_id)
        except Exception as error:
            logger.warning(f"Error getting order items: {error}")
            return items_with_amount
        for row in await cur.fetchall():
            # Position 1 is the item id, 2 the quantity, 4-8 the joined item columns.
            menu_item = CateringMenuItem(
                item_id=row[1],
                name=row[4],
                additional_info=row[5],
                price=Decimal(row[6]),
                category=CateringMenuItemCategory(row[7]),
                is_disabled=bool(row[8]),
            )
            items_with_amount[menu_item] = row[2]
        return items_with_amount
async def set_user_profile_picture(self, user_id: int, picture_data: bytes) -> None:
    """Store a profile picture for the user, replacing an existing one.

    Failures other than ``aiomysql.InterfaceError`` are logged and swallowed.

    :param user_id: Id of the user the picture belongs to.
    :param picture_data: Raw picture bytes to store.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    upsert_sql = "INSERT INTO user_profile_picture (user_id, picture) VALUES (%s, %s) ON DUPLICATE KEY UPDATE picture = VALUES(picture)"
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute(upsert_sql, (user_id, picture_data))
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.set_user_profile_picture(user_id, picture_data)
        except Exception as error:
            logger.warning(f"Error setting user profile picture: {error}")
async def get_user_profile_picture(self, user_id: int) -> Optional[bytes]:
    """Load the stored profile picture of a user.

    :param user_id: Id of the user whose picture is requested.
    :return: The ``picture`` column of the matching row; None when no row
             exists or the query failed with any error other than
             ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as conn:
        async with conn.cursor(aiomysql.Cursor) as cursor:
            try:
                await cursor.execute("SELECT (picture) FROM user_profile_picture WHERE user_id = %s", (user_id,))
                await conn.commit()
                r = await cursor.fetchone()
                if r is None:
                    return None
                return r[0]
            except aiomysql.InterfaceError:
                pool_init_result = await self.init_db_pool()
                if not pool_init_result:
                    raise NoDatabaseConnectionError
                return await self.get_user_profile_picture(user_id)
            except Exception as e:
                # The message previously said "setting" although this is the getter.
                logger.warning(f"Error getting user profile picture: {e}")
                return None
async def get_all_users(self) -> list[User]:
    """Load every row of the ``users`` table.

    :return: One ``User`` per row; an empty list when the query failed with
             any error other than ``aiomysql.InterfaceError``.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as connection, connection.cursor(aiomysql.Cursor) as cur:
        try:
            await cur.execute("SELECT * FROM users;")
            await connection.commit()
        except aiomysql.InterfaceError:
            # Rebuild the pool once and retry the whole operation.
            if not await self.init_db_pool():
                raise NoDatabaseConnectionError
            return await self.get_all_users()
        except Exception as error:
            logger.warning(f"Error getting all users: {error}")
            return []
        return [self._map_db_result_to_user(row) for row in await cur.fetchall()]
async def remove_profile_picture(self, user_id: int) -> None:
    """Delete the stored profile picture of a user.

    Failures other than ``aiomysql.InterfaceError`` are logged and swallowed.

    :param user_id: Id of the user whose picture row is deleted.
    :raises NoDatabaseConnectionError: When the pool cannot be re-initialised
            after an ``aiomysql.InterfaceError``.
    """
    async with self._connection_pool.acquire() as conn:
        async with conn.cursor(aiomysql.Cursor) as cursor:
            try:
                await cursor.execute(
                    "DELETE FROM user_profile_picture WHERE user_id = %s",
                    # Bound as a one-element tuple, like every other query in this class.
                    (user_id,)
                )
                await conn.commit()
            except aiomysql.InterfaceError:
                pool_init_result = await self.init_db_pool()
                if not pool_init_result:
                    raise NoDatabaseConnectionError
                return await self.remove_profile_picture(user_id)
            except Exception as e:
                logger.warning(f"Error deleting user profile picture: {e}")