From 4581b8edffdddeb02b5479efccccf4ef79b3e885 Mon Sep 17 00:00:00 2001
From: piramja <57192020+piramja@users.noreply.github.com>
Date: Fri, 11 Oct 2024 14:18:28 +0200
Subject: [PATCH] initial upload
---
README.md | 44 +++
inventree_niimbot/__init__.py | 0
inventree_niimbot/niimbot_plugin.py | 249 ++++++++++++++++
inventree_niimbot/nimmy/__init__.py | 0
inventree_niimbot/nimmy/bluetooth.py | 79 ++++++
inventree_niimbot/nimmy/exception.py | 5 +
inventree_niimbot/nimmy/logger_config.py | 42 +++
inventree_niimbot/nimmy/packet.py | 37 +++
inventree_niimbot/nimmy/printer.py | 345 +++++++++++++++++++++++
inventree_niimbot/version.py | 3 +
setup.cfg | 9 +
setup.py | 55 ++++
12 files changed, 868 insertions(+)
create mode 100644 README.md
create mode 100644 inventree_niimbot/__init__.py
create mode 100644 inventree_niimbot/niimbot_plugin.py
create mode 100644 inventree_niimbot/nimmy/__init__.py
create mode 100644 inventree_niimbot/nimmy/bluetooth.py
create mode 100644 inventree_niimbot/nimmy/exception.py
create mode 100644 inventree_niimbot/nimmy/logger_config.py
create mode 100644 inventree_niimbot/nimmy/packet.py
create mode 100644 inventree_niimbot/nimmy/printer.py
create mode 100644 inventree_niimbot/version.py
create mode 100644 setup.cfg
create mode 100644 setup.py
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..1bc57f7
--- /dev/null
+++ b/README.md
@@ -0,0 +1,44 @@
+[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
+![PEP](https://github.com/inventree/inventree-python/actions/workflows/pep.yaml/badge.svg)
+
+
+# inventree-niimbot-plugin
+
+A label printing plugin for [InvenTree](https://inventree.org), which provides support for the [Niimbot Label Printers](https://www.niimbot.com/enweb/product_label.html?category_id=6). This plugin is based on the amazing work from [labbots/NiimPrintX](https://github.com/labbots/NiimPrintX/tree/main) and modifications from [LorisPolenz/NiimPrintX](https://github.com/LorisPolenz/NiimPrintX/tree/main).
+
+## Installation
+
+Install this plugin manually as follows:
+
+```
+pip install inventree-niimbot-plugin
+```
+
+Or, add to your `plugins.txt` file to install automatically using the `invoke install` command:
+
+```
+inventree-niimbot-plugin
+```
+
+## Configuration Options
+The following list gives an overview of the available settings. You find them under the InvenTree plugin specific settings.
+
+* **Printer Model**
+Currently supported models are:
+b1, b18, b21, d11, d110 (but i was only able to test b1 because i don't have other printer models. Please report back if you can test other models!!).
+
+* **Density**
+The print density. Different models seem to accept only certain values (b1 accepts 1-3).
+
+* **Rotation**
+Rotation angle, either 0, 90, 180 or 270 degrees.
+
+* **Scaling**
+Scaling factor, from 10% to 200%.
+
+* **Vertical Offset**
+Vertical offset, from 0 to 200px.
+
+* **Horizontal Offset**
+Horizontal offset, from 0 to 200px.
+
diff --git a/inventree_niimbot/__init__.py b/inventree_niimbot/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/inventree_niimbot/niimbot_plugin.py b/inventree_niimbot/niimbot_plugin.py
new file mode 100644
index 0000000..a067165
--- /dev/null
+++ b/inventree_niimbot/niimbot_plugin.py
@@ -0,0 +1,249 @@
+"""Niimbot label printing plugin for InvenTree.
+
+Supports direct printing of labels to USB or Bluetooth label printers, using NiimbotPrintX.
+"""
+
+# translation
+from django.utils.translation import gettext_lazy as _
+
+# printing options
+from rest_framework import serializers
+
+from inventree_niimbot.version import NIIMBOT_PLUGIN_VERSION
+
+# InvenTree plugin libs
+from plugin import InvenTreePlugin
+from plugin.mixins import LabelPrintingMixin, SettingsMixin
+
+# Image library
+from PIL import Image
+
+import asyncio
+from bleak import BleakClient, BleakScanner
+
+# NiimbotPrintX printer client
+from inventree_niimbot.nimmy.bluetooth import find_device
+from inventree_niimbot.nimmy.printer import PrinterClient, InfoEnum
+from inventree_niimbot.nimmy.logger_config import setup_logger, get_logger, logger_enable
+
+
+
+class NiimbotLabelSerializer(serializers.Serializer):
+ """Custom serializer class for NiimbotLabelPlugin.
+
+ Used to specify printing parameters at runtime
+ """
+
+ copies = serializers.IntegerField(
+ default=1,
+ label=_('Copies'),
+ help_text=_('Number of copies to print'),
+ )
+
+
+class NiimbotLabelPlugin(LabelPrintingMixin, SettingsMixin, InvenTreePlugin):
+
+ AUTHOR = "piramja"
+ DESCRIPTION = "Label printing plugin for Niimbot label printers"
+ VERSION = NIIMBOT_PLUGIN_VERSION
+
+ NAME = "Niimbot Labels"
+ SLUG = "niimbot"
+ TITLE = "Niimbot Label Printer"
+
+ PrintingOptionsSerializer = NiimbotLabelSerializer
+
+ # Use background printing
+ BLOCKING_PRINT = False
+
+ SETTINGS = {
+ 'MODEL': {
+ 'name': _('Printer Model'),
+ 'description': _('Select model of Niimbot printer'),
+ 'choices': [
+ ('b1', 'Niimbot B1'),
+ ('b18', 'Niimbot B18'),
+ ('b21', 'Niimbot B21'),
+ ('d11', 'Niimbot D11'),
+ ('d110', 'Niimbot D110')
+ ],
+ 'default': 'b1',
+ },
+ 'DENSITY': {
+ 'name': _('Density'),
+ 'description': _('Density of the print (3 is max for b18, d11, d110)'),
+ 'choices': [
+ ('1', 'density 1'),
+ ('2', 'density 2'),
+ ('3', 'density 3'),
+ ('4', 'density 4'),
+ ('5', 'density 5'),
+ ],
+ 'default': '3',
+ },
+ 'ROTATION': {
+ 'name': _('Rotation'),
+ 'description': _('Image rotation (clockwise)'),
+ 'choices': [
+ ('0', '0 degrees'),
+ ('90', '90 degrees'),
+ ('180', '180 degrees'),
+ ('270', '270 degrees'),
+ ],
+ 'default': '0',
+ },
+ 'SCALING': {
+ 'name': _('Scaling (%)'),
+ 'description': _('Image scaling in percent'),
+ 'choices': [
+ ('2', '200%'),
+ ('1.9', '190%'),
+ ('1.8', '180%'),
+ ('1.7', '170%'),
+ ('1.6', '160%'),
+ ('1.5', '150%'),
+ ('1.4', '140%'),
+ ('1.3', '130%'),
+ ('1.2', '120%'),
+ ('1.1', '110%'),
+ ('1', '100%'),
+ ('0.9', '90%'),
+ ('0.8', '80%'),
+ ('0.7', '70%'),
+ ('0.6', '60%'),
+ ('0.5', '50%'),
+ ('0.4', '40%'),
+ ('0.3', '30%'),
+ ('0.2', '20%'),
+ ('0.1', '10%'),
+ ],
+ 'default': '1',
+ },
+ 'V_OFFSET': {
+ 'name': _('Vertical Offset (px)'),
+ 'description': _('Image offset vertical'),
+ 'choices': [
+ ('0', '0px'),
+ ('10', '10px'),
+ ('20', '20px'),
+ ('30', '30px'),
+ ('40', '40px'),
+ ('50', '50px'),
+ ('60', '60px'),
+ ('70', '70px'),
+ ('80', '80px'),
+ ('90', '90px'),
+ ('100', '100px'),
+ ('110', '110px'),
+ ('120', '120px'),
+ ('130', '130px'),
+ ('140', '140px'),
+ ('150', '150px'),
+ ('160', '160px'),
+ ('170', '170px'),
+ ('180', '180px'),
+ ('190', '190px'),
+ ('200', '200px'),
+ ],
+ 'default': '0',
+ },
+ 'H_OFFSET': {
+ 'name': _('Horizontal Offset (px)'),
+ 'description': _('Image offset horizontal'),
+ 'choices': [
+ ('0', '0px'),
+ ('10', '10px'),
+ ('20', '20px'),
+ ('30', '30px'),
+ ('40', '40px'),
+ ('50', '50px'),
+ ('60', '60px'),
+ ('70', '70px'),
+ ('80', '80px'),
+ ('90', '90px'),
+ ('100', '100px'),
+ ('110', '110px'),
+ ('120', '120px'),
+ ('130', '130px'),
+ ('140', '140px'),
+ ('150', '150px'),
+ ('160', '160px'),
+ ('170', '170px'),
+ ('180', '180px'),
+ ('190', '190px'),
+ ('200', '200px'),
+ ],
+ 'default': '0',
+ },
+ }
+
+
+ def print_label(self, **kwargs):
+ """
+ Send the label to the printer
+ """
+
+ # TODO: Add padding around the provided image, otherwise the label does not print correctly
+ # ^ Why? The wording in the underlying brother_ql library ('dots_printable') seems to suggest
+ # at least that area is fully printable.
+ # TODO: Improve label auto-scaling based on provided width and height information
+
+ # Extract width (x) and height (y) information
+ # width = kwargs['width']
+ # height = kwargs['height']
+ # ^ currently this width and height are those of the label template (before conversion to PDF
+ # and PNG) and are of little use
+
+ # Printing options requires a modern-ish InvenTree backend,
+ # which supports the 'printing_options' keyword argument
+ options = kwargs.get('printing_options', {})
+ n_copies = int(options.get('copies', 1))
+
+ # Look for png data in kwargs (if provided)
+ label_image = kwargs.get('png_file', None)
+
+ if not label_image:
+ # Convert PDF to PNG
+ pdf_data = kwargs['pdf_data']
+ label_image = self.render_to_png(label=None, pdf_data=pdf_data)
+
+
+ # Read settings
+ model = self.get_setting('MODEL')
+ density = int(self.get_setting('DENSITY'))
+ vertical_offset = int(self.get_setting('V_OFFSET'))
+ horizontal_offset = int(self.get_setting('H_OFFSET'))
+ scaling = float(self.get_setting('SCALING'))
+ rotation = int(self.get_setting('ROTATION')) + 90
+ rotation = rotation % 360
+
+ # Rotate image
+ if rotation in [90, 180, 270]:
+ label_image = label_image.rotate(rotation, expand=True)
+
+ # Resize image
+ width, height = label_image.size
+ new_size = (int(width * scaling), int(height * scaling))
+ label_image = label_image.resize(new_size, Image.LANCZOS)
+
+ # Add offsets to the image data directly if model is b1 (maybe necessary for other models too?)
+ if model == "b1":
+ if vertical_offset > 0 or horizontal_offset > 0:
+ new_image = Image.new("RGB", (label_image.width + horizontal_offset, label_image.height + vertical_offset), (255, 255, 255))
+ new_image.paste(label_image, (horizontal_offset, vertical_offset))
+ label_image = new_image
+
+ # Print labels
+ asyncio.run(self._print(model, density, label_image, n_copies, vertical_offset, horizontal_offset))
+
+
+ async def _print(self, model, density, image, quantity, vertical_offset, horizontal_offset):
+ device = await find_device(model)
+ printer = PrinterClient(device)
+ if await printer.connect():
+ if model == "b1":
+ await printer.print_imageV2(image, density=density, quantity=quantity)
+ else:
+ await printer.print_image(image, density=density, quantity=quantity, vertical_offset=vertical_offset, horizontal_offset=horizontal_offset)
+
+ await printer.disconnect()
diff --git a/inventree_niimbot/nimmy/__init__.py b/inventree_niimbot/nimmy/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/inventree_niimbot/nimmy/bluetooth.py b/inventree_niimbot/nimmy/bluetooth.py
new file mode 100644
index 0000000..c59e248
--- /dev/null
+++ b/inventree_niimbot/nimmy/bluetooth.py
@@ -0,0 +1,79 @@
+import asyncio
+from bleak import BleakClient, BleakScanner
+
+from .exception import BLEException
+from .logger_config import get_logger
+
+logger = get_logger()
+
+
+async def find_device(device_name_prefix=None):
+ devices = await BleakScanner.discover()
+ for device in devices:
+ if device.name and device.name.lower().startswith(device_name_prefix.lower()):
+ return device
+ raise BLEException(f"Failed to find device {device_name_prefix}")
+
+
+async def scan_devices(device_name=None):
+ print("Scanning for devices...")
+ devices = await BleakScanner.discover()
+ for device in devices:
+ if device_name:
+ if device.name and device_name.lower() in device.name.lower():
+ print(f"Found device: {device.name} at {device.address}")
+ return device
+ else:
+ print(f"Found device: {device.name} at {device.address}")
+ return None
+
+
+class BLETransport:
+ def __init__(self, address=None):
+ self.address = address
+ self.client = None
+
+ async def __aenter__(self):
+ # Automatically connect if address is provided during initialization
+ if self.address:
+ self.client = BleakClient(self.address)
+ if await self.client.connect():
+ logger.info(f"Connected to {self.address}")
+ return self
+ else:
+ raise BLEException(f"Failed to connect to the BLE device at {self.address}")
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ if self.client:
+ await self.client.disconnect()
+ logger.info("Disconnected.")
+
+ async def connect(self, address):
+ if self.client is None:
+ self.client = BleakClient(address)
+ if not self.client.is_connected:
+ return await self.client.connect()
+ return False
+
+ async def disconnect(self):
+ if self.client and self.client.is_connected:
+ await self.client.disconnect()
+
+ async def write(self, data, char_uuid):
+ if self.client and self.client.is_connected:
+ await self.client.write_gatt_char(char_uuid, data)
+ else:
+ raise BLEException("BLE client is not connected.")
+
+ async def start_notification(self, char_uuid, handler):
+ if self.client and self.client.is_connected:
+ await self.client.start_notify(char_uuid, handler)
+ else:
+ raise BLEException("BLE client is not connected.")
+
+ async def stop_notification(self, char_uuid):
+ if self.client and self.client.is_connected:
+ await self.client.stop_notify(char_uuid)
+ else:
+ raise BLEException("BLE client is not connected.")
diff --git a/inventree_niimbot/nimmy/exception.py b/inventree_niimbot/nimmy/exception.py
new file mode 100644
index 0000000..fddb8ac
--- /dev/null
+++ b/inventree_niimbot/nimmy/exception.py
@@ -0,0 +1,5 @@
+class BLEException(Exception):
+ pass
+
+class PrinterException(Exception):
+ pass
\ No newline at end of file
diff --git a/inventree_niimbot/nimmy/logger_config.py b/inventree_niimbot/nimmy/logger_config.py
new file mode 100644
index 0000000..3e959f5
--- /dev/null
+++ b/inventree_niimbot/nimmy/logger_config.py
@@ -0,0 +1,42 @@
+import sys
+from loguru import logger
+
+from devtools import debug
+
+
+def setup_logger():
+ logger.remove()
+ default_level = "INFO"
+ logger.add(sys.stderr, colorize=True, format="{time} | {level} | {message}",
+ level=default_level)
+ logger.add("nimmy.log", rotation="100 MB", compression="zip", level=default_level)
+
+
+# | Level name | Severity value | Logger method |
+# ---------------------------------------------------
+# | TRACE | 5 | logger.trace() |
+# | DEBUG | 10 | logger.debug() |
+# | INFO | 20 | logger.info() |
+# | SUCCESS | 25 | logger.success() |
+# | WARNING | 30 | logger.warning() |
+# | ERROR | 40 | logger.error() |
+# | CRITICAL | 50 | logger.critical() |
+# ---------------------------------------------------
+def logger_enable(verbose: int):
+ # Mapping verbosity level to Loguru levels
+ levels = {0: "INFO", 1: "INFO", 2: "DEBUG", 3: "TRACE"}
+ new_level = levels.get(verbose, "DEBUG")
+
+ # Iterate over all handlers and update the level
+ for handler_id in list(logger._core.handlers):
+ logger.remove(handler_id)
+
+ if verbose != 0:
+ # Re-adding handlers with new levels
+ logger.add(sys.stdout, colorize=True, format="{time} | {level} | {message}",
+ level=new_level)
+ logger.add("nimmy.log", rotation="100 MB", compression="zip", level=new_level)
+
+
+def get_logger():
+ return logger
diff --git a/inventree_niimbot/nimmy/packet.py b/inventree_niimbot/nimmy/packet.py
new file mode 100644
index 0000000..a8ee29d
--- /dev/null
+++ b/inventree_niimbot/nimmy/packet.py
@@ -0,0 +1,37 @@
+#from devtools import debug
+
+
+def packet_to_int(x):
+ return int.from_bytes(x.data, "big")
+
+
+class NiimbotPacket:
+ def __init__(self, type_, data):
+ self.type = type_
+ self.data = data
+
+ @classmethod
+ def from_bytes(cls, pkt):
+ assert pkt[:2] == b"\x55\x55"
+ assert pkt[-2:] == b"\xaa\xaa"
+ type_ = pkt[2]
+ len_ = pkt[3]
+ data = pkt[4 : 4 + len_]
+
+ checksum = type_ ^ len_
+ for i in data:
+ checksum ^= i
+ assert checksum == pkt[-3]
+
+ return cls(type_, data)
+
+ def to_bytes(self):
+ checksum = self.type ^ len(self.data)
+ for i in self.data:
+ checksum ^= i
+ return bytes(
+ (0x55, 0x55, self.type, len(self.data), *self.data, checksum, 0xAA, 0xAA)
+ )
+
+ def __repr__(self):
+ return f""
diff --git a/inventree_niimbot/nimmy/printer.py b/inventree_niimbot/nimmy/printer.py
new file mode 100644
index 0000000..0811e98
--- /dev/null
+++ b/inventree_niimbot/nimmy/printer.py
@@ -0,0 +1,345 @@
+import enum
+import asyncio
+import struct
+import math
+from PIL import Image, ImageOps
+from .exception import BLEException, PrinterException
+from .bluetooth import BLETransport
+from .logger_config import get_logger
+from .packet import NiimbotPacket, packet_to_int
+
+from devtools import debug
+
+logger = get_logger()
+
+
+class InfoEnum(enum.IntEnum):
+ DENSITY = 1
+ PRINTSPEED = 2
+ LABELTYPE = 3
+ LANGUAGETYPE = 6
+ AUTOSHUTDOWNTIME = 7
+ DEVICETYPE = 8
+ SOFTVERSION = 9
+ BATTERY = 10
+ DEVICESERIAL = 11
+ HARDVERSION = 12
+
+
+class RequestCodeEnum(enum.IntEnum):
+ GET_INFO = 64 # 0x40
+ GET_RFID = 26 # 0x1A
+ HEARTBEAT = 220 # 0xDC
+ SET_LABEL_TYPE = 35 # 0x23
+ SET_LABEL_DENSITY = 33 # 0x21
+ START_PRINT = 1 # 0x01
+ END_PRINT = 243 # 0xF3
+ START_PAGE_PRINT = 3 # 0x03
+ END_PAGE_PRINT = 227 # 0xE3
+ ALLOW_PRINT_CLEAR = 32 # 0x20
+ SET_DIMENSION = 19 # 0x13
+ SET_QUANTITY = 21 # 0x15
+ GET_PRINT_STATUS = 163 # 0xA3
+
+
+class PrinterClient:
+ def __init__(self, device):
+ self.char_uuid = None
+ self.device = device
+ self.transport = BLETransport()
+ self.notification_event = asyncio.Event()
+ self.notification_data = None
+
+ async def connect(self):
+ if await self.transport.connect(self.device.address):
+ if not self.char_uuid:
+ await self.find_characteristics()
+ logger.info(f"Successfully connected to {self.device.name}")
+ return True
+ logger.error("Connection failed.")
+ return False
+
+ async def disconnect(self):
+ await self.transport.disconnect()
+ logger.info(f"Printer {self.device.name} disconnected.")
+
+ async def find_characteristics(self):
+ services = {}
+ for service in self.transport.client.services:
+ s = []
+ for char in service.characteristics:
+ s.append({
+ "id": char.uuid,
+ "handle": char.handle,
+ "properties": char.properties
+ })
+
+ services[service.uuid] = s
+
+ for service_id, characteristics in services.items():
+ if len(characteristics) == 1: # Check if there's exactly one characteristic
+ props = characteristics[0]['properties']
+ if 'read' in props and 'write-without-response' in props and 'notify' in props:
+ self.char_uuid = characteristics[0]['id'] # Return the service ID that meets the criteria
+ if not self.char_uuid:
+ raise PrinterException("Cannot find bluetooth characteristics.")
+
+ async def send_command(self, request_code, data, timeout=10):
+ try:
+ if not self.transport.client or not self.transport.client.is_connected:
+ await self.connect()
+ packet = NiimbotPacket(request_code, data)
+ await self.transport.start_notification(self.char_uuid, self.notification_handler)
+ await self.transport.write(packet.to_bytes(), self.char_uuid)
+ logger.debug(
+ f"Printer command sent - {RequestCodeEnum(request_code).name}:{request_code} - {[b for b in data]}")
+
+ await asyncio.wait_for(self.notification_event.wait(), timeout) # Wait until the notification event is set
+ response = NiimbotPacket.from_bytes(self.notification_data)
+ logger.debug(
+ f"Printer response received - {[b for b in response.data]} - {len(response.data)} bytes")
+
+ await self.transport.stop_notification(self.char_uuid)
+ self.notification_event.clear() # Reset the event for the next notification
+ return response
+ except asyncio.TimeoutError:
+ logger.error(f"Timeout occurred for request {RequestCodeEnum(request_code).name}")
+ except BLEException as e:
+ logger.error(f"An error occurred: {e}")
+
+ async def write_raw(self, data):
+ try:
+ if not self.transport.client or not self.transport.client.is_connected:
+ await self.connect()
+ await self.transport.write(data.to_bytes(), self.char_uuid)
+ except BLEException as e:
+ logger.error(f"An error occurred: {e}")
+
+ async def write_no_notify(self, request_code, data):
+ try:
+ if not self.transport.client or not self.transport.client.is_connected:
+ await self.connect()
+ packet = NiimbotPacket(request_code, data)
+ await self.transport.write(packet.to_bytes(), self.char_uuid)
+ except BLEException as e:
+ logger.error(f"An error occurred: {e}")
+
+ def notification_handler(self, sender, data):
+ # print(f"Notification from {sender}: {data}")
+ logger.trace(f"Notification: {data}")
+ self.notification_data = data
+ self.notification_event.set()
+
+ async def print_image(self, image: Image, density: int = 3, quantity: int = 1, vertical_offset= 0,
+ horizontal_offset = 0):
+ await self.set_label_density(density)
+ await self.set_label_type(1)
+ await self.start_print()
+ await self.start_page_print()
+ await self.set_dimension(image.height, image.width)
+ await self.set_quantity(quantity)
+
+ for pkt in self._encode_image(image, vertical_offset, horizontal_offset):
+ # Send each line and wait for a response or status check
+ await self.write_raw(pkt)
+ # Adding a short delay or status check here can help manage buffer issues
+ await asyncio.sleep(0.01) # Adjust the delay as needed based on printer feedback
+
+ while not await self.end_page_print():
+ await asyncio.sleep(0.05)
+
+ while True:
+ status = await self.get_print_status()
+ if status['page'] == quantity:
+ break
+ await asyncio.sleep(0.1)
+
+ await self.end_print()
+
+ async def print_imageV2(self, image: Image, density: int = 3, quantity: int = 1, vertical_offset=0,
+ horizontal_offset=0):
+ await self.set_label_density(density)
+ await self.set_label_type(1)
+ await self.start_printV2(quantity=quantity)
+ await self.start_page_print()
+ await self.set_dimensionV2(image.height, image.width, quantity)
+
+ for pkt in self._encode_image(image, vertical_offset, horizontal_offset):
+ logger.debug(f"Sending packet: {pkt}")
+ # Send each line and wait for a response or status check
+ await self.write_raw(pkt)
+ # Adding a short delay or status check here can help manage buffer issues
+ # Adjust the delay as needed based on printer feedback
+ await asyncio.sleep(0.01)
+
+ await self.end_page_print()
+
+ await asyncio.sleep(2) # Sleep for some time, looks like it enhances the reliability of the print job
+
+ def _encode_image(self, image: Image, vertical_offset=0, horizontal_offset=0):
+ # Convert the image to monochrome
+ img = ImageOps.invert(image.convert("L")).convert("1")
+
+ # Apply horizontal offset
+ if horizontal_offset > 0:
+ img = ImageOps.expand(img, border=(horizontal_offset, 0, 0, 0), fill=1)
+ else:
+ img = img.crop((-horizontal_offset, 0, img.width, img.height))
+
+ # Add vertical padding for vertical offset
+ img = ImageOps.expand(img, border=(0, vertical_offset, 0, 0), fill=1)
+
+ for y in range(img.height):
+ line_data = [img.getpixel((x, y)) for x in range(img.width)]
+ line_data = "".join("0" if pix == 0 else "1" for pix in line_data)
+ line_data = int(line_data, 2).to_bytes(math.ceil(img.width / 8), "big")
+ counts = (0, 0, 0) # It seems like you can always send zeros
+ header = struct.pack(">H3BB", y, *counts, 1)
+ pkt = NiimbotPacket(0x85, header + line_data)
+ yield pkt
+
+ async def get_info(self, key):
+ response = await self.send_command(RequestCodeEnum.GET_INFO, bytes((key,)))
+
+ match key:
+ case InfoEnum.DEVICESERIAL:
+ return response.data.hex()
+ case InfoEnum.SOFTVERSION:
+ return packet_to_int(response) / 100
+ case InfoEnum.HARDVERSION:
+ return packet_to_int(response) / 100
+ case _:
+ return packet_to_int(response)
+
+ return None
+
+ async def get_rfid(self):
+ packet = await self.send_command(RequestCodeEnum.GET_RFID, b"\x01")
+ data = packet.data
+
+ if data[0] == 0:
+ return None
+ uuid = data[0:8].hex()
+ idx = 8
+
+ barcode_len = data[idx]
+ idx += 1
+ barcode = data[idx: idx + barcode_len].decode()
+
+ idx += barcode_len
+ serial_len = data[idx]
+ idx += 1
+ serial = data[idx: idx + serial_len].decode()
+
+ idx += serial_len
+ total_len, used_len, type_ = struct.unpack(">HHB", data[idx:])
+ return {
+ "uuid": uuid,
+ "barcode": barcode,
+ "serial": serial,
+ "used_len": used_len,
+ "total_len": total_len,
+ "type": type_,
+ }
+
+ async def heartbeat(self):
+ packet = await self.send_command(RequestCodeEnum.HEARTBEAT, b"\x01")
+ closing_state = None
+ power_level = None
+ paper_state = None
+ rfid_read_state = None
+
+ match len(packet.data):
+ case 20:
+ paper_state = packet.data[18]
+ rfid_read_state = packet.data[19]
+ case 13:
+ closing_state = packet.data[9]
+ power_level = packet.data[10]
+ paper_state = packet.data[11]
+ rfid_read_state = packet.data[12]
+ case 19:
+ closing_state = packet.data[15]
+ power_level = packet.data[16]
+ paper_state = packet.data[17]
+ rfid_read_state = packet.data[18]
+ case 10:
+ closing_state = packet.data[8]
+ power_level = packet.data[9]
+ rfid_read_state = packet.data[8]
+ case 9:
+ closing_state = packet.data[8]
+
+ return {
+ "closing_state": closing_state,
+ "power_level": power_level,
+ "paper_state": paper_state,
+ "rfid_read_state": rfid_read_state,
+ }
+
+ async def set_label_type(self, n):
+ assert 1 <= n <= 3
+ packet = await self.send_command(RequestCodeEnum.SET_LABEL_TYPE, bytes((n,)))
+ return bool(packet.data[0])
+
+ async def set_label_density(self, n):
+ assert 1 <= n <= 5 # B21 has 5 levels, not sure for D11
+ packet = await self.send_command(RequestCodeEnum.SET_LABEL_DENSITY, bytes((n,)))
+ return bool(packet.data[0])
+
+ async def start_print(self):
+ packet = await self.send_command(RequestCodeEnum.START_PRINT, b"\x01")
+ return bool(packet.data[0])
+
+ async def start_printV2(self, quantity):
+ assert 0 <= quantity <= 65535 # assume quantity can not be greater than 65535 (2 bytes)
+
+ command = struct.pack('H', quantity)
+ packet = await self.send_command(RequestCodeEnum.START_PRINT, b'\x00' + command + b'\x00\x00\x00\x00')
+ return bool(packet.data[0])
+
+ async def end_print(self):
+ packet = await self.send_command(RequestCodeEnum.END_PRINT, b"\x01")
+ return bool(packet.data[0])
+
+ async def start_page_print(self):
+ packet = await self.send_command(RequestCodeEnum.START_PAGE_PRINT, b"\x01")
+ return bool(packet.data[0])
+
+ async def end_page_print(self):
+ packet = await self.send_command(RequestCodeEnum.END_PAGE_PRINT, b"\x01")
+ return bool(packet.data[0])
+
+ async def allow_print_clear(self):
+ packet = await self.send_command(RequestCodeEnum.ALLOW_PRINT_CLEAR, b"\x01")
+ return bool(packet.data[0])
+
+ async def set_dimension(self, w, h):
+ packet = await self.send_command(
+ RequestCodeEnum.SET_DIMENSION, struct.pack(">HH", w, h)
+ )
+ return bool(packet.data[0])
+
+ async def set_dimensionV2(self, w, h, copies):
+ logger.debug(f"Setting dimension: {w}x{h}")
+ packet = await self.send_command(
+ RequestCodeEnum.SET_DIMENSION, struct.pack(">HHH", w, h, copies)
+ )
+ return bool(packet.data[0])
+
+ async def set_quantity(self, n):
+ packet = await self.send_command(RequestCodeEnum.SET_QUANTITY, struct.pack(">H", n))
+ return bool(packet.data[0])
+
+ async def get_print_status(self):
+ packet = await self.send_command(RequestCodeEnum.GET_PRINT_STATUS, b"\x01")
+ page, progress1, progress2 = struct.unpack(">HBB", packet.data)
+ return {"page": page, "progress1": progress1, "progress2": progress2}
+
+ def __del__(self):
+ if self.transport.client.is_connected:
+ loop = asyncio.get_event_loop()
+ if loop.is_running():
+ loop.create_task(self.disconnect())
+ else:
+ loop.run_until_complete(self.disconnect())
diff --git a/inventree_niimbot/version.py b/inventree_niimbot/version.py
new file mode 100644
index 0000000..db753d6
--- /dev/null
+++ b/inventree_niimbot/version.py
@@ -0,0 +1,3 @@
+"""Version information for the plugin"""
+
+NIIMBOT_PLUGIN_VERSION = "0.1.0"
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..09ea220
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,9 @@
+[flake8]
+ignore =
+ # - W293 - blank lines contain whitespace
+ W293,
+ # - E501 - line too long (82 characters)
+ E501
+ N802
+exclude = .git,__pycache__,dist,build,test.py
+max-complexity = 20
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..7a31643
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+
+import setuptools
+
+from inventree_niimbot.version import NIIMBOT_PLUGIN_VERSION
+
+with open('README.md', encoding='utf-8') as f:
+ long_description = f.read()
+
+
+setuptools.setup(
+ name="inventree-niimbot-plugin",
+
+ version=NIIMBOT_PLUGIN_VERSION,
+
+ author="piramja",
+
+ author_email="info@piramja.de",
+
+ description="Niimbot label printer (b1, b18, b21, d11, d110) plugin for InvenTree",
+
+ long_description=long_description,
+
+ long_description_content_type='text/markdown',
+
+ keywords="inventree inventreeplugins label printer printing inventory",
+
+ url="https://github.com/inventree/inventree-niimbot-plugin",
+
+ license="MIT",
+
+ packages=setuptools.find_packages(),
+
+ install_requires=[
+ 'bleak==0.21.1',
+ 'devtools==0.12.2',
+ 'setuptools==69.5.1',
+ 'markdown-it-py==3.0.0',
+ 'loguru==0.7.2',
+ 'pillow==10.3.0',
+ ],
+
+ setup_requires=[
+ "wheel",
+ "twine",
+ ],
+
+ python_requires=">=3.9",
+
+ entry_points={
+ "inventree_plugins": [
+ "NiimbotLabeLPlugin = inventree_niimbot.niimbot_plugin:NiimbotLabelPlugin"
+ ]
+ },
+)