aboutsummaryrefslogtreecommitdiffstats
path: root/scripts/motan/data_logger.py
diff options
context:
space:
mode:
authorKevin O'Connor <kevin@koconnor.net>2021-08-23 11:01:56 -0400
committerKevin O'Connor <kevin@koconnor.net>2021-08-24 10:19:12 -0400
commit7aa2c11b3aca649962840dde4155cbc7b9789473 (patch)
tree8734778bc72cc4c6e1f5453b79dc90363e4ff33f /scripts/motan/data_logger.py
parent8e1929649f24e196aaef3fa1f25c3f541a6486ae (diff)
downloadkutter-7aa2c11b3aca649962840dde4155cbc7b9789473.tar.gz
kutter-7aa2c11b3aca649962840dde4155cbc7b9789473.tar.xz
kutter-7aa2c11b3aca649962840dde4155cbc7b9789473.zip
data_logger: Flush the initial index file when all queries complete
This should avoid zero byte index files for small captures. Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
Diffstat (limited to 'scripts/motan/data_logger.py')
-rwxr-xr-xscripts/motan/data_logger.py77
1 files changed, 37 insertions, 40 deletions
diff --git a/scripts/motan/data_logger.py b/scripts/motan/data_logger.py
index 05dcc198..3f23b2e8 100755
--- a/scripts/motan/data_logger.py
+++ b/scripts/motan/data_logger.py
@@ -63,19 +63,14 @@ class DataLogger:
self.logger = LogWriter(log_prefix + ".json.gz")
self.index = LogWriter(log_prefix + ".index.gz")
# Handlers
- self.query_handlers = {
- "info": self.handle_info, "list": self.handle_list,
- "status": self.handle_subscribe,
- }
- self.async_handlers = {
- "status": self.handle_async_db,
- }
+ self.query_handlers = {}
+ self.async_handlers = {}
# get_status databasing
- self.db = {"status": {}}
+ self.db = {}
self.next_index_time = 0.
# Start login process
- self.send_msg({"id": "info", "method": "info",
- "params": { "client_info": ClientInfo }})
+ self.send_query("info", "info", {"client_info": ClientInfo},
+ self.handle_info)
def error(self, msg):
sys.stderr.write(msg + "\n")
def finish(self, msg):
@@ -84,7 +79,9 @@ class DataLogger:
self.index.close()
sys.exit(0)
# Unix Domain Socket IO
- def send_msg(self, msg):
+ def send_query(self, msg_id, method, params, cb):
+ self.query_handlers[msg_id] = cb
+ msg = {"id": msg_id, "method": method, "params": params}
cm = json.dumps(msg, separators=(',', ':')).encode()
self.webhook_socket.send(cm + b"\x03")
def process_socket(self):
@@ -110,7 +107,10 @@ class DataLogger:
msg_id = msg.get("id")
hdl = self.query_handlers.get(msg_id)
if hdl is not None:
+ del self.query_handlers[msg_id]
hdl(msg, part)
+ if not self.query_handlers:
+ self.flush_index()
continue
self.error("ERROR: Message with unknown id")
def run(self):
@@ -123,52 +123,49 @@ class DataLogger:
except KeyboardInterrupt as e:
self.finish("Keyboard Interrupt")
# Query response handlers
+ def send_subscribe(self, msg_id, method, params, cb=None, async_cb=None):
+ if cb is None:
+ cb = self.handle_dump
+ if async_cb is not None:
+ self.async_handlers[msg_id] = async_cb
+ params["response_template"] = {"q": msg_id}
+ self.send_query(msg_id, method, params, cb)
def handle_info(self, msg, raw_msg):
if msg["result"]["state"] != "ready":
self.finish("Klipper not in ready state")
- self.send_msg({"id": "list", "method": "objects/list"})
+ self.send_query("list", "objects/list", {}, self.handle_list)
def handle_list(self, msg, raw_msg):
subreq = {o: None for o in msg["result"]["objects"]}
- self.send_msg({"id": "status", "method": "objects/subscribe",
- "params": { "objects": subreq,
- "response_template": {"q": "status"}}})
+ self.send_subscribe("status", "objects/subscribe", {"objects": subreq},
+ self.handle_subscribe, self.handle_async_db)
def handle_subscribe(self, msg, raw_msg):
result = msg["result"]
self.next_index_time = result["eventtime"] + INDEX_UPDATE_TIME
- status = result["status"]
- self.db["status"].update(status)
- motion_report = status.get("motion_report")
- if motion_report is not None:
- for trapq in motion_report.get("trapq", []):
- qname = "trapq:" + trapq
- self.query_handlers[qname] = self.handle_dump
- self.send_msg({"id": qname,
- "method": "motion_report/dump_trapq",
- "params": { "name": trapq,
- "response_template": {"q": qname}}})
- for stepper in motion_report.get("steppers", []):
- qname = "stepq:" + stepper
- self.query_handlers[qname] = self.handle_dump
- self.send_msg({"id": qname,
- "method": "motion_report/dump_stepper",
- "params": { "name": stepper,
- "response_template": {"q": qname}}})
+ self.db["status"] = status = result["status"]
+ # Subscribe to trapq and stepper queue updates
+ motion_report = status.get("motion_report", {})
+ for trapq in motion_report.get("trapq", []):
+ self.send_subscribe("trapq:" + trapq, "motion_report/dump_trapq",
+ {"name": trapq})
+ for stepper in motion_report.get("steppers", []):
+ self.send_subscribe("stepq:" + stepper,
+ "motion_report/dump_stepper", {"name": stepper})
def handle_dump(self, msg, raw_msg):
msg_id = msg["id"]
self.db.setdefault("subscriptions", {})[msg_id] = msg["result"]
+ def flush_index(self):
+ self.db['file_position'] = self.logger.flush()
+ self.index.add_data(json.dumps(self.db, separators=(',', ':')).encode())
+ self.db = {"status": {}}
def handle_async_db(self, msg, raw_msg):
params = msg["params"]
db_status = self.db['status']
for k, v in params.get("status", {}).items():
db_status.setdefault(k, {}).update(v)
eventtime = params['eventtime']
- if eventtime < self.next_index_time:
- return
- # Update index file
- self.next_index_time = eventtime + INDEX_UPDATE_TIME
- self.db['file_position'] = self.logger.flush()
- self.index.add_data(json.dumps(self.db, separators=(',', ':')).encode())
- self.db = {"status": {}}
+ if eventtime >= self.next_index_time:
+ self.next_index_time = eventtime + INDEX_UPDATE_TIME
+ self.flush_index()
def nice():
try: