Homepage/ezgg_website/services/authentication_service.py
David Rodenkirchen 85f7364ed8 add auth service
2024-06-03 09:05:02 +02:00

202 lines
7.8 KiB
Python

import hmac
import logging
import warnings
from dataclasses import dataclass
from enum import IntEnum
from hashlib import sha256
from pathlib import Path
from typing import Type, Union, Self, Optional
__author__ = "David 'Typhus' Rodenkirchen"
__license__ = "WTFPL @ wtfpl.net"
__email__ = "vorstand@ezgg-ev.de"
"""
This simple authentication service is meant to be dropped into a Rio application.
Instantiate the service, add it to the rio.App attachments and implement the following session hook:
```
authentication_service = FileBasedAuthenticationService("/path/to/pw_file.txt")
rio.App(
[...]
default_attachments=[authentication_service],
on_session_start=lambda session: setattr(session, "authentication_status", AuthenticationStatus(None, None))
)
```
You can log the user in on a session like this:
```
permission_level = self.authentication_service.authenticate(username, password)
if permission_level is not None:  # PermissionLevel.GUEST is 0 and therefore falsy
    self.session.authentication_status = AuthenticationStatus(username, permission_level)
```
You can also perform a logout on the session:
```
self.session.authentication_status.log_out()
```
To install this service, copy this file over to your project and import it like this:
```
from .authentication_service import BrokenEntryError, PermissionLevel, FileBasedAuthenticationService, AuthenticationStatus
```
"""
class PermissionLevel(IntEnum):
"""
Subclass and modify/add levels at your own discretion
"""
GUEST = 0
USER = 10
MODERATOR = 20
ADMIN = 30
SUPER_ADMIN = 40
@classmethod
def from_int(cls, value: int) -> Self:
"""
Returns the highest available level for the provided value.
E.g. if 10 and 20 are available and the input is 19, the returning PermissionLevel is 10.
"""
last = cls(0)
for level in cls:
if value < level.value:
return last
last = level
return last
class BrokenEntryError(Exception):
    """
    Signals that a line of the password file could not be parsed.
    """

    def __init__(self, original_error: str):
        """
        :param original_error: Text of the underlying parsing error; appended to the message.
        """
        message = "An entry in your password file is broken: {}".format(original_error)
        super().__init__(message)
