pygcode/scripts/grbl-stream
2017-08-06 19:00:55 +10:00

1081 lines
35 KiB
Python
Executable File

#!/usr/bin/env python
import curses
import time
import six
import copy
import argparse
import re
import threading
import serial
# Import pygcode: first try whatever is importable as-is, then retry once with
# the repository's '../src' directory (relative to this script) on sys.path.
for pygcode_lib_type in ('installed_lib', 'relative_lib'):
    try:
        from pygcode import (
            NullMachine,
            GCodeRapidMove,
            GCodeIncrementalDistanceMode,
            text2gcodes,
        )
    except ImportError:
        import sys, os, inspect
        _this_file = inspect.getfile(inspect.currentframe())
        _this_path = os.path.dirname(os.path.abspath(_this_file))
        sys.path.insert(0, os.path.join(_this_path, '..', 'src'))
        if pygcode_lib_type != 'installed_lib':
            raise  # second failure: the ImportError is a genuine problem
        continue  # first failure: retry now that sys.path has been extended
    else:
        break
class Widget(object):
    """Marker base class shared by the on-screen widgets (no behaviour of its own)."""
class Label(Widget):
    """
    Text field drawn at a fixed window position, optionally after a prefix.

    The text is right-padded with spaces up to ``len`` characters so that a
    shorter value overwrites what was drawn before; longer text is not cut.
    """

    def __init__(self, window, row, col, len, text='', prefix=''):
        # NOTE: the ``len`` parameter shadows the builtin inside __init__ only;
        # it is part of the public signature (callers pass ``len=...``).
        self.window = window
        self.row, self.col = row, col
        self.len = len
        self.prefix = prefix
        self._text = text
        self.render()

    @property
    def text(self):
        return self._text

    @text.setter
    def text(self, val):
        # Changing the text redraws the label immediately.
        self._text = val
        self.render()

    def render(self):
        """Draw prefix + padded text at (row, col) of the window."""
        padded = self._text.ljust(self.len)
        self.window.addstr(self.row, self.col, self.prefix + padded)
class NumberLabel(Widget):
    """Numeric field rendered with 3 decimals, right-aligned in 8 characters."""

    def __init__(self, window, row, col, value=0.0):
        self.window = window
        self.row, self.col = row, col
        self._value = value
        self.render()

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, v):
        # Assigning a new value redraws the field immediately.
        self._value = v
        self.render()

    def render(self):
        """Draw the formatted value at (row, col) of the window."""
        number = "{:.3f}".format(self.value)
        self.window.addstr(self.row, self.col, number.rjust(8))
class Button(Widget):
    """A '[label]' marker on screen that can be briefly highlighted."""

    def __init__(self, window, label, row, col):
        self.window = window
        self.label = label
        self.row, self.col = row, col
        self.render()

    def render(self, strong=False):
        """Draw '[label]' at (row, col); bold when ``strong`` is true."""
        text = '[{}]'.format(self.label)
        if strong:
            self.window.addstr(self.row, self.col, text, curses.A_BOLD)
        else:
            self.window.addstr(self.row, self.col, text)

    def flash(self):
        """Show the button in bold for 0.1 seconds, then draw it normally again."""
        for (strong, pause) in ((True, 0.1), (False, None)):
            self.render(strong=strong) if strong else self.render()
            self.window.refresh()
            if pause is not None:
                time.sleep(pause)
class Banner(Widget):
    """
    Horizontal rule across the full window width with a text label on top.

    :param window: curses window to draw into
    :param label: text shown on the rule (formatted with str.format)
    :param row: window row on which both the rule and the label are drawn
    """
    label_col = 4  # <space>text<space>

    def __init__(self, window, label=None, row=0):
        self.window = window
        self._label = label
        self.row = row
        self.render()

    def render(self, strong=False):
        """Draw the rule and the label; both in bold when ``strong`` is true."""
        (_, width) = self.window.getmaxyx()
        attrs = (curses.A_BOLD,) if strong else ()
        # The rule is drawn on self.row (it was previously hard-coded to row 0,
        # which put the rule and the label on different rows when row != 0).
        self.window.hline(self.row, 0, curses.ACS_HLINE, width, *attrs)
        self.window.addstr(self.row, self.label_col, " {} ".format(self._label), *attrs)

    @property
    def label(self):
        return self._label

    @label.setter
    def label(self, value):
        # Changing the label redraws the banner immediately.
        self._label = value
        self.render()
