Merge branch 'feature/add-auth-service' into 'main'
add auth service See merge request ezgg/ezgg-website!15
This commit is contained in:
commit
38f5257cd3
@ -8,6 +8,7 @@ from from_root import from_root
|
|||||||
from . import pages
|
from . import pages
|
||||||
from . import components as comps
|
from . import components as comps
|
||||||
from . import services
|
from . import services
|
||||||
|
from .services import AuthenticationStatus
|
||||||
|
|
||||||
rio.Icon.register_single_icon(
|
rio.Icon.register_single_icon(
|
||||||
set_name="custom",
|
set_name="custom",
|
||||||
@ -105,10 +106,15 @@ themes = [
|
|||||||
|
|
||||||
|
|
||||||
async def on_session_start(s: rio.Session) -> None:
|
async def on_session_start(s: rio.Session) -> None:
|
||||||
|
s.authentication_status = AuthenticationStatus(None, None)
|
||||||
await s.set_title("EZ GG e.V.")
|
await s.set_title("EZ GG e.V.")
|
||||||
|
|
||||||
# Create the Rio app
|
# Create the Rio app
|
||||||
database_service = services.DatabaseService()
|
database_service = services.DatabaseService()
|
||||||
|
authentication_service = services.FileBasedAuthenticationService(
|
||||||
|
from_root("pw.txt"),
|
||||||
|
raise_broken_entries=False
|
||||||
|
)
|
||||||
app = rio.App(
|
app = rio.App(
|
||||||
name='ezgg-website',
|
name='ezgg-website',
|
||||||
pages=[
|
pages=[
|
||||||
@ -157,7 +163,7 @@ app = rio.App(
|
|||||||
icon=from_root("ezgg_website/assets/icons/favicon.png"),
|
icon=from_root("ezgg_website/assets/icons/favicon.png"),
|
||||||
assets_dir=Path(__file__).parent / "assets",
|
assets_dir=Path(__file__).parent / "assets",
|
||||||
on_session_start=on_session_start,
|
on_session_start=on_session_start,
|
||||||
default_attachments=[database_service],
|
default_attachments=[database_service, authentication_service],
|
||||||
meta_tags={
|
meta_tags={
|
||||||
# "content-Type": "text/html; utf-8", # Temporarily disabled until "http-equiv" is supported by Rio
|
# "content-Type": "text/html; utf-8", # Temporarily disabled until "http-equiv" is supported by Rio
|
||||||
# "Pragma": "cache", # Temporarily disabled until "http-equiv" is supported by Rio
|
# "Pragma": "cache", # Temporarily disabled until "http-equiv" is supported by Rio
|
||||||
|
|||||||
@ -19,6 +19,7 @@ class Home(rio.Component):
|
|||||||
await self.session.set_title("EZ GG e.V.")
|
await self.session.set_title("EZ GG e.V.")
|
||||||
|
|
||||||
def build(self) -> rio.Component:
|
def build(self) -> rio.Component:
|
||||||
|
self.session.get_permission_level = True
|
||||||
return build_page(rio.Column(
|
return build_page(rio.Column(
|
||||||
rio.Rectangle(
|
rio.Rectangle(
|
||||||
content=rio.Markdown("### Willkommen auf der Homepage der EZ GG e.V.", margin=1),
|
content=rio.Markdown("### Willkommen auf der Homepage der EZ GG e.V.", margin=1),
|
||||||
|
|||||||
@ -1 +1,2 @@
|
|||||||
from .database_service import DatabaseService
|
from .database_service import DatabaseService
|
||||||
|
from .authentication_service import BrokenEntryError, PermissionLevel, FileBasedAuthenticationService, AuthenticationStatus
|
||||||
|
|||||||
201
ezgg_website/services/authentication_service.py
Normal file
201
ezgg_website/services/authentication_service.py
Normal file
@ -0,0 +1,201 @@
|
|||||||
|
import logging
|
||||||
|
import warnings
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import IntEnum
|
||||||
|
from hashlib import sha256
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Type, Union, Self, Optional
|
||||||
|
|
||||||
|
__author__ = "David 'Typhus' Rodenkirchen"
|
||||||
|
__license__ = "WTFPL @ wtfpl.net"
|
||||||
|
__email__ = "vorstand@ezgg-ev.de"
|
||||||
|
|
||||||
|
"""
|
||||||
|
This simple authentication service is meant to be dropped into a Rio application.
|
||||||
|
Instantiate the service, add it to the rio.App attachments and implement the following session hook:
|
||||||
|
|
||||||
|
```
|
||||||
|
authentication_service = FileBasedAuthenticationService("/path/to/pw_file.txt")
|
||||||
|
rio.App(
|
||||||
|
[...]
|
||||||
|
default_attachments=[authentication_service],
|
||||||
|
on_session_start= lambda session: session.authentication_status = AuthenticationStatus(None, None)
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
You can log the user in on a session like this:
|
||||||
|
|
||||||
|
```
|
||||||
|
permission_level = self.authentication_service.authenticate(username, password)
|
||||||
|
if permission_level:
|
||||||
|
self.session.authentication_status = AuthenticationStatus(username, permission_level)
|
||||||
|
```
|
||||||
|
|
||||||
|
You can also perform a logout on the session:
|
||||||
|
|
||||||
|
```
|
||||||
|
self.session.authentication_status.log_out()
|
||||||
|
```
|
||||||
|
|
||||||
|
To install this service, copy this file over to your project and import it like this:
|
||||||
|
|
||||||
|
```
|
||||||
|
from .authentication_service import BrokenEntryError, PermissionLevel, FileBasedAuthenticationService, AuthenticationStatus
|
||||||
|
```
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class PermissionLevel(IntEnum):
|
||||||
|
"""
|
||||||
|
Subclass and modify/add levels at your own discretion
|
||||||
|
"""
|
||||||
|
GUEST = 0
|
||||||
|
USER = 10
|
||||||
|
MODERATOR = 20
|
||||||
|
ADMIN = 30
|
||||||
|
SUPER_ADMIN = 40
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_int(cls, value: int) -> Self:
|
||||||
|
"""
|
||||||
|
Returns the highest available level for the provided value.
|
||||||
|
E.g. if 10 and 20 are available and the input is 19, the returning PermissionLevel is 10.
|
||||||
|
"""
|
||||||
|
last = cls(0)
|
||||||
|
for level in cls:
|
||||||
|
if value < level.value:
|
||||||
|
return last
|
||||||
|
last = level
|
||||||
|
return last
|
||||||
|
|
||||||
|
|
||||||
|
class BrokenEntryError(Exception):
|
||||||
|
def __init__(self, original_error: str):
|
||||||
|
super().__init__(f"An entry in your password file is broken: {original_error}")
|
||||||
|
|
||||||
|
|
||||||
|
class FileBasedAuthenticationService:
|
||||||
|
"""
|
||||||
|
A simple authentication service that allows authentication against a file and offers an API to insert and
|
||||||
|
update user accounts. Supports multiple permission levels.
|
||||||
|
"""
|
||||||
|
DELIMITER = "|" # This symbol is used to delimit username and password. May need to change if you allow this symbol in usernames
|
||||||
|
|
||||||
|
def __init__(self, password_file_path: Union[str, Path], is_sha256: bool = True, permission_levels: Type[PermissionLevel] = PermissionLevel, raise_broken_entries: bool = True) -> None:
|
||||||
|
"""
|
||||||
|
:param password_file_path: Path to file containing username/password pairs.
|
||||||
|
:param is_sha256: Specify that the passwords in the password file are SHA256 hashed. It is recommended to never disable this in production
|
||||||
|
:param permission_levels: IntEnum Specifying permission level names
|
||||||
|
:param raise_broken_entries: If this is set to false, broken entries are skipped instead of raising an error on them.
|
||||||
|
"""
|
||||||
|
self._permission_levels = permission_levels
|
||||||
|
self._password_file_path = password_file_path
|
||||||
|
self._is_sha256 = is_sha256
|
||||||
|
self._users: dict[str, tuple[str, PermissionLevel]] = {}
|
||||||
|
self._logger = logging.getLogger("FileBasedAuthenticationService")
|
||||||
|
self._raise_broken_entries = raise_broken_entries
|
||||||
|
if not self._is_sha256:
|
||||||
|
warnings.warn("\nYou have specified that the password file is not hashed."
|
||||||
|
"\nThis is incredibly unsafe for both you and your users."
|
||||||
|
"\nPlease do not use this setting in production environments!")
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(self._password_file_path, "r") as password_file:
|
||||||
|
try:
|
||||||
|
for line in password_file.readlines():
|
||||||
|
username, password, permission_level_value = line.strip().split(self.DELIMITER)
|
||||||
|
self._users[username] = (password, self._permission_levels.from_int(int(permission_level_value)))
|
||||||
|
except (ValueError, TypeError) as e:
|
||||||
|
if self._raise_broken_entries:
|
||||||
|
raise BrokenEntryError(str(e))
|
||||||
|
self._logger.warning(f"A broken Entry was skipped: {line}. Will be deleted on next sync!")
|
||||||
|
|
||||||
|
except FileNotFoundError:
|
||||||
|
self._logger.warning(f"Password file at '{self._password_file_path}' not found. "
|
||||||
|
f"Creating new one on next sync.")
|
||||||
|
self._passwords = {}
|
||||||
|
|
||||||
|
def authenticate(self, username: str, password: str) -> Optional[PermissionLevel]:
|
||||||
|
"""
|
||||||
|
Checks if the username and the password match. If it does, the PermissionLevel is returned. If not, `None` is returned.
|
||||||
|
:param username: Username to check
|
||||||
|
:param password: Password to check
|
||||||
|
:return: PermissionLevel of user or None
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
password_entry = self._users[username][0]
|
||||||
|
except KeyError:
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self._is_sha256 and password_entry == password:
|
||||||
|
return self._users[username][1]
|
||||||
|
|
||||||
|
if self._is_sha256 and password_entry == sha256(password.encode("utf-8")).hexdigest():
|
||||||
|
return self._users[username][1]
|
||||||
|
|
||||||
|
def add_user(self, username: str, password: str, permission_level: PermissionLevel) -> None:
|
||||||
|
"""
|
||||||
|
Adds a new user
|
||||||
|
:param username: Username for the new user
|
||||||
|
:param password: Password for the new user
|
||||||
|
:param permission_level: Permission Level for the new user
|
||||||
|
"""
|
||||||
|
if username in self._users.keys():
|
||||||
|
raise ValueError(f"User {username} already exists. Use FileBasedAuthenticationService.edit_user to change it's values.")
|
||||||
|
|
||||||
|
if self.DELIMITER in username:
|
||||||
|
raise ValueError(f"Symbol '{self.DELIMITER}' detected in username. This is not allowed.")
|
||||||
|
|
||||||
|
if self._is_sha256:
|
||||||
|
password = sha256(password.encode("utf-8")).hexdigest()
|
||||||
|
|
||||||
|
self._users[username] = (password, permission_level)
|
||||||
|
self._sync()
|
||||||
|
|
||||||
|
def delete_user(self, username: str) -> None:
|
||||||
|
"""
|
||||||
|
Deletes an existing user
|
||||||
|
:param username: Username of the user to delete
|
||||||
|
"""
|
||||||
|
if username in self._users.keys():
|
||||||
|
self._users.pop(username)
|
||||||
|
self._sync()
|
||||||
|
|
||||||
|
def edit_user(self, username: str, new_permission_level: Optional[PermissionLevel] = None, new_password: Optional[str] = None) -> None:
|
||||||
|
"""
|
||||||
|
Edits an existing user.
|
||||||
|
:param username: Username of the user to edit
|
||||||
|
:param new_permission_level: New permission level. Set to None if permission level should not change.
|
||||||
|
:param new_password: New password. Set to None if password should not change.
|
||||||
|
"""
|
||||||
|
if username not in self._users.keys():
|
||||||
|
raise ValueError(f"User {username} does not exists.")
|
||||||
|
|
||||||
|
if new_permission_level:
|
||||||
|
self._users[username] = (self._users[username][0], new_permission_level)
|
||||||
|
|
||||||
|
if new_password and self._is_sha256:
|
||||||
|
self._users[username] = (sha256(new_password.encode("utf-8")).hexdigest(), self._users[username][1])
|
||||||
|
|
||||||
|
if new_password and not self._is_sha256:
|
||||||
|
self._users[username] = (new_password, self._users[username][1])
|
||||||
|
|
||||||
|
self._sync()
|
||||||
|
|
||||||
|
def _sync(self) -> None:
|
||||||
|
with open(self._password_file_path, "w+") as password_file:
|
||||||
|
for user, pw_and_perm in self._users.items():
|
||||||
|
password_file.write(f"{user}{self.DELIMITER}{pw_and_perm[0]}{self.DELIMITER}{pw_and_perm[1].value}\n")
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AuthenticationStatus:
|
||||||
|
username: Optional[str]
|
||||||
|
permission_level: Optional[PermissionLevel]
|
||||||
|
|
||||||
|
def is_logged_in(self) -> bool:
|
||||||
|
return self.username is not None
|
||||||
|
|
||||||
|
def log_out(self) -> None:
|
||||||
|
self.username = None
|
||||||
|
self.permission_level = None
|
||||||
Loading…
Reference in New Issue
Block a user