aboutsummaryrefslogtreecommitdiffstats
path: root/klippy
diff options
context:
space:
mode:
authorKevin O'Connor <kevin@koconnor.net>2021-07-09 22:04:10 -0400
committerKevin O'Connor <kevin@koconnor.net>2022-03-29 20:34:46 -0400
commit74937326d313ae741c42207035ebcf8c09aa9e01 (patch)
treee4c42e6dc3fccd54cbd6942019835edc358b8cc3 /klippy
parent91ba9c00e33a8dde38eb303e617c5f878db5922d (diff)
downloadkutter-74937326d313ae741c42207035ebcf8c09aa9e01.tar.gz
kutter-74937326d313ae741c42207035ebcf8c09aa9e01.tar.xz
kutter-74937326d313ae741c42207035ebcf8c09aa9e01.zip
sensor_angle: Add support for bulk querying of spi angle sensors
Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
Diffstat (limited to 'klippy')
-rw-r--r--klippy/extras/angle.py188
1 files changed, 188 insertions, 0 deletions
diff --git a/klippy/extras/angle.py b/klippy/extras/angle.py
new file mode 100644
index 00000000..43d4fd1d
--- /dev/null
+++ b/klippy/extras/angle.py
@@ -0,0 +1,188 @@
+# Support for reading SPI magnetic angle sensors
+#
+# Copyright (C) 2021,2022 Kevin O'Connor <kevin@koconnor.net>
+#
+# This file may be distributed under the terms of the GNU GPLv3 license.
+import logging, threading
+from . import bus, motion_report
+
+MIN_MSG_TIME = 0.100
+TCODE_ERROR = 0xff
+
+class HelperA1333:
+ SPI_MODE = 3
+ SPI_SPEED = 10000000
+ def __init__(self, config, spi, oid):
+ self.spi = spi
+ def get_static_delay(self):
+ return .000001
+ def start(self):
+ # Setup for angle query
+ self.spi.spi_transfer([0x32, 0x00])
+
+class HelperAS5047D:
+ SPI_MODE = 1
+ SPI_SPEED = int(1. / .000000350)
+ def __init__(self, config, spi, oid):
+ self.spi = spi
+ def get_static_delay(self):
+ return .000100
+ def start(self):
+ # Clear any errors from device
+ self.spi.spi_transfer([0xff, 0xfc]) # Read DIAAGC
+ self.spi.spi_transfer([0x40, 0x01]) # Read ERRFL
+ self.spi.spi_transfer([0xc0, 0x00]) # Read NOP
+
+class HelperTLE5012B:
+ SPI_MODE = 1
+ SPI_SPEED = 4000000
+ def __init__(self, config, spi, oid):
+ self.spi = spi
+ def get_static_delay(self):
+ return .000042700 * 2.5
+ def start(self):
+ # Clear any errors from device
+ self.spi.spi_transfer([0x80, 0x01, 0x00, 0x00, 0x00, 0x00]) # Read STAT
+
+SAMPLE_PERIOD = 0.000400
+
+class Angle:
+ def __init__(self, config):
+ self.printer = config.get_printer()
+ self.sample_period = config.getfloat('sample_period', SAMPLE_PERIOD,
+ above=0.)
+ # Measurement conversion
+ self.start_clock = self.time_shift = self.sample_ticks = 0
+ self.last_sequence = self.last_angle = 0
+ # Measurement storage (accessed from background thread)
+ self.lock = threading.Lock()
+ self.raw_samples = []
+ # Sensor type
+ sensors = { "a1333": HelperA1333, "as5047d": HelperAS5047D,
+ "tle5012b": HelperTLE5012B }
+ sensor_type = config.getchoice('sensor_type', {s: s for s in sensors})
+ sensor_class = sensors[sensor_type]
+ self.spi = bus.MCU_SPI_from_config(config, sensor_class.SPI_MODE,
+ default_speed=sensor_class.SPI_SPEED)
+ self.mcu = mcu = self.spi.get_mcu()
+ self.oid = oid = mcu.create_oid()
+ self.sensor_helper = sensor_class(config, self.spi, oid)
+ # Setup mcu sensor_spi_angle bulk query code
+ self.query_spi_angle_cmd = self.query_spi_angle_end_cmd = None
+ mcu.add_config_cmd(
+ "config_spi_angle oid=%d spi_oid=%d spi_angle_type=%s"
+ % (oid, self.spi.get_oid(), sensor_type))
+ mcu.add_config_cmd(
+ "query_spi_angle oid=%d clock=0 rest_ticks=0 time_shift=0"
+ % (oid,), on_restart=True)
+ mcu.register_config_callback(self._build_config)
+ mcu.register_response(self._handle_spi_angle_data,
+ "spi_angle_data", oid)
+ # API server endpoints
+ self.api_dump = motion_report.APIDumpHelper(
+ self.printer, self._api_update, self._api_startstop, 0.100)
+ self.name = config.get_name().split()[1]
+ wh = self.printer.lookup_object('webhooks')
+ wh.register_mux_endpoint("angle/dump_angle", "sensor", self.name,
+ self._handle_dump_angle)
+ def _build_config(self):
+ freq = self.mcu.seconds_to_clock(1.)
+ while float(TCODE_ERROR << self.time_shift) / freq < 0.002:
+ self.time_shift += 1
+ cmdqueue = self.spi.get_command_queue()
+ self.query_spi_angle_cmd = self.mcu.lookup_command(
+ "query_spi_angle oid=%c clock=%u rest_ticks=%u time_shift=%c",
+ cq=cmdqueue)
+ self.query_spi_angle_end_cmd = self.mcu.lookup_query_command(
+ "query_spi_angle oid=%c clock=%u rest_ticks=%u time_shift=%c",
+ "spi_angle_end oid=%c sequence=%hu", oid=self.oid, cq=cmdqueue)
+ # Measurement collection
+ def is_measuring(self):
+ return self.start_clock != 0
+ def _handle_spi_angle_data(self, params):
+ with self.lock:
+ self.raw_samples.append(params)
+ def _extract_samples(self, raw_samples):
+ # Load variables to optimize inner loop below
+ sample_ticks = self.sample_ticks
+ start_clock = self.start_clock
+ clock_to_print_time = self.mcu.clock_to_print_time
+ last_sequence = self.last_sequence
+ last_angle = self.last_angle
+ time_shift = self.time_shift
+ static_delay = self.sensor_helper.get_static_delay()
+ # Process every message in raw_samples
+ count = error_count = 0
+ samples = [None] * (len(raw_samples) * 16)
+ for params in raw_samples:
+ seq = (last_sequence & ~0xffff) | params['sequence']
+ if seq < last_sequence:
+ seq += 0x10000
+ last_sequence = seq
+ d = bytearray(params['data'])
+ msg_mclock = start_clock + seq*16*sample_ticks
+ for i in range(len(d) // 3):
+ tcode = d[i*3]
+ if tcode == TCODE_ERROR:
+ error_count += 1
+ continue
+ raw_angle = d[i*3 + 1] | (d[i*3 + 2] << 8)
+ angle_diff = (last_angle - raw_angle) & 0xffff
+ angle_diff -= (angle_diff & 0x8000) << 1
+ last_angle -= angle_diff
+ mclock = msg_mclock + i*sample_ticks + (tcode<<time_shift)
+ ptime = round(clock_to_print_time(mclock) - static_delay, 6)
+ samples[count] = (ptime, last_angle)
+ count += 1
+ self.last_sequence = last_sequence
+ self.last_angle = last_angle
+ del samples[count:]
+ return samples, error_count
+ # API interface
+ def _api_update(self, eventtime):
+ with self.lock:
+ raw_samples = self.raw_samples
+ self.raw_samples = []
+ if not raw_samples:
+ return {}
+ samples, error_count = self._extract_samples(raw_samples)
+ if not samples:
+ return {}
+ return {'data': samples, 'errors': error_count}
+ def _start_measurements(self):
+ if self.is_measuring():
+ return
+ logging.info("Starting angle '%s' measurements", self.name)
+ self.sensor_helper.start()
+ # Start bulk reading
+ with self.lock:
+ self.raw_samples = []
+ self.last_sequence = 0
+ systime = self.printer.get_reactor().monotonic()
+ print_time = self.mcu.estimated_print_time(systime) + MIN_MSG_TIME
+ self.start_clock = reqclock = self.mcu.print_time_to_clock(print_time)
+ rest_ticks = self.mcu.seconds_to_clock(self.sample_period)
+ self.sample_ticks = rest_ticks
+ self.query_spi_angle_cmd.send([self.oid, reqclock, rest_ticks,
+ self.time_shift], reqclock=reqclock)
+ def _finish_measurements(self):
+ if not self.is_measuring():
+ return
+ # Halt bulk reading
+ params = self.query_spi_angle_end_cmd.send([self.oid, 0, 0, 0])
+ self.start_clock = 0
+ with self.lock:
+ self.raw_samples = []
+ logging.info("Stopped angle '%s' measurements", self.name)
+ def _api_startstop(self, is_start):
+ if is_start:
+ self._start_measurements()
+ else:
+ self._finish_measurements()
+ def _handle_dump_angle(self, web_request):
+ self.api_dump.add_client(web_request)
+ hdr = ('time', 'angle')
+ web_request.send({'header': hdr})
+
+def load_config_prefix(config):
+ return Angle(config)