class FileBasedAuthenticationService:
    """
    A simple authentication service that allows authentication against a file and offers an API to insert and
    update user accounts. Supports multiple permission levels.

    Each line of the password file holds one user in the form
    ``<username><DELIMITER><password><DELIMITER><permission level value>``.

    NOTE(review): with ``is_sha256`` enabled the passwords are stored as plain, unsalted SHA256 hex digests.
    A salted password hashing function would be stronger, but switching would invalidate existing files.
    """
    DELIMITER = "|"  # This symbol is used to delimit username and password. May need to change if you allow this symbol in usernames

    def __init__(self, password_file_path: Union[str, Path], is_sha256: bool = True,
                 permission_levels: Type[PermissionLevel] = PermissionLevel,
                 raise_broken_entries: bool = True) -> None:
        """
        :param password_file_path: Path to file containing username/password pairs.
        :param is_sha256: Specify that the passwords in the password file are SHA256 hashed. It is recommended to never disable this in production
        :param permission_levels: IntEnum Specifying permission level names
        :param raise_broken_entries: If this is set to false, broken entries are skipped instead of raising an error on them.
        :raises BrokenEntryError: If an entry is broken and ``raise_broken_entries`` is true, or if the
                                  file cannot be decoded as text at all.
        """
        self._permission_levels = permission_levels
        self._password_file_path = password_file_path
        self._is_sha256 = is_sha256
        self._users: dict[str, tuple[str, PermissionLevel]] = {}
        self._logger = logging.getLogger("FileBasedAuthenticationService")
        self._raise_broken_entries = raise_broken_entries
        if not self._is_sha256:
            warnings.warn("\nYou have specified that the password file is not hashed."
                          "\nThis is incredibly unsafe for both you and your users."
                          "\nPlease do not use this setting in production environments!")
        lines: list[str] = []
        try:
            with open(self._password_file_path, "r") as password_file:
                lines = password_file.readlines()
        except FileNotFoundError:
            self._logger.warning(f"Password file at '{self._password_file_path}' not found. "
                                 f"Creating new one on next sync.")
        except UnicodeDecodeError as e:
            # An undecodable file cannot be skipped entry by entry. Treating it as empty would wipe
            # it on the next sync, so fail loudly instead.
            raise BrokenEntryError(str(e)) from e
        for line in lines:
            # The try block covers a single entry, so one broken line does not stop the remaining
            # users from being loaded.
            try:
                username, password, permission_level_value = line.strip().split(self.DELIMITER)
                permission_level = self._permission_levels.from_int(int(permission_level_value))
            except (ValueError, TypeError) as e:
                if self._raise_broken_entries:
                    raise BrokenEntryError(str(e)) from e
                self._logger.warning(f"A broken Entry was skipped: {line}. Will be deleted on next sync!")
                continue
            self._users[username] = (password, permission_level)

    def authenticate(self, username: str, password: str) -> Optional[PermissionLevel]:
        """
        Checks if the username and the password match. If it does, the PermissionLevel is returned. If not, `None` is returned.
        :param username: Username to check
        :param password: Password to check
        :return: PermissionLevel of user or None
        """
        try:
            stored_password, permission_level = self._users[username]
        except KeyError:
            return None
        candidate = sha256(password.encode("utf-8")).hexdigest() if self._is_sha256 else password
        # Constant-time comparison, so response timing does not reveal how much of the secret matched.
        # Bytes are compared because compare_digest rejects non-ASCII str arguments.
        if hmac.compare_digest(stored_password.encode("utf-8"), candidate.encode("utf-8")):
            return permission_level
        return None

    def add_user(self, username: str, password: str, permission_level: PermissionLevel) -> None:
        """
        Adds a new user
        :param username: Username for the new user
        :param password: Password for the new user
        :param permission_level: Permission Level for the new user
        :raises ValueError: If the user already exists, or if the username (or an unhashed password)
                            contains the delimiter or a line break.
        """
        if username in self._users:
            raise ValueError(f"User {username} already exists. Use FileBasedAuthenticationService.edit_user to change it's values.")
        self._ensure_storable(username, "username")
        self._users[username] = (self._to_stored_password(password), permission_level)
        self._sync()

    def delete_user(self, username: str) -> None:
        """
        Deletes an existing user. Unknown usernames are ignored and do not cause the file to be rewritten.
        :param username: Username of the user to delete
        """
        if self._users.pop(username, None) is not None:
            self._sync()

    def edit_user(self, username: str, new_permission_level: Optional[PermissionLevel] = None, new_password: Optional[str] = None) -> None:
        """
        Edits an existing user.
        :param username: Username of the user to edit
        :param new_permission_level: New permission level. Set to None if permission level should not change.
        :param new_password: New password. Set to None if password should not change.
        :raises ValueError: If the user does not exist, or if an unhashed new password contains the
                            delimiter or a line break.
        """
        if username not in self._users:
            raise ValueError(f"User {username} does not exists.")
        stored_password, permission_level = self._users[username]
        # An empty string is treated like None here, so a password cannot be blanked by accident.
        if new_password:
            # Validation happens before anything is stored, so a rejected edit changes nothing.
            stored_password = self._to_stored_password(new_password)
        # Compare against None explicitly: a level with the value 0 (e.g. GUEST) is falsy.
        if new_permission_level is not None:
            permission_level = new_permission_level
        self._users[username] = (stored_password, permission_level)
        self._sync()

    def _to_stored_password(self, password: str) -> str:
        """Returns the password in the form in which it is written to the password file."""
        if self._is_sha256:
            return sha256(password.encode("utf-8")).hexdigest()
        # Unhashed passwords end up in the file verbatim and must not break its line format.
        self._ensure_storable(password, "password")
        return password

    def _ensure_storable(self, value: str, field_name: str) -> None:
        """Raises ValueError if writing the value verbatim would corrupt the password file."""
        if self.DELIMITER in value:
            raise ValueError(f"Symbol '{self.DELIMITER}' detected in {field_name}. This is not allowed.")
        # A line break would split the entry and could be used to smuggle in an additional user.
        if "\n" in value or "\r" in value:
            raise ValueError(f"Line break detected in {field_name}. This is not allowed.")

    def _sync(self) -> None:
        """Rewrites the password file from the users currently held in memory."""
        with open(self._password_file_path, "w") as password_file:
            password_file.writelines(
                f"{user}{self.DELIMITER}{pw_and_perm[0]}{self.DELIMITER}{pw_and_perm[1].value}\n"
                for user, pw_and_perm in self._users.items()
            )
@dataclass
class AuthenticationStatus:
    """
    Login state kept per session: who is logged in and with which permission level.
    Both fields are None while nobody is logged in.
    """
    username: Optional[str]
    permission_level: Optional[PermissionLevel]

    def is_logged_in(self) -> bool:
        """Returns True as long as a username is set."""
        nobody_logged_in = self.username is None
        return not nobody_logged_in

    def log_out(self) -> None:
        """Clears the username and the permission level."""
        self.username, self.permission_level = None, None