mirror of
https://git.mirrors.martin98.com/https://github.com/Ultimaker/Cura
synced 2025-04-20 12:49:38 +08:00
152 lines
6.0 KiB
Python
152 lines
6.0 KiB
Python
# Copyright (c) 2018 Ultimaker B.V.
|
|
# Cura is released under the terms of the LGPLv3 or higher.
|
|
|
|
import threading
|
|
import time
|
|
import serial.tools.list_ports
|
|
from os import environ
|
|
from re import search
|
|
|
|
from PyQt5.QtCore import QObject, pyqtSignal
|
|
|
|
from UM.Signal import Signal, signalemitter
|
|
from UM.OutputDevice.OutputDevicePlugin import OutputDevicePlugin
|
|
from UM.i18n import i18nCatalog
|
|
|
|
from cura.PrinterOutput.PrinterOutputDevice import ConnectionState
|
|
|
|
from . import USBPrinterOutputDevice
|
|
|
|
i18n_catalog = i18nCatalog("cura")
|
|
|
|
|
|
@signalemitter
|
|
class USBPrinterOutputDeviceManager(QObject, OutputDevicePlugin):
|
|
"""Manager class that ensures that an USBPrinterOutput device is created for every connected USB printer."""
|
|
|
|
addUSBOutputDeviceSignal = Signal()
|
|
progressChanged = pyqtSignal()
|
|
|
|
def __init__(self, application, parent = None):
|
|
if USBPrinterOutputDeviceManager.__instance is not None:
|
|
raise RuntimeError("Try to create singleton '%s' more than once" % self.__class__.__name__)
|
|
USBPrinterOutputDeviceManager.__instance = self
|
|
|
|
super().__init__(parent = parent)
|
|
self._application = application
|
|
|
|
self._serial_port_list = []
|
|
self._usb_output_devices = {}
|
|
self._usb_output_devices_model = None
|
|
self._update_thread = threading.Thread(target = self._updateThread)
|
|
self._update_thread.setDaemon(True)
|
|
|
|
self._check_updates = True
|
|
|
|
self._application.applicationShuttingDown.connect(self.stop)
|
|
# Because the model needs to be created in the same thread as the QMLEngine, we use a signal.
|
|
self.addUSBOutputDeviceSignal.connect(self.addOutputDevice)
|
|
|
|
self._application.globalContainerStackChanged.connect(self.updateUSBPrinterOutputDevices)
|
|
|
|
# The method updates/reset the USB settings for all connected USB devices
|
|
def updateUSBPrinterOutputDevices(self):
|
|
for device in self._usb_output_devices.values():
|
|
if isinstance(device, USBPrinterOutputDevice.USBPrinterOutputDevice):
|
|
device.resetDeviceSettings()
|
|
|
|
def start(self):
|
|
self._check_updates = True
|
|
self._update_thread.start()
|
|
|
|
def stop(self, store_data: bool = True):
|
|
self._check_updates = False
|
|
|
|
def _onConnectionStateChanged(self, serial_port):
|
|
if serial_port not in self._usb_output_devices:
|
|
return
|
|
|
|
changed_device = self._usb_output_devices[serial_port]
|
|
if changed_device.connectionState == ConnectionState.Connected:
|
|
self.getOutputDeviceManager().addOutputDevice(changed_device)
|
|
else:
|
|
self.getOutputDeviceManager().removeOutputDevice(serial_port)
|
|
|
|
def _updateThread(self):
|
|
while self._check_updates:
|
|
container_stack = self._application.getGlobalContainerStack()
|
|
if container_stack is None:
|
|
time.sleep(5)
|
|
continue
|
|
port_list = [] # Just an empty list; all USB devices will be removed.
|
|
if container_stack.getMetaDataEntry("supports_usb_connection"):
|
|
machine_file_formats = [file_type.strip() for file_type in container_stack.getMetaDataEntry("file_formats").split(";")]
|
|
if "text/x-gcode" in machine_file_formats:
|
|
port_list = self.getSerialPortList(only_list_usb=True)
|
|
self._addRemovePorts(port_list)
|
|
time.sleep(5)
|
|
|
|
def _addRemovePorts(self, serial_ports):
|
|
"""Helper to identify serial ports (and scan for them)"""
|
|
|
|
# First, find and add all new or changed keys
|
|
for serial_port in list(serial_ports):
|
|
if serial_port not in self._serial_port_list:
|
|
self.addUSBOutputDeviceSignal.emit(serial_port) # Hack to ensure its created in main thread
|
|
continue
|
|
self._serial_port_list = list(serial_ports)
|
|
|
|
for port, device in self._usb_output_devices.items():
|
|
if port not in self._serial_port_list:
|
|
device.close()
|
|
|
|
def addOutputDevice(self, serial_port):
|
|
"""Because the model needs to be created in the same thread as the QMLEngine, we use a signal."""
|
|
|
|
device = USBPrinterOutputDevice.USBPrinterOutputDevice(serial_port)
|
|
device.connectionStateChanged.connect(self._onConnectionStateChanged)
|
|
self._usb_output_devices[serial_port] = device
|
|
device.connect()
|
|
|
|
def getSerialPortList(self, only_list_usb = False):
|
|
"""Create a list of serial ports on the system.
|
|
|
|
:param only_list_usb: If true, only usb ports are listed
|
|
"""
|
|
base_list = []
|
|
for port in serial.tools.list_ports.comports():
|
|
if not isinstance(port, tuple):
|
|
port = (port.device, port.description, port.hwid)
|
|
if only_list_usb and not port[2].startswith("USB"):
|
|
continue
|
|
|
|
# To prevent cura from messing with serial ports of other devices,
|
|
# filter by regular expressions passed in as environment variables.
|
|
# Get possible patterns with python3 -m serial.tools.list_ports -v
|
|
|
|
# set CURA_DEVICENAMES=USB[1-9] -> e.g. not matching /dev/ttyUSB0
|
|
pattern = environ.get('CURA_DEVICENAMES')
|
|
if pattern and not search(pattern, port[0]):
|
|
continue
|
|
|
|
# set CURA_DEVICETYPES=CP2102 -> match a type of serial converter
|
|
pattern = environ.get('CURA_DEVICETYPES')
|
|
if pattern and not search(pattern, port[1]):
|
|
continue
|
|
|
|
# set CURA_DEVICEINFOS=LOCATION=2-1.4 -> match a physical port
|
|
# set CURA_DEVICEINFOS=VID:PID=10C4:EA60 -> match a vendor:product
|
|
pattern = environ.get('CURA_DEVICEINFOS')
|
|
if pattern and not search(pattern, port[2]):
|
|
continue
|
|
|
|
base_list += [port[0]]
|
|
|
|
return list(base_list)
|
|
|
|
__instance = None # type: USBPrinterOutputDeviceManager
|
|
|
|
@classmethod
|
|
def getInstance(cls, *args, **kwargs) -> "USBPrinterOutputDeviceManager":
|
|
return cls.__instance
|