aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/extras/adxl345.py
blob: 49e2888f4ac14798940de67721b0118dd7aff931 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# Support for reading acceleration data from an adxl345 chip
#
# Copyright (C) 2020  Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging, time, collections, multiprocessing, os
from . import bus

# ADXL345 registers
REG_DEVID = 0x00
REG_BW_RATE = 0x2C
REG_POWER_CTL = 0x2D
REG_DATA_FORMAT = 0x31
REG_FIFO_CTL = 0x38
REG_MOD_READ = 0x80
REG_MOD_MULTI = 0x40

QUERY_RATES = {
    25: 0x8, 50: 0x9, 100: 0xa, 200: 0xb, 400: 0xc,
    800: 0xd, 1600: 0xe, 3200: 0xf,
}

ADXL345_DEV_ID = 0xe5

FREEFALL_ACCEL = 9.80665 * 1000.
SCALE = 0.0039 * FREEFALL_ACCEL # 3.9mg/LSB * Earth gravity in mm/s**2

Accel_Measurement = collections.namedtuple(
    'Accel_Measurement', ('time', 'accel_x', 'accel_y', 'accel_z'))

# Sample results
class ADXL345Results:
    def __init__(self):
        self.raw_samples = None
        self.samples = []
        self.drops = self.overflows = 0
        self.time_per_sample = self.start_range = self.end_range = 0.
    def get_stats(self):
        return ("drops=%d,overflows=%d"
                ",time_per_sample=%.9f,start_range=%.6f,end_range=%.6f"
                % (self.drops, self.overflows,
                   self.time_per_sample, self.start_range, self.end_range))
    def setup_data(self, axes_map, raw_samples, end_sequence, overflows,
                   start1_time, start2_time, end1_time, end2_time):
        if not raw_samples or not end_sequence:
            return
        self.axes_map = axes_map
        self.raw_samples = raw_samples
        self.overflows = overflows
        self.start2_time = start2_time
        self.start_range = start2_time - start1_time
        self.end_range = end2_time - end1_time
        self.total_count = (end_sequence - 1) * 8 + len(raw_samples[-1][1]) // 6
        total_time = end2_time - start2_time
        self.time_per_sample = time_per_sample = total_time / self.total_count
        self.seq_to_time = time_per_sample * 8.
        actual_count = sum([len(data)//6 for _, data in raw_samples])
        self.drops = self.total_count - actual_count
    def decode_samples(self):
        if not self.raw_samples:
            return self.samples
        (x_pos, x_scale), (y_pos, y_scale), (z_pos, z_scale) = self.axes_map
        actual_count = 0
        self.samples = samples = [None] * self.total_count
        for seq, data in self.raw_samples:
            d = bytearray(data)
            count = len(data)
            sdata = [(d[i] | (d[i+1] << 8)) - ((d[i+1] & 0x80) << 9)
                     for i in range(0, count-1, 2)]
            seq_time = self.start2_time + seq * self.seq_to_time
            for i in range(count//6):
                samp_time = seq_time + i * self.time_per_sample
                x = sdata[i*3 + x_pos] * x_scale
                y = sdata[i*3 + y_pos] * y_scale
                z = sdata[i*3 + z_pos] * z_scale
                samples[actual_count] = Accel_Measurement(samp_time, x, y, z)
                actual_count += 1
        del samples[actual_count:]
        return self.samples
    def write_to_file(self, filename):
        def write_impl():
            try:
                # Try to re-nice writing process
                os.nice(20)
            except:
                pass
            f = open(filename, "w")
            f.write("##%s\n#time,accel_x,accel_y,accel_z\n" % (
                self.get_stats(),))
            samples = self.samples or self.decode_samples()
            for t, accel_x, accel_y, accel_z in samples:
                f.write("%.6f,%.6f,%.6f,%.6f\n" % (
                    t, accel_x, accel_y, accel_z))
            f.close()
        write_proc = multiprocessing.Process(target=write_impl)
        write_proc.daemon = True
        write_proc.start()

# Printer class that controls ADXL345 chip
class ADXL345:
    def __init__(self, config):
        self.printer = config.get_printer()
        self.query_rate = 0
        self.last_tx_time = 0.
        am = {'x': (0, SCALE), 'y': (1, SCALE), 'z': (2, SCALE),
              '-x': (0, -SCALE), '-y': (1, -SCALE), '-z': (2, -SCALE)}
        axes_map = config.get('axes_map', 'x,y,z').split(',')
        if len(axes_map) != 3 or any([a.strip() not in am for a in axes_map]):
            raise config.error("Invalid adxl345 axes_map parameter")
        self.axes_map = [am[a.strip()] for a in axes_map]
        self.data_rate = config.getint('rate', 3200)
        if self.data_rate not in QUERY_RATES:
            raise config.error("Invalid rate parameter: %d" % (self.data_rate,))
        # Measurement storage (accessed from background thread)
        self.raw_samples = []
        self.last_sequence = 0
        self.samples_start1 = self.samples_start2 = 0.
        # Setup mcu sensor_adxl345 bulk query code
        self.spi = bus.MCU_SPI_from_config(config, 3, default_speed=5000000)
        self.mcu = mcu = self.spi.get_mcu()
        self.oid = oid = mcu.create_oid()
        self.query_adxl345_cmd = self.query_adxl345_end_cmd =None
        mcu.add_config_cmd("config_adxl345 oid=%d spi_oid=%d"
                           % (oid, self.spi.get_oid()))
        mcu.add_config_cmd("query_adxl345 oid=%d clock=0 rest_ticks=0"
                           % (oid,), on_restart=True)
        mcu.register_config_callback(self._build_config)
        mcu.register_response(self._handle_adxl345_start, "adxl345_start", oid)
        mcu.register_response(self._handle_adxl345_data, "adxl345_data", oid)
        # Register commands
        self.name = "default"
        if len(config.get_name().split()) > 1:
            self.name = config.get_name().split()[1]
        gcode = self.printer.lookup_object('gcode')
        gcode.register_mux_command("ACCELEROMETER_MEASURE", "CHIP", self.name,
                                   self.cmd_ACCELEROMETER_MEASURE,
                                   desc=self.cmd_ACCELEROMETER_MEASURE_help)
        gcode.register_mux_command("ACCELEROMETER_QUERY", "CHIP", self.name,
                                   self.cmd_ACCELEROMETER_QUERY,
                                   desc=self.cmd_ACCELEROMETER_QUERY_help)
        gcode.register_mux_command("ADXL345_DEBUG_READ", "CHIP", self.name,
                                   self.cmd_ADXL345_DEBUG_READ,
                                   desc=self.cmd_ADXL345_DEBUG_READ_help)
        gcode.register_mux_command("ADXL345_DEBUG_WRITE", "CHIP", self.name,
                                   self.cmd_ADXL345_DEBUG_WRITE,
                                   desc=self.cmd_ADXL345_DEBUG_WRITE_help)
        if self.name == "default":
            gcode.register_mux_command("ACCELEROMETER_MEASURE", "CHIP", None,
                                       self.cmd_ACCELEROMETER_MEASURE)
            gcode.register_mux_command("ACCELEROMETER_QUERY", "CHIP", None,
                                       self.cmd_ACCELEROMETER_QUERY)
            gcode.register_mux_command("ADXL345_DEBUG_READ", "CHIP", None,
                                       self.cmd_ADXL345_DEBUG_READ,
                                       desc=self.cmd_ADXL345_DEBUG_READ_help)
            gcode.register_mux_command("ADXL345_DEBUG_WRITE", "CHIP", None,
                                       self.cmd_ADXL345_DEBUG_WRITE,
                                       desc=self.cmd_ADXL345_DEBUG_WRITE_help)
    def is_initialized(self):
        # In case of miswiring, testing ADXL345 device ID prevents treating
        # noise or wrong signal as a correctly initialized device
        return (self.read_reg(REG_DEVID) == ADXL345_DEV_ID and
                (self.read_reg(REG_DATA_FORMAT) & 0xB) != 0)
    def initialize(self):
        # Setup ADXL345 parameters and verify chip connectivity
        self.set_reg(REG_POWER_CTL, 0x00)
        dev_id = self.read_reg(REG_DEVID)
        if dev_id != ADXL345_DEV_ID:
            raise self.printer.command_error("Invalid adxl345 id (got %x vs %x)"
                                             % (dev_id, ADXL345_DEV_ID))
        self.set_reg(REG_DATA_FORMAT, 0x0B)
    def _build_config(self):
        self.query_adxl345_cmd = self.mcu.lookup_command(
            "query_adxl345 oid=%c clock=%u rest_ticks=%u",
            cq=self.spi.get_command_queue())
        self.query_adxl345_end_cmd = self.mcu.lookup_query_command(
            "query_adxl345 oid=%c clock=%u rest_ticks=%u",
            "adxl345_end oid=%c end1_clock=%u end2_clock=%u"
            " limit_count=%hu sequence=%hu",
            oid=self.oid, cq=self.spi.get_command_queue())
    def _clock_to_print_time(self, clock):
        return self.mcu.clock_to_print_time(self.mcu.clock32_to_clock64(clock))
    def _handle_adxl345_start(self, params):
        self.samples_start1 = self._clock_to_print_time(params['start1_clock'])
        self.samples_start2 = self._clock_to_print_time(params['start2_clock'])
    def _handle_adxl345_data(self, params):
        last_sequence = self.last_sequence
        sequence = (last_sequence & ~0xffff) | params['sequence']
        if sequence < last_sequence:
            sequence += 0x10000
        self.last_sequence = sequence
        raw_samples = self.raw_samples
        if len(raw_samples) >= 300000:
            # Avoid filling up memory with too many samples
            return
        raw_samples.append((sequence, params['data']))
    def _convert_sequence(self, sequence):
        sequence = (self.last_sequence & ~0xffff) | sequence
        if sequence < self.last_sequence:
            sequence += 0x10000
        return sequence
    def read_reg(self, reg):
        params = self.spi.spi_transfer([reg | REG_MOD_READ, 0x00])
        response = bytearray(params['response'])
        return response[1]
    def set_reg(self, reg, val, minclock=0):
        self.spi.spi_send([reg, val & 0xFF], minclock=minclock)
        stored_val = self.read_reg(reg)
        if stored_val != val:
            raise self.printer.command_error(
                    "Failed to set ADXL345 register [0x%x] to 0x%x: got 0x%x. "
                    "This is generally indicative of connection problems "
                    "(e.g. faulty wiring) or a faulty adxl345 chip." % (
                        reg, val, stored_val))
    def is_measuring(self):
        return self.query_rate > 0
    def start_measurements(self, rate=None):
        if self.is_measuring():
            return
        rate = rate or self.data_rate
        if not self.is_initialized():
            self.initialize()
        # Setup chip in requested query rate
        clock = 0
        if self.last_tx_time:
            clock = self.mcu.print_time_to_clock(self.last_tx_time)
        self.set_reg(REG_POWER_CTL, 0x00, minclock=clock)
        self.set_reg(REG_FIFO_CTL, 0x00)
        self.set_reg(REG_BW_RATE, QUERY_RATES[rate])
        self.set_reg(REG_FIFO_CTL, 0x80)
        # Setup samples
        print_time = self.printer.lookup_object('toolhead').get_last_move_time()
        self.raw_samples = []
        self.last_sequence = 0
        self.samples_start1 = self.samples_start2 = print_time
        # Start bulk reading
        reqclock = self.mcu.print_time_to_clock(print_time)
        rest_ticks = self.mcu.seconds_to_clock(4. / rate)
        self.last_tx_time = print_time
        self.query_rate = rate
        self.query_adxl345_cmd.send([self.oid, reqclock, rest_ticks],
                                    reqclock=reqclock)
    def finish_measurements(self):
        if not self.is_measuring():
            return ADXL345Results()
        # Halt bulk reading
        print_time = self.printer.lookup_object('toolhead').get_last_move_time()
        clock = self.mcu.print_time_to_clock(print_time)
        params = self.query_adxl345_end_cmd.send([self.oid, 0, 0],
                                                 minclock=clock)
        self.last_tx_time = print_time
        self.query_rate = 0
        raw_samples = self.raw_samples
        self.raw_samples = []
        # Generate results
        end1_time = self._clock_to_print_time(params['end1_clock'])
        end2_time = self._clock_to_print_time(params['end2_clock'])
        end_sequence = self._convert_sequence(params['sequence'])
        overflows = params['limit_count']
        res = ADXL345Results()
        res.setup_data(self.axes_map, raw_samples, end_sequence, overflows,
                       self.samples_start1, self.samples_start2,
                       end1_time, end2_time)
        logging.info("ADXL345 finished %d measurements: %s",
                     res.total_count, res.get_stats())
        return res
    def end_query(self, name, gcmd):
        if not self.is_measuring():
            return
        res = self.finish_measurements()
        # Write data to file
        if self.name == "default":
            filename = "/tmp/adxl345-%s.csv" % (name,)
        else:
            filename = "/tmp/adxl345-%s-%s.csv" % (self.name, name,)
        res.write_to_file(filename)
        gcmd.respond_info(
                "Writing raw accelerometer data to %s file" % (filename,))
    cmd_ACCELEROMETER_MEASURE_help = "Start/stop accelerometer"
    def cmd_ACCELEROMETER_MEASURE(self, gcmd):
        if self.is_measuring():
            name = gcmd.get("NAME", time.strftime("%Y%m%d_%H%M%S"))
            if not name.replace('-', '').replace('_', '').isalnum():
                raise gcmd.error("Invalid adxl345 NAME parameter")
            self.end_query(name, gcmd)
            gcmd.respond_info("adxl345 measurements stopped")
        else:
            rate = gcmd.get_int("RATE", self.data_rate)
            if rate not in QUERY_RATES:
                raise gcmd.error("Not a valid adxl345 query rate: %d" % (rate,))
            self.start_measurements(rate)
            gcmd.respond_info("adxl345 measurements started")
    cmd_ACCELEROMETER_QUERY_help = "Query accelerometer for the current values"
    def cmd_ACCELEROMETER_QUERY(self, gcmd):
        if self.is_measuring():
            raise gcmd.error("adxl345 measurements in progress")
        self.start_measurements()
        reactor = self.printer.get_reactor()
        eventtime = starttime = reactor.monotonic()
        while not self.raw_samples:
            eventtime = reactor.pause(eventtime + .1)
            if eventtime > starttime + 3.:
                # Try to shutdown the measurements
                self.finish_measurements()
                raise gcmd.error("Timeout reading adxl345 data")
        result = self.finish_measurements()
        values = result.decode_samples()
        _, accel_x, accel_y, accel_z = values[-1]
        gcmd.respond_info("adxl345 values (x, y, z): %.6f, %.6f, %.6f" % (
            accel_x, accel_y, accel_z))
    cmd_ADXL345_DEBUG_READ_help = "Query accelerometer register (for debugging)"
    def cmd_ADXL345_DEBUG_READ(self, gcmd):
        if self.is_measuring():
            raise gcmd.error("adxl345 measurements in progress")
        reg = gcmd.get("REG", minval=29, maxval=57, parser=lambda x: int(x, 0))
        val = self.read_reg(reg)
        gcmd.respond_info("ADXL345 REG[0x%x] = 0x%x" % (reg, val))
    cmd_ADXL345_DEBUG_WRITE_help = "Set accelerometer register (for debugging)"
    def cmd_ADXL345_DEBUG_WRITE(self, gcmd):
        if self.is_measuring():
            raise gcmd.error("adxl345 measurements in progress")
        reg = gcmd.get("REG", minval=29, maxval=57, parser=lambda x: int(x, 0))
        val = gcmd.get("VAL", minval=0, maxval=255, parser=lambda x: int(x, 0))
        self.set_reg(reg, val)

def load_config(config):
    return ADXL345(config)

def load_config_prefix(config):
    return ADXL345(config)