class Status(object):
    """
    Header window: machine status banner, jog buttons, position read-outs,
    feed rate and spindle labels.

    The static text comes from ``BACKDROP``; the widgets in ``self.widgets``
    are drawn on top of it at fixed coordinates.
    """
    row = 0
    banner_prefix = 'Status: '

    # Static template drawn below the banner (window rows 1..4). Its columns
    # must agree with the widget coordinates used in __init__: jog value at
    # col 6, buttons at cols 0/3/6/11, MPos at col 21, WPos at col 31 and the
    # feed-rate / spindle labels at col 44.
    #    0         1         2         3         4         5
    #    01234567890123456789012345678901234567890123456789012345
    BACKDROP = (
        'Jog: [xxxx.yyy ]         MPos      WPos',
        '   [Y+]    [Z+]   X |xxxx.yyy |xxxx.yyy |   Feed Rate: ?',
        '[X-]  [X+]        Y |xxxx.yyy |xxxx.yyy |   Spindle: ?',
        '   [Y-]    [Z-]   Z |xxxx.yyy |xxxx.yyy |',
    )

    def __init__(self, screen):
        """
        :param screen: curses screen; its width determines the window width
        """
        self.screen = screen
        (max_y, max_x) = self.screen.getmaxyx()
        self.window = curses.newwin(5, max_x, self.row, 0)
        self.status = ''
        self.banner = Banner(self.window, self.banner_prefix)

        # Print backdrop (to use as template), clipped to the window width
        for (row, line) in enumerate(self.BACKDROP):
            self.window.addstr(row + 1, 0, line[:max_x])

        # Widgets (each one draws itself on construction)
        self.widgets = {
            'X+': Button(self.window, 'X+', 3, 6),
            'X-': Button(self.window, 'X-', 3, 0),
            'Y+': Button(self.window, 'Y+', 2, 3),
            'Y-': Button(self.window, 'Y-', 4, 3),
            'Z+': Button(self.window, 'Z+', 2, 11),
            'Z-': Button(self.window, 'Z-', 4, 11),
            'jog': NumberLabel(self.window, 1, 6, 0.001),
            'MPosX': NumberLabel(self.window, 2, 21),
            'MPosY': NumberLabel(self.window, 3, 21),
            'MPosZ': NumberLabel(self.window, 4, 21),
            'WPosX': NumberLabel(self.window, 2, 31),
            'WPosY': NumberLabel(self.window, 3, 31),
            'WPosZ': NumberLabel(self.window, 4, 31),
            'feed_rate': Label(self.window, 2, 44, len=20, text='?', prefix='Feed Rate: '),
            'spindle': Label(self.window, 3, 44, len=20, text='?', prefix='Spindle: '),
        }
        self.refresh()

    def set_status(self, status):
        """Remember ``status`` and show it in the banner after the prefix."""
        self.status = status
        self.banner.label = "{prefix}{label}".format(
            prefix=self.banner_prefix,
            label=status,
        )
        self.refresh()

    @property
    def is_idle(self):
        """True when the last status set was exactly 'Idle'."""
        return self.status == 'Idle'

    def refresh(self):
        self.window.refresh()
# Line types:
# gcode: <'>' if cur> <gcode> > <response>
# info: <'>' if cur> <info text>
class GCodeContent(object):
    """A gcode string together with its 'sent' flag and response status text."""

    def __init__(self, gcode, sent=False, status=''):
        self.gcode, self.sent, self.status = gcode, sent, status

    def to_str(self, width):
        """
        Format as '<gcode> <marker> <status>'.

        The gcode column is padded/truncated to ``width - 23`` characters
        (never negative), the marker is '>' once sent, and the status is
        left-aligned in at least 20 characters.
        """
        gcode_w = max(0, width - 23)
        marker = '>' if self.sent else ' '
        return "{gcode:<{w}.{w}s} {sent} {status:<20s}".format(
            gcode=self.gcode,
            sent=marker,
            status=self.status,
            w=gcode_w,
        )
class ConsoleLine(object):
    """
    One row of an accordion window: either a GCodeContent or any object that
    is shown via str(). The window width is sampled by update_width().
    """

    def __init__(self, window, content, cur=False):
        self.window = window
        self.content = content
        self._cur = cur
        self._width = 0
        self.update_width()

    @property
    def cur(self):
        return self._cur

    @cur.setter
    def cur(self, value):
        self._cur = value

    def update_width(self):
        """Re-read the window width (second element of getmaxyx())."""
        self._width = self.window.getmaxyx()[1]

    def render(self, row):
        """Draw this line on ``row``, padded/truncated to width - 1 characters."""
        # NOTE(review): the two prefixes differ in length ('> ' vs ' '), so the
        # text shifts by one column when ``cur`` changes -- confirm intended.
        prefix = '> ' if self._cur else ' '
        if isinstance(self.content, GCodeContent):
            body = self.content.to_str(max(0, self._width - len(prefix)))
        else:
            body = str(self.content)
        limit = self._width - 1  # stay clear of the window's last column
        text = "{text:{w}.{w}s}".format(text=prefix + body, w=limit)
        self.window.addstr(row, 0, text)
