diff options
Diffstat (limited to 'klippy/extras/tmc_uart.py')
-rw-r--r-- | klippy/extras/tmc_uart.py | 205 |
1 files changed, 130 insertions, 75 deletions
diff --git a/klippy/extras/tmc_uart.py b/klippy/extras/tmc_uart.py index fa0d6262..141b8965 100644 --- a/klippy/extras/tmc_uart.py +++ b/klippy/extras/tmc_uart.py @@ -10,38 +10,48 @@ import logging # TMC uart analog mux support ###################################################################### + class MCU_analog_mux: def __init__(self, mcu, cmd_queue, select_pins_desc): self.mcu = mcu self.cmd_queue = cmd_queue ppins = mcu.get_printer().lookup_object("pins") - select_pin_params = [ppins.lookup_pin(spd, can_invert=True) - for spd in select_pins_desc] + select_pin_params = [ + ppins.lookup_pin(spd, can_invert=True) for spd in select_pins_desc + ] self.oids = [self.mcu.create_oid() for pp in select_pin_params] - self.pins = [pp['pin'] for pp in select_pin_params] + self.pins = [pp["pin"] for pp in select_pin_params] self.pin_values = tuple([-1 for pp in select_pin_params]) for oid, pin in zip(self.oids, self.pins): - self.mcu.add_config_cmd("config_digital_out oid=%d pin=%s" - " value=0 default_value=0 max_duration=0" - % (oid, pin)) + self.mcu.add_config_cmd( + "config_digital_out oid=%d pin=%s" + " value=0 default_value=0 max_duration=0" % (oid, pin) + ) self.update_pin_cmd = None self.mcu.register_config_callback(self.build_config) + def build_config(self): self.update_pin_cmd = self.mcu.lookup_command( - "update_digital_out oid=%c value=%c", cq=self.cmd_queue) + "update_digital_out oid=%c value=%c", cq=self.cmd_queue + ) + def get_instance_id(self, select_pins_desc): ppins = self.mcu.get_printer().lookup_object("pins") - select_pin_params = [ppins.parse_pin(spd, can_invert=True) - for spd in select_pins_desc] + select_pin_params = [ + ppins.parse_pin(spd, can_invert=True) for spd in select_pins_desc + ] for pin_params in select_pin_params: - if pin_params['chip'] != self.mcu: + if pin_params["chip"] != self.mcu: raise self.mcu.get_printer().config_error( - "TMC mux pins must be on the same mcu") - pins = [pp['pin'] for pp in select_pin_params] + "TMC mux pins must be on the same mcu" + ) + pins = [pp["pin"] for pp in select_pin_params] if pins != self.pins: raise self.mcu.get_printer().config_error( - "All TMC mux instances must use identical pins") - return tuple([not pp['invert'] for pp in select_pin_params]) + "All TMC mux instances must use identical pins" + ) + return tuple([not pp["invert"] for pp in select_pin_params]) + def activate(self, instance_id): for oid, old, new in zip(self.oids, self.pin_values, instance_id): if old != new: @@ -53,71 +63,85 @@ class MCU_analog_mux: # TMC uart communication ###################################################################### + # Share mutexes so only one active tmc_uart command on a single mcu at # a time. This helps limit cpu usage on slower micro-controllers. class PrinterTMCUartMutexes: def __init__(self): self.mcu_to_mutex = {} + + def lookup_tmc_uart_mutex(mcu): printer = mcu.get_printer() - pmutexes = printer.lookup_object('tmc_uart', None) + pmutexes = printer.lookup_object("tmc_uart", None) if pmutexes is None: pmutexes = PrinterTMCUartMutexes() - printer.add_object('tmc_uart', pmutexes) + printer.add_object("tmc_uart", pmutexes) mutex = pmutexes.mcu_to_mutex.get(mcu) if mutex is None: mutex = printer.get_reactor().mutex() pmutexes.mcu_to_mutex[mcu] = mutex return mutex + TMC_BAUD_RATE = 40000 TMC_BAUD_RATE_AVR = 9000 + # Code for sending messages on a TMC uart class MCU_TMC_uart_bitbang: def __init__(self, rx_pin_params, tx_pin_params, select_pins_desc): - self.mcu = rx_pin_params['chip'] + self.mcu = rx_pin_params["chip"] self.mutex = lookup_tmc_uart_mutex(self.mcu) - self.pullup = rx_pin_params['pullup'] - self.rx_pin = rx_pin_params['pin'] - self.tx_pin = tx_pin_params['pin'] + self.pullup = rx_pin_params["pullup"] + self.rx_pin = rx_pin_params["pin"] + self.tx_pin = tx_pin_params["pin"] self.oid = self.mcu.create_oid() self.cmd_queue = self.mcu.alloc_command_queue() self.analog_mux = None if select_pins_desc is not None: - self.analog_mux = MCU_analog_mux(self.mcu, self.cmd_queue, - select_pins_desc) + self.analog_mux = MCU_analog_mux(self.mcu, self.cmd_queue, select_pins_desc) self.instances = {} self.tmcuart_send_cmd = None self.mcu.register_config_callback(self.build_config) + def build_config(self): baud = TMC_BAUD_RATE mcu_type = self.mcu.get_constants().get("MCU", "") if mcu_type.startswith("atmega") or mcu_type.startswith("at90usb"): baud = TMC_BAUD_RATE_AVR - bit_ticks = self.mcu.seconds_to_clock(1. / baud) + bit_ticks = self.mcu.seconds_to_clock(1.0 / baud) self.mcu.add_config_cmd( "config_tmcuart oid=%d rx_pin=%s pull_up=%d tx_pin=%s bit_time=%d" - % (self.oid, self.rx_pin, self.pullup, self.tx_pin, bit_ticks)) + % (self.oid, self.rx_pin, self.pullup, self.tx_pin, bit_ticks) + ) self.tmcuart_send_cmd = self.mcu.lookup_query_command( "tmcuart_send oid=%c write=%*s read=%c", - "tmcuart_response oid=%c read=%*s", oid=self.oid, - cq=self.cmd_queue, is_async=True) - def register_instance(self, rx_pin_params, tx_pin_params, - select_pins_desc, addr): - if (rx_pin_params['pin'] != self.rx_pin - or tx_pin_params['pin'] != self.tx_pin - or (select_pins_desc is None) != (self.analog_mux is None)): + "tmcuart_response oid=%c read=%*s", + oid=self.oid, + cq=self.cmd_queue, + is_async=True, + ) + + def register_instance(self, rx_pin_params, tx_pin_params, select_pins_desc, addr): + if ( + rx_pin_params["pin"] != self.rx_pin + or tx_pin_params["pin"] != self.tx_pin + or (select_pins_desc is None) != (self.analog_mux is None) + ): raise self.mcu.get_printer().config_error( - "Shared TMC uarts must use the same pins") + "Shared TMC uarts must use the same pins" + ) instance_id = None if self.analog_mux is not None: instance_id = self.analog_mux.get_instance_id(select_pins_desc) if (instance_id, addr) in self.instances: raise self.mcu.get_printer().config_error( - "Shared TMC uarts need unique address or select_pins polarity") + "Shared TMC uarts need unique address or select_pins polarity" + ) self.instances[(instance_id, addr)] = True return instance_id + def _calc_crc8(self, data): # Generate a CRC8-ATM value for a bytearray crc = 0 @@ -126,33 +150,46 @@ class MCU_TMC_uart_bitbang: if (crc >> 7) ^ (b & 0x01): crc = (crc << 1) ^ 0x07 else: - crc = (crc << 1) - crc &= 0xff + crc = crc << 1 + crc &= 0xFF b >>= 1 return crc + def _add_serial_bits(self, data): # Add serial start and stop bits to a message in a bytearray out = 0 pos = 0 for d in data: b = (d << 1) | 0x200 - out |= (b << pos) + out |= b << pos pos += 10 res = bytearray() - for i in range((pos+7)//8): - res.append((out >> (i*8)) & 0xff) + for i in range((pos + 7) // 8): + res.append((out >> (i * 8)) & 0xFF) return res + def _encode_read(self, sync, addr, reg): # Generate a uart read register message msg = bytearray([sync, addr, reg]) msg.append(self._calc_crc8(msg)) return self._add_serial_bits(msg) + def _encode_write(self, sync, addr, reg, val): # Generate a uart write register message - msg = bytearray([sync, addr, reg, (val >> 24) & 0xff, - (val >> 16) & 0xff, (val >> 8) & 0xff, val & 0xff]) + msg = bytearray( + [ + sync, + addr, + reg, + (val >> 24) & 0xFF, + (val >> 16) & 0xFF, + (val >> 8) & 0xFF, + val & 0xFF, + ] + ) msg.append(self._calc_crc8(msg)) return self._add_serial_bits(msg) + def _decode_read(self, reg, data): # Extract a uart read response message if len(data) != 10: @@ -163,56 +200,66 @@ class MCU_TMC_uart_bitbang: mval |= d << pos pos += 8 # Extract register value - val = ((((mval >> 31) & 0xff) << 24) | (((mval >> 41) & 0xff) << 16) - | (((mval >> 51) & 0xff) << 8) | ((mval >> 61) & 0xff)) + val = ( + (((mval >> 31) & 0xFF) << 24) + | (((mval >> 41) & 0xFF) << 16) + | (((mval >> 51) & 0xFF) << 8) + | ((mval >> 61) & 0xFF) + ) # Verify start/stop bits and crc - encoded_data = self._encode_write(0x05, 0xff, reg, val) + encoded_data = self._encode_write(0x05, 0xFF, reg, val) if data != encoded_data: return None return val + def reg_read(self, instance_id, addr, reg): if self.analog_mux is not None: self.analog_mux.activate(instance_id) - msg = self._encode_read(0xf5, addr, reg) + msg = self._encode_read(0xF5, addr, reg) params = self.tmcuart_send_cmd.send([self.oid, msg, 10]) return { - 'data': self._decode_read(reg, params['read']), - '#receive_time': params['#receive_time'] + "data": self._decode_read(reg, params["read"]), + "#receive_time": params["#receive_time"], } + def reg_write(self, instance_id, addr, reg, val, print_time=None): minclock = 0 if print_time is not None: minclock = self.mcu.print_time_to_clock(print_time) if self.analog_mux is not None: self.analog_mux.activate(instance_id) - msg = self._encode_write(0xf5, addr, reg | 0x80, val) + msg = self._encode_write(0xF5, addr, reg | 0x80, val) self.tmcuart_send_cmd.send([self.oid, msg, 0], minclock=minclock) + def get_mcu(self): return self.mcu + # Lookup a (possibly shared) tmc uart def lookup_tmc_uart_bitbang(config, max_addr): ppins = config.get_printer().lookup_object("pins") - rx_pin_params = ppins.lookup_pin(config.get('uart_pin'), can_pullup=True, - share_type="tmc_uart_rx") - tx_pin_desc = config.get('tx_pin', None) + rx_pin_params = ppins.lookup_pin( + config.get("uart_pin"), can_pullup=True, share_type="tmc_uart_rx" + ) + tx_pin_desc = config.get("tx_pin", None) if tx_pin_desc is None: tx_pin_params = rx_pin_params else: tx_pin_params = ppins.lookup_pin(tx_pin_desc, share_type="tmc_uart_tx") - if rx_pin_params['chip'] is not tx_pin_params['chip']: + if rx_pin_params["chip"] is not tx_pin_params["chip"]: raise ppins.error("TMC uart rx and tx pins must be on the same mcu") - select_pins_desc = config.getlist('select_pins', None) - addr = config.getint('uart_address', 0, minval=0, maxval=max_addr) - mcu_uart = rx_pin_params.get('class') + select_pins_desc = config.getlist("select_pins", None) + addr = config.getint("uart_address", 0, minval=0, maxval=max_addr) + mcu_uart = rx_pin_params.get("class") if mcu_uart is None: - mcu_uart = MCU_TMC_uart_bitbang(rx_pin_params, tx_pin_params, - select_pins_desc) - rx_pin_params['class'] = mcu_uart - instance_id = mcu_uart.register_instance(rx_pin_params, tx_pin_params, - select_pins_desc, addr) + mcu_uart = MCU_TMC_uart_bitbang(rx_pin_params, tx_pin_params, select_pins_desc) + rx_pin_params["class"] = mcu_uart + instance_id = mcu_uart.register_instance( + rx_pin_params, tx_pin_params, select_pins_desc, addr + ) return instance_id, addr, mcu_uart + # Helper code for communicating via TMC uart class MCU_TMC_uart: def __init__(self, config, name_to_reg, fields, max_addr, tmc_frequency): @@ -222,46 +269,54 @@ class MCU_TMC_uart: self.fields = fields self.ifcnt = None self.instance_id, self.addr, self.mcu_uart = lookup_tmc_uart_bitbang( - config, max_addr) + config, max_addr + ) self.mutex = self.mcu_uart.mutex self.tmc_frequency = tmc_frequency + def get_fields(self): return self.fields + def _do_get_register(self, reg_name): reg = self.name_to_reg[reg_name] - if self.printer.get_start_args().get('debugoutput') is not None: - return { - 'data': 0, - '#receive_time': 0. - } + if self.printer.get_start_args().get("debugoutput") is not None: + return {"data": 0, "#receive_time": 0.0} for retry in range(5): val = self.mcu_uart.reg_read(self.instance_id, self.addr, reg) - if val['data'] is not None: + if val["data"] is not None: return val raise self.printer.command_error( - "Unable to read tmc uart '%s' register %s" % (self.name, reg_name)) + "Unable to read tmc uart '%s' register %s" % (self.name, reg_name) + ) + def get_register_raw(self, reg_name): with self.mutex: return self._do_get_register(reg_name) + def get_register(self, reg_name): - return self.get_register_raw(reg_name)['data'] + return self.get_register_raw(reg_name)["data"] + def set_register(self, reg_name, val, print_time=None): reg = self.name_to_reg[reg_name] - if self.printer.get_start_args().get('debugoutput') is not None: + if self.printer.get_start_args().get("debugoutput") is not None: return with self.mutex: for retry in range(5): ifcnt = self.ifcnt if ifcnt is None: - self.ifcnt = ifcnt = self._do_get_register("IFCNT")['data'] - self.mcu_uart.reg_write(self.instance_id, self.addr, reg, val, - print_time) - self.ifcnt = self._do_get_register("IFCNT")['data'] - if self.ifcnt == (ifcnt + 1) & 0xff: + self.ifcnt = ifcnt = self._do_get_register("IFCNT")["data"] + self.mcu_uart.reg_write( + self.instance_id, self.addr, reg, val, print_time + ) + self.ifcnt = self._do_get_register("IFCNT")["data"] + if self.ifcnt == (ifcnt + 1) & 0xFF: return raise self.printer.command_error( - "Unable to write tmc uart '%s' register %s" % (self.name, reg_name)) + "Unable to write tmc uart '%s' register %s" % (self.name, reg_name) + ) + def get_tmc_frequency(self): return self.tmc_frequency + def get_mcu(self): return self.mcu_uart.get_mcu() |