diff options
author | Kevin O'Connor <kevin@koconnor.net> | 2025-01-14 15:26:29 -0500 |
---|---|---|
committer | Kevin O'Connor <kevin@koconnor.net> | 2025-02-02 18:43:34 -0500 |
commit | 2db2ef82f2c0ba08d7e72fbab355de0846272b3a (patch) | |
tree | 72b11156522886a068ba76fb399656270ad02859 /klippy/extras | |
parent | eb0581c2646fde09335615fbed8ed39641b096fc (diff) | |
download | kutter-2db2ef82f2c0ba08d7e72fbab355de0846272b3a.tar.gz kutter-2db2ef82f2c0ba08d7e72fbab355de0846272b3a.tar.xz kutter-2db2ef82f2c0ba08d7e72fbab355de0846272b3a.zip |
canbus_stats: Periodically report canbus interface statistics
Add support for a new get_canbus_status command to canserial.c .
Add new canbus_stats.py module that will periodically query canbus
mcus for connection status information.
Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
Diffstat (limited to 'klippy/extras')
-rw-r--r-- | klippy/extras/canbus_stats.py | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/klippy/extras/canbus_stats.py b/klippy/extras/canbus_stats.py new file mode 100644 index 00000000..b9ee3102 --- /dev/null +++ b/klippy/extras/canbus_stats.py @@ -0,0 +1,68 @@ +# Report canbus connection status +# +# Copyright (C) 2025 Kevin O'Connor <kevin@koconnor.net> +# +# This file may be distributed under the terms of the GNU GPLv3 license. + +class PrinterCANBusStats: + def __init__(self, config): + self.printer = config.get_printer() + self.reactor = self.printer.get_reactor() + self.name = config.get_name().split()[-1] + self.mcu = None + self.get_canbus_status_cmd = None + self.status = {'rx_error': None, 'tx_error': None, 'tx_retries': None, + 'bus_state': None} + self.printer.register_event_handler("klippy:connect", + self.handle_connect) + self.printer.register_event_handler("klippy:shutdown", + self.handle_shutdown) + def handle_shutdown(self): + status = self.status.copy() + if status['bus_state'] is not None: + # Clear bus_state on shutdown to note that the values may be stale + status['bus_state'] = 'unknown' + self.status = status + def handle_connect(self): + # Lookup mcu + mcu_name = self.name + if mcu_name != 'mcu': + mcu_name = 'mcu ' + mcu_name + self.mcu = self.printer.lookup_object(mcu_name) + # Lookup status query command + if self.mcu.try_lookup_command("get_canbus_status") is None: + return + self.get_canbus_status_cmd = self.mcu.lookup_query_command( + "get_canbus_status", + "canbus_status rx_error=%u tx_error=%u tx_retries=%u" + " canbus_bus_state=%u") + # Register periodic query timer + self.reactor.register_timer(self.query_event, self.reactor.NOW) + def query_event(self, eventtime): + prev_rx = self.status['rx_error'] + prev_tx = self.status['tx_error'] + prev_retries = self.status['tx_retries'] + if prev_rx is None: + prev_rx = prev_tx = prev_retries = 0 + params = self.get_canbus_status_cmd.send() + rx = prev_rx + ((params['rx_error'] - prev_rx) & 0xffffffff) + tx = prev_tx + ((params['tx_error'] - prev_tx) & 0xffffffff) + retries = prev_retries + ((params['tx_retries'] - prev_retries) + & 0xffffffff) + state = params['canbus_bus_state'] + self.status = {'rx_error': rx, 'tx_error': tx, 'tx_retries': retries, + 'bus_state': state} + return self.reactor.monotonic() + 1. + def stats(self, eventtime): + status = self.status + if status['rx_error'] is None: + return (False, '') + return (False, 'canstat_%s: bus_state=%s rx_error=%d' + ' tx_error=%d tx_retries=%d' + % (self.name, status['bus_state'], status['rx_error'], + status['tx_error'], status['tx_retries'])) + def get_status(self, eventtime): + return self.status + +def load_config_prefix(config): + return PrinterCANBusStats(config) |