aboutsummaryrefslogtreecommitdiffstats
path: root/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'scripts')
-rwxr-xr-xscripts/logextract.py104
1 files changed, 104 insertions, 0 deletions
diff --git a/scripts/logextract.py b/scripts/logextract.py
index 95e87da7..85632d2a 100755
--- a/scripts/logextract.py
+++ b/scripts/logextract.py
@@ -46,6 +46,103 @@ class GatherConfig:
######################################################################
+# TMC UART message parsing
+######################################################################
+
+uart_r = re.compile(r"tmcuart_(?:send|response) oid=[0-9]+ (?:read|write)=")
+
+class TMCUartHelper:
+ def _calc_crc8(self, data):
+ # Generate a CRC8-ATM value for a bytearray
+ crc = 0
+ for b in data:
+ for i in range(8):
+ if (crc >> 7) ^ (b & 0x01):
+ crc = (crc << 1) ^ 0x07
+ else:
+ crc = (crc << 1)
+ crc &= 0xff
+ b >>= 1
+ return crc
+ def _add_serial_bits(self, data):
+ # Add serial start and stop bits to a message in a bytearray
+ out = 0
+ pos = 0
+ for d in data:
+ b = (d << 1) | 0x200
+ out |= (b << pos)
+ pos += 10
+ res = bytearray()
+ for i in range((pos+7)//8):
+ res.append((out >> (i*8)) & 0xff)
+ return res
+ def _encode_read(self, sync, addr, reg):
+ # Generate a uart read register message
+ msg = bytearray([sync, addr, reg])
+ msg.append(self._calc_crc8(msg))
+ return self._add_serial_bits(msg)
+ def _encode_write(self, sync, addr, reg, val):
+ # Generate a uart write register message
+ msg = bytearray([sync, addr, reg, (val >> 24) & 0xff,
+ (val >> 16) & 0xff, (val >> 8) & 0xff, val & 0xff])
+ msg.append(self._calc_crc8(msg))
+ return self._add_serial_bits(msg)
+ def _decode_read(self, data):
+ # Extract a uart read request message
+ if len(data) != 5:
+ return
+ # Convert data into a long integer for easy manipulation
+ mval = pos = 0
+ for d in bytearray(data):
+ mval |= d << pos
+ pos += 8
+ # Extract register value
+ addr = (mval >> 11) & 0xff
+ reg = (mval >> 21) & 0xff
+ # Verify start/stop bits and crc
+ encoded_data = self._encode_read(0xf5, addr, reg)
+ if data != encoded_data:
+ return "Invalid: %s" % (self.pretty_print(addr, reg),)
+ return self.pretty_print(addr, reg)
+ def _decode_reg(self, data):
+ # Extract a uart read response message
+ if len(data) != 10:
+ return
+ # Convert data into a long integer for easy manipulation
+ mval = pos = 0
+ for d in bytearray(data):
+ mval |= d << pos
+ pos += 8
+ # Extract register value
+ addr = (mval >> 11) & 0xff
+ reg = (mval >> 21) & 0xff
+ val = ((((mval >> 31) & 0xff) << 24) | (((mval >> 41) & 0xff) << 16)
+ | (((mval >> 51) & 0xff) << 8) | ((mval >> 61) & 0xff))
+ sync = 0xf5
+ if addr == 0xff:
+ sync = 0x05
+ # Verify start/stop bits and crc
+ encoded_data = self._encode_write(sync, addr, reg, val)
+ if data != encoded_data:
+ #print("Got %s vs %s" % (repr(data), repr(encoded_data)))
+ return "Invalid:%s" % (self.pretty_print(addr, reg, val),)
+ return self.pretty_print(addr, reg, val)
+ def pretty_print(self, addr, reg, val=None):
+ if val is None:
+ return "(%x@%x)" % (reg, addr)
+ return "(%x@%x=%08x)" % (reg, addr, val)
+ def parse_msg(self, msg):
+ data = bytearray(msg)
+ if len(data) == 10:
+ return self._decode_reg(data)
+ elif len(data) == 5:
+ return self._decode_read(data)
+ elif len(data) == 0:
+ return ""
+ return "(length?)"
+
+
+######################################################################
# Shutdown extraction
######################################################################
@@ -69,6 +166,9 @@ clock_r = re.compile(r"^clocksync state: .* clock_est=\((?P<st>[^ ]+)"
+ r" (?P<sc>[0-9]+) (?P<f>[^ ]+)\)")
repl_seq_r = re.compile(r": seq: 1" + shortseq_s)
repl_clock_r = re.compile(r"clock=(?P<clock>[0-9]+)(?: |$)")
+repl_uart_r = re.compile(r"tmcuart_(?:response|send) oid=[0-9]+"
+ + r" (?:read|write)=(?P<msg>(?:'[^']*'"
+ + r'|"[^"]*"))(?: |$)')
mcu_r = re.compile(r"MCU '(?P<mcu>[^']+)' (is_)?shutdown: (?P<reason>.*)$")
def add_high_bits(val, ref, mask):
@@ -178,6 +278,10 @@ class GatherShutdown:
return m.group(0).rstrip() + "(%.6f) " % (
self.trans_clock(int(m.group('clock')), ts),)
line = repl_clock_r.sub(clock_update, line)
+ def uart_update(m):
+ msg = TMCUartHelper().parse_msg(ast.literal_eval(m.group('msg')))
+ return m.group(0).rstrip() + "%s " % (msg,)
+ line = repl_uart_r.sub(uart_update, line)
if mcu_name != 'mcu':
line = "mcu '%s': %s" % (mcu_name, line)
return line