Compare commits

..

2 Commits

Author SHA1 Message Date
David Rodenkirchen
6cc77f26b5 finalize core feature 2026-02-15 01:02:01 +01:00
David Rodenkirchen
8a9004a9a0 integrate database layer 2026-02-13 11:57:32 +01:00
8 changed files with 625 additions and 80 deletions

63
sql/teams_patch.sql Normal file
View File

@ -0,0 +1,63 @@
-- =====================================================
-- Teams
-- =====================================================
DROP TABLE IF EXISTS `team_members`;
DROP TABLE IF EXISTS `teams`;
-- -----------------------------------------------------
-- Teams table
-- -----------------------------------------------------
CREATE TABLE `teams` (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
abbreviation VARCHAR(10) NOT NULL,
join_password VARCHAR(255) NOT NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uq_team_name (name),
UNIQUE KEY uq_team_abbr (abbreviation)
) ENGINE=InnoDB
DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_unicode_ci;
-- -----------------------------------------------------
-- Team Members (Junction Table)
-- -----------------------------------------------------
CREATE TABLE `team_members` (
team_id INT NOT NULL,
user_id INT NOT NULL,
status ENUM('MEMBER','OFFICER','LEADER')
NOT NULL DEFAULT 'MEMBER',
joined_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (team_id, user_id),
CONSTRAINT fk_tm_team
FOREIGN KEY (team_id)
REFERENCES teams(id)
ON DELETE CASCADE,
CONSTRAINT fk_tm_user
FOREIGN KEY (user_id)
REFERENCES users(user_id)
ON DELETE CASCADE
) ENGINE=InnoDB
DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_unicode_ci;
-- -----------------------------------------------------
-- Indexes
-- -----------------------------------------------------
CREATE INDEX idx_tm_user
ON team_members(user_id);
CREATE INDEX idx_tm_team_status
ON team_members(team_id, status);

View File

@ -1,5 +1,5 @@
from functools import partial from functools import partial
from typing import Callable, Optional from typing import Callable, Optional, Literal
from rio import Component, Revealer, TextStyle, Column, Row, Tooltip, Icon, Spacer, Text, Button from rio import Component, Revealer, TextStyle, Column, Row, Tooltip, Icon, Spacer, Text, Button
@ -10,7 +10,8 @@ from src.ezgg_lan_manager.types.User import User
class TeamRevealer(Component): class TeamRevealer(Component):
user: Optional[User] user: Optional[User]
team: Team team: Team
on_join_button_pressed: Callable mode: Literal["join", "leave", "display"]
on_button_pressed: Callable
def build(self) -> Component: def build(self) -> Component:
return Revealer( return Revealer(
@ -29,12 +30,12 @@ class TeamRevealer(Component):
for member in self.team.members for member in self.team.members
], ],
Row(Button( Row(Button(
content=f"{self.team.name} beitreten", content=f"{self.team.name} beitreten" if self.mode == "join" else f"{self.team.name} verlassen",
shape="rectangle", shape="rectangle",
style="major", style="major",
color="hud", color="hud",
on_press=partial(self.on_join_button_pressed, self.team), on_press=partial(self.on_button_pressed, self.team),
) if self.user not in self.team.members.keys() and self.user is not None else Spacer(grow_x=False, grow_y=False), margin_top=1, margin_bottom=1), ), margin_top=1, margin_bottom=1),
margin_right=1, margin_right=1,
margin_left=1 margin_left=1
), ),

View File

