aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/extras/hx71x.py
diff options
context:
space:
mode:
Diffstat (limited to 'klippy/extras/hx71x.py')
-rw-r--r--klippy/extras/hx71x.py170
1 files changed, 170 insertions, 0 deletions
diff --git a/klippy/extras/hx71x.py b/klippy/extras/hx71x.py
new file mode 100644
index 00000000..1b1128ac
--- /dev/null
+++ b/klippy/extras/hx71x.py
@@ -0,0 +1,170 @@
+# HX711/HX717 Support
+#
+# Copyright (C) 2024 Gareth Farrington <gareth@waves.ky>
+#
+# This file may be distributed under the terms of the GNU GPLv3 license.
+import logging
+from . import bulk_sensor
+
+#
+# Constants
+#
+UPDATE_INTERVAL = 0.10
+SAMPLE_ERROR_DESYNC = -0x80000000
+SAMPLE_ERROR_LONG_READ = 0x40000000
+
+# Implementation of HX711 and HX717
+class HX71xBase():
+ def __init__(self, config, sensor_type,
+ sample_rate_options, default_sample_rate,
+ gain_options, default_gain):
+ self.printer = printer = config.get_printer()
+ self.name = config.get_name().split()[-1]
+ self.last_error_count = 0
+ self.consecutive_fails = 0
+ self.sensor_type = sensor_type
+ # Chip options
+ dout_pin_name = config.get('dout_pin')
+ sclk_pin_name = config.get('sclk_pin')
+ ppins = printer.lookup_object('pins')
+ dout_ppin = ppins.lookup_pin(dout_pin_name)
+ sclk_ppin = ppins.lookup_pin(sclk_pin_name)
+ self.mcu = mcu = dout_ppin['chip']
+ self.oid = mcu.create_oid()
+ if sclk_ppin['chip'] is not mcu:
+ raise config.error("%s config error: All pins must be "
+ "connected to the same MCU" % (self.name,))
+ self.dout_pin = dout_ppin['pin']
+ self.sclk_pin = sclk_ppin['pin']
+ # Samples per second choices
+ self.sps = config.getchoice('sample_rate', sample_rate_options,
+ default=default_sample_rate)
+ # gain/channel choices
+ self.gain_channel = int(config.getchoice('gain', gain_options,
+ default=default_gain))
+ ## Bulk Sensor Setup
+ self.bulk_queue = bulk_sensor.BulkDataQueue(mcu, oid=self.oid)
+ # Clock tracking
+ chip_smooth = self.sps * UPDATE_INTERVAL * 2
+ self.ffreader = bulk_sensor.FixedFreqReader(mcu, chip_smooth, "<i")
+ # Process messages in batches
+ self.batch_bulk = bulk_sensor.BatchBulkHelper(
+ self.printer, self._process_batch, self._start_measurements,
+ self._finish_measurements, UPDATE_INTERVAL)
+ # publish raw samples to the socket
+ dump_path = "%s/dump_%s" % (sensor_type, sensor_type)
+ self.batch_bulk.add_mux_endpoint(dump_path, "sensor", self.name,
+ {'header': ('time', 'counts')})
+ # Command Configuration
+ self.query_hx71x_cmd = None
+ mcu.add_config_cmd(
+ "config_hx71x oid=%d gain_channel=%d dout_pin=%s sclk_pin=%s"
+ % (self.oid, self.gain_channel, self.dout_pin, self.sclk_pin))
+ mcu.add_config_cmd("query_hx71x oid=%d rest_ticks=0"
+ % (self.oid,), on_restart=True)
+
+ mcu.register_config_callback(self._build_config)
+
+ def _build_config(self):
+ self.query_hx71x_cmd = self.mcu.lookup_command(
+ "query_hx71x oid=%c rest_ticks=%u")
+ self.ffreader.setup_query_command("query_hx71x_status oid=%c",
+ oid=self.oid,
+ cq=self.mcu.alloc_command_queue())
+
+ def get_mcu(self):
+ return self.mcu
+
+ def get_samples_per_second(self):
+ return self.sps
+
+ # returns a tuple of the minimum and maximum value of the sensor, used to
+ # detect if a data value is saturated
+ def get_range(self):
+ return -0x800000, 0x7FFFFF
+
+ # add_client interface, direct pass through to bulk_sensor API
+ def add_client(self, callback):
+ self.batch_bulk.add_client(callback)
+
+ # Measurement decoding
+ def _convert_samples(self, samples):
+ adc_factor = 1. / (1 << 23)
+ count = 0
+ for ptime, val in samples:
+ if val == SAMPLE_ERROR_DESYNC or val == SAMPLE_ERROR_LONG_READ:
+ self.last_error_count += 1
+ break # additional errors are duplicates
+ samples[count] = (round(ptime, 6), val, round(val * adc_factor, 9))
+ count += 1
+ del samples[count:]
+
+ # Start, stop, and process message batches
+ def _start_measurements(self):
+ self.consecutive_fails = 0
+ self.last_error_count = 0
+ # Start bulk reading
+ rest_ticks = self.mcu.seconds_to_clock(1. / (10. * self.sps))
+ self.query_hx71x_cmd.send([self.oid, rest_ticks])
+ logging.info("%s starting '%s' measurements",
+ self.sensor_type, self.name)
+ # Initialize clock tracking
+ self.ffreader.note_start()
+
+ def _finish_measurements(self):
+ # don't use serial connection after shutdown
+ if self.printer.is_shutdown():
+ return
+ # Halt bulk reading
+ self.query_hx71x_cmd.send_wait_ack([self.oid, 0])
+ self.ffreader.note_end()
+ logging.info("%s finished '%s' measurements",
+ self.sensor_type, self.name)
+
+ def _process_batch(self, eventtime):
+ prev_overflows = self.ffreader.get_last_overflows()
+ prev_error_count = self.last_error_count
+ samples = self.ffreader.pull_samples()
+ self._convert_samples(samples)
+ overflows = self.ffreader.get_last_overflows() - prev_overflows
+ errors = self.last_error_count - prev_error_count
+ if errors > 0:
+ logging.error("%s: Forced sensor restart due to error", self.name)
+ self._finish_measurements()
+ self._start_measurements()
+ elif overflows > 0:
+ self.consecutive_fails += 1
+ if self.consecutive_fails > 4:
+ logging.error("%s: Forced sensor restart due to overflows",
+ self.name)
+ self._finish_measurements()
+ self._start_measurements()
+ else:
+ self.consecutive_fails = 0
+ return {'data': samples, 'errors': self.last_error_count,
+ 'overflows': self.ffreader.get_last_overflows()}
+
+
+class HX711(HX71xBase):
+ def __init__(self, config):
+ super(HX711, self).__init__(config, "hx711",
+ # HX711 sps options
+ {80: 80, 10: 10}, 80,
+ # HX711 gain/channel options
+ {'A-128': 1, 'B-32': 2, 'A-64': 3}, 'A-128')
+
+
+class HX717(HX71xBase):
+ def __init__(self, config):
+ super(HX717, self).__init__(config, "hx717",
+ # HX717 sps options
+ {320: 320, 80: 80, 20: 20, 10: 10}, 320,
+ # HX717 gain/channel options
+ {'A-128': 1, 'B-64': 2, 'A-64': 3,
+ 'B-8': 4}, 'A-128')
+
+
+HX71X_SENSOR_TYPES = {
+ "hx711": HX711,
+ "hx717": HX717
+}