cgit view: about · summary · refs · log · tree · commit · diff · stats
diff options: context, space, mode
-rw-r--r--  klippy/extras/adxl345.py     | 43
-rw-r--r--  klippy/extras/angle.py       | 19
-rw-r--r--  klippy/extras/bulk_sensor.py | 66
-rw-r--r--  klippy/extras/lis2dw.py      |  5
-rw-r--r--  klippy/extras/mpu9250.py     |  5
5 files changed, 69 insertions(+), 69 deletions(-)
diff --git a/klippy/extras/adxl345.py b/klippy/extras/adxl345.py
index 1dfb6bc7..8f40c7fe 100644
--- a/klippy/extras/adxl345.py
+++ b/klippy/extras/adxl345.py
@@ -32,26 +32,29 @@ Accel_Measurement = collections.namedtuple(
# Helper class to obtain measurements
class AccelQueryHelper:
- def __init__(self, printer, cconn):
+ def __init__(self, printer):
self.printer = printer
- self.cconn = cconn
+ self.is_finished = False
print_time = printer.lookup_object('toolhead').get_last_move_time()
self.request_start_time = self.request_end_time = print_time
- self.samples = self.raw_samples = []
+ self.msgs = []
+ self.samples = []
def finish_measurements(self):
toolhead = self.printer.lookup_object('toolhead')
self.request_end_time = toolhead.get_last_move_time()
toolhead.wait_moves()
- self.cconn.finalize()
- def _get_raw_samples(self):
- raw_samples = self.cconn.get_messages()
- if raw_samples:
- self.raw_samples = raw_samples
- return self.raw_samples
+ self.is_finished = True
+ def handle_batch(self, msg):
+ if self.is_finished:
+ return False
+ if len(self.msgs) >= 10000:
+ # Avoid filling up memory with too many samples
+ return False
+ self.msgs.append(msg)
+ return True
def has_valid_samples(self):
- raw_samples = self._get_raw_samples()
- for msg in raw_samples:
- data = msg['params']['data']
+ for msg in self.msgs:
+ data = msg['data']
first_sample_time = data[0][0]
last_sample_time = data[-1][0]
if (first_sample_time > self.request_end_time
@@ -60,21 +63,20 @@ class AccelQueryHelper:
# The time intervals [first_sample_time, last_sample_time]
# and [request_start_time, request_end_time] have non-zero
# intersection. It is still theoretically possible that none
- # of the samples from raw_samples fall into the time interval
+ # of the samples from msgs fall into the time interval
# [request_start_time, request_end_time] if it is too narrow
# or on very heavy data losses. In practice, that interval
# is at least 1 second, so this possibility is negligible.
return True
return False
def get_samples(self):
- raw_samples = self._get_raw_samples()
- if not raw_samples:
+ if not self.msgs:
return self.samples
- total = sum([len(m['params']['data']) for m in raw_samples])
+ total = sum([len(m['data']) for m in self.msgs])
count = 0
self.samples = samples = [None] * total
- for msg in raw_samples:
- for samp_time, x, y, z in msg['params']['data']:
+ for msg in self.msgs:
+ for samp_time, x, y, z in msg['data']:
if samp_time < self.request_start_time:
continue
if samp_time > self.request_end_time:
@@ -250,8 +252,9 @@ class ADXL345:
"(e.g. faulty wiring) or a faulty adxl345 chip." % (
reg, val, stored_val))
def start_internal_client(self):
- cconn = self.batch_bulk.add_internal_client()
- return AccelQueryHelper(self.printer, cconn)
+ aqh = AccelQueryHelper(self.printer)
+ self.batch_bulk.add_client(aqh.handle_batch)
+ return aqh
# Measurement decoding
def _extract_samples(self, raw_samples):
# Load variables to optimize inner loop below
diff --git a/klippy/extras/angle.py b/klippy/extras/angle.py
index 0fe053df..b1aa0d96 100644
--- a/klippy/extras/angle.py
+++ b/klippy/extras/angle.py
@@ -157,8 +157,14 @@ class AngleCalibration:
def do_calibration_moves(self):
move = self.printer.lookup_object('force_move').manual_move
# Start data collection
- angle_sensor = self.printer.lookup_object(self.name)
- cconn = angle_sensor.start_internal_client()
+ msgs = []
+ is_finished = False
+ def handle_batch(msg):
+ if is_finished:
+ return False
+ msgs.append(msg)
+ return True
+ self.printer.lookup_object(self.name).add_client(handle_batch)
# Move stepper several turns (to allow internal sensor calibration)
microsteps, full_steps = self.get_microsteps()
mcu_stepper = self.mcu_stepper
@@ -190,13 +196,12 @@ class AngleCalibration:
move(mcu_stepper, .5*rotation_dist + align_dist, move_speed)
toolhead.wait_moves()
# Finish data collection
- cconn.finalize()
- msgs = cconn.get_messages()
+ is_finished = True
# Correlate query responses
cal = {}
step = 0
for msg in msgs:
- for query_time, pos in msg['params']['data']:
+ for query_time, pos in msg['data']:
# Add to step tracking
while step < len(times) and query_time > times[step][1]:
step += 1
@@ -462,8 +467,8 @@ class Angle:
"spi_angle_end oid=%c sequence=%hu", oid=self.oid, cq=cmdqueue)
def get_status(self, eventtime=None):
return {'temperature': self.sensor_helper.last_temperature}
- def start_internal_client(self):
- return self.batch_bulk.add_internal_client()
+ def add_client(self, client_cb):
+ self.batch_bulk.add_client(client_cb)
# Measurement decoding
def _extract_samples(self, raw_samples):
# Load variables to optimize inner loop below
diff --git a/klippy/extras/bulk_sensor.py b/klippy/extras/bulk_sensor.py
index 95f6201e..a8497afc 100644
--- a/klippy/extras/bulk_sensor.py
+++ b/klippy/extras/bulk_sensor.py
@@ -22,7 +22,7 @@ class BatchBulkHelper:
self.is_started = False
self.batch_interval = batch_interval
self.batch_timer = None
- self.clients = {}
+ self.client_cbs = []
self.webhooks_start_resp = {}
# Periodic batch processing
def _start(self):
@@ -34,14 +34,14 @@ class BatchBulkHelper:
except self.printer.command_error as e:
logging.exception("BatchBulkHelper start callback error")
self.is_started = False
- self.clients.clear()
+ del self.client_cbs[:]
raise
reactor = self.printer.get_reactor()
systime = reactor.monotonic()
waketime = systime + self.batch_interval
self.batch_timer = reactor.register_timer(self._proc_batch, waketime)
def _stop(self):
- self.clients.clear()
+ del self.client_cbs[:]
self.printer.get_reactor().unregister_timer(self.batch_timer)
self.batch_timer = None
if not self.is_started:
@@ -50,9 +50,9 @@ class BatchBulkHelper:
self.stop_cb()
except self.printer.command_error as e:
logging.exception("BatchBulkHelper stop callback error")
- self.clients.clear()
+ del self.client_cbs[:]
self.is_started = False
- if self.clients:
+ if self.client_cbs:
# New client started while in process of stopping
self._start()
def _proc_batch(self, eventtime):
@@ -64,51 +64,41 @@ class BatchBulkHelper:
return self.printer.get_reactor().NEVER
if not msg:
return eventtime + self.batch_interval
- for cconn, template in list(self.clients.items()):
- if cconn.is_closed():
- del self.clients[cconn]
- if not self.clients:
+ for client_cb in list(self.client_cbs):
+ res = client_cb(msg)
+ if not res:
+ # This client no longer needs updates - unregister it
+ self.client_cbs.remove(client_cb)
+ if not self.client_cbs:
self._stop()
return self.printer.get_reactor().NEVER
- continue
- tmp = dict(template)
- tmp['params'] = msg
- cconn.send(tmp)
return eventtime + self.batch_interval
- # Internal clients
- def add_internal_client(self):
- cconn = InternalDumpClient()
- self.clients[cconn] = {}
+ # Client registration
+ def add_client(self, client_cb):
+ self.client_cbs.append(client_cb)
self._start()
- return cconn
# Webhooks registration
def _add_api_client(self, web_request):
- cconn = web_request.get_client_connection()
- template = web_request.get_dict('response_template', {})
- self.clients[cconn] = template
- self._start()
+ whbatch = BatchWebhooksClient(web_request)
+ self.add_client(whbatch.handle_batch)
web_request.send(self.webhooks_start_resp)
def add_mux_endpoint(self, path, key, value, webhooks_start_resp):
self.webhooks_start_resp = webhooks_start_resp
wh = self.printer.lookup_object('webhooks')
wh.register_mux_endpoint(path, key, value, self._add_api_client)
-# An "internal webhooks" wrapper for using BatchBulkHelper internally
-class InternalDumpClient:
- def __init__(self):
- self.msgs = []
- self.is_done = False
- def get_messages(self):
- return self.msgs
- def finalize(self):
- self.is_done = True
- def is_closed(self):
- return self.is_done
- def send(self, msg):
- self.msgs.append(msg)
- if len(self.msgs) >= 10000:
- # Avoid filling up memory with too many samples
- self.finalize()
+# A webhooks wrapper for use by BatchBulkHelper
+class BatchWebhooksClient:
+ def __init__(self, web_request):
+ self.cconn = web_request.get_client_connection()
+ self.template = web_request.get_dict('response_template', {})
+ def handle_batch(self, msg):
+ if self.cconn.is_closed():
+ return False
+ tmp = dict(self.template)
+ tmp['params'] = msg
+ self.cconn.send(tmp)
+ return True
# Helper class to store incoming messages in a queue
class BulkDataQueue:
diff --git a/klippy/extras/lis2dw.py b/klippy/extras/lis2dw.py
index 739c3641..28591c21 100644
--- a/klippy/extras/lis2dw.py
+++ b/klippy/extras/lis2dw.py
@@ -97,8 +97,9 @@ class LIS2DW:
"(e.g. faulty wiring) or a faulty lis2dw chip." % (
reg, val, stored_val))
def start_internal_client(self):
-        cconn = self.batch_bulk.add_internal_client()
- return adxl345.AccelQueryHelper(self.printer, cconn)
+ aqh = adxl345.AccelQueryHelper(self.printer)
+ self.batch_bulk.add_client(aqh.handle_batch)
+ return aqh
# Measurement decoding
def _extract_samples(self, raw_samples):
# Load variables to optimize inner loop below
diff --git a/klippy/extras/mpu9250.py b/klippy/extras/mpu9250.py
index 82438ca0..c975f989 100644
--- a/klippy/extras/mpu9250.py
+++ b/klippy/extras/mpu9250.py
@@ -109,8 +109,9 @@ class MPU9250:
def set_reg(self, reg, val, minclock=0):
self.i2c.i2c_write([reg, val & 0xFF], minclock=minclock)
def start_internal_client(self):
- cconn = self.batch_bulk.add_internal_client()
- return adxl345.AccelQueryHelper(self.printer, cconn)
+ aqh = adxl345.AccelQueryHelper(self.printer)
+ self.batch_bulk.add_client(aqh.handle_batch)
+ return aqh
# Measurement decoding
def _extract_samples(self, raw_samples):
# Load variables to optimize inner loop below