aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/extras/bulk_sensor.py
diff options
context:
space:
mode:
Diffstat (limited to 'klippy/extras/bulk_sensor.py')
-rw-r--r--klippy/extras/bulk_sensor.py66
1 files changed, 28 insertions, 38 deletions
diff --git a/klippy/extras/bulk_sensor.py b/klippy/extras/bulk_sensor.py
index 95f6201e..a8497afc 100644
--- a/klippy/extras/bulk_sensor.py
+++ b/klippy/extras/bulk_sensor.py
@@ -22,7 +22,7 @@ class BatchBulkHelper:
self.is_started = False
self.batch_interval = batch_interval
self.batch_timer = None
- self.clients = {}
+ self.client_cbs = []
self.webhooks_start_resp = {}
# Periodic batch processing
def _start(self):
@@ -34,14 +34,14 @@ class BatchBulkHelper:
except self.printer.command_error as e:
logging.exception("BatchBulkHelper start callback error")
self.is_started = False
- self.clients.clear()
+ del self.client_cbs[:]
raise
reactor = self.printer.get_reactor()
systime = reactor.monotonic()
waketime = systime + self.batch_interval
self.batch_timer = reactor.register_timer(self._proc_batch, waketime)
def _stop(self):
- self.clients.clear()
+ del self.client_cbs[:]
self.printer.get_reactor().unregister_timer(self.batch_timer)
self.batch_timer = None
if not self.is_started:
@@ -50,9 +50,9 @@ class BatchBulkHelper:
self.stop_cb()
except self.printer.command_error as e:
logging.exception("BatchBulkHelper stop callback error")
- self.clients.clear()
+ del self.client_cbs[:]
self.is_started = False
- if self.clients:
+ if self.client_cbs:
# New client started while in process of stopping
self._start()
def _proc_batch(self, eventtime):
@@ -64,51 +64,41 @@ class BatchBulkHelper:
return self.printer.get_reactor().NEVER
if not msg:
return eventtime + self.batch_interval
- for cconn, template in list(self.clients.items()):
- if cconn.is_closed():
- del self.clients[cconn]
- if not self.clients:
+ for client_cb in list(self.client_cbs):
+ res = client_cb(msg)
+ if not res:
+ # This client no longer needs updates - unregister it
+ self.client_cbs.remove(client_cb)
+ if not self.client_cbs:
self._stop()
return self.printer.get_reactor().NEVER
- continue
- tmp = dict(template)
- tmp['params'] = msg
- cconn.send(tmp)
return eventtime + self.batch_interval
- # Internal clients
- def add_internal_client(self):
- cconn = InternalDumpClient()
- self.clients[cconn] = {}
+ # Client registration
+ def add_client(self, client_cb):
+ self.client_cbs.append(client_cb)
self._start()
- return cconn
# Webhooks registration
def _add_api_client(self, web_request):
- cconn = web_request.get_client_connection()
- template = web_request.get_dict('response_template', {})
- self.clients[cconn] = template
- self._start()
+ whbatch = BatchWebhooksClient(web_request)
+ self.add_client(whbatch.handle_batch)
web_request.send(self.webhooks_start_resp)
def add_mux_endpoint(self, path, key, value, webhooks_start_resp):
self.webhooks_start_resp = webhooks_start_resp
wh = self.printer.lookup_object('webhooks')
wh.register_mux_endpoint(path, key, value, self._add_api_client)
-# An "internal webhooks" wrapper for using BatchBulkHelper internally
-class InternalDumpClient:
- def __init__(self):
- self.msgs = []
- self.is_done = False
- def get_messages(self):
- return self.msgs
- def finalize(self):
- self.is_done = True
- def is_closed(self):
- return self.is_done
- def send(self, msg):
- self.msgs.append(msg)
- if len(self.msgs) >= 10000:
- # Avoid filling up memory with too many samples
- self.finalize()
+# A webhooks wrapper for use by BatchBulkHelper
+class BatchWebhooksClient:
+ def __init__(self, web_request):
+ self.cconn = web_request.get_client_connection()
+ self.template = web_request.get_dict('response_template', {})
+ def handle_batch(self, msg):
+ if self.cconn.is_closed():
+ return False
+ tmp = dict(self.template)
+ tmp['params'] = msg
+ self.cconn.send(tmp)
+ return True
# Helper class to store incoming messages in a queue
class BulkDataQueue: