aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/extras/ads1220.py
blob: 16080dc72ca32834c5559e6b5a9e1c6ae371538f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# ADS1220 Support
#
# Copyright (C) 2024 Gareth Farrington <gareth@waves.ky>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging
from . import bulk_sensor, bus

#
# Constants
#
# Size of one sample in bytes: samples are 4 byte wide signed integers
# (the reader below unpacks them with the "<i" format)
BYTES_PER_SAMPLE = 4
# Number of whole samples that fit into one bulk sensor message
MAX_SAMPLES_PER_MESSAGE = bulk_sensor.MAX_BULK_MSG_SIZE // BYTES_PER_SAMPLE
# Batch interval handed to BatchBulkHelper and used to size the clock
# smoothing window (presumably in seconds - confirm against bulk_sensor)
UPDATE_INTERVAL = 0.10
# Single byte chip commands sent over SPI
RESET_CMD = 0x06
START_SYNC_CMD = 0x08
# Register read/write command bases; the register number and the byte
# count minus one are OR'ed into the low bits by read_reg()/write_reg()
RREG_CMD = 0x20
WREG_CMD = 0x40
# Filler byte clocked out while reading a register response
NOOP_CMD = 0x0
# Expected contents of the 4 registers read back right after a reset
RESET_STATE = bytearray([0x0, 0x0, 0x0, 0x0])

# Render a sequence of byte values as a bracketed, comma separated list of
# hex literals, e.g. bytearray([0xff, 0x1]) -> "[0xff, 0x1]"
def hexify(byte_array):
    hex_items = map(hex, byte_array)
    return "[" + ", ".join(hex_items) + "]"


