diff options
Diffstat (limited to 'klippy/extras/bulk_sensor.py')
-rw-r--r-- | klippy/extras/bulk_sensor.py | 66 |
1 files changed, 28 insertions, 38 deletions
diff --git a/klippy/extras/bulk_sensor.py b/klippy/extras/bulk_sensor.py index 95f6201e..a8497afc 100644 --- a/klippy/extras/bulk_sensor.py +++ b/klippy/extras/bulk_sensor.py @@ -22,7 +22,7 @@ class BatchBulkHelper: self.is_started = False self.batch_interval = batch_interval self.batch_timer = None - self.clients = {} + self.client_cbs = [] self.webhooks_start_resp = {} # Periodic batch processing def _start(self): @@ -34,14 +34,14 @@ class BatchBulkHelper: except self.printer.command_error as e: logging.exception("BatchBulkHelper start callback error") self.is_started = False - self.clients.clear() + del self.client_cbs[:] raise reactor = self.printer.get_reactor() systime = reactor.monotonic() waketime = systime + self.batch_interval self.batch_timer = reactor.register_timer(self._proc_batch, waketime) def _stop(self): - self.clients.clear() + del self.client_cbs[:] self.printer.get_reactor().unregister_timer(self.batch_timer) self.batch_timer = None if not self.is_started: @@ -50,9 +50,9 @@ class BatchBulkHelper: self.stop_cb() except self.printer.command_error as e: logging.exception("BatchBulkHelper stop callback error") - self.clients.clear() + del self.client_cbs[:] self.is_started = False - if self.clients: + if self.client_cbs: # New client started while in process of stopping self._start() def _proc_batch(self, eventtime): @@ -64,51 +64,41 @@ class BatchBulkHelper: return self.printer.get_reactor().NEVER if not msg: return eventtime + self.batch_interval - for cconn, template in list(self.clients.items()): - if cconn.is_closed(): - del self.clients[cconn] - if not self.clients: + for client_cb in list(self.client_cbs): + res = client_cb(msg) + if not res: + # This client no longer needs updates - unregister it + self.client_cbs.remove(client_cb) + if not self.client_cbs: self._stop() return self.printer.get_reactor().NEVER - continue - tmp = dict(template) - tmp['params'] = msg - cconn.send(tmp) return eventtime + self.batch_interval - # Internal clients - def add_internal_client(self): - cconn = InternalDumpClient() - self.clients[cconn] = {} + # Client registration + def add_client(self, client_cb): + self.client_cbs.append(client_cb) self._start() - return cconn # Webhooks registration def _add_api_client(self, web_request): - cconn = web_request.get_client_connection() - template = web_request.get_dict('response_template', {}) - self.clients[cconn] = template - self._start() + whbatch = BatchWebhooksClient(web_request) + self.add_client(whbatch.handle_batch) web_request.send(self.webhooks_start_resp) def add_mux_endpoint(self, path, key, value, webhooks_start_resp): self.webhooks_start_resp = webhooks_start_resp wh = self.printer.lookup_object('webhooks') wh.register_mux_endpoint(path, key, value, self._add_api_client) -# An "internal webhooks" wrapper for using BatchBulkHelper internally -class InternalDumpClient: - def __init__(self): - self.msgs = [] - self.is_done = False - def get_messages(self): - return self.msgs - def finalize(self): - self.is_done = True - def is_closed(self): - return self.is_done - def send(self, msg): - self.msgs.append(msg) - if len(self.msgs) >= 10000: - # Avoid filling up memory with too many samples - self.finalize() +# A webhooks wrapper for use by BatchBulkHelper +class BatchWebhooksClient: + def __init__(self, web_request): + self.cconn = web_request.get_client_connection() + self.template = web_request.get_dict('response_template', {}) + def handle_batch(self, msg): + if self.cconn.is_closed(): + return False + tmp = dict(self.template) + tmp['params'] = msg + self.cconn.send(tmp) + return True # Helper class to store incoming messages in a queue class BulkDataQueue: |