aboutsummaryrefslogtreecommitdiffstats
path: root/scripts/canbus_query.py
blob: 12032ee7c188dc025049a086a8fac315bb5f788e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/env python2
# Tool to query CAN bus uuids
#
# Copyright (C) 2021  Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import sys, os, optparse, time
import can

CANBUS_ID_ADMIN = 0x3F0
CMD_QUERY_UNASSIGNED = 0x00
RESP_NEED_NODEID = 0x20
CMD_SET_KLIPPER_NODEID = 0x01
CMD_SET_CANBOOT_NODEID = 0x11


def query_unassigned(canbus_iface):
    # Open CAN socket
    filters = [{"can_id": CANBUS_ID_ADMIN + 1, "can_mask": 0x7FF, "extended": False}]
    bus = can.interface.Bus(
        channel=canbus_iface, can_filters=filters, bustype="socketcan"
    )
    # Send query
    msg = can.Message(
        arbitration_id=CANBUS_ID_ADMIN,
        data=[CMD_QUERY_UNASSIGNED],
        is_extended_id=False,
    )
    bus.send(msg)
    # Read responses
    found_ids = {}
    start_time = curtime = time.time()
    while 1:
        tdiff = start_time + 2.0 - curtime
        if tdiff <= 0.0:
            break
        msg = bus.recv(tdiff)
        curtime = time.time()
        if (
            msg is None
            or msg.arbitration_id != CANBUS_ID_ADMIN + 1
            or msg.dlc < 7
            or msg.data[0] != RESP_NEED_NODEID
        ):
            continue
        uuid = sum([v << ((5 - i) * 8) for i, v in enumerate(msg.data[1:7])])
        if uuid in found_ids:
            continue
        found_ids[uuid] = 1
        AppNames = {
            CMD_SET_KLIPPER_NODEID: "Klipper",
            CMD_SET_CANBOOT_NODEID: "CanBoot",
        }
        app_id = CMD_SET_KLIPPER_NODEID
        if msg.dlc > 7:
            app_id = msg.data[7]
        app_name = AppNames.get(app_id, "Unknown")
        sys.stdout.write(
            "Found canbus_uuid=%012x, Application: %s\n" % (uuid, app_name)
        )
    sys.stdout.write(
        "Total %d uuids found\n"
        % (
            len(
                found_ids,
            )
        )
    )


def main():
    usage = "%prog [options] <can interface>"
    opts = optparse.OptionParser(usage)
    options, args = opts.parse_args()
    if len(args) != 1:
        opts.error("Incorrect number of arguments")
    canbus_iface = args[0]
    query_unassigned(canbus_iface)


if __name__ == "__main__":
    main()