aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/extras/ads1220.py
diff options
context:
space:
mode:
Diffstat (limited to 'klippy/extras/ads1220.py')
-rw-r--r--klippy/extras/ads1220.py169
1 files changed, 115 insertions, 54 deletions
diff --git a/klippy/extras/ads1220.py b/klippy/extras/ads1220.py
index 16080dc7..d1ccc9c9 100644
--- a/klippy/extras/ads1220.py
+++ b/klippy/extras/ads1220.py
@@ -19,6 +19,7 @@ WREG_CMD = 0x40
NOOP_CMD = 0x0
RESET_STATE = bytearray([0x0, 0x0, 0x0, 0x0])
+
# turn bytearrays into pretty hex strings: [0xff, 0x1]
def hexify(byte_array):
return "[%s]" % (", ".join([hex(b) for b in byte_array]))
@@ -32,59 +33,103 @@ class ADS1220:
self.consecutive_fails = 0
# Chip options
# Gain
- self.gain_options = {'1': 0x0, '2': 0x1, '4': 0x2, '8': 0x3, '16': 0x4,
- '32': 0x5, '64': 0x6, '128': 0x7}
- self.gain = config.getchoice('gain', self.gain_options, default='128')
+ self.gain_options = {
+ "1": 0x0,
+ "2": 0x1,
+ "4": 0x2,
+ "8": 0x3,
+ "16": 0x4,
+ "32": 0x5,
+ "64": 0x6,
+ "128": 0x7,
+ }
+ self.gain = config.getchoice("gain", self.gain_options, default="128")
# Sample rate
- self.sps_normal = {'20': 20, '45': 45, '90': 90, '175': 175,
- '330': 330, '600': 600, '1000': 1000}
- self.sps_turbo = {'40': 40, '90': 90, '180': 180, '350': 350,
- '660': 660, '1200': 1200, '2000': 2000}
+ self.sps_normal = {
+ "20": 20,
+ "45": 45,
+ "90": 90,
+ "175": 175,
+ "330": 330,
+ "600": 600,
+ "1000": 1000,
+ }
+ self.sps_turbo = {
+ "40": 40,
+ "90": 90,
+ "180": 180,
+ "350": 350,
+ "660": 660,
+ "1200": 1200,
+ "2000": 2000,
+ }
self.sps_options = self.sps_normal.copy()
self.sps_options.update(self.sps_turbo)
- self.sps = config.getchoice('sample_rate', self.sps_options,
- default='660')
+ self.sps = config.getchoice("sample_rate", self.sps_options, default="660")
self.is_turbo = str(self.sps) in self.sps_turbo
# Input multiplexer: AINP and AINN
- mux_options = {'AIN0_AIN1': 0b0000, 'AIN0_AIN2': 0b0001,
- 'AIN0_AIN3': 0b0010, 'AIN1_AIN2': 0b0011,
- 'AIN1_AIN3': 0b0100, 'AIN2_AIN3': 0b0101,
- 'AIN1_AIN0': 0b0110, 'AIN3_AIN2': 0b0111,
- 'AIN0_AVSS': 0b1000, 'AIN1_AVSS': 0b1001,
- 'AIN2_AVSS': 0b1010, 'AIN3_AVSS': 0b1011}
- self.mux = config.getchoice('input_mux', mux_options,
- default='AIN0_AIN1')
+ mux_options = {
+ "AIN0_AIN1": 0b0000,
+ "AIN0_AIN2": 0b0001,
+ "AIN0_AIN3": 0b0010,
+ "AIN1_AIN2": 0b0011,
+ "AIN1_AIN3": 0b0100,
+ "AIN2_AIN3": 0b0101,
+ "AIN1_AIN0": 0b0110,
+ "AIN3_AIN2": 0b0111,
+ "AIN0_AVSS": 0b1000,
+ "AIN1_AVSS": 0b1001,
+ "AIN2_AVSS": 0b1010,
+ "AIN3_AVSS": 0b1011,
+ }
+ self.mux = config.getchoice("input_mux", mux_options, default="AIN0_AIN1")
# PGA Bypass
- self.pga_bypass = config.getboolean('pga_bypass', default=False)
+ self.pga_bypass = config.getboolean("pga_bypass", default=False)
# bypass PGA when AVSS is the negative input
force_pga_bypass = self.mux >= 0b1000
self.pga_bypass = force_pga_bypass or self.pga_bypass
# Voltage Reference
- self.vref_options = {'internal': 0b0, 'REF0': 0b01, 'REF1': 0b10,
- 'analog_supply': 0b11}
- self.vref = config.getchoice('vref', self.vref_options,
- default='internal')
+ self.vref_options = {
+ "internal": 0b0,
+ "REF0": 0b01,
+ "REF1": 0b10,
+ "analog_supply": 0b11,
+ }
+ self.vref = config.getchoice("vref", self.vref_options, default="internal")
# check for conflict between REF1 and AIN0/AIN3
- mux_conflict = [0b0000, 0b0001, 0b0010, 0b0100, 0b0101, 0b0110, 0b0111,
- 0b1000, 0b1011]
+ mux_conflict = [
+ 0b0000,
+ 0b0001,
+ 0b0010,
+ 0b0100,
+ 0b0101,
+ 0b0110,
+ 0b0111,
+ 0b1000,
+ 0b1011,
+ ]
if self.vref == 0b10 and self.mux in mux_conflict:
- raise config.error("ADS1220 config error: AIN0/REFP1 and AIN3/REFN1"
- " cant be used as a voltage reference and"
- " an input at the same time")
+ raise config.error(
+ "ADS1220 config error: AIN0/REFP1 and AIN3/REFN1"
+ " cant be used as a voltage reference and"
+ " an input at the same time"
+ )
# SPI Setup
spi_speed = 512000 if self.is_turbo else 256000
self.spi = bus.MCU_SPI_from_config(config, 1, default_speed=spi_speed)
self.mcu = mcu = self.spi.get_mcu()
self.oid = mcu.create_oid()
# Data Ready (DRDY) Pin
- drdy_pin = config.get('data_ready_pin')
- ppins = printer.lookup_object('pins')
+ drdy_pin = config.get("data_ready_pin")
+ ppins = printer.lookup_object("pins")
drdy_ppin = ppins.lookup_pin(drdy_pin)
- self.data_ready_pin = drdy_ppin['pin']
- drdy_pin_mcu = drdy_ppin['chip']
+ self.data_ready_pin = drdy_ppin["pin"]
+ drdy_pin_mcu = drdy_ppin["chip"]
if drdy_pin_mcu != self.mcu:
- raise config.error("ADS1220 config error: SPI communication and"
- " data_ready_pin must be on the same MCU")
+ raise config.error(
+ "ADS1220 config error: SPI communication and"
+ " data_ready_pin must be on the same MCU"
+ )
# Bulk Sensor Setup
self.bulk_queue = bulk_sensor.BulkDataQueue(self.mcu, oid=self.oid)
# Clock tracking
@@ -93,26 +138,35 @@ class ADS1220:
self.ffreader = bulk_sensor.FixedFreqReader(mcu, chip_smooth, "<i")
# Process messages in batches
self.batch_bulk = bulk_sensor.BatchBulkHelper(
- self.printer, self._process_batch, self._start_measurements,
- self._finish_measurements, UPDATE_INTERVAL)
+ self.printer,
+ self._process_batch,
+ self._start_measurements,
+ self._finish_measurements,
+ UPDATE_INTERVAL,
+ )
# Command Configuration
self.attach_probe_cmd = None
mcu.add_config_cmd(
"config_ads1220 oid=%d spi_oid=%d data_ready_pin=%s"
- % (self.oid, self.spi.get_oid(), self.data_ready_pin))
- mcu.add_config_cmd("query_ads1220 oid=%d rest_ticks=0"
- % (self.oid,), on_restart=True)
+ % (self.oid, self.spi.get_oid(), self.data_ready_pin)
+ )
+ mcu.add_config_cmd(
+ "query_ads1220 oid=%d rest_ticks=0" % (self.oid,), on_restart=True
+ )
mcu.register_config_callback(self._build_config)
self.query_ads1220_cmd = None
def _build_config(self):
cmdqueue = self.spi.get_command_queue()
self.query_ads1220_cmd = self.mcu.lookup_command(
- "query_ads1220 oid=%c rest_ticks=%u", cq=cmdqueue)
+ "query_ads1220 oid=%c rest_ticks=%u", cq=cmdqueue
+ )
self.attach_probe_cmd = self.mcu.lookup_command(
- "ads1220_attach_load_cell_probe oid=%c load_cell_probe_oid=%c")
- self.ffreader.setup_query_command("query_ads1220_status oid=%c",
- oid=self.oid, cq=cmdqueue)
+ "ads1220_attach_load_cell_probe oid=%c load_cell_probe_oid=%c"
+ )
+ self.ffreader.setup_query_command(
+ "query_ads1220_status oid=%c", oid=self.oid, cq=cmdqueue
+ )
def get_mcu(self):
return self.mcu
@@ -134,7 +188,7 @@ class ADS1220:
# Measurement decoding
def _convert_samples(self, samples):
- adc_factor = 1. / (1 << 23)
+ adc_factor = 1.0 / (1 << 23)
count = 0
for ptime, val in samples:
samples[count] = (round(ptime, 6), val, round(val * adc_factor, 9))
@@ -148,7 +202,7 @@ class ADS1220:
# Start bulk reading
self.reset_chip()
self.setup_chip()
- rest_ticks = self.mcu.seconds_to_clock(1. / (10. * self.sps))
+ rest_ticks = self.mcu.seconds_to_clock(1.0 / (10.0 * self.sps))
self.query_ads1220_cmd.send([self.oid, rest_ticks])
logging.info("ADS1220 starting '%s' measurements", self.name)
# Initialize clock tracking
@@ -166,8 +220,11 @@ class ADS1220:
def _process_batch(self, eventtime):
samples = self.ffreader.pull_samples()
self._convert_samples(samples)
- return {'data': samples, 'errors': self.last_error_count,
- 'overflows': self.ffreader.get_last_overflows()}
+ return {
+ "data": samples,
+ "errors": self.last_error_count,
+ "overflows": self.ffreader.get_last_overflows(),
+ }
def reset_chip(self):
# the reset command takes 50us to complete
@@ -179,17 +236,20 @@ class ADS1220:
"Invalid ads1220 reset state (got %s vs %s).\n"
"This is generally indicative of connection problems\n"
"(e.g. faulty wiring) or a faulty ADS1220 chip."
- % (hexify(val), hexify(RESET_STATE)))
+ % (hexify(val), hexify(RESET_STATE))
+ )
def setup_chip(self):
continuous = 0x1 # enable continuous conversions
mode = 0x2 if self.is_turbo else 0x0 # turbo mode
sps_list = self.sps_turbo if self.is_turbo else self.sps_normal
data_rate = list(sps_list.keys()).index(str(self.sps))
- reg_values = [(self.mux << 4) | (self.gain << 1) | int(self.pga_bypass),
- (data_rate << 5) | (mode << 3) | (continuous << 2),
- (self.vref << 6),
- 0x0]
+ reg_values = [
+ (self.mux << 4) | (self.gain << 1) | int(self.pga_bypass),
+ (data_rate << 5) | (mode << 3) | (continuous << 2),
+ (self.vref << 6),
+ 0x0,
+ ]
self.write_reg(0x0, reg_values)
# start measurements immediately
self.send_command(START_SYNC_CMD)
@@ -198,7 +258,7 @@ class ADS1220:
read_command = [RREG_CMD | (reg << 2) | (byte_count - 1)]
read_command += [NOOP_CMD] * byte_count
params = self.spi.spi_transfer(read_command)
- return bytearray(params['response'][1:])
+ return bytearray(params["response"][1:])
def send_command(self, cmd):
self.spi.spi_send([cmd])
@@ -211,8 +271,9 @@ class ADS1220:
if bytearray(register_bytes) != stored_val:
raise self.printer.command_error(
"Failed to set ADS1220 register [0x%x] to %s: got %s. "
- "This may be a connection problem (e.g. faulty wiring)" % (
- reg, hexify(register_bytes), hexify(stored_val)))
+ "This may be a connection problem (e.g. faulty wiring)"
+ % (reg, hexify(register_bytes), hexify(stored_val))
+ )
ADS1220_SENSOR_TYPE = {"ads1220": ADS1220}