diff options
author | Kevin O'Connor <kevin@koconnor.net> | 2023-12-17 00:15:55 -0500 |
---|---|---|
committer | Kevin O'Connor <kevin@koconnor.net> | 2023-12-26 11:47:21 -0500 |
commit | c716edafe291a3d32700becfb67cb1504cd6902b (patch) | |
tree | 35a3fe64f505406e4ab11b672a5724a0bcc1a16f /klippy/extras/bulk_sensor.py | |
parent | 337013459303a220e1c3552583676c35b4800dd0 (diff) | |
download | kutter-c716edafe291a3d32700becfb67cb1504cd6902b.tar.gz kutter-c716edafe291a3d32700becfb67cb1504cd6902b.tar.xz kutter-c716edafe291a3d32700becfb67cb1504cd6902b.zip |
bulk_sensor: Simplify the registration of internal clients in BatchBulkHelper
Previously, the BatchBulkHelper class was designed primarily to
register webhook clients, and internal clients used a wrapper class
that emulated a webhooks client.
Change BatchBulkHelper to support regular internal callbacks, and
introduce a new BatchWebhooksClient class that can translate these
internal callback to webhooks client messages.
This makes it easier to register internal clients that can process the
bulk messages every batch interval.
Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
Diffstat (limited to 'klippy/extras/bulk_sensor.py')
-rw-r--r-- | klippy/extras/bulk_sensor.py | 66 |
1 files changed, 28 insertions, 38 deletions
diff --git a/klippy/extras/bulk_sensor.py b/klippy/extras/bulk_sensor.py index 95f6201e..a8497afc 100644 --- a/klippy/extras/bulk_sensor.py +++ b/klippy/extras/bulk_sensor.py @@ -22,7 +22,7 @@ class BatchBulkHelper: self.is_started = False self.batch_interval = batch_interval self.batch_timer = None - self.clients = {} + self.client_cbs = [] self.webhooks_start_resp = {} # Periodic batch processing def _start(self): @@ -34,14 +34,14 @@ class BatchBulkHelper: except self.printer.command_error as e: logging.exception("BatchBulkHelper start callback error") self.is_started = False - self.clients.clear() + del self.client_cbs[:] raise reactor = self.printer.get_reactor() systime = reactor.monotonic() waketime = systime + self.batch_interval self.batch_timer = reactor.register_timer(self._proc_batch, waketime) def _stop(self): - self.clients.clear() + del self.client_cbs[:] self.printer.get_reactor().unregister_timer(self.batch_timer) self.batch_timer = None if not self.is_started: @@ -50,9 +50,9 @@ class BatchBulkHelper: self.stop_cb() except self.printer.command_error as e: logging.exception("BatchBulkHelper stop callback error") - self.clients.clear() + del self.client_cbs[:] self.is_started = False - if self.clients: + if self.client_cbs: # New client started while in process of stopping self._start() def _proc_batch(self, eventtime): @@ -64,51 +64,41 @@ class BatchBulkHelper: return self.printer.get_reactor().NEVER if not msg: return eventtime + self.batch_interval - for cconn, template in list(self.clients.items()): - if cconn.is_closed(): - del self.clients[cconn] - if not self.clients: + for client_cb in list(self.client_cbs): + res = client_cb(msg) + if not res: + # This client no longer needs updates - unregister it + self.client_cbs.remove(client_cb) + if not self.client_cbs: self._stop() return self.printer.get_reactor().NEVER - continue - tmp = dict(template) - tmp['params'] = msg - cconn.send(tmp) return eventtime + self.batch_interval - # Internal clients - def add_internal_client(self): - cconn = InternalDumpClient() - self.clients[cconn] = {} + # Client registration + def add_client(self, client_cb): + self.client_cbs.append(client_cb) self._start() - return cconn # Webhooks registration def _add_api_client(self, web_request): - cconn = web_request.get_client_connection() - template = web_request.get_dict('response_template', {}) - self.clients[cconn] = template - self._start() + whbatch = BatchWebhooksClient(web_request) + self.add_client(whbatch.handle_batch) web_request.send(self.webhooks_start_resp) def add_mux_endpoint(self, path, key, value, webhooks_start_resp): self.webhooks_start_resp = webhooks_start_resp wh = self.printer.lookup_object('webhooks') wh.register_mux_endpoint(path, key, value, self._add_api_client) -# An "internal webhooks" wrapper for using BatchBulkHelper internally -class InternalDumpClient: - def __init__(self): - self.msgs = [] - self.is_done = False - def get_messages(self): - return self.msgs - def finalize(self): - self.is_done = True - def is_closed(self): - return self.is_done - def send(self, msg): - self.msgs.append(msg) - if len(self.msgs) >= 10000: - # Avoid filling up memory with too many samples - self.finalize() +# A webhooks wrapper for use by BatchBulkHelper +class BatchWebhooksClient: + def __init__(self, web_request): + self.cconn = web_request.get_client_connection() + self.template = web_request.get_dict('response_template', {}) + def handle_batch(self, msg): + if self.cconn.is_closed(): + return False + tmp = dict(self.template) + tmp['params'] = msg + self.cconn.send(tmp) + return True # Helper class to store incoming messages in a queue class BulkDataQueue: |