class AccordionWindow(object):
    """
    A titled screen region holding a list of lines.

    Regions are stacked vertically and resized by a manager: as the focused
    region's content grows it can push the regions below it down and shrink
    them. The curses window itself is created later via init_window().
    """

    def __init__(self, screen, title, soft_height=2, min_height=0):
        self.screen = screen
        self.title = title
        self.soft_height = soft_height
        self.min_height = min_height
        self.focus = False
        self.lines = []  # ConsoleLine instances, oldest first (top of window)
        self.window = None
        self.banner = None
        self.add_line_callback = None  # called with this window after a line is added

    def init_window(self, row, height):
        """Create the curses window (full screen width) and its title banner."""
        width = self.screen.getmaxyx()[1]
        self.window = curses.newwin(height, width, row, 0)
        self.banner = Banner(self.window, self.title)

    def _add_line(self, line):
        """Append ``line`` and notify add_line_callback, if one is set."""
        self.lines.append(line)
        callback = self.add_line_callback
        if callback:
            callback(self)

    def move_window(self, row, height):
        """Resize to ``height`` rows at full screen width, then move to ``row``."""
        width = self.screen.getmaxyx()[1]
        self.window.resize(height, width)
        self.window.mvwin(row, 0)

    def render(self, active=False):
        """Draw the banner (bold when active) and the most recent lines that fit."""
        self.banner.render(strong=active)
        rows = self.window.getmaxyx()[0]
        if rows > 1:
            # row 0 belongs to the banner; show the newest (rows - 1) lines below it
            visible = self.lines[-(rows - 1):]
            for (window_row, line) in enumerate(visible, 1):
                line.render(window_row)
        self.refresh()

    def __eq__(self, other):
        # Windows are considered equal when their titles match.
        return self.title == other.title

    def refresh(self):
        self.window.refresh()

    def clear(self):
        """Forget all lines and clear the curses window."""
        self.lines = []
        self.window.clear()
class AccordionWindowManager(object):
    """
    Stacks accordion windows between a header and a footer and shares the
    available rows between them, serving the focused window first.
    """

    def __init__(self, screen, windows, header_height, footer_height=0):
        self.screen = screen
        self.windows = windows
        self.header_height = header_height
        self.footer_height = footer_height
        self._focus_index = 0
        self._log = []

        def _on_line_added(window):
            # A new line may change how much room a window wants.
            self.update_distrobution()
            window.render()

        # Create each window's curses window, stacked by minimum height
        top = self.header_height
        for window in self.windows:
            window.init_window(top, window.soft_height)
            window.refresh()
            window.add_line_callback = _on_line_added
            top += window.min_height

    @property
    def focus(self):
        return self.windows[self._focus_index]

    @focus.setter
    def focus(self, value):
        self._focus_index = self.windows.index(value)
        self.update_distrobution()

    @staticmethod
    def _wanted_extra(window):
        """Rows beyond min_height needed to show every line plus the banner."""
        return max(0, (len(window.lines) + 1) - window.min_height)

    def update_distrobution(self):
        """Recompute every window's row and height, then move and redraw them."""
        usable_rows = self.screen.getmaxyx()[0] - (self.header_height + self.footer_height)
        spare = usable_rows - sum(w.min_height for w in self.windows)
        # TODO: when spare < 3 there is not enough room to respect min_heights

        # The focused window gets first pick of the spare rows ...
        focused = self.focus
        focus_extra = min(spare, self._wanted_extra(focused))
        spare -= focus_extra

        # ... the others share what is left, top to bottom.
        top = self.header_height
        for w in self.windows:
            has_focus = (w == focused)
            if has_focus:
                height = focus_extra + focused.min_height
            else:
                extra = min(spare, self._wanted_extra(w))
                spare -= extra
                height = extra + w.min_height
            w.move_window(top, height)
            w.render(active=True if has_focus else False)
            top += height
        self.refresh()

    def refresh(self):
        for w in self.windows:
            w.refresh()
class InitWindow(AccordionWindow):
    """Accordion window whose lines are plain text."""

    def add_line(self, text):
        """Append ``text`` as a new console line and return that line."""
        console_line = ConsoleLine(self.window, text)
        self._add_line(console_line)
        return console_line
class JoggingWindow(AccordionWindow):
    """Accordion window whose lines are gcode entries (GCodeContent)."""

    def add_line(self, gcode, sent=False, status=''):
        """Append a gcode entry as a new console line and return that line."""
        content = GCodeContent(gcode=gcode, sent=sent, status=status)
        console_line = ConsoleLine(self.window, content)
        self._add_line(console_line)
        return console_line
class StreamWindow(AccordionWindow):
    """Accordion window whose lines are gcode entries (GCodeContent)."""

    def add_line(self, gcode, sent=False, status=''):
        """Append a gcode entry as a new console line and return that line."""
        content = GCodeContent(gcode=gcode, sent=sent, status=status)
        console_line = ConsoleLine(self.window, content)
        self._add_line(console_line)
        return console_line
# Selectable jog distances, smallest first.
JOGGING_VALUES = [
    0.001, 0.01, 0.1, 1, 10, 25, 50, 100, 250, 500
]


def jog_value(cur_val, dindex):
    """
    Return the jog distance ``dindex`` steps away from ``cur_val``.

    The result is clamped to the first/last entry of JOGGING_VALUES; a
    ``cur_val`` that is not in the list is treated as the first entry.
    """
    if cur_val in JOGGING_VALUES:
        position = JOGGING_VALUES.index(cur_val)
    else:
        position = 0
    position += dindex
    last = len(JOGGING_VALUES) - 1
    if position > last:
        position = last
    if position < 0:
        position = 0
    return JOGGING_VALUES[position]
