diff options
Diffstat (limited to 'klippy/extras/load_cell.py')
-rw-r--r-- | klippy/extras/load_cell.py | 294 |
1 files changed, 183 insertions, 111 deletions
diff --git a/klippy/extras/load_cell.py b/klippy/extras/load_cell.py index 5ef2c5b7..3481c06e 100644 --- a/klippy/extras/load_cell.py +++ b/klippy/extras/load_cell.py @@ -8,20 +8,24 @@ from . import hx71x from . import ads1220 from .bulk_sensor import BatchWebhooksClient import collections, itertools + # We want either Python 3's zip() or Python 2's izip() but NOT 2's zip(): zip_impl = zip try: - from itertools import izip as zip_impl # python 2.x izip -except ImportError: # will be Python 3.x + from itertools import izip as zip_impl # python 2.x izip +except ImportError: # will be Python 3.x pass + # alternative to numpy's column selection: def select_column(data, column_idx): return list(zip_impl(*data))[column_idx] + def avg(data): return sum(data) / len(data) + # Helper for event driven webhooks and subscription based API clients class ApiClientHelper(object): def __init__(self, printer): @@ -50,9 +54,10 @@ class ApiClientHelper(object): # Set up a webhooks endpoint with a static header def add_mux_endpoint(self, path, key, value, webhooks_start_resp): self.webhooks_start_resp = webhooks_start_resp - wh = self.printer.lookup_object('webhooks') + wh = self.printer.lookup_object("webhooks") wh.register_mux_endpoint(path, key, value, self._add_webhooks_client) + # Class for handling commands related to load cells class LoadCellCommandHelper: def __init__(self, config, load_cell): @@ -66,33 +71,53 @@ class LoadCellCommandHelper: def register_commands(self, name): # Register commands - gcode = self.printer.lookup_object('gcode') - gcode.register_mux_command("LOAD_CELL_TARE", "LOAD_CELL", name, - self.cmd_LOAD_CELL_TARE, - desc=self.cmd_LOAD_CELL_TARE_help) - gcode.register_mux_command("LOAD_CELL_CALIBRATE", "LOAD_CELL", name, - self.cmd_LOAD_CELL_CALIBRATE, - desc=self.cmd_CALIBRATE_LOAD_CELL_help) - gcode.register_mux_command("LOAD_CELL_READ", "LOAD_CELL", name, - self.cmd_LOAD_CELL_READ, - desc=self.cmd_LOAD_CELL_READ_help) - gcode.register_mux_command("LOAD_CELL_DIAGNOSTIC", "LOAD_CELL", name, - self.cmd_LOAD_CELL_DIAGNOSTIC, - desc=self.cmd_LOAD_CELL_DIAGNOSTIC_help) + gcode = self.printer.lookup_object("gcode") + gcode.register_mux_command( + "LOAD_CELL_TARE", + "LOAD_CELL", + name, + self.cmd_LOAD_CELL_TARE, + desc=self.cmd_LOAD_CELL_TARE_help, + ) + gcode.register_mux_command( + "LOAD_CELL_CALIBRATE", + "LOAD_CELL", + name, + self.cmd_LOAD_CELL_CALIBRATE, + desc=self.cmd_CALIBRATE_LOAD_CELL_help, + ) + gcode.register_mux_command( + "LOAD_CELL_READ", + "LOAD_CELL", + name, + self.cmd_LOAD_CELL_READ, + desc=self.cmd_LOAD_CELL_READ_help, + ) + gcode.register_mux_command( + "LOAD_CELL_DIAGNOSTIC", + "LOAD_CELL", + name, + self.cmd_LOAD_CELL_DIAGNOSTIC, + desc=self.cmd_LOAD_CELL_DIAGNOSTIC_help, + ) cmd_LOAD_CELL_TARE_help = "Set the Zero point of the load cell" + def cmd_LOAD_CELL_TARE(self, gcmd): tare_counts = self.load_cell.avg_counts() self.load_cell.tare(tare_counts) tare_percent = self.load_cell.counts_to_percent(tare_counts) - gcmd.respond_info("Load cell tare value: %.2f%% (%i)" - % (tare_percent, tare_counts)) + gcmd.respond_info( + "Load cell tare value: %.2f%% (%i)" % (tare_percent, tare_counts) + ) cmd_CALIBRATE_LOAD_CELL_help = "Start interactive calibration tool" + def cmd_LOAD_CELL_CALIBRATE(self, gcmd): LoadCellGuidedCalibrationHelper(self.printer, self.load_cell) cmd_LOAD_CELL_READ_help = "Take a reading from the load cell" + def cmd_LOAD_CELL_READ(self, gcmd): counts = self.load_cell.avg_counts() percent = self.load_cell.counts_to_percent(counts) @@ -105,16 +130,19 @@ class LoadCellCommandHelper: gcmd.respond_info("%.1fg (%.2f%%)" % (force, percent)) cmd_LOAD_CELL_DIAGNOSTIC_help = "Check the health of the load cell" + def cmd_LOAD_CELL_DIAGNOSTIC(self, gcmd): gcmd.respond_info("Collecting load cell data for 10 seconds...") collector = self.load_cell.get_collector() reactor = self.printer.get_reactor() collector.start_collecting() - reactor.pause(reactor.monotonic() + 10.) + reactor.pause(reactor.monotonic() + 10.0) samples, errors = collector.stop_collecting() if errors: - gcmd.respond_info("Sensor reported errors: %i errors," - " %i overflows" % (errors[0], errors[1])) + gcmd.respond_info( + "Sensor reported errors: %i errors," + " %i overflows" % (errors[0], errors[1]) + ) else: gcmd.respond_info("Sensor reported no errors") if not samples: @@ -132,41 +160,49 @@ class LoadCellCommandHelper: if len(samples) > 2: sensor_sps = self.load_cell.sensor.get_samples_per_second() sps = float(len(samples)) / (samples[-1][0] - samples[0][0]) - gcmd.respond_info("Measured samples per second: %.1f, " - "configured: %.1f" % (sps, sensor_sps)) - gcmd.respond_info("Good samples: %i, Saturated samples: %i, Unique" - " values: %i" % (good_count, saturation_count, - len(set(counts)))) + gcmd.respond_info( + "Measured samples per second: %.1f, " + "configured: %.1f" % (sps, sensor_sps) + ) + gcmd.respond_info( + "Good samples: %i, Saturated samples: %i, Unique" + " values: %i" % (good_count, saturation_count, len(set(counts))) + ) max_pct = self.load_cell.counts_to_percent(max(counts)) min_pct = self.load_cell.counts_to_percent(min(counts)) - gcmd.respond_info("Sample range: [%.2f%% to %.2f%%]" - % (min_pct, max_pct)) - gcmd.respond_info("Sample range / sensor capacity: %.5f%%" - % ((max_pct - min_pct) / 2.)) + gcmd.respond_info("Sample range: [%.2f%% to %.2f%%]" % (min_pct, max_pct)) + gcmd.respond_info( + "Sample range / sensor capacity: %.5f%%" % ((max_pct - min_pct) / 2.0) + ) + # Class to guide the user through calibrating a load cell class LoadCellGuidedCalibrationHelper: def __init__(self, printer, load_cell): self.printer = printer - self.gcode = printer.lookup_object('gcode') + self.gcode = printer.lookup_object("gcode") self.load_cell = load_cell self._tare_counts = self._counts_per_gram = None - self.tare_percent = 0. + self.tare_percent = 0.0 self.register_commands() self.gcode.respond_info( "Starting load cell calibration. \n" "1.) Remove all load and run TARE. \n" "2.) Apply a known load, run CALIBRATE GRAMS=nnn. \n" "Complete calibration with the ACCEPT command.\n" - "Use the ABORT command to quit.") + "Use the ABORT command to quit." + ) - def verify_no_active_calibration(self,): + def verify_no_active_calibration( + self, + ): try: - self.gcode.register_command('TARE', 'dummy') + self.gcode.register_command("TARE", "dummy") except self.printer.config_error as e: raise self.gcode.error( - "Already Calibrating a Load Cell. Use ABORT to quit.") - self.gcode.register_command('TARE', None) + "Already Calibrating a Load Cell. Use ABORT to quit." + ) + self.gcode.register_command("TARE", None) def register_commands(self): self.verify_no_active_calibration() @@ -174,8 +210,7 @@ class LoadCellGuidedCalibrationHelper: register_command("ABORT", self.cmd_ABORT, desc=self.cmd_ABORT_help) register_command("ACCEPT", self.cmd_ACCEPT, desc=self.cmd_ACCEPT_help) register_command("TARE", self.cmd_TARE, desc=self.cmd_TARE_help) - register_command("CALIBRATE", self.cmd_CALIBRATE, - desc=self.cmd_CALIBRATE_help) + register_command("CALIBRATE", self.cmd_CALIBRATE, desc=self.cmd_CALIBRATE_help) # convert the delta of counts to a counts/gram metric def counts_per_gram(self, grams, cal_counts): @@ -185,85 +220,102 @@ class LoadCellGuidedCalibrationHelper: # given tare bias, at saturation in kilograms def capacity_kg(self, counts_per_gram): range_min, range_max = self.load_cell.saturation_range() - return (int((range_max - abs(self._tare_counts)) / counts_per_gram) - / 1000.) + return int((range_max - abs(self._tare_counts)) / counts_per_gram) / 1000.0 def finalize(self, save_results=False): - for name in ['ABORT', 'ACCEPT', 'TARE', 'CALIBRATE']: + for name in ["ABORT", "ACCEPT", "TARE", "CALIBRATE"]: self.gcode.register_command(name, None) if not save_results: self.gcode.respond_info("Load cell calibration aborted") return if self._counts_per_gram is None or self._tare_counts is None: - self.gcode.respond_info("Calibration process is incomplete, " - "aborting") + self.gcode.respond_info("Calibration process is incomplete, " "aborting") self.load_cell.set_calibration(self._counts_per_gram, self._tare_counts) - self.gcode.respond_info("Load cell calibration settings:\n\n" + self.gcode.respond_info( + "Load cell calibration settings:\n\n" "counts_per_gram: %.6f\n" "reference_tare_counts: %i\n\n" "The SAVE_CONFIG command will update the printer config file" " with the above and restart the printer." - % (self._counts_per_gram, self._tare_counts)) + % (self._counts_per_gram, self._tare_counts) + ) self.load_cell.tare(self._tare_counts) cmd_ABORT_help = "Abort load cell calibration tool" + def cmd_ABORT(self, gcmd): self.finalize(False) cmd_ACCEPT_help = "Accept calibration results and apply to load cell" + def cmd_ACCEPT(self, gcmd): self.finalize(True) cmd_TARE_help = "Tare the load cell" + def cmd_TARE(self, gcmd): self._tare_counts = self.load_cell.avg_counts() self._counts_per_gram = None # require re-calibration on tare self.tare_percent = self.load_cell.counts_to_percent(self._tare_counts) - gcmd.respond_info("Load cell tare value: %.2f%% (%i)" - % (self.tare_percent, self._tare_counts)) - if self.tare_percent > 2.: + gcmd.respond_info( + "Load cell tare value: %.2f%% (%i)" % (self.tare_percent, self._tare_counts) + ) + if self.tare_percent > 2.0: gcmd.respond_info( "WARNING: tare value is more than 2% away from 0!\n" "The load cell's range will be impacted.\n" - "Check for external force on the load cell.") - gcmd.respond_info("Now apply a known force to the load cell and enter \ - the force value with:\n CALIBRATE GRAMS=nnn") + "Check for external force on the load cell." + ) + gcmd.respond_info( + "Now apply a known force to the load cell and enter \ + the force value with:\n CALIBRATE GRAMS=nnn" + ) cmd_CALIBRATE_help = "Enter the load cell value in grams" + def cmd_CALIBRATE(self, gcmd): if self._tare_counts is None: gcmd.respond_info("You must use TARE first.") return - grams = gcmd.get_float("GRAMS", minval=50., maxval=25000.) + grams = gcmd.get_float("GRAMS", minval=50.0, maxval=25000.0) cal_counts = self.load_cell.avg_counts() cal_percent = self.load_cell.counts_to_percent(cal_counts) c_per_g = self.counts_per_gram(grams, cal_counts) cap_kg = self.capacity_kg(c_per_g) - gcmd.respond_info("Calibration value: %.2f%% (%i), Counts/gram: %.5f, \ + gcmd.respond_info( + "Calibration value: %.2f%% (%i), Counts/gram: %.5f, \ Total capacity: +/- %0.2fKg" - % (cal_percent, cal_counts, c_per_g, cap_kg)) + % (cal_percent, cal_counts, c_per_g, cap_kg) + ) range_min, range_max = self.load_cell.saturation_range() if cal_counts >= range_max or cal_counts <= range_min: raise self.printer.command_error( "ERROR: Sensor is saturated with too much load!\n" - "Use less force to calibrate the load cell.") + "Use less force to calibrate the load cell." + ) if cal_counts == self._tare_counts: raise self.printer.command_error( "ERROR: Tare and Calibration readings are the same!\n" - "Check wiring and validate sensor with READ_LOAD_CELL command.") - if (abs(cal_percent - self.tare_percent)) < 1.: + "Check wiring and validate sensor with READ_LOAD_CELL command." + ) + if (abs(cal_percent - self.tare_percent)) < 1.0: raise self.printer.command_error( "ERROR: Tare and Calibration readings are less than 1% " "different!\n" - "Use more force when calibrating or a higher sensor gain.") + "Use more force when calibrating or a higher sensor gain." + ) # only set _counts_per_gram after all errors are raised self._counts_per_gram = c_per_g - if cap_kg < 1.: - gcmd.respond_info("WARNING: Load cell capacity is less than 1kg!\n" - "Check wiring and consider using a lower sensor gain.") - if cap_kg > 25.: - gcmd.respond_info("WARNING: Load cell capacity is more than 25Kg!\n" - "Check wiring and consider using a higher sensor gain.") + if cap_kg < 1.0: + gcmd.respond_info( + "WARNING: Load cell capacity is less than 1kg!\n" + "Check wiring and consider using a lower sensor gain." + ) + if cap_kg > 25.0: + gcmd.respond_info( + "WARNING: Load cell capacity is more than 25Kg!\n" + "Check wiring and consider using a higher sensor gain." + ) gcmd.respond_info("Accept calibration with the ACCEPT command.") @@ -272,13 +324,15 @@ class LoadCellGuidedCalibrationHelper: # can collect a minimum n samples or collect until a specific print_time # samples returned in [[time],[force],[counts]] arrays for easy processing RETRY_DELAY = 0.05 # 20Hz + + class LoadCellSampleCollector: def __init__(self, printer, load_cell): self._printer = printer self._load_cell = load_cell self._reactor = printer.get_reactor() self._mcu = load_cell.sensor.get_mcu() - self.min_time = 0. + self.min_time = 0.0 self.max_time = float("inf") self.min_count = float("inf") # In Python 3.5 math.inf is better self.is_started = False @@ -289,9 +343,9 @@ class LoadCellSampleCollector: def _on_samples(self, msg): if not self.is_started: return False # already stopped, ignore - self._errors += msg['errors'] - self._overflows += msg['overflows'] - samples = msg['data'] + self._errors += msg["errors"] + self._overflows += msg["overflows"] + samples = msg["data"] for sample in samples: time = sample[0] if self.min_time <= time <= self.max_time: @@ -304,7 +358,7 @@ class LoadCellSampleCollector: def _finish_collecting(self): self.is_started = False - self.min_time = 0. + self.min_time = 0.0 self.max_time = float("inf") self.min_count = float("inf") # In Python 3.5 math.inf is better samples = self._samples @@ -323,7 +377,8 @@ class LoadCellSampleCollector: self._finish_collecting() raise self._printer.command_error( "LoadCellSampleCollector timed out! Errors: %i," - " Overflows: %i" % (self._errors, self._overflows)) + " Overflows: %i" % (self._errors, self._overflows) + ) self._reactor.pause(now + RETRY_DELAY) return self._finish_collecting() @@ -348,38 +403,45 @@ class LoadCellSampleCollector: print_time = self._mcu.estimated_print_time(self._reactor.monotonic()) start_time = max(print_time, self.min_time) sps = self._load_cell.sensor.get_samples_per_second() - return self._collect_until(start_time + 1. + (min_count / sps)) + return self._collect_until(start_time + 1.0 + (min_count / sps)) # returns when a sample is collected with a timestamp after print_time def collect_until(self, print_time=None): self.max_time = print_time if len(self._samples) and self._samples[-1][0] >= print_time: return self._finish_collecting() - return self._collect_until(self.max_time + 1.) + return self._collect_until(self.max_time + 1.0) + # Printer class that controls the load cell -MIN_COUNTS_PER_GRAM = 1. +MIN_COUNTS_PER_GRAM = 1.0 + + class LoadCell: def __init__(self, config, sensor): self.printer = printer = config.get_printer() self.config_name = config.get_name() self.name = config.get_name().split()[-1] - self.sensor = sensor # must implement BulkSensorAdc + self.sensor = sensor # must implement BulkSensorAdc buffer_size = sensor.get_samples_per_second() // 2 self._force_buffer = collections.deque(maxlen=buffer_size) - self.reference_tare_counts = config.getint('reference_tare_counts', - default=None) + self.reference_tare_counts = config.getint( + "reference_tare_counts", default=None + ) self.tare_counts = self.reference_tare_counts - self.counts_per_gram = config.getfloat('counts_per_gram', - minval=MIN_COUNTS_PER_GRAM, default=None) - self.invert = config.getchoice('sensor_orientation', - {'normal': 1., 'inverted': -1.}, default="normal") + self.counts_per_gram = config.getfloat( + "counts_per_gram", minval=MIN_COUNTS_PER_GRAM, default=None + ) + self.invert = config.getchoice( + "sensor_orientation", {"normal": 1.0, "inverted": -1.0}, default="normal" + ) LoadCellCommandHelper(config, self) # Client support: self.clients = ApiClientHelper(printer) header = {"header": ["time", "force (g)", "counts", "tare_counts"]} - self.clients.add_mux_endpoint("load_cell/dump_force", - "load_cell", self.name, header) + self.clients.add_mux_endpoint( + "load_cell/dump_force", "load_cell", self.name, header + ) # startup, when klippy is ready, start capturing data printer.register_event_handler("klippy:ready", self._handle_ready) @@ -402,9 +464,10 @@ class LoadCell: samples = [] for row in data: # [time, grams, counts, tare_counts] - samples.append([row[0], self.counts_to_grams(row[1]), row[1], - self.tare_counts]) - msg = {'data': samples, 'errors': errors, 'overflows': overflows} + samples.append( + [row[0], self.counts_to_grams(row[1]), row[1], self.tare_counts] + ) + msg = {"data": samples, "errors": errors, "overflows": overflows} self.clients.send(msg) return True @@ -417,18 +480,21 @@ class LoadCell: self.printer.send_event("load_cell:tare", self) def set_calibration(self, counts_per_gram, tare_counts): - if (counts_per_gram is None - or abs(counts_per_gram) < MIN_COUNTS_PER_GRAM): + if counts_per_gram is None or abs(counts_per_gram) < MIN_COUNTS_PER_GRAM: raise self.printer.command_error("Invalid counts per gram value") if tare_counts is None: raise self.printer.command_error("Missing tare counts") self.counts_per_gram = counts_per_gram self.reference_tare_counts = int(tare_counts) - configfile = self.printer.lookup_object('configfile') - configfile.set(self.config_name, 'counts_per_gram', - "%.5f" % (self.counts_per_gram,)) - configfile.set(self.config_name, 'reference_tare_counts', - "%i" % (self.reference_tare_counts,)) + configfile = self.printer.lookup_object("configfile") + configfile.set( + self.config_name, "counts_per_gram", "%.5f" % (self.counts_per_gram,) + ) + configfile.set( + self.config_name, + "reference_tare_counts", + "%i" % (self.reference_tare_counts,), + ) self.printer.send_event("load_cell:calibrate", self) def counts_to_grams(self, sample): @@ -444,7 +510,7 @@ class LoadCell: # convert raw counts to a +/- percentage of the sensors range def counts_to_percent(self, counts): range_min, range_max = self.saturation_range() - return (float(counts) / float(range_max)) * 100. + return (float(counts) / float(range_max)) * 100.0 # read 1 second of load cell data and average it # performs safety checks for saturation @@ -454,40 +520,41 @@ class LoadCell: samples, errors = self.get_collector().collect_min(num_samples) if errors: raise self.printer.command_error( - "Sensor reported %i errors while sampling" - % (errors[0] + errors[1])) + "Sensor reported %i errors while sampling" % (errors[0] + errors[1]) + ) # check samples for saturated readings range_min, range_max = self.saturation_range() for sample in samples: if sample[2] >= range_max or sample[2] <= range_min: - raise self.printer.command_error( - "Some samples are saturated (+/-100%)") + raise self.printer.command_error("Some samples are saturated (+/-100%)") return avg(select_column(samples, 2)) # Provide ongoing force tracking/averaging for status updates def _track_force(self, msg): if not (self.is_calibrated() and self.is_tared()): return True - samples = msg['data'] + samples = msg["data"] # selectColumn unusable here because Python 2 lacks deque.extend for sample in samples: self._force_buffer.append(sample[1]) return True def _force_g(self): - if (self.is_calibrated() and self.is_tared() - and len(self._force_buffer) > 0): - return {"force_g": round(avg(self._force_buffer), 1), - "min_force_g": round(min(self._force_buffer), 1), - "max_force_g": round(max(self._force_buffer), 1)} + if self.is_calibrated() and self.is_tared() and len(self._force_buffer) > 0: + return { + "force_g": round(avg(self._force_buffer), 1), + "min_force_g": round(min(self._force_buffer), 1), + "max_force_g": round(max(self._force_buffer), 1), + } return {} def is_tared(self): return self.tare_counts is not None def is_calibrated(self): - return (self.counts_per_gram is not None - and self.reference_tare_counts is not None) + return ( + self.counts_per_gram is not None and self.reference_tare_counts is not None + ) def get_sensor(self): return self.sensor @@ -506,10 +573,14 @@ class LoadCell: def get_status(self, eventtime): status = self._force_g() - status.update({'is_calibrated': self.is_calibrated(), - 'counts_per_gram': self.counts_per_gram, - 'reference_tare_counts': self.reference_tare_counts, - 'tare_counts': self.tare_counts}) + status.update( + { + "is_calibrated": self.is_calibrated(), + "counts_per_gram": self.counts_per_gram, + "reference_tare_counts": self.reference_tare_counts, + "tare_counts": self.tare_counts, + } + ) return status @@ -518,8 +589,9 @@ def load_config(config): sensors = {} sensors.update(hx71x.HX71X_SENSOR_TYPES) sensors.update(ads1220.ADS1220_SENSOR_TYPE) - sensor_class = config.getchoice('sensor_type', sensors) + sensor_class = config.getchoice("sensor_type", sensors) return LoadCell(config, sensor_class(config)) + def load_config_prefix(config): return load_config(config) |