@ -0,0 +1,222 @@
import logging
from typing import Optional, Callable
from rio import Component, Text, Spacer, Rectangle, Column, TextStyle, Row, Button, TextInput, ThemeContextSwitcher
from src.ezgg_lan_manager.services.TeamService import TeamService, NotMemberError, TeamLeadRemovalError, AlreadyMemberError, NameNotAllowedError, TeamNameTooLongError, \
TeamAbbrInvalidError, TeamNameAlreadyTaken
from src.ezgg_lan_manager.types.Team import Team
from src.ezgg_lan_manager.types.User import User
logger = logging.getLogger(__name__.split(".")[-1])
class ErrorBox(Component):
error_message: str
cancel: Callable
def build(self) -> Component:
return Rectangle(
content=Column(
Text(self.error_message, style=TextStyle(fill=self.session.theme.background_color), margin_bottom=1.5),
Row(
Button(
content="Ok",
shape="rectangle",
style="major",
color="hud",
on_press=self.cancel,
)
),
margin=1
),
fill=self.session.theme.primary_color
)
class TeamsDialogJoinHandler(Component):
is_active: bool
cancel: Callable
user: Optional[User] = None
team: Optional[Team] = None
error_message: Optional[str] = None
password: str = ""
async def join(self) -> None:
if self.user is None or self.team is None:
return
if self.password != self.team.join_password:
self.error_message = "Falsches Passwort!"
return
try:
await self.session[TeamService].add_member_to_team(self.team, self.user)
except AlreadyMemberError:
self.error_message = "Du bist bereits Mitglied dieses Teams"
else:
await self.cancel_with_reset()
async def cancel_with_reset(self) -> None:
await self.cancel()
self.error_message = None
self.password = ""
def build(self) -> Component:
if not self.is_active or self.user is None or self.team is None:
return Spacer()
if self.error_message is not None:
return ErrorBox(error_message=self.error_message, cancel=self.cancel_with_reset)
return Rectangle(
content=Column(
Text(f"Team {self.team.name} beitreten", style=TextStyle(fill=self.session.theme.background_color), margin_bottom=1, justify="center"),
ThemeContextSwitcher(content=TextInput(text=self.bind().password, label="Beitrittspasswort", margin_bottom=1), color="secondary"),
Row(
Button(
content="Abbrechen",
shape="rectangle",
style="major",
color=self.session.theme.danger_color,
on_press=self.cancel_with_reset,
),
Button(
content="Beitreten",
shape="rectangle",
style="major",
color=self.session.theme.success_color,
on_press=self.join,
),
spacing=1
),
margin=1
),
fill=self.session.theme.primary_color
)
class TeamsDialogLeaveHandler(Component):
is_active: bool
cancel: Callable
user: Optional[User] = None
team: Optional[Team] = None
error_message: Optional[str] = None
async def leave(self) -> None:
if self.user is not None and self.team is not None:
try:
await self.session[TeamService].remove_member_from_team(self.team, self.user)
except NotMemberError:
self.error_message = "Du bist kein Mitglied in diesem Team"
except TeamLeadRemovalError:
self.error_message = "Als Teamleiter kannst du das Team nicht verlassen"
else:
await self.cancel_with_reset()
async def cancel_with_reset(self) -> None:
await self.cancel()
self.error_message = None
def build(self) -> Component:
if not self.is_active or self.user is None or self.team is None:
return Spacer()
if self.error_message is not None:
return ErrorBox(error_message=self.error_message, cancel=self.cancel_with_reset)
return Rectangle(
content=Column(
Text(f"Team {self.team.name} wirklich verlassen?", style=TextStyle(fill=self.session.theme.background_color), margin_bottom=1.5, justify="center"),
Row(
Button(
content="Nein",
shape="rectangle",
style="major",
color=self.session.theme.danger_color,
on_press=self.cancel_with_reset,
),
Button(
content="Ja",
shape="rectangle",
style="major",
color=self.session.theme.success_color,
on_press=self.leave,
),
spacing=1
),
margin=1
),
fill=self.session.theme.primary_color
)
class TeamsDialogCreateHandler(Component):
is_active: bool
cancel: Callable
user: Optional[User] = None
error_message: Optional[str] = None
team_name: str = ""
team_abbr: str = ""
team_join_password: str = ""
async def cancel_with_reset(self) -> None:
await self.cancel()
self.error_message = None
self.team_name, self.team_abbr, self.team_join_password = "", "", ""
async def create(self) -> None:
if self.user is None:
return
if not self.team_name or not self.team_abbr or not self.team_join_password:
self.error_message = "Angaben unvollständig"
return
try:
await self.session[TeamService].create_team(self.team_name, self.team_abbr, self.team_join_password, self.user)
except NameNotAllowedError as e:
self.error_message = f"Angaben ungültig. Darf kein '{e.disallowed_char}' enthalten."
except TeamNameTooLongError:
self.error_message = f"Name zu lang. Maximal {TeamService.MAX_TEAM_NAME_LENGTH} Zeichen."
except TeamAbbrInvalidError:
self.error_message = f"Name zu lang. Maximal {TeamService.MAX_TEAM_ABBR_LENGTH} Zeichen."
except TeamNameAlreadyTaken:
self.error_message = "Ein Team mit diesem Namen existiert bereits."
else:
await self.cancel_with_reset()
def build(self) -> Component:
if not self.is_active or self.user is None:
return Spacer()
if self.error_message is not None:
return ErrorBox(error_message=self.error_message, cancel=self.cancel_with_reset)
return Rectangle(
content=Column(
Text(f"Team gründen", style=TextStyle(fill=self.session.theme.background_color), margin_bottom=1.5, justify="center"),
ThemeContextSwitcher(content=TextInput(text=self.bind().team_name, label="Team Name", margin_bottom=1), color="secondary"),
ThemeContextSwitcher(content=TextInput(text=self.bind().team_abbr, label="Team Abkürzung", margin_bottom=1), color="secondary"),
ThemeContextSwitcher(content=TextInput(text=self.bind().team_join_password, label="Beitrittspasswort", margin_bottom=1), color="secondary"),
Row(
Button(
content="Abbrechen",
shape="rectangle",
style="major",
color=self.session.theme.danger_color,
on_press=self.cancel_with_reset,
),
Button(
content="Gründen",
shape="rectangle",
style="major",
color=self.session.theme.success_color,
on_press=self.create,
),
spacing=1
),
margin=1
),
fill=self.session.theme.primary_color
)

