aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/extras/load_cell.py
diff options
context:
space:
mode:
Diffstat (limited to 'klippy/extras/load_cell.py')
-rw-r--r--klippy/extras/load_cell.py294
1 files changed, 183 insertions, 111 deletions
diff --git a/klippy/extras/load_cell.py b/klippy/extras/load_cell.py
index 5ef2c5b7..3481c06e 100644
--- a/klippy/extras/load_cell.py
+++ b/klippy/extras/load_cell.py
@@ -8,20 +8,24 @@ from . import hx71x
from . import ads1220
from .bulk_sensor import BatchWebhooksClient
import collections, itertools
+
# We want either Python 3's zip() or Python 2's izip() but NOT 2's zip():
zip_impl = zip
try:
- from itertools import izip as zip_impl # python 2.x izip
-except ImportError: # will be Python 3.x
+ from itertools import izip as zip_impl # python 2.x izip
+except ImportError: # will be Python 3.x
pass
+
# alternative to numpy's column selection:
def select_column(data, column_idx):
return list(zip_impl(*data))[column_idx]
+
def avg(data):
return sum(data) / len(data)
+
# Helper for event driven webhooks and subscription based API clients
class ApiClientHelper(object):
def __init__(self, printer):
@@ -50,9 +54,10 @@ class ApiClientHelper(object):
# Set up a webhooks endpoint with a static header
def add_mux_endpoint(self, path, key, value, webhooks_start_resp):
self.webhooks_start_resp = webhooks_start_resp
- wh = self.printer.lookup_object('webhooks')
+ wh = self.printer.lookup_object("webhooks")
wh.register_mux_endpoint(path, key, value, self._add_webhooks_client)
+
# Class for handling commands related to load cells
class LoadCellCommandHelper:
def __init__(self, config, load_cell):
@@ -66,33 +71,53 @@ class LoadCellCommandHelper:
def register_commands(self, name):
# Register commands
- gcode = self.printer.lookup_object('gcode')
- gcode.register_mux_command("LOAD_CELL_TARE", "LOAD_CELL", name,
- self.cmd_LOAD_CELL_TARE,
- desc=self.cmd_LOAD_CELL_TARE_help)
- gcode.register_mux_command("LOAD_CELL_CALIBRATE", "LOAD_CELL", name,
- self.cmd_LOAD_CELL_CALIBRATE,
- desc=self.cmd_CALIBRATE_LOAD_CELL_help)
- gcode.register_mux_command("LOAD_CELL_READ", "LOAD_CELL", name,
- self.cmd_LOAD_CELL_READ,
- desc=self.cmd_LOAD_CELL_READ_help)
- gcode.register_mux_command("LOAD_CELL_DIAGNOSTIC", "LOAD_CELL", name,
- self.cmd_LOAD_CELL_DIAGNOSTIC,
- desc=self.cmd_LOAD_CELL_DIAGNOSTIC_help)
+ gcode = self.printer.lookup_object("gcode")
+ gcode.register_mux_command(
+ "LOAD_CELL_TARE",
+ "LOAD_CELL",
+ name,
+ self.cmd_LOAD_CELL_TARE,
+ desc=self.cmd_LOAD_CELL_TARE_help,
+ )
+ gcode.register_mux_command(
+ "LOAD_CELL_CALIBRATE",
+ "LOAD_CELL",
+ name,
+ self.cmd_LOAD_CELL_CALIBRATE,
+ desc=self.cmd_CALIBRATE_LOAD_CELL_help,
+ )
+ gcode.register_mux_command(
+ "LOAD_CELL_READ",
+ "LOAD_CELL",
+ name,
+ self.cmd_LOAD_CELL_READ,
+ desc=self.cmd_LOAD_CELL_READ_help,
+ )
+ gcode.register_mux_command(
+ "LOAD_CELL_DIAGNOSTIC",
+ "LOAD_CELL",
+ name,
+ self.cmd_LOAD_CELL_DIAGNOSTIC,
+ desc=self.cmd_LOAD_CELL_DIAGNOSTIC_help,
+ )
cmd_LOAD_CELL_TARE_help = "Set the Zero point of the load cell"
+
def cmd_LOAD_CELL_TARE(self, gcmd):
tare_counts = self.load_cell.avg_counts()
self.load_cell.tare(tare_counts)
tare_percent = self.load_cell.counts_to_percent(tare_counts)
- gcmd.respond_info("Load cell tare value: %.2f%% (%i)"
- % (tare_percent, tare_counts))
+ gcmd.respond_info(
+ "Load cell tare value: %.2f%% (%i)" % (tare_percent, tare_counts)
+ )
cmd_CALIBRATE_LOAD_CELL_help = "Start interactive calibration tool"
+
def cmd_LOAD_CELL_CALIBRATE(self, gcmd):
LoadCellGuidedCalibrationHelper(self.printer, self.load_cell)
cmd_LOAD_CELL_READ_help = "Take a reading from the load cell"
+
def cmd_LOAD_CELL_READ(self, gcmd):
counts = self.load_cell.avg_counts()
percent = self.load_cell.counts_to_percent(counts)
@@ -105,16 +130,19 @@ class LoadCellCommandHelper:
gcmd.respond_info("%.1fg (%.2f%%)" % (force, percent))
cmd_LOAD_CELL_DIAGNOSTIC_help = "Check the health of the load cell"
+
def cmd_LOAD_CELL_DIAGNOSTIC(self, gcmd):
gcmd.respond_info("Collecting load cell data for 10 seconds...")
collector = self.load_cell.get_collector()
reactor = self.printer.get_reactor()
collector.start_collecting()
- reactor.pause(reactor.monotonic() + 10.)
+ reactor.pause(reactor.monotonic() + 10.0)
samples, errors = collector.stop_collecting()
if errors:
- gcmd.respond_info("Sensor reported errors: %i errors,"
- " %i overflows" % (errors[0], errors[1]))
+ gcmd.respond_info(
+ "Sensor reported errors: %i errors,"
+ " %i overflows" % (errors[0], errors[1])
+ )
else:
gcmd.respond_info("Sensor reported no errors")
if not samples:
@@ -132,41 +160,49 @@ class LoadCellCommandHelper:
if len(samples) > 2:
sensor_sps = self.load_cell.sensor.get_samples_per_second()
sps = float(len(samples)) / (samples[-1][0] - samples[0][0])
- gcmd.respond_info("Measured samples per second: %.1f, "
- "configured: %.1f" % (sps, sensor_sps))
- gcmd.respond_info("Good samples: %i, Saturated samples: %i, Unique"
- " values: %i" % (good_count, saturation_count,
- len(set(counts))))
+ gcmd.respond_info(
+ "Measured samples per second: %.1f, "
+ "configured: %.1f" % (sps, sensor_sps)
+ )
+ gcmd.respond_info(
+ "Good samples: %i, Saturated samples: %i, Unique"
+ " values: %i" % (good_count, saturation_count, len(set(counts)))
+ )
max_pct = self.load_cell.counts_to_percent(max(counts))
min_pct = self.load_cell.counts_to_percent(min(counts))
- gcmd.respond_info("Sample range: [%.2f%% to %.2f%%]"
- % (min_pct, max_pct))
- gcmd.respond_info("Sample range / sensor capacity: %.5f%%"
- % ((max_pct - min_pct) / 2.))
+ gcmd.respond_info("Sample range: [%.2f%% to %.2f%%]" % (min_pct, max_pct))
+ gcmd.respond_info(
+ "Sample range / sensor capacity: %.5f%%" % ((max_pct - min_pct) / 2.0)
+ )
+
# Class to guide the user through calibrating a load cell
class LoadCellGuidedCalibrationHelper:
def __init__(self, printer, load_cell):
self.printer = printer
- self.gcode = printer.lookup_object('gcode')
+ self.gcode = printer.lookup_object("gcode")
self.load_cell = load_cell
self._tare_counts = self._counts_per_gram = None
- self.tare_percent = 0.
+ self.tare_percent = 0.0
self.register_commands()
self.gcode.respond_info(
"Starting load cell calibration. \n"
"1.) Remove all load and run TARE. \n"
"2.) Apply a known load, run CALIBRATE GRAMS=nnn. \n"
"Complete calibration with the ACCEPT command.\n"
- "Use the ABORT command to quit.")
+ "Use the ABORT command to quit."
+ )
- def verify_no_active_calibration(self,):
+ def verify_no_active_calibration(
+ self,
+ ):
try:
- self.gcode.register_command('TARE', 'dummy')
+ self.gcode.register_command("TARE", "dummy")
except self.printer.config_error as e:
raise self.gcode.error(
- "Already Calibrating a Load Cell. Use ABORT to quit.")
- self.gcode.register_command('TARE', None)
+ "Already Calibrating a Load Cell. Use ABORT to quit."
+ )
+ self.gcode.register_command("TARE", None)
def register_commands(self):
self.verify_no_active_calibration()
@@ -174,8 +210,7 @@ class LoadCellGuidedCalibrationHelper:
register_command("ABORT", self.cmd_ABORT, desc=self.cmd_ABORT_help)
register_command("ACCEPT", self.cmd_ACCEPT, desc=self.cmd_ACCEPT_help)
register_command("TARE", self.cmd_TARE, desc=self.cmd_TARE_help)
- register_command("CALIBRATE", self.cmd_CALIBRATE,
- desc=self.cmd_CALIBRATE_help)
+ register_command("CALIBRATE", self.cmd_CALIBRATE, desc=self.cmd_CALIBRATE_help)
# convert the delta of counts to a counts/gram metric
def counts_per_gram(self, grams, cal_counts):
@@ -185,85 +220,102 @@ class LoadCellGuidedCalibrationHelper:
# given tare bias, at saturation in kilograms
def capacity_kg(self, counts_per_gram):
range_min, range_max = self.load_cell.saturation_range()
- return (int((range_max - abs(self._tare_counts)) / counts_per_gram)
- / 1000.)
+ return int((range_max - abs(self._tare_counts)) / counts_per_gram) / 1000.0
def finalize(self, save_results=False):
- for name in ['ABORT', 'ACCEPT', 'TARE', 'CALIBRATE']:
+ for name in ["ABORT", "ACCEPT", "TARE", "CALIBRATE"]:
self.gcode.register_command(name, None)
if not save_results:
self.gcode.respond_info("Load cell calibration aborted")
return
if self._counts_per_gram is None or self._tare_counts is None:
- self.gcode.respond_info("Calibration process is incomplete, "
- "aborting")
+ self.gcode.respond_info("Calibration process is incomplete, " "aborting")
self.load_cell.set_calibration(self._counts_per_gram, self._tare_counts)
- self.gcode.respond_info("Load cell calibration settings:\n\n"
+ self.gcode.respond_info(
+ "Load cell calibration settings:\n\n"
"counts_per_gram: %.6f\n"
"reference_tare_counts: %i\n\n"
"The SAVE_CONFIG command will update the printer config file"
" with the above and restart the printer."
- % (self._counts_per_gram, self._tare_counts))
+ % (self._counts_per_gram, self._tare_counts)
+ )
self.load_cell.tare(self._tare_counts)
cmd_ABORT_help = "Abort load cell calibration tool"
+
def cmd_ABORT(self, gcmd):
self.finalize(False)
cmd_ACCEPT_help = "Accept calibration results and apply to load cell"
+
def cmd_ACCEPT(self, gcmd):
self.finalize(True)
cmd_TARE_help = "Tare the load cell"
+
def cmd_TARE(self, gcmd):
self._tare_counts = self.load_cell.avg_counts()
self._counts_per_gram = None # require re-calibration on tare
self.tare_percent = self.load_cell.counts_to_percent(self._tare_counts)
- gcmd.respond_info("Load cell tare value: %.2f%% (%i)"
- % (self.tare_percent, self._tare_counts))
- if self.tare_percent > 2.:
+ gcmd.respond_info(
+ "Load cell tare value: %.2f%% (%i)" % (self.tare_percent, self._tare_counts)
+ )
+ if self.tare_percent > 2.0:
gcmd.respond_info(
"WARNING: tare value is more than 2% away from 0!\n"
"The load cell's range will be impacted.\n"
- "Check for external force on the load cell.")
- gcmd.respond_info("Now apply a known force to the load cell and enter \
- the force value with:\n CALIBRATE GRAMS=nnn")
+ "Check for external force on the load cell."
+ )
+ gcmd.respond_info(
+ "Now apply a known force to the load cell and enter \
+ the force value with:\n CALIBRATE GRAMS=nnn"
+ )
cmd_CALIBRATE_help = "Enter the load cell value in grams"
+
def cmd_CALIBRATE(self, gcmd):
if self._tare_counts is None:
gcmd.respond_info("You must use TARE first.")
return
- grams = gcmd.get_float("GRAMS", minval=50., maxval=25000.)
+ grams = gcmd.get_float("GRAMS", minval=50.0, maxval=25000.0)
cal_counts = self.load_cell.avg_counts()
cal_percent = self.load_cell.counts_to_percent(cal_counts)
c_per_g = self.counts_per_gram(grams, cal_counts)
cap_kg = self.capacity_kg(c_per_g)
- gcmd.respond_info("Calibration value: %.2f%% (%i), Counts/gram: %.5f, \
+ gcmd.respond_info(
+ "Calibration value: %.2f%% (%i), Counts/gram: %.5f, \
Total capacity: +/- %0.2fKg"
- % (cal_percent, cal_counts, c_per_g, cap_kg))
+ % (cal_percent, cal_counts, c_per_g, cap_kg)
+ )
range_min, range_max = self.load_cell.saturation_range()
if cal_counts >= range_max or cal_counts <= range_min:
raise self.printer.command_error(
"ERROR: Sensor is saturated with too much load!\n"
- "Use less force to calibrate the load cell.")
+ "Use less force to calibrate the load cell."
+ )
if cal_counts == self._tare_counts:
raise self.printer.command_error(
"ERROR: Tare and Calibration readings are the same!\n"
- "Check wiring and validate sensor with READ_LOAD_CELL command.")
- if (abs(cal_percent - self.tare_percent)) < 1.:
+ "Check wiring and validate sensor with READ_LOAD_CELL command."
+ )
+ if (abs(cal_percent - self.tare_percent)) < 1.0:
raise self.printer.command_error(
"ERROR: Tare and Calibration readings are less than 1% "
"different!\n"
- "Use more force when calibrating or a higher sensor gain.")
+ "Use more force when calibrating or a higher sensor gain."
+ )
# only set _counts_per_gram after all errors are raised
self._counts_per_gram = c_per_g
- if cap_kg < 1.:
- gcmd.respond_info("WARNING: Load cell capacity is less than 1kg!\n"
- "Check wiring and consider using a lower sensor gain.")
- if cap_kg > 25.:
- gcmd.respond_info("WARNING: Load cell capacity is more than 25Kg!\n"
- "Check wiring and consider using a higher sensor gain.")
+ if cap_kg < 1.0:
+ gcmd.respond_info(
+ "WARNING: Load cell capacity is less than 1kg!\n"
+ "Check wiring and consider using a lower sensor gain."
+ )
+ if cap_kg > 25.0:
+ gcmd.respond_info(
+ "WARNING: Load cell capacity is more than 25Kg!\n"
+ "Check wiring and consider using a higher sensor gain."
+ )
gcmd.respond_info("Accept calibration with the ACCEPT command.")
@@ -272,13 +324,15 @@ class LoadCellGuidedCalibrationHelper:
# can collect a minimum n samples or collect until a specific print_time
# samples returned in [[time],[force],[counts]] arrays for easy processing
RETRY_DELAY = 0.05 # 20Hz
+
+
class LoadCellSampleCollector:
def __init__(self, printer, load_cell):
self._printer = printer
self._load_cell = load_cell
self._reactor = printer.get_reactor()
self._mcu = load_cell.sensor.get_mcu()
- self.min_time = 0.
+ self.min_time = 0.0
self.max_time = float("inf")
self.min_count = float("inf") # In Python 3.5 math.inf is better
self.is_started = False
@@ -289,9 +343,9 @@ class LoadCellSampleCollector:
def _on_samples(self, msg):
if not self.is_started:
return False # already stopped, ignore
- self._errors += msg['errors']
- self._overflows += msg['overflows']
- samples = msg['data']
+ self._errors += msg["errors"]
+ self._overflows += msg["overflows"]
+ samples = msg["data"]
for sample in samples:
time = sample[0]
if self.min_time <= time <= self.max_time:
@@ -304,7 +358,7 @@ class LoadCellSampleCollector:
def _finish_collecting(self):
self.is_started = False
- self.min_time = 0.
+ self.min_time = 0.0
self.max_time = float("inf")
self.min_count = float("inf") # In Python 3.5 math.inf is better
samples = self._samples
@@ -323,7 +377,8 @@ class LoadCellSampleCollector:
self._finish_collecting()
raise self._printer.command_error(
"LoadCellSampleCollector timed out! Errors: %i,"
- " Overflows: %i" % (self._errors, self._overflows))
+ " Overflows: %i" % (self._errors, self._overflows)
+ )
self._reactor.pause(now + RETRY_DELAY)
return self._finish_collecting()
@@ -348,38 +403,45 @@ class LoadCellSampleCollector:
print_time = self._mcu.estimated_print_time(self._reactor.monotonic())
start_time = max(print_time, self.min_time)
sps = self._load_cell.sensor.get_samples_per_second()
- return self._collect_until(start_time + 1. + (min_count / sps))
+ return self._collect_until(start_time + 1.0 + (min_count / sps))
# returns when a sample is collected with a timestamp after print_time
def collect_until(self, print_time=None):
self.max_time = print_time
if len(self._samples) and self._samples[-1][0] >= print_time:
return self._finish_collecting()
- return self._collect_until(self.max_time + 1.)
+ return self._collect_until(self.max_time + 1.0)
+
# Printer class that controls the load cell
-MIN_COUNTS_PER_GRAM = 1.
+MIN_COUNTS_PER_GRAM = 1.0
+
+
class LoadCell:
def __init__(self, config, sensor):
self.printer = printer = config.get_printer()
self.config_name = config.get_name()
self.name = config.get_name().split()[-1]
- self.sensor = sensor # must implement BulkSensorAdc
+ self.sensor = sensor # must implement BulkSensorAdc
buffer_size = sensor.get_samples_per_second() // 2
self._force_buffer = collections.deque(maxlen=buffer_size)
- self.reference_tare_counts = config.getint('reference_tare_counts',
- default=None)
+ self.reference_tare_counts = config.getint(
+ "reference_tare_counts", default=None
+ )
self.tare_counts = self.reference_tare_counts
- self.counts_per_gram = config.getfloat('counts_per_gram',
- minval=MIN_COUNTS_PER_GRAM, default=None)
- self.invert = config.getchoice('sensor_orientation',
- {'normal': 1., 'inverted': -1.}, default="normal")
+ self.counts_per_gram = config.getfloat(
+ "counts_per_gram", minval=MIN_COUNTS_PER_GRAM, default=None
+ )
+ self.invert = config.getchoice(
+ "sensor_orientation", {"normal": 1.0, "inverted": -1.0}, default="normal"
+ )
LoadCellCommandHelper(config, self)
# Client support:
self.clients = ApiClientHelper(printer)
header = {"header": ["time", "force (g)", "counts", "tare_counts"]}
- self.clients.add_mux_endpoint("load_cell/dump_force",
- "load_cell", self.name, header)
+ self.clients.add_mux_endpoint(
+ "load_cell/dump_force", "load_cell", self.name, header
+ )
# startup, when klippy is ready, start capturing data
printer.register_event_handler("klippy:ready", self._handle_ready)
@@ -402,9 +464,10 @@ class LoadCell:
samples = []
for row in data:
# [time, grams, counts, tare_counts]
- samples.append([row[0], self.counts_to_grams(row[1]), row[1],
- self.tare_counts])
- msg = {'data': samples, 'errors': errors, 'overflows': overflows}
+ samples.append(
+ [row[0], self.counts_to_grams(row[1]), row[1], self.tare_counts]
+ )
+ msg = {"data": samples, "errors": errors, "overflows": overflows}
self.clients.send(msg)
return True
@@ -417,18 +480,21 @@ class LoadCell:
self.printer.send_event("load_cell:tare", self)
def set_calibration(self, counts_per_gram, tare_counts):
- if (counts_per_gram is None
- or abs(counts_per_gram) < MIN_COUNTS_PER_GRAM):
+ if counts_per_gram is None or abs(counts_per_gram) < MIN_COUNTS_PER_GRAM:
raise self.printer.command_error("Invalid counts per gram value")
if tare_counts is None:
raise self.printer.command_error("Missing tare counts")
self.counts_per_gram = counts_per_gram
self.reference_tare_counts = int(tare_counts)
- configfile = self.printer.lookup_object('configfile')
- configfile.set(self.config_name, 'counts_per_gram',
- "%.5f" % (self.counts_per_gram,))
- configfile.set(self.config_name, 'reference_tare_counts',
- "%i" % (self.reference_tare_counts,))
+ configfile = self.printer.lookup_object("configfile")
+ configfile.set(
+ self.config_name, "counts_per_gram", "%.5f" % (self.counts_per_gram,)
+ )
+ configfile.set(
+ self.config_name,
+ "reference_tare_counts",
+ "%i" % (self.reference_tare_counts,),
+ )
self.printer.send_event("load_cell:calibrate", self)
def counts_to_grams(self, sample):
@@ -444,7 +510,7 @@ class LoadCell:
# convert raw counts to a +/- percentage of the sensors range
def counts_to_percent(self, counts):
range_min, range_max = self.saturation_range()
- return (float(counts) / float(range_max)) * 100.
+ return (float(counts) / float(range_max)) * 100.0
# read 1 second of load cell data and average it
# performs safety checks for saturation
@@ -454,40 +520,41 @@ class LoadCell:
samples, errors = self.get_collector().collect_min(num_samples)
if errors:
raise self.printer.command_error(
- "Sensor reported %i errors while sampling"
- % (errors[0] + errors[1]))
+ "Sensor reported %i errors while sampling" % (errors[0] + errors[1])
+ )
# check samples for saturated readings
range_min, range_max = self.saturation_range()
for sample in samples:
if sample[2] >= range_max or sample[2] <= range_min:
- raise self.printer.command_error(
- "Some samples are saturated (+/-100%)")
+ raise self.printer.command_error("Some samples are saturated (+/-100%)")
return avg(select_column(samples, 2))
# Provide ongoing force tracking/averaging for status updates
def _track_force(self, msg):
if not (self.is_calibrated() and self.is_tared()):
return True
- samples = msg['data']
+ samples = msg["data"]
# selectColumn unusable here because Python 2 lacks deque.extend
for sample in samples:
self._force_buffer.append(sample[1])
return True
def _force_g(self):
- if (self.is_calibrated() and self.is_tared()
- and len(self._force_buffer) > 0):
- return {"force_g": round(avg(self._force_buffer), 1),
- "min_force_g": round(min(self._force_buffer), 1),
- "max_force_g": round(max(self._force_buffer), 1)}
+ if self.is_calibrated() and self.is_tared() and len(self._force_buffer) > 0:
+ return {
+ "force_g": round(avg(self._force_buffer), 1),
+ "min_force_g": round(min(self._force_buffer), 1),
+ "max_force_g": round(max(self._force_buffer), 1),
+ }
return {}
def is_tared(self):
return self.tare_counts is not None
def is_calibrated(self):
- return (self.counts_per_gram is not None
- and self.reference_tare_counts is not None)
+ return (
+ self.counts_per_gram is not None and self.reference_tare_counts is not None
+ )
def get_sensor(self):
return self.sensor
@@ -506,10 +573,14 @@ class LoadCell:
def get_status(self, eventtime):
status = self._force_g()
- status.update({'is_calibrated': self.is_calibrated(),
- 'counts_per_gram': self.counts_per_gram,
- 'reference_tare_counts': self.reference_tare_counts,
- 'tare_counts': self.tare_counts})
+ status.update(
+ {
+ "is_calibrated": self.is_calibrated(),
+ "counts_per_gram": self.counts_per_gram,
+ "reference_tare_counts": self.reference_tare_counts,
+ "tare_counts": self.tare_counts,
+ }
+ )
return status
@@ -518,8 +589,9 @@ def load_config(config):
sensors = {}
sensors.update(hx71x.HX71X_SENSOR_TYPES)
sensors.update(ads1220.ADS1220_SENSOR_TYPE)
- sensor_class = config.getchoice('sensor_type', sensors)
+ sensor_class = config.getchoice("sensor_type", sensors)
return LoadCell(config, sensor_class(config))
+
def load_config_prefix(config):
return load_config(config)