From ffd44c02194ecabdd559cac1654aa9a65ecb2c28 Mon Sep 17 00:00:00 2001 From: Kevin O'Connor Date: Sat, 16 Dec 2023 14:07:06 -0500 Subject: bulk_sensor: Move APIDumpHelper() from motion_report.py to bulk_sensor.py Signed-off-by: Kevin O'Connor --- klippy/extras/bulk_sensor.py | 93 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) (limited to 'klippy/extras/bulk_sensor.py') diff --git a/klippy/extras/bulk_sensor.py b/klippy/extras/bulk_sensor.py index 28aed48a..8f166ec8 100644 --- a/klippy/extras/bulk_sensor.py +++ b/klippy/extras/bulk_sensor.py @@ -5,6 +5,99 @@ # This file may be distributed under the terms of the GNU GPLv3 license. import threading +API_UPDATE_INTERVAL = 0.500 + +# Helper to periodically transmit data to a set of API clients +class APIDumpHelper: + def __init__(self, printer, data_cb, startstop_cb=None, + update_interval=API_UPDATE_INTERVAL): + self.printer = printer + self.data_cb = data_cb + if startstop_cb is None: + startstop_cb = (lambda is_start: None) + self.startstop_cb = startstop_cb + self.is_started = False + self.update_interval = update_interval + self.update_timer = None + self.clients = {} + def _stop(self): + self.clients.clear() + reactor = self.printer.get_reactor() + reactor.unregister_timer(self.update_timer) + self.update_timer = None + if not self.is_started: + return reactor.NEVER + try: + self.startstop_cb(False) + except self.printer.command_error as e: + logging.exception("API Dump Helper stop callback error") + self.clients.clear() + self.is_started = False + if self.clients: + # New client started while in process of stopping + self._start() + return reactor.NEVER + def _start(self): + if self.is_started: + return + self.is_started = True + try: + self.startstop_cb(True) + except self.printer.command_error as e: + logging.exception("API Dump Helper start callback error") + self.is_started = False + self.clients.clear() + raise + reactor = self.printer.get_reactor() + systime = reactor.monotonic() + waketime = systime + self.update_interval + self.update_timer = reactor.register_timer(self._update, waketime) + def add_client(self, web_request): + cconn = web_request.get_client_connection() + template = web_request.get_dict('response_template', {}) + self.clients[cconn] = template + self._start() + def add_internal_client(self): + cconn = InternalDumpClient() + self.clients[cconn] = {} + self._start() + return cconn + def _update(self, eventtime): + try: + msg = self.data_cb(eventtime) + except self.printer.command_error as e: + logging.exception("API Dump Helper data callback error") + return self._stop() + if not msg: + return eventtime + self.update_interval + for cconn, template in list(self.clients.items()): + if cconn.is_closed(): + del self.clients[cconn] + if not self.clients: + return self._stop() + continue + tmp = dict(template) + tmp['params'] = msg + cconn.send(tmp) + return eventtime + self.update_interval + +# An "internal webhooks" wrapper for using APIDumpHelper internally +class InternalDumpClient: + def __init__(self): + self.msgs = [] + self.is_done = False + def get_messages(self): + return self.msgs + def finalize(self): + self.is_done = True + def is_closed(self): + return self.is_done + def send(self, msg): + self.msgs.append(msg) + if len(self.msgs) >= 10000: + # Avoid filling up memory with too many samples + self.finalize() + # Helper class to store incoming messages in a queue class BulkDataQueue: def __init__(self, mcu, msg_name, oid): -- cgit v1.2.3-70-g09d2