From cc281f7860f84f58505ee7bc3cce7baeea0d8b42 Mon Sep 17 00:00:00 2001 From: Peter Boin Date: Mon, 14 Aug 2017 22:44:35 +1000 Subject: [PATCH] grbl-stream moved to it's own repo' --- scripts/grbl-stream | 1088 ----------------------------- setup.py | 1 - src/pygcode.egg-info/requires.txt | 1 + 3 files changed, 1 insertion(+), 1089 deletions(-) delete mode 100755 scripts/grbl-stream diff --git a/scripts/grbl-stream b/scripts/grbl-stream deleted file mode 100755 index 393a2fc..0000000 --- a/scripts/grbl-stream +++ /dev/null @@ -1,1088 +0,0 @@ -#!/usr/bin/env python - -import curses -import time -import six -import copy -import argparse -import re -import threading -import serial - -for pygcode_lib_type in ('installed_lib', 'relative_lib'): - try: - # pygcode - from pygcode import NullMachine - from pygcode import GCodeRapidMove - from pygcode import GCodeIncrementalDistanceMode - from pygcode import text2gcodes - - except ImportError: - import sys, os, inspect - # Add pygcode (relative to this test-path) to the system path - _this_path = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) - sys.path.insert(0, os.path.join(_this_path, '..', 'src')) - if pygcode_lib_type == 'installed_lib': - continue # import was attempted before sys.path addition. retry import - raise # otherwise the raised ImportError is a genuine problem - break - - -class Widget(object): - pass - - -class Label(Widget): - def __init__(self, window, row, col, len, text='', prefix=''): - self.window = window - self.row = row - self.col = col - self.len = len - self._text = text - self.prefix = prefix - self.render() - - def render(self): - text = self._text + (' ' * max(0, self.len - len(self._text))) - self.window.addstr(self.row, self.col, self.prefix + text) - - @property - def text(self): - return self._text - - @text.setter - def text(self, val): - self._text = val - self.render() - - -class NumberLabel(Widget): - def __init__(self, window, row, col, value=0.0): - self.window = window - self.row = row - self.col = col - self._value = value - self.render() - - def render(self): - val_text = "{:>8s}".format("{:.3f}".format(self.value)) - self.window.addstr(self.row, self.col, val_text) - - @property - def value(self): - return self._value - - @value.setter - def value(self, v): - self._value = v - self.render() - - -class Button(Widget): - def __init__(self, window, label, row, col): - self.window = window - self.label = label - self.row = row - self.col = col - - self.render() - - def flash(self): - self.render(strong=True) - self.window.refresh() - time.sleep(0.1) - self.render() - self.window.refresh() - - def render(self, strong=False): - addstr_params = [self.row, self.col, '[{}]'.format(self.label)] - if strong: - addstr_params.append(curses.A_BOLD) - self.window.addstr(*addstr_params) - - -class Banner(Widget): - label_col = 4 # text - - def __init__(self, window, label=None, row=0): - self.window = window - self._label = label - self.row = row - self.render() - - def render(self, strong=False): - (y, x) = self.window.getmaxyx() - label_str = " {} ".format(self._label) - hline_params = [0, 0, curses.ACS_HLINE, x] - addstr_params = [self.row, self.label_col, " {} ".format(self._label)] - if strong: - hline_params.append(curses.A_BOLD) - addstr_params.append(curses.A_BOLD) - - self.window.hline(*hline_params) - self.window.addstr(*addstr_params) - - @property - def label(self): - return self._label - - @label.setter - def label(self, value): - self._label = value - self.render() - - -class Status(object): - row = 0 - banner_prefix = 'Status: ' - - def __init__(self, screen): - self.screen = screen - (max_y, max_x) = self.screen.getmaxyx() - self.window = curses.newwin(5, max_x, self.row, 0) - self.status = '' - self.banner = Banner(self.window, self.banner_prefix) - - - # Print backdrop (to use as template) - backdrop = [ - 'Jog: [xxxx.yyy ] MPos WPos', - ' [Y+] [Z+] X |xxxx.yyy |xxxx.yyy | Feed Rate: ?', - '[X-] [X+] Y |xxxx.yyy |xxxx.yyy | Spindle: ?', - ' [Y-] [Z-] Z |xxxx.yyy |xxxx.yyy | ', - ] - for (row, line) in enumerate(backdrop): - self.window.addstr(row + 1, 0, line[:max_x]) - - # Widgets - self.widgets = { - 'X+': Button(self.window, 'X+', 3, 6), - 'X-': Button(self.window, 'X-', 3, 0), - 'Y+': Button(self.window, 'Y+', 2, 3), - 'Y-': Button(self.window, 'Y-', 4, 3), - 'Z+': Button(self.window, 'Z+', 2, 11), - 'Z-': Button(self.window, 'Z-', 4, 11), - 'jog': NumberLabel(self.window, 1, 6, 0.001), - 'MPosX': NumberLabel(self.window, 2, 21), - 'MPosY': NumberLabel(self.window, 3, 21), - 'MPosZ': NumberLabel(self.window, 4, 21), - 'WPosX': NumberLabel(self.window, 2, 31), - 'WPosY': NumberLabel(self.window, 3, 31), - 'WPosZ': NumberLabel(self.window, 4, 31), - 'feed_rate': Label(self.window, 2, 44, len=20, text='?', prefix='Feed Rate: '), - 'spindle': Label(self.window, 3, 44, len=20, text='?', prefix='Spindle: '), - } - - self.refresh() - - def set_status(self, status): - self.status = status - self.banner.label = "{prefix}{label}".format( - prefix=self.banner_prefix, - label=status, - ) - self.refresh() - - @property - def is_idle(self): - return self.status == 'Idle' - - def refresh(self): - self.window.refresh() - - -# Line types: -# gcode: <'>' if cur> > -# info: <'>' if cur> - -class GCodeContent(object): - def __init__(self, gcode, sent=False, status=''): - self.gcode = gcode - self.sent = sent - self.status = status - - def to_str(self, width): - gcode_w = max(0, width - 23) - return ("{gcode:<%i.%is} {sent} {status:<20s}" % (gcode_w, gcode_w)).format( - gcode=self.gcode, - sent='>' if self.sent else ' ', - status=self.status, - ) - -class ConsoleLine(object): - def __init__(self, window, content, cur=False): - self.window = window - self.content = content - self._cur = cur - self._width = 0 - self.update_width() - - def render(self, row): - line = '> ' if self._cur else ' ' - if isinstance(self.content, GCodeContent): - line += self.content.to_str(max(0, self._width - len(line))) - else: - line += str(self.content) - line = ("{:%i.%is}" % (self._width-1,self._width-1)).format(line) - self.window.addstr(row, 0, line) - - def update_width(self): - (_, self._width) = self.window.getmaxyx() - - @property - def cur(self): - return self._cur - - @cur.setter - def cur(self, value): - self._cur = value - - -class AccordionWindow(object): - """ - A region of the screen that can push those above and below it around. - eg: as an active accordion window's content grows, it can push the window - below it down, and make it smaller. - """ - - def __init__(self, screen, title, soft_height=2, min_height=0): - self.screen = screen - self.title = title - self.soft_height = soft_height - self.min_height = min_height - - self.focus = False - self.lines = [] # list of ConsoleLine instances (first are at the top) - - self.window = None - self.banner = None - self.add_line_callback = None - - def init_window(self, row, height): - self.window = curses.newwin(height, self.screen.getmaxyx()[1], row, 0) - self.banner = Banner(self.window, self.title) - - def _add_line(self, line): - i = len(self.lines) - self.lines.append(line) - if self.add_line_callback: - self.add_line_callback(self) - - def move_window(self, row, height): - self.window.resize(height, self.screen.getmaxyx()[1]) - self.window.mvwin(row, 0) - #self.window.box() - #self.window.addstr(0, 5, self.title) - - def render(self, active=False): - #self.window.clear() # FIXME: start fresh every render?, inefficient - self.banner.render(strong=active) - (rows, cols) = self.window.getmaxyx() - if rows > 1: # we have room to render lines - for (i, line) in enumerate(self.lines[-(rows - 1):]): - line.render(i + 1) - self.refresh() - - def __eq__(self, other): - return self.title == other.title - - def refresh(self): - self.window.refresh() - - def clear(self): - self.lines = [] - self.window.clear() - - -class AccordionWindowManager(object): - def __init__(self, screen, windows, header_height, footer_height=0): - - self.screen = screen - self.windows = windows - self.header_height = header_height - self.footer_height = footer_height - - self._focus_index = 0 - - self._log = [] - - # Initialize accordion window's curses.Window instance - def _add_line_cb(window): - self.update_distrobution() - window.render() - - cur_row = self.header_height - for (i, window) in enumerate(self.windows): - window.init_window(cur_row, window.soft_height) - window.refresh() - window.add_line_callback = _add_line_cb - cur_row += window.min_height - - @property - def focus(self): - return self.windows[self._focus_index] - - @focus.setter - def focus(self, value): - self._focus_index = self.windows.index(value) - self.update_distrobution() - - def update_distrobution(self): - # rows available across space - rows = self.screen.getmaxyx()[0] - (self.header_height + self.footer_height) - min_occupied = sum(w.min_height for w in self.windows) - available = rows - min_occupied - if rows - min_occupied < 3: - pass # TODO: not enough room to respect min_heights - # focussed window height - # - self.focus.min_height - # - len(self.focus.lines) + 1 - # - available + self.focus.min_height # max available space - extra_alocate = max(0, (len(self.focus.lines) + 1) - self.focus.min_height) - height_f = min(available, extra_alocate) + self.focus.min_height - available -= height_f - self.focus.min_height - - cur_row = self.header_height - for w in self.windows: - if w == self.focus: - height = height_f - else: - extra_alocate = max(0, (len(w.lines) + 1) - w.min_height) - height = min(available, extra_alocate) + w.min_height - available -= height - w.min_height - - w.move_window(cur_row, height) - w.render(active=True if w == self.focus else False) - cur_row += height - - self.refresh() - - def refresh(self): - for w in self.windows: - w.refresh() - - -class InitWindow(AccordionWindow): - def add_line(self, text): - obj = ConsoleLine(self.window, text) - self._add_line(obj) - return obj - - -class JoggingWindow(AccordionWindow): - def add_line(self, gcode, sent=False, status=''): - obj = ConsoleLine(self.window, GCodeContent( - gcode=gcode, sent=sent, status=status - )) - self._add_line(obj) - return obj - - -class StreamWindow(AccordionWindow): - def add_line(self, gcode, sent=False, status=''): - obj = ConsoleLine(self.window, GCodeContent( - gcode=gcode, sent=sent, status=status - )) - self._add_line(obj) - return obj - - -JOGGING_VALUES = [ - 0.001, 0.01, 0.1, 1, 10, 25, 50, 100, 250, 500 -] - -def jog_value(cur_val, dindex): - try: - i = JOGGING_VALUES.index(cur_val) - except ValueError: - i = 0 - i = max(0, min(i + dindex, len(JOGGING_VALUES) - 1)) - return JOGGING_VALUES[i] - - -# ---------------- Serial Utilities --------------------- - -class SerialPort(object): - def __init__(self, device, baudrate, logfilename=None): - self.device = device - self.baudrate = baudrate - self.serial = serial.Serial(self.device, self.baudrate) - self.logfilename = logfilename - self.log = None - - self._cur_line = '' # buffered line chars before '\n' received - - if self.logfilename: - self.log = open(self.logfilename, 'w') - - def __del__(self): - self.serial.close() - if self.log: - self.log.close() - - def _log_write(self, prefix, msg): - if self.log: - self.log.write("[{time:.2f}] {prefix} {msg}\n".format( - time=time.time(), - prefix=prefix, - msg=msg.replace('\n', r'\n').replace('\r', r'\r'), - )) - - def write(self, data): - self._log_write('>>', data) - self.serial.write(data) - - def readlines(self, timeout=None): - start_time = time.time() - orig_timeout = self.serial.timeout - - def _new_timeout(): - """Return linearly diminished timeout (with realtime)""" - if timeout is None: - return None - cur_time = time.time() - if cur_time - start_time >= timeout: - return 0.0 - return timeout - (cur_time - start_time) - - while True: - # Set read timeout - time_remaining = _new_timeout() - if time_remaining == 0: - break - self.serial.timeout = time_remaining - - # read - # FIXME: char by char (a bit clunky, but easy to write) - try: - received_chr = self.serial.read() - except serial.serialutil.SerialException: - continue # terminal resize interrupts serial read - self._cur_line += received_chr - if self._cur_line.endswith('\n'): - self._log_write('<<', self._cur_line) - received_line = re.sub(r'\r?\n$', '', self._cur_line) - self._cur_line = '' - yield received_line - - self.serial.timeout = orig_timeout - - -class GCodeStreamException(Exception): - """Raised when GRBL responds with error""" - pass - - -class GCodeStreamer(object): - # - keep track of GRBL buffer size - # - transmit to GRBL - # - read GRLB responses and delegate accordingly - - class Line(object): - # - handle status & screen update - # - new - # - sent - # - received (status string) (raise exception if bad - # - send - # - publish on screen - # - set status - # - publish status on screen - # - report bad status back to streamer - def __init__(self, gcode, widget=None): - # verify parameter(s) - if widget is not None: - assert isinstance(widget, ConsoleLine), "bad widget type: %r" % widget - assert isinstance(widget.content, GCodeContent), "bad widget content: %r" % widget.content - - # initialize - self.gcode = gcode - self.widget = widget - - - def set_sent(self, value=True): - if self.widget: - self.widget.content.sent = value - - def set_status(self, msg): - if self.widget: - self.widget.content.status = msg - - def _normalized(self): - return re.sub(r'\(.*?\)|;.*|\s', '', self.gcode).upper() - - def __str__(self): - """ - :return: str to be sent over serial (including newline) - """ - return self._normalized() + "\n" - - def __len__(self): - return len(str(self)) - - def __bool__(self): - if self._normalized(): - return True - return False - - __nonzero__ = __bool__ # python 2.x compatability - - - DEFAULT_MAX_BUFFER = 128 - RESPONSE_REGEX = re.compile(r'^(?P(ok|error))', re.I) - - def __init__(self, serial, max_buffer=None): - assert isinstance(serial, SerialPort), "bad serial type: %r" % serial - self.serial = serial - self.max_buffer = max_buffer if max_buffer is not None else self.DEFAULT_MAX_BUFFER - - # --- Lines - # Description: - # a moving window buffer of GCodeStreamer.Line instances sent to GRBL. - # this moving window stretches between: - # [0] oldest sent gcode that has not yet received a GRBL response. - # once status is received, [0] is removed: self.sent_lines.pop(0) - # [-1] most recent gcode sent to GRBL device. - self.sent_lines = [] - self.pending_lines = [] - - def is_valid_response(self, response_msg): # TODO: delete if not used - """returns truthy: regex match if valid, None otherwise""" - return self.RESPONSE_REGEX.search(response_msg) - - def process_response(self, response): - """ - A response from GRBL is assumed to be to gcode at self.sent_lines[0] - Once processed, the first of self.sent_lines is removed - :param response: str received from GRBL device - """ - match = self.RESPONSE_REGEX.search(response) - if match: - # Pop oldest line - line = self.sent_lines.pop(0) - line.set_status(response) - if match.group('keyword').lower() == 'error': - raise GCodeStreamException("error on gcode: '{gcode}' {msg}".format( - gcode=line.gcode, - msg=response - )) - - # Send next line (if possible) - self.poll_transmission() - else: - raise GCodeStreamException("unidentified message: %s" % response) - - def can_send(self, line): - """ - Can the given line be transmitted? - :return: True if line will not push GRBL's buffer over it's limit - """ - assert isinstance(line, GCodeStreamer.Line) - return (self.used_buffer + len(line)) <= self.max_buffer - - def send(self, line): - """Add to pending lines, then poll transmission (once)""" - assert isinstance(line, GCodeStreamer.Line) - self.pending_lines.append(line) - self.poll_transmission() - - def _transmit(self, line): - assert isinstance(line, GCodeStreamer.Line) - self.serial.write(str(line)) # Send to GRBL device - - def poll_transmission(self): - """ - Send next line if there's enough room in GRBL's buffer. - :return: True if there's data to transmit, False if it's all been sent - """ - if not self.pending_lines: - return False - elif self.can_send(self.pending_lines[0]): - line = self.pending_lines.pop(0) - self._transmit(line) - self.sent_lines.append(line) # Add to line buffer - line.set_sent() - return True - - @property - def finished(self): - if self.sent_lines or self.pending_lines: - return False - return True - - @property - def used_buffer(self): - return sum(len(l) for l in self.sent_lines) - - @property - def pending_count(self): - return len(self.pending_lines) - - -# ---------------- Arduino Util --------------------- -def arduino_comports(): - """ - List of serial comports serial numbers of Arduino boards connected to this machine - :return: generator of comports connected to arduino - """ - #for comport in arduino_comports(): - # comport.serial_number # '55639303235351C071B0' - # comport.device # '/def/ttyACM0' - from serial.tools.list_ports_posix import comports - - arduino_manufacturer_regex = re.compile(r'arduino', re.IGNORECASE) # simple, because I've only got one to test on - for comport in comports(): - if comport.manufacturer: - match = arduino_manufacturer_regex.search(comport.manufacturer) - if match: - yield comport - - -def device_type(value): - # argparse type - - def _safe_comport_wrapper(key): - # define comports list - try: - comports = [cp for cp in arduino_comports() if key(cp)] - except Exception: - print("ERROR: could not utilise serial library to find connected Arduino " - "(for given device: %s) just try the serial device (eg: /dev/ttyACM0)" % value) - raise - if len(comports) == 1: - # exchange serial number for serial device Arduino is connected to - return comports[0] - else: - raise argparse.ArgumentTypeError("could not find Arduino from '%s', just try the serial device (eg: /dev/ttyACM0)" % value) - - if value is None: - comport = _safe_comport_wrapper(lambda cp: True) # all - value = comport.device - elif re.search(r'^[0-9a-f]{15,25}$', value, re.IGNORECASE): - comport = _safe_comport_wrapper(lambda cp: cp.serial_number.upper() == value.upper()) - value = comport.device - - return value - - -# ---------------- Parse Arguments ------------------- -# Defaults -DEFAULT_BAUDRATE = 115200 -DEFAULT_BUFFERSIZE = GCodeStreamer.DEFAULT_MAX_BUFFER -DEFAULT_POLL_INTERVAL = 0.2 # seconds -DEFAULT_PENDING_COUNT = 5 -DEFAULT_END_SLEEP = 1 - -parser = argparse.ArgumentParser( - description="GRBL gcode streamer for CNC machine. Assist jogging to " - "position, then stream gcode via serial.", -) - -parser.add_argument( - 'infile', help="gcode file to stream", -) - -parser.add_argument( - '--quiet', '-q', - action='store_const', const=True, default=False, - help="if quiet, help messages won't be printed", -) - -# Serial Connection -group = parser.add_argument_group("Serial Connectivity") -group.add_argument( - '-d', '--device', type=device_type, default=None, - help="serial device GRBL is connected to, can use direct device name " - "(eg: /dev/ttyACM0) or the Arduino's serial number (eg: 55639303235351C071B0)" -) -group.add_argument( - '-b', '--baudrate', type=int, default=DEFAULT_BAUDRATE, - help="serial baud rate (default: %i)" % DEFAULT_BAUDRATE -) - -# Debugging -group = parser.add_argument_group("Debug Parameters") -group.add_argument( - '--disable-status', dest='status_poll', - action='store_const', const=False, default=True, - help="disables '?' being sent every '--poll-interval'", -) -group.add_argument( - '--poll-interval', dest='poll_interval', type=float, default=DEFAULT_POLL_INTERVAL, - help="frequency of sending '?' for status report when status is enabled", -) -group.add_argument( - '--buffer-size', dest='buffer_size', type=int, default=DEFAULT_BUFFERSIZE, - help="GRBL internal serial buffer size", -) -group.add_argument( - '--serial-log', dest='serial_log', default=None, metavar="LOG_FILE", - help="if given, data read from, and written to serial port is logged here " - "(note: \\r and \\n characters are escaped for debugging purposes)", -) -group.add_argument( - '--pending-count', dest='pending_count', default=DEFAULT_PENDING_COUNT, - help="number of gcode lines to display ahead of them being sent", -) -group.add_argument( - '--keep-open', '-o', dest='no_close', - action='store_const', const=True, default=False, - help="if set, window won't close when job is done", -) - -args = parser.parse_args() -if args.device is None: - args.device = device_type(args.device) # FIXME: there has to be a way to do this native to argparse - -# ----------------- Curses Shi.. stuff ----------------- -def curses_wrap(screen): - - # ----------------- Initialise widgets 'n' stuff ----------------- - # Clear screen - screen.clear() - screen.refresh() - - def keypress(): - k = None - try: - k = screen.getkey() - except curses.error as e: - pass # raised if no key event is buffered - # (because that's elegant... hmm) - return k - - status = Status(screen=screen) - init = InitWindow( - screen=screen, - title='Initialize Device: {}'.format(args.device), - min_height=1, - ) - jogging = JoggingWindow( - screen=screen, - title='Jogging', - min_height=1, - ) - stream = StreamWindow( - screen=screen, - title='Stream: {}'.format(args.infile), - min_height=min(6, args.pending_count + 1), - ) - accordion = AccordionWindowManager( - screen=screen, - windows=[init, jogging, stream], - header_height=status.window.getmaxyx()[0], - ) - - # dump first few lines into stream window - with open(args.infile, 'r') as fh: - for i in range(args.pending_count): - line = fh.readline().rstrip() - stream.add_line(line) - stream.add_line('... [truncated]') - - - # ----------------- Virtual Machine ----------------- - machine = NullMachine() - - # Detect & Set Machine's Mode from GRBL text - machine_mode_regex = re.compile(r'^\s*\[GC:\s*(?P[^\]]+)\]\s*$', re.I) - def machine_set_mode(mode_match): - machine.set_mode(*text2gcodes(mode_match.group('gcode'))) - - # Detect & Set Machine's State from GRBL text - machine_state_regex = re.compile(r'^\s*<(?P[^\>\<]*)>\s*$') - def machine_set_state(state_match): - # - feed_rate = None # float - spindle = None # float - abs_pos = None # Position - work_pos = None # Position - work_offset = None # Position - - coords = lambda s: [float(x) for x in s[s.find(':')+1:].split(',')] - pos = lambda s: machine.Position(**dict(zip('XYZ', coords(s)))) - for (i, state_str) in enumerate(state_match.group('state').split('|')): - if i == 0: - status.set_status(state_str) - elif state_str.startswith('MPos:'): # machine.abs_pos - abs_pos = pos(state_str) - elif state_str.startswith('WPos:'): # machine.pos - work_pos = pos(state_str) - elif state_str.startswith('F:'): - (feed_rate,) = coords(state_str) - elif state_str.startswith('FS:'): - (feed_rate, spindle) = coords(state_str) - elif state_str.startswith('WCO:'): # machine.state.coord_sys.offset - work_offset = pos(state_str) - - # Update Machine Axes - if work_offset is not None: - machine.state.coord_sys.offset = work_offset # done first - if abs_pos is not None: - machine.abs_pos = abs_pos - if work_pos is not None: - machine.pos = work_pos - - # Update Status Display - status.widgets['MPosX'].value = machine.abs_pos.X - status.widgets['MPosY'].value = machine.abs_pos.Y - status.widgets['MPosZ'].value = machine.abs_pos.Z - status.widgets['WPosX'].value = machine.pos.X - status.widgets['WPosY'].value = machine.pos.Y - status.widgets['WPosZ'].value = machine.pos.Z - if feed_rate is not None: - status.widgets['feed_rate'].text = "%g" % feed_rate - if spindle is not None: - spindle_str = ('%g (rpm)' % spindle) if spindle else 'off' - status.widgets['spindle'].text = spindle_str - - status.refresh() - - - # ----------------- Initialize Serial ----------------- - - # Initialize & Wake up grbl - serialport = SerialPort(args.device, args.baudrate, args.serial_log) - serialport.write("\r\n\r\n") - - # Wait for grbl to initialize and flush startup text in serial input - accordion.focus = init - init_complete = False - for line in serialport.readlines(timeout=5): - line = line.strip() - if line and (line != 'ok'): - init.add_line(line) - - # Process line - mode_match = machine_mode_regex.search(line) - state_match = machine_state_regex.search(line) - if re.search(r'^GRBL', line, re.I): - # Request: Mode - Request Machine's Mode - serialport.write('$G\n') - elif mode_match: - # Received: Mode - Machine's Mode received, pass to virtual machine - machine_set_mode(mode_match) - # Request: State - serialport.write('?') - elif state_match: - # Received: State - machine_set_state(state_match) - init_complete = True - if init_complete: - break - - if init_complete: - serialport.serial.flushInput() - else: - raise RuntimeError("Could not initialize GRBL serial interface") - - # Machine state initialized, remember it. - # machine's mode may be altered while jogging. - # machine's mode is duplicated so it may be reverted after jogging - init_machine = copy.copy(machine) - - # Start period polling of status - poll_daemon_keepalive = True - poll_daemon_thread = None - if args.poll_interval: - def _poll_daemon_runtime(): - while poll_daemon_keepalive: - serialport.write('?') - time.sleep(args.poll_interval) - poll_daemon_thread = threading.Thread(target=_poll_daemon_runtime) - poll_daemon_thread.daemon = True - poll_daemon_thread.start() - - - # Serial Polling Process - message_regex = re.compile(r'^\s*\[(?P.*)\]\s*$') - def poll_serial(timeout, callback=None): - for line in serialport.readlines(timeout=0.05): - line = line.rstrip('\r\n').lstrip('\r\n') - state_match = machine_state_regex.search(line) - message_match = message_regex.search(line) - #jogging.add_line("%s : %s" % (line, "state" if state_match else "dunno")) - if state_match: # Received: State - machine_set_state(state_match) - elif message_match: - # TODO: how to display messages - pass - else: - streamer.process_response(line) - if callback: - callback() - - def _poll_callback_jogging(): - jogging.render() - jogging.refresh() - - def _poll_callback_streaming(): - jogging.render() - jogging.refresh() - stream.render() - stream.refresh() - - - # Connect GCode Streamer - streamer = GCodeStreamer(serialport, args.buffer_size) - - def send_gcode(gcode, window): - widget = window.add_line(str(gcode)) - line = GCodeStreamer.Line(str(gcode), widget) - if line: # don't send blank lines - streamer.send(line) - - # ----------------- Interactive Jogging ----------------- - if not args.quiet: - help_lines = [ - "; Jogging Keys", - "; - Arrow Keys: X/Y axes", - "; - PgUp/PgDn: Z axis", - "; - []: increase/decrease jog distance", - "; - Space: zero work offsets (all axes)", - "; - x,y,z: zero individual axis", - "; - Enter: start streaming gcode", - ] - for line in help_lines: - jogging.add_line(line) - accordion.focus = jogging - - screen.nodelay(True) # non-blocking .getkey() behaviour - - while True: - k = keypress() - - if k is None: - pass # nothing pressed; do nothing - else: - # Process key-press... - if k in ('q', 'Q'): - break - elif k == '?': - serialport.write('?') - - # Jogging Keys - elif k in ["KEY_RIGHT", "KEY_LEFT", "KEY_UP", "KEY_DOWN", "KEY_PPAGE", "KEY_NPAGE"]: - # put machine into incremental mode (if it's not already) - if not isinstance(machine.mode.distance, GCodeIncrementalDistanceMode): - dist_mode = GCodeIncrementalDistanceMode() - send_gcode(dist_mode, jogging) - machine.process_gcodes(dist_mode) # changes machine's mode - - if k == "KEY_RIGHT": - send_gcode(GCodeRapidMove(X=status.widgets['jog'].value), jogging) - status.widgets['X+'].flash() - elif k == "KEY_LEFT": - send_gcode(GCodeRapidMove(X=-status.widgets['jog'].value), jogging) - status.widgets['X-'].flash() - elif k == "KEY_UP": - send_gcode(GCodeRapidMove(Y=status.widgets['jog'].value), jogging) - status.widgets['Y+'].flash() - elif k == "KEY_DOWN": - send_gcode(GCodeRapidMove(Y=-status.widgets['jog'].value), jogging) - status.widgets['Y-'].flash() - elif k == "KEY_PPAGE": - send_gcode(GCodeRapidMove(Z=status.widgets['jog'].value), jogging) - status.widgets['Z+'].flash() - elif k == "KEY_NPAGE": - send_gcode(GCodeRapidMove(Z=-status.widgets['jog'].value), jogging) - status.widgets['Z-'].flash() - - # Jogging Increment (up/down) - elif k in '[],.': - status.widgets['jog'].value = jog_value( - status.widgets['jog'].value, - {'[': 1, ',': 1, ']': -1, '.': -1}[k] - ) - - # Zeroing Axes Offsets - elif k == ' ': - # Zero WPos coordinates - send_gcode(text2gcodes('G10 L20 P1 X0 Y0 Z0')[0], jogging) - elif k in 'xX': - send_gcode(text2gcodes('G10 L20 P1 X0')[0], jogging) - elif k in 'yY': - send_gcode(text2gcodes('G10 L20 P1 Y0')[0], jogging) - elif k in 'zZ': - send_gcode(text2gcodes('G10 L20 P1 Z0')[0], jogging) - - - # Jogging complete, begin stream - elif k == '\n': - break - - # dummy keypress on console window resize - elif k == 'KEY_RESIZE': - accordion.update_distrobution() - - status.refresh() - - # spend 50ms processing received serial lines - # then loop back up to process another key - poll_serial(0.05, _poll_callback_jogging) - - streamer.poll_transmission() - - # Revert machine's state after jogging - mode_keys = set() - mode_keys |= set(init_machine.mode.modal_groups.keys()) - mode_keys |= set(machine.mode.modal_groups.keys()) - - for key in mode_keys: - initial = init_machine.mode.modal_groups[key] - current = machine.mode.modal_groups[key] - if initial != current: - send_gcode(initial, jogging) - - - # ----------------- File Streaming ----------------- - stream.clear() - accordion.focus = stream - - def _check_resize(): - if keypress() == 'KEY_RESIZE': - accordion.update_distrobution() - - - with open(args.infile, 'r') as gcode_file: - gcode_file_moredata = True - while gcode_file_moredata: - # Push pending lines into streamer - while streamer.pending_count < args.pending_count: - line_data = gcode_file.readline() - if not line_data: - gcode_file_moredata = False - break # file's done - - send_gcode(line_data.strip(), stream) - - # process serial packets, then try again - poll_serial(0.05, _poll_callback_streaming) - - _check_resize() - - # streamer is still: - # - sending gcodes to GRBL - # - processing GRBL's responses in confirmation of those codes - while not streamer.finished: - # process serial packets, then try again - poll_serial(0.05, _poll_callback_streaming) - _check_resize() - - # Machine is still working, wait 'till it's Idle - while not status.is_idle: - # process serial packets, then try again - poll_serial(0.05, _poll_callback_streaming) - _check_resize() - - - # ---- Kill status polling daemon - poll_daemon_keepalive = False # kills polling daemon (if running) - poll_daemon_thread.join() # blocks until daemon is complete - serialport.serial.flushInput() - - while args.no_close: - if keypress() == 'q': - break - time.sleep(0.1) - -curses.wrapper(curses_wrap) diff --git a/setup.py b/setup.py index a17370d..ca02036 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,6 @@ CLASSIFIERS = [ INSTALL_REQUIRES = [ 'argparse', # Python command-line parsing library 'euclid3', # 2D and 3D vector, matrix, quaternion and geometry module. - 'pyserial', # Python Serial Port Extension 'six', # Python 2 and 3 compatibility utilities ] SCRIPTS = [ diff --git a/src/pygcode.egg-info/requires.txt b/src/pygcode.egg-info/requires.txt index e0978f0..23939ad 100644 --- a/src/pygcode.egg-info/requires.txt +++ b/src/pygcode.egg-info/requires.txt @@ -1,3 +1,4 @@ argparse euclid3 +pyserial six