class ADS1220:
    # Sensor object for one ADS1220 ADC. The constructor parses the config
    # section, validates the option combinations and registers the MCU side
    # configuration. The start/finish/process callbacks are handed to
    # bulk_sensor.BatchBulkHelper, which drives the measurements.
    def __init__(self, config):
        self.printer = printer = config.get_printer()
        self.name = config.get_name().split()[-1]
        self.last_error_count = 0
        self.consecutive_fails = 0
        # Chip options
        # Gain: config choice -> gain code placed in register 0
        self.gain_options = {'1': 0x0, '2': 0x1, '4': 0x2, '8': 0x3, '16': 0x4,
                             '32': 0x5, '64': 0x6, '128': 0x7}
        self.gain = config.getchoice('gain', self.gain_options, default='128')
        # Sample rate: there is one table of rates for normal mode and one
        # for turbo mode; the operating mode is implied by the chosen rate.
        # A rate present in both tables ('90') is treated as a turbo rate.
        self.sps_normal = {'20': 20, '45': 45, '90': 90, '175': 175,
                           '330': 330, '600': 600, '1000': 1000}
        self.sps_turbo = {'40': 40, '90': 90, '180': 180, '350': 350,
                          '660': 660, '1200': 1200, '2000': 2000}
        self.sps_options = self.sps_normal.copy()
        self.sps_options.update(self.sps_turbo)
        self.sps = config.getchoice('sample_rate', self.sps_options,
                                    default='660')
        self.is_turbo = str(self.sps) in self.sps_turbo
        # Input multiplexer: AINP and AINN
        mux_options = {'AIN0_AIN1': 0b0000, 'AIN0_AIN2': 0b0001,
                       'AIN0_AIN3': 0b0010, 'AIN1_AIN2': 0b0011,
                       'AIN1_AIN3': 0b0100, 'AIN2_AIN3': 0b0101,
                       'AIN1_AIN0': 0b0110, 'AIN3_AIN2': 0b0111,
                       'AIN0_AVSS': 0b1000, 'AIN1_AVSS': 0b1001,
                       'AIN2_AVSS': 0b1010, 'AIN3_AVSS': 0b1011}
        self.mux = config.getchoice('input_mux', mux_options,
                                    default='AIN0_AIN1')
        # PGA Bypass
        self.pga_bypass = config.getboolean('pga_bypass', default=False)
        # bypass PGA when AVSS is the negative input
        force_pga_bypass = self.mux >= 0b1000
        self.pga_bypass = force_pga_bypass or self.pga_bypass
        # Voltage Reference
        self.vref_options = {'internal': 0b0, 'REF0': 0b01, 'REF1': 0b10,
                             'analog_supply': 0b11}
        self.vref = config.getchoice('vref', self.vref_options,
                                     default='internal')
        # check for conflict between REF1 and AIN0/AIN3: these are all the
        # mux codes above that route AIN0 or AIN3 to an input
        mux_conflict = [0b0000, 0b0001, 0b0010, 0b0100, 0b0101, 0b0110, 0b0111,
                        0b1000, 0b1011]
        if self.vref == 0b10 and self.mux in mux_conflict:
            raise config.error("ADS1220 config error: AIN0/REFP1 and AIN3/REFN1"
                               " cant be used as a voltage reference and"
                               " an input at the same time")
        # SPI Setup: turbo mode gets twice the default SPI speed
        spi_speed = 512000 if self.is_turbo else 256000
        self.spi = bus.MCU_SPI_from_config(config, 1, default_speed=spi_speed)
        self.mcu = mcu = self.spi.get_mcu()
        self.oid = mcu.create_oid()
        # Data Ready (DRDY) Pin
        drdy_pin = config.get('data_ready_pin')
        ppins = printer.lookup_object('pins')
        drdy_ppin = ppins.lookup_pin(drdy_pin)
        self.data_ready_pin = drdy_ppin['pin']
        drdy_pin_mcu = drdy_ppin['chip']
        if drdy_pin_mcu != self.mcu:
            raise config.error("ADS1220 config error: SPI communication and"
                               " data_ready_pin must be on the same MCU")
        # Bulk Sensor Setup
        self.bulk_queue = bulk_sensor.BulkDataQueue(self.mcu, oid=self.oid)
        # Clock tracking
        chip_smooth = self.sps * UPDATE_INTERVAL * 2
        # Measurement conversion: "<i" unpacks little endian signed 32 bit
        self.ffreader = bulk_sensor.FixedFreqReader(mcu, chip_smooth, "<i")
        # Process messages in batches
        self.batch_bulk = bulk_sensor.BatchBulkHelper(
            self.printer, self._process_batch, self._start_measurements,
            self._finish_measurements, UPDATE_INTERVAL)
        # Command Configuration; the command objects themselves are only
        # available once _build_config() has run
        self.attach_probe_cmd = None
        mcu.add_config_cmd(
            "config_ads1220 oid=%d spi_oid=%d data_ready_pin=%s"
            % (self.oid, self.spi.get_oid(), self.data_ready_pin))
        mcu.add_config_cmd("query_ads1220 oid=%d rest_ticks=0"
                           % (self.oid,), on_restart=True)
        mcu.register_config_callback(self._build_config)
        self.query_ads1220_cmd = None

    # MCU config callback: look up the commands used at runtime
    def _build_config(self):
        cmdqueue = self.spi.get_command_queue()
        self.query_ads1220_cmd = self.mcu.lookup_command(
            "query_ads1220 oid=%c rest_ticks=%u", cq=cmdqueue)
        self.attach_probe_cmd = self.mcu.lookup_command(
            "ads1220_attach_load_cell_probe oid=%c load_cell_probe_oid=%c")
        self.ffreader.setup_query_command("query_ads1220_status oid=%c",
                                          oid=self.oid, cq=cmdqueue)

    # returns the MCU object that the SPI bus of this sensor lives on
    def get_mcu(self):
        return self.mcu

    # returns the configured sample rate in samples per second
    def get_samples_per_second(self):
        return self.sps

    # returns a tuple of the minimum and maximum value of the sensor, used to
    # detect if a data value is saturated
    def get_range(self):
        return -0x800000, 0x7FFFFF

    # add_client interface, direct pass through to bulk_sensor API
    def add_client(self, callback):
        self.batch_bulk.add_client(callback)

    # sends the command that links this sensor to the load cell probe with
    # the given oid; only usable after _build_config() has run
    def attach_load_cell_probe(self, load_cell_probe_oid):
        self.attach_probe_cmd.send([self.oid, load_cell_probe_oid])

    # Measurement decoding: replaces, in place, every (time, counts) entry
    # of samples with (time rounded to 6 places, counts, counts scaled by
    # 1 / 2^23 and rounded to 9 places)
    def _convert_samples(self, samples):
        adc_factor = 1. / (1 << 23)
        # slice assignment keeps the caller's list object, the caller
        # returns that same list after this call
        samples[:] = [(round(ptime, 6), val, round(val * adc_factor, 9))
                      for ptime, val in samples]

    # Start, stop, and process message batches
    def _start_measurements(self):
        self.last_error_count = 0
        self.consecutive_fails = 0
        # Start bulk reading
        self.reset_chip()
        self.setup_chip()
        # rest time between polls is a tenth of the sample period
        rest_ticks = self.mcu.seconds_to_clock(1. / (10. * self.sps))
        self.query_ads1220_cmd.send([self.oid, rest_ticks])
        logging.info("ADS1220 starting '%s' measurements", self.name)
        # Initialize clock tracking
        self.ffreader.note_start()

    def _finish_measurements(self):
        # don't use serial connection after shutdown
        if self.printer.is_shutdown():
            return
        # Halt bulk reading (rest_ticks=0)
        self.query_ads1220_cmd.send_wait_ack([self.oid, 0])
        self.ffreader.note_end()
        logging.info("ADS1220 finished '%s' measurements", self.name)

    # Batch callback: pull the pending samples, convert them and return
    # them together with the error and overflow counters
    def _process_batch(self, eventtime):
        samples = self.ffreader.pull_samples()
        self._convert_samples(samples)
        return {'data': samples, 'errors': self.last_error_count,
                'overflows': self.ffreader.get_last_overflows()}

    # Reset the chip and verify that its registers read back as RESET_STATE.
    # Raises printer.command_error when they do not.
    def reset_chip(self):
        # the reset command takes 50us to complete
        self.send_command(RESET_CMD)
        # read startup register state and validate
        val = self.read_reg(0x0, 4)
        if val != RESET_STATE:
            raise self.printer.command_error(
                "Invalid ads1220 reset state (got %s vs %s).\n"
                "This is generally indicative of connection problems\n"
                "(e.g. faulty wiring) or a faulty ADS1220 chip."
                % (hexify(val), hexify(RESET_STATE)))

    # Data rate code of the configured sample rate: its position, counted
    # from the slowest rate, in the rate table of the active mode.
    # Raises ValueError if the rate is not in that table.
    def _get_data_rate_code(self):
        sps_table = self.sps_turbo if self.is_turbo else self.sps_normal
        # Order the rates numerically instead of relying on dict iteration
        # order, which is not guaranteed before Python 3.7
        return sorted(sps_table.values()).index(self.sps)

    # Assemble the values of the 4 configuration registers from the parsed
    # options. Returns a list of 4 ints, one per register.
    def _get_register_values(self):
        continuous = 0x1  # enable continuous conversions
        mode = 0x2 if self.is_turbo else 0x0  # turbo mode
        data_rate = self._get_data_rate_code()
        return [(self.mux << 4) | (self.gain << 1) | int(self.pga_bypass),
                (data_rate << 5) | (mode << 3) | (continuous << 2),
                (self.vref << 6),
                0x0]

    # Write the configuration registers (verified by write_reg) and start
    # conversions
    def setup_chip(self):
        self.write_reg(0x0, self._get_register_values())
        # start measurements immediately
        self.send_command(START_SYNC_CMD)

    # Read byte_count registers starting at register reg and return their
    # contents as a bytearray
    def read_reg(self, reg, byte_count):
        read_command = [RREG_CMD | (reg << 2) | (byte_count - 1)]
        # clock out one filler byte per register byte to be read
        read_command += [NOOP_CMD] * byte_count
        params = self.spi.spi_transfer(read_command)
        # the first response byte lines up with the command byte, drop it
        return bytearray(params['response'][1:])

    # Send a single byte command to the chip
    def send_command(self, cmd):
        self.spi.spi_send([cmd])

    # Write register_bytes starting at register reg, then read the
    # registers back. Raises printer.command_error if the stored values
    # differ from what was written.
    def write_reg(self, reg, register_bytes):
        write_command = [WREG_CMD | (reg << 2) | (len(register_bytes) - 1)]
        write_command.extend(register_bytes)
        self.spi.spi_send(write_command)
        stored_val = self.read_reg(reg, len(register_bytes))
        if bytearray(register_bytes) != stored_val:
            raise self.printer.command_error(
                "Failed to set ADS1220 register [0x%x] to %s: got %s. "
                "This may be a connection problem (e.g. faulty wiring)" % (
                    reg, hexify(register_bytes), hexify(stored_val)))


# Registry entry mapping the sensor type name "ads1220" to its class
# (presumably merged into a sensor type lookup elsewhere - confirm at the
# import site)
ADS1220_SENSOR_TYPE = {"ads1220": ADS1220}