aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/extras/bulk_sensor.py
diff options
context:
space:
mode:
Diffstat (limited to 'klippy/extras/bulk_sensor.py')
-rw-r--r--klippy/extras/bulk_sensor.py104
1 files changed, 73 insertions, 31 deletions
diff --git a/klippy/extras/bulk_sensor.py b/klippy/extras/bulk_sensor.py
index b0aa320d..cd112e1a 100644
--- a/klippy/extras/bulk_sensor.py
+++ b/klippy/extras/bulk_sensor.py
@@ -19,23 +19,31 @@ import logging, threading, struct
BATCH_INTERVAL = 0.500
+
# Helper to process accumulated messages in periodic batches
class BatchBulkHelper:
- def __init__(self, printer, batch_cb, start_cb=None, stop_cb=None,
- batch_interval=BATCH_INTERVAL):
+ def __init__(
+ self,
+ printer,
+ batch_cb,
+ start_cb=None,
+ stop_cb=None,
+ batch_interval=BATCH_INTERVAL,
+ ):
self.printer = printer
self.batch_cb = batch_cb
if start_cb is None:
- start_cb = (lambda: None)
+ start_cb = lambda: None
self.start_cb = start_cb
if stop_cb is None:
- stop_cb = (lambda: None)
+ stop_cb = lambda: None
self.stop_cb = stop_cb
self.is_started = False
self.batch_interval = batch_interval
self.batch_timer = None
self.client_cbs = []
self.webhooks_start_resp = {}
+
# Periodic batch processing
def _start(self):
if self.is_started:
@@ -52,6 +60,7 @@ class BatchBulkHelper:
systime = reactor.monotonic()
waketime = systime + self.batch_interval
self.batch_timer = reactor.register_timer(self._proc_batch, waketime)
+
def _stop(self):
del self.client_cbs[:]
self.printer.get_reactor().unregister_timer(self.batch_timer)
@@ -67,6 +76,7 @@ class BatchBulkHelper:
if self.client_cbs:
# New client started while in process of stopping
self._start()
+
def _proc_batch(self, eventtime):
try:
msg = self.batch_cb(eventtime)
@@ -85,33 +95,39 @@ class BatchBulkHelper:
self._stop()
return self.printer.get_reactor().NEVER
return eventtime + self.batch_interval
+
# Client registration
def add_client(self, client_cb):
self.client_cbs.append(client_cb)
self._start()
+
# Webhooks registration
def _add_api_client(self, web_request):
whbatch = BatchWebhooksClient(web_request)
self.add_client(whbatch.handle_batch)
web_request.send(self.webhooks_start_resp)
+
def add_mux_endpoint(self, path, key, value, webhooks_start_resp):
self.webhooks_start_resp = webhooks_start_resp
- wh = self.printer.lookup_object('webhooks')
+ wh = self.printer.lookup_object("webhooks")
wh.register_mux_endpoint(path, key, value, self._add_api_client)
+
# A webhooks wrapper for use by BatchBulkHelper
class BatchWebhooksClient:
def __init__(self, web_request):
self.cconn = web_request.get_client_connection()
- self.template = web_request.get_dict('response_template', {})
+ self.template = web_request.get_dict("response_template", {})
+
def handle_batch(self, msg):
if self.cconn.is_closed():
return False
tmp = dict(self.template)
- tmp['params'] = msg
+ tmp["params"] = msg
self.cconn.send(tmp)
return True
+
# Helper class to store incoming messages in a queue
class BulkDataQueue:
def __init__(self, mcu, msg_name="sensor_bulk_data", oid=None):
@@ -120,14 +136,17 @@ class BulkDataQueue:
self.raw_samples = []
# Register callback with mcu
mcu.register_response(self._handle_data, msg_name, oid)
+
def _handle_data(self, params):
with self.lock:
self.raw_samples.append(params)
+
def pull_queue(self):
with self.lock:
raw_samples = self.raw_samples
self.raw_samples = []
return raw_samples
+
def clear_queue(self):
self.pull_queue()
@@ -150,35 +169,42 @@ class BulkDataQueue:
# estimate of measurement frequency, which can then also be utilized
# to determine the time of the first measurement.
+
# Helper class for chip clock synchronization via linear regression
class ClockSyncRegression:
- def __init__(self, mcu, chip_clock_smooth, decay = 1. / 20.):
+ def __init__(self, mcu, chip_clock_smooth, decay=1.0 / 20.0):
self.mcu = mcu
self.chip_clock_smooth = chip_clock_smooth
self.decay = decay
- self.last_chip_clock = self.last_exp_mcu_clock = 0.
- self.mcu_clock_avg = self.mcu_clock_variance = 0.
- self.chip_clock_avg = self.chip_clock_covariance = 0.
+ self.last_chip_clock = self.last_exp_mcu_clock = 0.0
+ self.mcu_clock_avg = self.mcu_clock_variance = 0.0
+ self.chip_clock_avg = self.chip_clock_covariance = 0.0
+
def reset(self, mcu_clock, chip_clock):
self.mcu_clock_avg = self.last_mcu_clock = mcu_clock
self.chip_clock_avg = chip_clock
- self.mcu_clock_variance = self.chip_clock_covariance = 0.
- self.last_chip_clock = self.last_exp_mcu_clock = 0.
+ self.mcu_clock_variance = self.chip_clock_covariance = 0.0
+ self.last_chip_clock = self.last_exp_mcu_clock = 0.0
+
def update(self, mcu_clock, chip_clock):
# Update linear regression
decay = self.decay
diff_mcu_clock = mcu_clock - self.mcu_clock_avg
self.mcu_clock_avg += decay * diff_mcu_clock
- self.mcu_clock_variance = (1. - decay) * (
- self.mcu_clock_variance + diff_mcu_clock**2 * decay)
+ self.mcu_clock_variance = (1.0 - decay) * (
+ self.mcu_clock_variance + diff_mcu_clock**2 * decay
+ )
diff_chip_clock = chip_clock - self.chip_clock_avg
self.chip_clock_avg += decay * diff_chip_clock
- self.chip_clock_covariance = (1. - decay) * (
- self.chip_clock_covariance + diff_mcu_clock*diff_chip_clock*decay)
+ self.chip_clock_covariance = (1.0 - decay) * (
+ self.chip_clock_covariance + diff_mcu_clock * diff_chip_clock * decay
+ )
+
def set_last_chip_clock(self, chip_clock):
base_mcu, base_chip, inv_cfreq = self.get_clock_translation()
self.last_chip_clock = chip_clock
- self.last_exp_mcu_clock = base_mcu + (chip_clock-base_chip) * inv_cfreq
+ self.last_exp_mcu_clock = base_mcu + (chip_clock - base_chip) * inv_cfreq
+
def get_clock_translation(self):
inv_chip_freq = self.mcu_clock_variance / self.chip_clock_covariance
if not self.last_chip_clock:
@@ -191,6 +217,7 @@ class ClockSyncRegression:
mdiff = s_mcu_clock - self.last_exp_mcu_clock
s_inv_chip_freq = mdiff / self.chip_clock_smooth
return self.last_exp_mcu_clock, self.last_chip_clock, s_inv_chip_freq
+
def get_time_translation(self):
base_mcu, base_chip, inv_cfreq = self.get_clock_translation()
clock_to_print_time = self.mcu.clock_to_print_time
@@ -198,8 +225,10 @@ class ClockSyncRegression:
inv_freq = clock_to_print_time(base_mcu + inv_cfreq) - base_time
return base_time, base_chip, inv_freq
+
MAX_BULK_MSG_SIZE = 51
+
# Read sensor_bulk_data and calculate timestamps for devices that take
# samples at a fixed frequency (and produce fixed data size samples).
class FixedFreqReader:
@@ -213,19 +242,26 @@ class FixedFreqReader:
self.last_sequence = self.max_query_duration = 0
self.last_overflows = 0
self.bulk_queue = self.oid = self.query_status_cmd = None
+
def setup_query_command(self, msgformat, oid, cq):
# Lookup sensor query command (that responds with sensor_bulk_status)
self.oid = oid
self.query_status_cmd = self.mcu.lookup_query_command(
- msgformat, "sensor_bulk_status oid=%c clock=%u query_ticks=%u"
+ msgformat,
+ "sensor_bulk_status oid=%c clock=%u query_ticks=%u"
" next_sequence=%hu buffered=%u possible_overflows=%hu",
- oid=oid, cq=cq)
+ oid=oid,
+ cq=cq,
+ )
# Read sensor_bulk_data messages and store in a queue
self.bulk_queue = BulkDataQueue(self.mcu, oid=oid)
+
def get_last_overflows(self):
return self.last_overflows
+
def _clear_duration_filter(self):
self.max_query_duration = 1 << 31
+
def note_start(self):
self.last_sequence = 0
self.last_overflows = 0
@@ -235,26 +271,31 @@ class FixedFreqReader:
self._clear_duration_filter()
self._update_clock(is_reset=True)
self._clear_duration_filter()
+
def note_end(self):
# Clear local queue (free no longer needed memory)
self.bulk_queue.clear_queue()
+
def _update_clock(self, is_reset=False):
params = self.query_status_cmd.send([self.oid])
- mcu_clock = self.mcu.clock32_to_clock64(params['clock'])
- seq_diff = (params['next_sequence'] - self.last_sequence) & 0xffff
+ mcu_clock = self.mcu.clock32_to_clock64(params["clock"])
+ seq_diff = (params["next_sequence"] - self.last_sequence) & 0xFFFF
self.last_sequence += seq_diff
- buffered = params['buffered']
- po_diff = (params['possible_overflows'] - self.last_overflows) & 0xffff
+ buffered = params["buffered"]
+ po_diff = (params["possible_overflows"] - self.last_overflows) & 0xFFFF
self.last_overflows += po_diff
- duration = params['query_ticks']
+ duration = params["query_ticks"]
if duration > self.max_query_duration:
# Skip measurement as a high query time could skew clock tracking
- self.max_query_duration = max(2 * self.max_query_duration,
- self.mcu.seconds_to_clock(.000005))
+ self.max_query_duration = max(
+ 2 * self.max_query_duration, self.mcu.seconds_to_clock(0.000005)
+ )
return
self.max_query_duration = 2 * duration
- msg_count = (self.last_sequence * self.samples_per_block
- + buffered // self.bytes_per_sample)
+ msg_count = (
+ self.last_sequence * self.samples_per_block
+ + buffered // self.bytes_per_sample
+ )
# The "chip clock" is the message counter plus .5 for average
# inaccuracy of query responses and plus .5 for assumed offset
# of hardware processing time.
@@ -264,6 +305,7 @@ class FixedFreqReader:
self.clock_sync.reset(avg_mcu_clock, chip_clock)
else:
self.clock_sync.update(avg_mcu_clock, chip_clock)
+
# Convert sensor_bulk_data responses into list of samples
def pull_samples(self):
# Query MCU for sample timing and update clock synchronization
@@ -282,11 +324,11 @@ class FixedFreqReader:
count = seq = 0
samples = [None] * (len(raw_samples) * samples_per_block)
for params in raw_samples:
- seq_diff = (params['sequence'] - last_sequence) & 0xffff
+ seq_diff = (params["sequence"] - last_sequence) & 0xFFFF
seq_diff -= (seq_diff & 0x8000) << 1
seq = last_sequence + seq_diff
msg_cdiff = seq * samples_per_block - chip_base
- data = params['data']
+ data = params["data"]
for i in range(len(data) // bytes_per_sample):
ptime = time_base + (msg_cdiff + i) * inv_freq
udata = unpack_from(data, i * bytes_per_sample)