# Code for reading data logs produced by data_logger.py # # Copyright (C) 2021 Kevin O'Connor # # This file may be distributed under the terms of the GNU GPLv3 license. import json, zlib class error(Exception): pass ###################################################################### # Log data handlers ###################################################################### # Log data handlers: {name: class, ...} LogHandlers = {} # Extract status fields from log class HandleStatusField: SubscriptionIdParts = 0 ParametersMin = ParametersMax = 1 DataSets = [ ("status()", "A get_status field name (separate by periods)"), ] def __init__(self, lmanager, name, name_parts): self.status_tracker = lmanager.get_status_tracker() self.field_name = name_parts[1] self.field_parts = name_parts[1].split(".") self.next_update_time = 0.0 self.result = None def get_label(self): label = "%s field" % (self.field_name,) return {"label": label, "units": "Unknown"} def pull_data(self, req_time): if req_time < self.next_update_time: return self.result db, next_update_time = self.status_tracker.pull_status(req_time) for fp in self.field_parts[:-1]: db = db.get(fp, {}) self.result = db.get(self.field_parts[-1], 0.0) self.next_update_time = next_update_time return self.result LogHandlers["status"] = HandleStatusField # Extract requested position, velocity, and accel from a trapq log class HandleTrapQ: SubscriptionIdParts = 2 ParametersMin = ParametersMax = 2 DataSets = [ ("trapq(,velocity)", "Requested velocity for the given trapq"), ("trapq(,accel)", "Requested acceleration for the given trapq"), ("trapq(,)", "Requested axis (x, y, or z) position"), ("trapq(,_velocity)", "Requested axis velocity"), ("trapq(,_accel)", "Requested axis acceleration"), ] def __init__(self, lmanager, name, name_parts): self.name = name self.jdispatch = lmanager.get_jdispatch() self.cur_data = [(0.0, 0.0, 0.0, 0.0, (0.0, 0.0, 0.0), (0.0, 0.0, 0.0))] self.data_pos = 0 tq, trapq_name, datasel = name_parts ptypes = {} ptypes["velocity"] = { "label": "%s velocity" % (trapq_name,), "units": "Velocity\n(mm/s)", "func": self._pull_velocity, } ptypes["accel"] = { "label": "%s acceleration" % (trapq_name,), "units": "Acceleration\n(mm/s^2)", "func": self._pull_accel, } for axis, name in enumerate("xyz"): ptypes["%s" % (name,)] = { "label": "%s %s position" % (trapq_name, name), "axis": axis, "units": "Position\n(mm)", "func": self._pull_axis_position, } ptypes["%s_velocity" % (name,)] = { "label": "%s %s velocity" % (trapq_name, name), "axis": axis, "units": "Velocity\n(mm/s)", "func": self._pull_axis_velocity, } ptypes["%s_accel" % (name,)] = { "label": "%s %s acceleration" % (trapq_name, name), "axis": axis, "units": "Acceleration\n(mm/s^2)", "func": self._pull_axis_accel, } pinfo = ptypes.get(datasel) if pinfo is None: raise error("Unknown trapq data selection '%s'" % (datasel,)) self.label = {"label": pinfo["label"], "units": pinfo["units"]} self.axis = pinfo.get("axis") self.pull_data = pinfo["func"] def get_label(self): return self.label def _find_move(self, req_time): data_pos = self.data_pos while 1: move = self.cur_data[data_pos] print_time, move_t, start_v, accel, start_pos, axes_r = move if req_time <= print_time + move_t: return move, req_time >= print_time data_pos += 1 if data_pos < len(self.cur_data): self.data_pos = data_pos continue jmsg = self.jdispatch.pull_msg(req_time, self.name) if jmsg is None: return move, False self.cur_data = jmsg["data"] self.data_pos = data_pos = 0 def _pull_axis_position(self, req_time): move, in_range = self._find_move(req_time) print_time, move_t, start_v, accel, start_pos, axes_r = move mtime = max(0.0, min(move_t, req_time - print_time)) dist = (start_v + 0.5 * accel * mtime) * mtime return start_pos[self.axis] + axes_r[self.axis] * dist def _pull_axis_velocity(self, req_time): move, in_range = self._find_move(req_time) if not in_range: return 0.0 print_time, move_t, start_v, accel, start_pos, axes_r = move return (start_v + accel * (req_time - print_time)) * axes_r[self.axis] def _pull_axis_accel(self, req_time): move, in_range = self._find_move(req_time) if not in_range: return 0.0 print_time, move_t, start_v, accel, start_pos, axes_r = move return accel * axes_r[self.axis] def _pull_velocity(self, req_time): move, in_range = self._find_move(req_time) if not in_range: return 0.0 print_time, move_t, start_v, accel, start_pos, axes_r = move return start_v + accel * (req_time - print_time) def _pull_accel(self, req_time): move, in_range = self._find_move(req_time) if not in_range: return 0.0 print_time, move_t, start_v, accel, start_pos, axes_r = move return accel LogHandlers["trapq"] = HandleTrapQ # Extract positions from queue_step log class HandleStepQ: SubscriptionIdParts = 2 ParametersMin = 1 ParametersMax = 2 DataSets = [ ("stepq()", "Commanded position of the given stepper"), ("stepq(,