aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/extras/aht10.py
diff options
context:
space:
mode:
Diffstat (limited to 'klippy/extras/aht10.py')
-rw-r--r--klippy/extras/aht10.py78
1 files changed, 43 insertions, 35 deletions
diff --git a/klippy/extras/aht10.py b/klippy/extras/aht10.py
index 001f7e54..f680004d 100644
--- a/klippy/extras/aht10.py
+++ b/klippy/extras/aht10.py
@@ -13,15 +13,16 @@ from . import bus
# AHT21 - Tested w/ BTT GTR 1.0 MCU on i2c3
######################################################################
-AHT10_I2C_ADDR= 0x38
+AHT10_I2C_ADDR = 0x38
AHT10_COMMANDS = {
- 'INIT' :[0xE1, 0x08, 0x00],
- 'MEASURE' :[0xAC, 0x33, 0x00],
- 'RESET' :[0xBA, 0x08, 0x00]
+ "INIT": [0xE1, 0x08, 0x00],
+ "MEASURE": [0xAC, 0x33, 0x00],
+ "RESET": [0xBA, 0x08, 0x00],
}
-AHT10_MAX_BUSY_CYCLES= 5
+AHT10_MAX_BUSY_CYCLES = 5
+
class AHT10:
def __init__(self, config):
@@ -29,14 +30,14 @@ class AHT10:
self.name = config.get_name().split()[-1]
self.reactor = self.printer.get_reactor()
self.i2c = bus.MCU_I2C_from_config(
- config, default_addr=AHT10_I2C_ADDR, default_speed=100000)
- self.report_time = config.getint('aht10_report_time',30,minval=5)
- self.temp = self.min_temp = self.max_temp = self.humidity = 0.
+ config, default_addr=AHT10_I2C_ADDR, default_speed=100000
+ )
+ self.report_time = config.getint("aht10_report_time", 30, minval=5)
+ self.temp = self.min_temp = self.max_temp = self.humidity = 0.0
self.sample_timer = self.reactor.register_timer(self._sample_aht10)
self.printer.add_object("aht10 " + self.name, self)
- self.printer.register_event_handler("klippy:connect",
- self.handle_connect)
- self.is_calibrated = False
+ self.printer.register_event_handler("klippy:connect", self.handle_connect)
+ self.is_calibrated = False
self.init_sent = False
def handle_connect(self):
@@ -67,28 +68,31 @@ class AHT10:
# Check if we're constantly busy. If so, send soft-reset
# and issue warning.
if is_busy and cycles > AHT10_MAX_BUSY_CYCLES:
- logging.warning("aht10: device reported busy after " +
- "%d cycles, resetting device"% AHT10_MAX_BUSY_CYCLES)
+ logging.warning(
+ "aht10: device reported busy after "
+ + "%d cycles, resetting device" % AHT10_MAX_BUSY_CYCLES
+ )
self._reset_device()
data = None
break
cycles += 1
# Write command for updating temperature+status bit
- self.i2c.i2c_write(AHT10_COMMANDS['MEASURE'])
+ self.i2c.i2c_write(AHT10_COMMANDS["MEASURE"])
# Wait 110ms after first read, 75ms minimum
- self.reactor.pause(self.reactor.monotonic() + .110)
+ self.reactor.pause(self.reactor.monotonic() + 0.110)
# Read data
read = self.i2c.i2c_read([], 6)
if read is None:
- logging.warning("aht10: received data from" +
- " i2c_read is None")
+ logging.warning("aht10: received data from" + " i2c_read is None")
continue
- data = bytearray(read['response'])
+ data = bytearray(read["response"])
if len(data) < 6:
- logging.warning("aht10: received bytes less than" +
- " expected 6 [%d]"%len(data))
+ logging.warning(
+ "aht10: received bytes less than"
+ + " expected 6 [%d]" % len(data)
+ )
continue
self.is_calibrated = True if (data[0] & 0b00000100) else False
@@ -97,19 +101,20 @@ class AHT10:
if is_busy:
return False
except Exception as e:
- logging.exception("aht10: exception encountered" +
- " reading data: %s"%str(e))
+ logging.exception(
+ "aht10: exception encountered" + " reading data: %s" % str(e)
+ )
return False
temp = ((data[3] & 0x0F) << 16) | (data[4] << 8) | data[5]
- self.temp = ((temp*200) / 1048576) - 50
+ self.temp = ((temp * 200) / 1048576) - 50
hum = ((data[1] << 16) | (data[2] << 8) | data[3]) >> 4
self.humidity = int(hum * 100 / 1048576)
# Clamp humidity
- if (self.humidity > 100):
+ if self.humidity > 100:
self.humidity = 100
- elif (self.humidity < 0):
+ elif self.humidity < 0:
self.humidity = 0
return True
@@ -119,30 +124,33 @@ class AHT10:
return
# Reset device
- self.i2c.i2c_write(AHT10_COMMANDS['RESET'])
+ self.i2c.i2c_write(AHT10_COMMANDS["RESET"])
# Wait 100ms after reset
- self.reactor.pause(self.reactor.monotonic() + .10)
+ self.reactor.pause(self.reactor.monotonic() + 0.10)
def _init_aht10(self):
# Init device
- self.i2c.i2c_write(AHT10_COMMANDS['INIT'])
+ self.i2c.i2c_write(AHT10_COMMANDS["INIT"])
# Wait 100ms after init
- self.reactor.pause(self.reactor.monotonic() + .10)
+ self.reactor.pause(self.reactor.monotonic() + 0.10)
self.init_sent = True
if self._make_measurement():
- logging.info("aht10: successfully initialized, initial temp: " +
- "%.3f, humidity: %.3f"%(self.temp, self.humidity))
+ logging.info(
+ "aht10: successfully initialized, initial temp: "
+ + "%.3f, humidity: %.3f" % (self.temp, self.humidity)
+ )
def _sample_aht10(self, eventtime):
if not self._make_measurement():
- self.temp = self.humidity = .0
+ self.temp = self.humidity = 0.0
return self.reactor.NEVER
if self.temp < self.min_temp or self.temp > self.max_temp:
self.printer.invoke_shutdown(
"AHT10 temperature %0.1f outside range of %0.1f:%.01f"
- % (self.temp, self.min_temp, self.max_temp))
+ % (self.temp, self.min_temp, self.max_temp)
+ )
measured_time = self.reactor.monotonic()
print_time = self.i2c.get_mcu().estimated_print_time(measured_time)
@@ -151,8 +159,8 @@ class AHT10:
def get_status(self, eventtime):
return {
- 'temperature': round(self.temp, 2),
- 'humidity': self.humidity,
+ "temperature": round(self.temp, 2),
+ "humidity": self.humidity,
}