initial commit

This commit is contained in:
David Rodenkirchen
2026-05-20 22:51:52 +02:00
parent 45ad5f164a
commit 85619feed5
43 changed files with 2592 additions and 0 deletions
+150
View File
@@ -0,0 +1,150 @@
import io
import logging
from decimal import Decimal, ROUND_DOWN
from typing import Optional
import httpx
import qrcode
from elm.types import Transaction, User, PayPalConfiguration
logger = logging.getLogger(__name__.split(".")[-1])
class InsufficientFundsError(Exception):
pass
class AccountingService:
def __init__(self, paypal_config: PayPalConfiguration) -> None:
self._paypal_config = paypal_config
async def get_paypal_access_token(self) -> str:
async with httpx.AsyncClient() as client:
response = await client.post(
"https://api-m.sandbox.paypal.com/v1/oauth2/token",
auth=(
self._paypal_config.client_id_sandbox,
self._paypal_config.secret_sandbox,
),
headers={
"Content-Type": "application/x-www-form-urlencoded",
},
data={
"grant_type": "client_credentials"
}
)
response.raise_for_status()
data = response.json()
return data["access_token"]
async def start_paypal_process(self, user_name: str, amount: Decimal) -> str:
async with httpx.AsyncClient() as client:
access_token = await self.get_paypal_access_token()
response = await client.post(
url="https://api-m.sandbox.paypal.com/v2/checkout/orders/",
headers={
"Authorization": f"Bearer {access_token}"
},
json={
"intent": "CAPTURE",
"purchase_units": [
{
"custom_id": user_name,
"amount": {
"currency_code": "EUR",
"value": str(amount)
}
}
]
}
)
approval_url = next(
link["href"]
for link in response.json()["links"]
if link["rel"] == "approve"
)
return approval_url
async def add_balance(self, user_name: str, balance_to_add: Decimal, title: str) -> Decimal:
user = await User.find_one(User.user_name == user_name)
if not user:
raise KeyError("User does not exist")
await Transaction(
user_name=user_name,
value=balance_to_add,
is_debit=False,
title=title
).save()
logger.debug(f"Added balance of {self.make_euro_string_from_decimal(balance_to_add)} to user '{user_name}'")
return await self.get_balance(user_name)
async def remove_balance(self, user_name: str, balance_to_remove: Decimal, title: str) -> Decimal:
current_balance = await self.get_balance(user_name)
if (current_balance - balance_to_remove) < 0:
raise InsufficientFundsError
await Transaction(
user_name=user_name,
value=balance_to_remove,
is_debit=True,
title=title
).save()
logger.debug(
f"Removed balance of {self.make_euro_string_from_decimal(balance_to_remove)} from user '{user_name}'")
return await self.get_balance(user_name)
async def get_balance(self, user_name: str) -> Decimal:
balance_buffer = Decimal("0")
for transaction in await self.get_transaction_history(user_name):
if transaction.is_debit:
balance_buffer -= transaction.value
else:
balance_buffer += transaction.value
return balance_buffer
@staticmethod
async def get_transaction_history(user_name: str) -> list[Transaction]:
user = await User.find_one(User.user_name == user_name)
if not user:
raise KeyError("User does not exist")
return await Transaction.find_many(Transaction.user_name == user_name).to_list()
@staticmethod
def make_euro_string_from_decimal(euros: Optional[Decimal]) -> str:
"""
Internally, all money values are euros as decimal. Only when showing them to the user we generate a string.
"""
if euros is None:
return "0.00 €"
rounded_decimal = str(euros.quantize(Decimal(".01"), rounding=ROUND_DOWN))
return f"{rounded_decimal}"
@staticmethod
def make_payment_qr_image(beneficiary_name, beneficiary_bic, beneficiary_iban, text, amount_euros=None) -> bytes:
text = text.replace("\n", ";")
amount_formatted = "EUR{:.2f}".format(amount_euros) if amount_euros else ""
epc_text = f"""BCD
002
1
SCT
{beneficiary_bic}
{beneficiary_name}
{beneficiary_iban}
{amount_formatted}
{text}
"""
qr = qrcode.QRCode(
version=6,
error_correction=qrcode.constants.ERROR_CORRECT_M,
)
qr.add_data(epc_text)
img = qr.make_image()
img_bytes = io.BytesIO()
img.save(img_bytes)
return img_bytes.getvalue()
+119
View File
@@ -0,0 +1,119 @@
import sys
from datetime import datetime
from pathlib import Path
import logging
import tomllib
from from_root import from_root
from elm.types.ConfigurationTypes import MailingServiceConfiguration, LanInfo, ReceiptPrintingConfiguration, DatabaseConfiguration, PayPalConfiguration
logger = logging.getLogger(__name__.split(".")[-1])
logger.setLevel(logging.DEBUG)
class ConfigurationService:
def __init__(self, config_file_path: Path) -> None:
try:
with open(from_root("VERSION"), "r") as version_file:
self._version = version_file.read().strip()
except FileNotFoundError:
logger.warning("Could not find VERSION file, defaulting to '0.0.0'")
self._version = "0.0.0"
try:
with open(config_file_path, "rb") as config_file:
self._config = tomllib.load(config_file)
except FileNotFoundError:
logger.fatal(f"Could not find config file at \"{config_file_path}\", exiting...")
exit(1)
try:
self._DEFAULT_PROFILE_PICTURE = self._preload_default_profile_picture()
except FileNotFoundError:
logger.fatal("Could not find default profile picture, exiting...")
exit(1)
def get_paypal_configuration(self) -> PayPalConfiguration:
try:
return PayPalConfiguration(
client_id_sandbox=self._config["paypal"]["client_id_sandbox"],
secret_sandbox=self._config["paypal"]["secret_sandbox"],
client_id=self._config["paypal"]["client_id"],
secret=self._config["paypal"]["secret"]
)
except KeyError:
logger.fatal("Error loading DatabaseConfiguration, exiting...")
sys.exit(1)
def get_database_configuration(self) -> DatabaseConfiguration:
try:
return DatabaseConfiguration(
database_address=self._config["database"]["database_address"],
database_name=self._config["database"]["database_name"],
)
except KeyError:
logger.fatal("Error loading DatabaseConfiguration, exiting...")
sys.exit(1)
def get_mailing_service_configuration(self) -> MailingServiceConfiguration:
try:
mailing_configuration = self._config["mailing"]
return MailingServiceConfiguration(
smtp_server=mailing_configuration["smtp_server"],
smtp_port=mailing_configuration["smtp_port"],
sender=mailing_configuration["sender"],
username=mailing_configuration["username"],
password=mailing_configuration["password"]
)
except KeyError:
logger.fatal("Error loading MailingServiceConfiguration, exiting...")
sys.exit(1)
def get_lan_info(self) -> LanInfo:
try:
lan_info = self._config["lan"]
return LanInfo(
name=lan_info["name"],
iteration=lan_info["iteration"],
date_from=datetime.strptime(lan_info["date_from"], "%Y-%m-%d %H:%M:%S"),
date_till=datetime.strptime(lan_info["date_till"], "%Y-%m-%d %H:%M:%S"),
organizer_mail=lan_info["organizer_mail"],
internet_speed_mbs=lan_info["internet_speed_mbs"],
has_wifi=lan_info["has_wifi"],
has_showers=lan_info["has_showers"],
ts3_address=lan_info["ts3_address"],
discord_invite_link=lan_info["discord_invite_link"]
)
except KeyError:
logger.fatal("Error loading LAN Info, exiting...")
sys.exit(1)
def get_receipt_printing_configuration(self) -> ReceiptPrintingConfiguration:
try:
receipt_printing_configuration = self._config["receipt_printing"]
return ReceiptPrintingConfiguration(
host=receipt_printing_configuration["host"],
port=receipt_printing_configuration["port"],
order_print_endpoint=receipt_printing_configuration["order_print_endpoint"],
password=receipt_printing_configuration["password"]
)
except KeyError:
logger.fatal("Error loading Receipt Printing Configuration, exiting...")
sys.exit(1)
def _preload_default_profile_picture(self) -> bytes:
with open(from_root(self._config["misc"]["default_profile_picture"]), "rb") as file:
return file.read()
@property
def APP_VERSION(self) -> str:
return self._version
@property
def DEV_MODE_ACTIVE(self) -> bool:
return self._config["misc"]["dev_mode_active"]
@property
def DEFAULT_PROFILE_PICTURE(self) -> bytes:
return self._DEFAULT_PROFILE_PICTURE
+37
View File
@@ -0,0 +1,37 @@
import logging
from beanie import init_beanie
from pymongo import AsyncMongoClient
from pymongo.asynchronous.collection import AsyncCollection
from elm.types import User, Transaction
from elm.types.ConfigurationTypes import DatabaseConfiguration
logger = logging.getLogger(__name__.split(".")[-1])
logger.setLevel(logging.DEBUG)
class DuplicationError(Exception):
pass
class NoDatabaseConnectionError(Exception):
pass
class DatabaseService:
def __init__(self, db_config: DatabaseConfiguration) -> None:
self._db_config = db_config
self._client = None
self._database = None
self._users = None
async def initialize(self) -> None:
if self._client is None:
self._client = AsyncMongoClient(self._db_config.database_address)
self._database = self._client[self._db_config.database_name]
self._users: AsyncCollection = self._database["users"]
await init_beanie(
database=self._database,
document_models=[User, Transaction]
)
+26
View File
@@ -0,0 +1,26 @@
import secrets
from typing import Optional
from rio import UserSettings
from elm.types.UserSession import UserSession
class LocalData(UserSettings):
stored_session_token: Optional[str] = None
class LocalDataService:
def __init__(self) -> None:
self._session: dict[str, UserSession] = {}
def verify_token(self, token: str) -> Optional[UserSession]:
return self._session.get(token)
def set_session(self, session: UserSession) -> str:
key = secrets.token_hex(32)
self._session[key] = session
return key
def del_session(self, token: Optional[str]) -> None:
if token is not None:
self._session.pop(token, None)
+55
View File
@@ -0,0 +1,55 @@
import logging
from decimal import Decimal
from email.message import EmailMessage
from asyncio import sleep
import aiosmtplib
from elm.services.ConfigurationService import ConfigurationService
from elm.types.User import User
logger = logging.getLogger(__name__.split(".")[-1])
logger.setLevel(logging.DEBUG)
class MailingService:
def __init__(self, configuration_service: ConfigurationService):
self._configuration_service = configuration_service
self._config = self._configuration_service.get_mailing_service_configuration()
async def send_email(self, subject: str, body: str, receiver: str) -> None:
if self._configuration_service.DEV_MODE_ACTIVE:
logger.info(f"Skipped sending mail to {receiver} because demo mode is active.")
logger.info(f"Subject: {subject}")
logger.info(f"Receiver: {receiver}")
logger.info(f"Body: {body}")
await sleep(1)
return
try:
message = EmailMessage()
message["From"] = self._config.sender
message["To"] = receiver
message["Subject"] = subject
message.set_content(body)
await aiosmtplib.send(
message,
hostname=self._config.smtp_server,
port=self._config.smtp_port,
username=self._config.username,
password=self._config.password
)
except Exception as e:
logger.error(f"Failed to send email: {e}")
def generate_account_balance_added_mail_body(self, user: User, added_balance: Decimal, total_balance: Decimal) -> str:
return f"""
Hallo {user.user_name},
deinem Account wurden {added_balance:.2f} € hinzugefügt. Dein neues Guthaben beträgt nun {total_balance:.2f} €.
Wenn du zu dieser Aufladung Fragen hast, stehen wir dir in unserem Discord Server oder per Mail an {self._configuration_service.get_lan_info().organizer_mail} zur Verfügung.
Liebe Grüße
Dein {self._configuration_service.get_lan_info().name} Team
"""
+84
View File
@@ -0,0 +1,84 @@
from asyncio import sleep
from hashlib import sha256
from typing import Optional
from string import ascii_letters, digits
from pymongo.errors import DuplicateKeyError
from elm.types.User import User
class NameNotAllowedError(Exception):
def __init__(self, disallowed_char: str) -> None:
self.disallowed_char = disallowed_char
class MailAlreadyInUseError(Exception):
pass
class UserService:
ALLOWED_USER_NAME_SYMBOLS = ascii_letters + digits + "!#$%&*+,-./:;<=>?[]^_{|}~"
MAX_USERNAME_LENGTH = 14
def __init__(self) -> None:
pass
@staticmethod
async def get_all_users() -> list[User]:
return await User.find_all().to_list()
@staticmethod
async def get_user(user_name: str) -> Optional[User]:
return await User.find_one(User.user_name == user_name)
@staticmethod
async def get_user_by_mail(mail: str) -> Optional[User]:
return await User.find_one(User.user_mail == mail.lower())
@staticmethod
async def get_user_picture(user_name: str) -> Optional[bytes]:
user = await User.find_one(User.user_name == user_name)
if user:
return user.user_picture
return None
@staticmethod
async def change_user_password(user_name: str, new_password: str) -> bool:
user = await User.find_one(User.user_name == user_name)
if not user:
return False
user.user_password = sha256(new_password.encode(encoding="utf-8")).hexdigest()
await user.save()
return True
async def create_user(self, user_name: str, user_mail: str, password_clear_text: str) -> User:
disallowed_char = self._check_for_disallowed_char(user_name)
if disallowed_char:
raise NameNotAllowedError(disallowed_char)
try:
return await User(
user_name=user_name,
user_mail=user_mail.lower(),
user_password=sha256(password_clear_text.encode(encoding="utf-8")).hexdigest()
).insert()
except DuplicateKeyError:
raise MailAlreadyInUseError
async def is_login_valid(self, user_name: str, password_clear_text: str) -> bool:
user = await self.get_user(user_name)
user_password_hash = sha256(password_clear_text.encode(encoding="utf-8")).hexdigest()
if not user:
return False
if user.user_fallback_password and user.user_fallback_password == user_password_hash:
return True
return user.user_password == user_password_hash
def _check_for_disallowed_char(self, name: str) -> Optional[str]:
for c in name:
if c not in self.ALLOWED_USER_NAME_SYMBOLS:
return c
return None
+6
View File
@@ -0,0 +1,6 @@
from .ConfigurationService import ConfigurationService
from .DatabaseService import DatabaseService
from .UserService import UserService, NameNotAllowedError, MailAlreadyInUseError
from .LocalDataService import LocalData, LocalDataService
from .MailingService import MailingService
from .AccountingService import AccountingService