diff options
Diffstat (limited to 'klippy/extras/angle.py')
-rw-r--r-- | klippy/extras/angle.py | 470 |
1 files changed, 294 insertions, 176 deletions
diff --git a/klippy/extras/angle.py b/klippy/extras/angle.py index 73af67ca..f2799543 100644 --- a/klippy/extras/angle.py +++ b/klippy/extras/angle.py @@ -7,19 +7,19 @@ import logging, math from . import bus, bulk_sensor MIN_MSG_TIME = 0.100 -TCODE_ERROR = 0xff +TCODE_ERROR = 0xFF -TRINAMIC_DRIVERS = ["tmc2130", "tmc2208", "tmc2209", "tmc2240", "tmc2660", - "tmc5160"] +TRINAMIC_DRIVERS = ["tmc2130", "tmc2208", "tmc2209", "tmc2240", "tmc2660", "tmc5160"] + +CALIBRATION_BITS = 6 # 64 entries +ANGLE_BITS = 16 # angles range from 0..65535 -CALIBRATION_BITS = 6 # 64 entries -ANGLE_BITS = 16 # angles range from 0..65535 class AngleCalibration: def __init__(self, config): self.printer = config.get_printer() self.name = config.get_name() - self.stepper_name = config.get('stepper', None) + self.stepper_name = config.get("stepper", None) if self.stepper_name is None: # No calibration return @@ -28,30 +28,37 @@ class AngleCalibration: except: raise config.error("Angle calibration requires numpy module") sconfig = config.getsection(self.stepper_name) - sconfig.getint('microsteps', note_valid=False) + sconfig.getint("microsteps", note_valid=False) self.tmc_module = self.mcu_stepper = None # Current calibration data self.mcu_pos_offset = None - self.angle_phase_offset = 0. + self.angle_phase_offset = 0.0 self.calibration_reversed = False self.calibration = [] - cal = config.get('calibrate', None) + cal = config.get("calibrate", None) if cal is not None: - data = [d.strip() for d in cal.split(',')] + data = [d.strip() for d in cal.split(",")] angles = [float(d) for d in data if d] self.load_calibration(angles) # Register commands - self.printer.register_event_handler("stepper:sync_mcu_position", - self.handle_sync_mcu_pos) + self.printer.register_event_handler( + "stepper:sync_mcu_position", self.handle_sync_mcu_pos + ) self.printer.register_event_handler("klippy:connect", self.connect) cname = self.name.split()[-1] - gcode = self.printer.lookup_object('gcode') - gcode.register_mux_command("ANGLE_CALIBRATE", "CHIP", - cname, self.cmd_ANGLE_CALIBRATE, - desc=self.cmd_ANGLE_CALIBRATE_help) + gcode = self.printer.lookup_object("gcode") + gcode.register_mux_command( + "ANGLE_CALIBRATE", + "CHIP", + cname, + self.cmd_ANGLE_CALIBRATE, + desc=self.cmd_ANGLE_CALIBRATE_help, + ) + def handle_sync_mcu_pos(self, mcu_stepper): if mcu_stepper.get_name() == self.stepper_name: self.mcu_pos_offset = None + def calc_mcu_pos_offset(self, sample): # Lookup phase information mcu_phase_offset, phases = self.tmc_module.get_phase_offset() @@ -62,15 +69,18 @@ class AngleCalibration: mcu_pos = self.mcu_stepper.get_past_mcu_position(angle_time) # Convert angle_pos to mcu_pos units microsteps, full_steps = self.get_microsteps() - angle_to_mcu_pos = full_steps * microsteps / float(1<<ANGLE_BITS) + angle_to_mcu_pos = full_steps * microsteps / float(1 << ANGLE_BITS) angle_mpos = angle_pos * angle_to_mcu_pos # Calculate adjustment for stepper phases - phase_diff = ((angle_mpos + self.angle_phase_offset * angle_to_mcu_pos) - - (mcu_pos + mcu_phase_offset)) % phases - if phase_diff > phases//2: + phase_diff = ( + (angle_mpos + self.angle_phase_offset * angle_to_mcu_pos) + - (mcu_pos + mcu_phase_offset) + ) % phases + if phase_diff > phases // 2: phase_diff -= phases # Store final offset self.mcu_pos_offset = mcu_pos - (angle_mpos - phase_diff) + def apply_calibration(self, samples): calibration = self.calibration if not calibration: @@ -80,12 +90,12 @@ class AngleCalibration: interp_mask = (1 << interp_bits) - 1 interp_round = 1 << (interp_bits - 1) for i, (samp_time, angle) in enumerate(samples): - bucket = (angle & 0xffff) >> interp_bits + bucket = (angle & 0xFFFF) >> interp_bits cal1 = calibration[bucket] cal2 = calibration[bucket + 1] adj = (angle & interp_mask) * (cal2 - cal1) adj = cal1 + ((adj + interp_round) >> interp_bits) - angle_diff = (adj - angle) & 0xffff + angle_diff = (adj - angle) & 0xFFFF angle_diff -= (angle_diff & 0x8000) << 1 new_angle = angle + angle_diff if calibration_reversed: @@ -96,6 +106,7 @@ class AngleCalibration: if self.mcu_pos_offset is None: return None return self.mcu_stepper.mcu_to_commanded_position(self.mcu_pos_offset) + def load_calibration(self, angles): # Calculate linear interpolation calibration buckets by solving # linear equations @@ -111,59 +122,69 @@ class AngleCalibration: first_step = angles.index(min(angles)) angles = angles[first_step:] + angles[:first_step] import numpy + eqs = numpy.zeros((full_steps, calibration_count)) ans = numpy.zeros((full_steps,)) for step, angle in enumerate(angles): - int_angle = int(angle + .5) % angle_max + int_angle = int(angle + 0.5) % angle_max bucket = int(int_angle / bucket_size) bucket_start = bucket * bucket_size ang_diff = angle - bucket_start ang_diff_per = ang_diff / bucket_size eq = eqs[step] - eq[bucket] = 1. - ang_diff_per + eq[bucket] = 1.0 - ang_diff_per eq[(bucket + 1) % calibration_count] = ang_diff_per ans[step] = float(step * nominal_step) if bucket + 1 >= calibration_count: ans[step] -= ang_diff_per * angle_max sol = numpy.linalg.lstsq(eqs, ans, rcond=None)[0] - isol = [int(s + .5) for s in sol] + isol = [int(s + 0.5) for s in sol] self.calibration = isol + [isol[0] + angle_max] + def lookup_tmc(self): for driver in TRINAMIC_DRIVERS: driver_name = "%s %s" % (driver, self.stepper_name) module = self.printer.lookup_object(driver_name, None) if module is not None: return module - raise self.printer.command_error("Unable to find TMC driver for %s" - % (self.stepper_name,)) + raise self.printer.command_error( + "Unable to find TMC driver for %s" % (self.stepper_name,) + ) + def connect(self): self.tmc_module = self.lookup_tmc() - fmove = self.printer.lookup_object('force_move') + fmove = self.printer.lookup_object("force_move") self.mcu_stepper = fmove.lookup_stepper(self.stepper_name) + def get_microsteps(self): - configfile = self.printer.lookup_object('configfile') - sconfig = configfile.get_status(None)['settings'] + configfile = self.printer.lookup_object("configfile") + sconfig = configfile.get_status(None)["settings"] stconfig = sconfig.get(self.stepper_name, {}) - microsteps = stconfig['microsteps'] - full_steps = stconfig['full_steps_per_rotation'] + microsteps = stconfig["microsteps"] + full_steps = stconfig["full_steps_per_rotation"] return microsteps, full_steps + def get_stepper_phase(self): mcu_phase_offset, phases = self.tmc_module.get_phase_offset() if mcu_phase_offset is None: - raise self.printer.command_error("Driver phase not known for %s" - % (self.stepper_name,)) + raise self.printer.command_error( + "Driver phase not known for %s" % (self.stepper_name,) + ) mcu_pos = self.mcu_stepper.get_mcu_position() return (mcu_pos + mcu_phase_offset) % phases + def do_calibration_moves(self): - move = self.printer.lookup_object('force_move').manual_move + move = self.printer.lookup_object("force_move").manual_move # Start data collection msgs = [] is_finished = False + def handle_batch(msg): if is_finished: return False msgs.append(msg) return True + self.printer.lookup_object(self.name).add_client(handle_batch) # Move stepper several turns (to allow internal sensor calibration) microsteps, full_steps = self.get_microsteps() @@ -174,12 +195,12 @@ class AngleCalibration: align_dist = step_dist * self.get_stepper_phase() move_time = 0.010 move_speed = full_step_dist / move_time - move(mcu_stepper, -(rotation_dist+align_dist), move_speed) - move(mcu_stepper, 2. * rotation_dist, move_speed) - move(mcu_stepper, -2. * rotation_dist, move_speed) - move(mcu_stepper, .5 * rotation_dist - full_step_dist, move_speed) + move(mcu_stepper, -(rotation_dist + align_dist), move_speed) + move(mcu_stepper, 2.0 * rotation_dist, move_speed) + move(mcu_stepper, -2.0 * rotation_dist, move_speed) + move(mcu_stepper, 0.5 * rotation_dist - full_step_dist, move_speed) # Move to each full step position - toolhead = self.printer.lookup_object('toolhead') + toolhead = self.printer.lookup_object("toolhead") times = [] samp_dist = full_step_dist for i in range(2 * full_steps): @@ -188,12 +209,12 @@ class AngleCalibration: end_query_time = start_query_time + 0.050 times.append((start_query_time, end_query_time)) toolhead.dwell(0.150) - if i == full_steps-1: + if i == full_steps - 1: # Reverse direction and test each full step again - move(mcu_stepper, .5 * rotation_dist, move_speed) - move(mcu_stepper, -.5 * rotation_dist + samp_dist, move_speed) + move(mcu_stepper, 0.5 * rotation_dist, move_speed) + move(mcu_stepper, -0.5 * rotation_dist + samp_dist, move_speed) samp_dist = -samp_dist - move(mcu_stepper, .5*rotation_dist + align_dist, move_speed) + move(mcu_stepper, 0.5 * rotation_dist + align_dist, move_speed) toolhead.wait_moves() # Finish data collection is_finished = True @@ -201,7 +222,7 @@ class AngleCalibration: cal = {} step = 0 for msg in msgs: - for query_time, pos in msg['data']: + for query_time, pos in msg["data"]: # Add to step tracking while step < len(times) and query_time > times[step][1]: step += 1 @@ -209,10 +230,12 @@ class AngleCalibration: cal.setdefault(step, []).append(pos) if len(cal) != len(times): raise self.printer.command_error( - "Failed calibration - incomplete sensor data") - fcal = { i: cal[i] for i in range(full_steps) } - rcal = { full_steps-i-1: cal[i+full_steps] for i in range(full_steps) } + "Failed calibration - incomplete sensor data" + ) + fcal = {i: cal[i] for i in range(full_steps)} + rcal = {full_steps - i - 1: cal[i + full_steps] for i in range(full_steps)} return fcal, rcal + def calc_angles(self, meas): total_count = total_variance = 0 angles = {} @@ -221,9 +244,11 @@ class AngleCalibration: angle_avg = float(sum(data)) / count angles[step] = angle_avg total_count += count - total_variance += sum([(d - angle_avg)**2 for d in data]) + total_variance += sum([(d - angle_avg) ** 2 for d in data]) return angles, math.sqrt(total_variance / total_count), total_count + cmd_ANGLE_CALIBRATE_help = "Calibrate angle sensor to stepper motor" + def cmd_ANGLE_CALIBRATE(self, gcmd): # Perform calibration movement and capture old_calibration = self.calibration @@ -236,16 +261,20 @@ class AngleCalibration: microsteps, full_steps = self.get_microsteps() fangles, fstd, ftotal = self.calc_angles(fcal) rangles, rstd, rtotal = self.calc_angles(rcal) - if (len({a: i for i, a in fangles.items()}) != len(fangles) - or len({a: i for i, a in rangles.items()}) != len(rangles)): + if len({a: i for i, a in fangles.items()}) != len(fangles) or len( + {a: i for i, a in rangles.items()} + ) != len(rangles): raise self.printer.command_error( - "Failed calibration - sensor not updating for each step") - merged = { i: fcal[i] + rcal[i] for i in range(full_steps) } + "Failed calibration - sensor not updating for each step" + ) + merged = {i: fcal[i] + rcal[i] for i in range(full_steps)} angles, std, total = self.calc_angles(merged) - gcmd.respond_info("angle: stddev=%.3f (%.3f forward / %.3f reverse)" - " in %d queries" % (std, fstd, rstd, total)) + gcmd.respond_info( + "angle: stddev=%.3f (%.3f forward / %.3f reverse)" + " in %d queries" % (std, fstd, rstd, total) + ) # Order data with lowest/highest magnet position first - anglist = [angles[i] % 0xffff for i in range(full_steps)] + anglist = [angles[i] % 0xFFFF for i in range(full_steps)] if angles[0] > angles[1]: first_ang = max(anglist) else: @@ -256,45 +285,55 @@ class AngleCalibration: cal_contents = [] for i, angle in enumerate(anglist): if not i % 8: - cal_contents.append('\n') + cal_contents.append("\n") cal_contents.append("%.1f" % (angle,)) - cal_contents.append(',') + cal_contents.append(",") cal_contents.pop() - configfile = self.printer.lookup_object('configfile') + configfile = self.printer.lookup_object("configfile") configfile.remove_section(self.name) - configfile.set(self.name, 'calibrate', ''.join(cal_contents)) + configfile.set(self.name, "calibrate", "".join(cal_contents)) + class HelperA1333: SPI_MODE = 3 SPI_SPEED = 10000000 + def __init__(self, config, spi, oid): self.spi = spi self.is_tcode_absolute = False self.last_temperature = None + def get_static_delay(self): - return .000001 + return 0.000001 + def start(self): # Setup for angle query self.spi.spi_transfer([0x32, 0x00]) + class HelperAS5047D: SPI_MODE = 1 - SPI_SPEED = int(1. / .000000350) + SPI_SPEED = int(1.0 / 0.000000350) + def __init__(self, config, spi, oid): self.spi = spi self.is_tcode_absolute = False self.last_temperature = None + def get_static_delay(self): - return .000100 + return 0.000100 + def start(self): # Clear any errors from device - self.spi.spi_transfer([0xff, 0xfc]) # Read DIAAGC - self.spi.spi_transfer([0x40, 0x01]) # Read ERRFL - self.spi.spi_transfer([0xc0, 0x00]) # Read NOP + self.spi.spi_transfer([0xFF, 0xFC]) # Read DIAAGC + self.spi.spi_transfer([0x40, 0x01]) # Read ERRFL + self.spi.spi_transfer([0xC0, 0x00]) # Read NOP + class HelperTLE5012B: SPI_MODE = 1 SPI_SPEED = 4000000 + def __init__(self, config, spi, oid): self.printer = config.get_printer() self.spi = spi @@ -305,115 +344,140 @@ class HelperTLE5012B: self.mcu.register_config_callback(self._build_config) self.spi_angle_transfer_cmd = None self.last_chip_mcu_clock = self.last_chip_clock = 0 - self.chip_freq = 0. + self.chip_freq = 0.0 name = config.get_name().split()[-1] gcode = self.printer.lookup_object("gcode") - gcode.register_mux_command("ANGLE_DEBUG_READ", "CHIP", name, - self.cmd_ANGLE_DEBUG_READ, - desc=self.cmd_ANGLE_DEBUG_READ_help) - gcode.register_mux_command("ANGLE_DEBUG_WRITE", "CHIP", name, - self.cmd_ANGLE_DEBUG_WRITE, - desc=self.cmd_ANGLE_DEBUG_WRITE_help) + gcode.register_mux_command( + "ANGLE_DEBUG_READ", + "CHIP", + name, + self.cmd_ANGLE_DEBUG_READ, + desc=self.cmd_ANGLE_DEBUG_READ_help, + ) + gcode.register_mux_command( + "ANGLE_DEBUG_WRITE", + "CHIP", + name, + self.cmd_ANGLE_DEBUG_WRITE, + desc=self.cmd_ANGLE_DEBUG_WRITE_help, + ) + def _build_config(self): cmdqueue = self.spi.get_command_queue() self.spi_angle_transfer_cmd = self.mcu.lookup_query_command( "spi_angle_transfer oid=%c data=%*s", "spi_angle_transfer_response oid=%c clock=%u response=%*s", - oid=self.oid, cq=cmdqueue) + oid=self.oid, + cq=cmdqueue, + ) + def get_tcode_params(self): return self.last_chip_mcu_clock, self.last_chip_clock, self.chip_freq + def _calc_crc(self, data): - crc = 0xff + crc = 0xFF for d in data: crc ^= d for i in range(8): if crc & 0x80: - crc = (crc << 1) ^ 0x1d + crc = (crc << 1) ^ 0x1D else: crc <<= 1 - return (~crc) & 0xff + return (~crc) & 0xFF + def _send_spi(self, msg): for retry in range(5): if msg[0] & 0x04: params = self.spi_angle_transfer_cmd.send([self.oid, msg]) else: params = self.spi.spi_transfer(msg) - resp = bytearray(params['response']) + resp = bytearray(params["response"]) crc = self._calc_crc(bytearray(msg[:2]) + resp[2:-2]) if crc == resp[-1]: return params raise self.printer.command_error("Unable to query tle5012b chip") + def _read_reg(self, reg): - cw = 0x8000 | ((reg & 0x3f) << 4) | 0x01 + cw = 0x8000 | ((reg & 0x3F) << 4) | 0x01 if reg >= 0x05 and reg <= 0x11: cw |= 0x5000 - msg = [cw >> 8, cw & 0xff, 0, 0, 0, 0] + msg = [cw >> 8, cw & 0xFF, 0, 0, 0, 0] params = self._send_spi(msg) - resp = bytearray(params['response']) + resp = bytearray(params["response"]) return (resp[2] << 8) | resp[3] + def _write_reg(self, reg, val): - cw = ((reg & 0x3f) << 4) | 0x01 + cw = ((reg & 0x3F) << 4) | 0x01 if reg >= 0x05 and reg <= 0x11: cw |= 0x5000 - msg = [cw >> 8, cw & 0xff, (val >> 8) & 0xff, val & 0xff, 0, 0] + msg = [cw >> 8, cw & 0xFF, (val >> 8) & 0xFF, val & 0xFF, 0, 0] for retry in range(5): self._send_spi(msg) rval = self._read_reg(reg) if rval == val: return raise self.printer.command_error("Unable to write to tle5012b chip") + def _mask_reg(self, reg, off, on): rval = self._read_reg(reg) self._write_reg(reg, (rval & ~off) | on) + def _query_clock(self): # Read frame counter (and normalize to a 16bit counter) - msg = [0x84, 0x42, 0, 0, 0, 0, 0, 0] # Read with latch, AREV and FSYNC + msg = [0x84, 0x42, 0, 0, 0, 0, 0, 0] # Read with latch, AREV and FSYNC params = self._send_spi(msg) - resp = bytearray(params['response']) - mcu_clock = self.mcu.clock32_to_clock64(params['clock']) - chip_clock = ((resp[2] & 0x7e) << 9) | ((resp[4] & 0x3e) << 4) + resp = bytearray(params["response"]) + mcu_clock = self.mcu.clock32_to_clock64(params["clock"]) + chip_clock = ((resp[2] & 0x7E) << 9) | ((resp[4] & 0x3E) << 4) # Calculate temperature temper = resp[5] - ((resp[4] & 0x01) << 8) self.last_temperature = (temper + 152) / 2.776 return mcu_clock, chip_clock + def update_clock(self): mcu_clock, chip_clock = self._query_clock() mdiff = mcu_clock - self.last_chip_mcu_clock - chip_mclock = self.last_chip_clock + int(mdiff * self.chip_freq + .5) - cdiff = (chip_clock - chip_mclock) & 0xffff + chip_mclock = self.last_chip_clock + int(mdiff * self.chip_freq + 0.5) + cdiff = (chip_clock - chip_mclock) & 0xFFFF cdiff -= (cdiff & 0x8000) << 1 new_chip_clock = chip_mclock + cdiff self.chip_freq = float(new_chip_clock - self.last_chip_clock) / mdiff self.last_chip_clock = new_chip_clock self.last_chip_mcu_clock = mcu_clock + def start(self): # Clear any errors from device - self._read_reg(0x00) # Read STAT + self._read_reg(0x00) # Read STAT # Initialize chip (so different chip variants work the same way) - self._mask_reg(0x06, 0xc003, 0x4000) # MOD1: 42.7us, IIF disable - self._mask_reg(0x08, 0x0007, 0x0001) # MOD2: Predict off, autocal=1 - self._mask_reg(0x0e, 0x0003, 0x0000) # MOD4: IIF mode + self._mask_reg(0x06, 0xC003, 0x4000) # MOD1: 42.7us, IIF disable + self._mask_reg(0x08, 0x0007, 0x0001) # MOD2: Predict off, autocal=1 + self._mask_reg(0x0E, 0x0003, 0x0000) # MOD4: IIF mode # Setup starting clock values mcu_clock, chip_clock = self._query_clock() self.last_chip_clock = chip_clock self.last_chip_mcu_clock = mcu_clock - self.chip_freq = float(1<<5) / self.mcu.seconds_to_clock(1. / 750000.) + self.chip_freq = float(1 << 5) / self.mcu.seconds_to_clock(1.0 / 750000.0) self.update_clock() + cmd_ANGLE_DEBUG_READ_help = "Query low-level angle sensor register" + def cmd_ANGLE_DEBUG_READ(self, gcmd): reg = gcmd.get("REG", minval=0, maxval=0x30, parser=lambda x: int(x, 0)) val = self._read_reg(reg) gcmd.respond_info("ANGLE REG[0x%02x] = 0x%04x" % (reg, val)) + cmd_ANGLE_DEBUG_WRITE_help = "Set low-level angle sensor register" + def cmd_ANGLE_DEBUG_WRITE(self, gcmd): reg = gcmd.get("REG", minval=0, maxval=0x30, parser=lambda x: int(x, 0)) - val = gcmd.get("VAL", minval=0, maxval=0xffff, - parser=lambda x: int(x, 0)) + val = gcmd.get("VAL", minval=0, maxval=0xFFFF, parser=lambda x: int(x, 0)) self._write_reg(reg, val) + class HelperMT6816: SPI_MODE = 3 SPI_SPEED = 10000000 + def __init__(self, config, spi, oid): self.printer = config.get_printer() self.spi = spi @@ -425,28 +489,41 @@ class HelperMT6816: self.last_temperature = None name = config.get_name().split()[-1] gcode = self.printer.lookup_object("gcode") - gcode.register_mux_command("ANGLE_DEBUG_READ", "CHIP", name, - self.cmd_ANGLE_DEBUG_READ, - desc=self.cmd_ANGLE_DEBUG_READ_help) + gcode.register_mux_command( + "ANGLE_DEBUG_READ", + "CHIP", + name, + self.cmd_ANGLE_DEBUG_READ, + desc=self.cmd_ANGLE_DEBUG_READ_help, + ) + def _build_config(self): cmdqueue = self.spi.get_command_queue() self.spi_angle_transfer_cmd = self.mcu.lookup_query_command( "spi_angle_transfer oid=%c data=%*s", "spi_angle_transfer_response oid=%c clock=%u response=%*s", - oid=self.oid, cq=cmdqueue) + oid=self.oid, + cq=cmdqueue, + ) + def _send_spi(self, msg): return self.spi.spi_transfer(msg) + def get_static_delay(self): - return .000001 + return 0.000001 + def _read_reg(self, reg): msg = [reg, 0, 0] params = self._send_spi(msg) - resp = bytearray(params['response']) - val = (resp[1] << 8) | resp[2] + resp = bytearray(params["response"]) + val = (resp[1] << 8) | resp[2] return val + def start(self): pass + cmd_ANGLE_DEBUG_READ_help = "Query low-level angle sensor register" + def cmd_ANGLE_DEBUG_READ(self, gcmd): reg = 0x83 val = self._read_reg(reg) @@ -457,12 +534,14 @@ class HelperMT6816: gcmd.respond_info("No Mag: %i" % (val >> 1 & 0x1)) gcmd.respond_info("Parity: %i == %i" % (parity, val & 0x1)) + class HelperMT6826S: SPI_MODE = 3 SPI_SPEED = 10000000 + def __init__(self, config, spi, oid): self.printer = config.get_printer() - self.stepper_name = config.get('stepper', None) + self.stepper_name = config.get("stepper", None) self.spi = spi self.oid = oid self.mcu = spi.get_mcu() @@ -472,39 +551,55 @@ class HelperMT6826S: self.last_temperature = None name = config.get_name().split()[-1] gcode = self.printer.lookup_object("gcode") - gcode.register_mux_command("ANGLE_DEBUG_READ", "CHIP", name, - self.cmd_ANGLE_DEBUG_READ, - desc=self.cmd_ANGLE_DEBUG_READ_help) - gcode.register_mux_command("ANGLE_CHIP_CALIBRATE", "CHIP", name, - self.cmd_ANGLE_CHIP_CALIBRATE, - desc=self.cmd_ANGLE_CHIP_CALIBRATE_help) + gcode.register_mux_command( + "ANGLE_DEBUG_READ", + "CHIP", + name, + self.cmd_ANGLE_DEBUG_READ, + desc=self.cmd_ANGLE_DEBUG_READ_help, + ) + gcode.register_mux_command( + "ANGLE_CHIP_CALIBRATE", + "CHIP", + name, + self.cmd_ANGLE_CHIP_CALIBRATE, + desc=self.cmd_ANGLE_CHIP_CALIBRATE_help, + ) self.status_map = { 0: "No Calibration", 1: "Running Calibration", 2: "Calibration Failed", - 3: "Calibration Successful" + 3: "Calibration Successful", } + def _build_config(self): cmdqueue = self.spi.get_command_queue() self.spi_angle_transfer_cmd = self.mcu.lookup_query_command( "spi_angle_transfer oid=%c data=%*s", "spi_angle_transfer_response oid=%c clock=%u response=%*s", - oid=self.oid, cq=cmdqueue) + oid=self.oid, + cq=cmdqueue, + ) + def _send_spi(self, msg): params = self.spi.spi_transfer(msg) return params + def get_static_delay(self): - return .00001 + return 0.00001 + def _read_reg(self, reg): reg = 0x3000 | reg - msg = [reg >> 8, reg & 0xff, 0] + msg = [reg >> 8, reg & 0xFF, 0] params = self._send_spi(msg) - resp = bytearray(params['response']) + resp = bytearray(params["response"]) return resp[2] + def _write_reg(self, reg, data): reg = 0x6000 | reg - msg = [reg >> 8, reg & 0xff, data] + msg = [reg >> 8, reg & 0xFF, data] self._send_spi(msg) + def crc8(self, data): polynomial = 0x07 crc = 0x00 @@ -517,39 +612,45 @@ class HelperMT6826S: crc <<= 1 crc &= 0xFF return crc + def _read_angle(self, reg): reg = 0x3000 | reg - msg = [reg >> 8, reg & 0xff, 0, 0, 0, 0] + msg = [reg >> 8, reg & 0xFF, 0, 0, 0, 0] params = self._send_spi(msg) - resp = bytearray(params['response']) + resp = bytearray(params["response"]) angle = (resp[2] << 7) | (resp[3] >> 1) status = resp[4] crc_computed = self.crc8([resp[2], resp[3], resp[4]]) crc = resp[5] return angle, status, crc, crc_computed + def start(self): - val = self._read_reg(0x00d) + val = self._read_reg(0x00D) # Set histeresis to 0.003 degree - self._write_reg(0x00d, (val & 0xf8) | 0x5) + self._write_reg(0x00D, (val & 0xF8) | 0x5) + def get_microsteps(self): - configfile = self.printer.lookup_object('configfile') - sconfig = configfile.get_status(None)['settings'] + configfile = self.printer.lookup_object("configfile") + sconfig = configfile.get_status(None)["settings"] stconfig = sconfig.get(self.stepper_name, {}) - microsteps = stconfig['microsteps'] - full_steps = stconfig['full_steps_per_rotation'] + microsteps = stconfig["microsteps"] + full_steps = stconfig["full_steps_per_rotation"] return microsteps, full_steps + cmd_ANGLE_CHIP_CALIBRATE_help = "Run MT6826s calibration sequence" + def cmd_ANGLE_CHIP_CALIBRATE(self, gcmd): - fmove = self.printer.lookup_object('force_move') + fmove = self.printer.lookup_object("force_move") mcu_stepper = fmove.lookup_stepper(self.stepper_name) if self.stepper_name is None: gcmd.respond_info("stepper not defined") return gcmd.respond_info("MT6826S Run calibration sequence") - gcmd.respond_info("Motor will do 18+ rotations -" + - " ensure pulley is disconnected") - req_freq = self._read_reg(0x00e) >> 4 & 0x7 + gcmd.respond_info( + "Motor will do 18+ rotations -" + " ensure pulley is disconnected" + ) + req_freq = self._read_reg(0x00E) >> 4 & 0x7 # Minimal calibration speed rpm = (3200 >> req_freq) + 1 rps = rpm / 60 @@ -560,7 +661,7 @@ class HelperMT6826S: full_step_dist = step_dist * microsteps rotation_dist = full_steps * full_step_dist move(mcu_stepper, 2 * rotation_dist, rps * rotation_dist) - self._write_reg(0x155, 0x5e) + self._write_reg(0x155, 0x5E) move(mcu_stepper, 20 * rotation_dist, rps * rotation_dist) val = self._read_reg(0x113) code = val >> 6 @@ -574,22 +675,20 @@ class HelperMT6826S: gcmd.respond_info("Calibration failed") if code == 3: gcmd.respond_info("Calibration success, please poweroff sensor") + cmd_ANGLE_DEBUG_READ_help = "Query low-level angle sensor register" + def cmd_ANGLE_DEBUG_READ(self, gcmd): - reg = gcmd.get("REG", minval=0, maxval=0x155, - parser=lambda x: int(x, 0)) + reg = gcmd.get("REG", minval=0, maxval=0x155, parser=lambda x: int(x, 0)) if reg == 0x003: angle, status, crc1, crc2 = self._read_angle(reg) - gcmd.respond_info("ANGLE REG[0x003] = 0x%02x" % - (angle >> 7)) - gcmd.respond_info("ANGLE REG[0x004] = 0x%02x" % - ((angle << 1) & 0xff)) - gcmd.respond_info("Angle %i ~ %.2f" % (angle, - angle * 360 / (1 << 15))) + gcmd.respond_info("ANGLE REG[0x003] = 0x%02x" % (angle >> 7)) + gcmd.respond_info("ANGLE REG[0x004] = 0x%02x" % ((angle << 1) & 0xFF)) + gcmd.respond_info("Angle %i ~ %.2f" % (angle, angle * 360 / (1 << 15))) gcmd.respond_info("Weak Mag: %i" % (status >> 1 & 0x1)) gcmd.respond_info("Under Voltage: %i" % (status >> 2 & 0x1)) gcmd.respond_info("CRC: 0x%02x == 0x%02x" % (crc1, crc2)) - elif reg == 0x00e: + elif reg == 0x00E: val = self._read_reg(reg) gcmd.respond_info("GPIO_DS = %i" % (val >> 7)) gcmd.respond_info("AUTOCAL_FREQ = %i" % (val >> 4 & 0x7)) @@ -607,25 +706,28 @@ SAMPLES_PER_BLOCK = bulk_sensor.MAX_BULK_MSG_SIZE // BYTES_PER_SAMPLE SAMPLE_PERIOD = 0.000400 BATCH_UPDATES = 0.100 + class Angle: def __init__(self, config): self.printer = config.get_printer() - self.sample_period = config.getfloat('sample_period', SAMPLE_PERIOD, - above=0.) + self.sample_period = config.getfloat("sample_period", SAMPLE_PERIOD, above=0.0) self.calibration = AngleCalibration(config) # Measurement conversion self.start_clock = self.time_shift = self.sample_ticks = 0 self.last_sequence = self.last_angle = 0 # Sensor type - sensors = { "a1333": HelperA1333, - "as5047d": HelperAS5047D, - "tle5012b": HelperTLE5012B, - "mt6816": HelperMT6816, - "mt6826s": HelperMT6826S } - sensor_type = config.getchoice('sensor_type', {s: s for s in sensors}) + sensors = { + "a1333": HelperA1333, + "as5047d": HelperAS5047D, + "tle5012b": HelperTLE5012B, + "mt6816": HelperMT6816, + "mt6826s": HelperMT6826S, + } + sensor_type = config.getchoice("sensor_type", {s: s for s in sensors}) sensor_class = sensors[sensor_type] - self.spi = bus.MCU_SPI_from_config(config, sensor_class.SPI_MODE, - default_speed=sensor_class.SPI_SPEED) + self.spi = bus.MCU_SPI_from_config( + config, sensor_class.SPI_MODE, default_speed=sensor_class.SPI_SPEED + ) self.mcu = mcu = self.spi.get_mcu() self.oid = oid = mcu.create_oid() self.sensor_helper = sensor_class(config, self.spi, oid) @@ -633,32 +735,43 @@ class Angle: self.query_spi_angle_cmd = None mcu.add_config_cmd( "config_spi_angle oid=%d spi_oid=%d spi_angle_type=%s" - % (oid, self.spi.get_oid(), sensor_type)) + % (oid, self.spi.get_oid(), sensor_type) + ) mcu.add_config_cmd( - "query_spi_angle oid=%d clock=0 rest_ticks=0 time_shift=0" - % (oid,), on_restart=True) + "query_spi_angle oid=%d clock=0 rest_ticks=0 time_shift=0" % (oid,), + on_restart=True, + ) mcu.register_config_callback(self._build_config) self.bulk_queue = bulk_sensor.BulkDataQueue(mcu, oid=oid) # Process messages in batches self.batch_bulk = bulk_sensor.BatchBulkHelper( - self.printer, self._process_batch, - self._start_measurements, self._finish_measurements, BATCH_UPDATES) + self.printer, + self._process_batch, + self._start_measurements, + self._finish_measurements, + BATCH_UPDATES, + ) self.name = config.get_name().split()[1] - api_resp = {'header': ('time', 'angle')} - self.batch_bulk.add_mux_endpoint("angle/dump_angle", - "sensor", self.name, api_resp) + api_resp = {"header": ("time", "angle")} + self.batch_bulk.add_mux_endpoint( + "angle/dump_angle", "sensor", self.name, api_resp + ) + def _build_config(self): - freq = self.mcu.seconds_to_clock(1.) + freq = self.mcu.seconds_to_clock(1.0) while float(TCODE_ERROR << self.time_shift) / freq < 0.002: self.time_shift += 1 cmdqueue = self.spi.get_command_queue() self.query_spi_angle_cmd = self.mcu.lookup_command( - "query_spi_angle oid=%c clock=%u rest_ticks=%u time_shift=%c", - cq=cmdqueue) + "query_spi_angle oid=%c clock=%u rest_ticks=%u time_shift=%c", cq=cmdqueue + ) + def get_status(self, eventtime=None): - return {'temperature': self.sensor_helper.last_temperature} + return {"temperature": self.sensor_helper.last_temperature} + def add_client(self, client_cb): self.batch_bulk.add_client(client_cb) + # Measurement decoding def _extract_samples(self, raw_samples): # Load variables to optimize inner loop below @@ -668,13 +781,13 @@ class Angle: last_sequence = self.last_sequence last_angle = self.last_angle time_shift = 0 - static_delay = 0. - last_chip_mcu_clock = last_chip_clock = chip_freq = inv_chip_freq = 0. + static_delay = 0.0 + last_chip_mcu_clock = last_chip_clock = chip_freq = inv_chip_freq = 0.0 is_tcode_absolute = self.sensor_helper.is_tcode_absolute if is_tcode_absolute: tparams = self.sensor_helper.get_tcode_params() last_chip_mcu_clock, last_chip_clock, chip_freq = tparams - inv_chip_freq = 1. / chip_freq + inv_chip_freq = 1.0 / chip_freq else: time_shift = self.time_shift static_delay = self.sensor_helper.get_static_delay() @@ -682,32 +795,32 @@ class Angle: count = error_count = 0 samples = [None] * (len(raw_samples) * SAMPLES_PER_BLOCK) for params in raw_samples: - seq_diff = (params['sequence'] - last_sequence) & 0xffff + seq_diff = (params["sequence"] - last_sequence) & 0xFFFF last_sequence += seq_diff samp_count = last_sequence * SAMPLES_PER_BLOCK - msg_mclock = start_clock + samp_count*sample_ticks - d = bytearray(params['data']) + msg_mclock = start_clock + samp_count * sample_ticks + d = bytearray(params["data"]) for i in range(len(d) // BYTES_PER_SAMPLE): - d_ta = d[i*BYTES_PER_SAMPLE:(i+1)*BYTES_PER_SAMPLE] + d_ta = d[i * BYTES_PER_SAMPLE : (i + 1) * BYTES_PER_SAMPLE] tcode = d_ta[0] if tcode == TCODE_ERROR: error_count += 1 continue raw_angle = d_ta[1] | (d_ta[2] << 8) - angle_diff = (raw_angle - last_angle) & 0xffff + angle_diff = (raw_angle - last_angle) & 0xFFFF angle_diff -= (angle_diff & 0x8000) << 1 last_angle += angle_diff - mclock = msg_mclock + i*sample_ticks + mclock = msg_mclock + i * sample_ticks if is_tcode_absolute: # tcode is tle5012b frame counter mdiff = mclock - last_chip_mcu_clock - chip_mclock = last_chip_clock + int(mdiff * chip_freq + .5) - cdiff = ((tcode << 10) - chip_mclock) & 0xffff + chip_mclock = last_chip_clock + int(mdiff * chip_freq + 0.5) + cdiff = ((tcode << 10) - chip_mclock) & 0xFFFF cdiff -= (cdiff & 0x8000) << 1 sclock = mclock + (cdiff - 0x800) * inv_chip_freq else: # tcode is mcu clock offset shifted by time_shift - sclock = mclock + (tcode<<time_shift) + sclock = mclock + (tcode << time_shift) ptime = round(clock_to_print_time(sclock) - static_delay, 6) samples[count] = (ptime, last_angle) count += 1 @@ -715,9 +828,11 @@ class Angle: self.last_angle = last_angle del samples[count:] return samples, error_count + # Start, stop, and process message batches def _is_measuring(self): return self.start_clock != 0 + def _start_measurements(self): logging.info("Starting angle '%s' measurements", self.name) self.sensor_helper.start() @@ -729,14 +844,17 @@ class Angle: self.start_clock = reqclock = self.mcu.print_time_to_clock(print_time) rest_ticks = self.mcu.seconds_to_clock(self.sample_period) self.sample_ticks = rest_ticks - self.query_spi_angle_cmd.send([self.oid, reqclock, rest_ticks, - self.time_shift], reqclock=reqclock) + self.query_spi_angle_cmd.send( + [self.oid, reqclock, rest_ticks, self.time_shift], reqclock=reqclock + ) + def _finish_measurements(self): # Halt bulk reading self.query_spi_angle_cmd.send_wait_ack([self.oid, 0, 0, 0]) self.bulk_queue.clear_queue() self.sensor_helper.last_temperature = None logging.info("Stopped angle '%s' measurements", self.name) + def _process_batch(self, eventtime): if self.sensor_helper.is_tcode_absolute: self.sensor_helper.update_clock() @@ -747,8 +865,8 @@ class Angle: if not samples: return {} offset = self.calibration.apply_calibration(samples) - return {'data': samples, 'errors': error_count, - 'position_offset': offset} + return {"data": samples, "errors": error_count, "position_offset": offset} + def load_config_prefix(config): return Angle(config) |