import logging from asyncio import sleep from typing import Optional from rio import Text, Column, TextStyle, Component, event, PressEvent, ProgressCircle, Row, Image, Button, Spacer from src.ez_lan_manager import ConfigurationService, SeatingService, TicketingService, UserService from src.ez_lan_manager.components.MainViewContentBox import MainViewContentBox from src.ez_lan_manager.components.SeatingPlan import SeatingPlan, SeatingPlanLegend from src.ez_lan_manager.components.SeatingPlanInfoBox import SeatingPlanInfoBox from src.ez_lan_manager.components.SeatingPurchaseBox import SeatingPurchaseBox from src.ez_lan_manager.pages import BasePage from src.ez_lan_manager.services.SeatingService import NoTicketError, SeatNotFoundError, WrongCategoryError, SeatAlreadyTakenError from src.ez_lan_manager.types.Seat import Seat from src.ez_lan_manager.types.SessionStorage import SessionStorage from src.ez_lan_manager.types.User import User logger = logging.getLogger(__name__.split(".")[-1]) class SeatingPlanPage(Component): seating_info: Optional[list[Seat]] = None current_seat_id: Optional[str] = None current_seat_occupant: Optional[str] = None current_seat_price: int = 0 current_seat_is_blocked: bool = False user: Optional[User] = None show_info_box: bool = True show_purchase_box: bool = False purchase_box_loading: bool = False purchase_box_success_msg: Optional[str] = None purchase_box_error_msg: Optional[str] = None is_booking_blocked: bool = False @event.on_populate async def on_populate(self) -> None: await self.session.set_title(f"{self.session[ConfigurationService].get_lan_info().name} - Sitzplan") self.seating_info = await self.session[SeatingService].get_seating() self.user = await self.session[UserService].get_user(self.session[SessionStorage].user_id) if not self.user: self.is_booking_blocked = True else: for seat in self.seating_info: if not seat.user or not self.user: continue if seat.user.user_id == self.user.user_id: self.is_booking_blocked = True async def on_seat_clicked(self, seat_id: str, _: PressEvent) -> None: self.show_info_box = True self.show_purchase_box = False seat = next(filter(lambda s: s.seat_id == seat_id, self.seating_info), None) if not seat: return self.current_seat_is_blocked = seat.is_blocked self.current_seat_id = seat.seat_id ticket_info = self.session[TicketingService].get_ticket_info_by_category(seat.category) price = 0 if not ticket_info else ticket_info.price self.current_seat_price = price if seat.user: self.current_seat_occupant = seat.user.user_name else: self.current_seat_occupant = None def set_error(self, msg: str) -> None: self.purchase_box_error_msg = msg self.purchase_box_success_msg = None def set_success(self, msg: str) -> None: self.purchase_box_error_msg = None self.purchase_box_success_msg = msg async def on_purchase_clicked(self) -> None: self.show_info_box = False self.show_purchase_box = True async def on_purchase_confirmed(self) -> None: self.purchase_box_loading = True await self.force_refresh() await sleep(0.5) try: await self.session[SeatingService].seat_user(self.user.user_id, self.current_seat_id) except (NoTicketError, WrongCategoryError): self.set_error("Du besitzt kein gültiges Ticket für diesen Platz") except SeatNotFoundError: self.set_error("Der angegebene Sitzplatz existiert nicht") except SeatAlreadyTakenError: self.set_error("Dieser Platz ist bereits vergeben") except Exception as e: self.set_error("Ein unbekannter Fehler ist aufgetreten") logger.error(e) else: self.set_success("Platz erfolgreich gebucht!") self.purchase_box_loading = False await self.on_populate() async def on_purchase_cancelled(self) -> None: self.purchase_box_loading = False self.show_info_box = True self.show_purchase_box = False self.purchase_box_error_msg = None self.purchase_box_success_msg = None def build(self) -> Component: if not self.seating_info: return BasePage( content=Column( MainViewContentBox( ProgressCircle( color="secondary", align_x=0.5, margin_top=2, margin_bottom=2 ) ), align_y=0 ) ) return BasePage( content=Column( MainViewContentBox( Column( SeatingPlanInfoBox(seat_id=self.current_seat_id, seat_occupant=self.current_seat_occupant, seat_price=self.current_seat_price, is_blocked=self.current_seat_is_blocked, is_booking_blocked=self.is_booking_blocked, show=self.show_info_box, purchase_cb=self.on_purchase_clicked), SeatingPurchaseBox( show=self.show_purchase_box, seat_id=self.current_seat_id, is_loading=self.purchase_box_loading, confirm_cb=self.on_purchase_confirmed, cancel_cb=self.on_purchase_cancelled, error_msg=self.purchase_box_error_msg, success_msg=self.purchase_box_success_msg ) ) ), MainViewContentBox( SeatingPlan(seat_clicked_cb=self.on_seat_clicked, seating_info=self.seating_info) if self.seating_info else Column(ProgressCircle(color=self.session.theme.secondary_color, margin=3), Text("Sitzplan wird geladen", style=TextStyle(fill=self.session.theme.neutral_color), align_x=0.5, margin=1)) ), MainViewContentBox( SeatingPlanLegend(), ), align_y=0 ), grow_x=True )