aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/extras/load_cell_probe.py
diff options
context:
space:
mode:
Diffstat (limited to 'klippy/extras/load_cell_probe.py')
-rw-r--r--klippy/extras/load_cell_probe.py394
1 files changed, 255 insertions, 139 deletions
diff --git a/klippy/extras/load_cell_probe.py b/klippy/extras/load_cell_probe.py
index db2f2a65..de54792f 100644
--- a/klippy/extras/load_cell_probe.py
+++ b/klippy/extras/load_cell_probe.py
@@ -11,9 +11,9 @@ np = None # delay NumPy import until configuration time
# constants for fixed point numbers
Q2_INT_BITS = 2
-Q2_FRAC_BITS = (32 - (1 + Q2_INT_BITS))
+Q2_FRAC_BITS = 32 - (1 + Q2_INT_BITS)
Q16_INT_BITS = 16
-Q16_FRAC_BITS = (32 - (1 + Q16_INT_BITS))
+Q16_FRAC_BITS = 32 - (1 + Q16_INT_BITS)
class TapAnalysis:
@@ -25,16 +25,27 @@ class TapAnalysis:
# convert to dictionary for JSON encoder
def to_dict(self):
return {
- 'time': self.time.tolist(), 'force': self.force.tolist(),
- 'is_valid': True,
+ "time": self.time.tolist(),
+ "force": self.force.tolist(),
+ "is_valid": True,
}
# Access a parameter from config or GCode command via a consistent interface
# stores name and constraints to keep things DRY
class ParamHelper:
- def __init__(self, config, name, type_name, default=None, minval=None,
- maxval=None, above=None, below=None, max_len=None):
+ def __init__(
+ self,
+ config,
+ name,
+ type_name,
+ default=None,
+ minval=None,
+ maxval=None,
+ above=None,
+ below=None,
+ max_len=None,
+ ):
self._config_section = config.get_name()
self._config_error = config.error
self.name = name
@@ -62,87 +73,120 @@ class ParamHelper:
# support for validating individual options in a list of floats
def _validate_float_list(self, gcmd, values, above, below):
if gcmd:
- description = ("Error on '%s': %s" % (
- gcmd.get_commandline(), self._get_name(gcmd)))
+ description = "Error on '%s': %s" % (
+ gcmd.get_commandline(),
+ self._get_name(gcmd),
+ )
error = gcmd.error
else:
- description = ("Option '%s' in section '%s'" % (
- self._get_name(gcmd), self._config_section))
+ description = "Option '%s' in section '%s'" % (
+ self._get_name(gcmd),
+ self._config_section,
+ )
error = self._config_error
if self.max_len is not None and len(values) > self.max_len:
- raise error(
- "%s has maximum length %s" % (description, self.max_len))
+ raise error("%s has maximum length %s" % (description, self.max_len))
for value in values:
self._validate_float(description, error, value, above, below)
def _get_int(self, config, gcmd, minval, maxval):
get = gcmd.get_int if gcmd else config.getint
- return get(self._get_name(gcmd), self.value, minval or self.minval,
- maxval or self.maxval)
+ return get(
+ self._get_name(gcmd),
+ self.value,
+ minval or self.minval,
+ maxval or self.maxval,
+ )
def _get_float(self, config, gcmd, minval, maxval, above, below):
get = gcmd.get_float if gcmd else config.getfloat
- return get(self._get_name(gcmd), self.value, minval or self.minval,
- maxval or self.maxval, above or self.above, below or self.below)
+ return get(
+ self._get_name(gcmd),
+ self.value,
+ minval or self.minval,
+ maxval or self.maxval,
+ above or self.above,
+ below or self.below,
+ )
def _get_float_list(self, config, gcmd, above, below):
# this code defaults to the empty list, never return None
- default = (self.value or [])
+ default = self.value or []
if gcmd:
# if the parameter isn't part of the command, return the default
if not self._get_name(gcmd) in gcmd.get_command_parameters():
return default
# parameter exists, always prefer whatever is in the command
- value = gcmd.get(self._get_name(gcmd), default='')
+ value = gcmd.get(self._get_name(gcmd), default="")
# Return an empty list for empty value
if len(value.strip()) == 0:
return []
try:
- float_list = [float(p.strip()) for p in value.split(',')]
+ float_list = [float(p.strip()) for p in value.split(",")]
except:
- raise gcmd.error("Error on '%s': unable to parse %s" % (
- gcmd.get_commandline(), value))
+ raise gcmd.error(
+ "Error on '%s': unable to parse %s"
+ % (gcmd.get_commandline(), value)
+ )
else:
- float_list = config.getfloatlist(self._get_name(gcmd),
- default=default)
+ float_list = config.getfloatlist(self._get_name(gcmd), default=default)
if float_list:
self._validate_float_list(gcmd, float_list, above, below)
return float_list
- def get(self, gcmd=None, minval=None, maxval=None, above=None, below=None,
- config=None):
+ def get(
+ self, gcmd=None, minval=None, maxval=None, above=None, below=None, config=None
+ ):
if config is None and gcmd is None:
return self.value
- if self._type_name == 'int':
+ if self._type_name == "int":
return self._get_int(config, gcmd, minval, maxval)
- elif self._type_name == 'float':
+ elif self._type_name == "float":
return self._get_float(config, gcmd, minval, maxval, above, below)
else:
return self._get_float_list(config, gcmd, above, below)
def intParamHelper(config, name, default=None, minval=None, maxval=None):
- return ParamHelper(config, name, 'int', default, minval=minval,
- maxval=maxval)
+ return ParamHelper(config, name, "int", default, minval=minval, maxval=maxval)
-def floatParamHelper(config, name, default=None, minval=None, maxval=None,
- above=None, below=None):
- return ParamHelper(config, name, 'float', default, minval=minval,
- maxval=maxval, above=above, below=below)
+def floatParamHelper(
+ config, name, default=None, minval=None, maxval=None, above=None, below=None
+):
+ return ParamHelper(
+ config,
+ name,
+ "float",
+ default,
+ minval=minval,
+ maxval=maxval,
+ above=above,
+ below=below,
+ )
-def floatListParamHelper(config, name, default=None, above=None, below=None,
- max_len=None):
- return ParamHelper(config, name, 'float_list', default, above=above,
- below=below, max_len=max_len)
+def floatListParamHelper(
+ config, name, default=None, above=None, below=None, max_len=None
+):
+ return ParamHelper(
+ config, name, "float_list", default, above=above, below=below, max_len=max_len
+ )
# container for filter parameters
# allows different filter configurations to be compared
class ContinuousTareFilter:
- def __init__(self, sps=None, drift=None, drift_delay=None, buzz=None,
- buzz_delay=None, notches=None, notch_quality=None):
+ def __init__(
+ self,
+ sps=None,
+ drift=None,
+ drift_delay=None,
+ buzz=None,
+ buzz_delay=None,
+ notches=None,
+ notch_quality=None,
+ ):
self.sps = sps
self.drift = drift
self.drift_delay = drift_delay
@@ -155,20 +199,33 @@ class ContinuousTareFilter:
if not isinstance(other, ContinuousTareFilter):
return False
return (
- self.sps == other.sps and self.drift == other.drift and
- self.drift_delay == other.drift_delay and self.buzz ==
- other.buzz and self.buzz_delay == other.buzz_delay and
- self.notches == other.notches and self.notch_quality ==
- other.notch_quality)
+ self.sps == other.sps
+ and self.drift == other.drift
+ and self.drift_delay == other.drift_delay
+ and self.buzz == other.buzz
+ and self.buzz_delay == other.buzz_delay
+ and self.notches == other.notches
+ and self.notch_quality == other.notch_quality
+ )
# create a filter design from the parameters
def design_filter(self, error_func):
- design = sos_filter.DigitalFilter(self.sps, error_func, self.drift,
- self.drift_delay, self.buzz, self.buzz_delay, self.notches,
- self.notch_quality)
+ design = sos_filter.DigitalFilter(
+ self.sps,
+ error_func,
+ self.drift,
+ self.drift_delay,
+ self.buzz,
+ self.buzz_delay,
+ self.notches,
+ self.notch_quality,
+ )
fixed_filter = sos_filter.FixedPointSosFilter(
- design.get_filter_sections(), design.get_initial_state(),
- Q2_INT_BITS, Q16_INT_BITS)
+ design.get_filter_sections(),
+ design.get_initial_state(),
+ Q2_INT_BITS,
+ Q16_INT_BITS,
+ )
return fixed_filter
@@ -177,31 +234,47 @@ class ContinuousTareFilterHelper:
def __init__(self, config, sensor, cmd_queue):
self._sensor = sensor
self._sps = self._sensor.get_samples_per_second()
- max_filter_frequency = math.floor(self._sps / 2.)
+ max_filter_frequency = math.floor(self._sps / 2.0)
# setup filter parameters
- self._drift_param = floatParamHelper(config,
- "drift_filter_cutoff_frequency", default=None, minval=0.1,
- maxval=20.0)
- self._drift_delay_param = intParamHelper(config, "drift_filter_delay",
- default=2, minval=1, maxval=2)
- self._buzz_param = floatParamHelper(config,
- "buzz_filter_cutoff_frequency", default=None,
+ self._drift_param = floatParamHelper(
+ config,
+ "drift_filter_cutoff_frequency",
+ default=None,
+ minval=0.1,
+ maxval=20.0,
+ )
+ self._drift_delay_param = intParamHelper(
+ config, "drift_filter_delay", default=2, minval=1, maxval=2
+ )
+ self._buzz_param = floatParamHelper(
+ config,
+ "buzz_filter_cutoff_frequency",
+ default=None,
above=min(80.0, max_filter_frequency - 1.0),
- below=max_filter_frequency)
- self._buzz_delay_param = intParamHelper(config, "buzz_filter_delay",
- default=2, minval=1, maxval=2)
- self._notches_param = floatListParamHelper(config,
- "notch_filter_frequencies", default=[], above=0.,
- below=max_filter_frequency, max_len=2)
- self._notch_quality_param = floatParamHelper(config,
- "notch_filter_quality", default=2.0, minval=0.5, maxval=6.0)
+ below=max_filter_frequency,
+ )
+ self._buzz_delay_param = intParamHelper(
+ config, "buzz_filter_delay", default=2, minval=1, maxval=2
+ )
+ self._notches_param = floatListParamHelper(
+ config,
+ "notch_filter_frequencies",
+ default=[],
+ above=0.0,
+ below=max_filter_frequency,
+ max_len=2,
+ )
+ self._notch_quality_param = floatParamHelper(
+ config, "notch_filter_quality", default=2.0, minval=0.5, maxval=6.0
+ )
# filter design specified in the config file, used for defaults
self._config_design = ContinuousTareFilter() # empty filter
self._config_design = self._build_filter()
# filter design currently inside the MCU
self._active_design = self._config_design
self._sos_filter = self._create_filter(
- self._active_design.design_filter(config.error), cmd_queue)
+ self._active_design.design_filter(config.error), cmd_queue
+ )
def _build_filter(self, gcmd=None):
drift = self._drift_param.get(gcmd)
@@ -211,12 +284,12 @@ class ContinuousTareFilterHelper:
# notches must be between drift and buzz:
notches = self._notches_param.get(gcmd, above=drift, below=buzz)
notch_quality = self._notch_quality_param.get(gcmd)
- return ContinuousTareFilter(self._sps, drift, drift_delay, buzz,
- buzz_delay, notches, notch_quality)
+ return ContinuousTareFilter(
+ self._sps, drift, drift_delay, buzz, buzz_delay, notches, notch_quality
+ )
def _create_filter(self, fixed_filter, cmd_queue):
- return sos_filter.SosFilter(self._sensor.get_mcu(), cmd_queue,
- fixed_filter, 4)
+ return sos_filter.SosFilter(self._sensor.get_mcu(), cmd_queue, fixed_filter, 4)
def update_from_command(self, gcmd, cq=None):
gcmd_filter = self._build_filter(gcmd)
@@ -224,8 +297,7 @@ class ContinuousTareFilterHelper:
if self._active_design == gcmd_filter:
return
# update MCU filter from GCode command
- self._sos_filter.change_filter(
- self._active_design.design_filter(gcmd.error))
+ self._sos_filter.change_filter(self._active_design.design_filter(gcmd.error))
def get_sos_filter(self):
return self._sos_filter
@@ -235,9 +307,10 @@ class ContinuousTareFilterHelper:
def check_sensor_errors(results, printer):
samples, errors = results
if errors:
- raise printer.command_error("Load cell sensor reported errors while"
- " probing: %i errors, %i overflows" % (
- errors[0], errors[1]))
+ raise printer.command_error(
+ "Load cell sensor reported errors while"
+ " probing: %i errors, %i overflows" % (errors[0], errors[1])
+ )
return samples
@@ -246,15 +319,18 @@ class LoadCellProbeConfigHelper:
self._printer = config.get_printer()
self._load_cell = load_cell_inst
self._sensor = load_cell_inst.get_sensor()
- self._rest_time = 1. / float(self._sensor.get_samples_per_second())
+ self._rest_time = 1.0 / float(self._sensor.get_samples_per_second())
# Collect 4 x 60hz power cycles of data to average across power noise
- self._tare_time_param = floatParamHelper(config, 'tare_time',
- default=4. / 60., minval=0.01, maxval=1.0)
+ self._tare_time_param = floatParamHelper(
+ config, "tare_time", default=4.0 / 60.0, minval=0.01, maxval=1.0
+ )
# triggering options
- self._trigger_force_param = intParamHelper(config, 'trigger_force',
- default=75, minval=10, maxval=250)
- self._force_safety_limit_param = intParamHelper(config,
- 'force_safety_limit', minval=100, maxval=5000, default=2000)
+ self._trigger_force_param = intParamHelper(
+ config, "trigger_force", default=75, minval=10, maxval=250
+ )
+ self._force_safety_limit_param = intParamHelper(
+ config, "force_safety_limit", minval=100, maxval=5000, default=2000
+ )
def get_tare_samples(self, gcmd=None):
tare_time = self._tare_time_param.get(gcmd)
@@ -292,7 +368,7 @@ class LoadCellProbeConfigHelper:
# few grams which seems very unlikely. Treat this as an error:
if counts_per_gram >= 2**Q2_FRAC_BITS:
raise OverflowError("counts_per_gram value is too large to filter")
- return sos_filter.to_fixed_32((1. / counts_per_gram), Q2_INT_BITS)
+ return sos_filter.to_fixed_32((1.0 / counts_per_gram), Q2_INT_BITS)
# McuLoadCellProbe is the interface to `load_cell_probe` on the MCU
@@ -303,8 +379,9 @@ class McuLoadCellProbe:
ERROR_OVERFLOW = mcu.MCU_trsync.REASON_COMMS_TIMEOUT + 2
ERROR_WATCHDOG = mcu.MCU_trsync.REASON_COMMS_TIMEOUT + 3
- def __init__(self, config, load_cell_inst, sos_filter_inst, config_helper,
- trigger_dispatch):
+ def __init__(
+ self, config, load_cell_inst, sos_filter_inst, config_helper, trigger_dispatch
+ ):
self._printer = config.get_printer()
self._load_cell = load_cell_inst
self._sos_filter = sos_filter_inst
@@ -325,23 +402,29 @@ class McuLoadCellProbe:
def _config_commands(self):
self._sos_filter.create_filter()
self._mcu.add_config_cmd(
- "config_load_cell_probe oid=%d sos_filter_oid=%d" % (
- self._oid, self._sos_filter.get_oid()))
+ "config_load_cell_probe oid=%d sos_filter_oid=%d"
+ % (self._oid, self._sos_filter.get_oid())
+ )
def _build_config(self):
# Lookup commands
self._query_cmd = self._mcu.lookup_query_command(
"load_cell_probe_query_state oid=%c",
- "load_cell_probe_state oid=%c is_homing_trigger=%c "
- "trigger_ticks=%u", oid=self._oid, cq=self._cmd_queue)
+ "load_cell_probe_state oid=%c is_homing_trigger=%c " "trigger_ticks=%u",
+ oid=self._oid,
+ cq=self._cmd_queue,
+ )
self._set_range_cmd = self._mcu.lookup_command(
"load_cell_probe_set_range"
" oid=%c safety_counts_min=%i safety_counts_max=%i tare_counts=%i"
- " trigger_grams=%u grams_per_count=%i", cq=self._cmd_queue)
+ " trigger_grams=%u grams_per_count=%i",
+ cq=self._cmd_queue,
+ )
self._home_cmd = self._mcu.lookup_command(
"load_cell_probe_home oid=%c trsync_oid=%c trigger_reason=%c"
" error_reason=%c clock=%u rest_ticks=%u timeout=%u",
- cq=self._cmd_queue)
+ cq=self._cmd_queue,
+ )
# the sensor data stream is connected on the MCU at the ready event
def _on_connect(self):
@@ -364,9 +447,14 @@ class McuLoadCellProbe:
self._load_cell.tare(tare_counts)
# update internal tare value
safety_min, safety_max = self._config_helper.get_safety_range(gcmd)
- args = [self._oid, safety_min, safety_max, int(tare_counts),
+ args = [
+ self._oid,
+ safety_min,
+ safety_max,
+ int(tare_counts),
self._config_helper.get_trigger_force_grams(gcmd),
- self._config_helper.get_grams_per_count()]
+ self._config_helper.get_grams_per_count(),
+ ]
self._set_range_cmd.send(args)
self._sos_filter.reset_filter()
@@ -374,14 +462,23 @@ class McuLoadCellProbe:
clock = self._mcu.print_time_to_clock(print_time)
rest_time = self._config_helper.get_rest_time()
rest_ticks = self._mcu.seconds_to_clock(rest_time)
- self._home_cmd.send([self._oid, self._dispatch.get_oid(),
- mcu.MCU_trsync.REASON_ENDSTOP_HIT, self.ERROR_SAFETY_RANGE, clock,
- rest_ticks, self.WATCHDOG_MAX], reqclock=clock)
+ self._home_cmd.send(
+ [
+ self._oid,
+ self._dispatch.get_oid(),
+ mcu.MCU_trsync.REASON_ENDSTOP_HIT,
+ self.ERROR_SAFETY_RANGE,
+ clock,
+ rest_ticks,
+ self.WATCHDOG_MAX,
+ ],
+ reqclock=clock,
+ )
def clear_home(self):
params = self._query_cmd.send([self._oid])
# The time of the first sample that triggered is in "trigger_ticks"
- trigger_ticks = self._mcu.clock32_to_clock64(params['trigger_ticks'])
+ trigger_ticks = self._mcu.clock32_to_clock64(params["trigger_ticks"])
# clear trsync from load_cell_endstop
self._home_cmd.send([self._oid, 0, 0, 0, 0, 0, 0, 0])
return self._mcu.clock_to_print_time(trigger_ticks)
@@ -390,18 +487,23 @@ class McuLoadCellProbe:
# Execute probing moves using the McuLoadCellProbe
class LoadCellProbingMove:
ERROR_MAP = {
- mcu.MCU_trsync.REASON_COMMS_TIMEOUT: "Communication timeout during "
- "homing",
+ mcu.MCU_trsync.REASON_COMMS_TIMEOUT: "Communication timeout during " "homing",
McuLoadCellProbe.ERROR_SAFETY_RANGE: "Load Cell Probe Error: load "
- "exceeds safety limit",
+ "exceeds safety limit",
McuLoadCellProbe.ERROR_OVERFLOW: "Load Cell Probe Error: fixed point "
- "math overflow",
+ "math overflow",
McuLoadCellProbe.ERROR_WATCHDOG: "Load Cell Probe Error: timed out "
- "waiting for sensor data"
+ "waiting for sensor data",
}
- def __init__(self, config, mcu_load_cell_probe, param_helper,
- continuous_tare_filter_helper, config_helper):
+ def __init__(
+ self,
+ config,
+ mcu_load_cell_probe,
+ param_helper,
+ continuous_tare_filter_helper,
+ config_helper,
+ ):
self._printer = config.get_printer()
self._mcu_load_cell_probe = mcu_load_cell_probe
self._param_helper = param_helper
@@ -417,7 +519,7 @@ class LoadCellProbingMove:
self._last_trigger_time = 0
def _start_collector(self):
- toolhead = self._printer.lookup_object('toolhead')
+ toolhead = self._printer.lookup_object("toolhead")
# homing uses the toolhead last move time which gets special handling
# to significantly buffer print_time if the move queue has drained
print_time = toolhead.get_last_move_time()
@@ -444,8 +546,9 @@ class LoadCellProbingMove:
self._mcu_load_cell_probe.home_start(print_time)
return trigger_completion
- def home_start(self, print_time, sample_time, sample_count, rest_time,
- triggered=True):
+ def home_start(
+ self, print_time, sample_time, sample_count, rest_time, triggered=True
+ ):
return self._home_start(print_time)
def home_wait(self, home_end_time):
@@ -460,7 +563,7 @@ class LoadCellProbingMove:
error = self.ERROR_MAP[res]
raise self._printer.command_error(error)
if res != mcu.MCU_trsync.REASON_ENDSTOP_HIT:
- return 0.
+ return 0.0
return self._last_trigger_time
def get_steppers(self):
@@ -474,11 +577,11 @@ class LoadCellProbingMove:
# tare the sensor just before probing
self._pause_and_tare(gcmd)
# get params for the homing move
- toolhead = self._printer.lookup_object('toolhead')
+ toolhead = self._printer.lookup_object("toolhead")
pos = toolhead.get_position()
pos[2] = self._z_min_position
- speed = self._param_helper.get_probe_params(gcmd)['probe_speed']
- phoming = self._printer.lookup_object('homing')
+ speed = self._param_helper.get_probe_params(gcmd)["probe_speed"]
+ phoming = self._printer.lookup_object("homing")
# start collector after tare samples are consumed
collector = self._start_collector()
# do homing move
@@ -487,15 +590,15 @@ class LoadCellProbingMove:
# Wait for the MCU to trigger with no movement
def probing_test(self, gcmd, timeout):
self._pause_and_tare(gcmd)
- toolhead = self._printer.lookup_object('toolhead')
+ toolhead = self._printer.lookup_object("toolhead")
print_time = toolhead.get_last_move_time()
self._home_start(print_time)
return self.home_wait(print_time + timeout)
def get_status(self, eventtime):
return {
- 'tare_counts': self._tare_counts,
- 'last_trigger_time': self._last_trigger_time,
+ "tare_counts": self._tare_counts,
+ "last_trigger_time": self._last_trigger_time,
}
@@ -512,15 +615,16 @@ class TappingMove:
self._clients = load_cell.ApiClientHelper(config.get_printer())
name = config.get_name()
header = {"header": ["probe_tap_event"]}
- self._clients.add_mux_endpoint("load_cell_probe/dump_taps",
- "load_cell_probe", name, header)
+ self._clients.add_mux_endpoint(
+ "load_cell_probe/dump_taps", "load_cell_probe", name, header
+ )
# perform a probing move and a pullback move
def run_tap(self, gcmd):
# do the descending move
epos, collector = self._load_cell_probing_move.probing_move(gcmd)
# collect samples from the tap
- toolhead = self._printer.lookup_object('toolhead')
+ toolhead = self._printer.lookup_object("toolhead")
toolhead.flush_step_generation()
move_end = toolhead.get_last_move_time()
results = collector.collect_until(move_end)
@@ -528,15 +632,15 @@ class TappingMove:
# Analyze the tap data
ppa = TapAnalysis(samples)
# broadcast tap event data:
- self._clients.send({'tap': ppa.to_dict()})
+ self._clients.send({"tap": ppa.to_dict()})
self._is_last_result_valid = True
self._last_result = epos[2]
return epos, self._is_last_result_valid
def get_status(self, eventtime):
return {
- 'last_z_result': self._last_result,
- 'is_last_tap_valid': self._is_last_result_valid
+ "last_z_result": self._last_result,
+ "is_last_tap_valid": self._is_last_result_valid,
}
@@ -574,20 +678,23 @@ class LoadCellProbeCommands:
def _register_commands(self):
# Register commands
- gcode = self._printer.lookup_object('gcode')
- gcode.register_command("LOAD_CELL_TEST_TAP",
- self.cmd_LOAD_CELL_TEST_TAP, desc=self.cmd_LOAD_CELL_TEST_TAP_help)
+ gcode = self._printer.lookup_object("gcode")
+ gcode.register_command(
+ "LOAD_CELL_TEST_TAP",
+ self.cmd_LOAD_CELL_TEST_TAP,
+ desc=self.cmd_LOAD_CELL_TEST_TAP_help,
+ )
cmd_LOAD_CELL_TEST_TAP_help = "Tap the load cell probe to verify operation"
def cmd_LOAD_CELL_TEST_TAP(self, gcmd):
taps = gcmd.get_int("TAPS", 3, minval=1, maxval=10)
- timeout = gcmd.get_float("TIMEOUT", 30., minval=1., maxval=120.)
+ timeout = gcmd.get_float("TIMEOUT", 30.0, minval=1.0, maxval=120.0)
gcmd.respond_info("Tap the load cell %s times:" % (taps,))
reactor = self._printer.get_reactor()
for i in range(0, taps):
result = self._load_cell_probing_move.probing_test(gcmd, timeout)
- if result == 0.:
+ if result == 0.0:
# notify of error, likely due to timeout
raise gcmd.error("Test timeout out")
gcmd.respond_info("Tap Detected!")
@@ -609,34 +716,43 @@ class LoadCellPrinterProbe:
sensors = {}
sensors.update(hx71x.HX71X_SENSOR_TYPES)
sensors.update(ads1220.ADS1220_SENSOR_TYPE)
- sensor_class = config.getchoice('sensor_type', sensors)
+ sensor_class = config.getchoice("sensor_type", sensors)
sensor = sensor_class(config)
self._load_cell = load_cell.LoadCell(config, sensor)
# Read all user configuration and build modules
config_helper = LoadCellProbeConfigHelper(config, self._load_cell)
self._mcu = self._load_cell.get_sensor().get_mcu()
trigger_dispatch = mcu.TriggerDispatch(self._mcu)
- continuous_tare_filter_helper = ContinuousTareFilterHelper(config,
- sensor, trigger_dispatch.get_command_queue())
+ continuous_tare_filter_helper = ContinuousTareFilterHelper(
+ config, sensor, trigger_dispatch.get_command_queue()
+ )
# Probe Interface
self._param_helper = probe.ProbeParameterHelper(config)
self._cmd_helper = probe.ProbeCommandHelper(config, self)
self._probe_offsets = probe.ProbeOffsetsHelper(config)
- self._mcu_load_cell_probe = McuLoadCellProbe(config, self._load_cell,
- continuous_tare_filter_helper.get_sos_filter(), config_helper,
- trigger_dispatch)
- load_cell_probing_move = LoadCellProbingMove(config,
- self._mcu_load_cell_probe, self._param_helper,
- continuous_tare_filter_helper, config_helper)
- self._tapping_move = TappingMove(config, load_cell_probing_move,
- config_helper)
+ self._mcu_load_cell_probe = McuLoadCellProbe(
+ config,
+ self._load_cell,
+ continuous_tare_filter_helper.get_sos_filter(),
+ config_helper,
+ trigger_dispatch,
+ )
+ load_cell_probing_move = LoadCellProbingMove(
+ config,
+ self._mcu_load_cell_probe,
+ self._param_helper,
+ continuous_tare_filter_helper,
+ config_helper,
+ )
+ self._tapping_move = TappingMove(config, load_cell_probing_move, config_helper)
tap_session = TapSession(config, self._tapping_move, self._param_helper)
- self._probe_session = probe.ProbeSessionHelper(config,
- self._param_helper, tap_session.start_probe_session)
+ self._probe_session = probe.ProbeSessionHelper(
+ config, self._param_helper, tap_session.start_probe_session
+ )
# printer integration
LoadCellProbeCommands(config, load_cell_probing_move)
probe.ProbeVirtualEndstopDeprecation(config)
- self._printer.add_object('probe', self)
+ self._printer.add_object("probe", self)
def get_probe_params(self, gcmd=None):
return self._param_helper.get_probe_params(gcmd)