aboutsummaryrefslogtreecommitdiffstats
path: root/scripts/motan/data_logger.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/motan/data_logger.py')
-rwxr-xr-xscripts/motan/data_logger.py88
1 files changed, 59 insertions, 29 deletions
diff --git a/scripts/motan/data_logger.py b/scripts/motan/data_logger.py
index fd4de7a5..00023c2f 100755
--- a/scripts/motan/data_logger.py
+++ b/scripts/motan/data_logger.py
@@ -7,7 +7,8 @@
import sys, os, optparse, socket, select, json, errno, time, zlib
INDEX_UPDATE_TIME = 5.0
-ClientInfo = {'program': 'motan_data_logger', 'version': 'v0.1'}
+ClientInfo = {"program": "motan_data_logger", "version": "v0.1"}
+
def webhook_socket_create(uds_filename):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -20,25 +21,28 @@ def webhook_socket_create(uds_filename):
if e.errno == errno.ECONNREFUSED:
time.sleep(0.1)
continue
- sys.stderr.write("Unable to connect socket %s [%d,%s]\n"
- % (uds_filename, e.errno,
- errno.errorcode[e.errno]))
+ sys.stderr.write(
+ "Unable to connect socket %s [%d,%s]\n"
+ % (uds_filename, e.errno, errno.errorcode[e.errno])
+ )
sys.exit(-1)
break
sys.stderr.write("Connection.\n")
return sock
+
class LogWriter:
def __init__(self, filename):
self.file = open(filename, "wb")
- self.comp = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
- zlib.DEFLATED, 31)
+ self.comp = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, 31)
self.raw_pos = self.file_pos = 0
+
def add_data(self, data):
d = self.comp.compress(data + b"\x03")
self.file.write(d)
self.file_pos += len(d)
self.raw_pos += len(data) + 1
+
def flush(self, flag=zlib.Z_FULL_FLUSH):
if not self.raw_pos:
return self.file_pos
@@ -46,12 +50,14 @@ class LogWriter:
self.file.write(d)
self.file_pos += len(d)
return self.file_pos
+
def close(self):
self.flush(zlib.Z_FINISH)
self.file.close()
self.file = None
self.comp = None
+
class DataLogger:
def __init__(self, uds_filename, log_prefix):
# IO
@@ -67,23 +73,26 @@ class DataLogger:
self.async_handlers = {}
# get_status databasing
self.db = {}
- self.next_index_time = 0.
+ self.next_index_time = 0.0
# Start login process
- self.send_query("info", "info", {"client_info": ClientInfo},
- self.handle_info)
+ self.send_query("info", "info", {"client_info": ClientInfo}, self.handle_info)
+
def error(self, msg):
sys.stderr.write(msg + "\n")
+
def finish(self, msg):
self.error(msg)
self.logger.close()
self.index.close()
sys.exit(0)
+
# Unix Domain Socket IO
def send_query(self, msg_id, method, params, cb):
self.query_handlers[msg_id] = cb
msg = {"id": msg_id, "method": method, "params": params}
- cm = json.dumps(msg, separators=(',', ':')).encode()
+ cm = json.dumps(msg, separators=(",", ":")).encode()
self.webhook_socket.send(cm + b"\x03")
+
def process_socket(self):
data = self.webhook_socket.recv(4096)
if not data:
@@ -113,15 +122,17 @@ class DataLogger:
self.flush_index()
continue
self.error("ERROR: Message with unknown id")
+
def run(self):
try:
while 1:
- res = self.poll.poll(1000.)
+ res = self.poll.poll(1000.0)
for fd, event in res:
if fd == self.webhook_socket.fileno():
self.process_socket()
except KeyboardInterrupt as e:
self.finish("Keyboard Interrupt")
+
# Query response handlers
def send_subscribe(self, msg_id, method, params, cb=None, async_cb=None):
if cb is None:
@@ -130,14 +141,22 @@ class DataLogger:
self.async_handlers[msg_id] = async_cb
params["response_template"] = {"q": msg_id}
self.send_query(msg_id, method, params, cb)
+
def handle_info(self, msg, raw_msg):
if msg["result"]["state"] != "ready":
self.finish("Klipper not in ready state")
self.send_query("list", "objects/list", {}, self.handle_list)
+
def handle_list(self, msg, raw_msg):
subreq = {o: None for o in msg["result"]["objects"]}
- self.send_subscribe("status", "objects/subscribe", {"objects": subreq},
- self.handle_subscribe, self.handle_async_db)
+ self.send_subscribe(
+ "status",
+ "objects/subscribe",
+ {"objects": subreq},
+ self.handle_subscribe,
+ self.handle_async_db,
+ )
+
def handle_subscribe(self, msg, raw_msg):
result = msg["result"]
self.next_index_time = result["eventtime"] + INDEX_UPDATE_TIME
@@ -145,15 +164,17 @@ class DataLogger:
# Subscribe to trapq and stepper queue updates
motion_report = status.get("motion_report", {})
for trapq in motion_report.get("trapq", []):
- self.send_subscribe("trapq:" + trapq, "motion_report/dump_trapq",
- {"name": trapq})
+ self.send_subscribe(
+ "trapq:" + trapq, "motion_report/dump_trapq", {"name": trapq}
+ )
for stepper in motion_report.get("steppers", []):
- self.send_subscribe("stepq:" + stepper,
- "motion_report/dump_stepper", {"name": stepper})
+ self.send_subscribe(
+ "stepq:" + stepper, "motion_report/dump_stepper", {"name": stepper}
+ )
# Subscribe to additional sensor data
stypes = ["adxl345", "lis2dw", "mpu9250", "angle"]
- stypes = {st:st for st in stypes}
- stypes['probe_eddy_current'] = 'ldc1612'
+ stypes = {st: st for st in stypes}
+ stypes["probe_eddy_current"] = "ldc1612"
config = status["configfile"]["settings"]
for cfgname in config.keys():
for capprefix, st in sorted(stypes.items()):
@@ -163,30 +184,37 @@ class DataLogger:
qcmd = "%s/dump_%s" % (st, st)
self.send_subscribe(lname, qcmd, {"sensor": aname})
if cfgname.startswith("tmc"):
- driver = ' '.join(cfgname.split()[1:])
- self.send_subscribe("stallguard:" + driver,
- "tmc/stallguard_dump", {"name": driver})
+ driver = " ".join(cfgname.split()[1:])
+ self.send_subscribe(
+ "stallguard:" + driver, "tmc/stallguard_dump", {"name": driver}
+ )
+
def handle_dump(self, msg, raw_msg):
msg_id = msg["id"]
if "result" not in msg:
- self.error("Unable to subscribe to '%s': %s"
- % (msg_id, msg.get("error", {}).get("message", "")))
+ self.error(
+ "Unable to subscribe to '%s': %s"
+ % (msg_id, msg.get("error", {}).get("message", ""))
+ )
return
self.db.setdefault("subscriptions", {})[msg_id] = msg["result"]
+
def flush_index(self):
- self.db['file_position'] = self.logger.flush()
- self.index.add_data(json.dumps(self.db, separators=(',', ':')).encode())
+ self.db["file_position"] = self.logger.flush()
+ self.index.add_data(json.dumps(self.db, separators=(",", ":")).encode())
self.db = {"status": {}}
+
def handle_async_db(self, msg, raw_msg):
params = msg["params"]
- db_status = self.db['status']
+ db_status = self.db["status"]
for k, v in params.get("status", {}).items():
db_status.setdefault(k, {}).update(v)
- eventtime = params['eventtime']
+ eventtime = params["eventtime"]
if eventtime >= self.next_index_time:
self.next_index_time = eventtime + INDEX_UPDATE_TIME
self.flush_index()
+
def nice():
try:
# Try to re-nice writing process
@@ -194,6 +222,7 @@ def nice():
except:
pass
+
def main():
usage = "%prog [options] <socket filename> <log name>"
opts = optparse.OptionParser(usage)
@@ -205,5 +234,6 @@ def main():
dl = DataLogger(args[0], args[1])
dl.run()
-if __name__ == '__main__':
+
+if __name__ == "__main__":
main()