diff options
Diffstat (limited to 'klippy/extras/lis2dw.py')
-rw-r--r-- | klippy/extras/lis2dw.py | 101 |
1 files changed, 63 insertions, 38 deletions
diff --git a/klippy/extras/lis2dw.py b/klippy/extras/lis2dw.py index dba83d08..a0d582bb 100644 --- a/klippy/extras/lis2dw.py +++ b/klippy/extras/lis2dw.py @@ -22,7 +22,7 @@ REG_LIS2DW_OUT_YL_ADDR = 0x2A REG_LIS2DW_OUT_YH_ADDR = 0x2B REG_LIS2DW_OUT_ZL_ADDR = 0x2C REG_LIS2DW_OUT_ZH_ADDR = 0x2D -REG_LIS2DW_FIFO_CTRL = 0x2E +REG_LIS2DW_FIFO_CTRL = 0x2E REG_LIS2DW_FIFO_SAMPLES = 0x2F REG_MOD_READ = 0x80 @@ -39,11 +39,12 @@ LIS3DH_SCALE = FREEFALL_ACCEL * 11.718 / 16 BATCH_UPDATES = 0.100 # "Enums" that should be compatible with all python versions -LIS2DW_TYPE = 'LIS2DW' -LIS3DH_TYPE = 'LIS3DH' +LIS2DW_TYPE = "LIS2DW" +LIS3DH_TYPE = "LIS3DH" + +SPI_SERIAL_TYPE = "spi" +I2C_SERIAL_TYPE = "i2c" -SPI_SERIAL_TYPE = 'spi' -I2C_SERIAL_TYPE = 'i2c' # Printer class that controls LIS2DW chip class LIS2DW: @@ -52,33 +53,35 @@ class LIS2DW: adxl345.AccelCommandHelper(config, self) self.lis_type = lis_type if self.lis_type == LIS2DW_TYPE: - self.axes_map = adxl345.read_axes_map(config, LIS2DW_SCALE, - LIS2DW_SCALE, LIS2DW_SCALE) + self.axes_map = adxl345.read_axes_map( + config, LIS2DW_SCALE, LIS2DW_SCALE, LIS2DW_SCALE + ) self.data_rate = 1600 else: - self.axes_map = adxl345.read_axes_map(config, LIS3DH_SCALE, - LIS3DH_SCALE, LIS3DH_SCALE) + self.axes_map = adxl345.read_axes_map( + config, LIS3DH_SCALE, LIS3DH_SCALE, LIS3DH_SCALE + ) self.data_rate = 1344 # Check for spi or i2c - if config.get('cs_pin', None) is not None: + if config.get("cs_pin", None) is not None: self.bus_type = SPI_SERIAL_TYPE else: self.bus_type = I2C_SERIAL_TYPE # Setup mcu sensor_lis2dw bulk query code if self.bus_type == SPI_SERIAL_TYPE: - self.bus = bus.MCU_SPI_from_config(config, - 3, default_speed=5000000) + self.bus = bus.MCU_SPI_from_config(config, 3, default_speed=5000000) else: - self.bus = bus.MCU_I2C_from_config(config, - default_addr=LIS_I2C_ADDR, default_speed=400000) + self.bus = bus.MCU_I2C_from_config( + config, default_addr=LIS_I2C_ADDR, default_speed=400000 + ) self.mcu = mcu = self.bus.get_mcu() self.oid = oid = mcu.create_oid() self.query_lis2dw_cmd = None - mcu.add_config_cmd("config_lis2dw oid=%d bus_oid=%d bus_oid_type=%s " - "lis_chip_type=%s" % (oid, self.bus.get_oid(), - self.bus_type, self.lis_type)) - mcu.add_config_cmd("query_lis2dw oid=%d rest_ticks=0" - % (oid,), on_restart=True) + mcu.add_config_cmd( + "config_lis2dw oid=%d bus_oid=%d bus_oid_type=%s " + "lis_chip_type=%s" % (oid, self.bus.get_oid(), self.bus_type, self.lis_type) + ) + mcu.add_config_cmd("query_lis2dw oid=%d rest_ticks=0" % (oid,), on_restart=True) mcu.register_config_callback(self._build_config) # Bulk sample message reading chip_smooth = self.data_rate * BATCH_UPDATES * 2 @@ -86,25 +89,35 @@ class LIS2DW: self.last_error_count = 0 # Process messages in batches self.batch_bulk = bulk_sensor.BatchBulkHelper( - self.printer, self._process_batch, - self._start_measurements, self._finish_measurements, BATCH_UPDATES) + self.printer, + self._process_batch, + self._start_measurements, + self._finish_measurements, + BATCH_UPDATES, + ) self.name = config.get_name().split()[-1] - hdr = ('time', 'x_acceleration', 'y_acceleration', 'z_acceleration') - self.batch_bulk.add_mux_endpoint("lis2dw/dump_lis2dw", "sensor", - self.name, {'header': hdr}) + hdr = ("time", "x_acceleration", "y_acceleration", "z_acceleration") + self.batch_bulk.add_mux_endpoint( + "lis2dw/dump_lis2dw", "sensor", self.name, {"header": hdr} + ) + def _build_config(self): cmdqueue = self.bus.get_command_queue() self.query_lis2dw_cmd = self.mcu.lookup_command( - "query_lis2dw oid=%c rest_ticks=%u", cq=cmdqueue) - self.ffreader.setup_query_command("query_lis2dw_status oid=%c", - oid=self.oid, cq=cmdqueue) + "query_lis2dw oid=%c rest_ticks=%u", cq=cmdqueue + ) + self.ffreader.setup_query_command( + "query_lis2dw_status oid=%c", oid=self.oid, cq=cmdqueue + ) + def read_reg(self, reg): if self.bus_type == SPI_SERIAL_TYPE: params = self.bus.spi_transfer([reg | REG_MOD_READ, 0x00]) - response = bytearray(params['response']) + response = bytearray(params["response"]) return response[1] params = self.bus.i2c_read([reg], 1) - return bytearray(params['response'])[0] + return bytearray(params["response"])[0] + def set_reg(self, reg, val, minclock=0): if self.bus_type == SPI_SERIAL_TYPE: self.bus.spi_send([reg, val & 0xFF], minclock=minclock) @@ -113,14 +126,16 @@ class LIS2DW: stored_val = self.read_reg(reg) if stored_val != val: raise self.printer.command_error( - "Failed to set LIS2DW register [0x%x] to 0x%x: got 0x%x. " - "This is generally indicative of connection problems " - "(e.g. faulty wiring) or a faulty lis2dw chip." % ( - reg, val, stored_val)) + "Failed to set LIS2DW register [0x%x] to 0x%x: got 0x%x. " + "This is generally indicative of connection problems " + "(e.g. faulty wiring) or a faulty lis2dw chip." % (reg, val, stored_val) + ) + def start_internal_client(self): aqh = adxl345.AccelQueryHelper(self.printer) self.batch_bulk.add_client(aqh.handle_batch) return aqh + # Measurement decoding def _convert_samples(self, samples): (x_pos, x_scale), (y_pos, y_scale), (z_pos, z_scale) = self.axes_map @@ -132,6 +147,7 @@ class LIS2DW: z = round(raw_xyz[z_pos] * z_scale, 6) samples[count] = (round(ptime, 6), x, y, z) count += 1 + # Start, stop, and process message batches def _start_measurements(self): # In case of miswiring, testing LIS2DW device ID prevents treating @@ -144,7 +160,8 @@ class LIS2DW: "Invalid lis2dw id (got %x vs %x).\n" "This is generally indicative of connection problems\n" "(e.g. faulty wiring) or a faulty lis2dw chip." - % (dev_id, LIS2DW_DEV_ID)) + % (dev_id, LIS2DW_DEV_ID) + ) if self.bus_type == SPI_SERIAL_TYPE: # Disable I2C self.set_reg(REG_LIS2DW_CTRL_REG2_ADDR, 0x06) @@ -163,7 +180,8 @@ class LIS2DW: "Invalid lis3dh id (got %x vs %x).\n" "This is generally indicative of connection problems\n" "(e.g. faulty wiring) or a faulty lis3dh chip." - % (dev_id, LIS3DH_DEV_ID)) + % (dev_id, LIS3DH_DEV_ID) + ) # High Resolution / Low Power mode 1344/5376 Hz # High Resolution mode (12-bit resolution) # Enable X Y Z axes @@ -177,7 +195,7 @@ class LIS2DW: # Stream mode self.set_reg(REG_LIS2DW_FIFO_CTRL, 0x80) # Start bulk reading - rest_ticks = self.mcu.seconds_to_clock(4. / self.data_rate) + rest_ticks = self.mcu.seconds_to_clock(4.0 / self.data_rate) self.query_lis2dw_cmd.send([self.oid, rest_ticks]) if self.lis_type == LIS2DW_TYPE: self.set_reg(REG_LIS2DW_FIFO_CTRL, 0xC0) @@ -187,6 +205,7 @@ class LIS2DW: # Initialize clock tracking self.ffreader.note_start() self.last_error_count = 0 + def _finish_measurements(self): # Halt bulk reading self.set_reg(REG_LIS2DW_FIFO_CTRL, 0x00) @@ -194,16 +213,22 @@ class LIS2DW: self.ffreader.note_end() logging.info("LIS2DW finished '%s' measurements", self.name) self.set_reg(REG_LIS2DW_FIFO_CTRL, 0x00) + def _process_batch(self, eventtime): samples = self.ffreader.pull_samples() self._convert_samples(samples) if not samples: return {} - return {'data': samples, 'errors': self.last_error_count, - 'overflows': self.ffreader.get_last_overflows()} + return { + "data": samples, + "errors": self.last_error_count, + "overflows": self.ffreader.get_last_overflows(), + } + def load_config(config): return LIS2DW(config, LIS2DW_TYPE) + def load_config_prefix(config): return LIS2DW(config, LIS2DW_TYPE) |