commit 581208b2ffeeb2a2128aee0741fa3fd9e46358e2 (patch)
Author:    Tomasz Kramkowski <tomasz@kramkow.ski>    2025-08-06 17:50:53 +0100
Committer: Tomasz Kramkowski <tomasz@kramkow.ski>    2025-08-06 17:50:53 +0100
tree   f0d7e0e5cb574b235a5495b0f40f79e3ded7aa4d /scripts/motan
parent e1176e4dfb9018e712d4fa86daf41e9e762a1698 (diff)
Run black on all first party python code
Diffstat (limited to 'scripts/motan')
-rw-r--r--  scripts/motan/analyzers.py   | 225
-rwxr-xr-x  scripts/motan/data_logger.py |  88
-rwxr-xr-x  scripts/motan/motan_graph.py |  73
-rw-r--r--  scripts/motan/readlog.py     | 413
4 files changed, 501 insertions(+), 298 deletions(-)
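
Most of the changes below are mechanical restyling applied by black: single-quoted
strings become double-quoted, bare float literals gain an explicit digit (.5 becomes
0.5), long calls are split across lines with trailing commas, and two blank lines
separate top-level definitions. A minimal sketch of the same transformation, assuming
the black package is installed and using its public format_str API; the snippet and
its expected output are illustrative, not taken from this commit:

    # Illustrative only: feed black two lines in the pre-commit style and print
    # the reformatted result (quote normalization and numeric-literal padding).
    import black

    before = (
        "rep = [('Position', 'Velocity'), ('(mm)', '(mm/s)')]\n"
        "inv_seg_time = 1. / self.amanager.get_segment_time()\n"
    )
    print(black.format_str(before, mode=black.Mode()))
    # rep = [("Position", "Velocity"), ("(mm)", "(mm/s)")]
    # inv_seg_time = 1.0 / self.amanager.get_segment_time()
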
diff --git a/scripts/motan/analyzers.py b/scripts/motan/analyzers.py
index 2796362f..917cb032 100644
--- a/scripts/motan/analyzers.py
+++ b/scripts/motan/analyzers.py
@@ -14,49 +14,57 @@ import readlog
# Analyzer handlers: {name: class, ...}
AHandlers = {}
+
# Calculate a derivative (position to velocity, or velocity to accel)
class GenDerivative:
ParametersMin = ParametersMax = 1
DataSets = [
- ('derivative(<dataset>)', 'Derivative of the given dataset'),
+ ("derivative(<dataset>)", "Derivative of the given dataset"),
]
+
def __init__(self, amanager, name_parts):
self.amanager = amanager
self.source = name_parts[1]
amanager.setup_dataset(self.source)
+
def get_label(self):
label = self.amanager.get_label(self.source)
- lname = label['label']
- units = label['units']
- if '(mm)' in units:
- rep = [('Position', 'Velocity'), ('(mm)', '(mm/s)')]
- elif '(mm/s)' in units:
- rep = [('Velocity', 'Acceleration'), ('(mm/s)', '(mm/s^2)')]
+ lname = label["label"]
+ units = label["units"]
+ if "(mm)" in units:
+ rep = [("Position", "Velocity"), ("(mm)", "(mm/s)")]
+ elif "(mm/s)" in units:
+ rep = [("Velocity", "Acceleration"), ("(mm/s)", "(mm/s^2)")]
else:
- return {'label': 'Derivative', 'units': 'Unknown'}
+ return {"label": "Derivative", "units": "Unknown"}
for old, new in rep:
lname = lname.replace(old, new).replace(old.lower(), new.lower())
units = units.replace(old, new).replace(old.lower(), new.lower())
- return {'label': lname, 'units': units}
+ return {"label": lname, "units": units}
+
def generate_data(self):
- inv_seg_time = 1. / self.amanager.get_segment_time()
+ inv_seg_time = 1.0 / self.amanager.get_segment_time()
data = self.amanager.get_datasets()[self.source]
- deriv = [(data[i+1] - data[i]) * inv_seg_time
- for i in range(len(data)-1)]
+ deriv = [(data[i + 1] - data[i]) * inv_seg_time for i in range(len(data) - 1)]
return [deriv[0]] + deriv
+
+
AHandlers["derivative"] = GenDerivative
+
# Calculate an integral (accel to velocity, or velocity to position)
class GenIntegral:
ParametersMin = 1
ParametersMax = 3
DataSets = [
- ('integral(<dataset>)', 'Integral of the given dataset'),
- ('integral(<dataset1>,<dataset2>)',
- 'Integral with dataset2 as reference'),
- ('integral(<dataset1>,<dataset2>,<half_life>)',
- 'Integral with weighted half-life time'),
+ ("integral(<dataset>)", "Integral of the given dataset"),
+ ("integral(<dataset1>,<dataset2>)", "Integral with dataset2 as reference"),
+ (
+ "integral(<dataset1>,<dataset2>,<half_life>)",
+ "Integral with weighted half-life time",
+ ),
]
+
def __init__(self, amanager, name_parts):
self.amanager = amanager
self.source = name_parts[1]
@@ -68,54 +76,58 @@ class GenIntegral:
amanager.setup_dataset(self.ref)
if len(name_parts) == 4:
self.half_life = float(name_parts[3])
+
def get_label(self):
label = self.amanager.get_label(self.source)
- lname = label['label']
- units = label['units']
- if '(mm/s)' in units:
- rep = [('Velocity', 'Position'), ('(mm/s)', '(mm)')]
- elif '(mm/s^2)' in units:
- rep = [('Acceleration', 'Velocity'), ('(mm/s^2)', '(mm/s)')]
+ lname = label["label"]
+ units = label["units"]
+ if "(mm/s)" in units:
+ rep = [("Velocity", "Position"), ("(mm/s)", "(mm)")]
+ elif "(mm/s^2)" in units:
+ rep = [("Acceleration", "Velocity"), ("(mm/s^2)", "(mm/s)")]
else:
- return {'label': 'Integral', 'units': 'Unknown'}
+ return {"label": "Integral", "units": "Unknown"}
for old, new in rep:
lname = lname.replace(old, new).replace(old.lower(), new.lower())
units = units.replace(old, new).replace(old.lower(), new.lower())
- return {'label': lname, 'units': units}
+ return {"label": lname, "units": units}
+
def generate_data(self):
seg_time = self.amanager.get_segment_time()
src = self.amanager.get_datasets()[self.source]
offset = sum(src) / len(src)
- total = 0.
+ total = 0.0
ref = None
if self.ref is not None:
ref = self.amanager.get_datasets()[self.ref]
offset -= (ref[-1] - ref[0]) / (len(src) * seg_time)
total = ref[0]
- src_weight = 1.
+ src_weight = 1.0
if self.half_life:
- src_weight = math.exp(math.log(.5) * seg_time / self.half_life)
- ref_weight = 1. - src_weight
- data = [0.] * len(src)
+ src_weight = math.exp(math.log(0.5) * seg_time / self.half_life)
+ ref_weight = 1.0 - src_weight
+ data = [0.0] * len(src)
for i, v in enumerate(src):
total += (v - offset) * seg_time
if ref is not None:
total = src_weight * total + ref_weight * ref[i]
data[i] = total
return data
+
+
AHandlers["integral"] = GenIntegral
+
# Calculate a pointwise 2-norm of several datasets (e.g. compute velocity or
# accel from its x, y,... components)
class GenNorm2:
ParametersMin = 2
ParametersMax = 3
DataSets = [
- ('norm2(<dataset1>,<dataset2>)',
- 'pointwise 2-norm of dataset1 and dataset2'),
- ('norm2(<dataset1>,<dataset2>,<dataset3>)',
- 'pointwise 2-norm of 3 datasets'),
+ ("norm2(<dataset1>,<dataset2>)", "pointwise 2-norm of dataset1 and dataset2"),
+ ("norm2(<dataset1>,<dataset2>,<dataset3>)", "pointwise 2-norm of 3 datasets"),
]
+
def __init__(self, amanager, name_parts):
self.amanager = amanager
self.datasets = []
@@ -125,48 +137,56 @@ class GenNorm2:
self.datasets.append(name_parts[3])
for dataset in self.datasets:
amanager.setup_dataset(dataset)
+
def get_label(self):
label = self.amanager.get_label(self.datasets[0])
- units = label['units']
- datas = ['position', 'velocity', 'acceleration']
- data_name = ''
+ units = label["units"]
+ datas = ["position", "velocity", "acceleration"]
+ data_name = ""
for d in datas:
- if d in label['label']:
+ if d in label["label"]:
data_name = d
break
- lname = ''
+ lname = ""
for d in self.datasets:
- l = self.amanager.get_label(d)['label']
+ l = self.amanager.get_label(d)["label"]
for r in datas:
- l = l.replace(r, '').strip()
+ l = l.replace(r, "").strip()
if lname:
- lname += '+'
+ lname += "+"
lname += l
- lname += ' ' + data_name + ' norm2'
- return {'label': lname, 'units': units}
+ lname += " " + data_name + " norm2"
+ return {"label": lname, "units": units}
+
def generate_data(self):
seg_time = self.amanager.get_segment_time()
data = []
for dataset in self.datasets:
data.append(self.amanager.get_datasets()[dataset])
- res = [0.] * len(data[0])
+ res = [0.0] * len(data[0])
for i in range(len(data[0])):
- norm2 = 0.
+ norm2 = 0.0
for dataset in data:
norm2 += dataset[i] * dataset[i]
res[i] = math.sqrt(norm2)
return res
+
+
AHandlers["norm2"] = GenNorm2
+
class GenSmoothed:
ParametersMin = 1
ParametersMax = 2
DataSets = [
- ('smooth(<dataset>)', 'Generate moving weighted average of a dataset'),
- ('smooth(<dataset>,<smooth_time>)',
- 'Generate moving weighted average of a dataset with a given'
- ' smoothing time that defines the window size'),
+ ("smooth(<dataset>)", "Generate moving weighted average of a dataset"),
+ (
+ "smooth(<dataset>,<smooth_time>)",
+ "Generate moving weighted average of a dataset with a given"
+ " smoothing time that defines the window size",
+ ),
]
+
def __init__(self, amanager, name_parts):
self.amanager = amanager
self.source = name_parts[1]
@@ -174,125 +194,152 @@ class GenSmoothed:
self.smooth_time = 0.01
if len(name_parts) > 2:
self.smooth_time = float(name_parts[2])
+
def get_label(self):
label = self.amanager.get_label(self.source)
- return {'label': 'Smoothed ' + label['label'], 'units': label['units']}
+ return {"label": "Smoothed " + label["label"], "units": label["units"]}
+
def generate_data(self):
seg_time = self.amanager.get_segment_time()
src = self.amanager.get_datasets()[self.source]
n = len(src)
- data = [0.] * n
+ data = [0.0] * n
hst = 0.5 * self.smooth_time
seg_half_len = round(hst / seg_time)
- inv_norm = 1. / sum([min(k + 1, seg_half_len + seg_half_len - k)
- for k in range(2 * seg_half_len)])
+ inv_norm = 1.0 / sum(
+ [
+ min(k + 1, seg_half_len + seg_half_len - k)
+ for k in range(2 * seg_half_len)
+ ]
+ )
for i in range(n):
j = max(0, i - seg_half_len)
je = min(n, i + seg_half_len)
- avg_val = 0.
+ avg_val = 0.0
for k, v in enumerate(src[j:je]):
avg_val += v * min(k + 1, seg_half_len + seg_half_len - k)
data[i] = avg_val * inv_norm
return data
+
+
AHandlers["smooth"] = GenSmoothed
+
# Calculate a kinematic stepper position from the toolhead requested position
class GenKinematicPosition:
ParametersMin = ParametersMax = 1
DataSets = [
- ('kin(<stepper>)', 'Stepper position derived from toolhead kinematics'),
+ ("kin(<stepper>)", "Stepper position derived from toolhead kinematics"),
]
+
def __init__(self, amanager, name_parts):
self.amanager = amanager
stepper = name_parts[1]
status = self.amanager.get_initial_status()
- kin = status['configfile']['settings']['printer']['kinematics']
- if kin not in ['cartesian', 'corexy']:
+ kin = status["configfile"]["settings"]["printer"]["kinematics"]
+ if kin not in ["cartesian", "corexy"]:
raise amanager.error("Unsupported kinematics '%s'" % (kin,))
- if stepper not in ['stepper_x', 'stepper_y', 'stepper_z']:
+ if stepper not in ["stepper_x", "stepper_y", "stepper_z"]:
raise amanager.error("Unknown stepper '%s'" % (stepper,))
- if kin == 'corexy' and stepper in ['stepper_x', 'stepper_y']:
- self.source1 = 'trapq(toolhead,x)'
- self.source2 = 'trapq(toolhead,y)'
- if stepper == 'stepper_x':
+ if kin == "corexy" and stepper in ["stepper_x", "stepper_y"]:
+ self.source1 = "trapq(toolhead,x)"
+ self.source2 = "trapq(toolhead,y)"
+ if stepper == "stepper_x":
self.generate_data = self.generate_data_corexy_plus
else:
self.generate_data = self.generate_data_corexy_minus
amanager.setup_dataset(self.source1)
amanager.setup_dataset(self.source2)
else:
- self.source1 = 'trapq(toolhead,%s)' % (stepper[-1:],)
+ self.source1 = "trapq(toolhead,%s)" % (stepper[-1:],)
self.source2 = None
self.generate_data = self.generate_data_passthrough
amanager.setup_dataset(self.source1)
+
def get_label(self):
- return {'label': 'Position', 'units': 'Position\n(mm)'}
+ return {"label": "Position", "units": "Position\n(mm)"}
+
def generate_data_corexy_plus(self):
datasets = self.amanager.get_datasets()
data1 = datasets[self.source1]
data2 = datasets[self.source2]
return [d1 + d2 for d1, d2 in zip(data1, data2)]
+
def generate_data_corexy_minus(self):
datasets = self.amanager.get_datasets()
data1 = datasets[self.source1]
data2 = datasets[self.source2]
return [d1 - d2 for d1, d2 in zip(data1, data2)]
+
def generate_data_passthrough(self):
return self.amanager.get_datasets()[self.source1]
+
+
AHandlers["kin"] = GenKinematicPosition
+
# Calculate a toolhead x/y position from corexy stepper positions
class GenCorexyPosition:
ParametersMin = ParametersMax = 3
DataSets = [
- ('corexy(x,<stepper>,<stepper>)', 'Toolhead x position from steppers'),
- ('corexy(y,<stepper>,<stepper>)', 'Toolhead y position from steppers'),
+ ("corexy(x,<stepper>,<stepper>)", "Toolhead x position from steppers"),
+ ("corexy(y,<stepper>,<stepper>)", "Toolhead y position from steppers"),
]
+
def __init__(self, amanager, name_parts):
self.amanager = amanager
- self.is_plus = name_parts[1] == 'x'
+ self.is_plus = name_parts[1] == "x"
self.source1, self.source2 = name_parts[2:]
amanager.setup_dataset(self.source1)
amanager.setup_dataset(self.source2)
+
def get_label(self):
- axis = 'x'
+ axis = "x"
if not self.is_plus:
- axis = 'y'
- return {'label': 'Derived %s position' % (axis,),
- 'units': 'Position\n(mm)'}
+ axis = "y"
+ return {"label": "Derived %s position" % (axis,), "units": "Position\n(mm)"}
+
def generate_data(self):
datasets = self.amanager.get_datasets()
data1 = datasets[self.source1]
data2 = datasets[self.source2]
if self.is_plus:
- return [.5 * (d1 + d2) for d1, d2 in zip(data1, data2)]
- return [.5 * (d1 - d2) for d1, d2 in zip(data1, data2)]
+ return [0.5 * (d1 + d2) for d1, d2 in zip(data1, data2)]
+ return [0.5 * (d1 - d2) for d1, d2 in zip(data1, data2)]
+
+
AHandlers["corexy"] = GenCorexyPosition
+
# Calculate a position deviation
class GenDeviation:
ParametersMin = ParametersMax = 2
DataSets = [
- ('deviation(<dataset1>,<dataset2>)', 'Difference between datasets'),
+ ("deviation(<dataset1>,<dataset2>)", "Difference between datasets"),
]
+
def __init__(self, amanager, name_parts):
self.amanager = amanager
self.source1, self.source2 = name_parts[1:]
amanager.setup_dataset(self.source1)
amanager.setup_dataset(self.source2)
+
def get_label(self):
label1 = self.amanager.get_label(self.source1)
label2 = self.amanager.get_label(self.source2)
- if label1['units'] != label2['units']:
- return {'label': 'Deviation', 'units': 'Unknown'}
- parts = label1['units'].split('\n')
- units = '\n'.join([parts[0]] + ['Deviation'] + parts[1:])
- return {'label': label1['label'] + ' deviation', 'units': units}
+ if label1["units"] != label2["units"]:
+ return {"label": "Deviation", "units": "Unknown"}
+ parts = label1["units"].split("\n")
+ units = "\n".join([parts[0]] + ["Deviation"] + parts[1:])
+ return {"label": label1["label"] + " deviation", "units": units}
+
def generate_data(self):
datasets = self.amanager.get_datasets()
data1 = datasets[self.source1]
data2 = datasets[self.source2]
return [d1 - d2 for d1, d2 in zip(data1, data2)]
+
+
AHandlers["deviation"] = GenDeviation
@@ -300,6 +347,7 @@ AHandlers["deviation"] = GenDeviation
# Analyzer management and data generation
######################################################################
+
# Return a description of available analyzers
def list_datasets():
datasets = []
@@ -307,9 +355,11 @@ def list_datasets():
datasets += AHandlers[ah].DataSets
return datasets
+
# Manage raw and generated data samples
class AnalyzerManager:
error = None
+
def __init__(self, lmanager, segment_time):
self.lmanager = lmanager
self.error = lmanager.error
@@ -318,17 +368,23 @@ class AnalyzerManager:
self.gen_datasets = collections.OrderedDict()
self.datasets = {}
self.dataset_times = []
- self.duration = 5.
+ self.duration = 5.0
+
def set_duration(self, duration):
self.duration = duration
+
def get_segment_time(self):
return self.segment_time
+
def get_datasets(self):
return self.datasets
+
def get_dataset_times(self):
return self.dataset_times
+
def get_initial_status(self):
return self.lmanager.get_initial_status()
+
def setup_dataset(self, name):
name = name.strip()
if name in self.raw_datasets:
@@ -350,6 +406,7 @@ class AnalyzerManager:
self.gen_datasets[name] = hdl
self.datasets[name] = []
return hdl
+
def get_label(self, dataset):
hdl = self.raw_datasets.get(dataset)
if hdl is None:
@@ -357,10 +414,12 @@ class AnalyzerManager:
if hdl is None:
raise self.error("Unknown dataset '%s'" % (dataset,))
return hdl.get_label()
+
def generate_datasets(self):
# Generate raw data
- list_hdls = [(self.datasets[name], hdl)
- for name, hdl in self.raw_datasets.items()]
+ list_hdls = [
+ (self.datasets[name], hdl) for name, hdl in self.raw_datasets.items()
+ ]
initial_start_time = self.lmanager.get_initial_start_time()
start_time = t = self.lmanager.get_start_time()
end_time = start_time + self.duration
diff --git a/scripts/motan/data_logger.py b/scripts/motan/data_logger.py
index fd4de7a5..00023c2f 100755
--- a/scripts/motan/data_logger.py
+++ b/scripts/motan/data_logger.py
@@ -7,7 +7,8 @@
import sys, os, optparse, socket, select, json, errno, time, zlib
INDEX_UPDATE_TIME = 5.0
-ClientInfo = {'program': 'motan_data_logger', 'version': 'v0.1'}
+ClientInfo = {"program": "motan_data_logger", "version": "v0.1"}
+
def webhook_socket_create(uds_filename):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -20,25 +21,28 @@ def webhook_socket_create(uds_filename):
if e.errno == errno.ECONNREFUSED:
time.sleep(0.1)
continue
- sys.stderr.write("Unable to connect socket %s [%d,%s]\n"
- % (uds_filename, e.errno,
- errno.errorcode[e.errno]))
+ sys.stderr.write(
+ "Unable to connect socket %s [%d,%s]\n"
+ % (uds_filename, e.errno, errno.errorcode[e.errno])
+ )
sys.exit(-1)
break
sys.stderr.write("Connection.\n")
return sock
+
class LogWriter:
def __init__(self, filename):
self.file = open(filename, "wb")
- self.comp = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
- zlib.DEFLATED, 31)
+ self.comp = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, 31)
self.raw_pos = self.file_pos = 0
+
def add_data(self, data):
d = self.comp.compress(data + b"\x03")
self.file.write(d)
self.file_pos += len(d)
self.raw_pos += len(data) + 1
+
def flush(self, flag=zlib.Z_FULL_FLUSH):
if not self.raw_pos:
return self.file_pos
@@ -46,12 +50,14 @@ class LogWriter:
self.file.write(d)
self.file_pos += len(d)
return self.file_pos
+
def close(self):
self.flush(zlib.Z_FINISH)
self.file.close()
self.file = None
self.comp = None
+
class DataLogger:
def __init__(self, uds_filename, log_prefix):
# IO
@@ -67,23 +73,26 @@ class DataLogger:
self.async_handlers = {}
# get_status databasing
self.db = {}
- self.next_index_time = 0.
+ self.next_index_time = 0.0
# Start login process
- self.send_query("info", "info", {"client_info": ClientInfo},
- self.handle_info)
+ self.send_query("info", "info", {"client_info": ClientInfo}, self.handle_info)
+
def error(self, msg):
sys.stderr.write(msg + "\n")
+
def finish(self, msg):
self.error(msg)
self.logger.close()
self.index.close()
sys.exit(0)
+
# Unix Domain Socket IO
def send_query(self, msg_id, method, params, cb):
self.query_handlers[msg_id] = cb
msg = {"id": msg_id, "method": method, "params": params}
- cm = json.dumps(msg, separators=(',', ':')).encode()
+ cm = json.dumps(msg, separators=(",", ":")).encode()
self.webhook_socket.send(cm + b"\x03")
+
def process_socket(self):
data = self.webhook_socket.recv(4096)
if not data:
@@ -113,15 +122,17 @@ class DataLogger:
self.flush_index()
continue
self.error("ERROR: Message with unknown id")
+
def run(self):
try:
while 1:
- res = self.poll.poll(1000.)
+ res = self.poll.poll(1000.0)
for fd, event in res:
if fd == self.webhook_socket.fileno():
self.process_socket()
except KeyboardInterrupt as e:
self.finish("Keyboard Interrupt")
+
# Query response handlers
def send_subscribe(self, msg_id, method, params, cb=None, async_cb=None):
if cb is None:
@@ -130,14 +141,22 @@ class DataLogger:
self.async_handlers[msg_id] = async_cb
params["response_template"] = {"q": msg_id}
self.send_query(msg_id, method, params, cb)
+
def handle_info(self, msg, raw_msg):
if msg["result"]["state"] != "ready":
self.finish("Klipper not in ready state")
self.send_query("list", "objects/list", {}, self.handle_list)
+
def handle_list(self, msg, raw_msg):
subreq = {o: None for o in msg["result"]["objects"]}
- self.send_subscribe("status", "objects/subscribe", {"objects": subreq},
- self.handle_subscribe, self.handle_async_db)
+ self.send_subscribe(
+ "status",
+ "objects/subscribe",
+ {"objects": subreq},
+ self.handle_subscribe,
+ self.handle_async_db,
+ )
+
def handle_subscribe(self, msg, raw_msg):
result = msg["result"]
self.next_index_time = result["eventtime"] + INDEX_UPDATE_TIME
@@ -145,15 +164,17 @@ class DataLogger:
# Subscribe to trapq and stepper queue updates
motion_report = status.get("motion_report", {})
for trapq in motion_report.get("trapq", []):
- self.send_subscribe("trapq:" + trapq, "motion_report/dump_trapq",
- {"name": trapq})
+ self.send_subscribe(
+ "trapq:" + trapq, "motion_report/dump_trapq", {"name": trapq}
+ )
for stepper in motion_report.get("steppers", []):
- self.send_subscribe("stepq:" + stepper,
- "motion_report/dump_stepper", {"name": stepper})
+ self.send_subscribe(
+ "stepq:" + stepper, "motion_report/dump_stepper", {"name": stepper}
+ )
# Subscribe to additional sensor data
stypes = ["adxl345", "lis2dw", "mpu9250", "angle"]
- stypes = {st:st for st in stypes}
- stypes['probe_eddy_current'] = 'ldc1612'
+ stypes = {st: st for st in stypes}
+ stypes["probe_eddy_current"] = "ldc1612"
config = status["configfile"]["settings"]
for cfgname in config.keys():
for capprefix, st in sorted(stypes.items()):
@@ -163,30 +184,37 @@ class DataLogger:
qcmd = "%s/dump_%s" % (st, st)
self.send_subscribe(lname, qcmd, {"sensor": aname})
if cfgname.startswith("tmc"):
- driver = ' '.join(cfgname.split()[1:])
- self.send_subscribe("stallguard:" + driver,
- "tmc/stallguard_dump", {"name": driver})
+ driver = " ".join(cfgname.split()[1:])
+ self.send_subscribe(
+ "stallguard:" + driver, "tmc/stallguard_dump", {"name": driver}
+ )
+
def handle_dump(self, msg, raw_msg):
msg_id = msg["id"]
if "result" not in msg:
- self.error("Unable to subscribe to '%s': %s"
- % (msg_id, msg.get("error", {}).get("message", "")))
+ self.error(
+ "Unable to subscribe to '%s': %s"
+ % (msg_id, msg.get("error", {}).get("message", ""))
+ )
return
self.db.setdefault("subscriptions", {})[msg_id] = msg["result"]
+
def flush_index(self):
- self.db['file_position'] = self.logger.flush()
- self.index.add_data(json.dumps(self.db, separators=(',', ':')).encode())
+ self.db["file_position"] = self.logger.flush()
+ self.index.add_data(json.dumps(self.db, separators=(",", ":")).encode())
self.db = {"status": {}}
+
def handle_async_db(self, msg, raw_msg):
params = msg["params"]
- db_status = self.db['status']
+ db_status = self.db["status"]
for k, v in params.get("status", {}).items():
db_status.setdefault(k, {}).update(v)
- eventtime = params['eventtime']
+ eventtime = params["eventtime"]
if eventtime >= self.next_index_time:
self.next_index_time = eventtime + INDEX_UPDATE_TIME
self.flush_index()
+
def nice():
try:
# Try to re-nice writing process
@@ -194,6 +222,7 @@ def nice():
except:
pass
+
def main():
usage = "%prog [options] <socket filename> <log name>"
opts = optparse.OptionParser(usage)
@@ -205,5 +234,6 @@ def main():
dl = DataLogger(args[0], args[1])
dl.run()
-if __name__ == '__main__':
+
+if __name__ == "__main__":
main()
diff --git a/scripts/motan/motan_graph.py b/scripts/motan/motan_graph.py
index fc1dee17..8d551340 100755
--- a/scripts/motan/motan_graph.py
+++ b/scripts/motan/motan_graph.py
@@ -7,6 +7,7 @@
import sys, optparse, ast
import matplotlib
import readlog, analyzers
+
try:
import urlparse
except:
@@ -17,6 +18,7 @@ except:
# Graphing
######################################################################
+
def plot_motion(amanager, graphs, log_prefix):
# Generate data
for graph in graphs:
@@ -27,7 +29,7 @@ def plot_motion(amanager, graphs, log_prefix):
times = amanager.get_dataset_times()
# Build plot
fontP = matplotlib.font_manager.FontProperties()
- fontP.set_size('x-small')
+ fontP.set_size("x-small")
fig, rows = matplotlib.pyplot.subplots(nrows=len(graphs), sharex=True)
if len(graphs) == 1:
rows = [rows]
@@ -38,29 +40,29 @@ def plot_motion(amanager, graphs, log_prefix):
label = amanager.get_label(dataset)
ax = graph_ax
if graph_units is None:
- graph_units = label['units']
+ graph_units = label["units"]
ax.set_ylabel(graph_units)
- elif label['units'] != graph_units:
+ elif label["units"] != graph_units:
if graph_twin_units is None:
ax = twin_ax = graph_ax.twinx()
- graph_twin_units = label['units']
+ graph_twin_units = label["units"]
ax.set_ylabel(graph_twin_units)
- elif label['units'] == graph_twin_units:
+ elif label["units"] == graph_twin_units:
ax = twin_ax
else:
graph_units = "Unknown"
ax.set_ylabel(graph_units)
- pparams = {'label': label['label'], 'alpha': 0.8}
+ pparams = {"label": label["label"], "alpha": 0.8}
pparams.update(plot_params)
ax.plot(times, datasets[dataset], **pparams)
if twin_ax is not None:
li1, la1 = graph_ax.get_legend_handles_labels()
li2, la2 = twin_ax.get_legend_handles_labels()
- twin_ax.legend(li1 + li2, la1 + la2, loc='best', prop=fontP)
+ twin_ax.legend(li1 + li2, la1 + la2, loc="best", prop=fontP)
else:
- graph_ax.legend(loc='best', prop=fontP)
+ graph_ax.legend(loc="best", prop=fontP)
graph_ax.grid(True)
- rows[-1].set_xlabel('Time (s)')
+ rows[-1].set_xlabel("Time (s)")
return fig
@@ -68,23 +70,26 @@ def plot_motion(amanager, graphs, log_prefix):
# Startup
######################################################################
+
def setup_matplotlib(output_to_file):
global matplotlib
if output_to_file:
- matplotlib.use('Agg')
+ matplotlib.use("Agg")
import matplotlib.pyplot, matplotlib.dates, matplotlib.font_manager
import matplotlib.ticker
+
def parse_graph_description(desc):
- if '?' not in desc:
+ if "?" not in desc:
return (desc, {})
- dataset, params = desc.split('?', 1)
+ dataset, params = desc.split("?", 1)
params = {k: v for k, v in urlparse.parse_qsl(params)}
- for fkey in ['alpha']:
+ for fkey in ["alpha"]:
if fkey in params:
params[fkey] = float(params[fkey])
return (dataset, params)
+
def list_datasets():
datasets = readlog.list_datasets() + analyzers.list_datasets()
out = ["\nAvailable datasets:\n"]
@@ -94,21 +99,35 @@ def list_datasets():
sys.stdout.write("".join(out))
sys.exit(0)
+
def main():
# Parse command-line arguments
usage = "%prog [options] <logname>"
opts = optparse.OptionParser(usage)
- opts.add_option("-o", "--output", type="string", dest="output",
- default=None, help="filename of output graph")
- opts.add_option("-s", "--skip", type="float", default=0.,
- help="Set the start time to graph")
- opts.add_option("-d", "--duration", type="float", default=5.,
- help="Number of seconds to graph")
- opts.add_option("--segment-time", type="float", default=0.000100,
- help="Analysis segment time (default 0.000100 seconds)")
+ opts.add_option(
+ "-o",
+ "--output",
+ type="string",
+ dest="output",
+ default=None,
+ help="filename of output graph",
+ )
+ opts.add_option(
+ "-s", "--skip", type="float", default=0.0, help="Set the start time to graph"
+ )
+ opts.add_option(
+ "-d", "--duration", type="float", default=5.0, help="Number of seconds to graph"
+ )
+ opts.add_option(
+ "--segment-time",
+ type="float",
+ default=0.000100,
+ help="Analysis segment time (default 0.000100 seconds)",
+ )
opts.add_option("-g", "--graph", help="Graph to generate (python literal)")
- opts.add_option("-l", "--list-datasets", action="store_true",
- help="List available datasets")
+ opts.add_option(
+ "-l", "--list-datasets", action="store_true", help="List available datasets"
+ )
options, args = opts.parse_args()
if options.list_datasets:
list_datasets()
@@ -131,8 +150,9 @@ def main():
]
if options.graph is not None:
graph_descs = ast.literal_eval(options.graph)
- graphs = [[parse_graph_description(g) for g in graph_row]
- for graph_row in graph_descs]
+ graphs = [
+ [parse_graph_description(g) for g in graph_row] for graph_row in graph_descs
+ ]
# Draw graph
setup_matplotlib(options.output is not None)
@@ -145,5 +165,6 @@ def main():
fig.set_size_inches(8, 6)
fig.savefig(options.output)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
main()
diff --git a/scripts/motan/readlog.py b/scripts/motan/readlog.py
index 43c01619..8f86d4e2 100644
--- a/scripts/motan/readlog.py
+++ b/scripts/motan/readlog.py
@@ -5,6 +5,7 @@
# This file may be distributed under the terms of the GNU GPLv3 license.
import json, zlib
+
class error(Exception):
pass
@@ -16,81 +17,98 @@ class error(Exception):
# Log data handlers: {name: class, ...}
LogHandlers = {}
+
# Extract status fields from log
class HandleStatusField:
SubscriptionIdParts = 0
ParametersMin = ParametersMax = 1
DataSets = [
- ('status(<field>)', 'A get_status field name (separate by periods)'),
+ ("status(<field>)", "A get_status field name (separate by periods)"),
]
+
def __init__(self, lmanager, name, name_parts):
self.status_tracker = lmanager.get_status_tracker()
self.field_name = name_parts[1]
- self.field_parts = name_parts[1].split('.')
- self.next_update_time = 0.
+ self.field_parts = name_parts[1].split(".")
+ self.next_update_time = 0.0
self.result = None
+
def get_label(self):
- label = '%s field' % (self.field_name,)
- return {'label': label, 'units': 'Unknown'}
+ label = "%s field" % (self.field_name,)
+ return {"label": label, "units": "Unknown"}
+
def pull_data(self, req_time):
if req_time < self.next_update_time:
return self.result
db, next_update_time = self.status_tracker.pull_status(req_time)
for fp in self.field_parts[:-1]:
db = db.get(fp, {})
- self.result = db.get(self.field_parts[-1], 0.)
+ self.result = db.get(self.field_parts[-1], 0.0)
self.next_update_time = next_update_time
return self.result
+
+
LogHandlers["status"] = HandleStatusField
+
# Extract requested position, velocity, and accel from a trapq log
class HandleTrapQ:
SubscriptionIdParts = 2
ParametersMin = ParametersMax = 2
DataSets = [
- ('trapq(<name>,velocity)', 'Requested velocity for the given trapq'),
- ('trapq(<name>,accel)', 'Requested acceleration for the given trapq'),
- ('trapq(<name>,<axis>)', 'Requested axis (x, y, or z) position'),
- ('trapq(<name>,<axis>_velocity)', 'Requested axis velocity'),
- ('trapq(<name>,<axis>_accel)', 'Requested axis acceleration'),
+ ("trapq(<name>,velocity)", "Requested velocity for the given trapq"),
+ ("trapq(<name>,accel)", "Requested acceleration for the given trapq"),
+ ("trapq(<name>,<axis>)", "Requested axis (x, y, or z) position"),
+ ("trapq(<name>,<axis>_velocity)", "Requested axis velocity"),
+ ("trapq(<name>,<axis>_accel)", "Requested axis acceleration"),
]
+
def __init__(self, lmanager, name, name_parts):
self.name = name
self.jdispatch = lmanager.get_jdispatch()
- self.cur_data = [(0., 0., 0., 0., (0., 0., 0.), (0., 0., 0.))]
+ self.cur_data = [(0.0, 0.0, 0.0, 0.0, (0.0, 0.0, 0.0), (0.0, 0.0, 0.0))]
self.data_pos = 0
tq, trapq_name, datasel = name_parts
ptypes = {}
- ptypes['velocity'] = {
- 'label': '%s velocity' % (trapq_name,),
- 'units': 'Velocity\n(mm/s)', 'func': self._pull_velocity
+ ptypes["velocity"] = {
+ "label": "%s velocity" % (trapq_name,),
+ "units": "Velocity\n(mm/s)",
+ "func": self._pull_velocity,
}
- ptypes['accel'] = {
- 'label': '%s acceleration' % (trapq_name,),
- 'units': 'Acceleration\n(mm/s^2)', 'func': self._pull_accel
+ ptypes["accel"] = {
+ "label": "%s acceleration" % (trapq_name,),
+ "units": "Acceleration\n(mm/s^2)",
+ "func": self._pull_accel,
}
for axis, name in enumerate("xyz"):
- ptypes['%s' % (name,)] = {
- 'label': '%s %s position' % (trapq_name, name), 'axis': axis,
- 'units': 'Position\n(mm)', 'func': self._pull_axis_position
+ ptypes["%s" % (name,)] = {
+ "label": "%s %s position" % (trapq_name, name),
+ "axis": axis,
+ "units": "Position\n(mm)",
+ "func": self._pull_axis_position,
}
- ptypes['%s_velocity' % (name,)] = {
- 'label': '%s %s velocity' % (trapq_name, name), 'axis': axis,
- 'units': 'Velocity\n(mm/s)', 'func': self._pull_axis_velocity
+ ptypes["%s_velocity" % (name,)] = {
+ "label": "%s %s velocity" % (trapq_name, name),
+ "axis": axis,
+ "units": "Velocity\n(mm/s)",
+ "func": self._pull_axis_velocity,
}
- ptypes['%s_accel' % (name,)] = {
- 'label': '%s %s acceleration' % (trapq_name, name),
- 'axis': axis, 'units': 'Acceleration\n(mm/s^2)',
- 'func': self._pull_axis_accel
+ ptypes["%s_accel" % (name,)] = {
+ "label": "%s %s acceleration" % (trapq_name, name),
+ "axis": axis,
+ "units": "Acceleration\n(mm/s^2)",
+ "func": self._pull_axis_accel,
}
pinfo = ptypes.get(datasel)
if pinfo is None:
raise error("Unknown trapq data selection '%s'" % (datasel,))
- self.label = {'label': pinfo['label'], 'units': pinfo['units']}
- self.axis = pinfo.get('axis')
- self.pull_data = pinfo['func']
+ self.label = {"label": pinfo["label"], "units": pinfo["units"]}
+ self.axis = pinfo.get("axis")
+ self.pull_data = pinfo["func"]
+
def get_label(self):
return self.label
+
def _find_move(self, req_time):
data_pos = self.data_pos
while 1:
@@ -105,54 +123,63 @@ class HandleTrapQ:
jmsg = self.jdispatch.pull_msg(req_time, self.name)
if jmsg is None:
return move, False
- self.cur_data = jmsg['data']
+ self.cur_data = jmsg["data"]
self.data_pos = data_pos = 0
+
def _pull_axis_position(self, req_time):
move, in_range = self._find_move(req_time)
print_time, move_t, start_v, accel, start_pos, axes_r = move
- mtime = max(0., min(move_t, req_time - print_time))
- dist = (start_v + .5 * accel * mtime) * mtime;
+ mtime = max(0.0, min(move_t, req_time - print_time))
+ dist = (start_v + 0.5 * accel * mtime) * mtime
return start_pos[self.axis] + axes_r[self.axis] * dist
+
def _pull_axis_velocity(self, req_time):
move, in_range = self._find_move(req_time)
if not in_range:
- return 0.
+ return 0.0
print_time, move_t, start_v, accel, start_pos, axes_r = move
return (start_v + accel * (req_time - print_time)) * axes_r[self.axis]
+
def _pull_axis_accel(self, req_time):
move, in_range = self._find_move(req_time)
if not in_range:
- return 0.
+ return 0.0
print_time, move_t, start_v, accel, start_pos, axes_r = move
return accel * axes_r[self.axis]
+
def _pull_velocity(self, req_time):
move, in_range = self._find_move(req_time)
if not in_range:
- return 0.
+ return 0.0
print_time, move_t, start_v, accel, start_pos, axes_r = move
return start_v + accel * (req_time - print_time)
+
def _pull_accel(self, req_time):
move, in_range = self._find_move(req_time)
if not in_range:
- return 0.
+ return 0.0
print_time, move_t, start_v, accel, start_pos, axes_r = move
return accel
+
+
LogHandlers["trapq"] = HandleTrapQ
+
# Extract positions from queue_step log
class HandleStepQ:
SubscriptionIdParts = 2
ParametersMin = 1
ParametersMax = 2
DataSets = [
- ('stepq(<stepper>)', 'Commanded position of the given stepper'),
- ('stepq(<stepper>,<time>)', 'Commanded position with smooth time'),
+ ("stepq(<stepper>)", "Commanded position of the given stepper"),
+ ("stepq(<stepper>,<time>)", "Commanded position with smooth time"),
]
+
def __init__(self, lmanager, name, name_parts):
self.name = name
self.stepper_name = name_parts[1]
self.jdispatch = lmanager.get_jdispatch()
- self.step_data = [(0., 0., 0.), (0., 0., 0.)] # [(time, half_pos, pos)]
+ self.step_data = [(0.0, 0.0, 0.0), (0.0, 0.0, 0.0)] # [(time, half_pos, pos)]
self.data_pos = 0
self.smooth_time = 0.010
if len(name_parts) == 3:
@@ -160,9 +187,11 @@ class HandleStepQ:
self.smooth_time = float(name_parts[2])
except ValueError:
raise error("Invalid stepq smooth time '%s'" % (name_parts[2],))
+
def get_label(self):
- label = '%s position' % (self.stepper_name,)
- return {'label': label, 'units': 'Position\n(mm)'}
+ label = "%s position" % (self.stepper_name,)
+ return {"label": label, "units": "Position\n(mm)"}
+
def pull_data(self, req_time):
smooth_time = self.smooth_time
while 1:
@@ -183,7 +212,7 @@ class HandleStepQ:
if stime <= smooth_time:
pdiff = next_halfpos - last_halfpos
return last_halfpos + rtdiff * pdiff / stime
- stime = .5 * smooth_time
+ stime = 0.5 * smooth_time
if rtdiff < stime:
pdiff = last_pos - last_halfpos
return last_halfpos + rtdiff * pdiff / stime
@@ -192,6 +221,7 @@ class HandleStepQ:
pdiff = last_pos - next_halfpos
return next_halfpos + rtdiff * pdiff / stime
return last_pos
+
def _pull_block(self, req_time):
step_data = self.step_data
del step_data[:-1]
@@ -201,25 +231,25 @@ class HandleStepQ:
jmsg = self.jdispatch.pull_msg(req_time, self.name)
if jmsg is None:
last_time, last_halfpos, last_pos = step_data[0]
- self.step_data.append((req_time + .1, last_pos, last_pos))
+ self.step_data.append((req_time + 0.1, last_pos, last_pos))
return
- last_time = jmsg['last_step_time']
+ last_time = jmsg["last_step_time"]
if req_time <= last_time:
break
# Process block into (time, half_position, position) 3-tuples
- first_time = step_time = jmsg['first_step_time']
- first_clock = jmsg['first_clock']
- step_clock = first_clock - jmsg['data'][0][0]
- cdiff = jmsg['last_clock'] - first_clock
+ first_time = step_time = jmsg["first_step_time"]
+ first_clock = jmsg["first_clock"]
+ step_clock = first_clock - jmsg["data"][0][0]
+ cdiff = jmsg["last_clock"] - first_clock
tdiff = last_time - first_time
- inv_freq = 0.
+ inv_freq = 0.0
if cdiff:
inv_freq = tdiff / cdiff
- step_dist = jmsg['step_distance']
- step_pos = jmsg['start_position']
+ step_dist = jmsg["step_distance"]
+ step_pos = jmsg["start_position"]
if not step_data[0][0]:
- step_data[0] = (0., step_pos, step_pos)
- for interval, raw_count, add in jmsg['data']:
+ step_data[0] = (0.0, step_pos, step_pos)
+ for interval, raw_count, add in jmsg["data"]:
qs_dist = step_dist
count = raw_count
if count < 0:
@@ -229,22 +259,30 @@ class HandleStepQ:
step_clock += interval
interval += add
step_time = first_time + (step_clock - first_clock) * inv_freq
- step_halfpos = step_pos + .5 * qs_dist
+ step_halfpos = step_pos + 0.5 * qs_dist
step_pos += qs_dist
step_data.append((step_time, step_halfpos, step_pos))
+
+
LogHandlers["stepq"] = HandleStepQ
+
# Extract tmc current and stallguard data from the log
class HandleStallguard:
SubscriptionIdParts = 2
ParametersMin = 2
ParametersMax = 2
DataSets = [
- ('stallguard(<stepper>,sg_result)',
- 'Stallguard result of the given stepper driver'),
- ('stallguard(<stepper>,cs_actual)',
- 'Current level result of the given stepper driver'),
+ (
+ "stallguard(<stepper>,sg_result)",
+ "Stallguard result of the given stepper driver",
+ ),
+ (
+ "stallguard(<stepper>,cs_actual)",
+ "Current level result of the given stepper driver",
+ ),
]
+
def __init__(self, lmanager, name, name_parts):
self.name = name
self.stepper_name = name_parts[1]
@@ -253,7 +291,7 @@ class HandleStallguard:
self.data = []
self.ret = None
self.driver_name = ""
- for k in lmanager.get_initial_status()['configfile']['settings']:
+ for k in lmanager.get_initial_status()["configfile"]["settings"]:
if not k.startswith("tmc"):
continue
if k.endswith(self.stepper_name):
@@ -261,15 +299,16 @@ class HandleStallguard:
break
# Current decode
self.status_tracker = lmanager.get_status_tracker()
- self.next_status_time = 0.
+ self.next_status_time = 0.0
self.irun = 0
+
def get_label(self):
- label = '%s %s %s' % (self.driver_name, self.stepper_name,
- self.filter)
+ label = "%s %s %s" % (self.driver_name, self.stepper_name, self.filter)
if self.filter == "sg_result":
- return {'label': label, 'units': 'Stallguard'}
+ return {"label": label, "units": "Stallguard"}
elif self.filter == "cs_actual":
- return {'label': label, 'units': 'CS Actual'}
+ return {"label": label, "units": "CS Actual"}
+
# Search datapoint in dataset extrapolate in between
def pull_data(self, req_time):
while 1:
@@ -290,25 +329,30 @@ class HandleStallguard:
if req_time <= time:
return self.ret[self.filter]
self.ret = None
+
+
LogHandlers["stallguard"] = HandleStallguard
+
# Extract stepper motor phase position
class HandleStepPhase:
SubscriptionIdParts = 0
ParametersMin = 1
ParametersMax = 2
DataSets = [
- ('step_phase(<driver>)', 'Stepper motor phase of the given stepper'),
- ('step_phase(<driver>,microstep)', 'Microstep position for stepper'),
+ ("step_phase(<driver>)", "Stepper motor phase of the given stepper"),
+ ("step_phase(<driver>,microstep)", "Microstep position for stepper"),
]
+
def __init__(self, lmanager, name, name_parts):
self.name = name
self.driver_name = name_parts[1]
self.stepper_name = " ".join(self.driver_name.split()[1:])
- config = lmanager.get_initial_status()['configfile']['settings']
+ config = lmanager.get_initial_status()["configfile"]["settings"]
if self.driver_name not in config or self.stepper_name not in config:
- raise error("Unable to find stepper driver '%s' config"
- % (self.driver_name,))
+ raise error(
+ "Unable to find stepper driver '%s' config" % (self.driver_name,)
+ )
if len(name_parts) == 3 and name_parts[2] != "microstep":
raise error("Unknown step_phase selection '%s'" % (name_parts[2],))
self.report_microsteps = len(name_parts) == 3
@@ -319,23 +363,28 @@ class HandleStepPhase:
self.jdispatch = lmanager.get_jdispatch()
self.jdispatch.add_handler(name, "stepq:" + self.stepper_name)
# stepq tracking
- self.step_data = [(0., 0), (0., 0)] # [(time, mcu_pos)]
+ self.step_data = [(0.0, 0), (0.0, 0)] # [(time, mcu_pos)]
self.data_pos = 0
# driver phase tracking
self.status_tracker = lmanager.get_status_tracker()
- self.next_status_time = 0.
+ self.next_status_time = 0.0
self.mcu_phase_offset = 0
+
def get_label(self):
if self.report_microsteps:
- return {'label': '%s microstep' % (self.stepper_name,),
- 'units': 'Microstep'}
- return {'label': '%s phase' % (self.stepper_name,), 'units': 'Phase'}
+ return {
+ "label": "%s microstep" % (self.stepper_name,),
+ "units": "Microstep",
+ }
+ return {"label": "%s phase" % (self.stepper_name,), "units": "Phase"}
+
def _pull_phase_offset(self, req_time):
db, self.next_status_time = self.status_tracker.pull_status(req_time)
- mcu_phase_offset = db.get(self.driver_name, {}).get('mcu_phase_offset')
+ mcu_phase_offset = db.get(self.driver_name, {}).get("mcu_phase_offset")
if mcu_phase_offset is None:
mcu_phase_offset = 0
self.mcu_phase_offset = mcu_phase_offset
+
def pull_data(self, req_time):
if req_time >= self.next_status_time:
self._pull_phase_offset(req_time)
@@ -352,6 +401,7 @@ class HandleStepPhase:
continue
step_pos = step_data[data_pos][1]
return (step_pos + self.mcu_phase_offset) % self.phases
+
def _pull_block(self, req_time):
step_data = self.step_data
del step_data[:-1]
@@ -361,24 +411,24 @@ class HandleStepPhase:
jmsg = self.jdispatch.pull_msg(req_time, self.name)
if jmsg is None:
last_time, last_pos = step_data[0]
- self.step_data.append((req_time + .1, last_pos))
+ self.step_data.append((req_time + 0.1, last_pos))
return
- last_time = jmsg['last_step_time']
+ last_time = jmsg["last_step_time"]
if req_time <= last_time:
break
# Process block into (time, position) 2-tuples
- first_time = step_time = jmsg['first_step_time']
- first_clock = jmsg['first_clock']
- step_clock = first_clock - jmsg['data'][0][0]
- cdiff = jmsg['last_clock'] - first_clock
+ first_time = step_time = jmsg["first_step_time"]
+ first_clock = jmsg["first_clock"]
+ step_clock = first_clock - jmsg["data"][0][0]
+ cdiff = jmsg["last_clock"] - first_clock
tdiff = last_time - first_time
- inv_freq = 0.
+ inv_freq = 0.0
if cdiff:
inv_freq = tdiff / cdiff
- step_pos = jmsg['start_mcu_position']
+ step_pos = jmsg["start_mcu_position"]
if not step_data[0][0]:
- step_data[0] = (0., step_pos)
- for interval, raw_count, add in jmsg['data']:
+ step_data[0] = (0.0, step_pos)
+ for interval, raw_count, add in jmsg["data"]:
qs_dist = 1
count = raw_count
if count < 0:
@@ -390,29 +440,35 @@ class HandleStepPhase:
step_time = first_time + (step_clock - first_clock) * inv_freq
step_pos += qs_dist
step_data.append((step_time, step_pos))
+
+
LogHandlers["step_phase"] = HandleStepPhase
+
# Extract accelerometer data
class HandleADXL345:
SubscriptionIdParts = 2
ParametersMin = ParametersMax = 2
DataSets = [
- ('adxl345(<name>,<axis>)', 'Accelerometer for given axis (x, y, or z)'),
+ ("adxl345(<name>,<axis>)", "Accelerometer for given axis (x, y, or z)"),
]
+
def __init__(self, lmanager, name, name_parts):
self.name = name
self.adxl_name = name_parts[1]
self.jdispatch = lmanager.get_jdispatch()
- self.next_accel_time = self.last_accel_time = 0.
- self.next_accel = self.last_accel = (0., 0., 0.)
+ self.next_accel_time = self.last_accel_time = 0.0
+ self.next_accel = self.last_accel = (0.0, 0.0, 0.0)
self.cur_data = []
self.data_pos = 0
- if name_parts[2] not in 'xyz':
+ if name_parts[2] not in "xyz":
raise error("Unknown adxl345 data selection '%s'" % (name,))
- self.axis = 'xyz'.index(name_parts[2])
+ self.axis = "xyz".index(name_parts[2])
+
def get_label(self):
- label = '%s %s acceleration' % (self.adxl_name, 'xyz'[self.axis])
- return {'label': label, 'units': 'Acceleration\n(mm/s^2)'}
+ label = "%s %s acceleration" % (self.adxl_name, "xyz"[self.axis])
+ return {"label": label, "units": "Acceleration\n(mm/s^2)"}
+
def pull_data(self, req_time):
axis = self.axis
while 1:
@@ -425,8 +481,8 @@ class HandleADXL345:
# Read next data block
jmsg = self.jdispatch.pull_msg(req_time, self.name)
if jmsg is None:
- return 0.
- self.cur_data = jmsg['data']
+ return 0.0
+ self.cur_data = jmsg["data"]
self.data_pos = 0
continue
self.last_accel = self.next_accel
@@ -434,42 +490,50 @@ class HandleADXL345:
self.next_accel_time, x, y, z = self.cur_data[self.data_pos]
self.next_accel = (x, y, z)
self.data_pos += 1
+
+
LogHandlers["adxl345"] = HandleADXL345
+
# Extract positions from magnetic angle sensor
class HandleAngle:
SubscriptionIdParts = 2
ParametersMin = ParametersMax = 1
DataSets = [
- ('angle(<name>)', 'Angle sensor position'),
+ ("angle(<name>)", "Angle sensor position"),
]
+
def __init__(self, lmanager, name, name_parts):
self.name = name
self.angle_name = name_parts[1]
self.jdispatch = lmanager.get_jdispatch()
- self.next_angle_time = self.last_angle_time = 0.
- self.next_angle = self.last_angle = 0.
+ self.next_angle_time = self.last_angle_time = 0.0
+ self.next_angle = self.last_angle = 0.0
self.cur_data = []
self.data_pos = 0
- self.position_offset = 0.
- self.angle_dist = 1.
+ self.position_offset = 0.0
+ self.angle_dist = 1.0
# Determine angle distance from associated stepper's rotation_distance
- config = lmanager.get_initial_status()['configfile']['settings']
- aname = 'angle %s' % (self.angle_name,)
- stepper_name = config.get(aname, {}).get('stepper')
+ config = lmanager.get_initial_status()["configfile"]["settings"]
+ aname = "angle %s" % (self.angle_name,)
+ stepper_name = config.get(aname, {}).get("stepper")
if stepper_name is not None:
sconfig = config.get(stepper_name, {})
- rotation_distance = sconfig.get('rotation_distance', 1.)
- gear_ratio = sconfig.get('gear_ratio', ())
- if type(gear_ratio) == str: # XXX
- gear_ratio = [[float(v.strip()) for v in gr.split(':')]
- for gr in gear_ratio.split(',')]
+ rotation_distance = sconfig.get("rotation_distance", 1.0)
+ gear_ratio = sconfig.get("gear_ratio", ())
+ if type(gear_ratio) == str: # XXX
+ gear_ratio = [
+ [float(v.strip()) for v in gr.split(":")]
+ for gr in gear_ratio.split(",")
+ ]
for n, d in gear_ratio:
rotation_distance *= d / n
- self.angle_dist = rotation_distance / 65536.
+ self.angle_dist = rotation_distance / 65536.0
+
def get_label(self):
- label = '%s position' % (self.angle_name,)
- return {'label': label, 'units': 'Position\n(mm)'}
+ label = "%s position" % (self.angle_name,)
+ return {"label": label, "units": "Position\n(mm)"}
+
def pull_data(self, req_time):
while 1:
if req_time <= self.next_angle_time:
@@ -477,16 +541,14 @@ class HandleAngle:
tdiff = self.next_angle_time - self.last_angle_time
rtdiff = req_time - self.last_angle_time
po = rtdiff * pdiff / tdiff
- return ((self.last_angle + po) * self.angle_dist
- + self.position_offset)
+ return (self.last_angle + po) * self.angle_dist + self.position_offset
if self.data_pos >= len(self.cur_data):
# Read next data block
jmsg = self.jdispatch.pull_msg(req_time, self.name)
if jmsg is None:
- return (self.next_angle * self.angle_dist
- + self.position_offset)
- self.cur_data = jmsg['data']
- position_offset = jmsg.get('position_offset')
+ return self.next_angle * self.angle_dist + self.position_offset
+ self.cur_data = jmsg["data"]
+ position_offset = jmsg.get("position_offset")
if position_offset is not None:
self.position_offset = position_offset
self.data_pos = 0
@@ -495,24 +557,29 @@ class HandleAngle:
self.last_angle_time = self.next_angle_time
self.next_angle_time, self.next_angle = self.cur_data[self.data_pos]
self.data_pos += 1
+
+
LogHandlers["angle"] = HandleAngle
+
def interpolate(next_val, prev_val, next_time, prev_time, req_time):
vdiff = next_val - prev_val
tdiff = next_time - prev_time
rtdiff = req_time - prev_time
return prev_val + rtdiff * vdiff / tdiff
+
# Extract eddy current data
class HandleEddyCurrent:
SubscriptionIdParts = 2
ParametersMin = 1
ParametersMax = 2
DataSets = [
- ('ldc1612(<name>)', 'Coil resonant frequency'),
- ('ldc1612(<name>,period)', 'Coil resonant period'),
- ('ldc1612(<name>,z)', 'Estimated Z height'),
+ ("ldc1612(<name>)", "Coil resonant frequency"),
+ ("ldc1612(<name>,period)", "Coil resonant period"),
+ ("ldc1612(<name>,z)", "Estimated Z height"),
]
+
def __init__(self, lmanager, name, name_parts):
self.name = name
self.sensor_name = name_parts[1]
@@ -521,18 +588,20 @@ class HandleEddyCurrent:
self.report_frequency = len(name_parts) == 2
self.report_z = len(name_parts) == 3 and name_parts[2] == "z"
self.jdispatch = lmanager.get_jdispatch()
- self.next_samp = self.prev_samp = [0., 0., 0.]
+ self.next_samp = self.prev_samp = [0.0, 0.0, 0.0]
self.cur_data = []
self.data_pos = 0
+
def get_label(self):
if self.report_frequency:
- label = '%s frequency' % (self.sensor_name,)
- return {'label': label, 'units': 'Frequency\n(Hz)'}
+ label = "%s frequency" % (self.sensor_name,)
+ return {"label": label, "units": "Frequency\n(Hz)"}
if self.report_z:
- label = '%s height' % (self.sensor_name,)
- return {'label': label, 'units': 'Position\n(mm)'}
- label = '%s period' % (self.sensor_name,)
- return {'label': label, 'units': 'Period\n(s)'}
+ label = "%s height" % (self.sensor_name,)
+ return {"label": label, "units": "Position\n(mm)"}
+ label = "%s period" % (self.sensor_name,)
+ return {"label": label, "units": "Period\n(s)"}
+
def pull_data(self, req_time):
while 1:
next_time, next_freq, next_z = self.next_samp
@@ -545,21 +614,22 @@ class HandleEddyCurrent:
next_val = next_z
prev_val = prev_z
else:
- next_val = 1. / next_freq
- prev_val = 1. / prev_freq
- return interpolate(next_val, prev_val, next_time, prev_time,
- req_time)
+ next_val = 1.0 / next_freq
+ prev_val = 1.0 / prev_freq
+ return interpolate(next_val, prev_val, next_time, prev_time, req_time)
if self.data_pos >= len(self.cur_data):
# Read next data block
jmsg = self.jdispatch.pull_msg(req_time, self.name)
if jmsg is None:
- return 0.
- self.cur_data = jmsg['data']
+ return 0.0
+ self.cur_data = jmsg["data"]
self.data_pos = 0
continue
self.prev_samp = self.next_samp
self.next_samp = self.cur_data[self.data_pos]
self.data_pos += 1
+
+
LogHandlers["ldc1612"] = HandleEddyCurrent
@@ -567,15 +637,18 @@ LogHandlers["ldc1612"] = HandleEddyCurrent
# Log reading
######################################################################
+
# Read, uncompress, and parse messages in a log built by data_logger.py
class JsonLogReader:
def __init__(self, filename):
self.file = open(filename, "rb")
self.comp = zlib.decompressobj(31)
self.msgs = [b""]
+
def seek(self, pos):
self.file.seek(pos)
self.comp = zlib.decompressobj(-15)
+
def pull_msg(self):
msgs = self.msgs
while 1:
@@ -591,55 +664,61 @@ class JsonLogReader:
if not raw_data:
return None
data = self.comp.decompress(raw_data)
- parts = data.split(b'\x03')
+ parts = data.split(b"\x03")
parts[0] = msgs[0] + parts[0]
self.msgs = msgs = parts
+
# Store messages in per-subscription queues until handlers are ready for them
class JsonDispatcher:
def __init__(self, log_prefix):
self.names = {}
self.queues = {}
- self.last_read_time = 0.
+ self.last_read_time = 0.0
self.log_reader = JsonLogReader(log_prefix + ".json.gz")
self.is_eof = False
+
def check_end_of_data(self):
return self.is_eof and not any(self.queues.values())
+
def add_handler(self, name, subscription_id):
self.names[name] = q = []
self.queues.setdefault(subscription_id, []).append(q)
+
def pull_msg(self, req_time, name):
q = self.names[name]
while 1:
if q:
return q.pop(0)
- if req_time + 1. < self.last_read_time:
+ if req_time + 1.0 < self.last_read_time:
return None
json_msg = self.log_reader.pull_msg()
if json_msg is None:
self.is_eof = True
return None
- qid = json_msg.get('q')
- if qid == 'status':
- pt = json_msg.get('toolhead', {}).get('estimated_print_time')
+ qid = json_msg.get("q")
+ if qid == "status":
+ pt = json_msg.get("toolhead", {}).get("estimated_print_time")
if pt is not None:
self.last_read_time = pt
for mq in self.queues.get(qid, []):
- mq.append(json_msg['params'])
+ mq.append(json_msg["params"])
######################################################################
# Dataset and log tracking
######################################################################
+
# Tracking of get_status messages
class TrackStatus:
def __init__(self, lmanager, name, start_status):
self.name = name
self.jdispatch = lmanager.get_jdispatch()
- self.next_status_time = 0.
+ self.next_status_time = 0.0
self.status = dict(start_status)
self.next_update = {}
+
def pull_status(self, req_time):
status = self.status
while 1:
@@ -652,32 +731,35 @@ class TrackStatus:
self.next_status_time = req_time + 0.100
self.next_update = {}
return status, self.next_status_time
- self.next_update = jmsg['status']
- th = self.next_update.get('toolhead', {})
- self.next_status_time = th.get('estimated_print_time', 0.)
+ self.next_update = jmsg["status"]
+ th = self.next_update.get("toolhead", {})
+ self.next_status_time = th.get("estimated_print_time", 0.0)
+
# Split a string by commas while keeping parenthesis intact
def param_split(line):
out = []
level = prev = 0
for i, c in enumerate(line):
- if not level and c == ',':
+ if not level and c == ",":
out.append(line[prev:i])
- prev = i+1
- elif c == '(':
+ prev = i + 1
+ elif c == "(":
level += 1
- elif level and c== ')':
+ elif level and c == ")":
level -= 1
out.append(line[prev:])
return out
+
# Split a dataset name (eg, "abc(def,ghi)") into parts
def name_split(name):
- if '(' not in name or not name.endswith(')'):
+ if "(" not in name or not name.endswith(")"):
raise error("Malformed dataset name '%s'" % (name,))
- aname, aparams = name.split('(', 1)
+ aname, aparams = name.split("(", 1)
return [aname] + param_split(aparams[:-1])
+
# Return a description of possible datasets
def list_datasets():
datasets = []
@@ -685,58 +767,69 @@ def list_datasets():
datasets += LogHandlers[lh].DataSets
return datasets
+
# Main log access management
class LogManager:
error = error
+
def __init__(self, log_prefix):
self.index_reader = JsonLogReader(log_prefix + ".index.gz")
self.jdispatch = JsonDispatcher(log_prefix)
- self.initial_start_time = self.start_time = 0.
+ self.initial_start_time = self.start_time = 0.0
self.datasets = {}
self.initial_status = {}
self.start_status = {}
self.log_subscriptions = {}
self.status_tracker = None
+
def setup_index(self):
fmsg = self.index_reader.pull_msg()
- self.initial_status = status = fmsg['status']
+ self.initial_status = status = fmsg["status"]
self.start_status = dict(status)
- start_time = status['toolhead']['estimated_print_time']
+ start_time = status["toolhead"]["estimated_print_time"]
self.initial_start_time = self.start_time = start_time
- self.log_subscriptions = fmsg.get('subscriptions', {})
+ self.log_subscriptions = fmsg.get("subscriptions", {})
+
def get_initial_status(self):
return self.initial_status
+
def available_dataset_types(self):
return {name: None for name in LogHandlers}
+
def get_jdispatch(self):
return self.jdispatch
+
def seek_time(self, req_time):
self.start_time = req_start_time = self.initial_start_time + req_time
start_status = self.start_status
- seek_time = max(self.initial_start_time, req_start_time - 1.)
+ seek_time = max(self.initial_start_time, req_start_time - 1.0)
file_position = 0
while 1:
fmsg = self.index_reader.pull_msg()
if fmsg is None:
break
- th = fmsg['status']['toolhead']
- ptime = max(th['estimated_print_time'], th.get('print_time', 0.))
+ th = fmsg["status"]["toolhead"]
+ ptime = max(th["estimated_print_time"], th.get("print_time", 0.0))
if ptime > seek_time:
break
for k, v in fmsg["status"].items():
start_status.setdefault(k, {}).update(v)
- file_position = fmsg['file_position']
+ file_position = fmsg["file_position"]
if file_position:
self.jdispatch.log_reader.seek(file_position)
+
def get_initial_start_time(self):
return self.initial_start_time
+
def get_start_time(self):
return self.start_time
+
def get_status_tracker(self):
if self.status_tracker is None:
self.status_tracker = TrackStatus(self, "status", self.start_status)
self.jdispatch.add_handler("status", "status")
return self.status_tracker
+
def setup_dataset(self, name):
if name in self.datasets:
return self.datasets[name]
@@ -748,7 +841,7 @@ class LogManager:
if len_pp < cls.ParametersMin or len_pp > cls.ParametersMax:
raise error("Invalid number of parameters for '%s'" % (name,))
if cls.SubscriptionIdParts:
- subscription_id = ":".join(name_parts[:cls.SubscriptionIdParts])
+ subscription_id = ":".join(name_parts[: cls.SubscriptionIdParts])
if subscription_id not in self.log_subscriptions:
raise error("Dataset '%s' not in capture" % (subscription_id,))
self.jdispatch.add_handler(name, subscription_id)