diff options
Diffstat (limited to 'klippy/extras/ads1220.py')
-rw-r--r-- | klippy/extras/ads1220.py | 169 |
1 files changed, 115 insertions, 54 deletions
diff --git a/klippy/extras/ads1220.py b/klippy/extras/ads1220.py index 16080dc7..d1ccc9c9 100644 --- a/klippy/extras/ads1220.py +++ b/klippy/extras/ads1220.py @@ -19,6 +19,7 @@ WREG_CMD = 0x40 NOOP_CMD = 0x0 RESET_STATE = bytearray([0x0, 0x0, 0x0, 0x0]) + # turn bytearrays into pretty hex strings: [0xff, 0x1] def hexify(byte_array): return "[%s]" % (", ".join([hex(b) for b in byte_array])) @@ -32,59 +33,103 @@ class ADS1220: self.consecutive_fails = 0 # Chip options # Gain - self.gain_options = {'1': 0x0, '2': 0x1, '4': 0x2, '8': 0x3, '16': 0x4, - '32': 0x5, '64': 0x6, '128': 0x7} - self.gain = config.getchoice('gain', self.gain_options, default='128') + self.gain_options = { + "1": 0x0, + "2": 0x1, + "4": 0x2, + "8": 0x3, + "16": 0x4, + "32": 0x5, + "64": 0x6, + "128": 0x7, + } + self.gain = config.getchoice("gain", self.gain_options, default="128") # Sample rate - self.sps_normal = {'20': 20, '45': 45, '90': 90, '175': 175, - '330': 330, '600': 600, '1000': 1000} - self.sps_turbo = {'40': 40, '90': 90, '180': 180, '350': 350, - '660': 660, '1200': 1200, '2000': 2000} + self.sps_normal = { + "20": 20, + "45": 45, + "90": 90, + "175": 175, + "330": 330, + "600": 600, + "1000": 1000, + } + self.sps_turbo = { + "40": 40, + "90": 90, + "180": 180, + "350": 350, + "660": 660, + "1200": 1200, + "2000": 2000, + } self.sps_options = self.sps_normal.copy() self.sps_options.update(self.sps_turbo) - self.sps = config.getchoice('sample_rate', self.sps_options, - default='660') + self.sps = config.getchoice("sample_rate", self.sps_options, default="660") self.is_turbo = str(self.sps) in self.sps_turbo # Input multiplexer: AINP and AINN - mux_options = {'AIN0_AIN1': 0b0000, 'AIN0_AIN2': 0b0001, - 'AIN0_AIN3': 0b0010, 'AIN1_AIN2': 0b0011, - 'AIN1_AIN3': 0b0100, 'AIN2_AIN3': 0b0101, - 'AIN1_AIN0': 0b0110, 'AIN3_AIN2': 0b0111, - 'AIN0_AVSS': 0b1000, 'AIN1_AVSS': 0b1001, - 'AIN2_AVSS': 0b1010, 'AIN3_AVSS': 0b1011} - self.mux = config.getchoice('input_mux', mux_options, - default='AIN0_AIN1') + mux_options = { + "AIN0_AIN1": 0b0000, + "AIN0_AIN2": 0b0001, + "AIN0_AIN3": 0b0010, + "AIN1_AIN2": 0b0011, + "AIN1_AIN3": 0b0100, + "AIN2_AIN3": 0b0101, + "AIN1_AIN0": 0b0110, + "AIN3_AIN2": 0b0111, + "AIN0_AVSS": 0b1000, + "AIN1_AVSS": 0b1001, + "AIN2_AVSS": 0b1010, + "AIN3_AVSS": 0b1011, + } + self.mux = config.getchoice("input_mux", mux_options, default="AIN0_AIN1") # PGA Bypass - self.pga_bypass = config.getboolean('pga_bypass', default=False) + self.pga_bypass = config.getboolean("pga_bypass", default=False) # bypass PGA when AVSS is the negative input force_pga_bypass = self.mux >= 0b1000 self.pga_bypass = force_pga_bypass or self.pga_bypass # Voltage Reference - self.vref_options = {'internal': 0b0, 'REF0': 0b01, 'REF1': 0b10, - 'analog_supply': 0b11} - self.vref = config.getchoice('vref', self.vref_options, - default='internal') + self.vref_options = { + "internal": 0b0, + "REF0": 0b01, + "REF1": 0b10, + "analog_supply": 0b11, + } + self.vref = config.getchoice("vref", self.vref_options, default="internal") # check for conflict between REF1 and AIN0/AIN3 - mux_conflict = [0b0000, 0b0001, 0b0010, 0b0100, 0b0101, 0b0110, 0b0111, - 0b1000, 0b1011] + mux_conflict = [ + 0b0000, + 0b0001, + 0b0010, + 0b0100, + 0b0101, + 0b0110, + 0b0111, + 0b1000, + 0b1011, + ] if self.vref == 0b10 and self.mux in mux_conflict: - raise config.error("ADS1220 config error: AIN0/REFP1 and AIN3/REFN1" - " cant be used as a voltage reference and" - " an input at the same time") + raise config.error( + "ADS1220 config error: AIN0/REFP1 and AIN3/REFN1" + " cant be used as a voltage reference and" + " an input at the same time" + ) # SPI Setup spi_speed = 512000 if self.is_turbo else 256000 self.spi = bus.MCU_SPI_from_config(config, 1, default_speed=spi_speed) self.mcu = mcu = self.spi.get_mcu() self.oid = mcu.create_oid() # Data Ready (DRDY) Pin - drdy_pin = config.get('data_ready_pin') - ppins = printer.lookup_object('pins') + drdy_pin = config.get("data_ready_pin") + ppins = printer.lookup_object("pins") drdy_ppin = ppins.lookup_pin(drdy_pin) - self.data_ready_pin = drdy_ppin['pin'] - drdy_pin_mcu = drdy_ppin['chip'] + self.data_ready_pin = drdy_ppin["pin"] + drdy_pin_mcu = drdy_ppin["chip"] if drdy_pin_mcu != self.mcu: - raise config.error("ADS1220 config error: SPI communication and" - " data_ready_pin must be on the same MCU") + raise config.error( + "ADS1220 config error: SPI communication and" + " data_ready_pin must be on the same MCU" + ) # Bulk Sensor Setup self.bulk_queue = bulk_sensor.BulkDataQueue(self.mcu, oid=self.oid) # Clock tracking @@ -93,26 +138,35 @@ class ADS1220: self.ffreader = bulk_sensor.FixedFreqReader(mcu, chip_smooth, "<i") # Process messages in batches self.batch_bulk = bulk_sensor.BatchBulkHelper( - self.printer, self._process_batch, self._start_measurements, - self._finish_measurements, UPDATE_INTERVAL) + self.printer, + self._process_batch, + self._start_measurements, + self._finish_measurements, + UPDATE_INTERVAL, + ) # Command Configuration self.attach_probe_cmd = None mcu.add_config_cmd( "config_ads1220 oid=%d spi_oid=%d data_ready_pin=%s" - % (self.oid, self.spi.get_oid(), self.data_ready_pin)) - mcu.add_config_cmd("query_ads1220 oid=%d rest_ticks=0" - % (self.oid,), on_restart=True) + % (self.oid, self.spi.get_oid(), self.data_ready_pin) + ) + mcu.add_config_cmd( + "query_ads1220 oid=%d rest_ticks=0" % (self.oid,), on_restart=True + ) mcu.register_config_callback(self._build_config) self.query_ads1220_cmd = None def _build_config(self): cmdqueue = self.spi.get_command_queue() self.query_ads1220_cmd = self.mcu.lookup_command( - "query_ads1220 oid=%c rest_ticks=%u", cq=cmdqueue) + "query_ads1220 oid=%c rest_ticks=%u", cq=cmdqueue + ) self.attach_probe_cmd = self.mcu.lookup_command( - "ads1220_attach_load_cell_probe oid=%c load_cell_probe_oid=%c") - self.ffreader.setup_query_command("query_ads1220_status oid=%c", - oid=self.oid, cq=cmdqueue) + "ads1220_attach_load_cell_probe oid=%c load_cell_probe_oid=%c" + ) + self.ffreader.setup_query_command( + "query_ads1220_status oid=%c", oid=self.oid, cq=cmdqueue + ) def get_mcu(self): return self.mcu @@ -134,7 +188,7 @@ class ADS1220: # Measurement decoding def _convert_samples(self, samples): - adc_factor = 1. / (1 << 23) + adc_factor = 1.0 / (1 << 23) count = 0 for ptime, val in samples: samples[count] = (round(ptime, 6), val, round(val * adc_factor, 9)) @@ -148,7 +202,7 @@ class ADS1220: # Start bulk reading self.reset_chip() self.setup_chip() - rest_ticks = self.mcu.seconds_to_clock(1. / (10. * self.sps)) + rest_ticks = self.mcu.seconds_to_clock(1.0 / (10.0 * self.sps)) self.query_ads1220_cmd.send([self.oid, rest_ticks]) logging.info("ADS1220 starting '%s' measurements", self.name) # Initialize clock tracking @@ -166,8 +220,11 @@ class ADS1220: def _process_batch(self, eventtime): samples = self.ffreader.pull_samples() self._convert_samples(samples) - return {'data': samples, 'errors': self.last_error_count, - 'overflows': self.ffreader.get_last_overflows()} + return { + "data": samples, + "errors": self.last_error_count, + "overflows": self.ffreader.get_last_overflows(), + } def reset_chip(self): # the reset command takes 50us to complete @@ -179,17 +236,20 @@ class ADS1220: "Invalid ads1220 reset state (got %s vs %s).\n" "This is generally indicative of connection problems\n" "(e.g. faulty wiring) or a faulty ADS1220 chip." - % (hexify(val), hexify(RESET_STATE))) + % (hexify(val), hexify(RESET_STATE)) + ) def setup_chip(self): continuous = 0x1 # enable continuous conversions mode = 0x2 if self.is_turbo else 0x0 # turbo mode sps_list = self.sps_turbo if self.is_turbo else self.sps_normal data_rate = list(sps_list.keys()).index(str(self.sps)) - reg_values = [(self.mux << 4) | (self.gain << 1) | int(self.pga_bypass), - (data_rate << 5) | (mode << 3) | (continuous << 2), - (self.vref << 6), - 0x0] + reg_values = [ + (self.mux << 4) | (self.gain << 1) | int(self.pga_bypass), + (data_rate << 5) | (mode << 3) | (continuous << 2), + (self.vref << 6), + 0x0, + ] self.write_reg(0x0, reg_values) # start measurements immediately self.send_command(START_SYNC_CMD) @@ -198,7 +258,7 @@ class ADS1220: read_command = [RREG_CMD | (reg << 2) | (byte_count - 1)] read_command += [NOOP_CMD] * byte_count params = self.spi.spi_transfer(read_command) - return bytearray(params['response'][1:]) + return bytearray(params["response"][1:]) def send_command(self, cmd): self.spi.spi_send([cmd]) @@ -211,8 +271,9 @@ class ADS1220: if bytearray(register_bytes) != stored_val: raise self.printer.command_error( "Failed to set ADS1220 register [0x%x] to %s: got %s. " - "This may be a connection problem (e.g. faulty wiring)" % ( - reg, hexify(register_bytes), hexify(stored_val))) + "This may be a connection problem (e.g. faulty wiring)" + % (reg, hexify(register_bytes), hexify(stored_val)) + ) ADS1220_SENSOR_TYPE = {"ads1220": ADS1220} |