# ---------------- Serial Utilities ---------------------
class SerialPort(object):
    """
    Thin wrapper around a pyserial port with optional traffic logging.

    Text is exchanged as ``str``: it is encoded before being written and
    decoded after being read, because pyserial works with bytes on Python 3.

    :param device: serial device name passed to serial.Serial
    :param baudrate: baud rate passed to serial.Serial
    :param logfilename: if given, all traffic is logged to this file
    """

    # 1:1 byte <-> character mapping, so any byte received can be decoded.
    ENCODING = 'latin-1'

    def __init__(self, device, baudrate, logfilename=None):
        self.device = device
        self.baudrate = baudrate
        self.logfilename = logfilename
        self.log = None
        self._cur_line = ''  # buffered line chars before '\n' received
        self.serial = serial.Serial(self.device, self.baudrate)
        if self.logfilename:
            self.log = open(self.logfilename, 'w')

    def __del__(self):
        # Attributes may be missing if __init__ failed part-way through.
        port = getattr(self, 'serial', None)
        if port is not None:
            port.close()
        log = getattr(self, 'log', None)
        if log:
            log.close()

    def _log_write(self, prefix, msg):
        """Append one timestamped entry to the log, escaping CR and LF."""
        if self.log:
            if not isinstance(msg, str):
                msg = msg.decode(self.ENCODING)  # bytes on python 3
            self.log.write("[{time:.2f}] {prefix} {msg}\n".format(
                time=time.time(),
                prefix=prefix,
                msg=msg.replace('\n', r'\n').replace('\r', r'\r'),
            ))

    def write(self, data):
        """Log ``data`` and send it to the device (text is encoded first)."""
        self._log_write('>>', data)
        if not isinstance(data, bytes):
            data = data.encode(self.ENCODING)
        self.serial.write(data)

    def readlines(self, timeout=None):
        """
        Generate complete lines received from the device, without line ending.

        :param timeout: overall time budget in seconds; None means no limit
        :return: generator of str

        Characters of an unfinished line stay buffered for the next call. The
        port's own timeout is restored when the generator finishes or is
        closed early (e.g. the caller breaks out of its loop).
        """
        start_time = time.time()
        orig_timeout = self.serial.timeout

        def _new_timeout():
            """Return linearly diminished timeout (with realtime)"""
            if timeout is None:
                return None
            return max(0.0, timeout - (time.time() - start_time))

        try:
            while True:
                # Set read timeout
                time_remaining = _new_timeout()
                if time_remaining == 0:
                    break
                self.serial.timeout = time_remaining

                # read
                # FIXME: char by char (a bit clunky, but easy to write)
                try:
                    received_chr = self.serial.read()
                except serial.serialutil.SerialException:
                    continue  # terminal resize interrupts serial read
                if not isinstance(received_chr, str):
                    received_chr = received_chr.decode(self.ENCODING)  # bytes on python 3
                self._cur_line += received_chr

                if self._cur_line.endswith('\n'):
                    self._log_write('<<', self._cur_line)
                    received_line = re.sub(r'\r?\n$', '', self._cur_line)
                    self._cur_line = ''
                    yield received_line
        finally:
            self.serial.timeout = orig_timeout
class GCodeStreamException(Exception):
    """Raised when GRBL reports an error for a gcode, or sends an unidentified message."""
