This repository has been archived on 2026-06-11. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
ezgg-lan-manager/src/ezgg_lan_manager/services/TournamentService.py
T

145 lines
6.7 KiB
Python

import json
from pprint import pprint
from typing import Optional
from from_root import from_root
from src.ezgg_lan_manager.services.DatabaseService import DatabaseService
from src.ezgg_lan_manager.services.UserService import UserService
from src.ezgg_lan_manager.types.Participant import Participant
from src.ezgg_lan_manager.types.Team import Team
from src.ezgg_lan_manager.types.Tournament import Tournament
from src.ezgg_lan_manager.types.TournamentBase import ParticipantType, TournamentError
from src.ezgg_lan_manager.types.User import User
class TournamentService:
    """Registers participants for tournaments and drives tournament status changes.

    Tournaments are read through a small in-memory cache keyed by tournament ID.
    The cache is rebuilt from the database on the next read whenever
    ``_cache_dirty`` is set.
    """

    def __init__(self, db_service: DatabaseService, user_service: UserService) -> None:
        self._db_service = db_service
        self._user_service = user_service
        # Crude cache mechanism. If performance suffers, maybe implement a queue with Single-Owner-Pattern or a Lock
        self._cache: dict[int, Tournament] = {}
        self._cache_dirty: bool = True  # Setting this flag invokes cache update on next read

    async def queue_cache_renewal(self) -> None:
        """Mark the cache as stale so the next read reloads it from the database.

        Used in admin UI to provoke cache renewal after direct database access.
        """
        self._cache_dirty = True

    async def _update_cache(self) -> None:
        """Replace the cache content with the tournaments currently stored in the database."""
        tournaments = await self._db_service.get_all_tournaments()
        # Rebuild instead of updating in place, so tournaments that were removed
        # from the database do not linger in the cache.
        self._cache = {tournament.id: tournament for tournament in tournaments}
        self._cache_dirty = False

    async def _get_tournament_or_raise(self, tournament_id: int) -> Tournament:
        """Return the tournament with the given ID or raise TournamentError if there is none."""
        tournament = await self.get_tournament_by_id(tournament_id)
        if tournament is None:
            raise TournamentError(f"No tournament with ID {tournament_id} was found")
        return tournament

    async def _add_participant(self, participant: Participant, tournament: Tournament) -> None:
        """Add the participant to the in-memory tournament and persist the registration."""
        tournament.add_participant(participant)
        try:
            await self._db_service.add_participant_to_tournament(participant, tournament)
        finally:
            # The cached tournament was already mutated; force a reload even if
            # persisting failed, so cache and database cannot drift apart.
            self._cache_dirty = True

    async def _remove_participant(self, participant_id: int, participant_type: ParticipantType, tournament_id: int) -> None:
        """Remove the participant with the given ID from the tournament, if registered.

        Nothing is removed when the tournament is held for a different participant
        type, because user IDs and team IDs may collide.
        """
        tournament = await self._get_tournament_or_raise(tournament_id)
        participant = None
        if tournament.participant_type == participant_type:
            participant = next((p for p in tournament.participants if p.id == participant_id), None)
        try:
            if participant is not None:
                tournament.remove_participant(participant)
                await self._db_service.remove_participant_from_tournament(participant, tournament)
        finally:
            self._cache_dirty = True

    async def register_user_for_tournament(self, user_id: int, tournament_id: int) -> None:
        """Register a single player for a player tournament.

        Raises:
            TournamentError: If the tournament does not exist or is not a player tournament.
        """
        tournament = await self._get_tournament_or_raise(tournament_id)
        if tournament.participant_type != ParticipantType.PLAYER:
            raise TournamentError(f"Can only add single player to player tournament, not {tournament.participant_type.name}")
        participant = Participant(id_=user_id, participant_type=ParticipantType.PLAYER)
        await self._add_participant(participant, tournament)

    async def register_team_for_tournament(self, team_id: int, tournament_id: int) -> None:
        """Register a team for a team tournament.

        Raises:
            TournamentError: If the tournament does not exist or is not a team tournament.
        """
        tournament = await self._get_tournament_or_raise(tournament_id)
        if tournament.participant_type != ParticipantType.TEAM:
            raise TournamentError(f"Can only add team to team tournament, not {tournament.participant_type.name}")
        participant = Participant(id_=team_id, participant_type=ParticipantType.TEAM)
        await self._add_participant(participant, tournament)

    async def unregister_user_from_tournament(self, user_id: int, tournament_id: int) -> None:
        """Remove a player from a tournament; does nothing if the player is not registered.

        Raises:
            TournamentError: If the tournament does not exist.
        """
        await self._remove_participant(user_id, ParticipantType.PLAYER, tournament_id)

    async def unregister_team_from_tournament(self, team_id: int, tournament_id: int) -> None:
        """Remove a team from a tournament; does nothing if the team is not registered.

        Raises:
            TournamentError: If the tournament does not exist.
        """
        await self._remove_participant(team_id, ParticipantType.TEAM, tournament_id)

    async def get_tournaments(self) -> list[Tournament]:
        """Return all cached tournaments, refreshing the cache first if it is stale."""
        if self._cache_dirty:
            await self._update_cache()
        return list(self._cache.values())

    async def get_tournament_by_id(self, tournament_id: int) -> Optional[Tournament]:
        """Return the tournament with the given ID, or None if it is unknown."""
        if self._cache_dirty:
            await self._update_cache()
        return self._cache.get(tournament_id, None)

    async def get_users_from_participant_list(self, participants: list[Participant]) -> list[User]:
        """Return the users whose ``user_id`` matches the ID of one of the participants."""
        all_users = await self._db_service.get_all_users()
        participant_ids = {p.id for p in participants}  # set: constant-time membership test
        return [user for user in all_users if user.user_id in participant_ids]

    async def get_teams_from_participant_list(self, participants: list[Participant]) -> list[Team]:
        """Return the teams whose ``id`` matches the ID of one of the participants."""
        all_teams = await self._db_service.get_teams()
        participant_ids = {p.id for p in participants}  # set: constant-time membership test
        return [team for team in all_teams if team.id in participant_ids]

    async def start_tournament(self, tournament_id: int) -> None:
        """Start the tournament, write its initial JSON file and persist the new status.

        Unknown tournament IDs are ignored.
        """
        tournament = await self.get_tournament_by_id(tournament_id)
        try:
            if tournament is not None:
                tournament.start()
                await self._generate_initial_json_file(tournament)
                await self._db_service.change_tournament_status(tournament_id, tournament.status)
        finally:
            # Reload on next read even if a step failed after the cached object was changed.
            self._cache_dirty = True

    async def cancel_tournament(self, tournament_id: int) -> None:
        """Cancel the tournament and persist the new status.

        Unknown tournament IDs are ignored.
        """
        tournament = await self.get_tournament_by_id(tournament_id)
        try:
            if tournament is not None:
                tournament.cancel()
                await self._db_service.change_tournament_status(tournament_id, tournament.status)
        finally:
            # Reload on next read even if persisting failed after the cached object was changed.
            self._cache_dirty = True

    async def _generate_initial_json_file(self, tournament: Tournament) -> None:
        """
        Generates the initial JSON file for the tournament. Won't generate a new one if one already exists.

        The file contains a single round in which participants are paired in list
        order; a participant left without an opponent is recorded as the winner.

        ToDo: Remove this method when final tournament system is completed.
        """
        participants = tournament.participants
        first_round = []
        for index in range(0, len(participants), 2):
            first = participants[index]
            second = participants[index + 1] if index + 1 < len(participants) else None
            opponent_1_id = first.id if first is not None else None
            opponent_2_id = second.id if second is not None else None
            first_round.append({
                "opponent_1_id": opponent_1_id,
                "opponent_2_id": opponent_2_id,
                # Resolve byes: without a second opponent the first one advances directly
                "winner": opponent_1_id if opponent_2_id is None else None,
            })
        data = {"rounds": [first_round]}

        # NOTE(review): the file name is derived from the tournament name without
        # further sanitising; kept as is because the name is the lookup key for the file.
        file_name = tournament.name.replace(" ", "_") + ".json"
        try:
            # Mode "x" fails if the file exists, which keeps an already generated file untouched.
            with open(from_root("tournament_data", file_name), "x", encoding="utf-8") as f:
                json.dump(data, f, indent=4)
        except FileExistsError:
            pass