1753d67752
Co-authored-by: David Rodenkirchen <drodenkirchen@linetco.com> Reviewed-on: #1
218 lines
8.5 KiB
Python
218 lines
8.5 KiB
Python
import io
|
|
import logging
|
|
from decimal import Decimal, ROUND_DOWN
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
import qrcode
|
|
|
|
from elm.types import Transaction, User, PayPalConfiguration
|
|
from elm.services import MailingService, ConfigurationService
|
|
|
|
logger = logging.getLogger(__name__.split(".")[-1])
|
|
|
|
|
|
class InsufficientFundsError(Exception):
|
|
pass
|
|
|
|
|
|
class AccountingService:
|
|
PAYPAL_SANDBOX_URL = "https://api-m.sandbox.paypal.com"
|
|
PAYPAL_PROD_URL = "https://api-m.paypal.com"
|
|
|
|
def __init__(self, configuration_service: ConfigurationService, mailing_service: MailingService) -> None:
|
|
self._configuration_service = configuration_service
|
|
self._paypal_config: PayPalConfiguration = configuration_service.get_paypal_configuration()
|
|
self._pending_paypal_orders: dict[str, tuple[str, Decimal]] = {}
|
|
self._mailing_service = mailing_service
|
|
|
|
async def has_user_open_orders(self, user_name: str) -> bool:
|
|
for pending_paypal_order in self._pending_paypal_orders.values():
|
|
if pending_paypal_order[0] == user_name:
|
|
return True
|
|
return False
|
|
|
|
async def get_paypal_access_token(self) -> str:
|
|
url = self.PAYPAL_SANDBOX_URL if self._configuration_service.DEV_MODE_ACTIVE else self.PAYPAL_PROD_URL
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.post(
|
|
f"{url}/v1/oauth2/token",
|
|
auth=(
|
|
self._paypal_config.client_id_sandbox if self._configuration_service.DEV_MODE_ACTIVE else self._paypal_config.client_id,
|
|
self._paypal_config.secret_sandbox if self._configuration_service.DEV_MODE_ACTIVE else self._paypal_config.secret,
|
|
),
|
|
headers={
|
|
"Content-Type": "application/x-www-form-urlencoded",
|
|
},
|
|
data={
|
|
"grant_type": "client_credentials"
|
|
}
|
|
)
|
|
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
return data["access_token"]
|
|
|
|
async def start_paypal_process(self, user_name: str, amount: Decimal) -> str:
|
|
url = self.PAYPAL_SANDBOX_URL if self._configuration_service.DEV_MODE_ACTIVE else self.PAYPAL_PROD_URL
|
|
return_domain = "http://localhost:8000" if self._configuration_service.DEV_MODE_ACTIVE else self._configuration_service.BASE_URL
|
|
amount = amount.quantize(Decimal(".01"))
|
|
async with httpx.AsyncClient() as client:
|
|
access_token = await self.get_paypal_access_token()
|
|
response = await client.post(
|
|
url=f"{url}/v2/checkout/orders/",
|
|
headers={
|
|
"Authorization": f"Bearer {access_token}"
|
|
},
|
|
json={
|
|
"intent": "CAPTURE",
|
|
"purchase_units": [
|
|
{
|
|
"custom_id": user_name,
|
|
"amount": {
|
|
"currency_code": "EUR",
|
|
"value": str(amount)
|
|
}
|
|
}
|
|
],
|
|
"payment_source": {
|
|
"paypal": {
|
|
"experience_context": {
|
|
"return_url": f"{return_domain}/return-paypal",
|
|
"cancel_url": f"{return_domain}/cancel-paypal",
|
|
"user_action": "PAY_NOW",
|
|
"shipping_preference": "NO_SHIPPING"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
)
|
|
try:
|
|
payer_action_url = next(
|
|
link["href"]
|
|
for link in response.json()["links"]
|
|
if link["rel"] == "payer-action"
|
|
)
|
|
except StopIteration:
|
|
logger.error("No payer action url found: %s", response.text)
|
|
return "#"
|
|
|
|
self._pending_paypal_orders[response.json()["id"]] = (user_name, amount)
|
|
|
|
return payer_action_url
|
|
|
|
async def finalize_paypal_process(self, order_id: str) -> bool:
|
|
url = self.PAYPAL_SANDBOX_URL if self._configuration_service.DEV_MODE_ACTIVE else self.PAYPAL_PROD_URL
|
|
async with httpx.AsyncClient() as client:
|
|
access_token = await self.get_paypal_access_token()
|
|
response = await client.get(
|
|
url=f"{url}/v2/checkout/orders/{order_id}",
|
|
headers={
|
|
"Authorization": f"Bearer {access_token}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
)
|
|
is_approved = response.json()["status"] == "APPROVED"
|
|
|
|
if is_approved:
|
|
response = await client.post(
|
|
f"{url}/v2/checkout/orders/{order_id}/capture",
|
|
headers={
|
|
"Authorization": f"Bearer {access_token}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
)
|
|
is_completed = response.json()["status"] == "COMPLETED"
|
|
if is_completed:
|
|
await self.add_balance(self._pending_paypal_orders[order_id][0], self._pending_paypal_orders[order_id][1], "PayPal Aufladung")
|
|
self._pending_paypal_orders.pop(order_id)
|
|
return True
|
|
return False
|
|
|
|
async def add_balance(self, user_name: str, balance_to_add: Decimal, title: str) -> Decimal:
|
|
user = await User.find_one(User.user_name == user_name)
|
|
if not user:
|
|
raise KeyError("User does not exist")
|
|
await Transaction(
|
|
user_name=user_name,
|
|
value=balance_to_add,
|
|
is_debit=False,
|
|
title=title
|
|
).save()
|
|
logger.debug(f"Added balance of {self.make_euro_string_from_decimal(balance_to_add)} to user '{user_name}'")
|
|
new_balance = await self.get_balance(user_name)
|
|
await self._mailing_service.send_email(
|
|
"Dein Guthaben wurde aufgeladen",
|
|
self._mailing_service.generate_account_balance_added_mail_body(user, balance_to_add, new_balance),
|
|
user.user_mail
|
|
)
|
|
return new_balance
|
|
|
|
async def remove_balance(self, user_name: str, balance_to_remove: Decimal, title: str) -> Decimal:
|
|
current_balance = await self.get_balance(user_name)
|
|
if (current_balance - balance_to_remove) < 0:
|
|
raise InsufficientFundsError
|
|
|
|
await Transaction(
|
|
user_name=user_name,
|
|
value=balance_to_remove,
|
|
is_debit=True,
|
|
title=title
|
|
).save()
|
|
logger.debug(
|
|
f"Removed balance of {self.make_euro_string_from_decimal(balance_to_remove)} from user '{user_name}'")
|
|
return await self.get_balance(user_name)
|
|
|
|
async def get_balance(self, user_name: str) -> Decimal:
|
|
balance_buffer = Decimal("0")
|
|
for transaction in await self.get_transaction_history(user_name):
|
|
if transaction.is_debit:
|
|
balance_buffer -= transaction.value
|
|
else:
|
|
balance_buffer += transaction.value
|
|
return balance_buffer
|
|
|
|
@staticmethod
|
|
async def get_transaction_history(user_name: str) -> list[Transaction]:
|
|
user = await User.find_one(User.user_name == user_name)
|
|
if not user:
|
|
raise KeyError("User does not exist")
|
|
return await Transaction.find_many(Transaction.user_name == user_name).to_list()
|
|
|
|
@staticmethod
|
|
def make_euro_string_from_decimal(euros: Optional[Decimal]) -> str:
|
|
"""
|
|
Internally, all money values are euros as decimal. Only when showing them to the user we generate a string.
|
|
"""
|
|
if euros is None:
|
|
return "0.00 €"
|
|
rounded_decimal = str(euros.quantize(Decimal(".01"), rounding=ROUND_DOWN))
|
|
return f"{rounded_decimal} €"
|
|
|
|
@staticmethod
|
|
def make_payment_qr_image(beneficiary_name, beneficiary_bic, beneficiary_iban, text, amount_euros=None) -> bytes:
|
|
text = text.replace("\n", ";")
|
|
amount_formatted = "EUR{:.2f}".format(amount_euros) if amount_euros else ""
|
|
epc_text = f"""BCD
|
|
002
|
|
1
|
|
SCT
|
|
{beneficiary_bic}
|
|
{beneficiary_name}
|
|
{beneficiary_iban}
|
|
{amount_formatted}
|
|
|
|
|
|
{text}
|
|
"""
|
|
qr = qrcode.QRCode(
|
|
version=6,
|
|
error_correction=qrcode.constants.ERROR_CORRECT_M,
|
|
)
|
|
qr.add_data(epc_text)
|
|
img = qr.make_image()
|
|
img_bytes = io.BytesIO()
|
|
img.save(img_bytes)
|
|
return img_bytes.getvalue()
|