diff options
Diffstat (limited to 'klippy/extras/bulk_sensor.py')
-rw-r--r-- | klippy/extras/bulk_sensor.py | 104 |
1 files changed, 73 insertions, 31 deletions
diff --git a/klippy/extras/bulk_sensor.py b/klippy/extras/bulk_sensor.py index b0aa320d..cd112e1a 100644 --- a/klippy/extras/bulk_sensor.py +++ b/klippy/extras/bulk_sensor.py @@ -19,23 +19,31 @@ import logging, threading, struct BATCH_INTERVAL = 0.500 + # Helper to process accumulated messages in periodic batches class BatchBulkHelper: - def __init__(self, printer, batch_cb, start_cb=None, stop_cb=None, - batch_interval=BATCH_INTERVAL): + def __init__( + self, + printer, + batch_cb, + start_cb=None, + stop_cb=None, + batch_interval=BATCH_INTERVAL, + ): self.printer = printer self.batch_cb = batch_cb if start_cb is None: - start_cb = (lambda: None) + start_cb = lambda: None self.start_cb = start_cb if stop_cb is None: - stop_cb = (lambda: None) + stop_cb = lambda: None self.stop_cb = stop_cb self.is_started = False self.batch_interval = batch_interval self.batch_timer = None self.client_cbs = [] self.webhooks_start_resp = {} + # Periodic batch processing def _start(self): if self.is_started: @@ -52,6 +60,7 @@ class BatchBulkHelper: systime = reactor.monotonic() waketime = systime + self.batch_interval self.batch_timer = reactor.register_timer(self._proc_batch, waketime) + def _stop(self): del self.client_cbs[:] self.printer.get_reactor().unregister_timer(self.batch_timer) @@ -67,6 +76,7 @@ class BatchBulkHelper: if self.client_cbs: # New client started while in process of stopping self._start() + def _proc_batch(self, eventtime): try: msg = self.batch_cb(eventtime) @@ -85,33 +95,39 @@ class BatchBulkHelper: self._stop() return self.printer.get_reactor().NEVER return eventtime + self.batch_interval + # Client registration def add_client(self, client_cb): self.client_cbs.append(client_cb) self._start() + # Webhooks registration def _add_api_client(self, web_request): whbatch = BatchWebhooksClient(web_request) self.add_client(whbatch.handle_batch) web_request.send(self.webhooks_start_resp) + def add_mux_endpoint(self, path, key, value, webhooks_start_resp): self.webhooks_start_resp = webhooks_start_resp - wh = self.printer.lookup_object('webhooks') + wh = self.printer.lookup_object("webhooks") wh.register_mux_endpoint(path, key, value, self._add_api_client) + # A webhooks wrapper for use by BatchBulkHelper class BatchWebhooksClient: def __init__(self, web_request): self.cconn = web_request.get_client_connection() - self.template = web_request.get_dict('response_template', {}) + self.template = web_request.get_dict("response_template", {}) + def handle_batch(self, msg): if self.cconn.is_closed(): return False tmp = dict(self.template) - tmp['params'] = msg + tmp["params"] = msg self.cconn.send(tmp) return True + # Helper class to store incoming messages in a queue class BulkDataQueue: def __init__(self, mcu, msg_name="sensor_bulk_data", oid=None): @@ -120,14 +136,17 @@ class BulkDataQueue: self.raw_samples = [] # Register callback with mcu mcu.register_response(self._handle_data, msg_name, oid) + def _handle_data(self, params): with self.lock: self.raw_samples.append(params) + def pull_queue(self): with self.lock: raw_samples = self.raw_samples self.raw_samples = [] return raw_samples + def clear_queue(self): self.pull_queue() @@ -150,35 +169,42 @@ class BulkDataQueue: # estimate of measurement frequency, which can then also be utilized # to determine the time of the first measurement. + # Helper class for chip clock synchronization via linear regression class ClockSyncRegression: - def __init__(self, mcu, chip_clock_smooth, decay = 1. / 20.): + def __init__(self, mcu, chip_clock_smooth, decay=1.0 / 20.0): self.mcu = mcu self.chip_clock_smooth = chip_clock_smooth self.decay = decay - self.last_chip_clock = self.last_exp_mcu_clock = 0. - self.mcu_clock_avg = self.mcu_clock_variance = 0. - self.chip_clock_avg = self.chip_clock_covariance = 0. + self.last_chip_clock = self.last_exp_mcu_clock = 0.0 + self.mcu_clock_avg = self.mcu_clock_variance = 0.0 + self.chip_clock_avg = self.chip_clock_covariance = 0.0 + def reset(self, mcu_clock, chip_clock): self.mcu_clock_avg = self.last_mcu_clock = mcu_clock self.chip_clock_avg = chip_clock - self.mcu_clock_variance = self.chip_clock_covariance = 0. - self.last_chip_clock = self.last_exp_mcu_clock = 0. + self.mcu_clock_variance = self.chip_clock_covariance = 0.0 + self.last_chip_clock = self.last_exp_mcu_clock = 0.0 + def update(self, mcu_clock, chip_clock): # Update linear regression decay = self.decay diff_mcu_clock = mcu_clock - self.mcu_clock_avg self.mcu_clock_avg += decay * diff_mcu_clock - self.mcu_clock_variance = (1. - decay) * ( - self.mcu_clock_variance + diff_mcu_clock**2 * decay) + self.mcu_clock_variance = (1.0 - decay) * ( + self.mcu_clock_variance + diff_mcu_clock**2 * decay + ) diff_chip_clock = chip_clock - self.chip_clock_avg self.chip_clock_avg += decay * diff_chip_clock - self.chip_clock_covariance = (1. - decay) * ( - self.chip_clock_covariance + diff_mcu_clock*diff_chip_clock*decay) + self.chip_clock_covariance = (1.0 - decay) * ( + self.chip_clock_covariance + diff_mcu_clock * diff_chip_clock * decay + ) + def set_last_chip_clock(self, chip_clock): base_mcu, base_chip, inv_cfreq = self.get_clock_translation() self.last_chip_clock = chip_clock - self.last_exp_mcu_clock = base_mcu + (chip_clock-base_chip) * inv_cfreq + self.last_exp_mcu_clock = base_mcu + (chip_clock - base_chip) * inv_cfreq + def get_clock_translation(self): inv_chip_freq = self.mcu_clock_variance / self.chip_clock_covariance if not self.last_chip_clock: @@ -191,6 +217,7 @@ class ClockSyncRegression: mdiff = s_mcu_clock - self.last_exp_mcu_clock s_inv_chip_freq = mdiff / self.chip_clock_smooth return self.last_exp_mcu_clock, self.last_chip_clock, s_inv_chip_freq + def get_time_translation(self): base_mcu, base_chip, inv_cfreq = self.get_clock_translation() clock_to_print_time = self.mcu.clock_to_print_time @@ -198,8 +225,10 @@ class ClockSyncRegression: inv_freq = clock_to_print_time(base_mcu + inv_cfreq) - base_time return base_time, base_chip, inv_freq + MAX_BULK_MSG_SIZE = 51 + # Read sensor_bulk_data and calculate timestamps for devices that take # samples at a fixed frequency (and produce fixed data size samples). class FixedFreqReader: @@ -213,19 +242,26 @@ class FixedFreqReader: self.last_sequence = self.max_query_duration = 0 self.last_overflows = 0 self.bulk_queue = self.oid = self.query_status_cmd = None + def setup_query_command(self, msgformat, oid, cq): # Lookup sensor query command (that responds with sensor_bulk_status) self.oid = oid self.query_status_cmd = self.mcu.lookup_query_command( - msgformat, "sensor_bulk_status oid=%c clock=%u query_ticks=%u" + msgformat, + "sensor_bulk_status oid=%c clock=%u query_ticks=%u" " next_sequence=%hu buffered=%u possible_overflows=%hu", - oid=oid, cq=cq) + oid=oid, + cq=cq, + ) # Read sensor_bulk_data messages and store in a queue self.bulk_queue = BulkDataQueue(self.mcu, oid=oid) + def get_last_overflows(self): return self.last_overflows + def _clear_duration_filter(self): self.max_query_duration = 1 << 31 + def note_start(self): self.last_sequence = 0 self.last_overflows = 0 @@ -235,26 +271,31 @@ class FixedFreqReader: self._clear_duration_filter() self._update_clock(is_reset=True) self._clear_duration_filter() + def note_end(self): # Clear local queue (free no longer needed memory) self.bulk_queue.clear_queue() + def _update_clock(self, is_reset=False): params = self.query_status_cmd.send([self.oid]) - mcu_clock = self.mcu.clock32_to_clock64(params['clock']) - seq_diff = (params['next_sequence'] - self.last_sequence) & 0xffff + mcu_clock = self.mcu.clock32_to_clock64(params["clock"]) + seq_diff = (params["next_sequence"] - self.last_sequence) & 0xFFFF self.last_sequence += seq_diff - buffered = params['buffered'] - po_diff = (params['possible_overflows'] - self.last_overflows) & 0xffff + buffered = params["buffered"] + po_diff = (params["possible_overflows"] - self.last_overflows) & 0xFFFF self.last_overflows += po_diff - duration = params['query_ticks'] + duration = params["query_ticks"] if duration > self.max_query_duration: # Skip measurement as a high query time could skew clock tracking - self.max_query_duration = max(2 * self.max_query_duration, - self.mcu.seconds_to_clock(.000005)) + self.max_query_duration = max( + 2 * self.max_query_duration, self.mcu.seconds_to_clock(0.000005) + ) return self.max_query_duration = 2 * duration - msg_count = (self.last_sequence * self.samples_per_block - + buffered // self.bytes_per_sample) + msg_count = ( + self.last_sequence * self.samples_per_block + + buffered // self.bytes_per_sample + ) # The "chip clock" is the message counter plus .5 for average # inaccuracy of query responses and plus .5 for assumed offset # of hardware processing time. @@ -264,6 +305,7 @@ class FixedFreqReader: self.clock_sync.reset(avg_mcu_clock, chip_clock) else: self.clock_sync.update(avg_mcu_clock, chip_clock) + # Convert sensor_bulk_data responses into list of samples def pull_samples(self): # Query MCU for sample timing and update clock synchronization @@ -282,11 +324,11 @@ class FixedFreqReader: count = seq = 0 samples = [None] * (len(raw_samples) * samples_per_block) for params in raw_samples: - seq_diff = (params['sequence'] - last_sequence) & 0xffff + seq_diff = (params["sequence"] - last_sequence) & 0xFFFF seq_diff -= (seq_diff & 0x8000) << 1 seq = last_sequence + seq_diff msg_cdiff = seq * samples_per_block - chip_base - data = params['data'] + data = params["data"] for i in range(len(data) // bytes_per_sample): ptime = time_base + (msg_cdiff + i) * inv_freq udata = unpack_from(data, i * bytes_per_sample) |