mirror of
https://git.mirrors.martin98.com/https://github.com/Ultimaker/Cura
synced 2025-05-05 23:04:30 +08:00
162 lines
7.3 KiB
Python
162 lines
7.3 KiB
Python
from UM.Signal import Signal, SignalEmitter
|
|
from UM.PluginObject import PluginObject
|
|
from . import PrinterConnection
|
|
from UM.Application import Application
|
|
from UM.Scene.Iterator.DepthFirstIterator import DepthFirstIterator
|
|
from UM.Scene.SceneNode import SceneNode
|
|
import threading
|
|
import platform
|
|
import glob
|
|
import time
|
|
import os
|
|
|
|
class USBPrinterManager(SignalEmitter,PluginObject):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self._serial_port_list = []
|
|
self._printer_connections = []
|
|
self._check_ports_thread = threading.Thread(target=self._updateConnectionList)
|
|
self._check_ports_thread.daemon = True
|
|
self._check_ports_thread.start()
|
|
|
|
## DEBUG CODE
|
|
#time.sleep(1)
|
|
#self.connectAllConnections()
|
|
#time.sleep(5)
|
|
#f = open("Orb.gcode")
|
|
#lines = f.readlines()
|
|
#print(len(lines))
|
|
#print(len(self._printer_connections))
|
|
#self.sendGCodeToAllActive(lines)
|
|
#print("sending heat " , self.sendCommandToAllActive("M104 S190"))
|
|
|
|
|
|
## Check all serial ports and create a PrinterConnection object for them.
|
|
# Note that this does not validate if the serial ports are actually usable!
|
|
# This is only done when the connect function is called.
|
|
def _updateConnectionList(self):
|
|
while True:
|
|
temp_serial_port_list = self.getSerialPortList(only_list_usb = True)
|
|
if temp_serial_port_list != self._serial_port_list: # Something changed about the list since we last changed something.
|
|
disconnected_ports = [port for port in self._serial_port_list if port not in temp_serial_port_list ]
|
|
self._serial_port_list = temp_serial_port_list
|
|
for serial_port in self._serial_port_list:
|
|
if self.getConnectionByPort(serial_port) is None: #If it doesn't already exist, add it
|
|
if not os.path.islink(serial_port): #Only add the connection if it's a non symbolic link
|
|
connection = PrinterConnection.PrinterConnection(serial_port)
|
|
connection.connect()
|
|
connection.connectionStateChanged.connect(self.serialConectionStateCallback)
|
|
self._printer_connections.append(connection)
|
|
|
|
for serial_port in disconnected_ports: # Close connections and remove them from list.
|
|
connection = self.getConnectionByPort(serial_port)
|
|
if connection != None:
|
|
connection.close()
|
|
self._printer_connections.remove(connection)
|
|
time.sleep(5) #Throttle, as we don't need this information to be updated every single second.
|
|
|
|
## Attempt to connect with all possible connections.
|
|
def connectAllConnections(self):
|
|
for connection in self._printer_connections:
|
|
connection.connect()
|
|
|
|
## send gcode to printer and start printing
|
|
def sendGCodeByPort(self, serial_port, gcode_list):
|
|
printer_connection = self.getConnectionByPort(serial_port)
|
|
if printer_connection is not None:
|
|
printer_connection.printGCode(gcode_list)
|
|
return True
|
|
return False
|
|
|
|
## Send gcode to all active printers.
|
|
# \return True if there was at least one active connection.
|
|
def sendGCodeToAllActive(self, gcode_list):
|
|
for printer_connection in self.getActiveConnections():
|
|
printer_connection.printGCode(gcode_list)
|
|
if len(self.getActiveConnections()):
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
## Send a command to a printer indentified by port
|
|
# \param serial port String indentifieing the port
|
|
# \param command String with the g-code command to send.
|
|
# \return True if connection was found, false otherwise
|
|
def sendCommandByPort(self, serial_port, command):
|
|
printer_connection = self.getConnectionByPort(serial_port)
|
|
if printer_connection is not None:
|
|
printer_connection.sendCommand(command)
|
|
return True
|
|
return False
|
|
|
|
## Send a command to all active (eg; connected) printers
|
|
# \param command String with the g-code command to send.
|
|
# \return True if at least one connection was found, false otherwise
|
|
def sendCommandToAllActive(self, command):
|
|
for printer_connection in self.getActiveConnections():
|
|
printer_connection.sendCommand(command)
|
|
if len(self.getActiveConnections()):
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
def serialConectionStateCallback(self,serial_port):
|
|
connection = self.getConnectionByPort(serial_port)
|
|
if connection.isConnected():
|
|
Application.getInstance().addOutputDevice(serial_port, {
|
|
'id': serial_port,
|
|
'function': self._writeToSerial,
|
|
'description': 'Write to USB {0}'.format(serial_port),
|
|
'icon': 'print_usb',
|
|
'priority': 1
|
|
})
|
|
|
|
def _writeToSerial(self, serial_port):
|
|
for node in DepthFirstIterator(Application.getInstance().getController().getScene().getRoot()):
|
|
if type(node) is not SceneNode or not node.getMeshData():
|
|
continue
|
|
|
|
gcode = getattr(node.getMeshData(), 'gcode', False)
|
|
self.sendGCodeByPort(serial_port,gcode.split('\n'))
|
|
|
|
|
|
## Get a list of printer connection objects that are connected.
|
|
def getActiveConnections(self):
|
|
return [connection for connection in self._printer_connections if connection.isConnected()]
|
|
|
|
## get a printer connection object by serial port
|
|
def getConnectionByPort(self, serial_port):
|
|
for printer_connection in self._printer_connections:
|
|
if serial_port == printer_connection.getSerialPort():
|
|
return printer_connection
|
|
return None
|
|
|
|
## Create a list of serial ports on the system.
|
|
# \param only_list_usb If true, only usb ports are listed
|
|
def getSerialPortList(self,only_list_usb=False):
|
|
base_list=[]
|
|
if platform.system() == "Windows":
|
|
try:
|
|
key=_winreg.OpenKey(_winreg.HKEY_LOCAL_MACHINE,"HARDWARE\\DEVICEMAP\\SERIALCOMM")
|
|
i=0
|
|
while True:
|
|
values = _winreg.EnumValue(key, i)
|
|
if not base_list or 'USBSER' in values[0]:
|
|
base_list+=[values[1]]
|
|
i+=1
|
|
except:
|
|
pass
|
|
|
|
if base_list:
|
|
base_list = base_list + glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*') + glob.glob("/dev/cu.usb*")
|
|
base_list = filter(lambda s: not 'Bluetooth' in s, base_list) #Filter because mac sometimes puts them in the list
|
|
#prev = profile.getMachineSetting('serial_port_auto')
|
|
#if prev in base_list:
|
|
# base_list.remove(prev)
|
|
# base_list.insert(0, prev)
|
|
else:
|
|
base_list = base_list + glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*') + glob.glob("/dev/cu.*") + glob.glob("/dev/tty.usb*") + glob.glob("/dev/rfcomm*") + glob.glob('/dev/serial/by-id/*')
|
|
#if version.isDevVersion() and not base_list:
|
|
#base_list.append('VIRTUAL')
|
|
return base_list |