diff options
Diffstat (limited to 'scripts/motan/analyzers.py')
-rw-r--r-- | scripts/motan/analyzers.py | 225 |
1 files changed, 142 insertions, 83 deletions
diff --git a/scripts/motan/analyzers.py b/scripts/motan/analyzers.py index 2796362f..917cb032 100644 --- a/scripts/motan/analyzers.py +++ b/scripts/motan/analyzers.py @@ -14,49 +14,57 @@ import readlog # Analyzer handlers: {name: class, ...} AHandlers = {} + # Calculate a derivative (position to velocity, or velocity to accel) class GenDerivative: ParametersMin = ParametersMax = 1 DataSets = [ - ('derivative(<dataset>)', 'Derivative of the given dataset'), + ("derivative(<dataset>)", "Derivative of the given dataset"), ] + def __init__(self, amanager, name_parts): self.amanager = amanager self.source = name_parts[1] amanager.setup_dataset(self.source) + def get_label(self): label = self.amanager.get_label(self.source) - lname = label['label'] - units = label['units'] - if '(mm)' in units: - rep = [('Position', 'Velocity'), ('(mm)', '(mm/s)')] - elif '(mm/s)' in units: - rep = [('Velocity', 'Acceleration'), ('(mm/s)', '(mm/s^2)')] + lname = label["label"] + units = label["units"] + if "(mm)" in units: + rep = [("Position", "Velocity"), ("(mm)", "(mm/s)")] + elif "(mm/s)" in units: + rep = [("Velocity", "Acceleration"), ("(mm/s)", "(mm/s^2)")] else: - return {'label': 'Derivative', 'units': 'Unknown'} + return {"label": "Derivative", "units": "Unknown"} for old, new in rep: lname = lname.replace(old, new).replace(old.lower(), new.lower()) units = units.replace(old, new).replace(old.lower(), new.lower()) - return {'label': lname, 'units': units} + return {"label": lname, "units": units} + def generate_data(self): - inv_seg_time = 1. / self.amanager.get_segment_time() + inv_seg_time = 1.0 / self.amanager.get_segment_time() data = self.amanager.get_datasets()[self.source] - deriv = [(data[i+1] - data[i]) * inv_seg_time - for i in range(len(data)-1)] + deriv = [(data[i + 1] - data[i]) * inv_seg_time for i in range(len(data) - 1)] return [deriv[0]] + deriv + + AHandlers["derivative"] = GenDerivative + # Calculate an integral (accel to velocity, or velocity to position) class GenIntegral: ParametersMin = 1 ParametersMax = 3 DataSets = [ - ('integral(<dataset>)', 'Integral of the given dataset'), - ('integral(<dataset1>,<dataset2>)', - 'Integral with dataset2 as reference'), - ('integral(<dataset1>,<dataset2>,<half_life>)', - 'Integral with weighted half-life time'), + ("integral(<dataset>)", "Integral of the given dataset"), + ("integral(<dataset1>,<dataset2>)", "Integral with dataset2 as reference"), + ( + "integral(<dataset1>,<dataset2>,<half_life>)", + "Integral with weighted half-life time", + ), ] + def __init__(self, amanager, name_parts): self.amanager = amanager self.source = name_parts[1] @@ -68,54 +76,58 @@ class GenIntegral: amanager.setup_dataset(self.ref) if len(name_parts) == 4: self.half_life = float(name_parts[3]) + def get_label(self): label = self.amanager.get_label(self.source) - lname = label['label'] - units = label['units'] - if '(mm/s)' in units: - rep = [('Velocity', 'Position'), ('(mm/s)', '(mm)')] - elif '(mm/s^2)' in units: - rep = [('Acceleration', 'Velocity'), ('(mm/s^2)', '(mm/s)')] + lname = label["label"] + units = label["units"] + if "(mm/s)" in units: + rep = [("Velocity", "Position"), ("(mm/s)", "(mm)")] + elif "(mm/s^2)" in units: + rep = [("Acceleration", "Velocity"), ("(mm/s^2)", "(mm/s)")] else: - return {'label': 'Integral', 'units': 'Unknown'} + return {"label": "Integral", "units": "Unknown"} for old, new in rep: lname = lname.replace(old, new).replace(old.lower(), new.lower()) units = units.replace(old, new).replace(old.lower(), new.lower()) - return {'label': lname, 'units': units} + return {"label": lname, "units": units} + def generate_data(self): seg_time = self.amanager.get_segment_time() src = self.amanager.get_datasets()[self.source] offset = sum(src) / len(src) - total = 0. + total = 0.0 ref = None if self.ref is not None: ref = self.amanager.get_datasets()[self.ref] offset -= (ref[-1] - ref[0]) / (len(src) * seg_time) total = ref[0] - src_weight = 1. + src_weight = 1.0 if self.half_life: - src_weight = math.exp(math.log(.5) * seg_time / self.half_life) - ref_weight = 1. - src_weight - data = [0.] * len(src) + src_weight = math.exp(math.log(0.5) * seg_time / self.half_life) + ref_weight = 1.0 - src_weight + data = [0.0] * len(src) for i, v in enumerate(src): total += (v - offset) * seg_time if ref is not None: total = src_weight * total + ref_weight * ref[i] data[i] = total return data + + AHandlers["integral"] = GenIntegral + # Calculate a pointwise 2-norm of several datasets (e.g. compute velocity or # accel from its x, y,... components) class GenNorm2: ParametersMin = 2 ParametersMax = 3 DataSets = [ - ('norm2(<dataset1>,<dataset2>)', - 'pointwise 2-norm of dataset1 and dataset2'), - ('norm2(<dataset1>,<dataset2>,<dataset3>)', - 'pointwise 2-norm of 3 datasets'), + ("norm2(<dataset1>,<dataset2>)", "pointwise 2-norm of dataset1 and dataset2"), + ("norm2(<dataset1>,<dataset2>,<dataset3>)", "pointwise 2-norm of 3 datasets"), ] + def __init__(self, amanager, name_parts): self.amanager = amanager self.datasets = [] @@ -125,48 +137,56 @@ class GenNorm2: self.datasets.append(name_parts[3]) for dataset in self.datasets: amanager.setup_dataset(dataset) + def get_label(self): label = self.amanager.get_label(self.datasets[0]) - units = label['units'] - datas = ['position', 'velocity', 'acceleration'] - data_name = '' + units = label["units"] + datas = ["position", "velocity", "acceleration"] + data_name = "" for d in datas: - if d in label['label']: + if d in label["label"]: data_name = d break - lname = '' + lname = "" for d in self.datasets: - l = self.amanager.get_label(d)['label'] + l = self.amanager.get_label(d)["label"] for r in datas: - l = l.replace(r, '').strip() + l = l.replace(r, "").strip() if lname: - lname += '+' + lname += "+" lname += l - lname += ' ' + data_name + ' norm2' - return {'label': lname, 'units': units} + lname += " " + data_name + " norm2" + return {"label": lname, "units": units} + def generate_data(self): seg_time = self.amanager.get_segment_time() data = [] for dataset in self.datasets: data.append(self.amanager.get_datasets()[dataset]) - res = [0.] * len(data[0]) + res = [0.0] * len(data[0]) for i in range(len(data[0])): - norm2 = 0. + norm2 = 0.0 for dataset in data: norm2 += dataset[i] * dataset[i] res[i] = math.sqrt(norm2) return res + + AHandlers["norm2"] = GenNorm2 + class GenSmoothed: ParametersMin = 1 ParametersMax = 2 DataSets = [ - ('smooth(<dataset>)', 'Generate moving weighted average of a dataset'), - ('smooth(<dataset>,<smooth_time>)', - 'Generate moving weighted average of a dataset with a given' - ' smoothing time that defines the window size'), + ("smooth(<dataset>)", "Generate moving weighted average of a dataset"), + ( + "smooth(<dataset>,<smooth_time>)", + "Generate moving weighted average of a dataset with a given" + " smoothing time that defines the window size", + ), ] + def __init__(self, amanager, name_parts): self.amanager = amanager self.source = name_parts[1] @@ -174,125 +194,152 @@ class GenSmoothed: self.smooth_time = 0.01 if len(name_parts) > 2: self.smooth_time = float(name_parts[2]) + def get_label(self): label = self.amanager.get_label(self.source) - return {'label': 'Smoothed ' + label['label'], 'units': label['units']} + return {"label": "Smoothed " + label["label"], "units": label["units"]} + def generate_data(self): seg_time = self.amanager.get_segment_time() src = self.amanager.get_datasets()[self.source] n = len(src) - data = [0.] * n + data = [0.0] * n hst = 0.5 * self.smooth_time seg_half_len = round(hst / seg_time) - inv_norm = 1. / sum([min(k + 1, seg_half_len + seg_half_len - k) - for k in range(2 * seg_half_len)]) + inv_norm = 1.0 / sum( + [ + min(k + 1, seg_half_len + seg_half_len - k) + for k in range(2 * seg_half_len) + ] + ) for i in range(n): j = max(0, i - seg_half_len) je = min(n, i + seg_half_len) - avg_val = 0. + avg_val = 0.0 for k, v in enumerate(src[j:je]): avg_val += v * min(k + 1, seg_half_len + seg_half_len - k) data[i] = avg_val * inv_norm return data + + AHandlers["smooth"] = GenSmoothed + # Calculate a kinematic stepper position from the toolhead requested position class GenKinematicPosition: ParametersMin = ParametersMax = 1 DataSets = [ - ('kin(<stepper>)', 'Stepper position derived from toolhead kinematics'), + ("kin(<stepper>)", "Stepper position derived from toolhead kinematics"), ] + def __init__(self, amanager, name_parts): self.amanager = amanager stepper = name_parts[1] status = self.amanager.get_initial_status() - kin = status['configfile']['settings']['printer']['kinematics'] - if kin not in ['cartesian', 'corexy']: + kin = status["configfile"]["settings"]["printer"]["kinematics"] + if kin not in ["cartesian", "corexy"]: raise amanager.error("Unsupported kinematics '%s'" % (kin,)) - if stepper not in ['stepper_x', 'stepper_y', 'stepper_z']: + if stepper not in ["stepper_x", "stepper_y", "stepper_z"]: raise amanager.error("Unknown stepper '%s'" % (stepper,)) - if kin == 'corexy' and stepper in ['stepper_x', 'stepper_y']: - self.source1 = 'trapq(toolhead,x)' - self.source2 = 'trapq(toolhead,y)' - if stepper == 'stepper_x': + if kin == "corexy" and stepper in ["stepper_x", "stepper_y"]: + self.source1 = "trapq(toolhead,x)" + self.source2 = "trapq(toolhead,y)" + if stepper == "stepper_x": self.generate_data = self.generate_data_corexy_plus else: self.generate_data = self.generate_data_corexy_minus amanager.setup_dataset(self.source1) amanager.setup_dataset(self.source2) else: - self.source1 = 'trapq(toolhead,%s)' % (stepper[-1:],) + self.source1 = "trapq(toolhead,%s)" % (stepper[-1:],) self.source2 = None self.generate_data = self.generate_data_passthrough amanager.setup_dataset(self.source1) + def get_label(self): - return {'label': 'Position', 'units': 'Position\n(mm)'} + return {"label": "Position", "units": "Position\n(mm)"} + def generate_data_corexy_plus(self): datasets = self.amanager.get_datasets() data1 = datasets[self.source1] data2 = datasets[self.source2] return [d1 + d2 for d1, d2 in zip(data1, data2)] + def generate_data_corexy_minus(self): datasets = self.amanager.get_datasets() data1 = datasets[self.source1] data2 = datasets[self.source2] return [d1 - d2 for d1, d2 in zip(data1, data2)] + def generate_data_passthrough(self): return self.amanager.get_datasets()[self.source1] + + AHandlers["kin"] = GenKinematicPosition + # Calculate a toolhead x/y position from corexy stepper positions class GenCorexyPosition: ParametersMin = ParametersMax = 3 DataSets = [ - ('corexy(x,<stepper>,<stepper>)', 'Toolhead x position from steppers'), - ('corexy(y,<stepper>,<stepper>)', 'Toolhead y position from steppers'), + ("corexy(x,<stepper>,<stepper>)", "Toolhead x position from steppers"), + ("corexy(y,<stepper>,<stepper>)", "Toolhead y position from steppers"), ] + def __init__(self, amanager, name_parts): self.amanager = amanager - self.is_plus = name_parts[1] == 'x' + self.is_plus = name_parts[1] == "x" self.source1, self.source2 = name_parts[2:] amanager.setup_dataset(self.source1) amanager.setup_dataset(self.source2) + def get_label(self): - axis = 'x' + axis = "x" if not self.is_plus: - axis = 'y' - return {'label': 'Derived %s position' % (axis,), - 'units': 'Position\n(mm)'} + axis = "y" + return {"label": "Derived %s position" % (axis,), "units": "Position\n(mm)"} + def generate_data(self): datasets = self.amanager.get_datasets() data1 = datasets[self.source1] data2 = datasets[self.source2] if self.is_plus: - return [.5 * (d1 + d2) for d1, d2 in zip(data1, data2)] - return [.5 * (d1 - d2) for d1, d2 in zip(data1, data2)] + return [0.5 * (d1 + d2) for d1, d2 in zip(data1, data2)] + return [0.5 * (d1 - d2) for d1, d2 in zip(data1, data2)] + + AHandlers["corexy"] = GenCorexyPosition + # Calculate a position deviation class GenDeviation: ParametersMin = ParametersMax = 2 DataSets = [ - ('deviation(<dataset1>,<dataset2>)', 'Difference between datasets'), + ("deviation(<dataset1>,<dataset2>)", "Difference between datasets"), ] + def __init__(self, amanager, name_parts): self.amanager = amanager self.source1, self.source2 = name_parts[1:] amanager.setup_dataset(self.source1) amanager.setup_dataset(self.source2) + def get_label(self): label1 = self.amanager.get_label(self.source1) label2 = self.amanager.get_label(self.source2) - if label1['units'] != label2['units']: - return {'label': 'Deviation', 'units': 'Unknown'} - parts = label1['units'].split('\n') - units = '\n'.join([parts[0]] + ['Deviation'] + parts[1:]) - return {'label': label1['label'] + ' deviation', 'units': units} + if label1["units"] != label2["units"]: + return {"label": "Deviation", "units": "Unknown"} + parts = label1["units"].split("\n") + units = "\n".join([parts[0]] + ["Deviation"] + parts[1:]) + return {"label": label1["label"] + " deviation", "units": units} + def generate_data(self): datasets = self.amanager.get_datasets() data1 = datasets[self.source1] data2 = datasets[self.source2] return [d1 - d2 for d1, d2 in zip(data1, data2)] + + AHandlers["deviation"] = GenDeviation @@ -300,6 +347,7 @@ AHandlers["deviation"] = GenDeviation # Analyzer management and data generation ###################################################################### + # Return a description of available analyzers def list_datasets(): datasets = [] @@ -307,9 +355,11 @@ def list_datasets(): datasets += AHandlers[ah].DataSets return datasets + # Manage raw and generated data samples class AnalyzerManager: error = None + def __init__(self, lmanager, segment_time): self.lmanager = lmanager self.error = lmanager.error @@ -318,17 +368,23 @@ class AnalyzerManager: self.gen_datasets = collections.OrderedDict() self.datasets = {} self.dataset_times = [] - self.duration = 5. + self.duration = 5.0 + def set_duration(self, duration): self.duration = duration + def get_segment_time(self): return self.segment_time + def get_datasets(self): return self.datasets + def get_dataset_times(self): return self.dataset_times + def get_initial_status(self): return self.lmanager.get_initial_status() + def setup_dataset(self, name): name = name.strip() if name in self.raw_datasets: @@ -350,6 +406,7 @@ class AnalyzerManager: self.gen_datasets[name] = hdl self.datasets[name] = [] return hdl + def get_label(self, dataset): hdl = self.raw_datasets.get(dataset) if hdl is None: @@ -357,10 +414,12 @@ class AnalyzerManager: if hdl is None: raise self.error("Unknown dataset '%s'" % (dataset,)) return hdl.get_label() + def generate_datasets(self): # Generate raw data - list_hdls = [(self.datasets[name], hdl) - for name, hdl in self.raw_datasets.items()] + list_hdls = [ + (self.datasets[name], hdl) for name, hdl in self.raw_datasets.items() + ] initial_start_time = self.lmanager.get_initial_start_time() start_time = t = self.lmanager.get_start_time() end_time = start_time + self.duration |