View File

@ -1,10 +1,11 @@
from typing import Optional from asyncio import sleep
from rio import Column, Component, event, Text, TextStyle, ProgressCircle, Spacer, Revealer, Row, Button, Icon, Tooltip, Rectangle, PointerEventListener, PointerEvent from rio import event, ProgressCircle, PointerEventListener, PointerEvent, Popup, Color
from src.ezgg_lan_manager import ConfigurationService from src.ezgg_lan_manager import ConfigurationService
from src.ezgg_lan_manager.components.MainViewContentBox import MainViewContentBox from src.ezgg_lan_manager.components.MainViewContentBox import MainViewContentBox
from src.ezgg_lan_manager.components.TeamRevealer import TeamRevealer from src.ezgg_lan_manager.components.TeamRevealer import TeamRevealer
from src.ezgg_lan_manager.components.TeamsDialogHandler import *
from src.ezgg_lan_manager.services.TeamService import TeamService from src.ezgg_lan_manager.services.TeamService import TeamService
from src.ezgg_lan_manager.services.UserService import UserService from src.ezgg_lan_manager.services.UserService import UserService
from src.ezgg_lan_manager.types.SessionStorage import SessionStorage from src.ezgg_lan_manager.types.SessionStorage import SessionStorage
@ -15,29 +16,47 @@ from src.ezgg_lan_manager.types.User import User
class TeamsPage(Component): class TeamsPage(Component):
all_teams: Optional[list[Team]] = None all_teams: Optional[list[Team]] = None
user: Optional[User] = None user: Optional[User] = None
user_teams: list[Team] = []
# Dialog handling
popup_open: bool = False
join_active: bool = False
leave_active: bool = True
create_active: bool = False
selected_team_for_join_or_leave: Optional[Team] = None
@event.on_populate @event.on_populate
async def on_populate(self) -> None: async def on_populate(self) -> None:
self.all_teams = await self.session[TeamService].get_all_teams() self.all_teams = await self.session[TeamService].get_all_teams()
#self.user = await self.session[UserService].get_user(self.session[SessionStorage].user_id) self.user = await self.session[UserService].get_user(self.session[SessionStorage].user_id)
self.user = await self.session[UserService].get_user("Typhus") # FixMe: Only debug
if self.user is not None:
self.user_teams = await self.session[TeamService].get_teams_for_user_by_id(self.user.user_id)
await self.session.set_title(f"{self.session[ConfigurationService].get_lan_info().name} - Teams") await self.session.set_title(f"{self.session[ConfigurationService].get_lan_info().name} - Teams")
self.session[SessionStorage].subscribe_to_logged_in_or_out_event(str(self.__class__), self.on_populate) self.session[SessionStorage].subscribe_to_logged_in_or_out_event(str(self.__class__), self.on_populate)
async def on_join_button_pressed(self, team: Team) -> None: async def on_join_button_pressed(self, team: Team) -> None:
# ToDo: handle joining by displaying password popup if self.user is None:
print(f"Starting join process for team {team.abbreviation}") return
self.selected_team_for_join_or_leave = team
self.join_active, self.leave_active, self.create_active = True, False, False
self.popup_open = True
async def on_leave_button_pressed(self, team: Team) -> None: async def on_leave_button_pressed(self, team: Team) -> None:
# ToDo: handle leaving if self.user is None:
print(f"Starting leaving process for team {team.abbreviation}") return
self.selected_team_for_join_or_leave = team
self.join_active, self.leave_active, self.create_active = False, True, False
self.popup_open = True
async def on_create_button_pressed(self, _: PointerEvent) -> None: async def on_create_button_pressed(self, _: PointerEvent) -> None:
# ToDo: Add Popup creation dialog if self.user is None:
print(f"Starting creation process for team") return
self.join_active, self.leave_active, self.create_active = False, False, True
self.popup_open = True
async def popup_action_cancelled(self) -> None:
self.popup_open = False
await sleep(0.2) # Waits for the animation to play before resetting its contents
self.join_active, self.leave_active, self.create_active = False, False, False
self.selected_team_for_join_or_leave = None
self.all_teams = await self.session[TeamService].get_all_teams()
def build(self) -> Component: def build(self) -> Component:
if self.all_teams is None: if self.all_teams is None:
@ -55,7 +74,14 @@ class TeamsPage(Component):
team_list = [] team_list = []
for team in self.all_teams: for team in self.all_teams:
team_list.append(TeamRevealer(user=self.user, team=team, on_join_button_pressed=self.on_join_button_pressed)) team_list.append(
TeamRevealer(
user=self.user,
team=team,
mode="leave" if self.user in team.members.keys() else "join",
on_button_pressed=self.on_leave_button_pressed if self.user in team.members.keys() else self.on_join_button_pressed
)
)
if team_list: if team_list:
team_list[-1].margin_bottom = 1 team_list[-1].margin_bottom = 1
@ -63,9 +89,9 @@ class TeamsPage(Component):
own_teams_content = Spacer(grow_x=False, grow_y=False) own_teams_content = Spacer(grow_x=False, grow_y=False)
if self.user is not None: if self.user is not None:
user_team_list = [] user_team_list = []
for team in self.user_teams: for team in self.all_teams:
# ToDo: Remove from team instead of join if self.user in team.members.keys():
user_team_list.append(TeamRevealer(user=self.user, team=team, on_join_button_pressed=self.on_join_button_pressed)) user_team_list.append(TeamRevealer(user=self.user, team=team, mode="leave", on_button_pressed=self.on_leave_button_pressed))
if not user_team_list: if not user_team_list:
user_team_list.append(Text( user_team_list.append(Text(
@ -112,22 +138,35 @@ class TeamsPage(Component):
) )
) )
return Column( return Popup(
own_teams_content, anchor=Column(
MainViewContentBox( own_teams_content,
Column( MainViewContentBox(
Text( Column(
text="Alle Teams", Text(
style=TextStyle( text="Alle Teams",
fill=self.session.theme.background_color, style=TextStyle(
font_size=1.2 fill=self.session.theme.background_color,
font_size=1.2
),
margin_top=1,
margin_bottom=1,
align_x=0.5
), ),
margin_top=1, *team_list
margin_bottom=1, )
align_x=0.5 ),
), align_y=0,
*team_list
)
), ),
align_y=0 content=Column(
TeamsDialogJoinHandler(is_active=self.join_active, cancel=self.popup_action_cancelled, user=self.user, team=self.selected_team_for_join_or_leave),
TeamsDialogLeaveHandler(is_active=self.leave_active, cancel=self.popup_action_cancelled, user=self.user, team=self.selected_team_for_join_or_leave),
TeamsDialogCreateHandler(is_active=self.create_active, cancel=self.popup_action_cancelled, user=self.user)
),
is_open=self.popup_open,
modal=False,
corner_radius=(0.5, 0.5, 0.5, 0.5),
color=Color.TRANSPARENT,
user_closable=False,
position="top"
) )

