aboutsummaryrefslogtreecommitdiffstats
path: root/scripts
diff options
context:
space:
mode:
authorKevin O'Connor <kevin@koconnor.net>2023-04-14 14:55:35 -0400
committerKevin O'Connor <kevin@koconnor.net>2023-05-30 20:18:41 -0400
commitddf0994ba21b94497800576d623ea4b2ca27cc5a (patch)
treec6609a8b678e168426f7a89d113cb9065410173e /scripts
parent14ff50c94dee95c0a50841e4990528cbe43db247 (diff)
downloadkutter-ddf0994ba21b94497800576d623ea4b2ca27cc5a.tar.gz
kutter-ddf0994ba21b94497800576d623ea4b2ca27cc5a.tar.xz
kutter-ddf0994ba21b94497800576d623ea4b2ca27cc5a.zip
parsecandump: New utility to parse Klipper messages in a candump capture
Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
Diffstat (limited to 'scripts')
-rwxr-xr-xscripts/parsecandump.py125
1 files changed, 125 insertions, 0 deletions
diff --git a/scripts/parsecandump.py b/scripts/parsecandump.py
new file mode 100755
index 00000000..d575dc5a
--- /dev/null
+++ b/scripts/parsecandump.py
@@ -0,0 +1,125 @@
+#!/usr/bin/python3
+# Check for out of order timestamps in the output of candump
+#
+# Copyright (C) 2023 Kevin O'Connor <kevin@koconnor.net>
+#
+# This file may be distributed under the terms of the GNU GPLv3 license.
+import sys, os, optparse
+
+def import_msgproto():
+ global msgproto
+ # Load msgproto.py module
+ kdir = os.path.join(os.path.dirname(__file__), '..', 'klippy')
+ sys.path.append(kdir)
+ import msgproto
+
+def read_dictionary(filename):
+ dfile = open(filename, 'rb')
+ dictionary = dfile.read()
+ dfile.close()
+ return dictionary
+
+def report(msg, line_info, name="", is_err=False):
+ line_number, line_time = line_info
+ warn = ""
+ if is_err:
+ warn = " WARN"
+ sys.stdout.write("%04d:%010.6f:%s%s %s\n"
+ % (line_number, line_time, name, warn, msg))
+
+class canscan:
+ def __init__(self, name, mp):
+ self.name = name
+ self.mp = mp
+ self.data = bytearray()
+ self.need_scan = False
+ def handle_data(self, line_info, line, newdata):
+ data = self.data
+ data += bytearray(newdata)
+ while 1:
+ if self.need_scan:
+ drop = len(data)
+ syncpos = data.find(msgproto.MESSAGE_SYNC)
+ if syncpos >= 0:
+ drop = syncpos + 1
+ self.need_scan = False
+ disc = ["%02X" % (d,) for d in data[:drop]]
+ report("Discarding %d (%s)" % (drop, " ".join(disc)),
+ line_info, self.name, is_err=True)
+ data[:drop] = []
+ if not data:
+ break
+ l = self.mp.check_packet(data)
+ if l == 0:
+ break
+ if l < 0:
+ report("Invalid data: %s" % (line.strip(),),
+ line_info, self.name, is_err=True)
+ self.need_scan = True
+ continue
+ if l == 5:
+ report("Ack %02x" % (data[1],), line_info, self.name)
+ else:
+ msgs = self.mp.dump(data[:l])
+ report("%d: %s" % (l, ', '.join(msgs)), line_info, self.name)
+ data[:l] = []
+
+def read_candump(canfile, canid, dictionary):
+ mp = msgproto.MessageParser()
+ mp.process_identify(dictionary, decompress=False)
+ rxid = "%03X" % (canid | 1,)
+ txid = "%03X" % (canid & ~1,)
+ handlers = {rxid: canscan("RX", mp), txid: canscan("TX", mp)}
+
+ last_time = -1.
+ line_number = 0
+ must_scan = False
+ data = bytearray()
+ for line in canfile:
+ line_number += 1
+ parts = line.split()
+ if len(parts) < 7:
+ if parts:
+ report("Ignoring line: %s" % (line.strip(),),
+ (line_number, 0.), is_err=True)
+ continue
+ p_ts = parts[0]
+ p_canid = parts[5]
+ p_len = parts[6]
+ p_data = parts[7:]
+ if (not p_ts.startswith('(') or not p_ts.endswith(')')
+ or not p_len.startswith('[') or not p_len.endswith(']')):
+ report("Ignoring line: %s" % (line.strip(),),
+ (line_number, 0.), is_err=True)
+ continue
+ new_time = float(p_ts[1:-1])
+ line_info = (line_number, new_time)
+ if new_time < last_time:
+ report("Backwards time %.6f vs %.6f: %s"
+ % (new_time, last_time, line.strip()),
+ line_info, is_err=True)
+ last_time = new_time
+
+ hdlr = handlers.get(p_canid)
+ if hdlr is not None:
+ newdata = [int(i, 16) for i in p_data]
+ hdlr.handle_data(line_info, line, newdata)
+
+def main():
+ usage = "%prog <candump.log> <canid> <mcu.dict>"
+ opts = optparse.OptionParser(usage)
+ options, args = opts.parse_args()
+ if len(args) != 3:
+ opts.error("Incorrect number of arguments")
+ canfilename, canid, dictfilename = args
+ canid = int(canid, 16)
+
+ import_msgproto()
+ dictionary = read_dictionary(dictfilename)
+
+ canfile = open(canfilename, "r")
+ read_candump(canfile, canid, dictionary)
+ canfile.close()
+
+if __name__ == '__main__':
+ main()