diff options
Diffstat (limited to 'klippy/console.py')
-rwxr-xr-x | klippy/console.py | 148 |
1 files changed, 98 insertions, 50 deletions
diff --git a/klippy/console.py b/klippy/console.py index 0a20e09e..3ad6e4eb 100755 --- a/klippy/console.py +++ b/klippy/console.py @@ -28,7 +28,8 @@ help_txt = """ freq : The mcu clock frequency """ -re_eval = re.compile(r'\{(?P<eval>[^}]*)\}') +re_eval = re.compile(r"\{(?P<eval>[^}]*)\}") + class KeyboardReader: def __init__(self, reactor, serialport, baud, canbus_iface, canbus_nodeid): @@ -48,18 +49,24 @@ class KeyboardReader: reactor.register_callback(self.connect) self.local_commands = { "SET": self.command_SET, - "DUMP": self.command_DUMP, "FILEDUMP": self.command_FILEDUMP, - "DELAY": self.command_DELAY, "FLOOD": self.command_FLOOD, - "SUPPRESS": self.command_SUPPRESS, "STATS": self.command_STATS, - "LIST": self.command_LIST, "HELP": self.command_HELP, + "DUMP": self.command_DUMP, + "FILEDUMP": self.command_FILEDUMP, + "DELAY": self.command_DELAY, + "FLOOD": self.command_FLOOD, + "SUPPRESS": self.command_SUPPRESS, + "STATS": self.command_STATS, + "LIST": self.command_LIST, + "HELP": self.command_HELP, } self.eval_globals = {} + def connect(self, eventtime): self.output(help_txt) - self.output("="*20 + " attempting to connect " + "="*20) + self.output("=" * 20 + " attempting to connect " + "=" * 20) if self.canbus_iface is not None: - self.ser.connect_canbus(self.serialport, self.canbus_nodeid, - self.canbus_iface) + self.ser.connect_canbus( + self.serialport, self.canbus_nodeid, self.canbus_iface + ) elif self.baud: self.ser.connect_uart(self.serialport, self.baud) else: @@ -67,31 +74,44 @@ class KeyboardReader: msgparser = self.ser.get_msgparser() message_count = len(msgparser.get_messages()) version, build_versions = msgparser.get_version_info() - self.output("Loaded %d commands (%s / %s)" - % (message_count, version, build_versions)) - self.output("MCU config: %s" % (" ".join( - ["%s=%s" % (k, v) for k, v in msgparser.get_constants().items()]))) + self.output( + "Loaded %d commands (%s / %s)" % (message_count, version, build_versions) + ) + self.output( + "MCU config: %s" + % ( + " ".join( + ["%s=%s" % (k, v) for k, v in msgparser.get_constants().items()] + ) + ) + ) self.clocksync.connect(self.ser) self.ser.handle_default = self.handle_default - self.ser.register_response(self.handle_output, '#output') - self.mcu_freq = msgparser.get_constant_float('CLOCK_FREQ') - self.output("="*20 + " connected " + "="*20) + self.ser.register_response(self.handle_output, "#output") + self.mcu_freq = msgparser.get_constant_float("CLOCK_FREQ") + self.output("=" * 20 + " connected " + "=" * 20) return self.reactor.NEVER + def output(self, msg): sys.stdout.write("%s\n" % (msg,)) sys.stdout.flush() + def handle_default(self, params): - tdiff = params['#receive_time'] - self.start_time + tdiff = params["#receive_time"] - self.start_time msg = self.ser.get_msgparser().format_params(params) self.output("%07.3f: %s" % (tdiff, msg)) + def handle_output(self, params): - tdiff = params['#receive_time'] - self.start_time - self.output("%07.3f: %s: %s" % (tdiff, params['#name'], params['#msg'])) + tdiff = params["#receive_time"] - self.start_time + self.output("%07.3f: %s: %s" % (tdiff, params["#name"], params["#msg"])) + def handle_suppress(self, params): pass + def update_evals(self, eventtime): - self.eval_globals['freq'] = self.mcu_freq - self.eval_globals['clock'] = self.clocksync.get_clock(eventtime) + self.eval_globals["freq"] = self.mcu_freq + self.eval_globals["clock"] = self.clocksync.get_clock(eventtime) + def command_SET(self, parts): val = parts[2] try: @@ -99,6 +119,7 @@ class KeyboardReader: except ValueError: pass self.eval_globals[parts[1]] = val + def command_DUMP(self, parts, filename=None): # Extract command args try: @@ -106,7 +127,7 @@ class KeyboardReader: count = int(parts[2], 0) order = [2, 0, 1, 0][(addr | count) & 3] if len(parts) > 3: - order = {'32': 2, '16': 1, '8': 0}[parts[3]] + order = {"32": 2, "16": 1, "8": 0}[parts[3]] except ValueError as e: self.output("Error: %s" % (str(e),)) return @@ -117,23 +138,23 @@ class KeyboardReader: caddr = addr + (i << order) cmd = "debug_read order=%d addr=%d" % (order, caddr) params = self.ser.send_with_response(cmd, "debug_result") - vals.append(params['val']) + vals.append(params["val"]) # Report data if filename is None and order == 2: # Common 32bit hex dump for i in range((len(vals) + 3) // 4): p = i * 4 - hexvals = " ".join(["%08x" % (v,) for v in vals[p:p+4]]) + hexvals = " ".join(["%08x" % (v,) for v in vals[p : p + 4]]) self.output("%08x %s" % (addr + p * 4, hexvals)) return # Convert to byte format data = bytearray() for val in vals: for b in range(bsize): - data.append((val >> (8 * b)) & 0xff) + data.append((val >> (8 * b)) & 0xFF) data = data[:count] if filename is not None: - f = open(filename, 'wb') + f = open(filename, "wb") f.write(data) f.close() self.output("Wrote %d bytes to '%s'" % (len(data), filename)) @@ -141,13 +162,15 @@ class KeyboardReader: for i in range((count + 15) // 16): p = i * 16 paddr = addr + p - d = data[p:p+16] + d = data[p : p + 16] hexbytes = " ".join(["%02x" % (v,) for v in d]) - pb = "".join([chr(v) if v >= 0x20 and v < 0x7f else '.' for v in d]) + pb = "".join([chr(v) if v >= 0x20 and v < 0x7F else "." for v in d]) o = "%08x %-47s |%s|" % (paddr, hexbytes, pb) self.output("%s %s" % (o[:34], o[34:])) + def command_FILEDUMP(self, parts): self.command_DUMP(parts[1:], filename=parts[1]) + def command_DELAY(self, parts): try: val = int(parts[1]) @@ -155,10 +178,11 @@ class KeyboardReader: self.output("Error: %s" % (str(e),)) return try: - self.ser.send(' '.join(parts[2:]), minclock=val) + self.ser.send(" ".join(parts[2:]), minclock=val) except msgproto.error as e: self.output("Error: %s" % (str(e),)) return + def command_FLOOD(self, parts): try: count = int(parts[1]) @@ -166,10 +190,11 @@ class KeyboardReader: except ValueError as e: self.output("Error: %s" % (str(e),)) return - msg = ' '.join(parts[3:]) + msg = " ".join(parts[3:]) delay_clock = int(delay * self.mcu_freq) - msg_clock = int(self.clocksync.get_clock(self.reactor.monotonic()) - + self.mcu_freq * .200) + msg_clock = int( + self.clocksync.get_clock(self.reactor.monotonic()) + self.mcu_freq * 0.200 + ) try: for i in range(count): next_clock = msg_clock + delay_clock @@ -178,6 +203,7 @@ class KeyboardReader: except msgproto.error as e: self.output("Error: %s" % (str(e),)) return + def command_SUPPRESS(self, parts): oid = None try: @@ -188,15 +214,19 @@ class KeyboardReader: self.output("Error: %s" % (str(e),)) return self.ser.register_response(self.handle_suppress, name, oid) + def command_STATS(self, parts): curtime = self.reactor.monotonic() - self.output(' '.join([self.ser.stats(curtime), - self.clocksync.stats(curtime)])) + self.output(" ".join([self.ser.stats(curtime), self.clocksync.stats(curtime)])) + def command_LIST(self, parts): self.update_evals(self.reactor.monotonic()) mp = self.ser.get_msgparser() - cmds = [msgformat for msgtag, msgtype, msgformat in mp.get_messages() - if msgtype == 'command'] + cmds = [ + msgformat + for msgtag, msgtype, msgformat in mp.get_messages() + if msgtype == "command" + ] out = "Available mcu commands:" out += "\n ".join([""] + sorted(cmds)) out += "\nAvailable artificial commands:" @@ -205,8 +235,10 @@ class KeyboardReader: lvars = sorted(self.eval_globals.items()) out += "\n ".join([""] + ["%s: %s" % (k, v) for k, v in lvars]) self.output(out) + def command_HELP(self, parts): self.output(help_txt) + def translate(self, line, eventtime): evalparts = re_eval.split(line) if len(evalparts) > 1: @@ -214,13 +246,13 @@ class KeyboardReader: try: for i in range(1, len(evalparts), 2): e = eval(evalparts[i], dict(self.eval_globals)) - if type(e) == type(0.): + if type(e) == type(0.0): e = int(e) evalparts[i] = str(e) except: self.output("Unable to evaluate: %s" % (line,)) return None - line = ''.join(evalparts) + line = "".join(evalparts) self.output("Eval: %s" % (line,)) line = line.strip() if line: @@ -229,13 +261,14 @@ class KeyboardReader: self.local_commands[parts[0]](parts) return None return line + def process_kbd(self, eventtime): self.data += str(os.read(self.fd, 4096).decode()) - kbdlines = self.data.split('\n') + kbdlines = self.data.split("\n") for line in kbdlines[:-1]: line = line.strip() - cpos = line.find('#') + cpos = line.find("#") if cpos >= 0: line = line[:cpos] if not line: @@ -249,24 +282,37 @@ class KeyboardReader: self.output("Error: %s" % (str(e),)) self.data = kbdlines[-1] + def main(): usage = "%prog [options] <serialdevice>" opts = optparse.OptionParser(usage) - opts.add_option("-v", action="store_true", dest="verbose", - help="enable debug messages") + opts.add_option( + "-v", action="store_true", dest="verbose", help="enable debug messages" + ) opts.add_option("-b", "--baud", type="int", dest="baud", help="baud rate") - opts.add_option("-c", "--canbus_iface", dest="canbus_iface", - help="Use CAN bus interface; serialdevice is the chip UUID") - opts.add_option("-i", "--canbus_nodeid", type="int", dest="canbus_nodeid", - default=64, help="The CAN nodeid to use (default 64)") + opts.add_option( + "-c", + "--canbus_iface", + dest="canbus_iface", + help="Use CAN bus interface; serialdevice is the chip UUID", + ) + opts.add_option( + "-i", + "--canbus_nodeid", + type="int", + dest="canbus_nodeid", + default=64, + help="The CAN nodeid to use (default 64)", + ) options, args = opts.parse_args() if len(args) != 1: opts.error("Incorrect number of arguments") serialport = args[0] baud = options.baud - if baud is None and not (serialport.startswith("/dev/rpmsg_") - or serialport.startswith("/tmp/")): + if baud is None and not ( + serialport.startswith("/dev/rpmsg_") or serialport.startswith("/tmp/") + ): baud = 250000 debuglevel = logging.INFO @@ -275,12 +321,14 @@ def main(): logging.basicConfig(level=debuglevel) r = reactor.Reactor() - kbd = KeyboardReader(r, serialport, baud, options.canbus_iface, - options.canbus_nodeid) + kbd = KeyboardReader( + r, serialport, baud, options.canbus_iface, options.canbus_nodeid + ) try: r.run() except KeyboardInterrupt: sys.stdout.write("\n") -if __name__ == '__main__': + +if __name__ == "__main__": main() |