class GCodeStreamer(object):
    """
    Feeds gcode lines to GRBL without exceeding its serial buffer.

    - keeps track of how much of GRBL's buffer is in use
    - transmits pending lines when they fit
    - pairs each GRBL response with the oldest unanswered line
    """

    class Line(object):
        """
        One gcode line to transmit, optionally mirrored by a screen widget.

        Marking the line as sent, or setting its response status, is written
        through to the widget's content when a widget is attached.
        """

        def __init__(self, gcode, widget=None):
            # verify parameter(s)
            if widget is not None:
                assert isinstance(widget, ConsoleLine), "bad widget type: %r" % widget
                assert isinstance(widget.content, GCodeContent), "bad widget content: %r" % widget.content
            # initialize
            self.gcode = gcode
            self.widget = widget

        def set_sent(self, value=True):
            if self.widget:
                self.widget.content.sent = value

        def set_status(self, msg):
            if self.widget:
                self.widget.content.status = msg

        def _normalized(self):
            """Upper-cased gcode without (...) comments, ';' comments or whitespace."""
            stripped = re.sub(r'\(.*?\)|;.*|\s', '', self.gcode)
            return stripped.upper()

        def __str__(self):
            """
            :return: str to be sent over serial (including newline)
            """
            return self._normalized() + "\n"

        def __len__(self):
            # number of characters transmitted for this line
            return len(str(self))

        def __bool__(self):
            # a line is falsy when nothing is left after normalisation
            return bool(self._normalized())

        __nonzero__ = __bool__  # python 2.x compatibility

    DEFAULT_MAX_BUFFER = 128
    RESPONSE_REGEX = re.compile(r'^(?P<keyword>(ok|error))', re.I)

    def __init__(self, serial, max_buffer=None):
        assert isinstance(serial, SerialPort), "bad serial type: %r" % serial
        self.serial = serial
        self.max_buffer = self.DEFAULT_MAX_BUFFER if max_buffer is None else max_buffer

        # sent_lines: lines transmitted to GRBL that have not been answered yet.
        #   [0]  is the oldest; it is removed when its response arrives
        #   [-1] is the line transmitted most recently
        # pending_lines: lines queued by send() that are still to be transmitted.
        self.sent_lines = []
        self.pending_lines = []

    def is_valid_response(self, response_msg):  # TODO: delete if not used
        """returns truthy: regex match if valid, None otherwise"""
        return self.RESPONSE_REGEX.search(response_msg)

    def process_response(self, response):
        """
        Apply a GRBL response to the oldest unanswered line, self.sent_lines[0],
        and remove that line from the list.

        :param response: str received from GRBL device
        :raises GCodeStreamException: for an 'error' response, or when the
            message starts with neither 'ok' nor 'error'
        """
        match = self.RESPONSE_REGEX.search(response)
        if not match:
            raise GCodeStreamException("unidentified message: %s" % response)

        answered = self.sent_lines.pop(0)
        answered.set_status(response)
        if match.group('keyword').lower() == 'error':
            raise GCodeStreamException("error on gcode: '{gcode}' {msg}".format(
                gcode=answered.gcode,
                msg=response
            ))

        # the response freed buffer space: send the next line (if possible)
        self.poll_transmission()

    def can_send(self, line):
        """
        Can the given line be transmitted?
        :return: True if line will not push GRBL's buffer over its limit
        """
        assert isinstance(line, GCodeStreamer.Line)
        return (self.used_buffer + len(line)) <= self.max_buffer

    def send(self, line):
        """Add to pending lines, then poll transmission (once)"""
        assert isinstance(line, GCodeStreamer.Line)
        self.pending_lines.append(line)
        self.poll_transmission()

    def _transmit(self, line):
        assert isinstance(line, GCodeStreamer.Line)
        self.serial.write(str(line))  # Send to GRBL device

    def poll_transmission(self):
        """
        Transmit the next pending line if it fits in GRBL's buffer.
        :return: False if nothing was pending when called, True otherwise
        """
        if not self.pending_lines:
            return False
        candidate = self.pending_lines[0]
        if self.can_send(candidate):
            self.pending_lines.pop(0)
            self._transmit(candidate)
            self.sent_lines.append(candidate)  # now awaiting a response
            candidate.set_sent()
        return True

    @property
    def finished(self):
        """True when no line is pending or awaiting a response."""
        return not (self.sent_lines or self.pending_lines)

    @property
    def used_buffer(self):
        """Characters transmitted for lines that are still awaiting a response."""
        return sum(len(sent) for sent in self.sent_lines)

    @property
    def pending_count(self):
        return len(self.pending_lines)
# ---------------- Arduino Util ---------------------
def arduino_comports():
    """
    Serial comports whose manufacturer description mentions 'arduino'.

    :return: generator of pyserial comport-info objects, eg:
        comport.serial_number  # '55639303235351C071B0'
        comport.device         # '/dev/ttyACM0'
    """
    # serial.tools.list_ports selects the implementation for the running
    # platform (the POSIX-only module was imported directly before).
    from serial.tools import list_ports
    # simple, because I've only got one to test on
    arduino_manufacturer_regex = re.compile(r'arduino', re.IGNORECASE)
    for comport in list_ports.comports():
        manufacturer = comport.manufacturer
        if manufacturer and arduino_manufacturer_regex.search(manufacturer):
            yield comport
def device_type(value):
    """
    argparse ``type`` callable for the serial device option.

    :param value: None, an Arduino serial number (15 to 25 hex digits), or a
        device name
    :return: serial device name; anything that is neither None nor a serial
        number is returned unchanged
    :raises argparse.ArgumentTypeError: if the lookup does not match exactly
        one connected Arduino
    """
    def _safe_comport_wrapper(key):
        # Return the single Arduino comport for which key(comport) is true.
        try:
            comports = [cp for cp in arduino_comports() if key(cp)]
        except Exception:
            print("ERROR: could not utilise serial library to find connected Arduino "
                  "(for given device: %s) just try the serial device (eg: /dev/ttyACM0)" % value)
            raise
        if len(comports) == 1:
            # exchange serial number for serial device Arduino is connected to
            return comports[0]
        else:
            raise argparse.ArgumentTypeError("could not find Arduino from '%s', just try the serial device (eg: /dev/ttyACM0)" % value)

    if value is None:
        comport = _safe_comport_wrapper(lambda cp: True)  # all
        value = comport.device
    elif re.search(r'^[0-9a-f]{15,25}$', value, re.IGNORECASE):
        wanted = value.upper()
        # a comport without a serial number (None) simply does not match
        comport = _safe_comport_wrapper(lambda cp: (cp.serial_number or '').upper() == wanted)
        value = comport.device
    return value