View File

@ -970,42 +970,206 @@ class DatabaseService:
logger.warning(f"Error removing participant from tournament: {e}") logger.warning(f"Error removing participant from tournament: {e}")
async def get_teams(self) -> list[Team]: async def get_teams(self) -> list[Team]:
# ToDo: Implement async with self._connection_pool.acquire() as conn:
return [ async with conn.cursor(aiomysql.Cursor) as cursor:
Team( query = """
id=1, SELECT
name="MockTeamAlpha", t.id AS team_id,
abbreviation="-=MTA=-", t.name AS team_name,
members={User(0, "DemoUserA", "", "abuvbwer", None, None, None, True, False, False, datetime.now(), datetime.now()): TeamStatus.LEADER}, t.abbreviation AS team_abbr,
join_password="abc" t.join_password,
), t.created_at AS team_created_at,
Team(
id=1, tm.status AS team_status,
name="MockTeamBeta", tm.joined_at AS member_joined_at,
abbreviation="[MTB]",
members={ u.*
User(1, "DemoUserB", "", "abuvbwer", None, None, None, True, False, False, datetime.now(), datetime.now()): TeamStatus.LEADER,
User(2, "DemoUserC", "", "abuvbwer", None, None, None, True, False, False, datetime.now(), datetime.now()): TeamStatus.OFFICER, FROM teams t
User(3, "DemoUserD", "", "abuvbwer", None, None, None, True, False, False, datetime.now(), datetime.now()): TeamStatus.MEMBER
}, LEFT JOIN team_members tm
join_password="abc" ON t.id = tm.team_id
)
] LEFT JOIN users u
ON tm.user_id = u.user_id
ORDER BY
t.id,
CASE tm.status
WHEN 'LEADER' THEN 1
WHEN 'OFFICER' THEN 2
WHEN 'MEMBER' THEN 3
ELSE 4
END,
u.user_name;
"""
try:
await cursor.execute(query)
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.get_teams()
except Exception as e:
logger.warning(f"Error getting teams: {e}")
return []
current_team: Optional[Team] = None
all_teams = []
for row in await cursor.fetchall():
if row[5] is None: # Teams without single member are ignored
continue
if current_team is None:
user = self._map_db_result_to_user(row[7:])
current_team = Team(id=row[0], name=row[1], abbreviation=row[2], join_password=row[3], members={user: TeamStatus.from_str(row[5])})
elif current_team.id == row[0]: # Still same team
current_team.members[self._map_db_result_to_user(row[7:])] = TeamStatus.from_str(row[5])
else:
all_teams.append(current_team)
user = self._map_db_result_to_user(row[7:])
current_team = Team(id=row[0], name=row[1], abbreviation=row[2], join_password=row[3], members={user: TeamStatus.from_str(row[5])})
all_teams.append(current_team)
return all_teams
async def get_team_by_id(self, team_id: int) -> Optional[Team]: async def get_team_by_id(self, team_id: int) -> Optional[Team]:
return async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
query = """
SELECT
t.id AS team_id,
t.name AS team_name,
t.abbreviation AS team_abbr,
t.join_password,
t.created_at AS team_created_at,
async def get_teams_for_user_by_id(self, user_id: int) -> list[Team]: tm.status AS team_status,
return [] tm.joined_at AS member_joined_at,
async def create_team(self, team_name: str, team_abbr: str, join_password: str) -> Team: u.*
return # ToDo: Raise on existing team
FROM teams t
LEFT JOIN team_members tm
ON t.id = tm.team_id
LEFT JOIN users u
ON tm.user_id = u.user_id
WHERE t.id = %s
ORDER BY
t.id,
CASE tm.status
WHEN 'LEADER' THEN 1
WHEN 'OFFICER' THEN 2
WHEN 'MEMBER' THEN 3
ELSE 4
END,
u.user_name;
"""
try:
await cursor.execute(query, (team_id, ))
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.get_team_by_id(team_id)
except Exception as e:
logger.warning(f"Error getting team: {e}")
return None
team: Optional[Team] = None
for row in await cursor.fetchall():
if team is None:
user = self._map_db_result_to_user(row[7:])
team = Team(id=row[0], name=row[1], abbreviation=row[2], join_password=row[3], members={user: TeamStatus.from_str(row[5])})
elif team.id == row[0]:
team.members[self._map_db_result_to_user(row[7:])] = TeamStatus.from_str(row[5])
return team
async def create_team(self, team_name: str, team_abbr: str, join_password: str, leader: User) -> Team:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"INSERT INTO teams (name, abbreviation, join_password) "
"VALUES (%s, %s, %s)", (team_name, team_abbr, join_password)
)
await conn.commit()
team_id = cursor.lastrowid
await cursor.execute(
"INSERT INTO team_members (team_id, user_id, status) VALUES (%s, %s, %s)",
(team_id, leader.user_id, TeamStatus.LEADER.name)
)
await conn.commit()
return await self.get_team_by_id(team_id)
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.create_team(team_name, team_abbr, join_password)
except aiomysql.IntegrityError as e:
logger.warning(f"Aborted duplication entry: {e}")
raise DuplicationError
async def update_team(self, team: Team) -> Team: async def update_team(self, team: Team) -> Team:
pass async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"UPDATE teams SET name = %s, abbreviation = %s, join_password = %s WHERE (id = %s)",
(team.name, team.abbreviation, team.join_password, team.id)
)
await conn.commit()
return await self.get_team_by_id(team.id)
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.update_team(team)
except aiomysql.IntegrityError as e:
logger.warning(f"Aborted duplication entry: {e}")
raise DuplicationError
async def add_member_to_team(self, team: Team, user: User, status: TeamStatus = TeamStatus.MEMBER) -> None:
async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"INSERT INTO team_members (team_id, user_id, status) VALUES (%s, %s, %s)",
(team.id, user.user_id, status.name)
)
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.add_member_to_team(team, user, status)
except aiomysql.IntegrityError as e:
logger.warning(f"Failed to add member {user.user_name} to team {team.name}: {e}")
raise DuplicationError
async def add_member_to_team(self, team: Team, user: User) -> None:
pass
async def remove_user_from_team(self, team: Team, user: User) -> None: async def remove_user_from_team(self, team: Team, user: User) -> None:
pass async with self._connection_pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
await cursor.execute(
"DELETE FROM team_members WHERE team_id = %s AND user_id = %s",
(team.id, user.user_id)
)
await conn.commit()
except aiomysql.InterfaceError:
pool_init_result = await self.init_db_pool()
if not pool_init_result:
raise NoDatabaseConnectionError
return await self.remove_user_from_team(team, user)

