import io
import logging
from decimal import Decimal, ROUND_DOWN
from typing import Optional

import httpx
import qrcode

from elm.types import Transaction, User, PayPalConfiguration
from elm.services import MailingService, ConfigurationService

logger = logging.getLogger(__name__.split(".")[-1])


class InsufficientFundsError(Exception):
    """Raised when a debit would push a user's balance below zero."""


class AccountingService:
    """Keeps user balances as a ledger of transactions and handles PayPal top-ups.

    Balances are never stored directly; they are derived by summing the
    credit and debit ``Transaction`` documents of a user. PayPal orders that
    were created but not yet captured are tracked in memory only, so they do
    not survive a restart of the process.
    """

    PAYPAL_SANDBOX_URL = "https://api-m.sandbox.paypal.com"
    PAYPAL_PROD_URL = "https://api-m.paypal.com"

    def __init__(self, configuration_service: ConfigurationService, mailing_service: MailingService) -> None:
        """Store the collaborating services and load the PayPal configuration."""
        self._configuration_service = configuration_service
        self._paypal_config: PayPalConfiguration = configuration_service.get_paypal_configuration()
        # Maps PayPal order id -> (user name, amount) for orders awaiting capture.
        self._pending_paypal_orders: dict[str, tuple[str, Decimal]] = {}
        self._mailing_service = mailing_service

    def _paypal_api_url(self) -> str:
        """Return the PayPal REST base URL: sandbox in dev mode, production otherwise."""
        if self._configuration_service.DEV_MODE_ACTIVE:
            return self.PAYPAL_SANDBOX_URL
        return self.PAYPAL_PROD_URL

    async def has_user_open_orders(self, user_name: str) -> bool:
        """Return True if at least one pending PayPal order belongs to ``user_name``."""
        return any(
            order_user == user_name
            for order_user, _amount in self._pending_paypal_orders.values()
        )

    async def get_paypal_access_token(self) -> str:
        """Request an OAuth2 access token from PayPal via the client-credentials grant.

        Sandbox credentials are used in dev mode, production credentials otherwise.

        Raises:
            httpx.HTTPStatusError: if PayPal answers with a 4xx/5xx status.
        """
        if self._configuration_service.DEV_MODE_ACTIVE:
            credentials = (self._paypal_config.client_id_sandbox, self._paypal_config.secret_sandbox)
        else:
            credentials = (self._paypal_config.client_id, self._paypal_config.secret)

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self._paypal_api_url()}/v1/oauth2/token",
                auth=credentials,
                headers={
                    "Content-Type": "application/x-www-form-urlencoded",
                },
                data={
                    "grant_type": "client_credentials",
                },
            )
            response.raise_for_status()
            return response.json()["access_token"]

    async def start_paypal_process(self, user_name: str, amount: Decimal) -> str:
        """Create a PayPal order for ``amount`` euros and return the approval URL.

        The amount is quantized to two decimal places before it is sent. On
        success the order is remembered as pending so that
        ``finalize_paypal_process`` can credit it later.

        Returns:
            The ``payer-action`` link of the created order, or ``"#"`` if
            PayPal's response did not contain one.
        """
        api_url = self._paypal_api_url()
        if self._configuration_service.DEV_MODE_ACTIVE:
            return_domain = "http://localhost:8000"
        else:
            return_domain = self._configuration_service.BASE_URL
        amount = amount.quantize(Decimal(".01"))

        async with httpx.AsyncClient() as client:
            access_token = await self.get_paypal_access_token()
            response = await client.post(
                url=f"{api_url}/v2/checkout/orders/",
                headers={
                    "Authorization": f"Bearer {access_token}",
                },
                json={
                    "intent": "CAPTURE",
                    "purchase_units": [
                        {
                            "custom_id": user_name,
                            "amount": {
                                "currency_code": "EUR",
                                "value": str(amount),
                            },
                        }
                    ],
                    "payment_source": {
                        "paypal": {
                            "experience_context": {
                                "return_url": f"{return_domain}/return-paypal",
                                "cancel_url": f"{return_domain}/cancel-paypal",
                                "user_action": "PAY_NOW",
                                "shipping_preference": "NO_SHIPPING",
                            }
                        }
                    },
                },
            )

        order = response.json()
        # A payload without "links" must take the same fallback path as a
        # payload whose links lack a payer-action entry, instead of raising.
        payer_action_url = next(
            (link["href"] for link in order.get("links", []) if link["rel"] == "payer-action"),
            None,
        )
        if payer_action_url is None:
            logger.error("No payer action url found: %s", response.text)
            return "#"

        self._pending_paypal_orders[order["id"]] = (user_name, amount)
        return payer_action_url

    async def finalize_paypal_process(self, order_id: str) -> bool:
        """Capture an approved PayPal order and credit its amount to the user.

        Returns:
            True if the order was captured and the balance was added; False if
            the order is not known as pending, is not in status ``APPROVED``,
            or the capture did not end in status ``COMPLETED``.
        """
        # Check before talking to PayPal: capturing an order we cannot
        # attribute to a user would take the money without crediting anyone.
        pending_order = self._pending_paypal_orders.get(order_id)
        if pending_order is None:
            logger.warning("No pending PayPal order with id '%s', nothing to finalize", order_id)
            return False
        user_name, amount = pending_order

        api_url = self._paypal_api_url()
        async with httpx.AsyncClient() as client:
            access_token = await self.get_paypal_access_token()
            headers = {
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
            }

            response = await client.get(
                url=f"{api_url}/v2/checkout/orders/{order_id}",
                headers=headers,
            )
            # .get(): a payload without a "status" field means "not approved".
            if response.json().get("status") != "APPROVED":
                return False

            response = await client.post(
                f"{api_url}/v2/checkout/orders/{order_id}/capture",
                headers=headers,
            )
            if response.json().get("status") != "COMPLETED":
                return False

        await self.add_balance(user_name, amount, "PayPal Aufladung")
        self._pending_paypal_orders.pop(order_id, None)
        return True

    async def add_balance(self, user_name: str, balance_to_add: Decimal, title: str) -> Decimal:
        """Record a credit transaction for the user and notify them by mail.

        Returns:
            The user's balance after the credit.

        Raises:
            KeyError: if no user named ``user_name`` exists.
        """
        user = await User.find_one(User.user_name == user_name)
        if not user:
            raise KeyError("User does not exist")

        await Transaction(
            user_name=user_name,
            value=balance_to_add,
            is_debit=False,
            title=title,
        ).save()
        logger.debug(
            "Added balance of %s to user '%s'",
            self.make_euro_string_from_decimal(balance_to_add),
            user_name,
        )

        new_balance = await self.get_balance(user_name)
        await self._mailing_service.send_email(
            "Dein Guthaben wurde aufgeladen",
            self._mailing_service.generate_account_balance_added_mail_body(user, balance_to_add, new_balance),
            user.user_mail,
        )
        return new_balance

    async def remove_balance(self, user_name: str, balance_to_remove: Decimal, title: str) -> Decimal:
        """Record a debit transaction for the user.

        Returns:
            The user's balance after the debit.

        Raises:
            InsufficientFundsError: if the debit would make the balance negative.
            KeyError: if no user named ``user_name`` exists.
        """
        current_balance = await self.get_balance(user_name)
        if current_balance - balance_to_remove < 0:
            raise InsufficientFundsError

        await Transaction(
            user_name=user_name,
            value=balance_to_remove,
            is_debit=True,
            title=title,
        ).save()
        logger.debug(
            "Removed balance of %s from user '%s'",
            self.make_euro_string_from_decimal(balance_to_remove),
            user_name,
        )
        return await self.get_balance(user_name)

    async def get_balance(self, user_name: str) -> Decimal:
        """Return the user's balance: the sum of credits minus the sum of debits.

        Raises:
            KeyError: if no user named ``user_name`` exists.
        """
        balance = Decimal("0")
        for transaction in await self.get_transaction_history(user_name):
            balance += -transaction.value if transaction.is_debit else transaction.value
        return balance

    @staticmethod
    async def get_transaction_history(user_name: str) -> list[Transaction]:
        """Return all transactions stored for the user.

        Raises:
            KeyError: if no user named ``user_name`` exists.
        """
        user = await User.find_one(User.user_name == user_name)
        if not user:
            raise KeyError("User does not exist")
        return await Transaction.find_many(Transaction.user_name == user_name).to_list()

    @staticmethod
    def make_euro_string_from_decimal(euros: Optional[Decimal]) -> str:
        """Format a euro amount for display, e.g. ``Decimal("1.239")`` -> ``"1.23 €"``.

        Internally, all money values are euros as decimal. Only when showing
        them to the user do we generate a string. The value is rounded down
        to two decimal places; ``None`` is shown as ``"0.00 €"``.
        """
        if euros is None:
            return "0.00 €"
        rounded_decimal = euros.quantize(Decimal(".01"), rounding=ROUND_DOWN)
        return f"{rounded_decimal} €"

    @staticmethod
    def make_payment_qr_image(beneficiary_name, beneficiary_bic, beneficiary_iban, text, amount_euros=None) -> bytes:
        """Render a SEPA credit transfer (EPC) QR code and return the encoded image bytes.

        Line breaks in ``text`` are replaced by ``;`` because every field of
        the payload occupies exactly one line. A missing or zero amount
        leaves the amount field empty.
        """
        text = text.replace("\n", ";")
        amount_formatted = f"EUR{amount_euros:.2f}" if amount_euros else ""

        # NOTE(review): the payload is laid out one field per line following
        # the EPC QR code guidelines; the two empty fields before the text are
        # purpose code and structured reference. Confirm against the output
        # of the previous version.
        epc_lines = [
            "BCD",  # service tag
            "002",  # version
            "1",  # character set
            "SCT",  # identification: SEPA credit transfer
            f"{beneficiary_bic}",
            f"{beneficiary_name}",
            f"{beneficiary_iban}",
            amount_formatted,
            "",  # purpose code (unused)
            "",  # structured remittance reference (unused)
            text,  # unstructured remittance information
        ]
        epc_text = "\n".join(epc_lines) + "\n"

        qr = qrcode.QRCode(
            version=6,
            error_correction=qrcode.constants.ERROR_CORRECT_M,
        )
        qr.add_data(epc_text)
        img = qr.make_image()

        img_bytes = io.BytesIO()
        img.save(img_bytes)
        return img_bytes.getvalue()