aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--klippy/extras/load_cell_probe.py658
1 files changed, 658 insertions, 0 deletions
diff --git a/klippy/extras/load_cell_probe.py b/klippy/extras/load_cell_probe.py
new file mode 100644
index 00000000..115609f5
--- /dev/null
+++ b/klippy/extras/load_cell_probe.py
@@ -0,0 +1,658 @@
+# Load Cell Probe
+#
+# Copyright (C) 2025 Gareth Farrington <gareth@waves.ky>
+#
+# This file may be distributed under the terms of the GNU GPLv3 license.
+import logging, math
+import mcu
+from . import probe, sos_filter, load_cell, hx71x, ads1220
+
+np = None # delay NumPy import until configuration time
+
+# constants for fixed point numbers
+Q2_INT_BITS = 2
+Q2_FRAC_BITS = (32 - (1 + Q2_INT_BITS))
+Q16_INT_BITS = 16
+Q16_FRAC_BITS = (32 - (1 + Q16_INT_BITS))
+
+
+class TapAnalysis:
+ def __init__(self, samples):
+ nd_samples = np.asarray(samples, dtype=np.float64)
+ self.time = nd_samples[:, 0]
+ self.force = nd_samples[:, 1]
+
+ # convert to dictionary for JSON encoder
+ def to_dict(self):
+ return {
+ 'time': self.time.tolist(), 'force': self.force.tolist(),
+ 'is_valid': True,
+ }
+
+
+# Access a parameter from config or GCode command via a consistent interface
+# stores name and constraints to keep things DRY
+class ParamHelper:
+ def __init__(self, config, name, type_name, default=None, minval=None,
+ maxval=None, above=None, below=None, max_len=None):
+ self._config_section = config.section
+ self._config_error = config.error
+ self.name = name
+ self._type_name = type_name
+ self.value = default
+ self.minval = minval
+ self.maxval = maxval
+ self.above = above
+ self.below = below
+ self.max_len = max_len
+ # read from config once
+ self.value = self.get(config=config)
+
+ def _get_name(self, gcmd):
+ return self.name.upper() if gcmd else self.name
+
+ def _validate_float(self, description, error, value, above, below):
+ above = above or self.above
+ if above is not None and value <= above:
+ raise error("%s must be above %s" % (description, above))
+ below = below or self.below
+ if below is not None and value >= below:
+ raise error("%s must be below %s" % (description, below))
+
+ # support for validating individual options in a list of floats
+ def _validate_float_list(self, gcmd, values, above, below):
+ if gcmd:
+ description = ("Error on '%s': %s" % (
+ gcmd.get_commandline(), self._get_name(gcmd)))
+ error = gcmd.error
+ else:
+ description = ("Option '%s' in section '%s'" % (
+ self._get_name(gcmd), self._config_section))
+ error = self._config_error
+ if self.max_len is not None and len(values) > self.max_len:
+ raise error(
+ "%s has maximum length %s" % (description, self.max_len))
+ for value in values:
+ self._validate_float(description, error, value, above, below)
+
+ def _get_int(self, config, gcmd, minval, maxval):
+ get = gcmd.get_int if gcmd else config.getint
+ return get(self._get_name(gcmd), self.value, minval or self.minval,
+ maxval or self.maxval)
+
+ def _get_float(self, config, gcmd, minval, maxval, above, below):
+ get = gcmd.get_float if gcmd else config.getfloat
+ return get(self._get_name(gcmd), self.value, minval or self.minval,
+ maxval or self.maxval, above or self.above, below or self.below)
+
+ def _get_float_list(self, config, gcmd, above, below):
+ # this code defaults to the empty list, never return None
+ default = (self.value or [])
+ if gcmd:
+ # if the parameter isn't part of the command, return the default
+ if not self._get_name(gcmd) in gcmd.get_command_parameters():
+ return default
+ # parameter exists, always prefer whatever is in the command
+ value = gcmd.get(self._get_name(gcmd), default='')
+ # Return an empty list for empty value
+ if len(value.strip()) == 0:
+ return []
+ try:
+ float_list = [float(p.strip()) for p in value.split(',')]
+ except:
+ raise gcmd.error("Error on '%s': unable to parse %s" % (
+ gcmd.get_commandline(), value))
+ else:
+ float_list = config.getfloatlist(self._get_name(gcmd),
+ default=default)
+ if float_list:
+ self._validate_float_list(gcmd, float_list, above, below)
+ return float_list
+
+ def get(self, gcmd=None, minval=None, maxval=None, above=None, below=None,
+ config=None):
+ if config is None and gcmd is None:
+ return self.value
+ if self._type_name == 'int':
+ return self._get_int(config, gcmd, minval, maxval)
+ elif self._type_name == 'float':
+ return self._get_float(config, gcmd, minval, maxval, above, below)
+ else:
+ return self._get_float_list(config, gcmd, above, below)
+
+
+def intParamHelper(config, name, default=None, minval=None, maxval=None):
+ return ParamHelper(config, name, 'int', default, minval=minval,
+ maxval=maxval)
+
+
+def floatParamHelper(config, name, default=None, minval=None, maxval=None,
+ above=None, below=None):
+ return ParamHelper(config, name, 'float', default, minval=minval,
+ maxval=maxval, above=above, below=below)
+
+
+def floatListParamHelper(config, name, default=None, above=None, below=None,
+ max_len=None):
+ return ParamHelper(config, name, 'float_list', default, above=above,
+ below=below, max_len=max_len)
+
+
+# container for filter parameters
+# allows different filter configurations to be compared
+class ContinuousTareFilter:
+ def __init__(self, sps=None, drift=None, drift_delay=None, buzz=None,
+ buzz_delay=None, notches=None, notch_quality=None):
+ self.sps = sps
+ self.drift = drift
+ self.drift_delay = drift_delay
+ self.buzz = buzz
+ self.buzz_delay = buzz_delay
+ self.notches = notches
+ self.notch_quality = notch_quality
+
+ def __eq__(self, other):
+ if not isinstance(other, ContinuousTareFilter):
+ return False
+ return (
+ self.sps == other.sps and self.drift == other.drift and
+ self.drift_delay == other.drift_delay and self.buzz ==
+ other.buzz and self.buzz_delay == other.buzz_delay and
+ self.notches == other.notches and self.notch_quality ==
+ other.notch_quality)
+
+ # create a filter design from the parameters
+ def design_filter(self, error_func):
+ design = sos_filter.DigitalFilter(self.sps, error_func, self.drift,
+ self.drift_delay, self.buzz, self.buzz_delay, self.notches,
+ self.notch_quality)
+ fixed_filter = sos_filter.FixedPointSosFilter(
+ design.get_filter_sections(), design.get_initial_state(),
+ Q2_INT_BITS, Q16_INT_BITS)
+ return fixed_filter
+
+
+# Combine ContinuousTareFilter and SosFilter into an easy-to-use class
+class ContinuousTareFilterHelper:
+ def __init__(self, config, sensor, cmd_queue):
+ self._sensor = sensor
+ self._sps = self._sensor.get_samples_per_second()
+ max_filter_frequency = math.floor(self._sps / 2.)
+ # setup filter parameters
+ self._drift_param = floatParamHelper(config,
+ "drift_filter_cutoff_frequency", default=None, minval=0.1,
+ maxval=20.0)
+ self._drift_delay_param = intParamHelper(config, "drift_filter_delay",
+ default=2, minval=1, maxval=2)
+ self._buzz_param = floatParamHelper(config,
+ "buzz_filter_cutoff_frequency", default=None,
+ above=min(80.0, max_filter_frequency - 1.0),
+ below=max_filter_frequency)
+ self._buzz_delay_param = intParamHelper(config, "buzz_filter_delay",
+ default=2, minval=1, maxval=2)
+ self._notches_param = floatListParamHelper(config,
+ "notch_filter_frequencies", default=[], above=0.,
+ below=max_filter_frequency, max_len=2)
+ self._notch_quality_param = floatParamHelper(config,
+ "notch_filter_quality", default=2.0, minval=0.5, maxval=6.0)
+ # filter design specified in the config file, used for defaults
+ self._config_design = ContinuousTareFilter() # empty filter
+ self._config_design = self._build_filter()
+ # filter design currently inside the MCU
+ self._active_design = self._config_design
+ self._sos_filter = self._create_filter(
+ self._active_design.design_filter(config.error), cmd_queue)
+
+ def _build_filter(self, gcmd=None):
+ drift = self._drift_param.get(gcmd)
+ drift_delay = self._drift_delay_param.get(gcmd)
+ buzz = self._buzz_param.get(gcmd)
+ buzz_delay = self._buzz_delay_param.get(gcmd)
+ # notches must be between drift and buzz:
+ notches = self._notches_param.get(gcmd, above=drift, below=buzz)
+ notch_quality = self._notch_quality_param.get(gcmd)
+ return ContinuousTareFilter(self._sps, drift, drift_delay, buzz,
+ buzz_delay, notches, notch_quality)
+
+ def _create_filter(self, fixed_filter, cmd_queue):
+ return sos_filter.SosFilter(self._sensor.get_mcu(), cmd_queue,
+ fixed_filter, 4)
+
+ def update_from_command(self, gcmd, cq=None):
+ gcmd_filter = self._build_filter(gcmd)
+ # if filters are identical, no change required
+ if self._active_design == gcmd_filter:
+ return
+ # update MCU filter from GCode command
+ self._sos_filter.change_filter(
+ self._active_design.design_filter(gcmd.error))
+
+ def get_sos_filter(self):
+ return self._sos_filter
+
+
+# check results from the collector for errors and raise an exception is found
+def check_sensor_errors(results, printer):
+ samples, errors = results
+ if errors:
+ raise printer.command_error("Load cell sensor reported errors while"
+ " probing: %i errors, %i overflows" % (
+ errors[0], errors[1]))
+ return samples
+
+
+class LoadCellProbeConfigHelper:
+ def __init__(self, config, load_cell_inst):
+ self._printer = config.get_printer()
+ self._load_cell = load_cell_inst
+ self._sensor = load_cell_inst.get_sensor()
+ self._rest_time = 1. / float(self._sensor.get_samples_per_second())
+ # Collect 4 x 60hz power cycles of data to average across power noise
+ self._tare_time_param = floatParamHelper(config, 'tare_time',
+ default=4. / 60., minval=0.01, maxval=1.0)
+ # triggering options
+ self._trigger_force_param = intParamHelper(config, 'trigger_force',
+ default=75, minval=10, maxval=250)
+ self._force_safety_limit_param = intParamHelper(config,
+ 'force_safety_limit', minval=100, maxval=5000, default=2000)
+
+ def get_tare_samples(self, gcmd=None):
+ tare_time = self._tare_time_param.get(gcmd)
+ sps = self._sensor.get_samples_per_second()
+ return max(2, math.ceil(tare_time * sps))
+
+ def get_trigger_force_grams(self, gcmd=None):
+ return self._trigger_force_param.get(gcmd)
+
+ def get_safety_limit_grams(self, gcmd=None):
+ return self._force_safety_limit_param.get(gcmd)
+
+ def get_rest_time(self):
+ return self._rest_time
+
+ def get_safety_range(self, gcmd=None):
+ counts_per_gram = self._load_cell.get_counts_per_gram()
+ # calculate the safety band
+ zero = self._load_cell.get_reference_tare_counts()
+ safety_counts = int(counts_per_gram * self.get_safety_limit_grams(gcmd))
+ safety_min = int(zero - safety_counts)
+ safety_max = int(zero + safety_counts)
+ # don't allow a safety range outside the sensor's real range
+ sensor_min, sensor_max = self._load_cell.get_sensor().get_range()
+ if safety_min <= sensor_min or safety_max >= sensor_max:
+ cmd_err = self._printer.command_error
+ raise cmd_err("Load cell force_safety_limit exceeds sensor range!")
+ return safety_min, safety_max
+
+ # calculate 1/counts_per_gram in Q2 fixed point
+ def get_grams_per_count(self):
+ counts_per_gram = self._load_cell.get_counts_per_gram()
+ # The counts_per_gram could be so large that it becomes 0.0 when
+ # converted to Q2 format. This would mean the ADC range only measures a
+ # few grams which seems very unlikely. Treat this as an error:
+ if counts_per_gram >= 2**Q2_FRAC_BITS:
+ raise OverflowError("counts_per_gram value is too large to filter")
+ return sos_filter.to_fixed_32((1. / counts_per_gram), Q2_INT_BITS)
+
+
+# McuLoadCellProbe is the interface to `load_cell_probe` on the MCU
+# This also manages the SosFilter so all commands use one command queue
+class McuLoadCellProbe:
+ WATCHDOG_MAX = 3
+ ERROR_SAFETY_RANGE = mcu.MCU_trsync.REASON_COMMS_TIMEOUT + 1
+ ERROR_OVERFLOW = mcu.MCU_trsync.REASON_COMMS_TIMEOUT + 2
+ ERROR_WATCHDOG = mcu.MCU_trsync.REASON_COMMS_TIMEOUT + 3
+
+ def __init__(self, config, load_cell_inst, sos_filter_inst, config_helper,
+ trigger_dispatch):
+ self._printer = config.get_printer()
+ self._load_cell = load_cell_inst
+ self._sos_filter = sos_filter_inst
+ self._config_helper = config_helper
+ self._sensor = load_cell_inst.get_sensor()
+ self._mcu = self._sensor.get_mcu()
+ # configure MCU objects
+ self._dispatch = trigger_dispatch
+ self._cmd_queue = self._dispatch.get_command_queue()
+ self._oid = self._mcu.create_oid()
+ self._config_commands()
+ self._home_cmd = None
+ self._query_cmd = None
+ self._set_range_cmd = None
+ self._mcu.register_config_callback(self._build_config)
+ self._printer.register_event_handler("klippy:connect", self._on_connect)
+
+ def _config_commands(self):
+ self._sos_filter.create_filter()
+ self._mcu.add_config_cmd(
+ "config_load_cell_probe oid=%d sos_filter_oid=%d" % (
+ self._oid, self._sos_filter.get_oid()))
+
+ def _build_config(self):
+ # Lookup commands
+ self._query_cmd = self._mcu.lookup_query_command(
+ "load_cell_probe_query_state oid=%c",
+ "load_cell_probe_state oid=%c is_homing_trigger=%c "
+ "trigger_ticks=%u", oid=self._oid, cq=self._cmd_queue)
+ self._set_range_cmd = self._mcu.lookup_command(
+ "load_cell_probe_set_range"
+ " oid=%c safety_counts_min=%i safety_counts_max=%i tare_counts=%i"
+ " trigger_grams=%u grams_per_count=%i", cq=self._cmd_queue)
+ self._home_cmd = self._mcu.lookup_command(
+ "load_cell_probe_home oid=%c trsync_oid=%c trigger_reason=%c"
+ " error_reason=%c clock=%u rest_ticks=%u timeout=%u",
+ cq=self._cmd_queue)
+
+ # the sensor data stream is connected on the MCU at the ready event
+ def _on_connect(self):
+ self._sensor.attach_load_cell_probe(self._oid)
+
+ def get_oid(self):
+ return self._oid
+
+ def get_mcu(self):
+ return self._mcu
+
+ def get_load_cell(self):
+ return self._load_cell
+
+ def get_dispatch(self):
+ return self._dispatch
+
+ def set_endstop_range(self, tare_counts, gcmd=None):
+ # update the load cell so it reflects the new tare value
+ self._load_cell.tare(tare_counts)
+ # update internal tare value
+ safety_min, safety_max = self._config_helper.get_safety_range(gcmd)
+ args = [self._oid, safety_min, safety_max, int(tare_counts),
+ self._config_helper.get_trigger_force_grams(gcmd),
+ self._config_helper.get_grams_per_count()]
+ self._set_range_cmd.send(args)
+ self._sos_filter.reset_filter()
+
+ def home_start(self, print_time):
+ clock = self._mcu.print_time_to_clock(print_time)
+ rest_time = self._config_helper.get_rest_time()
+ rest_ticks = self._mcu.seconds_to_clock(rest_time)
+ self._home_cmd.send([self._oid, self._dispatch.get_oid(),
+ mcu.MCU_trsync.REASON_ENDSTOP_HIT, self.ERROR_SAFETY_RANGE, clock,
+ rest_ticks, self.WATCHDOG_MAX], reqclock=clock)
+
+ def clear_home(self):
+ params = self._query_cmd.send([self._oid])
+ # The time of the first sample that triggered is in "trigger_ticks"
+ trigger_ticks = self._mcu.clock32_to_clock64(params['trigger_ticks'])
+ # clear trsync from load_cell_endstop
+ self._home_cmd.send([self._oid, 0, 0, 0, 0, 0, 0, 0])
+ return self._mcu.clock_to_print_time(trigger_ticks)
+
+
+# Execute probing moves using the McuLoadCellProbe
+class LoadCellProbingMove:
+ ERROR_MAP = {
+ mcu.MCU_trsync.REASON_COMMS_TIMEOUT: "Communication timeout during "
+ "homing",
+ McuLoadCellProbe.ERROR_SAFETY_RANGE: "Load Cell Probe Error: load "
+ "exceeds safety limit",
+ McuLoadCellProbe.ERROR_OVERFLOW: "Load Cell Probe Error: fixed point "
+ "math overflow",
+ McuLoadCellProbe.ERROR_WATCHDOG: "Load Cell Probe Error: timed out "
+ "waiting for sensor data"
+ }
+
+ def __init__(self, config, mcu_load_cell_probe, param_helper,
+ continuous_tare_filter_helper, config_helper):
+ self._printer = config.get_printer()
+ self._mcu_load_cell_probe = mcu_load_cell_probe
+ self._param_helper = param_helper
+ self._continuous_tare_filter_helper = continuous_tare_filter_helper
+ self._config_helper = config_helper
+ self._mcu = mcu_load_cell_probe.get_mcu()
+ self._load_cell = mcu_load_cell_probe.get_load_cell()
+ self._z_min_position = probe.lookup_minimum_z(config)
+ self._dispatch = mcu_load_cell_probe.get_dispatch()
+ probe.LookupZSteppers(config, self._dispatch.add_stepper)
+ # internal state tracking
+ self._tare_counts = 0
+ self._last_trigger_time = 0
+
+ def _start_collector(self):
+ toolhead = self._printer.lookup_object('toolhead')
+ # homing uses the toolhead last move time which gets special handling
+ # to significantly buffer print_time if the move queue has drained
+ print_time = toolhead.get_last_move_time()
+ collector = self._load_cell.get_collector()
+ collector.start_collecting(min_time=print_time)
+ return collector
+
+ # pauses for the last move to complete and then
+ # sets the endstop tare value and range
+ def _pause_and_tare(self, gcmd):
+ collector = self._start_collector()
+ num_samples = self._config_helper.get_tare_samples(gcmd)
+ # use collect_min collected samples are not wasted
+ results = collector.collect_min(num_samples)
+ tare_samples = check_sensor_errors(results, self._printer)
+ tare_counts = np.average(np.array(tare_samples)[:, 2].astype(float))
+ # update sos_filter with any gcode parameter changes
+ self._continuous_tare_filter_helper.update_from_command(gcmd)
+ self._mcu_load_cell_probe.set_endstop_range(tare_counts, gcmd)
+
+ def _home_start(self, print_time):
+ # start trsync
+ trigger_completion = self._dispatch.start(print_time)
+ self._mcu_load_cell_probe.home_start(print_time)
+ return trigger_completion
+
+ def home_start(self, print_time, sample_time, sample_count, rest_time,
+ triggered=True):
+ return self._home_start(print_time)
+
+ def home_wait(self, home_end_time):
+ self._dispatch.wait_end(home_end_time)
+ # trigger has happened, now to find out why...
+ res = self._dispatch.stop()
+ # clear the homing state so it stops processing samples
+ self._last_trigger_time = self._mcu_load_cell_probe.clear_home()
+ if res >= mcu.MCU_trsync.REASON_COMMS_TIMEOUT:
+ error = "Load Cell Probe Error: unknown reason code %i" % (res,)
+ if res in self.ERROR_MAP:
+ error = self.ERROR_MAP[res]
+ raise self._printer.command_error(error)
+ if res != mcu.MCU_trsync.REASON_ENDSTOP_HIT:
+ return 0.
+ return self._last_trigger_time
+
+ def get_steppers(self):
+ return self._dispatch.get_steppers()
+
+ # Probe towards z_min until the load_cell_probe on the MCU triggers
+ def probing_move(self, gcmd):
+ # do not permit probing if the load cell is not calibrated
+ if not self._load_cell.is_calibrated():
+ raise self._printer.command_error("Load Cell not calibrated")
+ # tare the sensor just before probing
+ self._pause_and_tare(gcmd)
+ # get params for the homing move
+ toolhead = self._printer.lookup_object('toolhead')
+ pos = toolhead.get_position()
+ pos[2] = self._z_min_position
+ speed = self._param_helper.get_probe_params(gcmd)['probe_speed']
+ phoming = self._printer.lookup_object('homing')
+ # start collector after tare samples are consumed
+ collector = self._start_collector()
+ # do homing move
+ return phoming.probing_move(self, pos, speed), collector
+
+ # Wait for the MCU to trigger with no movement
+ def probing_test(self, gcmd, timeout):
+ self._pause_and_tare(gcmd)
+ toolhead = self._printer.lookup_object('toolhead')
+ print_time = toolhead.get_last_move_time()
+ self._home_start(print_time)
+ return self.home_wait(print_time + timeout)
+
+ def get_status(self, eventtime):
+ return {
+ 'tare_counts': self._tare_counts,
+ 'last_trigger_time': self._last_trigger_time,
+ }
+
+
+# Perform a single complete tap
+class TappingMove:
+ def __init__(self, config, load_cell_probing_move, config_helper):
+ self._printer = config.get_printer()
+ self._load_cell_probing_move = load_cell_probing_move
+ self._config_helper = config_helper
+ # track results of the last tap
+ self._last_result = None
+ self._is_last_result_valid = False
+ # webhooks support
+ self._clients = load_cell.ApiClientHelper(config.get_printer())
+ name = config.get_name()
+ header = {"header": ["probe_tap_event"]}
+ self._clients.add_mux_endpoint("load_cell_probe/dump_taps",
+ "load_cell_probe", name, header)
+
+ # perform a probing move and a pullback move
+ def run_tap(self, gcmd):
+ # do the descending move
+ epos, collector = self._load_cell_probing_move.probing_move(gcmd)
+ # collect samples from the tap
+ toolhead = self._printer.lookup_object('toolhead')
+ toolhead.flush_step_generation()
+ move_end = toolhead.get_last_move_time()
+ results = collector.collect_until(move_end)
+ samples = check_sensor_errors(results, self._printer)
+ # Analyze the tap data
+ ppa = TapAnalysis(samples)
+ # broadcast tap event data:
+ self._clients.send({'tap': ppa.to_dict()})
+ self._is_last_result_valid = True
+ self._last_result = epos[2]
+ return epos, self._is_last_result_valid
+
+ def get_status(self, eventtime):
+ return {
+ 'last_z_result': self._last_result,
+ 'is_last_tap_valid': self._is_last_result_valid
+ }
+
+
+# ProbeSession that implements Tap logic
+class TapSession:
+ def __init__(self, config, tapping_move, probe_params_helper):
+ self._printer = config.get_printer()
+ self._tapping_move = tapping_move
+ self._probe_params_helper = probe_params_helper
+ # Session state
+ self._results = []
+
+ def start_probe_session(self, gcmd):
+ return self
+
+ def end_probe_session(self):
+ self._results = []
+
+ # probe until a single good sample is returned or retries are exhausted
+ def run_probe(self, gcmd):
+ epos, is_good = self._tapping_move.run_tap(gcmd)
+ self._results.append(epos)
+
+ def pull_probed_results(self):
+ res = self._results
+ self._results = []
+ return res
+
+
+class LoadCellProbeCommands:
+ def __init__(self, config, load_cell_probing_move):
+ self._printer = config.get_printer()
+ self._load_cell_probing_move = load_cell_probing_move
+ self._register_commands()
+
+ def _register_commands(self):
+ # Register commands
+ gcode = self._printer.lookup_object('gcode')
+ gcode.register_command("LOAD_CELL_TEST_TAP",
+ self.cmd_LOAD_CELL_TEST_TAP, desc=self.cmd_LOAD_CELL_TEST_TAP_help)
+
+ cmd_LOAD_CELL_TEST_TAP_help = "Tap the load cell probe to verify operation"
+
+ def cmd_LOAD_CELL_TEST_TAP(self, gcmd):
+ taps = gcmd.get_int("TAPS", 3, minval=1, maxval=10)
+ timeout = gcmd.get_float("TIMEOUT", 30., minval=1., maxval=120.)
+ gcmd.respond_info("Tap the load cell %s times:" % (taps,))
+ reactor = self._printer.get_reactor()
+ for i in range(0, taps):
+ result = self._load_cell_probing_move.probing_test(gcmd, timeout)
+ if result == 0.:
+ # notify of error, likely due to timeout
+ raise gcmd.error("Test timeout out")
+ gcmd.respond_info("Tap Detected!")
+ # give the user some time for their finger to move away
+ reactor.pause(reactor.monotonic() + 0.2)
+ gcmd.respond_info("Test complete, %s taps detected" % (taps,))
+
+
+class LoadCellPrinterProbe:
+ def __init__(self, config):
+ cfg_error = config.error
+ try:
+ global np
+ import numpy as np
+ except:
+ raise cfg_error("[load_cell_probe] requires the NumPy module")
+ self._printer = config.get_printer()
+ # Sensor types supported by load_cell_probe
+ sensors = {}
+ sensors.update(hx71x.HX71X_SENSOR_TYPES)
+ sensors.update(ads1220.ADS1220_SENSOR_TYPE)
+ sensor_class = config.getchoice('sensor_type', sensors)
+ sensor = sensor_class(config)
+ self._load_cell = load_cell.LoadCell(config, sensor)
+ # Read all user configuration and build modules
+ config_helper = LoadCellProbeConfigHelper(config, self._load_cell)
+ self._mcu = self._load_cell.get_sensor().get_mcu()
+ trigger_dispatch = mcu.TriggerDispatch(self._mcu)
+ continuous_tare_filter_helper = ContinuousTareFilterHelper(config,
+ sensor, trigger_dispatch.get_command_queue())
+ # Probe Interface
+ self._param_helper = probe.ProbeParameterHelper(config)
+ self._cmd_helper = probe.ProbeCommandHelper(config, self)
+ self._probe_offsets = probe.ProbeOffsetsHelper(config)
+ self._mcu_load_cell_probe = McuLoadCellProbe(config, self._load_cell,
+ continuous_tare_filter_helper.get_sos_filter(), config_helper,
+ trigger_dispatch)
+ load_cell_probing_move = LoadCellProbingMove(config,
+ self._mcu_load_cell_probe, self._param_helper,
+ continuous_tare_filter_helper, config_helper)
+ self._tapping_move = TappingMove(config, load_cell_probing_move,
+ config_helper)
+ tap_session = TapSession(config, self._tapping_move, self._param_helper)
+ self._probe_session = probe.ProbeSessionHelper(config,
+ self._param_helper, tap_session.start_probe_session)
+ # printer integration
+ LoadCellProbeCommands(config, load_cell_probing_move)
+ probe.ProbeVirtualEndstopDeprecation(config)
+ self._printer.add_object('probe', self)
+
+ def get_probe_params(self, gcmd=None):
+ return self._param_helper.get_probe_params(gcmd)
+
+ def get_offsets(self):
+ return self._probe_offsets.get_offsets()
+
+ def start_probe_session(self, gcmd):
+ return self._probe_session.start_probe_session(gcmd)
+
+ def get_status(self, eventtime):
+ status = self._cmd_helper.get_status(eventtime)
+ status.update(self._load_cell.get_status(eventtime))
+ status.update(self._tapping_move.get_status(eventtime))
+ return status
+
+
+def load_config(config):
+ return LoadCellPrinterProbe(config)