prerelease/0.6.0 (#1)
Co-authored-by: David Rodenkirchen <drodenkirchen@linetco.com> Reviewed-on: #1
This commit was merged in pull request #1.
This commit is contained in:
@@ -0,0 +1,86 @@
|
||||
from asyncio import sleep
|
||||
from hashlib import sha256
|
||||
from typing import Optional
|
||||
from string import ascii_letters, digits
|
||||
|
||||
from pymongo.errors import DuplicateKeyError
|
||||
|
||||
from elm.types.User import User
|
||||
|
||||
|
||||
class NameNotAllowedError(Exception):
|
||||
def __init__(self, disallowed_char: str) -> None:
|
||||
self.disallowed_char = disallowed_char
|
||||
|
||||
|
||||
class MailAlreadyInUseError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class UserService:
|
||||
ALLOWED_USER_NAME_SYMBOLS = ascii_letters + digits + "!#$%&*+,-./:;<=>?[]^_{|}~"
|
||||
MAX_USERNAME_LENGTH = 14
|
||||
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
async def get_all_users() -> list[User]:
|
||||
return await User.find_all().to_list()
|
||||
|
||||
@staticmethod
|
||||
async def get_user(user_name: str) -> Optional[User]:
|
||||
return await User.find_one(User.user_name == user_name)
|
||||
|
||||
@staticmethod
|
||||
async def get_user_by_mail(mail: str) -> Optional[User]:
|
||||
return await User.find_one(User.user_mail == mail.lower())
|
||||
|
||||
@staticmethod
|
||||
async def get_user_picture(user_name: str) -> Optional[bytes]:
|
||||
user = await User.find_one(User.user_name == user_name)
|
||||
if user:
|
||||
return user.user_picture
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
async def change_user_password(user_name: str, new_password: str) -> bool:
|
||||
user = await User.find_one(User.user_name == user_name)
|
||||
if not user:
|
||||
return False
|
||||
user.user_password = sha256(new_password.encode(encoding="utf-8")).hexdigest()
|
||||
await user.save()
|
||||
return True
|
||||
|
||||
async def create_user(self, user_name: str, user_mail: str, password_clear_text: str) -> User:
|
||||
disallowed_char = self._check_for_disallowed_char(user_name)
|
||||
if disallowed_char:
|
||||
raise NameNotAllowedError(disallowed_char)
|
||||
|
||||
try:
|
||||
return await User(
|
||||
user_name=user_name,
|
||||
user_mail=user_mail.lower(),
|
||||
user_password=sha256(password_clear_text.encode(encoding="utf-8")).hexdigest()
|
||||
).insert()
|
||||
except DuplicateKeyError:
|
||||
raise MailAlreadyInUseError
|
||||
|
||||
|
||||
async def is_login_valid(self, user_name: str, password_clear_text: str) -> bool:
|
||||
user = await self.get_user(user_name)
|
||||
if not user:
|
||||
user = await self.get_user(user_name.lower()) # Migrated users had all lowercase names
|
||||
user_password_hash = sha256(password_clear_text.encode(encoding="utf-8")).hexdigest()
|
||||
if not user:
|
||||
return False
|
||||
if user.user_fallback_password and user.user_fallback_password == user_password_hash:
|
||||
return True
|
||||
return user.user_password == user_password_hash
|
||||
|
||||
|
||||
def _check_for_disallowed_char(self, name: str) -> Optional[str]:
|
||||
for c in name:
|
||||
if c not in self.ALLOWED_USER_NAME_SYMBOLS:
|
||||
return c
|
||||
return None
|
||||
Reference in New Issue
Block a user