aboutsummaryrefslogtreecommitdiffstats
path: root/scripts/motan/analyzers.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/motan/analyzers.py')
-rw-r--r--scripts/motan/analyzers.py225
1 files changed, 142 insertions, 83 deletions
diff --git a/scripts/motan/analyzers.py b/scripts/motan/analyzers.py
index 2796362f..917cb032 100644
--- a/scripts/motan/analyzers.py
+++ b/scripts/motan/analyzers.py
@@ -14,49 +14,57 @@ import readlog
# Analyzer handlers: {name: class, ...}
AHandlers = {}
+
# Calculate a derivative (position to velocity, or velocity to accel)
class GenDerivative:
ParametersMin = ParametersMax = 1
DataSets = [
- ('derivative(<dataset>)', 'Derivative of the given dataset'),
+ ("derivative(<dataset>)", "Derivative of the given dataset"),
]
+
def __init__(self, amanager, name_parts):
self.amanager = amanager
self.source = name_parts[1]
amanager.setup_dataset(self.source)
+
def get_label(self):
label = self.amanager.get_label(self.source)
- lname = label['label']
- units = label['units']
- if '(mm)' in units:
- rep = [('Position', 'Velocity'), ('(mm)', '(mm/s)')]
- elif '(mm/s)' in units:
- rep = [('Velocity', 'Acceleration'), ('(mm/s)', '(mm/s^2)')]
+ lname = label["label"]
+ units = label["units"]
+ if "(mm)" in units:
+ rep = [("Position", "Velocity"), ("(mm)", "(mm/s)")]
+ elif "(mm/s)" in units:
+ rep = [("Velocity", "Acceleration"), ("(mm/s)", "(mm/s^2)")]
else:
- return {'label': 'Derivative', 'units': 'Unknown'}
+ return {"label": "Derivative", "units": "Unknown"}
for old, new in rep:
lname = lname.replace(old, new).replace(old.lower(), new.lower())
units = units.replace(old, new).replace(old.lower(), new.lower())
- return {'label': lname, 'units': units}
+ return {"label": lname, "units": units}
+
def generate_data(self):
- inv_seg_time = 1. / self.amanager.get_segment_time()
+ inv_seg_time = 1.0 / self.amanager.get_segment_time()
data = self.amanager.get_datasets()[self.source]
- deriv = [(data[i+1] - data[i]) * inv_seg_time
- for i in range(len(data)-1)]
+ deriv = [(data[i + 1] - data[i]) * inv_seg_time for i in range(len(data) - 1)]
return [deriv[0]] + deriv
+
+
AHandlers["derivative"] = GenDerivative
+
# Calculate an integral (accel to velocity, or velocity to position)
class GenIntegral:
ParametersMin = 1
ParametersMax = 3
DataSets = [
- ('integral(<dataset>)', 'Integral of the given dataset'),
- ('integral(<dataset1>,<dataset2>)',
- 'Integral with dataset2 as reference'),
- ('integral(<dataset1>,<dataset2>,<half_life>)',
- 'Integral with weighted half-life time'),
+ ("integral(<dataset>)", "Integral of the given dataset"),
+ ("integral(<dataset1>,<dataset2>)", "Integral with dataset2 as reference"),
+ (
+ "integral(<dataset1>,<dataset2>,<half_life>)",
+ "Integral with weighted half-life time",
+ ),
]
+
def __init__(self, amanager, name_parts):
self.amanager = amanager
self.source = name_parts[1]
@@ -68,54 +76,58 @@ class GenIntegral:
amanager.setup_dataset(self.ref)
if len(name_parts) == 4:
self.half_life = float(name_parts[3])
+
def get_label(self):
label = self.amanager.get_label(self.source)
- lname = label['label']
- units = label['units']
- if '(mm/s)' in units:
- rep = [('Velocity', 'Position'), ('(mm/s)', '(mm)')]
- elif '(mm/s^2)' in units:
- rep = [('Acceleration', 'Velocity'), ('(mm/s^2)', '(mm/s)')]
+ lname = label["label"]
+ units = label["units"]
+ if "(mm/s)" in units:
+ rep = [("Velocity", "Position"), ("(mm/s)", "(mm)")]
+ elif "(mm/s^2)" in units:
+ rep = [("Acceleration", "Velocity"), ("(mm/s^2)", "(mm/s)")]
else:
- return {'label': 'Integral', 'units': 'Unknown'}
+ return {"label": "Integral", "units": "Unknown"}
for old, new in rep:
lname = lname.replace(old, new).replace(old.lower(), new.lower())
units = units.replace(old, new).replace(old.lower(), new.lower())
- return {'label': lname, 'units': units}
+ return {"label": lname, "units": units}
+
def generate_data(self):
seg_time = self.amanager.get_segment_time()
src = self.amanager.get_datasets()[self.source]
offset = sum(src) / len(src)
- total = 0.
+ total = 0.0
ref = None
if self.ref is not None:
ref = self.amanager.get_datasets()[self.ref]
offset -= (ref[-1] - ref[0]) / (len(src) * seg_time)
total = ref[0]
- src_weight = 1.
+ src_weight = 1.0
if self.half_life:
- src_weight = math.exp(math.log(.5) * seg_time / self.half_life)
- ref_weight = 1. - src_weight
- data = [0.] * len(src)
+ src_weight = math.exp(math.log(0.5) * seg_time / self.half_life)
+ ref_weight = 1.0 - src_weight
+ data = [0.0] * len(src)
for i, v in enumerate(src):
total += (v - offset) * seg_time
if ref is not None:
total = src_weight * total + ref_weight * ref[i]
data[i] = total
return data
+
+
AHandlers["integral"] = GenIntegral
+
# Calculate a pointwise 2-norm of several datasets (e.g. compute velocity or
# accel from its x, y,... components)
class GenNorm2:
ParametersMin = 2
ParametersMax = 3
DataSets = [
- ('norm2(<dataset1>,<dataset2>)',
- 'pointwise 2-norm of dataset1 and dataset2'),
- ('norm2(<dataset1>,<dataset2>,<dataset3>)',
- 'pointwise 2-norm of 3 datasets'),
+ ("norm2(<dataset1>,<dataset2>)", "pointwise 2-norm of dataset1 and dataset2"),
+ ("norm2(<dataset1>,<dataset2>,<dataset3>)", "pointwise 2-norm of 3 datasets"),
]
+
def __init__(self, amanager, name_parts):
self.amanager = amanager
self.datasets = []
@@ -125,48 +137,56 @@ class GenNorm2:
self.datasets.append(name_parts[3])
for dataset in self.datasets:
amanager.setup_dataset(dataset)
+
def get_label(self):
label = self.amanager.get_label(self.datasets[0])
- units = label['units']
- datas = ['position', 'velocity', 'acceleration']
- data_name = ''
+ units = label["units"]
+ datas = ["position", "velocity", "acceleration"]
+ data_name = ""
for d in datas:
- if d in label['label']:
+ if d in label["label"]:
data_name = d
break
- lname = ''
+ lname = ""
for d in self.datasets:
- l = self.amanager.get_label(d)['label']
+ l = self.amanager.get_label(d)["label"]
for r in datas:
- l = l.replace(r, '').strip()
+ l = l.replace(r, "").strip()
if lname:
- lname += '+'
+ lname += "+"
lname += l
- lname += ' ' + data_name + ' norm2'
- return {'label': lname, 'units': units}
+ lname += " " + data_name + " norm2"
+ return {"label": lname, "units": units}
+
def generate_data(self):
seg_time = self.amanager.get_segment_time()
data = []
for dataset in self.datasets:
data.append(self.amanager.get_datasets()[dataset])
- res = [0.] * len(data[0])
+ res = [0.0] * len(data[0])
for i in range(len(data[0])):
- norm2 = 0.
+ norm2 = 0.0
for dataset in data:
norm2 += dataset[i] * dataset[i]
res[i] = math.sqrt(norm2)
return res
+
+
AHandlers["norm2"] = GenNorm2
+
class GenSmoothed:
ParametersMin = 1
ParametersMax = 2
DataSets = [
- ('smooth(<dataset>)', 'Generate moving weighted average of a dataset'),
- ('smooth(<dataset>,<smooth_time>)',
- 'Generate moving weighted average of a dataset with a given'
- ' smoothing time that defines the window size'),
+ ("smooth(<dataset>)", "Generate moving weighted average of a dataset"),
+ (
+ "smooth(<dataset>,<smooth_time>)",
+ "Generate moving weighted average of a dataset with a given"
+ " smoothing time that defines the window size",
+ ),
]
+
def __init__(self, amanager, name_parts):
self.amanager = amanager
self.source = name_parts[1]
@@ -174,125 +194,152 @@ class GenSmoothed:
self.smooth_time = 0.01
if len(name_parts) > 2:
self.smooth_time = float(name_parts[2])
+
def get_label(self):
label = self.amanager.get_label(self.source)
- return {'label': 'Smoothed ' + label['label'], 'units': label['units']}
+ return {"label": "Smoothed " + label["label"], "units": label["units"]}
+
def generate_data(self):
seg_time = self.amanager.get_segment_time()
src = self.amanager.get_datasets()[self.source]
n = len(src)
- data = [0.] * n
+ data = [0.0] * n
hst = 0.5 * self.smooth_time
seg_half_len = round(hst / seg_time)
- inv_norm = 1. / sum([min(k + 1, seg_half_len + seg_half_len - k)
- for k in range(2 * seg_half_len)])
+ inv_norm = 1.0 / sum(
+ [
+ min(k + 1, seg_half_len + seg_half_len - k)
+ for k in range(2 * seg_half_len)
+ ]
+ )
for i in range(n):
j = max(0, i - seg_half_len)
je = min(n, i + seg_half_len)
- avg_val = 0.
+ avg_val = 0.0
for k, v in enumerate(src[j:je]):
avg_val += v * min(k + 1, seg_half_len + seg_half_len - k)
data[i] = avg_val * inv_norm
return data
+
+
AHandlers["smooth"] = GenSmoothed
+
# Calculate a kinematic stepper position from the toolhead requested position
class GenKinematicPosition:
ParametersMin = ParametersMax = 1
DataSets = [
- ('kin(<stepper>)', 'Stepper position derived from toolhead kinematics'),
+ ("kin(<stepper>)", "Stepper position derived from toolhead kinematics"),
]
+
def __init__(self, amanager, name_parts):
self.amanager = amanager
stepper = name_parts[1]
status = self.amanager.get_initial_status()
- kin = status['configfile']['settings']['printer']['kinematics']
- if kin not in ['cartesian', 'corexy']:
+ kin = status["configfile"]["settings"]["printer"]["kinematics"]
+ if kin not in ["cartesian", "corexy"]:
raise amanager.error("Unsupported kinematics '%s'" % (kin,))
- if stepper not in ['stepper_x', 'stepper_y', 'stepper_z']:
+ if stepper not in ["stepper_x", "stepper_y", "stepper_z"]:
raise amanager.error("Unknown stepper '%s'" % (stepper,))
- if kin == 'corexy' and stepper in ['stepper_x', 'stepper_y']:
- self.source1 = 'trapq(toolhead,x)'
- self.source2 = 'trapq(toolhead,y)'
- if stepper == 'stepper_x':
+ if kin == "corexy" and stepper in ["stepper_x", "stepper_y"]:
+ self.source1 = "trapq(toolhead,x)"
+ self.source2 = "trapq(toolhead,y)"
+ if stepper == "stepper_x":
self.generate_data = self.generate_data_corexy_plus
else:
self.generate_data = self.generate_data_corexy_minus
amanager.setup_dataset(self.source1)
amanager.setup_dataset(self.source2)
else:
- self.source1 = 'trapq(toolhead,%s)' % (stepper[-1:],)
+ self.source1 = "trapq(toolhead,%s)" % (stepper[-1:],)
self.source2 = None
self.generate_data = self.generate_data_passthrough
amanager.setup_dataset(self.source1)
+
def get_label(self):
- return {'label': 'Position', 'units': 'Position\n(mm)'}
+ return {"label": "Position", "units": "Position\n(mm)"}
+
def generate_data_corexy_plus(self):
datasets = self.amanager.get_datasets()
data1 = datasets[self.source1]
data2 = datasets[self.source2]
return [d1 + d2 for d1, d2 in zip(data1, data2)]
+
def generate_data_corexy_minus(self):
datasets = self.amanager.get_datasets()
data1 = datasets[self.source1]
data2 = datasets[self.source2]
return [d1 - d2 for d1, d2 in zip(data1, data2)]
+
def generate_data_passthrough(self):
return self.amanager.get_datasets()[self.source1]
+
+
AHandlers["kin"] = GenKinematicPosition
+
# Calculate a toolhead x/y position from corexy stepper positions
class GenCorexyPosition:
ParametersMin = ParametersMax = 3
DataSets = [
- ('corexy(x,<stepper>,<stepper>)', 'Toolhead x position from steppers'),
- ('corexy(y,<stepper>,<stepper>)', 'Toolhead y position from steppers'),
+ ("corexy(x,<stepper>,<stepper>)", "Toolhead x position from steppers"),
+ ("corexy(y,<stepper>,<stepper>)", "Toolhead y position from steppers"),
]
+
def __init__(self, amanager, name_parts):
self.amanager = amanager
- self.is_plus = name_parts[1] == 'x'
+ self.is_plus = name_parts[1] == "x"
self.source1, self.source2 = name_parts[2:]
amanager.setup_dataset(self.source1)
amanager.setup_dataset(self.source2)
+
def get_label(self):
- axis = 'x'
+ axis = "x"
if not self.is_plus:
- axis = 'y'
- return {'label': 'Derived %s position' % (axis,),
- 'units': 'Position\n(mm)'}
+ axis = "y"
+ return {"label": "Derived %s position" % (axis,), "units": "Position\n(mm)"}
+
def generate_data(self):
datasets = self.amanager.get_datasets()
data1 = datasets[self.source1]
data2 = datasets[self.source2]
if self.is_plus:
- return [.5 * (d1 + d2) for d1, d2 in zip(data1, data2)]
- return [.5 * (d1 - d2) for d1, d2 in zip(data1, data2)]
+ return [0.5 * (d1 + d2) for d1, d2 in zip(data1, data2)]
+ return [0.5 * (d1 - d2) for d1, d2 in zip(data1, data2)]
+
+
AHandlers["corexy"] = GenCorexyPosition
+
# Calculate a position deviation
class GenDeviation:
ParametersMin = ParametersMax = 2
DataSets = [
- ('deviation(<dataset1>,<dataset2>)', 'Difference between datasets'),
+ ("deviation(<dataset1>,<dataset2>)", "Difference between datasets"),
]
+
def __init__(self, amanager, name_parts):
self.amanager = amanager
self.source1, self.source2 = name_parts[1:]
amanager.setup_dataset(self.source1)
amanager.setup_dataset(self.source2)
+
def get_label(self):
label1 = self.amanager.get_label(self.source1)
label2 = self.amanager.get_label(self.source2)
- if label1['units'] != label2['units']:
- return {'label': 'Deviation', 'units': 'Unknown'}
- parts = label1['units'].split('\n')
- units = '\n'.join([parts[0]] + ['Deviation'] + parts[1:])
- return {'label': label1['label'] + ' deviation', 'units': units}
+ if label1["units"] != label2["units"]:
+ return {"label": "Deviation", "units": "Unknown"}
+ parts = label1["units"].split("\n")
+ units = "\n".join([parts[0]] + ["Deviation"] + parts[1:])
+ return {"label": label1["label"] + " deviation", "units": units}
+
def generate_data(self):
datasets = self.amanager.get_datasets()
data1 = datasets[self.source1]
data2 = datasets[self.source2]
return [d1 - d2 for d1, d2 in zip(data1, data2)]
+
+
AHandlers["deviation"] = GenDeviation
@@ -300,6 +347,7 @@ AHandlers["deviation"] = GenDeviation
# Analyzer management and data generation
######################################################################
+
# Return a description of available analyzers
def list_datasets():
datasets = []
@@ -307,9 +355,11 @@ def list_datasets():
datasets += AHandlers[ah].DataSets
return datasets
+
# Manage raw and generated data samples
class AnalyzerManager:
error = None
+
def __init__(self, lmanager, segment_time):
self.lmanager = lmanager
self.error = lmanager.error
@@ -318,17 +368,23 @@ class AnalyzerManager:
self.gen_datasets = collections.OrderedDict()
self.datasets = {}
self.dataset_times = []
- self.duration = 5.
+ self.duration = 5.0
+
def set_duration(self, duration):
self.duration = duration
+
def get_segment_time(self):
return self.segment_time
+
def get_datasets(self):
return self.datasets
+
def get_dataset_times(self):
return self.dataset_times
+
def get_initial_status(self):
return self.lmanager.get_initial_status()
+
def setup_dataset(self, name):
name = name.strip()
if name in self.raw_datasets:
@@ -350,6 +406,7 @@ class AnalyzerManager:
self.gen_datasets[name] = hdl
self.datasets[name] = []
return hdl
+
def get_label(self, dataset):
hdl = self.raw_datasets.get(dataset)
if hdl is None:
@@ -357,10 +414,12 @@ class AnalyzerManager:
if hdl is None:
raise self.error("Unknown dataset '%s'" % (dataset,))
return hdl.get_label()
+
def generate_datasets(self):
# Generate raw data
- list_hdls = [(self.datasets[name], hdl)
- for name, hdl in self.raw_datasets.items()]
+ list_hdls = [
+ (self.datasets[name], hdl) for name, hdl in self.raw_datasets.items()
+ ]
initial_start_time = self.lmanager.get_initial_start_time()
start_time = t = self.lmanager.get_start_time()
end_time = start_time + self.duration