# ---------------- Parse Arguments -------------------
# Defaults
DEFAULT_BAUDRATE = 115200
DEFAULT_BUFFERSIZE = GCodeStreamer.DEFAULT_MAX_BUFFER
DEFAULT_POLL_INTERVAL = 0.2  # seconds
DEFAULT_PENDING_COUNT = 5
DEFAULT_END_SLEEP = 1

parser = argparse.ArgumentParser(
    description="GRBL gcode streamer for CNC machine. Assist jogging to "
                "position, then stream gcode via serial.",
)
parser.add_argument(
    'infile', help="gcode file to stream",
)
parser.add_argument(
    '--quiet', '-q',
    action='store_const', const=True, default=False,
    help="if quiet, help messages won't be printed",
)

# Serial Connection
group = parser.add_argument_group("Serial Connectivity")
group.add_argument(
    '-d', '--device', type=device_type, default=None,
    help="serial device GRBL is connected to, can use direct device name "
         "(eg: /dev/ttyACM0) or the Arduino's serial number (eg: 55639303235351C071B0)"
)
group.add_argument(
    '-b', '--baudrate', type=int, default=DEFAULT_BAUDRATE,
    help="serial baud rate (default: %i)" % DEFAULT_BAUDRATE
)

# Debugging
group = parser.add_argument_group("Debug Parameters")
group.add_argument(
    '--disable-status', dest='status_poll',
    action='store_const', const=False, default=True,
    help="disables '?' being sent every '--poll-interval'",
)
group.add_argument(
    '--poll-interval', dest='poll_interval', type=float, default=DEFAULT_POLL_INTERVAL,
    help="frequency of sending '?' for status report when status is enabled",
)
group.add_argument(
    '--buffer-size', dest='buffer_size', type=int, default=DEFAULT_BUFFERSIZE,
    help="GRBL internal serial buffer size",
)
group.add_argument(
    '--serial-log', dest='serial_log', default=None, metavar="LOG_FILE",
    help="if given, data read from, and written to serial port is logged here "
         "(note: \\r and \\n characters are escaped for debugging purposes)",
)
group.add_argument(
    # type=int: the value is used in arithmetic and range(), so a value given
    # on the command line must not stay a str
    '--pending-count', dest='pending_count', type=int, default=DEFAULT_PENDING_COUNT,
    help="number of gcode lines to display ahead of them being sent",
)
group.add_argument(
    '--keep-open', '-o', dest='no_close',
    action='store_const', const=True, default=False,
    help="if set, window won't close when job is done",
)

args = parser.parse_args()

# argparse does not pass a None default through the ``type`` callable, so the
# auto-detection branch of device_type is invoked explicitly here.
if args.device is None:
    args.device = device_type(args.device)  # FIXME: there has to be a way to do this native to argparse
