diff options
Diffstat (limited to 'klippy/extras/load_cell_probe.py')
-rw-r--r-- | klippy/extras/load_cell_probe.py | 394 |
1 files changed, 255 insertions, 139 deletions
diff --git a/klippy/extras/load_cell_probe.py b/klippy/extras/load_cell_probe.py index db2f2a65..de54792f 100644 --- a/klippy/extras/load_cell_probe.py +++ b/klippy/extras/load_cell_probe.py @@ -11,9 +11,9 @@ np = None # delay NumPy import until configuration time # constants for fixed point numbers Q2_INT_BITS = 2 -Q2_FRAC_BITS = (32 - (1 + Q2_INT_BITS)) +Q2_FRAC_BITS = 32 - (1 + Q2_INT_BITS) Q16_INT_BITS = 16 -Q16_FRAC_BITS = (32 - (1 + Q16_INT_BITS)) +Q16_FRAC_BITS = 32 - (1 + Q16_INT_BITS) class TapAnalysis: @@ -25,16 +25,27 @@ class TapAnalysis: # convert to dictionary for JSON encoder def to_dict(self): return { - 'time': self.time.tolist(), 'force': self.force.tolist(), - 'is_valid': True, + "time": self.time.tolist(), + "force": self.force.tolist(), + "is_valid": True, } # Access a parameter from config or GCode command via a consistent interface # stores name and constraints to keep things DRY class ParamHelper: - def __init__(self, config, name, type_name, default=None, minval=None, - maxval=None, above=None, below=None, max_len=None): + def __init__( + self, + config, + name, + type_name, + default=None, + minval=None, + maxval=None, + above=None, + below=None, + max_len=None, + ): self._config_section = config.get_name() self._config_error = config.error self.name = name @@ -62,87 +73,120 @@ class ParamHelper: # support for validating individual options in a list of floats def _validate_float_list(self, gcmd, values, above, below): if gcmd: - description = ("Error on '%s': %s" % ( - gcmd.get_commandline(), self._get_name(gcmd))) + description = "Error on '%s': %s" % ( + gcmd.get_commandline(), + self._get_name(gcmd), + ) error = gcmd.error else: - description = ("Option '%s' in section '%s'" % ( - self._get_name(gcmd), self._config_section)) + description = "Option '%s' in section '%s'" % ( + self._get_name(gcmd), + self._config_section, + ) error = self._config_error if self.max_len is not None and len(values) > self.max_len: - raise error( - "%s has maximum length %s" % (description, self.max_len)) + raise error("%s has maximum length %s" % (description, self.max_len)) for value in values: self._validate_float(description, error, value, above, below) def _get_int(self, config, gcmd, minval, maxval): get = gcmd.get_int if gcmd else config.getint - return get(self._get_name(gcmd), self.value, minval or self.minval, - maxval or self.maxval) + return get( + self._get_name(gcmd), + self.value, + minval or self.minval, + maxval or self.maxval, + ) def _get_float(self, config, gcmd, minval, maxval, above, below): get = gcmd.get_float if gcmd else config.getfloat - return get(self._get_name(gcmd), self.value, minval or self.minval, - maxval or self.maxval, above or self.above, below or self.below) + return get( + self._get_name(gcmd), + self.value, + minval or self.minval, + maxval or self.maxval, + above or self.above, + below or self.below, + ) def _get_float_list(self, config, gcmd, above, below): # this code defaults to the empty list, never return None - default = (self.value or []) + default = self.value or [] if gcmd: # if the parameter isn't part of the command, return the default if not self._get_name(gcmd) in gcmd.get_command_parameters(): return default # parameter exists, always prefer whatever is in the command - value = gcmd.get(self._get_name(gcmd), default='') + value = gcmd.get(self._get_name(gcmd), default="") # Return an empty list for empty value if len(value.strip()) == 0: return [] try: - float_list = [float(p.strip()) for p in value.split(',')] + float_list = [float(p.strip()) for p in value.split(",")] except: - raise gcmd.error("Error on '%s': unable to parse %s" % ( - gcmd.get_commandline(), value)) + raise gcmd.error( + "Error on '%s': unable to parse %s" + % (gcmd.get_commandline(), value) + ) else: - float_list = config.getfloatlist(self._get_name(gcmd), - default=default) + float_list = config.getfloatlist(self._get_name(gcmd), default=default) if float_list: self._validate_float_list(gcmd, float_list, above, below) return float_list - def get(self, gcmd=None, minval=None, maxval=None, above=None, below=None, - config=None): + def get( + self, gcmd=None, minval=None, maxval=None, above=None, below=None, config=None + ): if config is None and gcmd is None: return self.value - if self._type_name == 'int': + if self._type_name == "int": return self._get_int(config, gcmd, minval, maxval) - elif self._type_name == 'float': + elif self._type_name == "float": return self._get_float(config, gcmd, minval, maxval, above, below) else: return self._get_float_list(config, gcmd, above, below) def intParamHelper(config, name, default=None, minval=None, maxval=None): - return ParamHelper(config, name, 'int', default, minval=minval, - maxval=maxval) + return ParamHelper(config, name, "int", default, minval=minval, maxval=maxval) -def floatParamHelper(config, name, default=None, minval=None, maxval=None, - above=None, below=None): - return ParamHelper(config, name, 'float', default, minval=minval, - maxval=maxval, above=above, below=below) +def floatParamHelper( + config, name, default=None, minval=None, maxval=None, above=None, below=None +): + return ParamHelper( + config, + name, + "float", + default, + minval=minval, + maxval=maxval, + above=above, + below=below, + ) -def floatListParamHelper(config, name, default=None, above=None, below=None, - max_len=None): - return ParamHelper(config, name, 'float_list', default, above=above, - below=below, max_len=max_len) +def floatListParamHelper( + config, name, default=None, above=None, below=None, max_len=None +): + return ParamHelper( + config, name, "float_list", default, above=above, below=below, max_len=max_len + ) # container for filter parameters # allows different filter configurations to be compared class ContinuousTareFilter: - def __init__(self, sps=None, drift=None, drift_delay=None, buzz=None, - buzz_delay=None, notches=None, notch_quality=None): + def __init__( + self, + sps=None, + drift=None, + drift_delay=None, + buzz=None, + buzz_delay=None, + notches=None, + notch_quality=None, + ): self.sps = sps self.drift = drift self.drift_delay = drift_delay @@ -155,20 +199,33 @@ class ContinuousTareFilter: if not isinstance(other, ContinuousTareFilter): return False return ( - self.sps == other.sps and self.drift == other.drift and - self.drift_delay == other.drift_delay and self.buzz == - other.buzz and self.buzz_delay == other.buzz_delay and - self.notches == other.notches and self.notch_quality == - other.notch_quality) + self.sps == other.sps + and self.drift == other.drift + and self.drift_delay == other.drift_delay + and self.buzz == other.buzz + and self.buzz_delay == other.buzz_delay + and self.notches == other.notches + and self.notch_quality == other.notch_quality + ) # create a filter design from the parameters def design_filter(self, error_func): - design = sos_filter.DigitalFilter(self.sps, error_func, self.drift, - self.drift_delay, self.buzz, self.buzz_delay, self.notches, - self.notch_quality) + design = sos_filter.DigitalFilter( + self.sps, + error_func, + self.drift, + self.drift_delay, + self.buzz, + self.buzz_delay, + self.notches, + self.notch_quality, + ) fixed_filter = sos_filter.FixedPointSosFilter( - design.get_filter_sections(), design.get_initial_state(), - Q2_INT_BITS, Q16_INT_BITS) + design.get_filter_sections(), + design.get_initial_state(), + Q2_INT_BITS, + Q16_INT_BITS, + ) return fixed_filter @@ -177,31 +234,47 @@ class ContinuousTareFilterHelper: def __init__(self, config, sensor, cmd_queue): self._sensor = sensor self._sps = self._sensor.get_samples_per_second() - max_filter_frequency = math.floor(self._sps / 2.) + max_filter_frequency = math.floor(self._sps / 2.0) # setup filter parameters - self._drift_param = floatParamHelper(config, - "drift_filter_cutoff_frequency", default=None, minval=0.1, - maxval=20.0) - self._drift_delay_param = intParamHelper(config, "drift_filter_delay", - default=2, minval=1, maxval=2) - self._buzz_param = floatParamHelper(config, - "buzz_filter_cutoff_frequency", default=None, + self._drift_param = floatParamHelper( + config, + "drift_filter_cutoff_frequency", + default=None, + minval=0.1, + maxval=20.0, + ) + self._drift_delay_param = intParamHelper( + config, "drift_filter_delay", default=2, minval=1, maxval=2 + ) + self._buzz_param = floatParamHelper( + config, + "buzz_filter_cutoff_frequency", + default=None, above=min(80.0, max_filter_frequency - 1.0), - below=max_filter_frequency) - self._buzz_delay_param = intParamHelper(config, "buzz_filter_delay", - default=2, minval=1, maxval=2) - self._notches_param = floatListParamHelper(config, - "notch_filter_frequencies", default=[], above=0., - below=max_filter_frequency, max_len=2) - self._notch_quality_param = floatParamHelper(config, - "notch_filter_quality", default=2.0, minval=0.5, maxval=6.0) + below=max_filter_frequency, + ) + self._buzz_delay_param = intParamHelper( + config, "buzz_filter_delay", default=2, minval=1, maxval=2 + ) + self._notches_param = floatListParamHelper( + config, + "notch_filter_frequencies", + default=[], + above=0.0, + below=max_filter_frequency, + max_len=2, + ) + self._notch_quality_param = floatParamHelper( + config, "notch_filter_quality", default=2.0, minval=0.5, maxval=6.0 + ) # filter design specified in the config file, used for defaults self._config_design = ContinuousTareFilter() # empty filter self._config_design = self._build_filter() # filter design currently inside the MCU self._active_design = self._config_design self._sos_filter = self._create_filter( - self._active_design.design_filter(config.error), cmd_queue) + self._active_design.design_filter(config.error), cmd_queue + ) def _build_filter(self, gcmd=None): drift = self._drift_param.get(gcmd) @@ -211,12 +284,12 @@ class ContinuousTareFilterHelper: # notches must be between drift and buzz: notches = self._notches_param.get(gcmd, above=drift, below=buzz) notch_quality = self._notch_quality_param.get(gcmd) - return ContinuousTareFilter(self._sps, drift, drift_delay, buzz, - buzz_delay, notches, notch_quality) + return ContinuousTareFilter( + self._sps, drift, drift_delay, buzz, buzz_delay, notches, notch_quality + ) def _create_filter(self, fixed_filter, cmd_queue): - return sos_filter.SosFilter(self._sensor.get_mcu(), cmd_queue, - fixed_filter, 4) + return sos_filter.SosFilter(self._sensor.get_mcu(), cmd_queue, fixed_filter, 4) def update_from_command(self, gcmd, cq=None): gcmd_filter = self._build_filter(gcmd) @@ -224,8 +297,7 @@ class ContinuousTareFilterHelper: if self._active_design == gcmd_filter: return # update MCU filter from GCode command - self._sos_filter.change_filter( - self._active_design.design_filter(gcmd.error)) + self._sos_filter.change_filter(self._active_design.design_filter(gcmd.error)) def get_sos_filter(self): return self._sos_filter @@ -235,9 +307,10 @@ class ContinuousTareFilterHelper: def check_sensor_errors(results, printer): samples, errors = results if errors: - raise printer.command_error("Load cell sensor reported errors while" - " probing: %i errors, %i overflows" % ( - errors[0], errors[1])) + raise printer.command_error( + "Load cell sensor reported errors while" + " probing: %i errors, %i overflows" % (errors[0], errors[1]) + ) return samples @@ -246,15 +319,18 @@ class LoadCellProbeConfigHelper: self._printer = config.get_printer() self._load_cell = load_cell_inst self._sensor = load_cell_inst.get_sensor() - self._rest_time = 1. / float(self._sensor.get_samples_per_second()) + self._rest_time = 1.0 / float(self._sensor.get_samples_per_second()) # Collect 4 x 60hz power cycles of data to average across power noise - self._tare_time_param = floatParamHelper(config, 'tare_time', - default=4. / 60., minval=0.01, maxval=1.0) + self._tare_time_param = floatParamHelper( + config, "tare_time", default=4.0 / 60.0, minval=0.01, maxval=1.0 + ) # triggering options - self._trigger_force_param = intParamHelper(config, 'trigger_force', - default=75, minval=10, maxval=250) - self._force_safety_limit_param = intParamHelper(config, - 'force_safety_limit', minval=100, maxval=5000, default=2000) + self._trigger_force_param = intParamHelper( + config, "trigger_force", default=75, minval=10, maxval=250 + ) + self._force_safety_limit_param = intParamHelper( + config, "force_safety_limit", minval=100, maxval=5000, default=2000 + ) def get_tare_samples(self, gcmd=None): tare_time = self._tare_time_param.get(gcmd) @@ -292,7 +368,7 @@ class LoadCellProbeConfigHelper: # few grams which seems very unlikely. Treat this as an error: if counts_per_gram >= 2**Q2_FRAC_BITS: raise OverflowError("counts_per_gram value is too large to filter") - return sos_filter.to_fixed_32((1. / counts_per_gram), Q2_INT_BITS) + return sos_filter.to_fixed_32((1.0 / counts_per_gram), Q2_INT_BITS) # McuLoadCellProbe is the interface to `load_cell_probe` on the MCU @@ -303,8 +379,9 @@ class McuLoadCellProbe: ERROR_OVERFLOW = mcu.MCU_trsync.REASON_COMMS_TIMEOUT + 2 ERROR_WATCHDOG = mcu.MCU_trsync.REASON_COMMS_TIMEOUT + 3 - def __init__(self, config, load_cell_inst, sos_filter_inst, config_helper, - trigger_dispatch): + def __init__( + self, config, load_cell_inst, sos_filter_inst, config_helper, trigger_dispatch + ): self._printer = config.get_printer() self._load_cell = load_cell_inst self._sos_filter = sos_filter_inst @@ -325,23 +402,29 @@ class McuLoadCellProbe: def _config_commands(self): self._sos_filter.create_filter() self._mcu.add_config_cmd( - "config_load_cell_probe oid=%d sos_filter_oid=%d" % ( - self._oid, self._sos_filter.get_oid())) + "config_load_cell_probe oid=%d sos_filter_oid=%d" + % (self._oid, self._sos_filter.get_oid()) + ) def _build_config(self): # Lookup commands self._query_cmd = self._mcu.lookup_query_command( "load_cell_probe_query_state oid=%c", - "load_cell_probe_state oid=%c is_homing_trigger=%c " - "trigger_ticks=%u", oid=self._oid, cq=self._cmd_queue) + "load_cell_probe_state oid=%c is_homing_trigger=%c " "trigger_ticks=%u", + oid=self._oid, + cq=self._cmd_queue, + ) self._set_range_cmd = self._mcu.lookup_command( "load_cell_probe_set_range" " oid=%c safety_counts_min=%i safety_counts_max=%i tare_counts=%i" - " trigger_grams=%u grams_per_count=%i", cq=self._cmd_queue) + " trigger_grams=%u grams_per_count=%i", + cq=self._cmd_queue, + ) self._home_cmd = self._mcu.lookup_command( "load_cell_probe_home oid=%c trsync_oid=%c trigger_reason=%c" " error_reason=%c clock=%u rest_ticks=%u timeout=%u", - cq=self._cmd_queue) + cq=self._cmd_queue, + ) # the sensor data stream is connected on the MCU at the ready event def _on_connect(self): @@ -364,9 +447,14 @@ class McuLoadCellProbe: self._load_cell.tare(tare_counts) # update internal tare value safety_min, safety_max = self._config_helper.get_safety_range(gcmd) - args = [self._oid, safety_min, safety_max, int(tare_counts), + args = [ + self._oid, + safety_min, + safety_max, + int(tare_counts), self._config_helper.get_trigger_force_grams(gcmd), - self._config_helper.get_grams_per_count()] + self._config_helper.get_grams_per_count(), + ] self._set_range_cmd.send(args) self._sos_filter.reset_filter() @@ -374,14 +462,23 @@ class McuLoadCellProbe: clock = self._mcu.print_time_to_clock(print_time) rest_time = self._config_helper.get_rest_time() rest_ticks = self._mcu.seconds_to_clock(rest_time) - self._home_cmd.send([self._oid, self._dispatch.get_oid(), - mcu.MCU_trsync.REASON_ENDSTOP_HIT, self.ERROR_SAFETY_RANGE, clock, - rest_ticks, self.WATCHDOG_MAX], reqclock=clock) + self._home_cmd.send( + [ + self._oid, + self._dispatch.get_oid(), + mcu.MCU_trsync.REASON_ENDSTOP_HIT, + self.ERROR_SAFETY_RANGE, + clock, + rest_ticks, + self.WATCHDOG_MAX, + ], + reqclock=clock, + ) def clear_home(self): params = self._query_cmd.send([self._oid]) # The time of the first sample that triggered is in "trigger_ticks" - trigger_ticks = self._mcu.clock32_to_clock64(params['trigger_ticks']) + trigger_ticks = self._mcu.clock32_to_clock64(params["trigger_ticks"]) # clear trsync from load_cell_endstop self._home_cmd.send([self._oid, 0, 0, 0, 0, 0, 0, 0]) return self._mcu.clock_to_print_time(trigger_ticks) @@ -390,18 +487,23 @@ class McuLoadCellProbe: # Execute probing moves using the McuLoadCellProbe class LoadCellProbingMove: ERROR_MAP = { - mcu.MCU_trsync.REASON_COMMS_TIMEOUT: "Communication timeout during " - "homing", + mcu.MCU_trsync.REASON_COMMS_TIMEOUT: "Communication timeout during " "homing", McuLoadCellProbe.ERROR_SAFETY_RANGE: "Load Cell Probe Error: load " - "exceeds safety limit", + "exceeds safety limit", McuLoadCellProbe.ERROR_OVERFLOW: "Load Cell Probe Error: fixed point " - "math overflow", + "math overflow", McuLoadCellProbe.ERROR_WATCHDOG: "Load Cell Probe Error: timed out " - "waiting for sensor data" + "waiting for sensor data", } - def __init__(self, config, mcu_load_cell_probe, param_helper, - continuous_tare_filter_helper, config_helper): + def __init__( + self, + config, + mcu_load_cell_probe, + param_helper, + continuous_tare_filter_helper, + config_helper, + ): self._printer = config.get_printer() self._mcu_load_cell_probe = mcu_load_cell_probe self._param_helper = param_helper @@ -417,7 +519,7 @@ class LoadCellProbingMove: self._last_trigger_time = 0 def _start_collector(self): - toolhead = self._printer.lookup_object('toolhead') + toolhead = self._printer.lookup_object("toolhead") # homing uses the toolhead last move time which gets special handling # to significantly buffer print_time if the move queue has drained print_time = toolhead.get_last_move_time() @@ -444,8 +546,9 @@ class LoadCellProbingMove: self._mcu_load_cell_probe.home_start(print_time) return trigger_completion - def home_start(self, print_time, sample_time, sample_count, rest_time, - triggered=True): + def home_start( + self, print_time, sample_time, sample_count, rest_time, triggered=True + ): return self._home_start(print_time) def home_wait(self, home_end_time): @@ -460,7 +563,7 @@ class LoadCellProbingMove: error = self.ERROR_MAP[res] raise self._printer.command_error(error) if res != mcu.MCU_trsync.REASON_ENDSTOP_HIT: - return 0. + return 0.0 return self._last_trigger_time def get_steppers(self): @@ -474,11 +577,11 @@ class LoadCellProbingMove: # tare the sensor just before probing self._pause_and_tare(gcmd) # get params for the homing move - toolhead = self._printer.lookup_object('toolhead') + toolhead = self._printer.lookup_object("toolhead") pos = toolhead.get_position() pos[2] = self._z_min_position - speed = self._param_helper.get_probe_params(gcmd)['probe_speed'] - phoming = self._printer.lookup_object('homing') + speed = self._param_helper.get_probe_params(gcmd)["probe_speed"] + phoming = self._printer.lookup_object("homing") # start collector after tare samples are consumed collector = self._start_collector() # do homing move @@ -487,15 +590,15 @@ class LoadCellProbingMove: # Wait for the MCU to trigger with no movement def probing_test(self, gcmd, timeout): self._pause_and_tare(gcmd) - toolhead = self._printer.lookup_object('toolhead') + toolhead = self._printer.lookup_object("toolhead") print_time = toolhead.get_last_move_time() self._home_start(print_time) return self.home_wait(print_time + timeout) def get_status(self, eventtime): return { - 'tare_counts': self._tare_counts, - 'last_trigger_time': self._last_trigger_time, + "tare_counts": self._tare_counts, + "last_trigger_time": self._last_trigger_time, } @@ -512,15 +615,16 @@ class TappingMove: self._clients = load_cell.ApiClientHelper(config.get_printer()) name = config.get_name() header = {"header": ["probe_tap_event"]} - self._clients.add_mux_endpoint("load_cell_probe/dump_taps", - "load_cell_probe", name, header) + self._clients.add_mux_endpoint( + "load_cell_probe/dump_taps", "load_cell_probe", name, header + ) # perform a probing move and a pullback move def run_tap(self, gcmd): # do the descending move epos, collector = self._load_cell_probing_move.probing_move(gcmd) # collect samples from the tap - toolhead = self._printer.lookup_object('toolhead') + toolhead = self._printer.lookup_object("toolhead") toolhead.flush_step_generation() move_end = toolhead.get_last_move_time() results = collector.collect_until(move_end) @@ -528,15 +632,15 @@ class TappingMove: # Analyze the tap data ppa = TapAnalysis(samples) # broadcast tap event data: - self._clients.send({'tap': ppa.to_dict()}) + self._clients.send({"tap": ppa.to_dict()}) self._is_last_result_valid = True self._last_result = epos[2] return epos, self._is_last_result_valid def get_status(self, eventtime): return { - 'last_z_result': self._last_result, - 'is_last_tap_valid': self._is_last_result_valid + "last_z_result": self._last_result, + "is_last_tap_valid": self._is_last_result_valid, } @@ -574,20 +678,23 @@ class LoadCellProbeCommands: def _register_commands(self): # Register commands - gcode = self._printer.lookup_object('gcode') - gcode.register_command("LOAD_CELL_TEST_TAP", - self.cmd_LOAD_CELL_TEST_TAP, desc=self.cmd_LOAD_CELL_TEST_TAP_help) + gcode = self._printer.lookup_object("gcode") + gcode.register_command( + "LOAD_CELL_TEST_TAP", + self.cmd_LOAD_CELL_TEST_TAP, + desc=self.cmd_LOAD_CELL_TEST_TAP_help, + ) cmd_LOAD_CELL_TEST_TAP_help = "Tap the load cell probe to verify operation" def cmd_LOAD_CELL_TEST_TAP(self, gcmd): taps = gcmd.get_int("TAPS", 3, minval=1, maxval=10) - timeout = gcmd.get_float("TIMEOUT", 30., minval=1., maxval=120.) + timeout = gcmd.get_float("TIMEOUT", 30.0, minval=1.0, maxval=120.0) gcmd.respond_info("Tap the load cell %s times:" % (taps,)) reactor = self._printer.get_reactor() for i in range(0, taps): result = self._load_cell_probing_move.probing_test(gcmd, timeout) - if result == 0.: + if result == 0.0: # notify of error, likely due to timeout raise gcmd.error("Test timeout out") gcmd.respond_info("Tap Detected!") @@ -609,34 +716,43 @@ class LoadCellPrinterProbe: sensors = {} sensors.update(hx71x.HX71X_SENSOR_TYPES) sensors.update(ads1220.ADS1220_SENSOR_TYPE) - sensor_class = config.getchoice('sensor_type', sensors) + sensor_class = config.getchoice("sensor_type", sensors) sensor = sensor_class(config) self._load_cell = load_cell.LoadCell(config, sensor) # Read all user configuration and build modules config_helper = LoadCellProbeConfigHelper(config, self._load_cell) self._mcu = self._load_cell.get_sensor().get_mcu() trigger_dispatch = mcu.TriggerDispatch(self._mcu) - continuous_tare_filter_helper = ContinuousTareFilterHelper(config, - sensor, trigger_dispatch.get_command_queue()) + continuous_tare_filter_helper = ContinuousTareFilterHelper( + config, sensor, trigger_dispatch.get_command_queue() + ) # Probe Interface self._param_helper = probe.ProbeParameterHelper(config) self._cmd_helper = probe.ProbeCommandHelper(config, self) self._probe_offsets = probe.ProbeOffsetsHelper(config) - self._mcu_load_cell_probe = McuLoadCellProbe(config, self._load_cell, - continuous_tare_filter_helper.get_sos_filter(), config_helper, - trigger_dispatch) - load_cell_probing_move = LoadCellProbingMove(config, - self._mcu_load_cell_probe, self._param_helper, - continuous_tare_filter_helper, config_helper) - self._tapping_move = TappingMove(config, load_cell_probing_move, - config_helper) + self._mcu_load_cell_probe = McuLoadCellProbe( + config, + self._load_cell, + continuous_tare_filter_helper.get_sos_filter(), + config_helper, + trigger_dispatch, + ) + load_cell_probing_move = LoadCellProbingMove( + config, + self._mcu_load_cell_probe, + self._param_helper, + continuous_tare_filter_helper, + config_helper, + ) + self._tapping_move = TappingMove(config, load_cell_probing_move, config_helper) tap_session = TapSession(config, self._tapping_move, self._param_helper) - self._probe_session = probe.ProbeSessionHelper(config, - self._param_helper, tap_session.start_probe_session) + self._probe_session = probe.ProbeSessionHelper( + config, self._param_helper, tap_session.start_probe_session + ) # printer integration LoadCellProbeCommands(config, load_cell_probing_move) probe.ProbeVirtualEndstopDeprecation(config) - self._printer.add_object('probe', self) + self._printer.add_object("probe", self) def get_probe_params(self, gcmd=None): return self._param_helper.get_probe_params(gcmd) |