84 lines
2.7 KiB
Python
84 lines
2.7 KiB
Python
import hmac
from hashlib import sha256
from string import ascii_letters, digits
from typing import Optional

from pymongo.errors import DuplicateKeyError

from elm.types.User import User
|
|
|
|
|
|
class NameNotAllowedError(Exception):
    """Signal that a user name contains a character that is not permitted.

    Attributes:
        disallowed_char: The offending character, exactly as it was handed
            to the constructor.
    """

    def __init__(self, disallowed_char: str) -> None:
        # Keep the character on the instance so handlers can report it.
        self.disallowed_char = disallowed_char
|
|
|
|
|
|
class MailAlreadyInUseError(Exception):
    """Signal that a new account could not be stored because of a duplicate key.

    Carries no payload of its own; it only distinguishes this failure from
    other exceptions.
    """
|
|
|
|
|
|
class UserService:
    """Account operations built on the ``User`` document model.

    Covers lookups, password changes, account creation and login checks.
    Passwords are never stored in clear text; only their SHA-256 hex digest
    is persisted.
    """

    # Every character a user name may be composed of.
    ALLOWED_USER_NAME_SYMBOLS = ascii_letters + digits + "!#$%&*+,-./:;<=>?[]^_{|}~"
    # NOTE(review): this limit is not enforced anywhere inside this class;
    # presumably callers validate the length -- confirm.
    MAX_USERNAME_LENGTH = 14

    def __init__(self) -> None:
        """Create a service instance; it keeps no state of its own."""

    @staticmethod
    def _hash_password(password_clear_text: str) -> str:
        """Return the SHA-256 hex digest of the UTF-8 encoded password.

        NOTE(review): this is a fast, unsalted hash. A dedicated key
        derivation function (e.g. ``hashlib.scrypt``) would be preferable,
        but switching would invalidate every digest already stored, so the
        scheme is left unchanged here.
        """
        return sha256(password_clear_text.encode(encoding="utf-8")).hexdigest()

    @staticmethod
    def _digests_match(stored: object, candidate: str) -> bool:
        """Compare a stored digest with a candidate digest in constant time.

        A stored value that is not a string (for example ``None``) never
        matches, which mirrors what a plain ``==`` against a string yields.
        """
        if not isinstance(stored, str):
            return False
        # Compare as bytes: compare_digest rejects non-ASCII ``str`` input.
        return hmac.compare_digest(stored.encode("utf-8"), candidate.encode("utf-8"))

    @staticmethod
    async def get_all_users() -> list[User]:
        """Return every stored user as a list."""
        return await User.find_all().to_list()

    @staticmethod
    async def get_user(user_name: str) -> Optional[User]:
        """Return the user whose ``user_name`` equals *user_name*, if any."""
        return await User.find_one(User.user_name == user_name)

    @staticmethod
    async def get_user_by_mail(mail: str) -> Optional[User]:
        """Return the user registered under *mail*, if any.

        The address is lower-cased before the lookup, matching how
        ``create_user`` stores it.
        """
        return await User.find_one(User.user_mail == mail.lower())

    @staticmethod
    async def get_user_picture(user_name: str) -> Optional[bytes]:
        """Return the ``user_picture`` of the named user.

        Returns:
            The picture attribute of the user, or ``None`` when no user with
            that name exists.
        """
        user = await UserService.get_user(user_name)
        if not user:
            return None
        return user.user_picture

    @staticmethod
    async def change_user_password(user_name: str, new_password: str) -> bool:
        """Replace the stored password digest of the named user.

        Args:
            user_name: Name of the account to update.
            new_password: New password in clear text; only its digest is saved.

        Returns:
            ``True`` once the user has been saved, ``False`` when no user
            with that name exists.
        """
        user = await UserService.get_user(user_name)
        if not user:
            return False
        user.user_password = UserService._hash_password(new_password)
        await user.save()
        return True

    async def create_user(
        self, user_name: str, user_mail: str, password_clear_text: str
    ) -> User:
        """Validate the name, then insert a new user.

        Args:
            user_name: Desired user name; every character must be part of
                ``ALLOWED_USER_NAME_SYMBOLS``.
            user_mail: Mail address; stored lower-cased.
            password_clear_text: Password in clear text; only its digest is
                stored.

        Returns:
            The result of inserting the new ``User`` document.

        Raises:
            NameNotAllowedError: *user_name* contains a forbidden character.
            MailAlreadyInUseError: The insert hit a duplicate key.
        """
        disallowed_char = self._check_for_disallowed_char(user_name)
        if disallowed_char:
            raise NameNotAllowedError(disallowed_char)

        new_user = User(
            user_name=user_name,
            user_mail=user_mail.lower(),
            user_password=self._hash_password(password_clear_text),
        )
        # Only the insert can hit a unique index, so only it is guarded.
        try:
            return await new_user.insert()
        except DuplicateKeyError as err:
            # NOTE(review): any unique index could trigger this, not only the
            # one on the mail address -- confirm against the User model.
            raise MailAlreadyInUseError from err

    async def is_login_valid(self, user_name: str, password_clear_text: str) -> bool:
        """Check a user name / password pair.

        The password is accepted when its digest equals either the user's
        fallback password digest (if one is set) or the regular one.

        Returns:
            ``True`` for a matching digest, ``False`` for a wrong password
            or an unknown user.
        """
        user = await self.get_user(user_name)
        user_password_hash = self._hash_password(password_clear_text)
        if not user:
            return False

        fallback_hash = user.user_fallback_password
        if fallback_hash and self._digests_match(fallback_hash, user_password_hash):
            return True
        return self._digests_match(user.user_password, user_password_hash)

    def _check_for_disallowed_char(self, name: str) -> Optional[str]:
        """Return the first character of *name* that is not allowed.

        Returns:
            The first character missing from ``ALLOWED_USER_NAME_SYMBOLS``,
            or ``None`` when every character is allowed (which includes the
            empty string).
        """
        allowed = self.ALLOWED_USER_NAME_SYMBOLS
        return next((char for char in name if char not in allowed), None)
|