# ----------------- Curses Shi.. stuff -----------------
def curses_wrap(screen):
    """
    Run the whole interactive session inside curses.

    Reads its configuration from the module-level ``args``. Stages:
      1. build the status header and the init / jogging / stream windows
      2. wake GRBL and read its mode and state
      3. interactive jogging, until Enter (or q) is pressed
      4. stream the gcode file, then wait until the machine reports Idle

    :param screen: curses screen, as passed in by curses.wrapper
    :raises RuntimeError: if GRBL's state was not received within 5 seconds
    """
    # ----------------- Initialise widgets 'n' stuff -----------------
    # Clear screen
    screen.clear()
    screen.refresh()

    def keypress():
        """Return the next buffered key name, or None if there is none."""
        try:
            return screen.getkey()
        except curses.error:
            return None  # raised if no key event is buffered

    status = Status(screen=screen)

    init = InitWindow(
        screen=screen,
        title='Initialize Device: {}'.format(args.device),
        min_height=1,
    )
    jogging = JoggingWindow(
        screen=screen,
        title='Jogging',
        min_height=1,
    )
    stream = StreamWindow(
        screen=screen,
        title='Stream: {}'.format(args.infile),
        min_height=min(6, args.pending_count + 1),
    )

    accordion = AccordionWindowManager(
        screen=screen,
        windows=[init, jogging, stream],
        header_height=status.window.getmaxyx()[0],
    )

    # dump first few lines into stream window (stopping at end of file)
    with open(args.infile, 'r') as fh:
        preview = []
        for _ in range(args.pending_count):
            raw_line = fh.readline()
            if not raw_line:
                break
            preview.append(raw_line.rstrip())
        preview_truncated = (len(preview) == args.pending_count) and bool(fh.readline())
    for preview_line in preview:
        stream.add_line(preview_line)
    if preview_truncated:
        stream.add_line('... [truncated]')

    # ----------------- Virtual Machine -----------------
    machine = NullMachine()

    # Detect & Set Machine's Mode from GRBL text
    machine_mode_regex = re.compile(r'^\s*\[GC:\s*(?P<gcode>[^\]]+)\]\s*$', re.I)

    def machine_set_mode(mode_match):
        machine.set_mode(*text2gcodes(mode_match.group('gcode')))

    # Detect & Set Machine's State from GRBL text
    machine_state_regex = re.compile(r'^\s*<(?P<state>[^\>\<]*)>\s*$')

    def machine_set_state(state_match):
        """Apply a status report to the virtual machine and the status header."""
        # <Idle|MPos:0.000,0.000,0.000|FS:0,0|WCO:-8.393,100.000,2.063>
        feed_rate = None  # float
        spindle = None  # float
        abs_pos = None  # Position
        work_pos = None  # Position
        work_offset = None  # Position

        def coords(s):
            # 'KEY:1,2,3' -> [1.0, 2.0, 3.0]
            return [float(x) for x in s[s.find(':')+1:].split(',')]

        def pos(s):
            return machine.Position(**dict(zip('XYZ', coords(s))))

        for (i, state_str) in enumerate(state_match.group('state').split('|')):
            if i == 0:
                status.set_status(state_str)
            elif state_str.startswith('MPos:'):  # machine.abs_pos
                abs_pos = pos(state_str)
            elif state_str.startswith('WPos:'):  # machine.pos
                work_pos = pos(state_str)
            elif state_str.startswith('F:'):
                (feed_rate,) = coords(state_str)
            elif state_str.startswith('FS:'):
                (feed_rate, spindle) = coords(state_str)
            elif state_str.startswith('WCO:'):  # machine.state.coord_sys.offset
                work_offset = pos(state_str)

        # Update Machine Axes
        if work_offset is not None:
            machine.state.coord_sys.offset = work_offset  # done first
        if abs_pos is not None:
            machine.abs_pos = abs_pos
        if work_pos is not None:
            machine.pos = work_pos

        # Update Status Display
        for axis in 'XYZ':
            status.widgets['MPos' + axis].value = getattr(machine.abs_pos, axis)
        for axis in 'XYZ':
            status.widgets['WPos' + axis].value = getattr(machine.pos, axis)
        if feed_rate is not None:
            status.widgets['feed_rate'].text = "%g" % feed_rate
        if spindle is not None:
            spindle_str = ('%g (rpm)' % spindle) if spindle else 'off'
            status.widgets['spindle'].text = spindle_str
        status.refresh()

    # ----------------- Initialize Serial -----------------
    # Initialize & Wake up grbl
    serialport = SerialPort(args.device, args.baudrate, args.serial_log)
    serialport.write("\r\n\r\n")

    # Wait for grbl to initialize and flush startup text in serial input
    accordion.focus = init
    init_complete = False
    for line in serialport.readlines(timeout=5):
        line = line.strip()
        if line and (line != 'ok'):
            init.add_line(line)

        # Process line: greeting -> request mode -> request state -> done
        mode_match = machine_mode_regex.search(line)
        state_match = machine_state_regex.search(line)
        if re.search(r'^GRBL', line, re.I):
            # Request: Mode - Request Machine's Mode
            serialport.write('$G\n')
        elif mode_match:
            # Received: Mode - Machine's Mode received, pass to virtual machine
            machine_set_mode(mode_match)
            # Request: State
            serialport.write('?')
        elif state_match:
            # Received: State
            machine_set_state(state_match)
            init_complete = True

        if init_complete:
            break

    if init_complete:
        serialport.serial.flushInput()
    else:
        raise RuntimeError("Could not initialize GRBL serial interface")

    # Machine state initialized, remember it.
    # machine's mode may be altered while jogging.
    # machine's mode is duplicated so it may be reverted after jogging
    # NOTE(review): copy.copy is shallow; if the copy shares its mode object
    # with `machine`, the revert below never sees a difference -- confirm.
    init_machine = copy.copy(machine)

    # Start periodic polling of status
    # (skipped for --disable-status, or when the poll interval is 0)
    poll_daemon_stop = threading.Event()
    poll_daemon_thread = None
    if args.status_poll and args.poll_interval:
        def _poll_daemon_runtime():
            while not poll_daemon_stop.is_set():
                serialport.write('?')
                poll_daemon_stop.wait(args.poll_interval)

        poll_daemon_thread = threading.Thread(target=_poll_daemon_runtime)
        poll_daemon_thread.daemon = True
        poll_daemon_thread.start()

    # Serial Polling Process
    message_regex = re.compile(r'^\s*\[(?P<msg>.*)\]\s*$')

    def poll_serial(timeout, callback=None):
        """
        Process lines received from GRBL for up to ``timeout`` seconds, then
        invoke ``callback`` (if given).
        """
        for line in serialport.readlines(timeout=timeout):
            line = line.rstrip('\r\n').lstrip('\r\n')
            if not line:
                continue  # a blank line is not a response to anything
            state_match = machine_state_regex.search(line)
            message_match = message_regex.search(line)
            if state_match:  # Received: State
                machine_set_state(state_match)
            elif message_match:
                # TODO: how to display messages
                pass
            else:
                streamer.process_response(line)
        if callback:
            callback()

    def _poll_callback_jogging():
        jogging.render()
        jogging.refresh()

    def _poll_callback_streaming():
        jogging.render()
        jogging.refresh()
        stream.render()
        stream.refresh()

    # Connect GCode Streamer
    streamer = GCodeStreamer(serialport, args.buffer_size)

    def send_gcode(gcode, window):
        """Show ``gcode`` in ``window`` and queue it for transmission."""
        widget = window.add_line(str(gcode))
        line = GCodeStreamer.Line(str(gcode), widget)
        if line:  # don't send blank lines
            streamer.send(line)

    # ----------------- Interactive Jogging -----------------
    if not args.quiet:
        help_lines = [
            "; Jogging Keys",
            "; - Arrow Keys: X/Y axes",
            "; - PgUp/PgDn: Z axis",
            "; - []: increase/decrease jog distance",
            "; - Space: zero work offsets (all axes)",
            "; - x,y,z: zero individual axis",
            "; - Enter: start streaming gcode",
        ]
        for line in help_lines:
            jogging.add_line(line)

    # key name -> (axis, direction, button widget to flash)
    jog_keys = {
        "KEY_RIGHT": ('X', 1, 'X+'),
        "KEY_LEFT": ('X', -1, 'X-'),
        "KEY_UP": ('Y', 1, 'Y+'),
        "KEY_DOWN": ('Y', -1, 'Y-'),
        "KEY_PPAGE": ('Z', 1, 'Z+'),
        "KEY_NPAGE": ('Z', -1, 'Z-'),
    }
    # key -> step through JOGGING_VALUES
    jog_steps = {'[': 1, ',': 1, ']': -1, '.': -1}

    accordion.focus = jogging
    screen.nodelay(True)  # non-blocking .getkey() behaviour
    while True:
        k = keypress()
        if k is None:
            pass  # nothing pressed; do nothing
        elif k in ('q', 'Q'):
            # NOTE(review): this leaves the jogging loop exactly like Enter
            # does, so streaming still starts -- confirm that is intended.
            break
        elif k == '?':
            serialport.write('?')
        elif k in jog_keys:
            # put machine into incremental mode (if it's not already)
            if not isinstance(machine.mode.distance, GCodeIncrementalDistanceMode):
                dist_mode = GCodeIncrementalDistanceMode()
                send_gcode(dist_mode, jogging)
                machine.process_gcodes(dist_mode)  # changes machine's mode

            (axis, direction, button) = jog_keys[k]
            distance = direction * status.widgets['jog'].value
            send_gcode(GCodeRapidMove(**{axis: distance}), jogging)
            status.widgets[button].flash()
        elif k in jog_steps:
            # Jogging Increment (up/down)
            status.widgets['jog'].value = jog_value(
                status.widgets['jog'].value,
                jog_steps[k]
            )
        elif k == ' ':
            # Zero WPos coordinates
            send_gcode(text2gcodes('G10 L20 P1 X0 Y0 Z0')[0], jogging)
        elif k == '\n':
            # Jogging complete, begin stream
            break
        elif k == 'KEY_RESIZE':
            # dummy keypress on console window resize
            accordion.update_distrobution()
            status.refresh()

        # spend 50ms processing received serial lines
        # then loop back up to process another key
        poll_serial(0.05, _poll_callback_jogging)
        streamer.poll_transmission()

    # Revert machine's state after jogging
    mode_keys = set()
    mode_keys |= set(init_machine.mode.modal_groups.keys())
    mode_keys |= set(machine.mode.modal_groups.keys())
    for key in mode_keys:
        initial = init_machine.mode.modal_groups[key]
        current = machine.mode.modal_groups[key]
        if initial != current:
            send_gcode(initial, jogging)

    # ----------------- File Streaming -----------------
    stream.clear()
    accordion.focus = stream

    def _check_resize():
        if keypress() == 'KEY_RESIZE':
            accordion.update_distrobution()

    with open(args.infile, 'r') as gcode_file:
        gcode_file_moredata = True
        while gcode_file_moredata:
            # Push pending lines into streamer
            while streamer.pending_count < args.pending_count:
                line_data = gcode_file.readline()
                if not line_data:
                    gcode_file_moredata = False
                    break  # file's done
                send_gcode(line_data.strip(), stream)

            # process serial packets, then try again
            poll_serial(0.05, _poll_callback_streaming)
            _check_resize()

    # streamer is still:
    #   - sending gcodes to GRBL
    #   - processing GRBL's responses in confirmation of those codes
    while not streamer.finished:
        # process serial packets, then try again
        poll_serial(0.05, _poll_callback_streaming)
        _check_resize()

    # Machine is still working, wait 'till it's Idle
    while not status.is_idle:
        # process serial packets, then try again
        poll_serial(0.05, _poll_callback_streaming)
        _check_resize()

    # ---- Kill status polling daemon (if one was started)
    poll_daemon_stop.set()
    if poll_daemon_thread is not None:
        poll_daemon_thread.join()  # blocks until daemon is complete
    serialport.serial.flushInput()

    while args.no_close:
        if keypress() == 'q':
            break
        time.sleep(0.1)
# Script entry point: curses.wrapper initialises curses, calls
# curses_wrap(screen) and restores the terminal afterwards, also when
# curses_wrap raises.
curses.wrapper(curses_wrap)