aboutsummaryrefslogtreecommitdiffstats
path: root/scripts/parsecandump.py
blob: 4ae862dcacd3e865ec626fe6685b2b40efac4667 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#!/usr/bin/python3
# Check for out of order timestamps in the output of candump
#
# Copyright (C) 2023  Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import sys, os, optparse


def import_msgproto():
    global msgproto
    # Load msgproto.py module
    kdir = os.path.join(os.path.dirname(__file__), "..", "klippy")
    sys.path.append(kdir)
    import msgproto


def read_dictionary(filename):
    dfile = open(filename, "rb")
    dictionary = dfile.read()
    dfile.close()
    return dictionary


def report(msg, line_info, name="", is_err=False):
    line_number, line_time = line_info
    warn = ""
    if is_err:
        warn = " WARN"
    sys.stdout.write(
        "%04d:%010.6f:%s%s %s\n" % (line_number, line_time, name, warn, msg)
    )


class canscan:
    def __init__(self, name, mp):
        self.name = name
        self.mp = mp
        self.data = bytearray()
        self.need_scan = False

    def handle_data(self, line_info, line, newdata):
        data = self.data
        data += bytearray(newdata)
        while 1:
            if self.need_scan:
                drop = len(data)
                syncpos = data.find(msgproto.MESSAGE_SYNC)
                if syncpos >= 0:
                    drop = syncpos + 1
                    self.need_scan = False
                disc = ["%02X" % (d,) for d in data[:drop]]
                report(
                    "Discarding %d (%s)" % (drop, " ".join(disc)),
                    line_info,
                    self.name,
                    is_err=True,
                )
                data[:drop] = []
                if not data:
                    break
            l = self.mp.check_packet(data)
            if l == 0:
                break
            if l < 0:
                report(
                    "Invalid data: %s" % (line.strip(),),
                    line_info,
                    self.name,
                    is_err=True,
                )
                self.need_scan = True
                continue
            if l == 5:
                report("Ack %02x" % (data[1],), line_info, self.name)
            else:
                msgs = self.mp.dump(data[:l])
                report("%d: %s" % (l, ", ".join(msgs)), line_info, self.name)
            data[:l] = []


def read_candump(canfile, canid, dictionary):
    mp = msgproto.MessageParser()
    mp.process_identify(dictionary, decompress=False)
    rxid = "%03X" % (canid | 1,)
    txid = "%03X" % (canid & ~1,)
    handlers = {rxid: canscan("RX", mp), txid: canscan("TX", mp)}

    last_time = -1.0
    line_number = 0
    must_scan = False
    data = bytearray()
    for line in canfile:
        line_number += 1
        parts = line.split()
        if len(parts) < 7:
            if parts:
                report(
                    "Ignoring line: %s" % (line.strip(),),
                    (line_number, 0.0),
                    is_err=True,
                )
            continue
        p_ts = parts[0]
        p_canid = parts[5]
        p_len = parts[6]
        p_data = parts[7:]
        if (
            not p_ts.startswith("(")
            or not p_ts.endswith(")")
            or not p_len.startswith("[")
            or not p_len.endswith("]")
        ):
            report(
                "Ignoring line: %s" % (line.strip(),), (line_number, 0.0), is_err=True
            )
            continue
        new_time = float(p_ts[1:-1])
        line_info = (line_number, new_time)
        if new_time < last_time:
            report(
                "Backwards time %.6f vs %.6f: %s" % (new_time, last_time, line.strip()),
                line_info,
                is_err=True,
            )
        last_time = new_time

        hdlr = handlers.get(p_canid)
        if hdlr is not None:
            newdata = [int(i, 16) for i in p_data]
            hdlr.handle_data(line_info, line, newdata)


def main():
    usage = "%prog <candump.log> <canid> <mcu.dict>"
    opts = optparse.OptionParser(usage)
    options, args = opts.parse_args()
    if len(args) != 3:
        opts.error("Incorrect number of arguments")
    canfilename, canid, dictfilename = args
    canid = int(canid, 16)

    import_msgproto()
    dictionary = read_dictionary(dictfilename)

    canfile = open(canfilename, "r")
    read_candump(canfile, canid, dictionary)
    canfile.close()


if __name__ == "__main__":
    main()