diff options
Diffstat (limited to 'klippy/extras/bme280.py')
-rw-r--r-- | klippy/extras/bme280.py | 609 |
1 files changed, 338 insertions, 271 deletions
diff --git a/klippy/extras/bme280.py b/klippy/extras/bme280.py index 1c26bbee..44de9de3 100644 --- a/klippy/extras/bme280.py +++ b/klippy/extras/bme280.py @@ -6,15 +6,25 @@ import logging from . import bus -REPORT_TIME = .8 +REPORT_TIME = 0.8 BME280_CHIP_ADDR = 0x76 BME280_REGS = { - 'RESET': 0xE0, 'CTRL_HUM': 0xF2, - 'STATUS': 0xF3, 'CTRL_MEAS': 0xF4, 'CONFIG': 0xF5, - 'PRESSURE_MSB': 0xF7, 'PRESSURE_LSB': 0xF8, 'PRESSURE_XLSB': 0xF9, - 'TEMP_MSB': 0xFA, 'TEMP_LSB': 0xFB, 'TEMP_XLSB': 0xFC, - 'HUM_MSB': 0xFD, 'HUM_LSB': 0xFE, 'CAL_1': 0x88, 'CAL_2': 0xE1 + "RESET": 0xE0, + "CTRL_HUM": 0xF2, + "STATUS": 0xF3, + "CTRL_MEAS": 0xF4, + "CONFIG": 0xF5, + "PRESSURE_MSB": 0xF7, + "PRESSURE_LSB": 0xF8, + "PRESSURE_XLSB": 0xF9, + "TEMP_MSB": 0xFA, + "TEMP_LSB": 0xFB, + "TEMP_XLSB": 0xFC, + "HUM_MSB": 0xFD, + "HUM_LSB": 0xFE, + "CAL_1": 0x88, + "CAL_2": 0xE1, } BMP388_REGS = { @@ -41,43 +51,61 @@ BMP388_REG_VAL_DRDY_EN = 0b100000 BMP388_REG_VAL_NORMAL_MODE = 0x30 BME680_REGS = { - 'RESET': 0xE0, 'CTRL_HUM': 0x72, 'CTRL_GAS_1': 0x71, 'CTRL_GAS_0': 0x70, - 'GAS_WAIT_0': 0x64, 'RES_HEAT_0': 0x5A, 'IDAC_HEAT_0': 0x50, - 'STATUS': 0x73, 'EAS_STATUS_0': 0x1D, 'CTRL_MEAS': 0x74, 'CONFIG': 0x75, - 'GAS_R_LSB': 0x2B, 'GAS_R_MSB': 0x2A, - 'PRESSURE_MSB': 0x1F, 'PRESSURE_LSB': 0x20, 'PRESSURE_XLSB': 0x21, - 'TEMP_MSB': 0x22, 'TEMP_LSB': 0x23, 'TEMP_XLSB': 0x24, - 'HUM_MSB': 0x25, 'HUM_LSB': 0x26, 'CAL_1': 0x88, 'CAL_2': 0xE1, - 'RES_HEAT_VAL': 0x00, 'RES_HEAT_RANGE': 0x02, 'RANGE_SWITCHING_ERROR': 0x04 + "RESET": 0xE0, + "CTRL_HUM": 0x72, + "CTRL_GAS_1": 0x71, + "CTRL_GAS_0": 0x70, + "GAS_WAIT_0": 0x64, + "RES_HEAT_0": 0x5A, + "IDAC_HEAT_0": 0x50, + "STATUS": 0x73, + "EAS_STATUS_0": 0x1D, + "CTRL_MEAS": 0x74, + "CONFIG": 0x75, + "GAS_R_LSB": 0x2B, + "GAS_R_MSB": 0x2A, + "PRESSURE_MSB": 0x1F, + "PRESSURE_LSB": 0x20, + "PRESSURE_XLSB": 0x21, + "TEMP_MSB": 0x22, + "TEMP_LSB": 0x23, + "TEMP_XLSB": 0x24, + "HUM_MSB": 0x25, + "HUM_LSB": 0x26, + "CAL_1": 0x88, + "CAL_2": 0xE1, + "RES_HEAT_VAL": 0x00, + "RES_HEAT_RANGE": 0x02, + "RANGE_SWITCHING_ERROR": 0x04, } BME680_GAS_CONSTANTS = { - 0: (1., 8000000.), - 1: (1., 4000000.), - 2: (1., 2000000.), - 3: (1., 1000000.), - 4: (1., 499500.4995), + 0: (1.0, 8000000.0), + 1: (1.0, 4000000.0), + 2: (1.0, 2000000.0), + 3: (1.0, 1000000.0), + 4: (1.0, 499500.4995), 5: (0.99, 248262.1648), - 6: (1., 125000.), + 6: (1.0, 125000.0), 7: (0.992, 63004.03226), - 8: (1., 31281.28128), - 9: (1., 15625.), + 8: (1.0, 31281.28128), + 9: (1.0, 15625.0), 10: (0.998, 7812.5), 11: (0.995, 3906.25), - 12: (1., 1953.125), + 12: (1.0, 1953.125), 13: (0.99, 976.5625), - 14: (1., 488.28125), - 15: (1., 244.140625) + 14: (1.0, 488.28125), + 15: (1.0, 244.140625), } BMP180_REGS = { - 'RESET': 0xE0, - 'CAL_1': 0xAA, - 'CTRL_MEAS': 0xF4, - 'REG_MSB': 0xF6, - 'REG_LSB': 0xF7, - 'CRV_TEMP': 0x2E, - 'CRV_PRES': 0x34 + "RESET": 0xE0, + "CAL_1": 0xAA, + "CTRL_MEAS": 0xF4, + "REG_MSB": 0xF6, + "REG_LSB": 0xF7, + "CRV_TEMP": 0x2E, + "CRV_PRES": 0x34, } STATUS_MEASURING = 1 << 3 @@ -92,8 +120,11 @@ MEASURE_DONE = 1 << 5 RESET_CHIP_VALUE = 0xB6 BME_CHIPS = { - 0x58: 'BMP280', 0x60: 'BME280', 0x61: 'BME680', 0x55: 'BMP180', - 0x50: 'BMP388' + 0x58: "BMP280", + 0x60: "BME280", + 0x61: "BME680", + 0x55: "BMP180", + 0x50: "BMP388", } BME_CHIP_ID_REG = 0xD0 BMP3_CHIP_ID_REG = 0x00 @@ -101,7 +132,7 @@ BMP3_CHIP_ID_REG = 0x00 def get_twos_complement(val, bit_size): if val & (1 << (bit_size - 1)): - val -= (1 << bit_size) + val -= 1 << bit_size return val @@ -126,37 +157,43 @@ def get_signed_short_msb(bits): val = get_unsigned_short_msb(bits) return get_twos_complement(val, 16) + class BME280: def __init__(self, config): self.printer = config.get_printer() self.name = config.get_name().split()[-1] self.reactor = self.printer.get_reactor() self.i2c = bus.MCU_I2C_from_config( - config, default_addr=BME280_CHIP_ADDR, default_speed=100000) + config, default_addr=BME280_CHIP_ADDR, default_speed=100000 + ) self.mcu = self.i2c.get_mcu() - self.iir_filter = config.getint('bme280_iir_filter', 1) - self.os_temp = config.getint('bme280_oversample_temp', 2) - self.os_hum = config.getint('bme280_oversample_hum', 2) - self.os_pres = config.getint('bme280_oversample_pressure', 2) - self.gas_heat_temp = config.getint('bme280_gas_target_temp', 320) - self.gas_heat_duration = config.getint('bme280_gas_heat_duration', 150) - logging.info("BMxx80: Oversampling: Temp %dx Humid %dx Pressure %dx" % ( - pow(2, self.os_temp - 1), pow(2, self.os_hum - 1), - pow(2, self.os_pres - 1))) + self.iir_filter = config.getint("bme280_iir_filter", 1) + self.os_temp = config.getint("bme280_oversample_temp", 2) + self.os_hum = config.getint("bme280_oversample_hum", 2) + self.os_pres = config.getint("bme280_oversample_pressure", 2) + self.gas_heat_temp = config.getint("bme280_gas_target_temp", 320) + self.gas_heat_duration = config.getint("bme280_gas_heat_duration", 150) + logging.info( + "BMxx80: Oversampling: Temp %dx Humid %dx Pressure %dx" + % ( + pow(2, self.os_temp - 1), + pow(2, self.os_hum - 1), + pow(2, self.os_pres - 1), + ) + ) logging.info("BMxx80: IIR: %dx" % (pow(2, self.iir_filter) - 1)) self.iir_filter = self.iir_filter & 0x07 - self.temp = self.pressure = self.humidity = self.gas = self.t_fine = 0. - self.min_temp = self.max_temp = self.range_switching_error = 0. + self.temp = self.pressure = self.humidity = self.gas = self.t_fine = 0.0 + self.min_temp = self.max_temp = self.range_switching_error = 0.0 self.max_sample_time = None self.dig = self.sample_timer = None - self.chip_type = 'BMP280' + self.chip_type = "BMP280" self.chip_registers = BME280_REGS self.printer.add_object("bme280 " + self.name, self) - if self.printer.get_start_args().get('debugoutput') is not None: + if self.printer.get_start_args().get("debugoutput") is not None: return - self.printer.register_event_handler("klippy:connect", - self.handle_connect) + self.printer.register_event_handler("klippy:connect", self.handle_connect) self.last_gas_time = 0 def handle_connect(self): @@ -176,19 +213,19 @@ class BME280: def _init_bmxx80(self): def read_calibration_data_bmp280(calib_data_1): dig = {} - dig['T1'] = get_unsigned_short(calib_data_1[0:2]) - dig['T2'] = get_signed_short(calib_data_1[2:4]) - dig['T3'] = get_signed_short(calib_data_1[4:6]) + dig["T1"] = get_unsigned_short(calib_data_1[0:2]) + dig["T2"] = get_signed_short(calib_data_1[2:4]) + dig["T3"] = get_signed_short(calib_data_1[4:6]) - dig['P1'] = get_unsigned_short(calib_data_1[6:8]) - dig['P2'] = get_signed_short(calib_data_1[8:10]) - dig['P3'] = get_signed_short(calib_data_1[10:12]) - dig['P4'] = get_signed_short(calib_data_1[12:14]) - dig['P5'] = get_signed_short(calib_data_1[14:16]) - dig['P6'] = get_signed_short(calib_data_1[16:18]) - dig['P7'] = get_signed_short(calib_data_1[18:20]) - dig['P8'] = get_signed_short(calib_data_1[20:22]) - dig['P9'] = get_signed_short(calib_data_1[22:24]) + dig["P1"] = get_unsigned_short(calib_data_1[6:8]) + dig["P2"] = get_signed_short(calib_data_1[8:10]) + dig["P3"] = get_signed_short(calib_data_1[10:12]) + dig["P4"] = get_signed_short(calib_data_1[12:14]) + dig["P5"] = get_signed_short(calib_data_1[14:16]) + dig["P6"] = get_signed_short(calib_data_1[16:18]) + dig["P7"] = get_signed_short(calib_data_1[18:20]) + dig["P8"] = get_signed_short(calib_data_1[20:22]) + dig["P9"] = get_signed_short(calib_data_1[22:24]) return dig def read_calibration_data_bmp388(calib_data_1): @@ -216,63 +253,67 @@ class BME280: def read_calibration_data_bme280(calib_data_1, calib_data_2): dig = read_calibration_data_bmp280(calib_data_1) - dig['H1'] = calib_data_1[25] & 0xFF - dig['H2'] = get_signed_short(calib_data_2[0:2]) - dig['H3'] = calib_data_2[2] & 0xFF - dig['H4'] = get_twos_complement( - (calib_data_2[3] << 4) | (calib_data_2[4] & 0x0F), 12) - dig['H5'] = get_twos_complement( - (calib_data_2[5] << 4) | ((calib_data_2[4] & 0xF0) >> 4), 12) - dig['H6'] = get_twos_complement(calib_data_2[6], 8) + dig["H1"] = calib_data_1[25] & 0xFF + dig["H2"] = get_signed_short(calib_data_2[0:2]) + dig["H3"] = calib_data_2[2] & 0xFF + dig["H4"] = get_twos_complement( + (calib_data_2[3] << 4) | (calib_data_2[4] & 0x0F), 12 + ) + dig["H5"] = get_twos_complement( + (calib_data_2[5] << 4) | ((calib_data_2[4] & 0xF0) >> 4), 12 + ) + dig["H6"] = get_twos_complement(calib_data_2[6], 8) return dig def read_calibration_data_bme680(calib_data_1, calib_data_2): dig = {} - dig['T1'] = get_unsigned_short(calib_data_2[8:10]) - dig['T2'] = get_signed_short(calib_data_1[2:4]) - dig['T3'] = get_signed_byte(calib_data_1[4]) + dig["T1"] = get_unsigned_short(calib_data_2[8:10]) + dig["T2"] = get_signed_short(calib_data_1[2:4]) + dig["T3"] = get_signed_byte(calib_data_1[4]) - dig['P1'] = get_unsigned_short(calib_data_1[6:8]) - dig['P2'] = get_signed_short(calib_data_1[8:10]) - dig['P3'] = calib_data_1[10] - dig['P4'] = get_signed_short(calib_data_1[12:14]) - dig['P5'] = get_signed_short(calib_data_1[14:16]) - dig['P6'] = get_signed_byte(calib_data_1[17]) - dig['P7'] = get_signed_byte(calib_data_1[16]) - dig['P8'] = get_signed_short(calib_data_1[20:22]) - dig['P9'] = get_signed_short(calib_data_1[22:24]) - dig['P10'] = calib_data_1[24] + dig["P1"] = get_unsigned_short(calib_data_1[6:8]) + dig["P2"] = get_signed_short(calib_data_1[8:10]) + dig["P3"] = calib_data_1[10] + dig["P4"] = get_signed_short(calib_data_1[12:14]) + dig["P5"] = get_signed_short(calib_data_1[14:16]) + dig["P6"] = get_signed_byte(calib_data_1[17]) + dig["P7"] = get_signed_byte(calib_data_1[16]) + dig["P8"] = get_signed_short(calib_data_1[20:22]) + dig["P9"] = get_signed_short(calib_data_1[22:24]) + dig["P10"] = calib_data_1[24] - dig['H1'] = get_twos_complement( - (calib_data_2[2] << 4) | (calib_data_2[1] & 0x0F), 12) - dig['H2'] = get_twos_complement( - (calib_data_2[0] << 4) | ((calib_data_2[1] & 0xF0) >> 4), 12) - dig['H3'] = get_signed_byte(calib_data_2[3]) - dig['H4'] = get_signed_byte(calib_data_2[4]) - dig['H5'] = get_signed_byte(calib_data_2[5]) - dig['H6'] = calib_data_2[6] - dig['H7'] = get_signed_byte(calib_data_2[7]) + dig["H1"] = get_twos_complement( + (calib_data_2[2] << 4) | (calib_data_2[1] & 0x0F), 12 + ) + dig["H2"] = get_twos_complement( + (calib_data_2[0] << 4) | ((calib_data_2[1] & 0xF0) >> 4), 12 + ) + dig["H3"] = get_signed_byte(calib_data_2[3]) + dig["H4"] = get_signed_byte(calib_data_2[4]) + dig["H5"] = get_signed_byte(calib_data_2[5]) + dig["H6"] = calib_data_2[6] + dig["H7"] = get_signed_byte(calib_data_2[7]) - dig['G1'] = get_signed_byte(calib_data_2[12]) - dig['G2'] = get_signed_short(calib_data_2[10:12]) - dig['G3'] = get_signed_byte(calib_data_2[13]) + dig["G1"] = get_signed_byte(calib_data_2[12]) + dig["G2"] = get_signed_short(calib_data_2[10:12]) + dig["G3"] = get_signed_byte(calib_data_2[13]) return dig def read_calibration_data_bmp180(calib_data_1): dig = {} - dig['AC1'] = get_signed_short_msb(calib_data_1[0:2]) - dig['AC2'] = get_signed_short_msb(calib_data_1[2:4]) - dig['AC3'] = get_signed_short_msb(calib_data_1[4:6]) - dig['AC4'] = get_unsigned_short_msb(calib_data_1[6:8]) - dig['AC5'] = get_unsigned_short_msb(calib_data_1[8:10]) - dig['AC6'] = get_unsigned_short_msb(calib_data_1[10:12]) + dig["AC1"] = get_signed_short_msb(calib_data_1[0:2]) + dig["AC2"] = get_signed_short_msb(calib_data_1[2:4]) + dig["AC3"] = get_signed_short_msb(calib_data_1[4:6]) + dig["AC4"] = get_unsigned_short_msb(calib_data_1[6:8]) + dig["AC5"] = get_unsigned_short_msb(calib_data_1[8:10]) + dig["AC6"] = get_unsigned_short_msb(calib_data_1[10:12]) - dig['B1'] = get_signed_short_msb(calib_data_1[12:14]) - dig['B2'] = get_signed_short_msb(calib_data_1[14:16]) + dig["B1"] = get_signed_short_msb(calib_data_1[12:14]) + dig["B2"] = get_signed_short_msb(calib_data_1[14:16]) - dig['MB'] = get_signed_short_msb(calib_data_1[16:18]) - dig['MC'] = get_signed_short_msb(calib_data_1[18:20]) - dig['MD'] = get_signed_short_msb(calib_data_1[20:22]) + dig["MB"] = get_signed_short_msb(calib_data_1[16:18]) + dig["MC"] = get_signed_short_msb(calib_data_1[18:20]) + dig["MD"] = get_signed_short_msb(calib_data_1[20:22]) return dig chip_id = self.read_id() or self.read_bmp3_id() @@ -280,31 +321,35 @@ class BME280: logging.info("bme280: Unknown Chip ID received %#x" % chip_id) else: self.chip_type = BME_CHIPS[chip_id] - logging.info("bme280: Found Chip %s at %#x" % ( - self.chip_type, self.i2c.i2c_address)) + logging.info( + "bme280: Found Chip %s at %#x" % (self.chip_type, self.i2c.i2c_address) + ) # Reset chip - self.write_register('RESET', [RESET_CHIP_VALUE], wait=True) - self.reactor.pause(self.reactor.monotonic() + .5) + self.write_register("RESET", [RESET_CHIP_VALUE], wait=True) + self.reactor.pause(self.reactor.monotonic() + 0.5) # Make sure non-volatile memory has been copied to registers - if self.chip_type != 'BMP180': + if self.chip_type != "BMP180": # BMP180 has no status register available - status = self.read_register('STATUS', 1)[0] + status = self.read_register("STATUS", 1)[0] while status & STATUS_IM_UPDATE: - self.reactor.pause(self.reactor.monotonic() + .01) - status = self.read_register('STATUS', 1)[0] + self.reactor.pause(self.reactor.monotonic() + 0.01) + status = self.read_register("STATUS", 1)[0] - if self.chip_type == 'BME680': - self.max_sample_time = \ - (1.25 + (2.3 * self.os_temp) + ((2.3 * self.os_pres) + .575) - + ((2.3 * self.os_hum) + .575)) / 1000 + if self.chip_type == "BME680": + self.max_sample_time = ( + 1.25 + + (2.3 * self.os_temp) + + ((2.3 * self.os_pres) + 0.575) + + ((2.3 * self.os_hum) + 0.575) + ) / 1000 self.sample_timer = self.reactor.register_timer(self._sample_bme680) self.chip_registers = BME680_REGS - elif self.chip_type == 'BMP180': + elif self.chip_type == "BMP180": self.sample_timer = self.reactor.register_timer(self._sample_bmp180) self.chip_registers = BMP180_REGS - elif self.chip_type == 'BMP388': + elif self.chip_type == "BMP388": self.chip_registers = BMP388_REGS self.write_register( "PWR_CTRL", @@ -321,43 +366,46 @@ class BME280: self.write_register("INT_CTRL", [BMP388_REG_VAL_DRDY_EN]) self.sample_timer = self.reactor.register_timer(self._sample_bmp388) - elif self.chip_type == 'BME280': - self.max_sample_time = \ - (1.25 + (2.3 * self.os_temp) + ((2.3 * self.os_pres) + .575) - + ((2.3 * self.os_hum) + .575)) / 1000 + elif self.chip_type == "BME280": + self.max_sample_time = ( + 1.25 + + (2.3 * self.os_temp) + + ((2.3 * self.os_pres) + 0.575) + + ((2.3 * self.os_hum) + 0.575) + ) / 1000 self.sample_timer = self.reactor.register_timer(self._sample_bme280) self.chip_registers = BME280_REGS else: - self.max_sample_time = \ - (1.25 + (2.3 * self.os_temp) - + ((2.3 * self.os_pres) + .575)) / 1000 + self.max_sample_time = ( + 1.25 + (2.3 * self.os_temp) + ((2.3 * self.os_pres) + 0.575) + ) / 1000 self.sample_timer = self.reactor.register_timer(self._sample_bme280) self.chip_registers = BME280_REGS # Read out and calculate the trimming parameters - if self.chip_type == 'BMP180': - cal_1 = self.read_register('CAL_1', 22) - elif self.chip_type == 'BMP388': - cal_1 = self.read_register('CAL_1', 21) + if self.chip_type == "BMP180": + cal_1 = self.read_register("CAL_1", 22) + elif self.chip_type == "BMP388": + cal_1 = self.read_register("CAL_1", 21) else: - cal_1 = self.read_register('CAL_1', 26) - cal_2 = self.read_register('CAL_2', 16) - if self.chip_type == 'BME280': + cal_1 = self.read_register("CAL_1", 26) + cal_2 = self.read_register("CAL_2", 16) + if self.chip_type == "BME280": self.dig = read_calibration_data_bme280(cal_1, cal_2) - elif self.chip_type == 'BMP280': + elif self.chip_type == "BMP280": self.dig = read_calibration_data_bmp280(cal_1) - elif self.chip_type == 'BME680': + elif self.chip_type == "BME680": self.dig = read_calibration_data_bme680(cal_1, cal_2) - elif self.chip_type == 'BMP180': + elif self.chip_type == "BMP180": self.dig = read_calibration_data_bmp180(cal_1) - elif self.chip_type == 'BMP388': + elif self.chip_type == "BMP388": self.dig = read_calibration_data_bmp388(cal_1) - if self.chip_type in ('BME280', 'BMP280'): + if self.chip_type in ("BME280", "BMP280"): max_standby_time = REPORT_TIME - self.max_sample_time # 0.5 ms t_sb = 0 - if self.chip_type == 'BME280': + if self.chip_type == "BME280": if max_standby_time > 1: t_sb = 5 elif max_standby_time > 0.5: @@ -389,50 +437,51 @@ class BME280: t_sb = 1 cfg = t_sb << 5 | self.iir_filter << 2 - self.write_register('CONFIG', cfg) - if self.chip_type == 'BME280': - self.write_register('CTRL_HUM', self.os_hum) + self.write_register("CONFIG", cfg) + if self.chip_type == "BME280": + self.write_register("CTRL_HUM", self.os_hum) # Enter normal (periodic) mode meas = self.os_temp << 5 | self.os_pres << 2 | MODE_PERIODIC - self.write_register('CTRL_MEAS', meas, wait=True) + self.write_register("CTRL_MEAS", meas, wait=True) - if self.chip_type == 'BME680': - self.write_register('CONFIG', self.iir_filter << 2) + if self.chip_type == "BME680": + self.write_register("CONFIG", self.iir_filter << 2) # Should be set once and reused on every mode register write - self.write_register('CTRL_HUM', self.os_hum & 0x07) + self.write_register("CTRL_HUM", self.os_hum & 0x07) gas_wait_0 = self._calc_gas_heater_duration(self.gas_heat_duration) - self.write_register('GAS_WAIT_0', [gas_wait_0]) + self.write_register("GAS_WAIT_0", [gas_wait_0]) res_heat_0 = self._calc_gas_heater_resistance(self.gas_heat_temp) - self.write_register('RES_HEAT_0', [res_heat_0]) + self.write_register("RES_HEAT_0", [res_heat_0]) # Set initial heater current to reach Gas heater target on start - self.write_register('IDAC_HEAT_0', 96) + self.write_register("IDAC_HEAT_0", 96) def _sample_bme280(self, eventtime): # In normal mode data shadowing is performed # So reading can be done while measurements are in process try: - if self.chip_type == 'BME280': - data = self.read_register('PRESSURE_MSB', 8) - elif self.chip_type == 'BMP280': - data = self.read_register('PRESSURE_MSB', 6) + if self.chip_type == "BME280": + data = self.read_register("PRESSURE_MSB", 8) + elif self.chip_type == "BMP280": + data = self.read_register("PRESSURE_MSB", 6) else: return self.reactor.NEVER except Exception: logging.exception("BME280: Error reading data") - self.temp = self.pressure = self.humidity = .0 + self.temp = self.pressure = self.humidity = 0.0 return self.reactor.NEVER temp_raw = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4) self.temp = self._compensate_temp(temp_raw) pressure_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4) - self.pressure = self._compensate_pressure_bme280(pressure_raw) / 100. - if self.chip_type == 'BME280': + self.pressure = self._compensate_pressure_bme280(pressure_raw) / 100.0 + if self.chip_type == "BME280": humid_raw = (data[6] << 8) | data[7] self.humidity = self._compensate_humidity_bme280(humid_raw) if self.temp < self.min_temp or self.temp > self.max_temp: self.printer.invoke_shutdown( "BME280 temperature %0.1f outside range of %0.1f:%.01f" - % (self.temp, self.min_temp, self.max_temp)) + % (self.temp, self.min_temp, self.max_temp) + ) measured_time = self.reactor.monotonic() self._callback(self.mcu.estimated_print_time(measured_time), self.temp) return measured_time + REPORT_TIME @@ -490,7 +539,7 @@ class BME280: partial_data1 = self.dig["P2"] * self.t_fine partial_data2 = self.dig["P3"] * (self.t_fine * self.t_fine) partial_data3 = self.dig["P4"] - partial_data3 *= (self.t_fine * self.t_fine * self.t_fine) + partial_data3 *= self.t_fine * self.t_fine * self.t_fine partial_out2 = adc_P * ( self.dig["P1"] + partial_data1 + partial_data2 + partial_data3 ) @@ -512,7 +561,7 @@ class BME280: def _sample_bme680(self, eventtime): def data_ready(stat, run_gas): - new_data = (stat & EAS_NEW_DATA) + new_data = stat & EAS_NEW_DATA gas_done = not (stat & GAS_DONE) meas_done = not (stat & MEASURE_DONE) if not run_gas: @@ -523,31 +572,30 @@ class BME280: # Check VOC once a while if self.reactor.monotonic() - self.last_gas_time > 3: gas_config = RUN_GAS | NB_CONV_0 - self.write_register('CTRL_GAS_1', [gas_config]) + self.write_register("CTRL_GAS_1", [gas_config]) run_gas = True # Enter forced mode meas = self.os_temp << 5 | self.os_pres << 2 | MODE - self.write_register('CTRL_MEAS', meas, wait=True) + self.write_register("CTRL_MEAS", meas, wait=True) max_sample_time = self.max_sample_time if run_gas: max_sample_time += self.gas_heat_duration / 1000 self.reactor.pause(self.reactor.monotonic() + max_sample_time) try: # wait until results are ready - status = self.read_register('EAS_STATUS_0', 1)[0] + status = self.read_register("EAS_STATUS_0", 1)[0] while not data_ready(status, run_gas): - self.reactor.pause( - self.reactor.monotonic() + self.max_sample_time) - status = self.read_register('EAS_STATUS_0', 1)[0] + self.reactor.pause(self.reactor.monotonic() + self.max_sample_time) + status = self.read_register("EAS_STATUS_0", 1)[0] - data = self.read_register('PRESSURE_MSB', 8) + data = self.read_register("PRESSURE_MSB", 8) gas_data = [0, 0] if run_gas: - gas_data = self.read_register('GAS_R_MSB', 2) + gas_data = self.read_register("GAS_R_MSB", 2) except Exception: logging.exception("BME680: Error reading data") - self.temp = self.pressure = self.humidity = self.gas = .0 + self.temp = self.pressure = self.humidity = self.gas = 0.0 return self.reactor.NEVER temp_raw = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4) @@ -555,169 +603,191 @@ class BME280: self.temp = self._compensate_temp(temp_raw) pressure_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4) if pressure_raw != 0x80000: - self.pressure = self._compensate_pressure_bme680( - pressure_raw) / 100. + self.pressure = self._compensate_pressure_bme680(pressure_raw) / 100.0 humid_raw = (data[6] << 8) | data[7] self.humidity = self._compensate_humidity_bme680(humid_raw) - gas_valid = ((gas_data[1] & 0x20) == 0x20) + gas_valid = (gas_data[1] & 0x20) == 0x20 if gas_valid: - gas_heater_stable = ((gas_data[1] & 0x10) == 0x10) + gas_heater_stable = (gas_data[1] & 0x10) == 0x10 if not gas_heater_stable: logging.warning("BME680: Gas heater didn't reach target") gas_raw = (gas_data[0] << 2) | ((gas_data[1] & 0xC0) >> 6) - gas_range = (gas_data[1] & 0x0F) + gas_range = gas_data[1] & 0x0F self.gas = self._compensate_gas(gas_raw, gas_range) # Disable gas measurement on success gas_config = NB_CONV_0 - self.write_register('CTRL_GAS_1', [gas_config]) + self.write_register("CTRL_GAS_1", [gas_config]) self.last_gas_time = self.reactor.monotonic() if self.temp < self.min_temp or self.temp > self.max_temp: self.printer.invoke_shutdown( "BME680 temperature %0.1f outside range of %0.1f:%.01f" - % (self.temp, self.min_temp, self.max_temp)) + % (self.temp, self.min_temp, self.max_temp) + ) measured_time = self.reactor.monotonic() self._callback(self.mcu.estimated_print_time(measured_time), self.temp) return measured_time + REPORT_TIME def _sample_bmp180(self, eventtime): - meas = self.chip_registers['CRV_TEMP'] - self.write_register('CTRL_MEAS', meas) + meas = self.chip_registers["CRV_TEMP"] + self.write_register("CTRL_MEAS", meas) try: - self.reactor.pause(self.reactor.monotonic() + .01) - data = self.read_register('REG_MSB', 2) + self.reactor.pause(self.reactor.monotonic() + 0.01) + data = self.read_register("REG_MSB", 2) temp_raw = (data[0] << 8) | data[1] except Exception: logging.exception("BMP180: Error reading temperature") - self.temp = self.pressure = .0 + self.temp = self.pressure = 0.0 return self.reactor.NEVER - meas = self.chip_registers['CRV_PRES'] | (self.os_pres << 6) - self.write_register('CTRL_MEAS', meas) + meas = self.chip_registers["CRV_PRES"] | (self.os_pres << 6) + self.write_register("CTRL_MEAS", meas) try: - self.reactor.pause(self.reactor.monotonic() + .01) - data = self.read_register('REG_MSB', 3) - pressure_raw = \ - ((data[0] << 16)|(data[1] << 8)|data[2]) >> (8 - self.os_pres) + self.reactor.pause(self.reactor.monotonic() + 0.01) + data = self.read_register("REG_MSB", 3) + pressure_raw = ((data[0] << 16) | (data[1] << 8) | data[2]) >> ( + 8 - self.os_pres + ) except Exception: logging.exception("BMP180: Error reading pressure") - self.temp = self.pressure = .0 + self.temp = self.pressure = 0.0 return self.reactor.NEVER self.temp = self._compensate_temp_bmp180(temp_raw) - self.pressure = self._compensate_pressure_bmp180(pressure_raw) / 100. + self.pressure = self._compensate_pressure_bmp180(pressure_raw) / 100.0 if self.temp < self.min_temp or self.temp > self.max_temp: self.printer.invoke_shutdown( "BMP180 temperature %0.1f outside range of %0.1f:%.01f" - % (self.temp, self.min_temp, self.max_temp)) + % (self.temp, self.min_temp, self.max_temp) + ) measured_time = self.reactor.monotonic() self._callback(self.mcu.estimated_print_time(measured_time), self.temp) return measured_time + REPORT_TIME - def _compensate_temp(self, raw_temp): dig = self.dig - var1 = ((raw_temp / 16384. - (dig['T1'] / 1024.)) * dig['T2']) + var1 = (raw_temp / 16384.0 - (dig["T1"] / 1024.0)) * dig["T2"] var2 = ( - ((raw_temp / 131072.) - (dig['T1'] / 8192.)) * - ((raw_temp / 131072.) - (dig['T1'] / 8192.)) * dig['T3']) + ((raw_temp / 131072.0) - (dig["T1"] / 8192.0)) + * ((raw_temp / 131072.0) - (dig["T1"] / 8192.0)) + * dig["T3"] + ) self.t_fine = var1 + var2 return self.t_fine / 5120.0 def _compensate_pressure_bme280(self, raw_pressure): dig = self.dig t_fine = self.t_fine - var1 = t_fine / 2. - 64000. - var2 = var1 * var1 * dig['P6'] / 32768. - var2 = var2 + var1 * dig['P5'] * 2. - var2 = var2 / 4. + (dig['P4'] * 65536.) - var1 = (dig['P3'] * var1 * var1 / 524288. + dig['P2'] * var1) / 524288. - var1 = (1. + var1 / 32768.) * dig['P1'] + var1 = t_fine / 2.0 - 64000.0 + var2 = var1 * var1 * dig["P6"] / 32768.0 + var2 = var2 + var1 * dig["P5"] * 2.0 + var2 = var2 / 4.0 + (dig["P4"] * 65536.0) + var1 = (dig["P3"] * var1 * var1 / 524288.0 + dig["P2"] * var1) / 524288.0 + var1 = (1.0 + var1 / 32768.0) * dig["P1"] if var1 == 0: - return 0. + return 0.0 else: pressure = 1048576.0 - raw_pressure - pressure = ((pressure - var2 / 4096.) * 6250.) / var1 - var1 = dig['P9'] * pressure * pressure / 2147483648. - var2 = pressure * dig['P8'] / 32768. - return pressure + (var1 + var2 + dig['P7']) / 16. + pressure = ((pressure - var2 / 4096.0) * 6250.0) / var1 + var1 = dig["P9"] * pressure * pressure / 2147483648.0 + var2 = pressure * dig["P8"] / 32768.0 + return pressure + (var1 + var2 + dig["P7"]) / 16.0 def _compensate_pressure_bme680(self, raw_pressure): dig = self.dig t_fine = self.t_fine - var1 = t_fine / 2. - 64000. - var2 = var1 * var1 * dig['P6'] / 131072. - var2 = var2 + var1 * dig['P5'] * 2. - var2 = var2 / 4. + (dig['P4'] * 65536.) - var1 = (dig['P3'] * var1 * var1 / 16384. + dig['P2'] * var1) / 524288. - var1 = (1. + var1 / 32768.) * dig['P1'] + var1 = t_fine / 2.0 - 64000.0 + var2 = var1 * var1 * dig["P6"] / 131072.0 + var2 = var2 + var1 * dig["P5"] * 2.0 + var2 = var2 / 4.0 + (dig["P4"] * 65536.0) + var1 = (dig["P3"] * var1 * var1 / 16384.0 + dig["P2"] * var1) / 524288.0 + var1 = (1.0 + var1 / 32768.0) * dig["P1"] if var1 == 0: - return 0. + return 0.0 else: pressure = 1048576.0 - raw_pressure - pressure = ((pressure - var2 / 4096.) * 6250.) / var1 - var1 = dig['P9'] * pressure * pressure / 2147483648. - var2 = pressure * dig['P8'] / 32768. - var3 = (pressure / 256.) * (pressure / 256.) * (pressure / 256.) * ( - dig['P10'] / 131072.) - return pressure + (var1 + var2 + var3 + (dig['P7'] * 128.)) / 16. + pressure = ((pressure - var2 / 4096.0) * 6250.0) / var1 + var1 = dig["P9"] * pressure * pressure / 2147483648.0 + var2 = pressure * dig["P8"] / 32768.0 + var3 = ( + (pressure / 256.0) + * (pressure / 256.0) + * (pressure / 256.0) + * (dig["P10"] / 131072.0) + ) + return pressure + (var1 + var2 + var3 + (dig["P7"] * 128.0)) / 16.0 def _compensate_humidity_bme280(self, raw_humidity): dig = self.dig t_fine = self.t_fine - humidity = t_fine - 76800. - h1 = ( - raw_humidity - ( - dig['H4'] * 64. + dig['H5'] / 16384. * humidity)) - h2 = (dig['H2'] / 65536. * (1. + dig['H6'] / 67108864. * humidity * - (1. + dig['H3'] / 67108864. * humidity))) + humidity = t_fine - 76800.0 + h1 = raw_humidity - (dig["H4"] * 64.0 + dig["H5"] / 16384.0 * humidity) + h2 = ( + dig["H2"] + / 65536.0 + * ( + 1.0 + + dig["H6"] + / 67108864.0 + * humidity + * (1.0 + dig["H3"] / 67108864.0 * humidity) + ) + ) humidity = h1 * h2 - humidity = humidity * (1. - dig['H1'] * humidity / 524288.) - return min(100., max(0., humidity)) + humidity = humidity * (1.0 - dig["H1"] * humidity / 524288.0) + return min(100.0, max(0.0, humidity)) def _compensate_humidity_bme680(self, raw_humidity): dig = self.dig temp_comp = self.temp - var1 = raw_humidity - ( - (dig['H1'] * 16.) + ((dig['H3'] / 2.) * temp_comp)) - var2 = var1 * ((dig['H2'] / 262144.) * - (1. + ((dig['H4'] / 16384.) * temp_comp) + - ((dig['H5'] / 1048576.) * temp_comp * temp_comp))) - var3 = dig['H6'] / 16384. - var4 = dig['H7'] / 2097152. + var1 = raw_humidity - ((dig["H1"] * 16.0) + ((dig["H3"] / 2.0) * temp_comp)) + var2 = var1 * ( + (dig["H2"] / 262144.0) + * ( + 1.0 + + ((dig["H4"] / 16384.0) * temp_comp) + + ((dig["H5"] / 1048576.0) * temp_comp * temp_comp) + ) + ) + var3 = dig["H6"] / 16384.0 + var4 = dig["H7"] / 2097152.0 humidity = var2 + ((var3 + (var4 * temp_comp)) * var2 * var2) - return min(100., max(0., humidity)) + return min(100.0, max(0.0, humidity)) def _compensate_gas(self, gas_raw, gas_range): - gas_switching_error = self.read_register('RANGE_SWITCHING_ERROR', 1)[0] - var1 = (1340. + 5. * gas_switching_error) * \ - BME680_GAS_CONSTANTS[gas_range][0] - gas = var1 * BME680_GAS_CONSTANTS[gas_range][1] / ( - gas_raw - 512. + var1) + gas_switching_error = self.read_register("RANGE_SWITCHING_ERROR", 1)[0] + var1 = (1340.0 + 5.0 * gas_switching_error) * BME680_GAS_CONSTANTS[gas_range][0] + gas = var1 * BME680_GAS_CONSTANTS[gas_range][1] / (gas_raw - 512.0 + var1) return gas def _calc_gas_heater_resistance(self, target_temp): amb_temp = self.temp - heater_data = self.read_register('RES_HEAT_VAL', 3) + heater_data = self.read_register("RES_HEAT_VAL", 3) res_heat_val = get_signed_byte(heater_data[0]) res_heat_range = (heater_data[2] & 0x30) >> 4 dig = self.dig - var1 = (dig['G1'] / 16.) + 49. - var2 = ((dig['G2'] / 32768.) * 0.0005) + 0.00235 - var3 = dig['G3'] / 1024. - var4 = var1 * (1. + (var2 * target_temp)) + var1 = (dig["G1"] / 16.0) + 49.0 + var2 = ((dig["G2"] / 32768.0) * 0.0005) + 0.00235 + var3 = dig["G3"] / 1024.0 + var4 = var1 * (1.0 + (var2 * target_temp)) var5 = var4 + (var3 * amb_temp) - res_heat = (3.4 * ((var5 * (4. / (4. + res_heat_range)) - * (1. / (1. + (res_heat_val * 0.002)))) - 25)) + res_heat = 3.4 * ( + ( + var5 + * (4.0 / (4.0 + res_heat_range)) + * (1.0 / (1.0 + (res_heat_val * 0.002))) + ) + - 25 + ) return int(res_heat) def _calc_gas_heater_duration(self, duration_ms): if duration_ms >= 4032: - duration_reg = 0xff + duration_reg = 0xFF else: factor = 0 while duration_ms > 0x3F: @@ -729,54 +799,54 @@ class BME280: def _compensate_temp_bmp180(self, raw_temp): dig = self.dig - x1 = (raw_temp - dig['AC6']) * dig['AC5'] / 32768. - x2 = dig['MC'] * 2048 / (x1 + dig['MD']) + x1 = (raw_temp - dig["AC6"]) * dig["AC5"] / 32768.0 + x2 = dig["MC"] * 2048 / (x1 + dig["MD"]) b5 = x1 + x2 self.t_fine = b5 - return (b5 + 8)/16./10. + return (b5 + 8) / 16.0 / 10.0 def _compensate_pressure_bmp180(self, raw_pressure): dig = self.dig b5 = self.t_fine b6 = b5 - 4000 - x1 = (dig['B2'] * (b6 * b6 / 4096)) / 2048 - x2 = dig['AC2'] * b6 / 2048 + x1 = (dig["B2"] * (b6 * b6 / 4096)) / 2048 + x2 = dig["AC2"] * b6 / 2048 x3 = x1 + x2 - b3 = ((int(dig['AC1'] * 4 + x3) << self.os_pres) + 2) / 4 - x1 = dig['AC3'] * b6 / 8192 - x2 = (dig['B1'] * (b6 * b6 / 4096)) / 65536 + b3 = ((int(dig["AC1"] * 4 + x3) << self.os_pres) + 2) / 4 + x1 = dig["AC3"] * b6 / 8192 + x2 = (dig["B1"] * (b6 * b6 / 4096)) / 65536 x3 = ((x1 + x2) + 2) / 4 - b4 = dig['AC4'] * (x3 + 32768) / 32768 + b4 = dig["AC4"] * (x3 + 32768) / 32768 b7 = (raw_pressure - b3) * (50000 >> self.os_pres) - if (b7 < 0x80000000): + if b7 < 0x80000000: p = (b7 * 2) / b4 else: p = (b7 / b4) * 2 x1 = (p / 256) * (p / 256) x1 = (x1 * 3038) / 65536 x2 = (-7357 * p) / 65536 - p = p + (x1 + x2 + 3791) / 16. + p = p + (x1 + x2 + 3791) / 16.0 return p def read_id(self): # read chip id register regs = [BME_CHIP_ID_REG] params = self.i2c.i2c_read(regs, 1) - return bytearray(params['response'])[0] + return bytearray(params["response"])[0] def read_bmp3_id(self): # read chip id register regs = [BMP3_CHIP_ID_REG] params = self.i2c.i2c_read(regs, 1) - return bytearray(params['response'])[0] + return bytearray(params["response"])[0] def read_register(self, reg_name, read_len): # read a single register regs = [self.chip_registers[reg_name]] params = self.i2c.i2c_read(regs, read_len) - return bytearray(params['response']) + return bytearray(params["response"]) - def write_register(self, reg_name, data, wait = False): + def write_register(self, reg_name, data, wait=False): if type(data) is not list: data = [data] reg = self.chip_registers[reg_name] @@ -787,14 +857,11 @@ class BME280: self.i2c.i2c_write_wait_ack(data) def get_status(self, eventtime): - data = { - 'temperature': round(self.temp, 2), - 'pressure': self.pressure - } - if self.chip_type in ('BME280', 'BME680'): - data['humidity'] = self.humidity - if self.chip_type == 'BME680': - data['gas'] = self.gas + data = {"temperature": round(self.temp, 2), "pressure": self.pressure} + if self.chip_type in ("BME280", "BME680"): + data["humidity"] = self.humidity + if self.chip_type == "BME680": + data["gas"] = self.gas return data |