diff options
Diffstat (limited to 'scripts/motan/data_logger.py')
-rwxr-xr-x | scripts/motan/data_logger.py | 88 |
1 files changed, 59 insertions, 29 deletions
diff --git a/scripts/motan/data_logger.py b/scripts/motan/data_logger.py index fd4de7a5..00023c2f 100755 --- a/scripts/motan/data_logger.py +++ b/scripts/motan/data_logger.py @@ -7,7 +7,8 @@ import sys, os, optparse, socket, select, json, errno, time, zlib INDEX_UPDATE_TIME = 5.0 -ClientInfo = {'program': 'motan_data_logger', 'version': 'v0.1'} +ClientInfo = {"program": "motan_data_logger", "version": "v0.1"} + def webhook_socket_create(uds_filename): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) @@ -20,25 +21,28 @@ def webhook_socket_create(uds_filename): if e.errno == errno.ECONNREFUSED: time.sleep(0.1) continue - sys.stderr.write("Unable to connect socket %s [%d,%s]\n" - % (uds_filename, e.errno, - errno.errorcode[e.errno])) + sys.stderr.write( + "Unable to connect socket %s [%d,%s]\n" + % (uds_filename, e.errno, errno.errorcode[e.errno]) + ) sys.exit(-1) break sys.stderr.write("Connection.\n") return sock + class LogWriter: def __init__(self, filename): self.file = open(filename, "wb") - self.comp = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, - zlib.DEFLATED, 31) + self.comp = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, 31) self.raw_pos = self.file_pos = 0 + def add_data(self, data): d = self.comp.compress(data + b"\x03") self.file.write(d) self.file_pos += len(d) self.raw_pos += len(data) + 1 + def flush(self, flag=zlib.Z_FULL_FLUSH): if not self.raw_pos: return self.file_pos @@ -46,12 +50,14 @@ class LogWriter: self.file.write(d) self.file_pos += len(d) return self.file_pos + def close(self): self.flush(zlib.Z_FINISH) self.file.close() self.file = None self.comp = None + class DataLogger: def __init__(self, uds_filename, log_prefix): # IO @@ -67,23 +73,26 @@ class DataLogger: self.async_handlers = {} # get_status databasing self.db = {} - self.next_index_time = 0. + self.next_index_time = 0.0 # Start login process - self.send_query("info", "info", {"client_info": ClientInfo}, - self.handle_info) + self.send_query("info", "info", {"client_info": ClientInfo}, self.handle_info) + def error(self, msg): sys.stderr.write(msg + "\n") + def finish(self, msg): self.error(msg) self.logger.close() self.index.close() sys.exit(0) + # Unix Domain Socket IO def send_query(self, msg_id, method, params, cb): self.query_handlers[msg_id] = cb msg = {"id": msg_id, "method": method, "params": params} - cm = json.dumps(msg, separators=(',', ':')).encode() + cm = json.dumps(msg, separators=(",", ":")).encode() self.webhook_socket.send(cm + b"\x03") + def process_socket(self): data = self.webhook_socket.recv(4096) if not data: @@ -113,15 +122,17 @@ class DataLogger: self.flush_index() continue self.error("ERROR: Message with unknown id") + def run(self): try: while 1: - res = self.poll.poll(1000.) + res = self.poll.poll(1000.0) for fd, event in res: if fd == self.webhook_socket.fileno(): self.process_socket() except KeyboardInterrupt as e: self.finish("Keyboard Interrupt") + # Query response handlers def send_subscribe(self, msg_id, method, params, cb=None, async_cb=None): if cb is None: @@ -130,14 +141,22 @@ class DataLogger: self.async_handlers[msg_id] = async_cb params["response_template"] = {"q": msg_id} self.send_query(msg_id, method, params, cb) + def handle_info(self, msg, raw_msg): if msg["result"]["state"] != "ready": self.finish("Klipper not in ready state") self.send_query("list", "objects/list", {}, self.handle_list) + def handle_list(self, msg, raw_msg): subreq = {o: None for o in msg["result"]["objects"]} - self.send_subscribe("status", "objects/subscribe", {"objects": subreq}, - self.handle_subscribe, self.handle_async_db) + self.send_subscribe( + "status", + "objects/subscribe", + {"objects": subreq}, + self.handle_subscribe, + self.handle_async_db, + ) + def handle_subscribe(self, msg, raw_msg): result = msg["result"] self.next_index_time = result["eventtime"] + INDEX_UPDATE_TIME @@ -145,15 +164,17 @@ class DataLogger: # Subscribe to trapq and stepper queue updates motion_report = status.get("motion_report", {}) for trapq in motion_report.get("trapq", []): - self.send_subscribe("trapq:" + trapq, "motion_report/dump_trapq", - {"name": trapq}) + self.send_subscribe( + "trapq:" + trapq, "motion_report/dump_trapq", {"name": trapq} + ) for stepper in motion_report.get("steppers", []): - self.send_subscribe("stepq:" + stepper, - "motion_report/dump_stepper", {"name": stepper}) + self.send_subscribe( + "stepq:" + stepper, "motion_report/dump_stepper", {"name": stepper} + ) # Subscribe to additional sensor data stypes = ["adxl345", "lis2dw", "mpu9250", "angle"] - stypes = {st:st for st in stypes} - stypes['probe_eddy_current'] = 'ldc1612' + stypes = {st: st for st in stypes} + stypes["probe_eddy_current"] = "ldc1612" config = status["configfile"]["settings"] for cfgname in config.keys(): for capprefix, st in sorted(stypes.items()): @@ -163,30 +184,37 @@ class DataLogger: qcmd = "%s/dump_%s" % (st, st) self.send_subscribe(lname, qcmd, {"sensor": aname}) if cfgname.startswith("tmc"): - driver = ' '.join(cfgname.split()[1:]) - self.send_subscribe("stallguard:" + driver, - "tmc/stallguard_dump", {"name": driver}) + driver = " ".join(cfgname.split()[1:]) + self.send_subscribe( + "stallguard:" + driver, "tmc/stallguard_dump", {"name": driver} + ) + def handle_dump(self, msg, raw_msg): msg_id = msg["id"] if "result" not in msg: - self.error("Unable to subscribe to '%s': %s" - % (msg_id, msg.get("error", {}).get("message", ""))) + self.error( + "Unable to subscribe to '%s': %s" + % (msg_id, msg.get("error", {}).get("message", "")) + ) return self.db.setdefault("subscriptions", {})[msg_id] = msg["result"] + def flush_index(self): - self.db['file_position'] = self.logger.flush() - self.index.add_data(json.dumps(self.db, separators=(',', ':')).encode()) + self.db["file_position"] = self.logger.flush() + self.index.add_data(json.dumps(self.db, separators=(",", ":")).encode()) self.db = {"status": {}} + def handle_async_db(self, msg, raw_msg): params = msg["params"] - db_status = self.db['status'] + db_status = self.db["status"] for k, v in params.get("status", {}).items(): db_status.setdefault(k, {}).update(v) - eventtime = params['eventtime'] + eventtime = params["eventtime"] if eventtime >= self.next_index_time: self.next_index_time = eventtime + INDEX_UPDATE_TIME self.flush_index() + def nice(): try: # Try to re-nice writing process @@ -194,6 +222,7 @@ def nice(): except: pass + def main(): usage = "%prog [options] <socket filename> <log name>" opts = optparse.OptionParser(usage) @@ -205,5 +234,6 @@ def main(): dl = DataLogger(args[0], args[1]) dl.run() -if __name__ == '__main__': + +if __name__ == "__main__": main() |