# Cura/plugins/USBPrinting/USBPrinterOutputDeviceManager.py
#
# 124 lines
# 4.9 KiB
# Python

# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import threading
import time
import serial.tools.list_ports
from PyQt5.QtCore import QObject, pyqtSignal
from UM.Signal import Signal, signalemitter
from UM.OutputDevice.OutputDevicePlugin import OutputDevicePlugin
from UM.i18n import i18nCatalog
from cura.PrinterOutput.PrinterOutputDevice import ConnectionState
from . import USBPrinterOutputDevice
# Module-level i18n catalog created with the name "cura". It is not referenced anywhere else in the
# visible part of this file (NOTE(review): presumably kept for translated strings -- confirm before removing).
i18n_catalog = i18nCatalog("cura")
## Manager class that ensures that a USBPrinterOutputDevice is created for every connected USB printer.
@signalemitter
class USBPrinterOutputDeviceManager(QObject, OutputDevicePlugin):
    #  Emitted by _addRemovePorts with the name of a serial port that was not seen in the previous scan.
    #  It is connected to addOutputDevice in __init__.
    addUSBOutputDeviceSignal = Signal()
    progressChanged = pyqtSignal()

    #  The single instance of this class; set by __init__ and returned by getInstance().
    __instance = None  # type: USBPrinterOutputDeviceManager

    #  Number of seconds the polling thread sleeps between two scans.
    _POLL_INTERVAL = 5

    ##  Create the manager and register it as the singleton instance.
    #   \param application The application object; it must provide applicationShuttingDown,
    #          globalContainerStackChanged and getGlobalContainerStack().
    #   \param parent Passed on to the base class constructor.
    #   \exception RuntimeError Raised when an instance was already created.
    def __init__(self, application, parent = None):
        if USBPrinterOutputDeviceManager.__instance is not None:
            raise RuntimeError("Try to create singleton '%s' more than once" % self.__class__.__name__)
        USBPrinterOutputDeviceManager.__instance = self

        super().__init__(parent = parent)
        self._application = application

        self._serial_port_list = []  # Ports found by the most recent scan.
        self._usb_output_devices = {}  # Maps serial port name -> USBPrinterOutputDevice.
        self._usb_output_devices_model = None

        # Thread.setDaemon() is deprecated; pass the flag to the constructor instead.
        self._update_thread = threading.Thread(target = self._updateThread, daemon = True)
        self._check_updates = True

        self._application.applicationShuttingDown.connect(self.stop)
        # Because the model needs to be created in the same thread as the QMLEngine, we use a signal.
        self.addUSBOutputDeviceSignal.connect(self.addOutputDevice)
        self._application.globalContainerStackChanged.connect(self.updateUSBPrinterOutputDevices)

    ##  Reset the device settings of every known USB output device.
    #   Connected to globalContainerStackChanged in __init__.
    def updateUSBPrinterOutputDevices(self):
        for device in self._usb_output_devices.values():
            if isinstance(device, USBPrinterOutputDevice.USBPrinterOutputDevice):
                device.resetDeviceSettings()

    ##  Start the thread that polls for serial ports.
    def start(self):
        self._check_updates = True
        self._update_thread.start()

    ##  Ask the polling thread to finish; it exits once its current iteration (including the sleep) is done.
    #   \param store_data Not used by this implementation.
    def stop(self, store_data: bool = True):
        self._check_updates = False

    ##  Add or remove a device from the output device manager depending on its connection state.
    #   \param serial_port The port of the device whose state changed; unknown ports are ignored.
    def _onConnectionStateChanged(self, serial_port):
        changed_device = self._usb_output_devices.get(serial_port)
        if changed_device is None:
            return

        if changed_device.connectionState == ConnectionState.Connected:
            self.getOutputDeviceManager().addOutputDevice(changed_device)
        else:
            self.getOutputDeviceManager().removeOutputDevice(serial_port)

    ##  Body of the polling thread: rescan the serial ports until stop() is called.
    def _updateThread(self):
        while self._check_updates:
            container_stack = self._application.getGlobalContainerStack()
            if container_stack is None:
                time.sleep(self._POLL_INTERVAL)
                continue

            port_list = []  # Just an empty list; all USB devices will be removed.
            if container_stack.getMetaDataEntry("supports_usb_connection"):
                # Fall back to an empty string: calling split() on a missing (None) entry would raise
                # inside this thread and silently end all further port scanning.
                file_formats = container_stack.getMetaDataEntry("file_formats") or ""
                machine_file_formats = [file_type.strip() for file_type in file_formats.split(";")]
                if "text/x-gcode" in machine_file_formats:
                    port_list = self.getSerialPortList(only_list_usb = True)

            self._addRemovePorts(port_list)
            time.sleep(self._POLL_INTERVAL)

    ##  Helper to identify serial ports (and scan for them)
    #   Announces ports that are new since the previous scan and closes devices whose port disappeared.
    #   \param serial_ports The ports found by the current scan.
    def _addRemovePorts(self, serial_ports):
        current_ports = list(serial_ports)

        # First, find and add all new or changed keys
        for serial_port in current_ports:
            if serial_port not in self._serial_port_list:
                self.addUSBOutputDeviceSignal.emit(serial_port)  # Hack to ensure its created in main thread
        self._serial_port_list = current_ports

        # addOutputDevice inserts into this dict when the signal above is delivered, which may happen
        # while this loop is still running. Iterate over a snapshot so that such an insert cannot raise
        # "dictionary changed size during iteration".
        for port, device in list(self._usb_output_devices.items()):
            if port not in current_ports:
                device.close()

    ##  Because the model needs to be created in the same thread as the QMLEngine, we use a signal.
    #   Creates the device for a port, stores it and starts connecting to it.
    #   \param serial_port The serial port to create an output device for.
    def addOutputDevice(self, serial_port):
        device = USBPrinterOutputDevice.USBPrinterOutputDevice(serial_port)
        device.connectionStateChanged.connect(self._onConnectionStateChanged)
        self._usb_output_devices[serial_port] = device
        device.connect()

    ##  Create a list of serial ports on the system.
    #   \param only_list_usb If true, only usb ports are listed
    #   \return A new list with the device name of every matching port.
    def getSerialPortList(self, only_list_usb = False):
        base_list = []
        for port in serial.tools.list_ports.comports():
            # Entries that are not plain tuples are converted to (device, description, hwid).
            if not isinstance(port, tuple):
                port = (port.device, port.description, port.hwid)
            if only_list_usb and not port[2].startswith("USB"):
                continue
            base_list.append(port[0])
        return base_list

    ##  Return the instance registered by __init__, or None when none was created yet.
    @classmethod
    def getInstance(cls, *args, **kwargs) -> "USBPrinterOutputDeviceManager":
        return cls.__instance