diff --git a/ezgg_website/__init__.py b/ezgg_website/__init__.py index c12f4a0..e3ad404 100644 --- a/ezgg_website/__init__.py +++ b/ezgg_website/__init__.py @@ -8,6 +8,7 @@ from from_root import from_root from . import pages from . import components as comps from . import services +from .services import AuthenticationStatus rio.Icon.register_single_icon( set_name="custom", @@ -105,10 +106,15 @@ themes = [ async def on_session_start(s: rio.Session) -> None: + s.authentication_status = AuthenticationStatus(None, None) await s.set_title("EZ GG e.V.") # Create the Rio app database_service = services.DatabaseService() +authentication_service = services.FileBasedAuthenticationService( + from_root("pw.txt"), + raise_broken_entries=False +) app = rio.App( name='ezgg-website', pages=[ @@ -157,7 +163,7 @@ app = rio.App( icon=from_root("ezgg_website/assets/icons/favicon.png"), assets_dir=Path(__file__).parent / "assets", on_session_start=on_session_start, - default_attachments=[database_service], + default_attachments=[database_service, authentication_service], meta_tags={ # "content-Type": "text/html; utf-8", # Temporarily disabled until "http-equiv" is supported by Rio # "Pragma": "cache", # Temporarily disabled until "http-equiv" is supported by Rio diff --git a/ezgg_website/pages/home.py b/ezgg_website/pages/home.py index e468099..998b7c3 100644 --- a/ezgg_website/pages/home.py +++ b/ezgg_website/pages/home.py @@ -19,6 +19,7 @@ class Home(rio.Component): await self.session.set_title("EZ GG e.V.") def build(self) -> rio.Component: + self.session.get_permission_level = True return build_page(rio.Column( rio.Rectangle( content=rio.Markdown("### Willkommen auf der Homepage der EZ GG e.V.", margin=1), diff --git a/ezgg_website/services/__init__.py b/ezgg_website/services/__init__.py index 9b61a58..3d4e637 100644 --- a/ezgg_website/services/__init__.py +++ b/ezgg_website/services/__init__.py @@ -1 +1,2 @@ from .database_service import DatabaseService +from .authentication_service import BrokenEntryError, PermissionLevel, FileBasedAuthenticationService, AuthenticationStatus diff --git a/ezgg_website/services/authentication_service.py b/ezgg_website/services/authentication_service.py new file mode 100644 index 0000000..b6dfb4a --- /dev/null +++ b/ezgg_website/services/authentication_service.py @@ -0,0 +1,201 @@ +import logging +import warnings +from dataclasses import dataclass +from enum import IntEnum +from hashlib import sha256 +from pathlib import Path +from typing import Type, Union, Self, Optional + +__author__ = "David 'Typhus' Rodenkirchen" +__license__ = "WTFPL @ wtfpl.net" +__email__ = "vorstand@ezgg-ev.de" + +""" +This simple authentication service is meant to be dropped into a Rio application. +Instantiate the service, add it to the rio.App attachments and implement the following session hook: + +``` +authentication_service = FileBasedAuthenticationService("/path/to/pw_file.txt") +rio.App( + [...] + default_attachments=[authentication_service], + on_session_start= lambda session: session.authentication_status = AuthenticationStatus(None, None) +) +``` + +You can log the user in on a session like this: + +``` +permission_level = self.authentication_service.authenticate(username, password) +if permission_level: + self.session.authentication_status = AuthenticationStatus(username, permission_level) +``` + +You can also perform a logout on the session: + +``` +self.session.authentication_status.log_out() +``` + +To install this service, copy this file over to your project and import it like this: + +``` +from .authentication_service import BrokenEntryError, PermissionLevel, FileBasedAuthenticationService, AuthenticationStatus +``` +""" + + +class PermissionLevel(IntEnum): + """ + Subclass and modify/add levels at your own discretion + """ + GUEST = 0 + USER = 10 + MODERATOR = 20 + ADMIN = 30 + SUPER_ADMIN = 40 + + @classmethod + def from_int(cls, value: int) -> Self: + """ + Returns the highest available level for the provided value. + E.g. if 10 and 20 are available and the input is 19, the returning PermissionLevel is 10. + """ + last = cls(0) + for level in cls: + if value < level.value: + return last + last = level + return last + + +class BrokenEntryError(Exception): + def __init__(self, original_error: str): + super().__init__(f"An entry in your password file is broken: {original_error}") + + +class FileBasedAuthenticationService: + """ + A simple authentication service that allows authentication against a file and offers an API to insert and + update user accounts. Supports multiple permission levels. + """ + DELIMITER = "|" # This symbol is used to delimit username and password. May need to change if you allow this symbol in usernames + + def __init__(self, password_file_path: Union[str, Path], is_sha256: bool = True, permission_levels: Type[PermissionLevel] = PermissionLevel, raise_broken_entries: bool = True) -> None: + """ + :param password_file_path: Path to file containing username/password pairs. + :param is_sha256: Specify that the passwords in the password file are SHA256 hashed. It is recommended to never disable this in production + :param permission_levels: IntEnum Specifying permission level names + :param raise_broken_entries: If this is set to false, broken entries are skipped instead of raising an error on them. + """ + self._permission_levels = permission_levels + self._password_file_path = password_file_path + self._is_sha256 = is_sha256 + self._users: dict[str, tuple[str, PermissionLevel]] = {} + self._logger = logging.getLogger("FileBasedAuthenticationService") + self._raise_broken_entries = raise_broken_entries + if not self._is_sha256: + warnings.warn("\nYou have specified that the password file is not hashed." + "\nThis is incredibly unsafe for both you and your users." + "\nPlease do not use this setting in production environments!") + + try: + with open(self._password_file_path, "r") as password_file: + try: + for line in password_file.readlines(): + username, password, permission_level_value = line.strip().split(self.DELIMITER) + self._users[username] = (password, self._permission_levels.from_int(int(permission_level_value))) + except (ValueError, TypeError) as e: + if self._raise_broken_entries: + raise BrokenEntryError(str(e)) + self._logger.warning(f"A broken Entry was skipped: {line}. Will be deleted on next sync!") + + except FileNotFoundError: + self._logger.warning(f"Password file at '{self._password_file_path}' not found. " + f"Creating new one on next sync.") + self._passwords = {} + + def authenticate(self, username: str, password: str) -> Optional[PermissionLevel]: + """ + Checks if the username and the password match. If it does, the PermissionLevel is returned. If not, `None` is returned. + :param username: Username to check + :param password: Password to check + :return: PermissionLevel of user or None + """ + try: + password_entry = self._users[username][0] + except KeyError: + return + + if not self._is_sha256 and password_entry == password: + return self._users[username][1] + + if self._is_sha256 and password_entry == sha256(password.encode("utf-8")).hexdigest(): + return self._users[username][1] + + def add_user(self, username: str, password: str, permission_level: PermissionLevel) -> None: + """ + Adds a new user + :param username: Username for the new user + :param password: Password for the new user + :param permission_level: Permission Level for the new user + """ + if username in self._users.keys(): + raise ValueError(f"User {username} already exists. Use FileBasedAuthenticationService.edit_user to change it's values.") + + if self.DELIMITER in username: + raise ValueError(f"Symbol '{self.DELIMITER}' detected in username. This is not allowed.") + + if self._is_sha256: + password = sha256(password.encode("utf-8")).hexdigest() + + self._users[username] = (password, permission_level) + self._sync() + + def delete_user(self, username: str) -> None: + """ + Deletes an existing user + :param username: Username of the user to delete + """ + if username in self._users.keys(): + self._users.pop(username) + self._sync() + + def edit_user(self, username: str, new_permission_level: Optional[PermissionLevel] = None, new_password: Optional[str] = None) -> None: + """ + Edits an existing user. + :param username: Username of the user to edit + :param new_permission_level: New permission level. Set to None if permission level should not change. + :param new_password: New password. Set to None if password should not change. + """ + if username not in self._users.keys(): + raise ValueError(f"User {username} does not exists.") + + if new_permission_level: + self._users[username] = (self._users[username][0], new_permission_level) + + if new_password and self._is_sha256: + self._users[username] = (sha256(new_password.encode("utf-8")).hexdigest(), self._users[username][1]) + + if new_password and not self._is_sha256: + self._users[username] = (new_password, self._users[username][1]) + + self._sync() + + def _sync(self) -> None: + with open(self._password_file_path, "w+") as password_file: + for user, pw_and_perm in self._users.items(): + password_file.write(f"{user}{self.DELIMITER}{pw_and_perm[0]}{self.DELIMITER}{pw_and_perm[1].value}\n") + + +@dataclass +class AuthenticationStatus: + username: Optional[str] + permission_level: Optional[PermissionLevel] + + def is_logged_in(self) -> bool: + return self.username is not None + + def log_out(self) -> None: + self.username = None + self.permission_level = None