View File

@ -1,31 +1,50 @@
from hashlib import sha256
from typing import Union, Optional
from string import ascii_letters, digits from string import ascii_letters, digits
from typing import Optional
from src.ezgg_lan_manager.services.DatabaseService import DatabaseService from src.ezgg_lan_manager.services.DatabaseService import DatabaseService, DuplicationError
from src.ezgg_lan_manager.types.User import User
from src.ezgg_lan_manager.types.Team import TeamStatus, Team from src.ezgg_lan_manager.types.Team import TeamStatus, Team
from src.ezgg_lan_manager.types.User import User
class NameNotAllowedError(Exception): class NameNotAllowedError(Exception):
def __init__(self, disallowed_char: str) -> None: def __init__(self, disallowed_char: str) -> None:
self.disallowed_char = disallowed_char self.disallowed_char = disallowed_char
class AlreadyMemberError(Exception): class AlreadyMemberError(Exception):
def __init__(self) -> None: def __init__(self) -> None:
pass pass
class NotMemberError(Exception): class NotMemberError(Exception):
def __init__(self) -> None: def __init__(self) -> None:
pass pass
class TeamLeadRemovalError(Exception): class TeamLeadRemovalError(Exception):
def __init__(self) -> None: def __init__(self) -> None:
pass pass
class TeamNameTooLongError(Exception):
def __init__(self) -> None:
pass
class TeamNameAlreadyTaken(Exception):
def __init__(self) -> None:
pass
class TeamAbbrInvalidError(Exception):
def __init__(self) -> None:
pass
class TeamService: class TeamService:
ALLOWED_TEAM_NAME_SYMBOLS = ascii_letters + digits + "!#$%&*+,-./:;<=>?[]^_{|}~" ALLOWED_TEAM_NAME_SYMBOLS = ascii_letters + digits + "!#$%&*+,-./:;<=>?[]^_{|}~ "
MAX_TEAM_NAME_LENGTH = 24
MAX_TEAM_ABBR_LENGTH = 8
def __init__(self, db_service: DatabaseService) -> None: def __init__(self, db_service: DatabaseService) -> None:
self._db_service = db_service self._db_service = db_service
@ -37,9 +56,14 @@ class TeamService:
return await self._db_service.get_team_by_id(team_id) return await self._db_service.get_team_by_id(team_id)
async def get_teams_for_user_by_id(self, user_id: int) -> list[Team]: async def get_teams_for_user_by_id(self, user_id: int) -> list[Team]:
return await self._db_service.get_teams_for_user_by_id(user_id) all_teams = await self.get_all_teams()
user_teams = []
for team in all_teams:
if user_id in [u.user_id for u in team.members.keys()]:
user_teams.append(team)
return user_teams
async def create_team(self, team_name: str, team_abbr: str, join_password: str) -> Team: async def create_team(self, team_name: str, team_abbr: str, join_password: str, leader: User) -> Team:
disallowed_char = self._check_for_disallowed_char(team_name) disallowed_char = self._check_for_disallowed_char(team_name)
if disallowed_char: if disallowed_char:
raise NameNotAllowedError(disallowed_char) raise NameNotAllowedError(disallowed_char)
@ -47,7 +71,16 @@ class TeamService:
if disallowed_char: if disallowed_char:
raise NameNotAllowedError(disallowed_char) raise NameNotAllowedError(disallowed_char)
created_team = await self._db_service.create_team(team_name, team_abbr, join_password) if not team_name or len(team_name) > self.MAX_TEAM_NAME_LENGTH:
raise TeamNameTooLongError()
if not team_abbr or len(team_abbr) > self.MAX_TEAM_ABBR_LENGTH:
raise TeamAbbrInvalidError()
try:
created_team = await self._db_service.create_team(team_name, team_abbr, join_password, leader)
except DuplicationError:
raise TeamNameAlreadyTaken
return created_team return created_team
async def update_team(self, team: Team) -> Team: async def update_team(self, team: Team) -> Team:
@ -62,20 +95,27 @@ class TeamService:
disallowed_char = self._check_for_disallowed_char(team.abbreviation) disallowed_char = self._check_for_disallowed_char(team.abbreviation)
if disallowed_char: if disallowed_char:
raise NameNotAllowedError(disallowed_char) raise NameNotAllowedError(disallowed_char)
if not team.name or len(team.name) > self.MAX_TEAM_NAME_LENGTH:
raise TeamNameTooLongError()
if not team.abbreviation or len(team.abbreviation) > self.MAX_TEAM_ABBR_LENGTH:
raise TeamAbbrInvalidError()
return await self._db_service.update_team(team) return await self._db_service.update_team(team)
async def add_member_to_team(self, team: Team, user: User) -> Team: async def add_member_to_team(self, team: Team, user: User, status: TeamStatus = TeamStatus.MEMBER) -> Team:
if user in team.members: if user in team.members:
raise AlreadyMemberError() raise AlreadyMemberError()
await self._db_service.add_member_to_team(team, user) await self._db_service.add_member_to_team(team, user, status)
return await self.get_team_by_id(team.id) return await self.get_team_by_id(team.id)
async def remove_member_from_team(self, team: Team, user: User) -> Team: async def remove_member_from_team(self, team: Team, user: User) -> Team:
if user not in team.members: if user not in team.members:
raise NotMemberError() raise NotMemberError()
if team.members[user] is TeamStatus.LEADER: if team.members[user] == TeamStatus.LEADER:
raise TeamLeadRemovalError() raise TeamLeadRemovalError()
await self._db_service.remove_user_from_team(team, user) await self._db_service.remove_user_from_team(team, user)

View File

@ -1,5 +1,6 @@
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
from typing import Self
from src.ezgg_lan_manager.types.User import User from src.ezgg_lan_manager.types.User import User
@ -8,6 +9,16 @@ class TeamStatus(Enum):
OFFICER = 1 OFFICER = 1
LEADER = 2 LEADER = 2
@classmethod
def from_str(cls, team_status: str) -> Self:
if team_status == "MEMBER":
return TeamStatus.MEMBER
elif team_status == "OFFICER":
return TeamStatus.OFFICER
elif team_status == "LEADER":
return TeamStatus.LEADER
raise ValueError
@dataclass(frozen=True) @dataclass(frozen=True)
class Team: class Team:

View File

@ -19,4 +19,9 @@ class User:
last_updated_at: datetime last_updated_at: datetime
def __hash__(self) -> int: def __hash__(self) -> int:
return hash(f"{self.user_id}{self.user_name}{self.user_mail}") return hash(self.user_id)
def __eq__(self, other):
if not isinstance(other, User):
return NotImplemented
return self.user_id == other.user_id