commit 6f4caf7f799d2d355bf0b0727e9663ce87f466af Author: David Rodenkirchen Date: Wed Apr 2 08:55:43 2025 +0200 add receipt printer code diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d17b523 --- /dev/null +++ b/.gitignore @@ -0,0 +1,154 @@ +# CONFIG +*.toml + +# Chatch-all default Python gitignore + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# plaintext files +*.txt + +# csv-files +*.csv + +# certificates +*.crt + +# icons +*.ico + +# C extensions +*.so + +# PyCharm +.idea/ + +# Excel tables +*.ods +*.xlsx + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# JSON containing project data +projects.json + +# Authentication directoy +auth/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..1350a1e --- /dev/null +++ b/README.md @@ -0,0 +1,18 @@ +# Receipt Printer + +This software is designed to run on a debian based system and acts as a middle-man between the EZ LAN Manager database and the USB interface of the receipt printer. + +# Notes + +- Does only work on Linux +- Needs root privileges or udev config to work +- Designed to run alone on a Raspberry Pi +- Not meant to be exposed to the internet, local deployment only +- For EZ LAN Manager configuration, check README in that repo + +# Deploy + +1. Configure password and port in `main.py` +2. Make sure USB printer is connected to device +3. Install requirements with sudo +4. Run `main.py` with sudo diff --git a/init_printer.py b/init_printer.py new file mode 100644 index 0000000..9858e86 --- /dev/null +++ b/init_printer.py @@ -0,0 +1,57 @@ +import logging + +import usb.core +import usb.util +from usb.core import Endpoint + +logger = logging.getLogger(__name__) + +def init_printer() -> tuple[Endpoint, Endpoint]: + """ + ! DEBUG ONLY ! + Initializes printer and returns the input and output endpoints, in that order. + """ + dev = usb.core.find(idVendor=0x28e9, idProduct=0x0289) + + if dev is None: + raise ValueError("Printer could not be found") + else: + logger.debug("Printer found!") + + # Check for config + cfg = dev.get_active_configuration() + logger.debug(f"Found configuration: {cfg.bConfigurationValue}") + + intf = cfg[(0, 0)] # Select first interface and first setting + + logger.debug(f"Interface #: {intf.bInterfaceNumber}, Endpoints: {intf.bNumEndpoints}") + + # Find Bulk OUT- und IN-Endpoints + ep_out = usb.util.find_descriptor( + intf, + custom_match=lambda endpoint: usb.util.endpoint_direction(endpoint.bEndpointAddress) == usb.util.ENDPOINT_OUT + ) + + ep_in = usb.util.find_descriptor( + intf, + custom_match=lambda endpoint: usb.util.endpoint_direction(endpoint.bEndpointAddress) == usb.util.ENDPOINT_IN + ) + + # Validate endpoints + if ep_out: + logger.debug(f"OUT-Endpoint found: {ep_out.bEndpointAddress}") + else: + raise ValueError("OUT-Endpoint not found") + + if ep_in: + logger.debug(f"IN-Endpoint found: {ep_in.bEndpointAddress}") + else: + raise ValueError("IN-Endpoint not found") + + if dev.is_kernel_driver_active(intf.bInterfaceNumber): + try: + dev.detach_kernel_driver(intf.bInterfaceNumber) + except usb.core.USBError as e: + raise ValueError(f"Could not detach kernel driver from interface({intf.bInterfaceNumber}): {e}") + + return ep_in, ep_out diff --git a/logo.png b/logo.png new file mode 100644 index 0000000..2d77216 Binary files /dev/null and b/logo.png differ diff --git a/main.py b/main.py new file mode 100644 index 0000000..a4e6b3b --- /dev/null +++ b/main.py @@ -0,0 +1,78 @@ +from datetime import datetime + +import uvicorn +from fastapi import FastAPI, HTTPException +from fastapi.params import Header +from pydantic import BaseModel + +from escpos.printer import Usb + +MAX_LINE_LEN = 30 + +PRINTER = Usb(0x28e9, 0x0289) + +SECRET_PASSWORD = "Alkohol1" + +class OrderItem(BaseModel): + menu_item_name: str + amount: int + +class Order(BaseModel): + order_id: str + order_date: datetime + customer_name: str + seat_id: str + items: list[OrderItem] + +def build_header(order_id: str, order_date: datetime, customer_name: str, seat_id: str) -> str: + return (f"EZ GG e.V. - EZ LAN 1.0\n\n" + f"Bestellungs-ID: {order_id}\n" + f"Bestellt: {order_date.strftime('%d.%m. - %H:%M')}\n" + f"Gast: {customer_name}\n" + f"Sitzplatz: {seat_id}\n\n") + +def format_order(order: list[OrderItem]) -> list[str]: + formatted_order = [] + for item in order: + formatted_text = f"{item.amount}x {item.menu_item_name}" + formatted_text = formatted_text.replace("ä", "ae") + formatted_text = formatted_text.replace("ö", "oe") + formatted_text = formatted_text.replace("ü", "ue") + formatted_text = formatted_text.replace("ß", "ss") + if len(formatted_text) < MAX_LINE_LEN: + while len(formatted_text) < MAX_LINE_LEN: + formatted_text = formatted_text.replace(" ", " ", 1) + else: + formatted_text = formatted_text[:MAX_LINE_LEN] + + formatted_order.append(formatted_text + "\n") + return formatted_order + +def print_order(order: Order, printer_: Usb) -> None: + header = build_header(order.order_id, order.order_date, order.customer_name, order.seat_id) + items = format_order(order.items) + printer_.image("logo.png") + printer_.text(header) + for item in items: + printer_.text(item) + printer_.cut() + +api = FastAPI() + + +@api.post("/print_order") +async def print_order_api_endpoint(order: Order, x_password: str = Header(None)): + if x_password != SECRET_PASSWORD: + raise HTTPException(status_code=401, detail="Unauthorized") + + for item in order.items: + if item.amount < 1: + return 422 + + print_order(order, PRINTER) + + return "done" + +if __name__ == "__main__": + print("Starting receipt printing server...") + uvicorn.run(api, host="0.0.0.0", port=5000, log_level="warning") diff --git a/post_request_tester.py b/post_request_tester.py new file mode 100644 index 0000000..2020cbc --- /dev/null +++ b/post_request_tester.py @@ -0,0 +1,21 @@ +import requests + +if __name__ == "__main__": + # Helper for testing the API + response = requests.post( + "http://127.0.0.1:5000/print_order", + json={ + "order_id": "12345", + "order_date": "2025-04-02T06:34:08.012Z", + "customer_name": "Typhus", + "seat_id": "A13", + "items": [ + { + "menu_item_name": "Bier", + "amount": 2 + } + ] + }, + headers={"x-password": "Alkohol1"} + ) + print(response)